Files
crossfit/book_crossfit.py
2025-07-18 16:03:38 +02:00

427 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
# Native modules
import json
import logging
from datetime import datetime, timedelta
import os
import sys
import time
# Third-party modules
import requests
from dateutil.parser import parse
import pytz
from dotenv import load_dotenv
from urllib.parse import urlencode
from typing import List, Dict, Optional
from datetime import datetime, timedelta
# Parse session time (handles timezones if present)
from dateutil.parser import parse
import pytz
from urllib.parse import urlencode
from typing import List, Dict, Optional
from dotenv import load_dotenv
# Load settings from a local .env file (python-dotenv) before reading them.
load_dotenv()

# --- Credentials (mandatory; the script refuses to start without them) ------
USERNAME = os.getenv("CROSSFIT_USERNAME")
PASSWORD = os.getenv("CROSSFIT_PASSWORD")
if not (USERNAME and PASSWORD):
    raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")

# --- Values sent with the API requests ---------------------------------------
APPLICATION_ID = "81560887"  # sent as "id_application"
CATEGORY_ID = "677"  # sent as "id_category_activity" (CrossFit activity category)
DEVICE_TYPE = "3"  # sent as "device_type"
APP_VERSION = "5.09.21"  # sent as "app_version"

# --- Scheduling --------------------------------------------------------------
TIMEZONE = "Europe/Paris"  # timezone used for every date/time comparison
TARGET_RESERVATION_TIME = "20:01"  # HH:MM (in TIMEZONE) at which a booking cycle fires

# --- Retry policy for HTTP calls ---------------------------------------------
RETRY_MAX = 3  # maximum number of attempts per request
RETRY_BACKOFF = 1  # base delay in seconds, doubled after each failed attempt

# --- Sessions you want booked ------------------------------------------------
# Each entry is (day_of_week, start_time, session_name_contains), where
# day_of_week is 0=Monday ... 6=Sunday, start_time is "HH:MM" and the name is
# matched as an upper-case substring of the activity name.
PREFERRED_SESSIONS = [
    (4, "17:00", "WEIGHTLIFTING"),  # Friday 17:00 WEIGHTLIFTING
    (5, "12:30", "HYROX"),  # Saturday 12:30 HYROX
    (2, "18:30", "CONDITIONING"),  # Wednesday 18:30 CONDITIONING
    # (5, "15:15", "BIG WOD")  # Saturday 15:15 BIG WOD
]
# Configure logging once at script startup.
# logging.FileHandler opens its file immediately and does not create missing
# parent directories, so make sure "log/" exists first; otherwise the script
# dies at import time with FileNotFoundError on a fresh checkout.
os.makedirs("log", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("log/crossfit_booking.log"),  # persistent history
        logging.StreamHandler(),  # live output on the console
    ],
)
class CrossFitBooker:
    """Small client for the sport.nubapp.com v4 API that books gym sessions.

    Typical use: call :meth:`login`, then either :meth:`run` (endless loop that
    fires a booking cycle at ``TARGET_RESERVATION_TIME``) or the individual
    :meth:`get_available_sessions` / :meth:`book_session` calls.
    """

    # Seconds allowed for a single HTTP call. Without an explicit timeout
    # requests may block forever on an unresponsive server.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Create an unauthenticated client with its own HTTP session."""
        self.auth_token = None  # bearer token, filled in by login()
        self.user_id = None  # user id as a string, filled in by login()
        self.session = requests.Session()
        self.base_headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
            "Content-Type": "application/x-www-form-urlencoded",
            "Nubapp-Origin": "user_apps",
        }
        self.session.headers.update(self.base_headers)
        # Parameters included in every calendar/booking request.
        self.mandatory_params = {
            "app_version": APP_VERSION,
            "device_type": DEVICE_TYPE,
            "id_application": APPLICATION_ID,
            "id_category_activity": CATEGORY_ID,
        }

    def get_auth_headers(self) -> Dict:
        """Return a copy of the base headers, plus ``Authorization`` when logged in."""
        headers = self.base_headers.copy()
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        return headers

    def login(self) -> bool:
        """Authenticate in two steps and store the user id and bearer token.

        Step one (``checkUser.php``) yields the user id, step two (``login``)
        yields the token.

        Returns:
            True when both the user id and the token were obtained, else False.
            Network errors and malformed responses are reported and turned
            into False rather than raised.
        """
        try:
            # Step 1: resolve the user id.
            response = self.session.post(
                "https://sport.nubapp.com/api/v4/users/checkUser.php",
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data=urlencode({
                    "app_version": APP_VERSION,
                    "device_type": DEVICE_TYPE,
                    "username": USERNAME,
                    "password": PASSWORD,
                }),
                timeout=self.REQUEST_TIMEOUT,
            )
            if not response.ok:
                print(f"First login step failed: {response.status_code} - {response.text}")
                return False
            self.user_id = str(response.json()["data"]["user"]["id_user"])

            # Step 2: obtain the bearer token.
            response = self.session.post(
                "https://sport.nubapp.com/api/v4/login",
                headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
                data=urlencode({
                    "device_type": DEVICE_TYPE,
                    "username": USERNAME,
                    "password": PASSWORD,
                }),
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.ok:
                self.auth_token = response.json().get("token")
                if self.auth_token and self.user_id:
                    print("Successfully logged in")
                    return True
            print(f"Login failed: {response.status_code} - {response.text}")
            return False
        except (requests.exceptions.RequestException,
                AttributeError, KeyError, TypeError, ValueError) as e:
            # RequestException: transport failure or timeout.
            # ValueError: body was not JSON. The rest: JSON had an unexpected shape.
            print(f"Login error: {str(e)}")
            return False

    def _post_with_retry(self, url: str, data: Dict,
                         retry_server_errors: bool = False) -> "Optional[requests.Response]":
        """POST form data, retrying transient failures with exponential backoff.

        Args:
            url: Endpoint to call.
            data: Form fields; url-encoded before sending.
            retry_server_errors: When True, 5xx responses are treated like a
                connection failure and retried. Left off for booking calls so
                a booking is not re-sent after the server already saw it.

        Returns:
            The response of the first attempt that went through, or None when
            all ``RETRY_MAX`` attempts failed.
        """
        failure = "no attempt was made"
        for attempt in range(RETRY_MAX):
            try:
                response = self.session.post(
                    url,
                    headers=self.get_auth_headers(),
                    data=urlencode(data),
                    timeout=self.REQUEST_TIMEOUT,
                )
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout) as e:
                failure = str(e)
            else:
                if not (retry_server_errors and 500 <= response.status_code < 600):
                    return response
                failure = f"Server error {response.status_code}"
            if attempt < RETRY_MAX - 1:
                wait_time = RETRY_BACKOFF * (2 ** attempt)
                logging.warning(f"Request failed (attempt {attempt+1}/{RETRY_MAX}): {failure}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
        logging.error(f"All {RETRY_MAX} retry attempts failed: {failure}")
        return None

    def get_available_sessions(self, start_date: datetime, end_date: datetime) -> Optional[Dict]:
        """Fetch the activity calendar between two dates (inclusive bounds as sent).

        Args:
            start_date: First day to query; only its day/month/year is sent.
            end_date: Last day to query; only its day/month/year is sent.

        Returns:
            The decoded JSON payload on HTTP 200, or None on any failure
            (not logged in, retries exhausted, non-200 status, invalid JSON).
        """
        if not self.auth_token or not self.user_id:
            print("Authentication required - missing token or user ID")
            return None

        url = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
        request_data = self.mandatory_params.copy()
        request_data.update({
            "id_user": self.user_id,
            "start_timestamp": start_date.strftime("%d-%m-%Y"),
            "end_timestamp": end_date.strftime("%d-%m-%Y"),
        })

        # Reading the calendar is safe to repeat, so 5xx answers are retried too.
        response = self._post_with_retry(url, request_data, retry_server_errors=True)
        if response is None:
            print(f"Failed after {RETRY_MAX} attempts")
            return None

        if response.status_code == 200:
            try:
                return response.json()
            except ValueError:
                print("Failed to decode JSON response")
                return None
        if response.status_code == 400:
            print("400 Bad Request - likely missing or invalid parameters")
            print("Verify these parameters:")
            for param, value in request_data.items():
                print(f"- {param}: {value}")
            return None
        if response.status_code == 401:
            print("401 Unauthorized - token may be expired or invalid")
            return None
        print(f"Unexpected status code: {response.status_code}")
        return None

    def book_session(self, session_id: str) -> bool:
        """Book the session with the given calendar id; True when the API confirms it."""
        return self._make_request(
            url="https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php",
            data=self._prepare_booking_data(session_id),
            success_msg=f"Successfully booked session {session_id}",
        )

    def _prepare_booking_data(self, session_id: str) -> Dict:
        """Build the form fields for a booking request for ``session_id``."""
        return {
            **self.mandatory_params,
            "id_activity_calendar": session_id,
            "id_user": self.user_id,
            "action_by": self.user_id,
            "n_guests": "0",
            "booked_on": "3",
        }

    def _make_request(self, url: str, data: Dict, success_msg: str) -> bool:
        """POST ``data`` to ``url`` and report whether the API answered ``success``.

        Connection errors and timeouts are retried; any other outcome is final.

        Returns:
            True only for HTTP 200 with a JSON body whose ``success`` is truthy.
            Every failure is logged and yields False; nothing is raised for
            transport errors or an undecodable body.
        """
        response = self._post_with_retry(url, data)
        if response is None:
            logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
            return False
        if response.status_code != 200:
            logging.error(f"HTTP {response.status_code}: {response.text}")
            return False
        try:
            json_response = response.json()
        except ValueError:
            logging.error(f"Response is not valid JSON: {response.text}")
            return False
        if isinstance(json_response, dict) and json_response.get("success", False):
            logging.info(success_msg)
            return True
        logging.error(f"API returned success:false: {json_response}")
        return False

    def is_session_bookable(self, session: Dict, current_time: datetime) -> bool:
        """Tell whether ``session`` can be booked at ``current_time``.

        A session is bookable when ``user_info.can_join`` is truthy, or when
        ``user_info`` carries an ``unableToBookUntilDate``/``unableToBookUntilTime``
        pair (``DD-MM-YYYY`` / ``HH:MM``, interpreted in ``TIMEZONE``) that
        ``current_time`` has reached. Any error code in the payload is ignored.

        Args:
            session: One calendar entry from the API.
            current_time: Timezone-aware "now"; only used for the window check.
        """
        user_info = session.get("user_info") or {}
        if user_info.get("can_join", False):
            return True

        booking_date_str = user_info.get("unableToBookUntilDate", "")
        booking_time_str = user_info.get("unableToBookUntilTime", "")
        if not (booking_date_str and booking_time_str):
            return False
        try:
            opens_at = datetime.strptime(
                f"{booking_date_str} {booking_time_str}",
                "%d-%m-%Y %H:%M",
            )
        except ValueError:
            # Unparseable window: treat the session as not bookable.
            return False
        opens_at = pytz.timezone(TIMEZONE).localize(opens_at)
        return current_time >= opens_at

    def _session_start_time(self, session: Dict) -> datetime:
        """Return the session start as a timezone-aware datetime.

        Reads ``start_timestamp`` and falls back to ``start_datetime``.
        NOTE(review): the two call sites originally read different keys;
        confirm against a real API payload which one the server sends.

        Raises:
            KeyError: neither key is present.
            TypeError, ValueError, OverflowError: the value cannot be parsed.
        """
        raw_start = session.get("start_timestamp")
        if raw_start is None:
            raw_start = session["start_datetime"]
        start = parse(raw_start)
        if not start.tzinfo:
            start = pytz.timezone(TIMEZONE).localize(start)
        return start

    def matches_preferred_session(self, session: Dict, current_time: datetime) -> bool:
        """Tell whether ``session`` is one of ``PREFERRED_SESSIONS``.

        A match needs the same weekday, the same ``HH:MM`` start and the
        preferred name as a substring of the upper-cased activity name.

        Args:
            session: One calendar entry from the API.
            current_time: Unused; kept so existing callers keep working.

        Returns:
            True on a match; False otherwise, including when the session start
            cannot be read (the problem is logged).
        """
        try:
            session_time = self._session_start_time(session)
            session_name = (session.get("name_activity") or "").upper()
        except (AttributeError, KeyError, OverflowError, TypeError, ValueError) as e:
            logging.error(f"Failed to check session: {str(e)}")
            return False

        day_of_week = session_time.weekday()  # 0=Monday ... 6=Sunday
        session_time_str = session_time.strftime("%H:%M")
        return any(
            day_of_week == preferred_day
            and session_time_str == preferred_time
            and preferred_name in session_name
            for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS
        )

    def run_booking_cycle(self, current_time: datetime):
        """Fetch the next three days of sessions and try to book the eligible ones.

        Preferred sessions are attempted first. A failure on one session is
        reported and does not stop the remaining attempts.

        Args:
            current_time: Timezone-aware "now" used for the date range and for
                the bookability checks.
        """
        # Only the calendar date is sent, so passing datetimes is equivalent
        # to passing dates and matches get_available_sessions' signature.
        start_date = current_time
        end_date = current_time + timedelta(days=3)

        sessions_data = self.get_available_sessions(start_date, end_date)
        if not sessions_data or not sessions_data.get("success", False):
            print("No sessions available or error fetching sessions")
            return
        activities = (sessions_data.get("data") or {}).get("activities_calendar") or []

        # NOTE(review): at TARGET_RESERVATION_TIME every bookable session is
        # queued, not only the preferred ones. run() only calls this method at
        # that time, so it will try to book everything that is open. Confirm
        # this is really intended.
        at_booking_time = current_time.strftime("%H:%M") == TARGET_RESERVATION_TIME
        preferred = []
        others = []
        for session in activities:
            if not self.is_session_bookable(session, current_time):
                continue
            if self.matches_preferred_session(session, current_time):
                preferred.append(("Preferred", session))
            elif at_booking_time:
                others.append(("Available", session))

        sessions_to_book = preferred + others  # preferred first, API order otherwise
        if not sessions_to_book:
            print("No matching sessions found to book")
            return

        for session_type, session in sessions_to_book:
            # The start time is only used in messages; never let it block a booking.
            try:
                session_time = self._session_start_time(session)
            except (KeyError, OverflowError, TypeError, ValueError):
                session_time = "unknown time"
            session_id = session.get("id_activity_calendar")
            if session_id is None:
                logging.error(f"Session without id_activity_calendar skipped: {session}")
                continue
            print(f"Attempting to book {session_type} session at {session_time} ({session.get('name_activity')})")
            if self.book_session(session_id):
                print(f"Successfully booked {session_type} session at {session_time}")
            else:
                print(f"Failed to book {session_type} session at {session_time}")

    def run(self):
        """Log in, then loop forever and run a booking cycle at the target time.

        The clock is polled every 30 seconds; after a cycle the loop sleeps a
        full minute so the same ``HH:MM`` is not handled twice. Returns only
        when the initial login fails.
        """
        tz = pytz.timezone(TIMEZONE)
        if not self.login():
            logging.error("Authentication failed - exiting program")
            return

        while True:
            current_time = datetime.now(tz)
            print(f"\nCurrent time: {current_time}")
            if current_time.strftime("%H:%M") == TARGET_RESERVATION_TIME:
                self.run_booking_cycle(current_time)
                # Skip past the current minute to avoid a second cycle.
                time.sleep(60)
            else:
                time.sleep(30)
def main() -> int:
    """Run one booking pass and return the process exit status.

    Logs in, fetches the sessions of the next three days, keeps the ones that
    are bookable right now and books those matching ``PREFERRED_SESSIONS``.

    Returns:
        0 when the pass completed, 1 when login or the session fetch failed.
    """
    booker = CrossFitBooker()
    if not booker.login():
        print("Failed to login")
        return 1

    # One timezone-aware "now" drives both the date range and the checks, so
    # the queried days follow TIMEZONE rather than the host's local clock.
    current_time = datetime.now(pytz.timezone(TIMEZONE))
    start_date = current_time
    end_date = start_date + timedelta(days=3)

    session_data = booker.get_available_sessions(start_date, end_date)
    if not session_data or not session_data.get("success", False):
        logging.error("Failed to get session data")
        return 1

    activities = session_data.get("data", {}).get("activities_calendar", [])
    bookable_sessions = [
        session for session in activities
        if booker.is_session_bookable(session, current_time)
    ]
    print(f"Found {len(bookable_sessions)} sessions to book")

    for session in bookable_sessions:
        if booker.matches_preferred_session(session, current_time):
            booker.book_session(session.get("id_activity_calendar"))
    return 0


if __name__ == "__main__":
    sys.exit(main())