diff --git a/data/nouns/merged_nouns.json b/data/nouns/merged_nouns.json index f33135b..e1d63bb 100644 --- a/data/nouns/merged_nouns.json +++ b/data/nouns/merged_nouns.json @@ -3034,11 +3034,6 @@ "synonymous": [], "description": "VC运行时库的安装程序" }, - { - "name": "技改检修计价通T1", - "synonymous": [], - "description": "电力行业技改检修计价软件的名称" - }, { "name": "博微电力建设计价通", "synonymous": [], @@ -9885,11 +9880,6 @@ ], "description": "报表参数配置项,控制材料/设备的显示筛选" }, - { - "name": "技改检修清单T1", - "synonymous": [], - "description": "软件版本标识,对应技术改造和检修工程清单类型" - }, { "name": "导入EXCEL版物料库", "synonymous": [], @@ -10023,7 +10013,8 @@ { "name": "技改检修清单计价通T1软件", "synonymous": [ - "技改检修清单软件" + "技改检修清单软件", + "技改检修清单T1" ], "description": "电力行业用于技改检修清单计价的软件名称" }, diff --git a/data/nouns/professional_nouns_index/index.faiss b/data/nouns/professional_nouns_index/index.faiss index 6e8737e..e3f4468 100644 Binary files a/data/nouns/professional_nouns_index/index.faiss and b/data/nouns/professional_nouns_index/index.faiss differ diff --git a/data/nouns/professional_nouns_index/index.pkl b/data/nouns/professional_nouns_index/index.pkl index cd55d2e..c70ea14 100644 Binary files a/data/nouns/professional_nouns_index/index.pkl and b/data/nouns/professional_nouns_index/index.pkl differ diff --git a/rag2_0/demo/judge_query_full.py b/rag2_0/demo/judge_query_full.py deleted file mode 100755 index dd40512..0000000 --- a/rag2_0/demo/judge_query_full.py +++ /dev/null @@ -1,618 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -完整性问题判断工具 - -此脚本用于读取Excel文件中的问题,调用LLM判断问题是否完整,并将结果保存到Excel文件中。 - -用法示例: - python judge_query_full.py -i "问题数据.xlsx" -o "完整问题结果.xlsx" -w 50 -c 0 - -命令行参数: - -i, --input: 输入Excel文件路径 - -o, --output: 输出Excel文件路径 - -w, --workers: 并发处理的最大线程数 - -c, --column: 要处理的问题所在列的索引(从0开始) - -t, --test: 测试单个问题,不处理Excel文件 -""" - -import pandas as pd -import json -import os -import time -import re -import argparse -import traceback -from pathlib import Path -from rag2_0.tool.ModelTool import OpenAiLLM -from rag2_0.tool.APIKeyManager import APIKeyManager -from openpyxl.utils import get_column_letter -from openpyxl.styles import Alignment, PatternFill, Font, Border, Side -from tqdm import tqdm -import concurrent.futures -import threading - -# 默认设置 -DEFAULT_EXCEL_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据.xlsx" -DEFAULT_OUTPUT_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据_完整问题结果.xlsx" -DEFAULT_MAX_WORKERS = 50 - - -class QueryCompletenessJudge: - """ - 问题完整性判断工具类 - - 用于评估问题是否完整,并将结果保存到Excel文件中。 - 可以批量处理Excel文件中的问题,也可以测试单个问题。 - """ - - def __init__(self, input_path=DEFAULT_EXCEL_PATH, output_path=DEFAULT_OUTPUT_PATH, - max_workers=DEFAULT_MAX_WORKERS, column_index=0): - """ - 初始化问题完整性判断工具 - - 参数: - input_path (str): 输入Excel文件路径 - output_path (str): 输出Excel文件路径 - max_workers (int): 并发处理的最大线程数 - column_index (int): 要处理的问题所在列的索引(从0开始) - """ - self.input_path = input_path - self.output_path = output_path - self.max_workers = max_workers - self.column_index = column_index - self.llm_client = self._create_llm_client() - - def _extract_json_from_response(self, full_answer): - """ - 从LLM响应中提取JSON部分 - - 参数: - full_answer (str): LLM的完整响应文本 - - 返回: - dict: 解析后的JSON对象,如果解析失败则返回None - """ - # 尝试从回答中提取JSON部分 - json_match = re.search(r'```json\s*(.*?)\s*```', full_answer, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - # 如果没有找到```json```格式,尝试寻找普通的JSON对象 - json_match = re.search(r'({[\s\S]*"is_complete"[\s\S]*})', full_answer) - if json_match: - json_str = json_match.group(1) - else: - # 如果仍然没有找到,返回None - return None - - try: - # 解析JSON - return json.loads(json_str) - except json.JSONDecodeError: - return None - - def _create_llm_prompt(self, question): - """ - 创建LLM提示词 - - 参数: - question (str): 需要判断完整性的问题 - - 返回: - str: 格式化后的提示词 - """ - return f"""你是一个电力造价行业专家,用户正在使用电力造价软件,并提出了相关问题。请分析以下问题是否完整。 - -问题:{question} - -首先,分析这个问题的结构和内容,思考它是否包含足够的信息来表达清晰的意图。 -考虑以下几点: -1. 问题是否有明确的核心意图,不需要面面俱到 -2. 问题是否缺少必要的上下文 -3. **问题如果涉及软件相关,则只需要包含:软件名称、软件功能或软件目的即可** - - -在你的分析之后,请用JSON格式给出最终结论,格式如下: -```json -{{ - "is_complete": true或false, - "reason": "判断原因的简要说明", - "confidence": 0到100之间的数值,表示你对判断的置信度 -}} -``` - -请确保JSON格式正确,以便于程序解析。""" - - def _create_llm_client(self, api_key=None): - """ - 创建LLM客户端 - - 参数: - api_key (str, optional): API密钥,如果为None则从APIKeyManager获取 - - 返回: - OpenAiLLM: LLM客户端实例 - """ - if api_key is None: - api_key = APIKeyManager.get_api_key() - - return OpenAiLLM( - api_key=api_key, - base_url="https://api.siliconflow.cn/v1", # 可以根据实际情况修改 - model="deepseek-ai/DeepSeek-V3", # 可以根据实际情况修改 - temperature=0.2, - max_tokens=100 - ) - - def is_question_complete(self, question): - """ - 调用LLM判断问题是否完整 - - 参数: - question (str): 需要判断的问题 - - 返回: - tuple: (bool, str) - 是否完整的布尔值和LLM的详细回复 - """ - # 最大重试次数 - max_retries = 3 - retry_count = 0 - retry_delay = 2 # 重试延迟,单位:秒 - - while retry_count <= max_retries: - try: - # 创建提示词 - prompt = self._create_llm_prompt(question) - - # 使用OpenAiLLM调用模型 - response = self.llm_client.invoke(prompt) - - # 处理可能的响应格式 - if hasattr(response, 'content'): - full_answer = response.content - else: - # 如果response是字符串 - full_answer = str(response) - - # 提取JSON部分 - result = self._extract_json_from_response(full_answer) - - if result: - is_complete = result.get("is_complete", False) - return is_complete, full_answer - else: - # 如果没有找到或解析失败,使用简单判断 - is_complete = "完整" in full_answer[:100] - return is_complete, full_answer - - except Exception as e: - retry_count += 1 - if retry_count <= max_retries: - # 非最后一次重试,打印错误并继续 - time.sleep(retry_delay) - # 每次重试增加延迟时间,避免频繁失败 - retry_delay *= 2 - else: - # 已达到最大重试次数,返回错误 - stack_trace = traceback.format_exc() - print(f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}") - print(f"堆栈跟踪信息:\n{stack_trace}") - return False, f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}\n堆栈摘要: {str(e).__class__.__name__}" - - # 不应该到达这里,但为了代码完整性添加 - return False, "未知错误:重试机制逻辑错误" - - def _process_question(self, args, complete_questions, progress_counter, progress_lock, complete_questions_lock, pbar): - """ - 处理单个问题并更新进度 - - 参数: - args (tuple): 包含问题索引、问题内容、LLM客户端和总问题数的元组 - complete_questions (list): 存储完整问题的列表 - progress_counter (dict): 进度计数器 - progress_lock (threading.Lock): 进度锁 - complete_questions_lock (threading.Lock): 完整问题列表锁 - pbar (tqdm): 进度条对象 - """ - index, question, llm_client, total_questions = args - - # 跳过空问题 - if pd.isna(question) or question.strip() == "": - with progress_lock: - progress_counter["processed"] += 1 - pbar.update(1) - return None - - # 调用LLM判断问题是否完整 - is_complete, full_answer = self.is_question_complete(question) - - if is_complete: - # 从答案中提取JSON - parsed_json = self._extract_json_from_response(full_answer) - - if parsed_json: - # 构造包含解析出的JSON信息的结果 - result = { - "问题": question, - "LLM回复": full_answer, - "完整性": "完整" if parsed_json.get("is_complete", False) else "不完整", - "原因": parsed_json.get("reason", "未提供"), - "置信度": parsed_json.get("confidence", 0) - } - - # 更新计数 - with progress_lock: - if result["完整性"] == "完整": - progress_counter["complete"] += 1 - else: - progress_counter["incomplete"] += 1 - else: - # JSON解析失败,只保存原始回答 - result = { - "问题": question, - "LLM回复": full_answer, - "完整性": "完整" - } - - # 更新计数 - with progress_lock: - progress_counter["complete"] += 1 - - with complete_questions_lock: - complete_questions.append(result) - else: - with progress_lock: - progress_counter["incomplete"] += 1 - # 更新进度条 - with progress_lock: - progress_counter["processed"] += 1 - # 更新进度条描述 - pbar.set_postfix( - 完整=progress_counter["complete"], - 不完整=progress_counter["incomplete"], - 完整率=f"{progress_counter['complete']/max(1, progress_counter['processed']):.1%}" - ) - pbar.update(1) - - def _shorten_response(self, response): - """ - 截断LLM响应,提取重要信息 - - 参数: - response (str): 原始LLM响应 - - 返回: - str: 截断后的响应 - """ - # 保留思考过程的前200个字符和JSON部分 - json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL) - if json_match: - json_part = json_match.group(0) - prefix = response[:200] + "..." if len(response) > 200 else response - return f"{prefix}\n\n{json_part}" - return response[:500] + "..." if len(response) > 500 else response - - def _prepare_excel_dataframe(self, complete_questions): - """ - 将结果处理为DataFrame用于Excel输出 - - 参数: - complete_questions (list): 完整问题列表 - - 返回: - pandas.DataFrame: 处理后的DataFrame - """ - # 将结果列表转换为DataFrame - result_df = pd.DataFrame(complete_questions) - - # 处理LLM回复列,截取一定长度以避免Excel单元格过大 - if "LLM回复" in result_df.columns: - result_df["LLM回复"] = result_df["LLM回复"].apply(self._shorten_response) - - # 调整列的顺序,确保重要列在前面 - column_order = ["问题", "完整性", "置信度", "原因", "LLM回复"] - # 过滤掉不存在的列 - column_order = [col for col in column_order if col in result_df.columns] - # 确保所有剩余的列也被包含 - for col in result_df.columns: - if col not in column_order: - column_order.append(col) - - # 重新排序列 - return result_df[column_order] - - def _set_excel_column_widths(self, worksheet): - """ - 设置Excel列宽 - - 参数: - worksheet (openpyxl.worksheet.worksheet.Worksheet): Excel工作表 - """ - for col in range(1, worksheet.max_column + 1): - col_letter = get_column_letter(col) - column_name = worksheet[f"{col_letter}1"].value - - if column_name == "问题": - worksheet.column_dimensions[col_letter].width = 40 - elif column_name == "LLM回复": - worksheet.column_dimensions[col_letter].width = 60 - elif column_name == "原因": - worksheet.column_dimensions[col_letter].width = 30 - elif column_name == "完整性": - worksheet.column_dimensions[col_letter].width = 10 - elif column_name == "置信度": - worksheet.column_dimensions[col_letter].width = 10 - else: - worksheet.column_dimensions[col_letter].width = 15 - - def _apply_excel_cell_styles(self, worksheet): - """ - 应用单元格样式 - - 参数: - worksheet (openpyxl.worksheet.worksheet.Worksheet): Excel工作表 - - 返回: - openpyxl.styles.Border: 边框样式,用于统计信息 - """ - # 定义样式 - header_fill = PatternFill(start_color="DDEBF7", end_color="DDEBF7", fill_type="solid") - header_font = Font(bold=True) - wrap_alignment = Alignment(wrap_text=True, vertical="top") - border = Border( - left=Side(style='thin'), - right=Side(style='thin'), - top=Side(style='thin'), - bottom=Side(style='thin') - ) - - # 应用样式到每个单元格 - for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row, min_col=1, max_col=worksheet.max_column): - for cell in row: - cell.alignment = wrap_alignment - cell.border = border - - # 为标题行应用特殊样式 - if cell.row == 1: - cell.fill = header_fill - cell.font = header_font - - # 为完整性列应用条件格式 - if cell.row > 1: # 跳过标题行 - column_name = worksheet.cell(row=1, column=cell.column).value - if column_name == "完整性": - if cell.value == "完整": - cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") - else: - cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") - - return border # 返回边框样式以便在统计信息中重用 - - def _add_statistics_to_excel(self, worksheet, complete_questions, total_rows, total_questions, border): - """ - 添加统计信息到Excel表格 - - 参数: - worksheet (openpyxl.worksheet.worksheet.Worksheet): Excel工作表 - complete_questions (list): 完整问题列表 - total_rows (int): 总行数 - total_questions (int): 总问题数 - border (openpyxl.styles.Border): 边框样式 - - 返回: - int: 完整问题数量 - """ - # 计算统计数据 - complete_count = sum(1 for item in complete_questions if item.get("完整性") == "完整") - incomplete_count = total_rows - complete_count - - # 添加统计行 - worksheet.append([""]) # 空行 - - stat_row = worksheet.max_row + 1 - worksheet.cell(row=stat_row, column=1, value="统计信息") - worksheet.cell(row=stat_row, column=1).font = Font(bold=True) - - worksheet.cell(row=stat_row+1, column=1, value="总问题数") - worksheet.cell(row=stat_row+1, column=2, value=total_rows) - - worksheet.cell(row=stat_row+2, column=1, value="完整问题数") - worksheet.cell(row=stat_row+2, column=2, value=complete_count) - worksheet.cell(row=stat_row+2, column=2).fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") - - worksheet.cell(row=stat_row+3, column=1, value="不完整问题数") - worksheet.cell(row=stat_row+3, column=2, value=incomplete_count) - worksheet.cell(row=stat_row+3, column=2).fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") - - worksheet.cell(row=stat_row+4, column=1, value="完整问题比例") - worksheet.cell(row=stat_row+4, column=2, value=f"{complete_count/total_rows:.2%}" if total_rows > 0 else "0%") - - # 应用边框到统计行 - for r in range(stat_row, stat_row+5): - for c in range(1, 3): - worksheet.cell(row=r, column=c).border = border - - return complete_count - - def save_results_to_excel(self, complete_questions, total_questions): - """ - 将结果保存到Excel文件 - - 参数: - complete_questions (list): 完整问题列表 - total_questions (int): 总问题数 - """ - if not complete_questions: - print(f"没有找到完整的问题。") - return - - # 准备数据 - result_df = self._prepare_excel_dataframe(complete_questions) - total_rows = len(result_df) - - # 保存到Excel文件 - result_df.to_excel(self.output_path, index=False, engine='openpyxl') - - # 应用Excel样式 - from openpyxl import load_workbook - wb = load_workbook(self.output_path) - ws = wb.active - - # 设置列宽 - self._set_excel_column_widths(ws) - - # 应用单元格样式 - border = self._apply_excel_cell_styles(ws) - - # 添加统计信息 - complete_count = self._add_statistics_to_excel(ws, complete_questions, total_rows, total_questions, border) - - # 保存样式化的工作簿 - wb.save(self.output_path) - - # 输出结果统计 - print(f"处理完成。共有{complete_count}/{total_questions}个完整问题被保存到 {self.output_path}") - print(f"完整问题比例: {complete_count/total_questions:.2%}" if total_questions > 0 else "完整问题比例: 0%") - - def process_excel_file(self): - """ - 处理Excel文件中的问题 - - 读取Excel文件,判断问题完整性,并将结果保存到输出Excel文件 - """ - # 确保Excel文件存在 - if not os.path.exists(self.input_path): - print(f"错误: 找不到Excel文件 '{self.input_path}'") - return - - # 读取Excel文件 - print(f"正在读取Excel文件: {self.input_path}") - try: - df = pd.read_excel(self.input_path) - except Exception as e: - print(f"读取Excel文件时出错: {e}") - return - - # 检查列数据 - if len(df.columns) <= self.column_index: - print(f"错误: Excel文件没有足够的列,请求索引 {self.column_index},但只有 {len(df.columns)} 列") - return - - # 获取目标列名称 - target_col = df.columns[self.column_index] - print(f"目标列名称: {target_col}") - - # 准备存储完整问题的列表 - complete_questions = [] - total_questions = len(df) - - print(f"总共有{total_questions}个问题需要判断") - - # 用于线程安全的列表操作和进度计数 - complete_questions_lock = threading.Lock() - progress_counter = {"processed": 0, "complete": 0, "incomplete": 0} - progress_lock = threading.Lock() - - # 准备问题列表 - questions = [(i, str(row[target_col]), self.llm_client, total_questions) - for i, row in df.iterrows()] - - # 记录开始时间 - start_time = time.time() - - # 使用tqdm创建进度条 - print(f"开始处理问题,使用 {self.max_workers} 个并发线程...") - with tqdm(total=total_questions, desc="处理问题", unit="问题") as pbar: - # 使用线程池并发处理 - with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: - # 提交所有任务 - futures = [executor.submit( - self._process_question, - args, - complete_questions, - progress_counter, - progress_lock, - complete_questions_lock, - pbar - ) for args in questions] - - # 等待所有任务完成 - concurrent.futures.wait(futures) - - # 计算总处理时间 - processing_time = time.time() - start_time - print(f"处理完成,耗时: {processing_time:.2f}秒,平均每问题: {processing_time/total_questions:.2f}秒") - - # 将完整问题保存到Excel文件 - self.save_results_to_excel(complete_questions, total_questions) - - def test_single_question(self, question): - """ - 测试单个问题的完整性 - - 参数: - question (str): 要测试的问题 - """ - print(f"问题: {question}") - print("正在调用LLM判断问题是否完整...") - - # 调用LLM判断问题是否完整 - is_complete, full_answer = self.is_question_complete(question) - - # 从答案中提取JSON - parsed_json = self._extract_json_from_response(full_answer) - - print("\n==== LLM回复 ====") - print(full_answer) - print("================\n") - - if parsed_json: - print(f"判断结果: {'完整' if parsed_json.get('is_complete', False) else '不完整'}") - print(f"判断原因: {parsed_json.get('reason', '未提供')}") - print(f"置信度: {parsed_json.get('confidence', 0)}%") - else: - print(f"判断结果: {'完整' if is_complete else '不完整'} (简单判断)") - print("无法从回复中提取JSON结构化数据") - - -def parse_arguments(): - """解析命令行参数""" - parser = argparse.ArgumentParser(description='判断Excel文件中的问题是否完整') - parser.add_argument('-i', '--input', type=str, default=DEFAULT_EXCEL_PATH, - help=f'输入Excel文件路径 (默认: {DEFAULT_EXCEL_PATH})') - parser.add_argument('-o', '--output', type=str, default=DEFAULT_OUTPUT_PATH, - help=f'输出Excel文件路径 (默认: {DEFAULT_OUTPUT_PATH})') - parser.add_argument('-w', '--workers', type=int, default=DEFAULT_MAX_WORKERS, - help=f'并发处理的最大线程数 (默认: {DEFAULT_MAX_WORKERS})') - parser.add_argument('-c', '--column', type=int, default=0, - help='要处理的问题所在列的索引 (默认: 0,即第一列)') - parser.add_argument('-t', '--test', type=str, - help='测试单个问题,不处理Excel文件') - return parser.parse_args() - - -def main(): - """主函数""" - args = parse_arguments() - - # 创建问题完整性判断工具实例 - judge = QueryCompletenessJudge( - input_path=args.input, - output_path=args.output, - max_workers=args.workers, - column_index=args.column - ) - # 如果是测试单个问题 - if args.test: - judge.test_single_question(args.test) - return - - # 处理Excel文件 - judge.process_excel_file() - - -if __name__ == "__main__": - main() - - diff --git a/rag2_0/dify/DifyCompareTest.py b/rag2_0/dify/DifyCompareTest.py index aba5179..519b516 100755 --- a/rag2_0/dify/DifyCompareTest.py +++ b/rag2_0/dify/DifyCompareTest.py @@ -2,27 +2,39 @@ # -*- coding: utf-8 -*- import os -from rag2_0.dify.dify_client import DifyClient -from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat +import sys +import argparse +from threading import Lock import pandas as pd # 使用线程池并发执行 from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm -from rag2_0.dify.dify_tool import DifyTool import json from urllib.parse import unquote -from rag2_0.tool.WikijsTool import WikijsTool -from rag2_0.tool.html_to_md import convert_html_to_md -from rag2_0.tool.ModelTool import OpenAiLLM from dotenv import load_dotenv from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser -from threading import Lock -import sys -import argparse + +sys.path.append(os.getcwd()) +from rag2_0.dify.dify_client import DifyClient +from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat +from rag2_0.tool.WikijsTool import WikijsTool +from rag2_0.tool.html_to_md import convert_html_to_md +from rag2_0.tool.ModelTool import OpenAiLLM +from rag2_0.dify.dify_tool import DifyTool load_dotenv() +import logging +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() + ] +) + class ContentSource(BaseModel): score:int = Field(description="相关性分数") reason:str = Field(description="评分理由") @@ -32,8 +44,7 @@ class DifyComparisonTester: Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判 """ def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str, - old_workflow_api_key:str=None, wiki_excel_path:str=None, - output_path:str=None, max_workers:int=1, mode:str="both"): + old_workflow_api_key:str=None, output_path:str=None, max_workers:int=1, mode:str="both"): """ 初始化对比测试器 @@ -42,7 +53,6 @@ class DifyComparisonTester: baseurl: Dify API的基础URL new_workflow_api_key: 新流程的API密钥 old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要 - wiki_excel_path: Wiki Excel文件路径,用于获取标准答案 output_path: 输出Excel文件路径 max_workers: 最大工作线程数 mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话 @@ -64,8 +74,8 @@ class DifyComparisonTester: self.results_lock = Lock() # 读取Wiki Excel文件 - if wiki_excel_path and os.path.exists(wiki_excel_path): - self.wiki_excel = pd.read_excel(wiki_excel_path) + if excel_path and os.path.exists(excel_path): + self.wiki_excel = pd.read_excel(excel_path) else: self.wiki_excel = None @@ -78,13 +88,13 @@ class DifyComparisonTester: """ self.dify_tool.close_connection() - def get_llm(self): + def get_llm(self, **kwargs): api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_API_BASE") model = os.getenv("LLM_MODEL_NAME") - return OpenAiLLM(api_key=api_key, base_url=base_url, model=model) + return OpenAiLLM(api_key=api_key, base_url=base_url, model=model, **kwargs) - def find_wiki_link(self, query) -> str | None: + def find_wiki_link(self, row) -> str | None: """ 根据查询找出对应的词条链接 @@ -94,30 +104,11 @@ class DifyComparisonTester: Returns: str: 对应的词条链接,如果没有找到则返回None """ - # 确保query不为空 - if not query or pd.isna(query): - return None if self.wiki_excel is None: return None - # 在"新提问"列中查找匹配的行 - matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] - - # 如果找到了匹配的行,返回对应的词条链接 - if not matched_rows.empty: - return matched_rows.iloc[0]['对应词条链接'] - - # 如果没有完全匹配,尝试部分匹配 - # 去除软件名称部分(如果有) - query_parts = query.split(',', 1) - if len(query_parts) > 1: - clean_query = query_parts[1].strip() - - # 在"提问"列中查找包含清理后查询的行 - for idx, row in self.wiki_excel.iterrows(): - if pd.notna(row['提问']) and clean_query in row['提问']: - return row['对应词条链接'] - + if "词条链接" in row: + return row["词条链接"] return None def get_wiki_content(self, link) -> str: @@ -191,7 +182,7 @@ class DifyComparisonTester: Returns: str: 格式化的prompt """ - return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。 + return f"""请作为一个电力造价行业的专家,评估以下回答与标准答案的匹配程度。 标准答案: {standard_answer} @@ -199,11 +190,20 @@ class DifyComparisonTester: 待评估的回答: {answer_to_check} -请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。 -如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 -如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。 - -请严格按以下格式输出:【正确】或【错误】:""" +要求 +1、分析待评估的回答与标准答案的匹配程度(包括内容、步骤、主体等) +2、如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 +3、只要大体描述一致,即使缺失了一些步骤,也应判定为"正确"。 +3、如果待评估的回答存在明显的错误信息,应判定为"错误"。 +4、请严格按json格式输出: +{{ + "result": True or False, + "reason": "简明扼要的理由(中文)" +}} +字段说明: +result: True or False,待评估的回答是否正确 +reason: 简明扼要的理由(中文) +""" def judge_answer(self, standard_answer: str, answer: str) -> bool | None: """ @@ -218,10 +218,11 @@ class DifyComparisonTester: """ prompt = self.create_correctness_prompt(standard_answer, answer) - llm = self.get_llm() + llm = self.get_llm(response_format={"type": "json_object"}) try: response = llm.invoke(user_prompt=prompt, need_retry=True) - return "正确" in response.content + response_json = json.loads(response.content) + return response_json["result"] except Exception as e: return None @@ -513,10 +514,10 @@ content: "{content}" return old_result, new_result except Exception as e: - print(f"处理问题 '{q}' 时发生错误: {str(e)}") + logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True) return None, None - def process_question_with_judge(self, q:str): + def process_question_with_judge(self, q:str, row): """ 处理单个问题,获取新旧流程的回答并进行评判 @@ -537,7 +538,7 @@ content: "{content}" new_answer = future_new["新流程答案"] # 获取词条链接和标准答案 - wiki_url = self.find_wiki_link(query) + wiki_url = self.find_wiki_link(row) standard_answer = "" answer_title = "" @@ -546,7 +547,7 @@ content: "{content}" standard_answer = self.get_wiki_content(wiki_url) answer_title = self.get_wiki_title(wiki_url) except Exception as e: - print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}") + logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True) # 判断答案正确性 judge_result = "" @@ -563,7 +564,7 @@ content: "{content}" "问题分类": future_new["新问题分类"], "槽点信息": future_new["槽点信息"], "新流程答案": new_answer, - "回答判断": judge_result, + "回答是否正确": judge_result, "答案词条": answer_title if answer_title else "", "检索词条": future_new["新检索词条"], } @@ -576,7 +577,7 @@ content: "{content}" new_answer = future_new["新流程答案"] # 获取词条链接和标准答案 - wiki_url = self.find_wiki_link(query) + wiki_url = self.find_wiki_link(row) standard_answer = "" answer_title = "" @@ -585,7 +586,7 @@ content: "{content}" standard_answer = self.get_wiki_content(wiki_url) answer_title = self.get_wiki_title(wiki_url) except Exception as e: - print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}") + logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True) # 判断答案正确性 if standard_answer: @@ -630,25 +631,25 @@ content: "{content}" if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知": continue if row['提问中的软件名称'] != "未知": - questions.append(row['提问']) + questions.append((row['提问'],row)) else: - questions.append(f"{row['回答中的软件名称']}, {row['提问']}") + questions.append((f"{row['回答中的软件名称']}, {row['提问']}",row)) else: - questions.append(row['提问']) + questions.append((row['提问'], row)) results = [] is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None if not is_debug: # 使用多线程并发处理问题 - print("并发数量: ", self.max_workers) - print("问题数量: ", len(questions)) + logging.info(f"并发数量: {self.max_workers}") + logging.info(f"问题数量: {len(questions)}") with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 创建进度条 with tqdm(total=len(questions), desc="处理问题进度") as pbar: # 提交所有任务 futures = [] - for q in questions: - future = executor.submit(self.process_question_with_judge, q) + for q, row in questions: + future = executor.submit(self.process_question_with_judge, q, row) futures.append(future) # 处理结果 @@ -659,9 +660,9 @@ content: "{content}" results.append(result) pbar.update(1) else: - for q in questions: - result = self.process_question_with_judge(q) - print(json.dumps(result,ensure_ascii=False,indent=2)) + for q, row in questions: + result = self.process_question_with_judge(q, row) + logging.info(json.dumps(result,ensure_ascii=False,indent=2)) if result is not None: results.append(result) @@ -687,24 +688,29 @@ content: "{content}" if __name__ == "__main__": # 创建命令行参数解析器 + os.environ["DIFY_BASEURL"] = "http://10.1.16.39/v1" + os.environ["DIFY_NEW_API_KEY"] = "app-qxsSybCs7ABiKlC1JabTYVn6" + os.environ["DIFY_OLD_API_KEY"] = "app-wUdkWJx5zeOvmvBUZizMoSw3" + + os.environ["DIFY_PG_HOST"] = "10.1.16.39" + os.environ["DIFY_PG_PORT"] = "5432" + os.environ["DIFY_PG_USER"] = "postgres" + os.environ["DIFY_PG_PASSWORD"] = "difyai123456" + os.environ["DIFY_PG_DATABASE"] = "dify" - default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx") - default_wiki_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx") + default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/740条(dislike)_存在标准词条.xlsx") parser = argparse.ArgumentParser(description='Dify对话测试工具') parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only', help='测试模式: new_only表示仅测试新对话, both表示测试新老对话') parser.add_argument('--excel_path', type=str, default=default_excel_path, help='包含问题的Excel文件路径') - parser.add_argument('--baseurl', type=str, default="http://172.20.0.145/v1", + parser.add_argument('--baseurl', type=str, default=os.getenv("DIFY_BASEURL"), help='Dify API的基础URL') - parser.add_argument('--new_api_key', type=str, default="app-qxsSybCs7ABiKlC1JabTYVn6", + parser.add_argument('--new_api_key', type=str, default=os.getenv("DIFY_NEW_API_KEY"), help='新流程的API密钥') - parser.add_argument('--old_api_key', type=str, default="app-wUdkWJx5zeOvmvBUZizMoSw3", + parser.add_argument('--old_api_key', type=str, default=os.getenv("DIFY_OLD_API_KEY"), help='旧流程的API密钥') - parser.add_argument('--wiki_excel_path', type=str, - default=default_wiki_excel_path, - help='Wiki Excel文件路径,用于获取标准答案') parser.add_argument('--output_path', type=str, default=None, help='输出Excel文件路径') parser.add_argument('--max_workers', type=int, default=5, @@ -715,7 +721,7 @@ if __name__ == "__main__": # 检查Excel文件是否存在 if not os.path.exists(args.excel_path): - print(f"错误:Excel文件不存在: {args.excel_path}") + logging.error(f"错误:Excel文件不存在: {args.excel_path}", exc_info=True) exit(1) # 创建测试器并运行 @@ -724,7 +730,6 @@ if __name__ == "__main__": baseurl=args.baseurl, new_workflow_api_key=args.new_api_key, old_workflow_api_key=args.old_api_key if args.mode == "both" else None, - wiki_excel_path=args.wiki_excel_path, output_path=args.output_path, max_workers=args.max_workers, mode=args.mode @@ -732,4 +737,4 @@ if __name__ == "__main__": # 运行对比测试(带评判) output_file = tester.run_comparison(with_judge=True) - print(f"测试结果已保存至: {output_file}") + logging.info(f"测试结果已保存至: {output_file}") diff --git a/rag2_0/dify/dify_tool.py b/rag2_0/dify/dify_tool.py index 4afe3ce..b734f0c 100755 --- a/rag2_0/dify/dify_tool.py +++ b/rag2_0/dify/dify_tool.py @@ -39,11 +39,11 @@ class PgSql: try: # 连接数据库 self.connection = psycopg2.connect( - user="postgres", - password="difyai123456", - host="172.20.0.145", - port=5432, - database="dify" + user=os.getenv("DIFY_PG_USER"), + password=os.getenv("DIFY_PG_PASSWORD"), + host=os.getenv("DIFY_PG_HOST"), + port=os.getenv("DIFY_PG_PORT"), + database=os.getenv("DIFY_PG_DATABASE") ) except (Exception, psycopg2.Error) as error: @@ -160,11 +160,11 @@ class PgSql: """, (workflow_run_id,) ) - result = cursor.fetchall() - if result: - colnames = [desc[0] for desc in cursor.description] - return [dict(zip(colnames, row)) for row in result] - return None + result = cursor.fetchall() + if result: + colnames = [desc[0] for desc in cursor.description] + return [dict(zip(colnames, row)) for row in result] + return None except (Exception, psycopg2.Error) as error: raise Exception(f"Error while getting workflow_node_executions_info: {error}") @@ -263,7 +263,8 @@ class BaseWorkflowChat: 析构函数,在对象被销毁时自动关闭数据库连接。 确保在对象生命周期结束时释放数据库资源。 """ - self.dify_tool.close_connection() + # DifyTool类已经在其__del__方法中关闭了数据库连接,无需在此重复调用 + pass def create_chat_message(self, query: str): """ @@ -464,7 +465,7 @@ class NewWorkflowChat(BaseWorkflowChat): elif workflow_node["title"] == "提取处理后的知识": outputs = json.loads(workflow_node["outputs"])["knowledge_list"] retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs, reranker_sorce_info=reranker_sorce) - elif workflow_node["title"] == "问题优化结果解析": + elif workflow_node["title"] == "意图识别结果解析": outputs = json.loads(workflow_node["outputs"]) rewrite_query = outputs["optimize_query"] llm_result_json = json.loads(workflow_node['inputs'])["llm_result"]