Compare commits

...

2 Commits

7 changed files with 101 additions and 717 deletions
+2 -11
View File
@@ -3034,11 +3034,6 @@
"synonymous": [],
"description": "VC运行时库的安装程序"
},
{
"name": "技改检修计价通T1",
"synonymous": [],
"description": "电力行业技改检修计价软件的名称"
},
{
"name": "博微电力建设计价通",
"synonymous": [],
@@ -9885,11 +9880,6 @@
],
"description": "报表参数配置项,控制材料/设备的显示筛选"
},
{
"name": "技改检修清单T1",
"synonymous": [],
"description": "软件版本标识,对应技术改造和检修工程清单类型"
},
{
"name": "导入EXCEL版物料库",
"synonymous": [],
@@ -10023,7 +10013,8 @@
{
"name": "技改检修清单计价通T1软件",
"synonymous": [
"技改检修清单软件"
"技改检修清单软件",
"技改检修清单T1"
],
"description": "电力行业用于技改检修清单计价的软件名称"
},
Binary file not shown.
Binary file not shown.
-618
View File
@@ -1,618 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
完整性问题判断工具
此脚本用于读取Excel文件中的问题,调用LLM判断问题是否完整,并将结果保存到Excel文件中。
用法示例:
python judge_query_full.py -i "问题数据.xlsx" -o "完整问题结果.xlsx" -w 50 -c 0
命令行参数:
-i, --input: 输入Excel文件路径
-o, --output: 输出Excel文件路径
-w, --workers: 并发处理的最大线程数
-c, --column: 要处理的问题所在列的索引(从0开始)
-t, --test: 测试单个问题,不处理Excel文件
"""
import pandas as pd
import json
import os
import time
import re
import argparse
import traceback
from pathlib import Path
from rag2_0.tool.ModelTool import OpenAiLLM
from rag2_0.tool.APIKeyManager import APIKeyManager
from openpyxl.utils import get_column_letter
from openpyxl.styles import Alignment, PatternFill, Font, Border, Side
from tqdm import tqdm
import concurrent.futures
import threading
# Default settings, used when the caller / command line does not override them.
DEFAULT_EXCEL_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据.xlsx"  # default input workbook
DEFAULT_OUTPUT_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据_完整问题结果.xlsx"  # default result workbook
DEFAULT_MAX_WORKERS = 50  # default size of the worker thread pool
class QueryCompletenessJudge:
"""
问题完整性判断工具类
用于评估问题是否完整,并将结果保存到Excel文件中。
可以批量处理Excel文件中的问题,也可以测试单个问题。
"""
def __init__(self, input_path=DEFAULT_EXCEL_PATH, output_path=DEFAULT_OUTPUT_PATH,
             max_workers=DEFAULT_MAX_WORKERS, column_index=0):
    """
    Set up the completeness judge.

    Args:
        input_path (str): Path of the Excel workbook to read questions from.
        output_path (str): Path of the Excel workbook the results are written to.
        max_workers (int): Maximum number of concurrent worker threads.
        column_index (int): Zero-based index of the column holding the questions.
    """
    self.input_path, self.output_path = input_path, output_path
    self.max_workers, self.column_index = max_workers, column_index
    # The client is built eagerly, once per instance, via _create_llm_client().
    self.llm_client = self._create_llm_client()
def _extract_json_from_response(self, full_answer):
"""
从LLM响应中提取JSON部分
参数:
full_answer (str): LLM的完整响应文本
返回:
dict: 解析后的JSON对象,如果解析失败则返回None
"""
# 尝试从回答中提取JSON部分
json_match = re.search(r'```json\s*(.*?)\s*```', full_answer, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# 如果没有找到```json```格式,尝试寻找普通的JSON对象
json_match = re.search(r'({[\s\S]*"is_complete"[\s\S]*})', full_answer)
if json_match:
json_str = json_match.group(1)
else:
# 如果仍然没有找到,返回None
return None
try:
# 解析JSON
return json.loads(json_str)
except json.JSONDecodeError:
return None
def _create_llm_prompt(self, question):
"""
创建LLM提示词
参数:
question (str): 需要判断完整性的问题
返回:
str: 格式化后的提示词
"""
return f"""你是一个电力造价行业专家,用户正在使用电力造价软件,并提出了相关问题。请分析以下问题是否完整。
问题:{question}
首先,分析这个问题的结构和内容,思考它是否包含足够的信息来表达清晰的意图。
考虑以下几点:
1. 问题是否有明确的核心意图,不需要面面俱到
2. 问题是否缺少必要的上下文
3. **问题如果涉及软件相关,则只需要包含:软件名称、软件功能或软件目的即可**
在你的分析之后,请用JSON格式给出最终结论,格式如下:
```json
{{
"is_complete": true或false,
"reason": "判断原因的简要说明",
"confidence": 0到100之间的数值,表示你对判断的置信度
}}
```
请确保JSON格式正确,以便于程序解析。"""
def _create_llm_client(self, api_key=None):
    """
    Build the LLM client used for the completeness checks.

    Args:
        api_key (str, optional): API key; when None it is obtained from
            APIKeyManager.get_api_key().

    Returns:
        OpenAiLLM: The configured client instance.
    """
    resolved_key = APIKeyManager.get_api_key() if api_key is None else api_key
    client_options = {
        "api_key": resolved_key,
        "base_url": "https://api.siliconflow.cn/v1",  # adjust to the actual deployment
        "model": "deepseek-ai/DeepSeek-V3",  # adjust to the actual deployment
        "temperature": 0.2,
        # NOTE(review): the prompt asks for an analysis followed by a JSON
        # block; 100 tokens looks small for that - confirm replies are not cut off.
        "max_tokens": 100,
    }
    return OpenAiLLM(**client_options)
def is_question_complete(self, question):
    """
    Ask the LLM whether a question is complete.

    The call is attempted once and retried up to three more times when an
    exception is raised, sleeping 2, 4 and 8 seconds between attempts.

    Args:
        question (str): The question to judge.

    Returns:
        tuple: (bool, str) - the verdict and the full LLM reply. When every
        attempt fails the verdict is False and the string describes the error.
    """
    max_retries = 3
    retry_delay = 2  # seconds; doubled after every failed attempt

    # The prompt does not change between attempts, so build it once.
    prompt = self._create_llm_prompt(question)

    for attempt in range(max_retries + 1):
        try:
            response = self.llm_client.invoke(prompt)
            # The client may return an object with .content or a plain value.
            full_answer = response.content if hasattr(response, 'content') else str(response)

            result = self._extract_json_from_response(full_answer)
            if isinstance(result, dict) and result:
                verdict = result.get("is_complete", False)
                # A quoted "false" would otherwise be truthy.
                if isinstance(verdict, str):
                    verdict = verdict.strip().lower() == "true"
                return bool(verdict), full_answer

            # No structured verdict: fall back to a keyword check on the start
            # of the reply. "不完整" contains "完整", so it is excluded explicitly.
            head = full_answer[:100]
            return ("完整" in head and "不完整" not in head), full_answer
        except Exception as e:
            if attempt < max_retries:
                # Back off before the next attempt.
                time.sleep(retry_delay)
                retry_delay *= 2
                continue
            # Retries exhausted: report the failure and give up.
            stack_trace = traceback.format_exc()
            print(f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}")
            print(f"堆栈跟踪信息:\n{stack_trace}")
            return False, f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}\n堆栈摘要: {type(e).__name__}"

    # Defensive: every loop iteration returns or continues, so this is not expected to run.
    return False, "未知错误:重试机制逻辑错误"
def _process_question(self, args, complete_questions, progress_counter, progress_lock, complete_questions_lock, pbar):
"""
处理单个问题并更新进度
参数:
args (tuple): 包含问题索引、问题内容、LLM客户端和总问题数的元组
complete_questions (list): 存储完整问题的列表
progress_counter (dict): 进度计数器
progress_lock (threading.Lock): 进度锁
complete_questions_lock (threading.Lock): 完整问题列表锁
pbar (tqdm): 进度条对象
"""
index, question, llm_client, total_questions = args
# 跳过空问题
if pd.isna(question) or question.strip() == "":
with progress_lock:
progress_counter["processed"] += 1
pbar.update(1)
return None
# 调用LLM判断问题是否完整
is_complete, full_answer = self.is_question_complete(question)
if is_complete:
# 从答案中提取JSON
parsed_json = self._extract_json_from_response(full_answer)
if parsed_json:
# 构造包含解析出的JSON信息的结果
result = {
"问题": question,
"LLM回复": full_answer,
"完整性": "完整" if parsed_json.get("is_complete", False) else "不完整",
"原因": parsed_json.get("reason", "未提供"),
"置信度": parsed_json.get("confidence", 0)
}
# 更新计数
with progress_lock:
if result["完整性"] == "完整":
progress_counter["complete"] += 1
else:
progress_counter["incomplete"] += 1
else:
# JSON解析失败,只保存原始回答
result = {
"问题": question,
"LLM回复": full_answer,
"完整性": "完整"
}
# 更新计数
with progress_lock:
progress_counter["complete"] += 1
with complete_questions_lock:
complete_questions.append(result)
else:
with progress_lock:
progress_counter["incomplete"] += 1
# 更新进度条
with progress_lock:
progress_counter["processed"] += 1
# 更新进度条描述
pbar.set_postfix(
完整=progress_counter["complete"],
不完整=progress_counter["incomplete"],
完整率=f"{progress_counter['complete']/max(1, progress_counter['processed']):.1%}"
)
pbar.update(1)
def _shorten_response(self, response):
"""
截断LLM响应,提取重要信息
参数:
response (str): 原始LLM响应
返回:
str: 截断后的响应
"""
# 保留思考过程的前200个字符和JSON部分
json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
if json_match:
json_part = json_match.group(0)
prefix = response[:200] + "..." if len(response) > 200 else response
return f"{prefix}\n\n{json_part}"
return response[:500] + "..." if len(response) > 500 else response
def _prepare_excel_dataframe(self, complete_questions):
"""
将结果处理为DataFrame用于Excel输出
参数:
complete_questions (list): 完整问题列表
返回:
pandas.DataFrame: 处理后的DataFrame
"""
# 将结果列表转换为DataFrame
result_df = pd.DataFrame(complete_questions)
# 处理LLM回复列,截取一定长度以避免Excel单元格过大
if "LLM回复" in result_df.columns:
result_df["LLM回复"] = result_df["LLM回复"].apply(self._shorten_response)
# 调整列的顺序,确保重要列在前面
column_order = ["问题", "完整性", "置信度", "原因", "LLM回复"]
# 过滤掉不存在的列
column_order = [col for col in column_order if col in result_df.columns]
# 确保所有剩余的列也被包含
for col in result_df.columns:
if col not in column_order:
column_order.append(col)
# 重新排序列
return result_df[column_order]
def _set_excel_column_widths(self, worksheet):
    """
    Set the width of every column based on its header text.

    Args:
        worksheet (openpyxl.worksheet.worksheet.Worksheet): Sheet to adjust.
    """
    # Width per known header; any other column gets the fallback width.
    known_widths = {"问题": 40, "LLM回复": 60, "原因": 30, "完整性": 10, "置信度": 10}
    fallback_width = 15

    for col_idx in range(1, worksheet.max_column + 1):
        letter = get_column_letter(col_idx)
        header = worksheet[f"{letter}1"].value
        worksheet.column_dimensions[letter].width = known_widths.get(header, fallback_width)
def _apply_excel_cell_styles(self, worksheet):
    """
    Apply alignment, borders and fills to every cell of the sheet.

    Args:
        worksheet (openpyxl.worksheet.worksheet.Worksheet): Sheet to style.

    Returns:
        openpyxl.styles.Border: The border style, returned so the statistics
        section can reuse it.
    """
    border = Border(
        left=Side(style='thin'),
        right=Side(style='thin'),
        top=Side(style='thin'),
        bottom=Side(style='thin')
    )
    wrap_alignment = Alignment(wrap_text=True, vertical="top")
    header_font = Font(bold=True)
    header_fill = PatternFill(start_color="DDEBF7", end_color="DDEBF7", fill_type="solid")
    complete_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
    incomplete_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

    all_rows = worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
                                   min_col=1, max_col=worksheet.max_column)
    for cells in all_rows:
        for cell in cells:
            cell.alignment = wrap_alignment
            cell.border = border

            # Header row: highlighted and bold.
            if cell.row == 1:
                cell.fill = header_fill
                cell.font = header_font
                continue

            # Data rows: colour only the completeness column by its value.
            if worksheet.cell(row=1, column=cell.column).value != "完整性":
                continue
            cell.fill = complete_fill if cell.value == "完整" else incomplete_fill

    return border
def _add_statistics_to_excel(self, worksheet, complete_questions, total_rows, total_questions, border):
    """
    Append a statistics section below the data rows of the sheet.

    The totals are based on ``total_questions`` (all questions that were
    judged). Previously they were based on ``total_rows``, i.e. only the rows
    that were saved, which made the incomplete count and the ratio meaningless.

    Args:
        worksheet (openpyxl.worksheet.worksheet.Worksheet): Sheet to extend.
        complete_questions (list): Result dicts of the saved questions.
        total_rows (int): Number of saved rows; used as the total only when
            ``total_questions`` is None.
        total_questions (int): Number of questions that were judged.
        border (openpyxl.styles.Border): Border applied to the statistics cells.

    Returns:
        int: Number of questions marked complete.
    """
    complete_count = sum(1 for item in complete_questions if item.get("完整性") == "完整")
    overall_total = total_questions if total_questions is not None else total_rows
    incomplete_count = max(overall_total - complete_count, 0)
    ratio_text = f"{complete_count/overall_total:.2%}" if overall_total > 0 else "0%"

    complete_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
    incomplete_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

    # Spacer row, then the section title.
    worksheet.append([""])
    stat_row = worksheet.max_row + 1
    worksheet.cell(row=stat_row, column=1, value="统计信息").font = Font(bold=True)

    # (label, value, optional fill) for the rows below the title.
    stat_lines = (
        ("总问题数", overall_total, None),
        ("完整问题数", complete_count, complete_fill),
        ("不完整问题数", incomplete_count, incomplete_fill),
        ("完整问题比例", ratio_text, None),
    )
    for offset, (label, value, fill) in enumerate(stat_lines, start=1):
        worksheet.cell(row=stat_row + offset, column=1, value=label)
        value_cell = worksheet.cell(row=stat_row + offset, column=2, value=value)
        if fill is not None:
            value_cell.fill = fill

    # Border around the title row and the four statistics rows.
    for r in range(stat_row, stat_row + 5):
        for c in range(1, 3):
            worksheet.cell(row=r, column=c).border = border

    return complete_count
def save_results_to_excel(self, complete_questions, total_questions):
    """
    Write the results to the output workbook and style it.

    Args:
        complete_questions (list): Result dicts of the saved questions.
        total_questions (int): Number of questions that were judged.
    """
    # Nothing to save: report it and leave the output file untouched.
    if not complete_questions:
        print("没有找到完整的问题。")
        return

    result_df = self._prepare_excel_dataframe(complete_questions)
    total_rows = len(result_df)
    result_df.to_excel(self.output_path, index=False, engine='openpyxl')

    # Re-open the written file to apply widths, styles and statistics.
    from openpyxl import load_workbook
    workbook = load_workbook(self.output_path)
    sheet = workbook.active
    self._set_excel_column_widths(sheet)
    border = self._apply_excel_cell_styles(sheet)
    complete_count = self._add_statistics_to_excel(
        sheet, complete_questions, total_rows, total_questions, border
    )
    workbook.save(self.output_path)

    # Summary on the console.
    print(f"处理完成。共有{complete_count}/{total_questions}个完整问题被保存到 {self.output_path}")
    if total_questions > 0:
        print(f"完整问题比例: {complete_count/total_questions:.2%}")
    else:
        print("完整问题比例: 0%")
def process_excel_file(self):
    """
    Judge every question of the input workbook and save the results.

    Reads the workbook at ``self.input_path``, judges the questions of the
    column at ``self.column_index`` concurrently, and hands the collected
    results to ``save_results_to_excel``. Problems with the input (missing
    file, unreadable file, too few columns) are printed and end the run.
    """
    if not os.path.exists(self.input_path):
        print(f"错误: 找不到Excel文件 '{self.input_path}'")
        return

    print(f"正在读取Excel文件: {self.input_path}")
    try:
        df = pd.read_excel(self.input_path)
    except Exception as e:
        print(f"读取Excel文件时出错: {e}")
        return

    if len(df.columns) <= self.column_index:
        print(f"错误: Excel文件没有足够的列,请求索引 {self.column_index},但只有 {len(df.columns)}")
        return

    target_col = df.columns[self.column_index]
    print(f"目标列名称: {target_col}")

    complete_questions = []
    total_questions = len(df)
    print(f"总共有{total_questions}个问题需要判断")

    # Shared state for the worker threads.
    complete_questions_lock = threading.Lock()
    progress_counter = {"processed": 0, "complete": 0, "incomplete": 0}
    progress_lock = threading.Lock()

    # Blank cells are passed through as NaN so the worker's empty-question
    # check can skip them; str() would turn them into the text "nan".
    questions = [
        (i, value if pd.isna(value) else str(value), self.llm_client, total_questions)
        for i, value in df.iloc[:, self.column_index].items()
    ]

    start_time = time.time()
    print(f"开始处理问题,使用 {self.max_workers} 个并发线程...")
    with tqdm(total=total_questions, desc="处理问题", unit="问题") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(
                self._process_question,
                args,
                complete_questions,
                progress_counter,
                progress_lock,
                complete_questions_lock,
                pbar
            ) for args in questions]
            concurrent.futures.wait(futures)

    # Exceptions raised inside a worker are stored on its future; surface
    # them instead of dropping them silently.
    failures = [err for err in (future.exception() for future in futures) if err is not None]
    if failures:
        print(f"警告: {len(failures)} 个问题处理时发生未捕获的异常,例如: {failures[0]!r}")

    processing_time = time.time() - start_time
    # Guard the average against an empty workbook.
    average_time = processing_time / total_questions if total_questions else 0.0
    print(f"处理完成,耗时: {processing_time:.2f}秒,平均每问题: {average_time:.2f}")

    self.save_results_to_excel(complete_questions, total_questions)
def test_single_question(self, question):
"""
测试单个问题的完整性
参数:
question (str): 要测试的问题
"""
print(f"问题: {question}")
print("正在调用LLM判断问题是否完整...")
# 调用LLM判断问题是否完整
is_complete, full_answer = self.is_question_complete(question)
# 从答案中提取JSON
parsed_json = self._extract_json_from_response(full_answer)
print("\n==== LLM回复 ====")
print(full_answer)
print("================\n")
if parsed_json:
print(f"判断结果: {'完整' if parsed_json.get('is_complete', False) else '不完整'}")
print(f"判断原因: {parsed_json.get('reason', '未提供')}")
print(f"置信度: {parsed_json.get('confidence', 0)}%")
else:
print(f"判断结果: {'完整' if is_complete else '不完整'} (简单判断)")
print("无法从回复中提取JSON结构化数据")
def parse_arguments():
    """Parse the command-line arguments and return the resulting namespace."""
    parser = argparse.ArgumentParser(description='判断Excel文件中的问题是否完整')

    # (flags, add_argument keyword options), registered in this order.
    option_specs = (
        (('-i', '--input'), dict(type=str, default=DEFAULT_EXCEL_PATH,
                                 help=f'输入Excel文件路径 (默认: {DEFAULT_EXCEL_PATH})')),
        (('-o', '--output'), dict(type=str, default=DEFAULT_OUTPUT_PATH,
                                  help=f'输出Excel文件路径 (默认: {DEFAULT_OUTPUT_PATH})')),
        (('-w', '--workers'), dict(type=int, default=DEFAULT_MAX_WORKERS,
                                   help=f'并发处理的最大线程数 (默认: {DEFAULT_MAX_WORKERS})')),
        (('-c', '--column'), dict(type=int, default=0,
                                  help='要处理的问题所在列的索引 (默认: 0,即第一列)')),
        (('-t', '--test'), dict(type=str,
                                help='测试单个问题,不处理Excel文件')),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)

    return parser.parse_args()
def main():
    """Command-line entry point: test one question or process the workbook."""
    cli = parse_arguments()

    judge = QueryCompletenessJudge(
        input_path=cli.input,
        output_path=cli.output,
        max_workers=cli.workers,
        column_index=cli.column
    )

    # Without -t/--test the whole workbook is processed.
    if not cli.test:
        judge.process_excel_file()
        return

    # With -t/--test only that single question is judged.
    judge.test_single_question(cli.test)
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":
    main()
+79 -74
View File
@@ -2,27 +2,39 @@
# -*- coding: utf-8 -*-
import os
from rag2_0.dify.dify_client import DifyClient
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
import sys
import argparse
from threading import Lock
import pandas as pd
# 使用线程池并发执行
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from rag2_0.dify.dify_tool import DifyTool
import json
from urllib.parse import unquote
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from threading import Lock
import sys
import argparse
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import DifyClient
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from rag2_0.dify.dify_tool import DifyTool
load_dotenv()
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
class ContentSource(BaseModel):
score:int = Field(description="相关性分数")
reason:str = Field(description="评分理由")
@@ -32,8 +44,7 @@ class DifyComparisonTester:
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判
"""
def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str,
old_workflow_api_key:str=None, wiki_excel_path:str=None,
output_path:str=None, max_workers:int=1, mode:str="both"):
old_workflow_api_key:str=None, output_path:str=None, max_workers:int=1, mode:str="both"):
"""
初始化对比测试器
@@ -42,7 +53,6 @@ class DifyComparisonTester:
baseurl: Dify API的基础URL
new_workflow_api_key: 新流程的API密钥
old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要
wiki_excel_path: Wiki Excel文件路径,用于获取标准答案
output_path: 输出Excel文件路径
max_workers: 最大工作线程数
mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话
@@ -64,8 +74,8 @@ class DifyComparisonTester:
self.results_lock = Lock()
# 读取Wiki Excel文件
if wiki_excel_path and os.path.exists(wiki_excel_path):
self.wiki_excel = pd.read_excel(wiki_excel_path)
if excel_path and os.path.exists(excel_path):
self.wiki_excel = pd.read_excel(excel_path)
else:
self.wiki_excel = None
@@ -78,13 +88,13 @@ class DifyComparisonTester:
"""
self.dify_tool.close_connection()
def get_llm(self):
def get_llm(self, **kwargs):
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model = os.getenv("LLM_MODEL_NAME")
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model)
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model, **kwargs)
def find_wiki_link(self, query) -> str | None:
def find_wiki_link(self, row) -> str | None:
"""
根据查询找出对应的词条链接
@@ -94,30 +104,11 @@ class DifyComparisonTester:
Returns:
str: 对应的词条链接,如果没有找到则返回None
"""
# 确保query不为空
if not query or pd.isna(query):
return None
if self.wiki_excel is None:
return None
# 在"新提问"列中查找匹配的行
matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query]
# 如果找到了匹配的行,返回对应的词条链接
if not matched_rows.empty:
return matched_rows.iloc[0]['对应词条链接']
# 如果没有完全匹配,尝试部分匹配
# 去除软件名称部分(如果有)
query_parts = query.split(',', 1)
if len(query_parts) > 1:
clean_query = query_parts[1].strip()
# 在"提问"列中查找包含清理后查询的行
for idx, row in self.wiki_excel.iterrows():
if pd.notna(row['提问']) and clean_query in row['提问']:
return row['对应词条链接']
if "词条链接" in row:
return row["词条链接"]
return None
def get_wiki_content(self, link) -> str:
@@ -191,7 +182,7 @@ class DifyComparisonTester:
Returns:
str: 格式化的prompt
"""
return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。
return f"""请作为一个电力造价行业的专家,评估以下回答与标准答案的匹配程度。
标准答案:
{standard_answer}
@@ -199,11 +190,20 @@ class DifyComparisonTester:
待评估的回答:
{answer_to_check}
请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确""错误",不需要其他解释。
如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"
请严格按以下格式输出:【正确】或【错误】:"""
要求
1、分析待评估的回答与标准答案的匹配程度(包括内容、步骤、主体等)
2、如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
3、只要大体描述一致,即使缺失了一些步骤,也应判定为"正确"
3、如果待评估的回答存在明显的错误信息,应判定为"错误"
4、请严格按json格式输出:
{{
"result": True or False,
"reason": "简明扼要的理由(中文)"
}}
字段说明:
result: True or False,待评估的回答是否正确
reason: 简明扼要的理由(中文)
"""
def judge_answer(self, standard_answer: str, answer: str) -> bool | None:
"""
@@ -218,10 +218,11 @@ class DifyComparisonTester:
"""
prompt = self.create_correctness_prompt(standard_answer, answer)
llm = self.get_llm()
llm = self.get_llm(response_format={"type": "json_object"})
try:
response = llm.invoke(user_prompt=prompt, need_retry=True)
return "正确" in response.content
response_json = json.loads(response.content)
return response_json["result"]
except Exception as e:
return None
@@ -513,10 +514,10 @@ content: "{content}"
return old_result, new_result
except Exception as e:
print(f"处理问题 '{q}' 时发生错误: {str(e)}")
logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True)
return None, None
def process_question_with_judge(self, q:str):
def process_question_with_judge(self, q:str, row):
"""
处理单个问题,获取新旧流程的回答并进行评判
@@ -537,7 +538,7 @@ content: "{content}"
new_answer = future_new["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
wiki_url = self.find_wiki_link(row)
standard_answer = ""
answer_title = ""
@@ -546,7 +547,7 @@ content: "{content}"
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
# 判断答案正确性
judge_result = ""
@@ -563,7 +564,7 @@ content: "{content}"
"问题分类": future_new["新问题分类"],
"槽点信息": future_new["槽点信息"],
"新流程答案": new_answer,
"回答判断": judge_result,
"回答是否正确": judge_result,
"答案词条": answer_title if answer_title else "",
"检索词条": future_new["新检索词条"],
}
@@ -576,7 +577,7 @@ content: "{content}"
new_answer = future_new["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
wiki_url = self.find_wiki_link(row)
standard_answer = ""
answer_title = ""
@@ -585,7 +586,7 @@ content: "{content}"
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
# 判断答案正确性
if standard_answer:
@@ -630,25 +631,25 @@ content: "{content}"
if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知":
continue
if row['提问中的软件名称'] != "未知":
questions.append(row['提问'])
questions.append((row['提问'],row))
else:
questions.append(f"{row['回答中的软件名称']}, {row['提问']}")
questions.append((f"{row['回答中的软件名称']}, {row['提问']}",row))
else:
questions.append(row['提问'])
questions.append((row['提问'], row))
results = []
is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
if not is_debug:
# 使用多线程并发处理问题
print("并发数量: ", self.max_workers)
print("问题数量: ", len(questions))
logging.info(f"并发数量: {self.max_workers}")
logging.info(f"问题数量: {len(questions)}")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 创建进度条
with tqdm(total=len(questions), desc="处理问题进度") as pbar:
# 提交所有任务
futures = []
for q in questions:
future = executor.submit(self.process_question_with_judge, q)
for q, row in questions:
future = executor.submit(self.process_question_with_judge, q, row)
futures.append(future)
# 处理结果
@@ -659,9 +660,9 @@ content: "{content}"
results.append(result)
pbar.update(1)
else:
for q in questions:
result = self.process_question_with_judge(q)
print(json.dumps(result,ensure_ascii=False,indent=2))
for q, row in questions:
result = self.process_question_with_judge(q, row)
logging.info(json.dumps(result,ensure_ascii=False,indent=2))
if result is not None:
results.append(result)
@@ -687,24 +688,29 @@ content: "{content}"
if __name__ == "__main__":
# 创建命令行参数解析器
os.environ["DIFY_BASEURL"] = "http://10.1.16.39/v1"
os.environ["DIFY_NEW_API_KEY"] = "app-qxsSybCs7ABiKlC1JabTYVn6"
os.environ["DIFY_OLD_API_KEY"] = "app-wUdkWJx5zeOvmvBUZizMoSw3"
os.environ["DIFY_PG_HOST"] = "10.1.16.39"
os.environ["DIFY_PG_PORT"] = "5432"
os.environ["DIFY_PG_USER"] = "postgres"
os.environ["DIFY_PG_PASSWORD"] = "difyai123456"
os.environ["DIFY_PG_DATABASE"] = "dify"
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
default_wiki_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/740条(dislike)_存在标准词条.xlsx")
parser = argparse.ArgumentParser(description='Dify对话测试工具')
parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only',
help='测试模式: new_only表示仅测试新对话, both表示测试新老对话')
parser.add_argument('--excel_path', type=str,
default=default_excel_path,
help='包含问题的Excel文件路径')
parser.add_argument('--baseurl', type=str, default="http://172.20.0.145/v1",
parser.add_argument('--baseurl', type=str, default=os.getenv("DIFY_BASEURL"),
help='Dify API的基础URL')
parser.add_argument('--new_api_key', type=str, default="app-qxsSybCs7ABiKlC1JabTYVn6",
parser.add_argument('--new_api_key', type=str, default=os.getenv("DIFY_NEW_API_KEY"),
help='新流程的API密钥')
parser.add_argument('--old_api_key', type=str, default="app-wUdkWJx5zeOvmvBUZizMoSw3",
parser.add_argument('--old_api_key', type=str, default=os.getenv("DIFY_OLD_API_KEY"),
help='旧流程的API密钥')
parser.add_argument('--wiki_excel_path', type=str,
default=default_wiki_excel_path,
help='Wiki Excel文件路径,用于获取标准答案')
parser.add_argument('--output_path', type=str, default=None,
help='输出Excel文件路径')
parser.add_argument('--max_workers', type=int, default=5,
@@ -715,7 +721,7 @@ if __name__ == "__main__":
# 检查Excel文件是否存在
if not os.path.exists(args.excel_path):
print(f"错误:Excel文件不存在: {args.excel_path}")
logging.error(f"错误:Excel文件不存在: {args.excel_path}", exc_info=True)
exit(1)
# 创建测试器并运行
@@ -724,7 +730,6 @@ if __name__ == "__main__":
baseurl=args.baseurl,
new_workflow_api_key=args.new_api_key,
old_workflow_api_key=args.old_api_key if args.mode == "both" else None,
wiki_excel_path=args.wiki_excel_path,
output_path=args.output_path,
max_workers=args.max_workers,
mode=args.mode
@@ -732,4 +737,4 @@ if __name__ == "__main__":
# 运行对比测试(带评判)
output_file = tester.run_comparison(with_judge=True)
print(f"测试结果已保存至: {output_file}")
logging.info(f"测试结果已保存至: {output_file}")
+13 -12
View File
@@ -39,11 +39,11 @@ class PgSql:
try:
# 连接数据库
self.connection = psycopg2.connect(
user="postgres",
password="difyai123456",
host="172.20.0.145",
port=5432,
database="dify"
user=os.getenv("DIFY_PG_USER"),
password=os.getenv("DIFY_PG_PASSWORD"),
host=os.getenv("DIFY_PG_HOST"),
port=os.getenv("DIFY_PG_PORT"),
database=os.getenv("DIFY_PG_DATABASE")
)
except (Exception, psycopg2.Error) as error:
@@ -160,11 +160,11 @@ class PgSql:
""",
(workflow_run_id,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting workflow_node_executions_info: {error}")
@@ -263,7 +263,8 @@ class BaseWorkflowChat:
析构函数,在对象被销毁时自动关闭数据库连接。
确保在对象生命周期结束时释放数据库资源。
"""
self.dify_tool.close_connection()
# DifyTool类已经在其__del__方法中关闭了数据库连接,无需在此重复调用
pass
def create_chat_message(self, query: str):
"""
@@ -464,7 +465,7 @@ class NewWorkflowChat(BaseWorkflowChat):
elif workflow_node["title"] == "提取处理后的知识":
outputs = json.loads(workflow_node["outputs"])["knowledge_list"]
retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs, reranker_sorce_info=reranker_sorce)
elif workflow_node["title"] == "问题优化结果解析":
elif workflow_node["title"] == "意图识别结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
llm_result_json = json.loads(workflow_node['inputs'])["llm_result"]
+7 -2
View File
@@ -62,8 +62,13 @@ def intent_recognize():
# 获取单例实例并使用线程锁保护关键操作
recognizer = RecognizerSingleton.get_instance()
result = recognizer.process_query(query, conversation_context, chat_history, previous_slots)
result = recognizer.process_query(query=query,
conversation_context=conversation_context,
chat_history=chat_history,
previous_slots=previous_slots,
use_jieba=False,
enable_query_expansion=True)
end_time = time.time()
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S %z")
logger.info(f"[{os.getpid()}] 意图识别耗时: {end_time - start_time:.2f}")