1、删除不再使用的.cursorrules文件

2、更新poetry.lock以反映Poetry版本的变化,添加jieba依赖,
3、重构意图识别逻辑以支持多轮对话,优化槽位填充和意图分类功能,增强代码可读性和维护性。
This commit is contained in:
2025-06-13 09:14:58 +08:00
parent b412019c17
commit d155565ef6
10 changed files with 1016 additions and 309 deletions
+36 -14
View File
@@ -8,6 +8,7 @@ Description: 意图识别和问题改写示例
import os
from dotenv import load_dotenv
from regex import F
from rag2_0.intent_recognition import IntentRecognizer
import pandas as pd
import logging
@@ -16,6 +17,7 @@ import concurrent.futures
from tqdm import tqdm
import time
import sys
from typing import List, Dict
# 加载环境变量
load_dotenv()
@@ -42,7 +44,7 @@ def load_questions_from_excel(file_path=None):
logging.error(f"读取Excel文件时出错: {e}")
return []
def process_query(recognizer, query):
def process_query(recognizer: IntentRecognizer, query: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, str] = None):
"""
处理单个查询,支持重试机制,并包含槽位填充
@@ -59,8 +61,8 @@ def process_query(recognizer, query):
while retry_count <= max_retries:
try:
# 使用新的process_query_with_slots方法处理查询
result = recognizer.process_query_with_slots(query)
# result = recognizer.process_query_with_slots(query)
result = recognizer.process_query(query, conversation_context=conversation_context, chat_history=chat_history, previous_slots=previous_slots)
# 提取分类信息
classification = result["classification"]
@@ -101,7 +103,6 @@ def process_query(recognizer, query):
"检索的关键词": keywords_str,
"槽位填充": slot_filling_str
}
except Exception as e:
retry_count += 1
@@ -174,10 +175,30 @@ def save_results_to_excel(results, output_file, is_final=False):
logging.info(f"已保存{len(valid_results)}条结果至: {temp_output_file}")
# 示例查询
examples_query = """"锁标签号:811621005858, 注册单位:惠州电力勘察设计院有限公司,软件名称:广东迁改导则2022, 注册号:BW278-83834-58155-58339.迁改导则是要另外下载安装软件吗?"
"""
examples_query = """那西藏软件呢"""
conversation_context=""
chat_history=[
{
"user": "怎么新建工程啊"
},
{
"assistant": "您好!为了更准确地帮助您解决问题,请提供以下信息:\n\n- **软件名称**:请确认您使用的软件名称,可选值如下:\n - 配网工程计价通D3软件\n - 新型储能电站建设计价通C1软件\n - 西藏电力工程计价通Z1软件\n - 技改检修工程计价通T1软件\n - 技改检修清单计价通T1软件\n - 主网电力建设计价通软件\n\n请补充以上信息,以便我们能够更好地指导您如何新建工程。"
},
{
"user": "配网工程计价通D3软件"
},
{
"assistant": """好的,针对您使用**配网工程计价通D3软件**新建工程的问题,以下是具体的操作步骤:\n\n## 新建工程\n\n### 功能入口\n【主页】界面——"新建"按钮\n\n### 操作步骤\n1. 在"新建窗口"选择对应工程模板。\n2. 设置工程名称。\n3. 确定后根据新建向导完善"电压等级""地区类型"等参数。\n4. 点击"确定"即可完成新建工程。\n\n![进入新建工程窗口](https://172.20.0.145/files/362491a3-69c3-45b0-a037-b5f96b91bf73/image-preview)\n\n希望这些步骤能帮助您顺利完成新建工程。如果还有其他问题,欢迎随时提问!\n"""
}
]
previous_slots={
"software_name": "配网工程计价通D3软件",
"function_name": "新建工程",
"operation": "如何新建工程",
"project_type": None,
"software_version": None,
"operation_steps": None
}
def main():
"""
意图识别和问题改写示例
@@ -193,18 +214,19 @@ def main():
# 读取提问数据
current_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(current_dir, "..", "..", "data", "excel", "历史提问数据(dislike)_提问明确.xlsx")
data_file = os.path.join(current_dir, "..", "..", "data", "excel", "历史提问数据(like)_提问明确.xlsx")
output_file = os.path.join(current_dir, "..", "..", "data", "excel", "测试提问数据_槽位填充结果.xlsx")
# 检测是否为调试模式,调试模式下使用examples_query,否则从Excel读取
is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
# is_debug = False
if is_debug:
examples = examples_query.strip().split("\n")
else:
examples = load_questions_from_excel(data_file)
if not is_debug:
max_workers = 20 # 减少并发数以避免API限制
max_workers = 40 # 减少并发数以避免API限制
logging.info(f"共有 {len(examples)} 个问题需要处理,使用 {max_workers} 个并发线程")
# 创建一个与输入顺序相同的结果列表
@@ -229,9 +251,9 @@ def main():
completed += 1
# 每处理batch_size条数据保存一次
if completed % batch_size == 0:
logging.info(f"已完成 {completed}/{len(examples)} 条,保存中间结果...")
save_results_to_excel(results, output_file, is_final=False)
# if completed % batch_size == 0:
# logging.info(f"已完成 {completed}/{len(examples)} 条,保存中间结果...")
# save_results_to_excel(results, output_file, is_final=False)
# 处理完所有数据后,保存最终结果
save_results_to_excel(results, output_file, is_final=True)
@@ -240,7 +262,7 @@ def main():
for idx, query in enumerate(examples):
if query.strip() == "":
continue
process_query(recognizer, query)
process_query(recognizer, query, conversation_context, chat_history, previous_slots)
def setup_logging():
# 配置日志输出到控制台
+6 -3
View File
@@ -36,6 +36,9 @@ def intent_recognize():
try:
data = request.get_json(force=True)
query = data.get('query')
conversation_context = data.get('conversation_context', "")
chat_history = data.get('chat_history', None)
previous_slots = data.get('previous_slots', None)
if not query:
return Response(json.dumps({"error": "缺少query参数"}, ensure_ascii=False), content_type='application/json; charset=utf-8', status=400)
@@ -43,7 +46,7 @@ def intent_recognize():
# 获取单例实例并使用线程锁保护关键操作
recognizer = RecognizerSingleton.get_instance()
result = recognizer.process_query_with_slots(query)
result = recognizer.process_query(query, conversation_context, chat_history, previous_slots)
end_time = time.time()
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S %z")
@@ -60,8 +63,8 @@ def intent_recognize():
for term in keywords["terms"]:
term_info = {
"名称": term["name"],
"同义词": ";".join(term["synonymous"]) if term["synonymous"] else "",
"描述": term["description"]
# "同义词": ";".join(term["synonymous"]) if term["synonymous"] else "",
# "描述": term["description"]
}
term_details.append(term_info)
keywords_str = term_details
+177 -61
View File
@@ -7,8 +7,8 @@ Date: 2025-05-13
Description: 提取和分类的数据模型
"""
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Tuple
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional, Dict, Tuple, Union, Any
from enum import Enum
class SoftwareName(str, Enum):
@@ -31,6 +31,61 @@ class SoftwareName(str, Enum):
MAIN: "别名包括:主网软件、电力建设软件、主网建设软件、博微电力建设计价通等其他类似称呼"
}
# 构建别名到标准名称的映射
def build_alias_mapping() -> Dict[str, SoftwareName]:
"""构建从别名到标准软件名称的映射字典"""
alias_map = {}
# 配网工程计价通D3软件的别名映射
alias_map["配网D3"] = SoftwareName.D3
alias_map["D3软件"] = SoftwareName.D3
alias_map["配网工程软件"] = SoftwareName.D3
alias_map["配网软件"] = SoftwareName.D3
# 新型储能电站建设计价通C1软件的别名映射
alias_map["储能C1"] = SoftwareName.C1
alias_map["C1软件"] = SoftwareName.C1
alias_map["储能电站软件"] = SoftwareName.C1
alias_map["储能软件"] = SoftwareName.C1
# 西藏电力工程计价通Z1软件的别名映射
alias_map["西藏Z1"] = SoftwareName.Z1
alias_map["Z1软件"] = SoftwareName.Z1
alias_map["西藏电力软件"] = SoftwareName.Z1
# 技改检修工程计价通T1软件的别名映射
alias_map["技改T1"] = SoftwareName.T1
alias_map["T1软件"] = SoftwareName.T1
alias_map["技改检修软件"] = SoftwareName.T1
# 技改检修清单计价通T1软件的别名映射
alias_map["技改清单T1"] = SoftwareName.T1_LIST
alias_map["T1清单软件"] = SoftwareName.T1_LIST
alias_map["技改检修清单软件"] = SoftwareName.T1_LIST
# 主网电力建设计价通软件的别名映射
alias_map["主网软件"] = SoftwareName.MAIN
alias_map["电力建设软件"] = SoftwareName.MAIN
alias_map["主网建设软件"] = SoftwareName.MAIN
alias_map["博微电力建设计价通"] = SoftwareName.MAIN
alias_map["主网计价通"] = SoftwareName.MAIN
alias_map["主网计价通软件"] = SoftwareName.MAIN
alias_map["计价通软件"] = SoftwareName.MAIN
alias_map["电力计价通软件"] = SoftwareName.MAIN
alias_map["计价通"] = SoftwareName.MAIN
# 添加标准名称映射
alias_map[SoftwareName.D3.value] = SoftwareName.D3
alias_map[SoftwareName.C1.value] = SoftwareName.C1
alias_map[SoftwareName.Z1.value] = SoftwareName.Z1
alias_map[SoftwareName.T1.value] = SoftwareName.T1
alias_map[SoftwareName.T1_LIST.value] = SoftwareName.T1_LIST
alias_map[SoftwareName.MAIN.value] = SoftwareName.MAIN
return alias_map
# 全局别名映射字典
SOFTWARE_NAME_ALIAS_MAP = build_alias_mapping()
# 定义输出模型
class Term(BaseModel):
name: str = Field(description="专业名词")
@@ -55,140 +110,201 @@ class Classification(BaseModel):
class QueryRewrite(BaseModel):
rewrite:str = Field(description="问题改写")
##########################槽位模型###########################
class SlotBase(BaseModel):
"""槽位基础模型"""
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
raise NotImplementedError("子类必须实现check_required_slots方法")
@field_validator('software_name', mode='before', check_fields=False)
@classmethod
def validate_software_name(cls, v):
"""验证并转换软件名称,支持别名"""
if v is None or v == "":
return ""
# 如果已经是枚举类型,直接返回其值
if isinstance(v, SoftwareName):
return v.value
# 如果是字符串,尝试转换
if isinstance(v, str):
# 直接匹配枚举值
for software in SoftwareName:
if v == software.value:
return software.value
# 尝试通过别名匹配
if v in SOFTWARE_NAME_ALIAS_MAP:
return SOFTWARE_NAME_ALIAS_MAP[v].value
# 如果无法匹配,返回原值用于错误提示
return v
return v
# 1. 软件问题
# 1.1 软件功能
class SoftwareFunction(BaseModel):
software_name: SoftwareName = Field(description="软件名称,只能从给定的范围中取值")
function_name: str = Field(description="具体功能名称")
operation: str = Field(description="用户操作意图(如何使用功能、功能入口、功能使用场景)")
software_version: Optional[str] = Field(None, description="软件版本")
operation_steps: Optional[str] = Field(None, description="操作步骤描述")
class SoftwareFunctionSlots(SlotBase):
software_name: str = Field(default="", description="软件名称")
function_name: str = Field(default="", description="具体功能名称")
operation: str = Field(default="", description="用户操作意图(如何使用功能、功能入口、功能使用场景)")
project_type: Optional[str] = Field(default="单工程", description="工程类型(单工程、多工程、批次工程)")
software_version: Optional[str] = Field(default="", description="软件版本")
operation_steps: Optional[str] = Field(default="", description="操作步骤描述")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.software_name:
missing_slots["software_name"] = f"{SoftwareFunction.model_fields['software_name'].description},可选值:{', '.join([name.value for name in SoftwareName if name not in [SoftwareName.UNKNOWN, SoftwareName.ALIASES]])}"
missing_slots["software_name"] = f"{SoftwareFunctionSlots.model_fields['software_name'].description},可选值:{', '.join([name.value for name in SoftwareName if name not in [SoftwareName.UNKNOWN, SoftwareName.ALIASES]])}"
if not self.function_name:
missing_slots["function_name"] = SoftwareFunction.model_fields["function_name"].description
missing_slots["function_name"] = SoftwareFunctionSlots.model_fields["function_name"].description
if not self.operation:
missing_slots["operation"] = SoftwareFunction.model_fields["operation"].description
missing_slots["operation"] = SoftwareFunctionSlots.model_fields["operation"].description
return len(missing_slots) == 0, missing_slots
# 1.2 故障排查
class TroubleShooting(BaseModel):
software_name: SoftwareName = Field(description="软件名称,只能从给定的范围中取值")
function_name: str = Field(description="具体功能名称/操作描述")
error_message: str = Field(description="报错信息/异常现象")
software_version: Optional[str] = Field(None, description="软件版本")
os_version: Optional[str] = Field(None, description="操作系统及版本")
reproduction_steps: Optional[str] = Field(None, description="故障重现步骤")
class SoftwareTroubleShootingSlots(SlotBase):
software_name: str = Field(default="", description="软件名称")
function_name: str = Field(default="", description="具体功能名称/操作描述")
error_message: str = Field(default="", description="报错信息/异常现象")
software_version: Optional[str] = Field(default="", description="软件版本")
os_version: Optional[str] = Field(default="", description="操作系统及版本")
reproduction_steps: Optional[str] = Field(default="", description="故障重现步骤")
project_type: Optional[str] = Field(default="单工程", description="工程类型(单工程、多工程、批次工程)")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.software_name:
missing_slots["software_name"] = f"{TroubleShooting.model_fields['software_name'].description},可选值:{', '.join([name.value for name in SoftwareName if name not in [SoftwareName.UNKNOWN, SoftwareName.ALIASES]])}"
missing_slots["software_name"] = f"{SoftwareTroubleShootingSlots.model_fields['software_name'].description},可选值:{', '.join([name.value for name in SoftwareName if name not in [SoftwareName.UNKNOWN, SoftwareName.ALIASES]])}"
if not self.function_name:
missing_slots["function_name"] = TroubleShooting.model_fields["function_name"].description
missing_slots["function_name"] = SoftwareTroubleShootingSlots.model_fields["function_name"].description
if not self.error_message:
missing_slots["error_message"] = TroubleShooting.model_fields["error_message"].description
missing_slots["error_message"] = SoftwareTroubleShootingSlots.model_fields["error_message"].description
return len(missing_slots) == 0, missing_slots
# 2. 业务问题
# 2.1 专业咨询
class ProfessionalConsulting(BaseModel):
scene_subject: str = Field(description="场景主体")
business_scene: str = Field(description="业务场景描述")
software_name: Optional[SoftwareName] = Field(None, description="软件名称")
class ProfessionalConsultingSlots(SlotBase):
scene_subject: str = Field(default="", description="场景主体")
business_scene: str = Field(default="", description="业务场景描述")
software_name: Optional[str] = Field(default="", description="软件名称")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.scene_subject:
missing_slots["scene_subject"] = ProfessionalConsulting.model_fields["scene_subject"].description
missing_slots["scene_subject"] = ProfessionalConsultingSlots.model_fields["scene_subject"].description
if not self.business_scene:
missing_slots["business_scene"] = ProfessionalConsulting.model_fields["business_scene"].description
missing_slots["business_scene"] = ProfessionalConsultingSlots.model_fields["business_scene"].description
return len(missing_slots) == 0, missing_slots
# 2.2 数据问题
class DataProblem(BaseModel):
expense_type: str = Field(description="费用类型")
operation_purpose: str = Field(description="操作目的")
software_name: Optional[SoftwareName] = Field(None, description="软件名称")
project_type: Optional[str] = Field(None, description="工程类型")
class DataProblemSlots(SlotBase):
expense_type: str = Field(default="", description="费用类型")
operation_purpose: str = Field(default="", description="操作目的")
software_name: Optional[str] = Field(default="", description="软件名称")
project_type: Optional[str] = Field(default="", description="工程类型")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.expense_type:
missing_slots["expense_type"] = DataProblem.model_fields["expense_type"].description
missing_slots["expense_type"] = DataProblemSlots.model_fields["expense_type"].description
if not self.operation_purpose:
missing_slots["operation_purpose"] = DataProblem.model_fields["operation_purpose"].description
missing_slots["operation_purpose"] = DataProblemSlots.model_fields["operation_purpose"].description
return len(missing_slots) == 0, missing_slots
# 3. 安装下载注册
# 3.1 后缀名咨询
class FileExtensionConsulting(BaseModel):
file_extension: str = Field(description="文件后缀名")
operation_purpose: str = Field(description="操作目的")
file_source: Optional[str] = Field(None, description="文件来源场景")
related_software: Optional[str] = Field(None, description="相关软件名称")
class FileExtensionConsultingSlots(SlotBase):
file_extension: str = Field(default="", description="文件后缀名")
operation_purpose: str = Field(default="", description="操作目的(了解对应软件,对应工程)")
file_source: Optional[str] = Field(default="", description="文件来源场景")
related_software: Optional[str] = Field(default="", description="相关软件名称")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.file_extension:
missing_slots["file_extension"] = FileExtensionConsulting.model_fields["file_extension"].description
missing_slots["file_extension"] = FileExtensionConsultingSlots.model_fields["file_extension"].description
if not self.operation_purpose:
missing_slots["operation_purpose"] = FileExtensionConsulting.model_fields["operation_purpose"].description
missing_slots["operation_purpose"] = FileExtensionConsultingSlots.model_fields["operation_purpose"].description
return len(missing_slots) == 0, missing_slots
# 3.2 软件锁类
class SoftwareLock(BaseModel):
lock_type: str = Field(description="锁类型")
operation_purpose: str = Field(description="操作目的")
lock_number: Optional[str] = Field(None, description="软件锁编号/注册号")
class SoftwareLockSlots(SlotBase):
lock_type: str = Field(default="", description="锁类型")
operation_purpose: str = Field(default="", description="操作目的")
lock_number: Optional[str] = Field(default="", description="软件锁编号/注册号")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.lock_type:
missing_slots["lock_type"] = SoftwareLock.model_fields["lock_type"].description
missing_slots["lock_type"] = SoftwareLockSlots.model_fields["lock_type"].description
if not self.operation_purpose:
missing_slots["operation_purpose"] = SoftwareLock.model_fields["operation_purpose"].description
missing_slots["operation_purpose"] = SoftwareLockSlots.model_fields["operation_purpose"].description
return len(missing_slots) == 0, missing_slots
# 3.3 安装下载类
class InstallationDownload(BaseModel):
class InstallationDownloadSlots(SlotBase):
software_name: str = Field(description="软件/插件名称,与file_name二选一")
file_name: str = Field(description="文件名,与software_name二选一")
operation_stage: str = Field(description="操作阶段")
os_version: Optional[str] = Field(None, description="操作系统版本")
package_source: Optional[str] = Field(None, description="安装包来源/版本号")
software_name: str = Field(default="", description="软件/插件名称,与file_name二选一")
file_name: str = Field(default="", description="文件名,与software_name二选一")
operation_stage: str = Field(default="", description="操作阶段(下载、安装等)")
os_version: Optional[str] = Field(default="", description="操作系统版本")
package_source: Optional[str] = Field(default="", description="安装包来源/版本号")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.software_name and not self.file_name:
missing_slots["software_name"] = f"{InstallationDownload.model_fields['software_name'].description},"
missing_slots["software_name"] = f"{InstallationDownloadSlots.model_fields['software_name'].description},"
f"可选值:{', '.join([name.value for name in SoftwareName if name not in [SoftwareName.UNKNOWN, SoftwareName.ALIASES]])}"
missing_slots["file_name"] = InstallationDownload.model_fields["file_name"].description
missing_slots["file_name"] = InstallationDownloadSlots.model_fields["file_name"].description
if not self.operation_stage:
missing_slots["operation_stage"] = InstallationDownload.model_fields["operation_stage"].description
missing_slots["operation_stage"] = InstallationDownloadSlots.model_fields["operation_stage"].description
return len(missing_slots) == 0, missing_slots
# 3.4 问题排查类
class ProblemDiagnosis(BaseModel):
error_message: str = Field(description="报错信息/异常现象")
software_name: Optional[SoftwareName] = Field(None, description="软件名称,只能从给定的范围中取值")
os_version: Optional[str] = Field(None, description="操作系统版本")
class ProblemDiagnosisSlots(SlotBase):
error_message: str = Field(default="", description="报错信息/异常现象")
software_name: Optional[str] = Field(default="", description="软件名称")
os_version: Optional[str] = Field(default="", description="操作系统版本")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
"""检查必填槽位是否都存在"""
missing_slots = {}
if not self.error_message:
missing_slots["error_message"] = ProblemDiagnosis.model_fields["error_message"].description
missing_slots["error_message"] = ProblemDiagnosisSlots.model_fields["error_message"].description
return len(missing_slots) == 0, missing_slots
class OtherSlots(SlotBase):
"""其他类型槽位"""
content_type: str = Field(default="", description="内容类型(必填)")
intent: Optional[str] = Field(default="", description="用户意图(选填)")
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
missing_slots = {}
if not self.content_type:
missing_slots["content_type"] = OtherSlots.model_fields["content_type"].description
return len(missing_slots) == 0, missing_slots
class IntentAndSlotResult(BaseModel):
"""意图槽位填充结果"""
classification: Classification
slots: Union[
SoftwareFunctionSlots,
SoftwareTroubleShootingSlots,
ProfessionalConsultingSlots,
DataProblemSlots,
FileExtensionConsultingSlots,
SoftwareLockSlots,
InstallationDownloadSlots,
ProblemDiagnosisSlots,
OtherSlots
]
+230 -104
View File
@@ -7,18 +7,27 @@ Date: 2025-05-13
Description: 意图分类、改写核心逻辑
"""
import logging
import os
from langchain_openai import ChatOpenAI
from langchain.output_parsers import PydanticOutputParser
import json
from typing import List, Tuple, Dict, Any, Optional, Union
from typing import List, Tuple, Dict, Any, Optional
import re
from .PromptTemplates import classification_prompt, query_rewrite_prompt, extract_nouns_prompt, classification_info, slot_filling_prompt
import jieba
from .PromptTemplates import (classification_prompt, query_rewrite_prompt,
extract_nouns_prompt, classification_info,
slot_filling_prompt)
from .Multi_PromptTemplates import (
intent_and_slot_prompt, output_example,
generate_slot_mapping_doc, query_rewrite_prompt_pro,
)
from .DataModels import (
Classification, QueryRewrite, Term, TermList,
SoftwareFunction, TroubleShooting, ProfessionalConsulting,
DataProblem, FileExtensionConsulting, SoftwareLock,
InstallationDownload, ProblemDiagnosis
SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots,
DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots,
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult
)
from .ProfessionalNounVector import ProfessionalNounRetriever
from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM, SiliconFlowReRankerModel
@@ -52,22 +61,13 @@ class IntentRecognizer:
if base_url:
llm_params["base_url"] = base_url
self.llm = OpenAiLLM(**llm_params)
# 准备分类解析器
self.classification_parser = PydanticOutputParser(pydantic_object=Classification)
# 准备问题改写解析器
self.query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite)
# 准备术语列表解析器
self.terms_list_parser = PydanticOutputParser(pydantic_object=TermList)
self._llm = OpenAiLLM(**llm_params)
# 加载suffix关键词
self.suffix_keywords = self._load_suffix_keywords()
self._suffix_keywords = self._load_suffix_keywords()
# 初始化向量检索器
self.noun_retriever = ProfessionalNounRetriever(api_key=api_key, index_dir=vector_index_dir)
self._noun_retriever = ProfessionalNounRetriever(api_key=api_key, index_dir=vector_index_dir)
def _load_suffix_keywords(self, filepath: str = None) -> List[str]:
"""
@@ -95,7 +95,7 @@ class IntentRecognizer:
except Exception as e:
raise RuntimeError(f"加载后缀关键词失败: {e}") from e
def classify_intent(self, query: str, keywords: TermList) -> Classification:
def _classify_intent(self, query: str) -> Classification:
"""
对用户输入进行意图分类
@@ -106,49 +106,85 @@ class IntentRecognizer:
Returns:
分类结果
"""
classification_parser = PydanticOutputParser(pydantic_object=Classification)
formatted_prompt = classification_prompt.format(user_input=query,
classification_info=classification_info,
output_format=self.classification_parser.get_format_instructions())
# 将关键词列表转换为JSON字符串
terms_dict = [term.model_dump() for term in keywords.terms]
keywords_str = json.dumps(terms_dict, ensure_ascii=False)
formatted_prompt = formatted_prompt.replace("{keywords}", keywords_str)
output_format=classification_parser.get_format_instructions())
# 调用LLM
response = self.llm.invoke(formatted_prompt, False)
response = self._llm.invoke(formatted_prompt, False)
# 解析输出
try:
# 尝试直接解析JSON响应
parsed_output = self.classification_parser.parse(response.content.strip())
parsed_output = classification_parser.parse(response.content.strip())
return parsed_output
except Exception as e:
raise RuntimeError(f"解析分类结果时出错: {e}") from e
def extract_keywords_with_llm(self, query: str) -> List[Term]:
def _tokenize_with_jieba(self, query: str) -> List[str]:
"""
使用LLM从用户查询中提取专业关键
使用jieba分词器对查询进行分
Args:
query: 用户查询
Returns:
分词后的词语列表
"""
# 使用jieba进行分词
seg_list = jieba.cut(query, cut_all=False)
# 过滤掉停用词和标点符号
filtered_tokens = []
for token in seg_list:
# 过滤掉空格和标点符号
if token.strip() and not re.match(r'^[^\w\s]$', token):
filtered_tokens.append(token)
return filtered_tokens
def _extract_keywords_with_llm(self, query: str, use_jieba: bool = False) -> List[Term]:
"""
使用LLM从用户查询中提取专业关键词
Args:
query: 用户查询
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
提取的术语列表
"""
# 准备提示词
formatted_prompt = extract_nouns_prompt.replace("{content}", query)
formatted_prompt = formatted_prompt.replace("{output_format}", self.terms_list_parser.get_format_instructions())
# 调用LLM
response = self.llm.invoke(formatted_prompt, False)
try:
# 尝试使用Pydantic解析器解析TermList
parsed_output = self.terms_list_parser.parse(response.content)
return parsed_output.terms
# 如果使用jieba分词
if use_jieba:
# 先使用jieba分词
tokens = self._tokenize_with_jieba(query)
# 构建术语列表
terms = []
for token in tokens:
if len(token) > 1: # 过滤掉单字词
terms.append(Term(name=token, synonymous=[], description=""))
return terms
else:
# 使用LLM提取关键词
# 准备提示词
formatted_prompt = extract_nouns_prompt.replace("{content}", query)
terms_list_parser = PydanticOutputParser(pydantic_object=TermList)
formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions())
# 调用LLM
response = self._llm.invoke(formatted_prompt, False)
# 尝试使用Pydantic解析器解析TermList
parsed_output = terms_list_parser.parse(response.content)
return parsed_output.terms
except Exception as e:
raise RuntimeError(f"无法解析LLM关键词提取响应: {e}") from e
def rerank_matched_terms(self, query_key: str, matched_terms: set, top_k: int = 2) -> List[Term]:
def _rerank_matched_terms(self, query_key: str, matched_terms: set, top_k: int = 2) -> List[Term]:
"""
对召回的专业术语进行重排序,按与用户查询的相关性排序
@@ -182,31 +218,32 @@ class IntentRecognizer:
except Exception as e:
raise RuntimeError(f"SiliconFlowReRankerModel重排失败:{e}") from e
def match_keywords(self, query: str) -> Tuple[TermList, List[str]]:
def _match_keywords(self, query: str, use_jieba: bool = False) -> Tuple[TermList, List[str]]:
"""
从用户问题中匹配关键词,结合LLM提取和向量检索
Args:
query: 用户问题
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
匹配到的关键词列表
"""
query_keys=[]
# 步骤2: 使用LLM提取查询中的关键词
# 步骤1: 使用LLM提取查询中的关键词
try:
extracted_terms = self.extract_keywords_with_llm(query)
extracted_terms = self._extract_keywords_with_llm(query, use_jieba)
for term in extracted_terms:
query_keys.append(term.name)
except Exception as e:
raise RuntimeError(f"LLM关键词提取失败: {e}") from e
matched_terms = [] # 存储匹配到的Term对象
# 步骤3: 使用向量检索找到相似的专业名词
# 步骤2: 使用向量检索找到相似的专业名词
try:
# 对matched_terms中的每个关键字进行向量检索
for current_key in query_keys:
vector_results = self.noun_retriever.query(current_key, top_k=5, use_intersection=False)
vector_results = self._noun_retriever.query(current_key, top_k=5, use_intersection=False)
current_key_terms = set()
# 添加向量检索结果
for result in vector_results:
@@ -218,8 +255,9 @@ class IntentRecognizer:
description=result.get('description', '')
)
current_key_terms.add(term)
reranked_terms = self.rerank_matched_terms(current_key, current_key_terms)
matched_terms.extend(reranked_terms)
if len(current_key_terms) > 0:
reranked_terms = self._rerank_matched_terms(current_key, current_key_terms)
matched_terms.extend(reranked_terms)
except Exception as e:
raise RuntimeError(f"向量检索关键词时出错: {e}") from e
@@ -228,7 +266,7 @@ class IntentRecognizer:
term_list = TermList(terms=list(matched_terms))
return term_list, query_keys
def rewrite_query(self, query: str, keywords: TermList) -> QueryRewrite:
def _rewrite_query(self, query: str, keywords: TermList, chat_history: List[Dict[str, str]] = None, context: str = "") -> QueryRewrite:
"""
对用户问题进行改写
@@ -242,23 +280,28 @@ class IntentRecognizer:
# 准备问题改写提示
terms_dict = [term.model_dump(exclude={"description"}) for term in keywords.terms]
keywords_str = json.dumps(terms_dict, ensure_ascii=False)
formatted_prompt = query_rewrite_prompt.format(query=query,
output_format=self.query_rewrite_parser.get_format_instructions(),
keywords=keywords_str)
query_rewrite_parser = PydanticOutputParser(pydantic_object=QueryRewrite)
# formatted_prompt = query_rewrite_prompt.format(query=query,
# output_format=query_rewrite_parser.get_format_instructions(),
# keywords=keywords_str)
formatted_prompt = query_rewrite_prompt_pro.format(query=query,
output_format=query_rewrite_parser.get_format_instructions(),
keywords=keywords_str,
chat_history=chat_history,
context=context)
# 调用LLM
response = self.llm.invoke(formatted_prompt, False)
response = self._llm.invoke(formatted_prompt, False)
# 解析输出
try:
# 尝试直接解析JSON响应
parsed_output = self.query_rewrite_parser.parse(response.content)
parsed_output = query_rewrite_parser.parse(response.content)
return parsed_output
except Exception as e:
raise RuntimeError(f"解析问题改写结果时出错: {e}") from e
def judge_define_suffix(self, input_str: str) -> Tuple[bool, List[str]]:
def _judge_define_suffix(self, input_str: str) -> Tuple[bool, List[str]]:
"""
判断输入字符串是否包含定义的后缀,并返回所有匹配到的后缀名列表
@@ -270,7 +313,7 @@ class IntentRecognizer:
"""
# 构建正则表达式模式,匹配大小写不敏感且前面可能带有.
pattern = r'(?:\.?)(' + '|'.join(re.escape(field.get('name')) for field in self.suffix_keywords) + r')'
pattern = r'(?:\.?)(' + '|'.join(re.escape(field.get('name')) for field in self._suffix_keywords) + r')'
# 使用 re.IGNORECASE 标志来忽略大小写,findall找到所有匹配
matches = re.finditer(pattern, input_str, re.IGNORECASE)
@@ -278,23 +321,30 @@ class IntentRecognizer:
return bool(matched_suffixes), matched_suffixes
def process_query(self, query: str) -> Tuple[Classification, TermList, QueryRewrite, List[str]]:
def process_query(self, query: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None,
use_jieba: bool = False) -> Dict[str, Any]:
"""
处理用户问题的完整流程
Args:
query: 用户原始问题
conversation_context: 会话背景信息
chat_history: 历史对话记录,格式为[{"user": "content"}, {"assistant": "content"}]
previous_slots: 历史槽位信息
use_jieba: 是否使用jieba分词辅助提取关键词
Returns:
(意图分类结果, 匹配的关键词列表, 问题改写结果)的元组
包含分类、关键词、改写和槽位填充结果的字典
"""
# 是否是扩展名
# is_suffix, matched_suffixes = self.judge_define_suffix(query)
# is_suffix, matched_suffixes = self._judge_define_suffix(query)
# if is_suffix:
# # 将所有匹配到的后缀名作为Term添加到结果中
# suffix_terms = []
# for suffix in matched_suffixes:
# term_dict = next((item for item in self.suffix_keywords if item['name'].lower() == suffix.lower()), None)
# term_dict = next((item for item in self._suffix_keywords if item['name'].lower() == suffix.lower()), None)
# if term_dict:
# suffix_term = Term(
# name=term_dict.get('name'),
@@ -306,26 +356,41 @@ class IntentRecognizer:
# return Classification(vertical_classification="安装下载", sub_classification="查询"), TermList(terms=suffix_terms), QueryRewrite(rewrite=query), matched_suffixes
# 步骤1: 匹配关键词
keywords_terms, query_keys = self.match_keywords(query)
keywords_terms, query_keys = self._match_keywords(query, use_jieba)
# 步骤2: 问题改写
rewrite = self.rewrite_query(
rewrite = self._rewrite_query(
query=query,
keywords=keywords_terms
keywords=keywords_terms,
chat_history=chat_history,
context=conversation_context
)
# 步骤3: 进行意图识别和槽位填充
result = self._process_intent_and_slot(query, conversation_context, chat_history, previous_slots)
result.update({"keywords": keywords_terms.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys})
return result
# # 步骤3: 进行意图分类
# classification = self._classify_intent(query)
# 步骤3: 进行意图分类
classification = self.classify_intent(query, keywords_terms)
if classification.vertical_classification == "其他" or classification.sub_classification == "其他":
return classification, TermList(terms=[]), QueryRewrite(rewrite=query), []
# # 步骤4: 进行槽位填充
# # 如果是有效分类,进行槽位填充
# slot_filling_result = {}
# if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]:
# slot_filling_result = self._fill_slots(rewrite.rewrite, classification)
# return {
# "classification": classification.model_dump(),
# "keywords": keywords_terms.model_dump(),
# "rewrite": rewrite.model_dump(),
# "query_keys": query_keys,
# "slot_filling": slot_filling_result
# }
if classification.vertical_classification == "闲聊" or classification.sub_classification == "闲聊":
return classification, TermList(terms=[]), QueryRewrite(rewrite=query),[]
# rewrite = QueryRewrite(rewrite=query)
return classification, keywords_terms, rewrite, query_keys
def fill_slots(self, query: str, classification: Classification) -> Dict[str, Any]:
def _fill_slots(self, query: str, classification: Classification) -> Dict[str, Any]:
"""
根据分类结果对问题进行槽位填充
@@ -340,7 +405,7 @@ class IntentRecognizer:
# 根据分类结果选择对应的数据模型
slot_model = self._get_slot_model(classification)
if not slot_model:
return {"error": "未找到匹配的槽位模型"}
raise RuntimeError("未找到匹配的槽位模型")
# 使用LLM进行槽位填充
filled_slots = self._fill_slots_with_llm(query, classification, slot_model)
@@ -356,7 +421,7 @@ class IntentRecognizer:
def _get_slot_model(self, classification: Classification) -> Optional[type]:
"""
根据分类结果获取对应的槽位模型类
根据分类结果获取对应的槽位模型类,用于统一提示词处理
Args:
classification: 意图分类结果
@@ -367,31 +432,33 @@ class IntentRecognizer:
# 软件问题
if classification.vertical_classification == "软件问题":
if classification.sub_classification == "软件功能":
return SoftwareFunction
return SoftwareFunctionSlots
elif classification.sub_classification == "故障排查":
return TroubleShooting
return SoftwareTroubleShootingSlots
# 业务问题
elif classification.vertical_classification == "业务问题":
if classification.sub_classification == "专业咨询":
return ProfessionalConsulting
return ProfessionalConsultingSlots
elif classification.sub_classification == "数据问题":
return DataProblem
return DataProblemSlots
# 安装下载注册
elif classification.vertical_classification == "安装下载注册":
if classification.sub_classification == "后缀名咨询":
return FileExtensionConsulting
return FileExtensionConsultingSlots
elif classification.sub_classification == "软件锁类":
return SoftwareLock
return SoftwareLockSlots
elif classification.sub_classification == "安装下载类":
return InstallationDownload
return InstallationDownloadSlots
elif classification.sub_classification == "问题排查类":
return ProblemDiagnosis
return ProblemDiagnosisSlots
# 其他
elif classification.vertical_classification == "其他":
return OtherSlots
return None
count=1
def _fill_slots_with_llm(self, query: str, classification: Classification, slot_model_class: type) -> Any:
"""
@@ -416,7 +483,7 @@ class IntentRecognizer:
)
# 调用LLM
response = self.llm.invoke(formatted_prompt, False)
response = self._llm.invoke(formatted_prompt, False)
try:
# 尝试解析LLM响应
@@ -426,29 +493,88 @@ class IntentRecognizer:
# 如果解析失败,创建一个空的模型实例
empty_instance = slot_model_class()
return empty_instance
def process_query_with_slots(self, query: str) -> Dict[str, Any]:
def _process_intent_and_slot(self, user_input: str, conversation_context: str = "",
chat_history: List[Dict[str, str]] = None,
previous_slots: Dict[str, Any] = None) -> Dict[str, Any]:
"""
处理用户问题的完整流程,包括槽位填充
使用统一提示词同时进行意图识别和槽位填充
Args:
query: 用户原始问题
user_input: 当前用户输入
conversation_context: 会话背景信息
chat_history: 历史对话记录,格式为[{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
previous_slots: 历史槽位信息
Returns:
包含分类、关键词、改写和槽位填充结果的字典
包含意图分类和槽位填充结果的字典
"""
# 执行基本处理流程
classification, keywords, rewrite, query_keys = self.process_query(query)
# 初始化默认值
if chat_history is None:
chat_history = []
# 如果是有效分类,进行槽位填充
slot_filling_result = {}
if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]:
slot_filling_result = self.fill_slots(rewrite.rewrite, classification)
if previous_slots is None:
previous_slots = {}
# 生成槽位映射文档
slot_mapping_doc = generate_slot_mapping_doc()
return {
"classification": classification.model_dump(),
"keywords": keywords.model_dump(),
"rewrite": rewrite.model_dump(),
"query_keys": query_keys,
"slot_filling": slot_filling_result
}
# 准备提示词
parser = PydanticOutputParser(pydantic_object=IntentAndSlotResult)
formatted_prompt = intent_and_slot_prompt.format(
conversation_context=conversation_context,
chat_history=json.dumps(chat_history, ensure_ascii=False),
previous_slots=json.dumps(previous_slots, ensure_ascii=False),
user_input=user_input,
slot_mapping_doc=slot_mapping_doc,
output_format=parser.get_format_instructions(),
classification_info=classification_info
)
# 调用LLM
response = self._llm.invoke(formatted_prompt + output_example, False)
try:
# 解析LLM响应为JSON
result_json = parser.parse(response.content)
classification=result_json.classification
slot_filling=result_json.slots
is_complete, missing_slots = slot_filling.check_required_slots()
expected_slot_model = self._get_slot_model(classification)
# 添加容错处理,发生概率较低,但仍需处理
if expected_slot_model is None:
# 添加容错处理,应对LLM返回错误分类信息,一级分类跟二级分类错乱
# 重新分类
classification = self._classify_intent(user_input)
fill_slots = self._fill_slots(user_input, classification)
result = {
"classification": classification.model_dump(),
"slot_filling": fill_slots
}
logging.warning(f"重新分类与槽点填充")
return result
elif expected_slot_model.__name__ != type(slot_filling).__name__:
# 添加容错处理,应对LLM槽位与分类不匹配。重新填充槽位
slot_filling = self._fill_slots(user_input, classification)
result = {
"classification": classification.model_dump(),
"slot_filling": slot_filling
}
logging.warning(f"重新填充槽点")
return result
# 构建最终结果
result = {
"classification": classification.model_dump(),
"slot_filling": {
"is_complete": is_complete,
"missing_slots": missing_slots,
"filled_data": slot_filling.model_dump()
}
}
return result
except Exception as e:
logging.error(f"process_intent_and_slot error:{e}")
raise RuntimeError(f"process_intent_and_slot error:{e}") from e
@@ -0,0 +1,357 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
File: Multi_PromptTemplates.py
Author: oyyz
Date: 2025-06-13
Description: 多轮对话下意图分类、改写核心提示词
"""
# 首版重构提示词
query_rewrite_prompt_pro_old="""
# 电力造价专业问答优化工程师(升级版)
你是一名电力造价专业问答优化工程师,负责结合历史对话背景和专业术语库,将用户的原始问题进行规范化重构,以提升知识库检索准确率。
## 核心任务
基于历史对话上下文和专业术语库,将用户的原始问题进行规范化重构,提高知识库检索的准确性和专业性,同时保持对话的连贯性和语境相关性。
## 处理流程
### 第一阶段:输入解析
1. 解析基础信息
- 原始问题(需保留核心语义){query}
- 关键词集合:{keywords}
- 历史对话记录:{chat_history}
- 当前聊天背景:{context}
2. 背景分析
- 识别历史对话中的关键主题和专业领域
- 提取上下文中的隐含信息(如软件版本、地区、具体场景等)
- 分析用户的提问模式和专业水平
### 第二阶段:上下文匹配分析
**背景匹配规则:**
1. 检查当前问题是否与历史对话存在关联性
2. 识别历史对话中提到的关键信息:
- 软件版本/系统(如Z1、D3等)
- 地区定额(如西藏、山东等)
- 具体功能模块
- 用户操作习惯
**术语匹配规则:**
1. 检查原始问题中是否包含关键词集合中的`name`字段或`synonymous`字段中的任何词汇
2. 结合历史对话,识别可能的隐含专业术语
3. 统计匹配的术语数量
4. 判断执行路径:
- 匹配术语 ≥ 1个 或 存在明显上下文关联 → 执行重构流程
- 匹配术语 = 0个 且 无明显上下文关联 → 直接输出原始问题
### 第三阶段:问题重构
**重构原则(按优先级排序):**
1. **语义保真**:严格保持原问题的核心意图和诉求
2. **上下文继承**
- 补充历史对话中的隐含信息(如软件名称、版本、地区等)
- 保持对话的连贯性和逻辑性
- 避免重复已确认的背景信息
3. **术语规范**
- 将匹配到的同义词替换为对应的标准术语(name字段)
- 对在关键词中的标准术语使用【】进行标记
- 保留在原问题中未在关键词库中的专业术语、限定词和修饰词
4. **结构优化**
- 保持原问题的语态特征5W2H
- 保持主谓宾结构清晰
- 保留时间、版本等限定条件
**术语处理规则:**
- 优先级1:基于历史对话补充缺失的背景信息
- 优先级2:保留原问题中的专业术语、限定词和修饰词(即使不在关键词库中)
- 优先级3:将同义词替换为标准术语并用【】标记
- 优先级4:对原问题中已存在的标准术语添加【】标记
**上下文处理策略:**
- 如果当前问题与历史对话高度相关,适当补充背景信息
- 如果用户使用代词(如"这个""那个"),尝试结合历史对话明确指代
- 如果历史对话中已确定软件或系统,在当前问题中适当体现
# 输出规范
{output_format}
# 示范案例库
▶ 案例1(有效匹配 + 上下文继承)
历史对话:用户之前询问过"西藏定额升级的问题"
输入:
原始问题:怎么把旧版工程转到Z1新版
关键词:【'老版本定额升级', '批量设置定额', '西藏造价软件Z1'
输出:
{{"rewrite":"【西藏造价软件Z1】如何执行【老版本定额升级】操作?"}}
▶ 案例2(无效匹配 + 无上下文关联)
历史对话:无相关内容
输入:
原始问题:程序界面文字显示过小如何处理?
关键词:【'定额升级', '工程批量导入'
输出:
{{"rewrite":"程序界面文字显示过小如何处理?"}}
▶ 案例3(部分匹配 + 上下文补充)
历史对话:用户之前询问过"D3软件的功能"
输入:
原始问题:能导出清单的计算公式吗?
关键词:【'配网工程计价通D3软件', '计算式'
输出:
{{"rewrite":"【配网工程计价通D3软件】能导出清单的【计算式】吗?"}}
▶ 案例4(代词替换 + 上下文解析)
历史对话:用户刚询问过"山东定额的问题"
输入:
原始问题:这个定额怎么批量导入?
关键词:【'批量导入定额', '山东定额'
输出:
{{"rewrite":"【山东定额】如何进行【批量导入定额】操作?"}}
## 质量检查清单
执行前请确认:
- [ ] 是否保持了原问题的核心诉求?
- [ ] 是否合理利用了历史对话中的背景信息?
- [ ] 是否正确执行了同义词替换?
- [ ] 是否保留了原问题中的专业术语和限定条件?
- [ ] 是否正确使用了【】标记?
- [ ] 重构后的问题是否自然流畅?
- [ ] 是否保持了对话的连贯性?
- [ ] 是否避免了过度补充不必要的信息?
"""
query_rewrite_prompt_pro="""
# 电力造价问答优化工程师(精简版)
**角色**:基于历史对话和专业术语库重构问题,提升知识库检索准确率。
## 核心原则
1. 语义保真 → 保持问题核心意图
2. 术语规范 → 同义词转标准词并【】标记
3. 背景继承 → 补充历史对话的隐含信息
## 处理流程
### 一、输入解析
- 原始问题(需保留核心语义){query}
- 关键词集合:{keywords}
- 历史对话记录:
<history>
{chat_history}
</history>
- 当前聊天背景:
<conversation_background>
{context}
</conversation_background>
### 二、重构决策树
```mermaid
graph TD
A[输入问题] --> B{{匹配关键词或上下文?}}
B -- 是 --> C[执行重构]
B -- 否 --> D[直接输出原始问题]
C --> E[补充缺失背景]
E --> F[同义词替换+【】标记]
F --> G[保留原生专业术语]
```
### 三、重构优先级
1. **背景补充**
- 历史对话中确定的软件/地区必须继承(例:"这软件""【配网工程D3】"
2. **术语处理**
- 同义词转标准词 → 批量设置定额
- 存在即标记 → 【计算式】
3. **结构优化**
- 保持原问题的5W2H特征
- 明确指代关系("该功能""【批量导入】功能"
## 输出规范
{output_format}
## 典型案例
| 场景 | 输入问题 | 输出结果 |
|---------------------|-----------------------------------|------------------------------------------|
| 强上下文关联 | “怎么升级旧版工程” | {{"rewrite":"【西藏Z1】如何执行【老版本定额升级】?"}} |
| 弱术语匹配 | “界面文字太小怎么办” | 原样输出 |
| 代词+背景继承 | “这个定额如何导入” | {{"rewrite":"【山东定额】如何执行【批量导入定额】?"}}|
## 质量自检
- [] 核心诉求是否保留?
- [] 背景信息是否合理补充?
- [] 术语标记是否完整【】?
- [] 语句是否自然流畅?
- [] 避免过度补充无关信息
"""
intent_and_slot_prompt = """
# 电力造价软件意图分类与槽位填充统一提示词
你是一个专业的电力造价领域智能助手,负责对用户输入进行意图分类识别和关键信息槽位填充。
{classification_info}
{slot_mapping_doc}
## 【软件名称规范】
支持的软件名称及其别名:
- **配网工程计价通D3软件**:别名包括配网D3、D3软件、配网工程软件等
- **新型储能电站建设计价通C1软件**:别名包括储能C1、C1软件、储能电站软件、储能软件等
- **西藏电力工程计价通Z1软件**:别名包括西藏Z1、Z1软件、西藏电力软件等
- **技改检修工程计价通T1软件**:别名包括技改T1、T1软件、技改检修软件等
- **技改检修清单计价通T1软件**:别名包括技改清单T1、T1清单软件、技改检修清单软件等
- **主网电力建设计价通软件**:别名包括主网软件、电力建设软件、主网建设软件、博微电力建设计价通等
## 【任务要求】
1. **会话理解**:综合考虑会话背景、历史对话和之前的槽位信息来理解当前用户输入
2. **意图分类**:准确识别用户输入属于哪个垂直领域和子分类
3. **槽位填充**:从当前用户问题中提取关键信息,并结合历史槽位信息进行补充完善
4. **信息融合**
- 优先使用当前用户输入中的明确信息
- 当前输入缺失但历史槽位存在的信息,可适当继承
- 历史对话中的上下文信息有助于理解当前输入的真实意图
5. **槽位处理**
- 对于必填槽位,必须尽力从当前输入和历史信息中提取
- 对于选填槽位,如果能从当前输入或历史信息中提取则填写
- 如果当前输入与历史信息存在冲突,以当前输入为准
6. **输出格式**:只输出符合格式的JSON数据,不要有任何额外的解释
## 【会话背景信息】
{conversation_context}
## 【历史对话记录】
{chat_history}
## 【历史槽位信息】
{previous_slots}
## 【当前用户输入】
{user_input}
## 【输出格式】
{output_format}
"""
output_example="""
## 【综合分析示例】
**示例1:利用历史对话理解当前输入**
会话背景: 用户正在咨询软件使用问题
历史对话:
- 用户: "我在使用配网D3软件"
- 助手: "好的,请问您遇到什么问题?"
历史槽位:{"software_name": "配网工程计价通D3软件"}
当前用户输入: "新建工程按钮找不到"
输出:
```json
{
"classification": {
"vertical_classification": "软件问题",
"sub_classification": "软件功能"
},
"slot_filling": {
"software_name": "配网工程计价通D3软件",
"function_name": "新建工程",
"operation": "查找新建工程按钮位置",
"software_version": null,
"operation_steps": null
}
}
```
**示例2:继承和更新槽位信息**
会话背景: 用户遇到软件报错,正在进行故障排查
历史对话:
- 用户: "西藏Z1软件报错了"
- 助手: "请详细描述一下报错信息"
历史槽位: {"software_name": "西藏电力工程计价通Z1软件"}
当前用户输入: "提示找不到许可证,是在新建工程的时候"
输出:
```json
{
"classification": {
"vertical_classification": "软件问题",
"sub_classification": "故障排查"
},
"slot_filling": {
"software_name": "西藏电力工程计价通Z1软件",
"function_name": "新建工程",
"error_message": "提示找不到许可证",
"software_version": null,
"os_version": null,
"reproduction_steps": "新建工程时出现错误"
}
}
```
**示例3:信息冲突处理**
会话背景: 用户在多个软件间切换咨询
历史对话:
- 用户: "配网D3的费用计算"
- 助手: "好的,关于配网D3的费用计算..."
历史槽位: {"software_name": "配网工程计价通D3软件"}
当前用户输入: "不对,我说的是技改T1软件的材料费怎么算"
输出:
```json
{
"classification": {
"vertical_classification": "业务问题",
"sub_classification": "数据问题"
},
"slot_filling": {
"expense_type": "材料费",
"operation_purpose": "了解费用计算方法",
"software_name": "技改检修工程计价通T1软件",
"project_type": null
}
}
```
"""
def generate_slot_mapping_doc() -> str:
"""
生成分类与槽位模型对应关系的文档
Returns:
str: 格式化的文档字符串
"""
mapping = {
"软件问题": {
"软件功能": "SoftwareFunctionSlots",
"故障排查": "SoftwareTroubleShootingSlots"
},
"业务问题": {
"专业咨询": "ProfessionalConsultingSlots",
"数据问题": "DataProblemSlots"
},
"安装下载注册": {
"后缀名咨询": "FileExtensionConsultingSlots",
"软件锁类": "SoftwareLockSlots",
"安装下载类": "InstallationDownloadSlots",
"问题排查类": "ProblemDiagnosisSlots"
},
"其他": {
"其他": "OtherSlots"
}
}
doc = ["## 【分类与槽位模型对应关系】"]
for vertical, sub_classes in mapping.items():
doc.append(f"\n{vertical}:")
for sub_class, slot_model in sub_classes.items():
doc.append(f"- {sub_class} -> {slot_model}")
doc.append("\n## 【注意事项】")
doc.append("1. 分类与槽位模型必须严格对应")
doc.append("2. 每个分类只能使用其对应的槽位模型")
doc.append("3. 不允许混用不同分类的槽位模型")
return "\n".join(doc)
+1 -1
View File
@@ -118,7 +118,7 @@ class XinferenceReRankerModel:
return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]]
except requests.exceptions.RequestException as e:
logging.error(f"重排序请求失败: {str(e)}")
logging.error(f"XinferenceReRankerModel重排序请求失败: {str(e)}")
return []