# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import json
import os
from copy import deepcopy
from typing import Any

import numpy as np
import sapien
import torch
import torchvision.transforms as transforms
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.registration import register_env
from mani_skill.utils.structs.actor import Actor
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs.types import (
    GPUMemoryConfig,
    SceneConfig,
    SimConfig,
)
from mani_skill.utils.visualization.misc import tile_images
from tqdm import tqdm

from embodied_gen.models.gs_model import GaussianOperator
from embodied_gen.utils.enum import LayoutInfo, Scene3DItemEnum
from embodied_gen.utils.geometry import bfs_placement, quaternion_multiply
from embodied_gen.utils.log import logger
from embodied_gen.utils.process_media import alpha_blend_rgba
from embodied_gen.utils.simulation import (
    SIM_COORD_ALIGN,
    load_assets_from_layout_file,
)

__all__ = ["PickEmbodiedGen"]
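# NOTE: register_env is imported above but no registration decorator appeared
# in the original snippet. The env id and episode limit below are assumed
# placeholders so gym.make() can locate this class; adjust them to match the
# project's actual registry entry.
@register_env("PickEmbodiedGen-v0", max_episode_steps=100)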
class PickEmbodiedGen(BaseEnv):
    SUPPORTED_ROBOTS = ["panda", "panda_wristcam", "fetch"]
    goal_thresh = 0.0

    def __init__(
        self,
        *args,
        robot_uids: str | list[str] = "panda",
        robot_init_qpos_noise: float = 0.02,
        num_envs: int = 1,
        reconfiguration_freq: int | None = None,
        **kwargs,
    ):
        self.robot_init_qpos_noise = robot_init_qpos_noise
        if reconfiguration_freq is None:
            if num_envs == 1:
                reconfiguration_freq = 1
            else:
                reconfiguration_freq = 0
        # Init params from kwargs.
        layout_file = kwargs.pop("layout_file", None)
        replace_objs = kwargs.pop("replace_objs", True)
        self.enable_grasp = kwargs.pop("enable_grasp", False)
        self.init_quat = kwargs.pop("init_quat", [0.7071, 0, 0, 0.7071])
        # Add small offset in z-axis to avoid collision.
        self.objs_z_offset = kwargs.pop("objs_z_offset", 0.002)
        self.robot_z_offset = kwargs.pop("robot_z_offset", 0.002)
        self.layouts = self.init_env_layouts(
            layout_file, num_envs, replace_objs
        )
        self.robot_pose = self.compute_robot_init_pose(
            self.layouts, num_envs, self.robot_z_offset
        )
        self.env_actors = dict()
        self.image_transform = transforms.PILToTensor()
        super().__init__(
            *args,
            robot_uids=robot_uids,
            reconfiguration_freq=reconfiguration_freq,
            num_envs=num_envs,
            **kwargs,
        )
        self.bg_images = dict()
        if self.render_mode == "hybrid":
            self.bg_images = self.render_gs3d_images(
                self.layouts, num_envs, self.init_quat
            )
    @staticmethod
    def init_env_layouts(
        layout_file: str, num_envs: int, replace_objs: bool
    ) -> list[LayoutInfo]:
        layout = LayoutInfo.from_dict(json.load(open(layout_file, "r")))
        layouts = []
        for env_idx in range(num_envs):
            if replace_objs and env_idx > 0:
                # Re-sample object placements so each env gets a distinct layout.
                layout = bfs_placement(deepcopy(layout))
            layouts.append(layout)
        return layouts
    @staticmethod
    def compute_robot_init_pose(
        layouts: list[LayoutInfo], num_envs: int, z_offset: float = 0.0
    ) -> list[list[float]]:
        robot_pose = []
        for env_idx in range(num_envs):
            layout = layouts[env_idx]
            robot_node = layout.relation[Scene3DItemEnum.ROBOT.value]
            x, y, z, qx, qy, qz, qw = layout.position[robot_node]
            # Reorder the quaternion to (w, x, y, z) as expected downstream.
            robot_pose.append([x, y, z + z_offset, qw, qx, qy, qz])
        return robot_pose
    @property
    def _default_sim_config(self):
        return SimConfig(
            scene_config=SceneConfig(
                solver_position_iterations=30,
                # contact_offset=0.04,
                # rest_offset=0.001,
            ),
            # sim_freq=200,
            control_freq=50,
            gpu_memory_config=GPUMemoryConfig(
                max_rigid_contact_count=2**20, max_rigid_patch_count=2**19
            ),
        )

    @property
    def _default_sensor_configs(self):
        pose = sapien_utils.look_at(eye=[0.3, 0, 0.6], target=[-0.1, 0, 0.1])
        return [
            CameraConfig("base_camera", pose, 128, 128, np.pi / 2, 0.01, 100)
        ]

    @property
    def _default_human_render_camera_configs(self):
        pose = sapien_utils.look_at(
            eye=[0.9, 0.0, 1.1], target=[0.0, 0.0, 0.9]
        )
        return CameraConfig(
            "render_camera", pose, 256, 256, np.deg2rad(75), 0.01, 100
        )

    def _load_agent(self, options: dict):
        # Spawn the agent far from the scene; its pose is set per episode.
        super()._load_agent(options, sapien.Pose(p=[-10, 0, 10]))
    def _load_scene(self, options: dict):
        all_objects = []
        logger.info("Loading assets and decomposing mesh collisions...")
        for env_idx in range(self.num_envs):
            env_actors = load_assets_from_layout_file(
                self.scene,
                self.layouts[env_idx],
                z_offset=self.objs_z_offset,
                init_quat=self.init_quat,
                env_idx=env_idx,
            )
            self.env_actors[f"env{env_idx}"] = env_actors
            all_objects.extend(env_actors.values())

        # Track the last-loaded actor as the manipulation target.
        self.obj = all_objects[-1]
        for obj in all_objects:
            self.remove_from_state_dict_registry(obj)
        self.all_objects = Actor.merge(all_objects, name="all_objects")
        self.add_to_state_dict_registry(self.all_objects)

        self.goal_site = actors.build_sphere(
            self.scene,
            radius=self.goal_thresh,
            color=[0, 1, 0, 0],
            name="goal_site",
            body_type="kinematic",
            add_collision=False,
            initial_pose=sapien.Pose(),
        )
        self._hidden_objects.append(self.goal_site)
    def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
        with torch.device(self.device):
            b = len(env_idx)
            goal_xyz = torch.zeros((b, 3))
            goal_xyz[:, :2] = torch.rand((b, 2)) * 0.2 - 0.1
            self.goal_site.set_pose(Pose.create_from_pq(goal_xyz))
            # Default Panda rest pose plus small Gaussian noise per env.
            qpos = np.array(
                [
                    0.0,
                    np.pi / 8,
                    0,
                    -np.pi * 3 / 8,
                    0,
                    np.pi * 3 / 4,
                    np.pi / 4,
                    0.04,
                    0.04,
                ]
            )
            qpos = (
                np.random.normal(
                    0, self.robot_init_qpos_noise, (self.num_envs, len(qpos))
                )
                + qpos
            )
            # Keep the gripper fully open regardless of the sampled noise.
            qpos[:, -2:] = 0.04
            self.agent.robot.set_root_pose(np.array(self.robot_pose))
            self.agent.reset(qpos)
            self.agent.init_qpos = qpos
            self.agent.controller.controllers["gripper"].reset()
    def render_gs3d_images(
        self, layouts: list[LayoutInfo], num_envs: int, init_quat: list[float]
    ) -> dict[str, np.ndarray]:
        sim_coord_align = (
            torch.tensor(SIM_COORD_ALIGN).to(torch.float32).to(self.device)
        )
        cameras = self.scene.sensors.copy()
        cameras.update(self.scene.human_render_cameras)
        bg_node = layouts[0].relation[Scene3DItemEnum.BACKGROUND.value]
        gs_path = os.path.join(layouts[0].assets[bg_node], "gs_model.ply")
        raw_gs: GaussianOperator = GaussianOperator.load_from_ply(gs_path)
        bg_images = dict()
        for env_idx in tqdm(range(num_envs), desc="Pre-rendering Background"):
            layout = layouts[env_idx]
            x, y, z, qx, qy, qz, qw = layout.position[bg_node]
            qx, qy, qz, qw = quaternion_multiply([qx, qy, qz, qw], init_quat)
            init_pose = torch.tensor([x, y, z, qx, qy, qz, qw])
            gs_model = raw_gs.get_gaussians(instance_pose=init_pose)
            for key in cameras:
                camera = cameras[key]
                Ks = camera.camera.get_intrinsic_matrix()  # (n_env, 3, 3)
                c2w = camera.camera.get_model_matrix()  # (n_env, 4, 4)
                result = gs_model.render(
                    c2w[env_idx] @ sim_coord_align,
                    Ks[env_idx],
                    image_width=camera.config.width,
                    image_height=camera.config.height,
                )
                # Reverse the color channel order of the rendered background.
                bg_images[f"{key}-env{env_idx}"] = result.rgb[..., ::-1]
        return bg_images
    def render(self):
        if self.render_mode is None:
            raise RuntimeError("render_mode is not set.")
        if self.render_mode == "human":
            return self.render_human()
        elif self.render_mode == "rgb_array":
            return self.render_rgb_array()
        elif self.render_mode == "sensors":
            return self.render_sensors()
        elif self.render_mode == "all":
            return self.render_all()
        elif self.render_mode == "hybrid":
            return self.hybrid_render()
        else:
            raise NotImplementedError(
                f"Unsupported render mode {self.render_mode}."
            )
    def render_rgb_array(
        self, camera_name: str | None = None, return_alpha: bool = False
    ):
        for obj in self._hidden_objects:
            obj.show_visual()
        self.scene.update_render(
            update_sensors=False, update_human_render_cameras=True
        )
        render_images = self.scene.get_human_render_camera_images(
            camera_name, return_alpha
        )
        images = list(render_images.values())
        # Hide helper objects again before any return path, so they do not
        # leak into subsequent sensor renders.
        for obj in self._hidden_objects:
            obj.hide_visual()
        if len(images) == 0:
            return None
        if len(images) == 1:
            return images[0]
        return tile_images(images)
    def render_sensors(self):
        images = []
        for image in self.get_sensor_images().values():
            for img in image.values():
                images.append(img)
        return tile_images(images)
    def hybrid_render(self):
        # Foreground render with alpha channel, shape (n_env, h, w, 4).
        fg_images = self.render_rgb_array(return_alpha=True)
        images = []
        for key in self.bg_images:
            if "render_camera" not in key:
                continue
            env_idx = int(key.split("-env")[-1])
            # Composite the simulated foreground over the pre-rendered
            # Gaussian-splatting background.
            rgba = alpha_blend_rgba(
                fg_images[env_idx].cpu().numpy(), self.bg_images[key]
            )
            images.append(self.image_transform(rgba))
        images = torch.stack(images, dim=0)
        images = images.permute(0, 2, 3, 1)  # (n_env, c, h, w) -> (n_env, h, w, c)
        return images[..., :3]
    def evaluate(self):
        obj_to_goal_pos = self.goal_site.pose.p - self.obj.pose.p
        is_obj_placed = (
            torch.linalg.norm(obj_to_goal_pos, axis=1) <= self.goal_thresh
        )
        is_grasped = self.agent.is_grasping(self.obj)
        is_robot_static = self.agent.is_static(0.2)
        return dict(
            is_grasped=is_grasped,
            obj_to_goal_pos=obj_to_goal_pos,
            is_obj_placed=is_obj_placed,
            is_robot_static=is_robot_static,
            is_grasping=is_grasped,
            success=torch.logical_and(is_obj_placed, is_robot_static),
        )
    def _get_obs_extra(self, info: dict):
        return dict()
    def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: dict):
        tcp_to_obj_dist = torch.linalg.norm(
            self.obj.pose.p - self.agent.tcp.pose.p, axis=1
        )
        reaching_reward = 1 - torch.tanh(5 * tcp_to_obj_dist)
        reward = reaching_reward
        is_grasped = info["is_grasped"]
        reward += is_grasped
        obj_to_goal_dist = torch.linalg.norm(
            self.goal_site.pose.p - self.obj.pose.p, axis=1
        )
        place_reward = 1 - torch.tanh(5 * obj_to_goal_dist)
        reward += place_reward * is_grasped
        reward += info["is_obj_placed"] * is_grasped
        # Reward a still arm once the object is placed (gripper joints excluded).
        static_reward = 1 - torch.tanh(
            5
            * torch.linalg.norm(self.agent.robot.get_qvel()[..., :-2], axis=1)
        )
        reward += static_reward * info["is_obj_placed"] * is_grasped
        reward[info["success"]] = 6
        return reward
    def compute_normalized_dense_reward(
        self, obs: Any, action: torch.Tensor, info: dict
    ):
        # The dense reward is bounded above by 6 (the success value).
        return self.compute_dense_reward(obs=obs, action=action, info=info) / 6
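

if __name__ == "__main__":
    # Minimal usage sketch, assuming the env is registered under the
    # placeholder id "PickEmbodiedGen-v0" above and that a layout JSON
    # produced by the EmbodiedGen layout pipeline exists at the path below
    # (the path is hypothetical).
    import gymnasium as gym

    env = gym.make(
        "PickEmbodiedGen-v0",
        num_envs=1,
        render_mode="hybrid",
        layout_file="outputs/layouts/example_layout.json",  # hypothetical path
    )
    obs, _ = env.reset(seed=0)
    for _ in range(10):
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
    # In "hybrid" mode, render() blends the simulated foreground with the
    # pre-rendered Gaussian-splatting background, returning (n_env, h, w, 3).
    frames = env.render()
    env.close()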