|
| 1 | +"""Converts dynamic object annotations to KITTI format.""" |
| 2 | +import argparse |
| 3 | +import glob |
| 4 | +import json |
| 5 | +import os |
| 6 | +from datetime import datetime |
| 7 | +from os.path import join, basename |
| 8 | +from typing import List, Callable |
| 9 | + |
| 10 | +import numpy as np |
| 11 | +from pyquaternion import Quaternion |
| 12 | +from tqdm import tqdm |
| 13 | + |
| 14 | +from calibration import ( |
| 15 | + load_calib_from_json, |
| 16 | + get_3d_transform_camera_lidar, |
| 17 | + rigid_transform_3d, |
| 18 | + transform_rotation, |
| 19 | +) |
| 20 | +from constants import TIME_FORMAT, SIZE, LOCATION, ROTATION |
| 21 | +from plot_objects_on_image import ObjectAnnotationHandler |
| 22 | + |
# Sensor image size in pixels; used to clip 2D boxes to the visible frame.
IMAGE_DIMS = np.array([3848, 2168])  # width, height

# Maps the source occlusion label onto a coarse KITTI-style integer level.
OCCLUSION_MAP = {
    "None": 0,
    "Light": 1,
    "Medium": 1,
    "Heavy": 2,
    "VeryHeavy": 2,
    "Undefined": 2,  # If undefined we assume the worst
}
| 33 | + |
| 34 | + |
def _parse_class(obj_properties):
    """Map raw annotation properties onto a KITTI class name.

    Returns the KITTI class string, "DontCare" for objects that should be
    kept only as ignore regions, or None for objects to drop entirely.
    """
    label = obj_properties["class"]
    # Drop Animals, Debris, Movers and any other unwanted classes.
    if label not in ("VulnerableVehicle", "Vehicle", "Pedestrian"):
        return None
    # Unclear or inconclusive objects become ignore regions.
    if obj_properties["unclear"] or obj_properties["object_type"] == "Inconclusive":
        return "DontCare"
    if label == "VulnerableVehicle":
        # Two-wheelers without a rider are removed altogether.
        if obj_properties.get("with_rider", "True") == "False":
            return None
        # Only bicyclists and motorbicyclists map to the KITTI Cyclist class;
        # every other vulnerable-vehicle type is ignored.
        if obj_properties["object_type"] in ("Bicycle", "Motorcycle"):
            return "Cyclist"
        return "DontCare"
    if label == "Vehicle":
        # Ignore more exotic vehicle classes (HeavyEquip, TramTrain, Other).
        if obj_properties["object_type"] not in ("Car", "Van", "Truck", "Trailer", "Bus"):
            return "DontCare"
    # Pedestrians and ordinary vehicles pass through unchanged.
    return label
| 60 | + |
| 61 | + |
def _convert_to_kitti(
    objects: List[ObjectAnnotationHandler], yaw_func: Callable[[Quaternion], float]
) -> List[str]:
    """Render each kept object as one space-separated KITTI label line.

    Objects whose class maps to None are dropped; objects without a 3D
    marking get zeroed 3D fields. ``yaw_func`` extracts the yaw angle from
    the object's rotation quaternion.
    """
    kitti_lines = []
    for obj in objects:
        class_name = _parse_class(obj.properties)
        if class_name is None:
            # Class is not exported at all.
            continue
        truncation, xmax, xmin, ymax, ymin = _parse_bbox_2d(obj.outer_points)
        if obj.marking3d is None:
            size = [0, 0, 0]
            location = [0, 0, 0]
            yaw = 0
            alpha = 0
        else:
            size = obj.marking3d[SIZE][::-1]  # H,W,L not L,W,H
            location = obj.marking3d[LOCATION]  # x,y,z
            yaw = yaw_func(obj.marking3d[ROTATION])
            alpha = 0  # TODO: calculate this!
        # Real objects are expected to carry an occlusion label; warn if not.
        if class_name != "DontCare" and "occlusion_ratio" not in obj.properties:
            print("Missing occlusion for obj: ", obj)
        occlusion = OCCLUSION_MAP[obj.properties.get("occlusion_ratio", "Undefined")]
        fields = [class_name, truncation, occlusion, alpha, xmin, ymin, xmax, ymax]
        fields.extend(size)
        fields.extend(location)
        fields.append(yaw)
        kitti_lines.append(" ".join(str(field) for field in fields))
    return kitti_lines
| 100 | + |
| 101 | + |
def _parse_bbox_2d(outer_points):
    """Compute an image-clipped 2D box and its truncation ratio.

    ``outer_points`` is an (N, 2) array of x,y pixel coordinates. Returns
    ``(truncation, xmax, xmin, ymax, ymin)`` where truncation is the
    fraction of the raw box area that falls outside the image.
    """
    raw_min = np.min(outer_points, axis=0)
    raw_max = np.max(outer_points, axis=0)
    xmin, ymin = np.clip(raw_min, a_min=0, a_max=IMAGE_DIMS)
    xmax, ymax = np.clip(raw_max, a_min=0, a_max=IMAGE_DIMS)
    clipped_area = (xmax - xmin) * (ymax - ymin)
    full_area = (raw_max[0] - raw_min[0]) * (raw_max[1] - raw_min[1])
    # Guard against degenerate boxes to avoid dividing by (near) zero.
    truncation = 1 - clipped_area / full_area if full_area > 0.1 else 0
    return truncation, xmax, xmin, ymax, ymin
| 111 | + |
| 112 | + |
def _lidar_to_camera(objects, calib):
    """Transform each object's 3D marking from the LIDAR to the camera frame.

    Objects without a 3D marking are left untouched. Mutates the objects in
    place and also returns the list for convenience.
    """
    # The calibration transform is identical for every object in the frame;
    # compute it once instead of once per object (was inside the loop).
    transform = get_3d_transform_camera_lidar(calib)
    for obj in objects:
        if obj.marking3d is None:
            continue
        obj.marking3d[ROTATION] = transform_rotation(obj.marking3d[ROTATION], transform)
        obj.marking3d[LOCATION] = rigid_transform_3d(obj.marking3d[LOCATION], transform)
    return objects
| 121 | + |
| 122 | + |
def convert_annotation(calib_path, src_anno_pth, target_path):
    """Convert one source annotation JSON file to a KITTI label file.

    The source filename must look like ``<vehicle>_<camera>_<time>_<id>.json``;
    the result is written to ``<target_path>/<id>.txt`` (id zero-padded to
    six digits).
    """
    with open(src_anno_pth) as anno_file:
        src_anno = json.load(anno_file)
    # BUG FIX: the previous `src_anno_pth.strip(".json")` removed any
    # leading/trailing characters from the set {. j s o n} rather than the
    # ".json" suffix, mangling names that start/end with those characters.
    stem = os.path.splitext(basename(src_anno_pth))[0]
    vehicle, camera_name, time_str, id_ = stem.split("_")
    id_ = int(id_)
    objects = ObjectAnnotationHandler.from_annotations(src_anno)
    objects = [obj[2] for obj in objects]

    # Convert objects from LIDAR to camera using calibration information
    frame_time = datetime.strptime(time_str, TIME_FORMAT)
    calib = load_calib_from_json(calib_path, vehicle, frame_time, camera_name)
    objects = _lidar_to_camera(objects, calib)
    # Write a KITTI-style annotation with objects in the camera frame.
    # Negated yaw: presumably compensates for the camera-frame axis
    # convention — TODO confirm against the calibration code.
    target_anno = _convert_to_kitti(objects, yaw_func=lambda rot: -rot.yaw_pitch_roll[0])
    with open(join(target_path, f"{id_:06d}.txt"), "w") as target_file:
        target_file.write("\n".join(target_anno))
| 139 | + |
| 140 | + |
def _parse_args():
    """Build and evaluate the command-line interface for the converter."""
    parser = argparse.ArgumentParser(description="Convert annotations to KITTI format")
    for flag, help_text in (
        ("--dataset-dir", "Root dataset directory"),
        ("--target-dir", "Output directory"),
    ):
        parser.add_argument(flag, required=True, help=help_text)
    return parser.parse_args()
| 146 | + |
| 147 | + |
def main():
    """Convert every dynamic-object annotation in the dataset to KITTI format."""
    args = _parse_args()
    calib_path = join(args.dataset_dir, "calibration")
    source_path = join(args.dataset_dir, "annotations", "dynamic_objects")
    # Explicit validation instead of `assert`, which is stripped under -O.
    # NOTE(review): substring containment is a weak guard against writing
    # into the dataset — path normalization would be stricter.
    if args.dataset_dir in args.target_dir:
        raise ValueError("Do not write to the dataset")

    print("Looking up all source annotations...")
    source_anno_paths = glob.glob(f"{source_path}/*/*.json")

    # Create target directories
    os.makedirs(args.target_dir, exist_ok=True)

    for src_anno_pth in tqdm(source_anno_paths, desc="Converting annotations..."):
        try:
            convert_annotation(calib_path, src_anno_pth, args.target_dir)
        except Exception as err:
            # Name the offending file before re-raising with full traceback.
            print("Failed converting annotation: ", src_anno_pth, "with error:", str(err))
            raise
| 166 | + |
| 167 | + |
# Script entry point: run the full conversion when executed directly.
if __name__ == "__main__":
    main()
0 commit comments