From 36f298fe07509e45d760c9537f0a7e5d7b97b89a Mon Sep 17 00:00:00 2001 From: chentianrui Date: Tue, 24 Jun 2025 18:38:40 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=88=A0=E9=99=A4=20build=5Fkg.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build_kg.py | 993 ---------------------------------------------------- 1 file changed, 993 deletions(-) delete mode 100644 build_kg.py diff --git a/build_kg.py b/build_kg.py deleted file mode 100644 index ea7ce23..0000000 --- a/build_kg.py +++ /dev/null @@ -1,993 +0,0 @@ -from py2neo import Graph, Node, Relationship, NodeMatcher -import json -import os -import logging - -# 设置日志 -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - -# 连接到Neo4j数据库 -uri = "bolt://172.20.0.145:7687" -user = "neo4j" -password = "password" - -try: - graph = Graph(uri, auth=(user, password)) - logger.info("成功连接到Neo4j数据库") -except Exception as e: - logger.error(f"连接Neo4j数据库失败: {e}") - exit(1) - -# 清空数据库 -try: - graph.run("MATCH (n) DETACH DELETE n") - logger.info("已清空数据库") -except Exception as e: - logger.error(f"清空数据库失败: {e}") - -# 删除所有约束 -try: - # 获取所有约束 - constraints = graph.run("SHOW CONSTRAINTS").data() - for constraint in constraints: - constraint_name = constraint.get("name") - if constraint_name: - graph.run(f"DROP CONSTRAINT {constraint_name}") - logger.info(f"已删除约束: {constraint_name}") -except Exception as e: - logger.warning(f"删除约束失败: {e}") - - -# 创建约束和索引以提高性能 - 现在不创建任何约束 -def create_constraints_and_indexes(): - # 不创建任何约束 - logger.info("跳过创建约束") - pass - - -# 创建根节点 -def create_root_node(): - root = Node("EngineeringData", name="工程") - graph.create(root) - logger.info("创建根节点: 工程") - return root - - -# 处理ProjectDivisionSet -def process_project_division_set(data, root_node): - # 根据您提供的JSON结构,正确访问projectDivision数据 - if "projectData" in data and "projectDivision" in data["projectData"]: - project_division = 
data["projectData"]["projectDivision"] - elif "projectDivision" in data: - project_division = data["projectDivision"] - else: - logger.warning("JSON中未找到projectDivision数据") - logger.info(f"JSON顶层键: {list(data.keys())}") - return - - logger.info(f"开始处理projectDivision,包含 {len(project_division)} 个顶级项目") - - # 创建新的ProjectDivisionSet节点 - 项目划分集 - division_set = Node("ProjectDivisionSet", name="项目划分集") - graph.create(division_set) - graph.create(Relationship(root_node, "CONTAINS", division_set)) - logger.info(f"创建ProjectDivisionSet节点: 项目划分集") - - # 处理ProjectDivisionTree - for first_level_name, first_level_content in project_division.items(): - # 处理第一层下的内容,直接创建合并后的ProjectDivisionTree节点 - if isinstance(first_level_content, dict): - # 处理一级名称,去掉"工程"字样 - processed_first_level = first_level_name.replace("工程", "") - - for second_level_name, second_level_content in first_level_content.items(): - # 确定最终节点名称 - if second_level_name == processed_first_level: - # 如果二级名称与处理后的一级名称相同,直接使用二级名称 - final_name = second_level_name - else: - # 否则组合二级名称和处理后的一级名称 - final_name = f"{second_level_name}{processed_first_level}" - - # 创建ProjectDivisionTree节点 - division_tree = Node("ProjectDivisionTree", name=final_name) - - # 保存原始名称作为属性 - division_tree["original_first_level"] = first_level_name - division_tree["original_second_level"] = second_level_name - - # 如果有GUID,添加到节点属性 - guid = None - if isinstance(first_level_content, dict) and "GUID" in first_level_content: - division_tree["first_level_GUID"] = first_level_content["GUID"] - guid = first_level_content["GUID"] - - graph.create(division_tree) - graph.create(Relationship(division_set, "CONTAINS", division_tree)) - logger.info(f"创建ProjectDivisionTree节点: {final_name}") - - # 如果有GUID,尝试建立与CostSet的USE关系 - if guid: - # 查找对应的CostSet节点 - cost_set_query = f""" - MATCH (c:CostSet) - WHERE c.GUID = '{guid}' - RETURN c - """ - cost_set_nodes = list(graph.run(cost_set_query)) - if cost_set_nodes: - cost_set_node = cost_set_nodes[0]["c"] - 
graph.create(Relationship(division_tree, "USE", cost_set_node)) - logger.info(f"创建关系: {final_name} USE CostSet (GUID: {guid})") - - # 处理第二层下的ProjectDivisionItem列表 - if isinstance(second_level_content, list): - logger.info(f"ProjectDivisionTree {final_name} 包含 {len(second_level_content)} 个列表项") - for item in second_level_content: - process_project_division_item(item, division_tree) - else: - logger.warning(f"ProjectDivisionTree {final_name} 的内容类型未知: {type(second_level_content)}") - else: - logger.warning(f"第一层 {first_level_name} 的内容类型未知: {type(first_level_content)}") - - -# 处理ProjectDivisionItem -def process_project_division_item(item, parent_node): - # 提取必要属性 - guid = item.get("GUID", "") - name = item.get("项目名称", "") - - if not guid and not name: - logger.warning("ProjectDivisionItem缺少GUID和项目名称") - return - - # 创建ProjectDivisionItem节点 - item_node = Node("ProjectDivisionItem", GUID=guid, name=name) - - # 添加path属性,表示从ProjectDivisionItem到ProjectDivisionTree的路径 - if isinstance(parent_node, Node) and "ProjectDivisionTree" in parent_node.labels: - # 如果父节点是ProjectDivisionTree,使用"父节点名称/当前节点名称"作为路径 - item_node["path"] = f"{parent_node['name']}/{name}" - logger.info(f"为ProjectDivisionItem {name} 设置path: {item_node['path']}") - else: - # 如果父节点是ProjectDivisionItem,使用"父节点path/当前节点名称"作为路径 - parent_path = parent_node.get("path", "") - if parent_path: - item_node["path"] = f"{parent_path}/{name}" - else: - # 如果父节点没有path属性(不应该发生,但为了健壮性) - item_node["path"] = name - logger.info(f"为ProjectDivisionItem {name} 设置path: {item_node['path']}") - - # 添加其他属性 - for key, value in item.items(): - if key not in ["GUID", "项目名称", "children"] and value is not None: - # 检查是否为资源库列表 - if key == "资源库列表" and isinstance(value, list): - # 将资源库列表转换为分号分隔的字符串 - resource_names = [] - for resource in value: - if isinstance(resource, dict) and "资源库名称" in resource: - resource_names.append(resource["资源库名称"]) - item_node["资源库名称"] = ";".join(resource_names) - logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}") - # 
检查值是否为基本类型 - elif isinstance(value, (str, int, float, bool)): - item_node[key] = value - # 如果是列表,尝试转换为分号分隔的字符串 - elif isinstance(value, list): - try: - if all(isinstance(x, (str, int, float, bool)) for x in value): - item_node[key] = ";".join(str(x) for x in value) - else: - # 对于包含复杂对象的列表,尝试提取关键信息 - extracted_values = [] - for item_in_list in value: - if isinstance(item_in_list, dict): - # 尝试提取字典中的名称或标识符 - for name_key in ["名称", "name", "标识", "id", "ID"]: - if name_key in item_in_list: - extracted_values.append(str(item_in_list[name_key])) - break - else: - # 如果没有找到名称键,使用第一个键值对 - if item_in_list: - first_key = next(iter(item_in_list)) - extracted_values.append(f"{first_key}:{item_in_list[first_key]}") - else: - extracted_values.append(str(item_in_list)) - item_node[key] = ";".join(extracted_values) - except Exception as e: - logger.warning(f"无法将列表属性 {key} 转换为字符串: {e}") - # 如果是字典,尝试转换为字符串 - elif isinstance(value, dict): - try: - # 提取字典中的关键信息 - extracted_info = [] - for dict_key, dict_value in value.items(): - if isinstance(dict_value, (str, int, float, bool)): - extracted_info.append(f"{dict_key}:{dict_value}") - item_node[key] = ";".join(extracted_info) - except Exception as e: - logger.warning(f"无法将字典属性 {key} 转换为字符串: {e}") - - graph.create(item_node) - logger.info(f"创建ProjectDivisionItem节点: {name} (GUID: {guid})") - - # 创建与父节点的关系 - if isinstance(parent_node, Node) and "ProjectDivisionTree" in parent_node.labels: - graph.create(Relationship(parent_node, "CONTAINS", item_node)) - logger.info(f"创建关系: {parent_node['name']} CONTAINS {name}") - else: - graph.create(Relationship(parent_node, "HAS_CHILD", item_node)) - logger.info(f"创建关系: {parent_node['name']} HAS_CHILD {name}") - - # 如果有GUID,尝试建立与CostSet的USE关系 - if guid: - # 查找对应的CostSet节点 - cost_set_query = f""" - MATCH (c:CostSet) - WHERE c.GUID = '{guid}' - RETURN c - """ - cost_set_nodes = list(graph.run(cost_set_query)) - if cost_set_nodes: - cost_set_node = cost_set_nodes[0]["c"] - 
graph.create(Relationship(item_node, "USE", cost_set_node)) - logger.info(f"创建关系: {name} USE CostSet (GUID: {guid})") - - # 处理子项 - if "children" in item and item["children"]: - children = item["children"] - logger.info(f"ProjectDivisionItem {name} 有 {len(children)} 个子项") - - for child in children: - child_type = child.get("type", child.get("类型", "")) - - if child_type == "项目划分": - # 递归处理子ProjectDivisionItem - process_project_division_item(child, item_node) - elif child_type == "8" or child_type == "清单": - # 处理List类型节点 - process_list_item(child, item_node) - else: - # 处理ProjectQuantity及其子类 - process_project_quantity(child, item_node) - - -# 处理List及其子类 -def process_list_item(list_item, parent_node): - """处理清单类型的节点""" - # 提取必要属性 - guid = list_item.get("GUID", "") - list_name = list_item.get("清单名称") - list_type = list_item.get("类型", "") - - # 创建List节点 - list_node = Node("List", guid=guid, name=list_name, type=list_type) - - # 添加其他属性 - for key, value in list_item.items(): - if key not in ["清单名称" "类型", "guid", "children"] and value is not None: - # 检查值是否为基本类型 - if isinstance(value, (str, int, float, bool)): - list_node[key] = value - # 如果是列表,尝试转换为分号分隔的字符串 - elif isinstance(value, list): - try: - if all(isinstance(x, (str, int, float, bool)) for x in value): - list_node[key] = ";".join(str(x) for x in value) - else: - # 对于包含复杂对象的列表,尝试提取关键信息 - extracted_values = [] - for item_in_list in value: - if isinstance(item_in_list, dict): - # 尝试提取字典中的名称或标识符 - for name_key in ["名称", "name", "标识", "id", "ID"]: - if name_key in item_in_list: - extracted_values.append(str(item_in_list[name_key])) - break - else: - # 如果没有找到名称键,使用第一个键值对 - if item_in_list: - first_key = next(iter(item_in_list)) - extracted_values.append(f"{first_key}:{item_in_list[first_key]}") - else: - extracted_values.append(str(item_in_list)) - list_node[key] = ";".join(extracted_values) - except Exception as e: - logger.warning(f"无法将列表属性 {key} 转换为字符串: {e}") - # 如果是字典,尝试转换为字符串 - elif isinstance(value, dict): - try: - 
# 提取字典中的关键信息 - extracted_info = [] - for dict_key, dict_value in value.items(): - if isinstance(dict_value, (str, int, float, bool)): - extracted_info.append(f"{dict_key}:{dict_value}") - list_node[key] = ";".join(extracted_info) - except Exception as e: - logger.warning(f"无法将字典属性 {key} 转换为字符串: {e}") - - graph.create(list_node) - logger.info(f"创建List节点: {list_name} (类型: {list_type})") - - # 创建与父节点的关系 - graph.create(Relationship(parent_node, "HAS_CHILD", list_node)) - logger.info(f"创建关系: {parent_node['name']} HAS_CHILD {list_name}") - - # 如果有GUID,尝试建立与CostSet的USE关系 - if guid: - # 查找对应的CostSet节点 - cost_set_query = f""" - MATCH (c:CostSet) - WHERE c.GUID = '{guid}' - RETURN c - """ - cost_set_nodes = list(graph.run(cost_set_query)) - if cost_set_nodes: - cost_set_node = cost_set_nodes[0]["c"] - graph.create(Relationship(list_node, "USE", cost_set_node)) - logger.info(f"创建关系: {list_name} USE CostSet (GUID: {guid})") - - # 处理子项 - if "children" in list_item and list_item["children"]: - children = list_item["children"] - logger.info(f"List {list_name} 有 {len(children)} 个子项") - - for child in children: - # 确定子项类型 - child_type = child.get("type", child.get("类型", "")) - - if child_type == "项目划分": - # 递归处理子ProjectDivisionItem - process_project_division_item(child, list_node) - elif child_type == "8" or child_type == "清单": - # 递归处理子List - process_list_item(child, list_node) - else: - # 处理ProjectQuantity及其子类 - process_project_quantity(child, list_node) - - -# 处理ProjectQuantity及其子类 -def process_project_quantity(quantity, parent_node): - # 确定具体类型 - quantity_type = quantity.get("类型", "") - labels = ["ProjectQuantity"] - - # 支持数字和文本类型 - if quantity_type == "0" or quantity_type == "定额": - labels.append("Quota") - elif quantity_type == "1" or quantity_type == "主材": - labels.append("MainMaterial") - elif quantity_type == "5" or quantity_type == "设备": - labels.append("Equipment") - - # 创建节点 - quantity_id = quantity.get("id", "") - quantity_name = quantity.get("项目名称", quantity.get("名称", 
"")) - # guid = quantity.get("GUID", "") - - quantity_node = Node(*labels, id=quantity_id, name=quantity_name) - - # 添加其他属性 - for key, value in quantity.items(): - if key not in ["id", "名称", "项目名称", "材机列表", "children"] and value is not None: - # 检查是否为资源库列表 - if key == "资源库列表" and isinstance(value, list): - # 将资源库列表转换为分号分隔的字符串 - resource_names = [] - for resource in value: - if isinstance(resource, dict) and "资源库名称" in resource: - resource_names.append(resource["资源库名称"]) - quantity_node["资源库名称"] = ";".join(resource_names) - logger.info(f"将资源库列表转换为字符串: {quantity_node['资源库名称']}") - # 检查值是否为基本类型 - elif isinstance(value, (str, int, float, bool)): - quantity_node[key] = value - # 如果是列表,尝试转换为分号分隔的字符串 - elif isinstance(value, list): - try: - if all(isinstance(x, (str, int, float, bool)) for x in value): - quantity_node[key] = ";".join(str(x) for x in value) - else: - # 对于包含复杂对象的列表,尝试提取关键信息 - extracted_values = [] - for item_in_list in value: - if isinstance(item_in_list, dict): - # 尝试提取字典中的名称或标识符 - for name_key in ["名称", "name", "标识", "id", "ID"]: - if name_key in item_in_list: - extracted_values.append(str(item_in_list[name_key])) - break - else: - # 如果没有找到名称键,使用第一个键值对 - if item_in_list: - first_key = next(iter(item_in_list)) - extracted_values.append(f"{first_key}:{item_in_list[first_key]}") - else: - extracted_values.append(str(item_in_list)) - quantity_node[key] = ";".join(extracted_values) - except Exception as e: - logger.warning(f"无法将列表属性 {key} 转换为字符串: {e}") - # 如果是字典,尝试转换为字符串 - elif isinstance(value, dict): - try: - # 提取字典中的关键信息 - extracted_info = [] - for dict_key, dict_value in value.items(): - if isinstance(dict_value, (str, int, float, bool)): - extracted_info.append(f"{dict_key}:{dict_value}") - quantity_node[key] = ";".join(extracted_info) - except Exception as e: - logger.warning(f"无法将字典属性 {key} 转换为字符串: {e}") - - graph.create(quantity_node) - logger.info(f"创建ProjectQuantity节点: {quantity_name} (id: {quantity_id}, 类型: {quantity_type})") - - # 创建与父节点的关系 - 
graph.create(Relationship(parent_node, "HAS_COMPONENT", quantity_node)) - logger.info(f"创建关系: {parent_node['name']} HAS_COMPONENT {quantity_name}") - - # 处理材机列表或children - materials = None - - # 先检查是否有材机列表 - if "材机列表" in quantity and quantity["材机列表"]: - materials = quantity["材机列表"] - logger.info(f"ProjectQuantity {quantity_name} 有 {len(materials)} 个材机项") - - for material in materials: - process_material_or_equipment(material, quantity_node) - - # 如果没有材机列表,则检查是否有children - elif "children" in quantity and quantity["children"]: - children = quantity["children"] - logger.info(f"ProjectQuantity {quantity_name} 有 {len(children)} 个子项") - - for child in children: - child_type = child.get("类型", child.get("type", "")) - - # 如果子项类型为人工、材料或机械,则视为MaterialOrEquipment - if child_type in ["人工", "材料", "机械", "2", "3", "4"]: - process_material_or_equipment(child, quantity_node) - # 如果子项类型为主材、设备或定额,则递归处理为ProjectQuantity - elif child_type in ["1", "主材", "5", "设备", "0", "定额"]: - process_project_quantity(child, quantity_node) - - -# 处理MaterialOrEquipment -def process_material_or_equipment(material, parent_node): - material_id = material.get("id", material.get("ID", "")) - material_name = material.get("名称", "") - material_type = material.get("类型", material.get("type", "")) - - if not material_id and not material_name: - logger.warning("MaterialOrEquipment缺少id和名称") - return - - # 创建唯一标识,结合父节点的ID和当前项的id - parent_id = parent_node.get("id", parent_node.get("GUID", "")) - unique_id = f"{parent_id}_{material_id}" if parent_id else material_id - - # 直接创建新节点,不检查是否已存在 - material_node = Node( - "MaterialOrEquipment", id=material_id, unique_id=unique_id, name=material_name, type=material_type - ) - - # 添加其他属性 - for key, value in material.items(): - if key not in ["id", "ID", "名称", "类型", "type"] and value is not None: - # 检查是否为资源库列表 - if key == "资源库列表" and isinstance(value, list): - # 将资源库列表转换为分号分隔的字符串 - resource_names = [] - for resource in value: - if isinstance(resource, dict) and "资源库名称" in 
resource: - resource_names.append(resource["资源库名称"]) - material_node["资源库名称"] = ";".join(resource_names) - logger.info(f"将资源库列表转换为字符串: {material_node['资源库名称']}") - # 检查值是否为基本类型 - elif isinstance(value, (str, int, float, bool)): - material_node[key] = value - # 如果是列表,尝试转换为分号分隔的字符串 - elif isinstance(value, list): - try: - if all(isinstance(x, (str, int, float, bool)) for x in value): - material_node[key] = ";".join(str(x) for x in value) - else: - # 对于包含复杂对象的列表,尝试提取关键信息 - extracted_values = [] - for item_in_list in value: - if isinstance(item_in_list, dict): - # 尝试提取字典中的名称或标识符 - for name_key in ["名称", "name", "标识", "id", "ID"]: - if name_key in item_in_list: - extracted_values.append(str(item_in_list[name_key])) - break - else: - # 如果没有找到名称键,使用第一个键值对 - if item_in_list: - first_key = next(iter(item_in_list)) - extracted_values.append(f"{first_key}:{item_in_list[first_key]}") - else: - extracted_values.append(str(item_in_list)) - material_node[key] = ";".join(extracted_values) - except Exception as e: - logger.warning(f"无法将列表属性 {key} 转换为字符串: {e}") - # 如果是字典,尝试转换为字符串 - elif isinstance(value, dict): - try: - # 提取字典中的关键信息 - extracted_info = [] - for dict_key, dict_value in value.items(): - if isinstance(dict_value, (str, int, float, bool)): - extracted_info.append(f"{dict_key}:{dict_value}") - material_node[key] = ";".join(extracted_info) - except Exception as e: - logger.warning(f"无法将字典属性 {key} 转换为字符串: {e}") - - graph.create(material_node) - logger.info(f"创建MaterialOrEquipment节点: {material_name} (id: {material_id}, 类型: {material_type})") - - # 创建与父节点的关系 - graph.create(Relationship(parent_node, "OWNERSHIP", material_node)) - logger.info(f"创建关系: {parent_node['name']} OWNERSHIP {material_name}") - - -# 处理CostSet -def process_cost_set(data, root_node): - # 根据您提供的JSON结构,正确访问expensePreview数据 - expense_preview = None - - if "projectData" in data and "expensePreview" in data["projectData"]: - expense_preview = data["projectData"]["expensePreview"] - elif "expensePreview" in 
data: - expense_preview = data["expensePreview"] - else: - logger.warning("JSON中未找到expensePreview数据") - logger.info(f"JSON顶层键: {list(data.keys())}") - return - - logger.info("开始处理expensePreview") - - # 处理安装工程节点 - if "安装工程" in expense_preview: - install_cost_set = Node("CostSet", name="安装工程") - graph.create(install_cost_set) - # graph.create(Relationship(root_node, "HAS_COST_SET", install_cost_set)) - logger.info("创建CostSet节点: 安装工程") - - # 处理安装节点 - if "安装" in expense_preview["安装工程"]: - install_sub_cost_set = Node("CostSet", name="安装") - graph.create(install_sub_cost_set) - graph.create(Relationship(install_cost_set, "HAS_CHILD", install_sub_cost_set)) - logger.info("创建CostSet节点: 安装") - - # 处理安装下的CostSet列表 - for cost_set in expense_preview["安装工程"]["安装"]: - process_cost_set_recursive(cost_set, install_sub_cost_set) - - -# 递归处理CostSet -def process_cost_set_recursive(cost_set, parent_node): - guid = cost_set.get("GUID", "") - # 使用GUID作为名称,如果GUID为空,则尝试使用项目名称或name - name = guid if guid else cost_set.get("项目名称", cost_set.get("name", "未命名CostSet")) - - # 创建CostSet节点 - cost_set_node = Node("CostSet", GUID=guid, name=name) - - # 添加其他属性 - for key, value in cost_set.items(): - if key not in ["GUID", "children", "项目名称", "name"] and value is not None: - cost_set_node[key] = value - - graph.create(cost_set_node) - logger.info(f"创建CostSet节点: {name} (GUID: {guid})") - - # # 创建与父节点的关系 - # graph.create(Relationship(parent_node, "HAS_CHILD", cost_set_node)) - # logger.info(f"创建关系: {parent_node['name']} HAS_CHILD {name}") - - # 处理子项 - if "children" in cost_set and cost_set["children"]: - children = cost_set["children"] - logger.info(f"CostSet {name} 有 {len(children)} 个子项") - - for child in children: - if "GUID" in child: - # 递归处理子CostSet - process_cost_set_recursive(child, cost_set_node) - else: - # 处理CostItem - process_cost_item(child, cost_set_node) - - -# 处理CostItem -def process_cost_item(item, parent_node): - item_id = item.get("id", "") - cost = item.get("cost", "") - - # 
使用id作为名称,如果id为空,则尝试使用name或项目名称 - name = item_id if item_id else item.get("name", item.get("项目名称", "未命名CostItem")) - - # 创建唯一标识,结合父节点的GUID和当前项的id - parent_guid = parent_node.get("GUID", "") - unique_id = f"{parent_guid}_{item_id}" if parent_guid else item_id - - # 直接创建新节点,不检查是否已存在 - item_node = Node("CostItem", id=item_id, unique_id=unique_id, cost=cost, name=name) - - # 添加其他属性 - for key, value in item.items(): - if key not in ["id", "cost", "name", "项目名称"] and value is not None: - # 检查是否为资源库列表 - if key == "资源库列表" and isinstance(value, list): - # 将资源库列表转换为分号分隔的字符串 - resource_names = [] - for resource in value: - if isinstance(resource, dict) and "资源库名称" in resource: - resource_names.append(resource["资源库名称"]) - item_node["资源库名称"] = ";".join(resource_names) - logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}") - # 检查值是否为基本类型 - elif isinstance(value, (str, int, float, bool)): - item_node[key] = value - # 如果是列表,尝试转换为分号分隔的字符串 - elif isinstance(value, list): - try: - if all(isinstance(x, (str, int, float, bool)) for x in value): - item_node[key] = ";".join(str(x) for x in value) - else: - # 对于包含复杂对象的列表,尝试提取关键信息 - extracted_values = [] - for item_in_list in value: - if isinstance(item_in_list, dict): - # 尝试提取字典中的名称或标识符 - for name_key in ["名称", "name", "标识", "id", "ID"]: - if name_key in item_in_list: - extracted_values.append(str(item_in_list[name_key])) - break - else: - # 如果没有找到名称键,使用第一个键值对 - if item_in_list: - first_key = next(iter(item_in_list)) - extracted_values.append(f"{first_key}:{item_in_list[first_key]}") - else: - extracted_values.append(str(item_in_list)) - item_node[key] = ";".join(extracted_values) - except Exception as e: - logger.warning(f"无法将列表属性 {key} 转换为字符串: {e}") - # 如果是字典,尝试转换为字符串 - elif isinstance(value, dict): - try: - # 提取字典中的关键信息 - extracted_info = [] - for dict_key, dict_value in value.items(): - if isinstance(dict_value, (str, int, float, bool)): - extracted_info.append(f"{dict_key}:{dict_value}") - item_node[key] = ";".join(extracted_info) - 
except Exception as e: - logger.warning(f"无法将字典属性 {key} 转换为字符串: {e}") - - graph.create(item_node) - logger.info(f"创建CostItem节点: {name} (id: {item_id}, unique_id: {unique_id})") - - # 创建与父节点的关系 - graph.create(Relationship(parent_node, "CONTAINS", item_node)) - logger.info(f"创建关系: {parent_node['name']} CONTAINS {name}") - - -# 建立实体间的关系 -def establish_relationships(): - # 建立ProjectDivisionItem与CostSet的关系 - query_division_item = """ - MATCH (pdi:ProjectDivisionItem), (cs:CostSet) - WHERE pdi.GUID = cs.GUID AND pdi.GUID <> "" - CREATE (pdi)-[:USE]->(cs) - RETURN count(*) as count - """ - try: - result = graph.run(query_division_item) - count = result.data()[0]["count"] - logger.info(f"创建了 {count} 个 ProjectDivisionItem USE CostSet 关系") - except Exception as e: - logger.error(f"创建ProjectDivisionItem与CostSet关系失败: {e}") - - # 建立ProjectQuantity与CostSet的关系 - query_quantity = """ - MATCH (pq:ProjectQuantity), (cs:CostSet) - WHERE pq.id = cs.id AND pq.id <> "" - CREATE (pq)-[:USE]->(cs) - RETURN count(*) as count - """ - try: - result = graph.run(query_quantity) - count = result.data()[0]["count"] - logger.info(f"创建了 {count} 个 ProjectQuantity USE CostSet 关系") - except Exception as e: - logger.error(f"创建ProjectQuantity与CostSet关系失败: {e}") - - -# 处理取费表模板集(FeeTableTemplateSet) -def process_fee_table_template_set(data, root_node): - # 根据JSON结构,访问costSetting数据 - if "projectData" in data and "costSetting" in data["projectData"]: - cost_setting = data["projectData"]["costSetting"] - elif "costSetting" in data: - cost_setting = data["costSetting"] - else: - logger.warning("JSON中未找到costSetting数据") - logger.info(f"JSON顶层键: {list(data.keys())}") - return - - logger.info(f"开始处理costSetting,包含 {len(cost_setting)} 个取费表模板集") - - # 创建取费表模板集节点 - fee_template_set_node = Node("FeeTableTemplateSet", name="取费表模板集") - graph.create(fee_template_set_node) - graph.create(Relationship(root_node, "CONTAINS", fee_template_set_node)) - logger.info("创建FeeTableTemplateSet节点: 取费表模板集") - - # 处理每个取费表模板集 - for 
template_set_name, template_set_content in cost_setting.items(): - # 创建取费表模板集子节点 - template_set_node = Node("FeeTableTemplateSet", name=template_set_name) - if "TypeList" in template_set_content: - template_set_node["typeList"] = template_set_content["TypeList"] - - graph.create(template_set_node) - graph.create(Relationship(fee_template_set_node, "CONTAINS", template_set_node)) - logger.info(f"创建FeeTableTemplateSet子节点: {template_set_name}") - - # 处理取费表模板项 - if "tables" in template_set_content and isinstance(template_set_content["tables"], list): - for template_item in template_set_content["tables"]: - process_fee_table_template_item(template_item, template_set_node) - - -# 处理取费表模板项(FeeTableTemplateItem) -def process_fee_table_template_item(template_item, parent_node): - # 提取必要属性 - name = template_item.get("name", "") - outlay_id = template_item.get("OutlayID", "") - type_name = template_item.get("类型", "") - profession = template_item.get("专业", "") - - if not name: - logger.warning("FeeTableTemplateItem缺少name") - return - - # 创建取费表模板项节点 - template_item_node = Node( - "FeeTableTemplateItem", name=name, outlayID=outlay_id, type=type_name, profession=profession - ) - - graph.create(template_item_node) - graph.create(Relationship(parent_node, "CONTAINS", template_item_node)) - logger.info(f"创建FeeTableTemplateItem节点: {name} (OutlayID: {outlay_id})") - - # 处理取费项 - if "children" in template_item and isinstance(template_item["children"], list): - for fee_item in template_item["children"]: - process_fee(fee_item, template_item_node) - - -# 处理取费(Fee) -def process_fee(fee_item, parent_node): - # 提取必要属性 - serial_number = fee_item.get("序号", "") - fee_name = fee_item.get("费用名称", "") - code = fee_item.get("代码", "") - rate = fee_item.get("费率(%)", "") - base = fee_item.get("取费基数", "") - remark = fee_item.get("备注", "") - - if not fee_name: - logger.warning("Fee缺少费用名称") - return - - # 创建取费节点 - fee_node = Node("FeeCollection", serialNumber=serial_number, name=fee_name, code=code) - - 
# 添加其他属性 - if rate: - fee_node["rate"] = rate - if base: - fee_node["base"] = base - if remark: - fee_node["remark"] = remark - - graph.create(fee_node) - graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node)) - logger.info(f"创建Fee节点: {fee_name} (序号: {serial_number}, 代码: {code})") - - # 处理子费用项 - if "children" in fee_item and isinstance(fee_item["children"], list): - for child_fee in fee_item["children"]: - process_fee(child_fee, fee_node) - - -# 处理费用表集(FeeScheduleSet) -def process_fee_schedule_set(data, root_node): - """处理费用表集、费用表项和费用""" - # 检查projectCost是否存在 - if "projectData" in data and "projectCost" in data["projectData"]: - project_cost = data["projectData"]["projectCost"] - elif "projectCost" in data: - project_cost = data["projectCost"] - else: - logger.warning("JSON中未找到projectCost数据") - logger.info(f"JSON顶层键: {list(data.keys())}") - return - - logger.info(f"开始处理projectCost,包含 {len(project_cost)} 个费用表项") - - # 创建FeeScheduleSet节点 - 工程费用 - fee_schedule_set = Node("FeeScheduleSet", name="工程费用") - graph.create(fee_schedule_set) - graph.create(Relationship(root_node, "CONTAINS", fee_schedule_set)) - logger.info(f"创建FeeScheduleSet节点: 工程费用") - - # 处理费用表集下的费用表项 - for fee_table_name, fee_table_content in project_cost.items(): - # 创建FeeScheduleItem节点 - fee_schedule_item = Node("FeeScheduleItem", name=fee_table_name) - graph.create(fee_schedule_item) - graph.create(Relationship(fee_schedule_set, "CONTAINS", fee_schedule_item)) - logger.info(f"创建FeeScheduleItem节点: {fee_table_name}") - - # 处理费用表项下的费用列表 - if isinstance(fee_table_content, list): - logger.info(f"FeeScheduleItem {fee_table_name} 包含 {len(fee_table_content)} 个费用项") - for fee_item in fee_table_content: - process_fee_item(fee_item, fee_schedule_item) - else: - logger.warning(f"FeeScheduleItem {fee_table_name} 的内容类型未知: {type(fee_table_content)}") - - -# 处理费用项(Fee) -def process_fee_item(fee, parent_node): - """处理费用项""" - # 提取必要属性 - serial_number = fee.get("序号", "") - name = fee.get("费用名称", "") - code = 
fee.get("代码", "") - rate = fee.get("费率(%)", "") - amount = fee.get("金额", "") - - if not name: - logger.warning("Fee缺少费用名称") - return - - # 创建Fee节点 - fee_node = Node("Fee", serialNumber=serial_number, name=name, code=code) - - # 添加其他属性 - if rate: - fee_node["rate"] = rate - if amount: - fee_node["amount"] = amount - - # 添加其他属性 - for key, value in fee.items(): - if key not in ["序号", "费用名称", "代码", "费率(%)", "金额", "children"] and value is not None: - if isinstance(value, (str, int, float, bool)): - fee_node[key] = value - - graph.create(fee_node) - logger.info(f"创建Fee节点: {name} (序号: {serial_number})") - - # 创建与父节点的关系 - graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node)) - - # 处理子费用项 - if "children" in fee and fee["children"]: - children = fee["children"] - logger.info(f"Fee {name} 有 {len(children)} 个子费用项") - - for child in children: - process_fee_item(child, fee_node) - - -# 处理工程属性集(ProjectPropertySet)和工程属性(ProjectProperty) -def process_project_property_set(data, root_node): - # 检查projectInfo是否存在 - if "projectData" in data and "projectInfo" in data["projectData"]: - project_info = data["projectData"]["projectInfo"] - elif "projectInfo" in data: - project_info = data["projectInfo"] - else: - logger.warning("JSON中未找到projectInfo数据") - logger.info(f"JSON顶层键: {list(data.keys())}") - return - - logger.info("开始处理projectInfo") - - # 创建工程属性集节点 - property_set_node = Node("ProjectPropertySet", name="工程属性") - graph.create(property_set_node) - graph.create(Relationship(root_node, "CONTAINS", property_set_node)) - logger.info("创建ProjectPropertySet节点: 工程属性") - - # 创建工程属性节点 - property_node = Node("ProjectProperty") - - # 添加所有属性 - for key, value in project_info.items(): - if value is not None: - property_node[key] = value - - graph.create(property_node) - logger.info("创建ProjectProperty节点") - - # 创建与属性集的关系 - graph.create(Relationship(property_set_node, "HAS_COMPONENT", property_node)) - logger.info("创建关系: 工程属性 HAS_COMPONENT ProjectProperty") - - -# 在main函数中添加对这些函数的调用 -def 
main(): - # 创建根节点 - root_node = create_root_node() - - # 读取JSON文件 - json_file_path = "dataset/json/主网预算/架空.json" - with open(json_file_path, "r", encoding="utf-8") as f: - data = json.load(f) - - # 先处理费用预览,创建CostSet节点 - process_cost_set(data, root_node) - - # 再处理项目划分,创建ProjectDivisionSet和ProjectDivisionItem节点,并建立USE关系 - process_project_division_set(data, root_node) - - # 处理取费表模板集 - process_fee_table_template_set(data, root_node) - - # 处理费用表集 - process_fee_schedule_set(data, root_node) - - # 处理工程属性集 - process_project_property_set(data, root_node) - - # 统计节点和关系数量 - count_nodes_and_relationships() - - logger.info("知识图谱构建完成") - - -# 添加统计节点和关系数量的函数 -def count_nodes_and_relationships(): - # 统计节点数量 - node_count_query = """ - MATCH (n) - RETURN labels(n) AS labels, count(*) AS count - """ - node_counts = graph.run(node_count_query).data() - logger.info("节点类型统计:") - for count_info in node_counts: - labels = count_info["labels"] - count = count_info["count"] - for label in labels: - logger.info(f" {label}: {count}个节点") - - # 统计关系数量 - rel_count_query = """ - MATCH ()-[r]->() - RETURN type(r) AS type, count(*) AS count - """ - rel_counts = graph.run(rel_count_query).data() - logger.info("关系类型统计:") - for count_info in rel_counts: - rel_type = count_info["type"] - count = count_info["count"] - logger.info(f" {rel_type}: {count}个关系") - - -if __name__ == "__main__": - main() From ddbe21b1ed045b184befe82057fc0b6e052180f4 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Tue, 24 Jun 2025 18:39:33 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build_kg.py | 1032 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1032 insertions(+) create mode 100644 build_kg.py diff --git a/build_kg.py b/build_kg.py new file mode 100644 index 0000000..45413e6 --- /dev/null +++ b/build_kg.py @@ -0,0 +1,1032 @@ +from py2neo import Graph, 
Node, Relationship, NodeMatcher +import json +import os +import logging + +# 设置日志 +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + +# 连接到Neo4j数据库 +uri = "bolt://172.20.0.145:7687" +user = "neo4j" +password = "password" + +try: + graph = Graph(uri, auth=(user, password)) + logger.info("成功连接到Neo4j数据库") +except Exception as e: + logger.error(f"连接Neo4j数据库失败: {e}") + exit(1) + +# 清空数据库 +try: + graph.run("MATCH (n) DETACH DELETE n") + logger.info("已清空数据库") +except Exception as e: + logger.error(f"清空数据库失败: {e}") + +# 删除所有约束 +try: + # 获取所有约束 + constraints = graph.run("SHOW CONSTRAINTS").data() + for constraint in constraints: + constraint_name = constraint.get("name") + if constraint_name: + graph.run(f"DROP CONSTRAINT {constraint_name}") + logger.info(f"已删除约束: {constraint_name}") +except Exception as e: + logger.warning(f"删除约束失败: {e}") + + +# 创建约束和索引以提高性能 - 现在不创建任何约束 +def create_constraints_and_indexes(): + # 不创建任何约束 + logger.info("跳过创建约束") + pass + + +# 创建根节点 +def create_root_node(): + root = Node("EngineeringData", name="工程") + graph.create(root) + logger.info("创建根节点: 工程") + return root + + +# 处理ProjectDivisionSet +def process_project_division_set(data, root_node): + # 根据您提供的JSON结构,正确访问projectDivision数据 + if "projectData" in data and "projectDivision" in data["projectData"]: + project_division = data["projectData"]["projectDivision"] + elif "projectDivision" in data: + project_division = data["projectDivision"] + else: + logger.warning("JSON中未找到projectDivision数据") + logger.info(f"JSON顶层键: {list(data.keys())}") + return + + logger.info(f"开始处理projectDivision,包含 {len(project_division)} 个顶级项目") + + # 创建新的ProjectDivisionSet节点 - 项目划分集 + division_set = Node("ProjectDivisionSet", name="项目划分集") + graph.create(division_set) + graph.create(Relationship(root_node, "CONTAINS", division_set)) + logger.info(f"创建ProjectDivisionSet节点: 项目划分集") + + # 处理ProjectDivisionTree + for first_level_name, 
def process_project_division_set(data, root_node):
    """Build the ProjectDivisionSet subtree under ``root_node``.

    ``projectDivision`` is read from ``data["projectData"]`` or, failing
    that, from the top level of ``data``. Each (first level, second level)
    pair becomes one ``ProjectDivisionTree`` node; list-valued second levels
    are handed item by item to ``process_project_division_item``.

    Args:
        data: Parsed JSON document (dict).
        root_node: Node that will CONTAIN the new ProjectDivisionSet node.
    """
    if "projectData" in data and "projectDivision" in data["projectData"]:
        project_division = data["projectData"]["projectDivision"]
    elif "projectDivision" in data:
        project_division = data["projectDivision"]
    else:
        logger.warning("JSON中未找到projectDivision数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info(f"开始处理projectDivision,包含 {len(project_division)} 个顶级项目")

    division_set = Node("ProjectDivisionSet", name="项目划分集")
    graph.create(division_set)
    graph.create(Relationship(root_node, "CONTAINS", division_set))
    logger.info("创建ProjectDivisionSet节点: 项目划分集")

    for first_level_name, first_level_content in project_division.items():
        if not isinstance(first_level_content, dict):
            logger.warning(f"第一层 {first_level_name} 的内容类型未知: {type(first_level_content)}")
            continue

        # The tree name is the second-level name followed by the first-level
        # name with the "工程" suffix/word removed.
        processed_first_level = first_level_name.replace("工程", "")
        has_guid = "GUID" in first_level_content
        guid = first_level_content["GUID"] if has_guid else None

        for second_level_name, second_level_content in first_level_content.items():
            if second_level_name == "GUID":
                # "GUID" is metadata of the first level (stored below as
                # first_level_GUID), not a second-level division; treating it
                # as one used to create a spurious ProjectDivisionTree node.
                continue

            if second_level_name == processed_first_level:
                final_name = second_level_name
            else:
                final_name = f"{second_level_name}{processed_first_level}"

            division_tree = Node("ProjectDivisionTree", name=final_name)
            # Keep the raw names so the merged name can be traced back.
            division_tree["original_first_level"] = first_level_name
            division_tree["original_second_level"] = second_level_name
            if has_guid:
                division_tree["first_level_GUID"] = guid

            graph.create(division_tree)
            graph.create(Relationship(division_set, "CONTAINS", division_tree))
            logger.info(f"创建ProjectDivisionTree节点: {final_name}")

            if guid:
                # Parameterised query: the GUID is data, never query text.
                cost_set_nodes = list(
                    graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid)
                )
                if cost_set_nodes:
                    cost_set_node = cost_set_nodes[0]["c"]
                    graph.create(Relationship(division_tree, "USE", cost_set_node))
                    logger.info(f"创建关系: {final_name} USE CostSet (GUID: {guid})")

            if isinstance(second_level_content, list):
                logger.info(f"ProjectDivisionTree {final_name} 包含 {len(second_level_content)} 个列表项")
                for item in second_level_content:
                    process_project_division_item(item, division_tree)
            else:
                logger.warning(f"ProjectDivisionTree {final_name} 的内容类型未知: {type(second_level_content)}")
def _flatten_division_item_value(value):
    """Return ``value`` in a form storable as a node property.

    Scalars are returned unchanged; lists and dicts are rendered as
    ";"-joined strings; any other type yields ``None`` (nothing to store).
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if isinstance(value, dict):
        # Only scalar members are kept; nested containers are dropped.
        return ";".join(f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types))
    if not isinstance(value, list):
        return None
    if all(isinstance(entry, scalar_types) for entry in value):
        return ";".join(str(entry) for entry in value)

    parts = []
    for entry in value:
        if not isinstance(entry, dict):
            parts.append(str(entry))
            continue
        # Prefer a recognisable name/identifier key ...
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in entry:
                parts.append(str(entry[name_key]))
                break
        else:
            # ... otherwise fall back to the first key/value pair, if any.
            if entry:
                first_key = next(iter(entry))
                parts.append(f"{first_key}:{entry[first_key]}")
    return ";".join(parts)


def process_project_division_item(item, parent_node):
    """Create a ProjectDivisionItem node for ``item`` and recurse into its children.

    The node is linked to ``parent_node`` with CONTAINS when the parent is a
    ProjectDivisionTree and with HAS_CHILD otherwise, receives a ``path``
    property derived from the parent, and gets a USE relationship to the
    CostSet node carrying the same GUID when one exists.

    Args:
        item: Dict describing the division item.
        parent_node: Node the new item is attached to.
    """
    guid = item.get("GUID", "")
    name = item.get("项目名称", "")

    if not guid and not name:
        logger.warning("ProjectDivisionItem缺少GUID和项目名称")
        return

    item_node = Node("ProjectDivisionItem", GUID=guid, name=name)
    parent_is_tree = isinstance(parent_node, Node) and "ProjectDivisionTree" in parent_node.labels

    # path: "<tree name>/<name>" directly under a tree, otherwise the parent's
    # path extended by this name (or just the name if the parent has no path).
    if parent_is_tree:
        item_node["path"] = f"{parent_node['name']}/{name}"
    else:
        parent_path = parent_node.get("path", "")
        item_node["path"] = f"{parent_path}/{name}" if parent_path else name
    logger.info(f"为ProjectDivisionItem {name} 设置path: {item_node['path']}")

    for key, value in item.items():
        if value is None or key in ("GUID", "项目名称", "children"):
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under "资源库名称" as a ";"-joined string. str() keeps a
            # non-string name from aborting the build with a TypeError.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            item_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}")
            continue
        try:
            flattened = _flatten_division_item_value(value)
        except Exception as e:
            # Best effort: a property that cannot be rendered is skipped.
            kind = "字典" if isinstance(value, dict) else "列表"
            logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")
            continue
        if flattened is not None:
            item_node[key] = flattened

    graph.create(item_node)
    logger.info(f"创建ProjectDivisionItem节点: {name} (GUID: {guid})")

    relation = "CONTAINS" if parent_is_tree else "HAS_CHILD"
    graph.create(Relationship(parent_node, relation, item_node))
    logger.info(f"创建关系: {parent_node['name']} {relation} {name}")

    if guid:
        # Parameterised query: the GUID is passed as data, not spliced in.
        cost_set_nodes = list(
            graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid)
        )
        if cost_set_nodes:
            graph.create(Relationship(item_node, "USE", cost_set_nodes[0]["c"]))
            logger.info(f"创建关系: {name} USE CostSet (GUID: {guid})")

    children = item.get("children")
    if children:
        logger.info(f"ProjectDivisionItem {name} 有 {len(children)} 个子项")
        for child in children:
            child_type = child.get("type", child.get("类型", ""))
            if child_type == "项目划分":
                process_project_division_item(child, item_node)
            elif child_type in ("8", "清单"):
                process_list_item(child, item_node)
            else:
                # Everything else is treated as ProjectQuantity (or subclass).
                process_project_quantity(child, item_node)
def _flatten_list_item_value(value):
    """Return ``value`` as a storable property (scalar or ";"-joined string), else ``None``."""
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if isinstance(value, dict):
        return ";".join(f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types))
    if not isinstance(value, list):
        return None
    if all(isinstance(entry, scalar_types) for entry in value):
        return ";".join(str(entry) for entry in value)

    parts = []
    for entry in value:
        if not isinstance(entry, dict):
            parts.append(str(entry))
            continue
        # Use the first recognised name/identifier key, else the first pair.
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in entry:
                parts.append(str(entry[name_key]))
                break
        else:
            if entry:
                first_key = next(iter(entry))
                parts.append(f"{first_key}:{entry[first_key]}")
    return ";".join(parts)


def process_list_item(list_item, parent_node):
    """Create a ``List`` node for ``list_item`` and process its children.

    The node is attached to ``parent_node`` with HAS_CHILD, gets a ``path``
    ending in "(清单)", and a USE relationship to the CostSet node with the
    same GUID when such a node exists.

    Args:
        list_item: Dict describing the list entry.
        parent_node: Node the new List node is attached to.
    """
    guid = list_item.get("GUID", "")
    list_name = list_item.get("清单名称")
    list_type = list_item.get("类型", "")

    list_node = Node("List", guid=guid, name=list_name, type=list_type)

    # Extend the parent's path; fall back to the parent's name without one.
    path_prefix = parent_node.get("path", "") or parent_node.get("name", "")
    list_node["path"] = f"{path_prefix}/{list_name}(清单)"
    logger.info(f"为List节点 {list_name} 设置path: {list_node['path']}")

    # Only the lower-case "guid" key is excluded here, so a "GUID" entry in
    # the source dict is additionally copied under its own name.
    for key, value in list_item.items():
        if value is None or key in ("清单名称", "类型", "guid", "children"):
            continue
        try:
            flattened = _flatten_list_item_value(value)
        except Exception as e:
            # Best effort: a property that cannot be rendered is skipped.
            kind = "字典" if isinstance(value, dict) else "列表"
            logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")
            continue
        if flattened is not None:
            list_node[key] = flattened

    graph.create(list_node)
    logger.info(f"创建List节点: {list_name} (类型: {list_type})")

    graph.create(Relationship(parent_node, "HAS_CHILD", list_node))
    logger.info(f"创建关系: {parent_node['name']} HAS_CHILD {list_name}")

    if guid:
        # Parameterised query: the GUID is passed as data, not spliced in.
        cost_set_nodes = list(
            graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid)
        )
        if cost_set_nodes:
            graph.create(Relationship(list_node, "USE", cost_set_nodes[0]["c"]))
            logger.info(f"创建关系: {list_name} USE CostSet (GUID: {guid})")

    children = list_item.get("children")
    if children:
        logger.info(f"List {list_name} 有 {len(children)} 个子项")
        for child in children:
            child_type = child.get("type", child.get("类型", ""))
            if child_type == "项目划分":
                process_project_division_item(child, list_node)
            elif child_type in ("8", "清单"):
                process_list_item(child, list_node)
            else:
                # Everything else is treated as ProjectQuantity (or subclass).
                process_project_quantity(child, list_node)
def _flatten_quantity_value(value):
    """Return ``value`` as a storable property (scalar or ";"-joined string), else ``None``."""
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if isinstance(value, dict):
        return ";".join(f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types))
    if not isinstance(value, list):
        return None
    if all(isinstance(entry, scalar_types) for entry in value):
        return ";".join(str(entry) for entry in value)

    parts = []
    for entry in value:
        if not isinstance(entry, dict):
            parts.append(str(entry))
            continue
        # Use the first recognised name/identifier key, else the first pair.
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in entry:
                parts.append(str(entry[name_key]))
                break
        else:
            if entry:
                first_key = next(iter(entry))
                parts.append(f"{first_key}:{entry[first_key]}")
    return ";".join(parts)


def process_project_quantity(quantity, parent_node):
    """Create a ProjectQuantity node (optionally sub-typed) and its components.

    The "类型" value selects an extra label: "0"/"定额" -> Quota,
    "1"/"主材" -> MainMaterial, "5"/"设备" -> Equipment. The node is linked
    to ``parent_node`` with HAS_COMPONENT. A non-empty "材机列表" is processed
    as materials/equipment; otherwise "children" are dispatched by type.

    Args:
        quantity: Dict describing the quantity entry.
        parent_node: Node the new quantity node is attached to.
    """
    quantity_type = quantity.get("类型", "")
    labels = ["ProjectQuantity"]
    type_name = "ProjectQuantity"

    # Both numeric codes (as strings) and textual type names are accepted.
    if quantity_type in ("0", "定额"):
        labels.append("Quota")
        type_name = "定额"
    elif quantity_type in ("1", "主材"):
        labels.append("MainMaterial")
        type_name = "主材"
    elif quantity_type in ("5", "设备"):
        labels.append("Equipment")
        type_name = "设备"

    quantity_id = quantity.get("id", "")
    quantity_name = quantity.get("项目名称", quantity.get("名称", ""))

    quantity_node = Node(*labels, id=quantity_id, name=quantity_name)

    # Extend the parent's path; fall back to the parent's name without one.
    path_prefix = parent_node.get("path", "") or parent_node.get("name", "")
    quantity_node["path"] = f"{path_prefix}/{quantity_name}({type_name})"
    logger.info(f"为ProjectQuantity节点 {quantity_name} 设置path: {quantity_node['path']}")

    for key, value in quantity.items():
        if value is None or key in ("id", "名称", "项目名称", "材机列表", "children"):
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under "资源库名称" as a ";"-joined string. str() keeps a
            # non-string name from aborting the build with a TypeError.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            quantity_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {quantity_node['资源库名称']}")
            continue
        try:
            flattened = _flatten_quantity_value(value)
        except Exception as e:
            # Best effort: a property that cannot be rendered is skipped.
            kind = "字典" if isinstance(value, dict) else "列表"
            logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")
            continue
        if flattened is not None:
            quantity_node[key] = flattened

    graph.create(quantity_node)
    logger.info(f"创建ProjectQuantity节点: {quantity_name} (id: {quantity_id}, 类型: {quantity_type})")

    graph.create(Relationship(parent_node, "HAS_COMPONENT", quantity_node))
    logger.info(f"创建关系: {parent_node['name']} HAS_COMPONENT {quantity_name}")

    # "材机列表" takes precedence; "children" is only consulted without it.
    materials = quantity.get("材机列表")
    children = quantity.get("children")
    if materials:
        logger.info(f"ProjectQuantity {quantity_name} 有 {len(materials)} 个材机项")
        for material in materials:
            process_material_or_equipment(material, quantity_node)
    elif children:
        logger.info(f"ProjectQuantity {quantity_name} 有 {len(children)} 个子项")
        for child in children:
            child_type = child.get("类型", child.get("type", ""))
            if child_type in ("人工", "材料", "机械", "2", "3", "4"):
                # Labour / material / machinery entries.
                process_material_or_equipment(child, quantity_node)
            elif child_type in ("1", "主材", "5", "设备", "0", "定额"):
                # Nested quantities are handled recursively.
                process_project_quantity(child, quantity_node)
            else:
                # Still skipped as before, but no longer silently.
                logger.warning(f"ProjectQuantity {quantity_name} 的子项类型未知,已跳过: {child_type}")
def _flatten_material_value(value):
    """Return ``value`` as a storable property (scalar or ";"-joined string), else ``None``."""
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if isinstance(value, dict):
        return ";".join(f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types))
    if not isinstance(value, list):
        return None
    if all(isinstance(entry, scalar_types) for entry in value):
        return ";".join(str(entry) for entry in value)

    parts = []
    for entry in value:
        if not isinstance(entry, dict):
            parts.append(str(entry))
            continue
        # Use the first recognised name/identifier key, else the first pair.
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in entry:
                parts.append(str(entry[name_key]))
                break
        else:
            if entry:
                first_key = next(iter(entry))
                parts.append(f"{first_key}:{entry[first_key]}")
    return ";".join(parts)


def process_material_or_equipment(material, parent_node):
    """Create a MaterialOrEquipment node and attach it to ``parent_node``.

    Entries with neither an id nor a name are skipped with a warning. A new
    node is always created (no lookup for an existing one); ``unique_id``
    combines the parent's id (or GUID) with the entry's own id.

    Args:
        material: Dict describing the labour/material/machinery entry.
        parent_node: Node that receives the OWNERSHIP relationship.
    """
    material_id = material.get("id", material.get("ID", ""))
    material_name = material.get("名称", "")
    material_type = material.get("类型", material.get("type", ""))

    if not material_id and not material_name:
        logger.warning("MaterialOrEquipment缺少id和名称")
        return

    parent_id = parent_node.get("id", parent_node.get("GUID", ""))
    unique_id = f"{parent_id}_{material_id}" if parent_id else material_id

    material_node = Node(
        "MaterialOrEquipment", id=material_id, unique_id=unique_id, name=material_name, type=material_type
    )

    for key, value in material.items():
        if value is None or key in ("id", "ID", "名称", "类型", "type"):
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under "资源库名称" as a ";"-joined string. str() keeps a
            # non-string name from aborting the build with a TypeError.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            material_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {material_node['资源库名称']}")
            continue
        try:
            flattened = _flatten_material_value(value)
        except Exception as e:
            # Best effort: a property that cannot be rendered is skipped.
            kind = "字典" if isinstance(value, dict) else "列表"
            logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")
            continue
        if flattened is not None:
            material_node[key] = flattened

    graph.create(material_node)
    logger.info(f"创建MaterialOrEquipment节点: {material_name} (id: {material_id}, 类型: {material_type})")

    graph.create(Relationship(parent_node, "OWNERSHIP", material_node))
    logger.info(f"创建关系: {parent_node['name']} OWNERSHIP {material_name}")
def process_cost_set(data, root_node):
    """Create the CostSet nodes described by ``expensePreview``.

    ``expensePreview`` is taken from ``data["projectData"]`` or, failing
    that, from the top level of ``data``. Only the "安装工程" -> "安装" branch
    is processed. ``root_node`` is accepted but not linked to anything here:
    the relationship from the root to the "安装工程" node is disabled.

    Args:
        data: Parsed JSON document (dict).
        root_node: Root node of the graph (currently unused).
    """
    project_data = data["projectData"] if "projectData" in data else {}
    if "expensePreview" in project_data:
        expense_preview = project_data["expensePreview"]
    elif "expensePreview" in data:
        expense_preview = data["expensePreview"]
    else:
        logger.warning("JSON中未找到expensePreview数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info("开始处理expensePreview")

    if "安装工程" not in expense_preview:
        return

    # Top-level "安装工程" cost set (intentionally left unattached to root).
    install_cost_set = Node("CostSet", name="安装工程")
    graph.create(install_cost_set)
    logger.info("创建CostSet节点: 安装工程")

    install_section = expense_preview["安装工程"]
    if "安装" not in install_section:
        return

    install_sub_cost_set = Node("CostSet", name="安装")
    graph.create(install_sub_cost_set)
    graph.create(Relationship(install_cost_set, "HAS_CHILD", install_sub_cost_set))
    logger.info("创建CostSet节点: 安装")

    for cost_set in install_section["安装"]:
        process_cost_set_recursive(cost_set, install_sub_cost_set)


def process_cost_set_recursive(cost_set, parent_node):
    """Create a CostSet node for ``cost_set`` and descend into its children.

    Children that carry a "GUID" key are nested cost sets; all others are
    passed to ``process_cost_item``. No relationship to ``parent_node`` is
    created (that step is disabled), so ``parent_node`` is unused.

    Args:
        cost_set: Dict describing the cost set.
        parent_node: Enclosing CostSet node (currently unused).
    """
    guid = cost_set.get("GUID", "")
    # The GUID doubles as the display name; fall back to the item's own name.
    name = guid or cost_set.get("项目名称", cost_set.get("name", "未命名CostSet"))

    cost_set_node = Node("CostSet", GUID=guid, name=name)

    # Copy every remaining non-null attribute verbatim.
    reserved_keys = ("GUID", "children", "项目名称", "name")
    for key, value in cost_set.items():
        if key in reserved_keys or value is None:
            continue
        cost_set_node[key] = value

    graph.create(cost_set_node)
    logger.info(f"创建CostSet节点: {name} (GUID: {guid})")

    children = cost_set.get("children")
    if not children:
        return

    logger.info(f"CostSet {name} 有 {len(children)} 个子项")
    for child in children:
        handler = process_cost_set_recursive if "GUID" in child else process_cost_item
        handler(child, cost_set_node)
def _flatten_cost_item_value(value):
    """Return ``value`` as a storable property (scalar or ";"-joined string), else ``None``."""
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if isinstance(value, dict):
        return ";".join(f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types))
    if not isinstance(value, list):
        return None
    if all(isinstance(entry, scalar_types) for entry in value):
        return ";".join(str(entry) for entry in value)

    parts = []
    for entry in value:
        if not isinstance(entry, dict):
            parts.append(str(entry))
            continue
        # Use the first recognised name/identifier key, else the first pair.
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in entry:
                parts.append(str(entry[name_key]))
                break
        else:
            if entry:
                first_key = next(iter(entry))
                parts.append(f"{first_key}:{entry[first_key]}")
    return ";".join(parts)


def process_cost_item(item, parent_node):
    """Create a CostItem node for ``item`` and attach it to ``parent_node``.

    The item's id is used as its name (falling back to "name" / "项目名称").
    A new node is always created; ``unique_id`` combines the parent's GUID
    with the item's id.

    Args:
        item: Dict describing the cost item.
        parent_node: CostSet node that CONTAINS the new item.
    """
    item_id = item.get("id", "")
    cost = item.get("cost", "")
    name = item_id if item_id else item.get("name", item.get("项目名称", "未命名CostItem"))

    parent_guid = parent_node.get("GUID", "")
    unique_id = f"{parent_guid}_{item_id}" if parent_guid else item_id

    item_node = Node("CostItem", id=item_id, unique_id=unique_id, cost=cost, name=name)

    for key, value in item.items():
        if value is None or key in ("id", "cost", "name", "项目名称"):
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under "资源库名称" as a ";"-joined string. str() keeps a
            # non-string name from aborting the build with a TypeError.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            item_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}")
            continue
        try:
            flattened = _flatten_cost_item_value(value)
        except Exception as e:
            # Best effort: a property that cannot be rendered is skipped.
            kind = "字典" if isinstance(value, dict) else "列表"
            logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")
            continue
        if flattened is not None:
            item_node[key] = flattened

    graph.create(item_node)
    logger.info(f"创建CostItem节点: {name} (id: {item_id}, unique_id: {unique_id})")

    graph.create(Relationship(parent_node, "CONTAINS", item_node))
    logger.info(f"创建关系: {parent_node['name']} CONTAINS {name}")
def establish_relationships():
    """Bulk-create USE relationships towards CostSet nodes.

    Two independent passes are run, each best effort (a failure is logged
    and the other pass still runs): ProjectDivisionItem nodes are matched to
    CostSet nodes on ``GUID``, ProjectQuantity nodes on ``id``.

    NOTE(review): the statements use CREATE, so running this in addition to
    the per-item USE creation done elsewhere in this file would presumably
    produce duplicate relationships — confirm before wiring it into main().
    """
    passes = (
        (
            "ProjectDivisionItem",
            """
            MATCH (pdi:ProjectDivisionItem), (cs:CostSet)
            WHERE pdi.GUID = cs.GUID AND pdi.GUID <> ""
            CREATE (pdi)-[:USE]->(cs)
            RETURN count(*) as count
            """,
        ),
        (
            "ProjectQuantity",
            """
            MATCH (pq:ProjectQuantity), (cs:CostSet)
            WHERE pq.id = cs.id AND pq.id <> ""
            CREATE (pq)-[:USE]->(cs)
            RETURN count(*) as count
            """,
        ),
    )
    for label, query in passes:
        try:
            count = graph.run(query).data()[0]["count"]
            logger.info(f"创建了 {count} 个 {label} USE CostSet 关系")
        except Exception as e:
            logger.error(f"创建{label}与CostSet关系失败: {e}")


def process_fee_table_template_set(data, root_node):
    """Create the FeeTableTemplateSet node and its template items.

    ``costSetting`` is read from ``data["projectData"]`` or, failing that,
    from the top level of ``data``. A single FeeTableTemplateSet node is
    created under ``root_node``; every entry of every template set's
    "tables" list is handed to ``process_fee_table_template_item``.

    Args:
        data: Parsed JSON document (dict).
        root_node: Node that will CONTAIN the FeeTableTemplateSet node.
    """
    if "projectData" in data and "costSetting" in data["projectData"]:
        cost_setting = data["projectData"]["costSetting"]
    elif "costSetting" in data:
        cost_setting = data["costSetting"]
    else:
        logger.warning("JSON中未找到costSetting数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info(f"开始处理costSetting,包含 {len(cost_setting)} 个取费表模板集")

    # One node represents all template sets; their tables hang directly off it.
    fee_template_set_node = Node("FeeTableTemplateSet", name="取费表模板集")
    graph.create(fee_template_set_node)
    graph.create(Relationship(root_node, "CONTAINS", fee_template_set_node))
    logger.info("创建FeeTableTemplateSet节点: 取费表模板集")

    for template_set_content in cost_setting.values():
        # Guard against non-dict entries: `"tables" in <str>` would be a
        # substring test and `"tables" in None` would raise TypeError.
        if not isinstance(template_set_content, dict):
            continue
        tables = template_set_content.get("tables")
        if isinstance(tables, list):
            for template_item in tables:
                process_fee_table_template_item(template_item, fee_template_set_node)
def process_fee_table_template_item(template_item, parent_node):
    """Create a FeeTableTemplateItem node and the fee entries below it.

    Items without a "name" are skipped with a warning.

    Args:
        template_item: Dict describing one fee table template.
        parent_node: Node that CONTAINS the new template item.
    """
    name = template_item.get("name", "")
    if not name:
        logger.warning("FeeTableTemplateItem缺少name")
        return

    outlay_id = template_item.get("OutlayID", "")
    template_item_node = Node(
        "FeeTableTemplateItem",
        name=name,
        outlayID=outlay_id,
        type=template_item.get("类型", ""),
        profession=template_item.get("专业", ""),
    )

    graph.create(template_item_node)
    graph.create(Relationship(parent_node, "CONTAINS", template_item_node))
    logger.info(f"创建FeeTableTemplateItem节点: {name} (OutlayID: {outlay_id})")

    # Fee entries are only processed when "children" is an actual list.
    fee_entries = template_item["children"] if "children" in template_item else None
    if isinstance(fee_entries, list):
        for fee_item in fee_entries:
            process_fee(fee_item, template_item_node)


def process_fee(fee_item, parent_node):
    """Create a FeeCollection node for ``fee_item`` and recurse into its children.

    Entries without "费用名称" are skipped with a warning. The node's ``path``
    continues the parent's path when the parent is itself a FeeCollection,
    and starts from the parent's name otherwise.

    Args:
        fee_item: Dict describing one fee entry.
        parent_node: FeeTableTemplateItem or FeeCollection node to attach to.
    """
    fee_name = fee_item.get("费用名称", "")
    if not fee_name:
        logger.warning("Fee缺少费用名称")
        return

    serial_number = fee_item.get("序号", "")
    code = fee_item.get("代码", "")
    fee_node = Node("FeeCollection", serialNumber=serial_number, name=fee_name, code=code)

    prefix_key = "path" if "FeeCollection" in parent_node.labels else "name"
    path_prefix = parent_node.get(prefix_key, "")
    fee_node["path"] = f"{path_prefix}/{fee_name}"
    logger.info(f"为FeeCollection节点 {fee_name} 设置path: {fee_node['path']}")

    # Optional attributes are stored only when they hold a truthy value.
    optional_fields = (("rate", "费率(%)"), ("base", "取费基数"), ("remark", "备注"))
    for prop_name, source_key in optional_fields:
        prop_value = fee_item.get(source_key, "")
        if prop_value:
            fee_node[prop_name] = prop_value

    graph.create(fee_node)
    graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node))
    logger.info(f"创建Fee节点: {fee_name} (序号: {serial_number}, 代码: {code})")

    sub_fees = fee_item["children"] if "children" in fee_item else None
    if isinstance(sub_fees, list):
        for child_fee in sub_fees:
            process_fee(child_fee, fee_node)
def process_fee_schedule_set(data, root_node):
    """Create the FeeScheduleSet node, its FeeScheduleItem nodes and their fees.

    ``projectCost`` is read from ``data["projectData"]`` or, failing that,
    from the top level of ``data``. Each key becomes a FeeScheduleItem; a
    list value is passed entry by entry to ``process_fee_item``, any other
    value type only triggers a warning.

    Args:
        data: Parsed JSON document (dict).
        root_node: Node that will CONTAIN the FeeScheduleSet node.
    """
    project_data = data["projectData"] if "projectData" in data else {}
    if "projectCost" in project_data:
        project_cost = project_data["projectCost"]
    elif "projectCost" in data:
        project_cost = data["projectCost"]
    else:
        logger.warning("JSON中未找到projectCost数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info(f"开始处理projectCost,包含 {len(project_cost)} 个费用表项")

    fee_schedule_set = Node("FeeScheduleSet", name="工程费用")
    graph.create(fee_schedule_set)
    graph.create(Relationship(root_node, "CONTAINS", fee_schedule_set))
    logger.info("创建FeeScheduleSet节点: 工程费用")

    for fee_table_name, fee_table_content in project_cost.items():
        fee_schedule_item = Node("FeeScheduleItem", name=fee_table_name)
        graph.create(fee_schedule_item)
        graph.create(Relationship(fee_schedule_set, "CONTAINS", fee_schedule_item))
        logger.info(f"创建FeeScheduleItem节点: {fee_table_name}")

        if not isinstance(fee_table_content, list):
            logger.warning(f"FeeScheduleItem {fee_table_name} 的内容类型未知: {type(fee_table_content)}")
            continue

        logger.info(f"FeeScheduleItem {fee_table_name} 包含 {len(fee_table_content)} 个费用项")
        for fee_item in fee_table_content:
            process_fee_item(fee_item, fee_schedule_item)
def process_fee_item(fee, parent_node):
    """Create a Fee node for ``fee`` and recurse into its child fees.

    Entries without "费用名称" are skipped with a warning. The node's ``path``
    continues the parent's path when the parent is itself a Fee, and starts
    from the parent's name otherwise.

    Args:
        fee: Dict describing one fee entry.
        parent_node: FeeScheduleItem or Fee node to attach to.
    """
    name = fee.get("费用名称", "")
    if not name:
        logger.warning("Fee缺少费用名称")
        return

    serial_number = fee.get("序号", "")
    fee_node = Node("Fee", serialNumber=serial_number, name=name, code=fee.get("代码", ""))

    prefix_key = "path" if "Fee" in parent_node.labels else "name"
    path_prefix = parent_node.get(prefix_key, "")
    fee_node["path"] = f"{path_prefix}/{name}"
    logger.info(f"为Fee节点 {name} 设置path: {fee_node['path']}")

    # Rate and amount are stored only when they hold a truthy value.
    rate = fee.get("费率(%)", "")
    if rate:
        fee_node["rate"] = rate
    amount = fee.get("金额", "")
    if amount:
        fee_node["amount"] = amount

    # Remaining scalar attributes are copied under their original keys.
    handled_keys = ("序号", "费用名称", "代码", "费率(%)", "金额", "children")
    for key, value in fee.items():
        if key in handled_keys or value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            fee_node[key] = value

    graph.create(fee_node)
    logger.info(f"创建Fee节点: {name} (序号: {serial_number})")

    graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node))

    sub_fees = fee["children"] if "children" in fee else None
    if sub_fees:
        logger.info(f"Fee {name} 有 {len(sub_fees)} 个子费用项")
        for child in sub_fees:
            process_fee_item(child, fee_node)


def process_project_property_set(data, root_node):
    """Create the ProjectPropertySet node and one ProjectProperty node.

    ``projectInfo`` is read from ``data["projectData"]`` or, failing that,
    from the top level of ``data``; every non-null entry becomes a property
    of the single ProjectProperty node.

    Args:
        data: Parsed JSON document (dict).
        root_node: Node that will CONTAIN the ProjectPropertySet node.
    """
    project_data = data["projectData"] if "projectData" in data else {}
    if "projectInfo" in project_data:
        project_info = project_data["projectInfo"]
    elif "projectInfo" in data:
        project_info = data["projectInfo"]
    else:
        logger.warning("JSON中未找到projectInfo数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info("开始处理projectInfo")

    property_set_node = Node("ProjectPropertySet", name="工程属性")
    graph.create(property_set_node)
    graph.create(Relationship(root_node, "CONTAINS", property_set_node))
    logger.info("创建ProjectPropertySet节点: 工程属性")

    property_node = Node("ProjectProperty")
    for key, value in project_info.items():
        if value is None:
            continue
        property_node[key] = value

    graph.create(property_node)
    logger.info("创建ProjectProperty节点")

    graph.create(Relationship(property_set_node, "HAS_COMPONENT", property_node))
    logger.info("创建关系: 工程属性 HAS_COMPONENT ProjectProperty")
def main(json_file_path="dataset/json/主网预算/架空.json"):
    """Build the knowledge graph from one JSON document.

    Args:
        json_file_path: Path of the UTF-8 JSON file to load. Defaults to the
            file the script was originally hard-wired to.
    """
    root_node = create_root_node()

    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Cost sets must exist first: the project-division pass looks them up by
    # GUID in order to create its USE relationships.
    process_cost_set(data, root_node)
    process_project_division_set(data, root_node)

    process_fee_table_template_set(data, root_node)
    process_fee_schedule_set(data, root_node)
    process_project_property_set(data, root_node)

    count_nodes_and_relationships()

    logger.info("知识图谱构建完成")


def count_nodes_and_relationships():
    """Log the number of nodes per label and of relationships per type."""
    # UNWIND yields one row per (node, label), so a label shared by nodes
    # with different label combinations is reported once with its full total
    # instead of once per combination with partial counts.
    node_count_query = """
    MATCH (n)
    UNWIND labels(n) AS label
    RETURN label, count(*) AS count
    """
    logger.info("节点类型统计:")
    for count_info in graph.run(node_count_query).data():
        logger.info(f" {count_info['label']}: {count_info['count']}个节点")

    rel_count_query = """
    MATCH ()-[r]->()
    RETURN type(r) AS type, count(*) AS count
    """
    logger.info("关系类型统计:")
    for count_info in graph.run(rel_count_query).data():
        logger.info(f" {count_info['type']}: {count_info['count']}个关系")


if __name__ == "__main__":
    main()