With assistance from Github CoPilot, massively overhauled the builder functionality, splitting it into smaller modules to provide a better step-by-step focus and drastically reduce the overall size of the core builder module

This commit is contained in:
mwisnowski 2025-08-20 10:46:23 -07:00
parent ff1912f979
commit 760c36d75d
17 changed files with 3044 additions and 2602 deletions

File diff suppressed because it is too large Load diff

View file

@ -364,7 +364,7 @@ LAND_REMOVAL_MAX_ATTEMPTS: Final[int] = 3
PROTECTED_LANDS: Final[List[str]] = BASIC_LANDS + [land['name'] for land in KINDRED_STAPLE_LANDS]
# Other defaults
DEFAULT_CREATURE_COUNT: Final[int] = 25 # Default number of creatures
DEFAULT_CREATURE_COUNT: Final[int] = 35 # Default number of creatures
DEFAULT_REMOVAL_COUNT: Final[int] = 10 # Default number of spot removal spells
DEFAULT_WIPES_COUNT: Final[int] = 2 # Default number of board wipes

View file

@ -0,0 +1,149 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional
"""Phase 0: Core primitives & shared definitions extracted from monolithic builder.
This phase lifts out lightweight, lowrisk items that have no external
side effects and are broadly reused across the deck building pipeline:
Contents:
* Fuzzy matching backend selection & helpers (_full_ratio, _top_matches)
* Basic scoring thresholds (EXACT_NAME_THRESHOLD, FIRST_WORD_THRESHOLD, MAX_PRESENTED_CHOICES)
* BracketDefinition dataclass and BRACKET_DEFINITIONS list (power bracket taxonomy)
The original imports and symbol names are preserved so existing code in
builder.py can import:
from .phases.phase0_core import (
_full_ratio, _top_matches,
EXACT_NAME_THRESHOLD, FIRST_WORD_THRESHOLD, MAX_PRESENTED_CHOICES,
BracketDefinition, BRACKET_DEFINITIONS
)
No behavior change intended.
"""
# Attempt to use a fast fuzzy library; fall back gracefully
try:
from rapidfuzz import process as rf_process, fuzz as rf_fuzz # type: ignore
_FUZZ_BACKEND = "rapidfuzz"
except ImportError: # pragma: no cover - environment dependent
try:
from fuzzywuzzy import process as fw_process, fuzz as fw_fuzz # type: ignore
_FUZZ_BACKEND = "fuzzywuzzy"
except ImportError: # pragma: no cover
_FUZZ_BACKEND = "difflib"
if _FUZZ_BACKEND == "rapidfuzz":
def _full_ratio(a: str, b: str) -> float:
return rf_fuzz.ratio(a, b)
def _top_matches(query: str, choices: List[str], limit: int):
return [(name, int(score)) for name, score, _ in rf_process.extract(query, choices, limit=limit)]
elif _FUZZ_BACKEND == "fuzzywuzzy":
def _full_ratio(a: str, b: str) -> float:
return fw_fuzz.ratio(a, b)
def _top_matches(query: str, choices: List[str], limit: int):
return fw_process.extract(query, choices, limit=limit)
else:
from difflib import SequenceMatcher, get_close_matches
def _full_ratio(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio() * 100
def _top_matches(query: str, choices: List[str], limit: int):
close = get_close_matches(query, choices, n=limit, cutoff=0.0)
scored = [(c, int(_full_ratio(query, c))) for c in close]
if len(scored) < limit:
remaining = [c for c in choices if c not in close]
extra = sorted(
((c, int(_full_ratio(query, c))) for c in remaining),
key=lambda x: x[1],
reverse=True
)[: limit - len(scored)]
scored.extend(extra)
return scored
EXACT_NAME_THRESHOLD = 80
FIRST_WORD_THRESHOLD = 75
MAX_PRESENTED_CHOICES = 5
@dataclass(frozen=True)
class BracketDefinition:
level: int
name: str
short_desc: str
long_desc: str
limits: Dict[str, Optional[int]] # None = unlimited
BRACKET_DEFINITIONS: List[BracketDefinition] = [
BracketDefinition(
1,
"Exhibition",
"Ultra-casual / novelty; long games; focus on fun.",
("Throw down with your ultracasual deck. Winning isn't primary—show off something unusual. "
"Games go long and end slowly."),
{
"game_changers": 0,
"mass_land_denial": 0,
"extra_turns": 0,
"tutors_nonland": 3,
"two_card_combos": 0
}
),
BracketDefinition(
2,
"Core",
"Precon baseline; splashy turns; 9+ turn games.",
("Average modern precon: tuned engines & splashy turns, some pet/theme cards, usually longer games."),
{
"game_changers": 0,
"mass_land_denial": 0,
"extra_turns": 3,
"tutors_nonland": 3,
"two_card_combos": 0
}
),
BracketDefinition(
3,
"Upgraded",
"Refined beyond precon; faster; selective power.",
("Carefully selected cards; may include up to three Game Changers. Avoids cheap fast infinite twocard combos."),
{
"game_changers": 3,
"mass_land_denial": 0,
"extra_turns": 3,
"tutors_nonland": None,
"two_card_combos": 0
}
),
BracketDefinition(
4,
"Optimized",
"High power, explosive, not meta-focused.",
("Strong, explosive builds; any number of powerful effects, tutors, combos, and denial."),
{
"game_changers": None,
"mass_land_denial": None,
"extra_turns": None,
"tutors_nonland": None,
"two_card_combos": None
}
),
BracketDefinition(
5,
"cEDH",
"Competitive, meta-driven mindset.",
("Metagame/tournament mindset; precision choices; winning prioritized over expression."),
{
"game_changers": None,
"mass_land_denial": None,
"extra_turns": None,
"tutors_nonland": None,
"two_card_combos": None
}
),
]
__all__ = [
'_full_ratio', '_top_matches',
'EXACT_NAME_THRESHOLD', 'FIRST_WORD_THRESHOLD', 'MAX_PRESENTED_CHOICES',
'BracketDefinition', 'BRACKET_DEFINITIONS'
]

View file

@ -0,0 +1,200 @@
from __future__ import annotations
from typing import List, Optional
import pandas as pd
from .phase0_core import BracketDefinition, BRACKET_DEFINITIONS # noqa: F401
"""Phase 1: Commander & Tag Selection logic.
Extracted from builder.py to reduce monolith size. All public method names and
signatures preserved; DeckBuilder will delegate to these mixin-style functions.
Provided functions expect `self` to be a DeckBuilder instance exposing:
- input_func, output_func
- load_commander_data, _auto_accept, _gather_candidates, _format_commander_pretty,
_initialize_commander_dict
- commander_name, commander_row, commander_tags, commander_dict
- selected_tags, primary_tag, secondary_tag, tertiary_tag
- bracket_definition related attributes for printing summary
No sideeffect changes.
"""
# (Imports moved to top for lint compliance)
class CommanderSelectionMixin:
# ---------------------------
# Commander Selection
# ---------------------------
def choose_commander(self) -> str: # type: ignore[override]
df = self.load_commander_data()
names = df["name"].tolist()
while True:
query = self.input_func("Enter commander name: ").strip()
if not query:
self.output_func("No input provided. Try again.")
continue
direct_hits = [n for n in names if self._auto_accept(query, n)]
if len(direct_hits) == 1:
candidate = direct_hits[0]
self.output_func(f"(Auto match candidate) {candidate}")
if self._present_commander_and_confirm(df, candidate):
self.output_func(f"Confirmed: {candidate}")
return candidate
else:
self.output_func("Not confirmed. Starting over.\n")
continue
candidates = self._gather_candidates(query, names)
if not candidates:
self.output_func("No close matches found. Try again.")
continue
self.output_func("\nTop matches:")
for idx, (n, score) in enumerate(candidates, start=1):
self.output_func(f" {idx}. {n} (score {score})")
self.output_func("Enter number to inspect, 'r' to retry, or type a new name:")
choice = self.input_func("Selection: ").strip()
if choice.lower() == 'r':
continue
if choice.isdigit():
i = int(choice)
if 1 <= i <= len(candidates):
nm = candidates[i - 1][0]
if self._present_commander_and_confirm(df, nm):
self.output_func(f"Confirmed: {nm}")
return nm
else:
self.output_func("Not confirmed. Search again.\n")
continue
else:
self.output_func("Invalid index.")
continue
query = choice # treat as new query
def _present_commander_and_confirm(self, df: pd.DataFrame, name: str) -> bool: # type: ignore[override]
row = df[df["name"] == name].iloc[0]
pretty = self._format_commander_pretty(row)
self.output_func("\n" + pretty)
while True:
resp = self.input_func("Is this the commander you want? (y/n): ").strip().lower()
if resp in ("y", "yes"):
self._apply_commander_selection(row)
return True
if resp in ("n", "no"):
return False
self.output_func("Please enter y or n.")
def _apply_commander_selection(self, row: pd.Series): # type: ignore[override]
self.commander_name = row["name"]
self.commander_row = row
self.commander_tags = list(row.get("themeTags", []) or [])
self._initialize_commander_dict(row)
# ---------------------------
# Tag Prioritization
# ---------------------------
def select_commander_tags(self) -> List[str]: # type: ignore[override]
if not self.commander_name:
self.output_func("No commander chosen yet. Selecting commander first...")
self.choose_commander()
tags = list(dict.fromkeys(self.commander_tags))
if not tags:
self.output_func("Commander has no theme tags available.")
self.selected_tags = []
self.primary_tag = self.secondary_tag = self.tertiary_tag = None
self._update_commander_dict_with_selected_tags()
return self.selected_tags
self.output_func("\nAvailable Theme Tags:")
for i, t in enumerate(tags, 1):
self.output_func(f" {i}. {t}")
self.selected_tags = []
self.primary_tag = self._prompt_tag_choice(tags, "Select PRIMARY tag (required):", allow_stop=False)
self.selected_tags.append(self.primary_tag)
remaining = [t for t in tags if t not in self.selected_tags]
if remaining:
self.secondary_tag = self._prompt_tag_choice(remaining, "Select SECONDARY tag (or 0 to stop here):", allow_stop=True)
if self.secondary_tag:
self.selected_tags.append(self.secondary_tag)
remaining = [t for t in remaining if t != self.secondary_tag]
if remaining and self.secondary_tag:
self.tertiary_tag = self._prompt_tag_choice(remaining, "Select TERTIARY tag (or 0 to stop here):", allow_stop=True)
if self.tertiary_tag:
self.selected_tags.append(self.tertiary_tag)
self.output_func("\nChosen Tags (in priority order):")
if not self.selected_tags:
self.output_func(" (None)")
else:
for idx, tag in enumerate(self.selected_tags, 1):
label = ["Primary", "Secondary", "Tertiary"][idx - 1] if idx <= 3 else f"Tag {idx}"
self.output_func(f" {idx}. {tag} ({label})")
self._update_commander_dict_with_selected_tags()
return self.selected_tags
def _prompt_tag_choice(self, available: List[str], prompt_text: str, allow_stop: bool) -> Optional[str]: # type: ignore[override]
while True:
self.output_func("\nCurrent options:")
for i, t in enumerate(available, 1):
self.output_func(f" {i}. {t}")
if allow_stop:
self.output_func(" 0. Stop (no further tags)")
raw = self.input_func(f"{prompt_text} ").strip()
if allow_stop and raw == "0":
return None
if raw.isdigit():
idx = int(raw)
if 1 <= idx <= len(available):
return available[idx - 1]
matches = [t for t in available if t.lower() == raw.lower()]
if matches:
return matches[0]
self.output_func("Invalid selection. Try again.")
def _update_commander_dict_with_selected_tags(self): # type: ignore[override]
if not self.commander_dict and self.commander_row is not None:
self._initialize_commander_dict(self.commander_row)
if not self.commander_dict:
return
self.commander_dict["Primary Tag"] = self.primary_tag
self.commander_dict["Secondary Tag"] = self.secondary_tag
self.commander_dict["Tertiary Tag"] = self.tertiary_tag
self.commander_dict["Selected Tags"] = self.selected_tags.copy()
# ---------------------------
# Power Bracket Selection
# ---------------------------
def select_power_bracket(self) -> BracketDefinition: # type: ignore[override]
if self.bracket_definition:
return self.bracket_definition
self.output_func("\nChoose Deck Power Bracket:")
for bd in BRACKET_DEFINITIONS:
self.output_func(f" {bd.level}. {bd.name} - {bd.short_desc}")
while True:
raw = self.input_func("Enter bracket number (1-5) or 'info' for details: ").strip().lower()
if raw == "info":
self._print_bracket_details()
continue
if raw.isdigit():
num = int(raw)
match = next((bd for bd in BRACKET_DEFINITIONS if bd.level == num), None)
if match:
self.bracket_definition = match
self.bracket_level = match.level
self.bracket_name = match.name
self.bracket_limits = match.limits.copy()
self.output_func(f"\nSelected Bracket {match.level}: {match.name}")
self._print_selected_bracket_summary()
return match
self.output_func("Invalid input. Type 1-5 or 'info'.")
def _print_bracket_details(self): # type: ignore[override]
self.output_func("\nBracket Details:")
for bd in BRACKET_DEFINITIONS:
self.output_func(f"\n[{bd.level}] {bd.name}")
self.output_func(bd.long_desc)
self.output_func(self._format_limits(bd.limits))
def _print_selected_bracket_summary(self): # type: ignore[override]
self.output_func("\nBracket Constraints:")
if self.bracket_limits:
self.output_func(self._format_limits(self.bracket_limits))
__all__ = [
'CommanderSelectionMixin'
]

View file

@ -0,0 +1,115 @@
from __future__ import annotations
from typing import Dict, Optional
from .. import builder_constants as bc
"""Phase 2 (part 1): Basic land addition logic (Land Step 1).
Extracted from the monolithic `builder.py` to begin modularizing land building.
Responsibilities provided by this mixin:
- add_basic_lands(): core allocation & addition of basic (or snow) lands.
- run_land_step1(): public wrapper invoked by the deck build orchestrator.
Expected attributes / methods on the host DeckBuilder:
- color_identity, selected_tags, commander_tags, ideal_counts
- determine_color_identity(), setup_dataframes(), add_card()
- output_func for user messaging
- bc (builder_constants) imported in builder module; we import locally here.
"""
# (Imports moved to top for lint compliance)
class LandBasicsMixin:
def add_basic_lands(self): # type: ignore[override]
"""Add basic (or snow basic) lands based on color identity.
Logic:
- Determine target basics = ceil(1.3 * ideal_basic_min) (rounded) but capped by total land target
- Evenly distribute among colored identity letters (W,U,B,R,G)
- If commander/selected tags include 'Snow' (case-insensitive) use snow basics mapping
- Colorless commander: use Wastes for the entire basic allocation
"""
# Ensure color identity determined
if not getattr(self, 'files_to_load', []):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add basics until color identity resolved: {e}")
return
# Ensure ideal counts (for min basics & total lands)
basic_min: Optional[int] = None
land_total: Optional[int] = None
if hasattr(self, 'ideal_counts') and getattr(self, 'ideal_counts'):
basic_min = self.ideal_counts.get('basic_lands') # type: ignore[attr-defined]
land_total = self.ideal_counts.get('lands') # type: ignore[attr-defined]
if basic_min is None:
basic_min = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if land_total is None:
land_total = getattr(bc, 'DEFAULT_LAND_COUNT', 35)
# Target basics = 1.3 * minimum (rounded) but not exceeding total lands
target_basics = int(round(1.3 * basic_min))
if target_basics > land_total:
target_basics = land_total
if target_basics <= 0:
self.output_func("Target basic land count is zero; skipping basics.")
return
colors = [c for c in getattr(self, 'color_identity', []) if c in ['W', 'U', 'B', 'R', 'G']]
if not colors: # colorless special case -> Wastes only
colors = []
# Determine if snow preferred
selected_tags = getattr(self, 'selected_tags', []) or []
commander_tags = getattr(self, 'commander_tags', []) or []
tag_pool = selected_tags + commander_tags
use_snow = any('snow' in str(t).lower() for t in tag_pool)
snow_map = getattr(bc, 'SNOW_BASIC_LAND_MAPPING', {})
basic_map = getattr(bc, 'COLOR_TO_BASIC_LAND', {})
allocation: Dict[str, int] = {}
if not colors: # colorless
allocation_name = snow_map.get('C', 'Wastes') if use_snow else 'Wastes'
allocation[allocation_name] = target_basics
else:
n = len(colors)
base = target_basics // n
rem = target_basics % n
for idx, c in enumerate(sorted(colors)): # sorted for deterministic distribution
count = base + (1 if idx < rem else 0)
land_name = snow_map.get(c) if use_snow else basic_map.get(c)
if not land_name:
continue
allocation[land_name] = allocation.get(land_name, 0) + count
# Add to library
for land_name, count in allocation.items():
for _ in range(count):
# Role metadata: basics (or snow basics)
self.add_card(
land_name,
card_type='Land',
role='basic',
sub_role='snow-basic' if use_snow else 'basic',
added_by='lands_step1',
trigger_tag='Snow' if use_snow else None
)
# Summary output
self.output_func("\nBasic Lands Added:")
width = max((len(n) for n in allocation.keys()), default=0)
for name, cnt in allocation.items():
self.output_func(f" {name.ljust(width)} : {cnt}")
self.output_func(f" Total Basics : {sum(allocation.values())} (Target {target_basics}, Min {basic_min})")
def run_land_step1(self): # type: ignore[override]
"""Public wrapper to execute land building step 1 (basics)."""
self.add_basic_lands()
__all__ = [
'LandBasicsMixin'
]

View file

@ -0,0 +1,218 @@
from __future__ import annotations
from typing import List, Dict
import random
from .. import builder_constants as bc
"""Phase 2 (part 5): Dual typed lands (Land Step 5).
Extracted from `builder.py` to modularize land building. Handles addition of two-color
basic-typed dual lands (e.g., Shock lands, typed cycle) with basic land type detection
and heuristic ranking plus a weighted shuffle for variety.
Provided by LandDualsMixin:
- add_dual_lands(requested_count: int | None = None)
- run_land_step5(requested_count: int | None = None)
Host DeckBuilder must provide:
- attributes: files_to_load, color_identity, ideal_counts, card_library, _combined_cards_df
- methods: determine_color_identity(), setup_dataframes(), _current_land_count(),
_basic_floor(), _count_basic_lands(), _choose_basic_to_trim(), _decrement_card(),
add_card(), _enforce_land_cap(), output_func
"""
class LandDualsMixin:
def add_dual_lands(self, requested_count: int | None = None): # type: ignore[override]
"""Add two-color 'typed' dual lands based on color identity."""
if not getattr(self, 'files_to_load', []):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add dual lands until color identity resolved: {e}")
return
colors = [c for c in getattr(self, 'color_identity', []) if c in ['W','U','B','R','G']]
if len(colors) < 2:
self.output_func("Dual Lands: Not multi-color; skipping step 5.")
return
land_target = (getattr(self, 'ideal_counts', {}) or {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35))
df = getattr(self, '_combined_cards_df', None)
pool: List[str] = []
type_to_card: Dict[str,str] = {}
pair_buckets: Dict[frozenset[str], List[str]] = {}
if df is not None and not df.empty and {'name','type'}.issubset(df.columns):
try:
for _, row in df.iterrows():
try:
name = str(row.get('name',''))
if not name or name in getattr(self, 'card_library', {}):
continue
tline = str(row.get('type','')).lower()
if 'land' not in tline:
continue
types_present = [basic for basic in ['plains','island','swamp','mountain','forest'] if basic in tline]
if len(types_present) < 2:
continue
mapped_colors = set()
for tp in types_present:
if tp == 'plains':
mapped_colors.add('W')
elif tp == 'island':
mapped_colors.add('U')
elif tp == 'swamp':
mapped_colors.add('B')
elif tp == 'mountain':
mapped_colors.add('R')
elif tp == 'forest':
mapped_colors.add('G')
if len(mapped_colors) != 2:
continue
if not mapped_colors.issubset(set(colors)):
continue
pool.append(name)
type_to_card[name] = tline
key = frozenset(mapped_colors)
pair_buckets.setdefault(key, []).append(name)
except Exception:
continue
except Exception:
pass
pool = list(dict.fromkeys(pool))
if not pool:
self.output_func("Dual Lands: No candidate dual typed lands found in dataset.")
return
def rank(name: str) -> int:
lname = name.lower()
tline = type_to_card.get(name,'')
score = 0
if any(kw in lname for kw in ['temple garden','sacred foundry','stomping ground','hallowed fountain','watery grave','overgrown tomb','breeding pool','godless shrine','steam vents','blood crypt']):
score += 10
if 'enters the battlefield tapped' not in tline:
score += 2
if 'snow' in tline:
score += 1
if 'enters the battlefield tapped' in tline and 'you gain' in tline:
score -= 1
return score
for key, names in pair_buckets.items():
names.sort(key=lambda n: rank(n), reverse=True)
if len(names) > 1:
rng_obj = getattr(self, 'rng', None)
try:
weighted = [(n, max(1, rank(n))+1) for n in names]
shuffled: List[str] = []
while weighted:
total = sum(w for _n, w in weighted)
r = (rng_obj.random() if rng_obj else random.random()) * total
acc = 0.0
for idx, (n, w) in enumerate(weighted):
acc += w
if r <= acc:
shuffled.append(n)
del weighted[idx]
break
pair_buckets[key] = shuffled
except Exception:
pair_buckets[key] = names
else:
pair_buckets[key] = names
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if getattr(self, 'ideal_counts', None):
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg) # type: ignore[attr-defined]
basic_floor = self._basic_floor(min_basic_cfg) # type: ignore[attr-defined]
default_dual_target = getattr(bc, 'DUAL_LAND_DEFAULT_COUNT', 6)
remaining_capacity = max(0, land_target - self._current_land_count()) # type: ignore[attr-defined]
effective_default = min(default_dual_target, remaining_capacity if remaining_capacity>0 else len(pool), len(pool))
desired = effective_default if requested_count is None else max(0, int(requested_count))
if desired == 0:
self.output_func("Dual Lands: Desired count 0; skipping.")
return
if remaining_capacity == 0 and desired > 0:
slots_needed = desired
freed_slots = 0
while freed_slots < slots_needed and self._count_basic_lands() > basic_floor: # type: ignore[attr-defined]
target_basic = self._choose_basic_to_trim() # type: ignore[attr-defined]
if not target_basic or not self._decrement_card(target_basic): # type: ignore[attr-defined]
break
freed_slots += 1
if freed_slots == 0:
desired = 0
remaining_capacity = max(0, land_target - self._current_land_count()) # type: ignore[attr-defined]
desired = min(desired, remaining_capacity, len(pool))
if desired <= 0:
self.output_func("Dual Lands: No capacity after trimming; skipping.")
return
chosen: List[str] = []
bucket_keys = list(pair_buckets.keys())
rng = getattr(self, 'rng', None)
try:
if rng:
rng.shuffle(bucket_keys) # type: ignore
else:
random.shuffle(bucket_keys)
except Exception:
pass
indices = {k:0 for k in bucket_keys}
while len(chosen) < desired and bucket_keys:
progressed = False
for k in list(bucket_keys):
idx = indices[k]
names = pair_buckets.get(k, [])
if idx >= len(names):
continue
name = names[idx]
indices[k] += 1
if name in chosen:
continue
chosen.append(name)
progressed = True
if len(chosen) >= desired:
break
if not progressed:
break
added: List[str] = []
for name in chosen:
if self._current_land_count() >= land_target: # type: ignore[attr-defined]
break
# Determine sub_role as concatenated color pair for traceability
try:
tline = type_to_card.get(name, '')
types_present = [basic for basic in ['plains','island','swamp','mountain','forest'] if basic in tline]
mapped_colors = []
for tp in types_present:
if tp == 'plains':
mapped_colors.append('W')
elif tp == 'island':
mapped_colors.append('U')
elif tp == 'swamp':
mapped_colors.append('B')
elif tp == 'mountain':
mapped_colors.append('R')
elif tp == 'forest':
mapped_colors.append('G')
sub_role = ''.join(sorted(set(mapped_colors))) if mapped_colors else None
except Exception:
sub_role = None
self.add_card(
name,
card_type='Land',
role='dual',
sub_role=sub_role,
added_by='lands_step5'
) # type: ignore[attr-defined]
added.append(name)
self.output_func("\nDual Lands Added (Step 5):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
self.output_func(f" {n.ljust(width)} : 1")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}") # type: ignore[attr-defined]
def run_land_step5(self, requested_count: int | None = None): # type: ignore[override]
self.add_dual_lands(requested_count=requested_count)
self._enforce_land_cap(step_label="Duals (Step 5)") # type: ignore[attr-defined]
__all__ = [
'LandDualsMixin'
]

View file

@ -0,0 +1,147 @@
from __future__ import annotations
from typing import List
import random
from .. import builder_constants as bc
"""Phase 2 (part 4): Fetch lands (Land Step 4).
Extracted logic for adding color-specific and generic fetch lands.
Provided by LandFetchMixin:
- add_fetch_lands(requested_count=None)
- run_land_step4(requested_count=None)
Host DeckBuilder must supply:
- attributes: files_to_load, ideal_counts, color_identity, card_library
- methods: determine_color_identity(), setup_dataframes(), _current_land_count(),
_basic_floor(), _count_basic_lands(), _choose_basic_to_trim(), _decrement_card(),
add_card(), _prompt_int_with_default(), _enforce_land_cap(), output_func
"""
class LandFetchMixin:
def add_fetch_lands(self, requested_count: int | None = None): # type: ignore[override]
"""Add fetch lands (color-specific + generic) respecting land target."""
if not getattr(self, 'files_to_load', []):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add fetch lands until color identity resolved: {e}")
return
land_target = (getattr(self, 'ideal_counts', {}).get('lands') if getattr(self, 'ideal_counts', None) else None) or getattr(bc, 'DEFAULT_LAND_COUNT', 35) # type: ignore[attr-defined]
current = self._current_land_count() # type: ignore[attr-defined]
color_order = [c for c in getattr(self, 'color_identity', []) if c in ['W','U','B','R','G']]
color_map = getattr(bc, 'COLOR_TO_FETCH_LANDS', {})
candidates: List[str] = []
for c in color_order:
for nm in color_map.get(c, []):
if nm not in candidates:
candidates.append(nm)
generic_list = getattr(bc, 'GENERIC_FETCH_LANDS', [])
for nm in generic_list:
if nm not in candidates:
candidates.append(nm)
candidates = [n for n in candidates if n not in getattr(self, 'card_library', {})]
if not candidates:
self.output_func("Fetch Lands: No eligible fetch lands remaining.")
return
default_fetch = getattr(bc, 'FETCH_LAND_DEFAULT_COUNT', 3)
remaining_capacity = max(0, land_target - current)
cap_for_default = remaining_capacity if remaining_capacity > 0 else len(candidates)
effective_default = min(default_fetch, cap_for_default, len(candidates))
existing_fetches = sum(1 for n in getattr(self, 'card_library', {}) if n in candidates)
fetch_cap = getattr(bc, 'FETCH_LAND_MAX_CAP', 99)
remaining_fetch_slots = max(0, fetch_cap - existing_fetches)
if requested_count is None:
self.output_func("\nAdd Fetch Lands (Step 4):")
self.output_func("Fetch lands help fix colors & enable landfall / graveyard synergies.")
prompt = f"Enter desired number of fetch lands (default: {effective_default}):"
desired = self._prompt_int_with_default(prompt + ' ', effective_default, minimum=0, maximum=20) # type: ignore[attr-defined]
else:
desired = max(0, int(requested_count))
if desired > remaining_fetch_slots:
desired = remaining_fetch_slots
if desired == 0:
self.output_func("Fetch Lands: Global fetch cap reached; skipping.")
return
if desired == 0:
self.output_func("Fetch Lands: Desired count 0; skipping.")
return
if remaining_capacity == 0 and desired > 0:
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if getattr(self, 'ideal_counts', None):
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg) # type: ignore[attr-defined]
floor_basics = self._basic_floor(min_basic_cfg) # type: ignore[attr-defined]
slots_needed = desired
while slots_needed > 0 and self._count_basic_lands() > floor_basics: # type: ignore[attr-defined]
target_basic = self._choose_basic_to_trim() # type: ignore[attr-defined]
if not target_basic or not self._decrement_card(target_basic): # type: ignore[attr-defined]
break
slots_needed -= 1
remaining_capacity = max(0, land_target - self._current_land_count()) # type: ignore[attr-defined]
if remaining_capacity > 0 and slots_needed == 0:
break
if slots_needed > 0 and remaining_capacity == 0:
desired -= slots_needed
remaining_capacity = max(0, land_target - self._current_land_count()) # type: ignore[attr-defined]
desired = min(desired, remaining_capacity, len(candidates), remaining_fetch_slots)
if desired <= 0:
self.output_func("Fetch Lands: No capacity (after trimming) or desired reduced to 0; skipping.")
return
rng = getattr(self, 'rng', None)
color_specific_all: List[str] = []
for c in color_order:
for n in color_map.get(c, []):
if n in candidates and n not in color_specific_all:
color_specific_all.append(n)
generic_all: List[str] = [n for n in generic_list if n in candidates]
def sampler(pool: List[str], k: int) -> List[str]:
if k <= 0 or not pool:
return []
if k >= len(pool):
return pool.copy()
try:
return (rng.sample if rng else random.sample)(pool, k) # type: ignore
except Exception:
return pool[:k]
need = desired
chosen: List[str] = []
take_color = min(need, len(color_specific_all))
chosen.extend(sampler(color_specific_all, take_color))
need -= len(chosen)
if need > 0:
chosen.extend(sampler(generic_all, min(need, len(generic_all))))
if len(chosen) < desired:
leftovers = [n for n in candidates if n not in chosen]
chosen.extend(leftovers[: desired - len(chosen)])
added: List[str] = []
for nm in chosen:
if self._current_land_count() >= land_target: # type: ignore[attr-defined]
break
note = 'generic' if nm in generic_list else 'color-specific'
self.add_card(
nm,
card_type='Land',
role='fetch',
sub_role=note,
added_by='lands_step4'
) # type: ignore[attr-defined]
added.append(nm)
self.output_func("\nFetch Lands Added (Step 4):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
note = 'generic' if n in generic_list else 'color-specific'
self.output_func(f" {n.ljust(width)} : 1 ({note})")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}") # type: ignore[attr-defined]
def run_land_step4(self, requested_count: int | None = None): # type: ignore[override]
"""Public wrapper to add fetch lands. Optional requested_count to bypass prompt."""
self.add_fetch_lands(requested_count=requested_count)
self._enforce_land_cap(step_label="Fetch (Step 4)") # type: ignore[attr-defined]
__all__ = [
'LandFetchMixin'
]

View file

@ -0,0 +1,152 @@
from __future__ import annotations
from typing import List, Dict
from .. import builder_constants as bc
"""Phase 2 (part 3): Kindred / tribal land additions (Land Step 3).
Extracted from `builder.py` to reduce monolith size. Focuses on lands that care
about creature types or tribal synergies when a selected tag includes 'Kindred' or 'Tribal'.
Provided by LandKindredMixin:
- add_kindred_lands()
- run_land_step3()
Host DeckBuilder must provide:
- attributes: selected_tags, commander_tags, color_identity, ideal_counts, commander_row,
card_library, _full_cards_df
- methods: determine_color_identity(), setup_dataframes(), add_card(), _current_land_count(),
_basic_floor(), _count_basic_lands(), _choose_basic_to_trim(), _decrement_card(),
_enforce_land_cap(), output_func
"""
class LandKindredMixin:
def add_kindred_lands(self): # type: ignore[override]
"""Add kindred-oriented lands ONLY if a selected tag includes 'Kindred' or 'Tribal'.
Baseline inclusions on kindred focus:
- Path of Ancestry (always when kindred)
- Cavern of Souls (<=4 colors)
- Three Tree City (>=2 colors)
Dynamic tribe-specific lands: derived only from selected tags (not all commander tags).
Capacity: may swap excess basics (above 90% floor) similar to other steps.
"""
if not getattr(self, 'files_to_load', []):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add kindred lands until color identity resolved: {e}")
return
if not any(('kindred' in t.lower() or 'tribal' in t.lower()) for t in (getattr(self, 'selected_tags', []) or [])):
self.output_func("Kindred Lands: No selected kindred/tribal tag; skipping.")
return
if hasattr(self, 'ideal_counts') and getattr(self, 'ideal_counts'):
land_target = self.ideal_counts.get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) # type: ignore[attr-defined]
else:
land_target = getattr(bc, 'DEFAULT_LAND_COUNT', 35)
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and getattr(self, 'ideal_counts'):
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg) # type: ignore[attr-defined]
basic_floor = self._basic_floor(min_basic_cfg) # type: ignore[attr-defined]
def ensure_capacity() -> bool:
if self._current_land_count() < land_target: # type: ignore[attr-defined]
return True
if self._count_basic_lands() <= basic_floor: # type: ignore[attr-defined]
return False
target_basic = self._choose_basic_to_trim() # type: ignore[attr-defined]
if not target_basic:
return False
if not self._decrement_card(target_basic): # type: ignore[attr-defined]
return False
return self._current_land_count() < land_target # type: ignore[attr-defined]
colors = getattr(self, 'color_identity', []) or []
added: List[str] = []
reasons: Dict[str, str] = {}
def try_add(name: str, reason: str):
if name in self.card_library: # type: ignore[attr-defined]
return
if not ensure_capacity():
return
self.add_card(
name,
card_type='Land',
role='kindred',
sub_role='baseline' if reason.startswith('kindred focus') else 'tribe-specific',
added_by='lands_step3',
trigger_tag='Kindred/Tribal'
) # type: ignore[attr-defined]
added.append(name)
reasons[name] = reason
# Baseline inclusions
try_add('Path of Ancestry', 'kindred focus')
if len(colors) <= 4:
try_add('Cavern of Souls', f"kindred focus ({len(colors)} colors)")
if len(colors) >= 2:
try_add('Three Tree City', f"kindred focus ({len(colors)} colors)")
# Dynamic tribe extraction
tribe_terms: set[str] = set()
for tag in (getattr(self, 'selected_tags', []) or []):
lower = tag.lower()
if 'kindred' in lower:
base = lower.replace('kindred', '').strip()
if base:
tribe_terms.add(base.split()[0])
elif 'tribal' in lower:
base = lower.replace('tribal', '').strip()
if base:
tribe_terms.add(base.split()[0])
snapshot = getattr(self, '_full_cards_df', None)
if snapshot is not None and not snapshot.empty and tribe_terms:
dynamic_limit = 5
for tribe in sorted(tribe_terms):
if self._current_land_count() >= land_target or dynamic_limit <= 0: # type: ignore[attr-defined]
break
tribe_lower = tribe.lower()
matches: List[str] = []
for _, row in snapshot.iterrows():
try:
nm = str(row.get('name', ''))
if not nm or nm in self.card_library: # type: ignore[attr-defined]
continue
tline = str(row.get('type', row.get('type_line', ''))).lower()
if 'land' not in tline:
continue
text_field = row.get('text', row.get('oracleText', ''))
text_str = str(text_field).lower() if text_field is not None else ''
nm_lower = nm.lower()
if (tribe_lower in nm_lower or f" {tribe_lower}" in text_str or f"{tribe_lower} " in text_str or f"{tribe_lower}s" in text_str):
matches.append(nm)
except Exception:
continue
for nm in matches[:2]:
if self._current_land_count() >= land_target or dynamic_limit <= 0: # type: ignore[attr-defined]
break
if nm in added or nm in getattr(bc, 'BASIC_LANDS', []):
continue
try_add(nm, f"text/name references '{tribe}'")
dynamic_limit -= 1
self.output_func("\nKindred Lands Added (Step 3):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
self.output_func(f" {n.ljust(width)} : 1 ({reasons.get(n,'')})")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}") # type: ignore[attr-defined]
def run_land_step3(self): # type: ignore[override]
"""Public wrapper to add kindred-focused lands."""
self.add_kindred_lands()
self._enforce_land_cap(step_label="Kindred (Step 3)") # type: ignore[attr-defined]
__all__ = [
'LandKindredMixin'
]

View file

@ -0,0 +1,180 @@
from __future__ import annotations
from typing import Optional, List, Dict
from .. import builder_constants as bc
from .. import builder_utils as bu
class LandMiscUtilityMixin:
"""Mixin for Land Building Step 7: Misc / Utility Lands.
Provides:
- add_misc_utility_lands
- run_land_step7
- tag-driven suggestion queue helpers (_build_tag_driven_land_suggestions, _apply_land_suggestions_if_room)
Extracted verbatim (with light path adjustments) from original monolithic builder.
"""
def add_misc_utility_lands(self, requested_count: Optional[int] = None): # type: ignore[override]
if not getattr(self, 'files_to_load', None):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e:
self.output_func(f"Cannot add misc utility lands until color identity resolved: {e}")
return
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty:
self.output_func("Misc Lands: No card pool loaded.")
return
land_target = getattr(self, 'ideal_counts', {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) if getattr(self, 'ideal_counts', None) else getattr(bc, 'DEFAULT_LAND_COUNT', 35)
current = self._current_land_count()
remaining_capacity = max(0, land_target - current)
if remaining_capacity <= 0:
remaining_capacity = 0
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and self.ideal_counts:
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
basic_floor = self._basic_floor(min_basic_cfg)
if requested_count is not None:
desired = max(0, int(requested_count))
else:
desired = max(0, land_target - current)
if desired == 0:
self.output_func("Misc Lands: No remaining land capacity; skipping.")
return
basics = self._basic_land_names()
already = set(self.card_library.keys())
top_n = getattr(bc, 'MISC_LAND_TOP_POOL_SIZE', 30)
top_candidates = bu.select_top_land_candidates(df, already, basics, top_n)
if not top_candidates:
self.output_func("Misc Lands: No remaining candidate lands.")
return
weighted_pool: List[tuple[str,int]] = []
base_weight_fix = getattr(bc, 'MISC_LAND_COLOR_FIX_PRIORITY_WEIGHT', 2)
fetch_names = set()
for seq in getattr(bc, 'COLOR_TO_FETCH_LANDS', {}).values():
for nm in seq:
fetch_names.add(nm)
for nm in getattr(bc, 'GENERIC_FETCH_LANDS', []):
fetch_names.add(nm)
existing_fetch_count = bu.count_existing_fetches(self.card_library)
fetch_cap = getattr(bc, 'FETCH_LAND_MAX_CAP', 99)
remaining_fetch_slots = max(0, fetch_cap - existing_fetch_count)
for edh_val, name, tline, text_lower in top_candidates:
w = 1
if bu.is_color_fixing_land(tline, text_lower):
w *= base_weight_fix
if name in fetch_names and remaining_fetch_slots <= 0:
continue
weighted_pool.append((name, w))
if self._current_land_count() >= land_target and desired > 0:
slots_needed = desired
freed = 0
while freed < slots_needed and self._count_basic_lands() > basic_floor:
target_basic = self._choose_basic_to_trim()
if not target_basic or not self._decrement_card(target_basic):
break
freed += 1
if freed == 0 and self._current_land_count() >= land_target:
self.output_func("Misc Lands: Cannot free capacity; skipping.")
return
remaining_capacity = max(0, land_target - self._current_land_count())
desired = min(desired, remaining_capacity, len(weighted_pool))
if desired <= 0:
self.output_func("Misc Lands: No capacity after trimming; skipping.")
return
rng = getattr(self, 'rng', None)
chosen = bu.weighted_sample_without_replacement(weighted_pool, desired, rng=rng)
added: List[str] = []
for nm in chosen:
if self._current_land_count() >= land_target:
break
# Misc utility lands baseline role
self.add_card(nm, card_type='Land', role='utility', sub_role='misc', added_by='lands_step7')
added.append(nm)
self.output_func("\nMisc Utility Lands Added (Step 7):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
note = ''
row = next((r for r in top_candidates if r[1] == n), None)
if row:
for edh_val, name2, tline2, text_lower2 in top_candidates:
if name2 == n and bu.is_color_fixing_land(tline2, text_lower2):
note = '(fixing)'
break
self.output_func(f" {n.ljust(width)} : 1 {note}")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}")
def run_land_step7(self, requested_count: Optional[int] = None): # type: ignore[override]
self.add_misc_utility_lands(requested_count=requested_count)
self._enforce_land_cap(step_label="Utility (Step 7)")
self._build_tag_driven_land_suggestions()
self._apply_land_suggestions_if_room()
# ---- Tag-driven suggestion helpers (used after Step 7) ----
def _build_tag_driven_land_suggestions(self): # type: ignore[override]
suggestions = bu.build_tag_driven_suggestions(self)
if suggestions:
self.suggested_lands_queue.extend(suggestions)
def _apply_land_suggestions_if_room(self): # type: ignore[override]
if not self.suggested_lands_queue:
return
land_target = getattr(self, 'ideal_counts', {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) if getattr(self, 'ideal_counts', None) else getattr(bc, 'DEFAULT_LAND_COUNT', 35)
applied: List[Dict] = []
remaining: List[Dict] = []
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and self.ideal_counts:
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
basic_floor = self._basic_floor(min_basic_cfg)
for sug in self.suggested_lands_queue:
name = sug['name']
if name in self.card_library:
continue
if not sug['condition'](self):
remaining.append(sug)
continue
if self._current_land_count() >= land_target:
if sug.get('defer_if_full'):
if self._count_basic_lands() > basic_floor:
target_basic = self._choose_basic_to_trim()
if not target_basic or not self._decrement_card(target_basic):
remaining.append(sug)
continue
else:
remaining.append(sug)
continue
# Tag suggestion additions (flex if marked)
self.add_card(
name,
card_type='Land',
role=('flex' if sug.get('flex') else 'utility'),
sub_role='tag-suggested',
added_by='tag_suggestion',
trigger_tag=sug.get('reason')
)
applied.append(sug)
self.suggested_lands_queue = remaining
if applied:
self.output_func("\nTag-Driven Utility Lands Added:")
width = max(len(s['name']) for s in applied)
for s in applied:
role = ' (flex)' if s.get('flex') else ''
self.output_func(f" {s['name'].ljust(width)} : 1 {s['reason']}{role}")

View file

@ -0,0 +1,153 @@
from __future__ import annotations
from typing import List
from .. import builder_constants as bc
from .. import builder_utils as bu
class LandOptimizationMixin:
"""Mixin for Land Building Step 8: ETB Tapped Minimization / Optimization Pass.
Provides optimize_tapped_lands and run_land_step8 (moved from monolithic builder).
"""
def optimize_tapped_lands(self): # type: ignore[override]
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty:
return
bracket_level = getattr(self, 'bracket_level', None)
threshold_map = getattr(bc, 'TAPPED_LAND_MAX_THRESHOLDS', {5:6,4:8,3:10,2:12,1:14})
threshold = threshold_map.get(bracket_level, 10)
name_to_row = {}
for _, row in df.iterrows():
nm = str(row.get('name',''))
if nm and nm not in name_to_row:
name_to_row[nm] = row.to_dict()
tapped_info = [] # (name, penalty, tapped_flag)
total_tapped = 0
for name, entry in list(self.card_library.items()):
row = name_to_row.get(name)
if not row:
continue
tline = str(row.get('type', row.get('type_line',''))).lower()
if 'land' not in tline:
continue
text_field = str(row.get('text', row.get('oracleText',''))).lower()
tapped_flag, penalty = bu.tapped_land_penalty(tline, text_field)
if tapped_flag:
total_tapped += 1
tapped_info.append((name, penalty, tapped_flag))
if total_tapped <= threshold:
self.output_func(f"Tapped Optimization (Step 8): {total_tapped} tapped/conditional lands (threshold {threshold}); no changes.")
return
over = total_tapped - threshold
swap_min_penalty = getattr(bc, 'TAPPED_LAND_SWAP_MIN_PENALTY', 6)
tapped_info.sort(key=lambda x: x[1], reverse=True)
to_consider = [t for t in tapped_info if t[1] >= swap_min_penalty]
if not to_consider:
self.output_func(f"Tapped Optimization (Step 8): Over threshold ({total_tapped}>{threshold}) but no suitable swaps (penalties too low).")
return
replacement_candidates: List[str] = []
seen = set(self.card_library.keys())
colors = [c for c in getattr(self, 'color_identity', []) if c in ['W','U','B','R','G']]
for _, row in df.iterrows():
try:
name = str(row.get('name',''))
if not name or name in seen or name in replacement_candidates:
continue
tline = str(row.get('type', row.get('type_line',''))).lower()
if 'land' not in tline:
continue
text_field = str(row.get('text', row.get('oracleText',''))).lower()
if 'enters the battlefield tapped' in text_field and 'you may pay 2 life' not in text_field and 'unless you control' not in text_field:
continue
produces_color = any(sym in text_field for sym in ['{w}','{u}','{b}','{r}','{g}'])
basic_types = [b for b in ['plains','island','swamp','mountain','forest'] if b in tline]
mapped = set()
for b in basic_types:
if b == 'plains':
mapped.add('W')
elif b == 'island':
mapped.add('U')
elif b == 'swamp':
mapped.add('B')
elif b == 'mountain':
mapped.add('R')
elif b == 'forest':
mapped.add('G')
if not produces_color and not (mapped & set(colors)):
continue
replacement_candidates.append(name)
except Exception:
continue
def repl_rank(name: str) -> int:
row = name_to_row.get(name, {})
tline = str(row.get('type', row.get('type_line','')))
text_field = str(row.get('text', row.get('oracleText','')))
return bu.replacement_land_score(name, tline, text_field)
replacement_candidates.sort(key=repl_rank, reverse=True)
swaps_made = []
idx_rep = 0
for name, penalty, _ in to_consider:
if over <= 0:
break
if not self._decrement_card(name):
continue
replacement = None
while idx_rep < len(replacement_candidates):
cand = replacement_candidates[idx_rep]
idx_rep += 1
if cand in getattr(bc, 'GENERIC_FETCH_LANDS', []) or any(cand in lst for lst in getattr(bc, 'COLOR_TO_FETCH_LANDS', {}).values()):
fetch_cap = getattr(bc, 'FETCH_LAND_MAX_CAP', 99)
existing_fetches = bu.count_existing_fetches(self.card_library)
if existing_fetches >= fetch_cap:
continue
replacement = cand
break
if replacement is None:
basics = self._basic_land_names()
basic_counts = {b: self.card_library.get(b, {}).get('Count',0) for b in basics}
color_basic_map = {'W':'Plains','U':'Island','B':'Swamp','R':'Mountain','G':'Forest'}
usable_basics = [color_basic_map[c] for c in colors if color_basic_map[c] in basics]
usable_basics.sort(key=lambda b: basic_counts.get(b,0))
replacement = usable_basics[0] if usable_basics else 'Wastes'
self.add_card(
replacement,
card_type='Land',
role='optimized',
sub_role='swap-in',
added_by='lands_step8',
trigger_tag='tapped_optimization'
)
swaps_made.append((name, replacement))
over -= 1
if not swaps_made:
self.output_func(f"Tapped Optimization (Step 8): Could not perform swaps; over threshold {total_tapped}>{threshold}.")
return
self.output_func("\nTapped Optimization (Step 8) Swaps:")
for old, new in swaps_made:
self.output_func(f" Replaced {old} -> {new}")
new_tapped = 0
for name, entry in self.card_library.items():
row = name_to_row.get(name)
if not row:
continue
text_field = str(row.get('text', row.get('oracleText',''))).lower()
if 'enters the battlefield tapped' in text_field and 'you may pay 2 life' not in text_field:
new_tapped += 1
self.output_func(f" Tapped Lands After : {new_tapped} (threshold {threshold})")
def run_land_step8(self): # type: ignore[override]
self.optimize_tapped_lands()
self._enforce_land_cap(step_label="Tapped Opt (Step 8)")
if self.color_source_matrix_baseline is None:
self.color_source_matrix_baseline = self._compute_color_source_matrix()

View file

@ -0,0 +1,150 @@
from __future__ import annotations
from typing import List, Dict
from .. import builder_constants as bc
"""Phase 2 (part 2): Staple nonbasic lands (Land Step 2).
Extracted logic for adding generic staple lands (excluding kindred / tribal, fetches, etc.).
Provided by LandStaplesMixin:
- _current_land_count(): counts land cards currently in library.
- add_staple_lands(): core staple inclusion logic with capacity management.
- run_land_step2(): public wrapper invoked by orchestrator.
Expected host DeckBuilder attributes / methods:
- card_library (dict), output_func
- files_to_load, determine_color_identity(), setup_dataframes()
- ideal_counts (dict) possibly present
- commander_tags, selected_tags, commander_row
- helper methods: _basic_floor, _count_basic_lands, _choose_basic_to_trim, _decrement_card, _enforce_land_cap
- builder_constants imported as bc in host package (we import locally for clarity)
"""
# (Imports moved to top for lint compliance)
class LandStaplesMixin:
# ---------------------------
# Land Building Step 2: Staple Nonbasic Lands (NO Kindred yet)
# ---------------------------
def _current_land_count(self) -> int: # type: ignore[override]
"""Return total number of land cards currently in the library (counts duplicates)."""
total = 0
for name, entry in self.card_library.items(): # type: ignore[attr-defined]
ctype = entry.get('Card Type', '')
if ctype and 'land' in ctype.lower():
total += entry.get('Count', 1)
continue
df = getattr(self, '_combined_cards_df', None)
if df is not None and 'name' in getattr(df, 'columns', []):
try:
row = df[df['name'] == name]
if not row.empty:
type_field = str(row.iloc[0].get('type', '')).lower()
if 'land' in type_field:
total += entry.get('Count', 1)
except Exception:
continue
return total
def add_staple_lands(self): # type: ignore[override]
"""Add generic staple lands defined in STAPLE_LAND_CONDITIONS (excluding kindred lands).
Respects total land target (ideal_counts['lands']). Skips additions once target reached.
Conditions may use commander tags (all available, not just selected), color identity, and commander power.
"""
if not getattr(self, 'files_to_load', []):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add staple lands until color identity resolved: {e}")
return
land_target = None
if hasattr(self, 'ideal_counts') and getattr(self, 'ideal_counts'):
land_target = self.ideal_counts.get('lands') # type: ignore[attr-defined]
if land_target is None:
land_target = getattr(bc, 'DEFAULT_LAND_COUNT', 35)
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and getattr(self, 'ideal_counts'):
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg) # type: ignore[attr-defined]
basic_floor = self._basic_floor(min_basic_cfg) # type: ignore[attr-defined]
def ensure_capacity() -> bool:
if self._current_land_count() < land_target: # type: ignore[attr-defined]
return True
if self._count_basic_lands() <= basic_floor: # type: ignore[attr-defined]
return False
target_basic = self._choose_basic_to_trim() # type: ignore[attr-defined]
if not target_basic:
return False
if not self._decrement_card(target_basic): # type: ignore[attr-defined]
return False
return self._current_land_count() < land_target # type: ignore[attr-defined]
commander_tags_all = set(getattr(self, 'commander_tags', []) or []) | set(getattr(self, 'selected_tags', []) or [])
colors = getattr(self, 'color_identity', []) or []
commander_power = 0
try:
row = getattr(self, 'commander_row', None)
if row is not None:
raw_power = row.get('power')
if isinstance(raw_power, (int, float)):
commander_power = int(raw_power)
elif isinstance(raw_power, str) and raw_power.isdigit():
commander_power = int(raw_power)
except Exception:
commander_power = 0
added: List[str] = []
reasons: Dict[str, str] = {}
for land_name, cond in getattr(bc, 'STAPLE_LAND_CONDITIONS', {}).items():
if not ensure_capacity():
self.output_func("Staple Lands: Cannot free capacity without violating basic floor; stopping additions.")
break
if land_name in self.card_library: # type: ignore[attr-defined]
continue
try:
include = cond(list(commander_tags_all), colors, commander_power)
except Exception:
include = False
if include:
self.add_card(
land_name,
card_type='Land',
role='staple',
sub_role='generic-staple',
added_by='lands_step2'
) # type: ignore[attr-defined]
added.append(land_name)
if land_name == 'Command Tower':
reasons[land_name] = f"multi-color ({len(colors)} colors)"
elif land_name == 'Exotic Orchard':
reasons[land_name] = f"multi-color ({len(colors)} colors)"
elif land_name == 'War Room':
reasons[land_name] = f"<=2 colors ({len(colors)})"
elif land_name == 'Reliquary Tower':
reasons[land_name] = 'always include'
elif land_name == 'Ash Barrens':
reasons[land_name] = 'no Landfall tag'
elif land_name == "Rogue's Passage":
reasons[land_name] = f"commander power {commander_power} >=5"
self.output_func("\nStaple Lands Added (Step 2):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
reason = reasons.get(n, '')
self.output_func(f" {n.ljust(width)} : 1 {('(' + reason + ')') if reason else ''}")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}") # type: ignore[attr-defined]
def run_land_step2(self): # type: ignore[override]
"""Public wrapper for adding generic staple nonbasic lands (excluding kindred)."""
self.add_staple_lands()
self._enforce_land_cap(step_label="Staples (Step 2)") # type: ignore[attr-defined]
__all__ = [
'LandStaplesMixin'
]

View file

@ -0,0 +1,231 @@
from __future__ import annotations
from typing import Optional, List, Dict, Set
import re
from .. import builder_constants as bc
class LandTripleMixin:
"""Mixin providing logic for adding three-color (triple) lands (Step 6).
Extraction rationale:
- Isolates a coherent land selection concern from the monolithic builder.
- Mirrors earlier land step mixins with add_* and run_land_step6 methods.
Strategy:
1. Determine if the deck's color identity has at least 3 colors; else skip.
2. Build a pool of candidate triple lands whose type line / name indicates they
produce at least three of the deck colors (heuristic; full rules parsing is
intentionally avoided for speed / simplicity with CSV data).
3. Avoid adding duplicates or previously selected lands.
4. Trim basics (above a computed floor) if capacity is reached and we still
desire triple lands.
5. Respect user-provided requested_count if supplied; otherwise fall back to
default constant and capacity.
6. Apply a simple ranking + slight randomization for determinism + variety.
"""
def add_triple_lands(self, requested_count: Optional[int] = None):
# Preconditions: color identity & dataframes
if not getattr(self, 'files_to_load', None):
try:
self.determine_color_identity()
self.setup_dataframes()
except Exception as e: # pragma: no cover - defensive
self.output_func(f"Cannot add triple lands until setup complete: {e}")
return
colors = [c for c in getattr(self, 'color_identity', []) if c in ['W','U','B','R','G']]
if len(colors) < 3:
self.output_func("Triple Lands: Fewer than three colors; skipping step 6.")
return
land_target = getattr(self, 'ideal_counts', {}).get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35)) if getattr(self, 'ideal_counts', None) else getattr(bc, 'DEFAULT_LAND_COUNT', 35)
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty or not {'name','type'}.issubset(df.columns):
self.output_func("Triple Lands: No combined card dataframe or missing columns; skipping.")
return
pool: List[str] = []
meta: Dict[str, str] = {}
wanted: Set[str] = set(colors)
basic_map = {
'plains': 'W',
'island': 'U',
'swamp': 'B',
'mountain': 'R',
'forest': 'G',
}
for _, row in df.iterrows(): # type: ignore
try:
name = str(row.get('name',''))
if not name or name in self.card_library:
continue
tline = str(row.get('type','')).lower()
if 'land' not in tline:
continue
# Heuristic: count unique basic types in type line
types_present = [b for b in basic_map if b in tline]
mapped = {basic_map[b] for b in types_present}
# Extract color production from rules text if present
text_field = str(row.get('text', row.get('oracleText',''))).lower()
color_syms = set(re.findall(r'\{([wubrg])\}', text_field))
color_syms_mapped = {c.upper() for c in color_syms}
lname = name.lower()
tri_keywords = [
'triome','panorama','citadel','tower','hub','garden','headquarters','sanctuary',
'stronghold','outpost','campus','shrine','domain','estate'
]
# Decide if candidate qualifies:
qualifies_by_types = len(mapped) >= 3
qualifies_by_text = len(color_syms_mapped) >= 3 and color_syms_mapped.issubset(wanted)
qualifies_by_name = any(kw in lname for kw in tri_keywords)
if not (qualifies_by_types or qualifies_by_text or (qualifies_by_name and (len(mapped) >= 2 or len(color_syms_mapped) >= 2))):
continue
# Consolidate produced colors for validation (prefer typed, else text)
produced = mapped if mapped else color_syms_mapped
if not produced.issubset(wanted):
continue
if qualifies_by_types or len(produced) >= 3:
pool.append(name)
meta[name] = tline
else:
pool.append(name)
meta[name] = tline + ' (heuristic-tri)'
except Exception: # pragma: no cover - defensive
continue
# De-duplicate while preserving order
pool = list(dict.fromkeys(pool))
if not pool:
self.output_func("Triple Lands: No candidates found.")
return
# Ranking heuristic: fully triple-typed > untapped > others; penalize ETB tapped
def rank(name: str) -> int:
tline = meta.get(name, '')
score = 0
if '(heuristic-tri)' not in tline:
score += 5
if 'enters the battlefield tapped' not in tline:
score += 2
if 'cycling' in tline:
score += 1
if 'enters the battlefield tapped' in tline and 'you gain' in tline:
score -= 1
return score
pool.sort(key=lambda n: rank(n), reverse=True)
# Slight randomized shuffle weighted by rank for variety
rng = getattr(self, 'rng', None) or self._get_rng()
try:
weighted = []
for n in pool:
w = max(1, rank(n)) + 1
weighted.append((n, w))
shuffled: List[str] = []
while weighted:
total = sum(w for _, w in weighted)
r = rng.random() * total
acc = 0.0
for idx, (n, w) in enumerate(weighted):
acc += w
if r <= acc:
shuffled.append(n)
del weighted[idx]
break
pool = shuffled
except Exception: # pragma: no cover - fallback
pass
# Capacity handling
remaining_capacity = max(0, land_target - self._current_land_count())
default_triple_target = getattr(bc, 'TRIPLE_LAND_DEFAULT_COUNT', 3)
effective_default = min(default_triple_target, remaining_capacity if remaining_capacity>0 else len(pool), len(pool))
if requested_count is None:
desired = effective_default
else:
desired = max(0, int(requested_count))
if desired == 0:
self.output_func("Triple Lands: Desired count 0; skipping.")
return
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and self.ideal_counts:
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
basic_floor = self._basic_floor(min_basic_cfg)
if remaining_capacity == 0 and desired > 0:
slots_needed = desired
freed = 0
while freed < slots_needed and self._count_basic_lands() > basic_floor:
target_basic = self._choose_basic_to_trim()
if not target_basic:
break
if not self._decrement_card(target_basic):
break
freed += 1
if freed == 0:
desired = 0
remaining_capacity = max(0, land_target - self._current_land_count())
desired = min(desired, remaining_capacity, len(pool))
if desired <= 0:
self.output_func("Triple Lands: No capacity after trimming; skipping.")
return
added: List[str] = []
for name in pool:
if len(added) >= desired or self._current_land_count() >= land_target:
break
# Infer color trio from type line basic types
try:
row_match = df[df['name'] == name]
tline = ''
text_field = ''
sub_role = None
if not row_match.empty:
rw = row_match.iloc[0]
tline = str(rw.get('type','')).lower()
text_field = str(rw.get('text', rw.get('oracleText',''))).lower()
trio = []
for basic, col in [('plains','W'),('island','U'),('swamp','B'),('mountain','R'),('forest','G')]:
if basic in tline:
trio.append(col)
if len(trio) < 3:
color_syms = set(re.findall(r'\{([wubrg])\}', text_field))
trio = [c.upper() for c in color_syms]
if len(trio) >= 2:
sub_role = ''.join(sorted(set(trio)))
except Exception:
sub_role = None
self.add_card(
name,
card_type='Land',
role='triple',
sub_role=sub_role,
added_by='lands_step6'
)
added.append(name)
self.output_func("\nTriple Lands Added (Step 6):")
if not added:
self.output_func(" (None added)")
else:
width = max(len(n) for n in added)
for n in added:
self.output_func(f" {n.ljust(width)} : 1")
self.output_func(f" Land Count Now : {self._current_land_count()} / {land_target}")
def run_land_step6(self, requested_count: Optional[int] = None):
self.add_triple_lands(requested_count=requested_count)
self._enforce_land_cap(step_label="Triples (Step 6)")

View file

@ -0,0 +1,181 @@
from __future__ import annotations
import math
from typing import List, Dict
from .. import builder_constants as bc
from .. import builder_utils as bu
import logging_util
logger = logging_util.logging.getLogger(__name__)
class CreatureAdditionMixin:
"""Phase 3: Creature addition logic extracted from monolithic builder.
Responsibilities:
- Determine per-theme allocation weights (1-3 themes supported)
- Apply kindred/tribal multipliers when multiple themes selected
- Prioritize cards matching multiple selected themes
- Avoid duplicating the commander
- Deterministic weighted sampling via builder_utils helper
"""
def add_creatures(self): # noqa: C901 (complexity preserved during extraction)
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty:
self.output_func("Card pool not loaded; cannot add creatures.")
return
if 'type' not in df.columns:
self.output_func("Card pool missing 'type' column; cannot add creatures.")
return
themes_ordered: List[tuple[str, str]] = []
if self.primary_tag:
themes_ordered.append(('primary', self.primary_tag))
if self.secondary_tag:
themes_ordered.append(('secondary', self.secondary_tag))
if self.tertiary_tag:
themes_ordered.append(('tertiary', self.tertiary_tag))
if not themes_ordered:
self.output_func("No themes selected; skipping creature addition.")
return
desired_total = (self.ideal_counts.get('creatures') if getattr(self, 'ideal_counts', None) else None) or getattr(bc, 'DEFAULT_CREATURE_COUNT', 25)
n_themes = len(themes_ordered)
if n_themes == 1:
base_map = {'primary': 1.0}
elif n_themes == 2:
base_map = {'primary': 0.6, 'secondary': 0.4}
else:
base_map = {'primary': 0.5, 'secondary': 0.3, 'tertiary': 0.2}
weights: Dict[str, float] = {}
boosted_roles: set[str] = set()
if n_themes > 1:
for role, tag in themes_ordered:
w = base_map.get(role, 0.0)
lt = tag.lower()
if 'kindred' in lt or 'tribal' in lt:
mult = getattr(bc, 'WEIGHT_ADJUSTMENT_FACTORS', {}).get(f'kindred_{role}', 1.0)
w *= mult
boosted_roles.add(role)
weights[role] = w
total = sum(weights.values())
if total > 1.0:
for r in list(weights):
weights[r] /= total
else:
rem = 1.0 - total
base_sum_unboosted = sum(base_map[r] for r,_t in themes_ordered if r not in boosted_roles)
if rem > 1e-6 and base_sum_unboosted > 0:
for r,_t in themes_ordered:
if r not in boosted_roles:
weights[r] += rem * (base_map[r] / base_sum_unboosted)
else:
weights['primary'] = 1.0
creature_df = df[df['type'].str.contains('Creature', case=False, na=False)].copy()
commander_name = getattr(self, 'commander', None) or getattr(self, 'commander_name', None)
if commander_name and 'name' in creature_df.columns:
creature_df = creature_df[creature_df['name'] != commander_name]
if creature_df.empty:
self.output_func("No creature rows in dataset; skipping.")
return
selected_tags_lower = [t.lower() for _r,t in themes_ordered]
if '_parsedThemeTags' not in creature_df.columns:
creature_df['_parsedThemeTags'] = creature_df['themeTags'].apply(bu.normalize_tag_cell)
creature_df['_normTags'] = creature_df['_parsedThemeTags']
creature_df['_multiMatch'] = creature_df['_normTags'].apply(lambda lst: sum(1 for t in selected_tags_lower if t in lst))
base_top = 30
top_n = int(base_top * getattr(bc, 'THEME_POOL_SIZE_MULTIPLIER', 2.0))
synergy_bonus = getattr(bc, 'THEME_PRIORITY_BONUS', 1.2)
total_added = 0
added_names: List[str] = []
per_theme_added: Dict[str, List[str]] = {r: [] for r,_t in themes_ordered}
for role, tag in themes_ordered:
w = weights.get(role, 0.0)
if w <= 0:
continue
remaining = max(0, desired_total - total_added)
if remaining == 0:
break
target = int(math.ceil(desired_total * w * self._get_rng().uniform(1.0, 1.1)))
target = min(target, remaining)
if target <= 0:
continue
tnorm = tag.lower()
subset = creature_df[creature_df['_normTags'].apply(lambda lst, tn=tnorm: (tn in lst) or any(tn in x for x in lst))]
if subset.empty:
self.output_func(f"Theme '{tag}' produced no creature candidates.")
continue
if 'edhrecRank' in subset.columns:
subset = subset.sort_values(by=['_multiMatch','edhrecRank','manaValue'], ascending=[False, True, True], na_position='last')
elif 'manaValue' in subset.columns:
subset = subset.sort_values(by=['_multiMatch','manaValue'], ascending=[False, True], na_position='last')
pool = subset.head(top_n).copy()
pool = pool[~pool['name'].isin(added_names)]
if pool.empty:
continue
weighted_pool = [(nm, (synergy_bonus if mm >= 2 else 1.0)) for nm, mm in zip(pool['name'], pool['_multiMatch'])]
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
for nm in chosen:
if commander_name and nm == commander_name:
continue
row = pool[pool['name']==nm].iloc[0]
self.add_card(
nm,
card_type=row.get('type','Creature'),
mana_cost=row.get('manaCost',''),
mana_value=row.get('manaValue', row.get('cmc','')),
creature_types=row.get('creatureTypes', []) if isinstance(row.get('creatureTypes', []), list) else [],
tags=row.get('themeTags', []) if isinstance(row.get('themeTags', []), list) else [],
role='creature',
sub_role=role,
added_by='creature_add',
trigger_tag=tag,
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
)
added_names.append(nm)
per_theme_added[role].append(nm)
total_added += 1
if total_added >= desired_total:
break
self.output_func(f"Added {len(per_theme_added[role])} creatures for {role} theme '{tag}' (target {target}).")
if total_added >= desired_total:
break
if total_added < desired_total:
need = desired_total - total_added
multi_pool = creature_df[~creature_df['name'].isin(added_names)].copy()
multi_pool = multi_pool[multi_pool['_multiMatch'] > 0]
if not multi_pool.empty:
if 'edhrecRank' in multi_pool.columns:
multi_pool = multi_pool.sort_values(by=['_multiMatch','edhrecRank','manaValue'], ascending=[False, True, True], na_position='last')
elif 'manaValue' in multi_pool.columns:
multi_pool = multi_pool.sort_values(by=['_multiMatch','manaValue'], ascending=[False, True], na_position='last')
fill = multi_pool['name'].tolist()[:need]
for nm in fill:
if commander_name and nm == commander_name:
continue
row = multi_pool[multi_pool['name']==nm].iloc[0]
self.add_card(
nm,
card_type=row.get('type','Creature'),
mana_cost=row.get('manaCost',''),
mana_value=row.get('manaValue', row.get('cmc','')),
creature_types=row.get('creatureTypes', []) if isinstance(row.get('creatureTypes', []), list) else [],
tags=row.get('themeTags', []) if isinstance(row.get('themeTags', []), list) else [],
role='creature',
sub_role='fill',
added_by='creature_fill',
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
)
added_names.append(nm)
total_added += 1
if total_added >= desired_total:
break
self.output_func(f"Fill pass added {min(need, len(fill))} extra creatures (shortfall compensation).")
self.output_func("\nCreatures Added:")
for role, tag in themes_ordered:
lst = per_theme_added.get(role, [])
if lst:
self.output_func(f" {role.title()} '{tag}': {len(lst)}")
for nm in lst:
self.output_func(f" - {nm}")
else:
self.output_func(f" {role.title()} '{tag}': 0")
self.output_func(f" Total {total_added}/{desired_total}{' (dataset shortfall)' if total_added < desired_total else ''}")

View file

@ -0,0 +1,613 @@
from __future__ import annotations
import math
from typing import List, Dict
from .. import builder_utils as bu
from .. import builder_constants as bc
import logging_util
logger = logging_util.logging.getLogger(__name__)
class SpellAdditionMixin:
"""Phase 4: Non-creature spell additions (ramp, removal, wipes, draw, protection, thematic filler).
Extracted intact from monolithic builder. Logic intentionally unchanged; future refinements
(e.g., further per-category sub-mixins) can split this class if complexity grows.
"""
# ---------------------------
# Ramp
# ---------------------------
def add_ramp(self): # noqa: C901
"""Add ramp pieces in three phases: mana rocks (~1/3), mana dorks (~1/4), then general/other.
Selection is deterministic priority based: lowest edhrecRank then lowest mana value.
No theme weighting simple best-available filtering while avoiding duplicates.
"""
if not self._combined_cards_df is not None: # preserve original logic
return
target_total = self.ideal_counts.get('ramp', 0)
if target_total <= 0:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df
if 'name' not in df.columns:
return
work = df.copy()
work['_ltags'] = work.get('themeTags', []).apply(bu.normalize_tag_cell)
work = work[work['_ltags'].apply(lambda tags: any('ramp' in t for t in tags))]
if work.empty:
self.output_func('No ramp-tagged cards found in dataset.')
return
existing_ramp = 0
for name, entry in self.card_library.items():
if any(isinstance(t, str) and 'ramp' in t.lower() for t in entry.get('Tags', [])):
existing_ramp += 1
to_add, _bonus = bu.compute_adjusted_target('Ramp', target_total, existing_ramp, self.output_func, plural_word='ramp spells')
if existing_ramp >= target_total and to_add == 0:
return
if existing_ramp < target_total:
target_total = to_add
else:
target_total = to_add
work = work[~work['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
work = work[work['name'] != commander_name]
work = bu.sort_by_priority(work, ['edhrecRank','manaValue'])
rocks_target = min(target_total, math.ceil(target_total/3))
dorks_target = min(target_total - rocks_target, math.ceil(target_total/4))
added_rocks: List[str] = []
added_dorks: List[str] = []
added_general: List[str] = []
def add_from_pool(pool, remaining_needed, added_list, phase_name):
added_now = 0
for _, r in pool.iterrows():
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='ramp',
sub_role=phase_name.lower(),
added_by='spell_ramp'
)
already.add(nm.lower())
added_list.append(nm)
added_now += 1
if added_now >= remaining_needed:
break
if added_now:
self.output_func(f"Ramp phase {phase_name}: added {added_now}/{remaining_needed} target.")
return added_now
rocks_pool = work[work['type'].fillna('').str.contains('Artifact', case=False, na=False)]
if rocks_target > 0:
add_from_pool(rocks_pool, rocks_target, added_rocks, 'Rocks')
dorks_pool = work[work['type'].fillna('').str.contains('Creature', case=False, na=False)]
if dorks_target > 0:
add_from_pool(dorks_pool, dorks_target, added_dorks, 'Dorks')
current_total = len(added_rocks) + len(added_dorks)
remaining = target_total - current_total
if remaining > 0:
general_pool = work[~work['name'].isin(added_rocks + added_dorks)]
add_from_pool(general_pool, remaining, added_general, 'General')
total_added_now = len(added_rocks)+len(added_dorks)+len(added_general)
self.output_func(f"Total Ramp Added This Pass: {total_added_now}/{target_total}")
if total_added_now < target_total:
self.output_func('Ramp shortfall due to limited dataset.')
if total_added_now:
self.output_func("Ramp Cards Added:")
for nm in added_rocks:
self.output_func(f" [Rock] {nm}")
for nm in added_dorks:
self.output_func(f" [Dork] {nm}")
for nm in added_general:
self.output_func(f" [General] {nm}")
# ---------------------------
# Removal
# ---------------------------
def add_removal(self): # noqa: C901
target = self.ideal_counts.get('removal', 0)
if target <= 0 or self._combined_cards_df is None:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
if 'name' not in df.columns:
return
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_removal(tags):
return any('removal' in t or 'spot removal' in t for t in tags)
def is_wipe(tags):
return any('board wipe' in t or 'mass removal' in t for t in tags)
pool = df[df['_ltags'].apply(is_removal) & ~df['_ltags'].apply(is_wipe)]
pool = pool[~pool['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
existing = 0
for name, entry in self.card_library.items():
lt = [str(t).lower() for t in entry.get('Tags', [])]
if any(('removal' in t or 'spot removal' in t) for t in lt) and not any(('board wipe' in t or 'mass removal' in t) for t in lt):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Removal', target, existing, self.output_func, plural_word='removal spells')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names: List[str] = []
for _, r in pool.iterrows():
if added >= target:
break
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='removal',
sub_role='spot',
added_by='spell_removal'
)
already.add(nm.lower())
added += 1
added_names.append(nm)
self.output_func(f"Added Spot Removal This Pass: {added}/{target}{' (dataset shortfall)' if added < target else ''}")
if added_names:
self.output_func('Removal Cards Added:')
for nm in added_names:
self.output_func(f" - {nm}")
# ---------------------------
# Board Wipes
# ---------------------------
def add_board_wipes(self):
target = self.ideal_counts.get('wipes', 0)
if target <= 0 or self._combined_cards_df is None:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_wipe(tags):
return any('board wipe' in t or 'mass removal' in t for t in tags)
pool = df[df['_ltags'].apply(is_wipe)]
pool = pool[~pool['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('board wipe' in t or 'mass removal' in t) for t in tags):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Board wipe', target, existing, self.output_func, plural_word='wipes')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names: List[str] = []
for _, r in pool.iterrows():
if added >= target:
break
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='wipe',
sub_role='board',
added_by='spell_wipe'
)
already.add(nm.lower())
added += 1
added_names.append(nm)
self.output_func(f"Added Board Wipes This Pass: {added}/{target}{' (dataset shortfall)' if added < target else ''}")
if added_names:
self.output_func('Board Wipes Added:')
for nm in added_names:
self.output_func(f" - {nm}")
# ---------------------------
# Card Advantage
# ---------------------------
def add_card_advantage(self): # noqa: C901
total_target = self.ideal_counts.get('card_advantage', 0)
if total_target <= 0 or self._combined_cards_df is None:
return
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('draw' in t) or ('card advantage' in t) for t in tags):
existing += 1
to_add_total, _bonus = bu.compute_adjusted_target('Card advantage', total_target, existing, self.output_func, plural_word='draw spells')
if existing >= total_target and to_add_total == 0:
return
total_target = to_add_total if existing < total_target else to_add_total
conditional_target = min(total_target, math.ceil(total_target * 0.2))
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_draw(tags):
return any(('draw' in t) or ('card advantage' in t) for t in tags)
df = df[df['_ltags'].apply(is_draw)]
df = df[~df['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
df = df[df['name'] != commander_name]
CONDITIONAL_KEYS = ['conditional', 'situational', 'attacks', 'combat damage', 'when you cast']
def is_conditional(tags):
return any(any(k in t for k in CONDITIONAL_KEYS) for t in tags)
conditional_df = df[df['_ltags'].apply(is_conditional)]
unconditional_df = df[~df.index.isin(conditional_df.index)]
def sortit(d):
return bu.sort_by_priority(d, ['edhrecRank','manaValue'])
conditional_df = sortit(conditional_df)
unconditional_df = sortit(unconditional_df)
added_cond = 0
added_cond_names: List[str] = []
for _, r in conditional_df.iterrows():
if added_cond >= conditional_target:
break
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='card_advantage',
sub_role='conditional',
added_by='spell_draw'
)
already.add(nm.lower())
added_cond += 1
added_cond_names.append(nm)
remaining = total_target - added_cond
added_uncond = 0
added_uncond_names: List[str] = []
if remaining > 0:
for _, r in unconditional_df.iterrows():
if added_uncond >= remaining:
break
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='card_advantage',
sub_role='unconditional',
added_by='spell_draw'
)
already.add(nm.lower())
added_uncond += 1
added_uncond_names.append(nm)
self.output_func(f"Added Card Advantage This Pass: conditional {added_cond}/{conditional_target}, total {(added_cond+added_uncond)}/{total_target}{' (dataset shortfall)' if (added_cond+added_uncond) < total_target else ''}")
if added_cond_names or added_uncond_names:
self.output_func('Card Advantage Cards Added:')
for nm in added_cond_names:
self.output_func(f" [Conditional] {nm}")
for nm in added_uncond_names:
self.output_func(f" [Unconditional] {nm}")
# ---------------------------
# Protection
# ---------------------------
def add_protection(self):
target = self.ideal_counts.get('protection', 0)
if target <= 0 or self._combined_cards_df is None:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
pool = df[df['_ltags'].apply(lambda tags: any('protection' in t for t in tags))]
pool = pool[~pool['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any('protection' in t for t in tags):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Protection', target, existing, self.output_func, plural_word='protection spells')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names: List[str] = []
for _, r in pool.iterrows():
if added >= target:
break
nm = r['name']
if nm.lower() in already:
continue
self.add_card(
nm,
card_type=r.get('type',''),
mana_cost=r.get('manaCost',''),
mana_value=r.get('manaValue', r.get('cmc','')),
tags=r.get('themeTags', []) if isinstance(r.get('themeTags', []), list) else [],
role='protection',
added_by='spell_protection'
)
already.add(nm.lower())
added += 1
added_names.append(nm)
self.output_func(f"Added Protection This Pass: {added}/{target}{' (dataset shortfall)' if added < target else ''}")
if added_names:
self.output_func('Protection Cards Added:')
for nm in added_names:
self.output_func(f" - {nm}")
# ---------------------------
# Theme Spell Filler to 100
# ---------------------------
def fill_remaining_theme_spells(self): # noqa: C901
total_cards = sum(entry.get('Count', 1) for entry in self.card_library.values())
remaining = 100 - total_cards
if remaining <= 0:
return
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty or 'type' not in df.columns:
return
themes_ordered: List[tuple[str, str]] = []
if self.primary_tag:
themes_ordered.append(('primary', self.primary_tag))
if self.secondary_tag:
themes_ordered.append(('secondary', self.secondary_tag))
if self.tertiary_tag:
themes_ordered.append(('tertiary', self.tertiary_tag))
if not themes_ordered:
return
n_themes = len(themes_ordered)
if n_themes == 1:
base_map = {'primary': 1.0}
elif n_themes == 2:
base_map = {'primary': 0.6, 'secondary': 0.4}
else:
base_map = {'primary': 0.5, 'secondary': 0.3, 'tertiary': 0.2}
weights: Dict[str, float] = {}
boosted: set[str] = set()
if n_themes > 1:
for role, tag in themes_ordered:
w = base_map.get(role, 0.0)
lt = tag.lower()
if 'kindred' in lt or 'tribal' in lt:
mult = getattr(bc, 'WEIGHT_ADJUSTMENT_FACTORS', {}).get(f'kindred_{role}', 1.0)
w *= mult
boosted.add(role)
weights[role] = w
tot = sum(weights.values())
if tot > 1.0:
for r in weights:
weights[r] /= tot
else:
rem = 1.0 - tot
base_sum_unboosted = sum(base_map[r] for r, _ in themes_ordered if r not in boosted)
if rem > 1e-6 and base_sum_unboosted > 0:
for r, _ in themes_ordered:
if r not in boosted:
weights[r] += rem * (base_map[r] / base_sum_unboosted)
else:
weights['primary'] = 1.0
spells_df = df[
~df['type'].str.contains('Land', case=False, na=False)
& ~df['type'].str.contains('Creature', case=False, na=False)
].copy()
if spells_df.empty:
return
selected_tags_lower = [t.lower() for _r, t in themes_ordered]
if '_parsedThemeTags' not in spells_df.columns:
spells_df['_parsedThemeTags'] = spells_df['themeTags'].apply(bu.normalize_tag_cell)
spells_df['_normTags'] = spells_df['_parsedThemeTags']
spells_df['_multiMatch'] = spells_df['_normTags'].apply(
lambda lst: sum(1 for t in selected_tags_lower if t in lst)
)
base_top = 40
top_n = int(base_top * getattr(bc, 'THEME_POOL_SIZE_MULTIPLIER', 2.0))
synergy_bonus = getattr(bc, 'THEME_PRIORITY_BONUS', 1.2)
per_theme_added: Dict[str, List[str]] = {r: [] for r, _t in themes_ordered}
total_added = 0
for role, tag in themes_ordered:
if remaining - total_added <= 0:
break
w = weights.get(role, 0.0)
if w <= 0:
continue
target = int(math.ceil(remaining * w * self._get_rng().uniform(1.0, 1.1)))
target = min(target, remaining - total_added)
if target <= 0:
continue
tnorm = tag.lower()
subset = spells_df[
spells_df['_normTags'].apply(
lambda lst, tn=tnorm: (tn in lst) or any(tn in x for x in lst)
)
]
if subset.empty:
continue
if 'edhrecRank' in subset.columns:
subset = subset.sort_values(
by=['_multiMatch', 'edhrecRank', 'manaValue'],
ascending=[False, True, True],
na_position='last',
)
elif 'manaValue' in subset.columns:
subset = subset.sort_values(
by=['_multiMatch', 'manaValue'],
ascending=[False, True],
na_position='last',
)
pool = subset.head(top_n).copy()
pool = pool[~pool['name'].isin(self.card_library.keys())]
if pool.empty:
continue
weighted_pool = [ (nm, (synergy_bonus if mm >= 2 else 1.0)) for nm, mm in zip(pool['name'], pool['_multiMatch']) ]
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
for nm in chosen:
row = pool[pool['name'] == nm].iloc[0]
self.add_card(
nm,
card_type=row.get('type', ''),
mana_cost=row.get('manaCost', ''),
mana_value=row.get('manaValue', row.get('cmc', '')),
tags=row.get('themeTags', []) if isinstance(row.get('themeTags', []), list) else [],
role='theme_spell',
sub_role=role,
added_by='spell_theme_fill',
trigger_tag=tag,
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
)
per_theme_added[role].append(nm)
total_added += 1
if total_added >= remaining:
break
if total_added < remaining:
need = remaining - total_added
multi_pool = spells_df[~spells_df['name'].isin(self.card_library.keys())].copy()
multi_pool = multi_pool[multi_pool['_multiMatch'] > 0]
if not multi_pool.empty:
if 'edhrecRank' in multi_pool.columns:
multi_pool = multi_pool.sort_values(
by=['_multiMatch', 'edhrecRank', 'manaValue'],
ascending=[False, True, True],
na_position='last',
)
elif 'manaValue' in multi_pool.columns:
multi_pool = multi_pool.sort_values(
by=['_multiMatch', 'manaValue'],
ascending=[False, True],
na_position='last',
)
fill = multi_pool['name'].tolist()[:need]
for nm in fill:
row = multi_pool[multi_pool['name'] == nm].iloc[0]
self.add_card(
nm,
card_type=row.get('type', ''),
mana_cost=row.get('manaCost', ''),
mana_value=row.get('manaValue', row.get('cmc', '')),
tags=row.get('themeTags', []) if isinstance(row.get('themeTags', []), list) else [],
role='theme_spell',
sub_role='fill_multi',
added_by='spell_theme_fill',
synergy=int(row.get('_multiMatch', 0)) if '_multiMatch' in row else None
)
total_added += 1
if total_added >= remaining:
break
if total_added < remaining:
extra_needed = remaining - total_added
leftover = spells_df[~spells_df['name'].isin(self.card_library.keys())].copy()
if not leftover.empty:
if '_normTags' not in leftover.columns:
leftover['_normTags'] = leftover['themeTags'].apply(
lambda x: [str(t).lower() for t in x] if isinstance(x, list) else []
)
def has_any(tag_list, needles):
return any(any(nd in t for nd in needles) for t in tag_list)
def classify(row):
tags = row['_normTags']
if has_any(tags, ['ramp']):
return 'ramp'
if has_any(tags, ['card advantage', 'draw']):
return 'card_advantage'
if has_any(tags, ['protection']):
return 'protection'
if has_any(tags, ['board wipe', 'mass removal']):
return 'board_wipe'
if has_any(tags, ['removal']):
return 'removal'
return ''
leftover['_fillerCat'] = leftover.apply(classify, axis=1)
random_added: List[str] = []
for _ in range(extra_needed):
candidates_by_cat: Dict[str, any] = {}
for cat in ['ramp','card_advantage','protection','board_wipe','removal']:
subset = leftover[leftover['_fillerCat'] == cat]
if not subset.empty:
candidates_by_cat[cat] = subset
if not candidates_by_cat:
subset = leftover
else:
cat_choice = self._get_rng().choice(list(candidates_by_cat.keys()))
subset = candidates_by_cat[cat_choice]
if 'edhrecRank' in subset.columns:
subset = subset.sort_values(by=['edhrecRank','manaValue'], ascending=[True, True], na_position='last')
elif 'manaValue' in subset.columns:
subset = subset.sort_values(by=['manaValue'], ascending=[True], na_position='last')
row = subset.head(1)
if row.empty:
break
r0 = row.iloc[0]
nm = r0['name']
self.add_card(
nm,
card_type=r0.get('type',''),
mana_cost=r0.get('manaCost',''),
mana_value=r0.get('manaValue', r0.get('cmc','')),
tags=r0.get('themeTags', []) if isinstance(r0.get('themeTags', []), list) else [],
role='filler',
sub_role=r0.get('_fillerCat',''),
added_by='spell_general_filler'
)
random_added.append(nm)
leftover = leftover[leftover['name'] != nm]
total_added += 1
if total_added >= remaining:
break
if random_added:
self.output_func(" General Utility Filler Added:")
for nm in random_added:
self.output_func(f" - {nm}")
if total_added:
self.output_func("\nFinal Theme Spell Fill:")
for role, tag in themes_ordered:
lst = per_theme_added.get(role, [])
if lst:
self.output_func(f" {role.title()} '{tag}': {len(lst)}")
for nm in lst:
self.output_func(f" - {nm}")
self.output_func(f" Total Theme Spells Added: {total_added}")
# ---------------------------
# Orchestrator
# ---------------------------
def add_non_creature_spells(self):
"""Convenience orchestrator calling remaining non-creature spell categories then thematic fill."""
self.add_ramp()
self.add_removal()
self.add_board_wipes()
self.add_card_advantage()
self.add_protection()
self.fill_remaining_theme_spells()
self.print_type_summary()

View file

@ -0,0 +1,188 @@
from __future__ import annotations
from typing import Dict, Optional, List
import logging_util
from .. import builder_utils as bu
from .. import builder_constants as bc # noqa: F401 (future use / constants reference)
logger = logging_util.logging.getLogger(__name__)
class ColorBalanceMixin:
"""Phase 5A: Post-spell color source analysis & basic land rebalance.
Provides helper computations for color source matrix & spell pip weights plus
the post-spell adjustment routine that can (optionally) swap lands and
rebalance basics to better align mana sources with spell pip demand.
"""
# ---------------------------
# Color / pip computation helpers (cached)
# ---------------------------
def _compute_color_source_matrix(self) -> Dict[str, Dict[str,int]]:
if self._color_source_matrix_cache is not None and not self._color_source_cache_dirty:
return self._color_source_matrix_cache
matrix = bu.compute_color_source_matrix(self.card_library, getattr(self, '_full_cards_df', None))
self._color_source_matrix_cache = matrix
self._color_source_cache_dirty = False
return matrix
def _compute_spell_pip_weights(self) -> Dict[str, float]:
if self._spell_pip_weights_cache is not None and not self._spell_pip_cache_dirty:
return self._spell_pip_weights_cache
weights = bu.compute_spell_pip_weights(self.card_library, self.color_identity)
self._spell_pip_weights_cache = weights
self._spell_pip_cache_dirty = False
return weights
def _current_color_source_counts(self) -> Dict[str,int]:
matrix = self._compute_color_source_matrix()
counts = {c:0 for c in ['W','U','B','R','G']}
for name, colors in matrix.items():
entry = self.card_library.get(name, {})
copies = entry.get('Count',1)
for c, v in colors.items():
if v:
counts[c] += copies
return counts
# ---------------------------
# Post-spell land adjustment & basic rebalance
# ---------------------------
def post_spell_land_adjust(self,
pip_weights: Optional[Dict[str,float]] = None,
color_shortfall_threshold: float = 0.15,
perform_swaps: bool = True,
max_swaps: int = 5,
rebalance_basics: bool = True): # noqa: C901
if pip_weights is None:
pip_weights = self._compute_spell_pip_weights()
if self.color_source_matrix_baseline is None:
self.color_source_matrix_baseline = self._compute_color_source_matrix()
current_counts = self._current_color_source_counts()
total_sources = sum(current_counts.values()) or 1
source_share = {c: current_counts[c]/total_sources for c in current_counts}
deficits: List[tuple[str,float,float,float]] = []
for c in ['W','U','B','R','G']:
pip_share = pip_weights.get(c,0.0)
s_share = source_share.get(c,0.0)
gap = pip_share - s_share
if gap > color_shortfall_threshold and pip_share > 0.0:
deficits.append((c,pip_share,s_share,gap))
self.output_func("\nPost-Spell Color Distribution Analysis:")
self.output_func(" Color | Pip% | Source% | Diff%")
for c in ['W','U','B','R','G']:
self.output_func(f" {c:>1} {pip_weights.get(c,0.0)*100:5.1f}% {source_share.get(c,0.0)*100:6.1f}% {(pip_weights.get(c,0.0)-source_share.get(c,0.0))*100:6.1f}%")
if not deficits:
self.output_func(" No color deficits above threshold.")
else:
self.output_func(" Deficits (need more sources):")
for c, pip_share, s_share, gap in deficits:
self.output_func(f" {c}: need +{gap*100:.1f}% sources (pip {pip_share*100:.1f}% vs sources {s_share*100:.1f}%)")
if not perform_swaps or not deficits:
self.output_func(" (No land swaps performed.)")
return
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty:
self.output_func(" Swap engine: card pool unavailable; aborting swaps.")
return
deficits.sort(key=lambda x: x[3], reverse=True)
swaps_done: List[tuple[str,str,str]] = []
overages: Dict[str,float] = {}
for c in ['W','U','B','R','G']:
over = source_share.get(c,0.0) - pip_weights.get(c,0.0)
if over > 0:
overages[c] = over
def removal_candidate(exclude_colors: set[str]) -> Optional[str]:
return bu.select_color_balance_removal(self, exclude_colors, overages)
def addition_candidates(target_color: str) -> List[str]:
return bu.color_balance_addition_candidates(self, target_color, df)
for color, _, _, gap in deficits:
if len(swaps_done) >= max_swaps:
break
adds = addition_candidates(color)
if not adds:
continue
to_add = None
for cand in adds:
if cand not in self.card_library:
to_add = cand
break
if not to_add:
continue
to_remove = removal_candidate({color})
if not to_remove:
continue
if not self._decrement_card(to_remove):
continue
self.add_card(to_add, card_type='Land', role='color-fix', sub_role='swap-add', added_by='color_balance')
swaps_done.append((to_remove, to_add, color))
current_counts = self._current_color_source_counts()
total_sources = sum(current_counts.values()) or 1
source_share = {c: current_counts[c]/total_sources for c in current_counts}
new_gap = pip_weights.get(color,0.0) - source_share.get(color,0.0)
if new_gap <= color_shortfall_threshold:
continue
if swaps_done:
self.output_func("\nColor Balance Swaps Performed:")
for old, new, col in swaps_done:
self.output_func(f" [{col}] Replaced {old} -> {new}")
final_counts = self._current_color_source_counts()
final_total = sum(final_counts.values()) or 1
final_source_share = {c: final_counts[c]/final_total for c in final_counts}
self.output_func(" Updated Source Shares:")
for c in ['W','U','B','R','G']:
self.output_func(f" {c}: {final_source_share.get(c,0.0)*100:5.1f}% (pip {pip_weights.get(c,0.0)*100:5.1f}%)")
if rebalance_basics:
try:
basic_map = getattr(bc, 'COLOR_TO_BASIC_LAND', {})
basics_present = {nm: entry for nm, entry in self.card_library.items() if nm in basic_map.values()}
if basics_present:
total_basics = sum(e.get('Count',1) for e in basics_present.values())
if total_basics > 0:
desired_per_color: Dict[str,int] = {}
for c, basic_name in basic_map.items():
if c not in ['W','U','B','R','G']:
continue
desired = pip_weights.get(c,0.0) * total_basics
desired_per_color[c] = int(round(desired))
drift = total_basics - sum(desired_per_color.values())
if drift != 0:
ordered = sorted(desired_per_color.items(), key=lambda kv: pip_weights.get(kv[0],0.0), reverse=(drift>0))
i = 0
while drift != 0 and ordered:
c,_ = ordered[i % len(ordered)]
desired_per_color[c] += 1 if drift>0 else -1
drift += -1 if drift>0 else 1
i += 1
changes: List[tuple[str,int,int]] = []
for c, basic_name in basic_map.items():
if c not in ['W','U','B','R','G']:
continue
target = max(0, desired_per_color.get(c,0))
entry = self.card_library.get(basic_name)
old = entry.get('Count',0) if entry else 0
if old == 0 and target>0:
for _ in range(target):
self.add_card(basic_name, card_type='Land')
changes.append((basic_name, 0, target))
elif entry and old != target:
if target > old:
for _ in range(target-old):
self.add_card(basic_name, card_type='Land')
else:
for _ in range(old-target):
self._decrement_card(basic_name)
changes.append((basic_name, old, target))
if changes:
self.output_func("\nBasic Land Rebalance (toward pip distribution):")
for nm, old, new in changes:
self.output_func(f" {nm}: {old} -> {new}")
except Exception as e: # pragma: no cover (defensive)
self.output_func(f" Basic rebalance skipped (error: {e})")
else:
self.output_func(" (No viable swaps executed.)")

View file

@ -0,0 +1,218 @@
from __future__ import annotations
from typing import Dict, List
import csv
import os
import datetime as _dt
import re as _re
import logging_util
logger = logging_util.logging.getLogger(__name__)
try:
from prettytable import PrettyTable # type: ignore
except Exception: # pragma: no cover
PrettyTable = None # type: ignore
class ReportingMixin:
"""Phase 6: Reporting, summaries, and export helpers."""
def _wrap_cell(self, text: str, width: int = 28) -> str:
words = text.split()
lines: List[str] = []
current_line = []
current_len = 0
for w in words:
if current_len + len(w) + (1 if current_line else 0) > width:
lines.append(' '.join(current_line))
current_line = [w]
current_len = len(w)
else:
current_line.append(w)
current_len += len(w) + (1 if len(current_line) > 1 else 0)
if current_line:
lines.append(' '.join(current_line))
return '\n'.join(lines)
def print_type_summary(self):
type_counts: Dict[str,int] = {}
for name, info in self.card_library.items():
ctype = info.get('Type', 'Unknown')
cnt = info.get('Count',1)
type_counts[ctype] = type_counts.get(ctype,0) + cnt
total_cards = sum(type_counts.values())
self.output_func("\nType Summary:")
for t, c in sorted(type_counts.items(), key=lambda kv: (-kv[1], kv[0])):
self.output_func(f" {t:<15} {c:>3} ({(c/total_cards*100 if total_cards else 0):5.1f}%)")
def export_decklist_csv(self, directory: str = 'deck_files', filename: str | None = None) -> str:
"""Export current decklist to CSV (enriched).
Filename pattern (default): commanderFirstWord_firstTheme_YYYYMMDD.csv
Included columns (enriched when possible):
Name, Count, Type, ManaCost, ManaValue, Colors, Power, Toughness, Role, Tags, Text
Falls back gracefully if snapshot rows missing.
"""
os.makedirs(directory, exist_ok=True)
if filename is None:
cmdr = getattr(self, 'commander_name', '') or getattr(self, 'commander', '') or ''
cmdr_first = cmdr.split()[0] if cmdr else 'deck'
theme = getattr(self, 'primary_tag', None) or (self.selected_tags[0] if getattr(self, 'selected_tags', []) else None)
theme_first = str(theme).split()[0] if theme else 'notheme'
def _slug(s: str) -> str:
s2 = _re.sub(r'[^A-Za-z0-9_]+', '', s)
return s2 or 'x'
cmdr_slug = _slug(cmdr_first)
theme_slug = _slug(theme_first)
date_part = _dt.date.today().strftime('%Y%m%d')
filename = f"{cmdr_slug}_{theme_slug}_{date_part}.csv"
fname = os.path.join(directory, filename)
full_df = getattr(self, '_full_cards_df', None)
combined_df = getattr(self, '_combined_cards_df', None)
snapshot = full_df if full_df is not None else combined_df
row_lookup: Dict[str, any] = {}
if snapshot is not None and not snapshot.empty and 'name' in snapshot.columns:
for _, r in snapshot.iterrows():
nm = str(r.get('name'))
if nm not in row_lookup:
row_lookup[nm] = r
headers = [
"Name","Count","Type","ManaCost","ManaValue","Colors","Power","Toughness",
"Role","SubRole","AddedBy","TriggerTag","Synergy","Tags","Text"
]
# Precedence list for sorting
precedence_order = [
'Commander', 'Battle', 'Planeswalker', 'Creature', 'Instant', 'Sorcery', 'Artifact', 'Enchantment', 'Land'
]
precedence_index = {k: i for i, k in enumerate(precedence_order)}
commander_name = getattr(self, 'commander_name', '') or getattr(self, 'commander', '') or ''
def classify(primary_type_line: str, card_name: str) -> str:
if commander_name and card_name == commander_name:
return 'Commander'
tl = (primary_type_line or '').lower()
if 'battle' in tl:
return 'Battle'
if 'planeswalker' in tl:
return 'Planeswalker'
if 'creature' in tl:
return 'Creature'
if 'instant' in tl:
return 'Instant'
if 'sorcery' in tl:
return 'Sorcery'
if 'artifact' in tl:
return 'Artifact'
if 'enchantment' in tl:
return 'Enchantment'
if 'land' in tl:
return 'Land'
return 'ZZZ'
rows: List[tuple] = [] # (sort_key, row_data)
for name, info in self.card_library.items():
base_type = info.get('Card Type') or info.get('Type','')
base_mc = info.get('Mana Cost','')
base_mv = info.get('Mana Value', info.get('CMC',''))
role = info.get('Role','') or ''
tags = info.get('Tags',[]) or []
tags_join = '; '.join(tags)
text_field = ''
colors = ''
power = ''
toughness = ''
row = row_lookup.get(name)
if row is not None:
row_type = row.get('type', row.get('type_line', ''))
if row_type:
base_type = row_type
mc = row.get('manaCost', '')
if mc:
base_mc = mc
mv = row.get('manaValue', row.get('cmc', ''))
if mv not in (None, ''):
base_mv = mv
colors_raw = row.get('colorIdentity', row.get('colors', []))
if isinstance(colors_raw, list):
colors = ''.join(colors_raw)
elif colors_raw not in (None, ''):
colors = str(colors_raw)
power = row.get('power', '') or ''
toughness = row.get('toughness', '') or ''
text_field = row.get('text', row.get('oracleText', '')) or ''
# Normalize and coerce text
if isinstance(text_field, str):
cleaned = text_field
else:
try:
import math as _math
if isinstance(text_field, float) and (_math.isnan(text_field)):
cleaned = ''
else:
cleaned = str(text_field) if text_field is not None else ''
except Exception:
cleaned = str(text_field) if text_field is not None else ''
cleaned = cleaned.replace('\n', ' ').replace('\r', ' ')
while ' ' in cleaned:
cleaned = cleaned.replace(' ', ' ')
text_field = cleaned
cat = classify(base_type, name)
prec = precedence_index.get(cat, 999)
# Alphabetical within category (no mana value sorting)
rows.append(((prec, name.lower()), [
name,
info.get('Count',1),
base_type,
base_mc,
base_mv,
colors,
power,
toughness,
info.get('Role') or role,
info.get('SubRole') or '',
info.get('AddedBy') or '',
info.get('TriggerTag') or '',
info.get('Synergy') if info.get('Synergy') is not None else '',
tags_join,
text_field[:800] if isinstance(text_field, str) else str(text_field)[:800]
]))
# Now sort (category precedence, then alphabetical name)
rows.sort(key=lambda x: x[0])
with open(fname, 'w', newline='', encoding='utf-8') as f:
w = csv.writer(f)
w.writerow(headers)
for _, data_row in rows:
w.writerow(data_row)
self.output_func(f"Deck exported to {fname}")
return fname
def print_card_library(self, table: bool = True): # noqa: C901
if table and PrettyTable is None:
table = False
if not table:
self.output_func("\nCard Library:")
for name, info in sorted(self.card_library.items()):
self.output_func(f" {info.get('Count',1)}x {name} [{info.get('Type','')}] ({info.get('Role','')})")
return
# PrettyTable mode
pt = PrettyTable()
pt.field_names = ["Name","Count","Type","CMC","Role","Tags","Notes"]
pt.align = 'l'
for name, info in sorted(self.card_library.items()):
pt.add_row([
self._wrap_cell(name),
info.get('Count',1),
info.get('Type',''),
info.get('CMC',''),
self._wrap_cell(info.get('Role','')),
self._wrap_cell(','.join(info.get('Tags',[]) or [])),
self._wrap_cell(info.get('SourceNotes',''))
])
self.output_func("\nCard Library (tabular):")
self.output_func(pt.get_string())

View file

@ -17,7 +17,7 @@ Indices correspond to the numbered tag list presented during interaction.
"""
def run(
command_name: str = "Rocco, Street Chef",
command_name: str = "Pantlaza",
add_creatures: bool = True,
add_non_creature_spells: bool = True,
# Fine-grained toggles (used only if add_non_creature_spells is False)
@ -27,9 +27,9 @@ def run(
add_card_advantage: bool = True,
add_protection: bool = True,
use_multi_theme: bool = True,
primary_choice: int = 9,
secondary_choice: Optional[int] = 1,
tertiary_choice: Optional[int] = 11,
primary_choice: int = 2,
secondary_choice: Optional[int] = 2,
tertiary_choice: Optional[int] = 2,
add_lands: bool = True,
fetch_count: Optional[int] = 3,
dual_count: Optional[int] = None,
@ -62,7 +62,7 @@ def run(
else:
scripted_inputs.append("0") # stop at primary
# Bracket (meta power / style) selection; keeping existing scripted value
scripted_inputs.append("5")
scripted_inputs.append("3")
# Ideal count prompts (press Enter for defaults)
for _ in range(8):
scripted_inputs.append("")