From 24844ea787d0ba4ef5b6a7a5892d3258b1caaec4 Mon Sep 17 00:00:00 2001
From: Michael Korobkov
Date: Thu, 7 Nov 2024 19:27:37 +0000
Subject: [PATCH] extract links, delete incoming msg, add sender to reply

---
 .gitignore               | 165 ++++++++++++++++++++++++++++++++++++++-
 .vscode/settings.json    |   5 ++
 bot/bot.py               |  34 ++++++--
 bot/exception.py         |   9 +++
 bot/msgprocessor.py      | 116 +++++++++++++++++++++++++++
 bot/msgprocessor_test.py | 124 +++++++++++++++++++++++++++++
 6 files changed, 446 insertions(+), 7 deletions(-)
 create mode 100644 .vscode/settings.json
 create mode 100644 bot/exception.py
 create mode 100644 bot/msgprocessor.py
 create mode 100644 bot/msgprocessor_test.py

diff --git a/.gitignore b/.gitignore
index 9024e6c..d721702 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,164 @@
-**/secret.py
\ No newline at end of file
+**/secret.py
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+# This is especially recommended for binary packages to ensure reproducibility, and is more
+# commonly ignored for libraries.
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+# in version control.
+# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
+.pdm.toml
+.pdm-python
+.pdm-build/
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+# and can be added to the global gitignore or merged into this file. For a more nuclear
+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..a03b5fd
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,5 @@
+{
+    "python.testing.unittestArgs": ["-v", "-s", "./bot", "-p", "*_test.py"],
+    "python.testing.pytestEnabled": false,
+    "python.testing.unittestEnabled": true
+}
diff --git a/bot/bot.py b/bot/bot.py
index fb83d1d..ec3ef87 100644
--- a/bot/bot.py
+++ b/bot/bot.py
@@ -7,13 +7,15 @@ import re
 from urllib.parse import urlparse, urlunparse
 import random
 
-HAS_LINK_RE = r'(https?:\/\/[^\s]+|www\.[^\s]+)'
+from msgprocessor import TrackerRemovalMsgProcessor, TrackerRemovalProcessorMessage
+
+HAS_LINK_RE = r"(https?:\/\/[^\s]+|www\.[^\s]+)"
 
 bot = AsyncTeleBot(TOKEN)
 
 
 def extract_links(text: str):
-    url_pattern = r'(https?://[^\s]+|www\.[^\s]+)'
+    url_pattern = r"(https?://[^\s]+|www\.[^\s]+)"
     links = re.findall(url_pattern, text)
     return links
 
@@ -45,10 +47,30 @@ async def start(msg: Message):
 
 @bot.message_handler(func=lambda message: True)
 async def got_message(msg: Message):
-    if re.match(string=msg.text, pattern=HAS_LINK_RE):
-        fixed_reply = process_text(msg.text)
-        if fixed_reply:
-            await bot.reply_to(msg, fixed_reply)
+    # Skip updates that carry no text or no sender: there is nothing to process.
+    if msg.text is None:
+        return
+    if msg.from_user is None:
+        return
+
+    tracker_removal_result = TrackerRemovalMsgProcessor(
+        TrackerRemovalProcessorMessage(fromUser=msg.from_user, text=msg.text)
+    ).process()
+
+    if not tracker_removal_result.needsToReply:
+        return
+
+    try:
+        await bot.delete_message(msg.chat.id, msg.id, timeout=5)
+    except Exception as e:
+        await bot.reply_to(
+            message=msg,
+            text="Uoghhhh, i am not an admin here? I can't cleanup this tracking(",
+        )
+        print(e, flush=True)  # TODO: use a logger
+        return
+
+    await bot.send_message(msg.chat.id, tracker_removal_result.text, parse_mode="html")
 
 
 async def main():
diff --git a/bot/exception.py b/bot/exception.py
new file mode 100644
index 0000000..ae8b18c
--- /dev/null
+++ b/bot/exception.py
@@ -0,0 +1,9 @@
+class UrlRemoverNotImplementedException(Exception):
+    def __init__(self, domain: str):
+        self.__base_message = "Url remover for domain not implemented"
+        self.domain = domain
+        super().__init__(self.__base_message)
+
+
+    def __str__(self):
+        return f'{self.__base_message}: {self.domain}'
diff --git a/bot/msgprocessor.py b/bot/msgprocessor.py
new file mode 100644
index 0000000..f7ad6d0
--- /dev/null
+++ b/bot/msgprocessor.py
@@ -0,0 +1,116 @@
+from typing import Callable
+from dataclasses import dataclass
+from exception import UrlRemoverNotImplementedException
+from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
+import re
+from telebot.types import User
+
+
+@dataclass(init=True, eq=True)
+class TrackerRemovalProcessorMessage:
+    fromUser: User
+    text: str
+
+
+@dataclass(frozen=True, init=True, eq=True)
+class TrackerRemovalResult:
+    needsToReply: bool
+    text: str
+
+
+class TrackerRemovalMsgProcessor:
+    def __init__(self, msg: TrackerRemovalProcessorMessage):
+        self.__msg = msg
+
+    def process(self) -> TrackerRemovalResult:
+        if not self.__remove_trackers_from_msg_urls():
+            return TrackerRemovalResult(
+                needsToReply=False, text=""
+            )  # nothing was stripped, so further transformations are pointless
+        self.__emplace_sender_into_msg_text()
+        return TrackerRemovalResult(
+            needsToReply=True, text=self.__msg.text
+        )  # the message was changed, so a reply has to be sent
+
+    def __remove_trackers_from_msg_urls(self) -> bool:
+        trackers_extracted = False
+
+        # TODO: we only handle http(s) here, and this use case is obscure,
+        # but a link may also come without a scheme: Telegram can parse a plain
+        # dot-separated string ending in some top-level domain as a link
+        def is_url(url: str) -> bool:
+            SCHEMES = ["http://", "https://"]
+            return len([s for s in SCHEMES if url.startswith(s)]) != 0
+
+        SEPARATOR_CHARS = [" ", "\n"]
+        separator_regex = "(" + "|".join(SEPARATOR_CHARS) + ")"
+        lexems = re.split(separator_regex, self.__msg.text)
+        for i, l in enumerate(lexems):
+            if not is_url(l):
+                continue
+
+            removed_trackers_url = self.__remove_tracker(l)
+            if l == removed_trackers_url:  # the URL was not changed
+                continue
+
+            trackers_extracted = True
+            lexems[i] = removed_trackers_url
+
+        self.__msg.text = "".join(lexems)
+        return trackers_extracted
+
+    @staticmethod
+    def __remove_tracker(url: str) -> str:
+        try:
+            parsed_url = urlparse(url)
+        except Exception:
+            return url
+        if parsed_url.hostname is None:
+            return url
+        hostname = str(parsed_url.hostname)
+        try:
+            return TrackerRemoverFactory.make_remover(hostname)(url)
+        except UrlRemoverNotImplementedException:
+            return url
+
+    def __emplace_sender_into_msg_text(self):
+        self.__msg.text = f'Message from {self.__msg.fromUser.first_name}:\n\n{self.__msg.text}'
+
+
+class TrackerRemoverFactory:
+    TrackerRemover = Callable[[str], str]
+
+    @staticmethod
+    def make_remover(domain: str) -> TrackerRemover:
+        @dataclass(frozen=True, init=True)
+        class RemoverIdentifyer:
+            domains: list[str]
+            remover: TrackerRemoverFactory.TrackerRemover
+
+        removers_by_domain = [
+            RemoverIdentifyer(
+                domains=["youtube.com", "youtu.be"],
+                remover=TrackerRemoverFactory.remove_yt_trackers,
+            )
+        ]
+        remover_one = [
+            r
+            for r in removers_by_domain
+            if any(domain == d or domain.endswith("." + d) for d in r.domains)
+        ]
+        if len(remover_one) == 0:
+            raise UrlRemoverNotImplementedException(domain)
+        return remover_one[0].remover
+
+    @staticmethod
+    def remove_yt_trackers(url: str) -> str:
+        # TODO: think about how to generalize this, maybe a builder for the
+        # tracker-search strategy, but that is for much later
+        QUERY_PARAMS_TRACKER = "si"
+        parsed_url = urlparse(url)
+        query_params = parse_qs(parsed_url.query, keep_blank_values=True)
+        if QUERY_PARAMS_TRACKER not in query_params:
+            return url  # no tracker: return the URL untouched, never re-encoded
+        del query_params[QUERY_PARAMS_TRACKER]
+        cleaned_query = urlencode(query_params, doseq=True)
+        return urlunparse(parsed_url._replace(query=cleaned_query))
diff --git a/bot/msgprocessor_test.py b/bot/msgprocessor_test.py
new file mode 100644
index 0000000..a01a9fb
--- /dev/null
+++ b/bot/msgprocessor_test.py
@@ -0,0 +1,124 @@
+import unittest
+from msgprocessor import (
+    TrackerRemovalMsgProcessor,
+    TrackerRemoverFactory,
+    TrackerRemovalProcessorMessage,
+    TrackerRemovalResult,
+)
+from dataclasses import dataclass
+
+
+@dataclass(init=True, eq=True)
+class TestUser:
+    id: int
+    first_name: str
+
+
+class TestRemoverFactory(unittest.TestCase):
+    factory = TrackerRemoverFactory()
+
+    def test_remove_strategy_constructor(self):
+        test_case_data = [
+            {"domain": "youtube.com", "remover": self.factory.remove_yt_trackers},
+            {
+                "domain": "lowerlevel.youtube.com",
+                "remover": self.factory.remove_yt_trackers,
+            },
+            {
+                "domain": "youtu.be",
+                "remover": self.factory.remove_yt_trackers,
+            },
+            {
+                "domain": "something.youtu.be",
+                "remover": self.factory.remove_yt_trackers,
+            },
+        ]
+        for test_case in test_case_data:
+            self.assertIs(
+                self.factory.make_remover(test_case["domain"]),
+                test_case["remover"],
+            )
+
+    def test_remove_yt_si(self):
+        test_case_data = [
+            {
+                "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy",
+                "expected_url": "https://youtu.be/jNQXAC9IVRw",
+            },
+            {
+                "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy&t=16",
+                "expected_url": "https://youtu.be/jNQXAC9IVRw?t=16",
+            },
+            {
+                "url": "https://www.youtube.com/watch?v=jNQXAC9IVRw",
+                "expected_url": "https://www.youtube.com/watch?v=jNQXAC9IVRw",
+            },
+            {
+                "url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16",
+                "expected_url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&t=16",
+            },
+        ]
+        for test_case in test_case_data:
+            self.assertEqual(
+                self.factory.remove_yt_trackers(test_case["url"]),
+                test_case["expected_url"],
+            )
+
+
+class TestRemovalMsgProcessor(unittest.TestCase):
+    def test_remove_links(self):
+        test_case_data = [
+            {
+                "msg_text": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy",
+                "sender": TestUser(id=123, first_name="Ghytro"),
+                "bot_responded": True,
+                "bot_response": 'Message from Ghytro:\n\nhttps://youtu.be/jNQXAC9IVRw',
+            },
+            {
+                "msg_text": "чекай https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy\nнаш слон хд",
+                "sender": TestUser(id=321, first_name="OllyHearn"),
+                "bot_responded": True,
+                "bot_response": 'Message from OllyHearn:\n\nчекай https://youtu.be/jNQXAC9IVRw\nнаш слон хд',
+            },
+            {
+                "msg_text": "а я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16 дада",
+                "sender": TestUser(id=321, first_name="OllyHearn"),
+                "bot_responded": True,
+                "bot_response": 'Message from OllyHearn:\n\nа я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&t=16 дада',
+            },
+            {
+                "msg_text": "asdasdasdasdasdasdasd asdasd asdasd asdad sasa dadsas",
+                "sender": TestUser(id=123, first_name="Ghytro"),
+                "bot_responded": False,
+                "bot_response": "",
+            },
+        ]
+        for test_case in test_case_data:
+            result = TrackerRemovalMsgProcessor(
+                TrackerRemovalProcessorMessage(
+                    fromUser=test_case["sender"], text=test_case["msg_text"]
+                )
+            ).process()
+            self.assertEqual(
+                result,
+                TrackerRemovalResult(
+                    needsToReply=test_case["bot_responded"],
+                    text=test_case["bot_response"],
+                ),
+            )
+
+
+if __name__ == "__main__":
+    test_classes_to_run = [TestRemoverFactory, TestRemovalMsgProcessor]
+
+    loader = unittest.TestLoader()
+
+    suites_list = []
+    for test_class in test_classes_to_run:
+        suite = loader.loadTestsFromTestCase(test_class)
+        suites_list.append(suite)
+
+    big_suite = unittest.TestSuite(suites_list)
+
+    runner = unittest.TextTestRunner()
+    results = runner.run(big_suite)