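"""Visualize intermediate steps of the street-sound pipeline.

For each input image this script estimates a depth map (DepthEstimator),
extracts candidate nouns from a generated caption (spaCy), grounds them with
Grounding DINO, assigns every detection to a near/medium/far depth zone, and
saves the bounding boxes drawn over the original image, over the depth map,
and as a side-by-side combined view.
"""
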
import os
import sys
import torch
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
import re
import spacy
from config import LOGS_DIR, OUTPUT_DIR
from DepthEstimator import DepthEstimator
from SoundMapper import SoundMapper
from GenerateCaptions import generate_caption
from GenerateCaptions import StreetSoundTextPipeline, ImageAnalyzer


class ProcessVisualizer:
    def __init__(self, image_dir=LOGS_DIR, output_dir=None):
        self.image_dir = image_dir
        self.output_dir = output_dir if output_dir else os.path.join(OUTPUT_DIR, "visualizations")
        os.makedirs(self.output_dir, exist_ok=True)
        # Initialize components (but don't load models yet)
        self.depth_estimator = DepthEstimator(image_dir=self.image_dir)
        self.sound_mapper = SoundMapper()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.dino = None
        self.dino_processor = None
        self.nlp = None
        # Create subdirectories for different visualization types
        self.dirs = {
            "bbox_original": os.path.join(self.output_dir, "bbox_original"),
            "bbox_depth": os.path.join(self.output_dir, "bbox_depth"),
            "depth_maps": os.path.join(self.output_dir, "depth_maps"),
            "combined": os.path.join(self.output_dir, "combined")
        }
        for dir_path in self.dirs.values():
            os.makedirs(dir_path, exist_ok=True)

    def _load_nlp(self):
        # Lazily load the spaCy model on first use
        if self.nlp is None:
            self.nlp = spacy.load("en_core_web_sm")
        return self.nlp

    def _load_dino(self):
        # Lazily load Grounding DINO, or move an already-loaded model back to the device
        if self.dino is None:
            print("Loading DINO model...")
            self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-base").to(self.device)
            self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
        else:
            self.dino = self.dino.to(self.device)
        return self.dino, self.dino_processor

    def _unload_dino(self):
        # Move the model off the GPU and release cached memory
        if self.dino is not None:
            self.dino = self.dino.to("cpu")
            torch.cuda.empty_cache()

    def detect_nouns(self, caption_text):
        """Extract nouns from caption text for object detection"""
        print("Detecting nouns in caption...")
        nlp = self._load_nlp()
        all_nouns = []
        # Extract nouns from sound source descriptions
        pattern = r'\d+\.\s+\*\*([^:]+)\*\*:'
        sources = re.findall(pattern, caption_text)
        for source in sources:
            clean_source = re.sub(r'sounds?|noise[s]?', '', source, flags=re.IGNORECASE).strip()
            if clean_source:
                source_doc = nlp(clean_source)
                for token in source_doc:
                    if token.pos_ == "NOUN" and len(token.text) > 1:
                        all_nouns.append(token.text.lower())
        # Extract nouns from general text
        clean_caption = re.sub(r'[*()]', '', caption_text).strip()
        clean_caption = re.sub(r'##\w+', '', clean_caption)
        clean_caption = re.sub(r'\s+', ' ', clean_caption).strip()
        doc = nlp(clean_caption)
        for token in doc:
            if token.pos_ == "NOUN" and len(token.text) > 1:
                if token.text[0].isalpha():
                    all_nouns.append(token.text.lower())
        matches = sorted(set(all_nouns))
        print(f"Detected nouns: {matches}")
        return matches
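    # Illustrative example (assumed caption format): a line such as
    #   "1. **Car engine**: low rumble from passing traffic"
    # matches the numbered-bold pattern above (capturing "Car engine"), from
    # which spaCy keeps the nouns "car" and "engine"; the second pass over the
    # cleaned caption then adds further nouns such as "rumble" and "traffic".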

    def detect_objects(self, image_path, caption_text):
        """Detect objects in image based on nouns from caption"""
        print(f"Processing image: {image_path}")
        # Extract nouns from caption
        nouns = self.detect_nouns(caption_text)
        if not nouns:
            print("No nouns detected in caption.")
            return None, None
        # Load image
        image = Image.open(image_path)
        # Load DINO model
        self.dino, self.dino_processor = self._load_dino()
        # Filter nouns
        filtered_nouns = []
        for noun in nouns:
            if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
                filtered_nouns.append(noun)
        # Create text prompt for DINO
        text_prompt = " . ".join(filtered_nouns)
        print(f"Using text prompt for DINO: {text_prompt}")
        # Process image with DINO
        inputs = self.dino_processor(images=image, text=text_prompt, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.dino(**inputs)
        results = self.dino_processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=0.25,
            text_threshold=0.25,
            target_sizes=[image.size[::-1]]
        )
        # Clean up to save memory
        self._unload_dino()
        del inputs, outputs
        torch.cuda.empty_cache()
        # Process results
        result = results[0]
        labels = result["labels"]
        scores = result["scores"]
        bboxes = result["boxes"]
        # Clean labels
        clean_labels = []
        for label in labels:
            clean_label = re.sub(r'##\w+', '', label)
            clean_labels.append(clean_label)
        print(f"Detected {len(clean_labels)} objects: {list(zip(clean_labels, scores.tolist()))}")
        return clean_labels, bboxes
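    # Note: Grounding DINO text prompts are typically lowercase phrases
    # separated by " . " (the transformers docs also suggest ending the prompt
    # with a period); the join above builds e.g. "car . person . tree".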

    def estimate_depth(self):
        """Generate depth maps for all images in the directory"""
        print("Estimating depth for all images...")
        depth_maps = self.depth_estimator.estimate_depth(self.image_dir)
        # Convert depth maps to normalized grayscale for visualization
        normalized_maps = []
        img_paths = [os.path.join(self.image_dir, f) for f in os.listdir(self.image_dir)
                     if f.endswith(('.jpg', '.jpeg', '.png'))]
        for i, item in enumerate(depth_maps):
            depth_map = item["depth"]
            depth_array = np.array(depth_map)
            normalization = depth_array / 255.0
            # Associate source path with depth map
            source_path = img_paths[i] if i < len(img_paths) else f"depth_{i}.jpg"
            filename = os.path.basename(source_path)
            # Save grayscale depth map
            depth_path = os.path.join(self.dirs["depth_maps"], f"depth_{filename}")
            depth_map.save(depth_path)
            normalized_maps.append({
                "original": depth_map,
                "normalization": normalization,
                "path": depth_path,
                "source_path": source_path
            })
        return normalized_maps

    def create_histogram_depth_zones(self, depth_map, num_zones=3):
        """Create depth zones based on histogram of depth values"""
        hist, bin_edge = np.histogram(depth_map.flatten(), bins=50, range=(0, 1))
        cumulative = np.cumsum(hist) / np.sum(hist)
        thresholds = [0.0]
        for i in range(1, num_zones):
            target = i / num_zones
            idx = np.argmin(np.abs(cumulative - target))
            thresholds.append(bin_edge[idx + 1])
        thresholds.append(1.0)
        return thresholds
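    # Illustrative example (assumed input): with num_zones=3 and depth values
    # spread roughly uniformly over [0, 1], the cumulative histogram crosses
    # 1/3 and 2/3 near 0.33 and 0.66, so the thresholds come out close to
    # [0.0, 0.33, 0.66, 1.0] -- each zone then holds about a third of the
    # pixels rather than a third of the raw depth range.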

    def get_depth_zone(self, bbox, depth_map, num_zones=3):
        """Determine depth zone for a given bounding box"""
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        # Clamp coordinates to the image dimensions
        height, width = depth_map.shape
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(width, x2), min(height, y2)
        # Extract depth ROI
        depth_roi = depth_map[y1:y2, x1:x2]
        if depth_roi.size == 0:
            return num_zones - 1, 1.0  # Default to farthest zone
        # Calculate mean depth
        mean_depth = np.mean(depth_roi)
        # Determine zone; default to the farthest zone so a mean depth of exactly
        # 1.0 (outside the half-open bins below) is not treated as near
        thresholds = self.create_histogram_depth_zones(depth_map, num_zones)
        zone = num_zones - 1
        for i in range(num_zones):
            if thresholds[i] <= mean_depth < thresholds[i + 1]:
                zone = i
                break
        # (A closeness weight of 1.0 - mean_depth could be derived here, but the
        # callers only need the zone and the mean depth.)
        return zone, mean_depth

    def draw_bounding_boxes(self, image, labels, bboxes, scores=None, depth_zones=None, box_width=3):
        """Draw bounding boxes on image with depth zone information"""
        draw = ImageDraw.Draw(image)
        # Try to get a font, fall back to the default if none is available
        try:
            font = ImageFont.truetype("arial.ttf", 16)
        except IOError:
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 16)
            except OSError:
                font = ImageFont.load_default()
        # Store colors as a class attribute for access in modified versions
        self.zone_colors = {
            0: (255, 50, 50),   # Bright red for near
            1: (255, 180, 0),   # Orange for medium
            2: (50, 255, 50)    # Bright green for far
        }
        for i, (label, bbox) in enumerate(zip(labels, bboxes)):
            x1, y1, x2, y2 = [int(coord) for coord in bbox]
            # Get color based on depth zone if available
            if depth_zones is not None and i < len(depth_zones):
                zone, depth = depth_zones[i]
                color = self.zone_colors.get(zone, (0, 0, 255))
                zone_text = ["near", "medium", "far"][zone]
                label_text = f"{label} ({zone_text}) {depth:.2f}"
            else:
                color = (255, 50, 50)  # Default bright red
                label_text = label
            # Add score if available
            if scores is not None and i < len(scores):
                label_text += f" {scores[i]:.2f}"
            # Draw bounding box with a thick border for better visibility
            draw.rectangle([x1, y1, x2, y2], outline=color, width=box_width)
            # Calculate text size; textbbox replaces the deprecated textsize in newer Pillow
            if hasattr(draw, 'textbbox'):
                left, top, right, bottom = draw.textbbox((0, 0), label_text, font=font)
                text_size = (right - left, bottom - top)
            elif hasattr(draw, 'textsize'):
                text_size = draw.textsize(label_text, font=font)
            else:
                # Fallback sizing when neither helper is available
                text_width = len(label_text) * 8  # Approximate 8 pixels per character
                text_height = 20  # Approximate height for readability
                text_size = (text_width, text_height)
            # Draw label background with margin
            margin = 2
            text_box = [
                x1 - margin,
                y1 - text_size[1] - margin,
                x1 + text_size[0] + margin,
                y1 + margin
            ]
            draw.rectangle(text_box, fill=color)
            # Draw label text
            draw.text((x1, y1 - text_size[1]), label_text, fill=(255, 255, 255), font=font)
        return image

    def create_depth_map_visualization(self, depth_map, use_grayscale=True):
        """Create a visualization of the depth map

        Args:
            depth_map: Normalized depth map array
            use_grayscale: If True, creates a grayscale image; otherwise, uses a colored heatmap

        Returns:
            PIL Image with depth visualization
        """
        # Work on a copy of the (already normalized, [0, 1]) depth map
        normalized_depth = depth_map.copy()
        if use_grayscale:
            # Scale to 8-bit grayscale
            grayscale = (normalized_depth * 255).astype(np.uint8)
            # Convert to RGB for consistent processing with bounding box drawing
            depth_img = Image.fromarray(grayscale).convert('RGB')
        else:
            # Apply colormap (jet)
            colored_depth = (cm.jet(normalized_depth) * 255).astype(np.uint8)
            # Convert to PIL Image (RGB)
            depth_img = Image.fromarray(colored_depth[:, :, :3])
        return depth_img

    def process_images(self, lat=None, lon=None, single_view=None, save_with_heatmap=False):
        """Process all images in the directory or a single view

        Args:
            lat: Latitude for caption generation
            lon: Longitude for caption generation
            single_view: Process only the specified view if provided
            save_with_heatmap: If True, also saves depth maps as colored heatmaps
        """
        # Get image paths
        if single_view:
            image_paths = [os.path.join(self.image_dir, f"{single_view}.jpg")]
        else:
            image_paths = [os.path.join(self.image_dir, f) for f in os.listdir(self.image_dir)
                           if f.endswith(('.jpg', '.jpeg', '.png'))]
        if not image_paths:
            print(f"No images found in {self.image_dir}")
            return
        # Generate depth maps
        depth_maps = self.estimate_depth()
| for i, image_path in enumerate(image_paths): | |
| image_basename = os.path.basename(image_path) | |
| view_name = os.path.splitext(image_basename)[0] | |
| print(f"\nProcessing {view_name} view ({i+1}/{len(image_paths)})...") | |
| # Generate caption if coordinates are provided | |
| caption_text = None | |
| analyzer = ImageAnalyzer() | |
| caption_text = analyzer.analyze_image(image_path) | |
| if lat and lon: | |
| view_result = generate_caption(lat, lon, view=view_name, panoramic=False) | |
| if view_result: | |
| caption_text = view_result.get("sound_description", "") | |
| print(f"Generated caption: {caption_text}") | |
| # Skip if no caption and lat/lon were provided | |
| if lat and lon and not caption_text: | |
| print(f"Failed to generate caption for {image_path}, skipping.") | |
| continue | |
| # Detect objects based on caption | |
| if caption_text: | |
| labels, bboxes = self.detect_objects(image_path, caption_text) | |
| else: | |
| # If no caption provided, use generic object detection | |
| print("No caption provided, using predefined nouns for detection...") | |
| generic_nouns = ["car", "person", "tree", "building", "road", "sign", "window", "door"] | |
| labels, bboxes = self.detect_objects(image_path, " ".join(generic_nouns)) | |
| if len(labels) == 0 or len(bboxes)==0: | |
| print(f"No objects detected in {image_path}, skipping.") | |
| continue | |
            # Find matching depth map
            depth_map_idx = next((idx for idx, data in enumerate(depth_maps)
                                  if os.path.basename(image_path) == os.path.basename(data.get("source_path", ""))),
                                 i % len(depth_maps))
            depth_map = depth_maps[depth_map_idx]["normalization"]
            # Get depth zones for each detected object
            depth_zones = []
            for bbox in bboxes:
                zone, mean_depth = self.get_depth_zone(bbox, depth_map)
                depth_zones.append((zone, mean_depth))
            # Load and process original image
            original_img = Image.open(image_path).convert("RGB")
            bbox_img = original_img.copy()
            # Draw bounding boxes on original image
            bbox_img = self.draw_bounding_boxes(bbox_img, labels, bboxes, depth_zones=depth_zones)
            # Save image with bounding boxes
            bbox_path = os.path.join(self.dirs["bbox_original"], f"bbox_{image_basename}")
            bbox_img.save(bbox_path)
            print(f"Saved bounding boxes on original image: {bbox_path}")
            # Create grayscale depth map for better visibility of bounding boxes
            depth_vis = self.create_depth_map_visualization(depth_map, use_grayscale=True)
            # Draw bounding boxes on depth map visualization
            depth_bbox_img = depth_vis.copy()
            depth_bbox_img = self.draw_bounding_boxes(depth_bbox_img, labels, bboxes, depth_zones=depth_zones)
            # Draw bounding boxes directly on the original depth map
            # Load the saved grayscale depth map
            original_depth_path = depth_maps[depth_map_idx]["path"]
            original_depth_img = Image.open(original_depth_path).convert('RGB')
            # Draw boxes on the original depth map
            original_depth_bbox = original_depth_img.copy()
            original_depth_bbox = self.draw_bounding_boxes(original_depth_bbox, labels, bboxes, depth_zones=depth_zones)
            # Save the original depth map with bounding boxes
            original_depth_bbox_path = os.path.join(self.dirs["bbox_depth"], f"orig_depth_bbox_{image_basename}")
            original_depth_bbox.save(original_depth_bbox_path)
            print(f"Saved bounding boxes on original depth map: {original_depth_bbox_path}")
            # Save depth map with bounding boxes
            depth_bbox_path = os.path.join(self.dirs["bbox_depth"], f"depth_bbox_{image_basename}")
            depth_bbox_img.save(depth_bbox_path)
            print(f"Saved bounding boxes on depth map: {depth_bbox_path}")
            # Also save colored heatmap version if requested
            if save_with_heatmap:
                # Create a heatmap depth visualization
                depth_heatmap = self.create_depth_map_visualization(depth_map, use_grayscale=False)
                depth_heatmap_bbox = depth_heatmap.copy()
                depth_heatmap_bbox = self.draw_bounding_boxes(depth_heatmap_bbox, labels, bboxes, depth_zones=depth_zones)
                # Save heatmap version
                heatmap_path = os.path.join(self.dirs["bbox_depth"], f"heatmap_bbox_{image_basename}")
                depth_heatmap_bbox.save(heatmap_path)
                print(f"Saved bounding boxes on depth heatmap: {heatmap_path}")
            # Create combined visualization
            # Create a 2x1 grid showing original with bboxes and original depth with bboxes
            combined_width = original_img.width * 2
            combined_height = original_img.height
            combined_img = Image.new('RGB', (combined_width, combined_height))
            # Paste images
            combined_img.paste(bbox_img, (0, 0))
            combined_img.paste(original_depth_bbox, (original_img.width, 0))
            # Save combined image
            combined_path = os.path.join(self.dirs["combined"], f"combined_{image_basename}")
            combined_img.save(combined_path)
            print(f"Saved combined visualization: {combined_path}")
        print("\nVisualization process complete!")
        print(f"Results saved in {self.output_dir}")

    def cleanup(self):
        """Clean up resources"""
        if hasattr(self, 'depth_estimator'):
            self.depth_estimator._unload_model()
        if self.dino is not None:
            self.dino = self.dino.to("cpu")
            del self.dino
            self.dino = None
        if self.nlp is not None:
            del self.nlp
            self.nlp = None
        torch.cuda.empty_cache()


def main():
    import argparse
    parser = argparse.ArgumentParser(description="Visualize intermediate steps of the Street Sound Pipeline")
    parser.add_argument("--image_dir", type=str, default=LOGS_DIR, help="Directory containing input images")
    parser.add_argument("--output_dir", type=str, default=None, help="Directory for output visualizations")
    parser.add_argument("--location", type=str, default=None, help='Location in format "latitude,longitude" (e.g., "40.7128,-74.0060")')
    parser.add_argument("--view", type=str, default=None, choices=["front", "back", "left", "right"], help="Process only the specified view")
    parser.add_argument("--skip_caption", action="store_true", help="Skip caption generation and use generic noun list")
    parser.add_argument("--save_heatmap", action="store_true", help="Also save depth maps as colored heatmaps with bounding boxes")
    parser.add_argument("--box_width", type=int, default=3, help="Width of bounding box lines")
    args = parser.parse_args()
    # Parse location if provided
    lat, lon = None, None
    if args.location and not args.skip_caption:
        try:
            lat, lon = map(float, args.location.split(","))
        except ValueError:
            print("Error: Location must be in format 'latitude,longitude'")
            return
    # Initialize visualizer
    visualizer = ProcessVisualizer(image_dir=args.image_dir, output_dir=args.output_dir)
    # Apply a custom box width by wrapping draw_bounding_boxes, which accepts a
    # box_width keyword (see the method definition above)
    if args.box_width != 3:
        draw_bounding_boxes_orig = visualizer.draw_bounding_boxes
        def draw_bounding_boxes_with_width(*f_args, **f_kwargs):
            f_kwargs.setdefault("box_width", args.box_width)
            return draw_bounding_boxes_orig(*f_args, **f_kwargs)
        visualizer.draw_bounding_boxes = draw_bounding_boxes_with_width
    try:
        # Process images
        visualizer.process_images(lat=lat, lon=lon, single_view=args.view, save_with_heatmap=args.save_heatmap)
    finally:
        # Clean up resources
        visualizer.cleanup()


if __name__ == "__main__":
    main()
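# Example invocation (assuming this module is saved as ProcessVisualizer.py in
# the repository root; adjust the filename/path to your checkout):
#   python ProcessVisualizer.py --location "40.7128,-74.0060" --view front --save_heatmap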