diff --git a/kg_lab_6.13/llm.py b/kg_lab_6.13/llm.py new file mode 100644 index 0000000..8945722 --- /dev/null +++ b/kg_lab_6.13/llm.py @@ -0,0 +1,111 @@ +import requests +from typing import Any, Dict, List, Optional +from langchain_core.language_models import BaseLLM +from langchain_core.callbacks import CallbackManagerForLLMRun +from langchain_core.outputs import LLMResult, Generation +from openai import OpenAI +import httpx +import logging + + +# class OpenAiLLM: + +# def __init__(self, url, api_key, model_name): +# self._api_key = api_key +# self._url = url +# self._model = model_name + +# def generate(self, prompt): + +# client = OpenAI(api_key=self._api_key, base_url=self._url) + +# try: +# # 创建 Completion 请求 +# completion = client.chat.completions.create( +# model=self._model, +# messages=[ +# {"role": "system", "content": "You are a helpful assistant"}, +# {"role": "user", "content": prompt} +# ], +# timeout=httpx.Timeout(300.0), +# temperature=0.7, +# ) +# return completion.choices[0].message.content +# except Exception as e: +# logging.error(f"LLM调用出错: {e}") +# return f"模型调用失败: {str(e)}" + + +# llm = OpenAiLLM( +# url="http://172.20.0.145:9995/v1", +# api_key="xxx", +# model_name="deepseek-r1-distill-qwen2.5-32b", +# ) + + +class Embedding: + def __init__(self, url, api_key, model_name): + self._api_key = api_key + self._url = url + self._model = model_name + + def embed(self, text): + # 使用OpenAI客户端 + client = OpenAI(api_key=self._api_key, base_url=self._url) + + try: + # 调用embeddings API + response = client.embeddings.create(model=self._model, input=text, timeout=httpx.Timeout(60.0)) + # 返回嵌入向量 + return response.data[0].embedding + except Exception as e: + logging.error(f"嵌入模型调用出错: {e}") + raise RuntimeError(f"嵌入请求失败: {str(e)}") + + +embedding = Embedding(url="http://10.1.16.39:9995/v1", api_key="xxx", model_name="bge-m3") + + +class SiliconFlowLLM(BaseLLM): + """自定义硅基流动大模型调用类""" + + api_url: str + api_key: str + model: str + + def _generate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + from langchain_core.outputs import Generation, LLMResult + + generations = [] + for prompt in prompts: + try: + headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + } + response = requests.post(self.api_url, json=payload, headers=headers) + response.raise_for_status() + text = response.json()["choices"][0]["message"]["content"] + generations.append([Generation(text=text)]) + except Exception as e: + raise Exception(f"调用硅基流动API失败: {str(e)}") + + return LLMResult(generations=generations) + + @property + def _llm_type(self) -> str: + return "siliconflow" + + +llm = SiliconFlowLLM( + api_url="https://api.siliconflow.cn/v1/chat/completions", + api_key="sk-bbeamiumkouptsrueilgufqqyuumelcsivxwjbdugqwsqhwj", + model="Qwen/Qwen2.5-72B-Instruct", +) diff --git a/kg_lab_6.13/parameter_rewriting.py b/kg_lab_6.13/parameter_rewriting.py new file mode 100644 index 0000000..99609df --- /dev/null +++ b/kg_lab_6.13/parameter_rewriting.py @@ -0,0 +1,383 @@ +import json +import ast +import inspect + +from typing import Dict, Any, List, Optional, Union +from langchain.agents import Tool, initialize_agent, AgentType +from langchain.chat_models import ChatOpenAI +from langchain.prompts import PromptTemplate +from langchain.schema import BaseOutputParser +from langchain.schema import BaseRetriever +from langchain.callbacks.manager import CallbackManagerForRetrieverRun +from langchain.schema import Document +from langchain.chains import LLMChain +from langchain.llms.base import BaseLLM + +from prompt_templates import FUNCTION_RETURNS_LOOP_PROMPT +from llm import llm as base_llm +import project # 明确导入project模块 + + +class CodeOnlyOutputParser(BaseOutputParser): + def parse(self, text: str) -> dict: + if "Final Answer:" in text: + code_part = text.split("Final Answer:")[-1].strip() + else: + code_part = text.strip() + + return {"code": code_part} + + def get_format_instructions(self) -> str: + return "只输出最终的Python代码,不要包含其他解释或内容。" + + +class KnowledgeGraphProcessor: + """知识图谱处理器""" + + def __init__(self, kg_file="kg_simple_hierarchy.json"): + """初始化处理器""" + self.kg_data = self._load_kg_data(kg_file) + self.prompt = FUNCTION_RETURNS_LOOP_PROMPT + + def _load_kg_data(self, file_path: str) -> Dict[str, Any]: + """加载知识图谱数据""" + try: + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + except FileNotFoundError: + print(f"警告: 未找到 {file_path} 文件") + return {} + except Exception as e: + print(f"加载知识图谱数据时出错: {e}") + return {} + + def _extract_parameters_from_code(self, code: str) -> List[str]: + """从代码中提取字符串参数""" + try: + tree = ast.parse(code) + parameters = [] + + for node in ast.walk(tree): + if isinstance(node, ast.Str): + parameters.append(node.s) + elif isinstance(node, ast.Constant) and isinstance(node.value, str): + parameters.append(node.value) + + return parameters + except Exception as e: + print(f"解析代码时出错: {e}") + return [] + + def _find_node_by_path(self, path: str, data: Union[Dict[str, Any], List[Any]] = None) -> Optional[Dict[str, Any]]: + """ + 根据路径查找节点 + + Args: + path: 节点路径,格式如 "工程数据/安装过程/安装/架空输电线路本体工程/杆塔工程" + data: 当前数据节点 + + Returns: + 找到的节点及其子节点 + """ + if data is None: + data = self.kg_data + + path_parts = path.split("/") + current_part = path_parts[0] + + # 如果data是列表,则遍历列表中的每个元素 + if isinstance(data, list): + for item in data: + result = self._find_node_by_path(path, item) + if result: + return result + return None + + # 如果data是字典,则按原来的逻辑处理 + if isinstance(data, dict): + # 检查当前节点是否匹配 + found_node = None + for key, value in data.items(): + if isinstance(value, str) and current_part in value: + found_node = data + break + + # 如果在当前层级找到了节点 + if found_node: + # 如果还有更深的路径部分,则继续递归查找 + if len(path_parts) > 1: + # 查找子节点 + if "children" in found_node: + for child in found_node["children"]: + result = self._find_node_by_path("/".join(path_parts[1:]), child) + if result: + return result + return None + else: + # 已经找到最终节点 + return found_node + + # 如果当前层级没找到,则查找子节点 + if "children" in data: + for child in data["children"]: + result = self._find_node_by_path(path, child) + if result: + return result + + return None + + def _search_in_kg(self, parameter: str, data: Union[Dict[str, Any], List[Any]] = None) -> Optional[Dict[str, Any]]: + """在知识图谱中递归搜索参数""" + # 先尝试按路径查找 + if "/" in parameter: + result = self._find_node_by_path(parameter) + if result: + return result + + # 如果路径查找失败,则按关键词搜索 + if data is None: + data = self.kg_data + + # 如果是列表,遍历每个元素 + if isinstance(data, list): + for item in data: + if isinstance(item, (dict, list)): + result = self._search_in_kg(parameter, item) + if result: + return result + return None + + # 如果是字典,按原来的逻辑处理 + if isinstance(data, dict): + for key, value in data.items(): + if isinstance(value, str) and parameter in value: + return data + elif isinstance(value, list): + for item in value: + if isinstance(item, (dict, list)): + result = self._search_in_kg(parameter, item) + if result: + return result + elif isinstance(value, dict): + result = self._search_in_kg(parameter, value) + if result: + return result + + return None + + def _get_node_definition(self, node_type: str) -> str: + """获取节点类型定义""" + try: + # 检查project模块中是否存在该类 + if hasattr(project, node_type): + cls = getattr(project, node_type) + source = inspect.getsource(cls) + return source + else: + return f"未找到类型 {node_type} 的定义" + except NameError: + return f"获取类型 {node_type} 定义时出错: project模块未正确导入" + except Exception as e: + return f"获取类型 {node_type} 定义时出错: {e}" + + def _extract_node_types(self, data: Dict[str, Any], node_types: set): + """递归提取所有节点类型""" + if not isinstance(data, dict): + return + + for key, value in data.items(): + # 所有键都可能是节点类型 + node_types.add(key) + + if isinstance(value, dict): + self._extract_node_types(value, node_types) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + self._extract_node_types(item, node_types) + + def _get_relevant_knowledge(self, code: str) -> tuple[str, str]: + """根据代码获取相关知识库内容和节点定义""" + parameters = self._extract_parameters_from_code(code) + + knowledge_base = "" + node_definitions = "" + + for param in parameters: + # 处理【】格式的路径,提取最后一个部分 + if "【" in param: + path_parts = [] + parts = param.split("【") + for part in parts: + if "】" in part: + clean_part = part.split("】")[0].strip() + if clean_part: + path_parts.append(clean_part) + + # 对于每个路径,只取最后一个部分 + for path in path_parts: + if "/" in path: + # 提取路径中的最后一个部分 + last_part = path.split("/")[-1].strip() + if last_part: + found_data = self._search_in_kg(last_part) + if found_data: + knowledge_base += f"节点 '{last_part}' 相关信息:\n" + knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n" + + # 提取节点类型 + node_types = set() + self._extract_node_types(found_data, node_types) + + for node_type in node_types: + definition = self._get_node_definition(node_type) + if definition and "未找到" not in definition and "出错" not in definition: + node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n" + else: + # 如果没有/,直接使用整个部分 + found_data = self._search_in_kg(path) + if found_data: + knowledge_base += f"节点 '{path}' 相关信息:\n" + knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n" + + # 提取节点类型 + node_types = set() + self._extract_node_types(found_data, node_types) + + for node_type in node_types: + definition = self._get_node_definition(node_type) + if definition and "未找到" not in definition and "出错" not in definition: + node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n" + else: + # 处理普通参数 + found_data = self._search_in_kg(param) + if found_data: + knowledge_base += f"参数 '{param}' 相关信息:\n" + knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n" + + # 提取节点类型 + node_types = set() + self._extract_node_types(found_data, node_types) + + for node_type in node_types: + definition = self._get_node_definition(node_type) + if definition and "未找到" not in definition and "出错" not in definition: + node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n" + + return knowledge_base, node_definitions + + def process_query( + self, original_query: str, original_code: str, error_info: Dict[str, str] = None + ) -> Dict[str, str]: + """ + 处理查询并尝试修复错误 + + Args: + original_query: 原始查询字符串 + original_code: 原始代码字符串 + error_info: 错误信息字典 (可选) + + Returns: + Dict: 包含修改后的query和code的字典 + """ + print("\n====== RAG重写查询开始 ======") + print(f"原始查询: {original_query}") + print(f"原始代码: {original_code}") + print(f"错误信息: {error_info}") + + # 初始化工具 + tools = [ + Tool( + name="search_knowledge_base", + func=lambda x: json.dumps(self._search_in_kg(x), ensure_ascii=False), + description="用于在知识图谱中搜索节点信息", + ), + Tool( + name="get_node_definition", + func=lambda x: self._get_node_definition(x), + description="获取指定类型的节点定义", + ), + ] + + # 使用基础 LLM + llm = base_llm + + # 初始化 Agent + agent_executor = initialize_agent( + tools, + llm, + agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, + verbose=True, + handle_parsing_errors=True, + prompt=self.prompt, + output_parser=CodeOnlyOutputParser(), + ) + + # 构造输入变量 + knowledge_base, node_definitions = self._get_relevant_knowledge(original_code) + + print(f"知识库内容: {knowledge_base}") + print(f"节点定义: {node_definitions}") + + input_vars = { + "input": original_query, + "original_query": original_query, + "error_info": json.dumps(error_info, ensure_ascii=False) if error_info else "", + "original_code": original_code, + "KnowledgeBase": knowledge_base, + "NodeDefinition": node_definitions, + } + + print(f"Agent输入变量: {input_vars}") + + # 调用 Agent 执行 + response = agent_executor.invoke(input_vars) + + print(f"Agent响应: {response}") + + # 处理不同的返回格式 + if isinstance(response, dict): + if "output" in response: + if isinstance(response["output"], dict) and "code" in response["output"]: + return {"query": original_query, "code": response["output"]["code"]} + elif isinstance(response["output"], str): + return {"query": original_query, "code": response["output"]} + + # 如果无法解析,返回原始代码 + print(f"警告: 无法解析 Agent 响应,返回原始代码") + return {"query": original_query, "code": original_code} + + +# 全局处理器实例(延迟初始化) +_processor = None + + +def _get_processor(): + """获取处理器实例(单例模式)""" + global _processor + if _processor is None: + _processor = KnowledgeGraphProcessor() + return _processor + + +def rewrite_query_parameters( + original_query: str, original_code: str, error_info: Dict[str, str] = None +) -> Dict[str, str]: + """ + 重写查询参数的外部接口函数 + + Args: + original_query: 原始查询字符串 + original_code: 原始代码字符串 + error_info: 错误信息字典 (可选) + + Returns: + Dict: 包含修改后的query和code的字典 + + Example: + result = rewrite_query_parameters("查询动态费用", 'search_node("动态费用")') + print(result["query"]) # 修改后的查询 + print(result["code"]) # 修改后的代码 + """ + processor = _get_processor() + return processor.process_query(original_query, original_code, error_info) diff --git a/kg_lab_6.13/project.py b/kg_lab_6.13/project.py new file mode 100644 index 0000000..56d0177 --- /dev/null +++ b/kg_lab_6.13/project.py @@ -0,0 +1,536 @@ +""" +软件知识图谱类定义 +根据Ontology_Layer.txt文件中的知识图谱信息创建 +""" + +from abc import ABC, abstractmethod +import json + + +class ProjectTookiIt(ABC): + """ + 项目类(抽象基类) + 描述: 代表整个项目结构的顶层容器 + """ + + def __init__(self): + self.project_division_set = ProjectDivisionItem() # 项目划分集对象 + + # 项目划分查询方法 + @abstractmethod + def get_division_item_by_path(self, path): + """ + 通过路径获取项目划分对象 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的项目划分对象数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 + + Note: + 当路径为空字符串时,会返回None + """ + pass + + @abstractmethod + def get_division_node_by_parent_and_name(self, parent_path, partial_name): + """ + 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + pass + + # 工程量查询方法 + @abstractmethod + def get_quantities_by_paths(self, paths_str): + """ + 获取指定项目路径下的工程量对象 + + Args: + paths_str (str): 以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: + - status: 'success'或'error' + - data: 成功时返回的节点数据字典,失败时为{} + - error: 错误信息,成功时为'' + - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) + """ + pass + + @abstractmethod + def get_quantities_node_by_parent_and_code(self, parent_path, quantity_type=None, code=None): + """ + 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + code (str): 工程量编码,以'/'分隔的多个编码 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + pass + + @abstractmethod + def get_quantities_node_by_parent_and_name(self, parent_path, partial_name, quantity_type=None): + """ + 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + pass + + # 材机查询方法 + @abstractmethod + def get_material_equipment_by_path(self, paths_str): + """ + 通过路径获取材机对象(MaterialOrEquipment 类型节点) + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 + """ + pass + + @abstractmethod + def get_material_equipment_by_parent_and_name(self, parent_path, partial_name): + """ + 通过父节点路径和模糊名称获取材机对象 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 + """ + pass + + # 取费查询方法 + @abstractmethod + def get_fee_template_by_path(self, paths_str): + """ + 通过路径获取取费节点 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + """ + pass + + @abstractmethod + def get_fee_template_by_parent_and_name(self, parent_path, partial_name): + """ + 通过父节点路径和节点名称获取取费节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + node_name (str): 目标节点的名称,例如:直接费 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到父节点,则包含父节点的父节点下所有节点名称 + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + - 否则为空 + """ + pass + + # 费用表查询方法 + @abstractmethod + def get_fee_schedule_on_auxiliary_expense_table(self, table_name, fee_name, fee: str): + """ + 在辅助费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + pass + + @abstractmethod + def get_fee_schedule_on_other_expense_table(self, table_name, fee_name, fee): + """ + 在其它费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + pass + + @abstractmethod + def get_fee_schedule_on_land_acquisition_fee_table_table(self, table_name, fee_name, fee): + """ + 在其中:土地征用费表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + pass + + @abstractmethod + def get_fee_schedule_on_installation_price_difference_table(self, table_name, fee_name, fee): + """ + 在安装价差表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + pass + + @abstractmethod + def get_fee_schedule_on_Engineering_Cost_table(self, table_name, fee_name, fee): + """ + 在工程费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + pass + + +class ProjectDivisionItem: + """ + 项目划分项 + 描述: 代表项目结构中的层级条目,具有自身的属性,并且可以包含子项目划分项或详细工作项 + JSON对应: ProjectDivisionSet数组中的对象,或ProjectDivisionItem的children数组中type为"项目划分"的对象 + """ + + def __init__(self): + self.GUID = None # xsd:string + self.id = None # xsd:string + self.name = None # xsd:string + self.代码 = None # xsd:string (可选) + self.费率 = None # xsd:string + self.单位 = None # xsd:string (可选) + self.取费表id = None # xsd:string + self.颜色标记 = None # xsd:string + self.取费表 = None # xsd:string + self.合价含税 = None # xsd:string (可选) + self.type = None # xsd:string + self.专业类型 = None # xsd:string + self.资源库列表 = None # xsd:list (可选) + self.notCheck = None # xsd:string (可选) + + +class ProjectQuantity: + """ + 工程量 + 描述: 代表项目划分项(ProjectDivisionItem)下的具体工作单元或物料项,是定额、主材、设备的父类型 + """ + + def __init__(self): + self.id = None # xsd:string + self.类型 = None # xsd:string ("0"为定额,"1"为主材,"5"为设备) + self.name = None # xsd:string + self.编码 = None # xsd:string + self.单位 = None # xsd:string + self.数量 = None # xsd:string + self.资源库名称 = None # xsd:string + self.投标数量 = None # xsd:string (可选) + self.投标单价 = None # xsd:string (可选) + self.特征段 = None # xsd:string (可选) + self.关联父级量 = None # xsd:string (可选) + self.颜色标记 = None # xsd:string (可选) + self.单价不含税 = None + self.cost_set = None # xsd:CostSet + + +class Ration(ProjectQuantity): + """ + 定额 + 描述: 代表一种标准的工程量条目,通常包含详细的工、料、机消耗标准 + """ + + def __init__(self): + super().__init__() + self.计算式 = None # xsd:string + self.中标计算式 = None # xsd:string + self.人工费 = None # xsd:string + self.机械费 = None # xsd:string + self.甲供材料费不含税 = None # xsd:string + self.材料费 = None # xsd:string + self.定额系数 = None # xsd:string + self.人工系数 = None # xsd:string + self.材料系数 = None # xsd:string + self.机械系数 = None # xsd:string + self.定额范围 = None # xsd:string + self.定额章节名称 = None # xsd:string + self.费用类型 = None # xsd:string + self.甲供材料费含税 = None # xsd:string + self.投标合价 = None # xsd:string + self.其中甲供材料费 = None # xsd:string + self.合价不含税 = None # xsd:string + self.基价 = None # xsd:string + self.所属定额库 = None # xsd:string + + def __str__(self): + """返回定额的字符串表示""" + attrs = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + return json.dumps(attrs, ensure_ascii=False, indent=2) + + +class Material(ProjectQuantity): + """ + 主材 + 描述: 代表工程中使用的主要材料 + """ + + def __init__(self): + super().__init__() + self.规格型号 = None # xsd:string + self.损耗率 = None # xsd:string + self.供货方 = None # xsd:string + self.集中配送 = None # xsd:string (可选) + self.单重 = None # xsd:string (可选) + self.市场价不含税 = None # xsd:string + self.市场价含税 = None # xsd:string + self.单价含税 = None # xsd:string + self.结算市场价不含税 = None # xsd:string (可选) + self.结算市场价含税 = None # xsd:string (可选) + self.基准价不含税 = None # xsd:string (可选) + self.基准价含税 = None # xsd:string (可选) + self.费用类型 = None # xsd:string + self.增值税率 = None # xsd:string (可选) + self.合价含税 = None # xsd:string + self.合价不含税 = None # xsd:string + self.线重 = None # xsd:string (可选) + self.制造长度 = None # xsd:string (可选) + self.截面积 = None # xsd:string (可选) + + def __str__(self): + """返回材料的字符串表示""" + attrs = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + return json.dumps(attrs, ensure_ascii=False, indent=2) + + +class Equipment(ProjectQuantity): + """ + 设备 + 描述: 代表工程中安装或使用的设备 + """ + + def __init__(self): + super().__init__() + self.规格型号 = None # xsd:string + self.供货方 = None # xsd:string + self.运杂费率 = None # xsd:string (可选) + self.单价含税 = None # xsd:string + self.设备类型 = None # xsd:string (可选) + self.增值税率 = None # xsd:string (可选) + self.合价含税 = None # xsd:string + self.合价不含税 = None # xsd:string + + +class MaterialOrEquipment: + """ + 材机 + 描述: 代表DetailedWorkItem中所列出的具体材料、人工或机械设备及其详细信息 + """ + + def __init__(self): + self.id = None # xsd:string + self.编码 = None # xsd:string + self.名称 = None # xsd:string + self.单位 = None # xsd:string + self.类型 = None # xsd:string + self.供货方 = None # xsd:string + self.预算价不含税 = None # xsd:string + self.市场价不含税 = None # xsd:string + self.预算价含税 = None # xsd:string + self.市场价含税 = None # xsd:string + self.结算预算价不含税 = None # xsd:string + self.结算市场价不含税 = None # xsd:string + self.结算预算价含税 = None # xsd:string + self.结算市场价含税 = None # xsd:string + self.暂估价 = None # xsd:string + self.拆分 = None # xsd:string + self.全口径市场价不含税 = None # xsd:string + self.全口径市场价含税 = None # xsd:string + self.商品砼 = None # xsd:string + self.数量 = None # xsd:string + self.是否未计价 = None # xsd:string + + +class FeeTableTemplateItem: + """ + 取费表模板项 + 描述: + """ + + def __init__(self): + self.name = None # xsd:string + self.OutlayID = None # xsd:string + self.type = None # xsd:string + self.profession = None # xsd:string + + +class FeeCollection: + """ + 取费 + 描述: + """ + + def __init__(self): + self.name = None # xsd:string + self.serialNumber = None # xsd:string + self.code = None # xsd:string + self.base = None # xsd:string + self.rate = None # xsd:string + + +class FeeScheduleItem: + """ + 费用表项 + 描述: + """ + + def __init__(self): + self.name = None # xsd:string + + +class Fee: + """ + 取费 + 描述: + """ + + def __init__(self): + self.name = None # xsd:string + self.serialNumber = None # xsd:string + self.code = None # xsd:string + self.rate = None # xsd:string + self.amount = None # xsd:string + self.输出 = None # xsd:string (可选) + self.取费基数 = None # xsd:string (可选) + self.编制依据 = None # xsd:string (可选) + self.编码 = None # xsd:string (可选) + self.备注 = None # xsd:string (可选) + self.表一显示 = None # xsd:string (可选) + self.报表输出 = None # xsd:string (可选) + self.建安合计费 = None # xsd:string (可选) + self.合计费 = None # xsd:string (可选) + self.占总计 = None # xsd:string (可选) + self.建筑费 = None # xsd:string (可选) + self.安装费 = None # xsd:string (可选) + self.设备费 = None # xsd:string (可选) + self.其他费 = None # xsd:string (可选) + self.施工费 = None # xsd:string (可选) + self.单位投资 = None # xsd:string (可选) diff --git a/kg_lab_6.13/project_implementation.py b/kg_lab_6.13/project_implementation.py new file mode 100644 index 0000000..5c64a05 --- /dev/null +++ b/kg_lab_6.13/project_implementation.py @@ -0,0 +1,2142 @@ +from neo4j import GraphDatabase +from project import * +import atexit +from typing import Tuple, List, Dict, Any, Optional + + +class ProjectTookiItNeo4j(ProjectTookiIt): + """ + 基于Neo4j数据库的项目类实现 + """ + + def __init__(self): + """ + 初始化Neo4j连接 + + Args: + uri (str): Neo4j数据库URI + user (str): 用户名 + password (str): 密码 + """ + uri = "bolt://172.20.0.145:7687" + user = "neo4j" + password = "password" + + super().__init__() + self.driver = GraphDatabase.driver(uri, auth=(user, password)) + self.session = self.driver.session() + + # 初始化其他必要的数据结构 + self.material_equipment_dict = {} # 材机字典,键为ID + self.fee_templates = {} # 取费表模板字典,键为ID + self.fee_schedules = {} # 费用表字典,键为ID + self.project_properties = {} # 工程属性字典 + + def close(self): + """ + 关闭数据库连接 + """ + if self.session: + self.session.close() + if self.driver: + self.driver.close() + + # 通用节点查询方法 + def get_node_by_path(self, path, node_labels=None): + """ + 通过路径获取节点对象 + + Args: + path (str): 以'/'分隔的多级节点路径 + node_labels (list): 节点标签列表,用于过滤结果 + + Returns: + dict|None: 节点数据,如果路径不存在返回None + """ + if not path: + return None + + # 分割路径为各个部分 + path_parts = path.split("/") + + # 构建查询 + if len(path_parts) == 1: + # 只有一级路径,直接查询 + if node_labels: + labels_str = ":" + "|:".join(node_labels) + query = f""" + MATCH (n{labels_str}) + WHERE n.name = $name + RETURN n LIMIT 1 + """ + else: + query = """ + MATCH (n) + WHERE n.name = $name + RETURN n LIMIT 1 + """ + params = {"name": path_parts[0]} + else: + # 多级路径,构建路径查询 + last_part = path_parts[-1] + if node_labels: + labels_str = ":" + "|".join(node_labels) + query = f""" + MATCH path = (root)-[*]->(target{labels_str}) + WHERE target.name = $last_part + RETURN target as n LIMIT 1 + """ + else: + query = """ + MATCH path = (root)-[*]->(target) + WHERE target.name = $last_part + RETURN target as n LIMIT 1 + """ + params = {"last_part": last_part} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + return None + + return record["n"] + except Exception as e: + print(f"获取节点对象时出错: {e}") + return None + + # 项目划分查询方法 + def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取项目划分对象 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的项目划分对象数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取节点数据 + node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的ProjectDivisionItem节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + # 去掉最后一个部分,得到父路径 + parent_path = "/".join(path_parts[:-1]) + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + + if parent_node: + # 查询父节点下所有类型为ProjectDivisionItem的子节点 + query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.name = $parent_name + RETURN q.name as name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 创建项目划分对象并填充属性 + item = ProjectDivisionItem() + for key, value in node_data.items(): + if hasattr(item, key): + setattr(item, key, value) + + # 将对象的属性转换为字典 + for key, value in vars(item).items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_division_node_by_parent_and_name( + self, parent_path, partial_name + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 + + 执行两步查询: + 1. 找到对应路径的父节点 + 2. 在父节点下查找名称中包含模糊名称的节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + # 检查部分名称是否为空 + if not partial_name: + status = "error" + error = "节点名称不能为空" + return status, data, error, helper_info + + try: + # 第一步:找到父节点 + parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) + + # 如果找不到父节点 + if not parent_node_data: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 提取父路径的父节点 + path_parts = parent_path.split("/") + if len(path_parts) > 1: + # 去掉最后一个部分,得到父路径的父路径 + grandparent_path = "/".join(path_parts[:-1]) + grandparent_name = path_parts[-2] if len(path_parts) >= 2 else "" + + # 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型 + query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.name = $grandparent_name + RETURN q.name as name + """ + params = {"grandparent_name": grandparent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果没有找到任何节点,尝试更宽泛的查询 + if not helper_info: + broader_query = """ + MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem) + RETURN q.name as name LIMIT 50 + """ + broader_result = self.session.run(broader_query, **params) + for record in broader_result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + return status, data, error, helper_info + + # 获取父节点名称(确保一定有name字段) + parent_name = parent_node_data.get("name") + if not parent_name: + status = "error" + error = "父节点数据中没有name字段" + return status, data, error, helper_info + + parent_name = parent_node_data.get("name", "") + + # 第二步:改用父节点名称进行查询(不再依赖id) + query = """ + MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem) + WHERE toLower(n.name) CONTAINS toLower($partial_name) + RETURN n LIMIT 50 + """ + params = {"parent_name": parent_name, "partial_name": partial_name.strip()} + + # print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'") + result = self.session.run(query, **params) + records = list(result) + # print(f"调试返回: 找到{len(records)}条记录") + + # 处理查询结果 + items = [] + for record in records: + node_data = record["n"] + item = ProjectDivisionItem() + for key, value in node_data.items(): + if hasattr(item, key): + setattr(item, key, value) + items.append(vars(item)) + + # 如果没有找到匹配的节点 + if not items: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点" + + # 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型 + helper_query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.id = $parent_id + RETURN q.name as name + """ + helper_params = {"parent_name": parent_name} + + try: + helper_result = self.session.run(helper_query, **helper_params) + for record in helper_result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果没有找到任何节点,尝试更宽泛的查询 + if not helper_info: + broader_query = """ + MATCH (p)-[*1..2]->(q:ProjectDivisionItem) + WHERE p.id = $parent_id + RETURN q.name as name LIMIT 50 + """ + broader_result = self.session.run(broader_query, **helper_params) + for record in broader_result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果仍然没有找到,尝试使用名称而不是ID + if not helper_info: + name_query = """ + MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem) + RETURN q.name as name LIMIT 50 + """ + name_params = {"parent_name": parent_name} + name_result = self.session.run(name_query, **name_params) + for record in name_result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功找到匹配节点 + data = items + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 工程量查询方法 + def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 获取指定项目路径下的工程量对象 + + Args: + paths_str (str): 以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: + - status: 'success'或'error' + - data: 成功时返回的节点数据字典,失败时为{} + - error: 错误信息,成功时为'' + - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + if not paths_str: + status = "error" + error = "路径不能为空" + return status, data, error, helper_info + + try: + # 使用通用方法获取节点,考虑所有可能的工程量类型 + node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) + + if node_data: + # 根据节点标签或类型属性创建对应类型的对象 + quantity = self._create_quantity_object(node_data) + + # 填充属性 + for key, value in node_data.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + # 将对象转为字典 + data = vars(quantity) + return status, data, error, helper_info + + # 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息 + path_parts = paths_str.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) + target_name = path_parts[-1] + + # 获取父节点 + parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) + + if parent_node_data: + # 查询父节点下的所有符合条件的节点 + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = $parent_name AND + any(label IN labels(n) WHERE label IN $allowed_labels) + RETURN n.name as name, labels(n) as labels + """ + params = { + "parent_name": parent_node_data.get("name", ""), + "allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"], + } + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append({record["name"]}) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + status = "error" + error = f"找不到路径: {paths_str}" + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_quantities_node_by_parent_and_code( + self, parent_path, quantity_type=None, code=None + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 + + 执行三步查询: + 1. 找到对应路径的父节点 + 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 + 3. 在找到的节点中查找编码与传入编码匹配的节点并返回 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + code (str): 工程量编码,以'/'分隔的多个编码 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + if not code: + status = "error" + error = "编码不能为空" + return status, data, error, helper_info + + # 验证类型参数是否有效 + valid_types = ["定额", "主材", "设备", None] + if quantity_type not in valid_types: + status = "error" + error = f"无效的工程量类型: '{quantity_type}'" + helper_info = valid_types + return status, data, error, helper_info + + try: + # 步骤1: 找到对应路径的父节点 + parent_node = self.get_node_by_path(parent_path) + if not parent_node: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 尝试提供辅助信息 - 路径的各部分是否存在 + path_parts = parent_path.split("/") + existing_parts = [] + for i in range(len(path_parts)): + partial_path = "/".join(path_parts[: i + 1]) + if self.get_node_by_path(partial_path): + existing_parts.append(partial_path) + helper_info = existing_parts + return status, data, error, helper_info + + # 提取父节点名称用于后续查询 + parent_name = parent_node.get("name", "") + + # 根据工程量类型确定类型条件 + type_condition = "" + + if quantity_type == "定额": + type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" + elif quantity_type == "主材": + type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" + elif quantity_type == "设备": + type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" + else: + type_condition = "q:ProjectQuantity" + + # 步骤2: 查找父节点下所有的指定类型的子节点 + child_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name AND {type_condition} + RETURN q + """ + child_params = {"parent_name": parent_name} + + child_result = self.session.run(child_query, **child_params) + child_nodes = [record["q"] for record in child_result] + + if not child_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" + + # 尝试提供辅助信息 - 父节点下有哪些节点名称 + helper_query = """ + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name + RETURN DISTINCT q.name as node_name + LIMIT 20 + """ + helper_result = self.session.run(helper_query, **child_params) + helper_info = [record["node_name"] for record in helper_result if record["node_name"]] + return status, data, error, helper_info + + # 步骤3: 查找编码匹配的节点 + code_conditions = [] + code_parts = code.split("/") + + for code_part in code_parts: + if code_part.strip(): + code_conditions.append(code_part.strip()) + + if not code_conditions: + status = "error" + error = "提供的编码为空" + return status, data, error, helper_info + + # 在 child_nodes 中直接查找编码匹配的节点 + matching_nodes = [] + + for node in child_nodes: + node_code = node.get("编码", "") + if any(code_part in str(node_code) for code_part in code_conditions): + matching_nodes.append(node) + + if not matching_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点" + + # 尝试提供辅助信息 - 所有指定类型的子节点的编码 + helper_info = [node.get("编码") for node in child_nodes if node.get("编码")] + return status, data, error, helper_info + + # 处理匹配的节点,转换为字典格式 + result_data = [] + for node in matching_nodes: + quantity = self._create_quantity_object(node, quantity_type) + + for key, value in node.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + attrs = {} + for key, value in vars(quantity).items(): + if not key.startswith("_"): # 排除私有属性 + attrs[key] = value + + result_data.append(attrs) + + data = result_data + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_quantities_node_by_parent_and_name( + self, parent_path, partial_name, quantity_type=None + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 + + 执行三步查询: + 1. 找到对应路径的父节点 + 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 + 3. 在找到的节点中找到名称中包含节点模糊名称的节点返回 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + if not partial_name: + status = "error" + error = "节点名称不能为空" + return status, data, error, helper_info + + # 验证类型参数是否有效 + valid_types = ["定额", "主材", "设备", None] + if quantity_type not in valid_types: + status = "error" + error = f"无效的工程量类型: '{quantity_type}'" + helper_info = valid_types + return status, data, error, helper_info + + try: + # 步骤1: 找到对应路径的父节点 + parent_node = self.get_node_by_path(parent_path) + if not parent_node: + status = "error" + error = f"找不到父节点路径: {parent_path}" + # 尝试提供辅助信息 - 路径的各部分是否存在 + path_parts = parent_path.split("/") + existing_parts = [] + for i in range(len(path_parts)): + partial_path = "/".join(path_parts[: i + 1]) + if self.get_node_by_path(partial_path): + existing_parts.append(partial_path) + helper_info = existing_parts + return status, data, error, helper_info + + # 从父节点中提取名称,用于后续查询 + parent_name = parent_node.get("name", "") + + # 根据工程量类型确定类型条件,不使用标签过滤 + type_condition = "" + + if quantity_type == "定额": + # 定额类型可能是 ProjectQuantity+Quota 或者 类型='0' + type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" + elif quantity_type == "主材": + # 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1' + type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" + elif quantity_type == "设备": + # 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5' + type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" + else: + # 如果没有指定类型,则查询所有ProjectQuantity节点 + type_condition = "q:ProjectQuantity" + + # 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点 + # 使用父节点名称进行查询,不指定具体的关系类型 + child_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name AND {type_condition} + RETURN q + """ + child_params = {"parent_name": parent_name} + + child_result = self.session.run(child_query, **child_params) + child_nodes = [record["q"] for record in child_result] + + if not child_nodes: + # 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询 + path_parts = parent_path.split("/") + last_part = path_parts[-1] if path_parts else "" + + fallback_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name CONTAINS $last_part AND {type_condition} + RETURN q LIMIT 100 + """ + fallback_params = {"last_part": last_part} + + fallback_result = self.session.run(fallback_query, **fallback_params) + child_nodes = [record["q"] for record in fallback_result] + + if not child_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" + + # 尝试提供辅助信息 - 父节点下有哪些类型的节点 + helper_query = """ + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name + RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type + LIMIT 20 + """ + helper_params = {"parent_name": parent_name} + + helper_result = self.session.run(helper_query, **helper_params) + helper_data = [] + for record in helper_result: + labels = record["node_labels"] if record["node_labels"] else [] + node_type = record["node_type"] if record["node_type"] else "未知" + helper_data.append(f"标签: {labels}, 类型: {node_type}") + + helper_info = helper_data + return status, data, error, helper_info + + # 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点 + matching_nodes = [] + available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息 + + for node in child_nodes: + node_name = node.get("name", "") + # 记录所有节点名称作为辅助信息 + if node_name: + available_nodes.append(node_name) + + if partial_name in str(node_name): + matching_nodes.append(node) + + if not matching_nodes: + # 如果没有找到匹配的节点,尝试直接查询 + direct_query = f""" + MATCH (q) + WHERE q.name CONTAINS $partial_name AND {type_condition} + RETURN q LIMIT 20 + """ + direct_params = {"partial_name": partial_name} + + direct_result = self.session.run(direct_query, **direct_params) + matching_nodes = [record["q"] for record in direct_result] + + if not matching_nodes: + status = "error" + error = ( + f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点" + ) + # 提供辅助信息 - 该路径下所有可用的节点名称 + helper_info = available_nodes + return status, data, error, helper_info + + # 处理匹配的节点,将它们转换为对象 + result_data = [] + for matching_node in matching_nodes: + # 创建对应类型的对象 + quantity = self._create_quantity_object(matching_node, quantity_type) + + # 填充属性 + for key, value in matching_node.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + # 将对象的属性转换为字典 + attrs = {} + for key, value in vars(quantity).items(): + if not key.startswith("_"): # 排除私有属性 + attrs[key] = value + + result_data.append(attrs) + + # 成功找到匹配节点 + data = result_data + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于根据节点数据创建对应类型的工程量对象 + def _create_quantity_object(self, node_data, quantity_type=None): + """ + 根据节点数据创建对应类型的工程量对象 + + Args: + node_data (dict): 节点数据 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) + + Returns: + ProjectQuantity: 创建的工程量对象 + """ + # 如果指定了类型,直接创建对应类型的对象 + if quantity_type == "定额": + return Ration() + elif quantity_type == "主材": + return Material() + elif quantity_type == "设备": + return Equipment() + + # 如果没有指定类型,尝试通过节点属性或标签判断 + if "类型" in node_data: + if node_data["类型"] == "0": + return Ration() + elif node_data["类型"] == "1": + return Material() + elif node_data["类型"] == "5": + return Equipment() + + # 通过标签判断 + labels = list(node_data.labels) if hasattr(node_data, "labels") else [] + if "Quota" in labels: + return Ration() + elif "MainMaterial" in labels: + return Material() + elif "Equipment" in labels: + return Equipment() + + # 默认返回基类对象 + return ProjectQuantity() + + # 材机查询方法实现 + def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取材机对象(MaterialOrEquipment 类型节点) + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取目标节点 + node_data = self.get_node_by_path(path) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) # 父节点路径 + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + if parent_node: + # 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(m:MaterialOrEquipment) + WHERE p.name = $parent_name + RETURN m.name AS name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将节点属性转换为字典并过滤掉私有字段 + for key, value in node_data.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_material_equipment_by_parent_and_name( + self, parent_path: str, partial_name: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过父节点路径和模糊名称获取材机对象 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + """ + status = "success" + data = {} + error = "" + helper_info = [] + + try: + if not parent_path or not partial_name: + status = "error" + error = "父节点路径或要查找的名称不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点 + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 获取祖父节点路径 + path_parts = parent_path.split("/") + if len(path_parts) > 1: + grandparent_path = "/".join(path_parts[:-1]) + grandparent_name = path_parts[-2] + + # 查询祖父节点下的所有子节点名称作为 helper_info + query = """ + MATCH (p)-[*1..1]->(child) + WHERE p.name = $grandparent_name + RETURN child.name AS name + """ + params = {"grandparent_name": grandparent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询祖父节点下的子节点失败: {str(e)}"] + + return status, data, error, helper_info + + parent_id = parent_node_data.get("id") + if not parent_id: + status = "error" + error = f"父节点 {parent_path} 没有有效的 ID" + return status, data, error, helper_info + + # 第二步:查找父节点下名称包含 partial_name 的 MaterialOrEquipment 节点 + query = """ + MATCH (p)-[*1..1]->(m:MaterialOrEquipment) + WHERE p.id = $parent_id AND m.name CONTAINS $partial_name + RETURN m LIMIT 20 + """ + params = {"parent_id": parent_id, "partial_name": partial_name} + + result = self.session.run(query, **params) + + matched_nodes = [] + for record in result: + node = record["m"] + properties = dict(node.items()) + matched_nodes.append(properties) + + if not matched_nodes: + status = "error" + error = f"在路径 {parent_path} 下没有找到名称包含 '{partial_name}' 的 MaterialOrEquipment 节点" + + # 查询父节点下所有 MaterialOrEquipment 子节点名称作为 helper_info + query_helper = """ + MATCH (p)-[*1..1]->(m:MaterialOrEquipment) + WHERE p.id = $parent_id + RETURN m.name AS name + """ + try: + result_helper = self.session.run(query_helper, parent_id=parent_id) + for record in result_helper: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的 MaterialOrEquipment 子节点失败: {str(e)}"] + + return status, data, error, helper_info + + # 成功找到,返回第一个节点的数据 + data = matched_nodes[0] + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于创建材机对象并填充属性 + def _create_material_object(self, node_data): + """ + 根据节点数据创建材机对象并填充属性 + + Args: + node_data (dict): 节点数据 + + Returns: + MaterialOrEquipment: 创建的材机对象 + """ + material = MaterialOrEquipment() + + # 填充属性 + for key, value in node_data.items(): + if hasattr(material, key): + setattr(material, key, value) + + return material + + # 取费表模板查询方法实现 + def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取取费表模板节点 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取目标节点 + node_data = self.get_node_by_path(path) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) # 父节点路径 + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + if parent_node: + # 查询父节点下所有类型为 FeeCollection 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(f:FeeCollection) + WHERE p.name = $parent_name + RETURN f.name AS name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将节点属性转换为字典并过滤掉私有字段 + for key, value in node_data.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_template_by_parent_and_name( + self, parent_path: str, node_name: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过父节点路径和节点名称获取取费表模板节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理) + node_name (str): 目标节点的名称,例如:直接费 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到父节点,则包含父节点的父节点下所有节点名称 + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一段:查找父节点 + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 提取父节点的父路径 + parent_path_parts = parent_path.split("/") + if len(parent_path_parts) > 1: + grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径 + grandparent_name = parent_path_parts[-2] # 父节点的父节点名称 + + # 获取父节点的父节点 + grandparent_node = self.get_node_by_path(grandparent_path) + if grandparent_node: + # 查询父节点的父节点下所有子节点名称 + query = """ + MATCH (gp)-[*1..1]->(n) + WHERE gp.name = $grandparent_name + RETURN n.name AS name + """ + params = {"grandparent_name": grandparent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二段:在父节点下查找目标节点 + parent_node_name = parent_path.split("/")[-1] # 父节点名称 + + # 查询父节点下指定名称的节点 + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = $parent_name AND n.name = $node_name + RETURN n + """ + params = {"parent_name": parent_node_name, "node_name": node_name} + + try: + result = self.session.run(query, **params) + target_node = None + + for record in result: + target_node = record["n"] + break + + if not target_node: + status = "error" + error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点" + + # 查询父节点下所有类型为 FeeCollection 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(f:FeeCollection) + WHERE p.name = $parent_name + RETURN f.name AS name + """ + params = {"parent_name": parent_node_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将目标节点属性转换为字典并过滤掉私有字段 + for key, value in target_node.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询目标节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于创建取费表模板对象并填充属性 + def _create_fee_template_object(self, node_data): + """ + 根据节点数据创建取费表模板对象并填充属性 + + Args: + node_data (dict): 节点数据 + + Returns: + FeeTableTemplateItem: 创建的取费表模板对象 + """ + template = FeeTableTemplateItem() + + # 填充属性 + for key, value in node_data.items(): + if hasattr(template, key): + setattr(template, key, value) + + # 更新缓存 + if hasattr(template, "OutlayID") and template.OutlayID: + self.fee_templates[template.OutlayID] = template + + return template + + # 费用表查询方法实现 + def get_fee_schedule_on_auxiliary_expense_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在辅助费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_other_expense_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在其它费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_land_acquisition_fee_table_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在土地征用费表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_installation_price_difference_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在安装价差表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_Engineering_Cost_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在工程费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + +class ProjectBuilder: + """ + 项目构建器 + 描述: 用于构建项目对象的构建器 + """ + + _instance = None + + @staticmethod + def build(): + """ + 构建并返回项目实例 + + Returns: + ProjectTookiItNeo4j: 创建的项目实例 + """ + # 如果已经有实例,先关闭它 + if ProjectBuilder._instance is not None: + ProjectBuilder._instance.close() + + # 创建新实例 + + ProjectBuilder._instance = ProjectTookiItNeo4j() + + return ProjectBuilder._instance + + @staticmethod + def close(): + """ + 关闭当前项目实例的连接 + """ + if ProjectBuilder._instance is not None: + ProjectBuilder._instance.close() + ProjectBuilder._instance = None + + +# 注册退出处理函数,确保程序退出时自动关闭连接 +atexit.register(ProjectBuilder.close) + + +# project = ProjectBuilder.build() +# status, data, error, helper_info = project.get_quantities_node_by_parent_and_name( +# "工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立", +# "塔材", +# "主材", +# ) + +# print(status) +# print(data) +# print(error) +# print(helper_info) diff --git a/kg_lab_6.13/prompt_templates.py b/kg_lab_6.13/prompt_templates.py new file mode 100644 index 0000000..1392816 --- /dev/null +++ b/kg_lab_6.13/prompt_templates.py @@ -0,0 +1,76 @@ +from langchain.prompts import PromptTemplate + +FUNCTION_CALL_TEMPLATE = """ +你是一个专业的Python工程师。我会给你一个用户问题,你需要将其转换为对应的Python代码 + +可用工具: +{tools} + +工具名称: +{tool_names} + +# 工作流程 +1. 从用户问题中{query}提取关键信息(节点路径、节点类型、节点名称等) +2. 使用工具查询知识图谱结构以理解可用节点和节点属性 +3. 根据查询结果选择最匹配的{project_class_methods}中的方法 +4. 生成可直接执行的Python代码 + +# 代码模板(必须严格遵循) +def neo4j_find_function(): + project = ProjectBuilder.build() + status, data, error, helper_info = project.[SELECTED_METHOD]([PARAMETERS]) + return status, data, error, helper_info + +# 执行规则 +- 每次只能调用一个工具或生成最终代码 +- 参数必须从用户问题或知识图谱查询结果中提取 +- 必须确保生成的代码可以直接执行 +- 禁止修改代码模板结构 +- 禁止添加任何注释或解释 + +# 当前进度 +{agent_scratchpad} + +# 响应格式 +思考: 分析当前步骤需要做什么 +行动: 选择工具名称 +行动输入: 工具参数 +观察: 工具返回结果 + +...(重复直到准备好生成代码)... + +思考: 已收集足够信息,可以生成代码 +Final Answer: +def neo4j_find_function(): + project = ProjectBuilder.build() + status, data, error, helper_info = project.[SELECTED_METHOD]([PARAMETERS]) + return status, data, error, helper_info +""" + +FUNCTION_CALL_PROMPT = PromptTemplate.from_template(FUNCTION_CALL_TEMPLATE) + + +########################################################################################################################################################################### + +FUNCTION_RETURNS_LOOP_TEMPLATE = """ + +你是一个专业的Python工程师。我会给你一段错误python代码和错误信息,你需要帮我修复这段出错的代码 + +你的任务是: +1. 根据需要修改的代码{original_code}和代码的错误信息{error_info}来对代码和参数进行修改 +2. 如果错误信息中是代码的逻辑出现错误,那么就需要对代码本身整体结构进行修改 +3. 如果是代码中参数出现问题了,那么就需要结合错误信息中的帮助信息(helper_info)来对代码总的参数进行修改 +4. 修复后的代码应该完整,可以直接执行,并且能够返回查询结果 + +注意: +- 必须只输出最终的Python代码,不要添加任何解释、注释、推理过程或自然语言描述。 +- 不要以“以下是修正后的代码”、“修改如下”等语句开头。 +- 不要输出任何其他无关的内容。 +- 输出格式必须完全符合指定的函数模板。 +- 如果无法根据已有信息进行修改,请原样返回原始代码。 + +请输出你修补后的代码: +""" + + +FUNCTION_RETURNS_LOOP_PROMPT: PromptTemplate = PromptTemplate.from_template(FUNCTION_RETURNS_LOOP_TEMPLATE)