# Copyright (C) 2022-present Naver Corporation. All rights reserved.
# Licensed under CC BY-NC-SA 4.0 (non-commercial use only).
import os
import numpy as np
import quaternion
import habitat_sim
import json
from sklearn.neighbors import NearestNeighbors
import cv2
# OpenCV to habitat camera convention transformation
R_OPENCV2HABITAT = np.stack(
    (habitat_sim.geo.RIGHT, -habitat_sim.geo.UP, habitat_sim.geo.FRONT), axis=0
)
R_HABITAT2OPENCV = R_OPENCV2HABITAT.T
DEG2RAD = np.pi / 180
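# Note (added for clarity): this matrix maps OpenCV camera-frame vectors
# (x right, y down, z forward) into Habitat's camera frame (x right, y up,
# z backward), which is why it is right-multiplied onto Habitat rotations below.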


def compute_camera_intrinsics(height, width, hfov):
    """Return the focal length and principal point of a pinhole camera
    with the given horizontal field of view (in degrees)."""
    f = width / 2 / np.tan(hfov / 2 * np.pi / 180)
    cu, cv = width / 2, height / 2
    return f, cu, cv
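

# Worked example (illustrative, not from the original code): for width=320,
# height=240 and hfov=60, f = 160 / tan(30 deg) ≈ 277.1 px and (cu, cv) = (160, 120).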


def compute_camera_pose_opencv_convention(camera_position, camera_orientation):
    """Convert a Habitat camera pose into an OpenCV-convention cam2world pose."""
    R_cam2world = quaternion.as_rotation_matrix(camera_orientation) @ R_OPENCV2HABITAT
    t_cam2world = np.asarray(camera_position)
    return R_cam2world, t_cam2world


def compute_pointmap(depthmap, hfov):
    """Compute a HxWx3 pointmap in camera frame from a HxW depth map."""
    height, width = depthmap.shape
    f, cu, cv = compute_camera_intrinsics(height, width, hfov)
    # Back-project each pixel of the depth map to a 3D point
    z_cam = depthmap
    u, v = np.meshgrid(range(width), range(height))
    x_cam = (u - cu) / f * z_cam
    y_cam = (v - cv) / f * z_cam
    X_cam = np.stack((x_cam, y_cam, z_cam), axis=-1)
    return X_cam
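

# The back-projection above inverts the pinhole model u = f * x/z + cu,
# v = f * y/z + cv; pixels with zero depth land at the origin and are
# filtered out by the valid mask in compute_pointcloud below.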


def compute_pointcloud(depthmap, hfov, camera_position, camera_rotation):
    """Return a 3D point cloud corresponding to valid pixels of the depth map."""
    R_cam2world, t_cam2world = compute_camera_pose_opencv_convention(
        camera_position, camera_rotation
    )
    X_cam = compute_pointmap(depthmap=depthmap, hfov=hfov)
    valid_mask = X_cam[:, :, 2] != 0.0
    X_cam = X_cam.reshape(-1, 3)[valid_mask.flatten()]
    X_world = X_cam @ R_cam2world.T + t_cam2world.reshape(1, 3)
    return X_world
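

# Minimal usage sketch (hypothetical values): given an observation dict as
# produced by render_viewpoint below,
#   pc = compute_pointcloud(obs["depth"], hfov=60, camera_position=pos, camera_rotation=rot)
# where pos is an XYZ array and rot a quaternion in Habitat convention.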


def compute_pointcloud_overlaps_scikit(
    pointcloud1, pointcloud2, distance_threshold, compute_symmetric=False
):
    """
    Compute 'overlapping' metrics based on a distance threshold between two point
    clouds: the number of points of each cloud whose nearest neighbor in the other
    cloud lies within distance_threshold.
    """
    nbrs = NearestNeighbors(n_neighbors=1, algorithm="kd_tree").fit(pointcloud2)
    distances, _ = nbrs.kneighbors(pointcloud1)
    intersection1 = np.count_nonzero(distances.flatten() < distance_threshold)
    data = {"intersection1": intersection1, "size1": len(pointcloud1)}
    if compute_symmetric:
        nbrs = NearestNeighbors(n_neighbors=1, algorithm="kd_tree").fit(pointcloud1)
        distances, _ = nbrs.kneighbors(pointcloud2)
        intersection2 = np.count_nonzero(distances.flatten() < distance_threshold)
        data["intersection2"] = intersection2
        data["size2"] = len(pointcloud2)
    return data
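

# Illustrative sanity check (sketch): a cloud compared against itself overlaps
# fully, i.e. compute_pointcloud_overlaps_scikit(pc, pc, 0.05)["intersection1"]
# equals len(pc), since every nearest-neighbor distance is zero.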


def _append_camera_parameters(observation, hfov, camera_location, camera_rotation):
    """
    Add camera parameters to the observation dictionary produced by Habitat-Sim.
    Modifies the dictionary in place.
    """
    R_cam2world, t_cam2world = compute_camera_pose_opencv_convention(
        camera_location, camera_rotation
    )
    height, width = observation["depth"].shape
    f, cu, cv = compute_camera_intrinsics(height, width, hfov)
    K = np.asarray([[f, 0, cu], [0, f, cv], [0, 0, 1.0]])
    observation["camera_intrinsics"] = K
    observation["t_cam2world"] = t_cam2world
    observation["R_cam2world"] = R_cam2world


def look_at(eye, center, up, return_cam2world=True):
    """
    Return a camera pose looking at a given center point.
    Analogous to the gluLookAt function, using the OpenCV camera convention.
    """
    z = center - eye
    z /= np.linalg.norm(z, axis=-1, keepdims=True)
    y = -up
    y = y - np.sum(y * z, axis=-1, keepdims=True) * z
    y /= np.linalg.norm(y, axis=-1, keepdims=True)
    x = np.cross(y, z, axis=-1)
    if return_cam2world:
        R = np.stack((x, y, z), axis=-1)
        t = eye
    else:
        # World-to-camera transformation (transposed rotation)
        R = np.stack((x, y, z), axis=-2)
        t = -np.einsum("...ij, ...j", R, eye)
    return R, t
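

# Sanity example: with eye=(0,0,0), center=(0,0,1), up=(0,1,0), the returned
# cam2world rotation maps camera axes (x, y, z) to world (-x, -y, +z): the
# OpenCV camera looks along world +z with its image y-axis pointing downwards.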


def look_at_for_habitat(eye, center, up, return_cam2world=True):
    """Same as look_at, but returning the orientation as a Habitat-convention quaternion."""
    R, t = look_at(eye, center, up)
    orientation = quaternion.from_rotation_matrix(R @ R_OPENCV2HABITAT.T)
    return orientation, t


def generate_orientation_noise(pan_range, tilt_range, roll_range):
    """Sample a random orientation perturbation within the given ranges (in degrees)."""
    return (
        quaternion.from_rotation_vector(
            np.random.uniform(*pan_range) * DEG2RAD * habitat_sim.geo.UP
        )
        * quaternion.from_rotation_vector(
            np.random.uniform(*tilt_range) * DEG2RAD * habitat_sim.geo.RIGHT
        )
        * quaternion.from_rotation_vector(
            np.random.uniform(*roll_range) * DEG2RAD * habitat_sim.geo.FRONT
        )
    )
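

# Note: quaternion products compose right-to-left, so the perturbation is
# effectively roll about FRONT, then tilt about RIGHT, then pan about UP,
# all about the fixed Habitat axes.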


class NoNaviguableSpaceError(RuntimeError):
    def __init__(self, *args):
        super().__init__(*args)


class MultiviewHabitatSimGenerator:
    """Generate tuples of overlapping views rendered with Habitat-Sim."""

    def __init__(
        self,
        scene,
        navmesh,
        scene_dataset_config_file,
        resolution=(240, 320),
        views_count=2,
        hfov=60,
        gpu_id=0,
        size=10000,
        minimum_covisibility=0.5,
        transform=None,
    ):
        self.scene = scene
        self.navmesh = navmesh
        self.scene_dataset_config_file = scene_dataset_config_file
        self.resolution = resolution
        self.views_count = views_count
        assert self.views_count >= 1
        self.hfov = hfov
        self.gpu_id = gpu_id
        self.size = size
        self.transform = transform
        # Noise added to camera orientation
        self.pan_range = (-3, 3)
        self.tilt_range = (-10, 10)
        self.roll_range = (-5, 5)
        # Height range to sample cameras
        self.height_range = (1.2, 1.8)
        # Random steps between the camera views
        self.random_steps_count = 5
        self.random_step_variance = 2.0
        # Minimum fraction of the image which should be valid (well-defined depth)
        self.minimum_valid_fraction = 0.7
        # Distance threshold used to decide whether two points are covisible
        self.distance_threshold = 0.05
        # Minimum covisibility of a view point cloud with respect to the reference view to be kept
        self.minimum_covisibility = minimum_covisibility
        # Maximum number of retries
        self.max_attempts_count = 100
        self.seed = None
        self._lazy_initialization()

    def _lazy_initialization(self):
        # Lazy random seeding and instantiation of the simulator to deal with multiprocessing properly
        if self.seed is None:
            # Re-seed numpy generator
            np.random.seed()
            self.seed = np.random.randint(2**32 - 1)
            sim_cfg = habitat_sim.SimulatorConfiguration()
            sim_cfg.scene_id = self.scene
            if (
                self.scene_dataset_config_file is not None
                and self.scene_dataset_config_file != ""
            ):
                sim_cfg.scene_dataset_config_file = self.scene_dataset_config_file
            sim_cfg.random_seed = self.seed
            sim_cfg.load_semantic_mesh = False
            sim_cfg.gpu_device_id = self.gpu_id

            depth_sensor_spec = habitat_sim.CameraSensorSpec()
            depth_sensor_spec.uuid = "depth"
            depth_sensor_spec.sensor_type = habitat_sim.SensorType.DEPTH
            depth_sensor_spec.resolution = self.resolution
            depth_sensor_spec.hfov = self.hfov
            depth_sensor_spec.position = [0.0, 0.0, 0]

            rgb_sensor_spec = habitat_sim.CameraSensorSpec()
            rgb_sensor_spec.uuid = "color"
            rgb_sensor_spec.sensor_type = habitat_sim.SensorType.COLOR
            rgb_sensor_spec.resolution = self.resolution
            rgb_sensor_spec.hfov = self.hfov
            rgb_sensor_spec.position = [0.0, 0.0, 0]

            agent_cfg = habitat_sim.agent.AgentConfiguration(
                sensor_specifications=[rgb_sensor_spec, depth_sensor_spec]
            )
            cfg = habitat_sim.Configuration(sim_cfg, [agent_cfg])
            self.sim = habitat_sim.Simulator(cfg)
            if self.navmesh is not None and self.navmesh != "":
                # Use a pre-computed navmesh when available (usually better than those generated automatically)
                self.sim.pathfinder.load_nav_mesh(self.navmesh)
            if not self.sim.pathfinder.is_loaded:
                # Try to compute a navmesh
                navmesh_settings = habitat_sim.NavMeshSettings()
                navmesh_settings.set_defaults()
                self.sim.recompute_navmesh(self.sim.pathfinder, navmesh_settings, True)
            # Ensure that the navmesh is not empty
            if not self.sim.pathfinder.is_loaded:
                raise NoNaviguableSpaceError(
                    f"No naviguable location (scene: {self.scene} -- navmesh: {self.navmesh})"
                )
            self.agent = self.sim.initialize_agent(agent_id=0)

    def close(self):
        self.sim.close()

    def __del__(self):
        self.sim.close()

    def __len__(self):
        return self.size

    def sample_random_viewpoint(self):
        """Sample a random viewpoint using the navmesh."""
        nav_point = self.sim.pathfinder.get_random_navigable_point()
        # Sample a random viewpoint height
        viewpoint_height = np.random.uniform(*self.height_range)
        viewpoint_position = nav_point + viewpoint_height * habitat_sim.geo.UP
        viewpoint_orientation = quaternion.from_rotation_vector(
            np.random.uniform(0, 2 * np.pi) * habitat_sim.geo.UP
        ) * generate_orientation_noise(self.pan_range, self.tilt_range, self.roll_range)
        return viewpoint_position, viewpoint_orientation, nav_point

    def sample_other_random_viewpoint(self, observed_point, nav_point):
        """Sample a random viewpoint close to an existing one, using the navmesh and a reference observed point."""
        other_nav_point = nav_point
        walk_directions = self.random_step_variance * np.asarray([1, 0, 1])
        for i in range(self.random_steps_count):
            temp = self.sim.pathfinder.snap_point(
                other_nav_point + walk_directions * np.random.normal(size=3)
            )
            # Snapping may return NaN when it fails
            if not np.isnan(temp[0]):
                other_nav_point = temp
        other_viewpoint_height = np.random.uniform(*self.height_range)
        other_viewpoint_position = (
            other_nav_point + other_viewpoint_height * habitat_sim.geo.UP
        )
        # Set the viewing direction towards the observed central point
        rotation, position = look_at_for_habitat(
            eye=other_viewpoint_position,
            center=observed_point,
            up=habitat_sim.geo.UP,
            return_cam2world=True,
        )
        rotation = rotation * generate_orientation_noise(
            self.pan_range, self.tilt_range, self.roll_range
        )
        return position, rotation, other_nav_point
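
    # Note: the random walk above stays on the navmesh (via snap_point), while the
    # look-at re-aims the camera at the reference centroid; together these make a
    # high covisibility with the reference view likely, but not guaranteed.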

    def is_other_pointcloud_overlapping(self, ref_pointcloud, other_pointcloud):
        """Check if a point cloud is valid and overlaps significantly with a reference one."""
        pixels_count = self.resolution[0] * self.resolution[1]
        valid_fraction = len(other_pointcloud) / pixels_count
        assert 0.0 <= valid_fraction <= 1.0
        overlap = compute_pointcloud_overlaps_scikit(
            ref_pointcloud,
            other_pointcloud,
            self.distance_threshold,
            compute_symmetric=True,
        )
        covisibility = min(
            overlap["intersection1"] / pixels_count,
            overlap["intersection2"] / pixels_count,
        )
        is_valid = (valid_fraction >= self.minimum_valid_fraction) and (
            covisibility >= self.minimum_covisibility
        )
        return is_valid, valid_fraction, covisibility
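
    # Covisibility is the symmetric minimum of the two overlap fractions, each
    # normalized by the full image pixel count rather than by the number of
    # valid points, so views with little valid depth also score low.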

    def is_other_viewpoint_overlapping(
        self, ref_pointcloud, observation, position, rotation
    ):
        """Check if a viewpoint is valid and overlaps significantly with a reference one."""
        # Point cloud of the observation
        other_pointcloud = compute_pointcloud(
            observation["depth"], self.hfov, position, rotation
        )
        return self.is_other_pointcloud_overlapping(ref_pointcloud, other_pointcloud)

    def render_viewpoint(self, viewpoint_position, viewpoint_orientation):
        """Render the color and depth sensors at the given pose and attach camera parameters."""
        agent_state = habitat_sim.AgentState()
        agent_state.position = viewpoint_position
        agent_state.rotation = viewpoint_orientation
        self.agent.set_state(agent_state)
        viewpoint_observations = self.sim.get_sensor_observations(agent_ids=0)
        _append_camera_parameters(
            viewpoint_observations, self.hfov, viewpoint_position, viewpoint_orientation
        )
        return viewpoint_observations

    def __getitem__(self, useless_idx):
        ref_position, ref_orientation, nav_point = self.sample_random_viewpoint()
        ref_observations = self.render_viewpoint(ref_position, ref_orientation)
        # Extract point cloud
        ref_pointcloud = compute_pointcloud(
            depthmap=ref_observations["depth"],
            hfov=self.hfov,
            camera_position=ref_position,
            camera_rotation=ref_orientation,
        )
        pixels_count = self.resolution[0] * self.resolution[1]
        ref_valid_fraction = len(ref_pointcloud) / pixels_count
        assert 0.0 <= ref_valid_fraction <= 1.0
        if ref_valid_fraction < self.minimum_valid_fraction:
            # This should produce a recursion error at some point when something is very wrong
            return self[0]
        # Pick a reference observed point in the point cloud
        observed_point = np.mean(ref_pointcloud, axis=0)

        # Add the first image as reference
        viewpoints_observations = [ref_observations]
        viewpoints_covisibility = [ref_valid_fraction]
        viewpoints_positions = [ref_position]
        viewpoints_orientations = [quaternion.as_float_array(ref_orientation)]
        viewpoints_clouds = [ref_pointcloud]
        viewpoints_valid_fractions = [ref_valid_fraction]

        for _ in range(self.views_count - 1):
            # Generate another viewpoint using some dummy random walk
            successful_sampling = False
            for sampling_attempt in range(self.max_attempts_count):
                position, rotation, _ = self.sample_other_random_viewpoint(
                    observed_point, nav_point
                )
                # Observation
                other_viewpoint_observations = self.render_viewpoint(position, rotation)
                other_pointcloud = compute_pointcloud(
                    other_viewpoint_observations["depth"], self.hfov, position, rotation
                )
                is_valid, valid_fraction, covisibility = (
                    self.is_other_pointcloud_overlapping(
                        ref_pointcloud, other_pointcloud
                    )
                )
                if is_valid:
                    successful_sampling = True
                    break
            if not successful_sampling:
                print("WARNING: Maximum number of attempts reached.")
                # Dirty hack: try using a novel original viewpoint
                return self[0]
            viewpoints_observations.append(other_viewpoint_observations)
            viewpoints_covisibility.append(covisibility)
            viewpoints_positions.append(position)
            viewpoints_orientations.append(
                quaternion.as_float_array(rotation)
            )  # WXYZ convention for the quaternion encoding
            viewpoints_clouds.append(other_pointcloud)
            viewpoints_valid_fractions.append(valid_fraction)

        # Estimate relations between all pairs of images
        pairwise_visibility_ratios = np.ones(
            (len(viewpoints_observations), len(viewpoints_observations))
        )
        for i in range(len(viewpoints_observations)):
            pairwise_visibility_ratios[i, i] = viewpoints_valid_fractions[i]
            for j in range(i + 1, len(viewpoints_observations)):
                overlap = compute_pointcloud_overlaps_scikit(
                    viewpoints_clouds[i],
                    viewpoints_clouds[j],
                    self.distance_threshold,
                    compute_symmetric=True,
                )
                pairwise_visibility_ratios[i, j] = (
                    overlap["intersection1"] / pixels_count
                )
                pairwise_visibility_ratios[j, i] = (
                    overlap["intersection2"] / pixels_count
                )

        # Covisibility ratios are relative to image 0
        data = {
            "observations": viewpoints_observations,
            "positions": np.asarray(viewpoints_positions),
            "orientations": np.asarray(viewpoints_orientations),
            "covisibility_ratios": np.asarray(viewpoints_covisibility),
            "valid_fractions": np.asarray(viewpoints_valid_fractions, dtype=float),
            "pairwise_visibility_ratios": np.asarray(
                pairwise_visibility_ratios, dtype=float
            ),
        }
        if self.transform is not None:
            data = self.transform(data)
        return data

    def generate_random_spiral_trajectory(
        self,
        images_count=100,
        max_radius=0.5,
        half_turns=5,
        use_constant_orientation=False,
    ):
        """
        Return a list of images corresponding to a spiral trajectory from a random starting point.
        Useful to generate nice visualisations.
        Use an even number of half turns to get a nice "C1-continuous" loop effect.
        """
        ref_position, ref_orientation, navpoint = self.sample_random_viewpoint()
        ref_observations = self.render_viewpoint(ref_position, ref_orientation)
        ref_pointcloud = compute_pointcloud(
            depthmap=ref_observations["depth"],
            hfov=self.hfov,
            camera_position=ref_position,
            camera_rotation=ref_orientation,
        )
        pixels_count = self.resolution[0] * self.resolution[1]
        if len(ref_pointcloud) / pixels_count < self.minimum_valid_fraction:
            # Dirty hack: ensure that the valid part of the image is significant
            return self.generate_random_spiral_trajectory(
                images_count, max_radius, half_turns, use_constant_orientation
            )
        # Pick an observed point in the point cloud
        observed_point = np.mean(ref_pointcloud, axis=0)
        ref_R, ref_t = compute_camera_pose_opencv_convention(
            ref_position, ref_orientation
        )

        images = []
        is_valid = []
        # Spiral trajectory, optionally with constant orientation
        for i, alpha in enumerate(np.linspace(0, 1, images_count)):
            # Increase then decrease the radius
            r = max_radius * np.abs(np.sin(alpha * np.pi))
            theta = alpha * half_turns * np.pi
            x = r * np.cos(theta)
            y = r * np.sin(theta)
            z = 0.0
            position = (
                ref_position + (ref_R @ np.asarray([x, y, z]).reshape(3, 1)).flatten()
            )
            if use_constant_orientation:
                orientation = ref_orientation
            else:
                # Trajectory looking at a mean point in front of the ref observation
                orientation, position = look_at_for_habitat(
                    eye=position, center=observed_point, up=habitat_sim.geo.UP
                )
            observations = self.render_viewpoint(position, orientation)
            images.append(observations["color"][..., :3])
            _is_valid, valid_fraction, iou = self.is_other_viewpoint_overlapping(
                ref_pointcloud, observations, position, orientation
            )
            is_valid.append(_is_valid)
        return images, np.all(is_valid)
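

# Minimal usage sketch (added for illustration; the scene and navmesh paths are
# hypothetical placeholders and must point to an actual Habitat-Sim dataset):
if __name__ == "__main__":
    generator = MultiviewHabitatSimGenerator(
        scene="path/to/scene.glb",  # hypothetical path
        navmesh="path/to/scene.navmesh",  # hypothetical path
        scene_dataset_config_file=None,
        views_count=2,
        minimum_covisibility=0.5,
    )
    sample = generator[0]  # the index is ignored; each call draws a fresh random tuple
    print("positions:", sample["positions"].shape)
    print("covisibility:", sample["covisibility_ratios"])
    generator.close()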