Skip to content

Amazon sqs event listener driver

AmazonSqsEventListenerDriver

Bases: BaseEventListenerDriver

Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
@define
class AmazonSqsEventListenerDriver(BaseEventListenerDriver):
    queue_url: str = field(kw_only=True)
    session: boto3.Session = field(default=Factory(lambda: import_optional_dependency("boto3").Session()), kw_only=True)
    sqs_client: Any = field(default=Factory(lambda self: self.session.client("sqs"), takes_self=True))

    def try_publish_event_payload(self, event_payload: dict) -> None:
        self.sqs_client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload))

    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        entries = [
            {"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)}
            for event_payload in event_payload_batch
        ]

        self.sqs_client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)

queue_url: str = field(kw_only=True) class-attribute instance-attribute

session: boto3.Session = field(default=Factory(lambda: import_optional_dependency('boto3').Session()), kw_only=True) class-attribute instance-attribute

sqs_client: Any = field(default=Factory(lambda self: self.session.client('sqs'), takes_self=True)) class-attribute instance-attribute

try_publish_event_payload(event_payload)

Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None:
    self.sqs_client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload))

try_publish_event_payload_batch(event_payload_batch)

Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    entries = [
        {"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)}
        for event_payload in event_payload_batch
    ]

    self.sqs_client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)