Skip to content

event_listener

__all__ = ['BaseEventListenerDriver'] module-attribute

BaseEventListenerDriver

Bases: FuturesExecutorMixin, ExponentialBackoffMixin, ABC

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@define
class BaseEventListenerDriver(FuturesExecutorMixin, ExponentialBackoffMixin, ABC):
    """Abstract base for drivers that publish event payloads, one by one or in batches.

    Events are reduced to dict payloads and submitted to a futures executor
    for publishing. With ``batched`` enabled, payloads are buffered and the
    buffer is submitted as a single batch once it holds at least
    ``batch_size`` entries; ``flush_events`` submits whatever is still
    buffered. Subclasses provide the actual delivery by implementing
    ``try_publish_event_payload`` and ``try_publish_event_payload_batch``.
    """

    # When true, payloads are buffered and published in groups.
    batched: bool = field(default=True, kw_only=True)
    # Buffer length at which the buffered payloads are submitted as a batch.
    batch_size: int = field(default=10, kw_only=True)

    # Payloads collected so far for the next batch.
    _batch: list[dict] = field(default=Factory(list), kw_only=True)

    @property
    def batch(self) -> list[dict]:
        """Return the list of payloads currently buffered for the next batch."""
        return self._batch

    def publish_event(self, event: BaseEvent | dict) -> None:
        """Publish a single event, or buffer it when batching is enabled.

        Args:
            event: Either a ready-made payload dict, or an event object whose
                ``to_dict()`` result is used as the payload.
        """
        if isinstance(event, dict):
            payload = event
        else:
            payload = event.to_dict()

        with self.create_futures_executor() as executor:
            if not self.batched:
                # Unbatched mode: every payload is submitted on its own.
                executor.submit(with_contextvars(self._safe_publish_event_payload), payload)
                return

            self._batch.append(payload)
            if len(self.batch) < self.batch_size:
                # Buffer is not full yet; nothing to submit for this event.
                return

            executor.submit(with_contextvars(self._safe_publish_event_payload_batch), self.batch)
            # Rebind rather than clear: the submitted list object stays untouched.
            self._batch = []

    def flush_events(self) -> None:
        """Submit any buffered payloads as one batch and start a fresh buffer.

        Does nothing when the buffer is empty.
        """
        if not self.batch:
            return

        with self.create_futures_executor() as executor:
            executor.submit(with_contextvars(self._safe_publish_event_payload_batch), self.batch)
        self._batch = []

    @abstractmethod
    def try_publish_event_payload(self, event_payload: dict) -> None:
        """Attempt to publish a single payload; implemented by subclasses.

        Args:
            event_payload: The payload dict to publish.
        """

    @abstractmethod
    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """Attempt to publish a batch of payloads; implemented by subclasses.

        Args:
            event_payload_batch: The payload dicts to publish together.
        """

    def _safe_publish_event_payload(self, event_payload: dict) -> None:
        """Publish one payload through the retry loop, logging instead of raising.

        Any exception that escapes the retry loop is caught and reported as a
        warning (with traceback), so a failed publish never propagates.
        """
        try:
            for retry_attempt in self.retrying():
                with retry_attempt:
                    self.try_publish_event_payload(event_payload)
        except Exception:
            logger.warning("Failed to publish event after %s attempts", self.max_attempts, exc_info=True)

    def _safe_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """Publish a batch through the retry loop, logging instead of raising.

        Any exception that escapes the retry loop is caught and reported as a
        warning (with traceback), so a failed publish never propagates.
        """
        try:
            for retry_attempt in self.retrying():
                with retry_attempt:
                    self.try_publish_event_payload_batch(event_payload_batch)
        except Exception:
            logger.warning("Failed to publish event batch after %s attempts", self.max_attempts, exc_info=True)

batch: list[dict] property

batch_size: int = field(default=10, kw_only=True) class-attribute instance-attribute

batched: bool = field(default=True, kw_only=True) class-attribute instance-attribute

flush_events()

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
def flush_events(self) -> None:
    """Submit any buffered payloads as one batch and start a fresh buffer.

    Does nothing when the buffer is empty.
    """
    if not self.batch:
        return

    with self.create_futures_executor() as executor:
        executor.submit(with_contextvars(self._safe_publish_event_payload_batch), self.batch)
    # Rebind rather than clear: the submitted list object stays untouched.
    self._batch = []

publish_event(event)

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
def publish_event(self, event: BaseEvent | dict) -> None:
    """Publish a single event, or buffer it when batching is enabled.

    Args:
        event: Either a ready-made payload dict, or an event object whose
            ``to_dict()`` result is used as the payload.
    """
    if isinstance(event, dict):
        payload = event
    else:
        payload = event.to_dict()

    with self.create_futures_executor() as executor:
        if not self.batched:
            # Unbatched mode: every payload is submitted on its own.
            executor.submit(with_contextvars(self._safe_publish_event_payload), payload)
            return

        self._batch.append(payload)
        if len(self.batch) < self.batch_size:
            # Buffer is not full yet; nothing to submit for this event.
            return

        executor.submit(with_contextvars(self._safe_publish_event_payload_batch), self.batch)
        # Rebind rather than clear: the submitted list object stays untouched.
        self._batch = []

try_publish_event_payload(event_payload) abstractmethod

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@abstractmethod
def try_publish_event_payload(self, event_payload: dict) -> None:
    """Attempt to publish a single payload; implemented by subclasses.

    Args:
        event_payload: The payload dict to publish.
    """

try_publish_event_payload_batch(event_payload_batch) abstractmethod

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@abstractmethod
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    """Attempt to publish a batch of payloads; implemented by subclasses.

    Args:
        event_payload_batch: The payload dicts to publish together.
    """