706 lines
24 KiB
Python
706 lines
24 KiB
Python
# Telebot imports
|
|
from telebot.async_telebot import AsyncTeleBot
|
|
from telebot.asyncio_storage import StateMemoryStorage, StatePickleStorage
|
|
from telebot.asyncio_handler_backends import State, StatesGroup
|
|
from telebot.asyncio_filters import StateFilter
|
|
from telebot import types
|
|
from telebot.callback_data import CallbackData, CallbackDataFilter
|
|
from telebot.types import Message
|
|
|
|
# Async things imports
|
|
import asyncio
|
|
|
|
# Other modules imports
|
|
import sqlite3
|
|
import json
|
|
from datetime import datetime
|
|
import math
|
|
import socket
|
|
import os
|
|
from typing import Union
|
|
|
|
# Local imports
|
|
from config import token
|
|
from constants import (
|
|
MAX_QUEUES_OWN,
|
|
MAX_QUEUES_PARTS_IN,
|
|
MAX_NAME_LENGTH,
|
|
MAX_QUEUE_NAME_LENGTH,
|
|
)
|
|
import textbook
|
|
import keyboards
|
|
|
|
# DB
|
|
from db.base import Session, engine, Base
|
|
from db.models import User, Queue, QueueUser
|
|
|
|
|
|
bot = AsyncTeleBot(token, state_storage=StatePickleStorage())
|
|
|
|
|
|
class States(StatesGroup):
|
|
default = State()
|
|
changing_name = State()
|
|
changing_queue_name = State()
|
|
|
|
|
|
# Utils
|
|
|
|
|
|
def get_queue_stats_text(queue: Queue) -> str:
|
|
s = textbook.queue_stats.format(name=queue.name, count=len(queue.users))
|
|
return s
|
|
|
|
|
|
async def get_queue_from_state_data(call: types.CallbackQuery) -> Queue:
|
|
async with bot.retrieve_data(
|
|
user_id=call.from_user.id, chat_id=call.message.chat.id
|
|
) as state_data:
|
|
queue_id = state_data.get("queue_id", None)
|
|
if not queue_id:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.queue_operational_error
|
|
)
|
|
return None
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
if queue.owner.id != call.from_user.id:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.queue_operational_error
|
|
)
|
|
return None
|
|
return queue
|
|
|
|
|
|
async def get_parting_queue_from_state_data(call: types.CallbackQuery) -> Queue:
|
|
async with bot.retrieve_data(
|
|
user_id=call.from_user.id, chat_id=call.message.chat.id
|
|
) as state_data:
|
|
queue_id = state_data.get("part_queue_id", None)
|
|
if not queue_id:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.queue_operational_error
|
|
)
|
|
return None
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
return queue
|
|
|
|
|
|
def get_first_queue_user(queue: Queue) -> Union[QueueUser, None]:
|
|
arr = sorted(
|
|
queue.users, key=lambda qu: qu.position
|
|
) # TODO: Maybe there is a better solution..?
|
|
return arr[0] if len(arr) else None
|
|
|
|
|
|
def kick_first(queue: Queue) -> bool:
|
|
if len(queue.users):
|
|
first_user = get_first_queue_user(queue)
|
|
session.delete(first_user)
|
|
session.commit()
|
|
normalize_queue(queue)
|
|
return True
|
|
return False
|
|
|
|
|
|
def proceed_queue_user(queue: Queue, user: User) -> Union[QueueUser, None]:
|
|
first_queue_user = get_first_queue_user(queue)
|
|
if first_queue_user:
|
|
if user.id == first_queue_user.user_id:
|
|
kick_first(queue)
|
|
next_queue_user = get_first_queue_user(queue)
|
|
return next_queue_user
|
|
return None
|
|
|
|
|
|
def normalize_queue(queue: Queue) -> Queue:
|
|
# first_user = get_first_queue_user(queue)
|
|
# if first_user.position != 0:
|
|
# setattr(first_user, "position", 0)
|
|
# session.commit()
|
|
# queue = session.query(Queue).filter_by(id=queue.id).first()
|
|
for i, qu in enumerate(sorted(queue.users, key=lambda qu: qu.position)):
|
|
setattr(qu, "position", i)
|
|
session.commit()
|
|
return queue
|
|
|
|
|
|
async def update_queue_users_message(msg: Message, queue: Queue):
|
|
queue = normalize_queue(queue)
|
|
users_str = "\n".join([f"{qu.position}. {qu.user.name}" for qu in queue.users])
|
|
await bot.edit_message_text(
|
|
chat_id=msg.chat.id,
|
|
message_id=msg.id,
|
|
text=textbook.queue_users_list.format(name=queue.name, users_str=users_str),
|
|
reply_markup=keyboards.queue_users(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
|
|
|
|
# Basic
|
|
|
|
|
|
@bot.message_handler(commands=["start"])
|
|
async def start(msg: Message):
|
|
if msg.chat.type == "private":
|
|
user = session.query(User).filter_by(id=msg.from_user.id).first()
|
|
if not user:
|
|
user = User(
|
|
id=msg.from_user.id,
|
|
name=msg.from_user.first_name,
|
|
username=msg.from_user.username,
|
|
)
|
|
session.add(user)
|
|
session.commit()
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.start)
|
|
await asyncio.sleep(2)
|
|
await bot.set_state(user_id=msg.from_user.id, state=States.default)
|
|
if len(msg.text.split()) > 1:
|
|
command, queue_id = msg.text.split()
|
|
if queue := session.query(Queue).filter_by(id=queue_id).first():
|
|
if (
|
|
not session.query(QueueUser)
|
|
.filter_by(queue_id=queue.id, user_id=msg.from_user.id)
|
|
.first()
|
|
) and not len(user.takes_part_in_queues) > MAX_QUEUES_PARTS_IN:
|
|
last_user = (
|
|
session.query(QueueUser)
|
|
.filter_by(queue_id=queue.id)
|
|
.order_by(QueueUser.position.desc())
|
|
.first()
|
|
)
|
|
if last_user:
|
|
position = last_user.position + 1
|
|
else:
|
|
position = 0
|
|
queue_user = QueueUser(
|
|
user_id=msg.from_user.id, queue_id=queue.id, position=position
|
|
)
|
|
session.add(queue_user)
|
|
session.commit()
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=textbook.joined_queue.format(
|
|
name=queue.name, position=queue_user.position
|
|
),
|
|
parse_mode="html",
|
|
)
|
|
else:
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=textbook.error_joining_queue.format(name=queue.name),
|
|
parse_mode="html",
|
|
)
|
|
else:
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=textbook.menu.format(name=user.name),
|
|
reply_markup=keyboards.menu(),
|
|
)
|
|
else:
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.groups_plug)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "to_menu")
|
|
async def to_menu_handler(call: types.CallbackQuery):
|
|
user = session.query(User).filter_by(id=call.from_user.id).first()
|
|
await bot.set_state(user_id=call.from_user.id, state=States.default)
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.menu.format(name=user.name),
|
|
reply_markup=keyboards.menu(),
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data[:2] == "p:")
|
|
async def proceed_user_handler(call: types.CallbackQuery):
|
|
queue_id = call.data[2:]
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
user = session.query(User).filter_by(id=call.from_user.id).first()
|
|
if next_queue_user := proceed_queue_user(queue, user):
|
|
try:
|
|
await bot.send_message(
|
|
chat_id=next_queue_user.user_id,
|
|
text=textbook.your_turn.format(name=queue.name),
|
|
reply_markup=keyboards.your_turn(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
except:
|
|
await bot.send_message(
|
|
chat_id=queue.owner.id,
|
|
text=textbook.error_turn.format(name=queue.name),
|
|
reply_markup=keyboards.your_turn(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
else:
|
|
try:
|
|
await bot.send_message(
|
|
chat_id=queue.owner.id,
|
|
text=textbook.queue_finished.format(name=queue.name),
|
|
parse_mode="html",
|
|
)
|
|
except:
|
|
pass
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.finished_turn.format(name=queue.name),
|
|
reply_markup=None,
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Main menu
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "new_queue")
|
|
async def new_queue_handler(call: types.CallbackQuery):
|
|
user = session.query(User).filter_by(id=call.from_user.id).first()
|
|
if user:
|
|
if len(user.owns_queues) < MAX_QUEUES_OWN:
|
|
queue = Queue(owner_id=call.from_user.id)
|
|
session.add(queue)
|
|
session.commit()
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.new_queue_created.format(id=queue.id),
|
|
reply_markup=keyboards.to_menu_keyboard(),
|
|
)
|
|
else:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.queue_limit, show_alert=True
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "my_queues")
|
|
async def my_queues_handler(call: types.CallbackQuery):
|
|
user = session.query(User).filter_by(id=call.from_user.id).first()
|
|
queues = user.owns_queues
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.my_queues_list.format(count=len(queues)),
|
|
reply_markup=keyboards.my_queues(queues),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "parts_queues")
|
|
async def parts_queues_handler(call: types.CallbackQuery):
|
|
user = session.query(User).filter_by(id=call.from_user.id).first()
|
|
queues = [qu.queue for qu in user.takes_part_in_queues]
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.parts_queues_menu.format(count=len(queues)),
|
|
reply_markup=keyboards.parts_queues(queues),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "settings")
|
|
async def settings(call: types.CallbackQuery):
|
|
await bot.set_state(user_id=call.from_user.id, state=States.default)
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.settings,
|
|
reply_markup=keyboards.settings(),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "about")
|
|
async def about_handler(call: types.CallbackQuery):
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.about,
|
|
show_alert=True,
|
|
)
|
|
|
|
|
|
# Queue parts list menu
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data[:2] == "t:")
|
|
async def queue_parts_handler(call: types.CallbackQuery, queue_id: str = None):
|
|
queue_id = call.data[2:] if not queue_id else queue_id
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
if not queue:
|
|
await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
|
|
return None
|
|
async with bot.retrieve_data(
|
|
user_id=call.from_user.id, chat_id=call.message.chat.id
|
|
) as state_data:
|
|
state_data["part_queue_id"] = queue_id
|
|
users_str = "\n".join([f"{qu.position}. {qu.user.name}" for qu in queue.users])
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.part_queue.format(name=queue.name, users_str=users_str),
|
|
reply_markup=keyboards.queue_part_in_menu(),
|
|
parse_mode="html",
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Queue partin menu
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "leave_queue")
|
|
async def leave_queue_handler(call: types.CallbackQuery):
|
|
if queue := await get_parting_queue_from_state_data(call):
|
|
queueuser = (
|
|
session.query(QueueUser)
|
|
.filter_by(queue_id=queue.id, user_id=call.from_user.id)
|
|
.first()
|
|
)
|
|
session.delete(queueuser)
|
|
session.commit()
|
|
normalize_queue(queue)
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.leaved_queue.format(name=queue.name),
|
|
)
|
|
await parts_queues_handler(call)
|
|
return None
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "refresh_list")
|
|
async def refresh_list_handler(call: types.CallbackQuery):
|
|
if queue := await get_parting_queue_from_state_data(call):
|
|
users_str = "\n".join([f"{qu.position}. {qu.user.name}" for qu in queue.users])
|
|
try:
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.part_queue.format(name=queue.name, users_str=users_str),
|
|
reply_markup=keyboards.queue_part_in_menu(),
|
|
parse_mode="html",
|
|
)
|
|
except:
|
|
await asyncio.sleep(2)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Queue own list menu
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data[:2] == "q:")
|
|
async def queue_handler(call: types.CallbackQuery, queue_id: str = None):
|
|
queue_id = call.data[2:] if not queue_id else queue_id
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
if not queue:
|
|
await bot.answer_callback_query(callback_query_id=call.id, text=textbook.error)
|
|
return None
|
|
async with bot.retrieve_data(
|
|
user_id=call.from_user.id, chat_id=call.message.chat.id
|
|
) as state_data:
|
|
state_data["queue_id"] = queue_id
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=get_queue_stats_text(queue),
|
|
reply_markup=keyboards.queue_menu(),
|
|
parse_mode="html",
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Queue menu
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "get_queue_users")
|
|
async def get_queue_users_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
users_str = "\n".join([f"{qu.position}. {qu.user.name}" for qu in queue.users])
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.queue_users_list.format(name=queue.name, users_str=users_str),
|
|
reply_markup=keyboards.queue_users(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "get_queue_link")
|
|
async def get_queue_link_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
await bot.send_message(
|
|
chat_id=call.message.chat.id,
|
|
text=textbook.link_template.format(link=queue.id),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "queue_settings")
|
|
async def queue_settings_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.queue_settings.format(name=queue.name),
|
|
reply_markup=keyboards.queue_settings(queue_id=queue.id),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "start_queue")
|
|
async def start_queue_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
if first_queue_user := get_first_queue_user(queue):
|
|
try:
|
|
await bot.send_message(
|
|
chat_id=first_queue_user.user_id,
|
|
text=textbook.your_turn.format(name=queue.name),
|
|
reply_markup=keyboards.your_turn(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
except:
|
|
await bot.send_message(
|
|
chat_id=queue.owner_id,
|
|
text=textbook.error_turn.format(name=first_queue_user.user.name),
|
|
parse_mode="html",
|
|
)
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.queue_started.format(name=queue.name),
|
|
show_alert=True,
|
|
)
|
|
else:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.kick_first_error,
|
|
show_alert=True,
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Queue users
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "kick_first")
|
|
async def kick_first_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
if queue.owner_id == call.from_user.id:
|
|
if kick_first(queue):
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.first_kicked
|
|
)
|
|
await update_queue_users_message(call.message, queue)
|
|
else:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.kick_first_error,
|
|
show_alert=True,
|
|
)
|
|
else:
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.queue_operational_error,
|
|
show_alert=True,
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "swap_users")
|
|
async def swap_users_handler(call: types.CallbackQuery):
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.in_development_plug,
|
|
show_alert=True,
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "refresh_users")
|
|
async def refresh_users_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
try:
|
|
users_str = "\n".join(
|
|
[f"{qu.position}. {qu.user.name}" for qu in queue.users]
|
|
)
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.queue_users_list.format(
|
|
name=queue.name, users_str=users_str
|
|
),
|
|
reply_markup=keyboards.queue_users(queue.id),
|
|
parse_mode="html",
|
|
)
|
|
except:
|
|
await asyncio.sleep(2)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# Queue settings
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "edit_queue_name")
|
|
async def edit_queue_name_handler(call: types.CallbackQuery):
|
|
if await get_queue_from_state_data(call):
|
|
await bot.set_state(user_id=call.from_user.id, state=States.changing_queue_name)
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.edit_queue_name,
|
|
reply_markup=keyboards.edit_name(),
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(
|
|
func=lambda c: c.data == "cancel", state=States.changing_queue_name
|
|
)
|
|
async def edit_queue_name_cancel_handler(call: types.CallbackQuery):
|
|
await queue_settings_handler(call)
|
|
|
|
|
|
@bot.message_handler(content_types=["text"], state=States.changing_queue_name)
|
|
async def update_queue_name(msg: Message):
|
|
if len(msg.text) > MAX_QUEUE_NAME_LENGTH or "\n" in msg.text:
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
|
|
return None
|
|
async with bot.retrieve_data(
|
|
user_id=msg.from_user.id, chat_id=msg.chat.id
|
|
) as state_data:
|
|
queue_id = state_data.get("queue_id", None)
|
|
queue = session.query(Queue).filter_by(id=queue_id).first()
|
|
if not queue:
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
|
|
return None
|
|
setattr(queue, "name", msg.text)
|
|
session.commit()
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_queue_name_success)
|
|
await asyncio.sleep(1)
|
|
await bot.set_state(user_id=msg.from_user.id, state=States.default)
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=get_queue_stats_text(queue),
|
|
reply_markup=keyboards.queue_menu(),
|
|
parse_mode="html",
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "edit_queue_description")
|
|
async def edit_queue_description_handler(call: types.CallbackQuery):
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id,
|
|
text=textbook.in_development_plug,
|
|
show_alert=True,
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "delete_queue_approve")
|
|
async def delete_queue_approve_handler(call: types.CallbackQuery):
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.delete_queue_approve,
|
|
reply_markup=keyboards.approve_queue_delete(),
|
|
)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "delete_queue")
|
|
async def delete_queue_handler(call: types.CallbackQuery):
|
|
if queue := await get_queue_from_state_data(call):
|
|
for qu in queue.users:
|
|
session.delete(qu) # TODO: Use SQLAlchemy to cascade-delete all users
|
|
session.delete(queue)
|
|
session.commit()
|
|
await bot.answer_callback_query(
|
|
callback_query_id=call.id, text=textbook.queue_deleted
|
|
)
|
|
await my_queues_handler(call)
|
|
await bot.answer_callback_query(callback_query_id=call.id)
|
|
|
|
|
|
# User settings
|
|
|
|
|
|
@bot.callback_query_handler(func=lambda c: c.data == "edit_name")
|
|
async def edit_name_handler(call: types.CallbackQuery):
|
|
await bot.set_state(user_id=call.from_user.id, state=States.changing_name)
|
|
await bot.edit_message_text(
|
|
chat_id=call.message.chat.id,
|
|
message_id=call.message.id,
|
|
text=textbook.edit_name,
|
|
reply_markup=keyboards.edit_name(),
|
|
)
|
|
|
|
|
|
@bot.callback_query_handler(
|
|
func=lambda c: c.data == "cancel", state=States.changing_name
|
|
)
|
|
async def edit_name_cancel_handler(call: types.CallbackQuery):
|
|
await settings(call)
|
|
|
|
|
|
@bot.message_handler(content_types=["text"], state=States.changing_name)
|
|
async def update_queue_name(msg: Message):
|
|
if len(msg.text) > MAX_NAME_LENGTH or "\n" in msg.text:
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_error)
|
|
return None
|
|
user = session.query(User).filter_by(id=msg.from_user.id).first()
|
|
setattr(user, "name", msg.text)
|
|
session.commit()
|
|
await bot.send_message(chat_id=msg.chat.id, text=textbook.edit_name_success)
|
|
await asyncio.sleep(1)
|
|
await bot.set_state(user_id=msg.from_user.id, state=States.default)
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=textbook.settings,
|
|
reply_markup=keyboards.settings(),
|
|
)
|
|
|
|
|
|
# Other
|
|
|
|
|
|
@bot.message_handler(commands=["mystate"])
|
|
async def mystate(msg: Message):
|
|
state = await bot.get_state(user_id=msg.from_user.id)
|
|
await bot.send_message(chat_id=msg.from_user.id, text=state)
|
|
|
|
|
|
@bot.message_handler(commands=["chatid"])
|
|
async def chatid(msg: Message):
|
|
await bot.send_message(chat_id=msg.chat.id, text=msg.chat.id)
|
|
|
|
|
|
@bot.message_handler(commands=["stats"])
|
|
async def stats(msg: Message):
|
|
users_count = session.query(User).count()
|
|
queues_count = session.query(Queue).count()
|
|
await bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text=textbook.stats.format(users_count=users_count, queues_count=queues_count),
|
|
)
|
|
|
|
|
|
@bot.message_handler(commands=["changelog"])
|
|
async def changelog(msg: Message):
|
|
with open("changelog.txt", "r") as file:
|
|
await bot.send_message(chat_id=msg.chat.id, text=file.read(), parse_mode="html")
|
|
|
|
|
|
# Launch
|
|
|
|
|
|
async def main():
|
|
a = asyncio.create_task(bot.polling(non_stop=True))
|
|
await a
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Bot started", flush=True)
|
|
Base.metadata.create_all(engine)
|
|
session = Session()
|
|
bot.add_custom_filter(StateFilter(bot))
|
|
bot.enable_saving_states(filename="./.state-save/states.pkl")
|
|
asyncio.run(main())
|