156 lines
5.1 KiB
Python
156 lines
5.1 KiB
Python
import logging
|
|
from typing import Any, List, Optional
|
|
|
|
from llama_index.core import SQLDatabase, Document
|
|
from llama_index.core.objects import SQLTableSchema
|
|
from llama_index.core.readers.base import BaseReader
|
|
from llama_index.readers.database import DatabaseReader
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy import text
|
|
from sqlalchemy.engine import Engine
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class CustomDatabaseReader(BaseReader):
    """Simple database reader.

    Runs a SQL query and concatenates the column names and every returned
    row into a single Document used by LlamaIndex.

    The connection is taken from the first of the following that is provided.

    Args:
        sql_database (Optional[SQLDatabase]): SQL database to use,
            including table names to specify.
            See :ref:`Ref-Struct-Store` for more details.

        OR

        engine (Optional[Engine]): SQLAlchemy Engine object of the database connection.

        OR

        uri (Optional[str]): uri of the database connection.

        OR

        scheme (Optional[str]): scheme of the database connection.
        host (Optional[str]): host of the database connection.
        port (Optional[str]): port of the database connection.
        user (Optional[str]): user of the database connection.
        password (Optional[str]): password of the database connection.
        dbname (Optional[str]): dbname of the database connection.

        *args, **kwargs: forwarded to ``SQLDatabase`` / ``SQLDatabase.from_uri``
            when the reader builds the ``SQLDatabase`` itself; unused when
            ``sql_database`` is passed in.

    Raises:
        ValueError: if none of the alternatives above is (fully) provided.
    """

    def __init__(
        self,
        sql_database: Optional[SQLDatabase] = None,
        engine: Optional[Engine] = None,
        uri: Optional[str] = None,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        dbname: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize with parameters."""
        # Always define the attribute so reading ``reader.uri`` never raises
        # AttributeError; it stays None when no URI was involved.
        self.uri: Optional[str] = None

        if sql_database:
            self.sql_database = sql_database
        elif engine:
            self.sql_database = SQLDatabase(engine, *args, **kwargs)
        elif uri:
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        elif scheme and host and port and user and password and dbname:
            # The credentials are interpolated verbatim, so any character that
            # is special inside a URL must already be percent-encoded.
            uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}"
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        else:
            raise ValueError(
                "You must provide either a SQLDatabase, "
                "a SQL Alchemy Engine, a valid connection URI, or a valid "
                "set of credentials."
            )

    def load_data(self, query: str) -> List[Document]:
        """Query and load data from the Database, returning a list of Documents.

        The document text is one header line with the column names followed
        by one line per row; values on a line are separated by ``", "`` and
        every line ends with a newline.

        Args:
            query (str): SQL statement to execute.

        Returns:
            List[Document]: A list holding the single Document built from the
            whole result set.

        Raises:
            ValueError: if ``query`` is None.
        """
        # Validate before touching the database so a bad call costs no connection.
        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        with self.sql_database.engine.connect() as connection:
            result = connection.execute(text(query))
            header = ", ".join(str(column) for column in result.keys())
            rows = [
                ", ".join(str(value) for value in row)
                for row in result.fetchall()
            ]

        # Header and rows each get their own line; previously the header had
        # no terminator and ran straight into the first row.
        doc_text = "\n".join([header, *rows]) + "\n"

        doc = Document(text=doc_text)
        doc.metadata["name"] = query
        doc.metadata["context"] = query
        # NOTE(review): the result is tagged with a spreadsheet MIME type,
        # presumably so downstream code treats it as tabular data - confirm.
        doc.metadata["file_type"] = "application/vnd.ms-excel"
        return [doc]
|
|
|
|
class DBLoaderConfig(BaseModel):
    """Settings for loading documents from one database."""

    # Connection URI of the database to read from.
    uri: str

    # SQL statements to run against that database.
    queries: List[str]
|
|
|
|
def get_db_documents(configs: list[DBLoaderConfig]) -> List[Document]:
    """Load the documents for every configured database query.

    Args:
        configs: one entry per database, each with a connection URI and the
            queries to run against it.

    Returns:
        The documents produced by all queries, in config and query order.
        An empty list is returned (with a warning) when ``configs`` is empty
        or its first entry has an empty URI.
    """
    docs: List[Document] = []

    # Kept as in the original contract: an empty first URI means
    # "no database configured" and short-circuits the whole load.
    if not configs or configs[0].uri == "":
        logger.warning(
            "Failed to load database, error message: uri is empty. Return as empty document list."
        )
        return docs

    for entry in configs:
        # A later entry with an empty URI is skipped rather than crashing
        # inside create_engine and losing the documents already loaded.
        if not entry.uri:
            logger.warning("Skipping database config with empty uri.")
            continue

        engine = create_engine(entry.uri)
        try:
            loader = CustomDatabaseReader(SQLDatabase(engine))
            for query in entry.queries or []:
                logger.info("Loading data from database with query: %s", query)
                docs.extend(loader.load_data(query=query))
        finally:
            # The engine exists only for this load; release its pooled
            # connections instead of leaking them.
            engine.dispose()

    return docs
|