删除不再使用的脚本文件,更新数据库连接配置为环境变量,优化Dify对比测试逻辑,增强日志记录.

This commit is contained in:
2025-07-01 11:43:56 +08:00
parent 500c8c166c
commit 603c8122d4
6 changed files with 94 additions and 715 deletions
+79 -74
View File
@@ -2,27 +2,39 @@
# -*- coding: utf-8 -*-
import os
from rag2_0.dify.dify_client import DifyClient
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
import sys
import argparse
from threading import Lock
import pandas as pd
# 使用线程池并发执行
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from rag2_0.dify.dify_tool import DifyTool
import json
from urllib.parse import unquote
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from threading import Lock
import sys
import argparse
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import DifyClient
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from rag2_0.dify.dify_tool import DifyTool
load_dotenv()
import logging
# Configure root logging for this script: INFO level and above, one line per
# record (timestamp - logger name - level - message), written through a single
# StreamHandler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class ContentSource(BaseModel):
score:int = Field(description="相关性分数")
reason:str = Field(description="评分理由")
@@ -32,8 +44,7 @@ class DifyComparisonTester:
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判
"""
def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str,
old_workflow_api_key:str=None, wiki_excel_path:str=None,
output_path:str=None, max_workers:int=1, mode:str="both"):
old_workflow_api_key:str=None, output_path:str=None, max_workers:int=1, mode:str="both"):
"""
初始化对比测试器
@@ -42,7 +53,6 @@ class DifyComparisonTester:
baseurl: Dify API的基础URL
new_workflow_api_key: 新流程的API密钥
old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要
wiki_excel_path: Wiki Excel文件路径,用于获取标准答案
output_path: 输出Excel文件路径
max_workers: 最大工作线程数
mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话
@@ -64,8 +74,8 @@ class DifyComparisonTester:
self.results_lock = Lock()
# 读取Wiki Excel文件
if wiki_excel_path and os.path.exists(wiki_excel_path):
self.wiki_excel = pd.read_excel(wiki_excel_path)
if excel_path and os.path.exists(excel_path):
self.wiki_excel = pd.read_excel(excel_path)
else:
self.wiki_excel = None
@@ -78,13 +88,13 @@ class DifyComparisonTester:
"""
self.dify_tool.close_connection()
def get_llm(self):
def get_llm(self, **kwargs):
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model = os.getenv("LLM_MODEL_NAME")
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model)
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model, **kwargs)
def find_wiki_link(self, query) -> str | None:
def find_wiki_link(self, row) -> str | None:
"""
根据查询找出对应的词条链接
@@ -94,30 +104,11 @@ class DifyComparisonTester:
Returns:
str: 对应的词条链接,如果没有找到则返回None
"""
# 确保query不为空
if not query or pd.isna(query):
return None
if self.wiki_excel is None:
return None
# 在"新提问"列中查找匹配的行
matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query]
# 如果找到了匹配的行,返回对应的词条链接
if not matched_rows.empty:
return matched_rows.iloc[0]['对应词条链接']
# 如果没有完全匹配,尝试部分匹配
# 去除软件名称部分(如果有)
query_parts = query.split(',', 1)
if len(query_parts) > 1:
clean_query = query_parts[1].strip()
# 在"提问"列中查找包含清理后查询的行
for idx, row in self.wiki_excel.iterrows():
if pd.notna(row['提问']) and clean_query in row['提问']:
return row['对应词条链接']
if "词条链接" in row:
return row["词条链接"]
return None
def get_wiki_content(self, link) -> str:
@@ -191,7 +182,7 @@ class DifyComparisonTester:
Returns:
str: 格式化的prompt
"""
return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。
return f"""请作为一个电力造价行业的专家,评估以下回答与标准答案的匹配程度。
标准答案:
{standard_answer}
@@ -199,11 +190,20 @@ class DifyComparisonTester:
待评估的回答:
{answer_to_check}
请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确""错误",不需要其他解释。
如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"
请严格按以下格式输出:【正确】或【错误】:"""
要求
1、分析待评估的回答与标准答案的匹配程度(包括内容、步骤、主体等)
2、如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
3、只要大体描述一致,即使缺失了一些步骤,也应判定为"正确"
3、如果待评估的回答存在明显的错误信息,应判定为"错误"
4、请严格按json格式输出:
{{
"result": True or False,
"reason": "简明扼要的理由(中文)"
}}
字段说明:
result: True or False,待评估的回答是否正确
reason: 简明扼要的理由(中文)
"""
def judge_answer(self, standard_answer: str, answer: str) -> bool | None:
"""
@@ -218,10 +218,11 @@ class DifyComparisonTester:
"""
prompt = self.create_correctness_prompt(standard_answer, answer)
llm = self.get_llm()
llm = self.get_llm(response_format={"type": "json_object"})
try:
response = llm.invoke(user_prompt=prompt, need_retry=True)
return "正确" in response.content
response_json = json.loads(response.content)
return response_json["result"]
except Exception as e:
return None
@@ -513,10 +514,10 @@ content: "{content}"
return old_result, new_result
except Exception as e:
print(f"处理问题 '{q}' 时发生错误: {str(e)}")
logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True)
return None, None
def process_question_with_judge(self, q:str):
def process_question_with_judge(self, q:str, row):
"""
处理单个问题,获取新旧流程的回答并进行评判
@@ -537,7 +538,7 @@ content: "{content}"
new_answer = future_new["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
wiki_url = self.find_wiki_link(row)
standard_answer = ""
answer_title = ""
@@ -546,7 +547,7 @@ content: "{content}"
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
# 判断答案正确性
judge_result = ""
@@ -563,7 +564,7 @@ content: "{content}"
"问题分类": future_new["新问题分类"],
"槽点信息": future_new["槽点信息"],
"新流程答案": new_answer,
"回答判断": judge_result,
"回答是否正确": judge_result,
"答案词条": answer_title if answer_title else "",
"检索词条": future_new["新检索词条"],
}
@@ -576,7 +577,7 @@ content: "{content}"
new_answer = future_new["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
wiki_url = self.find_wiki_link(row)
standard_answer = ""
answer_title = ""
@@ -585,7 +586,7 @@ content: "{content}"
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
# 判断答案正确性
if standard_answer:
@@ -630,25 +631,25 @@ content: "{content}"
if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知":
continue
if row['提问中的软件名称'] != "未知":
questions.append(row['提问'])
questions.append((row['提问'],row))
else:
questions.append(f"{row['回答中的软件名称']}, {row['提问']}")
questions.append((f"{row['回答中的软件名称']}, {row['提问']}",row))
else:
questions.append(row['提问'])
questions.append((row['提问'], row))
results = []
is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
if not is_debug:
# 使用多线程并发处理问题
print("并发数量: ", self.max_workers)
print("问题数量: ", len(questions))
logging.info(f"并发数量: {self.max_workers}")
logging.info(f"问题数量: {len(questions)}")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 创建进度条
with tqdm(total=len(questions), desc="处理问题进度") as pbar:
# 提交所有任务
futures = []
for q in questions:
future = executor.submit(self.process_question_with_judge, q)
for q, row in questions:
future = executor.submit(self.process_question_with_judge, q, row)
futures.append(future)
# 处理结果
@@ -659,9 +660,9 @@ content: "{content}"
results.append(result)
pbar.update(1)
else:
for q in questions:
result = self.process_question_with_judge(q)
print(json.dumps(result,ensure_ascii=False,indent=2))
for q, row in questions:
result = self.process_question_with_judge(q, row)
logging.info(json.dumps(result,ensure_ascii=False,indent=2))
if result is not None:
results.append(result)
@@ -687,24 +688,29 @@ content: "{content}"
if __name__ == "__main__":
# 创建命令行参数解析器
# Fallback defaults for the Dify endpoint and its PostgreSQL backend.
# setdefault() is used instead of plain assignment so that values already
# present in the process environment (including anything load_dotenv() read
# from a .env file at import time) are kept rather than silently overwritten.
# NOTE(review): the API keys and the database password are still hard-coded
# below; they should be moved to .env / a secret store and rotated.
os.environ.setdefault("DIFY_BASEURL", "http://10.1.16.39/v1")
os.environ.setdefault("DIFY_NEW_API_KEY", "app-qxsSybCs7ABiKlC1JabTYVn6")
os.environ.setdefault("DIFY_OLD_API_KEY", "app-wUdkWJx5zeOvmvBUZizMoSw3")
os.environ.setdefault("DIFY_PG_HOST", "10.1.16.39")
os.environ.setdefault("DIFY_PG_PORT", "5432")
os.environ.setdefault("DIFY_PG_USER", "postgres")
os.environ.setdefault("DIFY_PG_PASSWORD", "difyai123456")
os.environ.setdefault("DIFY_PG_DATABASE", "dify")
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
default_wiki_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/740条(dislike)_存在标准词条.xlsx")
parser = argparse.ArgumentParser(description='Dify对话测试工具')
parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only',
help='测试模式: new_only表示仅测试新对话, both表示测试新老对话')
parser.add_argument('--excel_path', type=str,
default=default_excel_path,
help='包含问题的Excel文件路径')
parser.add_argument('--baseurl', type=str, default="http://172.20.0.145/v1",
parser.add_argument('--baseurl', type=str, default=os.getenv("DIFY_BASEURL"),
help='Dify API的基础URL')
parser.add_argument('--new_api_key', type=str, default="app-qxsSybCs7ABiKlC1JabTYVn6",
parser.add_argument('--new_api_key', type=str, default=os.getenv("DIFY_NEW_API_KEY"),
help='新流程的API密钥')
parser.add_argument('--old_api_key', type=str, default="app-wUdkWJx5zeOvmvBUZizMoSw3",
parser.add_argument('--old_api_key', type=str, default=os.getenv("DIFY_OLD_API_KEY"),
help='旧流程的API密钥')
parser.add_argument('--wiki_excel_path', type=str,
default=default_wiki_excel_path,
help='Wiki Excel文件路径,用于获取标准答案')
parser.add_argument('--output_path', type=str, default=None,
help='输出Excel文件路径')
parser.add_argument('--max_workers', type=int, default=5,
@@ -715,7 +721,7 @@ if __name__ == "__main__":
# 检查Excel文件是否存在
if not os.path.exists(args.excel_path):
print(f"错误:Excel文件不存在: {args.excel_path}")
logging.error(f"错误:Excel文件不存在: {args.excel_path}", exc_info=True)
exit(1)
# 创建测试器并运行
@@ -724,7 +730,6 @@ if __name__ == "__main__":
baseurl=args.baseurl,
new_workflow_api_key=args.new_api_key,
old_workflow_api_key=args.old_api_key if args.mode == "both" else None,
wiki_excel_path=args.wiki_excel_path,
output_path=args.output_path,
max_workers=args.max_workers,
mode=args.mode
@@ -732,4 +737,4 @@ if __name__ == "__main__":
# 运行对比测试(带评判)
output_file = tester.run_comparison(with_judge=True)
print(f"测试结果已保存至: {output_file}")
logging.info(f"测试结果已保存至: {output_file}")
+13 -12
View File
@@ -39,11 +39,11 @@ class PgSql:
try:
# 连接数据库
self.connection = psycopg2.connect(
user="postgres",
password="difyai123456",
host="172.20.0.145",
port=5432,
database="dify"
user=os.getenv("DIFY_PG_USER"),
password=os.getenv("DIFY_PG_PASSWORD"),
host=os.getenv("DIFY_PG_HOST"),
port=os.getenv("DIFY_PG_PORT"),
database=os.getenv("DIFY_PG_DATABASE")
)
except (Exception, psycopg2.Error) as error:
@@ -160,11 +160,11 @@ class PgSql:
""",
(workflow_run_id,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting workflow_node_executions_info: {error}")
@@ -263,7 +263,8 @@ class BaseWorkflowChat:
析构函数,在对象被销毁时自动关闭数据库连接。
确保在对象生命周期结束时释放数据库资源。
"""
self.dify_tool.close_connection()
# DifyTool类已经在其__del__方法中关闭了数据库连接,无需在此重复调用
pass
def create_chat_message(self, query: str):
"""
@@ -464,7 +465,7 @@ class NewWorkflowChat(BaseWorkflowChat):
elif workflow_node["title"] == "提取处理后的知识":
outputs = json.loads(workflow_node["outputs"])["knowledge_list"]
retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs, reranker_sorce_info=reranker_sorce)
elif workflow_node["title"] == "问题优化结果解析":
elif workflow_node["title"] == "意图识别结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
llm_result_json = json.loads(workflow_node['inputs'])["llm_result"]