Skip to content

sql_driver

SqlDriver

Bases: BaseSqlDriver

Source code in griptape/drivers/sql/sql_driver.py
@define
class SqlDriver(BaseSqlDriver):
    engine_url: str = field(kw_only=True)
    create_engine_params: dict = field(factory=dict, kw_only=True)
    _engine: Engine = field(default=None, kw_only=True, alias="engine", metadata={"serializable": False})

    @lazy_property()
    def engine(self) -> Engine:
        return import_optional_dependency("sqlalchemy").create_engine(self.engine_url, **self.create_engine_params)

    def execute_query(self, query: str) -> Optional[list[BaseSqlDriver.RowResult]]:
        rows = self.execute_query_raw(query)

        if rows:
            return [BaseSqlDriver.RowResult(row) for row in rows]
        else:
            return None

    def execute_query_raw(self, query: str) -> Optional[list[dict[str, Optional[Any]]]]:
        sqlalchemy = import_optional_dependency("sqlalchemy")

        with self.engine.connect() as con:
            results = con.execute(sqlalchemy.text(query))

            if results is not None:
                if results.returns_rows:
                    return [dict(result._mapping) for result in results]
                else:
                    con.commit()
                    return None
            else:
                raise ValueError("No result found")

    def get_table_schema(self, table_name: str, schema: Optional[str] = None) -> Optional[str]:
        sqlalchemy = import_optional_dependency("sqlalchemy")
        sqlalchemy_exc = import_optional_dependency("sqlalchemy.exc")

        try:
            table = sqlalchemy.Table(table_name, sqlalchemy.MetaData(), schema=schema, autoload_with=self.engine)
            return str([(c.name, c.type) for c in table.columns])
        except sqlalchemy_exc.NoSuchTableError:
            return None

create_engine_params: dict = field(factory=dict, kw_only=True) class-attribute instance-attribute

engine_url: str = field(kw_only=True) class-attribute instance-attribute

engine()

Source code in griptape/drivers/sql/sql_driver.py
@lazy_property()
def engine(self) -> Engine:
    return import_optional_dependency("sqlalchemy").create_engine(self.engine_url, **self.create_engine_params)

execute_query(query)

Source code in griptape/drivers/sql/sql_driver.py
def execute_query(self, query: str) -> Optional[list[BaseSqlDriver.RowResult]]:
    rows = self.execute_query_raw(query)

    if rows:
        return [BaseSqlDriver.RowResult(row) for row in rows]
    else:
        return None

execute_query_raw(query)

Source code in griptape/drivers/sql/sql_driver.py
def execute_query_raw(self, query: str) -> Optional[list[dict[str, Optional[Any]]]]:
    sqlalchemy = import_optional_dependency("sqlalchemy")

    with self.engine.connect() as con:
        results = con.execute(sqlalchemy.text(query))

        if results is not None:
            if results.returns_rows:
                return [dict(result._mapping) for result in results]
            else:
                con.commit()
                return None
        else:
            raise ValueError("No result found")

get_table_schema(table_name, schema=None)

Source code in griptape/drivers/sql/sql_driver.py
def get_table_schema(self, table_name: str, schema: Optional[str] = None) -> Optional[str]:
    sqlalchemy = import_optional_dependency("sqlalchemy")
    sqlalchemy_exc = import_optional_dependency("sqlalchemy.exc")

    try:
        table = sqlalchemy.Table(table_name, sqlalchemy.MetaData(), schema=schema, autoload_with=self.engine)
        return str([(c.name, c.type) for c in table.columns])
    except sqlalchemy_exc.NoSuchTableError:
        return None