- Shows High/Medium/Low count breakdown
- Helps verify all matching posts will be processed
- Example output:
Filtered to 328 proposals (confidence >= Medium)
Breakdown: High=293, Medium=35, Low=0
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
481 lines
19 KiB
Python
481 lines
19 KiB
Python
"""
|
|
Category Manager - Create, update, and assign categories in WordPress
|
|
"""
|
|
|
|
import csv
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
from .config import Config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WordPressCategoryManager:
    """Manage WordPress categories: create, update, and assign to posts.

    Every request targets the ``/wp-json/wp/v2`` REST endpoints of a site
    listed in ``Config.WORDPRESS_SITES`` and is authenticated with HTTP basic
    auth built from that site's ``username``/``password`` entries.
    Categories fetched from a site are cached in memory, keyed by site name.
    """

    # Timeout (seconds) applied to every HTTP request made by this class.
    REQUEST_TIMEOUT = 10
    # Page size requested from the categories endpoint while paginating.
    PAGE_SIZE = 100

    def __init__(self):
        """Initialize category manager."""
        self.sites = Config.WORDPRESS_SITES
        # site name -> {lower-cased slug: {'id', 'name', 'slug', 'count'}}
        self.category_cache = {}

    @staticmethod
    def _make_slug(category_name: str) -> str:
        """Derive the slug this module uses for a category name."""
        return category_name.lower().replace(' ', '-').replace('/', '-')

    def _cache_category(self, site_name: str, cat_data: Dict,
                        fallback_slug: str = '') -> None:
        """Store one category in the site's cache (no-op if the site is not cached).

        The entry is keyed by the lower-cased slug reported in ``cat_data``
        so it lines up with the keys written by ``fetch_categories``;
        ``fallback_slug`` is used only when ``cat_data`` carries no slug.
        """
        site_cache = self.category_cache.get(site_name)
        if site_cache is None:
            return
        slug = cat_data.get('slug') or fallback_slug
        site_cache[slug.lower()] = {
            'id': cat_data['id'],
            'name': cat_data['name'],
            'slug': slug,
            'count': cat_data.get('count', 0),
        }

    def get_site_auth(self, site_name: str) -> Tuple[str, "HTTPBasicAuth"]:
        """Get site URL and auth for a given site name.

        Args:
            site_name: Key into the configured sites mapping.

        Returns:
            ``(base_url, auth)`` where ``base_url`` has no trailing slash.

        Raises:
            ValueError: If ``site_name`` is not configured.
        """
        site_config = self.sites.get(site_name)
        if not site_config:
            raise ValueError(f"Site not found: {site_name}")

        base_url = site_config['url'].rstrip('/')
        auth = HTTPBasicAuth(site_config['username'], site_config['password'])
        return base_url, auth

    def fetch_categories(self, site_name: str) -> Dict[str, Dict]:
        """Fetch all categories from a WordPress site.

        Results are cached per site; a cached site is returned without any
        network traffic. A failed fetch is logged and is NOT cached, so the
        next call tries again.

        Args:
            site_name: Site to read categories from.

        Returns:
            Dict mapping lower-cased category slug to a dict with the keys
            ``id``, ``name``, ``slug`` and ``count``. Empty dict on error.
        """
        if site_name in self.category_cache:
            return self.category_cache[site_name]

        logger.info(f"Fetching categories from {site_name}...")

        try:
            base_url, auth = self.get_site_auth(site_name)
            categories = {}
            page = 1

            while True:
                response = requests.get(
                    f"{base_url}/wp-json/wp/v2/categories",
                    params={'per_page': self.PAGE_SIZE, 'page': page},
                    auth=auth,
                    timeout=self.REQUEST_TIMEOUT
                )
                response.raise_for_status()

                page_categories = response.json()
                if not page_categories:
                    break

                for cat in page_categories:
                    categories[cat['slug'].lower()] = {
                        'id': cat['id'],
                        'name': cat['name'],
                        'slug': cat['slug'],
                        'count': cat.get('count', 0)
                    }

                # A short page means this was the last one.
                if len(page_categories) < self.PAGE_SIZE:
                    break
                page += 1

            self.category_cache[site_name] = categories
            logger.info(f"✓ Fetched {len(categories)} categories from {site_name}")
            return categories

        except Exception as e:
            # Deliberate best-effort: callers treat {} as "nothing known".
            logger.error(f"Error fetching categories from {site_name}: {e}")
            return {}

    def create_category(self, site_name: str, category_name: str,
                        description: str = '', parent_id: int = 0) -> Optional[int]:
        """Create a new category in WordPress.

        If the API answers 400 with code ``term_exists`` and supplies a
        ``term_id``, that ID is returned. The category details are then
        looked up only to refresh the cache; a failed lookup no longer
        discards the already-known ID.

        Args:
            site_name: Site to create category on
            category_name: Name of the category
            description: Category description
            parent_id: Parent category ID (0 for top-level)

        Returns:
            Category ID if successful, None otherwise
        """
        try:
            base_url, auth = self.get_site_auth(site_name)
            slug = self._make_slug(category_name)

            logger.info(f"Creating category '{category_name}' on {site_name}...")

            response = requests.post(
                f"{base_url}/wp-json/wp/v2/categories",
                json={
                    'name': category_name,
                    'slug': slug,
                    'description': description,
                    'parent': parent_id
                },
                auth=auth,
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 201:
                category_data = response.json()
                logger.info(f"✓ Created category '{category_name}' (ID: {category_data['id']})")
                # Cache under the slug the server reports; fall back to the
                # requested slug only if the response carries none.
                self._cache_category(site_name, category_data, fallback_slug=slug)
                return category_data['id']

            if response.status_code != 400:
                logger.error(f"Error creating category: {response.status_code} - {response.text}")
                return None

            # 400: the category might already exist.
            error_data = response.json()
            term_id = None
            if error_data.get('code') == 'term_exists':
                term_id = error_data.get('data', {}).get('term_id')

            if not term_id:
                logger.warning(f" Category already exists or error: {error_data}")
                return None

            logger.info(f" Category '{category_name}' already exists (ID: {term_id})")

            # Fetch the details purely to keep the cache current.
            try:
                cat_response = requests.get(
                    f"{base_url}/wp-json/wp/v2/categories/{term_id}",
                    auth=auth,
                    timeout=self.REQUEST_TIMEOUT
                )
                if cat_response.status_code == 200:
                    self._cache_category(site_name, cat_response.json())
                else:
                    logger.warning(
                        f" Could not fetch details for category {term_id}: "
                        f"{cat_response.status_code}"
                    )
            except (requests.RequestException, ValueError, KeyError) as e:
                logger.warning(f" Could not fetch details for category {term_id}: {e}")

            return term_id

        except Exception as e:
            logger.error(f"Error creating category: {e}")
            return None

    def get_or_create_category(self, site_name: str, category_name: str,
                               description: str = '') -> Optional[int]:
        """Get existing category or create it if it doesn't exist.

        Lookup order against the cached categories of ``site_name``:
        case-insensitive name match, derived slug, ASCII-folded slug, then a
        substring match between the requested and existing names. The first
        hit wins; if nothing matches the category is created.

        Args:
            site_name: Site to work with
            category_name: Name of the category
            description: Category description (used if creating)

        Returns:
            Category ID, or None when the name is blank or creation fails.
        """
        import unicodedata

        wanted = (category_name or '').strip()
        if not wanted:
            # A blank name would otherwise "substring-match" the first cached
            # category, silently assigning an arbitrary category.
            logger.warning("Cannot look up or create a category with an empty name")
            return None

        # Fetch categories if not cached
        if site_name not in self.category_cache:
            self.fetch_categories(site_name)
        categories = self.category_cache.get(site_name, {})

        # Exact name match (case-insensitive)
        wanted_lower = wanted.lower()
        for cat_data in categories.values():
            if cat_data['name'].lower() == wanted_lower:
                logger.info(f"✓ Found existing category '{wanted}' (ID: {cat_data['id']})")
                return cat_data['id']

        # Slug match
        slug = self._make_slug(wanted)
        if slug in categories:
            logger.info(f"✓ Found existing category '{wanted}' (ID: {categories[slug]['id']})")
            return categories[slug]['id']

        # ASCII-folded slug (handles accented characters, e.g. French names)
        normalized_slug = (
            unicodedata.normalize('NFKD', slug)
            .encode('ascii', 'ignore')
            .decode('ascii')
            .lower()
            .replace(' ', '-')
        )
        if normalized_slug in categories:
            logger.info(f"✓ Found existing category '{wanted}' (ID: {categories[normalized_slug]['id']})")
            return categories[normalized_slug]['id']

        # Partial match: one name contains the other.
        # NOTE(review): substring matching can pick an unrelated category for
        # short names; confirm this looseness is intended.
        for cat_data in categories.values():
            existing_lower = cat_data['name'].lower()
            if not existing_lower:
                # An empty stored name is a substring of everything; skip it.
                continue
            if wanted_lower in existing_lower or existing_lower in wanted_lower:
                logger.info(f"✓ Found similar category '{cat_data['name']}' (ID: {cat_data['id']})")
                return cat_data['id']

        # Create new category
        logger.info(f"Creating new category '{wanted}'...")
        return self.create_category(site_name, wanted, description)

    def assign_post_to_category(self, site_name: str, post_id: int,
                                category_id: int, append: bool = True) -> bool:
        """Assign a post to a category.

        With ``append=True`` the post is read first and ``category_id`` is
        added to its current category list; otherwise the list is replaced
        by ``[category_id]``. The post is then updated with a POST.

        Args:
            site_name: Site where post exists
            post_id: Post ID
            category_id: Category ID to assign
            append: If True, add to existing categories; if False, replace all

        Returns:
            True if successful, False otherwise
        """
        try:
            base_url, auth = self.get_site_auth(site_name)
            post_url = f"{base_url}/wp-json/wp/v2/posts/{post_id}"

            if append:
                # Get current categories
                response = requests.get(post_url, auth=auth, timeout=self.REQUEST_TIMEOUT)
                if response.status_code != 200:
                    logger.error(f"Could not fetch post {post_id}")
                    return False

                current_categories = response.json().get('categories', [])
                if category_id not in current_categories:
                    current_categories.append(category_id)
            else:
                current_categories = [category_id]

            # Update post with new categories
            response = requests.post(
                post_url,
                json={'categories': current_categories},
                auth=auth,
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 200:
                logger.info(f"✓ Assigned post {post_id} to category {category_id}")
                return True

            logger.error(f"Error assigning category: {response.status_code}")
            return False

        except Exception as e:
            logger.error(f"Error assigning category: {e}")
            return False

    def bulk_assign_categories(self, site_name: str,
                               post_category_map: Dict[int, List[int]]) -> Dict[str, int]:
        """Bulk assign posts to categories.

        Each (post, category) pair is handled by a separate call to
        ``assign_post_to_category`` in append mode and counted individually.

        Args:
            site_name: Site to work with
            post_category_map: Dict mapping post_id to list of category_ids

        Returns:
            Statistics dict with ``success`` and ``failed`` counts
        """
        stats = {'success': 0, 'failed': 0}

        logger.info(f"Bulk assigning categories on {site_name}...")

        for post_id, category_ids in post_category_map.items():
            for category_id in category_ids:
                outcome = 'success' if self.assign_post_to_category(
                    site_name, post_id, category_id
                ) else 'failed'
                stats[outcome] += 1

        logger.info(f"✓ Bulk assignment complete: {stats['success']} successful, {stats['failed']} failed")
        return stats
|
|
|
|
|
|
class CategoryAssignmentProcessor:
    """Process AI category proposals and apply them to WordPress.

    Proposals are CSV rows (dicts). The keys read by this class are
    ``current_site``, ``category_confidence``, ``title``, ``post_id``,
    ``proposed_category`` and ``current_categories``.
    """

    # Ranking used for "threshold or better" filtering. Labels are matched
    # case-sensitively; a missing or unrecognised label ranks as Medium.
    CONFIDENCE_ORDER = {'High': 3, 'Medium': 2, 'Low': 1}
    DEFAULT_CONFIDENCE = 'Medium'

    def __init__(self):
        """Initialize processor."""
        self.category_manager = WordPressCategoryManager()
        # Counters accumulate over the lifetime of the instance, except
        # 'total_posts', which is overwritten by each process_proposals call.
        self.processing_stats = {
            'total_posts': 0,
            'categories_created': 0,
            'posts_updated': 0,
            'errors': 0
        }

    def _confidence_rank(self, proposal: Dict) -> int:
        """Return the numeric rank (1-3) the filter assigns to a proposal."""
        label = proposal.get('category_confidence', self.DEFAULT_CONFIDENCE)
        return self.CONFIDENCE_ORDER.get(
            label, self.CONFIDENCE_ORDER[self.DEFAULT_CONFIDENCE]
        )

    def load_proposals(self, proposals_csv: str) -> List[Dict]:
        """Load category proposals from CSV.

        Args:
            proposals_csv: Path to a UTF-8 CSV file with a header row.

        Returns:
            One dict per data row; an empty list if the file cannot be read.
        """
        logger.info(f"Loading proposals from: {proposals_csv}")

        try:
            # newline='' is what the csv module expects, so embedded newlines
            # in quoted fields survive; 'utf-8-sig' drops a leading BOM that
            # would otherwise become part of the first column name.
            with open(proposals_csv, 'r', encoding='utf-8-sig', newline='') as f:
                proposals = list(csv.DictReader(f))

            logger.info(f"✓ Loaded {len(proposals)} proposals")
            return proposals
        except Exception as e:
            logger.error(f"Error loading proposals: {e}")
            return []

    def process_proposals(self, proposals: List[Dict], site_name: str,
                          confidence_threshold: str = 'Medium',
                          strict: bool = False,
                          dry_run: bool = False) -> Dict[str, int]:
        """Process AI category proposals and apply to WordPress.

        Proposals are filtered to ``site_name``, then by confidence. Each
        remaining proposal has its category looked up or created and is
        assigned to its post in append mode.

        Args:
            proposals: List of proposal dicts from CSV
            site_name: Site to apply changes to (filters proposals)
            confidence_threshold: Minimum confidence to apply (High, Medium, Low)
            strict: If True, only match exact confidence level
            dry_run: If True, don't actually make changes

        Returns:
            Statistics dict (the instance's ``processing_stats``)
        """
        logger.info("\n" + "="*70)
        logger.info("PROCESSING CATEGORY PROPOSALS")
        logger.info("="*70)

        if dry_run:
            logger.info("DRY RUN - No changes will be made")

        # Filter by site
        original_count = len(proposals)
        proposals = [p for p in proposals if p.get('current_site', '') == site_name]
        filtered_by_site = original_count - len(proposals)

        logger.info(f"Filtered to {len(proposals)} posts on {site_name} ({filtered_by_site} excluded from other sites)")

        # Filter by confidence
        if strict:
            # Exact match only
            filtered_proposals = [
                p for p in proposals
                if p.get('category_confidence', self.DEFAULT_CONFIDENCE) == confidence_threshold
            ]
            logger.info(f"Filtered to {len(filtered_proposals)} proposals (confidence = {confidence_threshold}, strict mode)")
        else:
            # Threshold or better (default behavior)
            min_confidence = self.CONFIDENCE_ORDER.get(
                confidence_threshold, self.CONFIDENCE_ORDER[self.DEFAULT_CONFIDENCE]
            )
            filtered_proposals = [
                p for p in proposals
                if self._confidence_rank(p) >= min_confidence
            ]
            logger.info(f"Filtered to {len(filtered_proposals)} proposals (confidence >= {confidence_threshold})")

        # Show breakdown. Counting by rank uses the same defaulting as the
        # filter above, so the three numbers always add up to the total.
        rank_counts = {rank: 0 for rank in self.CONFIDENCE_ORDER.values()}
        for p in filtered_proposals:
            rank_counts[self._confidence_rank(p)] += 1
        high_count = rank_counts[self.CONFIDENCE_ORDER['High']]
        medium_count = rank_counts[self.CONFIDENCE_ORDER['Medium']]
        low_count = rank_counts[self.CONFIDENCE_ORDER['Low']]
        logger.info(f" Breakdown: High={high_count}, Medium={medium_count}, Low={low_count}")

        # Fetch existing categories
        self.category_manager.fetch_categories(site_name)

        # Process each proposal
        total = len(filtered_proposals)
        for i, proposal in enumerate(filtered_proposals, 1):
            # csv.DictReader stores None for fields missing from short rows,
            # so guard before slicing.
            post_title = str(proposal.get('title') or 'Unknown')[:60]
            post_id = proposal.get('post_id', '')
            proposed_category = proposal.get('proposed_category', '')
            current_categories = proposal.get('current_categories', '')
            confidence = proposal.get('category_confidence', self.DEFAULT_CONFIDENCE)

            logger.info(f"\n[{i}/{total}] Post {post_id}: {post_title}...")
            logger.info(f" Current categories: {current_categories}")
            logger.info(f" Proposed: {proposed_category} (confidence: {confidence})")

            if not post_id or not proposed_category:
                logger.warning(" Skipping: Missing post_id or proposed_category")
                self.processing_stats['errors'] += 1
                continue

            if dry_run:
                logger.info(f" [DRY RUN] Would assign to: {proposed_category}")
                continue

            # Get or create the category
            category_id = self.category_manager.get_or_create_category(
                site_name,
                proposed_category,
                description=f"AI-proposed category (confidence: {confidence})"
            )

            if not category_id:
                self.processing_stats['errors'] += 1
                logger.error(f" ✗ Failed to get/create category '{proposed_category}'")
                continue

            # Counts categories that were found as well as newly created ones.
            self.processing_stats['categories_created'] += 1

            # Assign post to category
            if self.category_manager.assign_post_to_category(
                site_name, post_id, category_id, append=True
            ):
                self.processing_stats['posts_updated'] += 1
                logger.info(f" ✓ Assigned to '{proposed_category}'")
            else:
                self.processing_stats['errors'] += 1
                logger.error(" ✗ Failed to assign category")

        self.processing_stats['total_posts'] = total

        # Print summary
        logger.info("\n" + "="*70)
        logger.info("PROCESSING SUMMARY")
        logger.info("="*70)
        logger.info(f"Total proposals processed: {self.processing_stats['total_posts']}")
        logger.info(f"Categories created/found: {self.processing_stats['categories_created']}")
        logger.info(f"Posts updated: {self.processing_stats['posts_updated']}")
        logger.info(f"Errors: {self.processing_stats['errors']}")

        return self.processing_stats

    def run(self, proposals_csv: str, site_name: str,
            confidence_threshold: str = 'Medium',
            strict: bool = False,
            dry_run: bool = False) -> Dict[str, int]:
        """Run complete category assignment process.

        Loads the CSV and hands the rows to ``process_proposals``. If no rows
        are loaded, an error is logged and the current stats are returned.

        Args:
            proposals_csv: Path to proposals CSV
            site_name: Site to apply changes to
            confidence_threshold: Minimum confidence to apply
            strict: If True, only match exact confidence level
            dry_run: If True, preview changes without applying

        Returns:
            Statistics dict
        """
        proposals = self.load_proposals(proposals_csv)

        if not proposals:
            logger.error("No proposals to process")
            return self.processing_stats

        return self.process_proposals(
            proposals,
            site_name,
            confidence_threshold,
            strict=strict,
            dry_run=dry_run
        )
|