diff --git a/data/nouns/professional_nouns_index/index.pkl b/data/nouns/professional_nouns_index/index.pkl index 69aa6b2..773e31b 100644 Binary files a/data/nouns/professional_nouns_index/index.pkl and b/data/nouns/professional_nouns_index/index.pkl differ diff --git a/rag2_0/demo/dialogue_to_workorder.py b/rag2_0/demo/dialogue_to_workorder.py index 6f8052d..e59c23d 100755 --- a/rag2_0/demo/dialogue_to_workorder.py +++ b/rag2_0/demo/dialogue_to_workorder.py @@ -207,10 +207,10 @@ class DialogueToWorkorder: """分析用户问题和解决方案""" dialogue_str = self.get_dialogue_str(conversation_rows) - prompt = """请从以下电力造价相关的客服对话记录中,识别并精准提取用户提出的问题及对应坐席提供的解决方案。 -1、理解对话记录,识别用户在此次对话中提出的诉求 -2、根据用户提出的诉求,分析坐席提供的解决方法 -3、使用json格式输出: + prompt = """请从以下电力造价相关的客服对话记录中,识别并总结用户提出的问题及对应坐席提供的解决方案。(注意指代消除) +1、理解对话记录,总结用户在此次对话中提出的核心诉求(希望解决的问题)。以用户的角度总结。 +2、根据用户提出的诉求,分析坐席提供的解决方法(比如:1、引导用户xxxx。2、告诉用户xxxxx)。以坐席的角度直接总结解决方案(不要出现"坐席"、"我"等字样) +3、使用json格式输出(多个用户诉求采用数组格式输出): {output_format} 输出示例: @@ -235,6 +235,7 @@ class DialogueToWorkorder: user_question_and_solution = self.user_question_and_solution_parser.parse(response.content) return [user_question_and_solution] else: + raise Exception("解析失败") except Exception as e: output_format = self.user_question_and_solution_list_parser.get_format_instructions() @@ -349,14 +350,13 @@ class DialogueToWorkorder: prompt = f""" 请根据以下对话记录分析访客情绪是否对博微软件或者坐席服务存在明显抱怨,并按照以下结构输出JSON格式分析结果: -1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在**明显抱怨语气或词语** +1. 抱怨识别:判断访客是否对博微软件功能或者坐席服务存在抱怨或不满 2. 抱怨分级(如存在抱怨): - - 一般抱怨:明确提出对博微软件功能或者坐席服务存在不满 - - 中等抱怨:明确提出对博微软件功能或者坐席服务存在不满,语气较为强烈 - - 严重抱怨:对博微软件功能或者坐席服务使用激烈言辞或威胁性语言 + - 一般抱怨:明确表达出对博微软件功能或者坐席服务存在不满 + - 中等抱怨:明确表达出对博微软件功能或者坐席服务存在不满,语气较为强烈 + - 严重抱怨:对博微软件功能或者坐席服务使用激烈言辞 3. 投诉倾向:是否明确/暗示将进行投诉 4. 抱怨对象:坐席服务态度/业务能力 或 博微功能问题(注意忽略对非博微软件或坐席的抱怨) -5. 内容摘录:标注具体抱怨语句 示例输出: {{ @@ -372,11 +372,6 @@ class DialogueToWorkorder: 当前对话记录: {dialogue_str} -附加分析要求: -1. 区分客观问题描述与主观情绪表达 -2. 注意抱怨升级趋势(如从一般抱怨发展为严重抗议) -3. 关注非文本线索(如有记录可分析语气词、停顿等副语言特征) -4. 标注涉及多个抱怨对象的情况 """ response = self.llm.invoke(user_prompt=prompt) @@ -390,30 +385,46 @@ class DialogueToWorkorder: def process_conversation(self, conversation_id, conversation_rows, product_detail_dict): """处理单个会话的函数,用于多线程并发""" # 获取工单基本信息 - workorder_dict = self.get_workorder_dict(conversation_rows) + base_workorder_dict = self.get_workorder_dict(conversation_rows) # 分析用户问题和解决方案 user_question_list = self.get_user_question_and_solution(conversation_rows) + # 获取第一个问题和解决方案,用于后续分析 + if user_question_list and len(user_question_list) > 0: + first_question = user_question_list[0] + user_question_str = first_question.user_question + solution_str = first_question.solution + else: + user_question_str = "" + solution_str = "" + # 分析是否抱怨、是否投诉、抱怨级别 is_dissatisfaction, dissatisfaction_level, dissatisfaction_reasoning, is_complaint = ( self.get_is_complaint_and_is_complaint_level(conversation_rows)) + # 分析问题类型 + problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str) + + # 分析产品线 + product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str) + # 分析产品名称和模块名称 + if product_line != '': + product_name, module_name = self.get_product_name_and_module_name( + product_line, conversation_rows, product_detail_dict, user_question_str, solution_str) + else: + product_name = '' + module_name = '' + + # 创建工单列表 + workorder_list = [] + for user_question in user_question_list: user_question_str = user_question.user_question solution_str = user_question.solution - # 分析问题类型 - problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str) - # 分析产品线 - product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str) - # 分析产品名称和模块名称 - if product_line != '': - product_name, module_name = self.get_product_name_and_module_name( - product_line, conversation_rows, product_detail_dict, user_question_str, solution_str) - else: - product_name = '' - module_name = '' - + # 创建新的工单字典,复制基本信息 + workorder_dict = base_workorder_dict.copy() + # 更新工单字典 workorder_dict.update({ "产品线": product_line, @@ -426,10 +437,13 @@ class DialogueToWorkorder: "是否投诉": "是" if is_complaint else '否', "解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str }) + + # 将工单添加到列表中 + workorder_list.append(workorder_dict) - return workorder_dict + return workorder_list - def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=4): + def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=10): """分析会话数据主流程,使用多线程并发处理""" # 读取Excel文件 df = pd.read_excel(conversation_excel_path) @@ -457,9 +471,10 @@ class DialogueToWorkorder: for future in concurrent.futures.as_completed(future_to_conversation): conversation_id = future_to_conversation[future] try: - workorder_dict = future.result() - workorder_dict_list.append(workorder_dict) - print(f"完成处理会话ID: {conversation_id}") + result_workorders = future.result() + # 将每个会话的所有工单添加到总列表中 + workorder_dict_list.extend(result_workorders) + print(f"完成处理会话ID: {conversation_id},生成工单数量: {len(result_workorders)}") except Exception as exc: print(f"处理会话ID: {conversation_id} 时发生错误: {exc}") diff --git a/rag2_0/demo/intent_recognition_example.py b/rag2_0/demo/intent_recognition_example.py index 5c6dbce..6c84c0e 100644 --- a/rag2_0/demo/intent_recognition_example.py +++ b/rag2_0/demo/intent_recognition_example.py @@ -121,7 +121,7 @@ def process_query(recognizer, query): time.sleep(10 * retry_count) # 示例查询 -examples_query = """这个安全文明费费率在哪里调""" +examples_query = """储能软件组合件界面,点击隐藏空项目划分后界面没有任何变化""" def main(): """ @@ -138,7 +138,7 @@ def main(): # 读取提问数据 current_dir = os.path.dirname(os.path.abspath(__file__)) - data_file = os.path.join(current_dir, "..", "..", "data", "excel", "测试提问数据.xlsx") + data_file = os.path.join(current_dir, "..", "..", "data", "excel", "400条提问意图分类数据-原始.xlsx") # 检测是否为调试模式,调试模式下使用examples_query,否则从Excel读取 @@ -150,7 +150,7 @@ def main(): if not is_debug: - max_workers = 5 # 减少并发数以避免API限制 + max_workers = 10 # 减少并发数以避免API限制 logging.info(f"共有 {len(examples)} 个问题需要处理,使用 {max_workers} 个并发线程") # 创建一个与输入顺序相同的结果列表 results = [None] * len(examples) diff --git a/rag2_0/dify/intent_recognition_api.py b/rag2_0/dify/intent_recognition_api.py index 12f0ca2..4e3a072 100644 --- a/rag2_0/dify/intent_recognition_api.py +++ b/rag2_0/dify/intent_recognition_api.py @@ -23,30 +23,44 @@ def intent_recognize(): if not query: return Response(json.dumps({"error": "缺少query参数"}, ensure_ascii=False), content_type='application/json; charset=utf-8', status=400) start_time = time.time() - classification, keywords, rewrite, query_keys = recognizer.process_query(query) + result = recognizer.process_query_with_slots(query) end_time = time.time() print(f"意图识别耗时: {end_time - start_time:.2f}秒") - # keywords对象转为字符串 + + # 提取分类信息 + classification = result["classification"] + + # 提取关键词信息 + keywords = result["keywords"] keywords_str = "" - if keywords and keywords.terms: + if keywords and keywords.get("terms"): term_details = [] - for term in keywords.terms: + for term in keywords["terms"]: term_info = { - "名称": term.name, - "同义词": ";".join(term.synonymous) if term.synonymous else "", - "描述": term.description + "名称": term["name"], + "同义词": ";".join(term["synonymous"]) if term["synonymous"] else "", + "描述": term["description"] } term_details.append(term_info) keywords_str = term_details - result = { + + # 提取槽位填充信息 + slot_filling = result.get("slot_filling", {}) + + response_result = { "source_query": query, - "source_query_keys": query_keys, - "vertical_classification": classification.vertical_classification, - "sub_classification": classification.sub_classification, - "rewrite_query": rewrite.rewrite, - "keywords": keywords_str + "source_query_keys": result["query_keys"], + "vertical_classification": classification["vertical_classification"], + "sub_classification": classification["sub_classification"], + "rewrite_query": result["rewrite"]["rewrite"], + "keywords": keywords_str, + "slot_filling": { + "is_complete": slot_filling.get("is_complete", False), + "missing_slots": slot_filling.get("missing_slots", {}), + "filled_data": slot_filling.get("filled_data", {}) + } } - return Response(json.dumps(result, ensure_ascii=False), content_type='application/json; charset=utf-8') + return Response(json.dumps(response_result, ensure_ascii=False), content_type='application/json; charset=utf-8') except Exception as e: return Response(json.dumps({"error": str(e)}, ensure_ascii=False), content_type='application/json; charset=utf-8', status=500) diff --git a/rag2_0/intent_recognition/DataModels.py b/rag2_0/intent_recognition/DataModels.py index c7d28ab..38795bd 100644 --- a/rag2_0/intent_recognition/DataModels.py +++ b/rag2_0/intent_recognition/DataModels.py @@ -9,7 +9,27 @@ Description: 提取和分类的数据模型 from pydantic import BaseModel, Field from typing import List, Optional, Dict, Tuple +from enum import Enum +class SoftwareName(str, Enum): + """软件名称枚举类""" + D3 = "配网工程计价通D3软件" + C1 = "新型储能电站建设计价通C1软件" + Z1 = "西藏电力工程计价通Z1软件" + T1 = "技改检修工程计价通T1软件" + T1_LIST = "技改检修清单计价通T1软件" + MAIN = "主网电力建设计价通软件" + UNKNOWN = "" # 未知 + + # 软件别名映射 + ALIASES = { + D3: ["配网D3", "D3软件", "配网工程软件"], + C1: ["储能C1", "C1软件", "储能电站软件", "储能软件"], + Z1: ["西藏Z1", "Z1软件", "西藏电力软件"], + T1: ["技改T1", "T1软件", "技改检修软件"], + T1_LIST: ["技改清单T1", "T1清单软件", "技改检修清单软件"], + MAIN: ["主网软件", "电力建设软件", "主网建设软件", "主网软件"] + } # 定义输出模型 class Term(BaseModel): @@ -38,7 +58,7 @@ class QueryRewrite(BaseModel): # 1. 软件问题 # 1.1 软件功能 class SoftwareFunction(BaseModel): - software_name: str = Field(description="软件名称") + software_name: SoftwareName = Field(description="软件名称") function_name: str = Field(description="具体功能名称") operation: str = Field(description="用户操作意图(如何使用功能、功能入口、功能使用场景)") software_version: Optional[str] = Field(None, description="软件版本") @@ -57,7 +77,7 @@ class SoftwareFunction(BaseModel): # 1.2 故障排查 class TroubleShooting(BaseModel): - software_name: str = Field(description="软件名称") + software_name: SoftwareName = Field(description="软件名称") function_name: str = Field(description="具体功能名称/操作描述") error_message: str = Field(description="报错信息/异常现象") software_version: Optional[str] = Field(None, description="软件版本") @@ -80,7 +100,7 @@ class TroubleShooting(BaseModel): class ProfessionalConsulting(BaseModel): scene_subject: str = Field(description="场景主体") business_scene: str = Field(description="业务场景描述") - software_name: Optional[str] = Field(None, description="软件名称") + software_name: Optional[SoftwareName] = Field(None, description="软件名称") def check_required_slots(self) -> Tuple[bool, Dict[str, str]]: """检查必填槽位是否都存在""" @@ -95,7 +115,7 @@ class ProfessionalConsulting(BaseModel): class DataProblem(BaseModel): expense_type: str = Field(description="费用类型") operation_purpose: str = Field(description="操作目的") - software_name: Optional[str] = Field(None, description="软件名称") + software_name: Optional[SoftwareName] = Field(None, description="软件名称") project_type: Optional[str] = Field(None, description="工程类型") def check_required_slots(self) -> Tuple[bool, Dict[str, str]]: @@ -141,7 +161,9 @@ class SoftwareLock(BaseModel): # 3.3 安装下载类 class InstallationDownload(BaseModel): - software_name: str = Field(description="软件/插件名称") + + software_name: SoftwareName = Field(description="软件/插件名称,与file_name二选一") + file_name: str = Field(description="文件名,与software_name二选一") operation_stage: str = Field(description="操作阶段") os_version: Optional[str] = Field(None, description="操作系统版本") package_source: Optional[str] = Field(None, description="安装包来源/版本号") @@ -149,8 +171,9 @@ class InstallationDownload(BaseModel): def check_required_slots(self) -> Tuple[bool, Dict[str, str]]: """检查必填槽位是否都存在""" missing_slots = {} - if not self.software_name: + if not self.software_name and not self.file_name: missing_slots["software_name"] = InstallationDownload.model_fields["software_name"].description + missing_slots["file_name"] = InstallationDownload.model_fields["file_name"].description if not self.operation_stage: missing_slots["operation_stage"] = InstallationDownload.model_fields["operation_stage"].description return len(missing_slots) == 0, missing_slots @@ -158,7 +181,7 @@ class InstallationDownload(BaseModel): # 3.4 问题排查类 class ProblemDiagnosis(BaseModel): error_message: str = Field(description="报错信息/异常现象") - software_name: Optional[str] = Field(None, description="软件名称") + software_name: Optional[SoftwareName] = Field(None, description="软件名称") os_version: Optional[str] = Field(None, description="操作系统版本") def check_required_slots(self) -> Tuple[bool, Dict[str, str]]: diff --git a/rag2_0/intent_recognition/IntentRecognition.py b/rag2_0/intent_recognition/IntentRecognition.py index fd00a48..3a77e3a 100644 --- a/rag2_0/intent_recognition/IntentRecognition.py +++ b/rag2_0/intent_recognition/IntentRecognition.py @@ -148,6 +148,40 @@ class IntentRecognizer: except Exception as e: raise RuntimeError(f"无法解析LLM关键词提取响应: {e}") from e + def rerank_matched_terms(self, query_key: str, matched_terms: set, top_k: int = 2) -> List[Term]: + """ + 对召回的专业术语进行重排序,按与用户查询的相关性排序 + + Args: + query: 用户查询 + matched_terms: 匹配到的专业术语集合 + query_keys: 用户查询中提取的关键词列表 + + Returns: + 重排序后的专业术语列表 + """ + if not matched_terms: + return [] + + try: + # 将每个术语转换为可用于重排序的文本表示 + term_texts = ["名称:" + term.name + "|" + "同义词:" + ";".join(term.synonymous) + "|" + "描述:" + term.description for term in matched_terms] + + # 使用重排序模型 + xinference_reranker = SiliconFlowReRankerModel() + rerank_results = xinference_reranker.rerank(query_key, term_texts, top_k=top_k) + + # 将matched_terms转换为列表以便按索引访问 + matched_terms_list = list(matched_terms) + + # 根据重排序结果获取排序后的术语列表 + reranked_terms = [matched_terms_list[result["index"]] for result in rerank_results if result["score"] >= 0.6] + + return reranked_terms + + except Exception as e: + return list(matched_terms) + def match_keywords(self, query: str) -> Tuple[TermList, List[str]]: """ 从用户问题中匹配关键词,结合LLM提取和向量检索 @@ -158,7 +192,6 @@ class IntentRecognizer: Returns: 匹配到的关键词列表 """ - matched_terms = set() # 存储匹配到的Term对象 query_keys=[] # 步骤2: 使用LLM提取查询中的关键词 try: @@ -168,12 +201,13 @@ class IntentRecognizer: except Exception as e: raise RuntimeError(f"LLM关键词提取失败: {e}") from e + matched_terms = [] # 存储匹配到的Term对象 # 步骤3: 使用向量检索找到相似的专业名词 try: # 对matched_terms中的每个关键字进行向量检索 for current_key in query_keys: vector_results = self.noun_retriever.query(current_key, top_k=3, use_intersection=True) - + current_key_terms = set() # 添加向量检索结果 for result in vector_results: term = Term( @@ -181,18 +215,12 @@ class IntentRecognizer: synonymous=result.get('synonymous', []), description=result.get('description', '') ) - matched_terms.add(term) - + current_key_terms.add(term) + reranked_terms = self.rerank_matched_terms(current_key, current_key_terms) + matched_terms.extend(reranked_terms) except Exception as e: raise RuntimeError(f"向量检索关键词时出错: {e}") from e - if len(matched_terms) != 0: - txts = ["名称:" + term.name + "|" + "同义词:" + ";".join(term.synonymous) + "|" + "描述:" + term.description for term in matched_terms] - # txts = [term.name for term in matched_terms] - xinference_reranker = SiliconFlowReRankerModel() - rerank_results = xinference_reranker.rerank(query, txts, top_k=5) - matched_terms_list = list(matched_terms) - matched_terms = [matched_terms_list[result["index"]] for result in rerank_results] # 提取所有Term对象的名称并排序 # 将set类型的matched_terms转换为TermList类型 term_list = TermList(terms=list(matched_terms)) @@ -295,7 +323,7 @@ class IntentRecognizer: # rewrite = QueryRewrite(rewrite=query) return classification, keywords_terms, rewrite, query_keys - def fill_slots(self, query: str, classification: Classification, keywords: TermList) -> Dict[str, Any]: + def fill_slots(self, query: str, classification: Classification) -> Dict[str, Any]: """ 根据分类结果对问题进行槽位填充 @@ -313,7 +341,7 @@ class IntentRecognizer: return {"error": "未找到匹配的槽位模型"} # 使用LLM进行槽位填充 - filled_slots = self._fill_slots_with_llm(query, classification, keywords, slot_model) + filled_slots = self._fill_slots_with_llm(query, classification, slot_model) # 检查必填槽位是否都已填充 is_complete, missing_slots = filled_slots.check_required_slots() @@ -349,7 +377,7 @@ class IntentRecognizer: return DataProblem # 安装下载注册 - elif classification.vertical_classification == "安装下载": + elif classification.vertical_classification == "安装下载注册": if classification.sub_classification == "后缀名咨询": return FileExtensionConsulting elif classification.sub_classification == "软件锁类": @@ -361,14 +389,13 @@ class IntentRecognizer: return None - def _fill_slots_with_llm(self, query: str, classification: Classification, keywords: TermList, slot_model_class: type) -> Any: + def _fill_slots_with_llm(self, query: str, classification: Classification, slot_model_class: type) -> Any: """ 使用LLM进行槽位填充 Args: query: 用户原始问题 classification: 意图分类结果 - keywords: 匹配的关键词列表 slot_model_class: 槽位模型类 Returns: @@ -377,15 +404,11 @@ class IntentRecognizer: # 准备提示词 slot_parser = PydanticOutputParser(pydantic_object=slot_model_class) model_schema = json.dumps(slot_model_class.model_json_schema(), ensure_ascii=False) - terms_dict = [term.model_dump() for term in keywords.terms] - keywords_str = json.dumps(terms_dict, ensure_ascii=False) formatted_prompt = slot_filling_prompt.format( query=query, vertical_classification=classification.vertical_classification, sub_classification=classification.sub_classification, - keywords=keywords_str, - model_schema=model_schema, output_format=slot_parser.get_format_instructions() ) @@ -417,7 +440,7 @@ class IntentRecognizer: # 如果是有效分类,进行槽位填充 slot_filling_result = {} if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]: - slot_filling_result = self.fill_slots(rewrite.rewrite, classification, keywords) + slot_filling_result = self.fill_slots(rewrite.rewrite, classification) return { "classification": classification.model_dump(), diff --git a/rag2_0/intent_recognition/ProfessionalNounVector.py b/rag2_0/intent_recognition/ProfessionalNounVector.py index 9757958..2209506 100644 --- a/rag2_0/intent_recognition/ProfessionalNounVector.py +++ b/rag2_0/intent_recognition/ProfessionalNounVector.py @@ -157,21 +157,21 @@ class ProfessionalNounVectorizer: for term in terms: name = term["name"] texts.append(name.strip()) - synonyms = term.get("synonymous", []) + synonymous = term.get("synonymous", []) description = term.get("description", "") # 记录元数据 metadatas.append({ "name": name, - "synonyms": synonyms, + "synonymous": synonymous, "description": description }) - if len(synonyms) > 0: - synonyms_str = ', '.join(synonyms) + if len(synonymous) > 0: + synonyms_str = ', '.join(synonymous) texts.append(synonyms_str.strip()) metadatas.append({ "name": name, - "synonyms": synonyms, + "synonymous": synonymous, "description": description }) @@ -179,7 +179,7 @@ class ProfessionalNounVectorizer: texts.append(description.strip()) metadatas.append({ "name": name, - "synonyms": synonyms, + "synonymous": synonymous, "description": description }) diff --git a/rag2_0/intent_recognition/PromptTemplates.py b/rag2_0/intent_recognition/PromptTemplates.py index 9c15950..e24156b 100644 --- a/rag2_0/intent_recognition/PromptTemplates.py +++ b/rag2_0/intent_recognition/PromptTemplates.py @@ -90,7 +90,7 @@ query_rewrite_prompt = """ ## 第三阶段:专业重构 3. 术语规范化处理 - a. 实施术语映射:将口语表达替换为知识库标准术语 + a. 实施术语映射:将口语表达替换为知识库标准术语,优先保留原问题中的术语 b. 执行结构优化: - 采用【术语标记】规范标注关键概念 - 构建主谓宾明确的问题句式 @@ -118,14 +118,13 @@ query_rewrite_prompt = """ # 质量约束条款 1. 语义内容保真原则 - 禁止修改原问题核心诉求(如转换主语/变更操作对象) - - 保留原始问题的限定条件 + - 保留原始问题的限定条件(包括:软件名称等) 2. 术语使用规范 - 仅使用检索返回的关键词进行术语替换 - 新增术语必须来自关键词集合 3. 结构优化标准 - - 问题长度控制在20字内 - 必须包含≥1个【标注术语】 - 禁止添加解释性语句 @@ -144,12 +143,6 @@ slot_filling_prompt = """ 垂直领域分类: {vertical_classification} 子分类: {sub_classification} -【已识别关键词】 -{keywords} - -【目标数据结构】 -{model_schema} - 【输出格式】 {output_format}