import os import json import pandas as pd import argparse from datetime import datetime import time import concurrent.futures from functools import wraps from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser from rag2_0.tool.ModelTool import OpenAiLLM from dotenv import load_dotenv import openpyxl load_dotenv() # ================ 模型定义 ================ class UserQuestionAndSolution(BaseModel): user_question: str = Field(description="客户问题") solution: str = Field(description="坐席提供的解决方案") class UserQuestionAndSolutionList(BaseModel): user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表") class QuestionType(BaseModel): question_type: str = Field(description="问题类型") class IsComplaint(BaseModel): is_dissatisfaction: bool = Field(description="是否抱怨") dissatisfaction_level: str = Field(description="抱怨级别") dissatisfaction_reasoning: str = Field(description="抱怨原因") is_complaint: bool = Field(description="是否明确/暗示将进行投诉") class ProductNameAndModuleName(BaseModel): product_name: str = Field(description="产品名称") module_name: str = Field(description="模块名称") class ProductLine(BaseModel): product_line: str = Field(description="产品线") # ================ 工具函数 ================ def retry_llm_call(max_retries=3, delay=2): """ 重试装饰器,用于LLM调用失败时进行重试 参数: max_retries: 最大重试次数 delay: 重试间隔时间(秒) """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): retries = 0 last_exception = None while retries < max_retries: try: return func(*args, **kwargs) except Exception as e: last_exception = e retries += 1 print(f"LLM调用失败,正在进行第{retries}次重试: {str(e)}") if retries < max_retries: time.sleep(delay*retries) # 所有重试都失败后,抛出最后一次的异常 print(f"LLM调用失败,已达到最大重试次数{max_retries}") raise last_exception return wrapper return decorator # ================ 对话转工单处理类 ================ class DialogueToWorkorder: def __init__(self, llm_params=None): """ 初始化对话转工单处理类 参数: llm_params: LLM模型参数,如果为None则使用环境变量中的配置 """ # 初始化输出解析器 self.user_question_and_solution_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolution) self.user_question_and_solution_list_parser = PydanticOutputParser(pydantic_object=UserQuestionAndSolutionList) self.question_type_parser = PydanticOutputParser(pydantic_object=QuestionType) self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint) self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName) self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine) # 初始化LLM模型 self.llm_params = llm_params or { "temperature": 0.6, "model": os.getenv("LLM_MODEL_NAME"), "api_key": os.getenv("OPENAI_API_KEY"), "base_url": os.getenv("OPENAI_API_BASE") } self.llm = self._get_llm_instance() def _get_llm_instance(self): """获取LLM实例""" return OpenAiLLM(**self.llm_params) def parse_product_detail_excel(self, file_path): """解析产品详情Excel文件""" df = pd.read_excel(file_path) product_dict = {} for _, row in df.iterrows(): product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else '' product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else '' module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else '' if product_line not in product_dict: product_dict[product_line] = {} if product_name not in product_dict[product_line]: product_dict[product_line][product_name] = [] product_dict[product_line][product_name].append(module_name) return product_dict def get_workorder_dict(self, rows): """从会话行中提取工单基本信息""" workorder_dict = {} # 创建时间 for row in rows: create_time = row['创建时间'] if pd.notna(create_time) and str(create_time).strip() != '': workorder_dict["创建时间"] = create_time break # 处理坐席 for row in rows: sender = row['消息发送者'] sender_nickname = row['发送者昵称'] if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': workorder_dict["处理坐席"] = sender_nickname break # 访客昵称 for row in rows: sender = row['消息发送者'] sender_nickname = row['发送者昵称'] if sender == "访客" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': workorder_dict["访客昵称"] = sender_nickname break # 会话id for row in rows: conversation_id = row['会话id'] if pd.notna(conversation_id) and str(conversation_id).strip() != '': workorder_dict["会话id"] = conversation_id break # 工单编号 - 将"创建时间"作为工单编号,格式化为20250513104124 if "创建时间" in workorder_dict and pd.notna(workorder_dict["创建时间"]): create_time_str = str(workorder_dict["创建时间"]).strip() dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S") return workorder_dict def get_dialogue_str(self, conversation_rows): """从会话行中提取对话内容""" dialogue = [] for row in conversation_rows: sender = row.get('消息发送者', '') content = str(row.get('消息内容', '')).strip() # 处理非文本内容 if content == '' or pd.isna(row["消息内容"]): if str(row.get('图片', '')).strip() != '': content = '[图片]' elif str(row.get('附件', '')).strip() != '': content = '[附件]' elif str(row.get('视频', '')).strip() != '': content = '[视频]' elif str(row.get('语音', '')).strip() != '': content = '[语音]' # 添加对话内容 if sender == '访客': dialogue.append(f"访客:{content}") elif sender == '坐席': dialogue.append(f"坐席:{content}") return '\n'.join(dialogue) def group_conversations_by_id(self, df): """将数据按会话ID分组""" conversation_dict = {} for index, row in df.iterrows(): conversation_id = row['会话id'] if pd.notna(conversation_id) and str(conversation_id).strip() != '': if conversation_id in conversation_dict: conversation_dict[conversation_id].append(row.to_dict()) else: conversation_dict[conversation_id] = [row.to_dict()] return conversation_dict @retry_llm_call(max_retries=3, delay=2) def get_user_question_and_solution(self, conversation_rows): """分析用户问题和解决方案""" dialogue_str = self.get_dialogue_str(conversation_rows) prompt = f""" 请从以下电力造价相关的客服对话记录中,精准提取用户提出的专业问题及对应坐席提供的解决方案。要求: 1. 专业识别: - 重点识别电力工程领域的专业术语(如:定额套用、工程量清单、概预算编制、造价指标分析等) - 注意区分不同业务场景(输变电工程、配网改造、新能源项目等) - 识别政策文件引用(如:国网Q/GDW 11337-2014标准) 2. 信息提取: 用户问题提取: - 核心诉求(成本核算/计价争议/软件操作等) - 涉及的专业环节(设计概算/施工图预算/竣工结算) - 具体技术参数(电压等级/线路长度/设备型号) 坐席解决方案提取: - 提供的计算方法(单位工程法/实物量法) - 推荐的计价依据(电力建设工程定额2018版) - 指导的软件操作步骤(博微软件操作) - 政策法规应用建议 - 文件模板提供情况 3. 结构化输出: {self.user_question_and_solution_list_parser.get_format_instructions()} 访客与坐席的对话记录如下: {dialogue_str} """ response = self.llm.invoke(user_prompt=prompt) user_question_and_solution_list = self.user_question_and_solution_list_parser.parse(response.content) return user_question_and_solution_list.user_question_list @retry_llm_call(max_retries=3, delay=2) def get_product_name_and_module_name(self, product_line, conversation_rows, product_detail_dict, user_question_str, solution_str): """分析产品名称和模块名称""" if product_line == '': return '', '' json_str = json.dumps(product_detail_dict[product_line]) dialogue_str = self.get_dialogue_str(conversation_rows) prompt = f""" 请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 要求: 1. 如果对话记录中存在多个产品名称和模块名称,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品名称和模块名称。 2. 如果对话记录中只存在一个产品名称和模块名称,则直接返回该产品名称和模块名称。 输出格式: {self.product_name_and_module_name_parser.get_format_instructions()} 产品名称列表及模块名称列表: {json_str} 对话记录: {dialogue_str} """ response = self.llm.invoke(user_prompt=prompt) product_name_and_module_name = self.product_name_and_module_name_parser.parse(response.content) return product_name_and_module_name.product_name, product_name_and_module_name.module_name @retry_llm_call(max_retries=3, delay=2) def get_product_line(self, conversation_rows, product_detail_dict, user_question_str, solution_str): """分析产品线""" dialogue_str = self.get_dialogue_str(conversation_rows) product_line_list = list(product_detail_dict.keys()) prompt = f""" 请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 无法判断时,返回空字符串。即product_line="" 要求: 1. 如果对话记录中存在多个产品线,则根据"{user_question_str}"和"{solution_str}"判断最可能的产品线。 2. 如果对话记录中只存在一个产品线,则直接返回该产品线。 输出格式: {self.product_line_parser.get_format_instructions()} 产品线列表: {product_line_list} 对话记录: {dialogue_str} """ response = self.llm.invoke(user_prompt=prompt) product_line = self.product_line_parser.parse(response.content) return product_line.product_line @retry_llm_call(max_retries=3, delay=2) def get_problem_type(self, conversation_rows, user_question_str, solution_str): """分析问题类型""" dialogue_str = self.get_dialogue_str(conversation_rows) prompt = f""" 请根据以下对话内容分析所属业务类别,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。 分类体系: 1. 软件需求 - 涉及功能新增/改进建议、系统集成需求等 2. 数据问题 - 数据导入导出、格式转换、计算异常等问题 3. 专业咨询 - 造价计算标准、定额套用、行业规范解读等 4. 功能操作 - 软件功能使用步骤、界面操作指导 5. 故障排查 - 系统崩溃、性能问题、兼容性故障 6. 培训支持 - 功能教学、操作手册获取、培训需求 7. 计价依据 - 定额库更新、材料价格库维护、地方标准差异 9. 其他 - 无法归类的对话内容 处理要求: 1. 分析时注意识别电力行业特有表述(如"杆塔组立"、"电缆敷设"等专业工序) 2. 区分操作类问题与技术故障(如"报表导出失败"需区分是操作错误还是系统错误) 3. 对涉及专业计算的咨询,需判断是否属于常规咨询(专业咨询)或系统计算异常(数据问题) 4. 对模糊表述要求追问的场景归入培训支持类 5. 对话记录可能存在多个问题,你只需要判断"{user_question_str}"属于哪个类型的问题 输出格式: {self.question_type_parser.get_format_instructions()} 对话记录如下: {dialogue_str} """ response = self.llm.invoke(user_prompt=prompt) question_type = self.question_type_parser.parse(response.content) return question_type.question_type @retry_llm_call(max_retries=3, delay=2) def get_is_complaint_and_is_complaint_level(self, conversation_rows): """分析是否抱怨及抱怨级别""" dialogue_str = self.get_dialogue_str(conversation_rows) prompt = f""" 请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: 1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在明显抱怨语气或词语 2. 抱怨分级(如存在抱怨): - 一般抱怨:对博微软件功者坐席服务存在轻微不满但情绪稳定 - 中等抱怨:对博微软件或者坐席服务明确表达不满并提出具体问题 - 严重抱怨:对博微软件或者坐席服务使用激烈言辞或威胁性语言 - 抗议行为:明确表示投诉/退费/法律手段 3. 投诉倾向:是否明确/暗示将进行投诉 4. 抱怨对象:坐席服务态度/业务能力 或 博微功能问题(注意忽略对非博微软件或坐席的抱怨) 5. 内容摘录:标注具体抱怨语句 6. 分析理由:结合语义与上下文的判断依据 示例输出: {{ "is_dissatisfaction": true, "dissatisfaction_level": "严重抱怨", "dissatisfaction_reasoning": "博微软件缺陷导致实际损失", "is_complaint": "true" }} 输出格式要求: {self.is_complaint_parser.get_format_instructions()} 当前对话记录: {dialogue_str} 附加分析要求: 1. 区分客观问题描述与主观情绪表达 2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议) 3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征) 4. 标注涉及多个抱怨对象的情况 """ response = self.llm.invoke(user_prompt=prompt) is_complaint = self.is_complaint_parser.parse(response.content) return (is_complaint.is_dissatisfaction, is_complaint.dissatisfaction_level, is_complaint.dissatisfaction_reasoning, is_complaint.is_complaint) def process_conversation(self, conversation_id, conversation_rows, product_detail_dict): """处理单个会话的函数,用于多线程并发""" # 获取工单基本信息 workorder_dict = self.get_workorder_dict(conversation_rows) # 分析是否抱怨、是否投诉、抱怨级别 is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = ( self.get_is_complaint_and_is_complaint_level(conversation_rows)) # 分析用户问题和解决方案 user_question_list = self.get_user_question_and_solution(conversation_rows) for user_question in user_question_list: user_question_str = user_question.user_question solution_str = user_question.solution # 分析问题类型 problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str) # 分析产品线 product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str) # 分析产品名称和模块名称 if product_line != '': product_name, module_name = self.get_product_name_and_module_name( product_line, conversation_rows, product_detail_dict, user_question_str, solution_str) else: product_name = '' module_name = '' # 更新工单字典 workorder_dict.update({ "产品线": product_line, "产品名称": product_name, "模块名称": module_name, "客户问题": user_question_str, "问题类型": problem_type, "是否抱怨": "是" if is_dissatisfaction else '否', "抱怨级别": dissatisfaction_level if is_dissatisfaction else '', "是否投诉": "是" if is_complaint else '否', "解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str }) return workorder_dict def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=4): """分析会话数据主流程,使用多线程并发处理""" # 读取Excel文件 df = pd.read_excel(conversation_excel_path) # 检查数据框的列 print(f"Excel文件列名: {df.columns.tolist()}") print(f"数据总行数: {len(df)}") # 解析产品详情 product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path) # 按会话ID分组 conversation_dict = self.group_conversations_by_id(df) # 使用线程池处理每个会话 workorder_dict_list = [] with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # 创建任务 future_to_conversation = { executor.submit(self.process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id for conversation_id, conversation_rows in conversation_dict.items() } # 获取结果 for future in concurrent.futures.as_completed(future_to_conversation): conversation_id = future_to_conversation[future] try: workorder_dict = future.result() workorder_dict_list.append(workorder_dict) print(f"完成处理会话ID: {conversation_id}") except Exception as exc: print(f"处理会话ID: {conversation_id} 时发生错误: {exc}") return workorder_dict_list def save_results_to_excel(self, workorder_dict_list, output_file=None): """将结果保存到Excel文件""" result_df = pd.DataFrame(workorder_dict_list) # 按照指定的列顺序重新排列DataFrame的列 columns_order = [ '工单编号', '产品线', '产品名称', '模块名称', '问题类型', '客户问题', '解决方案', '是否抱怨', '是否投诉', '抱怨级别', '会话id', '访客昵称', '处理坐席', '创建时间' ] # 确保所有列都存在,如果不存在则添加空列 for col in columns_order: if col not in result_df.columns: result_df[col] = None # 按指定顺序重排列 result_df = result_df[columns_order] # 保存到Excel文件 if output_file is None: # 默认输出文件名 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") output_file = os.path.join('data', 'excel', f'会话内容详情{timestamp}_转工单.xlsx') # 确保目录存在 os.makedirs(os.path.dirname(output_file), exist_ok=True) # 创建ExcelWriter对象,用于设置Excel样式 with pd.ExcelWriter(output_file, engine='openpyxl') as writer: # 写入数据 result_df.to_excel(writer, index=False, sheet_name='工单数据') # 获取工作簿和工作表 workbook = writer.book worksheet = writer.sheets['工单数据'] # 设置行高(20磅 ≈ 26.67像素) for row in worksheet.iter_rows(): worksheet.row_dimensions[row[0].row].height = 20 # 设置列宽 column_widths = { '工单编号': 15, '产品线': 24, '产品名称': 40, '模块名称': 40, '问题类型': 9, '客户问题': 20, '解决方案': 30, '是否抱怨': 9, '是否投诉': 9, '抱怨级别': 9, '会话id': 9, '访客昵称': 9, '处理坐席': 9, '创建时间': 9 } # 应用列宽设置 for i, column in enumerate(columns_order): col_letter = chr(65 + i) # A, B, C, ... if i >= 26: # 超过Z的情况 col_letter = chr(64 + i // 26) + chr(65 + i % 26) worksheet.column_dimensions[col_letter].width = column_widths[column] print(f"结果已保存到 {output_file}") return output_file # ================ 参数解析 ================ def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='对话内容转工单工具') parser.add_argument('--conversation_file', type=str, required=False, help='会话内容Excel文件路径') parser.add_argument('--product_detail_file', type=str, required=False, help='产品详情Excel文件路径') parser.add_argument('--max_workers', type=int, default=16, help='并发处理线程数,默认为16') return parser.parse_args() # ================ 主程序入口 ================ def main(): """主程序入口""" # 解析命令行参数 args = parse_arguments() # 设置默认文件路径 conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '会话内容详情20250528110230.xlsx') product_detail_excel_path = args.product_detail_file or os.path.join('data', 'excel', '产品详情_工单.xlsx') output_file = args.output_file # 配置LLM参数 llm_params = { "temperature": args.temperature, "model": args.model_name or os.getenv("LLM_MODEL_NAME"), "api_key": os.getenv("OPENAI_API_KEY"), "base_url": os.getenv("OPENAI_API_BASE") } # 创建处理实例 processor = DialogueToWorkorder(llm_params=llm_params) # 分析会话数据 workorder_dict_list = processor.analyze_conversation_data( conversation_excel_path, product_detail_excel_path, max_workers=args.max_workers ) # 保存结果 processor.save_results_to_excel(workorder_dict_list, output_file) if __name__ == "__main__": main()