nova

 1from nova import actions, api, types
 2from nova.core.controller import Controller
 3from nova.core.logging import logger
 4from nova.core.motion_group import MotionGroup
 5from nova.core.movement_controller import speed_up as speed_up_movement_controller
 6from nova.core.nova import Cell, Nova
 7from nova.types import MotionSettings
 8from nova.version import version
 9
10__version__ = version
11
12__all__ = [
13    "Nova",
14    "Cell",
15    "MotionGroup",
16    "Controller",
17    "speed_up_movement_controller",
18    "api",
19    "types",
20    "actions",
21    "MotionSettings",
22    "logger",
23    "__version__",
24]
class Nova:
27class Nova:
28    """A high-level Nova client for interacting with robot cells and controllers."""
29
30    _api_client: ApiGateway
31
32    def __init__(
33        self,
34        *,
35        host: str | None = None,
36        access_token: str | None = None,
37        username: str | None = None,
38        password: str | None = None,
39        version: str = "v1",
40        verify_ssl: bool = True,
41    ):
42        """
43        Initialize the Nova client.
44
45        Args:
46            host (str | None): The Nova API host.
47            access_token (str | None): An access token for the Nova API.
48            username (str | None): Username to authenticate with the Nova API.
49            password (str | None): Password to authenticate with the Nova API.
50            version (str): The API version to use (default: "v1").
51            verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
52        """
53
54        self._api_client = ApiGateway(
55            host=host,
56            access_token=access_token,
57            username=username,
58            password=password,
59            version=version,
60            verify_ssl=verify_ssl,
61        )
62
63    def cell(self, cell_id: str = CELL_NAME) -> Cell:
64        """Returns the cell object with the given ID."""
65        return Cell(self._api_client, cell_id)
66
67    async def close(self):
68        """Closes the underlying API client session."""
69        return await self._api_client.close()
70
71    async def __aenter__(self):
72        return self
73
74    async def __aexit__(self, exc_type, exc_val, exc_tb):
75        await self.close()

A high-level Nova client for interacting with robot cells and controllers.

Nova( *, host: str | None = None, access_token: str | None = None, username: str | None = None, password: str | None = None, version: str = 'v1', verify_ssl: bool = True)
32    def __init__(
33        self,
34        *,
35        host: str | None = None,
36        access_token: str | None = None,
37        username: str | None = None,
38        password: str | None = None,
39        version: str = "v1",
40        verify_ssl: bool = True,
41    ):
42        """
43        Initialize the Nova client.
44
45        Args:
46            host (str | None): The Nova API host.
47            access_token (str | None): An access token for the Nova API.
48            username (str | None): Username to authenticate with the Nova API.
49            password (str | None): Password to authenticate with the Nova API.
50            version (str): The API version to use (default: "v1").
51            verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
52        """
53
54        self._api_client = ApiGateway(
55            host=host,
56            access_token=access_token,
57            username=username,
58            password=password,
59            version=version,
60            verify_ssl=verify_ssl,
61        )

Initialize the Nova client.

Arguments:
  • host (str | None): The Nova API host.
  • access_token (str | None): An access token for the Nova API.
  • username (str | None): Username to authenticate with the Nova API.
  • password (str | None): Password to authenticate with the Nova API.
  • version (str): The API version to use (default: "v1").
  • verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
def cell(self, cell_id: str = 'cell') -> Cell:
63    def cell(self, cell_id: str = CELL_NAME) -> Cell:
64        """Returns the cell object with the given ID."""
65        return Cell(self._api_client, cell_id)

Returns the cell object with the given ID.

async def close(self):
67    async def close(self):
68        """Closes the underlying API client session."""
69        return await self._api_client.close()

Closes the underlying API client session.

class Cell:
 78class Cell:
 79    """A representation of a robot cell, providing high-level operations on controllers."""
 80
 81    def __init__(self, api_gateway: ApiGateway, cell_id: str):
 82        """
 83        Initializes a Cell instance.
 84        Args:
 85            api_gateway (ApiGateway): The underlying gateway for making API calls.
 86            cell_id (str): The unique identifier for the cell.
 87        """
 88        self._api_gateway = api_gateway
 89        self._cell_id = cell_id
 90
 91    @property
 92    def cell_id(self) -> str:
 93        """
 94        Returns unique identifier for this cell.
 95        Returns:
 96            str: The cell ID.
 97        """
 98        return self._cell_id
 99
100    def _create_controller(self, controller_id: str) -> Controller:
101        return Controller(
102            configuration=Controller.Configuration(
103                nova_api=self._api_gateway.host,
104                nova_access_token=self._api_gateway.access_token,
105                nova_username=self._api_gateway.username,
106                nova_password=self._api_gateway.password,
107                cell_id=self._cell_id,
108                controller_id=controller_id,
109                id=controller_id,
110            )
111        )
112
113    async def add_virtual_robot_controller(
114        self,
115        name: str,
116        controller_type: api.models.VirtualControllerTypes,
117        controller_manufacturer: api.models.Manufacturer,
118        timeout: int = 25,
119        position: str | None = None,
120    ) -> Controller:
121        """Add a virtual robot controller to the cell."""
122        if position is None:
123            position = str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7))
124
125        controller_instance = await self._api_gateway.add_virtual_robot_controller(
126            cell=self._cell_id,
127            name=name,
128            controller_type=controller_type,
129            controller_manufacturer=controller_manufacturer,
130            timeout=timeout,
131            position=position,
132        )
133        return self._create_controller(controller_instance.controller)
134
135    async def ensure_virtual_robot_controller(
136        self,
137        name: str,
138        controller_type: api.models.VirtualControllerTypes,
139        controller_manufacturer: api.models.Manufacturer,
140        timeout: int = 25,
141    ) -> Controller:
142        """
143        Ensure a virtual robot controller with the given name exists.
144        If the controller already exists, it is returned. Otherwise, it is created.
145        Args:
146            name (str): The name of the controller.
147            controller_type (api.models.VirtualControllerTypes): The type of virtual controller.
148            controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller.
149        Returns:
150            Controller: The existing or newly created Controller object.
151        """
152        controller_instance = await self._api_gateway.get_controller_instance(
153            cell=self.cell_id, name=name
154        )
155        if controller_instance:
156            return self._create_controller(controller_instance.controller)
157        return await self.add_virtual_robot_controller(
158            name, controller_type, controller_manufacturer, timeout=timeout
159        )
160
161    async def controllers(self) -> list[Controller]:
162        """
163        List all controllers for this cell.
164        Returns:
165            list[Controller]: A list of Controller objects associated with this cell.
166        """
167        instances = await self._api_gateway.list_controllers(cell=self._cell_id)
168        return [self._create_controller(ci.controller) for ci in instances]
169
170    async def controller(self, name: str) -> Controller:
171        """
172        Retrieve a specific controller by name.
173        Args:
174            name (str): The name of the controller.
175        Returns:
176            Controller: The Controller object.
177        Raises:
178            ControllerNotFound: If no controller with the specified name exists.
179        """
180        controller_instance = await self._api_gateway.get_controller_instance(
181            cell=self._cell_id, name=name
182        )
183        if not controller_instance:
184            raise ControllerNotFound(controller=name)
185        return self._create_controller(controller_instance.controller)
186
187    async def delete_robot_controller(self, name: str, timeout: int = 25):
188        """
189        Delete a robot controller from the cell.
190        Args:
191            name (str): The name of the controller to delete.
192            timeout (int): The time to wait for the controller deletion to complete (default: 25).
193        """
194        await self._api_gateway.delete_robot_controller(
195            cell=self._cell_id, controller=name, completion_timeout=timeout
196        )
197
198    async def get_robot_cell(self) -> RobotCell:
199        """
200        Return a RobotCell object containing all known controllers.
201        Returns:
202            RobotCell: A RobotCell initialized with the available controllers.
203        """
204        controllers = await self.controllers()
205        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})

A representation of a robot cell, providing high-level operations on controllers.

Cell(api_gateway: nova.core.gateway.ApiGateway, cell_id: str)
81    def __init__(self, api_gateway: ApiGateway, cell_id: str):
82        """
83        Initializes a Cell instance.
84        Args:
85            api_gateway (ApiGateway): The underlying gateway for making API calls.
86            cell_id (str): The unique identifier for the cell.
87        """
88        self._api_gateway = api_gateway
89        self._cell_id = cell_id

Initializes a Cell instance.

Arguments:
  • api_gateway (ApiGateway): The underlying gateway for making API calls.
  • cell_id (str): The unique identifier for the cell.
cell_id: str
91    @property
92    def cell_id(self) -> str:
93        """
94        Returns unique identifier for this cell.
95        Returns:
96            str: The cell ID.
97        """
98        return self._cell_id

Returns unique identifier for this cell.

Returns:

str: The cell ID.

async def add_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer, timeout: int = 25, position: str | None = None) -> Controller:
113    async def add_virtual_robot_controller(
114        self,
115        name: str,
116        controller_type: api.models.VirtualControllerTypes,
117        controller_manufacturer: api.models.Manufacturer,
118        timeout: int = 25,
119        position: str | None = None,
120    ) -> Controller:
121        """Add a virtual robot controller to the cell."""
122        if position is None:
123            position = str(MANUFACTURER_HOME_POSITIONS.get(controller_manufacturer, [0.0] * 7))
124
125        controller_instance = await self._api_gateway.add_virtual_robot_controller(
126            cell=self._cell_id,
127            name=name,
128            controller_type=controller_type,
129            controller_manufacturer=controller_manufacturer,
130            timeout=timeout,
131            position=position,
132        )
133        return self._create_controller(controller_instance.controller)

Add a virtual robot controller to the cell.

async def ensure_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer, timeout: int = 25) -> Controller:
135    async def ensure_virtual_robot_controller(
136        self,
137        name: str,
138        controller_type: api.models.VirtualControllerTypes,
139        controller_manufacturer: api.models.Manufacturer,
140        timeout: int = 25,
141    ) -> Controller:
142        """
143        Ensure a virtual robot controller with the given name exists.
144        If the controller already exists, it is returned. Otherwise, it is created.
145        Args:
146            name (str): The name of the controller.
147            controller_type (api.models.VirtualControllerTypes): The type of virtual controller.
148            controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller.
149        Returns:
150            Controller: The existing or newly created Controller object.
151        """
152        controller_instance = await self._api_gateway.get_controller_instance(
153            cell=self.cell_id, name=name
154        )
155        if controller_instance:
156            return self._create_controller(controller_instance.controller)
157        return await self.add_virtual_robot_controller(
158            name, controller_type, controller_manufacturer, timeout=timeout
159        )

Ensure a virtual robot controller with the given name exists. If the controller already exists, it is returned. Otherwise, it is created.

Arguments:
  • name (str): The name of the controller.
  • controller_type (api.models.VirtualControllerTypes): The type of virtual controller.
  • controller_manufacturer (api.models.Manufacturer): The manufacturer of the controller.
Returns:

Controller: The existing or newly created Controller object.

async def controllers(self) -> list[Controller]:
161    async def controllers(self) -> list[Controller]:
162        """
163        List all controllers for this cell.
164        Returns:
165            list[Controller]: A list of Controller objects associated with this cell.
166        """
167        instances = await self._api_gateway.list_controllers(cell=self._cell_id)
168        return [self._create_controller(ci.controller) for ci in instances]

List all controllers for this cell.

Returns:

list[Controller]: A list of Controller objects associated with this cell.

async def controller(self, name: str) -> Controller:
170    async def controller(self, name: str) -> Controller:
171        """
172        Retrieve a specific controller by name.
173        Args:
174            name (str): The name of the controller.
175        Returns:
176            Controller: The Controller object.
177        Raises:
178            ControllerNotFound: If no controller with the specified name exists.
179        """
180        controller_instance = await self._api_gateway.get_controller_instance(
181            cell=self._cell_id, name=name
182        )
183        if not controller_instance:
184            raise ControllerNotFound(controller=name)
185        return self._create_controller(controller_instance.controller)

Retrieve a specific controller by name.

Arguments:
  • name (str): The name of the controller.
Returns:

Controller: The Controller object.

Raises:
  • ControllerNotFound: If no controller with the specified name exists.
async def delete_robot_controller(self, name: str, timeout: int = 25):
187    async def delete_robot_controller(self, name: str, timeout: int = 25):
188        """
189        Delete a robot controller from the cell.
190        Args:
191            name (str): The name of the controller to delete.
192            timeout (int): The time to wait for the controller deletion to complete (default: 25).
193        """
194        await self._api_gateway.delete_robot_controller(
195            cell=self._cell_id, controller=name, completion_timeout=timeout
196        )

Delete a robot controller from the cell.

Arguments:
  • name (str): The name of the controller to delete.
  • timeout (int): The time to wait for the controller deletion to complete (default: 25).
async def get_robot_cell(self) -> nova.core.robot_cell.RobotCell:
198    async def get_robot_cell(self) -> RobotCell:
199        """
200        Return a RobotCell object containing all known controllers.
201        Returns:
202            RobotCell: A RobotCell initialized with the available controllers.
203        """
204        controllers = await self.controllers()
205        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})

Return a RobotCell object containing all known controllers.

Returns:

RobotCell: A RobotCell initialized with the available controllers.

class MotionGroup(nova.core.robot_cell.AbstractRobot):
 81class MotionGroup(AbstractRobot):
 82    """Manages motion planning and execution within a specified motion group."""
 83
 84    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
 85        """
 86        Initializes a new MotionGroup instance.
 87
 88        Args:
 89            api_gateway (ApiGateway): The API gateway through which motion commands are sent.
 90            cell (str): The name or identifier of the robotic cell.
 91            motion_group_id (str): The identifier of the motion group.
 92        """
 93        self._api_gateway = api_gateway
 94        self._cell = cell
 95        self._motion_group_id = motion_group_id
 96        self._current_motion: str | None = None
 97        self._optimizer_setup: wb.models.OptimizerSetup | None = None
 98        super().__init__(id=motion_group_id)
 99
100    async def open(self):
101        await self._api_gateway.activate_motion_group(
102            cell=self._cell, motion_group_id=self._motion_group_id
103        )
104        return self
105
106    async def close(self):
107        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
108        #           this behaviour is not desired in some cases,
109        #           so for now we will not deactivate for the user
110        pass
111
112    @property
113    def motion_group_id(self) -> str:
114        """
115        Returns:
116            str: The unique identifier for this motion group.
117        """
118        return self._motion_group_id
119
120    @property
121    def current_motion(self) -> str | None:
122        # if not self._current_motion:
123        #    raise ValueError("No MotionId attached. There is no planned motion available.")
124        return self._current_motion
125
126    async def _plan_with_collision_check(
127        self,
128        actions: list[Action],
129        tcp: str,
130        start_joint_position: tuple[float, ...] | None = None,
131        optimizer_setup: wb.models.OptimizerSetup | None = None,
132    ) -> wb.models.JointTrajectory:
133        """
134        This method plans a trajectory and checks for collisions.
135        The collision check only happens if the actions have collision scene data.
136
137        You must provide the exact same collision data into all the actions.
138        Because the underlying API supports collision checks for the whole trajectory only.
139
140        Raises:
141            InconsistentCollisionScenes: If the collision scene data is not consistent across all actions
142
143            Your actions should follow below rules to be considered consistent:
144            1- They all should have the same collision scene data
145            2- They all should have no collision data
146
147            PlanTrajectoryFailed: If the trajectory planning failed including the collision check
148
149        For more information about this API, please refer to the plan_trajectory in the API documentation.
150
151        Args:
152            actions: list of actions to plan, current supported actions are Motion and WriteActions
153                     WriteAction you specify on your path is handled in a performant way.
154                     Please check execute_trajectory.motion_command.set_io for more information.
155            tcp:     The tool to use
156            start_joint_position: The starting joint position, if none provided, current position of the robot is used
157            optimizer_setup: The optimizer setup
158
159        Returns: planned joint trajectory
160
161        """
162        # PREPARE THE REQUEST
163        collision_scenes = self._validate_collision_scenes(actions)
164        start_joint_position = start_joint_position or await self.joints()
165        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
166
167        motion_commands = CombinedActions(items=tuple(actions)).to_motion_command()  # type: ignore
168
169        static_colliders = None
170        collision_motion_group = None
171        if collision_scenes and len(collision_scenes) > 0:
172            static_colliders = collision_scenes[0].colliders
173
174            motion_group_type = robot_setup.motion_group_type
175            if (
176                collision_scenes[0].motion_groups
177                and motion_group_type in collision_scenes[0].motion_groups
178            ):
179                collision_motion_group = collision_scenes[0].motion_groups[motion_group_type]
180
181        request = wb.models.PlanTrajectoryRequest(
182            robot_setup=robot_setup,
183            start_joint_position=list(start_joint_position),
184            motion_commands=motion_commands,
185            static_colliders=static_colliders,
186            collision_motion_group=collision_motion_group,
187        )
188
189        return await self._api_gateway.plan_trajectory(
190            cell=self._cell, motion_group_id=self.motion_group_id, request=request
191        )
192
193    def _validate_collision_scenes(self, actions: list[Action]) -> list[models.CollisionScene]:
194        motion_count = len([action for action in actions if isinstance(action, Motion)])
195        collision_scenes = [
196            action.collision_scene
197            for action in actions
198            if isinstance(action, CollisionFreeMotion) and action.collision_scene is not None
199        ]
200
201        if len(collision_scenes) != 0 and len(collision_scenes) != motion_count:
202            raise InconsistentCollisionScenes(
203                "Only some of the actions have collision scene. Either specify it for all or none."
204            )
205
206        # If a collision scene is provided, the same should be provided for all the collision scene
207        if len(collision_scenes) > 1:
208            first_scene = collision_scenes[0]
209            if not all(
210                compare_collision_scenes(first_scene, scene) for scene in collision_scenes[1:]
211            ):
212                raise InconsistentCollisionScenes(
213                    "All actions must use the same collision scene but some are different"
214                )
215
216        return collision_scenes
217
218    # TODO: we get the optimizer setup from as an input because
219    #  it has a velocity setting which is used in collision free movement, I need to double check this
220    async def _plan_collision_free(
221        self,
222        action: CollisionFreeMotion,
223        tcp: str,
224        start_joint_position: list[float],
225        optimizer_setup: wb.models.OptimizerSetup | None = None,
226    ) -> wb.models.JointTrajectory:
227        """
228        This method plans a trajectory and avoids collisions.
229        This means if there is a collision along the way to the target pose or joint positions,
230        It will adjust the trajectory to avoid the collision.
231
232        The collision check only happens if the action have collision scene data.
233
234        For more information about this API, please refer to the plan_collision_free_ptp in the API documentation.
235
236        Args:
237            action: The target pose or joint positions to reach
238            tcp:     The tool to use
239            start_joint_position: The starting joint position, if none provided, current position of the robot is used
240            optimizer_setup: The optimizer setup
241
242        Returns: planned joint trajectory
243
244
245        """
246        target = wb.models.PlanCollisionFreePTPRequestTarget(
247            **action.model_dump(exclude_unset=True)
248        )
249        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
250
251        static_colliders = None
252        collision_motion_group = None
253        collision_scene = action.collision_scene
254        if collision_scene and collision_scene.colliders:
255            static_colliders = collision_scene.colliders
256
257            if (
258                collision_scene.motion_groups
259                and robot_setup.motion_group_type in collision_scene.motion_groups
260            ):
261                collision_motion_group = collision_scene.motion_groups[
262                    robot_setup.motion_group_type
263                ]
264
265        request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest(
266            robot_setup=robot_setup,
267            start_joint_position=start_joint_position,
268            target=target,
269            static_colliders=static_colliders,
270            collision_motion_group=collision_motion_group,
271        )
272
273        return await self._api_gateway.plan_collision_free_ptp(
274            cell=self._cell, motion_group_id=self.motion_group_id, request=request
275        )
276
277    async def _plan(
278        self,
279        actions: list[Action],
280        tcp: str,
281        start_joint_position: tuple[float, ...] | None = None,
282        optimizer_setup: wb.models.OptimizerSetup | None = None,
283    ) -> wb.models.JointTrajectory:
284        if not actions:
285            raise ValueError("No actions provided")
286
287        current_joints = start_joint_position or await self.joints()
288        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
289
290        all_trajectories = []
291        for batch in split_actions_into_batches(actions):
292            if len(batch) == 0:
293                raise ValueError("Empty batch of actions")
294
295            if isinstance(batch[0], CollisionFreeMotion):
296                motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0])
297                trajectory = await self._plan_collision_free(
298                    action=motion,
299                    tcp=tcp,
300                    start_joint_position=list(current_joints),
301                    optimizer_setup=robot_setup,
302                )
303                all_trajectories.append(trajectory)
304                # the last joint position of this trajectory is the starting point for the next one
305                current_joints = tuple(trajectory.joint_positions[-1].joints)
306            else:
307                trajectory = await self._plan_with_collision_check(
308                    actions=batch,
309                    tcp=tcp,
310                    start_joint_position=current_joints,
311                    optimizer_setup=robot_setup,
312                )
313                all_trajectories.append(trajectory)
314                # the last joint position of this trajectory is the starting point for the next one
315                current_joints = tuple(trajectory.joint_positions[-1].joints)
316
317        return combine_trajectories(all_trajectories)
318
319    # TODO: refactor and simplify code, tests are already there
320    # TODO: split into batches when the collision scene changes in a batch of collision free motions
321
322    async def _execute(
323        self,
324        joint_trajectory: wb.models.JointTrajectory,
325        tcp: str,
326        actions: list[Action],
327        movement_controller: MovementController | None,
328    ) -> AsyncIterable[MovementResponse]:
329        if movement_controller is None:
330            movement_controller = move_forward
331
332        # Load planned trajectory
333        load_plan_response = await self._load_planned_motion(joint_trajectory, tcp)
334
335        # Move to start position
336        number_of_joints = await self._api_gateway.get_joint_number(
337            cell=self._cell, motion_group_id=self.motion_group_id
338        )
339        joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints
340        movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response)
341
342        # If there's an initial consumer, feed it the data
343        async for move_to_response in movement_stream:
344            # TODO: refactor
345            if (
346                move_to_response.state is None
347                or move_to_response.state.motion_groups is None
348                or len(move_to_response.state.motion_groups) == 0
349                or move_to_response.move_response is None
350                or move_to_response.move_response.current_location_on_trajectory is None
351            ):
352                continue
353
354            yield move_to_response
355
356        controller = movement_controller(
357            MovementControllerContext(
358                combined_actions=CombinedActions(items=tuple(actions)),  # type: ignore
359                motion_id=load_plan_response.motion,
360            )
361        )
362
363        def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool:
364            instance = response.actual_instance
365            # Stop when standstill indicates motion ended
366            return (
367                isinstance(instance, wb.models.Standstill)
368                and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED
369            )
370
371        execute_response_streaming_controller = StreamExtractor(controller, stop_condition)
372        execution_task = asyncio.create_task(
373            self._api_gateway.motion_api.execute_trajectory(
374                cell=self._cell, client_request_generator=execute_response_streaming_controller
375            )
376        )
377        async for execute_response in execute_response_streaming_controller:
378            yield execute_response
379        await execution_task
380
381    async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup:
382        # TODO: mypy failed on main branch, need to check
383        if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp:  # type: ignore
384            self._optimizer_setup = await self._api_gateway.get_optimizer_config(
385                cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
386            )
387
388        return self._optimizer_setup
389
390    async def _load_planned_motion(
391        self, joint_trajectory: wb.models.JointTrajectory, tcp: str
392    ) -> wb.models.PlanSuccessfulResponse:
393        return await self._api_gateway.load_planned_motion(
394            cell=self._cell,
395            motion_group_id=self.motion_group_id,
396            joint_trajectory=joint_trajectory,
397            tcp=tcp,
398        )
399
400    async def move_to_start_position(
401        self, joint_velocities, load_plan_response: LoadPlanResponse
402    ) -> InitialMovementStream:
403        limit_override = wb.models.LimitsOverride()
404        if joint_velocities is not None:
405            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
406
407        return self._api_gateway.stream_move_to_trajectory_via_join_ptp(
408            cell=self._cell,
409            motion_id=load_plan_response.motion,
410            location_on_trajectory=0,
411            joint_velocity_limits=limit_override.joint_velocity_limits,
412        )
413
414    async def stop(self):
415        logger.debug(f"Stopping motion of {self}...")
416        try:
417            if self._current_motion is None:
418                raise ValueError("No motion to stop")
419            await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion)
420            logger.debug(f"Motion {self.current_motion} stopped.")
421        except ValueError as e:
422            logger.debug(f"No motion to stop for {self}: {e}")
423
424    async def get_state(self, tcp: str | None = None) -> RobotState:
425        response = await self._api_gateway.get_motion_group_state(
426            cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
427        )
428        return RobotState(
429            pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints)
430        )
431
432    async def joints(self) -> tuple:
433        state = await self.get_state()
434        if state.joints is None:
435            raise ValueError(
436                f"No joint positions available for motion group {self._motion_group_id}"
437            )
438        return state.joints
439
440    async def tcp_pose(self, tcp: str | None = None) -> Pose:
441        state = await self.get_state(tcp=tcp)
442        return state.pose
443
444    async def tcps(self) -> list[wb.models.RobotTcp]:
445        response = await self._api_gateway.list_tcps(
446            cell=self._cell, motion_group_id=self.motion_group_id
447        )
448        return response.tcps if response.tcps else []
449
450    async def tcp_names(self) -> list[str]:
451        return [tcp.id for tcp in await self.tcps()]
452
453    async def active_tcp(self) -> wb.models.RobotTcp:
454        active_tcp = await self._api_gateway.get_active_tcp(
455            cell=self._cell, motion_group_id=self.motion_group_id
456        )
457        return active_tcp
458
459    async def active_tcp_name(self) -> str:
460        active_tcp = await self.active_tcp()
461        return active_tcp.id

Manages motion planning and execution within a specified motion group.

MotionGroup( api_gateway: nova.core.gateway.ApiGateway, cell: str, motion_group_id: str)
84    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
85        """
86        Initializes a new MotionGroup instance.
87
88        Args:
89            api_gateway (ApiGateway): The API gateway through which motion commands are sent.
90            cell (str): The name or identifier of the robotic cell.
91            motion_group_id (str): The identifier of the motion group.
92        """
93        self._api_gateway = api_gateway
94        self._cell = cell
95        self._motion_group_id = motion_group_id
96        self._current_motion: str | None = None
97        self._optimizer_setup: wb.models.OptimizerSetup | None = None
98        super().__init__(id=motion_group_id)

Initializes a new MotionGroup instance.

Arguments:
  • api_gateway (ApiGateway): The API gateway through which motion commands are sent.
  • cell (str): The name or identifier of the robotic cell.
  • motion_group_id (str): The identifier of the motion group.
async def open(self):
100    async def open(self):
101        await self._api_gateway.activate_motion_group(
102            cell=self._cell, motion_group_id=self._motion_group_id
103        )
104        return self

Allocates the external hardware resource (i.e. establish a connection)

async def close(self):
106    async def close(self):
107        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
108        #           this behaviour is not desired in some cases,
109        #           so for now we will not deactivate for the user
110        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

motion_group_id: str
112    @property
113    def motion_group_id(self) -> str:
114        """
115        Returns:
116            str: The unique identifier for this motion group.
117        """
118        return self._motion_group_id

Returns: str: The unique identifier for this motion group.

current_motion: str | None
120    @property
121    def current_motion(self) -> str | None:
122        # if not self._current_motion:
123        #    raise ValueError("No MotionId attached. There is no planned motion available.")
124        return self._current_motion
async def move_to_start_position( self, joint_velocities, load_plan_response: wandelbots_api_client.models.plan_successful_response.PlanSuccessfulResponse) -> AsyncIterator[wandelbots_api_client.models.stream_move_response.StreamMoveResponse]:
400    async def move_to_start_position(
401        self, joint_velocities, load_plan_response: LoadPlanResponse
402    ) -> InitialMovementStream:
403        limit_override = wb.models.LimitsOverride()
404        if joint_velocities is not None:
405            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
406
407        return self._api_gateway.stream_move_to_trajectory_via_join_ptp(
408            cell=self._cell,
409            motion_id=load_plan_response.motion,
410            location_on_trajectory=0,
411            joint_velocity_limits=limit_override.joint_velocity_limits,
412        )
async def stop(self):
414    async def stop(self):
415        logger.debug(f"Stopping motion of {self}...")
416        try:
417            if self._current_motion is None:
418                raise ValueError("No motion to stop")
419            await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion)
420            logger.debug(f"Motion {self.current_motion} stopped.")
421        except ValueError as e:
422            logger.debug(f"No motion to stop for {self}: {e}")

Stop behaviour of the robot

async def get_state(self, tcp: str | None = None) -> nova.types.RobotState:
424    async def get_state(self, tcp: str | None = None) -> RobotState:
425        response = await self._api_gateway.get_motion_group_state(
426            cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
427        )
428        return RobotState(
429            pose=Pose(response.state.tcp_pose), joints=tuple(response.state.joint_position.joints)
430        )

Current state (pose, joints) of the robot based on the tcp.

Arguments:
  • tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.

Returns: the current state of the robot

async def joints(self) -> tuple:
432    async def joints(self) -> tuple:
433        state = await self.get_state()
434        if state.joints is None:
435            raise ValueError(
436                f"No joint positions available for motion group {self._motion_group_id}"
437            )
438        return state.joints

Return the current joint positions of the robot

Returns: the current joint positions

async def tcp_pose(self, tcp: str | None = None) -> nova.types.Pose:
440    async def tcp_pose(self, tcp: str | None = None) -> Pose:
441        state = await self.get_state(tcp=tcp)
442        return state.pose

Return the current pose of the robot based on the tcp

Arguments:
  • tcp (str): The id of the tool center point (TCP) to be used for tcp_pose in response. If not set, the flange pose is returned as tcp_pose.

Returns: the current pose of the robot

async def tcps(self) -> list[wandelbots_api_client.models.robot_tcp.RobotTcp]:
444    async def tcps(self) -> list[wb.models.RobotTcp]:
445        response = await self._api_gateway.list_tcps(
446            cell=self._cell, motion_group_id=self.motion_group_id
447        )
448        return response.tcps if response.tcps else []

Return all TCPs that are configured on the robot with corresponding offset from flange as pose

Returns: the TCPs of the robot

async def tcp_names(self) -> list[str]:
450    async def tcp_names(self) -> list[str]:
451        return [tcp.id for tcp in await self.tcps()]

Return the name of all TCPs that are configured on the robot

Returns: a list of all TCPs

async def active_tcp(self) -> wandelbots_api_client.models.robot_tcp.RobotTcp:
453    async def active_tcp(self) -> wb.models.RobotTcp:
454        active_tcp = await self._api_gateway.get_active_tcp(
455            cell=self._cell, motion_group_id=self.motion_group_id
456        )
457        return active_tcp

Return the active TCP of the robot

Returns: the active TCP

async def active_tcp_name(self) -> str:
459    async def active_tcp_name(self) -> str:
460        active_tcp = await self.active_tcp()
461        return active_tcp.id

Return the name of the active TCP of the robot

Returns: the name of the active TCP

class Controller(typing.Sized, nova.core.robot_cell.AbstractController, nova.core.gateway.NovaDevice, nova.core.robot_cell.IODevice):
 12class Controller(Sized, AbstractController, NovaDevice, IODevice):
 13    """
 14    Represents a Nova controller, managing motion groups and IO interactions.
 15    """
 16
 17    class Configuration(NovaDevice.Configuration):
 18        type: Literal["controller"] = "controller"
 19        id: str = "controller"
 20        cell_id: str
 21        controller_id: str
 22
 23    def __init__(self, configuration: Configuration):
 24        super().__init__(configuration)
 25        self._activated_motion_group_ids: list[str] = []
 26        self._io_access = IOAccess(
 27            api_gateway=self._nova_api,
 28            cell=self.configuration.cell_id,
 29            controller_id=self.configuration.controller_id,
 30        )
 31
 32    @property
 33    def controller_id(self) -> str:
 34        """Returns the unique controller ID."""
 35        return self.configuration.controller_id
 36
 37    async def open(self):
 38        """Activates all motion groups."""
 39        motion_group_ids = await self.activated_motion_group_ids()
 40        self._activated_motion_group_ids = motion_group_ids
 41        logger.info(f"Found motion group {motion_group_ids}")
 42        return self
 43
 44    async def close(self):
 45        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
 46        #           this behaviour is not desired in some cases,
 47        #           so for now we will not deactivate for the user
 48        pass
 49
 50    def __len__(self) -> int:
 51        return len(self._activated_motion_group_ids)
 52
 53    def motion_group(self, motion_group_id: str) -> MotionGroup:
 54        """Returns motion group with specific id.
 55
 56        Args:
 57            motion_group_id (str): The ID of the motion group.
 58
 59        Returns:
 60            MotionGroup: A MotionGroup instance corresponding to the given ID.
 61        """
 62        return MotionGroup(
 63            api_gateway=self._nova_api,
 64            cell=self.configuration.cell_id,
 65            motion_group_id=motion_group_id,
 66        )
 67
 68    def __getitem__(self, motion_group_id: int) -> MotionGroup:
 69        return self.motion_group(f"{motion_group_id}@{self.configuration.controller_id}")
 70
 71    async def activated_motion_group_ids(self) -> list[str]:
 72        """Activates and retrieves the list of motion group IDs available on this controller.
 73
 74        The system automatically activates all motion groups on the associated controller.
 75
 76        Returns:
 77            list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
 78        """
 79        return await self._nova_api.activate_all_motion_groups(
 80            cell=self.configuration.cell_id, controller=self.configuration.controller_id
 81        )
 82
 83    async def activated_motion_groups(self) -> list[MotionGroup]:
 84        """Retrieves a list of `MotionGroup` instances for all activated motion groups.
 85
 86        Returns:
 87            list[MotionGroup]: All activated motion groups as `MotionGroup` objects.
 88        """
 89        motion_group_ids = await self.activated_motion_group_ids()
 90        return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]
 91
 92    def get_robots(self) -> dict[str, AbstractRobot]:
 93        """Retrieves a dictionary of motion group IDs to their corresponding robots.
 94
 95        Note:
 96            This method interprets motion group IDs to create corresponding `MotionGroup`
 97            objects and returns them as `AbstractRobot` references.
 98
 99        Returns:
100            dict[str, AbstractRobot]: A mapping of motion group ID to `MotionGroup` instance.
101        """
102        return {
103            motion_group_id: self.motion_group(motion_group_id)
104            for motion_group_id in self._activated_motion_group_ids
105        }
106
107    async def read(self, key: str) -> ValueType:
108        """Reads an IO value from the controller.
109
110        Args:
111            key (str): The IO key name.
112
113        Returns:
114            ValueType: The value associated with the specified IO key.
115        """
116        return await self._io_access.read(key)
117
118    async def write(self, key: str, value: ValueType) -> None:
119        """Writes an IO value to the controller.
120
121        Args:
122            key (str): The IO key name.
123            value (ValueType): The value to write to the specified IO key.
124        """
125        return await self._io_access.write(key, value)
126
127    async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]:
128        async for state in self._nova_api.stream_robot_controller_state(
129            cell=self.configuration.cell_id,
130            controller_id=self.configuration.controller_id,
131            response_rate=rate_msecs,
132        ):
133            yield state

Represents a Nova controller, managing motion groups and IO interactions.

controller_id: str
32    @property
33    def controller_id(self) -> str:
34        """Returns the unique controller ID."""
35        return self.configuration.controller_id

Returns the unique controller ID.

async def open(self):
37    async def open(self):
38        """Activates all motion groups."""
39        motion_group_ids = await self.activated_motion_group_ids()
40        self._activated_motion_group_ids = motion_group_ids
41        logger.info(f"Found motion group {motion_group_ids}")
42        return self

Activates all motion groups.

async def close(self):
44    async def close(self):
45        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
46        #           this behaviour is not desired in some cases,
47        #           so for now we will not deactivate for the user
48        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

def motion_group(self, motion_group_id: str) -> MotionGroup:
53    def motion_group(self, motion_group_id: str) -> MotionGroup:
54        """Returns motion group with specific id.
55
56        Args:
57            motion_group_id (str): The ID of the motion group.
58
59        Returns:
60            MotionGroup: A MotionGroup instance corresponding to the given ID.
61        """
62        return MotionGroup(
63            api_gateway=self._nova_api,
64            cell=self.configuration.cell_id,
65            motion_group_id=motion_group_id,
66        )

Returns motion group with specific id.

Arguments:
  • motion_group_id (str): The ID of the motion group.
Returns:

MotionGroup: A MotionGroup instance corresponding to the given ID.

async def activated_motion_group_ids(self) -> list[str]:
71    async def activated_motion_group_ids(self) -> list[str]:
72        """Activates and retrieves the list of motion group IDs available on this controller.
73
74        The system automatically activates all motion groups on the associated controller.
75
76        Returns:
77            list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
78        """
79        return await self._nova_api.activate_all_motion_groups(
80            cell=self.configuration.cell_id, controller=self.configuration.controller_id
81        )

Activates and retrieves the list of motion group IDs available on this controller.

The system automatically activates all motion groups on the associated controller.

Returns:

list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).

async def activated_motion_groups(self) -> list[MotionGroup]:
83    async def activated_motion_groups(self) -> list[MotionGroup]:
84        """Retrieves a list of `MotionGroup` instances for all activated motion groups.
85
86        Returns:
87            list[MotionGroup]: All activated motion groups as `MotionGroup` objects.
88        """
89        motion_group_ids = await self.activated_motion_group_ids()
90        return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]

Retrieves a list of MotionGroup instances for all activated motion groups.

Returns:

list[MotionGroup]: All activated motion groups as MotionGroup objects.

async def read(self, key: str) -> Union[int, str, bool, float, nova.types.Pose]:
107    async def read(self, key: str) -> ValueType:
108        """Reads an IO value from the controller.
109
110        Args:
111            key (str): The IO key name.
112
113        Returns:
114            ValueType: The value associated with the specified IO key.
115        """
116        return await self._io_access.read(key)

Reads an IO value from the controller.

Arguments:
  • key (str): The IO key name.
Returns:

ValueType: The value associated with the specified IO key.

async def write( self, key: str, value: Union[int, str, bool, float, nova.types.Pose]) -> None:
118    async def write(self, key: str, value: ValueType) -> None:
119        """Writes an IO value to the controller.
120
121        Args:
122            key (str): The IO key name.
123            value (ValueType): The value to write to the specified IO key.
124        """
125        return await self._io_access.write(key, value)

Writes an IO value to the controller.

Arguments:
  • key (str): The IO key name.
  • value (ValueType): The value to write to the specified IO key.
async def stream_state( self, rate_msecs) -> AsyncGenerator[wandelbots_api_client.models.robot_controller_state.RobotControllerState, NoneType]:
127    async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]:
128        async for state in self._nova_api.stream_robot_controller_state(
129            cell=self.configuration.cell_id,
130            controller_id=self.configuration.controller_id,
131            response_rate=rate_msecs,
132        ):
133            yield state
def speed_up_movement_controller( context: nova.actions.MovementControllerContext, on_movement: Callable[[nova.types.MotionState | None], NoneType]) -> Callable[[AsyncIterator[wandelbots_api_client.models.execute_trajectory_response.ExecuteTrajectoryResponse]], AsyncIterator[wandelbots_api_client.models.execute_trajectory_request.ExecuteTrajectoryRequest]]:
113def speed_up(
114    context: MovementControllerContext, on_movement: Callable[[MotionState | None], None]
115) -> MovementControllerFunction:
116    async def movement_controller(
117        response_stream: ExecuteTrajectoryResponseStream,
118    ) -> ExecuteTrajectoryRequestStream:
119        # The first request is to initialize the movement
120        yield wb.models.InitializeMovementRequest(trajectory=context.motion_id, initial_location=0)  # type: ignore
121
122        # then we get the response
123        initialize_movement_response = await anext(response_stream)
124        if isinstance(
125            initialize_movement_response.actual_instance, wb.models.InitializeMovementResponse
126        ):
127            r1 = initialize_movement_response.actual_instance
128            if not r1.init_response.succeeded:
129                raise InitMovementFailed(r1.init_response)
130
131        # The second request is to start the movement
132        set_io_list = context.combined_actions.to_set_io()
133        yield wb.models.StartMovementRequest(
134            set_ios=set_io_list, start_on_io=None, pause_on_io=None
135        )  # type: ignore
136
137        counter = 0
138        latest_speed = 10
139        # then we wait until the movement is finished
140        async for execute_trajectory_response in response_stream:
141            counter += 1
142            instance = execute_trajectory_response.actual_instance
143            # Send the current location to the consume
144            if isinstance(instance, wb.models.Movement):
145                motion_state = movement_to_motion_state(instance)
146                if motion_state:
147                    on_movement(motion_state)
148
149            # Terminate the generator
150            if isinstance(instance, wb.models.Standstill):
151                if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
152                    on_movement(None)
153                    return
154
155            if isinstance(instance, wb.models.PlaybackSpeedResponse):
156                playback_speed = instance.playback_speed_response
157                logger.info(f"Current playback speed: {playback_speed}")
158
159            if counter % 10 == 0:
160                yield wb.models.ExecuteTrajectoryRequest(
161                    wb.models.PlaybackSpeedRequest(playback_speed_in_percent=latest_speed)
162                )
163                counter = 0
164                latest_speed += 5
165                if latest_speed > 100:
166                    latest_speed = 100
167
168    return movement_controller

The type of the None singleton.

class MotionSettings(pydantic.main.BaseModel):
  6class MotionSettings(pydantic.BaseModel):
  7    """
  8    Settings for an action. This is closely related to the `MotionCommand` in the API.
  9    See planTrajectory.motion_commands for more information.
 10
 11    Motion settings are immutable; if you need to change a setting, create a copy and update the new object.
 12
 13    Attributes:
 14        min_blending_velocity:
 15            A minimum velocity for blending, in percent. Cannot be used if `blending` is set.
 16
 17        position_zone_radius:
 18            Defines the position zone radius.
 19
 20        joint_velocity_limits:
 21            Maximum joint velocities
 22
 23        joint_acceleration_limits:
 24            Maximum joint accelerations
 25
 26        tcp_velocity_limit:
 27            Maximum TCP velocity
 28
 29        tcp_acceleration_limit:
 30            Maximum TCP acceleration
 31
 32        tcp_orientation_velocity_limit:
 33            Maximum TCP orientation velocity
 34
 35        tcp_orientation_acceleration_limit:
 36            Maximum TCP orientation acceleration
 37    """
 38
 39    min_blending_velocity: int | None = pydantic.Field(default=None)
 40    position_zone_radius: float | None = pydantic.Field(default=None)
 41    joint_velocity_limits: tuple[float, ...] | None = pydantic.Field(default=None)
 42    joint_acceleration_limits: tuple[float, ...] | None = pydantic.Field(default=None)
 43    tcp_velocity_limit: float | None = pydantic.Field(default=50)
 44    tcp_acceleration_limit: float | None = pydantic.Field(default=None)
 45    tcp_orientation_velocity_limit: float | None = pydantic.Field(default=None)
 46    tcp_orientation_acceleration_limit: float | None = pydantic.Field(default=None)
 47
 48    class Config:
 49        frozen = True
 50
 51    @classmethod
 52    def field_to_varname(cls, field):
 53        return f"__ms_{field}"
 54
 55    @pydantic.model_validator(mode="after")
 56    def validate_blending_settings(self) -> "MotionSettings":
 57        if self.min_blending_velocity and self.position_zone_radius:
 58            raise ValueError("Can't set both min_blending_velocity and blending")
 59        return self
 60
 61    def has_blending_settings(self) -> bool:
 62        return any([self.min_blending_velocity, self.position_zone_radius])
 63
 64    def has_limits_override(self) -> bool:
 65        return any(
 66            [
 67                self.tcp_velocity_limit,
 68                self.tcp_acceleration_limit,
 69                self.tcp_orientation_velocity_limit,
 70                self.tcp_orientation_acceleration_limit,
 71                self.joint_velocity_limits,
 72                self.joint_acceleration_limits,
 73            ]
 74        )
 75
 76    def as_limits_settings(self) -> wb.models.LimitsOverride:
 77        return wb.models.LimitsOverride(
 78            joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits)  # type: ignore
 79            if self.joint_velocity_limits
 80            else None,
 81            joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits)  # type: ignore
 82            if self.joint_acceleration_limits
 83            else None,
 84            tcp_velocity_limit=self.tcp_velocity_limit,
 85            tcp_acceleration_limit=self.tcp_acceleration_limit,
 86            tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit,
 87            tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit,
 88        )
 89
 90    def as_blending_setting(self) -> wb.models.MotionCommandBlending:
 91        if self.position_zone_radius:
 92            return wb.models.MotionCommandBlending(
 93                wb.models.BlendingPosition(
 94                    position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition"
 95                )
 96            )
 97        return wb.models.MotionCommandBlending(
 98            wb.models.BlendingAuto(
 99                min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto"
100            )
101        )

Settings for an action. This is closely related to the MotionCommand in the API. See planTrajectory.motion_commands for more information.

Motion settings are immutable; if you need to change a setting, create a copy and update the new object.

Attributes:
  • min_blending_velocity: A minimum velocity for blending, in percent. Cannot be used if blending is set.
  • position_zone_radius: Defines the position zone radius.
  • joint_velocity_limits: Maximum joint velocities
  • joint_acceleration_limits: Maximum joint accelerations
  • tcp_velocity_limit: Maximum TCP velocity
  • tcp_acceleration_limit: Maximum TCP acceleration
  • tcp_orientation_velocity_limit: Maximum TCP orientation velocity
  • tcp_orientation_acceleration_limit: Maximum TCP orientation acceleration
min_blending_velocity: int | None
position_zone_radius: float | None
joint_velocity_limits: tuple[float, ...] | None
joint_acceleration_limits: tuple[float, ...] | None
tcp_velocity_limit: float | None
tcp_acceleration_limit: float | None
tcp_orientation_velocity_limit: float | None
tcp_orientation_acceleration_limit: float | None
@classmethod
def field_to_varname(cls, field):
51    @classmethod
52    def field_to_varname(cls, field):
53        return f"__ms_{field}"
@pydantic.model_validator(mode='after')
def validate_blending_settings(self) -> MotionSettings:
55    @pydantic.model_validator(mode="after")
56    def validate_blending_settings(self) -> "MotionSettings":
57        if self.min_blending_velocity and self.position_zone_radius:
58            raise ValueError("Can't set both min_blending_velocity and blending")
59        return self
def has_blending_settings(self) -> bool:
61    def has_blending_settings(self) -> bool:
62        return any([self.min_blending_velocity, self.position_zone_radius])
def has_limits_override(self) -> bool:
64    def has_limits_override(self) -> bool:
65        return any(
66            [
67                self.tcp_velocity_limit,
68                self.tcp_acceleration_limit,
69                self.tcp_orientation_velocity_limit,
70                self.tcp_orientation_acceleration_limit,
71                self.joint_velocity_limits,
72                self.joint_acceleration_limits,
73            ]
74        )
def as_limits_settings(self) -> wandelbots_api_client.models.limits_override.LimitsOverride:
76    def as_limits_settings(self) -> wb.models.LimitsOverride:
77        return wb.models.LimitsOverride(
78            joint_velocity_limits=wb.models.Joints(joints=self.joint_velocity_limits)  # type: ignore
79            if self.joint_velocity_limits
80            else None,
81            joint_acceleration_limits=wb.models.Joints(joints=self.joint_acceleration_limits)  # type: ignore
82            if self.joint_acceleration_limits
83            else None,
84            tcp_velocity_limit=self.tcp_velocity_limit,
85            tcp_acceleration_limit=self.tcp_acceleration_limit,
86            tcp_orientation_velocity_limit=self.tcp_orientation_velocity_limit,
87            tcp_orientation_acceleration_limit=self.tcp_orientation_acceleration_limit,
88        )
def as_blending_setting( self) -> wandelbots_api_client.models.motion_command_blending.MotionCommandBlending:
 90    def as_blending_setting(self) -> wb.models.MotionCommandBlending:
 91        if self.position_zone_radius:
 92            return wb.models.MotionCommandBlending(
 93                wb.models.BlendingPosition(
 94                    position_zone_radius=self.position_zone_radius, blending_name="BlendingPosition"
 95                )
 96            )
 97        return wb.models.MotionCommandBlending(
 98            wb.models.BlendingAuto(
 99                min_velocity_in_percent=self.min_blending_velocity, blending_name="BlendingAuto"
100            )
101        )
model_config: ClassVar[pydantic.config.ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class MotionSettings.Config:
48    class Config:
49        frozen = True
frozen = True
logger = <Logger wandelbots-nova (INFO)>
__version__ = '0.47.12'