Compare commits: 2 commits (master...working-co)
SHA1s: 69e4287366, 6ef268ba80

PERFORMANCE_TRACKING_GUIDE.md (new file, 355 lines)
@@ -0,0 +1,355 @@

# SEO Performance Tracking Guide

Track and analyze your website's SEO performance using Google Analytics 4 and Google Search Console data.

## Overview

The SEO performance tracking features allow you to:

- **Analyze page performance** - Track pageviews, clicks, impressions, CTR, and rankings
- **Find keyword opportunities** - Discover keywords you can rank higher for
- **Generate SEO reports** - Create comprehensive performance reports
- **Import data** - Support for both CSV imports and API integration

## Commands

### 1. `seo performance` - Analyze Page Performance

Analyze traffic and search performance data.

**Usage:**

```bash
# Analyze with CSV exports
./seo performance --ga4 analytics.csv --gsc search.csv

# Analyze GA4 data only
./seo performance --ga4 analytics.csv

# Analyze GSC data only
./seo performance --gsc search.csv

# With custom output
./seo performance --ga4 analytics.csv --gsc search.csv --output custom_analysis.csv

# Preview
./seo performance --ga4 analytics.csv --dry-run
```

**Data Sources:**

- **Google Analytics 4**: Export from GA4 → Reports → Engagement → Pages and screens
- **Google Search Console**: Export from GSC → Performance → Search results → Export
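
Joining the two exports is the core of the analysis: GA4 rows are keyed by page path, GSC rows by full URL. A minimal sketch of that join using only the standard library, with toy data and assumed column names (and `example.com` as a stand-in for your site):

```python
import csv
import io

# Toy exports; column names follow the GA4/GSC exports described above.
ga4_csv = "Page path,Views,Sessions\n/a,120,100\n/b,40,35\n"
gsc_csv = "Page,Clicks,Impressions\nhttps://example.com/a,50,1000\n"

ga4 = {r["Page path"]: r for r in csv.DictReader(io.StringIO(ga4_csv))}
# Strip the site prefix so GSC URLs line up with GA4 page paths.
gsc = {r["Page"].removeprefix("https://example.com"): r
       for r in csv.DictReader(io.StringIO(gsc_csv))}

for path, row in ga4.items():
    clicks = gsc.get(path, {}).get("Clicks", "-")
    print(path, row["Views"], clicks)
```

Pages missing from one source simply get a placeholder, which is why the command accepts `--ga4` and `--gsc` independently.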

**Metrics Analyzed:**

| Metric | Source | Description |
|--------|--------|-------------|
| Pageviews | GA4 | Number of page views |
| Sessions | GA4 | Number of sessions |
| Bounce Rate | GA4 | Percentage of single-page sessions |
| Engagement Rate | GA4 | Percentage of engaged sessions |
| Clicks | GSC | Number of search clicks |
| Impressions | GSC | Number of search impressions |
| CTR | GSC | Click-through rate |
| Position | GSC | Average search ranking |
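
The site-wide aggregates can be derived from the raw export rows; a sketch under the assumed GSC column names (note that aggregate CTR is total clicks over total impressions, not the mean of per-page CTRs):

```python
import csv
import io

sample = """Page,Clicks,Impressions,CTR,Position
https://example.com/a,50,1000,5%,8.2
https://example.com/b,10,2000,0.5%,15.4
"""

rows = list(csv.DictReader(io.StringIO(sample)))
total_clicks = sum(int(r["Clicks"]) for r in rows)
total_impressions = sum(int(r["Impressions"]) for r in rows)
average_ctr = total_clicks / total_impressions  # 60 / 3000
average_position = sum(float(r["Position"]) for r in rows) / len(rows)
print(f"CTR: {average_ctr:.2%}, position: {average_position:.1f}")
# → CTR: 2.00%, position: 11.8
```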

### 2. `seo keywords` - Keyword Opportunities

Find keywords you can optimize for better rankings.

**Usage:**

```bash
# Analyze keyword opportunities
./seo keywords gsc_export.csv

# Limit results
./seo keywords gsc_export.csv --limit 20

# Custom output
./seo keywords gsc_export.csv --output keywords.csv
```

**What It Finds:**

- Keywords ranking positions 5-20 (easy to improve)
- High impression keywords with low CTR
- Keywords with good traffic potential

**Example Output:**

```
✅ Found 47 keyword opportunities!

Top opportunities:
  1. best vpn 2024 - Position: 8.5, Impressions: 1250
  2. torrent client - Position: 12.3, Impressions: 890
  3. vpn for gaming - Position: 9.1, Impressions: 650
```
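
Internally, the selection above is a filter plus sort. A sketch of the position-window logic, with toy data and assumed GSC column names:

```python
import csv
import io

sample = """Query,Clicks,Impressions,CTR,Position
best vpn 2024,45,1250,3.6%,8.5
torrent client,12,890,1.3%,12.3
brand name,300,5000,6%,1.2
obscure term,1,40,2.5%,35.0
"""

rows = csv.DictReader(io.StringIO(sample))
# Keep queries in the 5-20 window, biggest impression counts first.
opportunities = sorted(
    (r for r in rows if 5 <= float(r["Position"]) <= 20),
    key=lambda r: int(r["Impressions"]),
    reverse=True,
)
for i, kw in enumerate(opportunities, 1):
    print(f"{i}. {kw['Query']} - Position: {kw['Position']}, Impressions: {kw['Impressions']}")
```

Position-1 brand queries and very deep rankings are excluded: the former have little upside, the latter need more than on-page tweaks.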

### 3. `seo report` - Generate SEO Report

Create comprehensive SEO performance reports.

**Usage:**

```bash
# Generate report
./seo report

# Custom output
./seo report --output monthly_seo_report.md
```

**Report Includes:**

- Performance summary
- Traffic analysis
- Keyword opportunities
- SEO recommendations
- Action items

## Data Export Guides

### Export from Google Analytics 4

1. Go to **Google Analytics** → Your Property
2. Navigate to **Reports** → **Engagement** → **Pages and screens**
3. Set date range (e.g., last 30 days)
4. Click **Share** → **Download file** → **CSV**
5. Save as `ga4_export.csv`

**Required Columns:**

- Page path
- Page title
- Views (pageviews)
- Sessions
- Bounce rate
- Engagement rate

### Export from Google Search Console

1. Go to **Google Search Console** → Your Property
2. Click **Performance** → **Search results**
3. Set date range (e.g., last 30 days)
4. Check all metrics: Clicks, Impressions, CTR, Position
5. Click **Export** → **CSV**
6. Save as `gsc_export.csv`

**Required Columns:**

- Page (URL)
- Clicks
- Impressions
- CTR
- Position

## API Integration (Advanced)

For automated data fetching, configure API credentials:

### 1. Google Analytics 4 API

**Setup:**

1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project or select an existing one
3. Enable the **Google Analytics Data API**
4. Create service account credentials
5. Download the JSON key file
6. Share the GA4 property with the service account email

**Configuration:**

Add to `.env`:

```
GA4_CREDENTIALS=/path/to/ga4-credentials.json
GA4_PROPERTY_ID=properties/123456789
```
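
The tool is assumed to fall back to CSV mode when these variables are absent; a sketch of that availability check (variable names match the `.env` keys above, the message is illustrative):

```python
import os

def api_mode_available() -> bool:
    # API mode needs both the key file path and the property ID.
    creds = os.environ.get("GA4_CREDENTIALS", "")
    prop = os.environ.get("GA4_PROPERTY_ID", "")
    return bool(prop) and os.path.isfile(creds)

if not api_mode_available():
    print("GA4 API mode unavailable - provide CSV exports instead")
```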

### 2. Google Search Console API

**Setup:**

1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Enable the **Search Console API**
3. Create service account credentials
4. Download the JSON key file
5. Share the GSC property with the service account email

**Configuration:**

Add to `.env`:

```
GSC_CREDENTIALS=/path/to/gsc-credentials.json
GSC_SITE_URL=https://www.mistergeek.net
```

### Using API Mode

Once configured, you can run without CSV files:

```bash
# Fetch data directly from APIs
./seo performance --start-date 2024-01-01 --end-date 2024-01-31
```

## Performance Insights

### Low CTR Pages

Pages with high impressions but low CTR need better titles/descriptions:

```bash
# Find pages with <2% CTR and 100+ impressions
./seo performance --gsc search.csv
# Check "low_ctr" section in output
```

**Action:** Optimize meta titles and descriptions

### Low Position Pages

Pages ranking beyond position 20 need content optimization:

```bash
# Find pages ranking >20 with 50+ impressions
./seo performance --gsc search.csv
# Check "low_position" section in output
```

**Action:** Improve content quality, add internal links

### Keyword Opportunities

Keywords ranking in positions 5-20 are easy to improve:

```bash
./seo keywords gsc_export.csv --limit 50
```

**Action:** Optimize content for these specific keywords

## Workflow Examples

### Weekly Performance Check

```bash
# 1. Export fresh data from GA4 and GSC
# 2. Analyze performance
./seo performance --ga4 weekly_ga4.csv --gsc weekly_gsc.csv

# 3. Review keyword opportunities
./seo keywords weekly_gsc.csv --limit 20

# 4. Generate report
./seo report --output weekly_report.md
```

### Monthly SEO Audit

```bash
# 1. Export full month data
# 2. Comprehensive analysis
./seo performance --ga4 month_ga4.csv --gsc month_gsc.csv

# 3. Identify top issues
# Review output for:
# - Low CTR pages
# - Low position pages
# - High impression, low click pages

# 4. Generate action plan
./seo report --output monthly_audit.md
```

### Content Optimization Sprint

```bash
# 1. Find keyword opportunities
./seo keywords gsc.csv --limit 50 > opportunities.txt

# 2. For each opportunity:
# - Review current content
# - Optimize for target keyword
# - Update meta description

# 3. Track improvements
# Re-run analysis after 2 weeks
./seo performance --gsc new_gsc.csv
```

## Output Files

All analysis results are saved to `output/`:

| File | Description |
|------|-------------|
| `performance_data_*.csv` | Raw performance metrics |
| `performance_analysis_*.csv` | Analysis with insights |
| `seo_report_*.md` | Markdown report |

## Troubleshooting

### No Data Loaded

**Problem:** "No data loaded. Provide GA4 and/or GSC export files."

**Solution:**

- Ensure CSV files are properly exported
- Check that file paths are correct
- Verify the CSV has the required columns

### Column Name Errors

**Problem:** "KeyError: 'pageviews'"

**Solution:**

- Ensure the GA4 export includes a pageviews column
- Column names are normalized automatically
- Check CSV encoding (UTF-8)
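
GA4 and GSC label columns differently ("Views", "Page path", "CTR"), so normalization usually means lowercasing and underscoring. One plausible sketch of that rule (the exact behavior of the analyzer may differ):

```python
import re

def normalize_column(name: str) -> str:
    # "Page path" -> "page_path", " Bounce rate " -> "bounce_rate"
    return re.sub(r"[^a-z0-9]+", "_", name.strip().lower()).strip("_")

print([normalize_column(c) for c in ["Page path", "Views", "Bounce rate", "CTR"]])
# → ['page_path', 'views', 'bounce_rate', 'ctr']
```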

### API Authentication Errors

**Problem:** "Failed to initialize GA4 client"

**Solution:**

- Verify the service account JSON is valid
- Check that the API is enabled in Google Cloud
- Ensure the service account has access to the property

## Best Practices

### Data Collection

1. **Export regularly** - Weekly or monthly exports
2. **Consistent date ranges** - Use the same range for comparisons
3. **Keep historical data** - Archive old exports for trend analysis

### Analysis

1. **Focus on trends** - Look at changes over time
2. **Prioritize impact** - Fix high-traffic pages first
3. **Track improvements** - Re-analyze after optimizations

### Reporting

1. **Regular reports** - Weekly/monthly cadence
2. **Share insights** - Distribute to team/stakeholders
3. **Action-oriented** - Include specific recommendations

## Related Commands

- `seo export` - Export posts from WordPress
- `seo meta_description` - Generate meta descriptions
- `seo update_meta` - Update meta on WordPress

## See Also

- [README.md](README.md) - Main documentation
- [META_DESCRIPTION_GUIDE.md](META_DESCRIPTION_GUIDE.md) - Meta description guide
- [ANALYTICS_SETUP.md](ANALYTICS_SETUP.md) - API setup guide (if it exists)

---

**Made with ❤️ for better SEO automation**

src/seo/app.py (140 changed lines)
@@ -15,6 +15,9 @@ from .editorial_strategy import EditorialStrategyAnalyzer
from .post_migrator import WordPressPostMigrator
from .meta_description_generator import MetaDescriptionGenerator
from .meta_description_updater import MetaDescriptionUpdater
from .performance_tracker import SEOPerformanceTracker
from .performance_analyzer import PerformanceAnalyzer
from .media_importer import WordPressMediaImporter

logger = logging.getLogger(__name__)

@@ -366,9 +369,142 @@ class SEOApp:
    def _find_latest_export(self) -> Optional[str]:
        """Find the latest exported CSV file."""
        csv_files = list(self.output_dir.glob('all_posts_*.csv'))

        if not csv_files:
            return None

        latest = max(csv_files, key=lambda f: f.stat().st_ctime)
        return str(latest)

    def performance(self, ga4_file: Optional[str] = None,
                    gsc_file: Optional[str] = None,
                    start_date: Optional[str] = None,
                    end_date: Optional[str] = None,
                    output_file: Optional[str] = None) -> Tuple[str, Dict]:
        """
        Analyze page performance from GA4 and GSC data.

        Args:
            ga4_file: Path to GA4 export CSV (or use API if credentials configured)
            gsc_file: Path to GSC export CSV (or use API if credentials configured)
            start_date: Start date YYYY-MM-DD (for API mode)
            end_date: End date YYYY-MM-DD (for API mode)
            output_file: Custom output file path

        Returns:
            Tuple of (output_file_path, analysis_dict)
        """
        logger.info("📊 Analyzing page performance...")

        # If CSV files provided, use the analyzer
        if ga4_file or gsc_file:
            analyzer = PerformanceAnalyzer()
            return analyzer.run(ga4_file=ga4_file, gsc_file=gsc_file, output_file=output_file)

        # Otherwise try API mode
        tracker = SEOPerformanceTracker()
        if tracker.ga4_client or tracker.gsc_service:
            return tracker.run(start_date=start_date, end_date=end_date, output_file=output_file)
        else:
            logger.error("No data source available. Provide CSV exports or configure API credentials.")
            return "", {}

    def keywords(self, gsc_file: str, limit: int = 50) -> List[Dict]:
        """
        Analyze keyword opportunities from GSC data.

        Args:
            gsc_file: Path to GSC export CSV
            limit: Maximum keywords to return

        Returns:
            List of keyword opportunity dicts
        """
        logger.info("🔍 Analyzing keyword opportunities...")

        analyzer = PerformanceAnalyzer()
        analyzer.load_gsc_export(gsc_file)
        analysis = analyzer.analyze()

        opportunities = analysis.get('keyword_opportunities', [])[:limit]

        logger.info(f"Found {len(opportunities)} keyword opportunities")

        return opportunities

    def seo_report(self, output_file: Optional[str] = None) -> str:
        """
        Generate comprehensive SEO performance report.

        Args:
            output_file: Custom output file path

        Returns:
            Path to report file
        """
        logger.info("📄 Generating SEO report...")

        if not output_file:
            output_dir = Path(__file__).parent.parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'seo_report_{timestamp}.md'

        output_file = Path(output_file)

        # Generate report content
        report = self._generate_report_content()

        # Write report
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(report)

        logger.info(f"✓ Report saved to: {output_file}")
        return str(output_file)

    def _generate_report_content(self) -> str:
        """Generate markdown report content."""
        report = []
        report.append("# SEO Performance Report\n")
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        report.append("---\n")

        # Summary section
        report.append("## 📊 Summary\n")
        report.append("This report provides insights into your website's SEO performance.\n")

        # Add analysis sections
        report.append("## 📈 Traffic Analysis\n")
        report.append("*Import GA4/GSC data for detailed traffic analysis*\n")

        report.append("## 🔍 Keyword Opportunities\n")
        report.append("*Import GSC data for keyword analysis*\n")

        report.append("## 📝 SEO Recommendations\n")
        report.append("1. Review and optimize meta descriptions\n")
        report.append("2. Improve content for low-ranking pages\n")
        report.append("3. Build internal links to important pages\n")
        report.append("4. Monitor keyword rankings regularly\n")

        return "\n".join(report)

    def import_media(self, migration_report: str,
                     source_site: str = 'mistergeek.net',
                     destination_site: str = 'hellogeek.net',
                     dry_run: bool = True) -> Dict:
        """
        Import media from source to destination site for migrated posts.

        Args:
            migration_report: Path to migration report CSV
            source_site: Source site name
            destination_site: Destination site name
            dry_run: If True, preview without importing

        Returns:
            Statistics dict
        """
        logger.info(f"📸 Importing media from {source_site} to {destination_site}...")

        importer = WordPressMediaImporter(source_site, destination_site)
        return importer.run_from_migration_report(migration_report, dry_run=dry_run)

src/seo/cli.py (146 changed lines)
@@ -79,6 +79,16 @@ Examples:
    parser.add_argument('--category', nargs='+', help='Filter by category name(s)')
    parser.add_argument('--category-id', type=int, nargs='+', help='Filter by category ID(s)')
    parser.add_argument('--force', action='store_true', help='Force regenerate even for good quality meta descriptions')

    # Performance arguments
    parser.add_argument('--ga4', help='Path to Google Analytics 4 export CSV')
    parser.add_argument('--gsc', help='Path to Google Search Console export CSV')
    parser.add_argument('--start-date', help='Start date YYYY-MM-DD (for API mode)')
    parser.add_argument('--end-date', help='End date YYYY-MM-DD (for API mode)')

    # Media import arguments
    parser.add_argument('--from-site', help='Source site for media import (default: mistergeek.net)')
    parser.add_argument('--to-site', help='Destination site for media import (default: hellogeek.net)')

    args = parser.parse_args()

@@ -107,6 +117,10 @@ Examples:
        'migrate': cmd_migrate,
        'meta_description': cmd_meta_description,
        'update_meta': cmd_update_meta,
        'performance': cmd_performance,
        'keywords': cmd_keywords,
        'report': cmd_report,
        'import_media': cmd_import_media,
        'status': cmd_status,
        'help': cmd_help,
    }

@@ -513,6 +527,123 @@ def cmd_status(app, args):
    return 0


def cmd_performance(app, args):
    """Analyze page performance from GA4 and GSC data."""
    if args.dry_run:
        print("Would analyze page performance")
        if args.ga4:
            print(f"  GA4 file: {args.ga4}")
        if args.gsc:
            print(f"  GSC file: {args.gsc}")
        return 0

    print("Analyzing page performance...")

    output_file, analysis = app.performance(
        ga4_file=args.ga4,
        gsc_file=args.gsc,
        start_date=args.start_date,
        end_date=args.end_date,
        output_file=args.output
    )

    if output_file and analysis:
        print("\n✅ Performance analysis completed!")
        print(f"   Results: {output_file}")
        print("\n📊 Summary:")
        summary = analysis.get('summary', {})
        print(f"   Total pages: {summary.get('total_pages', 0)}")
        print(f"   Total pageviews: {summary.get('total_pageviews', 0)}")
        print(f"   Total clicks: {summary.get('total_clicks', 0)}")
        print(f"   Average CTR: {summary.get('average_ctr', 0):.2%}")
        print(f"   Average position: {summary.get('average_position', 0):.1f}")
    return 0


def cmd_keywords(app, args):
    """Analyze keyword opportunities from GSC data."""
    if args.dry_run:
        print("Would analyze keyword opportunities")
        if args.args:
            print(f"  GSC file: {args.args[0]}")
        return 0

    gsc_file = args.args[0] if args.args else None

    if not gsc_file:
        print("❌ GSC export file required")
        print("   Usage: seo keywords <gsc_export.csv>")
        return 1

    print(f"Analyzing keyword opportunities from {gsc_file}...")

    opportunities = app.keywords(gsc_file=gsc_file, limit=args.limit or 50)

    if opportunities:
        print(f"\n✅ Found {len(opportunities)} keyword opportunities!")
        print("\nTop opportunities:")
        for i, kw in enumerate(opportunities[:10], 1):
            print(f"  {i}. {kw['query']} - Position: {kw['position']:.1f}, Impressions: {kw['impressions']}")
    return 0


def cmd_report(app, args):
    """Generate comprehensive SEO performance report."""
    if args.dry_run:
        print("Would generate SEO performance report")
        return 0

    print("Generating SEO performance report...")

    report_file = app.seo_report(output_file=args.output)

    if report_file:
        print("\n✅ Report generated!")
        print(f"   Report: {report_file}")
    return 0


def cmd_import_media(app, args):
    """Import media from source to destination site for migrated posts."""
    if args.dry_run:
        print("Would import media")
        print(f"  Source: {args.from_site or 'mistergeek.net'}")
        print(f"  Destination: {args.to_site or 'hellogeek.net'}")
        if args.args:
            print(f"  Migration report: {args.args[0]}")
        return 0

    migration_report = args.args[0] if args.args else None

    if not migration_report:
        print("❌ Migration report CSV required")
        print("   Usage: seo import_media <migration_report.csv>")
        return 1

    source_site = args.from_site or 'mistergeek.net'
    dest_site = args.to_site or 'hellogeek.net'

    print(f"Importing media from {source_site} to {dest_site}...")
    print(f"Migration report: {migration_report}")

    stats = app.import_media(
        migration_report=migration_report,
        source_site=source_site,
        destination_site=dest_site,
        dry_run=False
    )

    if stats:
        print("\n✅ Media import completed!")
        print("\n📊 Summary:")
        print(f"   Total posts: {stats.get('total_posts', 0)}")
        print(f"   Posts with media: {stats.get('posts_with_media', 0)}")
        print(f"   Images uploaded: {stats.get('images_uploaded', 0)}")
        print(f"   Featured images set: {stats.get('featured_images_set', 0)}")
        print(f"   Errors: {stats.get('errors', 0)}")
    return 0


def cmd_help(app, args):
    """Show help."""
    print("""
@@ -549,6 +680,11 @@ Strategy & Migration:

Utility:
  status                         Show output files status
  performance [ga4.csv] [gsc.csv]                   Analyze page performance
  performance --ga4 analytics.csv --gsc search.csv  Analyze with both sources
  keywords <gsc.csv>             Show keyword opportunities
  report                         Generate SEO performance report
  import_media <report.csv>      Import media for migrated posts
  help                           Show this help message

Export Options:
@@ -570,6 +706,13 @@ Update Meta Options:
  --author            Filter by author name(s)
  --force             Force regenerate even for good quality meta descriptions

Performance Options:
  --ga4               Path to Google Analytics 4 export CSV
  --gsc               Path to Google Search Console export CSV
  --start-date        Start date YYYY-MM-DD (for API mode)
  --end-date          End date YYYY-MM-DD (for API mode)
  --limit             Limit number of results

Migration Options:
  --destination, --to   Destination site: mistergeek.net, webscroll.fr, hellogeek.net
  --source, --from      Source site for filtered migration
@@ -617,6 +760,9 @@ Examples:
  seo update_meta --site A --category "VPN" --limit 10   # Update 10 posts in category
  seo update_meta --site A --author "john" --limit 10    # Update 10 posts by author
  seo update_meta --site A --dry-run                     # Preview changes
  seo performance --ga4 analytics.csv --gsc search.csv   # Analyze performance
  seo keywords gsc_export.csv                            # Show keyword opportunities
  seo report                                             # Generate SEO report
  seo status
""")
    return 0

src/seo/media_importer.py (new file, 467 lines)
@@ -0,0 +1,467 @@
"""
Media Importer - Import media from one WordPress site to another
Specifically designed for migrated posts
"""

import logging
import os
import tempfile
import requests
from requests.auth import HTTPBasicAuth
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import csv

from .config import Config

logger = logging.getLogger(__name__)


class WordPressMediaImporter:
    """Import media from source WordPress site to destination site."""

    def __init__(self, source_site: str = 'mistergeek.net',
                 destination_site: str = 'hellogeek.net'):
        """
        Initialize media importer.

        Args:
            source_site: Source site name
            destination_site: Destination site name
        """
        self.source_site = source_site
        self.destination_site = destination_site
        self.sites = Config.WORDPRESS_SITES

        # Validate sites
        if source_site not in self.sites:
            raise ValueError(f"Source site '{source_site}' not found")
        if destination_site not in self.sites:
            raise ValueError(f"Destination site '{destination_site}' not found")

        # Setup source
        self.source_config = self.sites[source_site]
        self.source_url = self.source_config['url'].rstrip('/')
        self.source_auth = HTTPBasicAuth(
            self.source_config['username'],
            self.source_config['password']
        )

        # Setup destination
        self.dest_config = self.sites[destination_site]
        self.dest_url = self.dest_config['url'].rstrip('/')
        self.dest_auth = HTTPBasicAuth(
            self.dest_config['username'],
            self.dest_config['password']
        )

        self.media_cache = {}  # Cache source media ID -> dest media ID
        self.stats = {
            'total_posts': 0,
            'posts_with_media': 0,
            'images_downloaded': 0,
            'images_uploaded': 0,
            'featured_images_set': 0,
            'errors': 0
        }

    def fetch_migrated_posts(self, post_ids: Optional[List[int]] = None) -> List[Dict]:
        """
        Fetch posts that need media imported.

        Args:
            post_ids: Specific post IDs to process

        Returns:
            List of post dicts
        """
        logger.info(f"Fetching posts from {self.destination_site}...")

        if post_ids:
            # Fetch specific posts
            posts = []
            for post_id in post_ids:
                try:
                    response = requests.get(
                        f"{self.dest_url}/wp-json/wp/v2/posts/{post_id}",
                        auth=self.dest_auth,
                        timeout=10
                    )
                    if response.status_code == 200:
                        posts.append(response.json())
                except Exception as e:
                    logger.error(f"Error fetching post {post_id}: {e}")
            return posts
        else:
            # Fetch recent posts (assuming migrated posts are recent)
            try:
                response = requests.get(
                    f"{self.dest_url}/wp-json/wp/v2/posts",
                    params={
                        'per_page': 100,
                        'status': 'publish,draft',
                        '_embed': True
                    },
                    auth=self.dest_auth,
                    timeout=30
                )
                response.raise_for_status()
                return response.json()
            except Exception as e:
                logger.error(f"Error fetching posts: {e}")
                return []

    def get_source_post(self, post_id: int) -> Optional[Dict]:
        """
        Fetch the corresponding post from the source site.

        Args:
            post_id: Post ID on source site

        Returns:
            Post dict or None
        """
        try:
            response = requests.get(
                f"{self.source_url}/wp-json/wp/v2/posts/{post_id}",
                auth=self.source_auth,
                timeout=10,
                params={'_embed': True}
            )

            if response.status_code == 200:
                return response.json()
            else:
                logger.warning(f"Source post {post_id} not found")
                return None

        except Exception as e:
            logger.error(f"Error fetching source post {post_id}: {e}")
            return None

    def download_media(self, media_url: str) -> Optional[bytes]:
        """
        Download media file from source site.

        Args:
            media_url: URL of media file

        Returns:
            File content bytes or None
        """
        try:
            response = requests.get(media_url, timeout=30)
            response.raise_for_status()
            return response.content
        except Exception as e:
            logger.error(f"Error downloading {media_url}: {e}")
            return None

    def upload_media(self, file_content: bytes, filename: str,
                     mime_type: str = 'image/jpeg',
                     alt_text: str = '',
                     caption: str = '') -> Optional[int]:
        """
        Upload media to destination site.

        Args:
            file_content: File content bytes
            filename: Filename for the media
            mime_type: MIME type of the file
            alt_text: Alt text for the image
            caption: Caption for the image

        Returns:
            Media ID on destination site or None
        """
        try:
            # Upload file
            files = {'file': (filename, file_content, mime_type)}

            response = requests.post(
                f"{self.dest_url}/wp-json/wp/v2/media",
                files=files,
                auth=self.dest_auth,
                headers={
                    'Content-Disposition': f'attachment; filename="{filename}"',
                    'Content-Type': mime_type
                },
                timeout=30
            )

            if response.status_code == 201:
                media_data = response.json()
                media_id = media_data['id']

                # Update alt text and caption
                if alt_text or caption:
                    meta_update = {}
                    if alt_text:
                        meta_update['_wp_attachment_image_alt'] = alt_text
                    if caption:
                        meta_update['excerpt'] = caption

                    requests.post(
                        f"{self.dest_url}/wp-json/wp/v2/media/{media_id}",
                        json=meta_update,
                        auth=self.dest_auth,
                        timeout=10
                    )

                logger.info(f"✓ Uploaded {filename} (ID: {media_id})")
                return media_id
            else:
                logger.error(f"Error uploading {filename}: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Error uploading {filename}: {e}")
            return None

    def import_featured_image(self, source_post: Dict, dest_post_id: int) -> bool:
        """
        Import featured image from source post to destination post.

        Args:
            source_post: Source post dict
            dest_post_id: Destination post ID

        Returns:
            True if successful
        """
        # Check if source has featured image
        featured_media_id = source_post.get('featured_media')
        if not featured_media_id:
            logger.info("  No featured image on source post")
            return False

        # Check if already imported
        if featured_media_id in self.media_cache:
            dest_media_id = self.media_cache[featured_media_id]
            logger.info(f"  Using cached media ID: {dest_media_id}")
        else:
            # Fetch media details from source
            try:
                media_response = requests.get(
                    f"{self.source_url}/wp-json/wp/v2/media/{featured_media_id}",
                    auth=self.source_auth,
                    timeout=10
                )

                if media_response.status_code != 200:
                    logger.error(f"Could not fetch media {featured_media_id}")
                    return False

                media_data = media_response.json()

                # Download media file
                media_url = media_data.get('source_url', '')
                if not media_url:
                    # Try alternative URL structure
                    media_url = media_data.get('guid', {}).get('rendered', '')

                file_content = self.download_media(media_url)
                if not file_content:
                    return False

                # Extract filename and mime type
                filename = media_data.get('slug', 'image.jpg') + '.jpg'
                mime_type = media_data.get('mime_type', 'image/jpeg')
                alt_text = media_data.get('alt_text', '')
                caption = media_data.get('caption', {}).get('rendered', '')

                # Upload to destination
                dest_media_id = self.upload_media(
                    file_content, filename, mime_type, alt_text, caption
                )

                if not dest_media_id:
                    return False

                # Cache the mapping
                self.media_cache[featured_media_id] = dest_media_id
                self.stats['images_uploaded'] += 1

            except Exception as e:
                logger.error(f"Error importing featured image: {e}")
                return False

        # Set featured image on destination post
        try:
            response = requests.post(
                f"{self.dest_url}/wp-json/wp/v2/posts/{dest_post_id}",
                json={'featured_media': dest_media_id},
                auth=self.dest_auth,
                timeout=10
            )

            if response.status_code == 200:
                logger.info(f"✓ Set featured image on post {dest_post_id}")
                self.stats['featured_images_set'] += 1
                return True
            else:
                logger.error(f"Error setting featured image: {response.status_code}")
                return False

        except Exception as e:
            logger.error(f"Error setting featured image: {e}")
            return False
|
||||
|
||||
def import_post_media(self, source_post: Dict, dest_post_id: int) -> int:
|
||||
"""
|
||||
Import all media from a post (featured image + inline images).
|
||||
|
||||
Args:
|
||||
source_post: Source post dict
|
||||
dest_post_id: Destination post ID
|
||||
|
||||
Returns:
|
||||
Number of images imported
|
||||
"""
|
||||
images_imported = 0
|
||||
|
||||
# Import featured image
|
||||
if self.import_featured_image(source_post, dest_post_id):
|
||||
images_imported += 1
|
||||
|
||||
# TODO: Import inline images from content
|
||||
# This would require parsing the content for <img> tags
|
||||
# and replacing source URLs with destination URLs
|
||||
|
||||
return images_imported
|
||||
|
||||
def process_posts(self, post_mappings: List[Tuple[int, int]],
|
||||
dry_run: bool = False) -> Dict:
|
||||
"""
|
||||
Process media import for mapped posts.
|
||||
|
||||
Args:
|
||||
post_mappings: List of (source_post_id, dest_post_id) tuples
|
||||
dry_run: If True, preview without importing
|
||||
|
||||
Returns:
|
||||
Statistics dict
|
||||
"""
|
||||
logger.info("\n" + "="*70)
|
||||
logger.info("MEDIA IMPORTER")
|
||||
logger.info("="*70)
|
||||
logger.info(f"Source: {self.source_site}")
|
||||
logger.info(f"Destination: {self.destination_site}")
|
||||
logger.info(f"Posts to process: {len(post_mappings)}")
|
||||
logger.info(f"Dry run: {dry_run}")
|
||||
logger.info("="*70)
|
||||
|
||||
self.stats['total_posts'] = len(post_mappings)
|
||||
|
||||
for i, (source_id, dest_id) in enumerate(post_mappings, 1):
|
||||
logger.info(f"\n[{i}/{len(post_mappings)}] Processing post mapping:")
|
||||
logger.info(f" Source: {source_id} → Destination: {dest_id}")
|
||||
|
||||
# Fetch source post
|
||||
source_post = self.get_source_post(source_id)
|
||||
if not source_post:
|
||||
logger.warning(f" Skipping: Source post not found")
|
||||
self.stats['errors'] += 1
|
||||
continue
|
||||
|
||||
# Check if source has media
|
||||
if not source_post.get('featured_media'):
|
||||
logger.info(f" No featured image to import")
|
||||
continue
|
||||
|
||||
self.stats['posts_with_media'] += 1
|
||||
|
||||
if dry_run:
|
||||
logger.info(f" [DRY RUN] Would import featured image")
|
||||
self.stats['images_downloaded'] += 1
|
||||
self.stats['images_uploaded'] += 1
|
||||
self.stats['featured_images_set'] += 1
|
||||
else:
|
||||
# Import media
|
||||
imported = self.import_post_media(source_post, dest_id)
|
||||
if imported > 0:
|
||||
self.stats['images_downloaded'] += imported
|
||||
|
||||
# Print summary
|
||||
logger.info("\n" + "="*70)
|
||||
logger.info("IMPORT SUMMARY")
|
||||
logger.info("="*70)
|
||||
logger.info(f"Total posts: {self.stats['total_posts']}")
|
||||
logger.info(f"Posts with media: {self.stats['posts_with_media']}")
|
||||
logger.info(f"Images downloaded: {self.stats['images_downloaded']}")
|
||||
logger.info(f"Images uploaded: {self.stats['images_uploaded']}")
|
||||
logger.info(f"Featured images set: {self.stats['featured_images_set']}")
|
||||
logger.info(f"Errors: {self.stats['errors']}")
|
||||
logger.info("="*70)
|
||||
|
||||
return self.stats
|
||||
|
||||
def run_from_csv(self, csv_file: str, dry_run: bool = False) -> Dict:
|
||||
"""
|
||||
Import media for posts listed in CSV file.
|
||||
|
||||
CSV should have columns: source_post_id, destination_post_id
|
||||
|
||||
Args:
|
||||
csv_file: Path to CSV file with post mappings
|
||||
dry_run: If True, preview without importing
|
||||
|
||||
Returns:
|
||||
Statistics dict
|
||||
"""
|
||||
logger.info(f"Loading post mappings from: {csv_file}")
|
||||
|
||||
try:
|
||||
with open(csv_file, 'r', encoding='utf-8') as f:
|
||||
reader = csv.DictReader(f)
|
||||
mappings = []
|
||||
|
||||
for row in reader:
|
||||
source_id = int(row.get('source_post_id', 0))
|
||||
dest_id = int(row.get('destination_post_id', 0))
|
||||
|
||||
if source_id and dest_id:
|
||||
mappings.append((source_id, dest_id))
|
||||
|
||||
logger.info(f"✓ Loaded {len(mappings)} post mappings")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading CSV: {e}")
|
||||
return self.stats
|
||||
|
||||
return self.process_posts(mappings, dry_run=dry_run)
|
||||
|
||||
def run_from_migration_report(self, report_file: str,
|
||||
dry_run: bool = False) -> Dict:
|
||||
"""
|
||||
Import media using migration report CSV.
|
||||
|
||||
Args:
|
||||
report_file: Path to migration report CSV
|
||||
dry_run: If True, preview without importing
|
||||
|
||||
Returns:
|
||||
Statistics dict
|
||||
"""
|
||||
logger.info(f"Loading migration report: {report_file}")
|
||||
|
||||
try:
|
||||
with open(report_file, 'r', encoding='utf-8') as f:
|
||||
reader = csv.DictReader(f)
|
||||
mappings = []
|
||||
|
||||
for row in reader:
|
||||
source_id = int(row.get('source_post_id', 0))
|
||||
dest_id = int(row.get('destination_post_id', 0))
|
||||
|
||||
if source_id and dest_id:
|
||||
mappings.append((source_id, dest_id))
|
||||
|
||||
logger.info(f"✓ Loaded {len(mappings)} post mappings from migration report")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading migration report: {e}")
|
||||
return self.stats
|
||||
|
||||
return self.process_posts(mappings, dry_run=dry_run)
|
||||
396 src/seo/performance_analyzer.py (New file)
@@ -0,0 +1,396 @@
"""
SEO Performance Analyzer - Analyze page performance from imported data
Supports Google Analytics and Search Console CSV imports
"""

import csv
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


class PerformanceAnalyzer:
    """Analyze SEO performance from imported CSV data."""

    def __init__(self):
        """Initialize performance analyzer."""
        self.performance_data = []
        self.analysis_results = {}

    def load_ga4_export(self, csv_file: str) -> List[Dict]:
        """
        Load Google Analytics 4 export CSV.

        Expected columns: page_path, page_title, pageviews, sessions, bounce_rate, etc.

        Args:
            csv_file: Path to GA4 export CSV

        Returns:
            List of data dicts
        """
        logger.info(f"Loading GA4 export: {csv_file}")

        try:
            with open(csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                data = list(reader)

            # Normalize column names
            normalized = []
            for row in data:
                normalized_row = {}
                for key, value in row.items():
                    # Normalize key names
                    new_key = key.lower().replace(' ', '_').replace('-', '_')
                    if 'page' in new_key and 'path' in new_key:
                        normalized_row['page'] = value
                    elif 'page' in new_key and 'title' in new_key:
                        normalized_row['page_title'] = value
                    elif 'pageviews' in new_key or 'views' in new_key:
                        normalized_row['pageviews'] = int(value) if value else 0
                    elif 'sessions' in new_key:
                        normalized_row['sessions'] = int(value) if value else 0
                    elif 'bounce' in new_key and 'rate' in new_key:
                        normalized_row['bounce_rate'] = float(value) if value else 0.0
                    elif 'engagement' in new_key and 'rate' in new_key:
                        normalized_row['engagement_rate'] = float(value) if value else 0.0
                    elif 'duration' in new_key or 'time' in new_key:
                        normalized_row['avg_session_duration'] = float(value) if value else 0.0
                    else:
                        normalized_row[new_key] = value

                normalized.append(normalized_row)

            self.performance_data.extend(normalized)
            logger.info(f"✓ Loaded {len(normalized)} rows from GA4")
            return normalized

        except Exception as e:
            logger.error(f"Error loading GA4 export: {e}")
            return []

    def load_gsc_export(self, csv_file: str) -> List[Dict]:
        """
        Load Google Search Console export CSV.

        Expected columns: Page, Clicks, Impressions, CTR, Position

        Args:
            csv_file: Path to GSC export CSV

        Returns:
            List of data dicts
        """
        logger.info(f"Loading GSC export: {csv_file}")

        try:
            with open(csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                data = list(reader)

            # Normalize column names
            normalized = []
            for row in data:
                normalized_row = {'page': ''}
                for key, value in row.items():
                    new_key = key.lower().replace(' ', '_')
                    if 'page' in new_key or 'url' in new_key:
                        normalized_row['page'] = value
                    elif 'clicks' in new_key:
                        normalized_row['clicks'] = int(value) if value else 0
                    elif 'impressions' in new_key:
                        normalized_row['impressions'] = int(value) if value else 0
                    elif 'ctr' in new_key:
                        normalized_row['ctr'] = float(value) if value else 0.0
                    elif 'position' in new_key or 'rank' in new_key:
                        normalized_row['position'] = float(value) if value else 0.0
                    elif 'query' in new_key or 'keyword' in new_key:
                        normalized_row['query'] = value

                normalized.append(normalized_row)

            # Merge with existing data
            self._merge_gsc_data(normalized)

            logger.info(f"✓ Loaded {len(normalized)} rows from GSC")
            return normalized

        except Exception as e:
            logger.error(f"Error loading GSC export: {e}")
            return []

    def _merge_gsc_data(self, gsc_data: List[Dict]):
        """Merge GSC data with existing performance data."""
        # Create lookup by page
        existing_pages = {p.get('page', ''): p for p in self.performance_data}

        for gsc_row in gsc_data:
            page = gsc_row.get('page', '')

            if page in existing_pages:
                # Update existing record
                existing_pages[page].update(gsc_row)
            else:
                # Add new record
                new_record = {
                    'page': page,
                    'page_title': '',
                    'pageviews': 0,
                    'sessions': 0,
                    'bounce_rate': 0.0,
                    'engagement_rate': 0.0,
                    'avg_session_duration': 0.0
                }
                new_record.update(gsc_row)
                self.performance_data.append(new_record)

    def analyze(self) -> Dict:
        """
        Analyze performance data.

        Returns:
            Analysis results dict
        """
        if not self.performance_data:
            logger.warning("No data to analyze")
            return {}

        logger.info("\n" + "="*70)
        logger.info("PERFORMANCE ANALYSIS")
        logger.info("="*70)

        # Calculate summary metrics
        total_pages = len(self.performance_data)
        total_pageviews = sum(p.get('pageviews', 0) for p in self.performance_data)
        total_clicks = sum(p.get('clicks', 0) for p in self.performance_data)
        total_impressions = sum(p.get('impressions', 0) for p in self.performance_data)

        avg_ctr = total_clicks / total_impressions if total_impressions > 0 else 0.0
        avg_position = sum(p.get('position', 0) for p in self.performance_data) / total_pages if total_pages > 0 else 0.0

        # Top pages
        top_by_views = sorted(
            self.performance_data,
            key=lambda x: x.get('pageviews', 0),
            reverse=True
        )[:20]

        top_by_clicks = sorted(
            self.performance_data,
            key=lambda x: x.get('clicks', 0),
            reverse=True
        )[:20]

        # Pages with issues
        low_ctr = [
            p for p in self.performance_data
            if p.get('impressions', 0) > 100 and p.get('ctr', 0) < 0.02
        ]

        low_position = [
            p for p in self.performance_data
            if p.get('impressions', 0) > 50 and p.get('position', 0) > 20
        ]

        high_impressions_low_clicks = [
            p for p in self.performance_data
            if p.get('impressions', 0) > 500 and p.get('ctr', 0) < 0.01
        ]

        # Keyword opportunities (from GSC data)
        keyword_opportunities = self._analyze_keywords()

        analysis = {
            'summary': {
                'total_pages': total_pages,
                'total_pageviews': total_pageviews,
                'total_clicks': total_clicks,
                'total_impressions': total_impressions,
                'average_ctr': avg_ctr,
                'average_position': avg_position
            },
            'top_pages': {
                'by_views': top_by_views,
                'by_clicks': top_by_clicks
            },
            'issues': {
                'low_ctr': low_ctr,
                'low_position': low_position,
                'high_impressions_low_clicks': high_impressions_low_clicks
            },
            'keyword_opportunities': keyword_opportunities
        }
        # Recommendations are derived from the assembled analysis, so attach
        # them after the dict exists (referencing `analysis` inside its own
        # literal would raise a NameError).
        analysis['recommendations'] = self._generate_recommendations(analysis)

        # Log summary
        logger.info(f"Total pages analyzed: {total_pages}")
        logger.info(f"Total pageviews: {total_pageviews}")
        logger.info(f"Total clicks: {total_clicks}")
        logger.info(f"Total impressions: {total_impressions}")
        logger.info(f"Average CTR: {avg_ctr:.2%}")
        logger.info(f"Average position: {avg_position:.1f}")
        logger.info(f"\nPages with low CTR: {len(low_ctr)}")
        logger.info(f"Pages with low position: {len(low_position)}")
        logger.info(f"High impression, low click pages: {len(high_impressions_low_clicks)}")
        logger.info("="*70)

        self.analysis_results = analysis
        return analysis

    def _analyze_keywords(self) -> List[Dict]:
        """Analyze keyword opportunities from GSC data."""
        keywords = {}

        for page in self.performance_data:
            query = page.get('query', '')
            if not query:
                continue

            if query not in keywords:
                keywords[query] = {
                    'query': query,
                    'clicks': 0,
                    'impressions': 0,
                    'position': 0.0,
                    'pages': []
                }

            keywords[query]['clicks'] += page.get('clicks', 0)
            keywords[query]['impressions'] += page.get('impressions', 0)
            keywords[query]['pages'].append(page.get('page', ''))

        # Calculate average position per keyword
        for query in keywords:
            positions = [
                p.get('position', 0) for p in self.performance_data
                if p.get('query') == query
            ]
            if positions:
                keywords[query]['position'] = sum(positions) / len(positions)

        # Sort by impressions
        keyword_list = list(keywords.values())
        keyword_list.sort(key=lambda x: x['impressions'], reverse=True)

        # Filter opportunities (position 5-20, high impressions)
        opportunities = [
            k for k in keyword_list
            if 5 <= k['position'] <= 20 and k['impressions'] > 100
        ]

        return opportunities[:50]  # Top 50 opportunities

    def _generate_recommendations(self, analysis: Dict) -> List[str]:
        """Generate SEO recommendations."""
        recommendations = []

        issues = analysis.get('issues', {})

        # Low CTR
        low_ctr_count = len(issues.get('low_ctr', []))
        if low_ctr_count > 0:
            recommendations.append(
                f"📝 {low_ctr_count} pages have low CTR (<2% with 100+ impressions). "
                "Improve meta titles and descriptions to increase click-through rate."
            )

        # Low position
        low_pos_count = len(issues.get('low_position', []))
        if low_pos_count > 0:
            recommendations.append(
                f"📊 {low_pos_count} pages rank beyond position 20. "
                "Consider content optimization and internal linking."
            )

        # High impressions, low clicks
        high_imp_count = len(issues.get('high_impressions_low_clicks', []))
        if high_imp_count > 0:
            recommendations.append(
                f"⚠️ {high_imp_count} pages have 500+ impressions but <1% CTR. "
                "These are prime candidates for title/description optimization."
            )

        # Keyword opportunities
        keyword_count = len(analysis.get('keyword_opportunities', []))
        if keyword_count > 0:
            recommendations.append(
                f"🎯 {keyword_count} keyword opportunities identified (ranking 5-20). "
                "Focus content optimization on these keywords."
            )

        return recommendations

    def save_analysis(self, output_file: Optional[str] = None) -> str:
        """
        Save analysis results to CSV.

        Args:
            output_file: Custom output file path

        Returns:
            Path to saved file
        """
        if not output_file:
            output_dir = Path(__file__).parent.parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'performance_analysis_{timestamp}.csv'

        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)

        fieldnames = [
            'page', 'page_title', 'pageviews', 'sessions', 'bounce_rate',
            'engagement_rate', 'avg_session_duration', 'clicks', 'impressions',
            'ctr', 'position', 'query'
        ]

        logger.info(f"Saving analysis to {output_file}...")

        # extrasaction='ignore' keeps DictWriter from raising when a row
        # carries extra normalized columns beyond the fixed fieldnames.
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.performance_data)

        logger.info(f"✓ Saved to: {output_file}")
        return str(output_file)

    def run(self, ga4_file: Optional[str] = None,
            gsc_file: Optional[str] = None,
            output_file: Optional[str] = None) -> Tuple[str, Dict]:
        """
        Run complete performance analysis.

        Args:
            ga4_file: Path to GA4 export CSV
            gsc_file: Path to GSC export CSV
            output_file: Custom output file path

        Returns:
            Tuple of (output_file_path, analysis_dict)
        """
        logger.info("\n" + "="*70)
        logger.info("SEO PERFORMANCE ANALYZER")
        logger.info("="*70)

        # Load data
        if ga4_file:
            self.load_ga4_export(ga4_file)
        if gsc_file:
            self.load_gsc_export(gsc_file)

        if not self.performance_data:
            logger.error("No data loaded. Provide GA4 and/or GSC export files.")
            return "", {}

        # Analyze
        analysis = self.analyze()

        # Save
        output_path = self.save_analysis(output_file)

        return output_path, analysis
494 src/seo/performance_tracker.py (New file)
@@ -0,0 +1,494 @@
|
||||
"""
|
||||
SEO Performance Tracker - Google Analytics 4 & Search Console Integration
|
||||
Fetch and analyze page performance data for SEO optimization
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
# Optional Google imports
|
||||
try:
|
||||
from google.analytics.admin import AnalyticsAdminServiceClient
|
||||
from google.analytics.data import BetaAnalyticsDataClient
|
||||
from google.analytics.data_v1beta.types import (
|
||||
DateRange,
|
||||
Dimension,
|
||||
Metric,
|
||||
RunReportRequest,
|
||||
)
|
||||
from google.oauth2 import service_account
|
||||
from googleapiclient.discovery import build
|
||||
GOOGLE_AVAILABLE = True
|
||||
except ImportError:
|
||||
GOOGLE_AVAILABLE = False
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.warning("Google libraries not installed. API mode disabled. Use CSV imports instead.")
|
||||
|
||||
from .config import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SEOPerformanceTracker:
|
||||
"""Track and analyze SEO performance from Google Analytics and Search Console."""
|
||||
|
||||
def __init__(self, ga4_credentials: Optional[str] = None,
|
||||
gsc_credentials: Optional[str] = None,
|
||||
ga4_property_id: Optional[str] = None,
|
||||
gsc_site_url: Optional[str] = None):
|
||||
"""
|
||||
Initialize performance tracker.
|
||||
|
||||
Args:
|
||||
ga4_credentials: Path to GA4 service account JSON
|
||||
gsc_credentials: Path to GSC service account JSON
|
||||
ga4_property_id: GA4 property ID (e.g., "properties/123456789")
|
||||
gsc_site_url: GSC site URL (e.g., "https://www.mistergeek.net")
|
||||
"""
|
||||
self.ga4_credentials = ga4_credentials or Config.GA4_CREDENTIALS
|
||||
self.gsc_credentials = gsc_credentials or Config.GSC_CREDENTIALS
|
||||
self.ga4_property_id = ga4_property_id or Config.GA4_PROPERTY_ID
|
||||
self.gsc_site_url = gsc_site_url or Config.GSC_SITE_URL
|
||||
|
||||
self.ga4_client = None
|
||||
self.gsc_service = None
|
||||
|
||||
# Initialize clients
|
||||
self._init_ga4_client()
|
||||
self._init_gsc_service()
|
||||
|
||||
self.performance_data = []
|
||||
|
||||
def _init_ga4_client(self):
|
||||
"""Initialize Google Analytics 4 client."""
|
||||
if not GOOGLE_AVAILABLE:
|
||||
logger.warning("Google libraries not installed. API mode disabled.")
|
||||
return
|
||||
|
||||
if not self.ga4_credentials or not self.ga4_property_id:
|
||||
logger.warning("GA4 credentials not configured")
|
||||
return
|
||||
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_file(
|
||||
self.ga4_credentials,
|
||||
scopes=["https://www.googleapis.com/auth/analytics.readonly"]
|
||||
)
|
||||
self.ga4_client = BetaAnalyticsDataClient(credentials=credentials)
|
||||
logger.info("✓ GA4 client initialized")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize GA4 client: {e}")
|
||||
self.ga4_client = None
|
||||
|
||||
def _init_gsc_service(self):
|
||||
"""Initialize Google Search Console service."""
|
||||
if not GOOGLE_AVAILABLE:
|
||||
logger.warning("Google libraries not installed. API mode disabled.")
|
||||
return
|
||||
|
||||
if not self.gsc_credentials:
|
||||
logger.warning("GSC credentials not configured")
|
||||
return
|
||||
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_file(
|
||||
self.gsc_credentials,
|
||||
scopes=["https://www.googleapis.com/auth/webmasters.readonly"]
|
||||
)
|
||||
self.gsc_service = build('webmasters', 'v3', credentials=credentials)
|
||||
logger.info("✓ GSC service initialized")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize GSC service: {e}")
|
||||
self.gsc_service = None
|
||||
|
||||
def fetch_ga4_data(self, start_date: str, end_date: str,
|
||||
dimensions: Optional[List[str]] = None) -> List[Dict]:
|
||||
"""
|
||||
Fetch data from Google Analytics 4.
|
||||
|
||||
Args:
|
||||
start_date: Start date (YYYY-MM-DD)
|
||||
end_date: End date (YYYY-MM-DD)
|
||||
dimensions: List of dimensions to fetch
|
||||
|
||||
Returns:
|
||||
List of performance data dicts
|
||||
"""
|
||||
if not self.ga4_client:
|
||||
logger.warning("GA4 client not available")
|
||||
return []
|
||||
|
||||
logger.info(f"Fetching GA4 data from {start_date} to {end_date}...")
|
||||
|
||||
# Default dimensions
|
||||
if dimensions is None:
|
||||
dimensions = ['pagePath', 'pageTitle']
|
||||
|
||||
# Default metrics
|
||||
metrics = [
|
||||
'screenPageViews',
|
||||
'sessions',
|
||||
'bounceRate',
|
||||
'averageSessionDuration',
|
||||
'engagementRate'
|
||||
]
|
||||
|
||||
try:
|
||||
request = RunReportRequest(
|
||||
property=self.ga4_property_id,
|
||||
dimensions=[Dimension(name=dim) for dim in dimensions],
|
||||
metrics=[Metric(name=metric) for metric in metrics],
|
||||
date_ranges=[DateRange(start_date=start_date, end_date=end_date)]
|
||||
)
|
||||
|
||||
response = self.ga4_client.run_report(request)
|
||||
|
||||
data = []
|
||||
for row in response.rows:
|
||||
row_data = {}
|
||||
|
||||
# Extract dimensions
|
||||
for i, dim_header in enumerate(response.dimension_headers):
|
||||
row_data[dim_header.name] = row.dimension_values[i].value
|
||||
|
||||
# Extract metrics
|
||||
for i, metric_header in enumerate(response.metric_headers):
|
||||
value = row.metric_values[i].value
|
||||
# Convert to appropriate type
|
||||
if metric_header.name in ['bounceRate', 'engagementRate']:
|
||||
value = float(value) if value else 0.0
|
||||
elif metric_header.name in ['screenPageViews', 'sessions']:
|
||||
value = int(value) if value else 0
|
||||
elif metric_header.name == 'averageSessionDuration':
|
||||
value = float(value) if value else 0.0
|
||||
row_data[metric_header.name] = value
|
||||
|
||||
data.append(row_data)
|
||||
|
||||
logger.info(f"✓ Fetched {len(data)} rows from GA4")
|
||||
return data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching GA4 data: {e}")
|
||||
return []
|
||||
|
||||
def fetch_gsc_data(self, start_date: str, end_date: str,
|
||||
dimensions: Optional[List[str]] = None) -> List[Dict]:
|
||||
"""
|
||||
Fetch data from Google Search Console.
|
||||
|
||||
Args:
|
||||
start_date: Start date (YYYY-MM-DD)
|
||||
end_date: End date (YYYY-MM-DD)
|
||||
dimensions: List of dimensions to fetch
|
||||
|
||||
Returns:
|
||||
List of performance data dicts
|
||||
"""
|
||||
if not self.gsc_service:
|
||||
logger.warning("GSC service not available")
|
||||
return []
|
||||
|
||||
logger.info(f"Fetching GSC data from {start_date} to {end_date}...")
|
||||
|
||||
# Default dimensions
|
||||
if dimensions is None:
|
||||
dimensions = ['page']
|
||||
|
||||
try:
|
||||
# Build request
|
||||
request = {
|
||||
'startDate': start_date,
|
||||
'endDate': end_date,
|
||||
'dimensions': dimensions,
|
||||
'rowLimit': 5000,
|
||||
'startRow': 0
|
||||
}
|
||||
|
||||
response = self.gsc_service.searchanalytics().query(
|
||||
siteUrl=self.gsc_site_url,
|
||||
body=request
|
||||
).execute()
|
||||
|
||||
data = []
|
||||
if 'rows' in response:
|
||||
for row in response['rows']:
|
||||
row_data = {
|
||||
'page': row['keys'][0] if len(row['keys']) > 0 else '',
|
||||
'clicks': row.get('clicks', 0),
|
||||
'impressions': row.get('impressions', 0),
|
||||
'ctr': row.get('ctr', 0.0),
|
||||
'position': row.get('position', 0.0)
|
||||
}
|
||||
|
||||
# Add query if available
|
||||
if len(row['keys']) > 1:
|
||||
row_data['query'] = row['keys'][1]
|
||||
|
||||
data.append(row_data)
|
||||
|
||||
logger.info(f"✓ Fetched {len(data)} rows from GSC")
|
||||
return data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching GSC data: {e}")
|
||||
return []
|
||||
|
||||
def fetch_combined_data(self, start_date: str, end_date: str) -> List[Dict]:
|
||||
"""
|
||||
Fetch and combine data from GA4 and GSC.
|
||||
|
||||
Args:
|
||||
start_date: Start date (YYYY-MM-DD)
|
||||
end_date: End date (YYYY-MM-DD)
|
||||
|
||||
Returns:
|
||||
List of combined performance data dicts
|
||||
"""
|
||||
logger.info("\n" + "="*70)
|
||||
logger.info("FETCHING PERFORMANCE DATA")
|
||||
logger.info("="*70)
|
||||
|
||||
# Fetch from both sources
|
||||
ga4_data = self.fetch_ga4_data(start_date, end_date)
|
||||
gsc_data = self.fetch_gsc_data(start_date, end_date)
|
||||
|
||||
# Combine data by page path
|
||||
combined = {}
|
||||
|
||||
# Add GA4 data
|
||||
for row in ga4_data:
|
||||
page_path = row.get('pagePath', '')
|
||||
combined[page_path] = {
|
||||
'page': page_path,
|
||||
'page_title': row.get('pageTitle', ''),
|
||||
'pageviews': row.get('screenPageViews', 0),
|
||||
'sessions': row.get('sessions', 0),
|
||||
'bounce_rate': row.get('bounceRate', 0.0),
|
||||
'avg_session_duration': row.get('averageSessionDuration', 0.0),
|
||||
'engagement_rate': row.get('engagementRate', 0.0),
|
||||
'clicks': 0,
|
||||
'impressions': 0,
|
||||
'ctr': 0.0,
|
||||
'position': 0.0
|
||||
}
|
||||
|
||||
# Merge GSC data
|
||||
for row in gsc_data:
|
||||
page_path = row.get('page', '')
|
||||
|
||||
if page_path in combined:
|
||||
# Update existing record
|
||||
combined[page_path]['clicks'] = row.get('clicks', 0)
|
||||
combined[page_path]['impressions'] = row.get('impressions', 0)
|
||||
combined[page_path]['ctr'] = row.get('ctr', 0.0)
|
||||
combined[page_path]['position'] = row.get('position', 0.0)
|
||||
else:
|
||||
# Create new record
|
||||
combined[page_path] = {
|
||||
'page': page_path,
|
||||
'page_title': '',
|
||||
'pageviews': 0,
|
||||
'sessions': 0,
|
||||
'bounce_rate': 0.0,
|
||||
'avg_session_duration': 0.0,
|
||||
'engagement_rate': 0.0,
|
||||
'clicks': row.get('clicks', 0),
|
||||
'impressions': row.get('impressions', 0),
|
||||
'ctr': row.get('ctr', 0.0),
|
||||
'position': row.get('position', 0.0)
|
||||
}
|
||||
|
||||
self.performance_data = list(combined.values())
|
||||
|
||||
logger.info(f"✓ Combined {len(self.performance_data)} pages")
|
||||
logger.info("="*70)
|
||||
|
||||
return self.performance_data
|
||||
|
||||
    def analyze_performance(self) -> Dict:
        """
        Analyze performance data and generate insights.

        Returns:
            Analysis results dict
        """
        if not self.performance_data:
            return {}

        logger.info("\n" + "="*70)
        logger.info("PERFORMANCE ANALYSIS")
        logger.info("="*70)

        # Calculate aggregate metrics
        total_pageviews = sum(p.get('pageviews', 0) for p in self.performance_data)
        total_clicks = sum(p.get('clicks', 0) for p in self.performance_data)
        total_impressions = sum(p.get('impressions', 0) for p in self.performance_data)

        avg_ctr = total_clicks / total_impressions if total_impressions > 0 else 0
        # Average position only over pages that actually have GSC position data;
        # GA4-only pages default to 0 and would skew the average downward.
        positions = [p['position'] for p in self.performance_data if p.get('position', 0) > 0]
        avg_position = sum(positions) / len(positions) if positions else 0

        # Top pages by pageviews
        top_pages = sorted(
            self.performance_data,
            key=lambda x: x.get('pageviews', 0),
            reverse=True
        )[:10]

        # Top pages by CTR (only pages with enough impressions to be meaningful)
        top_ctr = sorted(
            [p for p in self.performance_data if p.get('impressions', 0) > 100],
            key=lambda x: x.get('ctr', 0),
            reverse=True
        )[:10]

        # Pages needing improvement (visible in search, but low CTR)
        low_ctr = [
            p for p in self.performance_data
            if p.get('impressions', 0) > 100 and p.get('ctr', 0) < 0.02
        ]

        # Pages with good traffic but ranking outside the top 10
        opportunity_pages = [
            p for p in self.performance_data
            if p.get('pageviews', 0) > 50 and p.get('position', 0) > 10
        ]

        analysis = {
            'summary': {
                'total_pages': len(self.performance_data),
                'total_pageviews': total_pageviews,
                'total_clicks': total_clicks,
                'total_impressions': total_impressions,
                'average_ctr': avg_ctr,
                'average_position': avg_position
            },
            'top_pages': top_pages,
            'top_ctr': top_ctr,
            'low_ctr': low_ctr,
            'opportunities': opportunity_pages,
        }
        # Generate recommendations after the dict is built; referencing
        # `analysis` inside its own literal would raise a NameError.
        analysis['recommendations'] = self._generate_recommendations(analysis)

        # Log summary
        logger.info(f"Total pages: {analysis['summary']['total_pages']}")
        logger.info(f"Total pageviews: {analysis['summary']['total_pageviews']}")
        logger.info(f"Total clicks: {analysis['summary']['total_clicks']}")
        logger.info(f"Average CTR: {analysis['summary']['average_ctr']:.2%}")
        logger.info(f"Average position: {analysis['summary']['average_position']:.1f}")
        logger.info("="*70)

        return analysis

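The thresholds used above (impressions > 100, CTR < 2%, position > 10) can be exercised on toy data. This is an illustrative sketch, separate from the module, with made-up page records:

```python
# Toy page records mimicking the shape of self.performance_data.
pages = [
    {'page': '/a', 'pageviews': 120, 'impressions': 500,  'clicks': 5,  'ctr': 0.01,  'position': 14.2},
    {'page': '/b', 'pageviews': 40,  'impressions': 80,   'clicks': 4,  'ctr': 0.05,  'position': 3.1},
    {'page': '/c', 'pageviews': 300, 'impressions': 2000, 'clicks': 90, 'ctr': 0.045, 'position': 6.0},
]

# Visible in search but under-clicked: many impressions, CTR below 2%.
low_ctr = [p for p in pages if p['impressions'] > 100 and p['ctr'] < 0.02]
# Well-visited pages still ranking outside the top 10.
opportunities = [p for p in pages if p['pageviews'] > 50 and p['position'] > 10]

print([p['page'] for p in low_ctr])        # ['/a']
print([p['page'] for p in opportunities])  # ['/a']
```

`/b` escapes the low-CTR filter despite its traffic share because 80 impressions is too small a sample; the `> 100` guard exists precisely to avoid flagging pages on noisy data.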
    def _generate_recommendations(self, analysis: Dict) -> List[str]:
        """Generate SEO recommendations based on analysis."""
        recommendations = []

        # Low CTR recommendations
        low_ctr_count = len(analysis.get('low_ctr', []))
        if low_ctr_count > 0:
            recommendations.append(
                f"📝 {low_ctr_count} pages have low CTR (<2%). "
                "Consider improving meta titles and descriptions."
            )

        # Position opportunities
        opportunity_count = len(analysis.get('opportunities', []))
        if opportunity_count > 0:
            recommendations.append(
                f"🎯 {opportunity_count} pages have good traffic but rank >10. "
                "Optimize content to improve rankings."
            )

        # High impressions, very low clicks
        high_impressions = [
            p for p in self.performance_data
            if p.get('impressions', 0) > 1000 and p.get('ctr', 0) < 0.01
        ]
        if high_impressions:
            recommendations.append(
                f"⚠️ {len(high_impressions)} pages have high impressions but very low CTR. "
                "Review title tags for better click appeal."
            )

        return recommendations

    def save_to_csv(self, output_file: Optional[str] = None) -> str:
        """
        Save performance data to CSV.

        Args:
            output_file: Custom output file path

        Returns:
            Path to saved file
        """
        if not output_file:
            output_dir = Path(__file__).parent.parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'performance_data_{timestamp}.csv'

        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)

        fieldnames = [
            'page', 'page_title', 'pageviews', 'sessions', 'bounce_rate',
            'avg_session_duration', 'engagement_rate', 'clicks', 'impressions',
            'ctr', 'position'
        ]

        logger.info(f"Saving {len(self.performance_data)} rows to {output_file}...")

        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            # Rows may come from GA4 only or GSC only, so silently drop any
            # extra keys (extrasaction='ignore') rather than raising ValueError;
            # missing fields are written as empty strings.
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.performance_data)

        logger.info(f"✓ Saved to: {output_file}")
        return str(output_file)

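Because combined rows rarely carry every fieldname (a GA4-only page has no `clicks`; a GSC-only page has no `pageviews`), it is worth knowing exactly how `csv.DictWriter` treats mismatched rows. A self-contained illustration on toy data:

```python
import csv
import io

fieldnames = ['page', 'pageviews', 'clicks']
rows = [
    {'page': '/a', 'pageviews': 10},               # GA4-only row: 'clicks' missing
    {'page': '/b', 'clicks': 3, 'position': 7.5},  # GSC-only row: extra 'position' key
]

buf = io.StringIO()
# extrasaction='ignore' drops unknown keys instead of raising ValueError;
# missing keys fall back to restval, which defaults to ''.
writer = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
writer.writerows(rows)
print(buf.getvalue())
# page,pageviews,clicks
# /a,10,
# /b,,3
```

Without `extrasaction='ignore'`, the second row would raise `ValueError: dict contains fields not in fieldnames: 'position'`, which is why it matters for mixed-source exports.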
    def run(self, start_date: Optional[str] = None,
            end_date: Optional[str] = None,
            output_file: Optional[str] = None) -> Tuple[str, Dict]:
        """
        Run the complete performance analysis.

        Args:
            start_date: Start date (YYYY-MM-DD), defaults to 30 days ago
            end_date: End date (YYYY-MM-DD), defaults to yesterday
            output_file: Custom output file path

        Returns:
            Tuple of (output_file_path, analysis_dict)
        """
        # Default date range (last 30 days, ending yesterday)
        if not end_date:
            end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        if not start_date:
            start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

        logger.info("\n" + "="*70)
        logger.info("SEO PERFORMANCE ANALYSIS")
        logger.info("="*70)
        logger.info(f"Date range: {start_date} to {end_date}")
        logger.info("="*70)

        # Fetch data
        self.fetch_combined_data(start_date, end_date)

        if not self.performance_data:
            logger.warning("No performance data available")
            return "", {}

        # Analyze
        analysis = self.analyze_performance()

        # Save
        output_path = self.save_to_csv(output_file)

        return output_path, analysis
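The default window in `run()` spans 30 calendar days inclusive, ending yesterday rather than today, since analytics data for the current day is typically incomplete. The date arithmetic in isolation:

```python
from datetime import datetime, timedelta

# Same defaults as run(): yesterday as the end date,
# 30 days before today as the start date.
end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
print(start_date, end_date)
```

A useful property of the `YYYY-MM-DD` format is that lexicographic order matches chronological order, so the strings can be compared directly.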