删除合并脚本,添加会话转工单脚本逻辑

This commit is contained in:
2025-05-28 10:00:33 +08:00
parent ce27ddd823
commit 6270513688
7 changed files with 955 additions and 785 deletions
+486
View File
@@ -0,0 +1,486 @@
import os
import json
import pandas as pd
from datetime import datetime
import time
import concurrent.futures
from functools import wraps
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from rag2_0.tool.ModelTool import OpenAiLLM
from dotenv import load_dotenv
load_dotenv()
# ================ 模型定义 ================
class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="客户问题")
solution: str = Field(description="坐席提供的解决方案")
class QuestionType(BaseModel):
question_type: str = Field(description="问题类型")
class IsComplaint(BaseModel):
is_dissatisfaction: bool = Field(description="是否抱怨")
dissatisfaction_level: str = Field(description="抱怨级别")
dissatisfaction_reasoning: str = Field(description="抱怨原因")
is_complaint: bool = Field(description="是否明确/暗示将进行投诉")
class ProductNameAndModuleName(BaseModel):
product_name: str = Field(description="产品名称")
module_name: str = Field(description="模块名称")
class ProductLine(BaseModel):
product_line: str = Field(description="产品线")
# 初始化输出解析器
user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution)
question_type_parser = PydanticOutputParser(pydantic_object=QuestionType)
is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName)
product_line_parser = PydanticOutputParser(pydantic_object=ProductLine)
# ================ LLM配置 ================
def retry_llm_call(max_retries=3, delay=2):
"""
重试装饰器,用于LLM调用失败时进行重试
参数:
max_retries: 最大重试次数
delay: 重试间隔时间(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
last_exception = None
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
retries += 1
print(f"LLM调用失败,正在进行第{retries}次重试: {str(e)}")
if retries < max_retries:
time.sleep(delay*retries)
# 所有重试都失败后,抛出最后一次的异常
print(f"LLM调用失败,已达到最大重试次数{max_retries}")
raise last_exception
return wrapper
return decorator
def get_llm_instance():
"""获取LLM实例"""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model_name = os.getenv("LLM_MODEL_NAME")
llm_params = {
"temperature": 0.6,
"model": model_name,
"api_key": api_key,
"base_url": base_url
}
return OpenAiLLM(**llm_params)
# ================ 数据处理函数 ================
def parse_product_detail_excel(file_path):
"""解析产品详情Excel文件"""
df = pd.read_excel(file_path)
product_dict = {}
for _, row in df.iterrows():
product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else ''
product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else ''
module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else ''
if product_line not in product_dict:
product_dict[product_line] = {}
if product_name not in product_dict[product_line]:
product_dict[product_line][product_name] = []
product_dict[product_line][product_name].append(module_name)
return product_dict
def get_workorder_dict(rows):
"""从会话行中提取工单基本信息"""
workorder_dict = {}
# 创建时间
for row in rows:
create_time = row['创建时间']
if pd.notna(create_time) and str(create_time).strip() != '':
workorder_dict["创建时间"] = create_time
break
# 处理坐席
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname
break
# 访客昵称
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["访客昵称"] = sender_nickname
break
# 会话id
for row in rows:
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
workorder_dict["会话id"] = conversation_id
break
# 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124
if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]):
create_time_str = str(workorder_dict["创建时间"]).strip()
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
return workorder_dict
def get_dialogue_str(conversation_rows):
"""从会话行中提取对话内容"""
dialogue = []
for row in conversation_rows:
sender = row.get('消息发送者', '')
content = str(row.get('消息内容', '')).strip()
# 处理非文本内容
if content == '' or pd.isna(row["消息内容"]):
if str(row.get('图片', '')).strip() != '':
content = '[图片]'
elif str(row.get('附件', '')).strip() != '':
content = '[附件]'
elif str(row.get('视频', '')).strip() != '':
content = '[视频]'
elif str(row.get('语音', '')).strip() != '':
content = '[语音]'
# 添加对话内容
if sender == '访客':
dialogue.append(f"访客:{content}")
elif sender == '坐席':
dialogue.append(f"坐席:{content}")
return '\n'.join(dialogue)
def group_conversations_by_id(df):
"""将数据按会话ID分组"""
conversation_dict = {}
for index, row in df.iterrows():
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
if conversation_id in conversation_dict:
conversation_dict[conversation_id].append(row.to_dict())
else:
conversation_dict[conversation_id] = [row.to_dict()]
return conversation_dict
# ================ LLM分析函数 ================
@retry_llm_call(max_retries=3, delay=2)
def get_user_question_and_solution(conversation_rows):
"""分析用户问题和解决方案"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求:
1. 专业识别:
- 重点识别电力工程领域的专业术语(如:定额套用、工程量清单、概预算编制、造价指标分析等)
- 注意区分不同业务场景(输变电工程、配网改造、新能源项目等)
- 识别政策文件引用(如:国网Q/GDW 11337-2014标准)
2. 信息提取:
用户问题提取:
- 核心诉求(成本核算/计价争议/软件操作等)
- 涉及的专业环节(设计概算/施工图预算/竣工结算)
- 具体技术参数(电压等级/线路长度/设备型号)
坐席解决方案提取:
- 提供的计算方法(单位工程法/实物量法)
- 推荐的计价依据(电力建设工程定额2018版)
- 指导的软件操作步骤(博微软件操作)
- 政策法规应用建议
- 文件模板提供情况
3. 结构化输出:
{user_question_and_solution_parser.get_format_instructions()}
访客与坐席的对话记录如下:
{dialogue_str}
"""
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
user_question_and_solution = user_question_and_solution_parser.parse(response.content)
return user_question_and_solution.user_question, user_question_and_solution.solution
@retry_llm_call(max_retries=3, delay=2)
def get_product_name_and_module_name(product_line, conversation_rows, product_detail_dict):
"""分析产品名称和模块名称"""
if product_line == '':
return '', ''
json_str = json.dumps(product_detail_dict[product_line])
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
输出格式:
{product_name_and_module_name_parser.get_format_instructions()}
产品名称列表及模块名称列表:
{json_str}
对话记录:
{dialogue_str}
"""
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
product_name_and_module_name = product_name_and_module_name_parser.parse(response.content)
return product_name_and_module_name.product_name, product_name_and_module_name.module_name
@retry_llm_call(max_retries=3, delay=2)
def get_product_line(conversation_rows, product_detail_dict):
"""分析产品线"""
dialogue_str = get_dialogue_str(conversation_rows)
product_line_list = list(product_detail_dict.keys())
prompt = f"""
请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
无法判断时,返回空字符串。即product_line=""
输出格式:
{product_line_parser.get_format_instructions()}
产品线列表:
{product_line_list}
对话记录:
{dialogue_str}
"""
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
product_line = product_line_parser.parse(response.content)
return product_line.product_line
@retry_llm_call(max_retries=3, delay=2)
def get_problem_type(conversation_rows):
"""分析问题类型"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
分类体系:
1. 软件需求 - 涉及功能新增/改进建议、系统集成需求等
2. 数据问题 - 数据导入导出、格式转换、计算异常等问题
3. 专业咨询 - 造价计算标准、定额套用、行业规范解读等
4. 功能操作 - 软件功能使用步骤、界面操作指导
5. 故障排查 - 系统崩溃、性能问题、兼容性故障
6. 培训支持 - 功能教学、操作手册获取、培训需求
7. 计价依据 - 定额库更新、材料价格库维护、地方标准差异
9. 其他 - 无法归类的对话内容
处理要求:
1. 分析时注意识别电力行业特有表述(如"杆塔组立""电缆敷设"等专业工序)
2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误)
3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题)
4. 对模糊表述要求追问的场景归入培训支持类
输出格式:
{question_type_parser.get_format_instructions()}
对话记录如下:
{dialogue_str}
"""
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
question_type = question_type_parser.parse(response.content)
return question_type.question_type
@retry_llm_call(max_retries=3, delay=2)
def get_is_complaint_and_is_complaint_level(conversation_rows):
"""分析是否抱怨及抱怨级别"""
dialogue_str = get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果:
1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语
2. 抱怨分级(如存在抱怨):
- 一般抱怨:对博微软件功者坐席服务存在轻微不满但情绪稳定
- 中等抱怨:对博微软件或者坐席服务明确表达不满并提出具体问题
- 严重抱怨:对博微软件或者坐席服务使用激烈言辞或威胁性语言
- 抗议行为:明确表示投诉/退费/法律手段
3. 投诉倾向:是否明确/暗示将进行投诉
4. 抱怨对象:坐席服务态度/业务能力 或 博微功能问题(注意忽略对非博微软件或坐席的抱怨)
5. 内容摘录:标注具体抱怨语句
6. 分析理由:结合语义与上下文的判断依据
示例输出:
{{
"is_dissatisfaction": true,
"dissatisfaction_level": "严重抱怨",
"dissatisfaction_reasoning": "博微软件缺陷导致实际损失",
"is_complaint": "true"
}}
输出格式要求:
{is_complaint_parser.get_format_instructions()}
当前对话记录:
{dialogue_str}
附加分析要求:
1. 区分客观问题描述与主观情绪表达
2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议)
3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征)
4. 标注涉及多个抱怨对象的情况
"""
llm = get_llm_instance()
response = llm.invoke(user_prompt=prompt)
is_complaint = is_complaint_parser.parse(response.content)
return (is_complaint.is_dissatisfaction,
is_complaint.dissatisfaction_level,
is_complaint.dissatisfaction_reasoning,
is_complaint.is_complaint)
# ================ 主流程处理 ================
def process_conversation(conversation_id, conversation_rows, product_detail_dict):
"""处理单个会话的函数,用于多线程并发"""
# 获取工单基本信息
workorder_dict = get_workorder_dict(conversation_rows)
# 分析是否抱怨、是否投诉、抱怨级别
is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = (
get_is_complaint_and_is_complaint_level(conversation_rows))
# 分析用户问题和解决方案
user_question, solution = get_user_question_and_solution(conversation_rows)
# 分析问题类型
problem_type = get_problem_type(conversation_rows)
# 分析产品线
product_line = get_product_line(conversation_rows, product_detail_dict)
# 分析产品名称和模块名称
if product_line != '':
product_name, module_name = get_product_name_and_module_name(
product_line, conversation_rows, product_detail_dict)
else:
product_name = ''
module_name = ''
# 更新工单字典
workorder_dict.update({
"产品线": product_line,
"产品名称": product_name,
"模块名称": module_name,
"客户问题": user_question,
"问题类型": problem_type,
"是否抱怨": "" if is_dissatisfaction else '',
"抱怨级别": dissatisfaction_level,
"是否投诉": "" if is_complaint else '',
"解决方案": (solution + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution
})
return workorder_dict
def analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers=4):
"""分析会话数据主流程,使用多线程并发处理"""
# 读取Excel文件
df = pd.read_excel(conversation_excel_path)
# 检查数据框的列
print(f"Excel文件列名: {df.columns.tolist()}")
print(f"数据总行数: {len(df)}")
# 解析产品详情
product_detail_dict = parse_product_detail_excel(product_detail_excel_path)
# 按会话ID分组
conversation_dict = group_conversations_by_id(df)
# 使用线程池处理每个会话
workorder_dict_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建任务
future_to_conversation = {
executor.submit(process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id
for conversation_id, conversation_rows in conversation_dict.items()
}
# 获取结果
for future in concurrent.futures.as_completed(future_to_conversation):
conversation_id = future_to_conversation[future]
try:
workorder_dict = future.result()
workorder_dict_list.append(workorder_dict)
print(f"完成处理会话ID: {conversation_id}")
except Exception as exc:
print(f"处理会话ID: {conversation_id} 时发生错误: {exc}")
return workorder_dict_list
def save_results_to_excel(workorder_dict_list, output_file='workorder_result.xlsx'):
"""将结果保存到Excel文件"""
result_df = pd.DataFrame(workorder_dict_list)
# 按照指定的列顺序重新排列DataFrame的列
columns_order = [
'工单编号', '产品线', '产品名称', '模块名称', '问题类型',
'客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别',
'会话id', '访客昵称', '处理坐席', '创建时间'
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in result_df.columns:
result_df[col] = None
# 按指定顺序重排列
result_df = result_df[columns_order]
# 保存到Excel文件
result_df.to_excel(output_file, index=False)
print(f"结果已保存到 {output_file}")
# ================ 主程序入口 ================
def main():
"""主程序入口"""
# 文件路径
conversation_excel_path = os.path.join('data', 'excel', '会话内容详情20250527173330.xlsx')
product_detail_excel_path = os.path.join('data', 'excel', '产品详情20250527175736.xlsx')
# 分析会话数据,设置并发线程数
max_workers = 8 # 可以根据CPU核心数和任务特性调整
workorder_dict_list = analyze_conversation_data(conversation_excel_path, product_detail_excel_path, max_workers)
# 保存结果
save_results_to_excel(workorder_dict_list)
if __name__ == "__main__":
main()