import logging
from typing import Any, List, Optional

from llama_index.core import SQLDatabase, Document
from llama_index.readers.database import DatabaseReader
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)


class CustomDatabaseReader(DatabaseReader):
    """Database reader that renders a whole query result as one Document.

    The result of a SQL query is flattened into text (explanation line,
    header line, then one comma-separated line per row) and wrapped in a
    single LlamaIndex ``Document``.

    Exactly one of the following ways of describing the connection must be
    supplied; they are tried in this order:

    Args:
        sql_database (Optional[SQLDatabase]): SQL database to use, including
            table names to specify.
            See :ref:`Ref-Struct-Store` for more details.

        OR

        engine (Optional[Engine]): SQLAlchemy Engine object of the database
            connection.

        OR

        uri (Optional[str]): uri of the database connection.

        OR

        scheme (Optional[str]): scheme of the database connection.
        host (Optional[str]): host of the database connection.
        port (Optional[str]): port of the database connection.
        user (Optional[str]): user of the database connection.
        password (Optional[str]): password of the database connection.
        dbname (Optional[str]): dbname of the database connection.

    Raises:
        ValueError: If none of the above describes a connection.
    """

    def __init__(
        self,
        sql_database: Optional[SQLDatabase] = None,
        engine: Optional[Engine] = None,
        uri: Optional[str] = None,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        dbname: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize with parameters.

        Extra positional and keyword arguments are forwarded to
        ``SQLDatabase`` / ``SQLDatabase.from_uri`` when this reader has to
        build the ``SQLDatabase`` itself.
        """
        if sql_database:
            self.sql_database = sql_database
        elif engine:
            self.sql_database = SQLDatabase(engine, *args, **kwargs)
        elif uri:
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        elif scheme and host and port and user and password and dbname:
            # All individual credentials are present: assemble the URI.
            uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}"
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        else:
            raise ValueError(
                "You must provide either a SQLDatabase, "
                "a SQL Alchemy Engine, a valid connection URI, or a valid "
                "set of credentials."
            )

    def load_data(self, query: str, explanation: str) -> List[Document]:
        """Run ``query`` and return its result as a single Document.

        The document text consists of ``explanation``, a comma-separated
        header line with the column names, and one comma-separated line per
        result row, each terminated by a newline.

        Args:
            query (str): SQL statement to execute.
            explanation (str): Explanation for the query, placed on the
                first line of the document text.

        Returns:
            List[Document]: A one-element list holding the document. Its
            metadata carries the query under ``name`` and ``context`` and a
            fixed ``file_type``.

        Raises:
            ValueError: If ``query`` is None.
        """
        # Validate first so that no connection is opened for a call that
        # cannot succeed.
        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        with self.sql_database.engine.connect() as connection:
            result = connection.execute(text(query))
            # Materialize everything while the connection is still open.
            columns = list(result.keys())
            rows = result.fetchall()

        lines = [explanation, ", ".join(f"{column}" for column in columns)]
        lines.extend(", ".join(f"{value}" for value in row) for row in rows)
        doc_text = "\n".join(lines) + "\n"

        doc = Document(text=doc_text)
        doc.metadata["name"] = query
        doc.metadata["context"] = query
        doc.metadata["file_type"] = "application/vnd.ms-excel"
        return [doc]


class DBLoaderConfig(BaseModel):
    """Configuration of one database to load documents from."""

    # Connection URI handed to ``sqlalchemy.create_engine``.
    uri: str
    # One dict per query; ``get_db_documents`` reads the keys "sql" and
    # "explanation" from each.
    queries: List[dict]


def get_db_documents(configs: list[DBLoaderConfig]) -> List[Document]:
    """Load one Document per configured query from every configured database.

    Args:
        configs: Database configurations to process, in order.

    Returns:
        The documents produced by all queries of all configs. An empty list
        is returned (with a warning) when ``configs`` is empty or the first
        config has an empty URI.
    """
    docs: List[Document] = []
    if not configs or configs[0].uri == "":
        logger.warning(
            "Failed to load database, error message: uri is empty. Return as empty document list."
        )
        return docs

    for entry in configs:
        if entry.uri == "":
            # Same policy as for the first config: an empty URI is reported
            # and ignored instead of failing inside create_engine.
            logger.warning("Skipping database config with empty uri.")
            continue

        engine = create_engine(entry.uri)
        try:
            loader = CustomDatabaseReader(SQLDatabase(engine))
            for query_dict in entry.queries:
                query = query_dict.get("sql", "")
                explanation = query_dict.get("explanation", "")
                logger.info("Loading data from database with query: %s", query)
                docs.extend(loader.load_data(query=query, explanation=explanation))
        finally:
            # The engine is private to this function; release its pool once
            # all rows have been copied into documents.
            engine.dispose()
    return docs