base

Base class for task middlewares.

Classes:

TaskMiddleware: Base class for task middlewares. A middleware wraps a task and performs actions before and after the task is executed.

TaskMiddleware

TaskMiddleware(
    task: TaskType, kwargs: Dict[str, Any] = None
)

Base class for task middlewares. A middleware wraps a task and performs actions before and after the task is executed. It is mainly used for logging, but can serve other purposes as well. See LoggingMiddleware for an example of a middleware.

Args:
    task: The task to wrap.
    kwargs: The keyword arguments to pass to the task.

Attributes:
    _task: The task to wrap.
    _kwargs: The keyword arguments to pass to the task.

Methods:

__call__: Executes the task and returns the result, calling before and after around it.
_stream: Re-streams the streaming response and runs the after side effects once the generator is exhausted.
after: Executed after the task is executed. Use it to perform actions once the task has run.
before: Executed before the task is executed. Use it to perform actions before the task runs.

Source code in src/declarai/middleware/base.py
def __init__(self, task: TaskType, kwargs: Dict[str, Any] = None):
    self._task = task
    self._kwargs = kwargs
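
As a rough illustration of how a subclass might look, the sketch below defines a hypothetical TimingMiddleware. declarai is not imported here: FakeOperator, FakeTask and the TaskMiddleware stand-in are assumptions that mirror only the constructor and the non-streaming call path shown on this page, and deriving the stand-in from ABC is likewise an assumption.

```python
from abc import ABC, abstractmethod
from time import perf_counter
from typing import Any, Dict, Optional


class FakeOperator:
    """Hypothetical operator; only the `streaming` flag is used."""
    streaming = False


class FakeTask:
    """Hypothetical task exposing the `_exec` hook the middleware calls."""
    operator = FakeOperator()

    def _exec(self, kwargs: Dict[str, Any]) -> str:
        return f"hello {kwargs['name']}"


class TaskMiddleware(ABC):
    """Stand-in for the documented base class (ABC is an assumption)."""

    def __init__(self, task: Any, kwargs: Optional[Dict[str, Any]] = None):
        self._task = task
        self._kwargs = kwargs

    def __call__(self) -> Any:
        # Non-streaming path only: before -> execute -> after.
        self.before(self._task)
        res = self._task._exec(self._kwargs)
        self.after(self._task)
        return res

    @abstractmethod
    def before(self, task: Any) -> None: ...

    @abstractmethod
    def after(self, task: Any) -> None: ...


class TimingMiddleware(TaskMiddleware):
    """Hypothetical middleware that measures how long the task took."""

    def before(self, task: Any) -> None:
        self._start = perf_counter()

    def after(self, task: Any) -> None:
        self.elapsed = perf_counter() - self._start


middleware = TimingMiddleware(FakeTask(), {"name": "world"})
result = middleware()
```

The middleware instance is built with the task and its keyword arguments, then invoked with no arguments; any state the hooks need to share (here the start time) can live on the instance.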

__call__

__call__() -> Any

When the middleware is called, it runs the before method, executes the task, runs the after method, and returns the task's result. For a streaming task it instead returns a generator over the task's chunks, and the after method runs once that generator is exhausted.

Returns:
    The result of the task.

Source code in src/declarai/middleware/base.py
def __call__(self) -> Any:
    """
    Once the middleware is called, it executes the task and returns the result.
    Before it executes the task, it calls the `before` method, and after it executes the task, it calls the `after` method.
    Returns:
        The result of the task
    """
    self.before(self._task)
    # If the task is streaming, handle it differently
    if self._task.operator.streaming:
        # Yield chunks from the task, then call the after method
        return self._stream()
    else:
        # Non-streaming tasks can be handled as before
        res = self._task._exec(self._kwargs)
        self.after(self._task)
        return res
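
One consequence of the listing above is that the hooks are not wrapped in try/finally, so an exception raised by the task skips the after call. The sketch below copies only the non-streaming branch into a hypothetical RecordingMiddleware and uses made-up OkTask and FailingTask classes to record the order of events; none of these names come from declarai.

```python
from typing import Any, Dict, List, Optional

events: List[str] = []


class Operator:
    """Hypothetical operator with streaming disabled."""
    streaming = False


class OkTask:
    operator = Operator()

    def _exec(self, kwargs: Optional[Dict[str, Any]]) -> str:
        events.append("exec")
        return "done"


class FailingTask:
    operator = Operator()

    def _exec(self, kwargs: Optional[Dict[str, Any]]) -> str:
        events.append("exec")
        raise RuntimeError("boom")


class RecordingMiddleware:
    """Reproduces the non-streaming branch of __call__ shown above."""

    def __init__(self, task: Any, kwargs: Optional[Dict[str, Any]] = None):
        self._task = task
        self._kwargs = kwargs

    def __call__(self) -> Any:
        self.before(self._task)
        res = self._task._exec(self._kwargs)
        self.after(self._task)  # not reached if _exec raises
        return res

    def before(self, task: Any) -> None:
        events.append("before")

    def after(self, task: Any) -> None:
        events.append("after")


# Successful run: both hooks fire around the task.
result = RecordingMiddleware(OkTask())()
ok_events = list(events)

# Failing run: the exception propagates and `after` is skipped.
events.clear()
try:
    RecordingMiddleware(FailingTask())()
except RuntimeError:
    pass
failed_events = list(events)
```

A middleware that must always clean up (closing a span, releasing a lock) would therefore need its own error handling rather than relying on after.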

_stream

_stream() -> Iterator

Re-streams the streaming response, running the after side effects once every chunk has been yielded.

Returns:
    An iterator over the chunks produced by the task.

Source code in src/declarai/middleware/base.py
def _stream(self) -> Iterator:
    """
    Re-streams the streaming response and runs the `after` side effects once the generator is exhausted.
    Returns:
        An iterator over the chunks produced by the task.
    """
    for chunk in self._task._exec(self._kwargs):
        yield chunk
    self.after(self._task)
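
Because _stream is a generator function, nothing in its body runs until the caller starts iterating, and after only fires once the loop finishes. The sketch below illustrates this with a hypothetical StreamingTask and StreamMiddleware (neither is part of declarai) that reuse the _stream body shown above, including the case where the consumer stops early.

```python
from typing import Any, Dict, Iterator, List, Optional

events: List[str] = []


class StreamingTask:
    """Hypothetical task whose `_exec` yields chunks lazily."""

    def _exec(self, kwargs: Optional[Dict[str, Any]]) -> Iterator[str]:
        events.append("exec started")
        yield from ("a", "b", "c")


class StreamMiddleware:
    """Reuses the `_stream` body from the listing above."""

    def __init__(self, task: Any, kwargs: Optional[Dict[str, Any]] = None):
        self._task = task
        self._kwargs = kwargs

    def _stream(self) -> Iterator[str]:
        for chunk in self._task._exec(self._kwargs):
            yield chunk
        self.after(self._task)

    def after(self, task: Any) -> None:
        events.append("after")


# Creating the generator does not execute the task yet.
stream = StreamMiddleware(StreamingTask())._stream()
before_iteration = list(events)

# Consuming every chunk triggers `after` at the end.
chunks = list(stream)
after_full = list(events)

# Stopping early: `after` never runs for a partially consumed stream.
events.clear()
partial = StreamMiddleware(StreamingTask())._stream()
first = next(partial)
partial.close()
after_partial = list(events)
```

Callers that break out of a stream early should not expect the after hook to have run.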

after abstractmethod

after(task: TaskType) -> None

Executed after the task has run. Override it to perform any post-execution actions.

Args:
    task: The task that was executed.

Source code in src/declarai/middleware/base.py
@abstractmethod
def after(self, task: TaskType) -> None:
    """
    Executed after the task is executed. Should be used to perform some actions after the task is executed.
    Args:
        task: the task to execute
    """

before abstractmethod

before(task: TaskType) -> None

Executed before the task runs. Override it to perform any pre-execution actions.

Args:
    task: The task to execute.

Source code in src/declarai/middleware/base.py
@abstractmethod
def before(self, task: TaskType) -> None:
    """
    Executed before the task is executed. Should be used to perform some actions before the task is executed.
    Args:
        task: the task to execute
    """