Files
nosibakabot/bot/bot.py
2024-11-07 22:34:07 +03:00

84 lines
2.5 KiB
Python

from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message
import asyncio
from secret import TOKEN
import textbook
import re
from urllib.parse import urlparse, urlunparse
import random
from msgprocessor import TrackerRemovalMsgProcessor, TrackerRemovalProcessorMessage
HAS_LINK_RE = r"(https?:\/\/[^\s]+|www\.[^\s]+)"
bot = AsyncTeleBot(TOKEN)
def extract_links(text: str):
url_pattern = r"(https?://[^\s]+|www\.[^\s]+)"
links = re.findall(url_pattern, text)
return links
def process_text(text: str) -> str | None:
links = extract_links(text)
updated_links = []
for link in links:
if any(keyword in link for keyword in ("youtu.be", "youtube.com")):
parsed_link = urlparse(link)
if "si=" in parsed_link.query:
query_arr = parsed_link.query.split("&")
new_query_arr = [ele for ele in query_arr if "si=" not in ele]
new_query = "&".join(new_query_arr)
updated_link = parsed_link._replace(query=new_query)
updated_links.append(urlunparse(updated_link))
if updated_links:
return random.choice(textbook.reactions).format(link="\n".join(updated_links))
return None
@bot.message_handler(commands=["start"])
async def start(msg: Message):
if msg.chat.type == "private":
await bot.send_message(chat_id=msg.chat.id, text=textbook.start_private)
elif msg.chat.type in ("group", "supergroup"):
await bot.send_message(chat_id=msg.chat.id, text=textbook.start_group)
@bot.message_handler(func=lambda message: True)
async def got_message(msg: Message):
# god i love nones as fuck
if msg.text is None:
return
if msg.from_user is None:
return
tracker_removal_result = TrackerRemovalMsgProcessor(
TrackerRemovalProcessorMessage(fromUser=msg.from_user, text=msg.text)
).process()
if not tracker_removal_result.needsToReply:
return
try:
await bot.delete_message(msg.chat.id, msg.id, timeout=5)
except Exception as e:
await bot.reply_to(
msg,
"Uoghhhh, i am not an admin here? I can't cleanup this tracking(",
)
print(e, flush=True) # todo: логгер
return
await bot.send_message(msg.chat.id, tracker_removal_result.text, parse_mode="html")
async def main():
a = asyncio.create_task(bot.polling(non_stop=True))
await a
if __name__ == "__main__":
print("Bot started", flush=True)
asyncio.run(main())