From 6ba314188546b3bf4c0ef51ecbbc0895984addcd Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Fri, 18 Jul 2025 16:50:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag2_0/dify/DifyCompareTest.py | 928 +++++++----------------------- rag2_0/dify/dify_client/client.py | 2 +- rag2_0/dify/dify_tool.py | 325 ----------- rag2_0/tool/ModelTool.py | 6 +- 4 files changed, 223 insertions(+), 1038 deletions(-) diff --git a/rag2_0/dify/DifyCompareTest.py b/rag2_0/dify/DifyCompareTest.py index 342a1a0..93bbd29 100755 --- a/rag2_0/dify/DifyCompareTest.py +++ b/rag2_0/dify/DifyCompareTest.py @@ -3,27 +3,31 @@ import os import sys -import argparse -from threading import Lock import pandas as pd # 使用线程池并发执行 from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm import json -from urllib.parse import unquote +import re from dotenv import load_dotenv -from pydantic import BaseModel, Field -from langchain.output_parsers import PydanticOutputParser +import logging +from datetime import datetime +import os +from langchain_core.output_parsers import JsonOutputParser sys.path.append(os.getcwd()) -from rag2_0.dify.dify_client import DifyClient -from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat -from rag2_0.tool.WikijsTool import WikijsTool -from rag2_0.tool.html_to_md import convert_html_to_md +from rag2_0.dify.dify_client import ChatClient from rag2_0.tool.ModelTool import OpenAiLLM -from rag2_0.dify.dify_tool import DifyTool load_dotenv() +# 创建日志目录 +log_dir = 'data/logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 生成带时间戳的日志文件名 +log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log') + import logging # 配置日志 @@ -31,731 +35,233 @@ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ - logging.StreamHandler() + logging.StreamHandler(), # 输出到控制台 + logging.FileHandler(log_file, encoding='utf-8') # 同时输出到文件 ] ) -class ContentSource(BaseModel): - score:int = Field(description="相关性分数") - reason:str = Field(description="评分理由") +class DifyCompareTest: + def __init__(self): + # 先词条后工单检索工作流 + self.first_wiki_client = ChatClient(api_key="app-gocvuqduBnJptYNPpnW9V9R6", base_url=os.getenv("DIFY_BSAE_URL")) + # 词条与工单同时检索 + self.both_wiki_worker_client = ChatClient(api_key="app-CPoOMaGDsLRPAe9TW7Xjhszy", base_url=os.getenv("DIFY_BSAE_URL")) + self.llm = OpenAiLLM(base_url=os.getenv("OPENAI_API_BASE"), model="deepseek-ai/DeepSeek-R1") -class DifyComparisonTester: - """ - Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判 - """ - def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str, - old_workflow_api_key:str=None, output_path:str=None, max_workers:int=1, mode:str="both"): + def llm_judge_answer(self, old_answer: str, now_answer: str): + user_prompt = f""" + 请判断以下两个文本描述内容是否大致相同(内容主体等) + 文本1: + + {old_answer} + + ================= + 文本2: + + {now_answer} + + 输出格式(json格式输出): + {{ + "is_same": true or false, + "reason": "文本1和文本2大致相同" + }} """ - 初始化对比测试器 - - Args: - excel_path: 包含问题的Excel文件路径 - baseurl: Dify API的基础URL - new_workflow_api_key: 新流程的API密钥 - old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要 - output_path: 输出Excel文件路径 - max_workers: 最大工作线程数 - mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话 - """ - self.excel_path = excel_path - self.mode = mode - - # 使用NewWorkflowChat和OldWorkFlowChat代替ChatClient - self.new_chat = NewWorkflowChat(api_key=new_workflow_api_key, base_url=baseurl) - if mode == "both" and old_workflow_api_key: - self.old_chat = OldWorkFlowChat(api_key=old_workflow_api_key, base_url=baseurl) - else: - self.old_chat = None - - # 评判相关参数 - self.output_path = output_path or os.path.join(os.path.dirname(self.excel_path), "dify问答_新流程结果.xlsx") - self.max_workers = max_workers - self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) - self.results_lock = Lock() - - # 读取Wiki Excel文件 - if excel_path and os.path.exists(excel_path): - self.wiki_excel = pd.read_excel(excel_path) - else: - self.wiki_excel = None - - self.dify_tool = DifyTool() - def get_llm(self, **kwargs): - api_key = os.getenv("OPENAI_API_KEY") - base_url = os.getenv("OPENAI_API_BASE") - model = os.getenv("MODEL_NAME") - return OpenAiLLM(api_key=api_key, base_url=base_url, model=model, **kwargs) - - def find_wiki_link(self, row) -> str | None: - """ - 根据查询找出对应的词条链接 - - Args: - query (str): 查询内容 - - Returns: - str: 对应的词条链接,如果没有找到则返回None - """ - if self.wiki_excel is None: - return None - - if "词条链接" in row: - return row["词条链接"] - return None - - def get_wiki_content(self, link) -> str: - """ - 获取词条链接的内容 - - Args: - link (str): 词条链接 - - Returns: - str: 链接内容,如果获取失败则返回错误信息 - """ - try: - if not link or pd.isna(link): - return "链接为空或无效" - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - doc_path = "/".join(path_parts[1:]) - wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False) - html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content') - if not html_content: - return "获取内容失败" - - options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True} - new_content = (html_content.replace("h6>", "h7>") - .replace("h5>", "h6>") - .replace("h4>", "h5>") - .replace("h3>", "h4>") - .replace("h2>", "h3>") - .replace("h1>", "h2>")) - # 将HTML内容转换为Markdown - markdown_content = convert_html_to_md(new_content, "", **options) - markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}" - return markdown_content - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def get_wiki_title(self, link) -> str | None: - """ - 获取词条标题 - - Args: - link (str): 词条链接 - - Returns: - str: 词条标题,如果获取失败则返回None - """ - try: - if not link or pd.isna(link): - return None - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - return path_parts[-1] - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str: - """ - 创建用于评判答案正确性的prompt - - Args: - standard_answer (str): 标准答案 - answer_to_check (str): 需要检查的答案 - - Returns: - str: 格式化的prompt - """ - return f"""请作为一个电力造价行业的专家,评估以下回答与标准答案的匹配程度。 - -标准答案: -{standard_answer} - -待评估的回答: -{answer_to_check} - -要求 -1、分析待评估的回答与标准答案的匹配程度(包括内容、步骤、主体等) -2、如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 -3、如果待评估的回答存在明显的错误信息,应判定为"错误"。 -4、请严格按json格式输出: -{{ - "result": True or False, - "reason": "简明扼要的理由(中文)" -}} -字段说明: -result: True or False,待评估的回答是否正确 -reason: 简明扼要的理由(中文) -""" - - def judge_answer(self, standard_answer: str, answer: str) -> bool | None: - """ - 调用LLM判断回答是否正确 - - Args: - standard_answer (str): 标准答案(来自Wiki) - answer (str): 需评判的回答 - - Returns: - bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 - """ - - prompt = self.create_correctness_prompt(standard_answer, answer) - llm = self.get_llm(response_format={"type": "json_object"}) - max_retries = 3 retry_count = 0 while retry_count < max_retries: try: - response = llm.invoke(user_prompt=prompt, need_retry=True) - response_json = json.loads(response.content) - return response_json["result"] + response = self.llm.invoke(user_prompt=user_prompt, need_retry=False, response_format={"type": "json_object"}) + response.content = response.content.strip() + clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) + result = JsonOutputParser().parse(clean_output) + result = json.loads(clean_output) + return "回答基本相同" if result.get("is_same", False) else "回答基本不相同" except Exception as e: retry_count += 1 if retry_count >= max_retries: - logging.error(f"判断答案失败,已重试{max_retries}次: {str(e)}") - return False - # 指数退避策略,每次重试等待时间增加 - import time - time.sleep(1 * (2 ** (retry_count - 1))) # 1秒, 2秒, 4秒... - - def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None: - """ - 综合判断新旧回答的正确性 - - Args: - standard_answer (str): 标准答案(来自Wiki) - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - Returns: - str | None: 包含新旧回答判断结果的字符串,None表示判断失败 - """ - old_result = self.judge_answer(standard_answer, old_answer) - new_result = self.judge_answer(standard_answer, new_answer) - if old_result is None or new_result is None: - return None - if new_result and old_result: - return "新旧答案均正确" - elif new_result and not old_result: - return "新答案正确" - elif not new_result and old_result: - return "旧答案正确" - else: - return "新旧答案均错误" - - def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None: - """ - 判断新旧回答是否存在较大差异 - - Args: - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - Returns: - str | None: 差异判断结果,None表示判断失败 - """ - - prompt = f"""请判断以下两个回答是否存在较大差异: - - 旧回答: {old_answer} - - 新回答: {new_answer} - - 主要是主要步骤、主要信息、或者主要主体的差异 - 请仅回答"存在较大差异"或"差异较小"。""" - llm = self.get_llm() - try: - response = llm.invoke(user_prompt=prompt, need_retry=True) - return "缺乏标准答案无法判断准确性,但答案基本相同" if "差异较小" in response.content else "缺乏标准答案无法判断准确性,但答案差异较大" - except Exception as e: - return None - - def calculate_score(self, query:str, content:str) -> int: - """ - 使用LLM判断query与content之间的相关性分数 - - Args: - query (str): 用户问题 - content (str): 检索内容 - - Returns: - int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 - """ - - try: - prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 - -【评分标准】 -10分:完全契合,主题/意图完全一致且涵盖所有关键信息 -8-9分:高度相关,核心要素匹配但存在少量信息缺失 -6-7分:部分相关,涉及相同主题但存在重要信息缺失 -4-5分:弱相关,仅次要信息点匹配 -1-3分:完全不相关或信息冲突 - -【评估维度】 -1. 主题一致性:核心主题/意图的匹配程度 -2. 内容覆盖度:是否涵盖query的关键要素 -3. 信息准确性:是否存在矛盾/错误信息 -4. 细节丰富度:是否提供query要求的详细信息 - -【输出格式】 -{{ - "score": 评分, - "reason": "简明扼要的评分理由(中文)" -}} - -【示例】 -query: "新冠疫苗的常见副作用" -内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" -输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} - -现在评估: -query: "{query}" -content: "{content}" -""" - llm = self.get_llm() - response = llm.invoke(user_prompt=prompt, need_retry=True) - - # 解析JSON响应 - try: - parsed_output = self.content_source_parser.parse(response.content) - return parsed_output.score - except Exception as e: - return -1 - except Exception as e: - return -1 - - def get_retrieve_info(self, query:str, outputs:dict) -> tuple: - """ - 获取检索信息并计算分数 - - Args: - query (str): 用户问题 - outputs (dict): 检索输出结果 - - Returns: - tuple: (检索内容列表, 最高分, 最低分, 平均分) - """ - max_score = 0 - min_score = 10 - total_score = 0 - valid_scores = 0 - retrieve_content = [] - - # 使用线程池并发计算分数 - with ThreadPoolExecutor() as executor: - # 创建任务列表 - future_to_content = {} - for result in outputs["result"]: - content = result["content"].strip() - future = executor.submit(self.calculate_score, query=query, content=content) - future_to_content[future] = content - - # 收集结果 - for future in as_completed(future_to_content): - content = future_to_content[future] - score = future.result() - content_title = content.split("\n")[0] - - if score != -1: - max_score = max(max_score, score) - min_score = min(min_score, score) - total_score += score - valid_scores += 1 - - if content_title: - retrieve_content.append(content_title + f"--得分({score}分)") - - avg_score = total_score / valid_scores if valid_scores > 0 else 0 - return retrieve_content, max_score, min_score, avg_score - - def get_new_workflow_info(self, query:str, new_message_id:str) -> dict: - """ - 获取新流程的问题分类 - - Args: - query (str): 用户问题 - new_message_id (str): 新流程的消息ID - - Returns: - dict: 包含问题分类结果的字典 - """ - try: - # 使用DifyTool直接获取消息信息 - new_message_info = self.dify_tool.get_message_debug_info_by_id(message_id=new_message_id) - - # 初始化变量 - retrieve_title = [] - retrieve_content = [] - rewrite_query = "" - vertical_classification = "" - sub_classification = "" - slot_info = "" - - # 解析工作流节点信息 - for workflow_node in new_message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "知识检索结果后处理": - outputs = json.loads(workflow_node["outputs"]) - retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) - retrieve_content = outputs["result"] - elif workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - llm_result_json = json.loads(workflow_node['inputs'])["llm_result"] - json_result = json.loads(llm_result_json) - vertical_classification = json_result['vertical_classification'] - sub_classification = json_result['sub_classification'] - slot_info = json.dumps(json_result["slot_filling"], ensure_ascii=False, indent=2) - except Exception as e: - return None - - return { - "问题改写": rewrite_query, - "检索词条": "\n".join(retrieve_title) if retrieve_title else "未检索知识库", - "问题分类": f"{vertical_classification} - {sub_classification}", - "槽点信息": slot_info - } - - def get_old_workflow_info(self, query:str, old_message_id:str) -> dict: - """ - 获取旧流程的问题分类 - - Args: - query (str): 用户问题 - old_message_id (str): 旧的流程的消息ID - - Returns: - dict: 包含问题分类结果的字典 - """ - try: - # 使用DifyTool直接获取消息信息 - old_message_info = self.dify_tool.get_message_debug_info_by_id(message_id=old_message_id) - - # 初始化变量 - retrieve_title = [] - retrieve_content = [] - rewrite_query = "" - - # 解析工作流节点信息 - for workflow_node in old_message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "知识检索结果后处理": - outputs = json.loads(workflow_node["outputs"]) - retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) - retrieve_content = outputs["result"] - elif workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - except Exception as e: - return None - - return { - "问题改写": rewrite_query, - "检索词条": "\n".join(retrieve_title) if retrieve_title else "未检索知识库", - } - - def get_retrieve_title_similarity(self, old_retrieve_content:list[dict], new_retrieve_content:list[dict]) -> str: - old_retrieve_content_list=[content["content"] for content in old_retrieve_content] - new_retrieve_content_list=[content["content"] for content in new_retrieve_content] - # 计算两个列表的交集 - intersection = set(old_retrieve_content_list).intersection(set(new_retrieve_content_list)) - - # 准备详细的比较结果 - intersection_count = len(intersection) - old_count = len(old_retrieve_content_list) - new_count = len(new_retrieve_content_list) - - # 计算相似度 (Jaccard相似系数) - if old_count == 0 and new_count == 0: - similarity = 1.0 # 都为空时,认为完全相似 - elif old_count == 0 or new_count == 0: - similarity = 0.0 # 一个为空时,认为完全不相似 - else: - # 交集大小除以并集大小 - union_count = len(set(old_retrieve_content_list).union(set(new_retrieve_content_list))) - similarity = intersection_count / union_count - - similarity_percentage = round(similarity * 100, 2) - result = f"{similarity_percentage}%" - return result - - def process_question(self, q:str) -> tuple: - """ - 处理单个问题,获取新旧流程的回答 - - Args: - q: 问题内容 - - Returns: - tuple: (old_result, new_result) 包含旧流程和新流程的回答信息 - """ - try: - # 如果是仅测试新流程模式 - if self.mode == "new_only" or self.old_chat is None: - new_result = self.new_chat.process_question(q) - return None, new_result - else: - # 使用ThreadPoolExecutor并发执行新旧流程 - with ThreadPoolExecutor(max_workers=2) as executor: - # 并发提交新旧流程的任务 - future_new = executor.submit(self.new_chat.process_question, q) - future_old = executor.submit(self.old_chat.process_question, q) - - # 获取结果 - new_result = future_new.result() - old_result = future_old.result() - - return old_result, new_result - except Exception as e: - logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True) - return None, None - - def process_question_with_judge(self, q:str, row): - """ - 处理单个问题,获取新旧流程的回答并进行评判 - - Args: - q: 问题内容 - - Returns: - dict: 包含问题、回答和评判结果的字典 - """ - try: - # 获取基本的问题和回答 - future_old, future_new = self.process_question(q) - if future_new is None: - return None - - # 如果是仅测试新流程模式 - if self.mode == "new_only" or future_old is None: - query = future_new["问题"] - new_answer = future_new["新流程答案"] - - # 获取词条链接和标准答案 - wiki_url = self.find_wiki_link(row) - standard_answer = "" - answer_title = "" - - try: - if wiki_url and not pd.isna(wiki_url): - standard_answer = self.get_wiki_content(wiki_url) - answer_title = self.get_wiki_title(wiki_url) - except Exception as e: - logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True) - - # 判断答案正确性 - judge_result = "" - if standard_answer: - # 调用LLM判断新答案是否正确 - new_result = self.judge_answer(standard_answer, new_answer) - if new_result is not None: - judge_result = "正确" if new_result else "错误" - - # 判断检索词条是否正确 - retrieve_right = answer_title in future_new["新检索词条"] - retrieve_right_str = ("正确" if retrieve_right else "错误") if answer_title else "" - # 判断槽点是否缺失 - slot_info = future_new["槽点信息"] - slot_info_data=None - if isinstance(slot_info, str): - slot_info_data = json.loads(slot_info) + logging.error(f"LLM判断过程在尝试 {max_retries} 次后仍然出错: {e}") + return "" else: - slot_info_data = slot_info - slot_missing = slot_info_data.get("missing_slots", {}) - slot_missing_str = "完整" if len(slot_missing) == 0 else "缺失" - # 返回结果 - return { - "问题": query, - "问题改写": future_new["新问题改写"], - "问题分类": future_new["新问题分类"], - "槽点信息": future_new["槽点信息"], - "槽点是否缺失": slot_missing_str, - "新流程答案": new_answer, - "回答是否正确": judge_result, - "检索是否正确": retrieve_right_str, - "答案词条": answer_title if answer_title else "", - "检索词条": future_new["新检索词条"], - } + # 可以添加短暂的等待时间,避免立即重试 + import time + time.sleep(1) # 等待1秒后重试 + + + def process_workflow(self, workflow_name, client, inputs, query, old_answer): + """处理单个工作流调用""" + try: + response = client.create_chat_message( + inputs=inputs, query=query, user="AutoCodeRun", response_mode="blocking" + ) + result = response.json() + answer = result.get('answer', "") + judge_result = self.llm_judge_answer(old_answer=old_answer, now_answer=answer) + return answer, judge_result + except Exception as e: + logging.error(f"{workflow_name}调用失败: {e}") + return '', '' + + def process_single_row(self, index, row): + """处理单行数据的方法,用于多线程执行""" + try: + query = row["提问"] + old_answer = row["回答"] + current_software = row["当前软件"] - # 如果是测试新老流程模式 - if future_old is None: - return None - query = future_old["问题"] - old_answer = future_old["旧流程答案"] - new_answer = future_new["新流程答案"] - - # 获取词条链接和标准答案 - wiki_url = self.find_wiki_link(row) - standard_answer = "" - answer_title = "" - - try: - if wiki_url and not pd.isna(wiki_url): - standard_answer = self.get_wiki_content(wiki_url) - answer_title = self.get_wiki_title(wiki_url) - except Exception as e: - logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True) - - # 判断答案正确性 - if standard_answer: - judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer) - else: - judge_result = self.judge_answer_diff(old_answer, new_answer) - - if judge_result is None: - judge_result = "" - - # 返回结果 - return { - "问题": query, - "新问题改写": future_new["新问题改写"], - "旧问题改写": future_old["旧问题改写"], - "新问题分类": future_new["新问题分类"], - "槽点信息": future_new["槽点信息"], - "新流程答案": new_answer, - "旧流程答案": old_answer, - "回答判断": judge_result, - # "词条检索相似度": retrieve_title_score, - "答案词条": answer_title if answer_title else "", - "新检索词条": future_new["新检索词条"], - "旧检索词条": future_old["旧检索词条"], + inputs = { + "current_softname": current_software, + "user_name": "AutoCodeRun" } + + # 并行调用两个工作流 + results = {'first_wiki': None, 'both_wiki_worker': None} + + with ThreadPoolExecutor(max_workers=2) as workflow_executor: + # 提交两个工作流任务 + futures = { + workflow_executor.submit( + self.process_workflow, + "先词条后工单工作流", + self.first_wiki_client, + inputs, + query, + old_answer + ): 'first_wiki', + + workflow_executor.submit( + self.process_workflow, + "词条与工单同时工作流", + self.both_wiki_worker_client, + inputs, + query, + old_answer + ): 'both_wiki_worker' + } + + # 收集结果 + for future in as_completed(futures): + workflow_key = futures[future] + try: + answer, judge_result = future.result() + results[workflow_key] = (answer, judge_result) + except Exception as e: + logging.error(f"工作流执行失败 (行{index}): {e}") + results[workflow_key] = ('', '') + + # 构建结果 + result_row = row.copy() + result_row["先词条后工单回答"] = results['first_wiki'][0] + result_row["先词条后工单回答对比"] = results['first_wiki'][1] + result_row["词条与工单同时回答"] = results['both_wiki_worker'][0] + result_row["词条与工单同时回答对比"] = results['both_wiki_worker'][1] + + logging.info(f"成功处理第 {index + 1} 行数据") + return index, result_row + except Exception as e: - logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True) - return None - - def run_comparison(self, with_judge=False): + logging.error(f"处理第 {index + 1} 行数据时出错: {e}") + result_row = row.copy() + result_row["先词条后工单回答"] = '' + result_row["先词条后工单回答对比"] = '' + result_row["词条与工单同时回答"] = '' + result_row["词条与工单同时回答对比"] = '' + return index, result_row + + + def run(self, excel_path, save_path, max_workers=3): """ - 运行对比测试,处理所有问题并生成结果Excel + 运行对比测试 Args: - with_judge: 是否进行答案评判 - - Returns: - str: 输出Excel文件的路径 + excel_path: Excel文件路径 + save_path: 保存路径 + max_workers: 最大并发线程数,默认为3 """ - # 读取Excel文件中的问题 - df = pd.read_excel(self.excel_path) - questions=[] - for idx, row in df.iterrows(): - if "回答中的软件名称" in row and "提问中的软件名称" in row: - if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知": - continue - if row['提问中的软件名称'] != "未知": - questions.append((row['提问'],row)) - else: - questions.append((f"{row['回答中的软件名称']}, {row['提问']}",row)) - else: - questions.append((row['提问'], row)) + try: + # 读取Excel文件 + if not os.path.exists(excel_path): + logging.error(f"Excel文件不存在: {excel_path}") + return + + df = pd.read_excel(excel_path) + logging.info(f"成功读取Excel文件: {excel_path}, 共 {len(df)} 行数据") - results = [] - is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None - if not is_debug: - # 使用多线程并发处理问题 - logging.info(f"并发数量: {self.max_workers}") - logging.info(f"问题数量: {len(questions)}") - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - # 创建进度条 - with tqdm(total=len(questions), desc="处理问题进度") as pbar: - # 提交所有任务 - futures = [] - for q, row in questions: - future = executor.submit(self.process_question_with_judge, q, row) - futures.append(future) - - # 处理结果 - for future in as_completed(futures): - result = future.result() - if result is not None: - with self.results_lock: - results.append(result) - pbar.update(1) - else: - for q, row in questions: - result = self.process_question_with_judge(q, row) - logging.info(json.dumps(result,ensure_ascii=False,indent=2)) - if result is not None: - results.append(result) - - # 生成输出Excel文件 - out_path = self.output_path - df_results = pd.DataFrame(results) - - # 使用ExcelWriter设置格式 - with pd.ExcelWriter(out_path, engine='xlsxwriter') as writer: - df_results.to_excel(writer, index=False, sheet_name='Sheet1') + # 验证必要的列是否存在 + required_columns = ["提问", "回答", "当前软件"] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + logging.error(f"Excel文件缺少必要的列: {missing_columns}") + return - # 获取工作簿和工作表对象 - workbook = writer.book - worksheet = writer.sheets['Sheet1'] + # 创建保存目录 + save_dir = os.path.dirname(save_path) + if save_dir and not os.path.exists(save_dir): + os.makedirs(save_dir) + + # 使用线程池处理数据 + results = {} - # 设置列宽 - for col_idx, col_name in enumerate(df_results.columns): - max_len = max(df_results[col_name].astype(str).map(len).max(), len(col_name)) - worksheet.set_column(col_idx, col_idx, min(max_len + 2, 70)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # 提交所有任务 + future_to_index = { + executor.submit(self.process_single_row, index, row): index + for index, row in df.iterrows() + } + + # 使用tqdm显示进度 + with tqdm(total=len(future_to_index), desc="处理进度") as pbar: + for future in as_completed(future_to_index): + try: + index, result_row = future.result() + results[index] = result_row + pbar.update(1) + except Exception as e: + original_index = future_to_index[future] + logging.error(f"线程执行失败 (行{original_index + 1}): {e}") + # 添加失败的行 + result_row = df.iloc[original_index].copy() + result_row["先词条后工单回答"] = '线程执行失败' + result_row["先词条后工单回答对比"] = '线程执行失败' + result_row["词条与工单同时回答"] = '线程执行失败' + result_row["词条与工单同时回答对比"] = '线程执行失败' + results[original_index] = result_row + pbar.update(1) - return out_path - - + # 按原始顺序重新组织结果 + rows_info = [results[i] for i in sorted(results.keys())] + + # 保存结果 + result_df = pd.DataFrame(rows_info) + result_df.to_excel(save_path, index=False) + logging.info(f"结果已保存到: {save_path}") + + except Exception as e: + logging.error(f"运行过程中出现错误: {e}") + raise + if __name__ == "__main__": - # 创建命令行参数解析器 - os.environ["DIFY_BASEURL"] = "http://10.1.16.39/v1" - os.environ["DIFY_NEW_API_KEY"] = "app-rv6ie73Ufoa3nRYCMiJx3a8K" - os.environ["DIFY_OLD_API_KEY"] = "app-wUdkWJx5zeOvmvBUZizMoSw3" - - os.environ["DIFY_PG_HOST"] = "10.1.16.39" - os.environ["DIFY_PG_PORT"] = "5432" - os.environ["DIFY_PG_USER"] = "postgres" - os.environ["DIFY_PG_PASSWORD"] = "difyai123456" - os.environ["DIFY_PG_DATABASE"] = "dify" - - default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/740条(dislike)_存在标准词条.xlsx") - parser = argparse.ArgumentParser(description='Dify对话测试工具') - parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only', - help='测试模式: new_only表示仅测试新对话, both表示测试新老对话') - parser.add_argument('--excel_path', type=str, - default=default_excel_path, - help='包含问题的Excel文件路径') - parser.add_argument('--baseurl', type=str, default=os.getenv("DIFY_BASEURL"), - help='Dify API的基础URL') - parser.add_argument('--new_api_key', type=str, default=os.getenv("DIFY_NEW_API_KEY"), - help='新流程的API密钥') - parser.add_argument('--old_api_key', type=str, default=os.getenv("DIFY_OLD_API_KEY"), - help='旧流程的API密钥') - parser.add_argument('--output_path', type=str, default=None, - help='输出Excel文件路径') - parser.add_argument('--max_workers', type=int, default=5, - help='最大工作线程数') - - # 解析命令行参数 - args = parser.parse_args() - - # 检查Excel文件是否存在 - if not os.path.exists(args.excel_path): - logging.error(f"错误:Excel文件不存在: {args.excel_path}", exc_info=True) - exit(1) - - # 创建测试器并运行 - tester = DifyComparisonTester( - excel_path=args.excel_path, - baseurl=args.baseurl, - new_workflow_api_key=args.new_api_key, - old_workflow_api_key=args.old_api_key if args.mode == "both" else None, - output_path=args.output_path, - max_workers=args.max_workers, - mode=args.mode - ) - - # 运行对比测试(带评判) - output_file = tester.run_comparison(with_judge=True) - logging.info(f"测试结果已保存至: {output_file}") + try: + dify_compare_test = DifyCompareTest() + + # 处理第一个文件 + excel_files = [ + ("data/excel/5月.xlsx", "data/excel/5月问答对比.xlsx"), + ("data/excel/其他月.xlsx", "data/excel/其他月问答对比.xlsx") + ] + + for excel_path, save_path in excel_files: + logging.info(f"开始处理文件: {excel_path}") + try: + dify_compare_test.run(excel_path=excel_path, save_path=save_path, max_workers=3) + logging.info(f"文件处理完成: {excel_path}") + except Exception as e: + logging.error(f"处理文件 {excel_path} 时出错: {e}") + continue + + logging.info("所有文件处理完成") + + except Exception as e: + logging.error(f"程序执行出错: {e}") + sys.exit(1) diff --git a/rag2_0/dify/dify_client/client.py b/rag2_0/dify/dify_client/client.py index 1646810..e81b0fc 100755 --- a/rag2_0/dify/dify_client/client.py +++ b/rag2_0/dify/dify_client/client.py @@ -8,7 +8,7 @@ class DifyClient: self.api_key = api_key self.base_url = base_url - def _send_request(self, method, endpoint, json=None, params=None, stream=False, timeout=300): + def _send_request(self, method, endpoint, json=None, params=None, stream=False, timeout=600): headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", diff --git a/rag2_0/dify/dify_tool.py b/rag2_0/dify/dify_tool.py index d636832..af5b433 100755 --- a/rag2_0/dify/dify_tool.py +++ b/rag2_0/dify/dify_tool.py @@ -362,328 +362,3 @@ class DifyTool: def get_workflow_run_info(self, workflow_run_id): return self.dify_pgsql.get_workflow_run_info(workflow_run_id) - -class BaseWorkflowChat: - """ - 工作流对话基类,封装了与Dify API交互的基本功能 - """ - def __init__(self, api_key: str, base_url: str): - """ - 初始化工作流对话基类 - - Args: - api_key: Dify API的密钥 - base_url: Dify API的基础URL - """ - self.chat_client = ChatClient(api_key=api_key, base_url=base_url) - self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) - self.dify_tool = DifyTool() - - def __del__(self): - """ - 析构函数,在对象被销毁时自动关闭数据库连接。 - 确保在对象生命周期结束时释放数据库资源。 - """ - # DifyTool类已经在其__del__方法中关闭了数据库连接,无需在此重复调用 - pass - - def create_chat_message(self, query: str): - """ - 创建聊天消息 - - Args: - query: 问题内容 - - Returns: - tuple: (聊天响应, 消息ID) - """ - try: - response = self.chat_client.create_chat_message(inputs={}, query=query, user="AutoTestDifyChat").json() - return response, response["message_id"] - except Exception as e: - raise e - - def calculate_score(self, query: str, content: str) -> int: - """ - 使用LLM判断query与content之间的相关性分数 - - Args: - query (str): 用户问题 - content (str): 检索内容 - - Returns: - int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 - """ - from rag2_0.tool.ModelTool import OpenAiLLM - - try: - prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 - -【评分标准】 -10分:完全契合,主题/意图完全一致且涵盖所有关键信息 -8-9分:高度相关,核心要素匹配但存在少量信息缺失 -6-7分:部分相关,涉及相同主题但存在重要信息缺失 -4-5分:弱相关,仅次要信息点匹配 -1-3分:完全不相关或信息冲突 - -【评估维度】 -1. 主题一致性:核心主题/意图的匹配程度 -2. 内容覆盖度:是否涵盖query的关键要素 -3. 信息准确性:是否存在矛盾/错误信息 -4. 细节丰富度:是否提供query要求的详细信息 - -【输出格式】 -{{ - "score": 评分, - "reason": "简明扼要的评分理由(中文)" -}} - -【示例】 -query: "新冠疫苗的常见副作用" -内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" -输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} - -现在评估: -query: "{query}" -content: "{content}" -""" - api_key = os.getenv("OPENAI_API_KEY") - base_url = os.getenv("OPENAI_API_BASE") - model = os.getenv("MODEL_NAME") - llm = OpenAiLLM(api_key=api_key, base_url=base_url, model=model) - response = llm.invoke(user_prompt=prompt, need_retry=True) - - # 解析JSON响应 - try: - parsed_output = self.content_source_parser.parse(response.content) - return parsed_output.score - except Exception as e: - return -1 - except Exception as e: - return -1 - - def get_retrieve_info(self, query: str, outputs: list[dict], reranker_sorce_info:list) -> tuple: - """ - 获取检索信息并计算分数 - - Args: - query (str): 用户问题 - outputs (dict): 检索输出结果 - - Returns: - tuple: (检索内容列表, 最高分, 最低分, 平均分) - """ - max_score = 0 - min_score = 10 - total_score = 0 - valid_scores = 0 - retrieve_title = [] - segmentid_to_title = { result["segment_id"]:result["title"].split("/")[-1] for result in outputs} - - # 使用线程池并发计算分数 - with ThreadPoolExecutor() as executor: - # 创建任务列表 - future_to_content = {} - for result in outputs: - content = result["segment_content"].strip() - segment_id = result["segment_id"].strip() - future = executor.submit(self.calculate_score, query=query, content=content) - future_to_content[future] = (content, segment_id) - - # 收集结果 - for future in as_completed(future_to_content): - content, segment_id = future_to_content[future] - score = future.result() - content_title = segmentid_to_title[segment_id] - - if score != -1: - max_score = max(max_score, score) - min_score = min(min_score, score) - total_score += score - valid_scores += 1 - - if content_title: - current_score = next((cur_source_info["score"] for cur_source_info in reranker_sorce_info if cur_source_info["segment_id"] == segment_id), None) - retrieve_title.append(content_title + f"--LLM得分({score}分)--重排得分({current_score:.2f}分)") - - avg_score = total_score / valid_scores if valid_scores > 0 else 0 - return retrieve_title, max_score, min_score, avg_score - -class NewWorkflowChat(BaseWorkflowChat): - """ - 新工作流对话类,用于调用新工作流发送对话并解析获取相关数据 - """ - def __init__(self, api_key: str, base_url: str): - super().__init__(api_key, base_url) - - def process_question(self, query: str) -> dict: - """ - 处理问题,获取新工作流的回答和相关信息 - - Args: - query: 问题内容 - - Returns: - dict: 包含问题、回答和相关信息的字典 - """ - response, message_id = self.create_chat_message(query) - - if isinstance(response, str) and response.startswith("error:"): - raise RuntimeError(f"create_chat_message 出错:{response}") - - answer = response["answer"] - workflow_info = self.get_workflow_info(query, message_id) - - if workflow_info is None: - return None - - result = { - "问题": query, - "新流程答案": answer, - "新问题改写": workflow_info["问题改写"], - "新问题分类": workflow_info["问题分类"], - "槽点信息": workflow_info["槽点信息"], - "新检索词条": workflow_info["检索词条"], - "message_id":message_id - } - - return result - - def get_workflow_info(self, query: str, message_id: str) -> dict: - """ - 获取新工作流的问题分类和检索信息 - - Args: - query (str): 用户问题 - message_id (str): 新工作流的消息ID - - Returns: - dict: 包含问题分类结果的字典 - """ - retrieve_title = [] - retrieve_content = [] - max_score = 0 - min_score = 0 - avg_score = 0 - rewrite_query = "" - vertical_classification = "" - sub_classification = "" - slot_info = "" - reranker_sorce=[] - try: - # 先取出重排得分 - message_info = self.dify_tool.get_message_debug_info_by_id(message_id=message_id) - for workflow_node in message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "提取处理后的知识": - retrieve_outputs = json.loads(workflow_node["outputs"])["source_kno"] - reranker_sorce = [{"score":result["metadata"]["score"], "segment_id":result["metadata"]["segment_id"]} for result in retrieve_outputs] - break - - for workflow_node in message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "提取处理后的知识": - outputs = json.loads(workflow_node["outputs"])["knowledge_list"] - retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs, reranker_sorce_info=reranker_sorce) - elif workflow_node["title"] == "意图识别结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - llm_result_json = json.loads(workflow_node['inputs'])["llm_result"] - json_result = json.loads(llm_result_json) - vertical_classification = json_result['vertical_classification'] - sub_classification = json_result['sub_classification'] - slot_info = json.dumps(json_result["slot_filling"], ensure_ascii=False, indent=2) - except Exception as e: - raise e - - retrieve_content = "" - if len(reranker_sorce)==0: - retrieve_content="未检索知识库" - elif len(reranker_sorce) > 0 and len(retrieve_title)==0: - retrieve_content = "知识与提问不相关,被丢弃" - else: - retrieve_content = "\n".join(retrieve_title) - - return { - "问题改写": rewrite_query, - "检索词条": retrieve_content, - "问题分类": f"{vertical_classification} - {sub_classification}", - "槽点信息": slot_info, - - } - -class OldWorkFlowChat(BaseWorkflowChat): - """ - 旧工作流对话类,用于调用旧工作流发送对话并解析获取相关数据 - """ - - def __init__(self, api_key: str, base_url: str): - super().__init__(api_key, base_url) - - def process_question(self, query: str) -> dict: - """ - 处理问题,获取旧工作流的回答和相关信息 - - Args: - query: 问题内容 - - Returns: - dict: 包含问题、回答和相关信息的字典 - """ - response, message_id = self.create_chat_message(query) - - if isinstance(response, str) and response.startswith("error:"): - return None - - answer = response["answer"] - workflow_info = self.get_workflow_info(query, message_id) - - if workflow_info is None: - return None - - result = { - "问题": query, - "旧流程答案": answer, - "旧问题改写": workflow_info["问题改写"], - "旧检索词条": workflow_info["检索词条"], - "message_id":message_id - } - - return result - - def get_workflow_info(self, query: str, message_id: str) -> dict: - """ - 获取旧工作流的问题改写和检索信息 - - Args: - query (str): 用户问题 - message_id (str): 旧工作流的消息ID - - Returns: - dict: 包含问题改写和检索信息的字典 - """ - retrieve_title = [] - retrieve_content = [] - max_score = 0 - min_score = 0 - avg_score = 0 - rewrite_query = "" - - try: - message_info = self.dify_tool.get_message_debug_info_by_id(message_id=message_id) - for workflow_node in message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "知识检索结果后处理": - outputs = json.loads(workflow_node["outputs"]) - retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) - retrieve_content = outputs["result"] - elif workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - except Exception as e: - return None - - return { - "问题改写": rewrite_query, - "检索词条": "\n".join(retrieve_title) if retrieve_title else "未检索知识库", - } - -if __name__ == "__main__": - pass diff --git a/rag2_0/tool/ModelTool.py b/rag2_0/tool/ModelTool.py index 0978d86..d59f489 100755 --- a/rag2_0/tool/ModelTool.py +++ b/rag2_0/tool/ModelTool.py @@ -240,11 +240,15 @@ class OpenAiLLM: self._kwargs = kwargs - def invoke(self, user_prompt="你是谁?", need_retry=True): + def invoke(self, user_prompt="你是谁?", need_retry=True,**extra_kwargs): # 初始化 OpenAI 客户端 max_retries = 3 retry_count = 0 + # 合并额外的kwargs与self._kwargs + kwargs = {**self._kwargs} + if extra_kwargs: + kwargs.update(extra_kwargs) if "timeout" not in self._kwargs: timeout = httpx.Timeout(300.0) self._kwargs["timeout"] = timeout