Bases: BaseEventListenerDriver
Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
| @define
class AmazonSqsEventListenerDriver(BaseEventListenerDriver):
queue_url: str = field(kw_only=True)
session: boto3.Session = field(default=Factory(lambda: import_optional_dependency("boto3").Session()), kw_only=True)
_client: SQSClient = field(default=None, kw_only=True, alias="client", metadata={"serializable": False})
@lazy_property()
def client(self) -> SQSClient:
return self.session.client("sqs")
def try_publish_event_payload(self, event_payload: dict) -> None:
self.client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload))
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
entries = [
{"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)}
for event_payload in event_payload_batch
]
self.client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)
|
queue_url: str = field(kw_only=True)
class-attribute
instance-attribute
session: boto3.Session = field(default=Factory(lambda: import_optional_dependency('boto3').Session()), kw_only=True)
class-attribute
instance-attribute
client()
Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
| @lazy_property()
def client(self) -> SQSClient:
return self.session.client("sqs")
|
try_publish_event_payload(event_payload)
Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
| def try_publish_event_payload(self, event_payload: dict) -> None:
self.client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload))
|
try_publish_event_payload_batch(event_payload_batch)
Source code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
| def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
entries = [
{"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)}
for event_payload in event_payload_batch
]
self.client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)
|