"""Async handlers for group commands.""" from telebot.async_telebot import AsyncTeleBot from telebot import types from datetime import datetime, timedelta, date from typing import Dict, List from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from bot.database import get_db_session, User, Chat, UserChat from bot.logger import get_logger from collections import defaultdict logger = get_logger(__name__) def register_command_handlers(bot: AsyncTeleBot) -> None: """Register all command handlers.""" @bot.message_handler(commands=['stats'], chat_types=['group', 'supergroup']) async def handle_stats(message: types.Message) -> None: """Handle /stats command - show statistics.""" chat_id = message.chat.id user_id = message.from_user.id if message.from_user else None logger.info(f"Command /stats from user {user_id} in chat {chat_id}") async for db in get_db_session(): try: # Check if bot is admin result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id)) chat = result.scalar_one_or_none() if not chat or not chat.bot_is_admin: logger.warning(f"/stats command denied - bot not admin in chat {chat_id}") await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.") return # Get total members count (exclude bots) try: total_members_raw = await bot.get_chat_member_count(chat_id) # Try to subtract all bots (including this bot) using admin list human_members = total_members_raw try: admins = await bot.get_chat_administrators(chat_id) bots_in_admins = sum(1 for m in admins if getattr(m.user, "is_bot", False)) human_members = max(total_members_raw - bots_in_admins, 0) except Exception: human_members = total_members_raw total_members = human_members except Exception: total_members = 0 # Get users who shared birthday result = await db.execute( select(func.count(User.user_id.distinct())).select_from(User).join(UserChat).filter( UserChat.chat_id == chat_id ) ) users_with_birthday = result.scalar() or 0 users_without_birthday = max(total_members - users_with_birthday, 0) # Format message if total_members > 0: percentage = (users_with_birthday / total_members) * 100 stats_text = ( f"📊 Статистика чата:\n\n" f"• Всего участников: {total_members}\n" f"• Поделились днем рождения: {users_with_birthday}\n" f"• Не поделились: {users_without_birthday}\n" f"• Процент: {percentage:.1f}%" ) else: stats_text = ( f"📊 Статистика чата:\n\n" f"• Поделились днем рождения: {users_with_birthday}" ) await bot.reply_to(message, stats_text) finally: await db.close() @bot.message_handler(commands=['week'], chat_types=['group', 'supergroup']) async def handle_week(message: types.Message) -> None: """Handle /week command - show birthdays for next 7 days.""" chat_id = message.chat.id user_id = message.from_user.id if message.from_user else None logger.info(f"Command /week from user {user_id} in chat {chat_id}") async for db in get_db_session(): try: # Check if bot is admin result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id)) chat = result.scalar_one_or_none() if not chat or not chat.bot_is_admin: logger.warning(f"/week command denied - bot not admin in chat {chat_id}") await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.") return # Get birthdays for next 7 days today = datetime.now().date() birthdays = await get_birthdays_in_range(db, chat_id, today, days=7) if not birthdays: await bot.reply_to(message, "На ближайшие 7 дней дней рождений не запланировано.") return # Format message message_text = "🎂 Дни рождения на ближайшие 7 дней:\n\n" for date_str, names in sorted(birthdays.items()): names_list = ", ".join(names) message_text += f"• {date_str}: {names_list}\n" await bot.reply_to(message, message_text) finally: await db.close() @bot.message_handler(commands=['month'], chat_types=['group', 'supergroup']) async def handle_month(message: types.Message) -> None: """Handle /month command - show birthdays for next 31 days.""" chat_id = message.chat.id user_id = message.from_user.id if message.from_user else None logger.info(f"Command /month from user {user_id} in chat {chat_id}") async for db in get_db_session(): try: # Check if bot is admin result = await db.execute(select(Chat).filter(Chat.chat_id == chat_id)) chat = result.scalar_one_or_none() if not chat or not chat.bot_is_admin: logger.warning(f"/month command denied - bot not admin in chat {chat_id}") await bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.") return # Get birthdays for next 31 days today = datetime.now().date() birthdays = await get_birthdays_in_range(db, chat_id, today, days=31) if not birthdays: await bot.reply_to(message, "На ближайшие 31 день дней рождений не запланировано.") return # Format message message_text = "🎂 Дни рождения на ближайшие 31 день:\n\n" for date_str, names in sorted(birthdays.items()): names_list = ", ".join(names) message_text += f"• {date_str}: {names_list}\n" await bot.reply_to(message, message_text) finally: await db.close() @bot.message_handler(commands=['help'], chat_types=['group', 'supergroup']) async def handle_help(message: types.Message) -> None: """Handle /help command - show help message.""" help_text = ( "📚 Команды бота:\n\n" "/stats - Показать статистику: сколько человек поделились днем рождения\n" "/week - Показать дни рождения на ближайшие 7 дней\n" "/month - Показать дни рождения на ближайшие 31 день\n" "/help - Показать это сообщение\n\n" "Чтобы поделиться своим днем рождения, напиши боту в личку /start\n\n" "from olly & cursor with <3" ) await bot.reply_to(message, help_text) async def get_birthdays_in_range(db: AsyncSession, chat_id: int, start_date: date, days: int) -> Dict[str, List[str]]: """Get birthdays in the specified date range for users in the chat.""" birthdays = defaultdict(list) # Get all users in this chat result = await db.execute( select(User).join(UserChat).filter(UserChat.chat_id == chat_id).distinct() ) users = result.scalars().all() end_date = start_date + timedelta(days=days) for user in users: # Create birthday date for current year try: birthday_this_year = datetime(start_date.year, user.birthday_month, user.birthday_day).date() except ValueError: # Invalid date (e.g., Feb 29 in non-leap year) continue # Check if birthday falls in range if start_date <= birthday_this_year < end_date: date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}" birthdays[date_str].append(user.first_name) else: # Check next year if we're near year end try: birthday_next_year = datetime(start_date.year + 1, user.birthday_month, user.birthday_day).date() if start_date <= birthday_next_year < end_date: date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}" birthdays[date_str].append(user.first_name) except ValueError: pass return birthdays