nova_rerun_bridge

 1from nova_rerun_bridge import (
 2    dh_robot,
 3    hull_visualizer,
 4    motion_storage,
 5    nova_rerun_bridge,
 6    robot_visualizer,
 7)
 8
 9__all__ = ["dh_robot", "hull_visualizer", "robot_visualizer", "motion_storage", "NovaRerunBridge"]
10
11NovaRerunBridge = nova_rerun_bridge.NovaRerunBridge
class NovaRerunBridge:
 33class NovaRerunBridge:
 34    """Bridge between Nova and Rerun for visualization.
 35
 36    This class provides functionality to visualize Nova data in Rerun.
 37    It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
 38
 39    Example:
 40        ```python
 41        from nova.core.nova import Nova
 42        from nova_rerun_bridge import NovaRerunBridge
 43
 44        async def main():
 45            nova = Nova()
 46            async with NovaRerunBridge(nova) as bridge:
 47                await bridge.setup_blueprint()
 48                await bridge.log_collision_scenes()
 49
 50        ```
 51
 52    Args:
 53        nova (Nova): Instance of Nova client
 54        spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
 55    """
 56
 57    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
 58        self._ensure_models_exist()
 59        self.nova = nova
 60        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
 61        if spawn:
 62            recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
 63            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
 64        logger.add(sink=rr.LoggingHandler("logs/handler"))
 65
 66    def _ensure_models_exist(self):
 67        """Ensure robot models are downloaded"""
 68        models_dir = Path(get_project_root()) / "models"
 69        if not models_dir.exists() or not list(models_dir.glob("*.glb")):
 70            print("Models not found, run update_robot_models() or uv run download-models")
 71
 72    async def setup_blueprint(self) -> None:
 73        """Configure and send blueprint configuration to Rerun.
 74
 75        Fetches motion groups from Nova and configures visualization layout.
 76        """
 77        cell = self.nova.cell()
 78
 79        controllers = await cell.controllers()
 80        motion_groups = []
 81
 82        if not controllers:
 83            logger.warning("No controllers found")
 84            return
 85
 86        for controller in controllers:
 87            for motion_group in await controller.activated_motion_groups():
 88                motion_groups.append(motion_group.motion_group_id)
 89
 90        rr.reset_time()
 91        rr.set_time(TIME_INTERVAL_NAME, duration=0)
 92
 93        send_blueprint(motion_groups)
 94        self.log_coordinate_system()
 95
 96    def log_coordinate_system(self) -> None:
 97        """Log the coordinate system of the cell."""
 98
 99        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
100        coordinate_vectors = (
101            np.array(
102                [
103                    [1.0, 0.0, 0.0],  # X direction
104                    [0.0, 1.0, 0.0],  # Y direction
105                    [0.0, 0.0, 1.0],  # Z direction
106                ]
107            )
108            * 200.0
109        )  # Scale factor of 200.0 for better visibility
110
111        coordinate_colors = np.array(
112            [
113                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
114                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
115                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
116            ]
117        )
118
119        rr.log(
120            "coordinate_system_world",
121            rr.Arrows3D(
122                origins=coordinate_origins,
123                vectors=coordinate_vectors,
124                colors=coordinate_colors,
125                radii=rr.Radius.ui_points([5.0]),
126            ),
127            static=True,
128        )
129
130    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
131        """Fetch and log all collision scenes from Nova to Rerun."""
132        collision_scenes = (
133            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
134                cell=self.nova.cell()._cell_id
135            )
136        )
137        log_collision_scenes(collision_scenes)
138        return collision_scenes
139
140    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
141        """Log a specific collision scene by its ID.
142
143        Args:
144            scene_id (str): The ID of the collision scene to log
145
146        Raises:
147            ValueError: If scene_id is not found in stored collision scenes
148        """
149        collision_scenes = (
150            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
151                cell=self.nova.cell()._cell_id
152            )
153        )
154
155        if scene_id not in collision_scenes:
156            raise ValueError(f"Collision scene with ID {scene_id} not found")
157
158        log_collision_scenes({scene_id: collision_scenes[scene_id]})
159        return {scene_id: collision_scenes[scene_id]}
160
161    def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None:
162        log_collision_scenes(collision_scenes=collision_scenes)
163
164    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
165        tcp_names = await motion_group.tcp_names()
166        tcp = tcp_names[0]
167
168        rr.reset_time()
169        rr.set_time(TIME_INTERVAL_NAME, duration=0)
170
171        log_safety_zones(
172            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
173        )
174
175    def log_saftey_zones_(
176        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
177    ) -> None:
178        log_safety_zones(motion_group_id, optimizer_setup)
179
180    async def log_motion(
181        self,
182        motion_id: str,
183        timing_mode=TimingMode.CONTINUE,
184        time_offset: float = 0,
185        tool_asset: Optional[str] = None,
186    ) -> None:
187        # Fetch motion details from api
188        motion = await self.nova._api_client.motion_api.get_planned_motion(
189            self.nova.cell()._cell_id, motion_id
190        )
191        optimizer_config = (
192            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
193                self.nova.cell()._cell_id, motion.motion_group
194            )
195        )
196        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
197            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
198        )
199
200        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
201            self.nova.cell()._cell_id
202        )
203        motion_motion_group = next(
204            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
205        )
206
207        collision_scenes = (
208            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
209                cell=self.nova.cell()._cell_id
210            )
211        )
212
213        if motion_motion_group is None:
214            raise ValueError(f"Motion group {motion.motion_group} not found")
215
216        log_motion(
217            motion_id=motion_id,
218            model_from_controller=motion_motion_group.model_from_controller,
219            motion_group=motion.motion_group,
220            optimizer_config=optimizer_config,
221            trajectory=trajectory.trajectory,
222            collision_scenes=collision_scenes,
223            time_offset=time_offset,
224            timing_mode=timing_mode,
225            tool_asset=tool_asset,
226        )
227
228    async def log_trajectory(
229        self,
230        joint_trajectory: models.JointTrajectory,
231        tcp: str,
232        motion_group: MotionGroup,
233        timing_mode=TimingMode.CONTINUE,
234        time_offset: float = 0,
235        tool_asset: str = None,
236    ) -> None:
237        if len(joint_trajectory.joint_positions) == 0:
238            raise ValueError("No joint trajectory provided")
239        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
240        await self.log_motion(
241            load_plan_response.motion,
242            timing_mode=timing_mode,
243            time_offset=time_offset,
244            tool_asset=tool_asset,
245        )
246
247    def continue_after_sync(self) -> None:
248        continue_after_sync()
249
250    async def log_error_feedback(
251        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
252    ) -> None:
253        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
254            if (
255                error_feedback.actual_instance.invalid_tcp_pose
256                and error_feedback.actual_instance.invalid_tcp_pose.position
257            ):
258                position = error_feedback.actual_instance.invalid_tcp_pose.position
259                rr.log(
260                    "motion/errors/FeedbackOutOfWorkspace",
261                    rr.Points3D(
262                        [[position[0], position[1], position[2]]],
263                        radii=rr.Radius.ui_points([5.0]),
264                        colors=[(255, 0, 0, 255)],
265                        labels=["Out of Workspace"],
266                    ),
267                    static=True,
268                )
269
270        if isinstance(error_feedback.actual_instance, FeedbackCollision):
271            collisions = error_feedback.actual_instance.collisions
272            if not collisions:
273                return
274
275            for i, collision in enumerate(collisions):
276                if collision.position_on_a is None or collision.position_on_b is None:
277                    continue
278                if collision.position_on_a.world is None or collision.position_on_b.world is None:
279                    continue
280                if collision.normal_world_on_b is None:
281                    continue
282
283                # Extract positions
284                pos_a = collision.position_on_a.world
285                pos_b = collision.position_on_b.world
286                normal = collision.normal_world_on_b
287
288                # Scale normal for visibility
289                arrow_length = 50
290
291                # Log collision points
292                rr.log(
293                    f"motion/errors/FeedbackCollision/collisions/point_{i}/a",
294                    rr.Points3D(
295                        [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)]
296                    ),
297                    static=True,
298                )
299
300                rr.log(
301                    f"motion/errors/FeedbackCollision/collisions/point_{i}/b",
302                    rr.Points3D(
303                        [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)]
304                    ),
305                    static=True,
306                )
307
308                # Log normal vector as arrow
309                rr.log(
310                    f"motion/errors/FeedbackCollision/collisions/normal_{i}",
311                    rr.Arrows3D(
312                        origins=[pos_b],
313                        vectors=[
314                            [
315                                normal[0] * arrow_length,
316                                normal[1] * arrow_length,
317                                normal[2] * arrow_length,
318                            ]
319                        ],
320                        colors=[(255, 255, 0, 255)],
321                    ),
322                    static=True,
323                )
324
325    async def start_streaming(self, motion_group: MotionGroup) -> None:
326        """Start streaming real-time robot state to Rerun viewer."""
327        if motion_group in self._streaming_tasks:
328            return
329
330        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
331        self._streaming_tasks[motion_group] = task
332
333    async def stop_streaming(self) -> None:
334        """Stop all streaming tasks."""
335        for task in self._streaming_tasks.values():
336            task.cancel()
337        self._streaming_tasks.clear()
338
339    async def log_actions(
340        self,
341        actions: list[Action | CollisionFreeMotion] | Action,
342        show_connection: bool = False,
343        motion_group: Optional[MotionGroup] = None,
344    ) -> None:
345        from nova_rerun_bridge import trajectory
346
347        rr.reset_time()
348        rr.set_time(TIME_INTERVAL_NAME, duration=trajectory._last_end_time)
349
350        if not isinstance(actions, list):
351            actions = [actions]
352
353        if len(actions) == 0:
354            raise ValueError("No actions provided")
355
356        # Collect poses from regular actions
357        regular_actions = [
358            action for action in actions if isinstance(action, (Motion, WriteAction))
359        ]
360        regular_poses = (
361            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
362        )
363
364        # Collect poses from CollisionFreeMotion targets
365        collision_free_poses = [
366            action.target
367            for action in actions
368            if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose)
369        ]
370
371        # Combine all poses
372        all_poses = regular_poses + collision_free_poses
373        positions = []
374        point_colors = []
375        use_red = False
376
377        # Collect all positions and determine colors
378        for i, action in enumerate(actions):
379            if isinstance(action, WriteAction):
380                use_red = True
381            if i < len(all_poses):  # Only process if there's a corresponding pose
382                pose = all_poses[i]
383                logger.debug(f"Pose: {pose}")
384                positions.append([pose.position.x, pose.position.y, pose.position.z])
385                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
386
387        enity_path = (
388            f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions"
389        )
390
391        # Log all positions at once
392        rr.log(
393            enity_path,
394            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
395            static=True,
396        )
397
398        if show_connection:
399            rr.log(
400                f"{enity_path}/connection", rr.LineStrips3D([positions], colors=[155, 155, 155, 50])
401            )
402
403    async def __aenter__(self) -> "NovaRerunBridge":
404        """Context manager entry point.
405
406        Returns:
407            NovaRerunBridge: Self reference for context manager usage.
408        """
409        return self
410
411    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
412        """Context manager exit point, ensures cleanup."""
413        await self.cleanup()
414
415    async def cleanup(self) -> None:
416        """Cleanup resources and close Nova API client connection."""
417        if hasattr(self.nova, "_api_client"):
418            await self.nova._api_client.close()

Bridge between Nova and Rerun for visualization.

This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.

Example:
from nova.core.nova import Nova
from nova_rerun_bridge import NovaRerunBridge

async def main():
    nova = Nova()
    async with NovaRerunBridge(nova) as bridge:
        await bridge.setup_blueprint()
        await bridge.log_collision_scenes()
Arguments:
  • nova (Nova): Instance of Nova client
  • spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
NovaRerunBridge(nova: nova.Nova, spawn: bool = True, recording_id=None)
57    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
58        self._ensure_models_exist()
59        self.nova = nova
60        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
61        if spawn:
62            recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
63            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
64        logger.add(sink=rr.LoggingHandler("logs/handler"))
nova
async def setup_blueprint(self) -> None:
72    async def setup_blueprint(self) -> None:
73        """Configure and send blueprint configuration to Rerun.
74
75        Fetches motion groups from Nova and configures visualization layout.
76        """
77        cell = self.nova.cell()
78
79        controllers = await cell.controllers()
80        motion_groups = []
81
82        if not controllers:
83            logger.warning("No controllers found")
84            return
85
86        for controller in controllers:
87            for motion_group in await controller.activated_motion_groups():
88                motion_groups.append(motion_group.motion_group_id)
89
90        rr.reset_time()
91        rr.set_time(TIME_INTERVAL_NAME, duration=0)
92
93        send_blueprint(motion_groups)
94        self.log_coordinate_system()

Configure and send blueprint configuration to Rerun.

Fetches motion groups from Nova and configures visualization layout.

def log_coordinate_system(self) -> None:
 96    def log_coordinate_system(self) -> None:
 97        """Log the coordinate system of the cell."""
 98
 99        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
100        coordinate_vectors = (
101            np.array(
102                [
103                    [1.0, 0.0, 0.0],  # X direction
104                    [0.0, 1.0, 0.0],  # Y direction
105                    [0.0, 0.0, 1.0],  # Z direction
106                ]
107            )
108            * 200.0
109        )  # Scale factor of 200.0 for better visibility
110
111        coordinate_colors = np.array(
112            [
113                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
114                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
115                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
116            ]
117        )
118
119        rr.log(
120            "coordinate_system_world",
121            rr.Arrows3D(
122                origins=coordinate_origins,
123                vectors=coordinate_vectors,
124                colors=coordinate_colors,
125                radii=rr.Radius.ui_points([5.0]),
126            ),
127            static=True,
128        )

Log the coordinate system of the cell.

async def log_collision_scenes( self) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
130    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
131        """Fetch and log all collision scenes from Nova to Rerun."""
132        collision_scenes = (
133            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
134                cell=self.nova.cell()._cell_id
135            )
136        )
137        log_collision_scenes(collision_scenes)
138        return collision_scenes

Fetch and log all collision scenes from Nova to Rerun.

async def log_collision_scene( self, scene_id: str) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
140    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
141        """Log a specific collision scene by its ID.
142
143        Args:
144            scene_id (str): The ID of the collision scene to log
145
146        Raises:
147            ValueError: If scene_id is not found in stored collision scenes
148        """
149        collision_scenes = (
150            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
151                cell=self.nova.cell()._cell_id
152            )
153        )
154
155        if scene_id not in collision_scenes:
156            raise ValueError(f"Collision scene with ID {scene_id} not found")
157
158        log_collision_scenes({scene_id: collision_scenes[scene_id]})
159        return {scene_id: collision_scenes[scene_id]}

Log a specific collision scene by its ID.

Arguments:
  • scene_id (str): The ID of the collision scene to log
Raises:
  • ValueError: If scene_id is not found in stored collision scenes
async def log_saftey_zones(self, motion_group: nova.MotionGroup) -> None:
164    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
165        tcp_names = await motion_group.tcp_names()
166        tcp = tcp_names[0]
167
168        rr.reset_time()
169        rr.set_time(TIME_INTERVAL_NAME, duration=0)
170
171        log_safety_zones(
172            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
173        )
def log_saftey_zones_( self, motion_group_id: str, optimizer_setup: wandelbots_api_client.models.optimizer_setup.OptimizerSetup) -> None:
175    def log_saftey_zones_(
176        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
177    ) -> None:
178        log_safety_zones(motion_group_id, optimizer_setup)
async def log_motion( self, motion_id: str, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: Optional[str] = None) -> None:
180    async def log_motion(
181        self,
182        motion_id: str,
183        timing_mode=TimingMode.CONTINUE,
184        time_offset: float = 0,
185        tool_asset: Optional[str] = None,
186    ) -> None:
187        # Fetch motion details from api
188        motion = await self.nova._api_client.motion_api.get_planned_motion(
189            self.nova.cell()._cell_id, motion_id
190        )
191        optimizer_config = (
192            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
193                self.nova.cell()._cell_id, motion.motion_group
194            )
195        )
196        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
197            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
198        )
199
200        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
201            self.nova.cell()._cell_id
202        )
203        motion_motion_group = next(
204            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
205        )
206
207        collision_scenes = (
208            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
209                cell=self.nova.cell()._cell_id
210            )
211        )
212
213        if motion_motion_group is None:
214            raise ValueError(f"Motion group {motion.motion_group} not found")
215
216        log_motion(
217            motion_id=motion_id,
218            model_from_controller=motion_motion_group.model_from_controller,
219            motion_group=motion.motion_group,
220            optimizer_config=optimizer_config,
221            trajectory=trajectory.trajectory,
222            collision_scenes=collision_scenes,
223            time_offset=time_offset,
224            timing_mode=timing_mode,
225            tool_asset=tool_asset,
226        )
async def log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
228    async def log_trajectory(
229        self,
230        joint_trajectory: models.JointTrajectory,
231        tcp: str,
232        motion_group: MotionGroup,
233        timing_mode=TimingMode.CONTINUE,
234        time_offset: float = 0,
235        tool_asset: str = None,
236    ) -> None:
237        if len(joint_trajectory.joint_positions) == 0:
238            raise ValueError("No joint trajectory provided")
239        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
240        await self.log_motion(
241            load_plan_response.motion,
242            timing_mode=timing_mode,
243            time_offset=time_offset,
244            tool_asset=tool_asset,
245        )
def continue_after_sync(self) -> None:
247    def continue_after_sync(self) -> None:
248        continue_after_sync()
async def log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
250    async def log_error_feedback(
251        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
252    ) -> None:
253        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
254            if (
255                error_feedback.actual_instance.invalid_tcp_pose
256                and error_feedback.actual_instance.invalid_tcp_pose.position
257            ):
258                position = error_feedback.actual_instance.invalid_tcp_pose.position
259                rr.log(
260                    "motion/errors/FeedbackOutOfWorkspace",
261                    rr.Points3D(
262                        [[position[0], position[1], position[2]]],
263                        radii=rr.Radius.ui_points([5.0]),
264                        colors=[(255, 0, 0, 255)],
265                        labels=["Out of Workspace"],
266                    ),
267                    static=True,
268                )
269
270        if isinstance(error_feedback.actual_instance, FeedbackCollision):
271            collisions = error_feedback.actual_instance.collisions
272            if not collisions:
273                return
274
275            for i, collision in enumerate(collisions):
276                if collision.position_on_a is None or collision.position_on_b is None:
277                    continue
278                if collision.position_on_a.world is None or collision.position_on_b.world is None:
279                    continue
280                if collision.normal_world_on_b is None:
281                    continue
282
283                # Extract positions
284                pos_a = collision.position_on_a.world
285                pos_b = collision.position_on_b.world
286                normal = collision.normal_world_on_b
287
288                # Scale normal for visibility
289                arrow_length = 50
290
291                # Log collision points
292                rr.log(
293                    f"motion/errors/FeedbackCollision/collisions/point_{i}/a",
294                    rr.Points3D(
295                        [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)]
296                    ),
297                    static=True,
298                )
299
300                rr.log(
301                    f"motion/errors/FeedbackCollision/collisions/point_{i}/b",
302                    rr.Points3D(
303                        [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)]
304                    ),
305                    static=True,
306                )
307
308                # Log normal vector as arrow
309                rr.log(
310                    f"motion/errors/FeedbackCollision/collisions/normal_{i}",
311                    rr.Arrows3D(
312                        origins=[pos_b],
313                        vectors=[
314                            [
315                                normal[0] * arrow_length,
316                                normal[1] * arrow_length,
317                                normal[2] * arrow_length,
318                            ]
319                        ],
320                        colors=[(255, 255, 0, 255)],
321                    ),
322                    static=True,
323                )
async def start_streaming(self, motion_group: nova.MotionGroup) -> None:
325    async def start_streaming(self, motion_group: MotionGroup) -> None:
326        """Start streaming real-time robot state to Rerun viewer."""
327        if motion_group in self._streaming_tasks:
328            return
329
330        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
331        self._streaming_tasks[motion_group] = task

Start streaming real-time robot state to Rerun viewer.

async def stop_streaming(self) -> None:
333    async def stop_streaming(self) -> None:
334        """Stop all streaming tasks."""
335        for task in self._streaming_tasks.values():
336            task.cancel()
337        self._streaming_tasks.clear()

Stop all streaming tasks.

async def log_actions( self, actions: list[nova.actions.Action | nova.actions.motions.CollisionFreeMotion] | nova.actions.Action, show_connection: bool = False, motion_group: Optional[nova.MotionGroup] = None) -> None:
339    async def log_actions(
340        self,
341        actions: list[Action | CollisionFreeMotion] | Action,
342        show_connection: bool = False,
343        motion_group: Optional[MotionGroup] = None,
344    ) -> None:
345        from nova_rerun_bridge import trajectory
346
347        rr.reset_time()
348        rr.set_time(TIME_INTERVAL_NAME, duration=trajectory._last_end_time)
349
350        if not isinstance(actions, list):
351            actions = [actions]
352
353        if len(actions) == 0:
354            raise ValueError("No actions provided")
355
356        # Collect poses from regular actions
357        regular_actions = [
358            action for action in actions if isinstance(action, (Motion, WriteAction))
359        ]
360        regular_poses = (
361            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
362        )
363
364        # Collect poses from CollisionFreeMotion targets
365        collision_free_poses = [
366            action.target
367            for action in actions
368            if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose)
369        ]
370
371        # Combine all poses
372        all_poses = regular_poses + collision_free_poses
373        positions = []
374        point_colors = []
375        use_red = False
376
377        # Collect all positions and determine colors
378        for i, action in enumerate(actions):
379            if isinstance(action, WriteAction):
380                use_red = True
381            if i < len(all_poses):  # Only process if there's a corresponding pose
382                pose = all_poses[i]
383                logger.debug(f"Pose: {pose}")
384                positions.append([pose.position.x, pose.position.y, pose.position.z])
385                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
386
387        enity_path = (
388            f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions"
389        )
390
391        # Log all positions at once
392        rr.log(
393            enity_path,
394            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
395            static=True,
396        )
397
398        if show_connection:
399            rr.log(
400                f"{enity_path}/connection", rr.LineStrips3D([positions], colors=[155, 155, 155, 50])
401            )
async def cleanup(self) -> None:
415    async def cleanup(self) -> None:
416        """Cleanup resources and close Nova API client connection."""
417        if hasattr(self.nova, "_api_client"):
418            await self.nova._api_client.close()

Cleanup resources and close Nova API client connection.