diff --git a/project_implementation.py b/project_implementation.py deleted file mode 100644 index 3b64db0..0000000 --- a/project_implementation.py +++ /dev/null @@ -1,2265 +0,0 @@ -from neo4j import GraphDatabase -from project import * -import atexit -from typing import Tuple, List, Dict, Any, Optional -import logging - -logger = logging.getLogger("project_implementation") - -class ProjectTookiItNeo4j(ProjectTookiIt): - """ - 基于Neo4j数据库的项目类实现 - """ - - def __init__(self): - logger.info('开始初始化Neo4j连接') - """ - 初始化Neo4j连接 - - Args: - uri (str): Neo4j数据库URI - user (str): 用户名 - password (str): 密码 - """ - uri = "bolt://localhost:7487" - user = "neo4j" - password = "password" - - super().__init__() - self.driver = GraphDatabase.driver(uri, auth=(user, password)) - self.session = self.driver.session() - - # 初始化其他必要的数据结构 - self.material_equipment_dict = {} # 材机字典,键为ID - self.fee_templates = {} # 取费表模板字典,键为ID - self.fee_schedules = {} # 费用表字典,键为ID - self.project_properties = {} # 工程属性字典 - logger.info('Neo4j连接初始化完成') - - def close(self): - logger.info('开始关闭数据库连接') - """ - 关闭数据库连接 - """ - if self.session: - self.session.close() - if self.driver: - self.driver.close() - logger.info('数据库连接已关闭') - - def debug_path(self, path): - logger.info(f'开始调试路径: {path}') - path_parts = path.split("/") - print(f"调试路径: {path}") - - # 检查每一级路径是否存在 - for i in range(len(path_parts)): - partial_path = "/".join(path_parts[: i + 1]) - query = """ - MATCH (n) - WHERE n.name = $name - RETURN n.name as name - """ - result = self.session.run(query, name=path_parts[i]) - record = result.single() - exists = "存在" if record else "不存在" - print(f" 节点 '{path_parts[i]}' {exists}") - logger.info(f'路径 {path} 调试完成') - - # 通用节点查询方法 - def get_node_by_path(self, path, node_labels=None): - logger.info(f'开始通过路径 {path} 获取节点对象') - """ - 通过路径获取节点对象 - - Args: - path (str): 以'/'分隔的多级节点路径 - node_labels (list): 节点标签列表,用于过滤结果 - - Returns: - dict|None: 节点数据,如果路径不存在返回None - """ - if not path: - logger.warning('输入路径为空,无法获取节点对象') - return None - - # 分割路径为各个部分 - path_parts = path.split("/") - - # 构建查询 - if len(path_parts) == 1: - # 只有一级路径,直接查询 - if node_labels: - labels_str = ":" + "|:".join(node_labels) - query = f""" - MATCH (n{labels_str}) - WHERE n.name = $name - RETURN n LIMIT 1 - """ - else: - query = """ - MATCH (n) - WHERE n.name = $name - RETURN n LIMIT 1 - """ - params = {"name": path_parts[0]} - else: - # 多级路径,构建路径查询 - last_part = path_parts[-1] - if node_labels: - labels_str = ":" + "|".join(node_labels) - query = f""" - MATCH path = (root)-[*]->(target{labels_str}) - WHERE target.name = $last_part - RETURN target as n LIMIT 1 - """ - else: - query = """ - MATCH path = (root)-[*]->(target) - WHERE target.name = $last_part - RETURN target as n LIMIT 1 - """ - params = {"last_part": last_part} - - try: - result = self.session.run(query, **params) - record = result.single() - - if not record: - logger.warning(f'路径 {path} 不存在,未找到节点对象') - return None - - logger.info(f'成功通过路径 {path} 获取节点对象') - return record["n"] - except Exception as e: - logger.error(f'获取节点对象时出错: {e}') - print(f"获取节点对象时出错: {e}") - return None - - # 项目划分查询方法 - def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]: - logger.info(f'开始通过路径 {path} 获取项目划分对象') - """ - 通过路径获取项目划分对象 - - Args: - path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回的项目划分对象数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 获取节点数据 - node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) - - if not node_data: - status = "error" - error = f"找不到路径: {path} 上的ProjectDivisionItem节点" - logger.warning(error) - - # 提取父路径 - path_parts = path.split("/") - if len(path_parts) > 1: - # 去掉最后一个部分,得到父路径 - parent_path = "/".join(path_parts[:-1]) - parent_name = path_parts[-2] # 父节点名称 - - # 获取父节点 - parent_node = self.get_node_by_path(parent_path) - - if parent_node: - # 查询父节点下所有类型为ProjectDivisionItem的子节点 - query = """ - MATCH (p)-[*1..1]->(q:ProjectDivisionItem) - WHERE p.name = $parent_name - RETURN q.name as name - """ - params = {"parent_name": parent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] - logger.error(helper_info[0]) - - logger.info(f'成功通过路径 {path} 获取项目划分对象') - return status, data, error, helper_info - - # 创建项目划分对象并填充属性 - item = ProjectDivisionItem() - for key, value in node_data.items(): - if hasattr(item, key): - setattr(item, key, value) - - # 将对象的属性转换为字典 - for key, value in vars(item).items(): - if not key.startswith("_"): # 排除私有属性 - data[key] = value - - return status, data, error, helper_info - - except Exception as e: - logger.error(f'获取项目划分对象时出错: {e}') - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_division_node_by_parent_and_name( - self, parent_path, partial_name - ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: - """ - 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 - - 执行两步查询: - 1. 找到对应路径的父节点 - 2. 在父节点下查找名称中包含模糊名称的节点 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - - Returns: - Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (List[Dict[str, Any]]): 成功时返回的数据列表 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表 - """ - # 初始化返回结果 - status = "success" - data = [] - error = "" - helper_info = [] - - # 检查部分名称是否为空 - if not partial_name: - status = "error" - error = "节点名称不能为空" - return status, data, error, helper_info - - try: - # 第一步:找到父节点 - parent_node_data = self.get_node_by_path(parent_path) - - # 如果找不到父节点 - if not parent_node_data: - status = "error" - error = f"找不到父节点路径: {parent_path}" - - # 提取父路径的父节点 - path_parts = parent_path.split("/") - if len(path_parts) > 1: - # 去掉最后一个部分,得到父路径的父路径 - grandparent_path = "/".join(path_parts[:-1]) - grandparent_name = path_parts[-2] if len(path_parts) >= 2 else "" - - # 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型 - query = """ - MATCH (p)-[*1..1]->(q:ProjectDivisionItem) - WHERE p.name = $grandparent_name - RETURN q.name as name - """ - params = {"grandparent_name": grandparent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - - # 如果没有找到任何节点,尝试更宽泛的查询 - if not helper_info: - broader_query = """ - MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem) - RETURN q.name as name LIMIT 50 - """ - broader_result = self.session.run(broader_query, **params) - for record in broader_result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - print(f"查询辅助信息时出错: {str(e)}") - helper_info = [f"查询辅助信息时出错: {str(e)}"] - - return status, data, error, helper_info - - # 获取父节点名称 - parent_name = parent_node_data.get("name") - if not parent_name: - status = "error" - error = "父节点数据中没有name字段" - return status, data, error, helper_info - - parent_name = parent_node_data.get("name", "") - - # 第二步:改用父节点名称进行查询 - query = """ - MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem) - WHERE toLower(n.name) CONTAINS toLower($partial_name) - RETURN n LIMIT 50 - """ - params = {"parent_name": parent_name, "partial_name": partial_name.strip()} - - # print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'") - result = self.session.run(query, **params) - records = list(result) - # print(f"调试返回: 找到{len(records)}条记录") - - # 处理查询结果 - items = [] - for record in records: - node_data = record["n"] - item = ProjectDivisionItem() - for key, value in node_data.items(): - if hasattr(item, key): - setattr(item, key, value) - items.append(vars(item)) - - # 如果没有找到匹配的节点 - if not items: - status = "error" - error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点" - - # 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型 - helper_query = """ - MATCH (p)-[*1..1]->(q:ProjectDivisionItem) - WHERE p.id = $parent_id - RETURN q.name as name - """ - helper_params = {"parent_name": parent_name} - - try: - helper_result = self.session.run(helper_query, **helper_params) - for record in helper_result: - if record["name"]: - helper_info.append(record["name"]) - - # 如果没有找到任何节点,尝试更宽泛的查询 - if not helper_info: - broader_query = """ - MATCH (p)-[*1..2]->(q:ProjectDivisionItem) - WHERE p.id = $parent_id - RETURN q.name as name LIMIT 50 - """ - broader_result = self.session.run(broader_query, **helper_params) - for record in broader_result: - if record["name"]: - helper_info.append(record["name"]) - - # 如果仍然没有找到,尝试使用名称而不是ID - if not helper_info: - name_query = """ - MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem) - RETURN q.name as name LIMIT 50 - """ - name_params = {"parent_name": parent_name} - name_result = self.session.run(name_query, **name_params) - for record in name_result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - print(f"查询辅助信息时出错: {str(e)}") - helper_info = [f"查询辅助信息时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功找到匹配节点 - data = items - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: - """ - 直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点 - - Args: - name_part (str): 节点名称的部分内容 - - Returns: - Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (List[Dict[str, Any]]): 成功时返回的数据列表 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表 - """ - # 初始化返回结果 - status = "success" - data = [] - error = "" - helper_info = [] - - # 检查输入名称是否为空 - if not name_part or name_part.strip() == "": - status = "error" - error = "输入的名称部分不能为空" - return status, data, error, helper_info - - try: - # 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点 - query = """ - MATCH (n:ProjectDivisionItem) - WHERE toLower(n.name) CONTAINS toLower($name_part) - RETURN n LIMIT 50 - """ - params = {"name_part": name_part.strip()} - - result = self.session.run(query, parameters=params) - records = list(result) - - if not records: - status = "error" - error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点" - - # 提供一些可能相近的节点名称作为辅助信息 - broader_query = """ - MATCH (n:ProjectDivisionItem) - RETURN n.name AS name LIMIT 20 - """ - broader_result = self.session.run(broader_query) - for record in broader_result: - if record["name"]: - helper_info.append(record["name"]) - - return status, data, error, helper_info - - # 处理查询结果 - items = [] - for record in records: - node_data = record["n"] - item = ProjectDivisionItem() - for key, value in node_data.items(): - if hasattr(item, key): - setattr(item, key, value) - - # 获取节点的完整路径作为附加信息 - node_path_query = """ - MATCH path = (root)-[*]->(target:ProjectDivisionItem) - WHERE id(target) = $node_id - WITH [node in nodes(path) | node.name] AS path_names - RETURN path_names - """ - path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id}) - path_record = path_result.single() - - node_dict = vars(item) - if path_record: - node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name]) - - items.append(node_dict) - - data = items - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - # 工程量查询方法 - def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 获取指定项目路径下的工程量对象 - - Args: - paths_str (str): 以'/'分隔的多级节点路径 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: - - status: 'success'或'error' - - data: 成功时返回的节点数据字典,失败时为{} - - error: 错误信息,成功时为'' - - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - if not paths_str: - status = "error" - error = "路径不能为空" - return status, data, error, helper_info - - try: - # 使用通用方法获取节点,考虑所有可能的工程量类型 - node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) - - if node_data: - # 根据节点标签或类型属性创建对应类型的对象 - quantity = self._create_quantity_object(node_data) - - # 填充属性 - for key, value in node_data.items(): - if hasattr(quantity, key): - setattr(quantity, key, value) - - # 将对象转为字典 - data = vars(quantity) - return status, data, error, helper_info - - # 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息 - path_parts = paths_str.split("/") - if len(path_parts) > 1: - parent_path = "/".join(path_parts[:-1]) - target_name = path_parts[-1] - - # 获取父节点 - parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) - - if parent_node_data: - # 查询父节点下的所有符合条件的节点 - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = $parent_name AND - any(label IN labels(n) WHERE label IN $allowed_labels) - RETURN n.name as name, labels(n) as labels - """ - params = { - "parent_name": parent_node_data.get("name", ""), - "allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"], - } - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append({record["name"]}) - except Exception as e: - print(f"查询辅助信息时出错: {str(e)}") - helper_info = [f"查询辅助信息时出错: {str(e)}"] - - status = "error" - error = f"找不到路径: {paths_str}" - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_quantities_node_by_parent_and_code( - self, parent_path, quantity_type=None, code=None - ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: - """ - 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 - - 执行三步查询: - 1. 找到对应路径的父节点 - 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 - 3. 在找到的节点中查找编码与传入编码匹配的节点并返回 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) - code (str): 工程量编码,以'/'分隔的多个编码 - - Returns: - Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (List[Dict[str, Any]]): 成功时返回的数据列表 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表 - """ - # 初始化返回结果 - status = "success" - data = [] - error = "" - helper_info = [] - - if not code: - status = "error" - error = "编码不能为空" - return status, data, error, helper_info - - # 验证类型参数是否有效 - valid_types = ["定额", "主材", "设备", None] - if quantity_type not in valid_types: - status = "error" - error = f"无效的工程量类型: '{quantity_type}'" - helper_info = valid_types - return status, data, error, helper_info - - try: - # 步骤1: 找到对应路径的父节点 - parent_node = self.get_node_by_path(parent_path) - if not parent_node: - status = "error" - error = f"找不到父节点路径: {parent_path}" - - # 尝试提供辅助信息 - 路径的各部分是否存在 - path_parts = parent_path.split("/") - existing_parts = [] - for i in range(len(path_parts)): - partial_path = "/".join(path_parts[: i + 1]) - if self.get_node_by_path(partial_path): - existing_parts.append(partial_path) - helper_info = existing_parts - return status, data, error, helper_info - - # 提取父节点名称用于后续查询 - parent_name = parent_node.get("name", "") - - # 根据工程量类型确定类型条件 - type_condition = "" - - if quantity_type == "定额": - type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" - elif quantity_type == "主材": - type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" - elif quantity_type == "设备": - type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" - else: - type_condition = "q:ProjectQuantity" - - # 步骤2: 查找父节点下所有的指定类型的子节点 - child_query = f""" - MATCH (p)-[*1..10]->(q) - WHERE p.name = $parent_name AND {type_condition} - RETURN q - """ - child_params = {"parent_name": parent_name} - - child_result = self.session.run(child_query, **child_params) - child_nodes = [record["q"] for record in child_result] - - if not child_nodes: - status = "error" - error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" - - # 尝试提供辅助信息 - 父节点下有哪些节点名称 - helper_query = """ - MATCH (p)-[*1..10]->(q) - WHERE p.name = $parent_name - RETURN DISTINCT q.name as node_name - LIMIT 20 - """ - helper_result = self.session.run(helper_query, **child_params) - helper_info = [record["node_name"] for record in helper_result if record["node_name"]] - return status, data, error, helper_info - - # 步骤3: 查找编码匹配的节点 - code_conditions = [] - code_parts = code.split("/") - - for code_part in code_parts: - if code_part.strip(): - code_conditions.append(code_part.strip()) - - if not code_conditions: - status = "error" - error = "提供的编码为空" - return status, data, error, helper_info - - # 在 child_nodes 中直接查找编码匹配的节点 - matching_nodes = [] - - for node in child_nodes: - node_code = node.get("编码", "") - if any(code_part in str(node_code) for code_part in code_conditions): - matching_nodes.append(node) - - if not matching_nodes: - status = "error" - error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点" - - # 尝试提供辅助信息 - 所有指定类型的子节点的编码 - helper_info = [node.get("编码") for node in child_nodes if node.get("编码")] - return status, data, error, helper_info - - # 处理匹配的节点,转换为字典格式 - result_data = [] - for node in matching_nodes: - quantity = self._create_quantity_object(node, quantity_type) - - for key, value in node.items(): - if hasattr(quantity, key): - setattr(quantity, key, value) - - attrs = {} - for key, value in vars(quantity).items(): - if not key.startswith("_"): # 排除私有属性 - attrs[key] = value - - result_data.append(attrs) - - data = result_data - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_quantities_node_by_parent_and_name( - self, parent_path, partial_name, quantity_type=None - ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: - """ - 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 - - 执行三步查询: - 1. 找到对应路径的父节点 - 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 - 3. 在找到的节点中找到名称中包含节点模糊名称的节点返回 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) - - Returns: - Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (List[Dict[str, Any]]): 成功时返回的数据列表 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表 - """ - # 初始化返回结果 - status = "success" - data = [] - error = "" - helper_info = [] - - if not partial_name: - status = "error" - error = "节点名称不能为空" - return status, data, error, helper_info - - # 验证类型参数是否有效 - valid_types = ["定额", "主材", "设备", None] - if quantity_type not in valid_types: - status = "error" - error = f"无效的工程量类型: '{quantity_type}'" - helper_info = valid_types - return status, data, error, helper_info - - try: - # 步骤1: 找到对应路径的父节点 - parent_node = self.get_node_by_path(parent_path) - if not parent_node: - status = "error" - error = f"找不到父节点路径: {parent_path}" - # 尝试提供辅助信息 - 路径的各部分是否存在 - path_parts = parent_path.split("/") - existing_parts = [] - for i in range(len(path_parts)): - partial_path = "/".join(path_parts[: i + 1]) - if self.get_node_by_path(partial_path): - existing_parts.append(partial_path) - helper_info = existing_parts - return status, data, error, helper_info - - # 从父节点中提取名称,用于后续查询 - parent_name = parent_node.get("name", "") - - # 根据工程量类型确定类型条件,不使用标签过滤 - type_condition = "" - - if quantity_type == "定额": - # 定额类型可能是 ProjectQuantity+Quota 或者 类型='0' - type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" - elif quantity_type == "主材": - # 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1' - type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" - elif quantity_type == "设备": - # 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5' - type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" - else: - # 如果没有指定类型,则查询所有ProjectQuantity节点 - type_condition = "q:ProjectQuantity" - - # 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点 - # 使用父节点名称进行查询,不指定具体的关系类型 - child_query = f""" - MATCH (p)-[*1..10]->(q) - WHERE p.name = $parent_name AND {type_condition} - RETURN q - """ - child_params = {"parent_name": parent_name} - - child_result = self.session.run(child_query, **child_params) - child_nodes = [record["q"] for record in child_result] - - if not child_nodes: - # 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询 - path_parts = parent_path.split("/") - last_part = path_parts[-1] if path_parts else "" - - fallback_query = f""" - MATCH (p)-[*1..10]->(q) - WHERE p.name CONTAINS $last_part AND {type_condition} - RETURN q LIMIT 100 - """ - fallback_params = {"last_part": last_part} - - fallback_result = self.session.run(fallback_query, **fallback_params) - child_nodes = [record["q"] for record in fallback_result] - - if not child_nodes: - status = "error" - error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" - - # 尝试提供辅助信息 - 父节点下有哪些类型的节点 - helper_query = """ - MATCH (p)-[*1..10]->(q) - WHERE p.name = $parent_name - RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type - LIMIT 20 - """ - helper_params = {"parent_name": parent_name} - - helper_result = self.session.run(helper_query, **helper_params) - helper_data = [] - for record in helper_result: - labels = record["node_labels"] if record["node_labels"] else [] - node_type = record["node_type"] if record["node_type"] else "未知" - helper_data.append(f"标签: {labels}, 类型: {node_type}") - - helper_info = helper_data - return status, data, error, helper_info - - # 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点 - matching_nodes = [] - available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息 - - for node in child_nodes: - node_name = node.get("name", "") - # 记录所有节点名称作为辅助信息 - if node_name: - available_nodes.append(node_name) - - if partial_name in str(node_name): - matching_nodes.append(node) - - if not matching_nodes: - # 如果没有找到匹配的节点,尝试直接查询 - direct_query = f""" - MATCH (q) - WHERE q.name CONTAINS $partial_name AND {type_condition} - RETURN q LIMIT 20 - """ - direct_params = {"partial_name": partial_name} - - direct_result = self.session.run(direct_query, **direct_params) - matching_nodes = [record["q"] for record in direct_result] - - if not matching_nodes: - status = "error" - error = ( - f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点" - ) - # 提供辅助信息 - 该路径下所有可用的节点名称 - helper_info = available_nodes - return status, data, error, helper_info - - # 处理匹配的节点,将它们转换为对象 - result_data = [] - for matching_node in matching_nodes: - # 创建对应类型的对象 - quantity = self._create_quantity_object(matching_node, quantity_type) - - # 填充属性 - for key, value in matching_node.items(): - if hasattr(quantity, key): - setattr(quantity, key, value) - - # 将对象的属性转换为字典 - attrs = {} - for key, value in vars(quantity).items(): - if not key.startswith("_"): # 排除私有属性 - attrs[key] = value - - result_data.append(attrs) - - # 成功找到匹配节点 - data = result_data - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - # 辅助方法,用于根据节点数据创建对应类型的工程量对象 - def _create_quantity_object(self, node_data, quantity_type=None): - """ - 根据节点数据创建对应类型的工程量对象 - - Args: - node_data (dict): 节点数据 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) - - Returns: - ProjectQuantity: 创建的工程量对象 - """ - # 如果指定了类型,直接创建对应类型的对象 - if quantity_type == "定额": - return Ration() - elif quantity_type == "主材": - return Material() - elif quantity_type == "设备": - return Equipment() - - # 如果没有指定类型,尝试通过节点属性或标签判断 - if "类型" in node_data: - if node_data["类型"] == "0": - return Ration() - elif node_data["类型"] == "1": - return Material() - elif node_data["类型"] == "5": - return Equipment() - - # 通过标签判断 - labels = list(node_data.labels) if hasattr(node_data, "labels") else [] - if "Quota" in labels: - return Ration() - elif "MainMaterial" in labels: - return Material() - elif "Equipment" in labels: - return Equipment() - - # 默认返回基类对象 - return ProjectQuantity() - - # 材机查询方法实现 - def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 通过路径获取材机对象(MaterialOrEquipment 类型节点) - - Args: - path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回的节点属性数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 获取目标节点 - node_data = self.get_node_by_path(path) - - if not node_data: - status = "error" - error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点" - - # 提取父路径 - path_parts = path.split("/") - if len(path_parts) > 1: - parent_path = "/".join(path_parts[:-1]) # 父节点路径 - parent_name = path_parts[-2] # 父节点名称 - - # 获取父节点 - parent_node = self.get_node_by_path(parent_path) - if parent_node: - # 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称 - query = """ - MATCH (p)-[*1..1]->(m:MaterialOrEquipment) - WHERE p.name = $parent_name - RETURN m.name AS name - """ - params = {"parent_name": parent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 将节点属性转换为字典并过滤掉私有字段 - for key, value in node_data.items(): - if not key.startswith("_"): - data[key] = value - - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_material_equipment_by_parent_and_name( - self, parent_path: str, partial_name: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 通过父节点路径和模糊名称获取材机对象 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - """ - status = "success" - data = {} - error = "" - helper_info = [] - - try: - if not parent_path or not partial_name: - status = "error" - error = "父节点路径或要查找的名称不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点 - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到父节点路径: {parent_path}" - - # 获取祖父节点路径 - path_parts = parent_path.split("/") - if len(path_parts) > 1: - grandparent_path = "/".join(path_parts[:-1]) - grandparent_name = path_parts[-2] - - # 查询祖父节点下的所有子节点名称作为 helper_info - query = """ - MATCH (p)-[*1..1]->(child) - WHERE p.name = $grandparent_name - RETURN child.name AS name - """ - params = {"grandparent_name": grandparent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询祖父节点下的子节点失败: {str(e)}"] - - return status, data, error, helper_info - - parent_id = parent_node_data.get("id") - if not parent_id: - status = "error" - error = f"父节点 {parent_path} 没有有效的 ID" - return status, data, error, helper_info - - # 第二步:查找父节点下名称包含 partial_name 的 MaterialOrEquipment 节点 - query = """ - MATCH (p)-[*1..1]->(m:MaterialOrEquipment) - WHERE p.id = $parent_id AND m.name CONTAINS $partial_name - RETURN m LIMIT 20 - """ - params = {"parent_id": parent_id, "partial_name": partial_name} - - result = self.session.run(query, **params) - - matched_nodes = [] - for record in result: - node = record["m"] - properties = dict(node.items()) - matched_nodes.append(properties) - - if not matched_nodes: - status = "error" - error = f"在路径 {parent_path} 下没有找到名称包含 '{partial_name}' 的 MaterialOrEquipment 节点" - - # 查询父节点下所有 MaterialOrEquipment 子节点名称作为 helper_info - query_helper = """ - MATCH (p)-[*1..1]->(m:MaterialOrEquipment) - WHERE p.id = $parent_id - RETURN m.name AS name - """ - try: - result_helper = self.session.run(query_helper, parent_id=parent_id) - for record in result_helper: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点下的 MaterialOrEquipment 子节点失败: {str(e)}"] - - return status, data, error, helper_info - - # 成功找到,返回第一个节点的数据 - data = matched_nodes[0] - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - # 辅助方法,用于创建材机对象并填充属性 - def _create_material_object(self, node_data): - """ - 根据节点数据创建材机对象并填充属性 - - Args: - node_data (dict): 节点数据 - - Returns: - MaterialOrEquipment: 创建的材机对象 - """ - material = MaterialOrEquipment() - - # 填充属性 - for key, value in node_data.items(): - if hasattr(material, key): - setattr(material, key, value) - - return material - - # 取费表模板查询方法实现 - def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 通过路径获取取费表模板节点 - - Args: - path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 - 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回的节点属性数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 获取目标节点 - node_data = self.get_node_by_path(path) - - if not node_data: - status = "error" - error = f"找不到路径: {path} 上的节点" - - # 提取父路径 - path_parts = path.split("/") - if len(path_parts) > 1: - parent_path = "/".join(path_parts[:-1]) # 父节点路径 - parent_name = path_parts[-2] # 父节点名称 - - # 获取父节点 - parent_node = self.get_node_by_path(parent_path) - if parent_node: - # 查询父节点下所有类型为 FeeCollection 的子节点名称 - query = """ - MATCH (p)-[*1..1]->(f:FeeCollection) - WHERE p.name = $parent_name - RETURN f.name AS name - """ - params = {"parent_name": parent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 将节点属性转换为字典并过滤掉私有字段 - for key, value in node_data.items(): - if not key.startswith("_"): - data[key] = value - - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_fee_template_by_parent_and_name( - self, parent_path: str, node_name: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 通过父节点路径和节点名称获取取费表模板节点 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理) - node_name (str): 目标节点的名称,例如:直接费 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回的节点属性数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 如果未找到父节点,则包含父节点的父节点下所有节点名称 - - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 第一段:查找父节点 - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到父节点路径: {parent_path}" - - # 提取父节点的父路径 - parent_path_parts = parent_path.split("/") - if len(parent_path_parts) > 1: - grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径 - grandparent_name = parent_path_parts[-2] # 父节点的父节点名称 - - # 获取父节点的父节点 - grandparent_node = self.get_node_by_path(grandparent_path) - if grandparent_node: - # 查询父节点的父节点下所有子节点名称 - query = """ - MATCH (gp)-[*1..1]->(n) - WHERE gp.name = $grandparent_name - RETURN n.name AS name - """ - params = {"grandparent_name": grandparent_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二段:在父节点下查找目标节点 - parent_node_name = parent_path.split("/")[-1] # 父节点名称 - - # 查询父节点下指定名称的节点 - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = $parent_name AND n.name = $node_name - RETURN n - """ - params = {"parent_name": parent_node_name, "node_name": node_name} - - try: - result = self.session.run(query, **params) - target_node = None - - for record in result: - target_node = record["n"] - break - - if not target_node: - status = "error" - error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点" - - # 查询父节点下所有类型为 FeeCollection 的子节点名称 - query = """ - MATCH (p)-[*1..1]->(f:FeeCollection) - WHERE p.name = $parent_name - RETURN f.name AS name - """ - params = {"parent_name": parent_node_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 将目标节点属性转换为字典并过滤掉私有字段 - for key, value in target_node.items(): - if not key.startswith("_"): - data[key] = value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询目标节点时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - # 辅助方法,用于创建取费表模板对象并填充属性 - def _create_fee_template_object(self, node_data): - """ - 根据节点数据创建取费表模板对象并填充属性 - - Args: - node_data (dict): 节点数据 - - Returns: - FeeTableTemplateItem: 创建的取费表模板对象 - """ - template = FeeTableTemplateItem() - - # 填充属性 - for key, value in node_data.items(): - if hasattr(template, key): - setattr(template, key, value) - - # 更新缓存 - if hasattr(template, "OutlayID") and template.OutlayID: - self.fee_templates[template.OutlayID] = template - - return template - - # 费用表查询方法实现 - def get_fee_schedule_on_auxiliary_expense_table( - self, table_name: str, fee_name: str, fee_attribute: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 在辅助费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称(可能在多级子节点中) - fee_attribute (str): 费用值属性名 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回包含属性值的数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 第一步失败:返回"工程数据"节点下所有子节点名称 - - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - - 第三步失败:返回费用节点的所有可用属性名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 检查输入参数 - if not table_name or not fee_name or not fee_attribute: - status = "error" - error = "输入参数不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点(费用表节点) - parent_path = f"工程数据/工程费用/{table_name}" - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到费用表节点: {parent_path}" - - # 查询"工程数据/工程费用"节点下所有子节点名称 - base_parent_node = self.get_node_by_path("工程数据/工程费用") - if base_parent_node: - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = '工程费用' - RETURN n.name AS name - """ - - try: - result = self.session.run(query) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) - fee_node = None - - # 递归查找费用节点,最多查找3级深度 - query = """ - MATCH (p)-[*1..3]->(f) - WHERE p.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - - if not fee_node: - status = "error" - error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" - - # 查询费用表节点下所有子节点名称(递归查找) - query = """ - MATCH (p)-[*1..3]->(n) - WHERE p.name = $table_name - RETURN DISTINCT n.name AS name - ORDER BY n.name - """ - params = {"table_name": table_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询费用节点时出错: {str(e)}" - return status, data, error, helper_info - - # 第三步:获取费用节点的属性值 - try: - if fee_node and hasattr(fee_node, "get"): - fee_value = fee_node.get(fee_attribute) - elif fee_node and isinstance(fee_node, dict): - fee_value = fee_node.get(fee_attribute) - else: - # 如果fee_node是Neo4j Node对象,尝试直接访问属性 - fee_value = ( - getattr(fee_node, fee_attribute, None) - if hasattr(fee_node, fee_attribute) - else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None - ) - - if fee_value is None: - status = "error" - error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" - - # 返回费用节点的所有可用属性名称 - try: - if hasattr(fee_node, "keys"): - helper_info = list(fee_node.keys()) - elif isinstance(fee_node, dict): - helper_info = list(fee_node.keys()) - else: - # 对于Neo4j Node对象 - helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] - - # 过滤掉私有属性 - helper_info = [attr for attr in helper_info if not attr.startswith("_")] - - except Exception as e: - helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功获取属性值,只返回属性值 - data = fee_value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"获取费用属性值时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_fee_schedule_on_other_expense_table( - self, table_name: str, fee_name: str, fee_attribute: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 在其它费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称(可能在多级子节点中) - fee_attribute (str): 费用值属性名 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回包含属性值的数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 第一步失败:返回"工程数据"节点下所有子节点名称 - - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - - 第三步失败:返回费用节点的所有可用属性名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 检查输入参数 - if not table_name or not fee_name or not fee_attribute: - status = "error" - error = "输入参数不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点(费用表节点) - parent_path = f"工程数据/工程费用/{table_name}" - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到费用表节点: {parent_path}" - - # 查询"工程数据/工程费用"节点下所有子节点名称 - base_parent_node = self.get_node_by_path("工程数据/工程费用") - if base_parent_node: - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = '工程费用' - RETURN n.name AS name - """ - - try: - result = self.session.run(query) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) - fee_node = None - - # 递归查找费用节点,最多查找3级深度 - query = """ - MATCH (p)-[*1..3]->(f) - WHERE p.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - - if not fee_node: - status = "error" - error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" - - # 查询费用表节点下所有子节点名称(递归查找) - query = """ - MATCH (p)-[*1..3]->(n) - WHERE p.name = $table_name - RETURN DISTINCT n.name AS name - ORDER BY n.name - """ - params = {"table_name": table_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询费用节点时出错: {str(e)}" - return status, data, error, helper_info - - # 第三步:获取费用节点的属性值 - try: - if fee_node and hasattr(fee_node, "get"): - fee_value = fee_node.get(fee_attribute) - elif fee_node and isinstance(fee_node, dict): - fee_value = fee_node.get(fee_attribute) - else: - # 如果fee_node是Neo4j Node对象,尝试直接访问属性 - fee_value = ( - getattr(fee_node, fee_attribute, None) - if hasattr(fee_node, fee_attribute) - else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None - ) - - if fee_value is None: - status = "error" - error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" - - # 返回费用节点的所有可用属性名称 - try: - if hasattr(fee_node, "keys"): - helper_info = list(fee_node.keys()) - elif isinstance(fee_node, dict): - helper_info = list(fee_node.keys()) - else: - # 对于Neo4j Node对象 - helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] - - # 过滤掉私有属性 - helper_info = [attr for attr in helper_info if not attr.startswith("_")] - - except Exception as e: - helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功获取属性值,只返回属性值 - data = fee_value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"获取费用属性值时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_fee_schedule_on_land_acquisition_fee_table_table( - self, table_name: str, fee_name: str, fee_attribute: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 在土地征用费表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称(可能在多级子节点中) - fee_attribute (str): 费用值属性名 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回包含属性值的数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 第一步失败:返回"工程数据"节点下所有子节点名称 - - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - - 第三步失败:返回费用节点的所有可用属性名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 检查输入参数 - if not table_name or not fee_name or not fee_attribute: - status = "error" - error = "输入参数不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点(费用表节点) - parent_path = f"工程数据/工程费用/{table_name}" - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到费用表节点: {parent_path}" - - # 查询"工程数据/工程费用"节点下所有子节点名称 - base_parent_node = self.get_node_by_path("工程数据/工程费用") - if base_parent_node: - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = '工程费用' - RETURN n.name AS name - """ - - try: - result = self.session.run(query) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) - fee_node = None - - # 递归查找费用节点,最多查找3级深度 - query = """ - MATCH (p)-[*1..3]->(f) - WHERE p.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - - if not fee_node: - status = "error" - error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" - - # 查询费用表节点下所有子节点名称(递归查找) - query = """ - MATCH (p)-[*1..3]->(n) - WHERE p.name = $table_name - RETURN DISTINCT n.name AS name - ORDER BY n.name - """ - params = {"table_name": table_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询费用节点时出错: {str(e)}" - return status, data, error, helper_info - - # 第三步:获取费用节点的属性值 - try: - if fee_node and hasattr(fee_node, "get"): - fee_value = fee_node.get(fee_attribute) - elif fee_node and isinstance(fee_node, dict): - fee_value = fee_node.get(fee_attribute) - else: - # 如果fee_node是Neo4j Node对象,尝试直接访问属性 - fee_value = ( - getattr(fee_node, fee_attribute, None) - if hasattr(fee_node, fee_attribute) - else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None - ) - - if fee_value is None: - status = "error" - error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" - - # 返回费用节点的所有可用属性名称 - try: - if hasattr(fee_node, "keys"): - helper_info = list(fee_node.keys()) - elif isinstance(fee_node, dict): - helper_info = list(fee_node.keys()) - else: - # 对于Neo4j Node对象 - helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] - - # 过滤掉私有属性 - helper_info = [attr for attr in helper_info if not attr.startswith("_")] - - except Exception as e: - helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功获取属性值,只返回属性值 - data = fee_value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"获取费用属性值时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_fee_schedule_on_installation_price_difference_table( - self, table_name: str, fee_name: str, fee_attribute: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 在安装价差表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称(可能在多级子节点中) - fee_attribute (str): 费用值属性名 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回包含属性值的数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 第一步失败:返回"工程数据"节点下所有子节点名称 - - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - - 第三步失败:返回费用节点的所有可用属性名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 检查输入参数 - if not table_name or not fee_name or not fee_attribute: - status = "error" - error = "输入参数不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点(费用表节点) - parent_path = f"工程数据/工程费用/{table_name}" - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到费用表节点: {parent_path}" - - # 查询"工程数据/工程费用"节点下所有子节点名称 - base_parent_node = self.get_node_by_path("工程数据/工程费用") - if base_parent_node: - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = '工程费用' - RETURN n.name AS name - """ - - try: - result = self.session.run(query) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) - fee_node = None - - # 递归查找费用节点,最多查找3级深度 - query = """ - MATCH (p)-[*1..3]->(f) - WHERE p.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - - if not fee_node: - status = "error" - error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" - - # 查询费用表节点下所有子节点名称(递归查找) - query = """ - MATCH (p)-[*1..3]->(n) - WHERE p.name = $table_name - RETURN DISTINCT n.name AS name - ORDER BY n.name - """ - params = {"table_name": table_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询费用节点时出错: {str(e)}" - return status, data, error, helper_info - - # 第三步:获取费用节点的属性值 - try: - if fee_node and hasattr(fee_node, "get"): - fee_value = fee_node.get(fee_attribute) - elif fee_node and isinstance(fee_node, dict): - fee_value = fee_node.get(fee_attribute) - else: - # 如果fee_node是Neo4j Node对象,尝试直接访问属性 - fee_value = ( - getattr(fee_node, fee_attribute, None) - if hasattr(fee_node, fee_attribute) - else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None - ) - - if fee_value is None: - status = "error" - error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" - - # 返回费用节点的所有可用属性名称 - try: - if hasattr(fee_node, "keys"): - helper_info = list(fee_node.keys()) - elif isinstance(fee_node, dict): - helper_info = list(fee_node.keys()) - else: - # 对于Neo4j Node对象 - helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] - - # 过滤掉私有属性 - helper_info = [attr for attr in helper_info if not attr.startswith("_")] - - except Exception as e: - helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功获取属性值,只返回属性值 - data = fee_value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"获取费用属性值时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - def get_fee_schedule_on_Engineering_Cost_table( - self, table_name: str, fee_name: str, fee_attribute: str - ) -> Tuple[str, Dict[str, Any], str, List[Any]]: - """ - 在工程费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称(可能在多级子节点中) - fee_attribute (str): 费用值属性名 - - Returns: - Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 - - status (str): 状态,'success'或'error' - - data (Dict[str, Any]): 成功时返回包含属性值的数据 - - error (str): 错误时的错误信息 - - helper_info (List[Any]): 辅助信息列表: - - 第一步失败:返回"工程数据"节点下所有子节点名称 - - 第二步失败:返回费用表节点下所有子节点名称(递归查找) - - 第三步失败:返回费用节点的所有可用属性名称 - - 否则为空 - """ - # 初始化返回结果 - status = "success" - data = {} - error = "" - helper_info = [] - - try: - # 检查输入参数 - if not table_name or not fee_name or not fee_attribute: - status = "error" - error = "输入参数不能为空" - return status, data, error, helper_info - - # 第一步:查找父节点(费用表节点) - parent_path = f"工程数据/工程费用/{table_name}" - parent_node_data = self.get_node_by_path(parent_path) - - if not parent_node_data: - status = "error" - error = f"找不到费用表节点: {parent_path}" - - # 查询"工程数据/工程费用"节点下所有子节点名称 - base_parent_node = self.get_node_by_path("工程数据/工程费用") - if base_parent_node: - query = """ - MATCH (p)-[*1..1]->(n) - WHERE p.name = '工程费用' - RETURN n.name AS name - """ - - try: - result = self.session.run(query) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) - fee_node = None - - # 递归查找费用节点,最多查找3级深度 - query = """ - MATCH (p)-[*1..3]->(f) - WHERE p.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - - if not fee_node: - status = "error" - error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" - - # 查询费用表节点下所有子节点名称(递归查找) - query = """ - MATCH (p)-[*1..3]->(n) - WHERE p.name = $table_name - RETURN DISTINCT n.name AS name - ORDER BY n.name - """ - params = {"table_name": table_name} - - try: - result = self.session.run(query, **params) - for record in result: - if record["name"]: - helper_info.append(record["name"]) - except Exception as e: - helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"查询费用节点时出错: {str(e)}" - return status, data, error, helper_info - - # 第三步:获取费用节点的属性值 - try: - if fee_node and hasattr(fee_node, "get"): - fee_value = fee_node.get(fee_attribute) - elif fee_node and isinstance(fee_node, dict): - fee_value = fee_node.get(fee_attribute) - else: - # 如果fee_node是Neo4j Node对象,尝试直接访问属性 - fee_value = ( - getattr(fee_node, fee_attribute, None) - if hasattr(fee_node, fee_attribute) - else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None - ) - - if fee_value is None: - status = "error" - error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" - - # 返回费用节点的所有可用属性名称 - try: - if hasattr(fee_node, "keys"): - helper_info = list(fee_node.keys()) - elif isinstance(fee_node, dict): - helper_info = list(fee_node.keys()) - else: - # 对于Neo4j Node对象 - helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] - - # 过滤掉私有属性 - helper_info = [attr for attr in helper_info if not attr.startswith("_")] - - except Exception as e: - helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] - - return status, data, error, helper_info - - # 成功获取属性值,只返回属性值 - data = fee_value - - return status, data, error, helper_info - - except Exception as e: - status = "error" - error = f"获取费用属性值时出错: {str(e)}" - return status, data, error, helper_info - - except Exception as e: - import traceback - - error_details = traceback.format_exc() - print(f"查询出错: {error_details}") - - status = "error" - error = f"查询失败: {str(e)}" - return status, data, error, helper_info - - -class ProjectBuilder: - """ - 项目构建器 - 描述: 用于构建项目对象的构建器 - """ - - _instance = None - - @staticmethod - def build(): - """ - 构建并返回项目实例 - - Returns: - ProjectTookiItNeo4j: 创建的项目实例 - """ - # 如果已经有实例,先关闭它 - if ProjectBuilder._instance is not None: - ProjectBuilder._instance.close() - - # 创建新实例 - - ProjectBuilder._instance = ProjectTookiItNeo4j() - - return ProjectBuilder._instance - - @staticmethod - def close(): - """ - 关闭当前项目实例的连接 - """ - if ProjectBuilder._instance is not None: - ProjectBuilder._instance.close() - ProjectBuilder._instance = None - - -# 注册退出处理函数,确保程序退出时自动关闭连接 -atexit.register(ProjectBuilder.close) - - -# project = ProjectBuilder.build() -# status, data, error, helper_info = project.get_division_by_name("安装工程") - -# print(status) -# print(data) -# print(error) -# print(helper_info)