import os
import json
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union

import cv2
import numpy as np
import pillow_heif
import torch
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse, HTMLResponse
from PIL import Image
from transformers import DepthProImageProcessorFast, DepthProForDepthEstimation

from depth_pro.utils import load_rgb, extract_exif


app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

device = 'cpu'

def initialize_depth_pipeline():
    """Load the Depth Pro image processor and model from the Hugging Face Hub."""
    try:
        print("Initializing Depth Pro pipeline...")
        image_processor = DepthProImageProcessorFast.from_pretrained("apple/DepthPro-hf")
        model = DepthProForDepthEstimation.from_pretrained("apple/DepthPro-hf").to(device)

        return model, image_processor
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        print("Continuing without a loaded model...")
        # Return a pair so the module-level unpacking below still succeeds.
        return None, None

class DepthEstimator:
    """Thin wrapper around the Depth Pro model and image processor."""

    def __init__(self, model=None, image_processor=None):
        self.device = torch.device('cpu')
        print("Initializing Depth Pro estimator...")
        self.model = model
        self.image_processor = image_processor
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Run Depth Pro on an image and return (depth_map, new_size, focal_length)."""
        try:
            # Downscale so the longest side is at most 1536 px before inference.
            resized_image, new_size = self.resize_image(image_path)

            # load_rgb returns the decoded RGB image first and the EXIF focal
            # length in pixels last.
            rgb_image = load_rgb(resized_image.name)
            f_px = rgb_image[-1]
            eval_image = rgb_image[0]

            inputs = self.image_processor(eval_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            post_processed_output = self.image_processor.post_process_depth_estimation(
                outputs, target_sizes=[(new_size[1], new_size[0])],
            )
            result = post_processed_output[0]
            field_of_view = result["field_of_view"]
            focal_length = result["focal_length"]
            depth = result["predicted_depth"]

            # Make sure the depth map is a plain numpy array for downstream use.
            if isinstance(depth, torch.Tensor):
                depth = depth.detach().cpu().numpy()
            elif not isinstance(depth, np.ndarray):
                depth = np.array(depth)

            # The resized temporary copy is no longer needed once it has been read.
            os.unlink(resized_image.name)

            print(f"EXIF focal length: {f_px}, predicted focal length: {focal_length}")

            return depth, new_size, focal_length

        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None

    def resize_image(self, image_path, max_size=1536):
        """Resize so the longest side equals max_size and save to a temporary PNG.

        Returns the NamedTemporaryFile (kept on disk, since delete=False) and
        the new (width, height).
        """
        with Image.open(image_path) as img:
            ratio = max_size / max(img.size)
            new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
            img = img.resize(new_size, Image.Resampling.LANCZOS)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                img.save(temp_file, format="PNG")
                return temp_file, new_size

def find_topmost_pixel(mask):
    """Return the topmost footpath-mask pixel as (y, x), or None if the mask is empty."""
    footpath_pixels = np.where(mask > 0)
    if len(footpath_pixels[0]) == 0:
        return None
    min_y = np.min(footpath_pixels[0])
    top_pixels_mask = footpath_pixels[0] == min_y
    top_x_coords = footpath_pixels[1][top_pixels_mask]
    # If several pixels share the top row, take the middle one.
    center_idx = len(top_x_coords) // 2
    return (min_y, top_x_coords[center_idx])

def find_bottommost_footpath_pixel(mask, topmost_pixel):
    """Find the bottommost mask pixel in the same column as the topmost pixel."""
    if topmost_pixel is None:
        return None

    top_y, top_x = topmost_pixel

    # Collect all mask pixels that share the topmost pixel's column.
    mask_y_coords, mask_x_coords = np.where(mask > 0)
    column_mask = mask_x_coords == top_x
    column_y_coords = mask_y_coords[column_mask]

    if len(column_y_coords) == 0:
        # Fall back to the bottommost row of the whole mask, taking the middle
        # pixel if several share that row.
        footpath_pixels = np.where(mask > 0)
        if len(footpath_pixels[0]) == 0:
            return None
        max_y = np.max(footpath_pixels[0])
        bottom_pixels_mask = footpath_pixels[0] == max_y
        bottom_x_coords = footpath_pixels[1][bottom_pixels_mask]
        center_idx = len(bottom_x_coords) // 2
        return (max_y, bottom_x_coords[center_idx])

    max_y_in_column = np.max(column_y_coords)
    return (max_y_in_column, top_x)
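

# Minimal sanity check for the two mask helpers above, using a hand-written
# 4x4 mask. This is an illustrative sketch only; it is not called by the
# service and exists to document the (row, column) convention.
def _mask_helpers_example():
    toy_mask = np.array([
        [0, 0, 0, 0],
        [0, 1, 1, 0],
        [0, 1, 0, 0],
        [0, 1, 1, 0],
    ], dtype=np.uint8)
    top = find_topmost_pixel(toy_mask)                      # (1, 2): middle pixel of the top mask row
    bottom = find_bottommost_footpath_pixel(toy_mask, top)  # (3, 2): lowest mask pixel in column 2
    print(top, bottom)
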
def estimate_real_world_distance(depth_map, topmost_pixel, mask):
    """Estimate the depth difference (in metres) between the topmost and bottommost footpath pixels."""
    if topmost_pixel is None or depth_map is None:
        return None

    bottommost_pixel = find_bottommost_footpath_pixel(mask, topmost_pixel)
    if bottommost_pixel is None:
        return None

    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel

    # Both pixels must fall inside the depth map.
    if (top_y >= depth_map.shape[0] or top_x >= depth_map.shape[1] or
            bottom_y >= depth_map.shape[0] or bottom_x >= depth_map.shape[1]):
        return None

    topmost_depth = depth_map[top_y, top_x]
    bottommost_depth = depth_map[bottom_y, bottom_x]

    if np.isnan(topmost_depth) or np.isnan(bottommost_depth):
        print("Invalid depth values (NaN) found")
        return None

    # Depth at the far (topmost) pixel minus depth at the near (bottommost)
    # pixel, measured along the camera's viewing direction.
    distance_meters = float(topmost_depth - bottommost_depth)

    print("Distance calculation:")
    print(f"  Topmost pixel: ({top_y}, {top_x}) = {topmost_depth:.3f}m")
    print(f"  Bottommost pixel: ({bottom_y}, {bottom_x}) = {bottommost_depth:.3f}m")
    print(f"  Distance: {distance_meters:.3f}m")

    return distance_meters
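

# The value returned above is the difference in predicted depth along the
# viewing direction, not a full 3D separation. If a Euclidean distance were
# wanted instead, one possible sketch is to back-project both pixels with a
# pinhole model. This helper is hypothetical and unused by the endpoint; it
# assumes the principal point lies at the image centre and that
# focal_length_px is the focal length in pixels reported by Depth Pro.
def euclidean_distance_from_depth(depth_map, pixel_a, pixel_b, focal_length_px):
    f = float(focal_length_px)
    cy, cx = depth_map.shape[0] / 2.0, depth_map.shape[1] / 2.0

    def back_project(pixel):
        y, x = pixel
        z = float(depth_map[y, x])
        # Pinhole model: X = (x - cx) * Z / f, Y = (y - cy) * Z / f, Z = depth.
        return np.array([(x - cx) * z / f, (y - cy) * z / f, z])

    return float(np.linalg.norm(back_project(pixel_a) - back_project(pixel_b)))
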
print("Initializing Depth Pro pipeline...")
depth_model, image_processor = initialize_depth_pipeline()
depth_estimator = DepthEstimator(depth_model, image_processor)


@app.get("/health")
async def health_check():
    """Health check endpoint for Docker"""
    return {"status": "healthy", "service": "Depth Pro Distance Estimation"}


@app.get("/api")
async def api_info():
    """API information endpoint"""
    return {
        "message": "Depth Pro Distance Estimation API",
        "docs": "/docs",
        "health": "/health",
        "estimate_endpoint": "/estimate-depth",
    }

@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...), mask: UploadFile = File(...)):
    """FastAPI endpoint for depth estimation and distance calculation"""
    try:
        # Persist both uploads to temporary files so they can be read by
        # OpenCV and the depth estimator.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            content = await file.read()
            temp_file.write(content)
            temp_file_path = temp_file.name

        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as mtemp_file:
            content = await mask.read()
            mtemp_file.write(content)
            temp_file_path_mask = mtemp_file.name

        image = cv2.imread(temp_file_path)
        mask_image = cv2.imread(temp_file_path_mask)
        if image is None or mask_image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image or mask"}
            )

        # Run Depth Pro on the uploaded image.
        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(temp_file_path)

        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"}
            )

        # Bring the image and mask to the same resolution as the depth map.
        resized_image = cv2.resize(image, new_size)
        resized_mask = cv2.resize(mask_image, new_size)

        if len(resized_mask.shape) == 3:
            resized_mask = cv2.cvtColor(resized_mask, cv2.COLOR_BGR2GRAY)

        # Locate the topmost footpath pixel and measure the depth difference
        # to the bottommost footpath pixel in the same column.
        topmost_pixel = find_topmost_pixel(resized_mask)
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, resized_mask)

        # Clean up the temporary uploads.
        os.unlink(temp_file_path)
        os.unlink(temp_file_path_mask)

        result = {
            "depth_map_shape": list(depth_map.shape),
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": [int(topmost_pixel[0]), int(topmost_pixel[1])] if topmost_pixel else None,
            "distance_meters": distance_meters,
            "depth_stats": {
                "min_depth": float(np.min(depth_map)),
                "max_depth": float(np.max(depth_map)),
                "mean_depth": float(np.mean(depth_map))
            }
        }

        return JSONResponse(content=result)

    except Exception as e:
        # Best-effort cleanup of any temporary files that were created.
        if 'temp_file_path' in locals():
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
        if 'temp_file_path_mask' in locals():
            try:
                os.unlink(temp_file_path_mask)
            except OSError:
                pass
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )

@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint with simple HTML interface"""
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Depth Pro Distance Estimation</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 800px;
                margin: 0 auto;
                padding: 20px;
                background-color: #f5f5f5;
            }
            .container {
                background-color: white;
                padding: 30px;
                border-radius: 10px;
                box-shadow: 0 2px 10px rgba(0,0,0,0.1);
            }
            h1 {
                color: #2c3e50;
                text-align: center;
                margin-bottom: 10px;
            }
            .subtitle {
                text-align: center;
                color: #7f8c8d;
                margin-bottom: 30px;
            }
            .upload-section {
                border: 2px dashed #3498db;
                border-radius: 10px;
                padding: 30px;
                text-align: center;
                margin: 20px 0;
                background-color: #ecf0f1;
            }
            input[type="file"] {
                margin: 10px 0;
                padding: 10px;
                border: 1px solid #bdc3c7;
                border-radius: 5px;
            }
            .file-group {
                margin: 20px 0;
            }
            .file-label {
                display: block;
                margin-bottom: 8px;
                font-weight: bold;
                color: #2c3e50;
            }
            button {
                background-color: #3498db;
                color: white;
                padding: 12px 25px;
                border: none;
                border-radius: 5px;
                cursor: pointer;
                font-size: 16px;
            }
            button:hover {
                background-color: #2980b9;
            }
            .results {
                margin-top: 20px;
                padding: 20px;
                border-radius: 5px;
                background-color: #e8f5e8;
                display: none;
            }
            .error {
                background-color: #ffeaa7;
                border-left: 4px solid #fdcb6e;
                padding: 10px;
                margin: 10px 0;
            }
            .endpoint-info {
                background-color: #74b9ff;
                color: white;
                padding: 15px;
                border-radius: 5px;
                margin: 20px 0;
            }
            .feature {
                margin: 10px 0;
                padding: 10px;
                border-left: 3px solid #3498db;
                background-color: #f8f9fa;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>Depth Pro Distance Estimation</h1>
            <p class="subtitle">Upload an image and a footpath mask to estimate depth and calculate distances using Apple's Depth Pro model</p>

            <div class="upload-section">
                <h3>Upload Image and Mask</h3>
                <form id="uploadForm" enctype="multipart/form-data">
                    <div style="margin: 20px 0;">
                        <label for="imageFile" style="display: block; margin-bottom: 5px; font-weight: bold;">Main Image:</label>
                        <input type="file" id="imageFile" name="file" accept="image/*" required style="width: 100%;">
                    </div>
                    <div style="margin: 20px 0;">
                        <label for="maskFile" style="display: block; margin-bottom: 5px; font-weight: bold;">Footpath Mask:</label>
                        <input type="file" id="maskFile" name="mask" accept="image/*" required style="width: 100%;">
                    </div>
                    <button type="submit">Analyze Image with Mask</button>
                </form>

                <div id="results" class="results">
                    <h3>Analysis Results:</h3>
                    <div id="resultsContent"></div>
                </div>
            </div>

            <div class="endpoint-info">
                <h3>API Endpoints</h3>
                <p><strong>POST /estimate-depth</strong> - Upload image and footpath mask for depth estimation</p>
                <p><strong>GET /docs</strong> - API documentation</p>
                <p><strong>GET /health</strong> - Health check</p>
            </div>

            <div class="feature">
                <h3>Features</h3>
                <ul>
                    <li>Monocular depth estimation using Depth Pro</li>
                    <li>Footpath mask-based analysis</li>
                    <li>Real-world distance calculation between mask boundaries</li>
                    <li>CPU-optimized processing</li>
                    <li>Fast inference suitable for real-time use</li>
                </ul>
            </div>
        </div>

        <script>
            document.getElementById('uploadForm').addEventListener('submit', async function(e) {
                e.preventDefault();

                const fileInput = document.getElementById('imageFile');
                const maskInput = document.getElementById('maskFile');
                const resultsDiv = document.getElementById('results');
                const resultsContent = document.getElementById('resultsContent');

                if (!fileInput.files[0]) {
                    alert('Please select a main image file');
                    return;
                }

                if (!maskInput.files[0]) {
                    alert('Please select a footpath mask file');
                    return;
                }

                const formData = new FormData();
                formData.append('file', fileInput.files[0]);
                formData.append('mask', maskInput.files[0]);

                try {
                    resultsContent.innerHTML = '<p>Processing image and mask...</p>';
                    resultsDiv.style.display = 'block';

                    const response = await fetch('/estimate-depth', {
                        method: 'POST',
                        body: formData
                    });

                    if (response.ok) {
                        const result = await response.json();

                        let html = '<h4>Results:</h4>';
                        html += `<p><strong>Distance:</strong> ${result.distance_meters ? result.distance_meters.toFixed(3) + ' meters' : 'N/A'}</p>`;
                        html += `<p><strong>Focal Length:</strong> ${result.focal_length_px ? result.focal_length_px.toFixed(2) + ' pixels' : 'N/A'}</p>`;
                        html += `<p><strong>Depth Map Shape:</strong> ${result.depth_map_shape ? result.depth_map_shape.join(' x ') : 'N/A'}</p>`;
                        html += `<p><strong>Top Mask Pixel:</strong> ${result.topmost_pixel ? `(${result.topmost_pixel[0]}, ${result.topmost_pixel[1]})` : 'N/A'}</p>`;

                        if (result.depth_stats) {
                            html += '<h4>Depth Statistics:</h4>';
                            html += `<p><strong>Min Depth:</strong> ${result.depth_stats.min_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Max Depth:</strong> ${result.depth_stats.max_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Mean Depth:</strong> ${result.depth_stats.mean_depth.toFixed(3)}m</p>`;
                        }

                        resultsContent.innerHTML = html;
                    } else {
                        const error = await response.json();
                        resultsContent.innerHTML = `<div class="error">Error: ${error.error || 'Processing failed'}</div>`;
                    }
                } catch (error) {
                    resultsContent.innerHTML = `<div class="error">Network error: ${error.message}</div>`;
                }
            });
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)
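

# Example client, kept here as an assumption-laden sketch rather than part of
# the service: the URL matches the uvicorn settings below, but "image.jpg" and
# "mask.png" are placeholder paths and the `requests` package is not otherwise
# required by this module. Nothing calls this function.
def _example_request(image_path="image.jpg", mask_path="mask.png",
                     url="http://localhost:7860/estimate-depth"):
    import requests

    with open(image_path, "rb") as f, open(mask_path, "rb") as m:
        response = requests.post(url, files={"file": f, "mask": m})
    print(response.json())
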
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )