nova_rerun_bridge
class
NovaRerunBridge:
33class NovaRerunBridge: 34 """Bridge between Nova and Rerun for visualization. 35 36 This class provides functionality to visualize Nova data in Rerun. 37 It handles trajectoy, collision scenes, blueprints and proper cleanup of resources. 38 39 Example: 40 ```python 41 from nova.core.nova import Nova 42 from nova_rerun_bridge import NovaRerunBridge 43 44 async def main(): 45 nova = Nova() 46 async with NovaRerunBridge(nova) as bridge: 47 await bridge.setup_blueprint() 48 await bridge.log_collision_scenes() 49 50 ``` 51 52 Args: 53 nova (Nova): Instance of Nova client 54 spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True. 55 """ 56 57 def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None: 58 self._ensure_models_exist() 59 self.nova = nova 60 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 61 if spawn: 62 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 63 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 64 logger.add(sink=rr.LoggingHandler("logs/handler")) 65 66 def _ensure_models_exist(self): 67 """Ensure robot models are downloaded""" 68 models_dir = Path(get_project_root()) / "models" 69 if not models_dir.exists() or not list(models_dir.glob("*.glb")): 70 print("Models not found, run update_robot_models() or uv run download-models") 71 72 async def setup_blueprint(self) -> None: 73 """Configure and send blueprint configuration to Rerun. 74 75 Fetches motion groups from Nova and configures visualization layout. 76 """ 77 cell = self.nova.cell() 78 79 controllers = await cell.controllers() 80 motion_groups = [] 81 82 if not controllers: 83 logger.warning("No controllers found") 84 return 85 86 for controller in controllers: 87 for motion_group in await controller.activated_motion_groups(): 88 motion_groups.append(motion_group.motion_group_id) 89 90 rr.reset_time() 91 rr.set_time(TIME_INTERVAL_NAME, duration=0) 92 93 send_blueprint(motion_groups) 94 self.log_coordinate_system() 95 96 def log_coordinate_system(self) -> None: 97 """Log the coordinate system of the cell.""" 98 99 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 100 coordinate_vectors = ( 101 np.array( 102 [ 103 [1.0, 0.0, 0.0], # X direction 104 [0.0, 1.0, 0.0], # Y direction 105 [0.0, 0.0, 1.0], # Z direction 106 ] 107 ) 108 * 200.0 109 ) # Scale factor of 200.0 for better visibility 110 111 coordinate_colors = np.array( 112 [ 113 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 114 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 115 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 116 ] 117 ) 118 119 rr.log( 120 "coordinate_system_world", 121 rr.Arrows3D( 122 origins=coordinate_origins, 123 vectors=coordinate_vectors, 124 colors=coordinate_colors, 125 radii=rr.Radius.ui_points([5.0]), 126 ), 127 static=True, 128 ) 129 130 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 131 """Fetch and log all collision scenes from Nova to Rerun.""" 132 collision_scenes = ( 133 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 134 cell=self.nova.cell()._cell_id 135 ) 136 ) 137 log_collision_scenes(collision_scenes) 138 return collision_scenes 139 140 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 141 """Log a specific collision scene by its ID. 142 143 Args: 144 scene_id (str): The ID of the collision scene to log 145 146 Raises: 147 ValueError: If scene_id is not found in stored collision scenes 148 """ 149 collision_scenes = ( 150 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 151 cell=self.nova.cell()._cell_id 152 ) 153 ) 154 155 if scene_id not in collision_scenes: 156 raise ValueError(f"Collision scene with ID {scene_id} not found") 157 158 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 159 return {scene_id: collision_scenes[scene_id]} 160 161 def _log_collision_scene(self, collision_scenes: dict[str, models.CollisionScene]) -> None: 162 log_collision_scenes(collision_scenes=collision_scenes) 163 164 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 165 tcp_names = await motion_group.tcp_names() 166 tcp = tcp_names[0] 167 168 rr.reset_time() 169 rr.set_time(TIME_INTERVAL_NAME, duration=0) 170 171 log_safety_zones( 172 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 173 ) 174 175 def log_saftey_zones_( 176 self, motion_group_id: str, optimizer_setup: models.OptimizerSetup 177 ) -> None: 178 log_safety_zones(motion_group_id, optimizer_setup) 179 180 async def log_motion( 181 self, 182 motion_id: str, 183 timing_mode=TimingMode.CONTINUE, 184 time_offset: float = 0, 185 tool_asset: Optional[str] = None, 186 ) -> None: 187 # Fetch motion details from api 188 motion = await self.nova._api_client.motion_api.get_planned_motion( 189 self.nova.cell()._cell_id, motion_id 190 ) 191 optimizer_config = ( 192 await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration( 193 self.nova.cell()._cell_id, motion.motion_group 194 ) 195 ) 196 trajectory = await self.nova._api_client.motion_api.get_motion_trajectory( 197 self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 198 ) 199 200 motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups( 201 self.nova.cell()._cell_id 202 ) 203 motion_motion_group = next( 204 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None 205 ) 206 207 collision_scenes = ( 208 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 209 cell=self.nova.cell()._cell_id 210 ) 211 ) 212 213 if motion_motion_group is None: 214 raise ValueError(f"Motion group {motion.motion_group} not found") 215 216 log_motion( 217 motion_id=motion_id, 218 model_from_controller=motion_motion_group.model_from_controller, 219 motion_group=motion.motion_group, 220 optimizer_config=optimizer_config, 221 trajectory=trajectory.trajectory, 222 collision_scenes=collision_scenes, 223 time_offset=time_offset, 224 timing_mode=timing_mode, 225 tool_asset=tool_asset, 226 ) 227 228 async def log_trajectory( 229 self, 230 joint_trajectory: models.JointTrajectory, 231 tcp: str, 232 motion_group: MotionGroup, 233 timing_mode=TimingMode.CONTINUE, 234 time_offset: float = 0, 235 tool_asset: str = None, 236 ) -> None: 237 if len(joint_trajectory.joint_positions) == 0: 238 raise ValueError("No joint trajectory provided") 239 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 240 await self.log_motion( 241 load_plan_response.motion, 242 timing_mode=timing_mode, 243 time_offset=time_offset, 244 tool_asset=tool_asset, 245 ) 246 247 def continue_after_sync(self) -> None: 248 continue_after_sync() 249 250 async def log_error_feedback( 251 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 252 ) -> None: 253 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 254 if ( 255 error_feedback.actual_instance.invalid_tcp_pose 256 and error_feedback.actual_instance.invalid_tcp_pose.position 257 ): 258 position = error_feedback.actual_instance.invalid_tcp_pose.position 259 rr.log( 260 "motion/errors/FeedbackOutOfWorkspace", 261 rr.Points3D( 262 [[position[0], position[1], position[2]]], 263 radii=rr.Radius.ui_points([5.0]), 264 colors=[(255, 0, 0, 255)], 265 labels=["Out of Workspace"], 266 ), 267 static=True, 268 ) 269 270 if isinstance(error_feedback.actual_instance, FeedbackCollision): 271 collisions = error_feedback.actual_instance.collisions 272 if not collisions: 273 return 274 275 for i, collision in enumerate(collisions): 276 if collision.position_on_a is None or collision.position_on_b is None: 277 continue 278 if collision.position_on_a.world is None or collision.position_on_b.world is None: 279 continue 280 if collision.normal_world_on_b is None: 281 continue 282 283 # Extract positions 284 pos_a = collision.position_on_a.world 285 pos_b = collision.position_on_b.world 286 normal = collision.normal_world_on_b 287 288 # Scale normal for visibility 289 arrow_length = 50 290 291 # Log collision points 292 rr.log( 293 f"motion/errors/FeedbackCollision/collisions/point_{i}/a", 294 rr.Points3D( 295 [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)] 296 ), 297 static=True, 298 ) 299 300 rr.log( 301 f"motion/errors/FeedbackCollision/collisions/point_{i}/b", 302 rr.Points3D( 303 [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)] 304 ), 305 static=True, 306 ) 307 308 # Log normal vector as arrow 309 rr.log( 310 f"motion/errors/FeedbackCollision/collisions/normal_{i}", 311 rr.Arrows3D( 312 origins=[pos_b], 313 vectors=[ 314 [ 315 normal[0] * arrow_length, 316 normal[1] * arrow_length, 317 normal[2] * arrow_length, 318 ] 319 ], 320 colors=[(255, 255, 0, 255)], 321 ), 322 static=True, 323 ) 324 325 async def start_streaming(self, motion_group: MotionGroup) -> None: 326 """Start streaming real-time robot state to Rerun viewer.""" 327 if motion_group in self._streaming_tasks: 328 return 329 330 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 331 self._streaming_tasks[motion_group] = task 332 333 async def stop_streaming(self) -> None: 334 """Stop all streaming tasks.""" 335 for task in self._streaming_tasks.values(): 336 task.cancel() 337 self._streaming_tasks.clear() 338 339 async def log_actions( 340 self, 341 actions: list[Action | CollisionFreeMotion] | Action, 342 show_connection: bool = False, 343 motion_group: Optional[MotionGroup] = None, 344 ) -> None: 345 from nova_rerun_bridge import trajectory 346 347 rr.reset_time() 348 rr.set_time(TIME_INTERVAL_NAME, duration=trajectory._last_end_time) 349 350 if not isinstance(actions, list): 351 actions = [actions] 352 353 if len(actions) == 0: 354 raise ValueError("No actions provided") 355 356 # Collect poses from regular actions 357 regular_actions = [ 358 action for action in actions if isinstance(action, (Motion, WriteAction)) 359 ] 360 regular_poses = ( 361 CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else [] 362 ) 363 364 # Collect poses from CollisionFreeMotion targets 365 collision_free_poses = [ 366 action.target 367 for action in actions 368 if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose) 369 ] 370 371 # Combine all poses 372 all_poses = regular_poses + collision_free_poses 373 positions = [] 374 point_colors = [] 375 use_red = False 376 377 # Collect all positions and determine colors 378 for i, action in enumerate(actions): 379 if isinstance(action, WriteAction): 380 use_red = True 381 if i < len(all_poses): # Only process if there's a corresponding pose 382 pose = all_poses[i] 383 logger.debug(f"Pose: {pose}") 384 positions.append([pose.position.x, pose.position.y, pose.position.z]) 385 point_colors.append(colors.colors[1] if use_red else colors.colors[9]) 386 387 enity_path = ( 388 f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions" 389 ) 390 391 # Log all positions at once 392 rr.log( 393 enity_path, 394 rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])), 395 static=True, 396 ) 397 398 if show_connection: 399 rr.log( 400 f"{enity_path}/connection", rr.LineStrips3D([positions], colors=[155, 155, 155, 50]) 401 ) 402 403 async def __aenter__(self) -> "NovaRerunBridge": 404 """Context manager entry point. 405 406 Returns: 407 NovaRerunBridge: Self reference for context manager usage. 408 """ 409 return self 410 411 async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: 412 """Context manager exit point, ensures cleanup.""" 413 await self.cleanup() 414 415 async def cleanup(self) -> None: 416 """Cleanup resources and close Nova API client connection.""" 417 if hasattr(self.nova, "_api_client"): 418 await self.nova._api_client.close()
Bridge between Nova and Rerun for visualization.
This class provides functionality to visualize Nova data in Rerun. It handles trajectoy, collision scenes, blueprints and proper cleanup of resources.
Example:
from nova.core.nova import Nova from nova_rerun_bridge import NovaRerunBridge async def main(): nova = Nova() async with NovaRerunBridge(nova) as bridge: await bridge.setup_blueprint() await bridge.log_collision_scenes()
Arguments:
- nova (Nova): Instance of Nova client
- spawn (bool, optional): Whether to spawn Rerun viewer. Defaults to True.
NovaRerunBridge(nova: nova.Nova, spawn: bool = True, recording_id=None)
57 def __init__(self, nova: Nova, spawn: bool = True, recording_id=None) -> None: 58 self._ensure_models_exist() 59 self.nova = nova 60 self._streaming_tasks: dict[MotionGroup, asyncio.Task] = {} 61 if spawn: 62 recording_id = recording_id or f"nova_{datetime.now().strftime('%Y%m%d_%H%M%S')}" 63 rr.init(application_id="nova", recording_id=recording_id, spawn=True) 64 logger.add(sink=rr.LoggingHandler("logs/handler"))
async def
setup_blueprint(self) -> None:
72 async def setup_blueprint(self) -> None: 73 """Configure and send blueprint configuration to Rerun. 74 75 Fetches motion groups from Nova and configures visualization layout. 76 """ 77 cell = self.nova.cell() 78 79 controllers = await cell.controllers() 80 motion_groups = [] 81 82 if not controllers: 83 logger.warning("No controllers found") 84 return 85 86 for controller in controllers: 87 for motion_group in await controller.activated_motion_groups(): 88 motion_groups.append(motion_group.motion_group_id) 89 90 rr.reset_time() 91 rr.set_time(TIME_INTERVAL_NAME, duration=0) 92 93 send_blueprint(motion_groups) 94 self.log_coordinate_system()
Configure and send blueprint configuration to Rerun.
Fetches motion groups from Nova and configures visualization layout.
def
log_coordinate_system(self) -> None:
96 def log_coordinate_system(self) -> None: 97 """Log the coordinate system of the cell.""" 98 99 coordinate_origins = np.zeros((3, 3)) # Origin points for x, y, z arrows 100 coordinate_vectors = ( 101 np.array( 102 [ 103 [1.0, 0.0, 0.0], # X direction 104 [0.0, 1.0, 0.0], # Y direction 105 [0.0, 0.0, 1.0], # Z direction 106 ] 107 ) 108 * 200.0 109 ) # Scale factor of 200.0 for better visibility 110 111 coordinate_colors = np.array( 112 [ 113 [1.0, 0.125, 0.376, 1.0], # #ff2060 - Red/Pink for X 114 [0.125, 0.875, 0.502, 1.0], # #20df80 - Green for Y 115 [0.125, 0.502, 1.0, 1.0], # #2080ff - Blue for Z 116 ] 117 ) 118 119 rr.log( 120 "coordinate_system_world", 121 rr.Arrows3D( 122 origins=coordinate_origins, 123 vectors=coordinate_vectors, 124 colors=coordinate_colors, 125 radii=rr.Radius.ui_points([5.0]), 126 ), 127 static=True, 128 )
Log the coordinate system of the cell.
async def
log_collision_scenes( self) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
130 async def log_collision_scenes(self) -> dict[str, models.CollisionScene]: 131 """Fetch and log all collision scenes from Nova to Rerun.""" 132 collision_scenes = ( 133 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 134 cell=self.nova.cell()._cell_id 135 ) 136 ) 137 log_collision_scenes(collision_scenes) 138 return collision_scenes
Fetch and log all collision scenes from Nova to Rerun.
async def
log_collision_scene( self, scene_id: str) -> dict[str, wandelbots_api_client.models.collision_scene.CollisionScene]:
140 async def log_collision_scene(self, scene_id: str) -> dict[str, models.CollisionScene]: 141 """Log a specific collision scene by its ID. 142 143 Args: 144 scene_id (str): The ID of the collision scene to log 145 146 Raises: 147 ValueError: If scene_id is not found in stored collision scenes 148 """ 149 collision_scenes = ( 150 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 151 cell=self.nova.cell()._cell_id 152 ) 153 ) 154 155 if scene_id not in collision_scenes: 156 raise ValueError(f"Collision scene with ID {scene_id} not found") 157 158 log_collision_scenes({scene_id: collision_scenes[scene_id]}) 159 return {scene_id: collision_scenes[scene_id]}
Log a specific collision scene by its ID.
Arguments:
- scene_id (str): The ID of the collision scene to log
Raises:
- ValueError: If scene_id is not found in stored collision scenes
164 async def log_saftey_zones(self, motion_group: MotionGroup) -> None: 165 tcp_names = await motion_group.tcp_names() 166 tcp = tcp_names[0] 167 168 rr.reset_time() 169 rr.set_time(TIME_INTERVAL_NAME, duration=0) 170 171 log_safety_zones( 172 motion_group.motion_group_id, await motion_group._get_optimizer_setup(tcp=tcp) 173 )
def
log_saftey_zones_( self, motion_group_id: str, optimizer_setup: wandelbots_api_client.models.optimizer_setup.OptimizerSetup) -> None:
async def
log_motion( self, motion_id: str, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: Optional[str] = None) -> None:
180 async def log_motion( 181 self, 182 motion_id: str, 183 timing_mode=TimingMode.CONTINUE, 184 time_offset: float = 0, 185 tool_asset: Optional[str] = None, 186 ) -> None: 187 # Fetch motion details from api 188 motion = await self.nova._api_client.motion_api.get_planned_motion( 189 self.nova.cell()._cell_id, motion_id 190 ) 191 optimizer_config = ( 192 await self.nova._api_client.motion_group_infos_api.get_optimizer_configuration( 193 self.nova.cell()._cell_id, motion.motion_group 194 ) 195 ) 196 trajectory = await self.nova._api_client.motion_api.get_motion_trajectory( 197 self.nova.cell()._cell_id, motion_id, int(RECORDING_INTERVAL * 1000) 198 ) 199 200 motion_groups = await self.nova._api_client.motion_group_api.list_motion_groups( 201 self.nova.cell()._cell_id 202 ) 203 motion_motion_group = next( 204 (mg for mg in motion_groups.instances if mg.motion_group == motion.motion_group), None 205 ) 206 207 collision_scenes = ( 208 await self.nova._api_client.store_collision_scenes_api.list_stored_collision_scenes( 209 cell=self.nova.cell()._cell_id 210 ) 211 ) 212 213 if motion_motion_group is None: 214 raise ValueError(f"Motion group {motion.motion_group} not found") 215 216 log_motion( 217 motion_id=motion_id, 218 model_from_controller=motion_motion_group.model_from_controller, 219 motion_group=motion.motion_group, 220 optimizer_config=optimizer_config, 221 trajectory=trajectory.trajectory, 222 collision_scenes=collision_scenes, 223 time_offset=time_offset, 224 timing_mode=timing_mode, 225 tool_asset=tool_asset, 226 )
async def
log_trajectory( self, joint_trajectory: wandelbots_api_client.models.joint_trajectory.JointTrajectory, tcp: str, motion_group: nova.MotionGroup, timing_mode=<TimingMode.CONTINUE: 2>, time_offset: float = 0, tool_asset: str = None) -> None:
228 async def log_trajectory( 229 self, 230 joint_trajectory: models.JointTrajectory, 231 tcp: str, 232 motion_group: MotionGroup, 233 timing_mode=TimingMode.CONTINUE, 234 time_offset: float = 0, 235 tool_asset: str = None, 236 ) -> None: 237 if len(joint_trajectory.joint_positions) == 0: 238 raise ValueError("No joint trajectory provided") 239 load_plan_response = await motion_group._load_planned_motion(joint_trajectory, tcp) 240 await self.log_motion( 241 load_plan_response.motion, 242 timing_mode=timing_mode, 243 time_offset=time_offset, 244 tool_asset=tool_asset, 245 )
async def
log_error_feedback( self, error_feedback: wandelbots_api_client.models.plan_trajectory_failed_response_error_feedback.PlanTrajectoryFailedResponseErrorFeedback) -> None:
250 async def log_error_feedback( 251 self, error_feedback: PlanTrajectoryFailedResponseErrorFeedback 252 ) -> None: 253 if isinstance(error_feedback.actual_instance, FeedbackOutOfWorkspace): 254 if ( 255 error_feedback.actual_instance.invalid_tcp_pose 256 and error_feedback.actual_instance.invalid_tcp_pose.position 257 ): 258 position = error_feedback.actual_instance.invalid_tcp_pose.position 259 rr.log( 260 "motion/errors/FeedbackOutOfWorkspace", 261 rr.Points3D( 262 [[position[0], position[1], position[2]]], 263 radii=rr.Radius.ui_points([5.0]), 264 colors=[(255, 0, 0, 255)], 265 labels=["Out of Workspace"], 266 ), 267 static=True, 268 ) 269 270 if isinstance(error_feedback.actual_instance, FeedbackCollision): 271 collisions = error_feedback.actual_instance.collisions 272 if not collisions: 273 return 274 275 for i, collision in enumerate(collisions): 276 if collision.position_on_a is None or collision.position_on_b is None: 277 continue 278 if collision.position_on_a.world is None or collision.position_on_b.world is None: 279 continue 280 if collision.normal_world_on_b is None: 281 continue 282 283 # Extract positions 284 pos_a = collision.position_on_a.world 285 pos_b = collision.position_on_b.world 286 normal = collision.normal_world_on_b 287 288 # Scale normal for visibility 289 arrow_length = 50 290 291 # Log collision points 292 rr.log( 293 f"motion/errors/FeedbackCollision/collisions/point_{i}/a", 294 rr.Points3D( 295 [pos_a], radii=rr.Radius.ui_points([5.0]), colors=[(255, 0, 0, 255)] 296 ), 297 static=True, 298 ) 299 300 rr.log( 301 f"motion/errors/FeedbackCollision/collisions/point_{i}/b", 302 rr.Points3D( 303 [pos_b], radii=rr.Radius.ui_points([5.0]), colors=[(0, 0, 255, 255)] 304 ), 305 static=True, 306 ) 307 308 # Log normal vector as arrow 309 rr.log( 310 f"motion/errors/FeedbackCollision/collisions/normal_{i}", 311 rr.Arrows3D( 312 origins=[pos_b], 313 vectors=[ 314 [ 315 normal[0] * arrow_length, 316 normal[1] * arrow_length, 317 normal[2] * arrow_length, 318 ] 319 ], 320 colors=[(255, 255, 0, 255)], 321 ), 322 static=True, 323 )
325 async def start_streaming(self, motion_group: MotionGroup) -> None: 326 """Start streaming real-time robot state to Rerun viewer.""" 327 if motion_group in self._streaming_tasks: 328 return 329 330 task = asyncio.create_task(stream_motion_group(self, motion_group=motion_group)) 331 self._streaming_tasks[motion_group] = task
Start streaming real-time robot state to Rerun viewer.
async def
stop_streaming(self) -> None:
333 async def stop_streaming(self) -> None: 334 """Stop all streaming tasks.""" 335 for task in self._streaming_tasks.values(): 336 task.cancel() 337 self._streaming_tasks.clear()
Stop all streaming tasks.
async def
log_actions( self, actions: list[nova.actions.Action | nova.actions.motions.CollisionFreeMotion] | nova.actions.Action, show_connection: bool = False, motion_group: Optional[nova.MotionGroup] = None) -> None:
339 async def log_actions( 340 self, 341 actions: list[Action | CollisionFreeMotion] | Action, 342 show_connection: bool = False, 343 motion_group: Optional[MotionGroup] = None, 344 ) -> None: 345 from nova_rerun_bridge import trajectory 346 347 rr.reset_time() 348 rr.set_time(TIME_INTERVAL_NAME, duration=trajectory._last_end_time) 349 350 if not isinstance(actions, list): 351 actions = [actions] 352 353 if len(actions) == 0: 354 raise ValueError("No actions provided") 355 356 # Collect poses from regular actions 357 regular_actions = [ 358 action for action in actions if isinstance(action, (Motion, WriteAction)) 359 ] 360 regular_poses = ( 361 CombinedActions(items=tuple(regular_actions)).poses() if regular_actions else [] 362 ) 363 364 # Collect poses from CollisionFreeMotion targets 365 collision_free_poses = [ 366 action.target 367 for action in actions 368 if isinstance(action, CollisionFreeMotion) and isinstance(action.target, Pose) 369 ] 370 371 # Combine all poses 372 all_poses = regular_poses + collision_free_poses 373 positions = [] 374 point_colors = [] 375 use_red = False 376 377 # Collect all positions and determine colors 378 for i, action in enumerate(actions): 379 if isinstance(action, WriteAction): 380 use_red = True 381 if i < len(all_poses): # Only process if there's a corresponding pose 382 pose = all_poses[i] 383 logger.debug(f"Pose: {pose}") 384 positions.append([pose.position.x, pose.position.y, pose.position.z]) 385 point_colors.append(colors.colors[1] if use_red else colors.colors[9]) 386 387 enity_path = ( 388 f"motion/{motion_group.motion_group_id}/actions" if motion_group else "motion/actions" 389 ) 390 391 # Log all positions at once 392 rr.log( 393 enity_path, 394 rr.Points3D(positions, colors=point_colors, radii=rr.Radius.ui_points([5.0])), 395 static=True, 396 ) 397 398 if show_connection: 399 rr.log( 400 f"{enity_path}/connection", rr.LineStrips3D([positions], colors=[155, 155, 155, 50]) 401 )