feat(themes): whitelist governance, synergy cap, docs + tests; feat(random): laid groundwork for random implementation, testing in headless confirmed

This commit is contained in:
matt 2025-09-17 13:23:27 -07:00
parent 03e839fb87
commit 16261bbf09
34 changed files with 12594 additions and 23 deletions


@ -38,3 +38,10 @@ jobs:
- name: Tests
run: |
pytest -q || true
- name: Fast determinism tests (random subset)
env:
CSV_FILES_DIR: csv_files/testdata
RANDOM_MODES: "1"
run: |
pytest -q code/tests/test_random_determinism.py code/tests/test_random_build_api.py code/tests/test_seeded_builder_minimal.py code/tests/test_builder_rng_seeded_stream.py

1
.gitignore vendored

@ -14,6 +14,7 @@ logs/
deck_files/
csv_files/
!config/card_lists/*.json
!config/themes/*.json
!config/deck.json
!test_exclude_cards.txt
!test_include_exclude_config.json


@ -1,3 +1,5 @@
- Random Modes (alpha): added env flags RANDOM_MODES, RANDOM_UI, RANDOM_MAX_ATTEMPTS, RANDOM_TIMEOUT_MS.
- Determinism: CSV_FILES_DIR override to point tests to csv_files/testdata; permalink now carries optional random fields (seed/theme/constraints).
# Changelog
All notable changes to this project will be documented in this file.
@ -13,14 +15,24 @@ This format follows Keep a Changelog principles and aims for Semantic Versioning
## [Unreleased]
### Added
- Theme governance: whitelist configuration `config/themes/theme_whitelist.yml` (normalization, always_include, protected prefixes/suffixes, enforced synergies, synergy_cap).
- Theme extraction: dynamic ingestion of CSV-only tags (e.g., Kindred families) and PMI-based inferred synergies (positive PMI, co-occurrence threshold) blended with curated pairs.
- Enforced synergy injection for counters/tokens/graveyard clusters (e.g., Proliferate, Counters Matter, Graveyard Matters) before capping.
- Test coverage: `test_theme_whitelist_and_synergy_cap.py` ensuring enforced synergies present and cap (5) respected.
- Dependency: added PyYAML (optional runtime dependency for governance file parsing).
- CI: additional checks to improve stability and reproducibility.
- Tests: broader coverage for validation and web flows.
- Randomizer groundwork: added a small seeded RNG utility (`code/random_util.py`) and determinism unit tests; threaded RNG through Phase 3 (creatures) and Phase 4 (spells) for deterministic sampling when seeded.
- Random Modes (alpha): thin wrapper entrypoint `code/deck_builder/random_entrypoint.py` to select a commander deterministically by seed, plus a tiny frozen dataset under `csv_files/testdata/` and tests `code/tests/test_random_determinism.py`.
### Changed
- Synergy lists are capped at 5 entries for now (precedence: curated > enforced > inferred) to improve UI scannability.
- Curated synergy matrix expanded (tokens, spells, artifacts/enchantments, counters, lands, graveyard, politics, life, tribal umbrellas) with noisy links (e.g., Burn on -1/-1 Counters) suppressed via denylist + PMI filtering.
- Tests: refactored to use pytest assertions and cleaned up fixtures/utilities to reduce noise and deprecations.
- Tests: HTTP-dependent tests now skip gracefully when the local web server is unavailable.
### Fixed
- Removed one-off / low-signal themes (global frequency <=1) except those protected or explicitly always included via whitelist configuration.
- Tests: reduced deprecation warnings and incidental failures; improved consistency and reliability across runs.
## [2.2.10] - 2025-09-11


@ -88,6 +88,7 @@ Docker Hub (PowerShell) example:
docker run --rm `
-p 8080:8080 `
-e SHOW_LOGS=1 -e SHOW_DIAGNOSTICS=1 -e ENABLE_THEMES=1 -e THEME=system `
-e RANDOM_MODES=1 -e RANDOM_UI=1 -e RANDOM_MAX_ATTEMPTS=5 -e RANDOM_TIMEOUT_MS=5000 `
-v "${PWD}/deck_files:/app/deck_files" `
-v "${PWD}/logs:/app/logs" `
-v "${PWD}/csv_files:/app/csv_files" `
@ -127,6 +128,29 @@ GET http://localhost:8080/healthz -> { "status": "ok", "version": "dev", "upti
Theme preference reset (client-side): use the header's Reset Theme control to clear the saved browser preference; the server default (THEME) applies on next paint.
### Random Modes (alpha) and test dataset override
Enable experimental Random Modes and UI controls in Web runs by setting:
```yaml
services:
web:
environment:
- RANDOM_MODES=1
- RANDOM_UI=1
- RANDOM_MAX_ATTEMPTS=5
- RANDOM_TIMEOUT_MS=5000
```
For deterministic tests or development, you can point the app to a frozen dataset snapshot:
```yaml
services:
web:
environment:
- CSV_FILES_DIR=/app/csv_files/testdata
```
## Volumes
- `/app/deck_files` ↔ `./deck_files`
- `/app/logs` ↔ `./logs`
@ -160,6 +184,13 @@ Theme preference reset (client-side): use the header's Reset Theme control to
- WEB_TAG_WORKERS=<N> (process count; set based on CPU/memory)
- WEB_VIRTUALIZE=1 (enable virtualization)
- SHOW_DIAGNOSTICS=1 (enables diagnostics pages and overlay hotkey `v`)
- RANDOM_MODES=1 (enable random build endpoints)
- RANDOM_UI=1 (show Surprise/Theme/Reroll/Share controls)
- RANDOM_MAX_ATTEMPTS=5 (cap retry attempts)
- RANDOM_TIMEOUT_MS=5000 (per-build timeout in ms)
Testing/determinism helper (dev):
- CSV_FILES_DIR=csv_files/testdata — override CSV base dir to a frozen set for tests
## Manual build/run
```powershell

BIN
README.md

Binary file not shown.


@ -1,14 +1,16 @@
# MTG Python Deckbuilder ${VERSION}
### Added
- CI improvements to increase stability and reproducibility of builds/tests.
- Expanded test coverage for validation and web flows.
- Theme whitelist governance (`config/themes/theme_whitelist.yml`) with normalization, enforced synergies, and synergy cap (5).
- Expanded curated synergy matrix plus PMI-based inferred synergies (data-driven) blended with curated anchors.
- Test: `test_theme_whitelist_and_synergy_cap.py` validates enforced synergy presence and cap compliance.
- PyYAML dependency for governance parsing.
### Changed
- Tests refactored to use pytest assertions and streamlined fixtures/utilities to reduce noise and deprecations.
- HTTP-dependent tests skip gracefully when the local web server is unavailable.
- Theme normalization (ETB -> Enter the Battlefield, Self Mill -> Mill, Pillow Fort -> Pillowfort, Reanimator -> Reanimate) applied prior to synergy derivation.
- Synergy output capped to 5 entries per theme (curated > enforced > inferred ordering).
### Fixed
- Reduced deprecation warnings and incidental test failures; improved consistency across runs.
- Removed ultra-rare themes (frequency <=1) except those protected/always included via whitelist.
---


@ -74,6 +74,45 @@ class DeckBuilder(
ColorBalanceMixin,
ReportingMixin
):
# Seedable RNG support (minimal surface area):
# - seed: optional seed value stored for diagnostics
# - _rng: internal Random instance; access via self.rng
seed: Optional[int] = field(default=None, repr=False)
_rng: Any = field(default=None, repr=False)
@property
def rng(self):
"""Lazy, per-builder RNG instance. If a seed was set, use it deterministically."""
if self._rng is None:
try:
# If a seed was assigned pre-init, use it
if self.seed is not None:
# Import here to avoid any heavy import cycles at module import time
from random_util import set_seed as _set_seed # type: ignore
self._rng = _set_seed(int(self.seed))
else:
self._rng = random.Random()
except Exception:
# Fallback to module random
self._rng = random
return self._rng
def set_seed(self, seed: int | str) -> None:
"""Set deterministic seed for this builder and reset its RNG instance."""
try:
from random_util import derive_seed_from_string as _derive, set_seed as _set_seed # type: ignore
s = _derive(seed)
self.seed = int(s)
self._rng = _set_seed(s)
except Exception:
try:
self.seed = int(seed) if not isinstance(seed, int) else seed
r = random.Random()
r.seed(self.seed)
self._rng = r
except Exception:
# Leave RNG as-is on unexpected error
pass
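The lazy, per-instance RNG pattern above can be sketched in isolation. `SeededThing` is a hypothetical stand-in, not the real `DeckBuilder`; the point is that the RNG is created on first access and seeded deterministically when a seed is present:

```python
import random

class SeededThing:
    """Minimal sketch (hypothetical class) of the lazy per-instance RNG pattern."""
    def __init__(self, seed=None):
        self.seed = seed
        self._rng = None

    @property
    def rng(self):
        # First access creates the instance; a stored seed makes the stream deterministic.
        if self._rng is None:
            self._rng = random.Random(self.seed) if self.seed is not None else random.Random()
        return self._rng

a, b = SeededThing(42), SeededThing(42)
assert [a.rng.randrange(100) for _ in range(3)] == [b.rng.randrange(100) for _ in range(3)]
```

Because each builder owns its own `random.Random`, seeded streams stay isolated from module-global `random` state.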
def build_deck_full(self):
"""Orchestrate the full deck build process, chaining all major phases."""
start_ts = datetime.datetime.now()
@ -712,10 +751,8 @@ class DeckBuilder(
# RNG Initialization
# ---------------------------
def _get_rng(self): # lazy init
if self._rng is None:
import random as _r
self._rng = _r
return self._rng
# Delegate to seedable rng property for determinism support
return self.rng
# ---------------------------
# Data Loading
@ -1003,8 +1040,10 @@ class DeckBuilder(
self.determine_color_identity()
dfs = []
required = getattr(bc, 'CSV_REQUIRED_COLUMNS', [])
from path_util import csv_dir as _csv_dir
base = _csv_dir()
for stem in self.files_to_load:
path = f'csv_files/{stem}_cards.csv'
path = f"{base}/{stem}_cards.csv"
try:
df = pd.read_csv(path)
if required:


@ -1,5 +1,6 @@
from typing import Dict, List, Final, Tuple, Union, Callable, Any as _Any
from settings import CARD_DATA_COLUMNS as CSV_REQUIRED_COLUMNS # unified
from path_util import csv_dir
__all__ = [
'CSV_REQUIRED_COLUMNS'
@ -13,7 +14,7 @@ MAX_FUZZY_CHOICES: Final[int] = 5 # Maximum number of fuzzy match choices
# Commander-related constants
DUPLICATE_CARD_FORMAT: Final[str] = '{card_name} x {count}'
COMMANDER_CSV_PATH: Final[str] = 'csv_files/commander_cards.csv'
COMMANDER_CSV_PATH: Final[str] = f"{csv_dir()}/commander_cards.csv"
DECK_DIRECTORY = '../deck_files'
COMMANDER_CONVERTERS: Final[Dict[str, str]] = {'themeTags': ast.literal_eval, 'creatureTypes': ast.literal_eval} # CSV loading converters
COMMANDER_POWER_DEFAULT: Final[int] = 0


@ -121,7 +121,7 @@ class CreatureAdditionMixin:
if owned_lower and str(nm).lower() in owned_lower:
w *= owned_mult
weighted_pool.append((nm, w))
chosen_all = bu.weighted_sample_without_replacement(weighted_pool, target_cap)
chosen_all = bu.weighted_sample_without_replacement(weighted_pool, target_cap, rng=getattr(self, 'rng', None))
for nm in chosen_all:
if commander_name and nm == commander_name:
continue
@ -201,7 +201,7 @@ class CreatureAdditionMixin:
if owned_lower and str(nm).lower() in owned_lower:
base_w *= owned_mult
weighted_pool.append((nm, base_w))
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
chosen = bu.weighted_sample_without_replacement(weighted_pool, target, rng=getattr(self, 'rng', None))
for nm in chosen:
if commander_name and nm == commander_name:
continue
@ -507,7 +507,7 @@ class CreatureAdditionMixin:
return
synergy_bonus = getattr(bc, 'THEME_PRIORITY_BONUS', 1.2)
weighted_pool = [(nm, (synergy_bonus if mm >= 2 else 1.0)) for nm, mm in zip(pool['name'], pool['_multiMatch'])]
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
chosen = bu.weighted_sample_without_replacement(weighted_pool, target, rng=getattr(self, 'rng', None))
added = 0
for nm in chosen:
row = pool[pool['name']==nm].iloc[0]
@ -621,7 +621,7 @@ class CreatureAdditionMixin:
if owned_lower and str(nm).lower() in owned_lower:
w *= owned_mult
weighted_pool.append((nm, w))
chosen_all = bu.weighted_sample_without_replacement(weighted_pool, target_cap)
chosen_all = bu.weighted_sample_without_replacement(weighted_pool, target_cap, rng=getattr(self, 'rng', None))
added = 0
for nm in chosen_all:
row = subset_all[subset_all['name'] == nm].iloc[0]


@ -139,7 +139,14 @@ class SpellAdditionMixin:
for name, entry in self.card_library.items():
if any(isinstance(t, str) and 'ramp' in t.lower() for t in entry.get('Tags', [])):
existing_ramp += 1
to_add, _bonus = bu.compute_adjusted_target('Ramp', target_total, existing_ramp, self.output_func, plural_word='ramp spells')
to_add, _bonus = bu.compute_adjusted_target(
'Ramp',
target_total,
existing_ramp,
self.output_func,
plural_word='ramp spells',
rng=getattr(self, 'rng', None)
)
if existing_ramp >= target_total and to_add == 0:
return
if existing_ramp < target_total:
@ -290,7 +297,14 @@ class SpellAdditionMixin:
lt = [str(t).lower() for t in entry.get('Tags', [])]
if any(('removal' in t or 'spot removal' in t) for t in lt) and not any(('board wipe' in t or 'mass removal' in t) for t in lt):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Removal', target, existing, self.output_func, plural_word='removal spells')
to_add, _bonus = bu.compute_adjusted_target(
'Removal',
target,
existing,
self.output_func,
plural_word='removal spells',
rng=getattr(self, 'rng', None)
)
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
@ -360,7 +374,14 @@ class SpellAdditionMixin:
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('board wipe' in t or 'mass removal' in t) for t in tags):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Board wipe', target, existing, self.output_func, plural_word='wipes')
to_add, _bonus = bu.compute_adjusted_target(
'Board wipe',
target,
existing,
self.output_func,
plural_word='wipes',
rng=getattr(self, 'rng', None)
)
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
@ -407,7 +428,14 @@ class SpellAdditionMixin:
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any(('draw' in t) or ('card advantage' in t) for t in tags):
existing += 1
to_add_total, _bonus = bu.compute_adjusted_target('Card advantage', total_target, existing, self.output_func, plural_word='draw spells')
to_add_total, _bonus = bu.compute_adjusted_target(
'Card advantage',
total_target,
existing,
self.output_func,
plural_word='draw spells',
rng=getattr(self, 'rng', None)
)
if existing >= total_target and to_add_total == 0:
return
total_target = to_add_total if existing < total_target else to_add_total
@ -540,7 +568,14 @@ class SpellAdditionMixin:
tags = [str(t).lower() for t in entry.get('Tags', [])]
if any('protection' in t for t in tags):
existing += 1
to_add, _bonus = bu.compute_adjusted_target('Protection', target, existing, self.output_func, plural_word='protection spells')
to_add, _bonus = bu.compute_adjusted_target(
'Protection',
target,
existing,
self.output_func,
plural_word='protection spells',
rng=getattr(self, 'rng', None)
)
if existing >= target and to_add == 0:
return
target = to_add if existing < target else to_add
@ -705,7 +740,7 @@ class SpellAdditionMixin:
if owned_lower and str(nm).lower() in owned_lower:
base_w *= owned_mult
weighted_pool.append((nm, base_w))
chosen = bu.weighted_sample_without_replacement(weighted_pool, target)
chosen = bu.weighted_sample_without_replacement(weighted_pool, target, rng=getattr(self, 'rng', None))
for nm in chosen:
row = pool[pool['name'] == nm].iloc[0]
self.add_card(


@ -0,0 +1,181 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import time
import pandas as pd
from deck_builder import builder_constants as bc
from random_util import get_random, generate_seed
@dataclass
class RandomBuildResult:
seed: int
commander: str
theme: Optional[str]
constraints: Optional[Dict[str, Any]]
def to_dict(self) -> Dict[str, Any]:
return {
"seed": int(self.seed),
"commander": self.commander,
"theme": self.theme,
"constraints": self.constraints or {},
}
def _load_commanders_df() -> pd.DataFrame:
"""Load commander CSV using the same path/converters as the builder.
Uses bc.COMMANDER_CSV_PATH and bc.COMMANDER_CONVERTERS for consistency.
"""
return pd.read_csv(bc.COMMANDER_CSV_PATH, converters=getattr(bc, "COMMANDER_CONVERTERS", None))
def _filter_by_theme(df: pd.DataFrame, theme: Optional[str]) -> pd.DataFrame:
if not theme:
return df
t = str(theme).strip().lower()
try:
mask = df.get("themeTags").apply(
lambda tags: any(str(x).strip().lower() == t for x in (tags or []))
)
sub = df[mask]
if len(sub) > 0:
return sub
except Exception:
pass
return df
def build_random_deck(
theme: Optional[str] = None,
constraints: Optional[Dict[str, Any]] = None,
seed: Optional[int | str] = None,
attempts: int = 5,
timeout_s: float = 5.0,
) -> RandomBuildResult:
"""Thin wrapper for random selection of a commander, deterministic when seeded.
Contract (initial/minimal):
- Inputs: optional theme filter, optional constraints dict, seed for determinism,
attempts (max reroll attempts), timeout_s (wall clock cap).
- Output: RandomBuildResult with chosen commander and the resolved seed.
Notes:
- This does NOT run the full deck builder yet; it focuses on picking a commander
deterministically for tests and plumbing. Full pipeline can be layered later.
- Determinism: when `seed` is provided, selection is stable across runs.
- When `seed` is None, a new high-entropy seed is generated and returned.
"""
# Resolve seed and RNG
resolved_seed = int(seed) if isinstance(seed, int) or (isinstance(seed, str) and str(seed).isdigit()) else None
if resolved_seed is None:
resolved_seed = generate_seed()
rng = get_random(resolved_seed)
# Bounds sanitation
attempts = max(1, int(attempts or 1))
try:
timeout_s = float(timeout_s)
except Exception:
timeout_s = 5.0
timeout_s = max(0.1, timeout_s)
# Load commander pool and apply theme filter (if any)
df_all = _load_commanders_df()
df = _filter_by_theme(df_all, theme)
# Stable ordering then seeded selection for deterministic behavior
names: List[str] = sorted(df["name"].astype(str).tolist()) if not df.empty else []
if not names:
# Fall back to entire pool by name if theme produced nothing
names = sorted(df_all["name"].astype(str).tolist())
if not names:
# Absolute fallback for pathological cases
names = ["Unknown Commander"]
# Simple attempt/timeout loop (placeholder for future constraints checks)
start = time.time()
pick = None
for _ in range(attempts):
if (time.time() - start) > timeout_s:
break
idx = rng.randrange(0, len(names))
candidate = names[idx]
# For now, accept the first candidate; constraint hooks can be added here.
pick = candidate
break
if pick is None:
# Timeout/attempts exhausted; choose deterministically based on seed modulo
pick = names[resolved_seed % len(names)]
return RandomBuildResult(seed=int(resolved_seed), commander=pick, theme=theme, constraints=constraints or {})
__all__ = [
"RandomBuildResult",
"build_random_deck",
]
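The deterministic selection above reduces to two steps: sort the pool for a stable order, then draw with a seeded `Random`. A self-contained sketch with toy names (not the real commander CSV):

```python
import random

def pick_deterministic(names, seed):
    """Seeded pick over a stably ordered pool (same approach as the entrypoint, toy data)."""
    ordered = sorted(names)       # stable ordering removes load-order effects
    rng = random.Random(seed)     # isolated, seeded stream
    return ordered[rng.randrange(len(ordered))]

pool = ["Atraxa, Praetors' Voice", "Krenko, Mob Boss", "Meren of Clan Nel Toth"]
# Same seed gives the same pick, regardless of the input order of the pool.
assert pick_deterministic(pool, 7) == pick_deterministic(list(reversed(pool)), 7)
```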
# Full-build wrapper for deterministic end-to-end builds
@dataclass
class RandomFullBuildResult(RandomBuildResult):
decklist: List[Dict[str, Any]] | None = None
diagnostics: Dict[str, Any] | None = None
def build_random_full_deck(
theme: Optional[str] = None,
constraints: Optional[Dict[str, Any]] = None,
seed: Optional[int | str] = None,
attempts: int = 5,
timeout_s: float = 5.0,
) -> RandomFullBuildResult:
"""Select a commander deterministically, then run a full deck build via DeckBuilder.
Returns a compact result including the seed, commander, and a summarized decklist.
"""
base = build_random_deck(theme=theme, constraints=constraints, seed=seed, attempts=attempts, timeout_s=timeout_s)
# Run the full headless build with the chosen commander and the same seed
try:
from headless_runner import run as _run # type: ignore
except Exception as e:
return RandomFullBuildResult(
seed=base.seed,
commander=base.commander,
theme=base.theme,
constraints=base.constraints or {},
decklist=None,
diagnostics={"error": f"headless runner unavailable: {e}"},
)
builder = _run(command_name=base.commander, seed=base.seed)
# Summarize the decklist from builder.card_library
deck_items: List[Dict[str, Any]] = []
try:
lib = getattr(builder, 'card_library', {}) or {}
for name, info in lib.items():
try:
cnt = int(info.get('Count', 1)) if isinstance(info, dict) else 1
except Exception:
cnt = 1
deck_items.append({"name": str(name), "count": cnt})
deck_items.sort(key=lambda x: (str(x.get("name", "")).lower(), int(x.get("count", 0))))
except Exception:
deck_items = []
diags: Dict[str, Any] = {"attempts": 1, "timeout_s": timeout_s}
return RandomFullBuildResult(
seed=base.seed,
commander=base.commander,
theme=base.theme,
constraints=base.constraints or {},
decklist=deck_items,
diagnostics=diags,
)
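The decklist summarization step is just a defensive dict-to-list conversion with a stable sort. In isolation, with a toy `card_library`:

```python
def summarize(card_library):
    """Sketch of the card_library summarization used by the full-build wrapper."""
    items = []
    for name, info in (card_library or {}).items():
        try:
            cnt = int(info.get("Count", 1)) if isinstance(info, dict) else 1
        except Exception:
            cnt = 1
        items.append({"name": str(name), "count": cnt})
    items.sort(key=lambda x: (str(x["name"]).lower(), x["count"]))
    return items

lib = {"Sol Ring": {"Count": 1}, "Island": {"Count": 8}}
assert summarize(lib)[0]["name"] == "Island"  # case-insensitive name order
```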


@ -65,6 +65,7 @@ def run(
enforcement_mode: str = "warn",
allow_illegal: bool = False,
fuzzy_matching: bool = True,
seed: Optional[int | str] = None,
) -> DeckBuilder:
"""Run a scripted non-interactive deck build and return the DeckBuilder instance."""
scripted_inputs: List[str] = []
@ -109,6 +110,12 @@ def run(
return ""
builder = DeckBuilder(input_func=scripted_input)
# Optional deterministic seed for Random Modes (does not affect core when unset)
try:
if seed is not None:
builder.set_seed(seed) # type: ignore[attr-defined]
except Exception:
pass
# Mark this run as headless so builder can adjust exports and logging
try:
builder.headless = True # type: ignore[attr-defined]

16
code/path_util.py Normal file

@ -0,0 +1,16 @@
from __future__ import annotations
import os
def csv_dir() -> str:
"""Return the base directory for CSV files.
Defaults to 'csv_files'. Override with CSV_FILES_DIR for tests or advanced setups.
"""
try:
base = os.getenv("CSV_FILES_DIR")
base = base.strip() if isinstance(base, str) else None
return base or "csv_files"
except Exception:
return "csv_files"
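A quick behavior check for the override, mirroring the function above inline (run in a throwaway process, or restore the env var afterwards as the snippet does):

```python
import os

def csv_dir() -> str:
    # Mirrors the helper above: env override wins, else the default directory.
    base = os.getenv("CSV_FILES_DIR")
    base = base.strip() if isinstance(base, str) else None
    return base or "csv_files"

os.environ.pop("CSV_FILES_DIR", None)
assert csv_dir() == "csv_files"              # default
os.environ["CSV_FILES_DIR"] = "csv_files/testdata"
assert csv_dir() == "csv_files/testdata"     # override wins
os.environ.pop("CSV_FILES_DIR", None)        # clean up
```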

69
code/random_util.py Normal file

@ -0,0 +1,69 @@
from __future__ import annotations
import hashlib
import secrets
import random
from typing import Union
"""
Seeded RNG utilities for deterministic behavior.
Contract (minimal):
- derive_seed_from_string(s): produce a stable, platform-independent int seed from a string or int.
- set_seed(seed): return a new random.Random instance seeded deterministically.
- generate_seed(): return a high-entropy, non-negative int suitable for seeding.
- get_random(seed=None): convenience to obtain a new Random instance (seeded when provided).
No globals/state: each call returns an independent Random instance.
"""
SeedLike = Union[int, str]
def _to_bytes(s: str) -> bytes:
try:
return s.encode("utf-8", errors="strict")
except Exception:
# Best-effort fallback
return s.encode("utf-8", errors="ignore")
def derive_seed_from_string(seed: SeedLike) -> int:
"""Derive a stable positive integer seed from a string or int.
- int inputs are normalized to a non-negative 63-bit value.
- str inputs use SHA-256 to generate a deterministic 63-bit value.
"""
if isinstance(seed, int):
# Normalize to 63-bit positive
return abs(int(seed)) & ((1 << 63) - 1)
# String path: deterministic, platform-independent
data = _to_bytes(str(seed))
h = hashlib.sha256(data).digest()
# Use first 8 bytes (64 bits) and mask to 63 bits to avoid sign issues
n = int.from_bytes(h[:8], byteorder="big", signed=False)
return n & ((1 << 63) - 1)
def set_seed(seed: SeedLike) -> random.Random:
"""Return a new Random instance seeded deterministically from the given seed."""
r = random.Random()
r.seed(derive_seed_from_string(seed))
return r
def get_random(seed: SeedLike | None = None) -> random.Random:
"""Return a new Random instance; seed when provided.
This avoids mutating the module-global PRNG and keeps streams isolated.
"""
if seed is None:
return random.Random()
return set_seed(seed)
def generate_seed() -> int:
"""Return a high-entropy positive 63-bit integer suitable for seeding."""
# secrets is preferred for entropy here; mask to 63 bits for consistency
return secrets.randbits(63)
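A quick check of the contract, with the string-to-seed derivation re-implemented inline so the snippet stands alone: the same input always yields the same 63-bit seed, and two `Random` instances built from it produce identical streams.

```python
import hashlib
import random

def derive_seed_from_string(seed):
    # Same scheme as above: ints normalize to 63 bits; strings hash via SHA-256.
    if isinstance(seed, int):
        return abs(seed) & ((1 << 63) - 1)
    h = hashlib.sha256(str(seed).encode("utf-8")).digest()
    return int.from_bytes(h[:8], "big") & ((1 << 63) - 1)

s = derive_seed_from_string("my-deck")
assert s == derive_seed_from_string("my-deck")      # stable across calls (and platforms)
r1, r2 = random.Random(s), random.Random(s)
assert [r1.random() for _ in range(3)] == [r2.random() for _ in range(3)]
```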


@ -0,0 +1,526 @@
import os
import json
import re
import sys
from collections import Counter
from typing import Dict, List, Set, Any
import pandas as pd
import itertools
import math
try:
import yaml # type: ignore
except Exception: # pragma: no cover - optional dependency; script warns if missing
yaml = None
# Ensure local 'code' package shadows stdlib 'code' module
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
from code.settings import CSV_DIRECTORY # type: ignore
from code.tagging import tag_constants # type: ignore
BASE_COLORS = {
'white': 'W',
'blue': 'U',
'black': 'B',
'red': 'R',
'green': 'G',
}
COLOR_LETTERS = set(BASE_COLORS.values())
def collect_theme_tags_from_constants() -> Set[str]:
tags: Set[str] = set()
# TYPE_TAG_MAPPING values
for tags_list in tag_constants.TYPE_TAG_MAPPING.values():
tags.update(tags_list)
# DRAW_RELATED_TAGS
tags.update(tag_constants.DRAW_RELATED_TAGS)
# Some known groupings categories as tags
for tgroup in tag_constants.TAG_GROUPS.values():
tags.update(tgroup)
# Known specific tags referenced in constants
for name in dir(tag_constants):
if name.endswith('_RELATED_TAGS') or name.endswith('_SPECIFIC_CARDS'):
val = getattr(tag_constants, name)
if isinstance(val, list):
# Only include tag-like strings (skip obvious card names)
for v in val:
if isinstance(v, str) and re.search(r"[A-Za-z]", v) and ' ' in v:
# Heuristic inclusion (placeholder; currently a no-op)
pass
return tags
def collect_theme_tags_from_tagger_source() -> Set[str]:
tags: Set[str] = set()
tagger_path = os.path.join(os.path.dirname(__file__), '..', 'tagging', 'tagger.py')
tagger_path = os.path.abspath(tagger_path)
with open(tagger_path, 'r', encoding='utf-8') as f:
src = f.read()
# Find tag_utils.apply_tag_vectorized(df, mask, ['Tag1', 'Tag2', ...]) occurrences
vector_calls = re.findall(r"apply_tag_vectorized\([^\)]*\[([^\]]+)\]", src)
for group in vector_calls:
# Split strings within the list literal
parts = re.findall(r"'([^']+)'|\"([^\"]+)\"", group)
for a, b in parts:
s = a or b
if s:
tags.add(s)
# Also capture tags passed via apply_rules([... {'tags': [ ... ]} ...]), matching either quote style
for group in re.findall(r"['\"]tags['\"]\s*:\s*\[([^\]]+)\]", src):
parts = re.findall(r"'([^']+)'|\"([^\"]+)\"", group)
for a, b in parts:
s = a or b
if s:
tags.add(s)
return tags
def tally_tag_frequencies_by_base_color() -> Dict[str, Dict[str, int]]:
result: Dict[str, Dict[str, int]] = {c: Counter() for c in BASE_COLORS.keys()}
# Iterate over per-color CSVs; if not present, skip
for color in BASE_COLORS.keys():
path = os.path.join(CSV_DIRECTORY, f"{color}_cards.csv")
if not os.path.exists(path):
continue
try:
df = pd.read_csv(path, converters={'themeTags': pd.eval, 'colorIdentity': pd.eval})
except Exception:
df = pd.read_csv(path)
if 'themeTags' in df.columns:
try:
df['themeTags'] = df['themeTags'].apply(pd.eval)
except Exception:
df['themeTags'] = df['themeTags'].apply(lambda x: [])
if 'colorIdentity' in df.columns:
try:
df['colorIdentity'] = df['colorIdentity'].apply(pd.eval)
except Exception:
pass
if 'themeTags' not in df.columns:
continue
# Derive base colors from colorIdentity if available, else assume single color file
def rows_base_colors(row):
ids = row.get('colorIdentity') if isinstance(row, dict) else row
if isinstance(ids, list):
letters = set(ids)
else:
letters = set()
derived = set()
for name, letter in BASE_COLORS.items():
if letter in letters:
derived.add(name)
if not derived:
derived.add(color)
return derived
# Iterate rows
for _, row in df.iterrows():
tags = row['themeTags'] if isinstance(row['themeTags'], list) else []
# Compute base colors contribution
ci = row['colorIdentity'] if 'colorIdentity' in row else None
letters = set(ci) if isinstance(ci, list) else set()
bases = {name for name, letter in BASE_COLORS.items() if letter in letters}
if not bases:
bases = {color}
for bc in bases:
for t in tags:
result[bc][t] += 1
# Convert Counters to plain dicts
return {k: dict(v) for k, v in result.items()}
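The per-row base-color derivation above reduces to a letter-to-name lookup with a fallback to the file's own color. A minimal sketch:

```python
BASE_COLORS = {'white': 'W', 'blue': 'U', 'black': 'B', 'red': 'R', 'green': 'G'}

def bases_for(color_identity, file_color):
    """Map a colorIdentity list to base color names; fall back to the source file's color."""
    letters = set(color_identity) if isinstance(color_identity, list) else set()
    bases = {name for name, letter in BASE_COLORS.items() if letter in letters}
    return bases or {file_color}

assert bases_for(['W', 'U'], 'white') == {'white', 'blue'}
assert bases_for(None, 'red') == {'red'}   # missing identity: attribute to the file's color
```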
def gather_theme_tag_rows() -> List[List[str]]:
"""Collect per-card themeTags lists across all base color CSVs.
Returns a list of themeTags arrays, one per card row where themeTags is present.
"""
rows: List[List[str]] = []
for color in BASE_COLORS.keys():
path = os.path.join(CSV_DIRECTORY, f"{color}_cards.csv")
if not os.path.exists(path):
continue
try:
df = pd.read_csv(path, converters={'themeTags': pd.eval})
except Exception:
df = pd.read_csv(path)
if 'themeTags' in df.columns:
try:
df['themeTags'] = df['themeTags'].apply(pd.eval)
except Exception:
df['themeTags'] = df['themeTags'].apply(lambda x: [])
if 'themeTags' not in df.columns:
continue
for _, row in df.iterrows():
tags = row['themeTags'] if isinstance(row['themeTags'], list) else []
if tags:
rows.append(tags)
return rows
def compute_cooccurrence(rows: List[List[str]]):
"""Compute co-occurrence counts between tags.
Returns:
- co: dict[tag] -> Counter(other_tag -> co_count)
- counts: Counter[tag] overall occurrence counts
- total_rows: int number of rows (cards considered)
"""
co: Dict[str, Counter] = {}
counts: Counter = Counter()
for tags in rows:
uniq = sorted(set(t for t in tags if isinstance(t, str) and t))
for t in uniq:
counts[t] += 1
for a, b in itertools.combinations(uniq, 2):
co.setdefault(a, Counter())[b] += 1
co.setdefault(b, Counter())[a] += 1
return co, counts, len(rows)
def cooccurrence_scores_for(anchor: str, co: Dict[str, Counter], counts: Counter, total_rows: int) -> List[tuple[str, float, int]]:
"""Return list of (other_tag, score, co_count) sorted by score desc.
Score uses PMI: log2( (co_count * total_rows) / (count_a * count_b) ).
"""
results: List[tuple[str, float, int]] = []
if anchor not in co:
return results
count_a = max(1, counts.get(anchor, 1))
for other, co_count in co[anchor].items():
count_b = max(1, counts.get(other, 1))
# Avoid div by zero; require minimal counts
if co_count <= 0:
continue
# PMI
pmi = math.log2((co_count * max(1, total_rows)) / (count_a * count_b))
results.append((other, pmi, co_count))
results.sort(key=lambda x: (-x[1], -x[2], x[0]))
return results
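Worked PMI example on a toy tag set, using the same formula as above. With four card rows, "Tokens Matter" appears 3 times, "Populate" twice, and they co-occur twice, so PMI = log2(2·4 / (3·2)) = log2(4/3):

```python
import math
import itertools
from collections import Counter

rows = [["Tokens Matter", "Populate"],
        ["Tokens Matter", "Populate"],
        ["Tokens Matter", "Ramp"],
        ["Ramp"]]

co, counts = {}, Counter()
for tags in rows:
    uniq = sorted(set(tags))
    counts.update(uniq)
    for a, b in itertools.combinations(uniq, 2):
        co.setdefault(a, Counter())[b] += 1
        co.setdefault(b, Counter())[a] += 1

total = len(rows)
pmi = math.log2(co["Tokens Matter"]["Populate"] * total
                / (counts["Tokens Matter"] * counts["Populate"]))
assert abs(pmi - math.log2(4 / 3)) < 1e-9  # positive PMI: they co-occur more than chance
```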
def derive_synergies_for_tags(tags: Set[str]) -> Dict[str, List[str]]:
# Curated baseline mappings for important themes (extended)
pairs = [
# Tokens / go-wide
("Tokens Matter", ["Token Creation", "Creature Tokens", "Populate"]),
("Creature Tokens", ["Tokens Matter", "Token Creation", "Populate"]),
("Token Creation", ["Tokens Matter", "Creature Tokens", "Populate"]),
# Spells
("Spellslinger", ["Spells Matter", "Prowess", "Noncreature Spells"]),
("Noncreature Spells", ["Spellslinger", "Prowess"]),
("Prowess", ["Spellslinger", "Noncreature Spells"]),
# Artifacts / Enchantments
("Artifacts Matter", ["Treasure Token", "Equipment", "Vehicles", "Improvise"]),
("Enchantments Matter", ["Auras", "Constellation", "Card Draw"]),
("Auras", ["Constellation", "Voltron", "Enchantments Matter"]),
("Equipment", ["Voltron", "Double Strike", "Warriors Matter"]),
("Treasure Token", ["Sacrifice Matters", "Artifacts Matter", "Ramp"]),
("Vehicles", ["Artifacts Matter", "Equipment"]),
# Counters / Proliferate
("Counters Matter", ["Proliferate", "+1/+1 Counters", "Adapt", "Outlast"]),
("+1/+1 Counters", ["Proliferate", "Counters Matter", "Adapt", "Evolve"]),
("-1/-1 Counters", ["Proliferate", "Counters Matter", "Wither", "Persist", "Infect"]),
("Proliferate", ["Counters Matter", "+1/+1 Counters", "Planeswalkers"]),
# Lands / ramp
("Lands Matter", ["Landfall", "Domain", "Land Tutors"]),
("Landfall", ["Lands Matter", "Ramp", "Token Creation"]),
("Domain", ["Lands Matter", "Ramp"]),
# Combat / Voltron
("Voltron", ["Equipment", "Auras", "Double Strike"]),
# Card flow
("Card Draw", ["Loot", "Wheels", "Replacement Draw", "Unconditional Draw", "Conditional Draw"]),
("Loot", ["Card Draw", "Discard Matters", "Reanimate"]),
("Wheels", ["Discard Matters", "Card Draw", "Spellslinger"]),
("Discard Matters", ["Loot", "Wheels", "Hellbent", "Reanimate"]),
# Sacrifice / death
("Aristocrats", ["Sacrifice", "Death Triggers", "Token Creation"]),
("Sacrifice", ["Aristocrats", "Death Triggers", "Treasure Token"]),
("Death Triggers", ["Aristocrats", "Sacrifice"]),
# Graveyard cluster
("Graveyard Matters", ["Reanimate", "Mill", "Unearth", "Surveil"]),
("Reanimate", ["Mill", "Graveyard Matters", "Enter the Battlefield"]),
("Unearth", ["Reanimate", "Graveyard Matters"]),
("Surveil", ["Mill", "Reanimate", "Graveyard Matters"]),
# Planeswalkers / blink
("Superfriends", ["Planeswalkers", "Proliferate", "Token Creation"]),
("Planeswalkers", ["Proliferate", "Superfriends"]),
("Enter the Battlefield", ["Blink", "Reanimate", "Token Creation"]),
("Blink", ["Enter the Battlefield", "Flicker", "Token Creation"]),
# Politics / table dynamics
("Stax", ["Taxing Effects", "Hatebears"]),
("Monarch", ["Politics", "Group Hug", "Card Draw"]),
("Group Hug", ["Politics", "Card Draw"]),
# Life
("Life Matters", ["Lifegain", "Lifedrain", "Extort"]),
("Lifegain", ["Life Matters", "Lifedrain", "Extort"]),
("Lifedrain", ["Lifegain", "Life Matters"]),
# Treasure / economy cross-link
("Ramp", ["Treasure Token", "Land Tutors"]),
]
m: Dict[str, List[str]] = {}
for base, syn in pairs:
if base in tags:
m[base] = syn
return m
def load_whitelist_config() -> Dict[str, Any]:
"""Load whitelist governance YAML if present.
Returns empty dict if file missing or YAML unavailable.
"""
path = os.path.join('config', 'themes', 'theme_whitelist.yml')
if not os.path.exists(path) or yaml is None:
return {}
try:
with open(path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
return {}
return data
except Exception:
return {}
def apply_normalization(tags: Set[str], normalization: Dict[str, str]) -> Set[str]:
if not normalization:
return tags
normalized = set()
for t in tags:
normalized.add(normalization.get(t, t))
return normalized
def should_keep_theme(theme: str, total_count: int, cfg: Dict[str, Any], protected_prefixes: List[str], protected_suffixes: List[str], min_overrides: Dict[str, int]) -> bool:
# Always include explicit always_include list
if theme in cfg.get('always_include', []):
return True
# Protected prefixes/suffixes
for pref in protected_prefixes:
if theme.startswith(pref + ' '): # prefix followed by space
return True
for suff in protected_suffixes:
if theme.endswith(suff):  # plain endswith already covers the space-separated case
return True
# Min frequency override
if theme in min_overrides:
return total_count >= min_overrides[theme]
# Default global rule (>1 occurrences)
return total_count > 1
def main() -> None:
whitelist_cfg = load_whitelist_config()
normalization_map: Dict[str, str] = whitelist_cfg.get('normalization', {}) if isinstance(whitelist_cfg.get('normalization', {}), dict) else {}
exclusions: Set[str] = set(whitelist_cfg.get('exclusions', []) or [])
protected_prefixes: List[str] = list(whitelist_cfg.get('protected_prefixes', []) or [])
protected_suffixes: List[str] = list(whitelist_cfg.get('protected_suffixes', []) or [])
min_overrides: Dict[str, int] = whitelist_cfg.get('min_frequency_overrides', {}) or {}
synergy_cap: int = int(whitelist_cfg.get('synergy_cap', 0) or 0)
enforced_synergies_cfg: Dict[str, List[str]] = whitelist_cfg.get('enforced_synergies', {}) or {}
theme_tags = set()
theme_tags |= collect_theme_tags_from_constants()
theme_tags |= collect_theme_tags_from_tagger_source()
# Also include any tags that already exist in the per-color CSVs. This captures
# dynamically constructed tags like "{CreatureType} Kindred" that don't appear
# as string literals in source code but are present in data.
try:
csv_rows = gather_theme_tag_rows()
if csv_rows:
for row_tags in csv_rows:
for t in row_tags:
if isinstance(t, str) and t:
theme_tags.add(t)
except Exception:
# If CSVs are unavailable, continue with tags from code only
csv_rows = []
# Normalization before other operations (so pruning & synergies use canonical names)
if normalization_map:
theme_tags = apply_normalization(theme_tags, normalization_map)
# Remove excluded / blacklisted helper tags we might not want to expose as themes
blacklist = {"Draw Triggers"}
theme_tags = {t for t in theme_tags if t and t not in blacklist and t not in exclusions}
# If we have frequency data, filter out extremely rare themes
# Rule: Drop any theme whose total count across all base colors is <= 1
# This removes one-off/accidental tags from the theme catalog.
# We apply the filter only when frequencies were computed successfully.
try:
_freq_probe = tally_tag_frequencies_by_base_color()
has_freqs = bool(_freq_probe)
except Exception:
has_freqs = False
if has_freqs:
def total_count(t: str) -> int:
total = 0
for color in BASE_COLORS.keys():
try:
total += int(_freq_probe.get(color, {}).get(t, 0))
except Exception:
pass
return total
kept: Set[str] = set()
for t in list(theme_tags):
if should_keep_theme(t, total_count(t), whitelist_cfg, protected_prefixes, protected_suffixes, min_overrides):
kept.add(t)
# Merge always_include even if absent
for extra in whitelist_cfg.get('always_include', []) or []:
kept.add(extra if isinstance(extra, str) else str(extra))
theme_tags = kept
# Sort tags for stable output
sorted_tags = sorted(theme_tags)
# Derive synergies mapping
synergies = derive_synergies_for_tags(theme_tags)
# Tally frequencies by base color if CSVs exist
try:
frequencies = tally_tag_frequencies_by_base_color()
except Exception:
frequencies = {}
# Co-occurrence synergies (data-driven) if CSVs exist
try:
# Reuse rows from earlier if available; otherwise gather now
rows = csv_rows if 'csv_rows' in locals() and csv_rows else gather_theme_tag_rows()
co_map, tag_counts, total_rows = compute_cooccurrence(rows)
except Exception:
rows = []
co_map, tag_counts, total_rows = {}, Counter(), 0
# Helper: compute primary/secondary colors for a theme
def primary_secondary_for(theme: str, freqs: Dict[str, Dict[str, int]]):
if not freqs:
return None, None
# Collect counts per base color for this theme
items = []
for color in BASE_COLORS.keys():
count = 0
try:
count = int(freqs.get(color, {}).get(theme, 0))
except Exception:
count = 0
items.append((color, count))
# Sort by count desc, then by color name for stability
items.sort(key=lambda x: (-x[1], x[0]))
# If all zeros, return None
if not items or items[0][1] <= 0:
return None, None
color_title = {
'white': 'White', 'blue': 'Blue', 'black': 'Black', 'red': 'Red', 'green': 'Green'
}
primary = color_title[items[0][0]]
secondary = None
# Find the next non-zero distinct color if available
for c, n in items[1:]:
if n > 0:
secondary = color_title[c]
break
return primary, secondary
output = []
def _uniq(seq: List[str]) -> List[str]:
seen = set()
out: List[str] = []
for x in seq:
if x not in seen:
out.append(x)
seen.add(x)
return out
for t in sorted_tags:
p, s = primary_secondary_for(t, frequencies)
# Build synergy list: curated + top co-occurrences
curated = synergies.get(t, [])
inferred: List[str] = []
if t in co_map and total_rows > 0:
# Denylist for clearly noisy combos
denylist = {
('-1/-1 Counters', 'Burn'),
('-1/-1 Counters', 'Voltron'),
}
# Whitelist focus for specific anchors
focus: Dict[str, List[str]] = {
'-1/-1 Counters': ['Counters Matter', 'Infect', 'Proliferate', 'Wither', 'Persist'],
}
# Compute PMI scores and filter
scored = cooccurrence_scores_for(t, co_map, tag_counts, total_rows)
# Keep only positive PMI and co-occurrence >= 5 (tunable)
filtered = [(o, s, c) for (o, s, c) in scored if s > 0 and c >= 5]
# If focused tags exist, ensure they bubble up first when present
preferred = focus.get(t, [])
if preferred:
# Partition into preferred and others
pref = [x for x in filtered if x[0] in preferred]
others = [x for x in filtered if x[0] not in preferred]
filtered = pref + others
# Select up to 6, skipping denylist and duplicates
for other, _score, _c in filtered:
if (t, other) in denylist or (other, t) in denylist:
continue
if other == t or other in curated or other in inferred:
continue
inferred.append(other)
if len(inferred) >= 6:
break
combined = list(curated)
# Enforced synergies from config (high precedence after curated)
enforced = enforced_synergies_cfg.get(t, [])
for es in enforced:
if es != t and es not in combined:
combined.append(es)
# Legacy automatic enforcement (backwards compatibility) if not already covered by enforced config
if not enforced:
if re.search(r'counter', t, flags=re.IGNORECASE) or t == 'Proliferate':
for needed in ['Counters Matter', 'Proliferate']:
if needed != t and needed not in combined:
combined.append(needed)
if re.search(r'token', t, flags=re.IGNORECASE) and t != 'Tokens Matter':
if 'Tokens Matter' not in combined:
combined.append('Tokens Matter')
# Append inferred last (lowest precedence)
for inf in inferred:
if inf != t and inf not in combined:
combined.append(inf)
# Deduplicate
combined = _uniq(combined)
# Apply synergy cap if configured (>0)
if synergy_cap > 0 and len(combined) > synergy_cap:
combined = combined[:synergy_cap]
entry = {
"theme": t,
"synergies": combined,
}
if p:
entry["primary_color"] = p
if s:
entry["secondary_color"] = s
output.append(entry)
os.makedirs(os.path.join('config', 'themes'), exist_ok=True)
with open(os.path.join('config', 'themes', 'theme_list.json'), 'w', encoding='utf-8') as f:
json.dump({
"themes": output,
"frequencies_by_base_color": frequencies,
"generated_from": "tagger + constants",
}, f, indent=2, ensure_ascii=False)
if __name__ == "__main__":
main()
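`main()` above leans on `compute_cooccurrence` and `cooccurrence_scores_for`, which are defined earlier in the script and not shown in this hunk. Inferring only from the call sites (a positive-PMI filter over `(other, score, count)` tuples), a minimal sketch of their shape might be:

```python
import math
from collections import Counter
from itertools import combinations
from typing import Dict, List, Set, Tuple

def compute_cooccurrence(rows: List[Set[str]]):
    """Count pairwise tag co-occurrence across card rows."""
    co_map: Dict[str, Counter] = {}
    tag_counts: Counter = Counter()
    for tags in rows:
        for t in tags:
            tag_counts[t] += 1
        for a, b in combinations(sorted(tags), 2):
            co_map.setdefault(a, Counter())[b] += 1
            co_map.setdefault(b, Counter())[a] += 1
    return co_map, tag_counts, len(rows)

def cooccurrence_scores_for(theme: str, co_map, tag_counts, total_rows: int):
    """Score partners of `theme` by PMI = log(P(a,b) / (P(a) * P(b)))."""
    scored: List[Tuple[str, float, int]] = []
    for other, c in co_map.get(theme, {}).items():
        p_joint = c / total_rows
        p_a = tag_counts[theme] / total_rows
        p_b = tag_counts[other] / total_rows
        pmi = math.log(p_joint / (p_a * p_b))
        scored.append((other, pmi, c))
    # Highest PMI first; break ties by raw co-occurrence count, then name.
    scored.sort(key=lambda x: (-x[1], -x[2], x[0]))
    return scored
```

Positive PMI means two tags co-occur more often than independence would predict, which is why the extractor keeps only `s > 0` before applying the co-occurrence floor.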

View file

@ -0,0 +1,15 @@
from __future__ import annotations
from deck_builder.builder import DeckBuilder
def test_builder_rng_same_seed_identical_streams():
b1 = DeckBuilder()
b1.set_seed('alpha')
seq1 = [b1.rng.random() for _ in range(5)]
b2 = DeckBuilder()
b2.set_seed('alpha')
seq2 = [b2.rng.random() for _ in range(5)]
assert seq1 == seq2

View file

@ -0,0 +1,33 @@
from deck_builder import builder_utils as bu
from random_util import set_seed
def test_weighted_sample_deterministic_same_seed():
pool = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
k = 3
rng1 = set_seed(12345)
sel1 = bu.weighted_sample_without_replacement(pool, k, rng=rng1)
# Reset to the same seed and expect the same selection order
rng2 = set_seed(12345)
sel2 = bu.weighted_sample_without_replacement(pool, k, rng=rng2)
assert sel1 == sel2
def test_compute_adjusted_target_deterministic_same_seed():
# Use a simple output func that collects messages (but we don't assert on them here)
msgs: list[str] = []
out = msgs.append
original_cfg = 10
existing = 4
rng1 = set_seed(999)
to_add1, bonus1 = bu.compute_adjusted_target(
"Ramp", original_cfg, existing, out, plural_word="ramp spells", rng=rng1
)
rng2 = set_seed(999)
to_add2, bonus2 = bu.compute_adjusted_target(
"Ramp", original_cfg, existing, out, plural_word="ramp spells", rng=rng2
)
assert (to_add1, bonus1) == (to_add2, bonus2)
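The real `bu.weighted_sample_without_replacement` lives in `deck_builder.builder_utils` and is not shown in this diff. As a sketch of why threading an explicit `rng` makes the selection deterministic, one standard construction (Efraimidis-Spirakis A-Res keys; the repo's actual algorithm may differ) looks like this:

```python
import random
from typing import List, Optional, Tuple, TypeVar

T = TypeVar("T")

def weighted_sample_without_replacement(
    pool: List[Tuple[T, float]], k: int, rng: Optional[random.Random] = None
) -> List[T]:
    # Efraimidis-Spirakis A-Res: draw key = u ** (1 / weight) per item
    # (weights assumed positive), then keep the k largest keys.
    r = rng or random.Random()
    keyed = [(r.random() ** (1.0 / w), item) for item, w in pool]
    keyed.sort(key=lambda kv: kv[0], reverse=True)
    return [item for _, item in keyed[: max(0, k)]]
```

Because every random draw flows through the injected `rng`, two calls seeded identically replay the same keys and therefore the same selection order, which is exactly what the test above asserts.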

View file

@ -0,0 +1,22 @@
from __future__ import annotations
import importlib
import os
from starlette.testclient import TestClient
def test_random_build_api_commander_and_seed(monkeypatch):
# Enable Random Modes and use tiny dataset
monkeypatch.setenv("RANDOM_MODES", "1")
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
app_module = importlib.import_module('code.web.app')
client = TestClient(app_module.app)
payload = {"seed": 12345, "theme": "Goblin Kindred"}
r = client.post('/api/random_build', json=payload)
assert r.status_code == 200
data = r.json()
assert data["seed"] == 12345
assert isinstance(data.get("commander"), str)
assert data.get("commander")

View file

@ -0,0 +1,21 @@
from __future__ import annotations
import os
from deck_builder.random_entrypoint import build_random_deck
def test_random_build_is_deterministic_with_seed(monkeypatch):
# Force deterministic tiny dataset
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
# Fixed seed should produce same commander consistently
out1 = build_random_deck(seed=12345)
out2 = build_random_deck(seed=12345)
assert out1.commander == out2.commander
assert out1.seed == out2.seed
def test_random_build_uses_theme_when_available(monkeypatch):
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
# On the tiny dataset the requested theme may or may not exist; neither path should crash
res = build_random_deck(theme="Goblin Kindred", seed=42)
assert isinstance(res.commander, str) and len(res.commander) > 0

View file

@ -0,0 +1,25 @@
from __future__ import annotations
import importlib
import os
from starlette.testclient import TestClient
def test_random_full_build_api_returns_deck_and_permalink(monkeypatch):
# Enable Random Modes and use tiny dataset
monkeypatch.setenv("RANDOM_MODES", "1")
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
app_module = importlib.import_module('code.web.app')
client = TestClient(app_module.app)
payload = {"seed": 4242, "theme": "Goblin Kindred"}
r = client.post('/api/random_full_build', json=payload)
assert r.status_code == 200
data = r.json()
assert data["seed"] == 4242
assert isinstance(data.get("commander"), str) and data["commander"]
assert isinstance(data.get("decklist"), list)
# Permalink present and shaped like /build/from?state=...
assert data.get("permalink")
assert "/build/from?state=" in data["permalink"]

View file

@ -0,0 +1,17 @@
from __future__ import annotations
import os
from deck_builder.random_entrypoint import build_random_full_deck
def test_random_full_build_is_deterministic_on_frozen_dataset(monkeypatch):
# Use frozen dataset for determinism
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
# Fixed seed should produce the same compact decklist
out1 = build_random_full_deck(theme="Goblin Kindred", seed=777)
out2 = build_random_full_deck(theme="Goblin Kindred", seed=777)
assert out1.seed == out2.seed == 777
assert out1.commander == out2.commander
assert isinstance(out1.decklist, list) and isinstance(out2.decklist, list)
assert out1.decklist == out2.decklist

View file

@ -0,0 +1,45 @@
import os
import json
import pytest
from fastapi.testclient import TestClient
@pytest.fixture(scope="module")
def client():
# Ensure flags and frozen dataset
os.environ["RANDOM_MODES"] = "1"
os.environ["RANDOM_UI"] = "1"
os.environ["CSV_FILES_DIR"] = os.path.join("csv_files", "testdata")
from web.app import app
with TestClient(app) as c:
yield c
def test_api_random_reroll_increments_seed(client: TestClient):
r1 = client.post("/api/random_full_build", json={"seed": 123})
assert r1.status_code == 200, r1.text
data1 = r1.json()
assert data1.get("seed") == 123
r2 = client.post("/api/random_reroll", json={"seed": 123})
assert r2.status_code == 200, r2.text
data2 = r2.json()
assert data2.get("seed") == 124
assert data2.get("permalink")
def test_hx_random_reroll_returns_html(client: TestClient):
headers = {"HX-Request": "true", "Content-Type": "application/json"}
r = client.post("/hx/random_reroll", data=json.dumps({"seed": 42}), headers=headers)
assert r.status_code == 200, r.text
# Accept either HTML fragment or JSON fallback
content_type = r.headers.get("content-type", "")
if "text/html" in content_type:
assert "Seed:" in r.text
else:
j = r.json()
assert j.get("seed") in (42, 43) # depends on increment policy

View file

@ -0,0 +1,37 @@
from __future__ import annotations
from random_util import derive_seed_from_string, set_seed, get_random, generate_seed
def test_derive_seed_from_string_stable():
# Known value derived from SHA-256('test-seed') first 8 bytes masked to 63 bits
assert derive_seed_from_string('test-seed') == 6214070892065607348
# Int passthrough-like behavior (normalized to positive 63-bit)
assert derive_seed_from_string(42) == 42
assert derive_seed_from_string(-42) == 42
def test_set_seed_deterministic_stream():
r1 = set_seed('alpha')
r2 = set_seed('alpha')
seq1 = [r1.random() for _ in range(5)]
seq2 = [r2.random() for _ in range(5)]
assert seq1 == seq2
def test_get_random_unseeded_independent():
a = get_random()
b = get_random()
# Advance a few steps
_ = [a.random() for _ in range(3)]
_ = [b.random() for _ in range(3)]
# Distinct Random instances; stream divergence itself is not asserted here
assert a is not b
def test_generate_seed_range():
s = generate_seed()
assert isinstance(s, int)
assert s >= 0
# Ensure it's within 63-bit range
assert s < (1 << 63)
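The helpers under test are defined in `random_util`; a sketch consistent with the behavior asserted above (SHA-256 of the string, first 8 bytes, masked to 63 bits; ints normalized to non-negative) might look like the following. The byte order is an assumption, so this sketch is not guaranteed to reproduce the exact constant `6214070892065607348`.

```python
import hashlib
import random

_MASK_63 = (1 << 63) - 1

def derive_seed_from_string(value) -> int:
    """Map any seed input to a stable non-negative 63-bit int.

    Ints pass through normalized to abs(); other values are stringified
    and hashed via SHA-256, taking the first 8 bytes of the digest.
    """
    if isinstance(value, int):
        return abs(value) & _MASK_63
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") & _MASK_63

def set_seed(value) -> random.Random:
    # A fresh, isolated Random instance so seeded streams never share state.
    return random.Random(derive_seed_from_string(value))
```

Returning a dedicated `random.Random` instance (rather than seeding the module-global generator) is what lets two `set_seed('alpha')` calls produce identical, interference-free streams.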

View file

@ -0,0 +1,18 @@
from __future__ import annotations
import os
from code.headless_runner import run
def test_headless_seed_threads_into_builder(monkeypatch):
# Use the tiny test dataset for speed/determinism
monkeypatch.setenv("CSV_FILES_DIR", os.path.join("csv_files", "testdata"))
# Use a commander known to be in the tiny dataset (or exercise the fallback path);
# we rely on the search/confirm flow to fuzzy match the provided name to an entry.
out1 = run(command_name="Krenko", seed=999)
out2 = run(command_name="Krenko", seed=999)
# Determinism: the seed should be set on the builder and identical across runs
assert getattr(out1, "seed", None) == getattr(out2, "seed", None) == 999
# Basic sanity: commander selection should have occurred
assert isinstance(getattr(out1, "commander_name", ""), str)
assert isinstance(getattr(out2, "commander_name", ""), str)

View file

@ -0,0 +1,84 @@
import json
import subprocess
import sys
from pathlib import Path
# This test validates that the whitelist governance + synergy cap logic
# (implemented in extract_themes.py and theme_whitelist.yml) behaves as expected.
# It focuses on a handful of anchor themes to keep runtime fast and deterministic.
ROOT = Path(__file__).resolve().parents[2]
SCRIPT = ROOT / "code" / "scripts" / "extract_themes.py"
OUTPUT_JSON = ROOT / "config" / "themes" / "theme_list.json"
def run_extractor():
# Re-run extraction so the test always evaluates fresh output.
# Using the current python executable ensures we run inside the active venv.
result = subprocess.run([sys.executable, str(SCRIPT)], capture_output=True, text=True)
assert result.returncode == 0, f"extract_themes.py failed: {result.stderr or result.stdout}"
assert OUTPUT_JSON.exists(), "Expected theme_list.json to be generated"
def load_themes():
data = json.loads(OUTPUT_JSON.read_text(encoding="utf-8"))
themes = data.get("themes", [])
mapping = {t["theme"]: t for t in themes if isinstance(t, dict) and "theme" in t}
return mapping
def assert_contains(theme_map, theme_name):
assert theme_name in theme_map, f"Expected theme '{theme_name}' in generated theme list"
def test_synergy_cap_and_enforced_inclusions():
run_extractor()
theme_map = load_themes()
# Target anchors to validate
anchors = [
"+1/+1 Counters",
"-1/-1 Counters",
"Counters Matter",
"Reanimate",
"Outlaw Kindred",
]
for a in anchors:
assert_contains(theme_map, a)
# Synergy cap check (<=5)
for a in anchors:
syn = theme_map[a]["synergies"]
assert len(syn) <= 5, f"Synergy cap violated for {a}: {syn} (len={len(syn)})"
# Enforced synergies for counters cluster
plus_syn = set(theme_map["+1/+1 Counters"]["synergies"])
assert {"Proliferate", "Counters Matter"}.issubset(plus_syn), "+1/+1 Counters missing enforced synergies"
minus_syn = set(theme_map["-1/-1 Counters"]["synergies"])
assert {"Proliferate", "Counters Matter"}.issubset(minus_syn), "-1/-1 Counters missing enforced synergies"
counters_matter_syn = set(theme_map["Counters Matter"]["synergies"])
assert "Proliferate" in counters_matter_syn, "Counters Matter should include Proliferate"
# Reanimate anchor (enforced synergy to Graveyard Matters retained while capped)
reanimate_syn = theme_map["Reanimate"]["synergies"]
assert "Graveyard Matters" in reanimate_syn, "Reanimate should include Graveyard Matters"
assert "Enter the Battlefield" in reanimate_syn, "Reanimate should include Enter the Battlefield (curated)"
# Outlaw Kindred - curated list should remain exactly its 5 intrinsic sub-tribes
outlaw_expected = {"Warlock Kindred", "Pirate Kindred", "Rogue Kindred", "Assassin Kindred", "Mercenary Kindred"}
outlaw_syn = set(theme_map["Outlaw Kindred"]["synergies"])
assert outlaw_syn == outlaw_expected, f"Outlaw Kindred synergies mismatch. Expected {outlaw_expected}, got {outlaw_syn}"
# No enforced synergy should be silently truncated if it was required (already ensured by ordering + length checks)
# Additional safety: ensure every enforced synergy appears in its anchor (sampling a subset)
for anchor, required in {
"+1/+1 Counters": ["Proliferate", "Counters Matter"],
"-1/-1 Counters": ["Proliferate", "Counters Matter"],
"Reanimate": ["Graveyard Matters"],
}.items():
present = set(theme_map[anchor]["synergies"])
missing = [r for r in required if r not in present]
assert not missing, f"Anchor {anchor} missing enforced synergies: {missing}"

View file

@ -78,6 +78,15 @@ ENABLE_THEMES = _as_bool(os.getenv("ENABLE_THEMES"), False)
ENABLE_PWA = _as_bool(os.getenv("ENABLE_PWA"), False)
ENABLE_PRESETS = _as_bool(os.getenv("ENABLE_PRESETS"), False)
ALLOW_MUST_HAVES = _as_bool(os.getenv("ALLOW_MUST_HAVES"), False)
RANDOM_MODES = _as_bool(os.getenv("RANDOM_MODES"), False)
RANDOM_UI = _as_bool(os.getenv("RANDOM_UI"), False)
def _as_int(val: str | None, default: int) -> int:
try:
return int(val) if val is not None and str(val).strip() != "" else default
except Exception:
return default
RANDOM_MAX_ATTEMPTS = _as_int(os.getenv("RANDOM_MAX_ATTEMPTS"), 5)
RANDOM_TIMEOUT_MS = _as_int(os.getenv("RANDOM_TIMEOUT_MS"), 5000)
# Theme default from environment: THEME=light|dark|system (case-insensitive). Defaults to system.
_THEME_ENV = (os.getenv("THEME") or "").strip().lower()
@ -96,6 +105,10 @@ templates.env.globals.update({
"enable_presets": ENABLE_PRESETS,
"allow_must_haves": ALLOW_MUST_HAVES,
"default_theme": DEFAULT_THEME,
"random_modes": RANDOM_MODES,
"random_ui": RANDOM_UI,
"random_max_attempts": RANDOM_MAX_ATTEMPTS,
"random_timeout_ms": RANDOM_TIMEOUT_MS,
})
# --- Simple fragment cache for template partials (low-risk, TTL-based) ---
@ -178,11 +191,272 @@ async def status_sys():
"ENABLE_PRESETS": bool(ENABLE_PRESETS),
"ALLOW_MUST_HAVES": bool(ALLOW_MUST_HAVES),
"DEFAULT_THEME": DEFAULT_THEME,
"RANDOM_MODES": bool(RANDOM_MODES),
"RANDOM_UI": bool(RANDOM_UI),
"RANDOM_MAX_ATTEMPTS": int(RANDOM_MAX_ATTEMPTS),
"RANDOM_TIMEOUT_MS": int(RANDOM_TIMEOUT_MS),
},
}
except Exception:
return {"version": "unknown", "uptime_seconds": 0, "flags": {}}
# --- Random Modes API ---
@app.post("/api/random_build")
async def api_random_build(request: Request):
# Gate behind feature flag
if not RANDOM_MODES:
raise HTTPException(status_code=404, detail="Random Modes disabled")
try:
body = {}
try:
body = await request.json()
if not isinstance(body, dict):
body = {}
except Exception:
body = {}
theme = body.get("theme")
constraints = body.get("constraints")
seed = body.get("seed")
attempts = body.get("attempts", int(RANDOM_MAX_ATTEMPTS))
timeout_ms = body.get("timeout_ms", int(RANDOM_TIMEOUT_MS))
# Convert ms -> seconds, clamp minimal
try:
timeout_s = max(0.1, float(timeout_ms) / 1000.0)
except Exception:
timeout_s = max(0.1, float(RANDOM_TIMEOUT_MS) / 1000.0)
# Import on-demand to avoid heavy costs at module import time
from deck_builder.random_entrypoint import build_random_deck # type: ignore
res = build_random_deck(
theme=theme,
constraints=constraints,
seed=seed,
attempts=int(attempts),
timeout_s=float(timeout_s),
)
rid = getattr(request.state, "request_id", None)
return {
"seed": int(res.seed),
"commander": res.commander,
"theme": res.theme,
"constraints": res.constraints or {},
"attempts": int(attempts),
"timeout_ms": int(timeout_ms),
"request_id": rid,
}
except HTTPException:
raise
except Exception as ex:
logging.getLogger("web").error(f"random_build failed: {ex}")
raise HTTPException(status_code=500, detail="random_build failed")
@app.post("/api/random_full_build")
async def api_random_full_build(request: Request):
# Gate behind feature flag
if not RANDOM_MODES:
raise HTTPException(status_code=404, detail="Random Modes disabled")
try:
body = {}
try:
body = await request.json()
if not isinstance(body, dict):
body = {}
except Exception:
body = {}
theme = body.get("theme")
constraints = body.get("constraints")
seed = body.get("seed")
attempts = body.get("attempts", int(RANDOM_MAX_ATTEMPTS))
timeout_ms = body.get("timeout_ms", int(RANDOM_TIMEOUT_MS))
# Convert ms -> seconds, clamp minimal
try:
timeout_s = max(0.1, float(timeout_ms) / 1000.0)
except Exception:
timeout_s = max(0.1, float(RANDOM_TIMEOUT_MS) / 1000.0)
# Build a full deck deterministically
from deck_builder.random_entrypoint import build_random_full_deck # type: ignore
res = build_random_full_deck(
theme=theme,
constraints=constraints,
seed=seed,
attempts=int(attempts),
timeout_s=float(timeout_s),
)
# Create a permalink token reusing the existing format from /build/permalink
payload = {
"commander": res.commander,
# Note: tags/bracket/ideals omitted; random modes focus on seed replay
"random": {
"seed": int(res.seed),
"theme": res.theme,
"constraints": res.constraints or {},
},
}
try:
import base64
raw = _json.dumps(payload, separators=(",", ":"))
token = base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii").rstrip("=")
permalink = f"/build/from?state={token}"
except Exception:
permalink = None
rid = getattr(request.state, "request_id", None)
return {
"seed": int(res.seed),
"commander": res.commander,
"decklist": res.decklist or [],
"theme": res.theme,
"constraints": res.constraints or {},
"permalink": permalink,
"attempts": int(attempts),
"timeout_ms": int(timeout_ms),
"request_id": rid,
}
except HTTPException:
raise
except Exception as ex:
logging.getLogger("web").error(f"random_full_build failed: {ex}")
raise HTTPException(status_code=500, detail="random_full_build failed")
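Because the encoder above strips `=` padding from the base64 token, whatever consumes `/build/from?state=...` must restore that padding before decoding. A minimal decoder sketch (`decode_state_token` is a hypothetical helper name, not a function in this diff):

```python
import base64
import json

def decode_state_token(token: str) -> dict:
    """Invert the permalink encoding: restore stripped '=' padding, then b64-decode."""
    padded = token + "=" * (-len(token) % 4)  # pad length back up to a multiple of 4
    raw = base64.urlsafe_b64decode(padded.encode("ascii"))
    return json.loads(raw.decode("utf-8"))
```

The `-len(token) % 4` trick yields 0-3 padding characters, matching however many were removed by `rstrip("=")` at encode time.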
@app.post("/api/random_reroll")
async def api_random_reroll(request: Request):
# Gate behind feature flag
if not RANDOM_MODES:
raise HTTPException(status_code=404, detail="Random Modes disabled")
try:
body = {}
try:
body = await request.json()
if not isinstance(body, dict):
body = {}
except Exception:
body = {}
theme = body.get("theme")
constraints = body.get("constraints")
last_seed = body.get("seed")
# Simple deterministic reroll policy: increment prior seed when provided; else generate fresh
try:
new_seed = int(last_seed) + 1 if last_seed is not None else None
except Exception:
new_seed = None
if new_seed is None:
from random_util import generate_seed # type: ignore
new_seed = int(generate_seed())
# Build with the new seed
timeout_ms = body.get("timeout_ms", int(RANDOM_TIMEOUT_MS))
try:
timeout_s = max(0.1, float(timeout_ms) / 1000.0)
except Exception:
timeout_s = max(0.1, float(RANDOM_TIMEOUT_MS) / 1000.0)
attempts = body.get("attempts", int(RANDOM_MAX_ATTEMPTS))
from deck_builder.random_entrypoint import build_random_full_deck # type: ignore
res = build_random_full_deck(
theme=theme,
constraints=constraints,
seed=new_seed,
attempts=int(attempts),
timeout_s=float(timeout_s),
)
payload = {
"commander": res.commander,
"random": {
"seed": int(res.seed),
"theme": res.theme,
"constraints": res.constraints or {},
},
}
try:
import base64
raw = _json.dumps(payload, separators=(",", ":"))
token = base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii").rstrip("=")
permalink = f"/build/from?state={token}"
except Exception:
permalink = None
rid = getattr(request.state, "request_id", None)
return {
"previous_seed": (int(last_seed) if isinstance(last_seed, int) or (isinstance(last_seed, str) and last_seed.isdigit()) else None),
"seed": int(res.seed),
"commander": res.commander,
"decklist": res.decklist or [],
"theme": res.theme,
"constraints": res.constraints or {},
"permalink": permalink,
"attempts": int(attempts),
"timeout_ms": int(timeout_ms),
"request_id": rid,
}
except HTTPException:
raise
except Exception as ex:
logging.getLogger("web").error(f"random_reroll failed: {ex}")
raise HTTPException(status_code=500, detail="random_reroll failed")
@app.post("/hx/random_reroll")
async def hx_random_reroll(request: Request):
# Small HTMX endpoint returning a partial HTML fragment for in-page updates
if not RANDOM_UI or not RANDOM_MODES:
raise HTTPException(status_code=404, detail="Random UI disabled")
body = {}
try:
body = await request.json()
if not isinstance(body, dict):
body = {}
except Exception:
body = {}
last_seed = body.get("seed")
theme = body.get("theme")
constraints = body.get("constraints")
try:
new_seed = int(last_seed) + 1 if last_seed is not None else None
except Exception:
new_seed = None
if new_seed is None:
from random_util import generate_seed # type: ignore
new_seed = int(generate_seed())
from deck_builder.random_entrypoint import build_random_full_deck # type: ignore
res = build_random_full_deck(
theme=theme,
constraints=constraints,
seed=new_seed,
attempts=int(RANDOM_MAX_ATTEMPTS),
timeout_s=float(RANDOM_TIMEOUT_MS) / 1000.0,
)
# Render minimal fragment via Jinja2
try:
return templates.TemplateResponse(
"partials/random_result.html", # type: ignore
{
"request": request,
"seed": int(res.seed),
"commander": res.commander,
"decklist": res.decklist or [],
"theme": res.theme,
"constraints": res.constraints or {},
},
)
except Exception as ex:
logging.getLogger("web").error(f"hx_random_reroll template error: {ex}")
# Fallback to JSON to avoid total failure
return JSONResponse(
{
"seed": int(res.seed),
"commander": res.commander,
"decklist": res.decklist or [],
"theme": res.theme,
"constraints": res.constraints or {},
}
)
# Logs tail endpoint (read-only)
@app.get("/status/logs")
async def status_logs(

View file

@ -22,6 +22,7 @@ from html import escape as _esc
from deck_builder.builder import DeckBuilder
from deck_builder import builder_utils as bu
from ..services.combo_utils import detect_all as _detect_all
from path_util import csv_dir as _csv_dir
from ..services.alts_utils import get_cached as _alts_get_cached, set_cached as _alts_set_cached
# Cache for available card names used by validation endpoints
@ -39,7 +40,7 @@ def _available_cards() -> set[str]:
return _AVAILABLE_CARDS_CACHE
try:
import csv
path = 'csv_files/cards.csv'
path = f"{_csv_dir()}/cards.csv"
with open(path, 'r', encoding='utf-8', newline='') as f:
reader = csv.DictReader(f)
fields = reader.fieldnames or []
@ -2853,6 +2854,16 @@ async def build_permalink(request: Request):
},
"locks": list(sess.get("locks", [])),
}
# Optional: random build fields (if present in session)
try:
rb = sess.get("random_build") or {}
if rb:
# Only include known keys to avoid leaking unrelated session data
inc = {k: rb.get(k) for k in ("seed", "theme", "constraints") if k in rb}
if inc:
payload["random"] = inc
except Exception:
pass
# Add include/exclude cards and advanced options if feature is enabled
if ALLOW_MUST_HAVES:
@ -2899,6 +2910,15 @@ async def build_from(request: Request, state: str | None = None) -> HTMLResponse
sess["use_owned_only"] = bool(flags.get("owned_only"))
sess["prefer_owned"] = bool(flags.get("prefer_owned"))
sess["locks"] = list(data.get("locks", []))
# Optional random build rehydration
try:
r = data.get("random") or {}
if r:
sess["random_build"] = {
k: r.get(k) for k in ("seed", "theme", "constraints") if k in r
}
except Exception:
pass
# Import exclude_cards if feature is enabled and present
if ALLOW_MUST_HAVES and data.get("exclude_cards"):

View file

@ -61,7 +61,15 @@
el.innerHTML = '<div><strong>Version:</strong> '+String(v)+'</div>'+
(st ? '<div><strong>Server time (UTC):</strong> '+String(st)+'</div>' : '')+
'<div><strong>Uptime:</strong> '+String(up)+'s</div>'+
'<div><strong>Flags:</strong> SHOW_LOGS='+ (flags.SHOW_LOGS? '1':'0') +', SHOW_DIAGNOSTICS='+ (flags.SHOW_DIAGNOSTICS? '1':'0') +', SHOW_SETUP='+ (flags.SHOW_SETUP? '1':'0') +'</div>';
'<div><strong>Flags:</strong> '
+ 'SHOW_LOGS='+ (flags.SHOW_LOGS? '1':'0')
+ ', SHOW_DIAGNOSTICS='+ (flags.SHOW_DIAGNOSTICS? '1':'0')
+ ', SHOW_SETUP='+ (flags.SHOW_SETUP? '1':'0')
+ ', RANDOM_MODES='+ (flags.RANDOM_MODES? '1':'0')
+ ', RANDOM_UI='+ (flags.RANDOM_UI? '1':'0')
+ ', RANDOM_MAX_ATTEMPTS='+ String(flags.RANDOM_MAX_ATTEMPTS ?? '')
+ ', RANDOM_TIMEOUT_MS='+ String(flags.RANDOM_TIMEOUT_MS ?? '')
+ '</div>';
} catch(_){ el.textContent = 'Unavailable'; }
}
function load(){

View file

@ -0,0 +1,12 @@
<div class="random-result" hx-swap-oob="true" id="random-result">
<div class="random-meta">
<span class="seed">Seed: {{ seed }}</span>
{% if theme %}<span class="theme">Theme: {{ theme }}</span>{% endif %}
</div>
<h3 class="commander">{{ commander }}</h3>
<ul class="decklist">
{% for card in decklist %}
<li>{{ card }}</li>
{% endfor %}
</ul>
</div>

config/themes/theme_list.json Normal file (10,899 lines)

File diff suppressed because it is too large

View file

@ -0,0 +1,99 @@
# Theme whitelist & governance configuration
# This file stabilizes the public theme taxonomy and synergy output.
#
# Sections:
# always_include: themes that must always appear even if frequency is low or zero
# protected_prefixes: any theme starting with one of these prefixes is never pruned
# protected_suffixes: any theme ending with one of these suffixes is never pruned
# min_frequency_overrides: per-theme minimum frequency required to retain (overrides global >1 rule)
# normalization: canonical name mapping (old -> new)
# exclusions: themes forcibly removed after normalization
# enforced_synergies: mapping of theme -> list of synergies that must be injected (before capping)
# synergy_cap: integer maximum number of synergies to emit per theme (after merging curated/enforced/inferred)
# notes: free-form documentation
#
# IMPORTANT: After editing, re-run: python code/scripts/extract_themes.py
always_include:
- Superfriends
- Storm
- Group Hug
- Pillowfort
- Stax
- Politics
- Reanimator
- Reanimate
- Graveyard Matters
- Treasure Token
- Tokens Matter
- Counters Matter
- +1/+1 Counters
- -1/-1 Counters
- Landfall
- Lands Matter
- Outlaw Kindred
protected_prefixes:
- Angel
- Dragon
- Elf
- Goblin
- Zombie
- Soldier
- Vampire
- Wizard
- Merfolk
- Spirit
- Sliver
- Dinosaur
- Construct
- Warrior
- Demon
- Hydra
- Treefolk
protected_suffixes:
- Kindred
min_frequency_overrides:
Storm: 0
Group Hug: 0
Pillowfort: 0
Politics: 0
Treasure Token: 0
Monarch: 0
Initiative: 0
Pillow Fort: 0 # alias that may appear; normalization may fold it
normalization:
ETB: Enter the Battlefield
Self Mill: Mill
Pillow Fort: Pillowfort
Reanimator: Reanimate # unify under single anchor; both appear in always_include for safety
exclusions:
- Draw Triggers
- Placeholder
- Test Tag
# Mandatory synergy injections independent of curated or inferred values.
# These are merged before the synergy cap is enforced.
enforced_synergies:
Counters Matter: [Proliferate]
+1/+1 Counters: [Proliferate, Counters Matter]
-1/-1 Counters: [Proliferate, Counters Matter]
Proliferate: [Counters Matter]
Creature Tokens: [Tokens Matter]
Token Creation: [Tokens Matter]
Treasure Token: [Artifacts Matter]
Reanimate: [Graveyard Matters]
Reanimator: [Graveyard Matters, Reanimate]
Graveyard Matters: [Reanimate]
synergy_cap: 5
notes: |
The synergy_cap trims verbose or noisy lists to improve UI scannability.
Precedence order when capping: curated > enforced > inferred (PMI).
Enforced synergies are guaranteed inclusion (unless they duplicate existing entries).
Protected prefixes/suffixes prevent pruning of tribal / kindred families even if low frequency.

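The `notes` section above pins down the merge semantics: synergies from all three sources are deduplicated in precedence order (curated > enforced > inferred), enforced entries are guaranteed to survive, and the result is trimmed to `synergy_cap`. A minimal Python sketch of that logic — function and variable names are illustrative, not the project's actual API:

```python
# Hypothetical sketch of applying the whitelist governance rules when
# emitting a theme's synergy list. Names here are illustrative only.
SYNERGY_CAP = 5

def merge_synergies(curated, enforced, inferred, cap=SYNERGY_CAP):
    """Merge sources with precedence curated > enforced > inferred,
    dedupe preserving order, guarantee enforced entries, trim to cap."""
    ordered = []
    for source in (curated, enforced, inferred):
        for s in source:
            if s not in ordered:
                ordered.append(s)
    if len(ordered) <= cap:
        return ordered
    # Enforced synergies must survive the cap: reserve their slots first,
    # then fill the rest with the highest-precedence remaining entries.
    keep = [s for s in ordered if s in enforced]
    for s in ordered:
        if len(keep) >= cap:
            break
        if s not in keep:
            keep.append(s)
    # Restore precedence order among the survivors.
    return [s for s in ordered if s in keep][:cap]
```

For example, a `+1/+1 Counters` theme with six curated synergies would drop its two lowest-precedence curated entries to keep the enforced `Counters Matter` within the cap of 5.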

@@ -20,6 +20,12 @@ services:
ALLOW_MUST_HAVES: "1" # 1=enable must-include/must-exclude cards feature; 0=disable
SHOW_MISC_POOL: "0"
# Random Modes (feature flags)
RANDOM_MODES: "0" # 1=enable random build endpoints and backend features
RANDOM_UI: "0" # 1=show Surprise/Theme/Reroll/Share controls in UI
RANDOM_MAX_ATTEMPTS: "5" # cap retry attempts
RANDOM_TIMEOUT_MS: "5000" # per-build timeout in ms
# Theming
THEME: "dark" # system|light|dark

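A hedged sketch of how a service might consume the Random Modes flags above — the helper names are illustrative, and the defaults mirror the compose values (features off, 5 attempts, 5000 ms):

```python
import os

def env_bool(name, default=False):
    # Compose passes flags as "0"/"1" strings; accept common truthy spellings.
    return os.getenv(name, "1" if default else "0").strip() in {"1", "true", "True"}

def env_int(name, default):
    try:
        return int(os.getenv(name, str(default)))
    except ValueError:
        return default

RANDOM_MODES = env_bool("RANDOM_MODES")                   # random build endpoints
RANDOM_UI = env_bool("RANDOM_UI")                         # Surprise/Theme/Reroll/Share controls
RANDOM_MAX_ATTEMPTS = env_int("RANDOM_MAX_ATTEMPTS", 5)   # retry cap
RANDOM_TIMEOUT_MS = env_int("RANDOM_TIMEOUT_MS", 5000)    # per-build timeout
```

With no environment set, this yields the same disabled-by-default posture as the compose file.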

@@ -21,6 +21,12 @@ services:
WEB_VIRTUALIZE: "1"
ALLOW_MUST_HAVES: "1" # 1=enable must-include/must-exclude cards feature; 0=disable
# Random Modes (feature flags)
RANDOM_MODES: "0" # 1=enable random build endpoints and backend features
RANDOM_UI: "0" # 1=show Surprise/Theme/Reroll/Share controls in UI
RANDOM_MAX_ATTEMPTS: "5" # cap retry attempts
RANDOM_TIMEOUT_MS: "5000" # per-build timeout in ms
# Theming
THEME: "system"


@@ -14,4 +14,7 @@ python-multipart>=0.0.9
# Config/schema validation
pydantic>=2.5.0
# YAML parsing for theme whitelist governance
PyYAML>=6.0
# Development dependencies are in requirements-dev.txt
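PyYAML is added here for parsing the governance file. A minimal sketch of loading it with `yaml.safe_load` — the path and default handling are assumptions mirroring the sections documented in `theme_whitelist.yml`, not the project's actual loader:

```python
import yaml  # PyYAML, as pinned above

def load_whitelist(path="config/themes/theme_whitelist.yml"):
    """Load the theme governance file, tolerating an empty file and
    supplying defaults for the sections the extractor relies on."""
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    data.setdefault("synergy_cap", 5)
    data.setdefault("enforced_synergies", {})
    data.setdefault("always_include", [])
    return data
```

Using `safe_load` (rather than `load`) avoids executing arbitrary YAML tags from the config file.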