Skip to content

Webhook event listener driver

WebhookEventListenerDriver

Bases: BaseEventListenerDriver

Source code in griptape/drivers/event_listener/webhook_event_listener_driver.py
@define
class WebhookEventListenerDriver(BaseEventListenerDriver):
    """Event listener driver that delivers event payloads to a webhook via HTTP POST.

    Both publish methods send their argument as the JSON body of a
    ``requests.post`` call and then call ``raise_for_status()`` on the
    response, so an error status surfaces as an exception to the caller.
    """

    # URL that every payload is POSTed to (keyword-only, required).
    webhook_url: str = field(kw_only=True)
    # Forwarded as ``headers=`` to ``requests.post``; defaults to ``None``.
    headers: dict = field(default=None, kw_only=True)

    def try_publish_event_payload(self, event_payload: dict) -> None:
        """POST a single event payload to ``webhook_url`` as a JSON body.

        Args:
            event_payload: Passed to ``requests.post`` as ``json``.

        Raises:
            Whatever ``raise_for_status()`` raises for the response
            (presumably ``requests.HTTPError`` on 4xx/5xx statuses).
        """
        # NOTE(review): no ``timeout`` is passed to ``requests.post`` here.
        requests.post(
            url=self.webhook_url,
            json=event_payload,
            headers=self.headers,
        ).raise_for_status()

    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """POST a batch of event payloads to ``webhook_url`` in one request.

        The whole list is sent as a single JSON body.

        Args:
            event_payload_batch: Passed to ``requests.post`` as ``json``.

        Raises:
            Whatever ``raise_for_status()`` raises for the response
            (presumably ``requests.HTTPError`` on 4xx/5xx statuses).
        """
        # NOTE(review): no ``timeout`` is passed to ``requests.post`` here.
        requests.post(
            url=self.webhook_url,
            json=event_payload_batch,
            headers=self.headers,
        ).raise_for_status()

headers: dict = field(default=None, kw_only=True) class-attribute instance-attribute

webhook_url: str = field(kw_only=True) class-attribute instance-attribute

try_publish_event_payload(event_payload)

Source code in griptape/drivers/event_listener/webhook_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None:
    """POST a single event payload to ``self.webhook_url`` as a JSON body.

    Args:
        event_payload: Passed to ``requests.post`` as ``json``.

    Raises:
        Whatever ``raise_for_status()`` raises for the response
        (presumably ``requests.HTTPError`` on 4xx/5xx statuses).
    """
    # NOTE(review): no ``timeout`` is passed to ``requests.post`` here.
    requests.post(
        url=self.webhook_url,
        json=event_payload,
        headers=self.headers,
    ).raise_for_status()

try_publish_event_payload_batch(event_payload_batch)

Source code in griptape/drivers/event_listener/webhook_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    """POST a batch of event payloads to ``self.webhook_url`` in one request.

    The whole list is sent as a single JSON body.

    Args:
        event_payload_batch: Passed to ``requests.post`` as ``json``.

    Raises:
        Whatever ``raise_for_status()`` raises for the response
        (presumably ``requests.HTTPError`` on 4xx/5xx statuses).
    """
    # NOTE(review): no ``timeout`` is passed to ``requests.post`` here.
    requests.post(
        url=self.webhook_url,
        json=event_payload_batch,
        headers=self.headers,
    ).raise_for_status()