Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import numpy as np | |
| import os.path as osp | |
| from PIL import Image | |
| from tqdm import tqdm | |
| import csv | |
| import imageio | |
def _read_image(img_rel_path) -> np.ndarray:
    """Load an image file and return its pixel data as a NumPy array.

    Args:
        img_rel_path: Path of the image, passed straight to ``Image.open``.

    Returns:
        The decoded image as an array, e.g. ``[H, W, rgb]`` for a colour image.
    """
    # Image.open is lazy and keeps the underlying file open; the context
    # manager guarantees the handle is released once the pixels are converted.
    with Image.open(img_rel_path) as image:
        return np.asarray(image)
def depth_read(filename):
    """Read a depth image and rescale its raw values by 1/1000.

    Args:
        filename: Path of the depth image, forwarded to ``_read_image``.

    Returns:
        A floating-point array holding the raw pixel values divided by 1000.
        NOTE(review): presumably this converts millimetres to metres; confirm
        against the dataset's depth encoding.
    """
    raw_depth = _read_image(filename)
    return raw_depth / 1000.0
def extract_scannet(
    root,
    sample_len=-1,
    csv_save_path="",
    datatset_name="",
    scene_number=16,
    scene_frames_len=120,
    stride=1,
    saved_rgb_dir="",
    saved_disp_dir="",
):
    """Export scenes as RGB videos plus depth arrays and index them in a CSV.

    Each scene is a sub-directory of ``root`` containing a ``color`` folder with
    ``<int>.jpg`` frames and a ``depth`` folder with ``.png`` files of the same
    stem. For every exported clip an ``.mp4`` (RGB) and an ``.npz`` (depth
    divided by 1000, stored under the key ``disparity``) are written, and one
    row is added to the CSV.

    Args:
        root: Directory whose sub-directories are the scenes.
        sample_len: Number of frames per exported clip. A non-positive value
            exports all selected frames of a scene as a single clip. Trailing
            frames that do not fill a whole clip are dropped.
        csv_save_path: Path of the CSV index to write.
        datatset_name: Name of the sub-directory created under the save
            directories; also the prefix of the paths stored in the CSV.
        scene_number: Maximum number of scenes (first ones in sorted order).
        scene_frames_len: Only frames with a sorted position below this value
            are considered.
        stride: Step between consecutive selected frames.
        saved_rgb_dir: Directory under which the RGB videos are written.
        saved_disp_dir: Directory under which the depth arrays are written.
    """
    # Only directories can be scenes; a stray file in `root` would otherwise
    # make the listing of its "color" folder fail.
    scenes_names = sorted(
        name for name in os.listdir(root) if osp.isdir(osp.join(root, name))
    )[:scene_number]

    # Join instead of "+" so a missing trailing slash on the save directories
    # does not silently yield a wrong directory name.
    data_root = osp.join(saved_rgb_dir, datatset_name)
    disp_root = osp.join(saved_disp_dir, datatset_name)
    for out_dir in (data_root, disp_root):
        if out_dir:  # os.makedirs("") raises; "" means the current directory
            os.makedirs(out_dir, exist_ok=True)

    all_samples = []
    for seq_name in tqdm(scenes_names):
        color_dir = osp.join(root, seq_name, "color")
        depth_dir = osp.join(root, seq_name, "depth")

        all_img_names = [x for x in os.listdir(color_dir) if x.endswith(".jpg")]
        # Frame files are named by integer index; sort numerically, not lexically.
        all_img_names.sort(key=lambda x: int(x.split(".")[0]))
        all_img_names = all_img_names[:scene_frames_len:stride]
        print(f"sequence frame number: {len(all_img_names)}")

        seq_len = len(all_img_names)
        if seq_len == 0:
            # Nothing to export; without this guard the clip length would be 0
            # and the clip arithmetic below would fail.
            continue

        step = sample_len if sample_len > 0 else seq_len
        num_clips = seq_len // step  # incomplete trailing clip is dropped
        for clip_idx in range(num_clips):
            ref_idx = clip_idx * step
            print(f"Progress: {seq_name}, {clip_idx + 1} / {num_clips}")

            video_imgs = []
            video_depths = []
            for img_name in all_img_names[ref_idx : ref_idx + step]:
                im_path = osp.join(color_dir, img_name)
                depth_path = osp.join(depth_dir, img_name[:-3] + "png")

                # Depth is stored as-is (no inversion) despite the "disp" naming.
                video_depths.append(depth_read(depth_path))
                with Image.open(im_path) as rgb_image:
                    video_imgs.append(np.array(rgb_image))

            disp_video = np.array(video_depths)[:, None]  # [T, 1, H, W]
            img_video = np.array(video_imgs)[..., 0:3]  # [T, H, W, rgb]

            # Fixed border crop: 8 px off top/bottom, 11 px off left/right.
            disp_video = disp_video[:, :, 8:-8, 11:-11]
            img_video = img_video[:, 8:-8, 11:-11, :]

            # A scene that yields several clips needs distinct file names,
            # otherwise every clip overwrites the previous one and the CSV
            # lists the same file repeatedly. Single-clip scenes keep the
            # plain scene name.
            if num_clips == 1:
                clip_name = seq_name
            else:
                clip_name = f"{seq_name}_clip{clip_idx:03d}"

            img_video_path = osp.join(data_root, f"{clip_name}_rgb_left.mp4")
            disp_video_path = osp.join(disp_root, f"{clip_name}_disparity.npz")

            imageio.mimsave(
                img_video_path, img_video, fps=15, quality=9, macro_block_size=1
            )
            np.savez(disp_video_path, disparity=disp_video)

            all_samples.append(
                {
                    "filepath_left": f"{datatset_name}/{clip_name}_rgb_left.mp4",
                    "filepath_disparity": f"{datatset_name}/{clip_name}_disparity.npz",
                }
            )

    csv_dir = osp.dirname(csv_save_path)
    if csv_dir:  # a bare file name has no directory component to create
        os.makedirs(csv_dir, exist_ok=True)

    fields = ["filepath_left", "filepath_disparity"]
    # newline="" is required by the csv module to avoid doubled line endings.
    with open(csv_save_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()
        writer.writerows(all_samples)

    print(f"{csv_save_path} has been saved.")
if __name__ == "__main__":
    # Script entry point: export the ScanNet test scans into the benchmark
    # dataset folder. Keywords follow the order of the function signature.
    extract_scannet(
        root="path/to/ScanNet_v2/raw/scans_test",
        # Non-positive sample_len: one clip per scene with all selected frames.
        sample_len=-1,
        csv_save_path="./benchmark/datasets/scannet.csv",
        datatset_name="scannet",
        scene_number=100,
        # First 270 frames sampled every 3rd frame: at most 90 frames per scene.
        scene_frames_len=90 * 3,
        stride=3,
        saved_rgb_dir="./benchmark/datasets/",
        saved_disp_dir="./benchmark/datasets/",
    )