Files
seo/scripts/analytics_importer.py
Kevin Bataille 8c7cd24685 Refactor SEO automation into unified CLI application
Major refactoring to create a clean, integrated CLI application:

### New Features:
- Unified CLI executable (./seo) with simple command structure
- All commands accept optional CSV file arguments
- Auto-detection of latest files when no arguments provided
- Simplified output directory structure (output/ instead of output/reports/)
- Cleaner export filename format (all_posts_YYYY-MM-DD.csv)

### Commands:
- export: Export all posts from WordPress sites
- analyze [csv]: Analyze posts with AI (optional CSV input)
- recategorize [csv]: Recategorize posts with AI
- seo_check: Check SEO quality
- categories: Manage categories across sites
- approve [files]: Review and approve recommendations
- full_pipeline: Run complete workflow
- analytics, gaps, opportunities, report, status

### Changes:
- Moved all scripts to scripts/ directory
- Created config.yaml for configuration
- Updated all scripts to use output/ directory
- Deprecated old seo-cli.py in favor of new ./seo
- Added AGENTS.md and CHANGELOG.md documentation
- Consolidated README.md with updated usage

### Technical:
- Added PyYAML dependency
- Removed hardcoded configuration values
- All scripts now properly integrated
- Better error handling and user feedback

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-16 14:24:44 +01:00

428 lines
17 KiB
Python

"""
Analytics data importer for SEO analysis.
Merges Google Analytics and Search Console data with WordPress posts.
"""
import csv
import json
import argparse
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from collections import defaultdict
from config import Config
class AnalyticsImporter:
    """Import and consolidate analytics data with WordPress posts."""

    def __init__(self):
        """Initialize importer state from the project configuration.

        NOTE(review): this stores the Config *class* itself, not an
        instance; OUTPUT_DIR is read as a class attribute — confirm this
        is intentional.
        """
        self.config = Config
        # Base directory for generated files; run() creates a logs/ subdir here.
        self.output_dir = self.config.OUTPUT_DIR
        # Messages accumulated by log(), mirrored to stdout and later
        # written out by export_log().
        self.logs = []
        # GSC entries with impressions that matched no WordPress post.
        self.unmatched_urls = []
def log(self, message):
"""Add message to log."""
self.logs.append(message)
print(message)
def normalize_url(self, url):
"""Normalize URL for matching."""
if not url:
return ""
# Remove trailing slash, protocol, www
url = url.rstrip('/')
if url.startswith('http'):
url = urlparse(url).path
url = url.replace('www.', '')
return url.lower()
def extract_post_slug_from_url(self, url):
"""Extract post slug from URL path."""
path = urlparse(url).path.rstrip('/')
parts = [p for p in path.split('/') if p]
if parts:
return parts[-1] # Last part is usually the slug
return None
def load_ga4_data(self, ga4_csv):
"""Load Google Analytics 4 data."""
ga_data = {}
if not ga4_csv.exists():
self.log(f"⚠️ GA4 file not found: {ga4_csv}")
return ga_data
try:
with open(ga4_csv, 'r', encoding='utf-8') as f:
# Skip comment lines at the top (lines starting with #)
lines = [line for line in f if not line.startswith('#')]
reader = csv.DictReader(lines)
for row in reader:
if not row:
continue
# Handle French and English column names
url = (row.get('Page path and screen class') or
row.get('Chemin de la page et classe de l\'écran') or
row.get('Page path') or
row.get('Page') or '')
if not url:
continue
# Normalize URL
normalized = self.normalize_url(url)
# Extract metrics (handle French and English column names)
try:
traffic = int(float(row.get('Screened Views', row.get('Views', row.get('Vues', '0'))) or 0))
users = int(float(row.get('Users', row.get('Utilisateurs actifs', '0')) or 0))
bounce_rate = float(row.get('Bounce rate', row.get('Taux de rebond', '0')) or 0)
avg_duration_str = (row.get('Average session duration',
row.get('Durée d\'engagement moyenne par utilisateur actif', '0')) or '0')
avg_duration = float(avg_duration_str.replace(',', '.'))
except (ValueError, TypeError):
traffic = users = 0
bounce_rate = avg_duration = 0
ga_data[normalized] = {
'traffic': traffic,
'users': users,
'bounce_rate': bounce_rate,
'avg_session_duration': avg_duration,
'ga_url': url
}
self.log(f"✓ Loaded {len(ga_data)} GA4 entries")
except Exception as e:
self.log(f"❌ Error reading GA4 file: {e}")
return ga_data
def load_gsc_data(self, gsc_csv):
"""Load Google Search Console data (Page-level or Query-level)."""
gsc_data = {}
if not gsc_csv.exists():
self.log(f"⚠️ GSC file not found: {gsc_csv}")
return gsc_data
try:
with open(gsc_csv, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if not row:
continue
# Determine if this is page-level or query-level data
# Pages.csv has: "Pages les plus populaires", Queries.csv has: "Requêtes les plus fréquentes"
url = (row.get('Page') or
row.get('Pages les plus populaires') or
row.get('URL') or '')
query = row.get('Query') or row.get('Requêtes les plus fréquentes', '').strip()
# Skip rows without URLs (query-only data)
if not url:
continue
# Try to parse metrics with flexible column names
try:
# Handle different number formats (decimal separator, percentage signs)
clicks_str = row.get('Clics', row.get('Clicks', '0')) or '0'
impressions_str = row.get('Impressions', '0') or '0'
ctr_str = row.get('CTR', '0') or '0'
position_str = row.get('Position', '0') or '0'
clicks = int(float(clicks_str.replace(',', '.').rstrip('%')))
impressions = int(float(impressions_str.replace(',', '.')))
ctr = float(ctr_str.replace(',', '.').rstrip('%')) / 100
position = float(position_str.replace(',', '.'))
except (ValueError, TypeError, AttributeError):
clicks = impressions = 0
ctr = position = 0
normalized = self.normalize_url(url)
if normalized not in gsc_data:
gsc_data[normalized] = {
'impressions': 0,
'clicks': 0,
'avg_position': 0,
'ctr': 0,
'keywords': [],
'gsc_url': url
}
# Accumulate data (in case of multiple rows per URL)
gsc_data[normalized]['impressions'] += impressions
gsc_data[normalized]['clicks'] += clicks
# Store position
if position > 0:
gsc_data[normalized]['positions'] = gsc_data[normalized].get('positions', [])
gsc_data[normalized]['positions'].append(position)
if query and query not in gsc_data[normalized]['keywords']:
gsc_data[normalized]['keywords'].append(query)
# Calculate average positions and finalize
for data in gsc_data.values():
if data.get('positions'):
data['avg_position'] = sum(data['positions']) / len(data['positions'])
del data['positions']
# Recalculate CTR from totals
if data['impressions'] > 0:
data['ctr'] = data['clicks'] / data['impressions']
data['keywords_count'] = len(data.get('keywords', []))
self.log(f"✓ Loaded {len(gsc_data)} GSC entries")
except Exception as e:
self.log(f"❌ Error reading GSC file: {e}")
return gsc_data
def load_posts_csv(self, posts_csv):
"""Load existing WordPress posts CSV."""
posts = {}
if not posts_csv.exists():
self.log(f"⚠️ Posts file not found: {posts_csv}")
return posts
try:
with open(posts_csv, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# Handle different column name variations
post_id = row.get('ID') or row.get('post_id')
post_url = row.get('URL') or row.get('Post URL') or row.get('post_url')
post_slug = row.get('Post Slug') or row.get('Slug') or row.get('post_slug')
post_title = row.get('Title') or row.get('post_title')
if not post_id:
continue
normalized = self.normalize_url(post_url) if post_url else ""
# Handle different SEO column names
seo_title = (row.get('SEO Title') or
row.get('proposed_seo_title') or
row.get('current_seo_title') or '')
meta_desc = (row.get('Meta Description') or
row.get('proposed_meta_description') or
row.get('current_meta_description') or '')
posts[post_id] = {
'title': post_title or '',
'url': post_url,
'slug': post_slug,
'normalized_url': normalized,
'seo_title': seo_title,
'meta_description': meta_desc,
**{k: v for k, v in row.items()
if k not in ['ID', 'post_id', 'Title', 'post_title', 'URL', 'Post URL', 'post_url',
'Post Slug', 'Slug', 'post_slug', 'SEO Title', 'proposed_seo_title',
'current_seo_title', 'Meta Description', 'proposed_meta_description',
'current_meta_description']}
}
self.log(f"✓ Loaded {len(posts)} posts from CSV")
except Exception as e:
self.log(f"❌ Error reading posts CSV: {e}")
return posts
def match_analytics_to_posts(self, posts, ga_data, gsc_data):
"""Match analytics data to posts with fuzzy matching."""
self.log("\n📊 Matching analytics data to posts...")
matched_count = 0
for post_id, post_info in posts.items():
slug = post_info.get('slug') or self.extract_post_slug_from_url(post_info.get('url', ''))
normalized_url = post_info.get('normalized_url', '')
# Try direct URL match first
if normalized_url in ga_data:
post_info['ga_data'] = ga_data[normalized_url]
matched_count += 1
else:
post_info['ga_data'] = {}
if normalized_url in gsc_data:
post_info['gsc_data'] = gsc_data[normalized_url]
matched_count += 1
else:
post_info['gsc_data'] = {}
# Try slug-based matching if URL didn't match
if not post_info.get('gsc_data') and slug:
for gsc_url, gsc_info in gsc_data.items():
if slug in gsc_url:
post_info['gsc_data'] = gsc_info
break
# Track unmatched GSC URLs
matched_gsc_urls = set()
for post in posts.values():
if post.get('gsc_data'):
matched_gsc_urls.add(id(post['gsc_data']))
for normalized_url, gsc_info in gsc_data.items():
if id(gsc_info) not in matched_gsc_urls and gsc_info.get('impressions', 0) > 0:
self.unmatched_urls.append({
'url': gsc_info.get('gsc_url', normalized_url),
'impressions': gsc_info.get('impressions', 0),
'clicks': gsc_info.get('clicks', 0),
'avg_position': gsc_info.get('avg_position', 0)
})
self.log(f"✓ Matched data to posts")
return posts
def enrich_posts_data(self, posts):
"""Enrich posts with calculated metrics."""
for post_info in posts.values():
ga = post_info.get('ga_data', {})
gsc = post_info.get('gsc_data', {})
# GA metrics
post_info['traffic'] = ga.get('traffic', 0)
post_info['users'] = ga.get('users', 0)
post_info['bounce_rate'] = ga.get('bounce_rate', 0)
post_info['avg_session_duration'] = ga.get('avg_session_duration', 0)
# GSC metrics
post_info['impressions'] = gsc.get('impressions', 0)
post_info['clicks'] = gsc.get('clicks', 0)
post_info['avg_position'] = gsc.get('avg_position', 0)
post_info['ctr'] = gsc.get('ctr', 0)
post_info['keywords_count'] = gsc.get('keywords_count', 0)
post_info['top_keywords'] = ','.join(gsc.get('keywords', [])[:5])
return posts
def export_enriched_csv(self, posts, output_csv):
"""Export enriched posts data to CSV."""
if not posts:
self.log("❌ No posts to export")
return
try:
fieldnames = [
'ID', 'Title', 'URL', 'SEO Title', 'Meta Description',
'traffic', 'users', 'bounce_rate', 'avg_session_duration',
'impressions', 'clicks', 'avg_position', 'ctr', 'keywords_count', 'top_keywords'
]
# Add any extra fields from original posts
all_keys = set()
for post in posts.values():
all_keys.update(post.keys())
extra_fields = [k for k in sorted(all_keys)
if k not in fieldnames and k not in ['ga_data', 'gsc_data', 'normalized_url', 'slug']]
fieldnames.extend(extra_fields)
with open(output_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
for post_id, post_info in sorted(posts.items()):
row = {'ID': post_id}
row.update(post_info)
# Clean up nested dicts
for key in ['ga_data', 'gsc_data']:
row.pop(key, None)
writer.writerow(row)
self.log(f"✓ Exported {len(posts)} posts to {output_csv}")
except Exception as e:
self.log(f"❌ Error exporting CSV: {e}")
def export_log(self, log_file):
"""Export analysis log and unmatched URLs."""
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write("SEO Analytics Import Report\n")
f.write("=" * 60 + "\n\n")
f.write("Import Log:\n")
f.write("-" * 60 + "\n")
for log_msg in self.logs:
f.write(log_msg + "\n")
f.write("\n" + "=" * 60 + "\n")
f.write(f"Unmatched URLs ({len(self.unmatched_urls)} total):\n")
f.write("-" * 60 + "\n")
if self.unmatched_urls:
# Sort by impressions descending
for url_data in sorted(self.unmatched_urls,
key=lambda x: x['impressions'],
reverse=True):
f.write(f"\nURL: {url_data['url']}\n")
f.write(f" Impressions: {url_data['impressions']}\n")
f.write(f" Clicks: {url_data['clicks']}\n")
f.write(f" Avg Position: {url_data['avg_position']:.1f}\n")
else:
f.write("✓ All URLs matched successfully!\n")
self.log(f"✓ Exported log to {log_file}")
except Exception as e:
self.log(f"❌ Error exporting log: {e}")
def run(self, ga_csv, gsc_csv, posts_csv, output_csv):
"""Run complete import workflow."""
self.log("Starting analytics import...")
self.log(f"GA4 CSV: {ga_csv}")
self.log(f"GSC CSV: {gsc_csv}")
self.log(f"Posts CSV: {posts_csv}\n")
# Load data
ga_data = self.load_ga4_data(ga_csv)
gsc_data = self.load_gsc_data(gsc_csv)
posts = self.load_posts_csv(posts_csv)
if not posts:
self.log("❌ No posts found. Cannot proceed.")
return
# Match and merge
posts = self.match_analytics_to_posts(posts, ga_data, gsc_data)
posts = self.enrich_posts_data(posts)
# Export
self.export_enriched_csv(posts, output_csv)
# Export log
log_dir = self.output_dir / 'logs'
log_dir.mkdir(exist_ok=True)
log_file = log_dir / 'import_log.txt'
self.export_log(log_file)
self.log("\n✓ Analytics import complete!")
def main():
    """CLI entry point: parse arguments and run the importer."""
    parser = argparse.ArgumentParser(description='Import and merge analytics data')
    # (flag, default path, help text) for each CSV argument.
    options = [
        ('--ga-export', 'input/analytics/ga4_export.csv',
         'GA4 export CSV path'),
        ('--gsc-export', 'input/analytics/gsc/Pages.csv',
         'Search Console export CSV path (Pages data)'),
        ('--posts-csv', 'input/new-propositions.csv',
         'Posts CSV path'),
        ('--output', 'output/results/posts_with_analytics.csv',
         'Output CSV path'),
    ]
    for flag, default, help_text in options:
        parser.add_argument(flag, type=Path, default=Path(default), help=help_text)
    args = parser.parse_args()
    AnalyticsImporter().run(args.ga_export, args.gsc_export,
                            args.posts_csv, args.output)


if __name__ == '__main__':
    main()