# This file contains functions for video / image collection preprocessing.
# For a video we preprocess the frames and select the k sharpest ones.
# Afterwards the scene is constructed.
import os
import time
import tempfile

import cv2
import numpy as np
import pycolmap
from matplotlib import pyplot as plt
from moviepy import VideoFileClip
from PIL import Image
from tqdm import tqdm

WORKDIR = "../outputs/"
def get_rotation_moviepy(video_path):
    """Reads the rotation angle (degrees) from a video's displaymatrix metadata."""
    clip = VideoFileClip(video_path)
    rotation = 0
    try:
        # NOTE: assumes the displaymatrix side data sits on stream index 2;
        # this is container-dependent and may need adjusting per input.
        displaymatrix = clip.reader.infos['inputs'][0]['streams'][2]['metadata'].get('displaymatrix', '')
        if 'rotation of' in displaymatrix:
            angle = float(displaymatrix.strip().split('rotation of')[-1].split('degrees')[0])
            rotation = int(angle) % 360
    except Exception as e:
        print(f"No displaymatrix rotation found: {e}")
    clip.reader.close()
    #if clip.audio:
    #    clip.audio.reader.close_proc()
    return rotation
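
# A minimal sketch of how the returned angle could be applied to decoded frames
# with OpenCV. apply_rotation is a hypothetical helper, not part of the pipeline
# below, and the sign convention depends on the container metadata, so verify it
# on a sample clip before relying on it.
def apply_rotation(frame, rotation):
    """Rotates a frame by the multiple of 90 degrees reported by get_rotation_moviepy."""
    if rotation == 90:
        return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    if rotation == 180:
        return cv2.rotate(frame, cv2.ROTATE_180)
    if rotation == 270:
        return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    return frame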
def read_video_frames(video_input, k=1, max_size=1024):
    """
    Extracts every k-th frame from a video or a list of images, resizes so the
    longest side is at most max_size, and returns the frames as a list.
    Parameters:
        video_input (str, file-like, or list): Path to a video file, a file-like
            object, or a list of image files.
        k (int): Interval for frame extraction (every k-th frame).
        max_size (int): Maximum width or height after resizing.
    Returns:
        frames (list): List of resized frames (numpy arrays, BGR channel order).
    """
    # Handle a list of image files (as opposed to a single video in a list)
    if isinstance(video_input, list):
        # If it's a single video in a list, treat it as a video
        first_name = str(getattr(video_input[0], 'name', video_input[0]))
        if len(video_input) == 1 and first_name.lower().endswith(('.mp4', '.avi', '.mov')):
            video_input = video_input[0]  # unwrap the single video file
        else:
            # Treat as a list of images
            frames = []
            for img_file in video_input:
                img_path = getattr(img_file, 'name', img_file)
                img = Image.open(img_path).convert("RGB")
                img.thumbnail((max_size, max_size))
                frames.append(np.array(img)[..., ::-1])  # RGB -> BGR for OpenCV
            return frames
    # Handle a file-like object or a path
    if hasattr(video_input, 'name'):
        video_path = video_input.name
    elif isinstance(video_input, (str, os.PathLike)):
        video_path = str(video_input)
    else:
        raise ValueError("Unsupported video input type. Must be a filepath, file-like object, or list of images.")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Error: Could not open video {video_path}.")
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count = 0
    frames = []
    with tqdm(total=total_frames // k, desc="Processing Video", unit="frame") as pbar:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % k == 0:
                # cap.read() already returns BGR, so no color conversion is needed
                frames.append(resize_max_side(frame, max_size))
                pbar.update(1)
            frame_count += 1
    cap.release()
    return frames
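
# Example usage (hypothetical path): grab every 5th frame, capped at 1024 px
# on the long side:
#   frames = read_video_frames("clip.mp4", k=5, max_size=1024)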
def resize_max_side(frame, max_size):
    """
    Resizes the frame so that its largest side equals max_size, maintaining aspect ratio.
    """
    height, width = frame.shape[:2]
    max_dim = max(height, width)
    if max_dim <= max_size:
        return frame  # No need to resize
    scale = max_size / max_dim
    new_width = int(width * scale)
    new_height = int(height * scale)
    resized_frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
    return resized_frame
def variance_of_laplacian(image):
    # compute the Laplacian of the image and then return the focus
    # measure, which is simply the variance of the Laplacian
    return cv2.Laplacian(image, cv2.CV_64F).var()
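
# Sanity-check sketch (hypothetical): blurring an image should lower its
# focus measure, since the Laplacian response flattens out.
#   blurred = cv2.GaussianBlur(gray, (9, 9), 0)
#   assert variance_of_laplacian(blurred) < variance_of_laplacian(gray)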
def process_all_frames(IMG_FOLDER='/scratch/datasets/hq_data/night2_all_frames',
                       to_visualize=False,
                       save_images=True):
    """Scores every .png in IMG_FOLDER with the multi-scale Laplacian focus measure."""
    dict_scores = {}
    for idx, img_name in tqdm(enumerate(sorted([x for x in os.listdir(IMG_FOLDER) if x.endswith('.png')]))):
        img = cv2.imread(os.path.join(IMG_FOLDER, img_name))  # [250:, 100:]
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Sum the focus measure over several scales to reduce noise sensitivity
        fm = variance_of_laplacian(gray) + \
             variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.75, fy=0.75)) + \
             variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.5, fy=0.5)) + \
             variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.25, fy=0.25))
        if to_visualize:
            plt.figure()
            plt.title(f"Laplacian score: {fm:.2f}")
            plt.imshow(img[..., [2, 1, 0]])  # BGR -> RGB for display
            plt.show()
        dict_scores[idx] = {"idx": idx,
                            "img_name": img_name,
                            "score": fm}
        if save_images:
            dict_scores[idx]["img"] = img
    return dict_scores
def select_frames_max_gap(scores, k):
    """
    Selects a minimal subset of frames while ensuring no gap exceeds k
    (gap-based alternative to the segment-based select_optimal_frames below).
    Args:
        scores (list of float): List of scores where index represents frame number.
        k (int): Maximum allowed gap between selected frames.
    Returns:
        list of int: Indices of selected frames.
    """
    n = len(scores)
    selected = {0, n - 1}  # always keep the endpoints; a set avoids duplicates
    i = 0  # Start at the first frame
    while i < n:
        # Find the best frame to select within the next k frames
        best_idx = max(range(i, min(i + k + 1, n)), key=lambda x: scores[x], default=None)
        if best_idx is None:
            break  # No more frames left
        selected.add(best_idx)
        i = best_idx + k + 1  # Move forward, ensuring gaps stay within k
    return sorted(selected)
def preprocess_frames(frames, verbose=False):
    """
    Compute sharpness scores for a list of frames using multi-scale Laplacian variance.
    Args:
        frames (list of np.ndarray): List of frames (BGR images).
        verbose (bool): If True, print scores.
    Returns:
        list of float: Sharpness scores for each frame.
    """
    scores = []
    for idx, frame in enumerate(tqdm(frames, desc="Scoring frames")):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        fm = (
            variance_of_laplacian(gray) +
            variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.75, fy=0.75)) +
            variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.5, fy=0.5)) +
            variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.25, fy=0.25))
        )
        if verbose:
            print(f"Frame {idx}: Sharpness Score = {fm:.2f}")
        scores.append(fm)
    return scores
def select_optimal_frames(scores, k):
    """
    Selects k frames by splitting into k segments and picking the sharpest frame from each.
    Args:
        scores (list of float): List of sharpness scores.
        k (int): Number of frames to select.
    Returns:
        list of int: Indices of selected frames.
    """
    n = len(scores)
    selected_indices = []
    segment_size = n // k
    for i in range(k):
        start = i * segment_size
        end = (i + 1) * segment_size if i < k - 1 else n  # Last chunk may be larger
        segment_scores = scores[start:end]
        if len(segment_scores) == 0:
            continue  # Safety check if some segment is empty
        best_in_segment = start + np.argmax(segment_scores)
        selected_indices.append(best_in_segment)
    return sorted(selected_indices)
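
# Toy example: with n=6 scores split into k=2 segments of 3 frames each,
# the sharpest frame of each segment is returned:
#   select_optimal_frames([1.0, 5.0, 2.0, 8.0, 3.0, 4.0], k=2)  # -> [1, 3]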
def save_frames_to_scene_dir(frames, scene_dir):
    """
    Saves a list of frames into the target scene directory under an 'images/' subfolder.
    Args:
        frames (list of np.ndarray): List of frames (BGR images) to save.
        scene_dir (str): Target path where the 'images/' subfolder will be created.
    """
    images_dir = os.path.join(scene_dir, "images")
    os.makedirs(images_dir, exist_ok=True)
    for idx, frame in enumerate(frames):
        filename = os.path.join(images_dir, f"{idx:08d}.png")  # 00000000.png, 00000001.png, etc.
        cv2.imwrite(filename, frame)
    print(f"Saved {len(frames)} frames to {images_dir}")
def run_colmap_on_scene(scene_dir):
    """
    Runs feature extraction, matching, and mapping on all images inside scene_dir/images using pycolmap.
    Args:
        scene_dir (str): Path to scene directory containing an 'images' folder.
    TODO: if the pipeline fails to match all the frames, either increase the image size,
    increase the number of features, or remove the unmatched frames from scene_dir/images.
    """
    start_time = time.time()
    print(f"Running COLMAP pipeline on all images inside {scene_dir}")
    # Set up paths
    database_path = os.path.join(scene_dir, "database.db")
    sparse_path = os.path.join(scene_dir, "sparse")
    image_dir = os.path.join(scene_dir, "images")
    # Make sure the output directory exists
    os.makedirs(sparse_path, exist_ok=True)
    # Step 1: Feature extraction
    pycolmap.extract_features(
        database_path,
        image_dir,
        sift_options={
            "max_num_features": 512 * 2,
            "max_image_size": 512 * 1,
        }
    )
    print(f"Finished feature extraction in {(time.time() - start_time):.2f}s.")
    # Step 2: Feature matching
    pycolmap.match_exhaustive(database_path)
    print(f"Finished feature matching in {(time.time() - start_time):.2f}s.")
    # Step 3: Mapping
    pipeline_options = pycolmap.IncrementalPipelineOptions()
    pipeline_options.min_num_matches = 15
    pipeline_options.multiple_models = True
    pipeline_options.max_num_models = 50
    pipeline_options.max_model_overlap = 20
    pipeline_options.min_model_size = 10
    pipeline_options.extract_colors = True
    pipeline_options.num_threads = 8
    pipeline_options.mapper.init_min_num_inliers = 30
    pipeline_options.mapper.init_max_error = 8.0
    pipeline_options.mapper.init_min_tri_angle = 5.0
    # incremental_mapping returns a dict of sub-model index -> Reconstruction
    reconstructions = pycolmap.incremental_mapping(
        database_path=database_path,
        image_path=image_dir,
        output_path=sparse_path,
        options=pipeline_options,
    )
    print(f"Finished incremental mapping in {(time.time() - start_time):.2f}s.")
    # Step 4: Post-process cameras of the first sub-model to SIMPLE_PINHOLE
    recon_path = os.path.join(sparse_path, "0")
    reconstruction = pycolmap.Reconstruction(recon_path)
    for cam in reconstruction.cameras.values():
        cam.model = 'SIMPLE_PINHOLE'
        # Assumes the source model is SIMPLE_RADIAL ([f, cx, cy, k]); keep only [f, cx, cy]
        cam.params = cam.params[:3]
    reconstruction.write(recon_path)
    print(f"Total pipeline time: {(time.time() - start_time):.2f}s.")