From 01dc1c3c91d09b6efdc5dafff97a32d263c75b7d Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Wed, 4 Jun 2025 17:37:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=AF=B9=E8=AF=9D=E8=BD=AC?= =?UTF-8?q?=E5=B7=A5=E5=8D=95=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=94=A8=E6=88=B7=E9=97=AE=E9=A2=98=E5=92=8C?= =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=96=B9=E6=A1=88=E6=8F=90=E5=8F=96=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E8=B0=83=E6=95=B4LLM=E5=8F=82=E6=95=B0?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=BC=BA=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=EF=BC=8C=E6=94=B9=E8=BF=9B=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=A4=84=E7=90=86=E6=95=88=E7=8E=87=EF=BC=8C=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B=E4=BB=A5=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=96=B0=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8F=AF=E8=AF=BB=E6=80=A7=E5=92=8C=E7=BB=93?= =?UTF-8?q?=E6=9E=84=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag2_0/demo/dialogue_to_workorder.py | 64 ++++++++---- rag2_0/dify/test_dify_chatapi.py | 149 +++++++++++++++++++-------- 2 files changed, 147 insertions(+), 66 deletions(-) diff --git a/rag2_0/demo/dialogue_to_workorder.py b/rag2_0/demo/dialogue_to_workorder.py index e59c23d..7c3d7e1 100755 --- a/rag2_0/demo/dialogue_to_workorder.py +++ b/rag2_0/demo/dialogue_to_workorder.py @@ -11,12 +11,13 @@ from langchain.output_parsers import PydanticOutputParser from rag2_0.tool.ModelTool import OpenAiLLM from dotenv import load_dotenv import openpyxl +import re load_dotenv() # ================ 模型定义 ================ class UserQuestionAndSolution(BaseModel): - user_question: str = Field(description="客户问题") + user_question: str = Field(description="用户的核心问题") solution: str = Field(description="坐席提供的解决方案") class UserQuestionAndSolutionList(BaseModel): @@ -90,7 +91,8 @@ class DialogueToWorkorder: # 初始化LLM模型 self.llm_params = llm_params or { "temperature": 0.2, - "model": os.getenv("LLM_MODEL_NAME"), + "top_p":0.95, + "model": "deepseek-ai/DeepSeek-R1", "api_key": os.getenv("OPENAI_API_KEY"), "base_url": os.getenv("OPENAI_API_BASE") } @@ -207,18 +209,21 @@ class DialogueToWorkorder: """分析用户问题和解决方案""" dialogue_str = self.get_dialogue_str(conversation_rows) - prompt = """请从以下电力造价相关的客服对话记录中,识别并总结用户提出的问题及对应坐席提供的解决方案。(注意指代消除) -1、理解对话记录,总结用户在此次对话中提出的核心诉求(希望解决的问题)。以用户的角度总结。 -2、根据用户提出的诉求,分析坐席提供的解决方法(比如:1、引导用户xxxx。2、告诉用户xxxxx)。以坐席的角度直接总结解决方案(不要出现"坐席"、"我"等字样) -3、使用json格式输出(多个用户诉求采用数组格式输出): + prompt = """请从以下电力造价相关的客服对话记录中,识别并总结用户提出的核心问题及对应坐席提供的解决方案。(注意指代消除) +1、理解对话记录,总结用户在此次对话中提出的核心问题,以用户的角度总结核心问题(可根据上下文完善问题内容)。 +2、根据用户提出的问题,分析坐席提供的解决方法(比如:1、引导用户xxxx。2、告诉用户xxxxx)。以坐席的角度直接总结完整的解决方案或应对措施(不要出现"坐席"、"我"等字样) +3、提炼访客独立的核心问题(以访客的角度总结核心问题),核心问题衍生、细化后的请求合并到对应的核心问题中。不要单独列出衍生、细化后的请求。 + +4、使用json格式输出(多个用户问题采用标准json数组格式输出): {output_format} 输出示例: +```json {{ - "user_question": "软件打开报错", - "solution": "通过远程引导解决" + "user_question": "技改软件打开报错", + "solution": "1、告知报错原因 2、通过远程辅助解决" }} - +``` =======对话记录如下所示======= {dialogue_str} ============================ @@ -227,22 +232,38 @@ class DialogueToWorkorder: llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str) response = self.llm.invoke(user_prompt=llm_prompt) - if 'reasoning_content' not in response.model_extra and self.llm._model == 'deepseek-ai/DeepSeek-R1': - print("deepseek-ai/DeepSeek-R1 解析失败") try: if response.content.count('user_question') == 1: user_question_and_solution = self.user_question_and_solution_parser.parse(response.content) return [user_question_and_solution] else: - - raise Exception("解析失败") + array_pattern = r'\[\s*(\{[\s\S]*?\}(?:\s*,\s*\{[\s\S]*?\})*)\s*\]' + array_match = re.search(array_pattern, response.content) + if array_match: + # 找到了JSON数组 + json_array_str = '[' + array_match.group(1) + ']' + + # 尝试解析JSON + json_objects = json.loads(json_array_str) + user_question_and_solution_list = [] + + for obj in json_objects: + user_question = obj.get('user_question', '') + solution = obj.get('solution', '') + user_question_and_solution_list.append( + UserQuestionAndSolution(user_question=user_question, solution=solution) + ) + + if user_question_and_solution_list: + return user_question_and_solution_list + raise Exception("解析失败") # 如果之前没有return 触发异常重新提取 except Exception as e: output_format = self.user_question_and_solution_list_parser.get_format_instructions() llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str) response = self.llm.invoke(user_prompt=llm_prompt) - user_question_and_solution = self.user_question_and_solution_list_parser.parse(response.content) - return user_question_and_solution.user_question_list + user_question_and_solution_temp = self.user_question_and_solution_list_parser.parse(response.content) + return user_question_and_solution_temp.user_question_list return [user_question_and_solution] @@ -348,15 +369,14 @@ class DialogueToWorkorder: dialogue_str = self.get_dialogue_str(conversation_rows) prompt = f""" -请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: +请根据以下对话记录分析访客情绪是否对软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: -1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在抱怨或不满 +1. 抱怨识别:判断访客是否对软件功能或者坐席服务存在明显抱怨或不满 2. 抱怨分级(如存在抱怨): - - 一般抱怨:明确表达出对博微软件功能或者坐席服务存在不满 - - 中等抱怨:明确表达出对博微软件功能或者坐席服务存在不满,语气较为强烈 - - 严重抱怨:对博微软件功能或者坐席服务使用激烈言辞 + - 普通抱怨:明确表达出对软件功能或者坐席服务存在不满 + - 严重抱怨:明确表达出对软件功能或者坐席服务存在不满,语气较为强烈 3. 投诉倾向:是否明确/暗示将进行投诉 -4. 抱怨对象:坐席服务态度/业务能力 或 博微功能问题(注意忽略对非博微软件或坐席的抱怨) +4. 抱怨对象:坐席服务态度/业务能力 或 软件功能问题(注意忽略对非软件或坐席的抱怨) 示例输出: {{ @@ -384,6 +404,8 @@ class DialogueToWorkorder: def process_conversation(self, conversation_id, conversation_rows, product_detail_dict): """处理单个会话的函数,用于多线程并发""" + # if conversation_id!="b157aa91-3acb-11f0-a191-4fb224ef4b40": + # return [] # 获取工单基本信息 base_workorder_dict = self.get_workorder_dict(conversation_rows) # 分析用户问题和解决方案 diff --git a/rag2_0/dify/test_dify_chatapi.py b/rag2_0/dify/test_dify_chatapi.py index ae74288..3943523 100755 --- a/rag2_0/dify/test_dify_chatapi.py +++ b/rag2_0/dify/test_dify_chatapi.py @@ -17,6 +17,7 @@ from dotenv import load_dotenv from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser from threading import Lock +import sys load_dotenv() @@ -292,7 +293,7 @@ class DifyComparisonTester: llm = self.get_llm() try: response = llm.invoke(user_prompt=prompt, need_retry=True) - return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同" + return "缺乏标准答案无法判断准确性,但答案基本相同" if "差异较小" in response.content else "缺乏标准答案无法判断准确性,但答案差异较大" except Exception as e: return None @@ -368,17 +369,29 @@ content: "{content}" valid_scores = 0 retrieve_content = [] - for result in outputs["result"]: - content = result["content"].strip() - score = self.calculate_score(query=query, content=content) - if score != -1: - max_score = max(max_score, score) - min_score = min(min_score, score) - total_score += score - valid_scores += 1 - content_title = content.split("\n")[0] - if content_title: - retrieve_content.append(content_title + f"--得分({score}分)") + # 使用线程池并发计算分数 + with ThreadPoolExecutor() as executor: + # 创建任务列表 + future_to_content = {} + for result in outputs["result"]: + content = result["content"].strip() + future = executor.submit(self.calculate_score, query=query, content=content) + future_to_content[future] = content + + # 收集结果 + for future in as_completed(future_to_content): + content = future_to_content[future] + score = future.result() + content_title = content.split("\n")[0] + + if score != -1: + max_score = max(max_score, score) + min_score = min(min_score, score) + total_score += score + valid_scores += 1 + + if content_title: + retrieve_content.append(content_title + f"--得分({score}分)") avg_score = total_score / valid_scores if valid_scores > 0 else 0 return retrieve_content, max_score, min_score, avg_score @@ -394,6 +407,7 @@ content: "{content}" Returns: dict: 包含问题分类结果的字典 """ + retrieve_title=[] retrieve_content=[] max_score=0 min_score=0 @@ -401,12 +415,14 @@ content: "{content}" rewrite_query="" vertical_classification="" sub_classification="" + slot_info="" try: new_message_info = DifyTool.get_message_debug_info_by_id(message_id=new_message_id) for workflow_node in new_message_info["workflow_node_executions_info"]: if workflow_node["title"] == "知识检索结果后处理": outputs = json.loads(workflow_node["outputs"]) - retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + retrieve_content=outputs["result"] elif workflow_node["title"] == "问题优化结果解析": outputs = json.loads(workflow_node["outputs"]) rewrite_query = outputs["optimize_query"] @@ -414,15 +430,15 @@ content: "{content}" json_result = json.loads(llm_result_json) vertical_classification = json_result['vertical_classification'] sub_classification = json_result['sub_classification'] + slot_info=json.dumps(json_result["slot_filling"],ensure_ascii=False,indent=2) except Exception as e: return None return { "问题改写": rewrite_query, - "检索词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库", + "检索词条": "\n".join(retrieve_title) if retrieve_title else "未检索知识库", + "检索内容": retrieve_content, "问题分类": f"{vertical_classification} - {sub_classification}", - "检索最高分": max_score, - "检索最低分": min_score, - "检索平均分": avg_score + "槽点信息":slot_info } def get_old_workflow_info(self, query:str, old_message_id:str) -> dict: @@ -436,6 +452,7 @@ content: "{content}" Returns: dict: 包含问题分类结果的字典 """ + retrieve_title=[] retrieve_content=[] max_score=0 min_score=0 @@ -446,7 +463,8 @@ content: "{content}" for workflow_node in old_message_info["workflow_node_executions_info"]: if workflow_node["title"] == "知识检索结果后处理": outputs = json.loads(workflow_node["outputs"]) - retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + retrieve_content=outputs["result"] elif workflow_node["title"] == "问题优化结果解析": outputs = json.loads(workflow_node["outputs"]) rewrite_query = outputs["optimize_query"] @@ -454,12 +472,35 @@ content: "{content}" return None return { "问题改写": rewrite_query, - "检索词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库", - "检索最高分": max_score, - "检索最低分": min_score, - "检索平均分": avg_score + "检索词条": "\n".join(retrieve_title) if retrieve_title else "未检索知识库", + "检索内容": retrieve_content, } + def get_retrieve_title_similarity(self, old_retrieve_content:list[dict], new_retrieve_content:list[dict]) -> str: + old_retrieve_content_list=[content["content"] for content in old_retrieve_content] + new_retrieve_content_list=[content["content"] for content in new_retrieve_content] + # 计算两个列表的交集 + intersection = set(old_retrieve_content_list).intersection(set(new_retrieve_content_list)) + + # 准备详细的比较结果 + intersection_count = len(intersection) + old_count = len(old_retrieve_content_list) + new_count = len(new_retrieve_content_list) + + # 计算相似度 (Jaccard相似系数) + if old_count == 0 and new_count == 0: + similarity = 1.0 # 都为空时,认为完全相似 + elif old_count == 0 or new_count == 0: + similarity = 0.0 # 一个为空时,认为完全不相似 + else: + # 交集大小除以并集大小 + union_count = len(set(old_retrieve_content_list).union(set(new_retrieve_content_list))) + similarity = intersection_count / union_count + + similarity_percentage = round(similarity * 100, 2) + result = f"{similarity_percentage}%" + return result + def process_question_with_judge(self, q:str): """ 处理单个问题,获取新旧流程的回答并进行评判 @@ -511,16 +552,18 @@ content: "{content}" except Exception as e: print(f"处理问题 '{query}' 获取工作流信息时发生错误: {str(e)}") return None - + retrieve_title_score=self.get_retrieve_title_similarity(old_retrieve_content=old_workflow_info["检索内容"], new_retrieve_content=new_workflow_info["检索内容"]) # 返回结果 return { "问题": query, "新问题改写": new_workflow_info["问题改写"], "旧问题改写": old_workflow_info["问题改写"], "新问题分类": new_workflow_info["问题分类"], + "槽点信息":new_workflow_info["槽点信息"], "新流程答案": new_answer, "旧流程答案": old_answer, "回答判断": judge_result, + "词条检索相似度": retrieve_title_score, "答案词条": answer_title if answer_title else "", "新检索词条": new_workflow_info["检索词条"], "旧检索词条": old_workflow_info["检索词条"], @@ -538,29 +581,44 @@ content: "{content}" """ # 读取Excel文件中的问题 df = pd.read_excel(self.excel_path) - questions = df['问题'].tolist() + questions=[] + for idx, row in df.iterrows(): + if row['回答中的软件名称'] == "未知": + continue + if row['提问中的软件名称'] != "未知": + questions.append(row['提问']) + questions.append(f"{row['回答中的软件名称']}, {row['提问']}") + results = [] # 选择处理函数 process_func = self.process_question_with_judge if with_judge else self.process_question - - # 使用多线程并发处理问题 - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - # 创建进度条 - with tqdm(total=len(questions), desc="处理问题进度") as pbar: - # 提交所有任务 - futures = [] - for q in questions: - future = executor.submit(process_func, q) - futures.append(future) - - # 处理结果 - for future in as_completed(futures): - result = future.result() - if result is not None: - with self.results_lock: - results.append(result) - pbar.update(1) - + is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None + if not is_debug: + # 使用多线程并发处理问题 + print("并发数量: ", self.max_workers) + print("问题数量: ", len(questions)) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # 创建进度条 + with tqdm(total=len(questions), desc="处理问题进度") as pbar: + # 提交所有任务 + futures = [] + for q in questions: + future = executor.submit(process_func, q) + futures.append(future) + + # 处理结果 + for future in as_completed(futures): + result = future.result() + if result is not None: + with self.results_lock: + results.append(result) + pbar.update(1) + else: + for q in questions: + result = process_func(q) + print(json.dumps(result,ensure_ascii=False,indent=2)) + if result is not None: + results.append(result) # 生成输出Excel文件 out_path = self.output_path if with_judge else os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx") df_results = pd.DataFrame(results) @@ -583,7 +641,7 @@ content: "{content}" if __name__ == "__main__": # 定义Excel路径 - excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/400条答案差异的.xlsx") + excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx") if not os.path.exists(excel_path): print(f"错误:Excel文件不存在: {excel_path}") @@ -592,7 +650,7 @@ if __name__ == "__main__": # Dify API配置 baseurl = "http://172.20.0.145/v1" old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3" - new_workflow_api_key = "app-Lf1pQ1NVwdMfCRVNTBCOTPHT" + new_workflow_api_key = "app-qxsSybCs7ABiKlC1JabTYVn6" # Wiki Excel路径和Dify应用ID(用于评判) wiki_excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx") @@ -604,6 +662,7 @@ if __name__ == "__main__": old_workflow_api_key=old_workflow_api_key, new_workflow_api_key=new_workflow_api_key, wiki_excel_path=wiki_excel_path, + max_workers=5 ) # 运行对比测试(带评判)