from typing import Callable from dataclasses import dataclass from exception import UrlRemoverNotImplementedException from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import re from telebot.types import User @dataclass(init=True, eq=True) class TrackerRemovalProcessorMessage: fromUser: User text: str @dataclass(frozen=True, init=True, eq=True) class TrackerRemovalResult: needsToReply: bool text: str class TrackerRemovalMsgProcessor: def __init__(self, msg: TrackerRemovalProcessorMessage): self.__msg = msg def process(self) -> TrackerRemovalResult: if not self.__remove_trackers_from_msg_urls(): return TrackerRemovalResult( needsToReply=False, text="" ) # дальнейшие трансформации смысла не имеют self.__emplace_sender_into_msg_text() return TrackerRemovalResult( needsToReply=True, text=self.__msg.text ) # сообщение было изменено, нужно ответ отослать def __remove_trackers_from_msg_urls(self) -> bool: trackers_extracted = False # todo: вообще мы работаем с http и это юзкейс обскьюрный # но ссылка может быть и без указания схемы, телега может распарсить # просто строку через точки и в конце какой то домен верхнего уровня как ссылку def is_url(url: str) -> bool: SCHEMES = ["http://", "https://"] return len([s for s in SCHEMES if url.startswith(s)]) != 0 SEPARATOR_CHARS = [" ", "\n"] separator_regex = "(" + "|".join(SEPARATOR_CHARS) + ")" lexems = re.split(separator_regex, self.__msg.text) for i, l in enumerate(lexems): if not is_url(l): continue removed_trackers_url = self.__remove_tracker(l) if l == removed_trackers_url: # изменений урла не было continue trackers_extracted = True lexems[i] = removed_trackers_url self.__msg.text = "".join(lexems) return trackers_extracted @staticmethod def __remove_tracker(url: str) -> str: try: parsed_url = urlparse(url) except Exception: return url if parsed_url.hostname is None: return url hostname = str(parsed_url.hostname) try: return TrackerRemoverFactory.make_remover(hostname)(url) except UrlRemoverNotImplementedException: return url def __emplace_sender_into_msg_text(self): self.__msg.text = f'Message from {self.__msg.fromUser.first_name}:\n\n{self.__msg.text}' class TrackerRemoverFactory: TrackerRemover = Callable[[str], str] @staticmethod def make_remover(domain: str) -> TrackerRemover: @dataclass(frozen=True, init=True) class RemoverIdentifyer: domains: list[str] remover: TrackerRemoverFactory.TrackerRemover removers_by_domain = [ RemoverIdentifyer( domains=["youtube.com", "youtu.be"], remover=TrackerRemoverFactory.remove_yt_trackers, ) ] remover_one = [ r for r in removers_by_domain if len([d for d in r.domains if domain.endswith(d)]) != 0 ] if len(remover_one) == 0: raise UrlRemoverNotImplementedException(domain) return remover_one[0].remover @staticmethod def remove_yt_trackers(url: str) -> str: # todo: подумать как обобщить, мб билдер стратегии поиска трекера # но эт сильно на потом QUERY_PARAMS_TRACKER = "si" parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) if QUERY_PARAMS_TRACKER in query_params: del query_params[QUERY_PARAMS_TRACKER] return urlunparse( parsed_url._replace(query=urlencode(query_params, doseq=True)) )