from neo4j import GraphDatabase from project import * import atexit from typing import Tuple, List, Dict, Any, Optional import logging logger = logging.getLogger("project_implementation") class ProjectTookiItNeo4j(ProjectTookiIt): """ 基于Neo4j数据库的项目类实现 """ def __init__(self): logger.info('开始初始化Neo4j连接') """ 初始化Neo4j连接 Args: uri (str): Neo4j数据库URI user (str): 用户名 password (str): 密码 """ uri = "bolt://localhost:7487" user = "neo4j" password = "password" super().__init__() self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.session = self.driver.session() # 初始化其他必要的数据结构 self.material_equipment_dict = {} # 材机字典,键为ID self.fee_templates = {} # 取费表模板字典,键为ID self.fee_schedules = {} # 费用表字典,键为ID self.project_properties = {} # 工程属性字典 logger.info('Neo4j连接初始化完成') def close(self): logger.info('开始关闭数据库连接') """ 关闭数据库连接 """ if self.session: self.session.close() if self.driver: self.driver.close() logger.info('数据库连接已关闭') def debug_path(self, path): logger.info(f'开始调试路径: {path}') path_parts = path.split("/") print(f"调试路径: {path}") # 检查每一级路径是否存在 for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) query = """ MATCH (n) WHERE n.name = $name RETURN n.name as name """ result = self.session.run(query, name=path_parts[i]) record = result.single() exists = "存在" if record else "不存在" print(f" 节点 '{path_parts[i]}' {exists}") logger.info(f'路径 {path} 调试完成') # 通用节点查询方法 def get_node_by_path(self, path, node_labels=None): logger.info(f'开始通过路径 {path} 获取节点对象') """ 通过路径获取节点对象 Args: path (str): 以'/'分隔的多级节点路径 node_labels (list): 节点标签列表,用于过滤结果 Returns: dict|None: 节点数据,如果路径不存在返回None """ if not path: logger.warning('输入路径为空,无法获取节点对象') return None # 分割路径为各个部分 path_parts = path.split("/") # 构建查询 if len(path_parts) == 1: # 只有一级路径,直接查询 if node_labels: labels_str = ":" + "|:".join(node_labels) query = f""" MATCH (n{labels_str}) WHERE n.name = $name RETURN n LIMIT 1 """ else: query = """ MATCH (n) WHERE n.name = $name RETURN n LIMIT 1 """ params = {"name": path_parts[0]} else: # 多级路径,构建路径查询 last_part = path_parts[-1] if node_labels: labels_str = ":" + "|".join(node_labels) query = f""" MATCH path = (root)-[*]->(target{labels_str}) WHERE target.name = $last_part RETURN target as n LIMIT 1 """ else: query = """ MATCH path = (root)-[*]->(target) WHERE target.name = $last_part RETURN target as n LIMIT 1 """ params = {"last_part": last_part} try: result = self.session.run(query, **params) record = result.single() if not record: logger.warning(f'路径 {path} 不存在,未找到节点对象') return None logger.info(f'成功通过路径 {path} 获取节点对象') return record["n"] except Exception as e: logger.error(f'获取节点对象时出错: {e}') print(f"获取节点对象时出错: {e}") return None # 项目划分查询方法 def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]: logger.info(f'开始通过路径 {path} 获取项目划分对象') """ 通过路径获取项目划分对象 Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的项目划分对象数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取节点数据 node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) if not node_data: status = "error" error = f"找不到路径: {path} 上的ProjectDivisionItem节点" logger.warning(error) # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: # 去掉最后一个部分,得到父路径 parent_path = "/".join(path_parts[:-1]) parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为ProjectDivisionItem的子节点 query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.name = $parent_name RETURN q.name as name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] logger.error(helper_info[0]) logger.info(f'成功通过路径 {path} 获取项目划分对象') return status, data, error, helper_info # 创建项目划分对象并填充属性 item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) # 将对象的属性转换为字典 for key, value in vars(item).items(): if not key.startswith("_"): # 排除私有属性 data[key] = value return status, data, error, helper_info except Exception as e: logger.error(f'获取项目划分对象时出错: {e}') import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_division_node_by_parent_and_name( self, parent_path, partial_name ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 执行两步查询: 1. 找到对应路径的父节点 2. 在父节点下查找名称中包含模糊名称的节点 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 partial_name (str): 目标节点的模糊或不完整名称 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] # 检查部分名称是否为空 if not partial_name: status = "error" error = "节点名称不能为空" return status, data, error, helper_info try: # 第一步:找到父节点 parent_node_data = self.get_node_by_path(parent_path) # 如果找不到父节点 if not parent_node_data: status = "error" error = f"找不到父节点路径: {parent_path}" # 提取父路径的父节点 path_parts = parent_path.split("/") if len(path_parts) > 1: # 去掉最后一个部分,得到父路径的父路径 grandparent_path = "/".join(path_parts[:-1]) grandparent_name = path_parts[-2] if len(path_parts) >= 2 else "" # 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型 query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.name = $grandparent_name RETURN q.name as name """ params = {"grandparent_name": grandparent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) # 如果没有找到任何节点,尝试更宽泛的查询 if not helper_info: broader_query = """ MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem) RETURN q.name as name LIMIT 50 """ broader_result = self.session.run(broader_query, **params) for record in broader_result: if record["name"]: helper_info.append(record["name"]) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] return status, data, error, helper_info # 获取父节点名称 parent_name = parent_node_data.get("name") if not parent_name: status = "error" error = "父节点数据中没有name字段" return status, data, error, helper_info parent_name = parent_node_data.get("name", "") # 第二步:改用父节点名称进行查询 query = """ MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem) WHERE toLower(n.name) CONTAINS toLower($partial_name) RETURN n LIMIT 50 """ params = {"parent_name": parent_name, "partial_name": partial_name.strip()} # print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'") result = self.session.run(query, **params) records = list(result) # print(f"调试返回: 找到{len(records)}条记录") # 处理查询结果 items = [] for record in records: node_data = record["n"] item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) items.append(vars(item)) # 如果没有找到匹配的节点 if not items: status = "error" error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点" # 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型 helper_query = """ MATCH (p)-[*1..1]->(q:ProjectDivisionItem) WHERE p.id = $parent_id RETURN q.name as name """ helper_params = {"parent_name": parent_name} try: helper_result = self.session.run(helper_query, **helper_params) for record in helper_result: if record["name"]: helper_info.append(record["name"]) # 如果没有找到任何节点,尝试更宽泛的查询 if not helper_info: broader_query = """ MATCH (p)-[*1..2]->(q:ProjectDivisionItem) WHERE p.id = $parent_id RETURN q.name as name LIMIT 50 """ broader_result = self.session.run(broader_query, **helper_params) for record in broader_result: if record["name"]: helper_info.append(record["name"]) # 如果仍然没有找到,尝试使用名称而不是ID if not helper_info: name_query = """ MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem) RETURN q.name as name LIMIT 50 """ name_params = {"parent_name": parent_name} name_result = self.session.run(name_query, **name_params) for record in name_result: if record["name"]: helper_info.append(record["name"]) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] return status, data, error, helper_info # 成功找到匹配节点 data = items return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点 Args: name_part (str): 节点名称的部分内容 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] # 检查输入名称是否为空 if not name_part or name_part.strip() == "": status = "error" error = "输入的名称部分不能为空" return status, data, error, helper_info try: # 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点 query = """ MATCH (n:ProjectDivisionItem) WHERE toLower(n.name) CONTAINS toLower($name_part) RETURN n LIMIT 50 """ params = {"name_part": name_part.strip()} result = self.session.run(query, parameters=params) records = list(result) if not records: status = "error" error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点" # 提供一些可能相近的节点名称作为辅助信息 broader_query = """ MATCH (n:ProjectDivisionItem) RETURN n.name AS name LIMIT 20 """ broader_result = self.session.run(broader_query) for record in broader_result: if record["name"]: helper_info.append(record["name"]) return status, data, error, helper_info # 处理查询结果 items = [] for record in records: node_data = record["n"] item = ProjectDivisionItem() for key, value in node_data.items(): if hasattr(item, key): setattr(item, key, value) # 获取节点的完整路径作为附加信息 node_path_query = """ MATCH path = (root)-[*]->(target:ProjectDivisionItem) WHERE id(target) = $node_id WITH [node in nodes(path) | node.name] AS path_names RETURN path_names """ path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id}) path_record = path_result.single() node_dict = vars(item) if path_record: node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name]) items.append(node_dict) data = items return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 工程量查询方法 def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 获取指定项目路径下的工程量对象 Args: paths_str (str): 以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: - status: 'success'或'error' - data: 成功时返回的节点数据字典,失败时为{} - error: 错误信息,成功时为'' - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] if not paths_str: status = "error" error = "路径不能为空" return status, data, error, helper_info try: # 使用通用方法获取节点,考虑所有可能的工程量类型 node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) if node_data: # 根据节点标签或类型属性创建对应类型的对象 quantity = self._create_quantity_object(node_data) # 填充属性 for key, value in node_data.items(): if hasattr(quantity, key): setattr(quantity, key, value) # 将对象转为字典 data = vars(quantity) return status, data, error, helper_info # 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息 path_parts = paths_str.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) target_name = path_parts[-1] # 获取父节点 parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) if parent_node_data: # 查询父节点下的所有符合条件的节点 query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = $parent_name AND any(label IN labels(n) WHERE label IN $allowed_labels) RETURN n.name as name, labels(n) as labels """ params = { "parent_name": parent_node_data.get("name", ""), "allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"], } try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append({record["name"]}) except Exception as e: print(f"查询辅助信息时出错: {str(e)}") helper_info = [f"查询辅助信息时出错: {str(e)}"] status = "error" error = f"找不到路径: {paths_str}" return status, data, error, helper_info except Exception as e: status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_quantities_node_by_parent_and_code( self, parent_path, quantity_type=None, code=None ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 执行三步查询: 1. 找到对应路径的父节点 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 3. 在找到的节点中查找编码与传入编码匹配的节点并返回 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) code (str): 工程量编码,以'/'分隔的多个编码 Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] if not code: status = "error" error = "编码不能为空" return status, data, error, helper_info # 验证类型参数是否有效 valid_types = ["定额", "主材", "设备", None] if quantity_type not in valid_types: status = "error" error = f"无效的工程量类型: '{quantity_type}'" helper_info = valid_types return status, data, error, helper_info try: # 步骤1: 找到对应路径的父节点 parent_node = self.get_node_by_path(parent_path) if not parent_node: status = "error" error = f"找不到父节点路径: {parent_path}" # 尝试提供辅助信息 - 路径的各部分是否存在 path_parts = parent_path.split("/") existing_parts = [] for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) if self.get_node_by_path(partial_path): existing_parts.append(partial_path) helper_info = existing_parts return status, data, error, helper_info # 提取父节点名称用于后续查询 parent_name = parent_node.get("name", "") # 根据工程量类型确定类型条件 type_condition = "" if quantity_type == "定额": type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" elif quantity_type == "主材": type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" elif quantity_type == "设备": type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" else: type_condition = "q:ProjectQuantity" # 步骤2: 查找父节点下所有的指定类型的子节点 child_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name AND {type_condition} RETURN q """ child_params = {"parent_name": parent_name} child_result = self.session.run(child_query, **child_params) child_nodes = [record["q"] for record in child_result] if not child_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" # 尝试提供辅助信息 - 父节点下有哪些节点名称 helper_query = """ MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name RETURN DISTINCT q.name as node_name LIMIT 20 """ helper_result = self.session.run(helper_query, **child_params) helper_info = [record["node_name"] for record in helper_result if record["node_name"]] return status, data, error, helper_info # 步骤3: 查找编码匹配的节点 code_conditions = [] code_parts = code.split("/") for code_part in code_parts: if code_part.strip(): code_conditions.append(code_part.strip()) if not code_conditions: status = "error" error = "提供的编码为空" return status, data, error, helper_info # 在 child_nodes 中直接查找编码匹配的节点 matching_nodes = [] for node in child_nodes: node_code = node.get("编码", "") if any(code_part in str(node_code) for code_part in code_conditions): matching_nodes.append(node) if not matching_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点" # 尝试提供辅助信息 - 所有指定类型的子节点的编码 helper_info = [node.get("编码") for node in child_nodes if node.get("编码")] return status, data, error, helper_info # 处理匹配的节点,转换为字典格式 result_data = [] for node in matching_nodes: quantity = self._create_quantity_object(node, quantity_type) for key, value in node.items(): if hasattr(quantity, key): setattr(quantity, key, value) attrs = {} for key, value in vars(quantity).items(): if not key.startswith("_"): # 排除私有属性 attrs[key] = value result_data.append(attrs) data = result_data return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_quantities_node_by_parent_and_name( self, parent_path, partial_name, quantity_type=None ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: """ 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 执行三步查询: 1. 找到对应路径的父节点 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 3. 在找到的节点中找到名称中包含节点模糊名称的节点返回 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 partial_name (str): 目标节点的模糊或不完整名称 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) Returns: Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (List[Dict[str, Any]]): 成功时返回的数据列表 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表 """ # 初始化返回结果 status = "success" data = [] error = "" helper_info = [] if not partial_name: status = "error" error = "节点名称不能为空" return status, data, error, helper_info # 验证类型参数是否有效 valid_types = ["定额", "主材", "设备", None] if quantity_type not in valid_types: status = "error" error = f"无效的工程量类型: '{quantity_type}'" helper_info = valid_types return status, data, error, helper_info try: # 步骤1: 找到对应路径的父节点 parent_node = self.get_node_by_path(parent_path) if not parent_node: status = "error" error = f"找不到父节点路径: {parent_path}" # 尝试提供辅助信息 - 路径的各部分是否存在 path_parts = parent_path.split("/") existing_parts = [] for i in range(len(path_parts)): partial_path = "/".join(path_parts[: i + 1]) if self.get_node_by_path(partial_path): existing_parts.append(partial_path) helper_info = existing_parts return status, data, error, helper_info # 从父节点中提取名称,用于后续查询 parent_name = parent_node.get("name", "") # 根据工程量类型确定类型条件,不使用标签过滤 type_condition = "" if quantity_type == "定额": # 定额类型可能是 ProjectQuantity+Quota 或者 类型='0' type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" elif quantity_type == "主材": # 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1' type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" elif quantity_type == "设备": # 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5' type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" else: # 如果没有指定类型,则查询所有ProjectQuantity节点 type_condition = "q:ProjectQuantity" # 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点 # 使用父节点名称进行查询,不指定具体的关系类型 child_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name AND {type_condition} RETURN q """ child_params = {"parent_name": parent_name} child_result = self.session.run(child_query, **child_params) child_nodes = [record["q"] for record in child_result] if not child_nodes: # 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询 path_parts = parent_path.split("/") last_part = path_parts[-1] if path_parts else "" fallback_query = f""" MATCH (p)-[*1..10]->(q) WHERE p.name CONTAINS $last_part AND {type_condition} RETURN q LIMIT 100 """ fallback_params = {"last_part": last_part} fallback_result = self.session.run(fallback_query, **fallback_params) child_nodes = [record["q"] for record in fallback_result] if not child_nodes: status = "error" error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" # 尝试提供辅助信息 - 父节点下有哪些类型的节点 helper_query = """ MATCH (p)-[*1..10]->(q) WHERE p.name = $parent_name RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type LIMIT 20 """ helper_params = {"parent_name": parent_name} helper_result = self.session.run(helper_query, **helper_params) helper_data = [] for record in helper_result: labels = record["node_labels"] if record["node_labels"] else [] node_type = record["node_type"] if record["node_type"] else "未知" helper_data.append(f"标签: {labels}, 类型: {node_type}") helper_info = helper_data return status, data, error, helper_info # 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点 matching_nodes = [] available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息 for node in child_nodes: node_name = node.get("name", "") # 记录所有节点名称作为辅助信息 if node_name: available_nodes.append(node_name) if partial_name in str(node_name): matching_nodes.append(node) if not matching_nodes: # 如果没有找到匹配的节点,尝试直接查询 direct_query = f""" MATCH (q) WHERE q.name CONTAINS $partial_name AND {type_condition} RETURN q LIMIT 20 """ direct_params = {"partial_name": partial_name} direct_result = self.session.run(direct_query, **direct_params) matching_nodes = [record["q"] for record in direct_result] if not matching_nodes: status = "error" error = ( f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点" ) # 提供辅助信息 - 该路径下所有可用的节点名称 helper_info = available_nodes return status, data, error, helper_info # 处理匹配的节点,将它们转换为对象 result_data = [] for matching_node in matching_nodes: # 创建对应类型的对象 quantity = self._create_quantity_object(matching_node, quantity_type) # 填充属性 for key, value in matching_node.items(): if hasattr(quantity, key): setattr(quantity, key, value) # 将对象的属性转换为字典 attrs = {} for key, value in vars(quantity).items(): if not key.startswith("_"): # 排除私有属性 attrs[key] = value result_data.append(attrs) # 成功找到匹配节点 data = result_data return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于根据节点数据创建对应类型的工程量对象 def _create_quantity_object(self, node_data, quantity_type=None): """ 根据节点数据创建对应类型的工程量对象 Args: node_data (dict): 节点数据 quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) Returns: ProjectQuantity: 创建的工程量对象 """ # 如果指定了类型,直接创建对应类型的对象 if quantity_type == "定额": return Ration() elif quantity_type == "主材": return Material() elif quantity_type == "设备": return Equipment() # 如果没有指定类型,尝试通过节点属性或标签判断 if "类型" in node_data: if node_data["类型"] == "0": return Ration() elif node_data["类型"] == "1": return Material() elif node_data["类型"] == "5": return Equipment() # 通过标签判断 labels = list(node_data.labels) if hasattr(node_data, "labels") else [] if "Quota" in labels: return Ration() elif "MainMaterial" in labels: return Material() elif "Equipment" in labels: return Equipment() # 默认返回基类对象 return ProjectQuantity() # 材机查询方法实现 def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过路径获取材机对象(MaterialOrEquipment 类型节点) Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取目标节点 node_data = self.get_node_by_path(path) if not node_data: status = "error" error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点" # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) # 父节点路径 parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称 query = """ MATCH (p)-[*1..1]->(m:MaterialOrEquipment) WHERE p.name = $parent_name RETURN m.name AS name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"] return status, data, error, helper_info # 将节点属性转换为字典并过滤掉私有字段 for key, value in node_data.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_material_equipment_by_parent_and_name( self, parent_path: str, partial_name: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过父节点路径和模糊名称获取材机对象 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 partial_name (str): 目标节点的模糊或不完整名称 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 """ status = "success" data = {} error = "" helper_info = [] try: if not parent_path or not partial_name: status = "error" error = "父节点路径或要查找的名称不能为空" return status, data, error, helper_info # 第一步:查找父节点 parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到父节点路径: {parent_path}" # 获取祖父节点路径 path_parts = parent_path.split("/") if len(path_parts) > 1: grandparent_path = "/".join(path_parts[:-1]) grandparent_name = path_parts[-2] # 查询祖父节点下的所有子节点名称作为 helper_info query = """ MATCH (p)-[*1..1]->(child) WHERE p.name = $grandparent_name RETURN child.name AS name """ params = {"grandparent_name": grandparent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询祖父节点下的子节点失败: {str(e)}"] return status, data, error, helper_info parent_id = parent_node_data.get("id") if not parent_id: status = "error" error = f"父节点 {parent_path} 没有有效的 ID" return status, data, error, helper_info # 第二步:查找父节点下名称包含 partial_name 的 MaterialOrEquipment 节点 query = """ MATCH (p)-[*1..1]->(m:MaterialOrEquipment) WHERE p.id = $parent_id AND m.name CONTAINS $partial_name RETURN m LIMIT 20 """ params = {"parent_id": parent_id, "partial_name": partial_name} result = self.session.run(query, **params) matched_nodes = [] for record in result: node = record["m"] properties = dict(node.items()) matched_nodes.append(properties) if not matched_nodes: status = "error" error = f"在路径 {parent_path} 下没有找到名称包含 '{partial_name}' 的 MaterialOrEquipment 节点" # 查询父节点下所有 MaterialOrEquipment 子节点名称作为 helper_info query_helper = """ MATCH (p)-[*1..1]->(m:MaterialOrEquipment) WHERE p.id = $parent_id RETURN m.name AS name """ try: result_helper = self.session.run(query_helper, parent_id=parent_id) for record in result_helper: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的 MaterialOrEquipment 子节点失败: {str(e)}"] return status, data, error, helper_info # 成功找到,返回第一个节点的数据 data = matched_nodes[0] return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于创建材机对象并填充属性 def _create_material_object(self, node_data): """ 根据节点数据创建材机对象并填充属性 Args: node_data (dict): 节点数据 Returns: MaterialOrEquipment: 创建的材机对象 """ material = MaterialOrEquipment() # 填充属性 for key, value in node_data.items(): if hasattr(material, key): setattr(material, key, value) return material # 取费表模板查询方法实现 def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过路径获取取费表模板节点 Args: path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 获取目标节点 node_data = self.get_node_by_path(path) if not node_data: status = "error" error = f"找不到路径: {path} 上的节点" # 提取父路径 path_parts = path.split("/") if len(path_parts) > 1: parent_path = "/".join(path_parts[:-1]) # 父节点路径 parent_name = path_parts[-2] # 父节点名称 # 获取父节点 parent_node = self.get_node_by_path(parent_path) if parent_node: # 查询父节点下所有类型为 FeeCollection 的子节点名称 query = """ MATCH (p)-[*1..1]->(f:FeeCollection) WHERE p.name = $parent_name RETURN f.name AS name """ params = {"parent_name": parent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] return status, data, error, helper_info # 将节点属性转换为字典并过滤掉私有字段 for key, value in node_data.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_template_by_parent_and_name( self, parent_path: str, node_name: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 通过父节点路径和节点名称获取取费表模板节点 Args: parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理) node_name (str): 目标节点的名称,例如:直接费 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回的节点属性数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 如果未找到父节点,则包含父节点的父节点下所有节点名称 - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 第一段:查找父节点 parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到父节点路径: {parent_path}" # 提取父节点的父路径 parent_path_parts = parent_path.split("/") if len(parent_path_parts) > 1: grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径 grandparent_name = parent_path_parts[-2] # 父节点的父节点名称 # 获取父节点的父节点 grandparent_node = self.get_node_by_path(grandparent_path) if grandparent_node: # 查询父节点的父节点下所有子节点名称 query = """ MATCH (gp)-[*1..1]->(n) WHERE gp.name = $grandparent_name RETURN n.name AS name """ params = {"grandparent_name": grandparent_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二段:在父节点下查找目标节点 parent_node_name = parent_path.split("/")[-1] # 父节点名称 # 查询父节点下指定名称的节点 query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = $parent_name AND n.name = $node_name RETURN n """ params = {"parent_name": parent_node_name, "node_name": node_name} try: result = self.session.run(query, **params) target_node = None for record in result: target_node = record["n"] break if not target_node: status = "error" error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点" # 查询父节点下所有类型为 FeeCollection 的子节点名称 query = """ MATCH (p)-[*1..1]->(f:FeeCollection) WHERE p.name = $parent_name RETURN f.name AS name """ params = {"parent_name": parent_node_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] return status, data, error, helper_info # 将目标节点属性转换为字典并过滤掉私有字段 for key, value in target_node.items(): if not key.startswith("_"): data[key] = value return status, data, error, helper_info except Exception as e: status = "error" error = f"查询目标节点时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info # 辅助方法,用于创建取费表模板对象并填充属性 def _create_fee_template_object(self, node_data): """ 根据节点数据创建取费表模板对象并填充属性 Args: node_data (dict): 节点数据 Returns: FeeTableTemplateItem: 创建的取费表模板对象 """ template = FeeTableTemplateItem() # 填充属性 for key, value in node_data.items(): if hasattr(template, key): setattr(template, key, value) # 更新缓存 if hasattr(template, "OutlayID") and template.OutlayID: self.fee_templates[template.OutlayID] = template return template # 费用表查询方法实现 def get_fee_schedule_on_auxiliary_expense_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在辅助费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_other_expense_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在其它费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_land_acquisition_fee_table_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在土地征用费表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_installation_price_difference_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在安装价差表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info def get_fee_schedule_on_Engineering_Cost_table( self, table_name: str, fee_name: str, fee_attribute: str ) -> Tuple[str, Dict[str, Any], str, List[Any]]: """ 在工程费用表中查找费用 Args: table_name (str): 费用表名称 fee_name (str): 要查找的费用名称(可能在多级子节点中) fee_attribute (str): 费用值属性名 Returns: Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - status (str): 状态,'success'或'error' - data (Dict[str, Any]): 成功时返回包含属性值的数据 - error (str): 错误时的错误信息 - helper_info (List[Any]): 辅助信息列表: - 第一步失败:返回"工程数据"节点下所有子节点名称 - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - 第三步失败:返回费用节点的所有可用属性名称 - 否则为空 """ # 初始化返回结果 status = "success" data = {} error = "" helper_info = [] try: # 检查输入参数 if not table_name or not fee_name or not fee_attribute: status = "error" error = "输入参数不能为空" return status, data, error, helper_info # 第一步:查找父节点(费用表节点) parent_path = f"工程数据/工程费用/{table_name}" parent_node_data = self.get_node_by_path(parent_path) if not parent_node_data: status = "error" error = f"找不到费用表节点: {parent_path}" # 查询"工程数据/工程费用"节点下所有子节点名称 base_parent_node = self.get_node_by_path("工程数据/工程费用") if base_parent_node: query = """ MATCH (p)-[*1..1]->(n) WHERE p.name = '工程费用' RETURN n.name AS name """ try: result = self.session.run(query) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] return status, data, error, helper_info # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) fee_node = None # 递归查找费用节点,最多查找3级深度 query = """ MATCH (p)-[*1..3]->(f) WHERE p.name = $table_name AND f.name = $fee_name RETURN f LIMIT 1 """ params = {"table_name": table_name, "fee_name": fee_name} try: result = self.session.run(query, **params) all_records = result.data() if len(all_records) > 0: fee_node = all_records[0]["f"] if not fee_node: status = "error" error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" # 查询费用表节点下所有子节点名称(递归查找) query = """ MATCH (p)-[*1..3]->(n) WHERE p.name = $table_name RETURN DISTINCT n.name AS name ORDER BY n.name """ params = {"table_name": table_name} try: result = self.session.run(query, **params) for record in result: if record["name"]: helper_info.append(record["name"]) except Exception as e: helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] return status, data, error, helper_info except Exception as e: status = "error" error = f"查询费用节点时出错: {str(e)}" return status, data, error, helper_info # 第三步:获取费用节点的属性值 try: if fee_node and hasattr(fee_node, "get"): fee_value = fee_node.get(fee_attribute) elif fee_node and isinstance(fee_node, dict): fee_value = fee_node.get(fee_attribute) else: # 如果fee_node是Neo4j Node对象,尝试直接访问属性 fee_value = ( getattr(fee_node, fee_attribute, None) if hasattr(fee_node, fee_attribute) else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None ) if fee_value is None: status = "error" error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" # 返回费用节点的所有可用属性名称 try: if hasattr(fee_node, "keys"): helper_info = list(fee_node.keys()) elif isinstance(fee_node, dict): helper_info = list(fee_node.keys()) else: # 对于Neo4j Node对象 helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] # 过滤掉私有属性 helper_info = [attr for attr in helper_info if not attr.startswith("_")] except Exception as e: helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] return status, data, error, helper_info # 成功获取属性值,只返回属性值 data = fee_value return status, data, error, helper_info except Exception as e: status = "error" error = f"获取费用属性值时出错: {str(e)}" return status, data, error, helper_info except Exception as e: import traceback error_details = traceback.format_exc() print(f"查询出错: {error_details}") status = "error" error = f"查询失败: {str(e)}" return status, data, error, helper_info class ProjectBuilder: """ 项目构建器 描述: 用于构建项目对象的构建器 """ _instance = None @staticmethod def build(): """ 构建并返回项目实例 Returns: ProjectTookiItNeo4j: 创建的项目实例 """ # 如果已经有实例,先关闭它 if ProjectBuilder._instance is not None: ProjectBuilder._instance.close() # 创建新实例 ProjectBuilder._instance = ProjectTookiItNeo4j() return ProjectBuilder._instance @staticmethod def close(): """ 关闭当前项目实例的连接 """ if ProjectBuilder._instance is not None: ProjectBuilder._instance.close() ProjectBuilder._instance = None # 注册退出处理函数,确保程序退出时自动关闭连接 atexit.register(ProjectBuilder.close) # project = ProjectBuilder.build() # status, data, error, helper_info = project.get_division_by_name("安装工程") # print(status) # print(data) # print(error) # print(helper_info)