From 18e6c193b66b86220c1003b18a94f3161a65109d Mon Sep 17 00:00:00 2001 From: chentianrui Date: Tue, 3 Jun 2025 14:50:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20project=5Fimplementation.p?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- project_implementation.py | 957 -------------------------------------- 1 file changed, 957 deletions(-) delete mode 100644 project_implementation.py diff --git a/project_implementation.py b/project_implementation.py deleted file mode 100644 index d5c85ef..0000000 --- a/project_implementation.py +++ /dev/null @@ -1,957 +0,0 @@ -from neo4j import GraphDatabase -from project import * -import atexit - - -class ProjectTookiItNeo4j(ProjectTookiIt): - """ - 基于Neo4j数据库的项目类实现 - """ - - def __init__(self): - """ - 初始化Neo4j连接 - - Args: - uri (str): Neo4j数据库URI - user (str): 用户名 - password (str): 密码 - """ - uri = "bolt://172.20.0.145:7687" - user = "neo4j" - password = "password" - - super().__init__() - self.driver = GraphDatabase.driver(uri, auth=(user, password)) - self.session = self.driver.session() - - # 初始化其他必要的数据结构 - self.material_equipment_dict = {} # 材机字典,键为ID - self.fee_templates = {} # 取费表模板字典,键为ID - self.fee_schedules = {} # 费用表字典,键为ID - self.project_properties = {} # 工程属性字典 - - def close(self): - """ - 关闭数据库连接 - """ - if self.session: - self.session.close() - if self.driver: - self.driver.close() - - # 通用节点查询方法 - def get_node_by_path(self, path, node_labels=None): - """ - 通过路径获取节点对象 - - Args: - path (str): 以'/'分隔的多级节点路径 - node_labels (list): 节点标签列表,用于过滤结果 - - Returns: - dict|None: 节点数据,如果路径不存在返回None - """ - if not path: - return None - - # 分割路径为各个部分 - path_parts = path.split("/") - - # 构建查询 - if len(path_parts) == 1: - # 只有一级路径,直接查询 - if node_labels: - labels_str = ":" + "|:".join(node_labels) - query = f""" - MATCH (n{labels_str}) - WHERE n.name = $name - RETURN n LIMIT 1 - """ - else: - query = """ - MATCH (n) - WHERE n.name = $name - RETURN n LIMIT 1 - """ - params = {"name": path_parts[0]} - else: - # 多级路径,构建路径查询 - last_part = path_parts[-1] - if node_labels: - labels_str = ":" + "|:".join(node_labels) - query = f""" - MATCH path = (root)-[*]->(target{labels_str}) - WHERE target.name = $last_part - RETURN target as n LIMIT 1 - """ - else: - query = """ - MATCH path = (root)-[*]->(target) - WHERE target.name = $last_part - RETURN target as n LIMIT 1 - """ - params = {"last_part": last_part} - - try: - result = self.session.run(query, **params) - record = result.single() - - if not record: - return None - - return record["n"] - except Exception as e: - print(f"获取节点对象时出错: {e}") - return None - - # 项目划分查询方法 - def get_division_item_by_path(self, path): - """ - 通过路径获取项目划分对象 - """ - node_data = self.get_node_by_path(path, ["ProjectDivisionItem"]) - if not node_data: - return None - - item = ProjectDivisionItem() - for key, value in node_data.items(): - if hasattr(item, key): - setattr(item, key, value) - - return item - - def get_division_node_by_parent_and_name(self, parent_path, partial_name): - """ - 通过父节点路径和模糊节点名称获取项目划分对象,包括子节点 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - - Returns: - list: 包含所有匹配项目划分节点的列表,如果没有匹配返回空列表 - """ - if not partial_name: - return [] - - # 使用通用方法获取父节点 - parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"]) - parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None - - # 构建查询,根据是否有父节点ID调整查询条件,使用递归关系查询 - if parent_node_id: - query = """ - MATCH (p)-[:CONTAINS|HAS|RELATED_TO*]-(n:ProjectDivisionItem) - WHERE p.id = $parent_id AND n.name CONTAINS $partial_name - RETURN n LIMIT 50 - """ - params = {"parent_id": parent_node_id, "partial_name": partial_name} - else: - query = """ - MATCH (n:ProjectDivisionItem) - WHERE n.name CONTAINS $partial_name - RETURN n LIMIT 50 - """ - params = {"partial_name": partial_name} - - try: - result = self.session.run(query, **params) - - items = [] - for record in result: - node_data = record["n"] - item = ProjectDivisionItem() - for key, value in node_data.items(): - if hasattr(item, key): - setattr(item, key, value) - items.append(item) - - return items - except Exception as e: - print(f"通过父节点路径和模糊名称获取项目划分对象时出错: {e}") - return [] - - # 工程量查询方法 - def get_quantities_by_paths(self, paths_str): - """ - 获取指定项目路径下的工程量对象 - - Args: - paths_str (str): 以'/'分隔的多级节点路径 - - Returns: - ProjectQuantity|None: 对应的工程量对象,如果路径不存在返回None,成功找到返回工程量对象 - """ - if not paths_str: - return None - - # 使用通用方法获取节点,考虑所有可能的工程量类型 - node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"]) - - if not node_data: - return None - - # 根据节点标签或类型属性创建对应类型的对象 - quantity = self._create_quantity_object(node_data) - - # 填充属性 - for key, value in node_data.items(): - if hasattr(quantity, key): - setattr(quantity, key, value) - - return quantity - - def get_quantities_node_by_parent_and_code(self, parent_path, quantity_type=None, code=None): - """ - 通过父节点路径和编码获取工程量对象(定额),包括子节点 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) - code(str): 工程量编码,以'/'分隔的多个编码 - - Returns: - list: 包含所有匹配节点的列表,如果没有匹配返回空列表 - """ - if not code: - return [] - - # 使用通用方法获取父节点 - parent_node_data = self.get_node_by_path(parent_path) - - # 从路径中获取父节点名称 - path_parts = parent_path.split("/") - parent_name = path_parts[-1] - - # 处理编码,可能有多个编码用/分隔 - code_parts = code.split("/") - code_conditions = [] - for code_part in code_parts: - if code_part: - - code_conditions.append(f"q.编码 = '{code_part}'") - - if not code_conditions: - return [] - - code_query = " OR ".join(code_conditions) - - # 根据工程量类型确定标签 - node_labels = [] - if quantity_type == "定额": - node_labels = ["ProjectQuantity", "Quota"] - elif quantity_type == "主材": - node_labels = ["ProjectQuantity", "MainMaterial"] - elif quantity_type == "设备": - node_labels = ["ProjectQuantity", "Equipment"] - else: - node_labels = ["ProjectQuantity"] - - # 构建标签字符串 - labels_str = ":" + ":".join(node_labels) if node_labels else "" - - # 使用name属性进行匹配 - query = f""" - MATCH (p)-[*1..5]->(q{labels_str}) - WHERE p.name = $parent_name AND ({code_query}) - RETURN q - LIMIT 10 - """ - params = {"parent_name": parent_name} - - try: - result = self.session.run(query, params) - quantities = [] - - for record in result: - node_data = record["q"] - quantity = self._create_quantity_object(node_data, quantity_type) - - # 将节点属性赋值到对象 - for key, value in node_data.items(): - setattr(quantity, key, value) - - # 转换为字典 - if hasattr(quantity, "to_dict"): - quantities.append(quantity.to_dict()) - else: - # 如果没有 to_dict 方法,就用 vars() 动态获取属性 - quantities.append(vars(quantity)) - - return quantities - except Exception as e: - print(f"通过编码获取工程量对象时出错: {e}") - import traceback - - traceback.print_exc() - return [] - - def get_quantities_node_by_parent_and_name(self, parent_path, partial_name, quantity_type=None): - """ - 通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型) - - Returns: - list: 包含所有匹配节点的列表,如果没有匹配返回空列表 - """ - if not partial_name: - return [] - - # 调试输出 - print(f"搜索父路径: {parent_path}") - print(f"搜索名称: {partial_name}") - print(f"搜索类型: {quantity_type}") - - # 使用通用方法获取父节点 - parent_node_data = self.get_node_by_path(parent_path) - - if parent_node_data: - print(f"找到父节点: {parent_node_data}") - else: - print("未找到父节点") - - parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None - - # 根据工程量类型确定标签和类型条件 - node_labels = [] - type_condition = "" - - if quantity_type == "定额": - node_labels = ["ProjectQuantity", "Quota"] - type_condition = "q.类型 = '0'" - elif quantity_type == "主材": - node_labels = ["ProjectQuantity", "MainMaterial"] - type_condition = "q.类型 = '1'" - elif quantity_type == "设备": - node_labels = ["ProjectQuantity", "Equipment"] - type_condition = "q.类型 = '5'" - else: - node_labels = ["ProjectQuantity"] - - # 构建标签字符串 - labels_str = ":" + ":".join(node_labels) if node_labels else "" - - # 扩展关系类型 - relationship_types = "CONTAINS|HAS|RELATED_TO|USES|BELONGS_TO" - - # 构建查询 - 使用递归关系查询 - if parent_node_id: - query = f""" - MATCH (p)-[:{relationship_types}*1..10]->(q{labels_str}) - WHERE p.id = $parent_id AND q.name CONTAINS $partial_name - {f'AND {type_condition}' if type_condition else ''} - RETURN q LIMIT 50 - """ - params = {"parent_id": parent_node_id, "partial_name": partial_name} - print(f"执行查询: {query}") - print(f"参数: {params}") - else: - query = f""" - MATCH (q{labels_str}) - WHERE q.name CONTAINS $partial_name - {f'AND {type_condition}' if type_condition else ''} - RETURN q LIMIT 50 - """ - params = {"partial_name": partial_name} - print(f"执行全局查询: {query}") - - try: - result = self.session.run(query, **params) - - quantities = [] - for record in result: - node_data = record["q"] - print(f"找到节点: {node_data}") - - # 创建对应类型的对象 - quantity = self._create_quantity_object(node_data, quantity_type) - - # 填充属性 - for key, value in node_data.items(): - if hasattr(quantity, key): - setattr(quantity, key, value) - - quantities.append(quantity) - - print(f"查询结果数量: {len(quantities)}") - return quantities - except Exception as e: - print(f"通过名称获取工程量对象时出错: {e}") - import traceback - - traceback.print_exc() - return [] - - # 辅助方法,用于根据节点数据创建对应类型的工程量对象 - def _create_quantity_object(self, node_data, quantity_type=None): - """ - 根据节点数据创建对应类型的工程量对象 - - Args: - node_data (dict): 节点数据 - quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None) - - Returns: - ProjectQuantity: 创建的工程量对象 - """ - # 如果指定了类型,直接创建对应类型的对象 - if quantity_type == "定额": - return Ration() - elif quantity_type == "主材": - return Material() - elif quantity_type == "设备": - return Equipment() - - # 如果没有指定类型,尝试通过节点属性或标签判断 - if "类型" in node_data: - if node_data["类型"] == "0": - return Ration() - elif node_data["类型"] == "1": - return Material() - elif node_data["类型"] == "5": - return Equipment() - - # 通过标签判断 - labels = list(node_data.labels) if hasattr(node_data, "labels") else [] - if "Quota" in labels: - return Ration() - elif "MainMaterial" in labels: - return Material() - elif "Equipment" in labels: - return Equipment() - - # 默认返回基类对象 - return ProjectQuantity() - - # 材机查询方法实现 - def get_material_equipment_by_path(self, paths_str): - """ - 通过路径获取材机对象 - - Args: - paths_str (str): 以'/'分隔的多级项目划分名称路径 - - Returns: - list: 包含所有匹配的材机对象的列表 - """ - if not paths_str: - return [] - - # 使用通用方法获取节点 - node_data = self.get_node_by_path(paths_str) - node_id = node_data["id"] if node_data and "id" in node_data else None - - if not node_id: - return [] - - # 查询与该节点关联的所有材机对象 - query = """ - MATCH (p)-[r]-(m) - WHERE p.id = $node_id AND (m:MaterialOrEquipment OR m:Material OR m:Equipment) - RETURN m LIMIT 50 - """ - params = {"node_id": node_id} - - try: - result = self.session.run(query, **params) - - materials = [] - for record in result: - material = self._create_material_object(record["m"]) - materials.append(material) - - # 更新缓存 - if hasattr(material, "id") and material.id: - self.material_equipment_dict[material.id] = material - - return materials - except Exception as e: - print(f"通过路径获取材机对象时出错: {e}") - return [] - - def get_material_equipment_by_parent_and_name(self, parent_path, partial_name): - """ - 通过父节点路径和模糊名称获取材机对象 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - - Returns: - list: 包含所有匹配的材机对象的列表 - """ - if not partial_name: - return [] - - # 使用通用方法获取父节点 - parent_node_data = self.get_node_by_path(parent_path) - parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None - - # 构建查询,根据是否有父节点ID调整查询条件 - if parent_node_id: - # 如果找到了父节点,查找与父节点有关系的材机节点 - query = """ - MATCH (p)-[:CONTAINS|HAS|USES|RELATED_TO]-(m) - WHERE p.id = $parent_id AND m.name CONTAINS $partial_name - AND (m:MaterialOrEquipment OR m:Material OR m:Equipment) - RETURN m LIMIT 20 - """ - params = {"parent_id": parent_node_id, "partial_name": partial_name} - else: - # 如果没有找到父节点或没有提供父节点路径,只按名称查询 - query = """ - MATCH (m) - WHERE m.name CONTAINS $partial_name - AND (m:MaterialOrEquipment OR m:Material OR m:Equipment) - RETURN m LIMIT 20 - """ - params = {"partial_name": partial_name} - - try: - result = self.session.run(query, **params) - - materials = [] - for record in result: - material = self._create_material_object(record["m"]) - materials.append(material) - - # 更新缓存 - if hasattr(material, "id") and material.id: - self.material_equipment_dict[material.id] = material - - return materials - except Exception as e: - print(f"通过父节点路径和模糊名称获取材机对象时出错: {e}") - return [] - - # 辅助方法,用于创建材机对象并填充属性 - def _create_material_object(self, node_data): - """ - 根据节点数据创建材机对象并填充属性 - - Args: - node_data (dict): 节点数据 - - Returns: - MaterialOrEquipment: 创建的材机对象 - """ - material = MaterialOrEquipment() - - # 填充属性 - for key, value in node_data.items(): - if hasattr(material, key): - setattr(material, key, value) - - return material - - # 取费表模板查询方法实现 - def get_fee_template_by_path(self, paths_str): - """ - 通过路径获取取费表模板 - - Args: - paths_str (str): 以'/'分隔的多级项目划分名称路径 - - Returns: - list: 包含所有匹配的取费表模板对象的列表 - """ - if not paths_str: - return [] - - # 使用通用方法获取节点 - node_data = self.get_node_by_path(paths_str) - node_id = node_data["id"] if node_data and "id" in node_data else None - - if not node_id: - return [] - - # 查询与该节点关联的所有取费表模板 - query = """ - MATCH (p)-[r]-(t) - WHERE p.id = $node_id AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate') - RETURN t LIMIT 20 - """ - params = {"node_id": node_id} - - try: - result = self.session.run(query, **params) - - templates = [] - for record in result: - template = self._create_fee_template_object(record["t"]) - templates.append(template) - - return templates - except Exception as e: - print(f"通过路径获取取费表模板时出错: {e}") - return [] - - def get_fee_template_by_parent_and_name(self, parent_path, partial_name): - """ - 通过父节点路径和模糊名称获取取费表模板 - - Args: - parent_path (str): 父节点的路径,以'/'分隔的多级节点路径 - partial_name (str): 目标节点的模糊或不完整名称 - - Returns: - list: 包含所有匹配的取费表模板对象的列表 - """ - if not partial_name: - return [] - - # 使用通用方法获取父节点 - parent_node_data = self.get_node_by_path(parent_path) - parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None - - # 构建查询,根据是否有父节点ID调整查询条件 - if parent_node_id: - query = """ - MATCH (p)-[:CONTAINS|HAS|USES|RELATED_TO]-(t) - WHERE p.id = $parent_id AND t.name CONTAINS $partial_name - AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate') - RETURN t LIMIT 20 - """ - params = {"parent_id": parent_node_id, "partial_name": partial_name} - else: - query = """ - MATCH (t) - WHERE t.name CONTAINS $partial_name - AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate') - RETURN t LIMIT 20 - """ - params = {"partial_name": partial_name} - - try: - result = self.session.run(query, **params) - - templates = [] - for record in result: - template = self._create_fee_template_object(record["t"]) - templates.append(template) - - return templates - except Exception as e: - print(f"通过父节点路径和模糊名称获取取费表模板时出错: {e}") - return [] - - # 辅助方法,用于创建取费表模板对象并填充属性 - def _create_fee_template_object(self, node_data): - """ - 根据节点数据创建取费表模板对象并填充属性 - - Args: - node_data (dict): 节点数据 - - Returns: - FeeTableTemplateItem: 创建的取费表模板对象 - """ - template = FeeTableTemplateItem() - - # 填充属性 - for key, value in node_data.items(): - if hasattr(template, key): - setattr(template, key, value) - - # 更新缓存 - if hasattr(template, "OutlayID") and template.OutlayID: - self.fee_templates[template.OutlayID] = template - - return template - - # 费用表查询方法实现 - def get_fee_schedule_on_auxiliary_expense_table(self, table_name, fee_name, fee): - """ - 在辅助费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称 - fee (str): 匹配的费用值属性名 - - Returns: - str: 匹配到的费用名称节点对应的费用值 - """ - if not table_name or not fee_name or not fee: - return None - - # 构建查询,查找辅助费用表中的特定费用 - 使用任意关系类型 - query = """ - MATCH (t:FeeScheduleItem)-[r]->(f:Fee) - WHERE t.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - value = fee_node.get(fee) - if value is not None: - return value - - return None - except Exception as e: - print(f"在辅助费用表中查找费用时出错: {e}") - return None - - def get_fee_schedule_on_other_expense_table(self, table_name, fee_name, fee): - """ - 在其它费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称 - fee (str): 匹配的费用值属性名 - - Returns: - str: 匹配到的费用名称节点对应的费用值 - """ - if not table_name or not fee_name or not fee: - return None - - # 构建查询,查找其它费用表中的特定费用 - 使用任意关系类型 - query = """ - MATCH (t:FeeScheduleItem)-[r]->(f:Fee) - WHERE t.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - value = fee_node.get(fee) - if value is not None: - return value - - return None - except Exception as e: - print(f"在其它费用表中查找费用时出错: {e}") - return None - - def get_fee_schedule_on_land_acquisition_fee_table_table(self, table_name, fee_name, fee): - """ - 在土地征用费表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称 - fee (str): 匹配的费用值属性名 - - Returns: - str: 匹配到的费用名称节点对应的费用值 - """ - if not table_name or not fee_name or not fee: - return None - - # 构建查询,查找土地征用费表中的特定费用 - 使用任意关系类型 - query = """ - MATCH (t:FeeScheduleItem)-[r]->(f:Fee) - WHERE t.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - value = fee_node.get(fee) - if value is not None: - return value - - return None - except Exception as e: - print(f"在土地征用费表中查找费用时出错: {e}") - return None - - def get_fee_schedule_on_installation_price_difference_table(self, table_name, fee_name, fee): - """ - 在安装价差表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称 - fee (str): 匹配的费用值属性名 - - Returns: - str: 匹配到的费用名称节点对应的费用值 - """ - if not table_name or not fee_name or not fee: - return None - - # 构建查询,查找安装价差表中的特定费用 - 使用任意关系类型 - query = """ - MATCH (t:FeeScheduleItem)-[r]->(f:Fee) - WHERE t.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 1 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - try: - result = self.session.run(query, **params) - - all_records = result.data() - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - value = fee_node.get(fee) - if value is not None: - return value - - return None - except Exception as e: - print(f"在安装价差表中查找费用时出错: {e}") - return None - - def get_fee_schedule_on_Engineering_Cost_table(self, table_name, fee_name, fee): - """ - 在工程费用表中查找费用 - - Args: - table_name (str): 费用表名称 - fee_name (str): 要查找的费用名称 - fee (str): 匹配的费用值属性名 - - Returns: - str: 匹配到的费用名称节点对应的费用值 - """ - if not table_name or not fee_name or not fee: - return None - - # 调试输出 - print(f"查询费用表: {table_name}") - print(f"查询费用名称: {fee_name}") - print(f"查询费用属性: {fee}") - - # 构建查询,使用递归关系查询,并扩展关系类型 - query = """ - MATCH (t:FeeScheduleItem)-[r*1..5]->(f:Fee) - WHERE t.name = $table_name AND f.name = $fee_name - RETURN f LIMIT 10 - """ - params = {"table_name": table_name, "fee_name": fee_name} - - print(f"执行查询: {query}") - print(f"参数: {params}") - - try: - result = self.session.run(query, **params) - all_records = result.data() - - print(f"查询结果数量: {len(all_records)}") - - if len(all_records) > 0: - fee_node = all_records[0]["f"] - print(f"找到费用节点: {fee_node}") - - # 获取节点的所有属性,用于调试 - for key, value in fee_node.items(): - print(f"属性: {key} = {value}") - - value = fee_node.get(fee) - if value is not None: - print(f"找到费用值: {value}") - return value - else: - print(f"节点中没有属性: {fee}") - - # 如果没有找到,尝试另一种查询方式,使用CONTAINS进行模糊匹配 - print("尝试使用模糊匹配...") - backup_query = """ - MATCH (t:FeeScheduleItem)-[r*1..5]->(f:Fee) - WHERE t.name CONTAINS $table_name AND f.name CONTAINS $fee_name - RETURN f LIMIT 10 - """ - backup_result = self.session.run(backup_query, **params) - backup_records = backup_result.data() - - print(f"模糊匹配结果数量: {len(backup_records)}") - - if len(backup_records) > 0: - fee_node = backup_records[0]["f"] - print(f"找到费用节点(模糊匹配): {fee_node}") - - # 获取节点的所有属性,用于调试 - for key, value in fee_node.items(): - print(f"属性: {key} = {value}") - - value = fee_node.get(fee) - if value is not None: - print(f"找到费用值: {value}") - return value - else: - print(f"节点中没有属性: {fee}") - - return None - except Exception as e: - print(f"在工程费用表中查找费用时出错: {e}") - import traceback - - traceback.print_exc() - return None - - -class ProjectBuilder: - """ - 项目构建器 - 描述: 用于构建项目对象的构建器 - """ - - _instance = None - - @staticmethod - def build(): - """ - 构建并返回项目实例 - - Returns: - ProjectTookiItNeo4j: 创建的项目实例 - """ - # 如果已经有实例,先关闭它 - if ProjectBuilder._instance is not None: - ProjectBuilder._instance.close() - - # 创建新实例 - - ProjectBuilder._instance = ProjectTookiItNeo4j() - - return ProjectBuilder._instance - - @staticmethod - def close(): - """ - 关闭当前项目实例的连接 - """ - if ProjectBuilder._instance is not None: - ProjectBuilder._instance.close() - ProjectBuilder._instance = None - - -# 注册退出处理函数,确保程序退出时自动关闭连接 -atexit.register(ProjectBuilder.close) - - -# project = ProjectBuilder.build() -# result = project.get_quantities_node_by_parent_and_code( -# "工程数据/安装工程/安装/架空输电线路本体工程/基础工程", "定额", "YX2-1/YX2-2/YX2-3/YX2-6/YX2-7" -# ) - -# print(result)