Files
crossfit/crossfit_booker.py
kbe 944421c68b refactor: Booking preferred sessions works
I modified the code to book only preferred sessions.
First it displays available sessions in console INFO.
Then, if there are sessions that match the preferred ones,
it tries to book them and then sends a notification about it.
2025-08-12 00:20:21 +02:00

540 lines
22 KiB
Python

# Native modules
import logging
import traceback
import os
import time
import difflib
from datetime import datetime, timedelta, date
# Third-party modules
import requests
from dateutil.parser import parse
import pytz
from dotenv import load_dotenv
from urllib.parse import urlencode
from typing import List, Dict, Optional, Any, Tuple
# Import the SessionNotifier class
from session_notifier import SessionNotifier
# Import the preferred sessions from the session_config module
from session_config import PREFERRED_SESSIONS
# Pull settings from a local .env file (python-dotenv) before reading os.environ
load_dotenv()

# --- Credentials (mandatory) -------------------------------------------------
USERNAME = os.getenv("CROSSFIT_USERNAME")
PASSWORD = os.getenv("CROSSFIT_PASSWORD")
if not (USERNAME and PASSWORD):
    # Fail at import time: nothing below can work without both credentials
    raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")

# --- Fixed API parameters ----------------------------------------------------
APPLICATION_ID = "81560887"
CATEGORY_ID = "677"  # Activity category ID for CrossFit
DEVICE_TYPE = "3"
APP_VERSION = "5.09.21"

# --- Scheduling --------------------------------------------------------------
TIMEZONE = "Europe/Paris"  # Adjust to your timezone
# Both values below can be overridden through environment variables:
#   TARGET_RESERVATION_TIME          "HH:MM" local time when bookings open (default 20:01)
#   BOOKING_WINDOW_END_DELTA_MINUTES minutes after the target time to stop booking (default 10)
TARGET_RESERVATION_TIME = os.getenv("TARGET_RESERVATION_TIME", "20:01")
BOOKING_WINDOW_END_DELTA_MINUTES = int(os.getenv("BOOKING_WINDOW_END_DELTA_MINUTES", "10"))

# --- Retry policy ------------------------------------------------------------
RETRY_MAX = 3      # attempts per HTTP request
RETRY_BACKOFF = 1  # base delay in seconds; doubled after every failed attempt
class CrossFitBooker:
"""
A class for automating the booking of CrossFit sessions.
This class handles authentication, session availability checking,
booking, and notifications for CrossFit sessions.
"""
def __init__(self) -> None:
    """
    Set up the HTTP session, the API parameters and the notifier.

    No network call is made here. Authentication state (`auth_token`,
    `user_id`) starts empty and is filled in by `login()`. Notification
    credentials and on/off switches are read from environment variables.
    """
    def _flag(name: str) -> bool:
        # A switch counts as enabled when unset, or set to true/1/yes (any case)
        return os.environ.get(name, "true").lower() in ("true", "1", "yes")

    # Authentication state, populated by login()
    self.auth_token: Optional[str] = None
    self.user_id: Optional[str] = None

    # Headers shared by every request; also installed as the session defaults
    self.base_headers: Dict[str, str] = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
        "Content-Type": "application/x-www-form-urlencoded",
        "Nubapp-Origin": "user_apps",
    }
    self.session: requests.Session = requests.Session()
    self.session.headers.update(self.base_headers)

    # Parameters merged into the payload of every API call
    self.mandatory_params: Dict[str, str] = dict(
        app_version=APP_VERSION,
        device_type=DEVICE_TYPE,
        id_application=APPLICATION_ID,
        id_category_activity=CATEGORY_ID,
    )

    # Notifier wiring: credentials first, then the two on/off switches
    self.notifier = SessionNotifier(
        {
            "from": os.environ.get("EMAIL_FROM"),
            "to": os.environ.get("EMAIL_TO"),
            "password": os.environ.get("EMAIL_PASSWORD"),
        },
        {
            "token": os.environ.get("TELEGRAM_TOKEN"),
            "chat_id": os.environ.get("TELEGRAM_CHAT_ID"),
        },
        enable_email=_flag("ENABLE_EMAIL_NOTIFICATIONS"),
        enable_telegram=_flag("ENABLE_TELEGRAM_NOTIFICATIONS"),
    )
def get_auth_headers(self) -> Dict[str, str]:
"""
Return headers with authorization if available.
Returns:
Dict[str, str]: Headers dictionary with authorization if available.
"""
headers: Dict[str, str] = self.base_headers.copy()
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
return headers
def login(self) -> bool:
    """
    Authenticate and get the bearer token.

    Two requests are made: `checkUser.php` yields the user id, then
    `/login` yields the token. On success both `self.user_id` and
    `self.auth_token` are set.

    Returns:
        bool: True if login is successful, False otherwise. Every failure
        path (HTTP error, malformed body, missing token, network error)
        is logged and reported as False; no exception escapes.
    """
    try:
        # Step 1: resolve the user id
        check_response = self.session.post(
            "https://sport.nubapp.com/api/v4/users/checkUser.php",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=urlencode({
                "app_version": APP_VERSION,
                "device_type": DEVICE_TYPE,
                "username": USERNAME,
                "password": PASSWORD,
            }),
            timeout=10,  # same limit as the other API calls; avoids hanging forever
        )
        if not check_response.ok:
            logging.error(f"First login step failed: {check_response.status_code} - {check_response.text[:100]}")
            return False
        try:
            self.user_id = str(check_response.json()["data"]["user"]["id_user"])
        except (KeyError, TypeError, ValueError) as e:
            # TypeError covers a payload where "data" or "user" is null
            logging.error(f"Error during login: {str(e)} - Response: {check_response.text}")
            return False

        # Step 2: obtain the bearer token
        login_response = self.session.post(
            "https://sport.nubapp.com/api/v4/login",
            headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
            data=urlencode({
                "device_type": DEVICE_TYPE,
                "username": USERNAME,
                "password": PASSWORD,
            }),
            timeout=10,
        )
        if not login_response.ok:
            logging.error(f"Login failed: {login_response.status_code} - {login_response.text[:100]}")
            return False
        try:
            self.auth_token = login_response.json().get("token")
        except (AttributeError, ValueError) as e:
            # AttributeError covers a JSON body that is not an object
            logging.error(f"Error during login: {str(e)} - Response: {login_response.text}")
            return False

        if self.auth_token and self.user_id:
            logging.info("Successfully logged in")
            return True
        # A 2xx answer without a token is still a failed login
        logging.error(f"Login failed: {login_response.status_code} - {login_response.text[:100]}")
        return False
    except requests.exceptions.RequestException as e:
        logging.error(f"Request error during login: {str(e)}")
        return False
    except Exception as e:
        logging.error(f"Unexpected error during login: {str(e)}")
        return False
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
"""
Fetch available sessions from the API.
Args:
start_date (date): Start date for fetching sessions.
end_date (date): End date for fetching sessions.
Returns:
Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise.
"""
if not self.auth_token or not self.user_id:
logging.error("Authentication required - missing token or user ID")
return None
url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
# Prepare request with mandatory parameters
request_data: Dict[str, str] = self.mandatory_params.copy()
request_data.update({
"id_user": self.user_id,
"start_timestamp": start_date.strftime("%d-%m-%Y"),
"end_timestamp": end_date.strftime("%d-%m-%Y")
})
# Add retry logic
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(request_data),
timeout=10
)
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
logging.error("401 Unauthorized - token may be expired or invalid")
return None
elif 500 <= response.status_code < 600:
logging.error(f"Server error {response.status_code}")
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
else:
logging.error(f"Unexpected status code: {response.status_code} - {response.text[:100]}")
return None
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
return None
def book_session(self, session_id: str) -> bool:
"""
Book a specific session.
Args:
session_id (str): ID of the session to book.
Returns:
bool: True if booking is successful, False otherwise.
"""
url = "https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php"
data = {
**self.mandatory_params,
"id_activity_calendar": session_id,
"id_user": self.user_id,
"action_by": self.user_id,
"n_guests": "0",
"booked_on": "1",
"device_type": self.mandatory_params["device_type"],
"token": self.auth_token
}
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
logging.info(f"Successfully booked session {session_id}")
return True
else:
logging.error(f"API returned success:false: {json_response} - Session ID: {session_id}")
return False
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return False
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return False
def get_booked_sessions(self) -> List[Dict[str, Any]]:
"""
Get a list of booked sessions.
Returns:
A list of dictionaries containing information about the booked sessions.
"""
url = "https://sport.nubapp.com/api/v4/activities/getBookedActivities.php"
data = {
**self.mandatory_params,
"id_user": self.user_id,
"action_by": self.user_id
}
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
return json_response.get("data", [])
logging.error(f"API returned success:false: {json_response}")
return []
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return []
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return []
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
    """
    Check if a session is bookable based on its `user_info.can_join` flag.

    Args:
        session (Dict[str, Any]): Session data.
        current_time (datetime): Currently unused; kept so the signature
            stays stable for callers.

    Returns:
        bool: True if `user_info.can_join` is truthy. False otherwise,
        including when `user_info` is missing or null.
    """
    # `or {}` also covers a payload that carries "user_info": null
    user_info: Dict[str, Any] = session.get("user_info") or {}
    return bool(user_info.get("can_join", False))
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
    """
    Check if a session matches one of the entries in PREFERRED_SESSIONS.

    A session matches a `(weekday, "HH:MM", name)` entry when the weekday
    and start time are equal and the preferred name occurs in the
    session's activity name, compared case-insensitively.

    Args:
        session (Dict[str, Any]): Session data.
        current_time (datetime): Currently unused; kept so the signature
            stays stable for callers.

    Returns:
        bool: True if the session matches a preferred session. False
        otherwise, including when the session cannot be inspected (the
        error is logged).
    """
    try:
        session_time: datetime = parse(session["start_timestamp"])
        if not session_time.tzinfo:
            # Naive timestamps are taken to be in the configured local timezone
            session_time = pytz.timezone(TIMEZONE).localize(session_time)
        day_of_week: int = session_time.weekday()
        session_time_str: str = session_time.strftime("%H:%M")
        # A missing or null activity name simply never matches
        session_name: str = (session.get("name_activity") or "").upper()
        return any(
            day_of_week == preferred_day
            and session_time_str == preferred_time
            # Upper-case both sides so a mixed-case preference still matches
            and preferred_name.upper() in session_name
            for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS
        )
    except Exception as e:
        # Best effort: an uninspectable session must not abort the booking cycle
        logging.error(f"Failed to check session: {str(e)} - Session: {session}")
        return False
def display_upcoming_sessions(self, sessions: List[Dict[str, Any]], current_time: datetime) -> None:
    """
    Log a small table (ID, name, date, time) of the given sessions.

    Args:
        sessions (List[Dict[str, Any]]): List of session data.
        current_time (datetime): Not used by this method.
    """
    if not sessions:
        logging.info("No sessions to display")
        return

    # Table header
    for header_line in ("Upcoming sessions:", "ID\t\tName\t\tDate\t\tTime", "=" * 50):
        logging.info(header_line)

    for entry in sessions:
        starts_at: datetime = parse(entry["start_timestamp"])
        if not starts_at.tzinfo:
            # Naive timestamps are localized to the configured timezone
            starts_at = pytz.timezone(TIMEZONE).localize(starts_at)
        entry_id = entry.get("id_activity_calendar", "N/A")
        entry_name = entry.get("name_activity", "N/A")
        # One tab-separated row per session
        logging.info(f"{entry_id}\t{entry_name}\t{starts_at:%Y-%m-%d}\t{starts_at:%H:%M}")
async def booker(self, current_time: datetime) -> None:
    """
    Run one cycle of checking and booking sessions.

    Fetches the calendar for today through day + 2, logs every session,
    keeps the ones that are both bookable and preferred, tries to book
    each of them and sends a success or failure notification per session.

    Args:
        current_time (datetime): Current time for comparison.
    """
    # Date range to check: current day, day + 1 and day + 2
    today: date = current_time.date()
    sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(today, today + timedelta(days=2))
    if not sessions_data or not sessions_data.get("success", False):
        logging.error("No sessions available or error fetching sessions")
        return

    # "data" / "activities_calendar" may be present but null; treat as empty
    activities: List[Dict[str, Any]] = (sessions_data.get("data") or {}).get("activities_calendar") or []

    # Display all available sessions within the date range
    self.display_upcoming_sessions(activities, current_time)

    # Keep each preferred session together with its start time, parsed once
    preferred: List[Tuple[Dict[str, Any], datetime]] = []
    for session in activities:
        session_time: datetime = parse(session["start_timestamp"])
        if not session_time.tzinfo:
            session_time = pytz.timezone(TIMEZONE).localize(session_time)
        # Skip sessions outside the allowed range (current day .. day + 2)
        if not 0 <= (session_time.date() - today).days <= 2:
            continue
        if self.is_session_bookable(session, current_time) and self.matches_preferred_session(session, current_time):
            preferred.append((session, session_time))

    if not preferred:
        logging.info("No matching preferred sessions found")
        logging.info("No matching sessions found to book")
        return

    logging.info("Preferred sessions found:")
    for session, session_time in preferred:
        logging.info(f"ID: {session['id_activity_calendar']}, Name: {session['name_activity']}, Time: {session_time.strftime('%Y-%m-%d %H:%M')}")

    session_type = "Preferred"  # only preferred sessions are ever booked
    booked: List[Tuple[Dict[str, Any], datetime]] = []
    for session, session_time in preferred:
        logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
        try:
            success = self.book_session(session["id_activity_calendar"])
        except requests.exceptions.RequestException as e:
            # A network failure on one session must not stop the remaining
            # sessions from being tried, nor suppress the failure notification
            logging.error(f"Booking request failed for session {session['id_activity_calendar']}: {str(e)}")
            success = False

        session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
        if success:
            booked.append((session, session_time))
            logging.info(f"Successfully booked {session_type} session at {session_time}")
            await self.notifier.notify_session_booking(session_details)
        else:
            logging.error(f"Failed to book {session_type} session at {session_time}")
            await self.notifier.notify_impossible_booking(session_details)
            logging.info(f"Notified about impossible booking for {session_type} session at {session_time}")

    # Summary of what was actually booked in this cycle
    if booked:
        logging.info("Booked sessions:")
        for session, session_time in booked:
            logging.info(f"ID: {session['id_activity_calendar']}, Name: {session['name_activity']}, Time: {session_time.strftime('%Y-%m-%d %H:%M')}")
    else:
        logging.info("No sessions were booked")
async def run(self) -> None:
    """
    Main execution loop.

    Logs in once, then polls forever. The booking window (opening at
    TARGET_RESERVATION_TIME, lasting BOOKING_WINDOW_END_DELTA_MINUTES) is
    recomputed on every pass, so the loop keeps working across days.
    Inside the window a booking cycle runs every 60 seconds. Outside it
    the loop idles for up to 5 minutes, waking earlier when the window
    is about to open. Ctrl-C ends the loop through `quit()`.

    Raises:
        ValueError: If TARGET_RESERVATION_TIME is not a valid "HH:MM" time.
    """
    tz = pytz.timezone(TIMEZONE)
    target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":"))
    window_length = timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
    active_poll_seconds = 60   # pause between booking cycles inside the window
    idle_poll_seconds = 300    # longest pause while outside the window

    def _window_opening(day: date) -> datetime:
        # localize() picks the correct UTC offset for that day, which
        # replace() on an already-aware datetime does not do across DST changes
        return tz.localize(datetime(day.year, day.month, day.day, target_hour, target_minute))

    # Fail fast on an out-of-range hour/minute instead of erroring on every pass
    _window_opening(datetime.now(tz).date())

    # Initial login
    if not self.login():
        logging.error("Authentication failed - exiting program")
        return

    # NOTE(review): time.sleep blocks the event loop; harmless only if nothing
    # else is scheduled on it (e.g. background notifier tasks) - confirm.
    try:
        while True:
            try:
                current_time: datetime = datetime.now(tz)
                logging.info(f"Current time: {current_time}")

                opening = _window_opening(current_time.date())
                if current_time < opening:
                    # A window opened late yesterday may still be running past midnight
                    previous_opening = _window_opening(current_time.date() - timedelta(days=1))
                    if current_time <= previous_opening + window_length:
                        opening = previous_opening

                if opening <= current_time <= opening + window_length:
                    # Inside the window: check for preferred sessions and book
                    await self.booker(current_time)
                    time.sleep(active_poll_seconds)
                else:
                    # Outside the window: idle, but never sleep past the opening
                    seconds_until_open = (opening - current_time).total_seconds()
                    if 0 < seconds_until_open < idle_poll_seconds:
                        time.sleep(seconds_until_open)
                    else:
                        time.sleep(idle_poll_seconds)
            except Exception as e:
                logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}")
                time.sleep(60)  # Wait before retrying after error
    except KeyboardInterrupt:
        self.quit()
def quit(self) -> None:
"""
Clean up resources and exit the script.
"""
logging.info("Script interrupted by user. Quitting...")
exit(0)