更新 .gitignore 文件以忽略特定 Excel 文件,删除多个不再需要的 Excel 文件,优化对话转工单处理类,添加用户问题列表解析功能,增强代码可读性和结构。

This commit is contained in:
2025-05-28 15:59:23 +08:00
parent 6270513688
commit 6d9412b244
13 changed files with 858 additions and 795 deletions
+1
View File
@@ -6,3 +6,4 @@ __pycache__/
# 忽略数据文件 # 忽略数据文件
data/excel/* data/excel/*
rag2_0/demo/Test.py rag2_0/demo/Test.py
data/excel/*.xlsx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+393 -297
View File
@@ -1,6 +1,7 @@
import os import os
import json import json
import pandas as pd import pandas as pd
import argparse
from datetime import datetime from datetime import datetime
import time import time
import concurrent.futures import concurrent.futures
@@ -9,6 +10,7 @@ from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser from langchain.output_parsers import PydanticOutputParser
from rag2_0.tool.ModelTool import OpenAiLLM from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv from dotenv import load_dotenv
import openpyxl
load_dotenv() load_dotenv()
@@ -17,6 +19,9 @@ class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="客户问题") user_question: str = Field(description="客户问题")
solution: str = Field(description="坐席提供的解决方案") solution: str = Field(description="坐席提供的解决方案")
class UserQuestionAndSolutionList(BaseModel):
user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表")
class QuestionType(BaseModel): class QuestionType(BaseModel):
question_type: str = Field(description="问题类型") question_type: str = Field(description="问题类型")
@@ -33,14 +38,7 @@ class ProductNameAndModuleName(BaseModel):
class ProductLine(BaseModel): class ProductLine(BaseModel):
product_line: str = Field(description="产品线") product_line: str = Field(description="产品线")
# 初始化输出解析器 # ================ 工具函数 ================
user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution)
question_type_parser = PydanticOutputParser(pydantic_object=QuestionType)
is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName)
product_line_parser = PydanticOutputParser(pydantic_object=ProductLine)
# ================ LLM配置 ================
def retry_llm_call(max_retries=3, delay=2): def retry_llm_call(max_retries=3, delay=2):
""" """
重试装饰器,用于LLM调用失败时进行重试 重试装饰器,用于LLM调用失败时进行重试
@@ -72,130 +70,144 @@ def retry_llm_call(max_retries=3, delay=2):
return wrapper return wrapper
return decorator return decorator
def get_llm_instance(): # ================ 对话转工单处理类 ================
"""获取LLM实例""" class DialogueToWorkorder:
api_key = os.getenv("OPENAI_API_KEY") def __init__(self, llm_params=None):
base_url = os.getenv("OPENAI_API_BASE") """
model_name = os.getenv("LLM_MODEL_NAME") 初始化对话转工单处理类
llm_params = {
"temperature": 0.6,
"model": model_name,
"api_key": api_key,
"base_url": base_url
}
return OpenAiLLM(**llm_params)
# ================ 数据处理函数 ================
def parse_product_detail_excel(file_path):
"""解析产品详情Excel文件"""
df = pd.read_excel(file_path)
product_dict = {}
for _, row in df.iterrows():
product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else ''
product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else ''
module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else ''
if product_line not in product_dict: 参数:
product_dict[product_line] = {} llm_params: LLM模型参数,如果为None则使用环境变量中的配置
if product_name not in product_dict[product_line]: """
product_dict[product_line][product_name] = [] # 初始化输出解析器
product_dict[product_line][product_name].append(module_name) self.user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution)
self.user_question_and_solution_list_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolutionList)
return product_dict self.question_type_parser = PydanticOutputParser(pydantic_object=QuestionType)
self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
def get_workorder_dict(rows): self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName)
"""从会话行中提取工单基本信息""" self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine)
workorder_dict = {}
# 创建时间
for row in rows:
create_time = row['创建时间']
if pd.notna(create_time) and str(create_time).strip() != '':
workorder_dict["创建时间"] = create_time
break
# 处理坐席
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname
break
# 访客昵称
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["访客昵称"] = sender_nickname
break
# 会话id
for row in rows:
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
workorder_dict["会话id"] = conversation_id
break
# 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124
if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]):
create_time_str = str(workorder_dict["创建时间"]).strip()
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
return workorder_dict
def get_dialogue_str(conversation_rows):
"""从会话行中提取对话内容"""
dialogue = []
for row in conversation_rows:
sender = row.get('消息发送者', '')
content = str(row.get('消息内容', '')).strip()
# 处理非文本内容 # 初始化LLM模型
if content == '' or pd.isna(row["消息内容"]): self.llm_params = llm_params or {
if str(row.get('图片', '')).strip() != '': "temperature": 0.6,
content = '[图片]' "model": os.getenv("LLM_MODEL_NAME"),
elif str(row.get('附件', '')).strip() != '': "api_key": os.getenv("OPENAI_API_KEY"),
content = '[附件]' "base_url": os.getenv("OPENAI_API_BASE")
elif str(row.get('视频', '')).strip() != '': }
content = '[视频]'
elif str(row.get('语音', '')).strip() != '': self.llm = self._get_llm_instance()
content = '[语音]'
def _get_llm_instance(self):
# 添加对话内容 """获取LLM实例"""
if sender == '访客': return OpenAiLLM(**self.llm_params)
dialogue.append(f"访客:{content}")
elif sender == '坐席': def parse_product_detail_excel(self, file_path):
dialogue.append(f"坐席:{content}") """解析产品详情Excel文件"""
df = pd.read_excel(file_path)
product_dict = {}
for _, row in df.iterrows():
product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else ''
product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else ''
module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else ''
return '\n'.join(dialogue) if product_line not in product_dict:
product_dict[product_line] = {}
def group_conversations_by_id(df): if product_name not in product_dict[product_line]:
"""将数据按会话ID分组""" product_dict[product_line][product_name] = []
conversation_dict = {} product_dict[product_line][product_name].append(module_name)
return product_dict
for index, row in df.iterrows(): def get_workorder_dict(self, rows):
conversation_id = row['会话id'] """从会话行中提取工单基本信息"""
if pd.notna(conversation_id) and str(conversation_id).strip() != '': workorder_dict = {}
if conversation_id in conversation_dict:
conversation_dict[conversation_id].append(row.to_dict()) # 创建时间
else: for row in rows:
conversation_dict[conversation_id] = [row.to_dict()] create_time = row['创建时间']
if pd.notna(create_time) and str(create_time).strip() != '':
workorder_dict["创建时间"] = create_time
break
# 处理坐席
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname
break
# 访客昵称
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["访客昵称"] = sender_nickname
break
# 会话id
for row in rows:
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
workorder_dict["会话id"] = conversation_id
break
return conversation_dict # 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124
if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]):
create_time_str = str(workorder_dict["创建时间"]).strip()
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
# ================ LLM分析函数 ================ return workorder_dict
@retry_llm_call(max_retries=3, delay=2)
def get_user_question_and_solution(conversation_rows):
"""分析用户问题和解决方案"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f""" def get_dialogue_str(self, conversation_rows):
"""从会话行中提取对话内容"""
dialogue = []
for row in conversation_rows:
sender = row.get('消息发送者', '')
content = str(row.get('消息内容', '')).strip()
# 处理非文本内容
if content == '' or pd.isna(row["消息内容"]):
if str(row.get('图片', '')).strip() != '':
content = '[图片]'
elif str(row.get('附件', '')).strip() != '':
content = '[附件]'
elif str(row.get('视频', '')).strip() != '':
content = '[视频]'
elif str(row.get('语音', '')).strip() != '':
content = '[语音]'
# 添加对话内容
if sender == '访客':
dialogue.append(f"访客:{content}")
elif sender == '坐席':
dialogue.append(f"坐席:{content}")
return '\n'.join(dialogue)
def group_conversations_by_id(self, df):
"""将数据按会话ID分组"""
conversation_dict = {}
for index, row in df.iterrows():
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
if conversation_id in conversation_dict:
conversation_dict[conversation_id].append(row.to_dict())
else:
conversation_dict[conversation_id] = [row.to_dict()]
return conversation_dict
@retry_llm_call(max_retries=3, delay=2)
def get_user_question_and_solution(self, conversation_rows):
"""分析用户问题和解决方案"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求: 请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求:
1. 专业识别: 1. 专业识别:
@@ -217,77 +229,82 @@ def get_user_question_and_solution(conversation_rows):
- 文件模板提供情况 - 文件模板提供情况
3. 结构化输出: 3. 结构化输出:
{user_question_and_solution_parser.get_format_instructions()} {self.user_question_and_solution_list_parser.get_format_instructions()}
访客与坐席的对话记录如下: 访客与坐席的对话记录如下:
{dialogue_str} {dialogue_str}
""" """
response = self.llm.invoke(user_prompt=prompt)
user_question_and_solution_list = self.user_question_and_solution_list_parser.parse(response.content)
return user_question_and_solution_list.user_question_list
llm = get_llm_instance() @retry_llm_call(max_retries=3, delay=2)
response = llm.invoke(user_prompt=prompt) def get_product_name_and_module_name(self, product_line, conversation_rows, product_detail_dict, user_question_str, solution_str):
user_question_and_solution = user_question_and_solution_parser.parse(response.content) """分析产品名称和模块名称"""
if product_line == '':
return user_question_and_solution.user_question, user_question_and_solution.solution return '', ''
@retry_llm_call(max_retries=3, delay=2) json_str = json.dumps(product_detail_dict[product_line])
def get_product_name_and_module_name(product_line, conversation_rows, product_detail_dict): dialogue_str = self.get_dialogue_str(conversation_rows)
"""分析产品名称和模块名称"""
if product_line == '': prompt = f"""
return '', ''
json_str = json.dumps(product_detail_dict[product_line])
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
要求:
1. 如果对话记录中存在多个产品名称和模块名称,则根据"{user_question_str}""{solution_str}"判断最可能的产品名称和模块名称。
2. 如果对话记录中只存在一个产品名称和模块名称,则直接返回该产品名称和模块名称。
输出格式: 输出格式:
{product_name_and_module_name_parser.get_format_instructions()} {self.product_name_and_module_name_parser.get_format_instructions()}
产品名称列表及模块名称列表: 产品名称列表及模块名称列表:
{json_str} {json_str}
对话记录: 对话记录:
{dialogue_str} {dialogue_str}
""" """
response = self.llm.invoke(user_prompt=prompt)
product_name_and_module_name = self.product_name_and_module_name_parser.parse(response.content)
return product_name_and_module_name.product_name, product_name_and_module_name.module_name
llm = get_llm_instance() @retry_llm_call(max_retries=3, delay=2)
response = llm.invoke(user_prompt=prompt) def get_product_line(self, conversation_rows, product_detail_dict, user_question_str, solution_str):
product_name_and_module_name = product_name_and_module_name_parser.parse(response.content) """分析产品线"""
dialogue_str = self.get_dialogue_str(conversation_rows)
return product_name_and_module_name.product_name, product_name_and_module_name.module_name product_line_list = list(product_detail_dict.keys())
@retry_llm_call(max_retries=3, delay=2) prompt = f"""
def get_product_line(conversation_rows, product_detail_dict):
"""分析产品线"""
dialogue_str = get_dialogue_str(conversation_rows)
product_line_list = list(product_detail_dict.keys())
prompt = f"""
请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
无法判断时,返回空字符串。即product_line="" 无法判断时,返回空字符串。即product_line=""
要求:
1. 如果对话记录中存在多个产品线,则根据"{user_question_str}""{solution_str}"判断最可能的产品线。
2. 如果对话记录中只存在一个产品线,则直接返回该产品线。
输出格式: 输出格式:
{product_line_parser.get_format_instructions()} {self.product_line_parser.get_format_instructions()}
产品线列表: 产品线列表:
{product_line_list} {product_line_list}
对话记录: 对话记录:
{dialogue_str} {dialogue_str}
""" """
response = self.llm.invoke(user_prompt=prompt)
product_line = self.product_line_parser.parse(response.content)
return product_line.product_line
llm = get_llm_instance() @retry_llm_call(max_retries=3, delay=2)
response = llm.invoke(user_prompt=prompt) def get_problem_type(self, conversation_rows, user_question_str, solution_str):
product_line = product_line_parser.parse(response.content) """分析问题类型"""
dialogue_str = self.get_dialogue_str(conversation_rows)
return product_line.product_line
prompt = f"""
@retry_llm_call(max_retries=3, delay=2)
def get_problem_type(conversation_rows):
"""分析问题类型"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
分类体系: 分类体系:
@@ -305,25 +322,25 @@ def get_problem_type(conversation_rows):
2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误) 2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误)
3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题) 3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题)
4. 对模糊表述要求追问的场景归入培训支持类 4. 对模糊表述要求追问的场景归入培训支持类
5. 对话记录可能存在多个问题,你只需要判断"{user_question_str}"属于哪个类型的问题
输出格式: 输出格式:
{question_type_parser.get_format_instructions()} {self.question_type_parser.get_format_instructions()}
对话记录如下: 对话记录如下:
{dialogue_str} {dialogue_str}
""" """
response = self.llm.invoke(user_prompt=prompt)
question_type = self.question_type_parser.parse(response.content)
return question_type.question_type
llm = get_llm_instance() @retry_llm_call(max_retries=3, delay=2)
response = llm.invoke(user_prompt=prompt) def get_is_complaint_and_is_complaint_level(self, conversation_rows):
question_type = question_type_parser.parse(response.content) """分析是否抱怨及抱怨级别"""
dialogue_str = self.get_dialogue_str(conversation_rows)
return question_type.question_type
prompt = f"""
@retry_llm_call(max_retries=3, delay=2)
def get_is_complaint_and_is_complaint_level(conversation_rows):
"""分析是否抱怨及抱怨级别"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: 请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果:
1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语 1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语
@@ -346,7 +363,7 @@ def get_is_complaint_and_is_complaint_level(conversation_rows):
}} }}
输出格式要求: 输出格式要求:
{is_complaint_parser.get_format_instructions()} {self.is_complaint_parser.get_format_instructions()}
当前对话记录: 当前对话记录:
{dialogue_str} {dialogue_str}
@@ -356,130 +373,209 @@ def get_is_complaint_and_is_complaint_level(conversation_rows):
2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议) 2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议)
3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征) 3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征)
4. 标注涉及多个抱怨对象的情况 4. 标注涉及多个抱怨对象的情况
""" """
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
is_complaint = is_complaint_parser.parse(response.content)
return (is_complaint.is_dissatisfaction,
is_complaint.dissatisfaction_level,
is_complaint.dissatisfaction_reasoning,
is_complaint.is_complaint)
# ================ 主流程处理 ================
def process_conversation(conversation_id, conversation_rows, product_detail_dict):
"""处理单个会话的函数,用于多线程并发"""
# 获取工单基本信息
workorder_dict = get_workorder_dict(conversation_rows)
# 分析是否抱怨、是否投诉、抱怨级别
is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = (
get_is_complaint_and_is_complaint_level(conversation_rows))
# 分析用户问题和解决方案
user_question, solution = get_user_question_and_solution(conversation_rows)
# 分析问题类型
problem_type = get_problem_type(conversation_rows)
# 分析产品线
product_line = get_product_line(conversation_rows, product_detail_dict)
# 分析产品名称和模块名称
if product_line != '':
product_name, module_name = get_product_name_and_module_name(
product_line, conversation_rows, product_detail_dict)
else:
product_name = ''
module_name = ''
# 更新工单字典
workorder_dict.update({
"产品线": product_line,
"产品名称": product_name,
"模块名称": module_name,
"客户问题": user_question,
"问题类型": problem_type,
"是否抱怨": "" if is_dissatisfaction else '',
"抱怨级别": dissatisfaction_level,
"是否投诉": "" if is_complaint else '',
"解决方案": (solution + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution
})
return workorder_dict
def analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers=4):
"""分析会话数据主流程,使用多线程并发处理"""
# 读取Excel文件
df = pd.read_excel(conversation_excel_path)
# 检查数据框的列
print(f"Excel文件列名: {df.columns.tolist()}")
print(f"数据总行数: {len(df)}")
# 解析产品详情
product_detail_dict = parse_product_detail_excel(product_detail_excel_path)
# 按会话ID分组
conversation_dict = group_conversations_by_id(df)
# 使用线程池处理每个会话
workorder_dict_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建任务
future_to_conversation = {
executor.submit(process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id
for conversation_id, conversation_rows in conversation_dict.items()
}
# 获取结果 response = self.llm.invoke(user_prompt=prompt)
for future in concurrent.futures.as_completed(future_to_conversation): is_complaint = self.is_complaint_parser.parse(response.content)
conversation_id = future_to_conversation[future]
try: return (is_complaint.is_dissatisfaction,
workorder_dict = future.result() is_complaint.dissatisfaction_level,
workorder_dict_list.append(workorder_dict) is_complaint.dissatisfaction_reasoning,
print(f"完成处理会话ID: {conversation_id}") is_complaint.is_complaint)
except Exception as exc:
print(f"处理会话ID: {conversation_id} 时发生错误: {exc}")
return workorder_dict_list def process_conversation(self, conversation_id, conversation_rows, product_detail_dict):
"""处理单个会话的函数,用于多线程并发"""
# 获取工单基本信息
workorder_dict = self.get_workorder_dict(conversation_rows)
# 分析是否抱怨、是否投诉、抱怨级别
is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = (
self.get_is_complaint_and_is_complaint_level(conversation_rows))
def save_results_to_excel(workorder_dict_list, output_file='workorder_result.xlsx'): # 分析用户问题和解决方案
"""将结果保存到Excel文件""" user_question_list = self.get_user_question_and_solution(conversation_rows)
result_df = pd.DataFrame(workorder_dict_list) for user_question in user_question_list:
user_question_str = user_question.user_question
solution_str = user_question.solution
# 分析问题类型
problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str)
# 分析产品线
product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str)
# 分析产品名称和模块名称
if product_line != '':
product_name, module_name = self.get_product_name_and_module_name(
product_line, conversation_rows, product_detail_dict, user_question_str, solution_str)
else:
product_name = ''
module_name = ''
# 更新工单字典
workorder_dict.update({
"产品线": product_line,
"产品名称": product_name,
"模块名称": module_name,
"客户问题": user_question_str,
"问题类型": problem_type,
"是否抱怨": "" if is_dissatisfaction else '',
"抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
"是否投诉": "" if is_complaint else '',
"解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str
})
return workorder_dict
# 按照指定的列顺序重新排列DataFrame的列 def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=4):
columns_order = [ """分析会话数据主流程,使用多线程并发处理"""
'工单编号', '产品线', '产品名称', '模块名称', '问题类型', # 读取Excel文件
'客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别', df = pd.read_excel(conversation_excel_path)
'会话id', '访客昵称', '处理坐席', '创建时间'
] # 检查数据框的列
print(f"Excel文件列名: {df.columns.tolist()}")
print(f"数据总行数: {len(df)}")
# 解析产品详情
product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path)
# 按会话ID分组
conversation_dict = self.group_conversations_by_id(df)
# 使用线程池处理每个会话
workorder_dict_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建任务
future_to_conversation = {
executor.submit(self.process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id
for conversation_id, conversation_rows in conversation_dict.items()
}
# 获取结果
for future in concurrent.futures.as_completed(future_to_conversation):
conversation_id = future_to_conversation[future]
try:
workorder_dict = future.result()
workorder_dict_list.append(workorder_dict)
print(f"完成处理会话ID: {conversation_id}")
except Exception as exc:
print(f"处理会话ID: {conversation_id} 时发生错误: {exc}")
return workorder_dict_list
# 确保所有列都存在,如果不存在则添加空列 def save_results_to_excel(self, workorder_dict_list, output_file=None):
for col in columns_order: """将结果保存到Excel文件"""
if col not in result_df.columns: result_df = pd.DataFrame(workorder_dict_list)
result_df[col] = None
# 按照指定的列顺序重新排列DataFrame的列
columns_order = [
'工单编号', '产品线', '产品名称', '模块名称', '问题类型',
'客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别',
'会话id', '访客昵称', '处理坐席', '创建时间'
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in result_df.columns:
result_df[col] = None
# 按指定顺序重排列
result_df = result_df[columns_order]
# 保存到Excel文件
if output_file is None:
# 默认输出文件名
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file = os.path.join('data', 'excel', f'会话内容详情{timestamp}_转工单.xlsx')
# 确保目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 创建ExcelWriter对象,用于设置Excel样式
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 写入数据
result_df.to_excel(writer, index=False, sheet_name='工单数据')
# 获取工作簿和工作表
workbook = writer.book
worksheet = writer.sheets['工单数据']
# 设置行高(20磅 ≈ 26.67像素)
for row in worksheet.iter_rows():
worksheet.row_dimensions[row[0].row].height = 20
# 设置列宽
column_widths = {
'工单编号': 15,
'产品线': 24,
'产品名称': 40,
'模块名称': 40,
'问题类型': 9,
'客户问题': 20,
'解决方案': 30,
'是否抱怨': 9,
'是否投诉': 9,
'抱怨级别': 9,
'会话id': 9,
'访客昵称': 9,
'处理坐席': 9,
'创建时间': 9
}
# 应用列宽设置
for i, column in enumerate(columns_order):
col_letter = chr(65 + i) # A, B, C, ...
if i >= 26: # 超过Z的情况
col_letter = chr(64 + i // 26) + chr(65 + i % 26)
worksheet.column_dimensions[col_letter].width = column_widths[column]
print(f"结果已保存到 {output_file}")
return output_file
# ================ 参数解析 ================
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='对话内容转工单工具')
# 按指定顺序重排列 parser.add_argument('--conversation_file', type=str, required=False,
result_df = result_df[columns_order] help='会话内容Excel文件路径')
parser.add_argument('--product_detail_file', type=str, required=False,
help='产品详情Excel文件路径')
parser.add_argument('--max_workers', type=int, default=16,
help='并发处理线程数,默认为16')
# 保存到Excel文件 return parser.parse_args()
result_df.to_excel(output_file, index=False)
print(f"结果已保存到 {output_file}")
# ================ 主程序入口 ================ # ================ 主程序入口 ================
def main(): def main():
"""主程序入口""" """主程序入口"""
# 文件路径 # 解析命令行参数
conversation_excel_path = os.path.join('data', 'excel', '会话内容详情20250527173330.xlsx') args = parse_arguments()
product_detail_excel_path = os.path.join('data', 'excel', '产品详情20250527175736.xlsx')
# 分析会话数据,设置并发线程数 # 设置默认文件路径
max_workers = 8 # 可以根据CPU核心数和任务特性调整 conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '会话内容详情20250528110230.xlsx')
workorder_dict_list = analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers) product_detail_excel_path = args.product_detail_file or os.path.join('data', 'excel', '产品详情_工单.xlsx')
output_file = args.output_file
# 配置LLM参数
llm_params = {
"temperature": args.temperature,
"model": args.model_name or os.getenv("LLM_MODEL_NAME"),
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": os.getenv("OPENAI_API_BASE")
}
# 创建处理实例
processor = DialogueToWorkorder(llm_params=llm_params)
# 分析会话数据
workorder_dict_list = processor.analyze_conversation_data(
conversation_excel_path,
product_detail_excel_path,
max_workers=args.max_workers
)
# 保存结果 # 保存结果
save_results_to_excel(workorder_dict_list) processor.save_results_to_excel(workorder_dict_list, output_file)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
-462
View File
@@ -1,462 +0,0 @@
"""
综合评判工具
此模块结合了答案正确性评判和检索内容相关性评分功能,可以同时:
1. 评判问题的新旧回答是否正确
2. 比较新旧回答的差异
3. 评估检索内容与问题的相关性
用法示例:
judge = CombinedJudge()
judge.process()
"""
import pandas as pd
from urllib.parse import unquote
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
import os
from tqdm import tqdm
from rag2_0.dify.dify_tool import DifyTool
import json
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
import concurrent.futures
from threading import Lock
load_dotenv()
class ContentSource(BaseModel):
score:int = Field(description="相关性分数")
reason:str = Field(description="评分理由")
class CombinedJudge:
"""
综合评判工具类
结合了答案正确性评判和检索内容相关性评分功能
"""
def __init__(self, wiki_excel_path="/data/QueryRewrite/data/excel/部分提问_软件名称明确.xlsx",
answer_excel_path="/data/QueryRewrite/data/excel/dify问答_对比结果.xlsx",
output_path="/data/QueryRewrite/data/excel/dify问答__综合评判结果.xlsx",
dify_appid="ccf92b97-2789-4a3f-90e0-135a869a37c5",
max_workers=10):
"""
初始化综合评判工具
参数:
wiki_excel_path (str): Wiki Excel文件路径
answer_excel_path (str): 答案对比Excel文件路径
output_path (str): 输出Excel文件路径
dify_appid (str): Dify应用ID
max_workers (int): 最大工作线程数
"""
self.wiki_excel_path = wiki_excel_path
self.answer_excel_path = answer_excel_path
self.output_path = output_path
self.dify_appid = dify_appid
self.max_workers = max_workers
self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource)
self.results_lock = Lock()
# 读取Excel文件
if os.path.exists(wiki_excel_path):
self.wiki_excel = pd.read_excel(self.wiki_excel_path)
else:
self.wiki_excel = None
self.answer_excel = pd.read_excel(self.answer_excel_path)
# 初始化LLM
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("OPENAI_API_BASE")
self.model = os.getenv("LLM_MODEL_NAME")
if not all([self.api_key, self.base_url, self.model]):
raise ValueError("请设置 OPENAI_API_KEY, OPENAI_API_BASE, 和 LLM_MODEL_NAME 环境变量")
self.llm = OpenAiLLM(api_key=self.api_key, base_url=self.base_url, model=self.model)
def find_wiki_link(self, query) -> str | None:
"""
根据查询(对应wiki_excel中的新提问列)找出对应的词条链接
参数:
query (str): 查询内容,对应wiki_excel中的新提问列
返回:
str: 对应的词条链接,如果没有找到则返回None
"""
# 确保query不为空
if not query or pd.isna(query):
return None
if self.wiki_excel is None:
return None
# 在"新提问"列中查找匹配的行
matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query]
# 如果找到了匹配的行,返回对应的词条链接
if not matched_rows.empty:
return matched_rows.iloc[0]['对应词条链接']
# 如果没有完全匹配,尝试部分匹配
# 去除软件名称部分(如果有)
query_parts = query.split(',', 1)
if len(query_parts) > 1:
clean_query = query_parts[1].strip()
# 在"提问"列中查找包含清理后查询的行
for idx, row in self.wiki_excel.iterrows():
if pd.notna(row['提问']) and clean_query in row['提问']:
return row['对应词条链接']
return None
def get_wiki_content(self, link) -> str:
"""
获取词条链接的内容
参数:
link (str): 词条链接
返回:
str: 链接内容,如果获取失败则返回错误信息
"""
try:
if not link or pd.isna(link):
return "链接为空或无效"
# 移除域名部分,只保留路径
path = link.split('/', 3)[-1]
decoded_path = unquote(path)
path_parts = decoded_path.split('/')
doc_path = "/".join(path_parts[1:])
wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False)
html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content')
if not html_content:
return "获取内容失败"
options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True}
new_content = (html_content.replace("h6>", "h7>")
.replace("h5>", "h6>")
.replace("h4>", "h5>")
.replace("h3>", "h4>")
.replace("h2>", "h3>")
.replace("h1>", "h2>"))
# 将HTML内容转换为Markdown
markdown_content = convert_html_to_md(new_content, "", **options)
markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}"
return markdown_content
except Exception as e:
raise RuntimeError(f"获取词条内容失败: {str(e)}") from e
def get_wiki_title(self, link) -> str | None:
"""
获取词条标题
参数:
link (str): 词条链接
返回:
str: 词条标题,如果获取失败则返回None
"""
try:
if not link or pd.isna(link):
return None
# 移除域名部分,只保留路径
path = link.split('/', 3)[-1]
decoded_path = unquote(path)
path_parts = decoded_path.split('/')
return path_parts[-1]
except Exception as e:
raise RuntimeError(f"获取词条内容失败: {str(e)}") from e
def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str:
"""
创建用于评判答案正确性的prompt
参数:
standard_answer (str): 标准答案
answer_to_check (str): 需要检查的答案
返回:
str: 格式化的prompt
"""
return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。
标准答案:
{standard_answer}
待评估的回答:
{answer_to_check}
请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确""错误",不需要其他解释。
如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"
请严格按以下格式输出:【正确】或【错误】:"""
def judge_answer(self, standard_answer: str, answer: str) -> bool | None:
"""
调用LLM判断回答是否正确
参数:
standard_answer (str): 标准答案(来自Wiki
answer (str): 需评判的回答
返回:
bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败
"""
prompt = self.create_correctness_prompt(standard_answer, answer)
try:
response = self.llm.invoke(user_prompt=prompt, need_retry=True)
return "正确" in response.content
except Exception as e:
return None
def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None:
"""
综合判断新旧回答的正确性
参数:
standard_answer (str): 标准答案(来自Wiki
old_answer (str): 旧流程的回答
new_answer (str): 新流程的回答
返回:
str | None: 包含新旧回答判断结果的字符串,None表示判断失败
"""
old_result = self.judge_answer(standard_answer, old_answer)
new_result = self.judge_answer(standard_answer, new_answer)
if old_result is None or new_result is None:
return None
if new_result and old_result:
return "新旧答案均正确"
elif new_result and not old_result:
return "新答案正确"
elif not new_result and old_result:
return "旧答案正确"
else:
return "新旧答案均错误"
def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None:
"""
判断新旧回答是否存在较大差异
参数:
old_answer (str): 旧流程的回答
new_answer (str): 新流程的回答
返回:
str | None: 差异判断结果,None表示判断失败
"""
prompt = f"""请判断以下两个回答是否存在较大差异:
旧回答: {old_answer}
新回答: {new_answer}
主要是主要步骤、主要信息、或者主要主体的差异
请仅回答"存在较大差异""差异较小""""
try:
response = self.llm.invoke(user_prompt=prompt, need_retry=True)
return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同"
except Exception as e:
return None
def calculate_score(self, query:str, content:str) -> int:
"""
使用LLM判断query与content之间的相关性分数
参数:
query (str): 用户问题
content (str): 检索内容
返回:
int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败
"""
try:
prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。
【评分标准】
10分:完全契合,主题/意图完全一致且涵盖所有关键信息
8-9分:高度相关,核心要素匹配但存在少量信息缺失
6-7分:部分相关,涉及相同主题但存在重要信息缺失
4-5分:弱相关,仅次要信息点匹配
1-3分:完全不相关或信息冲突
【评估维度】
1. 主题一致性:核心主题/意图的匹配程度
2. 内容覆盖度:是否涵盖query的关键要素
3. 信息准确性:是否存在矛盾/错误信息
4. 细节丰富度:是否提供query要求的详细信息
【输出格式】
{{
"score": 评分,
"reason": "简明扼要的评分理由(中文)"
}}
【示例】
query: "新冠疫苗的常见副作用"
内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)"
输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}}
现在评估:
query: "{query}"
content: "{content}"
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=True)
# 解析JSON响应
try:
parsed_output = self.content_source_parser.parse(response.content)
return parsed_output.score
except Exception as e:
return -1
except Exception as e:
return -1
def get_retrieve_info(self, query:str, outputs:dict) -> tuple:
"""
获取检索信息并计算分数
参数:
query (str): 用户问题
outputs (dict): 检索输出结果
返回:
tuple: (检索内容列表, 最高分, 最低分, 平均分)
"""
max_score = 0
min_score = 10
total_score = 0
valid_scores = 0
retrieve_content = []
for result in outputs["result"]:
content = result["content"].strip()
score = self.calculate_score(query=query, content=content)
if score != -1:
max_score = max(max_score, score)
min_score = min(min_score, score)
total_score += score
valid_scores += 1
content_title = content.split("\n")[0]
if content_title:
retrieve_content.append(content_title + f"--得分({score}分)")
avg_score = total_score / valid_scores if valid_scores > 0 else 0
return retrieve_content, max_score, min_score, avg_score
def process_single_question(self, row):
"""
处理单个问题的评判
参数:
row: DataFrame中的一行数据
返回:
dict: 包含处理结果的字典
"""
query = row["问题"]
old_answer = row["旧流程答案"]
new_answer = row["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
standard_answer = ""
answer_title = ""
try:
if wiki_url and not pd.isna(wiki_url):
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
# 判断答案正确性
if standard_answer:
judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer)
else:
judge_result = self.judge_answer_diff(old_answer, new_answer)
if judge_result is None:
judge_result = ""
# 获取检索内容评分
retrieve_content = []
max_score = 0
min_score = 0
avg_score = 0
rewrite_query = ""
try:
message_info = DifyTool.get_message_debug_info(appid=self.dify_appid, query=query)
for workflow_node in message_info["workflow_node_executions_info"]:
if workflow_node["title"] == "知识检索结果后处理":
outputs = json.loads(workflow_node["outputs"])
retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs)
elif workflow_node["title"] == "问题优化结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
except Exception as e:
print(f"处理问题 '{query}' 获取检索内容时发生错误: {str(e)}")
# 返回结果
return {
"问题": query,
"问题改写": rewrite_query,
"旧流程答案": old_answer,
"新流程答案": new_answer,
"回答判断": judge_result,
"答案词条": answer_title if answer_title else "",
"检索得到词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库",
}
def process(self):
"""
多线程处理所有问题并进行综合评判
读取Excel文件中的问题和答案,使用多线程进行评判,并将结果保存到输出Excel文件
"""
# 创建结果列表
results = []
# 创建进度条
with tqdm(total=len(self.answer_excel), desc="处理问题中") as pbar:
# 使用线程池执行任务
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_row = {executor.submit(self.process_single_question, row): idx
for idx, row in self.answer_excel.iterrows()}
# 处理完成的任务
for future in concurrent.futures.as_completed(future_to_row):
idx = future_to_row[future]
try:
result = future.result()
with self.results_lock:
results.append(result)
except Exception as e:
print(f"处理第 {idx} 行时发生错误: {str(e)}")
finally:
pbar.update(1)
# 将结果转换为DataFrame并保存
results_df = pd.DataFrame(results)
results_df.to_excel(self.output_path, index=False)
print(f"处理完成,共处理 {len(results)} 条记录,结果已保存至 {self.output_path}")
# 测试函数
if __name__ == "__main__":
# 创建综合评判工具实例
judge = CombinedJudge(max_workers=30)
# 执行处理
judge.process()
+3 -3
View File
@@ -160,7 +160,7 @@ class DifyTool:
应用信息、消息详情以及工作流节点执行情况。 应用信息、消息详情以及工作流节点执行情况。
""" """
@staticmethod @staticmethod
def get_message_debug_info_id(message_id:str)->dict | None: def get_message_debug_info_by_id(message_id:str)->dict | None:
""" """
根据消息 ID 从 'messages' 表中获取消息信息。 根据消息 ID 从 'messages' 表中获取消息信息。
""" """
@@ -178,7 +178,7 @@ class DifyTool:
@staticmethod @staticmethod
def get_message_debug_info(appid:str, query:str)->dict: def get_message_debug_info_by_query(appid:str, query:str)->dict:
""" """
获取指定应用和查询相关的调试信息。 获取指定应用和查询相关的调试信息。
@@ -212,4 +212,4 @@ class DifyTool:
if __name__ == "__main__": if __name__ == "__main__":
print(DifyTool.get_message_debug_info("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?")) print(DifyTool.get_message_debug_info_by_query("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?"))
+422 -24
View File
@@ -9,12 +9,27 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm from tqdm import tqdm
from rag2_0.dify.dify_tool import DifyTool from rag2_0.dify.dify_tool import DifyTool
import json import json
from urllib.parse import unquote
from rag2_0.tool.WikijsTool import WikijsTool
from rag2_0.tool.html_to_md import convert_html_to_md
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from threading import Lock
load_dotenv()
class ContentSource(BaseModel):
score:int = Field(description="相关性分数")
reason:str = Field(description="评分理由")
class DifyComparisonTester: class DifyComparisonTester:
""" """
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果 Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判
""" """
def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str): def __init__(self, excel_path:str, baseurl:str, old_workflow_api_key:str, new_workflow_api_key:str,
wiki_excel_path:str=None, output_path:str=None, max_workers:int=10):
""" """
初始化对比测试器 初始化对比测试器
@@ -23,14 +38,33 @@ class DifyComparisonTester:
baseurl: Dify API的基础URL baseurl: Dify API的基础URL
old_workflow_api_key: 旧流程的API密钥 old_workflow_api_key: 旧流程的API密钥
new_workflow_api_key: 新流程的API密钥 new_workflow_api_key: 新流程的API密钥
wiki_excel_path: Wiki Excel文件路径,用于获取标准答案
output_path: 输出Excel文件路径
max_workers: 最大工作线程数
""" """
self.excel_path = excel_path self.excel_path = excel_path
self.baseurl = baseurl
self.old_workflow_api_key = old_workflow_api_key
self.new_workflow_api_key = new_workflow_api_key
self.old_chat = ChatClient(api_key=old_workflow_api_key, base_url=baseurl) self.old_chat = ChatClient(api_key=old_workflow_api_key, base_url=baseurl)
self.new_chat = ChatClient(api_key=new_workflow_api_key, base_url=baseurl) self.new_chat = ChatClient(api_key=new_workflow_api_key, base_url=baseurl)
# 评判相关参数
self.output_path = output_path or os.path.join(os.path.dirname(self.excel_path), "dify问答_综合评判结果.xlsx")
self.max_workers = max_workers
self.content_source_parser = PydanticOutputParser(pydantic_object=ContentSource)
self.results_lock = Lock()
# 读取Wiki Excel文件
if wiki_excel_path and os.path.exists(wiki_excel_path):
self.wiki_excel = pd.read_excel(wiki_excel_path)
else:
self.wiki_excel = None
def get_llm(self):
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model = os.getenv("LLM_MODEL_NAME")
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model)
def process_question(self, q:str): def process_question(self, q:str):
""" """
处理单个问题,并行获取新旧流程的回答 处理单个问题,并行获取新旧流程的回答
@@ -62,40 +96,393 @@ class DifyComparisonTester:
new_result = future_new.result() new_result = future_new.result()
old_message_id = old_result["message_id"] old_message_id = old_result["message_id"]
new_message_id = new_result["message_id"] new_message_id = new_result["message_id"]
old_message_info = DifyTool.get_message_debug_info_id(message_id=old_message_id)
new_message_info = DifyTool.get_message_debug_info_id(message_id=new_message_id)
for workflow_node in new_message_info["workflow_node_executions_info"]:
if workflow_node["title"] == "问题优化结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
old_answer = old_result["answer"] old_answer = old_result["answer"]
new_answer = new_result["answer"] new_answer = new_result["answer"]
except Exception as e: except Exception as e:
return None return None, None, None
return {"问题": q, "问题改写": rewrite_query, "旧流程答案": old_answer, "新流程答案": new_answer} return {"问题": q, "旧流程答案": old_answer, "新流程答案": new_answer}, old_message_id, new_message_id
def run_comparison(self): def find_wiki_link(self, query) -> str | None:
"""
根据查询找出对应的词条链接
Args:
query (str): 查询内容
Returns:
str: 对应的词条链接,如果没有找到则返回None
"""
# 确保query不为空
if not query or pd.isna(query):
return None
if self.wiki_excel is None:
return None
# 在"新提问"列中查找匹配的行
matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query]
# 如果找到了匹配的行,返回对应的词条链接
if not matched_rows.empty:
return matched_rows.iloc[0]['对应词条链接']
# 如果没有完全匹配,尝试部分匹配
# 去除软件名称部分(如果有)
query_parts = query.split(',', 1)
if len(query_parts) > 1:
clean_query = query_parts[1].strip()
# 在"提问"列中查找包含清理后查询的行
for idx, row in self.wiki_excel.iterrows():
if pd.notna(row['提问']) and clean_query in row['提问']:
return row['对应词条链接']
return None
def get_wiki_content(self, link) -> str:
"""
获取词条链接的内容
Args:
link (str): 词条链接
Returns:
str: 链接内容,如果获取失败则返回错误信息
"""
try:
if not link or pd.isna(link):
return "链接为空或无效"
# 移除域名部分,只保留路径
path = link.split('/', 3)[-1]
decoded_path = unquote(path)
path_parts = decoded_path.split('/')
doc_path = "/".join(path_parts[1:])
wiki_doc = WikijsTool.get_all_doc_by_path(path=doc_path, path_is_dir=False)
html_content = WikijsTool.query_doc_info(wiki_doc[0]["id"]).get('content')
if not html_content:
return "获取内容失败"
options = {"heading_style": '', "keep_inline_images_in": ["figure", "img"], "escape_asterisks": True}
new_content = (html_content.replace("h6>", "h7>")
.replace("h5>", "h6>")
.replace("h4>", "h5>")
.replace("h3>", "h4>")
.replace("h2>", "h3>")
.replace("h1>", "h2>"))
# 将HTML内容转换为Markdown
markdown_content = convert_html_to_md(new_content, "", **options)
markdown_content = f"# {path_parts[-1]}\n\n{markdown_content}"
return markdown_content
except Exception as e:
raise RuntimeError(f"获取词条内容失败: {str(e)}") from e
def get_wiki_title(self, link) -> str | None:
"""
获取词条标题
Args:
link (str): 词条链接
Returns:
str: 词条标题,如果获取失败则返回None
"""
try:
if not link or pd.isna(link):
return None
# 移除域名部分,只保留路径
path = link.split('/', 3)[-1]
decoded_path = unquote(path)
path_parts = decoded_path.split('/')
return path_parts[-1]
except Exception as e:
raise RuntimeError(f"获取词条内容失败: {str(e)}") from e
def create_correctness_prompt(self, standard_answer: str, answer_to_check: str) -> str:
"""
创建用于评判答案正确性的prompt
Args:
standard_answer (str): 标准答案
answer_to_check (str): 需要检查的答案
Returns:
str: 格式化的prompt
"""
return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。
标准答案:
{standard_answer}
待评估的回答:
{answer_to_check}
请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确""错误",不需要其他解释。
如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"
如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"
请严格按以下格式输出:【正确】或【错误】:"""
def judge_answer(self, standard_answer: str, answer: str) -> bool | None:
"""
调用LLM判断回答是否正确
Args:
standard_answer (str): 标准答案(来自Wiki
answer (str): 需评判的回答
Returns:
bool | None: 判断结果,True表示正确,False表示错误,None表示判断失败
"""
prompt = self.create_correctness_prompt(standard_answer, answer)
llm = self.get_llm()
try:
response = llm.invoke(user_prompt=prompt, need_retry=True)
return "正确" in response.content
except Exception as e:
return None
def judge_by_standard_answer(self, standard_answer: str, old_answer: str, new_answer: str) -> str | None:
"""
综合判断新旧回答的正确性
Args:
standard_answer (str): 标准答案(来自Wiki
old_answer (str): 旧流程的回答
new_answer (str): 新流程的回答
Returns:
str | None: 包含新旧回答判断结果的字符串,None表示判断失败
"""
old_result = self.judge_answer(standard_answer, old_answer)
new_result = self.judge_answer(standard_answer, new_answer)
if old_result is None or new_result is None:
return None
if new_result and old_result:
return "新旧答案均正确"
elif new_result and not old_result:
return "新答案正确"
elif not new_result and old_result:
return "旧答案正确"
else:
return "新旧答案均错误"
def judge_answer_diff(self, old_answer: str, new_answer: str) -> str | None:
"""
判断新旧回答是否存在较大差异
Args:
old_answer (str): 旧流程的回答
new_answer (str): 新流程的回答
Returns:
str | None: 差异判断结果,None表示判断失败
"""
prompt = f"""请判断以下两个回答是否存在较大差异:
旧回答: {old_answer}
新回答: {new_answer}
主要是主要步骤、主要信息、或者主要主体的差异
请仅回答"存在较大差异""差异较小""""
llm = self.get_llm()
try:
response = llm.invoke(user_prompt=prompt, need_retry=True)
return "缺乏标准答案无法判断准确性,但答案差异较大" if "存在较大差异" in response.content else "缺乏标准答案无法判断准确性,但答案基本相同"
except Exception as e:
return None
def calculate_score(self, query:str, content:str) -> int:
"""
使用LLM判断query与content之间的相关性分数
Args:
query (str): 用户问题
content (str): 检索内容
Returns:
int: 相关性分数,1-10分,10代表完全相关,1代表完全不相关;-1表示评分失败
"""
try:
prompt = f"""你是一个专业的信息相关性评估助手。请根据以下标准对用户query和检索内容的相关性进行1-10评分(10=完全相关,1=完全不相关),并按指定格式输出JSON结果。
【评分标准】
10分:完全契合,主题/意图完全一致且涵盖所有关键信息
8-9分:高度相关,核心要素匹配但存在少量信息缺失
6-7分:部分相关,涉及相同主题但存在重要信息缺失
4-5分:弱相关,仅次要信息点匹配
1-3分:完全不相关或信息冲突
【评估维度】
1. 主题一致性:核心主题/意图的匹配程度
2. 内容覆盖度:是否涵盖query的关键要素
3. 信息准确性:是否存在矛盾/错误信息
4. 细节丰富度:是否提供query要求的详细信息
【输出格式】
{{
"score": 评分,
"reason": "简明扼要的评分理由(中文)"
}}
【示例】
query: "新冠疫苗的常见副作用"
内容: "辉瑞疫苗常见反应包括注射部位疼痛(84.1%)、疲劳(62.9%)"
输出: {{"score":8,"reason":"主题完全匹配,涵盖主要副作用但未提及发热等常见反应"}}
现在评估:
query: "{query}"
content: "{content}"
"""
llm = self.get_llm()
response = llm.invoke(user_prompt=prompt, need_retry=True)
# 解析JSON响应
try:
parsed_output = self.content_source_parser.parse(response.content)
return parsed_output.score
except Exception as e:
return -1
except Exception as e:
return -1
def get_retrieve_info(self, query:str, outputs:dict) -> tuple:
"""
获取检索信息并计算分数
Args:
query (str): 用户问题
outputs (dict): 检索输出结果
Returns:
tuple: (检索内容列表, 最高分, 最低分, 平均分)
"""
max_score = 0
min_score = 10
total_score = 0
valid_scores = 0
retrieve_content = []
for result in outputs["result"]:
content = result["content"].strip()
score = self.calculate_score(query=query, content=content)
if score != -1:
max_score = max(max_score, score)
min_score = min(min_score, score)
total_score += score
valid_scores += 1
content_title = content.split("\n")[0]
if content_title:
retrieve_content.append(content_title + f"--得分({score}分)")
avg_score = total_score / valid_scores if valid_scores > 0 else 0
return retrieve_content, max_score, min_score, avg_score
def process_question_with_judge(self, q:str):
"""
处理单个问题,获取新旧流程的回答并进行评判
Args:
q: 问题内容
Returns:
dict: 包含问题、回答和评判结果的字典
"""
# 获取基本的问题和回答
basic_result, old_message_id, new_message_id = self.process_question(q)
if basic_result is None:
return None
query = basic_result["问题"]
old_answer = basic_result["旧流程答案"]
new_answer = basic_result["新流程答案"]
# 获取词条链接和标准答案
wiki_url = self.find_wiki_link(query)
standard_answer = ""
answer_title = ""
try:
if wiki_url and not pd.isna(wiki_url):
standard_answer = self.get_wiki_content(wiki_url)
answer_title = self.get_wiki_title(wiki_url)
except Exception as e:
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
# 判断答案正确性
if standard_answer:
judge_result = self.judge_by_standard_answer(standard_answer, old_answer, new_answer)
else:
judge_result = self.judge_answer_diff(old_answer, new_answer)
if judge_result is None:
judge_result = ""
# 获取检索内容评分
retrieve_content = []
max_score = 0
min_score = 0
avg_score = 0
rewrite_query = ""
try:
new_message_info = DifyTool.get_message_debug_info_by_id(message_id=new_message_id)
for workflow_node in new_message_info["workflow_node_executions_info"]:
if workflow_node["title"] == "知识检索结果后处理":
outputs = json.loads(workflow_node["outputs"])
retrieve_content, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs)
elif workflow_node["title"] == "问题优化结果解析":
outputs = json.loads(workflow_node["outputs"])
rewrite_query = outputs["optimize_query"]
except Exception as e:
print(f"处理问题 '{query}' 获取检索内容时发生错误: {str(e)}")
# 返回结果
return {
"问题": query,
"问题改写": rewrite_query,
"旧流程答案": old_answer,
"新流程答案": new_answer,
"回答判断": judge_result,
"答案词条": answer_title if answer_title else "",
"检索得到词条": "\n".join(retrieve_content) if retrieve_content else "未检索知识库",
"检索最高分": max_score,
"检索最低分": min_score,
"检索平均分": avg_score
}
def run_comparison(self, with_judge=False):
""" """
运行对比测试,处理所有问题并生成结果Excel 运行对比测试,处理所有问题并生成结果Excel
Args:
with_judge: 是否进行答案评判
Returns: Returns:
str: 输出Excel文件的路径 str: 输出Excel文件的路径
""" """
# 读取Excel文件中的问题 # 读取Excel文件中的问题
df = pd.read_excel(self.excel_path) df = pd.read_excel(self.excel_path)
questions = df.iloc[:,0].tolist() questions = df['补全后的提问'].tolist()
results = [] results = []
# 选择处理函数
process_func = self.process_question_with_judge if with_judge else self.process_question
# 按顺序处理问题 # 按顺序处理问题
with tqdm(total=len(questions), desc="处理问题进度") as pbar: with tqdm(total=len(questions), desc="处理问题进度") as pbar:
for q in questions: for q in questions:
result = self.process_question(q) result = process_func(q)
if result is not None: if result is not None:
results.append(result) results.append(result)
pbar.update(1) pbar.update(1)
# 生成输出Excel文件 # 生成输出Excel文件
out_path = os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx") out_path = self.output_path if with_judge else os.path.join(os.path.dirname(self.excel_path), "dify问答_对比结果.xlsx")
df_results = pd.DataFrame(results) df_results = pd.DataFrame(results)
# 使用ExcelWriter设置格式 # 使用ExcelWriter设置格式
@@ -107,16 +494,16 @@ class DifyComparisonTester:
worksheet = writer.sheets['Sheet1'] worksheet = writer.sheets['Sheet1']
# 设置列宽 # 设置列宽
worksheet.set_column('A:A', 50) # 问题列宽 50个Excel单位 for col_idx, col_name in enumerate(df_results.columns):
worksheet.set_column('B:B', 70) # 旧流程答案列宽 70个Excel单位 max_len = max(df_results[col_name].astype(str).map(len).max(), len(col_name))
worksheet.set_column('C:C', 70) # 新流程答案列宽 70个Excel单位 worksheet.set_column(col_idx, col_idx, min(max_len + 2, 70))
return out_path return out_path
if __name__ == "__main__": if __name__ == "__main__":
# 定义Excel路径 # 定义Excel路径
excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(dislike)_1000条_软件明确.xlsx") excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
if not os.path.exists(excel_path): if not os.path.exists(excel_path):
print(f"错误:Excel文件不存在: {excel_path}") print(f"错误:Excel文件不存在: {excel_path}")
@@ -127,10 +514,21 @@ if __name__ == "__main__":
old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3" old_workflow_api_key = "app-wUdkWJx5zeOvmvBUZizMoSw3"
new_workflow_api_key = "app-Lf1pQ1NVwdMfCRVNTBCOTPHT" new_workflow_api_key = "app-Lf1pQ1NVwdMfCRVNTBCOTPHT"
# Wiki Excel路径和Dify应用ID(用于评判)
wiki_excel_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
# 创建测试器并运行 # 创建测试器并运行
tester = DifyComparisonTester(excel_path, baseurl, old_workflow_api_key, new_workflow_api_key) tester = DifyComparisonTester(
output_file = tester.run_comparison() excel_path=excel_path,
print(f"对比结果已保存至: {output_file}") baseurl=baseurl,
old_workflow_api_key=old_workflow_api_key,
new_workflow_api_key=new_workflow_api_key,
wiki_excel_path=wiki_excel_path,
)
# 运行对比测试(带评判)
output_file = tester.run_comparison(with_judge=True)
print(f"对比评判结果已保存至: {output_file}")
# 单个问题测试示例 # 单个问题测试示例
# c = DifyChat(baseurl="http://172.20.0.145/v1", api_key="app-LjJaeLoAfqa6aoGzqU9UvxSf") # c = DifyChat(baseurl="http://172.20.0.145/v1", api_key="app-LjJaeLoAfqa6aoGzqU9UvxSf")
+39 -9
View File
@@ -3,12 +3,9 @@ import random
import time import time
from typing import List, Optional, Dict from typing import List, Optional, Dict
from threading import Lock from threading import Lock
import requests
API_KEY_LIST=[ API_KEY_LIST=[
"sk-xxaiabmfhzwwpijuledllkmkzhzwsqeicjxmjwnvriqpwmpk",
"sk-lldcprpqjhgdimiwewgbthngfbrazhkiuioubmaatrcpjjum",
"sk-bppugibbtvujomvoysnbcdzpcwndxtwrkfvmgbkbzcmobdon",
"sk-hnqitgdlfrrnpimcfxigqibstqquintnzpiidsshpajjyxqd",
"sk-hrojkkkrrkmsajtnizokbcgexsfggdiqavbtvbayuwqbnmom", "sk-hrojkkkrrkmsajtnizokbcgexsfggdiqavbtvbayuwqbnmom",
"sk-kkdklmnyompoiotzkfqahpayzlkgogfudjkyaebehtsowvid", "sk-kkdklmnyompoiotzkfqahpayzlkgogfudjkyaebehtsowvid",
"sk-sfxzvllifafbyfduupcdtcrjwhdyiyojnksyopnfslurnhsp", "sk-sfxzvllifafbyfduupcdtcrjwhdyiyojnksyopnfslurnhsp",
@@ -39,6 +36,26 @@ API_KEY_LIST=[
"sk-ocglenyvxkkvzupzumoypnyndjpjqhivyqpedusunboglspz", "sk-ocglenyvxkkvzupzumoypnyndjpjqhivyqpedusunboglspz",
"sk-dtbawdwajkhdctrukundbkqwswzfzihqbebfuvqnfnounbuc", "sk-dtbawdwajkhdctrukundbkqwswzfzihqbebfuvqnfnounbuc",
"sk-zqiyiqtbwqgyeenkvppymfbkspriolwbnxnjakugzxyvcuql", "sk-zqiyiqtbwqgyeenkvppymfbkspriolwbnxnjakugzxyvcuql",
"sk-wtnjpejveiobtvzsmnuaefqkocsafbfyrtqkkyqardndtxcs",
"sk-gqdvtrwvzxewnagwsfakrvajtzwgcknatpflkesyqhzjrlal",
"sk-plivglrkxahodgtgjlaqdjusdoerxspjbcbizaybicarfyuk",
"sk-gxwaotlyeunxdagmybluhxkberlvohhzteryqgbhbcpztpds",
"sk-vigugvoqrdqcgkxaiympdmbqtdhpjidylphdcodovfwjpjlf",
"sk-dgmbountewzxgwcwczyslehpcyejtkhpppibswzcvrjbywey",
"sk-ksqdvuisvvraeogskmgrwzpolzrfyelqhrajscrylncemyym",
"sk-vegffsoueyrbtlfbdzfppgtgwouuetoragimogulcncmutnx",
"sk-moprznmsibswkwnnjwmgssumqhoxdmsxelekkmptseyeussz",
"sk-kskakjerttqezqqqmdmcbnqssjztrogwqncadgekhmwzjukr",
"sk-ozwjvlbatnmfjgrxesjkzuzdgpvehmmgswcqctggjxmjgxck",
"sk-wjmmlmobcayarcvhdeiybhbwdoaacnlltuxyixcwplhedzht",
"sk-twuvwqxstatdddkobxthzhoddrritsikvnvwuvtqjxwaxhsf",
"sk-bludnuajavlgdfmelatzsdqhvaxthgagttelsbpviqwiehua",
"sk-nxxdpmesfzcfzdlnhpsoslajtwbsnzixfjdkuzfjywfktapx",
"sk-arayylrvatezqgmdbxvxqxydqnmbydbkpkskzxzszmrkkcrq",
"sk-vxvccjyewpgnnnxpsqkijsawhhpjctcdlfljwfwtguwnmetf",
"sk-zjwbwyocnuqxfshlpgfzdwlgjjrpewzgvoqwzyhufisidnos",
"sk-kjxpzjbteiurpzhwjbbjqpjjfoewsahpjtmyqwectdubxhgf",
"sk-sqdcnhapyzudneipdsuqlfawusrndxqkuwoaoumtonwdnppo",
] ]
class APIKeyManager: class APIKeyManager:
@@ -142,6 +159,22 @@ class APIKeyManager:
instance = cls.get_instance() instance = cls.get_instance()
return len(instance.api_keys) return len(instance.api_keys)
@classmethod
def get_key_usage_stats(cls, key: str) -> Dict:
"""
静态方法:获取API密钥使用统计信息
Returns:
API密钥使用统计信息
"""
url = "https://api.siliconflow.cn/v1/user/info"
headers = {"Authorization": f"Bearer {key}"}
response = requests.request("GET", url, headers=headers)
return response.json()
def __init__(self, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"): def __init__(self, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"):
""" """
初始化API密钥管理器 初始化API密钥管理器
@@ -251,10 +284,6 @@ class APIKeyManager:
# 使用示例 # 使用示例
if __name__ == "__main__": if __name__ == "__main__":
# 获取有效的API密钥列表
valid_keys = APIKeyManager.get_valid_api_keys()
print(f"有效的API密钥列表:\n" + "\n".join(valid_keys))
# 查看总密钥数 # 查看总密钥数
print(f"总共有 {APIKeyManager.count()} 个API密钥") print(f"总共有 {APIKeyManager.count()} 个API密钥")
@@ -263,4 +292,5 @@ if __name__ == "__main__":
instance = APIKeyManager.get_instance() instance = APIKeyManager.get_instance()
stats = instance.get_usage_stats() stats = instance.get_usage_stats()
for key, data in stats.items(): for key, data in stats.items():
print(f"密钥 {key[:5]}... 使用次数: {data['count']}") usage_stats = APIKeyManager.get_key_usage_stats(key)
print(f"api key:{key}---赠送余额:{usage_stats['data']['balance']}")