nova.program

1from nova.program.function import ProgramPreconditions, program
2from nova.program.runner import ProgramRunner
3
4__all__ = ["ProgramRunner", "program", "ProgramPreconditions"]
class ProgramRunner(abc.ABC):
 92class ProgramRunner(ABC):
 93    """Abstract base class for program runners.
 94
 95    This class defines the interface that all program runners must implement.
 96    It provides the core functionality for running and managing program execution.
 97    """
 98
 99    def __init__(
100        self,
101        program_id: str,
102        program: Program,
103        args: dict[str, Any],
104        robot_cell_override: RobotCell | None = None,
105    ):
106        self._run_id = str(uuid.uuid4())
107        self._program_id = program_id
108        self._program = program
109        self._args = args
110        self._robot_cell_override = robot_cell_override
111        self._program_run: ProgramRun = ProgramRun(
112            run_id=self._run_id,
113            program_id=program_id,
114            state=ProgramRunState.NOT_STARTED,
115            logs=None,
116            stdout=None,
117            error=None,
118            traceback=None,
119            start_time=None,
120            end_time=None,
121            input_data=args,
122            output_data=None,
123        )
124        self._thread: threading.Thread | None = None
125        self._stop_event: threading.Event | None = None
126        self._exc: Exception | None = None
127
128    @property
129    def run_id(self) -> str:
130        """Get the unique identifier of the program run.
131
132        Returns:
133            str: The unique identifier
134        """
135        return self._run_id
136
137    @property
138    def program_id(self) -> str:
139        """Get the unique identifier of the program.
140
141        Returns:
142            str: The unique identifier
143        """
144        return self._program_id
145
146    @property
147    def program_run(self) -> ProgramRun:
148        """Get the current program run state and results.
149
150        Returns:
151            Any: The program run object containing execution state and results
152        """
153        return self._program_run
154
155    @property
156    def state(self) -> ProgramRunState:
157        """Get the current state of the program run.
158
159        Returns:
160            ProgramRunState: The current state
161        """
162        return self._program_run.state
163
164    @property
165    def stopped(self) -> bool:
166        """Check if the program has been stopped.
167
168        Returns:
169            bool: True if the program has been stopped, False otherwise
170        """
171        if self._stop_event is None:
172            return False
173        return self._stop_event.is_set()
174
175    def is_running(self) -> bool:
176        """Check if a program is currently running.
177
178        Returns:
179            bool: True if a program is running, False otherwise
180        """
181        return self._thread is not None and self.state is ProgramRunState.RUNNING
182
183    def join(self):
184        """Wait for the program execution to finish.
185
186        Raises:
187            Exception: If the program execution failed
188        """
189        self._thread.join()
190        if self._exc:
191            raise self._exc
192
193    def stop(self, sync: bool = False):
194        """Stop the program execution.
195
196        Args:
197            sync: If True, the call blocks until the program is stopped
198        """
199        if not self.is_running():
200            raise RuntimeError("Program is not running")
201        if self._stop_event is not None:
202            self._stop_event.set()
203        if sync:
204            self.join()
205
206    def start(
207        self, sync: bool = False, on_state_change: Callable[[Any], Awaitable[None]] | None = None
208    ):
209        """Creates another thread and starts the program execution. If the program was executed already, is currently
210        running, failed or was stopped a new program runner needs to be created.
211
212        Args:
213            sync: if True the execution is synchronous and the method blocks until the execution is finished
214            on_state_change: callback function that is called when the state of the program runner changes
215
216        Raises:
217            RuntimeError: when the runner is not in IDLE state
218        """
219        # Check if another program execution is already in progress
220        if self.state is not ProgramRunState.NOT_STARTED:
221            raise RuntimeError(
222                "The runner is not in the not_started state. Create a new runner to execute again."
223            )
224
225        async def _on_state_change():
226            if on_state_change is not None:
227                await on_state_change(self._program_run)
228
229        def stopper(sync_stop_event, async_stop_event):
230            while not sync_stop_event.wait(0.2):
231                from_thread.check_cancelled()
232            from_thread.run_sync(async_stop_event.set)
233
234        async def runner():
235            self._stop_event = threading.Event()
236            async_stop_event = anyio.Event()
237
238            # TODO potential memory leak if the the program is running for a long time
239            with contextlib.redirect_stdout(Tee(sys.stdout)) as stdout:
240                try:
241                    await stoppable_run(
242                        self._run_program(
243                            stop_event=async_stop_event, on_state_change=_on_state_change
244                        ),
245                        to_thread.run_sync(
246                            stopper, self._stop_event, async_stop_event, abandon_on_cancel=True
247                        ),
248                    )
249                except ExceptionGroup as eg:  # noqa: F821
250                    raise eg.exceptions[0]
251                self._program_run.stdout = stdout.getvalue()
252
253        # Create new thread and runs _run
254        # start a new thread
255        self._thread = threading.Thread(target=anyio.run, name="ProgramRunner", args=[runner])
256        self._thread.start()
257
258        if sync:
259            self.join()
260
261    async def _estop_handler(
262        self,
263        monitoring_scope: anyio.CancelScope,
264        *,
265        task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
266    ):
267        assert self.execution_context is not None
268
269        def state_is_estop(state_: api.models.RobotControllerState):
270            # See: models.RobotControllerState.safety_state
271            acceptable_safety_states = ["SAFETY_STATE_NORMAL", "SAFETY_STATE_REDUCED"]
272            return (
273                isinstance(state_, api.models.RobotControllerState)
274                and state_.safety_state not in acceptable_safety_states
275            )
276
277        with monitoring_scope:
278            cell_state_stream = self.execution_context.robot_cell.stream_state(1000)
279            task_status.started()
280
281            async for state in cell_state_stream:
282                if state_is_estop(state):
283                    logger.info(f"ESTOP detected: {state}")
284                    self.stop()  # TODO is this clean
285
286    def _handle_general_exception(self, exc: Exception):
287        # Handle any exceptions raised during task execution
288        traceback = tb.format_exc()
289        logger.error(f"Program {self.program_id} run {self.run_id} failed")
290
291        if isinstance(exc, PlanTrajectoryFailed):
292            message = f"{type(exc)}: {exc.to_pretty_string()}"
293        else:
294            message = f"{type(exc)}: {str(exc)}"
295            logger.error(traceback)
296            self._exc = exc
297        logger.error(message)
298        self._program_run.error = message
299        self._program_run.traceback = traceback
300        self._program_run.state = ProgramRunState.FAILED
301
302    async def _run_program(
303        self, stop_event: anyio.Event, on_state_change: Callable[[], Awaitable[None]]
304    ) -> None:
305        """Runs the program and handles the execution context, program state and exception handling.
306
307        Args:
308            stop_event: event that is set when the program execution should be stopped
309            on_state_change: callback function that is called when the state of the program runner changes
310
311        Raises:
312            CancelledError: when the program execution is cancelled  # noqa: DAR402
313
314        # noqa: DAR401
315        """
316
317        # Create a new logger sink to capture the output of the program execution
318        # TODO potential memory leak if the the program is running for a long time
319        log_capture = io.StringIO()
320        sink_id = logger.add(log_capture)
321
322        try:
323            robot_cell = None
324            # TODO: this should be removed to make it possible running programs without a robot cell
325            if self._robot_cell_override:
326                robot_cell = self._robot_cell_override
327            else:
328                async with Nova() as nova:
329                    cell = nova.cell()
330                    robot_cell = await cell.get_robot_cell()
331
332            if robot_cell is None:
333                raise RuntimeError("No robot cell available")
334
335            self.execution_context = execution_context = ExecutionContext(
336                robot_cell=robot_cell, stop_event=stop_event
337            )
338            current_execution_context_var.set(execution_context)
339            await on_state_change()
340
341            monitoring_scope = anyio.CancelScope()
342            async with robot_cell, anyio.create_task_group() as tg:
343                await tg.start(self._estop_handler, monitoring_scope)
344
345                try:
346                    logger.info(f"Program {self.program_id} run {self.run_id} started")
347                    self._program_run.state = ProgramRunState.RUNNING
348                    self._program_run.start_time = dt.datetime.now(dt.timezone.utc)
349                    await self._run(execution_context)
350                except anyio.get_cancelled_exc_class() as exc:  # noqa: F841
351                    # Program was stopped
352                    logger.info(f"Program {self.program_id} run {self.run_id} cancelled")
353                    try:
354                        with anyio.CancelScope(shield=True):
355                            await robot_cell.stop()
356                    except Exception as e:
357                        logger.error(
358                            f"Program {self.program_id} run {self.run_id}: Error while stopping robot cell: {e!r}"
359                        )
360                        raise
361
362                    self._program_run.state = ProgramRunState.STOPPED
363                    raise
364
365                except NotPlannableError as exc:
366                    # Program was not plannable (aka. /plan/ endpoint)
367                    self._handle_general_exception(exc)
368                except Exception as exc:  # pylint: disable=broad-except
369                    self._handle_general_exception(exc)
370                else:
371                    if self.stopped:
372                        # Program was stopped
373                        logger.info(
374                            f"Program {self.program_id} run {self.run_id} stopped successfully"
375                        )
376                        self._program_run.state = ProgramRunState.STOPPED
377                    elif self._program_run.state is ProgramRunState.RUNNING:
378                        # Program was completed
379                        self._program_run.state = ProgramRunState.COMPLETED
380                        logger.info(
381                            f"Program {self.program_id} run {self.run_id} completed successfully"
382                        )
383                finally:
384                    # write path to output
385                    # self._program_run.execution_results = execution_context.motion_group_recordings
386                    self._program_run.output_data = execution_context.output_data
387
388                    logger.info(
389                        f"Program {self.program_id} run {self.run_id} finished. Run teardown routine..."
390                    )
391                    self._program_run.end_time = dt.datetime.now(dt.timezone.utc)
392
393                    logger.remove(sink_id)
394                    self._program_run.logs = log_capture.getvalue()
395                    monitoring_scope.cancel()
396                    await on_state_change()
397        except anyio.get_cancelled_exc_class():
398            raise
399        except Exception as exc:  # pylint: disable=broad-except
400            # Handle any exceptions raised during entering the robot cell context
401            self._handle_general_exception(exc)
402
403    @abstractmethod
404    async def _run(self, execution_context: ExecutionContext):
405        """
406        The main function that runs the program. This method should be overridden by subclasses to implement the
407        runner logic.
408        """

Abstract base class for program runners.

This class defines the interface that all program runners must implement. It provides the core functionality for running and managing program execution.

run_id: str
128    @property
129    def run_id(self) -> str:
130        """Get the unique identifier of the program run.
131
132        Returns:
133            str: The unique identifier
134        """
135        return self._run_id

Get the unique identifier of the program run.

Returns:

str: The unique identifier

program_id: str
137    @property
138    def program_id(self) -> str:
139        """Get the unique identifier of the program.
140
141        Returns:
142            str: The unique identifier
143        """
144        return self._program_id

Get the unique identifier of the program.

Returns:

str: The unique identifier

program_run: nova.program.runner.ProgramRun
146    @property
147    def program_run(self) -> ProgramRun:
148        """Get the current program run state and results.
149
150        Returns:
151            Any: The program run object containing execution state and results
152        """
153        return self._program_run

Get the current program run state and results.

Returns:

Any: The program run object containing execution state and results

state: nova.program.runner.ProgramRunState
155    @property
156    def state(self) -> ProgramRunState:
157        """Get the current state of the program run.
158
159        Returns:
160            ProgramRunState: The current state
161        """
162        return self._program_run.state

Get the current state of the program run.

Returns:

ProgramRunState: The current state

stopped: bool
164    @property
165    def stopped(self) -> bool:
166        """Check if the program has been stopped.
167
168        Returns:
169            bool: True if the program has been stopped, False otherwise
170        """
171        if self._stop_event is None:
172            return False
173        return self._stop_event.is_set()

Check if the program has been stopped.

Returns:

bool: True if the program has been stopped, False otherwise

def is_running(self) -> bool:
175    def is_running(self) -> bool:
176        """Check if a program is currently running.
177
178        Returns:
179            bool: True if a program is running, False otherwise
180        """
181        return self._thread is not None and self.state is ProgramRunState.RUNNING

Check if a program is currently running.

Returns:

bool: True if a program is running, False otherwise

def join(self):
183    def join(self):
184        """Wait for the program execution to finish.
185
186        Raises:
187            Exception: If the program execution failed
188        """
189        self._thread.join()
190        if self._exc:
191            raise self._exc

Wait for the program execution to finish.

Raises:
  • Exception: If the program execution failed
def stop(self, sync: bool = False):
193    def stop(self, sync: bool = False):
194        """Stop the program execution.
195
196        Args:
197            sync: If True, the call blocks until the program is stopped
198        """
199        if not self.is_running():
200            raise RuntimeError("Program is not running")
201        if self._stop_event is not None:
202            self._stop_event.set()
203        if sync:
204            self.join()

Stop the program execution.

Arguments:
  • sync: If True, the call blocks until the program is stopped
def start( self, sync: bool = False, on_state_change: Callable[[typing.Any], Awaitable[None]] | None = None):
206    def start(
207        self, sync: bool = False, on_state_change: Callable[[Any], Awaitable[None]] | None = None
208    ):
209        """Creates another thread and starts the program execution. If the program was executed already, is currently
210        running, failed or was stopped a new program runner needs to be created.
211
212        Args:
213            sync: if True the execution is synchronous and the method blocks until the execution is finished
214            on_state_change: callback function that is called when the state of the program runner changes
215
216        Raises:
217            RuntimeError: when the runner is not in IDLE state
218        """
219        # Check if another program execution is already in progress
220        if self.state is not ProgramRunState.NOT_STARTED:
221            raise RuntimeError(
222                "The runner is not in the not_started state. Create a new runner to execute again."
223            )
224
225        async def _on_state_change():
226            if on_state_change is not None:
227                await on_state_change(self._program_run)
228
229        def stopper(sync_stop_event, async_stop_event):
230            while not sync_stop_event.wait(0.2):
231                from_thread.check_cancelled()
232            from_thread.run_sync(async_stop_event.set)
233
234        async def runner():
235            self._stop_event = threading.Event()
236            async_stop_event = anyio.Event()
237
238            # TODO potential memory leak if the the program is running for a long time
239            with contextlib.redirect_stdout(Tee(sys.stdout)) as stdout:
240                try:
241                    await stoppable_run(
242                        self._run_program(
243                            stop_event=async_stop_event, on_state_change=_on_state_change
244                        ),
245                        to_thread.run_sync(
246                            stopper, self._stop_event, async_stop_event, abandon_on_cancel=True
247                        ),
248                    )
249                except ExceptionGroup as eg:  # noqa: F821
250                    raise eg.exceptions[0]
251                self._program_run.stdout = stdout.getvalue()
252
253        # Create new thread and runs _run
254        # start a new thread
255        self._thread = threading.Thread(target=anyio.run, name="ProgramRunner", args=[runner])
256        self._thread.start()
257
258        if sync:
259            self.join()

Creates another thread and starts the program execution. If the program was executed already, is currently running, failed or was stopped a new program runner needs to be created.

Arguments:
  • sync: if True the execution is synchronous and the method blocks until the execution is finished
  • on_state_change: callback function that is called when the state of the program runner changes
Raises:
  • RuntimeError: when the runner is not in IDLE state
def program( name: str | None = None, preconditions: ProgramPreconditions | None = None, viewer: typing.Any | None = None):
285def program(
286    name: str | None = None,
287    preconditions: ProgramPreconditions | None = None,
288    viewer: Any | None = None,
289):
290    """
291    Decorator factory for creating Nova programs with declarative controller setup.
292
293    Args:
294        name: Name of the program
295        preconditions: ProgramPreconditions containing controller configurations and cleanup settings
296        viewer: Optional viewer instance for program visualization (e.g., nova.viewers.Rerun())
297    """
298
299    def decorator(
300        function: Callable[Parameters, Return],
301    ) -> Program[Parameters, Coroutine[Any, Any, Return]]:
302        # Validate that the function is async
303        if not asyncio.iscoroutinefunction(function):
304            raise TypeError(f"Program function '{function.__name__}' must be async")
305
306        func_obj = Program.validate(function)
307        if name:
308            func_obj.name = name
309        func_obj.preconditions = preconditions
310
311        # Create a wrapper that handles controller lifecycle
312        original_wrapped = func_obj._wrapped
313
314        async def async_wrapper(*args: Parameters.args, **kwargs: Parameters.kwargs) -> Return:
315            """Async wrapper that handles controller creation and cleanup."""
316            created_controllers = []
317            try:
318                # Create controllers before execution
319                created_controllers = await func_obj._create_controllers()
320
321                # Configure viewers if any are active
322                if viewer is not None:
323                    # Configure the viewer when Nova instance becomes available in the function
324                    # This will be done via a hook in the Nova context manager
325                    pass
326
327                # Execute the wrapped function
328                result = await original_wrapped(*args, **kwargs)
329                return result
330            finally:
331                # Clean up controllers after execution
332                await func_obj._cleanup_controllers(created_controllers)
333
334                # Clean up viewers
335                if viewer is not None:
336                    from nova.viewers import _cleanup_active_viewers
337
338                    _cleanup_active_viewers()
339
340        # Update the wrapped function to our async wrapper
341        func_obj._wrapped = async_wrapper
342        return func_obj
343
344    return decorator

Decorator factory for creating Nova programs with declarative controller setup.

Arguments:
  • name: Name of the program
  • preconditions: ProgramPreconditions containing controller configurations and cleanup settings
  • viewer: Optional viewer instance for program visualization (e.g., nova.viewers.Rerun())
class ProgramPreconditions(pydantic.main.BaseModel):
34class ProgramPreconditions(BaseModel):
35    controllers: list[api.models.RobotController] | None = None
36    cleanup_controllers: bool = False

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
controllers: list[wandelbots_api_client.models.robot_controller.RobotController] | None
cleanup_controllers: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].