mirror of
https://github.com/mwisnowski/mtg_python_deckbuilder.git
synced 2025-09-22 04:50:46 +02:00

- Include/exclude cards feature complete with 300+ card knowledge base and intelligent fuzzy matching - Enhanced visual validation with warning icons and performance benchmarks (100% pass rate) - Mobile responsive design with bottom-floating controls, two-column layout, and horizontal scroll prevention - Dark theme confirmation modal for fuzzy matches with card preview and alternatives - Dual architecture support for web UI staging system and CLI direct build paths - All M3 checklist items completed: fuzzy match modal, enhanced algorithm, summary panel, mobile responsive, Playwright tests
2166 lines
96 KiB
Python
2166 lines
96 KiB
Python
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional, List, Dict, Any, Callable, Tuple, Set
|
||
import pandas as pd
|
||
import math
|
||
import random
|
||
import re
|
||
import datetime
|
||
|
||
# Logging (must precede heavy module logic to ensure handlers ready)
|
||
import logging_util
|
||
|
||
# Phase 0 core primitives (fuzzy helpers, bracket definitions)
|
||
from .phases.phase0_core import (
|
||
_full_ratio, _top_matches,
|
||
EXACT_NAME_THRESHOLD, FIRST_WORD_THRESHOLD, MAX_PRESENTED_CHOICES,
|
||
BracketDefinition
|
||
)
|
||
# Include/exclude utilities (M1: Config + Validation + Persistence)
|
||
from .include_exclude_utils import (
|
||
IncludeExcludeDiagnostics,
|
||
fuzzy_match_card_name,
|
||
validate_list_sizes,
|
||
collapse_duplicates
|
||
)
|
||
from .phases.phase1_commander import CommanderSelectionMixin
|
||
from .phases.phase2_lands_basics import LandBasicsMixin
|
||
from .phases.phase2_lands_staples import LandStaplesMixin
|
||
from .phases.phase2_lands_kindred import LandKindredMixin
|
||
from .phases.phase2_lands_fetch import LandFetchMixin
|
||
from .phases.phase2_lands_duals import LandDualsMixin
|
||
from .phases.phase2_lands_triples import LandTripleMixin
|
||
from .phases.phase2_lands_misc import LandMiscUtilityMixin
|
||
from .phases.phase2_lands_optimize import LandOptimizationMixin
|
||
from .phases.phase3_creatures import CreatureAdditionMixin
|
||
from .phases.phase4_spells import SpellAdditionMixin
|
||
from .phases.phase5_color_balance import ColorBalanceMixin
|
||
from .phases.phase6_reporting import ReportingMixin
|
||
|
||
# Local application imports
|
||
from . import builder_constants as bc
|
||
from . import builder_utils as bu
|
||
import os
|
||
from settings import CSV_DIRECTORY
|
||
from file_setup.setup import initial_setup
|
||
|
||
# Create logger consistent with existing pattern (mirrors tagging/tagger.py usage)
|
||
logger = logging_util.logging.getLogger(__name__)
|
||
logger.setLevel(logging_util.LOG_LEVEL)
|
||
# Avoid duplicate handler attachment if reloaded (defensive; get_logger already guards but we mirror tagger.py approach)
|
||
if not any(isinstance(h, logging_util.logging.FileHandler) and getattr(h, 'baseFilename', '').endswith('deck_builder.log') for h in logger.handlers):
|
||
logger.addHandler(logging_util.file_handler)
|
||
if not any(isinstance(h, logging_util.logging.StreamHandler) for h in logger.handlers):
|
||
logger.addHandler(logging_util.stream_handler)
|
||
|
||
## Phase 0 extraction note: fuzzy helpers & BRACKET_DEFINITIONS imported above
|
||
|
||
## Phase 0 extraction: BracketDefinition & BRACKET_DEFINITIONS now imported
|
||
|
||
@dataclass
|
||
class DeckBuilder(
|
||
CommanderSelectionMixin,
|
||
LandBasicsMixin,
|
||
LandStaplesMixin,
|
||
LandKindredMixin,
|
||
LandFetchMixin,
|
||
LandDualsMixin,
|
||
LandTripleMixin,
|
||
LandMiscUtilityMixin,
|
||
LandOptimizationMixin,
|
||
CreatureAdditionMixin,
|
||
SpellAdditionMixin,
|
||
ColorBalanceMixin,
|
||
ReportingMixin
|
||
):
|
||
def build_deck_full(self):
|
||
"""Orchestrate the full deck build process, chaining all major phases."""
|
||
start_ts = datetime.datetime.now()
|
||
logger.info("=== Deck Build: BEGIN ===")
|
||
try:
|
||
# Ensure CSVs exist and are tagged before starting any deck build logic
|
||
try:
|
||
import time as _time
|
||
import json as _json
|
||
from datetime import datetime as _dt
|
||
cards_path = os.path.join(CSV_DIRECTORY, 'cards.csv')
|
||
flag_path = os.path.join(CSV_DIRECTORY, '.tagging_complete.json')
|
||
refresh_needed = False
|
||
if not os.path.exists(cards_path):
|
||
logger.info("cards.csv not found. Running initial setup and tagging before deck build...")
|
||
refresh_needed = True
|
||
else:
|
||
try:
|
||
age_seconds = _time.time() - os.path.getmtime(cards_path)
|
||
if age_seconds > 7 * 24 * 60 * 60:
|
||
logger.info("cards.csv is older than 7 days. Refreshing data before deck build...")
|
||
refresh_needed = True
|
||
except Exception:
|
||
pass
|
||
if not os.path.exists(flag_path):
|
||
logger.info("Tagging completion flag not found. Performing full tagging before deck build...")
|
||
refresh_needed = True
|
||
if refresh_needed:
|
||
initial_setup()
|
||
from tagging import tagger as _tagger
|
||
_tagger.run_tagging()
|
||
try:
|
||
os.makedirs(CSV_DIRECTORY, exist_ok=True)
|
||
with open(flag_path, 'w', encoding='utf-8') as _fh:
|
||
_json.dump({'tagged_at': _dt.now().isoformat(timespec='seconds')}, _fh)
|
||
except Exception:
|
||
logger.warning("Failed to write tagging completion flag (non-fatal).")
|
||
except Exception as e:
|
||
logger.error(f"Failed ensuring CSVs before deck build: {e}")
|
||
self.run_initial_setup()
|
||
self.run_deck_build_step1()
|
||
self.run_deck_build_step2()
|
||
self._run_land_build_steps()
|
||
# M2: Inject includes after lands, before creatures/spells
|
||
logger.info(f"DEBUG BUILD: About to inject includes. Include cards: {self.include_cards}")
|
||
self._inject_includes_after_lands()
|
||
logger.info(f"DEBUG BUILD: Finished injecting includes. Current deck size: {len(self.card_library)}")
|
||
if hasattr(self, 'add_creatures_phase'):
|
||
self.add_creatures_phase()
|
||
if hasattr(self, 'add_spells_phase'):
|
||
self.add_spells_phase()
|
||
if hasattr(self, 'post_spell_land_adjust'):
|
||
self.post_spell_land_adjust()
|
||
# Modular reporting phase
|
||
if hasattr(self, 'run_reporting_phase'):
|
||
self.run_reporting_phase()
|
||
# Immediately after content additions and summary, if compliance is enforced later,
|
||
# we want to display what would be swapped. For interactive runs, surface a dry prompt.
|
||
try:
|
||
# Compute a quick compliance snapshot here to hint at upcoming enforcement
|
||
if hasattr(self, 'compute_and_print_compliance') and not getattr(self, 'headless', False):
|
||
from deck_builder.brackets_compliance import evaluate_deck as _eval # type: ignore
|
||
bracket_key = str(getattr(self, 'bracket_name', '') or getattr(self, 'bracket_level', 'core')).lower()
|
||
commander = getattr(self, 'commander_name', None)
|
||
snap = _eval(self.card_library, commander_name=commander, bracket=bracket_key)
|
||
if snap.get('overall') == 'FAIL':
|
||
self.output_func("\nNote: Limits exceeded. You'll get a chance to review swaps next.")
|
||
except Exception:
|
||
pass
|
||
if hasattr(self, 'export_decklist_csv'):
|
||
# If user opted out of owned-only, silently load all owned files for marking
|
||
try:
|
||
if not self.use_owned_only and not self.owned_card_names:
|
||
self._load_all_owned_silent()
|
||
except Exception:
|
||
pass
|
||
csv_path = self.export_decklist_csv()
|
||
try:
|
||
import os as _os
|
||
base, _ext = _os.path.splitext(_os.path.basename(csv_path))
|
||
txt_path = self.export_decklist_text(filename=base + '.txt') # type: ignore[attr-defined]
|
||
# Display the text file contents for easy copy/paste to online deck builders
|
||
self._display_txt_contents(txt_path)
|
||
# Compute bracket compliance and save a JSON report alongside exports
|
||
try:
|
||
if hasattr(self, 'compute_and_print_compliance'):
|
||
report0 = self.compute_and_print_compliance(base_stem=base) # type: ignore[attr-defined]
|
||
# If non-compliant and interactive, offer enforcement now
|
||
try:
|
||
if isinstance(report0, dict) and report0.get('overall') == 'FAIL' and not getattr(self, 'headless', False):
|
||
from deck_builder.phases.phase6_reporting import ReportingMixin as _RM # type: ignore
|
||
if isinstance(self, _RM) and hasattr(self, 'enforce_and_reexport'):
|
||
self.output_func("One or more bracket limits exceeded. Enter to auto-resolve, or Ctrl+C to skip.")
|
||
try:
|
||
_ = self.input_func("")
|
||
except Exception:
|
||
pass
|
||
self.enforce_and_reexport(base_stem=base, mode='prompt') # type: ignore[attr-defined]
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
# If owned-only build is incomplete, generate recommendations
|
||
try:
|
||
total_cards = sum(int(v.get('Count', 1)) for v in self.card_library.values())
|
||
if self.use_owned_only and total_cards < 100:
|
||
missing = 100 - total_cards
|
||
rec_limit = int(math.ceil(1.5 * float(missing)))
|
||
self._generate_recommendations(base_stem=base, limit=rec_limit)
|
||
except Exception:
|
||
pass
|
||
# Also export a matching JSON config for replay (interactive builds only)
|
||
if not getattr(self, 'headless', False):
|
||
try:
|
||
# Choose config output dir: DECK_CONFIG dir > /app/config > ./config
|
||
import os as _os
|
||
cfg_path_env = _os.getenv('DECK_CONFIG')
|
||
cfg_dir = None
|
||
if cfg_path_env:
|
||
cfg_dir = _os.path.dirname(cfg_path_env) or '.'
|
||
elif _os.path.isdir('/app/config'):
|
||
cfg_dir = '/app/config'
|
||
else:
|
||
cfg_dir = 'config'
|
||
if cfg_dir:
|
||
_os.makedirs(cfg_dir, exist_ok=True)
|
||
self.export_run_config_json(directory=cfg_dir, filename=base + '.json') # type: ignore[attr-defined]
|
||
# Also, if DECK_CONFIG explicitly points to a file path, write exactly there too
|
||
if cfg_path_env:
|
||
cfg_dir2 = _os.path.dirname(cfg_path_env) or '.'
|
||
cfg_name2 = _os.path.basename(cfg_path_env)
|
||
_os.makedirs(cfg_dir2, exist_ok=True)
|
||
self.export_run_config_json(directory=cfg_dir2, filename=cfg_name2) # type: ignore[attr-defined]
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
logger.warning("Plaintext export failed (non-fatal)")
|
||
# If owned-only and deck not complete, print a note
|
||
try:
|
||
if self.use_owned_only:
|
||
total_cards = sum(int(v.get('Count', 1)) for v in self.card_library.values())
|
||
if total_cards < 100:
|
||
self.output_func(f"Note: deck is incomplete ({total_cards}/100). Not enough owned cards to fill the deck.")
|
||
except Exception:
|
||
pass
|
||
end_ts = datetime.datetime.now()
|
||
logger.info(f"=== Deck Build: COMPLETE in {(end_ts - start_ts).total_seconds():.2f}s ===")
|
||
except KeyboardInterrupt:
|
||
logger.warning("Deck build cancelled by user (KeyboardInterrupt).")
|
||
self.output_func("\nDeck build cancelled by user.")
|
||
except Exception as e:
|
||
logger.exception("Deck build failed with exception")
|
||
self.output_func(f"Deck build failed: {e}")
|
||
|
||
def _display_txt_contents(self, txt_path: str):
|
||
"""Display the contents of the exported .txt file for easy copy/paste to online deck builders."""
|
||
try:
|
||
import os
|
||
if not os.path.exists(txt_path):
|
||
self.output_func("Warning: Text file not found for display.")
|
||
return
|
||
|
||
with open(txt_path, 'r', encoding='utf-8') as f:
|
||
contents = f.read().strip()
|
||
|
||
if not contents:
|
||
self.output_func("Warning: Text file is empty.")
|
||
return
|
||
|
||
# Create a nice display format
|
||
filename = os.path.basename(txt_path)
|
||
separator = "=" * 60
|
||
|
||
self.output_func(f"\n{separator}")
|
||
self.output_func(f"DECK LIST - {filename}")
|
||
self.output_func("Ready for copy/paste to Moxfield, EDHREC, or other deck builders")
|
||
self.output_func(f"{separator}")
|
||
self.output_func(contents)
|
||
# self.output_func(f"{separator}")
|
||
# self.output_func(f"Deck list also saved to: {txt_path}")
|
||
self.output_func(f"{separator}\n")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Failed to display text file contents: {e}")
|
||
self.output_func(f"Warning: Could not display deck list contents. Check {txt_path} manually.")
|
||
|
||
def add_creatures_phase(self):
|
||
"""Run the creature addition phase (delegated to CreatureAdditionMixin)."""
|
||
if hasattr(super(), 'add_creatures_phase'):
|
||
return super().add_creatures_phase()
|
||
raise NotImplementedError("Creature addition phase not implemented.")
|
||
|
||
def add_spells_phase(self):
|
||
"""Run the spell addition phase (delegated to SpellAdditionMixin)."""
|
||
if hasattr(super(), 'add_spells_phase'):
|
||
return super().add_spells_phase()
|
||
raise NotImplementedError("Spell addition phase not implemented.")
|
||
# ---------------------------
|
||
# Lightweight confirmations (CLI pauses; web auto-continues)
|
||
# ---------------------------
|
||
def _pause(self, message: str = "Press Enter to continue...") -> None:
|
||
try:
|
||
_ = self.input_func(message)
|
||
except Exception:
|
||
pass
|
||
|
||
def confirm_primary_theme(self) -> None:
|
||
if getattr(self, 'primary_tag', None):
|
||
self.output_func(f"Primary Theme: {self.primary_tag}")
|
||
self._pause()
|
||
|
||
def confirm_secondary_theme(self) -> None:
|
||
if getattr(self, 'secondary_tag', None):
|
||
self.output_func(f"Secondary Theme: {self.secondary_tag}")
|
||
self._pause()
|
||
|
||
def confirm_tertiary_theme(self) -> None:
|
||
if getattr(self, 'tertiary_tag', None):
|
||
self.output_func(f"Tertiary Theme: {self.tertiary_tag}")
|
||
self._pause()
|
||
|
||
def confirm_ramp_spells(self) -> None:
|
||
self.output_func("Confirm Ramp")
|
||
self._pause()
|
||
|
||
def confirm_removal_spells(self) -> None:
|
||
self.output_func("Confirm Removal")
|
||
self._pause()
|
||
|
||
def confirm_wipes_spells(self) -> None:
|
||
self.output_func("Confirm Board Wipes")
|
||
self._pause()
|
||
|
||
def confirm_card_advantage_spells(self) -> None:
|
||
self.output_func("Confirm Card Advantage")
|
||
self._pause()
|
||
|
||
def confirm_protection_spells(self) -> None:
|
||
self.output_func("Confirm Protection")
|
||
self._pause()
|
||
# Commander core selection state
|
||
commander_name: str = ""
|
||
commander_row: Optional[pd.Series] = None
|
||
commander_tags: List[str] = field(default_factory=list)
|
||
|
||
# Tag prioritization
|
||
primary_tag: Optional[str] = None
|
||
secondary_tag: Optional[str] = None
|
||
tertiary_tag: Optional[str] = None
|
||
selected_tags: List[str] = field(default_factory=list)
|
||
# How to combine multiple selected tags when prioritizing cards: 'AND' or 'OR'
|
||
tag_mode: str = 'AND'
|
||
|
||
# Future deck config placeholders
|
||
color_identity: List[str] = field(default_factory=list) # raw list of color letters e.g. ['B','G']
|
||
color_identity_key: Optional[str] = None # canonical key form e.g. 'B, G'
|
||
color_identity_full: Optional[str] = None # human readable e.g. 'Golgari: Black/Green'
|
||
files_to_load: List[str] = field(default_factory=list) # csv file stems to load
|
||
synergy_profile: Dict[str, Any] = field(default_factory=dict)
|
||
deck_goal: Optional[str] = None
|
||
|
||
# Aggregated commander info (scalar fields)
|
||
commander_dict: Dict[str, Any] = field(default_factory=dict)
|
||
|
||
# Power bracket state (Deck Building Step 1)
|
||
bracket_level: Optional[int] = None
|
||
bracket_name: Optional[str] = None
|
||
bracket_limits: Dict[str, Optional[int]] = field(default_factory=dict)
|
||
bracket_definition: Optional[BracketDefinition] = None
|
||
|
||
# Cached data
|
||
_commander_df: Optional[pd.DataFrame] = None
|
||
_combined_cards_df: Optional[pd.DataFrame] = None
|
||
_full_cards_df: Optional[pd.DataFrame] = None # immutable snapshot of original combined pool
|
||
# Owned-cards mode
|
||
use_owned_only: bool = False
|
||
owned_card_names: set[str] = field(default_factory=set)
|
||
owned_files_selected: List[str] = field(default_factory=list)
|
||
# Soft preference: bias selection toward owned names without excluding others
|
||
prefer_owned: bool = False
|
||
|
||
# Include/Exclude Cards (M1: Full Configuration Support)
|
||
include_cards: List[str] = field(default_factory=list)
|
||
exclude_cards: List[str] = field(default_factory=list)
|
||
enforcement_mode: str = "warn" # "warn" | "strict"
|
||
allow_illegal: bool = False
|
||
fuzzy_matching: bool = True
|
||
# Diagnostics storage for include/exclude processing
|
||
include_exclude_diagnostics: Optional[Dict[str, Any]] = None
|
||
|
||
# Deck library (cards added so far) mapping name->record
|
||
card_library: Dict[str, Dict[str, Any]] = field(default_factory=dict)
|
||
# Tag tracking: counts of unique cards per tag (not per copy)
|
||
tag_counts: Dict[str,int] = field(default_factory=dict)
|
||
# Internal map name -> set of tags used for uniqueness checks
|
||
_card_name_tags_index: Dict[str,set] = field(default_factory=dict)
|
||
# Deferred suggested lands based on tags / conditions
|
||
suggested_lands_queue: List[Dict[str, Any]] = field(default_factory=list)
|
||
# Baseline color source matrix captured after land build, before spell adjustments
|
||
color_source_matrix_baseline: Optional[Dict[str, Dict[str,int]]] = None
|
||
# Live cached color source matrix (recomputed lazily when lands change)
|
||
_color_source_matrix_cache: Optional[Dict[str, Dict[str,int]]] = None
|
||
_color_source_cache_dirty: bool = True
|
||
# Cached spell pip weights (invalidate on non-land changes)
|
||
_spell_pip_weights_cache: Optional[Dict[str, float]] = None
|
||
_spell_pip_cache_dirty: bool = True
|
||
|
||
# Build/session timestamp for export naming
|
||
timestamp: str = field(default_factory=lambda: datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
|
||
|
||
# IO injection for testing
|
||
input_func: Callable[[str], str] = field(default=lambda prompt: input(prompt))
|
||
output_func: Callable[[str], None] = field(default=lambda msg: print(msg))
|
||
# Random support (no external seeding)
|
||
_rng: Any = field(default=None, repr=False)
|
||
|
||
# Logging / output behavior
|
||
log_outputs: bool = True # if True, mirror output_func messages into logger at INFO level
|
||
_original_output_func: Optional[Callable[[str], None]] = field(default=None, repr=False)
|
||
|
||
# Chosen land counts (only fetches are tracked/exported; others vary randomly)
|
||
fetch_count: Optional[int] = None
|
||
# Whether this build is running in headless mode (suppress some interactive-only exports)
|
||
headless: bool = False
|
||
|
||
def __post_init__(self):
|
||
"""Post-init hook to wrap the provided output function so that all user-facing
|
||
messages are also captured in the central log (at INFO level) unless disabled.
|
||
"""
|
||
if self.log_outputs:
|
||
# Preserve original
|
||
self._original_output_func = self.output_func
|
||
|
||
def _wrapped(msg: str):
|
||
# Collapse excessive blank lines for log readability, but keep printing original
|
||
log_msg = msg.rstrip()
|
||
if log_msg:
|
||
logger.info(log_msg)
|
||
self._original_output_func(msg)
|
||
|
||
self.output_func = _wrapped
|
||
|
||
def _run_land_build_steps(self):
|
||
"""Run all land build steps (1-8) in order, logging progress."""
|
||
for step in range(1, 9):
|
||
m = getattr(self, f"run_land_step{step}", None)
|
||
if callable(m):
|
||
logger.info(f"Land Step {step}: begin")
|
||
m()
|
||
logger.info(f"Land Step {step}: complete (current land count {self._current_land_count() if hasattr(self, '_current_land_count') else 'n/a'})")
|
||
|
||
def _generate_recommendations(self, base_stem: str, limit: int):
|
||
"""Silently build a full (non-owned-filtered) deck with same choices and export top recommendations.
|
||
|
||
- Uses same commander, tags, bracket, and ideal_counts.
|
||
- Excludes any cards already in this deck's library.
|
||
- Exports CSV and TXT to deck_files with suffix _recommendations.
|
||
"""
|
||
try:
|
||
# Nothing to recommend if limit <= 0 or no commander
|
||
if limit <= 0 or not self.commander_row is not None:
|
||
return
|
||
# Prepare a quiet builder
|
||
def _silent_out(_msg: str) -> None:
|
||
return None
|
||
def _silent_in(_prompt: str) -> str:
|
||
return ""
|
||
rec = DeckBuilder(input_func=_silent_in, output_func=_silent_out, log_outputs=False, headless=True)
|
||
# Carry over selections
|
||
rec.commander_name = self.commander_name
|
||
rec.commander_row = self.commander_row
|
||
rec.commander_tags = list(self.commander_tags)
|
||
rec.primary_tag = self.primary_tag
|
||
rec.secondary_tag = self.secondary_tag
|
||
rec.tertiary_tag = self.tertiary_tag
|
||
rec.selected_tags = list(self.selected_tags)
|
||
rec.bracket_definition = self.bracket_definition
|
||
rec.bracket_level = self.bracket_level
|
||
rec.bracket_name = self.bracket_name
|
||
rec.bracket_limits = dict(self.bracket_limits) if self.bracket_limits else {}
|
||
rec.ideal_counts = dict(self.ideal_counts) if self.ideal_counts else {}
|
||
# Initialize commander dict (also adds commander to library)
|
||
try:
|
||
if rec.commander_row is not None:
|
||
rec._initialize_commander_dict(rec.commander_row)
|
||
except Exception:
|
||
pass
|
||
# Build on full pool (owned-only disabled by default)
|
||
rec.determine_color_identity()
|
||
rec.setup_dataframes()
|
||
# Ensure bracket applied and counts present
|
||
try:
|
||
rec.run_deck_build_step1()
|
||
except Exception:
|
||
pass
|
||
# Run the content-adding phases silently
|
||
try:
|
||
rec._run_land_build_steps()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if hasattr(rec, 'add_creatures_phase'):
|
||
rec.add_creatures_phase()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if hasattr(rec, 'add_spells_phase'):
|
||
rec.add_spells_phase()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if hasattr(rec, 'post_spell_land_adjust'):
|
||
rec.post_spell_land_adjust()
|
||
except Exception:
|
||
pass
|
||
|
||
# Build recommendation subset excluding already-chosen names
|
||
chosen = set(self.card_library.keys())
|
||
rec_items = []
|
||
for nm, info in rec.card_library.items():
|
||
if nm not in chosen:
|
||
rec_items.append((nm, info))
|
||
if not rec_items:
|
||
return
|
||
# Cap to requested limit
|
||
rec_subset: Dict[str, Dict[str, Any]] = {}
|
||
for nm, info in rec_items[:max(0, int(limit))]:
|
||
rec_subset[nm] = info
|
||
|
||
# Temporarily export subset using the recommendation builder's context/snapshots
|
||
original_lib = rec.card_library
|
||
try:
|
||
rec.card_library = rec_subset
|
||
# Export CSV and TXT with suffix
|
||
rec.export_decklist_csv(directory='deck_files', filename=base_stem + '_recommendations.csv', suppress_output=True) # type: ignore[attr-defined]
|
||
rec.export_decklist_text(directory='deck_files', filename=base_stem + '_recommendations.txt', suppress_output=True) # type: ignore[attr-defined]
|
||
finally:
|
||
rec.card_library = original_lib
|
||
# Notify user succinctly
|
||
try:
|
||
self.output_func(f"Recommended but unowned cards in deck_files/{base_stem}_recommendations.csv")
|
||
except Exception:
|
||
pass
|
||
except Exception as _e:
|
||
try:
|
||
self.output_func(f"Failed to generate recommendations: {_e}")
|
||
except Exception:
|
||
pass
|
||
|
||
# ---------------------------
|
||
# Owned Cards Helpers
|
||
# ---------------------------
|
||
def _card_library_dir(self) -> str:
|
||
"""Return folder to read owned cards from, preferring 'owned_cards'.
|
||
|
||
Precedence:
|
||
- OWNED_CARDS_DIR env var
|
||
- CARD_LIBRARY_DIR env var (back-compat)
|
||
- 'owned_cards' if exists
|
||
- 'card_library' if exists (back-compat)
|
||
- default 'owned_cards'
|
||
"""
|
||
try:
|
||
import os as _os
|
||
# Env overrides
|
||
env_dir = _os.getenv('OWNED_CARDS_DIR') or _os.getenv('CARD_LIBRARY_DIR')
|
||
if env_dir:
|
||
return env_dir
|
||
# Prefer new name
|
||
if _os.path.isdir('owned_cards'):
|
||
return 'owned_cards'
|
||
if _os.path.isdir('card_library'):
|
||
return 'card_library'
|
||
return 'owned_cards'
|
||
except Exception:
|
||
return 'owned_cards'
|
||
|
||
def _find_owned_files(self) -> List[str]:
|
||
import os as _os
|
||
folder = self._card_library_dir()
|
||
try:
|
||
entries = []
|
||
if _os.path.isdir(folder):
|
||
for name in _os.listdir(folder):
|
||
p = _os.path.join(folder, name)
|
||
if _os.path.isfile(p) and name.lower().endswith(('.txt', '.csv')):
|
||
entries.append(p)
|
||
return sorted(entries)
|
||
except Exception:
|
||
return []
|
||
|
||
def _parse_owned_line(self, line: str) -> Optional[str]:
|
||
s = (line or '').strip()
|
||
if not s or s.startswith('#') or s.startswith('//'):
|
||
return None
|
||
parts = s.split()
|
||
if len(parts) >= 2 and (parts[0].isdigit() or (parts[0].lower().endswith('x') and parts[0][:-1].isdigit())):
|
||
s = ' '.join(parts[1:])
|
||
return s.strip() or None
|
||
|
||
def _read_txt_owned(self, path: str) -> List[str]:
|
||
out: List[str] = []
|
||
try:
|
||
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
|
||
for line in f:
|
||
n = self._parse_owned_line(line)
|
||
if n:
|
||
out.append(n)
|
||
except Exception:
|
||
pass
|
||
return out
|
||
|
||
def _read_csv_owned(self, path: str) -> List[str]:
|
||
import csv as _csv
|
||
names: List[str] = []
|
||
try:
|
||
with open(path, 'r', encoding='utf-8', errors='ignore', newline='') as f:
|
||
try:
|
||
reader = _csv.DictReader(f)
|
||
headers = [h.strip() for h in (reader.fieldnames or [])]
|
||
candidates = [c for c in ('name', 'card', 'Card', 'card_name', 'Card Name') if c in headers]
|
||
if candidates:
|
||
key = candidates[0]
|
||
for row in reader:
|
||
val = (row.get(key) or '').strip()
|
||
if val:
|
||
names.append(val)
|
||
else:
|
||
f.seek(0)
|
||
reader2 = _csv.reader(f)
|
||
for row in reader2:
|
||
if not row:
|
||
continue
|
||
val = (row[0] or '').strip()
|
||
if val and val.lower() not in ('name', 'card', 'card name'):
|
||
names.append(val)
|
||
except Exception:
|
||
# Fallback plain reader
|
||
f.seek(0)
|
||
for line in f:
|
||
if line.strip():
|
||
names.append(line.strip())
|
||
except Exception:
|
||
pass
|
||
return names
|
||
|
||
def _load_owned_from_files(self, files: List[str]) -> set[str]:
|
||
names: List[str] = []
|
||
for p in files:
|
||
pl = p.lower()
|
||
try:
|
||
if pl.endswith('.txt'):
|
||
names.extend(self._read_txt_owned(p))
|
||
elif pl.endswith('.csv'):
|
||
names.extend(self._read_csv_owned(p))
|
||
except Exception:
|
||
continue
|
||
clean = {n.strip() for n in names if isinstance(n, str) and n.strip()}
|
||
return clean
|
||
|
||
def _prompt_use_owned_cards(self):
|
||
# Quick existence check: only prompt if any owned files are present
|
||
files = self._find_owned_files()
|
||
if not files:
|
||
# No owned lists present; skip prompting entirely
|
||
return
|
||
resp = self.input_func("Use only owned cards? (y/N): ").strip().lower()
|
||
self.use_owned_only = (resp in ('y', 'yes'))
|
||
if not self.use_owned_only:
|
||
return
|
||
self.output_func("Select owned card files by number (comma-separated), or press Enter to use all:")
|
||
for i, p in enumerate(files):
|
||
try:
|
||
base = p.replace('\\', '/').split('/')[-1]
|
||
except Exception:
|
||
base = p
|
||
self.output_func(f" [{i}] {base}")
|
||
raw = self.input_func("Selection: ").strip()
|
||
selected: List[str] = []
|
||
if not raw:
|
||
selected = files
|
||
else:
|
||
seen = set()
|
||
for tok in raw.split(','):
|
||
tok = tok.strip()
|
||
if tok.isdigit():
|
||
idx = int(tok)
|
||
if 0 <= idx < len(files) and idx not in seen:
|
||
selected.append(files[idx])
|
||
seen.add(idx)
|
||
if not selected:
|
||
self.output_func("No valid selections; using all owned files.")
|
||
selected = files
|
||
self.owned_files_selected = selected
|
||
self.owned_card_names = self._load_owned_from_files(selected)
|
||
self.output_func(f"Owned cards loaded: {len(self.owned_card_names)} unique names from {len(selected)} file(s).")
|
||
|
||
# Public helper for headless/tests: enable/disable owned-only and optionally preload files
|
||
def set_owned_mode(self, owned_only: bool, files: Optional[List[str]] = None):
|
||
self.use_owned_only = bool(owned_only)
|
||
if not self.use_owned_only:
|
||
self.owned_card_names = set()
|
||
self.owned_files_selected = []
|
||
return
|
||
if files is None:
|
||
return
|
||
# Normalize to existing files
|
||
valid: List[str] = []
|
||
for p in files:
|
||
try:
|
||
if os.path.isfile(p) and p.lower().endswith(('.txt', '.csv')):
|
||
valid.append(p)
|
||
else:
|
||
# try relative to card_library
|
||
alt = os.path.join(self._card_library_dir(), p)
|
||
if os.path.isfile(alt) and alt.lower().endswith(('.txt', '.csv')):
|
||
valid.append(alt)
|
||
except Exception:
|
||
continue
|
||
if valid:
|
||
self.owned_files_selected = valid
|
||
self.owned_card_names = self._load_owned_from_files(valid)
|
||
|
||
# Internal helper: when user opts out, silently load all owned files for CSV flagging
|
||
def _load_all_owned_silent(self):
|
||
try:
|
||
files = self._find_owned_files()
|
||
if not files:
|
||
return
|
||
self.owned_files_selected = files
|
||
self.owned_card_names = self._load_owned_from_files(files)
|
||
except Exception:
|
||
pass
|
||
|
||
# ---------------------------
|
||
# RNG Initialization
|
||
# ---------------------------
|
||
def _get_rng(self): # lazy init
|
||
if self._rng is None:
|
||
import random as _r
|
||
self._rng = _r
|
||
return self._rng
|
||
|
||
# ---------------------------
|
||
# Data Loading
|
||
# ---------------------------
|
||
def load_commander_data(self) -> pd.DataFrame:
|
||
if self._commander_df is not None:
|
||
return self._commander_df
|
||
df = pd.read_csv(
|
||
bc.COMMANDER_CSV_PATH,
|
||
converters=getattr(bc, "COMMANDER_CONVERTERS", None)
|
||
)
|
||
if "themeTags" not in df.columns:
|
||
df["themeTags"] = [[] for _ in range(len(df))]
|
||
if "creatureTypes" not in df.columns:
|
||
df["creatureTypes"] = [[] for _ in range(len(df))]
|
||
self._commander_df = df
|
||
return df
|
||
|
||
# ---------------------------
|
||
# Fuzzy Search Helpers
|
||
# ---------------------------
|
||
def _auto_accept(self, query: str, candidate: str) -> bool:
|
||
full = _full_ratio(query, candidate)
|
||
if full >= EXACT_NAME_THRESHOLD:
|
||
return True
|
||
q_first = query.strip().split()[0].lower() if query.strip() else ""
|
||
c_first = candidate.split()[0].lower()
|
||
if q_first and _full_ratio(q_first, c_first) >= FIRST_WORD_THRESHOLD:
|
||
return True
|
||
return False
|
||
|
||
def _gather_candidates(self, query: str, names: List[str]) -> List[tuple]:
|
||
scored = _top_matches(query, names, MAX_PRESENTED_CHOICES)
|
||
uniq: Dict[str, int] = {}
|
||
for n, s in scored:
|
||
uniq[n] = max(uniq.get(n, 0), s)
|
||
return sorted(uniq.items(), key=lambda x: x[1], reverse=True)
|
||
|
||
# ---------------------------
|
||
# Commander Dict Initialization
|
||
# ---------------------------
|
||
def _initialize_commander_dict(self, row: pd.Series):
|
||
def get(field: str, default=""):
|
||
return row.get(field, default) if isinstance(row, pd.Series) else default
|
||
|
||
mana_cost = get("manaCost", "")
|
||
mana_value = get("manaValue", get("cmc", None))
|
||
try:
|
||
if mana_value is None and isinstance(mana_cost, str):
|
||
mana_value = mana_cost.count("}") if "}" in mana_cost else None
|
||
except Exception:
|
||
pass
|
||
|
||
color_identity_raw = get("colorIdentity", get("colors", []))
|
||
if isinstance(color_identity_raw, str):
|
||
stripped = color_identity_raw.strip("[] ")
|
||
if "," in stripped:
|
||
color_identity = [c.strip(" '\"") for c in stripped.split(",")]
|
||
else:
|
||
color_identity = list(stripped)
|
||
else:
|
||
color_identity = color_identity_raw if isinstance(color_identity_raw, list) else []
|
||
|
||
colors_field = get("colors", color_identity)
|
||
if isinstance(colors_field, str):
|
||
colors = list(colors_field)
|
||
else:
|
||
colors = colors_field if isinstance(colors_field, list) else []
|
||
|
||
type_line = get("type", get("type_line", ""))
|
||
creature_types = get("creatureTypes", [])
|
||
if isinstance(creature_types, str):
|
||
creature_types = [s.strip() for s in creature_types.split(",") if s.strip()]
|
||
|
||
text_field = get("text", get("oracleText", ""))
|
||
if isinstance(text_field, str):
|
||
text_field = text_field.replace("\\n", "\n")
|
||
|
||
power = get("power", "")
|
||
toughness = get("toughness", "")
|
||
themes = get("themeTags", [])
|
||
if isinstance(themes, str):
|
||
themes = [t.strip() for t in themes.split(",") if t.strip()]
|
||
|
||
cmc = get("cmc", mana_value if mana_value is not None else 0.0)
|
||
try:
|
||
cmc = float(cmc) if cmc not in ("", None) else 0.0
|
||
except Exception:
|
||
cmc = 0.0
|
||
|
||
self.commander_dict = {
|
||
"Commander Name": self.commander_name,
|
||
"Mana Cost": mana_cost,
|
||
"Mana Value": mana_value,
|
||
"Color Identity": color_identity,
|
||
"Colors": colors,
|
||
"Type": type_line,
|
||
"Creature Types": creature_types,
|
||
"Text": text_field,
|
||
"Power": power,
|
||
"Toughness": toughness,
|
||
"Themes": themes,
|
||
"CMC": cmc,
|
||
}
|
||
# Ensure commander added to card library
|
||
try:
|
||
self.add_card(
|
||
card_name=self.commander_name,
|
||
card_type=type_line,
|
||
mana_cost=mana_cost,
|
||
mana_value=cmc,
|
||
creature_types=creature_types if isinstance(creature_types, list) else [],
|
||
tags=themes if isinstance(themes, list) else [],
|
||
is_commander=True
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
# ---------------------------
|
||
# Pretty Display
|
||
# ---------------------------
|
||
def _format_commander_pretty(self, row: pd.Series) -> str:
|
||
|
||
def norm(val):
|
||
if isinstance(val, list) and len(val) == 1:
|
||
val = val[0]
|
||
if val is None or (isinstance(val, float) and math.isnan(val)):
|
||
return "-"
|
||
return val
|
||
|
||
def join_list(val, sep=", "):
|
||
val = norm(val)
|
||
if isinstance(val, list):
|
||
return sep.join(str(x) for x in val) if val else "-"
|
||
return str(val)
|
||
|
||
name = norm(row.get("name", ""))
|
||
face_name = norm(row.get("faceName", name))
|
||
edhrec = norm(row.get("edhrecRank", "-"))
|
||
color_identity = join_list(row.get("colorIdentity", row.get("colors", [])), "")
|
||
colors = join_list(row.get("colors", []), "")
|
||
mana_cost = norm(row.get("manaCost", ""))
|
||
mana_value = norm(row.get("manaValue", row.get("cmc", "-")))
|
||
type_line = norm(row.get("type", row.get("type_line", "")))
|
||
creature_types = join_list(row.get("creatureTypes", []))
|
||
text_field = norm(row.get("text", row.get("oracleText", "")))
|
||
text_field = str(text_field).replace("\\n", "\n")
|
||
power = norm(row.get("power", "-"))
|
||
toughness = norm(row.get("toughness", "-"))
|
||
keywords = join_list(row.get("keywords", []))
|
||
raw_tags = row.get("themeTags", [])
|
||
if isinstance(raw_tags, str):
|
||
tags_list = [t.strip() for t in raw_tags.split(",") if t.strip()]
|
||
elif isinstance(raw_tags, list):
|
||
if len(raw_tags) == 1 and isinstance(raw_tags[0], list):
|
||
tags_list = raw_tags[0]
|
||
else:
|
||
tags_list = raw_tags
|
||
else:
|
||
tags_list = []
|
||
layout = norm(row.get("layout", "-"))
|
||
side = norm(row.get("side", "-"))
|
||
|
||
lines = [
|
||
"Selected Commander:",
|
||
f"Name: {name}",
|
||
f"Face Name: {face_name}",
|
||
f"EDHREC Rank: {edhrec}",
|
||
f"Color Identity: {color_identity}",
|
||
f"Colors: {colors}",
|
||
f"Mana Cost: {mana_cost}",
|
||
f"Mana Value: {mana_value}",
|
||
f"Type: {type_line}",
|
||
f"Creature Types: {creature_types}",
|
||
f"Power/Toughness: {power}/{toughness}",
|
||
f"Keywords: {keywords}",
|
||
f"Layout: {layout}",
|
||
f"Side: {side}",
|
||
]
|
||
if tags_list:
|
||
lines.append("Theme Tags:")
|
||
for t in tags_list:
|
||
lines.append(f" - {t}")
|
||
else:
|
||
lines.append("Theme Tags: -")
|
||
lines.extend([
|
||
"Text:",
|
||
text_field,
|
||
""
|
||
])
|
||
return "\n".join(lines)
|
||
|
||
def _present_commander_and_confirm(self, df: pd.DataFrame, name: str) -> bool:
|
||
row = df[df["name"] == name].iloc[0]
|
||
pretty = self._format_commander_pretty(row)
|
||
self.output_func("\n" + pretty)
|
||
while True:
|
||
resp = self.input_func("Is this the commander you want? (y/n): ").strip().lower()
|
||
if resp in ("y", "yes"):
|
||
self._apply_commander_selection(row)
|
||
return True
|
||
if resp in ("n", "no"):
|
||
return False
|
||
self.output_func("Please enter y or n.")
|
||
|
||
# (Commander selection, tag prioritization, and power bracket methods moved to CommanderSelectionMixin in phases/phase1_commander.py)
|
||
|
||
# ---------------------------
|
||
# Color Identity & Card Pool Loading (New Step)
|
||
# ---------------------------
|
||
def _canonical_color_key(self, colors: List[str]) -> str:
|
||
"""Return canonical key like 'B, G, W' or 'COLORLESS'. Uses alphabetical ordering.
|
||
|
||
The legacy constants expect a specific ordering (alphabetical seems consistent in provided maps).
|
||
"""
|
||
if not colors:
|
||
return 'COLORLESS'
|
||
# Deduplicate & sort
|
||
uniq = sorted({c.strip().upper() for c in colors if c.strip()})
|
||
return ', '.join(uniq)
|
||
|
||
def determine_color_identity(self) -> Tuple[str, List[str]]:
|
||
"""Determine color identity key/full name and derive csv file list.
|
||
|
||
Returns (color_identity_full, files_to_load).
|
||
"""
|
||
if self.commander_row is None:
|
||
raise RuntimeError("Commander must be selected before determining color identity.")
|
||
|
||
raw_ci = self.commander_row.get('colorIdentity')
|
||
if isinstance(raw_ci, list):
|
||
colors_list = raw_ci
|
||
elif isinstance(raw_ci, str) and raw_ci.strip():
|
||
# Could be formatted like "['B','G']" or 'BG'; attempt simple parsing
|
||
if ',' in raw_ci:
|
||
colors_list = [c.strip().strip("'[] ") for c in raw_ci.split(',') if c.strip().strip("'[] ")]
|
||
else:
|
||
colors_list = [c for c in raw_ci if c.isalpha()]
|
||
else:
|
||
# Fallback to 'colors' field or treat as colorless
|
||
alt = self.commander_row.get('colors')
|
||
if isinstance(alt, list):
|
||
colors_list = alt
|
||
elif isinstance(alt, str) and alt.strip():
|
||
colors_list = [c for c in alt if c.isalpha()]
|
||
else:
|
||
colors_list = []
|
||
|
||
self.color_identity = [c.upper() for c in colors_list]
|
||
self.color_identity_key = self._canonical_color_key(self.color_identity)
|
||
|
||
# Match against maps
|
||
full = None
|
||
load_files: List[str] = []
|
||
key = self.color_identity_key
|
||
if key in bc.MONO_COLOR_MAP:
|
||
full, load_files = bc.MONO_COLOR_MAP[key]
|
||
elif key in bc.DUAL_COLOR_MAP:
|
||
info = bc.DUAL_COLOR_MAP[key]
|
||
full, load_files = info[0], info[2]
|
||
elif key in bc.TRI_COLOR_MAP:
|
||
info = bc.TRI_COLOR_MAP[key]
|
||
full, load_files = info[0], info[2]
|
||
elif key in bc.OTHER_COLOR_MAP:
|
||
info = bc.OTHER_COLOR_MAP[key]
|
||
full, load_files = info[0], info[2]
|
||
else:
|
||
# Unknown / treat as colorless fallback
|
||
full, load_files = 'Unknown', ['colorless']
|
||
|
||
self.color_identity_full = full
|
||
self.files_to_load = load_files
|
||
return full, load_files
|
||
|
||
def setup_dataframes(self) -> pd.DataFrame:
|
||
"""Load all csv files for current color identity into one combined DataFrame.
|
||
|
||
Each file stem in files_to_load corresponds to csv_files/{stem}_cards.csv.
|
||
The result is cached and returned. Minimal validation only (non-empty, required columns exist if known).
|
||
"""
|
||
if self._combined_cards_df is not None:
|
||
return self._combined_cards_df
|
||
if not self.files_to_load:
|
||
# Attempt to determine if not yet done
|
||
self.determine_color_identity()
|
||
dfs = []
|
||
required = getattr(bc, 'CSV_REQUIRED_COLUMNS', [])
|
||
for stem in self.files_to_load:
|
||
path = f'csv_files/{stem}_cards.csv'
|
||
try:
|
||
df = pd.read_csv(path)
|
||
if required:
|
||
missing = [c for c in required if c not in df.columns]
|
||
if missing:
|
||
# Skip or still keep with warning; choose to warn
|
||
self.output_func(f"Warning: {path} missing columns: {missing}")
|
||
dfs.append(df)
|
||
except FileNotFoundError:
|
||
self.output_func(f"Warning: CSV file not found: {path}")
|
||
continue
|
||
if not dfs:
|
||
raise RuntimeError("No CSV files loaded for color identity.")
|
||
combined = pd.concat(dfs, axis=0, ignore_index=True)
|
||
# Drop duplicate rows by 'name' if column exists
|
||
if 'name' in combined.columns:
|
||
combined = combined.drop_duplicates(subset='name', keep='first')
|
||
# If owned-only mode, filter combined pool to owned names (case-insensitive)
|
||
if self.use_owned_only:
|
||
try:
|
||
owned_lower = {n.lower() for n in self.owned_card_names}
|
||
name_col = None
|
||
if 'name' in combined.columns:
|
||
name_col = 'name'
|
||
elif 'Card Name' in combined.columns:
|
||
name_col = 'Card Name'
|
||
if name_col is not None:
|
||
mask = combined[name_col].astype(str).str.lower().isin(owned_lower)
|
||
prev = len(combined)
|
||
combined = combined[mask].copy()
|
||
self.output_func(f"Owned-only mode: filtered card pool from {prev} to {len(combined)} records.")
|
||
else:
|
||
self.output_func("Owned-only mode: no recognizable name column to filter on; skipping filter.")
|
||
except Exception as _e:
|
||
self.output_func(f"Owned-only mode: failed to filter combined pool: {_e}")
|
||
# Soft prefer-owned does not filter the pool; biasing is applied later at selection time
|
||
|
||
# Apply exclude card filtering (M0.5: Phase 1 - Exclude Only)
|
||
if hasattr(self, 'exclude_cards') and self.exclude_cards:
|
||
try:
|
||
from deck_builder.include_exclude_utils import normalize_punctuation
|
||
|
||
# Find name column
|
||
name_col = None
|
||
if 'name' in combined.columns:
|
||
name_col = 'name'
|
||
elif 'Card Name' in combined.columns:
|
||
name_col = 'Card Name'
|
||
|
||
if name_col is not None:
|
||
excluded_matches = []
|
||
original_count = len(combined)
|
||
|
||
# Normalize exclude patterns for matching (with punctuation normalization)
|
||
normalized_excludes = {normalize_punctuation(pattern): pattern for pattern in self.exclude_cards}
|
||
|
||
# Create a mask to track which rows to exclude
|
||
exclude_mask = pd.Series([False] * len(combined), index=combined.index)
|
||
|
||
# Check each card against exclude patterns
|
||
for idx, card_name in combined[name_col].items():
|
||
if not exclude_mask[idx]: # Only check if not already excluded
|
||
normalized_card = normalize_punctuation(str(card_name))
|
||
|
||
# Check if this card matches any exclude pattern
|
||
for normalized_exclude, original_pattern in normalized_excludes.items():
|
||
if normalized_card == normalized_exclude:
|
||
excluded_matches.append({
|
||
'pattern': original_pattern,
|
||
'matched_card': str(card_name),
|
||
'similarity': 1.0
|
||
})
|
||
exclude_mask[idx] = True
|
||
break # Found a match, no need to check other patterns
|
||
|
||
# Apply the exclusions in one operation
|
||
if exclude_mask.any():
|
||
combined = combined[~exclude_mask].copy()
|
||
self.output_func(f"Excluded {len(excluded_matches)} cards from pool (was {original_count}, now {len(combined)})")
|
||
for match in excluded_matches[:5]: # Show first 5 matches
|
||
self.output_func(f" - Excluded '{match['matched_card']}' (pattern: '{match['pattern']}', similarity: {match['similarity']:.2f})")
|
||
if len(excluded_matches) > 5:
|
||
self.output_func(f" - ... and {len(excluded_matches) - 5} more")
|
||
else:
|
||
self.output_func(f"No cards matched exclude patterns: {', '.join(self.exclude_cards)}")
|
||
else:
|
||
self.output_func("Exclude mode: no recognizable name column to filter on; skipping exclude filter.")
|
||
except Exception as e:
|
||
self.output_func(f"Exclude mode: failed to filter excluded cards: {e}")
|
||
import traceback
|
||
self.output_func(f"Exclude traceback: {traceback.format_exc()}")
|
||
|
||
self._combined_cards_df = combined
|
||
# Preserve original snapshot for enrichment across subsequent removals
|
||
# Note: This snapshot should also exclude filtered cards to prevent them from being accessible
|
||
if self._full_cards_df is None:
|
||
self._full_cards_df = combined.copy()
|
||
return combined
|
||
|
||
# ---------------------------
|
||
# Include/Exclude Processing (M1: Config + Validation + Persistence)
|
||
# ---------------------------
|
||
def _inject_includes_after_lands(self) -> None:
|
||
"""
|
||
M2: Inject valid include cards after land selection, before creature/spell fill.
|
||
|
||
This method:
|
||
1. Processes include/exclude lists if not already done
|
||
2. Injects valid include cards that passed validation
|
||
3. Tracks diagnostics for category limit overrides
|
||
4. Ensures excluded cards cannot re-enter via downstream heuristics
|
||
"""
|
||
# Skip if no include cards specified
|
||
if not getattr(self, 'include_cards', None):
|
||
return
|
||
|
||
# Process includes/excludes if not already done
|
||
if not getattr(self, 'include_exclude_diagnostics', None):
|
||
self._process_includes_excludes()
|
||
|
||
# Get validated include cards
|
||
validated_includes = self.include_cards # Already processed by _process_includes_excludes
|
||
if not validated_includes:
|
||
return
|
||
|
||
# Initialize diagnostics if not present
|
||
if not self.include_exclude_diagnostics:
|
||
self.include_exclude_diagnostics = {}
|
||
|
||
# Track cards that will be injected
|
||
injected_cards = []
|
||
over_ideal_tracking = {}
|
||
|
||
logger.info(f"INCLUDE_INJECTION: Starting injection of {len(validated_includes)} include cards")
|
||
|
||
# Inject each valid include card
|
||
for card_name in validated_includes:
|
||
if not card_name or card_name in self.card_library:
|
||
continue # Skip empty names or already added cards
|
||
|
||
# Attempt to find card in available pool for metadata enrichment
|
||
card_info = self._find_card_in_pool(card_name)
|
||
if not card_info:
|
||
# Card not found in pool - could be missing or already excluded
|
||
continue
|
||
|
||
# Extract metadata
|
||
card_type = card_info.get('type', card_info.get('type_line', ''))
|
||
mana_cost = card_info.get('mana_cost', card_info.get('manaCost', ''))
|
||
mana_value = card_info.get('mana_value', card_info.get('manaValue', card_info.get('cmc', None)))
|
||
creature_types = card_info.get('creatureTypes', [])
|
||
theme_tags = card_info.get('themeTags', [])
|
||
|
||
# Normalize theme tags
|
||
if isinstance(theme_tags, str):
|
||
theme_tags = [t.strip() for t in theme_tags.split(',') if t.strip()]
|
||
elif not isinstance(theme_tags, list):
|
||
theme_tags = []
|
||
|
||
# Determine card category for over-ideal tracking
|
||
category = self._categorize_card_for_limits(card_type)
|
||
if category:
|
||
# Check if this include would exceed ideal counts
|
||
current_count = self._count_cards_in_category(category)
|
||
ideal_count = getattr(self, 'ideal_counts', {}).get(category, float('inf'))
|
||
if current_count >= ideal_count:
|
||
if category not in over_ideal_tracking:
|
||
over_ideal_tracking[category] = []
|
||
over_ideal_tracking[category].append(card_name)
|
||
|
||
# Add the include card
|
||
self.add_card(
|
||
card_name=card_name,
|
||
card_type=card_type,
|
||
mana_cost=mana_cost,
|
||
mana_value=mana_value,
|
||
creature_types=creature_types,
|
||
tags=theme_tags,
|
||
role='include',
|
||
added_by='include_injection'
|
||
)
|
||
|
||
injected_cards.append(card_name)
|
||
logger.info(f"INCLUDE_ADD: {card_name} (category: {category or 'unknown'})")
|
||
|
||
# Update diagnostics
|
||
self.include_exclude_diagnostics['include_added'] = injected_cards
|
||
self.include_exclude_diagnostics['include_over_ideal'] = over_ideal_tracking
|
||
|
||
# Output summary
|
||
if injected_cards:
|
||
self.output_func(f"\nInclude Cards Injected ({len(injected_cards)}):")
|
||
for card in injected_cards:
|
||
self.output_func(f" + {card}")
|
||
if over_ideal_tracking:
|
||
self.output_func("\nCategory Limit Overrides:")
|
||
for category, cards in over_ideal_tracking.items():
|
||
self.output_func(f" {category}: {', '.join(cards)}")
|
||
else:
|
||
self.output_func("No include cards were injected (already present or invalid)")
|
||
|
||
def _find_card_in_pool(self, card_name: str) -> Optional[Dict[str, any]]:
|
||
"""Find a card in the current card pool and return its metadata."""
|
||
if not card_name:
|
||
return None
|
||
|
||
# Check combined cards dataframe first
|
||
df = getattr(self, '_combined_cards_df', None)
|
||
if df is not None and not df.empty and 'name' in df.columns:
|
||
matches = df[df['name'].str.lower() == card_name.lower()]
|
||
if not matches.empty:
|
||
return matches.iloc[0].to_dict()
|
||
|
||
# Fallback to full cards dataframe if no match in combined
|
||
df_full = getattr(self, '_full_cards_df', None)
|
||
if df_full is not None and not df_full.empty and 'name' in df_full.columns:
|
||
matches = df_full[df_full['name'].str.lower() == card_name.lower()]
|
||
if not matches.empty:
|
||
return matches.iloc[0].to_dict()
|
||
|
||
return None
|
||
|
||
def _categorize_card_for_limits(self, card_type: str) -> Optional[str]:
|
||
"""Categorize a card type for ideal count tracking."""
|
||
if not card_type:
|
||
return None
|
||
|
||
type_lower = card_type.lower()
|
||
|
||
if 'creature' in type_lower:
|
||
return 'creatures'
|
||
elif 'land' in type_lower:
|
||
return 'lands'
|
||
elif any(spell_type in type_lower for spell_type in ['instant', 'sorcery', 'enchantment', 'artifact', 'planeswalker']):
|
||
# For spells, we could get more specific, but for now group as general spells
|
||
return 'spells'
|
||
else:
|
||
return 'other'
|
||
|
||
def _count_cards_in_category(self, category: str) -> int:
|
||
"""Count cards currently in deck library by category."""
|
||
if not category or not self.card_library:
|
||
return 0
|
||
|
||
count = 0
|
||
for name, entry in self.card_library.items():
|
||
card_type = entry.get('Card Type', '')
|
||
if not card_type:
|
||
continue
|
||
|
||
entry_category = self._categorize_card_for_limits(card_type)
|
||
if entry_category == category:
|
||
count += entry.get('Count', 1)
|
||
|
||
return count
|
||
|
||
def _process_includes_excludes(self) -> IncludeExcludeDiagnostics:
|
||
"""
|
||
Process and validate include/exclude card lists with fuzzy matching.
|
||
|
||
Returns:
|
||
IncludeExcludeDiagnostics: Complete diagnostics of processing results
|
||
"""
|
||
# Initialize diagnostics
|
||
diagnostics = IncludeExcludeDiagnostics(
|
||
missing_includes=[],
|
||
ignored_color_identity=[],
|
||
illegal_dropped=[],
|
||
illegal_allowed=[],
|
||
excluded_removed=[],
|
||
duplicates_collapsed={},
|
||
include_added=[],
|
||
include_over_ideal={},
|
||
fuzzy_corrections={},
|
||
confirmation_needed=[],
|
||
list_size_warnings={}
|
||
)
|
||
|
||
# 1. Collapse duplicates for both lists
|
||
include_unique, include_dupes = collapse_duplicates(self.include_cards)
|
||
exclude_unique, exclude_dupes = collapse_duplicates(self.exclude_cards)
|
||
|
||
# Update internal lists with unique versions
|
||
self.include_cards = include_unique
|
||
self.exclude_cards = exclude_unique
|
||
|
||
# Track duplicates in diagnostics
|
||
diagnostics.duplicates_collapsed.update(include_dupes)
|
||
diagnostics.duplicates_collapsed.update(exclude_dupes)
|
||
|
||
# 2. Validate list sizes
|
||
size_validation = validate_list_sizes(self.include_cards, self.exclude_cards)
|
||
if not size_validation['valid']:
|
||
# List too long - this is a critical error
|
||
for error in size_validation['errors']:
|
||
self.output_func(f"List size error: {error}")
|
||
|
||
diagnostics.list_size_warnings = size_validation.get('warnings', {})
|
||
|
||
# 3. Get available card names for fuzzy matching
|
||
available_cards = set()
|
||
if self._combined_cards_df is not None and not self._combined_cards_df.empty:
|
||
name_col = 'name' if 'name' in self._combined_cards_df.columns else 'Card Name'
|
||
if name_col in self._combined_cards_df.columns:
|
||
available_cards = set(self._combined_cards_df[name_col].astype(str))
|
||
|
||
# 4. Process includes with fuzzy matching and color identity validation
|
||
processed_includes = []
|
||
for card_name in self.include_cards:
|
||
if not card_name.strip():
|
||
continue
|
||
|
||
# Fuzzy match if enabled
|
||
if self.fuzzy_matching and available_cards:
|
||
match_result = fuzzy_match_card_name(card_name, available_cards)
|
||
if match_result.auto_accepted and match_result.matched_name:
|
||
if match_result.matched_name != card_name:
|
||
diagnostics.fuzzy_corrections[card_name] = match_result.matched_name
|
||
processed_includes.append(match_result.matched_name)
|
||
elif match_result.suggestions:
|
||
# Needs user confirmation
|
||
diagnostics.confirmation_needed.append({
|
||
"input": card_name,
|
||
"suggestions": match_result.suggestions,
|
||
"confidence": match_result.confidence
|
||
})
|
||
else:
|
||
# No good matches found
|
||
diagnostics.missing_includes.append(card_name)
|
||
else:
|
||
# Direct matching or fuzzy disabled
|
||
processed_includes.append(card_name)
|
||
|
||
# 5. Color identity validation for includes
|
||
if processed_includes and hasattr(self, 'color_identity') and self.color_identity:
|
||
# This would need commander color identity checking logic
|
||
# For now, accept all includes (color validation can be added later)
|
||
pass
|
||
|
||
# 6. Handle exclude conflicts (exclude overrides include)
|
||
final_includes = []
|
||
for include in processed_includes:
|
||
if include in self.exclude_cards:
|
||
diagnostics.excluded_removed.append(include)
|
||
self.output_func(f"Card '{include}' appears in both include and exclude lists - excluding takes precedence")
|
||
else:
|
||
final_includes.append(include)
|
||
|
||
# Update processed lists
|
||
self.include_cards = final_includes
|
||
|
||
# Store diagnostics for later use
|
||
self.include_exclude_diagnostics = diagnostics.__dict__
|
||
|
||
return diagnostics
|
||
|
||
def _get_fuzzy_suggestions(self, input_name: str, available_cards: Set[str], max_suggestions: int = 3) -> List[str]:
|
||
"""
|
||
Get fuzzy match suggestions for a card name.
|
||
|
||
Args:
|
||
input_name: User input card name
|
||
available_cards: Set of available card names
|
||
max_suggestions: Maximum number of suggestions to return
|
||
|
||
Returns:
|
||
List of suggested card names
|
||
"""
|
||
if not input_name or not available_cards:
|
||
return []
|
||
|
||
match_result = fuzzy_match_card_name(input_name, available_cards)
|
||
return match_result.suggestions[:max_suggestions]
|
||
|
||
def _enforce_includes_strict(self) -> None:
|
||
"""
|
||
Enforce strict mode for includes - raise error if any valid includes are missing.
|
||
|
||
Raises:
|
||
RuntimeError: If enforcement_mode is 'strict' and includes are missing
|
||
"""
|
||
if self.enforcement_mode != "strict":
|
||
return
|
||
|
||
if not self.include_exclude_diagnostics:
|
||
return
|
||
|
||
missing = self.include_exclude_diagnostics.get('missing_includes', [])
|
||
if missing:
|
||
missing_str = ', '.join(missing)
|
||
raise RuntimeError(f"Strict mode: Failed to include required cards: {missing_str}")
|
||
|
||
# ---------------------------
|
||
# Card Library Management
|
||
# ---------------------------
|
||
def add_card(self,
|
||
card_name: str,
|
||
card_type: str = '',
|
||
mana_cost: str = '',
|
||
mana_value: Optional[float] = None,
|
||
creature_types: Optional[List[str]] = None,
|
||
tags: Optional[List[str]] = None,
|
||
is_commander: bool = False,
|
||
role: Optional[str] = None,
|
||
sub_role: Optional[str] = None,
|
||
added_by: Optional[str] = None,
|
||
trigger_tag: Optional[str] = None,
|
||
synergy: Optional[int] = None) -> None:
|
||
"""Add (or increment) a card in the deck library.
|
||
|
||
Stores minimal metadata; duplicates increment Count. Basic lands allowed unlimited.
|
||
M2: Prevents re-entry of excluded cards via downstream heuristics.
|
||
"""
|
||
# M2: Exclude re-entry prevention - check if card is in exclude list
|
||
if not is_commander and hasattr(self, 'exclude_cards') and self.exclude_cards:
|
||
from .include_exclude_utils import normalize_punctuation
|
||
|
||
# Normalize the card name for comparison (with punctuation normalization)
|
||
normalized_card = normalize_punctuation(card_name)
|
||
normalized_excludes = {normalize_punctuation(exc): exc for exc in self.exclude_cards}
|
||
|
||
if normalized_card in normalized_excludes:
|
||
# Log the prevention but don't output to avoid spam
|
||
logger.info(f"EXCLUDE_REENTRY_PREVENTED: Blocked re-addition of excluded card '{card_name}' (pattern: '{normalized_excludes[normalized_card]}')")
|
||
return
|
||
|
||
# In owned-only mode, block adding cards not in owned list (except the commander itself)
|
||
try:
|
||
if getattr(self, 'use_owned_only', False) and not is_commander:
|
||
owned = getattr(self, 'owned_card_names', set()) or set()
|
||
if owned and card_name.lower() not in {n.lower() for n in owned}:
|
||
# Silently skip non-owned additions
|
||
return
|
||
except Exception:
|
||
pass
|
||
|
||
# Enforce color identity / card-pool legality: if the card is not present in the
|
||
# current dataframes snapshot (which is filtered by color identity), skip it.
|
||
# Allow the commander to bypass this check.
|
||
try:
|
||
if not is_commander:
|
||
# Permit basic lands even if they aren't present in the current CSV pool.
|
||
# Some distributions may omit basics from the per-color card CSVs, but they are
|
||
# always legal within color identity. We therefore bypass pool filtering for
|
||
# basic/snow basic lands and Wastes.
|
||
try:
|
||
basic_names = bu.basic_land_names()
|
||
except Exception:
|
||
basic_names = set()
|
||
|
||
if str(card_name) not in basic_names:
|
||
# Use filtered pool (_combined_cards_df) instead of unfiltered (_full_cards_df)
|
||
# This ensures exclude filtering is respected during card addition
|
||
df_src = self._combined_cards_df if self._combined_cards_df is not None else self._full_cards_df
|
||
if df_src is not None and not df_src.empty and 'name' in df_src.columns:
|
||
if df_src[df_src['name'].astype(str).str.lower() == str(card_name).lower()].empty:
|
||
# Not in the legal pool (likely off-color or unavailable)
|
||
try:
|
||
self.output_func(f"Skipped illegal/off-pool card: {card_name}")
|
||
except Exception:
|
||
pass
|
||
return
|
||
except Exception:
|
||
# If any unexpected error occurs, fall through (do not block legitimate adds)
|
||
pass
|
||
if creature_types is None:
|
||
creature_types = []
|
||
if tags is None:
|
||
tags = []
|
||
# Compute mana value if missing from cost (simple heuristic: count symbols between braces)
|
||
if mana_value is None and mana_cost:
|
||
try:
|
||
if '{' in mana_cost and '}' in mana_cost:
|
||
# naive parse: digits add numeric value; individual colored symbols count as 1
|
||
symbols = re.findall(r'\{([^}]+)\}', mana_cost)
|
||
total = 0
|
||
for sym in symbols:
|
||
if sym.isdigit():
|
||
total += int(sym)
|
||
else:
|
||
total += 1
|
||
mana_value = total
|
||
except Exception:
|
||
mana_value = None
|
||
entry = self.card_library.get(card_name)
|
||
if entry:
|
||
# Enforce Commander singleton rules: only basic lands may have multiple copies
|
||
try:
|
||
from deck_builder import builder_constants as bc
|
||
from settings import MULTIPLE_COPY_CARDS
|
||
except Exception:
|
||
MULTIPLE_COPY_CARDS = [] # type: ignore
|
||
is_land = 'land' in str(card_type or entry.get('Card Type','')).lower()
|
||
is_basic = False
|
||
try:
|
||
basic_list = getattr(bc, 'BASIC_LANDS', [])
|
||
is_basic = any(card_name == bl or card_name.startswith(bl + ' ') for bl in basic_list)
|
||
except Exception:
|
||
pass
|
||
if is_land and not is_basic:
|
||
# Non-basic land: do not increment
|
||
return
|
||
if card_name in MULTIPLE_COPY_CARDS:
|
||
# Explicit multi-copy list still restricted to 1 in Commander context
|
||
return
|
||
# Basic lands (or other allowed future exceptions) increment
|
||
entry['Count'] += 1
|
||
# Optionally enrich metadata if provided
|
||
if role is not None:
|
||
entry['Role'] = role
|
||
if sub_role is not None:
|
||
entry['SubRole'] = sub_role
|
||
if added_by is not None:
|
||
entry['AddedBy'] = added_by
|
||
if trigger_tag is not None:
|
||
entry['TriggerTag'] = trigger_tag
|
||
if synergy is not None:
|
||
entry['Synergy'] = synergy
|
||
else:
|
||
# If no tags passed attempt enrichment from filtered pool first, then full snapshot
|
||
if not tags:
|
||
# Use filtered pool (_combined_cards_df) instead of unfiltered (_full_cards_df)
|
||
# This ensures exclude filtering is respected during card enrichment
|
||
df_src = self._combined_cards_df if self._combined_cards_df is not None else self._full_cards_df
|
||
try:
|
||
if df_src is not None and not df_src.empty and 'name' in df_src.columns:
|
||
row_match = df_src[df_src['name'] == card_name]
|
||
if not row_match.empty:
|
||
raw_tags = row_match.iloc[0].get('themeTags', [])
|
||
if isinstance(raw_tags, list):
|
||
tags = [str(t).strip() for t in raw_tags if str(t).strip()]
|
||
elif isinstance(raw_tags, str) and raw_tags.strip():
|
||
# tolerate comma separated
|
||
parts = [p.strip().strip("'\"") for p in raw_tags.split(',')]
|
||
tags = [p for p in parts if p]
|
||
except Exception:
|
||
pass
|
||
# Enrich missing type and mana_cost for accurate categorization
|
||
if (not card_type) or (not mana_cost):
|
||
try:
|
||
# Use filtered pool (_combined_cards_df) instead of unfiltered (_full_cards_df)
|
||
# This ensures exclude filtering is respected during card enrichment
|
||
df_src = self._combined_cards_df if self._combined_cards_df is not None else self._full_cards_df
|
||
if df_src is not None and not df_src.empty and 'name' in df_src.columns:
|
||
row_match2 = df_src[df_src['name'].astype(str).str.lower() == str(card_name).lower()]
|
||
if not row_match2.empty:
|
||
if not card_type:
|
||
card_type = str(row_match2.iloc[0].get('type', row_match2.iloc[0].get('type_line', '')) or '')
|
||
if not mana_cost:
|
||
mana_cost = str(row_match2.iloc[0].get('mana_cost', row_match2.iloc[0].get('manaCost', '')) or '')
|
||
except Exception:
|
||
pass
|
||
# Normalize & dedupe tags
|
||
norm_tags: list[str] = []
|
||
seen_tag = set()
|
||
for t in tags:
|
||
if not isinstance(t, str):
|
||
t = str(t)
|
||
tt = t.strip()
|
||
if not tt or tt.lower() == 'nan':
|
||
continue
|
||
if tt not in seen_tag:
|
||
norm_tags.append(tt)
|
||
seen_tag.add(tt)
|
||
tags = norm_tags
|
||
self.card_library[card_name] = {
|
||
'Card Name': card_name,
|
||
'Card Type': card_type,
|
||
'Mana Cost': mana_cost,
|
||
'Mana Value': mana_value,
|
||
'Creature Types': creature_types,
|
||
'Tags': tags,
|
||
'Commander': is_commander,
|
||
'Count': 1,
|
||
'Role': (role or ('commander' if is_commander else None)),
|
||
'SubRole': sub_role,
|
||
'AddedBy': added_by,
|
||
'TriggerTag': trigger_tag,
|
||
'Synergy': synergy,
|
||
}
|
||
# Update tag counts for new unique card
|
||
tag_set = set(tags)
|
||
self._card_name_tags_index[card_name] = tag_set
|
||
for tg in tag_set:
|
||
self.tag_counts[tg] = self.tag_counts.get(tg, 0) + 1
|
||
# Keep commander dict CMC up to date if adding commander
|
||
if is_commander and self.commander_dict:
|
||
if mana_value is not None:
|
||
self.commander_dict['CMC'] = mana_value
|
||
# Remove this card from combined pool if present
|
||
self._remove_from_pool(card_name)
|
||
# Invalidate color source cache if land added
|
||
try:
|
||
if 'land' in str(card_type).lower():
|
||
self._color_source_cache_dirty = True
|
||
else:
|
||
self._spell_pip_cache_dirty = True
|
||
except Exception:
|
||
pass
|
||
|
||
def _remove_from_pool(self, card_name: str):
|
||
if self._combined_cards_df is None:
|
||
return
|
||
df = self._combined_cards_df
|
||
if 'name' in df.columns:
|
||
self._combined_cards_df = df[df['name'] != card_name]
|
||
elif 'Card Name' in df.columns:
|
||
self._combined_cards_df = df[df['Card Name'] != card_name]
|
||
|
||
# (Power bracket summary/printing now provided by mixin; _format_limits retained locally for reuse)
|
||
|
||
@staticmethod
|
||
def _format_limits(limits: Dict[str, Optional[int]]) -> str:
|
||
labels = {
|
||
"game_changers": "Game Changers",
|
||
"mass_land_denial": "Mass Land Denial",
|
||
"extra_turns": "Extra Turn Cards",
|
||
"tutors_nonland": "Nonland Tutors",
|
||
"two_card_combos": "Two-Card Combos"
|
||
}
|
||
lines = []
|
||
for key, label in labels.items():
|
||
val = limits.get(key, None)
|
||
if val is None:
|
||
lines.append(f" {label}: Unlimited")
|
||
else:
|
||
lines.append(f" {label}: {val}")
|
||
return "\n".join(lines)
|
||
|
||
def run_deck_build_step1(self):
|
||
self.select_power_bracket()
|
||
|
||
# ---------------------------
|
||
# Reporting Helper
|
||
# ---------------------------
|
||
def print_commander_dict_table(self):
|
||
if self.commander_row is None:
|
||
self.output_func("No commander selected.")
|
||
return
|
||
block = self._format_commander_pretty(self.commander_row)
|
||
self.output_func("\n" + block)
|
||
# New: show which CSV files (stems) were loaded for this color identity
|
||
if self.files_to_load:
|
||
file_list = ", ".join(f"{stem}_cards.csv" for stem in self.files_to_load)
|
||
self.output_func(f"Card Pool Files: {file_list}")
|
||
# Owned-only status
|
||
if getattr(self, 'use_owned_only', False):
|
||
try:
|
||
self.output_func(f"Owned-only mode: {len(self.owned_card_names)} cards from {len(self.owned_files_selected)} file(s)")
|
||
except Exception:
|
||
self.output_func("Owned-only mode: enabled")
|
||
if self.selected_tags:
|
||
self.output_func("Chosen Tags:")
|
||
if self.primary_tag:
|
||
self.output_func(f" Primary : {self.primary_tag}")
|
||
if self.secondary_tag:
|
||
self.output_func(f" Secondary: {self.secondary_tag}")
|
||
if self.tertiary_tag:
|
||
self.output_func(f" Tertiary : {self.tertiary_tag}")
|
||
self.output_func("")
|
||
if self.bracket_definition:
|
||
self.output_func(f"Power Bracket: {self.bracket_level} - {self.bracket_name}")
|
||
self.output_func(self._format_limits(self.bracket_limits))
|
||
self.output_func("")
|
||
|
||
# ---------------------------
|
||
# Orchestration
|
||
# ---------------------------
|
||
def run_initial_setup(self):
|
||
self.choose_commander()
|
||
self.select_commander_tags()
|
||
# Ask if user wants to limit pool to owned cards and gather selection
|
||
try:
|
||
self._prompt_use_owned_cards()
|
||
except Exception as e:
|
||
self.output_func(f"Owned-cards prompt failed (continuing without): {e}")
|
||
# New: color identity & card pool loading
|
||
try:
|
||
self.determine_color_identity()
|
||
self.setup_dataframes()
|
||
except Exception as e:
|
||
self.output_func(f"Failed to load color-identity card pool: {e}")
|
||
self.print_commander_dict_table()
|
||
|
||
def run_full_initial_with_bracket(self):
|
||
self.run_initial_setup()
|
||
self.run_deck_build_step1()
|
||
# (Further steps can be chained here)
|
||
self.print_commander_dict_table()
|
||
|
||
# ===========================
|
||
# Deck Building Step 2: Ideal Composition Counts
|
||
# ===========================
|
||
ideal_counts: Dict[str, int] = field(default_factory=dict)
|
||
|
||
def run_deck_build_step2(self) -> Dict[str, int]:
|
||
"""Determine ideal counts for general card categories (bracket‑agnostic baseline).
|
||
|
||
Prompts the user (Enter to keep default). Stores results in ideal_counts and returns it.
|
||
Categories:
|
||
ramp, lands, basic_lands, creatures, removal, wipes, card_advantage, protection
|
||
"""
|
||
# Initialize defaults from constants if not already present
|
||
defaults = {
|
||
'ramp': bc.DEFAULT_RAMP_COUNT,
|
||
'lands': bc.DEFAULT_LAND_COUNT,
|
||
'basic_lands': bc.DEFAULT_BASIC_LAND_COUNT,
|
||
'fetch_lands': getattr(bc, 'FETCH_LAND_DEFAULT_COUNT', 3),
|
||
'creatures': bc.DEFAULT_CREATURE_COUNT,
|
||
'removal': bc.DEFAULT_REMOVAL_COUNT,
|
||
'wipes': bc.DEFAULT_WIPES_COUNT,
|
||
'card_advantage': bc.DEFAULT_CARD_ADVANTAGE_COUNT,
|
||
'protection': bc.DEFAULT_PROTECTION_COUNT,
|
||
}
|
||
|
||
# Seed existing values if already set (allow re-run keeping previous choices)
|
||
for k, v in defaults.items():
|
||
if k not in self.ideal_counts:
|
||
self.ideal_counts[k] = v
|
||
|
||
self.output_func("\nSet Ideal Deck Composition Counts (press Enter to accept default/current):")
|
||
for key, prompt in bc.DECK_COMPOSITION_PROMPTS.items():
|
||
if key not in defaults: # skip price prompts & others for this step
|
||
continue
|
||
current_default = self.ideal_counts[key]
|
||
value = self._prompt_int_with_default(f"{prompt} ", current_default, minimum=0, maximum=200)
|
||
self.ideal_counts[key] = value
|
||
|
||
# Basic validation adjustments
|
||
# Ensure basic_lands <= lands
|
||
if self.ideal_counts['basic_lands'] > self.ideal_counts['lands']:
|
||
self.output_func("Adjusting basic lands to not exceed total lands.")
|
||
self.ideal_counts['basic_lands'] = self.ideal_counts['lands']
|
||
|
||
self._print_ideal_counts_summary()
|
||
return self.ideal_counts
|
||
|
||
# Helper to prompt integer values with default
|
||
def _prompt_int_with_default(self, prompt: str, default: int, minimum: int = 0, maximum: int = 999) -> int:
|
||
while True:
|
||
raw = self.input_func(f"{prompt}[{default}] ").strip()
|
||
if raw == "":
|
||
return default
|
||
if raw.isdigit():
|
||
val = int(raw)
|
||
if minimum <= val <= maximum:
|
||
return val
|
||
self.output_func(f"Enter a number between {minimum} and {maximum}, or press Enter for {default}.")
|
||
|
||
def _print_ideal_counts_summary(self):
|
||
self.output_func("\nIdeal Composition Targets:")
|
||
order = [
|
||
('ramp', 'Ramp Pieces'),
|
||
('lands', 'Total Lands'),
|
||
('basic_lands', 'Minimum Basic Lands'),
|
||
('fetch_lands', 'Fetch Lands'),
|
||
('creatures', 'Creatures'),
|
||
('removal', 'Spot Removal'),
|
||
('wipes', 'Board Wipes'),
|
||
('card_advantage', 'Card Advantage'),
|
||
('protection', 'Protection')
|
||
]
|
||
width = max(len(label) for _, label in order)
|
||
for key, label in order:
|
||
if key in self.ideal_counts:
|
||
self.output_func(f" {label.ljust(width)} : {self.ideal_counts[key]}")
|
||
|
||
# Public wrapper for external callers / tests
|
||
def print_ideal_counts(self):
|
||
if not self.ideal_counts:
|
||
self.output_func("Ideal counts not set. Run run_deck_build_step2() first.")
|
||
return
|
||
# Reuse formatting but with a simpler heading per user request
|
||
self.output_func("\nIdeal Counts:")
|
||
order = [
|
||
('ramp', 'Ramp'),
|
||
('lands', 'Total Lands'),
|
||
('basic_lands', 'Basic Lands (Min)'),
|
||
('fetch_lands', 'Fetch Lands'),
|
||
('creatures', 'Creatures'),
|
||
('removal', 'Spot Removal'),
|
||
('wipes', 'Board Wipes'),
|
||
('card_advantage', 'Card Advantage'),
|
||
('protection', 'Protection')
|
||
]
|
||
width = max(len(label) for _, label in order)
|
||
for key, label in order:
|
||
if key in self.ideal_counts:
|
||
self.output_func(f" {label.ljust(width)} : {self.ideal_counts[key]}")
|
||
|
||
# (Basic land logic moved to LandBasicsMixin in phases/phase2_lands_basics.py)
|
||
|
||
# ---------------------------
|
||
# Land Building Step 2: Staple Nonbasic Lands (NO Kindred yet)
|
||
# ---------------------------
|
||
def _current_land_count(self) -> int:
|
||
"""Return total number of land cards currently in the library (counts duplicates)."""
|
||
total = 0
|
||
for name, entry in self.card_library.items():
|
||
# If we recorded type when adding basics or staples, use that
|
||
ctype = entry.get('Card Type', '')
|
||
if ctype and 'land' in ctype.lower():
|
||
total += entry.get('Count', 1)
|
||
continue
|
||
# Else attempt enrichment from combined pool
|
||
if self._combined_cards_df is not None and 'name' in self._combined_cards_df.columns:
|
||
row = self._combined_cards_df[self._combined_cards_df['name'] == name]
|
||
if not row.empty:
|
||
type_field = str(row.iloc[0].get('type', '')).lower()
|
||
if 'land' in type_field:
|
||
total += entry.get('Count', 1)
|
||
return total
|
||
|
||
# (Staple land logic moved to LandStaplesMixin in phases/phase2_lands_staples.py)
|
||
|
||
# ---------------------------
|
||
# Land Building Step 3: Kindred / Creature-Type Focused Lands
|
||
# ---------------------------
|
||
# (Kindred land logic moved to LandKindredMixin in phases/phase2_lands_kindred.py)
|
||
|
||
# (Fetch land logic moved to LandFetchMixin in phases/phase2_lands_fetch.py)
|
||
|
||
# ---------------------------
|
||
# Internal Helper: Basic Land Floor
|
||
# ---------------------------
|
||
def _basic_floor(self, min_basic_cfg: int) -> int:
|
||
"""Return the minimum number of basics we will not trim below.
|
||
|
||
Currently defined as ceil(bc.BASIC_FLOOR_FACTOR * configured_basic_count). Centralizing here so
|
||
future tuning (e.g., dynamic by color count, bracket, or pip distribution) only
|
||
needs a single change. min_basic_cfg already accounts for ideal_counts override.
|
||
"""
|
||
try:
|
||
return max(0, int(math.ceil(bc.BASIC_FLOOR_FACTOR * float(min_basic_cfg))))
|
||
except Exception:
|
||
return max(0, min_basic_cfg)
|
||
|
||
# ---------------------------
|
||
# Land Building Step 5: Dual Lands (Two-Color Typed Lands)
|
||
# ---------------------------
|
||
# (Dual land logic moved to LandDualsMixin in phases/phase2_lands_duals.py)
|
||
|
||
# ---------------------------
|
||
# Land Building Step 6: Triple (Tri-Color) Typed Lands
|
||
# ---------------------------
|
||
def add_triple_lands(self, requested_count: Optional[int] = None):
|
||
"""Add three-color typed lands (e.g., Triomes) respecting land target and basic floor.
|
||
|
||
Logic parallels add_dual_lands but restricted to lands whose type line contains exactly
|
||
three distinct basic land types that are all within the deck's color identity.
|
||
Selection aims for 1-2 (default) with weighted random ordering among viable tri-color combos
|
||
to avoid always choosing the same land when multiple exist.
|
||
"""
|
||
if not self.files_to_load:
|
||
try:
|
||
self.determine_color_identity()
|
||
self.setup_dataframes()
|
||
except Exception as e:
|
||
self.output_func(f"Cannot add triple lands until color identity resolved: {e}")
|
||
return
|
||
colors = [c for c in self.color_identity if c in ['W','U','B','R','G']]
|
||
if len(colors) < 3:
|
||
self.output_func("Triple Lands: Fewer than three colors; skipping step 6.")
|
||
return
|
||
|
||
land_target = getattr(self, 'ideal_counts', {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) if getattr(self, 'ideal_counts', None) else getattr(bc, 'DEFAULT_LAND_COUNT', 35)
|
||
|
||
df = self._combined_cards_df
|
||
pool: list[str] = []
|
||
type_map: dict[str,str] = {}
|
||
tri_buckets: dict[frozenset[str], list[str]] = {}
|
||
if df is not None and not df.empty and {'name','type'}.issubset(df.columns):
|
||
try:
|
||
for _, row in df.iterrows():
|
||
try:
|
||
name = str(row.get('name',''))
|
||
if not name or name in self.card_library:
|
||
continue
|
||
tline = str(row.get('type','')).lower()
|
||
if 'land' not in tline:
|
||
continue
|
||
basics_found = [b for b in ['plains','island','swamp','mountain','forest'] if b in tline]
|
||
uniq_basics = []
|
||
for b in basics_found:
|
||
if b not in uniq_basics:
|
||
uniq_basics.append(b)
|
||
if len(uniq_basics) != 3:
|
||
continue
|
||
mapped = set()
|
||
for b in uniq_basics:
|
||
if b == 'plains':
|
||
mapped.add('W')
|
||
elif b == 'island':
|
||
mapped.add('U')
|
||
elif b == 'swamp':
|
||
mapped.add('B')
|
||
elif b == 'mountain':
|
||
mapped.add('R')
|
||
elif b == 'forest':
|
||
mapped.add('G')
|
||
if len(mapped) != 3:
|
||
continue
|
||
if not mapped.issubset(set(colors)):
|
||
continue
|
||
pool.append(name)
|
||
type_map[name] = tline
|
||
key = frozenset(mapped)
|
||
tri_buckets.setdefault(key, []).append(name)
|
||
except Exception:
|
||
continue
|
||
except Exception:
|
||
pass
|
||
pool = list(dict.fromkeys(pool))
|
||
if not pool:
|
||
self.output_func("Triple Lands: No candidate triple typed lands found.")
|
||
return
|
||
|
||
# Rank tri lands: those that can enter untapped / have cycling / fetchable (heuristic), else default
|
||
def rank(name: str) -> int:
|
||
lname = name.lower()
|
||
tline = type_map.get(name,'')
|
||
score = 0
|
||
# Triomes & similar premium typed tri-lands
|
||
if 'forest' in tline and 'plains' in tline and 'island' in tline:
|
||
score += 1 # minor bump per type already inherent; focus on special abilities
|
||
if 'cycling' in tline:
|
||
score += 3
|
||
if 'enters the battlefield tapped' not in tline:
|
||
score += 5
|
||
if 'trium' in lname or 'triome' in lname or 'panorama' in lname:
|
||
score += 4
|
||
if 'domain' in tline:
|
||
score += 1
|
||
return score
|
||
for key, names in tri_buckets.items():
|
||
names.sort(key=lambda n: rank(n), reverse=True)
|
||
if len(names) > 1:
|
||
rng_obj = getattr(self, 'rng', None)
|
||
try:
|
||
weighted = [(n, max(1, rank(n))+1) for n in names]
|
||
shuffled: list[str] = []
|
||
while weighted:
|
||
total = sum(w for _, w in weighted)
|
||
r = (rng_obj.random() if rng_obj else self._get_rng().random()) * total
|
||
acc = 0.0
|
||
for idx, (n, w) in enumerate(weighted):
|
||
acc += w
|
||
if r <= acc:
|
||
shuffled.append(n)
|
||
del weighted[idx]
|
||
break
|
||
tri_buckets[key] = shuffled
|
||
except Exception:
|
||
tri_buckets[key] = names
|
||
else:
|
||
tri_buckets[key] = names
|
||
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
|
||
if hasattr(self, 'ideal_counts') and self.ideal_counts:
|
||
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
|
||
basic_floor = self._basic_floor(min_basic_cfg)
|
||
|
||
default_triple_target = getattr(bc, 'TRIPLE_LAND_DEFAULT_COUNT', 2)
|
||
remaining_capacity = max(0, land_target - self._current_land_count())
|
||
effective_default = min(default_triple_target, remaining_capacity if remaining_capacity>0 else len(pool), len(pool))
|
||
desired = effective_default if requested_count is None else max(0, int(requested_count))
|
||
if desired == 0:
|
||
self.output_func("Triple Lands: Desired count 0; skipping.")
|
||
return
|
||
if remaining_capacity == 0 and desired > 0:
|
||
slots_needed = desired
|
||
freed = 0
|
||
while freed < slots_needed and self._count_basic_lands() > basic_floor:
|
||
target_basic = self._choose_basic_to_trim()
|
||
if not target_basic or not self._decrement_card(target_basic):
|
||
break
|
||
freed += 1
|
||
if freed == 0:
|
||
desired = 0
|
||
remaining_capacity = max(0, land_target - self._current_land_count())
|
||
desired = min(desired, remaining_capacity, len(pool))
|
||
if desired <= 0:
|
||
self.output_func("Triple Lands: No capacity after trimming; skipping.")
|
||
return
|
||
|
||
chosen: list[str] = []
|
||
bucket_keys = list(tri_buckets.keys())
|
||
rng = getattr(self, 'rng', None)
|
||
try:
|
||
if rng:
|
||
rng.shuffle(bucket_keys) # type: ignore
|
||
else:
|
||
random.shuffle(bucket_keys)
|
||
except Exception:
|
||
pass
|
||
indices = {k:0 for k in bucket_keys}
|
||
while len(chosen) < desired and bucket_keys:
|
||
progressed = False
|
||
for k in list(bucket_keys):
|
||
idx = indices[k]
|
||
names = tri_buckets.get(k, [])
|
||
if idx >= len(names):
|
||
continue
|
||
name = names[idx]
|
||
indices[k] += 1
|
||
if name in chosen:
|
||
continue
|
||
chosen.append(name)
|
||
progressed = True
|
||
if len(chosen) >= desired:
|
||
break
|
||
if not progressed:
|
||
break
|
||
|
||
added: list[str] = []
|
||
for name in chosen:
|
||
if self._current_land_count() >= land_target:
|
||
break
|
||
self.add_card(name, card_type='Land')
|
||
added.append(name)
|
||
|
||
self.output_func("\nTriple Lands Added (Step 6):")
|
||
if not added:
|
||
self.output_func(" (None added)")
|
||
else:
|
||
width = max(len(n) for n in added)
|
||
for n in added:
|
||
self.output_func(f" {n.ljust(width)} : 1")
|
||
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}")
|
||
|
||
def run_land_step6(self, requested_count: Optional[int] = None):
|
||
self.add_triple_lands(requested_count=requested_count)
|
||
self._enforce_land_cap(step_label="Triples (Step 6)")
|
||
|
||
# ---------------------------
|
||
# Land Building Step 7: Misc / Utility Lands
|
||
# ---------------------------
|
||
# (Misc utility land logic moved to LandMiscUtilityMixin in phases/phase2_lands_misc.py)
|
||
# (Tapped land optimization moved to LandOptimizationMixin in phases/phase2_lands_optimize.py)
|
||
|
||
# ---------------------------
|
||
# Tag-driven utility suggestions
|
||
# ---------------------------
|
||
def _build_tag_driven_land_suggestions(self):
|
||
|
||
# Delegate construction of suggestion dicts to utility module.
|
||
suggestions = bu.build_tag_driven_suggestions(self)
|
||
if suggestions:
|
||
self.suggested_lands_queue.extend(suggestions)
|
||
|
||
def _apply_land_suggestions_if_room(self):
|
||
if not self.suggested_lands_queue:
|
||
return
|
||
land_target = getattr(self, 'ideal_counts', {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) if getattr(self, 'ideal_counts', None) else getattr(bc, 'DEFAULT_LAND_COUNT', 35)
|
||
applied: list[dict] = []
|
||
remaining: list[dict] = []
|
||
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
|
||
if hasattr(self, 'ideal_counts') and self.ideal_counts:
|
||
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
|
||
basic_floor = self._basic_floor(min_basic_cfg)
|
||
for sug in self.suggested_lands_queue:
|
||
name = sug['name']
|
||
if name in self.card_library:
|
||
continue
|
||
if not sug['condition'](self):
|
||
remaining.append(sug)
|
||
continue
|
||
if self._current_land_count() >= land_target:
|
||
if sug.get('defer_if_full'):
|
||
if self._count_basic_lands() > basic_floor:
|
||
target_basic = self._choose_basic_to_trim()
|
||
if not target_basic or not self._decrement_card(target_basic):
|
||
remaining.append(sug)
|
||
continue
|
||
else:
|
||
remaining.append(sug)
|
||
continue
|
||
self.add_card(name, card_type='Land')
|
||
if sug.get('flex') and name in self.card_library:
|
||
self.card_library[name]['Role'] = 'flex'
|
||
applied.append(sug)
|
||
self.suggested_lands_queue = remaining
|
||
if applied:
|
||
self.output_func("\nTag-Driven Utility Lands Added:")
|
||
width = max(len(s['name']) for s in applied)
|
||
for s in applied:
|
||
role = ' (flex)' if s.get('flex') else ''
|
||
self.output_func(f" {s['name'].ljust(width)} : 1 {s['reason']}{role}")
|
||
|
||
# ---------------------------
|
||
# (Color balance helpers & post-spell adjustment moved to ColorBalanceMixin)
|
||
|
||
# ---------------------------
|
||
# Land Cap Enforcement (applies after every non-basic step)
|
||
# ---------------------------
|
||
def _basic_land_names(self) -> set:
|
||
"""Return set of all basic (and snow basic) land names plus Wastes."""
|
||
|
||
return bu.basic_land_names()
|
||
|
||
def _count_basic_lands(self) -> int:
|
||
"""Count total copies of basic lands currently in the library."""
|
||
|
||
return bu.count_basic_lands(self.card_library)
|
||
|
||
def _choose_basic_to_trim(self) -> Optional[str]:
|
||
"""Return a basic land name to trim (highest count) or None."""
|
||
|
||
return bu.choose_basic_to_trim(self.card_library)
|
||
|
||
def _decrement_card(self, name: str) -> bool:
|
||
entry = self.card_library.get(name)
|
||
if not entry:
|
||
return False
|
||
cnt = entry.get('Count', 1)
|
||
was_land = 'land' in str(entry.get('Card Type','')).lower()
|
||
was_non_land = not was_land
|
||
if cnt <= 1:
|
||
# remove entire entry
|
||
try:
|
||
del self.card_library[name]
|
||
except Exception:
|
||
return False
|
||
else:
|
||
entry['Count'] = cnt - 1
|
||
if was_land:
|
||
self._color_source_cache_dirty = True
|
||
if was_non_land:
|
||
self._spell_pip_cache_dirty = True
|
||
return True
|
||
|
||
def _enforce_land_cap(self, step_label: str = ""):
|
||
"""Delegate land cap enforcement to utility helper."""
|
||
|
||
bu.enforce_land_cap(self, step_label)
|
||
|
||
# ===========================
|
||
# Non-Land Addition: Creatures (moved to CreatureAdditionMixin)
|
||
# ===========================
|
||
# Implementation now in phases/phase3_creatures.py (CreatureAdditionMixin)
|
||
|
||
# Non-Creature Additions (moved to SpellAdditionMixin)
|
||
# Implementations now located in phases/phase4_spells.py (SpellAdditionMixin)
|
||
|
||
# (Type summary now provided by ReportingMixin)
|
||
|
||
# ---------------------------
|
||
# Card Library Reporting
|
||
# ---------------------------
|
||
# (CSV export now provided by ReportingMixin)
|
||
|
||
# (Card library printing & tag summary now provided by ReportingMixin)
|
||
|
||
# Internal helper for wrapping cell contents to keep table readable
|
||
# (_wrap_cell helper moved to ReportingMixin)
|
||
|
||
# Convenience to run Step 1 & 2 sequentially (future orchestrator)
|
||
def run_deck_build_steps_1_2(self):
|
||
self.run_deck_build_step1()
|
||
self.run_deck_build_step2()
|