Files
crossfit/session_notifier.py

140 lines
5.5 KiB
Python

import smtplib
import os
import logging
from email.message import EmailMessage
from telegram import Bot
class SessionNotifier:
    """
    Send session-related notifications by email and/or Telegram.

    Attributes:
        email_credentials (dict): Mapping read with the keys 'from', 'to'
            and 'password' when an email is sent.
        telegram_credentials (dict): Mapping read with the keys 'token' and
            'chat_id' when a Telegram message is sent.
        enable_email (bool): Whether to enable email notifications.
        enable_telegram (bool): Whether to enable Telegram notifications.
    """

    def __init__(self, email_credentials, telegram_credentials, enable_email=True, enable_telegram=True):
        """
        Store the credentials and the per-channel enable flags.

        Args:
            email_credentials (dict): Email credentials for authentication.
            telegram_credentials (dict): Telegram credentials for authentication.
            enable_email (bool): Whether to enable email notifications.
            enable_telegram (bool): Whether to enable Telegram notifications.
        """
        self.email_credentials = email_credentials
        self.telegram_credentials = telegram_credentials
        self.enable_email = enable_email
        self.enable_telegram = enable_telegram

    def send_email_notification(self, message):
        """
        Send an email notification with the given message.

        The SMTP host is read from the ``SMTP_SERVER`` environment variable
        and contacted over SSL on port 465.

        Args:
            message (str): The message content to be sent in the email.

        Raises:
            KeyError: If 'from', 'to' or 'password' is missing from the
                email credentials.
            ValueError: If ``SMTP_SERVER`` is unset or empty.
            Exception: Any error raised while connecting, logging in or
                sending is logged and re-raised unchanged.
        """
        logging.debug("Sending email notification")

        sender = self.email_credentials["from"]
        recipient = self.email_credentials["to"]
        # Log the addresses only: the credentials mapping also holds the
        # SMTP password, which must never reach the log output.
        logging.debug("Email from %s to %s", sender, recipient)

        email = EmailMessage()
        email.set_content(message)
        email["From"] = sender
        email["To"] = recipient
        email["Subject"] = "Session Booking Notification"

        # Validate configuration before the try block so this failure is
        # logged once, not a second time by the generic handler below.
        smtp_server = os.environ.get("SMTP_SERVER")
        if not smtp_server:
            logging.error("SMTP server not configured in environment variables")
            raise ValueError("SMTP server not configured")

        try:
            # SMTP_SSL connects as soon as it is constructed with a host,
            # so the "connecting" message has to be emitted beforehand.
            logging.debug(f"Connecting to SMTP server: {smtp_server}")
            with smtplib.SMTP_SSL(smtp_server, 465) as smtp:
                smtp.login(sender, self.email_credentials["password"])
                logging.debug("Logged in to SMTP server")
                smtp.send_message(email)
                logging.debug("Email sent successfully")
        except Exception as e:
            logging.error(f"Failed to send email: {str(e)}")
            raise

    async def send_telegram_notification(self, message):
        """
        Send a Telegram notification with the given message.

        Args:
            message (str): The message content to be sent in the Telegram chat.
        """
        # A new Bot is created for every call from the stored token.
        bot = Bot(token=self.telegram_credentials["token"])
        await bot.send_message(chat_id=self.telegram_credentials["chat_id"], text=message)

    async def _dispatch(self, message):
        """Send ``message`` through every enabled channel, email first."""
        if self.enable_email:
            # NOTE: the email path is synchronous, so it runs to completion
            # before this coroutine continues; if it raises, the Telegram
            # message below is not sent.
            self.send_email_notification(message)
        if self.enable_telegram:
            await self.send_telegram_notification(message)

    async def notify_session_booking(self, session_details):
        """
        Notify about a session booking via email and Telegram.

        Args:
            session_details (str): Details about the booked session.
        """
        await self._dispatch(f"Session booked: {session_details}")

    async def notify_upcoming_session(self, session_details, days_until):
        """
        Notify about an upcoming session via email and Telegram.

        Args:
            session_details (str): Details about the upcoming session.
            days_until (int): Number of days until the session.
        """
        await self._dispatch(
            f"Session available soon: {session_details} (in {days_until} days)"
        )

    async def notify_impossible_booking(self, session_details):
        """
        Notify about an impossible session booking via email and Telegram.

        Args:
            session_details (str): Details about the session that couldn't
                be booked.
        """
        await self._dispatch(f"Failed to book session: {session_details}")