From 4386cfac41897ef87aafca994acf3979e9364dae Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Tue, 24 Jun 2025 15:03:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=84=8F=E5=9B=BE=E8=AF=86?= =?UTF-8?q?=E5=88=AB=E6=A8=A1=E5=9D=97=EF=BC=8C=E6=96=B0=E5=A2=9E=E6=96=87?= =?UTF-8?q?=E6=A1=A3=E7=9B=B8=E5=85=B3=E6=80=A7=E5=88=A4=E6=96=AD=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E6=9B=B4=E6=96=B0DifyQueryRetrieval=E7=B1=BB?= =?UTF-8?q?=E4=BB=A5=E6=94=AF=E6=8C=81=E5=A4=9A=E7=BA=BF=E7=A8=8B=E6=A3=80?= =?UTF-8?q?=E7=B4=A2=EF=BC=8C=E5=A2=9E=E5=BC=BA=E6=95=B0=E6=8D=AE=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=EF=BC=8C=E6=94=B9=E8=BF=9B=E6=97=A5=E5=BF=97=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=EF=BC=8C=E8=B0=83=E6=95=B4Excel=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=AA=8C=E8=AF=81=E9=80=BB=E8=BE=91=EF=BC=8C=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E5=A4=9A=E4=B8=AA=E6=8F=90=E7=A4=BA=E8=AF=8D=E6=A8=A1=E6=9D=BF?= =?UTF-8?q?=E4=BB=A5=E6=8F=90=E5=8D=87=E7=94=A8=E6=88=B7=E4=BD=93=E9=AA=8C?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 2 +- rag2_0/demo/intent_recognition_example.py | 162 +++++++--- rag2_0/demo/validate_excel_data_batch.py | 2 +- rag2_0/dify/DifyQueryRetrieval.py | 164 ++++++++++ rag2_0/intent_recognition/DataModels.py | 23 +- .../intent_recognition/IntentRecognition.py | 295 ++++++++++++++++-- .../Multi_PromptTemplates.py | 117 ------- rag2_0/intent_recognition/PromptTemplates.py | 163 +++++++++- 8 files changed, 737 insertions(+), 191 deletions(-) create mode 100644 rag2_0/dify/DifyQueryRetrieval.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 282e1aa..ab1266d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "program": "${file}", "console": "integratedTerminal", - "justMyCode": false + "justMyCode": true }, { "name": "IntentRecognition", diff --git a/rag2_0/demo/intent_recognition_example.py b/rag2_0/demo/intent_recognition_example.py index 1decac8..6895c70 100755 --- a/rag2_0/demo/intent_recognition_example.py +++ b/rag2_0/demo/intent_recognition_example.py @@ -16,13 +16,92 @@ from tqdm import tqdm import time import sys import argparse -from typing import List, Dict +from typing import List, Dict, Any, Optional +from langchain.output_parsers import PydanticOutputParser +from pydantic import BaseModel, Field sys.path.append(os.getcwd()) from rag2_0.intent_recognition import IntentRecognizer +from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval +from rag2_0.intent_recognition.DataModels import Classification +from rag2_0.tool.ModelTool import OpenAiLLM # 加载环境变量 load_dotenv() +dify_query_retrieval = DifyQueryRetrieval(api_key="dataset-skLjmPVonjHo119OWNf3kAmY", base_url="http://172.20.0.145/v1") +def is_retrieved_doc_relevant(query: str, retrieved_doc: List[Dict[str, Any]], api_key: str = None, base_url: str = None, model_name: str = None) -> Dict[str, Any]: + """ + 使用LLM判断检索出的文档是否与用户提问相关 + + Args: + query: 用户提问 + retrieved_doc: 检索出的文档列表 + api_key: API密钥,默认使用环境变量 + base_url: API基础URL,默认使用环境变量 + model_name: 模型名称,默认使用环境变量或默认模型 + + Returns: + 包含相关性判断结果的字典,包括is_relevant(布尔值)和explanation(解释) + """ + # 使用环境变量或参数值 + api_key = api_key or os.getenv("OPENAI_API_KEY") + base_url = base_url or os.getenv("OPENAI_API_BASE") + model_name = model_name or os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo") + + # 如果没有检索到文档,直接返回不相关 + if not retrieved_doc or len(retrieved_doc) == 0: + return { + "is_relevant": False, + "explanation": "没有检索到任何文档", + "relevance_score": 0.0 + } + + # 构建文档内容 + doc_contents = [] + for i, doc in enumerate(retrieved_doc[:3]): # 只取前3个文档进行判断 + content = doc.get("content", "") + title = doc.get("title", "") + doc_contents.append(f"文档{i+1}标题: {title}\n文档{i+1}内容: {content}") + + doc_text = "\n\n".join(doc_contents) + class TempModel(BaseModel): + is_relevant: bool = Field(description="是否与用户提问相关") + relevance_score: int = Field(description="相关性评分,0-100分") + explanation: str = Field(description="解释各个文档与提问的相关性或不相关性") + + parser = PydanticOutputParser(pydantic_object=TempModel) + # 构建提示词 + prompt = f"""请判断以下检索文档是否与用户提问相关,并给出相关性评分(0-100分)。 + +用户提问: {query} + +检索文档: +{doc_text} + +请按照以下JSON格式返回结果: +{parser.get_format_instructions()} +""" + + try: + # 初始化LLM并调用 + llm = OpenAiLLM(api_key=api_key, base_url=base_url, model="deepseek-ai/DeepSeek-R1", response_format={"type": "json_object"}) + response = llm.invoke(prompt) + + result = parser.parse(response.content) + + return { + "is_relevant": result.is_relevant, + "relevance_score": result.relevance_score, + "explanation": result.explanation + } + except Exception as e: + logging.error(f"判断文档相关性时出错: {str(e)}") + return { + "is_relevant": False, + "explanation": f"判断过程出错: {str(e)}", + "relevance_score": 0.0 + } + # 读取Excel文件中的提问数据 def load_questions_from_excel(file_path=None): """ @@ -70,23 +149,33 @@ def process_query(recognizer: IntentRecognizer, query: str, conversation_context enable_query_expansion=True) # 提取分类信息 classification = result["classification"] + original_query = result["rewrite"]["rewrite"] + query_list = result["query_expand"]["all"] + soft_name = result.get("slot_filling", {}).get("filled_data", {}).get("software_name","") + # 将字典转换为Classification对象 + classification_obj = Classification(**classification) + retrieved_doc=dify_query_retrieval.retrieve(original_query, query_list, classification_obj, soft_name) - # 提取关键词信息 - keywords = result["keywords"] - keywords_str = "" - if keywords and keywords.get("terms"): - term_details = [] - for term in keywords["terms"]: - term_info = { - "名称": term["name"], - "同义词": ";".join(term["synonymous"]) if term["synonymous"] else "", - "描述": term["description"] - } - term_details.append(term_info) - - # 将term_details转换为JSON字符串,确保中文正确显示 - keywords_str = json.dumps(term_details, ensure_ascii=False, indent=2) - + # 判断检索文档是否相关 + relevance_result = {} + if retrieved_doc: + # 获取API密钥和基础URL + api_key = os.getenv("OPENAI_API_KEY") + base_url = os.getenv("OPENAI_API_BASE") + model_name = os.getenv("LLM_MODEL_NAME", "gpt-3.5-turbo") + # 判断文档相关性 + relevance_result = is_retrieved_doc_relevant(query, retrieved_doc, api_key, base_url, model_name) + else: + retrieved_doc_str = [] + relevance_result = { + "is_relevant": False, + "explanation": "没有检索到文档", + "relevance_score": 0.0 + } + + retrieved_doc_titles=[] + if retrieved_doc: + retrieved_doc_titles=[doc["title"].split("/")[-1] for doc in retrieved_doc] # 提取槽位填充信息 slot_filling = result.get("slot_filling", {}) slot_filling_str = "" @@ -97,30 +186,31 @@ def process_query(recognizer: IntentRecognizer, query: str, conversation_context "缺失槽位": slot_filling.get("missing_slots", {}), "填充数据": slot_filling.get("filled_data", {}) }, ensure_ascii=False, indent=2) - + # 处理成功,返回结果 return { "提问": query, - "问题拆解": result["query_keys"], - "一级分类": classification["vertical_classification"], - "二级分类": classification["sub_classification"], + "问题分类": f"{classification['vertical_classification']} - {classification['sub_classification']}", "问题改写": result["rewrite"]["rewrite"], - "检索的关键词": keywords_str, - "槽位填充": slot_filling_str + "槽位填充": slot_filling_str, + "检索的文档": "\n".join(retrieved_doc_titles), + "文档是否相关": "相关" if relevance_result["is_relevant"] else "不相关", + "文档相关性解释": relevance_result["explanation"] } except Exception as e: + logging.error(f"处理问题 '{query}' 时出错: ",exc_info=True) retry_count += 1 # 如果已经重试了最大次数,则记录错误并返回错误结果 if retry_count > max_retries: - logging.error(f"处理问题 '{query}' 时出错: {e.__class__}{e}") return { "提问": query, - "一级分类": "处理出错", - "二级分类": "处理出错", + "问题分类": "处理出错", "问题改写": "处理出错", - "检索的关键词": f"重试 {max_retries} 次后失败: {str(e)}", - "槽位填充": "处理出错" + "槽位填充": "处理出错", + "检索的文档": f"重试 {max_retries} 次后失败: {str(e)}", + "文档是否相关": "处理出错", + "文档相关性解释": "处理出错" } else: # 可以在这里添加延迟,避免过快重试 @@ -172,6 +262,7 @@ def save_results_to_excel(results, output_file, is_final=False): worksheet.set_column('E:E', 60) # 问题改写 60个Excel单位 worksheet.set_column('F:F', 60) # 检索到的关键词 60个Excel单位 worksheet.set_column('G:G', 80) # 槽位填充 80个Excel单位 + worksheet.set_column('H:H', 60) # 文档相关性 60个Excel单位 # 设置所有行高为20磅 for i in range(len(results_df) + 1): # +1 是为了包括表头 @@ -222,7 +313,7 @@ def parse_arguments(): help='API基础URL,默认使用环境变量中的配置') # 添加处理相关参数 - parser.add_argument('--max_workers', '-w', type=int, default=20, + parser.add_argument('--max_workers', '-w', type=int, default=2, help='并发处理的最大线程数,默认为20') parser.add_argument('--debug', '-d', action='store_true', help='启用调试模式,使用示例查询而非从文件读取') @@ -249,12 +340,12 @@ def main(): # 读取提问数据 current_dir = os.path.dirname(os.path.abspath(__file__)) - data_file = args.input if args.input else os.path.join(current_dir, "..", "..", "data", "excel", "历史提问数据(dislike)_提问明确.xlsx") - output_file = args.output if args.output else os.path.join(current_dir, "..", "..", "data", "excel", "历史提问数据(dislike)_槽位(分类)填充结果.xlsx") + data_file = args.input if args.input else os.path.join(current_dir, "..", "..", "data", "excel", "1500条点踩软件问题测试.xlsx") + output_file = args.output if args.output else os.path.join(current_dir, "..", "..", "data", "excel", "1500条点踩软件问题_槽位(分类)填充结果.xlsx") # 检测是否为调试模式 is_debug = args.debug or (hasattr(sys, 'gettrace') and sys.gettrace() is not None) - + is_debug = False if is_debug: # 如果提供了查询参数,使用它;否则使用默认示例 if args.query: @@ -287,12 +378,7 @@ def main(): result = future.result() # 将结果放在与输入相同的位置 results[idx] = result - completed += 1 - # 每处理batch_size条数据保存一次 - # if completed % batch_size == 0: - # logging.info(f"已完成 {completed}/{len(examples)} 条,保存中间结果...") - # save_results_to_excel(results, output_file, is_final=False) # 处理完所有数据后,保存最终结果 save_results_to_excel(results, output_file, is_final=True) @@ -308,7 +394,7 @@ def setup_logging(): # 配置日志输出到控制台 logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + format='%(asctime)s - %(name)s - [%(thread)d] - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() # 添加控制台处理器 ] diff --git a/rag2_0/demo/validate_excel_data_batch.py b/rag2_0/demo/validate_excel_data_batch.py index 33d1c55..a6d4bbd 100755 --- a/rag2_0/demo/validate_excel_data_batch.py +++ b/rag2_0/demo/validate_excel_data_batch.py @@ -390,7 +390,7 @@ class ExcelDataValidator: return index, False, "槽位填充", error_reason, slot_confidence # 4. 验证检索内容 - if retrieve_content: + if retrieve_content and retrieve_content != "" and pd.notna(retrieve_content): if self.debug: logging.info(f" 验证检索内容...") diff --git a/rag2_0/dify/DifyQueryRetrieval.py b/rag2_0/dify/DifyQueryRetrieval.py new file mode 100644 index 0000000..6046917 --- /dev/null +++ b/rag2_0/dify/DifyQueryRetrieval.py @@ -0,0 +1,164 @@ +import sys +import os +import json +from threading import Thread +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Dict, Any, Optional +import logging +import time +sys.path.append(os.getcwd()) + +from rag2_0.intent_recognition.DataModels import Classification +from rag2_0.dify.dify_client.client import DifyClient, KnowledgeBaseClient +from rag2_0.tool.ModelTool import XinferenceReRankerModel +class DifyQueryRetrieval: + + software_to_dataset_map = {"配网工程计价通D3":["下载安装注册","配网造价知识","配网造价软件知识"], + "新型储能电站建设计价通C1":["下载安装注册","储能C1计价通软件知识","新能源造价知识"], + "西藏电力工程计价通Z1":["下载安装注册","西藏造价知识","西藏造价软件知识"], + "技改检修工程计价通T1":["下载安装注册","技改检修工程计价通T1软件知识","技改造价知识"], + "技改检修清单计价通T1":["下载安装注册","技改检修清单计价通T1软件知识","技改造价知识"], + "电力建设计价通":["下载安装注册","主网造价知识","电力建设计价通(2018)软件知识"], + "其他":["下载安装注册","技改检修清单计价通T1软件知识", + "主网造价知识","西藏造价知识","技改检修工程计价通T1软件知识", + "电力建设计价通(2018)软件知识","储能C1计价通软件知识", + "西藏造价软件知识","新能源造价知识","配网造价知识","技改造价知识", + "配网造价软件知识"]} + + def __init__(self, api_key: str, base_url: str): + self._api_key = api_key + self._base_url = base_url + self._datasets_list = self.get_datasets_list() + + def get_datasets_list(self) -> Dict[str, str]: + client = KnowledgeBaseClient(api_key=self._api_key, base_url=self._base_url) + datasets = client.list_datasets(page_size=50) + datasets_json = datasets.json() + return {dataset["name"]:dataset["id"] for dataset in datasets_json["data"]} + + def retrieve_by_dataset(self, query: str, dataset_name: str) -> List[Dict[str, Any]]: + try: + knowledge_base_client = KnowledgeBaseClient(api_key=self._api_key, base_url=self._base_url, dataset_id=self._datasets_list[dataset_name]) + documents = knowledge_base_client.retrieve(query) + retrieved_documents = documents.json().get("records", []) + + # 添加数据集信息 + for retrieved_document in retrieved_documents: + retrieved_document["dataset_id"] = self._datasets_list[dataset_name] + retrieved_document["dataset_name"] = dataset_name + + return retrieved_documents + except Exception as e: + logging.error(f"检索数据集 {dataset_name} 时出错: {str(e)}", exc_info=True) + return [] + + def retrieve(self, original_query: str, query_list: List[str], classification: Classification, software_name: str) -> Optional[List[Dict[str, Any]]]: + datasets = self.get_datasets_by_classification(classification, software_name) + if len(datasets) == 0: + return None + + all_documents=[] + # 使用线程池替代无限制创建线程 + # 设置合理的最大线程数,这里使用min(32, len(query_list) * len(datasets))来限制 + time_start = time.time() + max_workers = min(os.cpu_count() * 2, len(query_list) * len(datasets)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for query in query_list: + for dataset in datasets: + if dataset not in self._datasets_list: + raise ValueError(f"dataset {dataset} not in datasets_list") + + futures.append(executor.submit(self.retrieve_by_dataset, query, dataset)) + + # 等待所有任务完成 + for future in as_completed(futures): + # 处理可能的异常 + try: + retrieved_documents = future.result() + all_documents.extend(retrieved_documents) + except Exception as e: + logging.error(f"检索过程中发生错误: {str(e)}", exc_info=True) + time_end = time.time() + + logging.info(f"检索耗时: {time_end - time_start:.2f}秒") + # 根据segment_id对文档进行去重 + unique_documents = {} + for document in all_documents: + segment_id = document['segment']['id'] + if segment_id not in unique_documents: + unique_documents[segment_id] = document + + # 将去重后的文档转换为列表 + deduplicated_documents = list(unique_documents.values()) + + # 对所有检索出来的文档进行重排序 + time_start = time.time() + processed_documents = self.data_post_processor(original_query, deduplicated_documents) + time_end = time.time() + logging.info(f"检索后重排序耗时: {time_end - time_start:.2f}秒") + + return processed_documents + + def data_post_processor(self, query: str, all_documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + reranker_model = XinferenceReRankerModel() + documents = [document['segment']['content'] for document in all_documents] + reranked_documents = reranker_model.rerank(query, documents, top_k=5) + new_all_documents = [] + + def to_dify_document_format(document: dict)->dict: + return { + "metadata": { + "_source": "knowledge", + "dataset_id": document["dataset_id"], + "dataset_name": document["dataset_name"], + "document_id": document['segment']['document_id'], + "document_name": document["segment"]["document"]["name"], + "document_data_source_type": document["segment"]["document"]["data_source_type"], + "segment_id": document["segment"]["id"], + "retriever_from": "api", + "score": document.get("score", 0), + "segment_hit_count": document.get("segment", {}).get("hit_count", 0), + "segment_word_count": document.get("segment", {}).get("word_count", 0), + "segment_position": document.get("segment", {}).get("position", 0), + "segment_index_node_hash": document.get("segment", {}).get("index_node_hash", ""), + "position": document["segment"].get("position", 0) + }, + "title": document["segment"]["document"]["name"], + "content": document["segment"]["content"] + } + + for reranked_document in reranked_documents: + cur_doc_info = all_documents[reranked_document["index"]] + cur_doc_info["score"] = reranked_document["score"] + new_all_documents.append(to_dify_document_format(cur_doc_info)) + return new_all_documents + + def get_datasets_by_classification(self, classification: Classification, software_name: str) -> List[str]: + if classification.vertical_classification=="软件问题" or classification.vertical_classification=="业务问题": + software_name_list = self.software_to_dataset_map.keys() + cur_software_name = "" + for software_name_info in software_name_list: + if software_name_info in software_name: + cur_software_name = software_name_info + break + if cur_software_name == "": + return self.software_to_dataset_map["其他"] + else: + return self.software_to_dataset_map[cur_software_name] + + if classification.vertical_classification == "安装下载注册": + if classification.sub_classification in ["后缀名咨询", "软件锁类"]: + return ["下载安装注册"] + elif classification.sub_classification == "安装下载类": + return [] + elif classification.sub_classification == "问题排查": + return self.software_to_dataset_map["其他"] + + return self.software_to_dataset_map["其他"] + + +if __name__ == "__main__": + dify_query_retrieval = DifyQueryRetrieval(api_key="dataset-skLjmPVonjHo119OWNf3kAmY", base_url="https://172.20.0.145/v1") + datasets = dify_query_retrieval.retrieve("配网工程计价通D3软件如何新建工程?", Classification(vertical_classification="软件问题", sub_classification="软件功能"), "配网工程计价通D3") + print(datasets) \ No newline at end of file diff --git a/rag2_0/intent_recognition/DataModels.py b/rag2_0/intent_recognition/DataModels.py index 64a59e6..5fd216c 100755 --- a/rag2_0/intent_recognition/DataModels.py +++ b/rag2_0/intent_recognition/DataModels.py @@ -308,4 +308,25 @@ class IntentAndSlotResult(BaseModel): InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots - ] \ No newline at end of file + ] + +# 意图优化环节数据模型 +class StepBackPrompt(BaseModel): + """后退提示数据模型""" + original_query: str = Field(description="原始查询") + step_back_query: str = Field(description="后退提示生成的抽象查询") + +class FollowUpQuestions(BaseModel): + """后续问题数据模型""" + original_query: str = Field(description="原始查询") + follow_up_query: str = Field(description="基于历史对话生成的独立问题") + +class HypotheticalDocument(BaseModel): + """假设文档数据模型""" + original_query: str = Field(description="原始查询") + hypothetical_answer: str = Field(description="假设性回答") + +class MultiQuestions(BaseModel): + """多问题查询数据模型""" + original_query: str = Field(description="原始查询") + sub_questions: List[str] = Field(description="从不同角度生成的子问题列表") \ No newline at end of file diff --git a/rag2_0/intent_recognition/IntentRecognition.py b/rag2_0/intent_recognition/IntentRecognition.py index 626feab..861a3b6 100755 --- a/rag2_0/intent_recognition/IntentRecognition.py +++ b/rag2_0/intent_recognition/IntentRecognition.py @@ -9,6 +9,7 @@ Description: 意图分类、改写核心逻辑 import logging import os +import threading from langchain.output_parsers import PydanticOutputParser import json from typing import List, Tuple, Dict, Any, Optional @@ -18,7 +19,8 @@ import time from .PromptTemplates import (classification_prompt, query_rewrite_prompt, extract_nouns_prompt, classification_info, - slot_filling_prompt) + slot_filling_prompt, step_back_prompt, + follow_up_questions_prompt, hyde_prompt, multi_questions_prompt) from .Multi_PromptTemplates import ( intent_and_slot_prompt, output_example, @@ -29,7 +31,8 @@ from .DataModels import ( Classification, QueryRewrite, Term, TermList, SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots, DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots, - InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult + InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult, + StepBackPrompt, FollowUpQuestions, HypotheticalDocument, MultiQuestions ) from .ProfessionalNounVector import ProfessionalNounRetriever from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM, SiliconFlowReRankerModel @@ -166,34 +169,32 @@ class IntentRecognizer: Returns: 提取的术语列表 """ - try: - # 如果使用jieba分词 - if use_jieba: - # 先使用jieba分词 - tokens = self._tokenize_with_jieba(query) - - # 构建术语列表 - terms = [] - for token in tokens: - if len(token) > 1: # 过滤掉单字词 - terms.append(Term(name=token, synonymous=[], description="")) - - return terms - else: - # 使用LLM提取关键词 - # 准备提示词 - formatted_prompt = extract_nouns_prompt.replace("{content}", query) - terms_list_parser = PydanticOutputParser(pydantic_object=TermList) - formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions()) - - # 调用LLM - response = self._llm.invoke(formatted_prompt, False) - - # 尝试使用Pydantic解析器解析TermList - parsed_output = terms_list_parser.parse(response.content) - return parsed_output.terms - except Exception as e: - raise RuntimeError(f"无法解析LLM关键词提取响应: {e}") from e + # 如果使用jieba分词 + if use_jieba: + # 先使用jieba分词 + tokens = self._tokenize_with_jieba(query) + + # 构建术语列表 + terms = [] + for token in tokens: + if len(token) > 1: # 过滤掉单字词 + terms.append(Term(name=token, synonymous=[], description="")) + + return terms + else: + # 使用LLM提取关键词 + # 准备提示词 + formatted_prompt = extract_nouns_prompt.replace("{content}", query) + terms_list_parser = PydanticOutputParser(pydantic_object=TermList) + formatted_prompt = formatted_prompt.replace("{output_format}", terms_list_parser.get_format_instructions()) + + # 调用LLM + response = self._llm.invoke(formatted_prompt, False) + + # 尝试使用Pydantic解析器解析TermList + parsed_output = terms_list_parser.parse(response.content) + return parsed_output.terms + def _rerank_matched_terms(self, query_key: str, matched_terms: set, top_k: int = 2, rerank_score:float = 0.6) -> List[Term]: """ @@ -358,7 +359,8 @@ class IntentRecognizer: def process_query(self, query: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, Any] = None, - use_jieba: bool = False) -> Dict[str, Any]: + use_jieba: bool = False, + enable_query_expansion: bool = False) -> Dict[str, Any]: """ 处理用户问题的完整流程 @@ -388,10 +390,29 @@ class IntentRecognizer: # suffix_terms.append(suffix_term) # return Classification(vertical_classification="安装下载", sub_classification="查询"), TermList(terms=suffix_terms), QueryRewrite(rewrite=query), matched_suffixes + if chat_history is None: chat_history = [] if previous_slots is None: previous_slots = {} + + # 步骤: 并行执行提问扩展 + if enable_query_expansion: + # 创建线程和结果容器 + threads_and_results = [ + # 5.1: 后退提示 + self._run_in_thread(self._generate_step_back_prompt, args=(query, chat_history, conversation_context)), + + # 5.2: Follow Up Questions + self._run_in_thread(self._generate_follow_up_questions, args=(query, chat_history, conversation_context)), + + # 5.3: HyDE + self._run_in_thread(self._generate_hypothetical_document, args=(query, chat_history, conversation_context)), + + # 5.4: 多问题查询 + self._run_in_thread(self._generate_multi_questions, args=(query, chat_history, conversation_context)) + ] + # 步骤1: 匹配关键词 keywords_terms, query_keys = self._match_keywords(query, use_jieba) @@ -419,12 +440,47 @@ class IntentRecognizer: if classification.vertical_classification not in ["其他", "闲聊"] and classification.sub_classification not in ["其他", "闲聊"]: slot_filling_result = self._fill_slots(rewrite.rewrite, classification, conversation_context, chat_history, previous_slots) + if not enable_query_expansion: + return { + "classification": classification.model_dump(), + "keywords": keywords_terms.model_dump(), + "rewrite": rewrite.model_dump(), + "query_keys": query_keys, + "slot_filling": slot_filling_result + } + + # 等待所有线程完成 + start_time = time.time() + for thread, _ in threads_and_results: + thread.join() + end_time = time.time() + logging.info(f"问题扩展环节耗时统计 - 总耗时: {end_time - start_time:.2f}秒") + + # 收集结果 + step_back_result = threads_and_results[0][1][0] if threads_and_results[0][1] else StepBackPrompt(original_query=query, step_back_query=query) + follow_up_result = threads_and_results[1][1][0] if threads_and_results[1][1] else FollowUpQuestions(original_query=query, follow_up_query=query) + hyde_result = threads_and_results[2][1][0] if threads_and_results[2][1] else HypotheticalDocument(original_query=query, hypothetical_answer="") + multi_questions_result = threads_and_results[3][1][0] if threads_and_results[3][1] else MultiQuestions(original_query=query, sub_questions=[query]) + all_questions=multi_questions_result.sub_questions + all_questions.append(query) + all_questions.append(step_back_result.step_back_query) + all_questions.append(follow_up_result.follow_up_query) + all_questions.append(hyde_result.hypothetical_answer) + all_questions = list(set(all_questions)) + + query_expand={"all":all_questions, + "step_back":step_back_result.model_dump(), + "follow_up":follow_up_result.model_dump(), + "hyde":hyde_result.model_dump(), + "multi_questions":multi_questions_result.model_dump()} + # 返回所有结果 return { "classification": classification.model_dump(), "keywords": keywords_terms.model_dump(), "rewrite": rewrite.model_dump(), "query_keys": query_keys, - "slot_filling": slot_filling_result + "slot_filling": slot_filling_result, + "query_expand": query_expand } @@ -544,7 +600,182 @@ class IntentRecognizer: # 如果解析失败,创建一个空的模型实例 empty_instance = slot_model_class() return empty_instance + + def _generate_step_back_prompt(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> StepBackPrompt: + """ + 生成后退提示 + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 后退提示结果 + """ + step_back_start_time = time.time() + # 准备提示词 + step_back_parser = PydanticOutputParser(pydantic_object=StepBackPrompt) + formatted_prompt = step_back_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=step_back_parser.get_format_instructions() + ) + + try: + # 调用LLM + response = self._llm.invoke(formatted_prompt, False) + + # 解析输出 + parsed_output = step_back_parser.parse(response.content) + step_back_end_time = time.time() + step_back_time = step_back_end_time - step_back_start_time + logging.debug(f"后退提示生成耗时统计 - 总耗时: {step_back_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为后退提示 + logging.error(f"后退提示生成失败: {e}") + return StepBackPrompt(original_query=query, step_back_query=query) + + def _generate_follow_up_questions(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> FollowUpQuestions: + """ + 生成后续问题 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 后续问题结果 + """ + follow_up_start_time = time.time() + # 准备提示词 + follow_up_parser = PydanticOutputParser(pydantic_object=FollowUpQuestions) + formatted_prompt = follow_up_questions_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=follow_up_parser.get_format_instructions() + ) + + try: + # 调用LLM + response = self._llm.invoke(formatted_prompt, False) + + # 解析输出 + parsed_output = follow_up_parser.parse(response.content) + follow_up_end_time = time.time() + follow_up_time = follow_up_end_time - follow_up_start_time + logging.debug(f"后续问题生成耗时统计 - 总耗时: {follow_up_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为后续问题 + logging.error(f"后续问题生成失败: {e}") + return FollowUpQuestions(original_query=query, follow_up_query=query) + + def _generate_hypothetical_document(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> HypotheticalDocument: + """ + 生成假设性文档 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 假设性文档结果 + """ + hyde_start_time = time.time() + # 准备提示词 + hyde_parser = PydanticOutputParser(pydantic_object=HypotheticalDocument) + formatted_prompt = hyde_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=hyde_parser.get_format_instructions() + ) + + try: + # 调用LLM + response = self._llm.invoke(formatted_prompt, False) + + # 解析输出 + parsed_output = hyde_parser.parse(response.content) + hyde_end_time = time.time() + hyde_time = hyde_end_time - hyde_start_time + logging.debug(f"假设性文档生成耗时统计 - 总耗时: {hyde_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回空的假设性回答 + logging.error(f"假设性文档生成失败: {e}") + return HypotheticalDocument(original_query=query, hypothetical_answer="") + + def _generate_multi_questions(self, query: str, chat_history: List[Dict[str, str]] = None, conversation_context: str = "") -> MultiQuestions: + """ + 生成多角度问题 + + Args: + query: 用户原始问题 + chat_history: 历史对话记录 + conversation_context: 会话背景信息 + + Returns: + 多角度问题结果 + """ + multi_questions_start_time = time.time() + # 准备提示词 + multi_questions_parser = PydanticOutputParser(pydantic_object=MultiQuestions) + formatted_prompt = multi_questions_prompt.format( + query=query, + chat_history=json.dumps(chat_history, ensure_ascii=False) if chat_history else "[]", + conversation_context=conversation_context, + output_format=multi_questions_parser.get_format_instructions() + ) + + try: + # 调用LLM + response = self._llm.invoke(formatted_prompt, False) + + # 解析输出 + parsed_output = multi_questions_parser.parse(response.content) + multi_questions_end_time = time.time() + multi_questions_time = multi_questions_end_time - multi_questions_start_time + logging.debug(f"多角度问题生成耗时统计 - 总耗时: {multi_questions_time:.2f}秒") + return parsed_output + except Exception as e: + # 如果解析失败,返回原始查询作为唯一子问题 + logging.error(f"多角度问题生成失败: {e},LLM返回内容:{response.content}") + return MultiQuestions(original_query=query, sub_questions=[query]) + + def _run_in_thread(self, func, args=(), kwargs={}): + """ + 在线程中执行函数并返回结果 + + Args: + func: 要执行的函数 + args: 函数的位置参数 + kwargs: 函数的关键字参数 + + Returns: + (thread, result_container): 线程对象和存放结果的容器 + """ + result_container = [] + + def thread_target(): + try: + result = func(*args, **kwargs) + result_container.append(result) + except Exception as e: + logging.error(f"线程执行函数 {func.__name__} 时出错: {e}") + result_container.append(None) + + thread = threading.Thread(target=thread_target) + thread.start() + return thread, result_container + + def _process_intent_and_slot(self, user_input: str, conversation_context: str = "", chat_history: List[Dict[str, str]] = None, previous_slots: Dict[str, Any] = None) -> Dict[str, Any]: diff --git a/rag2_0/intent_recognition/Multi_PromptTemplates.py b/rag2_0/intent_recognition/Multi_PromptTemplates.py index 092e975..a8ce21b 100755 --- a/rag2_0/intent_recognition/Multi_PromptTemplates.py +++ b/rag2_0/intent_recognition/Multi_PromptTemplates.py @@ -7,123 +7,6 @@ Date: 2025-06-13 Description: 多轮对话下意图分类、改写核心提示词 """ - -# 首版重构提示词 -query_rewrite_prompt_pro_old=""" -# 电力造价专业问答优化工程师(升级版) - -你是一名电力造价专业问答优化工程师,负责结合历史对话背景和专业术语库,将用户的原始问题进行规范化重构,以提升知识库检索准确率。 - -## 核心任务 -基于历史对话上下文和专业术语库,将用户的原始问题进行规范化重构,提高知识库检索的准确性和专业性,同时保持对话的连贯性和语境相关性。 - -## 处理流程 -### 第一阶段:输入解析 -1. 解析基础信息 - - 原始问题(需保留核心语义):{query} - - 关键词集合:{keywords} - - 历史对话记录:{chat_history} - - 当前聊天背景:{context} - -2. 背景分析 - - 识别历史对话中的关键主题和专业领域 - - 提取上下文中的隐含信息(如软件版本、地区、具体场景等) - - 分析用户的提问模式和专业水平 - -### 第二阶段:上下文匹配分析 -**背景匹配规则:** -1. 检查当前问题是否与历史对话存在关联性 -2. 识别历史对话中提到的关键信息: - - 软件版本/系统(如Z1、D3等) - - 地区定额(如西藏、山东等) - - 具体功能模块 - - 用户操作习惯 - -**术语匹配规则:** -1. 检查原始问题中是否包含关键词集合中的`name`字段或`synonymous`字段中的任何词汇 -2. 结合历史对话,识别可能的隐含专业术语 -3. 统计匹配的术语数量 -4. 判断执行路径: - - 匹配术语 ≥ 1个 或 存在明显上下文关联 → 执行重构流程 - - 匹配术语 = 0个 且 无明显上下文关联 → 直接输出原始问题 - -### 第三阶段:问题重构 -**重构原则(按优先级排序):** - -1. **语义保真**:严格保持原问题的核心意图和诉求 -2. **上下文继承**: - - 补充历史对话中的隐含信息(如软件名称、版本、地区等) - - 保持对话的连贯性和逻辑性 - - 避免重复已确认的背景信息 -3. **术语规范**: - - 将匹配到的同义词替换为对应的标准术语(name字段) - - 对在关键词中的标准术语使用【】进行标记 - - 保留在原问题中未在关键词库中的专业术语、限定词和修饰词 -4. **结构优化**: - - 保持原问题的语态特征5W2H - - 保持主谓宾结构清晰 - - 保留时间、版本等限定条件 - -**术语处理规则:** -- 优先级1:基于历史对话补充缺失的背景信息 -- 优先级2:保留原问题中的专业术语、限定词和修饰词(即使不在关键词库中) -- 优先级3:将同义词替换为标准术语并用【】标记 -- 优先级4:对原问题中已存在的标准术语添加【】标记 - -**上下文处理策略:** -- 如果当前问题与历史对话高度相关,适当补充背景信息 -- 如果用户使用代词(如"这个"、"那个"),尝试结合历史对话明确指代 -- 如果历史对话中已确定软件或系统,在当前问题中适当体现 - -# 输出规范 -{output_format} - -# 示范案例库 - -▶ 案例1(有效匹配 + 上下文继承) -历史对话:用户之前询问过"西藏定额升级的问题" -输入: -原始问题:怎么把旧版工程转到Z1新版 -关键词:【'老版本定额升级', '批量设置定额', '西藏造价软件Z1'】 -输出: -{{"rewrite":"【西藏造价软件Z1】如何执行【老版本定额升级】操作?"}} - -▶ 案例2(无效匹配 + 无上下文关联) -历史对话:无相关内容 -输入: -原始问题:程序界面文字显示过小如何处理? -关键词:【'定额升级', '工程批量导入'】 -输出: -{{"rewrite":"程序界面文字显示过小如何处理?"}} - -▶ 案例3(部分匹配 + 上下文补充) -历史对话:用户之前询问过"D3软件的功能" -输入: -原始问题:能导出清单的计算公式吗? -关键词:【'配网工程计价通D3软件', '计算式'】 -输出: -{{"rewrite":"【配网工程计价通D3软件】能导出清单的【计算式】吗?"}} - -▶ 案例4(代词替换 + 上下文解析) -历史对话:用户刚询问过"山东定额的问题" -输入: -原始问题:这个定额怎么批量导入? -关键词:【'批量导入定额', '山东定额'】 -输出: -{{"rewrite":"【山东定额】如何进行【批量导入定额】操作?"}} - -## 质量检查清单 -执行前请确认: -- [ ] 是否保持了原问题的核心诉求? -- [ ] 是否合理利用了历史对话中的背景信息? -- [ ] 是否正确执行了同义词替换? -- [ ] 是否保留了原问题中的专业术语和限定条件? -- [ ] 是否正确使用了【】标记? -- [ ] 重构后的问题是否自然流畅? -- [ ] 是否保持了对话的连贯性? -- [ ] 是否避免了过度补充不必要的信息? -""" - query_rewrite_prompt_pro=""" # 电力造价问答优化工程师(精简版) **角色**:基于历史对话和术语库重构问题,提升知识库检索准确率。 diff --git a/rag2_0/intent_recognition/PromptTemplates.py b/rag2_0/intent_recognition/PromptTemplates.py index ebee592..2288487 100755 --- a/rag2_0/intent_recognition/PromptTemplates.py +++ b/rag2_0/intent_recognition/PromptTemplates.py @@ -49,7 +49,7 @@ classification_info="""【垂直领域分类】: "用哪个软件打开.BDY3文件?", "BDD3是什么" 2. 软件锁类:询问软件锁信息、锁注册号查询、许可证查询、锁激活问题等软件锁相关问题 -3. 安装下载类:安装下载咨询、组件(插件)选择、环境配置等 +3. 安装下载类:安装下载咨询、组件(插件)选择、环境配置、安装包下载、政策文件(规范文件)下载等 4. 问题排查类:软件安装下载失败、报错,系统兼容性问题等 【其他】: @@ -203,4 +203,165 @@ slot_filling_prompt = """ "os_version": null, "reproduction_steps": "新建工程" }} +""" + +# 意图优化环节提示词模板 +step_back_prompt = """ +# 后退提示生成器 + +你是一个专业的电力造价领域问题抽象专家。你的任务是根据用户的具体问题,提出一个更抽象、更高层次的问题,帮助系统更好地理解用户的意图。 + +## 任务说明 +1. 分析用户的原始问题,理解其核心意图和需求 +2. 考虑历史对话和会话背景,理解用户当前问题的上下文 +3. 生成一个更抽象、更高层次的问题,称为"后退问题" +4. 后退问题应该: + - 更加通用和抽象 + - 涵盖原始问题的核心主题 + - 去除过于具体的限制条件(如时间、地点、特定版本等) + - 保持在同一领域和主题范围内 + - 考虑历史对话中的相关信息 + +## 输入 +用户原始问题: {query} +历史对话记录: {chat_history} +会话背景: {conversation_context} + +## 输出格式 +{output_format} + +## 示例 +原始问题: "配网D3软件2023版本如何在Windows 11系统上导入单位工程量清单?" +后退问题: "配网D3软件如何导入工程量清单?" + +原始问题: "技改T1软件中的某个设备更换后,如何在系统中更新对应的定额?" +后退问题: "技改T1软件中如何更新设备对应的定额?" +""" + +follow_up_questions_prompt = """ +# 后续问题生成器 + +你是一个专业的电力造价领域对话理解专家。你的任务是根据历史对话和当前问题,生成一个完整的独立问题,确保即使没有历史上下文也能理解。 + +## 任务说明 +1. 分析历史对话记录和当前用户问题 +2. 识别当前问题中可能引用了历史对话的部分(如代词、省略内容等) +3. 生成一个完整的独立问题,该问题应该: + - 包含历史对话中的关键上下文 + - 明确指代所有代词和省略内容 + - 保持原问题的核心意图不变 + - 即使没有历史对话也能被理解 + +## 输入 +历史对话记录: {chat_history} +当前用户问题: {query} +会话背景: {conversation_context} + +## 输出格式 +{output_format} + +## 示例 +历史对话: +用户: "我在使用配网D3软件" +助手: "好的,请问您遇到什么问题?" +当前问题: "怎么导入清单?" +生成的独立问题: "在配网D3软件中,怎么导入清单?" + +历史对话: +用户: "技改T1软件中的设备更换功能在哪里?" +助手: "设备更换功能在'工程管理'菜单下的'设备管理'子菜单中。" +当前问题: "更新后如何保存?" +生成的独立问题: "在技改T1软件中使用设备更换功能后,如何保存更新的设备信息?" +""" + +hyde_prompt = """ +# 假设性文档生成器 + +你是一个专业的电力造价领域知识专家。你的任务是根据用户的问题和历史对话,生成一个假设性的回答,该回答可能包含的信息和模式将有助于在知识库中检索相关文档。 + +## 任务说明 +1. 分析用户的问题,理解其核心意图和需求 +2. 考虑历史对话和会话背景,理解用户当前问题的上下文 +3. 生成一个假设性的回答,该回答应该: + - 尽可能准确地回答用户问题 + - 包含可能与问题相关的专业术语和概念 + - 包含可能在知识库文档中出现的模式和结构 + - 考虑历史对话中的相关信息 + - 即使你不确定答案的准确性,也要提供一个合理的假设 + +## 输入 +用户问题: {query} +历史对话记录: {chat_history} +会话背景: {conversation_context} + +## 输出格式 +{output_format} + +## 示例 +用户问题: "配网D3软件如何导入Excel清单?" +假设性回答: "在配网D3软件中导入Excel清单的步骤如下: +1. 打开配网D3软件,进入工程管理模块 +2. 点击菜单栏中的'导入/导出'选项 +3. 选择'导入Excel清单'功能 +4. 在弹出的对话框中选择已准备好的Excel文件 +5. 确认导入格式和映射关系 +6. 点击'确定'完成导入 +注意:导入的Excel文件需要符合特定的格式要求,建议先在软件中导出一个样例文件作为模板。" +""" + +multi_questions_prompt = """ +# 多角度问题生成器 + +你是一个专业的电力造价领域问题分解专家。你的任务是根据用户的原始问题和历史对话,从不同角度生成多个子问题,以帮助系统更全面地理解和回答用户的需求。 + +## 任务说明 +1. 分析用户的原始问题,理解其核心意图和需求 +2. 考虑历史对话和会话背景,理解用户当前问题的上下文 +3. 从不同角度生成2-4个子问题,这些子问题应该: + - 分别关注原始问题的不同方面或组成部分 + - 更加具体和直接 + - 共同覆盖原始问题的完整意图 + - 考虑历史对话中的相关信息 + - 每个子问题都应该是自包含的,可以独立回答 + +## 输入 +用户原始问题: {query} +历史对话记录: {chat_history} +会话背景: {conversation_context} + +## 输出格式 +{output_format} + +## 示例 +原始问题: "配网D3软件中如何处理定额调整和工程量清单导入?" +子问题: +1. "配网D3软件中如何进行定额调整?" +2. "配网D3软件中如何导入工程量清单?" +3. "定额调整后,是否会影响已导入的工程量清单?" +按格式输出: +{{ + "original_query":"配网D3软件中如何处理定额调整和工程量清单导入?" + "sub_questions": [ + "配网D3软件中如何进行定额调整?", + "配网D3软件中如何导入工程量清单?", + "定额调整后,是否会影响已导入的工程量清单?" + ] +}} + +原始问题: "技改T1软件和配网D3软件的区别是什么?" +子问题: +1. "技改T1软件的主要功能和适用范围是什么?" +2. "配网D3软件的主要功能和适用范围是什么?" +3. "技改T1软件和配网D3软件在使用场景上有什么不同?" +4. "如何选择使用技改T1软件还是配网D3软件?" +按格式输出: +{{ + "original_query":"技改T1软件和配网D3软件的区别是什么?" + "sub_questions": [ + "技改T1软件的主要功能和适用范围是什么?", + "配网D3软件的主要功能和适用范围是什么?", + "技改T1软件和配网D3软件在使用场景上有什么不同?", + "如何选择使用技改T1软件还是配网D3软件?" + ] +}} """ \ No newline at end of file