Skip to content

futures_executor_mixin

FuturesExecutorMixin

Bases: ABC

Source code in griptape/mixins/futures_executor_mixin.py
@define(slots=False, kw_only=True)
class FuturesExecutorMixin(ABC):
    """Mixin that equips a class with its own ``concurrent.futures`` executor.

    The executor is obtained by calling ``futures_executor_fn`` when no
    executor is passed in, and it is shut down when the instance is finalized.
    """

    # Zero-argument callable that builds the executor. The default returns a
    # ``ThreadPoolExecutor`` created with its default settings.
    futures_executor_fn: Callable[[], futures.Executor] = field(
        default=Factory(lambda: lambda: futures.ThreadPoolExecutor()),
    )

    # The executor in use. Its default is produced by calling this instance's
    # ``futures_executor_fn``; ``__del__`` resets it to ``None`` before
    # shutting the executor down.
    futures_executor: Optional[futures.Executor] = field(
        default=Factory(lambda self: self.futures_executor_fn(), takes_self=True)
    )

    def __del__(self) -> None:
        """Shut down the executor, waiting for it, when the instance is finalized.

        Calling this again afterwards does nothing, because the attribute is
        cleared before the shutdown is requested.
        """
        # ``__del__`` also runs for instances whose initialization failed
        # before ``futures_executor`` was assigned (e.g. the factory raised),
        # so the attribute may be missing altogether; treat that like ``None``
        # instead of raising a second, misleading AttributeError.
        executor = getattr(self, "futures_executor", None)
        if executor is None:
            return

        # Drop the reference first so a repeated call cannot shut down twice.
        self.futures_executor = None
        executor.shutdown(wait=True)

`futures_executor: Optional[futures.Executor] = field(default=Factory(lambda self: self.futures_executor_fn(), takes_self=True))` (class attribute, instance attribute)

`futures_executor_fn: Callable[[], futures.Executor] = field(default=Factory(lambda: lambda: futures.ThreadPoolExecutor()))` (class attribute, instance attribute)

__del__()

Source code in griptape/mixins/futures_executor_mixin.py
def __del__(self) -> None:
    """Shut down the executor, waiting for it, when the instance is finalized.

    Calling this again afterwards does nothing, because the attribute is
    cleared before the shutdown is requested.
    """
    # ``__del__`` also runs for instances whose initialization failed before
    # ``futures_executor`` was assigned, so the attribute may be missing
    # altogether; treat that like ``None`` instead of raising AttributeError.
    executor = getattr(self, "futures_executor", None)
    if executor is None:
        return

    # Drop the reference first so a repeated call cannot shut down twice.
    self.futures_executor = None
    executor.shutdown(wait=True)