Spaces:
Running
on
Zero
Running
on
Zero
| # Project EmbodiedGen | |
| # | |
| # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
| # implied. See the License for the specific language governing | |
| # permissions and limitations under the License. | |
| import json | |
| import os | |
| import random | |
| from collections import defaultdict, deque | |
| from functools import wraps | |
| from typing import Literal | |
| import numpy as np | |
| import torch | |
| import trimesh | |
| from matplotlib.path import Path | |
| from pyquaternion import Quaternion | |
| from scipy.spatial import ConvexHull | |
| from scipy.spatial.transform import Rotation as R | |
| from shapely.geometry import Polygon | |
| from embodied_gen.utils.enum import LayoutInfo, Scene3DItemEnum | |
| from embodied_gen.utils.log import logger | |
| __all__ = [ | |
| "with_seed", | |
| "matrix_to_pose", | |
| "pose_to_matrix", | |
| "quaternion_multiply", | |
| "check_reachable", | |
| "bfs_placement", | |
| "compose_mesh_scene", | |
| "compute_pinhole_intrinsics", | |
| ] | |
| def matrix_to_pose(matrix: np.ndarray) -> list[float]: | |
| """Convert a 4x4 transformation matrix to a pose (x, y, z, qx, qy, qz, qw). | |
| Args: | |
| matrix (np.ndarray): 4x4 transformation matrix. | |
| Returns: | |
| List[float]: Pose as [x, y, z, qx, qy, qz, qw]. | |
| """ | |
| x, y, z = matrix[:3, 3] | |
| rot_mat = matrix[:3, :3] | |
| quat = R.from_matrix(rot_mat).as_quat() | |
| qx, qy, qz, qw = quat | |
| return [x, y, z, qx, qy, qz, qw] | |
| def pose_to_matrix(pose: list[float]) -> np.ndarray: | |
| """Convert pose (x, y, z, qx, qy, qz, qw) to a 4x4 transformation matrix. | |
| Args: | |
| List[float]: Pose as [x, y, z, qx, qy, qz, qw]. | |
| Returns: | |
| matrix (np.ndarray): 4x4 transformation matrix. | |
| """ | |
| x, y, z, qx, qy, qz, qw = pose | |
| r = R.from_quat([qx, qy, qz, qw]) | |
| matrix = np.eye(4) | |
| matrix[:3, :3] = r.as_matrix() | |
| matrix[:3, 3] = [x, y, z] | |
| return matrix | |
| def compute_xy_bbox( | |
| vertices: np.ndarray, col_x: int = 0, col_y: int = 1 | |
| ) -> list[float]: | |
| x_vals = vertices[:, col_x] | |
| y_vals = vertices[:, col_y] | |
| return x_vals.min(), x_vals.max(), y_vals.min(), y_vals.max() | |
| def has_iou_conflict( | |
| new_box: list[float], | |
| placed_boxes: list[list[float]], | |
| iou_threshold: float = 0.0, | |
| ) -> bool: | |
| new_min_x, new_max_x, new_min_y, new_max_y = new_box | |
| for min_x, max_x, min_y, max_y in placed_boxes: | |
| ix1 = max(new_min_x, min_x) | |
| iy1 = max(new_min_y, min_y) | |
| ix2 = min(new_max_x, max_x) | |
| iy2 = min(new_max_y, max_y) | |
| inter_area = max(0, ix2 - ix1) * max(0, iy2 - iy1) | |
| if inter_area > iou_threshold: | |
| return True | |
| return False | |
| def with_seed(seed_attr_name: str = "seed"): | |
| """A parameterized decorator that temporarily sets the random seed.""" | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| seed = kwargs.get(seed_attr_name, None) | |
| if seed is not None: | |
| py_state = random.getstate() | |
| np_state = np.random.get_state() | |
| torch_state = torch.get_rng_state() | |
| random.seed(seed) | |
| np.random.seed(seed) | |
| torch.manual_seed(seed) | |
| try: | |
| result = func(*args, **kwargs) | |
| finally: | |
| random.setstate(py_state) | |
| np.random.set_state(np_state) | |
| torch.set_rng_state(torch_state) | |
| return result | |
| else: | |
| return func(*args, **kwargs) | |
| return wrapper | |
| return decorator | |
| def compute_convex_hull_path( | |
| vertices: np.ndarray, | |
| z_threshold: float = 0.05, | |
| interp_per_edge: int = 10, | |
| margin: float = -0.02, | |
| x_axis: int = 0, | |
| y_axis: int = 1, | |
| z_axis: int = 2, | |
| ) -> Path: | |
| top_vertices = vertices[ | |
| vertices[:, z_axis] > vertices[:, z_axis].max() - z_threshold | |
| ] | |
| top_xy = top_vertices[:, [x_axis, y_axis]] | |
| if len(top_xy) < 3: | |
| raise ValueError("Not enough points to form a convex hull") | |
| hull = ConvexHull(top_xy) | |
| hull_points = top_xy[hull.vertices] | |
| polygon = Polygon(hull_points) | |
| polygon = polygon.buffer(margin) | |
| hull_points = np.array(polygon.exterior.coords) | |
| dense_points = [] | |
| for i in range(len(hull_points)): | |
| p1 = hull_points[i] | |
| p2 = hull_points[(i + 1) % len(hull_points)] | |
| for t in np.linspace(0, 1, interp_per_edge, endpoint=False): | |
| pt = (1 - t) * p1 + t * p2 | |
| dense_points.append(pt) | |
| return Path(np.array(dense_points), closed=True) | |
| def find_parent_node(node: str, tree: dict) -> str | None: | |
| for parent, children in tree.items(): | |
| if any(child[0] == node for child in children): | |
| return parent | |
| return None | |
| def all_corners_inside(hull: Path, box: list, threshold: int = 3) -> bool: | |
| x1, x2, y1, y2 = box | |
| corners = [[x1, y1], [x2, y1], [x1, y2], [x2, y2]] | |
| num_inside = sum(hull.contains_point(c) for c in corners) | |
| return num_inside >= threshold | |
| def compute_axis_rotation_quat( | |
| axis: Literal["x", "y", "z"], angle_rad: float | |
| ) -> list[float]: | |
| if axis.lower() == "x": | |
| q = Quaternion(axis=[1, 0, 0], angle=angle_rad) | |
| elif axis.lower() == "y": | |
| q = Quaternion(axis=[0, 1, 0], angle=angle_rad) | |
| elif axis.lower() == "z": | |
| q = Quaternion(axis=[0, 0, 1], angle=angle_rad) | |
| else: | |
| raise ValueError(f"Unsupported axis '{axis}', must be one of x, y, z") | |
| return [q.x, q.y, q.z, q.w] | |
| def quaternion_multiply( | |
| init_quat: list[float], rotate_quat: list[float] | |
| ) -> list[float]: | |
| qx, qy, qz, qw = init_quat | |
| q1 = Quaternion(w=qw, x=qx, y=qy, z=qz) | |
| qx, qy, qz, qw = rotate_quat | |
| q2 = Quaternion(w=qw, x=qx, y=qy, z=qz) | |
| quat = q2 * q1 | |
| return [quat.x, quat.y, quat.z, quat.w] | |
| def check_reachable( | |
| base_xyz: np.ndarray, | |
| reach_xyz: np.ndarray, | |
| min_reach: float = 0.25, | |
| max_reach: float = 0.85, | |
| ) -> bool: | |
| """Check if the target point is within the reachable range.""" | |
| distance = np.linalg.norm(reach_xyz - base_xyz) | |
| return min_reach < distance < max_reach | |
| def bfs_placement( | |
| layout_file: str, | |
| floor_margin: float = 0, | |
| beside_margin: float = 0.1, | |
| max_attempts: int = 3000, | |
| init_rpy: tuple = (1.5708, 0.0, 0.0), | |
| rotate_objs: bool = True, | |
| rotate_bg: bool = True, | |
| rotate_context: bool = True, | |
| limit_reach_range: tuple[float, float] | None = (0.20, 0.85), | |
| max_orient_diff: float | None = 60, | |
| robot_dim: float = 0.12, | |
| seed: int = None, | |
| ) -> LayoutInfo: | |
| """Place objects in the layout using BFS traversal. | |
| Args: | |
| layout_file: Path to the JSON file defining the layout structure and assets. | |
| floor_margin: Z-offset for the background object, typically for objects placed on the floor. | |
| beside_margin: Minimum margin for objects placed 'beside' their parent, used when 'on' placement fails. | |
| max_attempts: Maximum number of attempts to find a non-overlapping position for an object. | |
| init_rpy: Initial Roll-Pitch-Yaw rotation rad applied to all object meshes to align the mesh's | |
| coordinate system with the world's (e.g., Z-up). | |
| rotate_objs: If True, apply a random rotation around the Z-axis for manipulated and distractor objects. | |
| rotate_bg: If True, apply a random rotation around the Y-axis for the background object. | |
| rotate_context: If True, apply a random rotation around the Z-axis for the context object. | |
| limit_reach_range: If set, enforce a check that manipulated objects are within the robot's reach range, in meter. | |
| max_orient_diff: If set, enforce a check that manipulated objects are within the robot's orientation range, in degree. | |
| robot_dim: The approximate dimension (e.g., diameter) of the robot for box representation. | |
| seed: Random seed for reproducible placement. | |
| Returns: | |
| A :class:`LayoutInfo` object containing the objects and their final computed 7D poses | |
| ([x, y, z, qx, qy, qz, qw]). | |
| """ | |
| layout_info = LayoutInfo.from_dict(json.load(open(layout_file, "r"))) | |
| asset_dir = os.path.dirname(layout_file) | |
| object_mapping = layout_info.objs_mapping | |
| position = {} # node: [x, y, z, qx, qy, qz, qw] | |
| parent_bbox_xy = {} | |
| placed_boxes_map = defaultdict(list) | |
| mesh_info = defaultdict(dict) | |
| robot_node = layout_info.relation[Scene3DItemEnum.ROBOT.value] | |
| for node in object_mapping: | |
| if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value: | |
| bg_quat = ( | |
| compute_axis_rotation_quat( | |
| axis="y", | |
| angle_rad=np.random.uniform(0, 2 * np.pi), | |
| ) | |
| if rotate_bg | |
| else [0, 0, 0, 1] | |
| ) | |
| bg_quat = [round(q, 4) for q in bg_quat] | |
| continue | |
| mesh_path = ( | |
| f"{layout_info.assets[node]}/mesh/{node.replace(' ', '_')}.obj" | |
| ) | |
| mesh_path = os.path.join(asset_dir, mesh_path) | |
| mesh_info[node]["path"] = mesh_path | |
| mesh = trimesh.load(mesh_path) | |
| rotation = R.from_euler("xyz", init_rpy, degrees=False) | |
| vertices = mesh.vertices @ rotation.as_matrix().T | |
| z1 = np.percentile(vertices[:, 2], 1) | |
| z2 = np.percentile(vertices[:, 2], 99) | |
| if object_mapping[node] == Scene3DItemEnum.CONTEXT.value: | |
| object_quat = [0, 0, 0, 1] | |
| if rotate_context: | |
| angle_rad = np.random.uniform(0, 2 * np.pi) | |
| object_quat = compute_axis_rotation_quat( | |
| axis="z", angle_rad=angle_rad | |
| ) | |
| rotation = R.from_quat(object_quat).as_matrix() | |
| vertices = vertices @ rotation.T | |
| mesh_info[node]["surface"] = compute_convex_hull_path(vertices) | |
| # Put robot in the CONTEXT edge. | |
| x, y = random.choice(mesh_info[node]["surface"].vertices) | |
| theta = np.arctan2(y, x) | |
| quat_initial = Quaternion(axis=[0, 0, 1], angle=theta) | |
| quat_extra = Quaternion(axis=[0, 0, 1], angle=np.pi) | |
| quat = quat_extra * quat_initial | |
| _pose = [x, y, z2 - z1, quat.x, quat.y, quat.z, quat.w] | |
| position[robot_node] = [round(v, 4) for v in _pose] | |
| node_box = [ | |
| x - robot_dim / 2, | |
| x + robot_dim / 2, | |
| y - robot_dim / 2, | |
| y + robot_dim / 2, | |
| ] | |
| placed_boxes_map[node].append(node_box) | |
| elif rotate_objs: | |
| # For manipulated and distractor objects, apply random rotation | |
| angle_rad = np.random.uniform(0, 2 * np.pi) | |
| object_quat = compute_axis_rotation_quat( | |
| axis="z", angle_rad=angle_rad | |
| ) | |
| rotation = R.from_quat(object_quat).as_matrix() | |
| vertices = vertices @ rotation.T | |
| x1, x2, y1, y2 = compute_xy_bbox(vertices) | |
| mesh_info[node]["pose"] = [x1, x2, y1, y2, z1, z2, *object_quat] | |
| mesh_info[node]["area"] = max(1e-5, (x2 - x1) * (y2 - y1)) | |
| root = list(layout_info.tree.keys())[0] | |
| queue = deque([((root, None), layout_info.tree.get(root, []))]) | |
| while queue: | |
| (node, relation), children = queue.popleft() | |
| if node not in object_mapping: | |
| continue | |
| if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value: | |
| position[node] = [0, 0, floor_margin, *bg_quat] | |
| else: | |
| x1, x2, y1, y2, z1, z2, qx, qy, qz, qw = mesh_info[node]["pose"] | |
| if object_mapping[node] == Scene3DItemEnum.CONTEXT.value: | |
| position[node] = [0, 0, -round(z1, 4), qx, qy, qz, qw] | |
| parent_bbox_xy[node] = [x1, x2, y1, y2, z1, z2] | |
| elif object_mapping[node] in [ | |
| Scene3DItemEnum.MANIPULATED_OBJS.value, | |
| Scene3DItemEnum.DISTRACTOR_OBJS.value, | |
| ]: | |
| parent_node = find_parent_node(node, layout_info.tree) | |
| parent_pos = position[parent_node] | |
| ( | |
| p_x1, | |
| p_x2, | |
| p_y1, | |
| p_y2, | |
| p_z1, | |
| p_z2, | |
| ) = parent_bbox_xy[parent_node] | |
| obj_dx = x2 - x1 | |
| obj_dy = y2 - y1 | |
| hull_path = mesh_info[parent_node].get("surface") | |
| for _ in range(max_attempts): | |
| node_x1 = random.uniform(p_x1, p_x2 - obj_dx) | |
| node_y1 = random.uniform(p_y1, p_y2 - obj_dy) | |
| node_box = [ | |
| node_x1, | |
| node_x1 + obj_dx, | |
| node_y1, | |
| node_y1 + obj_dy, | |
| ] | |
| if hull_path and not all_corners_inside( | |
| hull_path, node_box | |
| ): | |
| continue | |
| # Make sure the manipulated object is reachable by robot. | |
| if ( | |
| limit_reach_range is not None | |
| and object_mapping[node] | |
| == Scene3DItemEnum.MANIPULATED_OBJS.value | |
| ): | |
| cx = parent_pos[0] + node_box[0] + obj_dx / 2 | |
| cy = parent_pos[1] + node_box[2] + obj_dy / 2 | |
| cz = parent_pos[2] + p_z2 - z1 | |
| robot_pos = position[robot_node][:3] | |
| if not check_reachable( | |
| base_xyz=np.array(robot_pos), | |
| reach_xyz=np.array([cx, cy, cz]), | |
| min_reach=limit_reach_range[0], | |
| max_reach=limit_reach_range[1], | |
| ): | |
| continue | |
| # Make sure the manipulated object is inside the robot's orientation. | |
| if ( | |
| max_orient_diff is not None | |
| and object_mapping[node] | |
| == Scene3DItemEnum.MANIPULATED_OBJS.value | |
| ): | |
| cx = parent_pos[0] + node_box[0] + obj_dx / 2 | |
| cy = parent_pos[1] + node_box[2] + obj_dy / 2 | |
| cx2, cy2 = position[robot_node][:2] | |
| v1 = np.array([-cx2, -cy2]) | |
| v2 = np.array([cx - cx2, cy - cy2]) | |
| dot = np.dot(v1, v2) | |
| norms = np.linalg.norm(v1) * np.linalg.norm(v2) | |
| theta = np.arccos(np.clip(dot / norms, -1.0, 1.0)) | |
| theta = np.rad2deg(theta) | |
| if theta > max_orient_diff: | |
| continue | |
| if not has_iou_conflict( | |
| node_box, placed_boxes_map[parent_node] | |
| ): | |
| z_offset = 0 | |
| break | |
| else: | |
| logger.warning( | |
| f"Cannot place {node} on {parent_node} without overlap" | |
| f" after {max_attempts} attempts, place beside {parent_node}." | |
| ) | |
| for _ in range(max_attempts): | |
| node_x1 = random.choice( | |
| [ | |
| random.uniform( | |
| p_x1 - obj_dx - beside_margin, | |
| p_x1 - obj_dx, | |
| ), | |
| random.uniform(p_x2, p_x2 + beside_margin), | |
| ] | |
| ) | |
| node_y1 = random.choice( | |
| [ | |
| random.uniform( | |
| p_y1 - obj_dy - beside_margin, | |
| p_y1 - obj_dy, | |
| ), | |
| random.uniform(p_y2, p_y2 + beside_margin), | |
| ] | |
| ) | |
| node_box = [ | |
| node_x1, | |
| node_x1 + obj_dx, | |
| node_y1, | |
| node_y1 + obj_dy, | |
| ] | |
| z_offset = -(parent_pos[2] + p_z2) | |
| if not has_iou_conflict( | |
| node_box, placed_boxes_map[parent_node] | |
| ): | |
| break | |
| placed_boxes_map[parent_node].append(node_box) | |
| abs_cx = parent_pos[0] + node_box[0] + obj_dx / 2 | |
| abs_cy = parent_pos[1] + node_box[2] + obj_dy / 2 | |
| abs_cz = parent_pos[2] + p_z2 - z1 + z_offset | |
| position[node] = [ | |
| round(v, 4) | |
| for v in [abs_cx, abs_cy, abs_cz, qx, qy, qz, qw] | |
| ] | |
| parent_bbox_xy[node] = [x1, x2, y1, y2, z1, z2] | |
| sorted_children = sorted( | |
| children, key=lambda x: -mesh_info[x[0]].get("area", 0) | |
| ) | |
| for child, rel in sorted_children: | |
| queue.append(((child, rel), layout_info.tree.get(child, []))) | |
| layout_info.position = position | |
| return layout_info | |
| def compose_mesh_scene( | |
| layout_info: LayoutInfo, out_scene_path: str, with_bg: bool = False | |
| ) -> None: | |
| object_mapping = Scene3DItemEnum.object_mapping(layout_info.relation) | |
| scene = trimesh.Scene() | |
| for node in layout_info.assets: | |
| if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value: | |
| mesh_path = f"{layout_info.assets[node]}/mesh_model.ply" | |
| if not with_bg: | |
| continue | |
| else: | |
| mesh_path = ( | |
| f"{layout_info.assets[node]}/mesh/{node.replace(' ', '_')}.obj" | |
| ) | |
| mesh = trimesh.load(mesh_path) | |
| offset = np.array(layout_info.position[node])[[0, 2, 1]] | |
| mesh.vertices += offset | |
| scene.add_geometry(mesh, node_name=node) | |
| os.makedirs(os.path.dirname(out_scene_path), exist_ok=True) | |
| scene.export(out_scene_path) | |
| logger.info(f"Composed interactive 3D layout saved in {out_scene_path}") | |
| return | |
| def compute_pinhole_intrinsics( | |
| image_w: int, image_h: int, fov_deg: float | |
| ) -> np.ndarray: | |
| fov_rad = np.deg2rad(fov_deg) | |
| fx = image_w / (2 * np.tan(fov_rad / 2)) | |
| fy = fx # assuming square pixels | |
| cx = image_w / 2 | |
| cy = image_h / 2 | |
| K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]]) | |
| return K | |