diff --git a/.gitignore b/.gitignore index aa9b5e5..dce657c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ __pycache__/ # 忽略数据文件 data/excel/* rag2_0/demo/Test.py +data/excel/*.xlsx diff --git a/data/excel/200条提问数据.xlsx b/data/excel/200条提问数据.xlsx deleted file mode 100644 index 78adf8e..0000000 Binary files a/data/excel/200条提问数据.xlsx and /dev/null differ diff --git a/data/excel/200条提问数据_重写结果.xlsx b/data/excel/200条提问数据_重写结果.xlsx deleted file mode 100644 index d9c053e..0000000 Binary files a/data/excel/200条提问数据_重写结果.xlsx and /dev/null differ diff --git a/data/excel/200条提问数据_重写结果修改词库前.xlsx b/data/excel/200条提问数据_重写结果修改词库前.xlsx deleted file mode 100644 index 97375e8..0000000 Binary files a/data/excel/200条提问数据_重写结果修改词库前.xlsx and /dev/null differ diff --git a/data/excel/answer_result.xlsx b/data/excel/answer_result.xlsx deleted file mode 100644 index f2d8232..0000000 Binary files a/data/excel/answer_result.xlsx and /dev/null differ diff --git a/data/excel/dify问答_对比结果.xlsx b/data/excel/dify问答_对比结果.xlsx deleted file mode 100644 index 2e52cb7..0000000 Binary files a/data/excel/dify问答_对比结果.xlsx and /dev/null differ diff --git a/data/excel/主网软件提问_回答内容评判.xlsx b/data/excel/主网软件提问_回答内容评判.xlsx deleted file mode 100644 index 3662a79..0000000 Binary files a/data/excel/主网软件提问_回答内容评判.xlsx and /dev/null differ diff --git a/data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx b/data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx deleted file mode 100644 index 455edf7..0000000 Binary files a/data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx and /dev/null differ diff --git a/rag2_0/demo/dialogue_to_workorder.py b/rag2_0/demo/dialogue_to_workorder.py index 616705d..fe7af05 100644 --- a/rag2_0/demo/dialogue_to_workorder.py +++ b/rag2_0/demo/dialogue_to_workorder.py @@ -1,6 +1,7 @@ import os import json import pandas as pd +import argparse from datetime import datetime import time import concurrent.futures @@ -9,6 +10,7 @@ from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser from rag2_0.tool.ModelTool import OpenAiLLM from dotenv import load_dotenv +import openpyxl load_dotenv() @@ -17,6 +19,9 @@ class UserQuestionAndSolution(BaseModel): user_question: str = Field(description="客户问题") solution: str = Field(description="坐席提供的解决方案") +class UserQuestionAndSolutionList(BaseModel): + user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表") + class QuestionType(BaseModel): question_type: str = Field(description="问题类型") @@ -33,14 +38,7 @@ class ProductNameAndModuleName(BaseModel): class ProductLine(BaseModel): product_line: str = Field(description="产品线") -# 初始化输出解析器 -user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution) -question_type_parser = PydanticOutputParser(pydantic_object=QuestionType) -is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint) -product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName) -product_line_parser = PydanticOutputParser(pydantic_object=ProductLine) - -# ================ LLM配置 ================ +# ================ 工具函数 ================ def retry_llm_call(max_retries=3, delay=2): """ 重试装饰器,用于LLM调用失败时进行重试 @@ -72,130 +70,144 @@ def retry_llm_call(max_retries=3, delay=2): return wrapper return decorator -def get_llm_instance(): - """获取LLM实例""" - api_key = os.getenv("OPENAI_API_KEY") - base_url = os.getenv("OPENAI_API_BASE") - model_name = os.getenv("LLM_MODEL_NAME") - - llm_params = { - "temperature": 0.6, - "model": model_name, - "api_key": api_key, - "base_url": base_url - } - - return OpenAiLLM(**llm_params) - -# ================ 数据处理函数 ================ -def parse_product_detail_excel(file_path): - """解析产品详情Excel文件""" - df = pd.read_excel(file_path) - product_dict = {} - - for _, row in df.iterrows(): - product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else '' - product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else '' - module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else '' +# ================ 对话转工单处理类 ================ +class DialogueToWorkorder: + def __init__(self, llm_params=None): + """ + 初始化对话转工单处理类 - if product_line not in product_dict: - product_dict[product_line] = {} - if product_name not in product_dict[product_line]: - product_dict[product_line][product_name] = [] - product_dict[product_line][product_name].append(module_name) - - return product_dict - -def get_workorder_dict(rows): - """从会话行中提取工单基本信息""" - workorder_dict = {} - - # 创建时间 - for row in rows: - create_time = row['创建时间'] - if pd.notna(create_time) and str(create_time).strip() != '': - workorder_dict["创建时间"] = create_time - break - - # 处理坐席 - for row in rows: - sender = row['消息发送者'] - sender_nickname = row['发送者昵称'] - if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': - workorder_dict["处理坐席"] = sender_nickname - break - - # 访客昵称 - for row in rows: - sender = row['消息发送者'] - sender_nickname = row['发送者昵称'] - if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': - workorder_dict["访客昵称"] = sender_nickname - break - - # 会话id - for row in rows: - conversation_id = row['会话id'] - if pd.notna(conversation_id) and str(conversation_id).strip() != '': - workorder_dict["会话id"] = conversation_id - break - - # 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124 - if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]): - create_time_str = str(workorder_dict["创建时间"]).strip() - dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") - workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S") - - return workorder_dict - -def get_dialogue_str(conversation_rows): - """从会话行中提取对话内容""" - dialogue = [] - - for row in conversation_rows: - sender = row.get('消息发送者', '') - content = str(row.get('消息内容', '')).strip() + 参数: + llm_params: LLM模型参数,如果为None则使用环境变量中的配置 + """ + # 初始化输出解析器 + self.user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution) + self.user_question_and_solution_list_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolutionList) + self.question_type_parser = PydanticOutputParser(pydantic_object=QuestionType) + self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint) + self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName) + self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine) - # 处理非文本内容 - if content == '' or pd.isna(row["消息内容"]): - if str(row.get('图片', '')).strip() != '': - content = '[图片]' - elif str(row.get('附件', '')).strip() != '': - content = '[附件]' - elif str(row.get('视频', '')).strip() != '': - content = '[视频]' - elif str(row.get('语音', '')).strip() != '': - content = '[语音]' - - # 添加对话内容 - if sender == '访客': - dialogue.append(f"访客:{content}") - elif sender == '坐席': - dialogue.append(f"坐席:{content}") + # 初始化LLM模型 + self.llm_params = llm_params or { + "temperature": 0.6, + "model": os.getenv("LLM_MODEL_NAME"), + "api_key": os.getenv("OPENAI_API_KEY"), + "base_url": os.getenv("OPENAI_API_BASE") + } + + self.llm = self._get_llm_instance() + + def _get_llm_instance(self): + """获取LLM实例""" + return OpenAiLLM(**self.llm_params) + + def parse_product_detail_excel(self, file_path): + """解析产品详情Excel文件""" + df = pd.read_excel(file_path) + product_dict = {} + + for _, row in df.iterrows(): + product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else '' + product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else '' + module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else '' - return '\n'.join(dialogue) - -def group_conversations_by_id(df): - """将数据按会话ID分组""" - conversation_dict = {} + if product_line not in product_dict: + product_dict[product_line] = {} + if product_name not in product_dict[product_line]: + product_dict[product_line][product_name] = [] + product_dict[product_line][product_name].append(module_name) + + return product_dict - for index, row in df.iterrows(): - conversation_id = row['会话id'] - if pd.notna(conversation_id) and str(conversation_id).strip() != '': - if conversation_id in conversation_dict: - conversation_dict[conversation_id].append(row.to_dict()) - else: - conversation_dict[conversation_id] = [row.to_dict()] + def get_workorder_dict(self, rows): + """从会话行中提取工单基本信息""" + workorder_dict = {} + + # 创建时间 + for row in rows: + create_time = row['创建时间'] + if pd.notna(create_time) and str(create_time).strip() != '': + workorder_dict["创建时间"] = create_time + break + + # 处理坐席 + for row in rows: + sender = row['消息发送者'] + sender_nickname = row['发送者昵称'] + if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': + workorder_dict["处理坐席"] = sender_nickname + break + + # 访客昵称 + for row in rows: + sender = row['消息发送者'] + sender_nickname = row['发送者昵称'] + if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': + workorder_dict["访客昵称"] = sender_nickname + break + + # 会话id + for row in rows: + conversation_id = row['会话id'] + if pd.notna(conversation_id) and str(conversation_id).strip() != '': + workorder_dict["会话id"] = conversation_id + break - return conversation_dict + # 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124 + if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]): + create_time_str = str(workorder_dict["创建时间"]).strip() + dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") + workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S") -# ================ LLM分析函数 ================ -@retry_llm_call(max_retries=3, delay=2) -def get_user_question_and_solution(conversation_rows): - """分析用户问题和解决方案""" - dialogue_str = get_dialogue_str(conversation_rows) + return workorder_dict - prompt = f""" + def get_dialogue_str(self, conversation_rows): + """从会话行中提取对话内容""" + dialogue = [] + + for row in conversation_rows: + sender = row.get('消息发送者', '') + content = str(row.get('消息内容', '')).strip() + + # 处理非文本内容 + if content == '' or pd.isna(row["消息内容"]): + if str(row.get('图片', '')).strip() != '': + content = '[图片]' + elif str(row.get('附件', '')).strip() != '': + content = '[附件]' + elif str(row.get('视频', '')).strip() != '': + content = '[视频]' + elif str(row.get('语音', '')).strip() != '': + content = '[语音]' + + # 添加对话内容 + if sender == '访客': + dialogue.append(f"访客:{content}") + elif sender == '坐席': + dialogue.append(f"坐席:{content}") + + return '\n'.join(dialogue) + + def group_conversations_by_id(self, df): + """将数据按会话ID分组""" + conversation_dict = {} + + for index, row in df.iterrows(): + conversation_id = row['会话id'] + if pd.notna(conversation_id) and str(conversation_id).strip() != '': + if conversation_id in conversation_dict: + conversation_dict[conversation_id].append(row.to_dict()) + else: + conversation_dict[conversation_id] = [row.to_dict()] + + return conversation_dict + + @retry_llm_call(max_retries=3, delay=2) + def get_user_question_and_solution(self, conversation_rows): + """分析用户问题和解决方案""" + dialogue_str = self.get_dialogue_str(conversation_rows) + + prompt = f""" 请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求: 1. 专业识别: @@ -217,77 +229,82 @@ def get_user_question_and_solution(conversation_rows): - 文件模板提供情况 3. 结构化输出: -{user_question_and_solution_parser.get_format_instructions()} +{self.user_question_and_solution_list_parser.get_format_instructions()} 访客与坐席的对话记录如下: {dialogue_str} - """ + """ + + response = self.llm.invoke(user_prompt=prompt) + user_question_and_solution_list = self.user_question_and_solution_list_parser.parse(response.content) + + return user_question_and_solution_list.user_question_list - llm = get_llm_instance() - response = llm.invoke(user_prompt=prompt) - user_question_and_solution = user_question_and_solution_parser.parse(response.content) - - return user_question_and_solution.user_question, user_question_and_solution.solution - -@retry_llm_call(max_retries=3, delay=2) -def get_product_name_and_module_name(product_line, conversation_rows, product_detail_dict): - """分析产品名称和模块名称""" - if product_line == '': - return '', '' - - json_str = json.dumps(product_detail_dict[product_line]) - dialogue_str = get_dialogue_str(conversation_rows) - - prompt = f""" + @retry_llm_call(max_retries=3, delay=2) + def get_product_name_and_module_name(self, product_line, conversation_rows, product_detail_dict, user_question_str, solution_str): + """分析产品名称和模块名称""" + if product_line == '': + return '', '' + + json_str = json.dumps(product_detail_dict[product_line]) + dialogue_str = self.get_dialogue_str(conversation_rows) + + prompt = f""" 请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 +要求: +1. 如果对话记录中存在多个产品名称和模块名称,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品名称和模块名称。 +2. 如果对话记录中只存在一个产品名称和模块名称,则直接返回该产品名称和模块名称。 + 输出格式: -{product_name_and_module_name_parser.get_format_instructions()} +{self.product_name_and_module_name_parser.get_format_instructions()} 产品名称列表及模块名称列表: {json_str} 对话记录: {dialogue_str} - """ + """ + + response = self.llm.invoke(user_prompt=prompt) + product_name_and_module_name = self.product_name_and_module_name_parser.parse(response.content) + + return product_name_and_module_name.product_name, product_name_and_module_name.module_name - llm = get_llm_instance() - response = llm.invoke(user_prompt=prompt) - product_name_and_module_name = product_name_and_module_name_parser.parse(response.content) - - return product_name_and_module_name.product_name, product_name_and_module_name.module_name - -@retry_llm_call(max_retries=3, delay=2) -def get_product_line(conversation_rows, product_detail_dict): - """分析产品线""" - dialogue_str = get_dialogue_str(conversation_rows) - product_line_list = list(product_detail_dict.keys()) - - prompt = f""" + @retry_llm_call(max_retries=3, delay=2) + def get_product_line(self, conversation_rows, product_detail_dict, user_question_str, solution_str): + """分析产品线""" + dialogue_str = self.get_dialogue_str(conversation_rows) + product_line_list = list(product_detail_dict.keys()) + + prompt = f""" 请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 无法判断时,返回空字符串。即product_line="" +要求: +1. 如果对话记录中存在多个产品线,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品线。 +2. 如果对话记录中只存在一个产品线,则直接返回该产品线。 + 输出格式: -{product_line_parser.get_format_instructions()} +{self.product_line_parser.get_format_instructions()} 产品线列表: {product_line_list} 对话记录: {dialogue_str} - """ + """ + + response = self.llm.invoke(user_prompt=prompt) + product_line = self.product_line_parser.parse(response.content) + + return product_line.product_line - llm = get_llm_instance() - response = llm.invoke(user_prompt=prompt) - product_line = product_line_parser.parse(response.content) - - return product_line.product_line - -@retry_llm_call(max_retries=3, delay=2) -def get_problem_type(conversation_rows): - """分析问题类型""" - dialogue_str = get_dialogue_str(conversation_rows) - - prompt = f""" + @retry_llm_call(max_retries=3, delay=2) + def get_problem_type(self, conversation_rows, user_question_str, solution_str): + """分析问题类型""" + dialogue_str = self.get_dialogue_str(conversation_rows) + + prompt = f""" 请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 分类体系: @@ -305,25 +322,25 @@ def get_problem_type(conversation_rows): 2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误) 3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题) 4. 对模糊表述要求追问的场景归入培训支持类 +5. 对话记录可能存在多个问题,你只需要判断"{user_question_str}"属于哪个类型的问题 输出格式: -{question_type_parser.get_format_instructions()} +{self.question_type_parser.get_format_instructions()} 对话记录如下: {dialogue_str} - """ + """ + + response = self.llm.invoke(user_prompt=prompt) + question_type = self.question_type_parser.parse(response.content) + + return question_type.question_type - llm = get_llm_instance() - response = llm.invoke(user_prompt=prompt) - question_type = question_type_parser.parse(response.content) - - return question_type.question_type - -@retry_llm_call(max_retries=3, delay=2) -def get_is_complaint_and_is_complaint_level(conversation_rows): - """分析是否抱怨及抱怨级别""" - dialogue_str = get_dialogue_str(conversation_rows) - - prompt = f""" + @retry_llm_call(max_retries=3, delay=2) + def get_is_complaint_and_is_complaint_level(self, conversation_rows): + """分析是否抱怨及抱怨级别""" + dialogue_str = self.get_dialogue_str(conversation_rows) + + prompt = f""" 请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: 1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语 @@ -346,7 +363,7 @@ def get_is_complaint_and_is_complaint_level(conversation_rows): }} 输出格式要求: -{is_complaint_parser.get_format_instructions()} +{self.is_complaint_parser.get_format_instructions()} 当前对话记录: {dialogue_str} @@ -356,130 +373,209 @@ def get_is_complaint_and_is_complaint_level(conversation_rows): 2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议) 3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征) 4. 标注涉及多个抱怨对象的情况 - """ - - llm = get_llm_instance() - response = llm.invoke(user_prompt=prompt) - is_complaint = is_complaint_parser.parse(response.content) - - return (is_complaint.is_dissatisfaction, - is_complaint.dissatisfaction_level, - is_complaint.dissatisfaction_reasoning, - is_complaint.is_complaint) - -# ================ 主流程处理 ================ -def process_conversation(conversation_id, conversation_rows, product_detail_dict): - """处理单个会话的函数,用于多线程并发""" - # 获取工单基本信息 - workorder_dict = get_workorder_dict(conversation_rows) - - # 分析是否抱怨、是否投诉、抱怨级别 - is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = ( - get_is_complaint_and_is_complaint_level(conversation_rows)) - - # 分析用户问题和解决方案 - user_question, solution = get_user_question_and_solution(conversation_rows) - - # 分析问题类型 - problem_type = get_problem_type(conversation_rows) - - # 分析产品线 - product_line = get_product_line(conversation_rows, product_detail_dict) - # 分析产品名称和模块名称 - if product_line != '': - product_name, module_name = get_product_name_and_module_name( - product_line, conversation_rows, product_detail_dict) - else: - product_name = '' - module_name = '' - - # 更新工单字典 - workorder_dict.update({ - "产品线": product_line, - "产品名称": product_name, - "模块名称": module_name, - "客户问题": user_question, - "问题类型": problem_type, - "是否抱怨": "是" if is_dissatisfaction else '否', - "抱怨级别": dissatisfaction_level, - "是否投诉": "是" if is_complaint else '否', - "解决方案": (solution + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution - }) - - return workorder_dict - -def analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers=4): - """分析会话数据主流程,使用多线程并发处理""" - # 读取Excel文件 - df = pd.read_excel(conversation_excel_path) - - # 检查数据框的列 - print(f"Excel文件列名: {df.columns.tolist()}") - print(f"数据总行数: {len(df)}") - - # 解析产品详情 - product_detail_dict = parse_product_detail_excel(product_detail_excel_path) - - # 按会话ID分组 - conversation_dict = group_conversations_by_id(df) - - # 使用线程池处理每个会话 - workorder_dict_list = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # 创建任务 - future_to_conversation = { - executor.submit(process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id - for conversation_id, conversation_rows in conversation_dict.items() - } + """ - # 获取结果 - for future in concurrent.futures.as_completed(future_to_conversation): - conversation_id = future_to_conversation[future] - try: - workorder_dict = future.result() - workorder_dict_list.append(workorder_dict) - print(f"完成处理会话ID: {conversation_id}") - except Exception as exc: - print(f"处理会话ID: {conversation_id} 时发生错误: {exc}") + response = self.llm.invoke(user_prompt=prompt) + is_complaint = self.is_complaint_parser.parse(response.content) + + return (is_complaint.is_dissatisfaction, + is_complaint.dissatisfaction_level, + is_complaint.dissatisfaction_reasoning, + is_complaint.is_complaint) - return workorder_dict_list + def process_conversation(self, conversation_id, conversation_rows, product_detail_dict): + """处理单个会话的函数,用于多线程并发""" + # 获取工单基本信息 + workorder_dict = self.get_workorder_dict(conversation_rows) + + # 分析是否抱怨、是否投诉、抱怨级别 + is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = ( + self.get_is_complaint_and_is_complaint_level(conversation_rows)) -def save_results_to_excel(workorder_dict_list, output_file='workorder_result.xlsx'): - """将结果保存到Excel文件""" - result_df = pd.DataFrame(workorder_dict_list) + # 分析用户问题和解决方案 + user_question_list = self.get_user_question_and_solution(conversation_rows) + for user_question in user_question_list: + user_question_str = user_question.user_question + solution_str = user_question.solution + # 分析问题类型 + problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str) + + # 分析产品线 + product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str) + # 分析产品名称和模块名称 + if product_line != '': + product_name, module_name = self.get_product_name_and_module_name( + product_line, conversation_rows, product_detail_dict, user_question_str, solution_str) + else: + product_name = '' + module_name = '' + + # 更新工单字典 + workorder_dict.update({ + "产品线": product_line, + "产品名称": product_name, + "模块名称": module_name, + "客户问题": user_question_str, + "问题类型": problem_type, + "是否抱怨": "是" if is_dissatisfaction else '否', + "抱怨级别": dissatisfaction_level if is_dissatisfaction else '', + "是否投诉": "是" if is_complaint else '否', + "解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str + }) + + return workorder_dict - # 按照指定的列顺序重新排列DataFrame的列 - columns_order = [ - '工单编号', '产品线', '产品名称', '模块名称', '问题类型', - '客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别', - '会话id', '访客昵称', '处理坐席', '创建时间' - ] + def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=4): + """分析会话数据主流程,使用多线程并发处理""" + # 读取Excel文件 + df = pd.read_excel(conversation_excel_path) + + # 检查数据框的列 + print(f"Excel文件列名: {df.columns.tolist()}") + print(f"数据总行数: {len(df)}") + + # 解析产品详情 + product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path) + + # 按会话ID分组 + conversation_dict = self.group_conversations_by_id(df) + + # 使用线程池处理每个会话 + workorder_dict_list = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + # 创建任务 + future_to_conversation = { + executor.submit(self.process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id + for conversation_id, conversation_rows in conversation_dict.items() + } + + # 获取结果 + for future in concurrent.futures.as_completed(future_to_conversation): + conversation_id = future_to_conversation[future] + try: + workorder_dict = future.result() + workorder_dict_list.append(workorder_dict) + print(f"完成处理会话ID: {conversation_id}") + except Exception as exc: + print(f"处理会话ID: {conversation_id} 时发生错误: {exc}") + + return workorder_dict_list - # 确保所有列都存在,如果不存在则添加空列 - for col in columns_order: - if col not in result_df.columns: - result_df[col] = None + def save_results_to_excel(self, workorder_dict_list, output_file=None): + """将结果保存到Excel文件""" + result_df = pd.DataFrame(workorder_dict_list) + + # 按照指定的列顺序重新排列DataFrame的列 + columns_order = [ + '工单编号', '产品线', '产品名称', '模块名称', '问题类型', + '客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别', + '会话id', '访客昵称', '处理坐席', '创建时间' + ] + + # 确保所有列都存在,如果不存在则添加空列 + for col in columns_order: + if col not in result_df.columns: + result_df[col] = None + + # 按指定顺序重排列 + result_df = result_df[columns_order] + + # 保存到Excel文件 + if output_file is None: + # 默认输出文件名 + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + output_file = os.path.join('data', 'excel', f'会话内容详情{timestamp}_转工单.xlsx') + + # 确保目录存在 + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + # 创建ExcelWriter对象,用于设置Excel样式 + with pd.ExcelWriter(output_file, engine='openpyxl') as writer: + # 写入数据 + result_df.to_excel(writer, index=False, sheet_name='工单数据') + + # 获取工作簿和工作表 + workbook = writer.book + worksheet = writer.sheets['工单数据'] + + # 设置行高(20磅 ≈ 26.67像素) + for row in worksheet.iter_rows(): + worksheet.row_dimensions[row[0].row].height = 20 + + # 设置列宽 + column_widths = { + '工单编号': 15, + '产品线': 24, + '产品名称': 40, + '模块名称': 40, + '问题类型': 9, + '客户问题': 20, + '解决方案': 30, + '是否抱怨': 9, + '是否投诉': 9, + '抱怨级别': 9, + '会话id': 9, + '访客昵称': 9, + '处理坐席': 9, + '创建时间': 9 + } + + # 应用列宽设置 + for i, column in enumerate(columns_order): + col_letter = chr(65 + i) # A, B, C, ... + if i >= 26: # 超过Z的情况 + col_letter = chr(64 + i // 26) + chr(65 + i % 26) + worksheet.column_dimensions[col_letter].width = column_widths[column] + + print(f"结果已保存到 {output_file}") + + return output_file + +# ================ 参数解析 ================ +def parse_arguments(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description='对话内容转工单工具') - # 按指定顺序重排列 - result_df = result_df[columns_order] + parser.add_argument('--conversation_file', type=str, required=False, + help='会话内容Excel文件路径') + parser.add_argument('--product_detail_file', type=str, required=False, + help='产品详情Excel文件路径') + parser.add_argument('--max_workers', type=int, default=16, + help='并发处理线程数,默认为16') - # 保存到Excel文件 - result_df.to_excel(output_file, index=False) - print(f"结果已保存到 {output_file}") + return parser.parse_args() # ================ 主程序入口 ================ def main(): """主程序入口""" - # 文件路径 - conversation_excel_path = os.path.join('data', 'excel', '会话内容详情20250527173330.xlsx') - product_detail_excel_path = os.path.join('data', 'excel', '产品详情20250527175736.xlsx') + # 解析命令行参数 + args = parse_arguments() - # 分析会话数据,设置并发线程数 - max_workers = 8 # 可以根据CPU核心数和任务特性调整 - workorder_dict_list = analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers) + # 设置默认文件路径 + conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '会话内容详情20250528110230.xlsx') + product_detail_excel_path = args.product_detail_file or os.path.join('data', 'excel', '产品详情_工单.xlsx') + output_file = args.output_file + + # 配置LLM参数 + llm_params = { + "temperature": args.temperature, + "model": args.model_name or os.getenv("LLM_MODEL_NAME"), + "api_key": os.getenv("OPENAI_API_KEY"), + "base_url": os.getenv("OPENAI_API_BASE") + } + + # 创建处理实例 + processor = DialogueToWorkorder(llm_params=llm_params) + + # 分析会话数据 + workorder_dict_list = processor.analyze_conversation_data( + conversation_excel_path, + product_detail_excel_path, + max_workers=args.max_workers + ) # 保存结果 - save_results_to_excel(workorder_dict_list) + processor.save_results_to_excel(workorder_dict_list, output_file) if __name__ == "__main__": main() diff --git a/rag2_0/demo/judge_judge_answer.py b/rag2_0/demo/judge_judge_answer.py deleted file mode 100644 index 389847b..0000000 --- a/rag2_0/demo/judge_judge_answer.py +++ /dev/null @@ -1,462 +0,0 @@ -""" -综合评判工具 - -此模块结合了答案正确性评判和检索内容相关性评分功能,可以同时: -1. 评判问题的新旧回答是否正确 -2. 比较新旧回答的差异 -3. 评估检索内容与问题的相关性 - -用法示例: - judge = CombinedJudge() - judge.process() -""" - -import pandas as pd -from urllib.parse import unquote -from rag2_0.tool.WikijsTool import WikijsTool -from rag2_0.tool.html_to_md import convert_html_to_md -from rag2_0.tool.ModelTool import OpenAiLLM -from dotenv import load_dotenv -import os -from tqdm import tqdm -from rag2_0.dify.dify_tool import DifyTool -import json -from pydantic import BaseModel, Field -from langchain.output_parsers import PydanticOutputParser -import concurrent.futures -from threading import Lock -load_dotenv() - -class ContentSource(BaseModel): - score:int = Field(description="相关性分数") - reason:str = Field(description="评分理由") - -class CombinedJudge: - """ - 综合评判工具类 - - 结合了答案正确性评判和检索内容相关性评分功能 - """ - - def __init__(self, wiki_excel_path="/data/QueryRewrite/data/excel/部分提问_软件名称明确.xlsx", - answer_excel_path="/data/QueryRewrite/data/excel/dify问答_对比结果.xlsx", - output_path="/data/QueryRewrite/data/excel/dify问答__综合评判结果.xlsx", - dify_appid="ccf92b97-2789-4a3f-90e0-135a869a37c5", - max_workers=10): - """ - 初始化综合评判工具 - - 参数: - wiki_excel_path (str): Wiki Excel文件路径 - answer_excel_path (str): 答案对比Excel文件路径 - output_path (str): 输出Excel文件路径 - dify_appid (str): Dify应用ID - max_workers (int): 最大工作线程数 - """ - self.wiki_excel_path = wiki_excel_path - self.answer_excel_path = answer_excel_path - self.output_path = output_path - self.dify_appid = dify_appid - self.max_workers = max_workers - self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) - self.results_lock = Lock() - - # 读取Excel文件 - if os.path.exists(wiki_excel_path): - self.wiki_excel = pd.read_excel(self.wiki_excel_path) - else: - self.wiki_excel = None - self.answer_excel = pd.read_excel(self.answer_excel_path) - - # 初始化LLM - self.api_key = os.getenv("OPENAI_API_KEY") - self.base_url = os.getenv("OPENAI_API_BASE") - self.model = os.getenv("LLM_MODEL_NAME") - - if not all([self.api_key, self.base_url, self.model]): - raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量") - - self.llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model) - - def find_wiki_link(self, query) -> str | None: - """ - 根据查询(对应wiki_excel中的新提问列)找出对应的词条链接 - - 参数: - query (str): 查询内容,对应wiki_excel中的新提问列 - - 返回: - str: 对应的词条链接,如果没有找到则返回None - """ - # 确保query不为空 - if not query or pd.isna(query): - return None - if self.wiki_excel is None: - return None - - # 在"新提问"列中查找匹配的行 - matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] - - # 如果找到了匹配的行,返回对应的词条链接 - if not matched_rows.empty: - return matched_rows.iloc[0]['对应词条链接'] - - # 如果没有完全匹配,尝试部分匹配 - # 去除软件名称部分(如果有) - query_parts = query.split(',', 1) - if len(query_parts) > 1: - clean_query = query_parts[1].strip() - - # 在"提问"列中查找包含清理后查询的行 - for idx, row in self.wiki_excel.iterrows(): - if pd.notna(row['提问']) and clean_query in row['提问']: - return row['对应词条链接'] - - return None - - def get_wiki_content(self, link) -> str: - """ - 获取词条链接的内容 - - 参数: - link (str): 词条链接 - - 返回: - str: 链接内容,如果获取失败则返回错误信息 - """ - try: - if not link or pd.isna(link): - return "链接为空或无效" - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - doc_path = "/".join(path_parts[1:]) - wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False) - html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content') - if not html_content: - return "获取内容失败" - - options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True} - new_content = (html_content.replace("h6>", "h7>") - .replace("h5>", "h6>") - .replace("h4>", "h5>") - .replace("h3>", "h4>") - .replace("h2>", "h3>") - .replace("h1>", "h2>")) - # 将HTML内容转换为Markdown - markdown_content = convert_html_to_md(new_content, "", **options) - markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}" - return markdown_content - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def get_wiki_title(self, link) -> str | None: - """ - 获取词条标题 - - 参数: - link (str): 词条链接 - - 返回: - str: 词条标题,如果获取失败则返回None - """ - try: - if not link or pd.isna(link): - return None - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - return path_parts[-1] - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str: - """ - 创建用于评判答案正确性的prompt - - 参数: - standard_answer (str): 标准答案 - answer_to_check (str): 需要检查的答案 - - 返回: - str: 格式化的prompt - """ - return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。 - -标准答案: -{standard_answer} - -待评估的回答: -{answer_to_check} - -请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。 -如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 -如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。 - -请严格按以下格式输出:【正确】或【错误】:""" - - def judge_answer(self, standard_answer: str, answer: str) -> bool | None: - """ - 调用LLM判断回答是否正确 - - 参数: - standard_answer (str): 标准答案(来自Wiki) - answer (str): 需评判的回答 - - 返回: - bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 - """ - prompt = self.create_correctness_prompt(standard_answer, answer) - try: - response = self.llm.invoke(user_prompt=prompt, need_retry=True) - return "正确" in response.content - except Exception as e: - return None - - def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None: - """ - 综合判断新旧回答的正确性 - - 参数: - standard_answer (str): 标准答案(来自Wiki) - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - 返回: - str | None: 包含新旧回答判断结果的字符串,None表示判断失败 - """ - old_result = self.judge_answer(standard_answer, old_answer) - new_result = self.judge_answer(standard_answer, new_answer) - if old_result is None or new_result is None: - return None - if new_result and old_result: - return "新旧答案均正确" - elif new_result and not old_result: - return "新答案正确" - elif not new_result and old_result: - return "旧答案正确" - else: - return "新旧答案均错误" - - def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None: - """ - 判断新旧回答是否存在较大差异 - - 参数: - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - 返回: - str | None: 差异判断结果,None表示判断失败 - """ - - prompt = f"""请判断以下两个回答是否存在较大差异: - - 旧回答: {old_answer} - - 新回答: {new_answer} - - 主要是主要步骤、主要信息、或者主要主体的差异 - 请仅回答"存在较大差异"或"差异较小"。""" - - try: - response = self.llm.invoke(user_prompt=prompt, need_retry=True) - return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同" - except Exception as e: - return None - - def calculate_score(self, query:str, content:str) -> int: - """ - 使用LLM判断query与content之间的相关性分数 - - 参数: - query (str): 用户问题 - content (str): 检索内容 - - 返回: - int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 - """ - try: - prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 - -【评分标准】 -10分:完全契合,主题/意图完全一致且涵盖所有关键信息 -8-9分:高度相关,核心要素匹配但存在少量信息缺失 -6-7分:部分相关,涉及相同主题但存在重要信息缺失 -4-5分:弱相关,仅次要信息点匹配 -1-3分:完全不相关或信息冲突 - -【评估维度】 -1. 主题一致性:核心主题/意图的匹配程度 -2. 内容覆盖度:是否涵盖query的关键要素 -3. 信息准确性:是否存在矛盾/错误信息 -4. 细节丰富度:是否提供query要求的详细信息 - -【输出格式】 -{{ - "score": 评分, - "reason": "简明扼要的评分理由(中文)" -}} - -【示例】 -query: "新冠疫苗的常见副作用" -内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" -输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} - -现在评估: -query: "{query}" -content: "{content}" -""" - - response = self.llm.invoke(user_prompt=prompt, need_retry=True) - - # 解析JSON响应 - try: - parsed_output = self.content_source_parser.parse(response.content) - return parsed_output.score - except Exception as e: - return -1 - except Exception as e: - return -1 - - def get_retrieve_info(self, query:str, outputs:dict) -> tuple: - """ - 获取检索信息并计算分数 - - 参数: - query (str): 用户问题 - outputs (dict): 检索输出结果 - - 返回: - tuple: (检索内容列表, 最高分, 最低分, 平均分) - """ - max_score = 0 - min_score = 10 - total_score = 0 - valid_scores = 0 - retrieve_content = [] - - for result in outputs["result"]: - content = result["content"].strip() - score = self.calculate_score(query=query, content=content) - if score != -1: - max_score = max(max_score, score) - min_score = min(min_score, score) - total_score += score - valid_scores += 1 - content_title = content.split("\n")[0] - if content_title: - retrieve_content.append(content_title + f"--得分({score}分)") - - avg_score = total_score / valid_scores if valid_scores > 0 else 0 - return retrieve_content, max_score, min_score, avg_score - - def process_single_question(self, row): - """ - 处理单个问题的评判 - - 参数: - row: DataFrame中的一行数据 - - 返回: - dict: 包含处理结果的字典 - """ - query = row["问题"] - old_answer = row["旧流程答案"] - new_answer = row["新流程答案"] - - # 获取词条链接和标准答案 - wiki_url = self.find_wiki_link(query) - standard_answer = "" - answer_title = "" - - try: - if wiki_url and not pd.isna(wiki_url): - standard_answer = self.get_wiki_content(wiki_url) - answer_title = self.get_wiki_title(wiki_url) - except Exception as e: - print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}") - - # 判断答案正确性 - if standard_answer: - judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer) - else: - judge_result = self.judge_answer_diff(old_answer, new_answer) - - if judge_result is None: - judge_result = "" - - # 获取检索内容评分 - retrieve_content = [] - max_score = 0 - min_score = 0 - avg_score = 0 - rewrite_query = "" - - try: - message_info = DifyTool.get_message_debug_info(appid=self.dify_appid, query=query) - for workflow_node in message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "知识检索结果后处理": - outputs = json.loads(workflow_node["outputs"]) - retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) - elif workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - except Exception as e: - print(f"处理问题 '{query}' 获取检索内容时发生错误: {str(e)}") - - # 返回结果 - return { - "问题": query, - "问题改写": rewrite_query, - "旧流程答案": old_answer, - "新流程答案": new_answer, - "回答判断": judge_result, - "答案词条": answer_title if answer_title else "", - "检索得到词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库", - } - - def process(self): - """ - 多线程处理所有问题并进行综合评判 - - 读取Excel文件中的问题和答案,使用多线程进行评判,并将结果保存到输出Excel文件 - """ - # 创建结果列表 - results = [] - - # 创建进度条 - with tqdm(total=len(self.answer_excel), desc="处理问题中") as pbar: - # 使用线程池执行任务 - with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: - # 提交所有任务 - future_to_row = {executor.submit(self.process_single_question, row): idx - for idx, row in self.answer_excel.iterrows()} - - # 处理完成的任务 - for future in concurrent.futures.as_completed(future_to_row): - idx = future_to_row[future] - try: - result = future.result() - with self.results_lock: - results.append(result) - except Exception as e: - print(f"处理第 {idx} 行时发生错误: {str(e)}") - finally: - pbar.update(1) - - # 将结果转换为DataFrame并保存 - results_df = pd.DataFrame(results) - results_df.to_excel(self.output_path, index=False) - print(f"处理完成,共处理 {len(results)} 条记录,结果已保存至 {self.output_path}") - -# 测试函数 -if __name__ == "__main__": - # 创建综合评判工具实例 - judge = CombinedJudge(max_workers=30) - # 执行处理 - judge.process() \ No newline at end of file diff --git a/rag2_0/dify/dify_tool.py b/rag2_0/dify/dify_tool.py index f129df4..1210be7 100644 --- a/rag2_0/dify/dify_tool.py +++ b/rag2_0/dify/dify_tool.py @@ -160,7 +160,7 @@ class DifyTool: 应用信息、消息详情以及工作流节点执行情况。 """ @staticmethod - def get_message_debug_info_id(message_id:str)->dict | None: + def get_message_debug_info_by_id(message_id:str)->dict | None: """ 根据消息 ID 从 'messages' 表中获取消息信息。 """ @@ -178,7 +178,7 @@ class DifyTool: @staticmethod - def get_message_debug_info(appid:str, query:str)->dict: + def get_message_debug_info_by_query(appid:str, query:str)->dict: """ 获取指定应用和查询相关的调试信息。 @@ -212,4 +212,4 @@ class DifyTool: if __name__ == "__main__": - print(DifyTool.get_message_debug_info("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?")) + print(DifyTool.get_message_debug_info_by_query("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?")) diff --git a/rag2_0/dify/test_dify_chatapi.py b/rag2_0/dify/test_dify_chatapi.py index fa5cb64..93ca1f9 100644 --- a/rag2_0/dify/test_dify_chatapi.py +++ b/rag2_0/dify/test_dify_chatapi.py @@ -9,12 +9,27 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from rag2_0.dify.dify_tool import DifyTool import json +from urllib.parse import unquote +from rag2_0.tool.WikijsTool import WikijsTool +from rag2_0.tool.html_to_md import convert_html_to_md +from rag2_0.tool.ModelTool import OpenAiLLM +from dotenv import load_dotenv +from pydantic import BaseModel, Field +from langchain.output_parsers import PydanticOutputParser +from threading import Lock + +load_dotenv() + +class ContentSource(BaseModel): + score:int = Field(description="相关性分数") + reason:str = Field(description="评分理由") class DifyComparisonTester: """ - Dify新旧流程对比测试类,用于比较两个不同流程的问答效果 + Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判 """ - def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str): + def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str, + wiki_excel_path:str=None, output_path:str=None, max_workers:int=10): """ 初始化对比测试器 @@ -23,14 +38,33 @@ class DifyComparisonTester: baseurl: Dify API的基础URL old_workflow_api_key: 旧流程的API密钥 new_workflow_api_key: 新流程的API密钥 + wiki_excel_path: Wiki Excel文件路径,用于获取标准答案 + output_path: 输出Excel文件路径 + max_workers: 最大工作线程数 """ self.excel_path = excel_path - self.baseurl = baseurl - self.old_workflow_api_key = old_workflow_api_key - self.new_workflow_api_key = new_workflow_api_key self.old_chat = ChatClient(api_key=old_workflow_api_key, base_url=baseurl) self.new_chat = ChatClient(api_key=new_workflow_api_key, base_url=baseurl) + # 评判相关参数 + self.output_path = output_path or os.path.join(os.path.dirname(self.excel_path), "dify问答_综合评判结果.xlsx") + self.max_workers = max_workers + self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) + self.results_lock = Lock() + + # 读取Wiki Excel文件 + if wiki_excel_path and os.path.exists(wiki_excel_path): + self.wiki_excel = pd.read_excel(wiki_excel_path) + else: + self.wiki_excel = None + + + def get_llm(self): + api_key = os.getenv("OPENAI_API_KEY") + base_url = os.getenv("OPENAI_API_BASE") + model = os.getenv("LLM_MODEL_NAME") + return OpenAiLLM(api_key=api_key, base_url=base_url, model=model) + def process_question(self, q:str): """ 处理单个问题,并行获取新旧流程的回答 @@ -62,40 +96,393 @@ class DifyComparisonTester: new_result = future_new.result() old_message_id = old_result["message_id"] new_message_id = new_result["message_id"] - old_message_info = DifyTool.get_message_debug_info_id(message_id=old_message_id) - new_message_info = DifyTool.get_message_debug_info_id(message_id=new_message_id) - for workflow_node in new_message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] + old_answer = old_result["answer"] new_answer = new_result["answer"] except Exception as e: - return None - return {"问题": q, "问题改写": rewrite_query, "旧流程答案": old_answer, "新流程答案": new_answer} + return None, None, None + return {"问题": q, "旧流程答案": old_answer, "新流程答案": new_answer}, old_message_id, new_message_id - def run_comparison(self): + def find_wiki_link(self, query) -> str | None: + """ + 根据查询找出对应的词条链接 + + Args: + query (str): 查询内容 + + Returns: + str: 对应的词条链接,如果没有找到则返回None + """ + # 确保query不为空 + if not query or pd.isna(query): + return None + if self.wiki_excel is None: + return None + + # 在"新提问"列中查找匹配的行 + matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] + + # 如果找到了匹配的行,返回对应的词条链接 + if not matched_rows.empty: + return matched_rows.iloc[0]['对应词条链接'] + + # 如果没有完全匹配,尝试部分匹配 + # 去除软件名称部分(如果有) + query_parts = query.split(',', 1) + if len(query_parts) > 1: + clean_query = query_parts[1].strip() + + # 在"提问"列中查找包含清理后查询的行 + for idx, row in self.wiki_excel.iterrows(): + if pd.notna(row['提问']) and clean_query in row['提问']: + return row['对应词条链接'] + + return None + + def get_wiki_content(self, link) -> str: + """ + 获取词条链接的内容 + + Args: + link (str): 词条链接 + + Returns: + str: 链接内容,如果获取失败则返回错误信息 + """ + try: + if not link or pd.isna(link): + return "链接为空或无效" + # 移除域名部分,只保留路径 + path = link.split('/', 3)[-1] + decoded_path = unquote(path) + path_parts = decoded_path.split('/') + doc_path = "/".join(path_parts[1:]) + wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False) + html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content') + if not html_content: + return "获取内容失败" + + options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True} + new_content = (html_content.replace("h6>", "h7>") + .replace("h5>", "h6>") + .replace("h4>", "h5>") + .replace("h3>", "h4>") + .replace("h2>", "h3>") + .replace("h1>", "h2>")) + # 将HTML内容转换为Markdown + markdown_content = convert_html_to_md(new_content, "", **options) + markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}" + return markdown_content + + except Exception as e: + raise RuntimeError(f"获取词条内容失败: {str(e)}") from e + + def get_wiki_title(self, link) -> str | None: + """ + 获取词条标题 + + Args: + link (str): 词条链接 + + Returns: + str: 词条标题,如果获取失败则返回None + """ + try: + if not link or pd.isna(link): + return None + # 移除域名部分,只保留路径 + path = link.split('/', 3)[-1] + decoded_path = unquote(path) + path_parts = decoded_path.split('/') + return path_parts[-1] + + except Exception as e: + raise RuntimeError(f"获取词条内容失败: {str(e)}") from e + + def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str: + """ + 创建用于评判答案正确性的prompt + + Args: + standard_answer (str): 标准答案 + answer_to_check (str): 需要检查的答案 + + Returns: + str: 格式化的prompt + """ + return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。 + +标准答案: +{standard_answer} + +待评估的回答: +{answer_to_check} + +请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。 +如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 +如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。 + +请严格按以下格式输出:【正确】或【错误】:""" + + def judge_answer(self, standard_answer: str, answer: str) -> bool | None: + """ + 调用LLM判断回答是否正确 + + Args: + standard_answer (str): 标准答案(来自Wiki) + answer (str): 需评判的回答 + + Returns: + bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 + """ + + prompt = self.create_correctness_prompt(standard_answer, answer) + llm = self.get_llm() + try: + response = llm.invoke(user_prompt=prompt, need_retry=True) + return "正确" in response.content + except Exception as e: + return None + + def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None: + """ + 综合判断新旧回答的正确性 + + Args: + standard_answer (str): 标准答案(来自Wiki) + old_answer (str): 旧流程的回答 + new_answer (str): 新流程的回答 + + Returns: + str | None: 包含新旧回答判断结果的字符串,None表示判断失败 + """ + old_result = self.judge_answer(standard_answer, old_answer) + new_result = self.judge_answer(standard_answer, new_answer) + if old_result is None or new_result is None: + return None + if new_result and old_result: + return "新旧答案均正确" + elif new_result and not old_result: + return "新答案正确" + elif not new_result and old_result: + return "旧答案正确" + else: + return "新旧答案均错误" + + def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None: + """ + 判断新旧回答是否存在较大差异 + + Args: + old_answer (str): 旧流程的回答 + new_answer (str): 新流程的回答 + + Returns: + str | None: 差异判断结果,None表示判断失败 + """ + + prompt = f"""请判断以下两个回答是否存在较大差异: + + 旧回答: {old_answer} + + 新回答: {new_answer} + + 主要是主要步骤、主要信息、或者主要主体的差异 + 请仅回答"存在较大差异"或"差异较小"。""" + llm = self.get_llm() + try: + response = llm.invoke(user_prompt=prompt, need_retry=True) + return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同" + except Exception as e: + return None + + def calculate_score(self, query:str, content:str) -> int: + """ + 使用LLM判断query与content之间的相关性分数 + + Args: + query (str): 用户问题 + content (str): 检索内容 + + Returns: + int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 + """ + + try: + prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 + +【评分标准】 +10分:完全契合,主题/意图完全一致且涵盖所有关键信息 +8-9分:高度相关,核心要素匹配但存在少量信息缺失 +6-7分:部分相关,涉及相同主题但存在重要信息缺失 +4-5分:弱相关,仅次要信息点匹配 +1-3分:完全不相关或信息冲突 + +【评估维度】 +1. 主题一致性:核心主题/意图的匹配程度 +2. 内容覆盖度:是否涵盖query的关键要素 +3. 信息准确性:是否存在矛盾/错误信息 +4. 细节丰富度:是否提供query要求的详细信息 + +【输出格式】 +{{ + "score": 评分, + "reason": "简明扼要的评分理由(中文)" +}} + +【示例】 +query: "新冠疫苗的常见副作用" +内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" +输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} + +现在评估: +query: "{query}" +content: "{content}" +""" + llm = self.get_llm() + response = llm.invoke(user_prompt=prompt, need_retry=True) + + # 解析JSON响应 + try: + parsed_output = self.content_source_parser.parse(response.content) + return parsed_output.score + except Exception as e: + return -1 + except Exception as e: + return -1 + + def get_retrieve_info(self, query:str, outputs:dict) -> tuple: + """ + 获取检索信息并计算分数 + + Args: + query (str): 用户问题 + outputs (dict): 检索输出结果 + + Returns: + tuple: (检索内容列表, 最高分, 最低分, 平均分) + """ + max_score = 0 + min_score = 10 + total_score = 0 + valid_scores = 0 + retrieve_content = [] + + for result in outputs["result"]: + content = result["content"].strip() + score = self.calculate_score(query=query, content=content) + if score != -1: + max_score = max(max_score, score) + min_score = min(min_score, score) + total_score += score + valid_scores += 1 + content_title = content.split("\n")[0] + if content_title: + retrieve_content.append(content_title + f"--得分({score}分)") + + avg_score = total_score / valid_scores if valid_scores > 0 else 0 + return retrieve_content, max_score, min_score, avg_score + + def process_question_with_judge(self, q:str): + """ + 处理单个问题,获取新旧流程的回答并进行评判 + + Args: + q: 问题内容 + + Returns: + dict: 包含问题、回答和评判结果的字典 + """ + # 获取基本的问题和回答 + basic_result, old_message_id, new_message_id = self.process_question(q) + if basic_result is None: + return None + + query = basic_result["问题"] + old_answer = basic_result["旧流程答案"] + new_answer = basic_result["新流程答案"] + + # 获取词条链接和标准答案 + wiki_url = self.find_wiki_link(query) + standard_answer = "" + answer_title = "" + + try: + if wiki_url and not pd.isna(wiki_url): + standard_answer = self.get_wiki_content(wiki_url) + answer_title = self.get_wiki_title(wiki_url) + except Exception as e: + print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}") + + # 判断答案正确性 + if standard_answer: + judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer) + else: + judge_result = self.judge_answer_diff(old_answer, new_answer) + + if judge_result is None: + judge_result = "" + + # 获取检索内容评分 + retrieve_content = [] + max_score = 0 + min_score = 0 + avg_score = 0 + rewrite_query = "" + try: + new_message_info = DifyTool.get_message_debug_info_by_id(message_id=new_message_id) + for workflow_node in new_message_info["workflow_node_executions_info"]: + if workflow_node["title"] == "知识检索结果后处理": + outputs = json.loads(workflow_node["outputs"]) + retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + elif workflow_node["title"] == "问题优化结果解析": + outputs = json.loads(workflow_node["outputs"]) + rewrite_query = outputs["optimize_query"] + except Exception as e: + print(f"处理问题 '{query}' 获取检索内容时发生错误: {str(e)}") + + # 返回结果 + return { + "问题": query, + "问题改写": rewrite_query, + "旧流程答案": old_answer, + "新流程答案": new_answer, + "回答判断": judge_result, + "答案词条": answer_title if answer_title else "", + "检索得到词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库", + "检索最高分": max_score, + "检索最低分": min_score, + "检索平均分": avg_score + } + + def run_comparison(self, with_judge=False): """ 运行对比测试,处理所有问题并生成结果Excel + Args: + with_judge: 是否进行答案评判 + Returns: str: 输出Excel文件的路径 """ # 读取Excel文件中的问题 df = pd.read_excel(self.excel_path) - questions = df.iloc[:,0].tolist() + questions = df['补全后的提问'].tolist() results = [] + # 选择处理函数 + process_func = self.process_question_with_judge if with_judge else self.process_question + # 按顺序处理问题 with tqdm(total=len(questions), desc="处理问题进度") as pbar: for q in questions: - result = self.process_question(q) + result = process_func(q) if result is not None: results.append(result) pbar.update(1) # 生成输出Excel文件 - out_path = os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx") + out_path = self.output_path if with_judge else os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx") df_results = pd.DataFrame(results) # 使用ExcelWriter设置格式 @@ -107,16 +494,16 @@ class DifyComparisonTester: worksheet = writer.sheets['Sheet1'] # 设置列宽 - worksheet.set_column('A:A', 50) # 问题列宽 50个Excel单位 - worksheet.set_column('B:B', 70) # 旧流程答案列宽 70个Excel单位 - worksheet.set_column('C:C', 70) # 新流程答案列宽 70个Excel单位 + for col_idx, col_name in enumerate(df_results.columns): + max_len = max(df_results[col_name].astype(str).map(len).max(), len(col_name)) + worksheet.set_column(col_idx, col_idx, min(max_len + 2, 70)) return out_path if __name__ == "__main__": # 定义Excel路径 - excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx") + excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx") if not os.path.exists(excel_path): print(f"错误:Excel文件不存在: {excel_path}") @@ -127,10 +514,21 @@ if __name__ == "__main__": old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3" new_workflow_api_key = "app-Lf1pQ1NVwdMfCRVNTBCOTPHT" + # Wiki Excel路径和Dify应用ID(用于评判) + wiki_excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx") + # 创建测试器并运行 - tester = DifyComparisonTester(excel_path, baseurl, old_workflow_api_key, new_workflow_api_key) - output_file = tester.run_comparison() - print(f"对比结果已保存至: {output_file}") + tester = DifyComparisonTester( + excel_path=excel_path, + baseurl=baseurl, + old_workflow_api_key=old_workflow_api_key, + new_workflow_api_key=new_workflow_api_key, + wiki_excel_path=wiki_excel_path, + ) + + # 运行对比测试(带评判) + output_file = tester.run_comparison(with_judge=True) + print(f"对比评判结果已保存至: {output_file}") # 单个问题测试示例 # c = DifyChat(baseurl="http://172.20.0.145/v1", api_key="app-LjJaeLoAfqa6aoGzqU9UvxSf") diff --git a/rag2_0/tool/APIKeyManager.py b/rag2_0/tool/APIKeyManager.py index 75e0056..696cbd4 100644 --- a/rag2_0/tool/APIKeyManager.py +++ b/rag2_0/tool/APIKeyManager.py @@ -3,12 +3,9 @@ import random import time from typing import List, Optional, Dict from threading import Lock +import requests API_KEY_LIST=[ -"sk-xxaiabmfhzwwpijuledllkmkzhzwsqeicjxmjwnvriqpwmpk", -"sk-lldcprpqjhgdimiwewgbthngfbrazhkiuioubmaatrcpjjum", -"sk-bppugibbtvujomvoysnbcdzpcwndxtwrkfvmgbkbzcmobdon", -"sk-hnqitgdlfrrnpimcfxigqibstqquintnzpiidsshpajjyxqd", "sk-hrojkkkrrkmsajtnizokbcgexsfggdiqavbtvbayuwqbnmom", "sk-kkdklmnyompoiotzkfqahpayzlkgogfudjkyaebehtsowvid", "sk-sfxzvllifafbyfduupcdtcrjwhdyiyojnksyopnfslurnhsp", @@ -39,6 +36,26 @@ API_KEY_LIST=[ "sk-ocglenyvxkkvzupzumoypnyndjpjqhivyqpedusunboglspz", "sk-dtbawdwajkhdctrukundbkqwswzfzihqbebfuvqnfnounbuc", "sk-zqiyiqtbwqgyeenkvppymfbkspriolwbnxnjakugzxyvcuql", +"sk-wtnjpejveiobtvzsmnuaefqkocsafbfyrtqkkyqardndtxcs", +"sk-gqdvtrwvzxewnagwsfakrvajtzwgcknatpflkesyqhzjrlal", +"sk-plivglrkxahodgtgjlaqdjusdoerxspjbcbizaybicarfyuk", +"sk-gxwaotlyeunxdagmybluhxkberlvohhzteryqgbhbcpztpds", +"sk-vigugvoqrdqcgkxaiympdmbqtdhpjidylphdcodovfwjpjlf", +"sk-dgmbountewzxgwcwczyslehpcyejtkhpppibswzcvrjbywey", +"sk-ksqdvuisvvraeogskmgrwzpolzrfyelqhrajscrylncemyym", +"sk-vegffsoueyrbtlfbdzfppgtgwouuetoragimogulcncmutnx", +"sk-moprznmsibswkwnnjwmgssumqhoxdmsxelekkmptseyeussz", +"sk-kskakjerttqezqqqmdmcbnqssjztrogwqncadgekhmwzjukr", +"sk-ozwjvlbatnmfjgrxesjkzuzdgpvehmmgswcqctggjxmjgxck", +"sk-wjmmlmobcayarcvhdeiybhbwdoaacnlltuxyixcwplhedzht", +"sk-twuvwqxstatdddkobxthzhoddrritsikvnvwuvtqjxwaxhsf", +"sk-bludnuajavlgdfmelatzsdqhvaxthgagttelsbpviqwiehua", +"sk-nxxdpmesfzcfzdlnhpsoslajtwbsnzixfjdkuzfjywfktapx", +"sk-arayylrvatezqgmdbxvxqxydqnmbydbkpkskzxzszmrkkcrq", +"sk-vxvccjyewpgnnnxpsqkijsawhhpjctcdlfljwfwtguwnmetf", +"sk-zjwbwyocnuqxfshlpgfzdwlgjjrpewzgvoqwzyhufisidnos", +"sk-kjxpzjbteiurpzhwjbbjqpjjfoewsahpjtmyqwectdubxhgf", +"sk-sqdcnhapyzudneipdsuqlfawusrndxqkuwoaoumtonwdnppo", ] class APIKeyManager: @@ -142,6 +159,22 @@ class APIKeyManager: instance = cls.get_instance() return len(instance.api_keys) + @classmethod + def get_key_usage_stats(cls, key: str) -> Dict: + """ + 静态方法:获取API密钥使用统计信息 + + Returns: + API密钥使用统计信息 + """ + url = "https://api.siliconflow.cn/v1/user/info" + + headers = {"Authorization": f"Bearer {key}"} + + response = requests.request("GET", url, headers=headers) + + return response.json() + def __init__(self, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"): """ 初始化API密钥管理器 @@ -251,10 +284,6 @@ class APIKeyManager: # 使用示例 if __name__ == "__main__": - - # 获取有效的API密钥列表 - valid_keys = APIKeyManager.get_valid_api_keys() - print(f"有效的API密钥列表:\n" + "\n".join(valid_keys)) # 查看总密钥数 print(f"总共有 {APIKeyManager.count()} 个API密钥") @@ -263,4 +292,5 @@ if __name__ == "__main__": instance = APIKeyManager.get_instance() stats = instance.get_usage_stats() for key, data in stats.items(): - print(f"密钥 {key[:5]}... 使用次数: {data['count']}") \ No newline at end of file + usage_stats = APIKeyManager.get_key_usage_stats(key) + print(f"api key:{key}---赠送余额:{usage_stats['data']['balance']}元") \ No newline at end of file