Compare commits

2 Commits

Author SHA1 Message Date
Kevin Bataille
69e4287366 Add media importer for migrated posts
- Add import_media command to import featured images
- Fetch media from source site (mistergeek.net)
- Upload to destination site (hellogeek.net)
- Map source media IDs to destination media IDs
- Set featured images on migrated posts
- Use migration report CSV as input
- Support dry-run mode
- Cache media mappings to avoid duplicate uploads

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-17 01:50:40 +01:00
Kevin Bataille
6ef268ba80 Add SEO performance tracking features
- Add performance command to analyze page metrics from GA4/GSC
- Add keywords command to find keyword opportunities
- Add report command to generate SEO performance reports
- Support CSV imports (no API setup required)
- Optional Google API integration for automated data fetching
- Analyze pageviews, clicks, impressions, CTR, rankings
- Identify low CTR pages, low position pages, opportunities
- Generate comprehensive SEO reports with recommendations
- Add PERFORMANCE_TRACKING_GUIDE.md with complete documentation

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-17 00:51:49 +01:00
6 changed files with 1996 additions and 2 deletions


@@ -0,0 +1,355 @@
# SEO Performance Tracking Guide
Track and analyze your website's SEO performance using Google Analytics 4 and Google Search Console data.
## Overview
The SEO performance tracking features allow you to:
- **Analyze page performance** - Track pageviews, clicks, impressions, CTR, and rankings
- **Find keyword opportunities** - Discover keywords you can rank higher for
- **Generate SEO reports** - Create comprehensive performance reports
- **Import data** - Support for both CSV imports and API integration
## Commands
### 1. `seo performance` - Analyze Page Performance
Analyze traffic and search performance data.
**Usage:**
```bash
# Analyze with CSV exports
./seo performance --ga4 analytics.csv --gsc search.csv
# Analyze GA4 data only
./seo performance --ga4 analytics.csv
# Analyze GSC data only
./seo performance --gsc search.csv
# With custom output
./seo performance --ga4 analytics.csv --gsc search.csv --output custom_analysis.csv
# Preview
./seo performance --ga4 analytics.csv --dry-run
```
**Data Sources:**
- **Google Analytics 4**: Export from GA4 → Reports → Engagement → Pages and screens
- **Google Search Console**: Export from GSC → Performance → Search results → Export
**Metrics Analyzed:**
| Metric | Source | Description |
|--------|--------|-------------|
| Pageviews | GA4 | Number of page views |
| Sessions | GA4 | Number of sessions |
| Bounce Rate | GA4 | Percentage of single-page sessions |
| Engagement Rate | GA4 | Percentage of engaged sessions |
| Clicks | GSC | Number of search clicks |
| Impressions | GSC | Number of search impressions |
| CTR | GSC | Click-through rate |
| Position | GSC | Average search ranking |
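When both sources are provided, the tool correlates GA4 and GSC rows per page. A minimal sketch of that join, with CTR derived as clicks over impressions — the field names here are illustrative, not the tool's internal schema:

```python
# Hypothetical sketch: merge GA4 and GSC rows by page path and derive CTR.
# Field names mirror the metrics table above, not the real implementation.

def merge_metrics(ga4_rows, gsc_rows):
    """Combine GA4 and GSC metrics into one dict per page."""
    pages = {}
    for row in ga4_rows:
        pages.setdefault(row["page"], {}).update(
            pageviews=row["pageviews"], sessions=row["sessions"]
        )
    for row in gsc_rows:
        page = pages.setdefault(row["page"], {})
        page.update(clicks=row["clicks"], impressions=row["impressions"],
                    position=row["position"])
        # CTR = clicks / impressions (guard against zero impressions)
        page["ctr"] = row["clicks"] / row["impressions"] if row["impressions"] else 0.0
    return pages

merged = merge_metrics(
    [{"page": "/vpn", "pageviews": 1200, "sessions": 900}],
    [{"page": "/vpn", "clicks": 40, "impressions": 2000, "position": 8.5}],
)
print(merged["/vpn"]["ctr"])  # 0.02
```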
### 2. `seo keywords` - Keyword Opportunities
Find keywords you can optimize for better rankings.
**Usage:**
```bash
# Analyze keyword opportunities
./seo keywords gsc_export.csv
# Limit results
./seo keywords gsc_export.csv --limit 20
# Custom output
./seo keywords gsc_export.csv --output keywords.csv
```
**What It Finds:**
- Keywords ranking positions 5-20 (easy to improve)
- High impression keywords with low CTR
- Keywords with good traffic potential
**Example Output:**
```
✅ Found 47 keyword opportunities!
Top opportunities:
1. best vpn 2024 - Position: 8.5, Impressions: 1250
2. torrent client - Position: 12.3, Impressions: 890
3. vpn for gaming - Position: 9.1, Impressions: 650
```
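The opportunity filter described above can be sketched as follows; the exact thresholds (positions 5-20, 100+ impressions, CTR below 2%) are taken from this guide's prose and are assumptions about the implementation:

```python
# Illustrative sketch of the keyword-opportunity filter; thresholds are
# assumptions from the guide text, not the tool's source code.

def find_opportunities(keywords, limit=50):
    opportunities = [
        kw for kw in keywords
        if (5 <= kw["position"] <= 20)                      # easy to improve
        or (kw["impressions"] >= 100 and kw["ctr"] < 0.02)  # high reach, low CTR
    ]
    # Rank by impressions so the biggest potential wins come first
    opportunities.sort(key=lambda kw: kw["impressions"], reverse=True)
    return opportunities[:limit]

sample = [
    {"query": "best vpn 2024", "position": 8.5, "impressions": 1250, "ctr": 0.031},
    {"query": "torrent client", "position": 12.3, "impressions": 890, "ctr": 0.015},
    {"query": "what is a vpn", "position": 2.1, "impressions": 5000, "ctr": 0.12},
]
for kw in find_opportunities(sample):
    print(f'{kw["query"]} - Position: {kw["position"]:.1f}, Impressions: {kw["impressions"]}')
```

Note that "what is a vpn" is excluded: it already ranks in the top 5 with a healthy CTR, so it is not an opportunity.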
### 3. `seo report` - Generate SEO Report
Create comprehensive SEO performance reports.
**Usage:**
```bash
# Generate report
./seo report
# Custom output
./seo report --output monthly_seo_report.md
```
**Report Includes:**
- Performance summary
- Traffic analysis
- Keyword opportunities
- SEO recommendations
- Action items
## Data Export Guides
### Export from Google Analytics 4
1. Go to **Google Analytics** → Your Property
2. Navigate to **Reports** → **Engagement** → **Pages and screens**
3. Set date range (e.g., last 30 days)
4. Click **Share** → **Download file** → **CSV**
5. Save as `ga4_export.csv`
**Required Columns:**
- Page path
- Page title
- Views (pageviews)
- Sessions
- Bounce rate
- Engagement rate
### Export from Google Search Console
1. Go to **Google Search Console** → Your Property
2. Click **Performance** → **Search results**
3. Set date range (e.g., last 30 days)
4. Check all metrics: Clicks, Impressions, CTR, Position
5. Click **Export** → **CSV**
6. Save as `gsc_export.csv`
**Required Columns:**
- Page (URL)
- Clicks
- Impressions
- CTR
- Position
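Parsing such an export is mostly header normalization: GSC writes CTR as a percentage string (e.g. `2%`), and localized exports may vary header casing. A hedged sketch of how these columns could be read (this mirrors, but is not copied from, the tool's analyzer):

```python
import csv
import io

def parse_ctr(value: str) -> float:
    """GSC exports write CTR as '2%'; also accept a bare ratio like '0.02'."""
    value = (value or "0").strip()
    return float(value.rstrip("%")) / 100 if value.endswith("%") else float(value)

def load_gsc_rows(text: str):
    """Normalize header names and coerce the required GSC columns."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        r = {k.lower().replace(" ", "_"): v for k, v in row.items()}
        rows.append({
            "page": r.get("page", ""),
            "clicks": int(r.get("clicks") or 0),
            "impressions": int(r.get("impressions") or 0),
            "ctr": parse_ctr(r.get("ctr", "0")),
            "position": float(r.get("position") or 0),
        })
    return rows

sample = "Page,Clicks,Impressions,CTR,Position\nhttps://example.com/vpn,40,2000,2%,8.5\n"
print(load_gsc_rows(sample)[0])
```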
## API Integration (Advanced)
For automated data fetching, configure API credentials:
### 1. Google Analytics 4 API
**Setup:**
1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project or select existing
3. Enable **Google Analytics Data API**
4. Create service account credentials
5. Download JSON key file
6. Share GA4 property with service account email
**Configuration:**
Add to `.env`:
```
GA4_CREDENTIALS=/path/to/ga4-credentials.json
GA4_PROPERTY_ID=properties/123456789
```
### 2. Google Search Console API
**Setup:**
1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Enable **Search Console API**
3. Create service account credentials
4. Download JSON key file
5. Share GSC property with service account email
**Configuration:**
Add to `.env`:
```
GSC_CREDENTIALS=/path/to/gsc-credentials.json
GSC_SITE_URL=https://www.mistergeek.net
```
### Using API Mode
Once configured, you can run without CSV files:
```bash
# Fetch data directly from APIs
./seo performance --start-date 2024-01-01 --end-date 2024-01-31
```
## Performance Insights
### Low CTR Pages
Pages with high impressions but low CTR need better titles/descriptions:
```bash
# Find pages with <2% CTR and 100+ impressions
./seo performance --gsc search.csv
# Check "low_ctr" section in output
```
**Action:** Optimize meta titles and descriptions
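The screen behind the "low_ctr" section can be sketched as a simple filter-and-sort; the 2% CTR and 100-impression cutoffs come from the comment above and are assumptions about the tool's defaults:

```python
# Hedged sketch of the low-CTR page screen; thresholds are assumptions
# taken from the guide, not read from the tool's source.

def low_ctr_pages(pages, max_ctr=0.02, min_impressions=100):
    flagged = [
        p for p in pages
        if p["impressions"] >= min_impressions and p["ctr"] < max_ctr
    ]
    # Worst offenders first: the most impressions wasted on a weak snippet
    return sorted(flagged, key=lambda p: p["impressions"], reverse=True)

pages = [
    {"page": "/vpn-guide", "impressions": 3200, "ctr": 0.009},
    {"page": "/torrent-faq", "impressions": 450, "ctr": 0.015},
    {"page": "/home", "impressions": 9000, "ctr": 0.08},
]
for p in low_ctr_pages(pages):
    print(p["page"], f'{p["ctr"]:.1%}')
```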
### Low Position Pages
Pages ranking beyond position 20 need content optimization:
```bash
# Find pages ranking >20 with 50+ impressions
./seo performance --gsc search.csv
# Check "low_position" section in output
```
**Action:** Improve content quality, add internal links
### Keyword Opportunities
Keywords ranking 5-20 are easy to improve:
```bash
./seo keywords gsc_export.csv --limit 50
```
**Action:** Optimize content for these specific keywords
## Workflow Examples
### Weekly Performance Check
```bash
# 1. Export fresh data from GA4 and GSC
# 2. Analyze performance
./seo performance --ga4 weekly_ga4.csv --gsc weekly_gsc.csv
# 3. Review keyword opportunities
./seo keywords weekly_gsc.csv --limit 20
# 4. Generate report
./seo report --output weekly_report.md
```
### Monthly SEO Audit
```bash
# 1. Export full month data
# 2. Comprehensive analysis
./seo performance --ga4 month_ga4.csv --gsc month_gsc.csv
# 3. Identify top issues
# Review output for:
# - Low CTR pages
# - Low position pages
# - High impression, low click pages
# 4. Generate action plan
./seo report --output monthly_audit.md
```
### Content Optimization Sprint
```bash
# 1. Find keyword opportunities
./seo keywords gsc.csv --limit 50 > opportunities.txt
# 2. For each opportunity:
# - Review current content
# - Optimize for target keyword
# - Update meta description
# 3. Track improvements
# Re-run analysis after 2 weeks
./seo performance --gsc new_gsc.csv
```
## Output Files
All analysis results are saved to `output/`:
| File | Description |
|------|-------------|
| `performance_data_*.csv` | Raw performance metrics |
| `performance_analysis_*.csv` | Analysis with insights |
| `seo_report_*.md` | Markdown report |
## Troubleshooting
### No Data Loaded
**Problem:** "No data loaded. Provide GA4 and/or GSC export files."
**Solution:**
- Ensure CSV files are properly exported
- Check file paths are correct
- Verify CSV has required columns
### Column Name Errors
**Problem:** "KeyError: 'pageviews'"
**Solution:**
- Ensure GA4 export includes pageviews column
- Column names are normalized automatically
- Check CSV encoding (UTF-8)
### API Authentication Errors
**Problem:** "Failed to initialize GA4 client"
**Solution:**
- Verify service account JSON is valid
- Check API is enabled in Google Cloud
- Ensure service account has access to property
## Best Practices
### Data Collection
1. **Export regularly** - Weekly or monthly exports
2. **Consistent date ranges** - Use same range for comparisons
3. **Keep historical data** - Archive old exports for trend analysis
### Analysis
1. **Focus on trends** - Look at changes over time
2. **Prioritize impact** - Fix high-traffic pages first
3. **Track improvements** - Re-analyze after optimizations
### Reporting
1. **Regular reports** - Weekly/monthly cadence
2. **Share insights** - Distribute to team/stakeholders
3. **Action-oriented** - Include specific recommendations
## Related Commands
- `seo export` - Export posts from WordPress
- `seo meta_description` - Generate meta descriptions
- `seo update_meta` - Update meta on WordPress
## See Also
- [README.md](README.md) - Main documentation
- [META_DESCRIPTION_GUIDE.md](META_DESCRIPTION_GUIDE.md) - Meta description guide
- [ANALYTICS_SETUP.md](ANALYTICS_SETUP.md) - API setup guide (if present)
---
**Made with ❤️ for better SEO automation**


@@ -15,6 +15,9 @@ from .editorial_strategy import EditorialStrategyAnalyzer
from .post_migrator import WordPressPostMigrator
from .meta_description_generator import MetaDescriptionGenerator
from .meta_description_updater import MetaDescriptionUpdater
from .performance_tracker import SEOPerformanceTracker
from .performance_analyzer import PerformanceAnalyzer
from .media_importer import WordPressMediaImporter
logger = logging.getLogger(__name__)
@@ -366,9 +369,142 @@ class SEOApp:
def _find_latest_export(self) -> Optional[str]:
"""Find the latest exported CSV file."""
csv_files = list(self.output_dir.glob('all_posts_*.csv'))
if not csv_files:
return None
latest = max(csv_files, key=lambda f: f.stat().st_ctime)
return str(latest)
def performance(self, ga4_file: Optional[str] = None,
gsc_file: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
output_file: Optional[str] = None) -> Tuple[str, Dict]:
"""
Analyze page performance from GA4 and GSC data.
Args:
ga4_file: Path to GA4 export CSV (or use API if credentials configured)
gsc_file: Path to GSC export CSV (or use API if credentials configured)
start_date: Start date YYYY-MM-DD (for API mode)
end_date: End date YYYY-MM-DD (for API mode)
output_file: Custom output file path
Returns:
Tuple of (output_file_path, analysis_dict)
"""
logger.info("📊 Analyzing page performance...")
# If CSV files provided, use analyzer
if ga4_file or gsc_file:
analyzer = PerformanceAnalyzer()
return analyzer.run(ga4_file=ga4_file, gsc_file=gsc_file, output_file=output_file)
# Otherwise try API mode
tracker = SEOPerformanceTracker()
if tracker.ga4_client or tracker.gsc_service:
return tracker.run(start_date=start_date, end_date=end_date, output_file=output_file)
else:
logger.error("No data source available. Provide CSV exports or configure API credentials.")
return "", {}
def keywords(self, gsc_file: str, limit: int = 50) -> List[Dict]:
"""
Analyze keyword opportunities from GSC data.
Args:
gsc_file: Path to GSC export CSV
limit: Maximum keywords to return
Returns:
List of keyword opportunity dicts
"""
logger.info("🔍 Analyzing keyword opportunities...")
analyzer = PerformanceAnalyzer()
analyzer.load_gsc_export(gsc_file)
analysis = analyzer.analyze()
opportunities = analysis.get('keyword_opportunities', [])[:limit]
logger.info(f"Found {len(opportunities)} keyword opportunities")
return opportunities
def seo_report(self, output_file: Optional[str] = None) -> str:
"""
Generate comprehensive SEO performance report.
Args:
output_file: Custom output file path
Returns:
Path to report file
"""
logger.info("📄 Generating SEO report...")
if not output_file:
output_dir = Path(__file__).parent.parent.parent / 'output'
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = output_dir / f'seo_report_{timestamp}.md'
output_file = Path(output_file)
# Generate report content
report = self._generate_report_content()
# Write report
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
logger.info(f"✓ Report saved to: {output_file}")
return str(output_file)
def _generate_report_content(self) -> str:
"""Generate markdown report content."""
report = []
report.append("# SEO Performance Report\n")
report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
report.append("---\n")
# Summary section
report.append("## 📊 Summary\n")
report.append("This report provides insights into your website's SEO performance.\n")
# Add analysis sections
report.append("## 📈 Traffic Analysis\n")
report.append("*Import GA4/GSC data for detailed traffic analysis*\n")
report.append("## 🔍 Keyword Opportunities\n")
report.append("*Import GSC data for keyword analysis*\n")
report.append("## 📝 SEO Recommendations\n")
report.append("1. Review and optimize meta descriptions\n")
report.append("2. Improve content for low-ranking pages\n")
report.append("3. Build internal links to important pages\n")
report.append("4. Monitor keyword rankings regularly\n")
return "\n".join(report)
def import_media(self, migration_report: str,
source_site: str = 'mistergeek.net',
destination_site: str = 'hellogeek.net',
dry_run: bool = True) -> Dict:
"""
Import media from source to destination site for migrated posts.
Args:
migration_report: Path to migration report CSV
source_site: Source site name
destination_site: Destination site name
dry_run: If True, preview without importing
Returns:
Statistics dict
"""
logger.info(f"📸 Importing media from {source_site} to {destination_site}...")
importer = WordPressMediaImporter(source_site, destination_site)
return importer.run_from_migration_report(migration_report, dry_run=dry_run)


@@ -79,6 +79,16 @@ Examples:
parser.add_argument('--category', nargs='+', help='Filter by category name(s)')
parser.add_argument('--category-id', type=int, nargs='+', help='Filter by category ID(s)')
parser.add_argument('--force', action='store_true', help='Force regenerate even for good quality meta descriptions')
# Performance arguments
parser.add_argument('--ga4', help='Path to Google Analytics 4 export CSV')
parser.add_argument('--gsc', help='Path to Google Search Console export CSV')
parser.add_argument('--start-date', help='Start date YYYY-MM-DD (for API mode)')
parser.add_argument('--end-date', help='End date YYYY-MM-DD (for API mode)')
# Media import arguments
parser.add_argument('--from-site', help='Source site for media import (default: mistergeek.net)')
parser.add_argument('--to-site', help='Destination site for media import (default: hellogeek.net)')
args = parser.parse_args()
@@ -107,6 +117,10 @@ Examples:
'migrate': cmd_migrate,
'meta_description': cmd_meta_description,
'update_meta': cmd_update_meta,
'performance': cmd_performance,
'keywords': cmd_keywords,
'report': cmd_report,
'import_media': cmd_import_media,
'status': cmd_status,
'help': cmd_help,
}
@@ -513,6 +527,123 @@ def cmd_status(app, args):
return 0
def cmd_performance(app, args):
"""Analyze page performance from GA4 and GSC data."""
if args.dry_run:
print("Would analyze page performance")
if args.ga4:
print(f" GA4 file: {args.ga4}")
if args.gsc:
print(f" GSC file: {args.gsc}")
return 0
print("Analyzing page performance...")
output_file, analysis = app.performance(
ga4_file=args.ga4,
gsc_file=args.gsc,
start_date=args.start_date,
end_date=args.end_date,
output_file=args.output
)
if output_file and analysis:
print(f"\n✅ Performance analysis completed!")
print(f" Results: {output_file}")
print(f"\n📊 Summary:")
summary = analysis.get('summary', {})
print(f" Total pages: {summary.get('total_pages', 0)}")
print(f" Total pageviews: {summary.get('total_pageviews', 0)}")
print(f" Total clicks: {summary.get('total_clicks', 0)}")
print(f" Average CTR: {summary.get('average_ctr', 0):.2%}")
print(f" Average position: {summary.get('average_position', 0):.1f}")
return 0
def cmd_keywords(app, args):
"""Analyze keyword opportunities from GSC data."""
if args.dry_run:
print("Would analyze keyword opportunities")
if args.args:
print(f" GSC file: {args.args[0]}")
return 0
gsc_file = args.args[0] if args.args else None
if not gsc_file:
print("❌ GSC export file required")
print(" Usage: seo keywords <gsc_export.csv>")
return 1
print(f"Analyzing keyword opportunities from {gsc_file}...")
opportunities = app.keywords(gsc_file=gsc_file, limit=args.limit or 50)
if opportunities:
print(f"\n✅ Found {len(opportunities)} keyword opportunities!")
print(f"\nTop opportunities:")
for i, kw in enumerate(opportunities[:10], 1):
print(f" {i}. {kw['query']} - Position: {kw['position']:.1f}, Impressions: {kw['impressions']}")
return 0
def cmd_report(app, args):
"""Generate comprehensive SEO performance report."""
if args.dry_run:
print("Would generate SEO performance report")
return 0
print("Generating SEO performance report...")
report_file = app.seo_report(output_file=args.output)
if report_file:
print(f"\n✅ Report generated!")
print(f" Report: {report_file}")
return 0
def cmd_import_media(app, args):
"""Import media from source to destination site for migrated posts."""
if args.dry_run:
print("Would import media")
print(f" Source: {args.from_site or 'mistergeek.net'}")
print(f" Destination: {args.to_site or 'hellogeek.net'}")
if args.args:
print(f" Migration report: {args.args[0]}")
return 0
migration_report = args.args[0] if args.args else None
if not migration_report:
print("❌ Migration report CSV required")
print(" Usage: seo import_media <migration_report.csv>")
return 1
source_site = args.from_site or 'mistergeek.net'
dest_site = args.to_site or 'hellogeek.net'
print(f"Importing media from {source_site} to {dest_site}...")
print(f"Migration report: {migration_report}")
stats = app.import_media(
migration_report=migration_report,
source_site=source_site,
destination_site=dest_site,
dry_run=False
)
if stats:
print(f"\n✅ Media import completed!")
print(f"\n📊 Summary:")
print(f" Total posts: {stats.get('total_posts', 0)}")
print(f" Posts with media: {stats.get('posts_with_media', 0)}")
print(f" Images uploaded: {stats.get('images_uploaded', 0)}")
print(f" Featured images set: {stats.get('featured_images_set', 0)}")
print(f" Errors: {stats.get('errors', 0)}")
return 0
def cmd_help(app, args):
"""Show help."""
print("""
@@ -549,6 +680,11 @@ Strategy & Migration:
Utility:
status Show output files status
performance --ga4 <ga4.csv> --gsc <gsc.csv> Analyze page performance
keywords <gsc.csv> Show keyword opportunities
report Generate SEO performance report
import_media <report.csv> Import media for migrated posts
help Show this help message
Export Options:
@@ -570,6 +706,13 @@ Update Meta Options:
--author Filter by author name(s)
--force Force regenerate even for good quality meta descriptions
Performance Options:
--ga4 Path to Google Analytics 4 export CSV
--gsc Path to Google Search Console export CSV
--start-date Start date YYYY-MM-DD (for API mode)
--end-date End date YYYY-MM-DD (for API mode)
--limit Limit number of results
Migration Options:
--destination, --to Destination site: mistergeek.net, webscroll.fr, hellogeek.net
--source, --from Source site for filtered migration
@@ -617,6 +760,9 @@ Examples:
seo update_meta --site A --category "VPN" --limit 10 # Update 10 posts in category
seo update_meta --site A --author "john" --limit 10 # Update 10 posts by author
seo update_meta --site A --dry-run # Preview changes
seo performance --ga4 analytics.csv --gsc search.csv # Analyze performance
seo keywords gsc_export.csv # Show keyword opportunities
seo report # Generate SEO report
seo status
""")
return 0

src/seo/media_importer.py (new file, 467 lines)

@@ -0,0 +1,467 @@
"""
Media Importer - Import media from one WordPress site to another
Specifically designed for migrated posts
"""
import logging
import os
import tempfile
import requests
from requests.auth import HTTPBasicAuth
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import csv
from .config import Config
logger = logging.getLogger(__name__)
class WordPressMediaImporter:
"""Import media from source WordPress site to destination site."""
def __init__(self, source_site: str = 'mistergeek.net',
destination_site: str = 'hellogeek.net'):
"""
Initialize media importer.
Args:
source_site: Source site name
destination_site: Destination site name
"""
self.source_site = source_site
self.destination_site = destination_site
self.sites = Config.WORDPRESS_SITES
# Validate sites
if source_site not in self.sites:
raise ValueError(f"Source site '{source_site}' not found")
if destination_site not in self.sites:
raise ValueError(f"Destination site '{destination_site}' not found")
# Setup source
self.source_config = self.sites[source_site]
self.source_url = self.source_config['url'].rstrip('/')
self.source_auth = HTTPBasicAuth(
self.source_config['username'],
self.source_config['password']
)
# Setup destination
self.dest_config = self.sites[destination_site]
self.dest_url = self.dest_config['url'].rstrip('/')
self.dest_auth = HTTPBasicAuth(
self.dest_config['username'],
self.dest_config['password']
)
self.media_cache = {} # Cache source media ID -> dest media ID
self.stats = {
'total_posts': 0,
'posts_with_media': 0,
'images_downloaded': 0,
'images_uploaded': 0,
'featured_images_set': 0,
'errors': 0
}
def fetch_migrated_posts(self, post_ids: Optional[List[int]] = None) -> List[Dict]:
"""
Fetch posts that need media imported.
Args:
post_ids: Specific post IDs to process
Returns:
List of post dicts
"""
logger.info(f"Fetching posts from {self.destination_site}...")
if post_ids:
# Fetch specific posts
posts = []
for post_id in post_ids:
try:
response = requests.get(
f"{self.dest_url}/wp-json/wp/v2/posts/{post_id}",
auth=self.dest_auth,
timeout=10
)
if response.status_code == 200:
posts.append(response.json())
except Exception as e:
logger.error(f"Error fetching post {post_id}: {e}")
return posts
else:
# Fetch recent posts (assuming migrated posts are recent)
try:
response = requests.get(
f"{self.dest_url}/wp-json/wp/v2/posts",
params={
'per_page': 100,
'status': 'publish,draft',
'_embed': True
},
auth=self.dest_auth,
timeout=30
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error fetching posts: {e}")
return []
def get_source_post(self, post_id: int) -> Optional[Dict]:
"""
Fetch corresponding post from source site.
Args:
post_id: Post ID on source site
Returns:
Post dict or None
"""
try:
response = requests.get(
f"{self.source_url}/wp-json/wp/v2/posts/{post_id}",
auth=self.source_auth,
timeout=10,
params={'_embed': True}
)
if response.status_code == 200:
return response.json()
else:
logger.warning(f"Source post {post_id} not found")
return None
except Exception as e:
logger.error(f"Error fetching source post {post_id}: {e}")
return None
def download_media(self, media_url: str) -> Optional[bytes]:
"""
Download media file from source site.
Args:
media_url: URL of media file
Returns:
File content bytes or None
"""
try:
response = requests.get(media_url, timeout=30)
response.raise_for_status()
return response.content
except Exception as e:
logger.error(f"Error downloading {media_url}: {e}")
return None
def upload_media(self, file_content: bytes, filename: str,
mime_type: str = 'image/jpeg',
alt_text: str = '',
caption: str = '') -> Optional[int]:
"""
Upload media to destination site.
Args:
file_content: File content bytes
filename: Filename for the media
mime_type: MIME type of the file
alt_text: Alt text for the image
caption: Caption for the image
Returns:
Media ID on destination site or None
"""
try:
# Upload file
files = {'file': (filename, file_content, mime_type)}
response = requests.post(
f"{self.dest_url}/wp-json/wp/v2/media",
files=files,
auth=self.dest_auth,
headers={
# Don't set Content-Type here: requests must generate the multipart boundary
'Content-Disposition': f'attachment; filename="{filename}"'
},
timeout=30
)
if response.status_code == 201:
media_data = response.json()
media_id = media_data['id']
# Update alt text and caption
if alt_text or caption:
meta_update = {}
if alt_text:
meta_update['_wp_attachment_image_alt'] = alt_text
if caption:
meta_update['excerpt'] = caption
requests.post(
f"{self.dest_url}/wp-json/wp/v2/media/{media_id}",
json=meta_update,
auth=self.dest_auth,
timeout=10
)
logger.info(f"✓ Uploaded {filename} (ID: {media_id})")
return media_id
else:
logger.error(f"Error uploading {filename}: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error uploading {filename}: {e}")
return None
def import_featured_image(self, source_post: Dict, dest_post_id: int) -> bool:
"""
Import featured image from source post to destination post.
Args:
source_post: Source post dict
dest_post_id: Destination post ID
Returns:
True if successful
"""
# Check if source has featured image
featured_media_id = source_post.get('featured_media')
if not featured_media_id:
logger.info(f" No featured image on source post")
return False
# Check if already imported
if featured_media_id in self.media_cache:
dest_media_id = self.media_cache[featured_media_id]
logger.info(f" Using cached media ID: {dest_media_id}")
else:
# Fetch media details from source
try:
media_response = requests.get(
f"{self.source_url}/wp-json/wp/v2/media/{featured_media_id}",
auth=self.source_auth,
timeout=10
)
if media_response.status_code != 200:
logger.error(f"Could not fetch media {featured_media_id}")
return False
media_data = media_response.json()
# Download media file
media_url = media_data.get('source_url', '')
if not media_url:
# Try alternative URL structure
media_url = media_data.get('guid', {}).get('rendered', '')
file_content = self.download_media(media_url)
if not file_content:
return False
# Extract filename and mime type (keep the source extension rather than forcing .jpg)
filename = os.path.basename(media_url.split('?')[0]) or f"{media_data.get('slug', 'image')}.jpg"
mime_type = media_data.get('mime_type', 'image/jpeg')
alt_text = media_data.get('alt_text', '')
caption = media_data.get('caption', {}).get('rendered', '')
# Upload to destination
dest_media_id = self.upload_media(
file_content, filename, mime_type, alt_text, caption
)
if not dest_media_id:
return False
# Cache the mapping
self.media_cache[featured_media_id] = dest_media_id
self.stats['images_uploaded'] += 1
except Exception as e:
logger.error(f"Error importing featured image: {e}")
return False
# Set featured image on destination post
try:
response = requests.post(
f"{self.dest_url}/wp-json/wp/v2/posts/{dest_post_id}",
json={'featured_media': dest_media_id},
auth=self.dest_auth,
timeout=10
)
if response.status_code == 200:
logger.info(f"✓ Set featured image on post {dest_post_id}")
self.stats['featured_images_set'] += 1
return True
else:
logger.error(f"Error setting featured image: {response.status_code}")
return False
except Exception as e:
logger.error(f"Error setting featured image: {e}")
return False
def import_post_media(self, source_post: Dict, dest_post_id: int) -> int:
"""
Import all media from a post (featured image + inline images).
Args:
source_post: Source post dict
dest_post_id: Destination post ID
Returns:
Number of images imported
"""
images_imported = 0
# Import featured image
if self.import_featured_image(source_post, dest_post_id):
images_imported += 1
# TODO: Import inline images from content
# This would require parsing the content for <img> tags
# and replacing source URLs with destination URLs
return images_imported
def process_posts(self, post_mappings: List[Tuple[int, int]],
dry_run: bool = False) -> Dict:
"""
Process media import for mapped posts.
Args:
post_mappings: List of (source_post_id, dest_post_id) tuples
dry_run: If True, preview without importing
Returns:
Statistics dict
"""
logger.info("\n" + "="*70)
logger.info("MEDIA IMPORTER")
logger.info("="*70)
logger.info(f"Source: {self.source_site}")
logger.info(f"Destination: {self.destination_site}")
logger.info(f"Posts to process: {len(post_mappings)}")
logger.info(f"Dry run: {dry_run}")
logger.info("="*70)
self.stats['total_posts'] = len(post_mappings)
for i, (source_id, dest_id) in enumerate(post_mappings, 1):
logger.info(f"\n[{i}/{len(post_mappings)}] Processing post mapping:")
logger.info(f" Source: {source_id} → Destination: {dest_id}")
# Fetch source post
source_post = self.get_source_post(source_id)
if not source_post:
logger.warning(f" Skipping: Source post not found")
self.stats['errors'] += 1
continue
# Check if source has media
if not source_post.get('featured_media'):
logger.info(f" No featured image to import")
continue
self.stats['posts_with_media'] += 1
if dry_run:
logger.info(f" [DRY RUN] Would import featured image")
self.stats['images_downloaded'] += 1
self.stats['images_uploaded'] += 1
self.stats['featured_images_set'] += 1
else:
# Import media
imported = self.import_post_media(source_post, dest_id)
if imported > 0:
self.stats['images_downloaded'] += imported
# Print summary
logger.info("\n" + "="*70)
logger.info("IMPORT SUMMARY")
logger.info("="*70)
logger.info(f"Total posts: {self.stats['total_posts']}")
logger.info(f"Posts with media: {self.stats['posts_with_media']}")
logger.info(f"Images downloaded: {self.stats['images_downloaded']}")
logger.info(f"Images uploaded: {self.stats['images_uploaded']}")
logger.info(f"Featured images set: {self.stats['featured_images_set']}")
logger.info(f"Errors: {self.stats['errors']}")
logger.info("="*70)
return self.stats
def run_from_csv(self, csv_file: str, dry_run: bool = False) -> Dict:
"""
Import media for posts listed in CSV file.
CSV should have columns: source_post_id, destination_post_id
Args:
csv_file: Path to CSV file with post mappings
dry_run: If True, preview without importing
Returns:
Statistics dict
"""
logger.info(f"Loading post mappings from: {csv_file}")
try:
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
mappings = []
for row in reader:
source_id = int(row.get('source_post_id', 0))
dest_id = int(row.get('destination_post_id', 0))
if source_id and dest_id:
mappings.append((source_id, dest_id))
logger.info(f"✓ Loaded {len(mappings)} post mappings")
except Exception as e:
logger.error(f"Error loading CSV: {e}")
return self.stats
return self.process_posts(mappings, dry_run=dry_run)
def run_from_migration_report(self, report_file: str,
dry_run: bool = False) -> Dict:
"""
Import media using migration report CSV.
Args:
report_file: Path to migration report CSV
dry_run: If True, preview without importing
Returns:
Statistics dict
"""
logger.info(f"Loading migration report: {report_file}")
try:
with open(report_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
mappings = []
for row in reader:
source_id = int(row.get('source_post_id', 0))
dest_id = int(row.get('destination_post_id', 0))
if source_id and dest_id:
mappings.append((source_id, dest_id))
logger.info(f"✓ Loaded {len(mappings)} post mappings from migration report")
except Exception as e:
logger.error(f"Error loading migration report: {e}")
return self.stats
return self.process_posts(mappings, dry_run=dry_run)


@@ -0,0 +1,396 @@
"""
SEO Performance Analyzer - Analyze page performance from imported data
Supports Google Analytics and Search Console CSV imports
"""
import csv
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class PerformanceAnalyzer:
"""Analyze SEO performance from imported CSV data."""
def __init__(self):
"""Initialize performance analyzer."""
self.performance_data = []
self.analysis_results = {}
def load_ga4_export(self, csv_file: str) -> List[Dict]:
"""
Load Google Analytics 4 export CSV.
Expected columns: page_path, page_title, pageviews, sessions, bounce_rate, etc.
Args:
csv_file: Path to GA4 export CSV
Returns:
List of data dicts
"""
logger.info(f"Loading GA4 export: {csv_file}")
try:
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
data = list(reader)
# Normalize column names
normalized = []
for row in data:
normalized_row = {}
for key, value in row.items():
# Normalize key names
new_key = key.lower().replace(' ', '_').replace('-', '_')
if 'page' in new_key and 'path' in new_key:
normalized_row['page'] = value
elif 'page' in new_key and 'title' in new_key:
normalized_row['page_title'] = value
elif 'pageviews' in new_key or 'views' in new_key:
normalized_row['pageviews'] = int(value) if value else 0
elif 'sessions' in new_key:
normalized_row['sessions'] = int(value) if value else 0
elif 'bounce' in new_key and 'rate' in new_key:
normalized_row['bounce_rate'] = float(value) if value else 0.0
elif 'engagement' in new_key and 'rate' in new_key:
normalized_row['engagement_rate'] = float(value) if value else 0.0
elif 'duration' in new_key or 'time' in new_key:
normalized_row['avg_session_duration'] = float(value) if value else 0.0
else:
normalized_row[new_key] = value
normalized.append(normalized_row)
self.performance_data.extend(normalized)
logger.info(f"✓ Loaded {len(normalized)} rows from GA4")
return normalized
except Exception as e:
logger.error(f"Error loading GA4 export: {e}")
return []
def load_gsc_export(self, csv_file: str) -> List[Dict]:
"""
Load Google Search Console export CSV.
Expected columns: Page, Clicks, Impressions, CTR, Position
Args:
csv_file: Path to GSC export CSV
Returns:
List of data dicts
"""
logger.info(f"Loading GSC export: {csv_file}")
try:
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
data = list(reader)
# Normalize column names
normalized = []
for row in data:
normalized_row = {'page': ''}
for key, value in row.items():
new_key = key.lower().replace(' ', '_')
if 'page' in new_key or 'url' in new_key:
normalized_row['page'] = value
elif 'clicks' in new_key:
normalized_row['clicks'] = int(value.replace(',', '')) if value else 0
elif 'impressions' in new_key:
normalized_row['impressions'] = int(value.replace(',', '')) if value else 0
elif 'ctr' in new_key:
# GSC CSV exports format CTR as a percentage string (e.g. "3.51%")
if value and value.endswith('%'):
normalized_row['ctr'] = float(value.rstrip('%')) / 100
else:
normalized_row['ctr'] = float(value) if value else 0.0
elif 'position' in new_key or 'rank' in new_key:
normalized_row['position'] = float(value) if value else 0.0
elif 'query' in new_key or 'keyword' in new_key:
normalized_row['query'] = value
normalized.append(normalized_row)
# Merge with existing data
self._merge_gsc_data(normalized)
logger.info(f"✓ Loaded {len(normalized)} rows from GSC")
return normalized
except Exception as e:
logger.error(f"Error loading GSC export: {e}")
return []
def _merge_gsc_data(self, gsc_data: List[Dict]):
"""Merge GSC data with existing performance data."""
# Create lookup by page
existing_pages = {p.get('page', ''): p for p in self.performance_data}
for gsc_row in gsc_data:
page = gsc_row.get('page', '')
if page in existing_pages:
# Update existing record
existing_pages[page].update(gsc_row)
else:
# Add new record
new_record = {
'page': page,
'page_title': '',
'pageviews': 0,
'sessions': 0,
'bounce_rate': 0.0,
'engagement_rate': 0.0,
'avg_session_duration': 0.0
}
new_record.update(gsc_row)
self.performance_data.append(new_record)
def analyze(self) -> Dict:
"""
Analyze performance data.
Returns:
Analysis results dict
"""
if not self.performance_data:
logger.warning("No data to analyze")
return {}
logger.info("\n" + "="*70)
logger.info("PERFORMANCE ANALYSIS")
logger.info("="*70)
# Calculate summary metrics
total_pages = len(self.performance_data)
total_pageviews = sum(p.get('pageviews', 0) for p in self.performance_data)
total_clicks = sum(p.get('clicks', 0) for p in self.performance_data)
total_impressions = sum(p.get('impressions', 0) for p in self.performance_data)
avg_ctr = total_clicks / total_impressions if total_impressions > 0 else 0.0
# Average position only over pages that have GSC position data; pages seen
# only in GA4 carry position 0 and would skew the mean.
positioned = [p for p in self.performance_data if p.get('position', 0) > 0]
avg_position = (sum(p['position'] for p in positioned) / len(positioned)) if positioned else 0.0
# Top pages
top_by_views = sorted(
self.performance_data,
key=lambda x: x.get('pageviews', 0),
reverse=True
)[:20]
top_by_clicks = sorted(
self.performance_data,
key=lambda x: x.get('clicks', 0),
reverse=True
)[:20]
# Pages with issues
low_ctr = [
p for p in self.performance_data
if p.get('impressions', 0) > 100 and p.get('ctr', 0) < 0.02
]
low_position = [
p for p in self.performance_data
if p.get('impressions', 0) > 50 and p.get('position', 0) > 20
]
high_impressions_low_clicks = [
p for p in self.performance_data
if p.get('impressions', 0) > 500 and p.get('ctr', 0) < 0.01
]
# Keyword opportunities (from GSC data)
keyword_opportunities = self._analyze_keywords()
analysis = {
'summary': {
'total_pages': total_pages,
'total_pageviews': total_pageviews,
'total_clicks': total_clicks,
'total_impressions': total_impressions,
'average_ctr': avg_ctr,
'average_position': avg_position
},
'top_pages': {
'by_views': top_by_views,
'by_clicks': top_by_clicks
},
'issues': {
'low_ctr': low_ctr,
'low_position': low_position,
'high_impressions_low_clicks': high_impressions_low_clicks
},
'keyword_opportunities': keyword_opportunities
}
# Recommendations inspect the assembled dict, so attach them afterwards.
analysis['recommendations'] = self._generate_recommendations(analysis)
# Log summary
logger.info(f"Total pages analyzed: {total_pages}")
logger.info(f"Total pageviews: {total_pageviews}")
logger.info(f"Total clicks: {total_clicks}")
logger.info(f"Total impressions: {total_impressions}")
logger.info(f"Average CTR: {avg_ctr:.2%}")
logger.info(f"Average position: {avg_position:.1f}")
logger.info(f"\nPages with low CTR: {len(low_ctr)}")
logger.info(f"Pages with low position: {len(low_position)}")
logger.info(f"High impression, low click pages: {len(high_impressions_low_clicks)}")
logger.info("="*70)
self.analysis_results = analysis
return analysis
def _analyze_keywords(self) -> List[Dict]:
"""Analyze keyword opportunities from GSC data."""
keywords = {}
for page in self.performance_data:
query = page.get('query', '')
if not query:
continue
if query not in keywords:
keywords[query] = {
'query': query,
'clicks': 0,
'impressions': 0,
'position': 0.0,
'pages': []
}
keywords[query]['clicks'] += page.get('clicks', 0)
keywords[query]['impressions'] += page.get('impressions', 0)
keywords[query]['pages'].append(page.get('page', ''))
# Calculate average position per keyword
for query in keywords:
positions = [
p.get('position', 0) for p in self.performance_data
if p.get('query') == query
]
if positions:
keywords[query]['position'] = sum(positions) / len(positions)
# Sort by impressions
keyword_list = list(keywords.values())
keyword_list.sort(key=lambda x: x['impressions'], reverse=True)
# Filter opportunities (position 5-20, high impressions)
opportunities = [
k for k in keyword_list
if 5 <= k['position'] <= 20 and k['impressions'] > 100
]
return opportunities[:50] # Top 50 opportunities
def _generate_recommendations(self, analysis: Dict) -> List[str]:
"""Generate SEO recommendations."""
recommendations = []
issues = analysis.get('issues', {})
# Low CTR
low_ctr_count = len(issues.get('low_ctr', []))
if low_ctr_count > 0:
recommendations.append(
f"📝 {low_ctr_count} pages have low CTR (<2% with 100+ impressions). "
"Improve meta titles and descriptions to increase click-through rate."
)
# Low position
low_pos_count = len(issues.get('low_position', []))
if low_pos_count > 0:
recommendations.append(
f"📊 {low_pos_count} pages rank beyond position 20. "
"Consider content optimization and internal linking."
)
# High impressions, low clicks
high_imp_count = len(issues.get('high_impressions_low_clicks', []))
if high_imp_count > 0:
recommendations.append(
f"⚠️ {high_imp_count} pages have 500+ impressions but <1% CTR. "
"These are prime candidates for title/description optimization."
)
# Keyword opportunities
keyword_count = len(analysis.get('keyword_opportunities', []))
if keyword_count > 0:
recommendations.append(
f"🎯 {keyword_count} keyword opportunities identified (ranking 5-20). "
"Focus content optimization on these keywords."
)
return recommendations
def save_analysis(self, output_file: Optional[str] = None) -> str:
"""
Save analysis results to CSV.
Args:
output_file: Custom output file path
Returns:
Path to saved file
"""
if not output_file:
output_dir = Path(__file__).parent.parent.parent / 'output'
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = output_dir / f'performance_analysis_{timestamp}.csv'
output_file = Path(output_file)
output_file.parent.mkdir(parents=True, exist_ok=True)
fieldnames = [
'page', 'page_title', 'pageviews', 'sessions', 'bounce_rate',
'engagement_rate', 'avg_session_duration', 'clicks', 'impressions',
'ctr', 'position', 'query'
]
logger.info(f"Saving analysis to {output_file}...")
with open(output_file, 'w', newline='', encoding='utf-8') as f:
# Rows may carry extra normalized GA4 columns; ignore fields not listed.
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
writer.writerows(self.performance_data)
logger.info(f"✓ Saved to: {output_file}")
return str(output_file)
def run(self, ga4_file: Optional[str] = None,
gsc_file: Optional[str] = None,
output_file: Optional[str] = None) -> Tuple[str, Dict]:
"""
Run complete performance analysis.
Args:
ga4_file: Path to GA4 export CSV
gsc_file: Path to GSC export CSV
output_file: Custom output file path
Returns:
Tuple of (output_file_path, analysis_dict)
"""
logger.info("\n" + "="*70)
logger.info("SEO PERFORMANCE ANALYZER")
logger.info("="*70)
# Load data
if ga4_file:
self.load_ga4_export(ga4_file)
if gsc_file:
self.load_gsc_export(gsc_file)
if not self.performance_data:
logger.error("No data loaded. Provide GA4 and/or GSC export files.")
return "", {}
# Analyze
analysis = self.analyze()
# Save
output_path = self.save_analysis(output_file)
return output_path, analysis
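The issue thresholds used by `PerformanceAnalyzer.analyze()` reduce to three list comprehensions; a self-contained sketch with invented rows:

```python
# Standalone sketch of the issue filters in PerformanceAnalyzer.analyze();
# the sample rows are invented.
pages = [
    {'page': '/a', 'impressions': 1500, 'ctr': 0.005, 'position': 9.0},
    {'page': '/b', 'impressions': 200,  'ctr': 0.010, 'position': 25.0},
    {'page': '/c', 'impressions': 80,   'ctr': 0.050, 'position': 3.0},
]

# Low CTR: enough impressions to matter, but under 2% click-through
low_ctr = [p for p in pages if p['impressions'] > 100 and p['ctr'] < 0.02]
# Low position: some visibility, but ranking beyond page 2
low_position = [p for p in pages if p['impressions'] > 50 and p['position'] > 20]
# Prime rewrite candidates: heavy impressions, under 1% CTR
high_imp_low_clicks = [p for p in pages if p['impressions'] > 500 and p['ctr'] < 0.01]

print([p['page'] for p in low_ctr])              # ['/a', '/b']
print([p['page'] for p in low_position])         # ['/b']
print([p['page'] for p in high_imp_low_clicks])  # ['/a']
```

A page can land in several buckets at once (here `/a` is both low-CTR and high-impression/low-click), which is why the recommendations count each list separately.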


@@ -0,0 +1,494 @@
"""
SEO Performance Tracker - Google Analytics 4 & Search Console Integration
Fetch and analyze page performance data for SEO optimization
"""
import csv
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
# Optional Google imports
try:
from google.analytics.admin import AnalyticsAdminServiceClient
from google.analytics.data import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
DateRange,
Dimension,
Metric,
RunReportRequest,
)
from google.oauth2 import service_account
from googleapiclient.discovery import build
GOOGLE_AVAILABLE = True
except ImportError:
GOOGLE_AVAILABLE = False
from .config import Config
logger = logging.getLogger(__name__)
if not GOOGLE_AVAILABLE:
logger.warning("Google libraries not installed. API mode disabled. Use CSV imports instead.")
class SEOPerformanceTracker:
"""Track and analyze SEO performance from Google Analytics and Search Console."""
def __init__(self, ga4_credentials: Optional[str] = None,
gsc_credentials: Optional[str] = None,
ga4_property_id: Optional[str] = None,
gsc_site_url: Optional[str] = None):
"""
Initialize performance tracker.
Args:
ga4_credentials: Path to GA4 service account JSON
gsc_credentials: Path to GSC service account JSON
ga4_property_id: GA4 property ID (e.g., "properties/123456789")
gsc_site_url: GSC site URL (e.g., "https://www.mistergeek.net")
"""
self.ga4_credentials = ga4_credentials or Config.GA4_CREDENTIALS
self.gsc_credentials = gsc_credentials or Config.GSC_CREDENTIALS
self.ga4_property_id = ga4_property_id or Config.GA4_PROPERTY_ID
self.gsc_site_url = gsc_site_url or Config.GSC_SITE_URL
self.ga4_client = None
self.gsc_service = None
# Initialize clients
self._init_ga4_client()
self._init_gsc_service()
self.performance_data = []
def _init_ga4_client(self):
"""Initialize Google Analytics 4 client."""
if not GOOGLE_AVAILABLE:
logger.warning("Google libraries not installed. API mode disabled.")
return
if not self.ga4_credentials or not self.ga4_property_id:
logger.warning("GA4 credentials not configured")
return
try:
credentials = service_account.Credentials.from_service_account_file(
self.ga4_credentials,
scopes=["https://www.googleapis.com/auth/analytics.readonly"]
)
self.ga4_client = BetaAnalyticsDataClient(credentials=credentials)
logger.info("✓ GA4 client initialized")
except Exception as e:
logger.error(f"Failed to initialize GA4 client: {e}")
self.ga4_client = None
def _init_gsc_service(self):
"""Initialize Google Search Console service."""
if not GOOGLE_AVAILABLE:
logger.warning("Google libraries not installed. API mode disabled.")
return
if not self.gsc_credentials:
logger.warning("GSC credentials not configured")
return
try:
credentials = service_account.Credentials.from_service_account_file(
self.gsc_credentials,
scopes=["https://www.googleapis.com/auth/webmasters.readonly"]
)
self.gsc_service = build('webmasters', 'v3', credentials=credentials)
logger.info("✓ GSC service initialized")
except Exception as e:
logger.error(f"Failed to initialize GSC service: {e}")
self.gsc_service = None
def fetch_ga4_data(self, start_date: str, end_date: str,
dimensions: Optional[List[str]] = None) -> List[Dict]:
"""
Fetch data from Google Analytics 4.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
dimensions: List of dimensions to fetch
Returns:
List of performance data dicts
"""
if not self.ga4_client:
logger.warning("GA4 client not available")
return []
logger.info(f"Fetching GA4 data from {start_date} to {end_date}...")
# Default dimensions
if dimensions is None:
dimensions = ['pagePath', 'pageTitle']
# Default metrics
metrics = [
'screenPageViews',
'sessions',
'bounceRate',
'averageSessionDuration',
'engagementRate'
]
try:
request = RunReportRequest(
property=self.ga4_property_id,
dimensions=[Dimension(name=dim) for dim in dimensions],
metrics=[Metric(name=metric) for metric in metrics],
date_ranges=[DateRange(start_date=start_date, end_date=end_date)]
)
response = self.ga4_client.run_report(request)
data = []
for row in response.rows:
row_data = {}
# Extract dimensions
for i, dim_header in enumerate(response.dimension_headers):
row_data[dim_header.name] = row.dimension_values[i].value
# Extract metrics
for i, metric_header in enumerate(response.metric_headers):
value = row.metric_values[i].value
# Convert to appropriate type
if metric_header.name in ['bounceRate', 'engagementRate']:
value = float(value) if value else 0.0
elif metric_header.name in ['screenPageViews', 'sessions']:
value = int(value) if value else 0
elif metric_header.name == 'averageSessionDuration':
value = float(value) if value else 0.0
row_data[metric_header.name] = value
data.append(row_data)
logger.info(f"✓ Fetched {len(data)} rows from GA4")
return data
except Exception as e:
logger.error(f"Error fetching GA4 data: {e}")
return []
def fetch_gsc_data(self, start_date: str, end_date: str,
dimensions: Optional[List[str]] = None) -> List[Dict]:
"""
Fetch data from Google Search Console.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
dimensions: List of dimensions to fetch
Returns:
List of performance data dicts
"""
if not self.gsc_service:
logger.warning("GSC service not available")
return []
logger.info(f"Fetching GSC data from {start_date} to {end_date}...")
# Default dimensions
if dimensions is None:
dimensions = ['page']
try:
# Build request
request = {
'startDate': start_date,
'endDate': end_date,
'dimensions': dimensions,
'rowLimit': 5000,
'startRow': 0
}
response = self.gsc_service.searchanalytics().query(
siteUrl=self.gsc_site_url,
body=request
).execute()
data = []
if 'rows' in response:
for row in response['rows']:
row_data = {
'page': row['keys'][0] if len(row['keys']) > 0 else '',
'clicks': row.get('clicks', 0),
'impressions': row.get('impressions', 0),
'ctr': row.get('ctr', 0.0),
'position': row.get('position', 0.0)
}
# Add query if available
if len(row['keys']) > 1:
row_data['query'] = row['keys'][1]
data.append(row_data)
logger.info(f"✓ Fetched {len(data)} rows from GSC")
return data
except Exception as e:
logger.error(f"Error fetching GSC data: {e}")
return []
def fetch_combined_data(self, start_date: str, end_date: str) -> List[Dict]:
"""
Fetch and combine data from GA4 and GSC.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of combined performance data dicts
"""
logger.info("\n" + "="*70)
logger.info("FETCHING PERFORMANCE DATA")
logger.info("="*70)
# Fetch from both sources
ga4_data = self.fetch_ga4_data(start_date, end_date)
gsc_data = self.fetch_gsc_data(start_date, end_date)
# Combine data by page path
combined = {}
# Add GA4 data
for row in ga4_data:
page_path = row.get('pagePath', '')
combined[page_path] = {
'page': page_path,
'page_title': row.get('pageTitle', ''),
'pageviews': row.get('screenPageViews', 0),
'sessions': row.get('sessions', 0),
'bounce_rate': row.get('bounceRate', 0.0),
'avg_session_duration': row.get('averageSessionDuration', 0.0),
'engagement_rate': row.get('engagementRate', 0.0),
'clicks': 0,
'impressions': 0,
'ctr': 0.0,
'position': 0.0
}
# Merge GSC data
for row in gsc_data:
page_path = row.get('page', '')
if page_path in combined:
# Update existing record
combined[page_path]['clicks'] = row.get('clicks', 0)
combined[page_path]['impressions'] = row.get('impressions', 0)
combined[page_path]['ctr'] = row.get('ctr', 0.0)
combined[page_path]['position'] = row.get('position', 0.0)
else:
# Create new record
combined[page_path] = {
'page': page_path,
'page_title': '',
'pageviews': 0,
'sessions': 0,
'bounce_rate': 0.0,
'avg_session_duration': 0.0,
'engagement_rate': 0.0,
'clicks': row.get('clicks', 0),
'impressions': row.get('impressions', 0),
'ctr': row.get('ctr', 0.0),
'position': row.get('position', 0.0)
}
self.performance_data = list(combined.values())
logger.info(f"✓ Combined {len(self.performance_data)} pages")
logger.info("="*70)
return self.performance_data
def analyze_performance(self) -> Dict:
"""
Analyze performance data and generate insights.
Returns:
Analysis results dict
"""
if not self.performance_data:
return {}
logger.info("\n" + "="*70)
logger.info("PERFORMANCE ANALYSIS")
logger.info("="*70)
# Calculate metrics
total_pageviews = sum(p.get('pageviews', 0) for p in self.performance_data)
total_clicks = sum(p.get('clicks', 0) for p in self.performance_data)
total_impressions = sum(p.get('impressions', 0) for p in self.performance_data)
avg_ctr = total_clicks / total_impressions if total_impressions > 0 else 0
# Only pages with GSC data have a meaningful position; GA4-only pages
# carry position 0 and would skew the mean.
positioned = [p for p in self.performance_data if p.get('position', 0) > 0]
avg_position = (sum(p['position'] for p in positioned) / len(positioned)) if positioned else 0.0
# Top pages by pageviews
top_pages = sorted(
self.performance_data,
key=lambda x: x.get('pageviews', 0),
reverse=True
)[:10]
# Top pages by CTR
top_ctr = sorted(
[p for p in self.performance_data if p.get('impressions', 0) > 100],
key=lambda x: x.get('ctr', 0),
reverse=True
)[:10]
# Pages needing improvement (low CTR)
low_ctr = [
p for p in self.performance_data
if p.get('impressions', 0) > 100 and p.get('ctr', 0) < 0.02
]
# Pages with good traffic but low position
opportunity_pages = [
p for p in self.performance_data
if p.get('pageviews', 0) > 50 and p.get('position', 0) > 10
]
analysis = {
'summary': {
'total_pages': len(self.performance_data),
'total_pageviews': total_pageviews,
'total_clicks': total_clicks,
'total_impressions': total_impressions,
'average_ctr': avg_ctr,
'average_position': avg_position
},
'top_pages': top_pages,
'top_ctr': top_ctr,
'low_ctr': low_ctr,
'opportunities': opportunity_pages
}
# Recommendations inspect the assembled dict, so attach them afterwards.
analysis['recommendations'] = self._generate_recommendations(analysis)
# Log summary
logger.info(f"Total pages: {analysis['summary']['total_pages']}")
logger.info(f"Total pageviews: {analysis['summary']['total_pageviews']}")
logger.info(f"Total clicks: {analysis['summary']['total_clicks']}")
logger.info(f"Average CTR: {analysis['summary']['average_ctr']:.2%}")
logger.info(f"Average position: {analysis['summary']['average_position']:.1f}")
logger.info("="*70)
return analysis
def _generate_recommendations(self, analysis: Dict) -> List[str]:
"""Generate SEO recommendations based on analysis."""
recommendations = []
# Low CTR recommendations
low_ctr_count = len(analysis.get('low_ctr', []))
if low_ctr_count > 0:
recommendations.append(
f"📝 {low_ctr_count} pages have low CTR (<2%). "
"Consider improving meta titles and descriptions."
)
# Position opportunities
opportunity_count = len(analysis.get('opportunities', []))
if opportunity_count > 0:
recommendations.append(
f"🎯 {opportunity_count} pages have good traffic but rank >10. "
"Optimize content to improve rankings."
)
# High impressions, low clicks
high_impressions = [
p for p in self.performance_data
if p.get('impressions', 0) > 1000 and p.get('ctr', 0) < 0.01
]
if high_impressions:
recommendations.append(
f"⚠️ {len(high_impressions)} pages have high impressions but very low CTR. "
"Review title tags for better click appeal."
)
return recommendations
def save_to_csv(self, output_file: Optional[str] = None) -> str:
"""
Save performance data to CSV.
Args:
output_file: Custom output file path
Returns:
Path to saved file
"""
if not output_file:
output_dir = Path(__file__).parent.parent.parent / 'output'
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = output_dir / f'performance_data_{timestamp}.csv'
output_file = Path(output_file)
output_file.parent.mkdir(parents=True, exist_ok=True)
fieldnames = [
'page', 'page_title', 'pageviews', 'sessions', 'bounce_rate',
'avg_session_duration', 'engagement_rate', 'clicks', 'impressions',
'ctr', 'position'
]
logger.info(f"Saving {len(self.performance_data)} rows to {output_file}...")
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(self.performance_data)
logger.info(f"✓ Saved to: {output_file}")
return str(output_file)
def run(self, start_date: Optional[str] = None,
end_date: Optional[str] = None,
output_file: Optional[str] = None) -> Tuple[str, Dict]:
"""
Run complete performance analysis.
Args:
start_date: Start date (YYYY-MM-DD), default 30 days ago
end_date: End date (YYYY-MM-DD), default yesterday
output_file: Custom output file path
Returns:
Tuple of (output_file_path, analysis_dict)
"""
# Default date range (last 30 days)
if not end_date:
end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
if not start_date:
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
logger.info("\n" + "="*70)
logger.info("SEO PERFORMANCE ANALYSIS")
logger.info("="*70)
logger.info(f"Date range: {start_date} to {end_date}")
logger.info("="*70)
# Fetch data
self.fetch_combined_data(start_date, end_date)
if not self.performance_data:
logger.warning("No performance data available")
return "", {}
# Analyze
analysis = self.analyze_performance()
# Save
output_path = self.save_to_csv(output_file)
return output_path, analysis
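The GA4/GSC merge performed by `fetch_combined_data()` boils down to a dict keyed by page path: GA4 rows seed it, then GSC rows update matching pages or add new ones. A self-contained sketch with invented rows:

```python
# Standalone sketch of the merge in fetch_combined_data(); rows are invented
# and trimmed to a few fields for clarity.
ga4_rows = [{'pagePath': '/blog/a', 'screenPageViews': 120}]
gsc_rows = [
    {'page': '/blog/a', 'clicks': 30, 'impressions': 900},
    {'page': '/blog/b', 'clicks': 5, 'impressions': 200},
]

# Seed from GA4, keyed by page path, with search metrics zeroed
combined = {
    r['pagePath']: {'page': r['pagePath'], 'pageviews': r['screenPageViews'],
                    'clicks': 0, 'impressions': 0}
    for r in ga4_rows
}

# Overlay GSC: update an existing record or create a fresh one
for r in gsc_rows:
    record = combined.setdefault(
        r['page'], {'page': r['page'], 'pageviews': 0, 'clicks': 0, 'impressions': 0}
    )
    record['clicks'] = r['clicks']
    record['impressions'] = r['impressions']

print(sorted(combined))     # ['/blog/a', '/blog/b']
print(combined['/blog/a'])  # {'page': '/blog/a', 'pageviews': 120, 'clicks': 30, 'impressions': 900}
```

One caveat the code above shares with the tracker: GA4 reports relative paths (`/blog/a`) while GSC reports full URLs, so in practice the two key spaces must be normalized to the same form before the merge lines up.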