From 86c9bb3a74613437414d2a5dc0048cafbbb3ab9d Mon Sep 17 00:00:00 2001 From: Kevin Bataille Date: Mon, 16 Feb 2026 15:31:29 +0100 Subject: [PATCH] Add editorial strategy analyzer - Determine editorial lines and recommend migrations New Features: - Analyze editorial lines for each site based on actual content - Recommend post migrations between sites - Suggest optimal category structure per site - Best practices for category count (5-10 per site) New Command: - seo editorial_strategy - Analyze and generate migration report New Module: - src/seo/editorial_strategy.py - Editorial strategy analyzer - EditorialStrategyAnalyzer: Main analyzer class - Analyzes content distribution per site - Determines editorial focus automatically - Recommends post migrations based on topics - Suggests optimal category structure Analysis Includes: - Category distribution per site - Topic analysis (VPN, Software, Gaming, Torrenting, etc.) - Traffic distribution - Content gaps identification - Overlapping content detection - Migration recommendations with priority Category Recommendations: - mistergeek.net: 8-12 categories (tech focus) - webscroll.fr: 5-8 categories (torrenting niche) - hellogeek.net: 3-5 categories (catch-all) Output: - Comprehensive markdown report - Migration recommendations table - Category structure recommendations - Action plan Usage: ./seo editorial_strategy # Analyze latest export ./seo editorial_strategy posts.csv # Analyze specific CSV # Generates: output/editorial_strategy_*.md Co-authored-by: Qwen-Coder --- src/seo/__init__.py | 4 +- src/seo/app.py | 22 ++ src/seo/cli.py | 32 ++- src/seo/editorial_strategy.py | 463 ++++++++++++++++++++++++++++++++++ 4 files changed, 517 insertions(+), 4 deletions(-) create mode 100644 src/seo/editorial_strategy.py diff --git a/src/seo/__init__.py b/src/seo/__init__.py index 0fa013e..fcca14b 100644 --- a/src/seo/__init__.py +++ b/src/seo/__init__.py @@ -12,7 +12,8 @@ __all__ = [ 'EnhancedPostAnalyzer', 'CategoryProposer', 'WordPressCategoryManager', - 'CategoryAssignmentProcessor' + 'CategoryAssignmentProcessor', + 'EditorialStrategyAnalyzer' ] # Import main classes for easy access @@ -21,3 +22,4 @@ from .exporter import PostExporter from .analyzer import PostAnalyzer, EnhancedPostAnalyzer from .category_proposer import CategoryProposer from .category_manager import WordPressCategoryManager, CategoryAssignmentProcessor +from .editorial_strategy import EditorialStrategyAnalyzer diff --git a/src/seo/app.py b/src/seo/app.py index 6d2fca0..bbe4963 100644 --- a/src/seo/app.py +++ b/src/seo/app.py @@ -11,6 +11,7 @@ from .exporter import PostExporter from .analyzer import EnhancedPostAnalyzer from .category_proposer import CategoryProposer from .category_manager import WordPressCategoryManager, CategoryAssignmentProcessor +from .editorial_strategy import EditorialStrategyAnalyzer logger = logging.getLogger(__name__) @@ -127,6 +128,27 @@ class SEOApp: return category_id + def editorial_strategy(self, csv_file: Optional[str] = None) -> dict: + """ + Analyze editorial strategy and recommend migrations. + + Args: + csv_file: Path to posts CSV (uses latest export if not provided) + + Returns: + Analysis results dict + """ + logger.info("šŸ“Š Analyzing editorial strategy...") + + if not csv_file: + csv_file = self._find_latest_export() + + if not csv_file: + raise FileNotFoundError("No exported posts found. Run export() first.") + + analyzer = EditorialStrategyAnalyzer() + return analyzer.run(csv_file) + def status(self) -> dict: """Get status of output files.""" files = list(self.output_dir.glob('*.csv')) diff --git a/src/seo/cli.py b/src/seo/cli.py index 285d947..6ba43e1 100644 --- a/src/seo/cli.py +++ b/src/seo/cli.py @@ -71,6 +71,7 @@ Examples: 'category_propose': cmd_category_propose, 'category_apply': cmd_category_apply, 'category_create': cmd_category_create, + 'editorial_strategy': cmd_editorial_strategy, 'status': cmd_status, 'help': cmd_help, } @@ -222,6 +223,29 @@ def cmd_category_create(app, args): return 0 +def cmd_editorial_strategy(app, args): + """Analyze editorial strategy and recommend migrations.""" + if args.dry_run: + print("Would analyze editorial strategy and recommend migrations") + return 0 + + csv_file = args.args[0] if args.args else None + + print("Analyzing editorial strategy...") + results = app.editorial_strategy(csv_file=csv_file) + + if results and results.get('report_file'): + print(f"\nāœ… Editorial strategy analysis complete!") + print(f" Report: {results['report_file']}") + print(f" Migrations recommended: {len(results.get('migrations', []))}") + print(f"\nOpen the report to review:") + print(f" 1. Editorial lines for each site") + print(f" 2. Post migration recommendations") + print(f" 3. Category structure recommendations") + print(f" 4. Action plan") + return 0 + + def cmd_status(app, args): """Show status.""" if args.dry_run: @@ -259,6 +283,10 @@ Category Management: category_create --site site Create a new category category_create -s mistergeek.net "VPN Reviews" +Strategy & Migration: + editorial_strategy [csv] Analyze editorial lines and recommend migrations + editorial_strategy Get migration recommendations between sites + Utility: status Show output files status help Show this help message @@ -275,13 +303,11 @@ Options: Examples: seo export - seo analyze seo analyze -f title categories - seo analyze -u -f meta_description seo category_propose - seo category_propose output/all_posts_2026-02-16.csv seo category_apply -s mistergeek.net -c Medium seo category_create -s webscroll.fr "Torrent Clients" + seo editorial_strategy seo status """) return 0 diff --git a/src/seo/editorial_strategy.py b/src/seo/editorial_strategy.py new file mode 100644 index 0000000..73d2cba --- /dev/null +++ b/src/seo/editorial_strategy.py @@ -0,0 +1,463 @@ +""" +Editorial Strategy Analyzer - Determine editorial lines and recommend migrations +""" + +import csv +import json +import logging +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Tuple +from collections import Counter, defaultdict +import requests + +from .config import Config + +logger = logging.getLogger(__name__) + + +class EditorialStrategyAnalyzer: + """Analyze content to determine editorial lines and recommend migrations.""" + + def __init__(self): + """Initialize analyzer.""" + self.sites = Config.WORDPRESS_SITES + self.posts = [] + self.site_analysis = {} + self.migration_recommendations = [] + + def load_posts(self, csv_file: str) -> bool: + """Load posts from CSV.""" + logger.info(f"Loading posts from: {csv_file}") + + try: + with open(csv_file, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + self.posts = list(reader) + + logger.info(f"āœ“ Loaded {len(self.posts)} posts") + return True + except Exception as e: + logger.error(f"Error loading posts: {e}") + return False + + def analyze_site_content(self) -> Dict[str, Dict]: + """ + Analyze content distribution across sites. + + Returns: + Dict with analysis per site + """ + logger.info("\n" + "="*70) + logger.info("ANALYZING EDITORIAL LINES") + logger.info("="*70) + + # Group posts by site + posts_by_site = defaultdict(list) + for post in self.posts: + site = post.get('site', 'unknown') + posts_by_site[site].append(post) + + # Analyze each site + for site_name, site_posts in posts_by_site.items(): + logger.info(f"\nšŸ“Š {site_name}: {len(site_posts)} posts") + + analysis = { + 'total_posts': len(site_posts), + 'categories': self._analyze_categories(site_posts), + 'topics': self._analyze_topics(site_posts), + 'traffic_distribution': self._analyze_traffic(site_posts), + 'content_gaps': [], + 'overlapping_content': [], + 'editorial_focus': '' + } + + # Determine editorial focus + analysis['editorial_focus'] = self._determine_editorial_focus(analysis) + + self.site_analysis[site_name] = analysis + + logger.info(f" Categories: {len(analysis['categories'])}") + logger.info(f" Top topics: {', '.join(list(analysis['topics'].keys())[:5])}") + logger.info(f" Editorial focus: {analysis['editorial_focus']}") + + return self.site_analysis + + def _analyze_categories(self, posts: List[Dict]) -> Dict[str, int]: + """Analyze category distribution.""" + categories = [] + for post in posts: + cats = post.get('categories', '') + if cats: + categories.extend([c.strip() for c in cats.split(',')]) + + return dict(Counter(categories).most_common()) + + def _analyze_topics(self, posts: List[Dict]) -> Dict[str, int]: + """Analyze topics based on titles and content.""" + topic_keywords = { + 'VPN': ['vpn', 'proxy', 'privacy', 'security', 'encryption'], + 'Software': ['software', 'app', 'tool', 'download', 'install'], + 'Gaming': ['game', 'gaming', 'console', 'steam', 'playstation'], + 'Torrenting': ['torrent', 'download', 'upload', 'tracker', 'seed'], + 'Streaming': ['stream', 'film', 'series', 'netflix', 'disney'], + 'SEO': ['seo', 'ranking', 'google', 'search', 'optimization'], + 'Tech': ['tech', 'technology', 'device', 'hardware', 'review'], + } + + topics = defaultdict(int) + for post in posts: + title = (post.get('title', '') + ' ' + post.get('content_preview', '')).lower() + for topic, keywords in topic_keywords.items(): + if any(kw in title for kw in keywords): + topics[topic] += 1 + + return dict(sorted(topics.items(), key=lambda x: x[1], reverse=True)) + + def _analyze_traffic(self, posts: List[Dict]) -> Dict[str, int]: + """Analyze traffic distribution.""" + traffic_ranges = {'0-10': 0, '11-50': 0, '51-100': 0, '101-500': 0, '500+': 0} + + for post in posts: + try: + traffic = int(post.get('traffic', 0) or 0) + if traffic <= 10: + traffic_ranges['0-10'] += 1 + elif traffic <= 50: + traffic_ranges['11-50'] += 1 + elif traffic <= 100: + traffic_ranges['51-100'] += 1 + elif traffic <= 500: + traffic_ranges['101-500'] += 1 + else: + traffic_ranges['500+'] += 1 + except (ValueError, TypeError): + traffic_ranges['0-10'] += 1 + + return traffic_ranges + + def _determine_editorial_focus(self, analysis: Dict) -> str: + """Determine the editorial focus based on analysis.""" + topics = analysis.get('topics', {}) + categories = analysis.get('categories', {}) + + if not topics and not categories: + return "Undefined - needs review" + + # Get top topics + top_topics = list(topics.keys())[:3] if topics else [] + top_categories = list(categories.keys())[:3] if categories else [] + + if top_topics: + return f"Focus on {', '.join(top_topics)} content" + elif top_categories: + return f"Focus on {', '.join(top_categories)}" + else: + return "Mixed content - needs consolidation" + + def recommend_migrations(self) -> List[Dict]: + """ + Recommend post migrations between sites. + + Returns: + List of migration recommendations + """ + logger.info("\n" + "="*70) + logger.info("RECOMMENDING POST MIGRATIONS") + logger.info("="*70) + + # Define ideal editorial lines + ideal_editorial_lines = { + 'mistergeek.net': { + 'topics': ['VPN', 'Software', 'Gaming', 'SEO', 'Tech'], + 'categories': ['VPN', 'Software/Tools', 'Gaming', 'SEO', 'Content Marketing'], + 'description': 'High-value tech content (VPN, software, gaming, SEO)' + }, + 'webscroll.fr': { + 'topics': ['Torrenting', 'Streaming'], + 'categories': ['Torrenting', 'File-Sharing', 'Tracker Guides'], + 'description': 'Torrenting and file-sharing content' + }, + 'hellogeek.net': { + 'topics': [], + 'categories': [], + 'description': 'Low-traffic, experimental, or off-brand content' + } + } + + # Topic to site mapping + topic_to_site = { + 'VPN': 'mistergeek.net', + 'Software': 'mistergeek.net', + 'Gaming': 'mistergeek.net', + 'SEO': 'mistergeek.net', + 'Tech': 'mistergeek.net', + 'Torrenting': 'webscroll.fr', + 'Streaming': 'webscroll.fr', + } + + migrations = [] + + for post in self.posts: + current_site = post.get('site', '') + post_topics = self._extract_post_topics(post) + + # Determine best site based on topics + best_site = None + for topic in post_topics: + if topic in topic_to_site: + best_site = topic_to_site[topic] + break + + # If no topic match, check traffic + if not best_site: + try: + traffic = int(post.get('traffic', 0) or 0) + if traffic < 50: + best_site = 'hellogeek.net' + else: + best_site = 'mistergeek.net' + except (ValueError, TypeError): + best_site = 'hellogeek.net' + + # Recommend migration if different from current + if best_site and best_site != current_site: + migrations.append({ + 'post_id': post.get('post_id', ''), + 'title': post.get('title', '')[:80], + 'current_site': current_site, + 'recommended_site': best_site, + 'reason': f"Content matches {best_site} editorial line", + 'topics': ', '.join(post_topics), + 'traffic': post.get('traffic', '0'), + 'priority': 'High' if int(post.get('traffic', 0) or 0) > 100 else 'Medium' + }) + + self.migration_recommendations = migrations + + logger.info(f"\nāœ“ Found {len(migrations)} migration recommendations") + + # Summary by site + by_site = defaultdict(int) + for mig in migrations: + by_site[mig['recommended_site']] += 1 + + for site, count in by_site.items(): + logger.info(f" {site}: {count} posts recommended") + + return migrations + + def _extract_post_topics(self, post: Dict) -> List[str]: + """Extract topics from a post.""" + topic_keywords = { + 'VPN': ['vpn', 'proxy', 'privacy'], + 'Software': ['software', 'app', 'tool'], + 'Gaming': ['game', 'gaming'], + 'Torrenting': ['torrent', 'download'], + 'Streaming': ['stream', 'film'], + 'SEO': ['seo', 'ranking'], + } + + text = (post.get('title', '') + ' ' + post.get('content_preview', '')).lower() + topics = [] + + for topic, keywords in topic_keywords.items(): + if any(kw in text for kw in keywords): + topics.append(topic) + + return topics + + def recommend_category_structure(self) -> Dict[str, Dict]: + """ + Recommend optimal category structure for each site. + + Returns: + Dict with category recommendations per site + """ + logger.info("\n" + "="*70) + logger.info("RECOMMENDING CATEGORY STRUCTURE") + logger.info("="*70) + + recommendations = {} + + for site_name, analysis in self.site_analysis.items(): + current_categories = analysis.get('categories', {}) + current_topics = analysis.get('topics', {}) + + # Recommend 5-10 categories per site for optimal SEO + if site_name == 'mistergeek.net': + recommended = { + 'ideal_count': '8-12 categories', + 'current_count': len(current_categories), + 'recommended_categories': [ + {'name': 'VPN', 'priority': 'High'}, + {'name': 'Software/Tools', 'priority': 'High'}, + {'name': 'Gaming', 'priority': 'High'}, + {'name': 'SEO', 'priority': 'Medium'}, + {'name': 'Content Marketing', 'priority': 'Medium'}, + {'name': 'Tech Reviews', 'priority': 'Medium'}, + {'name': 'Tutorials', 'priority': 'Low'}, + {'name': 'News', 'priority': 'Low'}, + ], + 'categories_to_merge': self._find_similar_categories(current_categories), + 'action': 'Consolidate similar categories, focus on core topics' + } + elif site_name == 'webscroll.fr': + recommended = { + 'ideal_count': '5-8 categories', + 'current_count': len(current_categories), + 'recommended_categories': [ + {'name': 'Torrenting', 'priority': 'High'}, + {'name': 'File-Sharing', 'priority': 'High'}, + {'name': 'Tracker Guides', 'priority': 'High'}, + {'name': 'VPN for Torrenting', 'priority': 'Medium'}, + {'name': 'Seedbox', 'priority': 'Medium'}, + {'name': 'Legal', 'priority': 'Low'}, + ], + 'categories_to_merge': self._find_similar_categories(current_categories), + 'action': 'Keep focused on torrenting niche' + } + else: # hellogeek.net + recommended = { + 'ideal_count': '3-5 categories', + 'current_count': len(current_categories), + 'recommended_categories': [ + {'name': 'Experimental', 'priority': 'High'}, + {'name': 'Low-Traffic', 'priority': 'High'}, + {'name': 'Off-Brand', 'priority': 'Medium'}, + {'name': 'Testing', 'priority': 'Low'}, + ], + 'categories_to_merge': self._find_similar_categories(current_categories), + 'action': 'Minimal categories for catch-all site' + } + + recommendations[site_name] = recommended + + logger.info(f"\n{site_name}:") + logger.info(f" Current: {recommended['current_count']} categories") + logger.info(f" Ideal: {recommended['ideal_count']}") + logger.info(f" Action: {recommended['action']}") + + return recommendations + + def _find_similar_categories(self, categories: Dict[str, int]) -> List[Tuple[str, str]]: + """Find categories that should be merged.""" + similar_pairs = [] + category_list = list(categories.keys()) + + # Simple similarity check (could be enhanced with NLP) + for i, cat1 in enumerate(category_list): + for cat2 in category_list[i+1:]: + # Check if one contains the other + if cat1.lower() in cat2.lower() or cat2.lower() in cat1.lower(): + similar_pairs.append((cat1, cat2)) + # Check plural/singular + elif cat1.lower().rstrip('s') == cat2.lower().rstrip('s'): + similar_pairs.append((cat1, cat2)) + + return similar_pairs + + def export_strategy_report(self, output_file: Optional[str] = None) -> str: + """Export comprehensive strategy report.""" + if not output_file: + output_dir = Path(__file__).parent.parent.parent / 'output' + output_dir.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + output_file = output_dir / f'editorial_strategy_{timestamp}.md' + + output_file = Path(output_file) + + report = [] + report.append("# Editorial Strategy Report\n") + report.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*\n") + + # Executive Summary + report.append("## Executive Summary\n") + report.append(f"**Total Posts Analyzed:** {len(self.posts)}\n") + report.append(f"**Sites Analyzed:** {len(self.site_analysis)}\n") + report.append(f"**Migration Recommendations:** {len(self.migration_recommendations)}\n\n") + + # Site Analysis + report.append("## Site-by-Site Analysis\n") + for site_name, analysis in self.site_analysis.items(): + report.append(f"\n### {site_name}\n") + report.append(f"**Posts:** {analysis['total_posts']}\n") + report.append(f"**Editorial Focus:** {analysis['editorial_focus']}\n") + + report.append("\n**Top Categories:**\n") + for cat, count in list(analysis['categories'].items())[:10]: + report.append(f"- {cat}: {count} posts\n") + + report.append("\n**Top Topics:**\n") + for topic, count in list(analysis['topics'].items())[:5]: + report.append(f"- {topic}: {count} posts\n") + + # Migration Recommendations + report.append("\n## Migration Recommendations\n") + if self.migration_recommendations: + report.append(f"**Total posts to migrate:** {len(self.migration_recommendations)}\n\n") + + # Group by target site + by_target = defaultdict(list) + for mig in self.migration_recommendations: + by_target[mig['recommended_site']].append(mig) + + for site, migrations in by_target.items(): + report.append(f"\n### To {site}: {len(migrations)} posts\n") + report.append("| Post ID | Title | Current Site | Priority |\n") + report.append("|---------|-------|--------------|----------|\n") + for mig in migrations[:20]: # Show first 20 + report.append(f"| {mig['post_id']} | {mig['title'][:50]} | {mig['current_site']} | {mig['priority']} |\n") + else: + report.append("No migrations recommended.\n") + + # Category Recommendations + report.append("\n## Category Structure Recommendations\n") + report.append("\n**Best Practice:** 5-10 categories per site for optimal SEO\n") + report.append("- Too few (<3): Poor content organization\n") + report.append("- Too many (>15): Diluted category authority\n\n") + + for site_name, rec in self.recommend_category_structure().items(): + report.append(f"\n### {site_name}\n") + report.append(f"- Current: {rec['current_count']} categories\n") + report.append(f"- Recommended: {rec['ideal_count']}\n") + report.append(f"- Action: {rec['action']}\n") + + report.append("\n**Recommended Categories:**\n") + for cat in rec['recommended_categories']: + report.append(f"- {cat['name']} ({cat['priority']})\n") + + if rec['categories_to_merge']: + report.append("\n**Consider Merging:**\n") + for cat1, cat2 in rec['categories_to_merge'][:5]: + report.append(f"- {cat1} + {cat2}\n") + + # Action Plan + report.append("\n## Recommended Action Plan\n") + report.append("\n1. **Review migration recommendations** - Check if AI suggestions make sense\n") + report.append("2. **Execute migrations** - Move posts to recommended sites\n") + report.append("3. **Consolidate categories** - Merge similar categories\n") + report.append("4. **Monitor results** - Track traffic changes after migrations\n") + + with open(output_file, 'w', encoding='utf-8') as f: + f.write(''.join(report)) + + logger.info(f"āœ“ Strategy report exported to: {output_file}") + return str(output_file) + + def run(self, csv_file: str, output_file: Optional[str] = None) -> Dict: + """Run complete editorial strategy analysis.""" + if not self.load_posts(csv_file): + return {} + + self.analyze_site_content() + self.recommend_migrations() + self.recommend_category_structure() + + report_file = self.export_strategy_report(output_file) + + return { + 'site_analysis': self.site_analysis, + 'migrations': self.migration_recommendations, + 'report_file': report_file + }