Skip to content

Base event listener driver

logger = Logger(__name__) module-attribute

BaseEventListenerDriver

Bases: ABC

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@define
class BaseEventListenerDriver(ABC):
    futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True)

    def publish_event(self, event: BaseEvent | dict) -> None:
        if isinstance(event, dict):
            self.futures_executor.submit(self._safe_try_publish_event_payload, event)
        else:
            self.futures_executor.submit(self._safe_try_publish_event_payload, event.to_dict())

    @abstractmethod
    def try_publish_event_payload(self, event_payload: dict) -> None:
        ...

    def _safe_try_publish_event_payload(self, event_payload: dict) -> None:
        try:
            self.try_publish_event_payload(event_payload)
        except Exception as e:
            logger.error(e)

futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True) class-attribute instance-attribute

publish_event(event)

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
def publish_event(self, event: BaseEvent | dict) -> None:
    if isinstance(event, dict):
        self.futures_executor.submit(self._safe_try_publish_event_payload, event)
    else:
        self.futures_executor.submit(self._safe_try_publish_event_payload, event.to_dict())

try_publish_event_payload(event_payload) abstractmethod

Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@abstractmethod
def try_publish_event_payload(self, event_payload: dict) -> None:
    ...