nova_rerun_bridge
34class NovaRerunBridge: 35 """Bridge between Nova and Rerun for visualization. 36 37 This class provides functionality to visualize Nova data in Rerun. 38 It handles trajectoy, collision scenes, blueprints and proper cleanup of resources. 39 40 Example: 41 ```python 42 from nova.core.nova import Nova 43 from nova_rerun_bridge import NovaRerunBridge 44 45 async def main(): 46 nova = Nova() 47 async with NovaRerunBridge(nova) as bridge: 48 await bridge.setup_blueprint() 49 await bridge.log_collision_scenes() 50 51 ``` 52 53 Args: 54 nova (Nova): Instance of Nova client 55 spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True. 56 """ 57 58 def __init__( 59 self, 60 nova: Nova, 61 spawn: bool = True, 62 recording_id=None, 63 show_details: bool = True, 64 show_collision_link_chain: bool = False, 65 show_safety_link_chain: bool = True, 66 ) -> None: 67 self._ensure_models_exist() 68 # Store the original nova instance for initial setup and to copy connection parameters 69 self.nova = nova 70 # Create a separate Nova instance for the bridge to use for API calls 71 self._bridge_nova = None 72 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 73 # Track timing per motion group - each motion group has its own timeline 74 self._motion_group_timers: dict[str, float] = {} 75 self.show_details = show_details 76 self.show_collision_link_chain = show_collision_link_chain 77 self.show_safety_link_chain = show_safety_link_chain 78 79 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 80 81 if "VSCODE_PROXY_URI" in os.environ: 82 rr.init(application_id="nova", recording_id=recording_id, spawn=False) 83 rr.save("nova.rrd") 84 logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}") 85 elif spawn: 86 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 87 88 logger.add(sink=rr.LoggingHandler("logs/handler")) 89 90 def _ensure_models_exist(self): 91 """Ensure robot models are downloaded""" 92 models_dir = Path(get_project_root()) / "models" 93 if not models_dir.exists() or not list(models_dir.glob("*.glb")): 94 print("Models not found, run update_robot_models() or uv run download-models") 95 96 async def setup_blueprint(self) -> None: 97 """Configure and send blueprint configuration to Rerun. 98 99 Fetches motion groups from Nova and configures visualization layout. 100 """ 101 nova = self._bridge_nova or self.nova 102 cell = nova.cell() 103 104 controllers = await cell.controllers() 105 motion_groups = [] 106 107 if not controllers: 108 logger.warning("No controllers found") 109 return 110 111 for controller in controllers: 112 for motion_group in await controller.activated_motion_groups(): 113 motion_groups.append(motion_group.motion_group_id) 114 115 rr.reset_time() 116 rr.set_time(TIME_INTERVAL_NAME, duration=0) 117 118 send_blueprint(motion_groups, self.show_details) 119 self.log_coordinate_system() 120 121 def log_coordinate_system(self) -> None: 122 """Log the coordinate system of the cell.""" 123 124 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 125 coordinate_vectors = ( 126 np.array( 127 [ 128 [1.0, 0.0, 0.0], # X direction 129 [0.0, 1.0, 0.0], # Y direction 130 [0.0, 0.0, 1.0], # Z direction 131 ] 132 ) 133 * 200.0 134 ) # Scale factor of 200.0 for better visibility 135 136 coordinate_colors = np.array( 137 [ 138 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 139 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 140 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 141 ] 142 ) 143 144 rr.log( 145 "coordinate_system_world", 146 rr.Arrows3D( 147 origins=coordinate_origins, 148 vectors=coordinate_vectors, 149 colors=coordinate_colors, 150 radii=rr.Radius.ui_points([5.0]), 151 ), 152 static=True, 153 ) 154 155 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 156 """Fetch and log all collision scenes from Nova to Rerun.""" 157 bridge_nova = self._bridge_nova or self.nova 158 collision_scenes = ( 159 await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 160 cell=bridge_nova.cell()._cell_id 161 ) 162 ) 163 log_collision_scenes(collision_scenes) 164 return collision_scenes 165 166 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 167 """Log a specific collision scene by its ID. 168 169 Args: 170 scene_id (str): The ID of the collision scene to log 171 172 Raises: 173 ValueError: If scene_id is not found in stored collision scenes 174 """ 175 bridge_nova = self._bridge_nova or self.nova 176 collision_scenes = ( 177 await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 178 cell=bridge_nova.cell()._cell_id 179 ) 180 ) 181 182 if scene_id not in collision_scenes: 183 raise ValueError(f"Collision scene with ID {scene_id} not found") 184 185 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 186 return {scene_id: collision_scenes[scene_id]} 187 188 def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None: 189 log_collision_scenes(collision_scenes=collision_scenes) 190 191 async def log_safety_zones(self, motion_group: MotionGroup) -> None: 192 tcp_names = await motion_group.tcp_names() 193 tcp = tcp_names[0] 194 195 rr.reset_time() 196 rr.set_time(TIME_INTERVAL_NAME, duration=0) 197 198 log_safety_zones( 199 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 200 ) 201 202 # Backward compatibility method (deprecated) 203 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 204 """Deprecated: Use log_safety_zones instead.""" 205 await self.log_safety_zones(motion_group) 206 207 def log_safety_zones_( 208 self, motion_group_id: str, optimizer_setup: models.OptimizerSetup 209 ) -> None: 210 log_safety_zones(motion_group_id, optimizer_setup) 211 212 async def log_motion( 213 self, 214 motion_id: str, 215 timing_mode=TimingMode.CONTINUE, 216 time_offset: float = 0, 217 tool_asset: Optional[str] = None, 218 ) -> None: 219 """Log motion trajectory to Rerun viewer. 220 221 Args: 222 motion_id: The motion identifier 223 timing_mode: DEPRECATED - Timing mode (ignored in new implementation) 224 time_offset: Time offset for visualization 225 tool_asset: Optional tool asset file path 226 """ 227 if timing_mode != TimingMode.CONTINUE: 228 import warnings 229 230 warnings.warn( 231 "timing_mode parameter is deprecated and will be removed in a future version. " 232 "Timing is now handled automatically per motion group.", 233 DeprecationWarning, 234 stacklevel=2, 235 ) 236 logger.debug(f"log_motion called with motion_id: {motion_id}") 237 try: 238 # Use the bridge's own Nova client for API calls 239 bridge_nova = self._bridge_nova or self.nova 240 logger.debug( 241 f"Using bridge_nova: {bridge_nova is not None}, bridge_nova._api_client: {hasattr(bridge_nova, '_api_client') if bridge_nova else False}" 242 ) 243 244 # Fetch motion details from api 245 logger.debug("Fetching motion details...") 246 motion = await bridge_nova._api_client.motion_api.get_planned_motion( 247 bridge_nova.cell()._cell_id, motion_id 248 ) 249 logger.debug("Fetching optimizer config...") 250 optimizer_config = ( 251 await bridge_nova._api_client.motion_group_infos_api.get_optimizer_configuration( 252 bridge_nova.cell()._cell_id, motion.motion_group 253 ) 254 ) 255 logger.debug("Fetching trajectory...") 256 trajectory = await bridge_nova._api_client.motion_api.get_motion_trajectory( 257 bridge_nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 258 ) 259 260 logger.debug("Fetching motion groups...") 261 motion_groups = await bridge_nova._api_client.motion_group_api.list_motion_groups( 262 bridge_nova.cell()._cell_id 263 ) 264 motion_motion_group = next( 265 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), 266 None, 267 ) 268 269 logger.debug("Fetching collision scenes...") 270 collision_scenes = await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 271 cell=bridge_nova.cell()._cell_id 272 ) 273 274 if motion_motion_group is None: 275 raise ValueError(f"Motion group {motion.motion_group} not found") 276 277 # Get or initialize the timer for this motion group 278 motion_group_id = motion.motion_group 279 current_time = self._motion_group_timers.get(motion_group_id, 0.0) 280 281 logger.debug( 282 f"Calling log_motion function with trajectory points: {len(trajectory.trajectory or [])}" 283 ) 284 log_motion( 285 motion_id=motion_id, 286 model_from_controller=motion_motion_group.model_from_controller, 287 motion_group=motion.motion_group, 288 optimizer_config=optimizer_config, 289 trajectory=trajectory.trajectory or [], 290 collision_scenes=collision_scenes, 291 time_offset=current_time + time_offset, 292 tool_asset=tool_asset, 293 show_collision_link_chain=self.show_collision_link_chain, 294 show_safety_link_chain=self.show_safety_link_chain, 295 ) 296 # Update the timer for this motion group based on trajectory duration 297 if trajectory.trajectory: 298 last_trajectory_point = trajectory.trajectory[-1] 299 if last_trajectory_point.time is not None: 300 self._motion_group_timers[motion_group_id] = ( 301 current_time + time_offset + last_trajectory_point.time 302 ) 303 logger.debug("log_motion completed successfully") 304 except RuntimeError as e: 305 if "Session is closed" in str(e): 306 # Session is closed, skip trajectory logging 307 logger.debug(f"Skipping trajectory logging due to closed session: {e}") 308 return 309 else: 310 raise 311 except Exception as e: 312 # Log other errors but don't fail 313 logger.error(f"Failed to log motion trajectory: {e}") 314 raise 315 316 async def log_trajectory( 317 self, 318 joint_trajectory: models.JointTrajectory, 319 tcp: str, 320 motion_group: MotionGroup, 321 timing_mode=TimingMode.CONTINUE, 322 time_offset: float = 0, 323 tool_asset: Optional[str] = None, 324 ) -> None: 325 """Log joint trajectory to Rerun viewer. 326 327 Args: 328 joint_trajectory: The joint trajectory to log 329 tcp: TCP identifier 330 motion_group: Motion group for planning 331 timing_mode: DEPRECATED - Timing mode (ignored in new implementation) 332 time_offset: Time offset for visualization 333 tool_asset: Optional tool asset file path 334 """ 335 if timing_mode != TimingMode.CONTINUE: 336 import warnings 337 338 warnings.warn( 339 "timing_mode parameter is deprecated and will be removed in a future version. " 340 "Timing is now handled automatically per motion group.", 341 DeprecationWarning, 342 stacklevel=2, 343 ) 344 345 if len(joint_trajectory.joint_positions) == 0: 346 raise ValueError("No joint trajectory provided") 347 348 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 349 350 await self.log_motion( 351 load_plan_response.motion, time_offset=time_offset, tool_asset=tool_asset 352 ) 353 354 def continue_after_sync(self) -> None: 355 """No longer needed with per-motion-group timing. 356 357 This method is now a no-op since timing is automatically managed 358 per motion group. Each motion group maintains its own independent timeline. 359 360 .. deprecated:: 361 continue_after_sync() is deprecated and will be removed in a future version. 362 Timing is now handled automatically per motion group. 363 """ 364 import warnings 365 366 warnings.warn( 367 "continue_after_sync() is deprecated and will be removed in a future version. " 368 "Timing is now handled automatically per motion group.", 369 DeprecationWarning, 370 stacklevel=2, 371 ) 372 373 async def log_error_feedback( 374 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 375 ) -> None: 376 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 377 if ( 378 error_feedback.actual_instance.invalid_tcp_pose 379 and error_feedback.actual_instance.invalid_tcp_pose.position 380 ): 381 position = error_feedback.actual_instance.invalid_tcp_pose.position 382 rr.log( 383 "motion/errors/FeedbackOutOfWorkspace", 384 rr.Points3D( 385 [[position[0], position[1], position[2]]], 386 radii=rr.Radius.ui_points([5.0]), 387 colors=[(255, 0, 0, 255)], 388 labels=["Out of Workspace"], 389 ), 390 static=True, 391 ) 392 393 if isinstance(error_feedback.actual_instance, FeedbackCollision): 394 collisions = error_feedback.actual_instance.collisions 395 if not collisions: 396 return 397 398 for i, collision in enumerate(collisions): 399 if collision.position_on_a is None or collision.position_on_b is None: 400 continue 401 if collision.position_on_a.world is None or collision.position_on_b.world is None: 402 continue 403 if collision.normal_world_on_b is None: 404 continue 405 406 # Extract positions 407 pos_a = collision.position_on_a.world 408 pos_b = collision.position_on_b.world 409 normal = collision.normal_world_on_b 410 411 # Scale normal for visibility 412 arrow_length = 50 413 414 # Log collision points 415 rr.log( 416 f"motion/errors/FeedbackCollision/collisions/point_{i}/a", 417 rr.Points3D( 418 [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)] 419 ), 420 static=True, 421 ) 422 423 rr.log( 424 f"motion/errors/FeedbackCollision/collisions/point_{i}/b", 425 rr.Points3D( 426 [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)] 427 ), 428 static=True, 429 ) 430 431 # Log normal vector as arrow 432 rr.log( 433 f"motion/errors/FeedbackCollision/collisions/normal_{i}", 434 rr.Arrows3D( 435 origins=[pos_b], 436 vectors=[ 437 [ 438 normal[0] * arrow_length, 439 normal[1] * arrow_length, 440 normal[2] * arrow_length, 441 ] 442 ], 443 colors=[(255, 255, 0, 255)], 444 ), 445 static=True, 446 ) 447 448 async def start_streaming(self, motion_group: MotionGroup) -> None: 449 """Start streaming real-time robot state to Rerun viewer.""" 450 if motion_group in self._streaming_tasks: 451 return 452 453 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 454 self._streaming_tasks[motion_group] = task 455 456 async def stop_streaming(self) -> None: 457 """Stop all streaming tasks.""" 458 for task in self._streaming_tasks.values(): 459 task.cancel() 460 self._streaming_tasks.clear() 461 462 async def log_actions( 463 self, 464 actions: list[Action | CollisionFreeMotion] | Action, 465 show_connection: bool = False, 466 show_labels: bool = False, 467 motion_group: Optional[MotionGroup] = None, 468 tcp: Optional[str] = None, 469 ) -> None: 470 """Log robot actions as points in the Rerun viewer. 471 472 This method visualizes robot actions by determining their TCP poses and displaying 473 them as colored points in 3D space. Joint motions are converted to poses using 474 forward kinematics with the specified TCP. 475 476 Args: 477 actions: Single action or list of actions to visualize 478 show_connection: Whether to draw lines connecting consecutive action points 479 show_labels: Whether to display action type labels on points 480 motion_group: Motion group for forward kinematics (required for joint motions) 481 tcp: TCP identifier to use for forward kinematics. If None and motion_group 482 is provided, uses the first available TCP. Should match the TCP used 483 for trajectory planning to ensure consistency. 484 485 Raises: 486 ValueError: If no actions are provided 487 """ 488 rr.reset_time() 489 490 # Use motion group specific timing if available 491 if motion_group is not None: 492 motion_group_time = self._motion_group_timers.get(motion_group.motion_group_id, 0.0) 493 rr.set_time(TIME_INTERVAL_NAME, duration=motion_group_time) 494 else: 495 # Fallback to time 0 if no motion group provided 496 rr.set_time(TIME_INTERVAL_NAME, duration=0.0) 497 498 if not isinstance(actions, list): 499 actions = [actions] 500 501 if len(actions) == 0: 502 raise ValueError("No actions provided") 503 504 # Determine the TCP to use - if not provided, get the first available TCP 505 if tcp is None and motion_group is not None: 506 tcp_names = await motion_group.tcp_names() 507 tcp = tcp_names[0] if tcp_names else "Flange" 508 509 positions = [] 510 point_colors = [] 511 labels = [] 512 513 # Keep track of the last pose for write actions 514 last_pose = None 515 last_joints = None 516 517 # Process each action to get its pose 518 for i, action in enumerate(actions): 519 pose = None 520 action_type = getattr(action, "type", type(action).__name__) 521 522 if isinstance(action, WriteAction): 523 # Write actions use the last pose or joint config 524 if last_pose is not None: 525 pose = last_pose 526 elif last_joints is not None and motion_group is not None and tcp is not None: 527 # Use forward kinematics to convert joint config to pose 528 pose = await self._joint_to_pose(last_joints, motion_group, tcp) 529 else: 530 # Skip write actions without a previous pose/joint config 531 continue 532 533 elif isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose): 534 pose = action.target 535 last_pose = pose 536 537 elif isinstance(action, Motion): 538 if hasattr(action, "target"): 539 if isinstance(action.target, Pose): 540 # Cartesian motion 541 pose = action.target 542 last_pose = pose 543 elif ( 544 isinstance(action.target, tuple) 545 and motion_group is not None 546 and tcp is not None 547 ): 548 # Joint motion - use forward kinematics 549 pose = await self._joint_to_pose(action.target, motion_group, tcp) 550 last_joints = action.target 551 last_pose = pose 552 else: 553 # Skip actions without a usable target 554 continue 555 else: 556 # Skip actions without a target 557 continue 558 else: 559 # Skip other action types that we don't know how to handle 560 continue 561 562 # If we reach here, we should have a valid pose or we need to skip 563 if pose is None: 564 continue 565 566 logger.debug(f"Action {i}: {action_type}, Pose: {pose}") 567 positions.append([pose.position.x, pose.position.y, pose.position.z]) 568 569 # Determine action type and color using better color palette 570 from nova_rerun_bridge.colors import colors 571 572 if isinstance(action, WriteAction): 573 point_colors.append(tuple(colors[0])) # Light purple for IO actions 574 elif action_type == "joint_ptp": 575 point_colors.append(tuple(colors[2])) # Medium purple for joint motions 576 elif action_type == "cartesian_ptp": 577 point_colors.append(tuple(colors[4])) # Deeper purple for cartesian motions 578 elif action_type == "linear": 579 point_colors.append(tuple(colors[6])) # Dark purple for linear motions 580 elif action_type == "circular": 581 point_colors.append(tuple(colors[8])) # Very dark purple for circular motions 582 else: 583 point_colors.append(tuple(colors[10])) # Darkest color for other actions 584 585 # Create descriptive label with ID and action type (only if needed) 586 labels.append(f"{len(positions) - 1}: {action_type}") 587 588 entity_path = ( 589 f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions" 590 ) 591 592 # Log all positions with labels and colors 593 if positions: 594 # Prepare labels 595 point_labels = labels 596 597 rr.log( 598 entity_path, 599 rr.Points3D( 600 positions, 601 colors=point_colors, 602 labels=point_labels, 603 show_labels=show_labels, 604 radii=rr.Radius.ui_points([8.0]), 605 ), 606 static=True, 607 ) 608 609 # Log connections between consecutive actions if show_connection is True 610 if show_connection and len(positions) > 1: 611 connection_lines = [] 612 for i in range(len(positions) - 1): 613 connection_lines.append([positions[i], positions[i + 1]]) 614 615 rr.log( 616 f"{entity_path}/connections", 617 rr.LineStrips3D( 618 connection_lines, 619 colors=[(128, 128, 128)], # Gray connections 620 radii=rr.Radius.ui_points([2.0]), 621 ), 622 static=True, 623 ) 624 625 async def _joint_to_pose( 626 self, joint_config: tuple[float, ...], motion_group: MotionGroup, tcp: str 627 ) -> Pose: 628 """Convert joint configuration to pose using forward kinematics. 629 630 Args: 631 joint_config: The joint configuration to convert 632 motion_group: The motion group for forward kinematics 633 tcp: The TCP identifier used for planning (should match the TCP used in planning) 634 """ 635 try: 636 # Use Nova's forward kinematics API to get TCP pose from joint configuration 637 from nova.api import models 638 639 # Create a joint position object 640 joint_position = models.Joints(joints=list(joint_config)) 641 642 # Create TcpPoseRequest for forward kinematics calculation 643 tcp_pose_request = models.TcpPoseRequest( 644 motion_group=motion_group.motion_group_id, joint_position=joint_position, tcp=tcp 645 ) 646 647 # Use Nova's API to calculate the TCP pose from joint configuration 648 api_pose = await motion_group._api_gateway.motion_group_kinematic_api.calculate_forward_kinematic( 649 cell=motion_group._cell, 650 motion_group=motion_group.motion_group_id, 651 tcp_pose_request=tcp_pose_request, 652 ) 653 654 # Convert API pose to Nova Pose 655 orientation = api_pose.orientation 656 return Pose( 657 ( 658 api_pose.position.x, 659 api_pose.position.y, 660 api_pose.position.z, 661 orientation.x if orientation else 0.0, 662 orientation.y if orientation else 0.0, 663 orientation.z if orientation else 0.0, 664 ) 665 ) 666 667 except Exception as e: 668 logger.warning(f"Failed to convert joints to pose using forward kinematics: {e}") 669 # Fallback: return a pose at origin 670 return Pose((0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) 671 672 def get_motion_group_time(self, motion_group_id: str) -> float: 673 """Get the current timeline position for a motion group. 674 675 Args: 676 motion_group_id: The motion group identifier 677 678 Returns: 679 Current time position for the motion group (0.0 if not seen before) 680 """ 681 return self._motion_group_timers.get(motion_group_id, 0.0) 682 683 def reset_motion_group_time(self, motion_group_id: str) -> None: 684 """Reset the timeline for a specific motion group back to 0. 685 686 Args: 687 motion_group_id: The motion group identifier to reset 688 """ 689 self._motion_group_timers[motion_group_id] = 0.0 690 691 async def __aenter__(self) -> "NovaRerunBridge": 692 """Context manager entry point. 693 694 Creates a separate Nova client instance for the bridge to use. 695 696 Returns: 697 NovaRerunBridge: Self reference for context manager usage. 698 """ 699 # Create a separate Nova instance for the bridge using the same connection parameters 700 # This ensures the bridge has its own session that won't be closed by the main program 701 api_client = self.nova._api_client 702 703 # Extract the host without the /api/v1 suffix 704 host = api_client._host 705 if host.endswith("/api/v1"): 706 host = host[:-7] # Remove '/api/v1' 707 elif host.endswith("/api/"): 708 host = host[:-5] # Remove '/api/' 709 710 self._bridge_nova = Nova( 711 host=host, 712 access_token=api_client._access_token, 713 username=api_client._username, 714 password=api_client._password, 715 version=api_client._version, 716 verify_ssl=api_client._verify_ssl, 717 ) 718 await self._bridge_nova.__aenter__() 719 return self 720 721 async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: 722 """Context manager exit point, ensures cleanup.""" 723 if "VSCODE_PROXY_URI" in os.environ: 724 logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}") 725 726 await self.cleanup() 727 728 async def cleanup(self) -> None: 729 """Cleanup resources and close Nova API client connection.""" 730 # Clean up the bridge's own Nova instance 731 if self._bridge_nova is not None: 732 await self._bridge_nova.__aexit__(None, None, None) 733 self._bridge_nova = None 734 735 # Note: Don't clean up self.nova as it belongs to the main program
Bridge between Nova and Rerun for visualization.
This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
Example:
from nova.core.nova import Nova from nova_rerun_bridge import NovaRerunBridge async def main(): nova = Nova() async with NovaRerunBridge(nova) as bridge: await bridge.setup_blueprint() await bridge.log_collision_scenes()
Arguments:
- nova (Nova): Instance of Nova client
- spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
58 def __init__( 59 self, 60 nova: Nova, 61 spawn: bool = True, 62 recording_id=None, 63 show_details: bool = True, 64 show_collision_link_chain: bool = False, 65 show_safety_link_chain: bool = True, 66 ) -> None: 67 self._ensure_models_exist() 68 # Store the original nova instance for initial setup and to copy connection parameters 69 self.nova = nova 70 # Create a separate Nova instance for the bridge to use for API calls 71 self._bridge_nova = None 72 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 73 # Track timing per motion group - each motion group has its own timeline 74 self._motion_group_timers: dict[str, float] = {} 75 self.show_details = show_details 76 self.show_collision_link_chain = show_collision_link_chain 77 self.show_safety_link_chain = show_safety_link_chain 78 79 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 80 81 if "VSCODE_PROXY_URI" in os.environ: 82 rr.init(application_id="nova", recording_id=recording_id, spawn=False) 83 rr.save("nova.rrd") 84 logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}") 85 elif spawn: 86 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 87 88 logger.add(sink=rr.LoggingHandler("logs/handler"))
96 async def setup_blueprint(self) -> None: 97 """Configure and send blueprint configuration to Rerun. 98 99 Fetches motion groups from Nova and configures visualization layout. 100 """ 101 nova = self._bridge_nova or self.nova 102 cell = nova.cell() 103 104 controllers = await cell.controllers() 105 motion_groups = [] 106 107 if not controllers: 108 logger.warning("No controllers found") 109 return 110 111 for controller in controllers: 112 for motion_group in await controller.activated_motion_groups(): 113 motion_groups.append(motion_group.motion_group_id) 114 115 rr.reset_time() 116 rr.set_time(TIME_INTERVAL_NAME, duration=0) 117 118 send_blueprint(motion_groups, self.show_details) 119 self.log_coordinate_system()
Configure and send blueprint configuration to Rerun.
Fetches motion groups from Nova and configures visualization layout.
121 def log_coordinate_system(self) -> None: 122 """Log the coordinate system of the cell.""" 123 124 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 125 coordinate_vectors = ( 126 np.array( 127 [ 128 [1.0, 0.0, 0.0], # X direction 129 [0.0, 1.0, 0.0], # Y direction 130 [0.0, 0.0, 1.0], # Z direction 131 ] 132 ) 133 * 200.0 134 ) # Scale factor of 200.0 for better visibility 135 136 coordinate_colors = np.array( 137 [ 138 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 139 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 140 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 141 ] 142 ) 143 144 rr.log( 145 "coordinate_system_world", 146 rr.Arrows3D( 147 origins=coordinate_origins, 148 vectors=coordinate_vectors, 149 colors=coordinate_colors, 150 radii=rr.Radius.ui_points([5.0]), 151 ), 152 static=True, 153 )
Log the coordinate system of the cell.
155 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 156 """Fetch and log all collision scenes from Nova to Rerun.""" 157 bridge_nova = self._bridge_nova or self.nova 158 collision_scenes = ( 159 await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 160 cell=bridge_nova.cell()._cell_id 161 ) 162 ) 163 log_collision_scenes(collision_scenes) 164 return collision_scenes
Fetch and log all collision scenes from Nova to Rerun.
166 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 167 """Log a specific collision scene by its ID. 168 169 Args: 170 scene_id (str): The ID of the collision scene to log 171 172 Raises: 173 ValueError: If scene_id is not found in stored collision scenes 174 """ 175 bridge_nova = self._bridge_nova or self.nova 176 collision_scenes = ( 177 await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 178 cell=bridge_nova.cell()._cell_id 179 ) 180 ) 181 182 if scene_id not in collision_scenes: 183 raise ValueError(f"Collision scene with ID {scene_id} not found") 184 185 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 186 return {scene_id: collision_scenes[scene_id]}
Log a specific collision scene by its ID.
Arguments:
- scene_id (str): The ID of the collision scene to log
Raises:
- ValueError: If scene_id is not found in stored collision scenes
191 async def log_safety_zones(self, motion_group: MotionGroup) -> None: 192 tcp_names = await motion_group.tcp_names() 193 tcp = tcp_names[0] 194 195 rr.reset_time() 196 rr.set_time(TIME_INTERVAL_NAME, duration=0) 197 198 log_safety_zones( 199 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 200 )
203 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 204 """Deprecated: Use log_safety_zones instead.""" 205 await self.log_safety_zones(motion_group)
Deprecated: Use log_safety_zones instead.
212 async def log_motion( 213 self, 214 motion_id: str, 215 timing_mode=TimingMode.CONTINUE, 216 time_offset: float = 0, 217 tool_asset: Optional[str] = None, 218 ) -> None: 219 """Log motion trajectory to Rerun viewer. 220 221 Args: 222 motion_id: The motion identifier 223 timing_mode: DEPRECATED - Timing mode (ignored in new implementation) 224 time_offset: Time offset for visualization 225 tool_asset: Optional tool asset file path 226 """ 227 if timing_mode != TimingMode.CONTINUE: 228 import warnings 229 230 warnings.warn( 231 "timing_mode parameter is deprecated and will be removed in a future version. " 232 "Timing is now handled automatically per motion group.", 233 DeprecationWarning, 234 stacklevel=2, 235 ) 236 logger.debug(f"log_motion called with motion_id: {motion_id}") 237 try: 238 # Use the bridge's own Nova client for API calls 239 bridge_nova = self._bridge_nova or self.nova 240 logger.debug( 241 f"Using bridge_nova: {bridge_nova is not None}, bridge_nova._api_client: {hasattr(bridge_nova, '_api_client') if bridge_nova else False}" 242 ) 243 244 # Fetch motion details from api 245 logger.debug("Fetching motion details...") 246 motion = await bridge_nova._api_client.motion_api.get_planned_motion( 247 bridge_nova.cell()._cell_id, motion_id 248 ) 249 logger.debug("Fetching optimizer config...") 250 optimizer_config = ( 251 await bridge_nova._api_client.motion_group_infos_api.get_optimizer_configuration( 252 bridge_nova.cell()._cell_id, motion.motion_group 253 ) 254 ) 255 logger.debug("Fetching trajectory...") 256 trajectory = await bridge_nova._api_client.motion_api.get_motion_trajectory( 257 bridge_nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 258 ) 259 260 logger.debug("Fetching motion groups...") 261 motion_groups = await bridge_nova._api_client.motion_group_api.list_motion_groups( 262 bridge_nova.cell()._cell_id 263 ) 264 motion_motion_group = next( 265 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), 266 None, 267 ) 268 269 logger.debug("Fetching collision scenes...") 270 collision_scenes = await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 271 cell=bridge_nova.cell()._cell_id 272 ) 273 274 if motion_motion_group is None: 275 raise ValueError(f"Motion group {motion.motion_group} not found") 276 277 # Get or initialize the timer for this motion group 278 motion_group_id = motion.motion_group 279 current_time = self._motion_group_timers.get(motion_group_id, 0.0) 280 281 logger.debug( 282 f"Calling log_motion function with trajectory points: {len(trajectory.trajectory or [])}" 283 ) 284 log_motion( 285 motion_id=motion_id, 286 model_from_controller=motion_motion_group.model_from_controller, 287 motion_group=motion.motion_group, 288 optimizer_config=optimizer_config, 289 trajectory=trajectory.trajectory or [], 290 collision_scenes=collision_scenes, 291 time_offset=current_time + time_offset, 292 tool_asset=tool_asset, 293 show_collision_link_chain=self.show_collision_link_chain, 294 show_safety_link_chain=self.show_safety_link_chain, 295 ) 296 # Update the timer for this motion group based on trajectory duration 297 if trajectory.trajectory: 298 last_trajectory_point = trajectory.trajectory[-1] 299 if last_trajectory_point.time is not None: 300 self._motion_group_timers[motion_group_id] = ( 301 current_time + time_offset + last_trajectory_point.time 302 ) 303 logger.debug("log_motion completed successfully") 304 except RuntimeError as e: 305 if "Session is closed" in str(e): 306 # Session is closed, skip trajectory logging 307 logger.debug(f"Skipping trajectory logging due to closed session: {e}") 308 return 309 else: 310 raise 311 except Exception as e: 312 # Log other errors but don't fail 313 logger.error(f"Failed to log motion trajectory: {e}") 314 raise
Log motion trajectory to Rerun viewer.
Arguments:
- motion_id: The motion identifier
- timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
- time_offset: Time offset for visualization
- tool_asset: Optional tool asset file path
316 async def log_trajectory( 317 self, 318 joint_trajectory: models.JointTrajectory, 319 tcp: str, 320 motion_group: MotionGroup, 321 timing_mode=TimingMode.CONTINUE, 322 time_offset: float = 0, 323 tool_asset: Optional[str] = None, 324 ) -> None: 325 """Log joint trajectory to Rerun viewer. 326 327 Args: 328 joint_trajectory: The joint trajectory to log 329 tcp: TCP identifier 330 motion_group: Motion group for planning 331 timing_mode: DEPRECATED - Timing mode (ignored in new implementation) 332 time_offset: Time offset for visualization 333 tool_asset: Optional tool asset file path 334 """ 335 if timing_mode != TimingMode.CONTINUE: 336 import warnings 337 338 warnings.warn( 339 "timing_mode parameter is deprecated and will be removed in a future version. " 340 "Timing is now handled automatically per motion group.", 341 DeprecationWarning, 342 stacklevel=2, 343 ) 344 345 if len(joint_trajectory.joint_positions) == 0: 346 raise ValueError("No joint trajectory provided") 347 348 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 349 350 await self.log_motion( 351 load_plan_response.motion, time_offset=time_offset, tool_asset=tool_asset 352 )
Log joint trajectory to Rerun viewer.
Arguments:
- joint_trajectory: The joint trajectory to log
- tcp: TCP identifier
- motion_group: Motion group for planning
- timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
- time_offset: Time offset for visualization
- tool_asset: Optional tool asset file path
354 def continue_after_sync(self) -> None: 355 """No longer needed with per-motion-group timing. 356 357 This method is now a no-op since timing is automatically managed 358 per motion group. Each motion group maintains its own independent timeline. 359 360 .. deprecated:: 361 continue_after_sync() is deprecated and will be removed in a future version. 362 Timing is now handled automatically per motion group. 363 """ 364 import warnings 365 366 warnings.warn( 367 "continue_after_sync() is deprecated and will be removed in a future version. " 368 "Timing is now handled automatically per motion group.", 369 DeprecationWarning, 370 stacklevel=2, 371 )
No longer needed with per-motion-group timing.
This method is now a no-op since timing is automatically managed per motion group. Each motion group maintains its own independent timeline.
Deprecated since version : continue_after_sync() is deprecated and will be removed in a future version. Timing is now handled automatically per motion group.
373 async def log_error_feedback( 374 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 375 ) -> None: 376 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 377 if ( 378 error_feedback.actual_instance.invalid_tcp_pose 379 and error_feedback.actual_instance.invalid_tcp_pose.position 380 ): 381 position = error_feedback.actual_instance.invalid_tcp_pose.position 382 rr.log( 383 "motion/errors/FeedbackOutOfWorkspace", 384 rr.Points3D( 385 [[position[0], position[1], position[2]]], 386 radii=rr.Radius.ui_points([5.0]), 387 colors=[(255, 0, 0, 255)], 388 labels=["Out of Workspace"], 389 ), 390 static=True, 391 ) 392 393 if isinstance(error_feedback.actual_instance, FeedbackCollision): 394 collisions = error_feedback.actual_instance.collisions 395 if not collisions: 396 return 397 398 for i, collision in enumerate(collisions): 399 if collision.position_on_a is None or collision.position_on_b is None: 400 continue 401 if collision.position_on_a.world is None or collision.position_on_b.world is None: 402 continue 403 if collision.normal_world_on_b is None: 404 continue 405 406 # Extract positions 407 pos_a = collision.position_on_a.world 408 pos_b = collision.position_on_b.world 409 normal = collision.normal_world_on_b 410 411 # Scale normal for visibility 412 arrow_length = 50 413 414 # Log collision points 415 rr.log( 416 f"motion/errors/FeedbackCollision/collisions/point_{i}/a", 417 rr.Points3D( 418 [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)] 419 ), 420 static=True, 421 ) 422 423 rr.log( 424 f"motion/errors/FeedbackCollision/collisions/point_{i}/b", 425 rr.Points3D( 426 [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)] 427 ), 428 static=True, 429 ) 430 431 # Log normal vector as arrow 432 rr.log( 433 f"motion/errors/FeedbackCollision/collisions/normal_{i}", 434 rr.Arrows3D( 435 origins=[pos_b], 436 vectors=[ 437 [ 438 normal[0] * arrow_length, 439 normal[1] * arrow_length, 440 normal[2] * arrow_length, 441 ] 442 ], 443 colors=[(255, 255, 0, 255)], 444 ), 445 static=True, 446 )
448 async def start_streaming(self, motion_group: MotionGroup) -> None: 449 """Start streaming real-time robot state to Rerun viewer.""" 450 if motion_group in self._streaming_tasks: 451 return 452 453 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 454 self._streaming_tasks[motion_group] = task
Start streaming real-time robot state to Rerun viewer.
456 async def stop_streaming(self) -> None: 457 """Stop all streaming tasks.""" 458 for task in self._streaming_tasks.values(): 459 task.cancel() 460 self._streaming_tasks.clear()
Stop all streaming tasks.
462 async def log_actions( 463 self, 464 actions: list[Action | CollisionFreeMotion] | Action, 465 show_connection: bool = False, 466 show_labels: bool = False, 467 motion_group: Optional[MotionGroup] = None, 468 tcp: Optional[str] = None, 469 ) -> None: 470 """Log robot actions as points in the Rerun viewer. 471 472 This method visualizes robot actions by determining their TCP poses and displaying 473 them as colored points in 3D space. Joint motions are converted to poses using 474 forward kinematics with the specified TCP. 475 476 Args: 477 actions: Single action or list of actions to visualize 478 show_connection: Whether to draw lines connecting consecutive action points 479 show_labels: Whether to display action type labels on points 480 motion_group: Motion group for forward kinematics (required for joint motions) 481 tcp: TCP identifier to use for forward kinematics. If None and motion_group 482 is provided, uses the first available TCP. Should match the TCP used 483 for trajectory planning to ensure consistency. 484 485 Raises: 486 ValueError: If no actions are provided 487 """ 488 rr.reset_time() 489 490 # Use motion group specific timing if available 491 if motion_group is not None: 492 motion_group_time = self._motion_group_timers.get(motion_group.motion_group_id, 0.0) 493 rr.set_time(TIME_INTERVAL_NAME, duration=motion_group_time) 494 else: 495 # Fallback to time 0 if no motion group provided 496 rr.set_time(TIME_INTERVAL_NAME, duration=0.0) 497 498 if not isinstance(actions, list): 499 actions = [actions] 500 501 if len(actions) == 0: 502 raise ValueError("No actions provided") 503 504 # Determine the TCP to use - if not provided, get the first available TCP 505 if tcp is None and motion_group is not None: 506 tcp_names = await motion_group.tcp_names() 507 tcp = tcp_names[0] if tcp_names else "Flange" 508 509 positions = [] 510 point_colors = [] 511 labels = [] 512 513 # Keep track of the last pose for write actions 514 last_pose = None 515 last_joints = None 516 517 # Process each action to get its pose 518 for i, action in enumerate(actions): 519 pose = None 520 action_type = getattr(action, "type", type(action).__name__) 521 522 if isinstance(action, WriteAction): 523 # Write actions use the last pose or joint config 524 if last_pose is not None: 525 pose = last_pose 526 elif last_joints is not None and motion_group is not None and tcp is not None: 527 # Use forward kinematics to convert joint config to pose 528 pose = await self._joint_to_pose(last_joints, motion_group, tcp) 529 else: 530 # Skip write actions without a previous pose/joint config 531 continue 532 533 elif isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose): 534 pose = action.target 535 last_pose = pose 536 537 elif isinstance(action, Motion): 538 if hasattr(action, "target"): 539 if isinstance(action.target, Pose): 540 # Cartesian motion 541 pose = action.target 542 last_pose = pose 543 elif ( 544 isinstance(action.target, tuple) 545 and motion_group is not None 546 and tcp is not None 547 ): 548 # Joint motion - use forward kinematics 549 pose = await self._joint_to_pose(action.target, motion_group, tcp) 550 last_joints = action.target 551 last_pose = pose 552 else: 553 # Skip actions without a usable target 554 continue 555 else: 556 # Skip actions without a target 557 continue 558 else: 559 # Skip other action types that we don't know how to handle 560 continue 561 562 # If we reach here, we should have a valid pose or we need to skip 563 if pose is None: 564 continue 565 566 logger.debug(f"Action {i}: {action_type}, Pose: {pose}") 567 positions.append([pose.position.x, pose.position.y, pose.position.z]) 568 569 # Determine action type and color using better color palette 570 from nova_rerun_bridge.colors import colors 571 572 if isinstance(action, WriteAction): 573 point_colors.append(tuple(colors[0])) # Light purple for IO actions 574 elif action_type == "joint_ptp": 575 point_colors.append(tuple(colors[2])) # Medium purple for joint motions 576 elif action_type == "cartesian_ptp": 577 point_colors.append(tuple(colors[4])) # Deeper purple for cartesian motions 578 elif action_type == "linear": 579 point_colors.append(tuple(colors[6])) # Dark purple for linear motions 580 elif action_type == "circular": 581 point_colors.append(tuple(colors[8])) # Very dark purple for circular motions 582 else: 583 point_colors.append(tuple(colors[10])) # Darkest color for other actions 584 585 # Create descriptive label with ID and action type (only if needed) 586 labels.append(f"{len(positions) - 1}: {action_type}") 587 588 entity_path = ( 589 f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions" 590 ) 591 592 # Log all positions with labels and colors 593 if positions: 594 # Prepare labels 595 point_labels = labels 596 597 rr.log( 598 entity_path, 599 rr.Points3D( 600 positions, 601 colors=point_colors, 602 labels=point_labels, 603 show_labels=show_labels, 604 radii=rr.Radius.ui_points([8.0]), 605 ), 606 static=True, 607 ) 608 609 # Log connections between consecutive actions if show_connection is True 610 if show_connection and len(positions) > 1: 611 connection_lines = [] 612 for i in range(len(positions) - 1): 613 connection_lines.append([positions[i], positions[i + 1]]) 614 615 rr.log( 616 f"{entity_path}/connections", 617 rr.LineStrips3D( 618 connection_lines, 619 colors=[(128, 128, 128)], # Gray connections 620 radii=rr.Radius.ui_points([2.0]), 621 ), 622 static=True, 623 )
Log robot actions as points in the Rerun viewer.
This method visualizes robot actions by determining their TCP poses and displaying them as colored points in 3D space. Joint motions are converted to poses using forward kinematics with the specified TCP.
Arguments:
- actions: Single action or list of actions to visualize
- show_connection: Whether to draw lines connecting consecutive action points
- show_labels: Whether to display action type labels on points
- motion_group: Motion group for forward kinematics (required for joint motions)
- tcp: TCP identifier to use for forward kinematics. If None and motion_group is provided, uses the first available TCP. Should match the TCP used for trajectory planning to ensure consistency.
Raises:
- ValueError: If no actions are provided
672 def get_motion_group_time(self, motion_group_id: str) -> float: 673 """Get the current timeline position for a motion group. 674 675 Args: 676 motion_group_id: The motion group identifier 677 678 Returns: 679 Current time position for the motion group (0.0 if not seen before) 680 """ 681 return self._motion_group_timers.get(motion_group_id, 0.0)
Get the current timeline position for a motion group.
Arguments:
- motion_group_id: The motion group identifier
Returns:
Current time position for the motion group (0.0 if not seen before)
683 def reset_motion_group_time(self, motion_group_id: str) -> None: 684 """Reset the timeline for a specific motion group back to 0. 685 686 Args: 687 motion_group_id: The motion group identifier to reset 688 """ 689 self._motion_group_timers[motion_group_id] = 0.0
Reset the timeline for a specific motion group back to 0.
Arguments:
- motion_group_id: The motion group identifier to reset
728 async def cleanup(self) -> None: 729 """Cleanup resources and close Nova API client connection.""" 730 # Clean up the bridge's own Nova instance 731 if self._bridge_nova is not None: 732 await self._bridge_nova.__aexit__(None, None, None) 733 self._bridge_nova = None 734 735 # Note: Don't clean up self.nova as it belongs to the main program
Cleanup resources and close Nova API client connection.