新增异步意图识别器和相关功能,优化意图识别和槽位填充逻辑,支持异步处理和多线程检索,改进API调用的错误处理和日志记录,增强文档检索和关键词提取功能。

This commit is contained in:
2025-07-03 15:40:36 +08:00
parent 68e3677c34
commit c52627abeb
6 changed files with 1146 additions and 40 deletions
+13 -2
View File
@@ -15,12 +15,13 @@ import concurrent.futures
from tqdm import tqdm
import time
import sys
import asyncio
import argparse
from typing import List, Dict, Any
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
sys.path.append(os.getcwd())
from rag2_0.intent_recognition import IntentRecognizer
from rag2_0.intent_recognition import IntentRecognizer, AsyncIntentRecognizer
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
from rag2_0.intent_recognition.DataModels import Classification
from rag2_0.tool.ModelTool import OpenAiLLM
@@ -79,6 +80,12 @@ class QueryRewriteProcessor:
self.model_name = model_name or os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo")
self.recognizer = IntentRecognizer(api_key=self.api_key, base_url=self.base_url, model_name=self.model_name)
# 使用asyncio.run()运行异步create方法
self.recognizer_async = asyncio.run(AsyncIntentRecognizer.create(
api_key=self.api_key,
base_url=self.base_url,
model_name=self.model_name
))
self.dify_query_retrieval = DifyQueryRetrieval(api_key=dify_api_key, base_url=dify_base_url)
def is_retrieved_doc_relevant(self, query: str, retrieved_doc: List[Dict[str, Any]]) -> Dict[str, Any]:
@@ -168,7 +175,11 @@ class QueryRewriteProcessor:
logging.error(f"读取Excel文件时出错: {e}", exc_info=True)
return []
def process_query(self, query: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, str] = None, enable_retrieval: bool = False):
def process_query(self, query: str,
conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, str] = None,
enable_retrieval: bool = False):
"""
处理单个查询,支持重试机制,并包含槽位填充
+5 -23
View File
@@ -33,24 +33,10 @@ logger = logging.getLogger(__name__)
app = Flask(__name__)
# 创建线程锁,用于保护共享资源
recognizer_lock = RLock()
# 使用单例模式创建意图识别器
class RecognizerSingleton:
_instance = None
_lock = RLock()
@classmethod
def get_instance(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model_name = os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo")
cls._instance = IntentRecognizer(api_key=api_key, base_url=base_url, model_name=model_name)
return cls._instance
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_API_BASE")
model_name = os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo")
_instance = IntentRecognizer(api_key=api_key, base_url=base_url, model_name=model_name)
@app.route('/intent_recognize', methods=['POST'])
def intent_recognize():
@@ -65,9 +51,7 @@ def intent_recognize():
start_time = time.time()
# 获取单例实例并使用线程锁保护关键操作
recognizer = RecognizerSingleton.get_instance()
result = recognizer.process_query(query=query,
result = _instance.process_query(query=query,
conversation_context=conversation_context,
chat_history=chat_history,
previous_slots=previous_slots,
@@ -89,8 +73,6 @@ def intent_recognize():
for term in keywords["terms"]:
term_info = {
"名称": term["name"],
# "同义词": ";".join(term["synonymous"]) if term["synonymous"] else [],
# "描述": term["description"]
}
term_details.append(term_info)
keywords_str = term_details
+823 -13
View File
@@ -9,12 +9,14 @@ Description: 意图分类、改写核心逻辑
import logging
import os
import asyncio
from langchain.output_parsers import PydanticOutputParser
import json
from typing import List, Tuple, Dict, Any, Optional
import re
import jieba
import time
import threading
from .PromptTemplates import (classification_prompt, query_rewrite_prompt,
extract_nouns_prompt, classification_info,
@@ -33,7 +35,7 @@ from .DataModels import (
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult,
StepBackPrompt, FollowUpQuestions, HypotheticalDocument, MultiQuestions
)
from .ProfessionalNounVector import ProfessionalNounRetriever
from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever
from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM, SiliconFlowReRankerModel
@@ -448,10 +450,10 @@ class IntentRecognizer:
"slot_filling": slot_filling_result
}
# 等待所有greenlet完成
# 等待所有线程完成
start_time = time.time()
for greenlet, _ in threads_and_results:
greenlet.join()
for thread, _ in threads_and_results:
thread.join()
end_time = time.time()
logging.info(f"问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}")
@@ -750,7 +752,7 @@ class IntentRecognizer:
def _run_in_thread(self, func, args=(), kwargs={}):
"""
greenlet中执行函数并返回结果
线程中执行函数并返回结果
Args:
func: 要执行的函数
@@ -758,22 +760,21 @@ class IntentRecognizer:
kwargs: 函数的关键字参数
Returns:
(greenlet, result_container): greenlet对象和存放结果的容器
(thread, result_container): 线程对象和存放结果的容器
"""
from gevent import Greenlet
result_container = []
def greenlet_target():
def thread_target():
try:
result = func(*args, **kwargs)
result_container.append(result)
except Exception as e:
logging.error(f"greenlet执行函数 {func.__name__} 时出错: {e}", exc_info=True)
logging.error(f"线程执行函数 {func.__name__} 时出错: {e}", exc_info=True)
result_container.append(None)
greenlet = Greenlet(greenlet_target)
greenlet.start()
return greenlet, result_container
thread = threading.Thread(target=thread_target)
thread.start()
return thread, result_container
def _process_intent_and_slot(self, user_input: str, conversation_context: str = "",
@@ -866,4 +867,813 @@ class IntentRecognizer:
return result
except Exception as e:
raise RuntimeError(f"process_intent_and_slot error:{e}") from e
raise RuntimeError(f"process_intent_and_slot error:{e}") from e
class AsyncIntentRecognizer:
"""
异步意图识别和问题改写类
"""
def __init__(self, api_key: str = None, base_url: str = None, model_name: str = "gpt-3.5-turbo", vector_index_dir: str = None):
"""
初始化异步意图识别器
Args:
api_key: OpenAI API密钥,如果为None则从环境变量获取
base_url: OpenAI API基础URL,如果为None则使用默认URL
model_name: 要使用的模型名称
vector_index_dir: 向量索引目录,如果为None则使用默认目录
"""
# 初始化LLM
llm_params = {
"temperature": 0.2, # 降低随机性,使结果更确定
"top_p": 0.7,
"model": model_name
}
# 如果提供了API密钥,则使用提供的密钥
if api_key:
llm_params["api_key"] = api_key
# 如果提供了自定义URL,则使用提供的URL
if base_url:
llm_params["base_url"] = base_url
self._llm = OpenAiLLM(**llm_params)
# 加载suffix关键词
self._suffix_keywords = self._load_suffix_keywords()
# 异步检索器将在create方法中初始化
self._noun_retriever = None
self._api_key = api_key
self._vector_index_dir = vector_index_dir
@classmethod
async def create(cls, api_key: str = None, base_url: str = None, model_name: str = "gpt-3.5-turbo", vector_index_dir: str = None):
"""
异步工厂方法:创建并初始化异步意图识别器实例
Args:
api_key: OpenAI API密钥,如果为None则从环境变量获取
base_url: OpenAI API基础URL,如果为None则使用默认URL
model_name: 要使用的模型名称
vector_index_dir: 向量索引目录,如果为None则使用默认目录
Returns:
初始化完成的AsyncIntentRecognizer实例
"""
instance = cls(api_key, base_url, model_name, vector_index_dir)
# 异步初始化名词检索器
instance._noun_retriever = await AsyncProfessionalNounRetriever.create(
api_key=api_key,
index_dir=vector_index_dir
)
return instance
def _load_suffix_keywords(self, filepath: str = None) -> List[str]:
"""
加载后缀关键词列表
Args:
filepath: 后缀关键词文件路径,默认为None使用默认路径
Returns:
后缀关键词列表
"""
try:
# 如果未指定路径,使用默认路径
if filepath is None:
current_dir = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.join(current_dir, "..", "..", "data", "nouns", "suffix_keywords.json")
# 读取JSON文件
with open(filepath, "r", encoding="utf-8") as f:
suffix_data = json.load(f)
# 添加额外的固定后缀
return suffix_data
except Exception as e:
raise RuntimeError(f"加载后缀关键词失败: {e}") from e
async def _classify_intent_async(self, query: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Classification:
"""
异步对用户输入进行意图分类
Args:
content: 用户输入内容
keywords: 匹配到的关键词列表
rewrite: 重写的问题
Returns:
分类结果
"""
classification_start_time = time.time()
classification_parser = PydanticOutputParser(pydantic_object=Classification)
formatted_prompt = classification_prompt.format(user_input=query,
classification_info=classification_info,
output_format=classification_parser.get_format_instructions(),
conversation_context=conversation_context,
chat_history=json.dumps(chat_history, ensure_ascii=False))
# 解析输出
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
classification_end_time = time.time()
classification_time = classification_end_time - classification_start_time
logging.info(f"异步意图分类耗时统计 - 总耗时: {classification_time:.2f}")
# 尝试直接解析JSON响应
parsed_output = classification_parser.parse(response.content.strip())
return parsed_output
except Exception as e:
raise RuntimeError(f"解析分类结果时出错: {e}") from e
def _tokenize_with_jieba(self, query: str) -> List[str]:
"""
使用jieba分词器对查询进行分词
Args:
query: 用户查询
Returns:
分词后的词语列表
"""
# 使用jieba进行分词
seg_list = jieba.cut(query, cut_all=False)
# 过滤掉停用词和标点符号
filtered_tokens = []
for token in seg_list:
# 过滤掉空格和标点符号
if token.strip() and not re.match(r'^[^\w\s]$', token):
filtered_tokens.append(token)
return filtered_tokens
async def _extract_keywords_with_llm_async(self, query: str, use_jieba: bool = False) -> List[Term]:
"""
异步使用LLM从用户查询中提取专业关键词
Args:
query: 用户查询
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
提取的术语列表
"""
# 如果使用jieba分词
if use_jieba:
# 先使用jieba分词
tokens = self._tokenize_with_jieba(query)
# 构建术语列表
terms = []
for token in tokens:
if len(token) > 1: # 过滤掉单字词
terms.append(Term(name=token, synonymous=[], description=""))
return terms
else:
# 使用LLM提取关键词
# 准备提示词
formatted_prompt = extract_nouns_prompt.replace("{content}", query)
terms_list_parser = PydanticOutputParser(pydantic_object=TermList)
formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions())
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 尝试使用Pydantic解析器解析TermList
parsed_output = terms_list_parser.parse(response.content)
return parsed_output.terms
async def _rerank_matched_terms_async(self, query_key: str, matched_terms: set, top_k: int = 2, rerank_score:float = 0.6) -> List[Term]:
"""
异步对召回的专业术语进行重排序,按与用户查询的相关性排序
Args:
query: 用户查询
matched_terms: 匹配到的专业术语集合
query_keys: 用户查询中提取的关键词列表
Returns:
重排序后的专业术语列表
"""
if not matched_terms:
return []
if len(matched_terms) <= top_k:
return list(matched_terms)
try:
# 将每个术语转换为可用于重排序的文本表示
term_texts = ["名称:" + term.name + "|" + "同义词:" + ";".join(term.synonymous) for term in matched_terms]
# 使用异步重排序模型
rerank_results = await XinferenceReRankerModel.rerank_async(query_key, term_texts, top_k=top_k)
# 将matched_terms转换为列表以便按索引访问
matched_terms_list = list(matched_terms)
# 根据重排序结果获取排序后的术语列表
reranked_terms = [matched_terms_list[result["index"]] for result in rerank_results if result["score"] >= rerank_score]
return reranked_terms
except Exception as e:
raise RuntimeError(f"异步_rerank_matched_terms重排失败:{e}") from e
async def _match_keywords_async(self, query: str, use_jieba: bool = False) -> Tuple[TermList, List[str]]:
"""
异步从用户问题中匹配关键词,结合LLM提取和向量检索
Args:
query: 用户问题
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
匹配到的关键词列表
"""
start_time = time.time()
query_keys=[]
# 步骤1: 使用LLM提取查询中的关键词
try:
llm_start_time = time.time()
extracted_terms = await self._extract_keywords_with_llm_async(query, use_jieba)
for term in extracted_terms:
query_keys.append(term.name)
llm_end_time = time.time()
llm_time = llm_end_time - llm_start_time
except Exception as e:
raise RuntimeError(f"异步LLM关键词提取失败: {e}") from e
matched_terms = [] # 存储匹配到的Term对象
# 步骤2: 使用向量检索找到相似的专业名词
try:
vector_start_time = time.time()
# 对matched_terms中的每个关键字进行向量检索
for current_key in query_keys:
vector_results = await self._noun_retriever.query_async(current_key, top_k=5, use_intersection=False)
current_key_terms = set()
# 添加向量检索结果
for result in vector_results:
if isinstance(result.get('synonymous', []), str):
result['synonymous'] = result['synonymous'].split(';')
term = Term(
name=result.get('name'),
synonymous=result.get('synonymous', []),
description=result.get('description', '')
)
current_key_terms.add(term)
if len(current_key_terms) > 0:
reranked_terms = await self._rerank_matched_terms_async(current_key, current_key_terms)
matched_terms.extend(reranked_terms)
vector_end_time = time.time()
vector_time = vector_end_time - vector_start_time
except Exception as e:
raise RuntimeError(f"异步向量检索关键词时出错: {e}") from e
# 提取所有Term对象的名称并排序
# 将set类型的matched_terms转换为TermList类型
term_list = TermList(terms=list(matched_terms))
end_time = time.time()
total_time = end_time - start_time
# 输出整合的时间日志
logging.info(f"异步关键词匹配耗时统计 - 总耗时: {total_time:.2f}秒, 问题关键词提取: {llm_time:.2f}秒, 向量检索+重排序: {vector_time:.2f}")
return term_list, query_keys
async def _rewrite_query_async(self, query: str, keywords: TermList, query_keys:List[str], chat_history: List[Dict[str, str]] = None, context: str = "") -> QueryRewrite:
"""
异步对用户问题进行改写
Args:
query: 用户原始问题
keywords: 匹配到的关键词列表
query_keys: 用户查询中提取的关键词列表
Returns:
改写结果
"""
rewrite_start_time = time.time()
# 准备问题改写提示
terms_dict = [term.model_dump(exclude={"description"}) for term in keywords.terms]
keywords_str = json.dumps(terms_dict, ensure_ascii=False)
query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite)
formatted_prompt = query_rewrite_prompt_pro.format(query=query,
output_format=query_rewrite_parser.get_format_instructions(),
keywords=keywords_str,
chat_history=chat_history,
context=context)
# 解析输出
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 尝试直接解析JSON响应
parsed_output = query_rewrite_parser.parse(response.content)
rewrite_end_time = time.time()
rewrite_time = rewrite_end_time - rewrite_start_time
logging.info(f"异步问题改写耗时统计 - 总耗时: {rewrite_time:.2f}")
return parsed_output
except Exception as e:
raise RuntimeError(f"解析问题改写结果时出错: {e}") from e
def _judge_define_suffix(self, input_str: str) -> Tuple[bool, List[str]]:
"""
判断输入字符串是否包含定义的后缀,并返回所有匹配到的后缀名列表
Args:
input_str: 输入字符串
Returns:
Tuple[bool, List[str]]: (是否包含定义的后缀, 匹配到的后缀名列表)
"""
# 构建正则表达式模式,匹配大小写不敏感且前面可能带有.
pattern = r'(?:\.?)(' + '|'.join(re.escape(field.get('name')) for field in self._suffix_keywords) + r')'
# 使用 re.IGNORECASE 标志来忽略大小写,findall找到所有匹配
matches = re.finditer(pattern, input_str, re.IGNORECASE)
matched_suffixes = [match.group(1) for match in matches]
return bool(matched_suffixes), matched_suffixes
async def process_query_async(self, query: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None,
use_jieba: bool = False,
enable_query_expansion: bool = False) -> Dict[str, Any]:
"""
异步处理用户问题的完整流程
Args:
query: 用户原始问题
conversation_context: 会话背景信息
chat_history: 历史对话记录,格式为[{"user": "content"}, {"assistant": "content"}]
previous_slots: 历史槽位信息
use_jieba: 是否使用jieba分词辅助提取关键词
enable_query_expansion: 是否启用查询扩展
Returns:
包含分类、关键词、改写和槽位填充结果的字典
"""
if chat_history is None:
chat_history = []
if previous_slots is None:
previous_slots = {}
# 步骤: 并行执行提问扩展
query_expand_tasks = []
if enable_query_expansion:
# 创建异步任务并立即开始执行
query_expand_tasks = [
# 5.1: 后退提示
asyncio.create_task(self._generate_step_back_prompt_async(query, chat_history, conversation_context)),
# 5.2: Follow Up Questions
asyncio.create_task(self._generate_follow_up_questions_async(query, chat_history, conversation_context)),
# 5.3: HyDE
asyncio.create_task(self._generate_hypothetical_document_async(query, chat_history, conversation_context)),
# 5.4: 多问题查询
asyncio.create_task(self._generate_multi_questions_async(query, chat_history, conversation_context))
]
# 步骤1-3: 并行执行关键词匹配、问题改写和意图分类
keywords_task = self._match_keywords_async(query, use_jieba)
# 等待关键词匹配完成
keywords_terms, query_keys = await keywords_task
# 步骤2: 问题改写
rewrite_task = self._rewrite_query_async(
query=query,
keywords=keywords_terms,
query_keys=query_keys,
chat_history=chat_history,
context=conversation_context
)
# 等待问题改写完成
rewrite = await rewrite_task
# 步骤3: 进行意图分类
classification_task = self._classify_intent_async(rewrite.rewrite, conversation_context, chat_history, previous_slots)
classification = await classification_task
# 步骤4: 进行槽位填充
# 如果是有效分类,进行槽位填充
slot_filling_result = {}
if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]:
slot_filling_result = await self._fill_slots_async(rewrite.rewrite, classification, conversation_context, chat_history, previous_slots)
if not enable_query_expansion:
return {
"classification": classification.model_dump(),
"keywords": keywords_terms.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys,
"slot_filling": slot_filling_result
}
# 等待所有query_expand_tasks完成
start_time = time.time()
query_expand_results = await asyncio.gather(*query_expand_tasks)
end_time = time.time()
logging.info(f"异步问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}")
# 收集结果
step_back_result = query_expand_results[0] if query_expand_results[0] else StepBackPrompt(original_query=query, step_back_query=query)
follow_up_result = query_expand_results[1] if query_expand_results[1] else FollowUpQuestions(original_query=query, follow_up_query=query)
hyde_result = query_expand_results[2] if query_expand_results[2] else HypotheticalDocument(original_query=query, hypothetical_answer="")
multi_questions_result = query_expand_results[3] if query_expand_results[3] else MultiQuestions(original_query=query, sub_questions=[query])
all_questions = multi_questions_result.sub_questions
all_questions.append(query)
all_questions.append(step_back_result.step_back_query)
all_questions.append(follow_up_result.follow_up_query)
all_questions.append(hyde_result.hypothetical_answer)
all_questions = list(set(all_questions))
query_expand = {
"all": all_questions,
"step_back": step_back_result.model_dump(),
"follow_up": follow_up_result.model_dump(),
"hyde": hyde_result.model_dump(),
"multi_questions": multi_questions_result.model_dump()
}
# 返回所有结果
return {
"classification": classification.model_dump(),
"keywords": keywords_terms.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys,
"slot_filling": slot_filling_result,
"query_expand": query_expand
}
async def _fill_slots_async(self, query: str, classification: Classification, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None,) -> Dict[str, Any]:
"""
异步根据分类结果对问题进行槽位填充
Args:
query: 用户原始问题
classification: 意图分类结果
Returns:
填充后的槽位数据模型
"""
# 根据分类结果选择对应的数据模型
slot_model = self._get_slot_model(classification)
if not slot_model:
raise RuntimeError("未找到匹配的槽位模型")
fill_slots_start_time = time.time()
# 使用LLM进行槽位填充
filled_slots = await self._fill_slots_with_llm_async(query, classification, slot_model, conversation_context, chat_history, previous_slots)
fill_slots_end_time = time.time()
fill_slots_time = fill_slots_end_time - fill_slots_start_time
logging.info(f"异步槽位填充耗时统计 - 总耗时: {fill_slots_time:.2f}")
# 检查必填槽位是否都已填充
is_complete, missing_slots = filled_slots.check_required_slots()
return {
"is_complete": is_complete,
"missing_slots": missing_slots,
"filled_data": filled_slots.model_dump()
}
def _get_slot_model(self, classification: Classification) -> Optional[type]:
"""
根据分类结果获取对应的槽位模型类,用于统一提示词处理
Args:
classification: 意图分类结果
Returns:
对应的槽位模型类
"""
# 软件问题
if classification.vertical_classification == "软件问题":
if classification.sub_classification == "软件功能":
return SoftwareFunctionSlots
elif classification.sub_classification == "故障排查":
return SoftwareTroubleShootingSlots
# 业务问题
elif classification.vertical_classification == "业务问题":
if classification.sub_classification == "专业咨询":
return ProfessionalConsultingSlots
elif classification.sub_classification == "数据问题":
return DataProblemSlots
# 安装下载注册
elif classification.vertical_classification == "安装下载注册":
if classification.sub_classification == "后缀名咨询":
return FileExtensionConsultingSlots
elif classification.sub_classification == "软件锁类":
return SoftwareLockSlots
elif classification.sub_classification == "安装下载类":
return InstallationDownloadSlots
elif classification.sub_classification == "问题排查类":
return ProblemDiagnosisSlots
# 其他
elif classification.vertical_classification == "其他":
return OtherSlots
return None
async def _fill_slots_with_llm_async(self, query: str,
classification: Classification,
slot_model_class: type,
conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Any:
"""
异步使用LLM进行槽位填充
Args:
query: 用户原始问题
classification: 意图分类结果
slot_model_class: 槽位模型类
Returns:
填充后的槽位数据模型实例
"""
# 准备提示词
slot_parser = PydanticOutputParser(pydantic_object=slot_model_class)
formatted_prompt = slot_filling_prompt.format(
query=query,
vertical_classification=classification.vertical_classification,
sub_classification=classification.sub_classification,
output_format=slot_parser.get_format_instructions(),
conversation_context=conversation_context,
chat_history=json.dumps(chat_history,ensure_ascii=False),
previous_slots=json.dumps(previous_slots,ensure_ascii=False),
)
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 尝试解析LLM响应
parsed_output = slot_parser.parse(response.content)
return parsed_output
except Exception as e:
# 如果解析失败,创建一个空的模型实例
empty_instance = slot_model_class()
return empty_instance
async def _generate_step_back_prompt_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> StepBackPrompt:
"""
异步生成后退提示
Args:
query: 用户原始问题
chat_history: 历史对话记录
conversation_context: 会话背景信息
Returns:
后退提示结果
"""
step_back_start_time = time.time()
# 准备提示词
step_back_parser = PydanticOutputParser(pydantic_object=StepBackPrompt)
formatted_prompt = step_back_prompt.format(
query=query,
chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]",
conversation_context=conversation_context,
output_format=step_back_parser.get_format_instructions()
)
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 解析输出
parsed_output = step_back_parser.parse(response.content)
step_back_end_time = time.time()
step_back_time = step_back_end_time - step_back_start_time
logging.debug(f"异步后退提示生成耗时统计 - 总耗时: {step_back_time:.2f}")
return parsed_output
except Exception as e:
# 如果解析失败,返回原始查询作为后退提示
logging.error(f"异步后退提示生成失败: {e}", exc_info=True)
return StepBackPrompt(original_query=query, step_back_query=query)
async def _generate_follow_up_questions_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> FollowUpQuestions:
"""
异步生成后续问题
Args:
query: 用户原始问题
chat_history: 历史对话记录
conversation_context: 会话背景信息
Returns:
后续问题结果
"""
follow_up_start_time = time.time()
# 准备提示词
follow_up_parser = PydanticOutputParser(pydantic_object=FollowUpQuestions)
formatted_prompt = follow_up_questions_prompt.format(
query=query,
chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]",
conversation_context=conversation_context,
output_format=follow_up_parser.get_format_instructions()
)
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 解析输出
parsed_output = follow_up_parser.parse(response.content)
follow_up_end_time = time.time()
follow_up_time = follow_up_end_time - follow_up_start_time
logging.debug(f"异步后续问题生成耗时统计 - 总耗时: {follow_up_time:.2f}")
return parsed_output
except Exception as e:
# 如果解析失败,返回原始查询作为后续问题
logging.error(f"异步后续问题生成失败: {e}", exc_info=True)
return FollowUpQuestions(original_query=query, follow_up_query=query)
async def _generate_hypothetical_document_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> HypotheticalDocument:
"""
异步生成假设性文档
Args:
query: 用户原始问题
chat_history: 历史对话记录
conversation_context: 会话背景信息
Returns:
假设性文档结果
"""
hyde_start_time = time.time()
# 准备提示词
hyde_parser = PydanticOutputParser(pydantic_object=HypotheticalDocument)
formatted_prompt = hyde_prompt.format(
query=query,
chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]",
conversation_context=conversation_context,
output_format=hyde_parser.get_format_instructions()
)
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 解析输出
parsed_output = hyde_parser.parse(response.content)
hyde_end_time = time.time()
hyde_time = hyde_end_time - hyde_start_time
logging.debug(f"异步假设性文档生成耗时统计 - 总耗时: {hyde_time:.2f}")
return parsed_output
except Exception as e:
# 如果解析失败,返回空的假设性回答
logging.error(f"异步假设性文档生成失败: {e}", exc_info=True)
return HypotheticalDocument(original_query=query, hypothetical_answer="")
async def _generate_multi_questions_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> MultiQuestions:
"""
异步生成多角度问题
Args:
query: 用户原始问题
chat_history: 历史对话记录
conversation_context: 会话背景信息
Returns:
多角度问题结果
"""
multi_questions_start_time = time.time()
# 准备提示词
multi_questions_parser = PydanticOutputParser(pydantic_object=MultiQuestions)
formatted_prompt = multi_questions_prompt.format(
query=query,
chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]",
conversation_context=conversation_context,
output_format=multi_questions_parser.get_format_instructions()
)
try:
# 异步调用LLM
response = await self._llm.invoke_async(formatted_prompt, False)
# 解析输出
parsed_output = multi_questions_parser.parse(response.content)
multi_questions_end_time = time.time()
multi_questions_time = multi_questions_end_time - multi_questions_start_time
logging.debug(f"异步多角度问题生成耗时统计 - 总耗时: {multi_questions_time:.2f}")
return parsed_output
except Exception as e:
# 如果解析失败,返回原始查询作为唯一子问题
logging.error(f"异步多角度问题生成失败: {e}", exc_info=True)
return MultiQuestions(original_query=query, sub_questions=[query])
async def _process_intent_and_slot_async(self, user_input: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Dict[str, Any]:
"""
异步使用统一提示词同时进行意图识别和槽位填充
Args:
user_input: 当前用户输入
conversation_context: 会话背景信息
chat_history: 历史对话记录,格式为[{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
previous_slots: 历史槽位信息
Returns:
包含意图分类和槽位填充结果的字典
"""
# 初始化默认值
if chat_history is None:
chat_history = []
if previous_slots is None:
previous_slots = {}
# 生成槽位映射文档
slot_mapping_doc = generate_slot_mapping_doc()
# 准备提示词
parser = PydanticOutputParser(pydantic_object=IntentAndSlotResult)
formatted_prompt = intent_and_slot_prompt.format(
conversation_context=conversation_context,
chat_history=json.dumps(chat_history, ensure_ascii=False),
previous_slots=json.dumps(previous_slots, ensure_ascii=False),
user_input=user_input,
slot_mapping_doc=slot_mapping_doc,
output_format=parser.get_format_instructions(),
classification_info=classification_info
)
# 异步调用LLM
llm_start_time = time.time()
response = await self._llm.invoke_async(formatted_prompt + output_example, False)
llm_end_time = time.time()
llm_time = llm_end_time - llm_start_time
try:
# 解析LLM响应为JSON
result_json = parser.parse(response.content)
classification = result_json.classification
slot_filling = result_json.slots
is_complete, missing_slots = slot_filling.check_required_slots()
expected_slot_model = self._get_slot_model(classification)
# 添加容错处理,发生概率较低,但仍需处理
if expected_slot_model is None:
# 添加容错处理,应对LLM返回错误分类信息,一级分类跟二级分类错乱
# 重新分类
classification = await self._classify_intent_async(user_input, conversation_context, chat_history, previous_slots)
fill_slots = await self._fill_slots_async(user_input, classification, conversation_context, chat_history, previous_slots)
result = {
"classification": classification.model_dump(),
"slot_filling": fill_slots
}
logging.warning(f"异步重新分类与槽点填充")
return result
elif expected_slot_model.__name__ != type(slot_filling).__name__:
# 添加容错处理,应对LLM槽位与分类不匹配。重新填充槽位
slot_filling = await self._fill_slots_async(user_input, classification, conversation_context, chat_history, previous_slots)
result = {
"classification": classification.model_dump(),
"slot_filling": slot_filling
}
logging.warning(f"异步重新填充槽点")
return result
logging.info(f"异步意图识别+槽位LLM调用耗时: {llm_time:.2f}")
# 构建最终结果
result = {
"classification": classification.model_dump(),
"slot_filling": {
"is_complete": is_complete,
"missing_slots": missing_slots,
"filled_data": slot_filling.model_dump()
}
}
return result
except Exception as e:
raise RuntimeError(f"异步process_intent_and_slot error:{e}") from e
@@ -10,11 +10,13 @@ Description: 专业名词向量化和检索的核心逻辑
import os
import json
import shutil
import asyncio
from typing import List, Dict, Any, Tuple, Optional
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import FAISS
from rag2_0.tool.ModelTool import SiliconFlowEmbeddings
import logging
import httpx
def get_embedding_model(api_key: str = None) -> Embeddings:
"""
@@ -350,4 +352,148 @@ class ProfessionalNounRetriever:
except Exception as e:
logging.error(f"查询FAISS索引失败: {e}", exc_info=True)
return []
return []
class AsyncProfessionalNounRetriever:
"""异步专业名词检索类"""
def __init__(self,
embedding_model: Optional[Embeddings] = None,
api_key: str = None,
index_dir: str = None):
"""
初始化异步检索器
Args:
embedding_model: 嵌入模型,如果为None则使用默认模型
api_key: SiliconFlow API密钥,仅在embedding_model为None时使用
index_dir: 索引目录路径,默认为None使用默认路径
"""
# 设置嵌入模型
if embedding_model:
self.embedding_model = embedding_model
else:
self.embedding_model = get_embedding_model(api_key)
# 设置索引路径
self.index_dir = index_dir
if self.index_dir is None:
current_dir = os.path.dirname(os.path.abspath(__file__))
self.index_dir = os.path.join(current_dir, "..", "..", "data", "nouns", "professional_nouns_index")
# 初始化索引为None,不在构造函数中加载
self.faiss_index = None
@classmethod
async def create(cls,
embedding_model: Optional[Embeddings] = None,
api_key: str = None,
index_dir: str = None):
"""
异步工厂方法:创建并初始化异步检索器实例
Args:
embedding_model: 嵌入模型,如果为None则使用默认模型
api_key: SiliconFlow API密钥,仅在embedding_model为None时使用
index_dir: 索引目录路径,默认为None使用默认路径
Returns:
初始化完成的AsyncProfessionalNounRetriever实例
"""
instance = cls(embedding_model, api_key, index_dir)
await instance._load_index_async()
return instance
async def _load_index_async(self) -> None:
"""
异步从本地加载FAISS索引 (内部方法)
"""
try:
# 由于FAISS加载可能是CPU密集型操作,使用线程池执行器来避免阻塞事件循环
self.faiss_index = await asyncio.to_thread(
FAISS.load_local,
folder_path=self.index_dir,
embeddings=self.embedding_model,
allow_dangerous_deserialization=True
)
logging.info(f"异步成功从 {self.index_dir} 加载FAISS索引")
except Exception as e:
logging.warning(f"异步加载FAISS索引失败: {e}")
self.faiss_index = None
async def _invoke_retriever_async(self, retriever, query_text: str):
"""
异步调用检索器 (内部方法)
Args:
retriever: 检索器实例
query_text: 查询文本
Returns:
检索结果
"""
# 由于LangChain的retriever.invoke可能不是异步的,使用线程池执行器
return await asyncio.to_thread(retriever.invoke, query_text)
async def query_async(self, query_text: str, top_k: int = 5, use_intersection: bool = True) -> List[Dict]:
"""
异步查询FAISS索引,获取最相似的专业名词
Args:
query_text: 查询文本
top_k: 返回的结果数量,默认为5
use_intersection: 是否使用三种检索方式的交集,默认为True
Returns:
相似度最高的专业名词列表
"""
try:
# 检查索引是否已加载
if self.faiss_index is None:
logging.warning("FAISS索引未加载,尝试加载...")
await self._load_index_async()
if self.faiss_index is None:
logging.warning("异步加载FAISS索引失败,无法执行查询")
return []
# 使用三种检索方式并取交集
retriever1 = self.faiss_index.as_retriever(search_kwargs={"k": top_k})
retriever2 = self.faiss_index.as_retriever(
search_type="mmr",
search_kwargs={"k": top_k, "fetch_k": 3, "lambda_mult": 0.5}
)
retriever3 = self.faiss_index.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.5}
)
# 并行执行三个检索任务
results = await asyncio.gather(
self._invoke_retriever_async(retriever1, query_text),
self._invoke_retriever_async(retriever2, query_text),
self._invoke_retriever_async(retriever3, query_text)
)
# 用json.dumps将dict转为字符串,便于取交集
set1 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[0])
set2 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[1])
set3 = set(json.dumps(i.metadata, sort_keys=True, ensure_ascii=False) for i in results[2])
# 如果use_intersection为True,取交集;否则取并集
if use_intersection:
intersection = set1 & set2 & set3
else:
intersection = set1 | set2 | set3
# 如果交集为空,使用第一种检索方式的结果
if not intersection:
logging.warning("三种检索方式无交集,使用普通检索结果")
return [json.loads(item) for item in set1]
# 转回dict
return [json.loads(item) for item in intersection]
except Exception as e:
logging.error(f"异步查询FAISS索引失败: {e}", exc_info=True)
return []
+1 -1
View File
@@ -1,5 +1,5 @@
#!/usr/bin/env python
from .ProfessionalNounVector import ProfessionalNounVectorizer, ProfessionalNounRetriever
from .IntentRecognition import IntentRecognizer
from .IntentRecognition import IntentRecognizer, AsyncIntentRecognizer
from .DataModels import Term, TermList, Classification, QueryRewrite
+157
View File
@@ -8,7 +8,9 @@ Description: 模型工具类
"""
from openai import OpenAI
from openai import AsyncOpenAI
import httpx
import asyncio
import time
import logging # 导入 logging 模块
from langchain.embeddings.base import Embeddings
@@ -41,12 +43,34 @@ class SiliconFlowEmbeddings(Embeddings):
data = response.json()
return [item["embedding"] for item in data["data"]]
async def _embed_async(self, input: List[str]) -> List[List[float]]:
"""异步嵌入方法"""
payload = {
"model": self.model,
"input": input,
"encoding_format": "float"
}
async with httpx.AsyncClient() as client:
response = await client.post(self.url, json=payload, headers=self.headers)
response.raise_for_status()
data = response.json()
return [item["embedding"] for item in data["data"]]
def embed_documents(self, texts: List[str]) -> List[List[float]]:
return self._embed(texts)
async def embed_documents_async(self, texts: List[str]) -> List[List[float]]:
"""异步嵌入多个文档"""
return await self._embed_async(texts)
def embed_query(self, text: str) -> List[float]:
return self._embed([text])[0]
async def embed_query_async(self, text: str) -> List[float]:
"""异步嵌入单个查询"""
result = await self._embed_async([text])
return result[0]
class SiliconFlowReRankerModel:
@staticmethod
def rerank(query: str, documents: List[str], top_k: int = 10) -> List[str]:
@@ -84,6 +108,44 @@ class SiliconFlowReRankerModel:
except requests.exceptions.RequestException as e:
logging.error(f"重排序请求失败: {str(e)}", exc_info=True)
return []
@staticmethod
async def rerank_async(query: str, documents: List[str], top_k: int = 10) -> List[str]:
"""
使用硅流重排模型对文档进行异步重新排序
Args:
query: 用户查询文本
documents: 需要重新排序的文档列表
top_k: 返回排序后的前k个文档
Returns:
List[dict]: 重排序后的文档列表,每个元素包含document内容、相关性分数和原始索引
"""
url = "https://api.siliconflow.cn/v1/rerank"
payload = {
"model": "BAAI/bge-reranker-v2-m3",
"query": query,
"documents": documents,
"top_n": top_k,
"max_chunks_per_doc": 1024,
"overlap_tokens": 80,
"return_documents": True
}
api_key = APIKeyManager.get_api_key()
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, headers=headers)
response.raise_for_status()
results = response.json()
return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]]
except httpx.RequestError as e:
logging.error(f"异步重排序请求失败: {str(e)}", exc_info=True)
return []
class XinferenceReRankerModel:
"""重排模型封装"""
@@ -122,6 +184,39 @@ class XinferenceReRankerModel:
logging.error(f"XinferenceReRankerModel重排序请求失败: {str(e)}")
return []
@staticmethod
async def rerank_async(query: str, documents: List[str], top_k: int = 10) -> List[str]:
"""
使用重排序模型对文档进行异步重新排序
Args:
query: 用户查询文本
documents: 需要重新排序的文档列表
top_k: 返回排序后的前k个文档
Returns:
List[dict]: 重排序后的文档列表,每个元素包含document内容、相关性分数和原始索引
"""
url = "http://172.20.0.145:9995/v1/rerank"
params = {"documents": documents, "query": query, "top_n": top_k, "return_documents": True, "model": "bge-reranker-v2-m3"}
headers = {
"Authorization": "Bearer <token>", # 这里需要替换为实际的token
"Content-Type": "application/json"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=params, headers=headers)
response.raise_for_status() # 检查响应状态
results = response.json()
# 返回重排序后的文档列表
return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]]
except httpx.RequestError as e:
logging.error(f"XinferenceReRankerModel异步重排序请求失败: {str(e)}")
return []
class OpenAiLLM:
@@ -189,6 +284,47 @@ class OpenAiLLM:
except Exception as e:
raise RuntimeError(f"OpenAiLLM:invoke:error:{str(e)}.api_key:{api_key}") from e
async def invoke_async(self, user_prompt="你是谁?", need_retry=True):
"""异步调用OpenAI API"""
max_retries = 3
retry_count = 0
if "timeout" not in self._kwargs:
timeout = httpx.Timeout(300.0)
self._kwargs["timeout"] = timeout
if need_retry:
while retry_count < max_retries:
try:
api_key = APIKeyManager.get_api_key()
# 使用异步客户端
async with AsyncOpenAI(api_key=api_key, base_url=self._url) as client:
# 创建异步Completion请求
completion = await client.chat.completions.create(
model=self._model,
messages=[{'role': 'user', 'content': user_prompt}],
**self._kwargs
)
return completion.choices[0].message
except Exception as e:
retry_count += 1
if retry_count == max_retries:
raise RuntimeError(f"OpenAiLLM:invoke_async:error:{str(e)}.api_key:{api_key}") from e
else:
await asyncio.sleep(5*retry_count) # 异步等待
else:
try:
api_key = APIKeyManager.get_api_key()
async with AsyncOpenAI(api_key=api_key, base_url=self._url) as client:
completion = await client.chat.completions.create(
model=self._model,
messages=[{'role': 'user', 'content': user_prompt}],
**self._kwargs
)
return completion.choices[0].message
except Exception as e:
raise RuntimeError(f"OpenAiLLM:invoke_async:error:{str(e)}.api_key:{api_key}") from e
if __name__ == "__main__":
# 测试重排模型
reranker = SiliconFlowReRankerModel()
@@ -202,4 +338,25 @@ if __name__ == "__main__":
print(f"{idx+1}. 文档: {item['document']}, 分数: {item['score']}")
print("-" * 50)
# 异步测试示例
async def test_async():
# 测试异步嵌入
api_key = APIKeyManager.get_api_key()
embeddings = SiliconFlowEmbeddings(api_key=api_key)
query_embedding = await embeddings.embed_query_async("测试查询")
print(f"异步嵌入向量维度: {len(query_embedding)}")
# 测试异步重排序
results = await SiliconFlowReRankerModel.rerank_async(query, documents)
print(f"异步重排序结果数量: {len(results)}")
# 测试异步LLM调用
llm = OpenAiLLM()
response = await llm.invoke_async("你好,请简单介绍一下自己")
print(f"异步LLM响应: {response.content}")
# 如果需要运行异步测试,取消下面的注释
# import asyncio
# asyncio.run(test_async())