nova
1from nova import actions, api, types 2from nova.core.controller import Controller 3from nova.core.logging import logger 4from nova.core.motion_group import MotionGroup 5from nova.core.movement_controller import speed_up as speed_up_movement_controller 6from nova.core.nova import Cell, Nova 7from nova.types import MotionSettings 8from nova.version import version 9 10__version__ = version 11 12__all__ = [ 13 "Nova", 14 "Cell", 15 "MotionGroup", 16 "Controller", 17 "speed_up_movement_controller", 18 "api", 19 "types", 20 "actions", 21 "MotionSettings", 22 "logger", 23 "__version__", 24]
29class Nova: 30 _api_client: ApiGateway 31 32 def __init__( 33 self, 34 *, 35 host: str | None = None, 36 access_token: str | None = None, 37 username: str | None = None, 38 password: str | None = None, 39 version: str = "v1", 40 verify_ssl: bool = True, 41 # TODO: deprecated 42 log_level: str = LOG_LEVEL, 43 ): 44 self._api_client = ApiGateway( 45 host=host, 46 access_token=access_token, 47 username=username, 48 password=password, 49 version=version, 50 verify_ssl=verify_ssl, 51 ) 52 53 def cell(self, cell_id: str = CELL_NAME) -> Cell: 54 return Cell(self._api_client, cell_id) 55 56 async def close(self): 57 return await self._api_client.close() 58 59 async def __aenter__(self): 60 return self 61 62 async def __aexit__(self, exc_type, exc_val, exc_tb): 63 await self.close()
32 def __init__( 33 self, 34 *, 35 host: str | None = None, 36 access_token: str | None = None, 37 username: str | None = None, 38 password: str | None = None, 39 version: str = "v1", 40 verify_ssl: bool = True, 41 # TODO: deprecated 42 log_level: str = LOG_LEVEL, 43 ): 44 self._api_client = ApiGateway( 45 host=host, 46 access_token=access_token, 47 username=username, 48 password=password, 49 version=version, 50 verify_ssl=verify_ssl, 51 )
66class Cell: 67 def __init__(self, api_gateway: ApiGateway, cell_id: str): 68 self._api_gateway = api_gateway 69 self._cell_id = cell_id 70 71 @property 72 def cell_id(self) -> str: 73 return self._cell_id 74 75 async def _get_controller_instances(self) -> list[api.models.ControllerInstance]: 76 response = await self._api_gateway.controller_api.list_controllers(cell=self._cell_id) 77 return response.instances 78 79 def _create_controller(self, controller_id: str) -> Controller: 80 return Controller( 81 configuration=Controller.Configuration( 82 nova_api=self._api_gateway.host, 83 nova_access_token=self._api_gateway.access_token, 84 nova_username=self._api_gateway.username, 85 nova_password=self._api_gateway.password, 86 cell_id=self._cell_id, 87 controller_id=controller_id, 88 id=controller_id, 89 ) 90 ) 91 92 async def _get_controller_instance(self, name: str) -> api.models.ControllerInstance | None: 93 """Get the controller instance 94 95 Args: 96 name (str): The name of the controller 97 98 Return: models.ControllerInstance: The controller instance 99 """ 100 controllers = await self._get_controller_instances() 101 controller = next((c for c in controllers if c.controller == name), None) 102 return controller 103 104 async def _wait_for_controller_ready(self, name: str, timeout: int): 105 """Waits for the controller to be available 106 107 Args: 108 name (str): The name of the controller 109 timeout (int): The time to wait for the controller to be available in seconds 110 """ 111 iteration = 0 112 controller = await self._get_controller_instance(name) 113 114 while iteration < timeout: 115 if controller is not None: 116 if ( 117 controller.error_details == "Controller not initialized or disposed" 118 or controller.error_details == "Initializing controller connection." 119 ): 120 await self._api_gateway.controller_api.get_current_robot_controller_state( 121 cell=self._cell_id, controller=controller.host 122 ) 123 elif controller.has_error: 124 # As long has an error its being initialized 125 logger.error(controller.error_details) 126 else: 127 # Controller is ready 128 return 129 130 logger.info(f"Waiting for {self._cell_id}/{name} controller availability") 131 await asyncio.sleep(1) 132 controller = await self._get_controller_instance(name) 133 iteration += 1 134 135 raise TimeoutError(f"Timeout waiting for {self._cell_id}/{name} controller availability") 136 137 async def add_virtual_robot_controller( 138 self, 139 name: str, 140 controller_type: api.models.VirtualControllerTypes, 141 controller_manufacturer: api.models.Manufacturer, 142 timeout: int = 25, 143 position: str | None = None, 144 ) -> Controller: 145 """Add a virtual robot controller to the cell 146 147 Args: 148 name (str): The name of the controller 149 controller_type (models.VirtualControllerTypes): The type of the controller 150 controller_manufacturer (models.Manufacturer): The manufacturer of the controller 151 timeout (int): The time to wait for the controller to be available in seconds 152 position (str): The initial position of the robot 153 154 """ 155 156 home_position = ( 157 position 158 if position is not None 159 else str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7)) 160 ) 161 162 await self._api_gateway.controller_api.add_robot_controller( 163 cell=self._cell_id, 164 robot_controller=api.models.RobotController( 165 name=name, 166 configuration=api.models.RobotControllerConfiguration( 167 api.models.VirtualController( 168 type=controller_type, 169 manufacturer=controller_manufacturer, 170 position=home_position, 171 ) 172 ), 173 ), 174 completion_timeout=timeout, 175 ) 176 # Technically not needed because of the completion_timeout but it handles edge cases right now 177 await self._wait_for_controller_ready(name, timeout) 178 controller_instance = await self._get_controller_instance(name) 179 if controller_instance is None: 180 raise ControllerNotFound(controller=name) 181 182 return self._create_controller(controller_instance.controller) 183 184 async def ensure_virtual_robot_controller( 185 self, 186 name: str, 187 controller_type: api.models.VirtualControllerTypes, 188 controller_manufacturer: api.models.Manufacturer, 189 ) -> "Controller": 190 controller_instance = await self._get_controller_instance(name) 191 if controller_instance: 192 return self._create_controller(controller_instance.controller) 193 return await self.add_virtual_robot_controller( 194 name, controller_type, controller_manufacturer 195 ) 196 197 async def controllers(self) -> list["Controller"]: 198 controller_instances = await self._get_controller_instances() 199 return [ 200 self._create_controller(controller_instance.controller) 201 for controller_instance in controller_instances 202 ] 203 204 async def controller(self, name: str) -> "Controller": 205 controller_instance = await self._get_controller_instance(name) 206 207 if controller_instance is None: 208 raise ControllerNotFound(controller=name) 209 210 return self._create_controller(controller_instance.controller) 211 212 async def delete_robot_controller(self, name: str, timeout: int = 25): 213 await self._api_gateway.controller_api.delete_robot_controller( 214 cell=self._cell_id, controller=name, completion_timeout=timeout 215 ) 216 217 async def get_robot_cell(self) -> RobotCell: 218 """Return the configured robot cell""" 219 controllers = await self.controllers() 220 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
137 async def add_virtual_robot_controller( 138 self, 139 name: str, 140 controller_type: api.models.VirtualControllerTypes, 141 controller_manufacturer: api.models.Manufacturer, 142 timeout: int = 25, 143 position: str | None = None, 144 ) -> Controller: 145 """Add a virtual robot controller to the cell 146 147 Args: 148 name (str): The name of the controller 149 controller_type (models.VirtualControllerTypes): The type of the controller 150 controller_manufacturer (models.Manufacturer): The manufacturer of the controller 151 timeout (int): The time to wait for the controller to be available in seconds 152 position (str): The initial position of the robot 153 154 """ 155 156 home_position = ( 157 position 158 if position is not None 159 else str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7)) 160 ) 161 162 await self._api_gateway.controller_api.add_robot_controller( 163 cell=self._cell_id, 164 robot_controller=api.models.RobotController( 165 name=name, 166 configuration=api.models.RobotControllerConfiguration( 167 api.models.VirtualController( 168 type=controller_type, 169 manufacturer=controller_manufacturer, 170 position=home_position, 171 ) 172 ), 173 ), 174 completion_timeout=timeout, 175 ) 176 # Technically not needed because of the completion_timeout but it handles edge cases right now 177 await self._wait_for_controller_ready(name, timeout) 178 controller_instance = await self._get_controller_instance(name) 179 if controller_instance is None: 180 raise ControllerNotFound(controller=name) 181 182 return self._create_controller(controller_instance.controller)
Add a virtual robot controller to the cell
Args: name (str): The name of the controller controller_type (models.VirtualControllerTypes): The type of the controller controller_manufacturer (models.Manufacturer): The manufacturer of the controller timeout (int): The time to wait for the controller to be available in seconds position (str): The initial position of the robot
184 async def ensure_virtual_robot_controller( 185 self, 186 name: str, 187 controller_type: api.models.VirtualControllerTypes, 188 controller_manufacturer: api.models.Manufacturer, 189 ) -> "Controller": 190 controller_instance = await self._get_controller_instance(name) 191 if controller_instance: 192 return self._create_controller(controller_instance.controller) 193 return await self.add_virtual_robot_controller( 194 name, controller_type, controller_manufacturer 195 )
217 async def get_robot_cell(self) -> RobotCell: 218 """Return the configured robot cell""" 219 controllers = await self.controllers() 220 return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
Return the configured robot cell
81class MotionGroup(AbstractRobot): 82 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 83 self._api_gateway = api_gateway 84 self._motion_api_client = api_gateway.motion_api 85 self._cell = cell 86 self._motion_group_id = motion_group_id 87 self._current_motion: str | None = None 88 self._optimizer_setup: wb.models.OptimizerSetup | None = None 89 super().__init__(id=motion_group_id) 90 91 async def open(self): 92 await self._api_gateway.motion_group_api.activate_motion_group( 93 cell=self._cell, motion_group=self._motion_group_id 94 ) 95 return self 96 97 async def close(self): 98 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 99 # this behaviour is not desired in some cases, 100 # so for now we will not deactivate for the user 101 pass 102 103 @property 104 def motion_group_id(self) -> str: 105 return self._motion_group_id 106 107 @property 108 def current_motion(self) -> str | None: 109 # if not self._current_motion: 110 # raise ValueError("No MotionId attached. There is no planned motion available.") 111 return self._current_motion 112 113 async def _plan_with_collision_check( 114 self, 115 actions: list[Action], 116 tcp: str, 117 start_joint_position: tuple[float, ...] | None = None, 118 optimizer_setup: wb.models.OptimizerSetup | None = None, 119 ) -> wb.models.JointTrajectory: 120 """ 121 This method plans a trajectory and checks for collisions. 122 The collision check only happens if the actions have collision scene data. 123 124 You must provide the exact same collision data into all the actions. 125 Because the underlying API supports collision checks for the whole trajectory only. 126 127 Raises: 128 InconsistentCollisionScenes: If the collision scene data is not consistent across all actions 129 130 Your actions should follow below rules to be considered consistent: 131 1- They all should have the same collision scene data 132 2- They all should have no collision data 133 134 PlanTrajectoryFailed: If the trajectory planning failed including the collision check 135 136 For more information about this API, please refer to the plan_trajectory in the API documentation. 137 138 Args: 139 actions: list of actions to plan, current supported actions are Motion and WriteActions 140 WriteAction you specify on your path is handled in a performant way. 141 Please check execute_trajectory.motion_command.set_io for more information. 142 tcp: The tool to use 143 start_joint_position: The starting joint position, if none provided, current position of the robot is used 144 optimizer_setup: The optimizer setup 145 146 Returns: planned joint trajectory 147 148 """ 149 # PREPARE THE REQUEST 150 collision_scenes = self._validate_collision_scenes(actions) 151 start_joint_position = start_joint_position or await self.joints() 152 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 153 154 motion_commands = CombinedActions(items=tuple(actions)).to_motion_command() # type: ignore 155 156 static_colliders = None 157 collision_motion_group = None 158 if collision_scenes and len(collision_scenes) > 0: 159 static_colliders = collision_scenes[0].colliders 160 161 motion_group_type = robot_setup.motion_group_type 162 if ( 163 collision_scenes[0].motion_groups 164 and motion_group_type in collision_scenes[0].motion_groups 165 ): 166 collision_motion_group = collision_scenes[0].motion_groups[motion_group_type] 167 168 request = wb.models.PlanTrajectoryRequest( 169 robot_setup=robot_setup, 170 start_joint_position=list(start_joint_position), 171 motion_commands=motion_commands, 172 static_colliders=static_colliders, 173 collision_motion_group=collision_motion_group, 174 ) 175 176 # EXECUTE THE API CALL 177 plan_trajectory_response = await self._motion_api_client.plan_trajectory( 178 cell=self._cell, plan_trajectory_request=request 179 ) 180 if isinstance( 181 plan_trajectory_response.response.actual_instance, 182 wb.models.PlanTrajectoryFailedResponse, 183 ): 184 # TODO: handle partially executable path 185 raise PlanTrajectoryFailed(plan_trajectory_response.response.actual_instance) 186 return plan_trajectory_response.response.actual_instance 187 188 def _validate_collision_scenes(self, actions: list[Action]) -> list[models.CollisionScene]: 189 motion_count = len([action for action in actions if isinstance(action, Motion)]) 190 collision_scenes = [ 191 action.collision_scene 192 for action in actions 193 if isinstance(action, CollisionFreeMotion) and action.collision_scene is not None 194 ] 195 196 if len(collision_scenes) != 0 and len(collision_scenes) != motion_count: 197 raise InconsistentCollisionScenes( 198 "Only some of the actions have collision scene. Either specify it for all or none." 199 ) 200 201 # If a collision scene is provided, the same should be provided for all the collision scene 202 if len(collision_scenes) > 1: 203 first_scene = collision_scenes[0] 204 if not all( 205 compare_collision_scenes(first_scene, scene) for scene in collision_scenes[1:] 206 ): 207 raise InconsistentCollisionScenes( 208 "All actions must use the same collision scene but some are different" 209 ) 210 211 return collision_scenes 212 213 # TODO: we get the optimizer setup from as an input because 214 # it has a velocity setting which is used in collision free movement, I need to double check this 215 async def _plan_collision_free( 216 self, 217 action: CollisionFreeMotion, 218 tcp: str, 219 start_joint_position: list[float], 220 optimizer_setup: wb.models.OptimizerSetup | None = None, 221 ) -> wb.models.JointTrajectory: 222 """ 223 This method plans a trajectory and avoids collisions. 224 This means if there is a collision along the way to the target pose or joint positions, 225 It will adjust the trajectory to avoid the collision. 226 227 The collision check only happens if the action have collision scene data. 228 229 For more information about this API, please refer to the plan_collision_free_ptp in the API documentation. 230 231 Args: 232 action: The target pose or joint positions to reach 233 tcp: The tool to use 234 start_joint_position: The starting joint position, if none provided, current position of the robot is used 235 optimizer_setup: The optimizer setup 236 237 Returns: planned joint trajectory 238 239 240 """ 241 target = wb.models.PlanCollisionFreePTPRequestTarget( 242 **action.model_dump(exclude_unset=True) 243 ) 244 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 245 246 static_colliders = None 247 collision_motion_group = None 248 collision_scene = action.collision_scene 249 if collision_scene and collision_scene.colliders: 250 static_colliders = collision_scene.colliders 251 252 if ( 253 collision_scene.motion_groups 254 and robot_setup.motion_group_type in collision_scene.motion_groups 255 ): 256 collision_motion_group = collision_scene.motion_groups[ 257 robot_setup.motion_group_type 258 ] 259 260 request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest( 261 robot_setup=robot_setup, 262 start_joint_position=start_joint_position, 263 target=target, 264 static_colliders=static_colliders, 265 collision_motion_group=collision_motion_group, 266 ) 267 268 plan_result = await self._motion_api_client.plan_collision_free_ptp( 269 cell=self._cell, plan_collision_free_ptp_request=request 270 ) 271 272 if isinstance(plan_result.response.actual_instance, wb.models.PlanTrajectoryFailedResponse): 273 raise PlanTrajectoryFailed(plan_result.response.actual_instance) 274 return plan_result.response.actual_instance 275 276 async def _plan( 277 self, 278 actions: list[Action], 279 tcp: str, 280 start_joint_position: tuple[float, ...] | None = None, 281 optimizer_setup: wb.models.OptimizerSetup | None = None, 282 ) -> wb.models.JointTrajectory: 283 if not actions: 284 raise ValueError("No actions provided") 285 286 current_joints = start_joint_position or await self.joints() 287 robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp) 288 289 all_trajectories = [] 290 for batch in split_actions_into_batches(actions): 291 if len(batch) == 0: 292 raise ValueError("Empty batch of actions") 293 294 if isinstance(batch[0], CollisionFreeMotion): 295 motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0]) 296 trajectory = await self._plan_collision_free( 297 action=motion, 298 tcp=tcp, 299 start_joint_position=list(current_joints), 300 optimizer_setup=robot_setup, 301 ) 302 all_trajectories.append(trajectory) 303 # the last joint position of this trajectory is the starting point for the next one 304 current_joints = tuple(trajectory.joint_positions[-1].joints) 305 else: 306 trajectory = await self._plan_with_collision_check( 307 actions=batch, 308 tcp=tcp, 309 start_joint_position=current_joints, 310 optimizer_setup=robot_setup, 311 ) 312 all_trajectories.append(trajectory) 313 # the last joint position of this trajectory is the starting point for the next one 314 current_joints = tuple(trajectory.joint_positions[-1].joints) 315 316 return combine_trajectories(all_trajectories) 317 318 # TODO: refactor and simplify code, tests are already there 319 # TODO: split into batches when the collision scene changes in a batch of collision free motions 320 321 async def _execute( 322 self, 323 joint_trajectory: wb.models.JointTrajectory, 324 tcp: str, 325 actions: list[Action], 326 movement_controller: MovementController | None, 327 ) -> AsyncIterable[MovementResponse]: 328 if movement_controller is None: 329 movement_controller = move_forward 330 331 # Load planned trajectory 332 load_plan_response = await self._load_planned_motion(joint_trajectory, tcp) 333 334 # Move to start position 335 number_of_joints = await self._get_number_of_joints() 336 joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints 337 movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response) 338 339 # If there's an initial consumer, feed it the data 340 async for move_to_response in movement_stream: 341 # TODO: refactor 342 if ( 343 move_to_response.state is None 344 or move_to_response.state.motion_groups is None 345 or len(move_to_response.state.motion_groups) == 0 346 or move_to_response.move_response is None 347 or move_to_response.move_response.current_location_on_trajectory is None 348 ): 349 continue 350 351 yield move_to_response 352 353 controller = movement_controller( 354 MovementControllerContext( 355 combined_actions=CombinedActions(items=tuple(actions)), # type: ignore 356 motion_id=load_plan_response.motion, 357 ) 358 ) 359 360 def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool: 361 instance = response.actual_instance 362 # Stop when standstill indicates motion ended 363 return ( 364 isinstance(instance, wb.models.Standstill) 365 and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED 366 ) 367 368 execute_response_streaming_controller = StreamExtractor(controller, stop_condition) 369 execution_task = asyncio.create_task( 370 self._api_gateway.motion_api.execute_trajectory( 371 self._cell, execute_response_streaming_controller 372 ) 373 ) 374 async for execute_response in execute_response_streaming_controller: 375 yield execute_response 376 await execution_task 377 378 async def _get_number_of_joints(self) -> int: 379 spec = await self._api_gateway.motion_group_infos_api.get_motion_group_specification( 380 cell=self._cell, motion_group=self.motion_group_id 381 ) 382 return len(spec.mechanical_joint_limits) 383 384 async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup: 385 # TODO: mypy failed on main branch, need to check 386 if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp: # type: ignore 387 self._optimizer_setup = ( 388 await self._api_gateway.motion_group_infos_api.get_optimizer_configuration( 389 cell=self._cell, motion_group=self._motion_group_id, tcp=tcp 390 ) 391 ) 392 # TODO: mypy failed on code from main branch need to check 393 return self._optimizer_setup # type: ignore 394 395 async def _load_planned_motion( 396 self, joint_trajectory: wb.models.JointTrajectory, tcp: str 397 ) -> wb.models.PlanSuccessfulResponse: 398 load_plan_response = await self._api_gateway.motion_api.load_planned_motion( 399 cell=self._cell, 400 planned_motion=wb.models.PlannedMotion( 401 motion_group=self.motion_group_id, 402 times=joint_trajectory.times, 403 joint_positions=joint_trajectory.joint_positions, 404 locations=joint_trajectory.locations, 405 tcp=tcp, 406 ), 407 ) 408 409 if ( 410 load_plan_response.plan_failed_on_trajectory_response is not None 411 or load_plan_response.plan_failed_on_trajectory_response is not None 412 ): 413 raise LoadPlanFailed(load_plan_response) 414 415 return load_plan_response.plan_successful_response 416 417 async def move_to_start_position( 418 self, joint_velocities, load_plan_response: LoadPlanResponse 419 ) -> InitialMovementStream: 420 limit_override = wb.models.LimitsOverride() 421 if joint_velocities is not None: 422 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 423 424 move_to_trajectory_stream = ( 425 self._api_gateway.motion_api.stream_move_to_trajectory_via_joint_ptp( 426 cell=self._cell, motion=load_plan_response.motion, location_on_trajectory=0 427 ) 428 ) 429 return move_to_trajectory_stream 430 431 async def stop(self): 432 logger.debug(f"Stopping motion of {self}...") 433 try: 434 await self._motion_api_client.stop_execution( 435 cell=self._cell, motion=self.current_motion 436 ) 437 logger.debug(f"Motion {self.current_motion} stopped.") 438 except ValueError as e: 439 logger.debug(f"No motion to stop for {self}: {e}") 440 441 async def get_state(self, tcp: str | None = None) -> RobotState: 442 response = await self._api_gateway.motion_group_infos_api.get_current_motion_group_state( 443 cell=self._cell, motion_group=self.motion_group_id, tcp=tcp 444 ) 445 return RobotState( 446 pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints) 447 ) 448 449 async def joints(self) -> tuple: 450 state = await self.get_state() 451 if state.joints is None: 452 raise ValueError( 453 f"No joint positions available for motion group {self._motion_group_id}" 454 ) 455 return state.joints 456 457 async def tcp_pose(self, tcp: str | None = None) -> Pose: 458 state = await self.get_state(tcp=tcp) 459 return state.pose 460 461 async def tcps(self) -> list[wb.models.RobotTcp]: 462 """Get the available tool center points (TCPs)""" 463 response = await self._api_gateway.motion_group_infos_api.list_tcps( 464 cell=self._cell, motion_group=self.motion_group_id 465 ) 466 return response.tcps 467 468 async def tcp_names(self) -> list[str]: 469 """Get the names of the available tool center points (TCPs)""" 470 return [tcp.id for tcp in await self.tcps()] 471 472 async def active_tcp(self) -> wb.models.RobotTcp: 473 active_tcp = await self._api_gateway.motion_group_infos_api.get_active_tcp( 474 cell=self._cell, motion_group=self.motion_group_id 475 ) 476 return active_tcp 477 478 async def active_tcp_name(self) -> str: 479 active_tcp = await self.active_tcp() 480 return active_tcp.id
An interface for real and simulated robots
82 def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str): 83 self._api_gateway = api_gateway 84 self._motion_api_client = api_gateway.motion_api 85 self._cell = cell 86 self._motion_group_id = motion_group_id 87 self._current_motion: str | None = None 88 self._optimizer_setup: wb.models.OptimizerSetup | None = None 89 super().__init__(id=motion_group_id)
91 async def open(self): 92 await self._api_gateway.motion_group_api.activate_motion_group( 93 cell=self._cell, motion_group=self._motion_group_id 94 ) 95 return self
Allocates the external hardware resource (i.e. establish a connection)
97 async def close(self): 98 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 99 # this behaviour is not desired in some cases, 100 # so for now we will not deactivate for the user 101 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
417 async def move_to_start_position( 418 self, joint_velocities, load_plan_response: LoadPlanResponse 419 ) -> InitialMovementStream: 420 limit_override = wb.models.LimitsOverride() 421 if joint_velocities is not None: 422 limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities) 423 424 move_to_trajectory_stream = ( 425 self._api_gateway.motion_api.stream_move_to_trajectory_via_joint_ptp( 426 cell=self._cell, motion=load_plan_response.motion, location_on_trajectory=0 427 ) 428 ) 429 return move_to_trajectory_stream
431 async def stop(self): 432 logger.debug(f"Stopping motion of {self}...") 433 try: 434 await self._motion_api_client.stop_execution( 435 cell=self._cell, motion=self.current_motion 436 ) 437 logger.debug(f"Motion {self.current_motion} stopped.") 438 except ValueError as e: 439 logger.debug(f"No motion to stop for {self}: {e}")
Stop behaviour of the robot
441 async def get_state(self, tcp: str | None = None) -> RobotState: 442 response = await self._api_gateway.motion_group_infos_api.get_current_motion_group_state( 443 cell=self._cell, motion_group=self.motion_group_id, tcp=tcp 444 ) 445 return RobotState( 446 pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints) 447 )
Current state (pose, joints) of the robot based on the tcp.
Args: tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.
Returns: the current state of the robot
449 async def joints(self) -> tuple: 450 state = await self.get_state() 451 if state.joints is None: 452 raise ValueError( 453 f"No joint positions available for motion group {self._motion_group_id}" 454 ) 455 return state.joints
Return the current joint positions of the robot
Returns: the current joint positions
457 async def tcp_pose(self, tcp: str | None = None) -> Pose: 458 state = await self.get_state(tcp=tcp) 459 return state.pose
Return the current pose of the robot based on the tcp
Args: tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.
Returns: the current pose of the robot
461 async def tcps(self) -> list[wb.models.RobotTcp]: 462 """Get the available tool center points (TCPs)""" 463 response = await self._api_gateway.motion_group_infos_api.list_tcps( 464 cell=self._cell, motion_group=self.motion_group_id 465 ) 466 return response.tcps
Get the available tool center points (TCPs)
468 async def tcp_names(self) -> list[str]: 469 """Get the names of the available tool center points (TCPs)""" 470 return [tcp.id for tcp in await self.tcps()]
Get the names of the available tool center points (TCPs)
472 async def active_tcp(self) -> wb.models.RobotTcp: 473 active_tcp = await self._api_gateway.motion_group_infos_api.get_active_tcp( 474 cell=self._cell, motion_group=self.motion_group_id 475 ) 476 return active_tcp
Return the active TCP of the robot
Returns: the active TCP
11class Controller(Sized, AbstractController, NovaDevice, IODevice): 12 class Configuration(NovaDevice.Configuration): 13 type: Literal["controller"] = "controller" 14 id: str = "controller" 15 cell_id: str 16 controller_id: str 17 18 def __init__(self, configuration: Configuration): 19 super().__init__(configuration) 20 self._controller_api = self._nova_api.controller_api 21 self._motion_group_api = self._nova_api.motion_group_api 22 self._activated_motion_group_ids: list[str] = [] 23 self._io_access = IOAccess( 24 api_gateway=self._nova_api, 25 cell=self.configuration.cell_id, 26 controller_id=self.configuration.controller_id, 27 ) 28 29 @property 30 def controller_id(self) -> str: 31 return self.configuration.controller_id 32 33 async def open(self): 34 motion_group_ids = await self.activated_motion_group_ids() 35 self._activated_motion_group_ids = motion_group_ids 36 logger.info(f"Found motion group {motion_group_ids}") 37 return self 38 39 async def close(self): 40 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 41 # this behaviour is not desired in some cases, 42 # so for now we will not deactivate for the user 43 pass 44 45 def __len__(self) -> int: 46 return len(self._activated_motion_group_ids) 47 48 # TODO: should accept the exact motion group id as str 49 def motion_group(self, motion_group_id: int = 0) -> MotionGroup: 50 return MotionGroup( 51 api_gateway=self._nova_api, 52 cell=self.configuration.cell_id, 53 motion_group_id=f"{motion_group_id}@{self.configuration.controller_id}", 54 ) 55 56 def __getitem__(self, motion_group_id: int) -> MotionGroup: 57 return self.motion_group(motion_group_id) 58 59 async def activated_motion_group_ids(self) -> list[str]: 60 activate_all_motion_groups_response = ( 61 await self._motion_group_api.activate_all_motion_groups( 62 cell=self.configuration.cell_id, controller=self.configuration.controller_id 63 ) 64 ) 65 motion_groups = activate_all_motion_groups_response.instances 66 return [mg.motion_group for mg in motion_groups] 67 68 async def activated_motion_groups(self) -> list[MotionGroup]: 69 motion_group_ids = await self.activated_motion_group_ids() 70 return [self.motion_group(int(mg.split("@")[0])) for mg in motion_group_ids] 71 72 def get_robots(self) -> dict[str, AbstractRobot]: 73 return { 74 motion_group_id: self.motion_group(int(motion_group_id.split("@")[0])) 75 for motion_group_id in self._activated_motion_group_ids 76 } 77 78 async def read(self, key: str) -> ValueType: 79 return await self._io_access.read(key) 80 81 async def write(self, key: str, value: ValueType) -> None: 82 return await self._io_access.write(key, value)
A device which is configurable
33 async def open(self): 34 motion_group_ids = await self.activated_motion_group_ids() 35 self._activated_motion_group_ids = motion_group_ids 36 logger.info(f"Found motion group {motion_group_ids}") 37 return self
Allocates the external hardware resource (i.e. establish a connection)
39 async def close(self): 40 # RPS-1174: when a motion group is deactivated, RAE closes all open connections 41 # this behaviour is not desired in some cases, 42 # so for now we will not deactivate for the user 43 pass
Release the external hardware (i.e. close connection or set mode of external hardware back)
59 async def activated_motion_group_ids(self) -> list[str]: 60 activate_all_motion_groups_response = ( 61 await self._motion_group_api.activate_all_motion_groups( 62 cell=self.configuration.cell_id, controller=self.configuration.controller_id 63 ) 64 ) 65 motion_groups = activate_all_motion_groups_response.instances 66 return [mg.motion_group for mg in motion_groups]
111def speed_up( 112 context: MovementControllerContext, on_movement: Callable[[MotionState | None], None] 113) -> MovementControllerFunction: 114 async def movement_controller( 115 response_stream: ExecuteTrajectoryResponseStream, 116 ) -> ExecuteTrajectoryRequestStream: 117 # The first request is to initialize the movement 118 yield wb.models.InitializeMovementRequest(trajectory=context.motion_id, initial_location=0) # type: ignore 119 120 # then we get the response 121 initialize_movement_response = await anext(response_stream) 122 if isinstance( 123 initialize_movement_response.actual_instance, wb.models.InitializeMovementResponse 124 ): 125 r1 = initialize_movement_response.actual_instance 126 if not r1.init_response.succeeded: 127 raise InitMovementFailed(r1.init_response) 128 129 # The second request is to start the movement 130 set_io_list = context.combined_actions.to_set_io() 131 yield wb.models.StartMovementRequest( 132 set_ios=set_io_list, start_on_io=None, pause_on_io=None 133 ) # type: ignore 134 135 counter = 0 136 latest_speed = 10 137 # then we wait until the movement is finished 138 async for execute_trajectory_response in response_stream: 139 counter += 1 140 instance = execute_trajectory_response.actual_instance 141 # Send the current location to the consume 142 if isinstance(instance, wb.models.Movement): 143 motion_state = movement_to_motion_state(instance) 144 if motion_state: 145 on_movement(motion_state) 146 147 # Terminate the generator 148 if isinstance(instance, wb.models.Standstill): 149 if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED: 150 on_movement(None) 151 return 152 153 if isinstance(instance, wb.models.PlaybackSpeedResponse): 154 playback_speed = instance.playback_speed_response 155 logger.info(f"Current playback speed: {playback_speed}") 156 157 if counter % 10 == 0: 158 yield wb.models.ExecuteTrajectoryRequest( 159 wb.models.PlaybackSpeedRequest(playback_speed_in_percent=latest_speed) 160 ) 161 counter = 0 162 latest_speed += 5 163 if latest_speed > 100: 164 latest_speed = 100 165 166 return movement_controller
The type of the None singleton.
6class MotionSettings(pydantic.BaseModel): 7 """ 8 Settings for an action. This is closely related to the `MotionCommand` in the API. 9 See planTrajectory.motion_commands for more information. 10 11 Motion settings are immutable; if you need to change a setting, create a copy and update the new object. 12 13 Attributes: 14 min_blending_velocity: 15 A minimum velocity for blending, in percent. Cannot be used if `blending` is set. 16 17 position_zone_radius: 18 Defines the position zone radius. 19 20 joint_velocity_limits: 21 Maximum joint velocities 22 23 joint_acceleration_limits: 24 Maximum joint accelerations 25 26 tcp_velocity_limit: 27 Maximum TCP velocity 28 29 tcp_acceleration_limit: 30 Maximum TCP acceleration 31 32 tcp_orientation_velocity_limit: 33 Maximum TCP orientation velocity 34 35 tcp_orientation_acceleration_limit: 36 Maximum TCP orientation acceleration 37 """ 38 39 min_blending_velocity: int | None = pydantic.Field(default=None) 40 position_zone_radius: float | None = pydantic.Field(default=None) 41 joint_velocity_limits: tuple[float, ...] | None = pydantic.Field(default=None) 42 joint_acceleration_limits: tuple[float, ...] | None = pydantic.Field(default=None) 43 tcp_velocity_limit: float | None = pydantic.Field(default=None) 44 tcp_acceleration_limit: float | None = pydantic.Field(default=None) 45 tcp_orientation_velocity_limit: float | None = pydantic.Field(default=None) 46 tcp_orientation_acceleration_limit: float | None = pydantic.Field(default=None) 47 48 class Config: 49 frozen = True 50 51 @classmethod 52 def field_to_varname(cls, field): 53 return f"__ms_{field}" 54 55 @pydantic.model_validator(mode="after") 56 def validate_blending_settings(self) -> "MotionSettings": 57 if self.min_blending_velocity and self.position_zone_radius: 58 raise ValueError("Can't set both min_blending_velocity and blending") 59 return self 60 61 def has_blending_settings(self) -> bool: 62 return any([self.min_blending_velocity, self.position_zone_radius]) 63 64 def has_limits_override(self) -> bool: 65 return any( 66 [ 67 self.tcp_velocity_limit, 68 self.tcp_acceleration_limit, 69 self.tcp_orientation_velocity_limit, 70 self.tcp_orientation_acceleration_limit, 71 self.joint_velocity_limits, 72 self.joint_acceleration_limits, 73 ] 74 ) 75 76 def as_limits_settings(self) -> wb.models.LimitsOverride: 77 return wb.models.LimitsOverride( 78 joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits) # type: ignore 79 if self.joint_velocity_limits 80 else None, 81 joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits) # type: ignore 82 if self.joint_acceleration_limits 83 else None, 84 tcp_velocity_limit=self.tcp_velocity_limit, 85 tcp_acceleration_limit=self.tcp_acceleration_limit, 86 tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit, 87 tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit, 88 ) 89 90 def as_blending_setting(self) -> wb.models.MotionCommandBlending: 91 if self.position_zone_radius: 92 return wb.models.MotionCommandBlending( 93 wb.models.BlendingPosition( 94 position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition" 95 ) 96 ) 97 return wb.models.MotionCommandBlending( 98 wb.models.BlendingAuto( 99 min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto" 100 ) 101 )
Settings for an action. This is closely related to the MotionCommand
in the API.
See planTrajectory.motion_commands for more information.
Motion settings are immutable; if you need to change a setting, create a copy and update the new object.
Attributes:
min_blending_velocity:
A minimum velocity for blending, in percent. Cannot be used if blending
is set.
position_zone_radius:
Defines the position zone radius.
joint_velocity_limits:
Maximum joint velocities
joint_acceleration_limits:
Maximum joint accelerations
tcp_velocity_limit:
Maximum TCP velocity
tcp_acceleration_limit:
Maximum TCP acceleration
tcp_orientation_velocity_limit:
Maximum TCP orientation velocity
tcp_orientation_acceleration_limit:
Maximum TCP orientation acceleration
76 def as_limits_settings(self) -> wb.models.LimitsOverride: 77 return wb.models.LimitsOverride( 78 joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits) # type: ignore 79 if self.joint_velocity_limits 80 else None, 81 joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits) # type: ignore 82 if self.joint_acceleration_limits 83 else None, 84 tcp_velocity_limit=self.tcp_velocity_limit, 85 tcp_acceleration_limit=self.tcp_acceleration_limit, 86 tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit, 87 tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit, 88 )
90 def as_blending_setting(self) -> wb.models.MotionCommandBlending: 91 if self.position_zone_radius: 92 return wb.models.MotionCommandBlending( 93 wb.models.BlendingPosition( 94 position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition" 95 ) 96 ) 97 return wb.models.MotionCommandBlending( 98 wb.models.BlendingAuto( 99 min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto" 100 ) 101 )