84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
from telebot.async_telebot import AsyncTeleBot
|
|
from telebot.types import Message
|
|
import asyncio
|
|
from secret import TOKEN
|
|
import textbook
|
|
import re
|
|
from urllib.parse import urlparse, urlunparse
|
|
import random
|
|
|
|
from msgprocessor import TrackerRemovalMsgProcessor, TrackerRemovalProcessorMessage
|
|
|
|
HAS_LINK_RE = r"(https?:\/\/[^\s]+|www\.[^\s]+)"
|
|
|
|
bot = AsyncTeleBot(TOKEN)
|
|
|
|
|
|
def extract_links(text: str):
|
|
url_pattern = r"(https?://[^\s]+|www\.[^\s]+)"
|
|
links = re.findall(url_pattern, text)
|
|
return links
|
|
|
|
|
|
def process_text(text: str) -> str | None:
|
|
links = extract_links(text)
|
|
updated_links = []
|
|
for link in links:
|
|
if any(keyword in link for keyword in ("youtu.be", "youtube.com")):
|
|
parsed_link = urlparse(link)
|
|
if "si=" in parsed_link.query:
|
|
query_arr = parsed_link.query.split("&")
|
|
new_query_arr = [ele for ele in query_arr if "si=" not in ele]
|
|
new_query = "&".join(new_query_arr)
|
|
updated_link = parsed_link._replace(query=new_query)
|
|
updated_links.append(urlunparse(updated_link))
|
|
if updated_links:
|
|
return random.choice(textbook.reactions).format(link="\n".join(updated_links))
|
|
return None
|
|
|
|
|
|
@bot.message_handler(commands=["start"])
|
|
async def start(msg: Message):
|
|
if msg.chat.type == "private":
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.start_private)
|
|
elif msg.chat.type in ("group", "supergroup"):
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.start_group)
|
|
|
|
|
|
@bot.message_handler(func=lambda message: True)
|
|
async def got_message(msg: Message):
|
|
# god i love nones as fuck
|
|
if msg.text is None:
|
|
return
|
|
if msg.from_user is None:
|
|
return
|
|
|
|
tracker_removal_result = TrackerRemovalMsgProcessor(
|
|
TrackerRemovalProcessorMessage(fromUser=msg.from_user, text=msg.text)
|
|
).process()
|
|
|
|
if not tracker_removal_result.needsToReply:
|
|
return
|
|
|
|
try:
|
|
await bot.delete_message(msg.chat.id, msg.id, timeout=5)
|
|
except Exception as e:
|
|
await bot.reply_to(
|
|
message=msg.id,
|
|
text="Uoghhhh, i am not an admin here? I can't cleanup this tracking(",
|
|
)
|
|
print(e, flush=True) # todo: логгер
|
|
return
|
|
|
|
await bot.send_message(msg.chat.id, tracker_removal_result.text, parse_mode="html")
|
|
|
|
|
|
async def main():
|
|
a = asyncio.create_task(bot.polling(non_stop=True))
|
|
await a
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Bot started", flush=True)
|
|
asyncio.run(main())
|