Files
nosibakabot/bot/msgprocessor.py

119 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Callable
from dataclasses import dataclass
from exception import UrlRemoverNotImplementedException
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from utils import find_all_string_entries
@dataclass
class TrackerRemovalProcessorMessage:
    """Incoming message handed to the tracker-removal processor.

    The instance is mutable: the processor rewrites ``text`` in place.
    """

    # Name of the user who sent the message.
    fromUsername: str
    # Raw message text, possibly containing links.
    text: str
@dataclass(frozen=True)
class TrackerRemovalResult:
    """Immutable outcome of processing one message."""

    # True when the message text was changed and a reply should be sent.
    needsToReply: bool
    # Text to send back; empty when no reply is needed.
    text: str
class TrackerRemovalMsgProcessor:
    """Strip known tracking parameters from the links inside a message.

    The processor works on the message object it was given and rewrites its
    ``text`` attribute in place.
    """

    def __init__(self, msg: TrackerRemovalProcessorMessage):
        """Remember the message that ``process`` will inspect and rewrite."""
        self.__msg = msg

    def process(self) -> TrackerRemovalResult:
        """Clean the message links and report whether a reply is needed.

        Returns:
            A result with ``needsToReply=False`` and empty text when no link
            was changed; otherwise ``needsToReply=True`` and the rewritten
            text prefixed with the sender's name.
        """
        if not self.__remove_trackers_from_msg_urls():
            # Nothing was changed, so further transformations are pointless.
            return TrackerRemovalResult(needsToReply=False, text="")
        self.__emplace_sender_into_msg_text()
        # The message was modified, so a reply has to be sent.
        return TrackerRemovalResult(needsToReply=True, text=self.__msg.text)

    def __remove_trackers_from_msg_urls(self) -> bool:
        """Replace every link in the message text with its tracker-free form.

        Returns:
            True if at least one link was changed, False otherwise.
        """
        # TODO: a link may also come without a scheme; Telegram can treat a
        # plain dot-separated string ending in a top-level domain as a link.
        # Such links are not detected here.
        SCHEMES = ("http://", "https://")
        text = self.__msg.text
        link_starts = sorted(
            start
            for scheme in SCHEMES
            for start in find_all_string_entries(text, scheme)
        )

        # Split the text into plain fragments and links, cleaning the links
        # as we go, then glue everything back together.
        pieces: list[str] = []
        trackers_extracted = False
        cursor = 0  # index of the first character not yet copied to pieces
        for link_start in link_starts:
            if link_start < cursor:
                # This scheme occurrence sits inside a link that was already
                # consumed (e.g. a URL passed as a query parameter).
                continue
            # A link runs until the next whitespace character or end of text.
            link_end = next(
                (i for i in range(link_start, len(text)) if text[i].isspace()),
                len(text),
            )
            pieces.append(text[cursor:link_start])
            url = text[link_start:link_end]
            cleaned_url = self.__remove_tracker(url)
            if cleaned_url != url:
                trackers_extracted = True
            pieces.append(cleaned_url)
            # Advance past the link so the text is never copied twice.
            cursor = link_end
        pieces.append(text[cursor:])
        self.__msg.text = "".join(pieces)
        return trackers_extracted

    @staticmethod
    def __remove_tracker(url: str) -> str:
        """Return ``url`` without trackers, or unchanged if it is unsupported.

        The URL is returned as is when it cannot be parsed, has no host name,
        or no remover is implemented for its host.
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # Malformed link in user text (e.g. unbalanced IPv6 brackets).
            return url
        if hostname is None:
            return url
        try:
            return TrackerRemoverFactory.make_remover(str(hostname))(url)
        except UrlRemoverNotImplementedException:
            return url

    def __emplace_sender_into_msg_text(self):
        """Prefix the message text with a line naming the original sender."""
        self.__msg.text = f'Message from {self.__msg.fromUsername}:\n\n{self.__msg.text}'
class TrackerRemoverFactory:
    """Pick the tracker-stripping function that fits a URL's host name."""

    # Signature shared by all removers: takes a URL, returns the cleaned URL.
    TrackerRemover = Callable[[str], str]

    @staticmethod
    def make_remover(domain: str) -> TrackerRemover:
        """Return the remover registered for ``domain``.

        A host matches a registered domain when it is equal to it or is one
        of its subdomains (``www.youtube.com`` matches ``youtube.com``).

        Args:
            domain: Host name taken from the URL.

        Raises:
            UrlRemoverNotImplementedException: No remover handles ``domain``.
        """
        removers_by_domain = (
            (
                ("youtube.com", "youtu.be"),
                TrackerRemoverFactory.remove_yt_trackers,
            ),
        )
        # Compare case-insensitively and ignore a trailing root dot.
        host = domain.lower().rstrip(".")
        for known_domains, remover in removers_by_domain:
            if any(
                host == known or host.endswith("." + known)
                for known in known_domains
            ):
                return remover
        raise UrlRemoverNotImplementedException(domain)

    @staticmethod
    def remove_yt_trackers(url: str) -> str:
        """Drop the ``si`` query parameter from a YouTube URL.

        The URL is returned untouched when it carries no ``si`` parameter, so
        that re-encoding the query never makes an unchanged link look edited.
        """
        # TODO: think about how to generalize this, maybe a builder for the
        # tracker lookup strategy, but that is for much later.
        QUERY_PARAMS_TRACKER = "si"
        parsed_url = urlparse(url)
        # keep_blank_values preserves parameters such as "t=" in the result.
        query_params = parse_qs(parsed_url.query, keep_blank_values=True)
        if QUERY_PARAMS_TRACKER not in query_params:
            return url
        del query_params[QUERY_PARAMS_TRACKER]
        return urlunparse(
            parsed_url._replace(
                query=urlencode(query_params, doseq=True)
            )
        )