mirror of
https://github.com/mwisnowski/mtg_python_deckbuilder.git
synced 2025-12-17 08:00:13 +01:00
feat: migrate to unified Parquet format with instant GitHub setup and 4x faster tagging
This commit is contained in:
parent
e9e949aae3
commit
8435312c8f
58 changed files with 11921 additions and 3961 deletions
776
code/file_setup/old/setup_utils.py
Normal file
776
code/file_setup/old/setup_utils.py
Normal file
|
|
@ -0,0 +1,776 @@
|
|||
"""MTG Python Deckbuilder setup utilities.
|
||||
|
||||
This module provides utility functions for setting up and managing the MTG Python Deckbuilder
|
||||
application. It handles tasks such as downloading card data, filtering cards by various criteria,
|
||||
and processing legendary creatures for commander format.
|
||||
|
||||
Key Features:
|
||||
- Card data download from MTGJSON
|
||||
- DataFrame filtering and processing
|
||||
- Color identity filtering
|
||||
- Commander validation
|
||||
- CSV file management
|
||||
|
||||
The module integrates with settings.py for configuration and exceptions.py for error handling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# Standard library imports
|
||||
import ast
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Union, TypedDict, Iterable, Dict, Any
|
||||
|
||||
# Third-party imports
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
# Local application imports
|
||||
from .setup_constants import (
|
||||
CSV_PROCESSING_COLUMNS,
|
||||
CARD_TYPES_TO_EXCLUDE,
|
||||
NON_LEGAL_SETS,
|
||||
SORT_CONFIG,
|
||||
FILTER_CONFIG,
|
||||
COLUMN_ORDER,
|
||||
TAGGED_COLUMN_ORDER,
|
||||
SETUP_COLORS,
|
||||
COLOR_ABRV,
|
||||
BANNED_CARDS,
|
||||
)
|
||||
from exceptions import (
|
||||
MTGJSONDownloadError,
|
||||
DataFrameProcessingError,
|
||||
ColorFilterError,
|
||||
CommanderValidationError
|
||||
)
|
||||
from type_definitions import CardLibraryDF
|
||||
from settings import FILL_NA_COLUMNS, CSV_DIRECTORY
|
||||
import logging_util
|
||||
|
||||
# Create logger for this module
|
||||
logger = logging_util.logging.getLogger(__name__)
|
||||
logger.setLevel(logging_util.LOG_LEVEL)
|
||||
logger.addHandler(logging_util.file_handler)
|
||||
logger.addHandler(logging_util.stream_handler)
|
||||
|
||||
|
||||
def _is_primary_side(value: object) -> bool:
|
||||
"""Return True when the provided side marker corresponds to a primary face."""
|
||||
try:
|
||||
if pd.isna(value):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
text = str(value).strip().lower()
|
||||
return text in {"", "a"}
|
||||
|
||||
|
||||
def _summarize_secondary_face_exclusions(
|
||||
names: Iterable[str],
|
||||
source_df: pd.DataFrame,
|
||||
) -> List[Dict[str, Any]]:
|
||||
summaries: List[Dict[str, Any]] = []
|
||||
if not names:
|
||||
return summaries
|
||||
|
||||
for raw_name in names:
|
||||
name = str(raw_name)
|
||||
group = source_df[source_df['name'] == name]
|
||||
if group.empty:
|
||||
continue
|
||||
|
||||
primary_rows = group[group['side'].apply(_is_primary_side)] if 'side' in group.columns else pd.DataFrame()
|
||||
primary_face = (
|
||||
str(primary_rows['faceName'].iloc[0])
|
||||
if not primary_rows.empty and 'faceName' in primary_rows.columns
|
||||
else ""
|
||||
)
|
||||
layout = str(group['layout'].iloc[0]) if 'layout' in group.columns and not group.empty else ""
|
||||
faces = sorted(set(str(v) for v in group.get('faceName', pd.Series(dtype=str)).dropna().tolist()))
|
||||
eligible_faces = sorted(
|
||||
set(
|
||||
str(v)
|
||||
for v in group
|
||||
.loc[~group['side'].apply(_is_primary_side) if 'side' in group.columns else [False] * len(group)]
|
||||
.get('faceName', pd.Series(dtype=str))
|
||||
.dropna()
|
||||
.tolist()
|
||||
)
|
||||
)
|
||||
|
||||
summaries.append(
|
||||
{
|
||||
"name": name,
|
||||
"primary_face": primary_face or name.split('//')[0].strip(),
|
||||
"layout": layout,
|
||||
"faces": faces,
|
||||
"eligible_faces": eligible_faces,
|
||||
"reason": "secondary_face_only",
|
||||
}
|
||||
)
|
||||
|
||||
return summaries
|
||||
|
||||
|
||||
def _write_commander_exclusions_log(entries: List[Dict[str, Any]]) -> None:
|
||||
"""Persist commander exclusion diagnostics for downstream tooling."""
|
||||
|
||||
path = Path(CSV_DIRECTORY) / ".commander_exclusions.json"
|
||||
|
||||
if not entries:
|
||||
try:
|
||||
path.unlink()
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.debug("Unable to remove commander exclusion log: %s", exc)
|
||||
return
|
||||
|
||||
payload = {
|
||||
"generated_at": datetime.now().isoformat(timespec='seconds'),
|
||||
"secondary_face_only": entries,
|
||||
}
|
||||
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open('w', encoding='utf-8') as handle:
|
||||
json.dump(payload, handle, indent=2, ensure_ascii=False)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to write commander exclusion diagnostics: %s", exc)
|
||||
|
||||
|
||||
def _enforce_primary_face_commander_rules(
|
||||
candidate_df: pd.DataFrame,
|
||||
source_df: pd.DataFrame,
|
||||
) -> pd.DataFrame:
|
||||
"""Retain only primary faces and record any secondary-face-only exclusions."""
|
||||
|
||||
if candidate_df.empty or 'side' not in candidate_df.columns:
|
||||
_write_commander_exclusions_log([])
|
||||
return candidate_df
|
||||
|
||||
mask_primary = candidate_df['side'].apply(_is_primary_side)
|
||||
primary_df = candidate_df[mask_primary].copy()
|
||||
secondary_df = candidate_df[~mask_primary]
|
||||
|
||||
primary_names = set(str(n) for n in primary_df.get('name', pd.Series(dtype=str)))
|
||||
secondary_only_names = sorted(
|
||||
set(str(n) for n in secondary_df.get('name', pd.Series(dtype=str))) - primary_names
|
||||
)
|
||||
|
||||
if secondary_only_names:
|
||||
logger.info(
|
||||
"Excluding %d commander entries where only a secondary face is eligible: %s",
|
||||
len(secondary_only_names),
|
||||
", ".join(secondary_only_names),
|
||||
)
|
||||
|
||||
entries = _summarize_secondary_face_exclusions(secondary_only_names, source_df)
|
||||
_write_commander_exclusions_log(entries)
|
||||
|
||||
return primary_df
|
||||
|
||||
|
||||
def _coerce_tag_list(value: object) -> List[str]:
|
||||
"""Normalize various list-like representations into a list of strings."""
|
||||
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, float) and pd.isna(value):
|
||||
return []
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
return [str(v).strip() for v in value if str(v).strip()]
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
return []
|
||||
try:
|
||||
parsed = ast.literal_eval(text)
|
||||
if isinstance(parsed, (list, tuple, set)):
|
||||
return [str(v).strip() for v in parsed if str(v).strip()]
|
||||
except Exception:
|
||||
pass
|
||||
parts = [part.strip() for part in text.replace(";", ",").split(",")]
|
||||
return [part for part in parts if part]
|
||||
|
||||
|
||||
def _collect_commander_tag_metadata(csv_dir: Union[str, Path]) -> Dict[str, Dict[str, List[str]]]:
|
||||
"""Aggregate theme and creature tags from color-tagged CSV files."""
|
||||
|
||||
path = Path(csv_dir)
|
||||
if not path.exists():
|
||||
return {}
|
||||
|
||||
combined: Dict[str, Dict[str, set[str]]] = {}
|
||||
columns = ("themeTags", "creatureTypes", "roleTags")
|
||||
|
||||
for color in SETUP_COLORS:
|
||||
color_path = path / f"{color}_cards.csv"
|
||||
if not color_path.exists():
|
||||
continue
|
||||
try:
|
||||
df = pd.read_csv(color_path, low_memory=False)
|
||||
except Exception as exc:
|
||||
logger.debug("Unable to read %s for commander tag enrichment: %s", color_path, exc)
|
||||
continue
|
||||
|
||||
if df.empty or ("name" not in df.columns and "faceName" not in df.columns):
|
||||
continue
|
||||
|
||||
for _, row in df.iterrows():
|
||||
face_key = str(row.get("faceName", "")).strip()
|
||||
name_key = str(row.get("name", "")).strip()
|
||||
keys = {k for k in (face_key, name_key) if k}
|
||||
if not keys:
|
||||
continue
|
||||
|
||||
for key in keys:
|
||||
bucket = combined.setdefault(key, {col: set() for col in columns})
|
||||
for col in columns:
|
||||
if col not in row:
|
||||
continue
|
||||
values = _coerce_tag_list(row.get(col))
|
||||
if values:
|
||||
bucket[col].update(values)
|
||||
|
||||
enriched: Dict[str, Dict[str, List[str]]] = {}
|
||||
for key, data in combined.items():
|
||||
enriched[key] = {col: sorted(values) for col, values in data.items() if values}
|
||||
return enriched
|
||||
|
||||
|
||||
def enrich_commander_rows_with_tags(
|
||||
df: pd.DataFrame,
|
||||
csv_dir: Union[str, Path],
|
||||
) -> pd.DataFrame:
|
||||
"""Attach theme and creature tag metadata to commander rows when available."""
|
||||
|
||||
if df.empty:
|
||||
df = df.copy()
|
||||
for column in ("themeTags", "creatureTypes", "roleTags"):
|
||||
if column not in df.columns:
|
||||
df[column] = []
|
||||
return df
|
||||
|
||||
metadata = _collect_commander_tag_metadata(csv_dir)
|
||||
if not metadata:
|
||||
df = df.copy()
|
||||
for column in ("themeTags", "creatureTypes", "roleTags"):
|
||||
if column not in df.columns:
|
||||
df[column] = [[] for _ in range(len(df))]
|
||||
return df
|
||||
|
||||
df = df.copy()
|
||||
for column in ("themeTags", "creatureTypes", "roleTags"):
|
||||
if column not in df.columns:
|
||||
df[column] = [[] for _ in range(len(df))]
|
||||
|
||||
theme_values: List[List[str]] = []
|
||||
creature_values: List[List[str]] = []
|
||||
role_values: List[List[str]] = []
|
||||
|
||||
for _, row in df.iterrows():
|
||||
face_key = str(row.get("faceName", "")).strip()
|
||||
name_key = str(row.get("name", "")).strip()
|
||||
|
||||
entry_face = metadata.get(face_key, {})
|
||||
entry_name = metadata.get(name_key, {})
|
||||
|
||||
combined: Dict[str, set[str]] = {
|
||||
"themeTags": set(_coerce_tag_list(row.get("themeTags"))),
|
||||
"creatureTypes": set(_coerce_tag_list(row.get("creatureTypes"))),
|
||||
"roleTags": set(_coerce_tag_list(row.get("roleTags"))),
|
||||
}
|
||||
|
||||
for source in (entry_face, entry_name):
|
||||
for column in combined:
|
||||
combined[column].update(source.get(column, []))
|
||||
|
||||
theme_values.append(sorted(combined["themeTags"]))
|
||||
creature_values.append(sorted(combined["creatureTypes"]))
|
||||
role_values.append(sorted(combined["roleTags"]))
|
||||
|
||||
df["themeTags"] = theme_values
|
||||
df["creatureTypes"] = creature_values
|
||||
df["roleTags"] = role_values
|
||||
|
||||
enriched_rows = sum(1 for t, c, r in zip(theme_values, creature_values, role_values) if t or c or r)
|
||||
logger.debug("Enriched %d commander rows with tag metadata", enriched_rows)
|
||||
|
||||
return df
|
||||
|
||||
# Type definitions
|
||||
class FilterRule(TypedDict):
|
||||
"""Type definition for filter rules configuration."""
|
||||
exclude: Optional[List[str]]
|
||||
require: Optional[List[str]]
|
||||
|
||||
class FilterConfig(TypedDict):
|
||||
"""Type definition for complete filter configuration."""
|
||||
layout: FilterRule
|
||||
availability: FilterRule
|
||||
promoTypes: FilterRule
|
||||
securityStamp: FilterRule
|
||||
def download_cards_csv(url: str, output_path: Union[str, Path]) -> None:
|
||||
"""Download cards data from MTGJSON and save to CSV.
|
||||
|
||||
Downloads card data from the specified MTGJSON URL and saves it to a local CSV file.
|
||||
Shows a progress bar during download using tqdm.
|
||||
|
||||
Args:
|
||||
url: URL to download cards data from (typically MTGJSON API endpoint)
|
||||
output_path: Path where the downloaded CSV file will be saved
|
||||
|
||||
Raises:
|
||||
MTGJSONDownloadError: If download fails due to network issues or invalid response
|
||||
|
||||
Example:
|
||||
>>> download_cards_csv('https://mtgjson.com/api/v5/cards.csv', 'cards.csv')
|
||||
"""
|
||||
try:
|
||||
response = requests.get(url, stream=True)
|
||||
response.raise_for_status()
|
||||
total_size = int(response.headers.get('content-length', 0))
|
||||
|
||||
with open(output_path, 'wb') as f:
|
||||
with tqdm(total=total_size, unit='iB', unit_scale=True, desc='Downloading cards data') as pbar:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
size = f.write(chunk)
|
||||
pbar.update(size)
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f'Failed to download cards data from {url}')
|
||||
raise MTGJSONDownloadError(
|
||||
"Failed to download cards data",
|
||||
url,
|
||||
getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
|
||||
) from e
|
||||
def check_csv_exists(filepath: Union[str, Path]) -> bool:
|
||||
"""Check if a CSV file exists at the specified path.
|
||||
|
||||
Verifies the existence of a CSV file at the given path. This function is used
|
||||
to determine if card data needs to be downloaded or if it already exists locally.
|
||||
|
||||
Args:
|
||||
filepath: Path to the CSV file to check
|
||||
|
||||
Returns:
|
||||
bool: True if the file exists, False otherwise
|
||||
|
||||
Example:
|
||||
>>> if not check_csv_exists('cards.csv'):
|
||||
... download_cards_csv(MTGJSON_API_URL, 'cards.csv')
|
||||
"""
|
||||
return Path(filepath).is_file()
|
||||
|
||||
def save_color_filtered_csvs(df: pd.DataFrame, out_dir: Union[str, Path]) -> None:
|
||||
"""Generate and save color-identity filtered CSVs for all configured colors.
|
||||
|
||||
Iterates across configured color names and their corresponding color identity
|
||||
abbreviations, filters the provided DataFrame using standard filters plus
|
||||
color identity, and writes each filtered set to CSV in the provided directory.
|
||||
|
||||
Args:
|
||||
df: Source DataFrame containing card data.
|
||||
out_dir: Output directory for the generated CSV files.
|
||||
|
||||
Raises:
|
||||
DataFrameProcessingError: If filtering fails.
|
||||
ColorFilterError: If color filtering fails for a specific color.
|
||||
"""
|
||||
out_path = Path(out_dir)
|
||||
out_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Base-filter once for efficiency, then per-color filter without redoing base filters
|
||||
try:
|
||||
# Apply full standard filtering including banned list once, then slice per color
|
||||
base_df = filter_dataframe(df, BANNED_CARDS)
|
||||
except Exception as e:
|
||||
# Wrap any unexpected issues as DataFrameProcessingError
|
||||
raise DataFrameProcessingError(
|
||||
"Failed to prepare base DataFrame for color filtering",
|
||||
"base_color_filtering",
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
for color_name, color_id in zip(SETUP_COLORS, COLOR_ABRV):
|
||||
try:
|
||||
logger.info(f"Generating {color_name}_cards.csv")
|
||||
color_df = base_df[base_df['colorIdentity'] == color_id]
|
||||
color_df.to_csv(out_path / f"{color_name}_cards.csv", index=False)
|
||||
except Exception as e:
|
||||
raise ColorFilterError(
|
||||
"Failed to generate color CSV",
|
||||
color_id,
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
def filter_dataframe(df: pd.DataFrame, banned_cards: List[str]) -> pd.DataFrame:
|
||||
"""Apply standard filters to the cards DataFrame using configuration from settings.
|
||||
|
||||
Applies a series of filters to the cards DataFrame based on configuration from settings.py.
|
||||
This includes handling null values, applying basic filters, removing illegal sets and banned cards,
|
||||
and processing special card types.
|
||||
|
||||
Args:
|
||||
df: pandas DataFrame containing card data to filter
|
||||
banned_cards: List of card names that are banned and should be excluded
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: A new DataFrame containing only the cards that pass all filters
|
||||
|
||||
Raises:
|
||||
DataFrameProcessingError: If any filtering operation fails
|
||||
|
||||
Example:
|
||||
>>> filtered_df = filter_dataframe(cards_df, ['Channel', 'Black Lotus'])
|
||||
"""
|
||||
try:
|
||||
logger.info('Starting standard DataFrame filtering')
|
||||
|
||||
# Fill null values according to configuration
|
||||
for col, fill_value in FILL_NA_COLUMNS.items():
|
||||
if col == 'faceName':
|
||||
fill_value = df['name']
|
||||
df[col] = df[col].fillna(fill_value)
|
||||
logger.debug(f'Filled NA values in {col} with {fill_value}')
|
||||
|
||||
# Apply basic filters from configuration
|
||||
filtered_df = df.copy()
|
||||
filter_config: FilterConfig = FILTER_CONFIG # Type hint for configuration
|
||||
for field, rules in filter_config.items():
|
||||
if field not in filtered_df.columns:
|
||||
logger.warning('Skipping filter for missing field %s', field)
|
||||
continue
|
||||
|
||||
for rule_type, values in rules.items():
|
||||
if not values:
|
||||
continue
|
||||
|
||||
if rule_type == 'exclude':
|
||||
for value in values:
|
||||
mask = filtered_df[field].astype(str).str.contains(
|
||||
value,
|
||||
case=False,
|
||||
na=False,
|
||||
regex=False
|
||||
)
|
||||
filtered_df = filtered_df[~mask]
|
||||
elif rule_type == 'require':
|
||||
for value in values:
|
||||
mask = filtered_df[field].astype(str).str.contains(
|
||||
value,
|
||||
case=False,
|
||||
na=False,
|
||||
regex=False
|
||||
)
|
||||
filtered_df = filtered_df[mask]
|
||||
else:
|
||||
logger.warning('Unknown filter rule type %s for field %s', rule_type, field)
|
||||
continue
|
||||
|
||||
logger.debug(f'Applied {rule_type} filter for {field}: {values}')
|
||||
|
||||
# Remove illegal sets
|
||||
for set_code in NON_LEGAL_SETS:
|
||||
filtered_df = filtered_df[~filtered_df['printings'].str.contains(set_code, na=False)]
|
||||
logger.debug('Removed illegal sets')
|
||||
|
||||
# Remove banned cards (exact, case-insensitive match on name or faceName)
|
||||
if banned_cards:
|
||||
banned_set = {b.casefold() for b in banned_cards}
|
||||
name_lc = filtered_df['name'].astype(str).str.casefold()
|
||||
face_lc = filtered_df['faceName'].astype(str).str.casefold()
|
||||
mask = ~(name_lc.isin(banned_set) | face_lc.isin(banned_set))
|
||||
before = len(filtered_df)
|
||||
filtered_df = filtered_df[mask]
|
||||
after = len(filtered_df)
|
||||
logger.debug(f'Removed banned cards: {before - after} filtered out')
|
||||
|
||||
# Remove special card types
|
||||
for card_type in CARD_TYPES_TO_EXCLUDE:
|
||||
filtered_df = filtered_df[~filtered_df['type'].str.contains(card_type, na=False)]
|
||||
logger.debug('Removed special card types')
|
||||
|
||||
# Select columns, sort, and drop duplicates
|
||||
filtered_df = filtered_df[CSV_PROCESSING_COLUMNS]
|
||||
filtered_df = filtered_df.sort_values(
|
||||
by=SORT_CONFIG['columns'],
|
||||
key=lambda col: col.str.lower() if not SORT_CONFIG['case_sensitive'] else col
|
||||
)
|
||||
filtered_df = filtered_df.drop_duplicates(subset='faceName', keep='first')
|
||||
logger.info('Completed standard DataFrame filtering')
|
||||
|
||||
return filtered_df
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to filter DataFrame: {str(e)}')
|
||||
raise DataFrameProcessingError(
|
||||
"Failed to filter DataFrame",
|
||||
"standard_filtering",
|
||||
str(e)
|
||||
) from e
|
||||
def filter_by_color_identity(df: pd.DataFrame, color_identity: str) -> pd.DataFrame:
|
||||
"""Filter DataFrame by color identity with additional color-specific processing.
|
||||
|
||||
This function extends the base filter_dataframe functionality with color-specific
|
||||
filtering logic. It is used by setup.py's filter_by_color function but provides
|
||||
a more robust and configurable implementation.
|
||||
|
||||
Args:
|
||||
df: DataFrame to filter
|
||||
color_identity: Color identity to filter by (e.g., 'W', 'U,B', 'Colorless')
|
||||
|
||||
Returns:
|
||||
DataFrame filtered by color identity
|
||||
|
||||
Raises:
|
||||
ColorFilterError: If color identity is invalid or filtering fails
|
||||
DataFrameProcessingError: If general filtering operations fail
|
||||
"""
|
||||
try:
|
||||
logger.info(f'Filtering cards for color identity: {color_identity}')
|
||||
|
||||
# Validate color identity
|
||||
with tqdm(total=1, desc='Validating color identity') as pbar:
|
||||
if not isinstance(color_identity, str):
|
||||
raise ColorFilterError(
|
||||
"Invalid color identity type",
|
||||
str(color_identity),
|
||||
"Color identity must be a string"
|
||||
)
|
||||
pbar.update(1)
|
||||
|
||||
# Apply base filtering
|
||||
with tqdm(total=1, desc='Applying base filtering') as pbar:
|
||||
filtered_df = filter_dataframe(df, BANNED_CARDS)
|
||||
pbar.update(1)
|
||||
|
||||
# Filter by color identity
|
||||
with tqdm(total=1, desc='Filtering by color identity') as pbar:
|
||||
filtered_df = filtered_df[filtered_df['colorIdentity'] == color_identity]
|
||||
logger.debug(f'Applied color identity filter: {color_identity}')
|
||||
pbar.update(1)
|
||||
|
||||
# Additional color-specific processing
|
||||
with tqdm(total=1, desc='Performing color-specific processing') as pbar:
|
||||
# Placeholder for future color-specific processing
|
||||
pbar.update(1)
|
||||
logger.info(f'Completed color identity filtering for {color_identity}')
|
||||
return filtered_df
|
||||
|
||||
except DataFrameProcessingError as e:
|
||||
raise ColorFilterError(
|
||||
"Color filtering failed",
|
||||
color_identity,
|
||||
str(e)
|
||||
) from e
|
||||
except Exception as e:
|
||||
raise ColorFilterError(
|
||||
"Unexpected error during color filtering",
|
||||
color_identity,
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
def process_legendary_cards(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""Process and filter legendary cards for commander eligibility with comprehensive validation.
|
||||
|
||||
Args:
|
||||
df: DataFrame containing all cards
|
||||
|
||||
Returns:
|
||||
DataFrame containing only commander-eligible cards
|
||||
|
||||
Raises:
|
||||
CommanderValidationError: If validation fails for legendary status, special cases, or set legality
|
||||
DataFrameProcessingError: If general processing fails
|
||||
"""
|
||||
try:
|
||||
logger.info('Starting commander validation process')
|
||||
|
||||
filtered_df = df.copy()
|
||||
# Step 1: Check legendary status
|
||||
try:
|
||||
with tqdm(total=1, desc='Checking legendary status') as pbar:
|
||||
# Normalize type line for matching
|
||||
type_line = filtered_df['type'].astype(str).str.lower()
|
||||
|
||||
# Base predicates
|
||||
is_legendary = type_line.str.contains('legendary')
|
||||
is_creature = type_line.str.contains('creature')
|
||||
# Planeswalkers are only eligible if they explicitly state they can be your commander (handled in special cases step)
|
||||
is_enchantment = type_line.str.contains('enchantment')
|
||||
is_artifact = type_line.str.contains('artifact')
|
||||
is_vehicle_or_spacecraft = type_line.str.contains('vehicle') | type_line.str.contains('spacecraft')
|
||||
|
||||
# 1. Always allow Legendary Creatures (includes artifact/enchantment creatures already)
|
||||
allow_legendary_creature = is_legendary & is_creature
|
||||
|
||||
# 2. Allow Legendary Enchantment Creature (already covered by legendary creature) – ensure no plain legendary enchantments without creature type slip through
|
||||
allow_enchantment_creature = is_legendary & is_enchantment & is_creature
|
||||
|
||||
# 3. Allow certain Legendary Artifacts:
|
||||
# a) Vehicles/Spacecraft that have printed power & toughness
|
||||
has_power_toughness = filtered_df['power'].notna() & filtered_df['toughness'].notna()
|
||||
allow_artifact_vehicle = is_legendary & is_artifact & is_vehicle_or_spacecraft & has_power_toughness
|
||||
|
||||
# (Artifacts or planeswalkers with explicit permission text will be added in special cases step.)
|
||||
|
||||
baseline_mask = allow_legendary_creature | allow_enchantment_creature | allow_artifact_vehicle
|
||||
filtered_df = filtered_df[baseline_mask].copy()
|
||||
|
||||
if filtered_df.empty:
|
||||
raise CommanderValidationError(
|
||||
"No baseline eligible commanders found",
|
||||
"legendary_check",
|
||||
"After applying commander rules no cards qualified"
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Baseline commander counts: total=%d legendary_creatures=%d enchantment_creatures=%d artifact_vehicles=%d",
|
||||
len(filtered_df),
|
||||
int((allow_legendary_creature).sum()),
|
||||
int((allow_enchantment_creature).sum()),
|
||||
int((allow_artifact_vehicle).sum())
|
||||
)
|
||||
pbar.update(1)
|
||||
except Exception as e:
|
||||
raise CommanderValidationError(
|
||||
"Legendary status check failed",
|
||||
"legendary_check",
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
# Step 2: Validate special cases
|
||||
try:
|
||||
with tqdm(total=1, desc='Validating special cases') as pbar:
|
||||
# Add any card (including planeswalkers, artifacts, non-legendary cards) that explicitly allow being a commander
|
||||
special_cases = df['text'].str.contains('can be your commander', na=False, case=False)
|
||||
special_commanders = df[special_cases].copy()
|
||||
filtered_df = pd.concat([filtered_df, special_commanders]).drop_duplicates()
|
||||
logger.debug(f'Added {len(special_commanders)} special commander cards')
|
||||
pbar.update(1)
|
||||
except Exception as e:
|
||||
raise CommanderValidationError(
|
||||
"Special case validation failed",
|
||||
"special_cases",
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
# Step 3: Verify set legality
|
||||
try:
|
||||
with tqdm(total=1, desc='Verifying set legality') as pbar:
|
||||
initial_count = len(filtered_df)
|
||||
for set_code in NON_LEGAL_SETS:
|
||||
filtered_df = filtered_df[
|
||||
~filtered_df['printings'].str.contains(set_code, na=False)
|
||||
]
|
||||
removed_count = initial_count - len(filtered_df)
|
||||
logger.debug(f'Removed {removed_count} cards from illegal sets')
|
||||
pbar.update(1)
|
||||
except Exception as e:
|
||||
raise CommanderValidationError(
|
||||
"Set legality verification failed",
|
||||
"set_legality",
|
||||
str(e)
|
||||
) from e
|
||||
filtered_df = _enforce_primary_face_commander_rules(filtered_df, df)
|
||||
|
||||
logger.info('Commander validation complete. %d valid commanders found', len(filtered_df))
|
||||
return filtered_df
|
||||
|
||||
except CommanderValidationError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise DataFrameProcessingError(
|
||||
"Failed to process legendary cards",
|
||||
"commander_processing",
|
||||
str(e)
|
||||
) from e
|
||||
|
||||
def process_card_dataframe(df: CardLibraryDF, batch_size: int = 1000, columns_to_keep: Optional[List[str]] = None,
|
||||
include_commander_cols: bool = False, skip_availability_checks: bool = False) -> CardLibraryDF:
|
||||
"""Process DataFrame with common operations in batches.
|
||||
|
||||
Args:
|
||||
df: DataFrame to process
|
||||
batch_size: Size of batches for processing
|
||||
columns_to_keep: List of columns to keep (default: COLUMN_ORDER)
|
||||
include_commander_cols: Whether to include commander-specific columns
|
||||
skip_availability_checks: Whether to skip availability and security checks (default: False)
|
||||
|
||||
Args:
|
||||
df: DataFrame to process
|
||||
batch_size: Size of batches for processing
|
||||
columns_to_keep: List of columns to keep (default: COLUMN_ORDER)
|
||||
include_commander_cols: Whether to include commander-specific columns
|
||||
|
||||
Returns:
|
||||
CardLibraryDF: Processed DataFrame with standardized structure
|
||||
"""
|
||||
logger.info("Processing card DataFrame...")
|
||||
|
||||
if columns_to_keep is None:
|
||||
columns_to_keep = TAGGED_COLUMN_ORDER.copy()
|
||||
if include_commander_cols:
|
||||
commander_cols = ['printings', 'text', 'power', 'toughness', 'keywords']
|
||||
columns_to_keep.extend(col for col in commander_cols if col not in columns_to_keep)
|
||||
|
||||
# Fill NA values
|
||||
df.loc[:, 'colorIdentity'] = df['colorIdentity'].fillna('Colorless')
|
||||
df.loc[:, 'faceName'] = df['faceName'].fillna(df['name'])
|
||||
|
||||
# Process in batches
|
||||
total_batches = len(df) // batch_size + 1
|
||||
processed_dfs = []
|
||||
|
||||
for i in tqdm(range(total_batches), desc="Processing batches"):
|
||||
start_idx = i * batch_size
|
||||
end_idx = min((i + 1) * batch_size, len(df))
|
||||
batch = df.iloc[start_idx:end_idx].copy()
|
||||
|
||||
if not skip_availability_checks:
|
||||
columns_to_keep = COLUMN_ORDER.copy()
|
||||
logger.debug("Performing column checks...")
|
||||
# Common processing steps
|
||||
batch = batch[batch['availability'].str.contains('paper', na=False)]
|
||||
batch = batch.loc[batch['layout'] != 'reversible_card']
|
||||
batch = batch.loc[batch['promoTypes'] != 'playtest']
|
||||
batch = batch.loc[batch['securityStamp'] != 'heart']
|
||||
batch = batch.loc[batch['securityStamp'] != 'acorn']
|
||||
# Keep only specified columns
|
||||
batch = batch[columns_to_keep]
|
||||
processed_dfs.append(batch)
|
||||
else:
|
||||
logger.debug("Skipping column checks...")
|
||||
# Even when skipping availability checks, still ensure columns_to_keep if provided
|
||||
if columns_to_keep is not None:
|
||||
try:
|
||||
batch = batch[columns_to_keep]
|
||||
except Exception:
|
||||
# If requested columns are not present, keep as-is
|
||||
pass
|
||||
processed_dfs.append(batch)
|
||||
|
||||
# Combine processed batches
|
||||
result = pd.concat(processed_dfs, ignore_index=True)
|
||||
|
||||
# Final processing
|
||||
result.drop_duplicates(subset='faceName', keep='first', inplace=True)
|
||||
result.sort_values(by=['name', 'side'], key=lambda col: col.str.lower(), inplace=True)
|
||||
|
||||
logger.info("DataFrame processing completed")
|
||||
return result
|
||||
|
||||
# Backward-compatibility wrapper used by deck_builder.builder
|
||||
def regenerate_csvs_all() -> None: # pragma: no cover - simple delegator
|
||||
"""Delegate to setup.regenerate_csvs_all to preserve existing imports.
|
||||
|
||||
Some modules import regenerate_csvs_all from setup_utils. Keep this
|
||||
function as a stable indirection to avoid breaking callers.
|
||||
"""
|
||||
from . import setup as setup_module # local import to avoid circular import
|
||||
setup_module.regenerate_csvs_all()
|
||||
Loading…
Add table
Add a link
Reference in a new issue