Spaces:
Sleeping
Sleeping
| import os, time, datetime | |
| from pathlib import Path | |
| import numpy as np | |
| from PIL import Image | |
| import torch | |
| from transformers import GLPNForDepthEstimation, GLPNImageProcessor | |
| import gradio as gr | |
# ---- Keep Spaces stable (CPU-safe; quiet threading) ----
# setdefault() only fills in a value when the variable is not already set,
# so anything configured by the host environment wins.
# NOTE(review): numpy and torch are imported before this point; confirm the
# thread-count variables are still honoured when set this late.
for _env_name, _env_default in (
    ("CUDA_VISIBLE_DEVICES", ""),
    ("PYTORCH_ENABLE_MPS_FALLBACK", "1"),
    ("OMP_NUM_THREADS", "1"),
    ("OPENBLAS_NUM_THREADS", "1"),
):
    os.environ.setdefault(_env_name, _env_default)

# Device preference: CUDA first, then MPS (only if this torch build exposes
# torch.backends.mps and reports it available), otherwise CPU.
if torch.cuda.is_available():
    _device_name = "cuda"
elif getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
    _device_name = "mps"
else:
    _device_name = "cpu"
DEVICE = torch.device(_device_name)

# Image processor and depth model are loaded from the same checkpoint id.
_GLPN_CHECKPOINT = "vinvino02/glpn-nyu"
PROC = GLPNImageProcessor.from_pretrained(_GLPN_CHECKPOINT)
MODEL = GLPNForDepthEstimation.from_pretrained(_GLPN_CHECKPOINT)
MODEL = MODEL.to(DEVICE).eval()

# Import Open3D (fail fast if missing)
import open3d as o3d

# All generated artifacts are written below this directory.
OUT_DIR = Path("outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)
def _resize_h480_m32(pil_img: Image.Image):
    """Resize an image so its height is a multiple of 32 and at most 480 px.

    The target height is ``min(height, 480)`` rounded down to a multiple of
    32. The width is scaled by the same factor (truncated to an int, never
    below 1), so the aspect ratio is approximately preserved.

    Images shorter than 32 px are scaled up to a height of 32 instead of
    collapsing to a height of 0, which is not a usable resize target.

    Args:
        pil_img: Source image; only ``height``, ``width`` and ``resize`` are
            used.

    Returns:
        A new image produced by ``pil_img.resize`` with bilinear resampling.
    """
    src_h = max(1, pil_img.height)
    # Largest multiple of 32 not exceeding min(height, 480), floored at 32.
    new_h = max(32, (min(src_h, 480) // 32) * 32)
    new_w = max(1, int(new_h * pil_img.width / src_h))
    return pil_img.resize((new_w, new_h), Image.BILINEAR)
def _infer_depth(pil_img: Image.Image, logs):
    """Predict a depth map for *pil_img* at the image's own resolution.

    The image is resized with ``_resize_h480_m32``, run through ``PROC`` and
    ``MODEL`` on ``DEVICE`` with gradients disabled, and the prediction is
    bicubically interpolated back to ``pil_img.size``.

    Args:
        pil_img: Input image.
        logs: List that receives one ``[Depth]`` line with shape, device and
            elapsed time.

    Returns:
        A 2-D float32 numpy array with one depth value per input pixel.
    """
    started = time.time()

    model_input = PROC(images=_resize_h480_m32(pil_img), return_tensors="pt")
    with torch.no_grad():
        on_device = {name: tensor.to(DEVICE) for name, tensor in model_input.items()}
        output = MODEL(**on_device)

    depth_t = getattr(output, "predicted_depth", None)
    if depth_t is None:
        # No ``predicted_depth`` attribute: take the first element of a
        # tuple/list output, or the first value of a mapping-like output.
        if isinstance(output, (tuple, list)):
            depth_t = output[0]
        else:
            depth_t = next(iter(output.values()))

    if depth_t.dim() == 3:
        # interpolate() below needs a channel axis: insert one at position 1.
        depth_t = depth_t.unsqueeze(1)

    width, height = pil_img.size
    upsampled = torch.nn.functional.interpolate(
        depth_t, size=(height, width), mode="bicubic", align_corners=False
    )
    depth = upsampled.squeeze(0).squeeze(0).detach().cpu().float().numpy()

    logs.append(f"[Depth] shape={depth.shape} device={DEVICE} time={time.time()-started:.2f}s")
    return depth
def _depth_preview(depth: np.ndarray) -> Image.Image:
    """Render a depth map as an 8-bit image for display.

    The values are min-max normalised (a ``1e-8`` term keeps the division
    defined for a constant map), scaled by 255 and cast to ``uint8``.

    Args:
        depth: Depth array; it is not modified.

    Returns:
        A PIL image built from the normalised ``uint8`` array.
    """
    shifted = depth - float(depth.min())
    span = float(shifted.max()) + 1e-8
    unit = shifted / span
    gray = (unit * 255).astype(np.uint8)
    return Image.fromarray(gray)
def _to_u16(depth: np.ndarray) -> np.ndarray:
    """Quantise a depth map to ``uint16`` with no zero entries.

    The values are min-max normalised (a ``1e-8`` term keeps the division
    defined for a constant map), scaled by 65535 and truncated to ``uint16``.
    Any resulting 0 is raised to 1.

    Args:
        depth: Depth array; it is not modified.

    Returns:
        A new ``uint16`` array of the same shape with values in 1..65535.
    """
    shifted = depth - float(depth.min())
    span = float(shifted.max()) + 1e-8
    quantised = ((shifted / span) * 65535.0).astype(np.uint16)
    # For unsigned values, "replace 0 by 1" is the same as clamping from below.
    return np.maximum(quantised, np.uint16(1))
def _rgbd_intrinsics(rgb: np.ndarray, depth_u16: np.ndarray, fx, fy):
    """Build an Open3D RGB-D image and matching pinhole intrinsics.

    Args:
        rgb: Colour array; cast to ``uint8`` before wrapping.
        depth_u16: 2-D depth array; its shape supplies the image height and
            width used for the intrinsics.
        fx: Focal length along x passed to the intrinsics.
        fy: Focal length along y passed to the intrinsics.

    Returns:
        ``(rgbd, intrinsics)``. The RGB-D image keeps colour (no intensity
        conversion) and uses ``depth_scale=1.0`` with ``depth_trunc=65535.0``.
        The principal point is placed at the image centre.
    """
    height, width = depth_u16.shape

    rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
        o3d.geometry.Image(rgb.astype(np.uint8)),
        o3d.geometry.Image(depth_u16),
        depth_scale=1.0,
        depth_trunc=65535.0,
        convert_rgb_to_intensity=False,
    )

    intrinsics = o3d.camera.PinholeCameraIntrinsic()
    intrinsics.set_intrinsics(width, height, fx, fy, width / 2.0, height / 2.0)
    return rgbd, intrinsics
def _make_pointcloud(rgbd, intr, logs, nb_neighbors=20, std_ratio=20.0, down_voxel=0.0):
    """Create a point cloud from an RGB-D image, then filter and downsample.

    Statistical outlier removal runs only when the cloud has more than 500
    points, and its result is kept only when more than 50 points survive.
    Voxel downsampling runs only when ``down_voxel`` is truthy and positive.

    Args:
        rgbd: Open3D RGB-D image.
        intr: Open3D pinhole camera intrinsics.
        logs: List that receives ``[PCD]`` status lines.
        nb_neighbors: Neighbour count for statistical outlier removal.
        std_ratio: Standard-deviation ratio for statistical outlier removal.
        down_voxel: Voxel size for downsampling; 0 or ``None`` skips it.

    Returns:
        The resulting Open3D point cloud.
    """
    started = time.time()
    cloud = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, intr)

    if np.asarray(cloud.points).shape[0] <= 500:
        logs.append("[PCD] Too few points for outlier removal; skipping.")
    else:
        _, inliers = cloud.remove_statistical_outlier(
            nb_neighbors=nb_neighbors, std_ratio=std_ratio
        )
        if len(inliers) <= 50:
            # Keep the unfiltered cloud rather than one with hardly any points.
            logs.append("[PCD] Outlier removal would drop almost all points; skipping.")
        else:
            cloud = cloud.select_by_index(inliers)

    wants_downsample = bool(down_voxel) and down_voxel > 0
    if wants_downsample:
        cloud = cloud.voxel_down_sample(voxel_size=float(down_voxel))

    point_count = np.asarray(cloud.points).shape[0]
    logs.append(f"[PCD] points={point_count} time={time.time()-started:.2f}s (voxel={down_voxel})")
    return cloud
def _make_mesh_with_fallback(pcd, poisson_depth, logs, method="poisson"):
    """Reconstruct a triangle mesh from a point cloud.

    Poisson reconstruction is tried first. If it raises, or if it yields a
    mesh with no triangles after clean-up, the function logs the failure and
    retries once with Ball-Pivoting. A Ball-Pivoting failure propagates to
    the caller.

    Normals are (re-)estimated on ``pcd`` in place on every call. The
    returned mesh has duplicate vertices/triangles, degenerate triangles and
    non-manifold edges removed, and is rotated by pi about the X axis around
    the origin.

    Args:
        pcd: Open3D point cloud; needs at least 30 points.
        poisson_depth: Octree depth for Poisson; converted with ``int()``.
        logs: List that receives ``[Mesh]`` status lines.
        method: ``"poisson"`` (default) or any other value for Ball-Pivoting.

    Returns:
        The cleaned and rotated Open3D triangle mesh.

    Raises:
        RuntimeError: If the cloud has fewer than 30 points, or Ball-Pivoting
            has no neighbour distances to work with.
        Exception: Whatever Ball-Pivoting raises when it is the active method.
    """
    started = time.time()
    if np.asarray(pcd.points).shape[0] < 30:
        raise RuntimeError("Point cloud too small for meshing.")

    pcd.estimate_normals()
    pcd.orient_normals_to_align_with_direction()

    try:
        if method == "poisson":
            # Many Open3D wheels don't support the n_threads kwarg; don't pass it.
            mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(
                pcd, depth=int(poisson_depth)
            )[0]
            used = "Poisson"
        else:
            # Ball-Pivoting fallback: radii are multiples of the mean
            # nearest-neighbour distance.
            distances = pcd.compute_nearest_neighbor_distance()
            if len(distances) == 0:
                raise RuntimeError("No neighbor distances for Ball-Pivoting.")
            avg = float(sum(distances)) / len(distances)
            radii = [avg * factor for factor in (1.5, 2.5)]
            mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(
                pcd, o3d.utility.DoubleVector(radii)
            )
            used = "Ball-Pivoting"

        # Post clean & orient
        mesh.remove_duplicated_vertices()
        mesh.remove_duplicated_triangles()
        mesh.remove_degenerate_triangles()
        mesh.remove_non_manifold_edges()
        rotation = mesh.get_rotation_matrix_from_xyz((np.pi, 0, 0))
        mesh.rotate(rotation, center=(0, 0, 0))

        vertex_count = np.asarray(mesh.vertices).shape[0]
        face_count = np.asarray(mesh.triangles).shape[0]

        # A face-less Poisson result is not an exception, but it is useless
        # to the caller; raise so the Ball-Pivoting fallback below runs.
        if method == "poisson" and face_count == 0:
            raise RuntimeError("Poisson produced an empty mesh")

        logs.append(
            f"[Mesh] method={used} V={vertex_count} F={face_count} time={time.time()-started:.2f}s"
        )
        return mesh
    except Exception as e:
        if method == "poisson":
            logs.append(f"[Mesh] Poisson failed: {e}. Falling back to Ball-Pivoting…")
            return _make_mesh_with_fallback(pcd, poisson_depth, logs, method="ball")
        raise
def _timestamped(name: str, ext: str) -> Path:
    """Return ``OUT_DIR / "<name>_<YYYYmmdd_HHMMSS>.<ext>"`` using UTC time.

    Args:
        name: File name stem placed before the timestamp.
        ext: File extension without the leading dot.

    Returns:
        The path inside ``OUT_DIR``; the file itself is not created.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # formats to the same string with this pattern.
    stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    return OUT_DIR / f"{name}_{stamp}.{ext}"
def run(pil_img: Image.Image, fx: int, fy: int, poisson_depth: int, down_voxel: float, verbose: bool):
    """Gradio callback: image -> depth -> RGB-D -> point cloud -> mesh.

    Args:
        pil_img: Uploaded image, or ``None`` when nothing was uploaded.
        fx: Focal length along x used for the camera intrinsics.
        fy: Focal length along y used for the camera intrinsics.
        poisson_depth: Octree depth for Poisson reconstruction.
        down_voxel: Voxel size for point-cloud downsampling (0 disables it).
        verbose: When true the full log is returned, otherwise only the last
            20 lines.

    Returns:
        A 7-tuple ``(depth preview image, point-cloud path, mesh path,
        depth PNG path, point-cloud path, mesh path, log text)``. On a
        missing image or any failure the first six entries are ``None`` and
        the last carries the message or the log ending in an ``[ERROR]`` line.
        Exceptions are not propagated to the caller.
    """
    if pil_img is None:
        return None, None, None, None, None, None, "Upload an image."

    logs = []
    try:
        # 1) Depth
        depth = _infer_depth(pil_img, logs)
        depth_prev = _depth_preview(depth)

        # 2) RGBD + intrinsics
        rgb = np.array(pil_img.convert("RGB"))
        depth_u16 = _to_u16(depth)
        rgbd, intr = _rgbd_intrinsics(rgb, depth_u16, fx, fy)

        # 3) Point cloud
        pcd = _make_pointcloud(rgbd, intr, logs, down_voxel=down_voxel)
        if np.asarray(pcd.points).shape[0] < 30:
            # Voxel size is the only filtering knob exposed in the UI.
            raise RuntimeError(
                "Got < 30 points after filtering; try a smaller voxel size "
                "(0 disables downsampling)."
            )

        # 4) Mesh with fallback
        mesh = _make_mesh_with_fallback(pcd, poisson_depth, logs)

        # 5) Save artifacts (persistent + timestamped)
        depth_png = _timestamped("depth_preview", "png")
        pcd_ply = _timestamped("pointcloud", "ply")
        mesh_ply = _timestamped("mesh", "ply")
        depth_prev.save(depth_png)
        # The Open3D writers report failure through their return value;
        # surface it instead of handing back a path to a missing file.
        if not o3d.io.write_point_cloud(str(pcd_ply), pcd, write_ascii=False):
            raise RuntimeError(f"Failed to write point cloud to {pcd_ply}")
        if not o3d.io.write_triangle_mesh(str(mesh_ply), mesh, write_ascii=False):
            raise RuntimeError(f"Failed to write mesh to {mesh_ply}")

        log_txt = "\n".join(logs if verbose else logs[-20:])
        return (
            depth_prev,      # preview image
            str(pcd_ply),    # for Model3D viewer
            str(mesh_ply),   # for Model3D viewer
            str(depth_png),  # download depth
            str(pcd_ply),    # download pcd
            str(mesh_ply),   # download mesh
            log_txt,
        )
    except Exception as e:
        # UI boundary: report the failure in the log box instead of raising.
        logs.append(f"[ERROR] {type(e).__name__}: {e}")
        return None, None, None, None, None, None, "\n".join(logs)
# Gradio UI: inputs on the left, results on the right, one button to run.
# NOTE(review): the container nesting below was reconstructed from source
# that had lost its indentation; confirm that the download row and the log
# box belong inside the right-hand column.
_INTRO_MD = (
    "### Room 3D Reconstruction — GLPN → RGB-D → Point Cloud → Mesh\n"
    "Upload a room photo. If Poisson fails, we auto-fallback to Ball-Pivoting."
)

with gr.Blocks(title="Room 3D Reconstruction (GLPN + Open3D)") as demo:
    gr.Markdown(_INTRO_MD)

    with gr.Row():
        # Left column: image upload and reconstruction parameters.
        with gr.Column():
            inp = gr.Image(type="pil", label="Input room image")
            fx = gr.Slider(minimum=200, maximum=1200, value=500, step=10, label="fx (px)")
            fy = gr.Slider(minimum=200, maximum=1200, value=500, step=10, label="fy (px)")
            pdepth = gr.Slider(
                minimum=6, maximum=11, value=9, step=1,
                label="Poisson depth (lower = faster/stabler)",
            )
            down = gr.Slider(
                minimum=0.0, maximum=0.02, value=0.01, step=0.002,
                label="Voxel downsample (m)",
            )
            verbose = gr.Checkbox(value=True, label="Verbose logs")
            btn = gr.Button("Reconstruct 3D", variant="primary")

        # Right column: previews, downloads and the log box.
        with gr.Column():
            depth_img = gr.Image(label="Depth preview", interactive=False)
            pcd_view = gr.Model3D(label="Point Cloud (.ply)")
            mesh_view = gr.Model3D(label="Mesh (.ply)")
            with gr.Row():
                depth_file = gr.File(label="Download depth (PNG)")
                pcd_file = gr.File(label="Download point cloud (.ply)")
                mesh_file = gr.File(label="Download mesh (.ply)")
            logs = gr.Textbox(label="Logs", max_lines=48, lines=20)

    # Component order must match run()'s parameters and its 7-tuple result.
    _run_inputs = [inp, fx, fy, pdepth, down, verbose]
    _run_outputs = [depth_img, pcd_view, mesh_view, depth_file, pcd_file, mesh_file, logs]
    btn.click(fn=run, inputs=_run_inputs, outputs=_run_outputs)
if __name__ == "__main__":
    # Enable request queuing on the app, then start the server.
    queued_app = demo.queue()
    queued_app.launch()