Skip to content

pusher

__all__ = ['PusherEventListenerDriver'] module-attribute

PusherEventListenerDriver

Bases: BaseEventListenerDriver

Source code in griptape/drivers/event_listener/pusher_event_listener_driver.py
@define
class PusherEventListenerDriver(BaseEventListenerDriver):
    app_id: str = field(kw_only=True, metadata={"serializable": True})
    key: str = field(kw_only=True, metadata={"serializable": True})
    secret: str = field(kw_only=True, metadata={"serializable": False})
    cluster: str = field(kw_only=True, metadata={"serializable": True})
    channel: str = field(kw_only=True, metadata={"serializable": True})
    event_name: str = field(kw_only=True, metadata={"serializable": True})
    ssl: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    _client: Optional[Pusher] = field(default=None, kw_only=True, alias="client", metadata={"serializable": False})

    @lazy_property()
    def client(self) -> Pusher:
        return import_optional_dependency("pusher").Pusher(
            app_id=self.app_id,
            key=self.key,
            secret=self.secret,
            cluster=self.cluster,
            ssl=self.ssl,
        )

    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        data = [
            {"channel": self.channel, "name": self.event_name, "data": event_payload}
            for event_payload in event_payload_batch
        ]

        self.client.trigger_batch(data)

    def try_publish_event_payload(self, event_payload: dict) -> None:
        self.client.trigger(channels=self.channel, event_name=self.event_name, data=event_payload)

app_id: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

channel: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

cluster: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

event_name: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

key: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

secret: str = field(kw_only=True, metadata={'serializable': False}) class-attribute instance-attribute

ssl: bool = field(default=True, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

client()

Source code in griptape/drivers/event_listener/pusher_event_listener_driver.py
@lazy_property()
def client(self) -> Pusher:
    return import_optional_dependency("pusher").Pusher(
        app_id=self.app_id,
        key=self.key,
        secret=self.secret,
        cluster=self.cluster,
        ssl=self.ssl,
    )

try_publish_event_payload(event_payload)

Source code in griptape/drivers/event_listener/pusher_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None:
    self.client.trigger(channels=self.channel, event_name=self.event_name, data=event_payload)

try_publish_event_payload_batch(event_payload_batch)

Source code in griptape/drivers/event_listener/pusher_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    data = [
        {"channel": self.channel, "name": self.event_name, "data": event_payload}
        for event_payload in event_payload_batch
    ]

    self.client.trigger_batch(data)