Files
QueryRewrite/rag2_0/intent_recognition/IntentRecognition.py
T

761 lines
32 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
File: IntentRecognition.py
Author: oyyz
Date: 2025-05-13
Description: 意图分类、改写核心逻辑
"""
import logging
import os
import asyncio
from langchain.output_parsers import PydanticOutputParser
from langchain_core.output_parsers import JsonOutputParser
import json
from typing import List, Tuple, Dict, Any, Optional
import re
import jieba
import time
from .PromptTemplates import (classification_prompt, query_rewrite_prompt_pro,
extract_nouns_prompt, classification_info,
slot_filling_prompt, step_back_prompt)
from .DataModels import (
Classification, QueryRewrite, Term, TermList,
SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots,
DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots,
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots,
StepBackPrompt
)
from rag2_0.tool.ModelTool import OpenAiLLM
class AsyncIntentRecognizer:
SOFT_WIKI_PATH = "data/wiki_data"
SOFT_NAMETOWIKI_MAP = {
"配网计价通D3软件": "配网计价通D3软件.txt",
"西藏计价通Z1软件": "西藏计价通Z1软件.txt",
"储能计价通C1软件": "储能计价通C1软件.txt",
"技改检修工程计价通T1软件": "技改检修工程计价通T1软件.txt",
"技改检修清单计价通T1软件": "技改检修清单计价通T1软件.txt",
"电力建设计价通软件": "电力建设计价通软件.txt",
"下载安装注册": "下载安装注册.txt",
}
"""
异步意图识别和问题改写类
"""
def __init__(self):
"""
初始化异步意图识别器
Args:
api_key: OpenAI API密钥,如果为None则从环境变量获取
base_url: OpenAI API基础URL,如果为None则使用默认URL
model_name: 要使用的模型名称
vector_index_dir: 向量索引目录,如果为None则使用默认目录
"""
base_url = os.getenv("OPENAI_API_BASE")
model_name = os.getenv("MODEL_NAME", "gpt-3.5-turbo")
# 初始化LLM
llm_params = {
"temperature": 0.4, # 降低随机性,使结果更确定
"top_p": 0.7,
"model": model_name,
"base_url": base_url
}
self._llm = OpenAiLLM(**llm_params)
# 加载suffix关键词
self._suffix_keywords = self._load_suffix_keywords()
# 加载软件词条名称库
self._soft_wiki_library = self._load_soft_wiki_library()
# 异步检索器将在create方法中初始化
# self._noun_retriever = None
# 初始化jieba自定义词典
self._init_jieba_dict()
self._synonymous_dict=self._init_synonymous_dict()
def _init_jieba_dict(self):
"""初始化jieba自定义词典"""
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
dict_path = os.path.join(current_dir, "..", "..", "data", "nouns", "all_synonymous_jieba.txt")
# 检查字典文件是否存在
if os.path.exists(dict_path):
jieba.load_userdict(dict_path)
logging.info("成功加载jieba自定义词典")
else:
logging.warning(f"自定义词典文件不存在: {dict_path}")
except Exception as e:
logging.error(f"加载jieba自定义词典失败: {e}")
def _init_synonymous_dict(self):
"""加载同义词,key是同义词 val:是对应名词"""
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
dict_path = os.path.join(current_dir, "..", "..", "data", "nouns", "merged_nouns.json")
# 检查字典文件是否存在
synonymous_dict={}
if os.path.exists(dict_path):
with open(dict_path, "r", encoding="utf-8") as f:
data = json.load(f)
for cur_data in data:
synonymous=cur_data["synonymous"]
name=cur_data["name"]
for cur_synonymous in synonymous:
synonymous_dict[cur_synonymous]=name
else:
logging.warning(f"名词库文件不存在: {dict_path}")
return synonymous_dict
except Exception as e:
logging.error(f"加载名词库文件失败: {e}")
return {}
def _load_soft_wiki_library(self):
"""
加载软件wiki库
"""
SOFT_WIKI_LIBRARY = {}
for soft_name, wiki_file_name in self.SOFT_NAMETOWIKI_MAP.items():
with open(f"{self.SOFT_WIKI_PATH}/{wiki_file_name}", "r", encoding="utf-8") as f:
lines = f.readlines()
# 去除空行
lines = [line.strip() for line in lines if line.strip()]
SOFT_WIKI_LIBRARY[soft_name] = lines
return SOFT_WIKI_LIBRARY
@classmethod
async def create(cls):
"""
异步工厂方法:创建并初始化异步意图识别器实例
Returns:
初始化完成的AsyncIntentRecognizer实例
"""
instance = cls()
# 异步初始化名词检索器
# instance._noun_retriever = await AsyncProfessionalNounRetriever.create()
return instance
def _load_suffix_keywords(self, filepath: str = None) -> List[str]:
"""
加载后缀关键词列表
Args:
filepath: 后缀关键词文件路径,默认为None使用默认路径
Returns:
后缀关键词列表
"""
try:
# 如果未指定路径,使用默认路径
if filepath is None:
current_dir = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.join(current_dir, "..", "..", "data", "nouns", "suffix_keywords.json")
# 读取JSON文件
with open(filepath, "r", encoding="utf-8") as f:
suffix_data = json.load(f)
# 添加额外的固定后缀
return suffix_data
except Exception as e:
raise RuntimeError(f"加载后缀关键词失败: {e}") from e
def _clean_llm_json(self, content: str) -> str:
"""
统一清洗LLM返回的JSON字符串:
1) 去首尾空白
2) 去除<think>…</think>段
3) 通过第一个"{"与最后一个"}"裁剪内容,移除首尾多余字符
4) 压缩所有空白字符
"""
content = content.strip()
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
# 裁剪到最外层花括号范围
start = content.find('{')
end = content.rfind('}')
if start != -1 and end != -1 and start < end:
content = content[start:end+1]
content = re.sub(r'\s+', '', content)
return content
async def _classify_intent_async(self, query: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Classification:
"""
异步对用户输入进行意图分类
Args:
content: 用户输入内容
keywords: 匹配到的关键词列表
rewrite: 重写的问题
Returns:
分类结果
"""
start_time = time.time() # 记录开始时间
classification_parser = PydanticOutputParser(pydantic_object=Classification)
formatted_prompt = classification_prompt.format(user_input=query,
classification_info=classification_info,
output_format=Classification.get_format_instructions(),
# conversation_context=conversation_context,
chat_history=json.dumps(chat_history, ensure_ascii=False))
# 解析输出
try:
# 异步调用LLM
# response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"})
# 尝试直接解析JSON响应
clean_output = self._clean_llm_json(response.content)
parsed_output = classification_parser.parse(clean_output)
# 计算并打印耗时
end_time = time.time()
logging.info(f"意图分类耗时: {end_time - start_time:.2f}秒")
return parsed_output
except Exception as e:
raise RuntimeError(f"解析分类结果时出错: {e}") from e
def _tokenize_with_jieba(self, query: str) -> List[str]:
"""
使用jieba分词器对查询进行分词
Args:
query: 用户查询
Returns:
分词后的词语列表
"""
# 使用jieba进行分词
seg_list = jieba.cut(query, cut_all=False)
# 过滤掉停用词和标点符号
filtered_tokens = []
for token in seg_list:
# 过滤掉空格和标点符号
if token.strip() and not re.match(r'^[^\w\s]$', token):
filtered_tokens.append(token)
return filtered_tokens
async def _extract_keywords_async(self, query: str, use_jieba: bool = False) -> List[Term]:
"""
异步使用LLM从用户查询中提取专业关键词
Args:
query: 用户查询
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
提取的术语列表
"""
# 如果使用jieba分词
if use_jieba:
# 先使用jieba分词
tokens = self._tokenize_with_jieba(query)
# 构建术语列表
terms = []
for token in tokens:
if len(token) > 1: # 过滤掉单字词
terms.append(Term(name=token, synonymous=[], description=""))
return terms
else:
# 使用LLM提取关键词
# 准备提示词
formatted_prompt = extract_nouns_prompt.replace("{content}", query)
terms_list_parser = PydanticOutputParser(pydantic_object=TermList)
formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions())
# 异步调用LLM
# response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"})
# 尝试使用Pydantic解析器解析TermList
clean_output = self._clean_llm_json(response.content)
parsed_output = terms_list_parser.parse(clean_output)
return parsed_output.terms
async def _match_keywords_async(self, query: str, use_jieba: bool = False) -> Tuple[TermList, List[str]]:
"""
异步从用户问题中匹配关键词,结合LLM提取和向量检索
Args:
query: 用户问题
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
匹配到的关键词列表
"""
start_time = time.time()
query_keys=[]
# 步骤1: 提取查询中的关键词
try:
llm_start_time = time.time()
extracted_terms = await self._extract_keywords_async(query, use_jieba)
for term in extracted_terms:
query_keys.append(term.name)
llm_end_time = time.time()
llm_time = llm_end_time - llm_start_time
except Exception as e:
raise RuntimeError(f"异步LLM关键词提取失败: {e}") from e
matched_terms = [] # 存储匹配到的Term对象
# 查找同义词
for cur_key in query_keys:
if cur_key not in self._synonymous_dict:
continue
name = self._synonymous_dict[cur_key]
matched_terms.append(Term(name=name,synonymous=[cur_key],description=""))
# 提取所有Term对象的名称并排序
# 将set类型的matched_terms转换为TermList类型
term_list = TermList(terms=list(matched_terms))
end_time = time.time()
total_time = end_time - start_time
# 输出整合的时间日志
# logging.info(f"异步关键词匹配耗时统计 - 总耗时: {total_time:.2f}秒")
return term_list, query_keys
async def _get_dinge_qingdan_info(self, query: str, chat_history: List[Dict[str, str]] = None) -> dict:
"""
获取问题中定额、清单相关信息
Args:
query: 用户查询
Returns:
指令详情字典,包含定额、清单相关信息
"""
start_time = time.time() # 记录开始时间
prompt=f"""当前提问内容:
<query>{query}</query>
对话上下文:
<chat_history>
{json.dumps(chat_history, ensure_ascii=False)}
</chat_history>
1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
2、请勿随机编造,如果没有提取到内容返回空的JSON
3、返回结果为json格式,必须严格以纯JSON格式输出
{{
"dinge_info_list":{{"dinge_code_list":["xxxx","xxxx"], "dinge_name_list":["xxxx","xxxx"]}},
"qingdan_info":{{"qingdan_code_list":["xxxx","xxxx"], "qingdan_name_list":["xxxx","xxxx"]}}
}}"""
try:
# response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"})
clean_output = self._clean_llm_json(response.content)
parsed_output = JsonOutputParser().parse(clean_output)
# 计算并打印耗时
end_time = time.time()
logging.info(f"获取定额清单信息耗时: {end_time - start_time:.2f}秒")
return parsed_output
except Exception as e:
# 发生异常时也记录耗时
logging.error(f"获取问题定额清单详情失败: {e}", exc_info=True)
parsed_output = {"dinge_info_list": [], "qingdan_info": []}
return parsed_output
async def _rewrite_query_async(self, query: str, keywords: TermList, query_keys:List[str], chat_history: List[Dict[str, str]] = None, context: str = "") -> QueryRewrite:
"""
异步对用户问题进行改写
Args:
query: 用户原始问题
keywords: 匹配到的关键词列表
query_keys: 用户查询中提取的关键词列表
Returns:
改写结果
"""
start_time = time.time()
# 准备问题改写提示
terms_dict = [term.model_dump(exclude={"description"}) for term in keywords.terms]
keywords_str = json.dumps(terms_dict, ensure_ascii=False)
query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite)
formatted_prompt = query_rewrite_prompt_pro.format(query=query,
output_format=QueryRewrite.get_format_instructions(),
keywords=keywords_str,
chat_history=chat_history,
context=context)
# 解析输出
try:
# 异步调用LLM
# response = await self._llm.ainvoke(formatted_prompt,response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(formatted_prompt,response_format={"type": "json_object"})
clean_output = self._clean_llm_json(response.content)
parsed_output = query_rewrite_parser.parse(clean_output)
end_time = time.time()
process_time=end_time-start_time
logging.info(f"异步问题改写耗时 - 耗时: {process_time:.2f}秒")
return parsed_output
except Exception as e:
raise RuntimeError(f"解析问题改写结果时出错: {e}") from e
def _process_lock_related_query(self, query: str) -> str:
"""
特殊处理锁相关咨询
"""
pattern = r'(?<!\d)(?:8116(?:\s*\d){8}|\d{2}\s*-\s*\d{6})(?!\d)'
matches = re.findall(pattern, query)
if not matches:
return query
lock_number = "、".join(matches)
return f"通过博微软件助手查询软件锁信息,锁注册号为{lock_number}"
async def process_query_async(self, query: str, conversation_context: Dict = None,
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None,
use_jieba: bool = False,
enable_query_expansion: bool = False,
cur_soft_name: str = "") -> Dict[str, Any]:
"""
异步处理用户问题的完整流程
Args:
query: 用户原始问题
conversation_context: 会话背景信息
chat_history: 历史对话记录,格式为[{"user": "content"}, {"assistant": "content"}]
previous_slots: 历史槽位信息
use_jieba: 是否使用jieba分词辅助提取关键词
enable_query_expansion: 是否启用查询扩展
cur_soft_name: 当前查询的软件名称
Returns:
包含分类、关键词、改写和槽位填充结果的字典
"""
if chat_history is None:
chat_history = []
if previous_slots is None:
previous_slots = {}
if conversation_context is None:
conversation_context = {}
# 步骤: 并行执行提问扩展
query_expand_tasks = []
if enable_query_expansion:
# 创建异步任务并立即开始执行
query_expand_tasks = [
# 后退提示
asyncio.create_task(self._generate_step_back_prompt_async(query, chat_history, conversation_context)),
# 文档查询
# asyncio.create_task(self._find_matching_software_docs_async(query, cur_soft_name, chat_history)),
]
# 执行关键词匹配
keywords_task = self._match_keywords_async(query, use_jieba)
# 等待关键词匹配完成
keywords_terms, query_keys = await keywords_task
# 步骤2-3: 并行执行问题改写和意图分类
rewrite_task = self._rewrite_query_async(
query=query,
keywords=keywords_terms,
query_keys=query_keys,
chat_history=chat_history,
context=conversation_context
)
classification_task = self._classify_intent_async(query, conversation_context, chat_history, previous_slots)
# 定额清单信息
dinge_qingdan_info_task = self._get_dinge_qingdan_info(query, chat_history)
# 并行等待问题改写和意图分类完成
rewrite, classification, dinge_qingdan_info = await asyncio.gather(rewrite_task, classification_task, dinge_qingdan_info_task)
# 特殊处理 锁相关咨询
if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类":
process_lock_start_time = time.time()
# 特殊处理提问只有锁号的问题,手动将问题改写为特定格式
rewrite.rewrite = self._process_lock_related_query(rewrite.rewrite)
process_lock_end_time = time.time()
process_lock_time = process_lock_end_time - process_lock_start_time
logging.info(f"锁相关咨询正则匹配 - 总耗时: {process_lock_time:.2f}秒")
slot_filling_result = {}
if not enable_query_expansion:
return {
"classification": classification.model_dump(),
"keywords": keywords_terms.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys,
"slot_filling": slot_filling_result,
"dinge_qingdan_info": dinge_qingdan_info
}
# 等待所有query_expand_tasks完成
start_time = time.time()
query_expand_results = await asyncio.gather(*query_expand_tasks)
end_time = time.time()
logging.info(f"问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}秒")
# 收集结果
step_back_result = query_expand_results[0] if query_expand_results[0] else StepBackPrompt(original_query=query, can_use_back_prompt=False, step_back_query=[query])
# wiki_result = query_expand_results[1] if query_expand_results[1] else []
all_questions=[]
all_questions.append(query)
all_questions.append(rewrite.rewrite)
# all_questions.extend(wiki_result)
all_questions.extend(step_back_result.step_back_query)
all_questions = list(set(all_questions))
query_expand = {
"all": all_questions,
"step_back": step_back_result.step_back_query,
# "wiki_title": wiki_result,
"original_query":query,
"rewrite_query":rewrite.rewrite
}
# 返回所有结果
return {
"classification": classification.model_dump(),
"keywords": keywords_terms.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys,
"slot_filling": slot_filling_result,
"query_expand": query_expand,
"dinge_qingdan_info": dinge_qingdan_info
}
async def _fill_slots_async(self, query: str, classification: Classification, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None,) -> Dict[str, Any]:
"""
异步根据分类结果对问题进行槽位填充
Args:
query: 用户原始问题
classification: 意图分类结果
Returns:
填充后的槽位数据模型
"""
# 根据分类结果选择对应的数据模型
slot_model = self._get_slot_model(classification)
if not slot_model:
raise RuntimeError("未找到匹配的槽位模型")
fill_slots_start_time = time.time()
# 使用LLM进行槽位填充
filled_slots = await self._fill_slots_with_llm_async(query, classification, slot_model, conversation_context, chat_history, previous_slots)
fill_slots_end_time = time.time()
fill_slots_time = fill_slots_end_time - fill_slots_start_time
logging.info(f"异步槽位填充耗时统计 - 总耗时: {fill_slots_time:.2f}秒")
# 检查必填槽位是否都已填充
is_complete, missing_slots = filled_slots.check_required_slots()
return {
"is_complete": is_complete,
"missing_slots": missing_slots,
"filled_data": filled_slots.model_dump()
}
def _get_slot_model(self, classification: Classification) -> Optional[type]:
"""
根据分类结果获取对应的槽位模型类,用于统一提示词处理
Args:
classification: 意图分类结果
Returns:
对应的槽位模型类
"""
# 软件问题
if classification.vertical_classification == "软件问题":
if classification.sub_classification == "软件功能":
return SoftwareFunctionSlots
elif classification.sub_classification == "故障排查":
return SoftwareTroubleShootingSlots
# 业务问题
elif classification.vertical_classification == "业务问题":
if classification.sub_classification == "专业咨询":
return ProfessionalConsultingSlots
elif classification.sub_classification == "数据问题":
return DataProblemSlots
# 安装下载注册
elif classification.vertical_classification == "安装下载注册":
if classification.sub_classification == "后缀名咨询":
return FileExtensionConsultingSlots
elif classification.sub_classification == "软件锁类":
return SoftwareLockSlots
elif classification.sub_classification == "安装下载类":
return InstallationDownloadSlots
elif classification.sub_classification == "问题排查类":
return ProblemDiagnosisSlots
# 其他
elif classification.vertical_classification == "其他":
return OtherSlots
return None
async def _fill_slots_with_llm_async(self, query: str,
classification: Classification,
slot_model_class: type,
conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Any:
"""
异步使用LLM进行槽位填充
Args:
query: 用户原始问题
classification: 意图分类结果
slot_model_class: 槽位模型类
Returns:
填充后的槽位数据模型实例
"""
# 准备提示词
slot_parser = PydanticOutputParser(pydantic_object=slot_model_class)
formatted_prompt = slot_filling_prompt.format(
query=query,
vertical_classification=classification.vertical_classification,
sub_classification=classification.sub_classification,
output_format=slot_parser.get_format_instructions(),
conversation_context=conversation_context,
chat_history=json.dumps(chat_history,ensure_ascii=False),
previous_slots=json.dumps(previous_slots,ensure_ascii=False),
)
try:
# 异步调用LLM
response = await self._llm.ainvoke(formatted_prompt,response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
# response = await self._llm.ainvoke(formatted_prompt, extra_body={"enable_thinking": False})
clean_output = self._clean_llm_json(response.content)
# 尝试解析LLM响应
parsed_output = slot_parser.parse(clean_output)
return parsed_output
except Exception as e:
# 如果解析失败,创建一个空的模型实例
empty_instance = slot_model_class()
return empty_instance
async def _generate_step_back_prompt_async(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> StepBackPrompt:
"""
异步生成后退提示
Args:
query: 用户原始问题
chat_history: 历史对话记录
conversation_context: 会话背景信息
Returns:
后退提示结果
"""
step_back_start_time = time.time()
# 准备提示词
step_back_parser = PydanticOutputParser(pydantic_object=StepBackPrompt)
formatted_prompt = step_back_prompt.format(
query=query,
chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]",
# conversation_context=conversation_context,
output_format=StepBackPrompt.get_format_instructions()
)
try:
# 异步调用LLM
# response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(formatted_prompt, response_format={"type": "json_object"})
# 解析输出
clean_output = self._clean_llm_json(response.content)
parsed_output = step_back_parser.parse(clean_output)
step_back_end_time = time.time()
step_back_time = step_back_end_time - step_back_start_time
logging.info(f"后退提示生成耗时统计 - 总耗时: {step_back_time:.2f}秒")
return parsed_output
except Exception as e:
raise RuntimeError(f"解析后退提示结果时出错: {e}") from e
async def _find_matching_software_docs_async(self, query: str, soft_name: str,
chat_history: List[Dict[str, str]] = None,
top_k: int = 3) -> List[str]:
"""
异步查找软件文档中与用户问题最匹配的几行内容
Args:
query: 用户问题
soft_name: 软件名称
chat_history: 历史对话记录
top_k: 返回的匹配行数,默认为3
Returns:
匹配的文档行列表
"""
if chat_history is None:
chat_history = []
# 检查软件名称是否在支持的列表中
if soft_name not in self.SOFT_NAMETOWIKI_MAP:
return []
# 获取软件文档内容
soft_docs = self._soft_wiki_library.get(soft_name, [])
if not soft_docs:
return []
soft_docs.extend(self._soft_wiki_library.get("下载安装注册", []))
# soft_docs=soft_docs[:50]
# 构建文档字符串,只包含行内容
soft_docs_str = "\n".join(f"{doc.strip()}" for i, doc in enumerate(soft_docs))
# 构建提示词,让LLM选择最匹配的行
prompt = f"""
{soft_docs_str}
================================
以上为软件功能操作、常见问题排查等功能,结合历史对话,请输出与当前提问最相关的1-3个功能名称,
使用Json格式输出,如下:
[{{"content": "行内容"}},...]
当前问题: {query}
历史对话: {json.dumps(chat_history, ensure_ascii=False)}
"""
try:
# 异步调用LLM
start_time = time.time()
# response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"})
end_time = time.time()
# 解析JSON响应
try:
wiki_names = []
json_parser = JsonOutputParser()
json_response = json_parser.parse(response.content)
for match in json_response:
wiki_names.append(match["content"])
logging.info(f"软件文档匹配耗时: {end_time - start_time:.2f}秒")
return wiki_names
except json.JSONDecodeError as e:
logging.error(f"解析JSON响应时出错: {e}")
return []
except Exception as e:
logging.error(f"查找匹配软件文档时出错: {e}", exc_info=True)
# 出错时返回空列表
return []