nova.viewers

Viewer implementations for Nova programs.

This module provides different viewer backends that can be used to visualize and monitor Nova programs during execution.

 1"""
 2Viewer implementations for Nova programs.
 3
 4This module provides different viewer backends that can be used
 5to visualize and monitor Nova programs during execution.
 6"""
 7
 8from __future__ import annotations
 9
10# Public API exports
11from .base import Viewer
12from .manager import ViewerManager, get_viewer_manager
13from .manager import cleanup_active_viewers as _cleanup_active_viewers
14from .manager import configure_active_viewers as _configure_active_viewers
15from .manager import log_planning_error_to_viewers as _log_planning_error_to_viewers
16from .manager import log_planning_results_to_viewers as _log_planning_results_to_viewers
17from .manager import register_viewer as _register_viewer
18from .manager import (
19    setup_active_viewers_after_preconditions as _setup_active_viewers_after_preconditions,
20)
21from .protocol import NovaRerunBridgeProtocol
22from .rerun import Rerun
23from .utils import extract_collision_scenes_from_actions as _extract_collision_scenes_from_actions
24
25__all__ = [
26    "Viewer",
27    "ViewerManager",
28    "Rerun",
29    "NovaRerunBridgeProtocol",
30    "get_viewer_manager",
31    # Internal functions (prefixed with underscore)
32    "_configure_active_viewers",
33    "_setup_active_viewers_after_preconditions",
34    "_cleanup_active_viewers",
35    "_log_planning_results_to_viewers",
36    "_log_planning_error_to_viewers",
37    "_extract_collision_scenes_from_actions",
38    "_register_viewer",
39]
class Viewer(abc.ABC):
16class Viewer(ABC):
17    """Abstract base class for Nova program viewers."""
18
19    @abstractmethod
20    def configure(self, nova: Nova) -> None:
21        """Configure the viewer for program execution."""
22        pass
23
24    async def setup_after_preconditions(self) -> None:
25        """Setup viewer components after preconditions are satisfied.
26
27        Override this method in subclasses that need to wait for preconditions
28        like active controllers before setting up visualization components.
29        """
30        pass
31
32    @abstractmethod
33    def cleanup(self) -> None:
34        """Clean up the viewer after program execution."""
35        pass
36
37    async def log_planning_success(
38        self,
39        actions: Sequence[Action],
40        trajectory: models.JointTrajectory,
41        tcp: str,
42        motion_group: MotionGroup,
43    ) -> None:
44        """Log successful planning results.
45
46        Args:
47            actions: List of actions that were planned
48            trajectory: The resulting trajectory
49            tcp: TCP used for planning
50            motion_group: The motion group used for planning
51        """
52        pass
53
54    async def log_planning_failure(
55        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
56    ) -> None:
57        """Log planning failure results.
58
59        Args:
60            actions: List of actions that failed to plan
61            error: The planning error that occurred
62            tcp: TCP used for planning
63            motion_group: The motion group used for planning
64        """
65        pass

Abstract base class for Nova program viewers.

@abstractmethod
def configure(self, nova: nova.Nova) -> None:
19    @abstractmethod
20    def configure(self, nova: Nova) -> None:
21        """Configure the viewer for program execution."""
22        pass

Configure the viewer for program execution.

async def setup_after_preconditions(self) -> None:
24    async def setup_after_preconditions(self) -> None:
25        """Setup viewer components after preconditions are satisfied.
26
27        Override this method in subclasses that need to wait for preconditions
28        like active controllers before setting up visualization components.
29        """
30        pass

Set up viewer components after preconditions are satisfied.

Override this method in subclasses that need to wait for preconditions, such as active controllers, before setting up visualization components.

@abstractmethod
def cleanup(self) -> None:
32    @abstractmethod
33    def cleanup(self) -> None:
34        """Clean up the viewer after program execution."""
35        pass

Clean up the viewer after program execution.

async def log_planning_success( self, actions: Sequence[nova.actions.Action], trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup) -> None:
37    async def log_planning_success(
38        self,
39        actions: Sequence[Action],
40        trajectory: models.JointTrajectory,
41        tcp: str,
42        motion_group: MotionGroup,
43    ) -> None:
44        """Log successful planning results.
45
46        Args:
47            actions: List of actions that were planned
48            trajectory: The resulting trajectory
49            tcp: TCP used for planning
50            motion_group: The motion group used for planning
51        """
52        pass

Log successful planning results.

Arguments:
  • actions: List of actions that were planned
  • trajectory: The resulting trajectory
  • tcp: TCP used for planning
  • motion_group: The motion group used for planning
async def log_planning_failure( self, actions: Sequence[nova.actions.Action], error: Exception, tcp: str, motion_group: nova.MotionGroup) -> None:
54    async def log_planning_failure(
55        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
56    ) -> None:
57        """Log planning failure results.
58
59        Args:
60            actions: List of actions that failed to plan
61            error: The planning error that occurred
62            tcp: TCP used for planning
63            motion_group: The motion group used for planning
64        """
65        pass

Log planning failure results.

Arguments:
  • actions: List of actions that failed to plan
  • error: The planning error that occurred
  • tcp: TCP used for planning
  • motion_group: The motion group used for planning
class ViewerManager:
21class ViewerManager:
22    """Manages the lifecycle and coordination of all active viewers."""
23
24    def __init__(self) -> None:
25        self._viewers: WeakSet[Viewer] = WeakSet()
26
27    def register_viewer(self, viewer: Viewer) -> None:
28        """Register a viewer as active."""
29        self._viewers.add(viewer)
30
31    def configure_viewers(self, nova: Nova) -> None:
32        """Configure all active viewers with the Nova instance."""
33        for viewer in self._viewers:
34            viewer.configure(nova)
35
36    async def setup_viewers_after_preconditions(self) -> None:
37        """Setup all active viewers after preconditions are satisfied."""
38        for viewer in self._viewers:
39            await viewer.setup_after_preconditions()
40
41    def cleanup_viewers(self) -> None:
42        """Clean up all active viewers."""
43        for viewer in list(self._viewers):  # Copy to avoid modification during iteration
44            viewer.cleanup()
45        self._viewers.clear()
46
47    async def log_planning_success(
48        self,
49        actions: Sequence[Action],
50        trajectory: models.JointTrajectory,
51        tcp: str,
52        motion_group: MotionGroup,
53    ) -> None:
54        """Log successful planning results to all active viewers."""
55        for viewer in self._viewers:
56            try:
57                await viewer.log_planning_success(actions, trajectory, tcp, motion_group)
58            except Exception as e:
59                # Don't fail planning if logging fails
60                logger.warning("Failed to log planning results to viewer: %s", e)
61
62    async def log_planning_failure(
63        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
64    ) -> None:
65        """Log planning failure to all active viewers."""
66        for viewer in self._viewers:
67            try:
68                await viewer.log_planning_failure(actions, error, tcp, motion_group)
69            except Exception as e:
70                # Don't fail planning if logging fails
71                logger.warning("Failed to log planning error to viewer: %s", e)
72
73    @property
74    def has_active_viewers(self) -> bool:
75        """Check if there are any active viewers."""
76        return len(self._viewers) > 0

Manages the lifecycle and coordination of all active viewers.

def register_viewer(self, viewer: Viewer) -> None:
27    def register_viewer(self, viewer: Viewer) -> None:
28        """Register a viewer as active."""
29        self._viewers.add(viewer)

Register a viewer as active.

def configure_viewers(self, nova: nova.Nova) -> None:
31    def configure_viewers(self, nova: Nova) -> None:
32        """Configure all active viewers with the Nova instance."""
33        for viewer in self._viewers:
34            viewer.configure(nova)

Configure all active viewers with the Nova instance.

async def setup_viewers_after_preconditions(self) -> None:
36    async def setup_viewers_after_preconditions(self) -> None:
37        """Setup all active viewers after preconditions are satisfied."""
38        for viewer in self._viewers:
39            await viewer.setup_after_preconditions()

Set up all active viewers after preconditions are satisfied.

def cleanup_viewers(self) -> None:
41    def cleanup_viewers(self) -> None:
42        """Clean up all active viewers."""
43        for viewer in list(self._viewers):  # Copy to avoid modification during iteration
44            viewer.cleanup()
45        self._viewers.clear()

Clean up all active viewers.

async def log_planning_success( self, actions: Sequence[nova.actions.Action], trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup) -> None:
47    async def log_planning_success(
48        self,
49        actions: Sequence[Action],
50        trajectory: models.JointTrajectory,
51        tcp: str,
52        motion_group: MotionGroup,
53    ) -> None:
54        """Log successful planning results to all active viewers."""
55        for viewer in self._viewers:
56            try:
57                await viewer.log_planning_success(actions, trajectory, tcp, motion_group)
58            except Exception as e:
59                # Don't fail planning if logging fails
60                logger.warning("Failed to log planning results to viewer: %s", e)

Log successful planning results to all active viewers.

async def log_planning_failure( self, actions: Sequence[nova.actions.Action], error: Exception, tcp: str, motion_group: nova.MotionGroup) -> None:
62    async def log_planning_failure(
63        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
64    ) -> None:
65        """Log planning failure to all active viewers."""
66        for viewer in self._viewers:
67            try:
68                await viewer.log_planning_failure(actions, error, tcp, motion_group)
69            except Exception as e:
70                # Don't fail planning if logging fails
71                logger.warning("Failed to log planning error to viewer: %s", e)

Log planning failure to all active viewers.

has_active_viewers: bool
73    @property
74    def has_active_viewers(self) -> bool:
75        """Check if there are any active viewers."""
76        return len(self._viewers) > 0

Check if there are any active viewers.

class Rerun(nova.viewers.Viewer):
 23class Rerun(Viewer):
 24    """
 25    Rerun viewer for 3D visualization of robot motion and program execution.
 26
 27    This viewer automatically captures and visualizes:
 28    - Robot trajectories and motion paths
 29    - TCP poses and transformations
 30    - Motion group states
 31    - Planning requests and responses
 32    - Collision scenes and safety zones (optional)
 33    - Tool geometries attached to specific TCPs
 34
 35    Example usage:
 36        # 3D view only (default)
 37        @nova.program(
 38            viewer=nova.viewers.Rerun(
 39                tcp_tools={"vacuum": "assets/vacuum_cup.stl"}
 40            )
 41        )
 42
 43        # Full interface with detailed analysis panels
 44        @nova.program(
 45            viewer=nova.viewers.Rerun(
 46                show_details=True,
 47                show_safety_zones=True,
 48                show_collision_link_chain=True,
 49                show_safety_link_chain=True,
 50                tcp_tools={
 51                    "vacuum": "assets/vacuum_cup.stl",
 52                    "gripper": "assets/parallel_gripper.stl"
 53                }
 54            )
 55        )
 56    """
 57
 58    def __init__(
 59        self,
 60        application_id: Optional[str] = None,
 61        spawn: bool = True,
 62        show_safety_zones: bool = True,
 63        show_collision_scenes: bool = True,
 64        show_collision_link_chain: bool = False,
 65        show_safety_link_chain: bool = True,
 66        tcp_tools: Optional[dict[str, str]] = None,
 67        show_details: bool = False,
 68    ) -> None:
 69        """
 70        Initialize the Rerun viewer.
 71
 72        Args:
 73            application_id: Optional application ID for the rerun recording
 74            spawn: Whether to spawn a rerun viewer process automatically
 75            show_safety_zones: Whether to visualize safety zones for motion groups
 76            show_collision_scenes: Whether to show collision scenes
 77            show_collision_link_chain: Whether to show robot collision mesh geometry
 78            show_safety_link_chain: Whether to show robot safety geometry (from controller)
 79            tcp_tools: Optional mapping of TCP IDs to tool asset file paths
 80            show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only)
 81        """
 82        self.application_id: Optional[str] = application_id
 83        self.spawn: bool = spawn
 84        self.show_safety_zones: bool = show_safety_zones
 85        self.show_collision_scenes: bool = show_collision_scenes
 86        self.show_collision_link_chain: bool = show_collision_link_chain
 87        self.show_safety_link_chain: bool = show_safety_link_chain
 88        self.tcp_tools: dict[str, str] = tcp_tools or {}
 89        self.show_details: bool = show_details
 90        self._bridge: Optional[NovaRerunBridgeProtocol] = None
 91        self._logged_safety_zones: set[str] = (
 92            set()
 93        )  # Track motion groups that already have safety zones logged
 94
 95        # Register this viewer as active
 96        register_viewer(self)
 97
 98    def configure(self, nova: Nova) -> None:
 99        """Configure rerun integration for program execution."""
100        if self._bridge is not None:
101            return  # Already configured
102
103        try:
104            from nova_rerun_bridge import NovaRerunBridge
105
106            bridge = NovaRerunBridge(
107                nova=nova,
108                spawn=self.spawn,
109                recording_id=self.application_id,
110                show_details=self.show_details,
111                show_collision_link_chain=self.show_collision_link_chain,
112                show_safety_link_chain=self.show_safety_link_chain,
113            )
114            self._bridge = cast(NovaRerunBridgeProtocol, bridge)
115            # Don't setup async components immediately - wait for controllers to be ready
116        except ImportError:
117            # nova_rerun_bridge not available, skip rerun integration
118            logger.warning(
119                "Rerun viewer configured but nova_rerun_bridge not available. "
120                "Install with: uv add wandelbots-nova --extra nova-rerun-bridge"
121            )
122
123    async def setup_after_preconditions(self) -> None:
124        """Setup async components after preconditions (like controllers) are satisfied."""
125        if self._bridge and not hasattr(self, "_async_setup_done"):
126            await self._setup_async_components()
127            self._async_setup_done = True
128
129    async def _setup_async_components(self) -> None:
130        """Setup async components like blueprint."""
131        if self._bridge:
132            # Initialize the bridge's own Nova client before using it
133            await self._bridge.__aenter__()
134
135            # Setup blueprint (show_details is already configured in bridge)
136            await self._bridge.setup_blueprint()
137
138    async def _ensure_safety_zones_logged(self, motion_group: MotionGroup) -> None:
139        """Ensure safety zones are logged for the given motion group.
140
141        This method is called during planning to ensure safety zones are shown
142        only for motion groups that are actually being used.
143
144        Args:
145            motion_group: The motion group to log safety zones for
146        """
147        if not self.show_safety_zones or not self._bridge:
148            return
149
150        # Use the motion group ID as unique identifier
151        motion_group_id = motion_group.motion_group_id
152
153        if motion_group_id not in self._logged_safety_zones:
154            try:
155                await self._bridge.log_safety_zones(motion_group)
156                self._logged_safety_zones.add(motion_group_id)
157            except Exception as e:
158                logger.warning(
159                    "Could not log safety zones for motion group %s: %s", motion_group_id, e
160                )
161
162    async def _log_planning_results(
163        self,
164        actions: Sequence[Action],
165        trajectory: models.JointTrajectory,
166        tcp: str,
167        motion_group: MotionGroup,
168    ) -> None:
169        """Log planning results including actions, trajectory, and collision scenes.
170
171        Args:
172            actions: List of actions that were planned
173            trajectory: The resulting trajectory
174            tcp: TCP used for planning
175            motion_group: The motion group used for planning
176        """
177        if not self._bridge:
178            return
179
180        try:
181            # Log actions
182            await self._bridge.log_actions(list(actions), motion_group=motion_group)
183
184            # Log trajectory with tool asset if configured for this TCP
185            tool_asset = self._resolve_tool_asset(tcp)
186            await self._bridge.log_trajectory(trajectory, tcp, motion_group, tool_asset=tool_asset)
187
188            # Log collision scenes from actions if configured
189            if self.show_collision_scenes:
190                collision_scenes = extract_collision_scenes_from_actions(actions)
191                if collision_scenes:
192                    # Log collision scenes using the sync method
193                    self._bridge._log_collision_scene(collision_scenes)
194
195        except Exception as e:
196            logger.warning("Failed to log planning results in Rerun viewer: %s", e)
197
198    async def log_planning_success(
199        self,
200        actions: Sequence[Action],
201        trajectory: models.JointTrajectory,
202        tcp: str,
203        motion_group: MotionGroup,
204    ) -> None:
205        """Log successful planning results to Rerun viewer.
206
207        Args:
208            actions: List of actions that were planned
209            trajectory: The resulting trajectory
210            tcp: TCP used for planning
211            motion_group: The motion group used for planning
212        """
213        # Ensure safety zones are logged for this motion group (only on first use)
214        await self._ensure_safety_zones_logged(motion_group)
215
216        # Log the planning results
217        await self._log_planning_results(actions, trajectory, tcp, motion_group)
218
219    async def log_planning_failure(
220        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
221    ) -> None:
222        """Log planning failure to Rerun viewer.
223
224        Args:
225            actions: List of actions that failed to plan
226            error: The planning error that occurred
227            tcp: TCP used for planning
228            motion_group: The motion group used for planning
229        """
230        if not self._bridge:
231            return
232
233        # Ensure safety zones are logged for this motion group (only on first use)
234        await self._ensure_safety_zones_logged(motion_group)
235
236        try:
237            # Log the failed actions
238            await self._bridge.log_actions(list(actions), motion_group=motion_group)
239
240            # Handle specific PlanTrajectoryFailed errors which have additional data
241            from nova.core.exceptions import PlanTrajectoryFailed
242
243            if isinstance(error, PlanTrajectoryFailed):
244                # Log the trajectory from the failed plan
245                if hasattr(error.error, "joint_trajectory") and error.error.joint_trajectory:
246                    await self._bridge.log_trajectory(
247                        error.error.joint_trajectory, tcp, motion_group
248                    )
249
250                # Log error feedback if available
251                if hasattr(error.error, "error_feedback") and error.error.error_feedback:
252                    await self._bridge.log_error_feedback(error.error.error_feedback)
253
254            # Log error information as text
255            import rerun as rr
256
257            error_message = f"Planning failed: {type(error).__name__}: {str(error)}"
258            rr.log("planning/errors", rr.TextLog(error_message, level=rr.TextLogLevel.ERROR))
259
260            # Log collision scenes from actions if configured (they might be relevant to the failure)
261            if self.show_collision_scenes:
262                collision_scenes = extract_collision_scenes_from_actions(actions)
263                if collision_scenes:
264                    # Log collision scenes using the sync method
265                    self._bridge._log_collision_scene(collision_scenes)
266
267        except Exception as e:
268            logger.warning("Failed to log planning failure in Rerun viewer: %s", e)
269
270    def get_bridge(self) -> Optional[NovaRerunBridgeProtocol]:
271        """Get the underlying NovaRerunBridge instance.
272
273        This allows advanced users to access the full bridge functionality.
274
275        Returns:
276            The NovaRerunBridge instance if configured, None otherwise.
277        """
278        return self._bridge
279
280    def cleanup(self) -> None:
281        """Clean up rerun integration after program execution."""
282        self._bridge = None
283        self._logged_safety_zones.clear()  # Reset safety zone tracking
284
285    def _resolve_tool_asset(self, tcp: str) -> Optional[str]:
286        """Resolve the tool asset file path for a given TCP.
287
288        Args:
289            tcp: The TCP ID to resolve tool asset for
290
291        Returns:
292            Path to tool asset file if configured, None otherwise
293        """
294        return self.tcp_tools.get(tcp)

Rerun viewer for 3D visualization of robot motion and program execution.

This viewer automatically captures and visualizes:

  • Robot trajectories and motion paths
  • TCP poses and transformations
  • Motion group states
  • Planning requests and responses
  • Collision scenes and safety zones (optional)
  • Tool geometries attached to specific TCPs
Example usage:

    # 3D view only (default)
    @nova.program(
        viewer=nova.viewers.Rerun(
            tcp_tools={"vacuum": "assets/vacuum_cup.stl"}
        )
    )

    # Full interface with detailed analysis panels
    @nova.program(
        viewer=nova.viewers.Rerun(
            show_details=True,
            show_safety_zones=True,
            show_collision_link_chain=True,
            show_safety_link_chain=True,
            tcp_tools={
                "vacuum": "assets/vacuum_cup.stl",
                "gripper": "assets/parallel_gripper.stl"
            }
        )
    )

Rerun( application_id: Optional[str] = None, spawn: bool = True, show_safety_zones: bool = True, show_collision_scenes: bool = True, show_collision_link_chain: bool = False, show_safety_link_chain: bool = True, tcp_tools: Optional[dict[str, str]] = None, show_details: bool = False)
58    def __init__(
59        self,
60        application_id: Optional[str] = None,
61        spawn: bool = True,
62        show_safety_zones: bool = True,
63        show_collision_scenes: bool = True,
64        show_collision_link_chain: bool = False,
65        show_safety_link_chain: bool = True,
66        tcp_tools: Optional[dict[str, str]] = None,
67        show_details: bool = False,
68    ) -> None:
69        """
70        Initialize the Rerun viewer.
71
72        Args:
73            application_id: Optional application ID for the rerun recording
74            spawn: Whether to spawn a rerun viewer process automatically
75            show_safety_zones: Whether to visualize safety zones for motion groups
76            show_collision_scenes: Whether to show collision scenes
77            show_collision_link_chain: Whether to show robot collision mesh geometry
78            show_safety_link_chain: Whether to show robot safety geometry (from controller)
79            tcp_tools: Optional mapping of TCP IDs to tool asset file paths
80            show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only)
81        """
82        self.application_id: Optional[str] = application_id
83        self.spawn: bool = spawn
84        self.show_safety_zones: bool = show_safety_zones
85        self.show_collision_scenes: bool = show_collision_scenes
86        self.show_collision_link_chain: bool = show_collision_link_chain
87        self.show_safety_link_chain: bool = show_safety_link_chain
88        self.tcp_tools: dict[str, str] = tcp_tools or {}
89        self.show_details: bool = show_details
90        self._bridge: Optional[NovaRerunBridgeProtocol] = None
91        self._logged_safety_zones: set[str] = (
92            set()
93        )  # Track motion groups that already have safety zones logged
94
95        # Register this viewer as active
96        register_viewer(self)

Initialize the Rerun viewer.

Arguments:
  • application_id: Optional application ID for the rerun recording
  • spawn: Whether to spawn a rerun viewer process automatically
  • show_safety_zones: Whether to visualize safety zones for motion groups
  • show_collision_scenes: Whether to show collision scenes
  • show_collision_link_chain: Whether to show robot collision mesh geometry
  • show_safety_link_chain: Whether to show robot safety geometry (from controller)
  • tcp_tools: Optional mapping of TCP IDs to tool asset file paths
  • show_details: Whether to show detailed analysis panels with charts and logs (False = 3D view only)
application_id: Optional[str]
spawn: bool
show_safety_zones: bool
show_collision_scenes: bool
show_collision_link_chain: bool
show_safety_link_chain: bool
tcp_tools: dict[str, str]
show_details: bool
def configure(self, nova: nova.Nova) -> None:
 98    def configure(self, nova: Nova) -> None:
 99        """Configure rerun integration for program execution."""
100        if self._bridge is not None:
101            return  # Already configured
102
103        try:
104            from nova_rerun_bridge import NovaRerunBridge
105
106            bridge = NovaRerunBridge(
107                nova=nova,
108                spawn=self.spawn,
109                recording_id=self.application_id,
110                show_details=self.show_details,
111                show_collision_link_chain=self.show_collision_link_chain,
112                show_safety_link_chain=self.show_safety_link_chain,
113            )
114            self._bridge = cast(NovaRerunBridgeProtocol, bridge)
115            # Don't setup async components immediately - wait for controllers to be ready
116        except ImportError:
117            # nova_rerun_bridge not available, skip rerun integration
118            logger.warning(
119                "Rerun viewer configured but nova_rerun_bridge not available. "
120                "Install with: uv add wandelbots-nova --extra nova-rerun-bridge"
121            )

Configure rerun integration for program execution.

async def setup_after_preconditions(self) -> None:
123    async def setup_after_preconditions(self) -> None:
124        """Setup async components after preconditions (like controllers) are satisfied."""
125        if self._bridge and not hasattr(self, "_async_setup_done"):
126            await self._setup_async_components()
127            self._async_setup_done = True

Set up async components after preconditions (like controllers) are satisfied.

async def log_planning_success( self, actions: Sequence[nova.actions.Action], trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup) -> None:
198    async def log_planning_success(
199        self,
200        actions: Sequence[Action],
201        trajectory: models.JointTrajectory,
202        tcp: str,
203        motion_group: MotionGroup,
204    ) -> None:
205        """Log successful planning results to Rerun viewer.
206
207        Args:
208            actions: List of actions that were planned
209            trajectory: The resulting trajectory
210            tcp: TCP used for planning
211            motion_group: The motion group used for planning
212        """
213        # Ensure safety zones are logged for this motion group (only on first use)
214        await self._ensure_safety_zones_logged(motion_group)
215
216        # Log the planning results
217        await self._log_planning_results(actions, trajectory, tcp, motion_group)

Log successful planning results to Rerun viewer.

Arguments:
  • actions: List of actions that were planned
  • trajectory: The resulting trajectory
  • tcp: TCP used for planning
  • motion_group: The motion group used for planning
async def log_planning_failure( self, actions: Sequence[nova.actions.Action], error: Exception, tcp: str, motion_group: nova.MotionGroup) -> None:
219    async def log_planning_failure(
220        self, actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
221    ) -> None:
222        """Log planning failure to Rerun viewer.
223
224        Args:
225            actions: List of actions that failed to plan
226            error: The planning error that occurred
227            tcp: TCP used for planning
228            motion_group: The motion group used for planning
229        """
230        if not self._bridge:
231            return
232
233        # Ensure safety zones are logged for this motion group (only on first use)
234        await self._ensure_safety_zones_logged(motion_group)
235
236        try:
237            # Log the failed actions
238            await self._bridge.log_actions(list(actions), motion_group=motion_group)
239
240            # Handle specific PlanTrajectoryFailed errors which have additional data
241            from nova.core.exceptions import PlanTrajectoryFailed
242
243            if isinstance(error, PlanTrajectoryFailed):
244                # Log the trajectory from the failed plan
245                if hasattr(error.error, "joint_trajectory") and error.error.joint_trajectory:
246                    await self._bridge.log_trajectory(
247                        error.error.joint_trajectory, tcp, motion_group
248                    )
249
250                # Log error feedback if available
251                if hasattr(error.error, "error_feedback") and error.error.error_feedback:
252                    await self._bridge.log_error_feedback(error.error.error_feedback)
253
254            # Log error information as text
255            import rerun as rr
256
257            error_message = f"Planning failed: {type(error).__name__}: {str(error)}"
258            rr.log("planning/errors", rr.TextLog(error_message, level=rr.TextLogLevel.ERROR))
259
260            # Log collision scenes from actions if configured (they might be relevant to the failure)
261            if self.show_collision_scenes:
262                collision_scenes = extract_collision_scenes_from_actions(actions)
263                if collision_scenes:
264                    # Log collision scenes using the sync method
265                    self._bridge._log_collision_scene(collision_scenes)
266
267        except Exception as e:
268            logger.warning("Failed to log planning failure in Rerun viewer: %s", e)

Log planning failure to Rerun viewer.

Arguments:
  • actions: List of actions that failed to plan
  • error: The planning error that occurred
  • tcp: TCP used for planning
  • motion_group: The motion group used for planning
def get_bridge(self) -> Optional[NovaRerunBridgeProtocol]:
270    def get_bridge(self) -> Optional[NovaRerunBridgeProtocol]:
271        """Get the underlying NovaRerunBridge instance.
272
273        This allows advanced users to access the full bridge functionality.
274
275        Returns:
276            The NovaRerunBridge instance if configured, None otherwise.
277        """
278        return self._bridge

Get the underlying NovaRerunBridge instance.

This allows advanced users to access the full bridge functionality.

Returns:

The NovaRerunBridge instance if configured, None otherwise.

def cleanup(self) -> None:
280    def cleanup(self) -> None:
281        """Clean up rerun integration after program execution."""
282        self._bridge = None
283        self._logged_safety_zones.clear()  # Reset safety zone tracking

Clean up rerun integration after program execution.

@runtime_checkable
class NovaRerunBridgeProtocol(typing.Protocol):
@runtime_checkable
class NovaRerunBridgeProtocol(Protocol):
    """Structural interface expected of a NovaRerunBridge.

    The protocol lists the attributes and methods a bridge object must
    expose: async context-manager hooks plus logging methods for the
    blueprint, safety zones, actions, trajectories, collision scenes, the
    coordinate system and planning error feedback. Every method body here
    is a stub (``...``); implementations live elsewhere.

    Because the class is decorated with ``@runtime_checkable`` it may be used
    with ``isinstance()``. Such checks only verify that the members exist,
    not that their signatures match.
    """

    # Nova instance exposed by the bridge.
    nova: Nova
    # Boolean switch named after the safety link chain; its exact effect is
    # defined by the implementing bridge. NOTE(review): confirm semantics there.
    show_safety_link_chain: bool

    async def __aenter__(self) -> NovaRerunBridgeProtocol:
        """Enter the async context manager.

        Returns:
            An object satisfying this protocol.
        """
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        """Exit the async context manager.

        Args:
            exc_type: Exception class raised inside the ``async with`` body, if any.
            exc_val: Exception instance raised inside the body, if any.
            exc_tb: Traceback of that exception, if any.

        Returns:
            Following the async context manager protocol, a true value
            suppresses the exception; ``None`` or ``False`` lets it propagate.
        """
        ...

    async def setup_blueprint(self) -> None:
        """Set up the blueprint."""
        ...

    async def log_safety_zones(self, motion_group: MotionGroup) -> None:
        """Log safety zones for a motion group.

        Args:
            motion_group: Motion group whose safety zones are logged.
        """
        ...

    async def log_actions(
        self,
        actions: Union[list[Action], Action],
        show_connection: bool = False,
        show_labels: bool = False,
        motion_group: Optional[MotionGroup] = None,
        tcp: Optional[str] = None,
    ) -> None:
        """Log actions to the viewer.

        Args:
            actions: A single action or a list of actions.
            show_connection: Defaults to ``False``. Presumably toggles drawing
                connections between actions — confirm in the implementation.
            show_labels: Defaults to ``False``. Presumably toggles labels —
                confirm in the implementation.
            motion_group: Optional motion group the actions belong to.
            tcp: Optional TCP name.
        """
        ...

    async def log_trajectory(
        self,
        joint_trajectory: models.JointTrajectory,
        tcp: str,
        motion_group: MotionGroup,
        time_offset: float = 0,
        tool_asset: Optional[str] = None,
    ) -> None:
        """Log trajectory to the viewer.

        Args:
            joint_trajectory: Joint trajectory to log.
            tcp: TCP name.
            motion_group: Motion group the trajectory belongs to.
            time_offset: Defaults to ``0``. Presumably shifts the logged
                timeline — confirm in the implementation.
            tool_asset: Optional string identifying a tool asset; how it is
                resolved is up to the implementation.
        """
        ...

    def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None:
        """Log collision scenes to the viewer.

        Unlike most logging methods of this protocol, this one is synchronous.

        Args:
            collision_scenes: Collision scenes keyed by a string identifier.
        """
        ...

    def log_coordinate_system(self) -> None:
        """Log the coordinate system (synchronous)."""
        ...

    async def log_error_feedback(
        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
    ) -> None:
        """Log error feedback to the viewer.

        Args:
            error_feedback: Error feedback taken from a failed trajectory
                planning response.
        """
        ...

Protocol defining the interface for NovaRerunBridge.

NovaRerunBridgeProtocol(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` used for protocol classes and their subclasses.

    Raises ``TypeError`` when ``type(self)`` is itself a protocol
    (``cls._is_protocol`` is true). For a non-protocol subclass that still
    has this placeholder as its ``__init__``, it finds the first real
    ``__init__`` in the MRO, installs it on the class, and calls it with the
    given arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to the `__init__` that was just installed on the class.
    cls.__init__(self, *args, **kwargs)
nova: nova.Nova
async def setup_blueprint(self) -> None:
    async def setup_blueprint(self) -> None:
        """Set up the blueprint.

        Protocol stub: the body is ``...``; implementations supply the behavior.
        """
        ...

Set up the blueprint.

async def log_safety_zones(self, motion_group: nova.MotionGroup) -> None:
    async def log_safety_zones(self, motion_group: MotionGroup) -> None:
        """Log safety zones for a motion group.

        Protocol stub: the body is ``...``; implementations supply the behavior.

        Args:
            motion_group: Motion group whose safety zones are logged.
        """
        ...

Log safety zones for a motion group.

async def log_actions( self, actions: Union[list[nova.actions.Action], nova.actions.Action], show_connection: bool = False, show_labels: bool = False, motion_group: Optional[nova.MotionGroup] = None, tcp: Optional[str] = None) -> None:
    async def log_actions(
        self,
        actions: Union[list[Action], Action],
        show_connection: bool = False,
        show_labels: bool = False,
        motion_group: Optional[MotionGroup] = None,
        tcp: Optional[str] = None,
    ) -> None:
        """Log actions to the viewer.

        Protocol stub: the body is ``...``; implementations supply the behavior.

        Args:
            actions: A single action or a list of actions.
            show_connection: Defaults to ``False``. Presumably toggles drawing
                connections between actions — confirm in the implementation.
            show_labels: Defaults to ``False``. Presumably toggles labels —
                confirm in the implementation.
            motion_group: Optional motion group the actions belong to.
            tcp: Optional TCP name.
        """
        ...

Log actions to the viewer.

async def log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, time_offset: float = 0, tool_asset: Optional[str] = None) -> None:
    async def log_trajectory(
        self,
        joint_trajectory: models.JointTrajectory,
        tcp: str,
        motion_group: MotionGroup,
        time_offset: float = 0,
        tool_asset: Optional[str] = None,
    ) -> None:
        """Log trajectory to the viewer.

        Protocol stub: the body is ``...``; implementations supply the behavior.

        Args:
            joint_trajectory: Joint trajectory to log.
            tcp: TCP name.
            motion_group: Motion group the trajectory belongs to.
            time_offset: Defaults to ``0``. Presumably shifts the logged
                timeline — confirm in the implementation.
            tool_asset: Optional string identifying a tool asset; how it is
                resolved is up to the implementation.
        """
        ...

Log trajectory to the viewer.

def log_coordinate_system(self) -> None:
    def log_coordinate_system(self) -> None:
        """Log the coordinate system.

        Protocol stub: the body is ``...``. Declared as a synchronous method.
        """
        ...

Log the coordinate system.

async def log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
    async def log_error_feedback(
        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
    ) -> None:
        """Log error feedback to the viewer.

        Protocol stub: the body is ``...``; implementations supply the behavior.

        Args:
            error_feedback: Error feedback taken from a failed trajectory
                planning response.
        """
        ...

Log error feedback to the viewer.

def get_viewer_manager() -> ViewerManager:
def get_viewer_manager() -> ViewerManager:
    """Return the global viewer manager held in the module-level ``_viewer_manager``."""
    manager = _viewer_manager
    return manager

Get the global viewer manager instance.

def _configure_active_viewers(nova: nova.Nova) -> None:
def configure_active_viewers(nova: Nova) -> None:
    """Configure every active viewer with the given Nova instance.

    Legacy module-level function kept for backward compatibility; it
    forwards to ``configure_viewers`` on the global viewer manager.

    Args:
        nova: Nova instance handed to the manager.
    """
    manager = _viewer_manager
    manager.configure_viewers(nova)

Configure all active viewers with the Nova instance. (Legacy function for backward compatibility)

async def _setup_active_viewers_after_preconditions() -> None:
 99async def setup_active_viewers_after_preconditions() -> None:
100    """Setup all active viewers after preconditions are satisfied. (Legacy function for backward compatibility)"""
101    await _viewer_manager.setup_viewers_after_preconditions()

Set up all active viewers after preconditions are satisfied. (Legacy function for backward compatibility)

def _cleanup_active_viewers() -> None:
def cleanup_active_viewers() -> None:
    """Clean up every active viewer.

    Legacy module-level function kept for backward compatibility; it
    forwards to ``cleanup_viewers`` on the global viewer manager.
    """
    manager = _viewer_manager
    manager.cleanup_viewers()

Clean up all active viewers. (Legacy function for backward compatibility)

async def _log_planning_results_to_viewers( actions: Sequence[nova.actions.Action], trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup) -> None:
async def log_planning_results_to_viewers(
    actions: Sequence[Action],
    trajectory: models.JointTrajectory,
    tcp: str,
    motion_group: MotionGroup,
) -> None:
    """Report a successful planning result to every active viewer.

    Legacy module-level function kept for backward compatibility; it awaits
    ``log_planning_success`` on the global viewer manager with the same
    arguments, in the same order.

    Args:
        actions: Actions that were planned.
        trajectory: Joint trajectory produced by planning.
        tcp: TCP used for planning.
        motion_group: Motion group used for planning.
    """
    manager = _viewer_manager
    await manager.log_planning_success(actions, trajectory, tcp, motion_group)

Log successful planning results to all active viewers. (Legacy function for backward compatibility)

async def _log_planning_error_to_viewers( actions: Sequence[nova.actions.Action], error: Exception, tcp: str, motion_group: nova.MotionGroup) -> None:
async def log_planning_error_to_viewers(
    actions: Sequence[Action], error: Exception, tcp: str, motion_group: MotionGroup
) -> None:
    """Report a planning failure to every active viewer.

    Legacy module-level function kept for backward compatibility; it awaits
    ``log_planning_failure`` on the global viewer manager with the same
    arguments, in the same order.

    Args:
        actions: Actions that failed to plan.
        error: The planning error that occurred.
        tcp: TCP used for planning.
        motion_group: Motion group used for planning.
    """
    manager = _viewer_manager
    await manager.log_planning_failure(actions, error, tcp, motion_group)

Log planning failure to all active viewers. (Legacy function for backward compatibility)

def _extract_collision_scenes_from_actions( actions: Sequence[nova.actions.Action]) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
def extract_collision_scenes_from_actions(
    actions: Sequence[Action],
) -> dict[str, models.CollisionScene]:
    """Collect the collision scenes attached to motion actions.

    Only actions that are ``Motion`` or ``CollisionFreeMotion`` instances
    with a non-``None`` ``collision_scene`` contribute an entry. Keys are
    derived from the action's position and class name, so the same scene
    attached to several actions appears once per action.

    Args:
        actions: Sequence of actions to inspect.

    Returns:
        Dictionary mapping ``"action_<index>_<ClassName>_scene"`` to the
        action's ``CollisionScene``; empty when no action carries a scene.
    """
    # Imported at call time rather than at module import time.
    from nova.actions.motions import CollisionFreeMotion, Motion

    motion_types = (Motion, CollisionFreeMotion)
    return {
        # Deterministic ID built from the action's index and type.
        f"action_{index}_{type(candidate).__name__}_scene": candidate.collision_scene
        for index, candidate in enumerate(actions)
        if isinstance(candidate, motion_types) and candidate.collision_scene is not None
    }

Extract unique collision scenes from a list of actions.

Arguments:
  • actions: List of actions to extract collision scenes from
Returns:

Dictionary mapping collision scene IDs to CollisionScene objects

def _register_viewer(viewer: Viewer) -> None:
def register_viewer(viewer: Viewer) -> None:
    """Register a viewer as active.

    Legacy module-level function kept for backward compatibility; it
    forwards to ``register_viewer`` on the global viewer manager.

    Args:
        viewer: Viewer instance to register.
    """
    manager = _viewer_manager
    manager.register_viewer(viewer)

Register a viewer as active. (Legacy function for backward compatibility)