From 62705136884fbe833a25e8d84e219e1712a35377 Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Wed, 28 May 2025 10:00:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=90=88=E5=B9=B6=E8=84=9A?= =?UTF-8?q?=E6=9C=AC=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=BC=9A=E8=AF=9D=E8=BD=AC?= =?UTF-8?q?=E5=B7=A5=E5=8D=95=E8=84=9A=E6=9C=AC=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 +- rag2_0/demo/Test.py | 250 ---------- rag2_0/demo/dialogue_to_workorder.py | 486 ++++++++++++++++++++ rag2_0/demo/judge_answer_right.py | 293 ------------ rag2_0/demo/judge_judge_answer.py | 462 +++++++++++++++++++ rag2_0/demo/judge_query_full.py | 4 +- rag2_0/demo/judge_retrieve_content_score.py | 239 ---------- 7 files changed, 955 insertions(+), 785 deletions(-) delete mode 100644 rag2_0/demo/Test.py create mode 100644 rag2_0/demo/dialogue_to_workorder.py delete mode 100644 rag2_0/demo/judge_answer_right.py create mode 100644 rag2_0/demo/judge_judge_answer.py delete mode 100644 rag2_0/demo/judge_retrieve_content_score.py diff --git a/.gitignore b/.gitignore index de27d28..aa9b5e5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ -# 忽略所有Python缓存文件夹 +# 忽略所有Python缓存文件和文件夹 __pycache__/ *.py[cod] *$py.class + +# 忽略数据文件 +data/excel/* +rag2_0/demo/Test.py diff --git a/rag2_0/demo/Test.py b/rag2_0/demo/Test.py deleted file mode 100644 index 6cef6be..0000000 --- a/rag2_0/demo/Test.py +++ /dev/null @@ -1,250 +0,0 @@ -""" -提问内容补全工具 - -此模块用于解析Excel文件中的提问和回答,调用LLM补全提问内容, -并将原提问和补全后的提问保存到新的Excel文件中。 - -用法示例: - completer = QuestionCompleter(input_path="历史提问数据(dislike).xlsx", output_path="补全后的提问数据.xlsx") - completer.process() -""" - -import pandas as pd -from tqdm import tqdm -import os -from dotenv import load_dotenv -from rag2_0.tool.ModelTool import OpenAiLLM -from pydantic import BaseModel, Field -from langchain.output_parsers import PydanticOutputParser -import concurrent.futures -from threading import Lock - -class RewriteQuery(BaseModel): - rewrite_query:str = Field(description="补全后的提问") - software_name:str = Field(description="软件名称") - -# 加载环境变量 -load_dotenv() - -class QuestionCompleter: - """ - 提问内容补全工具类 - - 用于解析Excel文件中的提问和回答,调用LLM补全提问内容, - 并将原提问和补全后的提问保存到新的Excel文件中。 - """ - - def __init__(self, input_path="/data/Rag2_0/data/excel/历史提问数据(dislike).xlsx", - output_path="/data/Rag2_0/data/excel/历史提问数据(dislike)_补全后的提问数据.xlsx", - question_column="提问", answer_column="回答", max_workers=10): - """ - 初始化提问内容补全工具 - - 参数: - input_path (str): 输入Excel文件路径 - output_path (str): 输出Excel文件路径 - question_column (str): 提问列的名称 - answer_column (str): 回答列的名称 - max_workers (int): 最大线程数 - """ - self.input_path = input_path - self.output_path = output_path - self.question_column = question_column - self.answer_column = answer_column - self.max_workers = max_workers - self.rewrite_query_parser = PydanticOutputParser(pydantic_object=RewriteQuery) - self.lock = Lock() # 添加线程锁 - - # 初始化LLM - self.api_key = os.getenv("OPENAI_API_KEY") - self.base_url = os.getenv("OPENAI_API_BASE") - self.model = os.getenv("LLM_MODEL_NAME") - - if not all([self.api_key, self.base_url, self.model]): - raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量") - - self.llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model) - - # 读取Excel文件 - try: - self.df = pd.read_excel(self.input_path) - print(f"成功读取Excel文件: {self.input_path}") - print(f"共有 {len(self.df)} 条记录") - except Exception as e: - raise RuntimeError(f"读取Excel文件失败: {str(e)}") - - # 检查列是否存在 - if self.question_column not in self.df.columns: - raise ValueError(f"Excel文件中不存在列: {self.question_column}") - if self.answer_column not in self.df.columns: - raise ValueError(f"Excel文件中不存在列: {self.answer_column}") - - def create_completion_prompt(self, question, answer): - """ - 创建用于补全提问的prompt - - 参数: - question (str): 原始提问 - answer (str): 对应的回答 - - 返回: - str: 格式化的prompt - """ - prompt = f""" -1、判断提问中是否缺少软件名称,如果不缺少,则直接返回原始提问 -2、如果缺少软件名称,则根据回答中的软件名称,补全提问 -3、补全后的提问需要保持问题原有意图不变 - -4、软件名称包括: - 配网D3软件(配网工程计价通D3) - 西藏Z1软件(西藏电力工程计价通Z1) - 主网计价通软件(电力建设计价通) - 技改检修工程计价通T1软件(技改检修工程计价通T1) - 技改检修清单计价通T1软件(技改检修清单计价通T1) - 储能C1软件(新型储能电站建设计价通C1) -如果没有包含上述软件名称,则直接返回原始提问,software_name为空字符串 -{{ - "rewrite_query": "xxx", - "software_name": "" -}} - -原始提问:{question} -系统回答:{answer} - -输出格式: -{self.rewrite_query_parser.get_format_instructions()} - -示例: -例如,如果输入是: -提问:这个软件怎么用? -回答:Photoshop的使用方法是... - -那么输出会是: -{{ - "rewrite_query": "Photoshop这个软件怎么用?", - "software_name": "Photoshop" -}} - -或者如果提问已经包含软件名称: -提问:Photoshop怎么用? -回答:Photoshop的使用方法是... - -那么输出会是: -{{ - "rewrite_query": "Photoshop怎么用?", - "software_name": "Photoshop" -}} - -""" - return prompt - - def complete_question(self, question, answer): - """ - 调用LLM补全提问内容 - - 参数: - question (str): 原始提问 - answer (str): 对应的回答 - - 返回: - str: 补全后的提问,如果补全失败则返回原始提问 - """ - # 如果提问或回答为空,直接返回原始提问 - if pd.isna(question) or question.strip() == "" or pd.isna(answer) or answer.strip() == "": - return question, "" - - try: - prompt = self.create_completion_prompt(question, answer) - response = self.llm.invoke(prompt) - completed_question = self.rewrite_query_parser.parse(response.content) - return completed_question.rewrite_query, completed_question.software_name - except Exception as e: - print(f"补全提问失败: {str(e)}") - return question, "" - - def process_row(self, row): - """ - 处理单行数据 - - 参数: - row: DataFrame中的一行 - - 返回: - dict: 处理结果 - """ - original_question = row[self.question_column] - answer = row[self.answer_column] - - # 调用LLM补全提问 - completed_question, software_name = self.complete_question(original_question, answer) - - # 创建结果字典 - result = { - "原始提问": original_question, - "补全后的提问": completed_question, - "软件名称": software_name - } - - return result - - def process(self): - """ - 使用多线程处理所有提问并补全内容 - - 读取Excel文件中的提问和回答,调用LLM补全提问内容, - 并将原提问和补全后的提问保存到新的Excel文件中 - """ - results = [] - total = len(self.df) - - # 使用进度条显示总体进度 - with tqdm(total=total, desc="补全提问") as pbar: - # 创建线程池 - with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: - # 提交所有任务 - future_to_idx = {executor.submit(self.process_row, self.df.iloc[idx]): idx for idx in range(total)} - - # 处理完成的任务 - for future in concurrent.futures.as_completed(future_to_idx): - result = future.result() - with self.lock: - results.append(result) - pbar.update(1) - - # 将结果转换为DataFrame并保存 - results_df = pd.DataFrame(results) - results_df.to_excel(self.output_path, index=False) - print(f"处理完成,共处理 {len(results)} 条记录,结果已保存至 {self.output_path}") - -def main(): - """主函数""" - import argparse - - parser = argparse.ArgumentParser(description='补全Excel文件中的提问内容') - parser.add_argument('-i', '--input', type=str, default="/data/Rag2_0/data/excel/历史提问数据(dislike).xlsx", - help='输入Excel文件路径') - parser.add_argument('-o', '--output', type=str, default="/data/Rag2_0/data/excel/补全后的提问数据.xlsx", - help='输出Excel文件路径') - parser.add_argument('-q', '--question', type=str, default="提问", - help='提问列的名称') - parser.add_argument('-a', '--answer', type=str, default="回答", - help='回答列的名称') - parser.add_argument('-w', '--workers', type=int, default=50, - help='最大线程数') - - args = parser.parse_args() - - # 创建提问补全工具实例 - completer = QuestionCompleter( - input_path=args.input, - output_path=args.output, - question_column=args.question, - answer_column=args.answer, - max_workers=args.workers - ) - - # 执行处理 - completer.process() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/rag2_0/demo/dialogue_to_workorder.py b/rag2_0/demo/dialogue_to_workorder.py new file mode 100644 index 0000000..616705d --- /dev/null +++ b/rag2_0/demo/dialogue_to_workorder.py @@ -0,0 +1,486 @@ +import os +import json +import pandas as pd +from datetime import datetime +import time +import concurrent.futures +from functools import wraps +from pydantic import BaseModel, Field +from langchain.output_parsers import PydanticOutputParser +from rag2_0.tool.ModelTool import OpenAiLLM +from dotenv import load_dotenv + +load_dotenv() + +# ================ 模型定义 ================ +class UserQuestionAndSolution(BaseModel): + user_question: str = Field(description="客户问题") + solution: str = Field(description="坐席提供的解决方案") + +class QuestionType(BaseModel): + question_type: str = Field(description="问题类型") + +class IsComplaint(BaseModel): + is_dissatisfaction: bool = Field(description="是否抱怨") + dissatisfaction_level: str = Field(description="抱怨级别") + dissatisfaction_reasoning: str = Field(description="抱怨原因") + is_complaint: bool = Field(description="是否明确/暗示将进行投诉") + +class ProductNameAndModuleName(BaseModel): + product_name: str = Field(description="产品名称") + module_name: str = Field(description="模块名称") + +class ProductLine(BaseModel): + product_line: str = Field(description="产品线") + +# 初始化输出解析器 +user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution) +question_type_parser = PydanticOutputParser(pydantic_object=QuestionType) +is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint) +product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName) +product_line_parser = PydanticOutputParser(pydantic_object=ProductLine) + +# ================ LLM配置 ================ +def retry_llm_call(max_retries=3, delay=2): + """ + 重试装饰器,用于LLM调用失败时进行重试 + + 参数: + max_retries: 最大重试次数 + delay: 重试间隔时间(秒) + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + retries = 0 + last_exception = None + + while retries < max_retries: + try: + return func(*args, **kwargs) + except Exception as e: + last_exception = e + retries += 1 + print(f"LLM调用失败,正在进行第{retries}次重试: {str(e)}") + if retries < max_retries: + time.sleep(delay*retries) + + # 所有重试都失败后,抛出最后一次的异常 + print(f"LLM调用失败,已达到最大重试次数{max_retries}") + raise last_exception + + return wrapper + return decorator + +def get_llm_instance(): + """获取LLM实例""" + api_key = os.getenv("OPENAI_API_KEY") + base_url = os.getenv("OPENAI_API_BASE") + model_name = os.getenv("LLM_MODEL_NAME") + + llm_params = { + "temperature": 0.6, + "model": model_name, + "api_key": api_key, + "base_url": base_url + } + + return OpenAiLLM(**llm_params) + +# ================ 数据处理函数 ================ +def parse_product_detail_excel(file_path): + """解析产品详情Excel文件""" + df = pd.read_excel(file_path) + product_dict = {} + + for _, row in df.iterrows(): + product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else '' + product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else '' + module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else '' + + if product_line not in product_dict: + product_dict[product_line] = {} + if product_name not in product_dict[product_line]: + product_dict[product_line][product_name] = [] + product_dict[product_line][product_name].append(module_name) + + return product_dict + +def get_workorder_dict(rows): + """从会话行中提取工单基本信息""" + workorder_dict = {} + + # 创建时间 + for row in rows: + create_time = row['创建时间'] + if pd.notna(create_time) and str(create_time).strip() != '': + workorder_dict["创建时间"] = create_time + break + + # 处理坐席 + for row in rows: + sender = row['消息发送者'] + sender_nickname = row['发送者昵称'] + if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': + workorder_dict["处理坐席"] = sender_nickname + break + + # 访客昵称 + for row in rows: + sender = row['消息发送者'] + sender_nickname = row['发送者昵称'] + if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': + workorder_dict["访客昵称"] = sender_nickname + break + + # 会话id + for row in rows: + conversation_id = row['会话id'] + if pd.notna(conversation_id) and str(conversation_id).strip() != '': + workorder_dict["会话id"] = conversation_id + break + + # 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124 + if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]): + create_time_str = str(workorder_dict["创建时间"]).strip() + dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") + workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S") + + return workorder_dict + +def get_dialogue_str(conversation_rows): + """从会话行中提取对话内容""" + dialogue = [] + + for row in conversation_rows: + sender = row.get('消息发送者', '') + content = str(row.get('消息内容', '')).strip() + + # 处理非文本内容 + if content == '' or pd.isna(row["消息内容"]): + if str(row.get('图片', '')).strip() != '': + content = '[图片]' + elif str(row.get('附件', '')).strip() != '': + content = '[附件]' + elif str(row.get('视频', '')).strip() != '': + content = '[视频]' + elif str(row.get('语音', '')).strip() != '': + content = '[语音]' + + # 添加对话内容 + if sender == '访客': + dialogue.append(f"访客:{content}") + elif sender == '坐席': + dialogue.append(f"坐席:{content}") + + return '\n'.join(dialogue) + +def group_conversations_by_id(df): + """将数据按会话ID分组""" + conversation_dict = {} + + for index, row in df.iterrows(): + conversation_id = row['会话id'] + if pd.notna(conversation_id) and str(conversation_id).strip() != '': + if conversation_id in conversation_dict: + conversation_dict[conversation_id].append(row.to_dict()) + else: + conversation_dict[conversation_id] = [row.to_dict()] + + return conversation_dict + +# ================ LLM分析函数 ================ +@retry_llm_call(max_retries=3, delay=2) +def get_user_question_and_solution(conversation_rows): + """分析用户问题和解决方案""" + dialogue_str = get_dialogue_str(conversation_rows) + + prompt = f""" +请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求: + +1. 专业识别: +- 重点识别电力工程领域的专业术语(如:定额套用、工程量清单、概预算编制、造价指标分析等) +- 注意区分不同业务场景(输变电工程、配网改造、新能源项目等) +- 识别政策文件引用(如:国网Q/GDW 11337-2014标准) + +2. 信息提取: +用户问题提取: +- 核心诉求(成本核算/计价争议/软件操作等) +- 涉及的专业环节(设计概算/施工图预算/竣工结算) +- 具体技术参数(电压等级/线路长度/设备型号) + +坐席解决方案提取: +- 提供的计算方法(单位工程法/实物量法) +- 推荐的计价依据(电力建设工程定额2018版) +- 指导的软件操作步骤(博微软件操作) +- 政策法规应用建议 +- 文件模板提供情况 + +3. 结构化输出: +{user_question_and_solution_parser.get_format_instructions()} +访客与坐席的对话记录如下: +{dialogue_str} + """ + + llm = get_llm_instance() + response = llm.invoke(user_prompt=prompt) + user_question_and_solution = user_question_and_solution_parser.parse(response.content) + + return user_question_and_solution.user_question, user_question_and_solution.solution + +@retry_llm_call(max_retries=3, delay=2) +def get_product_name_and_module_name(product_line, conversation_rows, product_detail_dict): + """分析产品名称和模块名称""" + if product_line == '': + return '', '' + + json_str = json.dumps(product_detail_dict[product_line]) + dialogue_str = get_dialogue_str(conversation_rows) + + prompt = f""" +请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 + +输出格式: +{product_name_and_module_name_parser.get_format_instructions()} + +产品名称列表及模块名称列表: +{json_str} + +对话记录: +{dialogue_str} + """ + + llm = get_llm_instance() + response = llm.invoke(user_prompt=prompt) + product_name_and_module_name = product_name_and_module_name_parser.parse(response.content) + + return product_name_and_module_name.product_name, product_name_and_module_name.module_name + +@retry_llm_call(max_retries=3, delay=2) +def get_product_line(conversation_rows, product_detail_dict): + """分析产品线""" + dialogue_str = get_dialogue_str(conversation_rows) + product_line_list = list(product_detail_dict.keys()) + + prompt = f""" +请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 +无法判断时,返回空字符串。即product_line="" + +输出格式: +{product_line_parser.get_format_instructions()} + +产品线列表: +{product_line_list} + +对话记录: +{dialogue_str} + """ + + llm = get_llm_instance() + response = llm.invoke(user_prompt=prompt) + product_line = product_line_parser.parse(response.content) + + return product_line.product_line + +@retry_llm_call(max_retries=3, delay=2) +def get_problem_type(conversation_rows): + """分析问题类型""" + dialogue_str = get_dialogue_str(conversation_rows) + + prompt = f""" +请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 + +分类体系: +1. 软件需求 - 涉及功能新增/改进建议、系统集成需求等 +2. 数据问题 - 数据导入导出、格式转换、计算异常等问题 +3. 专业咨询 - 造价计算标准、定额套用、行业规范解读等 +4. 功能操作 - 软件功能使用步骤、界面操作指导 +5. 故障排查 - 系统崩溃、性能问题、兼容性故障 +6. 培训支持 - 功能教学、操作手册获取、培训需求 +7. 计价依据 - 定额库更新、材料价格库维护、地方标准差异 +9. 其他 - 无法归类的对话内容 + +处理要求: +1. 分析时注意识别电力行业特有表述(如"杆塔组立"、"电缆敷设"等专业工序) +2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误) +3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题) +4. 对模糊表述要求追问的场景归入培训支持类 + +输出格式: +{question_type_parser.get_format_instructions()} +对话记录如下: +{dialogue_str} + """ + + llm = get_llm_instance() + response = llm.invoke(user_prompt=prompt) + question_type = question_type_parser.parse(response.content) + + return question_type.question_type + +@retry_llm_call(max_retries=3, delay=2) +def get_is_complaint_and_is_complaint_level(conversation_rows): + """分析是否抱怨及抱怨级别""" + dialogue_str = get_dialogue_str(conversation_rows) + + prompt = f""" +请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: + +1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语 +2. 抱怨分级(如存在抱怨): + - 一般抱怨:对博微软件功者坐席服务存在轻微不满但情绪稳定 + - 中等抱怨:对博微软件或者坐席服务明确表达不满并提出具体问题 + - 严重抱怨:对博微软件或者坐席服务使用激烈言辞或威胁性语言 + - 抗议行为:明确表示投诉/退费/法律手段 +3. 投诉倾向:是否明确/暗示将进行投诉 +4. 抱怨对象:坐席服务态度/业务能力 或 博微功能问题(注意忽略对非博微软件或坐席的抱怨) +5. 内容摘录:标注具体抱怨语句 +6. 分析理由:结合语义与上下文的判断依据 + +示例输出: +{{ + "is_dissatisfaction": true, + "dissatisfaction_level": "严重抱怨", + "dissatisfaction_reasoning": "博微软件缺陷导致实际损失", + "is_complaint": "true" +}} + +输出格式要求: +{is_complaint_parser.get_format_instructions()} + +当前对话记录: +{dialogue_str} + +附加分析要求: +1. 区分客观问题描述与主观情绪表达 +2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议) +3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征) +4. 标注涉及多个抱怨对象的情况 + """ + + llm = get_llm_instance() + response = llm.invoke(user_prompt=prompt) + is_complaint = is_complaint_parser.parse(response.content) + + return (is_complaint.is_dissatisfaction, + is_complaint.dissatisfaction_level, + is_complaint.dissatisfaction_reasoning, + is_complaint.is_complaint) + +# ================ 主流程处理 ================ +def process_conversation(conversation_id, conversation_rows, product_detail_dict): + """处理单个会话的函数,用于多线程并发""" + # 获取工单基本信息 + workorder_dict = get_workorder_dict(conversation_rows) + + # 分析是否抱怨、是否投诉、抱怨级别 + is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = ( + get_is_complaint_and_is_complaint_level(conversation_rows)) + + # 分析用户问题和解决方案 + user_question, solution = get_user_question_and_solution(conversation_rows) + + # 分析问题类型 + problem_type = get_problem_type(conversation_rows) + + # 分析产品线 + product_line = get_product_line(conversation_rows, product_detail_dict) + # 分析产品名称和模块名称 + if product_line != '': + product_name, module_name = get_product_name_and_module_name( + product_line, conversation_rows, product_detail_dict) + else: + product_name = '' + module_name = '' + + # 更新工单字典 + workorder_dict.update({ + "产品线": product_line, + "产品名称": product_name, + "模块名称": module_name, + "客户问题": user_question, + "问题类型": problem_type, + "是否抱怨": "是" if is_dissatisfaction else '否', + "抱怨级别": dissatisfaction_level, + "是否投诉": "是" if is_complaint else '否', + "解决方案": (solution + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution + }) + + return workorder_dict + +def analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers=4): + """分析会话数据主流程,使用多线程并发处理""" + # 读取Excel文件 + df = pd.read_excel(conversation_excel_path) + + # 检查数据框的列 + print(f"Excel文件列名: {df.columns.tolist()}") + print(f"数据总行数: {len(df)}") + + # 解析产品详情 + product_detail_dict = parse_product_detail_excel(product_detail_excel_path) + + # 按会话ID分组 + conversation_dict = group_conversations_by_id(df) + + # 使用线程池处理每个会话 + workorder_dict_list = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + # 创建任务 + future_to_conversation = { + executor.submit(process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id + for conversation_id, conversation_rows in conversation_dict.items() + } + + # 获取结果 + for future in concurrent.futures.as_completed(future_to_conversation): + conversation_id = future_to_conversation[future] + try: + workorder_dict = future.result() + workorder_dict_list.append(workorder_dict) + print(f"完成处理会话ID: {conversation_id}") + except Exception as exc: + print(f"处理会话ID: {conversation_id} 时发生错误: {exc}") + + return workorder_dict_list + +def save_results_to_excel(workorder_dict_list, output_file='workorder_result.xlsx'): + """将结果保存到Excel文件""" + result_df = pd.DataFrame(workorder_dict_list) + + # 按照指定的列顺序重新排列DataFrame的列 + columns_order = [ + '工单编号', '产品线', '产品名称', '模块名称', '问题类型', + '客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别', + '会话id', '访客昵称', '处理坐席', '创建时间' + ] + + # 确保所有列都存在,如果不存在则添加空列 + for col in columns_order: + if col not in result_df.columns: + result_df[col] = None + + # 按指定顺序重排列 + result_df = result_df[columns_order] + + # 保存到Excel文件 + result_df.to_excel(output_file, index=False) + print(f"结果已保存到 {output_file}") + +# ================ 主程序入口 ================ +def main(): + """主程序入口""" + # 文件路径 + conversation_excel_path = os.path.join('data', 'excel', '会话内容详情20250527173330.xlsx') + product_detail_excel_path = os.path.join('data', 'excel', '产品详情20250527175736.xlsx') + + # 分析会话数据,设置并发线程数 + max_workers = 8 # 可以根据CPU核心数和任务特性调整 + workorder_dict_list = analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers) + + # 保存结果 + save_results_to_excel(workorder_dict_list) + +if __name__ == "__main__": + main() + diff --git a/rag2_0/demo/judge_answer_right.py b/rag2_0/demo/judge_answer_right.py deleted file mode 100644 index 65e132b..0000000 --- a/rag2_0/demo/judge_answer_right.py +++ /dev/null @@ -1,293 +0,0 @@ -""" -答案正确性评判工具 - -此模块用于评判问题的新旧回答是否正确,通过与标准答案(Wiki内容)进行比较, -或者在没有标准答案的情况下比较新旧回答的差异。 - -用法示例: - judge = AnswerCorrectnessJudge() - judge.process() -""" - -import pandas as pd -from urllib.parse import unquote -from rag2_0.tool.WikijsTool import WikijsTool -from rag2_0.tool.html_to_md import convert_html_to_md -from rag2_0.tool.ModelTool import OpenAiLLM -from dotenv import load_dotenv -import os -from tqdm import tqdm -load_dotenv() - -class AnswerCorrectnessJudge: - """ - 答案正确性评判工具类 - - 用于评估问题的新旧回答是否正确,可以通过与标准答案(Wiki内容)进行比较, - 或者在没有标准答案的情况下比较新旧回答的差异。 - """ - - def __init__(self, wiki_excel_path="/data/Rag2_0/data/excel/部分提问_软件名称明确.xlsx", - answer_excel_path="/data/Rag2_0/data/excel/主网软件提问_对比结果.xlsx", - output_path="/data/Rag2_0/data/excel/主网软件提问回答_判断结果.xlsx"): - """ - 初始化答案正确性评判工具 - - 参数: - wiki_excel_path (str): Wiki Excel文件路径 - answer_excel_path (str): 答案对比Excel文件路径 - output_path (str): 输出Excel文件路径 - """ - self.wiki_excel_path = wiki_excel_path - self.answer_excel_path = answer_excel_path - self.output_path = output_path - - # 读取Excel文件 - self.wiki_excel = pd.read_excel(self.wiki_excel_path) - self.answer_excel = pd.read_excel(self.answer_excel_path) - - # 初始化LLM - self.api_key = os.getenv("OPENAI_API_KEY") - self.base_url = os.getenv("OPENAI_API_BASE") - self.model = os.getenv("LLM_MODEL_NAME") - - if not all([self.api_key, self.base_url, self.model]): - raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量") - - self.openai_llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model) - - def find_wiki_link(self, query) -> str | None: - """ - 根据查询(对应wiki_excel中的新提问列)找出对应的词条链接 - - 参数: - query (str): 查询内容,对应wiki_excel中的新提问列 - - 返回: - str: 对应的词条链接,如果没有找到则返回None - """ - # 确保query不为空 - if not query or pd.isna(query): - return None - - # 在"新提问"列中查找匹配的行 - matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] - - # 如果找到了匹配的行,返回对应的词条链接 - if not matched_rows.empty: - return matched_rows.iloc[0]['对应词条链接'] - - # 如果没有完全匹配,尝试部分匹配 - # 去除软件名称部分(如果有) - query_parts = query.split(',', 1) - if len(query_parts) > 1: - clean_query = query_parts[1].strip() - - # 在"提问"列中查找包含清理后查询的行 - for idx, row in self.wiki_excel.iterrows(): - if pd.notna(row['提问']) and clean_query in row['提问']: - return row['对应词条链接'] - - return None - - def get_wiki_content(self, link) -> str: - """ - 获取词条链接的内容 - - 参数: - link (str): 词条链接 - - 返回: - str: 链接内容,如果获取失败则返回错误信息 - """ - try: - if not link or pd.isna(link): - return "链接为空或无效" - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - doc_path = "/".join(path_parts[1:]) - wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False) - html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content') - if not html_content: - return "获取内容失败" - - options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True} - new_content = (html_content.replace("h6>", "h7>") - .replace("h5>", "h6>") - .replace("h4>", "h5>") - .replace("h3>", "h4>") - .replace("h2>", "h3>") - .replace("h1>", "h2>")) - # 将HTML内容转换为Markdown - markdown_content = convert_html_to_md(new_content, "", **options) - markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}" - return markdown_content - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def create_prompt(self, standard_answer: str, answer_to_check: str) -> str: - """ - 创建用于评判答案的prompt - - 参数: - standard_answer (str): 标准答案 - answer_to_check (str): 需要检查的答案 - - 返回: - str: 格式化的prompt - """ - return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。 - -标准答案: -{standard_answer} - -待评估的回答: -{answer_to_check} - -请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。 -如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 -如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。 - -请严格按以下格式输出:【正确】或【错误】:""" - - def judge_old_answer(self, standard_answer: str, old_answer: str) -> bool | None: - """ - 调用LLM判断旧回答是否正确 - - 参数: - standard_answer (str): 标准答案(来自Wiki) - old_answer (str): 旧流程的回答 - - 返回: - bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 - """ - prompt = self.create_prompt(standard_answer, old_answer) - try: - response = self.openai_llm.invoke(prompt) - return "正确" in response.content - except Exception as e: - return None - - def judge_new_answer(self, standard_answer: str, new_answer: str) -> bool | None: - """ - 调用LLM判断新回答是否正确 - - 参数: - standard_answer (str): 标准答案(来自Wiki) - new_answer (str): 新流程的回答 - - 返回: - bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 - """ - prompt = self.create_prompt(standard_answer, new_answer) - try: - response = self.openai_llm.invoke(prompt) - return "正确" in response.content - except Exception as e: - return None - - def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None: - """ - 综合判断新旧回答的正确性 - - 参数: - standard_answer (str): 标准答案(来自Wiki) - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - 返回: - str | None: 包含新旧回答判断结果的字符串,None表示判断失败 - """ - old_result = self.judge_old_answer(standard_answer, old_answer) - new_result = self.judge_new_answer(standard_answer, new_answer) - if old_result is None or new_result is None: - return None - if new_result and old_result: - return "新旧答案均正确" - elif new_result and not old_result: - return "新答案正确" - elif not new_result and old_result: - return "旧答案正确" - else: - return "新旧答案均错误" - - def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None: - """ - 判断新旧回答是否存在较大差异 - - 参数: - old_answer (str): 旧流程的回答 - new_answer (str): 新流程的回答 - - 返回: - str | None: 差异判断结果,None表示判断失败 - """ - - prompt = f"""请判断以下两个回答是否存在较大差异: - - 旧回答: {old_answer} - - 新回答: {new_answer} - - 主要是关键步骤、关键信息、或者关键主体的差异 - 请仅回答"存在较大差异"或"差异较小"。""" - - try: - response = self.openai_llm.invoke(prompt) - return "无法判断,新老答案差异较大" if "存在较大差异" in response.content else "无法判断,新老答案基本相同" - except Exception as e: - return None - - def process(self): - """ - 处理所有问题并评判答案正确性 - - 读取Excel文件中的问题和答案,进行评判,并将结果保存到输出Excel文件 - """ - # 创建结果列表 - results = [] - - # 读取Excel文件 - for idx, row in tqdm(self.answer_excel.iterrows(), total=len(self.answer_excel), desc="处理问题"): - query = row["问题"] - old_answer = row["旧流程答案"] - new_answer = row["新流程答案"] - standard_answer = "" - - try: - wiki_url = self.find_wiki_link(query) - if wiki_url and not pd.isna(wiki_url): - standard_answer = self.get_wiki_content(wiki_url) - except Exception as e: - print(f"处理问题 '{query}' 时发生错误: {str(e)}") - - if standard_answer: - # 判断答案正确性 - judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer) - else: - judge_result = self.judge_answer_diff(old_answer, new_answer) - - if judge_result is None: - judge_result = "" - - results.append({ - "问题": query, - "旧流程答案": old_answer, - "新流程答案": new_answer, - "判断结果": judge_result - }) - - # 将结果转换为DataFrame并保存 - results_df = pd.DataFrame(results) - results_df.to_excel(self.output_path, index=False) - print(f"处理完成,共处理 {len(results)} 条记录,结果已保存至 {self.output_path}") - -# 测试函数 -if __name__ == "__main__": - # 创建答案正确性评判工具实例 - judge = AnswerCorrectnessJudge() - # 执行处理 - judge.process() \ No newline at end of file diff --git a/rag2_0/demo/judge_judge_answer.py b/rag2_0/demo/judge_judge_answer.py new file mode 100644 index 0000000..389847b --- /dev/null +++ b/rag2_0/demo/judge_judge_answer.py @@ -0,0 +1,462 @@ +""" +综合评判工具 + +此模块结合了答案正确性评判和检索内容相关性评分功能,可以同时: +1. 评判问题的新旧回答是否正确 +2. 比较新旧回答的差异 +3. 评估检索内容与问题的相关性 + +用法示例: + judge = CombinedJudge() + judge.process() +""" + +import pandas as pd +from urllib.parse import unquote +from rag2_0.tool.WikijsTool import WikijsTool +from rag2_0.tool.html_to_md import convert_html_to_md +from rag2_0.tool.ModelTool import OpenAiLLM +from dotenv import load_dotenv +import os +from tqdm import tqdm +from rag2_0.dify.dify_tool import DifyTool +import json +from pydantic import BaseModel, Field +from langchain.output_parsers import PydanticOutputParser +import concurrent.futures +from threading import Lock +load_dotenv() + +class ContentSource(BaseModel): + score:int = Field(description="相关性分数") + reason:str = Field(description="评分理由") + +class CombinedJudge: + """ + 综合评判工具类 + + 结合了答案正确性评判和检索内容相关性评分功能 + """ + + def __init__(self, wiki_excel_path="/data/QueryRewrite/data/excel/部分提问_软件名称明确.xlsx", + answer_excel_path="/data/QueryRewrite/data/excel/dify问答_对比结果.xlsx", + output_path="/data/QueryRewrite/data/excel/dify问答__综合评判结果.xlsx", + dify_appid="ccf92b97-2789-4a3f-90e0-135a869a37c5", + max_workers=10): + """ + 初始化综合评判工具 + + 参数: + wiki_excel_path (str): Wiki Excel文件路径 + answer_excel_path (str): 答案对比Excel文件路径 + output_path (str): 输出Excel文件路径 + dify_appid (str): Dify应用ID + max_workers (int): 最大工作线程数 + """ + self.wiki_excel_path = wiki_excel_path + self.answer_excel_path = answer_excel_path + self.output_path = output_path + self.dify_appid = dify_appid + self.max_workers = max_workers + self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) + self.results_lock = Lock() + + # 读取Excel文件 + if os.path.exists(wiki_excel_path): + self.wiki_excel = pd.read_excel(self.wiki_excel_path) + else: + self.wiki_excel = None + self.answer_excel = pd.read_excel(self.answer_excel_path) + + # 初始化LLM + self.api_key = os.getenv("OPENAI_API_KEY") + self.base_url = os.getenv("OPENAI_API_BASE") + self.model = os.getenv("LLM_MODEL_NAME") + + if not all([self.api_key, self.base_url, self.model]): + raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量") + + self.llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model) + + def find_wiki_link(self, query) -> str | None: + """ + 根据查询(对应wiki_excel中的新提问列)找出对应的词条链接 + + 参数: + query (str): 查询内容,对应wiki_excel中的新提问列 + + 返回: + str: 对应的词条链接,如果没有找到则返回None + """ + # 确保query不为空 + if not query or pd.isna(query): + return None + if self.wiki_excel is None: + return None + + # 在"新提问"列中查找匹配的行 + matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] + + # 如果找到了匹配的行,返回对应的词条链接 + if not matched_rows.empty: + return matched_rows.iloc[0]['对应词条链接'] + + # 如果没有完全匹配,尝试部分匹配 + # 去除软件名称部分(如果有) + query_parts = query.split(',', 1) + if len(query_parts) > 1: + clean_query = query_parts[1].strip() + + # 在"提问"列中查找包含清理后查询的行 + for idx, row in self.wiki_excel.iterrows(): + if pd.notna(row['提问']) and clean_query in row['提问']: + return row['对应词条链接'] + + return None + + def get_wiki_content(self, link) -> str: + """ + 获取词条链接的内容 + + 参数: + link (str): 词条链接 + + 返回: + str: 链接内容,如果获取失败则返回错误信息 + """ + try: + if not link or pd.isna(link): + return "链接为空或无效" + # 移除域名部分,只保留路径 + path = link.split('/', 3)[-1] + decoded_path = unquote(path) + path_parts = decoded_path.split('/') + doc_path = "/".join(path_parts[1:]) + wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False) + html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content') + if not html_content: + return "获取内容失败" + + options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True} + new_content = (html_content.replace("h6>", "h7>") + .replace("h5>", "h6>") + .replace("h4>", "h5>") + .replace("h3>", "h4>") + .replace("h2>", "h3>") + .replace("h1>", "h2>")) + # 将HTML内容转换为Markdown + markdown_content = convert_html_to_md(new_content, "", **options) + markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}" + return markdown_content + + except Exception as e: + raise RuntimeError(f"获取词条内容失败: {str(e)}") from e + + def get_wiki_title(self, link) -> str | None: + """ + 获取词条标题 + + 参数: + link (str): 词条链接 + + 返回: + str: 词条标题,如果获取失败则返回None + """ + try: + if not link or pd.isna(link): + return None + # 移除域名部分,只保留路径 + path = link.split('/', 3)[-1] + decoded_path = unquote(path) + path_parts = decoded_path.split('/') + return path_parts[-1] + + except Exception as e: + raise RuntimeError(f"获取词条内容失败: {str(e)}") from e + + def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str: + """ + 创建用于评判答案正确性的prompt + + 参数: + standard_answer (str): 标准答案 + answer_to_check (str): 需要检查的答案 + + 返回: + str: 格式化的prompt + """ + return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。 + +标准答案: +{standard_answer} + +待评估的回答: +{answer_to_check} + +请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。 +如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。 +如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。 + +请严格按以下格式输出:【正确】或【错误】:""" + + def judge_answer(self, standard_answer: str, answer: str) -> bool | None: + """ + 调用LLM判断回答是否正确 + + 参数: + standard_answer (str): 标准答案(来自Wiki) + answer (str): 需评判的回答 + + 返回: + bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败 + """ + prompt = self.create_correctness_prompt(standard_answer, answer) + try: + response = self.llm.invoke(user_prompt=prompt, need_retry=True) + return "正确" in response.content + except Exception as e: + return None + + def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None: + """ + 综合判断新旧回答的正确性 + + 参数: + standard_answer (str): 标准答案(来自Wiki) + old_answer (str): 旧流程的回答 + new_answer (str): 新流程的回答 + + 返回: + str | None: 包含新旧回答判断结果的字符串,None表示判断失败 + """ + old_result = self.judge_answer(standard_answer, old_answer) + new_result = self.judge_answer(standard_answer, new_answer) + if old_result is None or new_result is None: + return None + if new_result and old_result: + return "新旧答案均正确" + elif new_result and not old_result: + return "新答案正确" + elif not new_result and old_result: + return "旧答案正确" + else: + return "新旧答案均错误" + + def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None: + """ + 判断新旧回答是否存在较大差异 + + 参数: + old_answer (str): 旧流程的回答 + new_answer (str): 新流程的回答 + + 返回: + str | None: 差异判断结果,None表示判断失败 + """ + + prompt = f"""请判断以下两个回答是否存在较大差异: + + 旧回答: {old_answer} + + 新回答: {new_answer} + + 主要是主要步骤、主要信息、或者主要主体的差异 + 请仅回答"存在较大差异"或"差异较小"。""" + + try: + response = self.llm.invoke(user_prompt=prompt, need_retry=True) + return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同" + except Exception as e: + return None + + def calculate_score(self, query:str, content:str) -> int: + """ + 使用LLM判断query与content之间的相关性分数 + + 参数: + query (str): 用户问题 + content (str): 检索内容 + + 返回: + int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 + """ + try: + prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 + +【评分标准】 +10分:完全契合,主题/意图完全一致且涵盖所有关键信息 +8-9分:高度相关,核心要素匹配但存在少量信息缺失 +6-7分:部分相关,涉及相同主题但存在重要信息缺失 +4-5分:弱相关,仅次要信息点匹配 +1-3分:完全不相关或信息冲突 + +【评估维度】 +1. 主题一致性:核心主题/意图的匹配程度 +2. 内容覆盖度:是否涵盖query的关键要素 +3. 信息准确性:是否存在矛盾/错误信息 +4. 细节丰富度:是否提供query要求的详细信息 + +【输出格式】 +{{ + "score": 评分, + "reason": "简明扼要的评分理由(中文)" +}} + +【示例】 +query: "新冠疫苗的常见副作用" +内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" +输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} + +现在评估: +query: "{query}" +content: "{content}" +""" + + response = self.llm.invoke(user_prompt=prompt, need_retry=True) + + # 解析JSON响应 + try: + parsed_output = self.content_source_parser.parse(response.content) + return parsed_output.score + except Exception as e: + return -1 + except Exception as e: + return -1 + + def get_retrieve_info(self, query:str, outputs:dict) -> tuple: + """ + 获取检索信息并计算分数 + + 参数: + query (str): 用户问题 + outputs (dict): 检索输出结果 + + 返回: + tuple: (检索内容列表, 最高分, 最低分, 平均分) + """ + max_score = 0 + min_score = 10 + total_score = 0 + valid_scores = 0 + retrieve_content = [] + + for result in outputs["result"]: + content = result["content"].strip() + score = self.calculate_score(query=query, content=content) + if score != -1: + max_score = max(max_score, score) + min_score = min(min_score, score) + total_score += score + valid_scores += 1 + content_title = content.split("\n")[0] + if content_title: + retrieve_content.append(content_title + f"--得分({score}分)") + + avg_score = total_score / valid_scores if valid_scores > 0 else 0 + return retrieve_content, max_score, min_score, avg_score + + def process_single_question(self, row): + """ + 处理单个问题的评判 + + 参数: + row: DataFrame中的一行数据 + + 返回: + dict: 包含处理结果的字典 + """ + query = row["问题"] + old_answer = row["旧流程答案"] + new_answer = row["新流程答案"] + + # 获取词条链接和标准答案 + wiki_url = self.find_wiki_link(query) + standard_answer = "" + answer_title = "" + + try: + if wiki_url and not pd.isna(wiki_url): + standard_answer = self.get_wiki_content(wiki_url) + answer_title = self.get_wiki_title(wiki_url) + except Exception as e: + print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}") + + # 判断答案正确性 + if standard_answer: + judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer) + else: + judge_result = self.judge_answer_diff(old_answer, new_answer) + + if judge_result is None: + judge_result = "" + + # 获取检索内容评分 + retrieve_content = [] + max_score = 0 + min_score = 0 + avg_score = 0 + rewrite_query = "" + + try: + message_info = DifyTool.get_message_debug_info(appid=self.dify_appid, query=query) + for workflow_node in message_info["workflow_node_executions_info"]: + if workflow_node["title"] == "知识检索结果后处理": + outputs = json.loads(workflow_node["outputs"]) + retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) + elif workflow_node["title"] == "问题优化结果解析": + outputs = json.loads(workflow_node["outputs"]) + rewrite_query = outputs["optimize_query"] + except Exception as e: + print(f"处理问题 '{query}' 获取检索内容时发生错误: {str(e)}") + + # 返回结果 + return { + "问题": query, + "问题改写": rewrite_query, + "旧流程答案": old_answer, + "新流程答案": new_answer, + "回答判断": judge_result, + "答案词条": answer_title if answer_title else "", + "检索得到词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库", + } + + def process(self): + """ + 多线程处理所有问题并进行综合评判 + + 读取Excel文件中的问题和答案,使用多线程进行评判,并将结果保存到输出Excel文件 + """ + # 创建结果列表 + results = [] + + # 创建进度条 + with tqdm(total=len(self.answer_excel), desc="处理问题中") as pbar: + # 使用线程池执行任务 + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # 提交所有任务 + future_to_row = {executor.submit(self.process_single_question, row): idx + for idx, row in self.answer_excel.iterrows()} + + # 处理完成的任务 + for future in concurrent.futures.as_completed(future_to_row): + idx = future_to_row[future] + try: + result = future.result() + with self.results_lock: + results.append(result) + except Exception as e: + print(f"处理第 {idx} 行时发生错误: {str(e)}") + finally: + pbar.update(1) + + # 将结果转换为DataFrame并保存 + results_df = pd.DataFrame(results) + results_df.to_excel(self.output_path, index=False) + print(f"处理完成,共处理 {len(results)} 条记录,结果已保存至 {self.output_path}") + +# 测试函数 +if __name__ == "__main__": + # 创建综合评判工具实例 + judge = CombinedJudge(max_workers=30) + # 执行处理 + judge.process() \ No newline at end of file diff --git a/rag2_0/demo/judge_query_full.py b/rag2_0/demo/judge_query_full.py index c43a1c7..645085b 100644 --- a/rag2_0/demo/judge_query_full.py +++ b/rag2_0/demo/judge_query_full.py @@ -32,8 +32,8 @@ import concurrent.futures import threading # 默认设置 -DEFAULT_EXCEL_PATH = r"/data/Rag2_0/data/excel/7000条对话数据.xlsx" -DEFAULT_OUTPUT_PATH = r"/data/Rag2_0/data/excel/7000条对话数据_完整问题结果.xlsx" +DEFAULT_EXCEL_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据.xlsx" +DEFAULT_OUTPUT_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据_完整问题结果.xlsx" DEFAULT_MAX_WORKERS = 50 diff --git a/rag2_0/demo/judge_retrieve_content_score.py b/rag2_0/demo/judge_retrieve_content_score.py deleted file mode 100644 index c05a884..0000000 --- a/rag2_0/demo/judge_retrieve_content_score.py +++ /dev/null @@ -1,239 +0,0 @@ -import pandas as pd -from urllib.parse import unquote -from rag2_0.tool.WikijsTool import WikijsTool -from rag2_0.tool.html_to_md import convert_html_to_md -from rag2_0.tool.ModelTool import OpenAiLLM -from dotenv import load_dotenv -import os -from tqdm import tqdm -from rag2_0.dify.dify_tool import DifyTool -import json -from pydantic import BaseModel, Field -from langchain.output_parsers import PydanticOutputParser -load_dotenv() - -class ContentSource(BaseModel): - score:int = Field(description="相关性分数") - reason:str = Field(description="评分理由") - -class RetrieveContentScoreJudge: - """ - 检索内容相关性评分工具类 - - 用于评估检索内容与问题之间的相关性,并计算相关性分数 - """ - - def __init__(self, wiki_excel_path, answer_excel_path, output_path=None): - """ - 初始化评分工具类 - - 参数: - wiki_excel_path (str): Wiki Excel文件路径 - answer_excel_path (str): 回答Excel文件路径 - output_path (str, optional): 输出Excel文件路径,默认为None - """ - self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource) - if os.path.exists(wiki_excel_path): - self.wiki_excel = pd.read_excel(wiki_excel_path) - else: - self.wiki_excel = None - self.answer_excel = pd.read_excel(answer_excel_path) - self.output_path = output_path or "/data/Rag2_0/data/excel/dify问答_检索内容评分.xlsx" - - # 从环境变量中获取OpenAI的配置 - self.api_key = os.getenv("OPENAI_API_KEY") - self.base_url = os.getenv("OPENAI_API_BASE") - self.model_name = os.getenv("LLM_MODEL_NAME") - - if not all([self.api_key, self.base_url, self.model_name]): - raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量") - - self.llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model_name) - - def find_wiki_link(self, query) -> str | None: - """ - 根据查询(对应wiki_excel中的新提问列)找出对应的词条链接 - - 参数: - query (str): 查询内容,对应wiki_excel中的新提问列 - - 返回: - str: 对应的词条链接,如果没有找到则返回None - """ - # 确保query不为空 - if not query or pd.isna(query): - return None - if self.wiki_excel is None: - return None - # 在"新提问"列中查找匹配的行 - matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query] - - # 如果找到了匹配的行,返回对应的词条链接 - if not matched_rows.empty: - return matched_rows.iloc[0]['对应词条链接'] - - # 如果没有完全匹配,尝试部分匹配 - # 去除软件名称部分(如果有) - query_parts = query.split(',', 1) - if len(query_parts) > 1: - clean_query = query_parts[1].strip() - - # 在"提问"列中查找包含清理后查询的行 - for idx, row in self.wiki_excel.iterrows(): - if pd.notna(row['提问']) and clean_query in row['提问']: - return row['对应词条链接'] - - return None - - def get_wiki_title(self, link) -> str | None: - """ - 获取词条标题 - - 参数: - link (str): 词条链接 - - 返回: - str: 词条标题,如果获取失败则返回None - """ - try: - if not link or pd.isna(link): - return None - # 移除域名部分,只保留路径 - path = link.split('/', 3)[-1] - decoded_path = unquote(path) - path_parts = decoded_path.split('/') - return path_parts[-1] - - except Exception as e: - raise RuntimeError(f"获取词条内容失败: {str(e)}") from e - - def calculate_score(self, answer:str, content:str) -> int: - """ - 使用OpenAiLLM通过LLM判断answer与content之间的相关性分数 - - 参数: - answer (str): 用户问题 - content (str): 检索内容 - - 返回: - int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败 - """ - try: - prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。 - -【评分标准】 -10分:完全契合,主题/意图完全一致且涵盖所有关键信息 -8-9分:高度相关,核心要素匹配但存在少量信息缺失 -6-7分:部分相关,涉及相同主题但存在重要信息缺失 -4-5分:弱相关,仅次要信息点匹配 -1-3分:完全不相关或信息冲突 - -【评估维度】 -1. 主题一致性:核心主题/意图的匹配程度 -2. 内容覆盖度:是否涵盖query的关键要素 -3. 信息准确性:是否存在矛盾/错误信息 -4. 细节丰富度:是否提供query要求的详细信息 - -【输出格式】 -{{ - "score": 评分, - "reason": "简明扼要的评分理由(中文)" -}} - -【示例】 -query: "新冠疫苗的常见副作用" -内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)" -输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}} - -现在评估: -query: "{answer}" -content: "{content}" -""" - - response = self.llm.invoke(user_prompt=prompt, need_retry=True) - - # 解析JSON响应 - try: - parsed_output = self.content_source_parser.parse(response.content) - return parsed_output.score - except Exception as e: - return -1 - except Exception as e: - return -1 - - def get_retrieve_info(self, query:str, outputs:dict) -> tuple: - """ - 获取检索信息并计算分数 - - 参数: - query (str): 用户问题 - outputs (dict): 检索输出结果 - - 返回: - tuple: (检索内容列表, 最高分, 最低分, 平均分) - """ - max_score = 0 - min_score = 10 - total_score = 0 - valid_scores = 0 - retrieve_content = [] - for result in outputs["result"]: - content = result["content"].strip() - score = self.calculate_score(answer=query, content=content) - if score != -1: - max_score = max(max_score, score) - min_score = min(min_score, score) - total_score += score - valid_scores += 1 - content_title = content.split("\n")[0] - if content_title: - retrieve_content.append(content_title + f"--得分({score}分)") - avg_score = total_score / valid_scores if valid_scores > 0 else 0 - return retrieve_content, max_score, min_score, avg_score - - def process(self): - """ - 处理所有问题并评估检索内容相关性 - - 遍历answer_excel中的所有问题,计算检索内容与问题的相关性分数, - 并更新Excel文件 - """ - for idx, row in tqdm(self.answer_excel.iterrows(), total=len(self.answer_excel), desc="处理问题评分中"): - query = row["问题"] - link = self.find_wiki_link(query) - answer_title = self.get_wiki_title(link) - retrieve_content = [] - max_score = 0 - min_score = 0 - avg_score = 0 # 初始化平均分 - rewrite_query="" - message_info = DifyTool.get_message_debug_info(appid="ccf92b97-2789-4a3f-90e0-135a869a37c5", query=query) - for workflow_node in message_info["workflow_node_executions_info"]: - if workflow_node["title"] == "知识检索结果后处理": - outputs = json.loads(workflow_node["outputs"]) - retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs) - elif workflow_node["title"] == "问题优化结果解析": - outputs = json.loads(workflow_node["outputs"]) - rewrite_query = outputs["optimize_query"] - - # 更新 answer_excel 中的词条内容 - self.answer_excel.at[idx, "答案词条"] = answer_title if answer_title else "" - self.answer_excel.at[idx, "问题改写"] = rewrite_query - self.answer_excel.at[idx, "检索得到词条"] = "\n".join(retrieve_content) if retrieve_content else "未检索知识库" - self.answer_excel.at[idx, "最大得分"] = max_score - self.answer_excel.at[idx, "最小得分"] = min_score - self.answer_excel.at[idx, "平均得分"] = avg_score - - # 保存结果到Excel文件 - self.answer_excel.to_excel(self.output_path, index=False) - print(f"结果已保存到 {self.output_path}") - -if __name__ == "__main__": - # 创建评分工具实例 - judge = RetrieveContentScoreJudge( - wiki_excel_path="/data/Rag2_0/data/excel/400条人工标注-部分提问_软件名称明确.xlsx", - answer_excel_path="/data/Rag2_0/data/excel/主网软件提问_回答内容评判.xlsx", - output_path="/data/Rag2_0/data/excel/dify问答_检索内容评分.xlsx" - ) - # 执行处理 - judge.process() \ No newline at end of file