Bases: FuturesExecutorMixin, ExponentialBackoffMixin, ABC
Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@define
class BaseEventListenerDriver(FuturesExecutorMixin, ExponentialBackoffMixin, ABC):
    """Base class for drivers that publish event payloads, singly or in batches.

    Events are converted to dict payloads and submitted to a futures executor,
    which runs the subclass-provided ``try_publish_event_payload`` or
    ``try_publish_event_payload_batch`` inside the attempts yielded by
    ``self.retrying()``. Publishing is best-effort: an exception that escapes
    the retry loop is logged as a warning and not re-raised.

    Attributes:
        batched: When True, payloads are buffered and published as batches.
        batch_size: Buffered payload count at which a batch is published.
    """

    batched: bool = field(default=True, kw_only=True)
    batch_size: int = field(default=10, kw_only=True)
    _batch: list[dict] = field(default=Factory(list), kw_only=True)

    @property
    def batch(self) -> list[dict]:
        """Return the payloads buffered so far (the internal list itself, not a copy)."""
        return self._batch

    def publish_event(self, event: BaseEvent | dict) -> None:
        """Publish an event immediately, or buffer it when batching is enabled.

        Args:
            event: An event object (serialized with ``to_dict()``) or an
                already-serialized payload dict.
        """
        event_payload = event if isinstance(event, dict) else event.to_dict()

        if not self.batched:
            with self.create_futures_executor() as futures_executor:
                futures_executor.submit(with_contextvars(self._safe_publish_event_payload), event_payload)
            return

        # Buffering alone needs no executor; one is created only when a full
        # batch actually has to be sent (inside flush_events).
        self._batch.append(event_payload)
        if len(self.batch) >= self.batch_size:
            self.flush_events()

    def flush_events(self) -> None:
        """Publish all buffered payloads as one batch and start an empty buffer.

        Does nothing when the buffer is empty.
        """
        if not self.batch:
            return

        with self.create_futures_executor() as futures_executor:
            futures_executor.submit(with_contextvars(self._safe_publish_event_payload_batch), self.batch)
            # Rebind rather than clear: the submitted task still references the old list.
            self._batch = []

    @abstractmethod
    def try_publish_event_payload(self, event_payload: dict) -> None:
        """Publish one event payload; concrete drivers supply the implementation.

        Args:
            event_payload: The payload dict to publish.
        """

    @abstractmethod
    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """Publish a batch of event payloads; concrete drivers supply the implementation.

        Args:
            event_payload_batch: The payload dicts to publish together.
        """

    def _safe_publish_event_payload(self, event_payload: dict) -> None:
        """Publish one payload through the retry loop, logging instead of raising on failure."""
        try:
            for attempt in self.retrying():
                with attempt:
                    self.try_publish_event_payload(event_payload)
        except Exception:
            # Best-effort delivery: a failed publish must not break the caller.
            logger.warning("Failed to publish event after %s attempts", self.max_attempts, exc_info=True)

    def _safe_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """Publish a batch through the retry loop, logging instead of raising on failure."""
        try:
            for attempt in self.retrying():
                with attempt:
                    self.try_publish_event_payload_batch(event_payload_batch)
        except Exception:
            # Best-effort delivery: a failed publish must not break the caller.
            logger.warning("Failed to publish event batch after %s attempts", self.max_attempts, exc_info=True)
|
batch: list[dict]
property
batch_size: int = field(default=10, kw_only=True)
class-attribute
instance-attribute
batched: bool = field(default=True, kw_only=True)
class-attribute
instance-attribute
flush_events()
Source code in griptape/drivers/event_listener/base_event_listener_driver.py
def flush_events(self) -> None:
    """Publish every buffered payload as a single batch, then reset the buffer.

    Returns immediately when nothing is buffered.
    """
    pending = self.batch
    if not pending:
        return

    with self.create_futures_executor() as executor:
        publish_batch = with_contextvars(self._safe_publish_event_payload_batch)
        executor.submit(publish_batch, pending)
        # A new list is bound so the submitted task keeps the old one to itself.
        self._batch = []
|
publish_event(event)
Source code in griptape/drivers/event_listener/base_event_listener_driver.py
def publish_event(self, event: BaseEvent | dict) -> None:
    """Publish an event immediately, or buffer it when batching is enabled.

    Args:
        event: An event object (serialized with ``to_dict()``) or an
            already-serialized payload dict.
    """
    event_payload = event if isinstance(event, dict) else event.to_dict()

    if not self.batched:
        with self.create_futures_executor() as futures_executor:
            futures_executor.submit(with_contextvars(self._safe_publish_event_payload), event_payload)
        return

    # Buffering alone needs no executor; one is created only when a full
    # batch actually has to be sent (inside flush_events).
    self._batch.append(event_payload)
    if len(self.batch) >= self.batch_size:
        self.flush_events()
|
try_publish_event_payload(event_payload)
abstractmethod
Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@abstractmethod
def try_publish_event_payload(self, event_payload: dict) -> None:
    """Publish one event payload; concrete drivers supply the implementation.

    Args:
        event_payload: The payload dict to publish.
    """
|
try_publish_event_payload_batch(event_payload_batch)
abstractmethod
Source code in griptape/drivers/event_listener/base_event_listener_driver.py
@abstractmethod
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    """Publish a batch of event payloads; concrete drivers supply the implementation.

    Args:
        event_payload_batch: The payload dicts to publish together.
    """
|