nova
1from nova import actions, api, types 2from nova.core.controller import Controller 3from nova.core.logging import logger 4from nova.core.motion_group import MotionGroup 5from nova.core.movement_controller import speed_up as speed_up_movement_controller 6from nova.core.nova import Cell, Nova 7from nova.types import MotionSettings 8from nova.version import version 9 10__version__ = version 11 12__all__ = [ 13 "Nova", 14 "Cell", 15 "MotionGroup", 16 "Controller", 17 "speed_up_movement_controller", 18 "api", 19 "types", 20 "actions", 21 "MotionSettings", 22 "logger", 23 "__version__", 24]
27class Nova: 28 """A high-level Nova client for interacting with robot cells and controllers.""" 29 30 _api_client: ApiGateway 31 32 def __init__( 33 self, 34 *, 35 host: str | None = None, 36 access_token: str | None = None, 37 username: str | None = None, 38 password: str | None = None, 39 version: str = "v1", 40 verify_ssl: bool = True, 41 ): 42 """ 43 Initialize the Nova client. 44 45 Args: 46 host (str | None): The Nova API host. 47 access_token (str | None): An access token for the Nova API. 48 username (str | None): Username to authenticate with the Nova API. 49 password (str | None): Password to authenticate with the Nova API. 50 version (str): The API version to use (default: "v1"). 51 verify_ssl (bool): Whether or not to verify SSL certificates (default: True). 52 """ 53 54 self._api_client = ApiGateway( 55 host=host, 56 access_token=access_token, 57 username=username, 58 password=password, 59 version=version, 60 verify_ssl=verify_ssl, 61 ) 62 63 def cell(self, cell_id: str = CELL_NAME) -> Cell: 64 """Returns the cell object with the given ID.""" 65 return Cell(self._api_client, cell_id) 66 67 async def close(self): 68 """Closes the underlying API client session.""" 69 return await self._api_client.close() 70 71 async def __aenter__(self): 72 return self 73 74 async def __aexit__(self, exc_type, exc_val, exc_tb): 75 await self.close()
A high-level Nova client for interacting with robot cells and controllers.
32 def __init__( 33 self, 34 *, 35 host: str | None = None, 36 access_token: str | None = None, 37 username: str | None = None, 38 password: str | None = None, 39 version: str = "v1", 40 verify_ssl: bool = True, 41 ): 42 """ 43 Initialize the Nova client. 44 45 Args: 46 host (str | None): The Nova API host. 47 access_token (str | None): An access token for the Nova API. 48 username (str | None): Username to authenticate with the Nova API. 49 password (str | None): Password to authenticate with the Nova API. 50 version (str): The API version to use (default: "v1"). 51 verify_ssl (bool): Whether or not to verify SSL certificates (default: True). 52 """ 53 54 self._api_client = ApiGateway( 55 host=host, 56 access_token=access_token, 57 username=username, 58 password=password, 59 version=version, 60 verify_ssl=verify_ssl, 61 )
Initialize the Nova client.
Arguments:
- host (str | None): The Nova API host.
- access_token (str | None): An access token for the Nova API.
- username (str | None): Username to authenticate with the Nova API.
- password (str | None): Password to authenticate with the Nova API.
- version (str): The API version to use (default: "v1").
- verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
78class Cell: 79 """A representation of a robot cell, providing high-level operations on controllers.""" 80 81 def __init__(self, api_gateway: ApiGateway, cell_id: str): 82 """ 83 Initializes a Cell instance. 84 Args: 85 api_gateway (ApiGateway): The underlying gateway for making API calls. 86 cell_id (str): The unique identifier for the cell. 87 """ 88 self._api_gateway = api_gateway 89 self._cell_id = cell_id 90 91 @property 92 def cell_id(self) -> str: 93 """ 94 Returns unique identifier for this cell. 95 Returns: 96 str: The cell ID. 97 """ 98 return self._cell_id 99 100 def _create_controller(self, controller_id: str) -> Controller: 101 return Controller( 102 configuration=Controller.Configuration( 103 nova_api=self._api_gateway.host, 104 nova_access_token=self._api_gateway.access_token, 105 nova_username=self._api_gateway.username, 106 nova_password=self._api_gateway.password, 107 cell_id=self._cell_id, 108 controller_id=controller_id, 109 id=controller_id, 110 ) 111 ) 112 113 async def add_virtual_robot_controller( 114 self, 115 name: str, 116 controller_type: api.models.VirtualControllerTypes, 117 controller_manufacturer: api.models.Manufacturer, 118 timeout: int = 25, 119 position: str | None = None, 120 ) -> Controller: 121 """Add a virtual robot controller to the cell.""" 122 if position is None: 123 position = str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7)) 124 125 controller_instance = await self._api_gateway.add_virtual_robot_controller( 126 cell=self._cell_id, 127 name=name, 128 controller_type=controller_type, 129 controller_manufacturer=controller_manufacturer, 130 timeout=timeout, 131 position=position, 132 ) 133 return self._create_controller(controller_instance.controller) 134 135 async def ensure_virtual_robot_controller( 136 self, 137 name: str, 138 controller_type: api.models.VirtualControllerTypes, 139 controller_manufacturer: api.models.Manufacturer, 140 timeout: int = 25, 141 ) -> Controller: 142 """ 143 Ensure a virtual robot controller with the given name exists. 144 If the controller already exists, it is returned. Otherwise, it is created. 145 Args: 146 name (str): The name of the controller. 147 controller_type (api.models.VirtualControllerTypes): The type of virtual controller. 148 controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller. 149 Returns: 150 Controller: The existing or newly created Controller object. 151 """ 152 controller_instance = await self._api_gateway.get_controller_instance( 153 cell=self.cell_id, name=name 154 ) 155 if controller_instance: 156 return self._create_controller(controller_instance.controller) 157 return await self.add_virtual_robot_controller( 158 name, controller_type, controller_manufacturer, timeout=timeout 159 ) 160 161 async def controllers(self) -> list[Controller]: 162 """ 163 List all controllers for this cell. 164 Returns: 165 list[Controller]: A list of Controller objects associated with this cell. 166 """ 167 instances = await self._api_gateway.list_controllers(cell=self._cell_id) 168 return [self._create_controller(ci.controller) for ci in instances] 169 170 async def controller(self, name: str) -> Controller: 171 """ 172 Retrieve a specific controller by name. 173 Args: 174 name (str): The name of the controller. 175 Returns: 176 Controller: The Controller object. 177 Raises: 178 ControllerNotFound: If no controller with the specified name exists. 179 """ 180 controller_instance = await self._api_gateway.get_controller_instance( 181 cell=self._cell_id, name=name 182 ) 183 if not controller_instance: 184 raise ControllerNotFound(controller=name) 185 return self._create_controller(controller_instance.controller) 186 187 async def delete_robot_controller(self, name: str, timeout: int = 25): 188 """ 189 Delete a robot controller from the cell. 190 Args: 191 name (str): The name of the controller to delete. 192 timeout (int): The time to wait for the controller deletion to complete (default: 25). 193 """ 194 await self._api_gateway.delete_robot_controller( 195 cell=self._cell_id, controller=name, completion_timeout=timeout 196 ) 197 198 async def get_robot_cell(self) -> RobotCell: 199 """ 200 Return a RobotCell object containing all known controllers. 201 Returns: 202 RobotCell: A RobotCell initialized with the available controllers. 203 """ 204 controllers = await self.controllers() 205 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
A representation of a robot cell, providing high-level operations on controllers.
81 def __init__(self, api_gateway: ApiGateway, cell_id: str): 82 """ 83 Initializes a Cell instance. 84 Args: 85 api_gateway (ApiGateway): The underlying gateway for making API calls. 86 cell_id (str): The unique identifier for the cell. 87 """ 88 self._api_gateway = api_gateway 89 self._cell_id = cell_id
Initializes a Cell instance.
Arguments:
- api_gateway (ApiGateway): The underlying gateway for making API calls.
- cell_id (str): The unique identifier for the cell.
91 @property 92 def cell_id(self) -> str: 93 """ 94 Returns unique identifier for this cell. 95 Returns: 96 str: The cell ID. 97 """ 98 return self._cell_id
Returns unique identifier for this cell.
Returns:
str: The cell ID.
113 async def add_virtual_robot_controller( 114 self, 115 name: str, 116 controller_type: api.models.VirtualControllerTypes, 117 controller_manufacturer: api.models.Manufacturer, 118 timeout: int = 25, 119 position: str | None = None, 120 ) -> Controller: 121 """Add a virtual robot controller to the cell.""" 122 if position is None: 123 position = str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7)) 124 125 controller_instance = await self._api_gateway.add_virtual_robot_controller( 126 cell=self._cell_id, 127 name=name, 128 controller_type=controller_type, 129 controller_manufacturer=controller_manufacturer, 130 timeout=timeout, 131 position=position, 132 ) 133 return self._create_controller(controller_instance.controller)
Add a virtual robot controller to the cell.
135 async def ensure_virtual_robot_controller( 136 self, 137 name: str, 138 controller_type: api.models.VirtualControllerTypes, 139 controller_manufacturer: api.models.Manufacturer, 140 timeout: int = 25, 141 ) -> Controller: 142 """ 143 Ensure a virtual robot controller with the given name exists. 144 If the controller already exists, it is returned. Otherwise, it is created. 145 Args: 146 name (str): The name of the controller. 147 controller_type (api.models.VirtualControllerTypes): The type of virtual controller. 148 controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller. 149 Returns: 150 Controller: The existing or newly created Controller object. 151 """ 152 controller_instance = await self._api_gateway.get_controller_instance( 153 cell=self.cell_id, name=name 154 ) 155 if controller_instance: 156 return self._create_controller(controller_instance.controller) 157 return await self.add_virtual_robot_controller( 158 name, controller_type, controller_manufacturer, timeout=timeout 159 )
Ensure a virtual robot controller with the given name exists. If the controller already exists, it is returned. Otherwise, it is created.
Arguments:
- name (str): The name of the controller.
- controller_type (api.models.VirtualControllerTypes): The type of virtual controller.
- controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller.
Returns:
Controller: The existing or newly created Controller object.
161 async def controllers(self) -> list[Controller]: 162 """ 163 List all controllers for this cell. 164 Returns: 165 list[Controller]: A list of Controller objects associated with this cell. 166 """ 167 instances = await self._api_gateway.list_controllers(cell=self._cell_id) 168 return [self._create_controller(ci.controller) for ci in instances]
List all controllers for this cell.
Returns:
list[Controller]: A list of Controller objects associated with this cell.
170 async def controller(self, name: str) -> Controller: 171 """ 172 Retrieve a specific controller by name. 173 Args: 174 name (str): The name of the controller. 175 Returns: 176 Controller: The Controller object. 177 Raises: 178 ControllerNotFound: If no controller with the specified name exists. 179 """ 180 controller_instance = await self._api_gateway.get_controller_instance( 181 cell=self._cell_id, name=name 182 ) 183 if not controller_instance: 184 raise ControllerNotFound(controller=name) 185 return self._create_controller(controller_instance.controller)
Retrieve a specific controller by name.
Arguments:
- name (str): The name of the controller.
Returns:
Controller: The Controller object.
Raises:
- ControllerNotFound: If no controller with the specified name exists.
187 async def delete_robot_controller(self, name: str, timeout: int = 25): 188 """ 189 Delete a robot controller from the cell. 190 Args: 191 name (str): The name of the controller to delete. 192 timeout (int): The time to wait for the controller deletion to complete (default: 25). 193 """ 194 await self._api_gateway.delete_robot_controller( 195 cell=self._cell_id, controller=name, completion_timeout=timeout 196 )
Delete a robot controller from the cell.
Arguments:
- name (str): The name of the controller to delete.
- timeout (int): The time to wait for the controller deletion to complete (default: 25).
198 async def get_robot_cell(self) -> RobotCell: 199 """ 200 Return a RobotCell object containing all known controllers. 201 Returns: 202 RobotCell: A RobotCell initialized with the available controllers. 203 """ 204 controllers = await self.controllers() 205 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
Return a RobotCell object containing all known controllers.
Returns:
RobotCell: A RobotCell initialized with the available controllers.
81class MotionGroup(AbstractRobot): 82 """Manages motion planning and execution within a specified motion group.""" 83 84 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 85 """ 86 Initializes a new MotionGroup instance. 87 88 Args: 89 api_gateway (ApiGateway): The API gateway through which motion commands are sent. 90 cell (str): The name or identifier of the robotic cell. 91 motion_group_id (str): The identifier of the motion group. 92 """ 93 self._api_gateway = api_gateway 94 self._cell = cell 95 self._motion_group_id = motion_group_id 96 self._current_motion: str | None = None 97 self._optimizer_setup: wb.models.OptimizerSetup | None = None 98 super().__init__(id=motion_group_id) 99 100 async def open(self): 101 await self._api_gateway.activate_motion_group( 102 cell=self._cell, motion_group_id=self._motion_group_id 103 ) 104 return self 105 106 async def close(self): 107 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 108 # this behaviour is not desired in some cases, 109 # so for now we will not deactivate for the user 110 pass 111 112 @property 113 def motion_group_id(self) -> str: 114 """ 115 Returns: 116 str: The unique identifier for this motion group. 117 """ 118 return self._motion_group_id 119 120 @property 121 def current_motion(self) -> str | None: 122 # if not self._current_motion: 123 # raise ValueError("No MotionId attached. There is no planned motion available.") 124 return self._current_motion 125 126 async def _plan_with_collision_check( 127 self, 128 actions: list[Action], 129 tcp: str, 130 start_joint_position: tuple[float, ...] | None = None, 131 optimizer_setup: wb.models.OptimizerSetup | None = None, 132 ) -> wb.models.JointTrajectory: 133 """ 134 This method plans a trajectory and checks for collisions. 135 The collision check only happens if the actions have collision scene data. 136 137 You must provide the exact same collision data into all the actions. 138 Because the underlying API supports collision checks for the whole trajectory only. 139 140 Raises: 141 InconsistentCollisionScenes: If the collision scene data is not consistent across all actions 142 143 Your actions should follow below rules to be considered consistent: 144 1- They all should have the same collision scene data 145 2- They all should have no collision data 146 147 PlanTrajectoryFailed: If the trajectory planning failed including the collision check 148 149 For more information about this API, please refer to the plan_trajectory in the API documentation. 150 151 Args: 152 actions: list of actions to plan, current supported actions are Motion and WriteActions 153 WriteAction you specify on your path is handled in a performant way. 154 Please check execute_trajectory.motion_command.set_io for more information. 155 tcp: The tool to use 156 start_joint_position: The starting joint position, if none provided, current position of the robot is used 157 optimizer_setup: The optimizer setup 158 159 Returns: planned joint trajectory 160 161 """ 162 # PREPARE THE REQUEST 163 collision_scenes = self._validate_collision_scenes(actions) 164 start_joint_position = start_joint_position or await self.joints() 165 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 166 167 motion_commands = CombinedActions(items=tuple(actions)).to_motion_command() # type: ignore 168 169 static_colliders = None 170 collision_motion_group = None 171 if collision_scenes and len(collision_scenes) > 0: 172 static_colliders = collision_scenes[0].colliders 173 174 motion_group_type = robot_setup.motion_group_type 175 if ( 176 collision_scenes[0].motion_groups 177 and motion_group_type in collision_scenes[0].motion_groups 178 ): 179 collision_motion_group = collision_scenes[0].motion_groups[motion_group_type] 180 181 request = wb.models.PlanTrajectoryRequest( 182 robot_setup=robot_setup, 183 start_joint_position=list(start_joint_position), 184 motion_commands=motion_commands, 185 static_colliders=static_colliders, 186 collision_motion_group=collision_motion_group, 187 ) 188 189 return await self._api_gateway.plan_trajectory( 190 cell=self._cell, motion_group_id=self.motion_group_id, request=request 191 ) 192 193 def _validate_collision_scenes(self, actions: list[Action]) -> list[models.CollisionScene]: 194 motion_count = len([action for action in actions if isinstance(action, Motion)]) 195 collision_scenes = [ 196 action.collision_scene 197 for action in actions 198 if isinstance(action, CollisionFreeMotion) and action.collision_scene is not None 199 ] 200 201 if len(collision_scenes) != 0 and len(collision_scenes) != motion_count: 202 raise InconsistentCollisionScenes( 203 "Only some of the actions have collision scene. Either specify it for all or none." 204 ) 205 206 # If a collision scene is provided, the same should be provided for all the collision scene 207 if len(collision_scenes) > 1: 208 first_scene = collision_scenes[0] 209 if not all( 210 compare_collision_scenes(first_scene, scene) for scene in collision_scenes[1:] 211 ): 212 raise InconsistentCollisionScenes( 213 "All actions must use the same collision scene but some are different" 214 ) 215 216 return collision_scenes 217 218 # TODO: we get the optimizer setup from as an input because 219 # it has a velocity setting which is used in collision free movement, I need to double check this 220 async def _plan_collision_free( 221 self, 222 action: CollisionFreeMotion, 223 tcp: str, 224 start_joint_position: list[float], 225 optimizer_setup: wb.models.OptimizerSetup | None = None, 226 ) -> wb.models.JointTrajectory: 227 """ 228 This method plans a trajectory and avoids collisions. 229 This means if there is a collision along the way to the target pose or joint positions, 230 It will adjust the trajectory to avoid the collision. 231 232 The collision check only happens if the action have collision scene data. 233 234 For more information about this API, please refer to the plan_collision_free_ptp in the API documentation. 235 236 Args: 237 action: The target pose or joint positions to reach 238 tcp: The tool to use 239 start_joint_position: The starting joint position, if none provided, current position of the robot is used 240 optimizer_setup: The optimizer setup 241 242 Returns: planned joint trajectory 243 244 245 """ 246 target = wb.models.PlanCollisionFreePTPRequestTarget( 247 **action.model_dump(exclude_unset=True) 248 ) 249 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 250 251 static_colliders = None 252 collision_motion_group = None 253 collision_scene = action.collision_scene 254 if collision_scene and collision_scene.colliders: 255 static_colliders = collision_scene.colliders 256 257 if ( 258 collision_scene.motion_groups 259 and robot_setup.motion_group_type in collision_scene.motion_groups 260 ): 261 collision_motion_group = collision_scene.motion_groups[ 262 robot_setup.motion_group_type 263 ] 264 265 request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest( 266 robot_setup=robot_setup, 267 start_joint_position=start_joint_position, 268 target=target, 269 static_colliders=static_colliders, 270 collision_motion_group=collision_motion_group, 271 ) 272 273 return await self._api_gateway.plan_collision_free_ptp( 274 cell=self._cell, motion_group_id=self.motion_group_id, request=request 275 ) 276 277 async def _plan( 278 self, 279 actions: list[Action], 280 tcp: str, 281 start_joint_position: tuple[float, ...] | None = None, 282 optimizer_setup: wb.models.OptimizerSetup | None = None, 283 ) -> wb.models.JointTrajectory: 284 if not actions: 285 raise ValueError("No actions provided") 286 287 current_joints = start_joint_position or await self.joints() 288 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 289 290 all_trajectories = [] 291 for batch in split_actions_into_batches(actions): 292 if len(batch) == 0: 293 raise ValueError("Empty batch of actions") 294 295 if isinstance(batch[0], CollisionFreeMotion): 296 motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0]) 297 trajectory = await self._plan_collision_free( 298 action=motion, 299 tcp=tcp, 300 start_joint_position=list(current_joints), 301 optimizer_setup=robot_setup, 302 ) 303 all_trajectories.append(trajectory) 304 # the last joint position of this trajectory is the starting point for the next one 305 current_joints = tuple(trajectory.joint_positions[-1].joints) 306 else: 307 trajectory = await self._plan_with_collision_check( 308 actions=batch, 309 tcp=tcp, 310 start_joint_position=current_joints, 311 optimizer_setup=robot_setup, 312 ) 313 all_trajectories.append(trajectory) 314 # the last joint position of this trajectory is the starting point for the next one 315 current_joints = tuple(trajectory.joint_positions[-1].joints) 316 317 return combine_trajectories(all_trajectories) 318 319 # TODO: refactor and simplify code, tests are already there 320 # TODO: split into batches when the collision scene changes in a batch of collision free motions 321 322 async def _execute( 323 self, 324 joint_trajectory: wb.models.JointTrajectory, 325 tcp: str, 326 actions: list[Action], 327 movement_controller: MovementController | None, 328 ) -> AsyncIterable[MovementResponse]: 329 if movement_controller is None: 330 movement_controller = move_forward 331 332 # Load planned trajectory 333 load_plan_response = await self._load_planned_motion(joint_trajectory, tcp) 334 335 # Move to start position 336 number_of_joints = await self._api_gateway.get_joint_number( 337 cell=self._cell, motion_group_id=self.motion_group_id 338 ) 339 joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints 340 movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response) 341 342 # If there's an initial consumer, feed it the data 343 async for move_to_response in movement_stream: 344 # TODO: refactor 345 if ( 346 move_to_response.state is None 347 or move_to_response.state.motion_groups is None 348 or len(move_to_response.state.motion_groups) == 0 349 or move_to_response.move_response is None 350 or move_to_response.move_response.current_location_on_trajectory is None 351 ): 352 continue 353 354 yield move_to_response 355 356 controller = movement_controller( 357 MovementControllerContext( 358 combined_actions=CombinedActions(items=tuple(actions)), # type: ignore 359 motion_id=load_plan_response.motion, 360 ) 361 ) 362 363 def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool: 364 instance = response.actual_instance 365 # Stop when standstill indicates motion ended 366 return ( 367 isinstance(instance, wb.models.Standstill) 368 and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED 369 ) 370 371 execute_response_streaming_controller = StreamExtractor(controller, stop_condition) 372 execution_task = asyncio.create_task( 373 self._api_gateway.motion_api.execute_trajectory( 374 cell=self._cell, client_request_generator=execute_response_streaming_controller 375 ) 376 ) 377 async for execute_response in execute_response_streaming_controller: 378 yield execute_response 379 await execution_task 380 381 async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup: 382 # TODO: mypy failed on main branch, need to check 383 if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp: # type: ignore 384 self._optimizer_setup = await self._api_gateway.get_optimizer_config( 385 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 386 ) 387 388 return self._optimizer_setup 389 390 async def _load_planned_motion( 391 self, joint_trajectory: wb.models.JointTrajectory, tcp: str 392 ) -> wb.models.PlanSuccessfulResponse: 393 return await self._api_gateway.load_planned_motion( 394 cell=self._cell, 395 motion_group_id=self.motion_group_id, 396 joint_trajectory=joint_trajectory, 397 tcp=tcp, 398 ) 399 400 async def move_to_start_position( 401 self, joint_velocities, load_plan_response: LoadPlanResponse 402 ) -> InitialMovementStream: 403 limit_override = wb.models.LimitsOverride() 404 if joint_velocities is not None: 405 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 406 407 return self._api_gateway.stream_move_to_trajectory_via_join_ptp( 408 cell=self._cell, 409 motion_id=load_plan_response.motion, 410 location_on_trajectory=0, 411 joint_velocity_limits=limit_override.joint_velocity_limits, 412 ) 413 414 async def stop(self): 415 logger.debug(f"Stopping motion of {self}...") 416 try: 417 if self._current_motion is None: 418 raise ValueError("No motion to stop") 419 await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion) 420 logger.debug(f"Motion {self.current_motion} stopped.") 421 except ValueError as e: 422 logger.debug(f"No motion to stop for {self}: {e}") 423 424 async def get_state(self, tcp: str | None = None) -> RobotState: 425 response = await self._api_gateway.get_motion_group_state( 426 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 427 ) 428 return RobotState( 429 pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints) 430 ) 431 432 async def joints(self) -> tuple: 433 state = await self.get_state() 434 if state.joints is None: 435 raise ValueError( 436 f"No joint positions available for motion group {self._motion_group_id}" 437 ) 438 return state.joints 439 440 async def tcp_pose(self, tcp: str | None = None) -> Pose: 441 state = await self.get_state(tcp=tcp) 442 return state.pose 443 444 async def tcps(self) -> list[wb.models.RobotTcp]: 445 response = await self._api_gateway.list_tcps( 446 cell=self._cell, motion_group_id=self.motion_group_id 447 ) 448 return response.tcps if response.tcps else [] 449 450 async def tcp_names(self) -> list[str]: 451 return [tcp.id for tcp in await self.tcps()] 452 453 async def active_tcp(self) -> wb.models.RobotTcp: 454 active_tcp = await self._api_gateway.get_active_tcp( 455 cell=self._cell, motion_group_id=self.motion_group_id 456 ) 457 return active_tcp 458 459 async def active_tcp_name(self) -> str: 460 active_tcp = await self.active_tcp() 461 return active_tcp.id
Manages motion planning and execution within a specified motion group.
84 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 85 """ 86 Initializes a new MotionGroup instance. 87 88 Args: 89 api_gateway (ApiGateway): The API gateway through which motion commands are sent. 90 cell (str): The name or identifier of the robotic cell. 91 motion_group_id (str): The identifier of the motion group. 92 """ 93 self._api_gateway = api_gateway 94 self._cell = cell 95 self._motion_group_id = motion_group_id 96 self._current_motion: str | None = None 97 self._optimizer_setup: wb.models.OptimizerSetup | None = None 98 super().__init__(id=motion_group_id)
Initializes a new MotionGroup instance.
Arguments:
- api_gateway (ApiGateway): The API gateway through which motion commands are sent.
- cell (str): The name or identifier of the robotic cell.
- motion_group_id (str): The identifier of the motion group.
100 async def open(self): 101 await self._api_gateway.activate_motion_group( 102 cell=self._cell, motion_group_id=self._motion_group_id 103 ) 104 return self
Allocates the external hardware resource (i.e. establish a connection)
106 async def close(self): 107 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 108 # this behaviour is not desired in some cases, 109 # so for now we will not deactivate for the user 110 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
112 @property 113 def motion_group_id(self) -> str: 114 """ 115 Returns: 116 str: The unique identifier for this motion group. 117 """ 118 return self._motion_group_id
Returns: str: The unique identifier for this motion group.
400 async def move_to_start_position( 401 self, joint_velocities, load_plan_response: LoadPlanResponse 402 ) -> InitialMovementStream: 403 limit_override = wb.models.LimitsOverride() 404 if joint_velocities is not None: 405 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 406 407 return self._api_gateway.stream_move_to_trajectory_via_join_ptp( 408 cell=self._cell, 409 motion_id=load_plan_response.motion, 410 location_on_trajectory=0, 411 joint_velocity_limits=limit_override.joint_velocity_limits, 412 )
414 async def stop(self): 415 logger.debug(f"Stopping motion of {self}...") 416 try: 417 if self._current_motion is None: 418 raise ValueError("No motion to stop") 419 await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion) 420 logger.debug(f"Motion {self.current_motion} stopped.") 421 except ValueError as e: 422 logger.debug(f"No motion to stop for {self}: {e}")
Stop behaviour of the robot
424 async def get_state(self, tcp: str | None = None) -> RobotState: 425 response = await self._api_gateway.get_motion_group_state( 426 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 427 ) 428 return RobotState( 429 pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints) 430 )
Current state (pose, joints) of the robot based on the tcp.
Arguments:
- tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.
Returns: the current state of the robot
432 async def joints(self) -> tuple: 433 state = await self.get_state() 434 if state.joints is None: 435 raise ValueError( 436 f"No joint positions available for motion group {self._motion_group_id}" 437 ) 438 return state.joints
Return the current joint positions of the robot
Returns: the current joint positions
440 async def tcp_pose(self, tcp: str | None = None) -> Pose: 441 state = await self.get_state(tcp=tcp) 442 return state.pose
Return the current pose of the robot based on the tcp
Arguments:
- tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.
Returns: the current pose of the robot
444 async def tcps(self) -> list[wb.models.RobotTcp]: 445 response = await self._api_gateway.list_tcps( 446 cell=self._cell, motion_group_id=self.motion_group_id 447 ) 448 return response.tcps if response.tcps else []
Return all TCPs that are configured on the robot with corresponding offset from flange as pose
Returns: the TCPs of the robot
Return the name of all TCPs that are configured on the robot
Returns: a list of all TCPs
453 async def active_tcp(self) -> wb.models.RobotTcp: 454 active_tcp = await self._api_gateway.get_active_tcp( 455 cell=self._cell, motion_group_id=self.motion_group_id 456 ) 457 return active_tcp
Return the active TCP of the robot
Returns: the active TCP
12class Controller(Sized, AbstractController, NovaDevice, IODevice): 13 """ 14 Represents a Nova controller, managing motion groups and IO interactions. 15 """ 16 17 class Configuration(NovaDevice.Configuration): 18 type: Literal["controller"] = "controller" 19 id: str = "controller" 20 cell_id: str 21 controller_id: str 22 23 def __init__(self, configuration: Configuration): 24 super().__init__(configuration) 25 self._activated_motion_group_ids: list[str] = [] 26 self._io_access = IOAccess( 27 api_gateway=self._nova_api, 28 cell=self.configuration.cell_id, 29 controller_id=self.configuration.controller_id, 30 ) 31 32 @property 33 def controller_id(self) -> str: 34 """Returns the unique controller ID.""" 35 return self.configuration.controller_id 36 37 async def open(self): 38 """Activates all motion groups.""" 39 motion_group_ids = await self.activated_motion_group_ids() 40 self._activated_motion_group_ids = motion_group_ids 41 logger.info(f"Found motion group {motion_group_ids}") 42 return self 43 44 async def close(self): 45 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 46 # this behaviour is not desired in some cases, 47 # so for now we will not deactivate for the user 48 pass 49 50 def __len__(self) -> int: 51 return len(self._activated_motion_group_ids) 52 53 def motion_group(self, motion_group_id: str) -> MotionGroup: 54 """Returns motion group with specific id. 55 56 Args: 57 motion_group_id (str): The ID of the motion group. 58 59 Returns: 60 MotionGroup: A MotionGroup instance corresponding to the given ID. 61 """ 62 return MotionGroup( 63 api_gateway=self._nova_api, 64 cell=self.configuration.cell_id, 65 motion_group_id=motion_group_id, 66 ) 67 68 def __getitem__(self, motion_group_id: int) -> MotionGroup: 69 return self.motion_group(f"{motion_group_id}@{self.configuration.controller_id}") 70 71 async def activated_motion_group_ids(self) -> list[str]: 72 """Activates and retrieves the list of motion group IDs available on this controller. 73 74 The system automatically activates all motion groups on the associated controller. 75 76 Returns: 77 list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]). 78 """ 79 return await self._nova_api.activate_all_motion_groups( 80 cell=self.configuration.cell_id, controller=self.configuration.controller_id 81 ) 82 83 async def activated_motion_groups(self) -> list[MotionGroup]: 84 """Retrieves a list of `MotionGroup` instances for all activated motion groups. 85 86 Returns: 87 list[MotionGroup]: All activated motion groups as `MotionGroup` objects. 88 """ 89 motion_group_ids = await self.activated_motion_group_ids() 90 return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids] 91 92 def get_robots(self) -> dict[str, AbstractRobot]: 93 """Retrieves a dictionary of motion group IDs to their corresponding robots. 94 95 Note: 96 This method interprets motion group IDs to create corresponding `MotionGroup` 97 objects and returns them as `AbstractRobot` references. 98 99 Returns: 100 dict[str, AbstractRobot]: A mapping of motion group ID to `MotionGroup` instance. 101 """ 102 return { 103 motion_group_id: self.motion_group(motion_group_id) 104 for motion_group_id in self._activated_motion_group_ids 105 } 106 107 async def read(self, key: str) -> ValueType: 108 """Reads an IO value from the controller. 109 110 Args: 111 key (str): The IO key name. 112 113 Returns: 114 ValueType: The value associated with the specified IO key. 115 """ 116 return await self._io_access.read(key) 117 118 async def write(self, key: str, value: ValueType) -> None: 119 """Writes an IO value to the controller. 120 121 Args: 122 key (str): The IO key name. 123 value (ValueType): The value to write to the specified IO key. 124 """ 125 return await self._io_access.write(key, value) 126 127 async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]: 128 async for state in self._nova_api.stream_robot_controller_state( 129 cell=self.configuration.cell_id, 130 controller_id=self.configuration.controller_id, 131 response_rate=rate_msecs, 132 ): 133 yield state
Represents a Nova controller, managing motion groups and IO interactions.
32 @property 33 def controller_id(self) -> str: 34 """Returns the unique controller ID.""" 35 return self.configuration.controller_id
Returns the unique controller ID.
37 async def open(self): 38 """Activates all motion groups.""" 39 motion_group_ids = await self.activated_motion_group_ids() 40 self._activated_motion_group_ids = motion_group_ids 41 logger.info(f"Found motion group {motion_group_ids}") 42 return self
Activates all motion groups.
44 async def close(self): 45 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 46 # this behaviour is not desired in some cases, 47 # so for now we will not deactivate for the user 48 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
53 def motion_group(self, motion_group_id: str) -> MotionGroup: 54 """Returns motion group with specific id. 55 56 Args: 57 motion_group_id (str): The ID of the motion group. 58 59 Returns: 60 MotionGroup: A MotionGroup instance corresponding to the given ID. 61 """ 62 return MotionGroup( 63 api_gateway=self._nova_api, 64 cell=self.configuration.cell_id, 65 motion_group_id=motion_group_id, 66 )
Returns motion group with specific id.
Arguments:
- motion_group_id (str): The ID of the motion group.
Returns:
MotionGroup: A MotionGroup instance corresponding to the given ID.
71 async def activated_motion_group_ids(self) -> list[str]: 72 """Activates and retrieves the list of motion group IDs available on this controller. 73 74 The system automatically activates all motion groups on the associated controller. 75 76 Returns: 77 list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]). 78 """ 79 return await self._nova_api.activate_all_motion_groups( 80 cell=self.configuration.cell_id, controller=self.configuration.controller_id 81 )
Activates and retrieves the list of motion group IDs available on this controller.
The system automatically activates all motion groups on the associated controller.
Returns:
list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
83 async def activated_motion_groups(self) -> list[MotionGroup]: 84 """Retrieves a list of `MotionGroup` instances for all activated motion groups. 85 86 Returns: 87 list[MotionGroup]: All activated motion groups as `MotionGroup` objects. 88 """ 89 motion_group_ids = await self.activated_motion_group_ids() 90 return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]
Retrieves a list of MotionGroup
instances for all activated motion groups.
Returns:
list[MotionGroup]: All activated motion groups as
MotionGroup
objects.
107 async def read(self, key: str) -> ValueType: 108 """Reads an IO value from the controller. 109 110 Args: 111 key (str): The IO key name. 112 113 Returns: 114 ValueType: The value associated with the specified IO key. 115 """ 116 return await self._io_access.read(key)
Reads an IO value from the controller.
Arguments:
- key (str): The IO key name.
Returns:
ValueType: The value associated with the specified IO key.
118 async def write(self, key: str, value: ValueType) -> None: 119 """Writes an IO value to the controller. 120 121 Args: 122 key (str): The IO key name. 123 value (ValueType): The value to write to the specified IO key. 124 """ 125 return await self._io_access.write(key, value)
Writes an IO value to the controller.
Arguments:
- key (str): The IO key name.
- value (ValueType): The value to write to the specified IO key.
127 async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]: 128 async for state in self._nova_api.stream_robot_controller_state( 129 cell=self.configuration.cell_id, 130 controller_id=self.configuration.controller_id, 131 response_rate=rate_msecs, 132 ): 133 yield state
113def speed_up( 114 context: MovementControllerContext, on_movement: Callable[[MotionState | None], None] 115) -> MovementControllerFunction: 116 async def movement_controller( 117 response_stream: ExecuteTrajectoryResponseStream, 118 ) -> ExecuteTrajectoryRequestStream: 119 # The first request is to initialize the movement 120 yield wb.models.InitializeMovementRequest(trajectory=context.motion_id, initial_location=0) # type: ignore 121 122 # then we get the response 123 initialize_movement_response = await anext(response_stream) 124 if isinstance( 125 initialize_movement_response.actual_instance, wb.models.InitializeMovementResponse 126 ): 127 r1 = initialize_movement_response.actual_instance 128 if not r1.init_response.succeeded: 129 raise InitMovementFailed(r1.init_response) 130 131 # The second request is to start the movement 132 set_io_list = context.combined_actions.to_set_io() 133 yield wb.models.StartMovementRequest( 134 set_ios=set_io_list, start_on_io=None, pause_on_io=None 135 ) # type: ignore 136 137 counter = 0 138 latest_speed = 10 139 # then we wait until the movement is finished 140 async for execute_trajectory_response in response_stream: 141 counter += 1 142 instance = execute_trajectory_response.actual_instance 143 # Send the current location to the consume 144 if isinstance(instance, wb.models.Movement): 145 motion_state = movement_to_motion_state(instance) 146 if motion_state: 147 on_movement(motion_state) 148 149 # Terminate the generator 150 if isinstance(instance, wb.models.Standstill): 151 if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED: 152 on_movement(None) 153 return 154 155 if isinstance(instance, wb.models.PlaybackSpeedResponse): 156 playback_speed = instance.playback_speed_response 157 logger.info(f"Current playback speed: {playback_speed}") 158 159 if counter % 10 == 0: 160 yield wb.models.ExecuteTrajectoryRequest( 161 wb.models.PlaybackSpeedRequest(playback_speed_in_percent=latest_speed) 162 ) 163 counter = 0 164 latest_speed += 5 165 if latest_speed > 100: 166 latest_speed = 100 167 168 return movement_controller
The type of the None singleton.
6class MotionSettings(pydantic.BaseModel): 7 """ 8 Settings for an action. This is closely related to the `MotionCommand` in the API. 9 See planTrajectory.motion_commands for more information. 10 11 Motion settings are immutable; if you need to change a setting, create a copy and update the new object. 12 13 Attributes: 14 min_blending_velocity: 15 A minimum velocity for blending, in percent. Cannot be used if `blending` is set. 16 17 position_zone_radius: 18 Defines the position zone radius. 19 20 joint_velocity_limits: 21 Maximum joint velocities 22 23 joint_acceleration_limits: 24 Maximum joint accelerations 25 26 tcp_velocity_limit: 27 Maximum TCP velocity 28 29 tcp_acceleration_limit: 30 Maximum TCP acceleration 31 32 tcp_orientation_velocity_limit: 33 Maximum TCP orientation velocity 34 35 tcp_orientation_acceleration_limit: 36 Maximum TCP orientation acceleration 37 """ 38 39 min_blending_velocity: int | None = pydantic.Field(default=None) 40 position_zone_radius: float | None = pydantic.Field(default=None) 41 joint_velocity_limits: tuple[float, ...] | None = pydantic.Field(default=None) 42 joint_acceleration_limits: tuple[float, ...] | None = pydantic.Field(default=None) 43 tcp_velocity_limit: float | None = pydantic.Field(default=50) 44 tcp_acceleration_limit: float | None = pydantic.Field(default=None) 45 tcp_orientation_velocity_limit: float | None = pydantic.Field(default=None) 46 tcp_orientation_acceleration_limit: float | None = pydantic.Field(default=None) 47 48 class Config: 49 frozen = True 50 51 @classmethod 52 def field_to_varname(cls, field): 53 return f"__ms_{field}" 54 55 @pydantic.model_validator(mode="after") 56 def validate_blending_settings(self) -> "MotionSettings": 57 if self.min_blending_velocity and self.position_zone_radius: 58 raise ValueError("Can't set both min_blending_velocity and blending") 59 return self 60 61 def has_blending_settings(self) -> bool: 62 return any([self.min_blending_velocity, self.position_zone_radius]) 63 64 def has_limits_override(self) -> bool: 65 return any( 66 [ 67 self.tcp_velocity_limit, 68 self.tcp_acceleration_limit, 69 self.tcp_orientation_velocity_limit, 70 self.tcp_orientation_acceleration_limit, 71 self.joint_velocity_limits, 72 self.joint_acceleration_limits, 73 ] 74 ) 75 76 def as_limits_settings(self) -> wb.models.LimitsOverride: 77 return wb.models.LimitsOverride( 78 joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits) # type: ignore 79 if self.joint_velocity_limits 80 else None, 81 joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits) # type: ignore 82 if self.joint_acceleration_limits 83 else None, 84 tcp_velocity_limit=self.tcp_velocity_limit, 85 tcp_acceleration_limit=self.tcp_acceleration_limit, 86 tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit, 87 tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit, 88 ) 89 90 def as_blending_setting(self) -> wb.models.MotionCommandBlending: 91 if self.position_zone_radius: 92 return wb.models.MotionCommandBlending( 93 wb.models.BlendingPosition( 94 position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition" 95 ) 96 ) 97 return wb.models.MotionCommandBlending( 98 wb.models.BlendingAuto( 99 min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto" 100 ) 101 )
Settings for an action. This is closely related to the MotionCommand
in the API.
See planTrajectory.motion_commands for more information.
Motion settings are immutable; if you need to change a setting, create a copy and update the new object.
Attributes:
- min_blending_velocity: A minimum velocity for blending, in percent. Cannot be used if
blending
is set. - position_zone_radius: Defines the position zone radius.
- joint_velocity_limits: Maximum joint velocities
- joint_acceleration_limits: Maximum joint accelerations
- tcp_velocity_limit: Maximum TCP velocity
- tcp_acceleration_limit: Maximum TCP acceleration
- tcp_orientation_velocity_limit: Maximum TCP orientation velocity
- tcp_orientation_acceleration_limit: Maximum TCP orientation acceleration
76 def as_limits_settings(self) -> wb.models.LimitsOverride: 77 return wb.models.LimitsOverride( 78 joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits) # type: ignore 79 if self.joint_velocity_limits 80 else None, 81 joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits) # type: ignore 82 if self.joint_acceleration_limits 83 else None, 84 tcp_velocity_limit=self.tcp_velocity_limit, 85 tcp_acceleration_limit=self.tcp_acceleration_limit, 86 tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit, 87 tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit, 88 )
90 def as_blending_setting(self) -> wb.models.MotionCommandBlending: 91 if self.position_zone_radius: 92 return wb.models.MotionCommandBlending( 93 wb.models.BlendingPosition( 94 position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition" 95 ) 96 ) 97 return wb.models.MotionCommandBlending( 98 wb.models.BlendingAuto( 99 min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto" 100 ) 101 )