1a3fa44522
新增清单定额查询API服务,支持通过名称和编码查询定额及清单信息 在意图识别模块中添加定额清单信息提取功能,并记录各步骤耗时 将SiliconFlowEmbeddings替换为XinferenceEmbeddings并添加sqlite-vss依赖 优化shell脚本的screen会话检测逻辑
567 lines
24 KiB
Python
567 lines
24 KiB
Python
# 添加FastAPI相关导入
|
|
from fastapi import FastAPI, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from typing import List, Optional, Dict, Any
|
|
import uvicorn
|
|
import sqlite3
|
|
import sys
|
|
import os
|
|
|
|
# 导入ExcelToSQLiteProcessor类
|
|
sys.path.append(os.getcwd())
|
|
from rag2_0.demo.create_qingdan_dinge_database import ExcelToSQLiteProcessor
|
|
# 导入向量检索相关类
|
|
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
|
from langchain_community.vectorstores import SQLiteVSS
|
|
from rag2_0.tool.APIKeyManager import APIKeyManager
|
|
|
|
# 创建FastAPI应用
|
|
app = FastAPI(title="清单定额库查询API", description="提供清单和定额信息查询接口")
|
|
TOP_K = 100
|
|
|
|
# 响应模型
|
|
class QueryResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
data: Optional[List[Dict[str, Any]]] = None
|
|
|
|
# 批量查询请求模型
|
|
class DingEInfoList(BaseModel):
|
|
dinge_code_list: List[str] = []
|
|
dinge_name_list: List[str] = []
|
|
|
|
class QingDanInfo(BaseModel):
|
|
qingdan_code_list: List[str] = []
|
|
qingdan_name_list: List[str] = []
|
|
|
|
class DingEQingDanInfo(BaseModel):
|
|
dinge_info_list: DingEInfoList
|
|
qingdan_info: QingDanInfo
|
|
|
|
class BatchQueryRequest(BaseModel):
|
|
dinge_qingdan_info: DingEQingDanInfo
|
|
scope: Optional[str] = Query(None, description="适用范围")
|
|
|
|
# 批量查询响应模型
|
|
class BatchQueryResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
dinge_data: Optional[List[Dict[str, Any]]] = None
|
|
qingdan_data: Optional[List[Dict[str, Any]]] = None
|
|
|
|
# 封装查询数据的相关代码
|
|
class QingDanDingEQueryService:
|
|
def __init__(self, db_path="/data/QueryRewrite/data/db/qingdan_ding_e_ku.db"):
|
|
self.db_path = db_path
|
|
self.top_k = TOP_K
|
|
|
|
# 初始化向量检索相关组件
|
|
self.embedding_function = XinferenceEmbeddings(api_key="")
|
|
|
|
# 初始化向量数据库连接
|
|
self.ding_e_vector_db = SQLiteVSS(
|
|
table="embeding_ding_e_zimu_name",
|
|
connection=None,
|
|
embedding=self.embedding_function,
|
|
db_file=self.db_path
|
|
)
|
|
|
|
self.qing_dan_vector_db = SQLiteVSS(
|
|
table="embeding_qd_zimu_name",
|
|
connection=None,
|
|
embedding=self.embedding_function,
|
|
db_file=self.db_path
|
|
)
|
|
|
|
def get_similar_names_by_vector(self, query_text:str, vector_db:SQLiteVSS, field_map:dict, top_k:int=3, scope:str=None):
|
|
"""使用向量检索获取相似名称"""
|
|
try:
|
|
# 使用向量数据库进行相似性搜索
|
|
results = vector_db.similarity_search_with_score(query=query_text, k=30)
|
|
|
|
# 提取结果中的元数据
|
|
similar_items = []
|
|
for doc, score in results:
|
|
if scope and scope not in doc.metadata[field_map["适用范围"]]:
|
|
continue
|
|
|
|
metadata = doc.metadata
|
|
# 添加相似度分数
|
|
metadata['similarity_score'] = float(score)
|
|
similar_items.append(metadata)
|
|
|
|
# 按相似度分数排序,分数高的排前面
|
|
similar_items.sort(key=lambda x: x['similarity_score'])
|
|
return similar_items[:top_k]
|
|
except Exception as e:
|
|
print(f"向量检索出错: {str(e)}")
|
|
return []
|
|
|
|
def get_db_connection(self):
|
|
"""获取数据库连接"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
conn.row_factory = sqlite3.Row # 设置行工厂,使结果可以通过列名访问
|
|
return conn
|
|
|
|
def create_reverse_field_map(self):
|
|
"""创建字段反向映射(数据库字段名 -> 中文字段名)"""
|
|
# 定额库字段反向映射
|
|
ding_e_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.ding_e_field_map.items()}
|
|
# 清单库字段反向映射
|
|
qing_dan_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.qing_dan_field_map.items()}
|
|
return ding_e_reverse_map, qing_dan_reverse_map
|
|
|
|
def convert_field_names_to_chinese(self, data_list, reverse_map):
|
|
"""转换字段名称为中文"""
|
|
result = []
|
|
for item in data_list:
|
|
chinese_item = {}
|
|
for field_name, value in item.items():
|
|
# 如果字段名在反向映射中存在,则使用中文名称
|
|
chinese_field_name = reverse_map.get(field_name, field_name)
|
|
chinese_item[chinese_field_name] = value
|
|
result.append(chinese_item)
|
|
return result
|
|
|
|
def sort_results_by_exact_match(self, data_list, search_term, field_name):
|
|
"""对查询结果进行排序,将完全匹配的结果排在前面"""
|
|
exact_matches = []
|
|
partial_matches = []
|
|
|
|
for item in data_list:
|
|
# 检查是否为完全匹配
|
|
if search_term.upper() == str(item[field_name]).upper():
|
|
exact_matches.append(item)
|
|
else:
|
|
partial_matches.append(item)
|
|
|
|
# 合并结果,完全匹配的排在前面
|
|
return exact_matches + partial_matches
|
|
|
|
def query_ding_e_by_name(self, name, scope=None):
|
|
"""根据定额名称查询定额子目表中详情信息,使用向量检索扩大查询范围"""
|
|
try:
|
|
conn = self.get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# 获取表名和字段映射
|
|
zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"]
|
|
mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"]
|
|
attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"]
|
|
field_map = ExcelToSQLiteProcessor.ding_e_field_map
|
|
|
|
# 1. 先使用向量检索获取相似名称
|
|
similar_items = self.get_similar_names_by_vector(query_text=name,
|
|
vector_db=self.ding_e_vector_db,
|
|
field_map=field_map,
|
|
scope=scope)
|
|
similar_names = [item[field_map['名称']] for item in similar_items]
|
|
|
|
# 构建查询条件,始终包含原始名称的模糊匹配
|
|
like_conditions = [f"zimu.{field_map['名称']} LIKE ?"]
|
|
params = [f'%{name}%']
|
|
|
|
# 如果有向量检索结果,添加这些结果的模糊匹配条件
|
|
for similar_name in similar_names:
|
|
like_conditions.append(f"zimu.{field_map['名称']} LIKE ?")
|
|
params.append(f'%{similar_name}%')
|
|
|
|
# 将所有条件用OR连接
|
|
like_conditions_str = " OR ".join(like_conditions)
|
|
like_conditions_str= f"({like_conditions_str})"
|
|
|
|
query = f"""
|
|
SELECT
|
|
zimu.*,
|
|
mulu.{field_map['名称']} as mulu_name,
|
|
attr.{field_map['发布时间']} as attr_pub_time,
|
|
attr.{field_map['适用范围']} as attr_scope
|
|
FROM {zimu_table} zimu
|
|
LEFT JOIN {mulu_table} mulu ON
|
|
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
|
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
|
LEFT JOIN {attr_table} attr ON
|
|
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
|
WHERE {like_conditions_str}
|
|
"""
|
|
|
|
# 如果提供了适用范围,添加过滤条件
|
|
if scope:
|
|
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
|
params.append(f'%{scope}%')
|
|
|
|
cursor.execute(query, params)
|
|
|
|
# 获取结果
|
|
results = cursor.fetchall()
|
|
data = [dict(row) for row in results]
|
|
|
|
# 对结果进行排序,将全字匹配的排在前面
|
|
data = self.sort_results_by_exact_match(data, name, field_map['名称'])
|
|
data = data[:self.top_k]
|
|
|
|
conn.close()
|
|
|
|
if not data:
|
|
return {"success": True, "message": "未找到匹配的定额信息", "data": []}
|
|
|
|
# 创建反向映射并转换字段名为中文
|
|
ding_e_reverse_map, _ = self.create_reverse_field_map()
|
|
|
|
# 添加自定义字段映射
|
|
ding_e_reverse_map['mulu_name'] = '目录名称'
|
|
ding_e_reverse_map['attr_pub_time'] = '发布时间'
|
|
ding_e_reverse_map['attr_scope'] = '适用范围'
|
|
|
|
chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map)
|
|
|
|
return {"success": True, "message": "查询成功", "data": chinese_data}
|
|
except Exception as e:
|
|
return {"success": False, "message": f"查询出错: {str(e)}"}
|
|
|
|
def query_ding_e_by_code(self, code, scope=None):
|
|
"""根据定额编码查询定额子目表中详情信息"""
|
|
try:
|
|
code = code.upper()
|
|
conn = self.get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# 获取表名和字段映射
|
|
zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"]
|
|
mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"]
|
|
attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"]
|
|
field_map = ExcelToSQLiteProcessor.ding_e_field_map
|
|
|
|
# 构建连表查询SQL
|
|
query = f"""
|
|
SELECT
|
|
zimu.*,
|
|
mulu.{field_map['名称']} as mulu_name,
|
|
attr.{field_map['发布时间']} as attr_pub_time,
|
|
attr.{field_map['适用范围']} as attr_scope
|
|
FROM {zimu_table} zimu
|
|
LEFT JOIN {mulu_table} mulu ON
|
|
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
|
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
|
LEFT JOIN {attr_table} attr ON
|
|
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
|
WHERE zimu.{field_map['编码']} LIKE ?
|
|
"""
|
|
|
|
params = [f'%{code}%']
|
|
|
|
# 如果提供了适用范围,添加过滤条件
|
|
if scope:
|
|
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
|
params.append(f'%{scope}%')
|
|
|
|
cursor.execute(query, params)
|
|
|
|
# 获取结果
|
|
results = cursor.fetchall()
|
|
data = [dict(row) for row in results]
|
|
|
|
# 对结果进行排序,将全字匹配的排在前面
|
|
data = self.sort_results_by_exact_match(data, code, field_map['编码'])
|
|
data = data[:self.top_k]
|
|
|
|
conn.close()
|
|
|
|
if not data:
|
|
return {"success": True, "message": "未找到匹配的定额信息", "data": []}
|
|
|
|
# 创建反向映射并转换字段名为中文
|
|
ding_e_reverse_map, _ = self.create_reverse_field_map()
|
|
|
|
# 添加自定义字段映射
|
|
ding_e_reverse_map['mulu_name'] = '目录名称'
|
|
ding_e_reverse_map['attr_pub_time'] = '发布时间'
|
|
ding_e_reverse_map['attr_scope'] = '适用范围'
|
|
|
|
chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map)
|
|
|
|
return {"success": True, "message": "查询成功", "data": chinese_data}
|
|
except Exception as e:
|
|
return {"success": False, "message": f"查询出错: {str(e)}"}
|
|
|
|
def query_qing_dan_by_name(self, name, scope=None):
|
|
"""根据清单名称查询清单子目表中详情信息,使用向量检索扩大查询范围"""
|
|
try:
|
|
conn = self.get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# 获取表名和字段映射
|
|
zimu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单子目"]
|
|
mulu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单目录"]
|
|
attr_table = ExcelToSQLiteProcessor.qing_dan_table_names["资源库属性"]
|
|
field_map = ExcelToSQLiteProcessor.qing_dan_field_map
|
|
|
|
# 1. 先使用向量检索获取相似名称
|
|
similar_items = self.get_similar_names_by_vector(query_text=name, vector_db=self.qing_dan_vector_db, field_map=field_map, scope=scope)
|
|
similar_names = [item['mc'] for item in similar_items]
|
|
|
|
# 构建查询条件,始终包含原始名称的模糊匹配
|
|
like_conditions = [f"zimu.{field_map['名称']} LIKE ?"]
|
|
params = [f'%{name}%']
|
|
|
|
# 如果有向量检索结果,添加这些结果的模糊匹配条件
|
|
for similar_name in similar_names:
|
|
like_conditions.append(f"zimu.{field_map['名称']} LIKE ?")
|
|
params.append(f'%{similar_name}%')
|
|
|
|
# 将所有条件用OR连接
|
|
like_conditions_str = " OR ".join(like_conditions)
|
|
like_conditions_str= f"({like_conditions_str})"
|
|
|
|
query = f"""
|
|
SELECT
|
|
zimu.*,
|
|
mulu.{field_map['名称']} as mulu_name,
|
|
attr.{field_map['发布时间']} as attr_pub_time,
|
|
attr.{field_map['适用范围']} as attr_scope
|
|
FROM {zimu_table} zimu
|
|
LEFT JOIN {mulu_table} mulu ON
|
|
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
|
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
|
LEFT JOIN {attr_table} attr ON
|
|
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
|
WHERE {like_conditions_str}
|
|
"""
|
|
|
|
# 如果提供了适用范围,添加过滤条件
|
|
if scope:
|
|
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
|
params.append(f'%{scope}%')
|
|
|
|
cursor.execute(query, params)
|
|
|
|
# 获取结果
|
|
results = cursor.fetchall()
|
|
data = [dict(row) for row in results]
|
|
|
|
# 对结果进行排序,将全字匹配的排在前面
|
|
data = self.sort_results_by_exact_match(data, name, field_map['名称'])
|
|
data = data[:self.top_k]
|
|
|
|
conn.close()
|
|
|
|
if not data:
|
|
return {"success": True, "message": "未找到匹配的清单信息", "data": []}
|
|
|
|
# 创建反向映射并转换字段名为中文
|
|
_, qing_dan_reverse_map = self.create_reverse_field_map()
|
|
|
|
# 添加自定义字段映射
|
|
qing_dan_reverse_map['mulu_name'] = '目录名称'
|
|
qing_dan_reverse_map['attr_pub_time'] = '发布时间'
|
|
qing_dan_reverse_map['attr_scope'] = '适用范围'
|
|
|
|
chinese_data = self.convert_field_names_to_chinese(data, qing_dan_reverse_map)
|
|
|
|
return {"success": True, "message": "查询成功", "data": chinese_data}
|
|
except Exception as e:
|
|
return {"success": False, "message": f"查询出错: {str(e)}"}
|
|
|
|
def query_qing_dan_by_code(self, code, scope=None):
|
|
"""根据清单编码查询清单子目表中详情信息"""
|
|
try:
|
|
code = code.upper()
|
|
conn = self.get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# 获取表名和字段映射
|
|
zimu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单子目"]
|
|
mulu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单目录"]
|
|
attr_table = ExcelToSQLiteProcessor.qing_dan_table_names["资源库属性"]
|
|
field_map = ExcelToSQLiteProcessor.qing_dan_field_map
|
|
|
|
# 构建连表查询SQL
|
|
query = f"""
|
|
SELECT
|
|
zimu.*,
|
|
mulu.{field_map['名称']} as mulu_name,
|
|
attr.{field_map['发布时间']} as attr_pub_time,
|
|
attr.{field_map['适用范围']} as attr_scope
|
|
FROM {zimu_table} zimu
|
|
LEFT JOIN {mulu_table} mulu ON
|
|
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
|
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
|
LEFT JOIN {attr_table} attr ON
|
|
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
|
WHERE zimu.{field_map['编码']} LIKE ?
|
|
"""
|
|
|
|
params = [f'%{code}%']
|
|
|
|
# 如果提供了适用范围,添加过滤条件
|
|
if scope:
|
|
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
|
params.append(f'%{scope}%')
|
|
|
|
cursor.execute(query, params)
|
|
|
|
# 获取结果
|
|
results = cursor.fetchall()
|
|
data = [dict(row) for row in results]
|
|
|
|
# 对结果进行排序,将全字匹配的排在前面
|
|
data = self.sort_results_by_exact_match(data, code, field_map['编码'])
|
|
data = data[:self.top_k]
|
|
|
|
conn.close()
|
|
|
|
if not data:
|
|
return {"success": True, "message": "未找到匹配的清单信息", "data": []}
|
|
|
|
# 创建反向映射并转换字段名为中文
|
|
_, qing_dan_reverse_map = self.create_reverse_field_map()
|
|
|
|
# 添加自定义字段映射
|
|
qing_dan_reverse_map['mulu_name'] = '目录名称'
|
|
qing_dan_reverse_map['attr_pub_time'] = '发布时间'
|
|
qing_dan_reverse_map['attr_scope'] = '适用范围'
|
|
|
|
chinese_data = self.convert_field_names_to_chinese(data, qing_dan_reverse_map)
|
|
|
|
return {"success": True, "message": "查询成功", "data": chinese_data}
|
|
except Exception as e:
|
|
return {"success": False, "message": f"查询出错: {str(e)}"}
|
|
|
|
def batch_query(self, requests:BatchQueryRequest):
|
|
"""批量查询接口,支持向量检索"""
|
|
dinge_results = []
|
|
qingdan_results = []
|
|
tracking_dict = {} # 用于跟踪已查询过的项目,避免重复
|
|
|
|
try:
|
|
# 获取查询信息
|
|
dinge_info = requests.dinge_qingdan_info.dinge_info_list
|
|
qingdan_info = requests.dinge_qingdan_info.qingdan_info
|
|
scope = requests.scope
|
|
|
|
# 处理定额编码查询
|
|
for code in dinge_info.dinge_code_list or []:
|
|
key = f"dinge_code_{code}_{scope}"
|
|
if key not in tracking_dict:
|
|
result = self.query_ding_e_by_code(code, scope)
|
|
if result["success"] and result["data"]:
|
|
dinge_results.extend(result["data"])
|
|
tracking_dict[key] = True
|
|
|
|
# 处理定额名称查询
|
|
for name in dinge_info.dinge_name_list or []:
|
|
key = f"dinge_name_{name}_{scope}"
|
|
if key not in tracking_dict:
|
|
result = self.query_ding_e_by_name(name, scope)
|
|
if result["success"] and result["data"]:
|
|
dinge_results.extend(result["data"])
|
|
tracking_dict[key] = True
|
|
|
|
# 处理清单编码查询
|
|
for code in qingdan_info.qingdan_code_list or []:
|
|
key = f"qingdan_code_{code}_{scope}"
|
|
if key not in tracking_dict:
|
|
result = self.query_qing_dan_by_code(code, scope)
|
|
if result["success"] and result["data"]:
|
|
qingdan_results.extend(result["data"])
|
|
tracking_dict[key] = True
|
|
|
|
# 处理清单名称查询
|
|
for name in qingdan_info.qingdan_name_list or []:
|
|
key = f"qingdan_name_{name}_{scope}"
|
|
if key not in tracking_dict:
|
|
result = self.query_qing_dan_by_name(name, scope)
|
|
if result["success"] and result["data"]:
|
|
qingdan_results.extend(result["data"])
|
|
tracking_dict[key] = True
|
|
|
|
# 限制返回结果数量
|
|
dinge_results = dinge_results[:self.top_k]
|
|
qingdan_results = qingdan_results[:self.top_k]
|
|
|
|
if not dinge_results and not qingdan_results:
|
|
return {
|
|
"success": True,
|
|
"message": "未找到匹配信息",
|
|
"dinge_data": [],
|
|
"qingdan_data": []
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "查询成功",
|
|
"dinge_data": dinge_results,
|
|
"qingdan_data": qingdan_results
|
|
}
|
|
except Exception as e:
|
|
return {"success": False, "message": f"批量查询出错: {str(e)}", "dinge_data": [], "qingdan_data": []}
|
|
|
|
# 创建查询服务实例
|
|
query_service = QingDanDingEQueryService()
|
|
|
|
# 1. 根据定额名称查询定额子目表中详情信息(包含资源库属性和目录信息)
|
|
@app.get("/api/ding_e/by_name", response_model=QueryResponse)
|
|
async def query_ding_e_by_name(
|
|
name: str = Query(..., description="定额名称"),
|
|
scope: Optional[str] = Query(None, description="适用范围")
|
|
):
|
|
result = query_service.query_ding_e_by_name(name, scope)
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=500, detail=result["message"])
|
|
return QueryResponse(**result)
|
|
|
|
# 2. 根据定额编码查询定额子目表中详情信息(包含资源库属性和目录信息)
|
|
@app.get("/api/ding_e/by_code", response_model=QueryResponse)
|
|
async def query_ding_e_by_code(
|
|
code: str = Query(..., description="定额编码"),
|
|
scope: Optional[str] = Query(None, description="适用范围")
|
|
):
|
|
result = query_service.query_ding_e_by_code(code, scope)
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=500, detail=result["message"])
|
|
return QueryResponse(**result)
|
|
|
|
# 3. 根据清单名称查询清单子目表中详情信息(包含资源库属性和目录信息)
|
|
@app.get("/api/qing_dan/by_name", response_model=QueryResponse)
|
|
async def query_qing_dan_by_name(
|
|
name: str = Query(..., description="清单名称"),
|
|
scope: Optional[str] = Query(None, description="适用范围")
|
|
):
|
|
result = query_service.query_qing_dan_by_name(name, scope)
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=500, detail=result["message"])
|
|
return QueryResponse(**result)
|
|
|
|
# 4. 根据清单编码查询清单子目表中详情信息(包含资源库属性和目录信息)
|
|
@app.get("/api/qing_dan/by_code", response_model=QueryResponse)
|
|
async def query_qing_dan_by_code(
|
|
code: str = Query(..., description="清单编码"),
|
|
scope: Optional[str] = Query(None, description="适用范围")
|
|
):
|
|
result = query_service.query_qing_dan_by_code(code, scope)
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=500, detail=result["message"])
|
|
return QueryResponse(**result)
|
|
|
|
# 5. 批量查询定额和清单信息
|
|
@app.post("/api/batch_query", response_model=BatchQueryResponse)
|
|
async def batch_query(request: BatchQueryRequest):
|
|
result = query_service.batch_query(request)
|
|
if not result["success"]:
|
|
raise HTTPException(status_code=500, detail=result["message"])
|
|
return BatchQueryResponse(**result)
|
|
|
|
# 启动服务器的函数
|
|
def start_api_server():
|
|
"""启动FastAPI服务器"""
|
|
uvicorn.run(app, host="0.0.0.0", port=8005)
|
|
|
|
# 主函数
|
|
def main():
|
|
"""主函数"""
|
|
print("正在启动API服务器...")
|
|
start_api_server()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
# uvicorn rag2_0.dify.query_dinge_qingdan_api:app --host 0.0.0.0 --port 8005 --workers 10 |