diff --git a/backend/app/engine/loaders/__init__.py b/backend/app/engine/loaders/__init__.py index 241f68a..4f585b4 100644 --- a/backend/app/engine/loaders/__init__.py +++ b/backend/app/engine/loaders/__init__.py @@ -1,40 +1,41 @@ import logging -# import yaml +import yaml +from app.engine.loaders.db import DBLoaderConfig, get_db_documents from app.engine.loaders.file import FileLoaderConfig, get_file_documents from app.engine.loaders.web import WebLoaderConfig, get_web_documents logger = logging.getLogger(__name__) -# 注释掉 load_configs 函数 -# def load_configs(): -# with open("config/loaders.yaml") as f: -# configs = yaml.safe_load(f) -# return configs + +def load_configs(): + with open("config/loaders.yaml") as f: + configs = yaml.safe_load(f) + return configs def get_documents(): documents = [] - # 注释掉对 load_configs 的调用 - # config = load_configs() - # if config is None or len(config.items()) == 0: - # return documents + config = load_configs() - # 使用一个空的 config 替代原有的配置加载逻辑 - config = {} + if config is None or len(config.items()) == 0: + return documents for loader_type, loader_config in config.items(): - logger.info( - f"Loading documents from loader: {loader_type}, config: {loader_config}" - ) + if loader_config.get('enable', True): # 检查 enable 字段 + logger.info( + f"Loading documents from loader: {loader_type}, config: {loader_config}" + ) - loader_config = loader_config or [] - match loader_type: - case "file": - document = get_file_documents(FileLoaderConfig(**loader_config)) - case "web": - document = get_web_documents(WebLoaderConfig(**loader_config)) - case _: - raise ValueError(f"Invalid loader type: {loader_type}") - documents.extend(document) + loader_config = loader_config or [] + match loader_type: + case "file": + document = get_file_documents(FileLoaderConfig(**loader_config)) + case "web": + document = get_web_documents(WebLoaderConfig(**loader_config)) + case "db": + document = get_db_documents(configs=[DBLoaderConfig(**cfg) for cfg in loader_config]) + case _: + raise ValueError(f"Invalid loader type: {loader_type}") + documents.extend(document) - return documents + return documents \ No newline at end of file diff --git a/backend/app/engine/loaders/db.py b/backend/app/engine/loaders/db.py index 0289fb5..00c0381 100644 --- a/backend/app/engine/loaders/db.py +++ b/backend/app/engine/loaders/db.py @@ -1,12 +1,15 @@ import logging from typing import Any, List, Optional -from llama_index.core import Document +from llama_index.core import SQLDatabase, Document +from llama_index.readers.database import DatabaseReader from pydantic import BaseModel +from sqlalchemy import create_engine, text +from sqlalchemy.engine import Engine logger = logging.getLogger(__name__) -class CustomDatabaseReader: +class CustomDatabaseReader(DatabaseReader): """Simple Database reader. Concatenates each row into Document used by LlamaIndex. @@ -39,8 +42,8 @@ class CustomDatabaseReader: def __init__( self, - sql_database: Optional[Any] = None, - engine: Optional[Any] = None, + sql_database: Optional[SQLDatabase] = None, + engine: Optional[Engine] = None, uri: Optional[str] = None, scheme: Optional[str] = None, host: Optional[str] = None, @@ -52,24 +55,51 @@ class CustomDatabaseReader: **kwargs: Any, ) -> None: """Initialize with parameters.""" - # Setting the database-related properties to None - self.sql_database = None - self.uri = None + if sql_database: + self.sql_database = sql_database + elif engine: + self.sql_database = SQLDatabase(engine, *args, **kwargs) + elif uri: + self.uri = uri + self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs) + elif scheme and host and port and user and password and dbname: + uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}" + self.uri = uri + self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs) + else: + raise ValueError( + "You must provide either a SQLDatabase, " + "a SQL Alchemy Engine, a valid connection URI, or a valid " + "set of credentials." + ) - def load_data(self, query: str, explanation: str) -> List[Document]: - """Simulate loading data without a database connection. + def load_data(self, query: str) -> List[Document]: + """Query and load data from the Database, returning a list of Documents. Args: - query (str): Query parameter (not used). - explanation (str): Explanation to be included in the document. + query (str): Query parameter to filter tables and rows. Returns: List[Document]: A list of Document objects. """ - dco_str = explanation + "\n" - # Simulate data without querying a real database - dco_str += "Simulated column1, Simulated column2\n" - dco_str += "Simulated data1, Simulated data2\n" + dco_str = "" + + with self.sql_database.engine.connect() as connection: + if query is None: + raise ValueError("A query parameter is necessary to filter the data") + else: + result = connection.execute(text(query)) + + dco_str += ", ".join( + [f"{entry}" for entry in result.keys()] + ) + "\n" + + for item in result.fetchall(): + # Fetch each item + record_str = ", ".join( + [f"{entry}" for col, entry in zip(result.keys(), item)] + ) + dco_str += record_str + "\n" doc = Document(text=dco_str) doc.metadata["name"] = query @@ -81,10 +111,10 @@ class DBLoaderConfig(BaseModel): uri: str queries: List[dict] -def get_db_documents(configs: list[DBLoaderConfig]): +def get_db_documents(configs: List[DBLoaderConfig]) -> List[Document]: docs = [] - if len(configs) == 0 or configs[0].uri == "": + if not configs or not configs[0].uri: logger.warning( f"Failed to load database, error message: uri is empty. Return as empty document list." ) @@ -95,13 +125,20 @@ def get_db_documents(configs: list[DBLoaderConfig]): } for entry in configs: - # Skipping the database connection part - loader = CustomDatabaseReader() + engine = create_engine(entry.uri) + sql_database = SQLDatabase(engine) + + loader = CustomDatabaseReader(sql_database) for query_dict in entry.queries: query = query_dict.get("sql", "") explanation = query_dict.get("explanation", "") logger.info(f"Loading data from database with query: {query}") - documents = loader.load_data(query=query, explanation=explanation) + documents = loader.load_data(query=query) - docs.extend(documents) - return docs + # 添加解释到元数据中 + for doc in documents: + doc.metadata["explanation"] = explanation + doc.metadata.update(metadata) # 更新或添加额外的元数据 + docs.append(doc) + + return docs \ No newline at end of file diff --git a/backend/config/loaders.yaml b/backend/config/loaders.yaml index c69c13e..9844715 100644 --- a/backend/config/loaders.yaml +++ b/backend/config/loaders.yaml @@ -1,4 +1,5 @@ file: + enable: true # 添加 enable 字段 # use_llama_parse: Use LlamaParse if `true`. Needs a `LLAMA_CLOUD_API_KEY` from https://cloud.llamaindex.ai set as environment variable use_llama_parse: false @@ -7,27 +8,20 @@ db: # uri: The URI for the database. E.g.: mysql+pymysql://user:password@localhost:3306/db or postgresql+psycopg2://user:password@localhost:5432/db # query: The query to fetch data from the database. E.g.: SELECT * FROM table - uri: mysql+pymysql://zjinfo1:Dy2Bcr53Hm5xRkba@110.42.234.166:3306/zjinfo1 - #- uri: mysql+pymysql://zjinfo:Y6EAjEEdSYmskA8B@110.42.234.166:3306/zjinfo -# - uri: mysql+pymysql://zjinfo2:GSKcziSdBixDXwcd@110.42.234.166:3306/zjinfo2 + enable: true # 添加 enable 字段 queries: - - sql: select * from ProjectProperties limit 30; + - sql: select * from ProjectProperties; explanation: "工程属性表数据,层级关系包含在博微电力造价工程文件格式_ProjectProperties.json文件中。" - - sql: select Id, ParentId, Level, Name, Code, Amount, Amount_Total from TotalCalculateTable; explanation: "总算表数据,层级关系包含在博微电力造价工程文件格式_TotalCalculateTable.json文件中。" - - - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where Level = 3 and ProfessionalType = '线路' limit 50; + - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where ProfessionalType = '线路'; explanation: "专业类型为线路的项目划分表数据,层级关系包含在博微电力造价工程文件格式_ProjectDivision.json文件中。" - - - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where Level = 3 and ProfessionalType = '余物清理' limit 50; + - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where ProfessionalType = '余物清理'; explanation: "专业类型为余物清理的项目划分表数据,层级关系包含在博微电力造价工程文件格式_ProjectDivision.json文件中。" - - - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where Level = 3 and ProfessionalType = '拆除线路' limit 50; + - sql: select Id, ParentId, Level, SerialNumber, Name, Quantity, Rate, Sum_Price from ProjectDivision where ProfessionalType = '拆除线路'; explanation: "专业类型为拆除线路的项目划分表数据,层级关系包含在博微电力造价工程文件格式_ProjectDivision.json文件中。" - - sql: select Id, ParentId, Level, Name, Code, Rate, Amount from OtherFee; explanation: "其他费用表数据,层级关系包含在博微电力造价工程文件格式_OtherFee.json文件中" - #web: # driver_arguments: # # The arguments to pass to the webdriver. E.g.: add --headless to run in headless mode