nova
1from nova.cell.cell import Cell 2from nova.core.controller import Controller 3from nova.core.logging import logger 4from nova.core.motion_group import MotionGroup, combine_trajectories 5from nova.core.nova import Nova 6from nova.runtime.function import wrap as program 7from nova.version import version 8 9__version__ = version 10 11__all__ = [ 12 "Nova", 13 "Cell", 14 "MotionGroup", 15 "combine_trajectories", 16 "Controller", 17 "api", 18 "types", 19 "actions", 20 "logger", 21 "program", 22 "__version__", 23]
15class Nova: 16 """A high-level Nova client for interacting with robot cells and controllers.""" 17 18 _api_client: ApiGateway 19 20 def __init__( 21 self, 22 *, 23 host: str | None = None, 24 access_token: str | None = None, 25 username: str | None = None, 26 password: str | None = None, 27 version: str = "v1", 28 verify_ssl: bool = True, 29 ): 30 """ 31 Initialize the Nova client. 32 33 Args: 34 host (str | None): The Nova API host. 35 access_token (str | None): An access token for the Nova API. 36 username (str | None): Username to authenticate with the Nova API. 37 password (str | None): Password to authenticate with the Nova API. 38 version (str): The API version to use (default: "v1"). 39 verify_ssl (bool): Whether or not to verify SSL certificates (default: True). 40 """ 41 42 self._api_client = ApiGateway( 43 host=host, 44 access_token=access_token, 45 username=username, 46 password=password, 47 version=version, 48 verify_ssl=verify_ssl, 49 ) 50 51 def cell(self, cell_id: str = CELL_NAME) -> Cell: 52 """Returns the cell object with the given ID.""" 53 return Cell(self._api_client, cell_id) 54 55 async def close(self): 56 """Closes the underlying API client session.""" 57 # hardcoded for now, later stuff like NATS might become devices 58 await nats.close() 59 return await self._api_client.close() 60 61 async def __aenter__(self): 62 return self 63 64 async def __aexit__(self, exc_type, exc_val, exc_tb): 65 await self.close()
A high-level Nova client for interacting with robot cells and controllers.
20 def __init__( 21 self, 22 *, 23 host: str | None = None, 24 access_token: str | None = None, 25 username: str | None = None, 26 password: str | None = None, 27 version: str = "v1", 28 verify_ssl: bool = True, 29 ): 30 """ 31 Initialize the Nova client. 32 33 Args: 34 host (str | None): The Nova API host. 35 access_token (str | None): An access token for the Nova API. 36 username (str | None): Username to authenticate with the Nova API. 37 password (str | None): Password to authenticate with the Nova API. 38 version (str): The API version to use (default: "v1"). 39 verify_ssl (bool): Whether or not to verify SSL certificates (default: True). 40 """ 41 42 self._api_client = ApiGateway( 43 host=host, 44 access_token=access_token, 45 username=username, 46 password=password, 47 version=version, 48 verify_ssl=verify_ssl, 49 )
Initialize the Nova client.
Arguments:
- host (str | None): The Nova API host.
- access_token (str | None): An access token for the Nova API.
- username (str | None): Username to authenticate with the Nova API.
- password (str | None): Password to authenticate with the Nova API.
- version (str): The API version to use (default: "v1").
- verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
16class Cell: 17 """A representation of a robot cell, providing high-level operations on controllers.""" 18 19 def __init__(self, api_gateway: ApiGateway, cell_id: str): 20 """ 21 Initializes a Cell instance. 22 Args: 23 api_gateway (ApiGateway): The underlying gateway for making API calls. 24 cell_id (str): The unique identifier for the cell. 25 """ 26 self._api_gateway = api_gateway 27 self._cell_id = cell_id 28 29 @property 30 def cell_id(self) -> str: 31 """ 32 Returns unique identifier for this cell. 33 Returns: 34 str: The cell ID. 35 """ 36 return self._cell_id 37 38 def _create_controller(self, controller_id: str) -> Controller: 39 return Controller( 40 configuration=Controller.Configuration( 41 nova_api=self._api_gateway.host, 42 nova_access_token=self._api_gateway.access_token, 43 nova_username=self._api_gateway.username, 44 nova_password=self._api_gateway.password, 45 cell_id=self._cell_id, 46 controller_id=controller_id, 47 id=controller_id, 48 ) 49 ) 50 51 async def add_virtual_robot_controller( 52 self, 53 name: str, 54 controller_type: api.models.VirtualControllerTypes, 55 controller_manufacturer: api.models.Manufacturer, 56 timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 57 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 58 position: str | None = None, 59 ) -> Controller: 60 return await self.add_controller( 61 robot_controller=virtual_controller( 62 name=name, 63 type=controller_type, 64 manufacturer=controller_manufacturer, 65 position=position, 66 ), 67 add_timeout=timeout, 68 wait_for_ready_timeout=wait_for_ready_timeout, 69 ) 70 71 async def ensure_virtual_robot_controller( 72 self, 73 name: str, 74 controller_type: api.models.VirtualControllerTypes, 75 controller_manufacturer: api.models.Manufacturer, 76 timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 77 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 78 ) -> Controller: 79 return await self.ensure_controller( 80 robot_controller=virtual_controller( 81 name=name, type=controller_type, manufacturer=controller_manufacturer 82 ), 83 add_timeout=timeout, 84 wait_for_ready_timeout=wait_for_ready_timeout, 85 ) 86 87 async def add_controller( 88 self, 89 robot_controller: api.models.RobotController, 90 add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 91 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 92 ) -> Controller: 93 """ 94 Add a robot controller to the cell and wait for it to get ready. 95 Args: 96 robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, 97 see :func:`nova.cell.abb_controller`, :func:`nova.cell.fanuc_controller`, :func:`nova.cell.kuka_controller`, 98 :func:`nova.cell.universal_robots_controller`, :func:`nova.cell.virtual_controller`, :func:`nova.cell.yaskawa_controller`. 99 add_timeout (int): The time to wait for the controller to be added (default: 25). 100 wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25). 101 102 Returns: 103 Controller: The added Controller object. 104 """ 105 await self._api_gateway.add_robot_controller( 106 cell=self._cell_id, robot_controller=robot_controller, timeout=add_timeout 107 ) 108 109 await self._api_gateway.wait_for_controller_ready( 110 cell=self._cell_id, name=robot_controller.name, timeout=wait_for_ready_timeout 111 ) 112 113 return self._create_controller(robot_controller.name) 114 115 async def ensure_controller( 116 self, 117 robot_controller: api.models.RobotController, 118 add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 119 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 120 ) -> Controller: 121 """ 122 Ensure that a robot controller is added to the cell. If it already exists, it will be returned. 123 If it doesn't exist, it will be added and waited for to be ready. 124 Args: 125 robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, 126 see :func:`nova.abb_controller`, :func:`nova.fanuc_controller`, :func:`nova.kuka_controller`, 127 :func:`nova.universal_robots_controller`, :func:`nova.virtual_controller`, :func:`nova.yaskawa_controller`. 128 add_timeout (int): The time to wait for the controller to be added (default: 25). 129 wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25). 130 131 Returns: 132 Controller: The added Controller object. 133 """ 134 controller = await self._api_gateway.get_controller_instance( 135 cell=self.cell_id, name=robot_controller.name 136 ) 137 138 if controller: 139 return self._create_controller(controller.controller) 140 return await self.add_controller( 141 robot_controller, add_timeout=add_timeout, wait_for_ready_timeout=wait_for_ready_timeout 142 ) 143 144 async def controllers(self) -> list[Controller]: 145 """ 146 List all controllers for this cell. 147 Returns: 148 list[Controller]: A list of Controller objects associated with this cell. 149 """ 150 instances = await self._api_gateway.list_controllers(cell=self._cell_id) 151 return [self._create_controller(ci.controller) for ci in instances] 152 153 async def controller(self, name: str) -> Controller: 154 """ 155 Retrieve a specific controller by name. 156 Args: 157 name (str): The name of the controller. 158 Returns: 159 Controller: The Controller object. 160 Raises: 161 ControllerNotFound: If no controller with the specified name exists. 162 """ 163 controller_instance = await self._api_gateway.get_controller_instance( 164 cell=self._cell_id, name=name 165 ) 166 if not controller_instance: 167 raise ControllerNotFound(controller=name) 168 return self._create_controller(controller_instance.controller) 169 170 async def delete_robot_controller(self, name: str, timeout: int = 25): 171 """ 172 Delete a robot controller from the cell. 173 Args: 174 name (str): The name of the controller to delete. 175 timeout (int): The time to wait for the controller deletion to complete (default: 25). 176 """ 177 await self._api_gateway.delete_robot_controller( 178 cell=self._cell_id, controller=name, completion_timeout=timeout 179 ) 180 181 async def get_robot_cell(self) -> RobotCell: 182 """ 183 Return a RobotCell object containing all known controllers. 184 Returns: 185 RobotCell: A RobotCell initialized with the available controllers. 186 """ 187 controllers = await self.controllers() 188 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
A representation of a robot cell, providing high-level operations on controllers.
19 def __init__(self, api_gateway: ApiGateway, cell_id: str): 20 """ 21 Initializes a Cell instance. 22 Args: 23 api_gateway (ApiGateway): The underlying gateway for making API calls. 24 cell_id (str): The unique identifier for the cell. 25 """ 26 self._api_gateway = api_gateway 27 self._cell_id = cell_id
Initializes a Cell instance.
Arguments:
- api_gateway (ApiGateway): The underlying gateway for making API calls.
- cell_id (str): The unique identifier for the cell.
29 @property 30 def cell_id(self) -> str: 31 """ 32 Returns unique identifier for this cell. 33 Returns: 34 str: The cell ID. 35 """ 36 return self._cell_id
Returns unique identifier for this cell.
Returns:
str: The cell ID.
51 async def add_virtual_robot_controller( 52 self, 53 name: str, 54 controller_type: api.models.VirtualControllerTypes, 55 controller_manufacturer: api.models.Manufacturer, 56 timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 57 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 58 position: str | None = None, 59 ) -> Controller: 60 return await self.add_controller( 61 robot_controller=virtual_controller( 62 name=name, 63 type=controller_type, 64 manufacturer=controller_manufacturer, 65 position=position, 66 ), 67 add_timeout=timeout, 68 wait_for_ready_timeout=wait_for_ready_timeout, 69 )
71 async def ensure_virtual_robot_controller( 72 self, 73 name: str, 74 controller_type: api.models.VirtualControllerTypes, 75 controller_manufacturer: api.models.Manufacturer, 76 timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 77 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 78 ) -> Controller: 79 return await self.ensure_controller( 80 robot_controller=virtual_controller( 81 name=name, type=controller_type, manufacturer=controller_manufacturer 82 ), 83 add_timeout=timeout, 84 wait_for_ready_timeout=wait_for_ready_timeout, 85 )
87 async def add_controller( 88 self, 89 robot_controller: api.models.RobotController, 90 add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 91 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 92 ) -> Controller: 93 """ 94 Add a robot controller to the cell and wait for it to get ready. 95 Args: 96 robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, 97 see :func:`nova.cell.abb_controller`, :func:`nova.cell.fanuc_controller`, :func:`nova.cell.kuka_controller`, 98 :func:`nova.cell.universal_robots_controller`, :func:`nova.cell.virtual_controller`, :func:`nova.cell.yaskawa_controller`. 99 add_timeout (int): The time to wait for the controller to be added (default: 25). 100 wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25). 101 102 Returns: 103 Controller: The added Controller object. 104 """ 105 await self._api_gateway.add_robot_controller( 106 cell=self._cell_id, robot_controller=robot_controller, timeout=add_timeout 107 ) 108 109 await self._api_gateway.wait_for_controller_ready( 110 cell=self._cell_id, name=robot_controller.name, timeout=wait_for_ready_timeout 111 ) 112 113 return self._create_controller(robot_controller.name)
Add a robot controller to the cell and wait for it to get ready.
Arguments:
- robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
see
nova.cell.abb_controller()
,nova.cell.fanuc_controller()
,nova.cell.kuka_controller()
,nova.cell.universal_robots_controller()
,nova.cell.virtual_controller()
,nova.cell.yaskawa_controller()
. - add_timeout (int): The time to wait for the controller to be added (default: 25).
- wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
Returns:
Controller: The added Controller object.
115 async def ensure_controller( 116 self, 117 robot_controller: api.models.RobotController, 118 add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT, 119 wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT, 120 ) -> Controller: 121 """ 122 Ensure that a robot controller is added to the cell. If it already exists, it will be returned. 123 If it doesn't exist, it will be added and waited for to be ready. 124 Args: 125 robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, 126 see :func:`nova.abb_controller`, :func:`nova.fanuc_controller`, :func:`nova.kuka_controller`, 127 :func:`nova.universal_robots_controller`, :func:`nova.virtual_controller`, :func:`nova.yaskawa_controller`. 128 add_timeout (int): The time to wait for the controller to be added (default: 25). 129 wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25). 130 131 Returns: 132 Controller: The added Controller object. 133 """ 134 controller = await self._api_gateway.get_controller_instance( 135 cell=self.cell_id, name=robot_controller.name 136 ) 137 138 if controller: 139 return self._create_controller(controller.controller) 140 return await self.add_controller( 141 robot_controller, add_timeout=add_timeout, wait_for_ready_timeout=wait_for_ready_timeout 142 )
Ensure that a robot controller is added to the cell. If it already exists, it will be returned. If it doesn't exist, it will be added and waited for to be ready.
Arguments:
- robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
see
nova.abb_controller()
,nova.fanuc_controller()
,nova.kuka_controller()
,nova.universal_robots_controller()
,nova.virtual_controller()
,nova.yaskawa_controller()
. - add_timeout (int): The time to wait for the controller to be added (default: 25).
- wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
Returns:
Controller: The added Controller object.
144 async def controllers(self) -> list[Controller]: 145 """ 146 List all controllers for this cell. 147 Returns: 148 list[Controller]: A list of Controller objects associated with this cell. 149 """ 150 instances = await self._api_gateway.list_controllers(cell=self._cell_id) 151 return [self._create_controller(ci.controller) for ci in instances]
List all controllers for this cell.
Returns:
list[Controller]: A list of Controller objects associated with this cell.
153 async def controller(self, name: str) -> Controller: 154 """ 155 Retrieve a specific controller by name. 156 Args: 157 name (str): The name of the controller. 158 Returns: 159 Controller: The Controller object. 160 Raises: 161 ControllerNotFound: If no controller with the specified name exists. 162 """ 163 controller_instance = await self._api_gateway.get_controller_instance( 164 cell=self._cell_id, name=name 165 ) 166 if not controller_instance: 167 raise ControllerNotFound(controller=name) 168 return self._create_controller(controller_instance.controller)
Retrieve a specific controller by name.
Arguments:
- name (str): The name of the controller.
Returns:
Controller: The Controller object.
Raises:
- ControllerNotFound: If no controller with the specified name exists.
170 async def delete_robot_controller(self, name: str, timeout: int = 25): 171 """ 172 Delete a robot controller from the cell. 173 Args: 174 name (str): The name of the controller to delete. 175 timeout (int): The time to wait for the controller deletion to complete (default: 25). 176 """ 177 await self._api_gateway.delete_robot_controller( 178 cell=self._cell_id, controller=name, completion_timeout=timeout 179 )
Delete a robot controller from the cell.
Arguments:
- name (str): The name of the controller to delete.
- timeout (int): The time to wait for the controller deletion to complete (default: 25).
181 async def get_robot_cell(self) -> RobotCell: 182 """ 183 Return a RobotCell object containing all known controllers. 184 Returns: 185 RobotCell: A RobotCell initialized with the available controllers. 186 """ 187 controllers = await self.controllers() 188 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
Return a RobotCell object containing all known controllers.
Returns:
RobotCell: A RobotCell initialized with the available controllers.
127class MotionGroup(AbstractRobot): 128 """Manages motion planning and execution within a specified motion group.""" 129 130 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 131 """ 132 Initializes a new MotionGroup instance. 133 134 Args: 135 api_gateway (ApiGateway): The API gateway through which motion commands are sent. 136 cell (str): The name or identifier of the robotic cell. 137 motion_group_id (str): The identifier of the motion group. 138 """ 139 self._api_gateway = api_gateway 140 self._cell = cell 141 self._motion_group_id = motion_group_id 142 self._current_motion: str | None = None 143 self._optimizer_setup: wb.models.OptimizerSetup | None = None 144 super().__init__(id=motion_group_id) 145 146 async def open(self): 147 await self._api_gateway.activate_motion_group( 148 cell=self._cell, motion_group_id=self._motion_group_id 149 ) 150 return self 151 152 async def close(self): 153 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 154 # this behaviour is not desired in some cases, 155 # so for now we will not deactivate for the user 156 pass 157 158 @property 159 def motion_group_id(self) -> str: 160 """ 161 Returns: 162 str: The unique identifier for this motion group. 163 """ 164 return self._motion_group_id 165 166 @property 167 def current_motion(self) -> str | None: 168 # if not self._current_motion: 169 # raise ValueError("No MotionId attached. There is no planned motion available.") 170 return self._current_motion 171 172 async def _plan_with_collision_check( 173 self, 174 actions: list[Action], 175 tcp: str, 176 start_joint_position: tuple[float, ...] | None = None, 177 optimizer_setup: wb.models.OptimizerSetup | None = None, 178 ) -> wb.models.JointTrajectory: 179 """ 180 This method plans a trajectory and checks for collisions. 181 The collision check only happens if the actions have collision scene data. 182 183 You must provide the exact same collision data into all the actions. 184 Because the underlying API supports collision checks for the whole trajectory only. 185 186 Raises: 187 InconsistentCollisionScenes: If the collision scene data is not consistent across all actions 188 189 Your actions should follow below rules to be considered consistent: 190 1- They all should have the same collision scene data 191 2- They all should have no collision data 192 193 PlanTrajectoryFailed: If the trajectory planning failed including the collision check 194 195 For more information about this API, please refer to the plan_trajectory in the API documentation. 196 197 Args: 198 actions: list of actions to plan, current supported actions are Motion and WriteActions 199 WriteAction you specify on your path is handled in a performant way. 200 Please check execute_trajectory.motion_command.set_io for more information. 201 tcp: The tool to use 202 start_joint_position: The starting joint position, if none provided, current position of the robot is used 203 optimizer_setup: The optimizer setup 204 205 Returns: planned joint trajectory 206 207 """ 208 # PREPARE THE REQUEST 209 collision_scenes = validate_collision_scenes(actions) 210 start_joint_position = start_joint_position or await self.joints() 211 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 212 213 motion_commands = CombinedActions(items=tuple(actions)).to_motion_command() # type: ignore 214 215 static_colliders = None 216 collision_motion_group = None 217 if collision_scenes and len(collision_scenes) > 0: 218 static_colliders = collision_scenes[0].colliders 219 220 motion_group_type = robot_setup.motion_group_type 221 if ( 222 collision_scenes[0].motion_groups 223 and motion_group_type in collision_scenes[0].motion_groups 224 ): 225 collision_motion_group = collision_scenes[0].motion_groups[motion_group_type] 226 227 request = wb.models.PlanTrajectoryRequest( 228 robot_setup=robot_setup, 229 start_joint_position=list(start_joint_position), 230 motion_commands=motion_commands, 231 static_colliders=static_colliders, 232 collision_motion_group=collision_motion_group, 233 ) 234 235 return await self._api_gateway.plan_trajectory( 236 cell=self._cell, motion_group_id=self.motion_group_id, request=request 237 ) 238 239 # TODO: we get the optimizer setup from as an input because 240 # it has a velocity setting which is used in collision free movement, I need to double check this 241 async def _plan_collision_free( 242 self, 243 action: CollisionFreeMotion, 244 tcp: str, 245 start_joint_position: list[float], 246 optimizer_setup: wb.models.OptimizerSetup | None = None, 247 ) -> wb.models.JointTrajectory: 248 """ 249 This method plans a trajectory and avoids collisions. 250 This means if there is a collision along the way to the target pose or joint positions, 251 It will adjust the trajectory to avoid the collision. 252 253 The collision check only happens if the action have collision scene data. 254 255 For more information about this API, please refer to the plan_collision_free_ptp in the API documentation. 256 257 Args: 258 action: The target pose or joint positions to reach 259 tcp: The tool to use 260 start_joint_position: The starting joint position, if none provided, current position of the robot is used 261 optimizer_setup: The optimizer setup 262 263 Returns: planned joint trajectory 264 265 266 """ 267 target = wb.models.PlanCollisionFreePTPRequestTarget(**action.to_api_model().model_dump()) 268 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 269 270 static_colliders = None 271 collision_motion_group = None 272 collision_scene = action.collision_scene 273 if collision_scene and collision_scene.colliders: 274 static_colliders = collision_scene.colliders 275 276 if ( 277 collision_scene.motion_groups 278 and robot_setup.motion_group_type in collision_scene.motion_groups 279 ): 280 collision_motion_group = collision_scene.motion_groups[ 281 robot_setup.motion_group_type 282 ] 283 284 request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest( 285 robot_setup=robot_setup, 286 start_joint_position=start_joint_position, 287 target=target, 288 static_colliders=static_colliders, 289 collision_motion_group=collision_motion_group, 290 ) 291 292 return await self._api_gateway.plan_collision_free_ptp( 293 cell=self._cell, motion_group_id=self.motion_group_id, request=request 294 ) 295 296 async def _plan( 297 self, 298 actions: list[Action], 299 tcp: str, 300 start_joint_position: tuple[float, ...] | None = None, 301 optimizer_setup: wb.models.OptimizerSetup | None = None, 302 ) -> wb.models.JointTrajectory: 303 if not actions: 304 raise ValueError("No actions provided") 305 306 current_joints = start_joint_position or await self.joints() 307 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 308 309 all_trajectories = [] 310 for batch in split_actions_into_batches(actions): 311 if len(batch) == 0: 312 raise ValueError("Empty batch of actions") 313 314 if isinstance(batch[0], CollisionFreeMotion): 315 motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0]) 316 trajectory = await self._plan_collision_free( 317 action=motion, 318 tcp=tcp, 319 start_joint_position=list(current_joints), 320 optimizer_setup=robot_setup, 321 ) 322 all_trajectories.append(trajectory) 323 # the last joint position of this trajectory is the starting point for the next one 324 current_joints = tuple(trajectory.joint_positions[-1].joints) 325 elif isinstance(batch[0], WaitAction): 326 # Waits generate a trajectory with the same joint position at each timestep 327 # Use 50ms timesteps from 0 to wait_for_in_seconds 328 wait_time = batch[0].wait_for_in_seconds 329 timestep = 0.050 # 50ms timestep 330 num_steps = max(2, int(wait_time / timestep) + 1) # Ensure at least 2 points 331 332 # Create equal-length arrays for positions, times, and locations 333 joint_positions = [ 334 wb.models.Joints(joints=list(current_joints)) for _ in range(num_steps) 335 ] 336 times = [i * timestep for i in range(num_steps)] 337 # Ensure the last timestep is exactly the wait duration 338 times[-1] = wait_time 339 # Use the same location value for all points 340 locations = [0] * num_steps 341 342 trajectory = wb.models.JointTrajectory( 343 joint_positions=joint_positions, 344 times=times, 345 locations=[float(loc) for loc in locations], 346 ) 347 all_trajectories.append(trajectory) 348 # the last joint position of this trajectory is the starting point for the next one 349 current_joints = tuple(trajectory.joint_positions[-1].joints) 350 else: 351 trajectory = await self._plan_with_collision_check( 352 actions=batch, 353 tcp=tcp, 354 start_joint_position=current_joints, 355 optimizer_setup=robot_setup, 356 ) 357 all_trajectories.append(trajectory) 358 # the last joint position of this trajectory is the starting point for the next one 359 current_joints = tuple(trajectory.joint_positions[-1].joints) 360 361 return combine_trajectories(all_trajectories) 362 363 # TODO: refactor and simplify code, tests are already there 364 # TODO: split into batches when the collision scene changes in a batch of collision free motions 365 366 async def _execute( 367 self, 368 joint_trajectory: wb.models.JointTrajectory, 369 tcp: str, 370 actions: list[Action], 371 movement_controller: MovementController | None, 372 ) -> AsyncIterable[MovementResponse]: 373 if movement_controller is None: 374 movement_controller = move_forward 375 376 # Load planned trajectory 377 load_plan_response = await self._load_planned_motion(joint_trajectory, tcp) 378 379 # Move to start position 380 number_of_joints = await self._api_gateway.get_joint_number( 381 cell=self._cell, motion_group_id=self.motion_group_id 382 ) 383 joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints 384 movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response) 385 386 # If there's an initial consumer, feed it the data 387 async for move_to_response in movement_stream: 388 # TODO: refactor 389 if ( 390 move_to_response.state is None 391 or move_to_response.state.motion_groups is None 392 or len(move_to_response.state.motion_groups) == 0 393 or move_to_response.move_response is None 394 or move_to_response.move_response.current_location_on_trajectory is None 395 ): 396 continue 397 398 yield move_to_response 399 400 controller = movement_controller( 401 MovementControllerContext( 402 combined_actions=CombinedActions(items=tuple(actions)), # type: ignore 403 motion_id=load_plan_response.motion, 404 ) 405 ) 406 407 def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool: 408 instance = response.actual_instance 409 # Stop when standstill indicates motion ended 410 return ( 411 isinstance(instance, wb.models.Standstill) 412 and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED 413 ) 414 415 execute_response_streaming_controller = StreamExtractor(controller, stop_condition) 416 execution_task = asyncio.create_task( 417 self._api_gateway.motion_api.execute_trajectory( 418 cell=self._cell, client_request_generator=execute_response_streaming_controller 419 ) 420 ) 421 async for execute_response in execute_response_streaming_controller: 422 yield execute_response 423 await execution_task 424 425 async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup: 426 # TODO: mypy failed on main branch, need to check 427 if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp: # type: ignore 428 self._optimizer_setup = await self._api_gateway.get_optimizer_config( 429 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 430 ) 431 432 return self._optimizer_setup 433 434 async def _load_planned_motion( 435 self, joint_trajectory: wb.models.JointTrajectory, tcp: str 436 ) -> wb.models.PlanSuccessfulResponse: 437 return await self._api_gateway.load_planned_motion( 438 cell=self._cell, 439 motion_group_id=self.motion_group_id, 440 joint_trajectory=joint_trajectory, 441 tcp=tcp, 442 ) 443 444 async def move_to_start_position( 445 self, joint_velocities, load_plan_response: LoadPlanResponse 446 ) -> InitialMovementStream: 447 limit_override = wb.models.LimitsOverride() 448 if joint_velocities is not None: 449 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 450 451 return self._api_gateway.stream_move_to_trajectory_via_join_ptp( 452 cell=self._cell, 453 motion_id=load_plan_response.motion, 454 location_on_trajectory=0, 455 joint_velocity_limits=limit_override.joint_velocity_limits, 456 ) 457 458 async def stop(self): 459 logger.debug(f"Stopping motion of {self}...") 460 try: 461 if self._current_motion is None: 462 raise ValueError("No motion to stop") 463 await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion) 464 logger.debug(f"Motion {self.current_motion} stopped.") 465 except ValueError as e: 466 logger.debug(f"No motion to stop for {self}: {e}") 467 468 async def get_state(self, tcp: str | None = None) -> RobotState: 469 """ 470 Returns the motion group state. 471 Args: 472 tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None. 473 If None, the current active/selected TCP of the motion group is used. 474 """ 475 response = await self._api_gateway.get_motion_group_state( 476 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 477 ) 478 pose = Pose(response.tcp_pose or response.state.tcp_pose) 479 return RobotState(pose=pose, joints=tuple(response.state.joint_position.joints)) 480 481 async def joints(self) -> tuple: 482 """Returns the current joint positions of the motion group.""" 483 state = await self.get_state() 484 if state.joints is None: 485 raise ValueError( 486 f"No joint positions available for motion group {self._motion_group_id}" 487 ) 488 return state.joints 489 490 async def tcp_pose(self, tcp: str | None = None) -> Pose: 491 """ 492 Returns the current TCP pose of the motion group. 493 Args: 494 tcp (str | None): The reference TCP for the returned pose. Defaults to None. 495 If None, the current active/selected TCP of the motion group is used. 496 """ 497 state = await self.get_state(tcp=tcp) 498 return state.pose 499 500 async def tcps(self) -> list[wb.models.RobotTcp]: 501 response = await self._api_gateway.list_tcps( 502 cell=self._cell, motion_group_id=self.motion_group_id 503 ) 504 return response.tcps if response.tcps else [] 505 506 async def tcp_names(self) -> list[str]: 507 return [tcp.id for tcp in await self.tcps()] 508 509 async def active_tcp(self) -> wb.models.RobotTcp: 510 active_tcp = await self._api_gateway.get_active_tcp( 511 cell=self._cell, motion_group_id=self.motion_group_id 512 ) 513 return active_tcp 514 515 async def active_tcp_name(self) -> str: 516 active_tcp = await self.active_tcp() 517 return active_tcp.id 518 519 async def ensure_virtual_tcp(self, tcp: models.RobotTcp) -> models.RobotTcp: 520 """ 521 Ensure that a virtual TCP with the expected configuration exists on this motion group. 522 If it doesn't exist, it will be created. If it exists but has different configuration, 523 it will be updated by recreating it. 524 525 Args: 526 tcp (models.RobotTcp): The expected TCP configuration 527 528 Returns: 529 models.RobotTcp: The TCP configuration 530 """ 531 existing_tcps = await self.tcps() 532 533 existing_tcp = None 534 for existing in existing_tcps: 535 if existing.id == tcp.id: 536 existing_tcp = existing 537 break 538 539 if existing_tcp: 540 if existing_tcp == tcp: 541 return existing_tcp 542 543 controller_name = self._motion_group_id.split("@")[1] 544 motion_group_index = int(self._motion_group_id.split("@")[0]) 545 546 await self._api_gateway.virtual_robot_setup_api.add_virtual_robot_tcp( 547 cell=self._cell, controller=controller_name, id=motion_group_index, robot_tcp=tcp 548 ) 549 550 return tcp
Manages motion planning and execution within a specified motion group.
130 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 131 """ 132 Initializes a new MotionGroup instance. 133 134 Args: 135 api_gateway (ApiGateway): The API gateway through which motion commands are sent. 136 cell (str): The name or identifier of the robotic cell. 137 motion_group_id (str): The identifier of the motion group. 138 """ 139 self._api_gateway = api_gateway 140 self._cell = cell 141 self._motion_group_id = motion_group_id 142 self._current_motion: str | None = None 143 self._optimizer_setup: wb.models.OptimizerSetup | None = None 144 super().__init__(id=motion_group_id)
Initializes a new MotionGroup instance.
Arguments:
- api_gateway (ApiGateway): The API gateway through which motion commands are sent.
- cell (str): The name or identifier of the robotic cell.
- motion_group_id (str): The identifier of the motion group.
146 async def open(self): 147 await self._api_gateway.activate_motion_group( 148 cell=self._cell, motion_group_id=self._motion_group_id 149 ) 150 return self
Allocates the external hardware resource (i.e. establish a connection)
152 async def close(self): 153 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 154 # this behaviour is not desired in some cases, 155 # so for now we will not deactivate for the user 156 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
158 @property 159 def motion_group_id(self) -> str: 160 """ 161 Returns: 162 str: The unique identifier for this motion group. 163 """ 164 return self._motion_group_id
Returns: str: The unique identifier for this motion group.
444 async def move_to_start_position( 445 self, joint_velocities, load_plan_response: LoadPlanResponse 446 ) -> InitialMovementStream: 447 limit_override = wb.models.LimitsOverride() 448 if joint_velocities is not None: 449 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 450 451 return self._api_gateway.stream_move_to_trajectory_via_join_ptp( 452 cell=self._cell, 453 motion_id=load_plan_response.motion, 454 location_on_trajectory=0, 455 joint_velocity_limits=limit_override.joint_velocity_limits, 456 )
458 async def stop(self): 459 logger.debug(f"Stopping motion of {self}...") 460 try: 461 if self._current_motion is None: 462 raise ValueError("No motion to stop") 463 await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion) 464 logger.debug(f"Motion {self.current_motion} stopped.") 465 except ValueError as e: 466 logger.debug(f"No motion to stop for {self}: {e}")
Stop behaviour of the robot
468 async def get_state(self, tcp: str | None = None) -> RobotState: 469 """ 470 Returns the motion group state. 471 Args: 472 tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None. 473 If None, the current active/selected TCP of the motion group is used. 474 """ 475 response = await self._api_gateway.get_motion_group_state( 476 cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp 477 ) 478 pose = Pose(response.tcp_pose or response.state.tcp_pose) 479 return RobotState(pose=pose, joints=tuple(response.state.joint_position.joints))
Returns the motion group state.
Arguments:
- tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None. If None, the current active/selected TCP of the motion group is used.
481 async def joints(self) -> tuple: 482 """Returns the current joint positions of the motion group.""" 483 state = await self.get_state() 484 if state.joints is None: 485 raise ValueError( 486 f"No joint positions available for motion group {self._motion_group_id}" 487 ) 488 return state.joints
Returns the current joint positions of the motion group.
490 async def tcp_pose(self, tcp: str | None = None) -> Pose: 491 """ 492 Returns the current TCP pose of the motion group. 493 Args: 494 tcp (str | None): The reference TCP for the returned pose. Defaults to None. 495 If None, the current active/selected TCP of the motion group is used. 496 """ 497 state = await self.get_state(tcp=tcp) 498 return state.pose
Returns the current TCP pose of the motion group.
Arguments:
- tcp (str | None): The reference TCP for the returned pose. Defaults to None. If None, the current active/selected TCP of the motion group is used.
500 async def tcps(self) -> list[wb.models.RobotTcp]: 501 response = await self._api_gateway.list_tcps( 502 cell=self._cell, motion_group_id=self.motion_group_id 503 ) 504 return response.tcps if response.tcps else []
Return all TCPs that are configured on the robot with corresponding offset from flange as pose
Returns: the TCPs of the robot
Return the name of all TCPs that are configured on the robot
Returns: a list of all TCPs
509 async def active_tcp(self) -> wb.models.RobotTcp: 510 active_tcp = await self._api_gateway.get_active_tcp( 511 cell=self._cell, motion_group_id=self.motion_group_id 512 ) 513 return active_tcp
Return the active TCP of the robot
Returns: the active TCP
515 async def active_tcp_name(self) -> str: 516 active_tcp = await self.active_tcp() 517 return active_tcp.id
Return the name of the active TCP of the robot
Returns: the name of the active TCP
519 async def ensure_virtual_tcp(self, tcp: models.RobotTcp) -> models.RobotTcp: 520 """ 521 Ensure that a virtual TCP with the expected configuration exists on this motion group. 522 If it doesn't exist, it will be created. If it exists but has different configuration, 523 it will be updated by recreating it. 524 525 Args: 526 tcp (models.RobotTcp): The expected TCP configuration 527 528 Returns: 529 models.RobotTcp: The TCP configuration 530 """ 531 existing_tcps = await self.tcps() 532 533 existing_tcp = None 534 for existing in existing_tcps: 535 if existing.id == tcp.id: 536 existing_tcp = existing 537 break 538 539 if existing_tcp: 540 if existing_tcp == tcp: 541 return existing_tcp 542 543 controller_name = self._motion_group_id.split("@")[1] 544 motion_group_index = int(self._motion_group_id.split("@")[0]) 545 546 await self._api_gateway.virtual_robot_setup_api.add_virtual_robot_tcp( 547 cell=self._cell, controller=controller_name, id=motion_group_index, robot_tcp=tcp 548 ) 549 550 return tcp
Ensure that a virtual TCP with the expected configuration exists on this motion group. If it doesn't exist, it will be created. If it exists but has different configuration, it will be updated by recreating it.
Arguments:
- tcp (models.RobotTcp): The expected TCP configuration
Returns:
models.RobotTcp: The TCP configuration
58def combine_trajectories( 59 trajectories: list[wb.models.JointTrajectory], 60) -> wb.models.JointTrajectory: 61 """ 62 Combines multiple trajectories into one trajectory. 63 """ 64 final_trajectory = trajectories[0] 65 current_end_time = final_trajectory.times[-1] 66 current_end_location = final_trajectory.locations[-1] 67 68 for trajectory in trajectories[1:]: 69 # Shift times and locations to continue from last endpoint 70 shifted_times = [t + current_end_time for t in trajectory.times[1:]] # Skip first point 71 shifted_locations = [ 72 location + current_end_location for location in trajectory.locations[1:] 73 ] # Skip first point 74 75 final_trajectory.times.extend(shifted_times) 76 final_trajectory.joint_positions.extend(trajectory.joint_positions[1:]) 77 final_trajectory.locations.extend(shifted_locations) 78 79 current_end_time = final_trajectory.times[-1] 80 current_end_location = final_trajectory.locations[-1] 81 82 return final_trajectory
Combines multiple trajectories into one trajectory.
12class Controller(Sized, AbstractController, NovaDevice, IODevice): 13 """ 14 Represents a Nova controller, managing motion groups and IO interactions. 15 """ 16 17 class Configuration(NovaDevice.Configuration): 18 type: Literal["controller"] = "controller" 19 id: str = "controller" 20 cell_id: str 21 controller_id: str 22 23 def __init__(self, configuration: Configuration): 24 super().__init__(configuration) 25 self._activated_motion_group_ids: list[str] = [] 26 self._io_access = IOAccess( 27 api_gateway=self._nova_api, 28 cell=self.configuration.cell_id, 29 controller_id=self.configuration.controller_id, 30 ) 31 32 @property 33 def controller_id(self) -> str: 34 """Returns the unique controller ID.""" 35 return self.configuration.controller_id 36 37 async def open(self): 38 """Activates all motion groups.""" 39 motion_group_ids = await self.activated_motion_group_ids() 40 self._activated_motion_group_ids = motion_group_ids 41 logger.info(f"Found motion group {motion_group_ids}") 42 return self 43 44 async def close(self): 45 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 46 # this behaviour is not desired in some cases, 47 # so for now we will not deactivate for the user 48 pass 49 50 def __len__(self) -> int: 51 return len(self._activated_motion_group_ids) 52 53 def motion_group(self, motion_group_id: str) -> MotionGroup: 54 """Returns motion group with specific id. 55 56 Args: 57 motion_group_id (str): The ID of the motion group. 58 59 Returns: 60 MotionGroup: A MotionGroup instance corresponding to the given ID. 61 """ 62 return MotionGroup( 63 api_gateway=self._nova_api, 64 cell=self.configuration.cell_id, 65 motion_group_id=motion_group_id, 66 ) 67 68 def __getitem__(self, motion_group_id: int) -> MotionGroup: 69 return self.motion_group(f"{motion_group_id}@{self.configuration.controller_id}") 70 71 async def activated_motion_group_ids(self) -> list[str]: 72 """Activates and retrieves the list of motion group IDs available on this controller. 73 74 The system automatically activates all motion groups on the associated controller. 75 76 Returns: 77 list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]). 78 """ 79 return await self._nova_api.activate_all_motion_groups( 80 cell=self.configuration.cell_id, controller=self.configuration.controller_id 81 ) 82 83 async def activated_motion_groups(self) -> list[MotionGroup]: 84 """Retrieves a list of `MotionGroup` instances for all activated motion groups. 85 86 Returns: 87 list[MotionGroup]: All activated motion groups as `MotionGroup` objects. 88 """ 89 motion_group_ids = await self.activated_motion_group_ids() 90 return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids] 91 92 def get_robots(self) -> dict[str, AbstractRobot]: 93 """Retrieves a dictionary of motion group IDs to their corresponding robots. 94 95 Note: 96 This method interprets motion group IDs to create corresponding `MotionGroup` 97 objects and returns them as `AbstractRobot` references. 98 99 Returns: 100 dict[str, AbstractRobot]: A mapping of motion group ID to `MotionGroup` instance. 101 """ 102 return { 103 motion_group_id: self.motion_group(motion_group_id) 104 for motion_group_id in self._activated_motion_group_ids 105 } 106 107 async def read(self, key: str) -> ValueType: 108 """Reads an IO value from the controller. 109 110 Args: 111 key (str): The IO key name. 112 113 Returns: 114 ValueType: The value associated with the specified IO key. 115 """ 116 return await self._io_access.read(key) 117 118 async def write(self, key: str, value: ValueType) -> None: 119 """Writes an IO value to the controller. 120 121 Args: 122 key (str): The IO key name. 123 value (ValueType): The value to write to the specified IO key. 124 """ 125 return await self._io_access.write(key, value) 126 127 async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]: 128 async for state in self._nova_api.stream_robot_controller_state( 129 cell=self.configuration.cell_id, 130 controller_id=self.configuration.controller_id, 131 response_rate=rate_msecs, 132 ): 133 yield state
Represents a Nova controller, managing motion groups and IO interactions.
32 @property 33 def controller_id(self) -> str: 34 """Returns the unique controller ID.""" 35 return self.configuration.controller_id
Returns the unique controller ID.
37 async def open(self): 38 """Activates all motion groups.""" 39 motion_group_ids = await self.activated_motion_group_ids() 40 self._activated_motion_group_ids = motion_group_ids 41 logger.info(f"Found motion group {motion_group_ids}") 42 return self
Activates all motion groups.
44 async def close(self): 45 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 46 # this behaviour is not desired in some cases, 47 # so for now we will not deactivate for the user 48 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
53 def motion_group(self, motion_group_id: str) -> MotionGroup: 54 """Returns motion group with specific id. 55 56 Args: 57 motion_group_id (str): The ID of the motion group. 58 59 Returns: 60 MotionGroup: A MotionGroup instance corresponding to the given ID. 61 """ 62 return MotionGroup( 63 api_gateway=self._nova_api, 64 cell=self.configuration.cell_id, 65 motion_group_id=motion_group_id, 66 )
Returns motion group with specific id.
Arguments:
- motion_group_id (str): The ID of the motion group.
Returns:
MotionGroup: A MotionGroup instance corresponding to the given ID.
71 async def activated_motion_group_ids(self) -> list[str]: 72 """Activates and retrieves the list of motion group IDs available on this controller. 73 74 The system automatically activates all motion groups on the associated controller. 75 76 Returns: 77 list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]). 78 """ 79 return await self._nova_api.activate_all_motion_groups( 80 cell=self.configuration.cell_id, controller=self.configuration.controller_id 81 )
Activates and retrieves the list of motion group IDs available on this controller.
The system automatically activates all motion groups on the associated controller.
Returns:
list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
83 async def activated_motion_groups(self) -> list[MotionGroup]: 84 """Retrieves a list of `MotionGroup` instances for all activated motion groups. 85 86 Returns: 87 list[MotionGroup]: All activated motion groups as `MotionGroup` objects. 88 """ 89 motion_group_ids = await self.activated_motion_group_ids() 90 return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]
Retrieves a list of MotionGroup
instances for all activated motion groups.
Returns:
list[MotionGroup]: All activated motion groups as
MotionGroup
objects.
107 async def read(self, key: str) -> ValueType: 108 """Reads an IO value from the controller. 109 110 Args: 111 key (str): The IO key name. 112 113 Returns: 114 ValueType: The value associated with the specified IO key. 115 """ 116 return await self._io_access.read(key)
Reads an IO value from the controller.
Arguments:
- key (str): The IO key name.
Returns:
ValueType: The value associated with the specified IO key.
118 async def write(self, key: str, value: ValueType) -> None: 119 """Writes an IO value to the controller. 120 121 Args: 122 key (str): The IO key name. 123 value (ValueType): The value to write to the specified IO key. 124 """ 125 return await self._io_access.write(key, value)
Writes an IO value to the controller.
Arguments:
- key (str): The IO key name.
- value (ValueType): The value to write to the specified IO key.
127 async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]: 128 async for state in self._nova_api.stream_robot_controller_state( 129 cell=self.configuration.cell_id, 130 controller_id=self.configuration.controller_id, 131 response_rate=rate_msecs, 132 ): 133 yield state
203def wrap(function: Callable[Parameters, Return]) -> Function[Parameters, Return]: 204 return Function.validate(function)
The type of the None singleton.