Enacted some cleanup of builder.py. moved Imports to the top and cut out duplicate code/moved it to builder_utils.py

This commit is contained in:
mwisnowski 2025-08-19 15:00:28 -07:00
parent b7ee6ea57d
commit ff1912f979
5 changed files with 201 additions and 333 deletions

View file

@ -3,10 +3,20 @@ from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Callable, Tuple
import pandas as pd
import math
import random
import re
import datetime
import os
import csv
import textwrap
from . import builder_constants as bc
from . import builder_utils as bu
# Attempt to use a fast fuzzy library; fall back gracefully
try:
from rapidfuzz import process as rf_process, fuzz as rf_fuzz
_FUZZ_BACKEND = "rapidfuzz"
@ -186,6 +196,9 @@ class DeckBuilder:
# IO injection for testing
input_func: Callable[[str], str] = field(default=lambda prompt: input(prompt))
output_func: Callable[[str], None] = field(default=lambda msg: print(msg))
# Deterministic random support
seed: Optional[int] = None
_rng: Any = field(default=None, repr=False)
# ---------------------------
# High-level Orchestration
@ -228,6 +241,15 @@ class DeckBuilder:
except Exception as e:
self.output_func(f"Deck build failed: {e}")
# ---------------------------
# RNG Initialization
# ---------------------------
def _get_rng(self): # lazy init to allow seed set post-construction
if self._rng is None:
import random as _r
self._rng = _r.Random(self.seed) if self.seed is not None else _r
return self._rng
# ---------------------------
# Data Loading
# ---------------------------
@ -349,7 +371,6 @@ class DeckBuilder:
# Pretty Display
# ---------------------------
def _format_commander_pretty(self, row: pd.Series) -> str:
import math
def norm(val):
if isinstance(val, list) and len(val) == 1:
@ -620,7 +641,6 @@ class DeckBuilder:
try:
if '{' in mana_cost and '}' in mana_cost:
# naive parse: digits add numeric value; individual colored symbols count as 1
import re
symbols = re.findall(r'\{([^}]+)\}', mana_cost)
total = 0
for sym in symbols:
@ -1437,7 +1457,6 @@ class DeckBuilder:
if desired <= 0:
self.output_func("Fetch Lands: No capacity (after trimming) or desired reduced to 0; skipping.")
return
import random
rng = getattr(self, 'rng', None)
color_specific_all: list[str] = []
for c in color_order:
@ -1496,7 +1515,6 @@ class DeckBuilder:
future tuning (e.g., dynamic by color count, bracket, or pip distribution) only
needs a single change. min_basic_cfg already accounts for ideal_counts override.
"""
import math
try:
return max(0, int(math.ceil(bc.BASIC_FLOOR_FACTOR * float(min_basic_cfg))))
except Exception:
@ -1604,18 +1622,16 @@ class DeckBuilder:
# This prevents always selecting the exact same first few duals when
# capacity is limited (e.g., consistently only the top 4 of 7 available).
if len(names) > 1:
rng_obj = getattr(self, 'rng', None)
try:
rng_obj = getattr(self, 'rng', None)
weighted: list[tuple[str, int]] = []
for n in names:
# Base weight derived from rank() (ensure >=1) and mildly amplified
w = max(1, rank(n)) + 1
weighted.append((n, w))
shuffled: list[str] = []
import random as _rand
while weighted:
total = sum(w for _, w in weighted)
r = (rng_obj.random() if rng_obj else _rand.random()) * total
r = (rng_obj.random() if rng_obj else self._get_rng().random()) * total
acc = 0.0
for idx, (n, w) in enumerate(weighted):
acc += w
@ -1625,11 +1641,10 @@ class DeckBuilder:
break
pair_buckets[key] = shuffled
except Exception:
pair_buckets[key] = names # fallback to deterministic order
pair_buckets[key] = names
else:
pair_buckets[key] = names
import random
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and self.ideal_counts:
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
@ -1812,14 +1827,13 @@ class DeckBuilder:
for key, names in tri_buckets.items():
names.sort(key=lambda n: rank(n), reverse=True)
if len(names) > 1:
rng_obj = getattr(self, 'rng', None)
try:
rng_obj = getattr(self, 'rng', None)
weighted = [(n, max(1, rank(n))+1) for n in names]
import random as _rand
shuffled: list[str] = []
while weighted:
total = sum(w for _, w in weighted)
r = (rng_obj.random() if rng_obj else _rand.random()) * total
r = (rng_obj.random() if rng_obj else self._get_rng().random()) * total
acc = 0.0
for idx, (n, w) in enumerate(weighted):
acc += w
@ -1832,7 +1846,6 @@ class DeckBuilder:
tri_buckets[key] = names
else:
tri_buckets[key] = names
import random
min_basic_cfg = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20)
if hasattr(self, 'ideal_counts') and self.ideal_counts:
min_basic_cfg = self.ideal_counts.get('basic_lands', min_basic_cfg)
@ -1965,7 +1978,7 @@ class DeckBuilder:
# Build candidate pool using helper
basics = self._basic_land_names()
already = set(self.card_library.keys())
from . import builder_utils as bu
top_n = getattr(bc, 'MISC_LAND_TOP_POOL_SIZE', 30)
top_candidates = bu.select_top_land_candidates(df, already, basics, top_n)
if not top_candidates:
@ -2092,7 +2105,7 @@ class DeckBuilder:
tapped_info: list[tuple[str,int,int]] = [] # (name, penalty, tapped_flag 1/0)
total_tapped = 0
from . import builder_utils as bu
for name, entry in list(self.card_library.items()):
# Only consider lands
row = name_to_row.get(name)
@ -2233,7 +2246,7 @@ class DeckBuilder:
# Tag-driven utility suggestions
# ---------------------------
def _build_tag_driven_land_suggestions(self):
from . import builder_utils as bu
# Delegate construction of suggestion dicts to utility module.
suggestions = bu.build_tag_driven_suggestions(self)
if suggestions:
@ -2285,7 +2298,7 @@ class DeckBuilder:
# Cached: recompute only if dirty
if self._color_source_matrix_cache is not None and not self._color_source_cache_dirty:
return self._color_source_matrix_cache
from . import builder_utils as bu
matrix = bu.compute_color_source_matrix(self.card_library, getattr(self, '_full_cards_df', None))
self._color_source_matrix_cache = matrix
self._color_source_cache_dirty = False
@ -2297,7 +2310,7 @@ class DeckBuilder:
def _compute_spell_pip_weights(self) -> Dict[str, float]:
if self._spell_pip_weights_cache is not None and not self._spell_pip_cache_dirty:
return self._spell_pip_weights_cache
from . import builder_utils as bu
weights = bu.compute_spell_pip_weights(self.card_library, self.color_identity)
self._spell_pip_weights_cache = weights
self._spell_pip_cache_dirty = False
@ -2369,11 +2382,11 @@ class DeckBuilder:
overages[c] = over
def removal_candidate(exclude_colors: set[str]) -> Optional[str]:
from . import builder_utils as bu
return bu.select_color_balance_removal(self, exclude_colors, overages)
def addition_candidates(target_color: str) -> List[str]:
from . import builder_utils as bu
return bu.color_balance_addition_candidates(self, target_color, df)
for color, _, _, gap in deficits:
@ -2416,7 +2429,6 @@ class DeckBuilder:
self.output_func(f" {c}: {final_source_share.get(c,0.0)*100:5.1f}% (pip {pip_weights.get(c,0.0)*100:5.1f}%)")
if rebalance_basics:
try:
from deck_builder import builder_constants as bc
basic_map = getattr(bc, 'COLOR_TO_BASIC_LAND', {})
basics_present = {nm: entry for nm, entry in self.card_library.items() if nm in basic_map.values()}
if basics_present:
@ -2470,17 +2482,17 @@ class DeckBuilder:
# ---------------------------
def _basic_land_names(self) -> set:
"""Return set of all basic (and snow basic) land names plus Wastes."""
from . import builder_utils as bu
return bu.basic_land_names()
def _count_basic_lands(self) -> int:
"""Count total copies of basic lands currently in the library."""
from . import builder_utils as bu
return bu.count_basic_lands(self.card_library)
def _choose_basic_to_trim(self) -> Optional[str]:
"""Return a basic land name to trim (highest count) or None."""
from . import builder_utils as bu
return bu.choose_basic_to_trim(self.card_library)
def _decrement_card(self, name: str) -> bool:
@ -2506,7 +2518,7 @@ class DeckBuilder:
def _enforce_land_cap(self, step_label: str = ""):
"""Delegate land cap enforcement to utility helper."""
from . import builder_utils as bu
bu.enforce_land_cap(self, step_label)
# ===========================
@ -2523,10 +2535,6 @@ class DeckBuilder:
Kindred multipliers applied only when >1 theme.
Synergy prioritizes cards matching multiple selected themes.
"""
import re
import ast
import math
import random
df = getattr(self, '_combined_cards_df', None)
if df is None or df.empty:
self.output_func("Card pool not loaded; cannot add creatures.")
@ -2576,35 +2584,7 @@ class DeckBuilder:
weights[r] += rem * (base_map[r] / base_sum_unboosted)
else:
weights['primary'] = 1.0
def _parse_theme_tags(val) -> list[str]:
if isinstance(val, list):
out: list[str] = []
for v in val:
if isinstance(v, list):
out.extend(str(x) for x in v)
else:
out.append(str(v))
return [s.strip() for s in out if s and s.strip()]
if isinstance(val, str):
s = val.strip()
try:
parsed = ast.literal_eval(s)
if isinstance(parsed, list):
return [str(x).strip() for x in parsed if str(x).strip()]
except Exception:
pass
if s.startswith('[') and s.endswith(']'):
s = s[1:-1]
parts = [p.strip().strip("'\"") for p in s.split(',')]
cleaned = []
for p in parts:
if not p:
continue
q = re.sub(r"^[\[\s']+|[\]\s']+$", '', p)
if q:
cleaned.append(q)
return cleaned
return []
# Tag parsing unified via builder_utils.normalize_tag_cell
creature_df = df[df['type'].str.contains('Creature', case=False, na=False)].copy()
# Exclude commander from candidate pool to avoid duplicating it in creature additions
commander_name = getattr(self, 'commander', None) or getattr(self, 'commander_name', None)
@ -2615,8 +2595,8 @@ class DeckBuilder:
return
selected_tags_lower = [t.lower() for _r,t in themes_ordered]
if '_parsedThemeTags' not in creature_df.columns:
creature_df['_parsedThemeTags'] = creature_df['themeTags'].apply(_parse_theme_tags)
creature_df['_normTags'] = creature_df['_parsedThemeTags'].apply(lambda lst: [s.lower() for s in lst])
creature_df['_parsedThemeTags'] = creature_df['themeTags'].apply(bu.normalize_tag_cell)
creature_df['_normTags'] = creature_df['_parsedThemeTags'] # already lowercase
creature_df['_multiMatch'] = creature_df['_normTags'].apply(lambda lst: sum(1 for t in selected_tags_lower if t in lst))
base_top = 30
top_n = int(base_top * getattr(bc, 'THEME_POOL_SIZE_MULTIPLIER', 2.0))
@ -2631,7 +2611,7 @@ class DeckBuilder:
remaining = max(0, desired_total - total_added)
if remaining == 0:
break
target = int(math.ceil(desired_total * w * random.uniform(1.0, 1.1)))
target = int(math.ceil(desired_total * w * self._get_rng().uniform(1.0, 1.1)))
target = min(target, remaining)
if target <= 0:
continue
@ -2648,26 +2628,9 @@ class DeckBuilder:
pool = pool[~pool['name'].isin(added_names)]
if pool.empty:
continue
weights_vec = [synergy_bonus if mm >= 2 else 1.0 for mm in pool['_multiMatch']]
names_vec = pool['name'].tolist()
chosen: list[str] = []
try:
for _ in range(min(target, len(names_vec))):
totw = sum(weights_vec)
if totw <= 0:
break
r = random.random() * totw
acc = 0.0
idx = 0
for i, wv in enumerate(weights_vec):
acc += wv
if r <= acc:
idx = i
break
chosen.append(names_vec.pop(idx))
weights_vec.pop(idx)
except Exception:
chosen = names_vec[:target]
# Weighted sampling using helper
weighted_pool = [ (nm, (synergy_bonus if mm >= 2 else 1.0)) for nm, mm in zip(pool['name'], pool['_multiMatch']) ]
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
for nm in chosen:
if commander_name and nm == commander_name:
continue # safeguard
@ -2741,65 +2704,34 @@ class DeckBuilder:
if 'name' not in df.columns:
return
def norm_tags(cell):
if isinstance(cell, list):
return [str(t).strip().lower() for t in cell]
if isinstance(cell, str) and cell.strip():
# attempt split on common delimiters / list literal
raw = cell
for ch in '[]'":":
raw = raw.replace(ch, ' ')
parts = [p.strip().strip("'\"").lower() for p in raw.replace(';', ',').split(',') if p.strip()]
return parts
return []
work = df.copy()
work['_ltags'] = work.get('themeTags', []).apply(norm_tags)
# identify ramp-tagged cards (contains substring 'ramp')
def is_ramp(tags):
return any('ramp' in t for t in tags)
work = work[work['_ltags'].apply(is_ramp)]
work['_ltags'] = work.get('themeTags', []).apply(bu.normalize_tag_cell)
work = work[work['_ltags'].apply(lambda tags: any('ramp' in t for t in tags))]
if work.empty:
self.output_func('No ramp-tagged cards found in dataset.')
return
# Count existing ramp already in library & compute random bonus (0-20%) of original configured target
import math
import random
original_cfg = target_total
# Compute adjusted ramp target (with random bonus) via helper
existing_ramp = 0
for name, entry in self.card_library.items():
for t in entry.get('Tags', []):
if isinstance(t, str) and 'ramp' in t.lower():
existing_ramp += 1
break
bonus = math.ceil(original_cfg * random.uniform(0.0, 0.2)) if original_cfg > 0 else 0
if existing_ramp >= original_cfg:
to_add = original_cfg + bonus - existing_ramp
if to_add <= 0:
self.output_func(f"Ramp target met ({existing_ramp}/{original_cfg}). Random bonus {bonus} -> no additional ramp needed.")
return
self.output_func(f"Ramp target met ({existing_ramp}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add} extra ramp spells.")
if any(isinstance(t, str) and 'ramp' in t.lower() for t in entry.get('Tags', [])):
existing_ramp += 1
to_add, _bonus = bu.compute_adjusted_target('Ramp', target_total, existing_ramp, self.output_func, plural_word='ramp spells')
if existing_ramp >= target_total and to_add == 0:
return
if existing_ramp < target_total:
target_total = to_add
else:
remaining = original_cfg - existing_ramp
self.output_func(f"Existing ramp {existing_ramp}/{original_cfg}. Remaining need {remaining}. Random bonus {bonus}. Adding {remaining + bonus} ramp spells.")
target_total = remaining + bonus
target_total = to_add # overflow case adds extra beyond original
# Exclude lands (handled separately) and Commander
work = work[~work['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
work = work[work['name'] != commander_name]
# Sort priority
sort_cols = []
if 'edhrecRank' in work.columns:
sort_cols.append('edhrecRank')
if 'manaValue' in work.columns:
sort_cols.append('manaValue')
if sort_cols:
work = work.sort_values(by=sort_cols, ascending=[True]*len(sort_cols), na_position='last')
# Sort priority (lowest edhrecRank then manaValue)
work = bu.sort_by_priority(work, ['edhrecRank','manaValue'])
# Phase targets
import math
# top-level math
rocks_target = min(target_total, math.ceil(target_total/3))
dorks_target = min(target_total - rocks_target, math.ceil(target_total/4))
# remainder auto for general
@ -2867,16 +2799,7 @@ class DeckBuilder:
df = self._combined_cards_df.copy()
if 'name' not in df.columns:
return
def norm_tags(cell):
if isinstance(cell, list):
return [str(t).strip().lower() for t in cell]
if isinstance(cell, str):
raw = cell.lower()
for ch in '[]'":":
raw = raw.replace(ch, ' ')
return [p.strip().strip("'\"") for p in raw.replace(';', ',').split(',') if p.strip()]
return []
df['_ltags'] = df.get('themeTags', []).apply(norm_tags)
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_removal(tags):
return any('removal' in t or 'spot removal' in t for t in tags)
def is_wipe(tags):
@ -2886,34 +2809,18 @@ class DeckBuilder:
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
sort_cols = []
if 'edhrecRank' in pool.columns:
sort_cols.append('edhrecRank')
if 'manaValue' in pool.columns:
sort_cols.append('manaValue')
if sort_cols:
pool = pool.sort_values(by=sort_cols, ascending=[True]*len(sort_cols), na_position='last')
# Count existing removal (excluding wipes) & random bonus up to 20%
import math
import random
original_cfg = target
# Sort priority
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
# Adjusted removal target
existing = 0
for name, entry in self.card_library.items():
lt = [str(t).lower() for t in entry.get('Tags', [])]
if any(('removal' in t or 'spot removal' in t) for t in lt) and not any(('board wipe' in t or 'mass removal' in t) for t in lt):
existing += 1
bonus = math.ceil(original_cfg * random.uniform(0.0, 0.2)) if original_cfg > 0 else 0
if existing >= original_cfg:
to_add = original_cfg + bonus - existing
if to_add <= 0:
self.output_func(f"Removal target met ({existing}/{original_cfg}). Random bonus {bonus} -> no additional removal needed.")
return
self.output_func(f"Removal target met ({existing}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add} extra removal spells.")
target = to_add
else:
remaining_need = original_cfg - existing
target = remaining_need + bonus
self.output_func(f"Existing removal {existing}/{original_cfg}. Remaining need {remaining_need}. Random bonus {bonus}. Adding {target} removal spells.")
to_add, _bonus = bu.compute_adjusted_target('Removal', target, existing, self.output_func, plural_word='removal spells')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names = []
for _, r in pool.iterrows():
@ -2943,16 +2850,7 @@ class DeckBuilder:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
def norm(cell):
if isinstance(cell, list):
return [str(t).strip().lower() for t in cell]
if isinstance(cell, str):
raw = cell.lower()
for ch in '[]'":":
raw = raw.replace(ch, ' ')
return [p.strip().strip("'\"") for p in raw.replace(';', ',').split(',') if p.strip()]
return []
df['_ltags'] = df.get('themeTags', []).apply(norm)
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_wipe(tags):
return any('board wipe' in t or 'mass removal' in t for t in tags)
pool = df[df['_ltags'].apply(is_wipe)]
@ -2960,34 +2858,18 @@ class DeckBuilder:
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
sort_cols = []
if 'edhrecRank' in pool.columns:
sort_cols.append('edhrecRank')
if 'manaValue' in pool.columns:
sort_cols.append('manaValue')
if sort_cols:
pool = pool.sort_values(by=sort_cols, ascending=[True]*len(sort_cols), na_position='last')
# Count existing wipes & random bonus up to 20%
import math
import random
original_cfg = target
# Sort priority
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
# Adjusted board wipe target
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('board wipe' in t or 'mass removal' in t) for t in tags):
existing += 1
bonus = math.ceil(original_cfg * random.uniform(0.0, 0.2)) if original_cfg > 0 else 0
if existing >= original_cfg:
to_add = original_cfg + bonus - existing
if to_add <= 0:
self.output_func(f"Board wipe target met ({existing}/{original_cfg}). Random bonus {bonus} -> no additional wipes needed.")
return
self.output_func(f"Board wipe target met ({existing}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add} extra wipes.")
target = to_add
else:
remaining_need = original_cfg - existing
target = remaining_need + bonus
self.output_func(f"Existing wipes {existing}/{original_cfg}. Remaining need {remaining_need}. Random bonus {bonus}. Adding {target} wipes.")
to_add, _bonus = bu.compute_adjusted_target('Board wipe', target, existing, self.output_func, plural_word='wipes')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names = []
for _, r in pool.iterrows():
@ -3015,40 +2897,19 @@ class DeckBuilder:
total_target = self.ideal_counts.get('card_advantage', 0)
if total_target <= 0 or self._combined_cards_df is None:
return
import math
# Count existing draw pieces & random bonus up to 20%
import random
original_cfg = total_target
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('draw' in t) or ('card advantage' in t) for t in tags):
existing += 1
bonus = math.ceil(original_cfg * random.uniform(0.0, 0.2)) if original_cfg > 0 else 0
if existing >= original_cfg:
to_add_total = original_cfg + bonus - existing
if to_add_total <= 0:
self.output_func(f"Card advantage target met ({existing}/{original_cfg}). Random bonus {bonus} -> no additional draw needed.")
return
self.output_func(f"Card advantage target met ({existing}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add_total} extra draw spells.")
total_target = to_add_total
else:
remaining_need = original_cfg - existing
total_target = remaining_need + bonus
self.output_func(f"Existing draw {existing}/{original_cfg}. Remaining need {remaining_need}. Random bonus {bonus}. Adding {total_target} draw spells.")
to_add_total, _bonus = bu.compute_adjusted_target('Card advantage', total_target, existing, self.output_func, plural_word='draw spells')
if existing >= total_target and to_add_total == 0:
return
total_target = to_add_total if existing < total_target else to_add_total
conditional_target = min(total_target, math.ceil(total_target * 0.2))
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
def norm(cell):
if isinstance(cell, list):
return [str(t).strip().lower() for t in cell]
if isinstance(cell, str):
raw = cell.lower()
for ch in '[]'":":
raw = raw.replace(ch, ' ')
return [p.strip().strip("'\"") for p in raw.replace(';', ',').split(',') if p.strip()]
return []
df['_ltags'] = df.get('themeTags', []).apply(norm)
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
def is_draw(tags):
return any(('draw' in t) or ('card advantage' in t) for t in tags)
df = df[df['_ltags'].apply(is_draw)]
@ -3063,14 +2924,7 @@ class DeckBuilder:
conditional_df = df[df['_ltags'].apply(is_conditional)]
unconditional_df = df[~df.index.isin(conditional_df.index)]
def sortit(d):
sc = []
if 'edhrecRank' in d.columns:
sc.append('edhrecRank')
if 'manaValue' in d.columns:
sc.append('manaValue')
if sc:
d = d.sort_values(by=sc, ascending=[True]*len(sc), na_position='last')
return d
return bu.sort_by_priority(d, ['edhrecRank','manaValue'])
conditional_df = sortit(conditional_df)
unconditional_df = sortit(unconditional_df)
added_cond = 0
@ -3122,49 +2976,23 @@ class DeckBuilder:
return
already = {n.lower() for n in self.card_library.keys()}
df = self._combined_cards_df.copy()
def norm(cell):
if isinstance(cell, list):
return [str(t).strip().lower() for t in cell]
if isinstance(cell, str):
raw = cell.lower()
for ch in '[]'":":
raw = raw.replace(ch, ' ')
return [p.strip().strip("'\"") for p in raw.replace(';', ',').split(',') if p.strip()]
return []
df['_ltags'] = df.get('themeTags', []).apply(norm)
df['_ltags'] = df.get('themeTags', []).apply(bu.normalize_tag_cell)
pool = df[df['_ltags'].apply(lambda tags: any('protection' in t for t in tags))]
pool = pool[~pool['type'].fillna('').str.contains('Land', case=False, na=False)]
commander_name = getattr(self, 'commander', None)
if commander_name:
pool = pool[pool['name'] != commander_name]
sort_cols = []
if 'edhrecRank' in pool.columns:
sort_cols.append('edhrecRank')
if 'manaValue' in pool.columns:
sort_cols.append('manaValue')
if sort_cols:
pool = pool.sort_values(by=sort_cols, ascending=[True]*len(sort_cols), na_position='last')
# Count existing protection pieces & random bonus up to 20%
import math
import random
original_cfg = target
# Sort priority
pool = bu.sort_by_priority(pool, ['edhrecRank','manaValue'])
existing = 0
for name, entry in self.card_library.items():
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any('protection' in t for t in tags):
existing += 1
bonus = math.ceil(original_cfg * random.uniform(0.0, 0.2)) if original_cfg > 0 else 0
if existing >= original_cfg:
to_add = original_cfg + bonus - existing
if to_add <= 0:
self.output_func(f"Protection target met ({existing}/{original_cfg}). Random bonus {bonus} -> no additional protection needed.")
return
self.output_func(f"Protection target met ({existing}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add} extra protection spells.")
target = to_add
else:
remaining_need = original_cfg - existing
target = remaining_need + bonus
self.output_func(f"Existing protection {existing}/{original_cfg}. Remaining need {remaining_need}. Random bonus {bonus}. Adding {target} protection spells.")
to_add, _bonus = bu.compute_adjusted_target('Protection', target, existing, self.output_func, plural_word='protection spells')
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
added = 0
added_names = []
for _, r in pool.iterrows():
@ -3210,10 +3038,7 @@ class DeckBuilder:
themes_ordered.append(('tertiary', self.tertiary_tag))
if not themes_ordered:
return
import ast
import re
import math
import random
# top-level ast, re, math, random
n_themes = len(themes_ordered)
if n_themes == 1:
base_map = {'primary': 1.0}
@ -3246,35 +3071,7 @@ class DeckBuilder:
else:
weights['primary'] = 1.0
def _parse_tags(val):
if isinstance(val, list):
out = []
for v in val:
if isinstance(v, list):
out.extend(str(x) for x in v)
else:
out.append(str(v))
return [s.strip() for s in out if s and s.strip()]
if isinstance(val, str):
s = val.strip()
try:
parsed = ast.literal_eval(s)
if isinstance(parsed, list):
return [str(x).strip() for x in parsed if str(x).strip()]
except Exception:
pass
if s.startswith('[') and s.endswith(']'):
s = s[1:-1]
parts = [p.strip().strip("'\"") for p in s.split(',')]
cleaned = []
for p in parts:
if not p:
continue
q = re.sub(r"^[\[\s']+|[\]\s']+$", '', p)
if q:
cleaned.append(q)
return cleaned
return []
# Tag parsing now standardized via builder_utils.normalize_tag_cell
# Filter to non-land, non-creature spells
spells_df = df[
@ -3285,8 +3082,8 @@ class DeckBuilder:
return
selected_tags_lower = [t.lower() for _r, t in themes_ordered]
if '_parsedThemeTags' not in spells_df.columns:
spells_df['_parsedThemeTags'] = spells_df['themeTags'].apply(_parse_tags)
spells_df['_normTags'] = spells_df['_parsedThemeTags'].apply(lambda lst: [s.lower() for s in lst])
spells_df['_parsedThemeTags'] = spells_df['themeTags'].apply(bu.normalize_tag_cell)
spells_df['_normTags'] = spells_df['_parsedThemeTags'] # already lowercase list
spells_df['_multiMatch'] = spells_df['_normTags'].apply(
lambda lst: sum(1 for t in selected_tags_lower if t in lst)
)
@ -3301,7 +3098,7 @@ class DeckBuilder:
w = weights.get(role, 0.0)
if w <= 0:
continue
target = int(math.ceil(remaining * w * random.uniform(1.0, 1.1)))
target = int(math.ceil(remaining * w * self._get_rng().uniform(1.0, 1.1)))
target = min(target, remaining - total_added)
if target <= 0:
continue
@ -3329,28 +3126,8 @@ class DeckBuilder:
pool = pool[~pool['name'].isin(self.card_library.keys())]
if pool.empty:
continue
names_vec = pool['name'].tolist()
weights_vec = [
synergy_bonus if mm >= 2 else 1.0 for mm in pool['_multiMatch']
]
chosen = []
try:
for _ in range(min(target, len(names_vec))):
totw = sum(weights_vec)
if totw <= 0:
break
r = random.random() * totw
acc = 0.0
idx = 0
for i, wv in enumerate(weights_vec):
acc += wv
if r <= acc:
idx = i
break
chosen.append(names_vec.pop(idx))
weights_vec.pop(idx)
except Exception:
chosen = names_vec[:target]
weighted_pool = [ (nm, (synergy_bonus if mm >= 2 else 1.0)) for nm, mm in zip(pool['name'], pool['_multiMatch']) ]
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
for nm in chosen:
row = pool[pool['name'] == nm].iloc[0]
self.add_card(
@ -3436,7 +3213,7 @@ class DeckBuilder:
# fallback: any leftover spell
subset = leftover
else:
cat_choice = random.choice(list(candidates_by_cat.keys()))
cat_choice = self._get_rng().choice(list(candidates_by_cat.keys()))
subset = candidates_by_cat[cat_choice]
# Sort subset
if 'edhrecRank' in subset.columns:
@ -3552,9 +3329,6 @@ class DeckBuilder:
If path not provided, writes to deck_files/<CommanderFirstWord>_<YYYYMMDD>.csv.
Returns the written path or None on failure.
"""
import datetime
import os
import csv
if not self.card_library:
self.output_func('No cards in library to export.')
return None
@ -3779,8 +3553,7 @@ class DeckBuilder:
ci = rev_snow.get(name, ci)
colors = rev_snow.get(name, colors)
# Sanitize NaN / 'nan' strings for display cleanliness
import math
# Sanitize NaN / 'nan' strings for display cleanliness (top-level math)
def _sanitize(val):
if val is None:
return ''
@ -3832,7 +3605,7 @@ class DeckBuilder:
self.output_func("\nTag Summary (unique cards per tag):")
def _clean_tag_key(tag: str) -> str:
import re
# top-level re
if not isinstance(tag, str):
tag = str(tag)
s = tag.strip()
@ -3879,8 +3652,7 @@ class DeckBuilder:
return ''
if prefer_long:
width = 80
# Normalize whitespace but preserve existing newlines (treat each paragraph separately)
import textwrap
# Normalize whitespace but preserve existing newlines (treat each paragraph separately)
paragraphs = str(text).split('\n')
wrapped_parts = []
for p in paragraphs:

View file

@ -10,9 +10,11 @@ from __future__ import annotations
from typing import Dict, Iterable
import re
import ast
import random as _rand
from . import builder_constants as bc
import math
COLOR_LETTERS = ['W', 'U', 'B', 'R', 'G']
@ -25,7 +27,6 @@ def parse_theme_tags(val) -> list[str]:
"['Tag1', 'Tag2']"
Tag1, Tag2
Returns list of stripped string tags (may be empty)."""
import ast
if isinstance(val, list):
flat: list[str] = []
for v in val:
@ -157,6 +158,9 @@ __all__ = [
'compute_spell_pip_weights',
'parse_theme_tags',
'normalize_theme_list',
'compute_adjusted_target',
'normalize_tag_cell',
'sort_by_priority',
'COLOR_LETTERS',
'tapped_land_penalty',
'replacement_land_score',
@ -174,6 +178,53 @@ __all__ = [
]
def compute_adjusted_target(category_label: str,
original_cfg: int,
existing: int,
output_func,
plural_word: str | None = None,
bonus_max_pct: float = 0.2,
rng=None) -> tuple[int, int]:
"""Compute how many additional cards of a category to add applying a random bonus.
Returns (to_add, bonus). to_add may be 0 if target already satisfied and bonus doesn't push above existing.
Parameters
----------
category_label : str
Human-readable label (e.g. 'Ramp', 'Removal').
original_cfg : int
Configured target count.
existing : int
How many already present.
output_func : callable
Function for emitting messages (e.g. print or logger).
plural_word : str | None
Phrase used in messages for plural additions. If None derives from label (lower + ' spells').
bonus_max_pct : float
Upper bound for random bonus percent (default 0.2 => up to +20%).
rng : object | None
Optional random-like object with uniform().
"""
if original_cfg <= 0:
return 0, 0
plural_word = plural_word or f"{category_label.lower()} spells"
# Random bonus between 0 and bonus_max_pct inclusive
roll = (rng.uniform(0.0, bonus_max_pct) if rng else _rand.uniform(0.0, bonus_max_pct))
bonus = math.ceil(original_cfg * roll) if original_cfg > 0 else 0
if existing >= original_cfg:
to_add = original_cfg + bonus - existing
if to_add <= 0:
output_func(f"{category_label} target met ({existing}/{original_cfg}). Random bonus {bonus} -> no additional {plural_word} needed.")
return 0, bonus
output_func(f"{category_label} target met ({existing}/{original_cfg}). Adding random bonus {bonus}; scheduling {to_add} extra {plural_word}.")
return to_add, bonus
remaining_need = original_cfg - existing
to_add = remaining_need + bonus
output_func(f"Existing {category_label.lower()} {existing}/{original_cfg}. Remaining need {remaining_need}. Random bonus {bonus}. Adding {to_add} {plural_word}.")
return to_add, bonus
def tapped_land_penalty(tline: str, text_field: str) -> tuple[int, int]:
"""Classify a land for tapped optimization.
@ -265,7 +316,7 @@ def weighted_sample_without_replacement(pool: list[tuple[str, int | float]], k:
"""
if k <= 0 or not pool:
return []
import random as _rand
# _rand imported at module level
local_rng = rng if rng is not None else _rand
working = pool.copy()
chosen: list[str] = []
@ -329,6 +380,44 @@ def select_top_land_candidates(df, already: set[str], basics: set[str], top_n: i
return out[:top_n]
# ---------------------------------------------------------------------------
# Generic DataFrame helpers (tag normalization & sorting)
# ---------------------------------------------------------------------------
def normalize_tag_cell(cell):
"""Normalize a themeTags-like cell into a lowercase list of tags.
Accepts list, nested list, or string forms. Mirrors logic previously in multiple
methods inside builder.py.
"""
if isinstance(cell, list):
out: list[str] = []
for v in cell:
if isinstance(v, list):
out.extend(str(x).strip().lower() for x in v if str(x).strip())
else:
vs = str(v).strip().lower()
if vs:
out.append(vs)
return out
if isinstance(cell, str):
raw = cell.lower()
for ch in '[]"':
raw = raw.replace(ch, ' ')
parts = [p.strip().strip("'\"") for p in raw.replace(';', ',').split(',') if p.strip()]
return [p for p in parts if p]
return []
def sort_by_priority(df, columns: list[str]):
"""Sort DataFrame by listed columns ascending if present; ignores missing.
Returns new DataFrame (does not mutate original)."""
present = [c for c in columns if c in df.columns]
if not present:
return df
return df.sort_values(by=present, ascending=[True]*len(present), na_position='last')
# ---------------------------------------------------------------------------
# Tag-driven land suggestion helpers
# ---------------------------------------------------------------------------
@ -493,7 +582,7 @@ def enforce_land_cap(builder, step_label: str = ""):
bc = __import__('deck_builder.builder_constants', fromlist=['DEFAULT_LAND_COUNT'])
land_target = builder.ideal_counts.get('lands', getattr(bc, 'DEFAULT_LAND_COUNT', 35))
min_basic = builder.ideal_counts.get('basic_lands', getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 20))
import math
# math not needed; using ceil via BASIC_FLOOR_FACTOR logic only
floor_basics = math.ceil(bc.BASIC_FLOOR_FACTOR * min_basic)
current_land = builder._current_land_count()
if current_land <= land_target:

View file

@ -35,12 +35,14 @@ def run(
dual_count: Optional[int] = None,
triple_count: Optional[int] = None,
utility_count: Optional[int] = None,
seed: Optional[int] = None,
) -> DeckBuilder:
"""Run a scripted non-interactive deck build and return the DeckBuilder instance.
Integer parameters (primary_choice, secondary_choice, tertiary_choice) correspond to the
numeric indices shown during interactive tag selection. Pass None to omit secondary/tertiary.
Optional counts (fetch_count, dual_count, triple_count, utility_count) constrain land steps.
seed: optional deterministic RNG seed for reproducible builds.
"""
scripted_inputs: List[str] = []
# Commander query & selection
@ -70,7 +72,7 @@ def run(
return scripted_inputs.pop(0)
raise RuntimeError("Ran out of scripted inputs for prompt: " + prompt)
builder = DeckBuilder(input_func=scripted_input)
builder = DeckBuilder(input_func=scripted_input, seed=seed)
builder.run_initial_setup()
builder.run_deck_build_step1()
builder.run_deck_build_step2()