Files
chentianrui 9609bb67b4 上传文件
2025-08-01 15:31:56 +08:00

1068 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from py2neo import Graph, Node, Relationship, NodeMatcher
import json
import os
import logging
# Configure logging once for the whole import script.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Neo4j connection settings. The defaults are the values this script has always
# used; each one can be overridden through an environment variable so that
# credentials do not have to be edited in source.
uri = os.environ.get("NEO4J_URI", "bolt://172.20.0.145:7687")
user = os.environ.get("NEO4J_USER", "neo4j")
password = os.environ.get("NEO4J_PASSWORD", "password")

try:
    graph = Graph(uri, auth=(user, password))
    logger.info("成功连接到Neo4j数据库")
except Exception as e:
    logger.error(f"连接Neo4j数据库失败: {e}")
    # Raise SystemExit directly: the bare exit() helper is installed by the
    # ``site`` module and is not available in every interpreter setup.
    raise SystemExit(1) from e

# Wipe every node and relationship so each run rebuilds the graph from scratch.
# A failure here is logged and the script carries on.
try:
    graph.run("MATCH (n) DETACH DELETE n")
    logger.info("已清空数据库")
except Exception as e:
    logger.error(f"清空数据库失败: {e}")

# Drop all existing constraints (best effort: a failure is only logged).
try:
    constraints = graph.run("SHOW CONSTRAINTS").data()
    for constraint in constraints:
        constraint_name = constraint.get("name")
        if constraint_name:
            # Constraint names cannot be passed as query parameters, so the name
            # is backtick-quoted (embedded backticks doubled) to stay valid
            # Cypher even when it contains spaces or special characters.
            quoted_name = "`" + constraint_name.replace("`", "``") + "`"
            graph.run(f"DROP CONSTRAINT {quoted_name}")
            logger.info(f"已删除约束: {constraint_name}")
except Exception as e:
    logger.warning(f"删除约束失败: {e}")
def create_constraints_and_indexes():
    """Placeholder for schema setup.

    No constraint or index is created at present; the function only logs
    that constraint creation is being skipped.
    """
    logger.info("跳过创建约束")
def create_root_node():
    """Create and persist the top-level ``EngineeringData`` node.

    Returns:
        The newly created node, whose ``name`` property is "工程".
    """
    engineering_root = Node("EngineeringData", name="工程")
    graph.create(engineering_root)
    logger.info("创建根节点: 工程")
    return engineering_root
def process_project_division_set(data, root_node):
    """Build the ProjectDivisionSet subtree from the ``projectDivision`` section.

    ``projectDivision`` is read from ``data["projectData"]`` when present there,
    otherwise from the top level of ``data``. One ``ProjectDivisionSet`` node is
    attached to ``root_node``; below it, one ``ProjectDivisionTree`` node is
    created per (first level, second level) pair. The tree node's name is the
    second-level name followed by the first-level name with "工程" removed,
    unless the two are already equal. List entries under a second level are
    handed to ``process_project_division_item``.

    Args:
        data: Parsed JSON document (a dict).
        root_node: Node that the ProjectDivisionSet is attached to.
    """
    if "projectData" in data and "projectDivision" in data["projectData"]:
        project_division = data["projectData"]["projectDivision"]
    elif "projectDivision" in data:
        project_division = data["projectDivision"]
    else:
        logger.warning("JSON中未找到projectDivision数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return

    logger.info(f"开始处理projectDivision,包含 {len(project_division)} 个顶级项目")

    division_set = Node("ProjectDivisionSet", name="项目划分集")
    graph.create(division_set)
    graph.create(Relationship(root_node, "CONTAINS", division_set))
    logger.info("创建ProjectDivisionSet节点: 项目划分集")

    for first_level_name, first_level_content in project_division.items():
        if not isinstance(first_level_content, dict):
            logger.warning(f"第一层 {first_level_name} 的内容类型未知: {type(first_level_content)}")
            continue

        # Strip "工程" from the first-level name before combining it with the
        # second-level name.
        processed_first_level = first_level_name.replace("工程", "")

        has_guid = "GUID" in first_level_content
        guid = first_level_content["GUID"] if has_guid else None

        # The GUID belongs to the first level, so the matching CostSet is the
        # same for every second-level entry: look it up once. The value is
        # passed as a query parameter rather than spliced into the Cypher text.
        cost_set_node = None
        if guid:
            matches = list(graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid))
            if matches:
                cost_set_node = matches[0]["c"]

        for second_level_name, second_level_content in first_level_content.items():
            if second_level_name == "GUID":
                # "GUID" is metadata of the first level, not a second-level
                # division; do not turn it into a tree node.
                continue

            if second_level_name == processed_first_level:
                final_name = second_level_name
            else:
                final_name = f"{second_level_name}{processed_first_level}"

            division_tree = Node("ProjectDivisionTree", name=final_name)
            # Keep the untouched source names for traceability.
            division_tree["original_first_level"] = first_level_name
            division_tree["original_second_level"] = second_level_name
            if has_guid:
                division_tree["first_level_GUID"] = guid

            graph.create(division_tree)
            graph.create(Relationship(division_set, "CONTAINS", division_tree))
            logger.info(f"创建ProjectDivisionTree节点: {final_name}")

            if cost_set_node is not None:
                graph.create(Relationship(division_tree, "USE", cost_set_node))
                logger.info(f"创建关系: {final_name} USE CostSet (GUID: {guid})")

            if isinstance(second_level_content, list):
                logger.info(f"ProjectDivisionTree {final_name} 包含 {len(second_level_content)} 个列表项")
                for item in second_level_content:
                    process_project_division_item(item, division_tree)
            else:
                logger.warning(f"ProjectDivisionTree {final_name} 的内容类型未知: {type(second_level_content)}")
def _division_item_flatten_value(value):
    """Flatten a list or dict JSON value into one ";"-separated string.

    List elements that are not dicts are stringified. A dict element
    contributes the value of its first identifying key ("名称", "name", "标识",
    "id", "ID"), or else its first ``key:value`` pair; an empty dict
    contributes nothing. A dict *value* contributes ``key:value`` for every
    entry whose value is a str, int, float or bool.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, dict):
        parts = [f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types)]
        return ";".join(parts)

    parts = []
    for element in value:
        if not isinstance(element, dict):
            parts.append(str(element))
            continue
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in element:
                parts.append(str(element[name_key]))
                break
        else:
            if element:
                first_key = next(iter(element))
                parts.append(f"{first_key}:{element[first_key]}")
    return ";".join(parts)


def process_project_division_item(item, parent_node):
    """Create a ``ProjectDivisionItem`` node for ``item`` and recurse into its children.

    The node receives a ``path`` property: "<tree name>/<name>" when the parent
    is a ``ProjectDivisionTree``, otherwise "<parent path>/<name>" (or just the
    name when the parent has no path). It is linked to the parent with
    ``CONTAINS`` (tree parent) or ``HAS_CHILD`` (any other parent), and with
    ``USE`` to the first ``CostSet`` node that has the same GUID, if any.

    Children are dispatched on their "type"/"类型" value: "项目划分" recurses
    into this function, "8"/"清单" goes to ``process_list_item`` and everything
    else goes to ``process_project_quantity``.

    Args:
        item: Dict describing the division item.
        parent_node: The ProjectDivisionTree or ProjectDivisionItem node above it.
    """
    guid = item.get("GUID", "")
    name = item.get("项目名称", "")
    if not guid and not name:
        logger.warning("ProjectDivisionItem缺少GUID和项目名称")
        return

    item_node = Node("ProjectDivisionItem", GUID=guid, name=name)

    parent_is_tree = isinstance(parent_node, Node) and "ProjectDivisionTree" in parent_node.labels
    if parent_is_tree:
        item_node["path"] = f"{parent_node['name']}/{name}"
    else:
        parent_path = parent_node.get("path", "")
        item_node["path"] = f"{parent_path}/{name}" if parent_path else name
    logger.info(f"为ProjectDivisionItem {name} 设置path: {item_node['path']}")

    # Copy the remaining attributes, flattening anything that is not a scalar.
    for key, value in item.items():
        if key in ("GUID", "项目名称", "children") or value is None:
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under a different key, as a ";"-separated list of names.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            item_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}")
        elif isinstance(value, (str, int, float, bool)):
            item_node[key] = value
        elif isinstance(value, (list, dict)):
            try:
                item_node[key] = _division_item_flatten_value(value)
            except Exception as e:
                kind = "列表" if isinstance(value, list) else "字典"
                logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")

    graph.create(item_node)
    logger.info(f"创建ProjectDivisionItem节点: {name} (GUID: {guid})")

    relation_type = "CONTAINS" if parent_is_tree else "HAS_CHILD"
    graph.create(Relationship(parent_node, relation_type, item_node))
    logger.info(f"创建关系: {parent_node['name']} {relation_type} {name}")

    if guid:
        # The GUID is passed as a query parameter instead of being spliced into
        # the Cypher text, so quotes in the value cannot break the query.
        matches = list(graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid))
        if matches:
            graph.create(Relationship(item_node, "USE", matches[0]["c"]))
            logger.info(f"创建关系: {name} USE CostSet (GUID: {guid})")

    children = item.get("children")
    if children:
        logger.info(f"ProjectDivisionItem {name} 包含 {len(children)} 个子项")
        for child in children:
            child_type = child.get("type", child.get("类型", ""))
            if child_type == "项目划分":
                process_project_division_item(child, item_node)
            elif child_type in ("8", "清单"):
                process_list_item(child, item_node)
            else:
                process_project_quantity(child, item_node)
def _list_item_flatten_value(value):
    """Flatten a list or dict JSON value into one ";"-separated string.

    List elements that are not dicts are stringified. A dict element
    contributes the value of its first identifying key ("名称", "name", "标识",
    "id", "ID"), or else its first ``key:value`` pair; an empty dict
    contributes nothing. A dict *value* contributes ``key:value`` for every
    entry whose value is a str, int, float or bool.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, dict):
        parts = [f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types)]
        return ";".join(parts)

    parts = []
    for element in value:
        if not isinstance(element, dict):
            parts.append(str(element))
            continue
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in element:
                parts.append(str(element[name_key]))
                break
        else:
            if element:
                first_key = next(iter(element))
                parts.append(f"{first_key}:{element[first_key]}")
    return ";".join(parts)


def process_list_item(list_item, parent_node):
    """Create a ``List`` (bill-of-quantities) node for ``list_item`` and recurse into its children.

    The node gets ``guid``, ``name`` (from "清单名称") and ``type`` (from "类型")
    plus a ``path`` of the form "<parent path or name>/<name>(清单)". It is
    linked to the parent with ``HAS_CHILD`` and, when a GUID is present, with
    ``USE`` to the first ``CostSet`` node carrying the same GUID.

    Children are dispatched on their "type"/"类型" value: "项目划分" goes to
    ``process_project_division_item``, "8"/"清单" recurses into this function
    and everything else goes to ``process_project_quantity``.

    Args:
        list_item: Dict describing the list entry.
        parent_node: Node the new List node is attached to.
    """
    guid = list_item.get("GUID", "")
    list_name = list_item.get("清单名称")
    list_type = list_item.get("类型", "")

    list_node = Node("List", guid=guid, name=list_name, type=list_type)

    # Prefer the parent's path; fall back to its name when it has none.
    path_prefix = parent_node.get("path", "") or parent_node.get("name", "")
    list_node["path"] = f"{path_prefix}/{list_name}(清单)"
    logger.info(f"为List节点 {list_name} 设置path: {list_node['path']}")

    # Copy the remaining attributes, flattening anything that is not a scalar.
    # NOTE: only lowercase "guid" is excluded here, so an upper-case "GUID" key
    # in the input is also copied onto the node as its own property.
    for key, value in list_item.items():
        if key in ("清单名称", "类型", "guid", "children") or value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            list_node[key] = value
        elif isinstance(value, (list, dict)):
            try:
                list_node[key] = _list_item_flatten_value(value)
            except Exception as e:
                kind = "列表" if isinstance(value, list) else "字典"
                logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")

    graph.create(list_node)
    logger.info(f"创建List节点: {list_name} (类型: {list_type})")

    graph.create(Relationship(parent_node, "HAS_CHILD", list_node))
    logger.info(f"创建关系: {parent_node['name']} HAS_CHILD {list_name}")

    if guid:
        # The GUID is passed as a query parameter instead of being spliced into
        # the Cypher text, so quotes in the value cannot break the query.
        matches = list(graph.run("MATCH (c:CostSet) WHERE c.GUID = $guid RETURN c", guid=guid))
        if matches:
            graph.create(Relationship(list_node, "USE", matches[0]["c"]))
            logger.info(f"创建关系: {list_name} USE CostSet (GUID: {guid})")

    children = list_item.get("children")
    if children:
        logger.info(f"List {list_name} 包含 {len(children)} 个子项")
        for child in children:
            child_type = child.get("type", child.get("类型", ""))
            if child_type == "项目划分":
                process_project_division_item(child, list_node)
            elif child_type in ("8", "清单"):
                process_list_item(child, list_node)
            else:
                process_project_quantity(child, list_node)
def _quantity_flatten_value(value):
    """Flatten a list or dict JSON value into one ";"-separated string.

    List elements that are not dicts are stringified. A dict element
    contributes the value of its first identifying key ("名称", "name", "标识",
    "id", "ID"), or else its first ``key:value`` pair; an empty dict
    contributes nothing. A dict *value* contributes ``key:value`` for every
    entry whose value is a str, int, float or bool.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, dict):
        parts = [f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types)]
        return ";".join(parts)

    parts = []
    for element in value:
        if not isinstance(element, dict):
            parts.append(str(element))
            continue
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in element:
                parts.append(str(element[name_key]))
                break
        else:
            if element:
                first_key = next(iter(element))
                parts.append(f"{first_key}:{element[first_key]}")
    return ";".join(parts)


def process_project_quantity(quantity, parent_node):
    """Create a ``ProjectQuantity`` node (optionally sub-typed) and process its contents.

    The "类型" value selects an extra label: "0"/"定额" -> ``Quota``,
    "1"/"主材" -> ``MainMaterial``, "5"/"设备" -> ``Equipment``; any other value
    leaves the node as a plain ``ProjectQuantity``. The node gets a ``path`` of
    the form "<parent path or name>/<name>(<type name>)" and is linked to the
    parent with ``HAS_COMPONENT``.

    If "材机列表" is non-empty, each entry goes to
    ``process_material_or_equipment``. Otherwise "children" are dispatched by
    type: labour/material/machine entries ("人工", "材料", "机械", "2", "3", "4")
    go to ``process_material_or_equipment``; quota/main-material/equipment
    entries recurse into this function; any other child is skipped.

    Args:
        quantity: Dict describing the quantity entry.
        parent_node: Node the new quantity node is attached to.
    """
    quantity_type = quantity.get("类型", "")
    labels = ["ProjectQuantity"]
    type_name = "ProjectQuantity"
    # Both the numeric code and the textual form of each type are accepted.
    if quantity_type in ("0", "定额"):
        labels.append("Quota")
        type_name = "定额"
    elif quantity_type in ("1", "主材"):
        labels.append("MainMaterial")
        type_name = "主材"
    elif quantity_type in ("5", "设备"):
        labels.append("Equipment")
        type_name = "设备"

    quantity_id = quantity.get("id", "")
    quantity_name = quantity.get("项目名称", quantity.get("名称", ""))
    quantity_node = Node(*labels, id=quantity_id, name=quantity_name)

    # Prefer the parent's path; fall back to its name when it has none.
    path_prefix = parent_node.get("path", "") or parent_node.get("name", "")
    quantity_node["path"] = f"{path_prefix}/{quantity_name}({type_name})"
    logger.info(f"为ProjectQuantity节点 {quantity_name} 设置path: {quantity_node['path']}")

    # Copy the remaining attributes, flattening anything that is not a scalar.
    for key, value in quantity.items():
        if key in ("id", "名称", "项目名称", "材机列表", "children") or value is None:
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under a different key, as a ";"-separated list of names.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            quantity_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {quantity_node['资源库名称']}")
        elif isinstance(value, (str, int, float, bool)):
            quantity_node[key] = value
        elif isinstance(value, (list, dict)):
            try:
                quantity_node[key] = _quantity_flatten_value(value)
            except Exception as e:
                kind = "列表" if isinstance(value, list) else "字典"
                logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")

    graph.create(quantity_node)
    logger.info(f"创建ProjectQuantity节点: {quantity_name} (id: {quantity_id}, 类型: {quantity_type})")

    graph.create(Relationship(parent_node, "HAS_COMPONENT", quantity_node))
    logger.info(f"创建关系: {parent_node['name']} HAS_COMPONENT {quantity_name}")

    # "材机列表" takes precedence; "children" is only consulted without it.
    materials = quantity.get("材机列表")
    children = quantity.get("children")
    if materials:
        logger.info(f"ProjectQuantity {quantity_name} 包含 {len(materials)} 个材机项")
        for material in materials:
            process_material_or_equipment(material, quantity_node)
    elif children:
        logger.info(f"ProjectQuantity {quantity_name} 包含 {len(children)} 个子项")
        for child in children:
            child_type = child.get("类型", child.get("type", ""))
            if child_type in ("人工", "材料", "机械", "2", "3", "4"):
                process_material_or_equipment(child, quantity_node)
            elif child_type in ("1", "主材", "5", "设备", "0", "定额"):
                process_project_quantity(child, quantity_node)
def _material_flatten_value(value):
    """Flatten a list or dict JSON value into one ";"-separated string.

    List elements that are not dicts are stringified. A dict element
    contributes the value of its first identifying key ("名称", "name", "标识",
    "id", "ID"), or else its first ``key:value`` pair; an empty dict
    contributes nothing. A dict *value* contributes ``key:value`` for every
    entry whose value is a str, int, float or bool.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, dict):
        parts = [f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types)]
        return ";".join(parts)

    parts = []
    for element in value:
        if not isinstance(element, dict):
            parts.append(str(element))
            continue
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in element:
                parts.append(str(element[name_key]))
                break
        else:
            if element:
                first_key = next(iter(element))
                parts.append(f"{first_key}:{element[first_key]}")
    return ";".join(parts)


def process_material_or_equipment(material, parent_node):
    """Create a ``MaterialOrEquipment`` node for ``material`` under ``parent_node``.

    Entries that have neither an id ("id"/"ID") nor a name ("名称") are skipped
    with a warning. A new node is always created (no lookup for an existing
    one); its ``unique_id`` is "<parent id or GUID>_<id>", or just the id when
    the parent has neither. The node is linked to the parent with ``OWNERSHIP``.

    Args:
        material: Dict describing the labour/material/machine entry.
        parent_node: Node that owns the new entry.
    """
    material_id = material.get("id", material.get("ID", ""))
    material_name = material.get("名称", "")
    material_type = material.get("类型", material.get("type", ""))
    if not material_id and not material_name:
        logger.warning("MaterialOrEquipment缺少id和名称")
        return

    # Scope the identifier by the parent, since the id alone is not used as a
    # unique key here.
    parent_id = parent_node.get("id", parent_node.get("GUID", ""))
    unique_id = f"{parent_id}_{material_id}" if parent_id else material_id

    material_node = Node(
        "MaterialOrEquipment", id=material_id, unique_id=unique_id, name=material_name, type=material_type
    )

    # Copy the remaining attributes, flattening anything that is not a scalar.
    for key, value in material.items():
        if key in ("id", "ID", "名称", "类型", "type") or value is None:
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under a different key, as a ";"-separated list of names.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            material_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {material_node['资源库名称']}")
        elif isinstance(value, (str, int, float, bool)):
            material_node[key] = value
        elif isinstance(value, (list, dict)):
            try:
                material_node[key] = _material_flatten_value(value)
            except Exception as e:
                kind = "列表" if isinstance(value, list) else "字典"
                logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")

    graph.create(material_node)
    logger.info(f"创建MaterialOrEquipment节点: {material_name} (id: {material_id}, 类型: {material_type})")

    graph.create(Relationship(parent_node, "OWNERSHIP", material_node))
    logger.info(f"创建关系: {parent_node['name']} OWNERSHIP {material_name}")
def process_cost_set(data, root_node):
    """Create the CostSet nodes described by the ``expensePreview`` section.

    ``expensePreview`` is taken from ``data["projectData"]`` when present there,
    otherwise from the top level of ``data``. Only the "安装工程" entry and its
    "安装" sub-entry are handled: each becomes a ``CostSet`` node (the second a
    ``HAS_CHILD`` of the first) and every element under "安装" is passed to
    ``process_cost_set_recursive``.

    Args:
        data: Parsed JSON document (a dict).
        root_node: Currently unused; the top CostSet node is not linked to it.
    """
    if "projectData" in data and "expensePreview" in data["projectData"]:
        container = data["projectData"]
    elif "expensePreview" in data:
        container = data
    else:
        logger.warning("JSON中未找到expensePreview数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return
    expense_preview = container["expensePreview"]

    logger.info("开始处理expensePreview")

    if "安装工程" not in expense_preview:
        return

    install_cost_set = Node("CostSet", name="安装工程")
    graph.create(install_cost_set)
    logger.info("创建CostSet节点: 安装工程")

    install_section = expense_preview["安装工程"]
    if "安装" not in install_section:
        return

    install_sub_cost_set = Node("CostSet", name="安装")
    graph.create(install_sub_cost_set)
    graph.create(Relationship(install_cost_set, "HAS_CHILD", install_sub_cost_set))
    logger.info("创建CostSet节点: 安装")

    for entry in install_section["安装"]:
        process_cost_set_recursive(entry, install_sub_cost_set)
def _cost_set_property_value(value):
    """Return ``value`` in a form that can be stored as a node property.

    Scalars and lists of same-typed scalars are returned unchanged. Anything
    else (dicts, nested or mixed lists) is serialised to a JSON string, since
    a nested structure cannot be stored as a property value.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, scalar_types):
        return value
    if (
        isinstance(value, list)
        and all(isinstance(x, scalar_types) for x in value)
        and len({type(x) for x in value}) <= 1
    ):
        return value
    return json.dumps(value, ensure_ascii=False)


def process_cost_set_recursive(cost_set, parent_node):
    """Create a ``CostSet`` node for ``cost_set`` and walk its children.

    The node's name is its GUID; when the GUID is empty the name falls back to
    "项目名称", then "name", then "未命名CostSet". Children that contain a
    "GUID" key are treated as nested cost sets (recursion); all other children
    are passed to ``process_cost_item``.

    Args:
        cost_set: Dict describing the cost set.
        parent_node: The enclosing CostSet node. It is only passed along; no
            relationship to it is created here.
    """
    guid = cost_set.get("GUID", "")
    name = guid if guid else cost_set.get("项目名称", cost_set.get("name", "未命名CostSet"))

    cost_set_node = Node("CostSet", GUID=guid, name=name)
    for key, value in cost_set.items():
        if key in ("GUID", "children", "项目名称", "name") or value is None:
            continue
        cost_set_node[key] = _cost_set_property_value(value)

    graph.create(cost_set_node)
    logger.info(f"创建CostSet节点: {name} (GUID: {guid})")

    children = cost_set.get("children")
    if children:
        logger.info(f"CostSet {name} 包含 {len(children)} 个子项")
        for child in children:
            if "GUID" in child:
                process_cost_set_recursive(child, cost_set_node)
            else:
                process_cost_item(child, cost_set_node)
def _cost_item_flatten_value(value):
    """Flatten a list or dict JSON value into one ";"-separated string.

    List elements that are not dicts are stringified. A dict element
    contributes the value of its first identifying key ("名称", "name", "标识",
    "id", "ID"), or else its first ``key:value`` pair; an empty dict
    contributes nothing. A dict *value* contributes ``key:value`` for every
    entry whose value is a str, int, float or bool.
    """
    scalar_types = (str, int, float, bool)
    if isinstance(value, dict):
        parts = [f"{k}:{v}" for k, v in value.items() if isinstance(v, scalar_types)]
        return ";".join(parts)

    parts = []
    for element in value:
        if not isinstance(element, dict):
            parts.append(str(element))
            continue
        for name_key in ("名称", "name", "标识", "id", "ID"):
            if name_key in element:
                parts.append(str(element[name_key]))
                break
        else:
            if element:
                first_key = next(iter(element))
                parts.append(f"{first_key}:{element[first_key]}")
    return ";".join(parts)


def process_cost_item(item, parent_node):
    """Create a ``CostItem`` node for ``item`` and attach it to its CostSet.

    The node's name is its id; when the id is empty the name falls back to
    "name", then "项目名称", then "未命名CostItem". A new node is always
    created (no lookup for an existing one); its ``unique_id`` is
    "<parent GUID>_<id>", or just the id when the parent has no GUID. The node
    is linked to the parent with ``CONTAINS``.

    Args:
        item: Dict describing the cost item.
        parent_node: The CostSet node that contains the item.
    """
    item_id = item.get("id", "")
    cost = item.get("cost", "")
    name = item_id if item_id else item.get("name", item.get("项目名称", "未命名CostItem"))

    # Scope the identifier by the parent cost set.
    parent_guid = parent_node.get("GUID", "")
    unique_id = f"{parent_guid}_{item_id}" if parent_guid else item_id

    item_node = Node("CostItem", id=item_id, unique_id=unique_id, cost=cost, name=name)

    # Copy the remaining attributes, flattening anything that is not a scalar.
    for key, value in item.items():
        if key in ("id", "cost", "name", "项目名称") or value is None:
            continue
        if key == "资源库列表" and isinstance(value, list):
            # Stored under a different key, as a ";"-separated list of names.
            resource_names = [
                str(resource["资源库名称"])
                for resource in value
                if isinstance(resource, dict) and "资源库名称" in resource
            ]
            item_node["资源库名称"] = ";".join(resource_names)
            logger.info(f"将资源库列表转换为字符串: {item_node['资源库名称']}")
        elif isinstance(value, (str, int, float, bool)):
            item_node[key] = value
        elif isinstance(value, (list, dict)):
            try:
                item_node[key] = _cost_item_flatten_value(value)
            except Exception as e:
                kind = "列表" if isinstance(value, list) else "字典"
                logger.warning(f"无法将{kind}属性 {key} 转换为字符串: {e}")

    graph.create(item_node)
    logger.info(f"创建CostItem节点: {name} (id: {item_id}, unique_id: {unique_id})")

    graph.create(Relationship(parent_node, "CONTAINS", item_node))
    logger.info(f"创建关系: {parent_node['name']} CONTAINS {name}")
def establish_relationships():
    """Link division items and quantities to the CostSet nodes sharing their identifier.

    Two graph-wide passes are run: ``ProjectDivisionItem`` nodes are matched to
    ``CostSet`` nodes on a non-empty ``GUID``, and ``ProjectQuantity`` nodes on
    a non-empty ``id``. Each matched pair gets a ``USE`` relationship. A
    failure in one pass is logged and does not stop the other.
    """
    # MERGE rather than CREATE: process_project_division_item already creates
    # USE relationships while nodes are being built, so CREATE would add a
    # second, duplicate edge for those pairs. The returned count is the number
    # of matched pairs.
    link_queries = (
        (
            "ProjectDivisionItem",
            """
            MATCH (pdi:ProjectDivisionItem), (cs:CostSet)
            WHERE pdi.GUID = cs.GUID AND pdi.GUID <> ""
            MERGE (pdi)-[:USE]->(cs)
            RETURN count(*) as count
            """,
        ),
        (
            "ProjectQuantity",
            """
            MATCH (pq:ProjectQuantity), (cs:CostSet)
            WHERE pq.id = cs.id AND pq.id <> ""
            MERGE (pq)-[:USE]->(cs)
            RETURN count(*) as count
            """,
        ),
    )
    for label, query in link_queries:
        try:
            count = graph.run(query).data()[0]["count"]
            logger.info(f"创建了 {count} 个 {label} USE CostSet 关系")
        except Exception as e:
            logger.error(f"创建{label}与CostSet关系失败: {e}")
def process_fee_table_template_set(data, root_node):
    """Create the single FeeTableTemplateSet node and its template items.

    ``costSetting`` is taken from ``data["projectData"]`` when present there,
    otherwise from the top level of ``data``. One ``FeeTableTemplateSet`` node
    is attached to ``root_node``; every element of each entry's "tables" list
    is passed to ``process_fee_table_template_item`` with that node as parent.

    Args:
        data: Parsed JSON document (a dict).
        root_node: Node that the FeeTableTemplateSet is attached to.
    """
    if "projectData" in data and "costSetting" in data["projectData"]:
        container = data["projectData"]
    elif "costSetting" in data:
        container = data
    else:
        logger.warning("JSON中未找到costSetting数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return
    cost_setting = container["costSetting"]

    logger.info(f"开始处理costSetting,包含 {len(cost_setting)} 个取费表模板集")

    # All template items hang off one shared set node, whatever entry they
    # come from.
    fee_template_set_node = Node("FeeTableTemplateSet", name="取费表模板集")
    graph.create(fee_template_set_node)
    graph.create(Relationship(root_node, "CONTAINS", fee_template_set_node))
    logger.info("创建FeeTableTemplateSet节点: 取费表模板集")

    for template_set_content in cost_setting.values():
        if "tables" not in template_set_content:
            continue
        tables = template_set_content["tables"]
        if not isinstance(tables, list):
            continue
        for template_item in tables:
            process_fee_table_template_item(template_item, fee_template_set_node)
def process_fee_table_template_item(template_item, parent_node):
    """Create a ``FeeTableTemplateItem`` node and process its fee entries.

    Items without a "name" are skipped with a warning. The node stores
    ``name``, ``outlayID`` (from "OutlayID"), ``type`` (from "类型") and
    ``profession`` (from "专业") and is linked to the parent with ``CONTAINS``.
    Each element of a list-valued "children" is passed to ``process_fee``.

    Args:
        template_item: Dict describing the template item.
        parent_node: The FeeTableTemplateSet node.
    """
    name = template_item.get("name", "")
    if not name:
        logger.warning("FeeTableTemplateItem缺少name")
        return

    outlay_id = template_item.get("OutlayID", "")
    template_item_node = Node(
        "FeeTableTemplateItem",
        name=name,
        outlayID=outlay_id,
        type=template_item.get("类型", ""),
        profession=template_item.get("专业", ""),
    )
    graph.create(template_item_node)
    graph.create(Relationship(parent_node, "CONTAINS", template_item_node))
    logger.info(f"创建FeeTableTemplateItem节点: {name} (OutlayID: {outlay_id})")

    fee_entries = template_item["children"] if "children" in template_item else None
    if isinstance(fee_entries, list):
        for fee_item in fee_entries:
            process_fee(fee_item, template_item_node)
def process_fee(fee_item, parent_node):
    """Create a ``FeeCollection`` node for one fee-template row and recurse into its children.

    Rows without "费用名称" are skipped with a warning. The node's ``path`` is
    "<parent path>/<name>" under another FeeCollection node and
    "<parent name>/<name>" otherwise. ``rate`` ("费率(%)"), ``base``
    ("取费基数") and ``remark`` ("备注") are stored only when truthy, so empty
    strings and zero values are left out. The node is linked to the parent
    with ``HAS_COMPONENT``.

    Args:
        fee_item: Dict describing the fee row.
        parent_node: A FeeTableTemplateItem or FeeCollection node.
    """
    fee_name = fee_item.get("费用名称", "")
    if not fee_name:
        logger.warning("Fee缺少费用名称")
        return

    serial_number = fee_item.get("序号", "")
    code = fee_item.get("代码", "")
    fee_node = Node("FeeCollection", serialNumber=serial_number, name=fee_name, code=code)

    # Nested fees extend the parent's path; top-level fees start from the
    # template item's name.
    if "FeeCollection" in parent_node.labels:
        path_prefix = parent_node.get("path", "")
    else:
        path_prefix = parent_node.get("name", "")
    fee_node["path"] = f"{path_prefix}/{fee_name}"
    logger.info(f"为FeeCollection节点 {fee_name} 设置path: {fee_node['path']}")

    optional_properties = (
        ("rate", fee_item.get("费率(%)", "")),
        ("base", fee_item.get("取费基数", "")),
        ("remark", fee_item.get("备注", "")),
    )
    for property_name, property_value in optional_properties:
        if property_value:
            fee_node[property_name] = property_value

    graph.create(fee_node)
    graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node))
    logger.info(f"创建Fee节点: {fee_name} (序号: {serial_number}, 代码: {code})")

    sub_fees = fee_item["children"] if "children" in fee_item else None
    if isinstance(sub_fees, list):
        for child_fee in sub_fees:
            process_fee(child_fee, fee_node)
def process_fee_schedule_set(data, root_node):
    """Create the FeeScheduleSet node, one FeeScheduleItem per table, and their fees.

    ``projectCost`` is taken from ``data["projectData"]`` when present there,
    otherwise from the top level of ``data``. Each key of ``projectCost``
    becomes a ``FeeScheduleItem`` node; list-valued entries have every element
    passed to ``process_fee_item``, other value types are logged as unknown.

    Args:
        data: Parsed JSON document (a dict).
        root_node: Node that the FeeScheduleSet is attached to.
    """
    if "projectData" in data and "projectCost" in data["projectData"]:
        container = data["projectData"]
    elif "projectCost" in data:
        container = data
    else:
        logger.warning("JSON中未找到projectCost数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return
    project_cost = container["projectCost"]

    logger.info(f"开始处理projectCost,包含 {len(project_cost)} 个费用表项")

    fee_schedule_set = Node("FeeScheduleSet", name="工程费用")
    graph.create(fee_schedule_set)
    graph.create(Relationship(root_node, "CONTAINS", fee_schedule_set))
    logger.info("创建FeeScheduleSet节点: 工程费用")

    for fee_table_name, fee_table_content in project_cost.items():
        fee_schedule_item = Node("FeeScheduleItem", name=fee_table_name)
        graph.create(fee_schedule_item)
        graph.create(Relationship(fee_schedule_set, "CONTAINS", fee_schedule_item))
        logger.info(f"创建FeeScheduleItem节点: {fee_table_name}")

        if not isinstance(fee_table_content, list):
            logger.warning(f"FeeScheduleItem {fee_table_name} 的内容类型未知: {type(fee_table_content)}")
            continue

        logger.info(f"FeeScheduleItem {fee_table_name} 包含 {len(fee_table_content)} 个费用项")
        for fee_item in fee_table_content:
            process_fee_item(fee_item, fee_schedule_item)
def process_fee_item(fee, parent_node):
    """Create a ``Fee`` node for one fee-schedule row and recurse into its children.

    Rows without "费用名称" are skipped with a warning. The node's ``path`` is
    "<parent path>/<name>" under another Fee node and "<parent name>/<name>"
    otherwise. ``rate`` ("费率(%)") and ``amount`` ("金额") are stored only
    when truthy, so empty strings and zero values are left out. Any other
    scalar attribute of the row is copied verbatim. The node is linked to the
    parent with ``HAS_COMPONENT``.

    Args:
        fee: Dict describing the fee row.
        parent_node: A FeeScheduleItem or Fee node.
    """
    serial_number = fee.get("序号", "")
    name = fee.get("费用名称", "")
    code = fee.get("代码", "")
    rate = fee.get("费率(%)", "")
    amount = fee.get("金额", "")
    if not name:
        logger.warning("Fee缺少费用名称")
        return

    fee_node = Node("Fee", serialNumber=serial_number, name=name, code=code)

    # Nested fees extend the parent's path; top-level fees start from the
    # schedule item's name.
    if "Fee" in parent_node.labels:
        path_prefix = parent_node.get("path", "")
    else:
        path_prefix = parent_node.get("name", "")
    fee_node["path"] = f"{path_prefix}/{name}"
    logger.info(f"为Fee节点 {name} 设置path: {fee_node['path']}")

    if rate:
        fee_node["rate"] = rate
    if amount:
        fee_node["amount"] = amount

    # Copy every remaining scalar attribute; nested structures are ignored.
    for key, value in fee.items():
        if key in ("序号", "费用名称", "代码", "费率(%)", "金额", "children") or value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            fee_node[key] = value

    graph.create(fee_node)
    logger.info(f"创建Fee节点: {name} (序号: {serial_number})")
    graph.create(Relationship(parent_node, "HAS_COMPONENT", fee_node))

    children = fee.get("children")
    if children:
        # The name and the count are separated so the message stays readable.
        logger.info(f"Fee {name} 包含 {len(children)} 个子费用项")
        for child in children:
            process_fee_item(child, fee_node)
def _attach_project_property(parent, name, value):
    """Create a ``ProjectProperty`` node and link it under ``parent`` via HAS_COMPONENT."""
    node = Node("ProjectProperty", name=name, value=value)
    graph.create(node)
    graph.create(Relationship(parent, "HAS_COMPONENT", node))
    return node


def process_project_property_set(data, root_node):
    """Create the ProjectPropertySet node and one ProjectProperty node per attribute.

    ``projectInfo`` is taken from ``data["projectData"]`` when present there,
    otherwise from the top level of ``data``. ``None`` values are skipped.
    A list value becomes a summary node ("列表(N项)") with one child node per
    element (per key/value pair for dict elements, with ``None`` values
    skipped). A dict value becomes a summary node ("字典(N项)") with one child
    per non-``None`` entry. Any other value is stored as its string form.

    Args:
        data: Parsed JSON document (a dict).
        root_node: Node that the ProjectPropertySet is attached to.
    """
    if "projectData" in data and "projectInfo" in data["projectData"]:
        container = data["projectData"]
    elif "projectInfo" in data:
        container = data
    else:
        logger.warning("JSON中未找到projectInfo数据")
        logger.info(f"JSON顶层键: {list(data.keys())}")
        return
    project_info = container["projectInfo"]

    logger.info("开始处理projectInfo")

    property_set_node = Node("ProjectPropertySet", name="工程属性")
    graph.create(property_set_node)
    graph.create(Relationship(root_node, "CONTAINS", property_set_node))
    logger.info("创建ProjectPropertySet节点: 工程属性")

    for key, value in project_info.items():
        if value is None:
            continue

        if isinstance(value, list):
            # Summary node for the list, then one child per element.
            property_node = _attach_project_property(property_set_node, key, f"列表({len(value)}项)")
            logger.info(f"创建ProjectProperty节点: {key} = 列表({len(value)}项)")
            for i, item in enumerate(value):
                if not isinstance(item, dict):
                    _attach_project_property(property_node, f"{key}[{i}]", str(item))
                    logger.info(f"创建ProjectProperty子节点: {key}[{i}] = {item}")
                    continue
                for sub_key, sub_value in item.items():
                    if sub_value is None:
                        continue
                    _attach_project_property(property_node, f"{key}[{i}].{sub_key}", str(sub_value))
                    logger.info(f"创建ProjectProperty子节点: {key}[{i}].{sub_key} = {sub_value}")
        elif isinstance(value, dict):
            # Summary node for the dict, then one child per entry.
            property_node = _attach_project_property(property_set_node, key, f"字典({len(value)}项)")
            logger.info(f"创建ProjectProperty节点: {key} = 字典({len(value)}项)")
            for sub_key, sub_value in value.items():
                if sub_value is None:
                    continue
                _attach_project_property(property_node, f"{key}.{sub_key}", str(sub_value))
                logger.info(f"创建ProjectProperty子节点: {key}.{sub_key} = {sub_value}")
        else:
            _attach_project_property(property_set_node, key, str(value))
            logger.info(f"创建ProjectProperty节点: {key} = {value}")
def main(json_file_path="dataset/json/主网预算/架空.json"):
    """Build the knowledge graph from one project JSON file.

    Creates the root node, loads the JSON document and runs the section
    processors in order, then logs node and relationship statistics.

    Args:
        json_file_path: Path of the UTF-8 JSON file to import. Defaults to the
            file this script was originally hard-wired to.
    """
    root_node = create_root_node()

    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Cost sets go first: the project-division pass looks CostSet nodes up by
    # GUID in order to create USE relationships.
    process_cost_set(data, root_node)
    process_project_division_set(data, root_node)
    process_fee_table_template_set(data, root_node)
    process_fee_schedule_set(data, root_node)
    process_project_property_set(data, root_node)

    count_nodes_and_relationships()
    logger.info("知识图谱构建完成")
def count_nodes_and_relationships():
    """Log the number of nodes per label and the number of relationships per type."""
    # UNWIND counts per individual label. Grouping by labels(n) instead would
    # group by label *combination*, so a label shared by several combinations
    # (e.g. ProjectQuantity with and without Quota) would be reported several
    # times, each time with a partial count.
    node_counts = graph.run(
        """
        MATCH (n)
        UNWIND labels(n) AS label
        RETURN label, count(*) AS count
        """
    ).data()
    logger.info("节点类型统计:")
    for row in node_counts:
        logger.info(f" {row['label']}: {row['count']}个节点")

    rel_counts = graph.run(
        """
        MATCH ()-[r]->()
        RETURN type(r) AS type, count(*) AS count
        """
    ).data()
    logger.info("关系类型统计:")
    for row in rel_counts:
        logger.info(f" {row['type']}: {row['count']}个关系")
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()