diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a03b5fd --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "python.testing.unittestArgs": ["-v", "-s", "./bot", "-p", "*_test.py"], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true +} diff --git a/bot/bot.py b/bot/bot.py index 5e2a3dd..ec3ef87 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -5,19 +5,17 @@ from secret import TOKEN import textbook import re from urllib.parse import urlparse, urlunparse -from msgprocessor import ( - TrackerRemovalMsgProcessor, - TrackerRemovalProcessorMessage -) import random -HAS_LINK_RE = r'(https?:\/\/[^\s]+|www\.[^\s]+)' +from msgprocessor import TrackerRemovalMsgProcessor, TrackerRemovalProcessorMessage + +HAS_LINK_RE = r"(https?:\/\/[^\s]+|www\.[^\s]+)" bot = AsyncTeleBot(TOKEN) def extract_links(text: str): - url_pattern = r'(https?://[^\s]+|www\.[^\s]+)' + url_pattern = r"(https?://[^\s]+|www\.[^\s]+)" links = re.findall(url_pattern, text) return links @@ -54,26 +52,25 @@ async def got_message(msg: Message): return if msg.from_user is None: return - if msg.from_user.username is None: - return tracker_removal_result = TrackerRemovalMsgProcessor( - TrackerRemovalProcessorMessage( - fromUsername=msg.from_user.username, - text=msg.text - ) + TrackerRemovalProcessorMessage(fromUser=msg.from_user, text=msg.text) ).process() if not tracker_removal_result.needsToReply: return try: - await bot.delete_message(msg.chat.id, msg.id, 5) + await bot.delete_message(msg.chat.id, msg.id, timeout=5) except Exception as e: - print(e) # todo: логгер + await bot.reply_to( + message=msg.id, + text="Uoghhhh, i am not an admin here? I can't cleanup this tracking(", + ) + print(e, flush=True) # todo: логгер return - await bot.send_message(msg.chat.id, tracker_removal_result.text) + await bot.send_message(msg.chat.id, tracker_removal_result.text, parse_mode="html") async def main(): diff --git a/bot/msgprocessor.py b/bot/msgprocessor.py index 79b10f1..f7ad6d0 100644 --- a/bot/msgprocessor.py +++ b/bot/msgprocessor.py @@ -3,11 +3,12 @@ from dataclasses import dataclass from exception import UrlRemoverNotImplementedException from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import re +from telebot.types import User @dataclass(init=True, eq=True) class TrackerRemovalProcessorMessage: - fromUsername: str + fromUser: User text: str @@ -23,12 +24,17 @@ class TrackerRemovalMsgProcessor: def process(self) -> TrackerRemovalResult: if not self.__remove_trackers_from_msg_urls(): - return TrackerRemovalResult(needsToReply=False, text="") # дальнейшие трансформации смысла не имеют + return TrackerRemovalResult( + needsToReply=False, text="" + ) # дальнейшие трансформации смысла не имеют self.__emplace_sender_into_msg_text() - return TrackerRemovalResult(needsToReply=True, text=self.__msg.text) # сообщение было изменено, нужно ответ отослать + return TrackerRemovalResult( + needsToReply=True, text=self.__msg.text + ) # сообщение было изменено, нужно ответ отослать def __remove_trackers_from_msg_urls(self) -> bool: trackers_extracted = False + # todo: вообще мы работаем с http и это юзкейс обскьюрный # но ссылка может быть и без указания схемы, телега может распарсить # просто строку через точки и в конце какой то домен верхнего уровня как ссылку @@ -37,14 +43,14 @@ class TrackerRemovalMsgProcessor: return len([s for s in SCHEMES if url.startswith(s)]) != 0 SEPARATOR_CHARS = [" ", "\n"] - separator_regex = "("+"|".join(SEPARATOR_CHARS)+")" + separator_regex = "(" + "|".join(SEPARATOR_CHARS) + ")" lexems = re.split(separator_regex, self.__msg.text) for i, l in enumerate(lexems): if not is_url(l): continue removed_trackers_url = self.__remove_tracker(l) - if l == removed_trackers_url: # изменений урла не было + if l == removed_trackers_url: # изменений урла не было continue trackers_extracted = True @@ -68,7 +74,7 @@ class TrackerRemovalMsgProcessor: return url def __emplace_sender_into_msg_text(self): - self.__msg.text = f'Message from @{self.__msg.fromUsername}:\n\n{self.__msg.text}' + self.__msg.text = f'Message from {self.__msg.fromUser.first_name}:\n\n{self.__msg.text}' class TrackerRemoverFactory: @@ -106,7 +112,5 @@ class TrackerRemoverFactory: if QUERY_PARAMS_TRACKER in query_params: del query_params[QUERY_PARAMS_TRACKER] return urlunparse( - parsed_url._replace( - query=urlencode(query_params, doseq=True) - ) + parsed_url._replace(query=urlencode(query_params, doseq=True)) ) diff --git a/bot/msgprocessor_test.py b/bot/msgprocessor_test.py index 5cff4de..a01a9fb 100644 --- a/bot/msgprocessor_test.py +++ b/bot/msgprocessor_test.py @@ -5,6 +5,13 @@ from msgprocessor import ( TrackerRemovalProcessorMessage, TrackerRemovalResult, ) +from dataclasses import dataclass + + +@dataclass(init=True, eq=True) +class TestUser: + id: int + first_name: str class TestRemoverFactory(unittest.TestCase): @@ -12,13 +19,10 @@ class TestRemoverFactory(unittest.TestCase): def test_remove_strategy_constructor(self): test_case_data = [ - { - "domain": "youtube.com", - "remover": self.factory.remove_yt_trackers - }, + {"domain": "youtube.com", "remover": self.factory.remove_yt_trackers}, { "domain": "lowerlevel.youtube.com", - "remover": self.factory.remove_yt_trackers + "remover": self.factory.remove_yt_trackers, }, { "domain": "youtu.be", @@ -27,7 +31,7 @@ class TestRemoverFactory(unittest.TestCase): { "domain": "something.youtu.be", "remover": self.factory.remove_yt_trackers, - } + }, ] for test_case in test_case_data: self.assertIs( @@ -39,23 +43,26 @@ class TestRemoverFactory(unittest.TestCase): test_case_data = [ { "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy", - "expected_url": "https://youtu.be/jNQXAC9IVRw" + "expected_url": "https://youtu.be/jNQXAC9IVRw", }, { "url": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy&t=16", - "expected_url": "https://youtu.be/jNQXAC9IVRw?t=16" + "expected_url": "https://youtu.be/jNQXAC9IVRw?t=16", }, { "url": "https://www.youtube.com/watch?v=jNQXAC9IVRw", - "expected_url": "https://www.youtube.com/watch?v=jNQXAC9IVRw" + "expected_url": "https://www.youtube.com/watch?v=jNQXAC9IVRw", }, { "url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16", - "expected_url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&t=16" - } + "expected_url": "http://www.youtube.com/watch?v=jNQXAC9IVRw&t=16", + }, ] for test_case in test_case_data: - self.assertEqual(self.factory.remove_yt_trackers(test_case["url"]), test_case["expected_url"]) + self.assertEqual( + self.factory.remove_yt_trackers(test_case["url"]), + test_case["expected_url"], + ) class TestRemovalMsgProcessor(unittest.TestCase): @@ -63,44 +70,44 @@ class TestRemovalMsgProcessor(unittest.TestCase): test_case_data = [ { "msg_text": "https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy", - "sender_username": "Ghytro", + "sender": TestUser(id=123, first_name="Ghytro"), "bot_responded": True, - "bot_response": "Message from @Ghytro:\n\nhttps://youtu.be/jNQXAC9IVRw" + "bot_response": 'Message from Ghytro:\n\nhttps://youtu.be/jNQXAC9IVRw', }, { "msg_text": "чекай https://youtu.be/jNQXAC9IVRw?si=qLIZT1rvs99_jbgy\nнаш слон хд", - "sender_username": "OllyHearn", + "sender": TestUser(id=321, first_name="OllyHearn"), "bot_responded": True, - "bot_response": "Message from @OllyHearn:\n\nчекай https://youtu.be/jNQXAC9IVRw\nнаш слон хд" + "bot_response": 'Message from OllyHearn:\n\nчекай https://youtu.be/jNQXAC9IVRw\nнаш слон хд', }, { "msg_text": "а я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&si=qLIZT1rvs99_jbgy&t=16 дада", - "sender_username": "OllyHearn", + "sender": TestUser(id=321, first_name="OllyHearn"), "bot_responded": True, - "bot_response": "Message from @OllyHearn:\n\nа я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&t=16 дада" + "bot_response": 'Message from OllyHearn:\n\nа я такая нитакуся без si ссылки шлю сразу https://youtu.be/jNQXAC9IVRw и по нескольку штук\nhttp://www.youtube.com/watch?v=jNQXAC9IVRw&t=16 дада', }, { "msg_text": "asdasdasdasdasdasdasd asdasd asdasd asdad sasa dadsas", - "sender_username": "Ghytro", + "sender": TestUser(id=123, first_name="Ghytro"), "bot_responded": False, - "bot_response": "" - } + "bot_response": "", + }, ] for test_case in test_case_data: result = TrackerRemovalMsgProcessor( TrackerRemovalProcessorMessage( - fromUsername=test_case["sender_username"], - text=test_case["msg_text"] + fromUser=test_case["sender"], text=test_case["msg_text"] ) ).process() self.assertEqual( result, TrackerRemovalResult( needsToReply=test_case["bot_responded"], - text=test_case["bot_response"] - ) + text=test_case["bot_response"], + ), ) + if __name__ == "__main__": test_classes_to_run = [TestRemoverFactory, TestRemovalMsgProcessor] @@ -110,7 +117,7 @@ if __name__ == "__main__": for test_class in test_classes_to_run: suite = loader.loadTestsFromTestCase(test_class) suites_list.append(suite) - + big_suite = unittest.TestSuite(suites_list) runner = unittest.TextTestRunner()