nova.program
class ProgramRunner(ABC):
    """Abstract base class for program runners.

    This class defines the interface that all program runners must implement.
    It provides the core functionality for running and managing program execution.

    A runner is single-use: ``start()`` only accepts a runner whose run is still
    in the ``NOT_STARTED`` state. Subclasses implement ``_run``.
    """

    def __init__(
        self,
        program_id: str,
        program: Program,
        args: dict[str, Any],
        robot_cell_override: RobotCell | None = None,
    ):
        """Create a runner and its initial ``ProgramRun`` record.

        Args:
            program_id: identifier of the program to run
            program: the program object to execute
            args: input arguments; also stored as ``input_data`` of the run
            robot_cell_override: robot cell to use instead of fetching one via ``Nova``
        """
        self._run_id = str(uuid.uuid4())
        self._program_id = program_id
        self._program = program
        self._args = args
        self._robot_cell_override = robot_cell_override
        self._program_run: ProgramRun = ProgramRun(
            run_id=self._run_id,
            program_id=program_id,
            state=ProgramRunState.NOT_STARTED,
            logs=None,
            stdout=None,
            error=None,
            traceback=None,
            start_time=None,
            end_time=None,
            input_data=args,
            output_data=None,
        )
        self._thread: threading.Thread | None = None
        self._stop_event: threading.Event | None = None
        self._exc: Exception | None = None
        # Set by _run_program; initialised here so that _estop_handler's
        # assertion fails cleanly instead of raising AttributeError.
        self.execution_context: ExecutionContext | None = None

    @property
    def run_id(self) -> str:
        """Get the unique identifier of the program run.

        Returns:
            str: The unique identifier
        """
        return self._run_id

    @property
    def program_id(self) -> str:
        """Get the unique identifier of the program.

        Returns:
            str: The unique identifier
        """
        return self._program_id

    @property
    def program_run(self) -> ProgramRun:
        """Get the current program run state and results.

        Returns:
            ProgramRun: The program run object containing execution state and results
        """
        return self._program_run

    @property
    def state(self) -> ProgramRunState:
        """Get the current state of the program run.

        Returns:
            ProgramRunState: The current state
        """
        return self._program_run.state

    @property
    def stopped(self) -> bool:
        """Check if the program has been stopped.

        Returns:
            bool: True if the stop event exists and is set, False otherwise
        """
        if self._stop_event is None:
            return False
        return self._stop_event.is_set()

    def is_running(self) -> bool:
        """Check if a program is currently running.

        Returns:
            bool: True if a thread was created and the run state is RUNNING
        """
        return self._thread is not None and self.state is ProgramRunState.RUNNING

    def join(self):
        """Wait for the program execution to finish.

        Raises:
            RuntimeError: If the runner was never started
            Exception: If the program execution failed
        """
        if self._thread is None:
            # Without this guard the call fails with an opaque
            # "'NoneType' object has no attribute 'join'".
            raise RuntimeError("Program has not been started")
        self._thread.join()
        if self._exc:
            raise self._exc

    def stop(self, sync: bool = False):
        """Stop the program execution.

        Args:
            sync: If True, the call blocks until the program is stopped

        Raises:
            RuntimeError: If the program is not running
        """
        if not self.is_running():
            raise RuntimeError("Program is not running")
        if self._stop_event is not None:
            self._stop_event.set()
        if sync:
            self.join()

    def start(
        self, sync: bool = False, on_state_change: Callable[[Any], Awaitable[None]] | None = None
    ):
        """Creates another thread and starts the program execution. If the program was executed already, is currently
        running, failed or was stopped a new program runner needs to be created.

        Args:
            sync: if True the execution is synchronous and the method blocks until the execution is finished
            on_state_change: callback function that is called when the state of the program runner changes

        Raises:
            RuntimeError: when the runner is not in the NOT_STARTED state
        """
        # Check if another program execution is already in progress
        if self.state is not ProgramRunState.NOT_STARTED:
            raise RuntimeError(
                "The runner is not in the not_started state. Create a new runner to execute again."
            )

        async def _on_state_change():
            if on_state_change is not None:
                await on_state_change(self._program_run)

        def stopper(sync_stop_event, async_stop_event):
            # Runs in a worker thread: poll the threading event and forward it
            # to the anyio event once it is set.
            while not sync_stop_event.wait(0.2):
                from_thread.check_cancelled()
            from_thread.run_sync(async_stop_event.set)

        async def runner():
            self._stop_event = threading.Event()
            async_stop_event = anyio.Event()

            # TODO potential memory leak if the program is running for a long time
            with contextlib.redirect_stdout(Tee(sys.stdout)) as stdout:
                try:
                    await stoppable_run(
                        self._run_program(
                            stop_event=async_stop_event, on_state_change=_on_state_change
                        ),
                        to_thread.run_sync(
                            stopper, self._stop_event, async_stop_event, abandon_on_cancel=True
                        ),
                    )
                except ExceptionGroup as eg:  # noqa: F821
                    raise eg.exceptions[0]
                finally:
                    # Keep whatever was printed even when the run raised.
                    self._program_run.stdout = stdout.getvalue()

        # Run the event loop for this program in its own thread
        self._thread = threading.Thread(target=anyio.run, name="ProgramRunner", args=[runner])
        self._thread.start()

        if sync:
            self.join()

    async def _estop_handler(
        self,
        monitoring_scope: anyio.CancelScope,
        *,
        task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
    ):
        """Watch the robot cell state stream and stop the program on an unsafe safety state.

        Args:
            monitoring_scope: cancel scope the monitoring runs in; cancelling it ends the handler
            task_status: used to signal the task group that the stream has been set up
        """
        assert self.execution_context is not None

        def state_is_estop(state_: api.models.RobotControllerState):
            # See: models.RobotControllerState.safety_state
            acceptable_safety_states = ["SAFETY_STATE_NORMAL", "SAFETY_STATE_REDUCED"]
            return (
                isinstance(state_, api.models.RobotControllerState)
                and state_.safety_state not in acceptable_safety_states
            )

        with monitoring_scope:
            # NOTE(review): meaning/unit of the 1000 argument is not visible here - confirm
            cell_state_stream = self.execution_context.robot_cell.stream_state(1000)
            task_status.started()

            async for state in cell_state_stream:
                if state_is_estop(state):
                    logger.info(f"ESTOP detected: {state}")
                    self.stop()  # TODO is this clean

    def _handle_general_exception(self, exc: Exception):
        """Record a failed run: log the error, store it on the run and mark the run FAILED.

        Must be called while ``exc`` is being handled, because the traceback is
        taken from the currently handled exception.

        Args:
            exc: the exception that made the run fail; re-raised later by ``join()``
        """
        traceback = tb.format_exc()
        logger.error(f"Program {self.program_id} run {self.run_id} failed")

        if isinstance(exc, PlanTrajectoryFailed):
            message = f"{type(exc)}: {exc.to_pretty_string()}"
        else:
            message = f"{type(exc)}: {str(exc)}"
            # NOTE(review): nesting reconstructed from a flattened listing -
            # confirm the traceback is only logged on this branch.
            logger.error(traceback)
        self._exc = exc
        logger.error(message)
        self._program_run.error = message
        self._program_run.traceback = traceback
        self._program_run.state = ProgramRunState.FAILED

    async def _run_program(
        self, stop_event: anyio.Event, on_state_change: Callable[[], Awaitable[None]]
    ) -> None:
        """Runs the program and handles the execution context, program state and exception handling.

        Args:
            stop_event: event that is set when the program execution should be stopped
            on_state_change: callback function that is called when the state of the program runner changes

        Raises:
            CancelledError: when the program execution is cancelled  # noqa: DAR402

        # noqa: DAR401
        """

        # Create a new logger sink to capture the output of the program execution
        # TODO potential memory leak if the program is running for a long time
        log_capture = io.StringIO()
        sink_id = logger.add(log_capture)
        # Tracks whether the sink was already detached by the inner teardown,
        # so the outer ``finally`` does not remove it a second time.
        sink_removed = False

        try:
            robot_cell = None
            # TODO: this should be removed to make it possible running programs without a robot cell
            if self._robot_cell_override:
                robot_cell = self._robot_cell_override
            else:
                async with Nova() as nova:
                    cell = nova.cell()
                    robot_cell = await cell.get_robot_cell()

            if robot_cell is None:
                raise RuntimeError("No robot cell available")

            self.execution_context = execution_context = ExecutionContext(
                robot_cell=robot_cell, stop_event=stop_event
            )
            current_execution_context_var.set(execution_context)
            await on_state_change()

            monitoring_scope = anyio.CancelScope()
            async with robot_cell, anyio.create_task_group() as tg:
                await tg.start(self._estop_handler, monitoring_scope)

                try:
                    logger.info(f"Program {self.program_id} run {self.run_id} started")
                    self._program_run.state = ProgramRunState.RUNNING
                    self._program_run.start_time = dt.datetime.now(dt.timezone.utc)
                    await self._run(execution_context)
                except anyio.get_cancelled_exc_class() as exc:  # noqa: F841
                    # Program was stopped
                    logger.info(f"Program {self.program_id} run {self.run_id} cancelled")
                    try:
                        # Shielded so the robot cell is still stopped although
                        # the surrounding scope is already cancelled.
                        with anyio.CancelScope(shield=True):
                            await robot_cell.stop()
                    except Exception as e:
                        logger.error(
                            f"Program {self.program_id} run {self.run_id}: Error while stopping robot cell: {e!r}"
                        )
                        raise

                    self._program_run.state = ProgramRunState.STOPPED
                    raise

                except NotPlannableError as exc:
                    # Program was not plannable (aka. /plan/ endpoint)
                    self._handle_general_exception(exc)
                except Exception as exc:  # pylint: disable=broad-except
                    self._handle_general_exception(exc)
                else:
                    if self.stopped:
                        # Program was stopped
                        logger.info(
                            f"Program {self.program_id} run {self.run_id} stopped successfully"
                        )
                        self._program_run.state = ProgramRunState.STOPPED
                    elif self._program_run.state is ProgramRunState.RUNNING:
                        # Program was completed
                        self._program_run.state = ProgramRunState.COMPLETED
                        logger.info(
                            f"Program {self.program_id} run {self.run_id} completed successfully"
                        )
                finally:
                    # write path to output
                    # self._program_run.execution_results = execution_context.motion_group_recordings
                    self._program_run.output_data = execution_context.output_data

                    logger.info(
                        f"Program {self.program_id} run {self.run_id} finished. Run teardown routine..."
                    )
                    self._program_run.end_time = dt.datetime.now(dt.timezone.utc)

                    logger.remove(sink_id)
                    sink_removed = True
                    self._program_run.logs = log_capture.getvalue()
                    monitoring_scope.cancel()
                    await on_state_change()
        except anyio.get_cancelled_exc_class():
            raise
        except Exception as exc:  # pylint: disable=broad-except
            # Handle any exceptions raised during entering the robot cell context
            self._handle_general_exception(exc)
        finally:
            # Setup can fail before the inner teardown is reached (e.g. no robot
            # cell available). Detach the sink here so it does not stay
            # registered forever, and keep what was logged up to this point.
            if not sink_removed:
                logger.remove(sink_id)
                self._program_run.logs = log_capture.getvalue()

    @abstractmethod
    async def _run(self, execution_context: ExecutionContext):
        """
        The main function that runs the program. This method should be overridden by subclasses to implement the
        runner logic.

        Args:
            execution_context: context created for this run (robot cell and stop event)
        """
Abstract base class for program runners.
This class defines the interface that all program runners must implement. It provides the core functionality for running and managing program execution.
@property
def run_id(self) -> str:
    """Unique identifier of this program run.

    Returns:
        str: The identifier stored on the runner as ``_run_id``
    """
    identifier = self._run_id
    return identifier
Get the unique identifier of the program run.
Returns:
str: The unique identifier
@property
def program_id(self) -> str:
    """Unique identifier of the program this runner executes.

    Returns:
        str: The identifier stored on the runner as ``_program_id``
    """
    identifier = self._program_id
    return identifier
Get the unique identifier of the program.
Returns:
str: The unique identifier
@property
def program_run(self) -> ProgramRun:
    """Program run record holding the execution state and results.

    Returns:
        ProgramRun: The run object stored on the runner as ``_program_run``
    """
    run_record = self._program_run
    return run_record
Get the current program run state and results.
Returns:
ProgramRun: The program run object containing execution state and results
@property
def state(self) -> ProgramRunState:
    """Current state of the program run.

    Returns:
        ProgramRunState: The ``state`` attribute of the stored program run
    """
    run_record = self._program_run
    return run_record.state
Get the current state of the program run.
Returns:
ProgramRunState: The current state
@property
def stopped(self) -> bool:
    """Tell whether a stop has been requested for this run.

    Returns:
        bool: True if the stop event exists and is set, False otherwise
    """
    stop_event = self._stop_event
    return stop_event is not None and stop_event.is_set()
Check if the program has been stopped.
Returns:
bool: True if the program has been stopped, False otherwise
def is_running(self) -> bool:
    """Tell whether the program is currently executing.

    Returns:
        bool: True if a thread was created and the run state is RUNNING, False otherwise
    """
    if self._thread is None:
        return False
    return self.state is ProgramRunState.RUNNING
Check if a program is currently running.
Returns:
bool: True if a program is running, False otherwise
def join(self):
    """Wait for the program execution to finish.

    Raises:
        RuntimeError: If the runner was never started (no thread exists yet)
        Exception: If the program execution failed; the stored exception is re-raised
    """
    worker = self._thread
    if worker is None:
        # Without this guard the call fails with an opaque
        # "'NoneType' object has no attribute 'join'".
        raise RuntimeError("Program has not been started")
    worker.join()
    if self._exc:
        raise self._exc
Wait for the program execution to finish.
Raises:
- Exception: If the program execution failed
def stop(self, sync: bool = False):
    """Request the running program to stop.

    Args:
        sync: If True, the call blocks until the program is stopped

    Raises:
        RuntimeError: If the program is not running
    """
    if not self.is_running():
        raise RuntimeError("Program is not running")

    stop_event = self._stop_event
    if stop_event is not None:
        stop_event.set()

    if sync:
        self.join()
Stop the program execution.
Arguments:
- sync: If True, the call blocks until the program is stopped
def start(
    self, sync: bool = False, on_state_change: Callable[[Any], Awaitable[None]] | None = None
):
    """Creates another thread and starts the program execution. If the program was executed already, is currently
    running, failed or was stopped a new program runner needs to be created.

    Args:
        sync: if True the execution is synchronous and the method blocks until the execution is finished
        on_state_change: callback function that is called when the state of the program runner changes

    Raises:
        RuntimeError: when the runner is not in the NOT_STARTED state
    """
    # Check if another program execution is already in progress
    if self.state is not ProgramRunState.NOT_STARTED:
        raise RuntimeError(
            "The runner is not in the not_started state. Create a new runner to execute again."
        )

    async def _on_state_change():
        if on_state_change is not None:
            await on_state_change(self._program_run)

    def stopper(sync_stop_event, async_stop_event):
        # Runs in a worker thread: poll the threading event and forward it to
        # the anyio event once it is set.
        while not sync_stop_event.wait(0.2):
            from_thread.check_cancelled()
        from_thread.run_sync(async_stop_event.set)

    async def runner():
        self._stop_event = threading.Event()
        async_stop_event = anyio.Event()

        # TODO potential memory leak if the program is running for a long time
        with contextlib.redirect_stdout(Tee(sys.stdout)) as stdout:
            try:
                await stoppable_run(
                    self._run_program(
                        stop_event=async_stop_event, on_state_change=_on_state_change
                    ),
                    to_thread.run_sync(
                        stopper, self._stop_event, async_stop_event, abandon_on_cancel=True
                    ),
                )
            except ExceptionGroup as eg:  # noqa: F821
                raise eg.exceptions[0]
            finally:
                # Keep whatever was printed even when the run raised.
                self._program_run.stdout = stdout.getvalue()

    # Run the event loop for this program in its own thread
    self._thread = threading.Thread(target=anyio.run, name="ProgramRunner", args=[runner])
    self._thread.start()

    if sync:
        self.join()
Creates another thread and starts the program execution. If the program was executed already, is currently running, failed or was stopped a new program runner needs to be created.
Arguments:
- sync: if True the execution is synchronous and the method blocks until the execution is finished
- on_state_change: callback function that is called when the state of the program runner changes
Raises:
- RuntimeError: when the runner is not in the NOT_STARTED state
def program(
    name: str | None = None,
    preconditions: ProgramPreconditions | None = None,
    viewer: Any | None = None,
):
    """
    Decorator factory for creating Nova programs with declarative controller setup.

    Args:
        name: Name of the program
        preconditions: ProgramPreconditions containing controller configurations and cleanup settings
        viewer: Optional viewer instance for program visualization (e.g., nova.viewers.Rerun())

    Returns:
        A decorator that turns an async function into a ``Program`` whose wrapped
        callable creates controllers before the call and cleans up afterwards.

    Raises:
        TypeError: (from the returned decorator) if the decorated function is not async
    """

    def decorator(
        function: Callable[Parameters, Return],
    ) -> Program[Parameters, Coroutine[Any, Any, Return]]:
        # Validate that the function is async
        if not asyncio.iscoroutinefunction(function):
            raise TypeError(f"Program function '{function.__name__}' must be async")

        func_obj = Program.validate(function)
        if name:
            func_obj.name = name
        func_obj.preconditions = preconditions

        # Create a wrapper that handles controller lifecycle
        original_wrapped = func_obj._wrapped

        async def async_wrapper(*args: Parameters.args, **kwargs: Parameters.kwargs) -> Return:
            """Async wrapper that handles controller creation and cleanup."""
            created_controllers = []
            try:
                # Create controllers before execution
                created_controllers = await func_obj._create_controllers()

                # The viewer itself is not configured here: per the original
                # note this is done via a hook in the Nova context manager once
                # a Nova instance becomes available inside the function.

                # Execute the wrapped function
                return await original_wrapped(*args, **kwargs)
            finally:
                try:
                    # Clean up controllers after execution
                    await func_obj._cleanup_controllers(created_controllers)
                finally:
                    # Nested so viewers are cleaned up even if controller
                    # cleanup raises.
                    if viewer is not None:
                        from nova.viewers import _cleanup_active_viewers

                        _cleanup_active_viewers()

        # Update the wrapped function to our async wrapper
        func_obj._wrapped = async_wrapper

        return func_obj

    return decorator
Decorator factory for creating Nova programs with declarative controller setup.
Arguments:
- name: Name of the program
- preconditions: ProgramPreconditions containing controller configurations and cleanup settings
- viewer: Optional viewer instance for program visualization (e.g., nova.viewers.Rerun())
class ProgramPreconditions(BaseModel):
    """Preconditions attached to a program via the ``program`` decorator.

    Holds the controller configurations and the cleanup setting for a program.
    """

    # Robot controller configurations for the program; None when nothing is declared.
    # NOTE(review): presumably these are the controllers created before the
    # program body runs - confirm against Program._create_controllers.
    controllers: list[api.models.RobotController] | None = None
    # NOTE(review): presumably controls whether created controllers are removed
    # again after the run - confirm against Program._cleanup_controllers.
    cleanup_controllers: bool = False
!!! abstract "Usage Documentation"
    Models

A base class for creating Pydantic models.

Attributes:
- `__class_vars__`: The names of the class variables defined on the model.
- `__private_attributes__`: Metadata about the private attributes of the model.
- `__signature__`: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- `__pydantic_complete__`: Whether model building is completed, or if there are still undefined fields.
- `__pydantic_core_schema__`: The core schema of the model.
- `__pydantic_custom_init__`: Whether the model has a custom `__init__` function.
- `__pydantic_decorators__`: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- `__pydantic_generic_metadata__`: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- `__pydantic_parent_namespace__`: Parent namespace of the model, used for automatic rebuilding of models.
- `__pydantic_post_init__`: The name of the post-init method for the model, if defined.
- `__pydantic_root_model__`: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- `__pydantic_serializer__`: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- `__pydantic_validator__`: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- `__pydantic_fields__`: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- `__pydantic_computed_fields__`: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- `__pydantic_extra__`: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- `__pydantic_fields_set__`: The names of fields explicitly set during instantiation.
- `__pydantic_private__`: Values of private attributes set on the model instance.