mirror of
https://github.com/mwisnowski/mtg_python_deckbuilder.git
synced 2025-09-21 20:40:47 +02:00
865 lines
38 KiB
Python
865 lines
38 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Dict, Any, List, Tuple
|
|
import copy
|
|
from deck_builder.builder import DeckBuilder
|
|
from deck_builder.phases.phase0_core import BRACKET_DEFINITIONS
|
|
from deck_builder import builder_constants as bc
|
|
import os
|
|
import time
|
|
import json
|
|
from datetime import datetime as _dt
|
|
import re
|
|
|
|
|
|
def commander_names() -> List[str]:
    """Return the ``name`` of every row in the commander dataset, as strings.

    A throwaway ``DeckBuilder`` is created only to reach its
    ``load_commander_data`` loader.
    """
    commanders = DeckBuilder().load_commander_data()
    name_column = commanders["name"].astype(str)
    return name_column.tolist()
|
|
|
|
|
|
def commander_candidates(query: str, limit: int = 10) -> List[Tuple[str, int, List[str]]]:
    """Return the best commander-name matches for a free-text query.

    The query is normalized, the commander table is narrowed to rows that look
    like legal commanders, and the builder's ``_gather_candidates`` scorer
    produces a base ranking. A widened pool (plus every name that contains the
    query as a substring) is then re-ranked with prefix/substring/token
    bonuses so that obvious matches bubble up.

    Args:
        query: Text typed by the user.
        limit: Maximum number of candidates to return.

    Returns:
        A list of ``(name, original_score, colors)`` tuples, best match first.
        ``colors`` is a list of upper-cased color-identity entries, or
        ``["C"]`` when none could be determined.
    """

    def _parse_color_identity(ci: Any) -> List[str]:
        """Turn a raw ``colorIdentity`` cell into a list of color entries."""
        if isinstance(ci, list):
            return [str(c).upper() for c in ci if str(c).strip()]
        if isinstance(ci, str) and ci.strip():
            cleaned = ci.replace('[', '').replace(']', '').replace("'", '')
            # An empty result (e.g. the cell was "[]") means "no colors"; the
            # caller maps that to ["C"] instead of leaking bracket characters.
            return [p.strip().upper() for p in cleaned.split(',') if p.strip()]
        return []

    builder = DeckBuilder()

    # Normalize query similar to CLI to reduce case sensitivity surprises
    try:
        if hasattr(builder, '_normalize_commander_query'):
            query = builder._normalize_commander_query(query)  # type: ignore[attr-defined]
        else:
            # Light fallback: basic title case
            query = ' '.join([w[:1].upper() + w[1:].lower() if w else w for w in str(query).split(' ')])
    except Exception:
        # Best effort: keep the caller's query untouched if normalization fails.
        pass

    full_df = builder.load_commander_data()
    df = full_df

    # Filter to plausible commanders: Legendary Creature, or text explicitly
    # allows being a commander. Skipped when none of the columns are present.
    try:
        cols = set(df.columns.astype(str))
        if cols & {'type', 'type_line', 'text', 'oracleText'}:
            def _is_commander_row(_r) -> bool:
                try:
                    tline = str(_r.get('type', _r.get('type_line', '')) or '').lower()
                    textv = str(_r.get('text', _r.get('oracleText', '')) or '').lower()
                except Exception:
                    return False
                if 'legendary' in tline and 'creature' in tline:
                    return True
                # Covers planeswalkers and any other card whose text grants it.
                return 'can be your commander' in textv

            df_comm = df[df.apply(_is_commander_row, axis=1)]
            if not df_comm.empty:
                df = df_comm
    except Exception:
        # Best effort: fall back to the unfiltered table.
        pass

    names = df["name"].astype(str).tolist()

    # Reuse existing scoring helpers through the DeckBuilder API
    scored_raw = builder._gather_candidates(query, names)
    # Consider a wider pool for re-ranking so exact substrings bubble up
    pool = scored_raw[: max(limit * 5, 50)]

    q = (query or "").strip().lower()

    # Force-include any names that contain the raw query as a substring
    # (case-insensitive) to avoid missing obvious matches like
    # 'Inti, Seneschal of the Sun' for 'inti'.
    try:
        if q:
            seen = {n for (n, _s) in pool}
            # Map original scores for reuse
            base_scores = {n: int(s) for (n, s) in scored_raw}
            for n in names:
                nl = str(n).lower()
                if q in nl and n not in seen:
                    # Assign a reasonable base score if not present; favor prefixes
                    pool.append((n, base_scores.get(n, 90 if nl.startswith(q) else 80)))
                    # Track the addition so a repeated name is not appended twice.
                    seen.add(n)
    except Exception:
        # Best effort: the scorer's pool alone is still a usable result.
        pass

    # Color identity is looked up in the unfiltered table loaded above; the
    # string-cast name column is built once rather than once per candidate.
    try:
        name_col = full_df["name"].astype(str)
    except Exception:
        name_col = None

    def _color_list_for(name: str) -> List[str]:
        try:
            if name_col is None:
                return ["C"]
            row = full_df[name_col == str(name)]
            if row.empty:
                return ["C"]
            return _parse_color_identity(row.iloc[0].get("colorIdentity")) or ["C"]
        except Exception:
            return ["C"]

    tokens = [t for t in re.split(r"[\s,]+", q) if t]
    word_start = re.compile(r"\b" + re.escape(q)) if q else None

    # (name, orig_score, colors, rank_score, pos, exact_first_word)
    rescored: List[Tuple[str, int, List[str], int, int, int]] = []
    for name, score in pool:
        colors = _color_list_for(name)
        nl = str(name).lower()
        bonus = 0
        pos = nl.find(q) if q else -1

        # Leading run of letters/digits/apostrophes, for exact first-word preference
        m_first = re.match(r"^[a-z0-9']+", nl)
        first_word = m_first.group(0) if m_first else ""
        exact_first = 1 if (q and first_word == q) else 0

        # Base heuristics (cumulative: an exact match also earns the prefix,
        # word-start and substring bonuses)
        if q:
            if nl == q:
                bonus += 100
            if nl.startswith(q):
                bonus += 60
            if word_start is not None and word_start.search(nl):
                bonus += 40
            if q in nl:
                bonus += 30
            # Strongly prefer exact first-word equality over general prefix
            if exact_first:
                bonus += 140

        # Multi-token bonuses
        if tokens:
            present = sum(1 for t in tokens if t in nl)
            all_present = 1 if present == len(tokens) else 0
            bonus += present * 10 + all_present * 40
            # Extra if first token is a prefix
            if nl.startswith(tokens[0]):
                bonus += 15

        # Favor shorter names slightly and earlier positions
        bonus += max(0, 20 - len(nl))
        if pos >= 0:
            bonus += max(0, 20 - pos)

        rank_score = int(score) + bonus
        rescored.append((name, int(score), colors, rank_score, pos if pos >= 0 else 10**6, exact_first))

    # Sort: exact first-word matches first, then by rank score desc, then
    # earliest position, then original score desc, then name asc
    rescored.sort(key=lambda x: (-x[5], -x[3], x[4], -x[1], x[0]))
    return [(name, orig_score, colors) for (name, orig_score, colors, _r, _p, _e) in rescored[:limit]]
|
|
|
|
|
|
def commander_inspect(name: str) -> Dict[str, Any]:
    """Look up a commander by exact name and return its formatted description.

    Returns ``{"ok": True, "pretty": ...}`` where ``pretty`` is whatever the
    builder's ``_format_commander_pretty`` produces for the first matching
    row, or ``{"ok": False, "error": "Commander not found"}`` when no row has
    that name.
    """
    builder = DeckBuilder()
    commanders = builder.load_commander_data()
    matches = commanders[commanders["name"] == name]
    if not matches.empty:
        return {"ok": True, "pretty": builder._format_commander_pretty(matches.iloc[0])}
    return {"ok": False, "error": "Commander not found"}
|
|
|
|
|
|
def commander_select(name: str) -> Dict[str, Any]:
    """Resolve a commander by name, apply it to a builder and report its tags.

    An exact name match is tried first; if that finds nothing, the name is
    normalized (via the builder's ``_normalize_commander_query`` when present,
    otherwise simple title-casing) and matched again.

    Args:
        name: Commander name as supplied by the caller.

    Returns:
        ``{"ok": False, "error": "Commander not found"}`` when no row matches,
        otherwise ``{"ok": True, "name": ..., "tags": [...]}``. ``name`` is the
        name stored on the matched row, so it can be passed to exact-match
        lookups even when the caller's spelling only matched after
        normalization. ``tags`` are the builder's ``commander_tags`` with
        duplicates removed (order kept).
    """
    tmp = DeckBuilder()
    df = tmp.load_commander_data()

    # Try exact match, then normalized match
    row = df[df["name"] == name]
    if row.empty:
        try:
            if hasattr(tmp, '_normalize_commander_query'):
                normalized = tmp._normalize_commander_query(name)  # type: ignore[attr-defined]
            else:
                normalized = ' '.join([w[:1].upper() + w[1:].lower() if w else w for w in str(name).split(' ')])
            row = df[df["name"] == normalized]
        except Exception:
            # Normalization failed; `row` is still the (empty) exact-match result.
            pass
    if row.empty:
        return {"ok": False, "error": "Commander not found"}

    selected = row.iloc[0]
    tmp._apply_commander_selection(selected)

    # Derive tags; dict.fromkeys de-duplicates while preserving order
    tags = list(dict.fromkeys(tmp.commander_tags)) if hasattr(tmp, "commander_tags") else []

    # Report the canonical name from the data, not the caller's raw spelling.
    matched_name = selected.get("name", name)
    return {
        "ok": True,
        "name": str(matched_name) if matched_name is not None else name,
        "tags": tags,
    }
|
|
|
|
|
|
def tags_for_commander(name: str) -> List[str]:
    """Return the theme tags stored for the commander with this exact name.

    The ``themeTags`` cell of the first matching row may be a list or a
    comma-separated string (optionally wrapped in ``[...]`` with quoted
    items, i.e. a stringified list). Either form is reduced to a list of
    non-empty, stripped tag strings with duplicates removed and first-seen
    order kept.

    Returns an empty list when the commander is unknown or has no usable tags.
    """
    tmp = DeckBuilder()
    df = tmp.load_commander_data()
    row = df[df["name"] == name]
    if row.empty:
        return []

    raw = row.iloc[0].get("themeTags", [])
    if isinstance(raw, list):
        candidates = [str(t).strip() for t in raw]
    elif isinstance(raw, str) and raw.strip():
        text = raw.strip()
        # Drop one enclosing bracket pair so "['A', 'B']" does not yield "['A".
        if text.startswith('[') and text.endswith(']'):
            text = text[1:-1]
        candidates = [p.strip().strip("'\"") for p in text.split(',')]
    else:
        return []

    # dict.fromkeys de-duplicates while preserving order, for both input forms.
    return list(dict.fromkeys(t for t in candidates if t))
|
|
|
|
|
|
def bracket_options() -> List[Dict[str, Any]]:
    """List every entry of ``BRACKET_DEFINITIONS`` as a plain dict.

    Each dict carries the definition's ``level``, ``name`` and ``short_desc``
    (exposed under the key ``desc``), in definition order.
    """
    options: List[Dict[str, Any]] = []
    for definition in BRACKET_DEFINITIONS:
        options.append({
            "level": definition.level,
            "name": definition.name,
            "desc": definition.short_desc,
        })
    return options
|
|
|
|
|
|
def ideal_defaults() -> Dict[str, Any]:
    """Return the default ideal count for each deck category.

    Every value is read from the matching constant on ``builder_constants``;
    the literal listed beside it is used when that constant is not defined.
    """
    # (result key, constant name on `bc`, fallback when the constant is missing)
    spec = (
        ("ramp", 'DEFAULT_RAMP_COUNT', 10),
        ("lands", 'DEFAULT_LAND_COUNT', 35),
        ("basic_lands", 'DEFAULT_BASIC_LAND_COUNT', 20),
        ("fetch_lands", 'FETCH_LAND_DEFAULT_COUNT', 3),
        ("creatures", 'DEFAULT_CREATURE_COUNT', 28),
        ("removal", 'DEFAULT_REMOVAL_COUNT', 10),
        ("wipes", 'DEFAULT_WIPES_COUNT', 2),
        ("card_advantage", 'DEFAULT_CARD_ADVANTAGE_COUNT', 8),
        ("protection", 'DEFAULT_PROTECTION_COUNT', 4),
    )
    return {key: getattr(bc, constant, fallback) for key, constant, fallback in spec}
|
|
|
|
|
|
def ideal_labels() -> Dict[str, str]:
    """Map each ideal-count key to the label shown for it."""
    return dict(
        ramp='Ramp',
        lands='Total Lands',
        basic_lands='Basic Lands (Min)',
        fetch_lands='Fetch Lands',
        creatures='Creatures',
        removal='Spot Removal',
        wipes='Board Wipes',
        card_advantage='Card Advantage',
        protection='Protection',
    )
|
|
|
|
|
|
def _ensure_setup_ready(out, force: bool = False) -> None:
    """Ensure card CSVs exist and tagging has completed; bootstrap if needed.

    Mirrors the CLI behavior used in build_deck_full: if csv_files/cards.csv is
    missing, older than 7 days, or the tagging flag file is absent (or
    ``force`` is set), run initial setup and then tagging for every color in
    ``settings.COLORS``.

    Progress is written to csv_files/.setup_status.json (phase, message,
    percent, per-color index and an ETA estimate). On success the flag file
    csv_files/.tagging_complete.json is written.

    Args:
        out: Callable accepting one message string; receives progress and
            failure messages.
        force: When true, run setup and tagging regardless of file state.

    Returns:
        None. Failures are reported through ``out`` and the status file rather
        than raised.
    """
    status_path = os.path.join('csv_files', '.setup_status.json')

    def _now_iso() -> str:
        return _dt.now().isoformat(timespec='seconds')

    def _write_status(payload: dict) -> None:
        """Merge ``payload`` into the status file; never raises."""
        try:
            os.makedirs('csv_files', exist_ok=True)
            existing: dict = {}
            try:
                if os.path.exists(status_path):
                    with open(status_path, 'r', encoding='utf-8') as _rf:
                        existing = json.load(_rf) or {}
            except Exception:
                existing = {}
            # A status file holding non-object JSON would break the merge below
            # and silently stop all status updates; treat it as empty instead.
            if not isinstance(existing, dict):
                existing = {}
            # Existing keys (including started_at) survive unless the payload
            # overrides them.
            merged = {**existing, **payload}
            merged['updated'] = _now_iso()
            with open(status_path, 'w', encoding='utf-8') as f:
                json.dump(merged, f)
        except Exception:
            # Status reporting is best effort and must not abort setup.
            pass

    try:
        cards_path = os.path.join('csv_files', 'cards.csv')
        flag_path = os.path.join('csv_files', '.tagging_complete.json')
        refresh_needed = bool(force)
        if force:
            _write_status({"running": True, "phase": "setup", "message": "Forcing full setup and tagging...", "started_at": _now_iso(), "percent": 0})

        if not os.path.exists(cards_path):
            out("cards.csv not found. Running initial setup and tagging...")
            _write_status({"running": True, "phase": "setup", "message": "Preparing card database (initial setup)...", "started_at": _now_iso(), "percent": 0})
            refresh_needed = True
        else:
            try:
                age_seconds = time.time() - os.path.getmtime(cards_path)
                if age_seconds > 7 * 24 * 60 * 60 and not force:
                    out("cards.csv is older than 7 days. Refreshing data (setup + tagging)...")
                    _write_status({"running": True, "phase": "setup", "message": "Refreshing card database (initial setup)...", "started_at": _now_iso(), "percent": 0})
                    refresh_needed = True
            except Exception:
                # If the age cannot be determined, treat the file as fresh.
                pass

        if not os.path.exists(flag_path):
            out("Tagging completion flag not found. Performing full tagging...")
            if not refresh_needed:
                _write_status({"running": True, "phase": "tagging", "message": "Applying tags to card database...", "started_at": _now_iso(), "percent": 0})
            refresh_needed = True

        if not refresh_needed:
            return

        try:
            from file_setup.setup import initial_setup  # type: ignore
            # Always run initial_setup when forced or when cards are missing/stale
            initial_setup()
        except Exception as e:
            out(f"Initial setup failed: {e}")
            _write_status({"running": False, "phase": "error", "message": f"Initial setup failed: {e}"})
            return

        # Tagging with granular color progress
        try:
            from tagging import tagger as _tagger  # type: ignore
            from settings import COLORS as _COLORS  # type: ignore
            colors = list(_COLORS)
            total = len(colors)
            # Kept in memory for the ETA so the status file does not have to be
            # re-opened on every color.
            tagging_started = _dt.now()
            _write_status({
                "running": True,
                "phase": "tagging",
                "message": "Tagging cards (this may take a while)...",
                "color": None,
                "percent": 0,
                "color_idx": 0,
                "color_total": total,
                "tagging_started_at": tagging_started.isoformat(timespec='seconds'),
            })
            for idx, _color in enumerate(colors, start=1):
                try:
                    completed = idx - 1
                    pct = int(completed * 100 / max(1, total))
                    payload = {
                        "running": True,
                        "phase": "tagging",
                        "message": f"Tagging {_color}...",
                        "color": _color,
                        "percent": pct,
                        "color_idx": idx,
                        "color_total": total,
                    }
                    # Estimate ETA based on average time per completed color
                    if completed > 0:
                        elapsed = max(0.0, (_dt.now() - tagging_started).total_seconds())
                        payload["eta_seconds"] = int(elapsed / completed * (total - completed))
                    _write_status(payload)
                    _tagger.load_dataframe(_color)
                except Exception as e:
                    out(f"Tagging {_color} failed: {e}")
                    _write_status({"running": False, "phase": "error", "message": f"Tagging {_color} failed: {e}", "color": _color})
                    return
        except Exception as e:
            out(f"Tagging failed to start: {e}")
            _write_status({"running": False, "phase": "error", "message": f"Tagging failed to start: {e}"})
            return

        try:
            os.makedirs('csv_files', exist_ok=True)
            with open(flag_path, 'w', encoding='utf-8') as _fh:
                json.dump({'tagged_at': _now_iso()}, _fh)

            # Final status with percent 100 and timing info
            finished_dt = _dt.now()
            finished = finished_dt.isoformat(timespec='seconds')

            # Compute duration_seconds if started_at exists in the status file
            duration_s = None
            try:
                with open(status_path, 'r', encoding='utf-8') as _rf:
                    _st = json.load(_rf) or {}
                if _st.get('started_at'):
                    start_dt = _dt.fromisoformat(_st['started_at'])
                    duration_s = int(max(0.0, (finished_dt - start_dt).total_seconds()))
            except Exception:
                duration_s = None

            payload = {"running": False, "phase": "done", "message": "Setup complete", "color": None, "percent": 100, "finished_at": finished}
            if duration_s is not None:
                payload["duration_seconds"] = duration_s
            _write_status(payload)
        except Exception:
            # Writing the flag/final status is best effort.
            pass
    except Exception:
        # Non-fatal; downstream loads will still attempt and surface errors in logs
        _write_status({"running": False, "phase": "error", "message": "Setup check failed"})
|
|
|
|
|
|
def run_build(commander: str, tags: List[str], bracket: int, ideals: Dict[str, int]) -> Dict[str, Any]:
    """Build a whole deck in one pass and collect everything the builder prints.

    Individual build phases are isolated: a phase that raises is logged and the
    build carries on with the next one.

    Returns: { ok: bool, log: str, csv_path: Optional[str], txt_path: Optional[str], summary, error: Optional[str] }
    (``error`` only on failure; the paths and ``summary`` only on success).
    """
    captured: List[str] = []

    def out(msg: str) -> None:
        # Output sink handed to the builder; it must never raise.
        try:
            captured.append(msg)
        except Exception:
            pass

    def _joined_log() -> str:
        return "\n".join(captured)

    try:
        # The input function answers every prompt with "" so leftover prompts
        # auto-accept their defaults.
        b = DeckBuilder(output_func=out, input_func=lambda _prompt: "", headless=True)
        # Ensure setup/tagging present for web headless run
        _ensure_setup_ready(out)

        def _phase(failure_prefix: str, method_names: Tuple[str, ...], optional: bool) -> None:
            """Call builder methods in order; log a failure instead of raising."""
            try:
                for method_name in method_names:
                    if optional and not hasattr(b, method_name):
                        continue
                    getattr(b, method_name)()
            except Exception as e:
                out(f"{failure_prefix}: {e}")

        # Commander selection (exact name match)
        commanders = b.load_commander_data()
        chosen = commanders[commanders["name"].astype(str) == str(commander)]
        if chosen.empty:
            return {"ok": False, "error": f"Commander not found: {commander}", "log": _joined_log()}
        b._apply_commander_selection(chosen.iloc[0])

        # Tags: the first three selections fill the primary/secondary/tertiary slots
        b.selected_tags = list(tags or [])
        for position, slot in enumerate(('primary_tag', 'secondary_tag', 'tertiary_tag')):
            setattr(b, slot, b.selected_tags[position] if len(b.selected_tags) > position else None)
        try:
            b._update_commander_dict_with_selected_tags()
        except Exception:
            pass

        # Bracket: first definition whose level equals the requested one
        bd = None
        for candidate in BRACKET_DEFINITIONS:
            if int(getattr(candidate, 'level', 0)) == int(bracket):
                bd = candidate
                break
        if bd is None:
            return {"ok": False, "error": f"Invalid bracket level: {bracket}", "log": _joined_log()}
        b.bracket_definition = bd
        b.bracket_level = bd.level
        b.bracket_name = bd.name
        b.bracket_limits = dict(getattr(bd, 'limits', {}))

        # Ideal counts
        b.ideal_counts = {key: int(value) for key, value in (ideals or {}).items()}

        # Load data and run phases
        _phase("Failed to load color identity/card pool", ('determine_color_identity', 'setup_dataframes'), False)
        _phase("Land build failed", ('_run_land_build_steps',), False)
        _phase("Creature phase failed", ('add_creatures_phase',), True)
        _phase("Spell phase failed", ('add_spells_phase',), True)
        _phase("Post-spell land adjust failed", ('post_spell_land_adjust',), True)

        # Reporting/exports
        csv_path = None
        txt_path = None
        _phase("Reporting phase failed", ('run_reporting_phase',), True)
        try:
            if hasattr(b, 'export_decklist_csv'):
                csv_path = b.export_decklist_csv()  # type: ignore[attr-defined]
        except Exception as e:
            out(f"CSV export failed: {e}")
        try:
            if hasattr(b, 'export_decklist_text'):
                # The text file reuses the CSV's base name when a CSV was written
                if csv_path:
                    stem = os.path.splitext(os.path.basename(csv_path))[0]
                else:
                    stem = f"deck_{b.timestamp}"
                txt_path = b.export_decklist_text(filename=stem + '.txt')  # type: ignore[attr-defined]
                # Try to mirror build_deck_full behavior by displaying the contents
                try:
                    b._display_txt_contents(txt_path)
                except Exception:
                    pass
        except Exception as e:
            out(f"Text export failed: {e}")

        # Build structured summary for UI
        summary = None
        try:
            if hasattr(b, 'build_deck_summary'):
                summary = b.build_deck_summary()  # type: ignore[attr-defined]
        except Exception:
            summary = None

        # Write sidecar summary JSON next to CSV (if available)
        try:
            if summary and csv_path:
                sidecar = os.path.splitext(csv_path)[0] + '.summary.json'
                slot_tags = [getattr(b, 'primary_tag', None), getattr(b, 'secondary_tag', None), getattr(b, 'tertiary_tag', None)]
                meta = {
                    "commander": getattr(b, 'commander_name', '') or getattr(b, 'commander', ''),
                    "tags": list(getattr(b, 'selected_tags', []) or []) or [t for t in slot_tags if t],
                    "bracket_level": getattr(b, 'bracket_level', None),
                    "csv": csv_path,
                    "txt": txt_path,
                }
                with open(sidecar, 'w', encoding='utf-8') as f:
                    json.dump({"meta": meta, "summary": summary}, f, ensure_ascii=False, indent=2)
        except Exception:
            pass

        return {"ok": True, "log": _joined_log(), "csv_path": csv_path, "txt_path": txt_path, "summary": summary}
    except Exception as e:
        captured.append(f"Build failed: {e}")
        return {"ok": False, "error": str(e), "log": _joined_log()}
|
|
|
|
|
|
# -----------------
|
|
# Step-by-step build session
|
|
# -----------------
|
|
def _make_stages(b: DeckBuilder) -> List[Dict[str, Any]]:
|
|
stages: List[Dict[str, Any]] = []
|
|
# Web UI: skip theme confirmation stages (CLI-only pauses)
|
|
# Land steps 1..8 (if present)
|
|
for i in range(1, 9):
|
|
fn = getattr(b, f"run_land_step{i}", None)
|
|
if callable(fn):
|
|
stages.append({"key": f"land{i}", "label": f"Lands (Step {i})", "runner_name": f"run_land_step{i}"})
|
|
# Creatures split into theme sub-stages for web confirm
|
|
if getattr(b, 'primary_tag', None) and hasattr(b, 'add_creatures_primary_phase'):
|
|
stages.append({"key": "creatures_primary", "label": "Creatures: Primary", "runner_name": "add_creatures_primary_phase"})
|
|
if getattr(b, 'secondary_tag', None) and hasattr(b, 'add_creatures_secondary_phase'):
|
|
stages.append({"key": "creatures_secondary", "label": "Creatures: Secondary", "runner_name": "add_creatures_secondary_phase"})
|
|
if getattr(b, 'tertiary_tag', None) and hasattr(b, 'add_creatures_tertiary_phase'):
|
|
stages.append({"key": "creatures_tertiary", "label": "Creatures: Tertiary", "runner_name": "add_creatures_tertiary_phase"})
|
|
if hasattr(b, 'add_creatures_fill_phase'):
|
|
stages.append({"key": "creatures_fill", "label": "Creatures: Fill", "runner_name": "add_creatures_fill_phase"})
|
|
# Spells: prefer granular categories when available; otherwise fall back to bulk
|
|
spell_categories: List[Tuple[str, str, str]] = [
|
|
("ramp", "Confirm Ramp", "add_ramp"),
|
|
("removal", "Confirm Removal", "add_removal"),
|
|
("wipes", "Confirm Board Wipes", "add_board_wipes"),
|
|
("card_advantage", "Confirm Card Advantage", "add_card_advantage"),
|
|
("protection", "Confirm Protection", "add_protection"),
|
|
]
|
|
any_granular = any(callable(getattr(b, rn, None)) for _key, _label, rn in spell_categories)
|
|
if any_granular:
|
|
for key, label, runner in spell_categories:
|
|
if callable(getattr(b, runner, None)):
|
|
# Web UI: omit confirm stages; show only the action stage
|
|
label_action = label.replace("Confirm ", "")
|
|
stages.append({"key": f"spells_{key}", "label": label_action, "runner_name": runner})
|
|
# Ensure we include the theme filler step to top up to 100 cards
|
|
if callable(getattr(b, 'fill_remaining_theme_spells', None)):
|
|
stages.append({"key": "spells_fill", "label": "Theme Spell Fill", "runner_name": "fill_remaining_theme_spells"})
|
|
elif hasattr(b, 'add_spells_phase'):
|
|
stages.append({"key": "spells", "label": "Spells", "runner_name": "add_spells_phase"})
|
|
# Post-adjust
|
|
if hasattr(b, 'post_spell_land_adjust'):
|
|
stages.append({"key": "post_adjust", "label": "Post-Spell Land Adjust", "runner_name": "post_spell_land_adjust"})
|
|
# Reporting
|
|
if hasattr(b, 'run_reporting_phase'):
|
|
stages.append({"key": "reporting", "label": "Reporting", "runner_name": "run_reporting_phase"})
|
|
# Export is not a separate stage here; we will auto-export at the final continue.
|
|
return stages
|
|
|
|
|
|
def start_build_ctx(commander: str, tags: List[str], bracket: int, ideals: Dict[str, int]) -> Dict[str, Any]:
    """Prepare a builder for a staged build and return its session context.

    The returned dict holds the configured ``builder``, the shared ``logs``
    list its output is appended to, the ``stages`` plan, the current stage
    index ``idx`` (0), ``last_log_idx`` (0) and empty ``csv_path``,
    ``txt_path`` and ``snapshot`` slots.

    Raises:
        ValueError: if no commander row has exactly this name, or no bracket
            definition has the requested level.
    """
    logs: List[str] = []

    # Never blocks on input: every prompt is answered with "".
    b = DeckBuilder(output_func=logs.append, input_func=lambda _prompt: "", headless=True)
    # Ensure setup/tagging present before staged build
    _ensure_setup_ready(logs.append)

    # Commander selection
    commanders = b.load_commander_data()
    chosen = commanders[commanders["name"].astype(str) == str(commander)]
    if chosen.empty:
        raise ValueError(f"Commander not found: {commander}")
    b._apply_commander_selection(chosen.iloc[0])

    # Tags: the first three selections fill the primary/secondary/tertiary slots
    b.selected_tags = list(tags or [])
    for position, slot in enumerate(('primary_tag', 'secondary_tag', 'tertiary_tag')):
        setattr(b, slot, b.selected_tags[position] if len(b.selected_tags) > position else None)
    try:
        b._update_commander_dict_with_selected_tags()
    except Exception:
        pass

    # Bracket: first definition whose level equals the requested one
    bd = None
    for candidate in BRACKET_DEFINITIONS:
        if int(getattr(candidate, 'level', 0)) == int(bracket):
            bd = candidate
            break
    if bd is None:
        raise ValueError(f"Invalid bracket level: {bracket}")
    b.bracket_definition = bd
    b.bracket_level = bd.level
    b.bracket_name = bd.name
    b.bracket_limits = dict(getattr(bd, 'limits', {}))

    # Ideals
    b.ideal_counts = {key: int(value) for key, value in (ideals or {}).items()}

    # Data load (errors propagate to the caller here)
    b.determine_color_identity()
    b.setup_dataframes()

    return {
        "builder": b,
        "logs": logs,
        "stages": _make_stages(b),
        "idx": 0,
        "last_log_idx": 0,
        "csv_path": None,
        "txt_path": None,
        "snapshot": None,
    }
|
|
|
|
|
|
def _snapshot_builder(b: DeckBuilder) -> Dict[str, Any]:
|
|
"""Capture mutable state needed to rerun a stage."""
|
|
snap: Dict[str, Any] = {}
|
|
# Core collections
|
|
snap["card_library"] = copy.deepcopy(getattr(b, 'card_library', {}))
|
|
snap["tag_counts"] = copy.deepcopy(getattr(b, 'tag_counts', {}))
|
|
snap["_card_name_tags_index"] = copy.deepcopy(getattr(b, '_card_name_tags_index', {}))
|
|
snap["suggested_lands_queue"] = copy.deepcopy(getattr(b, 'suggested_lands_queue', []))
|
|
# Caches and pools
|
|
try:
|
|
if getattr(b, '_combined_cards_df', None) is not None:
|
|
snap["_combined_cards_df"] = b._combined_cards_df.copy(deep=True)
|
|
except Exception:
|
|
snap["_combined_cards_df"] = None
|
|
try:
|
|
if getattr(b, '_full_cards_df', None) is not None:
|
|
snap["_full_cards_df"] = b._full_cards_df.copy(deep=True)
|
|
except Exception:
|
|
snap["_full_cards_df"] = None
|
|
snap["_color_source_matrix_baseline"] = copy.deepcopy(getattr(b, '_color_source_matrix_baseline', None))
|
|
snap["_color_source_matrix_cache"] = copy.deepcopy(getattr(b, '_color_source_matrix_cache', None))
|
|
snap["_color_source_cache_dirty"] = getattr(b, '_color_source_cache_dirty', True)
|
|
snap["_spell_pip_weights_cache"] = copy.deepcopy(getattr(b, '_spell_pip_weights_cache', None))
|
|
snap["_spell_pip_cache_dirty"] = getattr(b, '_spell_pip_cache_dirty', True)
|
|
return snap
|
|
|
|
|
|
def _restore_builder(b: DeckBuilder, snap: Dict[str, Any]) -> None:
|
|
b.card_library = copy.deepcopy(snap.get("card_library", {}))
|
|
b.tag_counts = copy.deepcopy(snap.get("tag_counts", {}))
|
|
b._card_name_tags_index = copy.deepcopy(snap.get("_card_name_tags_index", {}))
|
|
b.suggested_lands_queue = copy.deepcopy(snap.get("suggested_lands_queue", []))
|
|
if "_combined_cards_df" in snap:
|
|
b._combined_cards_df = snap["_combined_cards_df"]
|
|
if "_full_cards_df" in snap:
|
|
b._full_cards_df = snap["_full_cards_df"]
|
|
b._color_source_matrix_baseline = copy.deepcopy(snap.get("_color_source_matrix_baseline", None))
|
|
b._color_source_matrix_cache = copy.deepcopy(snap.get("_color_source_matrix_cache", None))
|
|
b._color_source_cache_dirty = bool(snap.get("_color_source_cache_dirty", True))
|
|
b._spell_pip_weights_cache = copy.deepcopy(snap.get("_spell_pip_weights_cache", None))
|
|
b._spell_pip_cache_dirty = bool(snap.get("_spell_pip_cache_dirty", True))
|
|
|
|
|
|
def _finalize_stage_exports(ctx: Dict[str, Any], b: DeckBuilder, logs: List[str], total: int) -> Dict[str, Any]:
    """Export the finished deck (once) and build the final "done" result.

    CSV and text exports are skipped when ``ctx`` already holds their path.
    Export failures are appended to ``logs``; summary and sidecar failures are
    ignored.
    """
    if not ctx.get("csv_path") and hasattr(b, 'export_decklist_csv'):
        try:
            ctx["csv_path"] = b.export_decklist_csv()  # type: ignore[attr-defined]
        except Exception as e:
            logs.append(f"CSV export failed: {e}")

    if not ctx.get("txt_path") and hasattr(b, 'export_decklist_text'):
        try:
            # The text file reuses the CSV's base name (or a timestamped default)
            base, _ext = os.path.splitext(os.path.basename(ctx.get("csv_path") or f"deck_{b.timestamp}.csv"))
            ctx["txt_path"] = b.export_decklist_text(filename=base + '.txt')  # type: ignore[attr-defined]
            # Export the run configuration JSON for manual builds
            try:
                b.export_run_config_json(directory='config', filename=base + '.json')  # type: ignore[attr-defined]
            except Exception:
                pass
        except Exception as e:
            logs.append(f"Text export failed: {e}")

    # Build structured summary for UI
    summary = None
    try:
        if hasattr(b, 'build_deck_summary'):
            summary = b.build_deck_summary()  # type: ignore[attr-defined]
    except Exception:
        summary = None

    # Write sidecar summary JSON next to CSV (if available)
    try:
        if summary and ctx.get("csv_path"):
            sidecar = os.path.splitext(ctx.get("csv_path"))[0] + '.summary.json'
            slot_tags = [getattr(b, 'primary_tag', None), getattr(b, 'secondary_tag', None), getattr(b, 'tertiary_tag', None)]
            meta = {
                "commander": getattr(b, 'commander_name', '') or getattr(b, 'commander', ''),
                "tags": list(getattr(b, 'selected_tags', []) or []) or [t for t in slot_tags if t],
                "bracket_level": getattr(b, 'bracket_level', None),
                "csv": ctx.get("csv_path"),
                "txt": ctx.get("txt_path"),
            }
            with open(sidecar, 'w', encoding='utf-8') as f:
                json.dump({"meta": meta, "summary": summary}, f, ensure_ascii=False, indent=2)
    except Exception:
        pass

    return {
        "done": True,
        "label": "Complete",
        "log_delta": "",
        "idx": total,
        "total": total,
        "csv_path": ctx.get("csv_path"),
        "txt_path": ctx.get("txt_path"),
        "summary": summary,
    }


def _diff_added_cards(snap_before: Any, b: DeckBuilder) -> List[Dict[str, Any]]:
    """List cards whose ``Count`` grew relative to the pre-stage snapshot.

    Each entry carries the card name, the count increase and a display
    ``reason`` assembled from the entry's Role/SubRole/AddedBy/TriggerTag
    fields. Entries that cannot be read are skipped; any broader failure
    yields an empty list.
    """
    try:
        prev_lib = snap_before.get("card_library", {}) if isinstance(snap_before, dict) else {}
        added_cards: List[Dict[str, Any]] = []
        for name, entry in b.card_library.items():
            try:
                prev_entry = prev_lib.get(name)
                prev_count = int(prev_entry.get('Count', 0)) if isinstance(prev_entry, dict) else 0
                delta_count = int(entry.get('Count', 1)) - prev_count
                if delta_count <= 0:
                    continue
                role = str(entry.get('Role') or '').strip()
                sub_role = str(entry.get('SubRole') or '').strip()
                added_by = str(entry.get('AddedBy') or '').strip()
                trig = str(entry.get('TriggerTag') or '').strip()
                parts: List[str] = []
                if role:
                    parts.append(role)
                if sub_role:
                    parts.append(sub_role)
                if added_by:
                    parts.append(f"by {added_by}")
                if trig:
                    parts.append(f"tag: {trig}")
                added_cards.append({
                    "name": name,
                    "count": delta_count,
                    "reason": " • ".join(parts),
                    "role": role,
                    "sub_role": sub_role,
                    "trigger_tag": trig,
                })
            except Exception:
                continue
        added_cards.sort(key=lambda x: (x.get('reason') or '', x['name']))
        return added_cards
    except Exception:
        return []


def run_stage(ctx: Dict[str, Any], rerun: bool = False) -> Dict[str, Any]:
    """Run the next stage of a staged build that actually adds cards.

    Stages that add no cards are executed but not reported; the function keeps
    advancing until one adds cards or the plan is exhausted, in which case the
    deck is exported and a final result is returned.

    Args:
        ctx: Session context with at least ``builder``, ``stages``, ``logs``
            and ``idx``. ``idx``, ``last_visible_idx``, ``snapshot``,
            ``csv_path`` and ``txt_path`` are updated in place.
        rerun: When true, start again from the last reported stage, first
            restoring the builder from the stored pre-stage snapshot if one
            exists.

    Returns:
        For a reported stage: ``done=False`` with ``label``, ``log_delta``
        (log lines produced by that stage), ``added_cards``, ``idx`` and
        ``total``. When finished: ``done=True`` with ``label="Complete"``,
        the export paths and the deck ``summary``.
    """
    b: DeckBuilder = ctx["builder"]
    stages: List[Dict[str, Any]] = ctx["stages"]
    logs: List[str] = ctx["logs"]
    total = len(stages)

    # If all stages done, finalize exports (interactive/manual build)
    if ctx["idx"] >= total:
        return _finalize_stage_exports(ctx, b, logs, total)

    # Determine which stage index to run (rerun last visible, else current).
    # Neither ctx value involved changes inside the loop, so compute it once.
    rerun_idx = None
    if rerun:
        rerun_idx = max(0, int(ctx.get("last_visible_idx", ctx["idx"]) or 1) - 1)
        i = rerun_idx
    else:
        i = ctx["idx"]

    # Iterate forward until we find a stage that adds cards, skipping no-ops
    while i < total:
        stage = stages[i]
        label = stage["label"]
        runner_name = stage["runner_name"]

        # For the rerun target, restore to the pre-stage state first (if we
        # have a snapshot); then always snapshot before executing.
        if rerun_idx is not None and i == rerun_idx and ctx.get("snapshot") is not None:
            _restore_builder(b, ctx["snapshot"])
        snap_before = _snapshot_builder(b)

        # Run the stage and capture logs delta
        start_log = len(logs)
        fn = getattr(b, runner_name, None)
        if callable(fn):
            try:
                fn()
            except Exception as e:
                logs.append(f"Stage '{label}' failed: {e}")
        else:
            logs.append(f"Runner not available: {runner_name}")
        delta_log = "\n".join(logs[start_log:])

        # If this stage added cards, present it and advance idx
        added_cards = _diff_added_cards(snap_before, b)
        if added_cards:
            ctx["snapshot"] = snap_before  # snapshot for rerun
            ctx["idx"] = i + 1
            ctx["last_visible_idx"] = i + 1
            return {
                "done": False,
                "label": label,
                "log_delta": delta_log,
                "added_cards": added_cards,
                "idx": i + 1,
                "total": total,
            }

        # No cards added: skip showing this stage and advance to next
        i += 1

    # If we reached here, all remaining stages were no-ops; finalize exports
    ctx["idx"] = total
    return _finalize_stage_exports(ctx, b, logs, total)
|