from neo4j import GraphDatabase from project import * import atexit from typing import Tuple, List, Dict, Any, Optional import logging from src.config import Config logger = logging.getLogger("project_implementation") config = Config() class ProjectToolkitNeo4j(ProjectToolkit): """ 基于Neo4j数据库的项目类实现 """ def __init__(self, neo4j_driver): logger.info("开始初始化Neo4j连接") """ 初始化Neo4j连接 Args: neo4j_driver: Neo4j驱动实例,必须提供 """ if neo4j_driver is None: raise ValueError("必须提供Neo4j驱动实例") super().__init__() # 保存驱动实例 self.driver = neo4j_driver self.external_driver = True # 标记为外部驱动,不在close时关闭 self.session = self.driver.session() # 初始化其他必要的数据结构 self.material_equipment_dict = {} # 材机字典,键为ID self.fee_templates = {} # 取费表模板字典,键为ID self.fee_schedules = {} # 费用表字典,键为ID self.project_properties = {} # 工程属性字典 logger.info("Neo4j连接初始化完成") def close(self): logger.info("开始关闭数据库连接") """ 关闭数据库连接 """ if self.session: self.session.close() # 仅在非外部驱动的情况下关闭driver if self.driver and not self.external_driver: self.driver.close() logger.info("数据库连接已关闭") def debug_path(self, path): logger.info(f"开始调试路径: {path}") path_parts = path.split("/") print(f"调试路径: {path}") # 检查每一级路径是否存在 for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) query = """ MATCH (n) WHERE n.name = $name RETURN n.name as name """ result = self.session.run(query, name=path_parts[i]) record = result.single() exists = "存在" if record else "不存在" print(f" 节点 '{path_parts[i]}' {exists}") logger.info(f"路径 {path} 调试完成") # 通用节点查询方法 def get_node_by_path(self, path, node_labels=None): logger.info(f"开始通过路径 {path} 获取节点对象") """ 通过路径获取节点对象 Args: path (str): 以'/'分隔的多级节点路径 node_labels (list): 节点标签列表,用于过滤结果 Returns: dict|None: 节点数据,如果路径不存在返回None """ if not path: logger.warning("输入路径为空,无法获取节点对象") return None # 分割路径为各个部分 path_parts = path.split("/") # 构建查询 if len(path_parts) == 1: # 只有一级路径,直接查询 if node_labels: labels_str = ":" + "|:".join(node_labels) query = f""" MATCH (n{labels_str}) WHERE n.name = $name RETURN n LIMIT 1 """ else: query = """ MATCH (n) WHERE n.name = $name RETURN n LIMIT 1 """ params = {"name": path_parts[0]} else: # 多级路径,构建路径查询 last_part = path_parts[-1] if node_labels: labels_str = ":" + "|".join(node_labels) query = f""" MATCH path = (root)-[*]->(target{labels_str}) WHERE target.name = $last_part RETURN target as n LIMIT 1 """ else: query = """ MATCH path = (root)-[*]->(target) WHERE target.name = $last_part RETURN target as n LIMIT 1 """ params = {"last_part": last_part} try: result = self.session.run(query, **params) record = result.single() if not record: logger.warning(f"路径 {path} 不存在,未找到节点对象") return None logger.info(f"成功通过路径 {path} 获取节点对象") return record["n"] except Exception as e: logger.error(f"获取节点对象时出错: {e}") print(f"获取节点对象时出错: {e}") return None # 项目划分查询方法 # 项目划分查询方法 def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过路径获取项目划分对象 Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的项目划分对象数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取节点数据 node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) if not node_data: status = "error" error = f"找不到路径: {path} 上的ProjectDivisionItem节点" # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: # 去掉最后一个部分,得到父路径 parent_path = "/".join(path_parts[:-1]) parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为ProjectDivisionItem的子节点 query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.name = $parent_name RETURN q.name as name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 创建项目划分对象并填充属性 item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) # 将对象的属性转换为字典 for key, value in vars(item).items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_division_node_by_parent_and_name( self, parent_path, partial_name ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 执行两步查询: 1. 找到对应路径的父节点 2. 在父节点下查找名称中包含模糊名称的节点 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 partial_name (str): 目标节点的模糊或不完整名称 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] # 检查部分名称是否为空 if not partial_name: status = "error" error = "节点名称不能为空" return status, data, error, helper_info try: # 第一步:找到父节点 parent_node_data = self.get_node_by_path(parent_path) # 如果找不到父节点 if not parent_node_data: status = "error" error = f"找不到父节点路径: {parent_path}" # 提取父路径的父节点 path_parts = parent_path.split("/") if len(path_parts) > 1: # 去掉最后一个部分,得到父路径的父路径 grandparent_path = "/".join(path_parts[:-1]) grandparent_name = path_parts[-2] if len(path_parts) >= 2 else "" # 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型 query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.name = $grandparent_name RETURN q.name as name """ params = {"grandparent_name": grandparent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) # 如果没有找到任何节点,尝试更宽泛的查询 if not helper_info: broader_query = """ MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem) RETURN q.name as name LIMIT 50 """ broader_result = self.session.run(broader_query, **params) for record in broader_result: if record["name"]: helper_info.append(record["name"]) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] return status, data, error, helper_info # 获取父节点名称 parent_name = parent_node_data.get("name") if not parent_name: status = "error" error = "父节点数据中没有name字段" return status, data, error, helper_info parent_name = parent_node_data.get("name", "") # 第二步:改用父节点名称进行查询 query = """ MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem) WHERE toLower(n.name) CONTAINS toLower($partial_name) RETURN n LIMIT 50 """ params = {"parent_name": parent_name, "partial_name": partial_name.strip()} # print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'") result = self.session.run(query, **params) records = list(result) # print(f"调试返回: 找到{len(records)}条记录") # 处理查询结果 items = [] for record in records: node_data = record["n"] item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) items.append(vars(item)) # 如果没有找到匹配的节点 if not items: status = "error" error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点" # 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型 helper_query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.id = $parent_id RETURN q.name as name """ helper_params = {"parent_name": parent_name} try: helper_result = self.session.run(helper_query, **helper_params) for record in helper_result: if record["name"]: helper_info.append(record["name"]) # 如果没有找到任何节点,尝试更宽泛的查询 if not helper_info: broader_query = """ MATCH (p)-[*1..2]->(q:ProjectDivisionItem) WHERE p.id = $parent_id RETURN q.name as name LIMIT 50 """ broader_result = self.session.run(broader_query, **helper_params) for record in broader_result: if record["name"]: helper_info.append(record["name"]) # 如果仍然没有找到,尝试使用名称而不是ID if not helper_info: name_query = """ MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem) RETURN q.name as name LIMIT 50 """ name_params = {"parent_name": parent_name} name_result = self.session.run(name_query, **name_params) for record in name_result: if record["name"]: helper_info.append(record["name"]) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] return status, data, error, helper_info # 成功找到匹配节点 data = items return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点 Args: name_part (str): 节点名称的部分内容 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] # 检查输入名称是否为空 if not name_part or name_part.strip() == "": status = "error" error = "输入的名称部分不能为空" return status, data, error, helper_info try: # 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点 query = """ MATCH (n:ProjectDivisionItem) WHERE toLower(n.name) CONTAINS toLower($name_part) RETURN n LIMIT 50 """ params = {"name_part": name_part.strip()} result = self.session.run(query, parameters=params) records = list(result) if not records: status = "error" error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点" # 提供一些可能相近的节点名称作为辅助信息 broader_query = """ MATCH (n:ProjectDivisionItem) RETURN n.name AS name LIMIT 20 """ broader_result = self.session.run(broader_query) for record in broader_result: if record["name"]: helper_info.append(record["name"]) return status, data, error, helper_info # 处理查询结果 items = [] for record in records: node_data = record["n"] item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) # 获取节点的完整路径作为附加信息 node_path_query = """ MATCH path = (root)-[*]->(target:ProjectDivisionItem) WHERE id(target) = $node_id WITH [node in nodes(path) | node.name] AS path_names RETURN path_names """ path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id}) path_record = path_result.single() node_dict = vars(item) if path_record: node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name]) items.append(node_dict) data = items return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 工程量查询方法 def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 获取指定项目路径下的工程量对象 Args: paths_str (str): 以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: - status: 'success'或'error' - data: 成功时返回的节点数据字典,失败时为{} - error: 错误信息,成功时为'' - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] if not paths_str: status = "error" error = "路径不能为空" return status, data, error, helper_info try: # 使用通用方法获取节点,考虑所有可能的工程量类型 node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) if node_data: # 根据节点标签或类型属性创建对应类型的对象 quantity = self._create_quantity_object(node_data) # 填充属性 for key, value in node_data.items(): if hasattr(quantity, key): setattr(quantity, key, value) # 将对象转为字典 data = vars(quantity) return status, data, error, helper_info # 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息 path_parts = paths_str.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) target_name = path_parts[-1] # 获取父节点 parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) if parent_node_data: # 查询父节点下的所有符合条件的节点 query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = $parent_name AND any(label IN labels(n) WHERE label IN $allowed_labels) RETURN n.name as name, labels(n) as labels """ params = { "parent_name": parent_node_data.get("name", ""), "allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"], } try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append({record["name"]}) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] status = "error" error = f"找不到路径: {paths_str}" return status, data, error, helper_info except Exception as e: status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_quantities_node_by_parent_and_code( self, parent_path, quantity_type=None, code=None ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 执行三步查询: 1. 找到对应路径的父节点 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 3. 在找到的节点中查找编码与传入编码匹配的节点并返回 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) code (str): 工程量编码,以'/'分隔的多个编码 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] if not code: status = "error" error = "编码不能为空" return status, data, error, helper_info # 验证类型参数是否有效 valid_types = ["定额", "主材", "设备", None] if quantity_type not in valid_types: status = "error" error = f"无效的工程量类型: '{quantity_type}'" helper_info = valid_types return status, data, error, helper_info try: # 步骤1: 找到对应路径的父节点 parent_node = self.get_node_by_path(parent_path) if not parent_node: status = "error" error = f"找不到父节点路径: {parent_path}" # 尝试提供辅助信息 - 路径的各部分是否存在 path_parts = parent_path.split("/") existing_parts = [] for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) if self.get_node_by_path(partial_path): existing_parts.append(partial_path) helper_info = existing_parts return status, data, error, helper_info # 提取父节点名称用于后续查询 parent_name = parent_node.get("name", "") # 根据工程量类型确定类型条件 type_condition = "" if quantity_type == "定额": type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" elif quantity_type == "主材": type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" elif quantity_type == "设备": type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" else: type_condition = "q:ProjectQuantity" # 步骤2: 查找父节点下所有的指定类型的子节点 child_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name AND {type_condition} RETURN q """ child_params = {"parent_name": parent_name} child_result = self.session.run(child_query, **child_params) child_nodes = [record["q"] for record in child_result] if not child_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" # 尝试提供辅助信息 - 父节点下有哪些节点名称 helper_query = """ MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name RETURN DISTINCT q.name as node_name LIMIT 20 """ helper_result = self.session.run(helper_query, **child_params) helper_info = [record["node_name"] for record in helper_result if record["node_name"]] return status, data, error, helper_info # 步骤3: 查找编码匹配的节点 code_conditions = [] code_parts = code.split("/") for code_part in code_parts: if code_part.strip(): code_conditions.append(code_part.strip()) if not code_conditions: status = "error" error = "提供的编码为空" return status, data, error, helper_info # 在 child_nodes 中直接查找编码匹配的节点 matching_nodes = [] for node in child_nodes: node_code = node.get("编码", "") if any(code_part in str(node_code) for code_part in code_conditions): matching_nodes.append(node) if not matching_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点" # 尝试提供辅助信息 - 所有指定类型的子节点的编码 helper_info = [node.get("编码") for node in child_nodes if node.get("编码")] return status, data, error, helper_info # 处理匹配的节点,转换为字典格式 result_data = [] for node in matching_nodes: quantity = self._create_quantity_object(node, quantity_type) for key, value in node.items(): if hasattr(quantity, key): setattr(quantity, key, value) attrs = {} for key, value in vars(quantity).items(): if not key.startswith("_"): # 排除私有属性 attrs[key] = value result_data.append(attrs) data = result_data return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_quantities_node_by_parent_and_name( self, parent_path, partial_name, quantity_type=None ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 执行三步查询: 1. 找到对应路径的父节点 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 3. 在找到的节点中找到名称中包含节点模糊名称的节点返回 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 partial_name (str): 目标节点的模糊或不完整名称 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] if not partial_name: status = "error" error = "节点名称不能为空" return status, data, error, helper_info # 验证类型参数是否有效 valid_types = ["定额", "主材", "设备", None] if quantity_type not in valid_types: status = "error" error = f"无效的工程量类型: '{quantity_type}'" helper_info = valid_types return status, data, error, helper_info try: # 步骤1: 找到对应路径的父节点 parent_node = self.get_node_by_path(parent_path) if not parent_node: status = "error" error = f"找不到父节点路径: {parent_path}" # 尝试提供辅助信息 - 路径的各部分是否存在 path_parts = parent_path.split("/") existing_parts = [] for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) if self.get_node_by_path(partial_path): existing_parts.append(partial_path) helper_info = existing_parts return status, data, error, helper_info # 从父节点中提取名称,用于后续查询 parent_name = parent_node.get("name", "") # 根据工程量类型确定类型条件,不使用标签过滤 type_condition = "" if quantity_type == "定额": # 定额类型可能是 ProjectQuantity+Quota 或者 类型='0' type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" elif quantity_type == "主材": # 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1' type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" elif quantity_type == "设备": # 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5' type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" else: # 如果没有指定类型,则查询所有ProjectQuantity节点 type_condition = "q:ProjectQuantity" # 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点 # 使用父节点名称进行查询,不指定具体的关系类型 child_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name AND {type_condition} RETURN q """ child_params = {"parent_name": parent_name} child_result = self.session.run(child_query, **child_params) child_nodes = [record["q"] for record in child_result] if not child_nodes: # 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询 path_parts = parent_path.split("/") last_part = path_parts[-1] if path_parts else "" fallback_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name CONTAINS $last_part AND {type_condition} RETURN q LIMIT 100 """ fallback_params = {"last_part": last_part} fallback_result = self.session.run(fallback_query, **fallback_params) child_nodes = [record["q"] for record in fallback_result] if not child_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" # 尝试提供辅助信息 - 父节点下有哪些类型的节点 helper_query = """ MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type LIMIT 20 """ helper_params = {"parent_name": parent_name} helper_result = self.session.run(helper_query, **helper_params) helper_data = [] for record in helper_result: labels = record["node_labels"] if record["node_labels"] else [] node_type = record["node_type"] if record["node_type"] else "未知" helper_data.append(f"标签: {labels}, 类型: {node_type}") helper_info = helper_data return status, data, error, helper_info # 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点 matching_nodes = [] available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息 for node in child_nodes: node_name = node.get("name", "") # 记录所有节点名称作为辅助信息 if node_name: available_nodes.append(node_name) if partial_name in str(node_name): matching_nodes.append(node) if not matching_nodes: # 如果没有找到匹配的节点,尝试直接查询 direct_query = f""" MATCH (q) WHERE q.name CONTAINS $partial_name AND {type_condition} RETURN q LIMIT 20 """ direct_params = {"partial_name": partial_name} direct_result = self.session.run(direct_query, **direct_params) matching_nodes = [record["q"] for record in direct_result] if not matching_nodes: status = "error" error = ( f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点" ) # 提供辅助信息 - 该路径下所有可用的节点名称 helper_info = available_nodes return status, data, error, helper_info # 处理匹配的节点,将它们转换为对象 result_data = [] for matching_node in matching_nodes: # 创建对应类型的对象 quantity = self._create_quantity_object(matching_node, quantity_type) # 填充属性 for key, value in matching_node.items(): if hasattr(quantity, key): setattr(quantity, key, value) # 将对象的属性转换为字典 attrs = {} for key, value in vars(quantity).items(): if not key.startswith("_"): # 排除私有属性 attrs[key] = value result_data.append(attrs) # 成功找到匹配节点 data = result_data return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于根据节点数据创建对应类型的工程量对象 def _create_quantity_object(self, node_data, quantity_type=None): """ 根据节点数据创建对应类型的工程量对象 Args: node_data (dict): 节点数据 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) Returns: ProjectQuantity: 创建的工程量对象 """ # 如果指定了类型,直接创建对应类型的对象 if quantity_type == "定额": return Ration() elif quantity_type == "主材": return Material() elif quantity_type == "设备": return Equipment() # 如果没有指定类型,尝试通过节点属性或标签判断 if "类型" in node_data: if node_data["类型"] == "0": return Ration() elif node_data["类型"] == "1": return Material() elif node_data["类型"] == "5": return Equipment() # 通过标签判断 labels = list(node_data.labels) if hasattr(node_data, "labels") else [] if "Quota" in labels: return Ration() elif "MainMaterial" in labels: return Material() elif "Equipment" in labels: return Equipment() # 默认返回基类对象 return ProjectQuantity() # 材机查询方法实现 def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过路径获取材机对象(MaterialOrEquipment 类型节点) Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取目标节点 node_data = self.get_node_by_path(path) if not node_data: status = "error" error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点" # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) # 父节点路径 parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称 query = """ MATCH (p)-[*1..1]->(m:MaterialOrEquipment) WHERE p.name = $parent_name RETURN m.name AS name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"] return status, data, error, helper_info # 将节点属性转换为字典并过滤掉私有字段 for key, value in node_data.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_material_equipment_by_parent_and_code(self, parent_path, code): """ 通过父节点路径和编码获取材机对象 Args: parent_path (str): 父节点的部分路径,以'/'分隔的多级节点路径 code (str): 目标节点的编码属性值 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点编码 """ status = "success" data = {} error = "" helper_info = [] try: if not parent_path or not code: status = "error" error = "父节点路径或要查找的编码不能为空" return status, data, error, helper_info # 获取父节点路径的最后一部分 path_parts = parent_path.split("/") last_part = path_parts[-1] # 构建Cypher查询,查找包含指定路径名称的节点,然后找到其子孙节点中编码匹配的MaterialOrEquipment节点 # 使用任意类型的关系 [*0..10],不指定关系类型 query = """ MATCH (p)-[*0..10]->(m:MaterialOrEquipment) WHERE p.name CONTAINS $parent_name AND m.编码 = $code RETURN m LIMIT 5 """ params = {"parent_name": last_part, "code": code} result = self.session.run(query, **params) matched_nodes = [] for record in result: node = record["m"] properties = dict(node.items()) matched_nodes.append(properties) if not matched_nodes: status = "error" error = f"在路径包含 '{parent_path}' 的节点下没有找到编码为 '{code}' 的 MaterialOrEquipment 节点" # 查询相似路径下的MaterialOrEquipment节点的编码作为辅助信息 helper_query = """ MATCH (p)-[*0..10]->(m:MaterialOrEquipment) WHERE p.name CONTAINS $parent_name RETURN m.编码 AS code, m.name AS name LIMIT 5 """ try: helper_result = self.session.run(helper_query, parent_name=last_part) for record in helper_result: if record["code"] and record["name"]: helper_info.append(f"{record['code']} - {record['name']}") except Exception as e: helper_info = [f"查询相似编码失败: {str(e)}"] return status, data, error, helper_info # 成功找到,返回第一个节点的数据 data = matched_nodes[0] return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于创建材机对象并填充属性 def _create_material_object(self, node_data): """ 根据节点数据创建材机对象并填充属性 Args: node_data (dict): 节点数据 Returns: MaterialOrEquipment: 创建的材机对象 """ material = MaterialOrEquipment() # 填充属性 for key, value in node_data.items(): if hasattr(material, key): setattr(material, key, value) return material # 取费表模板查询方法实现 def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过路径获取取费表模板节点 Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取目标节点 node_data = self.get_node_by_path(path) if not node_data: status = "error" error = f"找不到路径: {path} 上的节点" # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) # 父节点路径 parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为 FeeCollection 的子节点名称 query = """ MATCH (p)-[*1..1]->(f:FeeCollection) WHERE p.name = $parent_name RETURN f.name AS name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] return status, data, error, helper_info # 将节点属性转换为字典并过滤掉私有字段 for key, value in node_data.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_template_by_parent_and_name( self, parent_path: str, node_name: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过父节点路径和节点名称获取取费表模板节点 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理) node_name (str): 目标节点的名称,例如:直接费 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到父节点,则包含父节点的父节点下所有节点名称 - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一段:查找父节点 parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到父节点路径: {parent_path}" # 提取父节点的父路径 parent_path_parts = parent_path.split("/") if len(parent_path_parts) > 1: grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径 grandparent_name = parent_path_parts[-2] # 父节点的父节点名称 # 获取父节点的父节点 grandparent_node = self.get_node_by_path(grandparent_path) if grandparent_node: # 查询父节点的父节点下所有子节点名称 query = """ MATCH (gp)-[*1..1]->(n) WHERE gp.name = $grandparent_name RETURN n.name AS name """ params = {"grandparent_name": grandparent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二段:在父节点下查找目标节点 parent_node_name = parent_path.split("/")[-1] # 父节点名称 # 查询父节点下指定名称的节点 query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = $parent_name AND n.name = $node_name RETURN n """ params = {"parent_name": parent_node_name, "node_name": node_name} try: result = self.session.run(query, **params) target_node = None for record in result: target_node = record["n"] break if not target_node: status = "error" error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点" # 查询父节点下所有类型为 FeeCollection 的子节点名称 query = """ MATCH (p)-[*1..1]->(f:FeeCollection) WHERE p.name = $parent_name RETURN f.name AS name """ params = {"parent_name": parent_node_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] return status, data, error, helper_info # 将目标节点属性转换为字典并过滤掉私有字段 for key, value in target_node.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询目标节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于创建取费表模板对象并填充属性 def _create_fee_template_object(self, node_data): """ 根据节点数据创建取费表模板对象并填充属性 Args: node_data (dict): 节点数据 Returns: FeeTableTemplateItem: 创建的取费表模板对象 """ template = FeeTableTemplateItem() # 填充属性 for key, value in node_data.items(): if hasattr(template, key): setattr(template, key, value) # 更新缓存 if hasattr(template, "OutlayID") and template.OutlayID: self.fee_templates[template.OutlayID] = template return template # 费用表查询方法实现 def get_fee_schedule_on_auxiliary_expense_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在辅助费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_other_expense_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在其它费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_land_acquisition_fee_table_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在土地征用费表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_installation_price_difference_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在安装价差表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_Engineering_Cost_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在工程费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 工程属性查询方法 def get_project_property(self, property_name): """ 通过属性名称获取工程属性 Args: property_name (str): 属性名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ status = "success" data = {} error = "" helper_info = [] try: # 构建查询,查找父节点类型为ProjectPropertySet下类型为ProjectProperty的节点 query = """ MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty) WHERE property.name = $property_name RETURN property LIMIT 1 """ params = {"property_name": property_name} result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"找不到名称为 '{property_name}' 的工程属性节点" # 查询类似的节点名称作为辅助信息 similar_query = """ MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty) RETURN property.name AS name LIMIT 5 """ try: similar_result = self.session.run(similar_query) for similar_record in similar_result: if similar_record["name"]: helper_info.append(similar_record["name"]) except Exception as e: helper_info = [f"查询类似节点时出错: {str(e)}"] return status, data, error, helper_info # 获取节点属性 property_node = record["property"] # 将节点所有属性添加到返回数据中 for key, value in property_node.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询工程属性时出错: {str(e)}" return status, data, error, helper_info # 项目划分查找取费表 def get_fee_table_by_project_division( self, project_division_path, fee_name ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过项目划分路径和取费名称查找费用 Args: project_division_path(str): 项目划分节点的路径 fee_name (str): 取费名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一步:查找项目划分路径下的ProjectDivisionItem节点 division_node = self.get_node_by_path(project_division_path, ["ProjectDivisionItem"]) if not division_node: status = "error" error = f"找不到路径: {project_division_path} 上的ProjectDivisionItem节点" # 提取父路径 path_parts = project_division_path.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) parent_name = path_parts[-2] # 获取父节点下所有ProjectDivisionItem节点名称作为辅助信息 query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.name = $parent_name RETURN q.name as name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在项目划分节点的子节点中查找属性name为"费用预览集"且关系为USE的节点 query = """ MATCH (p:ProjectDivisionItem)-[r:USE]->(c:CostSet) WHERE id(p) = $division_node_id AND c.name = "费用预览集" RETURN c LIMIT 1 """ params = {"division_node_id": division_node.id} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在项目划分节点下找不到name为'费用预览集'且关系为USE的CostSet节点" # 查询项目划分节点下的所有子节点作为辅助信息 helper_query = """ MATCH (p:ProjectDivisionItem)-[r]->(c) WHERE id(p) = $division_node_id RETURN type(r) as relation_type, c.name as name, labels(c) as labels LIMIT 5 """ helper_params = {"division_node_id": division_node.id} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: relation_type = helper_record.get("relation_type", "未知关系") name = helper_record.get("name", "未知名称") labels = helper_record.get("labels", []) helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}") return status, data, error, helper_info cost_set_node = record["c"] except Exception as e: status = "error" error = f"查询CostSet节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE id(c) = $cost_set_id AND i.name = $fee_name RETURN i LIMIT 1 """ params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点" # 查询该CostSet下所有CostItem节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE id(c) = $cost_set_id RETURN i.name AS name LIMIT 5 """ helper_params = {"cost_set_id": cost_set_node.id} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_item_node = record["i"] # 将CostItem节点的所有属性添加到返回数据中 for key, value in cost_item_node.items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询CostItem节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 清单查找取费表 def get_fee_table_by_list( self, parent_path, list_code, list_unit, fee_name ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过清单名称和取费名称查找费用 Args: parent_path (str): 父级路径 list_code (str): 清单编号 list_unit (str): 清单单位 fee_name (str): 取费名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一步:查找父路径下类型为List的节点,且编码和单位匹配 query = """ MATCH (p)-[*0..10]->(l:List) WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $list_unit RETURN l LIMIT 1 """ params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "list_unit": list_unit} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {list_unit} 的List节点" # 查询路径下的List节点作为辅助信息 helper_query = """ MATCH (p)-[*0..10]->(l:List) WHERE p.name CONTAINS $parent_name RETURN l.编码 AS code, l.单位 AS unit, l.name AS name LIMIT 5 """ helper_params = {"parent_name": parent_path.split("/")[-1]} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: code = helper_record.get("code", "") unit = helper_record.get("unit", "") name = helper_record.get("name", "") helper_info.append(f"{code} - {unit} - {name}") return status, data, error, helper_info list_node = record["l"] except Exception as e: status = "error" error = f"查询List节点时出错: {str(e)}" return status, data, error, helper_info # 第二步:在list节点的子节点中查找属性name为"费用预览集"且关系为USE的节点 query = """ MATCH (l:List)-[r:USE]->(c:CostSet) WHERE id(l) = $list_node_id AND c.name = "费用预览集" RETURN c LIMIT 1 """ params = {"list_node_id": list_node.id} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在List节点下找不到name为'费用预览集'且关系为USE的CostSet节点" # 查询List节点下的所有子节点作为辅助信息 helper_query = """ MATCH (l:List)-[r]->(c) WHERE id(l) = $list_node_id RETURN type(r) as relation_type, c.name as name, labels(c) as labels LIMIT 5 """ helper_params = {"list_node_id": list_node.id} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: relation_type = helper_record.get("relation_type", "未知关系") name = helper_record.get("name", "未知名称") labels = helper_record.get("labels", []) helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}") return status, data, error, helper_info cost_set_node = record["c"] except Exception as e: status = "error" error = f"查询CostSet节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE id(c) = $cost_set_id AND i.name = $fee_name RETURN i LIMIT 1 """ params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点" # 查询该CostSet下所有CostItem节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE id(c) = $cost_set_id RETURN i.name AS name LIMIT 5 """ helper_params = {"cost_set_id": cost_set_node.id} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_item_node = record["i"] # 将CostItem节点的所有属性添加到返回数据中 for key, value in cost_item_node.items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询CostItem节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 定额节点查找合价 def get_fee_table_by_quoto_code( self, parent_path, quantity_type, code, fee_name ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过父级路径、工程量类型、定额编码和取费名称查找费用 Args: parent_path (str): 父级路径 quantity_type (str): 工程量类型 code (str): 定额编码 fee_name (str): 取费名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一步:查找父路径下类型为Quota的节点,且编码匹配 # 根据工程量类型构建节点类型条件 node_type_condition = "" if quantity_type == "定额": node_type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" else: status = "error" error = f"工程量类型必须为'定额'" return status, data, error, helper_info query = """ MATCH (p)-[*0..10]->(q) WHERE p.name CONTAINS $parent_name AND q.编码 = $code AND {type_condition} RETURN q LIMIT 1 """.format( type_condition=node_type_condition ) params = {"parent_name": parent_path.split("/")[-1], "code": code} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在路径 {parent_path} 下找不到编码为 {code} 的Quota节点" # 查询路径下的Quota节点作为辅助信息 helper_query = """ MATCH (p)-[*0..10]->(q) WHERE p.name CONTAINS $parent_name AND {type_condition} RETURN q.编码 AS code, q.name AS name LIMIT 5 """.format( type_condition=node_type_condition ) helper_params = {"parent_name": parent_path.split("/")[-1]} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: code_val = helper_record.get("code", "") name = helper_record.get("name", "") helper_info.append(f"{code_val} - {name}") return status, data, error, helper_info quota_node = record["q"] # 获取GUID guid = quota_node.get("GUID") if not guid: status = "error" error = f"Quota节点没有GUID属性" return status, data, error, helper_info except Exception as e: status = "error" error = f"查询Quota节点时出错: {str(e)}" return status, data, error, helper_info # 第二步:查找GUID与Quota的GUID匹配的CostSet节点 query = """ MATCH (c:CostSet) WHERE c.name = $guid RETURN c LIMIT 1 """ params = {"guid": guid} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"找不到与GUID: {guid} 匹配的CostSet节点" # 查找一些CostSet节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet) RETURN c.name AS name LIMIT 5 """ helper_result = self.session.run(helper_query) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_set_node = record["c"] except Exception as e: status = "error" error = f"查询CostSet节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE c.name = $guid AND i.name = $fee_name RETURN i LIMIT 1 """ params = {"guid": guid, "fee_name": fee_name} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点" # 查询该CostSet下所有CostItem节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE c.name = $guid RETURN i.name AS name LIMIT 5 """ helper_params = {"guid": guid} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_item_node = record["i"] # 将CostItem节点的所有属性添加到返回数据中 for key, value in cost_item_node.items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询CostItem节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_table_by_quantities_name( self, parent_path, quantity_type, quantity_name, fee_name ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过父级路径、工程量类型、工程量名称和取费名称查找费用 Args: parent_path (str): 父级路径 quantity_type (str): 工程量类型 quantity_name (str): 工程量名称 fee_name (str): 取费名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一步:根据工程量类型构建节点类型条件 node_type_condition = "" if quantity_type == "主材": node_type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" elif quantity_type == "设备": node_type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" else: status = "error" error = f"工程量类型必须为'主材'或'设备'" helper_info = ["主材", "设备"] return status, data, error, helper_info # 查找父路径下指定类型的节点,且名称匹配 query = """ MATCH (p)-[*0..10]->(q) WHERE p.name CONTAINS $parent_name AND q.name CONTAINS $quantity_name AND {type_condition} RETURN q LIMIT 1 """.format( type_condition=node_type_condition ) params = {"parent_name": parent_path.split("/")[-1], "quantity_name": quantity_name} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = ( f"在路径 {parent_path} 下找不到类型为 {quantity_type} 且名称包含 {quantity_name} 的工程量节点" ) # 查询路径下的工程量节点作为辅助信息 helper_query = """ MATCH (p)-[*0..10]->(q) WHERE p.name CONTAINS $parent_name AND {type_condition} RETURN q.name AS name LIMIT 5 """.format( type_condition=node_type_condition ) helper_params = {"parent_name": parent_path.split("/")[-1]} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: name = helper_record.get("name", "") helper_info.append(name) return status, data, error, helper_info quantity_node = record["q"] # 获取GUID guid = quantity_node.get("GUID") if not guid: status = "error" error = f"工程量节点没有GUID属性" return status, data, error, helper_info except Exception as e: status = "error" error = f"查询工程量节点时出错: {str(e)}" return status, data, error, helper_info # 第二步:查找GUID与工程量节点的GUID匹配的CostSet节点 query = """ MATCH (c:CostSet) WHERE c.name = $guid RETURN c LIMIT 1 """ params = {"guid": guid} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"找不到与GUID: {guid} 匹配的CostSet节点" # 查找一些CostSet节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet) RETURN c.name AS name LIMIT 5 """ helper_result = self.session.run(helper_query) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_set_node = record["c"] except Exception as e: status = "error" error = f"查询CostSet节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE c.name = $guid AND i.name = $fee_name RETURN i LIMIT 1 """ params = {"guid": guid, "fee_name": fee_name} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点" # 查询该CostSet下所有CostItem节点名称作为辅助信息 helper_query = """ MATCH (c:CostSet)-[*1..1]->(i:CostItem) WHERE c.name = $guid RETURN i.name AS name LIMIT 5 """ helper_params = {"guid": guid} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: if helper_record["name"]: helper_info.append(helper_record["name"]) return status, data, error, helper_info cost_item_node = record["i"] # 将CostItem节点的所有属性添加到返回数据中 for key, value in cost_item_node.items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询CostItem节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 材机节点查找市场价 def get_fee_by_material_equipment_code(self, parent_path, material_equipment_code, fee_name): """ 通过父级路径、材机编码和取费名称查找费用 Args: parent_path (str): 父级路径 material_equipment_code (str): 材机编码 fee_name (str): 取费名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ pass # 查找清单节点 def get_fee_table_by_list_name(self, parent_path, list_code, unit): """ 通过父级路径、清单编号和单位查找清单 Args: parent_path (str): 父级路径 list_code (str): 清单编号 unit (str): 单位 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 查找父路径下类型为List的节点,且编码和单位匹配 query = """ MATCH (p)-[*0..10]->(l:List) WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $unit RETURN l LIMIT 1 """ params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "unit": unit} try: result = self.session.run(query, **params) record = result.single() if not record: status = "error" error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {unit} 的List节点" # 查询路径下的List节点作为辅助信息 helper_query = """ MATCH (p)-[*0..10]->(l:List) WHERE p.name CONTAINS $parent_name RETURN l.编码 AS code, l.单位 AS unit, l.name AS name LIMIT 5 """ helper_params = {"parent_name": parent_path.split("/")[-1]} helper_result = self.session.run(helper_query, **helper_params) for helper_record in helper_result: code = helper_record.get("code", "") unit_val = helper_record.get("unit", "") name = helper_record.get("name", "") helper_info.append(f"{code} - {unit_val} - {name}") return status, data, error, helper_info list_node = record["l"] # 将节点的所有属性添加到返回数据中 for key, value in list_node.items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询List节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info