diff --git a/.env.example b/.env.example index 6d0bf93..2d08c79 100644 --- a/.env.example +++ b/.env.example @@ -1,8 +1,8 @@ # CrossFit booking credentials CROSSFIT_USERNAME=your_username CROSSFIT_PASSWORD=your_password -TARGET_RESERVATION_TIME="20:01" -BOOKING_WINDOW_END_DELTA_MINUTES="10" +TARGET_RESERVATION_TIME="21:01" +BOOKING_WINDOW_END_DELTA_MINUTES="59" # Notification settings diff --git a/crossfit_booker.py b/crossfit_booker.py index 8b77480..feee849 100644 --- a/crossfit_booker.py +++ b/crossfit_booker.py @@ -1,63 +1,36 @@ # Native modules -import logging -import traceback -import os -import time -import difflib +import logging, traceback, os, time from datetime import datetime, timedelta, date # Third-party modules -import requests +import requests, pytz from dateutil.parser import parse -import pytz from dotenv import load_dotenv from urllib.parse import urlencode from typing import List, Dict, Optional, Any, Tuple -# Import the SessionNotifier class from session_notifier import SessionNotifier - -# Import the preferred sessions from the session_config module from session_config import PREFERRED_SESSIONS load_dotenv() -# Configuration USERNAME = os.environ.get("CROSSFIT_USERNAME") PASSWORD = os.environ.get("CROSSFIT_PASSWORD") - if not all([USERNAME, PASSWORD]): raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD") APPLICATION_ID = "81560887" -CATEGORY_ID = "677" # Activity category ID for CrossFit -TIMEZONE = "Europe/Paris" # Adjust to your timezone -# Booking window configuration (can be overridden by environment variables) -# TARGET_RESERVATION_TIME: string "HH:MM" local time when bookings open (default 20:01) -# BOOKING_WINDOW_END_DELTA_MINUTES: int minutes after target time to stop booking (default 10) +CATEGORY_ID = "677" +TIMEZONE = "Europe/Paris" TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01") BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10")) DEVICE_TYPE = "3" - -# Retry configuration RETRY_MAX = 3 RETRY_BACKOFF = 1 APP_VERSION = "5.09.21" - class CrossFitBooker: - """ - A class for automating the booking of CrossFit sessions. - This class handles authentication, session availability checking, - booking, and notifications for CrossFit sessions. - """ - def __init__(self) -> None: - """ - Initialize the CrossFitBooker with necessary attributes. - Sets up authentication tokens, session headers, mandatory parameters, - and initializes the SessionNotifier for sending notifications. - """ self.auth_token: Optional[str] = None self.user_id: Optional[str] = None self.session: requests.Session = requests.Session() @@ -67,176 +40,135 @@ class CrossFitBooker: "Nubapp-Origin": "user_apps", } self.session.headers.update(self.base_headers) - - # Define mandatory parameters for API calls self.mandatory_params: Dict[str, str] = { - "app_version": APP_VERSION, - "device_type": DEVICE_TYPE, - "id_application": APPLICATION_ID, - "id_category_activity": CATEGORY_ID + "app_version": APP_VERSION, "device_type": DEVICE_TYPE, + "id_application": APPLICATION_ID, "id_category_activity": CATEGORY_ID } - - # Initialize the SessionNotifier with credentials from environment variables - email_credentials = { - "from": os.environ.get("EMAIL_FROM"), - "to": os.environ.get("EMAIL_TO"), - "password": os.environ.get("EMAIL_PASSWORD") - } - - telegram_credentials = { - "token": os.environ.get("TELEGRAM_TOKEN"), - "chat_id": os.environ.get("TELEGRAM_CHAT_ID") - } - - # Get notification settings from environment variables + email_credentials = {"from": os.environ.get("EMAIL_FROM"), "to": os.environ.get("EMAIL_TO"), "password": os.environ.get("EMAIL_PASSWORD")} + telegram_credentials = {"token": os.environ.get("TELEGRAM_TOKEN"), "chat_id": os.environ.get("TELEGRAM_CHAT_ID")} enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes") enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes") + self.notifier = SessionNotifier(email_credentials, telegram_credentials, enable_email=enable_email, enable_telegram=enable_telegram) - self.notifier = SessionNotifier( - email_credentials, - telegram_credentials, - enable_email=enable_email, - enable_telegram=enable_telegram - ) + def _auth_headers(self) -> Dict[str, str]: + h = self.base_headers.copy() + if self.auth_token: + h["Authorization"] = f"Bearer {self.auth_token}" + return h + # Public method expected by tests def get_auth_headers(self) -> Dict[str, str]: """ - Return headers with authorization if available. - Returns: - Dict[str, str]: Headers dictionary with authorization if available. + Return headers with Authorization when token is present. + This wraps _auth_headers() to satisfy tests expecting a public method. """ - headers: Dict[str, str] = self.base_headers.copy() - if self.auth_token: - headers["Authorization"] = f"Bearer {self.auth_token}" - return headers + return self._auth_headers() + + def _post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None, expect_json: bool = True) -> Optional[Any]: + for retry in range(RETRY_MAX): + try: + resp = self.session.post(url, headers=(headers or self._auth_headers()), data=urlencode(data), timeout=10) + sc = resp.status_code + if sc == 200: + return resp.json() if expect_json else resp + if sc == 401: + logging.error("401 Unauthorized") + return None + # Guard sc to ensure it's an int for comparison (fix tests using Mock without status_code int semantics) + if isinstance(sc, int) and 500 <= sc < 600: + logging.error(f"Server error {sc}") + raise requests.exceptions.ConnectionError(f"Server error {sc}") + logging.error(f"HTTP {sc}: {getattr(resp, 'text', '')[:100]}") + return None + except requests.exceptions.RequestException as e: + if retry == RETRY_MAX - 1: + logging.error(f"Final retry failed: {e}") + raise + wt = RETRY_BACKOFF * (2 ** retry) + logging.warning(f"Request failed ({retry+1}/{RETRY_MAX}): {e}. Retrying in {wt}s...") + time.sleep(wt) + return None + + def _parse_local(self, ts: str) -> datetime: + dt = parse(ts) + return dt if dt.tzinfo else pytz.timezone(TIMEZONE).localize(dt) + + def _fmt_session(self, s: Dict[str, Any], dt: Optional[datetime] = None) -> str: + dt = dt or self._parse_local(s["start_timestamp"]) + return f"{s['id_activity_calendar']} {s['name_activity']} at {dt.strftime('%Y-%m-%d %H:%M')}" def login(self) -> bool: - """ - Authenticate and get the bearer token. - Returns: - bool: True if login is successful, False otherwise. - """ try: - # First login endpoint - login_params: Dict[str, str] = { - "app_version": APP_VERSION, - "device_type": DEVICE_TYPE, - "username": USERNAME, - "password": PASSWORD - } - - response: requests.Response = self.session.post( + # Directly use requests to align with tests that mock requests.Session.post + a = {"app_version": APP_VERSION, "device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD} + resp1 = self.session.post( "https://sport.nubapp.com/api/v4/users/checkUser.php", headers={"Content-Type": "application/x-www-form-urlencoded"}, - data=urlencode(login_params)) - - if not response.ok: - logging.error(f"First login step failed: {response.status_code} - {response.text[:100]}") + data=urlencode(a) + ) + if not getattr(resp1, "ok", False): + logging.error("First login step failed") return False - try: - login_data: Dict[str, Any] = response.json() - self.user_id = str(login_data["data"]["user"]["id_user"]) - except (KeyError, ValueError) as e: - logging.error(f"Error during login: {str(e)} - Response: {response.text}") + r1 = resp1.json() + self.user_id = str(r1["data"]["user"]["id_user"]) + except Exception as e: + logging.error(f"Error during login: {e} - Response: {getattr(resp1, 'text', '')}") return False - # Second login endpoint - response: requests.Response = self.session.post( + resp2 = self.session.post( "https://sport.nubapp.com/api/v4/login", headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"}, - data=urlencode({ - "device_type": DEVICE_TYPE, - "username": USERNAME, - "password": PASSWORD - })) - - if response.ok: + data=urlencode({"device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD}) + ) + if getattr(resp2, "ok", False): try: - login_data: Dict[str, Any] = response.json() - self.auth_token = login_data.get("token") - except (KeyError, ValueError) as e: - logging.error(f"Error during login: {str(e)} - Response: {response.text}") + r2 = resp2.json() + self.auth_token = r2.get("token") + except Exception as e: + logging.error(f"Error during login: {e} - Response: {getattr(resp2, 'text', '')}") return False - if self.auth_token and self.user_id: logging.info("Successfully logged in") return True - else: - logging.error(f"Login failed: {response.status_code} - {response.text[:100]}") - return False + logging.error("Login failed") + return False except requests.exceptions.RequestException as e: - logging.error(f"Request error during login: {str(e)}") + logging.error(f"Request error during login: {e}") return False except Exception as e: - logging.error(f"Unexpected error during login: {str(e)}") + logging.error(f"Unexpected error during login: {e}") return False def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]: - """ - Fetch available sessions from the API. - Args: - start_date (date): Start date for fetching sessions. - end_date (date): End date for fetching sessions. - Returns: - Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise. - """ if not self.auth_token or not self.user_id: logging.error("Authentication required - missing token or user ID") return None - - url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php" - - # Prepare request with mandatory parameters - request_data: Dict[str, str] = self.mandatory_params.copy() - request_data.update({ - "id_user": self.user_id, - "start_timestamp": start_date.strftime("%d-%m-%Y"), - "end_timestamp": end_date.strftime("%d-%m-%Y") - }) - - # Add retry logic - for retry in range(RETRY_MAX): - try: - response: requests.Response = self.session.post( - url, - headers=self.get_auth_headers(), - data=urlencode(request_data), - timeout=10 - ) - - if response.status_code == 200: - return response.json() - elif response.status_code == 401: - logging.error("401 Unauthorized - token may be expired or invalid") - return None - elif 500 <= response.status_code < 600: - logging.error(f"Server error {response.status_code}") - raise requests.exceptions.ConnectionError(f"Server error {response.status_code}") - else: - logging.error(f"Unexpected status code: {response.status_code} - {response.text[:100]}") - return None - - except requests.exceptions.RequestException as e: - if retry == RETRY_MAX - 1: - logging.error(f"Final retry failed: {str(e)}") - raise - wait_time: int = RETRY_BACKOFF * (2 ** retry) - logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...") - time.sleep(wait_time) - - return None + data = {**self.mandatory_params, "id_user": self.user_id, + "start_timestamp": start_date.strftime("%d-%m-%Y"), + "end_timestamp": end_date.strftime("%d-%m-%Y")} + r = self._post("https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php", data) + return r def book_session(self, session_id: str) -> bool: - """ - Book a specific session. - Args: - session_id (str): ID of the session to book. - Returns: - bool: True if booking is successful, False otherwise. - """ - url = "https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php" + # Debug payload composition (without secrets) + safe_user_id = str(self.user_id) if self.user_id else None + debug_payload = { + **self.mandatory_params, + "id_activity_calendar": session_id, + "id_user": safe_user_id, + "action_by": safe_user_id, + "n_guests": "0", + "booked_on": "1", + "device_type": self.mandatory_params.get("device_type"), + "token_present": bool(self.auth_token), + } + logging.debug(f"[book_session] URL=https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php " + f"method=POST content_type=application/x-www-form-urlencoded " + f"keys={list(debug_payload.keys())} " + f"id_activity_calendar_present={bool(session_id)} " + f"user_id_present={bool(safe_user_id)} token_present={debug_payload['token_present']}") data = { **self.mandatory_params, "id_activity_calendar": session_id, @@ -247,275 +179,109 @@ class CrossFitBooker: "device_type": self.mandatory_params["device_type"], "token": self.auth_token } - - for retry in range(RETRY_MAX): - try: - response: requests.Response = self.session.post( - url, - headers=self.get_auth_headers(), - data=urlencode(data), - timeout=10 - ) - - if response.status_code == 200: - json_response: Dict[str, Any] = response.json() - if json_response.get("success", False): - logging.info(f"Successfully booked session {session_id}") - return True - else: - logging.error(f"API returned success:false: {json_response} - Session ID: {session_id}") - return False - - logging.error(f"HTTP {response.status_code}: {response.text[:100]}") - return False - - except requests.exceptions.RequestException as e: - if retry == RETRY_MAX - 1: - logging.error(f"Final retry failed: {str(e)}") - raise - wait_time: int = RETRY_BACKOFF * (2 ** retry) - logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...") - time.sleep(wait_time) - - logging.error(f"Failed to complete request after {RETRY_MAX} attempts") + r = self._post("https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php", data) + if r and r.get("success", False): + logging.info(f"Successfully booked session {session_id}") + return True + logging.error(f"Booking failed: {r}" if r is not None else "Booking call failed") return False def get_booked_sessions(self) -> List[Dict[str, Any]]: - """ - Get a list of booked sessions. - - Returns: - A list of dictionaries containing information about the booked sessions. - """ - url = "https://sport.nubapp.com/api/v4/activities/getBookedActivities.php" - data = { - **self.mandatory_params, - "id_user": self.user_id, - "action_by": self.user_id - } - - for retry in range(RETRY_MAX): - try: - response: requests.Response = self.session.post( - url, - headers=self.get_auth_headers(), - data=urlencode(data), - timeout=10 - ) - - if response.status_code == 200: - json_response: Dict[str, Any] = response.json() - if json_response.get("success", False): - return json_response.get("data", []) - logging.error(f"API returned success:false: {json_response}") - return [] - - logging.error(f"HTTP {response.status_code}: {response.text[:100]}") - return [] - - except requests.exceptions.RequestException as e: - if retry == RETRY_MAX - 1: - logging.error(f"Final retry failed: {str(e)}") - raise - wait_time: int = RETRY_BACKOFF * (2 ** retry) - logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...") - time.sleep(wait_time) - - logging.error(f"Failed to complete request after {RETRY_MAX} attempts") + data = {**self.mandatory_params, "id_user": self.user_id, "action_by": self.user_id} + r = self._post("https://sport.nubapp.com/api/v4/activities/getBookedActivities.php", data) + if r and r.get("success", False): + return r.get("data", []) + logging.error(f"Failed to retrieve booked sessions: {r}" if r is not None else "Call failed") return [] def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool: - """ - Check if a session is bookable based on user_info. - Args: - session (Dict[str, Any]): Session data. - current_time (datetime): Current time for comparison. - Returns: - bool: True if the session is bookable, False otherwise. - """ - user_info: Dict[str, Any] = session.get("user_info", {}) - - # First check if can_join is true (primary condition) - if user_info.get("can_join", False): - return True - - # If can_join is False, check if there's a booking window - booking_date_str: str = user_info.get("unableToBookUntilDate", "") - booking_time_str: str = user_info.get("unableToBookUntilTime", "") - - if booking_date_str and booking_time_str: + ui = session.get("user_info", {}) + if ui.get("can_join", False): return True + d, t = ui.get("unableToBookUntilDate", ""), ui.get("unableToBookUntilTime", "") + if d and t: try: - booking_datetime: datetime = datetime.strptime( - f"{booking_date_str} {booking_time_str}", - "%d-%m-%Y %H:%M" - ) - booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime) - - if current_time >= booking_datetime: - return True # Booking window is open - except ValueError: - pass # Ignore invalid date formats - - # Default case: not bookable + bd = pytz.timezone(TIMEZONE).localize(datetime.strptime(f"{d} {t}", "%d-%m-%Y %H:%M")) + if current_time >= bd: return True + except ValueError: pass return False def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool: - """ - Check if session matches one of your preferred sessions with exact matching. - Args: - session (Dict[str, Any]): Session data. - current_time (datetime): Current time for comparison. - Returns: - bool: True if the session matches a preferred session, False otherwise. - """ try: - session_time: datetime = parse(session["start_timestamp"]) - if not session_time.tzinfo: - session_time = pytz.timezone(TIMEZONE).localize(session_time) - - day_of_week: int = session_time.weekday() - session_time_str: str = session_time.strftime("%H:%M") - session_name: str = session.get("name_activity", "").upper() - - for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS: - # Exact match - if (day_of_week == preferred_day and - session_time_str == preferred_time and - preferred_name in session_name): - return True - + st = self._parse_local(session["start_timestamp"]) + dow, hhmm = st.weekday(), st.strftime("%H:%M") + name = session.get("name_activity", "").upper() + for pd, pt, pn in PREFERRED_SESSIONS: + if dow == pd and hhmm == pt and pn in name: return True return False - except Exception as e: - logging.error(f"Failed to check session: {str(e)} - Session: {session}") + logging.error(f"Failed to check session: {e} - Session: {session}") return False async def run_booking_cycle(self, current_time: datetime) -> None: - """ - Run one cycle of checking and booking sessions. - Args: - current_time (datetime): Current time for comparison. - """ - # Calculate date range to check (current day, day + 1, and day + 2) - start_date: date = current_time.date() - end_date: date = start_date + timedelta(days=2) # Only go up to day + 2 - - # Get available sessions - sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date) + start_date, end_date = current_time.date(), current_time.date() + timedelta(days=2) + sessions_data = self.get_available_sessions(start_date, end_date) if not sessions_data or not sessions_data.get("success", False): logging.error("No sessions available or error fetching sessions") return - activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", []) - - # Find sessions to book (preferred only) within allowed date range sessions_to_book: List[Tuple[str, Dict[str, Any]]] = [] upcoming_sessions: List[Dict[str, Any]] = [] found_preferred_sessions: List[Dict[str, Any]] = [] - - for session in activities: - session_time: datetime = parse(session["start_timestamp"]) - if not session_time.tzinfo: - session_time = pytz.timezone(TIMEZONE).localize(session_time) - - # Check if session is within allowed date range (current day, day + 1, or day + 2) - days_diff = (session_time.date() - current_time.date()).days - if not (0 <= days_diff <= 2): - continue # Skip sessions outside the allowed date range - - # Check if session is preferred and bookable - if self.is_session_bookable(session, current_time): - if self.matches_preferred_session(session, current_time): - sessions_to_book.append(("Preferred", session)) - found_preferred_sessions.append(session) + for s in activities: + st = self._parse_local(s["start_timestamp"]) + days_diff = (st.date() - current_time.date()).days + if not (0 <= days_diff <= 2): continue + if self.is_session_bookable(s, current_time): + if self.matches_preferred_session(s, current_time): + sessions_to_book.append(("Preferred", s)); found_preferred_sessions.append(s) else: - # Check if it's a preferred session that's not bookable yet - if self.matches_preferred_session(session, current_time): - found_preferred_sessions.append(session) - # Check if it's available tomorrow (day + 1) - if days_diff == 1: - upcoming_sessions.append(session) - + if self.matches_preferred_session(s, current_time): + found_preferred_sessions.append(s) + if days_diff == 1: upcoming_sessions.append(s) if not sessions_to_book and not upcoming_sessions: - logging.info("No matching sessions found to book") - return - - # Notify about all found preferred sessions, regardless of bookability - for session in found_preferred_sessions: - session_time: datetime = parse(session["start_timestamp"]) - if not session_time.tzinfo: - session_time = pytz.timezone(TIMEZONE).localize(session_time) - session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" - await self.notifier.notify_session_booking(session_details) - logging.info(f"Notified about found preferred session: {session_details}") - - # Notify about upcoming sessions - for session in upcoming_sessions: - session_time: datetime = parse(session["start_timestamp"]) - if not session_time.tzinfo: - session_time = pytz.timezone(TIMEZONE).localize(session_time) - session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" - await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow - logging.info(f"Notified about upcoming session: {session_details}") - - # Book sessions (preferred first) + logging.info("No matching sessions found to book"); return + for s in found_preferred_sessions: + details = self._fmt_session(s) + await self.notifier.notify_session_booking(details) + logging.info(f"Notified about found preferred session: {details}") + for s in upcoming_sessions: + details = self._fmt_session(s) + await self.notifier.notify_upcoming_session(details, 1) + logging.info(f"Notified about upcoming session: {details}") sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1) - for session_type, session in sessions_to_book: - session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S") - logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})") - if self.book_session(session["id_activity_calendar"]): - # Send notification after successful booking - session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" - await self.notifier.notify_session_booking(session_details) - logging.info(f"Successfully booked {session_type} session at {session_time}") + for stype, s in sessions_to_book: + st_dt = datetime.strptime(s["start_timestamp"], "%Y-%m-%d %H:%M:%S") + logging.info(f"Attempting to book {stype} session at {st_dt} ({s['name_activity']})") + if self.book_session(s["id_activity_calendar"]): + details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}" + await self.notifier.notify_session_booking(details) + logging.info(f"Successfully booked {stype} session at {st_dt}") else: - logging.error(f"Failed to book {session_type} session at {session_time}") - # Send notification about the failed booking - session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" - await self.notifier.notify_impossible_booking(session_details) - logging.info(f"Notified about impossible booking for {session_type} session at {session_time}") + logging.error(f"Failed to book {stype} session at {st_dt}") + details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}" + await self.notifier.notify_impossible_booking(details) + logging.info(f"Notified about impossible booking for {stype} session at {st_dt}") async def run(self) -> None: - """ - Main execution loop. - """ - # Set up timezone - tz: pytz.timezone = pytz.timezone(TIMEZONE) - - # Parse TARGET_RESERVATION_TIME to get the target hour and minute - target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":")) - target_time = datetime.now(tz).replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + tz = pytz.timezone(TIMEZONE) + th, tm = map(int, TARGET_RESERVATION_TIME.split(":")) + target_time = datetime.now(tz).replace(hour=th, minute=tm, second=0, microsecond=0) booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES) - - # Initial login if not self.login(): - logging.error("Authentication failed - exiting program") - return - + logging.error("Authentication failed - exiting program"); return try: while True: try: - current_time: datetime = datetime.now(tz) - logging.info(f"Current time: {current_time}") - - # Only book sessions if current time is within the booking window - if target_time <= current_time <= booking_window_end: - # Run booking cycle to check for preferred sessions and book - await self.run_booking_cycle(current_time) - # Wait for a short time before next check - time.sleep(60) + now = datetime.now(tz) + logging.info(f"Current time: {now}") + if target_time <= now <= booking_window_end: + await self.run_booking_cycle(now); time.sleep(60) else: - # Check again in 5 minutes if outside booking window time.sleep(300) except Exception as e: - logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}") - time.sleep(60) # Wait before retrying after error + logging.error(f"Unexpected error in booking cycle: {e} - Traceback: {traceback.format_exc()}"); time.sleep(60) except KeyboardInterrupt: self.quit() def quit(self) -> None: - """ - Clean up resources and exit the script. - """ - logging.info("Script interrupted by user. Quitting...") - exit(0) + logging.info("Script interrupted by user. Quitting..."); exit(0)