#!/usr/bin/env python # -*- coding: utf-8 -*- """ File: IntentRecognition.py Author: oyyz Date: 2025-05-13 Description: 意图分类、改写核心逻辑 """ import logging import os import asyncio from langchain.output_parsers import PydanticOutputParser from langchain_core.output_parsers import JsonOutputParser import json from typing import List, Tuple, Dict, Any, Optional import re import jieba import time import threading from .PromptTemplates import (classification_prompt, query_rewrite_prompt_pro, extract_nouns_prompt, classification_info, slot_filling_prompt, step_back_prompt, hyde_prompt) from .DataModels import ( Classification, QueryRewrite, Term, TermList, SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots, DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots, InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult, StepBackPrompt, HypotheticalDocument ) from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM class AsyncIntentRecognizer: SOFT_WIKI_PATH = "data/wiki_data" SOFT_NAMETOWIKI_MAP = { "配网计价通D3软件": "配网计价通D3软件.txt", "西藏计价通Z1软件": "西藏计价通Z1软件.txt", "储能计价通C1软件": "储能计价通C1软件.txt", "技改检修工程计价通T1软件": "技改检修工程计价通T1软件.txt", "技改检修清单计价通T1软件": "技改检修清单计价通T1软件.txt", "电力建设计价通软件": "电力建设计价通软件.txt", "下载安装注册": "下载安装注册.txt", } """ 异步意图识别和问题改写类 """ def __init__(self): """ 初始化异步意图识别器 Args: api_key: OpenAI API密钥,如果为None则从环境变量获取 base_url: OpenAI API基础URL,如果为None则使用默认URL model_name: 要使用的模型名称 vector_index_dir: 向量索引目录,如果为None则使用默认目录 """ api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_API_BASE") model_name = os.getenv("MODEL_NAME", "gpt-3.5-turbo") # 初始化LLM llm_params = { "temperature": 0.2, # 降低随机性,使结果更确定 "top_p": 0.7, "model": model_name, "api_key": api_key, "base_url": base_url } self._llm = OpenAiLLM(**llm_params) llm_params["model"] = os.getenv("MINI_MODEL_NAME", "gpt-3.5-turbo") self._llm_mini = OpenAiLLM(**llm_params) # 加载suffix关键词 self._suffix_keywords = self._load_suffix_keywords() # 加载软件词条名称库 self._soft_wiki_library = self._load_soft_wiki_library() # 异步检索器将在create方法中初始化 # self._noun_retriever = None # 初始化jieba自定义词典 self._init_jieba_dict() self._synonymous_dict=self._init_synonymous_dict() def _init_jieba_dict(self): """初始化jieba自定义词典""" try: current_dir = os.path.dirname(os.path.abspath(__file__)) dict_path = os.path.join(current_dir, "..", "..", "data", "nouns", "all_synonymous_jieba.txt") # 检查字典文件是否存在 if os.path.exists(dict_path): jieba.load_userdict(dict_path) logging.info("成功加载jieba自定义词典") else: logging.warning(f"自定义词典文件不存在: {dict_path}") except Exception as e: logging.error(f"加载jieba自定义词典失败: {e}") def _init_synonymous_dict(self): """加载同义词,key是同义词 val:是对应名词""" try: current_dir = os.path.dirname(os.path.abspath(__file__)) dict_path = os.path.join(current_dir, "..", "..", "data", "nouns", "merged_nouns.json") # 检查字典文件是否存在 synonymous_dict={} if os.path.exists(dict_path): with open(dict_path, "r", encoding="utf-8") as f: data = json.load(f) for cur_data in data: synonymous=cur_data["synonymous"] name=cur_data["name"] for cur_synonymous in synonymous: synonymous_dict[cur_synonymous]=name else: logging.warning(f"名词库文件不存在: {dict_path}") return synonymous_dict except Exception as e: logging.error(f"加载名词库文件失败: {e}") return {} def _load_soft_wiki_library(self): """ 加载软件wiki库 """ SOFT_WIKI_LIBRARY = {} for soft_name, wiki_file_name in self.SOFT_NAMETOWIKI_MAP.items(): with open(f"{self.SOFT_WIKI_PATH}/{wiki_file_name}", "r", encoding="utf-8") as f: lines = f.readlines() # 去除空行 lines = [line.strip() for line in lines if line.strip()] SOFT_WIKI_LIBRARY[soft_name] = lines return SOFT_WIKI_LIBRARY @classmethod async def create(cls): """ 异步工厂方法:创建并初始化异步意图识别器实例 Returns: 初始化完成的AsyncIntentRecognizer实例 """ instance = cls() # 异步初始化名词检索器 # instance._noun_retriever = await AsyncProfessionalNounRetriever.create() return instance def _load_suffix_keywords(self, filepath: str = None) -> List[str]: """ 加载后缀关键词列表 Args: filepath: 后缀关键词文件路径,默认为None使用默认路径 Returns: 后缀关键词列表 """ try: # 如果未指定路径,使用默认路径 if filepath is None: current_dir = os.path.dirname(os.path.abspath(__file__)) filepath = os.path.join(current_dir, "..", "..", "data", "nouns", "suffix_keywords.json") # 读取JSON文件 with open(filepath, "r", encoding="utf-8") as f: suffix_data = json.load(f) # 添加额外的固定后缀 return suffix_data except Exception as e: raise RuntimeError(f"加载后缀关键词失败: {e}") from e async def _classify_intent_async(self, query: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, Any] = None) -> Classification: """ 异步对用户输入进行意图分类 Args: content: 用户输入内容 keywords: 匹配到的关键词列表 rewrite: 重写的问题 Returns: 分类结果 """ classification_parser = PydanticOutputParser(pydantic_object=Classification) formatted_prompt = classification_prompt.format(user_input=query, classification_info=classification_info, output_format=classification_parser.get_format_instructions(), # conversation_context=conversation_context, chat_history=json.dumps(chat_history, ensure_ascii=False)) # 解析输出 try: # 异步调用LLM response = await self._llm.invoke_async(formatted_prompt, False) # 尝试直接解析JSON响应 response.content = response.content.strip() clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) parsed_output = classification_parser.parse(clean_output) return parsed_output except Exception as e: raise RuntimeError(f"解析分类结果时出错: {e}") from e def _tokenize_with_jieba(self, query: str) -> List[str]: """ 使用jieba分词器对查询进行分词 Args: query: 用户查询 Returns: 分词后的词语列表 """ # 使用jieba进行分词 seg_list = jieba.cut(query, cut_all=False) # 过滤掉停用词和标点符号 filtered_tokens = [] for token in seg_list: # 过滤掉空格和标点符号 if token.strip() and not re.match(r'^[^\w\s]$', token): filtered_tokens.append(token) return filtered_tokens async def _extract_keywords_async(self, query: str, use_jieba: bool = False) -> List[Term]: """ 异步使用LLM从用户查询中提取专业关键词 Args: query: 用户查询 use_jieba: 是否使用jieba分词辅助提取关键词 Returns: 提取的术语列表 """ # 如果使用jieba分词 if use_jieba: # 先使用jieba分词 tokens = self._tokenize_with_jieba(query) # 构建术语列表 terms = [] for token in tokens: if len(token) > 1: # 过滤掉单字词 terms.append(Term(name=token, synonymous=[], description="")) return terms else: # 使用LLM提取关键词 # 准备提示词 formatted_prompt = extract_nouns_prompt.replace("{content}", query) terms_list_parser = PydanticOutputParser(pydantic_object=TermList) formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions()) # 异步调用LLM response = await self._llm.invoke_async(formatted_prompt, False) # 尝试使用Pydantic解析器解析TermList response.content = response.content.strip() clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) parsed_output = terms_list_parser.parse(clean_output) return parsed_output.terms async def _rerank_matched_terms_async(self, query_key: str, matched_terms: set, top_k: int = 2, rerank_score:float = 0.6) -> List[Term]: """ 异步对召回的专业术语进行重排序,按与用户查询的相关性排序 Args: query: 用户查询 matched_terms: 匹配到的专业术语集合 query_keys: 用户查询中提取的关键词列表 Returns: 重排序后的专业术语列表 """ if not matched_terms: return [] if len(matched_terms) <= top_k: return list(matched_terms) try: # 将每个术语转换为可用于重排序的文本表示 term_texts = ["名称:" + term.name + "|" + "同义词:" + ";".join(term.synonymous) for term in matched_terms] # 使用异步重排序模型 rerank_results = await XinferenceReRankerModel.rerank_async(query_key, term_texts, top_k=top_k) # 将matched_terms转换为列表以便按索引访问 matched_terms_list = list(matched_terms) # 根据重排序结果获取排序后的术语列表 reranked_terms = [matched_terms_list[result["index"]] for result in rerank_results if result["score"] >= rerank_score] return reranked_terms except Exception as e: raise RuntimeError(f"异步_rerank_matched_terms重排失败:{e}") from e async def _match_keywords_async(self, query: str, use_jieba: bool = False) -> Tuple[TermList, List[str]]: """ 异步从用户问题中匹配关键词,结合LLM提取和向量检索 Args: query: 用户问题 use_jieba: 是否使用jieba分词辅助提取关键词 Returns: 匹配到的关键词列表 """ start_time = time.time() query_keys=[] # 步骤1: 提取查询中的关键词 try: llm_start_time = time.time() extracted_terms = await self._extract_keywords_async(query, use_jieba) for term in extracted_terms: query_keys.append(term.name) llm_end_time = time.time() llm_time = llm_end_time - llm_start_time except Exception as e: raise RuntimeError(f"异步LLM关键词提取失败: {e}") from e matched_terms = [] # 存储匹配到的Term对象 # 查找同义词 for cur_key in query_keys: if cur_key not in self._synonymous_dict: continue name = self._synonymous_dict[cur_key] matched_terms.append(Term(name=name,synonymous=[cur_key],description="")) # 提取所有Term对象的名称并排序 # 将set类型的matched_terms转换为TermList类型 term_list = TermList(terms=list(matched_terms)) end_time = time.time() total_time = end_time - start_time # 输出整合的时间日志 logging.info(f"异步关键词匹配耗时统计 - 总耗时: {total_time:.2f}秒") return term_list, query_keys async def _rewrite_query_async(self, query: str, keywords: TermList, query_keys:List[str], chat_history: List[Dict[str, str]] = None, context: str = "") -> QueryRewrite: """ 异步对用户问题进行改写 Args: query: 用户原始问题 keywords: 匹配到的关键词列表 query_keys: 用户查询中提取的关键词列表 Returns: 改写结果 """ # 准备问题改写提示 terms_dict = [term.model_dump(exclude={"description"}) for term in keywords.terms] keywords_str = json.dumps(terms_dict, ensure_ascii=False) query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite) formatted_prompt = query_rewrite_prompt_pro.format(query=query, output_format=query_rewrite_parser.get_format_instructions(), keywords=keywords_str, chat_history=chat_history, context=context) # 解析输出 try: # 异步调用LLM response = await self._llm.invoke_async(formatted_prompt, False) response.content = response.content.strip() clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) parsed_output = query_rewrite_parser.parse(clean_output) return parsed_output except Exception as e: raise RuntimeError(f"解析问题改写结果时出错: {e}") from e def _process_lock_related_query(self, query: str) -> str: """ 特殊处理锁相关咨询 """ pattern = r'(? Dict[str, Any]: """ 异步处理用户问题的完整流程 Args: query: 用户原始问题 conversation_context: 会话背景信息 chat_history: 历史对话记录,格式为[{"user": "content"}, {"assistant": "content"}] previous_slots: 历史槽位信息 use_jieba: 是否使用jieba分词辅助提取关键词 enable_query_expansion: 是否启用查询扩展 cur_soft_name: 当前查询的软件名称 Returns: 包含分类、关键词、改写和槽位填充结果的字典 """ if chat_history is None: chat_history = [] if previous_slots is None: previous_slots = {} if conversation_context is None: conversation_context = {} # 步骤: 并行执行提问扩展 query_expand_tasks = [] if enable_query_expansion: # 创建异步任务并立即开始执行 query_expand_tasks = [ # 后退提示 asyncio.create_task(self._generate_step_back_prompt_async(query, chat_history, conversation_context)), # 文档查询 # asyncio.create_task(self._find_matching_software_docs_async(query, cur_soft_name, chat_history)), ] # 执行关键词匹配 keywords_task = self._match_keywords_async(query, use_jieba) # 等待关键词匹配完成 keywords_terms, query_keys = await keywords_task # 步骤2-3: 并行执行问题改写和意图分类 rewrite_task = self._rewrite_query_async( query=query, keywords=keywords_terms, query_keys=query_keys, chat_history=chat_history, context=conversation_context ) classification_task = self._classify_intent_async(query, conversation_context, chat_history, previous_slots) # 并行等待问题改写和意图分类完成 start_time = time.time() rewrite, classification = await asyncio.gather(rewrite_task, classification_task) end_time = time.time() logging.info(f"意图分类耗时统计 - 总耗时: {end_time - start_time:.2f}秒") # 特殊处理 锁相关咨询 if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类": process_lock_start_time = time.time() rewrite.rewrite = self._process_lock_related_query(rewrite.rewrite) process_lock_end_time = time.time() process_lock_time = process_lock_end_time - process_lock_start_time logging.info(f"锁相关咨询正则匹配 - 总耗时: {process_lock_time:.2f}秒") slot_filling_result = {} if not enable_query_expansion: return { "classification": classification.model_dump(), "keywords": keywords_terms.model_dump(), "rewrite": rewrite.model_dump(), "query_keys": query_keys, "slot_filling": slot_filling_result } # 等待所有query_expand_tasks完成 start_time = time.time() query_expand_results = await asyncio.gather(*query_expand_tasks) end_time = time.time() logging.info(f"问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}秒") # 收集结果 step_back_result = query_expand_results[0] if query_expand_results[0] else StepBackPrompt(original_query=query, can_use_back_prompt=False, step_back_query=[query]) # wiki_result = query_expand_results[1] if query_expand_results[1] else [] all_questions=[] all_questions.append(query) all_questions.append(rewrite.rewrite) # all_questions.extend(wiki_result) all_questions.extend(step_back_result.step_back_query) all_questions = list(set(all_questions)) query_expand = { "all": all_questions, "step_back": step_back_result.step_back_query, # "wiki_title": wiki_result, "original_query":query, "rewrite_query":rewrite.rewrite } # 返回所有结果 return { "classification": classification.model_dump(), "keywords": keywords_terms.model_dump(), "rewrite": rewrite.model_dump(), "query_keys": query_keys, "slot_filling": slot_filling_result, "query_expand": query_expand } async def _fill_slots_async(self, query: str, classification: Classification, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, Any] = None,) -> Dict[str, Any]: """ 异步根据分类结果对问题进行槽位填充 Args: query: 用户原始问题 classification: 意图分类结果 Returns: 填充后的槽位数据模型 """ # 根据分类结果选择对应的数据模型 slot_model = self._get_slot_model(classification) if not slot_model: raise RuntimeError("未找到匹配的槽位模型") fill_slots_start_time = time.time() # 使用LLM进行槽位填充 filled_slots = await self._fill_slots_with_llm_async(query, classification, slot_model, conversation_context, chat_history, previous_slots) fill_slots_end_time = time.time() fill_slots_time = fill_slots_end_time - fill_slots_start_time logging.info(f"异步槽位填充耗时统计 - 总耗时: {fill_slots_time:.2f}秒") # 检查必填槽位是否都已填充 is_complete, missing_slots = filled_slots.check_required_slots() return { "is_complete": is_complete, "missing_slots": missing_slots, "filled_data": filled_slots.model_dump() } def _get_slot_model(self, classification: Classification) -> Optional[type]: """ 根据分类结果获取对应的槽位模型类,用于统一提示词处理 Args: classification: 意图分类结果 Returns: 对应的槽位模型类 """ # 软件问题 if classification.vertical_classification == "软件问题": if classification.sub_classification == "软件功能": return SoftwareFunctionSlots elif classification.sub_classification == "故障排查": return SoftwareTroubleShootingSlots # 业务问题 elif classification.vertical_classification == "业务问题": if classification.sub_classification == "专业咨询": return ProfessionalConsultingSlots elif classification.sub_classification == "数据问题": return DataProblemSlots # 安装下载注册 elif classification.vertical_classification == "安装下载注册": if classification.sub_classification == "后缀名咨询": return FileExtensionConsultingSlots elif classification.sub_classification == "软件锁类": return SoftwareLockSlots elif classification.sub_classification == "安装下载类": return InstallationDownloadSlots elif classification.sub_classification == "问题排查类": return ProblemDiagnosisSlots # 其他 elif classification.vertical_classification == "其他": return OtherSlots return None async def _fill_slots_with_llm_async(self, query: str, classification: Classification, slot_model_class: type, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, Any] = None) -> Any: """ 异步使用LLM进行槽位填充 Args: query: 用户原始问题 classification: 意图分类结果 slot_model_class: 槽位模型类 Returns: 填充后的槽位数据模型实例 """ # 准备提示词 slot_parser = PydanticOutputParser(pydantic_object=slot_model_class) formatted_prompt = slot_filling_prompt.format( query=query, vertical_classification=classification.vertical_classification, sub_classification=classification.sub_classification, output_format=slot_parser.get_format_instructions(), conversation_context=conversation_context, chat_history=json.dumps(chat_history,ensure_ascii=False), previous_slots=json.dumps(previous_slots,ensure_ascii=False), ) try: # 异步调用LLM response = await self._llm.invoke_async(formatted_prompt, False) response.content = response.content.strip() clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) # 尝试解析LLM响应 parsed_output = slot_parser.parse(clean_output) return parsed_output except Exception as e: # 如果解析失败,创建一个空的模型实例 empty_instance = slot_model_class() return empty_instance async def _generate_step_back_prompt_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> StepBackPrompt: """ 异步生成后退提示 Args: query: 用户原始问题 chat_history: 历史对话记录 conversation_context: 会话背景信息 Returns: 后退提示结果 """ step_back_start_time = time.time() # 准备提示词 step_back_parser = PydanticOutputParser(pydantic_object=StepBackPrompt) formatted_prompt = step_back_prompt.format( query=query, chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", # conversation_context=conversation_context, output_format=step_back_parser.get_format_instructions() ) try: # 异步调用LLM response = await self._llm.invoke_async(formatted_prompt, False) # 解析输出 response.content = response.content.strip() clean_output = re.sub(r'.*?', '', response.content, flags=re.DOTALL) parsed_output = step_back_parser.parse(clean_output) step_back_end_time = time.time() step_back_time = step_back_end_time - step_back_start_time logging.info(f"后退提示生成耗时统计 - 总耗时: {step_back_time:.2f}秒") return parsed_output except Exception as e: # 如果解析失败,返回原始查询作为后退提示 logging.error(f"后退提示生成失败: {e}", exc_info=True) return StepBackPrompt(original_query=query, can_use_back_prompt=False, step_back_query=[query]) async def _find_matching_software_docs_async(self, query: str, soft_name: str, chat_history: List[Dict[str, str]] = None, top_k: int = 3) -> List[str]: """ 异步查找软件文档中与用户问题最匹配的几行内容 Args: query: 用户问题 soft_name: 软件名称 chat_history: 历史对话记录 top_k: 返回的匹配行数,默认为3 Returns: 匹配的文档行列表 """ if chat_history is None: chat_history = [] # 检查软件名称是否在支持的列表中 if soft_name not in self.SOFT_NAMETOWIKI_MAP: return [] # 获取软件文档内容 soft_docs = self._soft_wiki_library.get(soft_name, []) if not soft_docs: return [] soft_docs.extend(self._soft_wiki_library.get("下载安装注册", [])) # soft_docs=soft_docs[:50] # 构建文档字符串,只包含行内容 soft_docs_str = "\n".join(f"{doc.strip()}" for i, doc in enumerate(soft_docs)) # 构建提示词,让LLM选择最匹配的行 prompt = f""" {soft_docs_str} ================================ 以上为软件功能操作、常见问题排查等功能,结合历史对话,请输出与当前提问最相关的1-3个功能名称, 使用Json格式输出,如下: [{{"content": "行内容"}},...] 当前问题: {query} 历史对话: {json.dumps(chat_history, ensure_ascii=False)} """ try: # 异步调用LLM start_time = time.time() response = await self._llm.invoke_async(prompt, False, response_format={"type": "json_object"}) end_time = time.time() # 解析JSON响应 try: wiki_names = [] json_parser = JsonOutputParser() json_response = json_parser.parse(response.content) for match in json_response: wiki_names.append(match["content"]) logging.info(f"软件文档匹配耗时: {end_time - start_time:.2f}秒") return wiki_names except json.JSONDecodeError as e: logging.error(f"解析JSON响应时出错: {e}") return [] except Exception as e: logging.error(f"查找匹配软件文档时出错: {e}", exc_info=True) # 出错时返回空列表 return []