nova_rerun_bridge

 1from nova_rerun_bridge import (
 2    dh_robot,
 3    hull_visualizer,
 4    motion_storage,
 5    nova_rerun_bridge,
 6    robot_visualizer,
 7)
 8
 9__all__ = ["dh_robot", "hull_visualizer", "robot_visualizer", "motion_storage", "NovaRerunBridge"]
10
11NovaRerunBridge = nova_rerun_bridge.NovaRerunBridge
class NovaRerunBridge:
 34class NovaRerunBridge:
 35    """Bridge between Nova and Rerun for visualization.
 36
 37    This class provides functionality to visualize Nova data in Rerun.
 38    It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
 39
 40    Example:
 41        ```python
 42        from nova.core.nova import Nova
 43        from nova_rerun_bridge import NovaRerunBridge
 44
 45        async def main():
 46            nova = Nova()
 47            async with NovaRerunBridge(nova) as bridge:
 48                await bridge.setup_blueprint()
 49                await bridge.log_collision_scenes()
 50
 51        ```
 52
 53    Args:
 54        nova (Nova): Instance of Nova client
 55        spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
 56    """
 57
 58    def __init__(
 59        self,
 60        nova: Nova,
 61        spawn: bool = True,
 62        recording_id=None,
 63        show_details: bool = True,
 64        show_collision_link_chain: bool = False,
 65        show_safety_link_chain: bool = True,
 66    ) -> None:
 67        self._ensure_models_exist()
 68        # Store the original nova instance for initial setup and to copy connection parameters
 69        self.nova = nova
 70        # Create a separate Nova instance for the bridge to use for API calls
 71        self._bridge_nova = None
 72        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
 73        # Track timing per motion group - each motion group has its own timeline
 74        self._motion_group_timers: dict[str, float] = {}
 75        self.show_details = show_details
 76        self.show_collision_link_chain = show_collision_link_chain
 77        self.show_safety_link_chain = show_safety_link_chain
 78
 79        recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
 80
 81        if "VSCODE_PROXY_URI" in os.environ:
 82            rr.init(application_id="nova", recording_id=recording_id, spawn=False)
 83            rr.save("nova.rrd")
 84            logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}")
 85        elif spawn:
 86            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
 87
 88        logger.add(sink=rr.LoggingHandler("logs/handler"))
 89
 90    def _ensure_models_exist(self):
 91        """Ensure robot models are downloaded"""
 92        models_dir = Path(get_project_root()) / "models"
 93        if not models_dir.exists() or not list(models_dir.glob("*.glb")):
 94            print("Models not found, run update_robot_models() or uv run download-models")
 95
 96    async def setup_blueprint(self) -> None:
 97        """Configure and send blueprint configuration to Rerun.
 98
 99        Fetches motion groups from Nova and configures visualization layout.
100        """
101        nova = self._bridge_nova or self.nova
102        cell = nova.cell()
103
104        controllers = await cell.controllers()
105        motion_groups = []
106
107        if not controllers:
108            logger.warning("No controllers found")
109            return
110
111        for controller in controllers:
112            for motion_group in await controller.activated_motion_groups():
113                motion_groups.append(motion_group.motion_group_id)
114
115        rr.reset_time()
116        rr.set_time(TIME_INTERVAL_NAME, duration=0)
117
118        send_blueprint(motion_groups, self.show_details)
119        self.log_coordinate_system()
120
121    def log_coordinate_system(self) -> None:
122        """Log the coordinate system of the cell."""
123
124        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
125        coordinate_vectors = (
126            np.array(
127                [
128                    [1.0, 0.0, 0.0],  # X direction
129                    [0.0, 1.0, 0.0],  # Y direction
130                    [0.0, 0.0, 1.0],  # Z direction
131                ]
132            )
133            * 200.0
134        )  # Scale factor of 200.0 for better visibility
135
136        coordinate_colors = np.array(
137            [
138                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
139                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
140                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
141            ]
142        )
143
144        rr.log(
145            "coordinate_system_world",
146            rr.Arrows3D(
147                origins=coordinate_origins,
148                vectors=coordinate_vectors,
149                colors=coordinate_colors,
150                radii=rr.Radius.ui_points([5.0]),
151            ),
152            static=True,
153        )
154
155    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
156        """Fetch and log all collision scenes from Nova to Rerun."""
157        bridge_nova = self._bridge_nova or self.nova
158        collision_scenes = (
159            await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
160                cell=bridge_nova.cell()._cell_id
161            )
162        )
163        log_collision_scenes(collision_scenes)
164        return collision_scenes
165
166    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
167        """Log a specific collision scene by its ID.
168
169        Args:
170            scene_id (str): The ID of the collision scene to log
171
172        Raises:
173            ValueError: If scene_id is not found in stored collision scenes
174        """
175        bridge_nova = self._bridge_nova or self.nova
176        collision_scenes = (
177            await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
178                cell=bridge_nova.cell()._cell_id
179            )
180        )
181
182        if scene_id not in collision_scenes:
183            raise ValueError(f"Collision scene with ID {scene_id} not found")
184
185        log_collision_scenes({scene_id: collision_scenes[scene_id]})
186        return {scene_id: collision_scenes[scene_id]}
187
188    def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None:
189        log_collision_scenes(collision_scenes=collision_scenes)
190
191    async def log_safety_zones(self, motion_group: MotionGroup) -> None:
192        tcp_names = await motion_group.tcp_names()
193        tcp = tcp_names[0]
194
195        rr.reset_time()
196        rr.set_time(TIME_INTERVAL_NAME, duration=0)
197
198        log_safety_zones(
199            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
200        )
201
202    # Backward compatibility method (deprecated)
203    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
204        """Deprecated: Use log_safety_zones instead."""
205        await self.log_safety_zones(motion_group)
206
207    def log_safety_zones_(
208        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
209    ) -> None:
210        log_safety_zones(motion_group_id, optimizer_setup)
211
212    async def log_motion(
213        self,
214        motion_id: str,
215        timing_mode=TimingMode.CONTINUE,
216        time_offset: float = 0,
217        tool_asset: Optional[str] = None,
218    ) -> None:
219        """Log motion trajectory to Rerun viewer.
220
221        Args:
222            motion_id: The motion identifier
223            timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
224            time_offset: Time offset for visualization
225            tool_asset: Optional tool asset file path
226        """
227        if timing_mode != TimingMode.CONTINUE:
228            import warnings
229
230            warnings.warn(
231                "timing_mode parameter is deprecated and will be removed in a future version. "
232                "Timing is now handled automatically per motion group.",
233                DeprecationWarning,
234                stacklevel=2,
235            )
236        logger.debug(f"log_motion called with motion_id: {motion_id}")
237        try:
238            # Use the bridge's own Nova client for API calls
239            bridge_nova = self._bridge_nova or self.nova
240            logger.debug(
241                f"Using bridge_nova: {bridge_nova is not None}, bridge_nova._api_client: {hasattr(bridge_nova, '_api_client') if bridge_nova else False}"
242            )
243
244            # Fetch motion details from api
245            logger.debug("Fetching motion details...")
246            motion = await bridge_nova._api_client.motion_api.get_planned_motion(
247                bridge_nova.cell()._cell_id, motion_id
248            )
249            logger.debug("Fetching optimizer config...")
250            optimizer_config = (
251                await bridge_nova._api_client.motion_group_infos_api.get_optimizer_configuration(
252                    bridge_nova.cell()._cell_id, motion.motion_group
253                )
254            )
255            logger.debug("Fetching trajectory...")
256            trajectory = await bridge_nova._api_client.motion_api.get_motion_trajectory(
257                bridge_nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
258            )
259
260            logger.debug("Fetching motion groups...")
261            motion_groups = await bridge_nova._api_client.motion_group_api.list_motion_groups(
262                bridge_nova.cell()._cell_id
263            )
264            motion_motion_group = next(
265                (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group),
266                None,
267            )
268
269            logger.debug("Fetching collision scenes...")
270            collision_scenes = await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
271                cell=bridge_nova.cell()._cell_id
272            )
273
274            if motion_motion_group is None:
275                raise ValueError(f"Motion group {motion.motion_group} not found")
276
277            # Get or initialize the timer for this motion group
278            motion_group_id = motion.motion_group
279            current_time = self._motion_group_timers.get(motion_group_id, 0.0)
280
281            logger.debug(
282                f"Calling log_motion function with trajectory points: {len(trajectory.trajectory or [])}"
283            )
284            log_motion(
285                motion_id=motion_id,
286                model_from_controller=motion_motion_group.model_from_controller,
287                motion_group=motion.motion_group,
288                optimizer_config=optimizer_config,
289                trajectory=trajectory.trajectory or [],
290                collision_scenes=collision_scenes,
291                time_offset=current_time + time_offset,
292                tool_asset=tool_asset,
293                show_collision_link_chain=self.show_collision_link_chain,
294                show_safety_link_chain=self.show_safety_link_chain,
295            )
296            # Update the timer for this motion group based on trajectory duration
297            if trajectory.trajectory:
298                last_trajectory_point = trajectory.trajectory[-1]
299                if last_trajectory_point.time is not None:
300                    self._motion_group_timers[motion_group_id] = (
301                        current_time + time_offset + last_trajectory_point.time
302                    )
303            logger.debug("log_motion completed successfully")
304        except RuntimeError as e:
305            if "Session is closed" in str(e):
306                # Session is closed, skip trajectory logging
307                logger.debug(f"Skipping trajectory logging due to closed session: {e}")
308                return
309            else:
310                raise
311        except Exception as e:
312            # Log other errors but don't fail
313            logger.error(f"Failed to log motion trajectory: {e}")
314            raise
315
316    async def log_trajectory(
317        self,
318        joint_trajectory: models.JointTrajectory,
319        tcp: str,
320        motion_group: MotionGroup,
321        timing_mode=TimingMode.CONTINUE,
322        time_offset: float = 0,
323        tool_asset: Optional[str] = None,
324    ) -> None:
325        """Log joint trajectory to Rerun viewer.
326
327        Args:
328            joint_trajectory: The joint trajectory to log
329            tcp: TCP identifier
330            motion_group: Motion group for planning
331            timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
332            time_offset: Time offset for visualization
333            tool_asset: Optional tool asset file path
334        """
335        if timing_mode != TimingMode.CONTINUE:
336            import warnings
337
338            warnings.warn(
339                "timing_mode parameter is deprecated and will be removed in a future version. "
340                "Timing is now handled automatically per motion group.",
341                DeprecationWarning,
342                stacklevel=2,
343            )
344
345        if len(joint_trajectory.joint_positions) == 0:
346            raise ValueError("No joint trajectory provided")
347
348        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
349
350        await self.log_motion(
351            load_plan_response.motion, time_offset=time_offset, tool_asset=tool_asset
352        )
353
354    def continue_after_sync(self) -> None:
355        """No longer needed with per-motion-group timing.
356
357        This method is now a no-op since timing is automatically managed
358        per motion group. Each motion group maintains its own independent timeline.
359
360        .. deprecated::
361            continue_after_sync() is deprecated and will be removed in a future version.
362            Timing is now handled automatically per motion group.
363        """
364        import warnings
365
366        warnings.warn(
367            "continue_after_sync() is deprecated and will be removed in a future version. "
368            "Timing is now handled automatically per motion group.",
369            DeprecationWarning,
370            stacklevel=2,
371        )
372
373    async def log_error_feedback(
374        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
375    ) -> None:
376        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
377            if (
378                error_feedback.actual_instance.invalid_tcp_pose
379                and error_feedback.actual_instance.invalid_tcp_pose.position
380            ):
381                position = error_feedback.actual_instance.invalid_tcp_pose.position
382                rr.log(
383                    "motion/errors/FeedbackOutOfWorkspace",
384                    rr.Points3D(
385                        [[position[0], position[1], position[2]]],
386                        radii=rr.Radius.ui_points([5.0]),
387                        colors=[(255, 0, 0, 255)],
388                        labels=["Out of Workspace"],
389                    ),
390                    static=True,
391                )
392
393        if isinstance(error_feedback.actual_instance, FeedbackCollision):
394            collisions = error_feedback.actual_instance.collisions
395            if not collisions:
396                return
397
398            for i, collision in enumerate(collisions):
399                if collision.position_on_a is None or collision.position_on_b is None:
400                    continue
401                if collision.position_on_a.world is None or collision.position_on_b.world is None:
402                    continue
403                if collision.normal_world_on_b is None:
404                    continue
405
406                # Extract positions
407                pos_a = collision.position_on_a.world
408                pos_b = collision.position_on_b.world
409                normal = collision.normal_world_on_b
410
411                # Scale normal for visibility
412                arrow_length = 50
413
414                # Log collision points
415                rr.log(
416                    f"motion/errors/FeedbackCollision/collisions/point_{i}/a",
417                    rr.Points3D(
418                        [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)]
419                    ),
420                    static=True,
421                )
422
423                rr.log(
424                    f"motion/errors/FeedbackCollision/collisions/point_{i}/b",
425                    rr.Points3D(
426                        [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)]
427                    ),
428                    static=True,
429                )
430
431                # Log normal vector as arrow
432                rr.log(
433                    f"motion/errors/FeedbackCollision/collisions/normal_{i}",
434                    rr.Arrows3D(
435                        origins=[pos_b],
436                        vectors=[
437                            [
438                                normal[0] * arrow_length,
439                                normal[1] * arrow_length,
440                                normal[2] * arrow_length,
441                            ]
442                        ],
443                        colors=[(255, 255, 0, 255)],
444                    ),
445                    static=True,
446                )
447
448    async def start_streaming(self, motion_group: MotionGroup) -> None:
449        """Start streaming real-time robot state to Rerun viewer."""
450        if motion_group in self._streaming_tasks:
451            return
452
453        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
454        self._streaming_tasks[motion_group] = task
455
456    async def stop_streaming(self) -> None:
457        """Stop all streaming tasks."""
458        for task in self._streaming_tasks.values():
459            task.cancel()
460        self._streaming_tasks.clear()
461
462    async def log_actions(
463        self,
464        actions: list[Action | CollisionFreeMotion] | Action,
465        show_connection: bool = False,
466        show_labels: bool = False,
467        motion_group: Optional[MotionGroup] = None,
468        tcp: Optional[str] = None,
469    ) -> None:
470        """Log robot actions as points in the Rerun viewer.
471
472        This method visualizes robot actions by determining their TCP poses and displaying
473        them as colored points in 3D space. Joint motions are converted to poses using
474        forward kinematics with the specified TCP.
475
476        Args:
477            actions: Single action or list of actions to visualize
478            show_connection: Whether to draw lines connecting consecutive action points
479            show_labels: Whether to display action type labels on points
480            motion_group: Motion group for forward kinematics (required for joint motions)
481            tcp: TCP identifier to use for forward kinematics. If None and motion_group
482                 is provided, uses the first available TCP. Should match the TCP used
483                 for trajectory planning to ensure consistency.
484
485        Raises:
486            ValueError: If no actions are provided
487        """
488        rr.reset_time()
489
490        # Use motion group specific timing if available
491        if motion_group is not None:
492            motion_group_time = self._motion_group_timers.get(motion_group.motion_group_id, 0.0)
493            rr.set_time(TIME_INTERVAL_NAME, duration=motion_group_time)
494        else:
495            # Fallback to time 0 if no motion group provided
496            rr.set_time(TIME_INTERVAL_NAME, duration=0.0)
497
498        if not isinstance(actions, list):
499            actions = [actions]
500
501        if len(actions) == 0:
502            raise ValueError("No actions provided")
503
504        # Determine the TCP to use - if not provided, get the first available TCP
505        if tcp is None and motion_group is not None:
506            tcp_names = await motion_group.tcp_names()
507            tcp = tcp_names[0] if tcp_names else "Flange"
508
509        positions = []
510        point_colors = []
511        labels = []
512
513        # Keep track of the last pose for write actions
514        last_pose = None
515        last_joints = None
516
517        # Process each action to get its pose
518        for i, action in enumerate(actions):
519            pose = None
520            action_type = getattr(action, "type", type(action).__name__)
521
522            if isinstance(action, WriteAction):
523                # Write actions use the last pose or joint config
524                if last_pose is not None:
525                    pose = last_pose
526                elif last_joints is not None and motion_group is not None and tcp is not None:
527                    # Use forward kinematics to convert joint config to pose
528                    pose = await self._joint_to_pose(last_joints, motion_group, tcp)
529                else:
530                    # Skip write actions without a previous pose/joint config
531                    continue
532
533            elif isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose):
534                pose = action.target
535                last_pose = pose
536
537            elif isinstance(action, Motion):
538                if hasattr(action, "target"):
539                    if isinstance(action.target, Pose):
540                        # Cartesian motion
541                        pose = action.target
542                        last_pose = pose
543                    elif (
544                        isinstance(action.target, tuple)
545                        and motion_group is not None
546                        and tcp is not None
547                    ):
548                        # Joint motion - use forward kinematics
549                        pose = await self._joint_to_pose(action.target, motion_group, tcp)
550                        last_joints = action.target
551                        last_pose = pose
552                    else:
553                        # Skip actions without a usable target
554                        continue
555                else:
556                    # Skip actions without a target
557                    continue
558            else:
559                # Skip other action types that we don't know how to handle
560                continue
561
562            # If we reach here, we should have a valid pose or we need to skip
563            if pose is None:
564                continue
565
566            logger.debug(f"Action {i}: {action_type}, Pose: {pose}")
567            positions.append([pose.position.x, pose.position.y, pose.position.z])
568
569            # Determine action type and color using better color palette
570            from nova_rerun_bridge.colors import colors
571
572            if isinstance(action, WriteAction):
573                point_colors.append(tuple(colors[0]))  # Light purple for IO actions
574            elif action_type == "joint_ptp":
575                point_colors.append(tuple(colors[2]))  # Medium purple for joint motions
576            elif action_type == "cartesian_ptp":
577                point_colors.append(tuple(colors[4]))  # Deeper purple for cartesian motions
578            elif action_type == "linear":
579                point_colors.append(tuple(colors[6]))  # Dark purple for linear motions
580            elif action_type == "circular":
581                point_colors.append(tuple(colors[8]))  # Very dark purple for circular motions
582            else:
583                point_colors.append(tuple(colors[10]))  # Darkest color for other actions
584
585            # Create descriptive label with ID and action type (only if needed)
586            labels.append(f"{len(positions) - 1}: {action_type}")
587
588        entity_path = (
589            f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions"
590        )
591
592        # Log all positions with labels and colors
593        if positions:
594            # Prepare labels
595            point_labels = labels
596
597            rr.log(
598                entity_path,
599                rr.Points3D(
600                    positions,
601                    colors=point_colors,
602                    labels=point_labels,
603                    show_labels=show_labels,
604                    radii=rr.Radius.ui_points([8.0]),
605                ),
606                static=True,
607            )
608
609            # Log connections between consecutive actions if show_connection is True
610            if show_connection and len(positions) > 1:
611                connection_lines = []
612                for i in range(len(positions) - 1):
613                    connection_lines.append([positions[i], positions[i + 1]])
614
615                rr.log(
616                    f"{entity_path}/connections",
617                    rr.LineStrips3D(
618                        connection_lines,
619                        colors=[(128, 128, 128)],  # Gray connections
620                        radii=rr.Radius.ui_points([2.0]),
621                    ),
622                    static=True,
623                )
624
625    async def _joint_to_pose(
626        self, joint_config: tuple[float, ...], motion_group: MotionGroup, tcp: str
627    ) -> Pose:
628        """Convert joint configuration to pose using forward kinematics.
629
630        Args:
631            joint_config: The joint configuration to convert
632            motion_group: The motion group for forward kinematics
633            tcp: The TCP identifier used for planning (should match the TCP used in planning)
634        """
635        try:
636            # Use Nova's forward kinematics API to get TCP pose from joint configuration
637            from nova.api import models
638
639            # Create a joint position object
640            joint_position = models.Joints(joints=list(joint_config))
641
642            # Create TcpPoseRequest for forward kinematics calculation
643            tcp_pose_request = models.TcpPoseRequest(
644                motion_group=motion_group.motion_group_id, joint_position=joint_position, tcp=tcp
645            )
646
647            # Use Nova's API to calculate the TCP pose from joint configuration
648            api_pose = await motion_group._api_gateway.motion_group_kinematic_api.calculate_forward_kinematic(
649                cell=motion_group._cell,
650                motion_group=motion_group.motion_group_id,
651                tcp_pose_request=tcp_pose_request,
652            )
653
654            # Convert API pose to Nova Pose
655            orientation = api_pose.orientation
656            return Pose(
657                (
658                    api_pose.position.x,
659                    api_pose.position.y,
660                    api_pose.position.z,
661                    orientation.x if orientation else 0.0,
662                    orientation.y if orientation else 0.0,
663                    orientation.z if orientation else 0.0,
664                )
665            )
666
667        except Exception as e:
668            logger.warning(f"Failed to convert joints to pose using forward kinematics: {e}")
669            # Fallback: return a pose at origin
670            return Pose((0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
671
672    def get_motion_group_time(self, motion_group_id: str) -> float:
673        """Get the current timeline position for a motion group.
674
675        Args:
676            motion_group_id: The motion group identifier
677
678        Returns:
679            Current time position for the motion group (0.0 if not seen before)
680        """
681        return self._motion_group_timers.get(motion_group_id, 0.0)
682
683    def reset_motion_group_time(self, motion_group_id: str) -> None:
684        """Reset the timeline for a specific motion group back to 0.
685
686        Args:
687            motion_group_id: The motion group identifier to reset
688        """
689        self._motion_group_timers[motion_group_id] = 0.0
690
691    async def __aenter__(self) -> "NovaRerunBridge":
692        """Context manager entry point.
693
694        Creates a separate Nova client instance for the bridge to use.
695
696        Returns:
697            NovaRerunBridge: Self reference for context manager usage.
698        """
699        # Create a separate Nova instance for the bridge using the same connection parameters
700        # This ensures the bridge has its own session that won't be closed by the main program
701        api_client = self.nova._api_client
702
703        # Extract the host without the /api/v1 suffix
704        host = api_client._host
705        if host.endswith("/api/v1"):
706            host = host[:-7]  # Remove '/api/v1'
707        elif host.endswith("/api/"):
708            host = host[:-5]  # Remove '/api/'
709
710        self._bridge_nova = Nova(
711            host=host,
712            access_token=api_client._access_token,
713            username=api_client._username,
714            password=api_client._password,
715            version=api_client._version,
716            verify_ssl=api_client._verify_ssl,
717        )
718        await self._bridge_nova.__aenter__()
719        return self
720
721    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
722        """Context manager exit point, ensures cleanup."""
723        if "VSCODE_PROXY_URI" in os.environ:
724            logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}")
725
726        await self.cleanup()
727
728    async def cleanup(self) -> None:
729        """Cleanup resources and close Nova API client connection."""
730        # Clean up the bridge's own Nova instance
731        if self._bridge_nova is not None:
732            await self._bridge_nova.__aexit__(None, None, None)
733            self._bridge_nova = None
734
735        # Note: Don't clean up self.nova as it belongs to the main program

Bridge between Nova and Rerun for visualization.

This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.

Example:
from nova.core.nova import Nova
from nova_rerun_bridge import NovaRerunBridge

async def main():
    nova = Nova()
    async with NovaRerunBridge(nova) as bridge:
        await bridge.setup_blueprint()
        await bridge.log_collision_scenes()
Arguments:
  • nova (Nova): Instance of Nova client
  • spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
NovaRerunBridge( nova: nova.Nova, spawn: bool = True, recording_id=None, show_details: bool = True, show_collision_link_chain: bool = False, show_safety_link_chain: bool = True)
58    def __init__(
59        self,
60        nova: Nova,
61        spawn: bool = True,
62        recording_id=None,
63        show_details: bool = True,
64        show_collision_link_chain: bool = False,
65        show_safety_link_chain: bool = True,
66    ) -> None:
67        self._ensure_models_exist()
68        # Store the original nova instance for initial setup and to copy connection parameters
69        self.nova = nova
70        # Create a separate Nova instance for the bridge to use for API calls
71        self._bridge_nova = None
72        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
73        # Track timing per motion group - each motion group has its own timeline
74        self._motion_group_timers: dict[str, float] = {}
75        self.show_details = show_details
76        self.show_collision_link_chain = show_collision_link_chain
77        self.show_safety_link_chain = show_safety_link_chain
78
79        recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
80
81        if "VSCODE_PROXY_URI" in os.environ:
82            rr.init(application_id="nova", recording_id=recording_id, spawn=False)
83            rr.save("nova.rrd")
84            logger.info(f"Install rerun app and open the visual log on {get_rerun_address()}")
85        elif spawn:
86            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
87
88        logger.add(sink=rr.LoggingHandler("logs/handler"))
nova
show_details
async def setup_blueprint(self) -> None:
 96    async def setup_blueprint(self) -> None:
 97        """Configure and send blueprint configuration to Rerun.
 98
 99        Fetches motion groups from Nova and configures visualization layout.
100        """
101        nova = self._bridge_nova or self.nova
102        cell = nova.cell()
103
104        controllers = await cell.controllers()
105        motion_groups = []
106
107        if not controllers:
108            logger.warning("No controllers found")
109            return
110
111        for controller in controllers:
112            for motion_group in await controller.activated_motion_groups():
113                motion_groups.append(motion_group.motion_group_id)
114
115        rr.reset_time()
116        rr.set_time(TIME_INTERVAL_NAME, duration=0)
117
118        send_blueprint(motion_groups, self.show_details)
119        self.log_coordinate_system()

Configure and send blueprint configuration to Rerun.

Fetches motion groups from Nova and configures visualization layout.

def log_coordinate_system(self) -> None:
121    def log_coordinate_system(self) -> None:
122        """Log the coordinate system of the cell."""
123
124        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
125        coordinate_vectors = (
126            np.array(
127                [
128                    [1.0, 0.0, 0.0],  # X direction
129                    [0.0, 1.0, 0.0],  # Y direction
130                    [0.0, 0.0, 1.0],  # Z direction
131                ]
132            )
133            * 200.0
134        )  # Scale factor of 200.0 for better visibility
135
136        coordinate_colors = np.array(
137            [
138                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
139                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
140                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
141            ]
142        )
143
144        rr.log(
145            "coordinate_system_world",
146            rr.Arrows3D(
147                origins=coordinate_origins,
148                vectors=coordinate_vectors,
149                colors=coordinate_colors,
150                radii=rr.Radius.ui_points([5.0]),
151            ),
152            static=True,
153        )

Log the coordinate system of the cell.

async def log_collision_scenes( self) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
155    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
156        """Fetch and log all collision scenes from Nova to Rerun."""
157        bridge_nova = self._bridge_nova or self.nova
158        collision_scenes = (
159            await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
160                cell=bridge_nova.cell()._cell_id
161            )
162        )
163        log_collision_scenes(collision_scenes)
164        return collision_scenes

Fetch and log all collision scenes from Nova to Rerun.

async def log_collision_scene( self, scene_id: str) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
166    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
167        """Log a specific collision scene by its ID.
168
169        Args:
170            scene_id (str): The ID of the collision scene to log
171
172        Raises:
173            ValueError: If scene_id is not found in stored collision scenes
174        """
175        bridge_nova = self._bridge_nova or self.nova
176        collision_scenes = (
177            await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
178                cell=bridge_nova.cell()._cell_id
179            )
180        )
181
182        if scene_id not in collision_scenes:
183            raise ValueError(f"Collision scene with ID {scene_id} not found")
184
185        log_collision_scenes({scene_id: collision_scenes[scene_id]})
186        return {scene_id: collision_scenes[scene_id]}

Log a specific collision scene by its ID.

Arguments:
  • scene_id (str): The ID of the collision scene to log
Raises:
  • ValueError: If scene_id is not found in stored collision scenes
async def log_safety_zones(self, motion_group: nova.MotionGroup) -> None:
191    async def log_safety_zones(self, motion_group: MotionGroup) -> None:
192        tcp_names = await motion_group.tcp_names()
193        tcp = tcp_names[0]
194
195        rr.reset_time()
196        rr.set_time(TIME_INTERVAL_NAME, duration=0)
197
198        log_safety_zones(
199            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
200        )
async def log_saftey_zones(self, motion_group: nova.MotionGroup) -> None:
203    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
204        """Deprecated: Use log_safety_zones instead."""
205        await self.log_safety_zones(motion_group)

Deprecated: Use log_safety_zones instead.

def log_safety_zones_( self, motion_group_id: str, optimizer_setup: wandelbots_api_client.models.optimizer_setup.OptimizerSetup) -> None:
207    def log_safety_zones_(
208        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
209    ) -> None:
210        log_safety_zones(motion_group_id, optimizer_setup)
async def log_motion( self, motion_id: str, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: Optional[str] = None) -> None:
212    async def log_motion(
213        self,
214        motion_id: str,
215        timing_mode=TimingMode.CONTINUE,
216        time_offset: float = 0,
217        tool_asset: Optional[str] = None,
218    ) -> None:
219        """Log motion trajectory to Rerun viewer.
220
221        Args:
222            motion_id: The motion identifier
223            timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
224            time_offset: Time offset for visualization
225            tool_asset: Optional tool asset file path
226        """
227        if timing_mode != TimingMode.CONTINUE:
228            import warnings
229
230            warnings.warn(
231                "timing_mode parameter is deprecated and will be removed in a future version. "
232                "Timing is now handled automatically per motion group.",
233                DeprecationWarning,
234                stacklevel=2,
235            )
236        logger.debug(f"log_motion called with motion_id: {motion_id}")
237        try:
238            # Use the bridge's own Nova client for API calls
239            bridge_nova = self._bridge_nova or self.nova
240            logger.debug(
241                f"Using bridge_nova: {bridge_nova is not None}, bridge_nova._api_client: {hasattr(bridge_nova, '_api_client') if bridge_nova else False}"
242            )
243
244            # Fetch motion details from api
245            logger.debug("Fetching motion details...")
246            motion = await bridge_nova._api_client.motion_api.get_planned_motion(
247                bridge_nova.cell()._cell_id, motion_id
248            )
249            logger.debug("Fetching optimizer config...")
250            optimizer_config = (
251                await bridge_nova._api_client.motion_group_infos_api.get_optimizer_configuration(
252                    bridge_nova.cell()._cell_id, motion.motion_group
253                )
254            )
255            logger.debug("Fetching trajectory...")
256            trajectory = await bridge_nova._api_client.motion_api.get_motion_trajectory(
257                bridge_nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
258            )
259
260            logger.debug("Fetching motion groups...")
261            motion_groups = await bridge_nova._api_client.motion_group_api.list_motion_groups(
262                bridge_nova.cell()._cell_id
263            )
264            motion_motion_group = next(
265                (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group),
266                None,
267            )
268
269            logger.debug("Fetching collision scenes...")
270            collision_scenes = await bridge_nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
271                cell=bridge_nova.cell()._cell_id
272            )
273
274            if motion_motion_group is None:
275                raise ValueError(f"Motion group {motion.motion_group} not found")
276
277            # Get or initialize the timer for this motion group
278            motion_group_id = motion.motion_group
279            current_time = self._motion_group_timers.get(motion_group_id, 0.0)
280
281            logger.debug(
282                f"Calling log_motion function with trajectory points: {len(trajectory.trajectory or [])}"
283            )
284            log_motion(
285                motion_id=motion_id,
286                model_from_controller=motion_motion_group.model_from_controller,
287                motion_group=motion.motion_group,
288                optimizer_config=optimizer_config,
289                trajectory=trajectory.trajectory or [],
290                collision_scenes=collision_scenes,
291                time_offset=current_time + time_offset,
292                tool_asset=tool_asset,
293                show_collision_link_chain=self.show_collision_link_chain,
294                show_safety_link_chain=self.show_safety_link_chain,
295            )
296            # Update the timer for this motion group based on trajectory duration
297            if trajectory.trajectory:
298                last_trajectory_point = trajectory.trajectory[-1]
299                if last_trajectory_point.time is not None:
300                    self._motion_group_timers[motion_group_id] = (
301                        current_time + time_offset + last_trajectory_point.time
302                    )
303            logger.debug("log_motion completed successfully")
304        except RuntimeError as e:
305            if "Session is closed" in str(e):
306                # Session is closed, skip trajectory logging
307                logger.debug(f"Skipping trajectory logging due to closed session: {e}")
308                return
309            else:
310                raise
311        except Exception as e:
312            # Log other errors but don't fail
313            logger.error(f"Failed to log motion trajectory: {e}")
314            raise

Log motion trajectory to Rerun viewer.

Arguments:
  • motion_id: The motion identifier
  • timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
  • time_offset: Time offset for visualization
  • tool_asset: Optional tool asset file path
async def log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: Optional[str] = None) -> None:
316    async def log_trajectory(
317        self,
318        joint_trajectory: models.JointTrajectory,
319        tcp: str,
320        motion_group: MotionGroup,
321        timing_mode=TimingMode.CONTINUE,
322        time_offset: float = 0,
323        tool_asset: Optional[str] = None,
324    ) -> None:
325        """Log joint trajectory to Rerun viewer.
326
327        Args:
328            joint_trajectory: The joint trajectory to log
329            tcp: TCP identifier
330            motion_group: Motion group for planning
331            timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
332            time_offset: Time offset for visualization
333            tool_asset: Optional tool asset file path
334        """
335        if timing_mode != TimingMode.CONTINUE:
336            import warnings
337
338            warnings.warn(
339                "timing_mode parameter is deprecated and will be removed in a future version. "
340                "Timing is now handled automatically per motion group.",
341                DeprecationWarning,
342                stacklevel=2,
343            )
344
345        if len(joint_trajectory.joint_positions) == 0:
346            raise ValueError("No joint trajectory provided")
347
348        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
349
350        await self.log_motion(
351            load_plan_response.motion, time_offset=time_offset, tool_asset=tool_asset
352        )

Log joint trajectory to Rerun viewer.

Arguments:
  • joint_trajectory: The joint trajectory to log
  • tcp: TCP identifier
  • motion_group: Motion group for planning
  • timing_mode: DEPRECATED - Timing mode (ignored in new implementation)
  • time_offset: Time offset for visualization
  • tool_asset: Optional tool asset file path
def continue_after_sync(self) -> None:
354    def continue_after_sync(self) -> None:
355        """No longer needed with per-motion-group timing.
356
357        This method is now a no-op since timing is automatically managed
358        per motion group. Each motion group maintains its own independent timeline.
359
360        .. deprecated::
361            continue_after_sync() is deprecated and will be removed in a future version.
362            Timing is now handled automatically per motion group.
363        """
364        import warnings
365
366        warnings.warn(
367            "continue_after_sync() is deprecated and will be removed in a future version. "
368            "Timing is now handled automatically per motion group.",
369            DeprecationWarning,
370            stacklevel=2,
371        )

No longer needed with per-motion-group timing.

This method is now a no-op since timing is automatically managed per motion group. Each motion group maintains its own independent timeline.

Deprecated since version : continue_after_sync() is deprecated and will be removed in a future version. Timing is now handled automatically per motion group.

async def log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
373    async def log_error_feedback(
374        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
375    ) -> None:
376        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
377            if (
378                error_feedback.actual_instance.invalid_tcp_pose
379                and error_feedback.actual_instance.invalid_tcp_pose.position
380            ):
381                position = error_feedback.actual_instance.invalid_tcp_pose.position
382                rr.log(
383                    "motion/errors/FeedbackOutOfWorkspace",
384                    rr.Points3D(
385                        [[position[0], position[1], position[2]]],
386                        radii=rr.Radius.ui_points([5.0]),
387                        colors=[(255, 0, 0, 255)],
388                        labels=["Out of Workspace"],
389                    ),
390                    static=True,
391                )
392
393        if isinstance(error_feedback.actual_instance, FeedbackCollision):
394            collisions = error_feedback.actual_instance.collisions
395            if not collisions:
396                return
397
398            for i, collision in enumerate(collisions):
399                if collision.position_on_a is None or collision.position_on_b is None:
400                    continue
401                if collision.position_on_a.world is None or collision.position_on_b.world is None:
402                    continue
403                if collision.normal_world_on_b is None:
404                    continue
405
406                # Extract positions
407                pos_a = collision.position_on_a.world
408                pos_b = collision.position_on_b.world
409                normal = collision.normal_world_on_b
410
411                # Scale normal for visibility
412                arrow_length = 50
413
414                # Log collision points
415                rr.log(
416                    f"motion/errors/FeedbackCollision/collisions/point_{i}/a",
417                    rr.Points3D(
418                        [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)]
419                    ),
420                    static=True,
421                )
422
423                rr.log(
424                    f"motion/errors/FeedbackCollision/collisions/point_{i}/b",
425                    rr.Points3D(
426                        [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)]
427                    ),
428                    static=True,
429                )
430
431                # Log normal vector as arrow
432                rr.log(
433                    f"motion/errors/FeedbackCollision/collisions/normal_{i}",
434                    rr.Arrows3D(
435                        origins=[pos_b],
436                        vectors=[
437                            [
438                                normal[0] * arrow_length,
439                                normal[1] * arrow_length,
440                                normal[2] * arrow_length,
441                            ]
442                        ],
443                        colors=[(255, 255, 0, 255)],
444                    ),
445                    static=True,
446                )
async def start_streaming(self, motion_group: nova.MotionGroup) -> None:
448    async def start_streaming(self, motion_group: MotionGroup) -> None:
449        """Start streaming real-time robot state to Rerun viewer."""
450        if motion_group in self._streaming_tasks:
451            return
452
453        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
454        self._streaming_tasks[motion_group] = task

Start streaming real-time robot state to Rerun viewer.

async def stop_streaming(self) -> None:
456    async def stop_streaming(self) -> None:
457        """Stop all streaming tasks."""
458        for task in self._streaming_tasks.values():
459            task.cancel()
460        self._streaming_tasks.clear()

Stop all streaming tasks.

async def log_actions( self, actions: list[nova.actions.Action | nova.actions.motions.CollisionFreeMotion] | nova.actions.Action, show_connection: bool = False, show_labels: bool = False, motion_group: Optional[nova.MotionGroup] = None, tcp: Optional[str] = None) -> None:
462    async def log_actions(
463        self,
464        actions: list[Action | CollisionFreeMotion] | Action,
465        show_connection: bool = False,
466        show_labels: bool = False,
467        motion_group: Optional[MotionGroup] = None,
468        tcp: Optional[str] = None,
469    ) -> None:
470        """Log robot actions as points in the Rerun viewer.
471
472        This method visualizes robot actions by determining their TCP poses and displaying
473        them as colored points in 3D space. Joint motions are converted to poses using
474        forward kinematics with the specified TCP.
475
476        Args:
477            actions: Single action or list of actions to visualize
478            show_connection: Whether to draw lines connecting consecutive action points
479            show_labels: Whether to display action type labels on points
480            motion_group: Motion group for forward kinematics (required for joint motions)
481            tcp: TCP identifier to use for forward kinematics. If None and motion_group
482                 is provided, uses the first available TCP. Should match the TCP used
483                 for trajectory planning to ensure consistency.
484
485        Raises:
486            ValueError: If no actions are provided
487        """
488        rr.reset_time()
489
490        # Use motion group specific timing if available
491        if motion_group is not None:
492            motion_group_time = self._motion_group_timers.get(motion_group.motion_group_id, 0.0)
493            rr.set_time(TIME_INTERVAL_NAME, duration=motion_group_time)
494        else:
495            # Fallback to time 0 if no motion group provided
496            rr.set_time(TIME_INTERVAL_NAME, duration=0.0)
497
498        if not isinstance(actions, list):
499            actions = [actions]
500
501        if len(actions) == 0:
502            raise ValueError("No actions provided")
503
504        # Determine the TCP to use - if not provided, get the first available TCP
505        if tcp is None and motion_group is not None:
506            tcp_names = await motion_group.tcp_names()
507            tcp = tcp_names[0] if tcp_names else "Flange"
508
509        positions = []
510        point_colors = []
511        labels = []
512
513        # Keep track of the last pose for write actions
514        last_pose = None
515        last_joints = None
516
517        # Process each action to get its pose
518        for i, action in enumerate(actions):
519            pose = None
520            action_type = getattr(action, "type", type(action).__name__)
521
522            if isinstance(action, WriteAction):
523                # Write actions use the last pose or joint config
524                if last_pose is not None:
525                    pose = last_pose
526                elif last_joints is not None and motion_group is not None and tcp is not None:
527                    # Use forward kinematics to convert joint config to pose
528                    pose = await self._joint_to_pose(last_joints, motion_group, tcp)
529                else:
530                    # Skip write actions without a previous pose/joint config
531                    continue
532
533            elif isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose):
534                pose = action.target
535                last_pose = pose
536
537            elif isinstance(action, Motion):
538                if hasattr(action, "target"):
539                    if isinstance(action.target, Pose):
540                        # Cartesian motion
541                        pose = action.target
542                        last_pose = pose
543                    elif (
544                        isinstance(action.target, tuple)
545                        and motion_group is not None
546                        and tcp is not None
547                    ):
548                        # Joint motion - use forward kinematics
549                        pose = await self._joint_to_pose(action.target, motion_group, tcp)
550                        last_joints = action.target
551                        last_pose = pose
552                    else:
553                        # Skip actions without a usable target
554                        continue
555                else:
556                    # Skip actions without a target
557                    continue
558            else:
559                # Skip other action types that we don't know how to handle
560                continue
561
562            # If we reach here, we should have a valid pose or we need to skip
563            if pose is None:
564                continue
565
566            logger.debug(f"Action {i}: {action_type}, Pose: {pose}")
567            positions.append([pose.position.x, pose.position.y, pose.position.z])
568
569            # Determine action type and color using better color palette
570            from nova_rerun_bridge.colors import colors
571
572            if isinstance(action, WriteAction):
573                point_colors.append(tuple(colors[0]))  # Light purple for IO actions
574            elif action_type == "joint_ptp":
575                point_colors.append(tuple(colors[2]))  # Medium purple for joint motions
576            elif action_type == "cartesian_ptp":
577                point_colors.append(tuple(colors[4]))  # Deeper purple for cartesian motions
578            elif action_type == "linear":
579                point_colors.append(tuple(colors[6]))  # Dark purple for linear motions
580            elif action_type == "circular":
581                point_colors.append(tuple(colors[8]))  # Very dark purple for circular motions
582            else:
583                point_colors.append(tuple(colors[10]))  # Darkest color for other actions
584
585            # Create descriptive label with ID and action type (only if needed)
586            labels.append(f"{len(positions) - 1}: {action_type}")
587
588        entity_path = (
589            f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions"
590        )
591
592        # Log all positions with labels and colors
593        if positions:
594            # Prepare labels
595            point_labels = labels
596
597            rr.log(
598                entity_path,
599                rr.Points3D(
600                    positions,
601                    colors=point_colors,
602                    labels=point_labels,
603                    show_labels=show_labels,
604                    radii=rr.Radius.ui_points([8.0]),
605                ),
606                static=True,
607            )
608
609            # Log connections between consecutive actions if show_connection is True
610            if show_connection and len(positions) > 1:
611                connection_lines = []
612                for i in range(len(positions) - 1):
613                    connection_lines.append([positions[i], positions[i + 1]])
614
615                rr.log(
616                    f"{entity_path}/connections",
617                    rr.LineStrips3D(
618                        connection_lines,
619                        colors=[(128, 128, 128)],  # Gray connections
620                        radii=rr.Radius.ui_points([2.0]),
621                    ),
622                    static=True,
623                )

Log robot actions as points in the Rerun viewer.

This method visualizes robot actions by determining their TCP poses and displaying them as colored points in 3D space. Joint motions are converted to poses using forward kinematics with the specified TCP.

Arguments:
  • actions: Single action or list of actions to visualize
  • show_connection: Whether to draw lines connecting consecutive action points
  • show_labels: Whether to display action type labels on points
  • motion_group: Motion group for forward kinematics (required for joint motions)
  • tcp: TCP identifier to use for forward kinematics. If None and motion_group is provided, uses the first available TCP. Should match the TCP used for trajectory planning to ensure consistency.
Raises:
  • ValueError: If no actions are provided
def get_motion_group_time(self, motion_group_id: str) -> float:
672    def get_motion_group_time(self, motion_group_id: str) -> float:
673        """Get the current timeline position for a motion group.
674
675        Args:
676            motion_group_id: The motion group identifier
677
678        Returns:
679            Current time position for the motion group (0.0 if not seen before)
680        """
681        return self._motion_group_timers.get(motion_group_id, 0.0)

Get the current timeline position for a motion group.

Arguments:
  • motion_group_id: The motion group identifier
Returns:

Current time position for the motion group (0.0 if not seen before)

def reset_motion_group_time(self, motion_group_id: str) -> None:
683    def reset_motion_group_time(self, motion_group_id: str) -> None:
684        """Reset the timeline for a specific motion group back to 0.
685
686        Args:
687            motion_group_id: The motion group identifier to reset
688        """
689        self._motion_group_timers[motion_group_id] = 0.0

Reset the timeline for a specific motion group back to 0.

Arguments:
  • motion_group_id: The motion group identifier to reset
async def cleanup(self) -> None:
728    async def cleanup(self) -> None:
729        """Cleanup resources and close Nova API client connection."""
730        # Clean up the bridge's own Nova instance
731        if self._bridge_nova is not None:
732            await self._bridge_nova.__aexit__(None, None, None)
733            self._bridge_nova = None
734
735        # Note: Don't clean up self.nova as it belongs to the main program

Cleanup resources and close Nova API client connection.