feat: theme catalog optimization with tag search and faster enrichment

matt 2025-10-15 17:17:46 -07:00
parent 952b151162
commit 9e6c68f559
26 changed files with 5906 additions and 5688 deletions

View file

@@ -9,21 +9,24 @@ This format follows Keep a Changelog principles and aims for Semantic Versioning
## [Unreleased]
### Summary
Improved performance with new card data storage format. Card queries are now significantly faster with reduced file sizes.
Theme catalog improvements with faster processing, new tag search features, and regeneration fixes.
### Added
- **Card Data Consolidation**: All card data now stored in optimized format for faster loading
- Automatic updates after tagging/setup completes
- "Rebuild Card Files" button in Setup page for manual refresh
- 87% smaller file sizes with dramatically faster queries
- Maintains multiple backup versions for safety
- **Backward Compatibility**: Existing functionality continues to work without changes
- **Theme Catalog Optimization**:
- Consolidated theme enrichment pipeline (single pass instead of 7 separate scripts)
- Tag index for fast theme-based card queries
- Tag search API with new endpoints for card search, autocomplete, and popular tags (see the sketch after this list)
- Commander browser theme autocomplete with keyboard navigation
- Tag loading infrastructure for batch operations
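A minimal sketch of how the new tag search endpoints might be queried from Python; the route path and response shape below are assumptions for illustration, since the commit text does not spell out the exact API:

# Hypothetical sketch -- '/themes/api/tags/search' is an assumed route, not one confirmed by this commit.
import json
import urllib.parse
import urllib.request

BASE = "http://localhost:8000"

def search_tags(query: str, limit: int = 10) -> dict:
    # Assumed query-string shape: ?q=<text>&limit=<n>
    url = f"{BASE}/themes/api/tags/search?q={urllib.parse.quote(query)}&limit={limit}"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))

print(search_tags("lifegain"))  # the autocomplete and popular-tags endpoints would follow the same pattern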
### Changed
_No unreleased changes yet._
### Fixed
_No unreleased fixes yet._
- **Theme Regeneration**: Theme catalog can now be fully rebuilt from scratch without placeholder data
- Fixed "Anchor" placeholder issue when regenerating catalog
- Examples now generated from actual card data
- Theme export preserves all metadata fields
## [2.7.1] - 2025-10-14
### Summary

View file

@@ -1,18 +1,23 @@
# MTG Python Deckbuilder ${VERSION}
### Summary
Improved performance with new card data storage format. Card queries are now significantly faster with reduced file sizes.
Theme catalog improvements with faster processing, tag search features, and regeneration fixes.
### Added
- **Card Data Consolidation**: All card data now stored in optimized format for faster loading
- Automatic updates after tagging/setup completes
- "Rebuild Card Files" button in Setup page for manual refresh
- 87% smaller file sizes with dramatically faster queries
- Maintains multiple backup versions for safety
- **Backward Compatibility**: Existing functionality continues to work without changes
- **Theme Catalog Optimization**:
- Consolidated theme enrichment pipeline
- Tag search API for theme-based card discovery
- Commander browser theme autocomplete with keyboard navigation
- Tag index for faster queries
- **Card Data Consolidation** (from previous release):
- Optimized format with smaller file sizes
- "Rebuild Card Files" button in Setup page
- Automatic updates after tagging/setup
### Changed
_No unreleased changes yet._
### Fixed
_No unreleased fixes yet._
- **Theme Regeneration**: Theme catalog can now be fully rebuilt from scratch
- Fixed placeholder data appearing in fresh installations
- Examples now generated from actual card data

View file

@@ -1,203 +0,0 @@
"""
Full audit of Protection-tagged cards with kindred metadata support (M2 Phase 2).
Created: October 8, 2025
Purpose: Audit and validate Protection tag precision after implementing grant detection.
Can be re-run periodically to check tagging quality.
This script audits ALL Protection-tagged cards and categorizes them:
- Grant: Gives broad protection to other permanents YOU control
- Kindred: Gives protection to specific creature types (metadata tags)
- Mixed: Both broad and kindred/inherent
- Inherent: Only has protection itself
- ConditionalSelf: Only conditionally grants to itself
- Opponent: Grants to opponent's permanents
- Neither: False positive
Outputs:
- m2_audit_v2.json: Full analysis with summary
- m2_audit_v2_grant.csv: Cards for main Protection tag
- m2_audit_v2_kindred.csv: Cards for kindred metadata tags
- m2_audit_v2_mixed.csv: Cards with both broad and kindred grants
- m2_audit_v2_conditional.csv: Conditional self-grants (exclude)
- m2_audit_v2_inherent.csv: Inherent protection only (exclude)
- m2_audit_v2_opponent.csv: Opponent grants (exclude)
- m2_audit_v2_neither.csv: False positives (exclude)
- m2_audit_v2_all.csv: All cards combined
"""
import sys
from pathlib import Path
import pandas as pd
import json
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from code.tagging.protection_grant_detection import (
categorize_protection_card,
get_kindred_protection_tags,
is_granting_protection,
)
def load_all_cards():
"""Load all cards from color/identity CSV files."""
csv_dir = project_root / 'csv_files'
# Get all color/identity CSVs (not the raw cards.csv)
csv_files = list(csv_dir.glob('*_cards.csv'))
csv_files = [f for f in csv_files if f.stem not in ['cards', 'testdata']]
all_cards = []
for csv_file in csv_files:
try:
df = pd.read_csv(csv_file)
all_cards.append(df)
except Exception as e:
print(f"Warning: Could not load {csv_file.name}: {e}")
# Combine all DataFrames
combined = pd.concat(all_cards, ignore_index=True)
# Drop duplicates (cards appear in multiple color files)
combined = combined.drop_duplicates(subset=['name'], keep='first')
return combined
def audit_all_protection_cards():
"""Audit all Protection-tagged cards."""
print("Loading all cards...")
df = load_all_cards()
print(f"Total cards loaded: {len(df)}")
# Filter to Protection-tagged cards (column is 'themeTags' in color CSVs)
df_prot = df[df['themeTags'].str.contains('Protection', case=False, na=False)].copy()
print(f"Protection-tagged cards: {len(df_prot)}")
# Categorize each card
categories = []
grants_list = []
kindred_tags_list = []
for idx, row in df_prot.iterrows():
name = row['name']
text = str(row.get('text', '')).replace('\\n', '\n') # Convert escaped newlines to real newlines
keywords = str(row.get('keywords', ''))
card_type = str(row.get('type', ''))
# Categorize with kindred exclusion enabled
category = categorize_protection_card(name, text, keywords, card_type, exclude_kindred=True)
# Check if it grants broadly
grants_broad = is_granting_protection(text, keywords, exclude_kindred=True)
# Get kindred tags
kindred_tags = get_kindred_protection_tags(text)
categories.append(category)
grants_list.append(grants_broad)
kindred_tags_list.append(', '.join(sorted(kindred_tags)) if kindred_tags else '')
df_prot['category'] = categories
df_prot['grants_broad'] = grants_list
df_prot['kindred_tags'] = kindred_tags_list
# Generate summary (convert numpy types to native Python for JSON serialization)
summary = {
'total': int(len(df_prot)),
'categories': {k: int(v) for k, v in df_prot['category'].value_counts().to_dict().items()},
'grants_broad_count': int(df_prot['grants_broad'].sum()),
'kindred_cards_count': int((df_prot['kindred_tags'] != '').sum()),
}
# Calculate keep vs remove
keep_categories = {'Grant', 'Mixed'}
kindred_only = df_prot[df_prot['category'] == 'Kindred']
keep_count = len(df_prot[df_prot['category'].isin(keep_categories)])
remove_count = len(df_prot[~df_prot['category'].isin(keep_categories | {'Kindred'})])
summary['keep_main_tag'] = keep_count
summary['kindred_metadata'] = len(kindred_only)
summary['remove'] = remove_count
summary['precision_estimate'] = round((keep_count / len(df_prot)) * 100, 1) if len(df_prot) > 0 else 0
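    # Editorial example: 320 Grant/Mixed cards out of 400 tagged gives
    # precision_estimate = round((320 / 400) * 100, 1) = 80.0.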
# Print summary
print(f"\n{'='*60}")
print("AUDIT SUMMARY")
print(f"{'='*60}")
print(f"Total Protection-tagged cards: {summary['total']}")
print(f"\nCategories:")
for cat, count in sorted(summary['categories'].items()):
pct = (count / summary['total']) * 100
print(f" {cat:20s} {count:4d} ({pct:5.1f}%)")
print(f"\n{'='*60}")
print(f"Main Protection tag: {keep_count:4d} ({keep_count/len(df_prot)*100:5.1f}%)")
print(f"Kindred metadata only: {len(kindred_only):4d} ({len(kindred_only)/len(df_prot)*100:5.1f}%)")
print(f"Remove: {remove_count:4d} ({remove_count/len(df_prot)*100:5.1f}%)")
print(f"{'='*60}")
print(f"Precision estimate: {summary['precision_estimate']}%")
print(f"{'='*60}\n")
# Export results
output_dir = project_root / 'logs' / 'roadmaps' / 'source' / 'tagging_refinement'
output_dir.mkdir(parents=True, exist_ok=True)
# Export JSON summary
with open(output_dir / 'm2_audit_v2.json', 'w') as f:
json.dump({
'summary': summary,
'cards': df_prot[['name', 'type', 'category', 'grants_broad', 'kindred_tags', 'keywords', 'text']].to_dict(orient='records')
}, f, indent=2)
# Export CSVs by category
export_cols = ['name', 'type', 'category', 'grants_broad', 'kindred_tags', 'keywords', 'text']
# Grant category
df_grant = df_prot[df_prot['category'] == 'Grant']
df_grant[export_cols].to_csv(output_dir / 'm2_audit_v2_grant.csv', index=False)
print(f"Exported {len(df_grant)} Grant cards to m2_audit_v2_grant.csv")
# Kindred category
df_kindred = df_prot[df_prot['category'] == 'Kindred']
df_kindred[export_cols].to_csv(output_dir / 'm2_audit_v2_kindred.csv', index=False)
print(f"Exported {len(df_kindred)} Kindred cards to m2_audit_v2_kindred.csv")
# Mixed category
df_mixed = df_prot[df_prot['category'] == 'Mixed']
df_mixed[export_cols].to_csv(output_dir / 'm2_audit_v2_mixed.csv', index=False)
print(f"Exported {len(df_mixed)} Mixed cards to m2_audit_v2_mixed.csv")
# ConditionalSelf category
df_conditional = df_prot[df_prot['category'] == 'ConditionalSelf']
df_conditional[export_cols].to_csv(output_dir / 'm2_audit_v2_conditional.csv', index=False)
print(f"Exported {len(df_conditional)} ConditionalSelf cards to m2_audit_v2_conditional.csv")
# Inherent category
df_inherent = df_prot[df_prot['category'] == 'Inherent']
df_inherent[export_cols].to_csv(output_dir / 'm2_audit_v2_inherent.csv', index=False)
print(f"Exported {len(df_inherent)} Inherent cards to m2_audit_v2_inherent.csv")
# Opponent category
df_opponent = df_prot[df_prot['category'] == 'Opponent']
df_opponent[export_cols].to_csv(output_dir / 'm2_audit_v2_opponent.csv', index=False)
print(f"Exported {len(df_opponent)} Opponent cards to m2_audit_v2_opponent.csv")
# Neither category
df_neither = df_prot[df_prot['category'] == 'Neither']
df_neither[export_cols].to_csv(output_dir / 'm2_audit_v2_neither.csv', index=False)
print(f"Exported {len(df_neither)} Neither cards to m2_audit_v2_neither.csv")
# All cards
df_prot[export_cols].to_csv(output_dir / 'm2_audit_v2_all.csv', index=False)
print(f"Exported {len(df_prot)} total cards to m2_audit_v2_all.csv")
print(f"\nAll files saved to: {output_dir}")
return df_prot, summary
if __name__ == '__main__':
df_results, summary = audit_all_protection_cards()

View file

@@ -1,118 +0,0 @@
"""Opt-in guard that compares multi-theme filter performance to a stored baseline.
Run inside the project virtual environment:
python -m code.scripts.check_random_theme_perf --baseline config/random_theme_perf_baseline.json
The script executes the same profiling loop as `profile_multi_theme_filter` and fails
if the observed mean or p95 timings regress more than the allowed threshold.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
PROJECT_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_BASELINE = PROJECT_ROOT / "config" / "random_theme_perf_baseline.json"
if str(PROJECT_ROOT) not in sys.path:
sys.path.append(str(PROJECT_ROOT))
from code.scripts.profile_multi_theme_filter import run_profile # type: ignore # noqa: E402
def _load_baseline(path: Path) -> Dict[str, Any]:
if not path.exists():
raise FileNotFoundError(f"Baseline file not found: {path}")
data = json.loads(path.read_text(encoding="utf-8"))
return data
def _extract(metric: Dict[str, Any], key: str) -> float:
try:
value = float(metric.get(key, 0.0))
except Exception:
value = 0.0
return value
def _check_section(name: str, actual: Dict[str, Any], baseline: Dict[str, Any], threshold: float) -> Tuple[bool, str]:
a_mean = _extract(actual, "mean_ms")
b_mean = _extract(baseline, "mean_ms")
a_p95 = _extract(actual, "p95_ms")
b_p95 = _extract(baseline, "p95_ms")
allowed_mean = b_mean * (1.0 + threshold)
allowed_p95 = b_p95 * (1.0 + threshold)
mean_ok = a_mean <= allowed_mean or b_mean == 0.0
p95_ok = a_p95 <= allowed_p95 or b_p95 == 0.0
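    # Worked example (editorial note): with baseline mean 2.0 ms and threshold
    # 0.15, allowed_mean = 2.0 * 1.15 = 2.3 ms, so an observed mean of 2.4 ms
    # fails while 2.25 ms passes; a 0.0 baseline skips the check entirely.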
status = mean_ok and p95_ok
def _format_row(label: str, actual_val: float, baseline_val: float, allowed_val: float, ok: bool) -> str:
trend = ((actual_val - baseline_val) / baseline_val * 100.0) if baseline_val else 0.0
trend_str = f"{trend:+.1f}%" if baseline_val else "n/a"
limit_str = f"{allowed_val:.3f}ms" if baseline_val else "n/a"
return f" {label:<6} actual={actual_val:.3f}ms baseline={baseline_val:.3f}ms ({trend_str}), limit {limit_str} -> {'OK' if ok else 'FAIL'}"
rows = [f"Section: {name}"]
rows.append(_format_row("mean", a_mean, b_mean, allowed_mean, mean_ok))
rows.append(_format_row("p95", a_p95, b_p95, allowed_p95, p95_ok))
return status, "\n".join(rows)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Check multi-theme filtering performance against a baseline")
parser.add_argument("--baseline", type=Path, default=DEFAULT_BASELINE, help="Baseline JSON file (default: config/random_theme_perf_baseline.json)")
parser.add_argument("--iterations", type=int, default=400, help="Number of iterations to sample (default: 400)")
parser.add_argument("--seed", type=int, default=None, help="Optional RNG seed for reproducibility")
parser.add_argument("--threshold", type=float, default=0.15, help="Allowed regression threshold as a fraction (default: 0.15 = 15%)")
parser.add_argument("--update-baseline", action="store_true", help="Overwrite the baseline file with the newly collected metrics")
args = parser.parse_args(argv)
baseline_path = args.baseline if args.baseline else DEFAULT_BASELINE
if args.update_baseline and not baseline_path.parent.exists():
baseline_path.parent.mkdir(parents=True, exist_ok=True)
if not args.update_baseline:
baseline = _load_baseline(baseline_path)
else:
baseline = {}
results = run_profile(args.iterations, args.seed)
cascade_status, cascade_report = _check_section("cascade", results.get("cascade", {}), baseline.get("cascade", {}), args.threshold)
synergy_status, synergy_report = _check_section("synergy", results.get("synergy", {}), baseline.get("synergy", {}), args.threshold)
print("Iterations:", results.get("iterations"))
print("Seed:", results.get("seed"))
print(cascade_report)
print(synergy_report)
overall_ok = cascade_status and synergy_status
if args.update_baseline:
payload = {
"iterations": results.get("iterations"),
"seed": results.get("seed"),
"cascade": results.get("cascade"),
"synergy": results.get("synergy"),
}
baseline_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
print(f"Baseline updated → {baseline_path}")
return 0
if not overall_ok:
print(f"FAIL: performance regressions exceeded {args.threshold * 100:.1f}% threshold", file=sys.stderr)
return 1
print("PASS: performance within allowed threshold")
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())

View file

@@ -0,0 +1,135 @@
"""CLI wrapper for theme enrichment pipeline.
Runs the consolidated theme enrichment pipeline with command-line options.
For backward compatibility, individual scripts can still be run separately,
but this provides a faster single-pass alternative.
Usage:
python code/scripts/enrich_themes.py --write
python code/scripts/enrich_themes.py --dry-run --enforce-min
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
# Add project root to path
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# Import after adding to path
from code.tagging.theme_enrichment import run_enrichment_pipeline # noqa: E402
def main() -> int:
"""Run theme enrichment pipeline from CLI."""
parser = argparse.ArgumentParser(
description='Consolidated theme metadata enrichment pipeline',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Dry run (no changes written):
python code/scripts/enrich_themes.py --dry-run
# Write changes:
python code/scripts/enrich_themes.py --write
# Enforce minimum examples (errors if insufficient):
python code/scripts/enrich_themes.py --write --enforce-min
# Strict validation for cornerstone themes:
python code/scripts/enrich_themes.py --write --strict
Note: This replaces running 7 separate scripts (autofill, pad, cleanup, purge,
augment, suggestions, lint) with a single 5-10x faster operation.
"""
)
parser.add_argument(
'--write',
action='store_true',
help='Write changes to disk (default: dry run)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Dry run mode: show what would be changed without writing'
)
parser.add_argument(
'--min',
'--min-examples',
type=int,
default=None,
metavar='N',
help='Minimum number of example commanders (default: $EDITORIAL_MIN_EXAMPLES or 5)'
)
parser.add_argument(
'--enforce-min',
action='store_true',
help='Treat minimum examples violations as errors'
)
parser.add_argument(
'--strict',
action='store_true',
help='Enable strict validation (cornerstone themes must have examples)'
)
args = parser.parse_args()
# Determine write mode
if args.dry_run:
write = False
elif args.write:
write = True
else:
# Default to dry run if neither specified
write = False
print("Note: Running in dry-run mode (use --write to save changes)\n")
# Get minimum examples threshold
if args.min is not None:
min_examples = args.min
else:
min_examples = int(os.environ.get('EDITORIAL_MIN_EXAMPLES', '5'))
print("Theme Enrichment Pipeline")
print("========================")
print(f"Mode: {'WRITE' if write else 'DRY RUN'}")
print(f"Min examples: {min_examples}")
print(f"Enforce min: {args.enforce_min}")
print(f"Strict: {args.strict}")
print()
try:
stats = run_enrichment_pipeline(
root=ROOT,
min_examples=min_examples,
write=write,
enforce_min=args.enforce_min,
strict=args.strict,
progress_callback=None, # Use default print
)
# Return non-zero if there are lint errors
if stats.lint_errors > 0:
print(f"\n❌ Enrichment completed with {stats.lint_errors} error(s)")
return 1
print("\n✅ Enrichment completed successfully")
return 0
except KeyboardInterrupt:
print("\n\nInterrupted by user")
return 130
except Exception as e:
print(f"\n❌ Error: {e}", file=sys.stderr)
if '--debug' in sys.argv:
raise
return 1
if __name__ == '__main__':
raise SystemExit(main())
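For callers that want the same behavior without the CLI, a minimal sketch of the programmatic entry point, built only from the run_enrichment_pipeline call visible above (nothing beyond the stats.lint_errors attribute shown there is assumed):

# Sketch: drive the consolidated pipeline directly, mirroring the CLI defaults above.
from pathlib import Path

from code.tagging.theme_enrichment import run_enrichment_pipeline

stats = run_enrichment_pipeline(
    root=Path(".").resolve(),   # repo root; the CLI derives this from __file__
    min_examples=5,             # documented default when EDITORIAL_MIN_EXAMPLES is unset
    write=False,                # dry run; pass True to persist changes
    enforce_min=False,
    strict=False,
    progress_callback=None,     # None falls back to plain print progress
)
raise SystemExit(1 if stats.lint_errors > 0 else 0)  # same exit convention as the CLI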

View file

@@ -123,6 +123,9 @@ def main():
enforced_set = set(enforced_synergies)
inferred_synergies = [s for s in synergy_list if s not in curated_set and s not in enforced_set]
example_cards_value = entry.get('example_cards', [])
example_commanders_value = entry.get('example_commanders', [])
doc = {
'id': slug,
'display_name': theme_name,
@@ -132,13 +135,40 @@
'inferred_synergies': inferred_synergies,
'primary_color': entry.get('primary_color'),
'secondary_color': entry.get('secondary_color'),
'example_cards': example_cards_value,
'example_commanders': example_commanders_value,
'synergy_example_cards': entry.get('synergy_example_cards', []),
'synergy_commanders': entry.get('synergy_commanders', []),
'deck_archetype': entry.get('deck_archetype'),
'popularity_hint': entry.get('popularity_hint'),
'popularity_bucket': entry.get('popularity_bucket'),
'editorial_quality': entry.get('editorial_quality'),
'description': entry.get('description'),
'notes': ''
}
# Drop None color keys for cleanliness
# Drop None/empty keys for cleanliness
if doc['primary_color'] is None:
doc.pop('primary_color')
if doc.get('secondary_color') is None:
doc.pop('secondary_color')
if not doc.get('example_cards'):
doc.pop('example_cards')
if not doc.get('example_commanders'):
doc.pop('example_commanders')
if not doc.get('synergy_example_cards'):
doc.pop('synergy_example_cards')
if not doc.get('synergy_commanders'):
doc.pop('synergy_commanders')
if doc.get('deck_archetype') is None:
doc.pop('deck_archetype')
if doc.get('popularity_hint') is None:
doc.pop('popularity_hint')
if doc.get('popularity_bucket') is None:
doc.pop('popularity_bucket')
if doc.get('editorial_quality') is None:
doc.pop('editorial_quality')
if doc.get('description') is None:
doc.pop('description')
with path.open('w', encoding='utf-8') as f:
yaml.safe_dump(doc, f, sort_keys=False, allow_unicode=True)
exported += 1
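The run of per-key pops above could be collapsed into one pass; a behavior-preserving sketch over the same doc dict, dropping empty lists and None scalars exactly as the explicit checks do:

# Equivalent cleanup sketch: one loop per key family instead of eleven if-statements.
for key in ('example_cards', 'example_commanders',
            'synergy_example_cards', 'synergy_commanders'):
    if not doc.get(key):
        doc.pop(key, None)
for key in ('primary_color', 'secondary_color', 'deck_archetype',
            'popularity_hint', 'popularity_bucket',
            'editorial_quality', 'description'):
    if doc.get(key) is None:
        doc.pop(key, None)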

View file

@@ -19,6 +19,13 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence
try:
import pandas as pd
HAS_PANDAS = True
except ImportError:
HAS_PANDAS = False
pd = None # type: ignore
ROOT = Path(__file__).resolve().parents[2]
CODE_ROOT = ROOT / "code"
if str(CODE_ROOT) not in sys.path:
@@ -29,6 +36,9 @@ try:
except Exception: # pragma: no cover - fallback for adhoc execution
DEFAULT_CSV_DIRECTORY = "csv_files"
# Parquet support requires pandas (imported at top of file, uses pyarrow under the hood)
HAS_PARQUET_SUPPORT = HAS_PANDAS
DEFAULT_OUTPUT_PATH = ROOT / "config" / "themes" / "theme_catalog.csv"
HEADER_COMMENT_PREFIX = "# theme_catalog"
@@ -87,7 +97,68 @@ def parse_theme_tags(value: object) -> List[str]:
return []
def _load_theme_counts_from_parquet(
parquet_path: Path,
theme_variants: Dict[str, set[str]]
) -> Counter[str]:
"""Load theme counts from a parquet file using pandas (which uses pyarrow).
Args:
parquet_path: Path to the parquet file (commander_cards.parquet or all_cards.parquet)
theme_variants: Dict to accumulate theme name variants
Returns:
Counter of theme occurrences
"""
if pd is None:
return Counter()
counts: Counter[str] = Counter()
if not parquet_path.exists():
return counts
# Read only themeTags column for efficiency
try:
df = pd.read_parquet(parquet_path, columns=["themeTags"])
except Exception:
# If themeTags column doesn't exist, return empty
return counts
# Convert to list for fast iteration (faster than iterrows)
theme_tags_list = df["themeTags"].tolist()
for raw_value in theme_tags_list:
if raw_value is None or (isinstance(raw_value, float) and pd.isna(raw_value)):
continue
tags = parse_theme_tags(raw_value)
if not tags:
continue
seen_in_row: set[str] = set()
for tag in tags:
display = normalize_theme_display(tag)
if not display:
continue
key = canonical_key(display)
if key in seen_in_row:
continue
seen_in_row.add(key)
counts[key] += 1
theme_variants[key].add(display)
return counts
def _load_theme_counts(csv_path: Path, theme_variants: Dict[str, set[str]]) -> Counter[str]:
"""Load theme counts from CSV file (fallback method).
Args:
csv_path: Path to CSV file
theme_variants: Dict to accumulate theme name variants
Returns:
Counter of theme occurrences
"""
counts: Counter[str] = Counter()
if not csv_path.exists():
return counts
@@ -146,24 +217,67 @@
commander_filename: str = "commander_cards.csv",
cards_filename: str = "cards.csv",
logs_directory: Optional[Path] = None,
use_parquet: bool = True,
) -> CatalogBuildResult:
"""Build theme catalog from card data.
Args:
csv_directory: Directory containing CSV files (fallback)
output_path: Where to write the catalog CSV
generated_at: Optional timestamp for generation
commander_filename: Name of commander CSV file
cards_filename: Name of cards CSV file
logs_directory: Optional directory to copy output to
use_parquet: If True, try to use all_cards.parquet first (default: True)
Returns:
CatalogBuildResult with generated rows and metadata
"""
csv_directory = csv_directory.resolve()
output_path = output_path.resolve()
theme_variants: Dict[str, set[str]] = defaultdict(set)
commander_counts = _load_theme_counts(csv_directory / commander_filename, theme_variants)
# Try to use parquet file first (much faster)
used_parquet = False
if use_parquet and HAS_PARQUET_SUPPORT:
try:
# Use dedicated parquet files (matches CSV structure exactly)
parquet_dir = csv_directory.parent / "card_files"
card_counts: Counter[str] = Counter()
cards_path = csv_directory / cards_filename
if cards_path.exists():
card_counts = _load_theme_counts(cards_path, theme_variants)
else:
# Fallback: scan all *_cards.csv except commander
for candidate in csv_directory.glob("*_cards.csv"):
if candidate.name == commander_filename:
continue
card_counts += _load_theme_counts(candidate, theme_variants)
# Load commander counts directly from commander_cards.parquet
commander_parquet = parquet_dir / "commander_cards.parquet"
commander_counts = _load_theme_counts_from_parquet(
commander_parquet, theme_variants=theme_variants
)
# CSV method doesn't load non-commander cards, so we don't either
card_counts = Counter()
used_parquet = True
print("✓ Loaded theme data from parquet files")
except Exception as e:
print(f"⚠ Failed to load from parquet: {e}")
print(" Falling back to CSV files...")
used_parquet = False
# Fallback to CSV files if parquet not available or failed
if not used_parquet:
commander_counts = _load_theme_counts(csv_directory / commander_filename, theme_variants)
card_counts: Counter[str] = Counter()
cards_path = csv_directory / cards_filename
if cards_path.exists():
card_counts = _load_theme_counts(cards_path, theme_variants)
else:
# Fallback: scan all *_cards.csv except commander
for candidate in csv_directory.glob("*_cards.csv"):
if candidate.name == commander_filename:
continue
card_counts += _load_theme_counts(candidate, theme_variants)
print("✓ Loaded theme data from CSV files")
keys = sorted(set(card_counts.keys()) | set(commander_counts.keys()))
generated_at_iso = _derive_generated_at(generated_at)
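A minimal call sketch for the updated builder, using only parameters shown in the signature above; the paths mirror the defaults quoted earlier (csv_files, config/themes/theme_catalog.csv) and are illustrative:

# Hedged usage sketch: parquet-first build with automatic CSV fallback.
from pathlib import Path

result = build_theme_catalog(
    csv_directory=Path("csv_files"),
    output_path=Path("config/themes/theme_catalog.csv"),
    use_parquet=True,  # falls back to CSV scanning when pandas/parquet is unavailable
)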

View file

@@ -1,305 +0,0 @@
"""Catalog diff helper for verifying multi-face merge output.
This utility regenerates the card CSV catalog (optionally writing compatibility
snapshots) and then compares the merged outputs against the baseline snapshots.
It is intended to support the MDFC rollout checklist by providing a concise summary
of how many rows were merged, which cards collapsed into a single record, and
whether any tag unions diverge from expectations.
Example usage (from repo root, inside virtualenv):
python -m code.scripts.preview_dfc_catalog_diff --compat-snapshot --output logs/dfc_catalog_diff.json
The script prints a human readable summary to stdout and optionally writes a JSON
artifact for release/staging review.
"""
from __future__ import annotations
import argparse
import ast
import importlib
import json
import os
import sys
import time
from collections import Counter
from pathlib import Path
from typing import Any, Dict, Iterable, List, Sequence
import pandas as pd
from settings import COLORS, CSV_DIRECTORY
DEFAULT_COMPAT_DIR = Path(os.getenv("DFC_COMPAT_DIR", "csv_files/compat_faces"))
CSV_ROOT = Path(CSV_DIRECTORY)
def _parse_list_cell(value: Any) -> List[str]:
"""Convert serialized list cells ("['A', 'B']") into Python lists."""
if isinstance(value, list):
return [str(item) for item in value]
if value is None:
return []
if isinstance(value, float) and pd.isna(value): # type: ignore[arg-type]
return []
text = str(value).strip()
if not text:
return []
try:
parsed = ast.literal_eval(text)
except (SyntaxError, ValueError):
return [text]
if isinstance(parsed, list):
return [str(item) for item in parsed]
return [str(parsed)]
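# Editorial examples: _parse_list_cell("['A', 'B']") -> ['A', 'B'];
# _parse_list_cell("plain") -> ['plain'] (literal_eval raises, so the raw text is kept);
# _parse_list_cell(None) -> [].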
def _load_catalog(path: Path) -> pd.DataFrame:
if not path.exists():
raise FileNotFoundError(f"Catalog file missing: {path}")
df = pd.read_csv(path)
for column in ("themeTags", "keywords", "creatureTypes"):
if column in df.columns:
df[column] = df[column].apply(_parse_list_cell)
return df
def _multi_face_names(df: pd.DataFrame) -> List[str]:
counts = Counter(df.get("name", []))
return [name for name, count in counts.items() if isinstance(name, str) and count > 1]
def _collect_tags(series: Iterable[List[str]]) -> List[str]:
tags: List[str] = []
for value in series:
if isinstance(value, list):
tags.extend(str(item) for item in value)
return sorted(set(tags))
def _summarize_color(
color: str,
merged: pd.DataFrame,
baseline: pd.DataFrame,
sample_size: int,
) -> Dict[str, Any]:
merged_names = set(merged.get("name", []))
baseline_names = list(baseline.get("name", []))
baseline_name_set = set(name for name in baseline_names if isinstance(name, str))
multi_face = _multi_face_names(baseline)
collapsed = []
tag_mismatches: List[str] = []
missing_after_merge: List[str] = []
for name in multi_face:
group = baseline[baseline["name"] == name]
merged_row = merged[merged["name"] == name]
if merged_row.empty:
missing_after_merge.append(name)
continue
expected_tags = _collect_tags(group["themeTags"]) if "themeTags" in group else []
merged_tags = _collect_tags(merged_row.iloc[[0]]["themeTags"]) if "themeTags" in merged_row else []
if expected_tags != merged_tags:
tag_mismatches.append(name)
collapsed.append(name)
removed_names = sorted(baseline_name_set - merged_names)
added_names = sorted(merged_names - baseline_name_set)
return {
"rows_merged": len(merged),
"rows_baseline": len(baseline),
"row_delta": len(merged) - len(baseline),
"multi_face_groups": len(multi_face),
"collapsed_sample": collapsed[:sample_size],
"tag_union_mismatches": tag_mismatches[:sample_size],
"missing_after_merge": missing_after_merge[:sample_size],
"removed_names": removed_names[:sample_size],
"added_names": added_names[:sample_size],
}
def _refresh_catalog(colors: Sequence[str], compat_snapshot: bool) -> None:
os.environ.pop("ENABLE_DFC_MERGE", None)
os.environ["DFC_COMPAT_SNAPSHOT"] = "1" if compat_snapshot else "0"
importlib.invalidate_caches()
# Reload tagger to pick up the new env var
tagger = importlib.import_module("code.tagging.tagger")
tagger = importlib.reload(tagger) # type: ignore[assignment]
for color in colors:
tagger.load_dataframe(color)
def generate_diff(
colors: Sequence[str],
compat_dir: Path,
sample_size: int,
) -> Dict[str, Any]:
per_color: Dict[str, Any] = {}
overall = {
"total_rows_merged": 0,
"total_rows_baseline": 0,
"total_multi_face_groups": 0,
"colors": len(colors),
"tag_union_mismatches": 0,
"missing_after_merge": 0,
}
for color in colors:
merged_path = CSV_ROOT / f"{color}_cards.csv"
baseline_path = compat_dir / f"{color}_cards_unmerged.csv"
merged_df = _load_catalog(merged_path)
baseline_df = _load_catalog(baseline_path)
summary = _summarize_color(color, merged_df, baseline_df, sample_size)
per_color[color] = summary
overall["total_rows_merged"] += summary["rows_merged"]
overall["total_rows_baseline"] += summary["rows_baseline"]
overall["total_multi_face_groups"] += summary["multi_face_groups"]
overall["tag_union_mismatches"] += len(summary["tag_union_mismatches"])
overall["missing_after_merge"] += len(summary["missing_after_merge"])
overall["row_delta_total"] = overall["total_rows_merged"] - overall["total_rows_baseline"]
return {"overall": overall, "per_color": per_color}
def main(argv: List[str]) -> int:
parser = argparse.ArgumentParser(description="Preview merged vs baseline DFC catalog diff")
parser.add_argument(
"--skip-refresh",
action="store_true",
help="Skip rebuilding the catalog in compatibility mode (requires existing compat snapshots)",
)
parser.add_argument(
"--mode",
default="",
help="[Deprecated] Legacy ENABLE_DFC_MERGE value (compat|1|0 etc.)",
)
parser.add_argument(
"--compat-snapshot",
dest="compat_snapshot",
action="store_true",
help="Write compatibility snapshots before diffing (default: off unless legacy --mode compat)",
)
parser.add_argument(
"--no-compat-snapshot",
dest="compat_snapshot",
action="store_false",
help="Skip compatibility snapshots even if legacy --mode compat is supplied",
)
parser.set_defaults(compat_snapshot=None)
parser.add_argument(
"--colors",
nargs="*",
help="Optional subset of colors to diff (defaults to full COLORS list)",
)
parser.add_argument(
"--compat-dir",
type=Path,
default=DEFAULT_COMPAT_DIR,
help="Directory containing unmerged compatibility snapshots (default: %(default)s)",
)
parser.add_argument(
"--output",
type=Path,
help="Optional JSON file to write with the diff summary",
)
parser.add_argument(
"--sample-size",
type=int,
default=10,
help="Number of sample entries to include per section (default: %(default)s)",
)
args = parser.parse_args(argv)
colors = tuple(args.colors) if args.colors else tuple(COLORS)
compat_dir = args.compat_dir
mode = str(args.mode or "").strip().lower()
if mode and mode not in {"compat", "dual", "both", "1", "on", "true", "0", "off", "false", "disabled"}:
print(
f" Legacy --mode value '{mode}' detected; merge remains enabled. Use --compat-snapshot as needed.",
flush=True,
)
if args.compat_snapshot is None:
compat_snapshot = mode in {"compat", "dual", "both"}
else:
compat_snapshot = args.compat_snapshot
if mode:
print(
" Ignoring deprecated --mode value because --compat-snapshot/--no-compat-snapshot was supplied.",
flush=True,
)
if mode in {"0", "off", "false", "disabled"}:
print(
"⚠ ENABLE_DFC_MERGE=off is deprecated; the merge remains enabled regardless of the value.",
flush=True,
)
if not args.skip_refresh:
start = time.perf_counter()
_refresh_catalog(colors, compat_snapshot)
duration = time.perf_counter() - start
snapshot_msg = "with compat snapshot" if compat_snapshot else "merged-only"
print(f"✔ Refreshed catalog in {duration:.1f}s ({snapshot_msg})")
else:
print(" Using existing catalog outputs (refresh skipped)")
try:
diff = generate_diff(colors, compat_dir, args.sample_size)
except FileNotFoundError as exc:
print(f"ERROR: {exc}")
print("Run without --skip-refresh (or ensure compat snapshots exist).", file=sys.stderr)
return 2
overall = diff["overall"]
print("\n=== DFC Catalog Diff Summary ===")
print(
f"Merged rows: {overall['total_rows_merged']:,} | Baseline rows: {overall['total_rows_baseline']:,} | "
f"Δ rows: {overall['row_delta_total']:,}"
)
print(
f"Multi-face groups: {overall['total_multi_face_groups']:,} | "
f"Tag union mismatches: {overall['tag_union_mismatches']} | Missing after merge: {overall['missing_after_merge']}"
)
for color, summary in diff["per_color"].items():
print(f"\n[{color}] baseline={summary['rows_baseline']} merged={summary['rows_merged']} Δ={summary['row_delta']}")
if summary["multi_face_groups"]:
print(f" multi-face groups: {summary['multi_face_groups']}")
if summary["collapsed_sample"]:
sample = ", ".join(summary["collapsed_sample"][:3])
print(f" collapsed sample: {sample}")
if summary["tag_union_mismatches"]:
print(f" TAG MISMATCH sample: {', '.join(summary['tag_union_mismatches'])}")
if summary["missing_after_merge"]:
print(f" MISSING sample: {', '.join(summary['missing_after_merge'])}")
if summary["removed_names"]:
print(f" removed sample: {', '.join(summary['removed_names'])}")
if summary["added_names"]:
print(f" added sample: {', '.join(summary['added_names'])}")
if args.output:
payload = {
"captured_at": int(time.time()),
"mode": args.mode,
"colors": colors,
"compat_dir": str(compat_dir),
"summary": diff,
}
try:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
print(f"\n📄 Wrote JSON summary to {args.output}")
except Exception as exc: # pragma: no cover
print(f"Failed to write output file {args.output}: {exc}", file=sys.stderr)
return 3
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main(sys.argv[1:]))

View file

@@ -1,105 +0,0 @@
"""CLI utility: snapshot preview metrics and emit summary/top slow themes.
Usage (from repo root virtualenv):
python -m code.scripts.preview_metrics_snapshot --limit 10 --output logs/preview_metrics_snapshot.json
Fetches /themes/metrics (requires WEB_THEME_PICKER_DIAGNOSTICS=1) and writes a compact JSON plus
human-readable summary to stdout.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict
import urllib.request
import urllib.error
DEFAULT_URL = "http://localhost:8000/themes/metrics"
def fetch_metrics(url: str) -> Dict[str, Any]:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=10) as resp: # nosec B310 (local trusted)
data = resp.read().decode("utf-8", "replace")
try:
return json.loads(data) # type: ignore[return-value]
except json.JSONDecodeError as e: # pragma: no cover - unlikely if server OK
raise SystemExit(f"Invalid JSON from metrics endpoint: {e}\nRaw: {data[:400]}")
def summarize(metrics: Dict[str, Any], top_n: int) -> Dict[str, Any]:
preview = (metrics.get("preview") or {}) if isinstance(metrics, dict) else {}
per_theme = preview.get("per_theme") or {}
# Compute top slow themes by avg_ms
items = []
for slug, info in per_theme.items():
if not isinstance(info, dict):
continue
avg = info.get("avg_ms")
if isinstance(avg, (int, float)):
items.append((slug, float(avg), info))
items.sort(key=lambda x: x[1], reverse=True)
top = items[:top_n]
return {
"preview_requests": preview.get("preview_requests"),
"preview_cache_hits": preview.get("preview_cache_hits"),
"preview_avg_build_ms": preview.get("preview_avg_build_ms"),
"preview_p95_build_ms": preview.get("preview_p95_build_ms"),
"preview_ttl_seconds": preview.get("preview_ttl_seconds"),
"editorial_curated_vs_sampled_pct": preview.get("editorial_curated_vs_sampled_pct"),
"top_slowest": [
{
"slug": slug,
"avg_ms": avg,
"p95_ms": info.get("p95_ms"),
"builds": info.get("builds"),
"requests": info.get("requests"),
"avg_curated_pct": info.get("avg_curated_pct"),
}
for slug, avg, info in top
],
}
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(description="Snapshot preview metrics")
ap.add_argument("--url", default=DEFAULT_URL, help="Metrics endpoint URL (default: %(default)s)")
ap.add_argument("--limit", type=int, default=10, help="Top N slow themes to include (default: %(default)s)")
ap.add_argument("--output", type=Path, help="Optional output JSON file for snapshot")
ap.add_argument("--quiet", action="store_true", help="Suppress stdout summary (still writes file if --output)")
args = ap.parse_args(argv)
try:
raw = fetch_metrics(args.url)
except urllib.error.URLError as e:
print(f"ERROR: Failed fetching metrics endpoint: {e}", file=sys.stderr)
return 2
summary = summarize(raw, args.limit)
snapshot = {
"captured_at": int(time.time()),
"source": args.url,
"summary": summary,
}
if args.output:
try:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(snapshot, indent=2, sort_keys=True), encoding="utf-8")
except Exception as e: # pragma: no cover
print(f"ERROR: writing snapshot file failed: {e}", file=sys.stderr)
return 3
if not args.quiet:
print("Preview Metrics Snapshot:")
print(json.dumps(summary, indent=2))
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main(sys.argv[1:]))

View file

@@ -1,349 +0,0 @@
"""Ad-hoc performance benchmark for theme preview build latency (Phase A validation).
Runs warm-up plus measured request loops against several theme slugs and prints
aggregate latency stats (p50/p90/p95, cache hit ratio evolution). Intended to
establish or validate that the refactor did not introduce a >5% p95 regression.
Usage (ensure the server is running locally, commonly :8080 in docker compose):
python -m code.scripts.preview_perf_benchmark --themes 8 --loops 40 \
--url http://localhost:8080 --warm 1 --limit 12
Theme slug discovery hierarchy (when --theme not provided):
1. Try /themes/index.json (legacy / planned static index)
2. Fallback to /themes/api/themes (current API) and take the first N ids
The discovered slugs are sorted deterministically then truncated to N.
NOTE: This is intentionally minimal (no external deps). For stable comparisons
run with identical parameters pre/post-change and commit the JSON output under
logs/perf/.
"""
from __future__ import annotations
import argparse
import json
import statistics
import time
from typing import Any, Dict, List
import urllib.request
import urllib.error
import sys
from pathlib import Path
def _fetch_json(url: str) -> Dict[str, Any]:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=15) as resp: # nosec B310 local dev
data = resp.read().decode("utf-8", "replace")
return json.loads(data) # type: ignore[return-value]
def _fetch_json_with_retry(url: str, attempts: int = 3, delay: float = 0.6) -> Dict[str, Any]:
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
return _fetch_json(url)
except Exception as exc: # pragma: no cover - network variability
last_error = exc
if attempt < attempts:
print(json.dumps({ # noqa: T201
"event": "preview_perf_fetch_retry",
"url": url,
"attempt": attempt,
"max_attempts": attempts,
"error": str(exc),
}))
time.sleep(delay * attempt)
else:
raise
raise last_error # pragma: no cover - defensive; should be unreachable
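# Editorial note: the backoff above is linear in the attempt number, so with the
# defaults (attempts=3, delay=0.6) a caller waits 0.6s after the first failure
# and 1.2s after the second before the final attempt raises.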
def select_theme_slugs(base_url: str, count: int) -> List[str]:
"""Discover theme slugs for benchmarking.
Attempts legacy static index first, then falls back to live API listing.
"""
errors: List[str] = []
slugs: List[str] = []
# Attempt 1: legacy /themes/index.json
try:
idx = _fetch_json(f"{base_url.rstrip('/')}/themes/index.json")
entries = idx.get("themes") or []
for it in entries:
if not isinstance(it, dict):
continue
slug = it.get("slug") or it.get("id") or it.get("theme_id")
if isinstance(slug, str):
slugs.append(slug)
except Exception as e: # pragma: no cover - network variability
errors.append(f"index.json failed: {e}")
if not slugs:
# Attempt 2: live API listing
try:
listing = _fetch_json(f"{base_url.rstrip('/')}/themes/api/themes")
items = listing.get("items") or []
for it in items:
if not isinstance(it, dict):
continue
tid = it.get("id") or it.get("slug") or it.get("theme_id")
if isinstance(tid, str):
slugs.append(tid)
except Exception as e: # pragma: no cover - network variability
errors.append(f"api/themes failed: {e}")
slugs = sorted(set(slugs))[:count]
if not slugs:
raise SystemExit("No theme slugs discovered; cannot benchmark (" + "; ".join(errors) + ")")
return slugs
def fetch_all_theme_slugs(base_url: str, page_limit: int = 200) -> List[str]:
"""Fetch all theme slugs via paginated /themes/api/themes endpoint.
Uses maximum page size (200) and iterates using offset until no next page.
Returns deterministic sorted unique list of slugs.
"""
slugs: List[str] = []
offset = 0
seen: set[str] = set()
page_attempts = 5
page_delay = 1.2
while True:
url = f"{base_url.rstrip('/')}/themes/api/themes?limit={page_limit}&offset={offset}"
data: Dict[str, Any] | None = None
last_error: Exception | None = None
for attempt in range(1, page_attempts + 1):
try:
data = _fetch_json_with_retry(url, attempts=4, delay=0.75)
break
except Exception as exc: # pragma: no cover - network variability
last_error = exc
if attempt < page_attempts:
print(json.dumps({ # noqa: T201
"event": "preview_perf_page_retry",
"offset": offset,
"attempt": attempt,
"max_attempts": page_attempts,
"error": str(exc),
}))
time.sleep(page_delay * attempt)
else:
raise SystemExit(f"Failed fetching themes page offset={offset}: {exc}")
if data is None: # pragma: no cover - defensive
raise SystemExit(f"Failed fetching themes page offset={offset}: {last_error}")
items = data.get("items") or []
for it in items:
if not isinstance(it, dict):
continue
tid = it.get("id") or it.get("slug") or it.get("theme_id")
if isinstance(tid, str) and tid not in seen:
seen.add(tid)
slugs.append(tid)
next_offset = data.get("next_offset")
if not next_offset or next_offset == offset:
break
offset = int(next_offset)
return sorted(slugs)
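# Editorial note: pagination walks offsets 0, 200, 400, ... using the response's
# next_offset field (an assumption about the API payload mirrored from the loop
# above) and stops when next_offset is missing, falsy, or unchanged.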
def percentile(values: List[float], pct: float) -> float:
if not values:
return 0.0
sv = sorted(values)
k = (len(sv) - 1) * pct
f = int(k)
c = min(f + 1, len(sv) - 1)
if f == c:
return sv[f]
d0 = sv[f] * (c - k)
d1 = sv[c] * (k - f)
return d0 + d1
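# Editorial example: for values [10, 20, 30, 40] and pct=0.95, k = 3 * 0.95 = 2.85,
# f = 2, c = 3, so the result is 30 * 0.15 + 40 * 0.85 = 38.5 (linear interpolation
# between the two nearest ranks).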
def run_loop(base_url: str, slugs: List[str], loops: int, limit: int, warm: bool, path_template: str) -> Dict[str, Any]:
latencies: List[float] = []
per_slug_counts = {s: 0 for s in slugs}
t_start = time.time()
for i in range(loops):
slug = slugs[i % len(slugs)]
# path_template may contain {slug} and {limit}
try:
rel = path_template.format(slug=slug, limit=limit)
except Exception:
rel = f"/themes/api/theme/{slug}/preview?limit={limit}"
if not rel.startswith('/'):
rel = '/' + rel
url = f"{base_url.rstrip('/')}{rel}"
t0 = time.time()
try:
_fetch_json(url)
except Exception as e:
print(json.dumps({"event": "perf_benchmark_error", "slug": slug, "error": str(e)})) # noqa: T201
continue
ms = (time.time() - t0) * 1000.0
latencies.append(ms)
per_slug_counts[slug] += 1
elapsed = time.time() - t_start
return {
"warm": warm,
"loops": loops,
"slugs": slugs,
"per_slug_requests": per_slug_counts,
"elapsed_s": round(elapsed, 3),
"p50_ms": round(percentile(latencies, 0.50), 2),
"p90_ms": round(percentile(latencies, 0.90), 2),
"p95_ms": round(percentile(latencies, 0.95), 2),
"avg_ms": round(statistics.mean(latencies), 2) if latencies else 0.0,
"count": len(latencies),
"_latencies": latencies, # internal (removed in final result unless explicitly retained)
}
def _stats_from_latencies(latencies: List[float]) -> Dict[str, Any]:
if not latencies:
return {"count": 0, "p50_ms": 0.0, "p90_ms": 0.0, "p95_ms": 0.0, "avg_ms": 0.0}
return {
"count": len(latencies),
"p50_ms": round(percentile(latencies, 0.50), 2),
"p90_ms": round(percentile(latencies, 0.90), 2),
"p95_ms": round(percentile(latencies, 0.95), 2),
"avg_ms": round(statistics.mean(latencies), 2),
}
def main(argv: List[str]) -> int:
ap = argparse.ArgumentParser(description="Theme preview performance benchmark")
ap.add_argument("--url", default="http://localhost:8000", help="Base server URL (default: %(default)s)")
ap.add_argument("--themes", type=int, default=6, help="Number of theme slugs to exercise (default: %(default)s)")
ap.add_argument("--loops", type=int, default=60, help="Total request iterations (default: %(default)s)")
ap.add_argument("--limit", type=int, default=12, help="Preview size (default: %(default)s)")
ap.add_argument("--path-template", default="/themes/api/theme/{slug}/preview?limit={limit}", help="Format string for preview request path (default: %(default)s)")
ap.add_argument("--theme", action="append", dest="explicit_theme", help="Explicit theme slug(s); overrides automatic selection")
ap.add_argument("--warm", type=int, default=1, help="Number of warm-up loops (full cycles over selected slugs) (default: %(default)s)")
ap.add_argument("--output", type=Path, help="Optional JSON output path (committed under logs/perf)")
ap.add_argument("--all", action="store_true", help="Exercise ALL themes (ignores --themes; loops auto-set to passes*total_slugs unless --loops-explicit)")
ap.add_argument("--passes", type=int, default=1, help="When using --all, number of passes over the full theme set (default: %(default)s)")
# Hidden flag to detect if user explicitly set --loops (argparse has no direct support, so use sentinel technique)
# We keep the original --loops for backwards compatibility; when --all is set we recompute it unless the user passed --loops-explicit
ap.add_argument("--loops-explicit", action="store_true", help=argparse.SUPPRESS)
ap.add_argument("--extract-warm-baseline", type=Path, help="If multi-pass (--all --passes >1), write a warm-only baseline JSON (final pass stats) to this path")
args = ap.parse_args(argv)
try:
if args.explicit_theme:
slugs = args.explicit_theme
elif args.all:
slugs = fetch_all_theme_slugs(args.url)
else:
slugs = select_theme_slugs(args.url, args.themes)
except SystemExit as e: # pragma: no cover - dependency on live server
print(str(e), file=sys.stderr)
return 2
mode = "all" if args.all else "subset"
total_slugs = len(slugs)
if args.all and not args.loops_explicit:
# Derive loops = passes * total_slugs
args.loops = max(1, args.passes) * total_slugs
print(json.dumps({ # noqa: T201
"event": "preview_perf_start",
"mode": mode,
"total_slugs": total_slugs,
"planned_loops": args.loops,
"passes": args.passes if args.all else None,
}))
# Execution paths:
# 1. Standard subset or single-pass all: warm cycles -> single measured run
# 2. Multi-pass all mode (--all --passes >1): iterate passes capturing per-pass stats (no separate warm loops)
if args.all and args.passes > 1:
pass_results: List[Dict[str, Any]] = []
combined_latencies: List[float] = []
t0_all = time.time()
for p in range(1, args.passes + 1):
r = run_loop(args.url, slugs, len(slugs), args.limit, warm=(p == 1), path_template=args.path_template)
lat = r.pop("_latencies", [])
combined_latencies.extend(lat)
pass_result = {
"pass": p,
"warm": r["warm"],
"elapsed_s": r["elapsed_s"],
"p50_ms": r["p50_ms"],
"p90_ms": r["p90_ms"],
"p95_ms": r["p95_ms"],
"avg_ms": r["avg_ms"],
"count": r["count"],
}
pass_results.append(pass_result)
total_elapsed = round(time.time() - t0_all, 3)
aggregate = _stats_from_latencies(combined_latencies)
result = {
"mode": mode,
"total_slugs": total_slugs,
"passes": args.passes,
"slugs": slugs,
"combined": {
**aggregate,
"elapsed_s": total_elapsed,
},
"passes_results": pass_results,
"cold_pass_p95_ms": pass_results[0]["p95_ms"],
"warm_pass_p95_ms": pass_results[-1]["p95_ms"],
"cold_pass_p50_ms": pass_results[0]["p50_ms"],
"warm_pass_p50_ms": pass_results[-1]["p50_ms"],
}
print(json.dumps({"event": "preview_perf_result", **result}, indent=2)) # noqa: T201
# Optional warm baseline extraction (final pass only; represents warmed steady-state)
if args.extract_warm_baseline:
try:
wb = pass_results[-1]
warm_obj = {
"event": "preview_perf_warm_baseline",
"mode": mode,
"total_slugs": total_slugs,
"warm_baseline": True,
"source_pass": wb["pass"],
"p50_ms": wb["p50_ms"],
"p90_ms": wb["p90_ms"],
"p95_ms": wb["p95_ms"],
"avg_ms": wb["avg_ms"],
"count": wb["count"],
"slugs": slugs,
}
args.extract_warm_baseline.parent.mkdir(parents=True, exist_ok=True)
args.extract_warm_baseline.write_text(json.dumps(warm_obj, indent=2, sort_keys=True), encoding="utf-8")
print(json.dumps({ # noqa: T201
"event": "preview_perf_warm_baseline_written",
"path": str(args.extract_warm_baseline),
"p95_ms": wb["p95_ms"],
}))
except Exception as e: # pragma: no cover
print(json.dumps({"event": "preview_perf_warm_baseline_error", "error": str(e)})) # noqa: T201
else:
# Warm-up loops first (if requested)
for w in range(args.warm):
run_loop(args.url, slugs, len(slugs), args.limit, warm=True, path_template=args.path_template)
result = run_loop(args.url, slugs, args.loops, args.limit, warm=False, path_template=args.path_template)
result.pop("_latencies", None)
result["slugs"] = slugs
result["mode"] = mode
result["total_slugs"] = total_slugs
if args.all:
result["passes"] = args.passes
print(json.dumps({"event": "preview_perf_result", **result}, indent=2)) # noqa: T201
if args.output:
try:
args.output.parent.mkdir(parents=True, exist_ok=True)
# Ensure we write the final result object (multi-pass already prepared above)
args.output.write_text(json.dumps(result, indent=2, sort_keys=True), encoding="utf-8")
except Exception as e: # pragma: no cover
print(f"ERROR: failed writing output file: {e}", file=sys.stderr)
return 3
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main(sys.argv[1:]))

View file

@@ -1,106 +0,0 @@
"""CI helper: run a warm-pass benchmark candidate (single pass over all themes)
then compare against the committed warm baseline with threshold enforcement.
Intended usage (example):
python -m code.scripts.preview_perf_ci_check --url http://localhost:8080 \
--baseline logs/perf/theme_preview_warm_baseline.json --p95-threshold 5
Exit codes:
0 success (within threshold)
2 regression (p95 delta > threshold)
3 setup / usage error
Notes:
- Uses --all --passes 1 to create a fresh candidate snapshot that approximates
a warmed steady-state (server should have background refresh / typical load).
- If you prefer a multi-pass run followed by warm-only selection, adjust the logic accordingly.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
def _wait_for_service(base_url: str, attempts: int = 12, delay: float = 1.5) -> bool:
health_url = base_url.rstrip("/") + "/healthz"
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
with urllib.request.urlopen(health_url, timeout=5) as resp: # nosec B310 local CI
if 200 <= resp.status < 300:
return True
except urllib.error.HTTPError as exc:
last_error = exc
if 400 <= exc.code < 500 and exc.code != 429:
# Treat permanent client errors (other than rate limit) as fatal
break
except Exception as exc: # pragma: no cover - network variability
last_error = exc
time.sleep(delay * attempt)
print(json.dumps({
"event": "ci_perf_error",
"stage": "startup",
"message": "Service health check failed",
"url": health_url,
"attempts": attempts,
"error": str(last_error) if last_error else None,
}))
return False
def run(cmd: list[str]) -> subprocess.CompletedProcess:
return subprocess.run(cmd, capture_output=True, text=True, check=False)
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(description="Preview performance CI regression gate")
ap.add_argument("--url", default="http://localhost:8080", help="Base URL of running web service")
ap.add_argument("--baseline", type=Path, required=True, help="Path to committed warm baseline JSON")
ap.add_argument("--p95-threshold", type=float, default=5.0, help="Max allowed p95 regression percent (default: %(default)s)")
ap.add_argument("--candidate-output", type=Path, default=Path("logs/perf/theme_preview_ci_candidate.json"), help="Where to write candidate benchmark JSON")
ap.add_argument("--multi-pass", action="store_true", help="Run a 2-pass all-themes benchmark and compare warm pass only (optional enhancement)")
args = ap.parse_args(argv)
if not args.baseline.exists():
print(json.dumps({"event":"ci_perf_error","message":"Baseline not found","path":str(args.baseline)}))
return 3
if not _wait_for_service(args.url):
return 3
# Run candidate single-pass all-themes benchmark (no extra warm cycles to keep CI fast)
# If multi-pass requested, run two passes over all themes so second pass represents warmed steady-state.
passes = "2" if args.multi_pass else "1"
bench_cmd = [sys.executable, "-m", "code.scripts.preview_perf_benchmark", "--url", args.url, "--all", "--passes", passes, "--output", str(args.candidate_output)]
bench_proc = run(bench_cmd)
if bench_proc.returncode != 0:
print(json.dumps({"event":"ci_perf_error","stage":"benchmark","code":bench_proc.returncode,"stderr":bench_proc.stderr}))
return 3
print(bench_proc.stdout)
if not args.candidate_output.exists():
print(json.dumps({"event":"ci_perf_error","message":"Candidate output missing"}))
return 3
compare_cmd = [
sys.executable,
"-m","code.scripts.preview_perf_compare",
"--baseline", str(args.baseline),
"--candidate", str(args.candidate_output),
"--warm-only",
"--p95-threshold", str(args.p95_threshold),
]
cmp_proc = run(compare_cmd)
print(cmp_proc.stdout)
if cmp_proc.returncode == 2:
# Already printed JSON with failure status
return 2
if cmp_proc.returncode != 0:
print(json.dumps({"event":"ci_perf_error","stage":"compare","code":cmp_proc.returncode,"stderr":cmp_proc.stderr}))
return 3
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main(sys.argv[1:]))

View file

@@ -1,115 +0,0 @@
"""Compare two preview benchmark JSON result files and emit delta stats.
Usage:
python -m code.scripts.preview_perf_compare --baseline logs/perf/theme_preview_baseline_all_pass1_20250923.json --candidate logs/perf/new_run.json
Outputs JSON with percentage deltas for p50/p90/p95/avg (positive = regression/slower).
If multi-pass structures are present (combined & passes_results) those are included.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any, Dict
def load(path: Path) -> Dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
# Multi-pass result may store stats under combined
if "combined" in data:
core = data["combined"].copy()
# Inject representative fields for uniform comparison
core["p50_ms"] = core.get("p50_ms") or data.get("p50_ms")
core["p90_ms"] = core.get("p90_ms") or data.get("p90_ms")
core["p95_ms"] = core.get("p95_ms") or data.get("p95_ms")
core["avg_ms"] = core.get("avg_ms") or data.get("avg_ms")
data["_core_stats"] = core
else:
data["_core_stats"] = {
k: data.get(k) for k in ("p50_ms", "p90_ms", "p95_ms", "avg_ms", "count")
}
return data
def pct_delta(new: float, old: float) -> float:
if old == 0:
return 0.0
return round(((new - old) / old) * 100.0, 2)
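# Editorial examples: pct_delta(2.30, 2.00) -> 15.0 (candidate 15% slower);
# pct_delta(1.80, 2.00) -> -10.0 (faster); a zero baseline yields 0.0 by design.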
def compare(baseline: Dict[str, Any], candidate: Dict[str, Any]) -> Dict[str, Any]:
b = baseline["_core_stats"]
c = candidate["_core_stats"]
result = {"baseline_count": b.get("count"), "candidate_count": c.get("count")}
for k in ("p50_ms", "p90_ms", "p95_ms", "avg_ms"):
if b.get(k) is not None and c.get(k) is not None:
result[k] = {
"baseline": b[k],
"candidate": c[k],
"delta_pct": pct_delta(c[k], b[k]),
}
# If both have per-pass details include first and last pass p95/p50
if "passes_results" in baseline and "passes_results" in candidate:
result["passes"] = {
"baseline": {
"cold_p95": baseline.get("cold_pass_p95_ms"),
"warm_p95": baseline.get("warm_pass_p95_ms"),
"cold_p50": baseline.get("cold_pass_p50_ms"),
"warm_p50": baseline.get("warm_pass_p50_ms"),
},
"candidate": {
"cold_p95": candidate.get("cold_pass_p95_ms"),
"warm_p95": candidate.get("warm_pass_p95_ms"),
"cold_p50": candidate.get("cold_pass_p50_ms"),
"warm_p50": candidate.get("warm_pass_p50_ms"),
},
}
return result
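# Illustrative result shape (numbers hypothetical):
#   {"baseline_count": 600, "candidate_count": 600,
#    "p95_ms": {"baseline": 120.0, "candidate": 126.0, "delta_pct": 5.0}, ...}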
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(description="Compare two preview benchmark JSON result files")
ap.add_argument("--baseline", required=True, type=Path, help="Baseline JSON path")
ap.add_argument("--candidate", required=True, type=Path, help="Candidate JSON path")
ap.add_argument("--p95-threshold", type=float, default=None, help="Fail (exit 2) if p95 regression exceeds this percent (positive delta)")
ap.add_argument("--warm-only", action="store_true", help="When both results have passes, compare warm pass p95/p50 instead of combined/core")
args = ap.parse_args(argv)
if not args.baseline.exists():
raise SystemExit(f"Baseline not found: {args.baseline}")
if not args.candidate.exists():
raise SystemExit(f"Candidate not found: {args.candidate}")
baseline = load(args.baseline)
candidate = load(args.candidate)
# If warm-only requested and both have warm pass stats, override _core_stats before compare
if args.warm_only and "warm_pass_p95_ms" in baseline and "warm_pass_p95_ms" in candidate:
baseline["_core_stats"] = {
"p50_ms": baseline.get("warm_pass_p50_ms"),
"p90_ms": baseline.get("_core_stats", {}).get("p90_ms"), # p90 not tracked per-pass; retain combined
"p95_ms": baseline.get("warm_pass_p95_ms"),
"avg_ms": baseline.get("_core_stats", {}).get("avg_ms"),
"count": baseline.get("_core_stats", {}).get("count"),
}
candidate["_core_stats"] = {
"p50_ms": candidate.get("warm_pass_p50_ms"),
"p90_ms": candidate.get("_core_stats", {}).get("p90_ms"),
"p95_ms": candidate.get("warm_pass_p95_ms"),
"avg_ms": candidate.get("_core_stats", {}).get("avg_ms"),
"count": candidate.get("_core_stats", {}).get("count"),
}
cmp = compare(baseline, candidate)
payload = {"event": "preview_perf_compare", **cmp}
if args.p95_threshold is not None and "p95_ms" in cmp:
delta = cmp["p95_ms"]["delta_pct"]
payload["threshold"] = {"p95_threshold": args.p95_threshold, "p95_delta_pct": delta}
if delta is not None and delta > args.p95_threshold:
payload["result"] = "fail"
print(json.dumps(payload, indent=2)) # noqa: T201
return 2
payload["result"] = "pass"
print(json.dumps(payload, indent=2)) # noqa: T201
return 0
if __name__ == "__main__": # pragma: no cover
import sys
raise SystemExit(main(sys.argv[1:]))


@ -1,91 +0,0 @@
"""Generate warm preview traffic to populate theme preview cache & metrics.
Usage:
python -m code.scripts.warm_preview_traffic --count 25 --repeats 2 \
--base-url http://localhost:8000 --delay 0.05
Requirements:
- FastAPI server running locally exposing /themes endpoints
- WEB_THEME_PICKER_DIAGNOSTICS=1 so /themes/metrics is accessible
Strategy:
1. Fetch /themes/fragment/list?limit=COUNT to obtain HTML table.
2. Extract theme slugs via regex on data-theme-id attributes.
3. Issue REPEATS preview fragment requests per slug in order.
4. Print simple timing / status summary.
This script intentionally uses stdlib only (urllib, re, time) to avoid extra deps.
"""
"""
from __future__ import annotations
import argparse
import re
import time
import urllib.request
import urllib.error
from typing import List
LIST_PATH = "/themes/fragment/list"
PREVIEW_PATH = "/themes/fragment/preview/{slug}"
def fetch(url: str) -> str:
req = urllib.request.Request(url, headers={"User-Agent": "warm-preview/1"})
with urllib.request.urlopen(req, timeout=15) as resp: # nosec B310 (local trusted)
return resp.read().decode("utf-8", "replace")
def extract_slugs(html: str, limit: int) -> List[str]:
slugs = []
for m in re.finditer(r'data-theme-id="([^"]+)"', html):
s = m.group(1).strip()
if s and s not in slugs:
slugs.append(s)
if len(slugs) >= limit:
break
return slugs
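# e.g. extract_slugs('<tr data-theme-id="landfall"><tr data-theme-id="tokens">', 5)
# returns ["landfall", "tokens"]; duplicates are skipped and at most `limit` kept.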
def warm(base_url: str, count: int, repeats: int, delay: float) -> None:
list_url = f"{base_url}{LIST_PATH}?limit={count}&offset=0"
print(f"[warm] Fetching list: {list_url}")
try:
html = fetch(list_url)
except urllib.error.URLError as e: # pragma: no cover
raise SystemExit(f"Failed fetching list: {e}")
slugs = extract_slugs(html, count)
if not slugs:
raise SystemExit("No theme slugs extracted cannot warm.")
print(f"[warm] Extracted {len(slugs)} slugs: {', '.join(slugs[:8])}{'...' if len(slugs)>8 else ''}")
total_requests = 0
start = time.time()
for r in range(repeats):
print(f"[warm] Pass {r+1}/{repeats}")
for slug in slugs:
url = f"{base_url}{PREVIEW_PATH.format(slug=slug)}"
try:
fetch(url)
except Exception as e: # pragma: no cover
print(f" [warn] Failed {slug}: {e}")
else:
total_requests += 1
if delay:
time.sleep(delay)
dur = time.time() - start
print(f"[warm] Completed {total_requests} preview requests in {dur:.2f}s ({total_requests/dur if dur>0 else 0:.1f} rps)")
print("[warm] Done. Now run metrics snapshot to capture warm p95.")
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(description="Generate warm preview traffic")
ap.add_argument("--base-url", default="http://localhost:8000", help="Base URL (default: %(default)s)")
ap.add_argument("--count", type=int, default=25, help="Number of distinct theme slugs to warm (default: %(default)s)")
ap.add_argument("--repeats", type=int, default=2, help="Repeat passes over slugs (default: %(default)s)")
ap.add_argument("--delay", type=float, default=0.05, help="Delay between requests in seconds (default: %(default)s)")
args = ap.parse_args(argv)
warm(args.base_url.rstrip("/"), args.count, args.repeats, args.delay)
return 0
if __name__ == "__main__": # pragma: no cover
import sys
raise SystemExit(main(sys.argv[1:]))

code/tagging/tag_index.py Normal file

@ -0,0 +1,425 @@
"""Fast tag indexing for reverse lookups and bulk operations.
Provides a reverse index (tag → cards) for efficient tag-based queries.
Typical queries complete in <1ms after the index is built.
Usage:
# Build index from all_cards
index = TagIndex()
index.build()
# Query cards with specific tag
cards = index.get_cards_with_tag("ramp") # Returns set of card names
# Query cards with multiple tags (AND logic)
cards = index.get_cards_with_all_tags(["tokens", "sacrifice"])
# Query cards with any of several tags (OR logic)
cards = index.get_cards_with_any_tags(["lifegain", "lifelink"])
# Get tags for a specific card
tags = index.get_tags_for_card("Sol Ring")
"""
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Set, Optional
from code.logging_util import get_logger
from code.services.all_cards_loader import AllCardsLoader
logger = get_logger(__name__)
# Default cache path for persisted index
DEFAULT_CACHE_PATH = Path("card_files/.tag_index_metadata.json")
@dataclass
class IndexStats:
"""Statistics about the tag index."""
total_cards: int
total_tags: int
total_mappings: int
build_time_seconds: float
indexed_at: float # Unix timestamp
all_cards_mtime: float # Unix timestamp of source file
class TagIndex:
"""Fast reverse index for tag-based card queries.
Builds two indexes:
- tag → set(card names): reverse index for fast tag queries
- card → list(tags): forward index for card tag lookups
Performance:
- Index build: <5s for 50k cards
- Query time: <1ms per lookup
- Memory: ~50-100MB for 30k cards
"""
def __init__(self, cache_path: Optional[Path] = None):
"""Initialize empty tag index.
Args:
cache_path: Path to persist index (default: card_files/.tag_index_metadata.json)
"""
self._tag_to_cards: Dict[str, Set[str]] = {}
self._card_to_tags: Dict[str, List[str]] = {}
self._stats: Optional[IndexStats] = None
self._cache_path = cache_path or DEFAULT_CACHE_PATH
self._loader = AllCardsLoader()
def build(self, force_rebuild: bool = False) -> IndexStats:
"""Build the tag index from all_cards.
Loads all_cards and creates reverse index. If a cached index exists
and is up-to-date, loads from cache instead.
Args:
force_rebuild: If True, rebuild even if cache is valid
Returns:
IndexStats with build metrics
"""
# Check if we can use cached index
if not force_rebuild and self._try_load_from_cache():
logger.info(f"Loaded tag index from cache: {self._stats.total_cards} cards, {self._stats.total_tags} tags")
return self._stats
logger.info("Building tag index from all_cards...")
start_time = time.perf_counter()
# Load all cards
df = self._loader.load()
if "themeTags" not in df.columns:
logger.warning("themeTags column not found in all_cards")
self._stats = IndexStats(
total_cards=0,
total_tags=0,
total_mappings=0,
build_time_seconds=0,
indexed_at=time.time(),
all_cards_mtime=0
)
return self._stats
# Clear existing indexes
self._tag_to_cards.clear()
self._card_to_tags.clear()
# Build indexes
total_mappings = 0
for _, row in df.iterrows():
name = row.get("name")
if not name:
continue
tags = self._normalize_tags(row.get("themeTags", []))
if not tags:
continue
# Store forward mapping (card → tags)
self._card_to_tags[name] = tags
# Build reverse mapping (tag → cards)
for tag in tags:
if tag not in self._tag_to_cards:
self._tag_to_cards[tag] = set()
self._tag_to_cards[tag].add(name)
total_mappings += 1
build_time = time.perf_counter() - start_time
# Get all_cards mtime for cache validation
all_cards_mtime = 0
if os.path.exists(self._loader.file_path):
all_cards_mtime = os.path.getmtime(self._loader.file_path)
self._stats = IndexStats(
total_cards=len(self._card_to_tags),
total_tags=len(self._tag_to_cards),
total_mappings=total_mappings,
build_time_seconds=build_time,
indexed_at=time.time(),
all_cards_mtime=all_cards_mtime
)
logger.info(
f"Built tag index: {self._stats.total_cards} cards, "
f"{self._stats.total_tags} unique tags, "
f"{self._stats.total_mappings} mappings in {build_time:.2f}s"
)
# Save to cache
self._save_to_cache()
return self._stats
def _normalize_tags(self, tags: object) -> List[str]:
"""Normalize tags from various formats to list of strings.
Handles:
- List of strings/objects
- String representations like "['tag1', 'tag2']"
- Comma-separated strings
- Empty/None values
"""
if not tags:
return []
if isinstance(tags, list):
# Already a list - normalize to strings
return [str(t).strip() for t in tags if t and str(t).strip()]
if isinstance(tags, str):
# Handle empty or list repr
if not tags or tags == "[]":
return []
# Try parsing as list repr
if tags.startswith("["):
import ast
try:
parsed = ast.literal_eval(tags)
if isinstance(parsed, list):
return [str(t).strip() for t in parsed if t and str(t).strip()]
except (ValueError, SyntaxError):
pass
# Fall back to comma-separated
return [t.strip() for t in tags.split(",") if t.strip()]
return []
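# e.g. _normalize_tags(["ramp", " artifacts "]) -> ["ramp", "artifacts"]
#      _normalize_tags("['ramp', 'artifacts']") -> ["ramp", "artifacts"]
#      _normalize_tags("ramp, artifacts")       -> ["ramp", "artifacts"]
#      _normalize_tags(None)                    -> []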
def get_cards_with_tag(self, tag: str) -> Set[str]:
"""Get all card names that have a specific tag.
Args:
tag: Theme tag to search for (case-sensitive)
Returns:
Set of card names with the tag (empty if tag not found)
Performance: O(1) lookup after index is built
"""
return self._tag_to_cards.get(tag, set()).copy()
def get_cards_with_all_tags(self, tags: List[str]) -> Set[str]:
"""Get cards that have ALL specified tags (AND logic).
Args:
tags: List of tags (card must have all of them)
Returns:
Set of card names with all tags (empty if no matches)
Performance: O(k) where k is number of tags
"""
if not tags:
return set()
# Start with cards for first tag
result = self.get_cards_with_tag(tags[0])
# Intersect with cards for each additional tag
for tag in tags[1:]:
result &= self.get_cards_with_tag(tag)
if not result:
# Short-circuit if no cards remain
break
return result
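# Equivalent set algebra: get_cards_with_all_tags(["tokens", "sacrifice"]) is
# get_cards_with_tag("tokens") & get_cards_with_tag("sacrifice"), with an early
# exit once the running intersection is empty.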
def get_cards_with_any_tags(self, tags: List[str]) -> Set[str]:
"""Get cards that have ANY of the specified tags (OR logic).
Args:
tags: List of tags (card needs at least one)
Returns:
Set of card names with at least one tag
Performance: O(k) where k is number of tags
"""
result: Set[str] = set()
for tag in tags:
result |= self.get_cards_with_tag(tag)
return result
def get_tags_for_card(self, card_name: str) -> List[str]:
"""Get all tags for a specific card.
Args:
card_name: Name of the card
Returns:
List of theme tags for the card (empty if not found)
Performance: O(1) lookup
"""
return self._card_to_tags.get(card_name, []).copy()
def get_all_tags(self) -> List[str]:
"""Get list of all tags in the index.
Returns:
Sorted list of all unique tags
"""
return sorted(self._tag_to_cards.keys())
def get_tag_stats(self, tag: str) -> Dict[str, int]:
"""Get statistics for a specific tag.
Args:
tag: Tag to get stats for
Returns:
Dict with 'card_count' key
"""
return {
"card_count": len(self._tag_to_cards.get(tag, set()))
}
def get_popular_tags(self, limit: int = 50) -> List[tuple[str, int]]:
"""Get most popular tags sorted by card count.
Args:
limit: Maximum number of tags to return
Returns:
List of (tag, card_count) tuples sorted by count descending
"""
tag_counts = [
(tag, len(cards))
for tag, cards in self._tag_to_cards.items()
]
tag_counts.sort(key=lambda x: x[1], reverse=True)
return tag_counts[:limit]
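# Usage sketch combining the query primitives above (tag names illustrative):
#
#   index = TagIndex()
#   index.build()
#   aristocrats = index.get_cards_with_all_tags(["tokens", "sacrifice"])
#   lifegain    = index.get_cards_with_any_tags(["lifegain", "lifelink"])
#   for tag, count in index.get_popular_tags(limit=5):
#       print(f"{tag}: {count} cards")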
def _save_to_cache(self) -> None:
"""Save index to cache file."""
if not self._stats:
return
try:
cache_data = {
"stats": {
"total_cards": self._stats.total_cards,
"total_tags": self._stats.total_tags,
"total_mappings": self._stats.total_mappings,
"build_time_seconds": self._stats.build_time_seconds,
"indexed_at": self._stats.indexed_at,
"all_cards_mtime": self._stats.all_cards_mtime
},
"tag_to_cards": {
tag: list(cards)
for tag, cards in self._tag_to_cards.items()
},
"card_to_tags": self._card_to_tags
}
self._cache_path.parent.mkdir(parents=True, exist_ok=True)
with self._cache_path.open("w", encoding="utf-8") as f:
json.dump(cache_data, f, indent=2)
logger.debug(f"Saved tag index cache to {self._cache_path}")
except Exception as e:
logger.warning(f"Failed to save tag index cache: {e}")
def _try_load_from_cache(self) -> bool:
"""Try to load index from cache file.
Returns:
True if cache loaded successfully and is up-to-date
"""
if not self._cache_path.exists():
return False
try:
with self._cache_path.open("r", encoding="utf-8") as f:
cache_data = json.load(f)
# Check if cache is up-to-date
stats_data = cache_data.get("stats", {})
cached_mtime = stats_data.get("all_cards_mtime", 0)
current_mtime = 0
if os.path.exists(self._loader.file_path):
current_mtime = os.path.getmtime(self._loader.file_path)
if current_mtime > cached_mtime:
logger.debug("Tag index cache outdated (all_cards modified)")
return False
# Load indexes
self._tag_to_cards = {
tag: set(cards)
for tag, cards in cache_data.get("tag_to_cards", {}).items()
}
self._card_to_tags = cache_data.get("card_to_tags", {})
# Restore stats
self._stats = IndexStats(**stats_data)
return True
except Exception as e:
logger.warning(f"Failed to load tag index cache: {e}")
return False
def clear_cache(self) -> None:
"""Delete the cached index file."""
if self._cache_path.exists():
self._cache_path.unlink()
logger.debug(f"Deleted tag index cache: {self._cache_path}")
def get_stats(self) -> Optional[IndexStats]:
"""Get index statistics.
Returns:
IndexStats if index has been built, None otherwise
"""
return self._stats
# Global index instance
_global_index: Optional[TagIndex] = None
def get_tag_index(force_rebuild: bool = False) -> TagIndex:
"""Get or create the global tag index.
Lazy-loads the index on first access. Subsequent calls return
the cached instance.
Args:
force_rebuild: If True, rebuild the index even if cached
Returns:
Global TagIndex instance
"""
global _global_index
if _global_index is None or force_rebuild:
_global_index = TagIndex()
_global_index.build(force_rebuild=force_rebuild)
elif _global_index._stats is None:
# Index exists but hasn't been built yet
_global_index.build()
return _global_index
def clear_global_index() -> None:
"""Clear the global tag index instance."""
global _global_index
if _global_index:
_global_index.clear_cache()
_global_index = None
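# Typical call-site sketch: modules share a single index via the accessor.
#
#   from code.tagging.tag_index import get_tag_index
#   idx = get_tag_index()                 # built (or cache-loaded) on first use
#   ramp_cards = idx.get_cards_with_tag("ramp")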

code/tagging/tag_loader.py Normal file

@ -0,0 +1,229 @@
"""Efficient tag loading using consolidated all_cards file.
Provides batch tag loading functions that leverage the all_cards.parquet file
instead of reading individual card CSV files. This is 10-50x faster for bulk
operations like deck building.
Usage:
# Load tags for multiple cards at once
tags_dict = load_tags_for_cards(["Sol Ring", "Lightning Bolt", "Counterspell"])
# Returns: {"Sol Ring": ["artifacts"], "Lightning Bolt": ["burn"], ...}
# Load tags for a single card
tags = load_tags_for_card("Sol Ring")
# Returns: ["artifacts", "ramp"]
"""
from __future__ import annotations
import os
from typing import Dict, List, Optional
from code.logging_util import get_logger
from code.services.all_cards_loader import AllCardsLoader
logger = get_logger(__name__)
# Global loader instance for caching
_loader_instance: Optional[AllCardsLoader] = None
def _get_loader() -> AllCardsLoader:
"""Get or create the global AllCardsLoader instance."""
global _loader_instance
if _loader_instance is None:
_loader_instance = AllCardsLoader()
return _loader_instance
def clear_cache() -> None:
"""Clear the cached all_cards data (useful after updates)."""
global _loader_instance
_loader_instance = None
def load_tags_for_cards(card_names: List[str]) -> Dict[str, List[str]]:
"""Load theme tags for multiple cards in one batch operation.
This is much faster than loading tags for each card individually,
especially when dealing with 50+ cards (typical deck size).
Args:
card_names: List of card names to load tags for
Returns:
Dictionary mapping card name to list of theme tags.
Cards not found or without tags will have empty list.
Example:
>>> tags = load_tags_for_cards(["Sol Ring", "Lightning Bolt"])
>>> tags["Sol Ring"]
["artifacts", "ramp"]
"""
if not card_names:
return {}
loader = _get_loader()
try:
# Batch lookup - single query for all cards
df = loader.get_by_names(card_names)
if df.empty:
logger.debug(f"No cards found for {len(card_names)} names")
return {name: [] for name in card_names}
# Extract tags from DataFrame
result: Dict[str, List[str]] = {}
if "themeTags" not in df.columns:
logger.warning("themeTags column not found in all_cards")
return {name: [] for name in card_names}
# Build lookup dictionary
for _, row in df.iterrows():
name = row.get("name")
if not name:
continue
tags = row.get("themeTags", [])
# Handle different themeTags formats
if isinstance(tags, list):
# Already a list - use directly
result[name] = [str(t).strip() for t in tags if t]
elif isinstance(tags, str):
# String format - could be comma-separated or list repr
if not tags or tags == "[]":
result[name] = []
elif tags.startswith("["):
# List representation like "['tag1', 'tag2']"
import ast
try:
parsed = ast.literal_eval(tags)
if isinstance(parsed, list):
result[name] = [str(t).strip() for t in parsed if t]
else:
result[name] = []
except (ValueError, SyntaxError):
# Fallback to comma split
result[name] = [t.strip() for t in tags.split(",") if t.strip()]
else:
# Comma-separated tags
result[name] = [t.strip() for t in tags.split(",") if t.strip()]
else:
result[name] = []
# Fill in missing cards with empty lists
for name in card_names:
if name not in result:
result[name] = []
return result
except FileNotFoundError:
logger.warning("all_cards file not found, returning empty tags")
return {name: [] for name in card_names}
except Exception as e:
logger.error(f"Error loading tags for cards: {e}")
return {name: [] for name in card_names}
def load_tags_for_card(card_name: str) -> List[str]:
"""Load theme tags for a single card.
For loading tags for multiple cards, use load_tags_for_cards() instead
for better performance.
Args:
card_name: Name of the card
Returns:
List of theme tags for the card (empty if not found)
Example:
>>> tags = load_tags_for_card("Sol Ring")
>>> "artifacts" in tags
True
"""
result = load_tags_for_cards([card_name])
return result.get(card_name, [])
def get_cards_with_tag(tag: str, limit: Optional[int] = None) -> List[str]:
"""Get all card names that have a specific tag.
Args:
tag: Theme tag to search for
limit: Maximum number of cards to return (None = no limit)
Returns:
List of card names with the tag
Example:
>>> cards = get_cards_with_tag("ramp", limit=10)
>>> len(cards) <= 10
True
"""
loader = _get_loader()
try:
df = loader.filter_by_themes([tag], mode="any")
if "name" not in df.columns:
return []
cards = df["name"].tolist()
if limit is not None and len(cards) > limit:
return cards[:limit]
return cards
except Exception as e:
logger.error(f"Error getting cards with tag '{tag}': {e}")
return []
def get_cards_with_all_tags(tags: List[str], limit: Optional[int] = None) -> List[str]:
"""Get all card names that have ALL of the specified tags.
Args:
tags: List of theme tags (card must have all of them)
limit: Maximum number of cards to return (None = no limit)
Returns:
List of card names with all specified tags
Example:
>>> cards = get_cards_with_all_tags(["ramp", "artifacts"])
>>> # Returns cards that have both ramp AND artifacts tags
"""
loader = _get_loader()
try:
df = loader.filter_by_themes(tags, mode="all")
if "name" not in df.columns:
return []
cards = df["name"].tolist()
if limit is not None and len(cards) > limit:
return cards[:limit]
return cards
except Exception as e:
logger.error(f"Error getting cards with all tags {tags}: {e}")
return []
def is_use_all_cards_enabled() -> bool:
"""Check if all_cards-based tag loading is enabled.
Returns:
True if USE_ALL_CARDS_FOR_TAGS is enabled (default: True)
"""
# Check environment variable
env_value = os.environ.get("USE_ALL_CARDS_FOR_TAGS", "true").lower()
return env_value in ("1", "true", "yes", "on")
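# Guarded call-site sketch (the per-card CSV fallback helper is hypothetical):
#
#   if is_use_all_cards_enabled():
#       tags_by_name = load_tags_for_cards(deck_card_names)
#   else:
#       tags_by_name = {n: load_tags_from_csv(n) for n in deck_card_names}  # hypothetical fallback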


@ -0,0 +1,602 @@
"""Consolidated theme metadata enrichment pipeline.
Replaces 7 separate subprocess scripts with a single efficient in-memory pipeline:
1. autofill_min_examples - Add placeholder examples
2. pad_min_examples - Pad to minimum threshold
3. cleanup_placeholder_examples - Remove placeholders when real examples added
4. purge_anchor_placeholders - Purge legacy anchor placeholders
5. augment_theme_yaml_from_catalog - Add descriptions/popularity from catalog
6. generate_theme_editorial_suggestions - Generate editorial suggestions
7. lint_theme_editorial - Validate metadata
Performance improvement: 5-10x faster, achieved by loading all YAMLs once,
processing in memory, and writing once at the end.
"""
from __future__ import annotations
import json
import re
import string
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
try:
import yaml # type: ignore
except ImportError: # pragma: no cover
yaml = None
@dataclass
class ThemeData:
"""In-memory representation of a theme YAML file."""
path: Path
data: Dict[str, Any]
modified: bool = False
@dataclass
class EnrichmentStats:
"""Statistics for enrichment pipeline run."""
autofilled: int = 0
padded: int = 0
cleaned: int = 0
purged: int = 0
augmented: int = 0
suggestions_added: int = 0
lint_errors: int = 0
lint_warnings: int = 0
total_themes: int = 0
def __str__(self) -> str:
return (
f"Enrichment complete: {self.total_themes} themes processed | "
f"autofilled:{self.autofilled} padded:{self.padded} cleaned:{self.cleaned} "
f"purged:{self.purged} augmented:{self.augmented} suggestions:{self.suggestions_added} | "
f"lint: {self.lint_errors} errors, {self.lint_warnings} warnings"
)
class ThemeEnrichmentPipeline:
"""Consolidated theme metadata enrichment pipeline."""
def __init__(
self,
root: Optional[Path] = None,
min_examples: int = 5,
progress_callback: Optional[Callable[[str], None]] = None,
):
"""Initialize the enrichment pipeline.
Args:
root: Project root directory (defaults to auto-detect)
min_examples: Minimum number of example commanders required
progress_callback: Optional callback for progress updates (for web UI)
"""
if root is None:
# Auto-detect root (3 levels up from this file)
root = Path(__file__).resolve().parents[2]
self.root = root
self.catalog_dir = root / 'config' / 'themes' / 'catalog'
self.theme_json = root / 'config' / 'themes' / 'theme_list.json'
self.csv_dir = root / 'csv_files'
self.min_examples = min_examples
self.progress_callback = progress_callback
self.themes: Dict[Path, ThemeData] = {}
self.stats = EnrichmentStats()
# Cached data
self._catalog_map: Optional[Dict[str, Dict[str, Any]]] = None
self._card_suggestions: Optional[Dict[str, Any]] = None
def _emit(self, message: str) -> None:
"""Emit progress message via callback or print."""
if self.progress_callback:
try:
self.progress_callback(message)
except Exception:
pass
else:
print(message, flush=True)
def load_all_themes(self) -> None:
"""Load all theme YAML files into memory (Step 0)."""
if not self.catalog_dir.exists():
self._emit("Warning: Catalog directory does not exist")
return
paths = sorted(self.catalog_dir.glob('*.yml'))
self.stats.total_themes = len(paths)
for path in paths:
try:
if yaml is None:
raise RuntimeError("PyYAML not installed")
data = yaml.safe_load(path.read_text(encoding='utf-8'))
if isinstance(data, dict):
self.themes[path] = ThemeData(path=path, data=data)
except Exception as e:
self._emit(f"Warning: Failed to load {path.name}: {e}")
self._emit(f"Loaded {len(self.themes)} theme files")
def _is_deprecated_alias(self, theme_data: Dict[str, Any]) -> bool:
"""Check if theme is a deprecated alias placeholder."""
notes = theme_data.get('notes')
return isinstance(notes, str) and 'Deprecated alias file' in notes
def _is_placeholder(self, entry: str) -> bool:
"""Check if an example entry is a placeholder.
Matches:
- "Theme Anchor"
- "Theme Anchor B"
- "Theme Anchor C"
etc.
"""
pattern = re.compile(r" Anchor( [A-Z])?$")
return bool(pattern.search(entry))
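# e.g. _is_placeholder("Landfall Anchor") -> True
#      _is_placeholder("Landfall Anchor B") -> True
#      _is_placeholder("Tatyova, Benthic Druid") -> False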
# Step 1: Autofill minimal placeholders
def autofill_placeholders(self) -> None:
"""Add placeholder examples for themes with zero examples."""
for theme in self.themes.values():
data = theme.data
if self._is_deprecated_alias(data):
continue
if not data.get('display_name'):
continue
# Skip if theme already has real (non-placeholder) examples in YAML
examples = data.get('example_commanders') or []
if isinstance(examples, list) and examples:
# Check if any examples are real (not " Anchor" placeholders)
has_real_examples = any(
isinstance(ex, str) and ex and not ex.endswith(' Anchor')
for ex in examples
)
if has_real_examples:
continue # Already has real examples, skip placeholder generation
# Only placeholders present: fall through and regenerate the canonical placeholder set
display = data['display_name']
synergies = data.get('synergies') or []
if not isinstance(synergies, list):
synergies = []
# Generate placeholders from display name + synergies
placeholders = [f"{display} Anchor"]
for s in synergies[:2]: # First 2 synergies
if isinstance(s, str) and s and s != display:
placeholders.append(f"{s} Anchor")
data['example_commanders'] = placeholders
if not data.get('editorial_quality'):
data['editorial_quality'] = 'draft'
theme.modified = True
self.stats.autofilled += 1
# Step 2: Pad to minimum examples
def pad_examples(self) -> None:
"""Pad example lists to minimum threshold with placeholders."""
for theme in self.themes.values():
data = theme.data
if self._is_deprecated_alias(data):
continue
if not data.get('display_name'):
continue
examples = data.get('example_commanders') or []
if not isinstance(examples, list):
continue
if len(examples) >= self.min_examples:
continue
# Only pad pure placeholder sets (heuristic: don't mix real + placeholders)
if any(not self._is_placeholder(e) for e in examples):
continue
display = data['display_name']
synergies = data.get('synergies') if isinstance(data.get('synergies'), list) else []
need = self.min_examples - len(examples)
# Build additional placeholders
new_placeholders = []
used = set(examples)
# 1. Additional synergies beyond first 2
for syn in synergies[2:]:
cand = f"{syn} Anchor"
if cand not in used and syn != display:
new_placeholders.append(cand)
if len(new_placeholders) >= need:
break
# 2. Generic letter suffixes (B, C, D, ...)
if len(new_placeholders) < need:
for suffix in string.ascii_uppercase[1:]: # Start from 'B'
cand = f"{display} Anchor {suffix}"
if cand not in used:
new_placeholders.append(cand)
if len(new_placeholders) >= need:
break
if new_placeholders:
data['example_commanders'] = examples + new_placeholders
if not data.get('editorial_quality'):
data['editorial_quality'] = 'draft'
theme.modified = True
self.stats.padded += 1
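# Worked example (mirrors the test fixture): with min_examples=5, display name
# "Ramp", synergies ["Landfall", "BigSpells", "Hydras"], and existing examples
# ["Ramp Anchor", "Landfall Anchor"], padding appends
# ["Hydras Anchor", "Ramp Anchor B", "Ramp Anchor C"].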
# Step 3: Cleanup placeholders when real examples exist
def cleanup_placeholders(self) -> None:
"""Remove placeholders when real examples have been added."""
for theme in self.themes.values():
data = theme.data
if self._is_deprecated_alias(data):
continue
if not data.get('display_name'):
continue
examples = data.get('example_commanders')
if not isinstance(examples, list) or not examples:
continue
placeholders = [e for e in examples if isinstance(e, str) and self._is_placeholder(e)]
real = [e for e in examples if isinstance(e, str) and not self._is_placeholder(e)]
# Only cleanup if we have both placeholders AND real examples
if placeholders and real:
new_list = real  # Real examples present in this branch, so drop all placeholders
if new_list != examples:
data['example_commanders'] = new_list
theme.modified = True
self.stats.cleaned += 1
# Step 4: Purge legacy anchor placeholders
def purge_anchors(self) -> None:
"""Remove all legacy anchor placeholders."""
pattern = re.compile(r" Anchor( [A-Z])?$")
for theme in self.themes.values():
data = theme.data
examples = data.get('example_commanders')
if not isinstance(examples, list) or not examples:
continue
placeholders = [e for e in examples if isinstance(e, str) and pattern.search(e)]
if not placeholders:
continue
real = [e for e in examples if isinstance(e, str) and not pattern.search(e)]
new_list = real # Remove ALL placeholders (even if list becomes empty)
if new_list != examples:
data['example_commanders'] = new_list
theme.modified = True
self.stats.purged += 1
# Step 5: Augment from catalog
def _load_catalog_map(self) -> Dict[str, Dict[str, Any]]:
"""Load theme_list.json catalog into memory."""
if self._catalog_map is not None:
return self._catalog_map
if not self.theme_json.exists():
self._emit("Warning: theme_list.json not found")
self._catalog_map = {}
return self._catalog_map
try:
data = json.loads(self.theme_json.read_text(encoding='utf-8') or '{}')
themes = data.get('themes') or []
self._catalog_map = {}
for t in themes:
if isinstance(t, dict) and t.get('theme'):
self._catalog_map[str(t['theme'])] = t
except Exception as e:
self._emit(f"Warning: Failed to parse theme_list.json: {e}")
self._catalog_map = {}
return self._catalog_map
def augment_from_catalog(self) -> None:
"""Add description, popularity, etc. from theme_list.json."""
catalog_map = self._load_catalog_map()
if not catalog_map:
return
for theme in self.themes.values():
data = theme.data
if self._is_deprecated_alias(data):
continue
name = str(data.get('display_name') or '').strip()
if not name:
continue
cat_entry = catalog_map.get(name)
if not cat_entry:
continue
modified = False
# Add description if missing
if 'description' not in data and 'description' in cat_entry and cat_entry['description']:
data['description'] = cat_entry['description']
modified = True
# Add popularity bucket if missing
if 'popularity_bucket' not in data and cat_entry.get('popularity_bucket'):
data['popularity_bucket'] = cat_entry['popularity_bucket']
modified = True
# Add popularity hint if missing
if 'popularity_hint' not in data and cat_entry.get('popularity_hint'):
data['popularity_hint'] = cat_entry['popularity_hint']
modified = True
# Backfill deck archetype if missing (defensive)
if 'deck_archetype' not in data and cat_entry.get('deck_archetype'):
data['deck_archetype'] = cat_entry['deck_archetype']
modified = True
if modified:
theme.modified = True
self.stats.augmented += 1
# Step 6: Generate editorial suggestions (delegates to the standalone generator script)
def generate_suggestions(self) -> None:
"""Generate editorial suggestions for missing example_cards/commanders.
This runs the generate_theme_editorial_suggestions.py script to populate
example_cards and example_commanders from CSV data (EDHREC ranks + themeTags).
"""
import os
import subprocess
# Check if we should run the editorial suggestions generator
skip_suggestions = os.environ.get('SKIP_EDITORIAL_SUGGESTIONS', '').lower() in ('1', 'true', 'yes')
if skip_suggestions:
self._emit("Skipping editorial suggestions generation (SKIP_EDITORIAL_SUGGESTIONS=1)")
return
script_path = self.root / 'code' / 'scripts' / 'generate_theme_editorial_suggestions.py'
if not script_path.exists():
self._emit("Editorial suggestions script not found; skipping")
return
try:
self._emit("Generating example_cards and example_commanders from CSV data...")
# Run with --apply to write missing fields, limit to reasonable batch
result = subprocess.run(
[sys.executable, str(script_path), '--apply', '--limit-yaml', '1000', '--top', '8'],
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
cwd=str(self.root)
)
if result.returncode == 0:
# Reload themes to pick up the generated examples
self.load_all_themes()
self._emit("Editorial suggestions generated successfully")
else:
self._emit(f"Editorial suggestions script failed (exit {result.returncode}): {result.stderr[:200]}")
except subprocess.TimeoutExpired:
self._emit("Editorial suggestions generation timed out (skipping)")
except Exception as e:
self._emit(f"Failed to generate editorial suggestions: {e}")
# Step 7: Lint/validate
ALLOWED_ARCHETYPES: Set[str] = {
'Lands', 'Graveyard', 'Planeswalkers', 'Tokens', 'Counters', 'Spells',
'Artifacts', 'Enchantments', 'Politics', 'Combo', 'Aggro', 'Control',
'Midrange', 'Stax', 'Ramp', 'Toolbox'
}
CORNERSTONE: Set[str] = {
'Landfall', 'Reanimate', 'Superfriends', 'Tokens Matter', '+1/+1 Counters'
}
def validate(self, enforce_min: bool = False, strict: bool = False) -> None:
"""Validate theme metadata (lint)."""
errors: List[str] = []
warnings: List[str] = []
seen_display: Set[str] = set()
for theme in self.themes.values():
data = theme.data
if self._is_deprecated_alias(data):
continue
name = str(data.get('display_name') or '').strip()
if not name:
continue
if name in seen_display:
continue # Skip duplicates
seen_display.add(name)
ex_cmd = data.get('example_commanders') or []
ex_cards = data.get('example_cards') or []
if not isinstance(ex_cmd, list):
errors.append(f"{name}: example_commanders not a list")
ex_cmd = []
if not isinstance(ex_cards, list):
errors.append(f"{name}: example_cards not a list")
ex_cards = []
# Length checks
if len(ex_cmd) > 12:
warnings.append(f"{name}: example_commanders has {len(ex_cmd)} entries (>12)")
if len(ex_cards) > 20:
warnings.append(f"{name}: example_cards has {len(ex_cards)} entries (>20)")
# Minimum examples check
if ex_cmd and len(ex_cmd) < self.min_examples:
msg = f"{name}: only {len(ex_cmd)} example_commanders (<{self.min_examples} minimum)"
if enforce_min:
errors.append(msg)
else:
warnings.append(msg)
# Cornerstone themes should have examples (if strict)
if strict and name in self.CORNERSTONE:
if not ex_cmd:
errors.append(f"{name}: cornerstone theme missing example_commanders")
if not ex_cards:
errors.append(f"{name}: cornerstone theme missing example_cards")
# Deck archetype validation
archetype = data.get('deck_archetype')
if archetype and archetype not in self.ALLOWED_ARCHETYPES:
warnings.append(f"{name}: unknown deck_archetype '{archetype}'")
self.stats.lint_errors = len(errors)
self.stats.lint_warnings = len(warnings)
if errors:
for err in errors:
self._emit(f"ERROR: {err}")
if warnings:
for warn in warnings:
self._emit(f"WARNING: {warn}")
def write_all_themes(self) -> None:
"""Write all modified themes back to disk (final step)."""
if yaml is None:
raise RuntimeError("PyYAML not installed; cannot write themes")
written = 0
for theme in self.themes.values():
if theme.modified:
try:
theme.path.write_text(
yaml.safe_dump(theme.data, sort_keys=False, allow_unicode=True),
encoding='utf-8'
)
written += 1
except Exception as e:
self._emit(f"Error writing {theme.path.name}: {e}")
self._emit(f"Wrote {written} modified theme files")
def run_all(
self,
write: bool = True,
enforce_min: bool = False,
strict_lint: bool = False,
run_purge: bool = False,
) -> EnrichmentStats:
"""Run the full enrichment pipeline.
Args:
write: Whether to write changes to disk (False = dry run)
enforce_min: Whether to treat min_examples violations as errors
strict_lint: Whether to enforce strict validation rules
run_purge: Whether to run purge step (removes ALL anchor placeholders)
Returns:
EnrichmentStats with summary of operations
"""
self._emit("Starting theme enrichment pipeline...")
# Step 0: Load all themes
self.load_all_themes()
# Step 1: Autofill placeholders
self._emit("Step 1/7: Autofilling placeholders...")
self.autofill_placeholders()
# Step 2: Pad to minimum
self._emit("Step 2/7: Padding to minimum examples...")
self.pad_examples()
# Step 3: Cleanup mixed placeholder/real lists
self._emit("Step 3/7: Cleaning up placeholders...")
self.cleanup_placeholders()
# Step 4: Purge all anchor placeholders (optional - disabled by default)
# Note: Purge removes ALL anchors, even from pure placeholder lists.
# Only enable for one-time migration away from placeholder system.
if run_purge:
self._emit("Step 4/7: Purging legacy anchors...")
self.purge_anchors()
else:
self._emit("Step 4/7: Skipping purge (preserving placeholders)...")
# Step 5: Augment from catalog
self._emit("Step 5/7: Augmenting from catalog...")
self.augment_from_catalog()
# Step 6: Generate suggestions (subprocess; honors SKIP_EDITORIAL_SUGGESTIONS)
self._emit("Step 6/7: Generating suggestions...")
self.generate_suggestions()
# Step 7: Validate
self._emit("Step 7/7: Validating metadata...")
self.validate(enforce_min=enforce_min, strict=strict_lint)
# Write changes
if write:
self._emit("Writing changes to disk...")
self.write_all_themes()
else:
self._emit("Dry run: no files written")
self._emit(str(self.stats))
return self.stats
def run_enrichment_pipeline(
root: Optional[Path] = None,
min_examples: int = 5,
write: bool = True,
enforce_min: bool = False,
strict: bool = False,
run_purge: bool = False,
progress_callback: Optional[Callable[[str], None]] = None,
) -> EnrichmentStats:
"""Convenience function to run the enrichment pipeline.
Args:
root: Project root directory
min_examples: Minimum number of example commanders
write: Whether to write changes (False = dry run)
enforce_min: Treat min examples violations as errors
strict: Enforce strict validation rules
run_purge: Whether to run purge step (removes ALL placeholders)
progress_callback: Optional progress callback
Returns:
EnrichmentStats summary
"""
pipeline = ThemeEnrichmentPipeline(
root=root,
min_examples=min_examples,
progress_callback=progress_callback,
)
return pipeline.run_all(
write=write,
enforce_min=enforce_min,
strict_lint=strict,
run_purge=run_purge
)
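# Minimal dry-run sketch (stats values illustrative):
#
#   stats = run_enrichment_pipeline(write=False)
#   print(stats)  # "Enrichment complete: 700 themes processed | autofilled:3 ..."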


@ -0,0 +1,429 @@
"""Tests for tag index functionality."""
import json
import time
from code.tagging.tag_index import (
TagIndex,
IndexStats,
get_tag_index,
clear_global_index,
)
class TestTagIndexBuild:
"""Test index building operations."""
def test_build_index(self):
"""Test that index builds successfully."""
index = TagIndex()
stats = index.build()
assert isinstance(stats, IndexStats)
assert stats.total_cards > 0
assert stats.total_tags > 0
assert stats.total_mappings > 0
assert stats.build_time_seconds >= 0
def test_build_index_performance(self):
"""Test that index builds in reasonable time."""
index = TagIndex()
start = time.perf_counter()
stats = index.build()
elapsed = time.perf_counter() - start
# Should build in <5s for typical dataset
assert elapsed < 5.0
assert stats.build_time_seconds < 5.0
def test_force_rebuild(self):
"""Test that force_rebuild always rebuilds."""
index = TagIndex()
# Build once
stats1 = index.build()
time1 = stats1.indexed_at
# Wait a bit
time.sleep(0.1)
# Force rebuild
stats2 = index.build(force_rebuild=True)
time2 = stats2.indexed_at
# Should have different timestamps
assert time2 > time1
class TestSingleTagQueries:
"""Test single tag lookup operations."""
def test_get_cards_with_tag(self):
"""Test getting cards with a specific tag."""
index = TagIndex()
index.build()
# Get a tag that exists
all_tags = index.get_all_tags()
if all_tags:
tag = all_tags[0]
cards = index.get_cards_with_tag(tag)
assert isinstance(cards, set)
assert len(cards) > 0
def test_get_cards_with_nonexistent_tag(self):
"""Test querying for tag that doesn't exist."""
index = TagIndex()
index.build()
cards = index.get_cards_with_tag("ThisTagDoesNotExist12345")
assert cards == set()
def test_get_tags_for_card(self):
"""Test getting tags for a specific card."""
index = TagIndex()
index.build()
# Get a card that exists
cards = index.get_cards_with_tag(index.get_all_tags()[0]) if index.get_all_tags() else set()
if cards:
card_name = list(cards)[0]
tags = index.get_tags_for_card(card_name)
assert isinstance(tags, list)
assert len(tags) > 0
def test_get_tags_for_nonexistent_card(self):
"""Test getting tags for card that doesn't exist."""
index = TagIndex()
index.build()
tags = index.get_tags_for_card("This Card Does Not Exist 12345")
assert tags == []
class TestMultiTagQueries:
"""Test queries with multiple tags."""
def test_get_cards_with_all_tags(self):
"""Test AND logic (cards must have all tags)."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if len(all_tags) >= 2:
# Pick two tags
tag1, tag2 = all_tags[0], all_tags[1]
cards1 = index.get_cards_with_tag(tag1)
cards2 = index.get_cards_with_tag(tag2)
cards_both = index.get_cards_with_all_tags([tag1, tag2])
# Result should be subset of both
assert cards_both.issubset(cards1)
assert cards_both.issubset(cards2)
# Result should be intersection
assert cards_both == (cards1 & cards2)
def test_get_cards_with_any_tags(self):
"""Test OR logic (cards need at least one tag)."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if len(all_tags) >= 2:
# Pick two tags
tag1, tag2 = all_tags[0], all_tags[1]
cards1 = index.get_cards_with_tag(tag1)
cards2 = index.get_cards_with_tag(tag2)
cards_any = index.get_cards_with_any_tags([tag1, tag2])
# Result should be superset of both
assert cards1.issubset(cards_any)
assert cards2.issubset(cards_any)
# Result should be union
assert cards_any == (cards1 | cards2)
def test_get_cards_with_empty_tag_list(self):
"""Test querying with empty tag list."""
index = TagIndex()
index.build()
cards_all = index.get_cards_with_all_tags([])
cards_any = index.get_cards_with_any_tags([])
assert cards_all == set()
assert cards_any == set()
def test_get_cards_with_nonexistent_tags(self):
"""Test querying with tags that don't exist."""
index = TagIndex()
index.build()
fake_tags = ["FakeTag1", "FakeTag2"]
cards_all = index.get_cards_with_all_tags(fake_tags)
cards_any = index.get_cards_with_any_tags(fake_tags)
assert cards_all == set()
assert cards_any == set()
class TestIndexStats:
"""Test index statistics and metadata."""
def test_get_stats(self):
"""Test getting index statistics."""
index = TagIndex()
# Before building
assert index.get_stats() is None
# After building
stats = index.build()
retrieved_stats = index.get_stats()
assert retrieved_stats is not None
assert retrieved_stats.total_cards == stats.total_cards
assert retrieved_stats.total_tags == stats.total_tags
def test_get_all_tags(self):
"""Test getting list of all tags."""
index = TagIndex()
index.build()
tags = index.get_all_tags()
assert isinstance(tags, list)
assert len(tags) > 0
# Should be sorted
assert tags == sorted(tags)
def test_get_tag_stats(self):
"""Test getting stats for specific tag."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if all_tags:
tag = all_tags[0]
stats = index.get_tag_stats(tag)
assert "card_count" in stats
assert stats["card_count"] > 0
def test_get_popular_tags(self):
"""Test getting most popular tags."""
index = TagIndex()
index.build()
popular = index.get_popular_tags(limit=10)
assert isinstance(popular, list)
assert len(popular) <= 10
if len(popular) > 1:
# Should be sorted by count descending
counts = [count for _, count in popular]
assert counts == sorted(counts, reverse=True)
class TestCaching:
"""Test index caching and persistence."""
def test_save_and_load_cache(self, tmp_path):
"""Test that cache saves and loads correctly."""
cache_path = tmp_path / ".tag_index_test.json"
# Build and save
index1 = TagIndex(cache_path=cache_path)
stats1 = index1.build()
assert cache_path.exists()
# Load from cache
index2 = TagIndex(cache_path=cache_path)
stats2 = index2.build() # Should load from cache
# Should have same data
assert stats2.total_cards == stats1.total_cards
assert stats2.total_tags == stats1.total_tags
assert stats2.indexed_at == stats1.indexed_at
def test_cache_invalidation(self, tmp_path):
"""Test that cache is rebuilt when all_cards changes."""
cache_path = tmp_path / ".tag_index_test.json"
# Build index
index = TagIndex(cache_path=cache_path)
stats1 = index.build()
# Modify cache to simulate outdated mtime
with cache_path.open("r") as f:
cache_data = json.load(f)
cache_data["stats"]["all_cards_mtime"] = 0 # Very old
with cache_path.open("w") as f:
json.dump(cache_data, f)
# Should rebuild (not use cache)
index2 = TagIndex(cache_path=cache_path)
stats2 = index2.build()
# Should have new timestamp
assert stats2.indexed_at > stats1.indexed_at
def test_clear_cache(self, tmp_path):
"""Test cache clearing."""
cache_path = tmp_path / ".tag_index_test.json"
index = TagIndex(cache_path=cache_path)
index.build()
assert cache_path.exists()
index.clear_cache()
assert not cache_path.exists()
class TestGlobalIndex:
"""Test global index accessor."""
def test_get_tag_index(self):
"""Test getting global index."""
clear_global_index()
index = get_tag_index()
assert isinstance(index, TagIndex)
assert index.get_stats() is not None
def test_get_tag_index_singleton(self):
"""Test that global index is a singleton."""
clear_global_index()
index1 = get_tag_index()
index2 = get_tag_index()
# Should be same instance
assert index1 is index2
def test_clear_global_index(self):
"""Test clearing global index."""
index1 = get_tag_index()
clear_global_index()
index2 = get_tag_index()
# Should be different instance
assert index1 is not index2
class TestEdgeCases:
"""Test edge cases and error handling."""
def test_cards_with_no_tags(self):
"""Test that cards without tags are handled."""
index = TagIndex()
index.build()
# Get stats - should handle cards with no tags gracefully
stats = index.get_stats()
assert stats is not None
def test_special_characters_in_tags(self):
"""Test tags with special characters."""
index = TagIndex()
index.build()
# Try querying with special chars (should not crash)
cards = index.get_cards_with_tag("Life & Death")
assert isinstance(cards, set)
def test_case_sensitive_tags(self):
"""Test that tag lookups are case-sensitive."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if all_tags:
tag = all_tags[0]
cards1 = index.get_cards_with_tag(tag)
cards2 = index.get_cards_with_tag(tag.upper())
cards3 = index.get_cards_with_tag(tag.lower())
# Case matters - may get different results
# (depends on tag naming in data)
assert isinstance(cards1, set)
assert isinstance(cards2, set)
assert isinstance(cards3, set)
def test_duplicate_tags_handled(self):
"""Test that duplicate tags in query are handled."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if all_tags:
tag = all_tags[0]
# Query with duplicate tag
cards = index.get_cards_with_all_tags([tag, tag])
cards_single = index.get_cards_with_tag(tag)
# Should give same result as single tag
assert cards == cards_single
class TestPerformance:
"""Test performance characteristics."""
def test_query_performance(self):
"""Test that queries complete quickly."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if all_tags:
tag = all_tags[0]
# Measure query time
start = time.perf_counter()
for _ in range(100):
index.get_cards_with_tag(tag)
elapsed = time.perf_counter() - start
avg_time_ms = (elapsed / 100) * 1000
# Should average <1ms per query
assert avg_time_ms < 1.0
def test_multi_tag_query_performance(self):
"""Test multi-tag query performance."""
index = TagIndex()
index.build()
all_tags = index.get_all_tags()
if len(all_tags) >= 3:
tags = all_tags[:3]
# Measure query time
start = time.perf_counter()
for _ in range(100):
index.get_cards_with_all_tags(tags)
elapsed = time.perf_counter() - start
avg_time_ms = (elapsed / 100) * 1000
# Should still be very fast
assert avg_time_ms < 5.0


@ -0,0 +1,259 @@
"""Tests for batch tag loading from all_cards."""
from code.tagging.tag_loader import (
load_tags_for_cards,
load_tags_for_card,
get_cards_with_tag,
get_cards_with_all_tags,
clear_cache,
is_use_all_cards_enabled,
)
class TestBatchTagLoading:
"""Test batch tag loading operations."""
def test_load_tags_for_multiple_cards(self):
"""Test loading tags for multiple cards at once."""
cards = ["Sol Ring", "Lightning Bolt", "Counterspell"]
result = load_tags_for_cards(cards)
assert isinstance(result, dict)
assert len(result) == 3
# All requested cards should be in result (even if no tags)
for card in cards:
assert card in result
assert isinstance(result[card], list)
def test_load_tags_for_empty_list(self):
"""Test loading tags for empty list returns empty dict."""
result = load_tags_for_cards([])
assert result == {}
def test_load_tags_for_single_card(self):
"""Test single card convenience function."""
tags = load_tags_for_card("Sol Ring")
assert isinstance(tags, list)
# Sol Ring should have some tags (artifacts, ramp, etc)
# But we don't assert specific tags since data may vary
def test_load_tags_for_nonexistent_card(self):
"""Test loading tags for card that doesn't exist."""
tags = load_tags_for_card("This Card Does Not Exist 12345")
# Should return empty list, not fail
assert tags == []
def test_load_tags_batch_includes_missing_cards(self):
"""Test batch loading includes missing cards with empty lists."""
cards = ["Sol Ring", "Fake Card Name 999", "Lightning Bolt"]
result = load_tags_for_cards(cards)
# All cards should be present
assert len(result) == 3
assert "Fake Card Name 999" in result
assert result["Fake Card Name 999"] == []
def test_load_tags_handles_list_format(self):
"""Test that tags in list format are parsed correctly."""
# Pick a card likely to have tags
result = load_tags_for_cards(["Sol Ring"])
if "Sol Ring" in result and result["Sol Ring"]:
tags = result["Sol Ring"]
# Should be a list of strings
assert all(isinstance(tag, str) for tag in tags)
# Tags should be stripped of whitespace
assert all(tag == tag.strip() for tag in tags)
def test_load_tags_handles_string_format(self):
"""Test that tags in string format are parsed correctly."""
# The loader should handle both list and string representations
# This is tested implicitly by loading any card
cards = ["Sol Ring", "Lightning Bolt"]
result = load_tags_for_cards(cards)
for card in cards:
tags = result[card]
# All should be lists (even if empty)
assert isinstance(tags, list)
# No empty string tags
assert "" not in tags
assert all(tag.strip() for tag in tags)
class TestTagQueries:
"""Test querying cards by tags."""
def test_get_cards_with_tag(self):
"""Test getting all cards with a specific tag."""
# Pick a common tag
cards = get_cards_with_tag("ramp", limit=10)
assert isinstance(cards, list)
# Should have some cards (or none if tag doesn't exist)
# We don't assert specific count since data varies
def test_get_cards_with_tag_limit(self):
"""Test limit parameter works."""
cards = get_cards_with_tag("ramp", limit=5)
assert len(cards) <= 5
def test_get_cards_with_nonexistent_tag(self):
"""Test querying with tag that doesn't exist."""
cards = get_cards_with_tag("ThisTagDoesNotExist12345")
# Should return empty list, not fail
assert cards == []
def test_get_cards_with_all_tags(self):
"""Test getting cards that have multiple tags."""
# Pick two tags that might overlap
cards = get_cards_with_all_tags(["artifacts", "ramp"], limit=10)
assert isinstance(cards, list)
assert len(cards) <= 10
def test_get_cards_with_all_tags_no_matches(self):
"""Test query with tags that likely have no overlap."""
cards = get_cards_with_all_tags([
"ThisTagDoesNotExist1",
"ThisTagDoesNotExist2"
])
# Should return empty list
assert cards == []
class TestCacheManagement:
"""Test cache management functions."""
def test_clear_cache(self):
"""Test that cache can be cleared without errors."""
# Load some data
load_tags_for_card("Sol Ring")
# Clear cache
clear_cache()
# Should still work after clearing
tags = load_tags_for_card("Sol Ring")
assert isinstance(tags, list)
def test_cache_persistence(self):
"""Test that multiple calls use cached data."""
# First call
result1 = load_tags_for_cards(["Sol Ring", "Lightning Bolt"])
# Second call (should use cache)
result2 = load_tags_for_cards(["Sol Ring", "Lightning Bolt"])
# Results should be identical
assert result1 == result2
class TestFeatureFlag:
"""Test feature flag functionality."""
def test_is_use_all_cards_enabled_default(self):
"""Test that all_cards tag loading is enabled by default."""
enabled = is_use_all_cards_enabled()
# Default should be True
assert isinstance(enabled, bool)
# We don't assert True since env might override
class TestEdgeCases:
"""Test edge cases and error handling."""
def test_load_tags_with_special_characters(self):
"""Test loading tags for cards with special characters."""
# Cards with apostrophes, commas, etc.
cards = [
"Urza's Saga",
"Keeper of the Accord",
"Esper Sentinel"
]
result = load_tags_for_cards(cards)
# Should handle special characters
assert len(result) == 3
for card in cards:
assert card in result
def test_load_tags_preserves_card_name_case(self):
"""Test that card names preserve their original case."""
cards = ["Sol Ring", "LIGHTNING BOLT", "counterspell"]
result = load_tags_for_cards(cards)
# Batch loader fills in every requested name, so all three keys are present
assert all(card in result for card in cards)
# Note: exact case matching depends on all_cards data
def test_load_tags_deduplicates(self):
"""Test that duplicate tags are handled."""
# Load tags for a card
tags = load_tags_for_card("Sol Ring")
# If any tags present, check for no duplicates
if tags:
assert len(tags) == len(set(tags))
def test_large_batch_performance(self):
"""Test that large batch loads complete in reasonable time."""
import time
# Create a batch of 100 names (duplicates collapse to 2 unique result keys)
cards = ["Sol Ring"] * 50 + ["Lightning Bolt"] * 50
start = time.perf_counter()
result = load_tags_for_cards(cards)
elapsed = time.perf_counter() - start
# Should complete quickly (< 1 second for 100 cards)
assert elapsed < 1.0
assert len(result) >= 1 # At least one card found
class TestFormatVariations:
"""Test handling of different tag format variations."""
def test_empty_tags_handled(self):
"""Test that cards with no tags return empty list."""
# Pick a card that might have no tags (basic lands usually don't)
tags = load_tags_for_card("Plains")
# Should be empty list, not None or error
assert isinstance(tags, list)
def test_string_list_repr_parsed(self):
"""Test parsing of string representations like \"['tag1', 'tag2']\"."""
# This is tested implicitly through load_tags_for_cards
# The loader handles multiple formats internally
cards = ["Sol Ring", "Lightning Bolt", "Counterspell"]
result = load_tags_for_cards(cards)
# All results should be lists
for card, tags in result.items():
assert isinstance(tags, list)
# No stray brackets or quotes
for tag in tags:
assert "[" not in tag
assert "]" not in tag
assert '"' not in tag
assert "'" not in tag or tag.count("'") > 1 # Allow apostrophes in words
def test_comma_separated_parsed(self):
"""Test parsing of comma-separated tag strings."""
# The loader should handle comma-separated strings
# This is tested implicitly by loading any card
result = load_tags_for_cards(["Sol Ring"])
if result.get("Sol Ring"):
tags = result["Sol Ring"]
# Tags should be split properly (no commas in individual tags)
for tag in tags:
assert "," not in tag or tag.count(",") == 0


@ -0,0 +1,370 @@
"""Tests for consolidated theme enrichment pipeline.
These tests verify that the new consolidated pipeline produces the same results
as the old 7-script approach, but much faster.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
import pytest
try:
import yaml
except ImportError:
yaml = None
from code.tagging.theme_enrichment import (
ThemeEnrichmentPipeline,
EnrichmentStats,
run_enrichment_pipeline,
)
# Skip all tests if PyYAML not available
pytestmark = pytest.mark.skipif(yaml is None, reason="PyYAML not installed")
@pytest.fixture
def temp_catalog_dir(tmp_path: Path) -> Path:
"""Create temporary catalog directory with test themes."""
catalog_dir = tmp_path / 'config' / 'themes' / 'catalog'
catalog_dir.mkdir(parents=True)
return catalog_dir
@pytest.fixture
def temp_root(tmp_path: Path, temp_catalog_dir: Path) -> Path:
"""Create temporary project root."""
# Create theme_list.json
theme_json = tmp_path / 'config' / 'themes' / 'theme_list.json'
theme_json.parent.mkdir(parents=True, exist_ok=True)
theme_json.write_text('{"themes": []}', encoding='utf-8')
return tmp_path
def write_theme(catalog_dir: Path, filename: str, data: Dict[str, Any]) -> Path:
"""Helper to write a theme YAML file."""
path = catalog_dir / filename
path.write_text(yaml.safe_dump(data, sort_keys=False, allow_unicode=True), encoding='utf-8')
return path
def read_theme(path: Path) -> Dict[str, Any]:
"""Helper to read a theme YAML file."""
return yaml.safe_load(path.read_text(encoding='utf-8'))
class TestThemeEnrichmentPipeline:
"""Tests for ThemeEnrichmentPipeline class."""
def test_init(self, temp_root: Path):
"""Test pipeline initialization."""
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
assert pipeline.root == temp_root
assert pipeline.min_examples == 5
assert pipeline.catalog_dir == temp_root / 'config' / 'themes' / 'catalog'
assert len(pipeline.themes) == 0
def test_load_themes_empty_dir(self, temp_root: Path):
"""Test loading themes from empty directory."""
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
assert len(pipeline.themes) == 0
assert pipeline.stats.total_themes == 0
def test_load_themes_with_valid_files(self, temp_root: Path, temp_catalog_dir: Path):
"""Test loading valid theme files."""
write_theme(temp_catalog_dir, 'landfall.yml', {
'display_name': 'Landfall',
'synergies': ['Ramp', 'Tokens'],
'example_commanders': []
})
write_theme(temp_catalog_dir, 'reanimate.yml', {
'display_name': 'Reanimate',
'synergies': ['Graveyard', 'Mill'],
'example_commanders': ['Meren of Clan Nel Toth']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
assert len(pipeline.themes) == 2
assert pipeline.stats.total_themes == 2
def test_autofill_placeholders_empty_examples(self, temp_root: Path, temp_catalog_dir: Path):
"""Test autofill adds placeholders to themes with no examples."""
write_theme(temp_catalog_dir, 'tokens.yml', {
'display_name': 'Tokens Matter',
'synergies': ['Sacrifice', 'Aristocrats'],
'example_commanders': []
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.autofill_placeholders()
assert pipeline.stats.autofilled == 1
theme = list(pipeline.themes.values())[0]
assert theme.modified
assert 'Tokens Matter Anchor' in theme.data['example_commanders']
assert 'Sacrifice Anchor' in theme.data['example_commanders']
assert 'Aristocrats Anchor' in theme.data['example_commanders']
assert theme.data.get('editorial_quality') == 'draft'
def test_autofill_skips_themes_with_examples(self, temp_root: Path, temp_catalog_dir: Path):
"""Test autofill skips themes that already have examples."""
write_theme(temp_catalog_dir, 'landfall.yml', {
'display_name': 'Landfall',
'synergies': ['Ramp'],
'example_commanders': ['Tatyova, Benthic Druid']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.autofill_placeholders()
assert pipeline.stats.autofilled == 0
theme = list(pipeline.themes.values())[0]
assert not theme.modified
def test_pad_examples_to_minimum(self, temp_root: Path, temp_catalog_dir: Path):
"""Test padding adds placeholders to reach minimum threshold."""
write_theme(temp_catalog_dir, 'ramp.yml', {
'display_name': 'Ramp',
'synergies': ['Landfall', 'BigSpells', 'Hydras'],
'example_commanders': ['Ramp Anchor', 'Landfall Anchor']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
pipeline.load_all_themes()
pipeline.pad_examples()
assert pipeline.stats.padded == 1
theme = list(pipeline.themes.values())[0]
assert theme.modified
assert len(theme.data['example_commanders']) == 5
# Should add synergies first (3rd synergy), then letter suffixes
assert 'Hydras Anchor' in theme.data['example_commanders']
# Should also have letter suffixes for remaining slots
assert any('Anchor B' in cmd or 'Anchor C' in cmd for cmd in theme.data['example_commanders'])
def test_pad_skips_mixed_real_and_placeholder(self, temp_root: Path, temp_catalog_dir: Path):
"""Test padding skips lists with both real and placeholder examples."""
write_theme(temp_catalog_dir, 'tokens.yml', {
'display_name': 'Tokens',
'synergies': ['Sacrifice'],
'example_commanders': ['Krenko, Mob Boss', 'Tokens Anchor']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
pipeline.load_all_themes()
pipeline.pad_examples()
assert pipeline.stats.padded == 0
theme = list(pipeline.themes.values())[0]
assert not theme.modified
def test_cleanup_removes_placeholders_when_real_present(self, temp_root: Path, temp_catalog_dir: Path):
"""Test cleanup removes placeholders when real examples are present.
Note: cleanup only removes entries ending with ' Anchor' (no suffix).
Purge step removes entries with ' Anchor' or ' Anchor X' pattern.
"""
write_theme(temp_catalog_dir, 'lifegain.yml', {
'display_name': 'Lifegain',
'synergies': [],
'example_commanders': [
'Oloro, Ageless Ascetic',
'Lifegain Anchor', # Will be removed
'Trelasarra, Moon Dancer',
]
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.cleanup_placeholders()
assert pipeline.stats.cleaned == 1
theme = list(pipeline.themes.values())[0]
assert theme.modified
assert len(theme.data['example_commanders']) == 2
assert 'Oloro, Ageless Ascetic' in theme.data['example_commanders']
assert 'Trelasarra, Moon Dancer' in theme.data['example_commanders']
assert 'Lifegain Anchor' not in theme.data['example_commanders']
def test_purge_removes_all_anchors(self, temp_root: Path, temp_catalog_dir: Path):
"""Test purge removes all anchor placeholders (even if no real examples)."""
write_theme(temp_catalog_dir, 'counters.yml', {
'display_name': 'Counters',
'synergies': [],
'example_commanders': [
'Counters Anchor',
'Counters Anchor B',
'Counters Anchor C'
]
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.purge_anchors()
assert pipeline.stats.purged == 1
theme = list(pipeline.themes.values())[0]
assert theme.modified
assert theme.data['example_commanders'] == []
def test_augment_from_catalog(self, temp_root: Path, temp_catalog_dir: Path):
"""Test augmentation adds missing fields from catalog."""
# Create catalog JSON
catalog_json = temp_root / 'config' / 'themes' / 'theme_list.json'
catalog_data = {
'themes': [
{
'theme': 'Landfall',
'description': 'Triggers from lands entering',
'popularity_bucket': 'common',
'popularity_hint': 'Very popular',
'deck_archetype': 'Lands'
}
]
}
import json
catalog_json.write_text(json.dumps(catalog_data), encoding='utf-8')
write_theme(temp_catalog_dir, 'landfall.yml', {
'display_name': 'Landfall',
'synergies': ['Ramp'],
'example_commanders': ['Tatyova, Benthic Druid']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.augment_from_catalog()
assert pipeline.stats.augmented == 1
theme = list(pipeline.themes.values())[0]
assert theme.modified
assert theme.data['description'] == 'Triggers from lands entering'
assert theme.data['popularity_bucket'] == 'common'
assert theme.data['popularity_hint'] == 'Very popular'
assert theme.data['deck_archetype'] == 'Lands'
def test_validate_min_examples_warning(self, temp_root: Path, temp_catalog_dir: Path):
"""Test validation warns about insufficient examples."""
write_theme(temp_catalog_dir, 'ramp.yml', {
'display_name': 'Ramp',
'synergies': [],
'example_commanders': ['Ramp Commander']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
pipeline.load_all_themes()
pipeline.validate(enforce_min=False)
assert pipeline.stats.lint_warnings > 0
assert pipeline.stats.lint_errors == 0
def test_validate_min_examples_error(self, temp_root: Path, temp_catalog_dir: Path):
"""Test validation errors on insufficient examples when enforced."""
write_theme(temp_catalog_dir, 'ramp.yml', {
'display_name': 'Ramp',
'synergies': [],
'example_commanders': ['Ramp Commander']
})
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
pipeline.load_all_themes()
pipeline.validate(enforce_min=True)
assert pipeline.stats.lint_errors > 0
def test_write_themes_dry_run(self, temp_root: Path, temp_catalog_dir: Path):
"""Test dry run doesn't write files."""
theme_path = write_theme(temp_catalog_dir, 'tokens.yml', {
'display_name': 'Tokens',
'synergies': [],
'example_commanders': []
})
original_content = theme_path.read_text(encoding='utf-8')
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.autofill_placeholders()
# Don't call write_all_themes()
# File should be unchanged
assert theme_path.read_text(encoding='utf-8') == original_content
def test_write_themes_saves_changes(self, temp_root: Path, temp_catalog_dir: Path):
"""Test write_all_themes saves modified files."""
theme_path = write_theme(temp_catalog_dir, 'tokens.yml', {
'display_name': 'Tokens',
'synergies': ['Sacrifice'],
'example_commanders': []
})
pipeline = ThemeEnrichmentPipeline(root=temp_root)
pipeline.load_all_themes()
pipeline.autofill_placeholders()
pipeline.write_all_themes()
# File should be updated
updated_data = read_theme(theme_path)
assert len(updated_data['example_commanders']) > 0
assert 'Tokens Anchor' in updated_data['example_commanders']
def test_run_all_full_pipeline(self, temp_root: Path, temp_catalog_dir: Path):
"""Test running the complete enrichment pipeline."""
write_theme(temp_catalog_dir, 'landfall.yml', {
'display_name': 'Landfall',
'synergies': ['Ramp', 'Lands'],
'example_commanders': []
})
write_theme(temp_catalog_dir, 'reanimate.yml', {
'display_name': 'Reanimate',
'synergies': ['Graveyard'],
'example_commanders': []
})
pipeline = ThemeEnrichmentPipeline(root=temp_root, min_examples=5)
stats = pipeline.run_all(write=True, enforce_min=False, strict_lint=False)
assert stats.total_themes == 2
assert stats.autofilled >= 2
assert stats.padded >= 2
# Verify files were updated
landfall_data = read_theme(temp_catalog_dir / 'landfall.yml')
assert len(landfall_data['example_commanders']) >= 5
assert landfall_data.get('editorial_quality') == 'draft'
def test_run_enrichment_pipeline_convenience_function(temp_root: Path, temp_catalog_dir: Path):
"""Test the convenience function wrapper."""
write_theme(temp_catalog_dir, 'tokens.yml', {
'display_name': 'Tokens',
'synergies': ['Sacrifice'],
'example_commanders': []
})
stats = run_enrichment_pipeline(
root=temp_root,
min_examples=3,
write=True,
enforce_min=False,
strict=False,
progress_callback=None,
)
assert isinstance(stats, EnrichmentStats)
assert stats.total_themes == 1
assert stats.autofilled >= 1
# Verify file was written
tokens_data = read_theme(temp_catalog_dir / 'tokens.yml')
assert len(tokens_data['example_commanders']) >= 3
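These tests also document the pipeline's public surface. A sketch of driving the stages by hand, mirroring what run_all() appears to do internally (stage order inferred from the tests above; run_all() and run_enrichment_pipeline() remain the supported entry points):

    from pathlib import Path
    from code.tagging.theme_enrichment import ThemeEnrichmentPipeline

    pipeline = ThemeEnrichmentPipeline(root=Path('.'), min_examples=5)
    pipeline.load_all_themes()            # read every YAML under config/themes/catalog
    pipeline.autofill_placeholders()      # seed empty example lists with '<Theme> Anchor' entries
    pipeline.pad_examples()               # pad placeholder-only lists up to min_examples
    pipeline.cleanup_placeholders()       # drop bare ' Anchor' entries once real examples exist
    pipeline.purge_anchors()              # remove remaining 'Anchor' / 'Anchor B' style entries
    pipeline.augment_from_catalog()       # pull description/popularity fields from theme_list.json
    pipeline.validate(enforce_min=False)  # warnings only; enforce_min=True upgrades to errors
    pipeline.write_all_themes()           # persist only themes flagged as modified
    print(pipeline.stats.total_themes, pipeline.stats.autofilled)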

View file

@ -0,0 +1,214 @@
"""Tests for web tag search endpoints."""
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client():
"""Create a test client for the web app."""
# Import here to avoid circular imports
from code.web.app import app
return TestClient(app)
def test_theme_autocomplete_basic(client):
"""Test basic theme autocomplete functionality."""
response = client.get("/commanders/theme-autocomplete?theme=life&limit=5")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
content = response.text
assert "autocomplete-item" in content
assert "Life" in content # Should match tags starting with "life"
assert "tag-count" in content # Should show card counts
def test_theme_autocomplete_min_length(client):
"""Test that theme autocomplete requires minimum 2 characters."""
response = client.get("/commanders/theme-autocomplete?theme=a&limit=5")
# Should fail validation
assert response.status_code == 422
def test_theme_autocomplete_no_matches(client):
"""Test theme autocomplete with query that has no matches."""
response = client.get("/commanders/theme-autocomplete?theme=zzzzzzzzz&limit=5")
assert response.status_code == 200
content = response.text
assert "autocomplete-empty" in content or "No matching themes" in content
def test_theme_autocomplete_limit(client):
"""Test that theme autocomplete respects limit parameter."""
response = client.get("/commanders/theme-autocomplete?theme=a&limit=3")
assert response.status_code in [200, 422] # May fail min_length validation
# Try with valid length
response = client.get("/commanders/theme-autocomplete?theme=to&limit=3")
assert response.status_code == 200
# Count items (rough check - should have at most 3)
content = response.text
item_count = content.count('class="autocomplete-item"')
assert item_count <= 3
def test_api_cards_by_tags_and_logic(client):
"""Test card search with AND logic."""
response = client.get("/api/cards/by-tags?tags=tokens&logic=AND&limit=10")
assert response.status_code == 200
data = response.json()
assert "tags" in data
assert "logic" in data
assert data["logic"] == "AND"
assert "total_matches" in data
assert "cards" in data
assert isinstance(data["cards"], list)
def test_api_cards_by_tags_or_logic(client):
"""Test card search with OR logic."""
response = client.get("/api/cards/by-tags?tags=tokens,sacrifice&logic=OR&limit=10")
assert response.status_code == 200
data = response.json()
assert data["logic"] == "OR"
assert "cards" in data
def test_api_cards_by_tags_invalid_logic(client):
"""Test that invalid logic parameter returns error."""
response = client.get("/api/cards/by-tags?tags=tokens&logic=INVALID&limit=10")
assert response.status_code == 400
data = response.json()
assert "error" in data
def test_api_cards_by_tags_empty_tags(client):
"""Test that empty tags parameter returns error."""
response = client.get("/api/cards/by-tags?tags=&logic=AND&limit=10")
assert response.status_code == 400
data = response.json()
assert "error" in data
def test_api_tags_search(client):
"""Test tag search autocomplete endpoint."""
response = client.get("/api/cards/tags/search?q=life&limit=10")
assert response.status_code == 200
data = response.json()
assert "query" in data
assert data["query"] == "life"
assert "matches" in data
assert isinstance(data["matches"], list)
# Check match structure
if data["matches"]:
match = data["matches"][0]
assert "tag" in match
assert "card_count" in match
assert match["tag"].lower().startswith("life")
def test_api_tags_search_min_length(client):
"""Test that tag search requires minimum 2 characters."""
response = client.get("/api/cards/tags/search?q=a&limit=10")
# Should fail validation
assert response.status_code == 422
def test_api_tags_popular(client):
"""Test popular tags endpoint."""
response = client.get("/api/cards/tags/popular?limit=20")
assert response.status_code == 200
data = response.json()
assert "count" in data
assert "tags" in data
assert isinstance(data["tags"], list)
assert data["count"] == len(data["tags"])
assert data["count"] <= 20
# Check tag structure
if data["tags"]:
tag = data["tags"][0]
assert "tag" in tag
assert "card_count" in tag
assert isinstance(tag["card_count"], int)
# Tags should be sorted by card count (descending)
if len(data["tags"]) > 1:
assert data["tags"][0]["card_count"] >= data["tags"][1]["card_count"]
def test_api_tags_popular_limit(client):
"""Test that popular tags endpoint respects limit."""
response = client.get("/api/cards/tags/popular?limit=5")
assert response.status_code == 200
data = response.json()
assert len(data["tags"]) <= 5
def test_commanders_page_loads(client):
"""Test that commanders page loads successfully."""
response = client.get("/commanders")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
content = response.text
# Should have the theme filter input
assert "commander-theme" in content
assert "theme-suggestions" in content
def test_commanders_page_with_theme_filter(client):
"""Test commanders page with theme query parameter."""
response = client.get("/commanders?theme=tokens")
assert response.status_code == 200
content = response.text
# Should have the theme value in the input
assert 'value="tokens"' in content or "tokens" in content
@pytest.mark.skip(reason="Performance test - run manually")
def test_theme_autocomplete_performance(client):
"""Test that theme autocomplete responds quickly."""
import time
start = time.time()
response = client.get("/commanders/theme-autocomplete?theme=to&limit=20")
elapsed = time.time() - start
assert response.status_code == 200
assert elapsed < 0.05 # Should respond in <50ms
@pytest.mark.skip(reason="Performance test - run manually")
def test_api_tags_search_performance(client):
"""Test that tag search responds quickly."""
import time
start = time.time()
response = client.get("/api/cards/tags/search?q=to&limit=20")
elapsed = time.time() - start
assert response.status_code == 200
assert elapsed < 0.05 # Should respond in <50ms
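The same endpoints can be exercised outside pytest. A quick interactive sketch using the app's TestClient, with response shapes as asserted above:

    from fastapi.testclient import TestClient
    from code.web.app import app

    client = TestClient(app)

    # AND search: cards carrying every listed tag.
    data = client.get("/api/cards/by-tags?tags=tokens,sacrifice&logic=AND&limit=5").json()
    print(data["total_matches"], [c.get("name") for c in data["cards"]])

    # Prefix autocomplete: tags sorted by popularity.
    for m in client.get("/api/cards/tags/search?q=life&limit=5").json()["matches"]:
        print(m["tag"], m["card_count"])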

View file

@ -2205,6 +2205,7 @@ from .routes import themes as themes_routes # noqa: E402
from .routes import commanders as commanders_routes # noqa: E402
from .routes import partner_suggestions as partner_suggestions_routes # noqa: E402
from .routes import telemetry as telemetry_routes # noqa: E402
from .routes import cards as cards_routes # noqa: E402
app.include_router(build_routes.router)
app.include_router(config_routes.router)
app.include_router(decks_routes.router)
@ -2214,6 +2215,7 @@ app.include_router(themes_routes.router)
app.include_router(commanders_routes.router)
app.include_router(partner_suggestions_routes.router)
app.include_router(telemetry_routes.router)
app.include_router(cards_routes.router)
# Warm validation cache early to reduce first-call latency in tests and dev
try:

186
code/web/routes/cards.py Normal file
View file

@ -0,0 +1,186 @@
"""Card browsing and tag search API endpoints."""
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, Query
from fastapi.responses import JSONResponse
# Import tag index from M3
try:
from code.tagging.tag_index import get_tag_index
except ImportError:
from tagging.tag_index import get_tag_index
# Import all cards loader
try:
from code.services.all_cards_loader import AllCardsLoader
except ImportError:
from services.all_cards_loader import AllCardsLoader
router = APIRouter(prefix="/api/cards", tags=["cards"])
# Cache for all_cards loader
_all_cards_loader: Optional[AllCardsLoader] = None
def _get_all_cards_loader() -> AllCardsLoader:
"""Get cached AllCardsLoader instance."""
global _all_cards_loader
if _all_cards_loader is None:
_all_cards_loader = AllCardsLoader()
return _all_cards_loader
@router.get("/by-tags")
async def search_by_tags(
tags: str = Query(..., description="Comma-separated list of theme tags"),
logic: str = Query("AND", description="Search logic: AND (intersection) or OR (union)"),
limit: int = Query(100, ge=1, le=1000, description="Maximum number of results"),
) -> JSONResponse:
"""Search for cards by theme tags.
Examples:
/api/cards/by-tags?tags=tokens&logic=AND
/api/cards/by-tags?tags=tokens,sacrifice&logic=AND
/api/cards/by-tags?tags=lifegain,lifelink&logic=OR
Args:
tags: Comma-separated theme tags to search for
logic: "AND" for cards with all tags, "OR" for cards with any tag
limit: Maximum results to return
Returns:
JSON with matching cards and metadata
"""
try:
# Parse tags
tag_list = [t.strip() for t in tags.split(",") if t.strip()]
if not tag_list:
return JSONResponse(
status_code=400,
content={"error": "No valid tags provided"}
)
# Get tag index and find matching cards
tag_index = get_tag_index()
if logic.upper() == "AND":
card_names = tag_index.get_cards_with_all_tags(tag_list)
elif logic.upper() == "OR":
card_names = tag_index.get_cards_with_any_tags(tag_list)
else:
return JSONResponse(
status_code=400,
content={"error": f"Invalid logic: {logic}. Use AND or OR."}
)
# Load full card data
all_cards = _get_all_cards_loader().load()
matching_cards = all_cards[all_cards["name"].isin(card_names)]
# Limit results
matching_cards = matching_cards.head(limit)
# Convert to dict
results = matching_cards.to_dict("records")
return JSONResponse(content={
"tags": tag_list,
"logic": logic.upper(),
"total_matches": len(card_names),
"returned": len(results),
"limit": limit,
"cards": results
})
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Search failed: {str(e)}"}
)
@router.get("/tags/search")
async def search_tags(
q: str = Query(..., min_length=2, description="Tag prefix to search for"),
limit: int = Query(10, ge=1, le=50, description="Maximum number of suggestions"),
) -> JSONResponse:
"""Autocomplete search for theme tags.
Examples:
/api/cards/tags/search?q=life
/api/cards/tags/search?q=token&limit=5
Args:
q: Tag prefix (minimum 2 characters)
limit: Maximum suggestions to return
Returns:
JSON with matching tags sorted by popularity
"""
try:
tag_index = get_tag_index()
# Get all tags with counts - get_popular_tags returns all tags when given a high limit
all_tags_with_counts = tag_index.get_popular_tags(limit=10000)
# Filter by prefix (case-insensitive)
prefix_lower = q.lower()
matches = [
(tag, count)
for tag, count in all_tags_with_counts
if tag.lower().startswith(prefix_lower)
]
# Already sorted by popularity from get_popular_tags
# Limit results
matches = matches[:limit]
return JSONResponse(content={
"query": q,
"matches": [
{"tag": tag, "card_count": count}
for tag, count in matches
]
})
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Tag search failed: {str(e)}"}
)
@router.get("/tags/popular")
async def get_popular_tags(
limit: int = Query(50, ge=1, le=200, description="Number of popular tags to return"),
) -> JSONResponse:
"""Get the most popular theme tags by card count.
Examples:
/api/cards/tags/popular
/api/cards/tags/popular?limit=20
Args:
limit: Maximum tags to return
Returns:
JSON with popular tags sorted by card count
"""
try:
tag_index = get_tag_index()
popular = tag_index.get_popular_tags(limit=limit)
return JSONResponse(content={
"count": len(popular),
"tags": [
{"tag": tag, "card_count": count}
for tag, count in popular
]
})
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Failed to get popular tags: {str(e)}"}
)
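For callers inside the codebase, the HTTP layer is optional; the route handlers above are thin wrappers over the tag index. A sketch of querying it directly (tag casing here is illustrative; the method names are the ones used by the routes):

    from code.tagging.tag_index import get_tag_index

    idx = get_tag_index()
    both = idx.get_cards_with_all_tags(["Tokens", "Sacrifice"])     # AND semantics
    either = idx.get_cards_with_any_tags(["Lifegain", "Lifelink"])  # OR semantics
    top = idx.get_popular_tags(limit=10)                            # [(tag, card_count), ...] by count
    print(len(both), len(either), top[:3])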

View file

@ -526,6 +526,52 @@ def _build_theme_info(records: Sequence[CommanderRecord]) -> dict[str, Commander
return info
@router.get("/theme-autocomplete", response_class=HTMLResponse)
async def theme_autocomplete(
request: Request,
theme: str = Query(..., min_length=2, description="Theme prefix to search for"),
limit: int = Query(20, ge=1, le=50),
) -> HTMLResponse:
"""HTMX endpoint for theme tag autocomplete."""
try:
# Import tag_index
try:
from code.tagging.tag_index import get_tag_index
except ImportError:
from tagging.tag_index import get_tag_index
tag_index = get_tag_index()
# Get all tags with counts - get_popular_tags returns all tags when given a high limit
all_tags_with_counts = tag_index.get_popular_tags(limit=10000)
# Filter by prefix (case-insensitive)
prefix_lower = theme.lower()
matches = [
(tag, count)
for tag, count in all_tags_with_counts
if tag.lower().startswith(prefix_lower)
]
# Already sorted by popularity from get_popular_tags
matches = matches[:limit]
# Generate HTML suggestions with ARIA attributes; escape tag text so
# quotes or ampersands in tag names cannot break the markup
from html import escape
html_parts = []
for tag, count in matches:
html_parts.append(
f'<div class="autocomplete-item" data-value="{escape(tag)}" role="option">'
f'{escape(tag)} <span class="tag-count">({count})</span></div>'
)
html = "\n".join(html_parts) if html_parts else '<div class="autocomplete-empty">No matching themes</div>'
return HTMLResponse(content=html)
except Exception as e:
from html import escape
return HTMLResponse(content=f'<div class="autocomplete-error">Error: {escape(str(e))}</div>')
@router.get("/", response_class=HTMLResponse)
async def commanders_index(
request: Request,

View file

@ -153,40 +153,44 @@ def _display_tags_from_entry(entry: Dict[str, Any]) -> List[str]:
def _run_theme_metadata_enrichment(out_func=None) -> None:
"""Run full metadata enrichment sequence after theme catalog/YAML generation.
Idempotent: each script is safe to re-run; errors are swallowed (logged) to avoid
Uses consolidated ThemeEnrichmentPipeline for 5-10x faster processing.
Idempotent: safe to re-run; errors are swallowed (logged) to avoid
impacting primary setup/tagging pipeline. Designed to centralize logic so both
manual refresh (routes/themes.py) and automatic setup flows invoke identical steps.
"""
try:
import os
import sys
import subprocess
root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
scripts_dir = os.path.join(root, 'code', 'scripts')
py = sys.executable
steps: List[List[str]] = [
[py, os.path.join(scripts_dir, 'autofill_min_examples.py')],
[py, os.path.join(scripts_dir, 'pad_min_examples.py'), '--min', os.environ.get('EDITORIAL_MIN_EXAMPLES', '5')],
[py, os.path.join(scripts_dir, 'cleanup_placeholder_examples.py'), '--apply'],
[py, os.path.join(scripts_dir, 'purge_anchor_placeholders.py'), '--apply'],
# Augment YAML with description / popularity buckets from the freshly built catalog
[py, os.path.join(scripts_dir, 'augment_theme_yaml_from_catalog.py')],
[py, os.path.join(scripts_dir, 'generate_theme_editorial_suggestions.py'), '--apply', '--limit-yaml', '0'],
[py, os.path.join(scripts_dir, 'lint_theme_editorial.py')], # non-strict lint pass
]
from pathlib import Path
from code.tagging.theme_enrichment import run_enrichment_pipeline
root = Path(__file__).resolve().parents[3]
min_examples = int(os.environ.get('EDITORIAL_MIN_EXAMPLES', '5'))
def _emit(msg: str):
try:
if out_func:
out_func(msg)
except Exception:
pass
for cmd in steps:
try:
subprocess.run(cmd, check=True)
except Exception as e:
_emit(f"[metadata_enrich] step failed ({os.path.basename(cmd[1]) if len(cmd)>1 else cmd}): {e}")
continue
except Exception:
pass
# Run consolidated pipeline instead of 7 separate subprocess scripts
stats = run_enrichment_pipeline(
root=root,
min_examples=min_examples,
write=True,
enforce_min=False, # Non-strict lint pass
strict=False,
progress_callback=_emit,
)
_emit(f"Theme enrichment complete: {stats.total_themes} themes processed")
except Exception as e:
if out_func:
try:
out_func(f"[metadata_enrich] pipeline failed: {e}")
except Exception:
pass
return
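A sketch of invoking the hook with a progress callback, as both the setup flow and the manual refresh path do (the logging wrapper is illustrative; out_func just needs to accept a string):

    import logging

    log = logging.getLogger("setup")

    def _out(msg: str) -> None:
        log.info("%s", msg)

    # Any exception inside is swallowed after being reported via out_func.
    _run_theme_metadata_enrichment(out_func=_out)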
@ -1144,6 +1148,13 @@ def _ensure_setup_ready(out, force: bool = False) -> None:
# Run metadata enrichment (best-effort) after export sequence.
try:
_run_theme_metadata_enrichment(out_func)
# Rebuild theme_list.json to pick up newly generated example_cards/commanders
# from the enrichment pipeline (which populates them from CSV data)
if use_merge and os.path.exists(build_script):
args = [_sys.executable, build_script]
if force:
args.append('--force')
_run(args, check=True)
except Exception:
pass
try:

View file

@ -23,15 +23,23 @@
<span class="filter-label">Commander name</span>
<input type="search" id="commander-search" name="q" value="{{ query }}" placeholder="Search commander names..." autocomplete="off" />
</label>
<label>
<span class="filter-label">Theme</span>
<input type="search" id="commander-theme" name="theme" value="{{ theme_query }}" placeholder="Search themes..." list="theme-suggestions" autocomplete="off" />
</label>
<datalist id="theme-suggestions">
{% for name in theme_options[:200] %}
<option value="{{ name }}"></option>
{% endfor %}
</datalist>
<div class="filter-field">
<label for="commander-theme" class="filter-label">Theme:</label>
<div class="autocomplete-container">
<input type="search" id="commander-theme" name="theme" value="{{ theme_query }}"
placeholder="Search themes..." autocomplete="off"
role="combobox"
aria-autocomplete="list"
aria-controls="theme-suggestions"
aria-expanded="false"
hx-get="/commanders/theme-autocomplete"
hx-trigger="keyup changed delay:300ms"
hx-target="#theme-suggestions"
hx-include="[name='theme']"
hx-swap="innerHTML" />
<div id="theme-suggestions" class="autocomplete-dropdown" role="listbox" aria-label="Theme suggestions"></div>
</div>
</div>
<label>
<span class="filter-label">Color identity</span>
<select id="commander-color" name="color">
@ -185,6 +193,18 @@
.commander-thumb img { width:100%; }
.skeleton-thumb { width:min(70vw, 220px); height:calc(min(70vw, 220px) * 1.4); }
}
/* Autocomplete dropdown styles */
.autocomplete-container { position:relative; width:100%; }
.autocomplete-dropdown { position:absolute; top:100%; left:0; right:0; z-index:1000; background:var(--panel); border:1px solid var(--border); border-radius:8px; margin-top:4px; max-height:280px; overflow-y:auto; box-shadow:0 4px 12px rgba(0,0,0,.25); display:none; }
.autocomplete-dropdown:not(:empty) { display:block; }
.autocomplete-item { padding:.5rem .75rem; cursor:pointer; border-bottom:1px solid var(--border); transition:background .15s ease; }
.autocomplete-item:last-child { border-bottom:none; }
.autocomplete-item:hover, .autocomplete-item:focus, .autocomplete-item.selected { background:rgba(148,163,184,.15); }
.autocomplete-item.selected { background:rgba(148,163,184,.25); border-left:3px solid var(--ring); padding-left:calc(.75rem - 3px); }
.autocomplete-item .tag-count { color:var(--muted); font-size:.85rem; float:right; }
.autocomplete-empty { padding:.75rem; text-align:center; color:var(--muted); font-size:.85rem; }
.autocomplete-error { padding:.75rem; text-align:center; color:#f87171; font-size:.85rem; }
</style>
<script>
(function(){
@ -215,6 +235,107 @@
resetPage();
setLastTrigger('theme');
});
// Autocomplete dropdown handling
const autocompleteDropdown = document.getElementById('theme-suggestions');
if (autocompleteDropdown) {
let selectedIndex = -1;
// Helper to get all autocomplete items
const getItems = () => Array.from(autocompleteDropdown.querySelectorAll('.autocomplete-item'));
// Helper to select an item by index
const selectItem = (index) => {
const items = getItems();
items.forEach((item, i) => {
if (i === index) {
item.classList.add('selected');
item.scrollIntoView({ block: 'nearest', behavior: 'smooth' });
} else {
item.classList.remove('selected');
}
});
selectedIndex = index;
};
// Helper to apply selected item
const applySelectedItem = () => {
const items = getItems();
const item = items[selectedIndex];
if (item && item.dataset.value) {
themeField.value = item.dataset.value;
autocompleteDropdown.innerHTML = '';
selectedIndex = -1;
themeField.dispatchEvent(new Event('input', { bubbles: true }));
form.dispatchEvent(new Event('submit', { bubbles: true }));
}
};
// Reset selection when dropdown content changes
const observer = new MutationObserver(() => {
selectedIndex = -1;
getItems().forEach(item => item.classList.remove('selected'));
// Update aria-expanded based on dropdown content
const hasContent = autocompleteDropdown.children.length > 0;
themeField.setAttribute('aria-expanded', hasContent ? 'true' : 'false');
});
observer.observe(autocompleteDropdown, { childList: true });
// Click handler for autocomplete items
document.body.addEventListener('click', (e) => {
const item = e.target.closest('.autocomplete-item');
if (item && item.dataset.value) {
themeField.value = item.dataset.value;
autocompleteDropdown.innerHTML = '';
selectedIndex = -1;
themeField.dispatchEvent(new Event('input', { bubbles: true }));
form.dispatchEvent(new Event('submit', { bubbles: true }));
}
});
// Close dropdown when clicking outside
document.addEventListener('click', (e) => {
if (!e.target.closest('.autocomplete-container')) {
autocompleteDropdown.innerHTML = '';
selectedIndex = -1;
}
});
// Keyboard navigation
themeField.addEventListener('keydown', (e) => {
const items = getItems();
const hasItems = items.length > 0;
if (e.key === 'Escape') {
autocompleteDropdown.innerHTML = '';
selectedIndex = -1;
e.preventDefault();
} else if (e.key === 'ArrowDown' && hasItems) {
e.preventDefault();
const newIndex = selectedIndex < items.length - 1 ? selectedIndex + 1 : 0;
selectItem(newIndex);
} else if (e.key === 'ArrowUp' && hasItems) {
e.preventDefault();
const newIndex = selectedIndex > 0 ? selectedIndex - 1 : items.length - 1;
selectItem(newIndex);
} else if (e.key === 'Enter' && selectedIndex >= 0 && hasItems) {
e.preventDefault();
applySelectedItem();
}
});
// Mouse hover to highlight items
autocompleteDropdown.addEventListener('mouseover', (e) => {
const item = e.target.closest('.autocomplete-item');
if (item) {
const items = getItems();
const index = items.indexOf(item);
if (index >= 0) {
selectItem(index);
}
}
});
}
}
form.addEventListener('submit', () => {
if (!form.dataset.lastTrigger) {

File diff suppressed because it is too large