""" Video utilities for visualization. """ import os import cv2 import numpy as np import subprocess from PIL import Image def video_to_image_frames(input_video_path, save_directory=None, frames_per_second=1): """ Extracts image frames from a video file at the specified frame rate and saves them as JPEG format. Supports regular video files, webcam captures, WebM files, and GIF files, including incomplete files. Args: input_video_path: Path to the input video file save_directory: Directory to save extracted frames (default: None) frames_per_second: Number of frames to extract per second (default: 1) Returns: List of file paths to extracted frames """ extracted_frame_paths = [] # For GIF files, use PIL library for better handling if input_video_path.lower().endswith('.gif'): try: print(f"Processing GIF file using PIL: {input_video_path}") with Image.open(input_video_path) as gif_img: # Get GIF properties frame_duration_ms = gif_img.info.get('duration', 100) # Duration per frame in milliseconds gif_frame_rate = 1000.0 / frame_duration_ms if frame_duration_ms > 0 else 10.0 # Convert to frame rate print(f"GIF properties: {gif_img.n_frames} frames, {gif_frame_rate:.2f} FPS, {frame_duration_ms}ms per frame") # Calculate sampling interval sampling_interval = max(1, int(gif_frame_rate / frames_per_second)) if frames_per_second < gif_frame_rate else 1 saved_count = 0 for current_frame_index in range(gif_img.n_frames): gif_img.seek(current_frame_index) # Sample frames based on desired frame rate if current_frame_index % sampling_interval == 0: # Convert to RGB format if necessary rgb_frame = gif_img.convert('RGB') # Convert PIL image to numpy array frame_ndarray = np.array(rgb_frame) # Save frame as JPEG format frame_output_path = os.path.join(save_directory, f"frame_{saved_count:06d}.jpg") pil_image = Image.fromarray(frame_ndarray) pil_image.save(frame_output_path, 'JPEG', quality=95) extracted_frame_paths.append(frame_output_path) saved_count += 1 if extracted_frame_paths: print(f"Successfully extracted {len(extracted_frame_paths)} frames from GIF using PIL") return extracted_frame_paths except Exception as error: print(f"PIL GIF extraction error: {str(error)}, falling back to OpenCV") # For WebM files, use FFmpeg directly for more stable processing if input_video_path.lower().endswith('.webm'): try: print(f"Processing WebM file using FFmpeg: {input_video_path}") # Create a unique output pattern for the frames output_frame_pattern = os.path.join(save_directory, "frame_%04d.jpg") # Use FFmpeg to extract frames at specified frame rate ffmpeg_command = [ "ffmpeg", "-i", input_video_path, "-vf", f"fps={frames_per_second}", # Specified frames per second "-q:v", "2", # High quality output_frame_pattern ] # Run FFmpeg process ffmpeg_process = subprocess.Popen( ffmpeg_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) process_stdout, process_stderr = ffmpeg_process.communicate() # Collect all extracted frames for filename in sorted(os.listdir(save_directory)): if filename.startswith("frame_") and filename.endswith(".jpg"): full_frame_path = os.path.join(save_directory, filename) extracted_frame_paths.append(full_frame_path) if extracted_frame_paths: print(f"Successfully extracted {len(extracted_frame_paths)} frames from WebM using FFmpeg") return extracted_frame_paths print("FFmpeg extraction failed, falling back to OpenCV") except Exception as error: print(f"FFmpeg extraction error: {str(error)}, falling back to OpenCV") # Standard OpenCV method for non-WebM files or as fallback try: video_capture = cv2.VideoCapture(input_video_path) # For WebM files, try setting more robust decoder options if input_video_path.lower().endswith('.webm'): video_capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'VP80')) source_fps = video_capture.get(cv2.CAP_PROP_FPS) extraction_interval = max(1, int(source_fps / frames_per_second)) # Extract at specified frame rate processed_frame_count = 0 # Set error mode to suppress console warnings cv2.setLogLevel(0) while True: read_success, current_frame = video_capture.read() if not read_success: break if processed_frame_count % extraction_interval == 0: try: # Additional check for valid frame data if current_frame is not None and current_frame.size > 0: rgb_converted_frame = cv2.cvtColor(current_frame, cv2.COLOR_BGR2RGB) frame_output_path = os.path.join(save_directory, f"frame_{processed_frame_count:06d}.jpg") cv2.imwrite(frame_output_path, cv2.cvtColor(rgb_converted_frame, cv2.COLOR_RGB2BGR)) extracted_frame_paths.append(frame_output_path) except Exception as error: print(f"Warning: Failed to process frame {processed_frame_count}: {str(error)}") processed_frame_count += 1 # Safety limit to prevent infinite loops if processed_frame_count > 1000: break video_capture.release() print(f"Extracted {len(extracted_frame_paths)} frames from video using OpenCV") except Exception as error: print(f"Error extracting frames: {str(error)}") return extracted_frame_paths