feat: Added Partners, Backgrounds, and related variation selections to commander building.

This commit is contained in:
matt 2025-10-06 09:17:59 -07:00
parent 641b305955
commit d416c9b238
65 changed files with 11835 additions and 691 deletions

View file

@ -5,6 +5,7 @@ from fastapi import Request
from ..services import owned_store
from . import orchestrator as orch
from deck_builder import builder_constants as bc
from .. import app as app_module
def step5_base_ctx(request: Request, sess: dict, *, include_name: bool = True, include_locks: bool = True) -> Dict[str, Any]:
@ -21,6 +22,13 @@ def step5_base_ctx(request: Request, sess: dict, *, include_name: bool = True, i
"values": sess.get("ideals", orch.ideal_defaults()),
"owned_only": bool(sess.get("use_owned_only")),
"prefer_owned": bool(sess.get("prefer_owned")),
"partner_enabled": bool(sess.get("partner_enabled") and app_module.ENABLE_PARTNER_MECHANICS),
"secondary_commander": sess.get("secondary_commander"),
"background": sess.get("background"),
"partner_mode": sess.get("partner_mode"),
"partner_warnings": list(sess.get("partner_warnings", []) or []),
"combined_commander": sess.get("combined_commander"),
"partner_auto_note": sess.get("partner_auto_note"),
"owned_set": owned_set(),
"game_changers": bc.GAME_CHANGERS,
"replace_mode": bool(sess.get("replace_mode", True)),
@ -69,6 +77,9 @@ def start_ctx_from_session(sess: dict, *, set_on_session: bool = True) -> Dict[s
use_owned = bool(sess.get("use_owned_only"))
prefer = bool(sess.get("prefer_owned"))
owned_names_list = owned_names() if (use_owned or prefer) else None
partner_enabled = bool(sess.get("partner_enabled")) and app_module.ENABLE_PARTNER_MECHANICS
secondary_commander = sess.get("secondary_commander") if partner_enabled else None
background_choice = sess.get("background") if partner_enabled else None
ctx = orch.start_build_ctx(
commander=sess.get("commander"),
tags=sess.get("tags", []),
@ -87,9 +98,16 @@ def start_ctx_from_session(sess: dict, *, set_on_session: bool = True) -> Dict[s
include_cards=sess.get("include_cards"),
exclude_cards=sess.get("exclude_cards"),
swap_mdfc_basics=bool(sess.get("swap_mdfc_basics")),
partner_feature_enabled=partner_enabled,
secondary_commander=secondary_commander,
background_commander=background_choice,
)
if set_on_session:
sess["build_ctx"] = ctx
if partner_enabled:
ctx["partner_mode"] = sess.get("partner_mode")
ctx["combined_commander"] = sess.get("combined_commander")
ctx["partner_warnings"] = list(sess.get("partner_warnings", []) or [])
return ctx
@ -109,6 +127,7 @@ def commander_hover_context(
commander_name: str | None,
deck_tags: Iterable[Any] | None,
summary: Dict[str, Any] | None,
combined: Any | None = None,
) -> Dict[str, Any]:
try:
from .summary_utils import format_theme_label, format_theme_list
@ -140,6 +159,13 @@ def commander_hover_context(
result.append(label)
return result
combined_info: Dict[str, Any]
if isinstance(combined, dict):
combined_info = combined
else:
combined_info = {}
has_combined = bool(combined_info)
deck_theme_sources: list[Any] = []
_extend_sources(deck_theme_sources, list(deck_tags or []))
meta_info: Dict[str, Any] = {}
@ -176,6 +202,8 @@ def commander_hover_context(
_extend_sources(commander_theme_sources, commander_meta.get("tags"))
_extend_sources(commander_theme_sources, commander_meta.get("themes"))
_extend_sources(commander_theme_sources, combined_info.get("theme_tags"))
commander_theme_tags = format_theme_list(commander_theme_sources)
if commander_name and not commander_theme_tags:
try:
@ -211,6 +239,36 @@ def commander_hover_context(
slug_seen.add(slug)
commander_tag_slugs.append(slug)
raw_color_identity = combined_info.get("color_identity") if combined_info else None
commander_color_identity: list[str] = []
if isinstance(raw_color_identity, (list, tuple, set)):
for item in raw_color_identity:
token = str(item).strip().upper()
if token:
commander_color_identity.append(token)
commander_color_label = ""
if has_combined:
commander_color_label = str(combined_info.get("color_label") or "").strip()
if not commander_color_label and commander_color_identity:
commander_color_label = " / ".join(commander_color_identity)
if has_combined and not commander_color_label:
commander_color_label = "Colorless (C)"
commander_color_code = str(combined_info.get("color_code") or "").strip() if has_combined else ""
commander_partner_mode = str(combined_info.get("partner_mode") or "").strip() if has_combined else ""
commander_secondary_name = str(combined_info.get("secondary_name") or "").strip() if has_combined else ""
commander_primary_name = str(combined_info.get("primary_name") or commander_name or "").strip()
commander_display_name = commander_primary_name
if commander_secondary_name:
if commander_partner_mode == "background":
commander_display_name = f"{commander_primary_name} + Background: {commander_secondary_name}".strip()
else:
commander_display_name = f"{commander_primary_name} + {commander_secondary_name}".strip()
elif not commander_display_name:
commander_display_name = str(commander_name or "").strip()
reason_bits: list[str] = []
if deck_theme_tags:
reason_bits.append("Deck themes: " + ", ".join(deck_theme_tags))
@ -225,6 +283,13 @@ def commander_hover_context(
"commander_overlap_tags": overlap_tags,
"commander_reason_text": "; ".join(reason_bits),
"commander_role_label": format_theme_label("Commander") if commander_name else "",
"commander_color_identity": commander_color_identity,
"commander_color_label": commander_color_label,
"commander_color_code": commander_color_code,
"commander_partner_mode": commander_partner_mode,
"commander_secondary_name": commander_secondary_name,
"commander_primary_name": commander_primary_name,
"commander_display_name": commander_display_name,
}
@ -274,8 +339,11 @@ def step5_ctx_from_result(
commander_name=ctx.get("commander"),
deck_tags=sess.get("tags"),
summary=ctx.get("summary") if ctx.get("summary") else res.get("summary"),
combined=ctx.get("combined_commander"),
)
ctx.update(hover_meta)
if "commander_display_name" not in ctx or not ctx.get("commander_display_name"):
ctx["commander_display_name"] = ctx.get("commander")
return ctx

View file

@ -24,12 +24,16 @@ import re
from urllib.parse import quote
from path_util import csv_dir
from deck_builder.partner_background_utils import analyze_partner_background
__all__ = [
"CommanderRecord",
"CommanderCatalog",
"load_commander_catalog",
"clear_commander_catalog_cache",
"find_commander_record",
"normalized_restricted_labels",
"shared_restricted_partner_label",
]
@ -80,9 +84,13 @@ class CommanderRecord:
image_small_url: str
image_normal_url: str
partner_with: Tuple[str, ...]
has_plain_partner: bool
is_partner: bool
supports_backgrounds: bool
is_background: bool
is_doctor: bool
is_doctors_companion: bool
restricted_partner_labels: Tuple[str, ...]
search_haystack: str
@ -104,6 +112,36 @@ class CommanderCatalog:
_CACHE: Dict[str, CommanderCatalog] = {}
def normalized_restricted_labels(record: CommanderRecord | object) -> Dict[str, str]:
labels: Dict[str, str] = {}
raw_labels = getattr(record, "restricted_partner_labels", ()) or ()
for label in raw_labels:
text = str(label or "").strip()
if not text:
continue
key = text.casefold()
if key in labels:
continue
labels[key] = text
return labels
def shared_restricted_partner_label(
primary: CommanderRecord | object,
candidate: CommanderRecord | object,
) -> Optional[str]:
primary_labels = normalized_restricted_labels(primary)
if not primary_labels:
return None
candidate_labels = normalized_restricted_labels(candidate)
if not candidate_labels:
return None
for key, display in candidate_labels.items():
if key in primary_labels:
return display
return None
def clear_commander_catalog_cache() -> None:
"""Clear the in-memory commander catalog cache (testing/support)."""
@ -135,6 +173,31 @@ def load_commander_catalog(
return catalog
def find_commander_record(name: str | None) -> CommanderRecord | None:
"""Return the first commander record matching the provided name.
Matching is case-insensitive and considers display name, face name, raw name,
and slug variants. Returns ``None`` when the commander cannot be located.
"""
text = _clean_str(name)
if not text:
return None
lowered = text.casefold()
slug = _slugify(text)
try:
catalog = load_commander_catalog()
except Exception:
return None
for record in catalog.entries:
for candidate in (record.display_name, record.face_name, record.name):
if candidate and candidate.casefold() == lowered:
return record
if record.slug == slug:
return record
return None
# ---------------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------------
@ -220,15 +283,17 @@ def _row_to_record(row: Mapping[str, object], used_slugs: Iterable[str]) -> Comm
theme_tokens = tuple(dict.fromkeys(t.lower() for t in themes if t))
edhrec_rank = _parse_int(row.get("edhrecRank"))
layout = _clean_str(row.get("layout")) or "normal"
partner_with = tuple(_extract_partner_with(oracle_text))
is_partner = bool(
partner_with
or _contains_keyword(oracle_text, "partner")
or _contains_keyword(oracle_text, "friends forever")
or _contains_keyword(oracle_text, "doctor's companion")
)
supports_backgrounds = _contains_keyword(oracle_text, "choose a background")
is_background = "background" in (type_line.lower() if type_line else "")
detection = analyze_partner_background(type_line, oracle_text, raw_themes)
partner_with = detection.partner_with
if not partner_with:
partner_with = tuple(_parse_literal_list(row.get("partnerWith")))
has_plain_partner = detection.has_plain_partner
is_partner = detection.has_partner
supports_backgrounds = detection.choose_background
is_background = detection.is_background
is_doctor = detection.is_doctor
is_doctors_companion = detection.is_doctors_companion
restricted_partner_labels = tuple(detection.restricted_partner_labels)
image_small_url = _build_scryfall_url(display_name, "small")
image_normal_url = _build_scryfall_url(display_name, "normal")
@ -261,9 +326,13 @@ def _row_to_record(row: Mapping[str, object], used_slugs: Iterable[str]) -> Comm
image_small_url=image_small_url,
image_normal_url=image_normal_url,
partner_with=partner_with,
has_plain_partner=has_plain_partner,
is_partner=is_partner,
supports_backgrounds=supports_backgrounds,
is_background=is_background,
is_doctor=is_doctor,
is_doctors_companion=is_doctors_companion,
restricted_partner_labels=restricted_partner_labels,
search_haystack=search_haystack,
)
@ -277,7 +346,14 @@ def _clean_str(value: object) -> str:
def _clean_multiline(value: object) -> str:
if value is None:
return ""
text = str(value).replace("\r\n", "\n").replace("\r", "\n")
text = str(value)
if "\\r\\n" in text or "\\n" in text or "\\r" in text:
text = (
text.replace("\\r\\n", "\n")
.replace("\\r", "\n")
.replace("\\n", "\n")
)
text = text.replace("\r\n", "\n").replace("\r", "\n")
return "\n".join(line.rstrip() for line in text.split("\n"))
@ -334,35 +410,6 @@ def _split_to_list(value: object) -> List[str]:
parts = [part.strip() for part in text.split(",")]
return [part for part in parts if part]
def _extract_partner_with(text: str) -> List[str]:
if not text:
return []
out: List[str] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line:
continue
anchor = "Partner with "
if anchor not in line:
continue
after = line.split(anchor, 1)[1]
# Remove reminder text in parentheses and trailing punctuation.
target = after.split("(", 1)[0]
target = target.replace(" and ", ",")
for token in target.split(","):
cleaned = token.strip().strip(".")
if cleaned:
out.append(cleaned)
return out
def _contains_keyword(text: str, needle: str) -> bool:
if not text:
return False
return needle.lower() in text.lower()
def _parse_color_identity(value: object) -> Tuple[Tuple[str, ...], bool]:
text = _clean_str(value)
if not text:

View file

@ -12,6 +12,11 @@ from datetime import datetime as _dt
import re
import unicodedata
from glob import glob
import subprocess
import sys
from pathlib import Path
from deck_builder.partner_selection import apply_partner_inputs
from exceptions import CommanderPartnerError
_TAG_ACRONYM_KEEP = {"EDH", "ETB", "ETBs", "CMC", "ET", "OTK"}
_REASON_SOURCE_OVERRIDES = {
@ -185,6 +190,93 @@ def _run_theme_metadata_enrichment(out_func=None) -> None:
return
def _maybe_refresh_partner_synergy(out_func=None, *, force: bool = False, root: str | os.PathLike[str] | None = None) -> None:
"""Generate partner synergy dataset when missing or stale.
The helper executes the build_partner_suggestions script when the analytics
payload is absent or older than its source assets. Failures are logged but do
not block the calling workflow.
"""
try:
root_path = Path(root) if root is not None else Path(__file__).resolve().parents[3]
except Exception:
return
try:
script_path = root_path / "code" / "scripts" / "build_partner_suggestions.py"
if not script_path.exists():
return
dataset_dir = root_path / "config" / "analytics"
output_path = dataset_dir / "partner_synergy.json"
needs_refresh = force or not output_path.exists()
dataset_mtime = 0.0
if output_path.exists():
try:
dataset_mtime = output_path.stat().st_mtime
except Exception:
dataset_mtime = 0.0
if not needs_refresh:
source_times: list[float] = []
candidates = [
root_path / "config" / "themes" / "theme_list.json",
root_path / "csv_files" / "commander_cards.csv",
]
for candidate in candidates:
try:
if candidate.exists():
source_times.append(candidate.stat().st_mtime)
except Exception:
continue
try:
deck_dir = root_path / "deck_files"
if deck_dir.is_dir():
latest_deck_mtime = 0.0
for pattern in ("*.json", "*.csv", "*.txt"):
for entry in deck_dir.rglob(pattern):
try:
mt = entry.stat().st_mtime
except Exception:
continue
if mt > latest_deck_mtime:
latest_deck_mtime = mt
if latest_deck_mtime:
source_times.append(latest_deck_mtime)
except Exception:
pass
newest_source = max(source_times) if source_times else 0.0
if newest_source and dataset_mtime < newest_source:
needs_refresh = True
if not needs_refresh:
return
try:
dataset_dir.mkdir(parents=True, exist_ok=True)
except Exception:
pass
cmd = [sys.executable, str(script_path), "--output", str(output_path)]
try:
subprocess.run(cmd, check=True, cwd=str(root_path))
if out_func:
try:
out_func("Partner suggestions dataset refreshed.")
except Exception:
pass
except Exception as exc:
if out_func:
try:
out_func(f"Partner suggestions dataset refresh failed: {exc}")
except Exception:
pass
except Exception:
return
def _global_prune_disallowed_pool(b: DeckBuilder) -> None:
"""Hard-prune disallowed categories from the working pool based on bracket limits.
@ -1054,6 +1146,10 @@ def _ensure_setup_ready(out, force: bool = False) -> None:
_run_theme_metadata_enrichment(out_func)
except Exception:
pass
try:
_maybe_refresh_partner_synergy(out_func, force=force)
except Exception:
pass
# Bust theme-related in-memory caches so new catalog reflects immediately
try:
from .theme_catalog_loader import bust_filter_cache # type: ignore
@ -1722,6 +1818,28 @@ def run_build(commander: str, tags: List[str], bracket: int, ideals: Dict[str, i
"csv": csv_path,
"txt": txt_path,
}
try:
commander_meta = b.get_commander_export_metadata() # type: ignore[attr-defined]
except Exception:
commander_meta = {}
names = commander_meta.get("commander_names") or []
if names:
meta["commander_names"] = names
combined_payload = commander_meta.get("combined_commander")
if combined_payload:
meta["combined_commander"] = combined_payload
partner_mode = commander_meta.get("partner_mode")
if partner_mode:
meta["partner_mode"] = partner_mode
color_identity = commander_meta.get("color_identity")
if color_identity:
meta["color_identity"] = color_identity
primary_commander = commander_meta.get("primary_commander")
if primary_commander:
meta["commander"] = primary_commander
secondary_commander = commander_meta.get("secondary_commander")
if secondary_commander:
meta["secondary_commander"] = secondary_commander
# Attach custom deck name if provided
try:
custom_base = getattr(b, 'custom_export_base', None)
@ -1842,6 +1960,111 @@ def _make_stages(b: DeckBuilder) -> List[Dict[str, Any]]:
return stages
def _apply_combined_commander_to_builder(builder: DeckBuilder, combined: Any) -> None:
"""Attach combined commander metadata to the builder."""
try:
builder.combined_commander = combined # type: ignore[attr-defined]
except Exception:
pass
try:
builder.partner_mode = getattr(combined, "partner_mode", None) # type: ignore[attr-defined]
except Exception:
pass
try:
builder.secondary_commander = getattr(combined, "secondary_name", None) # type: ignore[attr-defined]
except Exception:
pass
try:
builder.combined_color_identity = getattr(combined, "color_identity", None) # type: ignore[attr-defined]
builder.combined_theme_tags = getattr(combined, "theme_tags", None) # type: ignore[attr-defined]
builder.partner_warnings = getattr(combined, "warnings", None) # type: ignore[attr-defined]
except Exception:
pass
commander_dict = getattr(builder, "commander_dict", None)
if isinstance(commander_dict, dict):
try:
mode = getattr(getattr(combined, "partner_mode", None), "value", None)
commander_dict["Partner Mode"] = mode
commander_dict["Secondary Commander"] = getattr(combined, "secondary_name", None)
except Exception:
pass
def _add_secondary_commander_card(builder: DeckBuilder, commander_df: Any, combined: Any) -> None:
"""Ensure the partnered/background commander is present in the deck library."""
try:
secondary_name = getattr(combined, "secondary_name", None)
except Exception:
secondary_name = None
if not secondary_name:
return
try:
display_name = str(secondary_name).strip()
except Exception:
return
if not display_name:
return
try:
df = commander_df
if df is None:
return
match = df[df["name"].astype(str).str.casefold() == display_name.casefold()]
if match.empty and "faceName" in getattr(df, "columns", []):
match = df[df["faceName"].astype(str).str.casefold() == display_name.casefold()]
if match.empty:
return
row = match.iloc[0]
except Exception:
return
card_name = str(row.get("name") or display_name).strip()
card_type = str(row.get("type") or row.get("type_line") or "")
mana_cost = str(row.get("manaCost") or "")
mana_value = row.get("manaValue", row.get("cmc"))
try:
if mana_value in ("", None):
mana_value = None
else:
mana_value = float(mana_value)
except Exception:
mana_value = None
raw_creatures = row.get("creatureTypes")
if isinstance(raw_creatures, str):
creature_types = [part.strip() for part in raw_creatures.split(",") if part.strip()]
elif isinstance(raw_creatures, (list, tuple)):
creature_types = [str(part).strip() for part in raw_creatures if str(part).strip()]
else:
creature_types = []
raw_tags = row.get("themeTags")
if isinstance(raw_tags, str):
tags = [part.strip() for part in raw_tags.split(",") if part.strip()]
elif isinstance(raw_tags, (list, tuple)):
tags = [str(part).strip() for part in raw_tags if str(part).strip()]
else:
tags = []
try:
builder.add_card(
card_name=card_name,
card_type=card_type,
mana_cost=mana_cost,
mana_value=mana_value,
creature_types=creature_types,
tags=tags,
is_commander=True,
sub_role="Partner",
added_by="Partner Mechanics",
)
except Exception:
return
def start_build_ctx(
commander: str,
tags: List[str],
@ -1861,6 +2084,9 @@ def start_build_ctx(
include_cards: List[str] | None = None,
exclude_cards: List[str] | None = None,
swap_mdfc_basics: bool | None = None,
partner_feature_enabled: bool | None = None,
secondary_commander: str | None = None,
background_commander: str | None = None,
) -> Dict[str, Any]:
logs: List[str] = []
@ -1882,6 +2108,32 @@ def start_build_ctx(
if row.empty:
raise ValueError(f"Commander not found: {commander}")
b._apply_commander_selection(row.iloc[0])
if secondary_commander is not None:
secondary_commander = str(secondary_commander).strip()
if not secondary_commander:
secondary_commander = None
if background_commander is not None:
background_commander = str(background_commander).strip()
if not background_commander:
background_commander = None
combined_partner = None
if partner_feature_enabled and (secondary_commander or background_commander):
try:
combined_partner = apply_partner_inputs(
b,
primary_name=str(commander),
secondary_name=secondary_commander,
background_name=background_commander,
feature_enabled=True,
)
except CommanderPartnerError as exc:
out(f"Partner selection error: {exc}")
except Exception as exc:
out(f"Partner selection failed: {exc}")
else:
if combined_partner is not None:
_apply_combined_commander_to_builder(b, combined_partner)
_add_secondary_commander_card(b, df, combined_partner)
# Tags (explicit + supplemental applied upstream)
b.selected_tags = list(tags or [])
b.primary_tag = b.selected_tags[0] if len(b.selected_tags) > 0 else None
@ -2158,6 +2410,28 @@ def run_stage(ctx: Dict[str, Any], rerun: bool = False, show_skipped: bool = Fal
"csv": ctx.get("csv_path"),
"txt": ctx.get("txt_path"),
}
try:
commander_meta = b.get_commander_export_metadata() # type: ignore[attr-defined]
except Exception:
commander_meta = {}
names = commander_meta.get("commander_names") or []
if names:
meta["commander_names"] = names
combined_payload = commander_meta.get("combined_commander")
if combined_payload:
meta["combined_commander"] = combined_payload
partner_mode = commander_meta.get("partner_mode")
if partner_mode:
meta["partner_mode"] = partner_mode
color_identity = commander_meta.get("color_identity")
if color_identity:
meta["color_identity"] = color_identity
primary_commander = commander_meta.get("primary_commander")
if primary_commander:
meta["commander"] = primary_commander
secondary_commander = commander_meta.get("secondary_commander")
if secondary_commander:
meta["secondary_commander"] = secondary_commander
try:
custom_base = getattr(b, 'custom_export_base', None)
except Exception:
@ -2961,6 +3235,28 @@ def run_stage(ctx: Dict[str, Any], rerun: bool = False, show_skipped: bool = Fal
"csv": ctx.get("csv_path"),
"txt": ctx.get("txt_path"),
}
try:
commander_meta = b.get_commander_export_metadata() # type: ignore[attr-defined]
except Exception:
commander_meta = {}
names = commander_meta.get("commander_names") or []
if names:
meta["commander_names"] = names
combined_payload = commander_meta.get("combined_commander")
if combined_payload:
meta["combined_commander"] = combined_payload
partner_mode = commander_meta.get("partner_mode")
if partner_mode:
meta["partner_mode"] = partner_mode
color_identity = commander_meta.get("color_identity")
if color_identity:
meta["color_identity"] = color_identity
primary_commander = commander_meta.get("primary_commander")
if primary_commander:
meta["commander"] = primary_commander
secondary_commander = commander_meta.get("secondary_commander")
if secondary_commander:
meta["secondary_commander"] = secondary_commander
try:
custom_base = getattr(b, 'custom_export_base', None)
except Exception:

View file

@ -0,0 +1,595 @@
"""Partner suggestion dataset loader and scoring utilities."""
from __future__ import annotations
from dataclasses import dataclass
import json
import os
from pathlib import Path
from threading import Lock
from types import SimpleNamespace
from typing import Any, Iterable, Mapping, Optional, Sequence
from code.logging_util import get_logger
from deck_builder.combined_commander import CombinedCommander, PartnerMode, build_combined_commander
from deck_builder.suggestions import (
PartnerSuggestionContext,
ScoreResult,
is_noise_theme,
score_partner_candidate,
)
from deck_builder.color_identity_utils import canon_color_code, color_label_from_code
from deck_builder.partner_selection import normalize_lookup_name
from exceptions import CommanderPartnerError
LOGGER = get_logger(__name__)
_COLOR_NAME_MAP = {
"W": "White",
"U": "Blue",
"B": "Black",
"R": "Red",
"G": "Green",
"C": "Colorless",
}
_MODE_DISPLAY = {
PartnerMode.PARTNER.value: "Partner",
PartnerMode.PARTNER_WITH.value: "Partner With",
PartnerMode.BACKGROUND.value: "Choose a Background",
PartnerMode.DOCTOR_COMPANION.value: "Doctor & Companion",
}
_NOTE_LABELS = {
"partner_with_match": "Canonical Partner With pair",
"background_compatible": "Ideal background match",
"doctor_companion_match": "Doctor ↔ Companion pairing",
"shared_partner_keyword": "Both commanders have Partner",
"restricted_label_match": "Restricted partner label matches",
"observed_pairing": "Popular pairing in exported decks",
}
def _to_tuple(values: Iterable[str] | None) -> tuple[str, ...]:
if not values:
return tuple()
result: list[str] = []
seen: set[str] = set()
for value in values:
token = str(value or "").strip()
if not token:
continue
key = token.casefold()
if key in seen:
continue
seen.add(key)
result.append(token)
return tuple(result)
def _normalize(value: str | None) -> str:
return normalize_lookup_name(value)
def _color_code(identity: Iterable[str]) -> str:
code = canon_color_code(tuple(identity))
return code or "C"
def _color_label(identity: Iterable[str]) -> str:
return color_label_from_code(_color_code(identity))
def _mode_label(mode: PartnerMode | str | None) -> str:
if isinstance(mode, PartnerMode):
return _MODE_DISPLAY.get(mode.value, mode.value)
if isinstance(mode, str):
return _MODE_DISPLAY.get(mode, mode.title())
return "Partner Mechanics"
@dataclass(frozen=True)
class CommanderEntry:
"""Commander metadata extracted from the partner synergy dataset."""
key: str
name: str
display_name: str
payload: Mapping[str, Any]
partner_payload: Mapping[str, Any]
color_identity: tuple[str, ...]
themes: tuple[str, ...]
role_tags: tuple[str, ...]
def to_source(self) -> SimpleNamespace:
partner = self.partner_payload
partner_with = _to_tuple(partner.get("partner_with"))
supports_backgrounds = bool(partner.get("supports_backgrounds") or partner.get("choose_background"))
is_partner = bool(partner.get("has_partner") or partner.get("has_plain_partner"))
is_background = bool(partner.get("is_background"))
is_doctor = bool(partner.get("is_doctor"))
is_companion = bool(partner.get("is_doctors_companion"))
restricted_labels = _to_tuple(partner.get("restricted_partner_labels"))
return SimpleNamespace(
name=self.name,
display_name=self.display_name,
color_identity=self.color_identity,
colors=self.color_identity,
themes=self.themes,
theme_tags=self.themes,
raw_tags=self.themes,
partner_with=partner_with,
supports_backgrounds=supports_backgrounds,
is_partner=is_partner,
is_background=is_background,
is_doctor=is_doctor,
is_doctors_companion=is_companion,
restricted_partner_labels=restricted_labels,
oracle_text="",
type_line="",
)
@property
def canonical(self) -> str:
return self.key
@dataclass(frozen=True)
class PartnerSuggestionResult:
"""Structured partner suggestions grouped by mode."""
commander: str
display_name: str
canonical: str
metadata: Mapping[str, Any]
by_mode: Mapping[str, list[dict[str, Any]]]
total: int
def flatten(
self,
partner_names: Iterable[str],
background_names: Iterable[str],
*,
visible_limit: int = 3,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
partner_allowed = {_normalize(name) for name in partner_names if name}
background_allowed = {_normalize(name) for name in background_names if name}
ordered_modes = [
PartnerMode.PARTNER_WITH.value,
PartnerMode.PARTNER.value,
PartnerMode.DOCTOR_COMPANION.value,
PartnerMode.BACKGROUND.value,
]
visible: list[dict[str, Any]] = []
hidden: list[dict[str, Any]] = []
for mode_key in ordered_modes:
suggestions = self.by_mode.get(mode_key, [])
for suggestion in suggestions:
name_key = _normalize(suggestion.get("name"))
if mode_key == PartnerMode.BACKGROUND.value:
if name_key not in background_allowed:
continue
else:
if name_key not in partner_allowed:
continue
target = visible if len(visible) < visible_limit else hidden
target.append(suggestion)
return visible, hidden
class PartnerSuggestionDataset:
"""Cached partner synergy dataset accessor."""
def __init__(self, path: Path) -> None:
self.path = path
self._payload: Optional[dict[str, Any]] = None
self._metadata: dict[str, Any] = {}
self._entries: dict[str, CommanderEntry] = {}
self._lookup: dict[str, CommanderEntry] = {}
self._pairing_counts: dict[tuple[str, str, str], int] = {}
self._context: PartnerSuggestionContext = PartnerSuggestionContext()
self._mtime_ns: int = -1
@property
def metadata(self) -> Mapping[str, Any]:
return self._metadata
@property
def context(self) -> PartnerSuggestionContext:
return self._context
def ensure_loaded(self, *, force: bool = False) -> None:
if not self.path.exists():
raise FileNotFoundError(self.path)
stat = self.path.stat()
if not force and self._payload is not None and stat.st_mtime_ns == self._mtime_ns:
return
raw = json.loads(self.path.read_text(encoding="utf-8") or "{}")
if not isinstance(raw, dict):
raise ValueError("partner synergy dataset is not a JSON object")
commanders = raw.get("commanders") or {}
if not isinstance(commanders, Mapping):
raise ValueError("commanders section missing in partner synergy dataset")
entries: dict[str, CommanderEntry] = {}
lookup: dict[str, CommanderEntry] = {}
for key, payload in commanders.items():
if not isinstance(payload, Mapping):
continue
display = str(payload.get("display_name") or payload.get("name") or key or "").strip()
if not display:
continue
name = str(payload.get("name") or display)
partner_payload = payload.get("partner") or {}
if not isinstance(partner_payload, Mapping):
partner_payload = {}
color_identity = _to_tuple(payload.get("color_identity"))
themes = tuple(
theme for theme in _to_tuple(payload.get("themes")) if not is_noise_theme(theme)
)
role_tags = _to_tuple(payload.get("role_tags"))
entry = CommanderEntry(
key=str(key),
name=name,
display_name=display,
payload=payload,
partner_payload=partner_payload,
color_identity=color_identity,
themes=themes,
role_tags=role_tags,
)
entries[entry.canonical] = entry
aliases = {
_normalize(entry.canonical),
_normalize(entry.display_name),
_normalize(entry.name),
}
for alias in aliases:
if alias and alias not in lookup:
lookup[alias] = entry
pairings: dict[tuple[str, str, str], int] = {}
pairing_block = raw.get("pairings") or {}
records = pairing_block.get("records") if isinstance(pairing_block, Mapping) else None
if isinstance(records, Sequence):
for record in records:
if not isinstance(record, Mapping):
continue
mode = str(record.get("mode") or "unknown").strip().replace("-", "_")
primary_key = _normalize(record.get("primary_canonical") or record.get("primary"))
secondary_key = _normalize(record.get("secondary_canonical") or record.get("secondary"))
if not mode or not primary_key or not secondary_key:
continue
try:
count = int(record.get("count", 0))
except Exception:
count = 0
if count <= 0:
continue
pairings[(mode, primary_key, secondary_key)] = count
pairings[(mode, secondary_key, primary_key)] = count
self._payload = raw
self._metadata = dict(raw.get("metadata") or {})
self._entries = entries
self._lookup = lookup
self._pairing_counts = pairings
self._context = PartnerSuggestionContext.from_dataset(raw)
self._mtime_ns = stat.st_mtime_ns
def lookup(self, name: str) -> Optional[CommanderEntry]:
key = _normalize(name)
if not key:
return None
entry = self._lookup.get(key)
if entry is not None:
return entry
return self._entries.get(key)
def entries(self) -> Iterable[CommanderEntry]:
return self._entries.values()
def pairing_count(self, mode: PartnerMode, primary: CommanderEntry, secondary: CommanderEntry) -> int:
return int(self._pairing_counts.get((mode.value, primary.canonical, secondary.canonical), 0))
def build_combined(
self,
primary: CommanderEntry,
candidate: CommanderEntry,
mode: PartnerMode,
) -> CombinedCommander:
primary_src = primary.to_source()
candidate_src = candidate.to_source()
return build_combined_commander(primary_src, candidate_src, mode)
ROOT_DIR = Path(__file__).resolve().parents[3]
DEFAULT_DATASET_PATH = (ROOT_DIR / "config" / "analytics" / "partner_synergy.json").resolve()
_DATASET_ENV_VAR = "PARTNER_SUGGESTIONS_DATASET"
_ENV_OVERRIDE = os.getenv(_DATASET_ENV_VAR)
_DATASET_PATH: Path = Path(_ENV_OVERRIDE).expanduser().resolve() if _ENV_OVERRIDE else DEFAULT_DATASET_PATH
_DATASET_CACHE: Optional[PartnerSuggestionDataset] = None
_DATASET_LOCK = Lock()
_DATASET_REFRESH_ATTEMPTED = False
def configure_dataset_path(path: str | Path | None) -> None:
"""Override the dataset path (primarily for tests)."""
global _DATASET_PATH, _DATASET_CACHE
if path is None:
_DATASET_PATH = DEFAULT_DATASET_PATH
os.environ.pop(_DATASET_ENV_VAR, None)
else:
resolved = Path(path).expanduser().resolve()
_DATASET_PATH = resolved
os.environ[_DATASET_ENV_VAR] = str(resolved)
_DATASET_CACHE = None
def load_dataset(*, force: bool = False, refresh: bool = False) -> Optional[PartnerSuggestionDataset]:
"""Return the cached dataset, reloading if needed.
Args:
force: When True, bypass the in-memory cache and reload the dataset from disk.
refresh: When True, attempt to regenerate the dataset before loading. This
resets the "already tried" guard so manual refresh actions can retry
regeneration after an earlier failure.
"""
global _DATASET_CACHE, _DATASET_REFRESH_ATTEMPTED
with _DATASET_LOCK:
if refresh:
_DATASET_REFRESH_ATTEMPTED = False
_DATASET_CACHE = None
dataset = _DATASET_CACHE
if dataset is None or force or refresh:
dataset = PartnerSuggestionDataset(_DATASET_PATH)
try:
dataset.ensure_loaded(force=force or refresh or dataset is not _DATASET_CACHE)
except FileNotFoundError:
LOGGER.debug("partner suggestions dataset missing at %s", _DATASET_PATH)
# Attempt to materialize the dataset automatically when using the default path.
allow_auto_refresh = (
_DATASET_PATH == DEFAULT_DATASET_PATH
and (refresh or not _DATASET_REFRESH_ATTEMPTED)
)
if allow_auto_refresh:
_DATASET_REFRESH_ATTEMPTED = True
try:
from .orchestrator import _maybe_refresh_partner_synergy # type: ignore
_maybe_refresh_partner_synergy(None, force=True)
except Exception as refresh_exc: # pragma: no cover - best-effort
LOGGER.debug(
"partner suggestions auto-refresh failed: %s",
refresh_exc,
exc_info=True,
)
try:
dataset.ensure_loaded(force=True)
except FileNotFoundError:
LOGGER.debug(
"partner suggestions dataset still missing after auto-refresh",
exc_info=True,
)
if refresh:
_DATASET_REFRESH_ATTEMPTED = False
_DATASET_CACHE = None
return None
except Exception as exc: # pragma: no cover - defensive logging
LOGGER.warning("partner suggestions dataset failed after auto-refresh", exc_info=exc)
if refresh:
_DATASET_REFRESH_ATTEMPTED = False
_DATASET_CACHE = None
return None
else:
_DATASET_CACHE = None
return None
except Exception as exc: # pragma: no cover - defensive logging
LOGGER.warning("partner suggestions dataset failed to load", exc_info=exc)
_DATASET_CACHE = None
return None
_DATASET_CACHE = dataset
return dataset
def _shared_restriction_label(primary: CommanderEntry, candidate: CommanderEntry) -> Optional[str]:
primary_labels = set(_to_tuple(primary.partner_payload.get("restricted_partner_labels")))
candidate_labels = set(_to_tuple(candidate.partner_payload.get("restricted_partner_labels")))
shared = primary_labels & candidate_labels
if not shared:
return None
return sorted(shared, key=str.casefold)[0]
def _color_delta(primary: CommanderEntry, combined: CombinedCommander) -> dict[str, list[str]]:
primary_colors = {color.upper() for color in primary.color_identity}
combined_colors = {color.upper() for color in combined.color_identity or ()}
added = [
_COLOR_NAME_MAP.get(color, color)
for color in sorted(combined_colors - primary_colors)
]
removed = [
_COLOR_NAME_MAP.get(color, color)
for color in sorted(primary_colors - combined_colors)
]
return {
"added": added,
"removed": removed,
}
def _reason_summary(
result: ScoreResult,
shared_themes: Sequence[str],
pairing_count: int,
color_delta: Mapping[str, Sequence[str]],
) -> tuple[str, list[str]]:
parts: list[str] = []
details: list[str] = []
score_percent = int(round(max(0.0, min(1.0, result.score)) * 100))
parts.append(f"{score_percent}% match")
if shared_themes:
label = ", ".join(shared_themes[:2])
parts.append(f"Shared themes: {label}")
if pairing_count > 0:
parts.append(f"Seen in {pairing_count} decks")
for note in result.notes:
label = _NOTE_LABELS.get(note)
if label and label not in details:
details.append(label)
if not details and pairing_count > 0:
details.append(f"Observed together {pairing_count} time(s)")
added = color_delta.get("added") or []
if added:
details.append("Adds " + ", ".join(added))
overlap_component = float(result.components.get("overlap", 0.0))
if overlap_component >= 0.35 and len(parts) < 3:
percent = int(round(overlap_component * 100))
details.append(f"Theme overlap {percent}%")
summary = "".join(parts[:3])
return summary, details
def _build_suggestion_payload(
primary: CommanderEntry,
candidate: CommanderEntry,
mode: PartnerMode,
result: ScoreResult,
combined: CombinedCommander,
pairing_count: int,
) -> dict[str, Any]:
shared_themes = sorted(
{
theme
for theme in primary.themes
if theme in candidate.themes and not is_noise_theme(theme)
},
key=str.casefold,
)
color_delta = _color_delta(primary, combined)
summary, details = _reason_summary(result, shared_themes, pairing_count, color_delta)
suggestion = {
"name": candidate.display_name,
"mode": mode.value,
"mode_label": _mode_label(mode),
"score": max(0.0, min(1.0, float(result.score))),
"score_percent": int(round(max(0.0, min(1.0, float(result.score))) * 100)),
"score_components": dict(result.components),
"notes": list(result.notes),
"shared_themes": shared_themes,
"candidate_themes": list(candidate.themes),
"theme_tags": list(combined.theme_tags or ()),
"summary": summary,
"reasons": details,
"pairing_count": pairing_count,
"color_code": combined.color_code or _color_code(combined.color_identity or ()),
"color_label": combined.color_label or _color_label(combined.color_identity or ()),
"color_identity": list(combined.color_identity or ()),
"candidate_colors": list(candidate.color_identity),
"primary_colors": list(combined.primary_color_identity or primary.color_identity),
"secondary_colors": list(combined.secondary_color_identity or candidate.color_identity),
"color_delta": color_delta,
"restriction_label": _shared_restriction_label(primary, candidate),
}
if combined.secondary_name:
suggestion["secondary_name"] = combined.secondary_name
suggestion["preview"] = {
"primary_name": combined.primary_name,
"secondary_name": combined.secondary_name,
"partner_mode": mode.value,
"partner_mode_label": _mode_label(mode),
"color_label": suggestion["color_label"],
"color_code": suggestion["color_code"],
"theme_tags": list(combined.theme_tags or ()),
"secondary_role_label": getattr(combined, "secondary_name", None) and (
"Background" if mode is PartnerMode.BACKGROUND else (
"Doctor's Companion" if mode is PartnerMode.DOCTOR_COMPANION else "Partner commander"
)
),
}
return suggestion
def get_partner_suggestions(
commander_name: str,
*,
limit_per_mode: int = 5,
include_modes: Optional[Sequence[PartnerMode]] = None,
min_score: float = 0.15,
refresh_dataset: bool = False,
) -> Optional[PartnerSuggestionResult]:
dataset = load_dataset(force=refresh_dataset, refresh=refresh_dataset)
if dataset is None:
return None
primary_entry = dataset.lookup(commander_name)
if primary_entry is None:
return PartnerSuggestionResult(
commander=commander_name,
display_name=commander_name,
canonical=_normalize(commander_name) or commander_name,
metadata=dataset.metadata,
by_mode={},
total=0,
)
allowed_modes = set(include_modes) if include_modes else {
PartnerMode.PARTNER,
PartnerMode.PARTNER_WITH,
PartnerMode.BACKGROUND,
PartnerMode.DOCTOR_COMPANION,
}
grouped: dict[str, list[dict[str, Any]]] = {
PartnerMode.PARTNER.value: [],
PartnerMode.PARTNER_WITH.value: [],
PartnerMode.BACKGROUND.value: [],
PartnerMode.DOCTOR_COMPANION.value: [],
}
total = 0
primary_source = primary_entry.payload
context = dataset.context
for candidate_entry in dataset.entries():
if candidate_entry.canonical == primary_entry.canonical:
continue
try:
result = score_partner_candidate(primary_source, candidate_entry.payload, context=context)
except Exception: # pragma: no cover - defensive scoring guard
continue
mode = result.mode
if mode is PartnerMode.NONE or mode not in allowed_modes:
continue
if result.score < min_score:
continue
try:
combined = dataset.build_combined(primary_entry, candidate_entry, mode)
except CommanderPartnerError:
continue
except Exception: # pragma: no cover - defensive
continue
pairing_count = dataset.pairing_count(mode, primary_entry, candidate_entry)
suggestion = _build_suggestion_payload(primary_entry, candidate_entry, mode, result, combined, pairing_count)
grouped[mode.value].append(suggestion)
total += 1
for mode_key, suggestions in grouped.items():
suggestions.sort(key=lambda item: (-float(item.get("score", 0.0)), item.get("name", "").casefold()))
if limit_per_mode > 0:
grouped[mode_key] = suggestions[:limit_per_mode]
return PartnerSuggestionResult(
commander=primary_entry.display_name,
display_name=primary_entry.display_name,
canonical=primary_entry.canonical,
metadata=dataset.metadata,
by_mode=grouped,
total=sum(len(s) for s in grouped.values()),
)

View file

@ -2,16 +2,19 @@ from __future__ import annotations
import json
import logging
from typing import Any, Dict
from typing import Any, Dict, Mapping, Optional, Sequence
from fastapi import Request
__all__ = [
"log_commander_page_view",
"log_commander_create_deck",
"log_partner_suggestions_generated",
"log_partner_suggestion_selected",
]
_LOGGER = logging.getLogger("web.commander_browser")
_PARTNER_LOGGER = logging.getLogger("web.partner_suggestions")
def _emit(logger: logging.Logger, payload: Dict[str, Any]) -> None:
@ -104,3 +107,113 @@ def log_commander_create_deck(
"client_ip": _client_ip(request),
}
_emit(_LOGGER, payload)
def _extract_dataset_metadata(metadata: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(metadata, Mapping):
return {}
snapshot: Dict[str, Any] = {}
for key in ("dataset_version", "generated_at", "record_count", "entry_count", "build_id"):
if key in metadata:
snapshot[key] = metadata[key]
if not snapshot:
# Fall back to a small subset to avoid logging the full metadata document.
for key, value in list(metadata.items())[:5]:
snapshot[key] = value
return snapshot
def log_partner_suggestions_generated(
request: Request,
*,
commander_display: str,
commander_canonical: str,
include_modes: Sequence[str] | None,
available_modes: Sequence[str],
total: int,
mode_counts: Mapping[str, int],
visible_count: int,
hidden_count: int,
limit_per_mode: int,
visible_limit: int,
include_hidden: bool,
refresh_requested: bool,
dataset_metadata: Mapping[str, Any] | None = None,
) -> None:
payload: Dict[str, Any] = {
"event": "partner_suggestions.generated",
"request_id": _request_id(request),
"path": str(request.url.path),
"query": _query_snapshot(request),
"commander": {
"display": commander_display,
"canonical": commander_canonical,
},
"limits": {
"per_mode": int(limit_per_mode),
"visible": int(visible_limit),
"include_hidden": bool(include_hidden),
},
"result": {
"total": int(total),
"visible_count": int(visible_count),
"hidden_count": int(hidden_count),
"available_modes": list(available_modes),
"mode_counts": {str(key): int(value) for key, value in mode_counts.items()},
"metadata": _extract_dataset_metadata(dataset_metadata),
},
"filters": {
"include_modes": [str(mode) for mode in (include_modes or [])],
"refresh": bool(refresh_requested),
},
"client_ip": _client_ip(request),
}
_emit(_PARTNER_LOGGER, payload)
def log_partner_suggestion_selected(
request: Request,
*,
commander: str,
scope: str | None,
partner_enabled: bool,
auto_opt_out: bool,
auto_assigned: bool,
selection_source: Optional[str],
secondary_candidate: str | None,
background_candidate: str | None,
resolved_secondary: str | None,
resolved_background: str | None,
partner_mode: str | None,
has_preview: bool,
warnings: Sequence[str] | None,
error: str | None,
) -> None:
payload: Dict[str, Any] = {
"event": "partner_suggestions.selected",
"request_id": _request_id(request),
"path": str(request.url.path),
"scope": scope or "",
"commander": commander,
"partner_enabled": bool(partner_enabled),
"auto_opt_out": bool(auto_opt_out),
"auto_assigned": bool(auto_assigned),
"selection_source": (selection_source or "") or None,
"inputs": {
"secondary_candidate": secondary_candidate,
"background_candidate": background_candidate,
},
"resolved": {
"partner_mode": partner_mode,
"secondary": resolved_secondary,
"background": resolved_background,
},
"preview_available": bool(has_preview),
"warnings_count": len(warnings or []),
"has_error": bool(error),
"error": error,
"client_ip": _client_ip(request),
}
if warnings:
payload["warnings"] = list(warnings)
_emit(_PARTNER_LOGGER, payload)