Files
bdbot/bot/handlers/command_handlers.py
2026-01-28 15:53:27 +03:00

200 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Async handlers for group commands."""
from telebot.async_telebot import AsyncTeleBot
from telebot import types
from datetime import datetime, timedelta, date
from typing import Dict, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from bot.database import get_db_session, User, Chat, UserChat
from bot.logger import get_logger
from collections import defaultdict
logger = get_logger(__name__)
def register_command_handlers(bot: AsyncTeleBot) -> None:
"""Register all command handlers."""
@bot.message_handler(commands=['stats'], chat_types=['group', 'supergroup'])
async def handle_stats(message: types.Message) -> None:
"""Handle /stats command - show statistics."""
chat_id = message.chat.id
user_id = message.from_user.id if message.from_user else None
logger.info(f"Command /stats from user {user_id} in chat {chat_id}")
async for db in get_db_session():
try:
# Check if bot is admin
result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id))
chat = result.scalar_one_or_none()
if not chat or not chat.bot_is_admin:
logger.warning(f"/stats command denied - bot not admin in chat {chat_id}")
await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get total members count (exclude bots)
try:
total_members_raw = await bot.get_chat_member_count(chat_id)
# Try to subtract all bots (including this bot) using admin list
human_members = total_members_raw
try:
admins = await bot.get_chat_administrators(chat_id)
bots_in_admins = sum(1 for m in admins if getattr(m.user, "is_bot", False))
human_members = max(total_members_raw - bots_in_admins, 0)
except Exception:
human_members = total_members_raw
total_members = human_members
except Exception:
total_members = 0
# Get users who shared birthday
result = await db.execute(
select(func.count(User.user_id.distinct())).select_from(User).join(UserChat).filter(
UserChat.chat_id == chat_id
)
)
users_with_birthday = result.scalar() or 0
users_without_birthday = max(total_members - users_with_birthday, 0)
# Format message
if total_members > 0:
percentage = (users_with_birthday / total_members) * 100
stats_text = (
f"📊 Статистика чата:\n\n"
f"Всего участников: {total_members}\n"
f"• Поделились днем рождения: {users_with_birthday}\n"
f"Не поделились: {users_without_birthday}\n"
f"• Процент: {percentage:.1f}%"
)
else:
stats_text = (
f"📊 Статистика чата:\n\n"
f"• Поделились днем рождения: {users_with_birthday}"
)
await bot.reply_to(message, stats_text)
finally:
await db.close()
@bot.message_handler(commands=['week'], chat_types=['group', 'supergroup'])
async def handle_week(message: types.Message) -> None:
"""Handle /week command - show birthdays for next 7 days."""
chat_id = message.chat.id
user_id = message.from_user.id if message.from_user else None
logger.info(f"Command /week from user {user_id} in chat {chat_id}")
async for db in get_db_session():
try:
# Check if bot is admin
result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id))
chat = result.scalar_one_or_none()
if not chat or not chat.bot_is_admin:
logger.warning(f"/week command denied - bot not admin in chat {chat_id}")
await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get birthdays for next 7 days
today = datetime.now().date()
birthdays = await get_birthdays_in_range(db, chat_id, today, days=7)
if not birthdays:
await bot.reply_to(message, "На ближайшие 7 дней дней рождений не запланировано.")
return
# Format message
message_text = "🎂 Дни рождения на ближайшие 7 дней:\n\n"
for date_str, names in sorted(birthdays.items()):
names_list = ", ".join(names)
message_text += f"{date_str}: {names_list}\n"
await bot.reply_to(message, message_text)
finally:
await db.close()
@bot.message_handler(commands=['month'], chat_types=['group', 'supergroup'])
async def handle_month(message: types.Message) -> None:
"""Handle /month command - show birthdays for next 31 days."""
chat_id = message.chat.id
user_id = message.from_user.id if message.from_user else None
logger.info(f"Command /month from user {user_id} in chat {chat_id}")
async for db in get_db_session():
try:
# Check if bot is admin
result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id))
chat = result.scalar_one_or_none()
if not chat or not chat.bot_is_admin:
logger.warning(f"/month command denied - bot not admin in chat {chat_id}")
await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get birthdays for next 31 days
today = datetime.now().date()
birthdays = await get_birthdays_in_range(db, chat_id, today, days=31)
if not birthdays:
await bot.reply_to(message, "На ближайшие 31 день дней рождений не запланировано.")
return
# Format message
message_text = "🎂 Дни рождения на ближайшие 31 день:\n\n"
for date_str, names in sorted(birthdays.items()):
names_list = ", ".join(names)
message_text += f"{date_str}: {names_list}\n"
await bot.reply_to(message, message_text)
finally:
await db.close()
@bot.message_handler(commands=['help'], chat_types=['group', 'supergroup'])
async def handle_help(message: types.Message) -> None:
"""Handle /help command - show help message."""
help_text = (
"📚 Команды бота:\n\n"
"/stats - Показать статистику: сколько человек поделились днем рождения\n"
"/week - Показать дни рождения на ближайшие 7 дней\n"
"/month - Показать дни рождения на ближайшие 31 день\n"
"/help - Показать это сообщение\n\n"
"Чтобы поделиться своим днем рождения, напиши боту в личку /start\n\n"
"from olly & cursor with <3"
)
await bot.reply_to(message, help_text)
async def get_birthdays_in_range(db: AsyncSession, chat_id: int, start_date: date, days: int) -> Dict[str, List[str]]:
"""Get birthdays in the specified date range for users in the chat."""
birthdays = defaultdict(list)
# Get all users in this chat
result = await db.execute(
select(User).join(UserChat).filter(UserChat.chat_id == chat_id).distinct()
)
users = result.scalars().all()
end_date = start_date + timedelta(days=days)
for user in users:
# Create birthday date for current year
try:
birthday_this_year = datetime(start_date.year, user.birthday_month, user.birthday_day).date()
except ValueError:
# Invalid date (e.g., Feb 29 in non-leap year)
continue
# Check if birthday falls in range
if start_date <= birthday_this_year < end_date:
date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}"
birthdays[date_str].append(user.first_name)
else:
# Check next year if we're near year end
try:
birthday_next_year = datetime(start_date.year + 1, user.birthday_month, user.birthday_day).date()
if start_date <= birthday_next_year < end_date:
date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}"
birthdays[date_str].append(user.first_name)
except ValueError:
pass
return birthdays