diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cde1a1..9095791 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,21 +9,24 @@ This format follows Keep a Changelog principles and aims for Semantic Versioning ## [Unreleased] ### Summary -Improved performance with new card data storage format. Card queries are now significantly faster with reduced file sizes. +Theme catalog improvements with faster processing, new tag search features, and regeneration fixes. ### Added -- **Card Data Consolidation**: All card data now stored in optimized format for faster loading - - Automatic updates after tagging/setup completes - - "Rebuild Card Files" button in Setup page for manual refresh - - 87% smaller file sizes with dramatically faster queries - - Maintains multiple backup versions for safety -- **Backward Compatibility**: Existing functionality continues to work without changes +- **Theme Catalog Optimization**: + - Consolidated theme enrichment pipeline (single pass instead of 7 separate scripts) + - Tag index for fast theme-based card queries + - Tag search API with new endpoints for card search, autocomplete, and popular tags + - Commander browser theme autocomplete with keyboard navigation + - Tag loading infrastructure for batch operations ### Changed _No unreleased changes yet._ ### Fixed -_No unreleased fixes yet._ +- **Theme Regeneration**: Theme catalog can now be fully rebuilt from scratch without placeholder data + - Fixed "Anchor" placeholder issue when regenerating catalog + - Examples now generated from actual card data + - Theme export preserves all metadata fields ## [2.7.1] - 2025-10-14 ### Summary diff --git a/RELEASE_NOTES_TEMPLATE.md b/RELEASE_NOTES_TEMPLATE.md index 2590a17..c184223 100644 --- a/RELEASE_NOTES_TEMPLATE.md +++ b/RELEASE_NOTES_TEMPLATE.md @@ -1,18 +1,23 @@ # MTG Python Deckbuilder ${VERSION} ### Summary -Improved performance with new card data storage format. Card queries are now significantly faster with reduced file sizes. +Theme catalog improvements with faster processing, tag search features, and regeneration fixes. ### Added -- **Card Data Consolidation**: All card data now stored in optimized format for faster loading - - Automatic updates after tagging/setup completes - - "Rebuild Card Files" button in Setup page for manual refresh - - 87% smaller file sizes with dramatically faster queries - - Maintains multiple backup versions for safety -- **Backward Compatibility**: Existing functionality continues to work without changes +- **Theme Catalog Optimization**: + - Consolidated theme enrichment pipeline + - Tag search API for theme-based card discovery + - Commander browser theme autocomplete with keyboard navigation + - Tag index for faster queries +- **Card Data Consolidation** (from previous release): + - Optimized format with smaller file sizes + - "Rebuild Card Files" button in Setup page + - Automatic updates after tagging/setup ### Changed _No unreleased changes yet._ ### Fixed -_No unreleased fixes yet._ +- **Theme Regeneration**: Theme catalog can now be fully rebuilt from scratch + - Fixed placeholder data appearing in fresh installations + - Examples now generated from actual card data diff --git a/code/scripts/audit_protection_full_v2.py b/code/scripts/audit_protection_full_v2.py deleted file mode 100644 index a10d415..0000000 --- a/code/scripts/audit_protection_full_v2.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -Full audit of Protection-tagged cards with kindred metadata support (M2 Phase 2). 
- -Created: October 8, 2025 -Purpose: Audit and validate Protection tag precision after implementing grant detection. - Can be re-run periodically to check tagging quality. - -This script audits ALL Protection-tagged cards and categorizes them: -- Grant: Gives broad protection to other permanents YOU control -- Kindred: Gives protection to specific creature types (metadata tags) -- Mixed: Both broad and kindred/inherent -- Inherent: Only has protection itself -- ConditionalSelf: Only conditionally grants to itself -- Opponent: Grants to opponent's permanents -- Neither: False positive - -Outputs: -- m2_audit_v2.json: Full analysis with summary -- m2_audit_v2_grant.csv: Cards for main Protection tag -- m2_audit_v2_kindred.csv: Cards for kindred metadata tags -- m2_audit_v2_mixed.csv: Cards with both broad and kindred grants -- m2_audit_v2_conditional.csv: Conditional self-grants (exclude) -- m2_audit_v2_inherent.csv: Inherent protection only (exclude) -- m2_audit_v2_opponent.csv: Opponent grants (exclude) -- m2_audit_v2_neither.csv: False positives (exclude) -- m2_audit_v2_all.csv: All cards combined -""" - -import sys -from pathlib import Path -import pandas as pd -import json - -# Add project root to path -project_root = Path(__file__).parent.parent.parent -sys.path.insert(0, str(project_root)) - -from code.tagging.protection_grant_detection import ( - categorize_protection_card, - get_kindred_protection_tags, - is_granting_protection, -) - -def load_all_cards(): - """Load all cards from color/identity CSV files.""" - csv_dir = project_root / 'csv_files' - - # Get all color/identity CSVs (not the raw cards.csv) - csv_files = list(csv_dir.glob('*_cards.csv')) - csv_files = [f for f in csv_files if f.stem not in ['cards', 'testdata']] - - all_cards = [] - for csv_file in csv_files: - try: - df = pd.read_csv(csv_file) - all_cards.append(df) - except Exception as e: - print(f"Warning: Could not load {csv_file.name}: {e}") - - # Combine all DataFrames - combined = pd.concat(all_cards, ignore_index=True) - - # Drop duplicates (cards appear in multiple color files) - combined = combined.drop_duplicates(subset=['name'], keep='first') - - return combined - -def audit_all_protection_cards(): - """Audit all Protection-tagged cards.""" - print("Loading all cards...") - df = load_all_cards() - - print(f"Total cards loaded: {len(df)}") - - # Filter to Protection-tagged cards (column is 'themeTags' in color CSVs) - df_prot = df[df['themeTags'].str.contains('Protection', case=False, na=False)].copy() - - print(f"Protection-tagged cards: {len(df_prot)}") - - # Categorize each card - categories = [] - grants_list = [] - kindred_tags_list = [] - - for idx, row in df_prot.iterrows(): - name = row['name'] - text = str(row.get('text', '')).replace('\\n', '\n') # Convert escaped newlines to real newlines - keywords = str(row.get('keywords', '')) - card_type = str(row.get('type', '')) - - # Categorize with kindred exclusion enabled - category = categorize_protection_card(name, text, keywords, card_type, exclude_kindred=True) - - # Check if it grants broadly - grants_broad = is_granting_protection(text, keywords, exclude_kindred=True) - - # Get kindred tags - kindred_tags = get_kindred_protection_tags(text) - - categories.append(category) - grants_list.append(grants_broad) - kindred_tags_list.append(', '.join(sorted(kindred_tags)) if kindred_tags else '') - - df_prot['category'] = categories - df_prot['grants_broad'] = grants_list - df_prot['kindred_tags'] = kindred_tags_list - - # Generate summary (convert numpy 
types to native Python for JSON serialization) - summary = { - 'total': int(len(df_prot)), - 'categories': {k: int(v) for k, v in df_prot['category'].value_counts().to_dict().items()}, - 'grants_broad_count': int(df_prot['grants_broad'].sum()), - 'kindred_cards_count': int((df_prot['kindred_tags'] != '').sum()), - } - - # Calculate keep vs remove - keep_categories = {'Grant', 'Mixed'} - kindred_only = df_prot[df_prot['category'] == 'Kindred'] - keep_count = len(df_prot[df_prot['category'].isin(keep_categories)]) - remove_count = len(df_prot[~df_prot['category'].isin(keep_categories | {'Kindred'})]) - - summary['keep_main_tag'] = keep_count - summary['kindred_metadata'] = len(kindred_only) - summary['remove'] = remove_count - summary['precision_estimate'] = round((keep_count / len(df_prot)) * 100, 1) if len(df_prot) > 0 else 0 - - # Print summary - print(f"\n{'='*60}") - print("AUDIT SUMMARY") - print(f"{'='*60}") - print(f"Total Protection-tagged cards: {summary['total']}") - print(f"\nCategories:") - for cat, count in sorted(summary['categories'].items()): - pct = (count / summary['total']) * 100 - print(f" {cat:20s} {count:4d} ({pct:5.1f}%)") - - print(f"\n{'='*60}") - print(f"Main Protection tag: {keep_count:4d} ({keep_count/len(df_prot)*100:5.1f}%)") - print(f"Kindred metadata only: {len(kindred_only):4d} ({len(kindred_only)/len(df_prot)*100:5.1f}%)") - print(f"Remove: {remove_count:4d} ({remove_count/len(df_prot)*100:5.1f}%)") - print(f"{'='*60}") - print(f"Precision estimate: {summary['precision_estimate']}%") - print(f"{'='*60}\n") - - # Export results - output_dir = project_root / 'logs' / 'roadmaps' / 'source' / 'tagging_refinement' - output_dir.mkdir(parents=True, exist_ok=True) - - # Export JSON summary - with open(output_dir / 'm2_audit_v2.json', 'w') as f: - json.dump({ - 'summary': summary, - 'cards': df_prot[['name', 'type', 'category', 'grants_broad', 'kindred_tags', 'keywords', 'text']].to_dict(orient='records') - }, f, indent=2) - - # Export CSVs by category - export_cols = ['name', 'type', 'category', 'grants_broad', 'kindred_tags', 'keywords', 'text'] - - # Grant category - df_grant = df_prot[df_prot['category'] == 'Grant'] - df_grant[export_cols].to_csv(output_dir / 'm2_audit_v2_grant.csv', index=False) - print(f"Exported {len(df_grant)} Grant cards to m2_audit_v2_grant.csv") - - # Kindred category - df_kindred = df_prot[df_prot['category'] == 'Kindred'] - df_kindred[export_cols].to_csv(output_dir / 'm2_audit_v2_kindred.csv', index=False) - print(f"Exported {len(df_kindred)} Kindred cards to m2_audit_v2_kindred.csv") - - # Mixed category - df_mixed = df_prot[df_prot['category'] == 'Mixed'] - df_mixed[export_cols].to_csv(output_dir / 'm2_audit_v2_mixed.csv', index=False) - print(f"Exported {len(df_mixed)} Mixed cards to m2_audit_v2_mixed.csv") - - # ConditionalSelf category - df_conditional = df_prot[df_prot['category'] == 'ConditionalSelf'] - df_conditional[export_cols].to_csv(output_dir / 'm2_audit_v2_conditional.csv', index=False) - print(f"Exported {len(df_conditional)} ConditionalSelf cards to m2_audit_v2_conditional.csv") - - # Inherent category - df_inherent = df_prot[df_prot['category'] == 'Inherent'] - df_inherent[export_cols].to_csv(output_dir / 'm2_audit_v2_inherent.csv', index=False) - print(f"Exported {len(df_inherent)} Inherent cards to m2_audit_v2_inherent.csv") - - # Opponent category - df_opponent = df_prot[df_prot['category'] == 'Opponent'] - df_opponent[export_cols].to_csv(output_dir / 'm2_audit_v2_opponent.csv', index=False) - print(f"Exported 
{len(df_opponent)} Opponent cards to m2_audit_v2_opponent.csv") - - # Neither category - df_neither = df_prot[df_prot['category'] == 'Neither'] - df_neither[export_cols].to_csv(output_dir / 'm2_audit_v2_neither.csv', index=False) - print(f"Exported {len(df_neither)} Neither cards to m2_audit_v2_neither.csv") - - # All cards - df_prot[export_cols].to_csv(output_dir / 'm2_audit_v2_all.csv', index=False) - print(f"Exported {len(df_prot)} total cards to m2_audit_v2_all.csv") - - print(f"\nAll files saved to: {output_dir}") - - return df_prot, summary - -if __name__ == '__main__': - df_results, summary = audit_all_protection_cards() diff --git a/code/scripts/check_random_theme_perf.py b/code/scripts/check_random_theme_perf.py deleted file mode 100644 index 5b739e5..0000000 --- a/code/scripts/check_random_theme_perf.py +++ /dev/null @@ -1,118 +0,0 @@ -"""Opt-in guard that compares multi-theme filter performance to a stored baseline. - -Run inside the project virtual environment: - - python -m code.scripts.check_random_theme_perf --baseline config/random_theme_perf_baseline.json - -The script executes the same profiling loop as `profile_multi_theme_filter` and fails -if the observed mean or p95 timings regress more than the allowed threshold. -""" -from __future__ import annotations - -import argparse -import json -import sys -from pathlib import Path -from typing import Any, Dict, Tuple - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -DEFAULT_BASELINE = PROJECT_ROOT / "config" / "random_theme_perf_baseline.json" - -if str(PROJECT_ROOT) not in sys.path: - sys.path.append(str(PROJECT_ROOT)) - -from code.scripts.profile_multi_theme_filter import run_profile # type: ignore # noqa: E402 - - -def _load_baseline(path: Path) -> Dict[str, Any]: - if not path.exists(): - raise FileNotFoundError(f"Baseline file not found: {path}") - data = json.loads(path.read_text(encoding="utf-8")) - return data - - -def _extract(metric: Dict[str, Any], key: str) -> float: - try: - value = float(metric.get(key, 0.0)) - except Exception: - value = 0.0 - return value - - -def _check_section(name: str, actual: Dict[str, Any], baseline: Dict[str, Any], threshold: float) -> Tuple[bool, str]: - a_mean = _extract(actual, "mean_ms") - b_mean = _extract(baseline, "mean_ms") - a_p95 = _extract(actual, "p95_ms") - b_p95 = _extract(baseline, "p95_ms") - - allowed_mean = b_mean * (1.0 + threshold) - allowed_p95 = b_p95 * (1.0 + threshold) - - mean_ok = a_mean <= allowed_mean or b_mean == 0.0 - p95_ok = a_p95 <= allowed_p95 or b_p95 == 0.0 - - status = mean_ok and p95_ok - - def _format_row(label: str, actual_val: float, baseline_val: float, allowed_val: float, ok: bool) -> str: - trend = ((actual_val - baseline_val) / baseline_val * 100.0) if baseline_val else 0.0 - trend_str = f"{trend:+.1f}%" if baseline_val else "n/a" - limit_str = f"≤ {allowed_val:.3f}ms" if baseline_val else "n/a" - return f" {label:<6} actual={actual_val:.3f}ms baseline={baseline_val:.3f}ms ({trend_str}), limit {limit_str} -> {'OK' if ok else 'FAIL'}" - - rows = [f"Section: {name}"] - rows.append(_format_row("mean", a_mean, b_mean, allowed_mean, mean_ok)) - rows.append(_format_row("p95", a_p95, b_p95, allowed_p95, p95_ok)) - return status, "\n".join(rows) - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description="Check multi-theme filtering performance against a baseline") - parser.add_argument("--baseline", type=Path, default=DEFAULT_BASELINE, help="Baseline JSON file (default: 
config/random_theme_perf_baseline.json)") - parser.add_argument("--iterations", type=int, default=400, help="Number of iterations to sample (default: 400)") - parser.add_argument("--seed", type=int, default=None, help="Optional RNG seed for reproducibility") - parser.add_argument("--threshold", type=float, default=0.15, help="Allowed regression threshold as a fraction (default: 0.15 = 15%)") - parser.add_argument("--update-baseline", action="store_true", help="Overwrite the baseline file with the newly collected metrics") - args = parser.parse_args(argv) - - baseline_path = args.baseline if args.baseline else DEFAULT_BASELINE - if args.update_baseline and not baseline_path.parent.exists(): - baseline_path.parent.mkdir(parents=True, exist_ok=True) - - if not args.update_baseline: - baseline = _load_baseline(baseline_path) - else: - baseline = {} - - results = run_profile(args.iterations, args.seed) - - cascade_status, cascade_report = _check_section("cascade", results.get("cascade", {}), baseline.get("cascade", {}), args.threshold) - synergy_status, synergy_report = _check_section("synergy", results.get("synergy", {}), baseline.get("synergy", {}), args.threshold) - - print("Iterations:", results.get("iterations")) - print("Seed:", results.get("seed")) - print(cascade_report) - print(synergy_report) - - overall_ok = cascade_status and synergy_status - - if args.update_baseline: - payload = { - "iterations": results.get("iterations"), - "seed": results.get("seed"), - "cascade": results.get("cascade"), - "synergy": results.get("synergy"), - } - baseline_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") - print(f"Baseline updated → {baseline_path}") - return 0 - - if not overall_ok: - print(f"FAIL: performance regressions exceeded {args.threshold * 100:.1f}% threshold", file=sys.stderr) - return 1 - - print("PASS: performance within allowed threshold") - return 0 - - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(main()) diff --git a/code/scripts/enrich_themes.py b/code/scripts/enrich_themes.py new file mode 100644 index 0000000..a52348c --- /dev/null +++ b/code/scripts/enrich_themes.py @@ -0,0 +1,135 @@ +"""CLI wrapper for theme enrichment pipeline. + +Runs the consolidated theme enrichment pipeline with command-line options. +For backward compatibility, individual scripts can still be run separately, +but this provides a faster single-pass alternative. 
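+
+For programmatic use, a minimal sketch (assumes the repo root as the working
+directory; mirrors the call made by main() below):
+
+    from pathlib import Path
+    from code.tagging.theme_enrichment import run_enrichment_pipeline
+
+    stats = run_enrichment_pipeline(
+        root=Path('.'), min_examples=5, write=False,
+        enforce_min=False, strict=False, progress_callback=None,
+    )
+    print(stats.lint_errors)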
+ +Usage: + python code/scripts/enrich_themes.py --write + python code/scripts/enrich_themes.py --dry-run --enforce-min +""" +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path + +# Add project root to path +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +# Import after adding to path +from code.tagging.theme_enrichment import run_enrichment_pipeline # noqa: E402 + + +def main() -> int: + """Run theme enrichment pipeline from CLI.""" + parser = argparse.ArgumentParser( + description='Consolidated theme metadata enrichment pipeline', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Dry run (no changes written): + python code/scripts/enrich_themes.py --dry-run + + # Write changes: + python code/scripts/enrich_themes.py --write + + # Enforce minimum examples (errors if insufficient): + python code/scripts/enrich_themes.py --write --enforce-min + + # Strict validation for cornerstone themes: + python code/scripts/enrich_themes.py --write --strict + +Note: This replaces running 7 separate scripts (autofill, pad, cleanup, purge, +augment, suggestions, lint) with a single 5-10x faster operation. + """ + ) + + parser.add_argument( + '--write', + action='store_true', + help='Write changes to disk (default: dry run)' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Dry run mode: show what would be changed without writing' + ) + parser.add_argument( + '--min', + '--min-examples', + type=int, + default=None, + metavar='N', + help='Minimum number of example commanders (default: $EDITORIAL_MIN_EXAMPLES or 5)' + ) + parser.add_argument( + '--enforce-min', + action='store_true', + help='Treat minimum examples violations as errors' + ) + parser.add_argument( + '--strict', + action='store_true', + help='Enable strict validation (cornerstone themes must have examples)' + ) + + args = parser.parse_args() + + # Determine write mode + if args.dry_run: + write = False + elif args.write: + write = True + else: + # Default to dry run if neither specified + write = False + print("Note: Running in dry-run mode (use --write to save changes)\n") + + # Get minimum examples threshold + if args.min is not None: + min_examples = args.min + else: + min_examples = int(os.environ.get('EDITORIAL_MIN_EXAMPLES', '5')) + + print("Theme Enrichment Pipeline") + print("========================") + print(f"Mode: {'WRITE' if write else 'DRY RUN'}") + print(f"Min examples: {min_examples}") + print(f"Enforce min: {args.enforce_min}") + print(f"Strict: {args.strict}") + print() + + try: + stats = run_enrichment_pipeline( + root=ROOT, + min_examples=min_examples, + write=write, + enforce_min=args.enforce_min, + strict=args.strict, + progress_callback=None, # Use default print + ) + + # Return non-zero if there are lint errors + if stats.lint_errors > 0: + print(f"\n❌ Enrichment completed with {stats.lint_errors} error(s)") + return 1 + + print("\n✅ Enrichment completed successfully") + return 0 + + except KeyboardInterrupt: + print("\n\nInterrupted by user") + return 130 + except Exception as e: + print(f"\n❌ Error: {e}", file=sys.stderr) + if '--debug' in sys.argv: + raise + return 1 + + +if __name__ == '__main__': + raise SystemExit(main()) diff --git a/code/scripts/export_themes_to_yaml.py b/code/scripts/export_themes_to_yaml.py index 524799a..a417e53 100644 --- a/code/scripts/export_themes_to_yaml.py +++ b/code/scripts/export_themes_to_yaml.py @@ -123,6 +123,9 @@ 
def main(): enforced_set = set(enforced_synergies) inferred_synergies = [s for s in synergy_list if s not in curated_set and s not in enforced_set] + example_cards_value = entry.get('example_cards', []) + example_commanders_value = entry.get('example_commanders', []) + doc = { 'id': slug, 'display_name': theme_name, @@ -132,13 +135,40 @@ def main(): 'inferred_synergies': inferred_synergies, 'primary_color': entry.get('primary_color'), 'secondary_color': entry.get('secondary_color'), + 'example_cards': example_cards_value, + 'example_commanders': example_commanders_value, + 'synergy_example_cards': entry.get('synergy_example_cards', []), + 'synergy_commanders': entry.get('synergy_commanders', []), + 'deck_archetype': entry.get('deck_archetype'), + 'popularity_hint': entry.get('popularity_hint'), + 'popularity_bucket': entry.get('popularity_bucket'), + 'editorial_quality': entry.get('editorial_quality'), + 'description': entry.get('description'), 'notes': '' } - # Drop None color keys for cleanliness + # Drop None/empty keys for cleanliness if doc['primary_color'] is None: doc.pop('primary_color') if doc.get('secondary_color') is None: doc.pop('secondary_color') + if not doc.get('example_cards'): + doc.pop('example_cards') + if not doc.get('example_commanders'): + doc.pop('example_commanders') + if not doc.get('synergy_example_cards'): + doc.pop('synergy_example_cards') + if not doc.get('synergy_commanders'): + doc.pop('synergy_commanders') + if doc.get('deck_archetype') is None: + doc.pop('deck_archetype') + if doc.get('popularity_hint') is None: + doc.pop('popularity_hint') + if doc.get('popularity_bucket') is None: + doc.pop('popularity_bucket') + if doc.get('editorial_quality') is None: + doc.pop('editorial_quality') + if doc.get('description') is None: + doc.pop('description') with path.open('w', encoding='utf-8') as f: yaml.safe_dump(doc, f, sort_keys=False, allow_unicode=True) exported += 1 diff --git a/code/scripts/generate_theme_catalog.py b/code/scripts/generate_theme_catalog.py index 622de89..e5c7e77 100644 --- a/code/scripts/generate_theme_catalog.py +++ b/code/scripts/generate_theme_catalog.py @@ -19,6 +19,13 @@ from datetime import datetime, timezone from pathlib import Path from typing import Dict, Iterable, List, Optional, Sequence +try: + import pandas as pd + HAS_PANDAS = True +except ImportError: + HAS_PANDAS = False + pd = None # type: ignore + ROOT = Path(__file__).resolve().parents[2] CODE_ROOT = ROOT / "code" if str(CODE_ROOT) not in sys.path: @@ -29,6 +36,9 @@ try: except Exception: # pragma: no cover - fallback for adhoc execution DEFAULT_CSV_DIRECTORY = "csv_files" +# Parquet support requires pandas (imported at top of file, uses pyarrow under the hood) +HAS_PARQUET_SUPPORT = HAS_PANDAS + DEFAULT_OUTPUT_PATH = ROOT / "config" / "themes" / "theme_catalog.csv" HEADER_COMMENT_PREFIX = "# theme_catalog" @@ -87,7 +97,68 @@ def parse_theme_tags(value: object) -> List[str]: return [] +def _load_theme_counts_from_parquet( + parquet_path: Path, + theme_variants: Dict[str, set[str]] +) -> Counter[str]: + """Load theme counts from a parquet file using pandas (which uses pyarrow). 
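+
+    A minimal usage sketch (the relative path is illustrative, not required):
+
+        from collections import defaultdict
+        variants: Dict[str, set[str]] = defaultdict(set)
+        counts = _load_theme_counts_from_parquet(
+            Path("card_files/commander_cards.parquet"), variants
+        )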
+
+    Args:
+        parquet_path: Path to the parquet file (commander_cards.parquet or all_cards.parquet)
+        theme_variants: Dict to accumulate theme name variants
+
+    Returns:
+        Counter of theme occurrences
+    """
+    if pd is None:
+        return Counter()
+
+    counts: Counter[str] = Counter()
+
+    if not parquet_path.exists():
+        return counts
+
+    # Read only themeTags column for efficiency
+    try:
+        df = pd.read_parquet(parquet_path, columns=["themeTags"])
+    except Exception:
+        # If themeTags column doesn't exist, return empty
+        return counts
+
+    # Convert to list for fast iteration (faster than iterrows)
+    theme_tags_list = df["themeTags"].tolist()
+
+    for raw_value in theme_tags_list:
+        if raw_value is None or (isinstance(raw_value, float) and pd.isna(raw_value)):
+            continue
+        tags = parse_theme_tags(raw_value)
+        if not tags:
+            continue
+        seen_in_row: set[str] = set()
+        for tag in tags:
+            display = normalize_theme_display(tag)
+            if not display:
+                continue
+            key = canonical_key(display)
+            if key in seen_in_row:
+                continue
+            seen_in_row.add(key)
+            counts[key] += 1
+            theme_variants[key].add(display)
+
+    return counts
+
+
 def _load_theme_counts(csv_path: Path, theme_variants: Dict[str, set[str]]) -> Counter[str]:
+    """Load theme counts from CSV file (fallback method).
+
+    Args:
+        csv_path: Path to CSV file
+        theme_variants: Dict to accumulate theme name variants
+
+    Returns:
+        Counter of theme occurrences
+    """
     counts: Counter[str] = Counter()
     if not csv_path.exists():
         return counts
@@ -146,24 +217,67 @@ def build_theme_catalog(
     commander_filename: str = "commander_cards.csv",
     cards_filename: str = "cards.csv",
     logs_directory: Optional[Path] = None,
+    use_parquet: bool = True,
 ) -> CatalogBuildResult:
+    """Build theme catalog from card data.
+
+    Args:
+        csv_directory: Directory containing CSV files (fallback)
+        output_path: Where to write the catalog CSV
+        generated_at: Optional timestamp for generation
+        commander_filename: Name of commander CSV file
+        cards_filename: Name of cards CSV file
+        logs_directory: Optional directory to copy output to
+        use_parquet: If True, load from the dedicated parquet card files first (default: True)
+
+    Returns:
+        CatalogBuildResult with generated rows and metadata
+    """
     csv_directory = csv_directory.resolve()
     output_path = output_path.resolve()

     theme_variants: Dict[str, set[str]] = defaultdict(set)

-    commander_counts = _load_theme_counts(csv_directory / commander_filename, theme_variants)
+    # Try to use parquet file first (much faster)
+    used_parquet = False
+    if use_parquet and HAS_PARQUET_SUPPORT:
+        try:
+            # Use dedicated parquet files (matches CSV structure exactly)
+            parquet_dir = csv_directory.parent / "card_files"
+
+            # Load commander counts directly from commander_cards.parquet
+            commander_parquet = parquet_dir / "commander_cards.parquet"
+            commander_counts = _load_theme_counts_from_parquet(
+                commander_parquet, theme_variants=theme_variants
+            )
+
+            # CSV method doesn't load non-commander cards, so we don't either
+            card_counts = Counter()
+
+            used_parquet = True
+            print("✓ Loaded theme data from parquet files")
+
+        except Exception as e:
+            print(f"⚠ Failed to load from parquet: {e}")
+            print("  Falling back to CSV files...")
+            used_parquet = False
+
+    # Fallback to CSV files if parquet not available or failed
+    if not used_parquet:
+        commander_counts = _load_theme_counts(csv_directory / commander_filename, theme_variants)

-    card_counts: Counter[str] = Counter()
-    cards_path = csv_directory / cards_filename
-    if cards_path.exists():
-        card_counts = _load_theme_counts(cards_path, 
theme_variants) - else: - # Fallback: scan all *_cards.csv except commander - for candidate in csv_directory.glob("*_cards.csv"): - if candidate.name == commander_filename: - continue - card_counts += _load_theme_counts(candidate, theme_variants) + card_counts: Counter[str] = Counter() + cards_path = csv_directory / cards_filename + if cards_path.exists(): + card_counts = _load_theme_counts(cards_path, theme_variants) + else: + # Fallback: scan all *_cards.csv except commander + for candidate in csv_directory.glob("*_cards.csv"): + if candidate.name == commander_filename: + continue + card_counts += _load_theme_counts(candidate, theme_variants) + + print("✓ Loaded theme data from CSV files") keys = sorted(set(card_counts.keys()) | set(commander_counts.keys())) generated_at_iso = _derive_generated_at(generated_at) diff --git a/code/scripts/preview_dfc_catalog_diff.py b/code/scripts/preview_dfc_catalog_diff.py deleted file mode 100644 index 6e791d1..0000000 --- a/code/scripts/preview_dfc_catalog_diff.py +++ /dev/null @@ -1,305 +0,0 @@ -"""Catalog diff helper for verifying multi-face merge output. - -This utility regenerates the card CSV catalog (optionally writing compatibility -snapshots) and then compares the merged outputs against the baseline snapshots. -It is intended to support the MDFC rollout checklist by providing a concise summary -of how many rows were merged, which cards collapsed into a single record, and -whether any tag unions diverge from expectations. - -Example usage (from repo root, inside virtualenv): - - python -m code.scripts.preview_dfc_catalog_diff --compat-snapshot --output logs/dfc_catalog_diff.json - -The script prints a human readable summary to stdout and optionally writes a JSON -artifact for release/staging review. -""" -from __future__ import annotations - -import argparse -import ast -import importlib -import json -import os -import sys -import time -from collections import Counter -from pathlib import Path -from typing import Any, Dict, Iterable, List, Sequence - -import pandas as pd - -from settings import COLORS, CSV_DIRECTORY - -DEFAULT_COMPAT_DIR = Path(os.getenv("DFC_COMPAT_DIR", "csv_files/compat_faces")) -CSV_ROOT = Path(CSV_DIRECTORY) - - -def _parse_list_cell(value: Any) -> List[str]: - """Convert serialized list cells ("['A', 'B']") into Python lists.""" - if isinstance(value, list): - return [str(item) for item in value] - if value is None: - return [] - if isinstance(value, float) and pd.isna(value): # type: ignore[arg-type] - return [] - text = str(value).strip() - if not text: - return [] - try: - parsed = ast.literal_eval(text) - except (SyntaxError, ValueError): - return [text] - if isinstance(parsed, list): - return [str(item) for item in parsed] - return [str(parsed)] - - -def _load_catalog(path: Path) -> pd.DataFrame: - if not path.exists(): - raise FileNotFoundError(f"Catalog file missing: {path}") - df = pd.read_csv(path) - for column in ("themeTags", "keywords", "creatureTypes"): - if column in df.columns: - df[column] = df[column].apply(_parse_list_cell) - return df - - -def _multi_face_names(df: pd.DataFrame) -> List[str]: - counts = Counter(df.get("name", [])) - return [name for name, count in counts.items() if isinstance(name, str) and count > 1] - - -def _collect_tags(series: Iterable[List[str]]) -> List[str]: - tags: List[str] = [] - for value in series: - if isinstance(value, list): - tags.extend(str(item) for item in value) - return sorted(set(tags)) - - -def _summarize_color( - color: str, - merged: pd.DataFrame, - baseline: 
pd.DataFrame, - sample_size: int, -) -> Dict[str, Any]: - merged_names = set(merged.get("name", [])) - baseline_names = list(baseline.get("name", [])) - baseline_name_set = set(name for name in baseline_names if isinstance(name, str)) - - multi_face = _multi_face_names(baseline) - collapsed = [] - tag_mismatches: List[str] = [] - missing_after_merge: List[str] = [] - - for name in multi_face: - group = baseline[baseline["name"] == name] - merged_row = merged[merged["name"] == name] - if merged_row.empty: - missing_after_merge.append(name) - continue - expected_tags = _collect_tags(group["themeTags"]) if "themeTags" in group else [] - merged_tags = _collect_tags(merged_row.iloc[[0]]["themeTags"]) if "themeTags" in merged_row else [] - if expected_tags != merged_tags: - tag_mismatches.append(name) - collapsed.append(name) - - removed_names = sorted(baseline_name_set - merged_names) - added_names = sorted(merged_names - baseline_name_set) - - return { - "rows_merged": len(merged), - "rows_baseline": len(baseline), - "row_delta": len(merged) - len(baseline), - "multi_face_groups": len(multi_face), - "collapsed_sample": collapsed[:sample_size], - "tag_union_mismatches": tag_mismatches[:sample_size], - "missing_after_merge": missing_after_merge[:sample_size], - "removed_names": removed_names[:sample_size], - "added_names": added_names[:sample_size], - } - - -def _refresh_catalog(colors: Sequence[str], compat_snapshot: bool) -> None: - os.environ.pop("ENABLE_DFC_MERGE", None) - os.environ["DFC_COMPAT_SNAPSHOT"] = "1" if compat_snapshot else "0" - importlib.invalidate_caches() - # Reload tagger to pick up the new env var - tagger = importlib.import_module("code.tagging.tagger") - tagger = importlib.reload(tagger) # type: ignore[assignment] - - for color in colors: - tagger.load_dataframe(color) - - -def generate_diff( - colors: Sequence[str], - compat_dir: Path, - sample_size: int, -) -> Dict[str, Any]: - per_color: Dict[str, Any] = {} - overall = { - "total_rows_merged": 0, - "total_rows_baseline": 0, - "total_multi_face_groups": 0, - "colors": len(colors), - "tag_union_mismatches": 0, - "missing_after_merge": 0, - } - - for color in colors: - merged_path = CSV_ROOT / f"{color}_cards.csv" - baseline_path = compat_dir / f"{color}_cards_unmerged.csv" - merged_df = _load_catalog(merged_path) - baseline_df = _load_catalog(baseline_path) - summary = _summarize_color(color, merged_df, baseline_df, sample_size) - per_color[color] = summary - overall["total_rows_merged"] += summary["rows_merged"] - overall["total_rows_baseline"] += summary["rows_baseline"] - overall["total_multi_face_groups"] += summary["multi_face_groups"] - overall["tag_union_mismatches"] += len(summary["tag_union_mismatches"]) - overall["missing_after_merge"] += len(summary["missing_after_merge"]) - - overall["row_delta_total"] = overall["total_rows_merged"] - overall["total_rows_baseline"] - return {"overall": overall, "per_color": per_color} - - -def main(argv: List[str]) -> int: - parser = argparse.ArgumentParser(description="Preview merged vs baseline DFC catalog diff") - parser.add_argument( - "--skip-refresh", - action="store_true", - help="Skip rebuilding the catalog in compatibility mode (requires existing compat snapshots)", - ) - parser.add_argument( - "--mode", - default="", - help="[Deprecated] Legacy ENABLE_DFC_MERGE value (compat|1|0 etc.)", - ) - parser.add_argument( - "--compat-snapshot", - dest="compat_snapshot", - action="store_true", - help="Write compatibility snapshots before diffing (default: off unless legacy 
--mode compat)", - ) - parser.add_argument( - "--no-compat-snapshot", - dest="compat_snapshot", - action="store_false", - help="Skip compatibility snapshots even if legacy --mode compat is supplied", - ) - parser.set_defaults(compat_snapshot=None) - parser.add_argument( - "--colors", - nargs="*", - help="Optional subset of colors to diff (defaults to full COLORS list)", - ) - parser.add_argument( - "--compat-dir", - type=Path, - default=DEFAULT_COMPAT_DIR, - help="Directory containing unmerged compatibility snapshots (default: %(default)s)", - ) - parser.add_argument( - "--output", - type=Path, - help="Optional JSON file to write with the diff summary", - ) - parser.add_argument( - "--sample-size", - type=int, - default=10, - help="Number of sample entries to include per section (default: %(default)s)", - ) - args = parser.parse_args(argv) - - colors = tuple(args.colors) if args.colors else tuple(COLORS) - compat_dir = args.compat_dir - - mode = str(args.mode or "").strip().lower() - if mode and mode not in {"compat", "dual", "both", "1", "on", "true", "0", "off", "false", "disabled"}: - print( - f"ℹ Legacy --mode value '{mode}' detected; merge remains enabled. Use --compat-snapshot as needed.", - flush=True, - ) - - if args.compat_snapshot is None: - compat_snapshot = mode in {"compat", "dual", "both"} - else: - compat_snapshot = args.compat_snapshot - if mode: - print( - "ℹ Ignoring deprecated --mode value because --compat-snapshot/--no-compat-snapshot was supplied.", - flush=True, - ) - - if mode in {"0", "off", "false", "disabled"}: - print( - "⚠ ENABLE_DFC_MERGE=off is deprecated; the merge remains enabled regardless of the value.", - flush=True, - ) - - if not args.skip_refresh: - start = time.perf_counter() - _refresh_catalog(colors, compat_snapshot) - duration = time.perf_counter() - start - snapshot_msg = "with compat snapshot" if compat_snapshot else "merged-only" - print(f"✔ Refreshed catalog in {duration:.1f}s ({snapshot_msg})") - else: - print("ℹ Using existing catalog outputs (refresh skipped)") - - try: - diff = generate_diff(colors, compat_dir, args.sample_size) - except FileNotFoundError as exc: - print(f"ERROR: {exc}") - print("Run without --skip-refresh (or ensure compat snapshots exist).", file=sys.stderr) - return 2 - - overall = diff["overall"] - print("\n=== DFC Catalog Diff Summary ===") - print( - f"Merged rows: {overall['total_rows_merged']:,} | Baseline rows: {overall['total_rows_baseline']:,} | " - f"Δ rows: {overall['row_delta_total']:,}" - ) - print( - f"Multi-face groups: {overall['total_multi_face_groups']:,} | " - f"Tag union mismatches: {overall['tag_union_mismatches']} | Missing after merge: {overall['missing_after_merge']}" - ) - - for color, summary in diff["per_color"].items(): - print(f"\n[{color}] baseline={summary['rows_baseline']} merged={summary['rows_merged']} Δ={summary['row_delta']}") - if summary["multi_face_groups"]: - print(f" multi-face groups: {summary['multi_face_groups']}") - if summary["collapsed_sample"]: - sample = ", ".join(summary["collapsed_sample"][:3]) - print(f" collapsed sample: {sample}") - if summary["tag_union_mismatches"]: - print(f" TAG MISMATCH sample: {', '.join(summary['tag_union_mismatches'])}") - if summary["missing_after_merge"]: - print(f" MISSING sample: {', '.join(summary['missing_after_merge'])}") - if summary["removed_names"]: - print(f" removed sample: {', '.join(summary['removed_names'])}") - if summary["added_names"]: - print(f" added sample: {', '.join(summary['added_names'])}") - - if args.output: - payload 
= { - "captured_at": int(time.time()), - "mode": args.mode, - "colors": colors, - "compat_dir": str(compat_dir), - "summary": diff, - } - try: - args.output.parent.mkdir(parents=True, exist_ok=True) - args.output.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") - print(f"\n📄 Wrote JSON summary to {args.output}") - except Exception as exc: # pragma: no cover - print(f"Failed to write output file {args.output}: {exc}", file=sys.stderr) - return 3 - - return 0 - - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(main(sys.argv[1:])) diff --git a/code/scripts/preview_metrics_snapshot.py b/code/scripts/preview_metrics_snapshot.py deleted file mode 100644 index ba54bba..0000000 --- a/code/scripts/preview_metrics_snapshot.py +++ /dev/null @@ -1,105 +0,0 @@ -"""CLI utility: snapshot preview metrics and emit summary/top slow themes. - -Usage (from repo root virtualenv): - python -m code.scripts.preview_metrics_snapshot --limit 10 --output logs/preview_metrics_snapshot.json - -Fetches /themes/metrics (requires WEB_THEME_PICKER_DIAGNOSTICS=1) and writes a compact JSON plus -human-readable summary to stdout. -""" -from __future__ import annotations - -import argparse -import json -import sys -import time -from pathlib import Path -from typing import Any, Dict - -import urllib.request -import urllib.error - -DEFAULT_URL = "http://localhost:8000/themes/metrics" - - -def fetch_metrics(url: str) -> Dict[str, Any]: - req = urllib.request.Request(url, headers={"Accept": "application/json"}) - with urllib.request.urlopen(req, timeout=10) as resp: # nosec B310 (local trusted) - data = resp.read().decode("utf-8", "replace") - try: - return json.loads(data) # type: ignore[return-value] - except json.JSONDecodeError as e: # pragma: no cover - unlikely if server OK - raise SystemExit(f"Invalid JSON from metrics endpoint: {e}\nRaw: {data[:400]}") - - -def summarize(metrics: Dict[str, Any], top_n: int) -> Dict[str, Any]: - preview = (metrics.get("preview") or {}) if isinstance(metrics, dict) else {} - per_theme = preview.get("per_theme") or {} - # Compute top slow themes by avg_ms - items = [] - for slug, info in per_theme.items(): - if not isinstance(info, dict): - continue - avg = info.get("avg_ms") - if isinstance(avg, (int, float)): - items.append((slug, float(avg), info)) - items.sort(key=lambda x: x[1], reverse=True) - top = items[:top_n] - return { - "preview_requests": preview.get("preview_requests"), - "preview_cache_hits": preview.get("preview_cache_hits"), - "preview_avg_build_ms": preview.get("preview_avg_build_ms"), - "preview_p95_build_ms": preview.get("preview_p95_build_ms"), - "preview_ttl_seconds": preview.get("preview_ttl_seconds"), - "editorial_curated_vs_sampled_pct": preview.get("editorial_curated_vs_sampled_pct"), - "top_slowest": [ - { - "slug": slug, - "avg_ms": avg, - "p95_ms": info.get("p95_ms"), - "builds": info.get("builds"), - "requests": info.get("requests"), - "avg_curated_pct": info.get("avg_curated_pct"), - } - for slug, avg, info in top - ], - } - - -def main(argv: list[str]) -> int: - ap = argparse.ArgumentParser(description="Snapshot preview metrics") - ap.add_argument("--url", default=DEFAULT_URL, help="Metrics endpoint URL (default: %(default)s)") - ap.add_argument("--limit", type=int, default=10, help="Top N slow themes to include (default: %(default)s)") - ap.add_argument("--output", type=Path, help="Optional output JSON file for snapshot") - ap.add_argument("--quiet", action="store_true", help="Suppress stdout summary (still writes 
file if --output)") - args = ap.parse_args(argv) - - try: - raw = fetch_metrics(args.url) - except urllib.error.URLError as e: - print(f"ERROR: Failed fetching metrics endpoint: {e}", file=sys.stderr) - return 2 - - summary = summarize(raw, args.limit) - snapshot = { - "captured_at": int(time.time()), - "source": args.url, - "summary": summary, - } - - if args.output: - try: - args.output.parent.mkdir(parents=True, exist_ok=True) - args.output.write_text(json.dumps(snapshot, indent=2, sort_keys=True), encoding="utf-8") - except Exception as e: # pragma: no cover - print(f"ERROR: writing snapshot file failed: {e}", file=sys.stderr) - return 3 - - if not args.quiet: - print("Preview Metrics Snapshot:") - print(json.dumps(summary, indent=2)) - - return 0 - - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(main(sys.argv[1:])) diff --git a/code/scripts/preview_perf_benchmark.py b/code/scripts/preview_perf_benchmark.py deleted file mode 100644 index f1e60ed..0000000 --- a/code/scripts/preview_perf_benchmark.py +++ /dev/null @@ -1,349 +0,0 @@ -"""Ad-hoc performance benchmark for theme preview build latency (Phase A validation). - -Runs warm-up plus measured request loops against several theme slugs and prints -aggregate latency stats (p50/p90/p95, cache hit ratio evolution). Intended to -establish or validate that refactor did not introduce >5% p95 regression. - -Usage (ensure server running locally – commonly :8080 in docker compose): - python -m code.scripts.preview_perf_benchmark --themes 8 --loops 40 \ - --url http://localhost:8080 --warm 1 --limit 12 - -Theme slug discovery hierarchy (when --theme not provided): - 1. Try /themes/index.json (legacy / planned static index) - 2. Fallback to /themes/api/themes (current API) and take the first N ids -The discovered slugs are sorted deterministically then truncated to N. - -NOTE: This is intentionally minimal (no external deps). For stable comparisons -run with identical parameters pre/post-change and commit the JSON output under -logs/perf/. -""" -from __future__ import annotations - -import argparse -import json -import statistics -import time -from typing import Any, Dict, List -import urllib.request -import urllib.error -import sys -from pathlib import Path - - -def _fetch_json(url: str) -> Dict[str, Any]: - req = urllib.request.Request(url, headers={"Accept": "application/json"}) - with urllib.request.urlopen(req, timeout=15) as resp: # nosec B310 local dev - data = resp.read().decode("utf-8", "replace") - return json.loads(data) # type: ignore[return-value] - - -def _fetch_json_with_retry(url: str, attempts: int = 3, delay: float = 0.6) -> Dict[str, Any]: - last_error: Exception | None = None - for attempt in range(1, attempts + 1): - try: - return _fetch_json(url) - except Exception as exc: # pragma: no cover - network variability - last_error = exc - if attempt < attempts: - print(json.dumps({ # noqa: T201 - "event": "preview_perf_fetch_retry", - "url": url, - "attempt": attempt, - "max_attempts": attempts, - "error": str(exc), - })) - time.sleep(delay * attempt) - else: - raise - raise last_error # pragma: no cover - defensive; should be unreachable - - -def select_theme_slugs(base_url: str, count: int) -> List[str]: - """Discover theme slugs for benchmarking. - - Attempts legacy static index first, then falls back to live API listing. 
- """ - errors: List[str] = [] - slugs: List[str] = [] - # Attempt 1: legacy /themes/index.json - try: - idx = _fetch_json(f"{base_url.rstrip('/')}/themes/index.json") - entries = idx.get("themes") or [] - for it in entries: - if not isinstance(it, dict): - continue - slug = it.get("slug") or it.get("id") or it.get("theme_id") - if isinstance(slug, str): - slugs.append(slug) - except Exception as e: # pragma: no cover - network variability - errors.append(f"index.json failed: {e}") - - if not slugs: - # Attempt 2: live API listing - try: - listing = _fetch_json(f"{base_url.rstrip('/')}/themes/api/themes") - items = listing.get("items") or [] - for it in items: - if not isinstance(it, dict): - continue - tid = it.get("id") or it.get("slug") or it.get("theme_id") - if isinstance(tid, str): - slugs.append(tid) - except Exception as e: # pragma: no cover - network variability - errors.append(f"api/themes failed: {e}") - - slugs = sorted(set(slugs))[:count] - if not slugs: - raise SystemExit("No theme slugs discovered; cannot benchmark (" + "; ".join(errors) + ")") - return slugs - - -def fetch_all_theme_slugs(base_url: str, page_limit: int = 200) -> List[str]: - """Fetch all theme slugs via paginated /themes/api/themes endpoint. - - Uses maximum page size (200) and iterates using offset until no next page. - Returns deterministic sorted unique list of slugs. - """ - slugs: List[str] = [] - offset = 0 - seen: set[str] = set() - page_attempts = 5 - page_delay = 1.2 - while True: - url = f"{base_url.rstrip('/')}/themes/api/themes?limit={page_limit}&offset={offset}" - data: Dict[str, Any] | None = None - last_error: Exception | None = None - for attempt in range(1, page_attempts + 1): - try: - data = _fetch_json_with_retry(url, attempts=4, delay=0.75) - break - except Exception as exc: # pragma: no cover - network variability - last_error = exc - if attempt < page_attempts: - print(json.dumps({ # noqa: T201 - "event": "preview_perf_page_retry", - "offset": offset, - "attempt": attempt, - "max_attempts": page_attempts, - "error": str(exc), - })) - time.sleep(page_delay * attempt) - else: - raise SystemExit(f"Failed fetching themes page offset={offset}: {exc}") - if data is None: # pragma: no cover - defensive - raise SystemExit(f"Failed fetching themes page offset={offset}: {last_error}") - items = data.get("items") or [] - for it in items: - if not isinstance(it, dict): - continue - tid = it.get("id") or it.get("slug") or it.get("theme_id") - if isinstance(tid, str) and tid not in seen: - seen.add(tid) - slugs.append(tid) - next_offset = data.get("next_offset") - if not next_offset or next_offset == offset: - break - offset = int(next_offset) - return sorted(slugs) - - -def percentile(values: List[float], pct: float) -> float: - if not values: - return 0.0 - sv = sorted(values) - k = (len(sv) - 1) * pct - f = int(k) - c = min(f + 1, len(sv) - 1) - if f == c: - return sv[f] - d0 = sv[f] * (c - k) - d1 = sv[c] * (k - f) - return d0 + d1 - - -def run_loop(base_url: str, slugs: List[str], loops: int, limit: int, warm: bool, path_template: str) -> Dict[str, Any]: - latencies: List[float] = [] - per_slug_counts = {s: 0 for s in slugs} - t_start = time.time() - for i in range(loops): - slug = slugs[i % len(slugs)] - # path_template may contain {slug} and {limit} - try: - rel = path_template.format(slug=slug, limit=limit) - except Exception: - rel = f"/themes/api/theme/{slug}/preview?limit={limit}" - if not rel.startswith('/'): - rel = '/' + rel - url = f"{base_url.rstrip('/')}{rel}" - t0 = time.time() - 
try: - _fetch_json(url) - except Exception as e: - print(json.dumps({"event": "perf_benchmark_error", "slug": slug, "error": str(e)})) # noqa: T201 - continue - ms = (time.time() - t0) * 1000.0 - latencies.append(ms) - per_slug_counts[slug] += 1 - elapsed = time.time() - t_start - return { - "warm": warm, - "loops": loops, - "slugs": slugs, - "per_slug_requests": per_slug_counts, - "elapsed_s": round(elapsed, 3), - "p50_ms": round(percentile(latencies, 0.50), 2), - "p90_ms": round(percentile(latencies, 0.90), 2), - "p95_ms": round(percentile(latencies, 0.95), 2), - "avg_ms": round(statistics.mean(latencies), 2) if latencies else 0.0, - "count": len(latencies), - "_latencies": latencies, # internal (removed in final result unless explicitly retained) - } - - -def _stats_from_latencies(latencies: List[float]) -> Dict[str, Any]: - if not latencies: - return {"count": 0, "p50_ms": 0.0, "p90_ms": 0.0, "p95_ms": 0.0, "avg_ms": 0.0} - return { - "count": len(latencies), - "p50_ms": round(percentile(latencies, 0.50), 2), - "p90_ms": round(percentile(latencies, 0.90), 2), - "p95_ms": round(percentile(latencies, 0.95), 2), - "avg_ms": round(statistics.mean(latencies), 2), - } - - -def main(argv: List[str]) -> int: - ap = argparse.ArgumentParser(description="Theme preview performance benchmark") - ap.add_argument("--url", default="http://localhost:8000", help="Base server URL (default: %(default)s)") - ap.add_argument("--themes", type=int, default=6, help="Number of theme slugs to exercise (default: %(default)s)") - ap.add_argument("--loops", type=int, default=60, help="Total request iterations (default: %(default)s)") - ap.add_argument("--limit", type=int, default=12, help="Preview size (default: %(default)s)") - ap.add_argument("--path-template", default="/themes/api/theme/{slug}/preview?limit={limit}", help="Format string for preview request path (default: %(default)s)") - ap.add_argument("--theme", action="append", dest="explicit_theme", help="Explicit theme slug(s); overrides automatic selection") - ap.add_argument("--warm", type=int, default=1, help="Number of warm-up loops (full cycles over selected slugs) (default: %(default)s)") - ap.add_argument("--output", type=Path, help="Optional JSON output path (committed under logs/perf)") - ap.add_argument("--all", action="store_true", help="Exercise ALL themes (ignores --themes; loops auto-set to passes*total_slugs unless --loops-explicit)") - ap.add_argument("--passes", type=int, default=1, help="When using --all, number of passes over the full theme set (default: %(default)s)") - # Hidden flag to detect if user explicitly set --loops (argparse has no direct support, so use sentinel technique) - # We keep original --loops for backwards compatibility; when --all we recompute unless user passed --loops-explicit - ap.add_argument("--loops-explicit", action="store_true", help=argparse.SUPPRESS) - ap.add_argument("--extract-warm-baseline", type=Path, help="If multi-pass (--all --passes >1), write a warm-only baseline JSON (final pass stats) to this path") - args = ap.parse_args(argv) - - try: - if args.explicit_theme: - slugs = args.explicit_theme - elif args.all: - slugs = fetch_all_theme_slugs(args.url) - else: - slugs = select_theme_slugs(args.url, args.themes) - except SystemExit as e: # pragma: no cover - dependency on live server - print(str(e), file=sys.stderr) - return 2 - - mode = "all" if args.all else "subset" - total_slugs = len(slugs) - if args.all and not args.loops_explicit: - # Derive loops = passes * total_slugs - args.loops = max(1, 
args.passes) * total_slugs - - print(json.dumps({ # noqa: T201 - "event": "preview_perf_start", - "mode": mode, - "total_slugs": total_slugs, - "planned_loops": args.loops, - "passes": args.passes if args.all else None, - })) - - # Execution paths: - # 1. Standard subset or single-pass all: warm cycles -> single measured run - # 2. Multi-pass all mode (--all --passes >1): iterate passes capturing per-pass stats (no separate warm loops) - if args.all and args.passes > 1: - pass_results: List[Dict[str, Any]] = [] - combined_latencies: List[float] = [] - t0_all = time.time() - for p in range(1, args.passes + 1): - r = run_loop(args.url, slugs, len(slugs), args.limit, warm=(p == 1), path_template=args.path_template) - lat = r.pop("_latencies", []) - combined_latencies.extend(lat) - pass_result = { - "pass": p, - "warm": r["warm"], - "elapsed_s": r["elapsed_s"], - "p50_ms": r["p50_ms"], - "p90_ms": r["p90_ms"], - "p95_ms": r["p95_ms"], - "avg_ms": r["avg_ms"], - "count": r["count"], - } - pass_results.append(pass_result) - total_elapsed = round(time.time() - t0_all, 3) - aggregate = _stats_from_latencies(combined_latencies) - result = { - "mode": mode, - "total_slugs": total_slugs, - "passes": args.passes, - "slugs": slugs, - "combined": { - **aggregate, - "elapsed_s": total_elapsed, - }, - "passes_results": pass_results, - "cold_pass_p95_ms": pass_results[0]["p95_ms"], - "warm_pass_p95_ms": pass_results[-1]["p95_ms"], - "cold_pass_p50_ms": pass_results[0]["p50_ms"], - "warm_pass_p50_ms": pass_results[-1]["p50_ms"], - } - print(json.dumps({"event": "preview_perf_result", **result}, indent=2)) # noqa: T201 - # Optional warm baseline extraction (final pass only; represents warmed steady-state) - if args.extract_warm_baseline: - try: - wb = pass_results[-1] - warm_obj = { - "event": "preview_perf_warm_baseline", - "mode": mode, - "total_slugs": total_slugs, - "warm_baseline": True, - "source_pass": wb["pass"], - "p50_ms": wb["p50_ms"], - "p90_ms": wb["p90_ms"], - "p95_ms": wb["p95_ms"], - "avg_ms": wb["avg_ms"], - "count": wb["count"], - "slugs": slugs, - } - args.extract_warm_baseline.parent.mkdir(parents=True, exist_ok=True) - args.extract_warm_baseline.write_text(json.dumps(warm_obj, indent=2, sort_keys=True), encoding="utf-8") - print(json.dumps({ # noqa: T201 - "event": "preview_perf_warm_baseline_written", - "path": str(args.extract_warm_baseline), - "p95_ms": wb["p95_ms"], - })) - except Exception as e: # pragma: no cover - print(json.dumps({"event": "preview_perf_warm_baseline_error", "error": str(e)})) # noqa: T201 - else: - # Warm-up loops first (if requested) - for w in range(args.warm): - run_loop(args.url, slugs, len(slugs), args.limit, warm=True, path_template=args.path_template) - result = run_loop(args.url, slugs, args.loops, args.limit, warm=False, path_template=args.path_template) - result.pop("_latencies", None) - result["slugs"] = slugs - result["mode"] = mode - result["total_slugs"] = total_slugs - if args.all: - result["passes"] = args.passes - print(json.dumps({"event": "preview_perf_result", **result}, indent=2)) # noqa: T201 - - if args.output: - try: - args.output.parent.mkdir(parents=True, exist_ok=True) - # Ensure we write the final result object (multi-pass already prepared above) - args.output.write_text(json.dumps(result, indent=2, sort_keys=True), encoding="utf-8") - except Exception as e: # pragma: no cover - print(f"ERROR: failed writing output file: {e}", file=sys.stderr) - return 3 - return 0 - - -if __name__ == "__main__": # pragma: no cover - raise 
SystemExit(main(sys.argv[1:])) diff --git a/code/scripts/preview_perf_ci_check.py b/code/scripts/preview_perf_ci_check.py deleted file mode 100644 index 5550e4b..0000000 --- a/code/scripts/preview_perf_ci_check.py +++ /dev/null @@ -1,106 +0,0 @@ -"""CI helper: run a warm-pass benchmark candidate (single pass over all themes) -then compare against the committed warm baseline with threshold enforcement. - -Intended usage (example): - python -m code.scripts.preview_perf_ci_check --url http://localhost:8080 \ - --baseline logs/perf/theme_preview_warm_baseline.json --p95-threshold 5 - -Exit codes: - 0 success (within threshold) - 2 regression (p95 delta > threshold) - 3 setup / usage error - -Notes: -- Uses --all --passes 1 to create a fresh candidate snapshot that approximates - a warmed steady-state (server should have background refresh / typical load). -- If you prefer multi-pass then warm-only selection, adjust logic accordingly. -""" -from __future__ import annotations - -import argparse -import json -import subprocess -import sys -import time -import urllib.error -import urllib.request -from pathlib import Path -def _wait_for_service(base_url: str, attempts: int = 12, delay: float = 1.5) -> bool: - health_url = base_url.rstrip("/") + "/healthz" - last_error: Exception | None = None - for attempt in range(1, attempts + 1): - try: - with urllib.request.urlopen(health_url, timeout=5) as resp: # nosec B310 local CI - if 200 <= resp.status < 300: - return True - except urllib.error.HTTPError as exc: - last_error = exc - if 400 <= exc.code < 500 and exc.code != 429: - # Treat permanent client errors (other than rate limit) as fatal - break - except Exception as exc: # pragma: no cover - network variability - last_error = exc - time.sleep(delay * attempt) - print(json.dumps({ - "event": "ci_perf_error", - "stage": "startup", - "message": "Service health check failed", - "url": health_url, - "attempts": attempts, - "error": str(last_error) if last_error else None, - })) - return False - -def run(cmd: list[str]) -> subprocess.CompletedProcess: - return subprocess.run(cmd, capture_output=True, text=True, check=False) - -def main(argv: list[str]) -> int: - ap = argparse.ArgumentParser(description="Preview performance CI regression gate") - ap.add_argument("--url", default="http://localhost:8080", help="Base URL of running web service") - ap.add_argument("--baseline", type=Path, required=True, help="Path to committed warm baseline JSON") - ap.add_argument("--p95-threshold", type=float, default=5.0, help="Max allowed p95 regression percent (default: %(default)s)") - ap.add_argument("--candidate-output", type=Path, default=Path("logs/perf/theme_preview_ci_candidate.json"), help="Where to write candidate benchmark JSON") - ap.add_argument("--multi-pass", action="store_true", help="Run a 2-pass all-themes benchmark and compare warm pass only (optional enhancement)") - args = ap.parse_args(argv) - - if not args.baseline.exists(): - print(json.dumps({"event":"ci_perf_error","message":"Baseline not found","path":str(args.baseline)})) - return 3 - - if not _wait_for_service(args.url): - return 3 - - # Run candidate single-pass all-themes benchmark (no extra warm cycles to keep CI fast) - # If multi-pass requested, run two passes over all themes so second pass represents warmed steady-state. 
- passes = "2" if args.multi_pass else "1" - bench_cmd = [sys.executable, "-m", "code.scripts.preview_perf_benchmark", "--url", args.url, "--all", "--passes", passes, "--output", str(args.candidate_output)] - bench_proc = run(bench_cmd) - if bench_proc.returncode != 0: - print(json.dumps({"event":"ci_perf_error","stage":"benchmark","code":bench_proc.returncode,"stderr":bench_proc.stderr})) - return 3 - print(bench_proc.stdout) - - if not args.candidate_output.exists(): - print(json.dumps({"event":"ci_perf_error","message":"Candidate output missing"})) - return 3 - - compare_cmd = [ - sys.executable, - "-m","code.scripts.preview_perf_compare", - "--baseline", str(args.baseline), - "--candidate", str(args.candidate_output), - "--warm-only", - "--p95-threshold", str(args.p95_threshold), - ] - cmp_proc = run(compare_cmd) - print(cmp_proc.stdout) - if cmp_proc.returncode == 2: - # Already printed JSON with failure status - return 2 - if cmp_proc.returncode != 0: - print(json.dumps({"event":"ci_perf_error","stage":"compare","code":cmp_proc.returncode,"stderr":cmp_proc.stderr})) - return 3 - return 0 - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(main(sys.argv[1:])) diff --git a/code/scripts/preview_perf_compare.py b/code/scripts/preview_perf_compare.py deleted file mode 100644 index e177e4c..0000000 --- a/code/scripts/preview_perf_compare.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Compare two preview benchmark JSON result files and emit delta stats. - -Usage: - python -m code.scripts.preview_perf_compare --baseline logs/perf/theme_preview_baseline_all_pass1_20250923.json --candidate logs/perf/new_run.json - -Outputs JSON with percentage deltas for p50/p90/p95/avg (positive = regression/slower). -If multi-pass structures are present (combined & passes_results) those are included. 
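-
-Illustrative abridged output (hypothetical numbers):
-
-    {"event": "preview_perf_compare",
-     "p95_ms": {"baseline": 50.0, "candidate": 52.0, "delta_pct": 4.0},
-     "result": "pass"}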
-""" -from __future__ import annotations - -import argparse -import json -from pathlib import Path -from typing import Any, Dict - - -def load(path: Path) -> Dict[str, Any]: - data = json.loads(path.read_text(encoding="utf-8")) - # Multi-pass result may store stats under combined - if "combined" in data: - core = data["combined"].copy() - # Inject representative fields for uniform comparison - core["p50_ms"] = core.get("p50_ms") or data.get("p50_ms") - core["p90_ms"] = core.get("p90_ms") or data.get("p90_ms") - core["p95_ms"] = core.get("p95_ms") or data.get("p95_ms") - core["avg_ms"] = core.get("avg_ms") or data.get("avg_ms") - data["_core_stats"] = core - else: - data["_core_stats"] = { - k: data.get(k) for k in ("p50_ms", "p90_ms", "p95_ms", "avg_ms", "count") - } - return data - - -def pct_delta(new: float, old: float) -> float: - if old == 0: - return 0.0 - return round(((new - old) / old) * 100.0, 2) - - -def compare(baseline: Dict[str, Any], candidate: Dict[str, Any]) -> Dict[str, Any]: - b = baseline["_core_stats"] - c = candidate["_core_stats"] - result = {"baseline_count": b.get("count"), "candidate_count": c.get("count")} - for k in ("p50_ms", "p90_ms", "p95_ms", "avg_ms"): - if b.get(k) is not None and c.get(k) is not None: - result[k] = { - "baseline": b[k], - "candidate": c[k], - "delta_pct": pct_delta(c[k], b[k]), - } - # If both have per-pass details include first and last pass p95/p50 - if "passes_results" in baseline and "passes_results" in candidate: - result["passes"] = { - "baseline": { - "cold_p95": baseline.get("cold_pass_p95_ms"), - "warm_p95": baseline.get("warm_pass_p95_ms"), - "cold_p50": baseline.get("cold_pass_p50_ms"), - "warm_p50": baseline.get("warm_pass_p50_ms"), - }, - "candidate": { - "cold_p95": candidate.get("cold_pass_p95_ms"), - "warm_p95": candidate.get("warm_pass_p95_ms"), - "cold_p50": candidate.get("cold_pass_p50_ms"), - "warm_p50": candidate.get("warm_pass_p50_ms"), - }, - } - return result - - -def main(argv: list[str]) -> int: - ap = argparse.ArgumentParser(description="Compare two preview benchmark JSON result files") - ap.add_argument("--baseline", required=True, type=Path, help="Baseline JSON path") - ap.add_argument("--candidate", required=True, type=Path, help="Candidate JSON path") - ap.add_argument("--p95-threshold", type=float, default=None, help="Fail (exit 2) if p95 regression exceeds this percent (positive delta)") - ap.add_argument("--warm-only", action="store_true", help="When both results have passes, compare warm pass p95/p50 instead of combined/core") - args = ap.parse_args(argv) - if not args.baseline.exists(): - raise SystemExit(f"Baseline not found: {args.baseline}") - if not args.candidate.exists(): - raise SystemExit(f"Candidate not found: {args.candidate}") - baseline = load(args.baseline) - candidate = load(args.candidate) - # If warm-only requested and both have warm pass stats, override _core_stats before compare - if args.warm_only and "warm_pass_p95_ms" in baseline and "warm_pass_p95_ms" in candidate: - baseline["_core_stats"] = { - "p50_ms": baseline.get("warm_pass_p50_ms"), - "p90_ms": baseline.get("_core_stats", {}).get("p90_ms"), # p90 not tracked per-pass; retain combined - "p95_ms": baseline.get("warm_pass_p95_ms"), - "avg_ms": baseline.get("_core_stats", {}).get("avg_ms"), - "count": baseline.get("_core_stats", {}).get("count"), - } - candidate["_core_stats"] = { - "p50_ms": candidate.get("warm_pass_p50_ms"), - "p90_ms": candidate.get("_core_stats", {}).get("p90_ms"), - "p95_ms": 
candidate.get("warm_pass_p95_ms"), - "avg_ms": candidate.get("_core_stats", {}).get("avg_ms"), - "count": candidate.get("_core_stats", {}).get("count"), - } - cmp = compare(baseline, candidate) - payload = {"event": "preview_perf_compare", **cmp} - if args.p95_threshold is not None and "p95_ms" in cmp: - delta = cmp["p95_ms"]["delta_pct"] - payload["threshold"] = {"p95_threshold": args.p95_threshold, "p95_delta_pct": delta} - if delta is not None and delta > args.p95_threshold: - payload["result"] = "fail" - print(json.dumps(payload, indent=2)) # noqa: T201 - return 2 - payload["result"] = "pass" - print(json.dumps(payload, indent=2)) # noqa: T201 - return 0 - - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(main(__import__('sys').argv[1:])) diff --git a/code/scripts/warm_preview_traffic.py b/code/scripts/warm_preview_traffic.py deleted file mode 100644 index 0f54c73..0000000 --- a/code/scripts/warm_preview_traffic.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Generate warm preview traffic to populate theme preview cache & metrics. - -Usage: - python -m code.scripts.warm_preview_traffic --count 25 --repeats 2 \ - --base-url http://localhost:8000 --delay 0.05 - -Requirements: - - FastAPI server running locally exposing /themes endpoints - - WEB_THEME_PICKER_DIAGNOSTICS=1 so /themes/metrics is accessible - -Strategy: - 1. Fetch /themes/fragment/list?limit=COUNT to obtain HTML table. - 2. Extract theme slugs via regex on data-theme-id attributes. - 3. Issue REPEATS preview fragment requests per slug in order. - 4. Print simple timing / status summary. - -This script intentionally uses stdlib only (urllib, re, time) to avoid extra deps. -""" -from __future__ import annotations - -import argparse -import re -import time -import urllib.request -import urllib.error -from typing import List - -LIST_PATH = "/themes/fragment/list" -PREVIEW_PATH = "/themes/fragment/preview/{slug}" - - -def fetch(url: str) -> str: - req = urllib.request.Request(url, headers={"User-Agent": "warm-preview/1"}) - with urllib.request.urlopen(req, timeout=15) as resp: # nosec B310 (local trusted) - return resp.read().decode("utf-8", "replace") - - -def extract_slugs(html: str, limit: int) -> List[str]: - slugs = [] - for m in re.finditer(r'data-theme-id="([^"]+)"', html): - s = m.group(1).strip() - if s and s not in slugs: - slugs.append(s) - if len(slugs) >= limit: - break - return slugs - - -def warm(base_url: str, count: int, repeats: int, delay: float) -> None: - list_url = f"{base_url}{LIST_PATH}?limit={count}&offset=0" - print(f"[warm] Fetching list: {list_url}") - try: - html = fetch(list_url) - except urllib.error.URLError as e: # pragma: no cover - raise SystemExit(f"Failed fetching list: {e}") - slugs = extract_slugs(html, count) - if not slugs: - raise SystemExit("No theme slugs extracted – cannot warm.") - print(f"[warm] Extracted {len(slugs)} slugs: {', '.join(slugs[:8])}{'...' if len(slugs)>8 else ''}") - total_requests = 0 - start = time.time() - for r in range(repeats): - print(f"[warm] Pass {r+1}/{repeats}") - for slug in slugs: - url = f"{base_url}{PREVIEW_PATH.format(slug=slug)}" - try: - fetch(url) - except Exception as e: # pragma: no cover - print(f" [warn] Failed {slug}: {e}") - else: - total_requests += 1 - if delay: - time.sleep(delay) - dur = time.time() - start - print(f"[warm] Completed {total_requests} preview requests in {dur:.2f}s ({total_requests/dur if dur>0 else 0:.1f} rps)") - print("[warm] Done. 
Now run metrics snapshot to capture warm p95.") - - -def main(argv: list[str]) -> int: - ap = argparse.ArgumentParser(description="Generate warm preview traffic") - ap.add_argument("--base-url", default="http://localhost:8000", help="Base URL (default: %(default)s)") - ap.add_argument("--count", type=int, default=25, help="Number of distinct theme slugs to warm (default: %(default)s)") - ap.add_argument("--repeats", type=int, default=2, help="Repeat passes over slugs (default: %(default)s)") - ap.add_argument("--delay", type=float, default=0.05, help="Delay between requests in seconds (default: %(default)s)") - args = ap.parse_args(argv) - warm(args.base_url.rstrip("/"), args.count, args.repeats, args.delay) - return 0 - -if __name__ == "__main__": # pragma: no cover - import sys - raise SystemExit(main(sys.argv[1:])) diff --git a/code/tagging/tag_index.py b/code/tagging/tag_index.py new file mode 100644 index 0000000..19c3de8 --- /dev/null +++ b/code/tagging/tag_index.py @@ -0,0 +1,425 @@ +"""Fast tag indexing for reverse lookups and bulk operations. + +Provides a reverse index (tag → cards) for efficient tag-based queries. +Typical queries complete in <1ms after index is built. + +Usage: + # Build index from all_cards + index = TagIndex() + index.build() + + # Query cards with specific tag + cards = index.get_cards_with_tag("ramp") # Returns set of card names + + # Query cards with multiple tags (AND logic) + cards = index.get_cards_with_all_tags(["tokens", "sacrifice"]) + + # Query cards with any of several tags (OR logic) + cards = index.get_cards_with_any_tags(["lifegain", "lifelink"]) + + # Get tags for a specific card + tags = index.get_tags_for_card("Sol Ring") +""" +from __future__ import annotations + +import json +import os +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Set, Optional + +from code.logging_util import get_logger +from code.services.all_cards_loader import AllCardsLoader + +logger = get_logger(__name__) + +# Default cache path for persisted index +DEFAULT_CACHE_PATH = Path("card_files/.tag_index_metadata.json") + + +@dataclass +class IndexStats: + """Statistics about the tag index.""" + total_cards: int + total_tags: int + total_mappings: int + build_time_seconds: float + indexed_at: float # Unix timestamp + all_cards_mtime: float # Unix timestamp of source file + + +class TagIndex: + """Fast reverse index for tag-based card queries. + + Builds two indexes: + - tag → set(card names) - Reverse index for fast tag queries + - card → list(tags) - Forward index for card tag lookups + + Performance: + - Index build: <5s for 50k cards + - Query time: <1ms per lookup + - Memory: ~50-100MB for 30k cards + """ + + def __init__(self, cache_path: Optional[Path] = None): + """Initialize empty tag index. + + Args: + cache_path: Path to persist index (default: card_files/.tag_index_metadata.json) + """ + self._tag_to_cards: Dict[str, Set[str]] = {} + self._card_to_tags: Dict[str, List[str]] = {} + self._stats: Optional[IndexStats] = None + self._cache_path = cache_path or DEFAULT_CACHE_PATH + self._loader = AllCardsLoader() + + def build(self, force_rebuild: bool = False) -> IndexStats: + """Build the tag index from all_cards. + + Loads all_cards and creates reverse index. If a cached index exists + and is up-to-date, loads from cache instead. 
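+        Freshness is judged by comparing the all_cards file mtime against the
+        mtime recorded with the cached index (see _try_load_from_cache).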
+ + Args: + force_rebuild: If True, rebuild even if cache is valid + + Returns: + IndexStats with build metrics + """ + # Check if we can use cached index + if not force_rebuild and self._try_load_from_cache(): + logger.info(f"Loaded tag index from cache: {self._stats.total_cards} cards, {self._stats.total_tags} tags") + return self._stats + + logger.info("Building tag index from all_cards...") + start_time = time.perf_counter() + + # Load all cards + df = self._loader.load() + + if "themeTags" not in df.columns: + logger.warning("themeTags column not found in all_cards") + self._stats = IndexStats( + total_cards=0, + total_tags=0, + total_mappings=0, + build_time_seconds=0, + indexed_at=time.time(), + all_cards_mtime=0 + ) + return self._stats + + # Clear existing indexes + self._tag_to_cards.clear() + self._card_to_tags.clear() + + # Build indexes + total_mappings = 0 + for _, row in df.iterrows(): + name = row.get("name") + if not name: + continue + + tags = self._normalize_tags(row.get("themeTags", [])) + if not tags: + continue + + # Store forward mapping (card → tags) + self._card_to_tags[name] = tags + + # Build reverse mapping (tag → cards) + for tag in tags: + if tag not in self._tag_to_cards: + self._tag_to_cards[tag] = set() + self._tag_to_cards[tag].add(name) + total_mappings += 1 + + build_time = time.perf_counter() - start_time + + # Get all_cards mtime for cache validation + all_cards_mtime = 0 + if os.path.exists(self._loader.file_path): + all_cards_mtime = os.path.getmtime(self._loader.file_path) + + self._stats = IndexStats( + total_cards=len(self._card_to_tags), + total_tags=len(self._tag_to_cards), + total_mappings=total_mappings, + build_time_seconds=build_time, + indexed_at=time.time(), + all_cards_mtime=all_cards_mtime + ) + + logger.info( + f"Built tag index: {self._stats.total_cards} cards, " + f"{self._stats.total_tags} unique tags, " + f"{self._stats.total_mappings} mappings in {build_time:.2f}s" + ) + + # Save to cache + self._save_to_cache() + + return self._stats + + def _normalize_tags(self, tags: object) -> List[str]: + """Normalize tags from various formats to list of strings. + + Handles: + - List of strings/objects + - String representations like "['tag1', 'tag2']" + - Comma-separated strings + - Empty/None values + """ + if not tags: + return [] + + if isinstance(tags, list): + # Already a list - normalize to strings + return [str(t).strip() for t in tags if t and str(t).strip()] + + if isinstance(tags, str): + # Handle empty or list repr + if not tags or tags == "[]": + return [] + + # Try parsing as list repr + if tags.startswith("["): + import ast + try: + parsed = ast.literal_eval(tags) + if isinstance(parsed, list): + return [str(t).strip() for t in parsed if t and str(t).strip()] + except (ValueError, SyntaxError): + pass + + # Fall back to comma-separated + return [t.strip() for t in tags.split(",") if t.strip()] + + return [] + + def get_cards_with_tag(self, tag: str) -> Set[str]: + """Get all card names that have a specific tag. + + Args: + tag: Theme tag to search for (case-sensitive) + + Returns: + Set of card names with the tag (empty if tag not found) + + Performance: O(1) lookup after index is built + """ + return self._tag_to_cards.get(tag, set()).copy() + + def get_cards_with_all_tags(self, tags: List[str]) -> Set[str]: + """Get cards that have ALL specified tags (AND logic). 
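+        For example, ["tokens", "sacrifice"] returns only cards carrying both
+        tags (tag names here are illustrative; availability depends on data).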
+ + Args: + tags: List of tags (card must have all of them) + + Returns: + Set of card names with all tags (empty if no matches) + + Performance: O(k) where k is number of tags + """ + if not tags: + return set() + + # Start with cards for first tag + result = self.get_cards_with_tag(tags[0]) + + # Intersect with cards for each additional tag + for tag in tags[1:]: + result &= self.get_cards_with_tag(tag) + if not result: + # Short-circuit if no cards remain + break + + return result + + def get_cards_with_any_tags(self, tags: List[str]) -> Set[str]: + """Get cards that have ANY of the specified tags (OR logic). + + Args: + tags: List of tags (card needs at least one) + + Returns: + Set of card names with at least one tag + + Performance: O(k) where k is number of tags + """ + result: Set[str] = set() + for tag in tags: + result |= self.get_cards_with_tag(tag) + return result + + def get_tags_for_card(self, card_name: str) -> List[str]: + """Get all tags for a specific card. + + Args: + card_name: Name of the card + + Returns: + List of theme tags for the card (empty if not found) + + Performance: O(1) lookup + """ + return self._card_to_tags.get(card_name, []).copy() + + def get_all_tags(self) -> List[str]: + """Get list of all tags in the index. + + Returns: + Sorted list of all unique tags + """ + return sorted(self._tag_to_cards.keys()) + + def get_tag_stats(self, tag: str) -> Dict[str, int]: + """Get statistics for a specific tag. + + Args: + tag: Tag to get stats for + + Returns: + Dict with 'card_count' key + """ + return { + "card_count": len(self._tag_to_cards.get(tag, set())) + } + + def get_popular_tags(self, limit: int = 50) -> List[tuple[str, int]]: + """Get most popular tags sorted by card count. + + Args: + limit: Maximum number of tags to return + + Returns: + List of (tag, card_count) tuples sorted by count descending + """ + tag_counts = [ + (tag, len(cards)) + for tag, cards in self._tag_to_cards.items() + ] + tag_counts.sort(key=lambda x: x[1], reverse=True) + return tag_counts[:limit] + + def _save_to_cache(self) -> None: + """Save index to cache file.""" + if not self._stats: + return + + try: + cache_data = { + "stats": { + "total_cards": self._stats.total_cards, + "total_tags": self._stats.total_tags, + "total_mappings": self._stats.total_mappings, + "build_time_seconds": self._stats.build_time_seconds, + "indexed_at": self._stats.indexed_at, + "all_cards_mtime": self._stats.all_cards_mtime + }, + "tag_to_cards": { + tag: list(cards) + for tag, cards in self._tag_to_cards.items() + }, + "card_to_tags": self._card_to_tags + } + + self._cache_path.parent.mkdir(parents=True, exist_ok=True) + with self._cache_path.open("w", encoding="utf-8") as f: + json.dump(cache_data, f, indent=2) + + logger.debug(f"Saved tag index cache to {self._cache_path}") + + except Exception as e: + logger.warning(f"Failed to save tag index cache: {e}") + + def _try_load_from_cache(self) -> bool: + """Try to load index from cache file. 
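+        A cache built against an older all_cards file (stale mtime) or one
+        that fails to parse is ignored, and the index is rebuilt from source.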
+ + Returns: + True if cache loaded successfully and is up-to-date + """ + if not self._cache_path.exists(): + return False + + try: + with self._cache_path.open("r", encoding="utf-8") as f: + cache_data = json.load(f) + + # Check if cache is up-to-date + stats_data = cache_data.get("stats", {}) + cached_mtime = stats_data.get("all_cards_mtime", 0) + + current_mtime = 0 + if os.path.exists(self._loader.file_path): + current_mtime = os.path.getmtime(self._loader.file_path) + + if current_mtime > cached_mtime: + logger.debug("Tag index cache outdated (all_cards modified)") + return False + + # Load indexes + self._tag_to_cards = { + tag: set(cards) + for tag, cards in cache_data.get("tag_to_cards", {}).items() + } + self._card_to_tags = cache_data.get("card_to_tags", {}) + + # Restore stats + self._stats = IndexStats(**stats_data) + + return True + + except Exception as e: + logger.warning(f"Failed to load tag index cache: {e}") + return False + + def clear_cache(self) -> None: + """Delete the cached index file.""" + if self._cache_path.exists(): + self._cache_path.unlink() + logger.debug(f"Deleted tag index cache: {self._cache_path}") + + def get_stats(self) -> Optional[IndexStats]: + """Get index statistics. + + Returns: + IndexStats if index has been built, None otherwise + """ + return self._stats + + +# Global index instance +_global_index: Optional[TagIndex] = None + + +def get_tag_index(force_rebuild: bool = False) -> TagIndex: + """Get or create the global tag index. + + Lazy-loads the index on first access. Subsequent calls return + the cached instance. + + Args: + force_rebuild: If True, rebuild the index even if cached + + Returns: + Global TagIndex instance + """ + global _global_index + + if _global_index is None or force_rebuild: + _global_index = TagIndex() + _global_index.build(force_rebuild=force_rebuild) + elif _global_index._stats is None: + # Index exists but hasn't been built yet + _global_index.build() + + return _global_index + + +def clear_global_index() -> None: + """Clear the global tag index instance.""" + global _global_index + if _global_index: + _global_index.clear_cache() + _global_index = None diff --git a/code/tagging/tag_loader.py b/code/tagging/tag_loader.py new file mode 100644 index 0000000..238a52d --- /dev/null +++ b/code/tagging/tag_loader.py @@ -0,0 +1,229 @@ +"""Efficient tag loading using consolidated all_cards file. + +Provides batch tag loading functions that leverage the all_cards.parquet file +instead of reading individual card CSV files. This is 10-50x faster for bulk +operations like deck building. 
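+
+Loading from all_cards can be toggled with the USE_ALL_CARDS_FOR_TAGS
+environment variable (enabled by default; see is_use_all_cards_enabled below).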
+ +Usage: + # Load tags for multiple cards at once + tags_dict = load_tags_for_cards(["Sol Ring", "Lightning Bolt", "Counterspell"]) + # Returns: {"Sol Ring": ["artifacts"], "Lightning Bolt": ["burn"], ...} + + # Load tags for a single card + tags = load_tags_for_card("Sol Ring") + # Returns: ["artifacts", "ramp"] +""" +from __future__ import annotations + +import os +from typing import Dict, List, Optional + +from code.logging_util import get_logger +from code.services.all_cards_loader import AllCardsLoader + +logger = get_logger(__name__) + +# Global loader instance for caching +_loader_instance: Optional[AllCardsLoader] = None + + +def _get_loader() -> AllCardsLoader: + """Get or create the global AllCardsLoader instance.""" + global _loader_instance + if _loader_instance is None: + _loader_instance = AllCardsLoader() + return _loader_instance + + +def clear_cache() -> None: + """Clear the cached all_cards data (useful after updates).""" + global _loader_instance + _loader_instance = None + + +def load_tags_for_cards(card_names: List[str]) -> Dict[str, List[str]]: + """Load theme tags for multiple cards in one batch operation. + + This is much faster than loading tags for each card individually, + especially when dealing with 50+ cards (typical deck size). + + Args: + card_names: List of card names to load tags for + + Returns: + Dictionary mapping card name to list of theme tags. + Cards not found or without tags will have empty list. + + Example: + >>> tags = load_tags_for_cards(["Sol Ring", "Lightning Bolt"]) + >>> tags["Sol Ring"] + ["artifacts", "ramp"] + """ + if not card_names: + return {} + + loader = _get_loader() + + try: + # Batch lookup - single query for all cards + df = loader.get_by_names(card_names) + + if df.empty: + logger.debug(f"No cards found for {len(card_names)} names") + return {name: [] for name in card_names} + + # Extract tags from DataFrame + result: Dict[str, List[str]] = {} + + if "themeTags" not in df.columns: + logger.warning("themeTags column not found in all_cards") + return {name: [] for name in card_names} + + # Build lookup dictionary + for _, row in df.iterrows(): + name = row.get("name") + if not name: + continue + + tags = row.get("themeTags", []) + + # Handle different themeTags formats + if isinstance(tags, list): + # Already a list - use directly + result[name] = [str(t).strip() for t in tags if t] + elif isinstance(tags, str): + # String format - could be comma-separated or list repr + if not tags or tags == "[]": + result[name] = [] + elif tags.startswith("["): + # List representation like "['tag1', 'tag2']" + import ast + try: + parsed = ast.literal_eval(tags) + if isinstance(parsed, list): + result[name] = [str(t).strip() for t in parsed if t] + else: + result[name] = [] + except (ValueError, SyntaxError): + # Fallback to comma split + result[name] = [t.strip() for t in tags.split(",") if t.strip()] + else: + # Comma-separated tags + result[name] = [t.strip() for t in tags.split(",") if t.strip()] + else: + result[name] = [] + + # Fill in missing cards with empty lists + for name in card_names: + if name not in result: + result[name] = [] + + return result + + except FileNotFoundError: + logger.warning("all_cards file not found, returning empty tags") + return {name: [] for name in card_names} + except Exception as e: + logger.error(f"Error loading tags for cards: {e}") + return {name: [] for name in card_names} + + +def load_tags_for_card(card_name: str) -> List[str]: + """Load theme tags for a single card. 
+ + For loading tags for multiple cards, use load_tags_for_cards() instead + for better performance. + + Args: + card_name: Name of the card + + Returns: + List of theme tags for the card (empty if not found) + + Example: + >>> tags = load_tags_for_card("Sol Ring") + >>> "artifacts" in tags + True + """ + result = load_tags_for_cards([card_name]) + return result.get(card_name, []) + + +def get_cards_with_tag(tag: str, limit: Optional[int] = None) -> List[str]: + """Get all card names that have a specific tag. + + Args: + tag: Theme tag to search for + limit: Maximum number of cards to return (None = no limit) + + Returns: + List of card names with the tag + + Example: + >>> cards = get_cards_with_tag("ramp", limit=10) + >>> len(cards) <= 10 + True + """ + loader = _get_loader() + + try: + df = loader.filter_by_themes([tag], mode="any") + + if "name" not in df.columns: + return [] + + cards = df["name"].tolist() + + if limit is not None and len(cards) > limit: + return cards[:limit] + + return cards + + except Exception as e: + logger.error(f"Error getting cards with tag '{tag}': {e}") + return [] + + +def get_cards_with_all_tags(tags: List[str], limit: Optional[int] = None) -> List[str]: + """Get all card names that have ALL of the specified tags. + + Args: + tags: List of theme tags (card must have all of them) + limit: Maximum number of cards to return (None = no limit) + + Returns: + List of card names with all specified tags + + Example: + >>> cards = get_cards_with_all_tags(["ramp", "artifacts"]) + >>> # Returns cards that have both ramp AND artifacts tags + """ + loader = _get_loader() + + try: + df = loader.filter_by_themes(tags, mode="all") + + if "name" not in df.columns: + return [] + + cards = df["name"].tolist() + + if limit is not None and len(cards) > limit: + return cards[:limit] + + return cards + + except Exception as e: + logger.error(f"Error getting cards with all tags {tags}: {e}") + return [] + + +def is_use_all_cards_enabled() -> bool: + """Check if all_cards-based tag loading is enabled. + + Returns: + True if USE_ALL_CARDS_FOR_TAGS is enabled (default: True) + """ + # Check environment variable + env_value = os.environ.get("USE_ALL_CARDS_FOR_TAGS", "true").lower() + return env_value in ("1", "true", "yes", "on") diff --git a/code/tagging/theme_enrichment.py b/code/tagging/theme_enrichment.py new file mode 100644 index 0000000..7e194d7 --- /dev/null +++ b/code/tagging/theme_enrichment.py @@ -0,0 +1,602 @@ +"""Consolidated theme metadata enrichment pipeline. + +Replaces 7 separate subprocess scripts with single efficient in-memory pipeline: +1. autofill_min_examples - Add placeholder examples +2. pad_min_examples - Pad to minimum threshold +3. cleanup_placeholder_examples - Remove placeholders when real examples added +4. purge_anchor_placeholders - Purge legacy anchor placeholders +5. augment_theme_yaml_from_catalog - Add descriptions/popularity from catalog +6. generate_theme_editorial_suggestions - Generate editorial suggestions +7. lint_theme_editorial - Validate metadata + +Performance improvement: 5-10x faster by loading all YAMLs once, processing in memory, +writing once at the end. 
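+
+Usage (sketch, using the module-level helper defined below):
+
+    from code.tagging.theme_enrichment import run_enrichment_pipeline
+
+    stats = run_enrichment_pipeline(write=False)  # dry run; no files written
+    print(stats)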
+""" +from __future__ import annotations + +import json +import re +import string +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Set + +try: + import yaml # type: ignore +except ImportError: # pragma: no cover + yaml = None + + +@dataclass +class ThemeData: + """In-memory representation of a theme YAML file.""" + path: Path + data: Dict[str, Any] + modified: bool = False + + +@dataclass +class EnrichmentStats: + """Statistics for enrichment pipeline run.""" + autofilled: int = 0 + padded: int = 0 + cleaned: int = 0 + purged: int = 0 + augmented: int = 0 + suggestions_added: int = 0 + lint_errors: int = 0 + lint_warnings: int = 0 + total_themes: int = 0 + + def __str__(self) -> str: + return ( + f"Enrichment complete: {self.total_themes} themes processed | " + f"autofilled:{self.autofilled} padded:{self.padded} cleaned:{self.cleaned} " + f"purged:{self.purged} augmented:{self.augmented} suggestions:{self.suggestions_added} | " + f"lint: {self.lint_errors} errors, {self.lint_warnings} warnings" + ) + + +class ThemeEnrichmentPipeline: + """Consolidated theme metadata enrichment pipeline.""" + + def __init__( + self, + root: Optional[Path] = None, + min_examples: int = 5, + progress_callback: Optional[Callable[[str], None]] = None, + ): + """Initialize the enrichment pipeline. + + Args: + root: Project root directory (defaults to auto-detect) + min_examples: Minimum number of example commanders required + progress_callback: Optional callback for progress updates (for web UI) + """ + if root is None: + # Auto-detect root (3 levels up from this file) + root = Path(__file__).resolve().parents[2] + + self.root = root + self.catalog_dir = root / 'config' / 'themes' / 'catalog' + self.theme_json = root / 'config' / 'themes' / 'theme_list.json' + self.csv_dir = root / 'csv_files' + self.min_examples = min_examples + self.progress_callback = progress_callback + + self.themes: Dict[Path, ThemeData] = {} + self.stats = EnrichmentStats() + + # Cached data + self._catalog_map: Optional[Dict[str, Dict[str, Any]]] = None + self._card_suggestions: Optional[Dict[str, Any]] = None + + def _emit(self, message: str) -> None: + """Emit progress message via callback or print.""" + if self.progress_callback: + try: + self.progress_callback(message) + except Exception: + pass + else: + print(message, flush=True) + + def load_all_themes(self) -> None: + """Load all theme YAML files into memory (Step 0).""" + if not self.catalog_dir.exists(): + self._emit("Warning: Catalog directory does not exist") + return + + paths = sorted(self.catalog_dir.glob('*.yml')) + self.stats.total_themes = len(paths) + + for path in paths: + try: + if yaml is None: + raise RuntimeError("PyYAML not installed") + data = yaml.safe_load(path.read_text(encoding='utf-8')) + if isinstance(data, dict): + self.themes[path] = ThemeData(path=path, data=data) + except Exception as e: + self._emit(f"Warning: Failed to load {path.name}: {e}") + + self._emit(f"Loaded {len(self.themes)} theme files") + + def _is_deprecated_alias(self, theme_data: Dict[str, Any]) -> bool: + """Check if theme is a deprecated alias placeholder.""" + notes = theme_data.get('notes') + return isinstance(notes, str) and 'Deprecated alias file' in notes + + def _is_placeholder(self, entry: str) -> bool: + """Check if an example entry is a placeholder. + + Matches: + - "Theme Anchor" + - "Theme Anchor B" + - "Theme Anchor C" + etc. 
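+
+        i.e. any entry ending in " Anchor", optionally followed by a space and
+        a single capital letter (per the regex below).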
+ """ + pattern = re.compile(r" Anchor( [A-Z])?$") + return bool(pattern.search(entry)) + + # Step 1: Autofill minimal placeholders + def autofill_placeholders(self) -> None: + """Add placeholder examples for themes with zero examples.""" + for theme in self.themes.values(): + data = theme.data + + if self._is_deprecated_alias(data): + continue + + if not data.get('display_name'): + continue + + # Skip if theme already has real (non-placeholder) examples in YAML + examples = data.get('example_commanders') or [] + if isinstance(examples, list) and examples: + # Check if any examples are real (not " Anchor" placeholders) + has_real_examples = any( + isinstance(ex, str) and ex and not ex.endswith(' Anchor') + for ex in examples + ) + if has_real_examples: + continue # Already has real examples, skip placeholder generation + # If only placeholders, continue to avoid overwriting + + display = data['display_name'] + synergies = data.get('synergies') or [] + if not isinstance(synergies, list): + synergies = [] + + # Generate placeholders from display name + synergies + placeholders = [f"{display} Anchor"] + for s in synergies[:2]: # First 2 synergies + if isinstance(s, str) and s and s != display: + placeholders.append(f"{s} Anchor") + + data['example_commanders'] = placeholders + if not data.get('editorial_quality'): + data['editorial_quality'] = 'draft' + + theme.modified = True + self.stats.autofilled += 1 + + # Step 2: Pad to minimum examples + def pad_examples(self) -> None: + """Pad example lists to minimum threshold with placeholders.""" + for theme in self.themes.values(): + data = theme.data + + if self._is_deprecated_alias(data): + continue + + if not data.get('display_name'): + continue + + examples = data.get('example_commanders') or [] + if not isinstance(examples, list): + continue + + if len(examples) >= self.min_examples: + continue + + # Only pad pure placeholder sets (heuristic: don't mix real + placeholders) + if any(not self._is_placeholder(e) for e in examples): + continue + + display = data['display_name'] + synergies = data.get('synergies') if isinstance(data.get('synergies'), list) else [] + need = self.min_examples - len(examples) + + # Build additional placeholders + new_placeholders = [] + used = set(examples) + + # 1. Additional synergies beyond first 2 + for syn in synergies[2:]: + cand = f"{syn} Anchor" + if cand not in used and syn != display: + new_placeholders.append(cand) + if len(new_placeholders) >= need: + break + + # 2. Generic letter suffixes (B, C, D, ...) 
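+            #    e.g. "Tokens Matter Anchor B", "Tokens Matter Anchor C", ...
+            #    until the minimum threshold is met (illustrative theme name)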
+ if len(new_placeholders) < need: + for suffix in string.ascii_uppercase[1:]: # Start from 'B' + cand = f"{display} Anchor {suffix}" + if cand not in used: + new_placeholders.append(cand) + if len(new_placeholders) >= need: + break + + if new_placeholders: + data['example_commanders'] = examples + new_placeholders + if not data.get('editorial_quality'): + data['editorial_quality'] = 'draft' + theme.modified = True + self.stats.padded += 1 + + # Step 3: Cleanup placeholders when real examples exist + def cleanup_placeholders(self) -> None: + """Remove placeholders when real examples have been added.""" + for theme in self.themes.values(): + data = theme.data + + if self._is_deprecated_alias(data): + continue + + if not data.get('display_name'): + continue + + examples = data.get('example_commanders') + if not isinstance(examples, list) or not examples: + continue + + placeholders = [e for e in examples if isinstance(e, str) and self._is_placeholder(e)] + real = [e for e in examples if isinstance(e, str) and not self._is_placeholder(e)] + + # Only cleanup if we have both placeholders AND real examples + if placeholders and real: + new_list = real if real else placeholders[:1] # Keep at least one if all placeholders + if new_list != examples: + data['example_commanders'] = new_list + theme.modified = True + self.stats.cleaned += 1 + + # Step 4: Purge legacy anchor placeholders + def purge_anchors(self) -> None: + """Remove all legacy anchor placeholders.""" + pattern = re.compile(r" Anchor( [A-Z])?$") + + for theme in self.themes.values(): + data = theme.data + + examples = data.get('example_commanders') + if not isinstance(examples, list) or not examples: + continue + + placeholders = [e for e in examples if isinstance(e, str) and pattern.search(e)] + if not placeholders: + continue + + real = [e for e in examples if isinstance(e, str) and not pattern.search(e)] + new_list = real # Remove ALL placeholders (even if list becomes empty) + + if new_list != examples: + data['example_commanders'] = new_list + theme.modified = True + self.stats.purged += 1 + + # Step 5: Augment from catalog + def _load_catalog_map(self) -> Dict[str, Dict[str, Any]]: + """Load theme_list.json catalog into memory.""" + if self._catalog_map is not None: + return self._catalog_map + + if not self.theme_json.exists(): + self._emit("Warning: theme_list.json not found") + self._catalog_map = {} + return self._catalog_map + + try: + data = json.loads(self.theme_json.read_text(encoding='utf-8') or '{}') + themes = data.get('themes') or [] + self._catalog_map = {} + for t in themes: + if isinstance(t, dict) and t.get('theme'): + self._catalog_map[str(t['theme'])] = t + except Exception as e: + self._emit(f"Warning: Failed to parse theme_list.json: {e}") + self._catalog_map = {} + + return self._catalog_map + + def augment_from_catalog(self) -> None: + """Add description, popularity, etc. 
from theme_list.json.""" + catalog_map = self._load_catalog_map() + if not catalog_map: + return + + for theme in self.themes.values(): + data = theme.data + + if self._is_deprecated_alias(data): + continue + + name = str(data.get('display_name') or '').strip() + if not name: + continue + + cat_entry = catalog_map.get(name) + if not cat_entry: + continue + + modified = False + + # Add description if missing + if 'description' not in data and 'description' in cat_entry and cat_entry['description']: + data['description'] = cat_entry['description'] + modified = True + + # Add popularity bucket if missing + if 'popularity_bucket' not in data and cat_entry.get('popularity_bucket'): + data['popularity_bucket'] = cat_entry['popularity_bucket'] + modified = True + + # Add popularity hint if missing + if 'popularity_hint' not in data and cat_entry.get('popularity_hint'): + data['popularity_hint'] = cat_entry['popularity_hint'] + modified = True + + # Backfill deck archetype if missing (defensive) + if 'deck_archetype' not in data and cat_entry.get('deck_archetype'): + data['deck_archetype'] = cat_entry['deck_archetype'] + modified = True + + if modified: + theme.modified = True + self.stats.augmented += 1 + + # Step 6: Generate editorial suggestions (simplified - full implementation would scan CSVs) + def generate_suggestions(self) -> None: + """Generate editorial suggestions for missing example_cards/commanders. + + This runs the generate_theme_editorial_suggestions.py script to populate + example_cards and example_commanders from CSV data (EDHREC ranks + themeTags). + """ + import os + import subprocess + + # Check if we should run the editorial suggestions generator + skip_suggestions = os.environ.get('SKIP_EDITORIAL_SUGGESTIONS', '').lower() in ('1', 'true', 'yes') + if skip_suggestions: + self._emit("Skipping editorial suggestions generation (SKIP_EDITORIAL_SUGGESTIONS=1)") + return + + script_path = self.root / 'code' / 'scripts' / 'generate_theme_editorial_suggestions.py' + if not script_path.exists(): + self._emit("Editorial suggestions script not found; skipping") + return + + try: + self._emit("Generating example_cards and example_commanders from CSV data...") + # Run with --apply to write missing fields, limit to reasonable batch + result = subprocess.run( + [sys.executable, str(script_path), '--apply', '--limit-yaml', '1000', '--top', '8'], + capture_output=True, + text=True, + timeout=300, # 5 minute timeout + cwd=str(self.root) + ) + if result.returncode == 0: + # Reload themes to pick up the generated examples + self.load_all_themes() + self._emit("Editorial suggestions generated successfully") + else: + self._emit(f"Editorial suggestions script failed (exit {result.returncode}): {result.stderr[:200]}") + except subprocess.TimeoutExpired: + self._emit("Editorial suggestions generation timed out (skipping)") + except Exception as e: + self._emit(f"Failed to generate editorial suggestions: {e}") + + # Step 7: Lint/validate + ALLOWED_ARCHETYPES: Set[str] = { + 'Lands', 'Graveyard', 'Planeswalkers', 'Tokens', 'Counters', 'Spells', + 'Artifacts', 'Enchantments', 'Politics', 'Combo', 'Aggro', 'Control', + 'Midrange', 'Stax', 'Ramp', 'Toolbox' + } + + CORNERSTONE: Set[str] = { + 'Landfall', 'Reanimate', 'Superfriends', 'Tokens Matter', '+1/+1 Counters' + } + + def validate(self, enforce_min: bool = False, strict: bool = False) -> None: + """Validate theme metadata (lint).""" + errors: List[str] = [] + warnings: List[str] = [] + seen_display: Set[str] = set() + + for theme in 
self.themes.values(): + data = theme.data + + if self._is_deprecated_alias(data): + continue + + name = str(data.get('display_name') or '').strip() + if not name: + continue + + if name in seen_display: + continue # Skip duplicates + seen_display.add(name) + + ex_cmd = data.get('example_commanders') or [] + ex_cards = data.get('example_cards') or [] + + if not isinstance(ex_cmd, list): + errors.append(f"{name}: example_commanders not a list") + ex_cmd = [] + + if not isinstance(ex_cards, list): + errors.append(f"{name}: example_cards not a list") + ex_cards = [] + + # Length checks + if len(ex_cmd) > 12: + warnings.append(f"{name}: example_commanders has {len(ex_cmd)} entries (>12)") + + if len(ex_cards) > 20: + warnings.append(f"{name}: example_cards has {len(ex_cards)} entries (>20)") + + # Minimum examples check + if ex_cmd and len(ex_cmd) < self.min_examples: + msg = f"{name}: only {len(ex_cmd)} example_commanders (<{self.min_examples} minimum)" + if enforce_min: + errors.append(msg) + else: + warnings.append(msg) + + # Cornerstone themes should have examples (if strict) + if strict and name in self.CORNERSTONE: + if not ex_cmd: + errors.append(f"{name}: cornerstone theme missing example_commanders") + if not ex_cards: + errors.append(f"{name}: cornerstone theme missing example_cards") + + # Deck archetype validation + archetype = data.get('deck_archetype') + if archetype and archetype not in self.ALLOWED_ARCHETYPES: + warnings.append(f"{name}: unknown deck_archetype '{archetype}'") + + self.stats.lint_errors = len(errors) + self.stats.lint_warnings = len(warnings) + + if errors: + for err in errors: + self._emit(f"ERROR: {err}") + + if warnings: + for warn in warnings: + self._emit(f"WARNING: {warn}") + + def write_all_themes(self) -> None: + """Write all modified themes back to disk (final step).""" + if yaml is None: + raise RuntimeError("PyYAML not installed; cannot write themes") + + written = 0 + for theme in self.themes.values(): + if theme.modified: + try: + theme.path.write_text( + yaml.safe_dump(theme.data, sort_keys=False, allow_unicode=True), + encoding='utf-8' + ) + written += 1 + except Exception as e: + self._emit(f"Error writing {theme.path.name}: {e}") + + self._emit(f"Wrote {written} modified theme files") + + def run_all( + self, + write: bool = True, + enforce_min: bool = False, + strict_lint: bool = False, + run_purge: bool = False, + ) -> EnrichmentStats: + """Run the full enrichment pipeline. + + Args: + write: Whether to write changes to disk (False = dry run) + enforce_min: Whether to treat min_examples violations as errors + strict_lint: Whether to enforce strict validation rules + run_purge: Whether to run purge step (removes ALL anchor placeholders) + + Returns: + EnrichmentStats with summary of operations + """ + self._emit("Starting theme enrichment pipeline...") + + # Step 0: Load all themes + self.load_all_themes() + + # Step 1: Autofill placeholders + self._emit("Step 1/7: Autofilling placeholders...") + self.autofill_placeholders() + + # Step 2: Pad to minimum + self._emit("Step 2/7: Padding to minimum examples...") + self.pad_examples() + + # Step 3: Cleanup mixed placeholder/real lists + self._emit("Step 3/7: Cleaning up placeholders...") + self.cleanup_placeholders() + + # Step 4: Purge all anchor placeholders (optional - disabled by default) + # Note: Purge removes ALL anchors, even from pure placeholder lists. + # Only enable for one-time migration away from placeholder system. 
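+        # (The same flag is exposed by run_enrichment_pipeline() at module
+        # level; if purged, step 6 below can repopulate examples from real
+        # card data.)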
+ if run_purge: + self._emit("Step 4/7: Purging legacy anchors...") + self.purge_anchors() + else: + self._emit("Step 4/7: Skipping purge (preserving placeholders)...") + + # Step 5: Augment from catalog + self._emit("Step 5/7: Augmenting from catalog...") + self.augment_from_catalog() + + # Step 6: Generate suggestions (skipped for performance) + self._emit("Step 6/7: Generating suggestions...") + self.generate_suggestions() + + # Step 7: Validate + self._emit("Step 7/7: Validating metadata...") + self.validate(enforce_min=enforce_min, strict=strict_lint) + + # Write changes + if write: + self._emit("Writing changes to disk...") + self.write_all_themes() + else: + self._emit("Dry run: no files written") + + self._emit(str(self.stats)) + return self.stats + + +def run_enrichment_pipeline( + root: Optional[Path] = None, + min_examples: int = 5, + write: bool = True, + enforce_min: bool = False, + strict: bool = False, + run_purge: bool = False, + progress_callback: Optional[Callable[[str], None]] = None, +) -> EnrichmentStats: + """Convenience function to run the enrichment pipeline. + + Args: + root: Project root directory + min_examples: Minimum number of example commanders + write: Whether to write changes (False = dry run) + enforce_min: Treat min examples violations as errors + strict: Enforce strict validation rules + run_purge: Whether to run purge step (removes ALL placeholders) + progress_callback: Optional progress callback + + Returns: + EnrichmentStats summary + """ + pipeline = ThemeEnrichmentPipeline( + root=root, + min_examples=min_examples, + progress_callback=progress_callback, + ) + return pipeline.run_all( + write=write, + enforce_min=enforce_min, + strict_lint=strict, + run_purge=run_purge + ) diff --git a/code/tests/test_tag_index.py b/code/tests/test_tag_index.py new file mode 100644 index 0000000..2dd97e9 --- /dev/null +++ b/code/tests/test_tag_index.py @@ -0,0 +1,429 @@ +"""Tests for tag index functionality.""" +import json +import time + +from code.tagging.tag_index import ( + TagIndex, + IndexStats, + get_tag_index, + clear_global_index, +) + + +class TestTagIndexBuild: + """Test index building operations.""" + + def test_build_index(self): + """Test that index builds successfully.""" + index = TagIndex() + stats = index.build() + + assert isinstance(stats, IndexStats) + assert stats.total_cards > 0 + assert stats.total_tags > 0 + assert stats.total_mappings > 0 + assert stats.build_time_seconds >= 0 + + def test_build_index_performance(self): + """Test that index builds in reasonable time.""" + index = TagIndex() + + start = time.perf_counter() + stats = index.build() + elapsed = time.perf_counter() - start + + # Should build in <5s for typical dataset + assert elapsed < 5.0 + assert stats.build_time_seconds < 5.0 + + def test_force_rebuild(self): + """Test that force_rebuild always rebuilds.""" + index = TagIndex() + + # Build once + stats1 = index.build() + time1 = stats1.indexed_at + + # Wait a bit + time.sleep(0.1) + + # Force rebuild + stats2 = index.build(force_rebuild=True) + time2 = stats2.indexed_at + + # Should have different timestamps + assert time2 > time1 + + +class TestSingleTagQueries: + """Test single tag lookup operations.""" + + def test_get_cards_with_tag(self): + """Test getting cards with a specific tag.""" + index = TagIndex() + index.build() + + # Get a tag that exists + all_tags = index.get_all_tags() + if all_tags: + tag = all_tags[0] + cards = index.get_cards_with_tag(tag) + + assert isinstance(cards, set) + assert len(cards) > 0 + + def 
test_get_cards_with_nonexistent_tag(self): + """Test querying for tag that doesn't exist.""" + index = TagIndex() + index.build() + + cards = index.get_cards_with_tag("ThisTagDoesNotExist12345") + + assert cards == set() + + def test_get_tags_for_card(self): + """Test getting tags for a specific card.""" + index = TagIndex() + index.build() + + # Get a card that exists + cards = index.get_cards_with_tag(index.get_all_tags()[0]) if index.get_all_tags() else set() + if cards: + card_name = list(cards)[0] + tags = index.get_tags_for_card(card_name) + + assert isinstance(tags, list) + assert len(tags) > 0 + + def test_get_tags_for_nonexistent_card(self): + """Test getting tags for card that doesn't exist.""" + index = TagIndex() + index.build() + + tags = index.get_tags_for_card("This Card Does Not Exist 12345") + + assert tags == [] + + +class TestMultiTagQueries: + """Test queries with multiple tags.""" + + def test_get_cards_with_all_tags(self): + """Test AND logic (cards must have all tags).""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if len(all_tags) >= 2: + # Pick two tags + tag1, tag2 = all_tags[0], all_tags[1] + + cards1 = index.get_cards_with_tag(tag1) + cards2 = index.get_cards_with_tag(tag2) + cards_both = index.get_cards_with_all_tags([tag1, tag2]) + + # Result should be subset of both + assert cards_both.issubset(cards1) + assert cards_both.issubset(cards2) + + # Result should be intersection + assert cards_both == (cards1 & cards2) + + def test_get_cards_with_any_tags(self): + """Test OR logic (cards need at least one tag).""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if len(all_tags) >= 2: + # Pick two tags + tag1, tag2 = all_tags[0], all_tags[1] + + cards1 = index.get_cards_with_tag(tag1) + cards2 = index.get_cards_with_tag(tag2) + cards_any = index.get_cards_with_any_tags([tag1, tag2]) + + # Result should be superset of both + assert cards1.issubset(cards_any) + assert cards2.issubset(cards_any) + + # Result should be union + assert cards_any == (cards1 | cards2) + + def test_get_cards_with_empty_tag_list(self): + """Test querying with empty tag list.""" + index = TagIndex() + index.build() + + cards_all = index.get_cards_with_all_tags([]) + cards_any = index.get_cards_with_any_tags([]) + + assert cards_all == set() + assert cards_any == set() + + def test_get_cards_with_nonexistent_tags(self): + """Test querying with tags that don't exist.""" + index = TagIndex() + index.build() + + fake_tags = ["FakeTag1", "FakeTag2"] + + cards_all = index.get_cards_with_all_tags(fake_tags) + cards_any = index.get_cards_with_any_tags(fake_tags) + + assert cards_all == set() + assert cards_any == set() + + +class TestIndexStats: + """Test index statistics and metadata.""" + + def test_get_stats(self): + """Test getting index statistics.""" + index = TagIndex() + + # Before building + assert index.get_stats() is None + + # After building + stats = index.build() + retrieved_stats = index.get_stats() + + assert retrieved_stats is not None + assert retrieved_stats.total_cards == stats.total_cards + assert retrieved_stats.total_tags == stats.total_tags + + def test_get_all_tags(self): + """Test getting list of all tags.""" + index = TagIndex() + index.build() + + tags = index.get_all_tags() + + assert isinstance(tags, list) + assert len(tags) > 0 + # Should be sorted + assert tags == sorted(tags) + + def test_get_tag_stats(self): + """Test getting stats for specific tag.""" + index = TagIndex() + index.build() + + all_tags = 
index.get_all_tags() + if all_tags: + tag = all_tags[0] + stats = index.get_tag_stats(tag) + + assert "card_count" in stats + assert stats["card_count"] > 0 + + def test_get_popular_tags(self): + """Test getting most popular tags.""" + index = TagIndex() + index.build() + + popular = index.get_popular_tags(limit=10) + + assert isinstance(popular, list) + assert len(popular) <= 10 + + if len(popular) > 1: + # Should be sorted by count descending + counts = [count for _, count in popular] + assert counts == sorted(counts, reverse=True) + + +class TestCaching: + """Test index caching and persistence.""" + + def test_save_and_load_cache(self, tmp_path): + """Test that cache saves and loads correctly.""" + cache_path = tmp_path / ".tag_index_test.json" + + # Build and save + index1 = TagIndex(cache_path=cache_path) + stats1 = index1.build() + + assert cache_path.exists() + + # Load from cache + index2 = TagIndex(cache_path=cache_path) + stats2 = index2.build() # Should load from cache + + # Should have same data + assert stats2.total_cards == stats1.total_cards + assert stats2.total_tags == stats1.total_tags + assert stats2.indexed_at == stats1.indexed_at + + def test_cache_invalidation(self, tmp_path): + """Test that cache is rebuilt when all_cards changes.""" + cache_path = tmp_path / ".tag_index_test.json" + + # Build index + index = TagIndex(cache_path=cache_path) + stats1 = index.build() + + # Modify cache to simulate outdated mtime + with cache_path.open("r") as f: + cache_data = json.load(f) + + cache_data["stats"]["all_cards_mtime"] = 0 # Very old + + with cache_path.open("w") as f: + json.dump(cache_data, f) + + # Should rebuild (not use cache) + index2 = TagIndex(cache_path=cache_path) + stats2 = index2.build() + + # Should have new timestamp + assert stats2.indexed_at > stats1.indexed_at + + def test_clear_cache(self, tmp_path): + """Test cache clearing.""" + cache_path = tmp_path / ".tag_index_test.json" + + index = TagIndex(cache_path=cache_path) + index.build() + + assert cache_path.exists() + + index.clear_cache() + + assert not cache_path.exists() + + +class TestGlobalIndex: + """Test global index accessor.""" + + def test_get_tag_index(self): + """Test getting global index.""" + clear_global_index() + + index = get_tag_index() + + assert isinstance(index, TagIndex) + assert index.get_stats() is not None + + def test_get_tag_index_singleton(self): + """Test that global index is a singleton.""" + clear_global_index() + + index1 = get_tag_index() + index2 = get_tag_index() + + # Should be same instance + assert index1 is index2 + + def test_clear_global_index(self): + """Test clearing global index.""" + index1 = get_tag_index() + + clear_global_index() + + index2 = get_tag_index() + + # Should be different instance + assert index1 is not index2 + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def test_cards_with_no_tags(self): + """Test that cards without tags are handled.""" + index = TagIndex() + index.build() + + # Get stats - should handle cards with no tags gracefully + stats = index.get_stats() + assert stats is not None + + def test_special_characters_in_tags(self): + """Test tags with special characters.""" + index = TagIndex() + index.build() + + # Try querying with special chars (should not crash) + cards = index.get_cards_with_tag("Life & Death") + assert isinstance(cards, set) + + def test_case_sensitive_tags(self): + """Test that tag lookups are case-sensitive.""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if 
all_tags: + tag = all_tags[0] + + cards1 = index.get_cards_with_tag(tag) + cards2 = index.get_cards_with_tag(tag.upper()) + cards3 = index.get_cards_with_tag(tag.lower()) + + # Case matters - may get different results + # (depends on tag naming in data) + assert isinstance(cards1, set) + assert isinstance(cards2, set) + assert isinstance(cards3, set) + + def test_duplicate_tags_handled(self): + """Test that duplicate tags in query are handled.""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if all_tags: + tag = all_tags[0] + + # Query with duplicate tag + cards = index.get_cards_with_all_tags([tag, tag]) + cards_single = index.get_cards_with_tag(tag) + + # Should give same result as single tag + assert cards == cards_single + + +class TestPerformance: + """Test performance characteristics.""" + + def test_query_performance(self): + """Test that queries complete quickly.""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if all_tags: + tag = all_tags[0] + + # Measure query time + start = time.perf_counter() + for _ in range(100): + index.get_cards_with_tag(tag) + elapsed = time.perf_counter() - start + + avg_time_ms = (elapsed / 100) * 1000 + + # Should average <1ms per query + assert avg_time_ms < 1.0 + + def test_multi_tag_query_performance(self): + """Test multi-tag query performance.""" + index = TagIndex() + index.build() + + all_tags = index.get_all_tags() + if len(all_tags) >= 3: + tags = all_tags[:3] + + # Measure query time + start = time.perf_counter() + for _ in range(100): + index.get_cards_with_all_tags(tags) + elapsed = time.perf_counter() - start + + avg_time_ms = (elapsed / 100) * 1000 + + # Should still be very fast + assert avg_time_ms < 5.0 diff --git a/code/tests/test_tag_loader.py b/code/tests/test_tag_loader.py new file mode 100644 index 0000000..dbe8102 --- /dev/null +++ b/code/tests/test_tag_loader.py @@ -0,0 +1,259 @@ +"""Tests for batch tag loading from all_cards.""" +from code.tagging.tag_loader import ( + load_tags_for_cards, + load_tags_for_card, + get_cards_with_tag, + get_cards_with_all_tags, + clear_cache, + is_use_all_cards_enabled, +) + + +class TestBatchTagLoading: + """Test batch tag loading operations.""" + + def test_load_tags_for_multiple_cards(self): + """Test loading tags for multiple cards at once.""" + cards = ["Sol Ring", "Lightning Bolt", "Counterspell"] + result = load_tags_for_cards(cards) + + assert isinstance(result, dict) + assert len(result) == 3 + + # All requested cards should be in result (even if no tags) + for card in cards: + assert card in result + assert isinstance(result[card], list) + + def test_load_tags_for_empty_list(self): + """Test loading tags for empty list returns empty dict.""" + result = load_tags_for_cards([]) + assert result == {} + + def test_load_tags_for_single_card(self): + """Test single card convenience function.""" + tags = load_tags_for_card("Sol Ring") + + assert isinstance(tags, list) + # Sol Ring should have some tags (artifacts, ramp, etc) + # But we don't assert specific tags since data may vary + + def test_load_tags_for_nonexistent_card(self): + """Test loading tags for card that doesn't exist.""" + tags = load_tags_for_card("This Card Does Not Exist 12345") + + # Should return empty list, not fail + assert tags == [] + + def test_load_tags_batch_includes_missing_cards(self): + """Test batch loading includes missing cards with empty lists.""" + cards = ["Sol Ring", "Fake Card Name 999", "Lightning Bolt"] + result = load_tags_for_cards(cards) + + # 
+        # All cards should be present
+        assert len(result) == 3
+        assert "Fake Card Name 999" in result
+        assert result["Fake Card Name 999"] == []
+
+    def test_load_tags_handles_list_format(self):
+        """Test that tags in list format are parsed correctly."""
+        # Pick a card likely to have tags
+        result = load_tags_for_cards(["Sol Ring"])
+
+        if "Sol Ring" in result and result["Sol Ring"]:
+            tags = result["Sol Ring"]
+            # Should be a list of strings
+            assert all(isinstance(tag, str) for tag in tags)
+            # Tags should be stripped of whitespace
+            assert all(tag == tag.strip() for tag in tags)
+
+    def test_load_tags_handles_string_format(self):
+        """Test that tags in string format are parsed correctly."""
+        # The loader should handle both list and string representations
+        # This is tested implicitly by loading any card
+        cards = ["Sol Ring", "Lightning Bolt"]
+        result = load_tags_for_cards(cards)
+
+        for card in cards:
+            tags = result[card]
+            # All should be lists (even if empty)
+            assert isinstance(tags, list)
+            # No empty string tags
+            assert "" not in tags
+            assert all(tag.strip() for tag in tags)
+
+
+class TestTagQueries:
+    """Test querying cards by tags."""
+
+    def test_get_cards_with_tag(self):
+        """Test getting all cards with a specific tag."""
+        # Pick a common tag
+        cards = get_cards_with_tag("ramp", limit=10)
+
+        assert isinstance(cards, list)
+        # Should have some cards (or none if the tag doesn't exist)
+        # We don't assert a specific count since data varies
+
+    def test_get_cards_with_tag_limit(self):
+        """Test limit parameter works."""
+        cards = get_cards_with_tag("ramp", limit=5)
+
+        assert len(cards) <= 5
+
+    def test_get_cards_with_nonexistent_tag(self):
+        """Test querying with a tag that doesn't exist."""
+        cards = get_cards_with_tag("ThisTagDoesNotExist12345")
+
+        # Should return empty list, not fail
+        assert cards == []
+
+    def test_get_cards_with_all_tags(self):
+        """Test getting cards that have multiple tags."""
+        # Pick two tags that might overlap
+        cards = get_cards_with_all_tags(["artifacts", "ramp"], limit=10)
+
+        assert isinstance(cards, list)
+        assert len(cards) <= 10
+
+    def test_get_cards_with_all_tags_no_matches(self):
+        """Test query with tags that likely have no overlap."""
+        cards = get_cards_with_all_tags([
+            "ThisTagDoesNotExist1",
+            "ThisTagDoesNotExist2"
+        ])
+
+        # Should return empty list
+        assert cards == []
+
+
+class TestCacheManagement:
+    """Test cache management functions."""
+
+    def test_clear_cache(self):
+        """Test that cache can be cleared without errors."""
+        # Load some data
+        load_tags_for_card("Sol Ring")
+
+        # Clear cache
+        clear_cache()
+
+        # Should still work after clearing
+        tags = load_tags_for_card("Sol Ring")
+        assert isinstance(tags, list)
+
+    def test_cache_persistence(self):
+        """Test that multiple calls use cached data."""
+        # First call
+        result1 = load_tags_for_cards(["Sol Ring", "Lightning Bolt"])
+
+        # Second call (should use cache)
+        result2 = load_tags_for_cards(["Sol Ring", "Lightning Bolt"])
+
+        # Results should be identical
+        assert result1 == result2
+
+
+class TestFeatureFlag:
+    """Test feature flag functionality."""
+
+    def test_is_use_all_cards_enabled_default(self):
+        """Test that all_cards tag loading is enabled by default."""
+        enabled = is_use_all_cards_enabled()
+
+        # Default should be True
+        assert isinstance(enabled, bool)
+        # We don't assert True since env might override
+
+
+class TestEdgeCases:
+    """Test edge cases and error handling."""
+
+    def test_load_tags_with_special_characters(self):
+        """Test loading tags for cards with special characters."""
+        # Cards with apostrophes, commas, etc.
+        cards = [
+            "Urza's Saga",
+            "Keeper of the Accord",
+            "Esper Sentinel"
+        ]
+        result = load_tags_for_cards(cards)
+
+        # Should handle special characters
+        assert len(result) == 3
+        for card in cards:
+            assert card in result
+
+    def test_load_tags_preserves_card_name_case(self):
+        """Test that card names preserve their original case."""
+        cards = ["Sol Ring", "LIGHTNING BOLT", "counterspell"]
+        result = load_tags_for_cards(cards)
+
+        # Batch loading returns an entry per requested name, keyed by the
+        # name as provided (lookup is case-sensitive)
+        assert "Sol Ring" in result
+        # Note: whether the odd-cased names match depends on all_cards data
+
+    def test_load_tags_deduplicates(self):
+        """Test that duplicate tags are handled."""
+        # Load tags for a card
+        tags = load_tags_for_card("Sol Ring")
+
+        # If any tags present, check for no duplicates
+        if tags:
+            assert len(tags) == len(set(tags))
+
+    def test_large_batch_performance(self):
+        """Test that large batch loads complete in reasonable time."""
+        import time
+
+        # Create a batch of 100 common cards
+        cards = ["Sol Ring"] * 50 + ["Lightning Bolt"] * 50
+
+        start = time.perf_counter()
+        result = load_tags_for_cards(cards)
+        elapsed = time.perf_counter() - start
+
+        # Should complete quickly (< 1 second for 100 cards)
+        assert elapsed < 1.0
+        assert len(result) >= 1  # At least one card found
+
+
+class TestFormatVariations:
+    """Test handling of different tag format variations."""
+
+    def test_empty_tags_handled(self):
+        """Test that cards with no tags return an empty list."""
+        # Pick a card that might have no tags (basic lands usually don't)
+        tags = load_tags_for_card("Plains")
+
+        # Should be a list (possibly empty), never None, and never raise
+        assert isinstance(tags, list)
+
+    def test_string_list_repr_parsed(self):
+        """Test parsing of string representations like \"['tag1', 'tag2']\"."""
+        # This is tested implicitly through load_tags_for_cards
+        # The loader handles multiple formats internally
+        cards = ["Sol Ring", "Lightning Bolt", "Counterspell"]
+        result = load_tags_for_cards(cards)
+
+        # All results should be lists
+        for card, tags in result.items():
+            assert isinstance(tags, list)
+            # No stray brackets or quotes left over from string reprs
+            for tag in tags:
+                assert "[" not in tag
+                assert "]" not in tag
+                assert '"' not in tag
+                # Apostrophes inside words are fine; wrapping quotes are not
+                assert not tag.startswith("'")
+                assert not tag.endswith("'")
+
+    def test_comma_separated_parsed(self):
+        """Test parsing of comma-separated tag strings."""
+        # The loader should handle comma-separated strings
+        # This is tested implicitly by loading any card
+        result = load_tags_for_cards(["Sol Ring"])
+
+        if result.get("Sol Ring"):
+            tags = result["Sol Ring"]
+            # Tags should be split properly (no commas in individual tags)
+            for tag in tags:
+                assert "," not in tag
diff --git a/code/tests/test_theme_enrichment.py b/code/tests/test_theme_enrichment.py
new file mode 100644
index 0000000..8d4ba02
--- /dev/null
+++ b/code/tests/test_theme_enrichment.py
@@ -0,0 +1,370 @@
+"""Tests for consolidated theme enrichment pipeline.
+
+These tests verify that the new consolidated pipeline produces the same results
+as the old 7-script approach, but much faster.
+"""
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, Dict
+
+import pytest
+
+try:
+    import yaml
+except ImportError:
+    yaml = None
+
+from code.tagging.theme_enrichment import (
+    ThemeEnrichmentPipeline,
+    EnrichmentStats,
+    run_enrichment_pipeline,
+)
+
+
+# Skip all tests if PyYAML not available
+pytestmark = pytest.mark.skipif(yaml is None, reason="PyYAML not installed")
+
+
+@pytest.fixture
+def temp_catalog_dir(tmp_path: Path) -> Path:
+    """Create temporary catalog directory with test themes."""
+    catalog_dir = tmp_path / 'config' / 'themes' / 'catalog'
+    catalog_dir.mkdir(parents=True)
+    return catalog_dir
+
+
+@pytest.fixture
+def temp_root(tmp_path: Path, temp_catalog_dir: Path) -> Path:
+    """Create temporary project root."""
+    # Create theme_list.json
+    theme_json = tmp_path / 'config' / 'themes' / 'theme_list.json'
+    theme_json.parent.mkdir(parents=True, exist_ok=True)
+    theme_json.write_text('{"themes": []}', encoding='utf-8')
+    return tmp_path
+
+
+def write_theme(catalog_dir: Path, filename: str, data: Dict[str, Any]) -> Path:
+    """Helper to write a theme YAML file."""
+    path = catalog_dir / filename
+    path.write_text(yaml.safe_dump(data, sort_keys=False, allow_unicode=True), encoding='utf-8')
+    return path
+
+
+def read_theme(path: Path) -> Dict[str, Any]:
+    """Helper to read a theme YAML file."""
+    return yaml.safe_load(path.read_text(encoding='utf-8'))
+
+
+class TestThemeEnrichmentPipeline:
+    """Tests for ThemeEnrichmentPipeline class."""
+
+    def test_init(self, temp_root: Path):
+        """Test pipeline initialization."""
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+
+        assert pipeline.root == temp_root
+        assert pipeline.min_examples == 5
+        assert pipeline.catalog_dir == temp_root / 'config' / 'themes' / 'catalog'
+        assert len(pipeline.themes) == 0
+
+    def test_load_themes_empty_dir(self, temp_root: Path):
+        """Test loading themes from empty directory."""
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+
+        assert len(pipeline.themes) == 0
+        assert pipeline.stats.total_themes == 0
+
+    def test_load_themes_with_valid_files(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test loading valid theme files."""
+        write_theme(temp_catalog_dir, 'landfall.yml', {
+            'display_name': 'Landfall',
+            'synergies': ['Ramp', 'Tokens'],
+            'example_commanders': []
+        })
+        write_theme(temp_catalog_dir, 'reanimate.yml', {
+            'display_name': 'Reanimate',
+            'synergies': ['Graveyard', 'Mill'],
+            'example_commanders': ['Meren of Clan Nel Toth']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+
+        assert len(pipeline.themes) == 2
+        assert pipeline.stats.total_themes == 2
+
+    def test_autofill_placeholders_empty_examples(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test autofill adds placeholders to themes with no examples."""
+        write_theme(temp_catalog_dir, 'tokens.yml', {
+            'display_name': 'Tokens Matter',
+            'synergies': ['Sacrifice', 'Aristocrats'],
+            'example_commanders': []
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.autofill_placeholders()
+
+        assert pipeline.stats.autofilled == 1
+        theme = list(pipeline.themes.values())[0]
+        assert theme.modified
+        assert 'Tokens Matter Anchor' in theme.data['example_commanders']
+        assert 'Sacrifice Anchor' in theme.data['example_commanders']
+        assert 'Aristocrats Anchor' in theme.data['example_commanders']
+        assert theme.data.get('editorial_quality') == 'draft'
+
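+    # Editorial sketch (an assumption, not the pipeline's actual code): the
+    # assertions above presume autofill derives placeholder names roughly as
+    #   placeholders = [f"{display_name} Anchor"] + [f"{s} Anchor" for s in synergies]
+    # i.e. one "<display_name> Anchor" entry plus one per listed synergy.
+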
+    def test_autofill_skips_themes_with_examples(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test autofill skips themes that already have examples."""
+        write_theme(temp_catalog_dir, 'landfall.yml', {
+            'display_name': 'Landfall',
+            'synergies': ['Ramp'],
+            'example_commanders': ['Tatyova, Benthic Druid']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.autofill_placeholders()
+
+        assert pipeline.stats.autofilled == 0
+        theme = list(pipeline.themes.values())[0]
+        assert not theme.modified
+
+    def test_pad_examples_to_minimum(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test padding adds placeholders to reach minimum threshold."""
+        write_theme(temp_catalog_dir, 'ramp.yml', {
+            'display_name': 'Ramp',
+            'synergies': ['Landfall', 'BigSpells', 'Hydras'],
+            'example_commanders': ['Ramp Anchor', 'Landfall Anchor']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+        pipeline.load_all_themes()
+        pipeline.pad_examples()
+
+        assert pipeline.stats.padded == 1
+        theme = list(pipeline.themes.values())[0]
+        assert theme.modified
+        assert len(theme.data['example_commanders']) == 5
+        # Should add synergies first (3rd synergy), then letter suffixes
+        assert 'Hydras Anchor' in theme.data['example_commanders']
+        # Should also have letter suffixes for remaining slots
+        assert any('Anchor B' in cmd or 'Anchor C' in cmd for cmd in theme.data['example_commanders'])
+
+    def test_pad_skips_mixed_real_and_placeholder(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test padding skips lists with both real and placeholder examples."""
+        write_theme(temp_catalog_dir, 'tokens.yml', {
+            'display_name': 'Tokens',
+            'synergies': ['Sacrifice'],
+            'example_commanders': ['Krenko, Mob Boss', 'Tokens Anchor']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+        pipeline.load_all_themes()
+        pipeline.pad_examples()
+
+        assert pipeline.stats.padded == 0
+        theme = list(pipeline.themes.values())[0]
+        assert not theme.modified
+
+    def test_cleanup_removes_placeholders_when_real_present(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test cleanup removes placeholders when real examples are present.
+
+        Note: cleanup only removes entries ending with ' Anchor' (no suffix).
+        Purge step removes entries with ' Anchor' or ' Anchor X' pattern.
+        """
+        write_theme(temp_catalog_dir, 'lifegain.yml', {
+            'display_name': 'Lifegain',
+            'synergies': [],
+            'example_commanders': [
+                'Oloro, Ageless Ascetic',
+                'Lifegain Anchor',  # Will be removed
+                'Trelasarra, Moon Dancer',
+            ]
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.cleanup_placeholders()
+
+        assert pipeline.stats.cleaned == 1
+        theme = list(pipeline.themes.values())[0]
+        assert theme.modified
+        assert len(theme.data['example_commanders']) == 2
+        assert 'Oloro, Ageless Ascetic' in theme.data['example_commanders']
+        assert 'Trelasarra, Moon Dancer' in theme.data['example_commanders']
+        assert 'Lifegain Anchor' not in theme.data['example_commanders']
+
+    def test_purge_removes_all_anchors(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test purge removes all anchor placeholders (even if no real examples)."""
+        write_theme(temp_catalog_dir, 'counters.yml', {
+            'display_name': 'Counters',
+            'synergies': [],
+            'example_commanders': [
+                'Counters Anchor',
+                'Counters Anchor B',
+                'Counters Anchor C'
+            ]
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.purge_anchors()
+
+        assert pipeline.stats.purged == 1
+        theme = list(pipeline.themes.values())[0]
+        assert theme.modified
+        assert theme.data['example_commanders'] == []
+
+    def test_augment_from_catalog(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test augmentation adds missing fields from catalog."""
+        # Create catalog JSON
+        catalog_json = temp_root / 'config' / 'themes' / 'theme_list.json'
+        catalog_data = {
+            'themes': [
+                {
+                    'theme': 'Landfall',
+                    'description': 'Triggers from lands entering',
+                    'popularity_bucket': 'common',
+                    'popularity_hint': 'Very popular',
+                    'deck_archetype': 'Lands'
+                }
+            ]
+        }
+        import json
+        catalog_json.write_text(json.dumps(catalog_data), encoding='utf-8')
+
+        write_theme(temp_catalog_dir, 'landfall.yml', {
+            'display_name': 'Landfall',
+            'synergies': ['Ramp'],
+            'example_commanders': ['Tatyova, Benthic Druid']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.augment_from_catalog()
+
+        assert pipeline.stats.augmented == 1
+        theme = list(pipeline.themes.values())[0]
+        assert theme.modified
+        assert theme.data['description'] == 'Triggers from lands entering'
+        assert theme.data['popularity_bucket'] == 'common'
+        assert theme.data['popularity_hint'] == 'Very popular'
+        assert theme.data['deck_archetype'] == 'Lands'
+
+    def test_validate_min_examples_warning(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test validation warns about insufficient examples."""
+        write_theme(temp_catalog_dir, 'ramp.yml', {
+            'display_name': 'Ramp',
+            'synergies': [],
+            'example_commanders': ['Ramp Commander']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+        pipeline.load_all_themes()
+        pipeline.validate(enforce_min=False)
+
+        assert pipeline.stats.lint_warnings > 0
+        assert pipeline.stats.lint_errors == 0
+
+    def test_validate_min_examples_error(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test validation errors on insufficient examples when enforced."""
+        write_theme(temp_catalog_dir, 'ramp.yml', {
+            'display_name': 'Ramp',
+            'synergies': [],
+            'example_commanders': ['Ramp Commander']
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+        pipeline.load_all_themes()
+        pipeline.validate(enforce_min=True)
+
+        assert pipeline.stats.lint_errors > 0
+
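+    # Editorial note (an assumption inferred from the two tests above): validate()
+    # records problems on pipeline.stats rather than raising, and enforce_min
+    # controls whether a short example list counts as a warning or an error.
+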
+    def test_write_themes_dry_run(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test that nothing is written until write_all_themes() is called."""
+        theme_path = write_theme(temp_catalog_dir, 'tokens.yml', {
+            'display_name': 'Tokens',
+            'synergies': [],
+            'example_commanders': []
+        })
+
+        original_content = theme_path.read_text(encoding='utf-8')
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.autofill_placeholders()
+        # Don't call write_all_themes()
+
+        # File should be unchanged
+        assert theme_path.read_text(encoding='utf-8') == original_content
+
+    def test_write_themes_saves_changes(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test write_all_themes saves modified files."""
+        theme_path = write_theme(temp_catalog_dir, 'tokens.yml', {
+            'display_name': 'Tokens',
+            'synergies': ['Sacrifice'],
+            'example_commanders': []
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root)
+        pipeline.load_all_themes()
+        pipeline.autofill_placeholders()
+        pipeline.write_all_themes()
+
+        # File should be updated
+        updated_data = read_theme(theme_path)
+        assert len(updated_data['example_commanders']) > 0
+        assert 'Tokens Anchor' in updated_data['example_commanders']
+
+    def test_run_all_full_pipeline(self, temp_root: Path, temp_catalog_dir: Path):
+        """Test running the complete enrichment pipeline."""
+        write_theme(temp_catalog_dir, 'landfall.yml', {
+            'display_name': 'Landfall',
+            'synergies': ['Ramp', 'Lands'],
+            'example_commanders': []
+        })
+        write_theme(temp_catalog_dir, 'reanimate.yml', {
+            'display_name': 'Reanimate',
+            'synergies': ['Graveyard'],
+            'example_commanders': []
+        })
+
+        pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
+        stats = pipeline.run_all(write=True, enforce_min=False, strict_lint=False)
+
+        assert stats.total_themes == 2
+        assert stats.autofilled >= 2
+        assert stats.padded >= 2
+
+        # Verify files were updated
+        landfall_data = read_theme(temp_catalog_dir / 'landfall.yml')
+        assert len(landfall_data['example_commanders']) >= 5
+        assert landfall_data.get('editorial_quality') == 'draft'
+
+
+def test_run_enrichment_pipeline_convenience_function(temp_root: Path, temp_catalog_dir: Path):
+    """Test the convenience function wrapper."""
+    write_theme(temp_catalog_dir, 'tokens.yml', {
+        'display_name': 'Tokens',
+        'synergies': ['Sacrifice'],
+        'example_commanders': []
+    })
+
+    stats = run_enrichment_pipeline(
+        root=temp_root,
+        min_examples=3,
+        write=True,
+        enforce_min=False,
+        strict=False,
+        progress_callback=None,
+    )
+
+    assert isinstance(stats, EnrichmentStats)
+    assert stats.total_themes == 1
+    assert stats.autofilled >= 1
+
+    # Verify file was written
+    tokens_data = read_theme(temp_catalog_dir / 'tokens.yml')
+    assert len(tokens_data['example_commanders']) >= 3
diff --git a/code/tests/test_web_tag_endpoints.py b/code/tests/test_web_tag_endpoints.py
new file mode 100644
index 0000000..9a5c8c3
--- /dev/null
+++ b/code/tests/test_web_tag_endpoints.py
@@ -0,0 +1,214 @@
+"""Tests for web tag search endpoints."""
+import pytest
+from fastapi.testclient import TestClient
+
+
+@pytest.fixture
+def client():
+    """Create a test client for the web app."""
+    # Import here to avoid circular imports
+    from code.web.app import app
+    return TestClient(app)
+
+
+def test_theme_autocomplete_basic(client):
+    """Test basic theme autocomplete functionality."""
+    response = client.get("/commanders/theme-autocomplete?theme=life&limit=5")
+
+    assert response.status_code == 200
+    assert "text/html" in response.headers["content-type"]
+
+    content = response.text
+    assert "autocomplete-item" in content
+    assert "Life" in content  # Should match tags starting with "life"
+    assert "tag-count" in content  # Should show card counts
+
+
+def test_theme_autocomplete_min_length(client):
+    """Test that theme autocomplete requires minimum 2 characters."""
+    response = client.get("/commanders/theme-autocomplete?theme=a&limit=5")
+
+    # Should fail validation
+    assert response.status_code == 422
+
+
+def test_theme_autocomplete_no_matches(client):
+    """Test theme autocomplete with query that has no matches."""
+    response = client.get("/commanders/theme-autocomplete?theme=zzzzzzzzz&limit=5")
+
+    assert response.status_code == 200
+    content = response.text
+    assert "autocomplete-empty" in content or "No matching themes" in content
+
+
+def test_theme_autocomplete_limit(client):
+    """Test that theme autocomplete respects limit parameter."""
+    # A one-character query always fails min_length validation
+    response = client.get("/commanders/theme-autocomplete?theme=a&limit=3")
+    assert response.status_code == 422
+
+    # Try again with a valid length
+    response = client.get("/commanders/theme-autocomplete?theme=to&limit=3")
+    assert response.status_code == 200
+
+    # Count items (rough check - should have at most 3)
+    content = response.text
+    item_count = content.count('class="autocomplete-item"')
+    assert item_count <= 3
+
+
+def test_api_cards_by_tags_and_logic(client):
+    """Test card search with AND logic."""
+    response = client.get("/api/cards/by-tags?tags=tokens&logic=AND&limit=10")
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert "tags" in data
+    assert "logic" in data
+    assert data["logic"] == "AND"
+    assert "total_matches" in data
+    assert "cards" in data
+    assert isinstance(data["cards"], list)
+
+
+def test_api_cards_by_tags_or_logic(client):
+    """Test card search with OR logic."""
+    response = client.get("/api/cards/by-tags?tags=tokens,sacrifice&logic=OR&limit=10")
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert data["logic"] == "OR"
+    assert "cards" in data
+
+
+def test_api_cards_by_tags_invalid_logic(client):
+    """Test that invalid logic parameter returns error."""
+    response = client.get("/api/cards/by-tags?tags=tokens&logic=INVALID&limit=10")
+
+    assert response.status_code == 400
+    data = response.json()
+    assert "error" in data
+
+
+def test_api_cards_by_tags_empty_tags(client):
+    """Test that empty tags parameter returns error."""
+    response = client.get("/api/cards/by-tags?tags=&logic=AND&limit=10")
+
+    assert response.status_code == 400
+    data = response.json()
+    assert "error" in data
+
+
+def test_api_tags_search(client):
+    """Test tag search autocomplete endpoint."""
+    response = client.get("/api/cards/tags/search?q=life&limit=10")
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert "query" in data
+    assert data["query"] == "life"
+    assert "matches" in data
+    assert isinstance(data["matches"], list)
+
+    # Check match structure
+    if data["matches"]:
+        match = data["matches"][0]
+        assert "tag" in match
+        assert "card_count" in match
+        assert match["tag"].lower().startswith("life")
+
+
+def test_api_tags_search_min_length(client):
+    """Test that tag search requires minimum 2 characters."""
+    response = client.get("/api/cards/tags/search?q=a&limit=10")
+
+    # Should fail validation
+    assert response.status_code == 422
+
+
+def test_api_tags_popular(client):
+    """Test popular tags endpoint."""
+    response = client.get("/api/cards/tags/popular?limit=20")
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert "count" in data
assert "tags" in data + assert isinstance(data["tags"], list) + assert data["count"] == len(data["tags"]) + assert data["count"] <= 20 + + # Check tag structure + if data["tags"]: + tag = data["tags"][0] + assert "tag" in tag + assert "card_count" in tag + assert isinstance(tag["card_count"], int) + + # Tags should be sorted by card count (descending) + if len(data["tags"]) > 1: + assert data["tags"][0]["card_count"] >= data["tags"][1]["card_count"] + + +def test_api_tags_popular_limit(client): + """Test that popular tags endpoint respects limit.""" + response = client.get("/api/cards/tags/popular?limit=5") + + assert response.status_code == 200 + data = response.json() + + assert len(data["tags"]) <= 5 + + +def test_commanders_page_loads(client): + """Test that commanders page loads successfully.""" + response = client.get("/commanders") + + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + content = response.text + # Should have the theme filter input + assert "commander-theme" in content + assert "theme-suggestions" in content + + +def test_commanders_page_with_theme_filter(client): + """Test commanders page with theme query parameter.""" + response = client.get("/commanders?theme=tokens") + + assert response.status_code == 200 + content = response.text + + # Should have the theme value in the input + assert 'value="tokens"' in content or "tokens" in content + + +@pytest.mark.skip(reason="Performance test - run manually") +def test_theme_autocomplete_performance(client): + """Test that theme autocomplete responds quickly.""" + import time + + start = time.time() + response = client.get("/commanders/theme-autocomplete?theme=to&limit=20") + elapsed = time.time() - start + + assert response.status_code == 200 + assert elapsed < 0.05 # Should respond in <50ms + + +@pytest.mark.skip(reason="Performance test - run manually") +def test_api_tags_search_performance(client): + """Test that tag search responds quickly.""" + import time + + start = time.time() + response = client.get("/api/cards/tags/search?q=to&limit=20") + elapsed = time.time() - start + + assert response.status_code == 200 + assert elapsed < 0.05 # Should respond in <50ms diff --git a/code/web/app.py b/code/web/app.py index 3c17093..767eb36 100644 --- a/code/web/app.py +++ b/code/web/app.py @@ -2205,6 +2205,7 @@ from .routes import themes as themes_routes # noqa: E402 from .routes import commanders as commanders_routes # noqa: E402 from .routes import partner_suggestions as partner_suggestions_routes # noqa: E402 from .routes import telemetry as telemetry_routes # noqa: E402 +from .routes import cards as cards_routes # noqa: E402 app.include_router(build_routes.router) app.include_router(config_routes.router) app.include_router(decks_routes.router) @@ -2214,6 +2215,7 @@ app.include_router(themes_routes.router) app.include_router(commanders_routes.router) app.include_router(partner_suggestions_routes.router) app.include_router(telemetry_routes.router) +app.include_router(cards_routes.router) # Warm validation cache early to reduce first-call latency in tests and dev try: diff --git a/code/web/routes/cards.py b/code/web/routes/cards.py new file mode 100644 index 0000000..28f8a7b --- /dev/null +++ b/code/web/routes/cards.py @@ -0,0 +1,186 @@ +"""Card browsing and tag search API endpoints.""" +from __future__ import annotations + +from typing import Optional +from fastapi import APIRouter, Query +from fastapi.responses import JSONResponse + +# Import tag index from M3 +try: + from 
+    from code.tagging.tag_index import get_tag_index
+except ImportError:
+    from tagging.tag_index import get_tag_index
+
+# Import all cards loader
+try:
+    from code.services.all_cards_loader import AllCardsLoader
+except ImportError:
+    from services.all_cards_loader import AllCardsLoader
+
+router = APIRouter(prefix="/api/cards", tags=["cards"])
+
+# Cache for all_cards loader
+_all_cards_loader: Optional[AllCardsLoader] = None
+
+
+def _get_all_cards_loader() -> AllCardsLoader:
+    """Get cached AllCardsLoader instance."""
+    global _all_cards_loader
+    if _all_cards_loader is None:
+        _all_cards_loader = AllCardsLoader()
+    return _all_cards_loader
+
+
+@router.get("/by-tags")
+async def search_by_tags(
+    tags: str = Query(..., description="Comma-separated list of theme tags"),
+    logic: str = Query("AND", description="Search logic: AND (intersection) or OR (union)"),
+    limit: int = Query(100, ge=1, le=1000, description="Maximum number of results"),
+) -> JSONResponse:
+    """Search for cards by theme tags.
+
+    Examples:
+        /api/cards/by-tags?tags=tokens&logic=AND
+        /api/cards/by-tags?tags=tokens,sacrifice&logic=AND
+        /api/cards/by-tags?tags=lifegain,lifelink&logic=OR
+
+    Args:
+        tags: Comma-separated theme tags to search for
+        logic: "AND" for cards with all tags, "OR" for cards with any tag
+        limit: Maximum results to return
+
+    Returns:
+        JSON with matching cards and metadata
+    """
+    try:
+        # Parse tags
+        tag_list = [t.strip() for t in tags.split(",") if t.strip()]
+        if not tag_list:
+            return JSONResponse(
+                status_code=400,
+                content={"error": "No valid tags provided"}
+            )
+
+        # Get tag index and find matching cards
+        tag_index = get_tag_index()
+
+        if logic.upper() == "AND":
+            card_names = tag_index.get_cards_with_all_tags(tag_list)
+        elif logic.upper() == "OR":
+            card_names = tag_index.get_cards_with_any_tags(tag_list)
+        else:
+            return JSONResponse(
+                status_code=400,
+                content={"error": f"Invalid logic: {logic}. Use AND or OR."}
+            )
+
+        # Load full card data
+        all_cards = _get_all_cards_loader().load()
+        matching_cards = all_cards[all_cards["name"].isin(card_names)]
+
+        # Limit results
+        matching_cards = matching_cards.head(limit)
+
+        # Convert to dict
+        results = matching_cards.to_dict("records")
+
+        return JSONResponse(content={
+            "tags": tag_list,
+            "logic": logic.upper(),
+            "total_matches": len(card_names),
+            "returned": len(results),
+            "limit": limit,
+            "cards": results
+        })
+
+    except Exception as e:
+        return JSONResponse(
+            status_code=500,
+            content={"error": f"Search failed: {str(e)}"}
+        )
+
+
+@router.get("/tags/search")
+async def search_tags(
+    q: str = Query(..., min_length=2, description="Tag prefix to search for"),
+    limit: int = Query(10, ge=1, le=50, description="Maximum number of suggestions"),
+) -> JSONResponse:
+    """Autocomplete search for theme tags.
+
+    Examples:
+        /api/cards/tags/search?q=life
+        /api/cards/tags/search?q=token&limit=5
+
+    Args:
+        q: Tag prefix (minimum 2 characters)
+        limit: Maximum suggestions to return
+
+    Returns:
+        JSON with matching tags sorted by popularity
+    """
+    try:
+        tag_index = get_tag_index()
+
+        # Get all tags with counts - get_popular_tags returns all tags when given a high limit
+        all_tags_with_counts = tag_index.get_popular_tags(limit=10000)
+
+        # Filter by prefix (case-insensitive)
+        prefix_lower = q.lower()
+        matches = [
+            (tag, count)
+            for tag, count in all_tags_with_counts
+            if tag.lower().startswith(prefix_lower)
+        ]
+
+        # Already sorted by popularity from get_popular_tags
+        # Limit results
+        matches = matches[:limit]
+
+        return JSONResponse(content={
+            "query": q,
+            "matches": [
+                {"tag": tag, "card_count": count}
+                for tag, count in matches
+            ]
+        })
+
+    except Exception as e:
+        return JSONResponse(
+            status_code=500,
+            content={"error": f"Tag search failed: {str(e)}"}
+        )
+
+
+@router.get("/tags/popular")
+async def get_popular_tags(
+    limit: int = Query(50, ge=1, le=200, description="Number of popular tags to return"),
+) -> JSONResponse:
+    """Get the most popular theme tags by card count.
+
+    Examples:
+        /api/cards/tags/popular
+        /api/cards/tags/popular?limit=20
+
+    Args:
+        limit: Maximum tags to return
+
+    Returns:
+        JSON with popular tags sorted by card count
+    """
+    try:
+        tag_index = get_tag_index()
+        popular = tag_index.get_popular_tags(limit=limit)
+
+        return JSONResponse(content={
+            "count": len(popular),
+            "tags": [
+                {"tag": tag, "card_count": count}
+                for tag, count in popular
+            ]
+        })
+
+    except Exception as e:
+        return JSONResponse(
+            status_code=500,
+            content={"error": f"Failed to get popular tags: {str(e)}"}
+        )
diff --git a/code/web/routes/commanders.py b/code/web/routes/commanders.py
index 88053b5..7b0fad0 100644
--- a/code/web/routes/commanders.py
+++ b/code/web/routes/commanders.py
@@ -526,6 +526,52 @@ def _build_theme_info(records: Sequence[CommanderRecord]) -> dict[str, Commander
     return info
+
+@router.get("/theme-autocomplete", response_class=HTMLResponse)
+async def theme_autocomplete(
+    request: Request,
+    theme: str = Query(..., min_length=2, description="Theme prefix to search for"),
+    limit: int = Query(20, ge=1, le=50),
+) -> HTMLResponse:
+    """HTMX endpoint for theme tag autocomplete."""
+    try:
+        # Import tag_index
+        try:
+            from code.tagging.tag_index import get_tag_index
+        except ImportError:
+            from tagging.tag_index import get_tag_index
+
+        tag_index = get_tag_index()
+
+        # Get all tags with counts - get_popular_tags returns all tags when given a high limit
+        all_tags_with_counts = tag_index.get_popular_tags(limit=10000)
+
+        # Filter by prefix (case-insensitive)
+        prefix_lower = theme.lower()
+        matches = [
+            (tag, count)
+            for tag, count in all_tags_with_counts
+            if tag.lower().startswith(prefix_lower)
+        ]
+
+        # Already sorted by popularity from get_popular_tags
+        matches = matches[:limit]
+
+        # Generate HTML suggestions with ARIA attributes
+        html_parts = []
+        for tag, count in matches:
+            html_parts.append(
+                f'