# Telebot imports
from telebot.async_telebot import AsyncTeleBot
from telebot.asyncio_storage import StateMemoryStorage, StatePickleStorage
from telebot.asyncio_handler_backends import State, StatesGroup
from telebot.asyncio_filters import StateFilter
from telebot import types
from telebot.callback_data import CallbackData, CallbackDataFilter
from telebot.types import Message

# Async things imports
import asyncio

# Other modules imports
import sqlite3
import json
import logging
from datetime import datetime
import math
import socket
import os
from typing import Union

# Local imports
from config import token
from constants import (
    MAX_QUEUES_OWN,
    MAX_QUEUES_PARTS_IN,
    MAX_NAME_LENGTH,
    MAX_QUEUE_NAME_LENGTH,
)
import textbook
import keyboards

# DB
from db.base import Session, engine, Base
from db.models import User, Queue, QueueUser

logger = logging.getLogger(__name__)

bot = AsyncTeleBot(token, state_storage=StatePickleStorage())


class States(StatesGroup):
    """Conversation states used to route plain-text messages."""

    default = State()
    changing_name = State()
    changing_queue_name = State()


# Utils
def get_queue_stats_text(queue: Queue) -> str:
    """Render the queue summary template with the queue name and member count."""
    return textbook.queue_stats.format(name=queue.name, count=len(queue.users))


def _format_queue_users(queue: Queue) -> str:
    """Return one "<position>. <name>" line per queue member, ordered by position."""
    ordered = sorted(queue.users, key=lambda qu: qu.position)
    return "\n".join(f"{qu.position}. {qu.user.name}" for qu in ordered)


async def _read_state_value(call: types.CallbackQuery, key: str):
    """Return ``key`` from the caller's state data, or None when it is absent."""
    async with bot.retrieve_data(
        user_id=call.from_user.id, chat_id=call.message.chat.id
    ) as state_data:
        # State data may be missing entirely if no state was ever set.
        return state_data.get(key) if state_data else None


async def _send_best_effort(**kwargs) -> bool:
    """Send a message, logging instead of raising when delivery fails.

    Returns True when the message was sent. ``Exception`` (not a bare
    ``except``) is caught so task cancellation still propagates.
    """
    try:
        await bot.send_message(**kwargs)
    except Exception:
        logger.warning(
            "Could not deliver message to chat %s",
            kwargs.get("chat_id"),
            exc_info=True,
        )
        return False
    return True


async def get_queue_from_state_data(call: types.CallbackQuery) -> Union[Queue, None]:
    """Return the queue selected in the caller's state data if the caller owns it.

    On any failure (no stored id, queue no longer exists, caller is not the
    owner) the callback is answered with ``textbook.queue_operational_error``
    and None is returned, so callers must not answer the callback again.
    """
    queue_id = await _read_state_value(call, "queue_id")
    queue = None
    if queue_id:
        queue = session.query(Queue).filter_by(id=queue_id).first()
    if queue is None or queue.owner_id != call.from_user.id:
        await bot.answer_callback_query(
            callback_query_id=call.id, text=textbook.queue_operational_error
        )
        return None
    return queue


async def get_parting_queue_from_state_data(
    call: types.CallbackQuery,
) -> Union[Queue, None]:
    """Return the queue the caller is currently viewing as a participant.

    On failure (no stored id or queue no longer exists) the callback is
    answered with ``textbook.queue_operational_error`` and None is returned,
    so callers must not answer the callback again.
    """
    queue_id = await _read_state_value(call, "part_queue_id")
    queue = None
    if queue_id:
        queue = session.query(Queue).filter_by(id=queue_id).first()
    if queue is None:
        await bot.answer_callback_query(
            callback_query_id=call.id, text=textbook.queue_operational_error
        )
    return queue


def get_first_queue_user(queue: Queue) -> Union[QueueUser, None]:
    """Return the member with the lowest position, or None for an empty queue."""
    return min(queue.users, key=lambda qu: qu.position, default=None)


def kick_first(queue: Queue) -> bool:
    """Remove the first member and renumber the rest.

    Returns True when someone was removed, False when the queue was empty.
    """
    first_user = get_first_queue_user(queue)
    if first_user is None:
        return False
    session.delete(first_user)
    session.commit()
    normalize_queue(queue)
    return True


def proceed_queue_user(queue: Queue, user: User) -> Union[QueueUser, None]:
    """Advance the queue if ``user`` is currently first.

    Returns the member who is first after the advance, or None when ``user``
    was not first or nobody is left.
    """
    first_queue_user = get_first_queue_user(queue)
    if first_queue_user is None or user.id != first_queue_user.user_id:
        return None
    kick_first(queue)
    return get_first_queue_user(queue)


def normalize_queue(queue: Queue) -> Queue:
    """Renumber positions to 0..n-1 keeping relative order, then commit."""
    ordered = sorted(queue.users, key=lambda qu: qu.position)
    for index, queue_user in enumerate(ordered):
        queue_user.position = index
    session.commit()
    return queue


async def update_queue_users_message(msg: Message, queue: Queue):
    """Normalize the queue and redraw ``msg`` as the owner's member list."""
    queue = normalize_queue(queue)
    await bot.edit_message_text(
        chat_id=msg.chat.id,
        message_id=msg.id,
        text=textbook.queue_users_list.format(
            name=queue.name, users_str=_format_queue_users(queue)
        ),
        reply_markup=keyboards.queue_users(queue.id),
        parse_mode="html",
    )


# Basic
@bot.message_handler(commands=["start"])
async def start(msg: Message):
    """Handle /start: register the user, then show the menu or join a queue.

    ``/start <queue_id>`` (deep link) tries to append the user to that queue.
    """
    if msg.chat.type != "private":
        await bot.send_message(chat_id=msg.chat.id, text=textbook.groups_plug)
        return

    user = session.query(User).filter_by(id=msg.from_user.id).first()
    if not user:
        user = User(
            id=msg.from_user.id,
            name=msg.from_user.first_name,
            username=msg.from_user.username,
        )
        session.add(user)
        session.commit()
        # NOTE(review): the original indentation was lost; the welcome text is
        # assumed to be sent on first contact only - confirm.
        await bot.send_message(chat_id=msg.chat.id, text=textbook.start)
        await asyncio.sleep(2)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)

    # Take only the first argument; extra tokens used to raise ValueError.
    args = msg.text.split()
    queue = None
    if len(args) > 1:
        queue = session.query(Queue).filter_by(id=args[1]).first()

    if queue is None:
        # No deep link, or it points to an unknown queue: show the menu.
        await bot.send_message(
            chat_id=msg.chat.id,
            text=textbook.menu.format(name=user.name),
            reply_markup=keyboards.menu(),
        )
        return

    already_member = (
        session.query(QueueUser)
        .filter_by(queue_id=queue.id, user_id=msg.from_user.id)
        .first()
        is not None
    )
    # "<" mirrors the MAX_QUEUES_OWN check; the previous "not ... >" test let
    # a user end up in MAX_QUEUES_PARTS_IN + 1 queues.
    has_room = len(user.takes_part_in_queues) < MAX_QUEUES_PARTS_IN
    if already_member or not has_room:
        await bot.send_message(
            chat_id=msg.chat.id,
            text=textbook.error_joining_queue.format(name=queue.name),
            parse_mode="html",
        )
        return

    last_user = (
        session.query(QueueUser)
        .filter_by(queue_id=queue.id)
        .order_by(QueueUser.position.desc())
        .first()
    )
    position = last_user.position + 1 if last_user else 0
    queue_user = QueueUser(
        user_id=msg.from_user.id, queue_id=queue.id, position=position
    )
    session.add(queue_user)
    session.commit()
    await bot.send_message(
        chat_id=msg.chat.id,
        text=textbook.joined_queue.format(
            name=queue.name, position=queue_user.position
        ),
        parse_mode="html",
    )


@bot.callback_query_handler(func=lambda c: c.data == "to_menu")
async def to_menu_handler(call: types.CallbackQuery):
    """Reset the user's state and redraw the message as the main menu."""
    user = session.query(User).filter_by(id=call.from_user.id).first()
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.menu.format(name=user.name),
        reply_markup=keyboards.menu(),
    )
    # Answer the callback so the client stops showing the progress indicator.
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data.startswith("p:"))
async def proceed_user_handler(call: types.CallbackQuery):
    """Handle "p:<queue_id>": the caller finished their turn.

    Notifies the next member, or the owner when delivery fails or nobody
    is left.
    """
    queue = session.query(Queue).filter_by(id=call.data[2:]).first()
    user = session.query(User).filter_by(id=call.from_user.id).first()
    if queue is None or user is None:
        # The queue was deleted (or the user is unknown) after the button was sent.
        await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
        return

    # NOTE(review): proceed_queue_user also returns None when the caller is
    # not first, so the owner gets "queue finished" in that case too - confirm.
    next_queue_user = proceed_queue_user(queue, user)
    if next_queue_user:
        delivered = await _send_best_effort(
            chat_id=next_queue_user.user_id,
            text=textbook.your_turn.format(name=queue.name),
            reply_markup=keyboards.your_turn(queue.id),
            parse_mode="html",
        )
        if not delivered:
            await _send_best_effort(
                chat_id=queue.owner_id,
                text=textbook.error_turn.format(name=queue.name),
                reply_markup=keyboards.your_turn(queue.id),
                parse_mode="html",
            )
    else:
        await _send_best_effort(
            chat_id=queue.owner_id,
            text=textbook.queue_finished.format(name=queue.name),
            parse_mode="html",
        )

    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.finished_turn.format(name=queue.name),
        reply_markup=None,
    )
    await bot.answer_callback_query(callback_query_id=call.id)


# Main menu
@bot.callback_query_handler(func=lambda c: c.data == "new_queue")
async def new_queue_handler(call: types.CallbackQuery):
    """Create a queue owned by the caller unless the ownership limit is reached."""
    user = session.query(User).filter_by(id=call.from_user.id).first()
    if not user:
        await bot.answer_callback_query(callback_query_id=call.id)
        return

    if len(user.owns_queues) >= MAX_QUEUES_OWN:
        # Return after the alert so the callback is answered exactly once.
        await bot.answer_callback_query(
            callback_query_id=call.id, text=textbook.queue_limit, show_alert=True
        )
        return

    queue = Queue(owner_id=call.from_user.id)
    session.add(queue)
    session.commit()
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.new_queue_created.format(id=queue.id),
        reply_markup=keyboards.to_menu_keyboard(),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "my_queues")
async def my_queues_handler(call: types.CallbackQuery):
    """Show the list of queues owned by the caller."""
    user = session.query(User).filter_by(id=call.from_user.id).first()
    queues = user.owns_queues if user else []
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.my_queues_list.format(count=len(queues)),
        reply_markup=keyboards.my_queues(queues),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "parts_queues")
async def parts_queues_handler(call: types.CallbackQuery):
    """Show the list of queues the caller takes part in."""
    user = session.query(User).filter_by(id=call.from_user.id).first()
    memberships = user.takes_part_in_queues if user else []
    queues = [qu.queue for qu in memberships]
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.parts_queues_menu.format(count=len(queues)),
        reply_markup=keyboards.parts_queues(queues),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "settings")
async def settings(call: types.CallbackQuery):
    """Reset the user's state and show the user settings menu."""
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.settings,
        reply_markup=keyboards.settings(),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "about")
async def about_handler(call: types.CallbackQuery):
    """Show the "about" text as an alert."""
    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.about,
        show_alert=True,
    )


# Queue parts list menu
@bot.callback_query_handler(func=lambda c: c.data.startswith("t:"))
async def queue_parts_handler(
    call: types.CallbackQuery, queue_id: Union[str, None] = None
):
    """Handle "t:<queue_id>": open a queue from the participant's point of view.

    The id is stored in state data under "part_queue_id" for follow-up
    actions. ``queue_id`` overrides the id taken from ``call.data``.
    """
    queue_id = queue_id or call.data[2:]
    queue = session.query(Queue).filter_by(id=queue_id).first()
    if not queue:
        await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
        return None

    async with bot.retrieve_data(
        user_id=call.from_user.id, chat_id=call.message.chat.id
    ) as state_data:
        state_data["part_queue_id"] = queue_id

    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.part_queue.format(
            name=queue.name, users_str=_format_queue_users(queue)
        ),
        reply_markup=keyboards.queue_part_in_menu(),
        parse_mode="html",
    )
    await bot.answer_callback_query(callback_query_id=call.id)


# Queue partin menu
@bot.callback_query_handler(func=lambda c: c.data == "leave_queue")
async def leave_queue_handler(call: types.CallbackQuery):
    """Remove the caller from the queue they are viewing and show their list."""
    queue = await get_parting_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return None

    membership = (
        session.query(QueueUser)
        .filter_by(queue_id=queue.id, user_id=call.from_user.id)
        .first()
    )
    # The caller may already be gone (kicked, or the button was pressed twice).
    if membership is not None:
        session.delete(membership)
        session.commit()
        normalize_queue(queue)

    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.leaved_queue.format(name=queue.name),
    )
    await parts_queues_handler(call)
    return None


@bot.callback_query_handler(func=lambda c: c.data == "refresh_list")
async def refresh_list_handler(call: types.CallbackQuery):
    """Redraw the participant view of the queue the caller is viewing."""
    queue = await get_parting_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return

    try:
        await bot.edit_message_text(
            chat_id=call.message.chat.id,
            message_id=call.message.id,
            text=textbook.part_queue.format(
                name=queue.name, users_str=_format_queue_users(queue)
            ),
            reply_markup=keyboards.queue_part_in_menu(),
            parse_mode="html",
        )
    except Exception:
        # Best effort: a failed edit is tolerated, then the handler pauses.
        logger.debug("Refresh of participant list failed", exc_info=True)
        await asyncio.sleep(2)
    await bot.answer_callback_query(callback_query_id=call.id)


# Queue own list menu
@bot.callback_query_handler(func=lambda c: c.data.startswith("q:"))
async def queue_handler(call: types.CallbackQuery, queue_id: Union[str, None] = None):
    """Handle "q:<queue_id>": open the owner's menu of a queue.

    The id is stored in state data under "queue_id" for follow-up actions.
    ``queue_id`` overrides the id taken from ``call.data``.
    """
    queue_id = queue_id or call.data[2:]
    queue = session.query(Queue).filter_by(id=queue_id).first()
    if not queue:
        await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
        return None

    async with bot.retrieve_data(
        user_id=call.from_user.id, chat_id=call.message.chat.id
    ) as state_data:
        state_data["queue_id"] = queue_id

    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=get_queue_stats_text(queue),
        reply_markup=keyboards.queue_menu(),
        parse_mode="html",
    )
    await bot.answer_callback_query(callback_query_id=call.id)


# Queue menu
@bot.callback_query_handler(func=lambda c: c.data == "get_queue_users")
async def get_queue_users_handler(call: types.CallbackQuery):
    """Show the owner the member list of the selected queue."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return

    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.queue_users_list.format(
            name=queue.name, users_str=_format_queue_users(queue)
        ),
        reply_markup=keyboards.queue_users(queue.id),
        parse_mode="html",
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "get_queue_link")
async def get_queue_link_handler(call: types.CallbackQuery):
    """Send the owner the invitation link of the selected queue."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return

    await bot.send_message(
        chat_id=call.message.chat.id,
        text=textbook.link_template.format(link=queue.id),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "queue_settings")
async def queue_settings_handler(call: types.CallbackQuery):
    """Show the settings menu of the selected queue."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return

    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.queue_settings.format(name=queue.name),
        reply_markup=keyboards.queue_settings(queue_id=queue.id),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "start_queue")
async def start_queue_handler(call: types.CallbackQuery):
    """Start the selected queue by notifying its first member.

    If the member cannot be reached the owner is told instead. An empty
    queue yields an alert.
    """
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper has already answered the callback with an error.
        return

    first_queue_user = get_first_queue_user(queue)
    if first_queue_user is None:
        await bot.answer_callback_query(
            callback_query_id=call.id,
            text=textbook.kick_first_error,
            show_alert=True,
        )
        return

    delivered = await _send_best_effort(
        chat_id=first_queue_user.user_id,
        text=textbook.your_turn.format(name=queue.name),
        reply_markup=keyboards.your_turn(queue.id),
        parse_mode="html",
    )
    if not delivered:
        await _send_best_effort(
            chat_id=queue.owner_id,
            text=textbook.error_turn.format(name=first_queue_user.user.name),
            parse_mode="html",
        )
    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.queue_started.format(name=queue.name),
        show_alert=True,
    )


# Queue users
@bot.callback_query_handler(func=lambda c: c.data == "kick_first")
async def kick_first_handler(call: types.CallbackQuery):
    """Remove the first member of the selected queue and redraw the list."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper answers the callback itself when it rejects the request.
        return

    if queue.owner_id != call.from_user.id:
        await bot.answer_callback_query(
            callback_query_id=call.id,
            text=textbook.queue_operational_error,
            show_alert=True,
        )
        return

    if not kick_first(queue):
        await bot.answer_callback_query(
            callback_query_id=call.id,
            text=textbook.kick_first_error,
            show_alert=True,
        )
        return

    await bot.answer_callback_query(
        callback_query_id=call.id, text=textbook.first_kicked
    )
    await update_queue_users_message(call.message, queue)


@bot.callback_query_handler(func=lambda c: c.data == "swap_users")
async def swap_users_handler(call: types.CallbackQuery):
    """Placeholder: tell the user that swapping is not implemented yet."""
    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.in_development_plug,
        show_alert=True,
    )


@bot.callback_query_handler(func=lambda c: c.data == "refresh_users")
async def refresh_users_handler(call: types.CallbackQuery):
    """Redraw the owner's member list of the selected queue."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper answers the callback itself when it rejects the request.
        return

    try:
        ordered = sorted(queue.users, key=lambda qu: qu.position)
        users_str = "\n".join(f"{qu.position}. {qu.user.name}" for qu in ordered)
        await bot.edit_message_text(
            chat_id=call.message.chat.id,
            message_id=call.message.id,
            text=textbook.queue_users_list.format(
                name=queue.name, users_str=users_str
            ),
            reply_markup=keyboards.queue_users(queue.id),
            parse_mode="html",
        )
    except Exception:
        # Best effort: a failed redraw is tolerated, then the handler pauses.
        # ``Exception`` (not bare ``except``) lets task cancellation through.
        await asyncio.sleep(2)
    await bot.answer_callback_query(callback_query_id=call.id)


# Queue settings
@bot.callback_query_handler(func=lambda c: c.data == "edit_queue_name")
async def edit_queue_name_handler(call: types.CallbackQuery):
    """Switch the owner into the "changing queue name" state and prompt for it."""
    if not await get_queue_from_state_data(call):
        # The helper answers the callback itself when it rejects the request.
        return

    await bot.set_state(user_id=call.from_user.id, state=States.changing_queue_name)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.edit_queue_name,
        reply_markup=keyboards.edit_name(),
    )
    # Answer the callback so the client stops showing the progress indicator.
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(
    func=lambda c: c.data == "cancel", state=States.changing_queue_name
)
async def edit_queue_name_cancel_handler(call: types.CallbackQuery):
    """Abort renaming the queue and return to the queue settings menu."""
    # Leave the renaming state first; otherwise the user's next text message
    # would still be treated as the new queue name.
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await queue_settings_handler(call)


@bot.message_handler(content_types=["text"], state=States.changing_queue_name)
async def update_queue_name(msg: Message):
    """Rename the selected queue to the received text and show its menu."""
    if len(msg.text) > MAX_QUEUE_NAME_LENGTH or "\n" in msg.text:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    async with bot.retrieve_data(
        user_id=msg.from_user.id, chat_id=msg.chat.id
    ) as state_data:
        queue_id = state_data.get("queue_id") if state_data else None

    queue = None
    if queue_id:
        queue = session.query(Queue).filter_by(id=queue_id).first()
    # Only the owner may rename; the stored id alone does not prove ownership.
    if queue is None or queue.owner_id != msg.from_user.id:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    queue.name = msg.text
    session.commit()
    await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_queue_name_success)
    await asyncio.sleep(1)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)
    await bot.send_message(
        chat_id=msg.chat.id,
        text=get_queue_stats_text(queue),
        reply_markup=keyboards.queue_menu(),
        parse_mode="html",
    )
    return None


@bot.callback_query_handler(func=lambda c: c.data == "edit_queue_description")
async def edit_queue_description_handler(call: types.CallbackQuery):
    """Placeholder: tell the user that descriptions are not implemented yet."""
    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.in_development_plug,
        show_alert=True,
    )


@bot.callback_query_handler(func=lambda c: c.data == "delete_queue_approve")
async def delete_queue_approve_handler(call: types.CallbackQuery):
    """Ask the owner to confirm deleting the queue."""
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.delete_queue_approve,
        reply_markup=keyboards.approve_queue_delete(),
    )
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(func=lambda c: c.data == "delete_queue")
async def delete_queue_handler(call: types.CallbackQuery):
    """Delete the selected queue with all its memberships and show the owner's list."""
    queue = await get_queue_from_state_data(call)
    if queue is None:
        # The helper answers the callback itself when it rejects the request.
        return

    # Iterate over a copy so the deletes cannot disturb the iteration.
    for queue_user in list(queue.users):
        session.delete(queue_user)
    # TODO: Use SQLAlchemy to cascade-delete all users
    session.delete(queue)
    session.commit()
    await bot.answer_callback_query(
        callback_query_id=call.id, text=textbook.queue_deleted
    )
    await my_queues_handler(call)


# User settings
@bot.callback_query_handler(func=lambda c: c.data == "edit_name")
async def edit_name_handler(call: types.CallbackQuery):
    """Switch the user into the "changing name" state and prompt for the name."""
    await bot.set_state(user_id=call.from_user.id, state=States.changing_name)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.edit_name,
        reply_markup=keyboards.edit_name(),
    )
    # Answer the callback so the client stops showing the progress indicator.
    await bot.answer_callback_query(callback_query_id=call.id)


@bot.callback_query_handler(
    func=lambda c: c.data == "cancel", state=States.changing_name
)
async def edit_name_cancel_handler(call: types.CallbackQuery):
    """Abort renaming the user; ``settings`` resets the state and shows the menu."""
    await settings(call)


# Previously also named ``update_queue_name``, which shadowed the queue
# renaming handler defined above.
@bot.message_handler(content_types=["text"], state=States.changing_name)
async def update_user_name(msg: Message):
    """Rename the user to the received text and show the settings menu."""
    if len(msg.text) > MAX_NAME_LENGTH or "\n" in msg.text:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    user = session.query(User).filter_by(id=msg.from_user.id).first()
    if user is None:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    user.name = msg.text
    session.commit()
    await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_success)
    await asyncio.sleep(1)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)
    await bot.send_message(
        chat_id=msg.chat.id,
        text=textbook.settings,
        reply_markup=keyboards.settings(),
    )
    return None


# Other
@bot.message_handler(commands=["mystate"])
async def mystate(msg: Message):
    """Debug command: send the caller their current state."""
    state = await bot.get_state(user_id=msg.from_user.id)
    # The state can be None; message text must be a string.
    await bot.send_message(chat_id=msg.from_user.id, text=str(state))


@bot.message_handler(commands=["chatid"])
async def chatid(msg: Message):
    """Debug command: reply with the id of the current chat."""
    await bot.send_message(chat_id=msg.chat.id, text=str(msg.chat.id))


@bot.message_handler(commands=["stats"])
async def stats(msg: Message):
    """Reply with the total number of users and queues."""
    users_count = session.query(User).count()
    queues_count = session.query(Queue).count()
    await bot.send_message(
        chat_id=msg.chat.id,
        text=textbook.stats.format(users_count=users_count, queues_count=queues_count),
    )


@bot.message_handler(commands=["changelog"])
async def changelog(msg: Message):
    """Reply with the contents of changelog.txt, sent as HTML."""
    try:
        # NOTE(review): assumes the changelog is stored as UTF-8 - confirm.
        with open("changelog.txt", "r", encoding="utf-8") as file:
            text = file.read()
    except FileNotFoundError:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.error)
        return
    await bot.send_message(chat_id=msg.chat.id, text=text, parse_mode="html")


# Launch
async def main():
    """Run long polling until the process is stopped."""
    await bot.polling(non_stop=True)


if __name__ == "__main__":
    print("Bot started", flush=True)
    Base.metadata.create_all(engine)
    # Module-level session shared by every handler and helper in this file.
    session = Session()
    bot.add_custom_filter(StateFilter(bot))
    bot.enable_saving_states(filename="./.state-save/states.pkl")
    asyncio.run(main())