nova

 1from nova.cell.cell import Cell
 2from nova.core.controller import Controller
 3from nova.core.logging import logger
 4from nova.core.motion_group import MotionGroup, combine_trajectories
 5from nova.core.nova import Nova
 6from nova.runtime.function import wrap as program
 7from nova.version import version
 8
 9__version__ = version
10
11__all__ = [
12    "Nova",
13    "Cell",
14    "MotionGroup",
15    "combine_trajectories",
16    "Controller",
17    "api",
18    "types",
19    "actions",
20    "logger",
21    "program",
22    "__version__",
23]
class Nova:
15class Nova:
16    """A high-level Nova client for interacting with robot cells and controllers."""
17
18    _api_client: ApiGateway
19
20    def __init__(
21        self,
22        *,
23        host: str | None = None,
24        access_token: str | None = None,
25        username: str | None = None,
26        password: str | None = None,
27        version: str = "v1",
28        verify_ssl: bool = True,
29    ):
30        """
31        Initialize the Nova client.
32
33        Args:
34            host (str | None): The Nova API host.
35            access_token (str | None): An access token for the Nova API.
36            username (str | None): Username to authenticate with the Nova API.
37            password (str | None): Password to authenticate with the Nova API.
38            version (str): The API version to use (default: "v1").
39            verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
40        """
41
42        self._api_client = ApiGateway(
43            host=host,
44            access_token=access_token,
45            username=username,
46            password=password,
47            version=version,
48            verify_ssl=verify_ssl,
49        )
50
51    def cell(self, cell_id: str = CELL_NAME) -> Cell:
52        """Returns the cell object with the given ID."""
53        return Cell(self._api_client, cell_id)
54
55    async def close(self):
56        """Closes the underlying API client session."""
57        # hardcoded for now, later stuff like NATS might become devices
58        await nats.close()
59        return await self._api_client.close()
60
61    async def __aenter__(self):
62        return self
63
64    async def __aexit__(self, exc_type, exc_val, exc_tb):
65        await self.close()

A high-level Nova client for interacting with robot cells and controllers.

Nova( *, host: str | None = None, access_token: str | None = None, username: str | None = None, password: str | None = None, version: str = 'v1', verify_ssl: bool = True)
20    def __init__(
21        self,
22        *,
23        host: str | None = None,
24        access_token: str | None = None,
25        username: str | None = None,
26        password: str | None = None,
27        version: str = "v1",
28        verify_ssl: bool = True,
29    ):
30        """
31        Initialize the Nova client.
32
33        Args:
34            host (str | None): The Nova API host.
35            access_token (str | None): An access token for the Nova API.
36            username (str | None): Username to authenticate with the Nova API.
37            password (str | None): Password to authenticate with the Nova API.
38            version (str): The API version to use (default: "v1").
39            verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
40        """
41
42        self._api_client = ApiGateway(
43            host=host,
44            access_token=access_token,
45            username=username,
46            password=password,
47            version=version,
48            verify_ssl=verify_ssl,
49        )

Initialize the Nova client.

Arguments:
  • host (str | None): The Nova API host.
  • access_token (str | None): An access token for the Nova API.
  • username (str | None): Username to authenticate with the Nova API.
  • password (str | None): Password to authenticate with the Nova API.
  • version (str): The API version to use (default: "v1").
  • verify_ssl (bool): Whether or not to verify SSL certificates (default: True).
def cell(self, cell_id: str = 'cell') -> Cell:
51    def cell(self, cell_id: str = CELL_NAME) -> Cell:
52        """Returns the cell object with the given ID."""
53        return Cell(self._api_client, cell_id)

Returns the cell object with the given ID.

async def close(self):
55    async def close(self):
56        """Closes the underlying API client session."""
57        # hardcoded for now, later stuff like NATS might become devices
58        await nats.close()
59        return await self._api_client.close()

Closes the underlying API client session.

class Cell:
 16class Cell:
 17    """A representation of a robot cell, providing high-level operations on controllers."""
 18
 19    def __init__(self, api_gateway: ApiGateway, cell_id: str):
 20        """
 21        Initializes a Cell instance.
 22        Args:
 23            api_gateway (ApiGateway): The underlying gateway for making API calls.
 24            cell_id (str): The unique identifier for the cell.
 25        """
 26        self._api_gateway = api_gateway
 27        self._cell_id = cell_id
 28
 29    @property
 30    def cell_id(self) -> str:
 31        """
 32        Returns unique identifier for this cell.
 33        Returns:
 34            str: The cell ID.
 35        """
 36        return self._cell_id
 37
 38    def _create_controller(self, controller_id: str) -> Controller:
 39        return Controller(
 40            configuration=Controller.Configuration(
 41                nova_api=self._api_gateway.host,
 42                nova_access_token=self._api_gateway.access_token,
 43                nova_username=self._api_gateway.username,
 44                nova_password=self._api_gateway.password,
 45                cell_id=self._cell_id,
 46                controller_id=controller_id,
 47                id=controller_id,
 48            )
 49        )
 50
 51    async def add_virtual_robot_controller(
 52        self,
 53        name: str,
 54        controller_type: api.models.VirtualControllerTypes,
 55        controller_manufacturer: api.models.Manufacturer,
 56        timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
 57        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
 58        position: str | None = None,
 59    ) -> Controller:
 60        return await self.add_controller(
 61            robot_controller=virtual_controller(
 62                name=name,
 63                type=controller_type,
 64                manufacturer=controller_manufacturer,
 65                position=position,
 66            ),
 67            add_timeout=timeout,
 68            wait_for_ready_timeout=wait_for_ready_timeout,
 69        )
 70
 71    async def ensure_virtual_robot_controller(
 72        self,
 73        name: str,
 74        controller_type: api.models.VirtualControllerTypes,
 75        controller_manufacturer: api.models.Manufacturer,
 76        timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
 77        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
 78    ) -> Controller:
 79        return await self.ensure_controller(
 80            robot_controller=virtual_controller(
 81                name=name, type=controller_type, manufacturer=controller_manufacturer
 82            ),
 83            add_timeout=timeout,
 84            wait_for_ready_timeout=wait_for_ready_timeout,
 85        )
 86
 87    async def add_controller(
 88        self,
 89        robot_controller: api.models.RobotController,
 90        add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
 91        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
 92    ) -> Controller:
 93        """
 94        Add a robot controller to the cell and wait for it to get ready.
 95        Args:
 96            robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
 97                see :func:`nova.cell.abb_controller`, :func:`nova.cell.fanuc_controller`, :func:`nova.cell.kuka_controller`,
 98                :func:`nova.cell.universal_robots_controller`, :func:`nova.cell.virtual_controller`, :func:`nova.cell.yaskawa_controller`.
 99            add_timeout (int): The time to wait for the controller to be added (default: 25).
100            wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
101
102        Returns:
103            Controller: The added Controller object.
104        """
105        await self._api_gateway.add_robot_controller(
106            cell=self._cell_id, robot_controller=robot_controller, timeout=add_timeout
107        )
108
109        await self._api_gateway.wait_for_controller_ready(
110            cell=self._cell_id, name=robot_controller.name, timeout=wait_for_ready_timeout
111        )
112
113        return self._create_controller(robot_controller.name)
114
115    async def ensure_controller(
116        self,
117        robot_controller: api.models.RobotController,
118        add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
119        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
120    ) -> Controller:
121        """
122        Ensure that a robot controller is added to the cell. If it already exists, it will be returned.
123        If it doesn't exist, it will be added and waited for to be ready.
124        Args:
125            robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
126                see :func:`nova.abb_controller`, :func:`nova.fanuc_controller`, :func:`nova.kuka_controller`,
127                :func:`nova.universal_robots_controller`, :func:`nova.virtual_controller`, :func:`nova.yaskawa_controller`.
128            add_timeout (int): The time to wait for the controller to be added (default: 25).
129            wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
130
131        Returns:
132            Controller: The added Controller object.
133        """
134        controller = await self._api_gateway.get_controller_instance(
135            cell=self.cell_id, name=robot_controller.name
136        )
137
138        if controller:
139            return self._create_controller(controller.controller)
140        return await self.add_controller(
141            robot_controller, add_timeout=add_timeout, wait_for_ready_timeout=wait_for_ready_timeout
142        )
143
144    async def controllers(self) -> list[Controller]:
145        """
146        List all controllers for this cell.
147        Returns:
148            list[Controller]: A list of Controller objects associated with this cell.
149        """
150        instances = await self._api_gateway.list_controllers(cell=self._cell_id)
151        return [self._create_controller(ci.controller) for ci in instances]
152
153    async def controller(self, name: str) -> Controller:
154        """
155        Retrieve a specific controller by name.
156        Args:
157            name (str): The name of the controller.
158        Returns:
159            Controller: The Controller object.
160        Raises:
161            ControllerNotFound: If no controller with the specified name exists.
162        """
163        controller_instance = await self._api_gateway.get_controller_instance(
164            cell=self._cell_id, name=name
165        )
166        if not controller_instance:
167            raise ControllerNotFound(controller=name)
168        return self._create_controller(controller_instance.controller)
169
170    async def delete_robot_controller(self, name: str, timeout: int = 25):
171        """
172        Delete a robot controller from the cell.
173        Args:
174            name (str): The name of the controller to delete.
175            timeout (int): The time to wait for the controller deletion to complete (default: 25).
176        """
177        await self._api_gateway.delete_robot_controller(
178            cell=self._cell_id, controller=name, completion_timeout=timeout
179        )
180
181    async def get_robot_cell(self) -> RobotCell:
182        """
183        Return a RobotCell object containing all known controllers.
184        Returns:
185            RobotCell: A RobotCell initialized with the available controllers.
186        """
187        controllers = await self.controllers()
188        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})

A representation of a robot cell, providing high-level operations on controllers.

Cell(api_gateway: nova.core.gateway.ApiGateway, cell_id: str)
19    def __init__(self, api_gateway: ApiGateway, cell_id: str):
20        """
21        Initializes a Cell instance.
22        Args:
23            api_gateway (ApiGateway): The underlying gateway for making API calls.
24            cell_id (str): The unique identifier for the cell.
25        """
26        self._api_gateway = api_gateway
27        self._cell_id = cell_id

Initializes a Cell instance.

Arguments:
  • api_gateway (ApiGateway): The underlying gateway for making API calls.
  • cell_id (str): The unique identifier for the cell.
cell_id: str
29    @property
30    def cell_id(self) -> str:
31        """
32        Returns unique identifier for this cell.
33        Returns:
34            str: The cell ID.
35        """
36        return self._cell_id

Returns unique identifier for this cell.

Returns:

str: The cell ID.

async def add_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer, timeout: int = 120, wait_for_ready_timeout: int = 120, position: str | None = None) -> Controller:
51    async def add_virtual_robot_controller(
52        self,
53        name: str,
54        controller_type: api.models.VirtualControllerTypes,
55        controller_manufacturer: api.models.Manufacturer,
56        timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
57        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
58        position: str | None = None,
59    ) -> Controller:
60        return await self.add_controller(
61            robot_controller=virtual_controller(
62                name=name,
63                type=controller_type,
64                manufacturer=controller_manufacturer,
65                position=position,
66            ),
67            add_timeout=timeout,
68            wait_for_ready_timeout=wait_for_ready_timeout,
69        )
async def ensure_virtual_robot_controller( self, name: str, controller_type: wandelbots_api_client.models.virtual_controller_types.VirtualControllerTypes, controller_manufacturer: wandelbots_api_client.models.manufacturer.Manufacturer, timeout: int = 120, wait_for_ready_timeout: int = 120) -> Controller:
71    async def ensure_virtual_robot_controller(
72        self,
73        name: str,
74        controller_type: api.models.VirtualControllerTypes,
75        controller_manufacturer: api.models.Manufacturer,
76        timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
77        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
78    ) -> Controller:
79        return await self.ensure_controller(
80            robot_controller=virtual_controller(
81                name=name, type=controller_type, manufacturer=controller_manufacturer
82            ),
83            add_timeout=timeout,
84            wait_for_ready_timeout=wait_for_ready_timeout,
85        )
async def add_controller( self, robot_controller: wandelbots_api_client.models.robot_controller.RobotController, add_timeout: int = 120, wait_for_ready_timeout: int = 120) -> Controller:
 87    async def add_controller(
 88        self,
 89        robot_controller: api.models.RobotController,
 90        add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
 91        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
 92    ) -> Controller:
 93        """
 94        Add a robot controller to the cell and wait for it to get ready.
 95        Args:
 96            robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
 97                see :func:`nova.cell.abb_controller`, :func:`nova.cell.fanuc_controller`, :func:`nova.cell.kuka_controller`,
 98                :func:`nova.cell.universal_robots_controller`, :func:`nova.cell.virtual_controller`, :func:`nova.cell.yaskawa_controller`.
 99            add_timeout (int): The time to wait for the controller to be added (default: 25).
100            wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
101
102        Returns:
103            Controller: The added Controller object.
104        """
105        await self._api_gateway.add_robot_controller(
106            cell=self._cell_id, robot_controller=robot_controller, timeout=add_timeout
107        )
108
109        await self._api_gateway.wait_for_controller_ready(
110            cell=self._cell_id, name=robot_controller.name, timeout=wait_for_ready_timeout
111        )
112
113        return self._create_controller(robot_controller.name)

Add a robot controller to the cell and wait for it to get ready.

Arguments:
  • robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, see nova.cell.abb_controller(), nova.cell.fanuc_controller(), nova.cell.kuka_controller(), nova.cell.universal_robots_controller(), nova.cell.virtual_controller(), nova.cell.yaskawa_controller().
  • add_timeout (int): The time to wait for the controller to be added (default: 25).
  • wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
Returns:

Controller: The added Controller object.

async def ensure_controller( self, robot_controller: wandelbots_api_client.models.robot_controller.RobotController, add_timeout: int = 120, wait_for_ready_timeout: int = 120) -> Controller:
115    async def ensure_controller(
116        self,
117        robot_controller: api.models.RobotController,
118        add_timeout: int = DEFAULT_ADD_CONTROLLER_TIMEOUT,
119        wait_for_ready_timeout: int = DEFAULT_WAIT_FOR_READY_TIMEOUT,
120    ) -> Controller:
121        """
122        Ensure that a robot controller is added to the cell. If it already exists, it will be returned.
123        If it doesn't exist, it will be added and waited for to be ready.
124        Args:
125            robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily,
126                see :func:`nova.abb_controller`, :func:`nova.fanuc_controller`, :func:`nova.kuka_controller`,
127                :func:`nova.universal_robots_controller`, :func:`nova.virtual_controller`, :func:`nova.yaskawa_controller`.
128            add_timeout (int): The time to wait for the controller to be added (default: 25).
129            wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
130
131        Returns:
132            Controller: The added Controller object.
133        """
134        controller = await self._api_gateway.get_controller_instance(
135            cell=self.cell_id, name=robot_controller.name
136        )
137
138        if controller:
139            return self._create_controller(controller.controller)
140        return await self.add_controller(
141            robot_controller, add_timeout=add_timeout, wait_for_ready_timeout=wait_for_ready_timeout
142        )

Ensure that a robot controller is added to the cell. If it already exists, it will be returned. If it doesn't exist, it will be added and waited for to be ready.

Arguments:
  • robot_controller (api.models.RobotController): The robot controller to add. You can use helper functions from nova to create these configs easily, see nova.abb_controller(), nova.fanuc_controller(), nova.kuka_controller(), nova.universal_robots_controller(), nova.virtual_controller(), nova.yaskawa_controller().
  • add_timeout (int): The time to wait for the controller to be added (default: 25).
  • wait_for_ready_timeout (int): The time to wait for the controller to be ready (default: 25).
Returns:

Controller: The added Controller object.

async def controllers(self) -> list[Controller]:
144    async def controllers(self) -> list[Controller]:
145        """
146        List all controllers for this cell.
147        Returns:
148            list[Controller]: A list of Controller objects associated with this cell.
149        """
150        instances = await self._api_gateway.list_controllers(cell=self._cell_id)
151        return [self._create_controller(ci.controller) for ci in instances]

List all controllers for this cell.

Returns:

list[Controller]: A list of Controller objects associated with this cell.

async def controller(self, name: str) -> Controller:
153    async def controller(self, name: str) -> Controller:
154        """
155        Retrieve a specific controller by name.
156        Args:
157            name (str): The name of the controller.
158        Returns:
159            Controller: The Controller object.
160        Raises:
161            ControllerNotFound: If no controller with the specified name exists.
162        """
163        controller_instance = await self._api_gateway.get_controller_instance(
164            cell=self._cell_id, name=name
165        )
166        if not controller_instance:
167            raise ControllerNotFound(controller=name)
168        return self._create_controller(controller_instance.controller)

Retrieve a specific controller by name.

Arguments:
  • name (str): The name of the controller.
Returns:

Controller: The Controller object.

Raises:
  • ControllerNotFound: If no controller with the specified name exists.
async def delete_robot_controller(self, name: str, timeout: int = 25):
170    async def delete_robot_controller(self, name: str, timeout: int = 25):
171        """
172        Delete a robot controller from the cell.
173        Args:
174            name (str): The name of the controller to delete.
175            timeout (int): The time to wait for the controller deletion to complete (default: 25).
176        """
177        await self._api_gateway.delete_robot_controller(
178            cell=self._cell_id, controller=name, completion_timeout=timeout
179        )

Delete a robot controller from the cell.

Arguments:
  • name (str): The name of the controller to delete.
  • timeout (int): The time to wait for the controller deletion to complete (default: 25).
async def get_robot_cell(self) -> nova.cell.robot_cell.RobotCell:
181    async def get_robot_cell(self) -> RobotCell:
182        """
183        Return a RobotCell object containing all known controllers.
184        Returns:
185            RobotCell: A RobotCell initialized with the available controllers.
186        """
187        controllers = await self.controllers()
188        return RobotCell(timer=None, **{controller.id: controller for controller in controllers})

Return a RobotCell object containing all known controllers.

Returns:

RobotCell: A RobotCell initialized with the available controllers.

class MotionGroup(nova.cell.robot_cell.AbstractRobot):
127class MotionGroup(AbstractRobot):
128    """Manages motion planning and execution within a specified motion group."""
129
130    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
131        """
132        Initializes a new MotionGroup instance.
133
134        Args:
135            api_gateway (ApiGateway): The API gateway through which motion commands are sent.
136            cell (str): The name or identifier of the robotic cell.
137            motion_group_id (str): The identifier of the motion group.
138        """
139        self._api_gateway = api_gateway
140        self._cell = cell
141        self._motion_group_id = motion_group_id
142        self._current_motion: str | None = None
143        self._optimizer_setup: wb.models.OptimizerSetup | None = None
144        super().__init__(id=motion_group_id)
145
146    async def open(self):
147        await self._api_gateway.activate_motion_group(
148            cell=self._cell, motion_group_id=self._motion_group_id
149        )
150        return self
151
152    async def close(self):
153        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
154        #           this behaviour is not desired in some cases,
155        #           so for now we will not deactivate for the user
156        pass
157
158    @property
159    def motion_group_id(self) -> str:
160        """
161        Returns:
162            str: The unique identifier for this motion group.
163        """
164        return self._motion_group_id
165
166    @property
167    def current_motion(self) -> str | None:
168        # if not self._current_motion:
169        #    raise ValueError("No MotionId attached. There is no planned motion available.")
170        return self._current_motion
171
172    async def _plan_with_collision_check(
173        self,
174        actions: list[Action],
175        tcp: str,
176        start_joint_position: tuple[float, ...] | None = None,
177        optimizer_setup: wb.models.OptimizerSetup | None = None,
178    ) -> wb.models.JointTrajectory:
179        """
180        This method plans a trajectory and checks for collisions.
181        The collision check only happens if the actions have collision scene data.
182
183        You must provide the exact same collision data into all the actions.
184        Because the underlying API supports collision checks for the whole trajectory only.
185
186        Raises:
187            InconsistentCollisionScenes: If the collision scene data is not consistent across all actions
188
189            Your actions should follow below rules to be considered consistent:
190            1- They all should have the same collision scene data
191            2- They all should have no collision data
192
193            PlanTrajectoryFailed: If the trajectory planning failed including the collision check
194
195        For more information about this API, please refer to the plan_trajectory in the API documentation.
196
197        Args:
198            actions: list of actions to plan, current supported actions are Motion and WriteActions
199                     WriteAction you specify on your path is handled in a performant way.
200                     Please check execute_trajectory.motion_command.set_io for more information.
201            tcp:     The tool to use
202            start_joint_position: The starting joint position, if none provided, current position of the robot is used
203            optimizer_setup: The optimizer setup
204
205        Returns: planned joint trajectory
206
207        """
208        # PREPARE THE REQUEST
209        collision_scenes = validate_collision_scenes(actions)
210        start_joint_position = start_joint_position or await self.joints()
211        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
212
213        motion_commands = CombinedActions(items=tuple(actions)).to_motion_command()  # type: ignore
214
215        static_colliders = None
216        collision_motion_group = None
217        if collision_scenes and len(collision_scenes) > 0:
218            static_colliders = collision_scenes[0].colliders
219
220            motion_group_type = robot_setup.motion_group_type
221            if (
222                collision_scenes[0].motion_groups
223                and motion_group_type in collision_scenes[0].motion_groups
224            ):
225                collision_motion_group = collision_scenes[0].motion_groups[motion_group_type]
226
227        request = wb.models.PlanTrajectoryRequest(
228            robot_setup=robot_setup,
229            start_joint_position=list(start_joint_position),
230            motion_commands=motion_commands,
231            static_colliders=static_colliders,
232            collision_motion_group=collision_motion_group,
233        )
234
235        return await self._api_gateway.plan_trajectory(
236            cell=self._cell, motion_group_id=self.motion_group_id, request=request
237        )
238
239    # TODO: we get the optimizer setup from as an input because
240    #  it has a velocity setting which is used in collision free movement, I need to double check this
241    async def _plan_collision_free(
242        self,
243        action: CollisionFreeMotion,
244        tcp: str,
245        start_joint_position: list[float],
246        optimizer_setup: wb.models.OptimizerSetup | None = None,
247    ) -> wb.models.JointTrajectory:
248        """
249        This method plans a trajectory and avoids collisions.
250        This means if there is a collision along the way to the target pose or joint positions,
251        It will adjust the trajectory to avoid the collision.
252
253        The collision check only happens if the action have collision scene data.
254
255        For more information about this API, please refer to the plan_collision_free_ptp in the API documentation.
256
257        Args:
258            action: The target pose or joint positions to reach
259            tcp:     The tool to use
260            start_joint_position: The starting joint position, if none provided, current position of the robot is used
261            optimizer_setup: The optimizer setup
262
263        Returns: planned joint trajectory
264
265
266        """
267        target = wb.models.PlanCollisionFreePTPRequestTarget(**action.to_api_model().model_dump())
268        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
269
270        static_colliders = None
271        collision_motion_group = None
272        collision_scene = action.collision_scene
273        if collision_scene and collision_scene.colliders:
274            static_colliders = collision_scene.colliders
275
276            if (
277                collision_scene.motion_groups
278                and robot_setup.motion_group_type in collision_scene.motion_groups
279            ):
280                collision_motion_group = collision_scene.motion_groups[
281                    robot_setup.motion_group_type
282                ]
283
284        request: wb.models.PlanCollisionFreePTPRequest = wb.models.PlanCollisionFreePTPRequest(
285            robot_setup=robot_setup,
286            start_joint_position=start_joint_position,
287            target=target,
288            static_colliders=static_colliders,
289            collision_motion_group=collision_motion_group,
290        )
291
292        return await self._api_gateway.plan_collision_free_ptp(
293            cell=self._cell, motion_group_id=self.motion_group_id, request=request
294        )
295
296    async def _plan(
297        self,
298        actions: list[Action],
299        tcp: str,
300        start_joint_position: tuple[float, ...] | None = None,
301        optimizer_setup: wb.models.OptimizerSetup | None = None,
302    ) -> wb.models.JointTrajectory:
303        if not actions:
304            raise ValueError("No actions provided")
305
306        current_joints = start_joint_position or await self.joints()
307        robot_setup = optimizer_setup or await self._get_optimizer_setup(tcp=tcp)
308
309        all_trajectories = []
310        for batch in split_actions_into_batches(actions):
311            if len(batch) == 0:
312                raise ValueError("Empty batch of actions")
313
314            if isinstance(batch[0], CollisionFreeMotion):
315                motion: CollisionFreeMotion = cast(CollisionFreeMotion, batch[0])
316                trajectory = await self._plan_collision_free(
317                    action=motion,
318                    tcp=tcp,
319                    start_joint_position=list(current_joints),
320                    optimizer_setup=robot_setup,
321                )
322                all_trajectories.append(trajectory)
323                # the last joint position of this trajectory is the starting point for the next one
324                current_joints = tuple(trajectory.joint_positions[-1].joints)
325            elif isinstance(batch[0], WaitAction):
326                # Waits generate a trajectory with the same joint position at each timestep
327                # Use 50ms timesteps from 0 to wait_for_in_seconds
328                wait_time = batch[0].wait_for_in_seconds
329                timestep = 0.050  # 50ms timestep
330                num_steps = max(2, int(wait_time / timestep) + 1)  # Ensure at least 2 points
331
332                # Create equal-length arrays for positions, times, and locations
333                joint_positions = [
334                    wb.models.Joints(joints=list(current_joints)) for _ in range(num_steps)
335                ]
336                times = [i * timestep for i in range(num_steps)]
337                # Ensure the last timestep is exactly the wait duration
338                times[-1] = wait_time
339                # Use the same location value for all points
340                locations = [0] * num_steps
341
342                trajectory = wb.models.JointTrajectory(
343                    joint_positions=joint_positions,
344                    times=times,
345                    locations=[float(loc) for loc in locations],
346                )
347                all_trajectories.append(trajectory)
348                # the last joint position of this trajectory is the starting point for the next one
349                current_joints = tuple(trajectory.joint_positions[-1].joints)
350            else:
351                trajectory = await self._plan_with_collision_check(
352                    actions=batch,
353                    tcp=tcp,
354                    start_joint_position=current_joints,
355                    optimizer_setup=robot_setup,
356                )
357                all_trajectories.append(trajectory)
358                # the last joint position of this trajectory is the starting point for the next one
359                current_joints = tuple(trajectory.joint_positions[-1].joints)
360
361        return combine_trajectories(all_trajectories)
362
363    # TODO: refactor and simplify code, tests are already there
364    # TODO: split into batches when the collision scene changes in a batch of collision free motions
365
366    async def _execute(
367        self,
368        joint_trajectory: wb.models.JointTrajectory,
369        tcp: str,
370        actions: list[Action],
371        movement_controller: MovementController | None,
372    ) -> AsyncIterable[MovementResponse]:
373        if movement_controller is None:
374            movement_controller = move_forward
375
376        # Load planned trajectory
377        load_plan_response = await self._load_planned_motion(joint_trajectory, tcp)
378
379        # Move to start position
380        number_of_joints = await self._api_gateway.get_joint_number(
381            cell=self._cell, motion_group_id=self.motion_group_id
382        )
383        joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints
384        movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response)
385
386        # If there's an initial consumer, feed it the data
387        async for move_to_response in movement_stream:
388            # TODO: refactor
389            if (
390                move_to_response.state is None
391                or move_to_response.state.motion_groups is None
392                or len(move_to_response.state.motion_groups) == 0
393                or move_to_response.move_response is None
394                or move_to_response.move_response.current_location_on_trajectory is None
395            ):
396                continue
397
398            yield move_to_response
399
400        controller = movement_controller(
401            MovementControllerContext(
402                combined_actions=CombinedActions(items=tuple(actions)),  # type: ignore
403                motion_id=load_plan_response.motion,
404            )
405        )
406
407        def stop_condition(response: wb.models.ExecuteTrajectoryResponse) -> bool:
408            instance = response.actual_instance
409            # Stop when standstill indicates motion ended
410            return (
411                isinstance(instance, wb.models.Standstill)
412                and instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED
413            )
414
415        execute_response_streaming_controller = StreamExtractor(controller, stop_condition)
416        execution_task = asyncio.create_task(
417            self._api_gateway.motion_api.execute_trajectory(
418                cell=self._cell, client_request_generator=execute_response_streaming_controller
419            )
420        )
421        async for execute_response in execute_response_streaming_controller:
422            yield execute_response
423        await execution_task
424
425    async def _get_optimizer_setup(self, tcp: str) -> wb.models.OptimizerSetup:
426        # TODO: mypy failed on main branch, need to check
427        if self._optimizer_setup is None or self._optimizer_setup.tcp != tcp:  # type: ignore
428            self._optimizer_setup = await self._api_gateway.get_optimizer_config(
429                cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
430            )
431
432        return self._optimizer_setup
433
434    async def _load_planned_motion(
435        self, joint_trajectory: wb.models.JointTrajectory, tcp: str
436    ) -> wb.models.PlanSuccessfulResponse:
437        return await self._api_gateway.load_planned_motion(
438            cell=self._cell,
439            motion_group_id=self.motion_group_id,
440            joint_trajectory=joint_trajectory,
441            tcp=tcp,
442        )
443
444    async def move_to_start_position(
445        self, joint_velocities, load_plan_response: LoadPlanResponse
446    ) -> InitialMovementStream:
447        limit_override = wb.models.LimitsOverride()
448        if joint_velocities is not None:
449            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
450
451        return self._api_gateway.stream_move_to_trajectory_via_join_ptp(
452            cell=self._cell,
453            motion_id=load_plan_response.motion,
454            location_on_trajectory=0,
455            joint_velocity_limits=limit_override.joint_velocity_limits,
456        )
457
458    async def stop(self):
459        logger.debug(f"Stopping motion of {self}...")
460        try:
461            if self._current_motion is None:
462                raise ValueError("No motion to stop")
463            await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion)
464            logger.debug(f"Motion {self.current_motion} stopped.")
465        except ValueError as e:
466            logger.debug(f"No motion to stop for {self}: {e}")
467
468    async def get_state(self, tcp: str | None = None) -> RobotState:
469        """
470        Returns the motion group state.
471        Args:
472            tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None.
473                                        If None, the current active/selected TCP of the motion group is used.
474        """
475        response = await self._api_gateway.get_motion_group_state(
476            cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
477        )
478        pose = Pose(response.tcp_pose or response.state.tcp_pose)
479        return RobotState(pose=pose, joints=tuple(response.state.joint_position.joints))
480
481    async def joints(self) -> tuple:
482        """Returns the current joint positions of the motion group."""
483        state = await self.get_state()
484        if state.joints is None:
485            raise ValueError(
486                f"No joint positions available for motion group {self._motion_group_id}"
487            )
488        return state.joints
489
490    async def tcp_pose(self, tcp: str | None = None) -> Pose:
491        """
492        Returns the current TCP pose of the motion group.
493        Args:
494            tcp (str | None): The reference TCP for the returned pose. Defaults to None.
495                                        If None, the current active/selected TCP of the motion group is used.
496        """
497        state = await self.get_state(tcp=tcp)
498        return state.pose
499
500    async def tcps(self) -> list[wb.models.RobotTcp]:
501        response = await self._api_gateway.list_tcps(
502            cell=self._cell, motion_group_id=self.motion_group_id
503        )
504        return response.tcps if response.tcps else []
505
506    async def tcp_names(self) -> list[str]:
507        return [tcp.id for tcp in await self.tcps()]
508
509    async def active_tcp(self) -> wb.models.RobotTcp:
510        active_tcp = await self._api_gateway.get_active_tcp(
511            cell=self._cell, motion_group_id=self.motion_group_id
512        )
513        return active_tcp
514
515    async def active_tcp_name(self) -> str:
516        active_tcp = await self.active_tcp()
517        return active_tcp.id
518
519    async def ensure_virtual_tcp(self, tcp: models.RobotTcp) -> models.RobotTcp:
520        """
521        Ensure that a virtual TCP with the expected configuration exists on this motion group.
522        If it doesn't exist, it will be created. If it exists but has different configuration,
523        it will be updated by recreating it.
524
525        Args:
526            tcp (models.RobotTcp): The expected TCP configuration
527
528        Returns:
529            models.RobotTcp: The TCP configuration
530        """
531        existing_tcps = await self.tcps()
532
533        existing_tcp = None
534        for existing in existing_tcps:
535            if existing.id == tcp.id:
536                existing_tcp = existing
537                break
538
539        if existing_tcp:
540            if existing_tcp == tcp:
541                return existing_tcp
542
543        controller_name = self._motion_group_id.split("@")[1]
544        motion_group_index = int(self._motion_group_id.split("@")[0])
545
546        await self._api_gateway.virtual_robot_setup_api.add_virtual_robot_tcp(
547            cell=self._cell, controller=controller_name, id=motion_group_index, robot_tcp=tcp
548        )
549
550        return tcp

Manages motion planning and execution within a specified motion group.

MotionGroup( api_gateway: nova.core.gateway.ApiGateway, cell: str, motion_group_id: str)
130    def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
131        """
132        Initializes a new MotionGroup instance.
133
134        Args:
135            api_gateway (ApiGateway): The API gateway through which motion commands are sent.
136            cell (str): The name or identifier of the robotic cell.
137            motion_group_id (str): The identifier of the motion group.
138        """
139        self._api_gateway = api_gateway
140        self._cell = cell
141        self._motion_group_id = motion_group_id
142        self._current_motion: str | None = None
143        self._optimizer_setup: wb.models.OptimizerSetup | None = None
144        super().__init__(id=motion_group_id)

Initializes a new MotionGroup instance.

Arguments:
  • api_gateway (ApiGateway): The API gateway through which motion commands are sent.
  • cell (str): The name or identifier of the robotic cell.
  • motion_group_id (str): The identifier of the motion group.
async def open(self):
146    async def open(self):
147        await self._api_gateway.activate_motion_group(
148            cell=self._cell, motion_group_id=self._motion_group_id
149        )
150        return self

Allocates the external hardware resource (i.e. establish a connection)

async def close(self):
152    async def close(self):
153        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
154        #           this behaviour is not desired in some cases,
155        #           so for now we will not deactivate for the user
156        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

motion_group_id: str
158    @property
159    def motion_group_id(self) -> str:
160        """
161        Returns:
162            str: The unique identifier for this motion group.
163        """
164        return self._motion_group_id

Returns: str: The unique identifier for this motion group.

current_motion: str | None
166    @property
167    def current_motion(self) -> str | None:
168        # if not self._current_motion:
169        #    raise ValueError("No MotionId attached. There is no planned motion available.")
170        return self._current_motion
async def move_to_start_position( self, joint_velocities, load_plan_response: wandelbots_api_client.models.plan_successful_response.PlanSuccessfulResponse) -> AsyncIterator[wandelbots_api_client.models.stream_move_response.StreamMoveResponse]:
444    async def move_to_start_position(
445        self, joint_velocities, load_plan_response: LoadPlanResponse
446    ) -> InitialMovementStream:
447        limit_override = wb.models.LimitsOverride()
448        if joint_velocities is not None:
449            limit_override.joint_velocity_limits = wb.models.Joints(joints=joint_velocities)
450
451        return self._api_gateway.stream_move_to_trajectory_via_join_ptp(
452            cell=self._cell,
453            motion_id=load_plan_response.motion,
454            location_on_trajectory=0,
455            joint_velocity_limits=limit_override.joint_velocity_limits,
456        )
async def stop(self):
458    async def stop(self):
459        logger.debug(f"Stopping motion of {self}...")
460        try:
461            if self._current_motion is None:
462                raise ValueError("No motion to stop")
463            await self._api_gateway.stop_motion(cell=self._cell, motion_id=self._current_motion)
464            logger.debug(f"Motion {self.current_motion} stopped.")
465        except ValueError as e:
466            logger.debug(f"No motion to stop for {self}: {e}")

Stop behaviour of the robot

async def get_state(self, tcp: str | None = None) -> nova.types.RobotState:
468    async def get_state(self, tcp: str | None = None) -> RobotState:
469        """
470        Returns the motion group state.
471        Args:
472            tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None.
473                                        If None, the current active/selected TCP of the motion group is used.
474        """
475        response = await self._api_gateway.get_motion_group_state(
476            cell=self._cell, motion_group_id=self.motion_group_id, tcp=tcp
477        )
478        pose = Pose(response.tcp_pose or response.state.tcp_pose)
479        return RobotState(pose=pose, joints=tuple(response.state.joint_position.joints))

Returns the motion group state.

Arguments:
  • tcp (str | None): The reference TCP for the cartesian pose part of the robot state. Defaults to None. If None, the current active/selected TCP of the motion group is used.
async def joints(self) -> tuple:
481    async def joints(self) -> tuple:
482        """Returns the current joint positions of the motion group."""
483        state = await self.get_state()
484        if state.joints is None:
485            raise ValueError(
486                f"No joint positions available for motion group {self._motion_group_id}"
487            )
488        return state.joints

Returns the current joint positions of the motion group.

async def tcp_pose(self, tcp: str | None = None) -> nova.types.Pose:
490    async def tcp_pose(self, tcp: str | None = None) -> Pose:
491        """
492        Returns the current TCP pose of the motion group.
493        Args:
494            tcp (str | None): The reference TCP for the returned pose. Defaults to None.
495                                        If None, the current active/selected TCP of the motion group is used.
496        """
497        state = await self.get_state(tcp=tcp)
498        return state.pose

Returns the current TCP pose of the motion group.

Arguments:
  • tcp (str | None): The reference TCP for the returned pose. Defaults to None. If None, the current active/selected TCP of the motion group is used.
async def tcps(self) -> list[wandelbots_api_client.models.robot_tcp.RobotTcp]:
500    async def tcps(self) -> list[wb.models.RobotTcp]:
501        response = await self._api_gateway.list_tcps(
502            cell=self._cell, motion_group_id=self.motion_group_id
503        )
504        return response.tcps if response.tcps else []

Return all TCPs that are configured on the robot with corresponding offset from flange as pose

Returns: the TCPs of the robot

async def tcp_names(self) -> list[str]:
506    async def tcp_names(self) -> list[str]:
507        return [tcp.id for tcp in await self.tcps()]

Return the name of all TCPs that are configured on the robot

Returns: a list of all TCPs

async def active_tcp(self) -> wandelbots_api_client.models.robot_tcp.RobotTcp:
509    async def active_tcp(self) -> wb.models.RobotTcp:
510        active_tcp = await self._api_gateway.get_active_tcp(
511            cell=self._cell, motion_group_id=self.motion_group_id
512        )
513        return active_tcp

Return the active TCP of the robot

Returns: the active TCP

async def active_tcp_name(self) -> str:
515    async def active_tcp_name(self) -> str:
516        active_tcp = await self.active_tcp()
517        return active_tcp.id

Return the name of the active TCP of the robot

Returns: the name of the active TCP

async def ensure_virtual_tcp( self, tcp: wandelbots_api_client.models.robot_tcp.RobotTcp) -> wandelbots_api_client.models.robot_tcp.RobotTcp:
519    async def ensure_virtual_tcp(self, tcp: models.RobotTcp) -> models.RobotTcp:
520        """
521        Ensure that a virtual TCP with the expected configuration exists on this motion group.
522        If it doesn't exist, it will be created. If it exists but has different configuration,
523        it will be updated by recreating it.
524
525        Args:
526            tcp (models.RobotTcp): The expected TCP configuration
527
528        Returns:
529            models.RobotTcp: The TCP configuration
530        """
531        existing_tcps = await self.tcps()
532
533        existing_tcp = None
534        for existing in existing_tcps:
535            if existing.id == tcp.id:
536                existing_tcp = existing
537                break
538
539        if existing_tcp:
540            if existing_tcp == tcp:
541                return existing_tcp
542
543        controller_name = self._motion_group_id.split("@")[1]
544        motion_group_index = int(self._motion_group_id.split("@")[0])
545
546        await self._api_gateway.virtual_robot_setup_api.add_virtual_robot_tcp(
547            cell=self._cell, controller=controller_name, id=motion_group_index, robot_tcp=tcp
548        )
549
550        return tcp

Ensure that a virtual TCP with the expected configuration exists on this motion group. If it doesn't exist, it will be created. If it exists but has different configuration, it will be updated by recreating it.

Arguments:
  • tcp (models.RobotTcp): The expected TCP configuration
Returns:

models.RobotTcp: The TCP configuration

def combine_trajectories( trajectories: list[wandelbots_api_client.models.joint_trajectory.JointTrajectory]) -> wandelbots_api_client.models.joint_trajectory.JointTrajectory:
58def combine_trajectories(
59    trajectories: list[wb.models.JointTrajectory],
60) -> wb.models.JointTrajectory:
61    """
62    Combines multiple trajectories into one trajectory.
63    """
64    final_trajectory = trajectories[0]
65    current_end_time = final_trajectory.times[-1]
66    current_end_location = final_trajectory.locations[-1]
67
68    for trajectory in trajectories[1:]:
69        # Shift times and locations to continue from last endpoint
70        shifted_times = [t + current_end_time for t in trajectory.times[1:]]  # Skip first point
71        shifted_locations = [
72            location + current_end_location for location in trajectory.locations[1:]
73        ]  # Skip first point
74
75        final_trajectory.times.extend(shifted_times)
76        final_trajectory.joint_positions.extend(trajectory.joint_positions[1:])
77        final_trajectory.locations.extend(shifted_locations)
78
79        current_end_time = final_trajectory.times[-1]
80        current_end_location = final_trajectory.locations[-1]
81
82    return final_trajectory

Combines multiple trajectories into one trajectory.

class Controller(typing.Sized, nova.cell.robot_cell.AbstractController, nova.core.gateway.NovaDevice, nova.cell.robot_cell.IODevice):
 12class Controller(Sized, AbstractController, NovaDevice, IODevice):
 13    """
 14    Represents a Nova controller, managing motion groups and IO interactions.
 15    """
 16
 17    class Configuration(NovaDevice.Configuration):
 18        type: Literal["controller"] = "controller"
 19        id: str = "controller"
 20        cell_id: str
 21        controller_id: str
 22
 23    def __init__(self, configuration: Configuration):
 24        super().__init__(configuration)
 25        self._activated_motion_group_ids: list[str] = []
 26        self._io_access = IOAccess(
 27            api_gateway=self._nova_api,
 28            cell=self.configuration.cell_id,
 29            controller_id=self.configuration.controller_id,
 30        )
 31
 32    @property
 33    def controller_id(self) -> str:
 34        """Returns the unique controller ID."""
 35        return self.configuration.controller_id
 36
 37    async def open(self):
 38        """Activates all motion groups."""
 39        motion_group_ids = await self.activated_motion_group_ids()
 40        self._activated_motion_group_ids = motion_group_ids
 41        logger.info(f"Found motion group {motion_group_ids}")
 42        return self
 43
 44    async def close(self):
 45        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
 46        #           this behaviour is not desired in some cases,
 47        #           so for now we will not deactivate for the user
 48        pass
 49
 50    def __len__(self) -> int:
 51        return len(self._activated_motion_group_ids)
 52
 53    def motion_group(self, motion_group_id: str) -> MotionGroup:
 54        """Returns motion group with specific id.
 55
 56        Args:
 57            motion_group_id (str): The ID of the motion group.
 58
 59        Returns:
 60            MotionGroup: A MotionGroup instance corresponding to the given ID.
 61        """
 62        return MotionGroup(
 63            api_gateway=self._nova_api,
 64            cell=self.configuration.cell_id,
 65            motion_group_id=motion_group_id,
 66        )
 67
 68    def __getitem__(self, motion_group_id: int) -> MotionGroup:
 69        return self.motion_group(f"{motion_group_id}@{self.configuration.controller_id}")
 70
 71    async def activated_motion_group_ids(self) -> list[str]:
 72        """Activates and retrieves the list of motion group IDs available on this controller.
 73
 74        The system automatically activates all motion groups on the associated controller.
 75
 76        Returns:
 77            list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
 78        """
 79        return await self._nova_api.activate_all_motion_groups(
 80            cell=self.configuration.cell_id, controller=self.configuration.controller_id
 81        )
 82
 83    async def activated_motion_groups(self) -> list[MotionGroup]:
 84        """Retrieves a list of `MotionGroup` instances for all activated motion groups.
 85
 86        Returns:
 87            list[MotionGroup]: All activated motion groups as `MotionGroup` objects.
 88        """
 89        motion_group_ids = await self.activated_motion_group_ids()
 90        return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]
 91
 92    def get_robots(self) -> dict[str, AbstractRobot]:
 93        """Retrieves a dictionary of motion group IDs to their corresponding robots.
 94
 95        Note:
 96            This method interprets motion group IDs to create corresponding `MotionGroup`
 97            objects and returns them as `AbstractRobot` references.
 98
 99        Returns:
100            dict[str, AbstractRobot]: A mapping of motion group ID to `MotionGroup` instance.
101        """
102        return {
103            motion_group_id: self.motion_group(motion_group_id)
104            for motion_group_id in self._activated_motion_group_ids
105        }
106
107    async def read(self, key: str) -> ValueType:
108        """Reads an IO value from the controller.
109
110        Args:
111            key (str): The IO key name.
112
113        Returns:
114            ValueType: The value associated with the specified IO key.
115        """
116        return await self._io_access.read(key)
117
118    async def write(self, key: str, value: ValueType) -> None:
119        """Writes an IO value to the controller.
120
121        Args:
122            key (str): The IO key name.
123            value (ValueType): The value to write to the specified IO key.
124        """
125        return await self._io_access.write(key, value)
126
127    async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]:
128        async for state in self._nova_api.stream_robot_controller_state(
129            cell=self.configuration.cell_id,
130            controller_id=self.configuration.controller_id,
131            response_rate=rate_msecs,
132        ):
133            yield state

Represents a Nova controller, managing motion groups and IO interactions.

controller_id: str
32    @property
33    def controller_id(self) -> str:
34        """Returns the unique controller ID."""
35        return self.configuration.controller_id

Returns the unique controller ID.

async def open(self):
37    async def open(self):
38        """Activates all motion groups."""
39        motion_group_ids = await self.activated_motion_group_ids()
40        self._activated_motion_group_ids = motion_group_ids
41        logger.info(f"Found motion group {motion_group_ids}")
42        return self

Activates all motion groups.

async def close(self):
44    async def close(self):
45        # RPS-1174: when a motion group is deactivated, RAE closes all open connections
46        #           this behaviour is not desired in some cases,
47        #           so for now we will not deactivate for the user
48        pass

Release the external hardware (i.e. close connection or set mode of external hardware back)

def motion_group(self, motion_group_id: str) -> MotionGroup:
53    def motion_group(self, motion_group_id: str) -> MotionGroup:
54        """Returns motion group with specific id.
55
56        Args:
57            motion_group_id (str): The ID of the motion group.
58
59        Returns:
60            MotionGroup: A MotionGroup instance corresponding to the given ID.
61        """
62        return MotionGroup(
63            api_gateway=self._nova_api,
64            cell=self.configuration.cell_id,
65            motion_group_id=motion_group_id,
66        )

Returns motion group with specific id.

Arguments:
  • motion_group_id (str): The ID of the motion group.
Returns:

MotionGroup: A MotionGroup instance corresponding to the given ID.

async def activated_motion_group_ids(self) -> list[str]:
71    async def activated_motion_group_ids(self) -> list[str]:
72        """Activates and retrieves the list of motion group IDs available on this controller.
73
74        The system automatically activates all motion groups on the associated controller.
75
76        Returns:
77            list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).
78        """
79        return await self._nova_api.activate_all_motion_groups(
80            cell=self.configuration.cell_id, controller=self.configuration.controller_id
81        )

Activates and retrieves the list of motion group IDs available on this controller.

The system automatically activates all motion groups on the associated controller.

Returns:

list[str]: A list of activated motion group IDs (e.g., ["0@controller_id"]).

async def activated_motion_groups(self) -> list[MotionGroup]:
83    async def activated_motion_groups(self) -> list[MotionGroup]:
84        """Retrieves a list of `MotionGroup` instances for all activated motion groups.
85
86        Returns:
87            list[MotionGroup]: All activated motion groups as `MotionGroup` objects.
88        """
89        motion_group_ids = await self.activated_motion_group_ids()
90        return [self.motion_group(motion_group_id) for motion_group_id in motion_group_ids]

Retrieves a list of MotionGroup instances for all activated motion groups.

Returns:

list[MotionGroup]: All activated motion groups as MotionGroup objects.

async def read(self, key: str) -> Union[int, str, bool, float, nova.types.Pose]:
107    async def read(self, key: str) -> ValueType:
108        """Reads an IO value from the controller.
109
110        Args:
111            key (str): The IO key name.
112
113        Returns:
114            ValueType: The value associated with the specified IO key.
115        """
116        return await self._io_access.read(key)

Reads an IO value from the controller.

Arguments:
  • key (str): The IO key name.
Returns:

ValueType: The value associated with the specified IO key.

async def write( self, key: str, value: Union[int, str, bool, float, nova.types.Pose]) -> None:
118    async def write(self, key: str, value: ValueType) -> None:
119        """Writes an IO value to the controller.
120
121        Args:
122            key (str): The IO key name.
123            value (ValueType): The value to write to the specified IO key.
124        """
125        return await self._io_access.write(key, value)

Writes an IO value to the controller.

Arguments:
  • key (str): The IO key name.
  • value (ValueType): The value to write to the specified IO key.
async def stream_state( self, rate_msecs) -> AsyncGenerator[wandelbots_api_client.models.robot_controller_state.RobotControllerState, NoneType]:
127    async def stream_state(self, rate_msecs) -> AsyncGenerator[models.RobotControllerState, None]:
128        async for state in self._nova_api.stream_robot_controller_state(
129            cell=self.configuration.cell_id,
130            controller_id=self.configuration.controller_id,
131            response_rate=rate_msecs,
132        ):
133            yield state
logger = <Logger wandelbots-nova (INFO)>
def program( function: Callable[~Parameters, ~Return]) -> nova.runtime.function.Function:
203def wrap(function: Callable[Parameters, Return]) -> Function[Parameters, Return]:
204    return Function.validate(function)

The type of the None singleton.

__version__ = '1.18.1'