
API Reference

This page is automatically generated from the source code docstrings.

Process

Process(tasks: list[Task], name: str = '')

Manages and executes a collection of interdependent tasks.

A Process orchestrates the execution of multiple tasks, handling dependency resolution and task ordering. Tasks can be executed sequentially or in parallel. The process also manages task loggers and propagates failures to dependent tasks. If a task fails, every task that depends on it, directly or transitively, is skipped (status SKIPPED) without being executed. Tasks that do not depend on it continue to run.
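The failure-isolation rule above can be illustrated with a small sequential sketch. It is not the library's ProcessRunner. The run_graph helper, the plain-dict graph format and the string statuses (standing in for TaskStatus) are all assumptions made for illustration.

```python
from typing import Callable


def run_graph(
    order: list[str],
    deps: dict[str, set[str]],
    funcs: dict[str, Callable[[], object]],
) -> dict[str, str]:
    """Hypothetical sketch: run tasks in topological `order`.

    Any task whose dependency failed or was skipped is itself skipped.
    Independent tasks still run.
    """
    status: dict[str, str] = {}
    for name in order:
        # Skip if any direct dependency did not succeed. Because `order` is
        # topological, this check propagates the skip transitively.
        if any(status[d] != "SUCCESS" for d in deps.get(name, set())):
            status[name] = "SKIPPED"
            continue
        try:
            funcs[name]()
            status[name] = "SUCCESS"
        except Exception:
            status[name] = "ERRORED"
    return status


def boom() -> None:
    raise RuntimeError("extract failed")


statuses = run_graph(
    order=["extract", "transform", "load", "audit"],
    deps={"transform": {"extract"}, "load": {"transform"}},
    funcs={"extract": boom, "transform": lambda: 1, "load": lambda: 2, "audit": lambda: 3},
)
print(statuses)
```

Because the order is topological, checking only direct dependencies is enough. A skipped task in turn causes its own dependants to be skipped, while "audit", which has no dependencies, still succeeds.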

Attributes:

Name Type Description
tasks list[Task]

List of tasks to be executed, automatically sorted by dependencies.

name str

Human-readable name for the process ("" if unnamed).

runner ProcessRunner

The runner responsible for executing the tasks.

Parameters:

Name Type Description Default
tasks list[Task]

The tasks to orchestrate. Order does not matter — the constructor topologically sorts the list in place. The list is mutated during construction; pass a copy if the original ordering matters.

required
name str

Human-readable name for the process. Recorded on the resulting ProcessExecutionReport and used to label notifications (e.g. the email subject). Defaults to "" (unnamed).

''

Raises:

Type Description
TypeError

If tasks is not a list or contains non-Task elements.

ValueError

If duplicate task names are found.

DependencyNotFoundError

If a task depends on a non-existent task.

CircularDependencyError

If circular dependencies are detected among tasks.

Source code in src/processes/process.py
def __init__(self, tasks: list[Task], name: str = ""):
    self.tasks = tasks
    self.name = name

    try:
        self._validate_and_sort()
    except Exception:
        self.close_loggers()
        raise
    self.runner = ProcessRunner(self)
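The body of _validate_and_sort is not shown on this page. One plausible way to implement the checks listed under Raises is Kahn's topological sort, sketched below. The validate_and_sort function, its (name, dependency_names) input format and the two local exception classes are hypothetical stand-ins, not the library's code.

```python
from collections import deque


class DependencyNotFoundError(Exception):
    """Local stand-in for the library exception of the same name."""


class CircularDependencyError(Exception):
    """Local stand-in for the library exception of the same name."""


def validate_and_sort(tasks: list[tuple[str, list[str]]]) -> list[str]:
    """Hypothetical sketch: validate a task graph and return names in dependency order.

    `tasks` is a list of (name, dependency_names) pairs. Each dependency name is
    assumed to appear at most once per task (Task already enforces this).
    """
    deps: dict[str, list[str]] = {}
    for name, required in tasks:
        if name in deps:
            raise ValueError(f"Duplicate task name: {name!r}")
        deps[name] = list(required)

    for name, required in deps.items():
        for dep in required:
            if dep not in deps:
                raise DependencyNotFoundError(f"{name!r} depends on unknown task {dep!r}")

    # Kahn's algorithm: in-degree counts unmet dependencies; dependants are reverse edges.
    in_degree = {name: len(required) for name, required in deps.items()}
    dependants: dict[str, list[str]] = {name: [] for name in deps}
    for name, required in deps.items():
        for dep in required:
            dependants[dep].append(name)

    ready = deque(name for name, count in in_degree.items() if count == 0)
    order: list[str] = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in dependants[name]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    if len(order) != len(deps):
        # Tasks that never reached in-degree 0 sit on a cycle or behind one.
        stuck = sorted(set(deps) - set(order))
        raise CircularDependencyError(f"Cycle detected among: {', '.join(stuck)}")
    return order


print(validate_and_sort([("load", ["transform"]), ("extract", []), ("transform", ["extract"])]))
```

A self-dependency is caught by the same final check, because such a task can never reach in-degree 0.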

__enter__

__enter__() -> Self

Called when entering the 'with' block. Returns the process itself.

Source code in src/processes/process.py
def __enter__(self) -> Self:
    """Called when entering the 'with' block."""
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> Literal[False]

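Called when exiting the 'with' block, even if an error occurred. Closes every task's logger handlers via close_loggers() and returns False, so any exception raised inside the block propagates.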

Source code in src/processes/process.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> Literal[False]:
    """Called when exiting the 'with' block, even if an error occurred."""
    self.close_loggers()
    return False

get_task

get_task(task_name: str) -> Task

Retrieve a task by name in O(1).

Parameters:

Name Type Description Default
task_name str

The name of the task to retrieve.

required

Returns:

Type Description
Task

The task with the specified name.

Raises:

Type Description
TaskNotFoundError

If no task with the given name exists.

Source code in src/processes/process.py
def get_task(self, task_name: str) -> Task:
    """Retrieve a task by name in O(1).

    Parameters
    ----------
    task_name : str
        The name of the task to retrieve.

    Returns
    -------
    Task
        The task with the specified name.

    Raises
    ------
    TaskNotFoundError
        If no task with the given name exists.
    """
    try:
        return self._task_map[task_name]
    except KeyError as err:
        raise TaskNotFoundError(task_name) from err

add_task

add_task(task: Task) -> None

Add a task to the process and re-resolve the graph.

The graph is re-validated and re-sorted, and any pending run state is reset (a fresh runner is built). If the new task makes the graph invalid, the process is left unchanged and the original error is raised.

Parameters:

Name Type Description Default
task Task

The task to add.

required

Raises:

Type Description
TypeError

If task is not a Task.

ValueError

If a task with the same name already exists.

DependencyNotFoundError

If task depends on a task not present in the process.

CircularDependencyError

If adding task introduces a cycle.

Source code in src/processes/process.py
def add_task(self, task: Task) -> None:
    """Add a task to the process and re-resolve the graph.

    The graph is re-validated and re-sorted, and any pending run state is
    reset (a fresh runner is built). If the new task makes the graph
    invalid, the process is left unchanged and the original error is raised.

    Parameters
    ----------
    task : Task
        The task to add.

    Raises
    ------
    TypeError
        If ``task`` is not a ``Task``.
    ValueError
        If a task with the same name already exists.
    DependencyNotFoundError
        If ``task`` depends on a task not present in the process.
    CircularDependencyError
        If adding ``task`` introduces a cycle.
    """
    snapshot = list(self.tasks)
    self.tasks.append(task)
    self._commit_or_rollback(snapshot)
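_commit_or_rollback is not shown on this page. The sketch below is a minimal version of the documented snapshot-and-rollback behaviour. It assumes a plain dict graph and, to keep it short, checks only for missing dependencies. The Graph class is hypothetical.

```python
class Graph:
    """Hypothetical sketch of the snapshot-and-rollback pattern behind add_task."""

    def __init__(self, tasks: dict[str, set[str]]) -> None:
        self.tasks = dict(tasks)  # task name -> names it depends on
        self._validate()

    def _validate(self) -> None:
        # Only missing dependencies are checked here; a real graph would also
        # detect cycles.
        for name, deps in self.tasks.items():
            missing = deps - set(self.tasks)
            if missing:
                raise LookupError(f"{name!r} depends on missing {sorted(missing)}")

    def add_task(self, name: str, deps: set[str]) -> None:
        if name in self.tasks:
            raise ValueError(f"Duplicate task {name!r}")
        snapshot = dict(self.tasks)  # copy taken before mutating
        self.tasks[name] = set(deps)
        try:
            self._validate()  # re-resolve the whole graph
        except Exception:
            self.tasks = snapshot  # leave the object exactly as it was
            raise


graph = Graph({"extract": set()})
graph.add_task("transform", {"extract"})
try:
    graph.add_task("load", {"missing"})
except LookupError as err:
    print("rejected:", err)
print(sorted(graph.tasks))  # ['extract', 'transform']
```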

remove_task

remove_task(task_name: str) -> None

Remove a task from the process and re-resolve the graph.

Rejected if any other task still depends on task_name — remove the dependents first. On success the removed task's logger handlers are closed (the process no longer owns them) and a fresh runner is built.

Note: if the same Task instance is shared with another Process, closing its handlers here affects that process too.

Parameters:

Name Type Description Default
task_name str

Name of the task to remove.

required

Raises:

Type Description
TaskNotFoundError

If no task with that name exists.

ValueError

If other tasks depend on it.

Source code in src/processes/process.py
def remove_task(self, task_name: str) -> None:
    """Remove a task from the process and re-resolve the graph.

    Rejected if any other task still depends on ``task_name`` — remove the
    dependents first. On success the removed task's logger handlers are
    closed (the process no longer owns them) and a fresh runner is built.

    Note: if the same ``Task`` instance is shared with another ``Process``,
    closing its handlers here affects that process too.

    Parameters
    ----------
    task_name : str
        Name of the task to remove.

    Raises
    ------
    TaskNotFoundError
        If no task with that name exists.
    ValueError
        If other tasks depend on it.
    """
    task = self.get_task(task_name)
    dependents = self.get_dependant_tasks(task_name)
    if dependents:
        blocking = ", ".join(sorted(d.name for d in dependents))
        raise ValueError(
            f"Cannot remove task '{task_name}': still required by {blocking}. "
            "Remove the dependent task(s) first."
        )

    snapshot = list(self.tasks)
    self.tasks = [t for t in self.tasks if t.name != task_name]
    self._commit_or_rollback(snapshot)

    task.close_handlers()

add_task_dependency

add_task_dependency(
    task_name: str, dependency: TaskDependency
) -> None

Add a dependency to an existing task and re-resolve the graph.

Note: this mutates the Task instance's dependencies list; if the task is shared with another Process, that process sees the change too.

Parameters:

Name Type Description Default
task_name str

Name of the (dependent) task that gains the dependency.

required
dependency TaskDependency

The dependency to add. Its task_name must refer to an existing task and must not already be a dependency of task_name.

required

Raises:

Type Description
TypeError

If dependency is not a TaskDependency.

TaskNotFoundError

If task_name does not exist.

ValueError

If task_name already depends on dependency.task_name.

DependencyNotFoundError

If dependency.task_name does not exist in the process.

CircularDependencyError

If the dependency introduces a cycle (including a self-dependency).

Source code in src/processes/process.py
def add_task_dependency(self, task_name: str, dependency: TaskDependency) -> None:
    """Add a dependency to an existing task and re-resolve the graph.

    Note: this mutates the ``Task`` instance's ``dependencies`` list; if the
    task is shared with another ``Process``, that process sees the change too.

    Parameters
    ----------
    task_name : str
        Name of the (dependent) task that gains the dependency.
    dependency : TaskDependency
        The dependency to add. Its ``task_name`` must refer to an existing
        task and must not already be a dependency of ``task_name``.

    Raises
    ------
    TypeError
        If ``dependency`` is not a ``TaskDependency``.
    TaskNotFoundError
        If ``task_name`` does not exist.
    ValueError
        If ``task_name`` already depends on ``dependency.task_name``.
    DependencyNotFoundError
        If ``dependency.task_name`` does not exist in the process.
    CircularDependencyError
        If the dependency introduces a cycle (including a self-dependency).
    """
    if not isinstance(dependency, TaskDependency):
        raise TypeError(f"dependency must be of type TaskDependency. Got {type(dependency)}")
    task = self.get_task(task_name)
    if dependency.task_name in task.get_dependencies_names():
        raise ValueError(f"Task '{task_name}' already depends on '{dependency.task_name}'.")

    task.dependencies.append(dependency)
    try:
        self._validate_and_sort()
    except Exception:
        task.dependencies.pop()
        raise
    self.runner = ProcessRunner(self)

run

run(
    parallel: bool | None = None, max_workers: int = 4
) -> ProcessExecutionReport

Execute all tasks in the process.

Runs tasks sequentially or in parallel. In both modes, a task is executed only after all of its dependencies have completed.

Parameters:

Name Type Description Default
parallel bool | None

Whether to run tasks in parallel while respecting dependencies. If None, automatically set to True for processes with 10 or more tasks, False otherwise. Defaults to None.

None
max_workers int

Maximum number of worker threads for parallel execution. Defaults to 4. Only used when parallel=True. Values below 1 are clamped to 1. A value of 1 (including after clamping) falls back to sequential execution.

4

Returns:

Type Description
ProcessExecutionReport

Per-task breakdown of the run, in topological order.

Source code in src/processes/process.py
def run(self, parallel: bool | None = None, max_workers: int = 4) -> ProcessExecutionReport:
    """Execute all tasks in the process.

    Runs tasks sequentially or in parallel while respecting dependencies.
    Dependencies are always resolved before dependent tasks are executed.

    Parameters
    ----------
    parallel : bool, optional
        Whether to run tasks in parallel while respecting dependencies.
        If None, automatically set to True for processes with 10 or more tasks,
        False otherwise. Defaults to None.
    max_workers : int, optional
        Maximum number of worker threads for parallel execution. Defaults to 4.
        Only used when parallel=True. If set to 1, falls back to sequential
        execution. Values below 1 are clamped to 1.

    Returns
    -------
    ProcessExecutionReport
        Per-task breakdown of the run, in topological order.
    """
    if parallel is None:
        parallel = len(self.tasks) >= 10

    max_workers = max(1, max_workers)
    if parallel:
        if max_workers == 1:
            parallel = False  # Fallback to sequential if only one worker
    return self.runner.run(parallel, max_workers)

get_dependant_tasks

get_dependant_tasks(task_name: str) -> list[Task]

Retrieve all tasks that directly or indirectly depend on a given task.

Uses a pre-built adjacency map for O(V+E) traversal.

Parameters:

Name Type Description Default
task_name str

The name of the task to find dependants for.

required

Returns:

Type Description
list[Task]

List of all tasks that depend on the specified task, including transitive dependants, in breadth-first order. An unknown task_name yields an empty list rather than raising.

Source code in src/processes/process.py
def get_dependant_tasks(self, task_name: str) -> list[Task]:
    """Retrieve all tasks that directly or indirectly depend on a given task.

    Uses a pre-built adjacency map for O(V+E) traversal.

    Parameters
    ----------
    task_name : str
        The name of the task to find dependants for.

    Returns
    -------
    list[Task]
        List of all tasks that depend on the specified task, including
        transitive dependants.
    """
    found: list[Task] = []
    seen: set[str] = set()
    queue: deque[str] = deque(self._dependants_map.get(task_name, []))
    while queue:
        name = queue.popleft()
        if name in seen:
            continue
        seen.add(name)
        found.append(self._task_map[name])
        queue.extend(self._dependants_map.get(name, []))
    return found
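How _dependants_map is built is not shown here. The sketch assumes it is the reverse of each task's dependency list, built once. The two hypothetical helpers below build such a map and walk it breadth-first, in the same way as get_dependant_tasks, but operate on plain task names.

```python
from collections import deque


def build_dependants_map(deps: dict[str, set[str]]) -> dict[str, list[str]]:
    """Invert 'task -> its dependencies' into 'task -> tasks that depend on it'."""
    dependants: dict[str, list[str]] = {}
    for name, required in deps.items():
        for dep in required:
            dependants.setdefault(dep, []).append(name)
    return dependants


def transitive_dependants(dependants: dict[str, list[str]], task_name: str) -> list[str]:
    """Breadth-first walk; each task is processed once, so the cost is O(V + E)."""
    found: list[str] = []
    seen: set[str] = set()
    queue = deque(dependants.get(task_name, []))
    while queue:
        name = queue.popleft()
        if name in seen:
            continue
        seen.add(name)
        found.append(name)
        queue.extend(dependants.get(name, []))
    return found


deps = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}
dmap = build_dependants_map(deps)
print(transitive_dependants(dmap, "extract"))  # ['transform', 'validate', 'load']
```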

close_loggers

close_loggers() -> None

Close and clean up all logger handlers for all tasks.

Should be called once the process is done, so that file handles are released. __exit__ calls it automatically when the process is used in a 'with' block.

Source code in src/processes/process.py
def close_loggers(self) -> None:
    """Close and clean up all logger handlers for all tasks.

    Should be called when the process is done to ensure proper resource cleanup.
    """
    for task in self.tasks:
        task.close_handlers()

Task

Task(
    name: str,
    func: Callable[..., Any],
    log_path: str | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    dependencies: list[TaskDependency] | None = None,
    traced_vars_frame_filter: str | None = None,
    timeout: float | None = None,
    retries: int | None = 0,
    retry_on: tuple[type[Exception], ...] | None = None,
)

A Task represents a unit of work to be executed within a Process.

A Task encapsulates a callable function with its arguments, dependencies on other tasks, and logging configuration. Tasks can be executed by the Process class, sequentially or in parallel, with automatic dependency resolution and result passing between dependent tasks.

Attributes:

Name Type Description
name str

Unique name for the task (cannot contain spaces), stored in lowercase.

log_path str | None

File path where task logs will be written. None if no file logging is configured.

func Callable

The function to execute when the task runs.

args tuple

Positional arguments to pass to the function. Defaults to empty tuple.

kwargs dict

Keyword arguments to pass to the function. Defaults to empty dict.

dependencies list[TaskDependency]

List of tasks this task depends on. Defaults to empty list.

traced_vars_frame_filter str | None

Substring selecting which traceback frame's local variables are captured into the failure context on error. None selects the outermost user frame. Defaults to None.

timeout float | None

Seconds allowed per attempt before a TimeoutError is raised. None means no limit. Defaults to None.

retries int

Number of additional attempts after the first failure; 0 means no retries. A None passed to the constructor is stored as 0. Defaults to 0.

retry_on tuple[type[Exception], ...] | None

Exception types that trigger a retry. When retries >= 1 and retry_on is None, defaults at call time to (ConnectionError, TimeoutError). Defaults to None.

logger Logger

Logger instance for this task, automatically configured.

Parameters:

Name Type Description Default
name str

Unique task name; must not contain spaces. Normalized to lowercase, so task names (and the dependency references that point at them) are matched case-insensitively.

required
func Callable[..., Any]

The callable executed when the task runs.

required
log_path str | None

File path the task's log records are written to (one FileHandler at INFO level, format "%(asctime)s - %(name)s - %(levelname)s - %(message)s", with the structured failure context appended on error). None means no file logging is configured, and a NullHandler is attached instead. Defaults to None.

None
args tuple[Any, ...]

Positional arguments forwarded to func. Defaults to ().

()
kwargs dict[str, Any] | None

Keyword arguments forwarded to func. None is treated as an empty dict.

None
dependencies list[TaskDependency] | None

Tasks this task depends on. None is treated as an empty list.

None
traced_vars_frame_filter str | None

Substring used to select the traceback frame whose local variables are captured into the failure context (and thus into both the logfile and any report notification). When None (default), the outermost user frame is used; when set, the outermost frame whose filename contains this substring is used instead.

None
timeout float | None

Seconds allowed per attempt before TimeoutError is raised for that attempt. None means no limit. When a timeout fires, the underlying thread is detached rather than killed (Python threading limitation). Defaults to None.

None
retries int | None

Number of additional attempts after the first failure. 0 or None means a single attempt with no retry. Defaults to 0.

0
retry_on tuple[type[Exception], ...] | None

Exception types that trigger a retry. Evaluated only when retries >= 1. When None, defaults at call time to (ConnectionError, TimeoutError). Defaults to None.

None

Raises:

Type Description
TypeError

If any parameter is not of the expected type, timeout is not a positive number, retries is negative, retry_on is not a tuple of Exception subclasses, or traced_vars_frame_filter is neither str nor None.

ValueError

If name contains a space or if the same dependency name is listed more than once.

CircularDependencyError

If the task lists itself as a dependency.

Source code in src/processes/task.py
def __init__(
    self,
    name: str,
    func: Callable[..., Any],
    log_path: str | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    dependencies: list[TaskDependency] | None = None,
    traced_vars_frame_filter: str | None = None,
    timeout: float | None = None,
    retries: int | None = 0,
    retry_on: tuple[type[Exception], ...] | None = None,
):
    if not isinstance(name, str):
        raise TypeError(f"name must be str. Got {type(name)}")
    self.name = name.lower()
    self.log_path = log_path
    self.func = func
    self.args = args
    self.traced_vars_frame_filter = traced_vars_frame_filter
    self.timeout = timeout
    self.retries = retries if retries is not None else 0
    self.retry_on = retry_on

    if kwargs is None:
        self.kwargs = {}
    else:
        self.kwargs = kwargs
    if dependencies is None:
        self.dependencies = []
    else:
        self.dependencies = dependencies

    self._check_input_types()
    if " " in self.name:
        raise ValueError(f"Task name cannot contain spaces. Got {self.name}")

    dependency_names = []
    for dependency in self.dependencies:
        if dependency.task_name in dependency_names:
            raise ValueError(f"Duplicate dependency name: {dependency.task_name}")
        dependency_names.append(dependency.task_name)
        if dependency.task_name == self.name:
            raise CircularDependencyError(f"Task '{self.name}' lists itself as a dependency.")

    logger = logging.getLogger(f"processes.{self.name}.{id(self)}")
    logger.setLevel(logging.DEBUG)

    if self.log_path is not None:
        file_handler = logging.FileHandler(self.log_path)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(_TaskLogfileFormatter())
        logger.addHandler(file_handler)
    else:
        logger.addHandler(logging.NullHandler())

    self._frame_filter: str | None = self.traced_vars_frame_filter
    self.logger = logger

get_dependencies_names

get_dependencies_names() -> set[str]

Get the names of all tasks this task depends on.

Returns:

Type Description
set[str]

Set of dependency task names.

Source code in src/processes/task.py
def get_dependencies_names(self) -> set[str]:
    """
    Get the names of all tasks this task depends on.

    Returns
    -------
    set[str]
        Set of dependency task names.
    """
    return {dependency.task_name for dependency in self.dependencies}

close_handlers

close_handlers() -> None

Close and detach every handler on this task's logger.

The task owns the lifecycle of the logger it builds in __init__; this is the single teardown path used by Process.close_loggers and Process.remove_task. Safe to call more than once.

Source code in src/processes/task.py
def close_handlers(self) -> None:
    """Close and detach every handler on this task's logger.

    The task owns the lifecycle of the logger it builds in ``__init__``;
    this is the single teardown path used by ``Process.close_loggers`` and
    ``Process.remove_task``. Safe to call more than once.
    """
    for handler in list(self.logger.handlers):
        handler.close()
        self.logger.removeHandler(handler)

run

run(executing_process: Process | None = None) -> TaskResult

Execute the task, retrying on transient failures up to retries times.

Never propagates: a failure while resolving dependency arguments (before any attempt runs) is captured and returned as an ERRORED result with attempts=0, just like a failure inside the task's function.

Parameters:

Name Type Description Default
executing_process Process | None

Parent process; used to inject dependency results and to build the downstream impact list on failure.

None

Returns:

Type Description
TaskResult

worked=True with the return value on success; worked=False with the last exception on failure.

Source code in src/processes/task.py
def run(self, executing_process: Process | None = None) -> TaskResult:
    """Execute the task, retrying on transient failures up to ``retries`` times.

    Never propagates: a failure while resolving dependency arguments (before
    any attempt runs) is captured and returned as an ``ERRORED`` result with
    ``attempts=0``, just like a failure inside the task's function.

    Parameters
    ----------
    executing_process : Process, optional
        Parent process; used to inject dependency results and to build
        the downstream impact list on failure.

    Returns
    -------
    TaskResult
        ``worked=True`` with the return value on success; ``worked=False``
        with the last exception on failure.
    """
    max_attempts = self.retries + 1
    effective_retry_on = (
        self.retry_on if self.retry_on is not None else (ConnectionError, TimeoutError)
    )

    self.logger.info(f"Starting {self.name}.")
    start = time.monotonic()

    try:
        final_args, final_kwargs = self._resolve_args(executing_process)
    except Exception as e:
        return self._errored_result(e, executing_process, time.monotonic() - start, attempts=0)

    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            result = self._call_with_timeout(final_args, final_kwargs)
            self.logger.info(f"Finished {self.name}.")
            return TaskResult.success(
                result, elapsed_seconds=time.monotonic() - start, attempts=attempt
            )
        except Exception as e:
            last_exc = e
            retryable = self.retries >= 1 and attempt < max_attempts
            if retryable and isinstance(e, effective_retry_on):
                self.logger.warning(
                    f"Attempt {attempt}/{max_attempts} failed: {e}. Retrying..."
                )
                continue
            break

    assert last_exc is not None
    return self._errored_result(
        last_exc, executing_process, time.monotonic() - start, attempts=attempt
    )

TaskDependency

TaskDependency(
    task_name: str,
    use_result_as_additional_args: bool = False,
    use_result_as_additional_kwargs: bool = False,
    additional_kwarg_name: str = "",
)

Represents a dependency relationship between tasks.

Defines how a task depends on another task, including how the result of the dependency should be passed to the dependent task (as additional positional arguments, keyword arguments, or both).

Attributes:

Name Type Description
task_name str

The name of the task this dependency refers to. Normalized to lowercase to match Task names case-insensitively.

use_result_as_additional_args bool

If True, the dependency task's result is appended to the dependent task's positional arguments, after its own args. Defaults to False.

use_result_as_additional_kwargs bool

If True, the dependency task's result is passed as a keyword argument named by additional_kwarg_name. Defaults to False.

additional_kwarg_name str

The name of the keyword argument to use if use_result_as_additional_kwargs is True. Must be a non-empty string when use_result_as_additional_kwargs is True. Defaults to "".

Raises:

Type Description
TypeError

If task_name is not a str, if use_result_as_additional_args or use_result_as_additional_kwargs is not a bool, or if use_result_as_additional_kwargs is True but additional_kwarg_name is an empty string.

Source code in src/processes/task_types.py
def __init__(
    self,
    task_name: str,
    use_result_as_additional_args: bool = False,
    use_result_as_additional_kwargs: bool = False,
    additional_kwarg_name: str = "",
):
    if not isinstance(task_name, str):
        raise TypeError(f"task_name must be of type str. Got {type(task_name)}")
    self.task_name = task_name.lower()
    self.use_result_as_additional_args = use_result_as_additional_args
    self.use_result_as_additional_kwargs = use_result_as_additional_kwargs
    self.additional_kwarg_name = additional_kwarg_name

    if not isinstance(self.use_result_as_additional_args, bool):
        raise TypeError(
            f"use_result_as_additional_args must be of type bool. "
            f"Got {type(self.use_result_as_additional_args)}"
        )
    if not isinstance(self.use_result_as_additional_kwargs, bool):
        raise TypeError(
            f"use_result_as_additional_kwargs must be of type bool. "
            f"Got {type(self.use_result_as_additional_kwargs)}"
        )

    if self.use_result_as_additional_kwargs and self.additional_kwarg_name == "":
        raise TypeError(
            "If use_result_as_additional_kwargs is True, additional_kwarg_name"
            " must be a non-empty string."
        )
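The delivery of a dependency's result to the dependent task is handled by _resolve_args, which is not shown on this page. The sketch below is one reading of the documented flags. It assumes results are applied in the order of the dependencies list. Dep and resolve_args are hypothetical stand-ins for TaskDependency and that private helper.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Dep:
    """Hypothetical stand-in for TaskDependency (same field names)."""
    task_name: str
    use_result_as_additional_args: bool = False
    use_result_as_additional_kwargs: bool = False
    additional_kwarg_name: str = ""


def resolve_args(
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    dependencies: list[Dep],
    results: dict[str, Any],
) -> tuple[tuple[Any, ...], dict[str, Any]]:
    """Inject upstream results into the dependent task's call arguments."""
    final_args = list(args)
    final_kwargs = dict(kwargs)  # copy so the task's own kwargs stay untouched
    for dep in dependencies:
        value = results[dep.task_name]
        if dep.use_result_as_additional_args:
            final_args.append(value)  # lands after the task's own args
        if dep.use_result_as_additional_kwargs:
            final_kwargs[dep.additional_kwarg_name] = value
    return tuple(final_args), final_kwargs


args, kwargs = resolve_args(
    args=("report.csv",),
    kwargs={"sep": ","},
    dependencies=[
        Dep("extract", use_result_as_additional_args=True),
        Dep("config", use_result_as_additional_kwargs=True, additional_kwarg_name="settings"),
    ],
    results={"extract": [1, 2, 3], "config": {"mode": "fast"}},
)
print(args, kwargs)
```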

__hash__

__hash__() -> int

Return hash of the dependency based on task name.

Returns:

Type Description
int

Hash value based on the task_name attribute.

Source code in src/processes/task_types.py
def __hash__(self) -> int:
    """
    Return hash of the dependency based on task name.

    Returns
    -------
    int
        Hash value based on the task_name attribute.
    """
    return hash(self.task_name)

TaskResult

TaskResult(
    status: TaskStatus,
    result: Any,
    exception: Exception | None,
    error_data: ErrorData | None = None,
    elapsed_seconds: float = 0.0,
    attempts: int = 0,
)

Container for the result of a task execution.

Holds the outcome of running a task, including its status, return value, and any exception that occurred.

Attributes:

Name Type Description
status TaskStatus

Outcome of the task: PENDING, SUCCESS, ERRORED, or SKIPPED.

worked bool

True if status is SUCCESS, False otherwise.

result Any

The return value of the task's function if execution succeeded; None otherwise (errored, skipped or pending).

exception Exception | None

The exception object if execution failed; None otherwise.

error_data ErrorData | None

Structured failure context (function, args, kwargs, traceback, traced variables, downstream impact) attached when execution failed; None for successful, skipped or pending results.

elapsed_seconds float

Wall-clock time spent running the task across all attempts, in seconds. Defaults to 0.0.

attempts int

Number of attempts actually executed (1 or more if the task ran, 0 if it never ran). Defaults to 0.

Source code in src/processes/task_types.py
def __init__(
    self,
    status: TaskStatus,
    result: Any,
    exception: Exception | None,
    error_data: ErrorData | None = None,
    elapsed_seconds: float = 0.0,
    attempts: int = 0,
):
    self.status = status
    self.result = result
    self.exception = exception
    self.error_data = error_data
    self.elapsed_seconds = elapsed_seconds
    self.attempts = attempts

worked property

worked: bool

True if the task executed successfully, False otherwise.

pending classmethod

pending() -> TaskResult

Build the placeholder result for a task that has not run yet.

Source code in src/processes/task_types.py
@classmethod
def pending(cls) -> TaskResult:
    """Build the placeholder result for a task that has not run yet."""
    return cls(TaskStatus.PENDING, None, None)

skipped classmethod

skipped() -> TaskResult

Build the result for a task skipped after an upstream dependency failed.

Source code in src/processes/task_types.py
@classmethod
def skipped(cls) -> TaskResult:
    """Build the result for a task skipped after an upstream dependency failed."""
    return cls(TaskStatus.SKIPPED, None, None)

success classmethod

success(
    result: Any,
    *,
    elapsed_seconds: float = 0.0,
    attempts: int = 0,
) -> TaskResult

Build the result for a task whose function returned without raising.

Source code in src/processes/task_types.py
@classmethod
def success(cls, result: Any, *, elapsed_seconds: float = 0.0, attempts: int = 0) -> TaskResult:
    """Build the result for a task whose function returned without raising."""
    return cls(
        TaskStatus.SUCCESS,
        result,
        None,
        elapsed_seconds=elapsed_seconds,
        attempts=attempts,
    )

errored classmethod

errored(
    exception: Exception,
    *,
    error_data: ErrorData | None = None,
    elapsed_seconds: float = 0.0,
    attempts: int = 0,
) -> TaskResult

Build the result for a task whose function raised after exhausting retries.

Source code in src/processes/task_types.py
@classmethod
def errored(
    cls,
    exception: Exception,
    *,
    error_data: ErrorData | None = None,
    elapsed_seconds: float = 0.0,
    attempts: int = 0,
) -> TaskResult:
    """Build the result for a task whose function raised after exhausting retries."""
    return cls(
        TaskStatus.ERRORED,
        None,
        exception,
        error_data=error_data,
        elapsed_seconds=elapsed_seconds,
        attempts=attempts,
    )

ProcessExecutionReport dataclass

ProcessExecutionReport(
    entries: dict[str, TaskReportEntry] = dict(),
    process_name: str = "",
)

Per-task breakdown of a finished Process.run() call.

Attributes:

Name Type Description
entries dict[str, TaskReportEntry]

Mapping of task name to its report entry, ordered the same way as process.tasks (topological order).

process_name str

Name of the process the report came from ("" if unnamed). Used to label notifications such as the email subject.

successes property

successes: dict[str, TaskReportEntry]

Entries for tasks whose status is SUCCESS.

errored property

errored: dict[str, TaskReportEntry]

Entries for tasks whose status is ERRORED.

skipped property

skipped: dict[str, TaskReportEntry]

Entries for tasks whose status is SKIPPED.
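The three properties are documented only as status filters over entries. A minimal sketch, assuming a plain dict comprehension, which keeps the topological insertion order of entries. Entry and Report are hypothetical stand-ins for TaskReportEntry and ProcessExecutionReport, reduced to the fields used here.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class TaskStatus(Enum):
    SUCCESS = "success"
    ERRORED = "errored"
    SKIPPED = "skipped"


@dataclass
class Entry:
    # Hypothetical stand-in for TaskReportEntry (only the fields needed here).
    name: str
    status: TaskStatus


@dataclass
class Report:
    # Hypothetical stand-in for ProcessExecutionReport.
    entries: dict[str, Entry] = field(default_factory=dict)
    process_name: str = ""

    def _with_status(self, status: TaskStatus) -> dict[str, Entry]:
        # A dict comprehension preserves the insertion (topological) order.
        return {name: e for name, e in self.entries.items() if e.status is status}

    @property
    def successes(self) -> dict[str, Entry]:
        return self._with_status(TaskStatus.SUCCESS)

    @property
    def errored(self) -> dict[str, Entry]:
        return self._with_status(TaskStatus.ERRORED)

    @property
    def skipped(self) -> dict[str, Entry]:
        return self._with_status(TaskStatus.SKIPPED)


report = Report(
    {
        "extract": Entry("extract", TaskStatus.SUCCESS),
        "transform": Entry("transform", TaskStatus.ERRORED),
        "load": Entry("load", TaskStatus.SKIPPED),
    },
    process_name="nightly-etl",
)
```

Because an empty dict is falsy, `if report.errored:` is enough to decide whether a run needs attention.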

from_results classmethod

from_results(
    process: Process, results: dict[str, TaskResult]
) -> ProcessExecutionReport

Build a report from a finished process run.

Parameters:

Name Type Description Default
process Process

The process that was run. Used for task definitions (name, function, args, kwargs) and topological ordering.

required
results dict[str, TaskResult]

One TaskResult per task, keyed by task name, as produced by processes.process.ProcessRunner.

required

Returns:

Type Description
ProcessExecutionReport

One entry per task in process.tasks, in topological order.

Source code in src/processes/execution_report.py
@classmethod
def from_results(
    cls, process: Process, results: dict[str, TaskResult]
) -> ProcessExecutionReport:
    """Build a report from a finished process run.

    Parameters
    ----------
    process : Process
        The process that was run. Used for task definitions (name,
        function, args, kwargs) and topological ordering.
    results : dict[str, TaskResult]
        One ``TaskResult`` per task, keyed by task name, as produced by
        :class:`~processes.process.ProcessRunner`.

    Returns
    -------
    ProcessExecutionReport
        One entry per task in ``process.tasks``, in topological order.
    """
    entries: dict[str, TaskReportEntry] = {}
    for task in process.tasks:
        res = results[task.name]
        entries[task.name] = TaskReportEntry(
            name=task.name,
            function=task.func.__name__,
            args=task.args,
            kwargs=task.kwargs,
            status=res.status,
            elapsed_seconds=res.elapsed_seconds,
            attempts=res.attempts,
            result=res.result if res.worked else None,
            error=res.error_data if res.status == TaskStatus.ERRORED else None,
        )
    return cls(entries, process.name)

to_json

to_json(
    *, indent: int | None = None, **dumps_kwargs: Any
) -> str

Serialize the whole report to a JSON string without dropping any field.

Every entry and every field is included. Values that are not natively JSON-serializable are rendered faithfully rather than omitted: TaskStatus as its string value, the nested ErrorData as an object, and any arbitrary object appearing in args, kwargs or result via repr(). The content is therefore lossless, though repr-rendered objects are not round-trippable into live objects.

Parameters:

Name Type Description Default
indent int | None

Forwarded to json.dumps for pretty-printing. None (default) produces a compact single line.

None
**dumps_kwargs Any

Forwarded to json.dumps (e.g. sort_keys=True). A default argument, if supplied, is discarded so the lossless fallback always applies.

{}

Returns:

Type Description
str

A JSON object {"entries": {<name>: {...}, ...}} with entries in topological order.

Source code in src/processes/execution_report.py
def to_json(self, *, indent: int | None = None, **dumps_kwargs: Any) -> str:
    """Serialize the whole report to a JSON string without dropping any field.

    Every entry and every field is included. Values that are not natively
    JSON-serializable are rendered faithfully rather than omitted:
    ``TaskStatus`` as its string value, the nested ``ErrorData`` as an
    object, and any arbitrary object appearing in ``args``, ``kwargs`` or
    ``result`` via ``repr()``. The content is therefore lossless, though
    ``repr``-rendered objects are not round-trippable into live objects.

    Parameters
    ----------
    indent : int, optional
        Forwarded to ``json.dumps`` for pretty-printing. ``None`` (default)
        produces a compact single line.
    **dumps_kwargs : Any
        Forwarded to ``json.dumps`` (e.g. ``sort_keys=True``). Any
        ``default`` is ignored so the lossless fallback always applies.

    Returns
    -------
    str
        A JSON object ``{"entries": {<name>: {...}, ...}}`` with entries in
        topological order.
    """
    dumps_kwargs.pop("default", None)
    return json.dumps(self, default=_json_default, indent=indent, **dumps_kwargs)
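The lossless behaviour rests on the `default` hook of json.dumps, which the encoder calls only for objects it cannot serialize natively. `_json_default` itself is not shown on this page. `lossless_default` below is a hypothetical equivalent that follows the documented rules: enum to its value, dataclass to an object, anything else to repr(). TaskStatus and ErrorInfo are stand-ins, and the enum's string value is an assumption.

```python
from __future__ import annotations

import dataclasses
import json
from enum import Enum
from pathlib import PurePosixPath
from typing import Any


class TaskStatus(Enum):
    ERRORED = "errored"  # stand-in; the real values may differ


@dataclasses.dataclass
class ErrorInfo:
    # Hypothetical stand-in for the nested ErrorData.
    exception: str
    traced_vars: dict[str, str]


def lossless_default(obj: Any) -> Any:
    """Hypothetical stand-in for _json_default, following the documented rules."""
    if isinstance(obj, Enum):
        return obj.value
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # Shallow field mapping: json.dumps recurses into the values itself.
        return {f.name: getattr(obj, f.name) for f in dataclasses.fields(obj)}
    return repr(obj)  # keep unknown objects as text instead of dropping them


def to_json(payload: Any, *, indent: int | None = None, **dumps_kwargs: Any) -> str:
    dumps_kwargs.pop("default", None)  # same guard as the method above
    return json.dumps(payload, default=lossless_default, indent=indent, **dumps_kwargs)


payload = {
    "status": TaskStatus.ERRORED,
    "args": (PurePosixPath("/data/in.csv"), 3),
    "error": ErrorInfo("ValueError('bad row')", {"row": "7"}),
}
# default=str is discarded, so the path is rendered with repr(), not str().
text = to_json(payload, sort_keys=True, default=str)
```

Returning a shallow dict for dataclasses, rather than calling dataclasses.asdict, lets json.dumps keep recursing. As a result, nested dataclasses and any enums inside them pass through the same hook, and arbitrary field values are never deep-copied.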

notify

notify(
    *channels: ReportChannel,
    only_errors: bool = False,
    tasks: list[str] | None = None,
    show_warnings: bool = True,
) -> None

Deliver the report through each channel, in order.

Each channel renders and sends the report itself (email, webhook, ...). What detail is included is configured per channel (see ReportContent). If a channel raises, the exception is caught so the remaining channels still receive the report; a UserWarning is emitted when show_warnings is True.

Parameters:

Name Type Description Default
*channels ReportChannel

Channels to deliver the report to. No-op if none are given.

()
only_errors bool

When True, each channel restricts the payload to tasks whose status is ERRORED (see the errored property). If no task errored, notify returns without calling any channel. Defaults to False.

False
tasks list[str]

Restrict the report to these task names, compared case-insensitively. None (default) includes every task; an empty list includes none. Combines with only_errors: both filters apply.

None
show_warnings bool

Emit a UserWarning when a channel fails. Defaults to True.

True
Source code in src/processes/execution_report.py
def notify(
    self,
    *channels: ReportChannel,
    only_errors: bool = False,
    tasks: list[str] | None = None,
    show_warnings: bool = True,
) -> None:
    """Deliver the report through each channel, in order.

    Each channel renders and sends the report itself (email, webhook, ...).
    What detail is included is configured per channel (see ``ReportContent``).
    If a channel raises, the exception is caught so the remaining channels
    still receive the report; a ``UserWarning`` is emitted when
    ``show_warnings`` is ``True``.

    Parameters
    ----------
    *channels : ReportChannel
        Channels to deliver the report to. No-op if none are given.
    only_errors : bool
        When ``True``, each channel restricts the payload to tasks whose
        status is ``ERRORED`` (see :attr:`errored`). If no task in the
        (optionally ``tasks``-filtered) report errored, nothing is sent.
        Defaults to ``False``.
    tasks : list[str], optional
        Restrict the report to these task names, compared case-insensitively.
        ``None`` (default) includes every task; an empty list includes none.
        Combines with ``only_errors``: both filters apply.
    show_warnings : bool
        Emit a ``UserWarning`` when a channel fails. Defaults to ``True``.
    """
    report = self if tasks is None else self._for_tasks(tasks)
    # Check the filtered report, not self: an error outside ``tasks`` must not
    # trigger delivery of an empty errors-only report.
    if only_errors and all(v.status != TaskStatus.ERRORED for v in report.entries.values()):
        return
    for channel in channels:
        try:
            channel.send_report(report, errors_only=only_errors)
        except Exception as exc:
            if show_warnings:
                warnings.warn(
                    f"{type(channel).__name__} failed to send report: {exc}",
                    stacklevel=2,
                )
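As the call above shows, notify only needs each channel to expose `send_report(report, errors_only=...)`; note that notify's `only_errors` is passed on as `errors_only`. ReportChannel's actual definition is not shown on this page. The sketch therefore assumes that duck-typed method is enough, and uses two hypothetical channels plus a re-creation of notify's delivery loop to show that a failing channel only produces a warning.

```python
from __future__ import annotations

import warnings
from typing import Any


class ListChannel:
    """Hypothetical channel that records every report it receives."""

    def __init__(self) -> None:
        self.received: list[tuple[Any, bool]] = []

    def send_report(self, report: Any, errors_only: bool = False) -> None:
        self.received.append((report, errors_only))


class BrokenChannel:
    """Hypothetical channel whose transport is down."""

    def send_report(self, report: Any, errors_only: bool = False) -> None:
        raise ConnectionError("webhook unreachable")


def deliver(report: Any, *channels: Any, only_errors: bool = False,
            show_warnings: bool = True) -> None:
    # Mirrors the loop in notify(): one failing channel never blocks the rest.
    for channel in channels:
        try:
            channel.send_report(report, errors_only=only_errors)
        except Exception as exc:
            if show_warnings:
                warnings.warn(f"{type(channel).__name__} failed to send report: {exc}",
                              stacklevel=2)


first, last = ListChannel(), ListChannel()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is recorded
    deliver("report", first, BrokenChannel(), last, only_errors=True)
```

The channel after the broken one still receives the report, and the failure surfaces as a single UserWarning naming the channel class.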

TaskReportEntry dataclass

TaskReportEntry(
    name: str,
    function: str,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    status: TaskStatus,
    elapsed_seconds: float,
    attempts: int,
    result: Any | None = None,
    error: ErrorData | None = None,
)

Per-task entry in a ProcessExecutionReport.

Attributes:

Name Type Description
name str

The task's name.

function str

Name of the function the task runs.

args tuple[Any, ...]

Positional arguments the task was constructed with.

kwargs dict[str, Any]

Keyword arguments the task was constructed with.

status TaskStatus

Outcome of the task: SUCCESS, ERRORED, or SKIPPED.

elapsed_seconds float

Wall-clock time spent running the task across all attempts. 0.0 for skipped tasks.

attempts int

Number of attempts actually executed. 0 for skipped tasks.

result Any | None

The task's return value if status is SUCCESS, else None.

error ErrorData | None

Structured failure context if status is ERRORED, else None.

TaskStatus

Bases: Enum

Outcome of a task within a process execution.

Attributes:

Name Type Description
PENDING

The task has not been executed yet.

SUCCESS

The task ran and its function returned without raising.

ERRORED

The task ran and its function raised an exception (after exhausting retries, if any).

SKIPPED

The task never ran because an upstream dependency failed.

ErrorData dataclass

ErrorData(
    task_name: str = "?",
    function: str = "?",
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] = dict(),
    downstream_impact: list[str] = list(),
    exception: str = "",
    traceback_str: str = "",
    traced_vars: dict[str, str] = dict(),
    traced_vars_location: str = "",
)

Typed view of a task failure, extracted from record.task_context.

Attributes:

Name Type Description
task_name str

Name of the task that failed. Defaults to "?".

function str

Name of the function that was executing. Defaults to "?".

args tuple[Any, ...]

Positional arguments the function was called with. Defaults to ().

kwargs dict[str, Any]

Keyword arguments the function was called with. Defaults to {}.

downstream_impact list[str]

Names of tasks skipped as a result of this failure. Defaults to [].

exception str

String representation of the raised exception. Defaults to "".

traceback_str str

Full formatted traceback. Defaults to "".

traced_vars dict[str, str]

Mapping of local variable names to repr(value) for the traced frame. Defaults to {}.

traced_vars_location str

"filename:lineno" of the traced frame. Defaults to "".

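traced_vars and traced_vars_location describe a single "traced frame", but this page does not say which frame the library picks. The sketch below assumes it is the innermost frame of the traceback, where the exception was raised, and fills fields shaped like ErrorData's using only the stdlib. FailureInfo and describe_failure are hypothetical, and the exact format of the exception string is an assumption.

```python
from __future__ import annotations

import traceback
from dataclasses import dataclass, field


@dataclass
class FailureInfo:
    # Hypothetical subset of ErrorData's fields.
    exception: str = ""
    traceback_str: str = ""
    traced_vars: dict[str, str] = field(default_factory=dict)
    traced_vars_location: str = ""


def describe_failure(exc: BaseException) -> FailureInfo:
    tb = exc.__traceback__
    if tb is None:  # never raised, so there is no frame to trace
        return FailureInfo(exception=repr(exc))
    while tb.tb_next is not None:  # walk to the innermost frame (assumption)
        tb = tb.tb_next
    frame = tb.tb_frame
    return FailureInfo(
        exception=repr(exc),  # format assumed
        traceback_str="".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        traced_vars={name: repr(value) for name, value in frame.f_locals.items()},
        traced_vars_location=f"{frame.f_code.co_filename}:{tb.tb_lineno}",
    )


def parse_row(raw: str) -> int:
    cleaned = raw.strip()
    return int(cleaned)  # raises ValueError for "12a"


try:
    parse_row(" 12a ")
except ValueError as exc:
    info = describe_failure(exc)
```

Storing repr() strings rather than live objects keeps the captured locals safe to serialize, which matches the dict[str, str] type of traced_vars.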
SMTPConfig dataclass

SMTPConfig(
    mailhost: tuple[str, int],
    fromaddr: str,
    toaddrs: list[str],
    credentials: tuple[str, str] | None = None,
    secure: tuple[()] | tuple[str] | tuple[str, str] | tuple[str, str, SSLContext] | None = None,
    timeout: int = 5,
)

SMTP transport configuration for HTML email error alerts.

Attributes:

Name Type Description
mailhost tuple[str, int]

(host, port) of the SMTP server.

fromaddr str

Sender email address.

toaddrs list[str]

Recipient email addresses.

credentials tuple[str, str] | None

(username, password) for SMTP authentication. Defaults to None.

secure tuple[()] | tuple[str] | tuple[str, str] | tuple[str, str, SSLContext] | None

Security configuration. Use () for STARTTLS, (keyfile,) or (keyfile, certfile) for explicit TLS context creation, or (keyfile, certfile, ssl_context) to supply a pre-built ssl.SSLContext. None means no encryption. Defaults to None.

timeout int

Connection timeout in seconds. Defaults to 5.
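A construction sketch for two common setups. SMTPConfig's import path is not shown on this page, so the block defines a hypothetical stand-in with the documented fields and defaults; in real code, import the library's class instead. Hosts, addresses, and the SMTP_PASSWORD environment variable are placeholders.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any


@dataclass
class SMTPConfig:
    # Hypothetical stand-in with the documented fields and defaults.
    mailhost: tuple[str, int]
    fromaddr: str
    toaddrs: list[str]
    credentials: tuple[str, str] | None = None
    secure: tuple[Any, ...] | None = None
    timeout: int = 5


# Authenticated submission: secure=() selects STARTTLS.
submission = SMTPConfig(
    mailhost=("smtp.example.com", 587),
    fromaddr="alerts@example.com",
    toaddrs=["oncall@example.com", "data-team@example.com"],
    credentials=("alerts@example.com", os.environ.get("SMTP_PASSWORD", "change-me")),
    secure=(),
    timeout=10,
)

# Unauthenticated internal relay: no credentials, secure=None (no encryption).
relay = SMTPConfig(("localhost", 25), "etl@example.com", ["team@example.com"])
```

Reading the password from the environment keeps secrets out of source control. The relay case relies entirely on the documented defaults.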

HTMLEmailStyle dataclass

HTMLEmailStyle(
    palette: str = _DEFAULT_PALETTE,
    language: str = _DEFAULT_LANGUAGE,
)

HTML presentation settings for the report email.

Both fields default to a neutral-palette, English-language email; pass only the fields you want to override.

Attributes:

Name Type Description
palette str

Color scheme: "neutral", "catppuccin", "neobones", or "slate". Defaults to "neutral".

language str

ISO 639-1 code for the email body text: "en", "es", "pt", "fr", "de", or "it". Defaults to "en".

Raises:

Type Description
ValueError

If palette or language is not one of the supported values.
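The documented ValueError suggests validation at construction time. A minimal sketch, assuming a `__post_init__` check against the supported values listed above. The library's private defaults (_DEFAULT_PALETTE, _DEFAULT_LANGUAGE) are replaced here by the documented "neutral" and "en", and the class is a hypothetical stand-in.

```python
from dataclasses import dataclass

SUPPORTED_PALETTES = ("neutral", "catppuccin", "neobones", "slate")
SUPPORTED_LANGUAGES = ("en", "es", "pt", "fr", "de", "it")


@dataclass
class HTMLEmailStyle:
    # Hypothetical stand-in; defaults follow the documented values.
    palette: str = "neutral"
    language: str = "en"

    def __post_init__(self) -> None:
        # Reject unsupported values up front instead of at render time.
        if self.palette not in SUPPORTED_PALETTES:
            raise ValueError(
                f"unsupported palette {self.palette!r}; expected one of {SUPPORTED_PALETTES}")
        if self.language not in SUPPORTED_LANGUAGES:
            raise ValueError(
                f"unsupported language {self.language!r}; expected one of {SUPPORTED_LANGUAGES}")


style = HTMLEmailStyle(language="es")  # override only the language
```

Failing in the constructor means a typo such as palette="solarized" surfaces when the configuration is built, not later when a report email is rendered.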