nova.viewers
Viewer implementations for Nova programs.
This module provides different viewer backends that can be used to visualize and monitor Nova programs during execution.
1""" 2Viewer implementations for Nova programs. 3 4This module provides different viewer backends that can be used 5to visualize and monitor Nova programs during execution. 6""" 7 8from __future__ import annotations 9 10# Public API exports 11from .base import Viewer 12from .manager import ViewerManager, get_viewer_manager 13from .manager import cleanup_active_viewers as _cleanup_active_viewers 14from .manager import configure_active_viewers as _configure_active_viewers 15from .manager import log_planning_error_to_viewers as _log_planning_error_to_viewers 16from .manager import log_planning_results_to_viewers as _log_planning_results_to_viewers 17from .manager import register_viewer as _register_viewer 18from .manager import ( 19 setup_active_viewers_after_preconditions as _setup_active_viewers_after_preconditions, 20) 21from .protocol import NovaRerunBridgeProtocol 22from .rerun import Rerun 23from .utils import extract_collision_scenes_from_actions as _extract_collision_scenes_from_actions 24 25__all__ = [ 26 "Viewer", 27 "ViewerManager", 28 "Rerun", 29 "NovaRerunBridgeProtocol", 30 "get_viewer_manager", 31 # Internal functions (prefixed with underscore) 32 "_configure_active_viewers", 33 "_setup_active_viewers_after_preconditions", 34 "_cleanup_active_viewers", 35 "_log_planning_results_to_viewers", 36 "_log_planning_error_to_viewers", 37 "_extract_collision_scenes_from_actions", 38 "_register_viewer", 39]
16class Viewer(ABC): 17 """Abstract base class for Nova program viewers.""" 18 19 @abstractmethod 20 def configure(self, nova: Nova) -> None: 21 """Configure the viewer for program execution.""" 22 pass 23 24 async def setup_after_preconditions(self) -> None: 25 """Setup viewer components after preconditions are satisfied. 26 27 Override this method in subclasses that need to wait for preconditions 28 like active controllers before setting up visualization components. 29 """ 30 pass 31 32 @abstractmethod 33 def cleanup(self) -> None: 34 """Clean up the viewer after program execution.""" 35 pass 36 37 async def log_planning_success( 38 self, 39 actions: Sequence[Action], 40 trajectory: models.JointTrajectory, 41 tcp: str, 42 motion_group: MotionGroup, 43 ) -> None: 44 """Log successful planning results. 45 46 Args: 47 actions: List of actions that were planned 48 trajectory: The resulting trajectory 49 tcp: TCP used for planning 50 motion_group: The motion group used for planning 51 """ 52 pass 53 54 async def log_planning_failure( 55 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 56 ) -> None: 57 """Log planning failure results. 58 59 Args: 60 actions: List of actions that failed to plan 61 error: The planning error that occurred 62 tcp: TCP used for planning 63 motion_group: The motion group used for planning 64 """ 65 pass
Abstract base class for Nova program viewers.
19 @abstractmethod 20 def configure(self, nova: Nova) -> None: 21 """Configure the viewer for program execution.""" 22 pass
Configure the viewer for program execution.
24 async def setup_after_preconditions(self) -> None: 25 """Setup viewer components after preconditions are satisfied. 26 27 Override this method in subclasses that need to wait for preconditions 28 like active controllers before setting up visualization components. 29 """ 30 pass
Setup viewer components after preconditions are satisfied.
Override this method in subclasses that need to wait for preconditions like active controllers before setting up visualization components.
32 @abstractmethod 33 def cleanup(self) -> None: 34 """Clean up the viewer after program execution.""" 35 pass
Clean up the viewer after program execution.
37 async def log_planning_success( 38 self, 39 actions: Sequence[Action], 40 trajectory: models.JointTrajectory, 41 tcp: str, 42 motion_group: MotionGroup, 43 ) -> None: 44 """Log successful planning results. 45 46 Args: 47 actions: List of actions that were planned 48 trajectory: The resulting trajectory 49 tcp: TCP used for planning 50 motion_group: The motion group used for planning 51 """ 52 pass
Log successful planning results.
Arguments:
- actions: List of actions that were planned
- trajectory: The resulting trajectory
- tcp: TCP used for planning
- motion_group: The motion group used for planning
54 async def log_planning_failure( 55 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 56 ) -> None: 57 """Log planning failure results. 58 59 Args: 60 actions: List of actions that failed to plan 61 error: The planning error that occurred 62 tcp: TCP used for planning 63 motion_group: The motion group used for planning 64 """ 65 pass
Log planning failure results.
Arguments:
- actions: List of actions that failed to plan
- error: The planning error that occurred
- tcp: TCP used for planning
- motion_group: The motion group used for planning
21class ViewerManager: 22 """Manages the lifecycle and coordination of all active viewers.""" 23 24 def __init__(self) -> None: 25 self._viewers: WeakSet[Viewer] = WeakSet() 26 27 def register_viewer(self, viewer: Viewer) -> None: 28 """Register a viewer as active.""" 29 self._viewers.add(viewer) 30 31 def configure_viewers(self, nova: Nova) -> None: 32 """Configure all active viewers with the Nova instance.""" 33 for viewer in self._viewers: 34 viewer.configure(nova) 35 36 async def setup_viewers_after_preconditions(self) -> None: 37 """Setup all active viewers after preconditions are satisfied.""" 38 for viewer in self._viewers: 39 await viewer.setup_after_preconditions() 40 41 def cleanup_viewers(self) -> None: 42 """Clean up all active viewers.""" 43 for viewer in list(self._viewers): # Copy to avoid modification during iteration 44 viewer.cleanup() 45 self._viewers.clear() 46 47 async def log_planning_success( 48 self, 49 actions: Sequence[Action], 50 trajectory: models.JointTrajectory, 51 tcp: str, 52 motion_group: MotionGroup, 53 ) -> None: 54 """Log successful planning results to all active viewers.""" 55 for viewer in self._viewers: 56 try: 57 await viewer.log_planning_success(actions, trajectory, tcp, motion_group) 58 except Exception as e: 59 # Don't fail planning if logging fails 60 logger.warning("Failed to log planning results to viewer: %s", e) 61 62 async def log_planning_failure( 63 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 64 ) -> None: 65 """Log planning failure to all active viewers.""" 66 for viewer in self._viewers: 67 try: 68 await viewer.log_planning_failure(actions, error, tcp, motion_group) 69 except Exception as e: 70 # Don't fail planning if logging fails 71 logger.warning("Failed to log planning error to viewer: %s", e) 72 73 @property 74 def has_active_viewers(self) -> bool: 75 """Check if there are any active viewers.""" 76 return len(self._viewers) > 0
Manages the lifecycle and coordination of all active viewers.
27 def register_viewer(self, viewer: Viewer) -> None: 28 """Register a viewer as active.""" 29 self._viewers.add(viewer)
Register a viewer as active.
31 def configure_viewers(self, nova: Nova) -> None: 32 """Configure all active viewers with the Nova instance.""" 33 for viewer in self._viewers: 34 viewer.configure(nova)
Configure all active viewers with the Nova instance.
36 async def setup_viewers_after_preconditions(self) -> None: 37 """Setup all active viewers after preconditions are satisfied.""" 38 for viewer in self._viewers: 39 await viewer.setup_after_preconditions()
Setup all active viewers after preconditions are satisfied.
41 def cleanup_viewers(self) -> None: 42 """Clean up all active viewers.""" 43 for viewer in list(self._viewers): # Copy to avoid modification during iteration 44 viewer.cleanup() 45 self._viewers.clear()
Clean up all active viewers.
47 async def log_planning_success( 48 self, 49 actions: Sequence[Action], 50 trajectory: models.JointTrajectory, 51 tcp: str, 52 motion_group: MotionGroup, 53 ) -> None: 54 """Log successful planning results to all active viewers.""" 55 for viewer in self._viewers: 56 try: 57 await viewer.log_planning_success(actions, trajectory, tcp, motion_group) 58 except Exception as e: 59 # Don't fail planning if logging fails 60 logger.warning("Failed to log planning results to viewer: %s", e)
Log successful planning results to all active viewers.
62 async def log_planning_failure( 63 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 64 ) -> None: 65 """Log planning failure to all active viewers.""" 66 for viewer in self._viewers: 67 try: 68 await viewer.log_planning_failure(actions, error, tcp, motion_group) 69 except Exception as e: 70 # Don't fail planning if logging fails 71 logger.warning("Failed to log planning error to viewer: %s", e)
Log planning failure to all active viewers.
23class Rerun(Viewer): 24 """ 25 Rerun viewer for 3D visualization of robot motion and program execution. 26 27 This viewer automatically captures and visualizes: 28 - Robot trajectories and motion paths 29 - TCP poses and transformations 30 - Motion group states 31 - Planning requests and responses 32 - Collision scenes and safety zones (optional) 33 - Tool geometries attached to specific TCPs 34 35 Example usage: 36 # 3D view only (default) 37 @nova.program( 38 viewer=nova.viewers.Rerun( 39 tcp_tools={"vacuum": "assets/vacuum_cup.stl"} 40 ) 41 ) 42 43 # Full interface with detailed analysis panels 44 @nova.program( 45 viewer=nova.viewers.Rerun( 46 show_details=True, 47 show_safety_zones=True, 48 show_collision_link_chain=True, 49 show_safety_link_chain=True, 50 tcp_tools={ 51 "vacuum": "assets/vacuum_cup.stl", 52 "gripper": "assets/parallel_gripper.stl" 53 } 54 ) 55 ) 56 """ 57 58 def __init__( 59 self, 60 application_id: Optional[str] = None, 61 spawn: bool = True, 62 show_safety_zones: bool = True, 63 show_collision_scenes: bool = True, 64 show_collision_link_chain: bool = False, 65 show_safety_link_chain: bool = True, 66 tcp_tools: Optional[dict[str, str]] = None, 67 show_details: bool = False, 68 ) -> None: 69 """ 70 Initialize the Rerun viewer. 71 72 Args: 73 application_id: Optional application ID for the rerun recording 74 spawn: Whether to spawn a rerun viewer process automatically 75 show_safety_zones: Whether to visualize safety zones for motion groups 76 show_collision_scenes: Whether to show collision scenes 77 show_collision_link_chain: Whether to show robot collision mesh geometry 78 show_safety_link_chain: Whether to show robot safety geometry (from controller) 79 tcp_tools: Optional mapping of TCP IDs to tool asset file paths 80 show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only) 81 """ 82 self.application_id: Optional[str] = application_id 83 self.spawn: bool = spawn 84 self.show_safety_zones: bool = show_safety_zones 85 self.show_collision_scenes: bool = show_collision_scenes 86 self.show_collision_link_chain: bool = show_collision_link_chain 87 self.show_safety_link_chain: bool = show_safety_link_chain 88 self.tcp_tools: dict[str, str] = tcp_tools or {} 89 self.show_details: bool = show_details 90 self._bridge: Optional[NovaRerunBridgeProtocol] = None 91 self._logged_safety_zones: set[str] = ( 92 set() 93 ) # Track motion groups that already have safety zones logged 94 95 # Register this viewer as active 96 register_viewer(self) 97 98 def configure(self, nova: Nova) -> None: 99 """Configure rerun integration for program execution.""" 100 if self._bridge is not None: 101 return # Already configured 102 103 try: 104 from nova_rerun_bridge import NovaRerunBridge 105 106 bridge = NovaRerunBridge( 107 nova=nova, 108 spawn=self.spawn, 109 recording_id=self.application_id, 110 show_details=self.show_details, 111 show_collision_link_chain=self.show_collision_link_chain, 112 show_safety_link_chain=self.show_safety_link_chain, 113 ) 114 self._bridge = cast(NovaRerunBridgeProtocol, bridge) 115 # Don't setup async components immediately - wait for controllers to be ready 116 except ImportError: 117 # nova_rerun_bridge not available, skip rerun integration 118 logger.warning( 119 "Rerun viewer configured but nova_rerun_bridge not available. " 120 "Install with: uv add wandelbots-nova --extra nova-rerun-bridge" 121 ) 122 123 async def setup_after_preconditions(self) -> None: 124 """Setup async components after preconditions (like controllers) are satisfied.""" 125 if self._bridge and not hasattr(self, "_async_setup_done"): 126 await self._setup_async_components() 127 self._async_setup_done = True 128 129 async def _setup_async_components(self) -> None: 130 """Setup async components like blueprint.""" 131 if self._bridge: 132 # Initialize the bridge's own Nova client before using it 133 await self._bridge.__aenter__() 134 135 # Setup blueprint (show_details is already configured in bridge) 136 await self._bridge.setup_blueprint() 137 138 async def _ensure_safety_zones_logged(self, motion_group: MotionGroup) -> None: 139 """Ensure safety zones are logged for the given motion group. 140 141 This method is called during planning to ensure safety zones are shown 142 only for motion groups that are actually being used. 143 144 Args: 145 motion_group: The motion group to log safety zones for 146 """ 147 if not self.show_safety_zones or not self._bridge: 148 return 149 150 # Use the motion group ID as unique identifier 151 motion_group_id = motion_group.motion_group_id 152 153 if motion_group_id not in self._logged_safety_zones: 154 try: 155 await self._bridge.log_safety_zones(motion_group) 156 self._logged_safety_zones.add(motion_group_id) 157 except Exception as e: 158 logger.warning( 159 "Could not log safety zones for motion group %s: %s", motion_group_id, e 160 ) 161 162 async def _log_planning_results( 163 self, 164 actions: Sequence[Action], 165 trajectory: models.JointTrajectory, 166 tcp: str, 167 motion_group: MotionGroup, 168 ) -> None: 169 """Log planning results including actions, trajectory, and collision scenes. 170 171 Args: 172 actions: List of actions that were planned 173 trajectory: The resulting trajectory 174 tcp: TCP used for planning 175 motion_group: The motion group used for planning 176 """ 177 if not self._bridge: 178 return 179 180 try: 181 # Log actions 182 await self._bridge.log_actions(list(actions), motion_group=motion_group) 183 184 # Log trajectory with tool asset if configured for this TCP 185 tool_asset = self._resolve_tool_asset(tcp) 186 await self._bridge.log_trajectory(trajectory, tcp, motion_group, tool_asset=tool_asset) 187 188 # Log collision scenes from actions if configured 189 if self.show_collision_scenes: 190 collision_scenes = extract_collision_scenes_from_actions(actions) 191 if collision_scenes: 192 # Log collision scenes using the sync method 193 self._bridge._log_collision_scene(collision_scenes) 194 195 except Exception as e: 196 logger.warning("Failed to log planning results in Rerun viewer: %s", e) 197 198 async def log_planning_success( 199 self, 200 actions: Sequence[Action], 201 trajectory: models.JointTrajectory, 202 tcp: str, 203 motion_group: MotionGroup, 204 ) -> None: 205 """Log successful planning results to Rerun viewer. 206 207 Args: 208 actions: List of actions that were planned 209 trajectory: The resulting trajectory 210 tcp: TCP used for planning 211 motion_group: The motion group used for planning 212 """ 213 # Ensure safety zones are logged for this motion group (only on first use) 214 await self._ensure_safety_zones_logged(motion_group) 215 216 # Log the planning results 217 await self._log_planning_results(actions, trajectory, tcp, motion_group) 218 219 async def log_planning_failure( 220 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 221 ) -> None: 222 """Log planning failure to Rerun viewer. 223 224 Args: 225 actions: List of actions that failed to plan 226 error: The planning error that occurred 227 tcp: TCP used for planning 228 motion_group: The motion group used for planning 229 """ 230 if not self._bridge: 231 return 232 233 # Ensure safety zones are logged for this motion group (only on first use) 234 await self._ensure_safety_zones_logged(motion_group) 235 236 try: 237 # Log the failed actions 238 await self._bridge.log_actions(list(actions), motion_group=motion_group) 239 240 # Handle specific PlanTrajectoryFailed errors which have additional data 241 from nova.core.exceptions import PlanTrajectoryFailed 242 243 if isinstance(error, PlanTrajectoryFailed): 244 # Log the trajectory from the failed plan 245 if hasattr(error.error, "joint_trajectory") and error.error.joint_trajectory: 246 await self._bridge.log_trajectory( 247 error.error.joint_trajectory, tcp, motion_group 248 ) 249 250 # Log error feedback if available 251 if hasattr(error.error, "error_feedback") and error.error.error_feedback: 252 await self._bridge.log_error_feedback(error.error.error_feedback) 253 254 # Log error information as text 255 import rerun as rr 256 257 error_message = f"Planning failed: {type(error).__name__}: {str(error)}" 258 rr.log("planning/errors", rr.TextLog(error_message, level=rr.TextLogLevel.ERROR)) 259 260 # Log collision scenes from actions if configured (they might be relevant to the failure) 261 if self.show_collision_scenes: 262 collision_scenes = extract_collision_scenes_from_actions(actions) 263 if collision_scenes: 264 # Log collision scenes using the sync method 265 self._bridge._log_collision_scene(collision_scenes) 266 267 except Exception as e: 268 logger.warning("Failed to log planning failure in Rerun viewer: %s", e) 269 270 def get_bridge(self) -> Optional[NovaRerunBridgeProtocol]: 271 """Get the underlying NovaRerunBridge instance. 272 273 This allows advanced users to access the full bridge functionality. 274 275 Returns: 276 The NovaRerunBridge instance if configured, None otherwise. 277 """ 278 return self._bridge 279 280 def cleanup(self) -> None: 281 """Clean up rerun integration after program execution.""" 282 self._bridge = None 283 self._logged_safety_zones.clear() # Reset safety zone tracking 284 285 def _resolve_tool_asset(self, tcp: str) -> Optional[str]: 286 """Resolve the tool asset file path for a given TCP. 287 288 Args: 289 tcp: The TCP ID to resolve tool asset for 290 291 Returns: 292 Path to tool asset file if configured, None otherwise 293 """ 294 return self.tcp_tools.get(tcp)
Rerun viewer for 3D visualization of robot motion and program execution.
This viewer automatically captures and visualizes:
- Robot trajectories and motion paths
- TCP poses and transformations
- Motion group states
- Planning requests and responses
- Collision scenes and safety zones (optional)
- Tool geometries attached to specific TCPs
Example usage:
3D view only (default)
@nova.program( viewer=nova.viewers.Rerun( tcp_tools={"vacuum": "assets/vacuum_cup.stl"} ) )
Full interface with detailed analysis panels
@nova.program( viewer=nova.viewers.Rerun( show_details=True, show_safety_zones=True, show_collision_link_chain=True, show_safety_link_chain=True, tcp_tools={ "vacuum": "assets/vacuum_cup.stl", "gripper": "assets/parallel_gripper.stl" } ) )
58 def __init__( 59 self, 60 application_id: Optional[str] = None, 61 spawn: bool = True, 62 show_safety_zones: bool = True, 63 show_collision_scenes: bool = True, 64 show_collision_link_chain: bool = False, 65 show_safety_link_chain: bool = True, 66 tcp_tools: Optional[dict[str, str]] = None, 67 show_details: bool = False, 68 ) -> None: 69 """ 70 Initialize the Rerun viewer. 71 72 Args: 73 application_id: Optional application ID for the rerun recording 74 spawn: Whether to spawn a rerun viewer process automatically 75 show_safety_zones: Whether to visualize safety zones for motion groups 76 show_collision_scenes: Whether to show collision scenes 77 show_collision_link_chain: Whether to show robot collision mesh geometry 78 show_safety_link_chain: Whether to show robot safety geometry (from controller) 79 tcp_tools: Optional mapping of TCP IDs to tool asset file paths 80 show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only) 81 """ 82 self.application_id: Optional[str] = application_id 83 self.spawn: bool = spawn 84 self.show_safety_zones: bool = show_safety_zones 85 self.show_collision_scenes: bool = show_collision_scenes 86 self.show_collision_link_chain: bool = show_collision_link_chain 87 self.show_safety_link_chain: bool = show_safety_link_chain 88 self.tcp_tools: dict[str, str] = tcp_tools or {} 89 self.show_details: bool = show_details 90 self._bridge: Optional[NovaRerunBridgeProtocol] = None 91 self._logged_safety_zones: set[str] = ( 92 set() 93 ) # Track motion groups that already have safety zones logged 94 95 # Register this viewer as active 96 register_viewer(self)
Initialize the Rerun viewer.
Arguments:
- application_id: Optional application ID for the rerun recording
- spawn: Whether to spawn a rerun viewer process automatically
- show_safety_zones: Whether to visualize safety zones for motion groups
- show_collision_scenes: Whether to show collision scenes
- show_collision_link_chain: Whether to show robot collision mesh geometry
- show_safety_link_chain: Whether to show robot safety geometry (from controller)
- tcp_tools: Optional mapping of TCP IDs to tool asset file paths
- show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only)
98 def configure(self, nova: Nova) -> None: 99 """Configure rerun integration for program execution.""" 100 if self._bridge is not None: 101 return # Already configured 102 103 try: 104 from nova_rerun_bridge import NovaRerunBridge 105 106 bridge = NovaRerunBridge( 107 nova=nova, 108 spawn=self.spawn, 109 recording_id=self.application_id, 110 show_details=self.show_details, 111 show_collision_link_chain=self.show_collision_link_chain, 112 show_safety_link_chain=self.show_safety_link_chain, 113 ) 114 self._bridge = cast(NovaRerunBridgeProtocol, bridge) 115 # Don't setup async components immediately - wait for controllers to be ready 116 except ImportError: 117 # nova_rerun_bridge not available, skip rerun integration 118 logger.warning( 119 "Rerun viewer configured but nova_rerun_bridge not available. " 120 "Install with: uv add wandelbots-nova --extra nova-rerun-bridge" 121 )
Configure rerun integration for program execution.
123 async def setup_after_preconditions(self) -> None: 124 """Setup async components after preconditions (like controllers) are satisfied.""" 125 if self._bridge and not hasattr(self, "_async_setup_done"): 126 await self._setup_async_components() 127 self._async_setup_done = True
Setup async components after preconditions (like controllers) are satisfied.
198 async def log_planning_success( 199 self, 200 actions: Sequence[Action], 201 trajectory: models.JointTrajectory, 202 tcp: str, 203 motion_group: MotionGroup, 204 ) -> None: 205 """Log successful planning results to Rerun viewer. 206 207 Args: 208 actions: List of actions that were planned 209 trajectory: The resulting trajectory 210 tcp: TCP used for planning 211 motion_group: The motion group used for planning 212 """ 213 # Ensure safety zones are logged for this motion group (only on first use) 214 await self._ensure_safety_zones_logged(motion_group) 215 216 # Log the planning results 217 await self._log_planning_results(actions, trajectory, tcp, motion_group)
Log successful planning results to Rerun viewer.
Arguments:
- actions: List of actions that were planned
- trajectory: The resulting trajectory
- tcp: TCP used for planning
- motion_group: The motion group used for planning
219 async def log_planning_failure( 220 self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 221 ) -> None: 222 """Log planning failure to Rerun viewer. 223 224 Args: 225 actions: List of actions that failed to plan 226 error: The planning error that occurred 227 tcp: TCP used for planning 228 motion_group: The motion group used for planning 229 """ 230 if not self._bridge: 231 return 232 233 # Ensure safety zones are logged for this motion group (only on first use) 234 await self._ensure_safety_zones_logged(motion_group) 235 236 try: 237 # Log the failed actions 238 await self._bridge.log_actions(list(actions), motion_group=motion_group) 239 240 # Handle specific PlanTrajectoryFailed errors which have additional data 241 from nova.core.exceptions import PlanTrajectoryFailed 242 243 if isinstance(error, PlanTrajectoryFailed): 244 # Log the trajectory from the failed plan 245 if hasattr(error.error, "joint_trajectory") and error.error.joint_trajectory: 246 await self._bridge.log_trajectory( 247 error.error.joint_trajectory, tcp, motion_group 248 ) 249 250 # Log error feedback if available 251 if hasattr(error.error, "error_feedback") and error.error.error_feedback: 252 await self._bridge.log_error_feedback(error.error.error_feedback) 253 254 # Log error information as text 255 import rerun as rr 256 257 error_message = f"Planning failed: {type(error).__name__}: {str(error)}" 258 rr.log("planning/errors", rr.TextLog(error_message, level=rr.TextLogLevel.ERROR)) 259 260 # Log collision scenes from actions if configured (they might be relevant to the failure) 261 if self.show_collision_scenes: 262 collision_scenes = extract_collision_scenes_from_actions(actions) 263 if collision_scenes: 264 # Log collision scenes using the sync method 265 self._bridge._log_collision_scene(collision_scenes) 266 267 except Exception as e: 268 logger.warning("Failed to log planning failure in Rerun viewer: %s", e)
Log planning failure to Rerun viewer.
Arguments:
- actions: List of actions that failed to plan
- error: The planning error that occurred
- tcp: TCP used for planning
- motion_group: The motion group used for planning
270 def get_bridge(self) -> Optional[NovaRerunBridgeProtocol]: 271 """Get the underlying NovaRerunBridge instance. 272 273 This allows advanced users to access the full bridge functionality. 274 275 Returns: 276 The NovaRerunBridge instance if configured, None otherwise. 277 """ 278 return self._bridge
Get the underlying NovaRerunBridge instance.
This allows advanced users to access the full bridge functionality.
Returns:
The NovaRerunBridge instance if configured, None otherwise.
17@runtime_checkable 18class NovaRerunBridgeProtocol(Protocol): 19 """Protocol defining the interface for NovaRerunBridge.""" 20 21 nova: Nova 22 show_safety_link_chain: bool 23 24 async def __aenter__(self) -> NovaRerunBridgeProtocol: 25 """Async context manager entry.""" 26 ... 27 28 async def __aexit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]: 29 """Async context manager exit.""" 30 ... 31 32 async def setup_blueprint(self) -> None: 33 """Setup the blueprint.""" 34 ... 35 36 async def log_safety_zones(self, motion_group: MotionGroup) -> None: 37 """Log safety zones for a motion group.""" 38 ... 39 40 async def log_actions( 41 self, 42 actions: Union[list[Action], Action], 43 show_connection: bool = False, 44 show_labels: bool = False, 45 motion_group: Optional[MotionGroup] = None, 46 tcp: Optional[str] = None, 47 ) -> None: 48 """Log actions to the viewer.""" 49 ... 50 51 async def log_trajectory( 52 self, 53 joint_trajectory: models.JointTrajectory, 54 tcp: str, 55 motion_group: MotionGroup, 56 time_offset: float = 0, 57 tool_asset: Optional[str] = None, 58 ) -> None: 59 """Log trajectory to the viewer.""" 60 ... 61 62 def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None: 63 """Log collision scenes to the viewer.""" 64 ... 65 66 def log_coordinate_system(self) -> None: 67 """Log the coordinate system.""" 68 ... 69 70 async def log_error_feedback( 71 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 72 ) -> None: 73 """Log error feedback to the viewer.""" 74 ...
Protocol defining the interface for NovaRerunBridge.
1945def _no_init_or_replace_init(self, *args, **kwargs): 1946 cls = type(self) 1947 1948 if cls._is_protocol: 1949 raise TypeError('Protocols cannot be instantiated') 1950 1951 # Already using a custom `__init__`. No need to calculate correct 1952 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1953 if cls.__init__ is not _no_init_or_replace_init: 1954 return 1955 1956 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1957 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1958 # searches for a proper new `__init__` in the MRO. The new `__init__` 1959 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1960 # instantiation of the protocol subclass will thus use the new 1961 # `__init__` and no longer call `_no_init_or_replace_init`. 1962 for base in cls.__mro__: 1963 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1964 if init is not _no_init_or_replace_init: 1965 cls.__init__ = init 1966 break 1967 else: 1968 # should not happen 1969 cls.__init__ = object.__init__ 1970 1971 cls.__init__(self, *args, **kwargs)
36 async def log_safety_zones(self, motion_group: MotionGroup) -> None: 37 """Log safety zones for a motion group.""" 38 ...
Log safety zones for a motion group.
40 async def log_actions( 41 self, 42 actions: Union[list[Action], Action], 43 show_connection: bool = False, 44 show_labels: bool = False, 45 motion_group: Optional[MotionGroup] = None, 46 tcp: Optional[str] = None, 47 ) -> None: 48 """Log actions to the viewer.""" 49 ...
Log actions to the viewer.
51 async def log_trajectory( 52 self, 53 joint_trajectory: models.JointTrajectory, 54 tcp: str, 55 motion_group: MotionGroup, 56 time_offset: float = 0, 57 tool_asset: Optional[str] = None, 58 ) -> None: 59 """Log trajectory to the viewer.""" 60 ...
Log trajectory to the viewer.
70 async def log_error_feedback( 71 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 72 ) -> None: 73 """Log error feedback to the viewer.""" 74 ...
Log error feedback to the viewer.
83def get_viewer_manager() -> ViewerManager: 84 """Get the global viewer manager instance.""" 85 return _viewer_manager
Get the global viewer manager instance.
94def configure_active_viewers(nova: Nova) -> None: 95 """Configure all active viewers with the Nova instance. (Legacy function for backward compatibility)""" 96 _viewer_manager.configure_viewers(nova)
Configure all active viewers with the Nova instance. (Legacy function for backward compatibility)
99async def setup_active_viewers_after_preconditions() -> None: 100 """Setup all active viewers after preconditions are satisfied. (Legacy function for backward compatibility)""" 101 await _viewer_manager.setup_viewers_after_preconditions()
Setup all active viewers after preconditions are satisfied. (Legacy function for backward compatibility)
104def cleanup_active_viewers() -> None: 105 """Clean up all active viewers. (Legacy function for backward compatibility)""" 106 _viewer_manager.cleanup_viewers()
Clean up all active viewers. (Legacy function for backward compatibility)
109async def log_planning_results_to_viewers( 110 actions: Sequence[Action], 111 trajectory: models.JointTrajectory, 112 tcp: str, 113 motion_group: MotionGroup, 114) -> None: 115 """Log successful planning results to all active viewers. (Legacy function for backward compatibility)""" 116 await _viewer_manager.log_planning_success(actions, trajectory, tcp, motion_group)
Log successful planning results to all active viewers. (Legacy function for backward compatibility)
119async def log_planning_error_to_viewers( 120 actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup 121) -> None: 122 """Log planning failure to all active viewers. (Legacy function for backward compatibility)""" 123 await _viewer_manager.log_planning_failure(actions, error, tcp, motion_group)
Log planning failure to all active viewers. (Legacy function for backward compatibility)
13def extract_collision_scenes_from_actions( 14 actions: Sequence[Action], 15) -> dict[str, models.CollisionScene]: 16 """Extract unique collision scenes from a list of actions. 17 18 Args: 19 actions: List of actions to extract collision scenes from 20 21 Returns: 22 Dictionary mapping collision scene IDs to CollisionScene objects 23 """ 24 from nova.actions.motions import CollisionFreeMotion, Motion 25 26 collision_scenes: dict[str, models.CollisionScene] = {} 27 28 for i, action in enumerate(actions): 29 # Check if action is a motion with collision_scene attribute 30 if isinstance(action, (Motion, CollisionFreeMotion)) and action.collision_scene is not None: 31 # Generate a deterministic ID based on action index and type 32 scene_id = f"action_{i}_{type(action).__name__}_scene" 33 collision_scenes[scene_id] = action.collision_scene 34 35 return collision_scenes
Extract unique collision scenes from a list of actions.
Arguments:
- actions: List of actions to extract collision scenes from
Returns:
Dictionary mapping collision scene IDs to CollisionScene objects
89def register_viewer(viewer: Viewer) -> None: 90 """Register a viewer as active. (Legacy function for backward compatibility)""" 91 _viewer_manager.register_viewer(viewer)
Register a viewer as active. (Legacy function for backward compatibility)