Files
seo/scripts/opportunity_analyzer.py
Kevin Bataille 8c7cd24685 Refactor SEO automation into unified CLI application
Major refactoring to create a clean, integrated CLI application:

### New Features:
- Unified CLI executable (./seo) with simple command structure
- All commands accept optional CSV file arguments
- Auto-detection of latest files when no arguments provided
- Simplified output directory structure (output/ instead of output/reports/)
- Cleaner export filename format (all_posts_YYYY-MM-DD.csv)

### Commands:
- export: Export all posts from WordPress sites
- analyze [csv]: Analyze posts with AI (optional CSV input)
- recategorize [csv]: Recategorize posts with AI
- seo_check: Check SEO quality
- categories: Manage categories across sites
- approve [files]: Review and approve recommendations
- full_pipeline: Run complete workflow
- analytics, gaps, opportunities, report, status

### Changes:
- Moved all scripts to scripts/ directory
- Created config.yaml for configuration
- Updated all scripts to use output/ directory
- Deprecated old seo-cli.py in favor of new ./seo
- Added AGENTS.md and CHANGELOG.md documentation
- Consolidated README.md with updated usage

### Technical:
- Added PyYAML dependency
- Removed hardcoded configuration values
- All scripts now properly integrated
- Better error handling and user feedback

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-16 14:24:44 +01:00

348 lines
14 KiB
Python

"""
Keyword opportunity analyzer for SEO optimization.
Identifies high-potential keywords ranking at positions 11-30.
"""
import csv
import json
import argparse
import time
from pathlib import Path
from openai import OpenAI
from config import Config
class OpportunityAnalyzer:
"""Analyze keyword opportunities for SEO optimization."""
def __init__(self):
"""Initialize analyzer."""
self.config = Config
self.output_dir = self.config.OUTPUT_DIR
self.logs = []
self.client = None
if self.config.OPENROUTER_API_KEY:
self.client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=self.config.OPENROUTER_API_KEY,
)
def log(self, message):
"""Add message to log."""
self.logs.append(message)
print(message)
def load_posts(self, posts_csv):
"""Load posts with analytics data."""
posts = []
if not posts_csv.exists():
self.log(f"❌ File not found: {posts_csv}")
return posts
try:
with open(posts_csv, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
try:
posts.append({
'id': row.get('ID', ''),
'title': row.get('Title', ''),
'url': row.get('URL', ''),
'impressions': int(row.get('impressions', 0) or 0),
'clicks': int(row.get('clicks', 0) or 0),
'avg_position': float(row.get('avg_position', 0) or 0),
'ctr': float(row.get('ctr', 0) or 0),
'traffic': int(row.get('traffic', 0) or 0),
'bounce_rate': float(row.get('bounce_rate', 0) or 0),
'keywords_count': int(row.get('keywords_count', 0) or 0),
'top_keywords': row.get('top_keywords', '')
})
except (ValueError, TypeError):
continue
self.log(f"✓ Loaded {len(posts)} posts")
except Exception as e:
self.log(f"❌ Error reading posts: {e}")
return posts
def filter_opportunities(self, posts, min_pos, max_pos, min_impressions):
"""Filter posts with keywords in opportunity range or high traffic for optimization."""
opportunities = []
for post in posts:
position = post.get('avg_position', 0)
impressions = post.get('impressions', 0)
traffic = post.get('traffic', 0)
# Primary filter: position range (if data available)
if position > 0:
if min_pos <= position <= max_pos and impressions >= min_impressions:
opportunities.append(post)
# Fallback: filter by traffic when position data unavailable
# Include posts with any traffic for optimization analysis
elif traffic > 0:
opportunities.append(post)
self.log(f"✓ Found {len(opportunities)} posts for optimization analysis")
if opportunities:
traffic_posts = [p for p in opportunities if p.get('traffic', 0) > 0]
self.log(f" ({len(traffic_posts)} have traffic data, {len(opportunities) - len(traffic_posts)} selected for analysis)")
return opportunities
def calculate_opportunity_score(self, post):
"""Calculate opportunity score (0-100) for a post."""
position = post.get('avg_position', 50)
impressions = post.get('impressions', 0)
ctr = post.get('ctr', 0)
traffic = post.get('traffic', 0)
# Position score (35%): Closer to page 1 = higher
# Position 11-30 range
position_score = max(0, (30 - position) / 19 * 35)
# Traffic potential (30%): Based on impressions
# Normalize to 0-30
traffic_potential = min(30, (impressions / 1000) * 30)
# CTR improvement potential (20%): Gap between current and expected CTR
# Expected CTR at position X
expected_ctr_map = {
11: 0.02, 12: 0.02, 13: 0.015, 14: 0.015, 15: 0.013,
16: 0.012, 17: 0.011, 18: 0.01, 19: 0.009, 20: 0.008,
21: 0.008, 22: 0.007, 23: 0.007, 24: 0.006, 25: 0.006,
26: 0.006, 27: 0.005, 28: 0.005, 29: 0.005, 30: 0.004
}
expected_ctr = expected_ctr_map.get(int(position), 0.005)
ctr_gap = max(0, expected_ctr - ctr)
ctr_score = min(20, (ctr_gap / expected_ctr * 100 / 5) * 20)
# Content quality (15%): Existing traffic and engagement
quality_score = min(15, (traffic / 100) * 7.5 +
(100 - post.get('bounce_rate', 50)) / 100 * 7.5)
return round(position_score + traffic_potential + ctr_score + quality_score, 1)
def estimate_traffic_gain(self, post):
"""Estimate potential traffic gain from optimization."""
position = post.get('avg_position', 50)
impressions = post.get('impressions', 0)
ctr = post.get('ctr', 0)
# Estimate CTR improvement from moving one position up
# Moving from position X to X-1 typically improves CTR by 20-30%
current_traffic = impressions * ctr
if position > 11:
# Target position: 1 ahead
improvement_factor = 1.25 # 25% improvement per position
estimated_new_traffic = current_traffic * improvement_factor
gain = estimated_new_traffic - current_traffic
else:
gain = 0
return round(gain, 0)
def generate_ai_recommendations(self, post):
"""Generate AI recommendations for top opportunities."""
if not self.client:
return None
try:
keywords = post.get('top_keywords', '').split(',')[:5]
keywords_str = ', '.join([k.strip() for k in keywords if k.strip()])
prompt = f"""Analyze keyword optimization opportunities for this blog post:
Post Title: {post['title']}
Current Position: {post['avg_position']:.1f}
Monthly Impressions: {post['impressions']}
Current CTR: {post['ctr']:.2%}
Top Keywords: {keywords_str}
Provide 2-3 specific, actionable recommendations to:
1. Improve the SEO title to increase CTR
2. Enhance the meta description
3. Target structural improvements (headers, content gaps)
Focus on moving this post from positions 11-20 to page 1 (positions 1-10).
Be specific and practical.
Return as JSON:
{{
"title_recommendations": ["recommendation 1", "recommendation 2"],
"description_recommendations": ["recommendation 1", "recommendation 2"],
"content_recommendations": ["recommendation 1", "recommendation 2"],
"estimated_effort_hours": number,
"expected_position_improvement": number
}}"""
response = self.client.chat.completions.create(
model=self.config.AI_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=500
)
try:
result_text = response.choices[0].message.content
# Extract JSON
start_idx = result_text.find('{')
end_idx = result_text.rfind('}') + 1
if start_idx >= 0 and end_idx > start_idx:
return json.loads(result_text[start_idx:end_idx])
except json.JSONDecodeError:
self.log(f"⚠️ Could not parse AI response for {post['title']}")
return None
except Exception as e:
self.log(f"⚠️ AI generation failed for {post['title']}: {e}")
return None
def export_opportunities_csv(self, opportunities, output_csv):
"""Export opportunities to CSV."""
if not opportunities:
self.log("⚠️ No opportunities to export")
return
try:
fieldnames = [
'ID', 'Title', 'URL', 'avg_position', 'impressions', 'clicks',
'ctr', 'traffic', 'bounce_rate', 'keywords_count', 'top_keywords',
'opportunity_score', 'estimated_traffic_gain',
'title_recommendations', 'description_recommendations',
'content_recommendations', 'estimated_effort_hours',
'expected_position_improvement'
]
with open(output_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
for opp in sorted(opportunities, key=lambda x: x['opportunity_score'], reverse=True):
row = {
'ID': opp['id'],
'Title': opp['title'],
'URL': opp['url'],
'avg_position': opp['avg_position'],
'impressions': opp['impressions'],
'clicks': opp['clicks'],
'ctr': f"{opp['ctr']:.2%}",
'traffic': opp['traffic'],
'bounce_rate': opp['bounce_rate'],
'keywords_count': opp['keywords_count'],
'top_keywords': opp['top_keywords'],
'opportunity_score': opp['opportunity_score'],
'estimated_traffic_gain': opp['estimated_traffic_gain'],
'title_recommendations': opp.get('title_recommendations_str', ''),
'description_recommendations': opp.get('description_recommendations_str', ''),
'content_recommendations': opp.get('content_recommendations_str', ''),
'estimated_effort_hours': opp.get('estimated_effort_hours', ''),
'expected_position_improvement': opp.get('expected_position_improvement', '')
}
writer.writerow(row)
self.log(f"✓ Exported {len(opportunities)} opportunities to {output_csv}")
except Exception as e:
self.log(f"❌ Error exporting CSV: {e}")
def export_log(self, log_file):
"""Export analysis log."""
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write("SEO Opportunity Analysis Report\n")
f.write("=" * 60 + "\n\n")
for msg in self.logs:
f.write(msg + "\n")
self.log(f"✓ Exported log to {log_file}")
except Exception as e:
self.log(f"❌ Error exporting log: {e}")
def run(self, posts_csv, output_csv, min_position=11, max_position=30,
min_impressions=50, top_n=20):
"""Run complete analysis workflow."""
self.log("🔍 Starting keyword opportunity analysis...")
self.log(f"Input: {posts_csv}")
self.log(f"Position range: {min_position}-{max_position}")
self.log(f"Min impressions: {min_impressions}")
self.log(f"Top N for AI analysis: {top_n}\n")
# Load posts
posts = self.load_posts(posts_csv)
if not posts:
return
# Filter opportunities
opportunities = self.filter_opportunities(posts, min_position, max_position, min_impressions)
if not opportunities:
self.log("⚠️ No opportunities found in specified range")
return
# Calculate scores
self.log("\n📊 Calculating opportunity scores...")
for opp in opportunities:
opp['opportunity_score'] = self.calculate_opportunity_score(opp)
opp['estimated_traffic_gain'] = self.estimate_traffic_gain(opp)
# Sort by score
opportunities = sorted(opportunities, key=lambda x: x['opportunity_score'], reverse=True)
# Get AI recommendations for top N
self.log(f"\n🤖 Generating AI recommendations for top {min(top_n, len(opportunities))} opportunities...")
for i, opp in enumerate(opportunities[:top_n]):
self.log(f" [{i+1}/{min(top_n, len(opportunities))}] {opp['title'][:50]}...")
recommendations = self.generate_ai_recommendations(opp)
if recommendations:
opp['title_recommendations_str'] = '; '.join(recommendations.get('title_recommendations', []))
opp['description_recommendations_str'] = '; '.join(recommendations.get('description_recommendations', []))
opp['content_recommendations_str'] = '; '.join(recommendations.get('content_recommendations', []))
opp['estimated_effort_hours'] = recommendations.get('estimated_effort_hours', '')
opp['expected_position_improvement'] = recommendations.get('expected_position_improvement', '')
time.sleep(0.2) # Rate limiting
# Export
self.log("\n📁 Exporting results...")
self.export_opportunities_csv(opportunities, output_csv)
# Export log
log_dir = self.output_dir / 'logs'
log_dir.mkdir(exist_ok=True)
log_file = log_dir / 'opportunity_analysis_log.txt'
self.export_log(log_file)
self.log(f"\n✓ Analysis complete! {len(opportunities)} opportunities identified.")
self.log(f" Top opportunity: {opportunities[0]['title'][:50]}... (score: {opportunities[0]['opportunity_score']})")
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(description='Analyze keyword opportunities')
parser.add_argument('--input', type=Path,
default=Path('output/results/posts_with_analytics.csv'),
help='Input posts CSV')
parser.add_argument('--output', type=Path,
default=Path('output/results/keyword_opportunities.csv'),
help='Output opportunities CSV')
parser.add_argument('--min-position', type=int, default=11,
help='Minimum position (start of range)')
parser.add_argument('--max-position', type=int, default=30,
help='Maximum position (end of range)')
parser.add_argument('--min-impressions', type=int, default=50,
help='Minimum impressions to consider')
parser.add_argument('--top-n', type=int, default=20,
help='Top N for AI recommendations')
args = parser.parse_args()
analyzer = OpportunityAnalyzer()
analyzer.run(args.input, args.output, args.min_position, args.max_position,
args.min_impressions, args.top_n)
if __name__ == '__main__':
main()