From 1a3fa44522110da2df74af28c769d89a7e29d854 Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Wed, 20 Aug 2025 19:08:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=B8=85=E5=8D=95?= =?UTF-8?q?=E5=AE=9A=E9=A2=9D=E6=9F=A5=E8=AF=A2API=E5=B9=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=84=8F=E5=9B=BE=E8=AF=86=E5=88=AB=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增清单定额查询API服务,支持通过名称和编码查询定额及清单信息 在意图识别模块中添加定额清单信息提取功能,并记录各步骤耗时 将SiliconFlowEmbeddings替换为XinferenceEmbeddings并添加sqlite-vss依赖 优化shell脚本的screen会话检测逻辑 --- pyproject.toml | 1 + rag2_0/demo/create_qingdan_dinge_database.py | 590 ++++++++++++++++++ rag2_0/dify/query_dinge_qingdan_api.py | 567 +++++++++++++++++ .../intent_recognition/IntentRecognition.py | 113 ++-- .../ProfessionalNounVector.py | 4 +- rag2_0/tool/ModelTool.py | 4 +- start_DifyQueryRetrieval_api.sh | 6 +- uv.lock | 12 + 8 files changed, 1244 insertions(+), 53 deletions(-) create mode 100644 rag2_0/demo/create_qingdan_dinge_database.py create mode 100644 rag2_0/dify/query_dinge_qingdan_api.py diff --git a/pyproject.toml b/pyproject.toml index c261117..2e6f193 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "python-dotenv>=1.1.0", "requests>=2.32.4", "sqlalchemy>=2.0.41", + "sqlite-vss>=0.1.2", "tqdm>=4.67.1", "uvicorn>=0.35.0", "xlsxwriter>=3.2.5", diff --git a/rag2_0/demo/create_qingdan_dinge_database.py b/rag2_0/demo/create_qingdan_dinge_database.py new file mode 100644 index 0000000..ed974c0 --- /dev/null +++ b/rag2_0/demo/create_qingdan_dinge_database.py @@ -0,0 +1,590 @@ +import os +import sys +import sqlite3 +import pandas as pd +from openpyxl import load_workbook +import logging +import numpy as np +sys.path.append(os.getcwd()) +from rag2_0.tool.ModelTool import XinferenceEmbeddings +from langchain_community.vectorstores import SQLiteVSS + + +class ExcelToSQLiteProcessor: + """Excel文件到SQLite数据库的处理器""" + # 定额库表名映射 + ding_e_table_names 
= { + "定额资源库属性": "ding_e_zyk_shuxing", + "定额目录": "ding_e_mulu", + "定额子目": "ding_e_zimu" + } + + # 清单库表名映射 + qing_dan_table_names = { + "资源库属性": "qd_zyk_shuxing", + "清单目录": "qd_mulu", + "清单子目": "qd_zimu" + } + + # 定额库字段映射 + ding_e_field_map = { + "资源库名称": "zyk_mc", + "发布时间": "fb_sj", + "适用范围": "sy_fw", + "章节码": "zj_m", + "父章节(章节码)": "fzj_m", + "名称": "mc", + "编码": "bm", + "单位": "dw", + "基价不含税": "jj_bhs", + "基价含税": "jj_hs", + "人工费基价不含税": "rgf_jj_bhs", + "材料费基价不含税": "clf_jj_bhs", + "机械费基价不含税": "jxf_jj_bhs", + "人工费基价含税": "rgf_jj_hs", + "材料费基价含税": "clf_jj_hs", + "机械费基价含税": "jxf_jj_hs", + "人工工日": "rg_gr", + "定额类型": "de_lx", + "工作内容": "gz_nr" + } + + # 清单库字段映射 + qing_dan_field_map = { + "资源库名称": "zyk_mc", + "发布时间": "fb_sj", + "适用范围": "sy_fw", + "章节码": "zj_m", + "父章节": "fzj_m", + "名称": "mc", + "编码": "bm", + "单位": "dw", + "工作内容": "gz_nr", + "计算规则": "js_gz", + "项目特征": "xm_tz", + "特征值": "tz_z" + } + def __init__(self, db_path): + self.db_path = db_path + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + + self._create_tables() + + def _safe_str_convert(self, value): + """安全地将值转换为字符串""" + if value is None or pd.isna(value): + return "" + return str(value).strip() + + def _create_tables(self): + """创建数据库表结构 - 所有字段都使用TEXT类型""" + print("正在创建数据库表结构...") + + # 创建定额库表 - 所有字段都改为TEXT类型 + self.cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {self.ding_e_table_names["定额资源库属性"]} ( + {self.ding_e_field_map["资源库名称"]} TEXT, + {self.ding_e_field_map["发布时间"]} TEXT, + {self.ding_e_field_map["适用范围"]} TEXT + ) + """) + + self.cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {self.ding_e_table_names["定额目录"]} ( + {self.ding_e_field_map["章节码"]} TEXT, + {self.ding_e_field_map["父章节(章节码)"]} TEXT, + {self.ding_e_field_map["名称"]} TEXT, + {self.ding_e_field_map["资源库名称"]} TEXT, + PRIMARY KEY ({self.ding_e_field_map["资源库名称"]}, {self.ding_e_field_map["章节码"]}, {self.ding_e_field_map["名称"]}) + ) + """) + + self.cursor.execute(f""" + CREATE TABLE IF NOT EXISTS 
{self.ding_e_table_names["定额子目"]} ( + {self.ding_e_field_map["章节码"]} TEXT, + {self.ding_e_field_map["编码"]} TEXT, + {self.ding_e_field_map["名称"]} TEXT, + {self.ding_e_field_map["单位"]} TEXT, + {self.ding_e_field_map["基价不含税"]} TEXT, + {self.ding_e_field_map["基价含税"]} TEXT, + {self.ding_e_field_map["人工费基价不含税"]} TEXT, + {self.ding_e_field_map["材料费基价不含税"]} TEXT, + {self.ding_e_field_map["机械费基价不含税"]} TEXT, + {self.ding_e_field_map["人工费基价含税"]} TEXT, + {self.ding_e_field_map["材料费基价含税"]} TEXT, + {self.ding_e_field_map["机械费基价含税"]} TEXT, + {self.ding_e_field_map["人工工日"]} TEXT, + {self.ding_e_field_map["定额类型"]} TEXT, + {self.ding_e_field_map["工作内容"]} TEXT, + {self.ding_e_field_map["资源库名称"]} TEXT, + PRIMARY KEY ({self.ding_e_field_map["资源库名称"]}, {self.ding_e_field_map["章节码"]}, {self.ding_e_field_map["编码"]}, {self.ding_e_field_map["名称"]}) + ) + """) + + # 创建清单库表 - 所有字段都改为TEXT类型 + self.cursor.execute(f''' + CREATE TABLE IF NOT EXISTS {self.qing_dan_table_names["资源库属性"]} ( + {self.qing_dan_field_map["资源库名称"]} TEXT PRIMARY KEY, + {self.qing_dan_field_map["发布时间"]} TEXT, + {self.qing_dan_field_map["适用范围"]} TEXT + ) + ''') + + self.cursor.execute(f''' + CREATE TABLE IF NOT EXISTS {self.qing_dan_table_names["清单目录"]} ( + {self.qing_dan_field_map["资源库名称"]} TEXT, + {self.qing_dan_field_map["章节码"]} TEXT, + {self.qing_dan_field_map["父章节"]} TEXT, + {self.qing_dan_field_map["名称"]} TEXT, + PRIMARY KEY ({self.qing_dan_field_map["资源库名称"]}, {self.qing_dan_field_map["章节码"]}, {self.qing_dan_field_map["名称"]}) + ) + ''') + + self.cursor.execute(f''' + CREATE TABLE IF NOT EXISTS {self.qing_dan_table_names["清单子目"]} ( + {self.qing_dan_field_map["资源库名称"]} TEXT, + {self.qing_dan_field_map["章节码"]} TEXT, + {self.qing_dan_field_map["编码"]} TEXT, + {self.qing_dan_field_map["名称"]} TEXT, + {self.qing_dan_field_map["单位"]} TEXT, + {self.qing_dan_field_map["工作内容"]} TEXT, + {self.qing_dan_field_map["计算规则"]} TEXT, + {self.qing_dan_field_map["项目特征"]} TEXT, + {self.qing_dan_field_map["特征值"]} TEXT, + PRIMARY KEY 
({self.qing_dan_field_map["资源库名称"]}, {self.qing_dan_field_map["章节码"]}, {self.qing_dan_field_map["编码"]}, {self.qing_dan_field_map["名称"]}) + ) + ''') + + print("数据库表结构创建完成") + + def process_ding_e_files(self, ding_e_base_dir): + """处理定额库Excel文件""" + print("=" * 50) + print("开始处理定额库文件...") + print("=" * 50) + + if not os.path.exists(ding_e_base_dir): + print(f"定额库目录不存在: {ding_e_base_dir}") + return + + # 遍历 Excel 文件 + for file_name in os.listdir(ding_e_base_dir): + if not file_name.lower().endswith((".xls", ".xlsx")): + continue + + file_path = os.path.join(ding_e_base_dir, file_name) + print(f"正在处理定额库文件: {file_path}") + + try: + df_attr = pd.read_excel(file_path, sheet_name="资源库属性", dtype=str) + df_mulu = pd.read_excel(file_path, sheet_name="定额目录", dtype=str) + df_zimu = pd.read_excel(file_path, sheet_name="定额子目", dtype=str) + except Exception as e: + print(f"读取 {file_name} 出错: {e}") + continue + + # 提取资源库属性 + attr_dict = pd.Series(df_attr["属性值"].values, index=df_attr["资源库属性"]).to_dict() + zyk_name = self._safe_str_convert(attr_dict.get("资源库名称", "")) + pub_time = self._safe_str_convert(attr_dict.get("发布时间", "")) + scope = self._safe_str_convert(attr_dict.get("适用范围", "")) + + self.cursor.execute( + f"INSERT INTO {self.ding_e_table_names['定额资源库属性']} VALUES (?, ?, ?)", + (zyk_name, pub_time, scope) + ) + + # 定额目录 - 转换所有数据为字符串 + df_mulu_copy = df_mulu.copy() + df_mulu_copy.rename(columns=self.ding_e_field_map, inplace=True) + df_mulu_copy[self.ding_e_field_map["资源库名称"]] = zyk_name + + # 将所有列转换为字符串 + for col in df_mulu_copy.columns: + df_mulu_copy[col] = df_mulu_copy[col].apply(self._safe_str_convert) + + df_mulu_copy.to_sql(self.ding_e_table_names["定额目录"], self.conn, if_exists="append", index=False) + + # 定额子目 - 转换所有数据为字符串 + df_zimu_copy = df_zimu.copy() + df_zimu_copy.rename(columns=self.ding_e_field_map, inplace=True) + df_zimu_copy[self.ding_e_field_map["资源库名称"]] = zyk_name + + # 将所有列转换为字符串 + for col in df_zimu_copy.columns: + df_zimu_copy[col] = 
df_zimu_copy[col].apply(self._safe_str_convert) + + df_zimu_copy.to_sql(self.ding_e_table_names["定额子目"], self.conn, if_exists="append", index=False) + + print(f" 成功处理定额库文件: {file_name}") + + print("定额库文件处理完成") + + def parse_merged_excel_sheet(self, file_path, sheet_name): + """ + 解析包含合并单元格的Excel表格 + 支持工作内容、项目特征、特征值既可能是合并单元格也可能是非合并单元格的情况 + """ + + # 使用openpyxl读取工作簿以获取合并单元格信息 + wb = load_workbook(file_path, data_only=True) + ws = wb[sheet_name] + + # 获取所有合并单元格的范围 + merged_ranges = list(ws.merged_cells.ranges) + + # 创建一个字典来存储合并单元格的值 + merged_cells_dict = {} + + # 为每个合并单元格范围创建映射 + for merged_range in merged_ranges: + # 获取合并单元格左上角的值 + top_left_cell = ws[merged_range.coord.split(':')[0]] + value = top_left_cell.value + + # 将这个值应用到合并范围内的所有单元格 + for row in range(merged_range.min_row, merged_range.max_row + 1): + for col in range(merged_range.min_col, merged_range.max_col + 1): + merged_cells_dict[(row, col)] = value + + # 将数据转换为二维数组进行处理 + data_array = [] + + # 遍历所有行和列,应用合并单元格的值 + for row_idx in range(1, ws.max_row + 1): # 从第1行开始(包含表头) + row_data = [] + for col_idx in range(1, min(ws.max_column + 1, 9)): # 只取前8列 + cell_key = (row_idx, col_idx) + if cell_key in merged_cells_dict: + cell_value = merged_cells_dict[cell_key] + else: + cell = ws.cell(row=row_idx, column=col_idx) + cell_value = cell.value + + row_data.append(cell_value) + data_array.append(row_data) + + # 跳过表头行 + if len(data_array) > 1: + data_array = data_array[1:] + + # 处理数据:基于章节码、编码、名称进行分组 + processed_data = [] + + # 存储所有数据行,用于后续分组处理 + all_rows = [] + for row_data in data_array: + if len(row_data) < 8: + continue + + # 将所有数据转换为字符串 + 章节码, 编码, 名称, 单位, 工作内容, 计算规则, 项目特征, 特征值 = [ + self._safe_str_convert(cell) for cell in row_data[:8] + ] + + all_rows.append({ + '章节码': 章节码, + '编码': 编码, + '名称': 名称, + '单位': 单位, + '工作内容': 工作内容, + '计算规则': 计算规则, + '项目特征': 项目特征, + '特征值': 特征值 + }) + + # 基于章节码、编码、名称进行分组处理 + grouped_data = {} + + for row in all_rows: + # 创建分组键 + group_key = (row['章节码'], row['编码'], row['名称']) + + # 
如果章节码、编码、名称都有值,则作为主记录 + if row['章节码'] and row['编码'] and row['名称']: + if group_key not in grouped_data: + grouped_data[group_key] = { + '章节码': row['章节码'], + '编码': row['编码'], + '名称': row['名称'], + '单位': row['单位'], + '工作内容': [], + '计算规则': row['计算规则'], + '项目特征': [], + '特征值': [] + } + + # 更新单位和计算规则(如果当前行有值且之前没有值) + if row['单位'] and not grouped_data[group_key]['单位']: + grouped_data[group_key]['单位'] = row['单位'] + if row['计算规则'] and not grouped_data[group_key]['计算规则']: + grouped_data[group_key]['计算规则'] = row['计算规则'] + + # 查找该行属于哪个组(找最近的有效组) + target_group = None + if row['章节码'] and row['编码'] and row['名称']: + target_group = group_key + else: + # 如果当前行没有完整的分组信息,查找最近的有效组 + # 这里采用向上查找的策略,找到最近的有效分组 + for existing_key in reversed(list(grouped_data.keys())): + # 如果章节码匹配(或为空),则认为属于该组 + if (not row['章节码'] or row['章节码'] == existing_key[0] or + not row['编码'] or row['编码'] == existing_key[1] or + not row['名称'] or row['名称'] == existing_key[2]): + target_group = existing_key + break + + # 如果还找不到,使用最后一个组 + if not target_group and grouped_data: + target_group = list(grouped_data.keys())[-1] + + # 将工作内容、项目特征、特征值添加到对应的组 + if target_group and target_group in grouped_data: + if row['工作内容']: + # 避免重复添加 + if row['工作内容'] not in grouped_data[target_group]['工作内容']: + grouped_data[target_group]['工作内容'].append(row['工作内容']) + + if row['项目特征']: + if row['项目特征'] not in grouped_data[target_group]['项目特征']: + grouped_data[target_group]['项目特征'].append(row['项目特征']) + + if row['特征值']: + if row['特征值'] not in grouped_data[target_group]['特征值']: + grouped_data[target_group]['特征值'].append(row['特征值']) + + # 将分组后的数据转换为最终格式 + for group_key, group_data in grouped_data.items(): + processed_data.append({ + '章节码': group_data['章节码'], + '编码': group_data['编码'], + '名称': group_data['名称'], + '单位': group_data['单位'], + '工作内容': '\n'.join(group_data['工作内容']), + '计算规则': group_data['计算规则'], + '项目特征': '\n'.join(group_data['项目特征']), + '特征值': '\n'.join(group_data['特征值']) + }) + + return processed_data + + def 
process_qing_dan_files(self, qing_dan_base_dir): + """处理清单库Excel文件""" + print("=" * 50) + print("开始处理清单库文件...") + print("=" * 50) + + if not os.path.exists(qing_dan_base_dir): + print(f"清单库目录不存在: {qing_dan_base_dir}") + return + + try: + # 获取目录下的所有Excel文件 + excel_files = [f for f in os.listdir(qing_dan_base_dir) if f.endswith('.xlsx') or f.endswith('.xls')] + + for excel_file in excel_files: + file_path = os.path.join(qing_dan_base_dir, excel_file) + print(f"处理清单库文件: {excel_file}") + + # 使用openpyxl加载工作簿以检查sheet名称 + wb = load_workbook(file_path, read_only=True, data_only=True) + sheet_names = wb.sheetnames + + # 检查是否包含所需的三个页签 + required_sheets = ['资源库属性', '清单目录', '清单子目'] + if not all(sheet in sheet_names for sheet in required_sheets): + print(f"警告: {excel_file} 不包含所需的全部页签,跳过此文件") + continue + + # 处理资源库属性页签 + try: + prop_df = pd.read_excel(file_path, sheet_name='资源库属性', header=None, dtype=str) + # 找到属性和值的列 + prop_df.columns = ['属性', '值'] if len(prop_df.columns) >= 2 else ['属性'] + [f'值{i}' for i in range(len(prop_df.columns)-1)] + + # 提取资源库名称、发布时间和适用范围 - 转换为字符串 + 资源库名称 = self._safe_str_convert(prop_df.loc[prop_df['属性'] == '资源库名称', '值'].iloc[0] if '资源库名称' in prop_df['属性'].values else excel_file.split('.')[0]) + 发布时间 = self._safe_str_convert(prop_df.loc[prop_df['属性'] == '发布时间', '值'].iloc[0] if '发布时间' in prop_df['属性'].values else '') + 适用范围 = self._safe_str_convert(prop_df.loc[prop_df['属性'] == '适用范围', '值'].iloc[0] if '适用范围' in prop_df['属性'].values else '') + + # 插入资源库属性 + self.cursor.execute( + f"INSERT OR REPLACE INTO {self.qing_dan_table_names['资源库属性']} ({self.qing_dan_field_map['资源库名称']}, {self.qing_dan_field_map['发布时间']}, {self.qing_dan_field_map['适用范围']}) VALUES (?, ?, ?)", + (资源库名称, 发布时间, 适用范围) + ) + + # 处理清单目录页签 + 目录_df = pd.read_excel(file_path, sheet_name='清单目录', dtype=str) + for _, row in 目录_df.iterrows(): + if pd.notna(row['章节码']): # 确保章节码不为空 + # 将所有数据转换为字符串 + 章节码_str = self._safe_str_convert(row['章节码']) + 父章节_str = self._safe_str_convert(row['父章节(章节码)']) if 
pd.notna(row['父章节(章节码)']) else '' + 名称_str = self._safe_str_convert(row['名称']) if pd.notna(row['名称']) else '' + + self.cursor.execute( + f"INSERT OR REPLACE INTO {self.qing_dan_table_names['清单目录']} ({self.qing_dan_field_map['资源库名称']}, {self.qing_dan_field_map['章节码']}, {self.qing_dan_field_map['父章节']}, {self.qing_dan_field_map['名称']}) VALUES (?, ?, ?, ?)", + (资源库名称, 章节码_str, 父章节_str, 名称_str) + ) + + # 处理清单子目页签 - 使用改进的合并单元格处理函数 + print(f" 正在处理清单子目页签...") + processed_data = self.parse_merged_excel_sheet(file_path, '清单子目') + + # 将处理后的数据插入数据库 + for data in processed_data: + if data['章节码'] and data['编码'] and data['名称']: # 确保主要字段不为空 + # 所有数据都已经在parse_merged_excel_sheet中转换为字符串 + self.cursor.execute( + f"INSERT OR REPLACE INTO {self.qing_dan_table_names['清单子目']} ({self.qing_dan_field_map['资源库名称']}, {self.qing_dan_field_map['章节码']}, {self.qing_dan_field_map['编码']}, {self.qing_dan_field_map['名称']}, {self.qing_dan_field_map['单位']}, {self.qing_dan_field_map['工作内容']}, {self.qing_dan_field_map['计算规则']}, {self.qing_dan_field_map['项目特征']}, {self.qing_dan_field_map['特征值']}) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (资源库名称, data['章节码'], data['编码'], data['名称'], data['单位'], + data['工作内容'], data['计算规则'], data['项目特征'], data['特征值']) + ) + + print(f" 成功处理 {len(processed_data)} 条清单子目记录") + + except Exception as e: + print(f"处理清单库文件 {excel_file} 时出错: {str(e)}") + continue + + print("清单库文件处理完成") + + except Exception as e: + print(f"处理清单库文件时出错: {str(e)}") + self.conn.rollback() + + def commit_and_close(self): + """提交事务并关闭数据库连接""" + self.conn.commit() + self.conn.close() + print("数据库事务已提交,连接已关闭") + +class CreateEmbedingData(): + def __init__(self, db_path, api_key="aa"): + self.db_path = db_path + self.conn = sqlite3.connect(db_path) + self.embedding_function = XinferenceEmbeddings(api_key=api_key) + + def create_ding_e_zimu_embedding(self): + """创建定额子目名称的向量索引""" + cursor = self.conn.execute(""" + SELECT dz.bm, dz.mc, dz.zyk_mc, ds.sy_fw + FROM ding_e_zimu dz + LEFT JOIN ding_e_zyk_shuxing ds ON 
dz.zyk_mc = ds.zyk_mc + """) + rows = cursor.fetchall() + texts = [row[1] for row in rows] # 提取描述文本 + metadatas = [{"bm": row[0], "mc": row[1], "zyk_mc": row[2], "sy_fw": row[3]} for row in rows] # 添加元数据 + + # 创建SQLiteVSS实例 + db = SQLiteVSS( + table="embeding_ding_e_zimu_name", # 向量表名 + connection=None, # 复用现有连接 + embedding=self.embedding_function, + db_file=self.db_path # 复用原数据库文件 + ) + + # 分批次插入数据,每批次5000条 + batch_size = 5000 + for i in range(0, len(texts), batch_size): + batch_texts = texts[i:i+batch_size] + batch_metadatas = metadatas[i:i+batch_size] + db.add_texts(texts=batch_texts, metadatas=batch_metadatas) + print(f"已插入定额子目向量索引 {i+len(batch_texts)}/{len(texts)}") + + return db + + def create_qd_zimu_embedding(self): + """创建清单子目名称的向量索引""" + cursor = self.conn.execute(""" + SELECT qz.bm, qz.mc, qz.zyk_mc, qs.sy_fw + FROM qd_zimu qz + LEFT JOIN qd_zyk_shuxing qs ON qz.zyk_mc = qs.zyk_mc + """) + rows = cursor.fetchall() + texts = [row[1] for row in rows] # 提取描述文本 + metadatas = [{"bm": row[0], "mc": row[1], "zyk_mc": row[2], "sy_fw": row[3]} for row in rows] # 添加元数据 + + # 创建SQLiteVSS实例 + db = SQLiteVSS( + table="embeding_qd_zimu_name", # 向量表名 + connection=None, # 复用现有连接 + embedding=self.embedding_function, + db_file=self.db_path # 复用原数据库文件 + ) + + # 分批次插入数据,每批次5000条 + batch_size = 5000 + for i in range(0, len(texts), batch_size): + batch_texts = texts[i:i+batch_size] + batch_metadatas = metadatas[i:i+batch_size] + db.add_texts(texts=batch_texts, metadatas=batch_metadatas) + print(f"已插入清单子目向量索引 {i+len(batch_texts)}/{len(texts)}") + + return db + + def close(self): + """关闭数据库连接""" + if self.conn: + self.conn.close() + +def main(): + """主函数""" + print("开始处理定额库和清单库Excel文件...") + + # 配置参数 + ding_e_base_dir = "/data/QueryRewrite/data/excel/Excel版 清单定额库/定额库" + qing_dan_base_dir = "/data/QueryRewrite/data/excel/Excel版 清单定额库/清单库" + db_path = "/data/QueryRewrite/data/db/qingdan_ding_e_ku copy.db" + + # 创建处理器实例 + # processor = ExcelToSQLiteProcessor(db_path) + + try: + # 
处理定额库文件 + # processor.process_ding_e_files(ding_e_base_dir) + + # # 处理清单库文件 + # processor.process_qing_dan_files(qing_dan_base_dir) + + # # 提交并关闭 + # processor.commit_and_close() + + print("=" * 50) + print("所有Excel文件处理完成!数据已成功导入SQLite数据库") + print(f"数据库文件位置: {db_path}") + print("=" * 50) + + # 生成向量数据 + print("开始生成向量数据...") + try: + # 创建向量数据处理器实例 + embedding_processor = CreateEmbedingData(db_path) + + # 生成定额子目向量数据 + print("正在生成定额子目向量数据...") + embedding_processor.create_ding_e_zimu_embedding() + + # 生成清单子目向量数据 + print("正在生成清单子目向量数据...") + embedding_processor.create_qd_zimu_embedding() + + # 关闭连接 + embedding_processor.close() + + print("=" * 50) + print("向量数据生成完成!") + print("=" * 50) + + except Exception as e: + logging.error(f"生成向量数据过程中出现错误: {str(e)}", exc_info=True) + + except Exception as e: + logging.error(f"处理过程中出现错误: {str(e)}", exc_info=True) + processor.conn.rollback() + processor.conn.close() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/rag2_0/dify/query_dinge_qingdan_api.py b/rag2_0/dify/query_dinge_qingdan_api.py new file mode 100644 index 0000000..04b671b --- /dev/null +++ b/rag2_0/dify/query_dinge_qingdan_api.py @@ -0,0 +1,567 @@ +# 添加FastAPI相关导入 +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel +from typing import List, Optional, Dict, Any +import uvicorn +import sqlite3 +import sys +import os + +# 导入ExcelToSQLiteProcessor类 +sys.path.append(os.getcwd()) +from rag2_0.demo.create_qingdan_dinge_database import ExcelToSQLiteProcessor +# 导入向量检索相关类 +from rag2_0.tool.ModelTool import XinferenceEmbeddings +from langchain_community.vectorstores import SQLiteVSS +from rag2_0.tool.APIKeyManager import APIKeyManager + +# 创建FastAPI应用 +app = FastAPI(title="清单定额库查询API", description="提供清单和定额信息查询接口") +TOP_K = 100 + +# 响应模型 +class QueryResponse(BaseModel): + success: bool + message: str + data: Optional[List[Dict[str, Any]]] = None + +# 批量查询请求模型 +class DingEInfoList(BaseModel): + dinge_code_list: 
List[str] = [] + dinge_name_list: List[str] = [] + +class QingDanInfo(BaseModel): + qingdan_code_list: List[str] = [] + qingdan_name_list: List[str] = [] + +class DingEQingDanInfo(BaseModel): + dinge_info_list: DingEInfoList + qingdan_info: QingDanInfo + +class BatchQueryRequest(BaseModel): + dinge_qingdan_info: DingEQingDanInfo + scope: Optional[str] = Query(None, description="适用范围") + +# 批量查询响应模型 +class BatchQueryResponse(BaseModel): + success: bool + message: str + dinge_data: Optional[List[Dict[str, Any]]] = None + qingdan_data: Optional[List[Dict[str, Any]]] = None + +# 封装查询数据的相关代码 +class QingDanDingEQueryService: + def __init__(self, db_path="/data/QueryRewrite/data/db/qingdan_ding_e_ku.db"): + self.db_path = db_path + self.top_k = TOP_K + + # 初始化向量检索相关组件 + self.embedding_function = XinferenceEmbeddings(api_key="") + + # 初始化向量数据库连接 + self.ding_e_vector_db = SQLiteVSS( + table="embeding_ding_e_zimu_name", + connection=None, + embedding=self.embedding_function, + db_file=self.db_path + ) + + self.qing_dan_vector_db = SQLiteVSS( + table="embeding_qd_zimu_name", + connection=None, + embedding=self.embedding_function, + db_file=self.db_path + ) + + def get_similar_names_by_vector(self, query_text:str, vector_db:SQLiteVSS, field_map:dict, top_k:int=3, scope:str=None): + """使用向量检索获取相似名称""" + try: + # 使用向量数据库进行相似性搜索 + results = vector_db.similarity_search_with_score(query=query_text, k=30) + + # 提取结果中的元数据 + similar_items = [] + for doc, score in results: + if scope and scope not in doc.metadata[field_map["适用范围"]]: + continue + + metadata = doc.metadata + # 添加相似度分数 + metadata['similarity_score'] = float(score) + similar_items.append(metadata) + + # 按相似度分数排序,分数高的排前面 + similar_items.sort(key=lambda x: x['similarity_score']) + return similar_items[:top_k] + except Exception as e: + print(f"向量检索出错: {str(e)}") + return [] + + def get_db_connection(self): + """获取数据库连接""" + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row # 设置行工厂,使结果可以通过列名访问 + return 
conn + + def create_reverse_field_map(self): + """创建字段反向映射(数据库字段名 -> 中文字段名)""" + # 定额库字段反向映射 + ding_e_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.ding_e_field_map.items()} + # 清单库字段反向映射 + qing_dan_reverse_map = {v: k for k, v in ExcelToSQLiteProcessor.qing_dan_field_map.items()} + return ding_e_reverse_map, qing_dan_reverse_map + + def convert_field_names_to_chinese(self, data_list, reverse_map): + """转换字段名称为中文""" + result = [] + for item in data_list: + chinese_item = {} + for field_name, value in item.items(): + # 如果字段名在反向映射中存在,则使用中文名称 + chinese_field_name = reverse_map.get(field_name, field_name) + chinese_item[chinese_field_name] = value + result.append(chinese_item) + return result + + def sort_results_by_exact_match(self, data_list, search_term, field_name): + """对查询结果进行排序,将完全匹配的结果排在前面""" + exact_matches = [] + partial_matches = [] + + for item in data_list: + # 检查是否为完全匹配 + if search_term.upper() == str(item[field_name]).upper(): + exact_matches.append(item) + else: + partial_matches.append(item) + + # 合并结果,完全匹配的排在前面 + return exact_matches + partial_matches + + def query_ding_e_by_name(self, name, scope=None): + """根据定额名称查询定额子目表中详情信息,使用向量检索扩大查询范围""" + try: + conn = self.get_db_connection() + cursor = conn.cursor() + + # 获取表名和字段映射 + zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"] + mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"] + attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"] + field_map = ExcelToSQLiteProcessor.ding_e_field_map + + # 1. 
先使用向量检索获取相似名称 + similar_items = self.get_similar_names_by_vector(query_text=name, + vector_db=self.ding_e_vector_db, + field_map=field_map, + scope=scope) + similar_names = [item[field_map['名称']] for item in similar_items] + + # 构建查询条件,始终包含原始名称的模糊匹配 + like_conditions = [f"zimu.{field_map['名称']} LIKE ?"] + params = [f'%{name}%'] + + # 如果有向量检索结果,添加这些结果的模糊匹配条件 + for similar_name in similar_names: + like_conditions.append(f"zimu.{field_map['名称']} LIKE ?") + params.append(f'%{similar_name}%') + + # 将所有条件用OR连接 + like_conditions_str = " OR ".join(like_conditions) + like_conditions_str= f"({like_conditions_str})" + + query = f""" + SELECT + zimu.*, + mulu.{field_map['名称']} as mulu_name, + attr.{field_map['发布时间']} as attr_pub_time, + attr.{field_map['适用范围']} as attr_scope + FROM {zimu_table} zimu + LEFT JOIN {mulu_table} mulu ON + zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND + zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']} + LEFT JOIN {attr_table} attr ON + zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']} + WHERE {like_conditions_str} + """ + + # 如果提供了适用范围,添加过滤条件 + if scope: + query += f" AND attr.{field_map['适用范围']} LIKE ?" 
+ params.append(f'%{scope}%') + + cursor.execute(query, params) + + # 获取结果 + results = cursor.fetchall() + data = [dict(row) for row in results] + + # 对结果进行排序,将全字匹配的排在前面 + data = self.sort_results_by_exact_match(data, name, field_map['名称']) + data = data[:self.top_k] + + conn.close() + + if not data: + return {"success": True, "message": "未找到匹配的定额信息", "data": []} + + # 创建反向映射并转换字段名为中文 + ding_e_reverse_map, _ = self.create_reverse_field_map() + + # 添加自定义字段映射 + ding_e_reverse_map['mulu_name'] = '目录名称' + ding_e_reverse_map['attr_pub_time'] = '发布时间' + ding_e_reverse_map['attr_scope'] = '适用范围' + + chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map) + + return {"success": True, "message": "查询成功", "data": chinese_data} + except Exception as e: + return {"success": False, "message": f"查询出错: {str(e)}"} + + def query_ding_e_by_code(self, code, scope=None): + """根据定额编码查询定额子目表中详情信息""" + try: + code = code.upper() + conn = self.get_db_connection() + cursor = conn.cursor() + + # 获取表名和字段映射 + zimu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额子目"] + mulu_table = ExcelToSQLiteProcessor.ding_e_table_names["定额目录"] + attr_table = ExcelToSQLiteProcessor.ding_e_table_names["定额资源库属性"] + field_map = ExcelToSQLiteProcessor.ding_e_field_map + + # 构建连表查询SQL + query = f""" + SELECT + zimu.*, + mulu.{field_map['名称']} as mulu_name, + attr.{field_map['发布时间']} as attr_pub_time, + attr.{field_map['适用范围']} as attr_scope + FROM {zimu_table} zimu + LEFT JOIN {mulu_table} mulu ON + zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND + zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']} + LEFT JOIN {attr_table} attr ON + zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']} + WHERE zimu.{field_map['编码']} LIKE ? + """ + + params = [f'%{code}%'] + + # 如果提供了适用范围,添加过滤条件 + if scope: + query += f" AND attr.{field_map['适用范围']} LIKE ?" 
+ params.append(f'%{scope}%') + + cursor.execute(query, params) + + # 获取结果 + results = cursor.fetchall() + data = [dict(row) for row in results] + + # 对结果进行排序,将全字匹配的排在前面 + data = self.sort_results_by_exact_match(data, code, field_map['编码']) + data = data[:self.top_k] + + conn.close() + + if not data: + return {"success": True, "message": "未找到匹配的定额信息", "data": []} + + # 创建反向映射并转换字段名为中文 + ding_e_reverse_map, _ = self.create_reverse_field_map() + + # 添加自定义字段映射 + ding_e_reverse_map['mulu_name'] = '目录名称' + ding_e_reverse_map['attr_pub_time'] = '发布时间' + ding_e_reverse_map['attr_scope'] = '适用范围' + + chinese_data = self.convert_field_names_to_chinese(data, ding_e_reverse_map) + + return {"success": True, "message": "查询成功", "data": chinese_data} + except Exception as e: + return {"success": False, "message": f"查询出错: {str(e)}"} + + def query_qing_dan_by_name(self, name, scope=None): + """根据清单名称查询清单子目表中详情信息,使用向量检索扩大查询范围""" + try: + conn = self.get_db_connection() + cursor = conn.cursor() + + # 获取表名和字段映射 + zimu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单子目"] + mulu_table = ExcelToSQLiteProcessor.qing_dan_table_names["清单目录"] + attr_table = ExcelToSQLiteProcessor.qing_dan_table_names["资源库属性"] + field_map = ExcelToSQLiteProcessor.qing_dan_field_map + + # 1. 
先使用向量检索获取相似名称 + similar_items = self.get_similar_names_by_vector(query_text=name, vector_db=self.qing_dan_vector_db, field_map=field_map, scope=scope) + similar_names = [item['mc'] for item in similar_items] + + # 构建查询条件,始终包含原始名称的模糊匹配 + like_conditions = [f"zimu.{field_map['名称']} LIKE ?"] + params = [f'%{name}%'] + + # 如果有向量检索结果,添加这些结果的模糊匹配条件 + for similar_name in similar_names: + like_conditions.append(f"zimu.{field_map['名称']} LIKE ?") + params.append(f'%{similar_name}%') + + # 将所有条件用OR连接 + like_conditions_str = " OR ".join(like_conditions) + like_conditions_str= f"({like_conditions_str})" + + query = f""" + SELECT + zimu.*, + mulu.{field_map['名称']} as mulu_name, + attr.{field_map['发布时间']} as attr_pub_time, + attr.{field_map['适用范围']} as attr_scope + FROM {zimu_table} zimu + LEFT JOIN {mulu_table} mulu ON + zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND + zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']} + LEFT JOIN {attr_table} attr ON + zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']} + WHERE {like_conditions_str} + """ + + # 如果提供了适用范围,添加过滤条件 + if scope: + query += f" AND attr.{field_map['适用范围']} LIKE ?" 
def query_qing_dan_by_code(self, code, scope=None):
    """Look up bill-of-quantities (清单) sub-items whose code contains ``code``.

    The sub-item table is left-joined with the catalogue table (on chapter
    code and resource-library name) and with the resource-library attribute
    table (on resource-library name). ``code`` is upper-cased and matched as
    a substring with SQL ``LIKE``; rows are then ordered by
    ``sort_results_by_exact_match`` and cut to ``self.top_k``.

    Args:
        code: Item code (or fragment of one) to search for.
        scope: Optional substring filter on the attribute table's
            applicable-scope column.

    Returns:
        ``{"success": True, "message": ..., "data": [...]}`` with the column
        names converted to Chinese, or ``{"success": False, "message": ...}``
        when anything raises.
    """
    try:
        code = code.upper()
        conn = self.get_db_connection()
        try:
            cursor = conn.cursor()

            # Table names and the Chinese -> column-name mapping.
            table_names = ExcelToSQLiteProcessor.qing_dan_table_names
            zimu_table = table_names["清单子目"]
            mulu_table = table_names["清单目录"]
            attr_table = table_names["资源库属性"]
            field_map = ExcelToSQLiteProcessor.qing_dan_field_map

            # Identifiers come from the class-level mappings; user-supplied
            # values are always bound as parameters.
            query = f"""
                SELECT
                    zimu.*,
                    mulu.{field_map['名称']} as mulu_name,
                    attr.{field_map['发布时间']} as attr_pub_time,
                    attr.{field_map['适用范围']} as attr_scope
                FROM {zimu_table} zimu
                LEFT JOIN {mulu_table} mulu ON
                    zimu.{field_map['章节码']} = mulu.{field_map['章节码']} AND
                    zimu.{field_map['资源库名称']} = mulu.{field_map['资源库名称']}
                LEFT JOIN {attr_table} attr ON
                    zimu.{field_map['资源库名称']} = attr.{field_map['资源库名称']}
                WHERE zimu.{field_map['编码']} LIKE ?
            """
            params = [f'%{code}%']

            # Optional filter on the applicable scope.
            if scope:
                query += f" AND attr.{field_map['适用范围']} LIKE ?"
                params.append(f'%{scope}%')

            cursor.execute(query, params)
            data = [dict(row) for row in cursor.fetchall()]
        finally:
            # Close on every path: an error while building or running the
            # query used to leave the connection open.
            conn.close()

        # Exact code matches first, then keep the best top_k rows.
        data = self.sort_results_by_exact_match(data, code, field_map['编码'])
        data = data[:self.top_k]

        if not data:
            return {"success": True, "message": "未找到匹配的清单信息", "data": []}

        # Translate column names back to Chinese, including the aliases
        # introduced by the join above.
        _, qing_dan_reverse_map = self.create_reverse_field_map()
        qing_dan_reverse_map['mulu_name'] = '目录名称'
        qing_dan_reverse_map['attr_pub_time'] = '发布时间'
        qing_dan_reverse_map['attr_scope'] = '适用范围'

        chinese_data = self.convert_field_names_to_chinese(data, qing_dan_reverse_map)

        return {"success": True, "message": "查询成功", "data": chinese_data}
    except Exception as e:
        return {"success": False, "message": f"查询出错: {str(e)}"}


def batch_query(self, requests: BatchQueryRequest):
    """Run every code/name lookup named in a batch request.

    Quota (定额) codes and names are resolved with ``query_ding_e_by_code`` /
    ``query_ding_e_by_name``; bill-of-quantities (清单) codes and names with
    ``query_qing_dan_by_code`` / ``query_qing_dan_by_name``. Each distinct
    term is queried once, blank terms are ignored, identical rows are kept
    once, and each result list is cut to ``self.top_k``.

    Args:
        requests: Object exposing ``scope`` and ``dinge_qingdan_info``; the
            latter carries ``dinge_info_list`` (``dinge_code_list``,
            ``dinge_name_list``) and ``qingdan_info`` (``qingdan_code_list``,
            ``qingdan_name_list``). Missing parts are treated as empty.

    Returns:
        A dict with ``success``, ``message``, ``dinge_data`` and
        ``qingdan_data``. Unexpected errors yield ``success=False`` with two
        empty lists.
    """
    try:
        info = requests.dinge_qingdan_info
        dinge_info = getattr(info, "dinge_info_list", None)
        qingdan_info = getattr(info, "qingdan_info", None)
        scope = requests.scope

        seen_queries = set()  # query keys already executed

        def collect(kind, terms, run_query, rows):
            """Append the rows found for each new, non-blank term to ``rows``."""
            for term in terms or []:
                # A blank term would become LIKE '%%' and match every row.
                if term is None or not str(term).strip():
                    continue
                key = f"{kind}_{term}_{scope}"
                if key in seen_queries:
                    continue
                seen_queries.add(key)
                result = run_query(term, scope)
                if result.get("success") and result.get("data"):
                    for row in result["data"]:
                        # The same row can be reached through several terms;
                        # keep it once so it does not use up top_k slots.
                        if row not in rows:
                            rows.append(row)

        dinge_results = []
        collect("dinge_code", getattr(dinge_info, "dinge_code_list", None),
                self.query_ding_e_by_code, dinge_results)
        collect("dinge_name", getattr(dinge_info, "dinge_name_list", None),
                self.query_ding_e_by_name, dinge_results)

        qingdan_results = []
        collect("qingdan_code", getattr(qingdan_info, "qingdan_code_list", None),
                self.query_qing_dan_by_code, qingdan_results)
        collect("qingdan_name", getattr(qingdan_info, "qingdan_name_list", None),
                self.query_qing_dan_by_name, qingdan_results)

        # Cap the size of each result list.
        dinge_results = dinge_results[:self.top_k]
        qingdan_results = qingdan_results[:self.top_k]

        if not dinge_results and not qingdan_results:
            return {
                "success": True,
                "message": "未找到匹配信息",
                "dinge_data": [],
                "qingdan_data": []
            }

        return {
            "success": True,
            "message": "查询成功",
            "dinge_data": dinge_results,
            "qingdan_data": qingdan_results
        }
    except Exception as e:
        return {"success": False, "message": f"批量查询出错: {str(e)}", "dinge_data": [], "qingdan_data": []}
# Look up bill-of-quantities (清单) sub-items by name; the payload includes
# the joined resource-library attributes and catalogue information.
# NOTE(review): the service call below is synchronous inside an `async def`
# endpoint; if it does blocking database I/O it holds the event loop for the
# duration of the query — consider a plain `def` endpoint or a thread offload.
# (Comments are used instead of docstrings so the generated OpenAPI
# descriptions stay exactly as they were.)
@app.get("/api/qing_dan/by_name", response_model=QueryResponse)
async def query_qing_dan_by_name(
    name: str = Query(..., description="清单名称"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = query_service.query_qing_dan_by_name(name, scope)
    if not result["success"]:
        # Service-level failures are reported to the client as HTTP 500.
        raise HTTPException(status_code=500, detail=result["message"])
    return QueryResponse(**result)


# 4. Look up bill-of-quantities (清单) sub-items by code; the payload includes
# the joined resource-library attributes and catalogue information.
@app.get("/api/qing_dan/by_code", response_model=QueryResponse)
async def query_qing_dan_by_code(
    code: str = Query(..., description="清单编码"),
    scope: Optional[str] = Query(None, description="适用范围")
):
    result = query_service.query_qing_dan_by_code(code, scope)
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result["message"])
    return QueryResponse(**result)


# 5. Batch lookup of quota (定额) and bill-of-quantities (清单) information.
@app.post("/api/batch_query", response_model=BatchQueryResponse)
async def batch_query(request: BatchQueryRequest):
    result = query_service.batch_query(request)
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result["message"])
    return BatchQueryResponse(**result)


def start_api_server(host: str = "0.0.0.0", port: int = 8005):
    """Run the FastAPI app under uvicorn (blocks until the server stops).

    Args:
        host: Interface to bind; defaults to all interfaces, as before.
        port: TCP port to listen on; defaults to 8005, as before.
    """
    uvicorn.run(app, host=host, port=port)


def main():
    """Script entry point: announce start-up and launch the API server."""
    print("正在启动API服务器...")
    start_api_server()


if __name__ == "__main__":
    main()
    # Multi-worker alternative from the command line:
    # uvicorn rag2_0.dify.query_dinge_qingdan_api:app --host 0.0.0.0 --port 8005 --workers 10
async def _get_dinge_qingdan_info(self, query: str, chat_history: List[Dict[str, str]] = None) -> dict:
    """Extract quota (定额) and bill-of-quantities (清单) codes/names from a query.

    The LLM is asked for a JSON object and the reply is parsed with
    ``JsonOutputParser``. Whatever comes back is normalised to one fixed
    shape, so callers can rely on every key being present:

        {"dinge_info_list": {"dinge_code_list": [...], "dinge_name_list": [...]},
         "qingdan_info":    {"qingdan_code_list": [...], "qingdan_name_list": [...]}}

    Args:
        query: The user's current question.
        chat_history: Earlier conversation turns; serialised into the prompt
            with ``json.dumps``.

    Returns:
        The normalised dict described above. If the LLM call or the parsing
        fails, the error is logged and the same shape with empty lists is
        returned (this method does not raise for those failures).
    """
    start_time = time.time()  # for the elapsed-time log below

    def _clean(values):
        """Return ``values`` without None/blank entries; non-lists become []."""
        if not isinstance(values, list):
            return []
        return [item for item in values if item is not None and str(item).strip()]

    def _normalize(payload):
        """Coerce a parsed LLM reply (or None) into the documented shape."""
        payload = payload if isinstance(payload, dict) else {}
        dinge = payload.get("dinge_info_list")
        dinge = dinge if isinstance(dinge, dict) else {}
        qingdan = payload.get("qingdan_info")
        qingdan = qingdan if isinstance(qingdan, dict) else {}
        return {
            "dinge_info_list": {
                "dinge_code_list": _clean(dinge.get("dinge_code_list")),
                "dinge_name_list": _clean(dinge.get("dinge_name_list")),
            },
            "qingdan_info": {
                "qingdan_code_list": _clean(qingdan.get("qingdan_code_list")),
                "qingdan_name_list": _clean(qingdan.get("qingdan_name_list")),
            },
        }

    prompt = f"""
    当前提问内容:
    {query}
    对话上下文:

    {json.dumps(chat_history, ensure_ascii=False)}


    1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
    2、请勿随机编造,如果没有提取到,返回空内容
    3、返回结果为json格式
    {{
        "dinge_info_list":{{"dinge_code_list":["xxxx","xxxx"], "dinge_name_list":["xxxx","xxxx"]}},
        "qingdan_info":{{"qingdan_code_list":["xxxx","xxxx"], "qingdan_name_list":["xxxx","xxxx"]}}
    }}
    """

    try:
        response = await self._llm.invoke_async(prompt, False, response_format={"type": "json_object"})
        response.content = response.content.strip()
        # NOTE(review): as written this pattern can only match empty strings,
        # so the substitution leaves the text unchanged. A tag-delimited
        # pattern may have been intended — confirm against the repository copy.
        clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL)
        parsed_output = JsonOutputParser().parse(clean_output)
        return _normalize(parsed_output)
    except Exception as e:
        logging.error(f"获取问题定额清单详情失败: {e}", exc_info=True)
        # Same shape as a successful reply (nested objects, not bare lists),
        # so downstream consumers that read the *_list fields keep working.
        return _normalize(None)
    finally:
        # Elapsed time is recorded on the failure path as well.
        end_time = time.time()
        logging.info(f"获取定额清单信息耗时: {end_time - start_time:.2f}秒")
raise RuntimeError(f"解析问题改写结果时出错: {e}") from e @@ -447,12 +466,12 @@ class AsyncIntentRecognizer: context=conversation_context ) classification_task = self._classify_intent_async(query, conversation_context, chat_history, previous_slots) - + # 定额清单信息 + dinge_qingdan_info_task = self._get_dinge_qingdan_info(query, chat_history) + # 并行等待问题改写和意图分类完成 - start_time = time.time() - rewrite, classification = await asyncio.gather(rewrite_task, classification_task) - end_time = time.time() - logging.info(f"意图分类耗时统计 - 总耗时: {end_time - start_time:.2f}秒") + + rewrite, classification, dinge_qingdan_info = await asyncio.gather(rewrite_task, classification_task, dinge_qingdan_info_task) # 特殊处理 锁相关咨询 if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类": @@ -470,7 +489,8 @@ class AsyncIntentRecognizer: "keywords": keywords_terms.model_dump(), "rewrite": rewrite.model_dump(), "query_keys": query_keys, - "slot_filling": slot_filling_result + "slot_filling": slot_filling_result, + "dinge_qingdan_info": dinge_qingdan_info } # 等待所有query_expand_tasks完成 @@ -505,7 +525,8 @@ class AsyncIntentRecognizer: "rewrite": rewrite.model_dump(), "query_keys": query_keys, "slot_filling": slot_filling_result, - "query_expand": query_expand + "query_expand": query_expand, + "dinge_qingdan_info": dinge_qingdan_info } async def _fill_slots_async(self, query: str, classification: Classification, conversation_context: str = "", diff --git a/rag2_0/intent_recognition/ProfessionalNounVector.py b/rag2_0/intent_recognition/ProfessionalNounVector.py index 43dcca2..b589c83 100755 --- a/rag2_0/intent_recognition/ProfessionalNounVector.py +++ b/rag2_0/intent_recognition/ProfessionalNounVector.py @@ -14,7 +14,7 @@ import asyncio from typing import List, Dict, Any, Tuple, Optional from langchain.embeddings.base import Embeddings from langchain_community.vectorstores import FAISS -from rag2_0.tool.ModelTool import SiliconFlowEmbeddings +from rag2_0.tool.ModelTool import 
XinferenceEmbeddings import logging import httpx @@ -28,7 +28,7 @@ def get_embedding_model(api_key: str = None) -> Embeddings: Returns: 嵌入模型实例 """ - return SiliconFlowEmbeddings(api_key=api_key) + return XinferenceEmbeddings(api_key=api_key) class ProfessionalNounVectorizer: diff --git a/rag2_0/tool/ModelTool.py b/rag2_0/tool/ModelTool.py index 2eefb86..0c27a95 100755 --- a/rag2_0/tool/ModelTool.py +++ b/rag2_0/tool/ModelTool.py @@ -21,7 +21,7 @@ import logging from rag2_0.tool.APIKeyManager import APIKeyManager from urllib.parse import urljoin -class SiliconFlowEmbeddings(Embeddings): +class XinferenceEmbeddings(Embeddings): """SiliconFlow嵌入模型封装""" def __init__(self, api_key: str, model: str = os.getenv("EMBEDDING_MODEL_NAME", "bge-m3")): self.api_key = api_key @@ -281,7 +281,7 @@ if __name__ == "__main__": async def test_async(): # 测试异步嵌入 api_key = APIKeyManager.get_api_key() - embeddings = SiliconFlowEmbeddings(api_key=api_key) + embeddings = XinferenceEmbeddings(api_key=api_key) query_embedding = await embeddings.embed_query_async("测试查询") print(f"异步嵌入向量维度: {len(query_embedding)}") diff --git a/start_DifyQueryRetrieval_api.sh b/start_DifyQueryRetrieval_api.sh index 072177b..9cfff55 100755 --- a/start_DifyQueryRetrieval_api.sh +++ b/start_DifyQueryRetrieval_api.sh @@ -4,14 +4,14 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # 检查是否已经存在名为DifyQueryRetrieval_api的screen会话 -if screen -ls | grep -q "DifyQueryRetrieval_api"; then +if screen -ls | grep "DifyQueryRetrieval_api"; then echo "Screen session 'DifyQueryRetrieval_api' already exists." else # 启动一个名为DifyQueryRetrieval_api的screen会话,并在其中执行后续命令 - screen -dmS DifyQueryRetrieval_api bash -c ' + screen -dmS DifyQueryRetrieval_api bash -c " cd \"$SCRIPT_DIR\" uv run uvicorn rag2_0.dify.DifyQueryRetrieval_api:app --host 0.0.0.0 --port 8002 --workers 25 - ' + " # 输出提示信息 echo "Started screen session 'DifyQueryRetrieval_api' and executed the command." 
diff --git a/uv.lock b/uv.lock index 44fdc57..c3b31d6 100644 --- a/uv.lock +++ b/uv.lock @@ -1540,6 +1540,7 @@ dependencies = [ { name = "python-dotenv" }, { name = "requests" }, { name = "sqlalchemy" }, + { name = "sqlite-vss" }, { name = "tqdm" }, { name = "uvicorn" }, { name = "xlsxwriter" }, @@ -1568,6 +1569,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "requests", specifier = ">=2.32.4" }, { name = "sqlalchemy", specifier = ">=2.0.41" }, + { name = "sqlite-vss", specifier = ">=0.1.2" }, { name = "tqdm", specifier = ">=4.67.1" }, { name = "uvicorn", specifier = ">=0.35.0" }, { name = "xlsxwriter", specifier = ">=3.2.5" }, @@ -1726,6 +1728,16 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1c/fc/9ba22f01b5cdacc8f5ed0d22304718d2c758fce3fd49a5372b886a86f37c/sqlalchemy-2.0.41-py3-none-any.whl", hash = "sha256:57df5dc6fdb5ed1a88a1ed2195fd31927e705cad62dedd86b46972752a80f576", size = 1911224, upload-time = "2025-05-14T17:39:42.154Z" }, ] +[[package]] +name = "sqlite-vss" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/68/f7/df3bde9cd7409bb827fa90bec8e1f99b7459e76f2ddd446506cc2319c199/sqlite_vss-0.1.2-py3-none-macosx_10_6_x86_64.whl", hash = "sha256:9eefa4207f8b522e32b2747fce44422c773e36710bf807613795218c7ba125f0", size = 1684060, upload-time = "2023-08-06T02:38:46.103Z" }, + { url = "https://files.pythonhosted.org/packages/aa/28/bd9a9c3aa2841755ce0196137daa386966e2b4ad65f6806edb18fcdf33cf/sqlite_vss-0.1.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:84994eaf7fe700218b258422358c4536a6aca39b96026c308b28630967f954c4", size = 1330091, upload-time = "2023-08-06T02:38:47.923Z" }, + { url = "https://files.pythonhosted.org/packages/39/77/74439767271950f6e463ee4d1594d82dce4e2fa5bf2c73b343046a083f4d/sqlite_vss-0.1.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux1_x86_64.whl", hash = 
"sha256:e44f03bc4cb214bb77b206519abfb623e3e4795967a569218e288927a7715806", size = 1553947, upload-time = "2023-08-06T02:38:49.766Z" }, +] + [[package]] name = "starlette" version = "0.46.2"