nova_rerun_bridge
31class NovaRerunBridge: 32 """Bridge between Nova and Rerun for visualization. 33 34 This class provides functionality to visualize Nova data in Rerun. 35 It handles trajectoy, collision scenes, blueprints and proper cleanup of resources. 36 37 Example: 38 ```python 39 from nova.core.nova import Nova 40 from nova_rerun_bridge import NovaRerunBridge 41 42 async def main(): 43 nova = Nova() 44 async with NovaRerunBridge(nova) as bridge: 45 await bridge.setup_blueprint() 46 await bridge.log_collision_scenes() 47 48 ``` 49 50 Args: 51 nova (Nova): Instance of Nova client 52 spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True. 53 """ 54 55 def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None: 56 self._ensure_models_exist() 57 self.nova = nova 58 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 59 if spawn: 60 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 61 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 62 logger.add(sink=rr.LoggingHandler("logs/handler")) 63 64 def _ensure_models_exist(self): 65 """Ensure robot models are downloaded""" 66 models_dir = Path(get_project_root()) / "models" 67 if not models_dir.exists() or not list(models_dir.glob("*.glb")): 68 print("Models not found, run update_robot_models() or poetry run download-models") 69 70 async def setup_blueprint(self) -> None: 71 """Configure and send blueprint configuration to Rerun. 72 73 Fetches motion groups from Nova and configures visualization layout. 74 """ 75 cell = self.nova.cell() 76 77 controllers = await cell.controllers() 78 motion_groups = [] 79 80 if not controllers: 81 logger.warning("No controllers found") 82 return 83 84 for controller in controllers: 85 for motion_group in await controller.activated_motion_groups(): 86 motion_groups.append(motion_group.motion_group_id) 87 88 rr.reset_time() 89 rr.set_time_seconds(TIME_INTERVAL_NAME, 0) 90 91 send_blueprint(motion_groups) 92 self.log_coordinate_system() 93 94 def log_coordinate_system(self) -> None: 95 """Log the coordinate system of the cell.""" 96 97 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 98 coordinate_vectors = ( 99 np.array( 100 [ 101 [1.0, 0.0, 0.0], # X direction 102 [0.0, 1.0, 0.0], # Y direction 103 [0.0, 0.0, 1.0], # Z direction 104 ] 105 ) 106 * 200.0 107 ) # Scale factor of 200.0 for better visibility 108 109 coordinate_colors = np.array( 110 [ 111 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 112 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 113 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 114 ] 115 ) 116 117 rr.log( 118 "coordinate_system_world", 119 rr.Arrows3D( 120 origins=coordinate_origins, 121 vectors=coordinate_vectors, 122 colors=coordinate_colors, 123 radii=rr.Radius.ui_points([5.0]), 124 ), 125 static=True, 126 ) 127 128 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 129 """Fetch and log all collision scenes from Nova to Rerun.""" 130 collision_scenes = ( 131 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 132 cell=self.nova.cell()._cell_id 133 ) 134 ) 135 log_collision_scenes(collision_scenes) 136 return collision_scenes 137 138 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 139 """Log a specific collision scene by its ID. 140 141 Args: 142 scene_id (str): The ID of the collision scene to log 143 144 Raises: 145 ValueError: If scene_id is not found in stored collision scenes 146 """ 147 collision_scenes = ( 148 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 149 cell=self.nova.cell()._cell_id 150 ) 151 ) 152 153 if scene_id not in collision_scenes: 154 raise ValueError(f"Collision scene with ID {scene_id} not found") 155 156 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 157 return {scene_id: collision_scenes[scene_id]} 158 159 def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None: 160 log_collision_scenes(collision_scenes=collision_scenes) 161 162 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 163 tcp_names = await motion_group.tcp_names() 164 tcp = tcp_names[0] 165 166 rr.reset_time() 167 rr.set_time_seconds(TIME_INTERVAL_NAME, 0) 168 169 log_safety_zones( 170 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 171 ) 172 173 def log_saftey_zones_( 174 self, motion_group_id: str, optimizer_setup: models.OptimizerSetup 175 ) -> None: 176 log_safety_zones(motion_group_id, optimizer_setup) 177 178 async def log_motion( 179 self, 180 motion_id: str, 181 timing_mode=TimingMode.CONTINUE, 182 time_offset: float = 0, 183 tool_asset: str = None, 184 ) -> None: 185 # Fetch motion details from api 186 motion = await self.nova._api_client.motion_api.get_planned_motion( 187 self.nova.cell()._cell_id, motion_id 188 ) 189 optimizer_config = ( 190 await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration( 191 self.nova.cell()._cell_id, motion.motion_group 192 ) 193 ) 194 trajectory = await self.nova._api_client.motion_api.get_motion_trajectory( 195 self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 196 ) 197 198 motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups( 199 self.nova.cell()._cell_id 200 ) 201 motion_motion_group = next( 202 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None 203 ) 204 205 collision_scenes = ( 206 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 207 cell=self.nova.cell()._cell_id 208 ) 209 ) 210 211 if motion_motion_group is None: 212 raise ValueError(f"Motion group {motion.motion_group} not found") 213 214 log_motion( 215 motion_id=motion_id, 216 model_from_controller=motion_motion_group.model_from_controller, 217 motion_group=motion.motion_group, 218 optimizer_config=optimizer_config, 219 trajectory=trajectory.trajectory, 220 collision_scenes=collision_scenes, 221 time_offset=time_offset, 222 timing_mode=timing_mode, 223 tool_asset=tool_asset, 224 ) 225 226 async def log_trajectory( 227 self, 228 joint_trajectory: models.JointTrajectory, 229 tcp: str, 230 motion_group: MotionGroup, 231 timing_mode=TimingMode.CONTINUE, 232 time_offset: float = 0, 233 tool_asset: str = None, 234 ) -> None: 235 if len(joint_trajectory.joint_positions) == 0: 236 raise ValueError("No joint trajectory provided") 237 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 238 await self.log_motion( 239 load_plan_response.motion, 240 timing_mode=timing_mode, 241 time_offset=time_offset, 242 tool_asset=tool_asset, 243 ) 244 245 def continue_after_sync(self) -> None: 246 continue_after_sync() 247 248 async def log_error_feedback( 249 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 250 ) -> None: 251 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 252 if ( 253 error_feedback.actual_instance.invalid_tcp_pose 254 and error_feedback.actual_instance.invalid_tcp_pose.position 255 ): 256 position = error_feedback.actual_instance.invalid_tcp_pose.position 257 rr.log( 258 "motion/errors/FeedbackOutOfWorkspace", 259 rr.Points3D( 260 [[position[0], position[1], position[2]]], 261 radii=rr.Radius.ui_points([5.0]), 262 colors=[(255, 0, 0, 255)], 263 labels=["Out of Workspace"], 264 ), 265 static=True, 266 ) 267 268 async def start_streaming(self, motion_group: MotionGroup) -> None: 269 """Start streaming real-time robot state to Rerun viewer.""" 270 if motion_group in self._streaming_tasks: 271 return 272 273 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 274 self._streaming_tasks[motion_group] = task 275 276 async def stop_streaming(self) -> None: 277 """Stop all streaming tasks.""" 278 for task in self._streaming_tasks.values(): 279 task.cancel() 280 self._streaming_tasks.clear() 281 282 async def log_actions( 283 self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False 284 ) -> None: 285 from nova_rerun_bridge import trajectory 286 287 rr.reset_time() 288 rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time) 289 290 if not isinstance(actions, list): 291 actions = [actions] 292 293 if len(actions) == 0: 294 raise ValueError("No actions provided") 295 296 # Collect poses from regular actions 297 regular_actions = [ 298 action for action in actions if isinstance(action, (Motion, WriteAction)) 299 ] 300 regular_poses = ( 301 CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else [] 302 ) 303 304 # Collect poses from CollisionFreeMotion targets 305 collision_free_poses = [ 306 action.target 307 for action in actions 308 if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose) 309 ] 310 311 # Combine all poses 312 all_poses = regular_poses + collision_free_poses 313 positions = [] 314 point_colors = [] 315 use_red = False 316 317 # Collect all positions and determine colors 318 for i, action in enumerate(actions): 319 if isinstance(action, WriteAction): 320 use_red = True 321 if i < len(all_poses): # Only process if there's a corresponding pose 322 pose = all_poses[i] 323 logger.debug(f"Pose: {pose}") 324 positions.append([pose.position.x, pose.position.y, pose.position.z]) 325 point_colors.append(colors.colors[1] if use_red else colors.colors[9]) 326 327 # Log all positions at once 328 rr.log( 329 "motion/actions", 330 rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])), 331 static=True, 332 ) 333 334 if show_connection: 335 rr.log( 336 "motion/actions/connection", 337 rr.LineStrips3D([positions], colors=[155, 155, 155, 50]), 338 ) 339 340 async def __aenter__(self) -> "NovaRerunBridge": 341 """Context manager entry point. 342 343 Returns: 344 NovaRerunBridge: Self reference for context manager usage. 345 """ 346 return self 347 348 async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: 349 """Context manager exit point, ensures cleanup.""" 350 await self.cleanup() 351 352 async def cleanup(self) -> None: 353 """Cleanup resources and close Nova API client connection.""" 354 if hasattr(self.nova, "_api_client"): 355 await self.nova._api_client.close()
Bridge between Nova and Rerun for visualization.
This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
Example:
from nova.core.nova import Nova
from nova_rerun_bridge import NovaRerunBridge
async def main():
nova = Nova()
async with NovaRerunBridge(nova) as bridge:
await bridge.setup_blueprint()
await bridge.log_collision_scenes()
Args: nova (Nova): Instance of Nova client spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
55 def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None: 56 self._ensure_models_exist() 57 self.nova = nova 58 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 59 if spawn: 60 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 61 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 62 logger.add(sink=rr.LoggingHandler("logs/handler"))
70 async def setup_blueprint(self) -> None: 71 """Configure and send blueprint configuration to Rerun. 72 73 Fetches motion groups from Nova and configures visualization layout. 74 """ 75 cell = self.nova.cell() 76 77 controllers = await cell.controllers() 78 motion_groups = [] 79 80 if not controllers: 81 logger.warning("No controllers found") 82 return 83 84 for controller in controllers: 85 for motion_group in await controller.activated_motion_groups(): 86 motion_groups.append(motion_group.motion_group_id) 87 88 rr.reset_time() 89 rr.set_time_seconds(TIME_INTERVAL_NAME, 0) 90 91 send_blueprint(motion_groups) 92 self.log_coordinate_system()
Configure and send blueprint configuration to Rerun.
Fetches motion groups from Nova and configures visualization layout.
94 def log_coordinate_system(self) -> None: 95 """Log the coordinate system of the cell.""" 96 97 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 98 coordinate_vectors = ( 99 np.array( 100 [ 101 [1.0, 0.0, 0.0], # X direction 102 [0.0, 1.0, 0.0], # Y direction 103 [0.0, 0.0, 1.0], # Z direction 104 ] 105 ) 106 * 200.0 107 ) # Scale factor of 200.0 for better visibility 108 109 coordinate_colors = np.array( 110 [ 111 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 112 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 113 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 114 ] 115 ) 116 117 rr.log( 118 "coordinate_system_world", 119 rr.Arrows3D( 120 origins=coordinate_origins, 121 vectors=coordinate_vectors, 122 colors=coordinate_colors, 123 radii=rr.Radius.ui_points([5.0]), 124 ), 125 static=True, 126 )
Log the coordinate system of the cell.
128 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 129 """Fetch and log all collision scenes from Nova to Rerun.""" 130 collision_scenes = ( 131 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 132 cell=self.nova.cell()._cell_id 133 ) 134 ) 135 log_collision_scenes(collision_scenes) 136 return collision_scenes
Fetch and log all collision scenes from Nova to Rerun.
138 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 139 """Log a specific collision scene by its ID. 140 141 Args: 142 scene_id (str): The ID of the collision scene to log 143 144 Raises: 145 ValueError: If scene_id is not found in stored collision scenes 146 """ 147 collision_scenes = ( 148 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 149 cell=self.nova.cell()._cell_id 150 ) 151 ) 152 153 if scene_id not in collision_scenes: 154 raise ValueError(f"Collision scene with ID {scene_id} not found") 155 156 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 157 return {scene_id: collision_scenes[scene_id]}
Log a specific collision scene by its ID.
Args: scene_id (str): The ID of the collision scene to log
Raises: ValueError: If scene_id is not found in stored collision scenes
162 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 163 tcp_names = await motion_group.tcp_names() 164 tcp = tcp_names[0] 165 166 rr.reset_time() 167 rr.set_time_seconds(TIME_INTERVAL_NAME, 0) 168 169 log_safety_zones( 170 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 171 )
178 async def log_motion( 179 self, 180 motion_id: str, 181 timing_mode=TimingMode.CONTINUE, 182 time_offset: float = 0, 183 tool_asset: str = None, 184 ) -> None: 185 # Fetch motion details from api 186 motion = await self.nova._api_client.motion_api.get_planned_motion( 187 self.nova.cell()._cell_id, motion_id 188 ) 189 optimizer_config = ( 190 await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration( 191 self.nova.cell()._cell_id, motion.motion_group 192 ) 193 ) 194 trajectory = await self.nova._api_client.motion_api.get_motion_trajectory( 195 self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 196 ) 197 198 motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups( 199 self.nova.cell()._cell_id 200 ) 201 motion_motion_group = next( 202 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None 203 ) 204 205 collision_scenes = ( 206 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 207 cell=self.nova.cell()._cell_id 208 ) 209 ) 210 211 if motion_motion_group is None: 212 raise ValueError(f"Motion group {motion.motion_group} not found") 213 214 log_motion( 215 motion_id=motion_id, 216 model_from_controller=motion_motion_group.model_from_controller, 217 motion_group=motion.motion_group, 218 optimizer_config=optimizer_config, 219 trajectory=trajectory.trajectory, 220 collision_scenes=collision_scenes, 221 time_offset=time_offset, 222 timing_mode=timing_mode, 223 tool_asset=tool_asset, 224 )
226 async def log_trajectory( 227 self, 228 joint_trajectory: models.JointTrajectory, 229 tcp: str, 230 motion_group: MotionGroup, 231 timing_mode=TimingMode.CONTINUE, 232 time_offset: float = 0, 233 tool_asset: str = None, 234 ) -> None: 235 if len(joint_trajectory.joint_positions) == 0: 236 raise ValueError("No joint trajectory provided") 237 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 238 await self.log_motion( 239 load_plan_response.motion, 240 timing_mode=timing_mode, 241 time_offset=time_offset, 242 tool_asset=tool_asset, 243 )
248 async def log_error_feedback( 249 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 250 ) -> None: 251 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 252 if ( 253 error_feedback.actual_instance.invalid_tcp_pose 254 and error_feedback.actual_instance.invalid_tcp_pose.position 255 ): 256 position = error_feedback.actual_instance.invalid_tcp_pose.position 257 rr.log( 258 "motion/errors/FeedbackOutOfWorkspace", 259 rr.Points3D( 260 [[position[0], position[1], position[2]]], 261 radii=rr.Radius.ui_points([5.0]), 262 colors=[(255, 0, 0, 255)], 263 labels=["Out of Workspace"], 264 ), 265 static=True, 266 )
268 async def start_streaming(self, motion_group: MotionGroup) -> None: 269 """Start streaming real-time robot state to Rerun viewer.""" 270 if motion_group in self._streaming_tasks: 271 return 272 273 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 274 self._streaming_tasks[motion_group] = task
Start streaming real-time robot state to Rerun viewer.
276 async def stop_streaming(self) -> None: 277 """Stop all streaming tasks.""" 278 for task in self._streaming_tasks.values(): 279 task.cancel() 280 self._streaming_tasks.clear()
Stop all streaming tasks.
282 async def log_actions( 283 self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False 284 ) -> None: 285 from nova_rerun_bridge import trajectory 286 287 rr.reset_time() 288 rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time) 289 290 if not isinstance(actions, list): 291 actions = [actions] 292 293 if len(actions) == 0: 294 raise ValueError("No actions provided") 295 296 # Collect poses from regular actions 297 regular_actions = [ 298 action for action in actions if isinstance(action, (Motion, WriteAction)) 299 ] 300 regular_poses = ( 301 CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else [] 302 ) 303 304 # Collect poses from CollisionFreeMotion targets 305 collision_free_poses = [ 306 action.target 307 for action in actions 308 if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose) 309 ] 310 311 # Combine all poses 312 all_poses = regular_poses + collision_free_poses 313 positions = [] 314 point_colors = [] 315 use_red = False 316 317 # Collect all positions and determine colors 318 for i, action in enumerate(actions): 319 if isinstance(action, WriteAction): 320 use_red = True 321 if i < len(all_poses): # Only process if there's a corresponding pose 322 pose = all_poses[i] 323 logger.debug(f"Pose: {pose}") 324 positions.append([pose.position.x, pose.position.y, pose.position.z]) 325 point_colors.append(colors.colors[1] if use_red else colors.colors[9]) 326 327 # Log all positions at once 328 rr.log( 329 "motion/actions", 330 rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])), 331 static=True, 332 ) 333 334 if show_connection: 335 rr.log( 336 "motion/actions/connection", 337 rr.LineStrips3D([positions], colors=[155, 155, 155, 50]), 338 )