
 1from nova_rerun_bridge import (
 2    dh_robot,
 3    hull_visualizer,
 4    motion_storage,
 5    nova_rerun_bridge,
 6    robot_visualizer,
 9__all__ = ["dh_robot", "hull_visualizer", "robot_visualizer", "motion_storage", "NovaRerunBridge"]
11NovaRerunBridge = nova_rerun_bridge.NovaRerunBridge
class NovaRerunBridge:
 31class NovaRerunBridge:
 32    """Bridge between Nova and Rerun for visualization.
 34    This class provides functionality to visualize Nova data in Rerun.
 35    It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
 37    Example:
 38        ```python
 39        from nova.core.nova import Nova
 40        from nova_rerun_bridge import NovaRerunBridge
 42        async def main():
 43            nova = Nova()
 44            async with NovaRerunBridge(nova) as bridge:
 45                await bridge.setup_blueprint()
 46                await bridge.log_collision_scenes()
 48        ```
 50    Args:
 51        nova (Nova): Instance of Nova client
 52        spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
 53    """
 55    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
 56        self._ensure_models_exist()
 57        self.nova = nova
 58        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
 59        if spawn:
 60            recording_id = recording_id or f"nova_{'%Y%m%d_%H%M%S')}"
 61            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
 62        logger.add(sink=rr.LoggingHandler("logs/handler"))
 64    def _ensure_models_exist(self):
 65        """Ensure robot models are downloaded"""
 66        models_dir = Path(get_project_root()) / "models"
 67        if not models_dir.exists() or not list(models_dir.glob("*.glb")):
 68            print("Models not found, run update_robot_models() or poetry run download-models")
 70    async def setup_blueprint(self) -> None:
 71        """Configure and send blueprint configuration to Rerun.
 73        Fetches motion groups from Nova and configures visualization layout.
 74        """
 75        cell = self.nova.cell()
 77        controllers = await cell.controllers()
 78        motion_groups = []
 80        if not controllers:
 81            logger.warning("No controllers found")
 82            return
 84        for controller in controllers:
 85            for motion_group in await controller.activated_motion_groups():
 86                motion_groups.append(motion_group.motion_group_id)
 88        rr.reset_time()
 89        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
 91        send_blueprint(motion_groups)
 92        self.log_coordinate_system()
 94    def log_coordinate_system(self) -> None:
 95        """Log the coordinate system of the cell."""
 97        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
 98        coordinate_vectors = (
 99            np.array(
100                [
101                    [1.0, 0.0, 0.0],  # X direction
102                    [0.0, 1.0, 0.0],  # Y direction
103                    [0.0, 0.0, 1.0],  # Z direction
104                ]
105            )
106            * 200.0
107        )  # Scale factor of 200.0 for better visibility
109        coordinate_colors = np.array(
110            [
111                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
112                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
113                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
114            ]
115        )
117        rr.log(
118            "coordinate_system_world",
119            rr.Arrows3D(
120                origins=coordinate_origins,
121                vectors=coordinate_vectors,
122                colors=coordinate_colors,
123                radii=rr.Radius.ui_points([5.0]),
124            ),
125            static=True,
126        )
128    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
129        """Fetch and log all collision scenes from Nova to Rerun."""
130        collision_scenes = (
131            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
132                cell=self.nova.cell()._cell_id
133            )
134        )
135        log_collision_scenes(collision_scenes)
136        return collision_scenes
138    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
139        """Log a specific collision scene by its ID.
141        Args:
142            scene_id (str): The ID of the collision scene to log
144        Raises:
145            ValueError: If scene_id is not found in stored collision scenes
146        """
147        collision_scenes = (
148            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
149                cell=self.nova.cell()._cell_id
150            )
151        )
153        if scene_id not in collision_scenes:
154            raise ValueError(f"Collision scene with ID {scene_id} not found")
156        log_collision_scenes({scene_id: collision_scenes[scene_id]})
157        return {scene_id: collision_scenes[scene_id]}
159    def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None:
160        log_collision_scenes(collision_scenes=collision_scenes)
162    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
163        tcp_names = await motion_group.tcp_names()
164        tcp = tcp_names[0]
166        rr.reset_time()
167        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
169        log_safety_zones(
170            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
171        )
173    def log_saftey_zones_(
174        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
175    ) -> None:
176        log_safety_zones(motion_group_id, optimizer_setup)
178    async def log_motion(
179        self,
180        motion_id: str,
181        timing_mode=TimingMode.CONTINUE,
182        time_offset: float = 0,
183        tool_asset: str = None,
184    ) -> None:
185        # Fetch motion details from api
186        motion = await self.nova._api_client.motion_api.get_planned_motion(
187            self.nova.cell()._cell_id, motion_id
188        )
189        optimizer_config = (
190            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
191                self.nova.cell()._cell_id, motion.motion_group
192            )
193        )
194        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
195            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
196        )
198        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
199            self.nova.cell()._cell_id
200        )
201        motion_motion_group = next(
202            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
203        )
205        collision_scenes = (
206            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
207                cell=self.nova.cell()._cell_id
208            )
209        )
211        if motion_motion_group is None:
212            raise ValueError(f"Motion group {motion.motion_group} not found")
214        log_motion(
215            motion_id=motion_id,
216            model_from_controller=motion_motion_group.model_from_controller,
217            motion_group=motion.motion_group,
218            optimizer_config=optimizer_config,
219            trajectory=trajectory.trajectory,
220            collision_scenes=collision_scenes,
221            time_offset=time_offset,
222            timing_mode=timing_mode,
223            tool_asset=tool_asset,
224        )
226    async def log_trajectory(
227        self,
228        joint_trajectory: models.JointTrajectory,
229        tcp: str,
230        motion_group: MotionGroup,
231        timing_mode=TimingMode.CONTINUE,
232        time_offset: float = 0,
233        tool_asset: str = None,
234    ) -> None:
235        if len(joint_trajectory.joint_positions) == 0:
236            raise ValueError("No joint trajectory provided")
237        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
238        await self.log_motion(
239            load_plan_response.motion,
240            timing_mode=timing_mode,
241            time_offset=time_offset,
242            tool_asset=tool_asset,
243        )
245    def continue_after_sync(self) -> None:
246        continue_after_sync()
248    async def log_error_feedback(
249        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
250    ) -> None:
251        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
252            if (
253                error_feedback.actual_instance.invalid_tcp_pose
254                and error_feedback.actual_instance.invalid_tcp_pose.position
255            ):
256                position = error_feedback.actual_instance.invalid_tcp_pose.position
257                rr.log(
258                    "motion/errors/FeedbackOutOfWorkspace",
259                    rr.Points3D(
260                        [[position[0], position[1], position[2]]],
261                        radii=rr.Radius.ui_points([5.0]),
262                        colors=[(255, 0, 0, 255)],
263                        labels=["Out of Workspace"],
264                    ),
265                    static=True,
266                )
268    async def start_streaming(self, motion_group: MotionGroup) -> None:
269        """Start streaming real-time robot state to Rerun viewer."""
270        if motion_group in self._streaming_tasks:
271            return
273        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
274        self._streaming_tasks[motion_group] = task
276    async def stop_streaming(self) -> None:
277        """Stop all streaming tasks."""
278        for task in self._streaming_tasks.values():
279            task.cancel()
280        self._streaming_tasks.clear()
282    async def log_actions(
283        self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False
284    ) -> None:
285        from nova_rerun_bridge import trajectory
287        rr.reset_time()
288        rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time)
290        if not isinstance(actions, list):
291            actions = [actions]
293        if len(actions) == 0:
294            raise ValueError("No actions provided")
296        # Collect poses from regular actions
297        regular_actions = [
298            action for action in actions if isinstance(action, (Motion, WriteAction))
299        ]
300        regular_poses = (
301            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
302        )
304        # Collect poses from CollisionFreeMotion targets
305        collision_free_poses = [
307            for action in actions
308            if isinstance(action, CollisionFreeMotion) and isinstance(, Pose)
309        ]
311        # Combine all poses
312        all_poses = regular_poses + collision_free_poses
313        positions = []
314        point_colors = []
315        use_red = False
317        # Collect all positions and determine colors
318        for i, action in enumerate(actions):
319            if isinstance(action, WriteAction):
320                use_red = True
321            if i < len(all_poses):  # Only process if there's a corresponding pose
322                pose = all_poses[i]
323                logger.debug(f"Pose: {pose}")
324                positions.append([pose.position.x, pose.position.y, pose.position.z])
325                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
327        # Log all positions at once
328        rr.log(
329            "motion/actions",
330            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
331            static=True,
332        )
334        if show_connection:
335            rr.log(
336                "motion/actions/connection",
337                rr.LineStrips3D([positions], colors=[155, 155, 155, 50]),
338            )
340    async def __aenter__(self) -> "NovaRerunBridge":
341        """Context manager entry point.
343        Returns:
344            NovaRerunBridge: Self reference for context manager usage.
345        """
346        return self
348    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
349        """Context manager exit point, ensures cleanup."""
350        await self.cleanup()
352    async def cleanup(self) -> None:
353        """Cleanup resources and close Nova API client connection."""
354        if hasattr(self.nova, "_api_client"):
355            await self.nova._api_client.close()

Bridge between Nova and Rerun for visualization.

This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.


from nova.core.nova import Nova
from nova_rerun_bridge import NovaRerunBridge

async def main():
    nova = Nova()
    async with NovaRerunBridge(nova) as bridge:
        await bridge.setup_blueprint()
        await bridge.log_collision_scenes()

Args: nova (Nova): Instance of Nova client spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.

NovaRerunBridge(nova: nova.Nova, spawn: bool = True, recording_id=None)
55    def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None:
56        self._ensure_models_exist()
57        self.nova = nova
58        self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {}
59        if spawn:
60            recording_id = recording_id or f"nova_{'%Y%m%d_%H%M%S')}"
61            rr.init(application_id="nova", recording_id=recording_id, spawn=True)
62        logger.add(sink=rr.LoggingHandler("logs/handler"))
async def setup_blueprint(self) -> None:
70    async def setup_blueprint(self) -> None:
71        """Configure and send blueprint configuration to Rerun.
73        Fetches motion groups from Nova and configures visualization layout.
74        """
75        cell = self.nova.cell()
77        controllers = await cell.controllers()
78        motion_groups = []
80        if not controllers:
81            logger.warning("No controllers found")
82            return
84        for controller in controllers:
85            for motion_group in await controller.activated_motion_groups():
86                motion_groups.append(motion_group.motion_group_id)
88        rr.reset_time()
89        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
91        send_blueprint(motion_groups)
92        self.log_coordinate_system()

Configure and send blueprint configuration to Rerun.

Fetches motion groups from Nova and configures visualization layout.

def log_coordinate_system(self) -> None:
 94    def log_coordinate_system(self) -> None:
 95        """Log the coordinate system of the cell."""
 97        coordinate_origins = np.zeros((3, 3))  # Origin points for x, y, z arrows
 98        coordinate_vectors = (
 99            np.array(
100                [
101                    [1.0, 0.0, 0.0],  # X direction
102                    [0.0, 1.0, 0.0],  # Y direction
103                    [0.0, 0.0, 1.0],  # Z direction
104                ]
105            )
106            * 200.0
107        )  # Scale factor of 200.0 for better visibility
109        coordinate_colors = np.array(
110            [
111                [1.0, 0.125, 0.376, 1.0],  # #ff2060 - Red/Pink for X
112                [0.125, 0.875, 0.502, 1.0],  # #20df80 - Green for Y
113                [0.125, 0.502, 1.0, 1.0],  # #2080ff - Blue for Z
114            ]
115        )
117        rr.log(
118            "coordinate_system_world",
119            rr.Arrows3D(
120                origins=coordinate_origins,
121                vectors=coordinate_vectors,
122                colors=coordinate_colors,
123                radii=rr.Radius.ui_points([5.0]),
124            ),
125            static=True,
126        )

Log the coordinate system of the cell.

async def log_collision_scenes( self) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
128    async def log_collision_scenes(self) -> dict[str, models.CollisionScene]:
129        """Fetch and log all collision scenes from Nova to Rerun."""
130        collision_scenes = (
131            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
132                cell=self.nova.cell()._cell_id
133            )
134        )
135        log_collision_scenes(collision_scenes)
136        return collision_scenes

Fetch and log all collision scenes from Nova to Rerun.

async def log_collision_scene( self, scene_id: str) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
138    async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]:
139        """Log a specific collision scene by its ID.
141        Args:
142            scene_id (str): The ID of the collision scene to log
144        Raises:
145            ValueError: If scene_id is not found in stored collision scenes
146        """
147        collision_scenes = (
148            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
149                cell=self.nova.cell()._cell_id
150            )
151        )
153        if scene_id not in collision_scenes:
154            raise ValueError(f"Collision scene with ID {scene_id} not found")
156        log_collision_scenes({scene_id: collision_scenes[scene_id]})
157        return {scene_id: collision_scenes[scene_id]}

Log a specific collision scene by its ID.

Args: scene_id (str): The ID of the collision scene to log

Raises: ValueError: If scene_id is not found in stored collision scenes

async def log_saftey_zones(self, motion_group: nova.MotionGroup) -> None:
162    async def log_saftey_zones(self, motion_group: MotionGroup) -> None:
163        tcp_names = await motion_group.tcp_names()
164        tcp = tcp_names[0]
166        rr.reset_time()
167        rr.set_time_seconds(TIME_INTERVAL_NAME, 0)
169        log_safety_zones(
170            motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp)
171        )
def log_saftey_zones_( self, motion_group_id: str, optimizer_setup: wandelbots_api_client.models.optimizer_setup.OptimizerSetup) -> None:
173    def log_saftey_zones_(
174        self, motion_group_id: str, optimizer_setup: models.OptimizerSetup
175    ) -> None:
176        log_safety_zones(motion_group_id, optimizer_setup)
async def log_motion( self, motion_id: str, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
178    async def log_motion(
179        self,
180        motion_id: str,
181        timing_mode=TimingMode.CONTINUE,
182        time_offset: float = 0,
183        tool_asset: str = None,
184    ) -> None:
185        # Fetch motion details from api
186        motion = await self.nova._api_client.motion_api.get_planned_motion(
187            self.nova.cell()._cell_id, motion_id
188        )
189        optimizer_config = (
190            await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration(
191                self.nova.cell()._cell_id, motion.motion_group
192            )
193        )
194        trajectory = await self.nova._api_client.motion_api.get_motion_trajectory(
195            self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000)
196        )
198        motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups(
199            self.nova.cell()._cell_id
200        )
201        motion_motion_group = next(
202            (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None
203        )
205        collision_scenes = (
206            await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes(
207                cell=self.nova.cell()._cell_id
208            )
209        )
211        if motion_motion_group is None:
212            raise ValueError(f"Motion group {motion.motion_group} not found")
214        log_motion(
215            motion_id=motion_id,
216            model_from_controller=motion_motion_group.model_from_controller,
217            motion_group=motion.motion_group,
218            optimizer_config=optimizer_config,
219            trajectory=trajectory.trajectory,
220            collision_scenes=collision_scenes,
221            time_offset=time_offset,
222            timing_mode=timing_mode,
223            tool_asset=tool_asset,
224        )
async def log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
226    async def log_trajectory(
227        self,
228        joint_trajectory: models.JointTrajectory,
229        tcp: str,
230        motion_group: MotionGroup,
231        timing_mode=TimingMode.CONTINUE,
232        time_offset: float = 0,
233        tool_asset: str = None,
234    ) -> None:
235        if len(joint_trajectory.joint_positions) == 0:
236            raise ValueError("No joint trajectory provided")
237        load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp)
238        await self.log_motion(
239            load_plan_response.motion,
240            timing_mode=timing_mode,
241            time_offset=time_offset,
242            tool_asset=tool_asset,
243        )
def continue_after_sync(self) -> None:
245    def continue_after_sync(self) -> None:
246        continue_after_sync()
async def log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
248    async def log_error_feedback(
249        self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback
250    ) -> None:
251        if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace):
252            if (
253                error_feedback.actual_instance.invalid_tcp_pose
254                and error_feedback.actual_instance.invalid_tcp_pose.position
255            ):
256                position = error_feedback.actual_instance.invalid_tcp_pose.position
257                rr.log(
258                    "motion/errors/FeedbackOutOfWorkspace",
259                    rr.Points3D(
260                        [[position[0], position[1], position[2]]],
261                        radii=rr.Radius.ui_points([5.0]),
262                        colors=[(255, 0, 0, 255)],
263                        labels=["Out of Workspace"],
264                    ),
265                    static=True,
266                )
async def start_streaming(self, motion_group: nova.MotionGroup) -> None:
268    async def start_streaming(self, motion_group: MotionGroup) -> None:
269        """Start streaming real-time robot state to Rerun viewer."""
270        if motion_group in self._streaming_tasks:
271            return
273        task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group))
274        self._streaming_tasks[motion_group] = task

Start streaming real-time robot state to Rerun viewer.

async def stop_streaming(self) -> None:
276    async def stop_streaming(self) -> None:
277        """Stop all streaming tasks."""
278        for task in self._streaming_tasks.values():
279            task.cancel()
280        self._streaming_tasks.clear()

Stop all streaming tasks.

async def log_actions( self, actions: list[nova.actions.Action | nova.actions.motions.CollisionFreeMotion] | nova.actions.Action, show_connection: bool = False) -> None:
282    async def log_actions(
283        self, actions: list[Action | CollisionFreeMotion] | Action, show_connection: bool = False
284    ) -> None:
285        from nova_rerun_bridge import trajectory
287        rr.reset_time()
288        rr.set_time_seconds(TIME_INTERVAL_NAME, trajectory._last_end_time)
290        if not isinstance(actions, list):
291            actions = [actions]
293        if len(actions) == 0:
294            raise ValueError("No actions provided")
296        # Collect poses from regular actions
297        regular_actions = [
298            action for action in actions if isinstance(action, (Motion, WriteAction))
299        ]
300        regular_poses = (
301            CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else []
302        )
304        # Collect poses from CollisionFreeMotion targets
305        collision_free_poses = [
307            for action in actions
308            if isinstance(action, CollisionFreeMotion) and isinstance(, Pose)
309        ]
311        # Combine all poses
312        all_poses = regular_poses + collision_free_poses
313        positions = []
314        point_colors = []
315        use_red = False
317        # Collect all positions and determine colors
318        for i, action in enumerate(actions):
319            if isinstance(action, WriteAction):
320                use_red = True
321            if i < len(all_poses):  # Only process if there's a corresponding pose
322                pose = all_poses[i]
323                logger.debug(f"Pose: {pose}")
324                positions.append([pose.position.x, pose.position.y, pose.position.z])
325                point_colors.append(colors.colors[1] if use_red else colors.colors[9])
327        # Log all positions at once
328        rr.log(
329            "motion/actions",
330            rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])),
331            static=True,
332        )
334        if show_connection:
335            rr.log(
336                "motion/actions/connection",
337                rr.LineStrips3D([positions], colors=[155, 155, 155, 50]),
338            )
async def cleanup(self) -> None:
352    async def cleanup(self) -> None:
353        """Cleanup resources and close Nova API client connection."""
354        if hasattr(self.nova, "_api_client"):
355            await self.nova._api_client.close()

Cleanup resources and close Nova API client connection.