Files
queuebot/bot/app/bot.py
2023-06-09 18:21:36 +03:00

465 lines
16 KiB
Python

# Standard library imports
import asyncio
import html
import sqlite3
import json
from datetime import datetime
import math
import socket
import os

# Telebot imports
from telebot.async_telebot import AsyncTeleBot
from telebot.asyncio_storage import StateMemoryStorage, StatePickleStorage
from telebot.asyncio_handler_backends import State, StatesGroup
from telebot.asyncio_filters import StateFilter
from telebot import types
from telebot.callback_data import CallbackData, CallbackDataFilter
from telebot.types import Message

# Local imports
from config import token, admins
import textbook
import keyboards

# DB
from db.base import Session, engine, Base
from db.models import User, Queue, QueueUser
# Single bot instance that every handler in this module registers on and
# sends through; conversation state is kept in telebot's StatePickleStorage.
bot = AsyncTeleBot(token, state_storage=StatePickleStorage())
class States(StatesGroup):
    # Conversation states used by the state-filtered handlers in this module.

    # Set by /start, by menu/settings navigation and after a successful rename.
    default = State()
    # While set, the next text message is stored as the user's display name.
    changing_name = State()
    # While set, the next text message is stored as the selected queue's name.
    changing_queue_name = State()
# Utils
def get_queue_stats_text(queue: Queue) -> str:
    """Build the text of the queue overview screen.

    Fills ``textbook.queue_stats`` with the queue's name and its number of
    members.

    Args:
        queue: The queue to describe.

    Returns:
        The formatted overview text.
    """
    # The queue name is typed in by the owner and both callers in this module
    # send the result with parse_mode="html", so markup characters in the name
    # must be escaped or Telegram refuses to parse the message.
    # str() keeps the previous rendering for a queue whose name is not set.
    safe_name = html.escape(str(queue.name), quote=False)
    return textbook.queue_stats.format(name=safe_name, count=len(queue.users))
async def get_queue_from_state_data(call: types.CallbackQuery) -> Queue:
    """Load the queue currently selected by the user who pressed a button.

    The selected queue id is read from the ``"queue_id"`` key of the user's
    state data. The queue is returned only when it still exists and is owned
    by the caller.

    Args:
        call: The callback query being handled.

    Returns:
        The selected ``Queue``, or ``None`` when no queue is selected, the
        queue no longer exists, or the caller is not its owner. In every
        ``None`` case the callback query has already been answered with
        ``textbook.queue_operational_error``.
    """
    async with bot.retrieve_data(
        user_id=call.from_user.id, chat_id=call.message.chat.id
    ) as state_data:
        # NOTE(review): state_data is presumably None when nothing has been
        # stored for this user yet -- guarded here, confirm against telebot.
        queue_id = state_data.get("queue_id", None) if state_data else None

    queue = None
    if queue_id:
        queue = session.query(Queue).filter_by(id=queue_id).first()

    # The stored id may point at a queue that was deleted in the meantime
    # (e.g. a button on an old message), so a missing row is an expected case
    # and must not be dereferenced.
    if queue is None or queue.owner_id != call.from_user.id:
        await bot.answer_callback_query(
            callback_query_id=call.id, text=textbook.queue_operational_error
        )
        return None
    return queue
def get_first_queue_user(queue: Queue) -> QueueUser:
    """Return the member of ``queue`` with the lowest ``position``.

    Raises:
        IndexError: If the queue has no members.
    """
    by_position = list(queue.users)
    by_position.sort(key=lambda member: member.position)
    return by_position[0]
def proceed_queue(queue: Queue) -> bool:
    """Advance the queue by one step, removing its first member.

    Every member's position is decreased by one, then the member that now has
    the lowest position is deleted and the change is committed.

    Returns:
        True when a member was removed, False when the queue was empty.
    """
    if len(queue.users) == 0:
        return False

    for member in queue.users:
        member.position = member.position - 1

    # Positions were shifted above; the head of the queue is still the member
    # with the smallest position.
    session.delete(get_first_queue_user(queue))
    session.commit()
    return True
async def update_queue_users_message(msg: Message, queue: Queue):
    """Rewrite ``msg`` in place so it shows the current member list of ``queue``.

    Args:
        msg: The bot message to edit.
        queue: The queue whose members are listed, one ``position. name`` line
            per member.
    """
    # Names are free text entered by users and the message is sent with
    # parse_mode="html": unescaped "<", ">" or "&" would make Telegram reject
    # the edit. str() keeps the previous rendering for a name that is not set.
    safe_queue_name = html.escape(str(queue.name), quote=False)
    users_str = "\n".join(
        f"{qu.position}. {html.escape(str(qu.user.name), quote=False)}"
        for qu in queue.users
    )
    await bot.edit_message_text(
        chat_id=msg.chat.id,
        message_id=msg.id,
        text=textbook.queue_users_list.format(
            name=safe_queue_name, users_str=users_str
        ),
        reply_markup=keyboards.queue_users(queue.id),
        parse_mode="html",
    )
# Basic
@bot.message_handler(commands=["start"])
async def start(msg: Message):
    """Handle ``/start``: register the user, then show the menu or join a queue.

    In a private chat the sender is stored as a ``User`` on first contact
    (followed by the welcome text) and their state is reset to
    ``States.default``. A bare ``/start`` shows the main menu. With an
    argument (``/start <queue_id>``) the user is appended to that queue,
    unless they are already a member. In any other chat type only
    ``textbook.groups_plug`` is sent.
    """
    if msg.chat.type != "private":
        await bot.send_message(chat_id=msg.chat.id, text=textbook.groups_plug)
        return

    user = session.query(User).filter_by(id=msg.from_user.id).first()
    if not user:
        user = User(
            id=msg.from_user.id,
            name=msg.from_user.first_name,
            username=msg.from_user.username,
        )
        session.add(user)
        session.commit()
        await bot.send_message(chat_id=msg.chat.id, text=textbook.start)
        await asyncio.sleep(2)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)

    parts = msg.text.split()
    if len(parts) < 2:
        await bot.send_message(
            chat_id=msg.chat.id,
            text=textbook.menu.format(name=user.name),
            reply_markup=keyboards.menu(),
        )
        return

    # Only the first argument is the queue id. Unpacking the split result into
    # exactly two names used to raise ValueError for "/start a b".
    queue_id = parts[1]
    queue = session.query(Queue).filter_by(id=queue_id).first()
    if not queue:
        # NOTE(review): an unknown queue id produces no reply at all, as before.
        return

    # The queue name is user-supplied and both replies below use
    # parse_mode="html", so it has to be escaped.
    safe_queue_name = html.escape(str(queue.name), quote=False)

    already_member = (
        session.query(QueueUser)
        .filter_by(queue_id=queue.id, user_id=msg.from_user.id)
        .first()
    )
    if already_member:
        await bot.send_message(
            chat_id=msg.chat.id,
            text=textbook.error_joining_queue.format(name=safe_queue_name),
            parse_mode="html",
        )
        return

    # New members go to the tail: one past the highest position currently in
    # use, or 0 for an empty queue.
    last_user = (
        session.query(QueueUser)
        .filter_by(queue_id=queue.id)
        .order_by(QueueUser.position.desc())
        .first()
    )
    position = last_user.position + 1 if last_user else 0
    queue_user = QueueUser(
        user_id=msg.from_user.id, queue_id=queue.id, position=position
    )
    session.add(queue_user)
    session.commit()
    await bot.send_message(
        chat_id=msg.chat.id,
        text=textbook.joined_queue.format(
            name=safe_queue_name, position=queue_user.position
        ),
        parse_mode="html",
    )
@bot.callback_query_handler(func=lambda c: c.data == "to_menu")
async def to_menu_handler(call: types.CallbackQuery):
    """Return to the main menu: reset the state and redraw the menu message."""
    user = session.query(User).filter_by(id=call.from_user.id).first()
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.menu.format(name=user.name),
        reply_markup=keyboards.menu(),
    )
    # Telegram clients keep showing a progress indicator on the pressed button
    # until the callback query is answered, even when there is nothing to say.
    await bot.answer_callback_query(callback_query_id=call.id)
# Main menu
@bot.callback_query_handler(func=lambda c: c.data == "new_queue")
async def new_queue_handler(call: types.CallbackQuery):
    """Create a queue owned by the caller unless they already own four.

    The outcome (created / limit reached) is reported as an alert; the
    callback query is then answered once more without text.
    """
    owner = session.query(User).filter_by(id=call.from_user.id).first()
    if owner:
        if len(owner.owns_queues) >= 4:
            await bot.answer_callback_query(
                callback_query_id=call.id, text=textbook.queue_limit, show_alert=True
            )
        else:
            new_queue = Queue(owner_id=call.from_user.id)
            session.add(new_queue)
            session.commit()
            await bot.answer_callback_query(
                callback_query_id=call.id,
                text=textbook.new_queue_created.format(id=new_queue.id),
                show_alert=True,
            )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "my_queues")
async def my_queues_handler(call: types.CallbackQuery):
    """Replace the current message with the list of queues the caller owns."""
    owner = session.query(User).filter_by(id=call.from_user.id).first()
    owned = owner.owns_queues
    message = call.message
    await bot.edit_message_text(
        chat_id=message.chat.id,
        message_id=message.id,
        text=textbook.my_queues_list.format(count=len(owned)),
        reply_markup=keyboards.my_queues(owned),
    )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "settings")
async def settings(call: types.CallbackQuery):
    """Show the user settings screen and reset the state to ``States.default``.

    Also invoked directly by ``edit_name_cancel_handler``.
    """
    message = call.message
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await bot.edit_message_text(
        chat_id=message.chat.id,
        message_id=message.id,
        text=textbook.settings,
        reply_markup=keyboards.settings(),
    )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "about")
async def about_handler(call: types.CallbackQuery):
    """Show the "about" text as an alert on top of the current screen."""
    about_text = textbook.about
    await bot.answer_callback_query(
        callback_query_id=call.id, text=about_text, show_alert=True
    )
# Queue list menu
@bot.callback_query_handler(func=lambda c: c.data[:2] == "q:")
async def queue_handler(call: types.CallbackQuery, queue_id: str = None):
    """Open the overview screen of one queue and remember it as selected.

    Args:
        call: The callback query; its data has the form ``q:<queue_id>``.
        queue_id: Optional explicit id that takes precedence over the id
            encoded in the callback data.
    """
    # An explicitly passed id wins; otherwise take everything after "q:".
    queue_id = queue_id or call.data[2:]

    queue = session.query(Queue).filter_by(id=queue_id).first()
    if not queue:
        await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
        return None

    # The queue-menu handlers read the selected queue back from the state data.
    async with bot.retrieve_data(
        user_id=call.from_user.id, chat_id=call.message.chat.id
    ) as state_data:
        state_data["queue_id"] = queue_id

    overview = get_queue_stats_text(queue)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=overview,
        reply_markup=keyboards.queue_menu(),
        parse_mode="html",
    )
    await bot.answer_callback_query(callback_query_id=call.id)
# Queue menu
@bot.callback_query_handler(func=lambda c: c.data == "get_queue_users")
async def get_queue_users_handler(call: types.CallbackQuery):
    """Show the member list of the currently selected queue."""
    queue = await get_queue_from_state_data(call)
    if queue:
        # Same edit as the one performed after kicking a member, so reuse it.
        await update_queue_users_message(call.message, queue)
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "get_queue_link")
async def get_queue_link_handler(call: types.CallbackQuery):
    """Send the invite link of the currently selected queue as a new message."""
    queue = await get_queue_from_state_data(call)
    if queue:
        link_text = textbook.link_template.format(link=queue.id)
        await bot.send_message(chat_id=call.message.chat.id, text=link_text)
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "queue_settings")
async def queue_settings_handler(call: types.CallbackQuery):
    """Show the settings screen of the currently selected queue.

    Also invoked directly by ``edit_queue_name_cancel_handler``.
    """
    queue = await get_queue_from_state_data(call)
    if queue:
        message = call.message
        await bot.edit_message_text(
            chat_id=message.chat.id,
            message_id=message.id,
            text=textbook.queue_settings.format(name=queue.name),
            reply_markup=keyboards.queue_settings(queue_id=queue.id),
        )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "start_queue")
async def start_queue_handler(call: types.CallbackQuery):
    """Placeholder for starting a queue; the feature is not implemented yet."""
    # An unanswered callback query leaves the client showing a progress
    # indicator on the button, so reply the same way the other unfinished
    # feature (swap_users) does.
    await bot.answer_callback_query(
        callback_query_id=call.id,
        text=textbook.in_development_plug,
        show_alert=True,
    )
# Queue users
# NOTE(review): this function reuses the name of the "get_queue_users" handler
# defined earlier in the module and rebinds it at module level. Both stay
# registered through their decorators; the name is kept for compatibility.
@bot.callback_query_handler(func=lambda c: c.data == "kick_first")
async def get_queue_users_handler(call: types.CallbackQuery):
    """Remove the first member of the selected queue and refresh the list."""
    queue = await get_queue_from_state_data(call)
    if queue:
        caller_is_owner = queue.owner_id == call.from_user.id
        if not caller_is_owner:
            await bot.answer_callback_query(
                callback_query_id=call.id,
                text=textbook.queue_operational_error,
                show_alert=True,
            )
        elif proceed_queue(queue):
            await bot.answer_callback_query(
                callback_query_id=call.id, text=textbook.first_kicked
            )
            await update_queue_users_message(call.message, queue)
        else:
            # proceed_queue() returns False when there is nobody to remove.
            await bot.answer_callback_query(
                callback_query_id=call.id,
                text=textbook.kick_first_error,
                show_alert=True,
            )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "swap_users")
async def swap_users_position(call: types.CallbackQuery):
    """Placeholder for swapping two members: only shows an "in development" alert."""
    plug_text = textbook.in_development_plug
    await bot.answer_callback_query(
        callback_query_id=call.id, text=plug_text, show_alert=True
    )
# Queue settings
@bot.callback_query_handler(func=lambda c: c.data == "edit_queue_name")
async def edit_queue_name_handler(call: types.CallbackQuery):
    """Start renaming the selected queue.

    Switches the user to ``States.changing_queue_name`` and shows the prompt;
    the next text message is then handled as the new queue name.
    """
    if await get_queue_from_state_data(call):
        await bot.set_state(user_id=call.from_user.id, state=States.changing_queue_name)
        await bot.edit_message_text(
            chat_id=call.message.chat.id,
            message_id=call.message.id,
            text=textbook.edit_name,
            reply_markup=keyboards.edit_name(),
        )
        # The failure path is answered inside get_queue_from_state_data();
        # the success path must be answered too, otherwise the client keeps
        # showing a progress indicator on the button.
        await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(
    func=lambda c: c.data == "cancel", state=States.changing_queue_name
)
async def edit_queue_name_cancel_handler(call: types.CallbackQuery):
    """Cancel a pending queue rename and return to the queue settings screen."""
    # queue_settings_handler() does not touch the state. Without this reset
    # the user stays in changing_queue_name after cancelling and their next
    # text message would still rename the queue.
    await bot.set_state(user_id=call.from_user.id, state=States.default)
    await queue_settings_handler(call)
@bot.message_handler(content_types=["text"], state=States.changing_queue_name)
async def update_queue_name(msg: Message):
    """Store the received text as the new name of the selected queue.

    Names longer than 40 characters or containing a line break are rejected
    with ``textbook.edit_name_error``, as is a missing queue. On success the
    state returns to ``States.default`` and the queue overview is sent.
    """
    new_name = msg.text
    too_long = len(new_name) > 40
    if too_long or "\n" in new_name:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    async with bot.retrieve_data(
        user_id=msg.from_user.id, chat_id=msg.chat.id
    ) as state_data:
        queue_id = state_data.get("queue_id", None)

    queue = session.query(Queue).filter_by(id=queue_id).first()
    if not queue:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    queue.name = new_name
    session.commit()

    await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_queue_name_success)
    # Short pause between the confirmation and the overview message.
    await asyncio.sleep(1)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)
    overview = get_queue_stats_text(queue)
    await bot.send_message(
        chat_id=msg.chat.id,
        text=overview,
        reply_markup=keyboards.queue_menu(),
        parse_mode="html",
    )
@bot.callback_query_handler(func=lambda c: c.data == "delete_queue_approve")
async def delete_queue_approve_handler(call: types.CallbackQuery):
    """Ask the user to confirm deleting the queue before anything is removed."""
    message = call.message
    await bot.edit_message_text(
        chat_id=message.chat.id,
        message_id=message.id,
        text=textbook.delete_queue_approve,
        reply_markup=keyboards.approve_queue_delete(),
    )
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(func=lambda c: c.data == "delete_queue")
async def delete_queue_handler(call: types.CallbackQuery):
    """Delete the selected queue with all its memberships, then show the queue list."""
    queue = await get_queue_from_state_data(call)
    if queue:
        # TODO: Use SQLAlchemy to cascade-delete all users
        for membership in queue.users:
            session.delete(membership)
        session.delete(queue)
        session.commit()
        await bot.answer_callback_query(
            callback_query_id=call.id, text=textbook.queue_deleted
        )
        await my_queues_handler(call)
    await bot.answer_callback_query(callback_query_id=call.id)
# User settings
@bot.callback_query_handler(func=lambda c: c.data == "edit_name")
async def edit_name_handler(call: types.CallbackQuery):
    """Start renaming the user.

    Switches the user to ``States.changing_name`` and shows the prompt; the
    next text message is then handled as the new display name.
    """
    await bot.set_state(user_id=call.from_user.id, state=States.changing_name)
    await bot.edit_message_text(
        chat_id=call.message.chat.id,
        message_id=call.message.id,
        text=textbook.edit_name,
        reply_markup=keyboards.edit_name(),
    )
    # Telegram clients keep showing a progress indicator on the pressed button
    # until the callback query is answered, even when there is nothing to say.
    await bot.answer_callback_query(callback_query_id=call.id)
@bot.callback_query_handler(
    func=lambda c: c.data == "cancel", state=States.changing_name
)
async def edit_name_cancel_handler(call: types.CallbackQuery):
    """Cancel a pending user-name edit by returning to the settings screen."""
    # settings() resets the state to States.default and answers the callback,
    # so nothing else is needed here.
    await settings(call)
# NOTE(review): despite its name this handler renames the *user*; it reuses
# the name of the queue-rename handler defined earlier and rebinds it at
# module level. Both stay registered through their decorators; the name is
# kept for compatibility.
@bot.message_handler(content_types=["text"], state=States.changing_name)
async def update_queue_name(msg: Message):
    """Store the received text as the sender's new display name.

    Names longer than 40 characters or containing a line break are rejected
    with ``textbook.edit_name_error``. On success the state returns to
    ``States.default`` and the settings screen is sent again.
    """
    new_name = msg.text
    too_long = len(new_name) > 40
    if too_long or "\n" in new_name:
        await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
        return None

    account = session.query(User).filter_by(id=msg.from_user.id).first()
    account.name = new_name
    session.commit()

    await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_success)
    # Short pause between the confirmation and the settings screen.
    await asyncio.sleep(1)
    await bot.set_state(user_id=msg.from_user.id, state=States.default)
    await bot.send_message(
        chat_id=msg.chat.id,
        text=textbook.settings,
        reply_markup=keyboards.settings(),
    )
# Other
@bot.message_handler(commands=["mystate"])
async def mystate(msg):
    """Debug command: reply with the sender's current conversation state."""
    state = await bot.get_state(user_id=msg.from_user.id)
    # NOTE(review): get_state() presumably returns None when no state has been
    # stored for the user; a message text must be a non-empty string, so the
    # value is rendered with str() instead of being passed through as-is.
    await bot.send_message(chat_id=msg.from_user.id, text=str(state))
# Launch
async def main():
    """Run the bot's polling loop as a task and wait for it to finish."""
    polling_task = asyncio.create_task(bot.polling(non_stop=True))
    await polling_task
# Script entry point: prepare the database and the bot, then start polling.
if __name__ == "__main__":
    print("Bot started", flush=True)
    # NOTE(review): presumably creates any tables missing for the models
    # registered on Base -- confirm against db.base.
    Base.metadata.create_all(engine)
    # Module-level session used by every handler and helper above. It only
    # exists when the file is run as a script, not when it is imported.
    session = Session()
    # Registers the filter behind the `state=` argument of the handlers above.
    bot.add_custom_filter(StateFilter(bot))
    bot.enable_saving_states(filename="./.state-save/states.pkl")
    asyncio.run(main())