diff --git a/.gitignore b/.gitignore
index 9024e6c..d721702 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,164 @@
-**/secret.py
\ No newline at end of file
+**/secret.py
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+# This is especially recommended for binary packages to ensure reproducibility, and is more
+# commonly ignored for libraries.
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+# in version control.
+# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
+.pdm.toml
+.pdm-python
+.pdm-build/
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+# and can be added to the global gitignore or merged into this file. For a more nuclear
+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..a03b5fd
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,5 @@
+{
+ "python.testing.unittestArgs": ["-v", "-s", "./bot", "-p", "*_test.py"],
+ "python.testing.pytestEnabled": false,
+ "python.testing.unittestEnabled": true
+}
diff --git a/bot/bot.py b/bot/bot.py
index fb83d1d..ec3ef87 100644
--- a/bot/bot.py
+++ b/bot/bot.py
@@ -7,13 +7,15 @@ import re
from urllib.parse import urlparse, urlunparse
import random
-HAS_LINK_RE = r'(https?:\/\/[^\s]+|www\.[^\s]+)'
+from msgprocessor import TrackerRemovalMsgProcessor, TrackerRemovalProcessorMessage
+
+HAS_LINK_RE = r"(https?:\/\/[^\s]+|www\.[^\s]+)"
bot = AsyncTeleBot(TOKEN)
def extract_links(text: str):
+    """Return every substring of *text* that starts with http://, https:// or www. and runs to the next whitespace."""
- url_pattern = r'(https?://[^\s]+|www\.[^\s]+)'
+ url_pattern = r"(https?://[^\s]+|www\.[^\s]+)"
links = re.findall(url_pattern, text)
return links
@@ -45,10 +47,30 @@ async def start(msg: Message):
@bot.message_handler(func=lambda message: True)
async def got_message(msg: Message):
- if re.match(string=msg.text, pattern=HAS_LINK_RE):
- fixed_reply = process_text(msg.text)
- if fixed_reply:
- await bot.reply_to(msg, fixed_reply)
+    """Repost a message with tracking parameters stripped from its links.
+
+    If the processor changed at least one URL, the original message is
+    deleted and the cleaned text (prefixed with the sender's name) is sent
+    to the same chat. If the deletion fails, the bot replies to the original
+    message with a complaint and leaves it in place.
+    """
+    import html  # local import: only this handler needs it
+
+    # Updates without text or without a sender have nothing to clean.
+    if msg.text is None or msg.from_user is None:
+        return
+
+    tracker_removal_result = TrackerRemovalMsgProcessor(
+        TrackerRemovalProcessorMessage(fromUser=msg.from_user, text=msg.text)
+    ).process()
+
+    if not tracker_removal_result.needsToReply:
+        return
+
+    try:
+        await bot.delete_message(msg.chat.id, msg.id, timeout=5)
+    except Exception as e:
+        # reply_to() needs the Message object itself (it reads the chat and
+        # message ids from it); passing the bare id made this fallback crash.
+        await bot.reply_to(
+            message=msg,
+            text="Uoghhhh, i am not an admin here? I can't cleanup this tracking(",
+        )
+        print(e, flush=True)  # TODO: replace with a logger
+        return
+
+    # The text is user-supplied, so escape <, > and & before sending it in
+    # HTML parse mode; otherwise a stray "<" can make the send fail after the
+    # original message has already been deleted.
+    await bot.send_message(
+        msg.chat.id,
+        html.escape(tracker_removal_result.text, quote=False),
+        parse_mode="html",
+    )
async def main():
diff --git a/bot/exception.py b/bot/exception.py
new file mode 100644
index 0000000..ae8b18c
--- /dev/null
+++ b/bot/exception.py
@@ -0,0 +1,9 @@
+class UrlRemoverNotImplementedException(Exception):
+    """Raised when no tracker remover exists for the given domain.
+
+    The offending domain is kept on the ``domain`` attribute and appended
+    to the message produced by ``str()``.
+    """
+
+    _BASE_MESSAGE = "Url remover for domain not implemented"
+
+    def __init__(self, domain: str):
+        super().__init__(self._BASE_MESSAGE)
+        self.domain = domain
+
+    def __str__(self):
+        return f'{self._BASE_MESSAGE}: {self.domain}'
diff --git a/bot/msgprocessor.py b/bot/msgprocessor.py
new file mode 100644
index 0000000..f7ad6d0
--- /dev/null
+++ b/bot/msgprocessor.py
@@ -0,0 +1,116 @@
+from typing import Callable
+from dataclasses import dataclass
+from exception import UrlRemoverNotImplementedException
+from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
+import re
+from telebot.types import User
+
+
+@dataclass
+class TrackerRemovalProcessorMessage:
+    """Input for the tracker-removal processor: who sent the message and what it says."""
+
+    # Sender of the message; the processor reads its first_name.
+    fromUser: User
+    # Message text; the processor rewrites this field in place.
+    text: str
+
+
+@dataclass(frozen=True)
+class TrackerRemovalResult:
+    """Immutable outcome of processing one message."""
+
+    # True when at least one URL was changed and a cleaned message should be sent.
+    needsToReply: bool
+    # Cleaned text to send; empty when needsToReply is False.
+    text: str
+
+
+class TrackerRemovalMsgProcessor:
+    """Strips known tracking parameters from the URLs inside one message.
+
+    The wrapped message is modified in place: after ``process()`` its
+    ``text`` holds the cleaned text, prefixed with the sender's name when
+    at least one URL was changed.
+    """
+
+    # Only links with an explicit scheme are recognised.
+    # TODO: Telegram also linkifies scheme-less text ("some.words.tld");
+    # that obscure case is not handled yet.
+    _URL_SCHEMES = ("http://", "https://")
+
+    # The capturing group makes re.split() keep the separators, so joining
+    # the pieces reproduces the original text exactly. Splitting on any
+    # whitespace (not just " " and "\n") keeps a tab or "\r" from staying
+    # glued to a URL and being swallowed together with its query string.
+    _SEPARATOR_RE = re.compile(r"(\s+)")
+
+    def __init__(self, msg: TrackerRemovalProcessorMessage):
+        self.__msg = msg
+
+    def process(self) -> TrackerRemovalResult:
+        """Clean the message and report whether a reply has to be sent.
+
+        Returns:
+            ``TrackerRemovalResult(needsToReply=False, text="")`` when no URL
+            was changed, otherwise ``needsToReply=True`` with the cleaned,
+            sender-prefixed text.
+        """
+        if not self.__remove_trackers_from_msg_urls():
+            # Nothing changed, so further transformations are pointless.
+            return TrackerRemovalResult(needsToReply=False, text="")
+        self.__emplace_sender_into_msg_text()
+        # The message was changed, so a reply has to be sent.
+        return TrackerRemovalResult(needsToReply=True, text=self.__msg.text)
+
+    def __remove_trackers_from_msg_urls(self) -> bool:
+        """Rewrite every URL token in the message; return True if any changed."""
+        trackers_extracted = False
+        lexemes = self._SEPARATOR_RE.split(self.__msg.text)
+        for i, lexeme in enumerate(lexemes):
+            if not lexeme.startswith(self._URL_SCHEMES):
+                continue
+
+            cleaned_url = self.__remove_tracker(lexeme)
+            if cleaned_url == lexeme:  # the URL was not changed
+                continue
+
+            trackers_extracted = True
+            lexemes[i] = cleaned_url
+
+        self.__msg.text = "".join(lexemes)
+        return trackers_extracted
+
+    @staticmethod
+    def __remove_tracker(url: str) -> str:
+        """Return *url* without trackers, or unchanged if it cannot be handled."""
+        try:
+            parsed_url = urlparse(url)
+        except ValueError:
+            # urlparse rejects malformed URLs (e.g. a broken IPv6 literal).
+            return url
+        hostname = parsed_url.hostname
+        if hostname is None:
+            return url
+        try:
+            return TrackerRemoverFactory.make_remover(hostname)(url)
+        except UrlRemoverNotImplementedException:
+            # No remover for this domain: leave the link as it is.
+            return url
+
+    def __emplace_sender_into_msg_text(self):
+        """Prefix the message text with a line naming the original sender."""
+        self.__msg.text = f'Message from {self.__msg.fromUser.first_name}:\n\n{self.__msg.text}'
+
+
+class TrackerRemoverFactory:
+    """Selects, by hostname, the function that strips trackers from a URL."""
+
+    # A remover takes a URL string and returns the URL without trackers.
+    TrackerRemover = Callable[[str], str]
+
+    @staticmethod
+    def make_remover(domain: str) -> TrackerRemover:
+        """Return the remover registered for *domain* or a parent domain of it.
+
+        Args:
+            domain: hostname of the URL, e.g. ``"www.youtube.com"``.
+
+        Raises:
+            UrlRemoverNotImplementedException: no remover handles *domain*.
+        """
+        removers_by_domain = (
+            (("youtube.com", "youtu.be"), TrackerRemoverFactory.remove_yt_trackers),
+        )
+        host = domain.lower()
+        for known_domains, remover in removers_by_domain:
+            for known in known_domains:
+                # Match the domain itself or a subdomain of it. A plain
+                # suffix test would also accept look-alike hosts such as
+                # "notyoutube.com".
+                if host == known or host.endswith("." + known):
+                    return remover
+        raise UrlRemoverNotImplementedException(domain)
+
+    @staticmethod
+    def remove_yt_trackers(url: str) -> str:
+        """Return *url* without YouTube's ``si`` share-tracking parameter.
+
+        A URL that carries no ``si`` parameter is returned unchanged, so
+        callers can detect "nothing removed" by comparing with the input.
+        """
+        # TODO: think about generalising this, maybe a builder for the
+        # tracker-search strategy; low priority.
+        QUERY_PARAMS_TRACKER = "si"
+        parsed_url = urlparse(url)
+        # keep_blank_values preserves parameters such as "flag=" that
+        # parse_qs would otherwise drop silently.
+        query_params = parse_qs(parsed_url.query, keep_blank_values=True)
+        if QUERY_PARAMS_TRACKER not in query_params:
+            # Do not re-encode: that could alter the URL's spelling and make
+            # it look "changed" although no tracker was present.
+            return url
+        del query_params[QUERY_PARAMS_TRACKER]
+        return urlunparse(
+            parsed_url._replace(query=urlencode(query_params, doseq=True))
+        )
diff --git a/bot/msgprocessor_test.py b/bot/msgprocessor_test.py
new file mode 100644
index 0000000..a01a9fb
--- /dev/null
+++ b/bot/msgprocessor_test.py
@@ -0,0 +1,124 @@
+import unittest
+from msgprocessor import (
+ TrackerRemovalMsgProcessor,
+ TrackerRemoverFactory,
+ TrackerRemovalProcessorMessage,
+ TrackerRemovalResult,
+)
+from dataclasses import dataclass
+
+
+@dataclass
+class TestUser:
+    """Minimal stand-in for the sender object passed to the processor in tests."""
+
+    id: int
+    # The only attribute the processor reads when building its reply.
+    first_name: str
+
+
+class TestRemoverFactory(unittest.TestCase):
+    """Tests for remover lookup by domain and for the YouTube ``si`` remover."""
+
+    factory = TrackerRemoverFactory()
+
+    def test_remove_strategy_constructor(self):
+        """YouTube domains and their subdomains resolve to the YouTube remover."""
+        test_case_data = [
+            {"domain": "youtube.com", "remover": self.factory.remove_yt_trackers},
+            {
+                "domain": "lowerlevel.youtube.com",
+                "remover": self.factory.remove_yt_trackers,
+            },
+            {
+                "domain": "youtu.be",
+                "remover": self.factory.remove_yt_trackers,
+            },
+            {
+                "domain": "something.youtu.be",
+                "remover": self.factory.remove_yt_trackers,
+            },
+        ]
+        for test_case in test_case_data:
+            # subTest names the failing domain and lets the other cases run.
+            with self.subTest(domain=test_case["domain"]):
+                self.assertIs(
+                    self.factory.make_remover(test_case["domain"]),
+                    test_case["remover"],
+                )
+
+    def test_remove_yt_si(self):
+        """The ``si`` parameter is removed and every other parameter is kept."""
+        test_case_data = [
+            {
+                "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy",
+                "expected_url": "https://youtu.be/jNQXAC9IVRw",
+            },
+            {
+                "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy&t=16",
+                "expected_url": "https://youtu.be/jNQXAC9IVRw?t=16",
+            },
+            {
+                "url": "https://www.youtube.com/watch?v=jNQXAC9IVRw",
+                "expected_url": "https://www.youtube.com/watch?v=jNQXAC9IVRw",
+            },
+            {
+                "url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16",
+                "expected_url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&t=16",
+            },
+        ]
+        for test_case in test_case_data:
+            with self.subTest(url=test_case["url"]):
+                self.assertEqual(
+                    self.factory.remove_yt_trackers(test_case["url"]),
+                    test_case["expected_url"],
+                )
+
+
+class TestRemovalMsgProcessor(unittest.TestCase):
+    """End-to-end tests of message processing: URL cleanup plus sender prefix."""
+
+    def test_remove_links(self):
+        """Messages with tracked links are rewritten; others need no reply."""
+        test_case_data = [
+            {
+                "msg_text": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy",
+                "sender": TestUser(id=123, first_name="Ghytro"),
+                "bot_responded": True,
+                "bot_response": 'Message from Ghytro:\n\nhttps://youtu.be/jNQXAC9IVRw',
+            },
+            {
+                "msg_text": "чекай https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy\nнаш слон хд",
+                "sender": TestUser(id=321, first_name="OllyHearn"),
+                "bot_responded": True,
+                "bot_response": 'Message from OllyHearn:\n\nчекай https://youtu.be/jNQXAC9IVRw\nнаш слон хд',
+            },
+            {
+                "msg_text": "а я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16 дада",
+                "sender": TestUser(id=321, first_name="OllyHearn"),
+                "bot_responded": True,
+                "bot_response": 'Message from OllyHearn:\n\nа я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&t=16 дада',
+            },
+            {
+                "msg_text": "asdasdasdasdasdasdasd asdasd asdasd asdad sasa dadsas",
+                "sender": TestUser(id=123, first_name="Ghytro"),
+                "bot_responded": False,
+                "bot_response": "",
+            },
+        ]
+        for test_case in test_case_data:
+            # subTest names the failing message and lets the other cases run.
+            with self.subTest(msg_text=test_case["msg_text"]):
+                result = TrackerRemovalMsgProcessor(
+                    TrackerRemovalProcessorMessage(
+                        fromUser=test_case["sender"], text=test_case["msg_text"]
+                    )
+                ).process()
+                self.assertEqual(
+                    result,
+                    TrackerRemovalResult(
+                        needsToReply=test_case["bot_responded"],
+                        text=test_case["bot_response"],
+                    ),
+                )
+
+
+if __name__ == "__main__":
+    # Run both test classes as one suite when the file is executed directly.
+    test_classes_to_run = [TestRemoverFactory, TestRemovalMsgProcessor]
+
+    loader = unittest.TestLoader()
+    big_suite = unittest.TestSuite(
+        loader.loadTestsFromTestCase(test_class)
+        for test_class in test_classes_to_run
+    )
+
+    runner = unittest.TextTestRunner()
+    results = runner.run(big_suite)
+
+    # Report failure through the exit status; without this the script
+    # exits 0 even when tests fail, so CI and shell callers cannot tell.
+    raise SystemExit(0 if results.wasSuccessful() else 1)