Files
nosibakabot/bot/msgprocessor.py
2024-11-07 19:27:37 +00:00

117 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import html
import re
from dataclasses import dataclass
from typing import Callable
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

from telebot.types import User

from exception import UrlRemoverNotImplementedException
@dataclass
class TrackerRemovalProcessorMessage:
    """Mutable input for the tracker-removal processor.

    The processor rewrites ``text`` in place, which is why this dataclass is
    not frozen.
    """

    # Telegram user who sent the message.
    fromUser: User
    # Message text that is scanned for URLs.
    text: str
@dataclass(frozen=True)
class TrackerRemovalResult:
    """Immutable outcome of processing a single message."""

    # True when the message was changed and a reply should be sent.
    needsToReply: bool
    # Text of the reply; empty when no reply is needed.
    text: str
class TrackerRemovalMsgProcessor:
    """Strip known tracking parameters from the URLs inside a message.

    When at least one URL was changed, the message text is rewritten and
    prefixed with an HTML mention of the original sender, so the cleaned text
    can be posted as a reply.
    """

    # Only explicit http(s) links are treated as URLs.
    # TODO: a link may also come without a scheme -- Telegram can recognise a
    # plain dotted string ending in a top-level domain as a link. That use
    # case is obscure and is not handled here.
    _URL_SCHEMES = ("http://", "https://")

    # The capturing group makes re.split keep the separators, so joining the
    # pieces back together reproduces the original whitespace exactly.
    _SEPARATOR_RE = re.compile(r"(\s+)")

    def __init__(self, msg: TrackerRemovalProcessorMessage):
        """Store *msg*; its ``text`` is modified in place by :meth:`process`."""
        self.__msg = msg

    def process(self) -> TrackerRemovalResult:
        """Clean the message and report whether a reply has to be sent.

        Returns:
            A result with ``needsToReply=False`` and empty text when no URL
            was changed; otherwise ``needsToReply=True`` and the rewritten
            text, prefixed with a mention of the sender.
        """
        if not self.__remove_trackers_from_msg_urls():
            # Nothing was changed, so further transformations are pointless.
            return TrackerRemovalResult(needsToReply=False, text="")

        self.__emplace_sender_into_msg_text()
        # The message was modified, so a reply has to be sent.
        return TrackerRemovalResult(needsToReply=True, text=self.__msg.text)

    def __remove_trackers_from_msg_urls(self) -> bool:
        """Replace every tracked URL in the message text with its clean form.

        Returns:
            True if at least one URL was changed.
        """
        lexemes = self._SEPARATOR_RE.split(self.__msg.text)
        trackers_extracted = False

        for i, lexeme in enumerate(lexemes):
            if not lexeme.startswith(self._URL_SCHEMES):
                continue
            cleaned_url = self.__remove_tracker(lexeme)
            if cleaned_url == lexeme:  # the URL was not changed
                continue
            lexemes[i] = cleaned_url
            trackers_extracted = True

        if trackers_extracted:
            self.__msg.text = "".join(lexemes)
        return trackers_extracted

    @staticmethod
    def __remove_tracker(url: str) -> str:
        """Return *url* without trackers, or unchanged if it cannot be handled."""
        try:
            parsed_url = urlparse(url)
        except ValueError:
            # urlparse rejects malformed URLs (e.g. unbalanced IPv6 brackets).
            return url

        hostname = parsed_url.hostname
        if hostname is None:
            return url

        try:
            remover = TrackerRemoverFactory.make_remover(hostname)
        except UrlRemoverNotImplementedException:
            # No remover is registered for this host: leave the URL alone.
            return url
        return remover(url)

    def __emplace_sender_into_msg_text(self):
        """Prefix the message text with an HTML mention of the sender."""
        sender = self.__msg.fromUser
        # The name is user-controlled and lands inside HTML markup, so it must
        # be escaped; otherwise "<" or "&" in a name breaks the markup.
        display_name = html.escape(str(sender.first_name), quote=False)
        # NOTE(review): the message text itself is inserted unescaped. If the
        # reply is sent with HTML parse mode, confirm whether it should be
        # escaped as well.
        self.__msg.text = f'Message from <a href="tg://user?id={sender.id}">{display_name}</a>:\n\n{self.__msg.text}'
class TrackerRemoverFactory:
    """Registry of per-site functions that strip tracking data from URLs."""

    # A remover takes a URL and returns it without tracking parameters.
    TrackerRemover = Callable[[str], str]

    @staticmethod
    def make_remover(domain: str) -> TrackerRemover:
        """Return the tracker remover registered for *domain*.

        A registered domain matches itself and any of its subdomains:
        ``youtube.com`` matches ``www.youtube.com`` but not
        ``notyoutube.com``. Matching is case-insensitive.

        Args:
            domain: Host name of the URL to be cleaned.

        Raises:
            UrlRemoverNotImplementedException: No remover is registered for
                *domain*.
        """
        removers_by_domain = [
            (("youtube.com", "youtu.be"), TrackerRemoverFactory.remove_yt_trackers),
        ]

        host = domain.lower()
        for known_domains, remover in removers_by_domain:
            # Require a label boundary; a bare suffix test would also accept
            # unrelated hosts that merely end with the same characters.
            if any(host == d or host.endswith("." + d) for d in known_domains):
                return remover
        raise UrlRemoverNotImplementedException(domain)

    @staticmethod
    def remove_yt_trackers(url: str) -> str:
        """Return *url* without YouTube's ``si`` share-tracking parameter.

        The URL is returned untouched when it carries no ``si`` parameter, so
        callers can detect "nothing removed" by comparing with the input.
        """
        # TODO: think about how to generalise this, maybe a builder for the
        # tracker-lookup strategy -- but that is for much later.
        tracker_param = "si"

        parsed_url = urlparse(url)
        # keep_blank_values keeps parameters such as "t=" from being dropped.
        query_params = parse_qs(parsed_url.query, keep_blank_values=True)
        if query_params.pop(tracker_param, None) is None:
            # No tracker present: do not re-serialise the query, since that
            # could alter the URL (e.g. "%20" would become "+").
            return url

        return urlunparse(
            parsed_url._replace(query=urlencode(query_params, doseq=True))
        )