mirror of
https://github.com/mwisnowski/mtg_python_deckbuilder.git
synced 2025-12-18 00:20:13 +01:00
feat: add supplemental theme catalog tooling, additional theme selection, and custom theme selection
This commit is contained in:
parent
3a1b011dbc
commit
9428e09cef
39 changed files with 3643 additions and 198 deletions
|
|
@ -41,6 +41,13 @@ from .phases.phase6_reporting import ReportingMixin
|
|||
# Local application imports
|
||||
from . import builder_constants as bc
|
||||
from . import builder_utils as bu
|
||||
from deck_builder.theme_context import (
|
||||
ThemeContext,
|
||||
build_theme_context,
|
||||
default_user_theme_weight,
|
||||
theme_summary_payload,
|
||||
)
|
||||
from deck_builder.theme_resolution import ThemeResolutionInfo
|
||||
import os
|
||||
from settings import CSV_DIRECTORY
|
||||
from file_setup.setup import initial_setup
|
||||
|
|
@ -113,6 +120,35 @@ class DeckBuilder(
|
|||
except Exception:
|
||||
# Leave RNG as-is on unexpected error
|
||||
pass
|
||||
|
||||
def _theme_context_signature(self) -> Tuple[Any, ...]:
|
||||
resolved = tuple(
|
||||
str(tag) for tag in getattr(self, 'user_theme_resolved', []) if isinstance(tag, str)
|
||||
)
|
||||
resolution = getattr(self, 'user_theme_resolution', None)
|
||||
resolution_id = id(resolution) if resolution is not None else None
|
||||
return (
|
||||
str(getattr(self, 'primary_tag', '') or ''),
|
||||
str(getattr(self, 'secondary_tag', '') or ''),
|
||||
str(getattr(self, 'tertiary_tag', '') or ''),
|
||||
tuple(str(tag) for tag in getattr(self, 'selected_tags', []) if isinstance(tag, str)),
|
||||
resolved,
|
||||
str(getattr(self, 'tag_mode', 'AND') or 'AND').upper(),
|
||||
round(float(getattr(self, 'user_theme_weight', 1.0)), 4),
|
||||
resolution_id,
|
||||
)
|
||||
|
||||
def get_theme_context(self) -> ThemeContext:
|
||||
signature = self._theme_context_signature()
|
||||
if self._theme_context_cache is None or self._theme_context_cache_key != signature:
|
||||
context = build_theme_context(self)
|
||||
self._theme_context_cache = context
|
||||
self._theme_context_cache_key = signature
|
||||
return self._theme_context_cache
|
||||
|
||||
def get_theme_summary_payload(self) -> Dict[str, Any]:
|
||||
context = self.get_theme_context()
|
||||
return theme_summary_payload(context)
|
||||
def build_deck_full(self):
|
||||
"""Orchestrate the full deck build process, chaining all major phases."""
|
||||
start_ts = datetime.datetime.now()
|
||||
|
|
@ -424,6 +460,19 @@ class DeckBuilder(
|
|||
# Diagnostics storage for include/exclude processing
|
||||
include_exclude_diagnostics: Optional[Dict[str, Any]] = None
|
||||
|
||||
# Supplemental user themes (M4: Config & Headless Support)
|
||||
user_theme_requested: List[str] = field(default_factory=list)
|
||||
user_theme_resolved: List[str] = field(default_factory=list)
|
||||
user_theme_matches: List[Dict[str, Any]] = field(default_factory=list)
|
||||
user_theme_unresolved: List[Dict[str, Any]] = field(default_factory=list)
|
||||
user_theme_fuzzy_corrections: Dict[str, str] = field(default_factory=dict)
|
||||
theme_match_mode: str = "permissive"
|
||||
theme_catalog_version: Optional[str] = None
|
||||
user_theme_weight: float = field(default_factory=default_user_theme_weight)
|
||||
user_theme_resolution: Optional[ThemeResolutionInfo] = None
|
||||
_theme_context_cache: Optional[ThemeContext] = field(default=None, init=False, repr=False)
|
||||
_theme_context_cache_key: Optional[Tuple[Any, ...]] = field(default=None, init=False, repr=False)
|
||||
|
||||
# Deck library (cards added so far) mapping name->record
|
||||
card_library: Dict[str, Dict[str, Any]] = field(default_factory=dict)
|
||||
# Tag tracking: counts of unique cards per tag (not per copy)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from typing import List, Dict
|
|||
|
||||
from .. import builder_constants as bc
|
||||
from .. import builder_utils as bu
|
||||
from ..theme_context import annotate_theme_matches
|
||||
import logging_util
|
||||
|
||||
logger = logging_util.logging.getLogger(__name__)
|
||||
|
|
@ -31,48 +32,20 @@ class CreatureAdditionMixin:
|
|||
if 'type' not in df.columns:
|
||||
self.output_func("Card pool missing 'type' column; cannot add creatures.")
|
||||
return
|
||||
themes_ordered: List[tuple[str, str]] = []
|
||||
if self.primary_tag:
|
||||
themes_ordered.append(('primary', self.primary_tag))
|
||||
if self.secondary_tag:
|
||||
themes_ordered.append(('secondary', self.secondary_tag))
|
||||
if self.tertiary_tag:
|
||||
themes_ordered.append(('tertiary', self.tertiary_tag))
|
||||
if not themes_ordered:
|
||||
try:
|
||||
context = self.get_theme_context() # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
context = None
|
||||
if context is None or not getattr(context, 'ordered_targets', []):
|
||||
self.output_func("No themes selected; skipping creature addition.")
|
||||
return
|
||||
themes_ordered = list(context.ordered_targets)
|
||||
selected_tags_lower = context.selected_slugs()
|
||||
if not themes_ordered or not selected_tags_lower:
|
||||
self.output_func("No themes selected; skipping creature addition.")
|
||||
return
|
||||
desired_total = (self.ideal_counts.get('creatures') if getattr(self, 'ideal_counts', None) else None) or getattr(bc, 'DEFAULT_CREATURE_COUNT', 25)
|
||||
n_themes = len(themes_ordered)
|
||||
if n_themes == 1:
|
||||
base_map = {'primary': 1.0}
|
||||
elif n_themes == 2:
|
||||
base_map = {'primary': 0.6, 'secondary': 0.4}
|
||||
else:
|
||||
base_map = {'primary': 0.5, 'secondary': 0.3, 'tertiary': 0.2}
|
||||
weights: Dict[str, float] = {}
|
||||
boosted_roles: set[str] = set()
|
||||
if n_themes > 1:
|
||||
for role, tag in themes_ordered:
|
||||
w = base_map.get(role, 0.0)
|
||||
lt = tag.lower()
|
||||
if 'kindred' in lt or 'tribal' in lt:
|
||||
mult = getattr(bc, 'WEIGHT_ADJUSTMENT_FACTORS', {}).get(f'kindred_{role}', 1.0)
|
||||
w *= mult
|
||||
boosted_roles.add(role)
|
||||
weights[role] = w
|
||||
total = sum(weights.values())
|
||||
if total > 1.0:
|
||||
for r in list(weights):
|
||||
weights[r] /= total
|
||||
else:
|
||||
rem = 1.0 - total
|
||||
base_sum_unboosted = sum(base_map[r] for r,_t in themes_ordered if r not in boosted_roles)
|
||||
if rem > 1e-6 and base_sum_unboosted > 0:
|
||||
for r,_t in themes_ordered:
|
||||
if r not in boosted_roles:
|
||||
weights[r] += rem * (base_map[r] / base_sum_unboosted)
|
||||
else:
|
||||
weights['primary'] = 1.0
|
||||
weights: Dict[str, float] = dict(getattr(context, 'weights', {}))
|
||||
creature_df = df[df['type'].str.contains('Creature', case=False, na=False)].copy()
|
||||
commander_name = getattr(self, 'commander', None) or getattr(self, 'commander_name', None)
|
||||
if commander_name and 'name' in creature_df.columns:
|
||||
|
|
@ -80,12 +53,9 @@ class CreatureAdditionMixin:
|
|||
if creature_df.empty:
|
||||
self.output_func("No creature rows in dataset; skipping.")
|
||||
return
|
||||
selected_tags_lower = [t.lower() for _r,t in themes_ordered]
|
||||
if '_parsedThemeTags' not in creature_df.columns:
|
||||
creature_df['_parsedThemeTags'] = creature_df['themeTags'].apply(bu.normalize_tag_cell)
|
||||
creature_df['_normTags'] = creature_df['_parsedThemeTags']
|
||||
creature_df['_multiMatch'] = creature_df['_normTags'].apply(lambda lst: sum(1 for t in selected_tags_lower if t in lst))
|
||||
combine_mode = getattr(self, 'tag_mode', 'AND')
|
||||
creature_df = annotate_theme_matches(creature_df, context)
|
||||
selected_tags_lower = context.selected_slugs()
|
||||
combine_mode = context.combine_mode
|
||||
base_top = 30
|
||||
top_n = int(base_top * getattr(bc, 'THEME_POOL_SIZE_MULTIPLIER', 2.0))
|
||||
synergy_bonus = getattr(bc, 'THEME_PRIORITY_BONUS', 1.2)
|
||||
|
|
@ -116,10 +86,20 @@ class CreatureAdditionMixin:
|
|||
owned_lower = {str(n).lower() for n in getattr(self, 'owned_card_names', set())} if getattr(self, 'prefer_owned', False) else set()
|
||||
owned_mult = getattr(bc, 'PREFER_OWNED_WEIGHT_MULTIPLIER', 1.25)
|
||||
weighted_pool = []
|
||||
for nm in subset_all['name'].tolist():
|
||||
bonus = getattr(context, 'match_bonus', 0.0)
|
||||
user_matches = subset_all['_userMatch'] if '_userMatch' in subset_all.columns else None
|
||||
names_list = subset_all['name'].tolist()
|
||||
for idx, nm in enumerate(names_list):
|
||||
w = weight_strong
|
||||
if owned_lower and str(nm).lower() in owned_lower:
|
||||
w *= owned_mult
|
||||
if user_matches is not None:
|
||||
try:
|
||||
u_count = max(0.0, float(user_matches.iloc[idx]))
|
||||
except Exception:
|
||||
u_count = 0.0
|
||||
if bonus > 1e-9 and u_count > 0:
|
||||
w *= (1.0 + bonus * u_count)
|
||||
weighted_pool.append((nm, w))
|
||||
chosen_all = bu.weighted_sample_without_replacement(weighted_pool, target_cap, rng=getattr(self, 'rng', None))
|
||||
for nm in chosen_all:
|
||||
|
|
@ -127,12 +107,13 @@ class CreatureAdditionMixin:
|
|||
continue
|
||||
row = subset_all[subset_all['name'] == nm].iloc[0]
|
||||
# Which selected themes does this card hit?
|
||||
selected_display_tags = [t for _r, t in themes_ordered]
|
||||
norm_tags = row.get('_normTags', []) if isinstance(row.get('_normTags', []), list) else []
|
||||
try:
|
||||
hits = [t for t in selected_display_tags if str(t).lower() in norm_tags]
|
||||
except Exception:
|
||||
hits = selected_display_tags
|
||||
hits = row.get('_matchTags', [])
|
||||
if not isinstance(hits, list):
|
||||
try:
|
||||
hits = list(hits)
|
||||
except Exception:
|
||||
hits = []
|
||||
match_score = row.get('_matchScore', row.get('_multiMatch', all_cnt))
|
||||
self.add_card(
|
||||
nm,
|
||||
card_type=row.get('type','Creature'),
|
||||
|
|
@ -144,7 +125,7 @@ class CreatureAdditionMixin:
|
|||
sub_role='all_theme',
|
||||
added_by='creature_all_theme',
|
||||
trigger_tag=", ".join(hits) if hits else None,
|
||||
synergy=int(row.get('_multiMatch', all_cnt)) if '_multiMatch' in row else all_cnt
|
||||
synergy=int(round(match_score)) if match_score is not None else int(row.get('_multiMatch', all_cnt))
|
||||
)
|
||||
added_names.append(nm)
|
||||
all_theme_added.append((nm, hits))
|
||||
|
|
@ -153,30 +134,42 @@ class CreatureAdditionMixin:
|
|||
break
|
||||
self.output_func(f"All-Theme AND Pre-Pass: added {len(all_theme_added)} / {target_cap} (matching all {all_cnt} themes)")
|
||||
# Per-theme distribution
|
||||
per_theme_added: Dict[str, List[str]] = {r: [] for r,_t in themes_ordered}
|
||||
for role, tag in themes_ordered:
|
||||
w = weights.get(role, 0.0)
|
||||
per_theme_added: Dict[str, List[str]] = {target.role: [] for target in themes_ordered}
|
||||
for target in themes_ordered:
|
||||
role = target.role
|
||||
tag = target.display
|
||||
slug = target.slug or (str(tag).lower() if tag else "")
|
||||
w = weights.get(role, target.weight if hasattr(target, 'weight') else 0.0)
|
||||
if w <= 0:
|
||||
continue
|
||||
remaining = max(0, desired_total - total_added)
|
||||
if remaining == 0:
|
||||
break
|
||||
target = int(math.ceil(desired_total * w * self._get_rng().uniform(1.0, 1.1)))
|
||||
target = min(target, remaining)
|
||||
if target <= 0:
|
||||
target_count = int(math.ceil(desired_total * w * self._get_rng().uniform(1.0, 1.1)))
|
||||
target_count = min(target_count, remaining)
|
||||
if target_count <= 0:
|
||||
continue
|
||||
tnorm = tag.lower()
|
||||
subset = creature_df[creature_df['_normTags'].apply(lambda lst, tn=tnorm: (tn in lst) or any(tn in x for x in lst))]
|
||||
subset = creature_df[creature_df['_normTags'].apply(lambda lst, tn=slug: (tn in lst) or any(tn in (item or '') for item in lst))]
|
||||
if combine_mode == 'AND' and len(selected_tags_lower) > 1:
|
||||
if (creature_df['_multiMatch'] >= 2).any():
|
||||
subset = subset[subset['_multiMatch'] >= 2]
|
||||
if subset.empty:
|
||||
self.output_func(f"Theme '{tag}' produced no creature candidates.")
|
||||
continue
|
||||
sort_cols: List[str] = []
|
||||
asc: List[bool] = []
|
||||
if '_matchScore' in subset.columns:
|
||||
sort_cols.append('_matchScore')
|
||||
asc.append(False)
|
||||
sort_cols.append('_multiMatch')
|
||||
asc.append(False)
|
||||
if 'edhrecRank' in subset.columns:
|
||||
subset = subset.sort_values(by=['_multiMatch','edhrecRank','manaValue'], ascending=[False, True, True], na_position='last')
|
||||
elif 'manaValue' in subset.columns:
|
||||
subset = subset.sort_values(by=['_multiMatch','manaValue'], ascending=[False, True], na_position='last')
|
||||
sort_cols.append('edhrecRank')
|
||||
asc.append(True)
|
||||
if 'manaValue' in subset.columns:
|
||||
sort_cols.append('manaValue')
|
||||
asc.append(True)
|
||||
subset = subset.sort_values(by=sort_cols, ascending=asc, na_position='last')
|
||||
if getattr(self, 'prefer_owned', False):
|
||||
owned_set = getattr(self, 'owned_card_names', None)
|
||||
if owned_set:
|
||||
|
|
@ -187,25 +180,51 @@ class CreatureAdditionMixin:
|
|||
continue
|
||||
owned_lower = {str(n).lower() for n in getattr(self, 'owned_card_names', set())} if getattr(self, 'prefer_owned', False) else set()
|
||||
owned_mult = getattr(bc, 'PREFER_OWNED_WEIGHT_MULTIPLIER', 1.25)
|
||||
bonus = getattr(context, 'match_bonus', 0.0)
|
||||
if combine_mode == 'AND':
|
||||
weighted_pool = []
|
||||
for nm, mm in zip(pool['name'], pool['_multiMatch']):
|
||||
base_w = (synergy_bonus*1.3 if mm >= 2 else (1.1 if mm == 1 else 0.8))
|
||||
for idx, nm in enumerate(pool['name']):
|
||||
mm = pool.iloc[idx].get('_matchScore', pool.iloc[idx].get('_multiMatch', 0))
|
||||
try:
|
||||
mm_val = float(mm)
|
||||
except Exception:
|
||||
mm_val = 0.0
|
||||
base_w = (synergy_bonus * 1.3 if mm_val >= 2 else (1.1 if mm_val >= 1 else 0.8))
|
||||
if owned_lower and str(nm).lower() in owned_lower:
|
||||
base_w *= owned_mult
|
||||
if bonus > 1e-9:
|
||||
try:
|
||||
u_match = float(pool.iloc[idx].get('_userMatch', 0))
|
||||
except Exception:
|
||||
u_match = 0.0
|
||||
if u_match > 0:
|
||||
base_w *= (1.0 + bonus * u_match)
|
||||
weighted_pool.append((nm, base_w))
|
||||
else:
|
||||
weighted_pool = []
|
||||
for nm, mm in zip(pool['name'], pool['_multiMatch']):
|
||||
base_w = (synergy_bonus if mm >= 2 else 1.0)
|
||||
for idx, nm in enumerate(pool['name']):
|
||||
mm = pool.iloc[idx].get('_matchScore', pool.iloc[idx].get('_multiMatch', 0))
|
||||
try:
|
||||
mm_val = float(mm)
|
||||
except Exception:
|
||||
mm_val = 0.0
|
||||
base_w = (synergy_bonus if mm_val >= 2 else 1.0)
|
||||
if owned_lower and str(nm).lower() in owned_lower:
|
||||
base_w *= owned_mult
|
||||
if bonus > 1e-9:
|
||||
try:
|
||||
u_match = float(pool.iloc[idx].get('_userMatch', 0))
|
||||
except Exception:
|
||||
u_match = 0.0
|
||||
if u_match > 0:
|
||||
base_w *= (1.0 + bonus * u_match)
|
||||
weighted_pool.append((nm, base_w))
|
||||
chosen = bu.weighted_sample_without_replacement(weighted_pool, target, rng=getattr(self, 'rng', None))
|
||||
chosen = bu.weighted_sample_without_replacement(weighted_pool, target_count, rng=getattr(self, 'rng', None))
|
||||
for nm in chosen:
|
||||
if commander_name and nm == commander_name:
|
||||
continue
|
||||
row = pool[pool['name']==nm].iloc[0]
|
||||
match_score = row.get('_matchScore', row.get('_multiMatch', 0))
|
||||
self.add_card(
|
||||
nm,
|
||||
card_type=row.get('type','Creature'),
|
||||
|
|
@ -217,14 +236,15 @@ class CreatureAdditionMixin:
|
|||
sub_role=role,
|
||||
added_by='creature_add',
|
||||
trigger_tag=tag,
|
||||
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
synergy=int(round(match_score)) if match_score is not None else int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
)
|
||||
added_names.append(nm)
|
||||
per_theme_added[role].append(nm)
|
||||
total_added += 1
|
||||
if total_added >= desired_total:
|
||||
break
|
||||
self.output_func(f"Added {len(per_theme_added[role])} creatures for {role} theme '{tag}' (target {target}).")
|
||||
source_label = 'User' if target.source == 'user' else role.title()
|
||||
self.output_func(f"Added {len(per_theme_added[role])} creatures for {source_label} theme '{tag}' (target {target_count}).")
|
||||
if total_added >= desired_total:
|
||||
break
|
||||
# Fill remaining if still short
|
||||
|
|
@ -239,10 +259,20 @@ class CreatureAdditionMixin:
|
|||
else:
|
||||
multi_pool = multi_pool[multi_pool['_multiMatch'] > 0]
|
||||
if not multi_pool.empty:
|
||||
sort_cols: List[str] = []
|
||||
asc: List[bool] = []
|
||||
if '_matchScore' in multi_pool.columns:
|
||||
sort_cols.append('_matchScore')
|
||||
asc.append(False)
|
||||
sort_cols.append('_multiMatch')
|
||||
asc.append(False)
|
||||
if 'edhrecRank' in multi_pool.columns:
|
||||
multi_pool = multi_pool.sort_values(by=['_multiMatch','edhrecRank','manaValue'], ascending=[False, True, True], na_position='last')
|
||||
elif 'manaValue' in multi_pool.columns:
|
||||
multi_pool = multi_pool.sort_values(by=['_multiMatch','manaValue'], ascending=[False, True], na_position='last')
|
||||
sort_cols.append('edhrecRank')
|
||||
asc.append(True)
|
||||
if 'manaValue' in multi_pool.columns:
|
||||
sort_cols.append('manaValue')
|
||||
asc.append(True)
|
||||
multi_pool = multi_pool.sort_values(by=sort_cols, ascending=asc, na_position='last')
|
||||
if getattr(self, 'prefer_owned', False):
|
||||
owned_set = getattr(self, 'owned_card_names', None)
|
||||
if owned_set:
|
||||
|
|
@ -262,7 +292,7 @@ class CreatureAdditionMixin:
|
|||
role='creature',
|
||||
sub_role='fill',
|
||||
added_by='creature_fill',
|
||||
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
synergy=int(round(row.get('_matchScore', row.get('_multiMatch', 0)))) if '_matchScore' in row else int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
)
|
||||
added_names.append(nm)
|
||||
total_added += 1
|
||||
|
|
@ -278,14 +308,18 @@ class CreatureAdditionMixin:
|
|||
self.output_func(f" - {nm} (tags: {', '.join(hits)})")
|
||||
else:
|
||||
self.output_func(f" - {nm}")
|
||||
for role, tag in themes_ordered:
|
||||
for target in themes_ordered:
|
||||
role = target.role
|
||||
tag = target.display
|
||||
lst = per_theme_added.get(role, [])
|
||||
if lst:
|
||||
self.output_func(f" {role.title()} '{tag}': {len(lst)}")
|
||||
label = 'User' if target.source == 'user' else role.title()
|
||||
self.output_func(f" {label} '{tag}': {len(lst)}")
|
||||
for nm in lst:
|
||||
self.output_func(f" - {nm}")
|
||||
else:
|
||||
self.output_func(f" {role.title()} '{tag}': 0")
|
||||
label = 'User' if target.source == 'user' else role.title()
|
||||
self.output_func(f" {label} '{tag}': 0")
|
||||
self.output_func(f" Total {total_added}/{desired_total}{' (dataset shortfall)' if total_added < desired_total else ''}")
|
||||
|
||||
def add_creatures_phase(self):
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import os
|
|||
|
||||
from .. import builder_utils as bu
|
||||
from .. import builder_constants as bc
|
||||
from ..theme_context import annotate_theme_matches
|
||||
import logging_util
|
||||
|
||||
logger = logging_util.logging.getLogger(__name__)
|
||||
|
|
@ -620,46 +621,17 @@ class SpellAdditionMixin:
|
|||
df = getattr(self, '_combined_cards_df', None)
|
||||
if df is None or df.empty or 'type' not in df.columns:
|
||||
return
|
||||
themes_ordered: List[tuple[str, str]] = []
|
||||
if self.primary_tag:
|
||||
themes_ordered.append(('primary', self.primary_tag))
|
||||
if self.secondary_tag:
|
||||
themes_ordered.append(('secondary', self.secondary_tag))
|
||||
if self.tertiary_tag:
|
||||
themes_ordered.append(('tertiary', self.tertiary_tag))
|
||||
if not themes_ordered:
|
||||
try:
|
||||
context = self.get_theme_context() # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
context = None
|
||||
if context is None or not getattr(context, 'ordered_targets', []):
|
||||
return
|
||||
n_themes = len(themes_ordered)
|
||||
if n_themes == 1:
|
||||
base_map = {'primary': 1.0}
|
||||
elif n_themes == 2:
|
||||
base_map = {'primary': 0.6, 'secondary': 0.4}
|
||||
else:
|
||||
base_map = {'primary': 0.5, 'secondary': 0.3, 'tertiary': 0.2}
|
||||
weights: Dict[str, float] = {}
|
||||
boosted: set[str] = set()
|
||||
if n_themes > 1:
|
||||
for role, tag in themes_ordered:
|
||||
w = base_map.get(role, 0.0)
|
||||
lt = tag.lower()
|
||||
if 'kindred' in lt or 'tribal' in lt:
|
||||
mult = getattr(bc, 'WEIGHT_ADJUSTMENT_FACTORS', {}).get(f'kindred_{role}', 1.0)
|
||||
w *= mult
|
||||
boosted.add(role)
|
||||
weights[role] = w
|
||||
tot = sum(weights.values())
|
||||
if tot > 1.0:
|
||||
for r in weights:
|
||||
weights[r] /= tot
|
||||
else:
|
||||
rem = 1.0 - tot
|
||||
base_sum_unboosted = sum(base_map[r] for r, _ in themes_ordered if r not in boosted)
|
||||
if rem > 1e-6 and base_sum_unboosted > 0:
|
||||
for r, _ in themes_ordered:
|
||||
if r not in boosted:
|
||||
weights[r] += rem * (base_map[r] / base_sum_unboosted)
|
||||
else:
|
||||
weights['primary'] = 1.0
|
||||
themes_ordered = list(context.ordered_targets)
|
||||
selected_tags_lower = context.selected_slugs()
|
||||
if not themes_ordered or not selected_tags_lower:
|
||||
return
|
||||
weights: Dict[str, float] = dict(getattr(context, 'weights', {}))
|
||||
spells_df = df[
|
||||
~df['type'].str.contains('Land', case=False, na=False)
|
||||
& ~df['type'].str.contains('Creature', case=False, na=False)
|
||||
|
|
@ -667,33 +639,33 @@ class SpellAdditionMixin:
|
|||
spells_df = self._apply_bracket_pre_filters(spells_df)
|
||||
if spells_df.empty:
|
||||
return
|
||||
selected_tags_lower = [t.lower() for _r, t in themes_ordered]
|
||||
if '_parsedThemeTags' not in spells_df.columns:
|
||||
spells_df['_parsedThemeTags'] = spells_df['themeTags'].apply(bu.normalize_tag_cell)
|
||||
spells_df['_normTags'] = spells_df['_parsedThemeTags']
|
||||
spells_df['_multiMatch'] = spells_df['_normTags'].apply(
|
||||
lambda lst: sum(1 for t in selected_tags_lower if t in lst)
|
||||
)
|
||||
combine_mode = getattr(self, 'tag_mode', 'AND')
|
||||
spells_df = annotate_theme_matches(spells_df, context)
|
||||
combine_mode = context.combine_mode
|
||||
base_top = 40
|
||||
top_n = int(base_top * getattr(bc, 'THEME_POOL_SIZE_MULTIPLIER', 2.0))
|
||||
synergy_bonus = getattr(bc, 'THEME_PRIORITY_BONUS', 1.2)
|
||||
per_theme_added: Dict[str, List[str]] = {r: [] for r, _t in themes_ordered}
|
||||
per_theme_added: Dict[str, List[str]] = {target.role: [] for target in themes_ordered}
|
||||
total_added = 0
|
||||
for role, tag in themes_ordered:
|
||||
bonus = getattr(context, 'match_bonus', 0.0)
|
||||
for target in themes_ordered:
|
||||
role = target.role
|
||||
tag = target.display
|
||||
slug = target.slug or (str(tag).lower() if tag else "")
|
||||
if not slug:
|
||||
continue
|
||||
if remaining - total_added <= 0:
|
||||
break
|
||||
w = weights.get(role, 0.0)
|
||||
w = weights.get(role, target.weight if hasattr(target, 'weight') else 0.0)
|
||||
if w <= 0:
|
||||
continue
|
||||
target = int(math.ceil(remaining * w * self._get_rng().uniform(1.0, 1.1)))
|
||||
target = min(target, remaining - total_added)
|
||||
if target <= 0:
|
||||
available = remaining - total_added
|
||||
target_count = int(math.ceil(available * w * self._get_rng().uniform(1.0, 1.1)))
|
||||
target_count = min(target_count, available)
|
||||
if target_count <= 0:
|
||||
continue
|
||||
tnorm = tag.lower()
|
||||
subset = spells_df[
|
||||
spells_df['_normTags'].apply(
|
||||
lambda lst, tn=tnorm: (tn in lst) or any(tn in x for x in lst)
|
||||
lambda lst, tn=slug: (tn in lst) or any(tn in (item or '') for item in lst)
|
||||
)
|
||||
]
|
||||
if combine_mode == 'AND' and len(selected_tags_lower) > 1:
|
||||
|
|
@ -701,18 +673,20 @@ class SpellAdditionMixin:
|
|||
subset = subset[subset['_multiMatch'] >= 2]
|
||||
if subset.empty:
|
||||
continue
|
||||
sort_cols: List[str] = []
|
||||
asc: List[bool] = []
|
||||
if '_matchScore' in subset.columns:
|
||||
sort_cols.append('_matchScore')
|
||||
asc.append(False)
|
||||
sort_cols.append('_multiMatch')
|
||||
asc.append(False)
|
||||
if 'edhrecRank' in subset.columns:
|
||||
subset = subset.sort_values(
|
||||
by=['_multiMatch', 'edhrecRank', 'manaValue'],
|
||||
ascending=[False, True, True],
|
||||
na_position='last',
|
||||
)
|
||||
elif 'manaValue' in subset.columns:
|
||||
subset = subset.sort_values(
|
||||
by=['_multiMatch', 'manaValue'],
|
||||
ascending=[False, True],
|
||||
na_position='last',
|
||||
)
|
||||
sort_cols.append('edhrecRank')
|
||||
asc.append(True)
|
||||
if 'manaValue' in subset.columns:
|
||||
sort_cols.append('manaValue')
|
||||
asc.append(True)
|
||||
subset = subset.sort_values(by=sort_cols, ascending=asc, na_position='last')
|
||||
# Prefer-owned: stable reorder before trimming to top_n
|
||||
if getattr(self, 'prefer_owned', False):
|
||||
owned_set = getattr(self, 'owned_card_names', None)
|
||||
|
|
@ -726,23 +700,60 @@ class SpellAdditionMixin:
|
|||
# Build weighted pool with optional owned multiplier
|
||||
owned_lower = {str(n).lower() for n in getattr(self, 'owned_card_names', set())} if getattr(self, 'prefer_owned', False) else set()
|
||||
owned_mult = getattr(bc, 'PREFER_OWNED_WEIGHT_MULTIPLIER', 1.25)
|
||||
base_pairs = list(zip(pool['name'], pool['_multiMatch']))
|
||||
weighted_pool: list[tuple[str, float]] = []
|
||||
if combine_mode == 'AND':
|
||||
for nm, mm in base_pairs:
|
||||
base_w = (synergy_bonus*1.3 if mm >= 2 else (1.1 if mm == 1 else 0.8))
|
||||
for idx, nm in enumerate(pool['name']):
|
||||
mm = pool.iloc[idx].get('_matchScore', pool.iloc[idx].get('_multiMatch', 0))
|
||||
try:
|
||||
mm_val = float(mm)
|
||||
except Exception:
|
||||
mm_val = 0.0
|
||||
base_w = (synergy_bonus * 1.3 if mm_val >= 2 else (1.1 if mm_val >= 1 else 0.8))
|
||||
if owned_lower and str(nm).lower() in owned_lower:
|
||||
base_w *= owned_mult
|
||||
if bonus > 1e-9:
|
||||
try:
|
||||
u_match = float(pool.iloc[idx].get('_userMatch', 0))
|
||||
except Exception:
|
||||
u_match = 0.0
|
||||
if u_match > 0:
|
||||
base_w *= (1.0 + bonus * u_match)
|
||||
weighted_pool.append((nm, base_w))
|
||||
else:
|
||||
for nm, mm in base_pairs:
|
||||
base_w = (synergy_bonus if mm >= 2 else 1.0)
|
||||
for idx, nm in enumerate(pool['name']):
|
||||
mm = pool.iloc[idx].get('_matchScore', pool.iloc[idx].get('_multiMatch', 0))
|
||||
try:
|
||||
mm_val = float(mm)
|
||||
except Exception:
|
||||
mm_val = 0.0
|
||||
base_w = (synergy_bonus if mm_val >= 2 else 1.0)
|
||||
if owned_lower and str(nm).lower() in owned_lower:
|
||||
base_w *= owned_mult
|
||||
if bonus > 1e-9:
|
||||
try:
|
||||
u_match = float(pool.iloc[idx].get('_userMatch', 0))
|
||||
except Exception:
|
||||
u_match = 0.0
|
||||
if u_match > 0:
|
||||
base_w *= (1.0 + bonus * u_match)
|
||||
weighted_pool.append((nm, base_w))
|
||||
chosen = bu.weighted_sample_without_replacement(weighted_pool, target, rng=getattr(self, 'rng', None))
|
||||
chosen = bu.weighted_sample_without_replacement(weighted_pool, target_count, rng=getattr(self, 'rng', None))
|
||||
for nm in chosen:
|
||||
row = pool[pool['name'] == nm].iloc[0]
|
||||
match_score = row.get('_matchScore', row.get('_multiMatch', 0))
|
||||
synergy_value = None
|
||||
try:
|
||||
if match_score is not None:
|
||||
val = float(match_score)
|
||||
if not math.isnan(val):
|
||||
synergy_value = int(round(val))
|
||||
except Exception:
|
||||
synergy_value = None
|
||||
if synergy_value is None and '_multiMatch' in row:
|
||||
try:
|
||||
synergy_value = int(row.get('_multiMatch', 0))
|
||||
except Exception:
|
||||
synergy_value = None
|
||||
self.add_card(
|
||||
nm,
|
||||
card_type=row.get('type', ''),
|
||||
|
|
@ -753,7 +764,7 @@ class SpellAdditionMixin:
|
|||
sub_role=role,
|
||||
added_by='spell_theme_fill',
|
||||
trigger_tag=tag,
|
||||
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
synergy=synergy_value
|
||||
)
|
||||
per_theme_added[role].append(nm)
|
||||
total_added += 1
|
||||
|
|
@ -771,18 +782,20 @@ class SpellAdditionMixin:
|
|||
else:
|
||||
multi_pool = multi_pool[multi_pool['_multiMatch'] > 0]
|
||||
if not multi_pool.empty:
|
||||
sort_cols = []
|
||||
asc = []
|
||||
if '_matchScore' in multi_pool.columns:
|
||||
sort_cols.append('_matchScore')
|
||||
asc.append(False)
|
||||
sort_cols.append('_multiMatch')
|
||||
asc.append(False)
|
||||
if 'edhrecRank' in multi_pool.columns:
|
||||
multi_pool = multi_pool.sort_values(
|
||||
by=['_multiMatch', 'edhrecRank', 'manaValue'],
|
||||
ascending=[False, True, True],
|
||||
na_position='last',
|
||||
)
|
||||
elif 'manaValue' in multi_pool.columns:
|
||||
multi_pool = multi_pool.sort_values(
|
||||
by=['_multiMatch', 'manaValue'],
|
||||
ascending=[False, True],
|
||||
na_position='last',
|
||||
)
|
||||
sort_cols.append('edhrecRank')
|
||||
asc.append(True)
|
||||
if 'manaValue' in multi_pool.columns:
|
||||
sort_cols.append('manaValue')
|
||||
asc.append(True)
|
||||
multi_pool = multi_pool.sort_values(by=sort_cols, ascending=asc, na_position='last')
|
||||
if getattr(self, 'prefer_owned', False):
|
||||
owned_set = getattr(self, 'owned_card_names', None)
|
||||
if owned_set:
|
||||
|
|
@ -790,6 +803,20 @@ class SpellAdditionMixin:
|
|||
fill = multi_pool['name'].tolist()[:need]
|
||||
for nm in fill:
|
||||
row = multi_pool[multi_pool['name'] == nm].iloc[0]
|
||||
match_score = row.get('_matchScore', row.get('_multiMatch', 0))
|
||||
synergy_value = None
|
||||
try:
|
||||
if match_score is not None:
|
||||
val = float(match_score)
|
||||
if not math.isnan(val):
|
||||
synergy_value = int(round(val))
|
||||
except Exception:
|
||||
synergy_value = None
|
||||
if synergy_value is None and '_multiMatch' in row:
|
||||
try:
|
||||
synergy_value = int(row.get('_multiMatch', 0))
|
||||
except Exception:
|
||||
synergy_value = None
|
||||
self.add_card(
|
||||
nm,
|
||||
card_type=row.get('type', ''),
|
||||
|
|
@ -799,7 +826,7 @@ class SpellAdditionMixin:
|
|||
role='theme_spell',
|
||||
sub_role='fill_multi',
|
||||
added_by='spell_theme_fill',
|
||||
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
|
||||
synergy=synergy_value
|
||||
)
|
||||
total_added += 1
|
||||
if total_added >= remaining:
|
||||
|
|
@ -875,10 +902,16 @@ class SpellAdditionMixin:
|
|||
self.output_func(f" - {nm}")
|
||||
if total_added:
|
||||
self.output_func("\nFinal Theme Spell Fill:")
|
||||
for role, tag in themes_ordered:
|
||||
for target in themes_ordered:
|
||||
role = target.role
|
||||
tag = target.display
|
||||
lst = per_theme_added.get(role, [])
|
||||
if lst:
|
||||
self.output_func(f" {role.title()} '{tag}': {len(lst)}")
|
||||
if target.source == 'user':
|
||||
label = target.role.replace('_', ' ').title()
|
||||
else:
|
||||
label = role.title()
|
||||
self.output_func(f" {label} '{tag}': {len(lst)}")
|
||||
for nm in lst:
|
||||
self.output_func(f" - {nm}")
|
||||
self.output_func(f" Total Theme Spells Added: {total_added}")
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import datetime as _dt
|
|||
import re as _re
|
||||
import logging_util
|
||||
|
||||
from code.deck_builder.summary_telemetry import record_land_summary
|
||||
from code.deck_builder.summary_telemetry import record_land_summary, record_theme_summary
|
||||
from code.deck_builder.shared_copy import build_land_headline, dfc_card_note
|
||||
|
||||
logger = logging_util.logging.getLogger(__name__)
|
||||
|
|
@ -627,6 +627,12 @@ class ReportingMixin:
|
|||
record_land_summary(land_summary)
|
||||
except Exception: # pragma: no cover - diagnostics only
|
||||
logger.debug("Failed to record MDFC telemetry", exc_info=True)
|
||||
try:
|
||||
theme_payload = self.get_theme_summary_payload() if hasattr(self, "get_theme_summary_payload") else None
|
||||
if theme_payload:
|
||||
record_theme_summary(theme_payload)
|
||||
except Exception: # pragma: no cover - diagnostics only
|
||||
logger.debug("Failed to record theme telemetry", exc_info=True)
|
||||
return summary_payload
|
||||
def export_decklist_csv(self, directory: str = 'deck_files', filename: str | None = None, suppress_output: bool = False) -> str:
|
||||
"""Export current decklist to CSV (enriched).
|
||||
|
|
@ -1046,6 +1052,13 @@ class ReportingMixin:
|
|||
# Capture fetch count (others vary run-to-run and are intentionally not recorded)
|
||||
chosen_fetch = getattr(self, 'fetch_count', None)
|
||||
|
||||
user_themes: List[str] = [
|
||||
str(theme)
|
||||
for theme in getattr(self, 'user_theme_requested', [])
|
||||
if isinstance(theme, str) and theme.strip()
|
||||
]
|
||||
theme_catalog_version = getattr(self, 'theme_catalog_version', None)
|
||||
|
||||
payload = {
|
||||
"commander": getattr(self, 'commander_name', '') or getattr(self, 'commander', '') or '',
|
||||
"primary_tag": getattr(self, 'primary_tag', None),
|
||||
|
|
@ -1067,6 +1080,12 @@ class ReportingMixin:
|
|||
"enforcement_mode": getattr(self, 'enforcement_mode', 'warn'),
|
||||
"allow_illegal": bool(getattr(self, 'allow_illegal', False)),
|
||||
"fuzzy_matching": bool(getattr(self, 'fuzzy_matching', True)),
|
||||
"additional_themes": user_themes,
|
||||
"theme_match_mode": getattr(self, 'theme_match_mode', 'permissive'),
|
||||
"theme_catalog_version": theme_catalog_version,
|
||||
# CamelCase aliases for downstream consumers (web diagnostics, external tooling)
|
||||
"userThemes": user_themes,
|
||||
"themeCatalogVersion": theme_catalog_version,
|
||||
# chosen fetch land count (others intentionally omitted for variance)
|
||||
"fetch_count": chosen_fetch,
|
||||
# actual ideal counts used for this run
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ from typing import Any, Dict, Iterable
|
|||
__all__ = [
|
||||
"record_land_summary",
|
||||
"get_mdfc_metrics",
|
||||
"record_theme_summary",
|
||||
"get_theme_metrics",
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -22,6 +24,16 @@ _metrics: Dict[str, Any] = {
|
|||
}
|
||||
_top_cards: Counter[str] = Counter()
|
||||
|
||||
_theme_metrics: Dict[str, Any] = {
|
||||
"total_builds": 0,
|
||||
"with_user_themes": 0,
|
||||
"last_updated": None,
|
||||
"last_updated_iso": None,
|
||||
"last_summary": None,
|
||||
}
|
||||
_user_theme_counter: Counter[str] = Counter()
|
||||
_user_theme_labels: Dict[str, str] = {}
|
||||
|
||||
|
||||
def _to_int(value: Any) -> int:
|
||||
try:
|
||||
|
|
@ -120,3 +132,110 @@ def _reset_metrics_for_test() -> None:
|
|||
}
|
||||
)
|
||||
_top_cards.clear()
|
||||
_theme_metrics.update(
|
||||
{
|
||||
"total_builds": 0,
|
||||
"with_user_themes": 0,
|
||||
"last_updated": None,
|
||||
"last_updated_iso": None,
|
||||
"last_summary": None,
|
||||
}
|
||||
)
|
||||
_user_theme_counter.clear()
|
||||
_user_theme_labels.clear()
|
||||
|
||||
|
||||
def _sanitize_theme_list(values: Iterable[Any]) -> list[str]:
|
||||
sanitized: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for raw in values or []: # type: ignore[arg-type]
|
||||
text = str(raw or "").strip()
|
||||
if not text:
|
||||
continue
|
||||
key = text.casefold()
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
sanitized.append(text)
|
||||
return sanitized
|
||||
|
||||
|
||||
def record_theme_summary(theme_summary: Dict[str, Any] | None) -> None:
|
||||
if not isinstance(theme_summary, dict):
|
||||
return
|
||||
|
||||
commander_themes = _sanitize_theme_list(theme_summary.get("commanderThemes") or [])
|
||||
user_themes = _sanitize_theme_list(theme_summary.get("userThemes") or [])
|
||||
requested = _sanitize_theme_list(theme_summary.get("requested") or [])
|
||||
resolved = _sanitize_theme_list(theme_summary.get("resolved") or [])
|
||||
unresolved_raw = theme_summary.get("unresolved") or []
|
||||
if isinstance(unresolved_raw, (list, tuple)):
|
||||
unresolved = [str(item).strip() for item in unresolved_raw if str(item).strip()]
|
||||
else:
|
||||
unresolved = []
|
||||
mode = str(theme_summary.get("mode") or "AND")
|
||||
try:
|
||||
weight = float(theme_summary.get("weight", 1.0) or 1.0)
|
||||
except Exception:
|
||||
weight = 1.0
|
||||
catalog_version = theme_summary.get("themeCatalogVersion")
|
||||
matches = theme_summary.get("matches") if isinstance(theme_summary.get("matches"), list) else []
|
||||
fuzzy = theme_summary.get("fuzzyCorrections") if isinstance(theme_summary.get("fuzzyCorrections"), dict) else {}
|
||||
|
||||
merged: list[str] = []
|
||||
seen_merge: set[str] = set()
|
||||
for collection in (commander_themes, user_themes):
|
||||
for item in collection:
|
||||
key = item.casefold()
|
||||
if key in seen_merge:
|
||||
continue
|
||||
seen_merge.add(key)
|
||||
merged.append(item)
|
||||
|
||||
timestamp = time.time()
|
||||
iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(timestamp))
|
||||
|
||||
with _lock:
|
||||
_theme_metrics["total_builds"] = int(_theme_metrics.get("total_builds", 0) or 0) + 1
|
||||
if user_themes:
|
||||
_theme_metrics["with_user_themes"] = int(_theme_metrics.get("with_user_themes", 0) or 0) + 1
|
||||
for label in user_themes:
|
||||
key = label.casefold()
|
||||
_user_theme_counter[key] += 1
|
||||
if key not in _user_theme_labels:
|
||||
_user_theme_labels[key] = label
|
||||
_theme_metrics["last_summary"] = {
|
||||
"commanderThemes": commander_themes,
|
||||
"userThemes": user_themes,
|
||||
"mergedThemes": merged,
|
||||
"requested": requested,
|
||||
"resolved": resolved,
|
||||
"unresolved": unresolved,
|
||||
"unresolvedCount": len(unresolved),
|
||||
"mode": mode,
|
||||
"weight": weight,
|
||||
"matches": matches,
|
||||
"fuzzyCorrections": fuzzy,
|
||||
"themeCatalogVersion": catalog_version,
|
||||
}
|
||||
_theme_metrics["last_updated"] = timestamp
|
||||
_theme_metrics["last_updated_iso"] = iso
|
||||
|
||||
|
||||
def get_theme_metrics() -> Dict[str, Any]:
|
||||
with _lock:
|
||||
total = int(_theme_metrics.get("total_builds", 0) or 0)
|
||||
with_user = int(_theme_metrics.get("with_user_themes", 0) or 0)
|
||||
share = (with_user / total) if total else 0.0
|
||||
top_user: list[Dict[str, Any]] = []
|
||||
for key, count in _user_theme_counter.most_common(10):
|
||||
label = _user_theme_labels.get(key, key)
|
||||
top_user.append({"theme": label, "count": int(count)})
|
||||
return {
|
||||
"total_builds": total,
|
||||
"with_user_themes": with_user,
|
||||
"user_theme_share": share,
|
||||
"last_summary": _theme_metrics.get("last_summary"),
|
||||
"last_updated": _theme_metrics.get("last_updated_iso"),
|
||||
"top_user_themes": top_user,
|
||||
}
|
||||
|
|
|
|||
227
code/deck_builder/theme_catalog_loader.py
Normal file
227
code/deck_builder/theme_catalog_loader.py
Normal file
|
|
@ -0,0 +1,227 @@
|
|||
"""Lightweight loader for the supplemental theme catalog CSV."""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Tuple
|
||||
|
||||
from code.logging_util import get_logger
|
||||
|
||||
LOGGER = get_logger(__name__)
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
DEFAULT_CATALOG_PATH = ROOT / "config" / "themes" / "theme_catalog.csv"
|
||||
JSON_FALLBACK_PATH = ROOT / "config" / "themes" / "theme_list.json"
|
||||
REQUIRED_COLUMNS = {"theme", "commander_count", "card_count"}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ThemeCatalogEntry:
|
||||
"""Single row from the supplemental theme catalog."""
|
||||
|
||||
theme: str
|
||||
commander_count: int
|
||||
card_count: int
|
||||
|
||||
@property
|
||||
def source_count(self) -> int:
|
||||
return self.commander_count + self.card_count
|
||||
|
||||
|
||||
def _resolve_catalog_path(override: str | os.PathLike[str] | None) -> Path:
|
||||
if override:
|
||||
return Path(override).resolve()
|
||||
env_override = os.environ.get("THEME_CATALOG_PATH")
|
||||
if env_override:
|
||||
return Path(env_override).resolve()
|
||||
return DEFAULT_CATALOG_PATH
|
||||
|
||||
|
||||
def _parse_metadata(line: str) -> Tuple[str, dict[str, str]]:
|
||||
version = "unknown"
|
||||
meta: dict[str, str] = {}
|
||||
cleaned = line.lstrip("#").strip()
|
||||
if not cleaned:
|
||||
return version, meta
|
||||
for token in cleaned.split():
|
||||
if "=" not in token:
|
||||
continue
|
||||
key, value = token.split("=", 1)
|
||||
meta[key] = value
|
||||
if key == "version":
|
||||
version = value
|
||||
return version, meta
|
||||
|
||||
|
||||
def _to_int(value: object) -> int:
|
||||
if value is None:
|
||||
return 0
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
return 0
|
||||
return int(text)
|
||||
|
||||
|
||||
def load_theme_catalog(
|
||||
catalog_path: str | os.PathLike[str] | None = None,
|
||||
) -> tuple[list[ThemeCatalogEntry], str]:
|
||||
"""Load the supplemental theme catalog with memoization.
|
||||
|
||||
Args:
|
||||
catalog_path: Optional override path. Defaults to ``config/themes/theme_catalog.csv``
|
||||
or the ``THEME_CATALOG_PATH`` environment variable.
|
||||
|
||||
Returns:
|
||||
A tuple of ``(entries, version)`` where ``entries`` is a list of
|
||||
:class:`ThemeCatalogEntry` and ``version`` is the parsed catalog version.
|
||||
"""
|
||||
|
||||
resolved = _resolve_catalog_path(catalog_path)
|
||||
mtime = 0.0
|
||||
try:
|
||||
mtime = resolved.stat().st_mtime
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
entries, version = _load_catalog_cached(str(resolved), mtime)
|
||||
if entries:
|
||||
return list(entries), version
|
||||
# Fallback to JSON catalog when CSV export unavailable.
|
||||
fallback_entries, fallback_version = _load_json_catalog()
|
||||
if fallback_entries:
|
||||
return list(fallback_entries), fallback_version
|
||||
return list(entries), version
|
||||
|
||||
|
||||
@lru_cache(maxsize=4)
|
||||
def _load_catalog_cached(path_str: str, mtime: float) -> tuple[tuple[ThemeCatalogEntry, ...], str]:
|
||||
path = Path(path_str)
|
||||
if not path.exists():
|
||||
LOGGER.warning("theme_catalog_missing path=%s", path)
|
||||
return tuple(), "unknown"
|
||||
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
first_line = handle.readline()
|
||||
version = "unknown"
|
||||
if first_line.startswith("#"):
|
||||
version, _ = _parse_metadata(first_line)
|
||||
else:
|
||||
handle.seek(0)
|
||||
|
||||
reader = csv.DictReader(handle)
|
||||
if reader.fieldnames is None:
|
||||
LOGGER.info("theme_catalog_loaded size=0 version=%s path=%s", version, path)
|
||||
return tuple(), version
|
||||
|
||||
missing = REQUIRED_COLUMNS - set(reader.fieldnames)
|
||||
if missing:
|
||||
raise ValueError(
|
||||
"theme_catalog.csv missing required columns: " + ", ".join(sorted(missing))
|
||||
)
|
||||
|
||||
entries: list[ThemeCatalogEntry] = []
|
||||
for row in reader:
|
||||
if not row:
|
||||
continue
|
||||
theme = str(row.get("theme", "")).strip()
|
||||
if not theme:
|
||||
continue
|
||||
try:
|
||||
commander = _to_int(row.get("commander_count"))
|
||||
card = _to_int(row.get("card_count"))
|
||||
except ValueError as exc: # pragma: no cover - defensive, should not happen
|
||||
raise ValueError(f"Invalid numeric values in theme catalog for theme '{theme}'") from exc
|
||||
entries.append(ThemeCatalogEntry(theme=theme, commander_count=commander, card_count=card))
|
||||
|
||||
LOGGER.info("theme_catalog_loaded size=%s version=%s path=%s", len(entries), version, path)
|
||||
return tuple(entries), version
|
||||
|
||||
|
||||
def _load_json_catalog() -> tuple[tuple[ThemeCatalogEntry, ...], str]:
|
||||
if not JSON_FALLBACK_PATH.exists():
|
||||
return tuple(), "unknown"
|
||||
try:
|
||||
mtime = JSON_FALLBACK_PATH.stat().st_mtime
|
||||
except Exception: # pragma: no cover - stat failures
|
||||
mtime = 0.0
|
||||
return _load_json_catalog_cached(str(JSON_FALLBACK_PATH), mtime)
|
||||
|
||||
|
||||
@lru_cache(maxsize=2)
|
||||
def _load_json_catalog_cached(path_str: str, mtime: float) -> tuple[tuple[ThemeCatalogEntry, ...], str]:
|
||||
path = Path(path_str)
|
||||
try:
|
||||
raw_text = path.read_text(encoding="utf-8")
|
||||
except Exception as exc: # pragma: no cover - IO edge cases
|
||||
LOGGER.warning("theme_catalog_json_read_error path=%s error=%s", path, exc)
|
||||
return tuple(), "unknown"
|
||||
if not raw_text.strip():
|
||||
return tuple(), "unknown"
|
||||
try:
|
||||
payload = json.loads(raw_text)
|
||||
except Exception as exc: # pragma: no cover - malformed JSON
|
||||
LOGGER.warning("theme_catalog_json_parse_error path=%s error=%s", path, exc)
|
||||
return tuple(), "unknown"
|
||||
themes = _iter_json_themes(payload)
|
||||
entries = tuple(themes)
|
||||
if not entries:
|
||||
return tuple(), "unknown"
|
||||
version = _extract_json_version(payload)
|
||||
LOGGER.info("theme_catalog_loaded_json size=%s version=%s path=%s", len(entries), version, path)
|
||||
return entries, version
|
||||
|
||||
|
||||
def _iter_json_themes(payload: object) -> Iterable[ThemeCatalogEntry]:
|
||||
if not isinstance(payload, dict):
|
||||
LOGGER.warning("theme_catalog_json_invalid_root type=%s", type(payload).__name__)
|
||||
return tuple()
|
||||
try:
|
||||
from type_definitions_theme_catalog import ThemeCatalog # pragma: no cover - primary import path
|
||||
except ImportError: # pragma: no cover - fallback when running as package
|
||||
from code.type_definitions_theme_catalog import ThemeCatalog # type: ignore
|
||||
|
||||
try:
|
||||
catalog = ThemeCatalog.model_validate(payload)
|
||||
except Exception as exc: # pragma: no cover - validation errors
|
||||
LOGGER.warning("theme_catalog_json_validate_error error=%s", exc)
|
||||
return tuple()
|
||||
|
||||
for theme in catalog.themes:
|
||||
commander_count = len(theme.example_commanders or [])
|
||||
# Prefer synergy count, fall back to example cards, ensure non-negative.
|
||||
inferred_card_count = max(len(theme.synergies or []), len(theme.example_cards or []))
|
||||
yield ThemeCatalogEntry(
|
||||
theme=theme.theme,
|
||||
commander_count=int(commander_count),
|
||||
card_count=int(inferred_card_count),
|
||||
)
|
||||
|
||||
|
||||
def _extract_json_version(payload: object) -> str:
|
||||
if not isinstance(payload, dict):
|
||||
return "json"
|
||||
meta = payload.get("metadata_info")
|
||||
if isinstance(meta, dict):
|
||||
version = meta.get("version")
|
||||
if isinstance(version, str) and version.strip():
|
||||
return version.strip()
|
||||
# Fallback to catalog hash if available
|
||||
recorded = None
|
||||
if isinstance(meta, dict):
|
||||
recorded = meta.get("catalog_hash")
|
||||
if isinstance(recorded, str) and recorded.strip():
|
||||
return recorded.strip()
|
||||
provenance = payload.get("provenance")
|
||||
if isinstance(provenance, dict):
|
||||
version = provenance.get("version")
|
||||
if isinstance(version, str) and version.strip():
|
||||
return version.strip()
|
||||
return "json"
|
||||
|
||||
|
||||
__all__ = ["ThemeCatalogEntry", "load_theme_catalog"]
|
||||
318
code/deck_builder/theme_context.py
Normal file
318
code/deck_builder/theme_context.py
Normal file
|
|
@ -0,0 +1,318 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||||
|
||||
from deck_builder import builder_utils as bu
|
||||
from deck_builder.theme_matcher import normalize_theme
|
||||
from deck_builder.theme_resolution import ThemeResolutionInfo
|
||||
|
||||
import logging_util
|
||||
|
||||
logger = logging_util.logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
"ThemeTarget",
|
||||
"ThemeContext",
|
||||
"default_user_theme_weight",
|
||||
"build_theme_context",
|
||||
"annotate_theme_matches",
|
||||
"theme_summary_payload",
|
||||
]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ThemeTarget:
|
||||
"""Represents a prioritized theme target for selection weighting."""
|
||||
|
||||
role: str
|
||||
display: str
|
||||
slug: str
|
||||
source: str # "commander" | "user"
|
||||
weight: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThemeContext:
|
||||
"""Captured theme aggregation for card selection and diagnostics."""
|
||||
|
||||
ordered_targets: List[ThemeTarget]
|
||||
combine_mode: str
|
||||
weights: Dict[str, float]
|
||||
commander_slugs: List[str]
|
||||
user_slugs: List[str]
|
||||
resolution: Optional[ThemeResolutionInfo]
|
||||
user_theme_weight: float
|
||||
|
||||
def selected_slugs(self) -> List[str]:
|
||||
return [target.slug for target in self.ordered_targets if target.slug]
|
||||
|
||||
@property
|
||||
def commander_selected(self) -> List[str]:
|
||||
return list(self.commander_slugs)
|
||||
|
||||
@property
|
||||
def user_selected(self) -> List[str]:
|
||||
return list(self.user_slugs)
|
||||
|
||||
@property
|
||||
def match_multiplier(self) -> float:
|
||||
try:
|
||||
value = float(self.user_theme_weight)
|
||||
except Exception:
|
||||
value = 1.0
|
||||
return value if value > 0 else 1.0
|
||||
|
||||
@property
|
||||
def match_bonus(self) -> float:
|
||||
return max(0.0, self.match_multiplier - 1.0)
|
||||
|
||||
|
||||
def default_user_theme_weight() -> float:
|
||||
"""Read the default user theme weighting multiplier from the environment."""
|
||||
|
||||
raw = os.getenv("USER_THEME_WEIGHT")
|
||||
if raw is None:
|
||||
return 1.0
|
||||
try:
|
||||
value = float(raw)
|
||||
except Exception:
|
||||
logger.warning("Invalid USER_THEME_WEIGHT=%s; falling back to 1.0", raw)
|
||||
return 1.0
|
||||
return value if value >= 0 else 0.0
|
||||
|
||||
|
||||
def _normalize_role(role: str) -> str:
|
||||
try:
|
||||
return str(role).strip().lower()
|
||||
except Exception:
|
||||
return str(role)
|
||||
|
||||
|
||||
def _normalize_tag(value: str | None) -> str:
|
||||
if not value:
|
||||
return ""
|
||||
try:
|
||||
return normalize_theme(value)
|
||||
except Exception:
|
||||
return str(value).strip().lower()
|
||||
|
||||
|
||||
def _theme_weight_factors(
|
||||
commander_targets: Sequence[ThemeTarget],
|
||||
user_targets: Sequence[ThemeTarget],
|
||||
user_theme_weight: float,
|
||||
) -> Dict[str, float]:
|
||||
"""Compute normalized weight allocations for commander and user themes."""
|
||||
|
||||
role_factors = {
|
||||
"primary": 1.0,
|
||||
"secondary": 0.75,
|
||||
"tertiary": 0.5,
|
||||
}
|
||||
raw_weights: Dict[str, float] = {}
|
||||
for target in commander_targets:
|
||||
factor = role_factors.get(_normalize_role(target.role), 0.5)
|
||||
raw_weights[target.role] = max(0.0, factor)
|
||||
user_total = max(0.0, user_theme_weight)
|
||||
per_user = (user_total / len(user_targets)) if user_targets else 0.0
|
||||
for target in user_targets:
|
||||
raw_weights[target.role] = max(0.0, per_user)
|
||||
total = sum(raw_weights.values())
|
||||
if total <= 0:
|
||||
if commander_targets:
|
||||
fallback = 1.0 / len(commander_targets)
|
||||
for target in commander_targets:
|
||||
raw_weights[target.role] = fallback
|
||||
elif user_targets:
|
||||
fallback = 1.0 / len(user_targets)
|
||||
for target in user_targets:
|
||||
raw_weights[target.role] = fallback
|
||||
else:
|
||||
return {}
|
||||
total = sum(raw_weights.values())
|
||||
return {role: weight / total for role, weight in raw_weights.items()}
|
||||
|
||||
|
||||
def build_theme_context(builder: Any) -> ThemeContext:
|
||||
"""Construct theme ordering, weights, and resolution metadata from a builder."""
|
||||
|
||||
commander_targets: List[ThemeTarget] = []
|
||||
for role in ("primary", "secondary", "tertiary"):
|
||||
tag = getattr(builder, f"{role}_tag", None)
|
||||
if not tag:
|
||||
continue
|
||||
slug = _normalize_tag(tag)
|
||||
commander_targets.append(
|
||||
ThemeTarget(role=role, display=str(tag), slug=slug, source="commander")
|
||||
)
|
||||
|
||||
user_resolved: List[str] = []
|
||||
resolution = getattr(builder, "user_theme_resolution", None)
|
||||
if resolution is not None and isinstance(resolution, ThemeResolutionInfo):
|
||||
user_resolved = list(resolution.resolved)
|
||||
else:
|
||||
raw_resolved = getattr(builder, "user_theme_resolved", [])
|
||||
if isinstance(raw_resolved, (list, tuple)):
|
||||
user_resolved = [str(item) for item in raw_resolved if str(item).strip()]
|
||||
user_targets: List[ThemeTarget] = []
|
||||
for index, theme in enumerate(user_resolved):
|
||||
slug = _normalize_tag(theme)
|
||||
role = f"user_{index + 1}"
|
||||
user_targets.append(
|
||||
ThemeTarget(role=role, display=str(theme), slug=slug, source="user")
|
||||
)
|
||||
|
||||
combine_mode = str(getattr(builder, "tag_mode", "AND") or "AND").upper()
|
||||
user_theme_weight = float(getattr(builder, "user_theme_weight", default_user_theme_weight()))
|
||||
weights = _theme_weight_factors(commander_targets, user_targets, user_theme_weight)
|
||||
|
||||
ordered_raw = commander_targets + user_targets
|
||||
ordered = [
|
||||
ThemeTarget(
|
||||
role=target.role,
|
||||
display=target.display,
|
||||
slug=target.slug,
|
||||
source=target.source,
|
||||
weight=weights.get(target.role, 0.0),
|
||||
)
|
||||
for target in ordered_raw
|
||||
]
|
||||
commander_slugs = [target.slug for target in ordered if target.source == "commander" and target.slug]
|
||||
user_slugs = [target.slug for target in ordered if target.source == "user" and target.slug]
|
||||
|
||||
info = resolution if isinstance(resolution, ThemeResolutionInfo) else None
|
||||
|
||||
# Log once per context creation for diagnostics
|
||||
try:
|
||||
logger.debug(
|
||||
"Theme context constructed: commander=%s user=%s mode=%s weight=%.3f",
|
||||
commander_slugs,
|
||||
user_slugs,
|
||||
combine_mode,
|
||||
user_theme_weight,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
for target in ordered:
|
||||
if target.source != "user":
|
||||
continue
|
||||
effective_weight = weights.get(target.role, target.weight)
|
||||
logger.info(
|
||||
"user_theme_applied theme='%s' slug=%s role=%s weight=%.3f mode=%s multiplier=%.3f",
|
||||
target.display,
|
||||
target.slug,
|
||||
target.role,
|
||||
float(effective_weight or 0.0),
|
||||
combine_mode,
|
||||
float(user_theme_weight or 0.0),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return ThemeContext(
|
||||
ordered_targets=ordered,
|
||||
combine_mode=combine_mode,
|
||||
weights=weights,
|
||||
commander_slugs=commander_slugs,
|
||||
user_slugs=user_slugs,
|
||||
resolution=info,
|
||||
user_theme_weight=user_theme_weight,
|
||||
)
|
||||
|
||||
|
||||
def annotate_theme_matches(df, context: ThemeContext):
|
||||
"""Add commander/user match columns to a working dataframe."""
|
||||
|
||||
if df is None or getattr(df, "empty", True):
|
||||
return df
|
||||
if "_parsedThemeTags" not in df.columns:
|
||||
df = df.copy()
|
||||
df["_parsedThemeTags"] = df["themeTags"].apply(bu.normalize_tag_cell)
|
||||
if "_normTags" not in df.columns:
|
||||
df = df.copy()
|
||||
df["_normTags"] = df["_parsedThemeTags"]
|
||||
|
||||
commander_set = set(context.commander_slugs)
|
||||
user_set = set(context.user_slugs)
|
||||
|
||||
def _match_count(tags: Iterable[str], needles: set[str]) -> int:
|
||||
if not tags or not needles:
|
||||
return 0
|
||||
try:
|
||||
return sum(1 for tag in tags if tag in needles)
|
||||
except Exception:
|
||||
total = 0
|
||||
for tag in tags:
|
||||
try:
|
||||
if tag in needles:
|
||||
total += 1
|
||||
except Exception:
|
||||
continue
|
||||
return total
|
||||
|
||||
df["_commanderMatch"] = df["_normTags"].apply(lambda tags: _match_count(tags, commander_set))
|
||||
df["_userMatch"] = df["_normTags"].apply(lambda tags: _match_count(tags, user_set))
|
||||
df["_multiMatch"] = df["_commanderMatch"] + df["_userMatch"]
|
||||
bonus = context.match_bonus
|
||||
if bonus > 0:
|
||||
df["_matchScore"] = df["_multiMatch"] + (df["_userMatch"] * bonus)
|
||||
else:
|
||||
df["_matchScore"] = df["_multiMatch"]
|
||||
|
||||
def _collect_hits(tags: Iterable[str]) -> List[str]:
|
||||
if not tags:
|
||||
return []
|
||||
hits: List[str] = []
|
||||
seen: set[str] = set()
|
||||
for target in context.ordered_targets:
|
||||
slug = target.slug
|
||||
if not slug or slug in seen:
|
||||
continue
|
||||
try:
|
||||
if slug in tags:
|
||||
hits.append(target.display)
|
||||
seen.add(slug)
|
||||
except Exception:
|
||||
continue
|
||||
return hits
|
||||
|
||||
df["_matchTags"] = df["_normTags"].apply(_collect_hits)
|
||||
return df
|
||||
|
||||
|
||||
def theme_summary_payload(context: ThemeContext) -> Dict[str, Any]:
|
||||
"""Produce a structured payload for UI/JSON exports summarizing themes."""
|
||||
|
||||
info = context.resolution
|
||||
requested: List[str] = []
|
||||
resolved: List[str] = []
|
||||
unresolved: List[str] = []
|
||||
matches: List[Dict[str, Any]] = []
|
||||
fuzzy: Dict[str, str] = {}
|
||||
catalog_version: Optional[str] = None
|
||||
if info is not None:
|
||||
requested = list(info.requested)
|
||||
resolved = list(info.resolved)
|
||||
unresolved = [item.get("input", "") for item in info.unresolved]
|
||||
matches = list(info.matches)
|
||||
fuzzy = dict(info.fuzzy_corrections)
|
||||
catalog_version = info.catalog_version
|
||||
else:
|
||||
resolved = [target.display for target in context.ordered_targets if target.source == "user"]
|
||||
|
||||
return {
|
||||
"commanderThemes": [target.display for target in context.ordered_targets if target.source == "commander"],
|
||||
"userThemes": [target.display for target in context.ordered_targets if target.source == "user"],
|
||||
"requested": requested,
|
||||
"resolved": resolved,
|
||||
"unresolved": unresolved,
|
||||
"matches": matches,
|
||||
"fuzzyCorrections": fuzzy,
|
||||
"mode": context.combine_mode,
|
||||
"weight": context.user_theme_weight,
|
||||
"themeCatalogVersion": catalog_version,
|
||||
}
|
||||
257
code/deck_builder/theme_matcher.py
Normal file
257
code/deck_builder/theme_matcher.py
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
"""Fuzzy matching utilities for supplemental theme selection."""
|
||||
from __future__ import annotations
|
||||
|
||||
import difflib
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
from typing import Iterable, List, Sequence
|
||||
|
||||
from code.deck_builder.theme_catalog_loader import ThemeCatalogEntry
|
||||
|
||||
__all__ = [
|
||||
"normalize_theme",
|
||||
"ThemeScore",
|
||||
"ResolutionResult",
|
||||
"ThemeMatcher",
|
||||
"HIGH_MATCH_THRESHOLD",
|
||||
"ACCEPT_MATCH_THRESHOLD",
|
||||
"SUGGEST_MATCH_THRESHOLD",
|
||||
]
|
||||
|
||||
_SPACE_RE = re.compile(r"\s+")
|
||||
_NON_ALNUM_RE = re.compile(r"[^a-z0-9 ]+")
|
||||
|
||||
HIGH_MATCH_THRESHOLD = 90.0
|
||||
ACCEPT_MATCH_THRESHOLD = 80.0
|
||||
SUGGEST_MATCH_THRESHOLD = 60.0
|
||||
MIN_QUERY_LENGTH = 3
|
||||
MAX_SUGGESTIONS = 5
|
||||
|
||||
|
||||
def normalize_theme(value: str) -> str:
|
||||
text = (value or "").strip()
|
||||
text = _SPACE_RE.sub(" ", text)
|
||||
return text.casefold()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _IndexedTheme:
|
||||
display: str
|
||||
normalized: str
|
||||
tokens: tuple[str, ...]
|
||||
trigrams: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ThemeScore:
|
||||
theme: str
|
||||
score: float
|
||||
|
||||
def rounded(self) -> float:
|
||||
return round(self.score, 4)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ResolutionResult:
|
||||
matched_theme: str | None
|
||||
score: float
|
||||
reason: str
|
||||
suggestions: List[ThemeScore]
|
||||
|
||||
|
||||
def _tokenize(text: str) -> tuple[str, ...]:
|
||||
cleaned = _NON_ALNUM_RE.sub(" ", text)
|
||||
parts = [p for p in cleaned.split() if p]
|
||||
return tuple(parts)
|
||||
|
||||
|
||||
def _trigrams(text: str) -> tuple[str, ...]:
|
||||
text = text.replace(" ", "_")
|
||||
if len(text) < 3:
|
||||
return tuple(text)
|
||||
extended = f"__{text}__"
|
||||
grams = [extended[i : i + 3] for i in range(len(extended) - 2)]
|
||||
return tuple(sorted(set(grams)))
|
||||
|
||||
|
||||
def _build_index(entries: Sequence[ThemeCatalogEntry]) -> tuple[tuple[_IndexedTheme, ...], dict[str, set[int]]]:
|
||||
indexed: list[_IndexedTheme] = []
|
||||
trigram_map: dict[str, set[int]] = {}
|
||||
for idx, entry in enumerate(entries):
|
||||
norm = normalize_theme(entry.theme)
|
||||
tokens = _tokenize(norm)
|
||||
trigrams = _trigrams(norm)
|
||||
indexed.append(
|
||||
_IndexedTheme(
|
||||
display=entry.theme,
|
||||
normalized=norm,
|
||||
tokens=tokens,
|
||||
trigrams=trigrams,
|
||||
)
|
||||
)
|
||||
for gram in trigrams:
|
||||
trigram_map.setdefault(gram, set()).add(idx)
|
||||
return tuple(indexed), trigram_map
|
||||
|
||||
|
||||
@dataclass
|
||||
class _QueryInfo:
|
||||
normalized: str
|
||||
tokens: tuple[str, ...]
|
||||
trigrams: tuple[str, ...]
|
||||
|
||||
|
||||
def _levenshtein(a: str, b: str) -> int:
|
||||
if a == b:
|
||||
return 0
|
||||
if not a:
|
||||
return len(b)
|
||||
if not b:
|
||||
return len(a)
|
||||
if len(a) < len(b):
|
||||
a, b = b, a
|
||||
previous = list(range(len(b) + 1))
|
||||
for i, ca in enumerate(a, start=1):
|
||||
current = [i]
|
||||
for j, cb in enumerate(b, start=1):
|
||||
insert_cost = current[j - 1] + 1
|
||||
delete_cost = previous[j] + 1
|
||||
replace_cost = previous[j - 1] + (0 if ca == cb else 1)
|
||||
current.append(min(insert_cost, delete_cost, replace_cost))
|
||||
previous = current
|
||||
return previous[-1]
|
||||
|
||||
|
||||
def _similarity(query: _QueryInfo, candidate: _IndexedTheme) -> float:
|
||||
if not candidate.trigrams:
|
||||
return 0.0
|
||||
if query.normalized == candidate.normalized:
|
||||
return 100.0
|
||||
|
||||
query_tokens = set(query.tokens)
|
||||
candidate_tokens = set(candidate.tokens)
|
||||
shared_tokens = len(query_tokens & candidate_tokens)
|
||||
token_base = max(len(query_tokens), len(candidate_tokens), 1)
|
||||
token_score = 100.0 * shared_tokens / token_base
|
||||
|
||||
query_trigrams = set(query.trigrams)
|
||||
candidate_trigrams = set(candidate.trigrams)
|
||||
if not query_trigrams:
|
||||
trigram_score = 0.0
|
||||
else:
|
||||
intersection = len(query_trigrams & candidate_trigrams)
|
||||
union = len(query_trigrams | candidate_trigrams)
|
||||
trigram_score = 100.0 * intersection / union if union else 0.0
|
||||
|
||||
seq_score = 100.0 * difflib.SequenceMatcher(None, query.normalized, candidate.normalized).ratio()
|
||||
distance = _levenshtein(query.normalized, candidate.normalized)
|
||||
max_len = max(len(query.normalized), len(candidate.normalized))
|
||||
distance_score = 100.0 * (1.0 - distance / max_len) if max_len else 0.0
|
||||
|
||||
prefix_bonus = 5.0 if candidate.normalized.startswith(query.normalized) else 0.0
|
||||
token_prefix_bonus = 5.0 if candidate.tokens and query.tokens and candidate.tokens[0].startswith(query.tokens[0]) else 0.0
|
||||
token_similarity_bonus = 0.0
|
||||
if query.tokens and candidate.tokens:
|
||||
token_similarity_bonus = 5.0 * difflib.SequenceMatcher(None, query.tokens[0], candidate.tokens[0]).ratio()
|
||||
distance_bonus = 0.0
|
||||
if distance <= 2:
|
||||
distance_bonus = 10.0 - (3.0 * distance)
|
||||
|
||||
score = (
|
||||
0.3 * trigram_score
|
||||
+ 0.2 * token_score
|
||||
+ 0.3 * seq_score
|
||||
+ 0.2 * distance_score
|
||||
+ prefix_bonus
|
||||
+ token_prefix_bonus
|
||||
+ distance_bonus
|
||||
+ token_similarity_bonus
|
||||
)
|
||||
if distance <= 2:
|
||||
score = max(score, 85.0 - 5.0 * distance)
|
||||
return min(score, 100.0)
|
||||
|
||||
|
||||
class ThemeMatcher:
    """Fuzzy theme matcher backed by a trigram index.

    On dev hardware (2025-10-02) resolving 20 queries against a 400-theme
    catalog completes in ≈0.65s (~0.03s per query) including Levenshtein
    scoring.
    """

    def __init__(self, entries: Sequence[ThemeCatalogEntry]):
        # _build_index yields the indexed entries plus a trigram -> entry-index map.
        self._entries: tuple[_IndexedTheme, ...]
        self._trigram_index: dict[str, set[int]]
        self._entries, self._trigram_index = _build_index(entries)

    @classmethod
    def from_entries(cls, entries: Iterable[ThemeCatalogEntry]) -> "ThemeMatcher":
        """Alternate constructor accepting any iterable of catalog entries."""
        return cls(list(entries))

    def resolve(self, raw_query: str, *, limit: int = MAX_SUGGESTIONS) -> ResolutionResult:
        """Resolve *raw_query* against the catalog, returning match + suggestions."""
        normalized = normalize_theme(raw_query)
        if not normalized:
            return ResolutionResult(matched_theme=None, score=0.0, reason="empty_input", suggestions=[])

        query = _QueryInfo(
            normalized=normalized,
            tokens=_tokenize(normalized),
            trigrams=_trigrams(normalized),
        )

        # Very short inputs are too noisy to fuzzy-match: accept only an exact hit.
        if len(normalized.replace(" ", "")) < MIN_QUERY_LENGTH:
            for entry in self._entries:
                if entry.normalized == normalized:
                    return ResolutionResult(
                        matched_theme=entry.display,
                        score=100.0,
                        reason="short_exact",
                        suggestions=[ThemeScore(theme=entry.display, score=100.0)],
                    )
            return ResolutionResult(matched_theme=None, score=0.0, reason="input_too_short", suggestions=[])

        candidate_ids = self._candidate_indexes(query)
        if not candidate_ids:
            return ResolutionResult(matched_theme=None, score=0.0, reason="no_candidates", suggestions=[])

        ranked: list[ThemeScore] = []
        emitted: set[str] = set()
        for candidate_id in candidate_ids:
            entry = self._entries[candidate_id]
            similarity = _similarity(query, entry)
            # Anything under 20 is noise; also drop duplicate display names.
            if similarity < 20.0 or entry.display in emitted:
                continue
            emitted.add(entry.display)
            ranked.append(ThemeScore(theme=entry.display, score=similarity))

        # Highest score first; casefolded name breaks ties deterministically.
        ranked.sort(key=lambda item: (-item.score, item.theme.casefold(), item.theme))
        suggestions = ranked[:limit]

        if not suggestions:
            return ResolutionResult(matched_theme=None, score=0.0, reason="no_match", suggestions=[])

        best = suggestions[0]
        if best.score >= HIGH_MATCH_THRESHOLD:
            matched, reason = best.theme, "high_confidence"
        elif best.score >= ACCEPT_MATCH_THRESHOLD:
            matched, reason = best.theme, "accepted_confidence"
        elif best.score >= SUGGEST_MATCH_THRESHOLD:
            matched, reason = None, "suggestions"
        else:
            matched, reason = None, "no_match"
        return ResolutionResult(matched_theme=matched, score=best.score, reason=reason, suggestions=suggestions)

    def _candidate_indexes(self, query: _QueryInfo) -> set[int]:
        """Indexes worth scoring; falls back to every entry when the index is unhelpful."""
        if not query.trigrams:
            return set(range(len(self._entries)))
        hits: set[int] = set()
        for gram in query.trigrams:
            hits |= set(self._trigram_index.get(gram, ()))
        return hits or set(range(len(self._entries)))
|
||||
|
||||
|
||||
@lru_cache(maxsize=128)
def build_matcher(entries: tuple[ThemeCatalogEntry, ...]) -> ThemeMatcher:
    """Return a cached :class:`ThemeMatcher` for *entries*.

    The catalog is passed as a (hashable) tuple so repeated resolutions
    against the same catalog reuse one trigram index instead of rebuilding
    it; the bounded ``maxsize`` keeps stale catalogs from accumulating.
    """
    return ThemeMatcher(entries)
|
||||
216
code/deck_builder/theme_resolution.py
Normal file
216
code/deck_builder/theme_resolution.py
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
"""Shared theme resolution utilities for supplemental user themes.
|
||||
|
||||
This module centralizes the fuzzy resolution logic so both the headless
|
||||
runner and the web UI can reuse a consistent implementation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Iterable, List, Sequence
|
||||
|
||||
from deck_builder.theme_catalog_loader import load_theme_catalog
|
||||
from deck_builder.theme_matcher import (
|
||||
build_matcher,
|
||||
normalize_theme,
|
||||
)
|
||||
|
||||
# Public API of this module; everything else is an implementation detail.
__all__ = [
    "ThemeResolutionInfo",
    "normalize_theme_match_mode",
    "clean_theme_inputs",
    "parse_theme_list",
    "resolve_additional_theme_inputs",
]
|
||||
|
||||
|
||||
@dataclass
class ThemeResolutionInfo:
    """Captures the outcome of resolving user-supplied supplemental themes."""

    # Cleaned user inputs, in the order they were provided.
    requested: List[str]
    # Normalized strictness mode: "strict" or "permissive".
    mode: str
    # Version identifier of the theme catalog used for matching.
    catalog_version: str
    # Matched display names, deduplicated and excluding commander-covered tags.
    resolved: List[str]
    # Per-input success payloads: input, matched, score, reason, suggestions.
    matches: List[Dict[str, Any]]
    # Per-input failure payloads: input, reason, score, suggestions.
    unresolved: List[Dict[str, Any]]
    # Raw input -> matched theme, only for inexact (fuzzy-corrected) matches.
    fuzzy_corrections: Dict[str, str]
|
||||
|
||||
|
||||
def normalize_theme_match_mode(value: str | None) -> str:
    """Normalize theme match mode inputs to ``strict`` or ``permissive``.

    Accepts ``"strict"``/``"s"`` (any case, surrounding whitespace ignored)
    as strict; everything else — including ``None`` — is permissive.
    """
    if value is None:
        return "permissive"
    return "strict" if str(value).strip().lower() in ("strict", "s") else "permissive"
|
||||
|
||||
|
||||
def clean_theme_inputs(values: Sequence[Any]) -> List[str]:
    """Normalize, deduplicate, and filter empty user-provided theme strings.

    Values are stringified and stripped; blanks are dropped, and duplicates
    (compared case-insensitively via ``casefold``) keep only the first
    occurrence. Unstringifiable values are skipped rather than raising.
    """
    result: List[str] = []
    seen_keys: set[str] = set()
    for raw in values or []:
        try:
            candidate = str(raw).strip()
        except Exception:
            # Best-effort: a value that cannot be stringified is ignored.
            continue
        if not candidate:
            continue
        folded = candidate.casefold()
        if folded not in seen_keys:
            seen_keys.add(folded)
            result.append(candidate)
    return result
|
||||
|
||||
|
||||
def parse_theme_list(raw: str | None) -> List[str]:
    """Parse CLI/config style theme lists separated by comma or semicolon.

    Semicolons win as the delimiter when present; otherwise commas are used.
    Entries are stripped, blanks dropped, and case-insensitive duplicates
    keep only their first occurrence.
    """
    if raw is None:
        return []
    try:
        text = str(raw).strip()
    except Exception:
        return []
    if not text:
        return []
    separator = ";" if ";" in text else ","
    themes: List[str] = []
    seen: set[str] = set()
    for piece in text.split(separator):
        piece = piece.strip()
        if not piece:
            continue
        key = piece.casefold()
        if key in seen:
            continue
        seen.add(key)
        themes.append(piece)
    return themes
|
||||
|
||||
|
||||
def _strict_unresolved_error(unresolved: Sequence[Dict[str, Any]]) -> ValueError:
    """Build the strict-mode error listing each unresolved input with its suggestions."""
    parts: List[str] = []
    for item in unresolved:
        suggestion_text = ", ".join(
            f"{s['theme']} ({s['score']:.1f})" for s in item.get("suggestions", [])
        )
        if suggestion_text:
            parts.append(f"{item['input']} (suggestions: {suggestion_text})")
        else:
            parts.append(item["input"])
    return ValueError(
        "Unable to resolve additional themes in strict mode: " + "; ".join(parts)
    )


def _dedupe_resolved(
    matches: Sequence[Dict[str, Any]],
    commander_tags: Iterable[str],
) -> List[str]:
    """Collect matched display names, dropping duplicates and commander-covered themes."""
    commander_set = {
        normalize_theme(tag)
        for tag in commander_tags
        if isinstance(tag, str) and tag.strip()
    }
    resolved: List[str] = []
    seen_resolved: set[str] = set()
    for match in matches:
        norm = normalize_theme(match["matched"])
        if norm in seen_resolved:
            continue
        # Skip themes already covered by the commander's own tags.
        if commander_set and norm in commander_set:
            continue
        resolved.append(match["matched"])
        seen_resolved.add(norm)
    return resolved


def resolve_additional_theme_inputs(
    requested: Sequence[str],
    mode: str,
    *,
    commander_tags: Iterable[str] = (),
) -> ThemeResolutionInfo:
    """Resolve user-provided additional themes against the catalog.

    Args:
        requested: Raw user inputs.
        mode: Strictness mode (``strict`` aborts on unresolved themes).
        commander_tags: Tags already supplied by the selected commander; these
            are used to deduplicate resolved results so we do not re-add themes
            already covered by the commander selection.

    Returns:
        :class:`ThemeResolutionInfo` describing resolved and unresolved themes.

    Raises:
        ValueError: When ``mode`` is strict and one or more inputs cannot be
            resolved with sufficient confidence (or the catalog is unavailable).
    """
    normalized_mode = normalize_theme_match_mode(mode)
    cleaned_inputs = clean_theme_inputs(requested)
    entries, version = load_theme_catalog(None)

    # Nothing requested: succeed trivially with an empty resolution record.
    if not cleaned_inputs:
        return ThemeResolutionInfo(
            requested=[],
            mode=normalized_mode,
            catalog_version=version,
            resolved=[],
            matches=[],
            unresolved=[],
            fuzzy_corrections={},
        )

    # No catalog: every input is unresolved; strict mode treats this as fatal.
    if not entries:
        unresolved = [
            {"input": raw, "reason": "catalog_missing", "score": 0.0, "suggestions": []}
            for raw in cleaned_inputs
        ]
        if normalized_mode == "strict":
            raise ValueError(
                "Unable to resolve additional themes in strict mode: catalog unavailable"
            )
        return ThemeResolutionInfo(
            requested=cleaned_inputs,
            mode=normalized_mode,
            catalog_version=version,
            resolved=[],
            matches=[],
            unresolved=unresolved,
            fuzzy_corrections={},
        )

    matcher = build_matcher(tuple(entries))
    matches: List[Dict[str, Any]] = []
    unresolved: List[Dict[str, Any]] = []
    fuzzy: Dict[str, str] = {}
    for raw in cleaned_inputs:
        result = matcher.resolve(raw)
        suggestions = [
            {"theme": suggestion.theme, "score": float(round(suggestion.score, 4))}
            for suggestion in result.suggestions
        ]
        if result.matched_theme:
            matches.append(
                {
                    "input": raw,
                    "matched": result.matched_theme,
                    "score": float(round(result.score, 4)),
                    "reason": result.reason,
                    "suggestions": suggestions,
                }
            )
            # Record inexact matches so callers can surface "did you mean" info.
            if normalize_theme(raw) != normalize_theme(result.matched_theme):
                fuzzy[raw] = result.matched_theme
        else:
            unresolved.append(
                {
                    "input": raw,
                    "reason": result.reason,
                    "score": float(round(result.score, 4)),
                    "suggestions": suggestions,
                }
            )

    resolved = _dedupe_resolved(matches, commander_tags)

    if normalized_mode == "strict" and unresolved:
        raise _strict_unresolved_error(unresolved)

    return ThemeResolutionInfo(
        requested=cleaned_inputs,
        mode=normalized_mode,
        catalog_version=version,
        resolved=resolved,
        matches=matches,
        unresolved=unresolved,
        fuzzy_corrections=fuzzy,
    )
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue