"""
Batch processing module for BackgroundFX Pro.

Processes multiple image and video files with priority scheduling,
resource-aware throttling, result caching, and report generation.
"""

import hashlib
import json
import mimetypes
import multiprocessing as mp
import os
import pickle
import shutil
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from queue import Queue, PriorityQueue, Empty
from typing import Dict, List, Optional, Tuple, Union, Callable, Any, Generator

import cv2
import numpy as np
import psutil

from ..utils.logger import setup_logger
from ..utils.device import DeviceManager
from ..utils import TimeEstimator, MemoryMonitor
from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode
from .video_processor import VideoProcessorAPI, VideoStats

logger = setup_logger(__name__)

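# Typical usage (a minimal sketch; the paths and background are placeholders):
#
#     config = BatchConfig(max_workers=4, skip_existing=True)
#     processor = BatchProcessor(config)
#     try:
#         report = processor.process_directory("input/", "output/", background="bg.jpg")
#         print(f"{report.successful_items}/{report.total_items} files succeeded")
#     finally:
#         processor.cleanup()
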

class BatchPriority(Enum):
    """Batch processing priority levels."""
    LOW = 3
    NORMAL = 2
    HIGH = 1
    URGENT = 0


class FileType(Enum):
    """Supported file types."""
    IMAGE = "image"
    VIDEO = "video"
    UNKNOWN = "unknown"

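
# BatchPriority values act as PriorityQueue weights: URGENT (0) sorts ahead
# of NORMAL (2) because BatchItem.__lt__ compares the enum values.
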

@dataclass
class BatchItem:
    """Individual item in batch processing."""
    id: str
    input_path: str
    output_path: str
    file_type: FileType
    priority: BatchPriority = BatchPriority.NORMAL
    background: Optional[Union[str, np.ndarray]] = None
    config_overrides: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0
    max_retries: int = 3
    status: str = "pending"
    error: Optional[str] = None
    result: Optional[Any] = None
    processing_time: float = 0.0

    def __lt__(self, other):
        """Compare items by priority for PriorityQueue."""
        return self.priority.value < other.priority.value

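
# BatchItem instances are normally created by the processor, but custom
# batches can build them directly (a sketch; the paths are placeholders):
#
#     item = BatchItem(
#         id="manual-0001",
#         input_path="input/portrait.jpg",
#         output_path="output/portrait_processed.jpg",
#         file_type=FileType.IMAGE,
#         priority=BatchPriority.HIGH,
#     )
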

@dataclass
class BatchConfig:
    """Configuration for batch processing."""

    # Worker pool
    max_workers: int = mp.cpu_count()
    use_multiprocessing: bool = False
    chunk_size: int = 10

    # Resource limits
    max_memory_gb: float = 8.0
    max_gpu_memory_gb: float = 4.0
    cpu_limit_percent: float = 80.0

    # File discovery
    input_dir: Optional[str] = None
    output_dir: Optional[str] = None
    recursive: bool = True
    file_patterns: List[str] = field(default_factory=lambda: ["*.jpg", "*.png", "*.mp4", "*.avi"])
    preserve_structure: bool = True

    # Backgrounds
    default_background: Optional[Union[str, np.ndarray]] = None
    background_per_file: Dict[str, Union[str, np.ndarray]] = field(default_factory=dict)

    # Output quality
    image_quality: int = 95
    video_quality: str = "high"
    maintain_resolution: bool = True

    # Caching
    enable_caching: bool = True
    cache_dir: Optional[str] = None
    deduplicate: bool = True

    # Progress and reporting
    progress_callback: Optional[Callable[[float, Dict], None]] = None
    save_report: bool = True
    report_path: Optional[str] = None

    # Error handling
    stop_on_error: bool = False
    skip_existing: bool = True

    # Pipeline
    pipeline_config: Optional[PipelineConfig] = None

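
# Example progress callback (a sketch; any callable with this signature works):
#
#     def on_progress(fraction: float, info: Dict) -> None:
#         print(f"{fraction:.0%} - {info['processed']}/{info['total']} items "
#               f"({info['failed']} failed)")
#
#     config = BatchConfig(progress_callback=on_progress)
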

@dataclass
class BatchReport:
    """Batch processing report."""
    start_time: datetime
    end_time: Optional[datetime] = None
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    skipped_items: int = 0
    total_processing_time: float = 0.0
    avg_processing_time: float = 0.0
    total_input_size_mb: float = 0.0
    total_output_size_mb: float = 0.0
    compression_ratio: float = 1.0
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    resource_usage: Dict[str, Any] = field(default_factory=dict)
    # Holds both the raw score list and float aggregates, hence Any
    quality_metrics: Dict[str, Any] = field(default_factory=dict)


class BatchProcessor:
    """High-performance batch processing engine."""

    def __init__(self, config: Optional[BatchConfig] = None):
        """
        Initialize batch processor.

        Args:
            config: Batch processing configuration
        """
        self.config = config or BatchConfig()
        self.logger = setup_logger(f"{__name__}.BatchProcessor")

        # Shared utilities
        self.device_manager = DeviceManager()
        self.memory_monitor = MemoryMonitor()
        self.time_estimator = TimeEstimator()

        # Processing backends
        self.pipeline = ProcessingPipeline(self.config.pipeline_config)
        self.video_processor = VideoProcessorAPI()

        # Processing state
        self.is_processing = False
        self.should_stop = False
        self.current_item = None

        # Work queues
        self.pending_queue = PriorityQueue()
        self.processing_queue = Queue()
        self.completed_queue = Queue()

        # Worker pool; note that multiprocessing requires submitted callables
        # and their arguments to be picklable
        if self.config.use_multiprocessing:
            self.executor = ProcessPoolExecutor(max_workers=self.config.max_workers)
        else:
            self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)

        # Result cache; ensure a user-supplied directory actually exists
        self.cache_dir = Path(self.config.cache_dir or tempfile.mkdtemp(prefix="bgfx_cache_"))
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_index = {}

        # Report
        self.report = BatchReport(start_time=datetime.now())

        self.logger.info(f"BatchProcessor initialized with {self.config.max_workers} workers")

    def process_directory(self,
                          input_dir: str,
                          output_dir: str,
                          background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
        """
        Process all supported files in a directory.

        Args:
            input_dir: Input directory path
            output_dir: Output directory path
            background: Default background for all files

        Returns:
            Batch processing report
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)

        if not input_path.exists():
            raise ValueError(f"Input directory does not exist: {input_dir}")

        output_path.mkdir(parents=True, exist_ok=True)

        items = self._collect_files(input_path, output_path, background)

        if not items:
            self.logger.warning("No files found to process")
            return self.report

        self.logger.info(f"Found {len(items)} files to process")

        return self.process_batch(items)

    def _collect_files(self,
                       input_path: Path,
                       output_path: Path,
                       background: Optional[Union[str, np.ndarray]]) -> List[BatchItem]:
        """Collect all files to process from directory."""
        items = []

        # Choose recursive or flat globbing
        if self.config.recursive:
            file_iterator = input_path.rglob
        else:
            file_iterator = input_path.glob

        for pattern in self.config.file_patterns:
            for file_path in file_iterator(pattern):
                if not file_path.is_file():
                    continue

                # Build the output path, optionally mirroring the input tree
                if self.config.preserve_structure:
                    relative_path = file_path.relative_to(input_path)
                    output_file = output_path / relative_path.parent / f"{file_path.stem}_processed{file_path.suffix}"
                else:
                    output_file = output_path / f"{file_path.stem}_processed{file_path.suffix}"

                # Skip files that already have an output
                if self.config.skip_existing and output_file.exists():
                    self.report.skipped_items += 1
                    continue

                file_type = self._detect_file_type(str(file_path))

                item = BatchItem(
                    id=self._generate_item_id(file_path),
                    input_path=str(file_path),
                    output_path=str(output_file),
                    file_type=file_type,
                    background=self.config.background_per_file.get(
                        str(file_path),
                        background or self.config.default_background
                    )
                )

                items.append(item)

        return items

    def process_batch(self, items: List[BatchItem]) -> BatchReport:
        """
        Process a batch of items.

        Args:
            items: List of batch items to process

        Returns:
            Batch processing report
        """
        self.is_processing = True
        self.report = BatchReport(start_time=datetime.now())

        try:
            # Deduplicate before counting and queueing so progress can
            # actually reach 100%
            if self.config.deduplicate:
                items = self._deduplicate_items(items)

            self.report.total_items = len(items)

            for item in items:
                self.pending_queue.put(item)

            self._process_items(items)

        finally:
            self.is_processing = False
            self.report.end_time = datetime.now()
            self.report.total_processing_time = (
                self.report.end_time - self.report.start_time
            ).total_seconds()

            if self.report.processed_items > 0:
                self.report.avg_processing_time = (
                    self.report.total_processing_time / self.report.processed_items
                )

            if self.config.save_report:
                self._save_report()

        return self.report

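    # Note: process_batch() resets self.report, so keep a reference to the
    # returned BatchReport when comparing runs.
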
    def _process_items(self, items: List[BatchItem]):
        """Process all items in the batch."""
        # Split work into chunks so resource checks run between submissions
        chunks = [items[i:i + self.config.chunk_size]
                  for i in range(0, len(items), self.config.chunk_size)]

        for chunk in chunks:
            if self.should_stop:
                break

            # Throttle until CPU and memory drop below the configured limits
            self._wait_for_resources()

            # Submit the chunk to the worker pool
            futures = []
            for item in chunk:
                if self.should_stop:
                    break

                future = self.executor.submit(self._process_single_item, item)
                futures.append((future, item))

            # Collect results, allowing five minutes per item
            for future, item in futures:
                try:
                    result = future.result(timeout=300)
                    item.result = result
                    item.status = "completed" if result else "failed"

                    if result:
                        self.report.successful_items += 1
                    else:
                        self.report.failed_items += 1

                except Exception as e:
                    self.logger.error(f"Processing failed for {item.id}: {e}")
                    item.status = "failed"
                    item.error = str(e)
                    self.report.failed_items += 1

                    if self.config.stop_on_error:
                        self.should_stop = True
                        break

                finally:
                    self.report.processed_items += 1

                # Report per-item progress
                if self.config.progress_callback:
                    progress = self.report.processed_items / self.report.total_items
                    self.config.progress_callback(progress, {
                        'current_item': item.id,
                        'processed': self.report.processed_items,
                        'total': self.report.total_items,
                        'successful': self.report.successful_items,
                        'failed': self.report.failed_items
                    })

    def _process_single_item(self, item: BatchItem) -> bool:
        """
        Process a single batch item.

        Args:
            item: Batch item to process

        Returns:
            True if successful
        """
        start_time = time.time()

        try:
            # Serve from cache when the input file is unchanged
            if self.config.enable_caching:
                cached_result = self._check_cache(item)
                if cached_result is not None:
                    self._save_cached_result(item, cached_result)
                    item.processing_time = time.time() - start_time
                    return True

            # Dispatch by file type
            if item.file_type == FileType.IMAGE:
                success = self._process_image(item)
            elif item.file_type == FileType.VIDEO:
                success = self._process_video(item)
            else:
                raise ValueError(f"Unsupported file type: {item.file_type}")

            # Cache successful results for future runs
            if success and self.config.enable_caching:
                self._cache_result(item)

            item.processing_time = time.time() - start_time

            self._update_size_stats(item)

            return success

        except Exception as e:
            self.logger.error(f"Error processing {item.id}: {e}")
            item.error = str(e)

            # Retry with a bounded number of attempts
            if item.retry_count < item.max_retries:
                item.retry_count += 1
                self.logger.info(f"Retrying {item.id} (attempt {item.retry_count}/{item.max_retries})")
                return self._process_single_item(item)

            return False

    def _process_image(self, item: BatchItem) -> bool:
        """Process an image file."""
        try:
            image = cv2.imread(item.input_path)
            if image is None:
                raise ValueError(f"Cannot load image: {item.input_path}")

            # Apply per-item overrides; these mutate the shared
            # pipeline_config instance when one was supplied in BatchConfig
            pipeline_config = self.config.pipeline_config or PipelineConfig()
            for key, value in item.config_overrides.items():
                if hasattr(pipeline_config, key):
                    setattr(pipeline_config, key, value)

            result = self.pipeline.process_image(
                image,
                item.background
            )

            if result.success and result.output_image is not None:
                output_path = Path(item.output_path)
                output_path.parent.mkdir(parents=True, exist_ok=True)

                # JPEG output honors the configured quality setting
                if output_path.suffix.lower() in ['.jpg', '.jpeg']:
                    cv2.imwrite(
                        str(output_path),
                        result.output_image,
                        [cv2.IMWRITE_JPEG_QUALITY, self.config.image_quality]
                    )
                else:
                    cv2.imwrite(str(output_path), result.output_image)

                item.metadata['quality_score'] = result.quality_score
                self._update_quality_metrics(result.quality_score)

                return True

            return False

        except Exception as e:
            self.logger.error(f"Image processing failed for {item.input_path}: {e}")
            raise

    def _process_video(self, item: BatchItem) -> bool:
        """Process a video file."""
        try:
            output_path = Path(item.output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            stats = self.video_processor.process_video(
                item.input_path,
                str(output_path),
                item.background
            )

            # Record per-video statistics for the report
            item.metadata['video_stats'] = {
                'frames_processed': stats.frames_processed,
                'frames_dropped': stats.frames_dropped,
                'processing_fps': stats.processing_fps,
                'avg_quality': stats.avg_quality_score
            }

            self._update_quality_metrics(stats.avg_quality_score)

            return stats.frames_processed > 0

        except Exception as e:
            self.logger.error(f"Video processing failed for {item.input_path}: {e}")
            raise

    def _detect_file_type(self, file_path: str) -> FileType:
        """Detect file type from path."""
        mime_type, _ = mimetypes.guess_type(file_path)

        if mime_type:
            if mime_type.startswith('image/'):
                return FileType.IMAGE
            elif mime_type.startswith('video/'):
                return FileType.VIDEO

        # Fall back to extension matching when the MIME type is unknown
        ext = Path(file_path).suffix.lower()
        if ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp']:
            return FileType.IMAGE
        elif ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv']:
            return FileType.VIDEO

        return FileType.UNKNOWN

    def _generate_item_id(self, file_path: Path) -> str:
        """Generate unique ID for batch item."""
        # Include the current time so repeated runs get distinct IDs
        content = f"{file_path}{time.time()}"
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def _deduplicate_items(self, items: List[BatchItem]) -> List[BatchItem]:
        """Remove duplicate items based on file content hash."""
        seen_hashes = set()
        unique_items = []

        for item in items:
            try:
                file_hash = self._calculate_file_hash(item.input_path)

                if file_hash not in seen_hashes:
                    seen_hashes.add(file_hash)
                    unique_items.append(item)
                else:
                    self.logger.info(f"Skipping duplicate: {item.input_path}")
                    self.report.skipped_items += 1

            except Exception as e:
                # If hashing fails, keep the item rather than silently drop it
                self.logger.warning(f"Cannot calculate hash for {item.input_path}: {e}")
                unique_items.append(item)

        return unique_items

    def _calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str:
        """Calculate MD5 hash of file."""
        hasher = hashlib.md5()

        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                hasher.update(chunk)

        return hasher.hexdigest()

    def _check_cache(self, item: BatchItem) -> Optional[Any]:
        """Check if item result is cached."""
        cache_key = self._get_cache_key(item)
        cache_file = self.cache_dir / f"{cache_key}.pkl"

        if cache_file.exists():
            try:
                with open(cache_file, 'rb') as f:
                    cached_data = pickle.load(f)

                # Only reuse the result if the input file is unchanged
                if cached_data.get('input_hash') == self._calculate_file_hash(item.input_path):
                    self.logger.info(f"Using cached result for {item.id}")
                    return cached_data['result']

            except Exception as e:
                self.logger.warning(f"Cache read failed: {e}")

        return None

    def _cache_result(self, item: BatchItem):
        """Cache processing result."""
        try:
            cache_key = self._get_cache_key(item)
            cache_file = self.cache_dir / f"{cache_key}.pkl"

            # Store the raw output bytes keyed by the input hash
            with open(item.output_path, 'rb') as f:
                result_data = f.read()

            cache_data = {
                'input_hash': self._calculate_file_hash(item.input_path),
                'result': result_data,
                'metadata': item.metadata,
                'timestamp': time.time()
            }

            with open(cache_file, 'wb') as f:
                pickle.dump(cache_data, f)

        except Exception as e:
            self.logger.warning(f"Cache write failed: {e}")

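    # Cache layout, as implemented above: each entry lives at
    # <cache_dir>/<md5(cache_key)>.pkl and holds a dict with 'input_hash',
    # 'result' (raw output-file bytes), 'metadata', and 'timestamp'.
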
    def _save_cached_result(self, item: BatchItem, cached_data: bytes):
        """Save cached result to output file."""
        output_path = Path(item.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'wb') as f:
            f.write(cached_data)

    def _get_cache_key(self, item: BatchItem) -> str:
        """Generate cache key for item."""
        # Key on input path, background, and per-item overrides; note that
        # ndarray backgrounds stringify lossily, so path strings are safer
        key_parts = [
            item.input_path,
            str(item.background) if item.background is not None else "none",
            json.dumps(item.config_overrides, sort_keys=True)
        ]

        key_string = "|".join(key_parts)
        return hashlib.md5(key_string.encode()).hexdigest()

    def _wait_for_resources(self):
        """Block until CPU and memory usage drop below the configured limits."""
        while True:
            # CPU check; interval=1 samples usage over one second
            cpu_percent = psutil.cpu_percent(interval=1)
            if cpu_percent > self.config.cpu_limit_percent:
                self.logger.debug(f"CPU usage high ({cpu_percent}%), waiting...")
                time.sleep(2)
                continue

            # Memory check: used = total - available
            memory = psutil.virtual_memory()
            memory_gb = (memory.total - memory.available) / (1024**3)
            if memory_gb > self.config.max_memory_gb:
                self.logger.debug(f"Memory usage high ({memory_gb:.1f}GB), waiting...")
                time.sleep(2)
                continue

            break

    def _update_size_stats(self, item: BatchItem):
        """Update file size statistics."""
        try:
            input_size = os.path.getsize(item.input_path) / (1024**2)
            output_size = os.path.getsize(item.output_path) / (1024**2)

            self.report.total_input_size_mb += input_size
            self.report.total_output_size_mb += output_size

            if self.report.total_input_size_mb > 0:
                self.report.compression_ratio = (
                    self.report.total_output_size_mb / self.report.total_input_size_mb
                )

        except Exception as e:
            self.logger.warning(f"Cannot update size stats: {e}")

    def _update_quality_metrics(self, quality_score: float):
        """Update quality metrics in report."""
        if 'scores' not in self.report.quality_metrics:
            self.report.quality_metrics['scores'] = []

        self.report.quality_metrics['scores'].append(quality_score)

        # Cast numpy scalars to float so the report stays JSON-serializable
        scores = self.report.quality_metrics['scores']
        self.report.quality_metrics['avg_quality'] = float(np.mean(scores))
        self.report.quality_metrics['min_quality'] = float(np.min(scores))
        self.report.quality_metrics['max_quality'] = float(np.max(scores))
        self.report.quality_metrics['std_quality'] = float(np.std(scores))

    def _save_report(self):
        """Save processing report to file."""
        try:
            report_path = self.config.report_path
            if not report_path:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                report_path = f"batch_report_{timestamp}.json"

            report_dict = {
                'start_time': self.report.start_time.isoformat(),
                'end_time': self.report.end_time.isoformat() if self.report.end_time else None,
                'total_items': self.report.total_items,
                'processed_items': self.report.processed_items,
                'successful_items': self.report.successful_items,
                'failed_items': self.report.failed_items,
                'skipped_items': self.report.skipped_items,
                'total_processing_time': self.report.total_processing_time,
                'avg_processing_time': self.report.avg_processing_time,
                'total_input_size_mb': self.report.total_input_size_mb,
                'total_output_size_mb': self.report.total_output_size_mb,
                'compression_ratio': self.report.compression_ratio,
                'quality_metrics': self.report.quality_metrics,
                'errors': self.report.errors,
                'warnings': self.report.warnings
            }

            with open(report_path, 'w') as f:
                json.dump(report_dict, f, indent=2)

            self.logger.info(f"Report saved to {report_path}")

        except Exception as e:
            self.logger.error(f"Failed to save report: {e}")

    def process_with_pattern(self,
                             pattern: str,
                             output_template: str,
                             background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
        """
        Process files matching a pattern with template-based output.

        Args:
            pattern: File pattern (e.g., "images/*.jpg")
            output_template: Output path template (e.g., "output/{name}_bg.{ext}")
            background: Background for processing

        Returns:
            Batch processing report
        """
        items = []

        # Glob relative to the current working directory
        for file_path in Path('.').glob(pattern):
            if file_path.is_file():
                # Fill the template with file name parts and the current date
                output_path = output_template.format(
                    name=file_path.stem,
                    ext=file_path.suffix[1:],
                    dir=file_path.parent,
                    date=datetime.now().strftime("%Y%m%d")
                )

                item = BatchItem(
                    id=self._generate_item_id(file_path),
                    input_path=str(file_path),
                    output_path=output_path,
                    file_type=self._detect_file_type(str(file_path)),
                    background=background
                )

                items.append(item)

        return self.process_batch(items)

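    # Example (a sketch; the pattern and template are placeholders):
    #
    #     processor.process_with_pattern(
    #         "photos/*.jpg",
    #         "processed/{name}_bg.{ext}",
    #         background="backgrounds/studio.png",
    #     )
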
    def stop_processing(self):
        """Request a cooperative stop; workers finish their current items."""
        self.should_stop = True
        self.logger.info("Stopping batch processing...")

    def cleanup(self):
        """Clean up resources."""
        self.stop_processing()
        self.executor.shutdown(wait=True)

        # Remove the cache directory only if we created a temporary one
        if self.config.cache_dir is None:
            shutil.rmtree(self.cache_dir, ignore_errors=True)

        self.logger.info("Batch processor cleanup complete")

    def get_status(self) -> Dict[str, Any]:
        """Get current processing status."""
        return {
            'is_processing': self.is_processing,
            'total_items': self.report.total_items,
            'processed_items': self.report.processed_items,
            'successful_items': self.report.successful_items,
            'failed_items': self.report.failed_items,
            'skipped_items': self.report.skipped_items,
            'current_item': self.current_item.id if self.current_item else None,
            'progress': (self.report.processed_items / self.report.total_items * 100
                         if self.report.total_items > 0 else 0),
            'estimated_time_remaining': self.time_estimator.estimate_remaining(
                self.report.processed_items,
                self.report.total_items
            ) if self.is_processing else None
        }
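

# get_status() can be polled from another thread while a batch runs,
# for example (a sketch):
#
#     def watch(processor: BatchProcessor) -> None:
#         while processor.is_processing:
#             status = processor.get_status()
#             print(f"{status['progress']:.1f}% complete")
#             time.sleep(5)
#
#     threading.Thread(target=watch, args=(processor,), daemon=True).start()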