feat: 添加清单定额查询API并优化意图识别模块
新增清单定额查询API服务,支持通过名称和编码查询定额及清单信息 在意图识别模块中添加定额清单信息提取功能,并记录各步骤耗时 将SiliconFlowEmbeddings替换为XinferenceEmbeddings并添加sqlite-vss依赖 优化shell脚本的screen会话检测逻辑
This commit is contained in:
@@ -0,0 +1,567 @@
|
||||
# 添加FastAPI相关导入
|
||||
from fastapi import FastAPI, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Dict, Any
|
||||
import uvicorn
|
||||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 导入ExcelToSQLiteProcessor类
|
||||
sys.path.append(os.getcwd())
|
||||
from rag2_0.demo.create_qingdan_dinge_database import ExcelToSQLiteProcessor
|
||||
# 导入向量检索相关类
|
||||
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
||||
from langchain_community.vectorstores import SQLiteVSS
|
||||
from rag2_0.tool.APIKeyManager import APIKeyManager
|
||||
|
||||
# 创建FastAPI应用
|
||||
app = FastAPI(title="清单定额库查询API", description="提供清单和定额信息查询接口")
|
||||
TOP_K = 100
|
||||
|
||||
# 响应模型
|
||||
class QueryResponse(BaseModel):
|
||||
success: bool
|
||||
message: str
|
||||
data: Optional[List[Dict[str, Any]]] = None
|
||||
|
||||
# 批量查询请求模型
|
||||
class DingEInfoList(BaseModel):
|
||||
dinge_code_list: List[str] = []
|
||||
dinge_name_list: List[str] = []
|
||||
|
||||
class QingDanInfo(BaseModel):
|
||||
qingdan_code_list: List[str] = []
|
||||
qingdan_name_list: List[str] = []
|
||||
|
||||
class DingEQingDanInfo(BaseModel):
|
||||
dinge_info_list: DingEInfoList
|
||||
qingdan_info: QingDanInfo
|
||||
|
||||
class BatchQueryRequest(BaseModel):
|
||||
dinge_qingdan_info: DingEQingDanInfo
|
||||
scope: Optional[str] = Query(None, description="适用范围")
|
||||
|
||||
# 批量查询响应模型
|
||||
class BatchQueryResponse(BaseModel):
|
||||
success: bool
|
||||
message: str
|
||||
dinge_data: Optional[List[Dict[str, Any]]] = None
|
||||
qingdan_data: Optional[List[Dict[str, Any]]] = None
|
||||
|
||||
# 封装查询数据的相关代码
|
||||
class QingDanDingEQueryService:
|
||||
def __init__(self, db_path="/data/QueryRewrite/data/db/qingdan_ding_e_ku.db"):
|
||||
self.db_path = db_path
|
||||
self.top_k = TOP_K
|
||||
|
||||
# 初始化向量检索相关组件
|
||||
self.embedding_function = XinferenceEmbeddings(api_key="")
|
||||
|
||||
# 初始化向量数据库连接
|
||||
self.ding_e_vector_db = SQLiteVSS(
|
||||
table="embeding_ding_e_zimu_name",
|
||||
connection=None,
|
||||
embedding=self.embedding_function,
|
||||
db_file=self.db_path
|
||||
)
|
||||
|
||||
self.qing_dan_vector_db = SQLiteVSS(
|
||||
table="embeding_qd_zimu_name",
|
||||
connection=None,
|
||||
embedding=self.embedding_function,
|
||||
db_file=self.db_path
|
||||
)
|
||||
|
||||
def get_similar_names_by_vector(self, query_text:str, vector_db:SQLiteVSS, field_map:dict, top_k:int=3, scope:str=None):
|
||||
"""使用向量检索获取相似名称"""
|
||||
try:
|
||||
# 使用向量数据库进行相似性搜索
|
||||
results = vector_db.similarity_search_with_score(query=query_text, k=30)
|
||||
|
||||
# 提取结果中的元数据
|
||||
similar_items = []
|
||||
for doc, score in results:
|
||||
if scope and scope not in doc.metadata[field_map["适用范围"]]:
|
||||
continue
|
||||
|
||||
metadata = doc.metadata
|
||||
# 添加相似度分数
|
||||
metadata['similarity_score'] = float(score)
|
||||
similar_items.append(metadata)
|
||||
|
||||
# 按相似度分数排序,分数高的排前面
|
||||
similar_items.sort(key=lambda x: x['similarity_score'])
|
||||
return similar_items[:top_k]
|
||||
except Exception as e:
|
||||
print(f"向量检索出错: {str(e)}")
|
||||
return []
|
||||
|
||||
def get_db_connection(self):
|
||||
"""获取数据库连接"""
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
conn.row_factory = sqlite3.Row # 设置行工厂,使结果可以通过列名访问
|
||||
return conn
|
||||
|
||||
def create_reverse_field_map(self):
|
||||
"""创建字段反向映射(数据库字段名 -> 中文字段名)"""
|
||||
# 定额库字段反向映射
|
||||
ding_e_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.ding_e_field_map.items()}
|
||||
# 清单库字段反向映射
|
||||
qing_dan_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.qing_dan_field_map.items()}
|
||||
return ding_e_reverse_map, qing_dan_reverse_map
|
||||
|
||||
def convert_field_names_to_chinese(self, data_list, reverse_map):
|
||||
"""转换字段名称为中文"""
|
||||
result = []
|
||||
for item in data_list:
|
||||
chinese_item = {}
|
||||
for field_name, value in item.items():
|
||||
# 如果字段名在反向映射中存在,则使用中文名称
|
||||
chinese_field_name = reverse_map.get(field_name, field_name)
|
||||
chinese_item[chinese_field_name] = value
|
||||
result.append(chinese_item)
|
||||
return result
|
||||
|
||||
def sort_results_by_exact_match(self, data_list, search_term, field_name):
|
||||
"""对查询结果进行排序,将完全匹配的结果排在前面"""
|
||||
exact_matches = []
|
||||
partial_matches = []
|
||||
|
||||
for item in data_list:
|
||||
# 检查是否为完全匹配
|
||||
if search_term.upper() == str(item[field_name]).upper():
|
||||
exact_matches.append(item)
|
||||
else:
|
||||
partial_matches.append(item)
|
||||
|
||||
# 合并结果,完全匹配的排在前面
|
||||
return exact_matches + partial_matches
|
||||
|
||||
def query_ding_e_by_name(self, name, scope=None):
|
||||
"""根据定额名称查询定额子目表中详情信息,使用向量检索扩大查询范围"""
|
||||
try:
|
||||
conn = self.get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 获取表名和字段映射
|
||||
zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"]
|
||||
mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"]
|
||||
attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"]
|
||||
field_map = ExcelToSQLiteProcessor.ding_e_field_map
|
||||
|
||||
# 1. 先使用向量检索获取相似名称
|
||||
similar_items = self.get_similar_names_by_vector(query_text=name,
|
||||
vector_db=self.ding_e_vector_db,
|
||||
field_map=field_map,
|
||||
scope=scope)
|
||||
similar_names = [item[field_map['名称']] for item in similar_items]
|
||||
|
||||
# 构建查询条件,始终包含原始名称的模糊匹配
|
||||
like_conditions = [f"zimu.{field_map['名称']} LIKE ?"]
|
||||
params = [f'%{name}%']
|
||||
|
||||
# 如果有向量检索结果,添加这些结果的模糊匹配条件
|
||||
for similar_name in similar_names:
|
||||
like_conditions.append(f"zimu.{field_map['名称']} LIKE ?")
|
||||
params.append(f'%{similar_name}%')
|
||||
|
||||
# 将所有条件用OR连接
|
||||
like_conditions_str = " OR ".join(like_conditions)
|
||||
like_conditions_str= f"({like_conditions_str})"
|
||||
|
||||
query = f"""
|
||||
SELECT
|
||||
zimu.*,
|
||||
mulu.{field_map['名称']} as mulu_name,
|
||||
attr.{field_map['发布时间']} as attr_pub_time,
|
||||
attr.{field_map['适用范围']} as attr_scope
|
||||
FROM {zimu_table} zimu
|
||||
LEFT JOIN {mulu_table} mulu ON
|
||||
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
||||
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
||||
LEFT JOIN {attr_table} attr ON
|
||||
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
||||
WHERE {like_conditions_str}
|
||||
"""
|
||||
|
||||
# 如果提供了适用范围,添加过滤条件
|
||||
if scope:
|
||||
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
||||
params.append(f'%{scope}%')
|
||||
|
||||
cursor.execute(query, params)
|
||||
|
||||
# 获取结果
|
||||
results = cursor.fetchall()
|
||||
data = [dict(row) for row in results]
|
||||
|
||||
# 对结果进行排序,将全字匹配的排在前面
|
||||
data = self.sort_results_by_exact_match(data, name, field_map['名称'])
|
||||
data = data[:self.top_k]
|
||||
|
||||
conn.close()
|
||||
|
||||
if not data:
|
||||
return {"success": True, "message": "未找到匹配的定额信息", "data": []}
|
||||
|
||||
# 创建反向映射并转换字段名为中文
|
||||
ding_e_reverse_map, _ = self.create_reverse_field_map()
|
||||
|
||||
# 添加自定义字段映射
|
||||
ding_e_reverse_map['mulu_name'] = '目录名称'
|
||||
ding_e_reverse_map['attr_pub_time'] = '发布时间'
|
||||
ding_e_reverse_map['attr_scope'] = '适用范围'
|
||||
|
||||
chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map)
|
||||
|
||||
return {"success": True, "message": "查询成功", "data": chinese_data}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"查询出错: {str(e)}"}
|
||||
|
||||
def query_ding_e_by_code(self, code, scope=None):
|
||||
"""根据定额编码查询定额子目表中详情信息"""
|
||||
try:
|
||||
code = code.upper()
|
||||
conn = self.get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 获取表名和字段映射
|
||||
zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"]
|
||||
mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"]
|
||||
attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"]
|
||||
field_map = ExcelToSQLiteProcessor.ding_e_field_map
|
||||
|
||||
# 构建连表查询SQL
|
||||
query = f"""
|
||||
SELECT
|
||||
zimu.*,
|
||||
mulu.{field_map['名称']} as mulu_name,
|
||||
attr.{field_map['发布时间']} as attr_pub_time,
|
||||
attr.{field_map['适用范围']} as attr_scope
|
||||
FROM {zimu_table} zimu
|
||||
LEFT JOIN {mulu_table} mulu ON
|
||||
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
||||
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
||||
LEFT JOIN {attr_table} attr ON
|
||||
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
||||
WHERE zimu.{field_map['编码']} LIKE ?
|
||||
"""
|
||||
|
||||
params = [f'%{code}%']
|
||||
|
||||
# 如果提供了适用范围,添加过滤条件
|
||||
if scope:
|
||||
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
||||
params.append(f'%{scope}%')
|
||||
|
||||
cursor.execute(query, params)
|
||||
|
||||
# 获取结果
|
||||
results = cursor.fetchall()
|
||||
data = [dict(row) for row in results]
|
||||
|
||||
# 对结果进行排序,将全字匹配的排在前面
|
||||
data = self.sort_results_by_exact_match(data, code, field_map['编码'])
|
||||
data = data[:self.top_k]
|
||||
|
||||
conn.close()
|
||||
|
||||
if not data:
|
||||
return {"success": True, "message": "未找到匹配的定额信息", "data": []}
|
||||
|
||||
# 创建反向映射并转换字段名为中文
|
||||
ding_e_reverse_map, _ = self.create_reverse_field_map()
|
||||
|
||||
# 添加自定义字段映射
|
||||
ding_e_reverse_map['mulu_name'] = '目录名称'
|
||||
ding_e_reverse_map['attr_pub_time'] = '发布时间'
|
||||
ding_e_reverse_map['attr_scope'] = '适用范围'
|
||||
|
||||
chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map)
|
||||
|
||||
return {"success": True, "message": "查询成功", "data": chinese_data}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"查询出错: {str(e)}"}
|
||||
|
||||
def query_qing_dan_by_name(self, name, scope=None):
|
||||
"""根据清单名称查询清单子目表中详情信息,使用向量检索扩大查询范围"""
|
||||
try:
|
||||
conn = self.get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 获取表名和字段映射
|
||||
zimu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单子目"]
|
||||
mulu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单目录"]
|
||||
attr_table = ExcelToSQLiteProcessor.qing_dan_table_names["资源库属性"]
|
||||
field_map = ExcelToSQLiteProcessor.qing_dan_field_map
|
||||
|
||||
# 1. 先使用向量检索获取相似名称
|
||||
similar_items = self.get_similar_names_by_vector(query_text=name, vector_db=self.qing_dan_vector_db, field_map=field_map, scope=scope)
|
||||
similar_names = [item['mc'] for item in similar_items]
|
||||
|
||||
# 构建查询条件,始终包含原始名称的模糊匹配
|
||||
like_conditions = [f"zimu.{field_map['名称']} LIKE ?"]
|
||||
params = [f'%{name}%']
|
||||
|
||||
# 如果有向量检索结果,添加这些结果的模糊匹配条件
|
||||
for similar_name in similar_names:
|
||||
like_conditions.append(f"zimu.{field_map['名称']} LIKE ?")
|
||||
params.append(f'%{similar_name}%')
|
||||
|
||||
# 将所有条件用OR连接
|
||||
like_conditions_str = " OR ".join(like_conditions)
|
||||
like_conditions_str= f"({like_conditions_str})"
|
||||
|
||||
query = f"""
|
||||
SELECT
|
||||
zimu.*,
|
||||
mulu.{field_map['名称']} as mulu_name,
|
||||
attr.{field_map['发布时间']} as attr_pub_time,
|
||||
attr.{field_map['适用范围']} as attr_scope
|
||||
FROM {zimu_table} zimu
|
||||
LEFT JOIN {mulu_table} mulu ON
|
||||
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
||||
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
||||
LEFT JOIN {attr_table} attr ON
|
||||
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
||||
WHERE {like_conditions_str}
|
||||
"""
|
||||
|
||||
# 如果提供了适用范围,添加过滤条件
|
||||
if scope:
|
||||
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
||||
params.append(f'%{scope}%')
|
||||
|
||||
cursor.execute(query, params)
|
||||
|
||||
# 获取结果
|
||||
results = cursor.fetchall()
|
||||
data = [dict(row) for row in results]
|
||||
|
||||
# 对结果进行排序,将全字匹配的排在前面
|
||||
data = self.sort_results_by_exact_match(data, name, field_map['名称'])
|
||||
data = data[:self.top_k]
|
||||
|
||||
conn.close()
|
||||
|
||||
if not data:
|
||||
return {"success": True, "message": "未找到匹配的清单信息", "data": []}
|
||||
|
||||
# 创建反向映射并转换字段名为中文
|
||||
_, qing_dan_reverse_map = self.create_reverse_field_map()
|
||||
|
||||
# 添加自定义字段映射
|
||||
qing_dan_reverse_map['mulu_name'] = '目录名称'
|
||||
qing_dan_reverse_map['attr_pub_time'] = '发布时间'
|
||||
qing_dan_reverse_map['attr_scope'] = '适用范围'
|
||||
|
||||
chinese_data = self.convert_field_names_to_chinese(data, qing_dan_reverse_map)
|
||||
|
||||
return {"success": True, "message": "查询成功", "data": chinese_data}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"查询出错: {str(e)}"}
|
||||
|
||||
def query_qing_dan_by_code(self, code, scope=None):
|
||||
"""根据清单编码查询清单子目表中详情信息"""
|
||||
try:
|
||||
code = code.upper()
|
||||
conn = self.get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 获取表名和字段映射
|
||||
zimu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单子目"]
|
||||
mulu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单目录"]
|
||||
attr_table = ExcelToSQLiteProcessor.qing_dan_table_names["资源库属性"]
|
||||
field_map = ExcelToSQLiteProcessor.qing_dan_field_map
|
||||
|
||||
# 构建连表查询SQL
|
||||
query = f"""
|
||||
SELECT
|
||||
zimu.*,
|
||||
mulu.{field_map['名称']} as mulu_name,
|
||||
attr.{field_map['发布时间']} as attr_pub_time,
|
||||
attr.{field_map['适用范围']} as attr_scope
|
||||
FROM {zimu_table} zimu
|
||||
LEFT JOIN {mulu_table} mulu ON
|
||||
zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
|
||||
zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
|
||||
LEFT JOIN {attr_table} attr ON
|
||||
zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
|
||||
WHERE zimu.{field_map['编码']} LIKE ?
|
||||
"""
|
||||
|
||||
params = [f'%{code}%']
|
||||
|
||||
# 如果提供了适用范围,添加过滤条件
|
||||
if scope:
|
||||
query += f" AND attr.{field_map['适用范围']} LIKE ?"
|
||||
params.append(f'%{scope}%')
|
||||
|
||||
cursor.execute(query, params)
|
||||
|
||||
# 获取结果
|
||||
results = cursor.fetchall()
|
||||
data = [dict(row) for row in results]
|
||||
|
||||
# 对结果进行排序,将全字匹配的排在前面
|
||||
data = self.sort_results_by_exact_match(data, code, field_map['编码'])
|
||||
data = data[:self.top_k]
|
||||
|
||||
conn.close()
|
||||
|
||||
if not data:
|
||||
return {"success": True, "message": "未找到匹配的清单信息", "data": []}
|
||||
|
||||
# 创建反向映射并转换字段名为中文
|
||||
_, qing_dan_reverse_map = self.create_reverse_field_map()
|
||||
|
||||
# 添加自定义字段映射
|
||||
qing_dan_reverse_map['mulu_name'] = '目录名称'
|
||||
qing_dan_reverse_map['attr_pub_time'] = '发布时间'
|
||||
qing_dan_reverse_map['attr_scope'] = '适用范围'
|
||||
|
||||
chinese_data = self.convert_field_names_to_chinese(data, qing_dan_reverse_map)
|
||||
|
||||
return {"success": True, "message": "查询成功", "data": chinese_data}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"查询出错: {str(e)}"}
|
||||
|
||||
def batch_query(self, requests:BatchQueryRequest):
|
||||
"""批量查询接口,支持向量检索"""
|
||||
dinge_results = []
|
||||
qingdan_results = []
|
||||
tracking_dict = {} # 用于跟踪已查询过的项目,避免重复
|
||||
|
||||
try:
|
||||
# 获取查询信息
|
||||
dinge_info = requests.dinge_qingdan_info.dinge_info_list
|
||||
qingdan_info = requests.dinge_qingdan_info.qingdan_info
|
||||
scope = requests.scope
|
||||
|
||||
# 处理定额编码查询
|
||||
for code in dinge_info.dinge_code_list or []:
|
||||
key = f"dinge_code_{code}_{scope}"
|
||||
if key not in tracking_dict:
|
||||
result = self.query_ding_e_by_code(code, scope)
|
||||
if result["success"] and result["data"]:
|
||||
dinge_results.extend(result["data"])
|
||||
tracking_dict[key] = True
|
||||
|
||||
# 处理定额名称查询
|
||||
for name in dinge_info.dinge_name_list or []:
|
||||
key = f"dinge_name_{name}_{scope}"
|
||||
if key not in tracking_dict:
|
||||
result = self.query_ding_e_by_name(name, scope)
|
||||
if result["success"] and result["data"]:
|
||||
dinge_results.extend(result["data"])
|
||||
tracking_dict[key] = True
|
||||
|
||||
# 处理清单编码查询
|
||||
for code in qingdan_info.qingdan_code_list or []:
|
||||
key = f"qingdan_code_{code}_{scope}"
|
||||
if key not in tracking_dict:
|
||||
result = self.query_qing_dan_by_code(code, scope)
|
||||
if result["success"] and result["data"]:
|
||||
qingdan_results.extend(result["data"])
|
||||
tracking_dict[key] = True
|
||||
|
||||
# 处理清单名称查询
|
||||
for name in qingdan_info.qingdan_name_list or []:
|
||||
key = f"qingdan_name_{name}_{scope}"
|
||||
if key not in tracking_dict:
|
||||
result = self.query_qing_dan_by_name(name, scope)
|
||||
if result["success"] and result["data"]:
|
||||
qingdan_results.extend(result["data"])
|
||||
tracking_dict[key] = True
|
||||
|
||||
# 限制返回结果数量
|
||||
dinge_results = dinge_results[:self.top_k]
|
||||
qingdan_results = qingdan_results[:self.top_k]
|
||||
|
||||
if not dinge_results and not qingdan_results:
|
||||
return {
|
||||
"success": True,
|
||||
"message": "未找到匹配信息",
|
||||
"dinge_data": [],
|
||||
"qingdan_data": []
|
||||
}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "查询成功",
|
||||
"dinge_data": dinge_results,
|
||||
"qingdan_data": qingdan_results
|
||||
}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"批量查询出错: {str(e)}", "dinge_data": [], "qingdan_data": []}
|
||||
|
||||
# 创建查询服务实例
|
||||
query_service = QingDanDingEQueryService()
|
||||
|
||||
# 1. 根据定额名称查询定额子目表中详情信息(包含资源库属性和目录信息)
|
||||
@app.get("/api/ding_e/by_name", response_model=QueryResponse)
|
||||
async def query_ding_e_by_name(
|
||||
name: str = Query(..., description="定额名称"),
|
||||
scope: Optional[str] = Query(None, description="适用范围")
|
||||
):
|
||||
result = query_service.query_ding_e_by_name(name, scope)
|
||||
if not result["success"]:
|
||||
raise HTTPException(status_code=500, detail=result["message"])
|
||||
return QueryResponse(**result)
|
||||
|
||||
# 2. 根据定额编码查询定额子目表中详情信息(包含资源库属性和目录信息)
|
||||
@app.get("/api/ding_e/by_code", response_model=QueryResponse)
|
||||
async def query_ding_e_by_code(
|
||||
code: str = Query(..., description="定额编码"),
|
||||
scope: Optional[str] = Query(None, description="适用范围")
|
||||
):
|
||||
result = query_service.query_ding_e_by_code(code, scope)
|
||||
if not result["success"]:
|
||||
raise HTTPException(status_code=500, detail=result["message"])
|
||||
return QueryResponse(**result)
|
||||
|
||||
# 3. 根据清单名称查询清单子目表中详情信息(包含资源库属性和目录信息)
|
||||
@app.get("/api/qing_dan/by_name", response_model=QueryResponse)
|
||||
async def query_qing_dan_by_name(
|
||||
name: str = Query(..., description="清单名称"),
|
||||
scope: Optional[str] = Query(None, description="适用范围")
|
||||
):
|
||||
result = query_service.query_qing_dan_by_name(name, scope)
|
||||
if not result["success"]:
|
||||
raise HTTPException(status_code=500, detail=result["message"])
|
||||
return QueryResponse(**result)
|
||||
|
||||
# 4. 根据清单编码查询清单子目表中详情信息(包含资源库属性和目录信息)
|
||||
@app.get("/api/qing_dan/by_code", response_model=QueryResponse)
|
||||
async def query_qing_dan_by_code(
|
||||
code: str = Query(..., description="清单编码"),
|
||||
scope: Optional[str] = Query(None, description="适用范围")
|
||||
):
|
||||
result = query_service.query_qing_dan_by_code(code, scope)
|
||||
if not result["success"]:
|
||||
raise HTTPException(status_code=500, detail=result["message"])
|
||||
return QueryResponse(**result)
|
||||
|
||||
# 5. 批量查询定额和清单信息
|
||||
@app.post("/api/batch_query", response_model=BatchQueryResponse)
|
||||
async def batch_query(request: BatchQueryRequest):
|
||||
result = query_service.batch_query(request)
|
||||
if not result["success"]:
|
||||
raise HTTPException(status_code=500, detail=result["message"])
|
||||
return BatchQueryResponse(**result)
|
||||
|
||||
# 启动服务器的函数
|
||||
def start_api_server():
|
||||
"""启动FastAPI服务器"""
|
||||
uvicorn.run(app, host="0.0.0.0", port=8005)
|
||||
|
||||
# 主函数
|
||||
def main():
|
||||
"""主函数"""
|
||||
print("正在启动API服务器...")
|
||||
start_api_server()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
# uvicorn rag2_0.dify.query_dinge_qingdan_api:app --host 0.0.0.0 --port 8005 --workers 10
|
||||
Reference in New Issue
Block a user