Files
crossfit/crossfit_booker.py
2025-07-24 20:51:39 +02:00

485 lines
20 KiB
Python

# Native modules
import logging
import traceback
import os
import time
import difflib
from datetime import datetime, timedelta, date
# Third-party modules
import requests
from dateutil.parser import parse
import pytz
from dotenv import load_dotenv
from urllib.parse import urlencode
from typing import List, Dict, Optional, Any, Tuple
# Import the SessionNotifier class
from session_notifier import SessionNotifier
# Import the preferred sessions from the session_config module
from session_config import PREFERRED_SESSIONS
load_dotenv()
# Configuration
USERNAME = os.environ.get("CROSSFIT_USERNAME")
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
if not all([USERNAME, PASSWORD]):
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
APPLICATION_ID = "81560887"
CATEGORY_ID = "677" # Activity category ID for CrossFit
TIMEZONE = "Europe/Paris" # Adjust to your timezone
TARGET_RESERVATION_TIME = "20:01" # When bookings open (8 PM)
BOOKING_WINDOW_END_DELTA_MINUTES = 50 # End window book session
DEVICE_TYPE = "3"
# Retry configuration
RETRY_MAX = 3
RETRY_BACKOFF = 1
APP_VERSION = "5.09.21"
class CrossFitBooker:
"""
A class for automating the booking of CrossFit sessions.
This class handles authentication, session availability checking,
booking, and notifications for CrossFit sessions.
"""
def __init__(self) -> None:
"""
Initialize the CrossFitBooker with necessary attributes.
Sets up authentication tokens, session headers, mandatory parameters,
and initializes the SessionNotifier for sending notifications.
"""
self.auth_token: Optional[str] = None
self.user_id: Optional[str] = None
self.session: requests.Session = requests.Session()
self.base_headers: Dict[str, str] = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
"Content-Type": "application/x-www-form-urlencoded",
"Nubapp-Origin": "user_apps",
}
self.session.headers.update(self.base_headers)
# Define mandatory parameters for API calls
self.mandatory_params: Dict[str, str] = {
"app_version": APP_VERSION,
"device_type": DEVICE_TYPE,
"id_application": APPLICATION_ID,
"id_category_activity": CATEGORY_ID
}
# Initialize the SessionNotifier with credentials from environment variables
email_credentials = {
"from": os.environ.get("EMAIL_FROM"),
"to": os.environ.get("EMAIL_TO"),
"password": os.environ.get("EMAIL_PASSWORD")
}
telegram_credentials = {
"token": os.environ.get("TELEGRAM_TOKEN"),
"chat_id": os.environ.get("TELEGRAM_CHAT_ID")
}
# Get notification settings from environment variables
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
self.notifier = SessionNotifier(
email_credentials,
telegram_credentials,
enable_email=enable_email,
enable_telegram=enable_telegram
)
def get_auth_headers(self) -> Dict[str, str]:
"""
Return headers with authorization if available.
Returns:
Dict[str, str]: Headers dictionary with authorization if available.
"""
headers: Dict[str, str] = self.base_headers.copy()
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
return headers
def login(self) -> bool:
"""
Authenticate and get the bearer token.
Returns:
bool: True if login is successful, False otherwise.
"""
try:
# First login endpoint
login_params: Dict[str, str] = {
"app_version": APP_VERSION,
"device_type": DEVICE_TYPE,
"username": USERNAME,
"password": PASSWORD
}
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/users/checkUser.php",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=urlencode(login_params))
if not response.ok:
logging.error(f"First login step failed: {response.status_code} - {response.text[:100]}")
return False
try:
login_data: Dict[str, Any] = response.json()
self.user_id = str(login_data["data"]["user"]["id_user"])
except (KeyError, ValueError) as e:
logging.error(f"Error during login: {str(e)} - Response: {response.text}")
return False
# Second login endpoint
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/login",
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
data=urlencode({
"device_type": DEVICE_TYPE,
"username": USERNAME,
"password": PASSWORD
}))
if response.ok:
try:
login_data: Dict[str, Any] = response.json()
self.auth_token = login_data.get("token")
except (KeyError, ValueError) as e:
logging.error(f"Error during login: {str(e)} - Response: {response.text}")
return False
if self.auth_token and self.user_id:
logging.info("Successfully logged in")
return True
else:
logging.error(f"Login failed: {response.status_code} - {response.text[:100]}")
return False
except requests.exceptions.RequestException as e:
logging.error(f"Request error during login: {str(e)}")
return False
except Exception as e:
logging.error(f"Unexpected error during login: {str(e)}")
return False
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
"""
Fetch available sessions from the API.
Args:
start_date (date): Start date for fetching sessions.
end_date (date): End date for fetching sessions.
Returns:
Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise.
"""
if not self.auth_token or not self.user_id:
logging.error("Authentication required - missing token or user ID")
return None
url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
# Prepare request with mandatory parameters
request_data: Dict[str, str] = self.mandatory_params.copy()
request_data.update({
"id_user": self.user_id,
"start_timestamp": start_date.strftime("%d-%m-%Y"),
"end_timestamp": end_date.strftime("%d-%m-%Y")
})
# Add retry logic
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(request_data),
timeout=10
)
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
logging.error("401 Unauthorized - token may be expired or invalid")
return None
elif 500 <= response.status_code < 600:
logging.error(f"Server error {response.status_code}")
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
else:
logging.error(f"Unexpected status code: {response.status_code} - {response.text[:100]}")
return None
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
return None
def book_session(self, session_id: str) -> bool:
"""
Book a specific session.
Args:
session_id (str): ID of the session to book.
Returns:
bool: True if booking is successful, False otherwise.
"""
url = "https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php"
data = {
**self.mandatory_params,
"id_activity_calendar": session_id,
"id_user": self.user_id,
"action_by": self.user_id,
"n_guests": "0",
"booked_on": "3"
}
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
logging.info(f"Successfully booked session {session_id}")
return True
logging.error(f"API returned success:false: {json_response}")
return False
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return False
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return False
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
"""
Check if a session is bookable based on user_info.
Args:
session (Dict[str, Any]): Session data.
current_time (datetime): Current time for comparison.
Returns:
bool: True if the session is bookable, False otherwise.
"""
user_info: Dict[str, Any] = session.get("user_info", {})
# First check if can_join is true (primary condition)
if user_info.get("can_join", False):
return True
# If can_join is False, check if there's a booking window
booking_date_str: str = user_info.get("unableToBookUntilDate", "")
booking_time_str: str = user_info.get("unableToBookUntilTime", "")
if booking_date_str and booking_time_str:
try:
booking_datetime: datetime = datetime.strptime(
f"{booking_date_str} {booking_time_str}",
"%d-%m-%Y %H:%M"
)
booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
if current_time >= booking_datetime:
return True # Booking window is open
except ValueError:
pass # Ignore invalid date formats
# Default case: not bookable
return False
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
"""
Check if session matches one of your preferred sessions with fuzzy matching.
Args:
session (Dict[str, Any]): Session data.
current_time (datetime): Current time for comparison.
Returns:
bool: True if the session matches a preferred session, False otherwise.
"""
try:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
day_of_week: int = session_time.weekday()
session_time_str: str = session_time.strftime("%H:%M")
session_name: str = session.get("name_activity", "").upper()
for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS:
# Exact match first
if (day_of_week == preferred_day and
session_time_str == preferred_time and
preferred_name in session_name):
return True
# Fuzzy match fallback (80% similarity)
ratio: float = difflib.SequenceMatcher(
None,
session_name.lower(),
preferred_name.lower()
).ratio()
if (day_of_week == preferred_day and
abs(session_time.hour - int(preferred_time.split(':')[0])) <= 1 and
ratio >= 0.8):
logging.debug(f"Fuzzy match: {session_name}{preferred_name} ({ratio:.2%})")
return True
return False
except Exception as e:
logging.error(f"Failed to check session: {str(e)} - Session: {session}")
return False
async def run_booking_cycle(self, current_time: datetime) -> None:
"""
Run one cycle of checking and booking sessions.
Args:
current_time (datetime): Current time for comparison.
"""
# Calculate date range to check (current day, day + 1, and day + 2)
start_date: date = current_time.date()
end_date: date = start_date + timedelta(days=2) # Only go up to day + 2
# Get available sessions
sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date)
if not sessions_data or not sessions_data.get("success", False):
logging.error("No sessions available or error fetching sessions")
return
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
# Find sessions to book (preferred only) within allowed date range
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
upcoming_sessions: List[Dict[str, Any]] = []
found_preferred_sessions: List[Dict[str, Any]] = []
for session in activities:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
# Check if session is within allowed date range (current day, day + 1, or day + 2)
days_diff = (session_time.date() - current_time.date()).days
if not (0 <= days_diff <= 2):
continue # Skip sessions outside the allowed date range
# Check if session is preferred and bookable
if self.is_session_bookable(session, current_time):
if self.matches_preferred_session(session, current_time):
sessions_to_book.append(("Preferred", session))
found_preferred_sessions.append(session)
else:
# Check if it's a preferred session that's not bookable yet
if self.matches_preferred_session(session, current_time):
found_preferred_sessions.append(session)
# Check if it's available tomorrow (day + 1)
if days_diff == 1:
upcoming_sessions.append(session)
if not sessions_to_book and not upcoming_sessions:
logging.info("No matching sessions found to book")
return
# Notify about all found preferred sessions, regardless of bookability
for session in found_preferred_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_session_booking(session_details)
logging.info(f"Notified about found preferred session: {session_details}")
# Notify about upcoming sessions
for session in upcoming_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow
logging.info(f"Notified about upcoming session: {session_details}")
# Book sessions (preferred first)
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
for session_type, session in sessions_to_book:
session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S")
logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
if self.book_session(session["id_activity_calendar"]):
# Send notification after successful booking
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_session_booking(session_details)
logging.info(f"Successfully booked {session_type} session at {session_time}")
else:
logging.error(f"Failed to book {session_type} session at {session_time}")
# Send notification about the failed booking
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_impossible_booking(session_details)
logging.info(f"Notified about impossible booking for {session_type} session at {session_time}")
async def run(self) -> None:
"""
Main execution loop.
"""
# Set up timezone
tz: pytz.timezone = pytz.timezone(TIMEZONE)
# Parse TARGET_RESERVATION_TIME to get the target hour and minute
target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":"))
target_time = datetime.now(tz).replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
# Initial login
if not self.login():
logging.error("Authentication failed - exiting program")
return
try:
while True:
try:
current_time: datetime = datetime.now(tz)
logging.info(f"Current time: {current_time}")
# Only book sessions if current time is within the booking window
if target_time <= current_time <= booking_window_end:
# Run booking cycle to check for preferred sessions and book
await self.run_booking_cycle(current_time)
# Wait for a short time before next check
time.sleep(60)
else:
# Check again in 5 minutes if outside booking window
time.sleep(300)
except Exception as e:
logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}")
time.sleep(60) # Wait before retrying after error
except KeyboardInterrupt:
self.quit()
def quit(self) -> None:
"""
Clean up resources and exit the script.
"""
logging.info("Script interrupted by user. Quitting...")
exit(0)