Bracket enforcement + inline gating; global pool prune; compliance JSON artifacts; UI combos gating; compose envs consolidated; fix YAML; bump version to 2.2.5

This commit is contained in:
mwisnowski 2025-09-03 18:00:06 -07:00
parent 42c8fc9f9e
commit 4e03997923
32 changed files with 2819 additions and 125 deletions

View file

@ -0,0 +1,226 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import json
import yaml
from deck_builder.combos import detect_combos
from .phases.phase0_core import BRACKET_DEFINITIONS
from type_definitions import ComplianceReport, CategoryFinding
POLICY_TAGS = {
"game_changers": "Bracket:GameChanger",
"extra_turns": "Bracket:ExtraTurn",
"mass_land_denial": "Bracket:MassLandDenial",
"tutors_nonland": "Bracket:TutorNonland",
}
# Local policy file mapping (mirrors tagging.bracket_policy_applier)
POLICY_FILES: Dict[str, str] = {
"game_changers": "config/card_lists/game_changers.json",
"extra_turns": "config/card_lists/extra_turns.json",
"mass_land_denial": "config/card_lists/mass_land_denial.json",
"tutors_nonland": "config/card_lists/tutors_nonland.json",
}
def _load_json_cards(path: str | Path) -> Tuple[List[str], Optional[str]]:
p = Path(path)
if not p.exists():
return [], None
try:
data = json.loads(p.read_text(encoding="utf-8"))
cards = [str(x).strip() for x in data.get("cards", []) if str(x).strip()]
version = str(data.get("list_version")) if data.get("list_version") else None
return cards, version
except Exception:
return [], None
def _load_brackets_yaml(path: str | Path = "config/brackets.yml") -> Dict[str, dict]:
p = Path(path)
if not p.exists():
return {}
try:
return yaml.safe_load(p.read_text(encoding="utf-8")) or {}
except Exception:
return {}
def _find_bracket_def(bracket_key: str) -> Tuple[str, int, Dict[str, Optional[int]]]:
key = (bracket_key or "core").strip().lower()
# Prefer YAML if available
y = _load_brackets_yaml()
if key in y:
meta = y[key]
name = str(meta.get("name", key.title()))
level = int(meta.get("level", 2))
limits = dict(meta.get("limits", {}))
return name, level, limits
# Fallback to in-code defaults
for bd in BRACKET_DEFINITIONS:
if bd.name.strip().lower() == key or str(bd.level) == key:
return bd.name, bd.level, dict(bd.limits)
# map common aliases
alias = bd.name.strip().lower()
if key in (alias, {1:"exhibition",2:"core",3:"upgraded",4:"optimized",5:"cedh"}.get(bd.level, "")):
return bd.name, bd.level, dict(bd.limits)
# Default to Core
core = next(b for b in BRACKET_DEFINITIONS if b.level == 2)
return core.name, core.level, dict(core.limits)
def _collect_tag_counts(card_library: Dict[str, Dict]) -> Tuple[Dict[str, int], Dict[str, List[str]]]:
counts: Dict[str, int] = {v: 0 for v in POLICY_TAGS.values()}
flagged_names: Dict[str, List[str]] = {k: [] for k in POLICY_TAGS.keys()}
for name, info in (card_library or {}).items():
tags = [t for t in (info.get("Tags") or []) if isinstance(t, str)]
for key, tag in POLICY_TAGS.items():
if tag in tags:
counts[tag] += 1
flagged_names[key].append(name)
return counts, flagged_names
def _canonicalize(name: str | None) -> str:
"""Match normalization similar to the tag applier.
- casefold
- normalize curly apostrophes to straight
- strip A- prefix (Arena/Alchemy variants)
- trim
"""
if not name:
return ""
s = str(name).strip().replace("\u2019", "'")
if s.startswith("A-") and len(s) > 2:
s = s[2:]
return s.casefold()
def _status_for(count: int, limit: Optional[int]) -> str:
if limit is None:
return "PASS"
return "PASS" if count <= int(limit) else "FAIL"
def evaluate_deck(
deck_cards: Dict[str, Dict],
commander_name: Optional[str],
bracket: str,
enforcement: str = "validate",
combos_path: str | Path = "config/card_lists/combos.json",
) -> ComplianceReport:
name, level, limits = _find_bracket_def(bracket)
counts_by_tag, names_by_key = _collect_tag_counts(deck_cards)
categories: Dict[str, CategoryFinding] = {}
messages: List[str] = []
# Prepare a canonicalized deck name map to support list-based matching
deck_canon_to_display: Dict[str, str] = {}
for n in (deck_cards or {}).keys():
cn = _canonicalize(n)
if cn and cn not in deck_canon_to_display:
deck_canon_to_display[cn] = n
# Map categories by combining tag-based counts with direct list matches by name
for key, tag in POLICY_TAGS.items():
# Start with any names found via tags
flagged_set: set[str] = set()
for nm in names_by_key.get(key, []) or []:
ckey = _canonicalize(nm)
if ckey:
flagged_set.add(ckey)
# Merge in list-based matches (by canonicalized name)
try:
file_path = POLICY_FILES.get(key)
if file_path:
names_list, _ver = _load_json_cards(file_path)
# Fallback for game_changers when file is empty: use in-code constants
if key == 'game_changers' and not names_list:
try:
from deck_builder import builder_constants as _bc
names_list = list(getattr(_bc, 'GAME_CHANGERS', []) or [])
except Exception:
names_list = []
listed = {_canonicalize(x) for x in names_list}
present = set(deck_canon_to_display.keys())
flagged_set |= (listed & present)
except Exception:
pass
# Build final flagged display names from the canonical set
flagged_names_disp = sorted({deck_canon_to_display.get(cn, cn) for cn in flagged_set})
c = len(flagged_set)
lim = limits.get(key)
status = _status_for(c, lim)
cat: CategoryFinding = {
"count": c,
"limit": lim,
"flagged": flagged_names_disp,
"status": status,
"notes": [],
}
categories[key] = cat
if status == "FAIL":
messages.append(f"{key.replace('_',' ').title()}: {c} exceeds limit {lim}")
# Two-card combos detection
combos = detect_combos(deck_cards.keys(), combos_path=combos_path)
cheap_early_pairs = [p for p in combos if p.cheap_early]
c_limit = limits.get("two_card_combos")
combos_status = _status_for(len(cheap_early_pairs), c_limit)
categories["two_card_combos"] = {
"count": len(cheap_early_pairs),
"limit": c_limit,
"flagged": [f"{p.a} + {p.b}" for p in cheap_early_pairs],
"status": combos_status,
"notes": ["Only counting cheap/early combos per policy"],
}
if combos_status == "FAIL":
messages.append("Two-card combos present beyond allowed bracket")
commander_flagged = False
if commander_name:
gch_cards, _ = _load_json_cards("config/card_lists/game_changers.json")
if any(commander_name.strip().lower() == x.lower() for x in gch_cards):
commander_flagged = True
# Exhibition/Core treat this as automatic fail; Upgraded counts toward limit
if level in (1, 2):
messages.append("Commander is on Game Changers list (not allowed for this bracket)")
categories["game_changers"]["status"] = "FAIL"
categories["game_changers"]["flagged"].append(commander_name)
# Build list_versions metadata
_, extra_ver = _load_json_cards("config/card_lists/extra_turns.json")
_, mld_ver = _load_json_cards("config/card_lists/mass_land_denial.json")
_, tutor_ver = _load_json_cards("config/card_lists/tutors_nonland.json")
_, gch_ver = _load_json_cards("config/card_lists/game_changers.json")
list_versions = {
"extra_turns": extra_ver,
"mass_land_denial": mld_ver,
"tutors_nonland": tutor_ver,
"game_changers": gch_ver,
}
# Overall verdict
overall = "PASS"
if any(cat.get("status") == "FAIL" for cat in categories.values()):
overall = "FAIL"
elif any(cat.get("status") == "WARN" for cat in categories.values()):
overall = "WARN"
report: ComplianceReport = {
"bracket": name.lower(),
"level": level,
"enforcement": enforcement,
"overall": overall,
"commander_flagged": commander_flagged,
"categories": categories,
"combos": [{"a": p.a, "b": p.b, "cheap_early": p.cheap_early, "setup_dependent": p.setup_dependent} for p in combos],
"list_versions": list_versions,
"messages": messages,
}
return report

View file

@ -119,6 +119,19 @@ class DeckBuilder(
# Modular reporting phase
if hasattr(self, 'run_reporting_phase'):
self.run_reporting_phase()
# Immediately after content additions and summary, if compliance is enforced later,
# we want to display what would be swapped. For interactive runs, surface a dry prompt.
try:
# Compute a quick compliance snapshot here to hint at upcoming enforcement
if hasattr(self, 'compute_and_print_compliance') and not getattr(self, 'headless', False):
from deck_builder.brackets_compliance import evaluate_deck as _eval # type: ignore
bracket_key = str(getattr(self, 'bracket_name', '') or getattr(self, 'bracket_level', 'core')).lower()
commander = getattr(self, 'commander_name', None)
snap = _eval(self.card_library, commander_name=commander, bracket=bracket_key)
if snap.get('overall') == 'FAIL':
self.output_func("\nNote: Limits exceeded. You'll get a chance to review swaps next.")
except Exception:
pass
if hasattr(self, 'export_decklist_csv'):
# If user opted out of owned-only, silently load all owned files for marking
try:
@ -133,6 +146,25 @@ class DeckBuilder(
txt_path = self.export_decklist_text(filename=base + '.txt') # type: ignore[attr-defined]
# Display the text file contents for easy copy/paste to online deck builders
self._display_txt_contents(txt_path)
# Compute bracket compliance and save a JSON report alongside exports
try:
if hasattr(self, 'compute_and_print_compliance'):
report0 = self.compute_and_print_compliance(base_stem=base) # type: ignore[attr-defined]
# If non-compliant and interactive, offer enforcement now
try:
if isinstance(report0, dict) and report0.get('overall') == 'FAIL' and not getattr(self, 'headless', False):
from deck_builder.phases.phase6_reporting import ReportingMixin as _RM # type: ignore
if isinstance(self, _RM) and hasattr(self, 'enforce_and_reexport'):
self.output_func("One or more bracket limits exceeded. Enter to auto-resolve, or Ctrl+C to skip.")
try:
_ = self.input_func("")
except Exception:
pass
self.enforce_and_reexport(base_stem=base, mode='prompt') # type: ignore[attr-defined]
except Exception:
pass
except Exception:
pass
# If owned-only build is incomplete, generate recommendations
try:
total_cards = sum(int(v.get('Count', 1)) for v in self.card_library.values())

View file

@ -0,0 +1,448 @@
from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Set
from pathlib import Path
import json
# Lightweight, internal utilities to avoid circular imports
from .brackets_compliance import evaluate_deck, POLICY_FILES
def _load_list_cards(paths: List[str]) -> Set[str]:
out: Set[str] = set()
for p in paths:
try:
data = json.loads(Path(p).read_text(encoding="utf-8"))
for n in (data.get("cards") or []):
if isinstance(n, str) and n.strip():
out.add(n.strip())
except Exception:
continue
return out
def _candidate_pool_for_role(builder, role: str) -> List[Tuple[str, dict]]:
"""Return a prioritized list of (name, rowdict) candidates for a replacement of a given role.
This consults the current combined card pool, filters out lands and already-chosen names,
and applies a role->tag mapping to find suitable replacements.
"""
df = getattr(builder, "_combined_cards_df", None)
if df is None or getattr(df, "empty", True):
return []
if "name" not in df.columns:
return []
# Normalize tag list per row
def _norm_tags(x):
return [str(t).lower() for t in x] if isinstance(x, list) else []
work = df.copy()
work["_ltags"] = work.get("themeTags", []).apply(_norm_tags)
# Role to tag predicates
def _is_protection(tags: List[str]) -> bool:
return any("protection" in t for t in tags)
def _is_draw(tags: List[str]) -> bool:
return any(("draw" in t) or ("card advantage" in t) for t in tags)
def _is_removal(tags: List[str]) -> bool:
return any(("removal" in t) or ("spot removal" in t) for t in tags) and not any(("board wipe" in t) or ("mass removal" in t) for t in tags)
def _is_wipe(tags: List[str]) -> bool:
return any(("board wipe" in t) or ("mass removal" in t) for t in tags)
# Theme fallback: anything that matches selected tags (primary/secondary/tertiary)
sel_tags = [str(getattr(builder, k, "") or "").strip().lower() for k in ("primary_tag", "secondary_tag", "tertiary_tag")]
sel_tags = [t for t in sel_tags if t]
def _matches_theme(tags: List[str]) -> bool:
if not sel_tags:
return False
for t in tags:
for st in sel_tags:
if st in t:
return True
return False
pred = None
r = str(role or "").strip().lower()
if r == "protection":
pred = _is_protection
elif r == "card_advantage":
pred = _is_draw
elif r == "removal":
pred = _is_removal
elif r in ("wipe", "board_wipe", "wipes"):
pred = _is_wipe
else:
pred = _matches_theme
pool = work[~work["type"].fillna("").str.contains("Land", case=False, na=False)]
if pred is _matches_theme:
pool = pool[pool["_ltags"].apply(_matches_theme)]
else:
pool = pool[pool["_ltags"].apply(pred)]
# Exclude names already in the library
already_lower = {str(n).lower() for n in getattr(builder, "card_library", {}).keys()}
pool = pool[~pool["name"].astype(str).str.lower().isin(already_lower)]
# Sort by edhrecRank then manaValue
try:
from . import builder_utils as bu
sorted_df = bu.sort_by_priority(pool, ["edhrecRank", "manaValue"]) # type: ignore[attr-defined]
# Prefer-owned bias
if getattr(builder, "prefer_owned", False):
owned = getattr(builder, "owned_card_names", None)
if owned:
sorted_df = bu.prefer_owned_first(sorted_df, {str(n).lower() for n in owned}) # type: ignore[attr-defined]
except Exception:
sorted_df = pool
out: List[Tuple[str, dict]] = []
for _, r in sorted_df.iterrows():
nm = str(r.get("name"))
if not nm:
continue
out.append((nm, r.to_dict()))
return out
def _remove_card(builder, name: str) -> bool:
entry = getattr(builder, "card_library", {}).get(name)
if not entry:
return False
# Protect commander and locks
if bool(entry.get("Commander")):
return False
if str(entry.get("AddedBy", "")).strip().lower() == "lock":
return False
try:
del builder.card_library[name]
return True
except Exception:
return False
def _try_add_replacement(builder, target_role: Optional[str], forbidden: Set[str]) -> Optional[str]:
"""Attempt to add one replacement card for the given role, avoiding forbidden names.
Returns the name added, or None if no suitable candidate was found/added.
"""
role = (target_role or "").strip().lower()
tried_roles = [role] if role else []
if role not in ("protection", "card_advantage", "removal", "wipe", "board_wipe", "wipes"):
tried_roles.append("card_advantage")
tried_roles.append("protection")
tried_roles.append("removal")
for r in tried_roles or ["card_advantage"]:
candidates = _candidate_pool_for_role(builder, r)
for nm, row in candidates:
if nm in forbidden:
continue
# Enforce owned-only and color identity legality via builder.add_card (it will silently skip if illegal)
before = set(getattr(builder, "card_library", {}).keys())
builder.add_card(
nm,
card_type=str(row.get("type", row.get("type_line", "")) or ""),
mana_cost=str(row.get("mana_cost", row.get("manaCost", "")) or ""),
role=target_role or ("card_advantage" if r == "card_advantage" else ("protection" if r == "protection" else ("removal" if r == "removal" else "theme_spell"))),
added_by="enforcement"
)
after = set(getattr(builder, "card_library", {}).keys())
added = list(after - before)
if added:
return added[0]
return None
def enforce_bracket_compliance(builder, mode: str = "prompt") -> Dict:
"""Trim over-limit bracket categories and add role-consistent replacements.
mode: 'prompt' for interactive CLI (respects builder.headless); 'auto' for non-interactive.
Returns the final compliance report after enforcement (or the original if no changes).
"""
# Compute initial report
bracket_key = str(getattr(builder, 'bracket_name', '') or getattr(builder, 'bracket_level', 'core')).lower()
commander = getattr(builder, 'commander_name', None)
report = evaluate_deck(getattr(builder, 'card_library', {}), commander_name=commander, bracket=bracket_key)
if report.get("overall") != "FAIL":
return report
# Prepare prohibited set (avoid adding these during replacement)
forbidden_lists = list(POLICY_FILES.values())
prohibited: Set[str] = _load_list_cards(forbidden_lists)
# Determine offenders per category
cats = report.get("categories", {}) or {}
to_remove: List[str] = []
# Build a helper to rank offenders: keep better (lower edhrecRank) ones
df = getattr(builder, "_combined_cards_df", None)
def _score(name: str) -> Tuple[int, float, str]:
try:
if df is not None and not getattr(df, 'empty', True) and 'name' in df.columns:
r = df[df['name'].astype(str) == str(name)]
if not r.empty:
rank = int(r.iloc[0].get('edhrecRank') or 10**9)
mv = float(r.iloc[0].get('manaValue') or r.iloc[0].get('cmc') or 0.0)
return (rank, mv, str(name))
except Exception:
pass
return (10**9, 99.0, str(name))
# Interactive helper
interactive = (mode == 'prompt' and not bool(getattr(builder, 'headless', False)))
for key, cat in cats.items():
if key not in ("game_changers", "extra_turns", "mass_land_denial", "tutors_nonland"):
continue
lim = cat.get("limit")
cnt = int(cat.get("count", 0) or 0)
if lim is None or cnt <= int(lim):
continue
flagged = [n for n in (cat.get("flagged") or []) if isinstance(n, str)]
# Only consider flagged names that are actually in the library now
lib = getattr(builder, 'card_library', {})
present = [n for n in flagged if n in lib]
if not present:
continue
# Determine how many need trimming
over = cnt - int(lim)
# Sort by ascending desirability to keep: worst ranks first for removal
present_sorted = sorted(present, key=_score, reverse=True) # worst first
if interactive:
# Present choices to keep
try:
out = getattr(builder, 'output_func', print)
inp = getattr(builder, 'input_func', input)
out(f"\nEnforcement: {key.replace('_',' ').title()} is over the limit ({cnt} > {lim}).")
out("Select the indices to KEEP (comma-separated). Press Enter to auto-keep the best:")
for i, nm in enumerate(sorted(present, key=_score)):
sc = _score(nm)
out(f" [{i}] {nm} (edhrecRank={sc[0] if sc[0] < 10**9 else 'n/a'})")
raw = str(inp("Keep which? ").strip())
keep_idx: Set[int] = set()
if raw:
for tok in raw.split(','):
tok = tok.strip()
if tok.isdigit():
keep_idx.add(int(tok))
# Compute the names to keep up to the allowed count
allowed = max(0, int(lim))
keep_list: List[str] = []
for i, nm in enumerate(sorted(present, key=_score)):
if len(keep_list) >= allowed:
break
if i in keep_idx:
keep_list.append(nm)
# If still short, fill with best-ranked remaining
for nm in sorted(present, key=_score):
if len(keep_list) >= allowed:
break
if nm not in keep_list:
keep_list.append(nm)
# Remove the others (beyond keep_list)
for nm in present:
if nm not in keep_list and over > 0:
to_remove.append(nm)
over -= 1
if over > 0:
# If user kept too many, trim worst extras
for nm in present_sorted:
if over <= 0:
break
if nm in keep_list:
to_remove.append(nm)
over -= 1
except Exception:
# Fallback to auto behavior
to_remove.extend(present_sorted[:over])
else:
# Auto: remove the worst-ranked extras first
to_remove.extend(present_sorted[:over])
# Execute removals and replacements
actually_removed: List[str] = []
actually_added: List[str] = []
swaps: List[dict] = []
# Load preferred replacements mapping (lowercased keys/values)
pref_map_lower: Dict[str, str] = {}
try:
raw = getattr(builder, 'preferred_replacements', {}) or {}
for k, v in raw.items():
ks = str(k).strip().lower()
vs = str(v).strip().lower()
if ks and vs:
pref_map_lower[ks] = vs
except Exception:
pref_map_lower = {}
for nm in to_remove:
entry = getattr(builder, 'card_library', {}).get(nm)
if not entry:
continue
role = entry.get('Role') or None
if _remove_card(builder, nm):
actually_removed.append(nm)
# First, honor any explicit user-chosen replacement
added = None
try:
want = pref_map_lower.get(str(nm).strip().lower())
if want:
# Avoid adding prohibited or duplicates
lib_l = {str(x).strip().lower() for x in getattr(builder, 'card_library', {}).keys()}
if (want not in prohibited) and (want not in lib_l):
df = getattr(builder, '_combined_cards_df', None)
target_name = None
card_type = ''
mana_cost = ''
if df is not None and not getattr(df, 'empty', True) and 'name' in df.columns:
r = df[df['name'].astype(str).str.lower() == want]
if not r.empty:
target_name = str(r.iloc[0]['name'])
card_type = str(r.iloc[0].get('type', r.iloc[0].get('type_line', '')) or '')
mana_cost = str(r.iloc[0].get('mana_cost', r.iloc[0].get('manaCost', '')) or '')
# If we couldn't resolve row, still try to add by name
target = target_name or want
before = set(getattr(builder, 'card_library', {}).keys())
builder.add_card(target, card_type=card_type, mana_cost=mana_cost, role=role, added_by='enforcement')
after = set(getattr(builder, 'card_library', {}).keys())
delta = list(after - before)
if delta:
added = delta[0]
except Exception:
added = None
# If no explicit or failed, try to add an automatic role-consistent replacement
if not added:
added = _try_add_replacement(builder, role, prohibited)
if added:
actually_added.append(added)
swaps.append({"removed": nm, "added": added, "role": role})
else:
swaps.append({"removed": nm, "added": None, "role": role})
# Recompute report after initial category-based changes
final_report = evaluate_deck(getattr(builder, 'card_library', {}), commander_name=commander, bracket=bracket_key)
# --- Second pass: break cheap/early two-card combos if still over the limit ---
try:
cats2 = final_report.get("categories", {}) or {}
two = cats2.get("two_card_combos") or {}
curr = int(two.get("count", 0) or 0)
lim = two.get("limit")
if lim is not None and curr > int(lim):
# Build present cheap/early pairs from the report
pairs: List[Tuple[str, str]] = []
for p in (final_report.get("combos") or []):
try:
if not p.get("cheap_early"):
continue
a = str(p.get("a") or "").strip()
b = str(p.get("b") or "").strip()
if not a or not b:
continue
# Only consider if both still present
lib = getattr(builder, 'card_library', {}) or {}
if a in lib and b in lib:
pairs.append((a, b))
except Exception:
continue
# Helper to recompute count and frequencies from current pairs
def _freq(ps: List[Tuple[str, str]]) -> Dict[str, int]:
mp: Dict[str, int] = {}
for (a, b) in ps:
mp[a] = mp.get(a, 0) + 1
mp[b] = mp.get(b, 0) + 1
return mp
current_pairs = list(pairs)
blocked: Set[str] = set()
# Keep removing until combos count <= limit or no progress possible
while len(current_pairs) > int(lim):
freq = _freq(current_pairs)
if not freq:
break
# Rank candidates: break the most combos first; break ties by worst desirability
cand_names = list(freq.keys())
cand_names.sort(key=lambda nm: (-int(freq.get(nm, 0)), _score(nm)), reverse=False) # type: ignore[arg-type]
removed_any = False
for nm in cand_names:
if nm in blocked:
continue
entry = getattr(builder, 'card_library', {}).get(nm)
role = entry.get('Role') if isinstance(entry, dict) else None
# Try to remove; protects commander/locks inside helper
if _remove_card(builder, nm):
actually_removed.append(nm)
# Preferred replacement first
added = None
try:
want = pref_map_lower.get(str(nm).strip().lower())
if want:
lib_l = {str(x).strip().lower() for x in getattr(builder, 'card_library', {}).keys()}
if (want not in prohibited) and (want not in lib_l):
df2 = getattr(builder, '_combined_cards_df', None)
target_name = None
card_type = ''
mana_cost = ''
if df2 is not None and not getattr(df2, 'empty', True) and 'name' in df2.columns:
r = df2[df2['name'].astype(str).str.lower() == want]
if not r.empty:
target_name = str(r.iloc[0]['name'])
card_type = str(r.iloc[0].get('type', r.iloc[0].get('type_line', '')) or '')
mana_cost = str(r.iloc[0].get('mana_cost', r.iloc[0].get('manaCost', '')) or '')
target = target_name or want
before = set(getattr(builder, 'card_library', {}).keys())
builder.add_card(target, card_type=card_type, mana_cost=mana_cost, role=role, added_by='enforcement')
after = set(getattr(builder, 'card_library', {}).keys())
delta = list(after - before)
if delta:
added = delta[0]
except Exception:
added = None
if not added:
added = _try_add_replacement(builder, role, prohibited)
if added:
actually_added.append(added)
swaps.append({"removed": nm, "added": added, "role": role})
else:
swaps.append({"removed": nm, "added": None, "role": role})
# Update pairs by removing any that contain nm
current_pairs = [(a, b) for (a, b) in current_pairs if (a != nm and b != nm)]
removed_any = True
break
else:
blocked.add(nm)
if not removed_any:
# Cannot break further due to locks/commander; stop to avoid infinite loop
break
# Recompute report after combo-breaking
final_report = evaluate_deck(getattr(builder, 'card_library', {}), commander_name=commander, bracket=bracket_key)
except Exception:
# If combo-breaking fails for any reason, fall back to the current report
pass
# Attach enforcement actions for downstream consumers
try:
final_report.setdefault('enforcement', {})
final_report['enforcement']['removed'] = list(actually_removed)
final_report['enforcement']['added'] = list(actually_added)
final_report['enforcement']['swaps'] = list(swaps)
except Exception:
pass
# Log concise summary if possible
try:
out = getattr(builder, 'output_func', print)
if actually_removed or actually_added:
out("\nEnforcement applied:")
if actually_removed:
out("Removed:")
for x in actually_removed:
out(f" - {x}")
if actually_added:
out("Added:")
for x in actually_added:
out(f" + {x}")
out(f"Compliance after enforcement: {final_report.get('overall')}")
except Exception:
pass
return final_report

View file

@ -380,6 +380,8 @@ class CreatureAdditionMixin:
commander_name = getattr(self, 'commander', None) or getattr(self, 'commander_name', None)
if commander_name and 'name' in creature_df.columns:
creature_df = creature_df[creature_df['name'] != commander_name]
# Apply bracket-based pre-filters (e.g., disallow game changers or tutors when bracket limit == 0)
creature_df = self._apply_bracket_pre_filters(creature_df)
if creature_df.empty:
return None
if '_parsedThemeTags' not in creature_df.columns:
@ -392,6 +394,66 @@ class CreatureAdditionMixin:
creature_df['_multiMatch'] = creature_df['_normTags'].apply(lambda lst: sum(1 for t in selected_tags_lower if t in lst))
return creature_df
def _apply_bracket_pre_filters(self, df):
"""Preemptively filter disallowed categories for the current bracket for creatures.
Excludes when bracket limit == 0 for a category:
- Game Changers
- Nonland Tutors
Note: Extra Turns and Mass Land Denial generally don't apply to creature cards,
but if present as tags, they'll be respected too.
"""
try:
if df is None or getattr(df, 'empty', False):
return df
limits = getattr(self, 'bracket_limits', {}) or {}
disallow = {
'game_changers': (limits.get('game_changers') is not None and int(limits.get('game_changers')) == 0),
'tutors_nonland': (limits.get('tutors_nonland') is not None and int(limits.get('tutors_nonland')) == 0),
'extra_turns': (limits.get('extra_turns') is not None and int(limits.get('extra_turns')) == 0),
'mass_land_denial': (limits.get('mass_land_denial') is not None and int(limits.get('mass_land_denial')) == 0),
}
if not any(disallow.values()):
return df
def norm_tags(val):
try:
return [str(t).strip().lower() for t in (val or [])]
except Exception:
return []
if '_ltags' not in df.columns:
try:
if 'themeTags' in df.columns:
df = df.copy()
df['_ltags'] = df['themeTags'].apply(bu.normalize_tag_cell)
except Exception:
pass
tag_col = '_ltags' if '_ltags' in df.columns else ('themeTags' if 'themeTags' in df.columns else None)
if not tag_col:
return df
syn = {
'game_changers': { 'bracket:gamechanger', 'gamechanger', 'game-changer', 'game changer' },
'tutors_nonland': { 'bracket:tutornonland', 'tutor', 'tutors', 'nonland tutor', 'non-land tutor' },
'extra_turns': { 'bracket:extraturn', 'extra turn', 'extra turns', 'extraturn' },
'mass_land_denial': { 'bracket:masslanddenial', 'mass land denial', 'mld', 'masslanddenial' },
}
tags_series = df[tag_col].apply(norm_tags)
mask_keep = [True] * len(df)
for cat, dis in disallow.items():
if not dis:
continue
needles = syn.get(cat, set())
drop_idx = tags_series.apply(lambda lst, nd=needles: any(any(n in t for n in nd) for t in lst))
mask_keep = [mk and (not di) for mk, di in zip(mask_keep, drop_idx.tolist())]
try:
import pandas as _pd # type: ignore
mask_keep = _pd.Series(mask_keep, index=df.index)
except Exception:
pass
return df[mask_keep]
except Exception:
return df
def _add_creatures_for_role(self, role: str):
"""Add creatures for a single theme role ('primary'|'secondary'|'tertiary')."""
df = getattr(self, '_combined_cards_df', None)

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import math
from typing import List, Dict
import os
from .. import builder_utils as bu
from .. import builder_constants as bc
@ -16,6 +17,99 @@ class SpellAdditionMixin:
(e.g., further per-category sub-mixins) can split this class if complexity grows.
"""
def _apply_bracket_pre_filters(self, df):
"""Preemptively filter disallowed categories for the current bracket.
Excludes when bracket limit == 0 for a category:
- Game Changers
- Extra Turns
- Mass Land Denial (MLD)
- Nonland Tutors
"""
try:
if df is None or getattr(df, 'empty', False):
return df
limits = getattr(self, 'bracket_limits', {}) or {}
# Determine which categories are hard-disallowed
disallow = {
'game_changers': (limits.get('game_changers') is not None and int(limits.get('game_changers')) == 0),
'extra_turns': (limits.get('extra_turns') is not None and int(limits.get('extra_turns')) == 0),
'mass_land_denial': (limits.get('mass_land_denial') is not None and int(limits.get('mass_land_denial')) == 0),
'tutors_nonland': (limits.get('tutors_nonland') is not None and int(limits.get('tutors_nonland')) == 0),
}
if not any(disallow.values()):
return df
# Normalize tags helper
def norm_tags(val):
try:
return [str(t).strip().lower() for t in (val or [])]
except Exception:
return []
# Build predicate masks only if column exists
if '_ltags' not in df.columns:
try:
from .. import builder_utils as _bu
if 'themeTags' in df.columns:
df = df.copy()
df['_ltags'] = df['themeTags'].apply(_bu.normalize_tag_cell)
except Exception:
pass
def has_any(tags, needles):
return any((nd in t) for t in tags for nd in needles)
tag_col = '_ltags' if '_ltags' in df.columns else ('themeTags' if 'themeTags' in df.columns else None)
if not tag_col:
return df
# Define synonyms per category
syn = {
'game_changers': { 'bracket:gamechanger', 'gamechanger', 'game-changer', 'game changer' },
'extra_turns': { 'bracket:extraturn', 'extra turn', 'extra turns', 'extraturn' },
'mass_land_denial': { 'bracket:masslanddenial', 'mass land denial', 'mld', 'masslanddenial' },
'tutors_nonland': { 'bracket:tutornonland', 'tutor', 'tutors', 'nonland tutor', 'non-land tutor' },
}
# Build exclusion mask
mask_keep = [True] * len(df)
tags_series = df[tag_col].apply(norm_tags)
for cat, dis in disallow.items():
if not dis:
continue
needles = syn.get(cat, set())
drop_idx = tags_series.apply(lambda lst, nd=needles: any(any(n in t for n in nd) for t in lst))
# Combine into keep mask
mask_keep = [mk and (not di) for mk, di in zip(mask_keep, drop_idx.tolist())]
try:
import pandas as _pd # type: ignore
mask_keep = _pd.Series(mask_keep, index=df.index)
except Exception:
pass
return df[mask_keep]
except Exception:
return df
def _debug_dump_pool(self, df, label: str) -> None:
"""If DEBUG_SPELL_POOLS_WRITE is set, write the pool to logs/pool_{label}_{timestamp}.csv"""
try:
if str(os.getenv('DEBUG_SPELL_POOLS_WRITE', '')).strip().lower() not in {"1","true","yes","on"}:
return
import os as _os
from datetime import datetime as _dt
_os.makedirs('logs', exist_ok=True)
ts = getattr(self, 'timestamp', _dt.now().strftime('%Y%m%d%H%M%S'))
path = _os.path.join('logs', f"pool_{label}_{ts}.csv")
cols = [c for c in ['name','type','manaValue','manaCost','edhrecRank','themeTags'] if c in df.columns]
try:
if cols:
df[cols].to_csv(path, index=False, encoding='utf-8')
else:
df.to_csv(path, index=False, encoding='utf-8')
except Exception:
df.to_csv(path, index=False)
try:
self.output_func(f"[DEBUG] Wrote pool CSV: {path} ({len(df)})")
except Exception:
pass
except Exception:
pass
# ---------------------------
# Ramp
# ---------------------------
@ -56,7 +150,16 @@ class SpellAdditionMixin:
commander_name = getattr(self, 'commander', None)
if commander_name:
work = work[work['name'] != commander_name]
work = self._apply_bracket_pre_filters(work)
work = bu.sort_by_priority(work, ['edhrecRank','manaValue'])
self._debug_dump_pool(work, 'ramp_all')
# Debug: print ramp pool details
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
names = work['name'].astype(str).head(30).tolist()
self.output_func(f"[DEBUG][Ramp] Total pool (non-lands): {len(work)}; top {len(names)}: {', '.join(names)}")
except Exception:
pass
# Prefer-owned bias: stable reorder to put owned first while preserving prior sort
if getattr(self, 'prefer_owned', False):
owned_set = getattr(self, 'owned_card_names', None)
@ -97,10 +200,24 @@ class SpellAdditionMixin:
return added_now
rocks_pool = work[work['type'].fillna('').str.contains('Artifact', case=False, na=False)]
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
rnames = rocks_pool['name'].astype(str).head(25).tolist()
self.output_func(f"[DEBUG][Ramp] Rocks pool: {len(rocks_pool)}; sample: {', '.join(rnames)}")
except Exception:
pass
self._debug_dump_pool(rocks_pool, 'ramp_rocks')
if rocks_target > 0:
add_from_pool(rocks_pool, rocks_target, added_rocks, 'Rocks')
dorks_pool = work[work['type'].fillna('').str.contains('Creature', case=False, na=False)]
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
dnames = dorks_pool['name'].astype(str).head(25).tolist()
self.output_func(f"[DEBUG][Ramp] Dorks pool: {len(dorks_pool)}; sample: {', '.join(dnames)}")
except Exception:
pass
self._debug_dump_pool(dorks_pool, 'ramp_dorks')
if dorks_target > 0:
add_from_pool(dorks_pool, dorks_target, added_dorks, 'Dorks')
@ -108,6 +225,13 @@ class SpellAdditionMixin:
remaining = target_total - current_total
if remaining > 0:
general_pool = work[~work['name'].isin(added_rocks + added_dorks)]
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
gnames = general_pool['name'].astype(str).head(25).tolist()
self.output_func(f"[DEBUG][Ramp] General pool (remaining): {len(general_pool)}; sample: {', '.join(gnames)}")
except Exception:
pass
self._debug_dump_pool(general_pool, 'ramp_general')
add_from_pool(general_pool, remaining, added_general, 'General')
total_added_now = len(added_rocks)+len(added_dorks)+len(added_general)
@ -148,7 +272,15 @@ class SpellAdditionMixin:
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = self._apply_bracket_pre_filters(pool)
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
self._debug_dump_pool(pool, 'removal')
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
names = pool['name'].astype(str).head(40).tolist()
self.output_func(f"[DEBUG][Removal] Pool size: {len(pool)}; top {len(names)}: {', '.join(names)}")
except Exception:
pass
if getattr(self, 'prefer_owned', False):
owned_set = getattr(self, 'owned_card_names', None)
if owned_set:
@ -210,7 +342,15 @@ class SpellAdditionMixin:
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = self._apply_bracket_pre_filters(pool)
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
self._debug_dump_pool(pool, 'wipes')
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
names = pool['name'].astype(str).head(30).tolist()
self.output_func(f"[DEBUG][Wipes] Pool size: {len(pool)}; sample: {', '.join(names)}")
except Exception:
pass
if getattr(self, 'prefer_owned', False):
owned_set = getattr(self, 'owned_card_names', None)
if owned_set:
@ -278,6 +418,7 @@ class SpellAdditionMixin:
def is_draw(tags):
return any(('draw' in t) or ('card advantage' in t) for t in tags)
df = df[df['_ltags'].apply(is_draw)]
df = self._apply_bracket_pre_filters(df)
df = df[~df['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
@ -291,6 +432,19 @@ class SpellAdditionMixin:
return bu.sort_by_priority(d, ['edhrecRank','manaValue'])
conditional_df = sortit(conditional_df)
unconditional_df = sortit(unconditional_df)
self._debug_dump_pool(conditional_df, 'card_advantage_conditional')
self._debug_dump_pool(unconditional_df, 'card_advantage_unconditional')
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
c_names = conditional_df['name'].astype(str).head(30).tolist()
u_names = unconditional_df['name'].astype(str).head(30).tolist()
self.output_func(f"[DEBUG][CardAdv] Total pool: {len(df)}; conditional: {len(conditional_df)}; unconditional: {len(unconditional_df)}")
if c_names:
self.output_func(f"[DEBUG][CardAdv] Conditional sample: {', '.join(c_names)}")
if u_names:
self.output_func(f"[DEBUG][CardAdv] Unconditional sample: {', '.join(u_names)}")
except Exception:
pass
if getattr(self, 'prefer_owned', False):
owned_set = getattr(self, 'owned_card_names', None)
if owned_set:
@ -368,7 +522,15 @@ class SpellAdditionMixin:
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = self._apply_bracket_pre_filters(pool)
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
self._debug_dump_pool(pool, 'protection')
try:
if str(os.getenv('DEBUG_SPELL_POOLS', '')).strip().lower() in {"1","true","yes","on"}:
names = pool['name'].astype(str).head(30).tolist()
self.output_func(f"[DEBUG][Protection] Pool size: {len(pool)}; sample: {', '.join(names)}")
except Exception:
pass
if getattr(self, 'prefer_owned', False):
owned_set = getattr(self, 'owned_card_names', None)
if owned_set:
@ -467,6 +629,7 @@ class SpellAdditionMixin:
~df['type'].str.contains('Land', case=False, na=False)
& ~df['type'].str.contains('Creature', case=False, na=False)
].copy()
spells_df = self._apply_bracket_pre_filters(spells_df)
if spells_df.empty:
return
selected_tags_lower = [t.lower() for _r, t in themes_ordered]
@ -521,6 +684,7 @@ class SpellAdditionMixin:
if owned_set:
subset = bu.prefer_owned_first(subset, {str(n).lower() for n in owned_set})
pool = subset.head(top_n).copy()
pool = self._apply_bracket_pre_filters(pool)
pool = pool[~pool['name'].isin(self.card_library.keys())]
if pool.empty:
continue
@ -563,6 +727,7 @@ class SpellAdditionMixin:
if total_added < remaining:
need = remaining - total_added
multi_pool = spells_df[~spells_df['name'].isin(self.card_library.keys())].copy()
multi_pool = self._apply_bracket_pre_filters(multi_pool)
if combine_mode == 'AND' and len(selected_tags_lower) > 1:
prioritized = multi_pool[multi_pool['_multiMatch'] >= 2]
if prioritized.empty:
@ -607,6 +772,7 @@ class SpellAdditionMixin:
if total_added < remaining:
extra_needed = remaining - total_added
leftover = spells_df[~spells_df['name'].isin(self.card_library.keys())].copy()
leftover = self._apply_bracket_pre_filters(leftover)
if not leftover.empty:
if '_normTags' not in leftover.columns:
leftover['_normTags'] = leftover['themeTags'].apply(

View file

@ -26,6 +26,176 @@ class ReportingMixin:
self.print_card_library(table=True)
"""Phase 6: Reporting, summaries, and export helpers."""
def enforce_and_reexport(self, base_stem: str | None = None, mode: str = "prompt") -> dict:
"""Run bracket enforcement, then re-export CSV/TXT and recompute compliance.
mode: 'prompt' for CLI interactive; 'auto' for headless/web.
Returns the final compliance report dict.
"""
try:
# Lazy import to avoid cycles
from deck_builder.enforcement import enforce_bracket_compliance # type: ignore
except Exception:
self.output_func("Enforcement module unavailable.")
return {}
# Enforce
report = enforce_bracket_compliance(self, mode=mode)
# If enforcement removed cards without enough replacements, top up to 100 using theme filler
try:
total_cards = 0
for _n, _e in getattr(self, 'card_library', {}).items():
try:
total_cards += int(_e.get('Count', 1))
except Exception:
total_cards += 1
if int(total_cards) < 100 and hasattr(self, 'fill_remaining_theme_spells'):
before = int(total_cards)
try:
self.fill_remaining_theme_spells() # type: ignore[attr-defined]
except Exception:
pass
# Recompute after filler
try:
total_cards = 0
for _n, _e in getattr(self, 'card_library', {}).items():
try:
total_cards += int(_e.get('Count', 1))
except Exception:
total_cards += 1
except Exception:
total_cards = before
try:
self.output_func(f"Topped up deck to {total_cards}/100 after enforcement.")
except Exception:
pass
except Exception:
pass
# Print what changed
try:
enf = report.get('enforcement') or {}
removed = list(enf.get('removed') or [])
added = list(enf.get('added') or [])
if removed or added:
self.output_func("\nEnforcement Summary (swaps):")
if removed:
self.output_func("Removed:")
for n in removed:
self.output_func(f" - {n}")
if added:
self.output_func("Added:")
for n in added:
self.output_func(f" + {n}")
except Exception:
pass
# Re-export using same base, if provided
try:
import os as _os
import json as _json
if isinstance(base_stem, str) and base_stem.strip():
# Mirror CSV/TXT export naming
csv_name = base_stem + ".csv"
txt_name = base_stem + ".txt"
# Overwrite exports with updated library
self.export_decklist_csv(directory='deck_files', filename=csv_name, suppress_output=True) # type: ignore[attr-defined]
self.export_decklist_text(directory='deck_files', filename=txt_name, suppress_output=True) # type: ignore[attr-defined]
# Recompute and write compliance next to them
self.compute_and_print_compliance(base_stem=base_stem) # type: ignore[attr-defined]
# Inject enforcement details into the saved compliance JSON for UI transparency
comp_path = _os.path.join('deck_files', f"{base_stem}_compliance.json")
try:
if _os.path.exists(comp_path) and isinstance(report, dict) and report.get('enforcement'):
with open(comp_path, 'r', encoding='utf-8') as _f:
comp_obj = _json.load(_f)
comp_obj['enforcement'] = report.get('enforcement')
with open(comp_path, 'w', encoding='utf-8') as _f:
_json.dump(comp_obj, _f, indent=2)
except Exception:
pass
else:
# Fall back to default export flow
csv_path = self.export_decklist_csv() # type: ignore[attr-defined]
try:
base, _ = _os.path.splitext(csv_path)
base_only = _os.path.basename(base)
except Exception:
base_only = None
self.export_decklist_text(filename=(base_only + '.txt') if base_only else None) # type: ignore[attr-defined]
if base_only:
self.compute_and_print_compliance(base_stem=base_only) # type: ignore[attr-defined]
# Inject enforcement into written JSON as above
try:
comp_path = _os.path.join('deck_files', f"{base_only}_compliance.json")
if _os.path.exists(comp_path) and isinstance(report, dict) and report.get('enforcement'):
with open(comp_path, 'r', encoding='utf-8') as _f:
comp_obj = _json.load(_f)
comp_obj['enforcement'] = report.get('enforcement')
with open(comp_path, 'w', encoding='utf-8') as _f:
_json.dump(comp_obj, _f, indent=2)
except Exception:
pass
except Exception:
pass
return report
def compute_and_print_compliance(self, base_stem: str | None = None) -> dict:
"""Compute bracket compliance, print a compact summary, and optionally write a JSON report.
If base_stem is provided, writes deck_files/{base_stem}_compliance.json.
Returns the compliance report dict.
"""
try:
# Late import to avoid circulars in some environments
from deck_builder.brackets_compliance import evaluate_deck # type: ignore
except Exception:
self.output_func("Bracket compliance module unavailable.")
return {}
try:
bracket_key = str(getattr(self, 'bracket_name', '') or getattr(self, 'bracket_level', 'core')).lower()
commander = getattr(self, 'commander_name', None)
report = evaluate_deck(self.card_library, commander_name=commander, bracket=bracket_key)
except Exception as e:
self.output_func(f"Compliance evaluation failed: {e}")
return {}
# Print concise summary
try:
self.output_func("\nBracket Compliance:")
self.output_func(f" Overall: {report.get('overall', 'PASS')}")
cats = report.get('categories', {}) or {}
order = [
('game_changers', 'Game Changers'),
('mass_land_denial', 'Mass Land Denial'),
('extra_turns', 'Extra Turns'),
('tutors_nonland', 'Nonland Tutors'),
('two_card_combos', 'Two-Card Combos'),
]
for key, label in order:
c = cats.get(key, {}) or {}
cnt = int(c.get('count', 0) or 0)
lim = c.get('limit')
status = str(c.get('status') or 'PASS')
lim_txt = ('Unlimited' if lim is None else str(int(lim)))
self.output_func(f" {label:<16} {cnt} / {lim_txt} [{status}]")
except Exception:
pass
# Optionally write JSON report next to exports
if isinstance(base_stem, str) and base_stem.strip():
try:
import os as _os
_os.makedirs('deck_files', exist_ok=True)
path = _os.path.join('deck_files', f"{base_stem}_compliance.json")
import json as _json
with open(path, 'w', encoding='utf-8') as f:
_json.dump(report, f, indent=2)
self.output_func(f"Compliance report saved to {path}")
except Exception:
pass
return report
def _wrap_cell(self, text: str, width: int = 28) -> str:
"""Wraps a string to a specified width for table display.
Used for pretty-printing card names, roles, and tags in tabular output.