Files
personnal-accounting/scripts/aggregate_by_month.py
2026-02-09 10:45:33 +01:00

328 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Script to aggregate all account statements by month or year
"""
import os
import csv
import sys
import argparse
import re
from datetime import datetime
from collections import defaultdict
import calendar
def parse_date(date_str, source_file):
"""
Parse date from various formats and return normalized (year, month, day)
"""
# Try different date formats
formats = [
'%d/%m/%Y', # DD/MM/YYYY
'%m/%d/%Y', # MM/DD/YYYY (Amex format)
'%Y-%m-%d', # YYYY-MM-DD (Revolut format)
]
for fmt in formats:
try:
dt = datetime.strptime(date_str, fmt)
return (dt.year, dt.month, dt.day)
except ValueError:
continue
# Try to extract from filename (for SNCF)
if 'salaire' in source_file.lower():
months = ['janvier', 'fevrier', 'mars', 'avril', 'mai', 'juin',
'juillet', 'aout', 'septembre', 'octobre', 'novembre', 'decembre']
for i, month in enumerate(months, 1):
if month.lower() in source_file.lower():
year_match = re.search(r'20(\d{2})', source_file)
year = int(year_match.group(1)) if year_match else datetime.now().year
return (year, i, 1)
# Default: return current date
return (datetime.now().year, datetime.now().month, 1)
def categorize_institution(source_file):
"""
Determine the institution based on the source filename
"""
source_lower = source_file.lower()
if 'boursobank' in source_lower or 'releve-compte' in source_lower:
return 'Boursobank'
elif 'american_express' in source_lower or 'amex' in source_lower:
return 'American Express'
elif 'monabanq' in source_lower or 'extrait de comptes' in source_lower:
return 'Monabanq'
elif 'revolut' in source_lower:
return 'Revolut'
elif 'sncf' in source_lower or 'salaire' in source_lower:
return 'SNCF'
elif 'la_poste' in source_lower or '2-la.poste' in source_lower or 'releve_ccp' in source_lower:
return 'La Poste'
return 'Other'
def process_csv_file(file_path):
"""
Process a CSV file and return a list of transactions
"""
transactions = []
institution = categorize_institution(os.path.basename(file_path))
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# Get the date
date_str = row.get('Date', '')
if not date_str:
continue
# Parse and normalize the date
year, month, day = parse_date(date_str, row.get('Source', ''))
# Get amount (handle different column names)
amount_str = row.get('Amount', '') or row.get('Debit', '') or row.get('Credit', '0')
try:
amount = float(amount_str.replace(',', '.')) if amount_str else 0
except ValueError:
amount = 0
# Create transaction record
transactions.append({
'year': year,
'month': month,
'day': day,
'date_str': date_str,
'description': row.get('Description', ''),
'category': row.get('Category', 'Other'),
'amount': amount,
'institution': institution,
'source': row.get('Source', os.path.basename(file_path))
})
return transactions
def main():
parser = argparse.ArgumentParser(description='Aggregate all account statements by month or year')
parser.add_argument('--input-dir', default='output/csv',
help='Directory containing CSV files to aggregate (default: output/csv)')
parser.add_argument('--output-dir', default='output/reports',
help='Directory to save aggregated reports (default: output/reports)')
parser.add_argument('--annual', action='store_true',
help='Create annual reports instead of monthly reports')
parser.add_argument('--year', type=int,
help='Generate reports for a specific year only')
args = parser.parse_args()
# Create output directory
os.makedirs(args.output_dir, exist_ok=True)
report_type = "Annual" if args.annual else "Monthly"
print(f"\n{'='*60}")
print(f"{report_type} Aggregation of All Account Statements")
print(f"Input Directory: {os.path.abspath(args.input_dir)}")
print(f"Output Directory: {os.path.abspath(args.output_dir)}")
if args.year:
print(f"Year Filter: {args.year}")
print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*60}")
# Collect all transactions
all_transactions = []
# Find all CSV files in input directory
csv_files = [f for f in os.listdir(args.input_dir) if f.endswith('.csv')]
if not csv_files:
print(f"\nError: No CSV files found in {args.input_dir}")
return
# Process each CSV file
for csv_file in csv_files:
file_path = os.path.join(args.input_dir, csv_file)
print(f"\nProcessing: {csv_file}")
transactions = process_csv_file(file_path)
all_transactions.extend(transactions)
print(f" Found {len(transactions)} transactions")
# Group transactions by month
monthly_transactions = defaultdict(list)
for transaction in all_transactions:
key = (transaction['year'], transaction['month'])
monthly_transactions[key].append(transaction)
# Create monthly summary report
summary_file = os.path.join(args.output_dir, 'monthly_summary.csv')
with open(summary_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'Year', 'Month', 'Total Income', 'Total Expenses', 'Net Balance',
'Transaction Count', 'Institutions'
])
# Process each month
for year, month in sorted(monthly_transactions.keys()):
transactions = monthly_transactions[(year, month)]
month_name = calendar.month_name[month]
# Calculate totals
total_income = sum(t['amount'] for t in transactions if t['amount'] < 0) # Negative amounts are income in Revolut
total_expenses = sum(t['amount'] for t in transactions if t['amount'] > 0)
net_balance = total_income + total_expenses
transaction_count = len(transactions)
# Get unique institutions
institutions = sorted(list(set(t['institution'] for t in transactions)))
institutions_str = ', '.join(institutions)
# Write row
writer.writerow([
year, month_name, total_income, total_expenses, net_balance,
transaction_count, institutions_str
])
# Create yearly summary
yearly_summary = defaultdict(lambda: {'income': 0, 'expenses': 0, 'count': 0})
for transaction in all_transactions:
year = transaction['year']
yearly_summary[year]['count'] += 1
if transaction['amount'] < 0:
yearly_summary[year]['income'] += transaction['amount']
else:
yearly_summary[year]['expenses'] += transaction['amount']
# Create yearly summary file
yearly_file = os.path.join(args.output_dir, 'yearly_summary.csv')
with open(yearly_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Year', 'Total Income', 'Total Expenses', 'Net Balance', 'Transaction Count'])
for year in sorted(yearly_summary.keys()):
data = yearly_summary[year]
net_balance = data['income'] + data['expenses']
writer.writerow([
year, data['income'], data['expenses'], net_balance, data['count']
])
# Create annual reports if requested
generated_files = [
os.path.basename(summary_file),
os.path.basename(yearly_file)
]
if args.annual:
# Create annual reports
for year in sorted(yearly_summary.keys()):
if args.year and year != args.year:
continue # Skip years not matching filter
print(f"\nCreating annual report for {year}...")
# Get all transactions for the year
year_transactions = [t for t in all_transactions if t['year'] == year]
# Group by category for the annual report
categories = defaultdict(lambda: {'count': 0, 'total': 0})
for transaction in year_transactions:
category = transaction['category']
amount = transaction['amount']
categories[category]['count'] += 1
categories[category]['total'] += amount
# Create annual detailed report
annual_file = os.path.join(args.output_dir, f'annual_report_{year}.csv')
with open(annual_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Category', 'Transaction Count', 'Total Amount', 'Percentage'])
year_total = sum(c['total'] for c in categories.values())
# Sort categories by total amount
sorted_categories = sorted(categories.items(), key=lambda x: x[1]['total'], reverse=True)
for category, data in sorted_categories:
percentage = (data['total'] / year_total) * 100 if year_total != 0 else 0
writer.writerow([category, data['count'], data['total'], f"{percentage:.2f}%"])
# Create annual transactions file
annual_transactions_file = os.path.join(args.output_dir, f'annual_transactions_{year}.csv')
with open(annual_transactions_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'Date', 'Description', 'Category', 'Amount',
'Institution', 'Source'
])
writer.writeheader()
# Sort transactions by date
sorted_transactions = sorted(year_transactions, key=lambda x: (x['month'], x['day'], x['description']))
for transaction in sorted_transactions:
writer.writerow({
'Date': transaction['date_str'],
'Description': transaction['description'],
'Category': transaction['category'],
'Amount': transaction['amount'],
'Institution': transaction['institution'],
'Source': transaction['source']
})
generated_files.append(os.path.basename(annual_file))
generated_files.append(os.path.basename(annual_transactions_file))
print(f" Created {os.path.basename(annual_file)} and {os.path.basename(annual_transactions_file)}")
else:
# Create monthly reports (existing functionality)
for year, month in sorted(monthly_transactions.keys()):
month_name = calendar.month_name[month].lower()
transactions = monthly_transactions[(year, month)]
# Create filename
detail_file = os.path.join(args.output_dir, f'transactions_{year}_{month_name}.csv')
with open(detail_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'Date', 'Description', 'Category', 'Amount',
'Institution', 'Source'
])
writer.writeheader()
# Sort transactions by date
sorted_transactions = sorted(transactions, key=lambda x: (x['day'], x['description']))
for transaction in sorted_transactions:
writer.writerow({
'Date': transaction['date_str'],
'Description': transaction['description'],
'Category': transaction['category'],
'Amount': transaction['amount'],
'Institution': transaction['institution'],
'Source': transaction['source']
})
generated_files.append(f'transactions_{year}_{month_name}.csv')
# Print summary statistics
print(f"\n{'='*60}")
print(f"Aggregation Complete")
print(f"Total Transactions: {len(all_transactions)}")
print(f"Years with Data: {len(yearly_summary)}")
if not args.annual:
print(f"Months with Data: {len(monthly_transactions)}")
print(f"{'='*60}")
# List generated files
print("\nGenerated Files:")
for file in generated_files:
file_path = os.path.join(args.output_dir, file)
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
print(f" - {file} ({file_size:,} bytes)")
if __name__ == "__main__":
main()