mtg_python_deckbuilder/code/deck_builder/phases/phase2_lands_analysis.py

387 lines
17 KiB
Python
Raw Normal View History

from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional
from .. import builder_constants as bc
from .. import builder_utils as bu
"""Phase 2 (pre-step): Smart land base analysis (Roadmap 14, M1).
LandAnalysisMixin.run_land_analysis() is called from run_deck_build_step2()
AFTER ideal_counts defaults are seeded, so LAND_PROFILE and LAND_COUNT env
overrides win over the calculated values.
Responsibilities:
- compute_pip_density(): delegate to builder_utils
- analyze_curve(): delegate to builder_utils
- determine_profile(): basics / mid / fixing rules from Profile Definitions
- run_land_analysis(): orchestrates analysis, sets ideal_counts, self._land_profile
"""
logger = logging.getLogger(__name__)
class LandAnalysisMixin:
# ------------------------------------------------------------------
# Public entry point — called from run_deck_build_step2()
# ------------------------------------------------------------------
def run_land_analysis(self) -> None:
"""Analyse the commander and color identity to set a smart land profile.
Sets:
self._land_profile 'basics' | 'mid' | 'fixing' (default: 'mid')
self._speed_category 'fast' | 'mid' | 'slow'
self._land_report_data dict persisted for M3 diagnostics export
Mutates:
self.ideal_counts['lands'] and self.ideal_counts['basic_lands']
(only when enable_smart_lands is True on the builder; LAND_PROFILE/LAND_COUNT env overrides honoured after)
"""
# Per-build toggle from UI checkbox (defaults True = opted in)
if not getattr(self, 'enable_smart_lands', True):
return
try:
self._run_land_analysis_inner()
except Exception as exc:
logger.warning('run_land_analysis failed (%s); defaulting to mid profile', exc)
self._land_profile = 'mid'
self._speed_category = 'mid'
def _run_land_analysis_inner(self) -> None:
color_identity = getattr(self, 'color_identity', []) or []
colors = [c for c in color_identity if c in ('W', 'U', 'B', 'R', 'G')]
color_count = len(colors)
# --- Card pool DataFrame (available at step 2; card_library is still empty) ---
pool_df = getattr(self, '_combined_cards_df', None)
# --- Curve analysis: commander CMC + pool average CMC (weighted) ---
_cdict = getattr(self, 'commander_dict', None) or {}
commander_cmc = float(_cdict.get('CMC') or _cdict.get('Mana Value') or 3.5)
effective_cmc = commander_cmc
avg_pool_cmc: Optional[float] = None
if pool_df is not None and not pool_df.empty:
try:
non_land = pool_df[~pool_df['type'].str.lower().str.contains('land', na=False)]
if not non_land.empty and 'manaValue' in non_land.columns:
avg_pool_cmc = float(non_land['manaValue'].mean())
# Weight commander CMC more heavily (it's the clearest intent signal)
effective_cmc = commander_cmc * 0.6 + avg_pool_cmc * 0.4
except Exception as exc:
logger.debug('Pool average CMC failed (%s); using commander CMC only', exc)
curve_stats = bu.analyze_curve(effective_cmc, color_count)
speed: str = curve_stats['speed_category']
# Apply the speed-based offset relative to the user's configured ideal land count.
# e.g. if the user set 40 lands: fast gets 38, mid stays 40, slow gets 42-44.
# This respects custom ideals instead of always using the hardcoded 33/35/37-39.
mid_default = getattr(bc, 'LAND_COUNT_MID', 35)
_user_land_base = int((getattr(self, 'ideal_counts', None) or {}).get('lands', mid_default))
_speed_offset = curve_stats['land_target'] - mid_default
land_target: int = max(1, _user_land_base + _speed_offset)
_orig_land_target = curve_stats['land_target']
basic_target: int = (
max(color_count, int(round(curve_stats['basic_target'] * land_target / _orig_land_target)))
if _orig_land_target > 0
else curve_stats['basic_target']
)
# --- Pip density analysis from pool (card_library is empty at step 2) ---
pip_density: Dict[str, Dict[str, int]] = {}
try:
if pool_df is not None and not pool_df.empty:
# Convert pool to minimal dict format for compute_pip_density
records = pool_df[['manaCost', 'type']].fillna('').to_dict('records')
pool_dict = {
str(i): {
'Mana Cost': str(r.get('manaCost') or ''),
'Card Type': str(r.get('type') or ''),
}
for i, r in enumerate(records)
}
pip_density = bu.compute_pip_density(pool_dict, colors)
else:
# Fallback for tests / headless contexts without a loaded DataFrame
card_library = getattr(self, 'card_library', {})
pip_density = bu.compute_pip_density(card_library, colors)
except Exception as exc:
logger.warning('compute_pip_density failed (%s); profile from curve only', exc)
# --- Profile determination ---
profile = self._determine_profile(pip_density, color_count)
# --- Budget override ---
budget_total = getattr(self, 'budget_total', None)
if budget_total is not None and color_count >= 3:
budget_threshold = getattr(bc, 'BUDGET_FORCE_BASICS_THRESHOLD', 50.0)
if float(budget_total) < budget_threshold:
prev_profile = profile
profile = 'basics'
self.output_func(
f'[Smart Lands] Budget ${budget_total:.0f} < ${budget_threshold:.0f} '
f'with {color_count} colors: forcing basics-heavy profile '
f'(was {prev_profile}).'
)
# --- LAND_PROFILE env override (highest priority) ---
env_profile = os.environ.get('LAND_PROFILE', '').strip().lower()
if env_profile in ('basics', 'mid', 'fixing'):
profile = env_profile
# --- Compute basic count for profile ---
basics = self._basics_for_profile(profile, color_count, land_target)
# --- LAND_COUNT env override ---
env_land_count = os.environ.get('LAND_COUNT', '').strip()
if env_land_count.isdigit():
land_target = int(env_land_count)
# Re-clamp basics against (possibly overridden) land target
min_headroom = getattr(bc, 'BASICS_MIN_HEADROOM', 5)
basics = min(basics, land_target - min_headroom)
basics = max(basics, color_count)
# --- Apply to ideal_counts ---
ideal: Dict[str, int] = getattr(self, 'ideal_counts', {})
ideal['lands'] = land_target
ideal['basic_lands'] = basics
# --- Pip summary for reporting ---
total_double = sum(v.get('double', 0) for v in pip_density.values())
total_triple = sum(v.get('triple', 0) for v in pip_density.values())
# Pips were a deciding factor when they pushed profile away from the default
pip_was_deciding = (
(color_count >= 3 and (total_double >= 15 or total_triple >= 3))
or (color_count <= 2 and total_double < 5 and total_triple == 0)
)
# --- Persist analysis state ---
self._land_profile = profile
self._speed_category = speed
self._land_report_data: Dict[str, Any] = {
'profile': profile,
'speed_category': speed,
'commander_cmc': commander_cmc,
'effective_cmc': effective_cmc,
'avg_pool_cmc': avg_pool_cmc,
'color_count': color_count,
'land_target': land_target,
'basic_target': basics,
'pip_density': pip_density,
'total_double_pips': total_double,
'total_triple_pips': total_triple,
'pip_was_deciding': pip_was_deciding,
'budget_total': budget_total,
'env_overrides': {
'LAND_PROFILE': env_profile or None,
'LAND_COUNT': env_land_count or None,
},
}
rationale = self._build_rationale(profile, speed, commander_cmc, effective_cmc, color_count, pip_density, budget_total)
self._land_report_data['rationale'] = rationale
self.output_func(
f'\n[Smart Lands] Profile: {profile} | Speed: {speed} | '
f'Lands: {land_target} | Basics: {basics}'
)
self.output_func(f' Rationale: {rationale}')
# --- Earmark land slots: scale non-land ideals to fit within the remaining budget ---
# Commander takes 1 slot, so there are 99 slots for non-commander cards.
# If non-land ideal counts sum to more than (99 - land_target), the spell phases
# will fill those slots first (in spells-first builds) leaving no room for lands.
self._earmark_land_slots(land_target)
def _earmark_land_slots(self, land_target: int) -> None:
"""Scale non-land ideal_counts down so they fit within 99 - land_target slots.
This ensures the spell phases never consume the slots reserved for lands,
making backfill unnecessary in the normal case.
"""
NON_LAND_KEYS = ['creatures', 'ramp', 'removal', 'wipes', 'card_advantage', 'protection']
# 99 = total deck slots minus commander
deck_slots = getattr(bc, 'DECK_NON_COMMANDER_SLOTS', 99)
budget = deck_slots - land_target
if budget <= 0:
return
ideal: Dict[str, int] = getattr(self, 'ideal_counts', {})
current_sum = sum(int(ideal.get(k, 0)) for k in NON_LAND_KEYS)
if current_sum <= budget:
return # already fits; nothing to do
# Scale each key down proportionally (floor), then top up from the largest key first.
scale = budget / current_sum
new_vals: Dict[str, int] = {}
for k in NON_LAND_KEYS:
new_vals[k] = max(0, int(int(ideal.get(k, 0)) * scale))
remainder = budget - sum(new_vals.values())
# Distribute leftover slots to the largest keys first (preserves relative proportion)
for k in sorted(NON_LAND_KEYS, key=lambda x: -int(ideal.get(x, 0))):
if remainder <= 0:
break
new_vals[k] += 1
remainder -= 1
# Apply and report
adjustments: list[str] = []
for k in NON_LAND_KEYS:
old = int(ideal.get(k, 0))
new = new_vals[k]
if old != new:
ideal[k] = new
adjustments.append(f'{k}: {old}{new}')
if adjustments:
self.output_func(
f' [Smart Lands] Earmarked {land_target} land slots; '
f'scaled non-land targets to fit {budget} remaining: {", ".join(adjustments)}'
)
# ------------------------------------------------------------------
# Profile determination
# ------------------------------------------------------------------
def _determine_profile(
self,
pip_density: Dict[str, Dict[str, int]],
color_count: int,
) -> str:
"""Determine the land profile from pip density and color count.
Rules (in priority order):
1. 5-color fixing
2. 1-color basics
3. High pip density (15 double-pips or 3 triple-pips) AND 3+ colors fixing
4. Low pip density (<5 double-pips, 0 triple-pips) AND 1-2 colors basics
5. Otherwise mid
"""
if color_count >= 5:
return 'fixing'
if color_count <= 1:
return 'basics'
total_double = sum(v.get('double', 0) for v in pip_density.values())
total_triple = sum(v.get('triple', 0) for v in pip_density.values())
if color_count >= 3 and (total_double >= 15 or total_triple >= 3):
return 'fixing'
if color_count <= 2 and total_double < 5 and total_triple == 0:
return 'basics'
return 'mid'
# ------------------------------------------------------------------
# Basics count per profile
# ------------------------------------------------------------------
def _basics_for_profile(self, profile: str, color_count: int, land_target: int) -> int:
min_headroom = getattr(bc, 'BASICS_MIN_HEADROOM', 5)
if profile == 'basics':
ratio = getattr(bc, 'BASICS_HEAVY_RATIO', 0.60)
count = int(round(land_target * ratio))
elif profile == 'fixing':
per_color = getattr(bc, 'BASICS_FIXING_PER_COLOR', 2)
count = max(color_count * per_color, color_count)
else: # mid
# Default ratio preserved — same as current behavior
count = getattr(bc, 'DEFAULT_BASIC_LAND_COUNT', 15)
# Clamp
count = min(count, land_target - min_headroom)
count = max(count, color_count)
return count
# ------------------------------------------------------------------
# Rationale string
# ------------------------------------------------------------------
def _build_rationale(
self,
profile: str,
speed: str,
commander_cmc: float,
effective_cmc: float,
color_count: int,
pip_density: Dict[str, Dict[str, int]],
budget: Optional[float],
) -> str:
total_double = sum(v.get('double', 0) for v in pip_density.values())
total_triple = sum(v.get('triple', 0) for v in pip_density.values())
if abs(effective_cmc - commander_cmc) >= 0.2:
cmc_label = f'commander CMC {commander_cmc:.0f}, effective {effective_cmc:.1f} (with pool avg)'
else:
cmc_label = f'commander CMC {commander_cmc:.1f}'
parts = [
f'{color_count}-color identity',
f'{cmc_label} ({speed} deck)',
]
if pip_density:
parts.append(f'{total_double} double-pips, {total_triple} triple-or-more-pips')
if budget is not None:
parts.append(f'budget ${budget:.0f}')
profile_desc = {
'basics': 'basics-heavy (minimal fixing)',
'mid': 'balanced (moderate fixing)',
'fixing': 'fixing-heavy (extensive duals/fetches)',
}.get(profile, profile)
return f'{profile_desc}{", ".join(parts)}'
# ------------------------------------------------------------------
# Post-build diagnostics (M3) — called from build_deck_summary()
# ------------------------------------------------------------------
def generate_diagnostics(self) -> None:
"""Update _land_report_data with post-build actuals from card_library.
Runs after all land/spell phases have added cards so card_library is
fully populated. Safe to call even when ENABLE_SMART_LANDS is off
initialises _land_report_data with basic actuals if missing.
"""
if not hasattr(self, '_land_report_data'):
self._land_report_data = {}
library = getattr(self, 'card_library', {})
if not library:
return
# Build a name → row dict for type/oracle text lookups
df = getattr(self, '_combined_cards_df', None)
name_to_row: Dict[str, Any] = {}
if df is not None and not getattr(df, 'empty', True):
try:
for _, row in df.iterrows():
nm = str(row.get('name', '') or '')
if nm and nm not in name_to_row:
name_to_row[nm] = row.to_dict()
except Exception as exc:
logger.debug('generate_diagnostics: df scan failed (%s)', exc)
total_lands = 0
tapped_count = 0
fixing_count = 0
basic_count = 0
for name, info in library.items():
ctype = str(info.get('Card Type', '') or '')
if 'land' not in ctype.lower():
continue
total_lands += 1
if 'basic' in ctype.lower():
basic_count += 1
row = name_to_row.get(name, {})
tline = str(row.get('type', ctype) or ctype).lower()
text_field = str(row.get('text', '') or '').lower()
tapped_flag, _ = bu.tapped_land_penalty(tline, text_field)
if tapped_flag:
tapped_count += 1
if bu.is_color_fixing_land(tline, text_field):
fixing_count += 1
tapped_pct = round(tapped_count / total_lands * 100, 1) if total_lands else 0.0
self._land_report_data.update({
'actual_land_count': total_lands,
'actual_tapped_count': tapped_count,
'actual_fixing_count': fixing_count,
'actual_basic_count': basic_count,
'tapped_pct': tapped_pct,
})