Initial commit: Clean SEO analysis system

This commit is contained in:
Kevin Bataille
2026-02-16 05:25:16 +04:00
commit 3b51952336
13 changed files with 2611 additions and 0 deletions

348
content_gap_analyzer.py Normal file
View File

@@ -0,0 +1,348 @@
"""
Content gap analyzer for SEO strategy.
Identifies missing topics and content opportunities using AI analysis.
"""
import csv
import json
import argparse
import time
from pathlib import Path
from collections import defaultdict
from openai import OpenAI
from config import Config
class ContentGapAnalyzer:
    """Identify content gaps and opportunities.

    Workflow: load post metadata and Search Console query exports, use an
    OpenRouter-hosted model to cluster existing topics, ask the model for
    gap/opportunity suggestions, then export results to CSV/JSON plus a
    plain-text log.
    """

    def __init__(self):
        """Initialize analyzer; AI client is optional (None without API key)."""
        self.config = Config  # class object used as a namespace of settings
        self.output_dir = self.config.OUTPUT_DIR
        self.logs = []  # accumulated messages, written out by export_log()
        # Without an API key the analyzer still loads/export data, but the
        # AI-driven steps (extract_topics / identify_content_gaps) are no-ops.
        self.client = None
        if self.config.OPENROUTER_API_KEY:
            self.client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.config.OPENROUTER_API_KEY,
            )

    def log(self, message):
        """Record *message* in the in-memory log and echo it to stdout."""
        self.logs.append(message)
        print(message)

    def load_posts(self, posts_csv):
        """Load post rows from *posts_csv* (Path) into a list of dicts.

        Returns an empty list when the file is missing or unreadable.
        Numeric columns fall back to 0 for blank cells.
        """
        posts = []
        if not posts_csv.exists():
            self.log(f"❌ File not found: {posts_csv}")
            return posts
        try:
            with open(posts_csv, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    posts.append({
                        'id': row.get('ID', ''),
                        'title': row.get('Title', ''),
                        'url': row.get('URL', ''),
                        # "or 0" guards against empty-string cells before int()
                        'traffic': int(row.get('traffic', 0) or 0),
                        'impressions': int(row.get('impressions', 0) or 0),
                        'top_keywords': row.get('top_keywords', '')
                    })
            self.log(f"✓ Loaded {len(posts)} posts")
        except Exception as e:
            self.log(f"❌ Error reading posts: {e}")
        return posts

    def load_gsc_data(self, gsc_csv):
        """Load Search Console queries for gap analysis.

        Keeps only queries that earn impressions but convert below 5% CTR —
        the "ranking but not clicked" set worth targeting. Rows with
        non-numeric cells are skipped.
        """
        queries = []
        if not gsc_csv.exists():
            self.log(f"⚠️ GSC file not found: {gsc_csv}")
            return queries
        try:
            with open(gsc_csv, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    try:
                        query = row.get('Query', '').strip()
                        if not query:
                            continue
                        impressions = int(row.get('Impressions', 0) or 0)
                        clicks = int(row.get('Clicks', 0) or 0)
                        # Only include queries with impressions but low clicks
                        if impressions > 0 and (clicks / impressions < 0.05):
                            queries.append({
                                'query': query,
                                'impressions': impressions,
                                'clicks': clicks,
                                # impressions > 0 is guaranteed by the guard above
                                'ctr': clicks / impressions
                            })
                    except (ValueError, TypeError):
                        continue  # malformed numeric cell: skip this row
            self.log(f"✓ Loaded {len(queries)} underperforming queries")
        except Exception as e:
            self.log(f"⚠️ Error reading GSC file: {e}")
        return queries

    def extract_topics(self, posts):
        """Extract topic clusters from post titles using AI.

        Returns the model's JSON analysis as a dict, or {} when the client
        is unavailable, the response is unparseable, or the call fails.
        """
        if not self.client or len(posts) == 0:
            self.log("⚠️ Cannot extract topics without AI client or posts")
            return {}
        try:
            self.log("🤖 Extracting topic clusters from post titles...")
            # Batch posts into groups
            titles = [p['title'] for p in posts][:100]  # Limit to first 100
            prompt = f"""Analyze these {len(titles)} blog post titles and identify topic clusters:
Titles:
{chr(10).join(f'{i+1}. {t}' for i, t in enumerate(titles))}
Extract for each post:
1. Primary topic category
2. Subtopics covered
3. Content type (guide, tutorial, review, comparison, etc.)
Then identify:
1. Top 10 topic clusters with post counts
2. Most common subtopics
3. Over/under-represented topics
Return JSON:
{{
"post_topics": {{
"1": {{"primary": "...", "subtopics": ["..."], "type": "..."}},
...
}},
"topic_clusters": [
{{"cluster": "...", "post_count": 0, "importance": "high/medium/low"}}
],
"coverage_gaps": ["topic 1", "topic 2", ...],
"niche": "detected niche or industry"
}}"""
            response = self.client.chat.completions.create(
                model=self.config.AI_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            try:
                result_text = response.choices[0].message.content
                # Extract the outermost JSON object from the (possibly chatty)
                # model response.
                start_idx = result_text.find('{')
                end_idx = result_text.rfind('}') + 1
                if start_idx >= 0 and end_idx > start_idx:
                    return json.loads(result_text[start_idx:end_idx])
            except json.JSONDecodeError:
                self.log("⚠️ Could not parse topic extraction response")
            return {}
        except Exception as e:
            self.log(f"⚠️ Topic extraction failed: {e}")
            return {}

    def identify_content_gaps(self, topic_analysis, queries):
        """Use AI to identify content gaps and suggest new topics.

        Combines the topic clusters from extract_topics() with the
        underperforming GSC queries; returns a list of opportunity dicts
        (empty when the client is unavailable or the call fails).
        """
        if not self.client:
            return []
        try:
            self.log("🤖 Identifying content gaps and opportunities...")
            clusters = topic_analysis.get('topic_clusters', [])
            gaps = topic_analysis.get('coverage_gaps', [])
            niche = topic_analysis.get('niche', 'general')
            # Prepare query analysis: top 20 by impressions
            top_queries = sorted(queries, key=lambda x: x['impressions'], reverse=True)[:20]
            queries_str = '\n'.join([f"- {q['query']} ({q['impressions']} impr, {q['ctr']:.1%} CTR)"
                                     for q in top_queries])
            prompt = f"""Based on content analysis and search demand, identify content gaps:
Existing Topics: {', '.join([c.get('cluster', '') for c in clusters[:10]])}
Coverage Gaps: {', '.join(gaps[:5])}
Niche: {niche}
Top Underperforming Queries (low CTR despite impressions):
{queries_str}
Identify high-value missing topics that could:
1. Fill coverage gaps
2. Target underperforming queries (CTR improvement)
3. Capitalize on search demand
4. Complement existing content
For each suggestion:
- Topic title
- Why it's valuable (search demand + intent)
- Search volume estimate (high/medium/low)
- How it complements existing content
- Recommended content format
- Estimated traffic potential
Prioritize by traffic opportunity. Max 20 ideas.
Return JSON:
{{
"content_opportunities": [
{{
"title": "...",
"why_valuable": "...",
"search_volume": "high/medium/low",
"complements": "existing topic",
"format": "guide/tutorial/comparison/review/list",
"traffic_potential": number,
"priority": "high/medium/low"
}}
]
}}"""
            response = self.client.chat.completions.create(
                model=self.config.AI_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=2000
            )
            try:
                result_text = response.choices[0].message.content
                start_idx = result_text.find('{')
                end_idx = result_text.rfind('}') + 1
                if start_idx >= 0 and end_idx > start_idx:
                    result = json.loads(result_text[start_idx:end_idx])
                    return result.get('content_opportunities', [])
            except json.JSONDecodeError:
                self.log("⚠️ Could not parse gap analysis response")
            return []
        except Exception as e:
            self.log(f"⚠️ Gap analysis failed: {e}")
            return []

    def export_gaps_csv(self, gaps, output_csv):
        """Export content gaps to CSV, ordered high → medium → low priority."""
        if not gaps:
            self.log("⚠️ No gaps to export")
            return
        try:
            fieldnames = [
                'priority', 'title', 'why_valuable', 'search_volume',
                'complements', 'format', 'traffic_potential'
            ]
            # FIX: the previous boolean key only floated 'high' to the front,
            # leaving 'medium' and 'low' in arbitrary input order. Rank all
            # three levels explicitly; unknown priorities sort last.
            priority_rank = {'high': 0, 'medium': 1, 'low': 2}
            ordered = sorted(gaps, key=lambda g: priority_rank.get(g.get('priority'), 3))
            with open(output_csv, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()
                for gap in ordered:
                    writer.writerow(gap)
            self.log(f"✓ Exported {len(gaps)} content gaps to {output_csv}")
        except Exception as e:
            self.log(f"❌ Error exporting CSV: {e}")

    def export_topic_clusters_json(self, topic_analysis, output_json):
        """Export topic analysis to JSON (no-op for an empty analysis)."""
        if not topic_analysis:
            return
        try:
            with open(output_json, 'w', encoding='utf-8') as f:
                json.dump(topic_analysis, f, indent=2)
            self.log(f"✓ Exported topic analysis to {output_json}")
        except Exception as e:
            self.log(f"❌ Error exporting JSON: {e}")

    def export_log(self, log_file):
        """Write all accumulated log messages to *log_file* as plain text."""
        try:
            with open(log_file, 'w', encoding='utf-8') as f:
                f.write("Content Gap Analysis Report\n")
                f.write("=" * 60 + "\n\n")
                for msg in self.logs:
                    f.write(msg + "\n")
            self.log(f"✓ Exported log to {log_file}")
        except Exception as e:
            self.log(f"❌ Error exporting log: {e}")

    def run(self, posts_csv, gsc_csv, output_csv):
        """Run complete analysis workflow.

        Loads inputs, runs both AI analysis stages, and writes the gaps CSV,
        topic-clusters JSON, and a log file under self.output_dir.
        """
        self.log("📊 Starting content gap analysis...")
        self.log(f"Posts: {posts_csv}")
        self.log(f"GSC queries: {gsc_csv}\n")
        # Load data
        posts = self.load_posts(posts_csv)
        queries = self.load_gsc_data(gsc_csv)
        if not posts:
            return  # nothing to analyze without posts
        # Extract topics
        topic_analysis = self.extract_topics(posts)
        if topic_analysis:
            self.log(f"✓ Identified {len(topic_analysis.get('topic_clusters', []))} topic clusters")
        # Identify gaps
        gaps = self.identify_content_gaps(topic_analysis, queries)
        if gaps:
            self.log(f"✓ Identified {len(gaps)} content opportunities")
        # Export
        self.log("\n📁 Exporting results...")
        self.export_gaps_csv(gaps, output_csv)
        topic_json = self.output_dir / 'topic_clusters.json'
        self.export_topic_clusters_json(topic_analysis, topic_json)
        # Export log
        log_dir = self.output_dir / 'logs'
        # FIX: parents=True so the whole output tree is created on first run;
        # exist_ok alone raises FileNotFoundError when output_dir is missing.
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / 'content_gap_analysis_log.txt'
        self.export_log(log_file)
        self.log("\n✓ Content gap analysis complete!")
def main():
    """CLI entry point: parse the three path arguments and run the analyzer."""
    cli = argparse.ArgumentParser(description='Analyze content gaps')
    cli.add_argument(
        '--posts-csv',
        type=Path,
        default=Path('output/results/posts_with_analytics.csv'),
        help='Posts CSV',
    )
    cli.add_argument(
        '--gsc-queries',
        type=Path,
        default=Path('input/analytics/gsc/Requêtes.csv'),
        help='GSC queries CSV',
    )
    cli.add_argument(
        '--output',
        type=Path,
        default=Path('output/results/content_gaps.csv'),
        help='Output gaps CSV',
    )
    opts = cli.parse_args()
    ContentGapAnalyzer().run(opts.posts_csv, opts.gsc_queries, opts.output)


if __name__ == '__main__':
    main()