refactor: modular route organization (Phase 1-2 complete)

- Split monolithic build route handler into focused modules
- Extract validation, multi-copy, include/exclude, themes, and partner routes
- Add response utilities and telemetry decorators
- Create route pattern documentation
- Fix multi-copy detection bug (tag key mismatch)
- Improve code maintainability and testability

Roadmap 9 M1 Phase 1-2
This commit is contained in:
matt 2026-03-03 21:49:08 -08:00
parent 97da117ccb
commit e81b47bccf
20 changed files with 2852 additions and 1552 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,216 @@
"""
Include/Exclude card list management routes.
Handles user-defined include (must-have) and exclude (forbidden) card lists
for deck building, including the card toggle endpoint and summary rendering.
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Request, Form
from fastapi.responses import HTMLResponse, JSONResponse
from ..app import ALLOW_MUST_HAVES, templates
from ..services.build_utils import step5_base_ctx
from ..services.tasks import get_session, new_sid
from ..services.telemetry import log_include_exclude_toggle
from .build import _merge_hx_trigger
router = APIRouter()
def _must_have_state(sess: dict) -> tuple[dict[str, Any], list[str], list[str]]:
"""
Extract include/exclude card lists and enforcement settings from session.
Args:
sess: Session dictionary containing user state
Returns:
Tuple of (state_dict, includes_list, excludes_list) where:
- state_dict contains enforcement mode, fuzzy matching, and list contents
- includes_list contains card names to include
- excludes_list contains card names to exclude
"""
includes = list(sess.get("include_cards") or [])
excludes = list(sess.get("exclude_cards") or [])
state = {
"includes": includes,
"excludes": excludes,
"enforcement_mode": (sess.get("enforcement_mode") or "warn"),
"allow_illegal": bool(sess.get("allow_illegal")),
"fuzzy_matching": bool(sess.get("fuzzy_matching", True)),
}
return state, includes, excludes
def _render_include_exclude_summary(
request: Request,
sess: dict,
sid: str,
*,
state: dict[str, Any] | None = None,
includes: list[str] | None = None,
excludes: list[str] | None = None,
) -> HTMLResponse:
"""
Render the include/exclude summary template.
Args:
request: FastAPI request object
sess: Session dictionary
sid: Session ID for cookie
state: Optional pre-computed state dict
includes: Optional pre-computed includes list
excludes: Optional pre-computed excludes list
Returns:
HTMLResponse with rendered include/exclude summary
"""
ctx = step5_base_ctx(request, sess, include_name=False, include_locks=False)
if state is None or includes is None or excludes is None:
state, includes, excludes = _must_have_state(sess)
ctx["must_have_state"] = state
ctx["summary"] = sess.get("step5_summary") if sess.get("step5_summary_ready") else None
ctx["include_cards"] = includes
ctx["exclude_cards"] = excludes
response = templates.TemplateResponse("partials/include_exclude_summary.html", ctx)
response.set_cookie("sid", sid, httponly=True, samesite="lax")
return response
@router.post("/must-haves/toggle", response_class=HTMLResponse)
async def toggle_must_haves(
request: Request,
card_name: str = Form(...),
list_type: str = Form(...),
enabled: str = Form("1"),
):
"""
Toggle a card's inclusion in the include or exclude list.
This endpoint handles:
- Adding/removing cards from include (must-have) lists
- Adding/removing cards from exclude (forbidden) lists
- Mutual exclusivity (card can't be in both lists)
- List size limits (10 includes, 15 excludes)
- Case-insensitive duplicate detection
Args:
request: FastAPI request object
card_name: Name of the card to toggle
list_type: Either "include" or "exclude"
enabled: "1"/"true"/"yes"/"on" to add, anything else to remove
Returns:
HTMLResponse with updated include/exclude summary, or
JSONResponse with error if validation fails
HX-Trigger Events:
must-haves:toggle: Payload with card, list, enabled status, and counts
"""
if not ALLOW_MUST_HAVES:
return JSONResponse({"error": "Must-have lists are disabled"}, status_code=403)
name = str(card_name or "").strip()
if not name:
return JSONResponse({"error": "Card name is required"}, status_code=400)
list_key = str(list_type or "").strip().lower()
if list_key not in {"include", "exclude"}:
return JSONResponse({"error": "Unsupported toggle type"}, status_code=400)
enabled_flag = str(enabled).strip().lower() in {"1", "true", "yes", "on"}
sid = request.cookies.get("sid") or request.headers.get("X-Session-ID")
if not sid:
sid = new_sid()
sess = get_session(sid)
includes = list(sess.get("include_cards") or [])
excludes = list(sess.get("exclude_cards") or [])
include_lookup = {str(v).strip().lower(): str(v) for v in includes if str(v).strip()}
exclude_lookup = {str(v).strip().lower(): str(v) for v in excludes if str(v).strip()}
key = name.lower()
display_name = include_lookup.get(key) or exclude_lookup.get(key) or name
changed = False
include_limit = 10
exclude_limit = 15
def _remove_casefold(items: list[str], item_key: str) -> list[str]:
"""Remove items matching the given key (case-insensitive)."""
return [c for c in items if str(c).strip().lower() != item_key]
if list_key == "include":
if enabled_flag:
if key not in include_lookup:
if len(include_lookup) >= include_limit:
return JSONResponse({"error": f"Include limit reached ({include_limit})."}, status_code=400)
includes.append(name)
include_lookup[key] = name
changed = True
if key in exclude_lookup:
excludes = _remove_casefold(excludes, key)
exclude_lookup.pop(key, None)
changed = True
else:
if key in include_lookup:
includes = _remove_casefold(includes, key)
include_lookup.pop(key, None)
changed = True
else: # exclude
if enabled_flag:
if key not in exclude_lookup:
if len(exclude_lookup) >= exclude_limit:
return JSONResponse({"error": f"Exclude limit reached ({exclude_limit})."}, status_code=400)
excludes.append(name)
exclude_lookup[key] = name
changed = True
if key in include_lookup:
includes = _remove_casefold(includes, key)
include_lookup.pop(key, None)
changed = True
else:
if key in exclude_lookup:
excludes = _remove_casefold(excludes, key)
exclude_lookup.pop(key, None)
changed = True
if changed:
sess["include_cards"] = includes
sess["exclude_cards"] = excludes
if "include_exclude_diagnostics" in sess:
try:
del sess["include_exclude_diagnostics"]
except Exception:
pass
response = _render_include_exclude_summary(request, sess, sid)
try:
log_include_exclude_toggle(
request,
card_name=display_name,
action=list_key,
enabled=enabled_flag,
include_count=len(includes),
exclude_count=len(excludes),
)
except Exception:
pass
trigger_payload = {
"card": display_name,
"list": list_key,
"enabled": enabled_flag,
"include_count": len(includes),
"exclude_count": len(excludes),
}
try:
_merge_hx_trigger(response, {"must-haves:toggle": trigger_payload})
except Exception:
pass
return response

View file

@ -0,0 +1,349 @@
"""Multi-copy archetype routes for deck building.
Handles multi-copy package detection, selection, and integration with the deck builder.
Multi-copy archetypes allow multiple copies of specific cards (e.g., Hare Apparent, Dragon's Approach).
Routes:
GET /multicopy/check - Check if commander/tags suggest multi-copy archetype
POST /multicopy/save - Save or skip multi-copy selection
GET /new/multicopy - Get multi-copy suggestions for New Deck modal (inline)
Created: 2026-02-20
Roadmap: R9 M1 Phase 2
"""
from __future__ import annotations
from fastapi import APIRouter, Request, Form, Query
from fastapi.responses import HTMLResponse
from html import escape as _esc
from deck_builder.builder import DeckBuilder
from deck_builder import builder_utils as bu, builder_constants as bc
from ..app import templates
from ..services.tasks import get_session, new_sid
from ..services import orchestrator as orch
from ..services.build_utils import owned_names as owned_names_helper
router = APIRouter()
def _rebuild_ctx_with_multicopy(sess: dict) -> None:
"""Rebuild the staged context so Multi-Copy runs first, avoiding overfill.
This ensures the added cards are accounted for before lands and later phases,
which keeps totals near targets and shows the multi-copy additions ahead of basics.
Args:
sess: Session dictionary containing build state
"""
try:
if not sess or not sess.get("commander"):
return
# Build fresh ctx with the same options, threading multi_copy explicitly
opts = orch.bracket_options()
default_bracket = (opts[0]["level"] if opts else 1)
bracket_val = sess.get("bracket")
try:
safe_bracket = int(bracket_val) if bracket_val is not None else default_bracket
except Exception:
safe_bracket = int(default_bracket)
ideals_val = sess.get("ideals") or orch.ideal_defaults()
use_owned = bool(sess.get("use_owned_only"))
prefer = bool(sess.get("prefer_owned"))
owned_names = owned_names_helper() if (use_owned or prefer) else None
locks = list(sess.get("locks", []))
sess["build_ctx"] = orch.start_build_ctx(
commander=sess.get("commander"),
tags=sess.get("tags", []),
bracket=safe_bracket,
ideals=ideals_val,
tag_mode=sess.get("tag_mode", "AND"),
use_owned_only=use_owned,
prefer_owned=prefer,
owned_names=owned_names,
locks=locks,
custom_export_base=sess.get("custom_export_base"),
multi_copy=sess.get("multi_copy"),
prefer_combos=bool(sess.get("prefer_combos")),
combo_target_count=int(sess.get("combo_target_count", 2)),
combo_balance=str(sess.get("combo_balance", "mix")),
swap_mdfc_basics=bool(sess.get("swap_mdfc_basics")),
)
except Exception:
# If rebuild fails (e.g., commander not found in test), fall back to injecting
# a minimal Multi-Copy stage on the existing builder so the UI can render additions.
try:
ctx = sess.get("build_ctx")
if not isinstance(ctx, dict):
return
b = ctx.get("builder")
if b is None:
return
# Thread selection onto the builder; runner will be resilient without full DFs
try:
setattr(b, "_web_multi_copy", sess.get("multi_copy") or None)
except Exception:
pass
# Ensure minimal structures exist
try:
if not isinstance(getattr(b, "card_library", None), dict):
b.card_library = {}
except Exception:
pass
try:
if not isinstance(getattr(b, "ideal_counts", None), dict):
b.ideal_counts = {}
except Exception:
pass
# Inject a single Multi-Copy stage
ctx["stages"] = [{"key": "multi_copy", "label": "Multi-Copy Package", "runner_name": "__add_multi_copy__"}]
ctx["idx"] = 0
ctx["last_visible_idx"] = 0
except Exception:
# Leave existing context untouched on unexpected failure
pass
@router.get("/multicopy/check", response_class=HTMLResponse)
async def multicopy_check(request: Request) -> HTMLResponse:
"""If current commander/tags suggest a multi-copy archetype, render a choose-one modal.
Returns empty content when not applicable to avoid flashing a modal unnecessarily.
Args:
request: FastAPI request object
Returns:
HTMLResponse with multi-copy modal or empty string
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
commander = str(sess.get("commander") or "").strip()
tags = list(sess.get("tags") or [])
if not commander:
return HTMLResponse("")
# Avoid re-prompting repeatedly for the same selection context
key = commander + "||" + ",".join(sorted([str(t).strip().lower() for t in tags if str(t).strip()]))
seen = set(sess.get("mc_seen_keys", []) or [])
if key in seen:
return HTMLResponse("")
# Build a light DeckBuilder seeded with commander + tags (no heavy data load required)
try:
tmp = DeckBuilder(output_func=lambda *_: None, input_func=lambda *_: "", headless=True)
df = tmp.load_commander_data()
row = df[df["name"].astype(str) == commander]
if row.empty:
return HTMLResponse("")
tmp._apply_commander_selection(row.iloc[0])
tmp.selected_tags = list(tags or [])
try:
tmp.primary_tag = tmp.selected_tags[0] if len(tmp.selected_tags) > 0 else None
tmp.secondary_tag = tmp.selected_tags[1] if len(tmp.selected_tags) > 1 else None
tmp.tertiary_tag = tmp.selected_tags[2] if len(tmp.selected_tags) > 2 else None
except Exception:
pass
# Establish color identity from the selected commander
try:
tmp.determine_color_identity()
except Exception:
pass
# Detect viable archetypes
results = bu.detect_viable_multi_copy_archetypes(tmp) or []
if not results:
# Remember this key to avoid re-checking until tags/commander change
try:
seen.add(key)
sess["mc_seen_keys"] = list(seen)
except Exception:
pass
return HTMLResponse("")
# Render modal template with top N (cap small for UX)
items = results[:5]
ctx = {
"request": request,
"items": items,
"commander": commander,
"tags": tags,
}
return templates.TemplateResponse("build/_multi_copy_modal.html", ctx)
except Exception:
return HTMLResponse("")
@router.post("/multicopy/save", response_class=HTMLResponse)
async def multicopy_save(
request: Request,
choice_id: str = Form(None),
count: int = Form(None),
thrumming: str | None = Form(None),
skip: str | None = Form(None),
) -> HTMLResponse:
"""Persist user selection (or skip) for multi-copy archetype in session and close modal.
Returns a tiny confirmation chip via OOB swap (optional) and removes the modal.
Args:
request: FastAPI request object
choice_id: Multi-copy archetype ID (e.g., 'hare_apparent')
count: Number of copies to include
thrumming: Whether to include Thrumming Stone
skip: Whether to skip multi-copy for this build
Returns:
HTMLResponse with confirmation chip (OOB swap)
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
commander = str(sess.get("commander") or "").strip()
tags = list(sess.get("tags") or [])
key = commander + "||" + ",".join(sorted([str(t).strip().lower() for t in tags if str(t).strip()]))
# Update seen set to avoid re-prompt next load
seen = set(sess.get("mc_seen_keys", []) or [])
seen.add(key)
sess["mc_seen_keys"] = list(seen)
# Handle skip explicitly
if skip and str(skip).strip() in ("1","true","on","yes"):
# Clear any prior choice for this run
try:
if sess.get("multi_copy"):
del sess["multi_copy"]
if sess.get("mc_applied_key"):
del sess["mc_applied_key"]
except Exception:
pass
# Return nothing (modal will be removed client-side)
# Also emit an OOB chip indicating skip
chip = (
'<div id="last-action" hx-swap-oob="true">'
'<span class="chip" title="Click to dismiss">Dismissed multi-copy suggestions</span>'
'</div>'
)
return HTMLResponse(chip)
# Persist selection when provided
payload = None
try:
meta = bc.MULTI_COPY_ARCHETYPES.get(str(choice_id), {})
name = meta.get("name") or str(choice_id)
printed_cap = meta.get("printed_cap")
# Coerce count with bounds: default -> rec_window[0], cap by printed_cap when present
if count is None:
count = int(meta.get("default_count", 25))
try:
count = int(count)
except Exception:
count = int(meta.get("default_count", 25))
if isinstance(printed_cap, int) and printed_cap > 0:
count = max(1, min(printed_cap, count))
payload = {
"id": str(choice_id),
"name": name,
"count": int(count),
"thrumming": True if (thrumming and str(thrumming).strip() in ("1","true","on","yes")) else False,
}
sess["multi_copy"] = payload
# Mark as not yet applied so the next build start/continue can account for it once
try:
if sess.get("mc_applied_key"):
del sess["mc_applied_key"]
except Exception:
pass
# If there's an active build context, rebuild it so Multi-Copy runs first
if sess.get("build_ctx"):
_rebuild_ctx_with_multicopy(sess)
except Exception:
payload = None
# Return OOB chip summarizing the selection
if payload:
chip = (
'<div id="last-action" hx-swap-oob="true">'
f'<span class="chip" title="Click to dismiss">Selected multi-copy: '
f"<strong>{_esc(payload.get('name',''))}</strong> x{int(payload.get('count',0))}"
f"{' + Thrumming Stone' if payload.get('thrumming') else ''}</span>"
'</div>'
)
else:
chip = (
'<div id="last-action" hx-swap-oob="true">'
'<span class="chip" title="Click to dismiss">Saved</span>'
'</div>'
)
return HTMLResponse(chip)
@router.get("/new/multicopy", response_class=HTMLResponse)
async def build_new_multicopy(
request: Request,
commander: str = Query(""),
primary_tag: str | None = Query(None),
secondary_tag: str | None = Query(None),
tertiary_tag: str | None = Query(None),
tag_mode: str | None = Query("AND"),
) -> HTMLResponse:
"""Return multi-copy suggestions for the New Deck modal based on commander + selected tags.
This does not mutate the session; it simply renders a form snippet that posts with the main modal.
Args:
request: FastAPI request object
commander: Commander name
primary_tag: Primary theme tag
secondary_tag: Secondary theme tag
tertiary_tag: Tertiary theme tag
tag_mode: Tag matching mode (AND/OR)
Returns:
HTMLResponse with multi-copy suggestions or empty string
"""
name = (commander or "").strip()
if not name:
return HTMLResponse("")
try:
tmp = DeckBuilder(output_func=lambda *_: None, input_func=lambda *_: "", headless=True)
df = tmp.load_commander_data()
row = df[df["name"].astype(str) == name]
if row.empty:
return HTMLResponse("")
tmp._apply_commander_selection(row.iloc[0])
tags = [t for t in [primary_tag, secondary_tag, tertiary_tag] if t]
tmp.selected_tags = list(tags or [])
try:
tmp.primary_tag = tmp.selected_tags[0] if len(tmp.selected_tags) > 0 else None
tmp.secondary_tag = tmp.selected_tags[1] if len(tmp.selected_tags) > 1 else None
tmp.tertiary_tag = tmp.selected_tags[2] if len(tmp.selected_tags) > 2 else None
except Exception:
pass
try:
tmp.determine_color_identity()
except Exception:
pass
results = bu.detect_viable_multi_copy_archetypes(tmp) or []
# For the New Deck modal, only show suggestions where the matched tags intersect
# the explicitly selected tags (ignore commander-default themes).
sel_tags = {str(t).strip().lower() for t in (tags or []) if str(t).strip()}
def _matched_reason_tags(item: dict) -> set[str]:
out = set()
try:
for r in item.get('reasons', []) or []:
if not isinstance(r, str):
continue
rl = r.strip().lower()
if rl.startswith('tags:'):
body = rl.split('tags:', 1)[1].strip()
parts = [p.strip() for p in body.split(',') if p.strip()]
out.update(parts)
except Exception:
return set()
return out
if sel_tags:
results = [it for it in results if (_matched_reason_tags(it) & sel_tags)]
else:
# If no selected tags, do not show any multi-copy suggestions in the modal
results = []
if not results:
return HTMLResponse("")
items = results[:5]
ctx = {"request": request, "items": items}
return templates.TemplateResponse("build/_new_deck_multicopy.html", ctx)
except Exception:
return HTMLResponse("")

View file

@ -0,0 +1,738 @@
"""
Partner mechanics routes and utilities for deck building.
Handles partner commanders, backgrounds, Doctor/Companion pairings,
and partner preview/validation functionality.
"""
from __future__ import annotations
from typing import Any, Iterable
from urllib.parse import quote_plus
from fastapi import APIRouter, Request, Form
from fastapi.responses import JSONResponse
from ..app import (
ENABLE_PARTNER_MECHANICS,
ENABLE_PARTNER_SUGGESTIONS,
)
from ..services.telemetry import log_partner_suggestion_selected
from ..services.partner_suggestions import get_partner_suggestions
from ..services.commander_catalog_loader import (
load_commander_catalog,
find_commander_record,
CommanderRecord,
normalized_restricted_labels,
shared_restricted_partner_label,
)
from deck_builder.background_loader import load_background_cards
from deck_builder.partner_selection import apply_partner_inputs
from deck_builder.builder import DeckBuilder
from exceptions import CommanderPartnerError
from code.logging_util import get_logger
LOGGER = get_logger(__name__)
router = APIRouter()
_PARTNER_MODE_LABELS = {
"partner": "Partner",
"partner_restricted": "Partner (Restricted)",
"partner_with": "Partner With",
"background": "Choose a Background",
"doctor_companion": "Doctor & Companion",
}
_WUBRG_ORDER = ["W", "U", "B", "R", "G"]
_COLOR_NAME_MAP = {
"W": "White",
"U": "Blue",
"B": "Black",
"R": "Red",
"G": "Green",
}
def _color_code(identity: Iterable[str]) -> str:
"""Convert color identity to standard WUBRG-ordered code."""
colors = [str(c).strip().upper() for c in identity if str(c).strip()]
if not colors:
return "C"
ordered: list[str] = [c for c in _WUBRG_ORDER if c in colors]
for color in colors:
if color not in ordered:
ordered.append(color)
return "".join(ordered) or "C"
def _format_color_label(identity: Iterable[str]) -> str:
"""Format color identity as human-readable label with code."""
code = _color_code(identity)
if code == "C":
return "Colorless (C)"
names = [_COLOR_NAME_MAP.get(ch, ch) for ch in code]
return " / ".join(names) + f" ({code})"
def _partner_mode_label(mode: str | None) -> str:
"""Convert partner mode to display label."""
if not mode:
return "Partner Mechanics"
return _PARTNER_MODE_LABELS.get(mode, mode.title())
def _scryfall_image_url(card_name: str, version: str = "normal") -> str | None:
"""Generate Scryfall image URL for card."""
name = str(card_name or "").strip()
if not name:
return None
return f"https://api.scryfall.com/cards/named?fuzzy={quote_plus(name)}&format=image&version={version}"
def _scryfall_page_url(card_name: str) -> str | None:
"""Generate Scryfall search URL for card."""
name = str(card_name or "").strip()
if not name:
return None
return f"https://scryfall.com/search?q={quote_plus(name)}"
def _secondary_role_label(mode: str | None, secondary_name: str | None) -> str | None:
"""Determine the role label for the secondary commander based on pairing mode."""
if not mode:
return None
mode_lower = mode.lower()
if mode_lower == "background":
return "Background"
if mode_lower == "partner_with":
return "Partner With"
if mode_lower == "doctor_companion":
record = find_commander_record(secondary_name or "") if secondary_name else None
if record and getattr(record, "is_doctor", False):
return "Doctor"
if record and getattr(record, "is_doctors_companion", False):
return "Doctor's Companion"
return "Doctor pairing"
return "Partner commander"
def _combined_to_payload(combined: Any) -> dict[str, Any]:
"""Convert CombinedCommander object to JSON-serializable payload."""
color_identity = tuple(getattr(combined, "color_identity", ()) or ())
warnings = list(getattr(combined, "warnings", []) or [])
mode_obj = getattr(combined, "partner_mode", None)
mode_value = getattr(mode_obj, "value", None) if mode_obj is not None else None
secondary = getattr(combined, "secondary_name", None)
secondary_image = _scryfall_image_url(secondary)
secondary_url = _scryfall_page_url(secondary)
secondary_role = _secondary_role_label(mode_value, secondary)
return {
"primary_name": getattr(combined, "primary_name", None),
"secondary_name": secondary,
"partner_mode": mode_value,
"partner_mode_label": _partner_mode_label(mode_value),
"color_identity": list(color_identity),
"color_code": _color_code(color_identity),
"color_label": _format_color_label(color_identity),
"theme_tags": list(getattr(combined, "theme_tags", []) or []),
"warnings": warnings,
"secondary_image_url": secondary_image,
"secondary_scryfall_url": secondary_url,
"secondary_role_label": secondary_role,
}
def _build_partner_options(primary: CommanderRecord | None) -> tuple[list[dict[str, Any]], str | None]:
"""
Build list of valid partner options for a given primary commander.
Returns:
Tuple of (partner_options_list, variant_type) where variant is
"partner", "doctor_companion", or None
"""
if not ENABLE_PARTNER_MECHANICS:
return [], None
try:
catalog = load_commander_catalog()
except Exception:
return [], None
if primary is None:
return [], None
primary_name = primary.display_name.casefold()
primary_partner_targets = {target.casefold() for target in (primary.partner_with or ())}
primary_is_partner = bool(primary.is_partner or primary_partner_targets)
primary_restricted_labels = normalized_restricted_labels(primary)
primary_is_doctor = bool(primary.is_doctor)
primary_is_companion = bool(primary.is_doctors_companion)
variant: str | None = None
if primary_is_doctor or primary_is_companion:
variant = "doctor_companion"
elif primary_is_partner:
variant = "partner"
options: list[dict[str, Any]] = []
if variant is None:
return [], None
for record in catalog.entries:
if record.display_name.casefold() == primary_name:
continue
pairing_mode: str | None = None
role_label: str | None = None
restriction_label: str | None = None
record_name_cf = record.display_name.casefold()
is_direct_pair = bool(primary_partner_targets and record_name_cf in primary_partner_targets)
if variant == "doctor_companion":
if is_direct_pair:
pairing_mode = "partner_with"
role_label = "Partner With"
elif primary_is_doctor and record.is_doctors_companion:
pairing_mode = "doctor_companion"
role_label = "Doctor's Companion"
elif primary_is_companion and record.is_doctor:
pairing_mode = "doctor_companion"
role_label = "Doctor"
else:
if not record.is_partner or record.is_background:
continue
if primary_partner_targets:
if not is_direct_pair:
continue
pairing_mode = "partner_with"
role_label = "Partner With"
elif primary_restricted_labels:
restriction = shared_restricted_partner_label(primary, record)
if not restriction:
continue
pairing_mode = "partner_restricted"
restriction_label = restriction
else:
if record.partner_with:
continue
if not getattr(record, "has_plain_partner", False):
continue
if record.is_doctors_companion:
continue
pairing_mode = "partner"
if not pairing_mode:
continue
options.append(
{
"name": record.display_name,
"color_code": _color_code(record.color_identity),
"color_label": _format_color_label(record.color_identity),
"partner_with": list(record.partner_with or ()),
"pairing_mode": pairing_mode,
"role_label": role_label,
"restriction_label": restriction_label,
"mode_label": _partner_mode_label(pairing_mode),
"image_url": _scryfall_image_url(record.display_name),
"scryfall_url": _scryfall_page_url(record.display_name),
}
)
options.sort(key=lambda item: item["name"].casefold())
return options, variant
def _build_background_options() -> list[dict[str, Any]]:
"""Build list of available background cards for Choose a Background commanders."""
if not ENABLE_PARTNER_MECHANICS:
return []
options: list[dict[str, Any]] = []
try:
catalog = load_background_cards()
except FileNotFoundError as exc:
LOGGER.warning("background_cards_missing fallback_to_commander_catalog", extra={"error": str(exc)})
catalog = None
except Exception as exc: # pragma: no cover - unexpected loader failure
LOGGER.warning("background_cards_failed fallback_to_commander_catalog", exc_info=exc)
catalog = None
if catalog and getattr(catalog, "entries", None):
seen: set[str] = set()
for card in catalog.entries:
name_key = card.display_name.casefold()
if name_key in seen:
continue
seen.add(name_key)
options.append(
{
"name": card.display_name,
"color_code": _color_code(card.color_identity),
"color_label": _format_color_label(card.color_identity),
"image_url": _scryfall_image_url(card.display_name),
"scryfall_url": _scryfall_page_url(card.display_name),
"role_label": "Background",
}
)
if options:
options.sort(key=lambda item: item["name"].casefold())
return options
fallback_options = _background_options_from_commander_catalog()
if fallback_options:
return fallback_options
return options
def _background_options_from_commander_catalog() -> list[dict[str, Any]]:
"""Fallback: load backgrounds from commander catalog when background_cards.json is unavailable."""
try:
catalog = load_commander_catalog()
except Exception as exc: # pragma: no cover - catalog load issues handled elsewhere
LOGGER.warning("commander_catalog_background_fallback_failed", exc_info=exc)
return []
seen: set[str] = set()
options: list[dict[str, Any]] = []
for record in getattr(catalog, "entries", ()):
if not getattr(record, "is_background", False):
continue
name = getattr(record, "display_name", None)
if not name:
continue
key = str(name).casefold()
if key in seen:
continue
seen.add(key)
color_identity = getattr(record, "color_identity", tuple())
options.append(
{
"name": name,
"color_code": _color_code(color_identity),
"color_label": _format_color_label(color_identity),
"image_url": _scryfall_image_url(name),
"scryfall_url": _scryfall_page_url(name),
"role_label": "Background",
}
)
options.sort(key=lambda item: item["name"].casefold())
return options
def _partner_ui_context(
commander_name: str,
*,
partner_enabled: bool,
secondary_selection: str | None,
background_selection: str | None,
combined_preview: dict[str, Any] | None,
warnings: Iterable[str] | None,
partner_error: str | None,
auto_note: str | None,
auto_assigned: bool | None = None,
auto_prefill_allowed: bool = True,
) -> dict[str, Any]:
"""
Build complete partner UI context for rendering partner selection components.
This includes partner options, background options, preview payload,
suggestions, warnings, and all necessary state for the partner UI.
"""
record = find_commander_record(commander_name)
partner_options, partner_variant = _build_partner_options(record)
supports_backgrounds = bool(record.supports_backgrounds) if record else False
background_options = _build_background_options() if supports_backgrounds else []
selected_secondary = (secondary_selection or "").strip()
selected_background = (background_selection or "").strip()
warnings_list = list(warnings or [])
preview_payload: dict[str, Any] | None = combined_preview if isinstance(combined_preview, dict) else None
preview_error: str | None = None
auto_prefill_applied = False
auto_default_name: str | None = None
auto_note_value = auto_note
# Auto-prefill Partner With targets
if (
ENABLE_PARTNER_MECHANICS
and partner_variant == "partner"
and record
and record.partner_with
and not selected_secondary
and not selected_background
and auto_prefill_allowed
):
target_names = [name.strip() for name in record.partner_with if str(name).strip()]
for target in target_names:
for option in partner_options:
if option["name"].casefold() == target.casefold():
selected_secondary = option["name"]
auto_default_name = option["name"]
auto_prefill_applied = True
if not auto_note_value:
auto_note_value = f"Automatically paired with {option['name']} (Partner With)."
break
if auto_prefill_applied:
break
partner_active = bool((selected_secondary or selected_background) and ENABLE_PARTNER_MECHANICS)
partner_capable = bool(ENABLE_PARTNER_MECHANICS and (partner_options or background_options))
# Dynamic labels based on variant
placeholder = "Select a partner"
select_label = "Partner commander"
role_hint: str | None = None
if partner_variant == "doctor_companion" and record:
has_partner_with_option = any(option.get("pairing_mode") == "partner_with" for option in partner_options)
if record.is_doctor:
if has_partner_with_option:
placeholder = "Select a companion or Partner With match"
select_label = "Companion or Partner"
role_hint = "Choose a Doctor's Companion or Partner With match for this Doctor."
else:
placeholder = "Select a companion"
select_label = "Companion"
role_hint = "Choose a Doctor's Companion to pair with this Doctor."
elif record.is_doctors_companion:
if has_partner_with_option:
placeholder = "Select a Doctor or Partner With match"
select_label = "Doctor or Partner"
role_hint = "Choose a Doctor or Partner With pairing for this companion."
else:
placeholder = "Select a Doctor"
select_label = "Doctor partner"
role_hint = "Choose a Doctor to accompany this companion."
# Partner suggestions
suggestions_enabled = bool(ENABLE_PARTNER_MECHANICS and ENABLE_PARTNER_SUGGESTIONS)
suggestions_visible: list[dict[str, Any]] = []
suggestions_hidden: list[dict[str, Any]] = []
suggestions_total = 0
suggestions_metadata: dict[str, Any] = {}
suggestions_error: str | None = None
suggestions_loaded = False
if suggestions_enabled and record:
try:
suggestion_result = get_partner_suggestions(record.display_name)
except Exception as exc: # pragma: no cover - defensive logging
LOGGER.warning("partner suggestions failed", exc_info=exc)
suggestion_result = None
if suggestion_result is None:
suggestions_error = "Partner suggestions dataset is unavailable."
else:
suggestions_loaded = True
partner_names = [opt.get("name") for opt in (partner_options or []) if opt.get("name")]
background_names = [opt.get("name") for opt in (background_options or []) if opt.get("name")]
try:
visible, hidden = suggestion_result.flatten(partner_names, background_names, visible_limit=3)
except Exception as exc: # pragma: no cover - defensive
LOGGER.warning("partner suggestions flatten failed", exc_info=exc)
visible = []
hidden = []
suggestions_visible = visible
suggestions_hidden = hidden
suggestions_total = suggestion_result.total
if isinstance(suggestion_result.metadata, dict):
suggestions_metadata = dict(suggestion_result.metadata)
context = {
"partner_feature_available": ENABLE_PARTNER_MECHANICS,
"partner_capable": partner_capable,
"partner_enabled": partner_active,
"selected_secondary_commander": selected_secondary,
"selected_background": selected_background if supports_backgrounds else "",
"partner_options": partner_options if partner_options else [],
"background_options": background_options if background_options else [],
"primary_partner_with": list(record.partner_with) if record else [],
"primary_supports_backgrounds": supports_backgrounds,
"primary_is_partner": bool(record.is_partner) if record else False,
"primary_commander_display": record.display_name if record else commander_name,
"partner_preview": preview_payload,
"partner_warnings": warnings_list,
"partner_error": partner_error,
"partner_auto_note": auto_note_value,
"partner_auto_assigned": bool(auto_prefill_applied or auto_assigned),
"partner_auto_default": auto_default_name,
"partner_select_variant": partner_variant,
"partner_select_label": select_label,
"partner_select_placeholder": placeholder,
"partner_role_hint": role_hint,
"partner_suggestions_enabled": suggestions_enabled,
"partner_suggestions": suggestions_visible,
"partner_suggestions_hidden": suggestions_hidden,
"partner_suggestions_total": suggestions_total,
"partner_suggestions_metadata": suggestions_metadata,
"partner_suggestions_loaded": suggestions_loaded,
"partner_suggestions_error": suggestions_error,
"partner_suggestions_available": bool(suggestions_visible or suggestions_hidden),
"partner_suggestions_has_hidden": bool(suggestions_hidden),
"partner_suggestions_endpoint": "/api/partner/suggestions",
}
context["has_partner_options"] = bool(partner_options)
context["has_background_options"] = bool(background_options)
context["partner_hidden_value"] = "1" if partner_capable else "0"
context["partner_auto_opt_out"] = not bool(auto_prefill_allowed)
context["partner_prefill_available"] = bool(partner_variant == "partner" and partner_options)
# Generate preview if not provided
if preview_payload is None and ENABLE_PARTNER_MECHANICS and (selected_secondary or selected_background):
try:
builder = DeckBuilder(output_func=lambda *_: None, input_func=lambda *_: "", headless=True)
combined_obj = apply_partner_inputs(
builder,
primary_name=commander_name,
secondary_name=selected_secondary or None,
background_name=selected_background or None,
feature_enabled=True,
)
except CommanderPartnerError as exc:
preview_error = str(exc) or "Invalid partner selection."
except Exception as exc:
preview_error = f"Partner preview failed: {exc}"
else:
if combined_obj is not None:
preview_payload = _combined_to_payload(combined_obj)
if combined_obj.warnings:
for warn in combined_obj.warnings:
if warn not in warnings_list:
warnings_list.append(warn)
if preview_payload:
context["partner_preview"] = preview_payload
preview_tags = preview_payload.get("theme_tags")
if preview_tags:
context["partner_theme_tags"] = list(preview_tags)
if preview_error and not partner_error:
context["partner_error"] = preview_error
partner_error = preview_error
context["partner_warnings"] = warnings_list
return context
def _resolve_partner_selection(
commander_name: str,
*,
feature_enabled: bool,
partner_enabled: bool,
secondary_candidate: str | None,
background_candidate: str | None,
auto_opt_out: bool = False,
selection_source: str | None = None,
) -> tuple[
str | None,
dict[str, Any] | None,
list[str],
str | None,
str | None,
str | None,
str | None,
bool,
]:
"""
Resolve and validate partner selection, applying auto-pairing when appropriate.
Returns:
Tuple of (error, preview_payload, warnings, auto_note, resolved_secondary,
resolved_background, partner_mode, auto_assigned_flag)
"""
if not (feature_enabled and ENABLE_PARTNER_MECHANICS):
return None, None, [], None, None, None, None, False
secondary = (secondary_candidate or "").strip()
background = (background_candidate or "").strip()
auto_note: str | None = None
auto_assigned = False
selection_source_clean = (selection_source or "").strip().lower() or None
record = find_commander_record(commander_name)
partner_options, partner_variant = _build_partner_options(record)
supports_backgrounds = bool(record and record.supports_backgrounds)
background_options = _build_background_options() if supports_backgrounds else []
if not partner_enabled and not secondary and not background:
return None, None, [], None, None, None, None, False
if not supports_backgrounds:
background = ""
if not partner_options:
secondary = ""
if secondary and background:
return "Provide either a secondary commander or a background, not both.", None, [], auto_note, secondary, background, None, False
option_lookup = {opt["name"].casefold(): opt for opt in partner_options}
if secondary:
key = secondary.casefold()
if key not in option_lookup:
return "Selected partner is not valid for this commander.", None, [], auto_note, secondary, background or None, None, False
if background:
normalized_backgrounds = {opt["name"].casefold() for opt in background_options}
if background.casefold() not in normalized_backgrounds:
return "Selected background is not available.", None, [], auto_note, secondary or None, background, None, False
# Auto-assign Partner With targets
if not secondary and not background and not auto_opt_out and partner_variant == "partner" and record and record.partner_with:
target_names = [name.strip() for name in record.partner_with if str(name).strip()]
for target in target_names:
opt = option_lookup.get(target.casefold())
if opt:
secondary = opt["name"]
auto_note = f"Automatically paired with {secondary} (Partner With)."
auto_assigned = True
break
if not secondary and not background:
return None, None, [], auto_note, None, None, None, auto_assigned
builder = DeckBuilder(output_func=lambda *_: None, input_func=lambda *_: "", headless=True)
try:
combined = apply_partner_inputs(
builder,
primary_name=commander_name,
secondary_name=secondary or None,
background_name=background or None,
feature_enabled=True,
selection_source=selection_source_clean,
)
except CommanderPartnerError as exc:
message = str(exc) or "Invalid partner selection."
return message, None, [], auto_note, secondary or None, background or None, None, auto_assigned
except Exception as exc:
return f"Partner selection failed: {exc}", None, [], auto_note, secondary or None, background or None, None, auto_assigned
if combined is None:
return "Unable to resolve partner selection.", None, [], auto_note, secondary or None, background or None, None, auto_assigned
payload = _combined_to_payload(combined)
warnings = payload.get("warnings", []) or []
mode = payload.get("partner_mode")
if mode == "background":
resolved_background = payload.get("secondary_name")
return None, payload, warnings, auto_note, None, resolved_background, mode, auto_assigned
return None, payload, warnings, auto_note, payload.get("secondary_name"), None, mode, auto_assigned
@router.post("/partner/preview", response_class=JSONResponse)
async def build_partner_preview(
request: Request,
commander: str = Form(...),
partner_enabled: str | None = Form(None),
secondary_commander: str | None = Form(None),
background: str | None = Form(None),
partner_auto_opt_out: str | None = Form(None),
scope: str | None = Form(None),
selection_source: str | None = Form(None),
) -> JSONResponse:
"""
Preview a partner pairing and return combined commander details.
This endpoint validates partner selections and returns:
- Combined color identity and theme tags
- Partner preview payload with images and metadata
- Warnings about legality or capability mismatches
- Auto-pairing information for Partner With targets
Args:
request: FastAPI request
commander: Primary commander name
partner_enabled: Whether partner mechanics are enabled ("1"/"true"/etc.)
secondary_commander: Secondary partner commander name
background: Background card name (for Choose a Background commanders)
partner_auto_opt_out: Opt-out of auto-pairing for Partner With
scope: Request scope identifier
selection_source: Source of selection (e.g., "suggestion", "manual")
Returns:
JSONResponse with partner preview data and validation results
"""
partner_feature_enabled = ENABLE_PARTNER_MECHANICS
raw_partner_enabled = (partner_enabled or "").strip().lower()
partner_flag = partner_feature_enabled and raw_partner_enabled in {"1", "true", "on", "yes"}
auto_opt_out_flag = (partner_auto_opt_out or "").strip().lower() in {"1", "true", "on", "yes"}
selection_source_value = (selection_source or "").strip().lower() or None
try:
(
partner_error,
combined_payload,
partner_warnings,
partner_auto_note,
resolved_secondary,
resolved_background,
partner_mode,
partner_auto_assigned_flag,
) = _resolve_partner_selection(
commander,
feature_enabled=partner_feature_enabled,
partner_enabled=partner_flag,
secondary_candidate=secondary_commander,
background_candidate=background,
auto_opt_out=auto_opt_out_flag,
selection_source=selection_source_value,
)
except Exception as exc: # pragma: no cover - defensive
return JSONResponse(
{
"ok": False,
"error": f"Partner preview failed: {exc}",
"scope": scope or "",
}
)
partner_ctx = _partner_ui_context(
commander,
partner_enabled=partner_flag,
secondary_selection=resolved_secondary or secondary_commander,
background_selection=resolved_background or background,
combined_preview=combined_payload,
warnings=partner_warnings,
partner_error=partner_error,
auto_note=partner_auto_note,
auto_assigned=partner_auto_assigned_flag,
auto_prefill_allowed=not auto_opt_out_flag,
)
preview_payload = partner_ctx.get("partner_preview")
theme_tags = partner_ctx.get("partner_theme_tags") or []
warnings_list = partner_ctx.get("partner_warnings") or partner_warnings or []
response = {
"ok": True,
"scope": scope or "",
"preview": preview_payload,
"theme_tags": theme_tags,
"warnings": warnings_list,
"auto_note": partner_auto_note,
"resolved_secondary": resolved_secondary,
"resolved_background": resolved_background,
"partner_mode": partner_mode,
"auto_assigned": bool(partner_auto_assigned_flag),
}
if partner_error:
response["error"] = partner_error
try:
log_partner_suggestion_selected(
request,
commander=commander,
scope=scope,
partner_enabled=partner_flag,
auto_opt_out=auto_opt_out_flag,
auto_assigned=bool(partner_auto_assigned_flag),
selection_source=selection_source_value,
secondary_candidate=secondary_commander,
background_candidate=background,
resolved_secondary=resolved_secondary,
resolved_background=resolved_background,
partner_mode=partner_mode,
has_preview=bool(preview_payload),
warnings=warnings_list,
error=response.get("error"),
)
except Exception: # pragma: no cover - telemetry should not break responses
pass
return JSONResponse(response)

View file

@ -0,0 +1,205 @@
"""
Custom theme management routes for deck building.
Handles user-defined custom themes including adding, removing, choosing
suggestions, and switching between permissive/strict matching modes.
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Request, Form
from fastapi.responses import HTMLResponse
from ..app import (
ENABLE_CUSTOM_THEMES,
USER_THEME_LIMIT,
DEFAULT_THEME_MATCH_MODE,
_sanitize_theme,
templates,
)
from ..services.tasks import get_session, new_sid
from ..services import custom_theme_manager as theme_mgr
router = APIRouter()
_INVALID_THEME_MESSAGE = (
"Theme names can only include letters, numbers, spaces, hyphens, apostrophes, and underscores."
)
def _custom_theme_context(
request: Request,
sess: dict,
*,
message: str | None = None,
level: str = "info",
) -> dict[str, Any]:
"""
Assemble the Additional Themes section context for the modal.
Args:
request: FastAPI request object
sess: Session dictionary
message: Optional status message to display
level: Message level ("info", "success", "warning", "error")
Returns:
Context dictionary for rendering the additional themes template
"""
if not ENABLE_CUSTOM_THEMES:
return {
"request": request,
"theme_state": None,
"theme_message": message,
"theme_message_level": level,
"theme_limit": USER_THEME_LIMIT,
"enable_custom_themes": False,
}
theme_mgr.set_limit(sess, USER_THEME_LIMIT)
state = theme_mgr.get_view_state(sess, default_mode=DEFAULT_THEME_MATCH_MODE)
return {
"request": request,
"theme_state": state,
"theme_message": message,
"theme_message_level": level,
"theme_limit": USER_THEME_LIMIT,
"enable_custom_themes": ENABLE_CUSTOM_THEMES,
}
@router.post("/themes/add", response_class=HTMLResponse)
async def build_theme_add(request: Request, theme: str = Form("")) -> HTMLResponse:
"""
Add a custom theme to the user's theme list.
Validates theme name format and enforces theme count limits.
Args:
request: FastAPI request object
theme: Theme name to add (will be trimmed and sanitized)
Returns:
HTMLResponse with updated themes list and status message
"""
if not ENABLE_CUSTOM_THEMES:
return HTMLResponse("", status_code=204)
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
trimmed = theme.strip()
sanitized = _sanitize_theme(trimmed) if trimmed else ""
if trimmed and not sanitized:
ctx = _custom_theme_context(request, sess, message=_INVALID_THEME_MESSAGE, level="error")
else:
value = sanitized if sanitized is not None else trimmed
_, message, level = theme_mgr.add_theme(
sess,
value,
commander_tags=list(sess.get("tags", [])),
mode=sess.get("theme_match_mode", DEFAULT_THEME_MATCH_MODE),
limit=USER_THEME_LIMIT,
)
ctx = _custom_theme_context(request, sess, message=message, level=level)
resp = templates.TemplateResponse("build/_new_deck_additional_themes.html", ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp
@router.post("/themes/remove", response_class=HTMLResponse)
async def build_theme_remove(request: Request, theme: str = Form("")) -> HTMLResponse:
"""
Remove a custom theme from the user's theme list.
Args:
request: FastAPI request object
theme: Theme name to remove
Returns:
HTMLResponse with updated themes list and status message
"""
if not ENABLE_CUSTOM_THEMES:
return HTMLResponse("", status_code=204)
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
value = _sanitize_theme(theme) or theme
_, message, level = theme_mgr.remove_theme(
sess,
value,
commander_tags=list(sess.get("tags", [])),
mode=sess.get("theme_match_mode", DEFAULT_THEME_MATCH_MODE),
)
ctx = _custom_theme_context(request, sess, message=message, level=level)
resp = templates.TemplateResponse("build/_new_deck_additional_themes.html", ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp
@router.post("/themes/choose", response_class=HTMLResponse)
async def build_theme_choose(
request: Request,
original: str = Form(""),
choice: str = Form(""),
) -> HTMLResponse:
"""
Replace an invalid theme with a suggested alternative.
When a user's custom theme doesn't perfectly match commander tags,
the system suggests alternatives. This route accepts the user's
choice from those suggestions.
Args:
request: FastAPI request object
original: The original (invalid) theme name
choice: The selected suggestion to use instead
Returns:
HTMLResponse with updated themes list and status message
"""
if not ENABLE_CUSTOM_THEMES:
return HTMLResponse("", status_code=204)
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
selection = _sanitize_theme(choice) or choice
_, message, level = theme_mgr.choose_suggestion(
sess,
original,
selection,
commander_tags=list(sess.get("tags", [])),
mode=sess.get("theme_match_mode", DEFAULT_THEME_MATCH_MODE),
)
ctx = _custom_theme_context(request, sess, message=message, level=level)
resp = templates.TemplateResponse("build/_new_deck_additional_themes.html", ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp
@router.post("/themes/mode", response_class=HTMLResponse)
async def build_theme_mode(request: Request, mode: str = Form("permissive")) -> HTMLResponse:
"""
Switch theme matching mode between permissive and strict.
- Permissive: Suggests alternatives for invalid themes
- Strict: Rejects invalid themes outright
Args:
request: FastAPI request object
mode: Either "permissive" or "strict"
Returns:
HTMLResponse with updated themes list and status message
"""
if not ENABLE_CUSTOM_THEMES:
return HTMLResponse("", status_code=204)
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
_, message, level = theme_mgr.set_mode(
sess,
mode,
commander_tags=list(sess.get("tags", [])),
)
ctx = _custom_theme_context(request, sess, message=message, level=level)
resp = templates.TemplateResponse("build/_new_deck_additional_themes.html", ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp

View file

@ -0,0 +1,379 @@
"""Validation endpoints for card name validation and include/exclude lists.
This module handles validation of card names and include/exclude lists for the deck builder,
including fuzzy matching, color identity validation, and limit enforcement.
"""
import os
from fastapi import APIRouter, Form, Request
from fastapi.responses import JSONResponse
from path_util import csv_dir as _csv_dir
router = APIRouter()
# Read configuration directly to avoid circular import with app.py
def _as_bool(val: str | bool | None, default: bool = False) -> bool:
"""Convert environment variable to boolean."""
if isinstance(val, bool):
return val
if val is None:
return default
s = str(val).strip().lower()
return s in ("1", "true", "yes", "on")
ALLOW_MUST_HAVES = _as_bool(os.getenv("ALLOW_MUST_HAVES"), True)
# Cache for available card names used by validation endpoints
_AVAILABLE_CARDS_CACHE: set[str] | None = None
_AVAILABLE_CARDS_NORM_SET: set[str] | None = None
_AVAILABLE_CARDS_NORM_MAP: dict[str, str] | None = None
def _available_cards() -> set[str]:
"""Fast load of available card names using the csv module (no pandas).
Reads only once and caches results in memory.
"""
global _AVAILABLE_CARDS_CACHE
if _AVAILABLE_CARDS_CACHE is not None:
return _AVAILABLE_CARDS_CACHE
try:
import csv
path = f"{_csv_dir()}/cards.csv"
with open(path, 'r', encoding='utf-8', newline='') as f:
reader = csv.DictReader(f)
fields = reader.fieldnames or []
name_col = None
for col in ['name', 'Name', 'card_name', 'CardName']:
if col in fields:
name_col = col
break
if name_col is None and fields:
# Heuristic: pick first field containing 'name'
for col in fields:
if 'name' in col.lower():
name_col = col
break
if name_col is None:
raise ValueError(f"No name-like column found in {path}: {fields}")
names: set[str] = set()
for row in reader:
try:
v = row.get(name_col)
if v:
names.add(str(v))
except Exception:
continue
_AVAILABLE_CARDS_CACHE = names
return _AVAILABLE_CARDS_CACHE
except Exception:
_AVAILABLE_CARDS_CACHE = set()
return _AVAILABLE_CARDS_CACHE
def _available_cards_normalized() -> tuple[set[str], dict[str, str]]:
"""Return cached normalized card names and mapping to originals."""
global _AVAILABLE_CARDS_NORM_SET, _AVAILABLE_CARDS_NORM_MAP
if _AVAILABLE_CARDS_NORM_SET is not None and _AVAILABLE_CARDS_NORM_MAP is not None:
return _AVAILABLE_CARDS_NORM_SET, _AVAILABLE_CARDS_NORM_MAP
# Build from available cards set
names = _available_cards()
try:
from code.deck_builder.include_exclude_utils import normalize_punctuation
except Exception:
# Fallback: identity normalization
def normalize_punctuation(x: str) -> str:
return str(x).strip().casefold()
norm_map: dict[str, str] = {}
for name in names:
try:
n = normalize_punctuation(name)
if n not in norm_map:
norm_map[n] = name
except Exception:
continue
_AVAILABLE_CARDS_NORM_MAP = norm_map
_AVAILABLE_CARDS_NORM_SET = set(norm_map.keys())
return _AVAILABLE_CARDS_NORM_SET, _AVAILABLE_CARDS_NORM_MAP
def warm_validation_name_cache() -> None:
"""Pre-populate the available-cards caches to avoid first-call latency."""
try:
_ = _available_cards()
_ = _available_cards_normalized()
except Exception:
# Best-effort warmup; proceed silently on failure
pass
@router.post("/validate/exclude_cards")
async def validate_exclude_cards(
request: Request,
exclude_cards: str = Form(default=""),
commander: str = Form(default="")
):
"""Legacy exclude cards validation endpoint - redirect to new unified endpoint."""
if not ALLOW_MUST_HAVES:
return JSONResponse({"error": "Feature not enabled"}, status_code=404)
# Call new unified endpoint
result = await validate_include_exclude_cards(
request=request,
include_cards="",
exclude_cards=exclude_cards,
commander=commander,
enforcement_mode="warn",
allow_illegal=False,
fuzzy_matching=True
)
# Transform to legacy format for backward compatibility
if hasattr(result, 'body'):
import json
data = json.loads(result.body)
if 'excludes' in data:
excludes = data['excludes']
return JSONResponse({
"count": excludes.get("count", 0),
"limit": excludes.get("limit", 15),
"over_limit": excludes.get("over_limit", False),
"cards": excludes.get("cards", []),
"duplicates": excludes.get("duplicates", {}),
"warnings": excludes.get("warnings", [])
})
return result
@router.post("/validate/include_exclude")
async def validate_include_exclude_cards(
request: Request,
include_cards: str = Form(default=""),
exclude_cards: str = Form(default=""),
commander: str = Form(default=""),
enforcement_mode: str = Form(default="warn"),
allow_illegal: bool = Form(default=False),
fuzzy_matching: bool = Form(default=True)
):
"""Validate include/exclude card lists with comprehensive diagnostics."""
if not ALLOW_MUST_HAVES:
return JSONResponse({"error": "Feature not enabled"}, status_code=404)
try:
from code.deck_builder.include_exclude_utils import (
parse_card_list_input, collapse_duplicates,
fuzzy_match_card_name, MAX_INCLUDES, MAX_EXCLUDES
)
from code.deck_builder.builder import DeckBuilder
# Parse inputs
include_list = parse_card_list_input(include_cards) if include_cards.strip() else []
exclude_list = parse_card_list_input(exclude_cards) if exclude_cards.strip() else []
# Collapse duplicates
include_unique, include_dupes = collapse_duplicates(include_list)
exclude_unique, exclude_dupes = collapse_duplicates(exclude_list)
# Initialize result structure
result = {
"includes": {
"count": len(include_unique),
"limit": MAX_INCLUDES,
"over_limit": len(include_unique) > MAX_INCLUDES,
"duplicates": include_dupes,
"cards": include_unique[:10] if len(include_unique) <= 10 else include_unique[:7] + ["..."],
"warnings": [],
"legal": [],
"illegal": [],
"color_mismatched": [],
"fuzzy_matches": {}
},
"excludes": {
"count": len(exclude_unique),
"limit": MAX_EXCLUDES,
"over_limit": len(exclude_unique) > MAX_EXCLUDES,
"duplicates": exclude_dupes,
"cards": exclude_unique[:10] if len(exclude_unique) <= 10 else exclude_unique[:7] + ["..."],
"warnings": [],
"legal": [],
"illegal": [],
"fuzzy_matches": {}
},
"conflicts": [], # Cards that appear in both lists
"confirmation_needed": [], # Cards needing fuzzy match confirmation
"overall_warnings": []
}
# Check for conflicts (cards in both lists)
conflicts = set(include_unique) & set(exclude_unique)
if conflicts:
result["conflicts"] = list(conflicts)
result["overall_warnings"].append(f"Cards appear in both lists: {', '.join(list(conflicts)[:3])}{'...' if len(conflicts) > 3 else ''}")
# Size warnings based on actual counts
if result["includes"]["over_limit"]:
result["includes"]["warnings"].append(f"Too many includes: {len(include_unique)}/{MAX_INCLUDES}")
elif len(include_unique) > MAX_INCLUDES * 0.8: # 80% capacity warning
result["includes"]["warnings"].append(f"Approaching limit: {len(include_unique)}/{MAX_INCLUDES}")
if result["excludes"]["over_limit"]:
result["excludes"]["warnings"].append(f"Too many excludes: {len(exclude_unique)}/{MAX_EXCLUDES}")
elif len(exclude_unique) > MAX_EXCLUDES * 0.8: # 80% capacity warning
result["excludes"]["warnings"].append(f"Approaching limit: {len(exclude_unique)}/{MAX_EXCLUDES}")
# If we have a commander, do advanced validation (color identity, etc.)
if commander and commander.strip():
try:
# Create a temporary builder
builder = DeckBuilder()
# Set up commander FIRST (before setup_dataframes)
df = builder.load_commander_data()
commander_rows = df[df["name"] == commander.strip()]
if not commander_rows.empty:
# Apply commander selection (this sets commander_row properly)
builder._apply_commander_selection(commander_rows.iloc[0])
# Now setup dataframes (this will use the commander info)
builder.setup_dataframes()
# Get available card names for fuzzy matching
name_col = 'name' if 'name' in builder._full_cards_df.columns else 'Name'
available_cards = set(builder._full_cards_df[name_col].tolist())
# Validate includes with fuzzy matching
for card_name in include_unique:
if fuzzy_matching:
match_result = fuzzy_match_card_name(card_name, available_cards)
if match_result.matched_name:
if match_result.auto_accepted:
result["includes"]["fuzzy_matches"][card_name] = match_result.matched_name
result["includes"]["legal"].append(match_result.matched_name)
else:
# Needs confirmation
result["confirmation_needed"].append({
"input": card_name,
"suggestions": match_result.suggestions,
"confidence": match_result.confidence,
"type": "include"
})
else:
result["includes"]["illegal"].append(card_name)
else:
# Exact match only
if card_name in available_cards:
result["includes"]["legal"].append(card_name)
else:
result["includes"]["illegal"].append(card_name)
# Validate excludes with fuzzy matching
for card_name in exclude_unique:
if fuzzy_matching:
match_result = fuzzy_match_card_name(card_name, available_cards)
if match_result.matched_name:
if match_result.auto_accepted:
result["excludes"]["fuzzy_matches"][card_name] = match_result.matched_name
result["excludes"]["legal"].append(match_result.matched_name)
else:
# Needs confirmation
result["confirmation_needed"].append({
"input": card_name,
"suggestions": match_result.suggestions,
"confidence": match_result.confidence,
"type": "exclude"
})
else:
result["excludes"]["illegal"].append(card_name)
else:
# Exact match only
if card_name in available_cards:
result["excludes"]["legal"].append(card_name)
else:
result["excludes"]["illegal"].append(card_name)
# Color identity validation for includes (only if we have a valid commander with colors)
commander_colors = getattr(builder, 'color_identity', [])
if commander_colors:
color_validated_includes = []
for card_name in result["includes"]["legal"]:
if builder._validate_card_color_identity(card_name):
color_validated_includes.append(card_name)
else:
# Add color-mismatched cards to illegal instead of separate category
result["includes"]["illegal"].append(card_name)
# Update legal includes to only those that pass color identity
result["includes"]["legal"] = color_validated_includes
except Exception as validation_error:
# Advanced validation failed, but return basic validation
result["overall_warnings"].append(f"Advanced validation unavailable: {str(validation_error)}")
else:
# No commander provided, do basic fuzzy matching only
if fuzzy_matching and (include_unique or exclude_unique):
try:
# Use cached available cards set (1st call populates cache)
available_cards = _available_cards()
# Fast path: normalized exact matches via cached sets
norm_set, norm_map = _available_cards_normalized()
# Validate includes with fuzzy matching
for card_name in include_unique:
from code.deck_builder.include_exclude_utils import normalize_punctuation
n = normalize_punctuation(card_name)
if n in norm_set:
result["includes"]["fuzzy_matches"][card_name] = norm_map[n]
result["includes"]["legal"].append(norm_map[n])
continue
match_result = fuzzy_match_card_name(card_name, available_cards)
if match_result.matched_name and match_result.auto_accepted:
# Exact or high-confidence match
result["includes"]["fuzzy_matches"][card_name] = match_result.matched_name
result["includes"]["legal"].append(match_result.matched_name)
elif not match_result.auto_accepted and match_result.suggestions:
# Needs confirmation - has suggestions but low confidence
result["confirmation_needed"].append({
"input": card_name,
"suggestions": match_result.suggestions,
"confidence": match_result.confidence,
"type": "include"
})
else:
# No match found at all, add to illegal
result["includes"]["illegal"].append(card_name)
# Validate excludes with fuzzy matching
for card_name in exclude_unique:
from code.deck_builder.include_exclude_utils import normalize_punctuation
n = normalize_punctuation(card_name)
if n in norm_set:
result["excludes"]["fuzzy_matches"][card_name] = norm_map[n]
result["excludes"]["legal"].append(norm_map[n])
continue
match_result = fuzzy_match_card_name(card_name, available_cards)
if match_result.matched_name:
if match_result.auto_accepted:
result["excludes"]["fuzzy_matches"][card_name] = match_result.matched_name
result["excludes"]["legal"].append(match_result.matched_name)
else:
# Needs confirmation
result["confirmation_needed"].append({
"input": card_name,
"suggestions": match_result.suggestions,
"confidence": match_result.confidence,
"type": "exclude"
})
else:
# No match found, add to illegal
result["excludes"]["illegal"].append(card_name)
except Exception as fuzzy_error:
result["overall_warnings"].append(f"Fuzzy matching unavailable: {str(fuzzy_error)}")
return JSONResponse(result)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=400)