#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pandas as pd
# 使用线程池并发执行
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import json
import re
from dotenv import load_dotenv
import logging
from datetime import datetime
import os
from langchain_core.output_parsers import JsonOutputParser
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import ChatClient
from rag2_0.tool.ModelTool import OpenAiLLM
load_dotenv()
# Directory that receives one log file per run.
log_dir = 'data/logs'
# exist_ok=True avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(log_dir, exist_ok=True)
# Timestamped log file name, so successive runs never overwrite each other.
log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
import logging
# Configure the root logger to write INFO and above to both the console and the log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # console output
        logging.FileHandler(log_file, encoding='utf-8'),  # file output
    ]
)
class DifyCompareTest:
    """Replay historical questions against two Dify workflows and compare answers.

    For every row of an input Excel sheet the question is sent to two Dify chat
    clients; each new answer is then compared with the historical answer by an
    LLM judge. The answers and verdicts are appended to the row as four extra
    columns and the result is written to a new Excel file.
    """

    # Names of the four columns appended to every output row, in the order
    # (first-wiki answer, first-wiki verdict, both answer, both verdict).
    _RESULT_COLUMNS = (
        "先词条后工单回答",
        "先词条后工单回答对比",
        "词条与工单同时回答",
        "词条与工单同时回答对比",
    )

    # NOTE(review): the original pattern was r'.*?', which only ever matches the
    # empty string and therefore removed nothing. It was presumably meant to
    # strip <think>...</think> reasoning blocks emitted by the model — confirm.
    _THINK_BLOCK_RE = re.compile(r'<think>.*?</think>', re.DOTALL)

    def __init__(self):
        """Create the two Dify chat clients and the judge LLM from the environment."""
        # NOTE(review): the API keys are hard-coded in source; consider moving
        # them to the environment. The variable name "DIFY_BSAE_URL" looks like
        # a misspelling of "BASE" but is kept because existing .env files may
        # define it under exactly this name.
        # Workflow that searches wiki entries first, then work orders.
        self.first_wiki_client = ChatClient(api_key="app-gocvuqduBnJptYNPpnW9V9R6", base_url=os.getenv("DIFY_BSAE_URL"))
        # Workflow that searches wiki entries and work orders at the same time.
        self.both_wiki_worker_client = ChatClient(api_key="app-CPoOMaGDsLRPAe9TW7Xjhszy", base_url=os.getenv("DIFY_BSAE_URL"))
        self.llm = OpenAiLLM(base_url=os.getenv("OPENAI_API_BASE"), model=os.getenv("MODEL_NAME"))

    @staticmethod
    def _strip_reasoning(text):
        """Return *text* without <think>...</think> blocks and surrounding whitespace."""
        return DifyCompareTest._THINK_BLOCK_RE.sub('', text).strip()

    @staticmethod
    def _build_result_row(row, values):
        """Return a copy of *row* with the four result columns set to *values*."""
        result_row = row.copy()
        for column, value in zip(DifyCompareTest._RESULT_COLUMNS, values):
            result_row[column] = value
        return result_row

    def llm_judge_answer(self, old_answer: str, now_answer: str):
        """Ask the judge LLM whether two answers say roughly the same thing.

        Args:
            old_answer: The historical answer.
            now_answer: The answer produced by the workflow under test.

        Returns:
            "回答基本相同" when the judge reports the texts as the same,
            "回答基本不相同" when it does not, or "" when every attempt failed
            (the last error is logged).
        """
        import time

        user_prompt = f"""
请判断以下两个文本描述内容是否大致相同(内容主体等)
文本1:
{old_answer}
=================
文本2:
{now_answer}
输出格式(json格式输出):
{{
"is_same": true or false,
"reason": "文本1和文本2大致相同"
}}
"""
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                response = self.llm.invoke(user_prompt=user_prompt, need_retry=False, response_format={"type": "json_object"})
                clean_output = self._strip_reasoning(response.content)
                result = JsonOutputParser().parse(clean_output)
                is_same = result.get("is_same", False)
                # A model may return the flag as a string; "false" must not count as true.
                if isinstance(is_same, str):
                    is_same = is_same.strip().lower() == "true"
                return "回答基本相同" if is_same else "回答基本不相同"
            except Exception as e:
                if attempt >= max_retries:
                    logging.error(f"LLM判断过程在尝试 {max_retries} 次后仍然出错: {e}")
                    return ""
                # Short pause so the retry does not fire immediately.
                time.sleep(1)

    def process_workflow(self, workflow_name, client, inputs, query, old_answer):
        """Call one workflow and judge its answer against the historical one.

        Args:
            workflow_name: Label used in error log messages.
            client: Object exposing ``create_chat_message(...)`` whose result
                has a ``json()`` method returning a dict with an 'answer' key.
            inputs: Workflow input variables forwarded to the client.
            query: The user question.
            old_answer: The historical answer to compare against.

        Returns:
            ``(answer, judge_result)``; ``('', '')`` when the call failed or
            returned an empty answer on all attempts.
        """
        import time

        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                response = client.create_chat_message(
                    inputs=inputs, query=query, user="AutoCodeRun", response_mode="blocking"
                )
                result = response.json()
                answer = result.get('answer', "")
                if not answer:
                    raise ValueError(f"回答为空: {result}")
            except Exception as e:
                if attempt >= max_retries:
                    logging.error(f"{workflow_name}调用失败 (尝试 {max_retries} 次后): {e}")
                    return '', ''
                time.sleep(1)  # wait one second before retrying
            else:
                # Judged outside the try so a judge failure never re-runs the workflow.
                judge_result = self.llm_judge_answer(old_answer=old_answer, now_answer=answer)
                return answer, judge_result

    def process_single_row(self, index, row):
        """Run both workflows for one row; intended to be executed in a worker thread.

        Args:
            index: Zero-based position of the row, used for log messages and
                returned unchanged so the caller can restore ordering.
            row: Mapping with the keys "提问", "回答" and "当前软件" that also
                supports ``copy()`` and item assignment.

        Returns:
            ``(index, result_row)`` where ``result_row`` is a copy of *row* with
            the four result columns added (empty strings on failure).
        """
        try:
            query = row["提问"]
            old_answer = row["回答"]
            current_software = row["当前软件"]
            inputs = {
                "current_softname": current_software,
                "user_name": "AutoCodeRun"
            }
            workflows = (
                ('first_wiki', "先词条后工单工作流", self.first_wiki_client),
                ('both_wiki_worker', "词条与工单同时工作流", self.both_wiki_worker_client),
            )
            results = {key: None for key, _, _ in workflows}
            # Call the two workflows in parallel.
            with ThreadPoolExecutor(max_workers=2) as workflow_executor:
                futures = {
                    workflow_executor.submit(
                        self.process_workflow, label, client, inputs, query, old_answer
                    ): key
                    for key, label, client in workflows
                }
                for future in as_completed(futures):
                    workflow_key = futures[future]
                    try:
                        results[workflow_key] = future.result()
                    except Exception as e:
                        logging.error(f"工作流执行失败 (行{index}): {e}")
                        results[workflow_key] = ('', '')
            result_row = self._build_result_row(
                row,
                (
                    results['first_wiki'][0],
                    results['first_wiki'][1],
                    results['both_wiki_worker'][0],
                    results['both_wiki_worker'][1],
                ),
            )
            logging.info(f"成功处理第 {index + 1} 行数据")
            return index, result_row
        except Exception as e:
            logging.error(f"处理第 {index + 1} 行数据时出错: {e}")
            return index, self._build_result_row(row, ('', '', '', ''))

    def run(self, excel_path, save_path, max_workers=3):
        """Run the comparison over every row of an Excel file and save the result.

        Args:
            excel_path: Path of the input Excel file; it must contain the
                columns "提问", "回答" and "当前软件".
            save_path: Path of the output Excel file.
            max_workers: Maximum number of rows processed concurrently
                (default 3).

        Raises:
            Exception: Any unexpected error is logged and re-raised. A missing
                file or missing columns are only logged and the method returns.
        """
        try:
            if not os.path.exists(excel_path):
                logging.error(f"Excel文件不存在: {excel_path}")
                return
            df = pd.read_excel(excel_path)
            logging.info(f"成功读取Excel文件: {excel_path}, 共 {len(df)} 行数据")
            # Validate that the required columns are present.
            required_columns = ["提问", "回答", "当前软件"]
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                logging.error(f"Excel文件缺少必要的列: {missing_columns}")
                return
            # Make sure the output directory exists.
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            results = {}
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Rows are keyed by position, not index label, so df.iloc below
                # and the "row N" log messages stay correct for any index.
                future_to_index = {
                    executor.submit(self.process_single_row, position, row): position
                    for position, (_, row) in enumerate(df.iterrows())
                }
                with tqdm(total=len(future_to_index), desc="处理进度") as pbar:
                    for future in as_completed(future_to_index):
                        try:
                            index, result_row = future.result()
                            results[index] = result_row
                        except Exception as e:
                            original_index = future_to_index[future]
                            logging.error(f"线程执行失败 (行{original_index + 1}): {e}")
                            # Keep the failed row in the output, marked as failed.
                            results[original_index] = self._build_result_row(
                                df.iloc[original_index], ('线程执行失败',) * 4
                            )
                        pbar.update(1)
            # Restore the original row order.
            rows_info = [results[i] for i in sorted(results)]
            result_df = pd.DataFrame(rows_info)
            result_df.to_excel(save_path, index=False)
            logging.info(f"结果已保存到: {save_path}")
        except Exception as e:
            logging.error(f"运行过程中出现错误: {e}")
            raise
if __name__ == "__main__":
    # Script entry point: run the comparison for each (input, output) pair.
    # A failure in one file is logged and the next file is still processed;
    # a failure outside the per-file loop ends the process with exit code 1.
    try:
        tester = DifyCompareTest()
        # (input workbook, output workbook) pairs to process.
        jobs = [
            # ("data/excel/5月.xlsx", "data/excel/5月问答对比.xlsx"),
            ("data/excel/其他月.xlsx", "data/excel/其他月问答对比.xlsx")
        ]
        for source_path, target_path in jobs:
            logging.info(f"开始处理文件: {source_path}")
            try:
                tester.run(excel_path=source_path, save_path=target_path, max_workers=3)
                logging.info(f"文件处理完成: {source_path}")
            except Exception as exc:
                logging.error(f"处理文件 {source_path} 时出错: {exc}")
        logging.info("所有文件处理完成")
    except Exception as exc:
        logging.error(f"程序执行出错: {exc}")
        sys.exit(1)