import os
import json
import pandas as pd
import argparse
from datetime import datetime
import time
import concurrent.futures
from functools import wraps
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
import sys
from dotenv import load_dotenv
import httpx
import traceback
import re
import logging
from tqdm import tqdm
import glob
import shutil
# 将项目根目录添加到Python路径
sys.path.append(os.getcwd())
from rag2_0.tool.ModelTool import OpenAiLLM
load_dotenv()
os.makedirs("data/logs", exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('data/logs/dialogue_to_workorder.log', encoding='utf-8')
]
)
logger = logging.getLogger("dialogue_to_workorder")
human_info={
"1116":["夏剑媛", "新能源"],
"1201":["曹美芳", "配网造价及清单"],
"1202":["彭珊珊", "主网造价及清单"],
"1230":["龚青", "配网造价及清单"],
"1544":["黄婷", "主网造价及清单"],
"1546":["严琼辉", "配网造价及清单"],
"1552":["吴园妹", "主网造价及清单"],
"1555":["魏怡璠", "配网造价及清单"],
"1789":["冷琛", "主网造价及清单"],
"2142":["余国庆", "配网造价及清单"],
"2144":["卢光辉", "技改检修"],
"2145":["万志星", "技改检修"],
"2233":["徐雨萍", "主网造价及清单"],
"2262":["刘雨微", "主网造价及清单"],
"2591":["揭敏", "主网造价及清单"],
"3035":["杨玲", "主网造价及清单"],
"3416":["杨苏文", "配网造价及清单"],
"3417":["王琴", "配网造价及清单"],
"439":["赵莉", "技改检修"],
"8340":["熊磊娇", "新能源"],
"8442":["胡月", "配网造价及清单"],
"8443":["杨淑玲", "主网造价及清单"],
"8555":["胡青艳", "主网造价及清单"],
"8762":["周丽华", "主网造价及清单"],
"1553":["郝中华", "技改检修"],
"8817":["赵雅馨", "技改检修"],
"2590":["李琴", "技改检修"],
}
# ================ 模型定义 ================
class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="用户的核心问题")
solution: str = Field(description="坐席提供的解决方案,解决方案如果存在多个步骤,使用中文分号隔开")
class ProductInfo:
product_line:str
product_name:str
class UserQuestionAndSolutionList(BaseModel):
user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表")
class QuestionType(BaseModel):
question_type: str = Field(description="问题类型")
class IsComplaint(BaseModel):
is_dissatisfaction: bool = Field(description="是否抱怨")
dissatisfaction_level: str = Field(description="抱怨级别")
dissatisfaction_reasoning: str = Field(description="抱怨原因")
is_complaint: bool = Field(description="是否明确/暗示将进行投诉")
# ================ 工具函数 ================
def retry_llm_call(max_retries=3, delay=2):
"""
重试装饰器,用于LLM调用失败时进行重试
参数:
max_retries: 最大重试次数
delay: 重试间隔时间(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
last_exception = None
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
retries += 1
logger.warning(f"LLM调用失败,正在进行第{retries}次重试: {str(e)}")
if retries < max_retries:
time.sleep(delay*retries)
stack_trace = traceback.format_exc()
logger.error(f"LLM调用失败,堆栈跟踪信息:\n{stack_trace}")
# 所有重试都失败后,抛出最后一次的异常
logger.error(f"LLM调用失败,已达到最大重试次数{max_retries}")
raise last_exception
return wrapper
return decorator
# ================ 对话转工单处理类 ================
class DialogueToWorkorder:
def __init__(self, llm_params=None):
"""
初始化对话转工单处理类
参数:
llm_params: LLM模型参数,如果为None则使用环境变量中的配置
"""
# 初始化输出解析器
self.user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution)
self.user_question_and_solution_list_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolutionList)
self.question_type_parser = PydanticOutputParser(pydantic_object=QuestionType)
self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
# 初始化LLM模型
self.llm_params = llm_params or {
"temperature": 0.2,
"top_p":0.95,
"model": "deepseek-ai/DeepSeek-R1",
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": os.getenv("OPENAI_API_BASE"),
"timeout": httpx.Timeout(600.0)
}
# self.llm_params = llm_params or {
# "temperature": 0.2,
# "top_p":0.95,
# "model": "deepseek-r1",
# "api_key": "25t%Syu6I9yxX2IuTN",
# "base_url": "http://10.1.0.154:8000/v1",
# "timeout": httpx.Timeout(600.0)
# }
self.llm = self._get_llm_instance()
# 创建工单JSON文件目录
self.workorder_json_dir = "data/temp_workorder_json"
os.makedirs(self.workorder_json_dir, exist_ok=True)
def _get_llm_instance(self):
"""获取LLM实例"""
return OpenAiLLM(**self.llm_params)
def get_product_info(self, conversation_context:str, skill_group:str)->ProductInfo:
product_info = ProductInfo()
# 默认为其他
product_info.product_line = "其他"
product_info.product_name = ""
# 主网造价及清单技能组
if skill_group == "主网造价及清单":
if "西藏" in conversation_context:
product_info.product_line = "博微西藏计价通Z1"
if "建安预算" in conversation_context or "全口径预算" in conversation_context:
product_info.product_name = "施工图预算"
elif "招标" in conversation_context or "投标" in conversation_context:
product_info.product_name = "招投标"
elif "清单" in conversation_context and "结算" in conversation_context:
product_info.product_name = "清单结算"
else:
product_info.product_name = "概预算"
elif "造价2016" in conversation_context or "造价2014" in conversation_context or "造价2008" in conversation_context:
product_info.product_line = "主网造价系列"
elif "清单2016" in conversation_context or "清单2015" in conversation_context:
product_info.product_line = "主网清单系列"
elif "大结算" in conversation_context or "结算2018" in conversation_context:
product_info.product_line = "管理或辅助工具软件"
product_info.product_name = "结算应用2018"
elif "清标" in conversation_context or "数字化" in conversation_context:
product_info.product_line = "管理或辅助工具软件"
product_info.product_name = "数字化清标工具"
else:
product_info.product_line = "博微电力建设计价通软件"
if "建安预算" in conversation_context or "全口径预算" in conversation_context:
product_info.product_name = "施工图预算"
elif "招标" in conversation_context or "投标" in conversation_context:
product_info.product_name = "招投标"
elif any(keyword in conversation_context for keyword in ["清单结算", "基准价", "结算条款", "结算市场价", "结算审核"]):
product_info.product_name = "清单结算"
else:
product_info.product_name = "概预算"
# 技改检修技能组
elif skill_group == "技改检修":
if "技改2015" in conversation_context or "技改2016" in conversation_context:
product_info.product_line = "技改检修系列"
else:
product_info.product_line = "博微技改检修计价通T1软件"
if "T1" in conversation_context and "清单" in conversation_context:
product_info.product_name = "清单"
else:
product_info.product_name = "概预算"
# 配网造价及清单技能组
elif skill_group == "配网造价及清单":
if "配网2017" in conversation_context or "09配网" in conversation_context or "老配网" in conversation_context:
product_info.product_line = "配网系列"
else:
product_info.product_line = "博微配网计价通D3"
if "南网" in conversation_context:
product_info.product_name = "南网版"
elif "清单" in conversation_context and "南网" not in conversation_context:
product_info.product_name = "行业清单计价"
else:
product_info.product_name = "概预算"
# 新能源技能组
elif skill_group == "新能源":
product_info.product_line = "新能源系列"
if "核电" in conversation_context:
product_info.product_name = "核电清单2018"
elif "光伏" in conversation_context and "2018" in conversation_context:
product_info.product_name = "光伏计价2018"
elif "储能" in conversation_context:
product_info.product_name = "新型储能电站计价通C1"
elif "营销" in conversation_context:
product_info.product_name = "陕西电力营销计价通M1"
elif "新能源" in conversation_context or "光伏清单" in conversation_context or "风电" in conversation_context:
product_info.product_name = "新能源计价通N1"
# 经济评价及其他技能组
elif "经济评价" in conversation_context:
product_info.product_line = "经济评价系列"
return product_info
def get_workorder_dict(self, rows):
"""从会话行中提取工单基本信息"""
# 预设字段
workorder_dict = {}
# 创建时间
for row in rows:
create_time = row['创建时间']
if pd.notna(create_time) and str(create_time).strip() != '':
workorder_dict["创建时间"] = create_time
break
# 处理坐席
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname
sender_num = re.findall(r'客服(\d+)', sender_nickname)
if len(sender_num) > 0 and sender_num[0] in human_info:
workorder_dict["处理人"] = human_info[sender_num[0]][0]
workorder_dict["处理技能组"] = human_info[sender_num[0]][1]
break
# 访客昵称
for row in rows:
sender = row['消息发送者']
sender_nickname = row['发送者昵称']
if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["访客昵称"] = sender_nickname
break
# 会话id
for row in rows:
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
workorder_dict["会话id"] = conversation_id
break
# 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124
if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]):
create_time_str = str(workorder_dict["创建时间"]).strip()
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
# 获取产品线和产品名称
dialogue_str = self.get_dialogue_str(rows)
skill_group = workorder_dict.get("处理技能组", "")
product_info = self.get_product_info(dialogue_str, skill_group)
workorder_dict["产品线"] = product_info.product_line
workorder_dict["产品名称"] = product_info.product_name
return workorder_dict
def get_dialogue_str(self, conversation_rows):
"""从会话行中提取对话内容"""
dialogue = []
for row in conversation_rows:
sender = row.get('消息发送者', '')
content = str(row.get('消息内容', '')).strip()
# 处理非文本内容
if content == '' or pd.isna(row["消息内容"]):
if str(row.get('图片', '')).strip() != '':
content = '[图片]'
elif str(row.get('附件', '')).strip() != '':
content = '[附件]'
elif str(row.get('视频', '')).strip() != '':
content = '[视频]'
elif str(row.get('语音', '')).strip() != '':
content = '[语音]'
# 添加对话内容
if sender == '访客':
dialogue.append(f"访客:{content}")
elif sender == '坐席':
dialogue.append(f"坐席:{content}")
return '\n'.join(dialogue)
def group_conversations_by_id(self, df):
"""将数据按会话ID分组"""
conversation_dict = {}
for index, row in df.iterrows():
conversation_id = row['会话id']
if pd.notna(conversation_id) and str(conversation_id).strip() != '':
if conversation_id in conversation_dict:
conversation_dict[conversation_id].append(row.to_dict())
else:
conversation_dict[conversation_id] = [row.to_dict()]
return conversation_dict
@retry_llm_call(max_retries=3, delay=2)
def get_user_question_and_solution(self, conversation_rows):
"""分析用户问题和解决方案"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = """请从以下电力造价相关的客服对话记录中,识别并总结用户提出的核心问题及对应坐席提供的解决方案。(注意指代消除)
1、理解对话记录,总结用户在此次对话中提出的核心问题,以用户的角度总结核心问题(可根据上下文完善问题内容)。
2、根据用户提出的问题,分析坐席提供的解决方法(比如:1、引导用户xxxx。2、告诉用户xxxxx)。以坐席的角度直接总结完整的解决方案或应对措施(不要出现"坐席"、"我"等字样)
3、提炼访客独立的核心问题(以访客的角度总结核心问题),核心问题衍生、细化后的请求合并到对应的核心问题中。不要单独列出衍生、细化后的请求。
4、使用json格式输出(多个用户问题采用标准json格式输出):
{output_format}
输出示例:
```json
{{
"user_question": "技改软件打开报错",
"solution": "1、告知报错原因;2、通过远程辅助解决"
}}
```
=======对话记录如下所示=======
{dialogue_str}
============================
"""
output_format = self.user_question_and_solution_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL)
try:
if clean_output.count('user_question') == 1:
user_question_and_solution = self.user_question_and_solution_parser.parse(clean_output)
return [user_question_and_solution]
else:
array_pattern = r'\[\s*(\{[\s\S]*?\}(?:\s*,\s*\{[\s\S]*?\})*)\s*\]'
array_match = re.search(array_pattern, clean_output)
if array_match:
# 找到了JSON数组
json_array_str = '[' + array_match.group(1) + ']'
# 尝试解析JSON
json_objects = json.loads(json_array_str)
user_question_and_solution_list = []
for obj in json_objects:
user_question = obj.get('user_question', '')
solution = obj.get('solution', '')
user_question_and_solution_list.append(
UserQuestionAndSolution(user_question=user_question, solution=solution)
)
if user_question_and_solution_list:
return user_question_and_solution_list
raise Exception("解析失败") # 如果之前没有return 触发异常重新提取
except Exception as e:
output_format = self.user_question_and_solution_list_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL)
user_question_and_solution_temp = self.user_question_and_solution_list_parser.parse(clean_output)
return user_question_and_solution_temp.user_question_list
return [user_question_and_solution]
@retry_llm_call(max_retries=3, delay=2)
def get_problem_type(self, conversation_rows, user_question_str, solution_str):
"""分析问题类型"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
分类体系:
1. 软件需求 - 涉及功能新增/改进建议、系统集成需求等
2. 数据问题 - 数据导入导出、格式转换、计算异常等问题
3. 专业咨询 - 造价计算标准、定额套用、行业规范解读等
4. 功能操作 - 软件功能使用步骤、界面操作指导
5. 故障排查 - 系统崩溃、性能问题、兼容性故障
6. 培训支持 - 功能教学、操作手册获取、培训需求
7. 计价依据 - 定额库更新、材料价格库维护、地方标准差异
9. 其他 - 无法归类的对话内容
处理要求:
1. 分析时注意识别电力行业特有表述(如"杆塔组立"、"电缆敷设"等专业工序)
2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误)
3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题)
4. 对模糊表述要求追问的场景归入培训支持类
5. 对话记录可能存在多个问题,你只需要判断"{user_question_str}"属于哪个类型的问题
输出格式:
{self.question_type_parser.get_format_instructions()}
对话记录如下:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL)
question_type = self.question_type_parser.parse(clean_output)
return question_type.question_type
@retry_llm_call(max_retries=3, delay=2)
def get_is_complaint_and_is_complaint_level(self, conversation_rows):
"""分析是否抱怨及抱怨级别"""
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话记录分析访客情绪是否对软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果:
1. 抱怨识别:判断访客是否对软件功能或者坐席服务存在明显抱怨或不满
2. 抱怨分级(如存在抱怨):
- 普通抱怨:明确表达出对软件功能或者坐席服务存在不满
- 严重抱怨:明确表达出对软件功能或者坐席服务存在不满,语气较为强烈
3. 投诉倾向:是否明确/暗示将进行投诉
4. 抱怨对象:坐席服务态度/业务能力 或 软件功能问题(注意忽略对非软件或坐席的抱怨)
示例输出:
{{
"is_dissatisfaction": true,
"dissatisfaction_level": "严重抱怨",
"dissatisfaction_reasoning": "博微软件缺陷导致实际损失",
"is_complaint": "true"
}}
输出格式要求:
{self.is_complaint_parser.get_format_instructions()}
当前对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False)
clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL)
is_complaint = self.is_complaint_parser.parse(clean_output)
return (is_complaint.is_dissatisfaction,
is_complaint.dissatisfaction_level,
is_complaint.dissatisfaction_reasoning,
is_complaint.is_complaint)
def save_conversation_to_json(self, conversation_id, workorder_list):
"""
将会话处理结果保存为JSON文件
参数:
conversation_id: 会话ID
workorder_list: 工单列表
"""
# 确保目录存在
os.makedirs(self.workorder_json_dir, exist_ok=True)
# 构建文件路径
file_path = os.path.join(self.workorder_json_dir, f"{conversation_id}.json")
# 将工单列表转换为可序列化的字典列表
serializable_workorder_list = []
for workorder in workorder_list:
# 处理datetime对象
serializable_workorder = {}
for key, value in workorder.items():
if isinstance(value, datetime):
serializable_workorder[key] = value.strftime("%Y-%m-%d %H:%M:%S")
else:
serializable_workorder[key] = value
serializable_workorder_list.append(serializable_workorder)
# 保存为JSON文件
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(serializable_workorder_list, f, ensure_ascii=False, indent=2)
logger.info(f"会话ID: {conversation_id} 的处理结果已保存到 {file_path}")
def load_conversation_from_json(self, conversation_id):
"""
从JSON文件加载会话处理结果
参数:
conversation_id: 会话ID
返回:
工单列表,如果文件不存在则返回None
"""
# 构建文件路径
file_path = os.path.join(self.workorder_json_dir, f"{conversation_id}.json")
# 检查文件是否存在
if not os.path.exists(file_path):
return None
# 从JSON文件加载工单列表
try:
with open(file_path, 'r', encoding='utf-8') as f:
workorder_list = json.load(f)
logger.info(f"已从 {file_path} 加载会话ID: {conversation_id} 的处理结果")
return workorder_list
except Exception as e:
logger.error(f"加载会话ID: {conversation_id} 的处理结果时发生错误: {e}")
return None
def process_conversation(self, conversation_id, conversation_rows):
"""处理单个会话的函数,用于多线程并发"""
# if conversation_id!="b157aa91-3acb-11f0-a191-4fb224ef4b40":
# return []
try:
# 获取工单基本信息
base_workorder_dict = self.get_workorder_dict(conversation_rows)
# 分析用户问题和解决方案
user_question_list = self.get_user_question_and_solution(conversation_rows)
user_question_str=""
for user_question in user_question_list:
user_question_str = user_question_str + user_question.user_question.strip() + "\n"
user_question_str = user_question_str.strip()
solution_str=""
for user_question in user_question_list:
solution_str = solution_str + user_question.solution.strip() + "\n"
solution_str = solution_str.strip()
# 分析是否抱怨、是否投诉、抱怨级别
is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = (
self.get_is_complaint_and_is_complaint_level(conversation_rows))
# 分析问题类型
problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str)
# 创建工单列表
workorder_list = []
for user_question in user_question_list:
user_question_str = user_question.user_question
solution_str = user_question.solution
# 创建新的工单字典,复制基本信息
workorder_dict = base_workorder_dict.copy()
# 更新工单字典
workorder_dict.update({
"客户问题": user_question_str,
"问题类型": problem_type,
"是否抱怨": "是" if is_dissatisfaction else '否',
"抱怨内容": dissatisfaction_reasoning if is_dissatisfaction else '',
"抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
"是否投诉": "是" if is_complaint else '否',
"解决方案": solution_str
})
# 将工单添加到列表中
workorder_list.append(workorder_dict)
# 将处理结果保存为JSON文件
self.save_conversation_to_json(conversation_id, workorder_list)
return workorder_list
except Exception as e:
logger.error(f"处理会话ID: {conversation_id} 时发生错误: {e}")
return []
def analyze_conversation_data(self, conversation_excel_path, max_workers=10, start_date=None, end_date=None):
"""分析会话数据主流程,使用多线程并发处理,支持失败重试和JSON合并"""
# 读取Excel文件
df = pd.read_excel(conversation_excel_path)
# 检查数据框的列
logger.info(f"Excel文件列名: {df.columns.tolist()}")
logger.info(f"数据总行数: {len(df)}")
# 按会话ID分组
conversation_dict = self.group_conversations_by_id(df)
# 如果指定了时间范围,则过滤数据
if start_date or end_date:
logging.info(f"过滤时间范围: {start_date} 至 {end_date}")
# 将字符串日期转换为datetime对象
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S") if start_date else None
end_date_dt = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S") if end_date else None
new_conversation_dict = {}
for conversation_id, conversation_rows in conversation_dict.items():
# 获取会话创建时间并转换为datetime对象
create_time_str = conversation_rows[0]["创建时间"]
if isinstance(create_time_str, str):
create_time_dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
else:
# 如果已经是datetime对象则直接使用
create_time_dt = create_time_str
# 使用datetime对象进行比较
if (start_date_dt and create_time_dt < start_date_dt) or (end_date_dt and create_time_dt > end_date_dt):
continue
new_conversation_dict[conversation_id] = conversation_rows
conversation_dict = new_conversation_dict
logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话")
# ========== 新增:扫描已存在的JSON文件 ==========
existing_json_files = set()
workorder_json_dir = self.workorder_json_dir
if not os.path.exists(workorder_json_dir):
os.makedirs(workorder_json_dir, exist_ok=True)
for fname in os.listdir(workorder_json_dir):
if fname.endswith('.json'):
conversation_id = fname[:-5]
existing_json_files.add(conversation_id)
# 本次新生成的JSON文件
newly_generated_json_files = set()
# 本次未重新生成但已存在的JSON文件
reused_json_files = set()
# ========== 线程池处理会话 ==========
successful_conversations = set()
failed_conversations = set()
import threading
lock = threading.Lock()
def process_wrapper(conversation_id, conversation_rows):
json_file_path = os.path.join(workorder_json_dir, f"{conversation_id}.json")
if conversation_id in existing_json_files and os.path.exists(json_file_path):
# 已存在,直接复用
with lock:
reused_json_files.add(conversation_id)
return None # 不处理
# 否则正常处理
result = self.process_conversation(conversation_id, conversation_rows)
if result:
with lock:
newly_generated_json_files.add(conversation_id)
return result
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_conversation = {
executor.submit(process_wrapper, conversation_id, conversation_rows): conversation_id
for conversation_id, conversation_rows in conversation_dict.items()
}
for future in tqdm(concurrent.futures.as_completed(future_to_conversation), total=len(future_to_conversation), desc="第一轮处理会话进度"):
conversation_id = future_to_conversation[future]
try:
result_workorders = future.result()
if result_workorders:
successful_conversations.add(conversation_id)
logger.info(f"完成处理会话ID: {conversation_id},生成工单数量: {len(result_workorders)}")
elif conversation_id in reused_json_files:
successful_conversations.add(conversation_id)
logger.info(f"跳过已存在JSON,会话ID: {conversation_id}")
else:
failed_conversations.add(conversation_id)
logger.warning(f"会话ID: {conversation_id} 处理可能失败,将在第二轮重试")
except Exception as exc:
failed_conversations.add(conversation_id)
logger.error(f"处理会话ID: {conversation_id} 时发生错误: {exc}")
# 检查哪些会话没有成功生成JSON文件
all_conversation_ids = set(conversation_dict.keys())
for conversation_id in all_conversation_ids:
json_file_path = os.path.join(workorder_json_dir, f"{conversation_id}.json")
if not os.path.exists(json_file_path):
failed_conversations.add(conversation_id)
if conversation_id in successful_conversations:
successful_conversations.remove(conversation_id)
# ========== 第二轮重试 ==========
if failed_conversations:
logger.info(f"第一轮处理后有 {len(failed_conversations)} 个会话需要重试")
with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, max_workers // 2)) as executor:
future_to_conversation = {
executor.submit(process_wrapper, conversation_id, conversation_dict[conversation_id]): conversation_id
for conversation_id in failed_conversations
}
for future in tqdm(concurrent.futures.as_completed(future_to_conversation), total=len(future_to_conversation), desc="第二轮重试处理进度"):
conversation_id = future_to_conversation[future]
try:
result_workorders = future.result()
if result_workorders:
successful_conversations.add(conversation_id)
newly_generated_json_files.add(conversation_id)
failed_conversations.remove(conversation_id)
logger.info(f"重试成功: 会话ID: {conversation_id},生成工单数量: {len(result_workorders)}")
elif conversation_id in reused_json_files:
successful_conversations.add(conversation_id)
failed_conversations.remove(conversation_id)
logger.info(f"重试跳过已存在JSON,会话ID: {conversation_id}")
except Exception as exc:
logger.error(f"重试处理会话ID: {conversation_id} 时仍然发生错误: {exc}")
# ========== 合并本次所有成功的JSON文件 ==========
logger.info(f"开始合并JSON文件结果,成功处理会话数: {len(successful_conversations)},失败会话数: {len(failed_conversations)}")
workorder_dict_list = []
# 只合并本次新生成和本次未重新生成但已存在的JSON
all_json_ids_to_merge = newly_generated_json_files.union(reused_json_files)
json_files = [os.path.join(workorder_json_dir, f"{cid}.json") for cid in all_json_ids_to_merge if os.path.exists(os.path.join(workorder_json_dir, f"{cid}.json"))]
for json_file in tqdm(json_files, desc="合并JSON文件"):
conversation_id = os.path.basename(json_file).replace(".json", "")
try:
with open(json_file, 'r', encoding='utf-8') as f:
workorders = json.load(f)
workorder_dict_list.extend(workorders)
except Exception as e:
logger.error(f"加载JSON文件 {json_file} 时发生错误: {e}")
logger.info(f"处理完成,成功处理会话数: {len(successful_conversations)},失败会话数: {len(failed_conversations)}")
if failed_conversations:
logger.warning(f"以下会话处理失败: {failed_conversations}")
return workorder_dict_list
def save_results_to_excel(self, workorder_dict_list, output_file=None):
"""将结果保存到Excel文件,并清理JSON文件"""
result_df = pd.DataFrame(workorder_dict_list)
# 按照指定的列顺序重新排列DataFrame的列
columns_order = [
'工单编号', '产品线', '产品名称', '问题类型',
'客户问题', '解决方案', '是否抱怨', "抱怨内容", '是否投诉', '抱怨级别',
'会话id', '访客昵称', '处理坐席', "处理人", "处理技能组",'创建时间'
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in result_df.columns:
result_df[col] = None
# 按指定顺序重排列
result_df = result_df[columns_order]
# 保存到Excel文件
if output_file is None:
# 默认输出文件名
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file = os.path.join('data', 'excel', f'会话内容详情{timestamp}_转工单.xlsx')
# 确保目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 创建ExcelWriter对象,用于设置Excel样式
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 写入数据
result_df.to_excel(writer, index=False, sheet_name='工单数据')
# 获取工作簿和工作表
workbook = writer.book
worksheet = writer.sheets['工单数据']
# 设置行高(20磅 ≈ 26.67像素)
for row in worksheet.iter_rows():
worksheet.row_dimensions[row[0].row].height = 20
# 设置列宽
column_widths = {
'工单编号': 15,
'产品线': 24,
'产品名称': 40,
'问题类型': 9,
'客户问题': 20,
'解决方案': 30,
'是否抱怨': 9,
'抱怨内容': 30,
'是否投诉': 9,
'抱怨级别': 9,
'会话id': 9,
'访客昵称': 9,
'处理坐席': 9,
'处理人': 9,
'处理技能组': 9,
'创建时间': 9
}
# 应用列宽设置
for i, column in enumerate(columns_order):
col_letter = chr(65 + i) # A, B, C, ...
if i >= 26: # 超过Z的情况
col_letter = chr(64 + i // 26) + chr(65 + i % 26)
worksheet.column_dimensions[col_letter].width = column_widths[column]
logger.info(f"结果已保存到 {output_file}")
return output_file
# ================ 参数解析 ================
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='对话内容转工单工具')
parser.add_argument('--conversation_file', type=str, required=False,
help='会话内容Excel文件路径')
parser.add_argument('--max_workers', type=int, default=40,
help='并发处理线程数,默认为16')
parser.add_argument('--start_date', type=str, required=False,default=None,
help='开始日期,格式为YYYY-MM-DD')
parser.add_argument('--end_date', type=str, required=False,default=None,
help='结束日期,格式为YYYY-MM-DD')
return parser.parse_args()
# ================ 主程序入口 ================
def main():
"""主程序入口"""
# 解析命令行参数
args = parse_arguments()
# 设置默认文件路径
conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '客服对话记录7.1-7.22.xlsx')
# 创建处理实例
processor = DialogueToWorkorder()
# 分析会话数据
workorder_dict_list = processor.analyze_conversation_data(
conversation_excel_path,
max_workers=args.max_workers,
start_date=args.start_date,
end_date=args.end_date
)
# 生成输出文件名
if args.start_date and args.end_date:
output_file = conversation_excel_path.replace('.xlsx', f'_{args.start_date}至{args.end_date}_转工单.xlsx')
elif args.start_date:
output_file = conversation_excel_path.replace('.xlsx', f'_从{args.start_date}起_转工单.xlsx')
elif args.end_date:
output_file = conversation_excel_path.replace('.xlsx', f'_至{args.end_date}_转工单.xlsx')
else:
output_file = conversation_excel_path.replace('.xlsx', '_转工单.xlsx')
# 保存结果
processor.save_results_to_excel(workorder_dict_list, output_file)
if __name__ == "__main__":
main()