# --- bot/bot.py (post-patch excerpt; code outside the diff hunks is not shown) ---
from secret import TOKEN
import textbook
import re
from urllib.parse import urlparse, urlunparse
from msgprocessor import (
    TrackerRemovalMsgProcessor,
    TrackerRemovalProcessorMessage
)
import random

HAS_LINK_RE = r'(https?:\/\/[^\s]+|www\.[^\s]+)'


@bot.message_handler(func=lambda message: True)
async def got_message(msg: Message):
    """Re-post a message with tracking parameters stripped from its links.

    Messages without text, without a sender, or whose sender has no username
    are ignored. Otherwise the text is run through TrackerRemovalMsgProcessor;
    when the processor reports ``needsToReply``, the original message is
    deleted and the processed text is sent to the same chat. If the deletion
    raises, the exception is printed and nothing is sent.
    """
    sender = msg.from_user
    # Any of these may be None; there is nothing to process in that case.
    if msg.text is None or sender is None or sender.username is None:
        return

    incoming = TrackerRemovalProcessorMessage(
        fromUsername=sender.username,
        text=msg.text
    )
    outcome = TrackerRemovalMsgProcessor(incoming).process()
    if not outcome.needsToReply:
        return

    try:
        # NOTE(review): the third positional argument is presumably the
        # request timeout in seconds -- confirm against delete_message().
        await bot.delete_message(msg.chat.id, msg.id, 5)
    except Exception as e:
        print(e)  # TODO: replace with a logger
    else:
        # Only re-post when the original was actually removed.
        await bot.send_message(msg.chat.id, outcome.text)


# (bot/bot.py continues with `async def main():`, which lies outside the diff hunk.)


# --- bot/exception.py (new file) ---
class UrlRemoverNotImplementedException(Exception):
    """Raised when no tracker remover is registered for a domain.

    Attributes:
        domain: the domain that was looked up.
    """

    def __init__(self, domain: str):
        self.domain = domain
        self.__base_message = "Url remover for domain not implemented"
        super().__init__(self.__base_message)

    def __str__(self):
        # The base message is extended with the offending domain.
        return f"{self.__base_message}: {self.domain}"


# --- bot/msgprocessor.py (new file; the module body continues past this span) ---
from typing import Callable
from dataclasses import dataclass
from exception import UrlRemoverNotImplementedException
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
# --- bot/msgprocessor.py (continued) ---
# This span begins in the middle of the module's import header; the statement
# that was cut by the line break is repeated here in full.
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
# In the real module find_all_string_entries comes from bot/utils.py
# ("from utils import find_all_string_entries"); that file's content is
# rendered at the end of this block.


@dataclass(init=True)
class TrackerRemovalProcessorMessage:
    """Input for TrackerRemovalMsgProcessor."""
    fromUsername: str  # username placed into the "Message from ..." header
    text: str  # message text; rewritten in place by the processor


@dataclass(frozen=True, init=True)
class TrackerRemovalResult:
    """Outcome of TrackerRemovalMsgProcessor.process()."""
    needsToReply: bool  # True when at least one link was changed
    text: str  # text to send; empty when needsToReply is False


class TrackerRemovalMsgProcessor:
    """Strips tracking parameters from the links found in a message."""

    # Scheme prefixes that mark the start of a link.
    # TODO: a link may also come without a scheme (a bare dotted host name);
    # that case is not handled.
    _URL_SCHEMES = ("http://", "https://")

    def __init__(self, msg: TrackerRemovalProcessorMessage):
        self.__msg = msg

    def process(self) -> TrackerRemovalResult:
        """Clean the message's links and build the text to send.

        Returns:
            ``TrackerRemovalResult(needsToReply=False, text="")`` when no link
            was changed; otherwise ``needsToReply=True`` with the cleaned text
            prefixed by a "Message from <username>" header.
        """
        if not self.__remove_trackers_from_msg_urls():
            # Nothing was changed, so further transformations are pointless.
            return TrackerRemovalResult(needsToReply=False, text="")
        self.__emplace_sender_into_msg_text()
        # The message was modified, so it has to be sent.
        return TrackerRemovalResult(needsToReply=True, text=self.__msg.text)

    def __remove_trackers_from_msg_urls(self) -> bool:
        """Rewrite every link in the message text without its trackers.

        The text is split into plain-text pieces and links; each link runs
        from a scheme prefix up to the next whitespace character (or the end
        of the text). The pieces are joined back and stored in the message.

        Returns:
            True when at least one link was changed.
        """
        text = self.__msg.text
        link_starts = sorted(
            start
            for scheme in self._URL_SCHEMES
            for start in find_all_string_entries(text, scheme)
        )

        pieces: list[str] = []
        cursor = 0  # index of the first character not yet copied to pieces
        trackers_extracted = False
        for start in link_starts:
            if start < cursor:
                # Scheme prefix inside a link that was already consumed
                # (e.g. a URL passed as a query parameter).
                continue
            end = start
            while end < len(text) and not text[end].isspace():
                end += 1

            pieces.append(text[cursor:start])
            url = text[start:end]
            cleaned_url = self.__remove_tracker(url)
            if cleaned_url != url:
                trackers_extracted = True
            pieces.append(cleaned_url)
            cursor = end

        pieces.append(text[cursor:])
        self.__msg.text = "".join(pieces)
        return trackers_extracted

    @staticmethod
    def __remove_tracker(url: str) -> str:
        """Return url cleaned by the remover for its host, or url unchanged.

        The URL is returned as-is when it cannot be parsed, has no hostname,
        or no remover is registered for its hostname.
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # urlparse rejects some malformed input, e.g. an unclosed "[".
            return url
        if hostname is None:
            return url
        remover = TrackerRemoverFactory.find_remover(hostname)
        if remover is None:
            return url
        return remover(url)

    def __emplace_sender_into_msg_text(self):
        """Prefix the message text with a header naming the sender."""
        self.__msg.text = f'Message from {self.__msg.fromUsername}:\n\n{self.__msg.text}'


class TrackerRemoverFactory:
    """Selects the tracker-removal function for a domain."""

    TrackerRemover = Callable[[str], str]

    @staticmethod
    def find_remover(domain: str) -> "TrackerRemoverFactory.TrackerRemover | None":
        """Return the remover registered for domain, or None.

        A registered domain matches itself and any of its subdomains
        ("youtube.com" matches "www.youtube.com" but not "notyoutube.com").
        """
        removers_by_domain = (
            (("youtube.com", "youtu.be"), TrackerRemoverFactory.remove_yt_trackers),
        )
        host = domain.lower().rstrip(".")
        for registered_domains, remover in removers_by_domain:
            if any(
                host == registered or host.endswith("." + registered)
                for registered in registered_domains
            ):
                return remover
        return None

    @staticmethod
    def make_remover(domain: str) -> TrackerRemover:
        """Return the remover registered for domain.

        Raises:
            UrlRemoverNotImplementedException: no remover handles domain.
        """
        remover = TrackerRemoverFactory.find_remover(domain)
        if remover is None:
            raise UrlRemoverNotImplementedException(domain)
        return remover

    @staticmethod
    def remove_yt_trackers(url: str) -> str:
        """Return url without its "si" query parameter.

        A URL that has no "si" parameter is returned unchanged, so that
        re-encoding the query never makes an untouched link look modified.
        """
        # TODO: think about how to generalise this (maybe a builder for the
        # tracker lookup strategy) -- much later.
        QUERY_PARAMS_TRACKER = "si"
        parsed_url = urlparse(url)
        # keep_blank_values: parameters such as "t=" must survive the rewrite.
        query_params = parse_qs(parsed_url.query, keep_blank_values=True)
        if QUERY_PARAMS_TRACKER not in query_params:
            return url
        del query_params[QUERY_PARAMS_TRACKER]
        return urlunparse(
            parsed_url._replace(
                query=urlencode(query_params, doseq=True)
            )
        )


# --- bot/utils.py (new file) ---
def find_all_string_entries(s: str, sub: str) -> list[int]:
    """Return the start index of every occurrence of sub in s.

    Overlapping occurrences are all reported. An empty sub matches at every
    position from 0 to len(s) inclusive.
    """
    result = []
    index = s.find(sub)
    while index != -1:
        result.append(index)
        # Step by one character so overlapping matches are found too.
        index = s.find(sub, index + 1)
    return result