Skip to content

API Reference

This page is automatically generated from the source code docstrings.

CircularDependencyError

Bases: Exception

Raised when circular dependencies are detected among tasks.

Source code in src/processes/process.py
20
21
22
23
class CircularDependencyError(Exception):
    """Raised when circular dependencies are detected among tasks.

    Signals that the task dependency graph contains a cycle, so no valid
    execution order exists.
    """

DependencyNotFoundError

Bases: Exception

Raised when a task depends on a non-existent task.

Source code in src/processes/process.py
 8
 9
10
11
class DependencyNotFoundError(Exception):
    """Raised when a task depends on a non-existent task.

    The message names both the dependent task and the missing dependency.
    """

Process

Manages and executes a collection of interdependent tasks.

A Process orchestrates the execution of multiple tasks, handling dependency resolution and task ordering. Tasks can be executed in parallel or sequentially. It also manages the tasks' loggers and propagates errors to dependent tasks: if a task fails, all tasks depending on it are marked as failed without being executed, while non-dependent tasks continue to run.

Attributes

tasks : list[Task] List of tasks to be executed, automatically sorted by dependencies. runner : ProcessRunner The runner responsible for executing the tasks.

Raises

TypeError If tasks is not a list or contains non-Task elements. ValueError If duplicate task names are found. DependencyNotFoundError If a task depends on a non-existent task. CircularDependencyError If circular dependencies are detected among tasks.

Source code in src/processes/process.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class Process:
    """
    Manages and executes a collection of interdependent tasks.

    A Process orchestrates the execution of multiple tasks, handling dependency
    resolution and task ordering. Tasks can be executed in parallel or sequentially.
    It also manages the tasks' loggers and propagates errors to dependent tasks: if a
    task fails, all tasks depending on it are marked as failed without being executed,
    while non-dependent tasks continue to run.

    A Process can be used as a context manager; task loggers are closed on exit.

    Attributes
    ----------
    tasks : list[Task]
        List of tasks to be executed, automatically sorted by dependencies.
    runner : ProcessRunner
        The runner responsible for executing the tasks.

    Raises
    ------
    TypeError
        If tasks is not a list or contains non-Task elements.
    ValueError
        If duplicate task names are found.
    DependencyNotFoundError
        If a task depends on a non-existent task.
    CircularDependencyError
        If circular dependencies are detected among tasks.
    """

    def __init__(self, tasks: list[Task]):
        self.tasks = tasks

        try:
            self._check_input_types()
            self._check_duplicate_names()
            self._check_dependencies_exist()
            self._topological_sort()
        except Exception:
            # Release the handlers the tasks already opened before propagating.
            # close_loggers tolerates the malformed input that may have caused
            # the failure, so the caller sees the original validation error.
            self.close_loggers()
            raise
        self.runner = ProcessRunner(self)

    def __enter__(self) -> Self:
        """Called when entering the 'with' block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> Literal[False]:
        """Called when exiting the 'with' block, even if an error occurred.

        Returns False so any exception raised in the block is propagated.
        """
        self.close_loggers()
        return False

    def _check_input_types(self) -> None:
        """Validate that tasks is a list containing only Task objects.

        Raises
        ------
        TypeError
            If tasks is not a list or contains non-Task elements.
        """
        if not isinstance(self.tasks, list):
            raise TypeError(f"tasks must be list. Got {type(self.tasks)}")
        for task in self.tasks:
            if not isinstance(task, Task):
                raise TypeError(f"task must be Task. Got {type(task)}")

    def _check_duplicate_names(self) -> None:
        """Verify that all task names are unique.

        Raises
        ------
        ValueError
            If duplicate task names are found.
        """
        names = set()
        for task in self.tasks:
            if task.name in names:
                raise ValueError(f"Duplicate task name: {task.name}")
            names.add(task.name)

    def _check_dependencies_exist(self) -> None:
        """Verify that all task dependencies refer to existing tasks.

        Raises
        ------
        DependencyNotFoundError
            If a task depends on a non-existent task.
        """
        names = {t.name for t in self.tasks}
        for task in self.tasks:
            for dep in task.get_dependencies_names():
                if dep not in names:
                    raise DependencyNotFoundError(
                        f"Task {task.name} depends on missing task: {dep}"
                    )

    def _topological_sort(self) -> None:
        """Sort tasks based on dependencies using Kahn's Algorithm in O(V+E) time.

        Reorders the task list so that dependencies are always executed before
        tasks that depend on them. Among tasks that are ready at the same time,
        the original list order is kept (FIFO processing).

        Raises
        ------
        CircularDependencyError
            If circular dependencies are detected among tasks.
        """
        in_degree = {t.name: 0 for t in self.tasks}
        graph: dict[str, list[str]] = {t.name: [] for t in self.tasks}
        task_map = {t.name: t for t in self.tasks}

        for task in self.tasks:
            for dep in task.dependencies:
                graph[dep.task_name].append(task.name)
                in_degree[task.name] += 1

        # FIFO queue backed by a list plus a read index: O(1) per dequeue,
        # unlike list.pop(0), with exactly the same processing order.
        queue = [name for name, deg in in_degree.items() if deg == 0]
        sorted_tasks = []
        head = 0

        while head < len(queue):
            u = queue[head]
            head += 1
            sorted_tasks.append(task_map[u])
            for v in graph[u]:
                in_degree[v] -= 1
                if in_degree[v] == 0:
                    queue.append(v)

        # Tasks on a cycle never reach in-degree 0, so they are never emitted.
        if len(sorted_tasks) != len(self.tasks):
            raise CircularDependencyError("Circular dependency detected.")
        self.tasks = sorted_tasks

    def get_task(self, task_name: str) -> Task:
        """Retrieve a task by name.

        Parameters
        ----------
        task_name : str
            The name of the task to retrieve.

        Returns
        -------
        Task
            The task with the specified name.

        Raises
        ------
        TaskNotFoundError
            If no task with the given name exists.
        """
        for task in self.tasks:
            if task.name == task_name:
                return task
        raise TaskNotFoundError(f"Task not found: {task_name}")

    def run(self, parallel: bool | None = None, max_workers: int = 4) -> ProcessResult:
        """Execute all tasks in the process.

        Runs tasks sequentially or in parallel while respecting dependencies.
        Dependencies are always resolved before dependent tasks are executed.

        Parameters
        ----------
        parallel : bool, optional
            Whether to run tasks in parallel while respecting dependencies.
            If None, automatically set to True for processes with 10 or more tasks,
            False otherwise. Defaults to None.
        max_workers : int, optional
            Maximum number of worker threads for parallel execution. Defaults to 4.
            Values below 1 are treated as 1. Only used when parallel=True. If the
            effective value is 1, falls back to sequential execution.

        Returns
        -------
        ProcessResult
            Contains passed_tasks_results (dict mapping task names to TaskResult)
            and failed_tasks (set of task names that failed).
        """
        if parallel is None:
            parallel = len(self.tasks) >= 10

        max_workers = max(1, max_workers)
        if parallel and max_workers == 1:
            parallel = False  # A single worker is just sequential execution
        return self.runner.run(parallel, max_workers)

    def get_dependant_tasks(self, task_name: str) -> list[Task]:
        """Retrieve all tasks that directly or indirectly depend on a given task.

        Parameters
        ----------
        task_name : str
            The name of the task to find dependants for.

        Returns
        -------
        list[Task]
            List of all tasks that depend on the specified task, including
            transitive dependencies (tasks that depend on tasks that depend
            on the specified task), in depth-first discovery order.
        """
        found: list[Task] = []
        seen: set[str] = set()  # task names are unique, so names identify tasks
        # Iterative depth-first search with an explicit stack so long dependency
        # chains cannot hit Python's recursion limit. Each frame keeps its own
        # iterator over self.tasks, so a parent resumes scanning where it left
        # off; this yields the same order as the natural recursive version.
        stack = [(task_name, iter(self.tasks))]
        while stack:
            name, remaining = stack[-1]
            for t in remaining:
                if t.name not in seen and name in t.get_dependencies_names():
                    seen.add(t.name)
                    found.append(t)
                    stack.append((t.name, iter(self.tasks)))
                    break
            else:
                stack.pop()
        return found

    def close_loggers(self) -> None:
        """Close and detach all logger handlers for all tasks.

        Should be called when the process is done to ensure proper resource cleanup.
        Safe to call repeatedly and on a Process whose ``tasks`` failed validation:
        non-iterable ``tasks`` and elements without a ``logger`` are skipped, so
        this never masks the validation error raised from ``__init__``.
        """
        try:
            tasks = list(self.tasks)
        except TypeError:  # not iterable; nothing could have opened handlers
            return
        for task in tasks:
            logger = getattr(task, "logger", None)
            if logger is None:
                continue
            # Iterate over a snapshot: removeHandler mutates logger.handlers, and
            # mutating a list while iterating it skips every other handler.
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)

__enter__()

Called when entering the 'with' block.

Source code in src/processes/process.py
88
89
90
def __enter__(self) -> Self:
    """Called when entering the 'with' block."""
    return self

__exit__(exc_type, exc_value, traceback)

Called when exiting the 'with' block, even if an error occurred.

Source code in src/processes/process.py
 92
 93
 94
 95
 96
 97
 98
 99
100
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> Literal[False]:
    """Called when exiting the 'with' block, even if an error occurred."""
    self.close_loggers()
    return False

close_loggers()

Close and clean up all logger handlers for all tasks.

Should be called when the process is done to ensure proper resource cleanup.

Source code in src/processes/process.py
262
263
264
265
266
267
268
269
270
def close_loggers(self) -> None:
    """Close and detach all logger handlers for all tasks.

    Should be called when the process is done to ensure proper resource cleanup.
    Safe to call repeatedly and on a Process whose ``tasks`` failed validation:
    non-iterable ``tasks`` and elements without a ``logger`` are skipped, so
    this never masks the validation error raised from ``__init__``.
    """
    try:
        tasks = list(self.tasks)
    except TypeError:  # not iterable; nothing could have opened handlers
        return
    for task in tasks:
        logger = getattr(task, "logger", None)
        if logger is None:
            continue
        # Iterate over a snapshot: removeHandler mutates logger.handlers, and
        # mutating a list while iterating it skips every other handler.
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)

get_dependant_tasks(task_name)

Retrieve all tasks that directly or indirectly depend on a given task.

Parameters

task_name : str The name of the task to find dependants for.

Returns

list[Task] List of all tasks that depend on the specified task, including transitive dependencies (tasks that depend on tasks that depend on the specified task).

Source code in src/processes/process.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def get_dependant_tasks(self, task_name: str) -> list[Task]:
    """Retrieve all tasks that directly or indirectly depend on a given task.

    Parameters
    ----------
    task_name : str
        The name of the task to find dependants for.

    Returns
    -------
    list[Task]
        List of all tasks that depend on the specified task, including
        transitive dependencies (tasks that depend on tasks that depend
        on the specified task), in depth-first discovery order.
    """
    found: list[Task] = []
    seen: set[str] = set()  # task names are unique, so names identify tasks
    # Iterative depth-first search with an explicit stack so long dependency
    # chains cannot hit Python's recursion limit. Each frame keeps its own
    # iterator over self.tasks, so a parent resumes scanning where it left
    # off; this yields the same order as the natural recursive version.
    stack = [(task_name, iter(self.tasks))]
    while stack:
        name, remaining = stack[-1]
        for t in remaining:
            if t.name not in seen and name in t.get_dependencies_names():
                seen.add(t.name)
                found.append(t)
                stack.append((t.name, iter(self.tasks)))
                break
        else:
            stack.pop()
    return found

get_task(task_name)

Retrieve a task by name.

Parameters

task_name : str The name of the task to retrieve.

Returns

Task The task with the specified name.

Raises

TaskNotFoundError If no task with the given name exists.

Source code in src/processes/process.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def get_task(self, task_name: str) -> Task:
    """Look up a task of this process by its name.

    Parameters
    ----------
    task_name : str
        Name of the wanted task.

    Returns
    -------
    Task
        The first task whose ``name`` equals ``task_name``.

    Raises
    ------
    TaskNotFoundError
        When no task carries that name.
    """
    match = next((candidate for candidate in self.tasks if candidate.name == task_name), None)
    if match is None:
        raise TaskNotFoundError(f"Task not found: {task_name}")
    return match

run(parallel=None, max_workers=4)

Execute all tasks in the process.

Runs tasks sequentially or in parallel while respecting dependencies. Dependencies are always resolved before dependent tasks are executed.

Parameters

parallel : bool, optional Whether to run tasks in parallel while respecting dependencies. If None, automatically set to True for processes with 10 or more tasks, False otherwise. Defaults to None. max_workers : int, optional Maximum number of worker threads for parallel execution. Defaults to 4. Only used when parallel=True. If set to 1, falls back to sequential execution.

Returns

ProcessResult Contains passed_tasks_results (dict mapping task names to TaskResult) and failed_tasks (set of task names that failed).

Source code in src/processes/process.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def run(self, parallel: bool | None = None, max_workers: int = 4) -> ProcessResult:
    """Run every task of the process, honouring dependencies.

    Tasks run either one after another or on a thread pool. In both modes a
    task only starts after all of its dependencies have completed.

    Parameters
    ----------
    parallel : bool, optional
        Whether to run tasks in parallel while respecting dependencies.
        If None, automatically set to True for processes with 10 or more tasks,
        False otherwise. Defaults to None.
    max_workers : int, optional
        Maximum number of worker threads for parallel execution. Defaults to 4.
        Values below 1 are treated as 1. Only used when parallel=True; a single
        worker falls back to sequential execution.

    Returns
    -------
    ProcessResult
        Contains passed_tasks_results (dict mapping task names to TaskResult)
        and failed_tasks (set of task names that failed).
    """
    if parallel is None:
        parallel = len(self.tasks) >= 10
    workers = max(1, max_workers)
    # One worker is sequential execution in disguise.
    use_pool = parallel and workers > 1
    return self.runner.run(use_pool, workers)

ProcessResult

Container for the results of a process execution.

Holds the outcomes of all tasks executed in a process, separating successful and failed tasks with their respective results.

Attributes

passed_tasks_results : dict[str, TaskResult] Mapping of task names to TaskResult objects for all tasks that executed successfully. failed_tasks : set[str] Set of task names for all tasks that failed during execution.

Source code in src/processes/process.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class ProcessResult:
    """
    Outcome of running a Process.

    Groups the results of the tasks executed by a process, keeping the
    successful tasks (with their TaskResult) apart from the failed ones.

    Attributes
    ----------
    passed_tasks_results : dict[str, TaskResult]
        Task name -> TaskResult for every task that executed successfully.
    failed_tasks : set[str]
        Names of the tasks that failed during execution.
    """

    def __init__(self, passed_tasks_results: dict[str, TaskResult], failed_tasks: set[str]):
        # Both containers are stored by reference, not copied.
        self.failed_tasks = failed_tasks
        self.passed_tasks_results = passed_tasks_results

ProcessRunner

Executes tasks in a Process, handling both sequential and parallel execution.

Manages task execution state, tracks passed and failed tasks, and coordinates dependencies during execution.

Attributes

process : Process Reference to the parent Process being executed. passed_results : dict[str, TaskResult] Results from successfully executed tasks. failed_tasks : set[str] Names of tasks that failed during execution. submitted_tasks : set[str] Names of tasks that have been submitted for execution.

Source code in src/processes/process.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
class ProcessRunner:
    """
    Executes tasks in a Process, handling both sequential and parallel execution.

    Manages task execution state, tracks passed and failed tasks, and coordinates
    dependencies during execution. The execution state is reset at the start of
    every ``run`` call, so a Process can be run more than once.

    Attributes
    ----------
    process : Process
        Reference to the parent Process being executed.
    passed_results : dict[str, TaskResult]
        Results from successfully executed tasks.
    failed_tasks : set[str]
        Names of tasks that failed during execution, including tasks skipped
        because one of their dependencies failed.
    submitted_tasks : set[str]
        Names of tasks that have been submitted for execution.
    """

    def __init__(self, process_ref: Process):
        self.process = process_ref
        self.passed_results: dict[str, TaskResult] = {}
        self.failed_tasks: set[str] = set()
        self.submitted_tasks: set[str] = set()

    def run(self, parallel: bool, max_workers: int) -> ProcessResult:
        """Execute all tasks in the process using the specified execution mode.

        Parameters
        ----------
        parallel : bool
            If True, execute tasks in parallel; otherwise execute sequentially.
        max_workers : int
            Maximum number of worker threads for parallel execution.

        Returns
        -------
        ProcessResult
            The combined results of all task executions of this run. Each call
            returns a result backed by fresh containers.
        """
        # Start every run from a clean slate. Leftover state would make parallel
        # mode return immediately (its loop condition is already satisfied),
        # would mix stale failures into a sequential run, and would mutate the
        # containers held by a previously returned ProcessResult.
        self.passed_results = {}
        self.failed_tasks = set()
        self.submitted_tasks = set()

        if parallel:
            self._run_parallel(max_workers)
        else:
            self._run_sequential()
        return ProcessResult(self.passed_results, self.failed_tasks)

    def _is_unrunnable(self, task: Task) -> bool:
        """Check if a task cannot be run due to failed dependencies.

        Parameters
        ----------
        task : Task
            The task to check.

        Returns
        -------
        bool
            True if any of the task's dependencies have failed, False otherwise.
            If True, the task is also marked as failed.
        """
        if any(d.task_name in self.failed_tasks for d in task.dependencies):
            self.failed_tasks.add(task.name)  # Propagate failure
            return True
        return False

    def _all_deps_met(self, task: Task) -> bool:
        """Check if all dependencies of a task have been successfully executed.

        Parameters
        ----------
        task : Task
            The task to check.

        Returns
        -------
        bool
            True if all dependencies have passed, False otherwise.
        """
        return all(d.task_name in self.passed_results for d in task.dependencies)

    def _run_sequential(self) -> None:
        """Execute all tasks sequentially in dependency order.

        Relies on ``process.tasks`` being topologically sorted, so each task's
        dependencies have already been handled when the task is reached.
        """
        for task in self.process.tasks:
            if self._is_unrunnable(task):
                continue
            if self._all_deps_met(task):
                res = task.run(self.process)
                if res.worked:
                    self.passed_results[task.name] = res
                else:
                    self.failed_tasks.add(task.name)

    def _run_parallel(self, max_workers: int) -> None:
        """Execute tasks in parallel using a thread pool while respecting dependencies.

        Parameters
        ----------
        max_workers : int
            Maximum number of worker threads to use.

        Raises
        ------
        RuntimeError
            If execution stalls with no candidates ready and no tasks running.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            fut_to_name = {}
            while len(self.passed_results) + len(self.failed_tasks) < len(self.process.tasks):
                # Look for candidates to execute now. _is_unrunnable also marks
                # tasks with a failed dependency as failed, which advances the loop.
                candidates = [
                    t
                    for t in self.process.tasks
                    if t.name not in self.submitted_tasks
                    and t.name not in self.failed_tasks
                    and not self._is_unrunnable(t)
                    and self._all_deps_met(t)
                ]

                # Send tasks for execution and register them as submitted
                for task in candidates:
                    fut = executor.submit(task.run, self.process)
                    fut_to_name[fut] = task.name
                    self.submitted_tasks.add(task.name)

                # If there are tasks pending, wait. As soon as one is completed,
                # save it as passed or failed and remove it from the futures.
                if fut_to_name:
                    done, _ = concurrent.futures.wait(
                        fut_to_name.keys(), return_when=concurrent.futures.FIRST_COMPLETED
                    )
                    for fut in done:
                        name = fut_to_name.pop(fut)
                        try:
                            res = fut.result()
                            if res.worked:
                                self.passed_results[name] = res
                            else:
                                self.failed_tasks.add(name)
                        except Exception:
                            # Task.run reports function errors via TaskResult; this
                            # only catches errors raised outside that handling.
                            self.failed_tasks.add(name)
                else:
                    # No candidates and no running tasks - likely a deadlock or logic error
                    raise RuntimeError(
                        "Parallel execution stalled: no candidates found and no tasks running"
                    )

run(parallel, max_workers)

Execute all tasks in the process using the specified execution mode.

Parameters

parallel : bool If True, execute tasks in parallel; otherwise execute sequentially. max_workers : int Maximum number of worker threads for parallel execution.

Returns

ProcessResult The combined results of all task executions.

Source code in src/processes/process.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def run(self, parallel: bool, max_workers: int) -> ProcessResult:
    """Execute all tasks in the process using the specified execution mode.

    Parameters
    ----------
    parallel : bool
        If True, execute tasks in parallel; otherwise execute sequentially.
    max_workers : int
        Maximum number of worker threads for parallel execution.

    Returns
    -------
    ProcessResult
        The combined results of all task executions of this run. Each call
        returns a result backed by fresh containers.
    """
    # Start every run from a clean slate. Leftover state would make parallel
    # mode return immediately (its loop condition is already satisfied),
    # would mix stale failures into a sequential run, and would mutate the
    # containers held by a previously returned ProcessResult.
    self.passed_results = {}
    self.failed_tasks = set()
    self.submitted_tasks = set()

    if parallel:
        self._run_parallel(max_workers)
    else:
        self._run_sequential()
    return ProcessResult(self.passed_results, self.failed_tasks)

TaskNotFoundError

Bases: Exception

Raised when attempting to retrieve a task that does not exist in the process.

Source code in src/processes/process.py
14
15
16
17
class TaskNotFoundError(Exception):
    """Raised when attempting to retrieve a task that does not exist in the process.

    The message contains the name that was looked up.
    """

Task

A Task represents a unit of work to be executed within a Process.

A Task encapsulates a callable function with its arguments, dependencies on other tasks, and logging configuration. Tasks can be executed by the Process class, either sequentially or in parallel, with automatic dependency resolution and result passing between dependent tasks.

Attributes

name : str Unique name for the task (cannot contain spaces). log_path : str File path where task logs will be written. func : Callable The function to execute when the task runs. args : tuple Positional arguments to pass to the function. Defaults to empty tuple. kwargs : dict Keyword arguments to pass to the function. Defaults to empty dict. dependencies : list[TaskDependency] List of tasks this task depends on. Defaults to empty list. html_mail_handler : HTMLSMTPHandler, optional Handler for sending error logs via email in HTML format. Defaults to None. logger : logging.Logger Logger instance for this task, automatically configured.

Source code in src/processes/task.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
class Task:
    """
    A Task represents a unit of work to be executed within a Process.

    A Task encapsulates a callable function with its arguments, dependencies on other tasks,
    and logging configuration. Tasks can be executed by the Process class, either
    sequentially or in parallel, with automatic dependency resolution and result passing
    between dependent tasks.

    Attributes
    ----------
    name : str
        Unique name for the task (cannot contain spaces).
    log_path : str
        File path where task logs will be written.
    func : Callable
        The function to execute when the task runs.
    args : tuple
        Positional arguments to pass to the function. Defaults to empty tuple.
    kwargs : dict
        Keyword arguments to pass to the function. Defaults to empty dict.
    dependencies : list[TaskDependency]
        List of tasks this task depends on. Defaults to empty list.
    html_mail_handler : HTMLSMTPHandler, optional
        Handler for sending error logs via email in HTML format. Defaults to None.
    logger : logging.Logger
        Logger instance for this task, automatically configured.
    """

    kwargs: dict[str, Any]
    dependencies: list[TaskDependency]

    def __init__(
        self,
        name: str,
        log_path: str,
        func: Callable[..., Any],
        args: tuple[Any, ...] = (),
        kwargs: dict[str, Any] | None = None,
        dependencies: list[TaskDependency] | None = None,
        html_mail_handler: HTMLSMTPHandler | None = None,
    ):
        self.name = name
        self.log_path = log_path
        self.func = func
        self.args = args
        self.html_mail_handler = html_mail_handler

        # None defaults avoid sharing one mutable object between instances.
        self.kwargs = {} if kwargs is None else kwargs
        self.dependencies = [] if dependencies is None else dependencies

        self._check_input_types()
        if " " in self.name:
            raise ValueError(f"Task name cannot contain spaces. Got {self.name}")

        dependencies_names: set[str] = set()
        for dependency in self.dependencies:
            if dependency.task_name in dependencies_names:
                raise ValueError(f"Duplicate dependency name: {dependency.task_name}")
            dependencies_names.add(dependency.task_name)
            if dependency.task_name == self.name:
                raise ValueError(
                    f"Got dependency with same name as Task. "
                    f"Task: {self.name}. Dependency: {dependency.task_name}"
                )

        # logging.getLogger returns one shared logger per name, so an earlier Task
        # with the same name may have left handlers attached. Close them before
        # detaching; merely clearing the list would leak their open log files.
        logger = logging.getLogger(self.name)
        logger.setLevel(logging.DEBUG)
        for old_handler in list(logger.handlers):
            old_handler.close()
            logger.removeHandler(old_handler)

        file_handler = logging.FileHandler(self.log_path)
        file_handler.setLevel(logging.INFO)
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        if self.html_mail_handler is not None:
            # Work on a copy so the caller's handler is not reconfigured.
            _html_mail_handler = self.html_mail_handler.copy()
            _html_mail_handler.setFormatter(ExceptionHTMLFormatter())
            _html_mail_handler.setLevel(logging.ERROR)
            _html_mail_handler.subject = f"Error in task {self.name}"
            logger.addHandler(_html_mail_handler)

        self.logger = logger

    def _check_input_types(self) -> None:
        """
        Validates all input parameter types.

        Raises
        ------
        TypeError
            If any parameter is not of the expected type.
        """
        if not callable(self.func):
            raise TypeError(f"func must be callable. Got {type(self.func)}")

        if not isinstance(self.args, tuple):
            raise TypeError(f"args must be tuple. Got {type(self.args)}")

        if not isinstance(self.kwargs, dict):
            raise TypeError(f"kwargs must be dict. Got {type(self.kwargs)}")

        if self.html_mail_handler is not None and not isinstance(
            self.html_mail_handler, HTMLSMTPHandler
        ):
            raise TypeError(
                "html_mail_handler must be of type HTMLSMTPHandler. "
                f"Got {type(self.html_mail_handler)}"
            )

        if not isinstance(self.dependencies, list):
            raise TypeError(f"dependencies must be list. Got {type(self.dependencies)}")

        for dependency in self.dependencies:
            if not isinstance(dependency, TaskDependency):
                raise TypeError(
                    f"dependency must be of type TaskDependency. Got {type(dependency)}"
                )

        # Checked explicitly so a non-str name fails with a clear message instead
        # of an obscure error from the space check or the logger lookup.
        if not isinstance(self.name, str):
            raise TypeError(f"name must be str. Got {type(self.name)}")

    def get_dependencies_names(self) -> set[str]:
        """
        Get the names of all tasks this task depends on.

        Returns
        -------
        set[str]
            Set of dependency task names.
        """
        return {dependency.task_name for dependency in self.dependencies}

    def run(self, executing_process: Process | None = None) -> TaskResult:
        """
        Execute the task's function with its arguments and dependencies.

        This method runs the task's function, automatically injecting results from
        dependent tasks as specified in the dependency configuration. Logs the task
        execution and captures any exceptions.

        Parameters
        ----------
        executing_process : Process, optional
            The parent Process executing this task. Used to retrieve results from
            dependent tasks. Defaults to None.

        Returns
        -------
        TaskResult
            Object containing:
            - worked (bool): True if execution succeeded, False otherwise.
            - result: The return value of the function if successful, None if failed.
            - exception (Exception | None): The exception raised if execution failed,
            None if successful.
        """
        final_args = list(self.args)  # Start with original positional args
        final_kwargs = self.kwargs.copy()  # Start with original keyword args

        if executing_process is not None:
            for dep in self.dependencies:
                dep_result = executing_process.runner.passed_results[dep.task_name].result
                if dep.use_result_as_additional_args:
                    final_args.append(dep_result)
                if dep.use_result_as_additional_kwargs:
                    final_kwargs[dep.additional_kwarg_name] = dep_result

        try:
            self.logger.info(f"Starting {self.name}.")
            result = self.func(*final_args, **final_kwargs)
            self.logger.info(f"Finished {self.name}.")
            return TaskResult(True, result, None)
        except Exception as e:
            report = self._build_failure_report(executing_process)
            self.logger.exception(e, extra={"post_traceback_html_body": report})
            return TaskResult(False, None, e)

    def _build_failure_report(self, executing_process: Process | None) -> str:
        """Build the HTML block passed as ``post_traceback_html_body`` on failure.

        Lists the downstream tasks that will be skipped (only when run inside a
        Process) followed by the function call context. Interpolated values are
        HTML-escaped so text such as a default object repr (``<Foo object at ...>``)
        is shown literally instead of being swallowed as an unknown tag.
        """
        from html import escape

        report = ""
        if executing_process is not None:
            dependant_names = [
                t.name for t in executing_process.get_dependant_tasks(self.name)
            ]
            if dependant_names:
                report = (
                    "<h3>Downstream Impact</h3><p>The following tasks will be skipped:</p><ul>"
                )
                report += "".join(
                    f"<li>{escape(str(dependant_name), quote=False)}</li>"
                    for dependant_name in dependant_names
                )
                report += "</ul>"
        # functools.partial objects and callable instances have no __name__;
        # reading it unguarded would raise inside the caller's except handler
        # and turn a recorded task failure into an unhandled crash.
        func_name = str(getattr(self.func, "__name__", repr(self.func)))
        report += f"<p><b>Context:</b><br>Function: {escape(func_name, quote=False)}"
        report += (
            f"<br>Args: {escape(repr(self.args), quote=False)}"
            f"<br>Kwargs: {escape(repr(self.kwargs), quote=False)}</p>"
        )
        return report

get_dependencies_names()

Get the names of all tasks this task depends on.

Returns

set[str] Set of dependency task names.

Source code in src/processes/task.py
235
236
237
238
239
240
241
242
243
244
def get_dependencies_names(self) -> set[str]:
    """
    Collect the task names referenced by this task's dependencies.

    Returns
    -------
    set[str]
        One entry per distinct ``task_name`` found in ``self.dependencies``.
    """
    names: set[str] = set()
    for dependency in self.dependencies:
        names.add(dependency.task_name)
    return names

run(executing_process=None)

Execute the task's function with its arguments and dependencies.

This method runs the task's function, automatically injecting the results of the tasks it depends on, as specified in the dependency configuration. It logs the task execution and captures any exception raised by the function.

Parameters

executing_process (Process, optional): The parent Process executing this task. It is used to retrieve the results of the tasks this task depends on. Defaults to None.

Returns

TaskResult: an object containing:

- worked (bool): True if execution succeeded, False otherwise.
- result: the return value of the function if successful, None if failed.
- exception (Exception | None): the exception raised if execution failed, None if successful.

Source code in src/processes/task.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def run(self, executing_process: Process | None = None) -> TaskResult:
    """
    Execute the task's function with its arguments and dependencies.

    This method runs the task's function, automatically injecting the results
    of the tasks it depends on as specified in the dependency configuration.
    Logs the task execution and captures any exception raised by the function.

    Parameters
    ----------
    executing_process : Process, optional
        The parent Process executing this task. Used to retrieve the results of
        the tasks this task depends on. Defaults to None.

    Returns
    -------
    TaskResult
        Object containing:
        - worked (bool): True if execution succeeded, False otherwise.
        - result: The return value of the function if successful, None if failed.
        - exception (Exception | None): The exception raised if execution failed,
        None if successful.

    Notes
    -----
    Dependency results are looked up in
    ``executing_process.runner.passed_results`` before the ``try`` block, so a
    failed lookup propagates out of this method instead of being captured in
    the returned TaskResult.
    """
    import html  # escapes values interpolated into the HTML failure report

    final_args = list(self.args)  # Start with original positional args
    final_kwargs = self.kwargs.copy()  # Start with original keyword args

    if executing_process is not None:
        for dep in self.dependencies:
            dep_result = executing_process.runner.passed_results[dep.task_name].result
            # A dependency's result can be appended positionally (after the
            # original args, in dependency order), passed by keyword, or both.
            if dep.use_result_as_additional_args:
                final_args.append(dep_result)
            if dep.use_result_as_additional_kwargs:
                final_kwargs[dep.additional_kwarg_name] = dep_result

    try:
        self.logger.info(f"Starting {self.name}.")
        result = self.func(*final_args, **final_kwargs)
        self.logger.info(f"Finished {self.name}.")
        return TaskResult(True, result, None)
    except Exception as e:
        report = ""
        if executing_process is not None:
            # Tasks returned by get_dependant_tasks; the report lists them as skipped.
            dependant_names = [
                d.name for d in executing_process.get_dependant_tasks(self.name)
            ]
            if dependant_names:
                report = (
                    "<h3>Downstream Impact</h3><p>The following tasks will be skipped:</p><ul>"
                )
                report += "".join(
                    f"<li>{html.escape(str(dependant_name))}</li>"
                    for dependant_name in dependant_names
                )
                report += "</ul>"
        # Callables such as functools.partial objects have no __name__; fall back
        # to repr() so building the report can never raise and mask the error.
        func_name = getattr(self.func, "__name__", repr(self.func))
        # Escape reprs: default object reprs look like "<Foo object at 0x...>"
        # and would otherwise be swallowed as HTML tags.
        report += f"<p><b>Context:</b><br>Function: {html.escape(func_name)}"
        report += (
            f"<br>Args: {html.escape(str(self.args))}"
            f"<br>Kwargs: {html.escape(str(self.kwargs))}</p>"
        )
        self.logger.exception(e, extra={"post_traceback_html_body": report})
        return TaskResult(False, None, e)

TaskDependency

Represents a dependency relationship between tasks.

Defines how a task depends on another task, including how the result of the dependency should be passed to the dependent task (as additional positional arguments, keyword arguments, or both).

Attributes

- task_name (str): the name of the task this dependency refers to.
- use_result_as_additional_args (bool): if True, the result of the dependency task is passed to the dependent task as an additional positional argument, appended after the existing arguments. Defaults to False.
- use_result_as_additional_kwargs (bool): if True, the result of the dependency task is passed to the dependent task as a keyword argument. Defaults to False.
- additional_kwarg_name (str | None): the keyword-argument name to use when use_result_as_additional_kwargs is True; required in that case. Defaults to an empty string ("").

Raises

TypeError: if any parameter type is invalid, or if use_result_as_additional_kwargs is True but additional_kwarg_name is not a non-empty string.

Source code in src/processes/task.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class TaskDependency:
    """
    Represents a dependency relationship between tasks.

    Defines how a task depends on another task, including how the result
    of the dependency should be passed to the dependent task (as additional
    positional arguments, keyword arguments, or both).

    Attributes
    ----------
    task_name : str
        The name of the task this dependency refers to.
    use_result_as_additional_args : bool
        If True, the result of the dependency task will be passed as an
        additional positional argument as the last argument. Defaults to False.
    use_result_as_additional_kwargs : bool
        If True, the result of the dependency task will be passed as a
        keyword argument. Defaults to False.
    additional_kwarg_name : str | None
        The name of the keyword argument to use if use_result_as_additional_kwargs
        is True. Required when use_result_as_additional_kwargs is True.
        Defaults to None.

    Raises
    ------
    TypeError
        If any parameter type is invalid or if use_result_as_additional_kwargs
        is True but additional_kwarg_name is not a string.
    """

    def __init__(
        self,
        task_name: str,
        use_result_as_additional_args: bool = False,
        use_result_as_additional_kwargs: bool = False,
        additional_kwarg_name: str = "",
    ):
        self.task_name = task_name
        self.use_result_as_additional_args = use_result_as_additional_args
        self.use_result_as_additional_kwargs = use_result_as_additional_kwargs
        self.additional_kwarg_name = additional_kwarg_name

        if not isinstance(self.task_name, str):
            raise TypeError(f"task_name must be of type str. Got {type(self.task_name)}")
        if not isinstance(self.use_result_as_additional_args, bool):
            raise TypeError(
                f"use_result_as_additional_args must be of type bool. "
                f"Got {type(self.use_result_as_additional_args)}"
            )
        if not isinstance(self.use_result_as_additional_kwargs, bool):
            raise TypeError(
                f"use_result_as_additional_kwargs must be of type bool. "
                f"Got {type(self.use_result_as_additional_kwargs)}"
            )

        if self.use_result_as_additional_kwargs and self.additional_kwarg_name == "":
            raise TypeError(
                "If use_result_as_additional_kwargs is True, additional_kwarg_name"
                " must be a non-empty string."
            )

    def __hash__(self) -> int:
        """
        Return hash of the dependency based on task name.

        Returns
        -------
        int
            Hash value based on the task_name attribute.
        """
        return hash(self.task_name)

__hash__()

Return hash of the dependency based on task name.

Returns

int: hash value computed from the task_name attribute.

Source code in src/processes/task.py
 98
 99
100
101
102
103
104
105
106
107
def __hash__(self) -> int:
    """
    Hash the dependency by the name of the task it points to.

    Returns
    -------
    int
        The built-in ``hash`` of ``self.task_name``.
    """
    key = self.task_name
    return hash(key)

TaskResult

Container for the result of a task execution.

Holds the outcome of running a task, including whether it succeeded, its return value, and any exception that occurred.

Attributes

- worked (bool): True if the task executed successfully, False if an exception occurred.
- result (Any): the return value of the task's function if execution succeeded, None if it failed.
- exception (Exception | None): the exception object if execution failed, None if successful.

Source code in src/processes/task.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class TaskResult:
    """
    Container for the result of a task execution.

    Holds the outcome of running a task, including whether it succeeded,
    its return value, and any exception that occurred.

    Attributes
    ----------
    worked : bool
        True if the task executed successfully, False if an exception occurred.
    result : Any
        The return value of the task's function if execution succeeded, None if failed.
    exception : Exception | None
        The exception object if execution failed, None if successful.
    """

    def __init__(self, worked: bool, result: Any, exception: Exception | None):
        self.worked = worked
        self.result = result
        self.exception = exception