diff --git a/project_implementation.py b/project_implementation.py new file mode 100644 index 0000000..5877dd9 --- /dev/null +++ b/project_implementation.py @@ -0,0 +1,3061 @@ +from neo4j import GraphDatabase +from project import * +import atexit +from typing import Tuple, List, Dict, Any, Optional +import logging +from src.config import Config + +logger = logging.getLogger("project_implementation") +config = Config() + + +class ProjectToolkitNeo4j(ProjectToolkit): + """ + 基于Neo4j数据库的项目类实现 + """ + + def __init__(self, neo4j_driver): + logger.info("开始初始化Neo4j连接") + """ + 初始化Neo4j连接 + + Args: + neo4j_driver: Neo4j驱动实例,必须提供 + """ + if neo4j_driver is None: + raise ValueError("必须提供Neo4j驱动实例") + + super().__init__() + + # 保存驱动实例 + self.driver = neo4j_driver + self.external_driver = True # 标记为外部驱动,不在close时关闭 + self.session = self.driver.session() + + # 初始化其他必要的数据结构 + self.material_equipment_dict = {} # 材机字典,键为ID + self.fee_templates = {} # 取费表模板字典,键为ID + self.fee_schedules = {} # 费用表字典,键为ID + self.project_properties = {} # 工程属性字典 + logger.info("Neo4j连接初始化完成") + + def close(self): + logger.info("开始关闭数据库连接") + """ + 关闭数据库连接 + """ + if self.session: + self.session.close() + # 仅在非外部驱动的情况下关闭driver + if self.driver and not self.external_driver: + self.driver.close() + logger.info("数据库连接已关闭") + + def debug_path(self, path): + logger.info(f"开始调试路径: {path}") + path_parts = path.split("/") + print(f"调试路径: {path}") + + # 检查每一级路径是否存在 + for i in range(len(path_parts)): + partial_path = "/".join(path_parts[: i + 1]) + query = """ + MATCH (n) + WHERE n.name = $name + RETURN n.name as name + """ + result = self.session.run(query, name=path_parts[i]) + record = result.single() + exists = "存在" if record else "不存在" + print(f" 节点 '{path_parts[i]}' {exists}") + logger.info(f"路径 {path} 调试完成") + + # 通用节点查询方法 + def get_node_by_path(self, path, node_labels=None): + logger.info(f"开始通过路径 {path} 获取节点对象") + """ + 通过路径获取节点对象 + + Args: + path (str): 以'/'分隔的多级节点路径 + node_labels (list): 节点标签列表,用于过滤结果 + + Returns: + dict|None: 节点数据,如果路径不存在返回None + """ + if not path: + logger.warning("输入路径为空,无法获取节点对象") + return None + + # 分割路径为各个部分 + path_parts = path.split("/") + + # 构建查询 + if len(path_parts) == 1: + # 只有一级路径,直接查询 + if node_labels: + labels_str = ":" + "|:".join(node_labels) + query = f""" + MATCH (n{labels_str}) + WHERE n.name = $name + RETURN n LIMIT 1 + """ + else: + query = """ + MATCH (n) + WHERE n.name = $name + RETURN n LIMIT 1 + """ + params = {"name": path_parts[0]} + else: + # 多级路径,构建路径查询 + last_part = path_parts[-1] + if node_labels: + labels_str = ":" + "|".join(node_labels) + query = f""" + MATCH path = (root)-[*]->(target{labels_str}) + WHERE target.name = $last_part + RETURN target as n LIMIT 1 + """ + else: + query = """ + MATCH path = (root)-[*]->(target) + WHERE target.name = $last_part + RETURN target as n LIMIT 1 + """ + params = {"last_part": last_part} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + logger.warning(f"路径 {path} 不存在,未找到节点对象") + return None + + logger.info(f"成功通过路径 {path} 获取节点对象") + return record["n"] + except Exception as e: + logger.error(f"获取节点对象时出错: {e}") + print(f"获取节点对象时出错: {e}") + return None + + # 项目划分查询方法 + # 项目划分查询方法 + def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取项目划分对象 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的项目划分对象数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取节点数据 + node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的ProjectDivisionItem节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + # 去掉最后一个部分,得到父路径 + parent_path = "/".join(path_parts[:-1]) + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + + if parent_node: + # 查询父节点下所有类型为ProjectDivisionItem的子节点 + query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.name = $parent_name + RETURN q.name as name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 创建项目划分对象并填充属性 + item = ProjectDivisionItem() + for key, value in node_data.items(): + if hasattr(item, key): + setattr(item, key, value) + + # 将对象的属性转换为字典 + for key, value in vars(item).items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_division_node_by_parent_and_name( + self, parent_path, partial_name + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 + + 执行两步查询: + 1. 找到对应路径的父节点 + 2. 在父节点下查找名称中包含模糊名称的节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + # 检查部分名称是否为空 + if not partial_name: + status = "error" + error = "节点名称不能为空" + return status, data, error, helper_info + + try: + # 第一步:找到父节点 + parent_node_data = self.get_node_by_path(parent_path) + + # 如果找不到父节点 + if not parent_node_data: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 提取父路径的父节点 + path_parts = parent_path.split("/") + if len(path_parts) > 1: + # 去掉最后一个部分,得到父路径的父路径 + grandparent_path = "/".join(path_parts[:-1]) + grandparent_name = path_parts[-2] if len(path_parts) >= 2 else "" + + # 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型 + query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.name = $grandparent_name + RETURN q.name as name + """ + params = {"grandparent_name": grandparent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果没有找到任何节点,尝试更宽泛的查询 + if not helper_info: + broader_query = """ + MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem) + RETURN q.name as name LIMIT 50 + """ + broader_result = self.session.run(broader_query, **params) + for record in broader_result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + return status, data, error, helper_info + + # 获取父节点名称 + parent_name = parent_node_data.get("name") + if not parent_name: + status = "error" + error = "父节点数据中没有name字段" + return status, data, error, helper_info + + parent_name = parent_node_data.get("name", "") + + # 第二步:改用父节点名称进行查询 + query = """ + MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem) + WHERE toLower(n.name) CONTAINS toLower($partial_name) + RETURN n LIMIT 50 + """ + params = {"parent_name": parent_name, "partial_name": partial_name.strip()} + + # print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'") + result = self.session.run(query, **params) + records = list(result) + # print(f"调试返回: 找到{len(records)}条记录") + + # 处理查询结果 + items = [] + for record in records: + node_data = record["n"] + item = ProjectDivisionItem() + for key, value in node_data.items(): + if hasattr(item, key): + setattr(item, key, value) + items.append(vars(item)) + + # 如果没有找到匹配的节点 + if not items: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点" + + # 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型 + helper_query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.id = $parent_id + RETURN q.name as name + """ + helper_params = {"parent_name": parent_name} + + try: + helper_result = self.session.run(helper_query, **helper_params) + for record in helper_result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果没有找到任何节点,尝试更宽泛的查询 + if not helper_info: + broader_query = """ + MATCH (p)-[*1..2]->(q:ProjectDivisionItem) + WHERE p.id = $parent_id + RETURN q.name as name LIMIT 50 + """ + broader_result = self.session.run(broader_query, **helper_params) + for record in broader_result: + if record["name"]: + helper_info.append(record["name"]) + + # 如果仍然没有找到,尝试使用名称而不是ID + if not helper_info: + name_query = """ + MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem) + RETURN q.name as name LIMIT 50 + """ + name_params = {"parent_name": parent_name} + name_result = self.session.run(name_query, **name_params) + for record in name_result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功找到匹配节点 + data = items + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点 + + Args: + name_part (str): 节点名称的部分内容 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + # 检查输入名称是否为空 + if not name_part or name_part.strip() == "": + status = "error" + error = "输入的名称部分不能为空" + return status, data, error, helper_info + + try: + # 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点 + query = """ + MATCH (n:ProjectDivisionItem) + WHERE toLower(n.name) CONTAINS toLower($name_part) + RETURN n LIMIT 50 + """ + params = {"name_part": name_part.strip()} + + result = self.session.run(query, parameters=params) + records = list(result) + + if not records: + status = "error" + error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点" + + # 提供一些可能相近的节点名称作为辅助信息 + broader_query = """ + MATCH (n:ProjectDivisionItem) + RETURN n.name AS name LIMIT 20 + """ + broader_result = self.session.run(broader_query) + for record in broader_result: + if record["name"]: + helper_info.append(record["name"]) + + return status, data, error, helper_info + + # 处理查询结果 + items = [] + for record in records: + node_data = record["n"] + item = ProjectDivisionItem() + for key, value in node_data.items(): + if hasattr(item, key): + setattr(item, key, value) + + # 获取节点的完整路径作为附加信息 + node_path_query = """ + MATCH path = (root)-[*]->(target:ProjectDivisionItem) + WHERE id(target) = $node_id + WITH [node in nodes(path) | node.name] AS path_names + RETURN path_names + """ + path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id}) + path_record = path_result.single() + + node_dict = vars(item) + if path_record: + node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name]) + + items.append(node_dict) + + data = items + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 工程量查询方法 + def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 获取指定项目路径下的工程量对象 + + Args: + paths_str (str): 以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: + - status: 'success'或'error' + - data: 成功时返回的节点数据字典,失败时为{} + - error: 错误信息,成功时为'' + - helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称) + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + if not paths_str: + status = "error" + error = "路径不能为空" + return status, data, error, helper_info + + try: + # 使用通用方法获取节点,考虑所有可能的工程量类型 + node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) + + if node_data: + # 根据节点标签或类型属性创建对应类型的对象 + quantity = self._create_quantity_object(node_data) + + # 填充属性 + for key, value in node_data.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + # 将对象转为字典 + data = vars(quantity) + return status, data, error, helper_info + + # 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息 + path_parts = paths_str.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) + target_name = path_parts[-1] + + # 获取父节点 + parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) + + if parent_node_data: + # 查询父节点下的所有符合条件的节点 + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = $parent_name AND + any(label IN labels(n) WHERE label IN $allowed_labels) + RETURN n.name as name, labels(n) as labels + """ + params = { + "parent_name": parent_node_data.get("name", ""), + "allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"], + } + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append({record["name"]}) + except Exception as e: + print(f"查询辅助信息时出错: {str(e)}") + helper_info = [f"查询辅助信息时出错: {str(e)}"] + + status = "error" + error = f"找不到路径: {paths_str}" + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_quantities_node_by_parent_and_code( + self, parent_path, quantity_type=None, code=None + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点 + + 执行三步查询: + 1. 找到对应路径的父节点 + 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 + 3. 在找到的节点中查找编码与传入编码匹配的节点并返回 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + code (str): 工程量编码,以'/'分隔的多个编码 + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + if not code: + status = "error" + error = "编码不能为空" + return status, data, error, helper_info + + # 验证类型参数是否有效 + valid_types = ["定额", "主材", "设备", None] + if quantity_type not in valid_types: + status = "error" + error = f"无效的工程量类型: '{quantity_type}'" + helper_info = valid_types + return status, data, error, helper_info + + try: + # 步骤1: 找到对应路径的父节点 + parent_node = self.get_node_by_path(parent_path) + if not parent_node: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 尝试提供辅助信息 - 路径的各部分是否存在 + path_parts = parent_path.split("/") + existing_parts = [] + for i in range(len(path_parts)): + partial_path = "/".join(path_parts[: i + 1]) + if self.get_node_by_path(partial_path): + existing_parts.append(partial_path) + helper_info = existing_parts + return status, data, error, helper_info + + # 提取父节点名称用于后续查询 + parent_name = parent_node.get("name", "") + + # 根据工程量类型确定类型条件 + type_condition = "" + + if quantity_type == "定额": + type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" + elif quantity_type == "主材": + type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" + elif quantity_type == "设备": + type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" + else: + type_condition = "q:ProjectQuantity" + + # 步骤2: 查找父节点下所有的指定类型的子节点 + child_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name AND {type_condition} + RETURN q + """ + child_params = {"parent_name": parent_name} + + child_result = self.session.run(child_query, **child_params) + child_nodes = [record["q"] for record in child_result] + + if not child_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" + + # 尝试提供辅助信息 - 父节点下有哪些节点名称 + helper_query = """ + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name + RETURN DISTINCT q.name as node_name + LIMIT 20 + """ + helper_result = self.session.run(helper_query, **child_params) + helper_info = [record["node_name"] for record in helper_result if record["node_name"]] + return status, data, error, helper_info + + # 步骤3: 查找编码匹配的节点 + code_conditions = [] + code_parts = code.split("/") + + for code_part in code_parts: + if code_part.strip(): + code_conditions.append(code_part.strip()) + + if not code_conditions: + status = "error" + error = "提供的编码为空" + return status, data, error, helper_info + + # 在 child_nodes 中直接查找编码匹配的节点 + matching_nodes = [] + + for node in child_nodes: + node_code = node.get("编码", "") + if any(code_part in str(node_code) for code_part in code_conditions): + matching_nodes.append(node) + + if not matching_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点" + + # 尝试提供辅助信息 - 所有指定类型的子节点的编码 + helper_info = [node.get("编码") for node in child_nodes if node.get("编码")] + return status, data, error, helper_info + + # 处理匹配的节点,转换为字典格式 + result_data = [] + for node in matching_nodes: + quantity = self._create_quantity_object(node, quantity_type) + + for key, value in node.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + attrs = {} + for key, value in vars(quantity).items(): + if not key.startswith("_"): # 排除私有属性 + attrs[key] = value + + result_data.append(attrs) + + data = result_data + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_quantities_node_by_parent_and_name( + self, parent_path, partial_name, quantity_type=None + ) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]: + """ + 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 + + 执行三步查询: + 1. 找到对应路径的父节点 + 2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点 + 3. 在找到的节点中找到名称中包含节点模糊名称的节点返回 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + partial_name (str): 目标节点的模糊或不完整名称 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) + + Returns: + Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (List[Dict[str, Any]]): 成功时返回的数据列表 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = [] + error = "" + helper_info = [] + + if not partial_name: + status = "error" + error = "节点名称不能为空" + return status, data, error, helper_info + + # 验证类型参数是否有效 + valid_types = ["定额", "主材", "设备", None] + if quantity_type not in valid_types: + status = "error" + error = f"无效的工程量类型: '{quantity_type}'" + helper_info = valid_types + return status, data, error, helper_info + + try: + # 步骤1: 找到对应路径的父节点 + parent_node = self.get_node_by_path(parent_path) + if not parent_node: + status = "error" + error = f"找不到父节点路径: {parent_path}" + # 尝试提供辅助信息 - 路径的各部分是否存在 + path_parts = parent_path.split("/") + existing_parts = [] + for i in range(len(path_parts)): + partial_path = "/".join(path_parts[: i + 1]) + if self.get_node_by_path(partial_path): + existing_parts.append(partial_path) + helper_info = existing_parts + return status, data, error, helper_info + + # 从父节点中提取名称,用于后续查询 + parent_name = parent_node.get("name", "") + + # 根据工程量类型确定类型条件,不使用标签过滤 + type_condition = "" + + if quantity_type == "定额": + # 定额类型可能是 ProjectQuantity+Quota 或者 类型='0' + type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" + elif quantity_type == "主材": + # 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1' + type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" + elif quantity_type == "设备": + # 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5' + type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" + else: + # 如果没有指定类型,则查询所有ProjectQuantity节点 + type_condition = "q:ProjectQuantity" + + # 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点 + # 使用父节点名称进行查询,不指定具体的关系类型 + child_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name AND {type_condition} + RETURN q + """ + child_params = {"parent_name": parent_name} + + child_result = self.session.run(child_query, **child_params) + child_nodes = [record["q"] for record in child_result] + + if not child_nodes: + # 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询 + path_parts = parent_path.split("/") + last_part = path_parts[-1] if path_parts else "" + + fallback_query = f""" + MATCH (p)-[*1..10]->(q) + WHERE p.name CONTAINS $last_part AND {type_condition} + RETURN q LIMIT 100 + """ + fallback_params = {"last_part": last_part} + + fallback_result = self.session.run(fallback_query, **fallback_params) + child_nodes = [record["q"] for record in fallback_result] + + if not child_nodes: + status = "error" + error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点" + + # 尝试提供辅助信息 - 父节点下有哪些类型的节点 + helper_query = """ + MATCH (p)-[*1..10]->(q) + WHERE p.name = $parent_name + RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type + LIMIT 20 + """ + helper_params = {"parent_name": parent_name} + + helper_result = self.session.run(helper_query, **helper_params) + helper_data = [] + for record in helper_result: + labels = record["node_labels"] if record["node_labels"] else [] + node_type = record["node_type"] if record["node_type"] else "未知" + helper_data.append(f"标签: {labels}, 类型: {node_type}") + + helper_info = helper_data + return status, data, error, helper_info + + # 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点 + matching_nodes = [] + available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息 + + for node in child_nodes: + node_name = node.get("name", "") + # 记录所有节点名称作为辅助信息 + if node_name: + available_nodes.append(node_name) + + if partial_name in str(node_name): + matching_nodes.append(node) + + if not matching_nodes: + # 如果没有找到匹配的节点,尝试直接查询 + direct_query = f""" + MATCH (q) + WHERE q.name CONTAINS $partial_name AND {type_condition} + RETURN q LIMIT 20 + """ + direct_params = {"partial_name": partial_name} + + direct_result = self.session.run(direct_query, **direct_params) + matching_nodes = [record["q"] for record in direct_result] + + if not matching_nodes: + status = "error" + error = ( + f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点" + ) + # 提供辅助信息 - 该路径下所有可用的节点名称 + helper_info = available_nodes + return status, data, error, helper_info + + # 处理匹配的节点,将它们转换为对象 + result_data = [] + for matching_node in matching_nodes: + # 创建对应类型的对象 + quantity = self._create_quantity_object(matching_node, quantity_type) + + # 填充属性 + for key, value in matching_node.items(): + if hasattr(quantity, key): + setattr(quantity, key, value) + + # 将对象的属性转换为字典 + attrs = {} + for key, value in vars(quantity).items(): + if not key.startswith("_"): # 排除私有属性 + attrs[key] = value + + result_data.append(attrs) + + # 成功找到匹配节点 + data = result_data + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于根据节点数据创建对应类型的工程量对象 + def _create_quantity_object(self, node_data, quantity_type=None): + """ + 根据节点数据创建对应类型的工程量对象 + + Args: + node_data (dict): 节点数据 + quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) + + Returns: + ProjectQuantity: 创建的工程量对象 + """ + # 如果指定了类型,直接创建对应类型的对象 + if quantity_type == "定额": + return Ration() + elif quantity_type == "主材": + return Material() + elif quantity_type == "设备": + return Equipment() + + # 如果没有指定类型,尝试通过节点属性或标签判断 + if "类型" in node_data: + if node_data["类型"] == "0": + return Ration() + elif node_data["类型"] == "1": + return Material() + elif node_data["类型"] == "5": + return Equipment() + + # 通过标签判断 + labels = list(node_data.labels) if hasattr(node_data, "labels") else [] + if "Quota" in labels: + return Ration() + elif "MainMaterial" in labels: + return Material() + elif "Equipment" in labels: + return Equipment() + + # 默认返回基类对象 + return ProjectQuantity() + + # 材机查询方法实现 + def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取材机对象(MaterialOrEquipment 类型节点) + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取目标节点 + node_data = self.get_node_by_path(path) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) # 父节点路径 + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + if parent_node: + # 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(m:MaterialOrEquipment) + WHERE p.name = $parent_name + RETURN m.name AS name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将节点属性转换为字典并过滤掉私有字段 + for key, value in node_data.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_material_equipment_by_parent_and_code(self, parent_path, code): + """ + 通过父节点路径和编码获取材机对象 + + Args: + parent_path (str): 父节点的部分路径,以'/'分隔的多级节点路径 + code (str): 目标节点的编码属性值 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点编码 + """ + status = "success" + data = {} + error = "" + helper_info = [] + + try: + if not parent_path or not code: + status = "error" + error = "父节点路径或要查找的编码不能为空" + return status, data, error, helper_info + + # 获取父节点路径的最后一部分 + path_parts = parent_path.split("/") + last_part = path_parts[-1] + + # 构建Cypher查询,查找包含指定路径名称的节点,然后找到其子孙节点中编码匹配的MaterialOrEquipment节点 + # 使用任意类型的关系 [*0..10],不指定关系类型 + query = """ + MATCH (p)-[*0..10]->(m:MaterialOrEquipment) + WHERE p.name CONTAINS $parent_name AND m.编码 = $code + RETURN m LIMIT 5 + """ + params = {"parent_name": last_part, "code": code} + + result = self.session.run(query, **params) + matched_nodes = [] + + for record in result: + node = record["m"] + properties = dict(node.items()) + matched_nodes.append(properties) + + if not matched_nodes: + status = "error" + error = f"在路径包含 '{parent_path}' 的节点下没有找到编码为 '{code}' 的 MaterialOrEquipment 节点" + + # 查询相似路径下的MaterialOrEquipment节点的编码作为辅助信息 + helper_query = """ + MATCH (p)-[*0..10]->(m:MaterialOrEquipment) + WHERE p.name CONTAINS $parent_name + RETURN m.编码 AS code, m.name AS name + LIMIT 5 + """ + + try: + helper_result = self.session.run(helper_query, parent_name=last_part) + for record in helper_result: + if record["code"] and record["name"]: + helper_info.append(f"{record['code']} - {record['name']}") + except Exception as e: + helper_info = [f"查询相似编码失败: {str(e)}"] + + return status, data, error, helper_info + + # 成功找到,返回第一个节点的数据 + data = matched_nodes[0] + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于创建材机对象并填充属性 + def _create_material_object(self, node_data): + """ + 根据节点数据创建材机对象并填充属性 + + Args: + node_data (dict): 节点数据 + + Returns: + MaterialOrEquipment: 创建的材机对象 + """ + material = MaterialOrEquipment() + + # 填充属性 + for key, value in node_data.items(): + if hasattr(material, key): + setattr(material, key, value) + + return material + + # 取费表模板查询方法实现 + def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过路径获取取费表模板节点 + + Args: + path (str): 项目划分节点的路径,以'/'分隔的多级节点路径 + 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 获取目标节点 + node_data = self.get_node_by_path(path) + + if not node_data: + status = "error" + error = f"找不到路径: {path} 上的节点" + + # 提取父路径 + path_parts = path.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) # 父节点路径 + parent_name = path_parts[-2] # 父节点名称 + + # 获取父节点 + parent_node = self.get_node_by_path(parent_path) + if parent_node: + # 查询父节点下所有类型为 FeeCollection 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(f:FeeCollection) + WHERE p.name = $parent_name + RETURN f.name AS name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将节点属性转换为字典并过滤掉私有字段 + for key, value in node_data.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_template_by_parent_and_name( + self, parent_path: str, node_name: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过父节点路径和节点名称获取取费表模板节点 + + Args: + parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 + 例如:工程数据/取费表模板/余物清理/线路取费表(余物清理) + node_name (str): 目标节点的名称,例如:直接费 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回的节点属性数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 如果未找到父节点,则包含父节点的父节点下所有节点名称 + - 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一段:查找父节点 + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到父节点路径: {parent_path}" + + # 提取父节点的父路径 + parent_path_parts = parent_path.split("/") + if len(parent_path_parts) > 1: + grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径 + grandparent_name = parent_path_parts[-2] # 父节点的父节点名称 + + # 获取父节点的父节点 + grandparent_node = self.get_node_by_path(grandparent_path) + if grandparent_node: + # 查询父节点的父节点下所有子节点名称 + query = """ + MATCH (gp)-[*1..1]->(n) + WHERE gp.name = $grandparent_name + RETURN n.name AS name + """ + params = {"grandparent_name": grandparent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二段:在父节点下查找目标节点 + parent_node_name = parent_path.split("/")[-1] # 父节点名称 + + # 查询父节点下指定名称的节点 + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = $parent_name AND n.name = $node_name + RETURN n + """ + params = {"parent_name": parent_node_name, "node_name": node_name} + + try: + result = self.session.run(query, **params) + target_node = None + + for record in result: + target_node = record["n"] + break + + if not target_node: + status = "error" + error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点" + + # 查询父节点下所有类型为 FeeCollection 的子节点名称 + query = """ + MATCH (p)-[*1..1]->(f:FeeCollection) + WHERE p.name = $parent_name + RETURN f.name AS name + """ + params = {"parent_name": parent_node_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 将目标节点属性转换为字典并过滤掉私有字段 + for key, value in target_node.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询目标节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 辅助方法,用于创建取费表模板对象并填充属性 + def _create_fee_template_object(self, node_data): + """ + 根据节点数据创建取费表模板对象并填充属性 + + Args: + node_data (dict): 节点数据 + + Returns: + FeeTableTemplateItem: 创建的取费表模板对象 + """ + template = FeeTableTemplateItem() + + # 填充属性 + for key, value in node_data.items(): + if hasattr(template, key): + setattr(template, key, value) + + # 更新缓存 + if hasattr(template, "OutlayID") and template.OutlayID: + self.fee_templates[template.OutlayID] = template + + return template + + # 费用表查询方法实现 + def get_fee_schedule_on_auxiliary_expense_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在辅助费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_other_expense_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在其它费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_land_acquisition_fee_table_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在土地征用费表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_installation_price_difference_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在安装价差表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_schedule_on_Engineering_Cost_table( + self, table_name: str, fee_name: str, fee_attribute: str + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 在工程费用表中查找费用 + + Args: + table_name (str): 费用表名称 + fee_name (str): 要查找的费用名称(可能在多级子节点中) + fee_attribute (str): 费用值属性名 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表: + - 第一步失败:返回"工程数据"节点下所有子节点名称 + - 第二步失败:返回费用表节点下所有子节点名称(递归查找) + - 第三步失败:返回费用节点的所有可用属性名称 + - 否则为空 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 检查输入参数 + if not table_name or not fee_name or not fee_attribute: + status = "error" + error = "输入参数不能为空" + return status, data, error, helper_info + + # 第一步:查找父节点(费用表节点) + parent_path = f"工程数据/工程费用/{table_name}" + parent_node_data = self.get_node_by_path(parent_path) + + if not parent_node_data: + status = "error" + error = f"找不到费用表节点: {parent_path}" + + # 查询"工程数据/工程费用"节点下所有子节点名称 + base_parent_node = self.get_node_by_path("工程数据/工程费用") + if base_parent_node: + query = """ + MATCH (p)-[*1..1]->(n) + WHERE p.name = '工程费用' + RETURN n.name AS name + """ + + try: + result = self.session.run(query) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在费用表节点下查找费用节点(可能在多级子节点中) + fee_node = None + + # 递归查找费用节点,最多查找3级深度 + query = """ + MATCH (p)-[*1..3]->(f) + WHERE p.name = $table_name AND f.name = $fee_name + RETURN f LIMIT 1 + """ + params = {"table_name": table_name, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + all_records = result.data() + + if len(all_records) > 0: + fee_node = all_records[0]["f"] + + if not fee_node: + status = "error" + error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}" + + # 查询费用表节点下所有子节点名称(递归查找) + query = """ + MATCH (p)-[*1..3]->(n) + WHERE p.name = $table_name + RETURN DISTINCT n.name AS name + ORDER BY n.name + """ + params = {"table_name": table_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询费用表下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询费用节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:获取费用节点的属性值 + try: + if fee_node and hasattr(fee_node, "get"): + fee_value = fee_node.get(fee_attribute) + elif fee_node and isinstance(fee_node, dict): + fee_value = fee_node.get(fee_attribute) + else: + # 如果fee_node是Neo4j Node对象,尝试直接访问属性 + fee_value = ( + getattr(fee_node, fee_attribute, None) + if hasattr(fee_node, fee_attribute) + else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None + ) + + if fee_value is None: + status = "error" + error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}" + + # 返回费用节点的所有可用属性名称 + try: + if hasattr(fee_node, "keys"): + helper_info = list(fee_node.keys()) + elif isinstance(fee_node, dict): + helper_info = list(fee_node.keys()) + else: + # 对于Neo4j Node对象 + helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else [] + + # 过滤掉私有属性 + helper_info = [attr for attr in helper_info if not attr.startswith("_")] + + except Exception as e: + helper_info = [f"获取费用节点属性列表时出错: {str(e)}"] + + return status, data, error, helper_info + + # 成功获取属性值,只返回属性值 + data = fee_value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"获取费用属性值时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 工程属性查询方法 + def get_project_property(self, property_name): + """ + 通过属性名称获取工程属性 + + Args: + property_name (str): 属性名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 构建查询,查找父节点类型为ProjectPropertySet下类型为ProjectProperty的节点 + query = """ + MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty) + WHERE property.name = $property_name + RETURN property + LIMIT 1 + """ + params = {"property_name": property_name} + + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"找不到名称为 '{property_name}' 的工程属性节点" + + # 查询类似的节点名称作为辅助信息 + similar_query = """ + MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty) + RETURN property.name AS name + LIMIT 5 + """ + + try: + similar_result = self.session.run(similar_query) + for similar_record in similar_result: + if similar_record["name"]: + helper_info.append(similar_record["name"]) + except Exception as e: + helper_info = [f"查询类似节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 获取节点属性 + property_node = record["property"] + + # 将节点所有属性添加到返回数据中 + for key, value in property_node.items(): + if not key.startswith("_"): + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询工程属性时出错: {str(e)}" + return status, data, error, helper_info + + # 项目划分查找取费表 + + def get_fee_table_by_project_division( + self, project_division_path, fee_name + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过项目划分路径和取费名称查找费用 + + Args: + project_division_path(str): 项目划分节点的路径 + fee_name (str): 取费名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一步:查找项目划分路径下的ProjectDivisionItem节点 + division_node = self.get_node_by_path(project_division_path, ["ProjectDivisionItem"]) + + if not division_node: + status = "error" + error = f"找不到路径: {project_division_path} 上的ProjectDivisionItem节点" + + # 提取父路径 + path_parts = project_division_path.split("/") + if len(path_parts) > 1: + parent_path = "/".join(path_parts[:-1]) + parent_name = path_parts[-2] + + # 获取父节点下所有ProjectDivisionItem节点名称作为辅助信息 + query = """ + MATCH (p)-[*1..1]->(q:ProjectDivisionItem) + WHERE p.name = $parent_name + RETURN q.name as name + """ + params = {"parent_name": parent_name} + + try: + result = self.session.run(query, **params) + for record in result: + if record["name"]: + helper_info.append(record["name"]) + except Exception as e: + helper_info = [f"查询父节点下的子节点时出错: {str(e)}"] + + return status, data, error, helper_info + + # 第二步:在项目划分节点的子节点中查找属性name为"费用预览集"且关系为USE的节点 + query = """ + MATCH (p:ProjectDivisionItem)-[r:USE]->(c:CostSet) + WHERE id(p) = $division_node_id AND c.name = "费用预览集" + RETURN c + LIMIT 1 + """ + params = {"division_node_id": division_node.id} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在项目划分节点下找不到name为'费用预览集'且关系为USE的CostSet节点" + + # 查询项目划分节点下的所有子节点作为辅助信息 + helper_query = """ + MATCH (p:ProjectDivisionItem)-[r]->(c) + WHERE id(p) = $division_node_id + RETURN type(r) as relation_type, c.name as name, labels(c) as labels + LIMIT 5 + """ + helper_params = {"division_node_id": division_node.id} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + relation_type = helper_record.get("relation_type", "未知关系") + name = helper_record.get("name", "未知名称") + labels = helper_record.get("labels", []) + helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}") + + return status, data, error, helper_info + + cost_set_node = record["c"] + + except Exception as e: + status = "error" + error = f"查询CostSet节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 + query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE id(c) = $cost_set_id AND i.name = $fee_name + RETURN i + LIMIT 1 + """ + params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点" + + # 查询该CostSet下所有CostItem节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE id(c) = $cost_set_id + RETURN i.name AS name + LIMIT 5 + """ + helper_params = {"cost_set_id": cost_set_node.id} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_item_node = record["i"] + + # 将CostItem节点的所有属性添加到返回数据中 + for key, value in cost_item_node.items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询CostItem节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 清单查找取费表 + def get_fee_table_by_list( + self, parent_path, list_code, list_unit, fee_name + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过清单名称和取费名称查找费用 + + Args: + parent_path (str): 父级路径 + list_code (str): 清单编号 + list_unit (str): 清单单位 + fee_name (str): 取费名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一步:查找父路径下类型为List的节点,且编码和单位匹配 + query = """ + MATCH (p)-[*0..10]->(l:List) + WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $list_unit + RETURN l + LIMIT 1 + """ + params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "list_unit": list_unit} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {list_unit} 的List节点" + + # 查询路径下的List节点作为辅助信息 + helper_query = """ + MATCH (p)-[*0..10]->(l:List) + WHERE p.name CONTAINS $parent_name + RETURN l.编码 AS code, l.单位 AS unit, l.name AS name + LIMIT 5 + """ + + helper_params = {"parent_name": parent_path.split("/")[-1]} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + code = helper_record.get("code", "") + unit = helper_record.get("unit", "") + name = helper_record.get("name", "") + helper_info.append(f"{code} - {unit} - {name}") + + return status, data, error, helper_info + + list_node = record["l"] + + except Exception as e: + status = "error" + error = f"查询List节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第二步:在list节点的子节点中查找属性name为"费用预览集"且关系为USE的节点 + query = """ + MATCH (l:List)-[r:USE]->(c:CostSet) + WHERE id(l) = $list_node_id AND c.name = "费用预览集" + RETURN c + LIMIT 1 + """ + params = {"list_node_id": list_node.id} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在List节点下找不到name为'费用预览集'且关系为USE的CostSet节点" + + # 查询List节点下的所有子节点作为辅助信息 + helper_query = """ + MATCH (l:List)-[r]->(c) + WHERE id(l) = $list_node_id + RETURN type(r) as relation_type, c.name as name, labels(c) as labels + LIMIT 5 + """ + helper_params = {"list_node_id": list_node.id} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + relation_type = helper_record.get("relation_type", "未知关系") + name = helper_record.get("name", "未知名称") + labels = helper_record.get("labels", []) + helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}") + + return status, data, error, helper_info + + cost_set_node = record["c"] + + except Exception as e: + status = "error" + error = f"查询CostSet节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 + query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE id(c) = $cost_set_id AND i.name = $fee_name + RETURN i + LIMIT 1 + """ + params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点" + + # 查询该CostSet下所有CostItem节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE id(c) = $cost_set_id + RETURN i.name AS name + LIMIT 5 + """ + helper_params = {"cost_set_id": cost_set_node.id} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_item_node = record["i"] + + # 将CostItem节点的所有属性添加到返回数据中 + for key, value in cost_item_node.items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询CostItem节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 定额节点查找合价 + def get_fee_table_by_quoto_code( + self, parent_path, quantity_type, code, fee_name + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过父级路径、工程量类型、定额编码和取费名称查找费用 + + Args: + parent_path (str): 父级路径 + quantity_type (str): 工程量类型 + code (str): 定额编码 + fee_name (str): 取费名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一步:查找父路径下类型为Quota的节点,且编码匹配 + # 根据工程量类型构建节点类型条件 + node_type_condition = "" + if quantity_type == "定额": + node_type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')" + else: + status = "error" + error = f"工程量类型必须为'定额'" + return status, data, error, helper_info + + query = """ + MATCH (p)-[*0..10]->(q) + WHERE p.name CONTAINS $parent_name AND q.编码 = $code AND {type_condition} + RETURN q + LIMIT 1 + """.format( + type_condition=node_type_condition + ) + + params = {"parent_name": parent_path.split("/")[-1], "code": code} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在路径 {parent_path} 下找不到编码为 {code} 的Quota节点" + + # 查询路径下的Quota节点作为辅助信息 + helper_query = """ + MATCH (p)-[*0..10]->(q) + WHERE p.name CONTAINS $parent_name AND {type_condition} + RETURN q.编码 AS code, q.name AS name + LIMIT 5 + """.format( + type_condition=node_type_condition + ) + + helper_params = {"parent_name": parent_path.split("/")[-1]} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + code_val = helper_record.get("code", "") + name = helper_record.get("name", "") + helper_info.append(f"{code_val} - {name}") + + return status, data, error, helper_info + + quota_node = record["q"] + + # 获取GUID + guid = quota_node.get("GUID") + if not guid: + status = "error" + error = f"Quota节点没有GUID属性" + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询Quota节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第二步:查找GUID与Quota的GUID匹配的CostSet节点 + query = """ + MATCH (c:CostSet) + WHERE c.name = $guid + RETURN c + LIMIT 1 + """ + params = {"guid": guid} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"找不到与GUID: {guid} 匹配的CostSet节点" + + # 查找一些CostSet节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet) + RETURN c.name AS name + LIMIT 5 + """ + + helper_result = self.session.run(helper_query) + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_set_node = record["c"] + + except Exception as e: + status = "error" + error = f"查询CostSet节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 + query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE c.name = $guid AND i.name = $fee_name + RETURN i + LIMIT 1 + """ + params = {"guid": guid, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点" + + # 查询该CostSet下所有CostItem节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE c.name = $guid + RETURN i.name AS name + LIMIT 5 + """ + + helper_params = {"guid": guid} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_item_node = record["i"] + + # 将CostItem节点的所有属性添加到返回数据中 + for key, value in cost_item_node.items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询CostItem节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + def get_fee_table_by_quantities_name( + self, parent_path, quantity_type, quantity_name, fee_name + ) -> Tuple[str, Dict[str, Any], str, List[Any]]: + """ + 通过父级路径、工程量类型、工程量名称和取费名称查找费用 + + Args: + parent_path (str): 父级路径 + quantity_type (str): 工程量类型 + quantity_name (str): 工程量名称 + fee_name (str): 取费名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 第一步:根据工程量类型构建节点类型条件 + node_type_condition = "" + if quantity_type == "主材": + node_type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')" + elif quantity_type == "设备": + node_type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')" + else: + status = "error" + error = f"工程量类型必须为'主材'或'设备'" + helper_info = ["主材", "设备"] + return status, data, error, helper_info + + # 查找父路径下指定类型的节点,且名称匹配 + query = """ + MATCH (p)-[*0..10]->(q) + WHERE p.name CONTAINS $parent_name AND q.name CONTAINS $quantity_name AND {type_condition} + RETURN q + LIMIT 1 + """.format( + type_condition=node_type_condition + ) + + params = {"parent_name": parent_path.split("/")[-1], "quantity_name": quantity_name} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = ( + f"在路径 {parent_path} 下找不到类型为 {quantity_type} 且名称包含 {quantity_name} 的工程量节点" + ) + + # 查询路径下的工程量节点作为辅助信息 + helper_query = """ + MATCH (p)-[*0..10]->(q) + WHERE p.name CONTAINS $parent_name AND {type_condition} + RETURN q.name AS name + LIMIT 5 + """.format( + type_condition=node_type_condition + ) + + helper_params = {"parent_name": parent_path.split("/")[-1]} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + name = helper_record.get("name", "") + helper_info.append(name) + + return status, data, error, helper_info + + quantity_node = record["q"] + + # 获取GUID + guid = quantity_node.get("GUID") + if not guid: + status = "error" + error = f"工程量节点没有GUID属性" + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询工程量节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第二步:查找GUID与工程量节点的GUID匹配的CostSet节点 + query = """ + MATCH (c:CostSet) + WHERE c.name = $guid + RETURN c + LIMIT 1 + """ + params = {"guid": guid} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"找不到与GUID: {guid} 匹配的CostSet节点" + + # 查找一些CostSet节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet) + RETURN c.name AS name + LIMIT 5 + """ + + helper_result = self.session.run(helper_query) + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_set_node = record["c"] + + except Exception as e: + status = "error" + error = f"查询CostSet节点时出错: {str(e)}" + return status, data, error, helper_info + + # 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点 + query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE c.name = $guid AND i.name = $fee_name + RETURN i + LIMIT 1 + """ + params = {"guid": guid, "fee_name": fee_name} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点" + + # 查询该CostSet下所有CostItem节点名称作为辅助信息 + helper_query = """ + MATCH (c:CostSet)-[*1..1]->(i:CostItem) + WHERE c.name = $guid + RETURN i.name AS name + LIMIT 5 + """ + + helper_params = {"guid": guid} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + if helper_record["name"]: + helper_info.append(helper_record["name"]) + + return status, data, error, helper_info + + cost_item_node = record["i"] + + # 将CostItem节点的所有属性添加到返回数据中 + for key, value in cost_item_node.items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询CostItem节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info + + # 材机节点查找市场价 + def get_fee_by_material_equipment_code(self, parent_path, material_equipment_code, fee_name): + """ + 通过父级路径、材机编码和取费名称查找费用 + + Args: + parent_path (str): 父级路径 + material_equipment_code (str): 材机编码 + fee_name (str): 取费名称 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + pass + + # 查找清单节点 + def get_fee_table_by_list_name(self, parent_path, list_code, unit): + """ + 通过父级路径、清单编号和单位查找清单 + + Args: + parent_path (str): 父级路径 + list_code (str): 清单编号 + unit (str): 单位 + + Returns: + Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组 + - status (str): 状态,'success'或'error' + - data (Dict[str, Any]): 成功时返回包含属性值的数据 + - error (str): 错误时的错误信息 + - helper_info (List[Any]): 辅助信息列表 + """ + # 初始化返回结果 + status = "success" + data = {} + error = "" + helper_info = [] + + try: + # 查找父路径下类型为List的节点,且编码和单位匹配 + query = """ + MATCH (p)-[*0..10]->(l:List) + WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $unit + RETURN l + LIMIT 1 + """ + params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "unit": unit} + + try: + result = self.session.run(query, **params) + record = result.single() + + if not record: + status = "error" + error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {unit} 的List节点" + + # 查询路径下的List节点作为辅助信息 + helper_query = """ + MATCH (p)-[*0..10]->(l:List) + WHERE p.name CONTAINS $parent_name + RETURN l.编码 AS code, l.单位 AS unit, l.name AS name + LIMIT 5 + """ + + helper_params = {"parent_name": parent_path.split("/")[-1]} + helper_result = self.session.run(helper_query, **helper_params) + + for helper_record in helper_result: + code = helper_record.get("code", "") + unit_val = helper_record.get("unit", "") + name = helper_record.get("name", "") + helper_info.append(f"{code} - {unit_val} - {name}") + + return status, data, error, helper_info + + list_node = record["l"] + + # 将节点的所有属性添加到返回数据中 + for key, value in list_node.items(): + if not key.startswith("_"): # 排除私有属性 + data[key] = value + + return status, data, error, helper_info + + except Exception as e: + status = "error" + error = f"查询List节点时出错: {str(e)}" + return status, data, error, helper_info + + except Exception as e: + import traceback + + error_details = traceback.format_exc() + print(f"查询出错: {error_details}") + + status = "error" + error = f"查询失败: {str(e)}" + return status, data, error, helper_info diff --git a/supplement_kg.py b/supplement_kg.py new file mode 100644 index 0000000..3e13eb1 --- /dev/null +++ b/supplement_kg.py @@ -0,0 +1,117 @@ +import json +from typing import Dict, List, Any +import copy + + +class ExpenseProcessor: + def __init__(self): + pass + + @staticmethod + def calculate_parent_costs(node: Dict[str, Any]) -> List[Dict[str, Any]]: + if "children" not in node: + if "id" in node and "cost" in node: + return [{"id": node["id"], "cost": node["cost"]}] + elif "cost" in node: + return [{"cost": node["cost"]}] + return [] + + result_nodes = [] + processed_ids = set() + + for child in node["children"]: + child_costs = ExpenseProcessor.calculate_parent_costs(child) + + for cost_item in child_costs: + if "id" in cost_item: + found = False + for existing in result_nodes: + if "id" in existing and existing["id"] == cost_item["id"]: + existing["cost"] = str(float(existing["cost"]) + float(cost_item["cost"])) + found = True + break + if not found: + result_nodes.append(copy.deepcopy(cost_item)) + processed_ids.add(cost_item["id"]) + else: + found = False + for existing in result_nodes: + if "id" not in existing: + existing["cost"] = str(float(existing["cost"]) + float(cost_item["cost"])) + found = True + break + if not found: + result_nodes.append(copy.deepcopy(cost_item)) + + return result_nodes + + @staticmethod + def process_node(node: Dict[str, Any]) -> Dict[str, Any]: + result = copy.deepcopy(node) + if "children" not in node or not node["children"]: + return result + cost_items = ExpenseProcessor.calculate_parent_costs(node) + if cost_items: + result["sum"] = cost_items + result["children"] = [ExpenseProcessor.process_node(child) for child in node["children"]] + return result + + @staticmethod + def process_expense_preview(expense_preview: Dict[str, Any]) -> Dict[str, Any]: + result = copy.deepcopy(expense_preview) + for category_key, category_value in expense_preview.items(): + for subcategory_key, subcategory_value in category_value.items(): + if isinstance(subcategory_value, list): + result[category_key][subcategory_key] = [ + ExpenseProcessor.process_node(item) for item in subcategory_value + ] + return result + + @classmethod + def load_and_process_from_file(cls, input_path: str, output_path: str = None) -> Dict[str, Any]: + """ + 从文件加载 JSON 并处理 + :param input_path: 输入文件路径 + :param output_path: 输出文件路径(可选) + :return: 处理后的完整数据 + """ + with open(input_path, "r", encoding="utf-8") as f: + data = json.load(f) + + if "projectData" in data and "expensePreview" in data["projectData"]: + processed_data = copy.deepcopy(data) + processed_data["projectData"]["expensePreview"] = cls.process_expense_preview( + data["projectData"]["expensePreview"] + ) + + if output_path: + with open(output_path, "w", encoding="utf-8") as f: + json.dump(processed_data, f, ensure_ascii=False, indent=4) + print(f"处理完成,结果已保存到 {output_path}") + return processed_data + else: + raise ValueError("未找到 projectData.expensePreview 路径") + + @classmethod + def process_raw_data(cls, raw_data: Dict[str, Any]) -> Dict[str, Any]: + """ + 直接处理原始数据(不涉及文件读写) + :param raw_data: 原始数据,格式应包含 projectData.expensePreview + :return: 处理后的数据 + """ + if "projectData" in raw_data and "expensePreview" in raw_data["projectData"]: + processed_data = copy.deepcopy(raw_data) + processed_data["projectData"]["expensePreview"] = cls.process_expense_preview( + raw_data["projectData"]["expensePreview"] + ) + return processed_data + else: + raise ValueError("未找到 projectData.expensePreview 路径") + + +if __name__ == "__main__": + input_file = "架空_clean.json" # 输入 JSON 文件路径 + output_file = "output.json" # 输出 JSON 文件路径 + + # 使用类方法加载并处理 JSON 文件 + ExpenseProcessor.load_and_process_from_file(input_file, output_file)