|
|
""" |
|
|
Base configuration class for BackgroundFX Pro. |
|
|
|
|
|
Provides a flexible, environment-aware configuration system with validation, |
|
|
type checking, and automatic loading from multiple sources. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import yaml |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional, List, Union, Type, TypeVar |
|
|
from dataclasses import dataclass, field, asdict |
|
|
from abc import ABC, abstractmethod |
|
|
import logging |
|
|
from dotenv import load_dotenv |
|
|
from pydantic import BaseModel, ValidationError, Field |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
T = TypeVar('T', bound='BaseConfig') |
|
|
|
|
|
|
|
|
class ConfigurationError(Exception): |
|
|
"""Raised when configuration is invalid or missing.""" |
|
|
pass |
|
|
|
|
|
|
|
|
class ConfigSource(ABC): |
|
|
"""Abstract base class for configuration sources.""" |
|
|
|
|
|
@abstractmethod |
|
|
def load(self) -> Dict[str, Any]: |
|
|
"""Load configuration from source.""" |
|
|
pass |
|
|
|
|
|
@abstractmethod |
|
|
def exists(self) -> bool: |
|
|
"""Check if configuration source exists.""" |
|
|
pass |
|
|
|
|
|
|
|
|
class EnvFileSource(ConfigSource): |
|
|
"""Load configuration from .env file.""" |
|
|
|
|
|
def __init__(self, path: str = ".env"): |
|
|
self.path = Path(path) |
|
|
|
|
|
def exists(self) -> bool: |
|
|
return self.path.exists() |
|
|
|
|
|
def load(self) -> Dict[str, Any]: |
|
|
if not self.exists(): |
|
|
return {} |
|
|
|
|
|
load_dotenv(self.path) |
|
|
return dict(os.environ) |
|
|
|
|
|
|
|
|
class JSONFileSource(ConfigSource): |
|
|
"""Load configuration from JSON file.""" |
|
|
|
|
|
def __init__(self, path: str): |
|
|
self.path = Path(path) |
|
|
|
|
|
def exists(self) -> bool: |
|
|
return self.path.exists() |
|
|
|
|
|
def load(self) -> Dict[str, Any]: |
|
|
if not self.exists(): |
|
|
return {} |
|
|
|
|
|
with open(self.path, 'r') as f: |
|
|
return json.load(f) |
|
|
|
|
|
|
|
|
class YAMLFileSource(ConfigSource): |
|
|
"""Load configuration from YAML file.""" |
|
|
|
|
|
def __init__(self, path: str): |
|
|
self.path = Path(path) |
|
|
|
|
|
def exists(self) -> bool: |
|
|
return self.path.exists() |
|
|
|
|
|
def load(self) -> Dict[str, Any]: |
|
|
if not self.exists(): |
|
|
return {} |
|
|
|
|
|
with open(self.path, 'r') as f: |
|
|
return yaml.safe_load(f) |
|
|
|
|
|
|
|
|
class EnvironmentSource(ConfigSource): |
|
|
"""Load configuration from environment variables.""" |
|
|
|
|
|
def __init__(self, prefix: str = "BACKGROUNDFX_"): |
|
|
self.prefix = prefix |
|
|
|
|
|
def exists(self) -> bool: |
|
|
return True |
|
|
|
|
|
def load(self) -> Dict[str, Any]: |
|
|
config = {} |
|
|
for key, value in os.environ.items(): |
|
|
if key.startswith(self.prefix): |
|
|
|
|
|
clean_key = key[len(self.prefix):].lower() |
|
|
config[clean_key] = value |
|
|
return config |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class BaseConfig: |
|
|
""" |
|
|
Base configuration class with common settings. |
|
|
|
|
|
Provides methods for loading, validating, and merging configurations |
|
|
from multiple sources. |
|
|
""" |
|
|
|
|
|
|
|
|
app_name: str = "BackgroundFX Pro" |
|
|
app_version: str = "1.0.0" |
|
|
environment: str = field(default_factory=lambda: os.getenv("ENVIRONMENT", "development")) |
|
|
debug: bool = field(default_factory=lambda: os.getenv("DEBUG", "false").lower() == "true") |
|
|
|
|
|
|
|
|
host: str = "0.0.0.0" |
|
|
port: int = 8000 |
|
|
workers: int = 4 |
|
|
reload: bool = False |
|
|
|
|
|
|
|
|
base_dir: Path = field(default_factory=lambda: Path(__file__).parent.parent) |
|
|
data_dir: Path = field(default_factory=lambda: Path("data")) |
|
|
temp_dir: Path = field(default_factory=lambda: Path("/tmp/backgroundfx")) |
|
|
log_dir: Path = field(default_factory=lambda: Path("logs")) |
|
|
|
|
|
|
|
|
log_level: str = "INFO" |
|
|
log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
|
|
log_file: Optional[str] = None |
|
|
|
|
|
|
|
|
secret_key: str = field(default_factory=lambda: os.getenv("SECRET_KEY", "change-me-in-production")) |
|
|
api_key_header: str = "X-API-Key" |
|
|
cors_origins: List[str] = field(default_factory=lambda: ["*"]) |
|
|
|
|
|
|
|
|
rate_limit_enabled: bool = True |
|
|
rate_limit_requests: int = 100 |
|
|
rate_limit_window: int = 3600 |
|
|
|
|
|
|
|
|
enable_video_processing: bool = True |
|
|
enable_batch_processing: bool = True |
|
|
enable_ai_backgrounds: bool = True |
|
|
enable_webhooks: bool = True |
|
|
|
|
|
_sources: List[ConfigSource] = field(default_factory=list, init=False, repr=False) |
|
|
_validators: List[callable] = field(default_factory=list, init=False, repr=False) |
|
|
|
|
|
def __post_init__(self): |
|
|
"""Initialize configuration after dataclass creation.""" |
|
|
self.setup_directories() |
|
|
self.validate() |
|
|
|
|
|
def setup_directories(self): |
|
|
"""Create necessary directories if they don't exist.""" |
|
|
for dir_attr in ['data_dir', 'temp_dir', 'log_dir']: |
|
|
dir_path = getattr(self, dir_attr) |
|
|
if isinstance(dir_path, str): |
|
|
dir_path = Path(dir_path) |
|
|
setattr(self, dir_attr, dir_path) |
|
|
dir_path.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
def add_source(self, source: ConfigSource): |
|
|
"""Add a configuration source.""" |
|
|
self._sources.append(source) |
|
|
|
|
|
def add_validator(self, validator: callable): |
|
|
"""Add a configuration validator.""" |
|
|
self._validators.append(validator) |
|
|
|
|
|
def load_from_sources(self): |
|
|
"""Load configuration from all registered sources.""" |
|
|
merged_config = {} |
|
|
|
|
|
for source in self._sources: |
|
|
if source.exists(): |
|
|
try: |
|
|
config = source.load() |
|
|
merged_config.update(config) |
|
|
logger.debug(f"Loaded configuration from {source.__class__.__name__}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to load from {source.__class__.__name__}: {e}") |
|
|
|
|
|
|
|
|
for key, value in merged_config.items(): |
|
|
if hasattr(self, key): |
|
|
|
|
|
attr_type = type(getattr(self, key)) |
|
|
try: |
|
|
if attr_type == bool: |
|
|
value = str(value).lower() in ('true', '1', 'yes', 'on') |
|
|
elif attr_type == Path: |
|
|
value = Path(value) |
|
|
else: |
|
|
value = attr_type(value) |
|
|
setattr(self, key, value) |
|
|
except (ValueError, TypeError) as e: |
|
|
logger.warning(f"Failed to set {key}: {e}") |
|
|
|
|
|
def validate(self): |
|
|
"""Validate configuration.""" |
|
|
errors = [] |
|
|
|
|
|
|
|
|
for validator in self._validators: |
|
|
try: |
|
|
validator(self) |
|
|
except Exception as e: |
|
|
errors.append(str(e)) |
|
|
|
|
|
|
|
|
if not self.secret_key or self.secret_key == "change-me-in-production": |
|
|
if self.environment == "production": |
|
|
errors.append("SECRET_KEY must be set in production") |
|
|
|
|
|
if self.port < 1 or self.port > 65535: |
|
|
errors.append(f"Invalid port: {self.port}") |
|
|
|
|
|
if self.workers < 1: |
|
|
errors.append(f"Invalid workers count: {self.workers}") |
|
|
|
|
|
if self.rate_limit_requests < 1: |
|
|
errors.append(f"Invalid rate limit: {self.rate_limit_requests}") |
|
|
|
|
|
if errors: |
|
|
raise ConfigurationError("\n".join(errors)) |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert configuration to dictionary.""" |
|
|
return { |
|
|
k: str(v) if isinstance(v, Path) else v |
|
|
for k, v in asdict(self).items() |
|
|
if not k.startswith('_') |
|
|
} |
|
|
|
|
|
def to_json(self, indent: int = 2) -> str: |
|
|
"""Convert configuration to JSON string.""" |
|
|
return json.dumps(self.to_dict(), indent=indent, default=str) |
|
|
|
|
|
def to_yaml(self) -> str: |
|
|
"""Convert configuration to YAML string.""" |
|
|
return yaml.dump(self.to_dict(), default_flow_style=False) |
|
|
|
|
|
def save(self, path: Union[str, Path], format: str = "json"): |
|
|
"""Save configuration to file.""" |
|
|
path = Path(path) |
|
|
|
|
|
if format == "json": |
|
|
content = self.to_json() |
|
|
elif format == "yaml": |
|
|
content = self.to_yaml() |
|
|
elif format == "env": |
|
|
content = self.to_env() |
|
|
else: |
|
|
raise ValueError(f"Unsupported format: {format}") |
|
|
|
|
|
with open(path, 'w') as f: |
|
|
f.write(content) |
|
|
|
|
|
def to_env(self) -> str: |
|
|
"""Convert configuration to .env format.""" |
|
|
lines = [] |
|
|
for key, value in self.to_dict().items(): |
|
|
if value is not None: |
|
|
env_key = f"BACKGROUNDFX_{key.upper()}" |
|
|
if isinstance(value, (list, dict)): |
|
|
value = json.dumps(value) |
|
|
lines.append(f"{env_key}={value}") |
|
|
return "\n".join(lines) |
|
|
|
|
|
@classmethod |
|
|
def from_file(cls: Type[T], path: Union[str, Path]) -> T: |
|
|
"""Load configuration from file.""" |
|
|
path = Path(path) |
|
|
|
|
|
if not path.exists(): |
|
|
raise ConfigurationError(f"Configuration file not found: {path}") |
|
|
|
|
|
config = cls() |
|
|
|
|
|
if path.suffix == ".json": |
|
|
config.add_source(JSONFileSource(str(path))) |
|
|
elif path.suffix in (".yaml", ".yml"): |
|
|
config.add_source(YAMLFileSource(str(path))) |
|
|
elif path.suffix == ".env": |
|
|
config.add_source(EnvFileSource(str(path))) |
|
|
else: |
|
|
raise ConfigurationError(f"Unsupported file format: {path.suffix}") |
|
|
|
|
|
config.load_from_sources() |
|
|
return config |
|
|
|
|
|
@classmethod |
|
|
def from_env(cls: Type[T], prefix: str = "BACKGROUNDFX_") -> T: |
|
|
"""Load configuration from environment variables.""" |
|
|
config = cls() |
|
|
config.add_source(EnvironmentSource(prefix)) |
|
|
config.load_from_sources() |
|
|
return config |
|
|
|
|
|
def merge(self, other: 'BaseConfig'): |
|
|
"""Merge another configuration into this one.""" |
|
|
for key, value in other.to_dict().items(): |
|
|
if hasattr(self, key): |
|
|
setattr(self, key, value) |
|
|
|
|
|
def get_section(self, section: str) -> Dict[str, Any]: |
|
|
"""Get configuration section as dictionary.""" |
|
|
result = {} |
|
|
prefix = f"{section}_" |
|
|
|
|
|
for key, value in self.to_dict().items(): |
|
|
if key.startswith(prefix): |
|
|
clean_key = key[len(prefix):] |
|
|
result[clean_key] = value |
|
|
|
|
|
return result |
|
|
|
|
|
def __repr__(self) -> str: |
|
|
"""String representation of configuration.""" |
|
|
items = [] |
|
|
for key, value in self.to_dict().items(): |
|
|
if 'secret' in key.lower() or 'password' in key.lower() or 'key' in key.lower(): |
|
|
value = "***" |
|
|
items.append(f"{key}={value}") |
|
|
return f"{self.__class__.__name__}({', '.join(items[:5])}...)" |
|
|
|
|
|
|
|
|
class ConfigManager: |
|
|
""" |
|
|
Manages multiple configuration instances and environments. |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self._configs: Dict[str, BaseConfig] = {} |
|
|
self._active: Optional[str] = None |
|
|
|
|
|
def register(self, name: str, config: BaseConfig): |
|
|
"""Register a configuration.""" |
|
|
self._configs[name] = config |
|
|
if self._active is None: |
|
|
self._active = name |
|
|
|
|
|
def get(self, name: Optional[str] = None) -> BaseConfig: |
|
|
"""Get configuration by name or active configuration.""" |
|
|
name = name or self._active |
|
|
if name not in self._configs: |
|
|
raise ConfigurationError(f"Configuration not found: {name}") |
|
|
return self._configs[name] |
|
|
|
|
|
def set_active(self, name: str): |
|
|
"""Set active configuration.""" |
|
|
if name not in self._configs: |
|
|
raise ConfigurationError(f"Configuration not found: {name}") |
|
|
self._active = name |
|
|
|
|
|
def reload(self, name: Optional[str] = None): |
|
|
"""Reload configuration from sources.""" |
|
|
config = self.get(name) |
|
|
config.load_from_sources() |
|
|
config.validate() |
|
|
|
|
|
@property |
|
|
def active(self) -> BaseConfig: |
|
|
"""Get active configuration.""" |
|
|
return self.get() |
|
|
|
|
|
|
|
|
|
|
|
config_manager = ConfigManager() |