"""HTTP API for querying the 清单 (bill-of-quantities) / 定额 (quota) SQLite library.

The module exposes a FastAPI ``app`` with four single-item lookup endpoints
(by name / by code, for each of the two libraries) and one batch endpoint.
Name lookups are widened with a vector similarity search before the SQL
``LIKE`` query is run.
"""
# FastAPI-related imports
import asyncio
import logging
import os
import sqlite3
import sys
from typing import Any, Dict, List, Optional

import aiosqlite
import uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field

# Make the working directory importable before loading project modules.
sys.path.append(os.getcwd())
# ExcelToSQLiteProcessor provides the table-name and field-name mappings.
from rag2_0.demo.create_qingdan_dinge_database import ExcelToSQLiteProcessor
# Vector-search related classes
from rag2_0.tool.ModelTool import XinferenceEmbeddings
from langchain_community.vectorstores import SQLiteVSS
from rag2_0.tool.APIKeyManager import APIKeyManager

logger = logging.getLogger(__name__)

# Create the FastAPI application
app = FastAPI(title="清单定额库查询API", description="提供清单和定额信息查询接口")
# Maximum number of rows returned by any single query or by the batch query.
TOP_K = 100


# Response model for the single-item endpoints
class QueryResponse(BaseModel):
    success: bool
    message: str
    data: Optional[List[Dict[str, Any]]] = None


# Request models for the batch endpoint
class DingEInfoList(BaseModel):
    dinge_code_list: List[str] = []
    dinge_name_list: List[str] = []


class QingDanInfo(BaseModel):
    qingdan_code_list: List[str] = []
    qingdan_name_list: List[str] = []


class DingEQingDanInfo(BaseModel):
    dinge_info_list: DingEInfoList
    qingdan_info: QingDanInfo


class BatchQueryRequest(BaseModel):
    dinge_qingdan_info: DingEQingDanInfo
    # Body-model fields are declared with pydantic ``Field``; ``Query`` is
    # meant for path-operation function parameters.
    scope: Optional[str] = Field(None, description="适用范围")


# Response model for the batch endpoint
class BatchQueryResponse(BaseModel):
    success: bool
    message: str
    dinge_data: Optional[List[Dict[str, Any]]] = None
    qingdan_data: Optional[List[Dict[str, Any]]] = None


# Service object wrapping all database / vector-store access
class QingDanDingEQueryService:
    """Query helper for the 定额 and 清单 tables plus their vector indexes."""

    def __init__(self):
        self.db_path = f"{os.getcwd()}/data/db/qingdan_ding_e_ku.db"
        self.top_k = TOP_K

        # Embedding function shared by both vector stores.
        self.embedding_function = XinferenceEmbeddings()

        # Vector stores live in the same SQLite file as the relational data.
        # NOTE(review): with connection=None SQLiteVSS opens its own sqlite3
        # connection here, while searches run via asyncio.to_thread (a worker
        # thread) - confirm that connection permits cross-thread use.
        self.ding_e_vector_db = SQLiteVSS(
            table="embeding_ding_e_zimu_name",
            connection=None,
            embedding=self.embedding_function,
            db_file=self.db_path
        )

        self.qing_dan_vector_db = SQLiteVSS(
            table="embeding_qd_zimu_name",
            connection=None,
            embedding=self.embedding_function,
            db_file=self.db_path
        )

    async def get_similar_names_by_vector(self, query_text: str, vector_db: SQLiteVSS,
                                          field_map: dict, top_k: int = 3,
                                          scope: str = None, fetch_k: int = 30):
        """Return metadata of the entries most similar to ``query_text``.

        Args:
            query_text: Text to search for.
            vector_db: Vector store to search.
            field_map: Chinese-label -> column-name mapping; only the
                "适用范围" entry is read, and only when ``scope`` is given.
            top_k: Maximum number of items returned.
            scope: Optional substring that an item's scope metadata must
                contain for the item to be kept.
            fetch_k: Number of candidates requested from the vector store
                before scope filtering (previously hard-coded to 30).

        Returns:
            A list of metadata dicts (copies), each with an added
            ``similarity_score`` float, ordered by ascending score.
            An empty list is returned if the search fails.
        """
        try:
            # The vector search is synchronous; run it in a worker thread so
            # the event loop is not blocked.
            results = await asyncio.to_thread(
                vector_db.similarity_search_with_score, query_text, fetch_k
            )

            similar_items = []
            for doc, score in results:
                if scope:
                    # Skip only this item when its scope is missing or does
                    # not match, instead of failing the whole search.
                    item_scope = doc.metadata.get(field_map["适用范围"])
                    if item_scope is None or scope not in str(item_scope):
                        continue
                # Copy so the document held by the vector store is untouched.
                metadata = dict(doc.metadata)
                metadata['similarity_score'] = float(score)
                similar_items.append(metadata)

            # Lower score sorts first (the original code treats the score as
            # a distance).
            similar_items.sort(key=lambda x: x['similarity_score'])
            return similar_items[:top_k]
        except Exception as e:
            # Best effort: name queries still work with the plain LIKE match.
            logger.exception("向量检索出错: %s", e)
            return []

    async def get_db_connection(self):
        """Open and return an aiosqlite connection with name-addressable rows.

        The returned connection is already started; the caller must
        ``await conn.close()`` it and must not enter it with ``async with``
        (that would await the connection a second time).
        """
        conn = await aiosqlite.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows become accessible by column name
        return conn

    def create_reverse_field_map(self):
        """Build column-name -> Chinese-label maps.

        Returns:
            ``(ding_e_reverse_map, qing_dan_reverse_map)``.
        """
        ding_e_reverse_map = {
            column: label
            for label, column in ExcelToSQLiteProcessor.ding_e_field_map.items()
        }
        qing_dan_reverse_map = {
            column: label
            for label, column in ExcelToSQLiteProcessor.qing_dan_field_map.items()
        }
        return ding_e_reverse_map, qing_dan_reverse_map

    def convert_field_names_to_chinese(self, data_list, reverse_map):
        """Rename the keys of every row using ``reverse_map``.

        Keys absent from ``reverse_map`` are kept unchanged.
        """
        return [
            {reverse_map.get(field_name, field_name): value
             for field_name, value in item.items()}
            for item in data_list
        ]

    def sort_results_by_exact_match(self, data_list, search_term, field_name):
        """Move rows whose ``field_name`` equals ``search_term`` to the front.

        The comparison is case-insensitive; relative order inside the exact
        and the non-exact group is preserved.
        """
        wanted = search_term.upper()
        exact_matches = []
        partial_matches = []
        for item in data_list:
            if wanted == str(item[field_name]).upper():
                exact_matches.append(item)
            else:
                partial_matches.append(item)
        return exact_matches + partial_matches

    # ------------------------------------------------------------------
    # Private helpers shared by the four query_* methods
    # ------------------------------------------------------------------
    def _name_like_clause(self, field_map, name, similar_names):
        """Build the OR-ed ``LIKE`` clause and parameters for a name search."""
        # The original name is always matched; each vector hit adds one more
        # alternative.
        terms = [name, *similar_names]
        condition = f"zimu.{field_map['名称']} LIKE ?"
        where_clause = "(" + " OR ".join([condition] * len(terms)) + ")"
        params = [f'%{term}%' for term in terms]
        return where_clause, params

    async def _fetch_detail_rows(self, zimu_table, mulu_table, attr_table,
                                 field_map, where_clause, params, scope):
        """Run the item/catalog/attribute join and return rows as dicts."""
        query = f"""
            SELECT zimu.*,
                   mulu.{field_map['名称']} as mulu_name,
                   attr.{field_map['发布时间']} as attr_pub_time,
                   attr.{field_map['适用范围']} as attr_scope
            FROM {zimu_table} zimu
            LEFT JOIN {mulu_table} mulu
                ON zimu.{field_map['章节码']} = mulu.{field_map['章节码']}
                AND zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
            LEFT JOIN {attr_table} attr
                ON zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
            WHERE {where_clause}
        """
        params = list(params)
        # Optional filter on the scope column of the attribute table.
        if scope:
            query += f" AND attr.{field_map['适用范围']} LIKE ?"
            params.append(f'%{scope}%')

        # get_db_connection() returns a started connection, so close it
        # explicitly rather than re-entering it with ``async with``.
        conn = await self.get_db_connection()
        try:
            async with conn.execute(query, params) as cursor:
                rows = await cursor.fetchall()
        finally:
            await conn.close()
        return [dict(row) for row in rows]

    def _build_response(self, rows, search_term, sort_field, reverse_map_index,
                        not_found_message):
        """Sort, truncate and relabel rows into the public response dict."""
        data = self.sort_results_by_exact_match(rows, search_term, sort_field)
        data = data[:self.top_k]

        if not data:
            return {"success": True, "message": not_found_message, "data": []}

        # Index 0 is the 定额 map, index 1 the 清单 map.
        reverse_map = self.create_reverse_field_map()[reverse_map_index]
        # Labels for the aliased join columns.
        reverse_map['mulu_name'] = '目录名称'
        reverse_map['attr_pub_time'] = '发布时间'
        reverse_map['attr_scope'] = '适用范围'

        chinese_data = self.convert_field_names_to_chinese(data, reverse_map)
        return {"success": True, "message": "查询成功", "data": chinese_data}

    # ------------------------------------------------------------------
    # Public query methods
    # ------------------------------------------------------------------
    async def query_ding_e_by_name(self, name, scope=None):
        """Look up 定额 items by name, widened with vector-similar names.

        Returns a dict with ``success``, ``message`` and (on success)
        ``data``; errors are reported in the dict instead of being raised.
        """
        try:
            tables = ExcelToSQLiteProcessor.ding_e_table_names
            zimu_table = tables["定额子目"]
            mulu_table = tables["定额目录"]
            attr_table = tables["定额资源库属性"]
            field_map = ExcelToSQLiteProcessor.ding_e_field_map

            # 1. Vector search for similar names.
            similar_items = await self.get_similar_names_by_vector(
                query_text=name, vector_db=self.ding_e_vector_db,
                field_map=field_map, scope=scope
            )
            similar_names = [item[field_map['名称']] for item in similar_items]

            # 2. LIKE query over the original name plus the similar names.
            where_clause, params = self._name_like_clause(field_map, name, similar_names)
            rows = await self._fetch_detail_rows(
                zimu_table, mulu_table, attr_table, field_map,
                where_clause, params, scope
            )
            return self._build_response(rows, name, field_map['名称'], 0,
                                        "未找到匹配的定额信息")
        except Exception as e:
            logger.exception("query_ding_e_by_name failed")
            return {"success": False, "message": f"查询出错: {str(e)}"}

    async def query_ding_e_by_code(self, code, scope=None):
        """Look up 定额 items whose code contains ``code`` (upper-cased).

        Returns a dict with ``success``, ``message`` and (on success)
        ``data``; errors are reported in the dict instead of being raised.
        """
        try:
            code = code.upper()
            tables = ExcelToSQLiteProcessor.ding_e_table_names
            zimu_table = tables["定额子目"]
            mulu_table = tables["定额目录"]
            attr_table = tables["定额资源库属性"]
            field_map = ExcelToSQLiteProcessor.ding_e_field_map

            rows = await self._fetch_detail_rows(
                zimu_table, mulu_table, attr_table, field_map,
                f"zimu.{field_map['编码']} LIKE ?", [f'%{code}%'], scope
            )
            return self._build_response(rows, code, field_map['编码'], 0,
                                        "未找到匹配的定额信息")
        except Exception as e:
            logger.exception("query_ding_e_by_code failed")
            return {"success": False, "message": f"查询出错: {str(e)}"}

    async def query_qing_dan_by_name(self, name, scope=None):
        """Look up 清单 items by name, widened with vector-similar names.

        Returns a dict with ``success``, ``message`` and (on success)
        ``data``; errors are reported in the dict instead of being raised.
        """
        try:
            tables = ExcelToSQLiteProcessor.qing_dan_table_names
            zimu_table = tables["清单子目"]
            mulu_table = tables["清单目录"]
            attr_table = tables["资源库属性"]
            field_map = ExcelToSQLiteProcessor.qing_dan_field_map

            # 1. Vector search for similar names.
            similar_items = await self.get_similar_names_by_vector(
                query_text=name, vector_db=self.qing_dan_vector_db,
                field_map=field_map, scope=scope
            )
            # NOTE(review): the metadata key is hard-coded as 'mc' here while
            # the 定额 variant uses field_map['名称'] - confirm they agree.
            similar_names = [item['mc'] for item in similar_items]

            # 2. LIKE query over the original name plus the similar names.
            where_clause, params = self._name_like_clause(field_map, name, similar_names)
            rows = await self._fetch_detail_rows(
                zimu_table, mulu_table, attr_table, field_map,
                where_clause, params, scope
            )
            return self._build_response(rows, name, field_map['名称'], 1,
                                        "未找到匹配的清单信息")
        except Exception as e:
            logger.exception("query_qing_dan_by_name failed")
            return {"success": False, "message": f"查询出错: {str(e)}"}

    async def query_qing_dan_by_code(self, code, scope=None):
        """Look up 清单 items whose code contains ``code`` (upper-cased).

        Returns a dict with ``success``, ``message`` and (on success)
        ``data``; errors are reported in the dict instead of being raised.
        """
        try:
            code = code.upper()
            tables = ExcelToSQLiteProcessor.qing_dan_table_names
            zimu_table = tables["清单子目"]
            mulu_table = tables["清单目录"]
            attr_table = tables["资源库属性"]
            field_map = ExcelToSQLiteProcessor.qing_dan_field_map

            rows = await self._fetch_detail_rows(
                zimu_table, mulu_table, attr_table, field_map,
                f"zimu.{field_map['编码']} LIKE ?", [f'%{code}%'], scope
            )
            return self._build_response(rows, code, field_map['编码'], 1,
                                        "未找到匹配的清单信息")
        except Exception as e:
            logger.exception("query_qing_dan_by_code failed")
            return {"success": False, "message": f"查询出错: {str(e)}"}

    async def batch_query(self, requests: BatchQueryRequest):
        """Run all requested code/name lookups concurrently and merge them.

        Duplicate (kind, term, scope) requests are issued only once.
        Sub-queries that fail or return nothing contribute no rows. Each of
        the two result lists is truncated to ``self.top_k`` rows.

        Returns:
            A dict with ``success``, ``message``, ``dinge_data`` and
            ``qingdan_data``; errors are reported in the dict.
        """
        dinge_results = []
        qingdan_results = []
        seen = set()  # (kind, term, scope) tuples already scheduled

        try:
            dinge_info = requests.dinge_qingdan_info.dinge_info_list
            qingdan_info = requests.dinge_qingdan_info.qingdan_info
            scope = requests.scope

            dinge_tasks = []
            qingdan_tasks = []

            # (kind label, terms, query coroutine function, target task list)
            plan = [
                ("dinge_code", dinge_info.dinge_code_list, self.query_ding_e_by_code, dinge_tasks),
                ("dinge_name", dinge_info.dinge_name_list, self.query_ding_e_by_name, dinge_tasks),
                ("qingdan_code", qingdan_info.qingdan_code_list, self.query_qing_dan_by_code, qingdan_tasks),
                ("qingdan_name", qingdan_info.qingdan_name_list, self.query_qing_dan_by_name, qingdan_tasks),
            ]
            for kind, terms, query_fn, tasks in plan:
                for term in terms or []:
                    key = (kind, term, scope)
                    if key in seen:
                        continue
                    seen.add(key)
                    tasks.append(query_fn(term, scope))

            # One gather for both families so they overlap; results come back
            # in submission order, so they can be split by count.
            outcomes = await asyncio.gather(*dinge_tasks, *qingdan_tasks)
            dinge_outs = outcomes[:len(dinge_tasks)]
            qingdan_outs = outcomes[len(dinge_tasks):]

            for result in dinge_outs:
                if result and result.get("success") and result.get("data"):
                    dinge_results.extend(result["data"])
            for result in qingdan_outs:
                if result and result.get("success") and result.get("data"):
                    qingdan_results.extend(result["data"])

            # Cap the size of each result list.
            dinge_results = dinge_results[:self.top_k]
            qingdan_results = qingdan_results[:self.top_k]

            if not dinge_results and not qingdan_results:
                return {
                    "success": True,
                    "message": "未找到匹配信息",
                    "dinge_data": [],
                    "qingdan_data": []
                }

            return {
                "success": True,
                "message": "查询成功",
                "dinge_data": dinge_results,
                "qingdan_data": qingdan_results
            }
        except Exception as e:
            logger.exception("batch_query failed")
            return {"success": False, "message": f"批量查询出错: {str(e)}", "dinge_data": [], "qingdan_data": []}


# Module-level service instance used by all endpoints
query_service = QingDanDingEQueryService()


def _raise_on_failure(result):
    """Translate a ``success: False`` service result into an HTTP 500."""
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result["message"])
    return result


# 1. Query 定额 item details by name (with library attributes and catalog info)
@app.get("/api/ding_e/by_name", response_model=QueryResponse)
async def query_ding_e_by_name(
    name: str = Query(..., description="定额名称"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = await query_service.query_ding_e_by_name(name, scope)
    return QueryResponse(**_raise_on_failure(result))


# 2. Query 定额 item details by code (with library attributes and catalog info)
@app.get("/api/ding_e/by_code", response_model=QueryResponse)
async def query_ding_e_by_code(
    code: str = Query(..., description="定额编码"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = await query_service.query_ding_e_by_code(code, scope)
    return QueryResponse(**_raise_on_failure(result))


# 3. Query 清单 item details by name (with library attributes and catalog info)
@app.get("/api/qing_dan/by_name", response_model=QueryResponse)
async def query_qing_dan_by_name(
    name: str = Query(..., description="清单名称"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = await query_service.query_qing_dan_by_name(name, scope)
    return QueryResponse(**_raise_on_failure(result))


# 4. Query 清单 item details by code (with library attributes and catalog info)
@app.get("/api/qing_dan/by_code", response_model=QueryResponse)
async def query_qing_dan_by_code(
    code: str = Query(..., description="清单编码"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = await query_service.query_qing_dan_by_code(code, scope)
    return QueryResponse(**_raise_on_failure(result))


# 5. Batch query of 定额 and 清单 information
@app.post("/api/batch_query", response_model=BatchQueryResponse)
async def batch_query(request: BatchQueryRequest):
    result = await query_service.batch_query(request)
    return BatchQueryResponse(**_raise_on_failure(result))


# Server start-up
def start_api_server():
    """Start the FastAPI server with uvicorn on 0.0.0.0:8005."""
    uvicorn.run(app, host="0.0.0.0", port=8005)


# Entry point
def main():
    """Print a start-up message and launch the API server."""
    print("正在启动API服务器...")
    start_api_server()


if __name__ == "__main__":
    main()
    # uvicorn rag2_0.api.query_dinge_qingdan_api:app --host 0.0.0.0 --port 8005 --workers 10