优化Dify对比测试器,新增命令行参数解析功能,支持选择测试模式(仅新流程或新老流程对比),重构问题处理逻辑以提高可读性和维护性,更新输出文件命名规则,确保结果保存至指定路径。

This commit is contained in:
2025-06-11 09:32:44 +08:00
parent 2e91063ad1
commit 8aec1f3f6f
+140 -84
View File
@@ -19,6 +19,7 @@ from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from threading import Lock
import sys
import argparse
load_dotenv()
@@ -30,27 +31,34 @@ class DifyComparisonTester:
"""
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判
"""
def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str,
wiki_excel_path:str=None, output_path:str=None, max_workers:int=1):
def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str,
old_workflow_api_key:str=None, wiki_excel_path:str=None,
output_path:str=None, max_workers:int=1, mode:str="both"):
"""
初始化对比测试器
Args:
excel_path: 包含问题的Excel文件路径
baseurl: Dify API的基础URL
old_workflow_api_key: 旧流程的API密钥
new_workflow_api_key: 新流程的API密钥
old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要
wiki_excel_path: Wiki Excel文件路径,用于获取标准答案
output_path: 输出Excel文件路径
max_workers: 最大工作线程数
mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话
"""
self.excel_path = excel_path
self.mode = mode
# 使用NewWorkflowChat和OldWorkFlowChat代替ChatClient
self.old_chat = OldWorkFlowChat(api_key=old_workflow_api_key, base_url=baseurl)
self.new_chat = NewWorkflowChat(api_key=new_workflow_api_key, base_url=baseurl)
if mode == "both" and old_workflow_api_key:
self.old_chat = OldWorkFlowChat(api_key=old_workflow_api_key, base_url=baseurl)
else:
self.old_chat = None
# 评判相关参数
self.output_path = output_path or os.path.join(os.path.dirname(self.excel_path), "dify问答_综合评判结果.xlsx")
self.output_path = output_path or os.path.join(os.path.dirname(self.excel_path), "dify问答_新流程结果.xlsx")
self.max_workers = max_workers
self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource)
self.results_lock = Lock()
@@ -68,45 +76,6 @@ class DifyComparisonTester:
model = os.getenv("LLM_MODEL_NAME")
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model)
def process_question(self, q:str):
"""
处理单个问题,并行获取新旧流程的回答
Args:
q: 问题内容
Returns:
dict: 包含问题和两个流程回答的字典
"""
def get_old_answer():
try:
return self.old_chat.process_question(query=q)
except Exception as e:
return f"error: {str(e)}"
def get_new_answer():
try:
return self.new_chat.process_question(query=q)
except Exception as e:
return f"error: {str(e)}"
# 并行执行old_chat和new_chat
with ThreadPoolExecutor(max_workers=2) as executor:
future_old = executor.submit(get_old_answer)
future_new = executor.submit(get_new_answer)
try:
old_result = future_old.result()
new_result = future_new.result()
if isinstance(old_result, str) and old_result.startswith("error:"):
return None, None
if isinstance(new_result, str) and new_result.startswith("error:"):
return None, None
except Exception as e:
return None, None, None
return future_old, future_new
def find_wiki_link(self, query) -> str | None:
"""
根据查询找出对应的词条链接
@@ -510,6 +479,37 @@ content: "{content}"
result = f"{similarity_percentage}%"
return result
def process_question(self, q:str) -> tuple:
"""
处理单个问题,获取新旧流程的回答
Args:
q: 问题内容
Returns:
tuple: (old_result, new_result) 包含旧流程和新流程的回答信息
"""
try:
# 如果是仅测试新流程模式
if self.mode == "new_only" or self.old_chat is None:
new_result = self.new_chat.process_question(q)
return None, new_result
else:
# 使用ThreadPoolExecutor并发执行新旧流程
with ThreadPoolExecutor(max_workers=2) as executor:
# 并发提交新旧流程的任务
future_new = executor.submit(self.new_chat.process_question, q)
future_old = executor.submit(self.old_chat.process_question, q)
# 获取结果
new_result = future_new.result()
old_result = future_old.result()
return old_result, new_result
except Exception as e:
print(f"处理问题 '{q}' 时发生错误: {str(e)}")
return None, None
def process_question_with_judge(self, q:str):
"""
处理单个问题,获取新旧流程的回答并进行评判
@@ -522,9 +522,49 @@ content: "{content}"
"""
# 获取基本的问题和回答
future_old, future_new = self.process_question(q)
if future_old is None or future_new is None:
if future_new is None:
return None
# 如果是仅测试新流程模式
if self.mode == "new_only" or future_old is None:
query = future_new["问题"]
new_answer = future_new["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
standard_answer = ""
answer_title = ""
try:
if wiki_url and not pd.isna(wiki_url):
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
# 判断答案正确性
judge_result = ""
if standard_answer:
# 调用LLM判断新答案是否正确
new_result = self.judge_answer(standard_answer, new_answer)
if new_result is not None:
judge_result = "正确" if new_result else "错误"
# 返回结果
return {
"问题": query,
"问题改写": future_new["新问题改写"],
"问题分类": future_new["新问题分类"],
"槽点信息": future_new["槽点信息"],
"新流程答案": new_answer,
"回答判断": judge_result,
"答案词条": answer_title if answer_title else "",
"检索词条": future_new["新检索词条"],
}
# 如果是测试新老流程模式
if future_old is None:
return None
query = future_old["问题"]
old_answer = future_old["旧流程答案"]
new_answer = future_new["新流程答案"]
@@ -555,17 +595,17 @@ content: "{content}"
# 返回结果
return {
"问题": query,
"新问题改写": future_new["问题改写"],
"旧问题改写": future_old["问题改写"],
"新问题分类": future_new["问题分类"],
"新问题改写": future_new["问题改写"],
"旧问题改写": future_old["问题改写"],
"新问题分类": future_new["问题分类"],
"槽点信息": future_new["槽点信息"],
"新流程答案": new_answer,
"旧流程答案": old_answer,
"回答判断": judge_result,
# "词条检索相似度": retrieve_title_score,
"答案词条": answer_title if answer_title else "",
"新检索词条": future_new["检索词条"],
"旧检索词条": future_old["检索词条"],
"新检索词条": future_new["检索词条"],
"旧检索词条": future_old["检索词条"],
}
def run_comparison(self, with_judge=False):
@@ -582,15 +622,17 @@ content: "{content}"
df = pd.read_excel(self.excel_path)
questions=[]
for idx, row in df.iterrows():
if row['回答中的软件名称'] == "未知":
continue
if row['提问中的软件名称'] != "未知":
if "回答中的软件名称" in row and "提问中的软件名称" in row:
if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知":
continue
if row['提问中的软件名称'] != "未知":
questions.append(row['提问'])
else:
questions.append(f"{row['回答中的软件名称']}, {row['提问']}")
else:
questions.append(row['提问'])
questions.append(f"{row['回答中的软件名称']}, {row['提问']}")
results = []
# 选择处理函数
process_func = self.process_question_with_judge if with_judge else self.process_question
is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
if not is_debug:
# 使用多线程并发处理问题
@@ -602,7 +644,7 @@ content: "{content}"
# 提交所有任务
futures = []
for q in questions:
future = executor.submit(process_func, q)
future = executor.submit(self.process_question_with_judge, q)
futures.append(future)
# 处理结果
@@ -614,12 +656,13 @@ content: "{content}"
pbar.update(1)
else:
for q in questions:
result = process_func(q)
result = self.process_question_with_judge(q)
print(json.dumps(result,ensure_ascii=False,indent=2))
if result is not None:
results.append(result)
# 生成输出Excel文件
out_path = self.output_path if with_judge else os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx")
out_path = self.output_path
df_results = pd.DataFrame(results)
# 使用ExcelWriter设置格式
@@ -639,37 +682,50 @@ content: "{content}"
if __name__ == "__main__":
# 定义Excel路径
excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
# 创建命令行参数解析器
if not os.path.exists(excel_path):
print(f"错误:Excel文件不存在: {excel_path}")
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
default_wiki_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
parser = argparse.ArgumentParser(description='Dify对话测试工具')
parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only',
help='测试模式: new_only表示仅测试新对话, both表示测试新老对话')
parser.add_argument('--excel_path', type=str,
default=default_excel_path,
help='包含问题的Excel文件路径')
parser.add_argument('--baseurl', type=str, default="http://172.20.0.145/v1",
help='Dify API的基础URL')
parser.add_argument('--new_api_key', type=str, default="app-qxsSybCs7ABiKlC1JabTYVn6",
help='新流程的API密钥')
parser.add_argument('--old_api_key', type=str, default="app-wUdkWJx5zeOvmvBUZizMoSw3",
help='旧流程的API密钥')
parser.add_argument('--wiki_excel_path', type=str,
default=default_wiki_excel_path,
help='Wiki Excel文件路径,用于获取标准答案')
parser.add_argument('--output_path', type=str, default=None,
help='输出Excel文件路径')
parser.add_argument('--max_workers', type=int, default=5,
help='最大工作线程数')
# 解析命令行参数
args = parser.parse_args()
# 检查Excel文件是否存在
if not os.path.exists(args.excel_path):
print(f"错误:Excel文件不存在: {args.excel_path}")
exit(1)
# Dify API配置
baseurl = "http://172.20.0.145/v1"
old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3"
new_workflow_api_key = "app-qxsSybCs7ABiKlC1JabTYVn6"
# Wiki Excel路径和Dify应用ID(用于评判)
wiki_excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
# 创建测试器并运行
tester = DifyComparisonTester(
excel_path=excel_path,
baseurl=baseurl,
old_workflow_api_key=old_workflow_api_key,
new_workflow_api_key=new_workflow_api_key,
wiki_excel_path=wiki_excel_path,
max_workers=5
excel_path=args.excel_path,
baseurl=args.baseurl,
new_workflow_api_key=args.new_api_key,
old_workflow_api_key=args.old_api_key if args.mode == "both" else None,
wiki_excel_path=args.wiki_excel_path,
output_path=args.output_path,
max_workers=args.max_workers,
mode=args.mode
)
# 运行对比测试(带评判)
output_file = tester.run_comparison(with_judge=True)
print(f"对比评判结果已保存至: {output_file}")
# 单个问题测试示例
# 使用新的工作流类进行测试
# new_chat = NewWorkflowChat(api_key="app-qxsSybCs7ABiKlC1JabTYVn6", base_url="http://172.20.0.145/v1")
# result = new_chat.process_question("如何新建配电线路工程")
# print(json.dumps(result, ensure_ascii=False, indent=2))
print(f"测试结果已保存至: {output_file}")