7
7
import pathlib
8
8
from typing import Optional
9
9
10
- from PIL import Image
11
10
import numpy as np
12
11
import rerun as rr # pip install rerun-sdk
13
12
import scipy .spatial .transform as st
14
13
import trimesh
14
+ import trimesh .visual
15
+ from PIL import Image
15
16
from urdf_parser_py import urdf as urdf_parser
16
17
17
18
18
19
class URDFLogger :
19
20
"""Class to log a URDF to Rerun."""
20
21
21
- def __init__ (self , filepath : str ) -> None :
22
+ def __init__ (self , filepath : str , entity_path_prefix : Optional [ str ] ) -> None :
22
23
self .urdf = urdf_parser .URDF .from_xml_file (filepath )
24
+ self .entity_path_prefix = entity_path_prefix
23
25
self .mat_name_to_mat = {mat .name : mat for mat in self .urdf .materials }
24
26
25
27
def link_entity_path (self , link : urdf_parser .Link ) -> str :
26
28
"""Return the entity path for the URDF link."""
27
29
root_name = self .urdf .get_root ()
28
30
link_names = self .urdf .get_chain (root_name , link .name )[0 ::2 ] # skip the joints
29
- return "/" .join (link_names )
31
+ return self . add_entity_path_prefix ( "/" .join (link_names ) )
30
32
31
33
def joint_entity_path (self , joint : urdf_parser .Joint ) -> str :
32
34
"""Return the entity path for the URDF joint."""
33
35
root_name = self .urdf .get_root ()
34
- link_names = self .urdf .get_chain (root_name , joint .child )[0 ::2 ] # skip the joints
35
- return "/" .join (link_names )
36
+ joint_names = self .urdf .get_chain (root_name , joint .child )[0 ::2 ] # skip the links
37
+ return self .add_entity_path_prefix ("/" .join (joint_names ))
38
+
39
+ def add_entity_path_prefix (self , entity_path : str ) -> str :
40
+ """Add prefix (if passed) to entity path."""
41
+ if self .entity_path_prefix is not None :
42
+ return f"{ self .entity_path_prefix } /{ entity_path } "
43
+ return entity_path
36
44
37
45
def log (self ) -> None :
38
46
"""Log a URDF file to Rerun."""
39
- rr .log ("" , rr .ViewCoordinates .RIGHT_HAND_Z_UP , timeless = True ) # default ROS convention
40
-
41
47
for joint in self .urdf .joints :
42
48
entity_path = self .joint_entity_path (joint )
43
49
self .log_joint (entity_path , joint )
@@ -47,11 +53,13 @@ def log(self) -> None:
47
53
self .log_link (entity_path , link )
48
54
49
55
def log_link (self , entity_path : str , link : urdf_parser .Link ) -> None :
56
+ """Log a URDF link to Rerun."""
50
57
# create one mesh out of all visuals
51
58
for i , visual in enumerate (link .visuals ):
52
59
self .log_visual (entity_path + f"/visual_{ i } " , visual )
53
60
54
61
def log_joint (self , entity_path : str , joint : urdf_parser .Joint ) -> None :
62
+ """Log a URDF joint to Rerun."""
55
63
translation = rotation = None
56
64
57
65
if joint .origin is not None and joint .origin .xyz is not None :
@@ -63,6 +71,7 @@ def log_joint(self, entity_path: str, joint: urdf_parser.Joint) -> None:
63
71
rr .log (entity_path , rr .Transform3D (translation = translation , mat3x3 = rotation ))
64
72
65
73
def log_visual (self , entity_path : str , visual : urdf_parser .Visual ) -> None :
74
+ """Log a URDF visual to Rerun."""
66
75
material = None
67
76
if visual .material is not None :
68
77
if visual .material .color is None and visual .material .texture is None :
@@ -106,15 +115,15 @@ def log_visual(self, entity_path: str, visual: urdf_parser.Visual) -> None:
106
115
if isinstance (mesh_or_scene , trimesh .Scene ):
107
116
scene = mesh_or_scene
108
117
# use dump to apply scene graph transforms and get a list of transformed meshes
109
- for i , mesh in enumerate (scene . dump ( )):
118
+ for i , mesh in enumerate (scene_to_trimeshes ( scene )):
110
119
if material is not None :
111
120
if material .color is not None :
112
121
mesh .visual = trimesh .visual .ColorVisuals ()
113
122
mesh .visual .vertex_colors = material .color .rgba
114
123
elif material .texture is not None :
115
124
texture_path = resolve_ros_path (material .texture .filename )
116
125
mesh .visual = trimesh .visual .texture .TextureVisuals (image = Image .open (texture_path ))
117
- log_trimesh (entity_path + f"/{ i } " , mesh )
126
+ log_trimesh (entity_path + f"/{ i } " , mesh )
118
127
else :
119
128
mesh = mesh_or_scene
120
129
if material is not None :
@@ -127,33 +136,46 @@ def log_visual(self, entity_path: str, visual: urdf_parser.Visual) -> None:
127
136
log_trimesh (entity_path , mesh )
128
137
129
138
139
+ def scene_to_trimeshes (scene : trimesh .Scene ) -> list [trimesh .Trimesh ]:
140
+ """
141
+ Convert a trimesh.Scene to a list of trimesh.Trimesh.
142
+
143
+ Skips objects that are not an instance of trimesh.Trimesh.
144
+ """
145
+ trimeshes = []
146
+ scene_dump = scene .dump ()
147
+ geometries = [scene_dump ] if not isinstance (scene_dump , list ) else scene_dump
148
+ for geometry in geometries :
149
+ if isinstance (geometry , trimesh .Trimesh ):
150
+ trimeshes .append (geometry )
151
+ elif isinstance (geometry , trimesh .Scene ):
152
+ trimeshes .extend (scene_to_trimeshes (geometry ))
153
+ return trimeshes
154
+
155
+
130
156
def log_trimesh (entity_path : str , mesh : trimesh .Trimesh ) -> None :
131
157
vertex_colors = albedo_texture = vertex_texcoords = None
132
158
133
159
if isinstance (mesh .visual , trimesh .visual .color .ColorVisuals ):
134
160
vertex_colors = mesh .visual .vertex_colors
135
161
elif isinstance (mesh .visual , trimesh .visual .texture .TextureVisuals ):
136
- albedo_texture = mesh .visual .material .baseColorTexture
137
- if len (np .asarray (albedo_texture ).shape ) == 2 :
138
- # If the texture is grayscale, we need to convert it to RGB.
139
- albedo_texture = np .stack ([albedo_texture ] * 3 , axis = - 1 )
140
- vertex_texcoords = mesh .visual .uv
141
- # Trimesh uses the OpenGL convention for UV coordinates, so we need to flip the V coordinate
142
- # since Rerun uses the Vulkan/Metal/DX12/WebGPU convention.
143
- if vertex_texcoords is not None :
162
+ trimesh_material = mesh .visual .material
163
+
164
+ if mesh .visual .uv is not None :
165
+ vertex_texcoords = mesh .visual .uv
166
+ # Trimesh uses the OpenGL convention for UV coordinates, so we need to flip the V coordinate
167
+ # since Rerun uses the Vulkan/Metal/DX12/WebGPU convention.
144
168
vertex_texcoords [:, 1 ] = 1.0 - vertex_texcoords [:, 1 ]
145
- else :
146
- # Neither simple color nor texture, so we'll try to retrieve vertex colors via trimesh.
147
- try :
148
- colors = mesh . visual . to_color (). vertex_colors
149
- if len ( colors ) == 4 :
150
- # If trimesh gives us a single vertex color for the entire mesh, we can interpret that
151
- # as an albedo factor for the whole primitive.
152
- mesh_material = Material ( albedo_factor = np . array ( colors ) )
169
+
170
+ if isinstance ( trimesh_material , trimesh .visual . material . PBRMaterial ):
171
+ if trimesh_material . baseColorTexture is not None :
172
+ albedo_texture = pil_image_to_albedo_texture ( trimesh_material . baseColorTexture )
173
+
174
+ elif isinstance ( trimesh_material , trimesh . visual . material . SimpleMaterial ):
175
+ if trimesh_material . image is not None :
176
+ albedo_texture = pil_image_to_albedo_texture ( trimesh_material . image )
153
177
else :
154
- vertex_colors = colors
155
- except Exception :
156
- pass
178
+ vertex_colors = mesh .visual .to_color ().vertex_colors
157
179
158
180
rr .log (
159
181
entity_path ,
@@ -169,10 +191,10 @@ def log_trimesh(entity_path: str, mesh: trimesh.Trimesh) -> None:
169
191
)
170
192
171
193
172
- def resolve_ros_path (path : str ) -> str :
194
+ def resolve_ros_path (path_str : str ) -> str :
173
195
"""Resolve a ROS path to an absolute path."""
174
- if path .startswith ("package://" ):
175
- path = pathlib .Path (path )
196
+ if path_str .startswith ("package://" ):
197
+ path = pathlib .Path (path_str )
176
198
package_name = path .parts [1 ]
177
199
relative_path = pathlib .Path (* path .parts [2 :])
178
200
@@ -185,10 +207,10 @@ def resolve_ros_path(path: str) -> str:
185
207
)
186
208
187
209
return str (package_path / relative_path )
188
- elif str ( path ) .startswith ("file://" ):
189
- return path [len ("file://" ) :]
210
+ elif path_str .startswith ("file://" ):
211
+ return path_str [len ("file://" ) :]
190
212
else :
191
- return path
213
+ return path_str
192
214
193
215
194
216
def resolve_ros2_package (package_name : str ) -> Optional [str ]:
@@ -203,7 +225,7 @@ def resolve_ros2_package(package_name: str) -> Optional[str]:
203
225
return None
204
226
205
227
206
- def resolve_ros1_package (package_name : str ) -> str :
228
+ def resolve_ros1_package (package_name : str ) -> Optional [ str ] :
207
229
try :
208
230
import rospkg
209
231
@@ -215,46 +237,103 @@ def resolve_ros1_package(package_name: str) -> str:
215
237
return None
216
238
217
239
218
- # The Rerun Viewer will always pass these two pieces of information:
219
- # 1. The path to be loaded, as a positional arg.
220
- # 2. A shared recording ID, via the `--recording-id` flag.
221
- #
222
- # It is up to you whether you make use of that shared recording ID or not.
223
- # If you use it, the data will end up in the same recording as all other plugins interested in
224
- # that file, otherwise you can just create a dedicated recording for it. Or both.
225
- parser = argparse .ArgumentParser (
226
- description = """
227
- This is an example executable data-loader plugin for the Rerun Viewer.
228
- Any executable on your `$PATH` with a name that starts with `rerun-loader-` will be
229
- treated as an external data-loader.
230
-
231
- This example will load URDF files, logs them to Rerun,
232
- and returns a special exit code to indicate that it doesn't support anything else.
233
-
234
- To try it out, copy it in your $PATH as `rerun-loader-python-example-urdf`,
235
- then open a URDF file with Rerun (`rerun example.urdf`).
236
- """
237
- )
238
- parser .add_argument ("filepath" , type = str )
239
- parser .add_argument ("--recording-id" , type = str )
240
- args = parser .parse_args ()
240
+ def pil_image_to_albedo_texture (image : Image .Image ) -> np .ndarray :
241
+ """Convert a PIL image to an albedo texture."""
242
+ albedo_texture = np .asarray (image )
243
+ if albedo_texture .ndim == 2 :
244
+ # If the texture is grayscale, we need to convert it to RGB since
245
+ # Rerun expects a 3-channel texture.
246
+ # See: https://github.com/rerun-io/rerun/issues/4878
247
+ albedo_texture = np .stack ([albedo_texture ] * 3 , axis = - 1 )
248
+ return albedo_texture
241
249
242
250
243
251
def main () -> None :
252
+ # The Rerun Viewer will always pass these two pieces of information:
253
+ # 1. The path to be loaded, as a positional arg.
254
+ # 2. A shared recording ID, via the `--recording-id` flag.
255
+ #
256
+ # It is up to you whether you make use of that shared recording ID or not.
257
+ # If you use it, the data will end up in the same recording as all other plugins interested in
258
+ # that file, otherwise you can just create a dedicated recording for it. Or both.
259
+ parser = argparse .ArgumentParser (
260
+ description = """
261
+ This is an example executable data-loader plugin for the Rerun Viewer.
262
+ Any executable on your `$PATH` with a name that starts with `rerun-loader-` will be
263
+ treated as an external data-loader.
264
+
265
+ This example will load URDF files, logs them to Rerun,
266
+ and returns a special exit code to indicate that it doesn't support anything else.
267
+
268
+ To try it out, copy it in your $PATH as `rerun-loader-python-example-urdf`,
269
+ then open a URDF file with Rerun (`rerun example.urdf`).
270
+ """
271
+ )
272
+ parser .add_argument ("filepath" , type = str )
273
+
274
+ parser .add_argument ("--application-id" , type = str , help = "optional recommended ID for the application" )
275
+ parser .add_argument ("--recording-id" , type = str , help = "optional recommended ID for the recording" )
276
+ parser .add_argument ("--entity-path-prefix" , type = str , help = "optional prefix for all entity paths" )
277
+ parser .add_argument (
278
+ "--timeless" , action = "store_true" , default = False , help = "optionally mark data to be logged as timeless"
279
+ )
280
+ parser .add_argument (
281
+ "--time" ,
282
+ type = str ,
283
+ action = "append" ,
284
+ help = "optional timestamps to log at (e.g. `--time sim_time=1709203426`)" ,
285
+ )
286
+ parser .add_argument (
287
+ "--sequence" ,
288
+ type = str ,
289
+ action = "append" ,
290
+ help = "optional sequences to log at (e.g. `--sequence sim_frame=42`)" ,
291
+ )
292
+ args = parser .parse_args ()
293
+
244
294
is_file = os .path .isfile (args .filepath )
245
295
is_urdf_file = ".urdf" in args .filepath
246
296
247
297
# Inform the Rerun Viewer that we do not support that kind of file.
248
298
if not is_file or not is_urdf_file :
249
299
exit (rr .EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE )
250
300
251
- rr .init ("rerun_example_external_data_loader_urdf" , recording_id = args .recording_id )
301
+ if args .application_id is not None :
302
+ app_id = args .application_id
303
+ else :
304
+ app_id = args .filepath
305
+
306
+ rr .init (app_id , recording_id = args .recording_id )
252
307
# The most important part of this: log to standard output so the Rerun Viewer can ingest it!
253
308
rr .stdout ()
254
309
255
- urdf_logger = URDFLogger (args .filepath )
310
+ set_time_from_args (args )
311
+
312
+ if args .entity_path_prefix is not None :
313
+ prefix = args .entity_path_prefix
314
+ else :
315
+ prefix = os .path .basename (args .filepath )
316
+
317
+ urdf_logger = URDFLogger (args .filepath , prefix )
256
318
urdf_logger .log ()
257
319
258
320
321
+ def set_time_from_args (args ) -> None :
322
+ if not args .timeless and args .time is not None :
323
+ for time_str in args .time :
324
+ parts = time_str .split ("=" )
325
+ if len (parts ) != 2 :
326
+ continue
327
+ timeline_name , time = parts
328
+ rr .set_time_nanos (timeline_name , int (time ))
329
+
330
+ for time_str in args .sequence :
331
+ parts = time_str .split ("=" )
332
+ if len (parts ) != 2 :
333
+ continue
334
+ timeline_name , time = parts
335
+ rr .set_time_sequence (timeline_name , int (time ))
336
+
337
+
259
338
if __name__ == "__main__" :
260
339
main ()
0 commit comments