Files
zjdataai-app/backend/app/engine/loaders/db.py
T
2024-08-28 09:45:01 +08:00

144 lines
4.8 KiB
Python

import logging
from typing import Any, List, Optional
from llama_index.core import SQLDatabase, Document
from llama_index.readers.database import DatabaseReader
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)
class CustomDatabaseReader(DatabaseReader):
    """Database reader that renders a whole query result as one Document.

    The result set is serialized as comma-separated text: one header line
    with the column names, followed by one line per row.

    Args:
        sql_database (Optional[SQLDatabase]): SQL database to use,
            including table names to specify.
            See :ref:`Ref-Struct-Store` for more details.

        OR

        engine (Optional[Engine]): SQLAlchemy Engine object of the database
            connection.

        OR

        uri (Optional[str]): uri of the database connection.

        OR

        scheme (Optional[str]): scheme of the database connection.
        host (Optional[str]): host of the database connection.
        port (Optional[str]): port of the database connection.
        user (Optional[str]): user of the database connection.
        password (Optional[str]): password of the database connection.
        dbname (Optional[str]): dbname of the database connection.

    Returns:
        DatabaseReader: A DatabaseReader object.

    """

    def __init__(
        self,
        sql_database: Optional[SQLDatabase] = None,
        engine: Optional[Engine] = None,
        uri: Optional[str] = None,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        dbname: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize from the first usable connection description.

        Precedence is ``sql_database``, then ``engine``, then ``uri``, then
        the full set of individual credentials. Extra positional and keyword
        arguments are forwarded to ``SQLDatabase`` when it is built here.

        Raises:
            ValueError: If none of the supported combinations is provided.
        """
        if sql_database:
            self.sql_database = sql_database
        elif engine:
            self.sql_database = SQLDatabase(engine, *args, **kwargs)
        elif uri:
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        elif scheme and host and port and user and password and dbname:
            # NOTE(review): user and password are interpolated verbatim, so
            # any special characters must already be URL-escaped by the caller.
            uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}"
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        else:
            raise ValueError(
                "You must provide either a SQLDatabase, "
                "a SQL Alchemy Engine, a valid connection URI, or a valid "
                "set of credentials."
            )

    def load_data(self, query: str) -> List[Document]:
        """Run ``query`` and return its whole result set as a single Document.

        Args:
            query (str): SQL statement to execute.

        Returns:
            List[Document]: A one-element list. The document text is a header
            line of column names followed by one line per row, values joined
            with ``", "``. The query is stored under the ``name`` and
            ``context`` metadata keys.

        Raises:
            ValueError: If ``query`` is None.
        """
        # Validate before touching the database so a missing query never
        # costs a connection.
        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        with self.sql_database.engine.connect() as connection:
            result = connection.execute(text(query))
            lines = [", ".join(f"{column}" for column in result.keys())]
            lines.extend(
                ", ".join(f"{value}" for value in row)
                for row in result.fetchall()
            )

        # Every line, including the last, is newline-terminated.
        doc = Document(text="\n".join(lines) + "\n")
        doc.metadata["name"] = query
        doc.metadata["context"] = query
        doc.metadata["file_type"] = "application/vnd.ms-excel"
        return [doc]
class DBLoaderConfig(BaseModel):
    """Settings describing one database and the queries to load from it."""

    # Connection URI; handed to SQLAlchemy's create_engine by the loader.
    uri: str
    # One dict per query; the loader reads the "sql" and "explanation" keys.
    queries: List[dict]
def get_db_documents(configs: List[DBLoaderConfig]) -> List[Document]:
    """Load documents for every query of every configured database.

    Each query yields the documents returned by
    ``CustomDatabaseReader.load_data``, tagged with the query's
    ``explanation`` and a fixed ``file_type``.

    Args:
        configs: Database configurations to load from. Entries with an empty
            ``uri`` and queries without an ``"sql"`` value are skipped with a
            warning.

    Returns:
        List[Document]: All loaded documents, in config and query order.
        Empty when ``configs`` is empty.
    """
    docs: List[Document] = []
    if not configs:
        logger.warning(
            "Failed to load database, error message: uri is empty. "
            "Return as empty document list."
        )
        return docs

    metadata = {
        "file_type": "application/booway.document.zj",
    }
    for entry in configs:
        # Check every entry, not only the first: an empty URI anywhere would
        # otherwise reach create_engine.
        if not entry.uri:
            logger.warning("Skipping database config: uri is empty.")
            continue

        engine = create_engine(entry.uri)
        try:
            loader = CustomDatabaseReader(SQLDatabase(engine))
            for query_dict in entry.queries:
                query = query_dict.get("sql", "")
                if not query:
                    # Executing an empty statement would abort the whole load.
                    logger.warning("Skipping query entry without 'sql': %s", query_dict)
                    continue
                explanation = query_dict.get("explanation", "")
                logger.info("Loading data from database with query: %s", query)
                for doc in loader.load_data(query=query):
                    # Add the explanation to the metadata.
                    doc.metadata["explanation"] = explanation
                    # Update or add the extra metadata; this replaces the
                    # "file_type" value set by the reader.
                    doc.metadata.update(metadata)
                    docs.append(doc)
        finally:
            # The documents hold plain text only, so the connection pool can
            # be released as soon as this database has been read.
            engine.dispose()
    return docs