Files
QueryRewrite/rag2_0/dify/test_dify_chatapi.py
T

136 lines
5.3 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from rag2_0.dify.dify_client import ChatClient, DifyClient
import pandas as pd
# 使用线程池并发执行
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from rag2_0.dify.dify_tool import DifyTool
import json
class DifyComparisonTester:
"""
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果
"""
def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str):
"""
初始化对比测试器
Args:
excel_path: 包含问题的Excel文件路径
baseurl: Dify API的基础URL
old_workflow_api_key: 旧流程的API密钥
new_workflow_api_key: 新流程的API密钥
"""
self.excel_path = excel_path
self.baseurl = baseurl
self.old_workflow_api_key = old_workflow_api_key
self.new_workflow_api_key = new_workflow_api_key
self.old_chat = ChatClient(api_key=old_workflow_api_key, base_url=baseurl)
self.new_chat = ChatClient(api_key=new_workflow_api_key, base_url=baseurl)
def process_question(self, q:str):
"""
处理单个问题,并行获取新旧流程的回答
Args:
q: 问题内容
Returns:
dict: 包含问题和两个流程回答的字典
"""
def get_old_answer():
try:
return self.old_chat.create_chat_message(inputs={}, query=q, user="AutoTestDifyChat").json()
except Exception as e:
return f"error: {str(e)}"
def get_new_answer():
try:
return self.new_chat.create_chat_message(inputs={}, query=q, user="AutoTestDifyChat").json()
except Exception as e:
return f"error: {str(e)}"
# 并行执行old_chat和new_chat
with ThreadPoolExecutor(max_workers=2) as executor:
future_old = executor.submit(get_old_answer)
future_new = executor.submit(get_new_answer)
old_result = future_old.result()
new_result = future_new.result()
old_message_id = old_result["message_id"]
new_message_id = new_result["message_id"]
old_message_info = DifyTool.get_message_debug_info_id(message_id=old_message_id)
new_message_info = DifyTool.get_message_debug_info_id(message_id=new_message_id)
for workflow_node in new_message_info["workflow_node_executions_info"]:
if workflow_node["title"] == "问题优化结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
old_answer = old_result["answer"]
new_answer = new_result["answer"]
return {"问题": q, "问题改写": rewrite_query, "旧流程答案": old_answer, "新流程答案": new_answer}
def run_comparison(self):
"""
运行对比测试,处理所有问题并生成结果Excel
Returns:
str: 输出Excel文件的路径
"""
# 读取Excel文件中的问题
df = pd.read_excel(self.excel_path)
questions = df.iloc[:,0].tolist()
results = []
# 按顺序处理问题
with tqdm(total=len(questions), desc="处理问题进度") as pbar:
for q in questions:
result = self.process_question(q)
results.append(result)
pbar.update(1)
# 生成输出Excel文件
out_path = os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx")
df_results = pd.DataFrame(results)
# 使用ExcelWriter设置格式
with pd.ExcelWriter(out_path, engine='xlsxwriter') as writer:
df_results.to_excel(writer, index=False, sheet_name='Sheet1')
# 获取工作簿和工作表对象
workbook = writer.book
worksheet = writer.sheets['Sheet1']
# 设置列宽
worksheet.set_column('A:A', 50) # 问题列宽 50个Excel单位
worksheet.set_column('B:B', 70) # 旧流程答案列宽 70个Excel单位
worksheet.set_column('C:C', 70) # 新流程答案列宽 70个Excel单位
return out_path
if __name__ == "__main__":
# 定义Excel路径
excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx")
if not os.path.exists(excel_path):
print(f"错误:Excel文件不存在: {excel_path}")
exit(1)
# Dify API配置
baseurl = "http://172.20.0.145/v1"
old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3"
new_workflow_api_key = "app-Lf1pQ1NVwdMfCRVNTBCOTPHT"
# 创建测试器并运行
tester = DifyComparisonTester(excel_path, baseurl, old_workflow_api_key, new_workflow_api_key)
output_file = tester.run_comparison()
print(f"对比结果已保存至: {output_file}")
# 单个问题测试示例
# c = DifyChat(baseurl="http://172.20.0.145/v1", api_key="app-LjJaeLoAfqa6aoGzqU9UvxSf")
# c.chat("如何新建配电线路工程")