refactor: backend standardization (service layer, validation, route splitting) + image cache and Scryfall API fixes

This commit is contained in:
matt 2026-03-17 16:34:50 -07:00
parent e81b47bccf
commit f784741416
35 changed files with 7054 additions and 4344 deletions

View file

@ -87,10 +87,42 @@ class ImageCache:
self.bulk_data_path = Path(bulk_data_path)
self.client = ScryfallBulkDataClient()
self._last_download_time: float = 0.0
# In-memory index of available images (avoids repeated filesystem checks)
# Key: (size, sanitized_filename), Value: True if exists
self._image_index: dict[tuple[str, str], bool] = {}
self._index_built = False
def is_enabled(self) -> bool:
"""Check if image caching is enabled via environment variable."""
return os.getenv("CACHE_CARD_IMAGES", "0") == "1"
def _build_image_index(self) -> None:
"""
Build in-memory index of cached images to avoid repeated filesystem checks.
This dramatically improves performance by eliminating stat() calls for every image.
"""
if self._index_built or not self.is_enabled():
return
logger.info("Building image cache index...")
start_time = time.time()
for size in IMAGE_SIZES:
size_dir = self.base_dir / size
if not size_dir.exists():
continue
# Scan directory for .jpg files
for image_file in size_dir.glob("*.jpg"):
# Store just the filename without extension
filename = image_file.stem
self._image_index[(size, filename)] = True
elapsed = time.time() - start_time
total_images = len(self._image_index)
logger.info(f"Image index built: {total_images} images indexed in {elapsed:.3f}s")
self._index_built = True
def get_image_path(self, card_name: str, size: str = "normal") -> Optional[Path]:
"""
@ -105,12 +137,17 @@ class ImageCache:
"""
if not self.is_enabled():
return None
# Build index on first access (lazy initialization)
if not self._index_built:
self._build_image_index()
safe_name = sanitize_filename(card_name)
image_path = self.base_dir / size / f"{safe_name}.jpg"
if image_path.exists():
return image_path
# Check in-memory index first (fast)
if (size, safe_name) in self._image_index:
return self.base_dir / size / f"{safe_name}.jpg"
return None
def get_image_url(self, card_name: str, size: str = "normal") -> str:

View file

@ -58,6 +58,7 @@ class ScryfallBulkDataClient:
try:
req = Request(url)
req.add_header("User-Agent", "MTG-Deckbuilder/3.0 (Image Cache)")
req.add_header("Accept", "application/json")
with urlopen(req, timeout=30) as response:
import json
return json.loads(response.read().decode("utf-8"))

View file

@ -96,7 +96,6 @@ def _write_dataset(path: Path) -> Path:
def _fresh_client(tmp_path: Path) -> tuple[TestClient, Path]:
dataset_path = _write_dataset(tmp_path / "partner_synergy.json")
os.environ["ENABLE_PARTNER_MECHANICS"] = "1"
os.environ["ENABLE_PARTNER_SUGGESTIONS"] = "1"
for module_name in (
"code.web.app",
"code.web.routes.partner_suggestions",
@ -177,7 +176,6 @@ def test_partner_suggestions_api_returns_ranked_candidates(tmp_path: Path) -> No
except Exception:
pass
os.environ.pop("ENABLE_PARTNER_MECHANICS", None)
os.environ.pop("ENABLE_PARTNER_SUGGESTIONS", None)
for module_name in (
"code.web.app",
"code.web.routes.partner_suggestions",
@ -245,7 +243,6 @@ def test_partner_suggestions_api_refresh_flag(monkeypatch) -> None:
from code.web.services.partner_suggestions import PartnerSuggestionResult
monkeypatch.setattr(route, "ENABLE_PARTNER_MECHANICS", True)
monkeypatch.setattr(route, "ENABLE_PARTNER_SUGGESTIONS", True)
captured: dict[str, bool] = {"refresh": False}

View file

@ -1,44 +0,0 @@
import base64
import json
from starlette.testclient import TestClient
def test_permalink_includes_locks_and_restores_notice(monkeypatch):
# Lazy import to ensure fresh app state
import importlib
app_module = importlib.import_module('code.web.app')
client = TestClient(app_module.app)
# Seed a session with a commander and locks by calling /build and directly touching session via cookie path
# Start a session
r = client.get('/build')
assert r.status_code == 200
# Now set some session state by invoking endpoints that mutate session
# Simulate selecting commander and a lock
# Use /build/from to load a permalink-like payload directly
payload = {
"commander": "Atraxa, Praetors' Voice",
"tags": ["proliferate"],
"bracket": 3,
"ideals": {"ramp": 10, "lands": 36, "basic_lands": 18, "creatures": 28, "removal": 10, "wipes": 3, "card_advantage": 8, "protection": 4},
"tag_mode": "AND",
"flags": {"owned_only": False, "prefer_owned": False},
"locks": ["Swords to Plowshares", "Sol Ring"],
}
raw = json.dumps(payload, separators=(",", ":")).encode('utf-8')
token = base64.urlsafe_b64encode(raw).decode('ascii').rstrip('=')
r2 = client.get(f'/build/from?state={token}')
assert r2.status_code == 200
# Step 4 should contain the locks restored chip
body = r2.text
assert 'locks restored' in body.lower()
# Ask the server for a permalink now and ensure locks are present
r3 = client.get('/build/permalink')
assert r3.status_code == 200
data = r3.json()
# Prefer decoded state when token not provided
state = data.get('state') or {}
assert 'locks' in state
assert set([s.lower() for s in state.get('locks', [])]) == {"swords to plowshares", "sol ring"}

View file

@ -0,0 +1,407 @@
"""Tests for service layer base classes, interfaces, and registry."""
from __future__ import annotations
import pytest
import time
from typing import Dict, Any
from code.web.services.base import (
BaseService,
StateService,
DataService,
CachedService,
ServiceError,
ValidationError,
NotFoundError,
)
from code.web.services.registry import ServiceRegistry, get_registry, reset_registry
from code.web.services.tasks import SessionManager
class TestBaseService:
"""Test BaseService abstract base class."""
def test_validation_helper(self):
"""Test _validate helper method."""
service = BaseService()
# Should not raise on True
service._validate(True, "Should not raise")
# Should raise on False
with pytest.raises(ValidationError, match="Should raise"):
service._validate(False, "Should raise")
class MockStateService(StateService):
"""Mock state service for testing."""
def _initialize_state(self, key: str) -> Dict[str, Any]:
return {"created": time.time(), "data": f"init-{key}"}
def _should_cleanup(self, key: str, state: Dict[str, Any]) -> bool:
# Cleanup if "expired" flag is set
return state.get("expired", False)
class TestStateService:
"""Test StateService base class."""
def test_get_state_creates_new(self):
"""Test that get_state creates new state."""
service = MockStateService()
state = service.get_state("test-key")
assert "created" in state
assert state["data"] == "init-test-key"
def test_get_state_returns_existing(self):
"""Test that get_state returns existing state."""
service = MockStateService()
state1 = service.get_state("test-key")
state1["custom"] = "value"
state2 = service.get_state("test-key")
assert state2 is state1
assert state2["custom"] == "value"
def test_set_and_get_value(self):
"""Test setting and getting state values."""
service = MockStateService()
service.set_state_value("key1", "field1", "value1")
assert service.get_state_value("key1", "field1") == "value1"
assert service.get_state_value("key1", "missing", "default") == "default"
def test_cleanup_state(self):
"""Test cleanup of expired state."""
service = MockStateService()
# Create some state
service.get_state("keep1")
service.get_state("keep2")
service.get_state("expire1")
service.get_state("expire2")
# Mark some as expired
service.set_state_value("expire1", "expired", True)
service.set_state_value("expire2", "expired", True)
# Cleanup
removed = service.cleanup_state()
assert removed == 2
# Verify expired are gone
state = service._state
assert "keep1" in state
assert "keep2" in state
assert "expire1" not in state
assert "expire2" not in state
class MockDataService(DataService[Dict[str, Any]]):
"""Mock data service for testing."""
def __init__(self, data: Dict[str, Any]):
super().__init__()
self._mock_data = data
def _load_data(self) -> Dict[str, Any]:
return self._mock_data.copy()
class TestDataService:
"""Test DataService base class."""
def test_lazy_loading(self):
"""Test that data is loaded lazily."""
service = MockDataService({"key": "value"})
assert not service.is_loaded()
data = service.get_data()
assert service.is_loaded()
assert data["key"] == "value"
def test_cached_loading(self):
"""Test that data is cached after first load."""
service = MockDataService({"key": "value"})
data1 = service.get_data()
data1["modified"] = True
data2 = service.get_data()
assert data2 is data1
assert data2["modified"]
def test_force_reload(self):
"""Test force reload of data."""
service = MockDataService({"key": "value"})
data1 = service.get_data()
data1["modified"] = True
data2 = service.get_data(force_reload=True)
assert data2 is not data1
assert "modified" not in data2
class MockCachedService(CachedService[str, int]):
"""Mock cached service for testing."""
def __init__(self, ttl_seconds: int | None = None, max_size: int | None = None):
super().__init__(ttl_seconds=ttl_seconds, max_size=max_size)
self.compute_count = 0
def _compute_value(self, key: str) -> int:
self.compute_count += 1
return len(key)
class TestCachedService:
"""Test CachedService base class."""
def test_cache_hit(self):
"""Test that values are cached."""
service = MockCachedService()
value1 = service.get("hello")
assert value1 == 5
assert service.compute_count == 1
value2 = service.get("hello")
assert value2 == 5
assert service.compute_count == 1 # Should not recompute
def test_cache_miss(self):
"""Test cache miss computes new value."""
service = MockCachedService()
value1 = service.get("hello")
value2 = service.get("world")
assert value1 == 5
assert value2 == 5
assert service.compute_count == 2
def test_ttl_expiration(self):
"""Test TTL-based expiration."""
service = MockCachedService(ttl_seconds=1)
value1 = service.get("hello")
assert service.compute_count == 1
# Should hit cache immediately
value2 = service.get("hello")
assert service.compute_count == 1
# Wait for expiration
time.sleep(1.1)
value3 = service.get("hello")
assert service.compute_count == 2 # Should recompute
def test_max_size_limit(self):
"""Test cache size limit."""
service = MockCachedService(max_size=2)
service.get("key1")
service.get("key2")
service.get("key3") # Should evict oldest (key1)
# key1 should be evicted
assert len(service._cache) == 2
assert "key1" not in service._cache
assert "key2" in service._cache
assert "key3" in service._cache
def test_invalidate_single(self):
"""Test invalidating single cache entry."""
service = MockCachedService()
service.get("key1")
service.get("key2")
service.invalidate("key1")
assert "key1" not in service._cache
assert "key2" in service._cache
def test_invalidate_all(self):
"""Test invalidating entire cache."""
service = MockCachedService()
service.get("key1")
service.get("key2")
service.invalidate()
assert len(service._cache) == 0
class MockService:
"""Mock service for registry testing."""
def __init__(self, value: str):
self.value = value
class TestServiceRegistry:
"""Test ServiceRegistry for dependency injection."""
def test_register_and_get_singleton(self):
"""Test registering and retrieving singleton."""
registry = ServiceRegistry()
instance = MockService("test")
registry.register_singleton(MockService, instance)
retrieved = registry.get(MockService)
assert retrieved is instance
assert retrieved.value == "test"
def test_register_and_get_factory(self):
"""Test registering and retrieving from factory."""
registry = ServiceRegistry()
registry.register_factory(MockService, lambda: MockService("factory"))
instance1 = registry.get(MockService)
instance2 = registry.get(MockService)
assert instance1 is not instance2 # Factory creates new instances
assert instance1.value == "factory"
assert instance2.value == "factory"
def test_lazy_singleton(self):
"""Test lazy-initialized singleton."""
registry = ServiceRegistry()
call_count = {"count": 0}
def factory():
call_count["count"] += 1
return MockService("lazy")
registry.register_lazy_singleton(MockService, factory)
instance1 = registry.get(MockService)
assert call_count["count"] == 1
instance2 = registry.get(MockService)
assert call_count["count"] == 1 # Should not call factory again
assert instance1 is instance2
def test_duplicate_registration_error(self):
"""Test error on duplicate registration."""
registry = ServiceRegistry()
registry.register_singleton(MockService, MockService("first"))
with pytest.raises(ValueError, match="already registered"):
registry.register_singleton(MockService, MockService("second"))
def test_get_unregistered_error(self):
"""Test error on getting unregistered service."""
registry = ServiceRegistry()
with pytest.raises(KeyError, match="not registered"):
registry.get(MockService)
def test_try_get(self):
"""Test try_get returns None for unregistered."""
registry = ServiceRegistry()
result = registry.try_get(MockService)
assert result is None
registry.register_singleton(MockService, MockService("test"))
result = registry.try_get(MockService)
assert result is not None
def test_is_registered(self):
"""Test checking if service is registered."""
registry = ServiceRegistry()
assert not registry.is_registered(MockService)
registry.register_singleton(MockService, MockService("test"))
assert registry.is_registered(MockService)
def test_unregister(self):
"""Test unregistering a service."""
registry = ServiceRegistry()
registry.register_singleton(MockService, MockService("test"))
assert registry.is_registered(MockService)
registry.unregister(MockService)
assert not registry.is_registered(MockService)
def test_clear(self):
"""Test clearing all services."""
registry = ServiceRegistry()
registry.register_singleton(MockService, MockService("test"))
registry.clear()
assert not registry.is_registered(MockService)
class TestSessionManager:
"""Test SessionManager refactored service."""
def test_new_session_id(self):
"""Test creating new session IDs."""
manager = SessionManager()
sid1 = manager.new_session_id()
sid2 = manager.new_session_id()
assert isinstance(sid1, str)
assert isinstance(sid2, str)
assert sid1 != sid2
assert len(sid1) == 32 # UUID hex is 32 chars
def test_get_session_creates_new(self):
"""Test get_session with None creates new."""
manager = SessionManager()
session = manager.get_session(None)
assert "created" in session
assert "updated" in session
def test_get_session_returns_existing(self):
"""Test get_session returns existing session."""
manager = SessionManager()
sid = manager.new_session_id()
session1 = manager.get_session(sid)
session1["custom"] = "data"
session2 = manager.get_session(sid)
assert session2 is session1
assert session2["custom"] == "data"
def test_set_and_get_value(self):
"""Test setting and getting session values."""
manager = SessionManager()
sid = manager.new_session_id()
manager.set_value(sid, "key1", "value1")
assert manager.get_value(sid, "key1") == "value1"
assert manager.get_value(sid, "missing", "default") == "default"
def test_cleanup_expired_sessions(self):
"""Test cleanup of expired sessions."""
manager = SessionManager(ttl_seconds=1)
sid1 = manager.new_session_id()
sid2 = manager.new_session_id()
manager.get_session(sid1)
time.sleep(1.1) # Let sid1 expire
manager.get_session(sid2) # sid2 is fresh
removed = manager.cleanup_state()
assert removed == 1
# sid1 should be gone, sid2 should exist
assert sid1 not in manager._state
assert sid2 in manager._state

View file

@ -0,0 +1,340 @@
"""Tests for validation framework (models, validators, card names)."""
from __future__ import annotations
import pytest
from pydantic import ValidationError
from code.web.validation.models import (
BuildRequest,
CommanderSearchRequest,
ThemeValidationRequest,
OwnedCardsImportRequest,
BatchBuildRequest,
CardReplacementRequest,
PowerBracket,
OwnedMode,
CommanderPartnerType,
)
from code.web.validation.card_names import CardNameValidator
from code.web.validation.validators import (
ThemeValidator,
PowerBracketValidator,
ColorIdentityValidator,
)
from code.web.validation.messages import ValidationMessages, MSG
class TestBuildRequest:
"""Test BuildRequest Pydantic model."""
def test_minimal_valid_request(self):
"""Test minimal valid build request."""
req = BuildRequest(commander="Atraxa, Praetors' Voice")
assert req.commander == "Atraxa, Praetors' Voice"
assert req.themes == []
assert req.power_bracket == PowerBracket.BRACKET_2
assert req.owned_mode == OwnedMode.OFF
def test_full_valid_request(self):
"""Test fully populated build request."""
req = BuildRequest(
commander="Kess, Dissident Mage",
themes=["Spellslinger", "Graveyard"],
power_bracket=PowerBracket.BRACKET_3,
owned_mode=OwnedMode.PREFER,
must_include=["Counterspell", "Lightning Bolt"],
must_exclude=["Armageddon"]
)
assert req.commander == "Kess, Dissident Mage"
assert len(req.themes) == 2
assert req.power_bracket == PowerBracket.BRACKET_3
assert len(req.must_include) == 2
def test_commander_whitespace_stripped(self):
"""Test commander name whitespace is stripped."""
req = BuildRequest(commander=" Atraxa ")
assert req.commander == "Atraxa"
def test_commander_empty_fails(self):
"""Test empty commander name fails validation."""
with pytest.raises(ValidationError):
BuildRequest(commander="")
with pytest.raises(ValidationError):
BuildRequest(commander=" ")
def test_themes_deduplicated(self):
"""Test themes are deduplicated case-insensitively."""
req = BuildRequest(
commander="Test",
themes=["Spellslinger", "spellslinger", "SPELLSLINGER", "Tokens"]
)
assert len(req.themes) == 2
assert "Spellslinger" in req.themes
assert "Tokens" in req.themes
def test_partner_validation_requires_name(self):
"""Test partner mode requires partner name."""
with pytest.raises(ValidationError, match="Partner mode requires partner_name"):
BuildRequest(
commander="Kydele, Chosen of Kruphix",
partner_mode=CommanderPartnerType.PARTNER
)
def test_partner_valid_with_name(self):
"""Test partner mode valid with name."""
req = BuildRequest(
commander="Kydele, Chosen of Kruphix",
partner_mode=CommanderPartnerType.PARTNER,
partner_name="Thrasios, Triton Hero"
)
assert req.partner_mode == CommanderPartnerType.PARTNER
assert req.partner_name == "Thrasios, Triton Hero"
def test_background_requires_name(self):
"""Test background mode requires background name."""
with pytest.raises(ValidationError, match="Background mode requires background_name"):
BuildRequest(
commander="Erinis, Gloom Stalker",
partner_mode=CommanderPartnerType.BACKGROUND
)
def test_custom_theme_requires_both(self):
"""Test custom theme requires both name and tags."""
with pytest.raises(ValidationError, match="Custom theme requires both name and tags"):
BuildRequest(
commander="Test",
custom_theme_name="My Theme"
)
with pytest.raises(ValidationError, match="Custom theme tags require theme name"):
BuildRequest(
commander="Test",
custom_theme_tags=["Tag1", "Tag2"]
)
class TestCommanderSearchRequest:
"""Test CommanderSearchRequest model."""
def test_valid_search(self):
"""Test valid search request."""
req = CommanderSearchRequest(query="Atraxa")
assert req.query == "Atraxa"
assert req.limit == 10
def test_custom_limit(self):
"""Test custom limit."""
req = CommanderSearchRequest(query="Test", limit=25)
assert req.limit == 25
def test_empty_query_fails(self):
"""Test empty query fails."""
with pytest.raises(ValidationError):
CommanderSearchRequest(query="")
def test_limit_bounds(self):
"""Test limit must be within bounds."""
with pytest.raises(ValidationError):
CommanderSearchRequest(query="Test", limit=0)
with pytest.raises(ValidationError):
CommanderSearchRequest(query="Test", limit=101)
class TestCardNameValidator:
"""Test card name validation and normalization."""
def test_normalize_lowercase(self):
"""Test normalization converts to lowercase."""
assert CardNameValidator.normalize("Atraxa, Praetors' Voice") == "atraxa, praetors' voice"
def test_normalize_removes_diacritics(self):
"""Test normalization removes diacritics."""
assert CardNameValidator.normalize("Dánitha Capashen") == "danitha capashen"
assert CardNameValidator.normalize("Gisela, the Broken Blade") == "gisela, the broken blade"
def test_normalize_standardizes_apostrophes(self):
"""Test normalization standardizes apostrophes."""
assert CardNameValidator.normalize("Atraxa, Praetors' Voice") == CardNameValidator.normalize("Atraxa, Praetors' Voice")
assert CardNameValidator.normalize("Atraxa, Praetors` Voice") == CardNameValidator.normalize("Atraxa, Praetors' Voice")
def test_normalize_collapses_whitespace(self):
"""Test normalization collapses whitespace."""
assert CardNameValidator.normalize("Test Card") == "test card"
assert CardNameValidator.normalize(" Test ") == "test"
def test_validator_caches_normalization(self):
"""Test validator caches normalized lookups."""
validator = CardNameValidator()
validator._card_names = {"Atraxa, Praetors' Voice"}
validator._normalized_map = {
"atraxa, praetors' voice": "Atraxa, Praetors' Voice"
}
validator._loaded = True
# Should find exact match
assert validator.is_valid("Atraxa, Praetors' Voice")
class TestThemeValidator:
"""Test theme validation."""
def test_validate_themes_separates_valid_invalid(self):
"""Test validation separates valid from invalid themes."""
validator = ThemeValidator()
validator._themes = {"Spellslinger", "spellslinger", "Tokens", "tokens"}
validator._loaded = True
valid, invalid = validator.validate_themes(["Spellslinger", "Invalid", "Tokens"])
assert "Spellslinger" in valid
assert "Tokens" in valid
assert "Invalid" in invalid
class TestPowerBracketValidator:
"""Test power bracket validation."""
def test_valid_brackets(self):
"""Test valid bracket values (1-4)."""
assert PowerBracketValidator.is_valid_bracket(1)
assert PowerBracketValidator.is_valid_bracket(2)
assert PowerBracketValidator.is_valid_bracket(3)
assert PowerBracketValidator.is_valid_bracket(4)
def test_invalid_brackets(self):
"""Test invalid bracket values."""
assert not PowerBracketValidator.is_valid_bracket(0)
assert not PowerBracketValidator.is_valid_bracket(5)
assert not PowerBracketValidator.is_valid_bracket(-1)
class TestColorIdentityValidator:
"""Test color identity validation."""
def test_parse_comma_separated(self):
"""Test parsing comma-separated colors."""
colors = ColorIdentityValidator.parse_colors("W,U,B")
assert colors == {"W", "U", "B"}
def test_parse_concatenated(self):
"""Test parsing concatenated colors."""
colors = ColorIdentityValidator.parse_colors("WUB")
assert colors == {"W", "U", "B"}
def test_parse_empty(self):
"""Test parsing empty string."""
colors = ColorIdentityValidator.parse_colors("")
assert colors == set()
def test_colorless_subset_any(self):
"""Test colorless cards valid in any deck."""
validator = ColorIdentityValidator()
assert validator.is_subset({"C"}, {"W", "U"})
assert validator.is_subset(set(), {"R", "G"})
def test_subset_validation(self):
"""Test subset validation."""
validator = ColorIdentityValidator()
# Valid: card colors subset of commander
assert validator.is_subset({"W", "U"}, {"W", "U", "B"})
# Invalid: card has colors not in commander
assert not validator.is_subset({"W", "U", "B"}, {"W", "U"})
class TestValidationMessages:
"""Test validation message formatting."""
def test_format_commander_invalid(self):
"""Test commander invalid message formatting."""
msg = MSG.format_commander_invalid("Test Commander")
assert "Test Commander" in msg
assert "not found" in msg
def test_format_themes_invalid(self):
"""Test multiple invalid themes formatting."""
msg = MSG.format_themes_invalid(["Theme1", "Theme2"])
assert "Theme1" in msg
assert "Theme2" in msg
def test_format_bracket_exceeded(self):
"""Test bracket exceeded message formatting."""
msg = MSG.format_bracket_exceeded("Mana Crypt", 4, 2)
assert "Mana Crypt" in msg
assert "4" in msg
assert "2" in msg
def test_format_color_mismatch(self):
"""Test color mismatch message formatting."""
msg = MSG.format_color_mismatch("Card", "WUB", "WU")
assert "Card" in msg
assert "WUB" in msg
assert "WU" in msg
class TestBatchBuildRequest:
"""Test batch build request validation."""
def test_valid_batch(self):
"""Test valid batch request."""
base = BuildRequest(commander="Test")
req = BatchBuildRequest(base_config=base, count=5)
assert req.count == 5
assert req.base_config.commander == "Test"
def test_count_limit(self):
"""Test batch count limit."""
base = BuildRequest(commander="Test")
with pytest.raises(ValidationError):
BatchBuildRequest(base_config=base, count=11)
class TestCardReplacementRequest:
"""Test card replacement request validation."""
def test_valid_replacement(self):
"""Test valid replacement request."""
req = CardReplacementRequest(card_name="Sol Ring", reason="Too powerful")
assert req.card_name == "Sol Ring"
assert req.reason == "Too powerful"
def test_whitespace_stripped(self):
"""Test whitespace is stripped."""
req = CardReplacementRequest(card_name=" Sol Ring ")
assert req.card_name == "Sol Ring"
def test_empty_name_fails(self):
"""Test empty card name fails."""
with pytest.raises(ValidationError):
CardReplacementRequest(card_name="")
class TestOwnedCardsImportRequest:
"""Test owned cards import validation."""
def test_valid_import(self):
"""Test valid import request."""
req = OwnedCardsImportRequest(format_type="csv", content="Name\nSol Ring\n")
assert req.format_type == "csv"
assert "Sol Ring" in req.content
def test_invalid_format(self):
"""Test invalid format fails."""
with pytest.raises(ValidationError):
OwnedCardsImportRequest(format_type="invalid", content="test")
def test_empty_content_fails(self):
"""Test empty content fails."""
with pytest.raises(ValidationError):
OwnedCardsImportRequest(format_type="csv", content="")

View file

@ -166,6 +166,7 @@ SHOW_SETUP = _as_bool(os.getenv("SHOW_SETUP"), True)
SHOW_DIAGNOSTICS = _as_bool(os.getenv("SHOW_DIAGNOSTICS"), False)
SHOW_COMMANDERS = _as_bool(os.getenv("SHOW_COMMANDERS"), True)
SHOW_VIRTUALIZE = _as_bool(os.getenv("WEB_VIRTUALIZE"), False)
CACHE_CARD_IMAGES = _as_bool(os.getenv("CACHE_CARD_IMAGES"), False)
ENABLE_THEMES = _as_bool(os.getenv("ENABLE_THEMES"), True)
ENABLE_PWA = _as_bool(os.getenv("ENABLE_PWA"), False)
ENABLE_PRESETS = _as_bool(os.getenv("ENABLE_PRESETS"), False)
@ -173,8 +174,7 @@ ALLOW_MUST_HAVES = _as_bool(os.getenv("ALLOW_MUST_HAVES"), True)
SHOW_MUST_HAVE_BUTTONS = _as_bool(os.getenv("SHOW_MUST_HAVE_BUTTONS"), False)
ENABLE_CUSTOM_THEMES = _as_bool(os.getenv("ENABLE_CUSTOM_THEMES"), True)
WEB_IDEALS_UI = os.getenv("WEB_IDEALS_UI", "slider").strip().lower() # 'input' or 'slider'
ENABLE_PARTNER_MECHANICS = _as_bool(os.getenv("ENABLE_PARTNER_MECHANICS"), True)
ENABLE_PARTNER_SUGGESTIONS = _as_bool(os.getenv("ENABLE_PARTNER_SUGGESTIONS"), True)
ENABLE_PARTNER_MECHANICS = _as_bool(os.getenv("ENABLE_PARTNER_GESTIONS"), True)
ENABLE_BATCH_BUILD = _as_bool(os.getenv("ENABLE_BATCH_BUILD"), True)
RANDOM_MODES = _as_bool(os.getenv("RANDOM_MODES"), True) # initial snapshot (legacy)
RANDOM_UI = _as_bool(os.getenv("RANDOM_UI"), True)
@ -310,13 +310,12 @@ templates.env.globals.update({
"enable_presets": ENABLE_PRESETS,
"enable_custom_themes": ENABLE_CUSTOM_THEMES,
"enable_partner_mechanics": ENABLE_PARTNER_MECHANICS,
"enable_partner_suggestions": ENABLE_PARTNER_SUGGESTIONS,
"allow_must_haves": ALLOW_MUST_HAVES,
"show_must_have_buttons": SHOW_MUST_HAVE_BUTTONS,
"default_theme": DEFAULT_THEME,
"random_modes": RANDOM_MODES,
"random_ui": RANDOM_UI,
"random_max_attempts": RANDOM_MAX_ATTEMPTS,
"card_images_cached": CACHE_CARD_IMAGES,
"random_timeout_ms": RANDOM_TIMEOUT_MS,
"random_reroll_throttle_ms": int(RANDOM_REROLL_THROTTLE_MS),
"theme_picker_diagnostics": THEME_PICKER_DIAGNOSTICS,
@ -2261,6 +2260,10 @@ from .routes import build_multicopy as build_multicopy_routes # noqa: E402
from .routes import build_include_exclude as build_include_exclude_routes # noqa: E402
from .routes import build_themes as build_themes_routes # noqa: E402
from .routes import build_partners as build_partners_routes # noqa: E402
from .routes import build_wizard as build_wizard_routes # noqa: E402
from .routes import build_newflow as build_newflow_routes # noqa: E402
from .routes import build_alternatives as build_alternatives_routes # noqa: E402
from .routes import build_compliance as build_compliance_routes # noqa: E402
from .routes import configs as config_routes # noqa: E402
from .routes import decks as decks_routes # noqa: E402
from .routes import setup as setup_routes # noqa: E402
@ -2279,6 +2282,10 @@ app.include_router(build_multicopy_routes.router, prefix="/build")
app.include_router(build_include_exclude_routes.router, prefix="/build")
app.include_router(build_themes_routes.router, prefix="/build")
app.include_router(build_partners_routes.router, prefix="/build")
app.include_router(build_wizard_routes.router, prefix="/build")
app.include_router(build_newflow_routes.router, prefix="/build")
app.include_router(build_alternatives_routes.router)
app.include_router(build_compliance_routes.router)
app.include_router(config_routes.router)
app.include_router(decks_routes.router)
app.include_router(setup_routes.router)

View file

@ -31,18 +31,46 @@ async def get_download_status():
import json
status_file = Path("card_files/images/.download_status.json")
last_result_file = Path("card_files/images/.last_download_result.json")
if not status_file.exists():
# Check cache statistics if no download in progress
# No active download - return cache stats plus last download result if available
stats = _image_cache.cache_statistics()
last_download = None
if last_result_file.exists():
try:
with last_result_file.open('r', encoding='utf-8') as f:
last_download = json.load(f)
except Exception:
pass
return JSONResponse({
"running": False,
"last_download": last_download,
"stats": stats
})
try:
with status_file.open('r', encoding='utf-8') as f:
status = json.load(f)
# If download is complete (or errored), persist result, clean up status file
if not status.get("running", False):
try:
with last_result_file.open('w', encoding='utf-8') as f:
json.dump(status, f)
except Exception:
pass
try:
status_file.unlink()
except Exception:
pass
cache_stats = _image_cache.cache_statistics()
return JSONResponse({
"running": False,
"last_download": status,
"stats": cache_stats
})
return JSONResponse(status)
except Exception as e:
logger.warning(f"Could not read status file: {e}")
@ -136,7 +164,7 @@ async def get_card_image(size: str, card_name: str, face: str = Query(default="f
image_path = _image_cache.get_image_path(card_name, size)
if image_path and image_path.exists():
logger.info(f"Serving cached image: {card_name} ({size}, {face})")
logger.debug(f"Serving cached image: {card_name} ({size}, {face})")
return FileResponse(
image_path,
media_type="image/jpeg",

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,615 @@
"""Build Alternatives Route
Phase 5 extraction from build.py:
- GET /build/alternatives - Role-based card suggestions with tag overlap fallback
This module provides intelligent alternative card suggestions based on deck role,
tags, and builder context. Supports owned-only filtering and caching.
"""
from __future__ import annotations
from fastapi import APIRouter, Request, Query
from fastapi.responses import HTMLResponse
from typing import Any
from ..app import templates
from ..services.tasks import get_session, new_sid
from ..services.build_utils import owned_set as owned_set_helper, builder_present_names, builder_display_map
from deck_builder.builder import DeckBuilder
from deck_builder import builder_constants as bc
from deck_builder import builder_utils as bu
from ..services.alts_utils import get_cached as _alts_get_cached, set_cached as _alts_set_cached
router = APIRouter(prefix="/build")
@router.get("/alternatives", response_class=HTMLResponse)
async def build_alternatives(
request: Request,
name: str,
stage: str | None = None,
owned_only: int = Query(0),
refresh: int = Query(0),
) -> HTMLResponse:
"""Suggest alternative cards for a given card name, preferring role-specific pools.
Strategy:
1) Determine the seed card's role from the current deck (Role field) or optional `stage` hint.
2) Build a candidate pool from the combined DataFrame using the same filters as the build phase
for that role (ramp/removal/wipes/card_advantage/protection).
3) Exclude commander, lands (where applicable), in-deck, locked, and the seed itself; then sort
by edhrecRank/manaValue. Apply owned-only filter if requested.
4) Fall back to tag-overlap similarity when role cannot be determined or data is missing.
Returns an HTML partial listing up to ~10 alternatives with Replace buttons.
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
ctx = sess.get("build_ctx") or {}
b = ctx.get("builder") if isinstance(ctx, dict) else None
# Owned library
owned_set = owned_set_helper()
require_owned = bool(int(owned_only or 0)) or bool(sess.get("use_owned_only"))
refresh_requested = bool(int(refresh or 0))
# If builder context missing, show a guidance message
if not b:
html = '<div class="alts"><div class="muted">Start the build to see alternatives.</div></div>'
return HTMLResponse(html)
try:
name_disp = str(name).strip()
name_l = name_disp.lower()
commander_l = str((sess.get("commander") or "")).strip().lower()
locked_set = {str(x).strip().lower() for x in (sess.get("locks", []) or [])}
# Exclusions from prior inline replacements
alts_exclude = {str(x).strip().lower() for x in (sess.get("alts_exclude", []) or [])}
alts_exclude_v = int(sess.get("alts_exclude_v") or 0)
# Resolve role from stage hint or current library entry
stage_hint = (stage or "").strip().lower()
stage_map = {
"ramp": "ramp",
"removal": "removal",
"wipes": "wipe",
"wipe": "wipe",
"board_wipe": "wipe",
"card_advantage": "card_advantage",
"draw": "card_advantage",
"protection": "protection",
# Additional mappings for creature stages
"creature": "creature",
"creatures": "creature",
"primary": "creature",
"secondary": "creature",
# Land-related hints
"land": "land",
"lands": "land",
"utility": "land",
"misc": "land",
"fetch": "land",
"dual": "land",
}
hinted_role = stage_map.get(stage_hint) if stage_hint else None
lib = getattr(b, "card_library", {}) or {}
# Case-insensitive lookup in deck library
lib_key = None
try:
if name_disp in lib:
lib_key = name_disp
else:
lm = {str(k).strip().lower(): k for k in lib.keys()}
lib_key = lm.get(name_l)
except Exception:
lib_key = None
entry = lib.get(lib_key) if lib_key else None
role = hinted_role or (entry.get("Role") if isinstance(entry, dict) else None)
if isinstance(role, str):
role = role.strip().lower()
# Build role-specific pool from combined DataFrame
items: list[dict] = []
def _clean(value: Any) -> str:
try:
if value is None:
return ""
if isinstance(value, float) and value != value:
return ""
text = str(value)
return text.strip()
except Exception:
return ""
def _normalize_tags(raw: Any) -> list[str]:
if not raw:
return []
if isinstance(raw, (list, tuple, set)):
return [str(t).strip() for t in raw if str(t).strip()]
if isinstance(raw, str):
txt = raw.strip()
if not txt:
return []
if txt.startswith("[") and txt.endswith("]"):
try:
import json as _json
parsed = _json.loads(txt)
if isinstance(parsed, list):
return [str(t).strip() for t in parsed if str(t).strip()]
except Exception:
pass
return [s.strip() for s in txt.split(',') if s.strip()]
return []
def _meta_from_row(row_obj: Any) -> dict[str, Any]:
meta = {
"mana": "",
"rarity": "",
"role": "",
"tags": [],
"hover_simple": True,
}
if row_obj is None:
meta["role"] = _clean(used_role or "")
return meta
def _pull(*keys: str) -> Any:
for key in keys:
try:
if isinstance(row_obj, dict):
val = row_obj.get(key)
elif hasattr(row_obj, "get"):
val = row_obj.get(key)
else:
val = getattr(row_obj, key, None)
except Exception:
val = None
if val not in (None, ""):
if isinstance(val, float) and val != val:
continue
return val
return None
meta["mana"] = _clean(_pull("mana_cost", "manaCost", "mana", "manaValue", "cmc", "mv"))
meta["rarity"] = _clean(_pull("rarity"))
role_val = _pull("role", "primaryRole", "subRole")
if not role_val:
role_val = used_role or ""
meta["role"] = _clean(role_val)
tags_val = _pull("themeTags", "_ltags", "tags")
meta_tags = _normalize_tags(tags_val)
meta["tags"] = meta_tags
meta["hover_simple"] = not (meta["mana"] or meta["rarity"] or (meta_tags and len(meta_tags) > 0))
return meta
def _build_meta_map(df_obj) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
try:
if df_obj is None or not hasattr(df_obj, "iterrows"):
return mapping
for _, row in df_obj.iterrows():
try:
nm_val = str(row.get("name") or "").strip()
except Exception:
nm_val = ""
if not nm_val:
continue
key = nm_val.lower()
if key in mapping:
continue
mapping[key] = _meta_from_row(row)
except Exception:
return mapping
return mapping
def _sampler(seq: list[str], limit: int) -> list[str]:
if limit <= 0:
return []
if len(seq) <= limit:
return list(seq)
rng = getattr(b, "rng", None)
try:
if rng is not None:
return rng.sample(seq, limit) if len(seq) >= limit else list(seq)
import random as _rnd
return _rnd.sample(seq, limit) if len(seq) >= limit else list(seq)
except Exception:
return list(seq[:limit])
used_role = role if isinstance(role, str) and role else None
# Promote to 'land' role when the seed card is a land (regardless of stored role)
try:
if entry and isinstance(entry, dict):
ctype = str(entry.get("Card Type") or entry.get("Type") or "").lower()
if "land" in ctype:
used_role = "land"
except Exception:
pass
df = getattr(b, "_combined_cards_df", None)
# Compute current deck fingerprint to avoid stale cached alternatives after stage changes
in_deck: set[str] = builder_present_names(b)
try:
import hashlib as _hl
deck_fp = _hl.md5(
("|".join(sorted(in_deck)) if in_deck else "").encode("utf-8")
).hexdigest()[:8]
except Exception:
deck_fp = str(len(in_deck))
# Use a cache key that includes the exclusions version and deck fingerprint
cache_key = (name_l, commander_l, used_role or "_fallback_", require_owned, alts_exclude_v, deck_fp)
cached = None
if used_role != 'land' and not refresh_requested:
cached = _alts_get_cached(cache_key)
if cached is not None:
return HTMLResponse(cached)
def _render_and_cache(_items: list[dict]):
html_str = templates.get_template("build/_alternatives.html").render({
"request": request,
"name": name_disp,
"require_owned": require_owned,
"items": _items,
})
# Skip caching when used_role == land or refresh requested for per-call randomness
if used_role != 'land' and not refresh_requested:
try:
_alts_set_cached(cache_key, html_str)
except Exception:
pass
return HTMLResponse(html_str)
# Helper: map display names
def _display_map_for(lower_pool: set[str]) -> dict[str, str]:
try:
return builder_display_map(b, lower_pool)
except Exception:
return {nm: nm for nm in lower_pool}
# Common exclusions
# in_deck already computed above
def _exclude(df0):
out = df0.copy()
if "name" in out.columns:
out["_lname"] = out["name"].astype(str).str.strip().str.lower()
mask = ~out["_lname"].isin({name_l} | in_deck | locked_set | alts_exclude | ({commander_l} if commander_l else set()))
out = out[mask]
return out
# If we have data and a recognized role, mirror the phase logic
if df is not None and hasattr(df, "copy") and (used_role in {"ramp","removal","wipe","card_advantage","protection","creature","land"}):
pool = df.copy()
try:
pool["_ltags"] = pool.get("themeTags", []).apply(bu.normalize_tag_cell)
except Exception:
# best-effort normalize
pool["_ltags"] = pool.get("themeTags", []).apply(lambda x: [str(t).strip().lower() for t in (x or [])] if isinstance(x, list) else [])
# Role-specific base filtering
if used_role != "land":
# Exclude lands for non-land roles
if "type" in pool.columns:
pool = pool[~pool["type"].fillna("").str.contains("Land", case=False, na=False)]
else:
# Keep only lands
if "type" in pool.columns:
pool = pool[pool["type"].fillna("").str.contains("Land", case=False, na=False)]
# Seed info to guide filtering
seed_is_basic = False
try:
seed_is_basic = bool(name_l in {b.strip().lower() for b in getattr(bc, 'BASIC_LANDS', [])})
except Exception:
seed_is_basic = False
if seed_is_basic:
# For basics: show other basics (different colors) to allow quick swaps
try:
pool = pool[pool['name'].astype(str).str.strip().str.lower().isin({x.lower() for x in getattr(bc, 'BASIC_LANDS', [])})]
except Exception:
pass
else:
# For non-basics: prefer other non-basics
try:
pool = pool[~pool['name'].astype(str).str.strip().str.lower().isin({x.lower() for x in getattr(bc, 'BASIC_LANDS', [])})]
except Exception:
pass
# Apply mono-color misc land filters (no debug CSV dependency)
try:
colors = list(getattr(b, 'color_identity', []) or [])
mono = len(colors) <= 1
mono_exclude = {n.lower() for n in getattr(bc, 'MONO_COLOR_MISC_LAND_EXCLUDE', [])}
mono_keep = {n.lower() for n in getattr(bc, 'MONO_COLOR_MISC_LAND_KEEP_ALWAYS', [])}
kindred_all = {n.lower() for n in getattr(bc, 'KINDRED_ALL_LAND_NAMES', [])}
any_color_phrases = [s.lower() for s in getattr(bc, 'ANY_COLOR_MANA_PHRASES', [])]
extra_rainbow_terms = [s.lower() for s in getattr(bc, 'MONO_COLOR_RAINBOW_TEXT_EXTRA', [])]
fetch_names = set()
for seq in getattr(bc, 'COLOR_TO_FETCH_LANDS', {}).values():
for nm in seq:
fetch_names.add(nm.lower())
for nm in getattr(bc, 'GENERIC_FETCH_LANDS', []):
fetch_names.add(nm.lower())
# World Tree check needs all five colors
need_all_colors = {'w','u','b','r','g'}
def _illegal_world_tree(nm: str) -> bool:
return nm == 'the world tree' and set(c.lower() for c in colors) != need_all_colors
# Text column fallback
text_col = 'text'
if text_col not in pool.columns:
for c in pool.columns:
if 'text' in c.lower():
text_col = c
break
def _exclude_row(row) -> bool:
nm_l = str(row['name']).strip().lower()
if mono and nm_l in mono_exclude and nm_l not in mono_keep and nm_l not in kindred_all:
return True
if mono and nm_l not in mono_keep and nm_l not in kindred_all:
try:
txt = str(row.get(text_col, '') or '').lower()
if any(p in txt for p in any_color_phrases + extra_rainbow_terms):
return True
except Exception:
pass
if nm_l in fetch_names:
return True
if _illegal_world_tree(nm_l):
return True
return False
pool = pool[~pool.apply(_exclude_row, axis=1)]
except Exception:
pass
# Optional sub-role filtering (only if enough depth)
try:
subrole = str((entry or {}).get('SubRole') or '').strip().lower()
if subrole:
# Heuristic categories for grouping
cat_map = {
'fetch': 'fetch',
'dual': 'dual',
'triple': 'triple',
'misc': 'misc',
'utility': 'misc',
'basic': 'basic'
}
target_cat = None
for key, val in cat_map.items():
if key in subrole:
target_cat = val
break
if target_cat and len(pool) > 25:
# Lightweight textual filter using known markers
def _cat_row(rname: str, rtype: str) -> str:
rl = rname.lower()
rt = rtype.lower()
if any(k in rl for k in ('vista','strand','delta','mire','heath','rainforest','mesa','foothills','catacombs','tarn','flat','expanse','wilds','landscape','tunnel','terrace','vista')):
return 'fetch'
if 'triple' in rt or 'three' in rt:
return 'triple'
if any(t in rt for t in ('forest','plains','island','swamp','mountain')) and any(sym in rt for sym in ('forest','plains','island','swamp','mountain')) and 'land' in rt:
# Basic-check crude
return 'basic'
return 'misc'
try:
tmp = pool.copy()
tmp['_cat'] = tmp.apply(lambda r: _cat_row(str(r.get('name','')), str(r.get('type',''))), axis=1)
sub_pool = tmp[tmp['_cat'] == target_cat]
if len(sub_pool) >= 10:
pool = sub_pool.drop(columns=['_cat'])
except Exception:
pass
except Exception:
pass
# Exclude commander explicitly
if "name" in pool.columns and commander_l:
pool = pool[pool["name"].astype(str).str.strip().str.lower() != commander_l]
# Role-specific filter
def _is_wipe(tags: list[str]) -> bool:
return any(("board wipe" in t) or ("mass removal" in t) for t in tags)
def _is_removal(tags: list[str]) -> bool:
return any(("removal" in t) or ("spot removal" in t) for t in tags)
def _is_draw(tags: list[str]) -> bool:
return any(("draw" in t) or ("card advantage" in t) for t in tags)
def _matches_selected(tags: list[str]) -> bool:
try:
sel = [str(t).strip().lower() for t in (sess.get("tags") or []) if str(t).strip()]
if not sel:
return True
st = set(sel)
return any(any(s in t for s in st) for t in tags)
except Exception:
return True
if used_role == "ramp":
pool = pool[pool["_ltags"].apply(lambda tags: any("ramp" in t for t in tags))]
elif used_role == "removal":
pool = pool[pool["_ltags"].apply(_is_removal) & ~pool["_ltags"].apply(_is_wipe)]
elif used_role == "wipe":
pool = pool[pool["_ltags"].apply(_is_wipe)]
elif used_role == "card_advantage":
pool = pool[pool["_ltags"].apply(_is_draw)]
elif used_role == "protection":
pool = pool[pool["_ltags"].apply(lambda tags: any("protection" in t for t in tags))]
elif used_role == "creature":
# Keep only creatures; bias toward selected theme tags when available
if "type" in pool.columns:
pool = pool[pool["type"].fillna("").str.contains("Creature", case=False, na=False)]
try:
pool = pool[pool["_ltags"].apply(_matches_selected)]
except Exception:
pass
elif used_role == "land":
# Already constrained to lands; no additional tag filter needed
pass
# Sort by priority like the builder
try:
pool = bu.sort_by_priority(pool, ["edhrecRank","manaValue"])
except Exception:
pass
# Exclusions and ownership (for non-random roles this stays before slicing)
pool = _exclude(pool)
try:
if bool(sess.get("prefer_owned")) and getattr(b, "owned_card_names", None):
pool = bu.prefer_owned_first(pool, {str(n).lower() for n in getattr(b, "owned_card_names", set())})
except Exception:
pass
row_meta = _build_meta_map(pool)
# Land role: random 12 from top 60-100 window
if used_role == 'land':
import random as _rnd
total = len(pool)
if total == 0:
pass
else:
cap = min(100, total)
floor = min(60, cap) # if fewer than 60 just use all
if cap <= 12:
window_size = cap
else:
if cap == floor:
window_size = cap
else:
rng_obj = getattr(b, 'rng', None)
if rng_obj:
window_size = rng_obj.randint(floor, cap)
else:
window_size = _rnd.randint(floor, cap)
window_df = pool.head(window_size)
names = window_df['name'].astype(str).str.strip().tolist()
# Random sample up to 12 distinct names
sample_n = min(12, len(names))
if sample_n > 0:
if getattr(b, 'rng', None):
chosen = getattr(b,'rng').sample(names, sample_n) if len(names) >= sample_n else names
else:
chosen = _rnd.sample(names, sample_n) if len(names) >= sample_n else names
lower_map = {n.strip().lower(): n for n in chosen}
display_map = _display_map_for(set(k for k in lower_map.keys()))
for nm_lc, orig in lower_map.items():
is_owned = (nm_lc in owned_set)
if require_owned and not is_owned:
continue
if nm_lc == name_l or (in_deck and nm_lc in in_deck):
continue
meta = row_meta.get(nm_lc) or _meta_from_row(None)
items.append({
'name': display_map.get(nm_lc, orig),
'name_lower': nm_lc,
'owned': is_owned,
'tags': meta.get('tags') or [],
'role': meta.get('role', ''),
'mana': meta.get('mana', ''),
'rarity': meta.get('rarity', ''),
'hover_simple': bool(meta.get('hover_simple', True)),
})
if items:
return _render_and_cache(items)
else:
# Default deterministic top-N (increase to 12 for parity)
lower_pool: list[str] = []
try:
lower_pool = pool["name"].astype(str).str.strip().str.lower().tolist()
except Exception:
lower_pool = []
display_map = _display_map_for(set(lower_pool))
iteration_order = lower_pool
if refresh_requested and len(lower_pool) > 12:
window_size = min(len(lower_pool), 30)
window = lower_pool[:window_size]
sampled = _sampler(window, min(window_size, 12))
seen_sampled = set(sampled)
iteration_order = sampled + [nm for nm in lower_pool if nm not in seen_sampled]
for nm_l in iteration_order:
is_owned = (nm_l in owned_set)
if require_owned and not is_owned:
continue
if nm_l == name_l or (in_deck and nm_l in in_deck):
continue
meta = row_meta.get(nm_l) or _meta_from_row(None)
items.append({
"name": display_map.get(nm_l, nm_l),
"name_lower": nm_l,
"owned": is_owned,
"tags": meta.get("tags") or [],
"role": meta.get("role", ""),
"mana": meta.get("mana", ""),
"rarity": meta.get("rarity", ""),
"hover_simple": bool(meta.get("hover_simple", True)),
})
if len(items) >= 12:
break
if items:
return _render_and_cache(items)
# Fallback: tag-similarity suggestions (previous behavior)
tags_idx = getattr(b, "_card_name_tags_index", {}) or {}
seed_tags = set(tags_idx.get(name_l) or [])
all_names = set(tags_idx.keys())
candidates: list[tuple[str, int]] = [] # (name, score)
for nm in all_names:
if nm == name_l:
continue
if commander_l and nm == commander_l:
continue
if in_deck and nm in in_deck:
continue
if locked_set and nm in locked_set:
continue
if nm in alts_exclude:
continue
tgs = set(tags_idx.get(nm) or [])
score = len(seed_tags & tgs)
if score <= 0:
continue
candidates.append((nm, score))
# If no tag-based candidates, try shared trigger tag from library entry
if not candidates and isinstance(entry, dict):
try:
trig = str(entry.get("TriggerTag") or "").strip().lower()
except Exception:
trig = ""
if trig:
for nm, tglist in tags_idx.items():
if nm == name_l:
continue
if nm in {str(k).strip().lower() for k in lib.keys()}:
continue
if trig in {str(t).strip().lower() for t in (tglist or [])}:
candidates.append((nm, 1))
def _owned(nm: str) -> bool:
return nm in owned_set
candidates.sort(key=lambda x: (-x[1], 0 if _owned(x[0]) else 1, x[0]))
if refresh_requested and len(candidates) > 1:
name_sequence = [nm for nm, _score in candidates]
sampled_names = _sampler(name_sequence, min(len(name_sequence), 10))
sampled_set = set(sampled_names)
reordered: list[tuple[str, int]] = []
for nm in sampled_names:
for cand_nm, cand_score in candidates:
if cand_nm == nm:
reordered.append((cand_nm, cand_score))
break
for cand_nm, cand_score in candidates:
if cand_nm not in sampled_set:
reordered.append((cand_nm, cand_score))
candidates = reordered
pool_lower = {nm for (nm, _s) in candidates}
display_map = _display_map_for(pool_lower)
seen = set()
for nm, score in candidates:
if nm in seen:
continue
seen.add(nm)
is_owned = (nm in owned_set)
if require_owned and not is_owned:
continue
items.append({
"name": display_map.get(nm, nm),
"name_lower": nm,
"owned": is_owned,
"tags": list(tags_idx.get(nm) or []),
"role": "",
"mana": "",
"rarity": "",
"hover_simple": True,
})
if len(items) >= 10:
break
return _render_and_cache(items)
except Exception as e:
return HTMLResponse(f'<div class="alts"><div class="muted">No alternatives: {e}</div></div>')

View file

@ -0,0 +1,653 @@
"""Build Compliance and Card Replacement Routes
Phase 5 extraction from build.py:
- POST /build/replace - Inline card replacement with undo tracking
- POST /build/replace/undo - Undo last replacement
- GET /build/compare - Batch build comparison stub
- GET /build/compliance - Bracket compliance panel
- POST /build/enforce/apply - Apply bracket enforcement
- GET /build/enforcement - Full-page enforcement review
This module handles card replacement, bracket compliance checking, and enforcement.
"""
from __future__ import annotations
from fastapi import APIRouter, Request, Form, Query
from fastapi.responses import HTMLResponse, JSONResponse
from typing import Any
import json
from ..app import templates
from ..services.tasks import get_session, new_sid
from ..services.build_utils import (
step5_ctx_from_result,
step5_error_ctx,
step5_empty_ctx,
owned_set as owned_set_helper,
)
from ..services import orchestrator as orch
from deck_builder.builder import DeckBuilder
from html import escape as _esc
from urllib.parse import quote_plus
router = APIRouter(prefix="/build")
def _merge_hx_trigger(response: Any, payload: dict[str, Any]) -> None:
if not payload or response is None:
return
try:
existing = response.headers.get("HX-Trigger") if hasattr(response, "headers") else None
except Exception:
existing = None
try:
if existing:
try:
data = json.loads(existing)
except Exception:
data = {}
if isinstance(data, dict):
data.update(payload)
response.headers["HX-Trigger"] = json.dumps(data)
return
response.headers["HX-Trigger"] = json.dumps(payload)
except Exception:
try:
response.headers["HX-Trigger"] = json.dumps(payload)
except Exception:
pass
@router.post("/replace", response_class=HTMLResponse)
async def build_replace(request: Request, old: str = Form(...), new: str = Form(...), owned_only: str = Form("0")) -> HTMLResponse:
"""Inline replace: swap `old` with `new` in the current builder when possible, and suppress `old` from future alternatives.
Falls back to lock-and-rerun guidance if no active builder is present.
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
o_disp = str(old).strip()
n_disp = str(new).strip()
o = o_disp.lower()
n = n_disp.lower()
owned_only_flag = str(owned_only or "").strip().lower()
owned_only_int = 1 if owned_only_flag in {"1", "true", "yes", "on"} else 0
# Maintain locks to bias future picks and enforcement
locks = set(sess.get("locks", []))
locks.discard(o)
locks.add(n)
sess["locks"] = list(locks)
# Track last replace for optional undo
try:
sess["last_replace"] = {"old": o, "new": n}
except Exception:
pass
ctx = sess.get("build_ctx") or {}
try:
ctx["locks"] = {str(x) for x in locks}
except Exception:
pass
# Record preferred replacements
try:
pref = ctx.get("preferred_replacements") if isinstance(ctx, dict) else None
if not isinstance(pref, dict):
pref = {}
ctx["preferred_replacements"] = pref
pref[o] = n
except Exception:
pass
b: DeckBuilder | None = ctx.get("builder") if isinstance(ctx, dict) else None
if b is not None:
try:
lib = getattr(b, "card_library", {}) or {}
# Find the exact key for `old` in a case-insensitive manner
old_key = None
if o_disp in lib:
old_key = o_disp
else:
for k in list(lib.keys()):
if str(k).strip().lower() == o:
old_key = k
break
if old_key is None:
raise KeyError("old card not in deck")
old_info = dict(lib.get(old_key) or {})
role = str(old_info.get("Role") or "").strip()
subrole = str(old_info.get("SubRole") or "").strip()
try:
count = int(old_info.get("Count", 1))
except Exception:
count = 1
# Remove old entry
try:
del lib[old_key]
except Exception:
pass
# Resolve canonical name and info for new
df = getattr(b, "_combined_cards_df", None)
new_key = n_disp
card_type = ""
mana_cost = ""
trigger_tag = str(old_info.get("TriggerTag") or "")
if df is not None:
try:
row = df[df["name"].astype(str).str.strip().str.lower() == n]
if not row.empty:
new_key = str(row.iloc[0]["name"]) or n_disp
card_type = str(row.iloc[0].get("type", row.iloc[0].get("type_line", "")) or "")
mana_cost = str(row.iloc[0].get("mana_cost", row.iloc[0].get("manaCost", "")) or "")
except Exception:
pass
lib[new_key] = {
"Count": count,
"Card Type": card_type,
"Mana Cost": mana_cost,
"Role": role,
"SubRole": subrole,
"AddedBy": "Replace",
"TriggerTag": trigger_tag,
}
# Mirror preferred replacements onto the builder for enforcement
try:
cur = getattr(b, "preferred_replacements", {}) or {}
cur[str(o)] = str(n)
setattr(b, "preferred_replacements", cur)
except Exception:
pass
# Update alternatives exclusion set and bump version to invalidate caches
try:
ex = {str(x).strip().lower() for x in (sess.get("alts_exclude", []) or [])}
ex.add(o)
sess["alts_exclude"] = list(ex)
sess["alts_exclude_v"] = int(sess.get("alts_exclude_v") or 0) + 1
except Exception:
pass
# Success panel and OOB updates (refresh compliance panel)
# Compute ownership of the new card for UI badge update
is_owned = (n in owned_set_helper())
refresh = (
'<div hx-get="/build/alternatives?name='
+ quote_plus(new_key)
+ f'&owned_only={owned_only_int}" hx-trigger="load delay:80ms" '
'hx-target="closest .alts" hx-swap="outerHTML" aria-hidden="true"></div>'
)
html = (
'<div class="alts" style="margin-top:.35rem; padding:.5rem; border:1px solid var(--border); border-radius:8px; background:#0f1115;">'
f'<div>Replaced <strong>{o_disp}</strong> with <strong>{new_key}</strong>.</div>'
'<div class="muted" style="margin-top:.35rem;">Compliance panel will refresh.</div>'
'<div style="margin-top:.35rem; display:flex; gap:.5rem; align-items:center; flex-wrap:wrap;">'
'<button type="button" class="btn" onclick="try{this.closest(\'.alts\').remove();}catch(_){}">Close</button>'
'</div>'
+ refresh +
'</div>'
)
# Inline mutate the nearest card tile to reflect the new card without a rerun
mutator = """
<script>
(function(){
try{
var panel = document.currentScript && document.currentScript.previousElementSibling && document.currentScript.previousElementSibling.classList && document.currentScript.previousElementSibling.classList.contains('alts') ? document.currentScript.previousElementSibling : null;
if(!panel){ return; }
var oldName = panel.getAttribute('data-old') || '';
var newName = panel.getAttribute('data-new') || '';
var isOwned = panel.getAttribute('data-owned') === '1';
var isLocked = panel.getAttribute('data-locked') === '1';
var tile = panel.closest('.card-tile');
if(!tile) return;
tile.setAttribute('data-card-name', newName);
var img = tile.querySelector('img.card-thumb');
if(img){
var base = 'https://api.scryfall.com/cards/named?fuzzy=' + encodeURIComponent(newName) + '&format=image&version=';
img.src = base + 'normal';
img.setAttribute('srcset',
'https://api.scryfall.com/cards/named?fuzzy=' + encodeURIComponent(newName) + '&format=image&version=small 160w, ' +
'https://api.scryfall.com/cards/named?fuzzy=' + encodeURIComponent(newName) + '&format=image&version=normal 488w, ' +
'https://api.scryfall.com/cards/named?fuzzy=' + encodeURIComponent(newName) + '&format=image&version=large 672w'
);
img.setAttribute('alt', newName + ' image');
img.setAttribute('data-card-name', newName);
}
var nameEl = tile.querySelector('.name');
if(nameEl){ nameEl.textContent = newName; }
var own = tile.querySelector('.owned-badge');
if(own){
own.textContent = isOwned ? '' : '';
own.title = isOwned ? 'Owned' : 'Not owned';
tile.setAttribute('data-owned', isOwned ? '1' : '0');
}
tile.classList.toggle('locked', isLocked);
var imgBtn = tile.querySelector('.img-btn');
if(imgBtn){
try{
var valsAttr = imgBtn.getAttribute('hx-vals') || '{}';
var obj = JSON.parse(valsAttr.replace(/&quot;/g, '"'));
obj.name = newName;
imgBtn.setAttribute('hx-vals', JSON.stringify(obj));
}catch(e){}
}
var lockBtn = tile.querySelector('.lock-box .btn-lock');
if(lockBtn){
try{
var v = lockBtn.getAttribute('hx-vals') || '{}';
var o = JSON.parse(v.replace(/&quot;/g, '"'));
o.name = newName;
lockBtn.setAttribute('hx-vals', JSON.stringify(o));
}catch(e){}
}
}catch(_){}
})();
</script>
"""
chip = (
f'<div id="last-action" hx-swap-oob="true">'
f'<span class="chip" title="Click to dismiss">Replaced <strong>{o_disp}</strong> → <strong>{new_key}</strong></span>'
f'</div>'
)
# OOB fetch to refresh compliance panel
refresher = (
'<div hx-get="/build/compliance" hx-target="#compliance-panel" hx-swap="outerHTML" '
'hx-trigger="load" hx-swap-oob="true"></div>'
)
# Include data attributes on the panel div for the mutator script
data_owned = '1' if is_owned else '0'
data_locked = '1' if (n in locks) else '0'
prefix = '<div class="alts"'
replacement = (
'<div class="alts" '
+ 'data-old="' + _esc(o_disp) + '" '
+ 'data-new="' + _esc(new_key) + '" '
+ 'data-owned="' + data_owned + '" '
+ 'data-locked="' + data_locked + '"'
)
html = html.replace(prefix, replacement, 1)
return HTMLResponse(html + mutator + chip + refresher)
except Exception:
# Fall back to rerun guidance if inline swap fails
pass
# Fallback: advise rerun
hint = (
'<div class="alts" style="margin-top:.35rem; padding:.5rem; border:1px solid var(--border); border-radius:8px; background:#0f1115;">'
f'<div>Locked <strong>{new}</strong> and unlocked <strong>{old}</strong>.</div>'
'<div class="muted" style="margin-top:.35rem;">Now click <em>Rerun Stage</em> with Replace: On to apply this change.</div>'
'<div style="margin-top:.35rem; display:flex; gap:.5rem; align-items:center; flex-wrap:wrap;">'
'<form hx-post="/build/step5/rerun" hx-target="#wizard" hx-swap="innerHTML" style="display:inline;">'
'<input type="hidden" name="show_skipped" value="1" />'
'<button type="submit" class="btn-rerun">Rerun stage</button>'
'</form>'
'<form hx-post="/build/replace/undo" hx-target="closest .alts" hx-swap="outerHTML" style="display:inline; margin:0;">'
f'<input type="hidden" name="old" value="{old}" />'
f'<input type="hidden" name="new" value="{new}" />'
'<button type="submit" class="btn" title="Undo this replace">Undo</button>'
'</form>'
'<button type="button" class="btn" onclick="try{this.closest(\'.alts\').remove();}catch(_){}">Close</button>'
'</div>'
'</div>'
)
chip = (
f'<div id="last-action" hx-swap-oob="true">'
f'<span class="chip" title="Click to dismiss">Replaced <strong>{old}</strong> → <strong>{new}</strong></span>'
f'</div>'
)
# Also add old to exclusions and bump version for future alt calls
try:
ex = {str(x).strip().lower() for x in (sess.get("alts_exclude", []) or [])}
ex.add(o)
sess["alts_exclude"] = list(ex)
sess["alts_exclude_v"] = int(sess.get("alts_exclude_v") or 0) + 1
except Exception:
pass
return HTMLResponse(hint + chip)
@router.post("/replace/undo", response_class=HTMLResponse)
async def build_replace_undo(request: Request, old: str = Form(None), new: str = Form(None)) -> HTMLResponse:
"""Undo the last replace by restoring the previous lock state (best-effort)."""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
last = sess.get("last_replace") or {}
try:
# Prefer provided args, else fallback to last recorded
o = (str(old).strip().lower() if old else str(last.get("old") or "")).strip()
n = (str(new).strip().lower() if new else str(last.get("new") or "")).strip()
except Exception:
o, n = "", ""
locks = set(sess.get("locks", []))
changed = False
if n and n in locks:
locks.discard(n)
changed = True
if o:
locks.add(o)
changed = True
sess["locks"] = list(locks)
if sess.get("build_ctx"):
try:
sess["build_ctx"]["locks"] = {str(x) for x in locks}
except Exception:
pass
# Clear last_replace after undo
try:
if sess.get("last_replace"):
del sess["last_replace"]
except Exception:
pass
# Return confirmation panel and OOB chip
msg = 'Undid replace' if changed else 'No changes to undo'
html = (
'<div class="alts" style="margin-top:.35rem; padding:.5rem; border:1px solid var(--border); border-radius:8px; background:#0f1115;">'
f'<div>{msg}.</div>'
'<div class="muted" style="margin-top:.35rem;">Rerun the stage to recompute picks if needed.</div>'
'<div style="margin-top:.35rem; display:flex; gap:.5rem; align-items:center; flex-wrap:wrap;">'
'<form hx-post="/build/step5/rerun" hx-target="#wizard" hx-swap="innerHTML" style="display:inline;">'
'<input type="hidden" name="show_skipped" value="1" />'
'<button type="submit" class="btn-rerun">Rerun stage</button>'
'</form>'
'<button type="button" class="btn" onclick="try{this.closest(\'.alts\').remove();}catch(_){}">Close</button>'
'</div>'
'</div>'
)
chip = (
f'<div id="last-action" hx-swap-oob="true">'
f'<span class="chip" title="Click to dismiss">{msg}</span>'
f'</div>'
)
return HTMLResponse(html + chip)
@router.get("/compare")
async def build_compare(runA: str, runB: str):
"""Stub: return empty diffs; later we can diff summary files under deck_files."""
return JSONResponse({"ok": True, "added": [], "removed": [], "changed": []})
@router.get("/compliance", response_class=HTMLResponse)
async def build_compliance_panel(request: Request) -> HTMLResponse:
"""Render a live Bracket compliance panel with manual enforcement controls.
Computes compliance against the current builder state without exporting, attaches a non-destructive
enforcement plan (swaps with added=None) when FAIL, and returns a reusable HTML partial.
Returns empty content when no active build context exists.
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
ctx = sess.get("build_ctx") or {}
b: DeckBuilder | None = ctx.get("builder") if isinstance(ctx, dict) else None
if not b:
return HTMLResponse("")
# Compute compliance snapshot in-memory and attach planning preview
comp = None
try:
if hasattr(b, 'compute_and_print_compliance'):
comp = b.compute_and_print_compliance(base_stem=None)
except Exception:
comp = None
try:
if comp:
from ..services import orchestrator as orch
comp = orch._attach_enforcement_plan(b, comp)
except Exception:
pass
if not comp:
return HTMLResponse("")
# Build flagged metadata (role, owned) for visual tiles and role-aware alternatives
# For combo violations, expand pairs into individual cards (exclude commander) so each can be replaced.
flagged_meta: list[dict] = []
try:
cats = comp.get('categories') or {}
owned_lower = owned_set_helper()
lib = getattr(b, 'card_library', {}) or {}
commander_l = str((sess.get('commander') or '')).strip().lower()
# map category key -> display label
labels = {
'game_changers': 'Game Changers',
'extra_turns': 'Extra Turns',
'mass_land_denial': 'Mass Land Denial',
'tutors_nonland': 'Nonland Tutors',
'two_card_combos': 'Two-Card Combos',
}
seen_lower: set[str] = set()
for key, cat in cats.items():
try:
status = str(cat.get('status') or '').upper()
# Only surface tiles for WARN and FAIL
if status not in {"WARN", "FAIL"}:
continue
# For two-card combos, split pairs into individual cards and skip commander
if key == 'two_card_combos' and status == 'FAIL':
# Prefer the structured combos list to ensure we only expand counted pairs
pairs = []
try:
for p in (comp.get('combos') or []):
if p.get('cheap_early'):
pairs.append((str(p.get('a') or '').strip(), str(p.get('b') or '').strip()))
except Exception:
pairs = []
# Fallback to parsing flagged strings like "A + B"
if not pairs:
try:
for s in (cat.get('flagged') or []):
if not isinstance(s, str):
continue
parts = [x.strip() for x in s.split('+') if x and x.strip()]
if len(parts) == 2:
pairs.append((parts[0], parts[1]))
except Exception:
pass
for a, bname in pairs:
for nm in (a, bname):
if not nm:
continue
nm_l = nm.strip().lower()
if nm_l == commander_l:
# Don't prompt replacing the commander
continue
if nm_l in seen_lower:
continue
seen_lower.add(nm_l)
entry = lib.get(nm) or lib.get(nm_l) or lib.get(str(nm).strip()) or {}
role = entry.get('Role') or ''
flagged_meta.append({
'name': nm,
'category': labels.get(key, key.replace('_',' ').title()),
'role': role,
'owned': (nm_l in owned_lower),
'severity': status,
})
continue
# Default handling for list/tag categories
names = [n for n in (cat.get('flagged') or []) if isinstance(n, str)]
for nm in names:
nm_l = str(nm).strip().lower()
if nm_l in seen_lower:
continue
seen_lower.add(nm_l)
entry = lib.get(nm) or lib.get(str(nm).strip()) or lib.get(nm_l) or {}
role = entry.get('Role') or ''
flagged_meta.append({
'name': nm,
'category': labels.get(key, key.replace('_',' ').title()),
'role': role,
'owned': (nm_l in owned_lower),
'severity': status,
})
except Exception:
continue
except Exception:
flagged_meta = []
# Render partial
ctx2 = {"request": request, "compliance": comp, "flagged_meta": flagged_meta}
return templates.TemplateResponse("build/_compliance_panel.html", ctx2)
@router.post("/enforce/apply", response_class=HTMLResponse)
async def build_enforce_apply(request: Request) -> HTMLResponse:
"""Apply bracket enforcement now using current locks as user guidance.
This adds lock placeholders if needed, runs enforcement + re-export, reloads compliance, and re-renders Step 5.
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
# Ensure build context exists
ctx = sess.get("build_ctx") or {}
b: DeckBuilder | None = ctx.get("builder") if isinstance(ctx, dict) else None
if not b:
# No active build: show Step 5 with an error
err_ctx = step5_error_ctx(request, sess, "No active build context to enforce.")
resp = templates.TemplateResponse("build/_step5.html", err_ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
_merge_hx_trigger(resp, {"step5:refresh": {"token": err_ctx.get("summary_token", 0)}})
return resp
# Ensure we have a CSV base stem for consistent re-exports
base_stem = None
try:
csv_path = ctx.get("csv_path")
if isinstance(csv_path, str) and csv_path:
import os as _os
base_stem = _os.path.splitext(_os.path.basename(csv_path))[0]
except Exception:
base_stem = None
# If missing, export once to establish base
if not base_stem:
try:
ctx["csv_path"] = b.export_decklist_csv()
import os as _os
base_stem = _os.path.splitext(_os.path.basename(ctx["csv_path"]))[0]
# Also produce a text export for completeness
ctx["txt_path"] = b.export_decklist_text(filename=base_stem + '.txt')
except Exception:
base_stem = None
# Add lock placeholders into the library before enforcement so user choices are present
try:
locks = {str(x).strip().lower() for x in (sess.get("locks", []) or [])}
if locks:
df = getattr(b, "_combined_cards_df", None)
lib_l = {str(n).strip().lower() for n in getattr(b, 'card_library', {}).keys()}
for lname in locks:
if lname in lib_l:
continue
target_name = None
card_type = ''
mana_cost = ''
try:
if df is not None and not df.empty:
row = df[df['name'].astype(str).str.lower() == lname]
if not row.empty:
target_name = str(row.iloc[0]['name'])
card_type = str(row.iloc[0].get('type', row.iloc[0].get('type_line', '')) or '')
mana_cost = str(row.iloc[0].get('mana_cost', row.iloc[0].get('manaCost', '')) or '')
except Exception:
target_name = None
if target_name:
b.card_library[target_name] = {
'Count': 1,
'Card Type': card_type,
'Mana Cost': mana_cost,
'Role': 'Locked',
'SubRole': '',
'AddedBy': 'Lock',
'TriggerTag': '',
}
except Exception:
pass
# Thread preferred replacements from context onto builder so enforcement can honor them
try:
pref = ctx.get("preferred_replacements") if isinstance(ctx, dict) else None
if isinstance(pref, dict):
setattr(b, 'preferred_replacements', dict(pref))
except Exception:
pass
# Run enforcement + re-exports (tops up to 100 internally)
try:
rep = b.enforce_and_reexport(base_stem=base_stem, mode='auto')
except Exception as e:
err_ctx = step5_error_ctx(request, sess, f"Enforcement failed: {e}")
resp = templates.TemplateResponse("build/_step5.html", err_ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
_merge_hx_trigger(resp, {"step5:refresh": {"token": err_ctx.get("summary_token", 0)}})
return resp
# Reload compliance JSON and summary
compliance = None
try:
if base_stem:
import os as _os
import json as _json
comp_path = _os.path.join('deck_files', f"{base_stem}_compliance.json")
if _os.path.exists(comp_path):
with open(comp_path, 'r', encoding='utf-8') as _cf:
compliance = _json.load(_cf)
except Exception:
compliance = None
# Rebuild Step 5 context (done state)
# Ensure csv/txt paths on ctx reflect current base
try:
import os as _os
ctx["csv_path"] = _os.path.join('deck_files', f"{base_stem}.csv") if base_stem else ctx.get("csv_path")
ctx["txt_path"] = _os.path.join('deck_files', f"{base_stem}.txt") if base_stem else ctx.get("txt_path")
except Exception:
pass
# Compute total_cards
try:
total_cards = 0
for _n, _e in getattr(b, 'card_library', {}).items():
try:
total_cards += int(_e.get('Count', 1))
except Exception:
total_cards += 1
except Exception:
total_cards = None
res = {
"done": True,
"label": "Complete",
"log_delta": "",
"idx": len(ctx.get("stages", []) or []),
"total": len(ctx.get("stages", []) or []),
"csv_path": ctx.get("csv_path"),
"txt_path": ctx.get("txt_path"),
"summary": getattr(b, 'build_deck_summary', lambda: None)(),
"total_cards": total_cards,
"added_total": 0,
"compliance": compliance or rep,
}
page_ctx = step5_ctx_from_result(request, sess, res, status_text="Build complete", show_skipped=True)
resp = templates.TemplateResponse(request, "build/_step5.html", page_ctx)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
_merge_hx_trigger(resp, {"step5:refresh": {"token": page_ctx.get("summary_token", 0)}})
return resp
@router.get("/enforcement", response_class=HTMLResponse)
async def build_enforcement_fullpage(request: Request) -> HTMLResponse:
"""Full-page enforcement review: show compliance panel with swaps and controls."""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
ctx = sess.get("build_ctx") or {}
b: DeckBuilder | None = ctx.get("builder") if isinstance(ctx, dict) else None
if not b:
# No active build
base = step5_empty_ctx(request, sess)
resp = templates.TemplateResponse("build/_step5.html", base)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp
# Compute compliance snapshot and attach planning preview
comp = None
try:
if hasattr(b, 'compute_and_print_compliance'):
comp = b.compute_and_print_compliance(base_stem=None)
except Exception:
comp = None
try:
if comp:
from ..services import orchestrator as orch
comp = orch._attach_enforcement_plan(b, comp)
except Exception:
pass
try:
summary_token = int(sess.get("step5_summary_token", 0))
except Exception:
summary_token = 0
ctx2 = {"request": request, "compliance": comp, "summary_token": summary_token}
resp = templates.TemplateResponse(request, "build/enforcement.html", ctx2)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
_merge_hx_trigger(resp, {"step5:refresh": {"token": ctx2.get("summary_token", 0)}})
return resp

File diff suppressed because it is too large Load diff

View file

@ -14,7 +14,6 @@ from fastapi.responses import JSONResponse
from ..app import (
ENABLE_PARTNER_MECHANICS,
ENABLE_PARTNER_SUGGESTIONS,
)
from ..services.telemetry import log_partner_suggestion_selected
from ..services.partner_suggestions import get_partner_suggestions
@ -408,7 +407,7 @@ def _partner_ui_context(
role_hint = "Choose a Doctor to accompany this companion."
# Partner suggestions
suggestions_enabled = bool(ENABLE_PARTNER_MECHANICS and ENABLE_PARTNER_SUGGESTIONS)
suggestions_enabled = bool(ENABLE_PARTNER_MECHANICS)
suggestions_visible: list[dict[str, Any]] = []
suggestions_hidden: list[dict[str, Any]] = []
suggestions_total = 0

View file

@ -0,0 +1,257 @@
"""Build Permalinks and Lock Management Routes
Phase 5 extraction from build.py:
- POST /build/lock - Card lock toggle with HTMX swap
- GET /build/permalink - State serialization (base64 JSON)
- GET /build/from - State restoration from permalink
This module handles build state persistence and card lock management.
"""
from __future__ import annotations
from fastapi import APIRouter, Request, Form, Query
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from typing import Any
import json
import gzip
from ..app import ALLOW_MUST_HAVES, templates
from ..services.tasks import get_session, new_sid
from ..services import orchestrator as orch
from html import escape as _esc
router = APIRouter(prefix="/build")
def _merge_hx_trigger(response: Any, payload: dict[str, Any]) -> None:
if not payload or response is None:
return
try:
existing = response.headers.get("HX-Trigger") if hasattr(response, "headers") else None
except Exception:
existing = None
try:
if existing:
try:
data = json.loads(existing)
except Exception:
data = {}
if isinstance(data, dict):
data.update(payload)
response.headers["HX-Trigger"] = json.dumps(data)
return
response.headers["HX-Trigger"] = json.dumps(payload)
except Exception:
try:
response.headers["HX-Trigger"] = json.dumps(payload)
except Exception:
pass
@router.post("/lock")
async def build_lock(request: Request, name: str = Form(...), locked: int = Form(...), from_list: str = Form(None)) -> HTMLResponse:
"""Toggle card lock for a given card name (HTMX-based).
Maintains an in-session locks set and reflects changes in the build context.
Returns an updated HTML button with HTMX attributes for easy swapping.
"""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
name_l = str(name).strip().lower()
locks = set(sess.get("locks", []))
is_locked = bool(int(locked or 0))
if is_locked:
locks.add(name_l)
else:
locks.discard(name_l)
sess["locks"] = list(locks)
# Update build context if it exists
try:
ctx = sess.get("build_ctx") or {}
if ctx and isinstance(ctx, dict):
ctx["locks"] = {str(x) for x in locks}
except Exception:
pass
# Build lock button HTML
if is_locked:
label = "🔒"
title = f"Unlock {name}"
next_state = 0
else:
label = "🔓"
title = f"Lock {name}"
next_state = 1
html = (
f'<button class="btn btn-lock" type="button" title="{_esc(title)}" '
f'hx-post="/build/lock" hx-target="this" hx-swap="outerHTML" '
f'hx-vals=\'{{"name":"{_esc(name)}","locked":{next_state}}}\'>{label}</button>'
)
# OOB chip and lock count update
lock_count = len(locks)
chip = (
f'<div id="locks-chip" hx-swap-oob="true">'
f'<span class="chip">🔒 {lock_count}</span>'
f'</div>'
)
# If coming from locked-cards list, remove the row on unlock
if from_list and not is_locked:
# Return empty content to remove the <li> parent of the button
html = ""
return HTMLResponse(html + chip)
@router.get("/permalink")
async def build_permalink(request: Request):
"""Return a URL-safe JSON payload representing current run config (basic)."""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
payload: dict[str, Any] = {
"commander": sess.get("commander"),
"tags": sess.get("tags", []),
"bracket": sess.get("bracket"),
"ideals": sess.get("ideals"),
"locks": list(sess.get("locks", []) or []),
"tag_mode": sess.get("tag_mode", "AND"),
"flags": {
"owned_only": bool(sess.get("use_owned_only")),
"prefer_owned": bool(sess.get("prefer_owned")),
"swap_mdfc_basics": bool(sess.get("swap_mdfc_basics")),
},
}
# Include random build fields if present
try:
rb = sess.get("random_build")
if isinstance(rb, dict) and rb:
random_payload: dict[str, Any] = {}
for key in ("seed", "theme", "constraints", "primary_theme", "secondary_theme", "tertiary_theme"):
if rb.get(key) is not None:
random_payload[key] = rb.get(key)
if isinstance(rb.get("resolved_themes"), list):
random_payload["resolved_themes"] = list(rb.get("resolved_themes") or [])
if isinstance(rb.get("resolved_theme_info"), dict):
random_payload["resolved_theme_info"] = dict(rb.get("resolved_theme_info"))
if rb.get("combo_fallback") is not None:
random_payload["combo_fallback"] = bool(rb.get("combo_fallback"))
if rb.get("synergy_fallback") is not None:
random_payload["synergy_fallback"] = bool(rb.get("synergy_fallback"))
if rb.get("fallback_reason") is not None:
random_payload["fallback_reason"] = rb.get("fallback_reason")
if isinstance(rb.get("requested_themes"), dict):
requested_payload = dict(rb.get("requested_themes"))
if "auto_fill_enabled" in requested_payload:
requested_payload["auto_fill_enabled"] = bool(requested_payload.get("auto_fill_enabled"))
random_payload["requested_themes"] = requested_payload
if rb.get("auto_fill_enabled") is not None:
random_payload["auto_fill_enabled"] = bool(rb.get("auto_fill_enabled"))
if rb.get("auto_fill_applied") is not None:
random_payload["auto_fill_applied"] = bool(rb.get("auto_fill_applied"))
auto_filled = rb.get("auto_filled_themes")
if isinstance(auto_filled, list):
random_payload["auto_filled_themes"] = list(auto_filled)
display = rb.get("display_themes")
if isinstance(display, list):
random_payload["display_themes"] = list(display)
if random_payload:
payload["random"] = random_payload
except Exception:
pass
# Include exclude_cards if feature is enabled and present
if ALLOW_MUST_HAVES and sess.get("exclude_cards"):
payload["exclude_cards"] = sess.get("exclude_cards")
# Compress and base64 encode the JSON payload for shorter URLs
try:
import base64
raw = json.dumps(payload, separators=(',', ':')).encode("utf-8")
# Use gzip compression to significantly reduce permalink length
compressed = gzip.compress(raw, compresslevel=9)
token = base64.urlsafe_b64encode(compressed).decode("ascii").rstrip("=")
except Exception:
return JSONResponse({"error": "Failed to generate permalink"}, status_code=500)
link = f"/build/from?state={token}"
return JSONResponse({
"permalink": link,
"state": payload,
})
@router.get("/from")
async def build_from(request: Request, state: str | None = None) -> RedirectResponse:
"""Load a run from a permalink token and redirect to main build page."""
sid = request.cookies.get("sid") or new_sid()
sess = get_session(sid)
if state:
try:
import base64
import json as _json
pad = '=' * (-len(state) % 4)
compressed = base64.urlsafe_b64decode((state + pad).encode("ascii"))
# Decompress the state data
raw = gzip.decompress(compressed).decode("utf-8")
data = _json.loads(raw)
sess["commander"] = data.get("commander")
sess["tags"] = data.get("tags", [])
sess["bracket"] = data.get("bracket")
if data.get("ideals"):
sess["ideals"] = data.get("ideals")
sess["tag_mode"] = data.get("tag_mode", "AND")
flags = data.get("flags") or {}
sess["use_owned_only"] = bool(flags.get("owned_only"))
sess["prefer_owned"] = bool(flags.get("prefer_owned"))
sess["swap_mdfc_basics"] = bool(flags.get("swap_mdfc_basics"))
sess["locks"] = list(data.get("locks", []))
# Optional random build rehydration
try:
r = data.get("random") or {}
if r:
rb_payload: dict[str, Any] = {}
for key in ("seed", "theme", "constraints", "primary_theme", "secondary_theme", "tertiary_theme"):
if r.get(key) is not None:
rb_payload[key] = r.get(key)
if isinstance(r.get("resolved_themes"), list):
rb_payload["resolved_themes"] = list(r.get("resolved_themes") or [])
if isinstance(r.get("resolved_theme_info"), dict):
rb_payload["resolved_theme_info"] = dict(r.get("resolved_theme_info"))
if r.get("combo_fallback") is not None:
rb_payload["combo_fallback"] = bool(r.get("combo_fallback"))
if r.get("synergy_fallback") is not None:
rb_payload["synergy_fallback"] = bool(r.get("synergy_fallback"))
if r.get("fallback_reason") is not None:
rb_payload["fallback_reason"] = r.get("fallback_reason")
if isinstance(r.get("requested_themes"), dict):
requested_payload = dict(r.get("requested_themes"))
if "auto_fill_enabled" in requested_payload:
requested_payload["auto_fill_enabled"] = bool(requested_payload.get("auto_fill_enabled"))
rb_payload["requested_themes"] = requested_payload
if r.get("auto_fill_enabled") is not None:
rb_payload["auto_fill_enabled"] = bool(r.get("auto_fill_enabled"))
if r.get("auto_fill_applied") is not None:
rb_payload["auto_fill_applied"] = bool(r.get("auto_fill_applied"))
auto_filled = r.get("auto_filled_themes")
if isinstance(auto_filled, list):
rb_payload["auto_filled_themes"] = list(auto_filled)
display = r.get("display_themes")
if isinstance(display, list):
rb_payload["display_themes"] = list(display)
if "seed" in rb_payload:
try:
seed_int = int(rb_payload["seed"])
rb_payload["seed"] = seed_int
rb_payload.setdefault("recent_seeds", [seed_int])
except Exception:
rb_payload.setdefault("recent_seeds", [])
sess["random_build"] = rb_payload
except Exception:
pass
# Import exclude_cards if feature is enabled and present
if ALLOW_MUST_HAVES and data.get("exclude_cards"):
sess["exclude_cards"] = data.get("exclude_cards")
sess["last_step"] = 4
except Exception:
pass
# Redirect to main build page which will render the proper layout
resp = RedirectResponse(url="/build/", status_code=303)
resp.set_cookie("sid", sid, httponly=True, samesite="lax")
return resp

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ from fastapi.responses import JSONResponse
from deck_builder.combined_commander import PartnerMode
from ..app import ENABLE_PARTNER_MECHANICS, ENABLE_PARTNER_SUGGESTIONS
from ..app import ENABLE_PARTNER_MECHANICS
from ..services.partner_suggestions import get_partner_suggestions
from ..services.telemetry import log_partner_suggestions_generated
@ -64,7 +64,7 @@ async def partner_suggestions_api(
mode: Optional[List[str]] = Query(None, description="Restrict results to specific partner modes"),
refresh: bool = Query(False, description="When true, force a dataset refresh before scoring"),
):
if not (ENABLE_PARTNER_MECHANICS and ENABLE_PARTNER_SUGGESTIONS):
if not ENABLE_PARTNER_MECHANICS:
raise HTTPException(status_code=404, detail="Partner suggestions are disabled")
commander_name = (commander or "").strip()

306
code/web/services/base.py Normal file
View file

@ -0,0 +1,306 @@
"""Base classes for web services.
Provides standardized patterns for service layer implementation including
state management, data loading, and caching.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, TypeVar
import threading
import time
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
class ServiceError(Exception):
"""Base exception for service layer errors."""
pass
class ValidationError(ServiceError):
"""Validation failed."""
pass
class NotFoundError(ServiceError):
"""Resource not found."""
pass
class BaseService(ABC):
"""Abstract base class for all services.
Provides common patterns for initialization, validation, and error handling.
Services should be stateless where possible and inject dependencies via __init__.
"""
def __init__(self) -> None:
"""Initialize service. Override in subclasses to inject dependencies."""
pass
def _validate(self, condition: bool, message: str) -> None:
"""Validate a condition, raise ValidationError if false.
Args:
condition: Condition to check
message: Error message if validation fails
Raises:
ValidationError: If condition is False
"""
if not condition:
raise ValidationError(message)
class StateService(BaseService):
"""Base class for services that manage mutable state.
Provides thread-safe state management with automatic cleanup.
Subclasses should implement _initialize_state and _should_cleanup.
"""
def __init__(self) -> None:
super().__init__()
self._state: Dict[str, Dict[str, Any]] = {}
self._lock = threading.RLock()
def get_state(self, key: str) -> Dict[str, Any]:
"""Get or create state for a key.
Args:
key: State key (e.g., session ID)
Returns:
State dictionary
"""
with self._lock:
if key not in self._state:
self._state[key] = self._initialize_state(key)
return self._state[key]
def set_state_value(self, key: str, field: str, value: Any) -> None:
"""Set a field in state.
Args:
key: State key
field: Field name
value: Value to set
"""
with self._lock:
state = self.get_state(key)
state[field] = value
def get_state_value(self, key: str, field: str, default: Any = None) -> Any:
"""Get a field from state.
Args:
key: State key
field: Field name
default: Default value if field not found
Returns:
Field value or default
"""
with self._lock:
state = self.get_state(key)
return state.get(field, default)
def cleanup_state(self) -> int:
"""Clean up expired or invalid state.
Returns:
Number of entries cleaned up
"""
with self._lock:
to_remove = [k for k, v in self._state.items() if self._should_cleanup(k, v)]
for key in to_remove:
del self._state[key]
return len(to_remove)
@abstractmethod
def _initialize_state(self, key: str) -> Dict[str, Any]:
"""Initialize state for a new key.
Args:
key: State key
Returns:
Initial state dictionary
"""
pass
@abstractmethod
def _should_cleanup(self, key: str, state: Dict[str, Any]) -> bool:
"""Check if state should be cleaned up.
Args:
key: State key
state: State dictionary
Returns:
True if state should be removed
"""
pass
class DataService(BaseService, Generic[T]):
"""Base class for services that load and manage data.
Provides patterns for lazy loading, validation, and refresh.
Subclasses should implement _load_data.
"""
def __init__(self) -> None:
super().__init__()
self._data: Optional[T] = None
self._loaded = False
self._lock = threading.RLock()
def get_data(self, force_reload: bool = False) -> T:
"""Get data, loading if necessary.
Args:
force_reload: Force reload even if already loaded
Returns:
Loaded data
Raises:
ServiceError: If data loading fails
"""
with self._lock:
if force_reload or not self._loaded:
self._data = self._load_data()
self._loaded = True
if self._data is None:
raise ServiceError("Failed to load data")
return self._data
def is_loaded(self) -> bool:
"""Check if data is loaded.
Returns:
True if data is loaded
"""
with self._lock:
return self._loaded
def reload(self) -> T:
"""Force reload data.
Returns:
Reloaded data
"""
return self.get_data(force_reload=True)
@abstractmethod
def _load_data(self) -> T:
"""Load data from source.
Returns:
Loaded data
Raises:
ServiceError: If loading fails
"""
pass
class CachedService(BaseService, Generic[K, V]):
"""Base class for services with caching behavior.
Provides thread-safe caching with TTL and size limits.
Subclasses should implement _compute_value.
"""
def __init__(self, ttl_seconds: Optional[int] = None, max_size: Optional[int] = None) -> None:
"""Initialize cached service.
Args:
ttl_seconds: Time-to-live for cache entries (None = no expiration)
max_size: Maximum cache size (None = no limit)
"""
super().__init__()
self._cache: Dict[K, tuple[V, float]] = {}
self._lock = threading.RLock()
self._ttl_seconds = ttl_seconds
self._max_size = max_size
def get(self, key: K, force_recompute: bool = False) -> V:
"""Get cached value or compute it.
Args:
key: Cache key
force_recompute: Force recompute even if cached
Returns:
Cached or computed value
"""
with self._lock:
now = time.time()
# Check cache
if not force_recompute and key in self._cache:
value, timestamp = self._cache[key]
if self._ttl_seconds is None or (now - timestamp) < self._ttl_seconds:
return value
# Compute new value
value = self._compute_value(key)
# Store in cache
self._cache[key] = (value, now)
# Enforce size limit (simple LRU: remove oldest)
if self._max_size is not None and len(self._cache) > self._max_size:
oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
del self._cache[oldest_key]
return value
def invalidate(self, key: Optional[K] = None) -> None:
"""Invalidate cache entry or entire cache.
Args:
key: Cache key to invalidate (None = invalidate all)
"""
with self._lock:
if key is None:
self._cache.clear()
elif key in self._cache:
del self._cache[key]
def cleanup_expired(self) -> int:
"""Remove expired cache entries.
Returns:
Number of entries removed
"""
if self._ttl_seconds is None:
return 0
with self._lock:
now = time.time()
expired = [k for k, (_, ts) in self._cache.items() if (now - ts) >= self._ttl_seconds]
for key in expired:
del self._cache[key]
return len(expired)
@abstractmethod
def _compute_value(self, key: K) -> V:
"""Compute value for a cache key.
Args:
key: Cache key
Returns:
Computed value
Raises:
ServiceError: If computation fails
"""
pass

View file

@ -0,0 +1,318 @@
"""Service interfaces using Protocol for structural typing.
Defines contracts for different types of services without requiring inheritance.
Use these for type hints and dependency injection.
"""
from __future__ import annotations
from typing import Protocol, Any, Dict, List, Optional, TypeVar, runtime_checkable
import pandas as pd
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
@runtime_checkable
class SessionService(Protocol):
"""Interface for session management services."""
def new_session_id(self) -> str:
"""Create a new session ID.
Returns:
Unique session identifier
"""
...
def get_session(self, session_id: Optional[str]) -> Dict[str, Any]:
"""Get or create session state.
Args:
session_id: Session identifier (creates new if None)
Returns:
Session state dictionary
"""
...
def set_value(self, session_id: str, key: str, value: Any) -> None:
"""Set a value in session state.
Args:
session_id: Session identifier
key: State key
value: Value to store
"""
...
def get_value(self, session_id: str, key: str, default: Any = None) -> Any:
"""Get a value from session state.
Args:
session_id: Session identifier
key: State key
default: Default value if key not found
Returns:
Stored value or default
"""
...
def cleanup_expired(self) -> int:
"""Clean up expired sessions.
Returns:
Number of sessions cleaned up
"""
...
@runtime_checkable
class CardLoaderService(Protocol):
"""Interface for card data loading services."""
def get_cards(self, force_reload: bool = False) -> pd.DataFrame:
"""Get card data.
Args:
force_reload: Force reload from source
Returns:
DataFrame with card data
"""
...
def is_loaded(self) -> bool:
"""Check if card data is loaded.
Returns:
True if data is loaded
"""
...
@runtime_checkable
class CatalogService(Protocol):
"""Interface for catalog services (commanders, themes, etc.)."""
def get_catalog(self, force_reload: bool = False) -> pd.DataFrame:
"""Get catalog data.
Args:
force_reload: Force reload from source
Returns:
DataFrame with catalog data
"""
...
def search(self, query: str, **filters: Any) -> pd.DataFrame:
"""Search catalog with filters.
Args:
query: Search query string
**filters: Additional filters
Returns:
Filtered DataFrame
"""
...
@runtime_checkable
class OwnedCardsService(Protocol):
"""Interface for owned cards management."""
def get_owned_names(self) -> List[str]:
"""Get list of owned card names.
Returns:
List of card names
"""
...
def add_owned_names(self, names: List[str]) -> None:
"""Add card names to owned list.
Args:
names: Card names to add
"""
...
def remove_owned_name(self, name: str) -> bool:
"""Remove a card name from owned list.
Args:
name: Card name to remove
Returns:
True if removed, False if not found
"""
...
def clear_owned(self) -> None:
"""Clear all owned cards."""
...
def import_from_file(self, file_content: str, format_type: str) -> int:
"""Import owned cards from file content.
Args:
file_content: File content to parse
format_type: Format type (csv, txt, etc.)
Returns:
Number of cards imported
"""
...
@runtime_checkable
class CacheService(Protocol[K, V]):
"""Interface for caching services."""
def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
"""Get cached value.
Args:
key: Cache key
default: Default value if not found
Returns:
Cached value or default
"""
...
def set(self, key: K, value: V, ttl: Optional[int] = None) -> None:
"""Set cached value.
Args:
key: Cache key
value: Value to cache
ttl: Time-to-live in seconds (None = no expiration)
"""
...
def invalidate(self, key: Optional[K] = None) -> None:
"""Invalidate cache entry or entire cache.
Args:
key: Cache key (None = invalidate all)
"""
...
def cleanup_expired(self) -> int:
"""Remove expired cache entries.
Returns:
Number of entries removed
"""
...
@runtime_checkable
class BuildOrchestratorService(Protocol):
"""Interface for deck build orchestration."""
def orchestrate_build(
self,
session_id: str,
commander_name: str,
theme_tags: List[str],
**options: Any
) -> Dict[str, Any]:
"""Orchestrate a deck build.
Args:
session_id: Session identifier
commander_name: Commander card name
theme_tags: List of theme tags
**options: Additional build options
Returns:
Build result dictionary
"""
...
def get_build_status(self, session_id: str) -> Dict[str, Any]:
"""Get build status for a session.
Args:
session_id: Session identifier
Returns:
Build status dictionary
"""
...
@runtime_checkable
class ValidationService(Protocol):
"""Interface for validation services."""
def validate_commander(self, name: str) -> tuple[bool, Optional[str]]:
"""Validate commander name.
Args:
name: Card name
Returns:
(is_valid, error_message) tuple
"""
...
def validate_themes(self, themes: List[str]) -> tuple[bool, List[str]]:
"""Validate theme tags.
Args:
themes: List of theme tags
Returns:
(is_valid, invalid_themes) tuple
"""
...
def normalize_card_name(self, name: str) -> str:
"""Normalize card name for lookups.
Args:
name: Raw card name
Returns:
Normalized card name
"""
...
@runtime_checkable
class TelemetryService(Protocol):
"""Interface for telemetry/metrics services."""
def record_event(self, event_type: str, properties: Optional[Dict[str, Any]] = None) -> None:
"""Record a telemetry event.
Args:
event_type: Type of event
properties: Event properties
"""
...
def record_timing(self, operation: str, duration_ms: float) -> None:
"""Record operation timing.
Args:
operation: Operation name
duration_ms: Duration in milliseconds
"""
...
def increment_counter(self, counter_name: str, value: int = 1) -> None:
"""Increment a counter.
Args:
counter_name: Counter name
value: Increment value
"""
...

View file

@ -0,0 +1,202 @@
"""Service registry for dependency injection.
Provides a centralized registry for managing service instances and dependencies.
Supports singleton and factory patterns with thread-safe access.
"""
from __future__ import annotations
from typing import Any, Callable, Dict, Optional, Type, TypeVar, cast
import threading
T = TypeVar("T")
class ServiceRegistry:
"""Thread-safe service registry for dependency injection.
Manages service instances and factories with support for:
- Singleton services (one instance per registry)
- Factory services (new instance per request)
- Lazy initialization
- Thread-safe access
Example:
registry = ServiceRegistry()
registry.register_singleton(SessionService, session_service_instance)
registry.register_factory(BuildService, lambda: BuildService(deps...))
# Get services
session_svc = registry.get(SessionService)
build_svc = registry.get(BuildService)
"""
def __init__(self) -> None:
"""Initialize empty registry."""
self._singletons: Dict[Type[Any], Any] = {}
self._factories: Dict[Type[Any], Callable[[], Any]] = {}
self._lock = threading.RLock()
def register_singleton(self, service_type: Type[T], instance: T) -> None:
"""Register a singleton service instance.
Args:
service_type: Service type/interface
instance: Service instance to register
Raises:
ValueError: If service already registered
"""
with self._lock:
if service_type in self._singletons or service_type in self._factories:
raise ValueError(f"Service {service_type.__name__} already registered")
self._singletons[service_type] = instance
def register_factory(self, service_type: Type[T], factory: Callable[[], T]) -> None:
"""Register a factory for creating service instances.
Args:
service_type: Service type/interface
factory: Factory function that returns service instance
Raises:
ValueError: If service already registered
"""
with self._lock:
if service_type in self._singletons or service_type in self._factories:
raise ValueError(f"Service {service_type.__name__} already registered")
self._factories[service_type] = factory
def register_lazy_singleton(self, service_type: Type[T], factory: Callable[[], T]) -> None:
"""Register a lazy-initialized singleton service.
The factory will be called once on first access, then the instance is cached.
Args:
service_type: Service type/interface
factory: Factory function that returns service instance
Raises:
ValueError: If service already registered
"""
with self._lock:
if service_type in self._singletons or service_type in self._factories:
raise ValueError(f"Service {service_type.__name__} already registered")
# Wrap factory to cache result
instance_cache: Dict[str, Any] = {}
def lazy_factory() -> T:
if "instance" not in instance_cache:
instance_cache["instance"] = factory()
return instance_cache["instance"]
self._factories[service_type] = lazy_factory
def get(self, service_type: Type[T]) -> T:
"""Get service instance.
Args:
service_type: Service type/interface
Returns:
Service instance
Raises:
KeyError: If service not registered
"""
with self._lock:
# Check singletons first
if service_type in self._singletons:
return cast(T, self._singletons[service_type])
# Check factories
if service_type in self._factories:
return cast(T, self._factories[service_type]())
raise KeyError(f"Service {service_type.__name__} not registered")
def try_get(self, service_type: Type[T]) -> Optional[T]:
"""Try to get service instance, return None if not registered.
Args:
service_type: Service type/interface
Returns:
Service instance or None
"""
try:
return self.get(service_type)
except KeyError:
return None
def is_registered(self, service_type: Type[Any]) -> bool:
"""Check if service is registered.
Args:
service_type: Service type/interface
Returns:
True if registered
"""
with self._lock:
return service_type in self._singletons or service_type in self._factories
def unregister(self, service_type: Type[Any]) -> None:
"""Unregister a service.
Args:
service_type: Service type/interface
"""
with self._lock:
self._singletons.pop(service_type, None)
self._factories.pop(service_type, None)
def clear(self) -> None:
"""Clear all registered services."""
with self._lock:
self._singletons.clear()
self._factories.clear()
def get_registered_types(self) -> list[Type[Any]]:
"""Get list of all registered service types.
Returns:
List of service types
"""
with self._lock:
return list(self._singletons.keys()) + list(self._factories.keys())
# Global registry instance
_global_registry: Optional[ServiceRegistry] = None
_global_registry_lock = threading.Lock()
def get_registry() -> ServiceRegistry:
"""Get the global service registry instance.
Creates registry on first access (lazy initialization).
Returns:
Global ServiceRegistry instance
"""
global _global_registry
if _global_registry is None:
with _global_registry_lock:
if _global_registry is None:
_global_registry = ServiceRegistry()
return _global_registry
def reset_registry() -> None:
"""Reset the global registry (primarily for testing).
Clears all registered services and creates a new registry instance.
"""
global _global_registry
with _global_registry_lock:
_global_registry = ServiceRegistry()

View file

@ -4,45 +4,194 @@ import time
import uuid
from typing import Dict, Any, Optional
# Extremely simple in-memory session/task store for MVP
_SESSIONS: Dict[str, Dict[str, Any]] = {}
_TTL_SECONDS = 60 * 60 * 8 # 8 hours
from .base import StateService
from .interfaces import SessionService
# Session TTL: 8 hours
SESSION_TTL_SECONDS = 60 * 60 * 8
class SessionManager(StateService):
"""Session management service.
Manages user sessions with automatic TTL-based cleanup.
Thread-safe with in-memory storage.
"""
def __init__(self, ttl_seconds: int = SESSION_TTL_SECONDS) -> None:
"""Initialize session manager.
Args:
ttl_seconds: Session time-to-live in seconds
"""
super().__init__()
self._ttl_seconds = ttl_seconds
def new_session_id(self) -> str:
"""Create a new session ID.
Returns:
Unique session identifier
"""
return uuid.uuid4().hex
def touch_session(self, session_id: str) -> Dict[str, Any]:
"""Update session last access time.
Args:
session_id: Session identifier
Returns:
Session state dictionary
"""
now = time.time()
state = self.get_state(session_id)
state["updated"] = now
return state
def get_session(self, session_id: Optional[str]) -> Dict[str, Any]:
"""Get or create session state.
Args:
session_id: Session identifier (creates new if None)
Returns:
Session state dictionary
"""
if not session_id:
session_id = self.new_session_id()
return self.touch_session(session_id)
def set_value(self, session_id: str, key: str, value: Any) -> None:
"""Set a value in session state.
Args:
session_id: Session identifier
key: State key
value: Value to store
"""
self.touch_session(session_id)[key] = value
def get_value(self, session_id: str, key: str, default: Any = None) -> Any:
"""Get a value from session state.
Args:
session_id: Session identifier
key: State key
default: Default value if key not found
Returns:
Stored value or default
"""
return self.touch_session(session_id).get(key, default)
def _initialize_state(self, key: str) -> Dict[str, Any]:
"""Initialize state for a new session.
Args:
key: Session ID
Returns:
Initial session state
"""
now = time.time()
return {"created": now, "updated": now}
def _should_cleanup(self, key: str, state: Dict[str, Any]) -> bool:
"""Check if session should be cleaned up.
Args:
key: Session ID
state: Session state
Returns:
True if session is expired
"""
now = time.time()
updated = state.get("updated", 0)
return (now - updated) > self._ttl_seconds
# Global session manager instance
_session_manager: Optional[SessionManager] = None
def _get_manager() -> SessionManager:
"""Get or create global session manager instance.
Returns:
SessionManager instance
"""
global _session_manager
if _session_manager is None:
_session_manager = SessionManager()
return _session_manager
# Backward-compatible function API
def new_sid() -> str:
return uuid.uuid4().hex
"""Create a new session ID.
Returns:
Unique session identifier
"""
return _get_manager().new_session_id()
def touch_session(sid: str) -> Dict[str, Any]:
now = time.time()
s = _SESSIONS.get(sid)
if not s:
s = {"created": now, "updated": now}
_SESSIONS[sid] = s
else:
s["updated"] = now
return s
"""Update session last access time.
Args:
sid: Session identifier
Returns:
Session state dictionary
"""
return _get_manager().touch_session(sid)
def get_session(sid: Optional[str]) -> Dict[str, Any]:
if not sid:
sid = new_sid()
return touch_session(sid)
"""Get or create session state.
Args:
sid: Session identifier (creates new if None)
Returns:
Session state dictionary
"""
return _get_manager().get_session(sid)
def set_session_value(sid: str, key: str, value: Any) -> None:
touch_session(sid)[key] = value
"""Set a value in session state.
Args:
sid: Session identifier
key: State key
value: Value to store
"""
_get_manager().set_value(sid, key, value)
def get_session_value(sid: str, key: str, default: Any = None) -> Any:
return touch_session(sid).get(key, default)
"""Get a value from session state.
Args:
sid: Session identifier
key: State key
default: Default value if key not found
Returns:
Stored value or default
"""
return _get_manager().get_value(sid, key, default)
def cleanup_expired() -> None:
now = time.time()
expired = [sid for sid, s in _SESSIONS.items() if now - s.get("updated", 0) > _TTL_SECONDS]
for sid in expired:
try:
del _SESSIONS[sid]
except Exception:
pass
def cleanup_expired() -> int:
"""Clean up expired sessions.
Returns:
Number of sessions cleaned up
"""
return _get_manager().cleanup_state()

View file

@ -10,11 +10,6 @@
</aside>
<div class="grow" data-skeleton>
<div hx-get="/build/banner" hx-trigger="load"></div>
{% if locks_restored and locks_restored > 0 %}
<div class="muted" style="margin:.35rem 0;">
<span class="chip" title="Locks restored from permalink">🔒 {{ locks_restored }} locks restored</span>
</div>
{% endif %}
<h4>Chosen Ideals</h4>
<ul>
{% for key, label in labels.items() %}

View file

@ -186,9 +186,7 @@
<span class="chip" title="Multi-Copy package summary"><span class="dot dot-purple"></span> {{ mc_summary }}</span>
{% endif %}
<span id="locks-chip">{% if locks and locks|length > 0 %}<span class="chip" title="Locked cards">🔒 {{ locks|length }} locked</span>{% endif %}</span>
<button type="button" class="btn ml-auto" title="Copy permalink"
onclick="(async()=>{try{const r=await fetch('/build/permalink');const j=await r.json();const url=(j.permalink?location.origin+j.permalink:location.href+'#'+btoa(JSON.stringify(j.state||{}))); await navigator.clipboard.writeText(url); toast && toast('Permalink copied');}catch(e){alert('Copied state to console'); console.log(e);}})()">Copy Permalink</button>
<button type="button" class="btn" title="Open a saved permalink" onclick="(function(){try{var token = prompt('Paste a /build/from?state=... URL or token:'); if(!token) return; var m = token.match(/state=([^&]+)/); var t = m? m[1] : token.trim(); if(!t) return; window.location.href = '/build/from?state=' + encodeURIComponent(t); }catch(_){}})()">Open Permalink…</button>
</div>
{% set pct = ((deck_count / 100.0) * 100.0) if deck_count else 0 %}
{% set pct_clamped = (pct if pct <= 100 else 100) %}

View file

@ -27,7 +27,6 @@
<a href="/decks/compare" class="btn" role="button" title="Compare two finished decks">Compare</a>
<button id="deck-compare-selected" type="button" title="Compare two selected decks" disabled>Compare selected</button>
<button id="deck-compare-latest" type="button" title="Pick the latest two decks">Latest two</button>
<button id="deck-open-permalink" type="button" title="Open a saved permalink">Open Permalink…</button>
<button id="deck-reset-all" type="button" title="Reset filter, sort, and theme">Reset all</button>
<button id="deck-help" type="button" title="Keyboard shortcuts and tips" aria-haspopup="dialog" aria-controls="deck-help-modal">Help</button>
<span id="deck-count" class="muted" aria-live="polite"></span>
@ -127,7 +126,6 @@
var txtOnlyCb = document.getElementById('deck-txt-only');
var cmpSelBtn = document.getElementById('deck-compare-selected');
var cmpLatestBtn = document.getElementById('deck-compare-latest');
var openPermalinkBtn = document.getElementById('deck-open-permalink');
if (!list) return;
// Panels and themes discovery from data-tags-pipe
@ -416,18 +414,6 @@
} catch(_){ }
});
// Open permalink prompt
if (openPermalinkBtn) openPermalinkBtn.addEventListener('click', function(){
try{
var token = prompt('Paste a /build/from?state=... URL or token:');
if(!token) return;
var m = token.match(/state=([^&]+)/);
var t = m ? m[1] : token.trim();
if(!t) return;
window.location.href = '/build/from?state=' + encodeURIComponent(t);
}catch(_){ }
});
if (resetAllBtn) resetAllBtn.addEventListener('click', function(){
// Clear UI state
try {

View file

@ -18,9 +18,7 @@
<div class="random-meta" style="display:flex; gap:12px; align-items:center; flex-wrap:wrap;">
<span class="seed">Seed: <strong>{{ seed }}</strong></span>
{% if theme %}<span class="theme">Theme: <strong>{{ theme }}</strong></span>{% endif %}
{% if permalink %}
<button class="btn btn-compact" type="button" aria-label="Copy permalink for this exact build" onclick="(async()=>{try{await navigator.clipboard.writeText(location.origin + '{{ permalink }}');(window.toast&&toast('Permalink copied'))||console.log('Permalink copied');}catch(e){alert('Copy failed');}})()">Copy Permalink</button>
{% endif %}
{% if show_diagnostics and diagnostics %}
<span class="diag-badges" aria-label="Diagnostics" role="status" aria-live="polite" aria-atomic="true">
<span class="diag-badge" title="Attempts tried before acceptance" aria-label="Attempts tried before acceptance">

View file

@ -148,10 +148,10 @@
</section>
<script>
(function(){
// Minimal styling helper to unify button widths
// Minimal styling helper to unify button widths (only for content buttons)
try {
var style = document.createElement('style');
style.textContent = '.btn{min-width:180px;}';
style.textContent = '.content .btn{min-width:180px;}';
document.head.appendChild(style);
} catch(e){}
function update(data){
@ -325,27 +325,38 @@
statusLine.style.color = '#94a3b8';
if (statsLine) statsLine.style.display = 'none';
} else {
var totalCount = 0;
var totalSizeMB = 0;
if (stats.small) {
totalCount += stats.small.count || 0;
totalSizeMB += stats.small.size_mb || 0;
}
if (stats.normal) {
totalCount += stats.normal.count || 0;
totalSizeMB += stats.normal.size_mb || 0;
}
if (totalCount > 0) {
// Card count = use the max across sizes (each card has one image per size, so avoid double-counting)
var cardCount = Math.max(
(stats.small && stats.small.count) || 0,
(stats.normal && stats.normal.count) || 0
);
var totalSizeMB = ((stats.small && stats.small.size_mb) || 0) + ((stats.normal && stats.normal.size_mb) || 0);
var sizeCount = (stats.small ? 1 : 0) + (stats.normal ? 1 : 0);
if (cardCount > 0) {
statusLine.textContent = 'Cache exists';
statusLine.style.color = '#34d399';
if (statsLine) {
statsLine.style.display = '';
statsLine.textContent = totalCount.toLocaleString() + ' images cached • ' + totalSizeMB.toFixed(1) + ' MB';
var statsText = cardCount.toLocaleString() + ' cards cached • ' + totalSizeMB.toFixed(1) + ' MB';
// If we have last download info, append new card count
if (data.last_download) {
var ld = data.last_download;
if (ld.stats && typeof ld.stats.downloaded === 'number') {
var newCards = sizeCount > 0 ? Math.round(ld.stats.downloaded / sizeCount) : ld.stats.downloaded;
if (newCards > 0) {
statsText += ' • Last run: +' + newCards.toLocaleString() + ' new cards';
} else {
statsText += ' • Last run: fully up to date';
}
} else if (ld.message) {
statsText += ' • ' + ld.message;
}
}
statsLine.textContent = statsText;
}
} else {
statusLine.textContent = 'No images cached';
statusLine.textContent = 'No cards cached';
statusLine.style.color = '#94a3b8';
if (statsLine) statsLine.style.display = 'none';
}
@ -354,6 +365,23 @@
// Hide download progress
if (downloadStatus) downloadStatus.style.display = 'none';
if (progressBar) progressBar.style.display = 'none';
} else if (data.phase === 'error' || data.message) {
// Previous download failed - show error and allow retry
statusLine.textContent = 'Last download failed';
statusLine.style.color = '#f87171';
if (statsLine) {
statsLine.style.display = '';
statsLine.textContent = data.message || 'Unknown error';
}
if (downloadStatus) downloadStatus.style.display = 'none';
if (progressBar) progressBar.style.display = 'none';
} else {
// No stats, no error - likely no download attempted yet
statusLine.textContent = 'No cards cached';
statusLine.style.color = '#94a3b8';
if (statsLine) statsLine.style.display = 'none';
if (downloadStatus) downloadStatus.style.display = 'none';
if (progressBar) progressBar.style.display = 'none';
}
})
.catch(function(){

View file

@ -0,0 +1,13 @@
"""Validation package for web application.
Provides centralized validation using Pydantic models and custom validators
for all web route inputs and business logic validation.
"""
from __future__ import annotations
__all__ = [
"models",
"validators",
"card_names",
"messages",
]

View file

@ -0,0 +1,256 @@
"""Card name validation and normalization.
Provides utilities for validating and normalizing card names against
the card database, handling punctuation, case sensitivity, and multi-face cards.
"""
from __future__ import annotations
from typing import Optional, Tuple, List, Set
import re
import unicodedata
class CardNameValidator:
"""Validates and normalizes card names against card database.
Handles:
- Case normalization
- Punctuation variants
- Multi-face cards (// separator)
- Accent/diacritic handling
- Fuzzy matching for common typos
"""
def __init__(self) -> None:
"""Initialize validator with card database."""
self._card_names: Set[str] = set()
self._normalized_map: dict[str, str] = {}
self._loaded = False
def _ensure_loaded(self) -> None:
"""Lazy-load card database on first use."""
if self._loaded:
return
try:
from deck_builder import builder_utils as bu
df = bu._load_all_cards_parquet()
if not df.empty and 'name' in df.columns:
for name in df['name'].dropna():
name_str = str(name).strip()
if name_str:
self._card_names.add(name_str)
# Map normalized version to original
normalized = self.normalize(name_str)
self._normalized_map[normalized] = name_str
self._loaded = True
except Exception:
# Defensive: if loading fails, validator still works but won't validate
self._loaded = True
@staticmethod
def normalize(name: str) -> str:
"""Normalize card name for comparison.
Args:
name: Raw card name
Returns:
Normalized card name (lowercase, no diacritics, standardized punctuation)
"""
if not name:
return ""
# Strip whitespace
cleaned = name.strip()
# Remove diacritics/accents
nfd = unicodedata.normalize('NFD', cleaned)
cleaned = ''.join(c for c in nfd if unicodedata.category(c) != 'Mn')
# Lowercase
cleaned = cleaned.lower()
# Standardize punctuation
cleaned = re.sub(r"[''`]", "'", cleaned) # Normalize apostrophes
cleaned = re.sub(r'["""]', '"', cleaned) # Normalize quotes
cleaned = re.sub(r'', '-', cleaned) # Normalize dashes
# Collapse multiple spaces
cleaned = re.sub(r'\s+', ' ', cleaned)
return cleaned.strip()
def is_valid(self, name: str) -> bool:
"""Check if card name exists in database.
Args:
name: Card name to validate
Returns:
True if card exists
"""
self._ensure_loaded()
if not name or not name.strip():
return False
# Try exact match first
if name in self._card_names:
return True
# Try normalized match
normalized = self.normalize(name)
return normalized in self._normalized_map
def get_canonical_name(self, name: str) -> Optional[str]:
"""Get canonical (database) name for a card.
Args:
name: Card name (any capitalization/punctuation)
Returns:
Canonical name if found, None otherwise
"""
self._ensure_loaded()
if not name or not name.strip():
return None
# Return exact match if exists
if name in self._card_names:
return name
# Try normalized lookup
normalized = self.normalize(name)
return self._normalized_map.get(normalized)
def validate_and_normalize(self, name: str) -> Tuple[bool, Optional[str], Optional[str]]:
"""Validate and normalize a card name.
Args:
name: Card name to validate
Returns:
(is_valid, canonical_name, error_message) tuple
"""
if not name or not name.strip():
return False, None, "Card name cannot be empty"
canonical = self.get_canonical_name(name)
if canonical:
return True, canonical, None
else:
return False, None, f"Card '{name}' not found in database"
def is_valid_commander(self, name: str) -> Tuple[bool, Optional[str]]:
"""Check if card name is a valid commander.
Args:
name: Card name to validate
Returns:
(is_valid, error_message) tuple
"""
self._ensure_loaded()
is_valid, canonical, error = self.validate_and_normalize(name)
if not is_valid:
return False, error
# Check if card can be commander (has Legendary type)
try:
from deck_builder import builder_utils as bu
df = bu._load_all_cards_parquet()
if not df.empty:
# Match by canonical name
card_row = df[df['name'] == canonical]
if card_row.empty:
return False, f"Card '{name}' not found"
# Check type line for Legendary
type_line = str(card_row['type'].iloc[0] if 'type' in card_row else '')
if 'Legendary' not in type_line and 'legendary' not in type_line.lower():
return False, f"'{name}' is not a Legendary creature (cannot be commander)"
# Check for Creature or Planeswalker
is_creature = 'Creature' in type_line or 'creature' in type_line.lower()
is_pw = 'Planeswalker' in type_line or 'planeswalker' in type_line.lower()
# Check for specific commander abilities
oracle_text = str(card_row['oracle'].iloc[0] if 'oracle' in card_row else '')
can_be_commander = ' can be your commander' in oracle_text.lower()
if not (is_creature or is_pw or can_be_commander):
return False, f"'{name}' cannot be a commander"
return True, None
except Exception:
# Defensive: if check fails, assume valid if card exists
return True, None
def validate_card_list(self, names: List[str]) -> Tuple[List[str], List[str]]:
"""Validate a list of card names.
Args:
names: List of card names to validate
Returns:
(valid_names, invalid_names) tuple with canonical names
"""
valid: List[str] = []
invalid: List[str] = []
for name in names:
is_valid, canonical, _ = self.validate_and_normalize(name)
if is_valid and canonical:
valid.append(canonical)
else:
invalid.append(name)
return valid, invalid
# Global validator instance
_validator: Optional[CardNameValidator] = None
def get_validator() -> CardNameValidator:
"""Get global card name validator instance.
Returns:
CardNameValidator instance
"""
global _validator
if _validator is None:
_validator = CardNameValidator()
return _validator
# Convenience functions
def is_valid_card(name: str) -> bool:
"""Check if card name is valid."""
return get_validator().is_valid(name)
def get_canonical_name(name: str) -> Optional[str]:
"""Get canonical card name."""
return get_validator().get_canonical_name(name)
def is_valid_commander(name: str) -> Tuple[bool, Optional[str]]:
"""Check if card is a valid commander."""
return get_validator().is_valid_commander(name)
def validate_card_list(names: List[str]) -> Tuple[List[str], List[str]]:
"""Validate a list of card names."""
return get_validator().validate_card_list(names)

View file

@ -0,0 +1,129 @@
"""Error message templates for validation errors.
Provides consistent, user-friendly error messages for validation failures.
"""
from __future__ import annotations
from typing import List
class ValidationMessages:
"""Standard validation error messages."""
# Commander validation
COMMANDER_REQUIRED = "Commander name is required"
COMMANDER_INVALID = "Commander '{name}' not found in database"
COMMANDER_NOT_LEGENDARY = "'{name}' is not a Legendary creature (cannot be commander)"
COMMANDER_CANNOT_COMMAND = "'{name}' cannot be a commander"
# Partner validation
PARTNER_REQUIRES_NAME = "Partner mode requires a partner commander name"
BACKGROUND_REQUIRES_NAME = "Background mode requires a background name"
PARTNER_NAME_REQUIRES_MODE = "Partner name specified but partner mode not set"
BACKGROUND_INVALID_MODE = "Background name only valid with background partner mode"
# Theme validation
THEME_INVALID = "Theme '{name}' not found in catalog"
THEMES_INVALID = "Invalid themes: {names}"
THEME_REQUIRED = "At least one theme is required"
# Card validation
CARD_NOT_FOUND = "Card '{name}' not found in database"
CARD_NAME_EMPTY = "Card name cannot be empty"
CARDS_NOT_FOUND = "Cards not found: {names}"
# Bracket validation
BRACKET_INVALID = "Power bracket must be between 1 and 4"
BRACKET_EXCEEDED = "'{name}' is bracket {card_bracket}, exceeds limit of {limit}"
# Color validation
COLOR_IDENTITY_MISMATCH = "Card '{name}' colors ({card_colors}) exceed commander colors ({commander_colors})"
# Custom theme validation
CUSTOM_THEME_REQUIRES_NAME_AND_TAGS = "Custom theme requires both name and tags"
CUSTOM_THEME_NAME_REQUIRED = "Custom theme tags require a theme name"
CUSTOM_THEME_TAGS_REQUIRED = "Custom theme name requires tags"
# List validation
MUST_INCLUDE_TOO_MANY = "Must-include list cannot exceed 99 cards"
MUST_EXCLUDE_TOO_MANY = "Must-exclude list cannot exceed 500 cards"
# Batch validation
BATCH_COUNT_INVALID = "Batch count must be between 1 and 10"
BATCH_COUNT_EXCEEDED = "Batch count cannot exceed 10"
# File validation
FILE_CONTENT_EMPTY = "File content cannot be empty"
FILE_FORMAT_INVALID = "File format '{format}' not supported"
# General
VALUE_REQUIRED = "Value is required"
VALUE_TOO_LONG = "Value exceeds maximum length of {max_length}"
VALUE_TOO_SHORT = "Value must be at least {min_length} characters"
@staticmethod
def format_commander_invalid(name: str) -> str:
"""Format commander invalid message."""
return ValidationMessages.COMMANDER_INVALID.format(name=name)
@staticmethod
def format_commander_not_legendary(name: str) -> str:
"""Format commander not legendary message."""
return ValidationMessages.COMMANDER_NOT_LEGENDARY.format(name=name)
@staticmethod
def format_theme_invalid(name: str) -> str:
"""Format theme invalid message."""
return ValidationMessages.THEME_INVALID.format(name=name)
@staticmethod
def format_themes_invalid(names: List[str]) -> str:
"""Format multiple invalid themes message."""
return ValidationMessages.THEMES_INVALID.format(names=", ".join(names))
@staticmethod
def format_card_not_found(name: str) -> str:
"""Format card not found message."""
return ValidationMessages.CARD_NOT_FOUND.format(name=name)
@staticmethod
def format_cards_not_found(names: List[str]) -> str:
"""Format multiple cards not found message."""
return ValidationMessages.CARDS_NOT_FOUND.format(names=", ".join(names))
@staticmethod
def format_bracket_exceeded(name: str, card_bracket: int, limit: int) -> str:
"""Format bracket exceeded message."""
return ValidationMessages.BRACKET_EXCEEDED.format(
name=name,
card_bracket=card_bracket,
limit=limit
)
@staticmethod
def format_color_mismatch(name: str, card_colors: str, commander_colors: str) -> str:
"""Format color identity mismatch message."""
return ValidationMessages.COLOR_IDENTITY_MISMATCH.format(
name=name,
card_colors=card_colors,
commander_colors=commander_colors
)
@staticmethod
def format_file_format_invalid(format_type: str) -> str:
"""Format invalid file format message."""
return ValidationMessages.FILE_FORMAT_INVALID.format(format=format_type)
@staticmethod
def format_value_too_long(max_length: int) -> str:
"""Format value too long message."""
return ValidationMessages.VALUE_TOO_LONG.format(max_length=max_length)
@staticmethod
def format_value_too_short(min_length: int) -> str:
"""Format value too short message."""
return ValidationMessages.VALUE_TOO_SHORT.format(min_length=min_length)
# Convenience access
MSG = ValidationMessages

View file

@ -0,0 +1,212 @@
"""Pydantic models for request validation.
Defines typed models for all web route inputs with automatic validation.
"""
from __future__ import annotations
from typing import Optional, List
from pydantic import BaseModel, Field, field_validator, model_validator
from enum import Enum
class PowerBracket(int, Enum):
"""Power bracket enumeration (1-4)."""
BRACKET_1 = 1
BRACKET_2 = 2
BRACKET_3 = 3
BRACKET_4 = 4
class DeckMode(str, Enum):
"""Deck building mode."""
STANDARD = "standard"
RANDOM = "random"
HEADLESS = "headless"
class OwnedMode(str, Enum):
"""Owned cards usage mode."""
OFF = "off"
PREFER = "prefer"
ONLY = "only"
class CommanderPartnerType(str, Enum):
"""Commander partner configuration type."""
SINGLE = "single"
PARTNER = "partner"
BACKGROUND = "background"
PARTNER_WITH = "partner_with"
class BuildRequest(BaseModel):
"""Build request validation model."""
commander: str = Field(..., min_length=1, max_length=200, description="Commander card name")
themes: List[str] = Field(default_factory=list, max_length=5, description="Theme tags")
power_bracket: PowerBracket = Field(default=PowerBracket.BRACKET_2, description="Power bracket (1-4)")
# Partner configuration
partner_mode: Optional[CommanderPartnerType] = Field(default=None, description="Partner type")
partner_name: Optional[str] = Field(default=None, max_length=200, description="Partner commander name")
background_name: Optional[str] = Field(default=None, max_length=200, description="Background name")
# Owned cards
owned_mode: OwnedMode = Field(default=OwnedMode.OFF, description="Owned cards mode")
# Custom theme
custom_theme_name: Optional[str] = Field(default=None, max_length=100, description="Custom theme name")
custom_theme_tags: Optional[List[str]] = Field(default=None, max_length=20, description="Custom theme tags")
# Include/exclude lists
must_include: Optional[List[str]] = Field(default=None, max_length=99, description="Must-include card names")
must_exclude: Optional[List[str]] = Field(default=None, max_length=500, description="Must-exclude card names")
# Random modes
random_commander: bool = Field(default=False, description="Randomize commander")
random_themes: bool = Field(default=False, description="Randomize themes")
random_seed: Optional[int] = Field(default=None, ge=0, description="Random seed")
@field_validator("commander")
@classmethod
def validate_commander_not_empty(cls, v: str) -> str:
"""Ensure commander name is not just whitespace."""
if not v or not v.strip():
raise ValueError("Commander name cannot be empty")
return v.strip()
@field_validator("themes")
@classmethod
def validate_themes_unique(cls, v: List[str]) -> List[str]:
"""Ensure themes are unique and non-empty."""
if not v:
return []
cleaned = [t.strip() for t in v if t and t.strip()]
seen = set()
unique = []
for theme in cleaned:
lower = theme.lower()
if lower not in seen:
seen.add(lower)
unique.append(theme)
return unique
@model_validator(mode="after")
def validate_partner_consistency(self) -> "BuildRequest":
"""Validate partner configuration consistency."""
if self.partner_mode == CommanderPartnerType.PARTNER:
if not self.partner_name:
raise ValueError("Partner mode requires partner_name")
if self.partner_mode == CommanderPartnerType.BACKGROUND:
if not self.background_name:
raise ValueError("Background mode requires background_name")
if self.partner_name and not self.partner_mode:
raise ValueError("partner_name requires partner_mode to be set")
if self.background_name and self.partner_mode != CommanderPartnerType.BACKGROUND:
raise ValueError("background_name only valid with background partner_mode")
return self
@model_validator(mode="after")
def validate_custom_theme_consistency(self) -> "BuildRequest":
"""Validate custom theme requires both name and tags."""
if self.custom_theme_name and not self.custom_theme_tags:
raise ValueError("Custom theme requires both name and tags")
if self.custom_theme_tags and not self.custom_theme_name:
raise ValueError("Custom theme tags require theme name")
return self
class CommanderSearchRequest(BaseModel):
"""Commander search/validation request."""
query: str = Field(..., min_length=1, max_length=200, description="Search query")
limit: int = Field(default=10, ge=1, le=100, description="Maximum results")
@field_validator("query")
@classmethod
def validate_query_not_empty(cls, v: str) -> str:
"""Ensure query is not just whitespace."""
if not v or not v.strip():
raise ValueError("Search query cannot be empty")
return v.strip()
class ThemeValidationRequest(BaseModel):
"""Theme validation request."""
themes: List[str] = Field(..., min_length=1, max_length=10, description="Themes to validate")
@field_validator("themes")
@classmethod
def validate_themes_not_empty(cls, v: List[str]) -> List[str]:
"""Ensure themes are not empty."""
cleaned = [t.strip() for t in v if t and t.strip()]
if not cleaned:
raise ValueError("At least one valid theme required")
return cleaned
class OwnedCardsImportRequest(BaseModel):
"""Owned cards import request."""
format_type: str = Field(..., pattern="^(csv|txt|arena)$", description="File format")
content: str = Field(..., min_length=1, description="File content")
@field_validator("content")
@classmethod
def validate_content_not_empty(cls, v: str) -> str:
"""Ensure content is not empty."""
if not v or not v.strip():
raise ValueError("File content cannot be empty")
return v
class BatchBuildRequest(BaseModel):
"""Batch build request for multiple variations."""
base_config: BuildRequest = Field(..., description="Base build configuration")
count: int = Field(..., ge=1, le=10, description="Number of builds to generate")
variation_seed: Optional[int] = Field(default=None, ge=0, description="Seed for variations")
@field_validator("count")
@classmethod
def validate_count_reasonable(cls, v: int) -> int:
"""Ensure batch count is reasonable."""
if v > 10:
raise ValueError("Batch count cannot exceed 10")
return v
class CardReplacementRequest(BaseModel):
"""Card replacement request for compliance."""
card_name: str = Field(..., min_length=1, max_length=200, description="Card to replace")
reason: Optional[str] = Field(default=None, max_length=500, description="Replacement reason")
@field_validator("card_name")
@classmethod
def validate_card_name_not_empty(cls, v: str) -> str:
"""Ensure card name is not empty."""
if not v or not v.strip():
raise ValueError("Card name cannot be empty")
return v.strip()
class DeckExportRequest(BaseModel):
"""Deck export request."""
format_type: str = Field(..., pattern="^(csv|txt|json|arena)$", description="Export format")
include_commanders: bool = Field(default=True, description="Include commanders in export")
include_lands: bool = Field(default=True, description="Include lands in export")
class Config:
"""Pydantic configuration."""
use_enum_values = True

View file

@ -0,0 +1,223 @@
"""Custom validators for business logic validation.
Provides validators for themes, commanders, and other domain-specific validation.
"""
from __future__ import annotations
from typing import List, Tuple, Optional
import pandas as pd
class ThemeValidator:
"""Validates theme tags against theme catalog."""
def __init__(self) -> None:
"""Initialize validator."""
self._themes: set[str] = set()
self._loaded = False
def _ensure_loaded(self) -> None:
"""Lazy-load theme catalog."""
if self._loaded:
return
try:
from ..services import theme_catalog_loader
catalog = theme_catalog_loader.get_theme_catalog()
if not catalog.empty and 'name' in catalog.columns:
for theme in catalog['name'].dropna():
theme_str = str(theme).strip()
if theme_str:
self._themes.add(theme_str)
# Also add lowercase version for case-insensitive matching
self._themes.add(theme_str.lower())
self._loaded = True
except Exception:
self._loaded = True
def is_valid(self, theme: str) -> bool:
"""Check if theme exists in catalog.
Args:
theme: Theme tag to validate
Returns:
True if theme is valid
"""
self._ensure_loaded()
if not theme or not theme.strip():
return False
# Check exact match and case-insensitive
return theme in self._themes or theme.lower() in self._themes
def validate_themes(self, themes: List[str]) -> Tuple[List[str], List[str]]:
"""Validate a list of themes.
Args:
themes: List of theme tags
Returns:
(valid_themes, invalid_themes) tuple
"""
self._ensure_loaded()
valid: List[str] = []
invalid: List[str] = []
for theme in themes:
if not theme or not theme.strip():
continue
if self.is_valid(theme):
valid.append(theme)
else:
invalid.append(theme)
return valid, invalid
def get_all_themes(self) -> List[str]:
"""Get all available themes.
Returns:
List of theme names
"""
self._ensure_loaded()
# Return case-preserved versions
return sorted([t for t in self._themes if t and t[0].isupper()])
class PowerBracketValidator:
"""Validates power bracket values and card compliance."""
@staticmethod
def is_valid_bracket(bracket: int) -> bool:
"""Check if bracket value is valid (1-4).
Args:
bracket: Power bracket value
Returns:
True if valid (1-4)
"""
return isinstance(bracket, int) and 1 <= bracket <= 4
@staticmethod
def validate_card_for_bracket(card_name: str, bracket: int) -> Tuple[bool, Optional[str]]:
"""Check if card is allowed in power bracket.
Args:
card_name: Card name to check
bracket: Target power bracket (1-4)
Returns:
(is_allowed, error_message) tuple
"""
if not PowerBracketValidator.is_valid_bracket(bracket):
return False, f"Invalid power bracket: {bracket}"
try:
from deck_builder import builder_utils as bu
df = bu._load_all_cards_parquet()
if df.empty:
return True, None # Assume allowed if no data
card_row = df[df['name'] == card_name]
if card_row.empty:
return False, f"Card '{card_name}' not found"
# Check bracket column if it exists
if 'bracket' in card_row.columns:
card_bracket = card_row['bracket'].iloc[0]
if pd.notna(card_bracket):
card_bracket_int = int(card_bracket)
if card_bracket_int > bracket:
return False, f"'{card_name}' is bracket {card_bracket_int}, exceeds limit of {bracket}"
return True, None
except Exception:
# Defensive: assume allowed if check fails
return True, None
class ColorIdentityValidator:
"""Validates color identity constraints."""
@staticmethod
def parse_colors(color_str: str) -> set[str]:
"""Parse color identity string to set.
Args:
color_str: Color string (e.g., "W,U,B" or "Grixis")
Returns:
Set of color codes (W, U, B, R, G, C)
"""
if not color_str:
return set()
# Handle comma-separated
if ',' in color_str:
return {c.strip().upper() for c in color_str.split(',') if c.strip()}
# Handle concatenated (e.g., "WUB")
colors = set()
for char in color_str.upper():
if char in 'WUBRGC':
colors.add(char)
return colors
@staticmethod
def is_subset(card_colors: set[str], commander_colors: set[str]) -> bool:
"""Check if card colors are subset of commander colors.
Args:
card_colors: Card's color identity
commander_colors: Commander's color identity
Returns:
True if card is valid in commander's colors
"""
# Colorless cards (C) are valid in any deck
if card_colors == {'C'} or not card_colors:
return True
# Check if card colors are subset of commander colors
return card_colors.issubset(commander_colors)
# Global validator instances
_theme_validator: Optional[ThemeValidator] = None
_bracket_validator: Optional[PowerBracketValidator] = None
_color_validator: Optional[ColorIdentityValidator] = None
def get_theme_validator() -> ThemeValidator:
"""Get global theme validator instance."""
global _theme_validator
if _theme_validator is None:
_theme_validator = ThemeValidator()
return _theme_validator
def get_bracket_validator() -> PowerBracketValidator:
"""Get global bracket validator instance."""
global _bracket_validator
if _bracket_validator is None:
_bracket_validator = PowerBracketValidator()
return _bracket_validator
def get_color_validator() -> ColorIdentityValidator:
"""Get global color validator instance."""
global _color_validator
if _color_validator is None:
_color_validator = ColorIdentityValidator()
return _color_validator