This commit is contained in:
2026-01-28 11:30:30 +03:00
commit 641b88138d
18 changed files with 1655 additions and 0 deletions

1
handlers/__init__.py Normal file
View File

@ -0,0 +1 @@
# Handlers package

View File

@ -0,0 +1,169 @@
"""Handlers for group commands."""
import telebot
from datetime import datetime, timedelta, date
from typing import Dict, List
from sqlalchemy.orm import Session
from database import get_db_session, User, Chat, UserChat
from collections import defaultdict
def register_command_handlers(bot: telebot.TeleBot) -> None:
"""Register all command handlers."""
@bot.message_handler(commands=['stats'], chat_types=['group', 'supergroup'])
def handle_stats(message: telebot.types.Message) -> None:
"""Handle /stats command - show statistics."""
chat_id = message.chat.id
db = get_db_session()
try:
# Check if bot is admin
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat or not chat.bot_is_admin:
bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get total members count
try:
total_members = bot.get_chat_member_count(chat_id)
except Exception:
total_members = 0
# Get users who shared birthday
users_with_birthday = db.query(User).join(UserChat).filter(
UserChat.chat_id == chat_id
).distinct().count()
users_without_birthday = total_members - users_with_birthday
# Format message
if total_members > 0:
percentage = (users_with_birthday / total_members) * 100
stats_text = (
f"📊 Статистика чата:\n\n"
f"Всего участников: {total_members}\n"
f"• Поделились днем рождения: {users_with_birthday}\n"
f"Не поделились: {users_without_birthday}\n"
f"• Процент: {percentage:.1f}%"
)
else:
stats_text = (
f"📊 Статистика чата:\n\n"
f"• Поделились днем рождения: {users_with_birthday}"
)
bot.reply_to(message, stats_text)
finally:
db.close()
@bot.message_handler(commands=['week'], chat_types=['group', 'supergroup'])
def handle_week(message: telebot.types.Message) -> None:
"""Handle /week command - show birthdays for next 7 days."""
chat_id = message.chat.id
db = get_db_session()
try:
# Check if bot is admin
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat or not chat.bot_is_admin:
bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get birthdays for next 7 days
today = datetime.now().date()
birthdays = get_birthdays_in_range(db, chat_id, today, days=7)
if not birthdays:
bot.reply_to(message, "На ближайшие 7 дней дней рождений не запланировано.")
return
# Format message
message_text = "🎂 Дни рождения на ближайшие 7 дней:\n\n"
for date_str, names in sorted(birthdays.items()):
names_list = ", ".join(names)
message_text += f"{date_str}: {names_list}\n"
bot.reply_to(message, message_text)
finally:
db.close()
@bot.message_handler(commands=['month'], chat_types=['group', 'supergroup'])
def handle_month(message: telebot.types.Message) -> None:
"""Handle /month command - show birthdays for next 30 days."""
chat_id = message.chat.id
db = get_db_session()
try:
# Check if bot is admin
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat or not chat.bot_is_admin:
bot.reply_to(message, "Мне нужны права администратора для выполнения этой команды.")
return
# Get birthdays for next 30 days
today = datetime.now().date()
birthdays = get_birthdays_in_range(db, chat_id, today, days=30)
if not birthdays:
bot.reply_to(message, "На ближайшие 30 дней дней рождений не запланировано.")
return
# Format message
message_text = "🎂 Дни рождения на ближайшие 30 дней:\n\n"
for date_str, names in sorted(birthdays.items()):
names_list = ", ".join(names)
message_text += f"{date_str}: {names_list}\n"
bot.reply_to(message, message_text)
finally:
db.close()
@bot.message_handler(commands=['help'], chat_types=['group', 'supergroup'])
def handle_help(message: telebot.types.Message) -> None:
"""Handle /help command - show help message."""
help_text = (
"📚 Команды бота:\n\n"
"/stats - Показать статистику: сколько человек поделились днем рождения\n"
"/week - Показать дни рождения на ближайшие 7 дней\n"
"/month - Показать дни рождения на ближайшие 30 дней\n"
"/help - Показать это сообщение\n\n"
"Чтобы поделиться своим днем рождения, напиши боту в личку /start\n\n"
"from olly & cursor with <3"
)
bot.reply_to(message, help_text)
def get_birthdays_in_range(db: Session, chat_id: int, start_date: date, days: int) -> Dict[str, List[str]]:
"""Get birthdays in the specified date range for users in the chat."""
birthdays = defaultdict(list)
# Get all users in this chat
users = db.query(User).join(UserChat).filter(
UserChat.chat_id == chat_id
).distinct().all()
end_date = start_date + timedelta(days=days)
for user in users:
# Create birthday date for current year
try:
birthday_this_year = datetime(start_date.year, user.birthday_month, user.birthday_day).date()
except ValueError:
# Invalid date (e.g., Feb 29 in non-leap year)
continue
# Check if birthday falls in range
if start_date <= birthday_this_year < end_date:
date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}"
birthdays[date_str].append(user.first_name)
else:
# Check next year if we're near year end
try:
birthday_next_year = datetime(start_date.year + 1, user.birthday_month, user.birthday_day).date()
if start_date <= birthday_next_year < end_date:
date_str = f"{user.birthday_day:02d}.{user.birthday_month:02d}"
birthdays[date_str].append(user.first_name)
except ValueError:
pass
return birthdays

185
handlers/group_handlers.py Normal file
View File

@ -0,0 +1,185 @@
"""Handlers for group events."""
import telebot
from typing import Optional
from telebot import types
from sqlalchemy.orm import Session
from database import get_db_session, User, Chat, UserChat
def register_group_handlers(bot: telebot.TeleBot) -> None:
"""Register all group event handlers."""
@bot.message_handler(content_types=['new_chat_members'])
def handle_new_member(message: telebot.types.Message) -> None:
"""Handle bot being added to a chat."""
# Check if bot was added
bot_me = bot.get_me()
if not bot_me:
return
for member in message.new_chat_members:
if member.id == bot_me.id:
chat_id = message.chat.id
chat_title: str = message.chat.title or "Unknown"
db = get_db_session()
try:
# Check if chat exists
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat:
chat = Chat(chat_id=chat_id, chat_title=chat_title, bot_is_admin=False)
db.add(chat)
db.commit()
# Check if bot is admin
try:
bot_me = bot.get_me()
if not bot_me:
return
bot_member = bot.get_chat_member(chat_id, bot_me.id)
is_admin = bot_member.status in ['administrator', 'creator']
chat.bot_is_admin = is_admin
db.commit()
if not is_admin:
# Request admin rights
bot.reply_to(
message,
"Привет! Мне нужны права администратора, чтобы видеть список участников чата. "
"Пожалуйста, выдайте мне права администратора."
)
else:
# Sync users and show statistics
sync_chat_users(bot, db, chat_id)
show_statistics(bot, message.chat)
except Exception:
# Bot might not have permission to check
chat.bot_is_admin = False
db.commit()
bot.reply_to(
message,
"Привет! Мне нужны права администратора, чтобы видеть список участников чата. "
"Пожалуйста, выдайте мне права администратора."
)
finally:
db.close()
break
@bot.chat_member_handler()
def handle_chat_member_update(message: telebot.types.ChatMemberUpdated) -> None:
"""Handle chat member updates (including bot becoming admin)."""
bot_me = bot.get_me()
if not bot_me:
return
if message.new_chat_member.user.id == bot_me.id:
# Bot's status changed
chat_id = message.chat.id
chat_title: str = message.chat.title or "Unknown"
db = get_db_session()
try:
chat = db.query(Chat).filter(Chat.chat_id == chat_id).first()
if not chat:
chat = Chat(chat_id=chat_id, chat_title=chat_title, bot_is_admin=False)
db.add(chat)
# Check if bot became admin
is_admin = message.new_chat_member.status in ['administrator', 'creator']
was_admin = chat.bot_is_admin if chat else False
if chat:
chat.bot_is_admin = is_admin
chat.chat_title = chat_title
db.commit()
if is_admin and not was_admin:
# Bot just became admin - sync users and show statistics
sync_chat_users(bot, db, chat_id)
show_statistics(bot, message.chat)
finally:
db.close()
def sync_chat_users(bot: telebot.TeleBot, db: Session, chat_id: int) -> None:
"""Sync users from chat with database."""
try:
# Get all chat members (this requires admin rights)
# Note: This is a simplified approach - in production you might want to
# use get_chat_administrators or iterate through members differently
# For now, we'll sync users as they interact with the bot
# Get all users who already shared birthday
existing_users = db.query(User).all()
# Try to check if they're in this chat and add relationships
for user in existing_users:
try:
member = bot.get_chat_member(chat_id, user.user_id)
if member.status not in ['left', 'kicked']:
# User is in chat - ensure relationship exists
user_chat = db.query(UserChat).filter(
UserChat.user_id == user.user_id,
UserChat.chat_id == chat_id
).first()
if not user_chat:
user_chat = UserChat(user_id=user.user_id, chat_id=chat_id)
db.add(user_chat)
except Exception:
# User might have blocked bot or is not in chat
pass
db.commit()
except Exception:
# If sync fails, continue anyway
pass
def show_statistics(bot: telebot.TeleBot, chat: telebot.types.Chat) -> None:
"""Show statistics about users who shared their birthday."""
db = get_db_session()
try:
chat_id = chat.id
# Get all chat members
try:
members_count = bot.get_chat_member_count(chat_id)
except Exception:
members_count = 0
# Get users from this chat who shared birthday
users_with_birthday = db.query(User).join(UserChat).filter(
UserChat.chat_id == chat_id
).distinct().count()
# Create message
if members_count > 0:
percentage = (users_with_birthday / members_count) * 100
message_text = (
f"Отлично! Теперь я админ этого чата.\n\n"
f"📊 Статистика:\n"
f"• Поделились днем рождения: {users_with_birthday} из {members_count} участников\n"
f"• Процент: {percentage:.1f}%"
)
else:
message_text = (
f"Отлично! Теперь я админ этого чата.\n\n"
f"📊 Поделились днем рождения: {users_with_birthday} участников"
)
# Create inline keyboard with button to start private chat
bot_me = bot.get_me()
if not bot_me or not bot_me.username:
bot.send_message(chat_id, message_text)
return
keyboard = types.InlineKeyboardMarkup()
start_button = types.InlineKeyboardButton(
text="Поделиться днем рождения",
url=f"https://t.me/{bot_me.username}?start=share_birthday"
)
keyboard.add(start_button)
bot.send_message(chat_id, message_text, reply_markup=keyboard)
finally:
db.close()

View File

@ -0,0 +1,305 @@
"""Handlers for private messages."""
import re
from typing import Optional, Dict
import telebot
from telebot import types
from sqlalchemy.orm import Session
from database import get_db_session, User, Chat, UserChat
from messages import THEMES, get_theme_emoji
# User states for conversation flow
user_states: Dict[int, str] = {}
def register_private_handlers(bot: telebot.TeleBot) -> None:
"""Register all private message handlers."""
@bot.message_handler(commands=['start'], chat_types=['private'])
def handle_start(message: telebot.types.Message) -> None:
"""Handle /start command in private chat."""
if not message.from_user:
return
user_id = message.from_user.id
username: Optional[str] = message.from_user.username
first_name: str = message.from_user.first_name or "Пользователь"
db = get_db_session()
try:
# Check if user exists
user = db.query(User).filter(User.user_id == user_id).first()
if user:
# User already exists - ask if they want to update
bot.send_message(
user_id,
f"Привет, {first_name}! Ты уже поделился со мной днем рождения.\n"
f"Используй /update, чтобы обновить свои данные."
)
else:
# New user - ask for birthday
user_states[user_id] = 'waiting_birthday'
bot.send_message(
user_id,
f"Привет, {first_name}! 👋\n\n"
f"Пришли мне свой день рождения в формате ДД.ММ или ДД.ММ.ГГГГ\n"
f"Например: 15.03 или 15.03.1990"
)
finally:
db.close()
@bot.message_handler(commands=['update'], chat_types=['private'])
def handle_update(message: telebot.types.Message) -> None:
"""Handle /update command in private chat."""
if not message.from_user:
return
user_id = message.from_user.id
first_name: str = message.from_user.first_name or "Пользователь"
db = get_db_session()
try:
user = db.query(User).filter(User.user_id == user_id).first()
if user:
user_states[user_id] = 'waiting_birthday'
bot.send_message(
user_id,
f"Хорошо, {first_name}! Пришли мне свой день рождения в формате ДД.ММ или ДД.ММ.ГГГГ"
)
else:
bot.send_message(
user_id,
"Ты еще не поделился со мной днем рождения. Используй /start для начала."
)
finally:
db.close()
@bot.message_handler(func=lambda m: m.chat.type == 'private' and m.from_user and m.from_user.id in user_states and m.text)
def handle_birthday_input(message: telebot.types.Message) -> None:
"""Handle birthday input from user."""
if not message.from_user or not message.text:
return
user_id = message.from_user.id
state = user_states.get(user_id)
if state == 'waiting_birthday':
# Parse birthday
birthday_text = message.text.strip()
parsed = parse_birthday(birthday_text)
if not parsed:
bot.send_message(
user_id,
"Неверный формат даты. Используй формат ДД.ММ или ДД.ММ.ГГГГ\n"
"Например: 15.03 или 15.03.1990"
)
return
day, month, year = parsed
# Validate date
if not is_valid_date(day, month, year):
bot.send_message(
user_id,
"Неверная дата. Проверь правильность дня и месяца."
)
return
# Save birthday and ask for preference
db = get_db_session()
try:
if not message.from_user:
return
username: Optional[str] = message.from_user.username
first_name: str = message.from_user.first_name or "Пользователь"
user = db.query(User).filter(User.user_id == user_id).first()
if user:
# Update existing user
user.birthday_day = day
user.birthday_month = month
user.birthday_year = year
user.first_name = first_name
user.username = username
else:
# Create new user
user = User(
user_id=user_id,
username=username,
first_name=first_name,
birthday_day=day,
birthday_month=month,
birthday_year=year,
preference_theme="Музыка" # Default theme
)
db.add(user)
db.commit()
# Ask for preference theme
user_states[user_id] = 'waiting_preference'
ask_preference_theme(bot, user_id)
finally:
db.close()
elif state == 'waiting_preference':
# User selected preference theme
if not message.text:
return
theme_text = message.text.strip()
if theme_text not in THEMES:
bot.send_message(
user_id,
"Пожалуйста, выбери одну из предложенных тем, нажав на кнопку."
)
return
# Save preference
db = get_db_session()
try:
user = db.query(User).filter(User.user_id == user_id).first()
if user:
user.preference_theme = theme_text
db.commit()
# Update user in all chats
update_user_in_chats(bot, db, user)
bot.send_message(
user_id,
f"Отлично! Я запомнил твои предпочтения: {theme_text}\n\n"
f"Теперь я буду поздравлять тебя с днем рождения во всех чатах, где ты состоишь!"
)
user_states.pop(user_id, None)
finally:
db.close()
@bot.callback_query_handler(func=lambda call: call.data and call.data.startswith('theme_'))
def handle_theme_selection(call: telebot.types.CallbackQuery) -> None:
"""Handle theme selection from inline keyboard."""
if not call.from_user or not call.data or not call.message:
return
user_id = call.from_user.id
if user_states.get(user_id) != 'waiting_preference':
bot.answer_callback_query(call.id, "Это действие недоступно сейчас.")
return
theme = call.data.replace('theme_', '')
if theme not in THEMES:
bot.answer_callback_query(call.id, "Неверная тема.")
return
# Save preference
db = get_db_session()
try:
user = db.query(User).filter(User.user_id == user_id).first()
if user:
user.preference_theme = theme
db.commit()
# Update user in all chats
update_user_in_chats(bot, db, user)
bot.answer_callback_query(call.id, f"Выбрано: {theme}")
bot.edit_message_text(
f"Отлично! Я запомнил твои предпочтения: {theme}\n\n"
f"Теперь я буду поздравлять тебя с днем рождения во всех чатах, где ты состоишь!",
chat_id=call.message.chat.id,
message_id=call.message.message_id
)
user_states.pop(user_id, None)
finally:
db.close()
def parse_birthday(text: str) -> Optional[tuple[int, int, Optional[int]]]:
"""Parse birthday from text. Returns (day, month, year) or None."""
# Try DD.MM.YYYY format
match = re.match(r'^(\d{1,2})\.(\d{1,2})\.(\d{4})$', text)
if match:
day, month, year = int(match.group(1)), int(match.group(2)), int(match.group(3))
return (day, month, year)
# Try DD.MM format
match = re.match(r'^(\d{1,2})\.(\d{1,2})$', text)
if match:
day, month = int(match.group(1)), int(match.group(2))
return (day, month, None)
return None
def is_valid_date(day: int, month: int, year: Optional[int]) -> bool:
"""Check if date is valid."""
if month < 1 or month > 12:
return False
if day < 1 or day > 31:
return False
# Check specific month limits
if month in [4, 6, 9, 11] and day > 30:
return False
if month == 2:
if year:
# Check leap year
if (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0):
max_day = 29
else:
max_day = 28
else:
max_day = 29 # Assume leap year if year not provided
if day > max_day:
return False
return True
def ask_preference_theme(bot: telebot.TeleBot, user_id: int) -> None:
"""Ask user to select preference theme."""
keyboard = types.InlineKeyboardMarkup(row_width=2)
for theme in THEMES:
emoji = get_theme_emoji(theme)
button = types.InlineKeyboardButton(
text=f"{emoji} {theme}",
callback_data=f'theme_{theme}'
)
keyboard.add(button)
bot.send_message(
user_id,
"Что тебе нравится? Выбери одну из тем:",
reply_markup=keyboard
)
def update_user_in_chats(bot: telebot.TeleBot, db: Session, user: User) -> None:
"""Update user information in all chats where bot is admin."""
# Get all chats where bot is admin
admin_chats = db.query(Chat).filter(Chat.bot_is_admin == True).all()
for chat in admin_chats:
try:
# Check if user is member of this chat
member = bot.get_chat_member(chat.chat_id, user.user_id)
if member.status not in ['left', 'kicked']:
# User is in chat - add/update relationship
user_chat = db.query(UserChat).filter(
UserChat.user_id == user.user_id,
UserChat.chat_id == chat.chat_id
).first()
if not user_chat:
user_chat = UserChat(user_id=user.user_id, chat_id=chat.chat_id)
db.add(user_chat)
db.commit()
except Exception:
# User might have blocked bot or bot was removed from chat
pass

93
handlers/scheduler.py Normal file
View File

@ -0,0 +1,93 @@
"""Scheduler for daily birthday notifications."""
import telebot
from datetime import datetime
from typing import Optional
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
from database import get_db_session, User, Chat, UserChat
from messages import format_birthday_greeting
from config import Config
def send_birthday_notifications(bot: telebot.TeleBot) -> None:
"""Send birthday notifications to all chats for users with birthday today."""
db = get_db_session()
try:
today = datetime.now().date()
day = today.day
month = today.month
# Get all users with birthday today
users_with_birthday = db.query(User).filter(
User.birthday_day == day,
User.birthday_month == month
).all()
if not users_with_birthday:
return
# For each user, send greetings to all their chats
for user in users_with_birthday:
# Get all chats where user is a member
user_chats = db.query(Chat).join(UserChat).filter(
UserChat.user_id == user.user_id,
Chat.bot_is_admin == True
).all()
greeting = format_birthday_greeting(user.first_name, user.preference_theme)
for chat in user_chats:
try:
# Check if user is still in chat
member = bot.get_chat_member(chat.chat_id, user.user_id)
if member.status not in ['left', 'kicked']:
bot.send_message(chat.chat_id, greeting)
except telebot.apihelper.ApiTelegramException as e:
# Handle errors: user blocked bot, bot removed from chat, etc.
if e.error_code == 403:
# Bot was blocked or removed from chat
# Update chat status
chat.bot_is_admin = False
db.commit()
elif e.error_code == 400:
# Invalid chat or user
pass
# Silently continue for other errors
except Exception:
# Other errors - continue
pass
finally:
db.close()
def setup_scheduler(bot: telebot.TeleBot) -> BlockingScheduler:
"""Setup and start the scheduler for daily birthday notifications."""
# Parse notification time
time_str: str = Config.NOTIFICATION_TIME
try:
hour, minute = map(int, time_str.split(':'))
except (ValueError, AttributeError):
hour, minute = 9, 0 # Default to 9:00
# Get timezone
timezone_str: str = Config.TIMEZONE
try:
tz = pytz.timezone(timezone_str)
except (pytz.exceptions.UnknownTimeZoneError, AttributeError):
tz = pytz.UTC
# Create scheduler
scheduler = BlockingScheduler(timezone=tz)
# Add daily job
scheduler.add_job(
send_birthday_notifications,
trigger=CronTrigger(hour=hour, minute=minute),
args=[bot],
id='daily_birthday_notifications',
name='Send daily birthday notifications',
replace_existing=True
)
return scheduler