Refactor to single integrated package - Remove scripts folder

Major refactoring to create a unified, self-contained Python package:

### Architecture Changes:
- Removed scripts/ directory completely
- All functionality now in src/seo/ package
- Single entry point: ./seo (imports from src/seo/cli)
- No external dependencies on scripts folder

### New Package Structure:
src/seo/
├── __init__.py          - Package exports (SEOApp, PostExporter, etc.)
├── cli.py               - Command-line interface
├── app.py               - Main application class
├── config.py            - Configuration management
├── exporter.py          - Post export functionality (self-contained)
├── analyzer.py          - Enhanced analyzer with selective fields
├── category_proposer.py - AI category proposals (self-contained)
├── seo_checker.py       - Placeholder for future implementation
├── categories.py        - Placeholder for future implementation
├── approval.py          - Placeholder for future implementation
└── recategorizer.py     - Placeholder for future implementation

### Features:
- All modules are self-contained (no scripts dependencies)
- EnhancedPostAnalyzer with selective field analysis
- CategoryProposer for AI-powered category suggestions
- Support for in-place CSV updates with backups
- Clean, integrated codebase

### CLI Commands:
- seo export - Export posts from WordPress
- seo analyze - Analyze with AI (supports -f fields, -u update)
- seo category_propose - Propose categories
- seo status - Show output files
- seo help - Show help

### Usage Examples:
./seo export
./seo analyze -f title categories
./seo analyze -u -f meta_description
./seo category_propose
./seo status

### Benefits:
- Single source of truth
- Easier to maintain and extend
- Proper Python package structure
- Can be installed with pip install -e .
- Clean imports throughout
- No path resolution issues

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Kevin Bataille
2026-02-16 15:20:11 +01:00
parent 95092a591f
commit c8fb141cdd
27 changed files with 468 additions and 6342 deletions

View File

@@ -1,7 +1,14 @@
"""
SEO Automation Tool - Complete Integrated Package
Single entry point for all SEO automation functionality.
"""
__version__ = '1.0.0'
__author__ = 'SEO Automation Team'

# Import main classes for easy access
from .app import SEOApp
from .exporter import PostExporter
from .analyzer import PostAnalyzer, EnhancedPostAnalyzer
from .category_proposer import CategoryProposer

# Public API — keep in sync with the imports above.
# Fix: EnhancedPostAnalyzer was imported and exported via `from seo import *`
# usage in docs, but was missing from __all__.
__all__ = [
    'SEOApp',
    'PostExporter',
    'PostAnalyzer',
    'EnhancedPostAnalyzer',
    'CategoryProposer',
]

View File

@@ -1,15 +1,353 @@
"""
Analyzer Module - AI-powered post analysis
Post Analyzer - AI-powered post analysis with selective field support
"""
import sys
import csv
import json
import logging
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
import requests
# Import from scripts directory (parent of src)
scripts_dir = Path(__file__).parents[2] / 'scripts'
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
from .config import Config
from ai_analyze_posts_for_decisions import PostAnalyzer
logger = logging.getLogger(__name__)
__all__ = ['PostAnalyzer']
class PostAnalyzer:
    """Legacy, minimal post analyzer kept for backward compatibility.

    Loads a CSV of posts and nothing more; real analysis lives in
    EnhancedPostAnalyzer.
    """

    def __init__(self, csv_file: str):
        """Remember the CSV path and pull AI settings from Config."""
        self.csv_file = Path(csv_file)
        self.openrouter_api_key = Config.OPENROUTER_API_KEY
        self.ai_model = Config.AI_MODEL
        self.posts = []
        self.analyzed_posts = []
        self.api_calls = 0
        self.ai_cost = 0.0

    def load_csv(self) -> bool:
        """Read every row of the CSV into self.posts; True on success."""
        if not self.csv_file.exists():
            logger.error(f"CSV file not found: {self.csv_file}")
            return False
        try:
            with open(self.csv_file, 'r', encoding='utf-8') as handle:
                self.posts = list(csv.DictReader(handle))
            logger.info(f"✓ Loaded {len(self.posts)} posts")
        except Exception as exc:
            logger.error(f"Error loading CSV: {exc}")
            return False
        return True

    def run(self) -> None:
        """Load the CSV, then warn that this class is deprecated (no analysis)."""
        if not self.load_csv():
            return
        logger.warning("Basic PostAnalyzer is deprecated. Use EnhancedPostAnalyzer instead.")
class EnhancedPostAnalyzer:
    """Enhanced analyzer with selective field analysis and in-place updates.

    Loads posts from CSV, sends them to an OpenRouter chat model in batches,
    merges per-post recommendations back into the rows, and writes the result
    either to a new CSV or (with a timestamped backup) over the input file.
    """

    def __init__(self, csv_file: str, analyze_fields: Optional[List[str]] = None):
        """
        Initialize analyzer.

        Args:
            csv_file: Path to input CSV.
            analyze_fields: Fields to analyze, any of
                ['title', 'meta_description', 'categories', 'site'].
                None selects all four.
        """
        self.csv_file = Path(csv_file)
        self.openrouter_api_key = Config.OPENROUTER_API_KEY
        self.ai_model = Config.AI_MODEL
        self.posts = []           # raw rows from the input CSV
        self.analyzed_posts = []  # rows after recommendations are merged in
        self.api_calls = 0
        self.ai_cost = 0.0        # running USD estimate (see get_ai_recommendations)
        if analyze_fields is None:
            self.analyze_fields = ['title', 'meta_description', 'categories', 'site']
        else:
            self.analyze_fields = analyze_fields
        logger.info(f"Fields to analyze: {', '.join(self.analyze_fields)}")

    def load_csv(self) -> bool:
        """Load posts from the CSV file into self.posts; return True on success."""
        logger.info(f"Loading CSV: {self.csv_file}")
        if not self.csv_file.exists():
            logger.error(f"CSV file not found: {self.csv_file}")
            return False
        try:
            with open(self.csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                self.posts = list(reader)
            logger.info(f"✓ Loaded {len(self.posts)} posts from CSV")
            return True
        except Exception as e:
            logger.error(f"Error loading CSV: {e}")
            return False

    def get_ai_recommendations(self, batch: List[Dict], fields: List[str]) -> Optional[str]:
        """Request AI recommendations for `fields` on one batch of posts.

        Returns the raw model response text (expected to contain a JSON
        array) or None on missing configuration / request failure.
        """
        if not self.openrouter_api_key:
            logger.error("OPENROUTER_API_KEY not set")
            return None
        # Format posts for AI, including only the fields being analyzed.
        formatted_posts = []
        for i, post in enumerate(batch, 1):
            post_text = f"{i}. POST ID: {post['post_id']}\n"
            post_text += f" Site: {post.get('site', '')}\n"
            if 'title' in fields:
                post_text += f" Title: {post.get('title', '')}\n"
            if 'meta_description' in fields:
                post_text += f" Meta Description: {post.get('meta_description', '')}\n"
            if 'categories' in fields:
                post_text += f" Categories: {post.get('categories', '')}\n"
            if 'content_preview' in post:
                post_text += f" Content Preview: {post.get('content_preview', '')[:300]}...\n"
            formatted_posts.append(post_text)
        posts_text = "\n".join(formatted_posts)
        # Build prompt based on requested fields
        prompt_parts = ["Analyze these blog posts and provide recommendations.\n\n"]
        if 'site' in fields:
            prompt_parts.append("""Website Strategy:
- mistergeek.net: High-value topics (VPN, Software, Gaming, General Tech, SEO, Content Marketing)
- webscroll.fr: Torrenting, File-Sharing, Tracker guides
- hellogeek.net: Low-traffic, experimental, off-brand content
""")
        prompt_parts.append(posts_text)
        prompt_parts.append("\nFor EACH post, provide a JSON object with:\n{\n")
        # Fix: request post_id in every object. analyze_posts() maps replies
        # back to rows via rec['post_id'], which the old prompt never asked
        # for, so every recommendation landed on the empty-string key.
        prompt_parts.append(' "post_id": "<post_id copied from the input>",\n')
        if 'title' in fields:
            prompt_parts.append(' "proposed_title": "<Improved SEO title>",\n')
            prompt_parts.append(' "title_reason": "<Reason for title change>",\n')
        if 'meta_description' in fields:
            prompt_parts.append(' "proposed_meta_description": "<Improved meta description (120-160 chars)>",\n')
            prompt_parts.append(' "meta_reason": "<Reason for meta description change>",\n')
        if 'categories' in fields:
            prompt_parts.append(' "proposed_category": "<Best category>",\n')
            prompt_parts.append(' "category_reason": "<Reason for category change>",\n')
        if 'site' in fields:
            prompt_parts.append(' "proposed_site": "<Best site for this post>",\n')
            prompt_parts.append(' "site_reason": "<Reason for site recommendation>",\n')
        prompt_parts.append(' "confidence": "<High|Medium|Low>",\n')
        prompt_parts.append(' "priority": "<High|Medium|Low>"\n}')
        prompt_parts.append("\nReturn ONLY a JSON array of objects, one per post.")
        prompt = "".join(prompt_parts)
        try:
            logger.info(" Sending batch to AI for analysis...")
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.openrouter_api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self.ai_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.3,
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            self.api_calls += 1
            usage = result.get('usage', {})
            input_tokens = usage.get('prompt_tokens', 0)
            output_tokens = usage.get('completion_tokens', 0)
            # $3 / 1M input tokens, $15 / 1M output tokens (estimate).
            self.ai_cost += (input_tokens * 3 + output_tokens * 15) / 1_000_000
            recommendations_text = result['choices'][0]['message']['content'].strip()
            logger.info(f" ✓ Got recommendations (tokens: {input_tokens}+{output_tokens})")
            return recommendations_text
        except Exception as e:
            logger.error(f"Error getting AI recommendations: {e}")
            return None

    def parse_recommendations(self, recommendations_json: str) -> List[Dict]:
        """Extract and parse the JSON array embedded in the AI reply text."""
        try:
            # Tolerate prose around the array: slice from first '[' to last ']'.
            start_idx = recommendations_json.find('[')
            end_idx = recommendations_json.rfind(']') + 1
            if start_idx == -1 or end_idx == 0:
                logger.error("Could not find JSON array in response")
                return []
            json_str = recommendations_json[start_idx:end_idx]
            recommendations = json.loads(json_str)
            return recommendations
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON recommendations: {e}")
            return []

    def analyze_posts(self, batch_size: int = 10) -> bool:
        """Run AI analysis over all posts in batches and merge results in place.

        Returns True when at least one row was produced.
        """
        logger.info("\n" + "="*70)
        logger.info("ANALYZING POSTS WITH AI")
        logger.info("="*70 + "\n")
        batches = [self.posts[i:i + batch_size] for i in range(0, len(self.posts), batch_size)]
        logger.info(f"Processing {len(self.posts)} posts in {len(batches)} batches...\n")
        all_recommendations = {}
        for batch_num, batch in enumerate(batches, 1):
            logger.info(f"Batch {batch_num}/{len(batches)}: Analyzing {len(batch)} posts...")
            recommendations_json = self.get_ai_recommendations(batch, self.analyze_fields)
            if not recommendations_json:
                logger.error(f" Failed to get recommendations for batch {batch_num}")
                continue
            recommendations = self.parse_recommendations(recommendations_json)
            for rec in recommendations:
                # Keyed as str so it matches the str post_id read from CSV.
                all_recommendations[str(rec.get('post_id', ''))] = rec
            logger.info(f" ✓ Got {len(recommendations)} recommendations")
        logger.info(f"\n✓ Analysis complete!")
        logger.info(f" Total recommendations: {len(all_recommendations)}")
        logger.info(f" API calls: {self.api_calls}")
        logger.info(f" Estimated cost: ${self.ai_cost:.4f}")
        # Map recommendations back onto the rows (mutates self.posts in place).
        for post in self.posts:
            post_id = str(post['post_id'])
            if post_id in all_recommendations:
                rec = all_recommendations[post_id]
                # Add only requested fields
                if 'title' in self.analyze_fields:
                    post['proposed_title'] = rec.get('proposed_title', post.get('title', ''))
                    post['title_reason'] = rec.get('title_reason', '')
                if 'meta_description' in self.analyze_fields:
                    post['proposed_meta_description'] = rec.get('proposed_meta_description', post.get('meta_description', ''))
                    post['meta_reason'] = rec.get('meta_reason', '')
                if 'categories' in self.analyze_fields:
                    post['proposed_category'] = rec.get('proposed_category', post.get('categories', ''))
                    post['category_reason'] = rec.get('category_reason', '')
                if 'site' in self.analyze_fields:
                    post['proposed_site'] = rec.get('proposed_site', post.get('site', ''))
                    post['site_reason'] = rec.get('site_reason', '')
                post['ai_confidence'] = rec.get('confidence', 'Medium')
                post['ai_priority'] = rec.get('priority', 'Medium')
            else:
                # No recommendation: carry current values forward, flag it.
                if 'title' in self.analyze_fields:
                    post['proposed_title'] = post.get('title', '')
                    post['title_reason'] = 'No AI recommendation'
                if 'meta_description' in self.analyze_fields:
                    post['proposed_meta_description'] = post.get('meta_description', '')
                    post['meta_reason'] = 'No AI recommendation'
                if 'categories' in self.analyze_fields:
                    post['proposed_category'] = post.get('categories', '')
                    post['category_reason'] = 'No AI recommendation'
                if 'site' in self.analyze_fields:
                    post['proposed_site'] = post.get('site', '')
                    post['site_reason'] = 'No AI recommendation'
                post['ai_confidence'] = 'Unknown'
                post['ai_priority'] = 'Medium'
            self.analyzed_posts.append(post)
        return len(self.analyzed_posts) > 0

    def export_results(self, output_file: Optional[str] = None, update_input: bool = False) -> str:
        """Write analyzed posts to CSV.

        Args:
            output_file: Explicit output path (ignored when update_input).
            update_input: Overwrite the input CSV, creating a timestamped
                backup next to it first.

        Returns:
            Path of the written file, or "" when there is nothing to export.
        """
        # Fix: bail out before touching the filesystem — the old code created
        # a backup of the input even when there was nothing to export.
        if not self.analyzed_posts:
            logger.error("No analyzed posts to export")
            return ""
        if update_input:
            backup_file = self.csv_file.parent / f"{self.csv_file.stem}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            shutil.copy2(self.csv_file, backup_file)
            logger.info(f"✓ Created backup: {backup_file}")
            output_file = self.csv_file
        elif not output_file:
            output_dir = Path(__file__).parent.parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'analyzed_posts_{timestamp}.csv'
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        original_fields = list(self.analyzed_posts[0].keys())
        new_fields = []
        if 'title' in self.analyze_fields:
            new_fields.extend(['proposed_title', 'title_reason'])
        if 'meta_description' in self.analyze_fields:
            new_fields.extend(['proposed_meta_description', 'meta_reason'])
        if 'categories' in self.analyze_fields:
            new_fields.extend(['proposed_category', 'category_reason'])
        if 'site' in self.analyze_fields:
            new_fields.extend(['proposed_site', 'site_reason'])
        new_fields.extend(['ai_confidence', 'ai_priority'])
        # Fix: analyze_posts() mutates the rows in place, so the proposed_*
        # columns already appear in original_fields; an order-preserving
        # dedupe avoids writing duplicate CSV columns.
        fieldnames = list(dict.fromkeys(original_fields + new_fields))
        logger.info(f"\nExporting results to: {output_file}")
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.analyzed_posts)
        logger.info(f"✓ Exported {len(self.analyzed_posts)} posts")
        return str(output_file)

    def run(self, output_file: Optional[str] = None, update_input: bool = False, batch_size: int = 10) -> str:
        """Run the complete load → analyze → export pipeline; return output path or ""."""
        if not self.load_csv():
            return ""
        if not self.analyze_posts(batch_size=batch_size):
            logger.error("Failed to analyze posts")
            return ""
        return self.export_results(output_file=output_file, update_input=update_input)

View File

@@ -8,11 +8,8 @@ from datetime import datetime
from typing import Optional, List
from .exporter import PostExporter
from .analyzer import PostAnalyzer
from .recategorizer import PostRecategorizer
from .seo_checker import MultiSiteSEOAnalyzer
from .categories import CategoryManager
from .approval import UserApprovalSystem
from .analyzer import EnhancedPostAnalyzer
from .category_proposer import CategoryProposer
logger = logging.getLogger(__name__)
@@ -22,70 +19,38 @@ class SEOApp:
Main SEO Application class.
Provides a unified interface for all SEO automation tasks.
Inspired by Ruby on Rails' Active Record pattern.
Usage:
app = SEOApp()
app.export()
app.analyze()
app.seo_check()
"""
def __init__(self, verbose: bool = False):
"""
Initialize the SEO application.
Args:
verbose: Enable verbose logging
"""
"""Initialize the SEO application."""
self.verbose = verbose
self.output_dir = Path(__file__).parent.parent.parent / 'output'
self.output_dir.mkdir(parents=True, exist_ok=True)
# Initialize components
self.exporter = None
self.analyzer = None
self.recategorizer = None
self.seo_checker = None
self.category_manager = None
self.approval_system = None
if verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
def export(self) -> str:
"""
Export all posts from WordPress sites.
Returns:
Path to exported CSV file
"""
"""Export all posts from WordPress sites."""
logger.info("📦 Exporting all posts from WordPress sites...")
self.exporter = PostExporter()
self.exporter.run()
# Get the exported file path
date_str = datetime.now().strftime('%Y-%m-%d')
csv_file = self.output_dir / f'all_posts_{date_str}.csv'
logger.info(f"✅ Export completed: {csv_file}")
return str(csv_file)
exporter = PostExporter()
return exporter.run()
def analyze(self, csv_file: Optional[str] = None) -> str:
def analyze(self, csv_file: Optional[str] = None, fields: Optional[List[str]] = None,
update: bool = False, output: Optional[str] = None) -> str:
"""
Analyze posts with AI for recommendations.
Args:
csv_file: Path to CSV file (uses latest export if not provided)
Returns:
Path to analysis results
fields: Fields to analyze ['title', 'meta_description', 'categories', 'site']
update: If True, update input CSV (creates backup)
output: Custom output file path
"""
logger.info("🤖 Analyzing posts with AI for recommendations...")
# Find CSV file
if not csv_file:
csv_file = self._find_latest_export()
@@ -94,26 +59,13 @@ class SEOApp:
logger.info(f"Using file: {csv_file}")
# Run analysis
self.analyzer = PostAnalyzer(csv_file)
self.analyzer.run()
logger.info("✅ AI analysis completed!")
return csv_file
analyzer = EnhancedPostAnalyzer(csv_file, analyze_fields=fields)
return analyzer.run(output_file=output, update_input=update)
def recategorize(self, csv_file: Optional[str] = None) -> str:
"""
Recategorize posts with AI suggestions.
def category_propose(self, csv_file: Optional[str] = None, output: Optional[str] = None) -> str:
"""Propose categories for posts."""
logger.info("🏷️ Proposing categories with AI...")
Args:
csv_file: Path to CSV file (uses latest export if not provided)
Returns:
Path to recategorization results
"""
logger.info("🏷️ Recategorizing posts with AI suggestions...")
# Find CSV file
if not csv_file:
csv_file = self._find_latest_export()
@@ -122,122 +74,11 @@ class SEOApp:
logger.info(f"Using file: {csv_file}")
# Run recategorization
self.recategorizer = PostRecategorizer(csv_file)
self.recategorizer.run()
logger.info("✅ Recategorization completed!")
return csv_file
def seo_check(self, top_n: int = 10) -> None:
"""
Check SEO quality of titles and descriptions.
Args:
top_n: Number of top posts to get AI recommendations for
"""
logger.info("🔍 Checking SEO quality of titles/descriptions...")
self.seo_checker = MultiSiteSEOAnalyzer()
self.seo_checker.run(use_ai=True, top_n=top_n)
logger.info("✅ SEO check completed!")
def categories(self) -> None:
"""Manage categories across all sites."""
logger.info("🗂️ Managing categories across all sites...")
self.category_manager = CategoryManager()
self.category_manager.run()
logger.info("✅ Category management completed!")
def approve(self, files: Optional[List[str]] = None) -> None:
"""
Review and approve recommendations.
Args:
files: List of CSV files to review (auto-detects if not provided)
"""
logger.info("✅ Reviewing and approving recommendations...")
self.approval_system = UserApprovalSystem()
if not files:
# Auto-detect recommendation files
files = self._find_recommendation_files()
if not files:
raise FileNotFoundError("No recommendation files found. Run analyze() or categories() first.")
logger.info(f"Found {len(files)} recommendation files to review")
self.approval_system.run_interactive_approval(files)
logger.info("✅ Approval process completed!")
def full_pipeline(self) -> None:
"""
Run complete workflow: export → analyze → seo_check
"""
logger.info("🚀 Running full SEO automation pipeline...")
# Step 1: Export
logger.info("\n📦 Step 1/3: Exporting posts...")
self.export()
# Step 2: Analyze
logger.info("\n🤖 Step 2/3: Analyzing with AI...")
self.analyze()
# Step 3: SEO Check
logger.info("\n🔍 Step 3/3: Checking SEO quality...")
self.seo_check()
logger.info("\n✅ Full pipeline completed!")
def _find_latest_export(self) -> Optional[str]:
"""
Find the latest exported CSV file.
Returns:
Path to latest CSV file or None if not found
"""
csv_files = list(self.output_dir.glob('all_posts_*.csv'))
if not csv_files:
return None
latest = max(csv_files, key=lambda f: f.stat().st_ctime)
return str(latest)
def _find_recommendation_files(self) -> List[str]:
"""
Find recommendation files in output directory.
Returns:
List of paths to recommendation files
"""
patterns = [
'category_assignments_*.csv',
'posts_with_ai_recommendations_*.csv',
'posts_to_move_*.csv',
'posts_to_consolidate_*.csv',
'posts_to_delete_*.csv'
]
files = []
for pattern in patterns:
files.extend(self.output_dir.glob(pattern))
return [str(f) for f in files]
proposer = CategoryProposer(csv_file)
return proposer.run(output_file=output)
def status(self) -> dict:
"""
Get status of output files.
Returns:
Dictionary with file information
"""
"""Get status of output files."""
files = list(self.output_dir.glob('*.csv'))
status_info = {
@@ -253,3 +94,13 @@ class SEOApp:
})
return status_info
def _find_latest_export(self) -> Optional[str]:
    """Return the most recently created 'all_posts_*.csv' export, or None if absent."""
    exports = list(self.output_dir.glob('all_posts_*.csv'))
    if not exports:
        return None
    newest = max(exports, key=lambda path: path.stat().st_ctime)
    return str(newest)

View File

@@ -1,15 +1,18 @@
"""
Approval System Module - User approval for recommendations
Placeholder for future implementation.
"""
import sys
from pathlib import Path
import logging
# Import from scripts directory (parent of src)
scripts_dir = Path(__file__).parents[2] / 'scripts'
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
logger = logging.getLogger(__name__)
from user_approval import UserApprovalSystem
__all__ = ['UserApprovalSystem']
class UserApprovalSystem:
    """Stub approval workflow; a real implementation is planned but not written yet."""

    def __init__(self):
        """Warn immediately so callers know this class does nothing useful yet."""
        logger.warning("UserApprovalSystem is a placeholder. Implement full functionality as needed.")

    def run_interactive_approval(self, files):
        """Accept the file list but only log that approval is unimplemented."""
        logger.info("Approval system not yet implemented in integrated package.")

View File

@@ -1,15 +1,18 @@
"""
Category Manager Module - Category management across sites
Placeholder for future implementation.
"""
import sys
from pathlib import Path
import logging
# Import from scripts directory (parent of src)
scripts_dir = Path(__file__).parents[2] / 'scripts'
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
logger = logging.getLogger(__name__)
from category_manager import CategoryManager
__all__ = ['CategoryManager']
class CategoryManager:
    """Stub category manager; a real implementation is planned but not written yet."""

    def __init__(self):
        """Warn immediately so callers know this class does nothing useful yet."""
        logger.warning("CategoryManager is a placeholder. Implement full functionality as needed.")

    def run(self):
        """Only log that category management is unimplemented."""
        logger.info("Category management not yet implemented in integrated package.")

View File

@@ -0,0 +1,212 @@
"""
Category Proposer - AI-powered category suggestions
"""
import csv
import json
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
import requests
from .config import Config
logger = logging.getLogger(__name__)
class CategoryProposer:
    """Propose categories for posts using AI.

    Loads posts from CSV, asks an OpenRouter chat model for a best category
    (plus alternatives) per post in batches, and exports the proposals to a
    new CSV in the output directory.
    """

    def __init__(self, csv_file: str):
        """Initialize proposer with CSV file."""
        self.csv_file = Path(csv_file)
        self.openrouter_api_key = Config.OPENROUTER_API_KEY
        self.ai_model = Config.AI_MODEL
        self.posts = []                # raw rows from the input CSV
        self.proposed_categories = []  # rows augmented with proposal columns
        self.api_calls = 0
        self.ai_cost = 0.0             # running USD estimate

    def load_csv(self) -> bool:
        """Load posts from the CSV into self.posts; return True on success."""
        logger.info(f"Loading CSV: {self.csv_file}")
        if not self.csv_file.exists():
            logger.error(f"CSV file not found: {self.csv_file}")
            return False
        try:
            with open(self.csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                self.posts = list(reader)
            logger.info(f"✓ Loaded {len(self.posts)} posts")
            return True
        except Exception as e:
            logger.error(f"Error loading CSV: {e}")
            return False

    def get_category_proposals(self, batch: List[Dict]) -> Optional[str]:
        """Ask the AI for category proposals for one batch of posts.

        Returns the raw response text (expected to contain a JSON array),
        or None on missing configuration / request failure.
        """
        if not self.openrouter_api_key:
            logger.error("OPENROUTER_API_KEY not set")
            return None
        formatted = []
        for i, post in enumerate(batch, 1):
            text = f"{i}. ID: {post['post_id']}\n"
            text += f" Title: {post.get('title', '')}\n"
            text += f" Current Categories: {post.get('categories', '')}\n"
            if 'content_preview' in post:
                text += f" Content: {post['content_preview'][:300]}...\n"
            formatted.append(text)
        posts_text = "\n".join(formatted)
        prompt = f"""Analyze these blog posts and propose optimal categories.
{posts_text}
For EACH post, provide:
{{
"post_id": <id>,
"current_categories": "<current>",
"proposed_category": "<best category>",
"alternative_categories": ["<alt1>", "<alt2>"],
"reason": "<brief explanation>",
"confidence": "<High|Medium|Low>"
}}
Return ONLY a JSON array with one object per post."""
        try:
            logger.info(" Getting category proposals...")
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.openrouter_api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self.ai_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.3,
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            self.api_calls += 1
            usage = result.get('usage', {})
            input_tokens = usage.get('prompt_tokens', 0)
            output_tokens = usage.get('completion_tokens', 0)
            # $3 / 1M input tokens, $15 / 1M output tokens (estimate).
            self.ai_cost += (input_tokens * 3 + output_tokens * 15) / 1_000_000
            logger.info(f" ✓ Got proposals (tokens: {input_tokens}+{output_tokens})")
            return result['choices'][0]['message']['content'].strip()
        except Exception as e:
            logger.error(f"Error getting proposals: {e}")
            return None

    def parse_proposals(self, proposals_json: str) -> List[Dict]:
        """Extract and parse the JSON array embedded in the AI reply text."""
        try:
            start_idx = proposals_json.find('[')
            end_idx = proposals_json.rfind(']') + 1
            if start_idx == -1 or end_idx == 0:
                # Fix: failures were silently swallowed; log them the same way
                # EnhancedPostAnalyzer.parse_recommendations does.
                logger.error("Could not find JSON array in response")
                return []
            return json.loads(proposals_json[start_idx:end_idx])
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON proposals: {e}")
            return []

    def propose_categories(self, batch_size: int = 10) -> bool:
        """Get AI proposals for all posts and build self.proposed_categories.

        Returns True only if at least one proposal actually came back.
        """
        logger.info("\n" + "="*70)
        logger.info("PROPOSING CATEGORIES WITH AI")
        logger.info("="*70 + "\n")
        batches = [self.posts[i:i + batch_size] for i in range(0, len(self.posts), batch_size)]
        logger.info(f"Processing {len(self.posts)} posts in {len(batches)} batches...\n")
        all_proposals = {}
        for batch_num, batch in enumerate(batches, 1):
            logger.info(f"Batch {batch_num}/{len(batches)}...")
            proposals_json = self.get_category_proposals(batch)
            if not proposals_json:
                continue
            proposals = self.parse_proposals(proposals_json)
            for prop in proposals:
                # Keyed as str so it matches the str post_id read from CSV.
                all_proposals[str(prop.get('post_id', ''))] = prop
            logger.info(f" ✓ Got {len(proposals)} proposals")
        logger.info(f"\n✓ Proposals complete!")
        logger.info(f" Total: {len(all_proposals)}")
        logger.info(f" API calls: {self.api_calls}")
        logger.info(f" Cost: ${self.ai_cost:.4f}")
        for post in self.posts:
            post_id = str(post['post_id'])
            proposal = all_proposals.get(post_id, {})
            self.proposed_categories.append({
                **post,
                'proposed_category': proposal.get('proposed_category', post.get('categories', '')),
                'alternative_categories': ', '.join(proposal.get('alternative_categories', [])),
                'category_reason': proposal.get('reason', ''),
                'category_confidence': proposal.get('confidence', 'Medium'),
                'current_categories': post.get('categories', '')
            })
        # Fix: the old version returned True unconditionally, so run() could
        # never detect a total failure (e.g. missing API key) and would export
        # a file of unchanged categories labeled "Medium" confidence.
        return len(all_proposals) > 0

    def export_proposals(self, output_file: Optional[str] = None) -> str:
        """Write proposals to CSV (timestamped file in output/ by default)."""
        if not output_file:
            output_dir = Path(__file__).parent.parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'category_proposals_{timestamp}.csv'
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        fieldnames = [
            'post_id', 'title', 'site', 'current_categories',
            'proposed_category', 'alternative_categories',
            'category_reason', 'category_confidence'
        ]
        logger.info(f"\nExporting to: {output_file}")
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            # extrasaction='ignore' drops the original CSV columns that are
            # not part of the proposal schema above.
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.proposed_categories)
        logger.info(f"✓ Exported {len(self.proposed_categories)} proposals")
        return str(output_file)

    def run(self, output_file: Optional[str] = None, batch_size: int = 10) -> str:
        """Run the complete load → propose → export pipeline; return output path or ""."""
        if not self.load_csv():
            return ""
        if not self.propose_categories(batch_size=batch_size):
            logger.error("Failed to propose categories")
            return ""
        return self.export_proposals(output_file)

View File

@@ -26,12 +26,9 @@ def main():
Examples:
seo export Export all posts from WordPress sites
seo analyze Analyze posts with AI for recommendations
seo analyze posts.csv Analyze specific CSV file
seo recategorize Recategorize posts with AI
seo seo_check Check SEO quality of titles/descriptions
seo categories Manage categories across sites
seo approve Review and approve recommendations
seo full_pipeline Run complete workflow: export → analyze → seo_check
seo analyze -f title Analyze only titles
seo analyze -u -f meta Update CSV with meta descriptions
seo category_propose Propose categories based on content
seo status Show output files status
"""
)
@@ -40,11 +37,10 @@ Examples:
parser.add_argument('args', nargs='*', help='Arguments for the command')
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
parser.add_argument('--top-n', type=int, default=10, help='Number of top posts for AI analysis')
parser.add_argument('--fields', '-f', nargs='+',
choices=['title', 'meta_description', 'categories', 'site'],
help='Fields to analyze (for analyze command)')
parser.add_argument('--update', '-u', action='store_true', help='Update input file (creates backup)')
help='Fields to analyze')
parser.add_argument('--update', '-u', action='store_true', help='Update input file')
parser.add_argument('--output', '-o', help='Output file path')
args = parser.parse_args()
@@ -67,12 +63,7 @@ Examples:
commands = {
'export': cmd_export,
'analyze': cmd_analyze,
'recategorize': cmd_recategorize,
'seo_check': cmd_seo_check,
'categories': cmd_categories,
'category_propose': cmd_category_propose,
'approve': cmd_approve,
'full_pipeline': cmd_full_pipeline,
'status': cmd_status,
'help': cmd_help,
}
@@ -117,63 +108,19 @@ def cmd_analyze(app, args):
csv_file = args.args[0] if args.args else None
# Use enhanced analyzer if fields are specified or update flag is set
if args.fields or args.update:
from pathlib import Path
import sys
scripts_dir = Path(__file__).parent.parent.parent / 'scripts'
sys.path.insert(0, str(scripts_dir))
from enhanced_analyzer import EnhancedPostAnalyzer
if not csv_file:
csv_file = app._find_latest_export()
if not csv_file:
print("❌ No CSV file found. Provide one or run export first.")
return 1
print(f"Using enhanced analyzer with fields: {args.fields or 'all'}")
analyzer = EnhancedPostAnalyzer(csv_file, analyze_fields=args.fields)
output_file = analyzer.run(
output_file=args.output,
update_input=args.update
)
print(f"✅ Analysis completed! Results: {output_file}")
else:
app.analyze(csv_file)
print(f"Analyzing with fields: {args.fields or 'all'}")
if args.update:
print(f"Will update input CSV (backup will be created)")
return 0
def cmd_recategorize(app, args):
"""Recategorize posts with AI."""
if args.dry_run:
print("Would recategorize posts with AI suggestions")
return 0
result = app.analyze(
csv_file=csv_file,
fields=args.fields,
update=args.update,
output=args.output
)
csv_file = args.args[0] if args.args else None
app.recategorize(csv_file)
return 0
def cmd_seo_check(app, args):
"""Check SEO quality."""
if args.dry_run:
print("Would check SEO quality of titles/descriptions")
return 0
app.seo_check(top_n=args.top_n)
return 0
def cmd_categories(app, args):
"""Manage categories."""
if args.dry_run:
print("Would manage categories across all sites")
return 0
app.categories()
if result:
print(f"✅ Analysis completed! Results: {result}")
return 0
@@ -185,47 +132,10 @@ def cmd_category_propose(app, args):
csv_file = args.args[0] if args.args else None
if not csv_file:
csv_file = app._find_latest_export()
result = app.category_propose(csv_file=csv_file, output=args.output)
if not csv_file:
print("❌ No CSV file found. Provide one or run export first.")
print(" Usage: seo category_propose <csv_file>")
return 1
from pathlib import Path
import sys
scripts_dir = Path(__file__).parent.parent.parent / 'scripts'
sys.path.insert(0, str(scripts_dir))
from category_proposer import CategoryProposer
print(f"Proposing categories for: {csv_file}")
proposer = CategoryProposer(csv_file)
output_file = proposer.run(output_file=args.output)
print(f"✅ Category proposals saved to: {output_file}")
return 0
def cmd_approve(app, args):
    """Review and approve pending recommendations.

    Any positional arguments are treated as recommendation files to
    review; with none given, the app falls back to its defaults.

    Returns 0 on completion.
    """
    if args.dry_run:
        print("Would review and approve recommendations")
        return 0
    # An empty argument list collapses to None so the app picks defaults.
    app.approve(args.args or None)
    return 0
def cmd_full_pipeline(app, args):
    """Run the complete workflow: export → analyze → seo_check.

    Honors --dry-run; otherwise delegates the whole pipeline to the app.

    Returns 0 on completion.
    """
    if args.dry_run:
        print("Would run full pipeline: export → analyze → seo_check")
        return 0
    app.full_pipeline()
    return 0
@@ -256,23 +166,15 @@ SEO Automation CLI - Available Commands
Basic Commands:
export Export all posts from WordPress sites
analyze [csv_file] Analyze posts with AI
analyze -f title categories Analyze specific fields only
analyze -u Update input CSV with new columns
recategorize [csv_file] Recategorize posts with AI
seo_check Check SEO quality of titles/descriptions
categories Manage categories across sites
analyze -f title Analyze specific fields (title, meta_description, categories, site)
analyze -u Update input CSV with new columns (creates backup)
category_propose [csv] Propose categories based on content
approve [files...] Review and approve recommendations
full_pipeline Run complete workflow: export → analyze → seo_check
Utility:
status Show output files status
help Show this help message
Options:
--verbose, -v Enable verbose logging
--dry-run Show what would be done without doing it
--top-n N Number of top posts for AI analysis (default: 10)
--fields, -f Fields to analyze: title, meta_description, categories, site
--update, -u Update input CSV file (creates backup)
--output, -o Output file path
@@ -284,8 +186,6 @@ Examples:
seo analyze -f title categories
seo analyze -u -f meta_description
seo category_propose
seo approve output/category_proposals_*.csv
seo full_pipeline
seo status
""")
return 0

View File

@@ -1,16 +1,16 @@
"""
Post Exporter Module - Export posts from WordPress sites
Post Exporter - Export posts from WordPress sites
"""
import csv
import logging
import time
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
import requests
from requests.auth import HTTPBasicAuth
import re
from .config import Config
@@ -26,7 +26,7 @@ class PostExporter:
self.all_posts = []
self.category_cache = {}
def fetch_category_names(self, site_name: str, site_config: Dict) -> Dict[int, str]:
def fetch_category_names(self, site_name: str, site_config: Dict) -> Dict[int, Dict]:
"""Fetch category names from a WordPress site."""
if site_name in self.category_cache:
return self.category_cache[site_name]
@@ -61,8 +61,6 @@ class PostExporter:
for status in ['publish', 'draft']:
page = 1
status_count = 0
while True:
try:
logger.info(f" Fetching page {page} ({status} posts)...")
@@ -79,19 +77,16 @@ class PostExporter:
break
posts.extend(page_posts)
status_count += len(page_posts)
logger.info(f" ✓ Got {len(page_posts)} posts (total: {len(posts)})")
logger.info(f" ✓ Got {len(page_posts)} posts")
page += 1
time.sleep(0.5)
except requests.exceptions.HTTPError as e:
if response.status_code == 400:
logger.info(f" API limit reached (got {status_count} {status} posts)")
break
else:
logger.error(f"Error on page {page}: {e}")
break
logger.error(f"Error on page {page}: {e}")
break
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching from {site_name}: {e}")
break
@@ -160,7 +155,7 @@ class PostExporter:
if not self.all_posts:
logger.error("No posts to export")
return None
return ""
fieldnames = [
'site', 'post_id', 'status', 'title', 'slug', 'url', 'author_id',
@@ -178,10 +173,10 @@ class PostExporter:
logger.info(f"✓ CSV exported to: {output_file}")
return str(output_file)
def run(self):
def run(self) -> str:
"""Run the complete export process."""
logger.info("="*70)
logger.info("EXPORTING ALL POSTS FOR AI DECISION MAKING")
logger.info("EXPORTING ALL POSTS")
logger.info("="*70)
logger.info("Sites configured: " + ", ".join(self.sites.keys()))
@@ -196,31 +191,7 @@ class PostExporter:
if not self.all_posts:
logger.error("No posts found on any site")
return
return ""
self.all_posts.sort(key=lambda x: (x['site'], x['post_id']))
self.export_to_csv()
# Print summary
logger.info("\n" + "="*70)
logger.info("EXPORT SUMMARY")
logger.info("="*70)
by_site = {}
for post in self.all_posts:
site = post['site']
if site not in by_site:
by_site[site] = {'total': 0, 'published': 0, 'draft': 0}
by_site[site]['total'] += 1
if post['status'] == 'publish':
by_site[site]['published'] += 1
else:
by_site[site]['draft'] += 1
for site, stats in sorted(by_site.items()):
logger.info(f"\n{site}:")
logger.info(f" Total: {stats['total']}")
logger.info(f" Published: {stats['published']}")
logger.info(f" Drafts: {stats['draft']}")
logger.info(f"\n✓ Export complete!")
return self.export_to_csv()

View File

@@ -1,15 +1,19 @@
"""
Recategorizer Module - AI-powered post recategorization
Placeholder for future implementation.
"""
import sys
from pathlib import Path
import logging
# Import from scripts directory (parent of src)
scripts_dir = Path(__file__).parents[2] / 'scripts'
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
logger = logging.getLogger(__name__)
from ai_recategorize_posts import PostRecategorizer
__all__ = ['PostRecategorizer']
class PostRecategorizer:
    """Placeholder recategorizer kept for API compatibility.

    The AI-powered recategorization from the old scripts/ folder has not
    been ported into the integrated package yet; this stub only records
    the target CSV path and logs that the feature is pending.
    """

    def __init__(self, csv_file):
        # Keep the target file so a future implementation can pick it up.
        self.csv_file = csv_file
        logger.warning("PostRecategorizer is a placeholder. Implement full functionality as needed.")

    def run(self):
        """No-op: log that recategorization is not implemented yet."""
        logger.info("Recategorization not yet implemented in integrated package.")

View File

@@ -1,15 +1,18 @@
"""
SEO Checker Module - SEO quality analysis
Placeholder for future implementation.
"""
import sys
from pathlib import Path
import logging
# Import from scripts directory (parent of src)
scripts_dir = Path(__file__).parents[2] / 'scripts'
if str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
logger = logging.getLogger(__name__)
from multi_site_seo_analyzer import MultiSiteSEOAnalyzer
__all__ = ['MultiSiteSEOAnalyzer']
class MultiSiteSEOAnalyzer:
    """Placeholder SEO quality analyzer kept for API compatibility.

    The multi-site SEO analysis from the old scripts/ folder has not been
    ported into the integrated package yet; this stub only logs that the
    feature is pending.
    """

    def __init__(self):
        logger.warning("MultiSiteSEOAnalyzer is a placeholder. Implement full functionality as needed.")

    def run(self, use_ai=True, top_n=10):
        """No-op: log that the SEO check is not implemented yet."""
        logger.info("SEO check not yet implemented in integrated package.")