nova

 1from nova import actions, api, types
 2from nova.core.controller import Controller
 3from nova.core.logging import logger
 4from nova.core.motion_group import MotionGroup
 5from nova.core.movement_controller import speed_up as speed_up_movement_controller
 6from nova.core.nova import Cell, Nova
 7from nova.types import MotionSettings
 8from nova.version import version
 9
10__version__ = version
11
12__all__ = [
13    "Nova",
14    "Cell",
15    "MotionGroup",
16    "Controller",
17    "speed_up_movement_controller",
18    "api",
19    "types",
20    "actions",
21    "MotionSettings",
22    "logger",
23    "__version__",
24]
class Nova:
29class Nova:
30    _api_client: ApiGateway
31
32    def __init__(
33        self,
34        *,
35        host: str | None = None,
36        access_token: str | None = None,
37        username: str | None = None,
38        password: str | None = None,
39        version: str = "v1",
40        verify_ssl: bool = True,
41        # TODO: deprecated
42        log_level: str = LOG_LEVEL,
43    ):
44        self._api_client = ApiGateway(
45            host=host,
46            access_token=access_token,
47            username=username,
48            password=password,
49            version=version,
50            verify_ssl=verify_ssl,
51        )
52
53    def cell(self, cell_id: str = CELL_NAME) -> Cell:
54        return Cell(self._api_client, cell_id)
55
56    async def close(self):
57        return await self._api_client.close()
58
59    async def __aenter__(self):
60        return self
61
62    async def __aexit__(self, exc_type, exc_val, exc_tb):
63        await self.close()
Nova( *, host: str | None = None, access_token: str | None = None, username: str | None = None, password: str | None = None, version: str = 'v1', verify_ssl: bool = True, log_level: str = 'INFO')
32    def __init__(
33        self,
34        *,
35        host: str | None = None,
36        access_token: str | None = None,
37        username: str | None = None,
38        password: str | None = None,
39        version: str = "v1",
40        verify_ssl: bool = True,
41        # TODO: deprecated
42        log_level: str = LOG_LEVEL,
43    ):
44        self._api_client = ApiGateway(
45            host=host,
46            access_token=access_token,
47            username=username,
48            password=password,
49            version=version,
50            verify_ssl=verify_ssl,
51        )
def cell(self, cell_id: str = 'cell') -> Cell:
53    def cell(self, cell_id: str = CELL_NAME) -> Cell:
54        return Cell(self._api_client, cell_id)
async def close(self):
56    async def close(self):
57        return await self._api_client.close()
class Cell:
 66class Cell:
 67    def __init__(self, api_gateway: ApiGateway, cell_id: str):
 68        self._api_gateway = api_gateway
 69        self._cell_id = cell_id
 70
 71    @property
 72    def cell_id(self) -> str:
 73        return self._cell_id
 74
 75    async def _get_controller_instances(self) -> list[api.models.ControllerInstance]:
 76        response = await self._api_gateway.controller_api.list_controllers(cell=self._cell_id)
 77        return response.instances
 78
 79    def _create_controller(self, controller_id: str) -> Controller:
 80        return Controller(
 81            configuration=Controller.Configuration(
 82                nova_api=self._api_gateway.host,
 83                nova_access_token=self._api_gateway.access_token,
 84                nova_username=self._api_gateway.username,
 85                nova_password=self._api_gateway.password,
 86                cell_id=self._cell_id,
 87                controller_id=controller_id,
 88                id=controller_id,
 89            )
 90        )
 91
 92    async def _get_controller_instance(self, name: str) -> api.models.ControllerInstance | None:
 93        """Get the controller instance
 94
 95        Args:
 96            name (str): The name of the controller
 97
 98        Return: models.ControllerInstance: The controller instance
 99        """
100        controllers = await self._get_controller_instances()
101        controller = next((c for c in controllers if c.controller == name), None)
102        return controller
103
104    async def _wait_for_controller_ready(self, name: str, timeout: int):
105        """Waits for the controller to be available
106
107        Args:
108            name (str): The name of the controller
109            timeout (int): The time to wait for the controller to be available in seconds
110        """
111        iteration = 0
112        controller = await self._get_controller_instance(name)
113
114        while iteration < timeout:
115            if controller is not None:
116                if (
117                    controller.error_details == "Controller not initialized or disposed"
118                    or controller.error_details == "Initializing controller connection."
119                ):
120                    await self._api_gateway.controller_api.get_current_robot_controller_state(
121                        cell=self._cell_id, controller=controller.host
122                    )
123                elif controller.has_error:
124                    # As long has an error its being initialized
125                    logger.error(controller.error_details)
126                else:
127                    # Controller is ready
128                    return
129
130            logger.info(f"Waiting for {self._cell_id}/{name} controller availability")
131            await asyncio.sleep(1)
132            controller = await self._get_controller_instance(name)
133            iteration += 1
134
135        raise TimeoutError(f"Timeout waiting for {self._cell_id}/{name} controller availability")
136
137    async def add_virtual_robot_controller(
138        self,
139        name: str,
140        controller_type: api.models.VirtualControllerTypes,
141        controller_manufacturer: api.models.Manufacturer,
142        timeout: int = 25,
143        position: str | None = None,
144    ) -> Controller:
145        """Add a virtual robot controller to the cell
146
147        Args:
148            name (str): The name of the controller
149            controller_type (models.VirtualControllerTypes): The type of the controller
150            controller_manufacturer (models.Manufacturer): The manufacturer of the controller
151            timeout (int): The time to wait for the controller to be available in seconds
152            position (str): The initial position of the robot
153
154        """
155
156        home_position = (
157            position
158            if position is not None
159            else str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7))
160        )
161
162        await self._api_gateway.controller_api.add_robot_controller(
163            cell=self._cell_id,
164            robot_controller=api.models.RobotController(
165                name=name,
166                configuration=api.models.RobotControllerConfiguration(
167                    api.models.VirtualController(
168                        type=controller_type,
169                        manufacturer=controller_manufacturer,
170                        position=home_position,
171                    )
172                ),
173            ),
174            completion_timeout=timeout,
175        )
176        # Technically not needed because of the completion_timeout but it handles edge cases right now
177        await self._wait_for_controller_ready(name, timeout)
178        controller_instance = await self._get_controller_instance(name)
179        if controller_instance is None:
180            raise ControllerNotFound(controller=name)
181
182        return self._create_controller(controller_instance.controller)
183
184    async def ensure_virtual_robot_controller(
185        self,
186        name: str,
187        controller_type: api.models.VirtualControllerTypes,
188        controller_manufacturer: api.models.Manufacturer,
189    ) -> "Controller":
190        controller_instance = await self._get_controller_instance(name)
191        if controller_instance:
192            return self._create_controller(controller_instance.controller)
193        return await self.add_virtual_robot_controller(
194            name, controller_type, controller_manufacturer
195        )
196
197    async def controllers(self) -> list["Controller"]:
198        controller_instances = await self._get_controller_instances()
199        return [
200            self._create_controller(controller_instance.controller)
201            for controller_instance in controller_instances
202        ]
203
204    async def controller(self, name: str) -> "Controller":
205        controller_instance = await self._get_controller_instance(name)
206
207        if controller_instance is None:
208            raise ControllerNotFound(controller=name)
209
210        return self._create_controller(controller_instance.controller)
211
212    async def delete_robot_controller(self, name: str, timeout: int = 25):
213        await self._api_gateway.controller_api.delete_robot_controller(
214            cell=self._cell_id, controller=name, completion_timeout=timeout
215        )
216
217    async def get_robot_cell(self) -> RobotCell:
218        """Return the configured robot cell"""
219        controllers = await self.controllers()
220        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})
Cell(api_gateway: nova.core.gateway.ApiGateway, cell_id: str)
67    def __init__(self, api_gateway: ApiGateway, cell_id: str):
68        self._api_gateway = api_gateway
69        self._cell_id = cell_id
cell_id: str
71    @property
72    def cell_id(self) -> str:
73        return self._cell_id
async def add_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer, timeout: int = 25, position: str | None = None) -> Controller:
137    async def add_virtual_robot_controller(
138        self,
139        name: str,
140        controller_type: api.models.VirtualControllerTypes,
141        controller_manufacturer: api.models.Manufacturer,
142        timeout: int = 25,
143        position: str | None = None,
144    ) -> Controller:
145        """Add a virtual robot controller to the cell
146
147        Args:
148            name (str): The name of the controller
149            controller_type (models.VirtualControllerTypes): The type of the controller
150            controller_manufacturer (models.Manufacturer): The manufacturer of the controller
151            timeout (int): The time to wait for the controller to be available in seconds
152            position (str): The initial position of the robot
153
154        """
155
156        home_position = (
157            position
158            if position is not None
159            else str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7))
160        )
161
162        await self._api_gateway.controller_api.add_robot_controller(
163            cell=self._cell_id,
164            robot_controller=api.models.RobotController(
165                name=name,
166                configuration=api.models.RobotControllerConfiguration(
167                    api.models.VirtualController(
168                        type=controller_type,
169                        manufacturer=controller_manufacturer,
170                        position=home_position,
171                    )
172                ),
173            ),
174            completion_timeout=timeout,
175        )
176        # Technically not needed because of the completion_timeout but it handles edge cases right now
177        await self._wait_for_controller_ready(name, timeout)
178        controller_instance = await self._get_controller_instance(name)
179        if controller_instance is None:
180            raise ControllerNotFound(controller=name)
181
182        return self._create_controller(controller_instance.controller)

Add a virtual robot controller to the cell

Args: name (str): The name of the controller controller_type (models.VirtualControllerTypes): The type of the controller controller_manufacturer (models.Manufacturer): The manufacturer of the controller timeout (int): The time to wait for the controller to be available in seconds position (str): The initial position of the robot

async def ensure_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer) -> Controller:
184    async def ensure_virtual_robot_controller(
185        self,
186        name: str,
187        controller_type: api.models.VirtualControllerTypes,
188        controller_manufacturer: api.models.Manufacturer,
189    ) -> "Controller":
190        controller_instance = await self._get_controller_instance(name)
191        if controller_instance:
192            return self._create_controller(controller_instance.controller)
193        return await self.add_virtual_robot_controller(
194            name, controller_type, controller_manufacturer
195        )
async def controllers(self) -> list[Controller]:
197    async def controllers(self) -> list["Controller"]:
198        controller_instances = await self._get_controller_instances()
199        return [
200            self._create_controller(controller_instance.controller)
201            for controller_instance in controller_instances
202        ]
async def controller(self, name: str) -> Controller:
204    async def controller(self, name: str) -> "Controller":
205        controller_instance = await self._get_controller_instance(name)
206
207        if controller_instance is None:
208            raise ControllerNotFound(controller=name)
209
210        return self._create_controller(controller_instance.controller)
async def delete_robot_controller(self, name: str, timeout: int = 25):
212    async def delete_robot_controller(self, name: str, timeout: int = 25):
213        await self._api_gateway.controller_api.delete_robot_controller(
214            cell=self._cell_id, controller=name, completion_timeout=timeout
215        )
async def get_robot_cell(self) -> nova.core.robot_cell.RobotCell:
217    async def get_robot_cell(self) -> RobotCell:
218        """Return the configured robot cell"""
219        controllers = await self.controllers()
220        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})

Return the configured robot cell

class MotionGroup(nova.core.robot_cell.AbstractRobot):
 81class MotionGroup(AbstractRobot):
 82    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
 83        self._api_gateway = api_gateway
 84        self._motion_api_client = api_gateway.motion_api
 85        self._cell = cell
 86        self._motion_group_id = motion_group_id
 87        self._current_motion: str | None = None
 88        self._optimizer_setup: wb.models.OptimizerSetup | None = None
 89        super().__init__(id=motion_group_id)
 90
 91    async def open(self):
 92        await self._api_gateway.motion_group_api.activate_motion_group(
 93            cell=self._cell, motion_group=self._motion_group_id
 94        )
 95        return self
 96
 97    async def close(self):
 98        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
 99        #           this behaviour is not desired in some cases,
100        #           so for now we will not deactivate for the user
101        pass
102
103    @property
104    def motion_group_id(self) -> str:
105        return self._motion_group_id
106
107    @property
108    def current_motion(self) -> str | None:
109        # if not self._current_motion:
110        #    raise ValueError("No MotionId attached. There is no planned motion available.")
111        return self._current_motion
112
113    async def _plan_with_collision_check(
114        self,
115        actions: list[Action],
116        tcp: str,
117        start_joint_position: tuple[float, ...] | None = None,
118        optimizer_setup: wb.models.OptimizerSetup | None = None,
119    ) -> wb.models.JointTrajectory:
120        """
121        This method plans a trajectory and checks for collisions.
122        The collision check only happens if the actions have collision scene data.
123
124        You must provide the exact same collision data into all the actions.
125        Because the underlying API supports collision checks for the whole trajectory only.
126
127        Raises:
128            InconsistentCollisionScenes: If the collision scene data is not consistent across all actions
129
130            Your actions should follow below rules to be considered consistent:
131            1- They all should have the same collision scene data
132            2- They all should have no collision data
133
134            PlanTrajectoryFailed: If the trajectory planning failed including the collision check
135
136        For more information about this API, please refer to the plan_trajectory in the API documentation.
137
138        Args:
139            actions: list of actions to plan, current supported actions are Motion and WriteActions
140                     WriteAction you specify on your path is handled in a performant way.
141                     Please check execute_trajectory.motion_command.set_io for more information.
142            tcp:     The tool to use
143            start_joint_position: The starting joint position, if none provided, current position of the robot is used
144            optimizer_setup: The optimizer setup
145
146        Returns: planned joint trajectory
147
148        """
149        # PREPARE THE REQUEST
150        collision_scenes = self._validate_collision_scenes(actions)
151        start_joint_position = start_joint_position or await self.joints()
152        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
153
154        motion_commands = CombinedActions(items=tuple(actions)).to_motion_command()  # type: ignore
155
156        static_colliders = None
157        collision_motion_group = None
158        if collision_scenes and len(collision_scenes) > 0:
159            static_colliders = collision_scenes[0].colliders
160
161            motion_group_type = robot_setup.motion_group_type
162            if (
163                collision_scenes[0].motion_groups
164                and motion_group_type in collision_scenes[0].motion_groups
165            ):
166                collision_motion_group = collision_scenes[0].motion_groups[motion_group_type]
167
168        request = wb.models.PlanTrajectoryRequest(
169            robot_setup=robot_setup,
170            start_joint_position=list(start_joint_position),
171            motion_commands=motion_commands,
172            static_colliders=static_colliders,
173            collision_motion_group=collision_motion_group,
174        )
175
176        # EXECUTE THE API CALL
177        plan_trajectory_response = await self._motion_api_client.plan_trajectory(
178            cell=self._cell, plan_trajectory_request=request
179        )
180        if isinstance(
181            plan_trajectory_response.response.actual_instance,
182            wb.models.PlanTrajectoryFailedResponse,
183        ):
184            # TODO: handle partially executable path
185            raise PlanTrajectoryFailed(plan_trajectory_response.response.actual_instance)
186        return plan_trajectory_response.response.actual_instance
187
188    def _validate_collision_scenes(self, actions: list[Action]) -> list[models.CollisionScene]:
189        motion_count = len([action for action in actions if isinstance(action, Motion)])
190        collision_scenes = [
191            action.collision_scene
192            for action in actions
193            if isinstance(action, CollisionFreeMotion) and action.collision_scene is not None
194        ]
195
196        if len(collision_scenes) != 0 and len(collision_scenes) != motion_count:
197            raise InconsistentCollisionScenes(
198                "Only some of the actions have collision scene. Either specify it for all or none."
199            )
200
201        # If a collision scene is provided, the same should be provided for all the collision scene
202        if len(collision_scenes) > 1:
203            first_scene = collision_scenes[0]
204            if not all(
205                compare_collision_scenes(first_scene, scene) for scene in collision_scenes[1:]
206            ):
207                raise InconsistentCollisionScenes(
208                    "All actions must use the same collision scene but some are different"
209                )
210
211        return collision_scenes
212
213    # TODO: we get the optimizer setup from as an input because
214    #  it has a velocity setting which is used in collision free movement, I need to double check this
215    async def _plan_collision_free(
216        self,
217        action: CollisionFreeMotion,
218        tcp: str,
219        start_joint_position: list[float],
220        optimizer_setup: wb.models.OptimizerSetup | None = None,
221    ) -> wb.models.JointTrajectory:
222        """
223        This method plans a trajectory and avoids collisions.
224        This means if there is a collision along the way to the target pose or joint positions,
225        It will adjust the trajectory to avoid the collision.
226
227        The collision check only happens if the action have collision scene data.
228
229        For more information about this API, please refer to the plan_collision_free_ptp in the API documentation.
230
231        Args:
232            action: The target pose or joint positions to reach
233            tcp:     The tool to use
234            start_joint_position: The starting joint position, if none provided, current position of the robot is used
235            optimizer_setup: The optimizer setup
236
237        Returns: planned joint trajectory
238
239
240        """
241        target = wb.models.PlanCollisionFreePTPRequestTarget(
242            **action.model_dump(exclude_unset=True)
243        )
244        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
245
246        static_colliders = None
247        collision_motion_group = None
248        collision_scene = action.collision_scene
249        if collision_scene and collision_scene.colliders:
250            static_colliders = collision_scene.colliders
251
252            if (
253                collision_scene.motion_groups
254                and robot_setup.motion_group_type in collision_scene.motion_groups
255            ):
256                collision_motion_group = collision_scene.motion_groups[
257                    robot_setup.motion_group_type
258                ]
259
260        request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest(
261            robot_setup=robot_setup,
262            start_joint_position=start_joint_position,
263            target=target,
264            static_colliders=static_colliders,
265            collision_motion_group=collision_motion_group,
266        )
267
268        plan_result = await self._motion_api_client.plan_collision_free_ptp(
269            cell=self._cell, plan_collision_free_ptp_request=request
270        )
271
272        if isinstance(plan_result.response.actual_instance, wb.models.PlanTrajectoryFailedResponse):
273            raise PlanTrajectoryFailed(plan_result.response.actual_instance)
274        return plan_result.response.actual_instance
275
276    async def _plan(
277        self,
278        actions: list[Action],
279        tcp: str,
280        start_joint_position: tuple[float, ...] | None = None,
281        optimizer_setup: wb.models.OptimizerSetup | None = None,
282    ) -> wb.models.JointTrajectory:
283        if not actions:
284            raise ValueError("No actions provided")
285
286        current_joints = start_joint_position or await self.joints()
287        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
288
289        all_trajectories = []
290        for batch in split_actions_into_batches(actions):
291            if len(batch) == 0:
292                raise ValueError("Empty batch of actions")
293
294            if isinstance(batch[0], CollisionFreeMotion):
295                motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0])
296                trajectory = await self._plan_collision_free(
297                    action=motion,
298                    tcp=tcp,
299                    start_joint_position=list(current_joints),
300                    optimizer_setup=robot_setup,
301                )
302                all_trajectories.append(trajectory)
303                # the last joint position of this trajectory is the starting point for the next one
304                current_joints = tuple(trajectory.joint_positions[-1].joints)
305            else:
306                trajectory = await self._plan_with_collision_check(
307                    actions=batch,
308                    tcp=tcp,
309                    start_joint_position=current_joints,
310                    optimizer_setup=robot_setup,
311                )
312                all_trajectories.append(trajectory)
313                # the last joint position of this trajectory is the starting point for the next one
314                current_joints = tuple(trajectory.joint_positions[-1].joints)
315
316        return combine_trajectories(all_trajectories)
317
318    # TODO: refactor and simplify code, tests are already there
319    # TODO: split into batches when the collision scene changes in a batch of collision free motions
320
321    async def _execute(
322        self,
323        joint_trajectory: wb.models.JointTrajectory,
324        tcp: str,
325        actions: list[Action],
326        movement_controller: MovementController | None,
327    ) -> AsyncIterable[MovementResponse]:
328        if movement_controller is None:
329            movement_controller = move_forward
330
331        # Load planned trajectory
332        load_plan_response = await self._load_planned_motion(joint_trajectory, tcp)
333
334        # Move to start position
335        number_of_joints = await self._get_number_of_joints()
336        joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints
337        movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response)
338
339        # If there's an initial consumer, feed it the data
340        async for move_to_response in movement_stream:
341            # TODO: refactor
342            if (
343                move_to_response.state is None
344                or move_to_response.state.motion_groups is None
345                or len(move_to_response.state.motion_groups) == 0
346                or move_to_response.move_response is None
347                or move_to_response.move_response.current_location_on_trajectory is None
348            ):
349                continue
350
351            yield move_to_response
352
353        controller = movement_controller(
354            MovementControllerContext(
355                combined_actions=CombinedActions(items=tuple(actions)),  # type: ignore
356                motion_id=load_plan_response.motion,
357            )
358        )
359
360        def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool:
361            instance = response.actual_instance
362            # Stop when standstill indicates motion ended
363            return (
364                isinstance(instance, wb.models.Standstill)
365                and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED
366            )
367
368        execute_response_streaming_controller = StreamExtractor(controller, stop_condition)
369        execution_task = asyncio.create_task(
370            self._api_gateway.motion_api.execute_trajectory(
371                self._cell, execute_response_streaming_controller
372            )
373        )
374        async for execute_response in execute_response_streaming_controller:
375            yield execute_response
376        await execution_task
377
378    async def _get_number_of_joints(self) -> int:
379        spec = await self._api_gateway.motion_group_infos_api.get_motion_group_specification(
380            cell=self._cell, motion_group=self.motion_group_id
381        )
382        return len(spec.mechanical_joint_limits)
383
384    async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup:
385        # TODO: mypy failed on main branch, need to check
386        if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp:  # type: ignore
387            self._optimizer_setup = (
388                await self._api_gateway.motion_group_infos_api.get_optimizer_configuration(
389                    cell=self._cell, motion_group=self._motion_group_id, tcp=tcp
390                )
391            )
392        # TODO: mypy failed on code from main branch need to check
393        return self._optimizer_setup  # type: ignore
394
395    async def _load_planned_motion(
396        self, joint_trajectory: wb.models.JointTrajectory, tcp: str
397    ) -> wb.models.PlanSuccessfulResponse:
398        load_plan_response = await self._api_gateway.motion_api.load_planned_motion(
399            cell=self._cell,
400            planned_motion=wb.models.PlannedMotion(
401                motion_group=self.motion_group_id,
402                times=joint_trajectory.times,
403                joint_positions=joint_trajectory.joint_positions,
404                locations=joint_trajectory.locations,
405                tcp=tcp,
406            ),
407        )
408
409        if (
410            load_plan_response.plan_failed_on_trajectory_response is not None
411            or load_plan_response.plan_failed_on_trajectory_response is not None
412        ):
413            raise LoadPlanFailed(load_plan_response)
414
415        return load_plan_response.plan_successful_response
416
417    async def move_to_start_position(
418        self, joint_velocities, load_plan_response: LoadPlanResponse
419    ) -> InitialMovementStream:
420        limit_override = wb.models.LimitsOverride()
421        if joint_velocities is not None:
422            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
423
424        move_to_trajectory_stream = (
425            self._api_gateway.motion_api.stream_move_to_trajectory_via_joint_ptp(
426                cell=self._cell, motion=load_plan_response.motion, location_on_trajectory=0
427            )
428        )
429        return move_to_trajectory_stream
430
431    async def stop(self):
432        logger.debug(f"Stopping motion of {self}...")
433        try:
434            await self._motion_api_client.stop_execution(
435                cell=self._cell, motion=self.current_motion
436            )
437            logger.debug(f"Motion {self.current_motion} stopped.")
438        except ValueError as e:
439            logger.debug(f"No motion to stop for {self}: {e}")
440
441    async def get_state(self, tcp: str | None = None) -> RobotState:
442        response = await self._api_gateway.motion_group_infos_api.get_current_motion_group_state(
443            cell=self._cell, motion_group=self.motion_group_id, tcp=tcp
444        )
445        return RobotState(
446            pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints)
447        )
448
449    async def joints(self) -> tuple:
450        state = await self.get_state()
451        if state.joints is None:
452            raise ValueError(
453                f"No joint positions available for motion group {self._motion_group_id}"
454            )
455        return state.joints
456
457    async def tcp_pose(self, tcp: str | None = None) -> Pose:
458        state = await self.get_state(tcp=tcp)
459        return state.pose
460
461    async def tcps(self) -> list[wb.models.RobotTcp]:
462        """Get the available tool center points (TCPs)"""
463        response = await self._api_gateway.motion_group_infos_api.list_tcps(
464            cell=self._cell, motion_group=self.motion_group_id
465        )
466        return response.tcps
467
468    async def tcp_names(self) -> list[str]:
469        """Get the names of the available tool center points (TCPs)"""
470        return [tcp.id for tcp in await self.tcps()]
471
472    async def active_tcp(self) -> wb.models.RobotTcp:
473        active_tcp = await self._api_gateway.motion_group_infos_api.get_active_tcp(
474            cell=self._cell, motion_group=self.motion_group_id
475        )
476        return active_tcp
477
478    async def active_tcp_name(self) -> str:
479        active_tcp = await self.active_tcp()
480        return active_tcp.id

An interface for real and simulated robots

MotionGroup( api_gateway: nova.core.gateway.ApiGateway, cell: str, motion_group_id: str)
82    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
83        self._api_gateway = api_gateway
84        self._motion_api_client = api_gateway.motion_api
85        self._cell = cell
86        self._motion_group_id = motion_group_id
87        self._current_motion: str | None = None
88        self._optimizer_setup: wb.models.OptimizerSetup | None = None
89        super().__init__(id=motion_group_id)
async def open(self):
91    async def open(self):
92        await self._api_gateway.motion_group_api.activate_motion_group(
93            cell=self._cell, motion_group=self._motion_group_id
94        )
95        return self

Allocates the external hardware resource (i.e. establish a connection)

async def close(self):
 97    async def close(self):
 98        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
 99        #           this behaviour is not desired in some cases,
100        #           so for now we will not deactivate for the user
101        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

motion_group_id: str
103    @property
104    def motion_group_id(self) -> str:
105        return self._motion_group_id
current_motion: str | None
107    @property
108    def current_motion(self) -> str | None:
109        # if not self._current_motion:
110        #    raise ValueError("No MotionId attached. There is no planned motion available.")
111        return self._current_motion
async def move_to_start_position( self, joint_velocities, load_plan_response: wandelbots_api_client.models.plan_successful_response.PlanSuccessfulResponse) -> AsyncIterator[wandelbots_api_client.models.stream_move_response.StreamMoveResponse]:
417    async def move_to_start_position(
418        self, joint_velocities, load_plan_response: LoadPlanResponse
419    ) -> InitialMovementStream:
420        limit_override = wb.models.LimitsOverride()
421        if joint_velocities is not None:
422            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
423
424        move_to_trajectory_stream = (
425            self._api_gateway.motion_api.stream_move_to_trajectory_via_joint_ptp(
426                cell=self._cell, motion=load_plan_response.motion, location_on_trajectory=0
427            )
428        )
429        return move_to_trajectory_stream
async def stop(self):
431    async def stop(self):
432        logger.debug(f"Stopping motion of {self}...")
433        try:
434            await self._motion_api_client.stop_execution(
435                cell=self._cell, motion=self.current_motion
436            )
437            logger.debug(f"Motion {self.current_motion} stopped.")
438        except ValueError as e:
439            logger.debug(f"No motion to stop for {self}: {e}")

Stop behaviour of the robot

async def get_state(self, tcp: str | None = None) -> nova.types.RobotState:
441    async def get_state(self, tcp: str | None = None) -> RobotState:
442        response = await self._api_gateway.motion_group_infos_api.get_current_motion_group_state(
443            cell=self._cell, motion_group=self.motion_group_id, tcp=tcp
444        )
445        return RobotState(
446            pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints)
447        )

Current state (pose, joints) of the robot based on the tcp.

Args: tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.

Returns: the current state of the robot

async def joints(self) -> tuple:
449    async def joints(self) -> tuple:
450        state = await self.get_state()
451        if state.joints is None:
452            raise ValueError(
453                f"No joint positions available for motion group {self._motion_group_id}"
454            )
455        return state.joints

Return the current joint positions of the robot

Returns: the current joint positions

async def tcp_pose(self, tcp: str | None = None) -> nova.types.Pose:
457    async def tcp_pose(self, tcp: str | None = None) -> Pose:
458        state = await self.get_state(tcp=tcp)
459        return state.pose

Return the current pose of the robot based on the tcp

Args: tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.

Returns: the current pose of the robot

async def tcps(self) -> list[wandelbots_api_client.models.robot_tcp.RobotTcp]:
461    async def tcps(self) -> list[wb.models.RobotTcp]:
462        """Get the available tool center points (TCPs)"""
463        response = await self._api_gateway.motion_group_infos_api.list_tcps(
464            cell=self._cell, motion_group=self.motion_group_id
465        )
466        return response.tcps

Get the available tool center points (TCPs)

async def tcp_names(self) -> list[str]:
468    async def tcp_names(self) -> list[str]:
469        """Get the names of the available tool center points (TCPs)"""
470        return [tcp.id for tcp in await self.tcps()]

Get the names of the available tool center points (TCPs)

async def active_tcp(self) -> wandelbots_api_client.models.robot_tcp.RobotTcp:
472    async def active_tcp(self) -> wb.models.RobotTcp:
473        active_tcp = await self._api_gateway.motion_group_infos_api.get_active_tcp(
474            cell=self._cell, motion_group=self.motion_group_id
475        )
476        return active_tcp

Return the active TCP of the robot

Returns: the active TCP

async def active_tcp_name(self) -> str:
478    async def active_tcp_name(self) -> str:
479        active_tcp = await self.active_tcp()
480        return active_tcp.id

Return the name of the active TCP of the robot

Returns: the name of the active TCP

class Controller(typing.Sized, nova.core.robot_cell.AbstractController, nova.core.gateway.NovaDevice, nova.core.robot_cell.IODevice):
11class Controller(Sized, AbstractController, NovaDevice, IODevice):
12    class Configuration(NovaDevice.Configuration):
13        type: Literal["controller"] = "controller"
14        id: str = "controller"
15        cell_id: str
16        controller_id: str
17
18    def __init__(self, configuration: Configuration):
19        super().__init__(configuration)
20        self._controller_api = self._nova_api.controller_api
21        self._motion_group_api = self._nova_api.motion_group_api
22        self._activated_motion_group_ids: list[str] = []
23        self._io_access = IOAccess(
24            api_gateway=self._nova_api,
25            cell=self.configuration.cell_id,
26            controller_id=self.configuration.controller_id,
27        )
28
29    @property
30    def controller_id(self) -> str:
31        return self.configuration.controller_id
32
33    async def open(self):
34        motion_group_ids = await self.activated_motion_group_ids()
35        self._activated_motion_group_ids = motion_group_ids
36        logger.info(f"Found motion group {motion_group_ids}")
37        return self
38
39    async def close(self):
40        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
41        #           this behaviour is not desired in some cases,
42        #           so for now we will not deactivate for the user
43        pass
44
45    def __len__(self) -> int:
46        return len(self._activated_motion_group_ids)
47
48    # TODO: should accept the exact motion group id as str
49    def motion_group(self, motion_group_id: int = 0) -> MotionGroup:
50        return MotionGroup(
51            api_gateway=self._nova_api,
52            cell=self.configuration.cell_id,
53            motion_group_id=f"{motion_group_id}@{self.configuration.controller_id}",
54        )
55
56    def __getitem__(self, motion_group_id: int) -> MotionGroup:
57        return self.motion_group(motion_group_id)
58
59    async def activated_motion_group_ids(self) -> list[str]:
60        activate_all_motion_groups_response = (
61            await self._motion_group_api.activate_all_motion_groups(
62                cell=self.configuration.cell_id, controller=self.configuration.controller_id
63            )
64        )
65        motion_groups = activate_all_motion_groups_response.instances
66        return [mg.motion_group for mg in motion_groups]
67
68    async def activated_motion_groups(self) -> list[MotionGroup]:
69        motion_group_ids = await self.activated_motion_group_ids()
70        return [self.motion_group(int(mg.split("@")[0])) for mg in motion_group_ids]
71
72    def get_robots(self) -> dict[str, AbstractRobot]:
73        return {
74            motion_group_id: self.motion_group(int(motion_group_id.split("@")[0]))
75            for motion_group_id in self._activated_motion_group_ids
76        }
77
78    async def read(self, key: str) -> ValueType:
79        return await self._io_access.read(key)
80
81    async def write(self, key: str, value: ValueType) -> None:
82        return await self._io_access.write(key, value)

A device which is configurable

controller_id: str
29    @property
30    def controller_id(self) -> str:
31        return self.configuration.controller_id
async def open(self):
33    async def open(self):
34        motion_group_ids = await self.activated_motion_group_ids()
35        self._activated_motion_group_ids = motion_group_ids
36        logger.info(f"Found motion group {motion_group_ids}")
37        return self

Allocates the external hardware resource (i.e. establish a connection)

async def close(self):
39    async def close(self):
40        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
41        #           this behaviour is not desired in some cases,
42        #           so for now we will not deactivate for the user
43        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

def motion_group(self, motion_group_id: int = 0) -> MotionGroup:
49    def motion_group(self, motion_group_id: int = 0) -> MotionGroup:
50        return MotionGroup(
51            api_gateway=self._nova_api,
52            cell=self.configuration.cell_id,
53            motion_group_id=f"{motion_group_id}@{self.configuration.controller_id}",
54        )
async def activated_motion_group_ids(self) -> list[str]:
59    async def activated_motion_group_ids(self) -> list[str]:
60        activate_all_motion_groups_response = (
61            await self._motion_group_api.activate_all_motion_groups(
62                cell=self.configuration.cell_id, controller=self.configuration.controller_id
63            )
64        )
65        motion_groups = activate_all_motion_groups_response.instances
66        return [mg.motion_group for mg in motion_groups]
async def activated_motion_groups(self) -> list[MotionGroup]:
68    async def activated_motion_groups(self) -> list[MotionGroup]:
69        motion_group_ids = await self.activated_motion_group_ids()
70        return [self.motion_group(int(mg.split("@")[0])) for mg in motion_group_ids]
async def read(self, key: str) -> Union[int, str, bool, float, nova.types.Pose]:
78    async def read(self, key: str) -> ValueType:
79        return await self._io_access.read(key)

Read a value given its key

async def write( self, key: str, value: Union[int, str, bool, float, nova.types.Pose]) -> None:
81    async def write(self, key: str, value: ValueType) -> None:
82        return await self._io_access.write(key, value)

Write a value given its key and the new value

def speed_up_movement_controller( context: nova.actions.MovementControllerContext, on_movement: Callable[[nova.types.MotionState | None], NoneType]) -> Callable[[AsyncIterator[wandelbots_api_client.models.execute_trajectory_response.ExecuteTrajectoryResponse]], AsyncIterator[wandelbots_api_client.models.execute_trajectory_request.ExecuteTrajectoryRequest]]:
111def speed_up(
112    context: MovementControllerContext, on_movement: Callable[[MotionState | None], None]
113) -> MovementControllerFunction:
114    async def movement_controller(
115        response_stream: ExecuteTrajectoryResponseStream,
116    ) -> ExecuteTrajectoryRequestStream:
117        # The first request is to initialize the movement
118        yield wb.models.InitializeMovementRequest(trajectory=context.motion_id, initial_location=0)  # type: ignore
119
120        # then we get the response
121        initialize_movement_response = await anext(response_stream)
122        if isinstance(
123            initialize_movement_response.actual_instance, wb.models.InitializeMovementResponse
124        ):
125            r1 = initialize_movement_response.actual_instance
126            if not r1.init_response.succeeded:
127                raise InitMovementFailed(r1.init_response)
128
129        # The second request is to start the movement
130        set_io_list = context.combined_actions.to_set_io()
131        yield wb.models.StartMovementRequest(
132            set_ios=set_io_list, start_on_io=None, pause_on_io=None
133        )  # type: ignore
134
135        counter = 0
136        latest_speed = 10
137        # then we wait until the movement is finished
138        async for execute_trajectory_response in response_stream:
139            counter += 1
140            instance = execute_trajectory_response.actual_instance
141            # Send the current location to the consume
142            if isinstance(instance, wb.models.Movement):
143                motion_state = movement_to_motion_state(instance)
144                if motion_state:
145                    on_movement(motion_state)
146
147            # Terminate the generator
148            if isinstance(instance, wb.models.Standstill):
149                if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
150                    on_movement(None)
151                    return
152
153            if isinstance(instance, wb.models.PlaybackSpeedResponse):
154                playback_speed = instance.playback_speed_response
155                logger.info(f"Current playback speed: {playback_speed}")
156
157            if counter % 10 == 0:
158                yield wb.models.ExecuteTrajectoryRequest(
159                    wb.models.PlaybackSpeedRequest(playback_speed_in_percent=latest_speed)
160                )
161                counter = 0
162                latest_speed += 5
163                if latest_speed > 100:
164                    latest_speed = 100
165
166    return movement_controller

The type of the None singleton.

class MotionSettings(pydantic.main.BaseModel):
  6class MotionSettings(pydantic.BaseModel):
  7    """
  8    Settings for an action. This is closely related to the `MotionCommand` in the API.
  9    See planTrajectory.motion_commands for more information.
 10
 11    Motion settings are immutable; if you need to change a setting, create a copy and update the new object.
 12
 13    Attributes:
 14        min_blending_velocity:
 15            A minimum velocity for blending, in percent. Cannot be used if `blending` is set.
 16
 17        position_zone_radius:
 18            Defines the position zone radius.
 19
 20        joint_velocity_limits:
 21            Maximum joint velocities
 22
 23        joint_acceleration_limits:
 24            Maximum joint accelerations
 25
 26        tcp_velocity_limit:
 27            Maximum TCP velocity
 28
 29        tcp_acceleration_limit:
 30            Maximum TCP acceleration
 31
 32        tcp_orientation_velocity_limit:
 33            Maximum TCP orientation velocity
 34
 35        tcp_orientation_acceleration_limit:
 36            Maximum TCP orientation acceleration
 37    """
 38
 39    min_blending_velocity: int | None = pydantic.Field(default=None)
 40    position_zone_radius: float | None = pydantic.Field(default=None)
 41    joint_velocity_limits: tuple[float, ...] | None = pydantic.Field(default=None)
 42    joint_acceleration_limits: tuple[float, ...] | None = pydantic.Field(default=None)
 43    tcp_velocity_limit: float | None = pydantic.Field(default=None)
 44    tcp_acceleration_limit: float | None = pydantic.Field(default=None)
 45    tcp_orientation_velocity_limit: float | None = pydantic.Field(default=None)
 46    tcp_orientation_acceleration_limit: float | None = pydantic.Field(default=None)
 47
 48    class Config:
 49        frozen = True
 50
 51    @classmethod
 52    def field_to_varname(cls, field):
 53        return f"__ms_{field}"
 54
 55    @pydantic.model_validator(mode="after")
 56    def validate_blending_settings(self) -> "MotionSettings":
 57        if self.min_blending_velocity and self.position_zone_radius:
 58            raise ValueError("Can't set both min_blending_velocity and blending")
 59        return self
 60
 61    def has_blending_settings(self) -> bool:
 62        return any([self.min_blending_velocity, self.position_zone_radius])
 63
 64    def has_limits_override(self) -> bool:
 65        return any(
 66            [
 67                self.tcp_velocity_limit,
 68                self.tcp_acceleration_limit,
 69                self.tcp_orientation_velocity_limit,
 70                self.tcp_orientation_acceleration_limit,
 71                self.joint_velocity_limits,
 72                self.joint_acceleration_limits,
 73            ]
 74        )
 75
 76    def as_limits_settings(self) -> wb.models.LimitsOverride:
 77        return wb.models.LimitsOverride(
 78            joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits)  # type: ignore
 79            if self.joint_velocity_limits
 80            else None,
 81            joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits)  # type: ignore
 82            if self.joint_acceleration_limits
 83            else None,
 84            tcp_velocity_limit=self.tcp_velocity_limit,
 85            tcp_acceleration_limit=self.tcp_acceleration_limit,
 86            tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit,
 87            tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit,
 88        )
 89
 90    def as_blending_setting(self) -> wb.models.MotionCommandBlending:
 91        if self.position_zone_radius:
 92            return wb.models.MotionCommandBlending(
 93                wb.models.BlendingPosition(
 94                    position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition"
 95                )
 96            )
 97        return wb.models.MotionCommandBlending(
 98            wb.models.BlendingAuto(
 99                min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto"
100            )
101        )

Settings for an action. This is closely related to the MotionCommand in the API. See planTrajectory.motion_commands for more information.

Motion settings are immutable; if you need to change a setting, create a copy and update the new object.

Attributes: min_blending_velocity: A minimum velocity for blending, in percent. Cannot be used if blending is set.

position_zone_radius:
    Defines the position zone radius.

joint_velocity_limits:
    Maximum joint velocities

joint_acceleration_limits:
    Maximum joint accelerations

tcp_velocity_limit:
    Maximum TCP velocity

tcp_acceleration_limit:
    Maximum TCP acceleration

tcp_orientation_velocity_limit:
    Maximum TCP orientation velocity

tcp_orientation_acceleration_limit:
    Maximum TCP orientation acceleration
min_blending_velocity: int | None
position_zone_radius: float | None
joint_velocity_limits: tuple[float, ...] | None
joint_acceleration_limits: tuple[float, ...] | None
tcp_velocity_limit: float | None
tcp_acceleration_limit: float | None
tcp_orientation_velocity_limit: float | None
tcp_orientation_acceleration_limit: float | None
@classmethod
def field_to_varname(cls, field):
51    @classmethod
52    def field_to_varname(cls, field):
53        return f"__ms_{field}"
@pydantic.model_validator(mode='after')
def validate_blending_settings(self) -> MotionSettings:
55    @pydantic.model_validator(mode="after")
56    def validate_blending_settings(self) -> "MotionSettings":
57        if self.min_blending_velocity and self.position_zone_radius:
58            raise ValueError("Can't set both min_blending_velocity and blending")
59        return self
def has_blending_settings(self) -> bool:
61    def has_blending_settings(self) -> bool:
62        return any([self.min_blending_velocity, self.position_zone_radius])
def has_limits_override(self) -> bool:
64    def has_limits_override(self) -> bool:
65        return any(
66            [
67                self.tcp_velocity_limit,
68                self.tcp_acceleration_limit,
69                self.tcp_orientation_velocity_limit,
70                self.tcp_orientation_acceleration_limit,
71                self.joint_velocity_limits,
72                self.joint_acceleration_limits,
73            ]
74        )
def as_limits_settings(self) -> wandelbots_api_client.models.limits_override.LimitsOverride:
76    def as_limits_settings(self) -> wb.models.LimitsOverride:
77        return wb.models.LimitsOverride(
78            joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits)  # type: ignore
79            if self.joint_velocity_limits
80            else None,
81            joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits)  # type: ignore
82            if self.joint_acceleration_limits
83            else None,
84            tcp_velocity_limit=self.tcp_velocity_limit,
85            tcp_acceleration_limit=self.tcp_acceleration_limit,
86            tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit,
87            tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit,
88        )
def as_blending_setting( self) -> wandelbots_api_client.models.motion_command_blending.MotionCommandBlending:
 90    def as_blending_setting(self) -> wb.models.MotionCommandBlending:
 91        if self.position_zone_radius:
 92            return wb.models.MotionCommandBlending(
 93                wb.models.BlendingPosition(
 94                    position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition"
 95                )
 96            )
 97        return wb.models.MotionCommandBlending(
 98            wb.models.BlendingAuto(
 99                min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto"
100            )
101        )
model_config: ClassVar[pydantic.config.ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class MotionSettings.Config:
48    class Config:
49        frozen = True
frozen = True
logger = <Logger wandelbots-nova (INFO)>
__version__ = '0.40.1'