nova_rerun_bridge

 1from nova_rerun_bridge import (
 2    dh_robot,
 3    hull_visualizer,
 4    motion_storage,
 5    nova_rerun_bridge,
 6    robot_visualizer,
 7)
 8
 9__all__ = ["dh_robot", "hull_visualizer", "robot_visualizer", "motion_storage", "NovaRerunBridge"]
10
11NovaRerunBridge = nova_rerun_bridge.NovaRerunBridge
class NovaRerunBridge:
 31class NovaRerunBridge:
 32    """Bridge between Nova and Rerun for visualization.
 33
 34    This class provides functionality to visualize Nova data in Rerun.
 35    It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
 36
 37    Example:
 38        ```python
 39        from nova.core.nova import Nova
 40        from nova_rerun_bridge import NovaRerunBridge
 41
 42        async def main():
 43            nova = Nova()
 44            async with NovaRerunBridge(nova) as bridge:
 45                await bridge.setup_blueprint()
 46                await bridge.log_collision_scenes()
 47
 48        ```
 49
 50    Args:
 51        nova (Nova): Instance of Nova client
 52        spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
 53    """
 54
 55    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
 56        self._ensure_models_exist()
 57        self.nova = nova
 58        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
 59        if spawn:
 60            recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
 61            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
 62        logger.add(sink=rr.LoggingHandler("logs/handler"))
 63
 64    def _ensure_models_exist(self):
 65        """Ensure robot models are downloaded"""
 66        models_dir = Path(get_project_root()) / "models"
 67        if not models_dir.exists() or not list(models_dir.glob("*.glb")):
 68            print("Models not found, run update_robot_models() or poetry run download-models")
 69
 70    async def setup_blueprint(self) -> None:
 71        """Configure and send blueprint configuration to Rerun.
 72
 73        Fetches motion groups from Nova and configures visualization layout.
 74        """
 75        cell = self.nova.cell()
 76
 77        controllers = await cell.controllers()
 78        motion_groups = []
 79
 80        if not controllers:
 81            logger.warning("No controllers found")
 82            return
 83
 84        for controller in controllers:
 85            for motion_group in await controller.activated_motion_groups():
 86                motion_groups.append(motion_group.motion_group_id)
 87
 88        rr.reset_time()
 89        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
 90
 91        send_blueprint(motion_groups)
 92        self.log_coordinate_system()
 93
 94    def log_coordinate_system(self) -> None:
 95        """Log the coordinate system of the cell."""
 96
 97        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
 98        coordinate_vectors = (
 99            np.array(
100                [
101                    [1.0, 0.0, 0.0],  # X direction
102                    [0.0, 1.0, 0.0],  # Y direction
103                    [0.0, 0.0, 1.0],  # Z direction
104                ]
105            )
106            * 200.0
107        )  # Scale factor of 200.0 for better visibility
108
109        coordinate_colors = np.array(
110            [
111                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
112                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
113                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
114            ]
115        )
116
117        rr.log(
118            "coordinate_system_world",
119            rr.Arrows3D(
120                origins=coordinate_origins,
121                vectors=coordinate_vectors,
122                colors=coordinate_colors,
123                radii=rr.Radius.ui_points([5.0]),
124            ),
125            static=True,
126        )
127
128    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
129        """Fetch and log all collision scenes from Nova to Rerun."""
130        collision_scenes = (
131            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
132                cell=self.nova.cell()._cell_id
133            )
134        )
135        log_collision_scenes(collision_scenes)
136        return collision_scenes
137
138    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
139        """Log a specific collision scene by its ID.
140
141        Args:
142            scene_id (str): The ID of the collision scene to log
143
144        Raises:
145            ValueError: If scene_id is not found in stored collision scenes
146        """
147        collision_scenes = (
148            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
149                cell=self.nova.cell()._cell_id
150            )
151        )
152
153        if scene_id not in collision_scenes:
154            raise ValueError(f"Collision scene with ID {scene_id} not found")
155
156        log_collision_scenes({scene_id: collision_scenes[scene_id]})
157        return {scene_id: collision_scenes[scene_id]}
158
159    def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None:
160        log_collision_scenes(collision_scenes=collision_scenes)
161
162    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
163        tcp_names = await motion_group.tcp_names()
164        tcp = tcp_names[0]
165
166        rr.reset_time()
167        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
168
169        log_safety_zones(
170            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
171        )
172
173    def log_saftey_zones_(
174        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
175    ) -> None:
176        log_safety_zones(motion_group_id, optimizer_setup)
177
178    async def log_motion(
179        self,
180        motion_id: str,
181        timing_mode=TimingMode.CONTINUE,
182        time_offset: float = 0,
183        tool_asset: str = None,
184    ) -> None:
185        # Fetch motion details from api
186        motion = await self.nova._api_client.motion_api.get_planned_motion(
187            self.nova.cell()._cell_id, motion_id
188        )
189        optimizer_config = (
190            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
191                self.nova.cell()._cell_id, motion.motion_group
192            )
193        )
194        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
195            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
196        )
197
198        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
199            self.nova.cell()._cell_id
200        )
201        motion_motion_group = next(
202            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
203        )
204
205        collision_scenes = (
206            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
207                cell=self.nova.cell()._cell_id
208            )
209        )
210
211        if motion_motion_group is None:
212            raise ValueError(f"Motion group {motion.motion_group} not found")
213
214        log_motion(
215            motion_id=motion_id,
216            model_from_controller=motion_motion_group.model_from_controller,
217            motion_group=motion.motion_group,
218            optimizer_config=optimizer_config,
219            trajectory=trajectory.trajectory,
220            collision_scenes=collision_scenes,
221            time_offset=time_offset,
222            timing_mode=timing_mode,
223            tool_asset=tool_asset,
224        )
225
226    async def log_trajectory(
227        self,
228        joint_trajectory: models.JointTrajectory,
229        tcp: str,
230        motion_group: MotionGroup,
231        timing_mode=TimingMode.CONTINUE,
232        time_offset: float = 0,
233        tool_asset: str = None,
234    ) -> None:
235        if len(joint_trajectory.joint_positions) == 0:
236            raise ValueError("No joint trajectory provided")
237        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
238        await self.log_motion(
239            load_plan_response.motion,
240            timing_mode=timing_mode,
241            time_offset=time_offset,
242            tool_asset=tool_asset,
243        )
244
245    def continue_after_sync(self) -> None:
246        continue_after_sync()
247
248    async def log_error_feedback(
249        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
250    ) -> None:
251        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
252            if (
253                error_feedback.actual_instance.invalid_tcp_pose
254                and error_feedback.actual_instance.invalid_tcp_pose.position
255            ):
256                position = error_feedback.actual_instance.invalid_tcp_pose.position
257                rr.log(
258                    "motion/errors/FeedbackOutOfWorkspace",
259                    rr.Points3D(
260                        [[position[0], position[1], position[2]]],
261                        radii=rr.Radius.ui_points([5.0]),
262                        colors=[(255, 0, 0, 255)],
263                        labels=["Out of Workspace"],
264                    ),
265                    static=True,
266                )
267
268    async def start_streaming(self, motion_group: MotionGroup) -> None:
269        """Start streaming real-time robot state to Rerun viewer."""
270        if motion_group in self._streaming_tasks:
271            return
272
273        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
274        self._streaming_tasks[motion_group] = task
275
276    async def stop_streaming(self) -> None:
277        """Stop all streaming tasks."""
278        for task in self._streaming_tasks.values():
279            task.cancel()
280        self._streaming_tasks.clear()
281
282    async def log_actions(
283        self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False
284    ) -> None:
285        from nova_rerun_bridge import trajectory
286
287        rr.reset_time()
288        rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time)
289
290        if not isinstance(actions, list):
291            actions = [actions]
292
293        if len(actions) == 0:
294            raise ValueError("No actions provided")
295
296        # Collect poses from regular actions
297        regular_actions = [
298            action for action in actions if isinstance(action, (Motion, WriteAction))
299        ]
300        regular_poses = (
301            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
302        )
303
304        # Collect poses from CollisionFreeMotion targets
305        collision_free_poses = [
306            action.target
307            for action in actions
308            if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose)
309        ]
310
311        # Combine all poses
312        all_poses = regular_poses + collision_free_poses
313        positions = []
314        point_colors = []
315        use_red = False
316
317        # Collect all positions and determine colors
318        for i, action in enumerate(actions):
319            if isinstance(action, WriteAction):
320                use_red = True
321            if i < len(all_poses):  # Only process if there's a corresponding pose
322                pose = all_poses[i]
323                logger.debug(f"Pose: {pose}")
324                positions.append([pose.position.x, pose.position.y, pose.position.z])
325                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
326
327        # Log all positions at once
328        rr.log(
329            "motion/actions",
330            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
331            static=True,
332        )
333
334        if show_connection:
335            rr.log(
336                "motion/actions/connection",
337                rr.LineStrips3D([positions], colors=[155, 155, 155, 50]),
338            )
339
340    async def __aenter__(self) -> "NovaRerunBridge":
341        """Context manager entry point.
342
343        Returns:
344            NovaRerunBridge: Self reference for context manager usage.
345        """
346        return self
347
348    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
349        """Context manager exit point, ensures cleanup."""
350        await self.cleanup()
351
352    async def cleanup(self) -> None:
353        """Cleanup resources and close Nova API client connection."""
354        if hasattr(self.nova, "_api_client"):
355            await self.nova._api_client.close()

Bridge between Nova and Rerun for visualization.

This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.

Example:

from nova.core.nova import Nova
from nova_rerun_bridge import NovaRerunBridge

async def main():
    nova = Nova()
    async with NovaRerunBridge(nova) as bridge:
        await bridge.setup_blueprint()
        await bridge.log_collision_scenes()

Args: nova (Nova): Instance of Nova client spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.

NovaRerunBridge(nova: nova.Nova, spawn: bool = True, recording_id=None)
55    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
56        self._ensure_models_exist()
57        self.nova = nova
58        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
59        if spawn:
60            recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
61            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
62        logger.add(sink=rr.LoggingHandler("logs/handler"))
nova
async def setup_blueprint(self) -> None:
70    async def setup_blueprint(self) -> None:
71        """Configure and send blueprint configuration to Rerun.
72
73        Fetches motion groups from Nova and configures visualization layout.
74        """
75        cell = self.nova.cell()
76
77        controllers = await cell.controllers()
78        motion_groups = []
79
80        if not controllers:
81            logger.warning("No controllers found")
82            return
83
84        for controller in controllers:
85            for motion_group in await controller.activated_motion_groups():
86                motion_groups.append(motion_group.motion_group_id)
87
88        rr.reset_time()
89        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
90
91        send_blueprint(motion_groups)
92        self.log_coordinate_system()

Configure and send blueprint configuration to Rerun.

Fetches motion groups from Nova and configures visualization layout.

def log_coordinate_system(self) -> None:
 94    def log_coordinate_system(self) -> None:
 95        """Log the coordinate system of the cell."""
 96
 97        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
 98        coordinate_vectors = (
 99            np.array(
100                [
101                    [1.0, 0.0, 0.0],  # X direction
102                    [0.0, 1.0, 0.0],  # Y direction
103                    [0.0, 0.0, 1.0],  # Z direction
104                ]
105            )
106            * 200.0
107        )  # Scale factor of 200.0 for better visibility
108
109        coordinate_colors = np.array(
110            [
111                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
112                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
113                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
114            ]
115        )
116
117        rr.log(
118            "coordinate_system_world",
119            rr.Arrows3D(
120                origins=coordinate_origins,
121                vectors=coordinate_vectors,
122                colors=coordinate_colors,
123                radii=rr.Radius.ui_points([5.0]),
124            ),
125            static=True,
126        )

Log the coordinate system of the cell.

async def log_collision_scenes( self) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
128    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
129        """Fetch and log all collision scenes from Nova to Rerun."""
130        collision_scenes = (
131            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
132                cell=self.nova.cell()._cell_id
133            )
134        )
135        log_collision_scenes(collision_scenes)
136        return collision_scenes

Fetch and log all collision scenes from Nova to Rerun.

async def log_collision_scene( self, scene_id: str) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
138    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
139        """Log a specific collision scene by its ID.
140
141        Args:
142            scene_id (str): The ID of the collision scene to log
143
144        Raises:
145            ValueError: If scene_id is not found in stored collision scenes
146        """
147        collision_scenes = (
148            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
149                cell=self.nova.cell()._cell_id
150            )
151        )
152
153        if scene_id not in collision_scenes:
154            raise ValueError(f"Collision scene with ID {scene_id} not found")
155
156        log_collision_scenes({scene_id: collision_scenes[scene_id]})
157        return {scene_id: collision_scenes[scene_id]}

Log a specific collision scene by its ID.

Args: scene_id (str): The ID of the collision scene to log

Raises: ValueError: If scene_id is not found in stored collision scenes

async def log_saftey_zones(self, motion_group: nova.MotionGroup) -> None:
162    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
163        tcp_names = await motion_group.tcp_names()
164        tcp = tcp_names[0]
165
166        rr.reset_time()
167        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
168
169        log_safety_zones(
170            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
171        )
def log_saftey_zones_( self, motion_group_id: str, optimizer_setup: wandelbots_api_client.models.optimizer_setup.OptimizerSetup) -> None:
173    def log_saftey_zones_(
174        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
175    ) -> None:
176        log_safety_zones(motion_group_id, optimizer_setup)
async def log_motion( self, motion_id: str, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
178    async def log_motion(
179        self,
180        motion_id: str,
181        timing_mode=TimingMode.CONTINUE,
182        time_offset: float = 0,
183        tool_asset: str = None,
184    ) -> None:
185        # Fetch motion details from api
186        motion = await self.nova._api_client.motion_api.get_planned_motion(
187            self.nova.cell()._cell_id, motion_id
188        )
189        optimizer_config = (
190            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
191                self.nova.cell()._cell_id, motion.motion_group
192            )
193        )
194        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
195            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
196        )
197
198        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
199            self.nova.cell()._cell_id
200        )
201        motion_motion_group = next(
202            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
203        )
204
205        collision_scenes = (
206            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
207                cell=self.nova.cell()._cell_id
208            )
209        )
210
211        if motion_motion_group is None:
212            raise ValueError(f"Motion group {motion.motion_group} not found")
213
214        log_motion(
215            motion_id=motion_id,
216            model_from_controller=motion_motion_group.model_from_controller,
217            motion_group=motion.motion_group,
218            optimizer_config=optimizer_config,
219            trajectory=trajectory.trajectory,
220            collision_scenes=collision_scenes,
221            time_offset=time_offset,
222            timing_mode=timing_mode,
223            tool_asset=tool_asset,
224        )
async def log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
226    async def log_trajectory(
227        self,
228        joint_trajectory: models.JointTrajectory,
229        tcp: str,
230        motion_group: MotionGroup,
231        timing_mode=TimingMode.CONTINUE,
232        time_offset: float = 0,
233        tool_asset: str = None,
234    ) -> None:
235        if len(joint_trajectory.joint_positions) == 0:
236            raise ValueError("No joint trajectory provided")
237        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
238        await self.log_motion(
239            load_plan_response.motion,
240            timing_mode=timing_mode,
241            time_offset=time_offset,
242            tool_asset=tool_asset,
243        )
def continue_after_sync(self) -> None:
245    def continue_after_sync(self) -> None:
246        continue_after_sync()
async def log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
248    async def log_error_feedback(
249        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
250    ) -> None:
251        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
252            if (
253                error_feedback.actual_instance.invalid_tcp_pose
254                and error_feedback.actual_instance.invalid_tcp_pose.position
255            ):
256                position = error_feedback.actual_instance.invalid_tcp_pose.position
257                rr.log(
258                    "motion/errors/FeedbackOutOfWorkspace",
259                    rr.Points3D(
260                        [[position[0], position[1], position[2]]],
261                        radii=rr.Radius.ui_points([5.0]),
262                        colors=[(255, 0, 0, 255)],
263                        labels=["Out of Workspace"],
264                    ),
265                    static=True,
266                )
async def start_streaming(self, motion_group: nova.MotionGroup) -> None:
268    async def start_streaming(self, motion_group: MotionGroup) -> None:
269        """Start streaming real-time robot state to Rerun viewer."""
270        if motion_group in self._streaming_tasks:
271            return
272
273        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
274        self._streaming_tasks[motion_group] = task

Start streaming real-time robot state to Rerun viewer.

async def stop_streaming(self) -> None:
276    async def stop_streaming(self) -> None:
277        """Stop all streaming tasks."""
278        for task in self._streaming_tasks.values():
279            task.cancel()
280        self._streaming_tasks.clear()

Stop all streaming tasks.

async def log_actions( self, actions: list[nova.actions.Action | nova.actions.motions.CollisionFreeMotion] | nova.actions.Action, show_connection: bool = False) -> None:
282    async def log_actions(
283        self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False
284    ) -> None:
285        from nova_rerun_bridge import trajectory
286
287        rr.reset_time()
288        rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time)
289
290        if not isinstance(actions, list):
291            actions = [actions]
292
293        if len(actions) == 0:
294            raise ValueError("No actions provided")
295
296        # Collect poses from regular actions
297        regular_actions = [
298            action for action in actions if isinstance(action, (Motion, WriteAction))
299        ]
300        regular_poses = (
301            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
302        )
303
304        # Collect poses from CollisionFreeMotion targets
305        collision_free_poses = [
306            action.target
307            for action in actions
308            if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose)
309        ]
310
311        # Combine all poses
312        all_poses = regular_poses + collision_free_poses
313        positions = []
314        point_colors = []
315        use_red = False
316
317        # Collect all positions and determine colors
318        for i, action in enumerate(actions):
319            if isinstance(action, WriteAction):
320                use_red = True
321            if i < len(all_poses):  # Only process if there's a corresponding pose
322                pose = all_poses[i]
323                logger.debug(f"Pose: {pose}")
324                positions.append([pose.position.x, pose.position.y, pose.position.z])
325                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
326
327        # Log all positions at once
328        rr.log(
329            "motion/actions",
330            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
331            static=True,
332        )
333
334        if show_connection:
335            rr.log(
336                "motion/actions/connection",
337                rr.LineStrips3D([positions], colors=[155, 155, 155, 50]),
338            )
async def cleanup(self) -> None:
352    async def cleanup(self) -> None:
353        """Cleanup resources and close Nova API client connection."""
354        if hasattr(self.nova, "_api_client"):
355            await self.nova._api_client.close()

Cleanup resources and close Nova API client connection.