Files
QueryRewrite/rag2_0/demo/dialogue_to_workorder.py
T

642 lines
27 KiB
Python
Executable File

import os
import json
import pandas as pd
import argparse
from datetime import datetime
import time
import concurrent.futures
from functools import wraps
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
import httpx
import traceback
import re
import logging
load_dotenv()
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('dialogue_to_workorder.log', encoding='utf-8')
]
)
logger = logging.getLogger("dialogue_to_workorder")
# ================ 模型定义 ================
class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="用户的核心问题")
solution: str = Field(description="坐席提供的解决方案")
class UserQuestionAndSolutionList(BaseModel):
user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表")
class QuestionType(BaseModel):
question_type: str = Field(description="问题类型")
class IsComplaint(BaseModel):
is_dissatisfaction: bool = Field(description="是否抱怨")
dissatisfaction_level: str = Field(description="抱怨级别")
dissatisfaction_reasoning: str = Field(description="抱怨原因")
is_complaint: bool = Field(description="是否明确/暗示将进行投诉")
class ProductNameAndModuleName(BaseModel):
product_name: str = Field(description="产品名称")
module_name: str = Field(description="模块名称")
class ProductLine(BaseModel):
product_line: str = Field(description="产品线")
# ================ 工具函数 ================
def retry_llm_call(max_retries=3, delay=2):
"""
重试装饰器,用于LLM调用失败时进行重试
参数:
max_retries: 最大重试次数
delay: 重试间隔时间(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
last_exception = None
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
retries += 1
logger.warning(f"LLM调用失败,正在进行第{retries}次重试: {str(e)}")
if retries < max_retries:
time.sleep(delay*retries)
stack_trace = traceback.format_exc()
logger.error(f"LLM调用失败,堆栈跟踪信息:\n{stack_trace}")
# 所有重试都失败后,抛出最后一次的异常
logger.error(f"LLM调用失败,已达到最大重试次数{max_retries}")
raise last_exception
return wrapper
return decorator
# ================ 对话转工单处理类 ================
class DialogueToWorkorder:
def __init__(self, llm_params=None):
"""
初始化对话转工单处理类
参数:
llm_params: LLM模型参数,如果为None则使用环境变量中的配置
"""
# 初始化输出解析器
self.user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution)
self.user_question_and_solution_list_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolutionList)
self.question_type_parser = PydanticOutputParser(pydantic_object=QuestionType)
self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName)
self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine)
# 初始化LLM模型
self.llm_params = llm_params or {
"temperature": 0.2,
"top_p":0.95,
"model": "deepseek-ai/DeepSeek-R1",
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": os.getenv("OPENAI_API_BASE"),
"timeout": httpx.Timeout(600.0)
}
self.llm = self._get_llm_instance()
def _get_llm_instance(self):
"""获取LLM实例"""
return OpenAiLLM(**self.llm_params)
def parse_product_detail_excel(self, file_path):
"""解析产品详情Excel文件"""
df = pd.read_excel(file_path)
product_dict = {}
for _, row in df.iterrows():
product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else ''
product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else ''
module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else ''
if product_line not in product_dict:
product_dict[product_line] = {}
if product_name not in product_dict[product_line]:
product_dict[product_line][product_name] = []
product_dict[product_line][product_name].append(module_name)
return product_dict
def get_workorder_dict(self, rows):
"""从会话行中提取工单基本信息"""
workorder_dict = {}
# 创建时间
for row in rows:
create_time = row['创建时间']
if pd.notna(create_time) and str(create_time).strip() != '':
workorder_dict["创建时间"] = create_time
break
# 处理坐席
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname
break
# 访客昵称
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["访客昵称"] = sender_nickname
break
# 会话id
for row in rows:
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
workorder_dict["会话id"] = conversation_id
break
# 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124
if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]):
create_time_str = str(workorder_dict["创建时间"]).strip()
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
return workorder_dict
def get_dialogue_str(self, conversation_rows):
"""从会话行中提取对话内容"""
dialogue = []
for row in conversation_rows:
sender = row.get('消息发送者', '')
content = str(row.get('消息内容', '')).strip()
# 处理非文本内容
if content == '' or pd.isna(row["消息内容"]):
if str(row.get('图片', '')).strip() != '':
content = '[图片]'
elif str(row.get('附件', '')).strip() != '':
content = '[附件]'
elif str(row.get('视频', '')).strip() != '':
content = '[视频]'
elif str(row.get('语音', '')).strip() != '':
content = '[语音]'
# 添加对话内容
if sender == '访客':
dialogue.append(f"访客:{content}")
elif sender == '坐席':
dialogue.append(f"坐席:{content}")
return '\n'.join(dialogue)
def group_conversations_by_id(self, df):
"""将数据按会话ID分组"""
conversation_dict = {}
for index, row in df.iterrows():
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
if conversation_id in conversation_dict:
conversation_dict[conversation_id].append(row.to_dict())
else:
conversation_dict[conversation_id] = [row.to_dict()]
return conversation_dict
@retry_llm_call(max_retries=3, delay=2)
def get_user_question_and_solution(self, conversation_rows):
"""分析用户问题和解决方案"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = """请从以下电力造价相关的客服对话记录中,识别并总结用户提出的核心问题及对应坐席提供的解决方案。(注意指代消除)
1、理解对话记录,总结用户在此次对话中提出的核心问题,以用户的角度总结核心问题(可根据上下文完善问题内容)。
2、根据用户提出的问题,分析坐席提供的解决方法(比如:1、引导用户xxxx。2、告诉用户xxxxx)。以坐席的角度直接总结完整的解决方案或应对措施(不要出现"坐席"、"我"等字样)
3、提炼访客独立的核心问题(以访客的角度总结核心问题),核心问题衍生、细化后的请求合并到对应的核心问题中。不要单独列出衍生、细化后的请求。
4、使用json格式输出(多个用户问题采用标准json数组格式输出):
{output_format}
输出示例:
```json
{{
"user_question": "技改软件打开报错",
"solution": "1、告知报错原因 2、通过远程辅助解决"
}}
```
=======对话记录如下所示=======
{dialogue_str}
============================
"""
output_format = self.user_question_and_solution_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
try:
if response.content.count('user_question') == 1:
user_question_and_solution = self.user_question_and_solution_parser.parse(response.content)
return [user_question_and_solution]
else:
array_pattern = r'\[\s*(\{[\s\S]*?\}(?:\s*,\s*\{[\s\S]*?\})*)\s*\]'
array_match = re.search(array_pattern, response.content)
if array_match:
# 找到了JSON数组
json_array_str = '[' + array_match.group(1) + ']'
# 尝试解析JSON
json_objects = json.loads(json_array_str)
user_question_and_solution_list = []
for obj in json_objects:
user_question = obj.get('user_question', '')
solution = obj.get('solution', '')
user_question_and_solution_list.append(
UserQuestionAndSolution(user_question=user_question, solution=solution)
)
if user_question_and_solution_list:
return user_question_and_solution_list
raise Exception("解析失败") # 如果之前没有return 触发异常重新提取
except Exception as e:
output_format = self.user_question_and_solution_list_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
user_question_and_solution_temp = self.user_question_and_solution_list_parser.parse(response.content)
return user_question_and_solution_temp.user_question_list
return [user_question_and_solution]
@retry_llm_call(max_retries=3, delay=2)
def get_product_name_and_module_name(self, product_line, conversation_rows, product_detail_dict, user_question_str, solution_str):
"""分析产品名称和模块名称"""
if product_line == '':
return '', ''
json_str = json.dumps(product_detail_dict[product_line])
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
要求:
1. 如果对话记录中存在多个产品名称和模块名称,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品名称和模块名称。
2. 如果对话记录中只存在一个产品名称和模块名称,则直接返回该产品名称和模块名称。
输出格式:
{self.product_name_and_module_name_parser.get_format_instructions()}
产品名称列表及模块名称列表:
{json_str}
对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
product_name_and_module_name = self.product_name_and_module_name_parser.parse(response.content)
return product_name_and_module_name.product_name, product_name_and_module_name.module_name
@retry_llm_call(max_retries=3, delay=2)
def get_product_line(self, conversation_rows, product_detail_dict, user_question_str, solution_str):
"""分析产品线"""
dialogue_str = self.get_dialogue_str(conversation_rows)
product_line_list = list(product_detail_dict.keys())
prompt = f"""
请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
无法判断时,返回空字符串。即product_line=""
要求:
1. 如果对话记录中存在多个产品线,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品线。
2. 如果对话记录中只存在一个产品线,则直接返回该产品线。
输出格式:
{self.product_line_parser.get_format_instructions()}
产品线列表:
{product_line_list}
对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
product_line = self.product_line_parser.parse(response.content)
return product_line.product_line
@retry_llm_call(max_retries=3, delay=2)
def get_problem_type(self, conversation_rows, user_question_str, solution_str):
"""分析问题类型"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
分类体系:
1. 软件需求 - 涉及功能新增/改进建议、系统集成需求等
2. 数据问题 - 数据导入导出、格式转换、计算异常等问题
3. 专业咨询 - 造价计算标准、定额套用、行业规范解读等
4. 功能操作 - 软件功能使用步骤、界面操作指导
5. 故障排查 - 系统崩溃、性能问题、兼容性故障
6. 培训支持 - 功能教学、操作手册获取、培训需求
7. 计价依据 - 定额库更新、材料价格库维护、地方标准差异
9. 其他 - 无法归类的对话内容
处理要求:
1. 分析时注意识别电力行业特有表述(如"杆塔组立"、"电缆敷设"等专业工序)
2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误)
3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题)
4. 对模糊表述要求追问的场景归入培训支持类
5. 对话记录可能存在多个问题,你只需要判断"{user_question_str}"属于哪个类型的问题
输出格式:
{self.question_type_parser.get_format_instructions()}
对话记录如下:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
question_type = self.question_type_parser.parse(response.content)
return question_type.question_type
@retry_llm_call(max_retries=3, delay=2)
def get_is_complaint_and_is_complaint_level(self, conversation_rows):
"""分析是否抱怨及抱怨级别"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话记录分析访客情绪是否对软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果:
1. 抱怨识别:判断访客是否对软件功能或者坐席服务存在明显抱怨或不满
2. 抱怨分级(如存在抱怨):
- 普通抱怨:明确表达出对软件功能或者坐席服务存在不满
- 严重抱怨:明确表达出对软件功能或者坐席服务存在不满,语气较为强烈
3. 投诉倾向:是否明确/暗示将进行投诉
4. 抱怨对象:坐席服务态度/业务能力 或 软件功能问题(注意忽略对非软件或坐席的抱怨)
示例输出:
{{
"is_dissatisfaction": true,
"dissatisfaction_level": "严重抱怨",
"dissatisfaction_reasoning": "博微软件缺陷导致实际损失",
"is_complaint": "true"
}}
输出格式要求:
{self.is_complaint_parser.get_format_instructions()}
当前对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
is_complaint = self.is_complaint_parser.parse(response.content)
return (is_complaint.is_dissatisfaction,
is_complaint.dissatisfaction_level,
is_complaint.dissatisfaction_reasoning,
is_complaint.is_complaint)
def process_conversation(self, conversation_id, conversation_rows, product_detail_dict):
"""处理单个会话的函数,用于多线程并发"""
# if conversation_id!="b157aa91-3acb-11f0-a191-4fb224ef4b40":
# return []
# 获取工单基本信息
base_workorder_dict = self.get_workorder_dict(conversation_rows)
# 分析用户问题和解决方案
user_question_list = self.get_user_question_and_solution(conversation_rows)
# 获取第一个问题和解决方案,用于后续分析
if user_question_list and len(user_question_list) > 0:
first_question = user_question_list[0]
user_question_str = first_question.user_question
solution_str = first_question.solution
else:
user_question_str = ""
solution_str = ""
# 分析是否抱怨、是否投诉、抱怨级别
is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = (
self.get_is_complaint_and_is_complaint_level(conversation_rows))
# 分析问题类型
problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str)
# 分析产品线
product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str)
# 分析产品名称和模块名称
if product_line != '':
product_name, module_name = self.get_product_name_and_module_name(
product_line, conversation_rows, product_detail_dict, user_question_str, solution_str)
else:
product_name = ''
module_name = ''
# 创建工单列表
workorder_list = []
for user_question in user_question_list:
user_question_str = user_question.user_question
solution_str = user_question.solution
# 创建新的工单字典,复制基本信息
workorder_dict = base_workorder_dict.copy()
# 更新工单字典
workorder_dict.update({
"产品线": product_line,
"产品名称": product_name,
"模块名称": module_name,
"客户问题": user_question_str,
"问题类型": problem_type,
"是否抱怨": "是" if is_dissatisfaction else '否',
"抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
"是否投诉": "是" if is_complaint else '否',
"解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str
})
# 将工单添加到列表中
workorder_list.append(workorder_dict)
return workorder_list
def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=10):
"""分析会话数据主流程,使用多线程并发处理"""
# 读取Excel文件
df = pd.read_excel(conversation_excel_path)
# 检查数据框的列
logger.info(f"Excel文件列名: {df.columns.tolist()}")
logger.info(f"数据总行数: {len(df)}")
# 解析产品详情
product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path)
# 按会话ID分组
conversation_dict = self.group_conversations_by_id(df)
# 限制处理的会话数量为前2000个
if len(conversation_dict) > 2000:
logger.info(f"会话总数为 {len(conversation_dict)},限制处理前2000个会话")
# 获取所有会话ID
conversation_ids = list(conversation_dict.keys())
# 只保留前2000个会话
limited_conversation_dict = {
conversation_id: conversation_dict[conversation_id]
for conversation_id in conversation_ids[:2000]
}
conversation_dict = limited_conversation_dict
else:
logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话")
# 使用线程池处理每个会话
workorder_dict_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建任务
future_to_conversation = {
executor.submit(self.process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id
for conversation_id, conversation_rows in conversation_dict.items()
}
# 获取结果
for future in concurrent.futures.as_completed(future_to_conversation):
conversation_id = future_to_conversation[future]
try:
result_workorders = future.result()
# 将每个会话的所有工单添加到总列表中
workorder_dict_list.extend(result_workorders)
logger.info(f"完成处理会话ID: {conversation_id},生成工单数量: {len(result_workorders)}")
except Exception as exc:
logger.error(f"处理会话ID: {conversation_id} 时发生错误: {exc}")
return workorder_dict_list
def save_results_to_excel(self, workorder_dict_list, output_file=None):
"""将结果保存到Excel文件"""
result_df = pd.DataFrame(workorder_dict_list)
# 按照指定的列顺序重新排列DataFrame的列
columns_order = [
'工单编号', '产品线', '产品名称', '模块名称', '问题类型',
'客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别',
'会话id', '访客昵称', '处理坐席', '创建时间'
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in result_df.columns:
result_df[col] = None
# 按指定顺序重排列
result_df = result_df[columns_order]
# 保存到Excel文件
if output_file is None:
# 默认输出文件名
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file = os.path.join('data', 'excel', f'会话内容详情{timestamp}_转工单.xlsx')
# 确保目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 创建ExcelWriter对象,用于设置Excel样式
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 写入数据
result_df.to_excel(writer, index=False, sheet_name='工单数据')
# 获取工作簿和工作表
workbook = writer.book
worksheet = writer.sheets['工单数据']
# 设置行高(20磅 ≈ 26.67像素)
for row in worksheet.iter_rows():
worksheet.row_dimensions[row[0].row].height = 20
# 设置列宽
column_widths = {
'工单编号': 15,
'产品线': 24,
'产品名称': 40,
'模块名称': 40,
'问题类型': 9,
'客户问题': 20,
'解决方案': 30,
'是否抱怨': 9,
'是否投诉': 9,
'抱怨级别': 9,
'会话id': 9,
'访客昵称': 9,
'处理坐席': 9,
'创建时间': 9
}
# 应用列宽设置
for i, column in enumerate(columns_order):
col_letter = chr(65 + i) # A, B, C, ...
if i >= 26: # 超过Z的情况
col_letter = chr(64 + i // 26) + chr(65 + i % 26)
worksheet.column_dimensions[col_letter].width = column_widths[column]
logger.info(f"结果已保存到 {output_file}")
return output_file
# ================ 参数解析 ================
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='对话内容转工单工具')
parser.add_argument('--conversation_file', type=str, required=False,
help='会话内容Excel文件路径')
parser.add_argument('--product_detail_file', type=str, required=False,
help='产品详情Excel文件路径')
parser.add_argument('--max_workers', type=int, default=16,
help='并发处理线程数,默认为16')
return parser.parse_args()
# ================ 主程序入口 ================
def main():
"""主程序入口"""
# 解析命令行参数
args = parse_arguments()
# 设置默认文件路径
conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '2025年1月到6月12号所有对话记录.xlsx')
product_detail_excel_path = args.product_detail_file or os.path.join('data', 'excel', '产品详情_工单.xlsx')
# 创建处理实例
processor = DialogueToWorkorder()
# 分析会话数据
workorder_dict_list = processor.analyze_conversation_data(
conversation_excel_path,
product_detail_excel_path,
max_workers=args.max_workers
)
output_file = conversation_excel_path.replace('.xlsx', '_转工单.xlsx')
# 保存结果
processor.save_results_to_excel(workorder_dict_list, output_file)
if __name__ == "__main__":
main()