Files
bdbot/handlers/group_handlers.py
2026-01-28 12:36:27 +03:00

190 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Handlers for group events."""
import telebot
from typing import Optional
from telebot import types
from sqlalchemy.orm import Session
from database import get_db_session, User, Chat, UserChat
def register_group_handlers(bot: telebot.TeleBot) -> None:
"""Register all group event handlers."""
@bot.message_handler(content_types=['new_chat_members'])
def handle_new_member(message: telebot.types.Message) -> None:
"""Handle bot being added to a chat."""
# Check if bot was added
bot_me = bot.get_me()
if not bot_me:
return
for member in message.new_chat_members:
if member.id == bot_me.id:
chat_id = message.chat.id
chat_title: str = message.chat.title or "Unknown"
db = get_db_session()
try:
# Check if chat exists
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat:
chat = Chat(chat_id=chat_id, chat_title=chat_title, bot_is_admin=False)
db.add(chat)
db.commit()
# Check if bot is admin
try:
bot_me = bot.get_me()
if not bot_me:
return
bot_member = bot.get_chat_member(chat_id, bot_me.id)
is_admin = bot_member.status in ['administrator', 'creator']
chat.bot_is_admin = is_admin
db.commit()
if not is_admin:
# Request admin rights
bot.reply_to(
message,
"Привет! Мне нужны права администратора, чтобы видеть список участников чата. "
"Пожалуйста, выдайте мне права администратора."
)
else:
# Sync users and show statistics
sync_chat_users(bot, db, chat_id)
show_statistics(bot, message.chat)
except Exception:
# Bot might not have permission to check
chat.bot_is_admin = False
db.commit()
bot.reply_to(
message,
"Привет! Мне нужны права администратора, чтобы видеть список участников чата. "
"Пожалуйста, выдайте мне права администратора."
)
finally:
db.close()
break
# NOTE:
# Updates about the bot itself (когда бота повышают до админа / понижают)
# приходят в поле `my_chat_member`, а не `chat_member`.
# Для их обработки в pyTelegramBotAPI нужно использовать my_chat_member_handler.
@bot.my_chat_member_handler()
def handle_chat_member_update(message: telebot.types.ChatMemberUpdated) -> None:
"""Handle my_chat_member updates (bot role changes, e.g. becoming admin)."""
bot_me = bot.get_me()
if not bot_me:
return
if message.new_chat_member.user.id == bot_me.id:
# Bot's status changed
chat_id = message.chat.id
chat_title: str = message.chat.title or "Unknown"
db = get_db_session()
try:
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat:
chat = Chat(chat_id=chat_id, chat_title=chat_title, bot_is_admin=False)
db.add(chat)
# Check if bot became admin
is_admin = message.new_chat_member.status in ['administrator', 'creator']
was_admin = chat.bot_is_admin if chat else False
if chat:
chat.bot_is_admin = is_admin
chat.chat_title = chat_title
db.commit()
if is_admin and not was_admin:
# Bot just became admin - sync users and show statistics
sync_chat_users(bot, db, chat_id)
show_statistics(bot, message.chat)
finally:
db.close()
def sync_chat_users(bot: telebot.TeleBot, db: Session, chat_id: int) -> None:
"""Sync users from chat with database."""
try:
# Get all chat members (this requires admin rights)
# Note: This is a simplified approach - in production you might want to
# use get_chat_administrators or iterate through members differently
# For now, we'll sync users as they interact with the bot
# Get all users who already shared birthday
existing_users = db.query(User).all()
# Try to check if they're in this chat and add relationships
for user in existing_users:
try:
member = bot.get_chat_member(chat_id, user.user_id)
if member.status not in ['left', 'kicked']:
# User is in chat - ensure relationship exists
user_chat = db.query(UserChat).filter(
UserChat.user_id == user.user_id,
UserChat.chat_id == chat_id
).first()
if not user_chat:
user_chat = UserChat(user_id=user.user_id, chat_id=chat_id)
db.add(user_chat)
except Exception:
# User might have blocked bot or is not in chat
pass
db.commit()
except Exception:
# If sync fails, continue anyway
pass
def show_statistics(bot: telebot.TeleBot, chat: telebot.types.Chat) -> None:
"""Show statistics about users who shared their birthday."""
db = get_db_session()
try:
chat_id = chat.id
# Get all chat members
try:
members_count = bot.get_chat_member_count(chat_id)
except Exception:
members_count = 0
# Get users from this chat who shared birthday
users_with_birthday = db.query(User).join(UserChat).filter(
UserChat.chat_id == chat_id
).distinct().count()
# Create message
if members_count > 0:
percentage = (users_with_birthday / members_count) * 100
message_text = (
f"Отлично! Теперь я админ этого чата.\n\n"
f"📊 Статистика:\n"
f"• Поделились днем рождения: {users_with_birthday} из {members_count} участников\n"
f"• Процент: {percentage:.1f}%"
)
else:
message_text = (
f"Отлично! Теперь я админ этого чата.\n\n"
f"📊 Поделились днем рождения: {users_with_birthday} участников"
)
# Create inline keyboard with button to start private chat
bot_me = bot.get_me()
if not bot_me or not bot_me.username:
bot.send_message(chat_id, message_text)
return
keyboard = types.InlineKeyboardMarkup()
start_button = types.InlineKeyboardButton(
text="Поделиться днем рождения",
url=f"https://t.me/{bot_me.username}?start=share_birthday"
)
keyboard.add(start_button)
bot.send_message(chat_id, message_text, reply_markup=keyboard)
finally:
db.close()