import smtplib import os import logging from email.message import EmailMessage from telegram import Bot class SessionNotifier: """ A class to handle notifications for session bookings. Supports sending notifications via email and Telegram. Attributes: email_credentials (dict): Dictionary containing email credentials (e.g., 'from', 'to', 'password') telegram_credentials (dict): Dictionary containing Telegram credentials (e.g., 'token', 'chat_id') enable_email (bool): Whether to enable email notifications enable_telegram (bool): Whether to enable Telegram notifications """ def __init__(self, email_credentials, telegram_credentials, enable_email=True, enable_telegram=True): """ Initialize the SessionNotifier with email and Telegram credentials. Args: email_credentials (dict): Email credentials for authentication telegram_credentials (dict): Telegram credentials for authentication enable_email (bool): Whether to enable email notifications enable_telegram (bool): Whether to enable Telegram notifications """ self.email_credentials = email_credentials self.telegram_credentials = telegram_credentials self.enable_email = enable_email self.enable_telegram = enable_telegram # Check environment variable for impossible booking notifications self.notify_impossible = os.environ.get("NOTIFY_IMPOSSIBLE_BOOKING", "true").lower() in ("true", "1", "yes") def send_email_notification(self, message): """ Send an email notification with the given message. Args: message (str): The message content to be sent in the email """ logging.debug("Sending email notification") logging.debug(f"Email credentials: {self.email_credentials}") # Create an EmailMessage object email = EmailMessage() email.set_content(message) # Set the email sender and recipient email["From"] = self.email_credentials["from"] email["To"] = self.email_credentials["to"] # Set the email subject email["Subject"] = "Session Booking Notification" # Send the email using smtplib try: smtp_server = os.environ.get("SMTP_SERVER") if not smtp_server: logging.error("SMTP server not configured in environment variables") raise ValueError("SMTP server not configured") with smtplib.SMTP_SSL(smtp_server, 465) as smtp: logging.debug(f"Connecting to SMTP server: {smtp_server}") smtp.login(self.email_credentials["from"], self.email_credentials['password']) logging.debug("Logged in to SMTP server") smtp.send_message(email) logging.debug("Email sent successfully") except Exception as e: logging.error(f"Failed to send email: {str(e)}") raise async def send_telegram_notification(self, message): """ Send a Telegram notification with the given message. Args: message (str): The message content to be sent in the Telegram chat """ # Create a Bot instance with the provided token bot = Bot(token=self.telegram_credentials["token"]) # Send the message to the specified chat ID and await the result await bot.send_message(chat_id=self.telegram_credentials["chat_id"], text=message) async def notify_session_booking(self, session_details): """ Notify about a session booking via email and Telegram. Args: session_details (str): Details about the booked session """ # Create messages for both email and Telegram email_message = f"Session booked: {session_details}" telegram_message = f"Session booked: {session_details}" # Send notifications through enabled channels if self.enable_email: self.send_email_notification(email_message) if self.enable_telegram: await self.send_telegram_notification(telegram_message) async def notify_upcoming_session(self, session_details, days_until): """ Notify about an upcoming session via email and Telegram. Args: session_details (str): Details about the upcoming session days_until (int): Number of days until the session """ # Create messages for both email and Telegram email_message = f"Session available soon: {session_details} (in {days_until} days)" telegram_message = f"Session available soon: {session_details} (in {days_until} days)" # Send notifications through enabled channels if self.enable_email: self.send_email_notification(email_message) if self.enable_telegram: await self.send_telegram_notification(telegram_message) async def notify_impossible_booking(self, session_details, notify_if_impossible=None): """ Notify about an impossible session booking via email and Telegram. Args: session_details (str): Details about the session that couldn't be booked notify_if_impossible (bool, optional): Whether to send notifications for impossible bookings. If None, uses the value from the NOTIFY_IMPOSSIBLE_BOOKING environment variable. """ # Determine if notifications should be sent # First check the method parameter (if provided), then the environment variable should_notify = ( notify_if_impossible if notify_if_impossible is not None else self.notify_impossible ) # Only proceed if notifications for impossible bookings are enabled if not should_notify: return # Create messages for both email and Telegram email_message = f"Failed to book session: {session_details}" telegram_message = f"Failed to book session: {session_details}" # Send notifications through enabled channels if self.enable_email: self.send_email_notification(email_message) if self.enable_telegram: await self.send_telegram_notification(telegram_message)