diff --git a/rag2_0/demo/intent_recognition_example.py b/rag2_0/demo/intent_recognition_example.py index 186f2c5..3203018 100755 --- a/rag2_0/demo/intent_recognition_example.py +++ b/rag2_0/demo/intent_recognition_example.py @@ -15,12 +15,13 @@ import concurrent.futures from tqdm import tqdm import time import sys +import asyncio import argparse from typing import List, Dict, Any from langchain.output_parsers import PydanticOutputParser from pydantic import BaseModel, Field sys.path.append(os.getcwd()) -from rag2_0.intent_recognition import IntentRecognizer +from rag2_0.intent_recognition import IntentRecognizer, AsyncIntentRecognizer from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval from rag2_0.intent_recognition.DataModels import Classification from rag2_0.tool.ModelTool import OpenAiLLM @@ -79,6 +80,12 @@ class QueryRewriteProcessor: self.model_name = model_name or os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo") self.recognizer = IntentRecognizer(api_key=self.api_key, base_url=self.base_url, model_name=self.model_name) + # 使用asyncio.run()运行异步create方法 + self.recognizer_async = asyncio.run(AsyncIntentRecognizer.create( + api_key=self.api_key, + base_url=self.base_url, + model_name=self.model_name + )) self.dify_query_retrieval = DifyQueryRetrieval(api_key=dify_api_key, base_url=dify_base_url) def is_retrieved_doc_relevant(self, query: str, retrieved_doc: List[Dict[str, Any]]) -> Dict[str, Any]: @@ -168,7 +175,11 @@ class QueryRewriteProcessor: logging.error(f"读取Excel文件时出错: {e}", exc_info=True) return [] - def process_query(self, query: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, str] = None, enable_retrieval: bool = False): + def process_query(self, query: str, + conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, str] = None, + enable_retrieval: bool = False): """ 处理单个查询,支持重试机制,并包含槽位填充 diff --git a/rag2_0/dify/intent_recognition_api.py b/rag2_0/dify/intent_recognition_api.py index d2e5772..5080c8d 100755 --- a/rag2_0/dify/intent_recognition_api.py +++ b/rag2_0/dify/intent_recognition_api.py @@ -33,24 +33,10 @@ logger = logging.getLogger(__name__) app = Flask(__name__) -# 创建线程锁,用于保护共享资源 -recognizer_lock = RLock() - -# 使用单例模式创建意图识别器 -class RecognizerSingleton: - _instance = None - _lock = RLock() - - @classmethod - def get_instance(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - api_key = os.getenv("OPENAI_API_KEY") - base_url = os.getenv("OPENAI_API_BASE") - model_name = os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo") - cls._instance = IntentRecognizer(api_key=api_key, base_url=base_url, model_name=model_name) - return cls._instance +api_key = os.getenv("OPENAI_API_KEY") +base_url = os.getenv("OPENAI_API_BASE") +model_name = os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo") +_instance = IntentRecognizer(api_key=api_key, base_url=base_url, model_name=model_name) @app.route('/intent_recognize', methods=['POST']) def intent_recognize(): @@ -65,9 +51,7 @@ def intent_recognize(): start_time = time.time() - # 获取单例实例并使用线程锁保护关键操作 - recognizer = RecognizerSingleton.get_instance() - result = recognizer.process_query(query=query, + result = _instance.process_query(query=query, conversation_context=conversation_context, chat_history=chat_history, previous_slots=previous_slots, @@ -89,8 +73,6 @@ def intent_recognize(): for term in keywords["terms"]: term_info = { "名称": term["name"], - # "同义词": ";".join(term["synonymous"]) if term["synonymous"] else [], - # "描述": term["description"] } term_details.append(term_info) keywords_str = term_details diff --git a/rag2_0/intent_recognition/IntentRecognition.py b/rag2_0/intent_recognition/IntentRecognition.py index da55435..02f5884 100755 --- a/rag2_0/intent_recognition/IntentRecognition.py +++ b/rag2_0/intent_recognition/IntentRecognition.py @@ -9,12 +9,14 @@ Description: 意图分类、改写核心逻辑 import logging import os +import asyncio from langchain.output_parsers import PydanticOutputParser import json from typing import List, Tuple, Dict, Any, Optional import re import jieba import time +import threading from .PromptTemplates import (classification_prompt, query_rewrite_prompt, extract_nouns_prompt, classification_info, @@ -33,7 +35,7 @@ from .DataModels import ( InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult, StepBackPrompt, FollowUpQuestions, HypotheticalDocument, MultiQuestions ) -from .ProfessionalNounVector import ProfessionalNounRetriever +from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM, SiliconFlowReRankerModel @@ -448,10 +450,10 @@ class IntentRecognizer: "slot_filling": slot_filling_result } - # 等待所有greenlet完成 + # 等待所有线程完成 start_time = time.time() - for greenlet, _ in threads_and_results: - greenlet.join() + for thread, _ in threads_and_results: + thread.join() end_time = time.time() logging.info(f"问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}秒") @@ -750,7 +752,7 @@ class IntentRecognizer: def _run_in_thread(self, func, args=(), kwargs={}): """ - 在greenlet中执行函数并返回结果 + 在线程中执行函数并返回结果 Args: func: 要执行的函数 @@ -758,22 +760,21 @@ class IntentRecognizer: kwargs: 函数的关键字参数 Returns: - (greenlet, result_container): greenlet对象和存放结果的容器 + (thread, result_container): 线程对象和存放结果的容器 """ - from gevent import Greenlet result_container = [] - def greenlet_target(): + def thread_target(): try: result = func(*args, **kwargs) result_container.append(result) except Exception as e: - logging.error(f"greenlet执行函数 {func.__name__} 时出错: {e}", exc_info=True) + logging.error(f"线程执行函数 {func.__name__} 时出错: {e}", exc_info=True) result_container.append(None) - greenlet = Greenlet(greenlet_target) - greenlet.start() - return greenlet, result_container + thread = threading.Thread(target=thread_target) + thread.start() + return thread, result_container def _process_intent_and_slot(self, user_input: str, conversation_context: str = "", @@ -866,4 +867,813 @@ class IntentRecognizer: return result except Exception as e: - raise RuntimeError(f"process_intent_and_slot error:{e}") from e \ No newline at end of file + raise RuntimeError(f"process_intent_and_slot error:{e}") from e + + + +class AsyncIntentRecognizer: + """ + 异步意图识别和问题改写类 + """ + def __init__(self, api_key: str = None, base_url: str = None, model_name: str = "gpt-3.5-turbo", vector_index_dir: str = None): + """ + 初始化异步意图识别器 + + Args: + api_key: OpenAI API密钥,如果为None则从环境变量获取 + base_url: OpenAI API基础URL,如果为None则使用默认URL + model_name: 要使用的模型名称 + vector_index_dir: 向量索引目录,如果为None则使用默认目录 + """ + # 初始化LLM + llm_params = { + "temperature": 0.2, # 降低随机性,使结果更确定 + "top_p": 0.7, + "model": model_name + } + + # 如果提供了API密钥,则使用提供的密钥 + if api_key: + llm_params["api_key"] = api_key + + # 如果提供了自定义URL,则使用提供的URL + if base_url: + llm_params["base_url"] = base_url + + self._llm = OpenAiLLM(**llm_params) + + # 加载suffix关键词 + self._suffix_keywords = self._load_suffix_keywords() + + # 异步检索器将在create方法中初始化 + self._noun_retriever = None + self._api_key = api_key + self._vector_index_dir = vector_index_dir + + @classmethod + async def create(cls, api_key: str = None, base_url: str = None, model_name: str = "gpt-3.5-turbo", vector_index_dir: str = None): + """ + 异步工厂方法:创建并初始化异步意图识别器实例 + + Args: + api_key: OpenAI API密钥,如果为None则从环境变量获取 + base_url: OpenAI API基础URL,如果为None则使用默认URL + model_name: 要使用的模型名称 + vector_index_dir: 向量索引目录,如果为None则使用默认目录 + + Returns: + 初始化完成的AsyncIntentRecognizer实例 + """ + instance = cls(api_key, base_url, model_name, vector_index_dir) + # 异步初始化名词检索器 + instance._noun_retriever = await AsyncProfessionalNounRetriever.create( + api_key=api_key, + index_dir=vector_index_dir + ) + return instance + + def _load_suffix_keywords(self, filepath: str = None) -> List[str]: + """ + 加载后缀关键词列表 + + Args: + filepath: 后缀关键词文件路径,默认为None使用默认路径 + + Returns: + 后缀关键词列表 + """ + try: + # 如果未指定路径,使用默认路径 + if filepath is None: + current_dir = os.path.dirname(os.path.abspath(__file__)) + filepath = os.path.join(current_dir, "..", "..", "data", "nouns", "suffix_keywords.json") + + # 读取JSON文件 + with open(filepath, "r", encoding="utf-8") as f: + suffix_data = json.load(f) + + # 添加额外的固定后缀 + return suffix_data + + except Exception as e: + raise RuntimeError(f"加载后缀关键词失败: {e}") from e + + async def _classify_intent_async(self, query: str, conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, Any] = None) -> Classification: + """ + 异步对用户输入进行意图分类 + + Args: + content: 用户输入内容 + keywords: 匹配到的关键词列表 + rewrite: 重写的问题 + Returns: + 分类结果 + """ + classification_start_time = time.time() + classification_parser = PydanticOutputParser(pydantic_object=Classification) + formatted_prompt = classification_prompt.format(user_input=query, + classification_info=classification_info, + output_format=classification_parser.get_format_instructions(), + conversation_context=conversation_context, + chat_history=json.dumps(chat_history, ensure_ascii=False)) + # 解析输出 + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + classification_end_time = time.time() + classification_time = classification_end_time - classification_start_time + logging.info(f"异步意图分类耗时统计 - 总耗时: {classification_time:.2f}秒") + + # 尝试直接解析JSON响应 + parsed_output = classification_parser.parse(response.content.strip()) + return parsed_output + except Exception as e: + raise RuntimeError(f"解析分类结果时出错: {e}") from e + + def _tokenize_with_jieba(self, query: str) -> List[str]: + """ + 使用jieba分词器对查询进行分词 + + Args: + query: 用户查询 + + Returns: + 分词后的词语列表 + """ + # 使用jieba进行分词 + seg_list = jieba.cut(query, cut_all=False) + + # 过滤掉停用词和标点符号 + filtered_tokens = [] + for token in seg_list: + # 过滤掉空格和标点符号 + if token.strip() and not re.match(r'^[^\w\s]$', token): + filtered_tokens.append(token) + + return filtered_tokens + + async def _extract_keywords_with_llm_async(self, query: str, use_jieba: bool = False) -> List[Term]: + """ + 异步使用LLM从用户查询中提取专业关键词 + + Args: + query: 用户查询 + use_jieba: 是否使用jieba分词辅助提取关键词 + + Returns: + 提取的术语列表 + """ + # 如果使用jieba分词 + if use_jieba: + # 先使用jieba分词 + tokens = self._tokenize_with_jieba(query) + + # 构建术语列表 + terms = [] + for token in tokens: + if len(token) > 1: # 过滤掉单字词 + terms.append(Term(name=token, synonymous=[], description="")) + + return terms + else: + # 使用LLM提取关键词 + # 准备提示词 + formatted_prompt = extract_nouns_prompt.replace("{content}", query) + terms_list_parser = PydanticOutputParser(pydantic_object=TermList) + formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions()) + + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 尝试使用Pydantic解析器解析TermList + parsed_output = terms_list_parser.parse(response.content) + return parsed_output.terms + + + async def _rerank_matched_terms_async(self, query_key: str, matched_terms: set, top_k: int = 2, rerank_score:float = 0.6) -> List[Term]: + """ + 异步对召回的专业术语进行重排序,按与用户查询的相关性排序 + + Args: + query: 用户查询 + matched_terms: 匹配到的专业术语集合 + query_keys: 用户查询中提取的关键词列表 + + Returns: + 重排序后的专业术语列表 + """ + if not matched_terms: + return [] + + if len(matched_terms) <= top_k: + return list(matched_terms) + + try: + # 将每个术语转换为可用于重排序的文本表示 + term_texts = ["名称:" + term.name + "|" + "同义词:" + ";".join(term.synonymous) for term in matched_terms] + + # 使用异步重排序模型 + rerank_results = await XinferenceReRankerModel.rerank_async(query_key, term_texts, top_k=top_k) + + # 将matched_terms转换为列表以便按索引访问 + matched_terms_list = list(matched_terms) + + # 根据重排序结果获取排序后的术语列表 + reranked_terms = [matched_terms_list[result["index"]] for result in rerank_results if result["score"] >= rerank_score] + + return reranked_terms + + except Exception as e: + raise RuntimeError(f"异步_rerank_matched_terms重排失败:{e}") from e + + async def _match_keywords_async(self, query: str, use_jieba: bool = False) -> Tuple[TermList, List[str]]: + """ + 异步从用户问题中匹配关键词,结合LLM提取和向量检索 + + Args: + query: 用户问题 + use_jieba: 是否使用jieba分词辅助提取关键词 + + Returns: + 匹配到的关键词列表 + """ + start_time = time.time() + query_keys=[] + # 步骤1: 使用LLM提取查询中的关键词 + try: + llm_start_time = time.time() + extracted_terms = await self._extract_keywords_with_llm_async(query, use_jieba) + for term in extracted_terms: + query_keys.append(term.name) + llm_end_time = time.time() + llm_time = llm_end_time - llm_start_time + except Exception as e: + raise RuntimeError(f"异步LLM关键词提取失败: {e}") from e + + matched_terms = [] # 存储匹配到的Term对象 + # 步骤2: 使用向量检索找到相似的专业名词 + try: + vector_start_time = time.time() + # 对matched_terms中的每个关键字进行向量检索 + for current_key in query_keys: + vector_results = await self._noun_retriever.query_async(current_key, top_k=5, use_intersection=False) + current_key_terms = set() + # 添加向量检索结果 + for result in vector_results: + if isinstance(result.get('synonymous', []), str): + result['synonymous'] = result['synonymous'].split(';') + term = Term( + name=result.get('name'), + synonymous=result.get('synonymous', []), + description=result.get('description', '') + ) + current_key_terms.add(term) + if len(current_key_terms) > 0: + reranked_terms = await self._rerank_matched_terms_async(current_key, current_key_terms) + matched_terms.extend(reranked_terms) + vector_end_time = time.time() + vector_time = vector_end_time - vector_start_time + except Exception as e: + raise RuntimeError(f"异步向量检索关键词时出错: {e}") from e + + # 提取所有Term对象的名称并排序 + # 将set类型的matched_terms转换为TermList类型 + term_list = TermList(terms=list(matched_terms)) + end_time = time.time() + total_time = end_time - start_time + + # 输出整合的时间日志 + logging.info(f"异步关键词匹配耗时统计 - 总耗时: {total_time:.2f}秒, 问题关键词提取: {llm_time:.2f}秒, 向量检索+重排序: {vector_time:.2f}秒") + + return term_list, query_keys + + async def _rewrite_query_async(self, query: str, keywords: TermList, query_keys:List[str], chat_history: List[Dict[str, str]] = None, context: str = "") -> QueryRewrite: + """ + 异步对用户问题进行改写 + + Args: + query: 用户原始问题 + keywords: 匹配到的关键词列表 + query_keys: 用户查询中提取的关键词列表 + + Returns: + 改写结果 + """ + + rewrite_start_time = time.time() + # 准备问题改写提示 + terms_dict = [term.model_dump(exclude={"description"}) for term in keywords.terms] + keywords_str = json.dumps(terms_dict, ensure_ascii=False) + query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite) + formatted_prompt = query_rewrite_prompt_pro.format(query=query, + output_format=query_rewrite_parser.get_format_instructions(), + keywords=keywords_str, + chat_history=chat_history, + context=context) + # 解析输出 + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 尝试直接解析JSON响应 + parsed_output = query_rewrite_parser.parse(response.content) + rewrite_end_time = time.time() + rewrite_time = rewrite_end_time - rewrite_start_time + logging.info(f"异步问题改写耗时统计 - 总耗时: {rewrite_time:.2f}秒") + return parsed_output + except Exception as e: + raise RuntimeError(f"解析问题改写结果时出错: {e}") from e + + def _judge_define_suffix(self, input_str: str) -> Tuple[bool, List[str]]: + """ + 判断输入字符串是否包含定义的后缀,并返回所有匹配到的后缀名列表 + + Args: + input_str: 输入字符串 + + Returns: + Tuple[bool, List[str]]: (是否包含定义的后缀, 匹配到的后缀名列表) + """ + + # 构建正则表达式模式,匹配大小写不敏感且前面可能带有. + pattern = r'(?:\.?)(' + '|'.join(re.escape(field.get('name')) for field in self._suffix_keywords) + r')' + + # 使用 re.IGNORECASE 标志来忽略大小写,findall找到所有匹配 + matches = re.finditer(pattern, input_str, re.IGNORECASE) + matched_suffixes = [match.group(1) for match in matches] + + return bool(matched_suffixes), matched_suffixes + + async def process_query_async(self, query: str, conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, Any] = None, + use_jieba: bool = False, + enable_query_expansion: bool = False) -> Dict[str, Any]: + """ + 异步处理用户问题的完整流程 + + Args: + query: 用户原始问题 + conversation_context: 会话背景信息 + chat_history: 历史对话记录,格式为[{"user": "content"}, {"assistant": "content"}] + previous_slots: 历史槽位信息 + use_jieba: 是否使用jieba分词辅助提取关键词 + enable_query_expansion: 是否启用查询扩展 + + Returns: + 包含分类、关键词、改写和槽位填充结果的字典 + """ + if chat_history is None: + chat_history = [] + if previous_slots is None: + previous_slots = {} + + # 步骤: 并行执行提问扩展 + query_expand_tasks = [] + if enable_query_expansion: + # 创建异步任务并立即开始执行 + query_expand_tasks = [ + # 5.1: 后退提示 + asyncio.create_task(self._generate_step_back_prompt_async(query, chat_history, conversation_context)), + + # 5.2: Follow Up Questions + asyncio.create_task(self._generate_follow_up_questions_async(query, chat_history, conversation_context)), + + # 5.3: HyDE + asyncio.create_task(self._generate_hypothetical_document_async(query, chat_history, conversation_context)), + + # 5.4: 多问题查询 + asyncio.create_task(self._generate_multi_questions_async(query, chat_history, conversation_context)) + ] + + # 步骤1-3: 并行执行关键词匹配、问题改写和意图分类 + keywords_task = self._match_keywords_async(query, use_jieba) + + # 等待关键词匹配完成 + keywords_terms, query_keys = await keywords_task + + # 步骤2: 问题改写 + rewrite_task = self._rewrite_query_async( + query=query, + keywords=keywords_terms, + query_keys=query_keys, + chat_history=chat_history, + context=conversation_context + ) + + # 等待问题改写完成 + rewrite = await rewrite_task + + # 步骤3: 进行意图分类 + classification_task = self._classify_intent_async(rewrite.rewrite, conversation_context, chat_history, previous_slots) + classification = await classification_task + + # 步骤4: 进行槽位填充 + # 如果是有效分类,进行槽位填充 + slot_filling_result = {} + if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]: + slot_filling_result = await self._fill_slots_async(rewrite.rewrite, classification, conversation_context, chat_history, previous_slots) + + if not enable_query_expansion: + return { + "classification": classification.model_dump(), + "keywords": keywords_terms.model_dump(), + "rewrite": rewrite.model_dump(), + "query_keys": query_keys, + "slot_filling": slot_filling_result + } + + # 等待所有query_expand_tasks完成 + start_time = time.time() + query_expand_results = await asyncio.gather(*query_expand_tasks) + end_time = time.time() + logging.info(f"异步问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}秒") + + # 收集结果 + step_back_result = query_expand_results[0] if query_expand_results[0] else StepBackPrompt(original_query=query, step_back_query=query) + follow_up_result = query_expand_results[1] if query_expand_results[1] else FollowUpQuestions(original_query=query, follow_up_query=query) + hyde_result = query_expand_results[2] if query_expand_results[2] else HypotheticalDocument(original_query=query, hypothetical_answer="") + multi_questions_result = query_expand_results[3] if query_expand_results[3] else MultiQuestions(original_query=query, sub_questions=[query]) + + all_questions = multi_questions_result.sub_questions + all_questions.append(query) + all_questions.append(step_back_result.step_back_query) + all_questions.append(follow_up_result.follow_up_query) + all_questions.append(hyde_result.hypothetical_answer) + all_questions = list(set(all_questions)) + + query_expand = { + "all": all_questions, + "step_back": step_back_result.model_dump(), + "follow_up": follow_up_result.model_dump(), + "hyde": hyde_result.model_dump(), + "multi_questions": multi_questions_result.model_dump() + } + + # 返回所有结果 + return { + "classification": classification.model_dump(), + "keywords": keywords_terms.model_dump(), + "rewrite": rewrite.model_dump(), + "query_keys": query_keys, + "slot_filling": slot_filling_result, + "query_expand": query_expand + } + + async def _fill_slots_async(self, query: str, classification: Classification, conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, Any] = None,) -> Dict[str, Any]: + """ + 异步根据分类结果对问题进行槽位填充 + + Args: + query: 用户原始问题 + classification: 意图分类结果 + + Returns: + 填充后的槽位数据模型 + """ + # 根据分类结果选择对应的数据模型 + slot_model = self._get_slot_model(classification) + if not slot_model: + raise RuntimeError("未找到匹配的槽位模型") + + fill_slots_start_time = time.time() + # 使用LLM进行槽位填充 + filled_slots = await self._fill_slots_with_llm_async(query, classification, slot_model, conversation_context, chat_history, previous_slots) + fill_slots_end_time = time.time() + fill_slots_time = fill_slots_end_time - fill_slots_start_time + logging.info(f"异步槽位填充耗时统计 - 总耗时: {fill_slots_time:.2f}秒") + + # 检查必填槽位是否都已填充 + is_complete, missing_slots = filled_slots.check_required_slots() + + return { + "is_complete": is_complete, + "missing_slots": missing_slots, + "filled_data": filled_slots.model_dump() + } + + def _get_slot_model(self, classification: Classification) -> Optional[type]: + """ + 根据分类结果获取对应的槽位模型类,用于统一提示词处理 + + Args: + classification: 意图分类结果 + + Returns: + 对应的槽位模型类 + """ + # 软件问题 + if classification.vertical_classification == "软件问题": + if classification.sub_classification == "软件功能": + return SoftwareFunctionSlots + elif classification.sub_classification == "故障排查": + return SoftwareTroubleShootingSlots + + # 业务问题 + elif classification.vertical_classification == "业务问题": + if classification.sub_classification == "专业咨询": + return ProfessionalConsultingSlots + elif classification.sub_classification == "数据问题": + return DataProblemSlots + + # 安装下载注册 + elif classification.vertical_classification == "安装下载注册": + if classification.sub_classification == "后缀名咨询": + return FileExtensionConsultingSlots + elif classification.sub_classification == "软件锁类": + return SoftwareLockSlots + elif classification.sub_classification == "安装下载类": + return InstallationDownloadSlots + elif classification.sub_classification == "问题排查类": + return ProblemDiagnosisSlots + + # 其他 + elif classification.vertical_classification == "其他": + return OtherSlots + + return None + + async def _fill_slots_with_llm_async(self, query: str, + classification: Classification, + slot_model_class: type, + conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, Any] = None) -> Any: + """ + 异步使用LLM进行槽位填充 + + Args: + query: 用户原始问题 + classification: 意图分类结果 + slot_model_class: 槽位模型类 + + Returns: + 填充后的槽位数据模型实例 + """ + # 准备提示词 + slot_parser = PydanticOutputParser(pydantic_object=slot_model_class) + + formatted_prompt = slot_filling_prompt.format( + query=query, + vertical_classification=classification.vertical_classification, + sub_classification=classification.sub_classification, + output_format=slot_parser.get_format_instructions(), + conversation_context=conversation_context, + chat_history=json.dumps(chat_history,ensure_ascii=False), + previous_slots=json.dumps(previous_slots,ensure_ascii=False), + ) + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 尝试解析LLM响应 + parsed_output = slot_parser.parse(response.content) + return parsed_output + except Exception as e: + # 如果解析失败,创建一个空的模型实例 + empty_instance = slot_model_class() + return empty_instance + + async def _generate_step_back_prompt_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> StepBackPrompt: + """ + 异步生成后退提示 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 后退提示结果 + """ + step_back_start_time = time.time() + # 准备提示词 + step_back_parser = PydanticOutputParser(pydantic_object=StepBackPrompt) + formatted_prompt = step_back_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=step_back_parser.get_format_instructions() + ) + + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 解析输出 + parsed_output = step_back_parser.parse(response.content) + step_back_end_time = time.time() + step_back_time = step_back_end_time - step_back_start_time + logging.debug(f"异步后退提示生成耗时统计 - 总耗时: {step_back_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为后退提示 + logging.error(f"异步后退提示生成失败: {e}", exc_info=True) + return StepBackPrompt(original_query=query, step_back_query=query) + + async def _generate_follow_up_questions_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> FollowUpQuestions: + """ + 异步生成后续问题 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 后续问题结果 + """ + follow_up_start_time = time.time() + # 准备提示词 + follow_up_parser = PydanticOutputParser(pydantic_object=FollowUpQuestions) + formatted_prompt = follow_up_questions_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=follow_up_parser.get_format_instructions() + ) + + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 解析输出 + parsed_output = follow_up_parser.parse(response.content) + follow_up_end_time = time.time() + follow_up_time = follow_up_end_time - follow_up_start_time + logging.debug(f"异步后续问题生成耗时统计 - 总耗时: {follow_up_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为后续问题 + logging.error(f"异步后续问题生成失败: {e}", exc_info=True) + return FollowUpQuestions(original_query=query, follow_up_query=query) + + async def _generate_hypothetical_document_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> HypotheticalDocument: + """ + 异步生成假设性文档 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 假设性文档结果 + """ + hyde_start_time = time.time() + # 准备提示词 + hyde_parser = PydanticOutputParser(pydantic_object=HypotheticalDocument) + formatted_prompt = hyde_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=hyde_parser.get_format_instructions() + ) + + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 解析输出 + parsed_output = hyde_parser.parse(response.content) + hyde_end_time = time.time() + hyde_time = hyde_end_time - hyde_start_time + logging.debug(f"异步假设性文档生成耗时统计 - 总耗时: {hyde_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回空的假设性回答 + logging.error(f"异步假设性文档生成失败: {e}", exc_info=True) + return HypotheticalDocument(original_query=query, hypothetical_answer="") + + async def _generate_multi_questions_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> MultiQuestions: + """ + 异步生成多角度问题 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 多角度问题结果 + """ + multi_questions_start_time = time.time() + # 准备提示词 + multi_questions_parser = PydanticOutputParser(pydantic_object=MultiQuestions) + formatted_prompt = multi_questions_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=multi_questions_parser.get_format_instructions() + ) + + try: + # 异步调用LLM + response = await self._llm.invoke_async(formatted_prompt, False) + + # 解析输出 + parsed_output = multi_questions_parser.parse(response.content) + multi_questions_end_time = time.time() + multi_questions_time = multi_questions_end_time - multi_questions_start_time + logging.debug(f"异步多角度问题生成耗时统计 - 总耗时: {multi_questions_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为唯一子问题 + logging.error(f"异步多角度问题生成失败: {e}", exc_info=True) + return MultiQuestions(original_query=query, sub_questions=[query]) + + async def _process_intent_and_slot_async(self, user_input: str, conversation_context: str = "", + chat_history: List[Dict[str, str]] = None, + previous_slots: Dict[str, Any] = None) -> Dict[str, Any]: + """ + 异步使用统一提示词同时进行意图识别和槽位填充 + + Args: + user_input: 当前用户输入 + conversation_context: 会话背景信息 + chat_history: 历史对话记录,格式为[{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] + previous_slots: 历史槽位信息 + + Returns: + 包含意图分类和槽位填充结果的字典 + """ + # 初始化默认值 + if chat_history is None: + chat_history = [] + + if previous_slots is None: + previous_slots = {} + + # 生成槽位映射文档 + slot_mapping_doc = generate_slot_mapping_doc() + + # 准备提示词 + parser = PydanticOutputParser(pydantic_object=IntentAndSlotResult) + formatted_prompt = intent_and_slot_prompt.format( + conversation_context=conversation_context, + chat_history=json.dumps(chat_history, ensure_ascii=False), + previous_slots=json.dumps(previous_slots, ensure_ascii=False), + user_input=user_input, + slot_mapping_doc=slot_mapping_doc, + output_format=parser.get_format_instructions(), + classification_info=classification_info + ) + + # 异步调用LLM + llm_start_time = time.time() + response = await self._llm.invoke_async(formatted_prompt + output_example, False) + llm_end_time = time.time() + llm_time = llm_end_time - llm_start_time + + try: + # 解析LLM响应为JSON + result_json = parser.parse(response.content) + classification = result_json.classification + slot_filling = result_json.slots + is_complete, missing_slots = slot_filling.check_required_slots() + expected_slot_model = self._get_slot_model(classification) + + # 添加容错处理,发生概率较低,但仍需处理 + if expected_slot_model is None: + # 添加容错处理,应对LLM返回错误分类信息,一级分类跟二级分类错乱 + # 重新分类 + classification = await self._classify_intent_async(user_input, conversation_context, chat_history, previous_slots) + fill_slots = await self._fill_slots_async(user_input, classification, conversation_context, chat_history, previous_slots) + + result = { + "classification": classification.model_dump(), + "slot_filling": fill_slots + } + logging.warning(f"异步重新分类与槽点填充") + return result + elif expected_slot_model.__name__ != type(slot_filling).__name__: + # 添加容错处理,应对LLM槽位与分类不匹配。重新填充槽位 + slot_filling = await self._fill_slots_async(user_input, classification, conversation_context, chat_history, previous_slots) + result = { + "classification": classification.model_dump(), + "slot_filling": slot_filling + } + logging.warning(f"异步重新填充槽点") + return result + + logging.info(f"异步意图识别+槽位LLM调用耗时: {llm_time:.2f}秒") + + # 构建最终结果 + result = { + "classification": classification.model_dump(), + "slot_filling": { + "is_complete": is_complete, + "missing_slots": missing_slots, + "filled_data": slot_filling.model_dump() + } + } + + return result + + except Exception as e: + raise RuntimeError(f"异步process_intent_and_slot error:{e}") from e \ No newline at end of file diff --git a/rag2_0/intent_recognition/ProfessionalNounVector.py b/rag2_0/intent_recognition/ProfessionalNounVector.py index 31b3d2b..8d6afb2 100755 --- a/rag2_0/intent_recognition/ProfessionalNounVector.py +++ b/rag2_0/intent_recognition/ProfessionalNounVector.py @@ -10,11 +10,13 @@ Description: 专业名词向量化和检索的核心逻辑 import os import json import shutil +import asyncio from typing import List, Dict, Any, Tuple, Optional from langchain.embeddings.base import Embeddings from langchain_community.vectorstores import FAISS from rag2_0.tool.ModelTool import SiliconFlowEmbeddings import logging +import httpx def get_embedding_model(api_key: str = None) -> Embeddings: """ @@ -350,4 +352,148 @@ class ProfessionalNounRetriever: except Exception as e: logging.error(f"查询FAISS索引失败: {e}", exc_info=True) - return [] \ No newline at end of file + return [] + + +class AsyncProfessionalNounRetriever: + """异步专业名词检索类""" + + def __init__(self, + embedding_model: Optional[Embeddings] = None, + api_key: str = None, + index_dir: str = None): + """ + 初始化异步检索器 + + Args: + embedding_model: 嵌入模型,如果为None则使用默认模型 + api_key: SiliconFlow API密钥,仅在embedding_model为None时使用 + index_dir: 索引目录路径,默认为None使用默认路径 + """ + # 设置嵌入模型 + if embedding_model: + self.embedding_model = embedding_model + else: + self.embedding_model = get_embedding_model(api_key) + + # 设置索引路径 + self.index_dir = index_dir + if self.index_dir is None: + current_dir = os.path.dirname(os.path.abspath(__file__)) + self.index_dir = os.path.join(current_dir, "..", "..", "data", "nouns", "professional_nouns_index") + + # 初始化索引为None,不在构造函数中加载 + self.faiss_index = None + + @classmethod + async def create(cls, + embedding_model: Optional[Embeddings] = None, + api_key: str = None, + index_dir: str = None): + """ + 异步工厂方法:创建并初始化异步检索器实例 + + Args: + embedding_model: 嵌入模型,如果为None则使用默认模型 + api_key: SiliconFlow API密钥,仅在embedding_model为None时使用 + index_dir: 索引目录路径,默认为None使用默认路径 + + Returns: + 初始化完成的AsyncProfessionalNounRetriever实例 + """ + instance = cls(embedding_model, api_key, index_dir) + await instance._load_index_async() + return instance + + async def _load_index_async(self) -> None: + """ + 异步从本地加载FAISS索引 (内部方法) + """ + try: + # 由于FAISS加载可能是CPU密集型操作,使用线程池执行器来避免阻塞事件循环 + self.faiss_index = await asyncio.to_thread( + FAISS.load_local, + folder_path=self.index_dir, + embeddings=self.embedding_model, + allow_dangerous_deserialization=True + ) + logging.info(f"异步成功从 {self.index_dir} 加载FAISS索引") + except Exception as e: + logging.warning(f"异步加载FAISS索引失败: {e}") + self.faiss_index = None + + async def _invoke_retriever_async(self, retriever, query_text: str): + """ + 异步调用检索器 (内部方法) + + Args: + retriever: 检索器实例 + query_text: 查询文本 + + Returns: + 检索结果 + """ + # 由于LangChain的retriever.invoke可能不是异步的,使用线程池执行器 + return await asyncio.to_thread(retriever.invoke, query_text) + + async def query_async(self, query_text: str, top_k: int = 5, use_intersection: bool = True) -> List[Dict]: + """ + 异步查询FAISS索引,获取最相似的专业名词 + + Args: + query_text: 查询文本 + top_k: 返回的结果数量,默认为5 + use_intersection: 是否使用三种检索方式的交集,默认为True + + Returns: + 相似度最高的专业名词列表 + """ + try: + # 检查索引是否已加载 + if self.faiss_index is None: + logging.warning("FAISS索引未加载,尝试加载...") + await self._load_index_async() + if self.faiss_index is None: + logging.warning("异步加载FAISS索引失败,无法执行查询") + return [] + + # 使用三种检索方式并取交集 + retriever1 = self.faiss_index.as_retriever(search_kwargs={"k": top_k}) + retriever2 = self.faiss_index.as_retriever( + search_type="mmr", + search_kwargs={"k": top_k, "fetch_k": 3, "lambda_mult": 0.5} + ) + retriever3 = self.faiss_index.as_retriever( + search_type="similarity_score_threshold", + search_kwargs={"score_threshold": 0.5} + ) + + # 并行执行三个检索任务 + results = await asyncio.gather( + self._invoke_retriever_async(retriever1, query_text), + self._invoke_retriever_async(retriever2, query_text), + self._invoke_retriever_async(retriever3, query_text) + ) + + # 用json.dumps将dict转为字符串,便于取交集 + set1 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[0]) + set2 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[1]) + set3 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[2]) + + # 如果use_intersection为True,取交集;否则取并集 + if use_intersection: + intersection = set1 & set2 & set3 + else: + intersection = set1 | set2 | set3 + + # 如果交集为空,使用第一种检索方式的结果 + if not intersection: + logging.warning("三种检索方式无交集,使用普通检索结果") + return [json.loads(item) for item in set1] + + # 转回dict + return [json.loads(item) for item in intersection] + + except Exception as e: + logging.error(f"异步查询FAISS索引失败: {e}", exc_info=True) + return [] diff --git a/rag2_0/intent_recognition/__init__.py b/rag2_0/intent_recognition/__init__.py index 3818a7e..f9feb6b 100755 --- a/rag2_0/intent_recognition/__init__.py +++ b/rag2_0/intent_recognition/__init__.py @@ -1,5 +1,5 @@ #!/usr/bin/env python from .ProfessionalNounVector import ProfessionalNounVectorizer, ProfessionalNounRetriever -from .IntentRecognition import IntentRecognizer +from .IntentRecognition import IntentRecognizer, AsyncIntentRecognizer from .DataModels import Term, TermList, Classification, QueryRewrite diff --git a/rag2_0/tool/ModelTool.py b/rag2_0/tool/ModelTool.py index d5121dd..fad8b5a 100755 --- a/rag2_0/tool/ModelTool.py +++ b/rag2_0/tool/ModelTool.py @@ -8,7 +8,9 @@ Description: 模型工具类 """ from openai import OpenAI +from openai import AsyncOpenAI import httpx +import asyncio import time import logging # 导入 logging 模块 from langchain.embeddings.base import Embeddings @@ -41,12 +43,34 @@ class SiliconFlowEmbeddings(Embeddings): data = response.json() return [item["embedding"] for item in data["data"]] + async def _embed_async(self, input: List[str]) -> List[List[float]]: + """异步嵌入方法""" + payload = { + "model": self.model, + "input": input, + "encoding_format": "float" + } + async with httpx.AsyncClient() as client: + response = await client.post(self.url, json=payload, headers=self.headers) + response.raise_for_status() + data = response.json() + return [item["embedding"] for item in data["data"]] + def embed_documents(self, texts: List[str]) -> List[List[float]]: return self._embed(texts) + async def embed_documents_async(self, texts: List[str]) -> List[List[float]]: + """异步嵌入多个文档""" + return await self._embed_async(texts) + def embed_query(self, text: str) -> List[float]: return self._embed([text])[0] + async def embed_query_async(self, text: str) -> List[float]: + """异步嵌入单个查询""" + result = await self._embed_async([text]) + return result[0] + class SiliconFlowReRankerModel: @staticmethod def rerank(query: str, documents: List[str], top_k: int = 10) -> List[str]: @@ -84,6 +108,44 @@ class SiliconFlowReRankerModel: except requests.exceptions.RequestException as e: logging.error(f"重排序请求失败: {str(e)}", exc_info=True) return [] + + @staticmethod + async def rerank_async(query: str, documents: List[str], top_k: int = 10) -> List[str]: + """ + 使用硅流重排模型对文档进行异步重新排序 + + Args: + query: 用户查询文本 + documents: 需要重新排序的文档列表 + top_k: 返回排序后的前k个文档 + + Returns: + List[dict]: 重排序后的文档列表,每个元素包含document内容、相关性分数和原始索引 + """ + url = "https://api.siliconflow.cn/v1/rerank" + payload = { + "model": "BAAI/bge-reranker-v2-m3", + "query": query, + "documents": documents, + "top_n": top_k, + "max_chunks_per_doc": 1024, + "overlap_tokens": 80, + "return_documents": True + } + api_key = APIKeyManager.get_api_key() + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + try: + async with httpx.AsyncClient() as client: + response = await client.post(url, json=payload, headers=headers) + response.raise_for_status() + results = response.json() + return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]] + except httpx.RequestError as e: + logging.error(f"异步重排序请求失败: {str(e)}", exc_info=True) + return [] class XinferenceReRankerModel: """重排模型封装""" @@ -122,6 +184,39 @@ class XinferenceReRankerModel: logging.error(f"XinferenceReRankerModel重排序请求失败: {str(e)}") return [] + @staticmethod + async def rerank_async(query: str, documents: List[str], top_k: int = 10) -> List[str]: + """ + 使用重排序模型对文档进行异步重新排序 + + Args: + query: 用户查询文本 + documents: 需要重新排序的文档列表 + top_k: 返回排序后的前k个文档 + + Returns: + List[dict]: 重排序后的文档列表,每个元素包含document内容、相关性分数和原始索引 + """ + url = "http://172.20.0.145:9995/v1/rerank" + + params = {"documents": documents, "query": query, "top_n": top_k, "return_documents": True, "model": "bge-reranker-v2-m3"} + headers = { + "Authorization": "Bearer ", # 这里需要替换为实际的token + "Content-Type": "application/json" + } + + try: + async with httpx.AsyncClient() as client: + response = await client.post(url, json=params, headers=headers) + response.raise_for_status() # 检查响应状态 + results = response.json() + + # 返回重排序后的文档列表 + return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]] + + except httpx.RequestError as e: + logging.error(f"XinferenceReRankerModel异步重排序请求失败: {str(e)}") + return [] class OpenAiLLM: @@ -189,6 +284,47 @@ class OpenAiLLM: except Exception as e: raise RuntimeError(f"OpenAiLLM:invoke:error:{str(e)}.api_key:{api_key}") from e + async def invoke_async(self, user_prompt="你是谁?", need_retry=True): + """异步调用OpenAI API""" + max_retries = 3 + retry_count = 0 + if "timeout" not in self._kwargs: + timeout = httpx.Timeout(300.0) + self._kwargs["timeout"] = timeout + + if need_retry: + while retry_count < max_retries: + try: + api_key = APIKeyManager.get_api_key() + # 使用异步客户端 + async with AsyncOpenAI(api_key=api_key, base_url=self._url) as client: + # 创建异步Completion请求 + completion = await client.chat.completions.create( + model=self._model, + messages=[{'role': 'user', 'content': user_prompt}], + **self._kwargs + ) + return completion.choices[0].message + + except Exception as e: + retry_count += 1 + if retry_count == max_retries: + raise RuntimeError(f"OpenAiLLM:invoke_async:error:{str(e)}.api_key:{api_key}") from e + else: + await asyncio.sleep(5*retry_count) # 异步等待 + else: + try: + api_key = APIKeyManager.get_api_key() + async with AsyncOpenAI(api_key=api_key, base_url=self._url) as client: + completion = await client.chat.completions.create( + model=self._model, + messages=[{'role': 'user', 'content': user_prompt}], + **self._kwargs + ) + return completion.choices[0].message + except Exception as e: + raise RuntimeError(f"OpenAiLLM:invoke_async:error:{str(e)}.api_key:{api_key}") from e + if __name__ == "__main__": # 测试重排模型 reranker = SiliconFlowReRankerModel() @@ -202,4 +338,25 @@ if __name__ == "__main__": print(f"{idx+1}. 文档: {item['document']}, 分数: {item['score']}") print("-" * 50) + # 异步测试示例 + async def test_async(): + # 测试异步嵌入 + api_key = APIKeyManager.get_api_key() + embeddings = SiliconFlowEmbeddings(api_key=api_key) + query_embedding = await embeddings.embed_query_async("测试查询") + print(f"异步嵌入向量维度: {len(query_embedding)}") + + # 测试异步重排序 + results = await SiliconFlowReRankerModel.rerank_async(query, documents) + print(f"异步重排序结果数量: {len(results)}") + + # 测试异步LLM调用 + llm = OpenAiLLM() + response = await llm.invoke_async("你好,请简单介绍一下自己") + print(f"异步LLM响应: {response.content}") + + # 如果需要运行异步测试,取消下面的注释 + # import asyncio + # asyncio.run(test_async()) +