Files
KG_generation/equipment_calculation/item_acquisition.py
T
2025-08-18 15:14:37 +08:00

789 lines
30 KiB
Python

import json
import os
from copy import deepcopy
from typing import Dict, Any, Optional, Set
# 添加全局缓存
_bill_node_cache = {}
def find_project_division_node(node, project_name, project_guid=None, result=None):
"""递归查找指定项目名称的项目划分节点,可选匹配GUID"""
if result is not None and result:
return result
if isinstance(node, dict):
# 检查是否是项目划分节点
is_project_division = node.get("type") == "项目划分"
# 检查项目名称是否匹配
name_matches = node.get("项目名称") == project_name
# 检查GUID是否匹配(如果提供了GUID)
guid_matches = True
if project_guid:
node_guid = node.get("GUID") or node.get("guid")
guid_matches = node_guid == project_guid
# 如果是项目划分节点,且名称和GUID都匹配(或没有提供GUID)
if is_project_division and name_matches and guid_matches:
return [node]
# 递归查找子节点
for key, value in node.items():
if isinstance(value, (dict, list)):
result = find_project_division_node(value, project_name, project_guid, result)
if result:
return result
elif isinstance(node, list):
for item in node:
if isinstance(item, (dict, list)):
result = find_project_division_node(item, project_name, project_guid, result)
if result:
return result
return result or []
def find_cost_table(cost_setting, table_name):
"""在costSetting中查找指定名称的取费表"""
if isinstance(cost_setting, dict):
if cost_setting.get("name") == table_name:
return cost_setting
for key, value in cost_setting.items():
if isinstance(value, (dict, list)):
result = find_cost_table(value, table_name)
if result:
return result
elif isinstance(cost_setting, list):
for item in cost_setting:
if isinstance(item, (dict, list)):
result = find_cost_table(item, table_name)
if result:
return result
return None
def map_quantity_node_types(node):
"""映射工程量节点的类型和费用类型"""
if not isinstance(node, dict):
return node
# 复制节点以避免修改原始数据
node_copy = deepcopy(node)
# 映射类型字段
if "类型" in node_copy:
type_mapping = {"0": "定额", "1": "主材", "5": "设备"}
if node_copy["类型"] in type_mapping:
node_copy["类型"] = type_mapping[node_copy["类型"]]
# 映射费用类型字段
if "费用类型" in node_copy:
if node_copy["费用类型"] == "0":
node_copy["费用类型"] = "取费"
# 如果已经是"取费"则保持不变
# 递归处理子节点
if "children" in node_copy and node_copy["children"]:
node_copy["children"] = [map_quantity_node_types(child) for child in node_copy["children"]]
return node_copy
def map_resource_node_types(node):
"""映射人材机节点的类型"""
if not isinstance(node, dict):
return node
# 复制节点以避免修改原始数据
node_copy = deepcopy(node)
# 映射类型字段
if "类型" in node_copy:
type_mapping = {
"2": "人工",
"3": "材料",
"4": "机械",
}
if node_copy["类型"] in type_mapping:
node_copy["类型"] = type_mapping[node_copy["类型"]]
# 递归处理子节点
if "children" in node_copy and node_copy["children"]:
node_copy["children"] = [map_resource_node_types(child) for child in node_copy["children"]]
return node_copy
def extract_resource_nodes(node, parent_id=None):
"""
提取人材机节点并保持父子结构
Args:
node: 节点
parent_id: 父级节点ID
Returns:
list: 提取的人材机节点列表,每个节点包含parent_id字段
"""
if not isinstance(node, dict):
return [] # 返回空列表而不是None,便于后续处理
# 获取当前节点ID
current_id = node.get("id")
resource_nodes = []
# 检查是否是人材机节点(支持字符串和数字类型)
node_type = node.get("类型")
is_resource_node = False
# 同时支持数字类型和字符串类型
if node_type in ["人工", "材料", "机械", "2", "3", "4"]:
is_resource_node = True
# 复制节点但不包含children和材机列表
node_copy = {k: v for k, v in node.items() if k not in ["children", "材机列表"]}
# 添加父级ID
if parent_id:
node_copy["parent_id"] = parent_id
# 添加到资源列表
resource_nodes.append(node_copy)
# 合并处理材机列表和children字段 - 只处理一种来源的子节点,优先处理材机列表
child_nodes = []
if "材机列表" in node and node["材机列表"]:
child_nodes = node["材机列表"]
elif "children" in node and node["children"] and not is_resource_node:
# 只有在不是资源节点时才处理children
child_nodes = node["children"]
# 处理子节点
for child in child_nodes:
child_copy = deepcopy(child)
child_copy["parent_id"] = current_id
# 递归处理
sub_resources = extract_resource_nodes(child_copy, current_id)
if sub_resources:
resource_nodes.extend(sub_resources)
return resource_nodes
def process_project_children(children):
"""处理项目子节点,分离工程量节点和人材机节点,支持嵌套的工程量节点"""
if not children:
return None, None
quantity_nodes = []
resource_nodes = []
for child in children:
# 对于工程量节点,深拷贝用于工程量树
quantity_node = deepcopy(child)
# 应用工程量节点类型映射
quantity_node = map_quantity_node_types(quantity_node)
# 提取人材机节点 - extract_resource_nodes会递归提取所有层级的人材机节点
resources = extract_resource_nodes(child)
if resources:
# 应用人材机节点类型映射
resources = [map_resource_node_types(resource) for resource in resources]
resource_nodes.extend(resources)
# 从工程量节点中移除材机列表
if "材机列表" in quantity_node:
del quantity_node["材机列表"]
# 清理子节点中的人材机节点
clean_resource_nodes_from_quantity(quantity_node)
# 递归处理剩余的工程量子节点
if "children" in quantity_node and quantity_node["children"]:
sub_quantity_nodes, sub_resource_nodes = process_project_children(quantity_node["children"])
# 更新quantity_node的children
if sub_quantity_nodes:
quantity_node["children"] = sub_quantity_nodes
else:
quantity_node.pop("children", None) # 使用pop安全删除
# 合并子节点中提取的资源节点
if sub_resource_nodes:
resource_nodes.extend(sub_resource_nodes)
# 添加到工程量节点列表
quantity_nodes.append(quantity_node)
return (quantity_nodes if quantity_nodes else None, resource_nodes if resource_nodes else None)
def clean_resource_nodes_from_quantity(node):
"""递归清理工程量节点中的人材机节点"""
if not isinstance(node, dict):
return
# 清理材机列表
if "材机列表" in node:
del node["材机列表"]
# 清理子节点中的人材机节点
if "children" in node and node["children"]:
# 过滤掉人材机类型的子节点
node["children"] = [
c for c in node["children"] if not (isinstance(c, dict) and c.get("类型") in ["人工", "材料", "机械"])
]
# 递归清理剩余子节点
for child in node["children"]:
clean_resource_nodes_from_quantity(child)
# 如果没有子节点了,删除children属性
if not node["children"]:
del node["children"]
def load_project_data(json_file_path, project_name, project_guid=None):
"""加载JSON数据并获取目标项目节点"""
try:
# 读取JSON文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取projectData中的costSetting和projectDivision
project_data = data.get("projectData", {})
cost_setting = project_data.get("costSetting", {})
project_division = project_data.get("projectDivision", {})
# 查找指定项目名称和GUID的节点
target_nodes = find_project_division_node(project_division, project_name, project_guid)
if not target_nodes:
if project_guid:
print(f"未找到项目名称为 '{project_name}' 且GUID为 '{project_guid}' 的节点")
else:
print(f"未找到项目名称为 '{project_name}' 的节点")
return None, None, None, None, None
# 获取找到的节点
target_node = target_nodes[0]
return data, project_data, cost_setting, project_division, target_node
except Exception as e:
print(f"加载项目数据时出错: {e}")
return None, None, None, None, None
def get_cost_table_children(json_file_path, project_name, project_guid=None):
"""获取取费表子节点"""
try:
# 加载项目数据,传递project_guid参数
data, project_data, cost_setting, project_division, target_node = load_project_data(
json_file_path, project_name, project_guid
)
if not target_node:
return None
# 获取取费表名称
fee_table_name = target_node.get("取费表")
if not fee_table_name:
print(f"目标项目划分节点中没有'取费表'字段")
return None
# 查找对应的取费表
cost_table = find_cost_table(cost_setting, fee_table_name)
cost_table_children = cost_table.get("children", None) if cost_table else None
return cost_table_children
except Exception as e:
print(f"获取取费表子节点时出错: {e}")
return None
def get_quantity_nodes(json_file_path, project_name, engineering_type, project_guid=None):
"""
获取工程量节点
Args:
json_file_path: JSON文件路径
project_name: 项目名称
engineering_type: 工程类型
project_guid: 项目GUID,用于区分同名项目
Returns:
list: 工程量节点列表,如果没有找到项目划分节点则返回None,如果找到项目划分节点但没有工程量节点则返回空列表
"""
try:
# 加载项目数据,传递project_guid参数
data, project_data, cost_setting, project_division, target_node = load_project_data(
json_file_path, project_name, project_guid
)
if not target_node:
return None # 没有找到项目划分节点
# 处理项目子节点
project_children = target_node.get("children", None)
if not project_children:
print(f"项目 '{project_name}' (GUID: {project_guid}) 没有子节点(工程量节点)")
return [] # 找到项目划分节点,但没有子节点(工程量节点)
# 如果是清单工程,需要先找到清单节点,然后获取其下的工程量节点
if engineering_type == "清单工程":
quantity_nodes = []
print(f"开始处理清单工程,项目名称: {project_name}")
# 递归查找真正的清单节点
def find_true_bill_nodes(node_list, parent_path=""):
result = []
for node in node_list:
# 检查是否是清单节点
is_bill_node = (
node.get("类型") == "8"
or node.get("类型") == "清单"
or node.get("type") == "8"
or node.get("type") == "清单"
or "清单名称" in node
or "清单全码" in node
)
node_name = node.get("清单名称", node.get("项目名称", "未命名"))
# 强制使用GUID而不是ID
node_id = node.get("GUID") or node.get("guid") or node.get("id", "未知ID")
current_path = f"{parent_path}/{node_name}" if parent_path else node_name
if is_bill_node:
print(f"找到清单节点: ID={node_id}, 名称={node_name}, 路径={current_path}")
# 检查清单节点是否有取费表名称
has_fee_table = "取费表名称" in node or "取费表" in node
if has_fee_table:
fee_table_name = node.get("取费表名称", node.get("取费表", "未知"))
print(f"清单节点有取费表: {fee_table_name}")
# 添加到缓存
if node_id != "未知ID":
_bill_node_cache[node_id] = node
print(f"缓存清单节点: ID={node_id}")
result.append(node)
else:
print(f"警告:清单节点没有取费表名称: {node_name}")
# 递归处理子节点
if "children" in node and node["children"]:
result.extend(find_true_bill_nodes(node["children"], current_path))
return result
# 查找所有真正的清单节点
bill_nodes = find_true_bill_nodes(project_children)
print(f"找到 {len(bill_nodes)} 个有效清单节点")
# 处理每个清单节点
for bill_node in bill_nodes:
# 强制使用GUID而不是ID
bill_id = bill_node.get("GUID") or bill_node.get("guid") or bill_node.get("id")
bill_name = bill_node.get("清单名称", bill_node.get("项目名称", "未命名清单"))
bill_children = bill_node.get("children", [])
if bill_children:
# 处理清单节点的子节点,获取工程量节点
bill_quantity_nodes, _ = process_project_children(bill_children)
if bill_quantity_nodes:
print(f"清单节点 '{bill_name}' 下找到 {len(bill_quantity_nodes)} 个工程量节点")
# 为每个工程量节点设置清单节点信息
for node in bill_quantity_nodes:
# 存储关键信息而不是整个对象
# 强制使用GUID而不是ID
node["bill_guid"] = bill_id # 新增GUID字段
node["bill_id"] = bill_id # 保持兼容性
node["bill_name"] = bill_name
node["取费表名称"] = bill_node.get("取费表名称", bill_node.get("取费表", ""))
# 设置parent_id以保持兼容性
node["parent_id"] = bill_id
print(f"设置工程量节点 '{node.get('项目名称', '未命名')}' 的清单节点信息")
# 添加这一行,将完整的清单节点添加到工程量节点中
node["bill_node"] = bill_node
quantity_nodes.extend(bill_quantity_nodes)
else:
print(f"清单节点 '{bill_name}' 下未找到工程量节点")
else:
print(f"清单节点 '{bill_name}' 没有子节点")
# 如果没有找到任何工程量节点,尝试直接获取定额节点
if not quantity_nodes:
print("未找到清单节点下的工程量节点,尝试直接获取定额节点...")
def find_quota_nodes(node_list):
result = []
for node in node_list:
# 检查是否是定额节点
is_quota_node = node.get("类型") == "定额" or node.get("type") == "定额"
if is_quota_node:
result.append(node)
# 递归处理子节点
if "children" in node and node["children"]:
result.extend(find_quota_nodes(node["children"]))
return result
# 查找所有定额节点
quota_nodes = find_quota_nodes(project_children)
if quota_nodes:
print(f"找到 {len(quota_nodes)} 个定额节点")
# 检查这些定额节点是否有清单节点信息
for node in quota_nodes:
parent_node = None
parent_id = node.get("parent_id")
# 在项目中查找父节点
def find_node_by_id(node_list, node_id):
for n in node_list:
# 强制使用GUID而不是ID进行匹配
node_guid = n.get("GUID") or n.get("guid") or n.get("id")
if node_guid == node_id:
return n
if "children" in n and n["children"]:
found = find_node_by_id(n["children"], node_id)
if found:
return found
return None
if parent_id:
parent_node = find_node_by_id(project_children, parent_id)
# 如果找到父节点且是清单节点,使用其信息
if parent_node:
is_bill_node = (
parent_node.get("类型") == "8"
or parent_node.get("类型") == "清单"
or parent_node.get("type") == "8"
or parent_node.get("type") == "清单"
or "清单名称" in parent_node
or "清单全码" in parent_node
)
if is_bill_node:
# 强制使用GUID而不是ID
bill_id = parent_node.get("GUID") or parent_node.get("guid") or parent_node.get("id")
bill_name = parent_node.get("清单名称", parent_node.get("项目名称", "未命名清单"))
# 强制使用GUID而不是ID
node["bill_guid"] = bill_id # 新增GUID字段
node["bill_id"] = bill_id # 保持兼容性
node["bill_name"] = bill_name
node["取费表名称"] = parent_node.get("取费表名称", parent_node.get("取费表", ""))
# 设置parent_id以保持兼容性
node["parent_id"] = bill_id
print(f"为定额节点 '{node.get('项目名称', '未命名')}' 设置清单节点信息")
quantity_nodes.extend(quota_nodes)
return quantity_nodes
else:
# 预算工程 - 直接获取工程量节点
quantity_nodes, _ = process_project_children(project_children)
# 如果没有工程量节点,返回空列表而不是None
if not quantity_nodes:
print(f"项目 '{project_name}' (GUID: {project_guid}) 没有工程量节点")
return []
return quantity_nodes
except Exception as e:
print(f"获取工程量节点时出错: {e}")
import traceback
traceback.print_exc() # 打印详细错误堆栈
return None
def get_resource_nodes(json_file_path, project_name, project_guid=None):
"""
获取人材机节点
Args:
json_file_path: JSON文件路径
project_name: 项目名称
project_guid: 项目GUID,用于区分同名项目
Returns:
list: 人材机节点列表
"""
try:
# 加载项目数据,传递project_guid参数
data, project_data, cost_setting, project_division, target_node = load_project_data(
json_file_path, project_name, project_guid
)
if not target_node:
return None
# 获取项目子节点
project_children = target_node.get("children", None)
if not project_children:
print(f"项目 '{project_name}' (GUID: {project_guid}) 没有子节点")
return []
# 提取人材机节点
resource_nodes = []
for child in project_children:
# 提取人材机节点并保持父子结构
nodes = extract_resource_nodes(child)
if nodes:
resource_nodes.extend(nodes)
# 映射节点类型
resource_nodes = [map_resource_node_types(node) for node in resource_nodes]
return resource_nodes
except Exception as e:
print(f"获取人材机节点时出错: {e}")
return None
def get_classified_resource_nodes(json_file_path, project_name, project_guid=None):
"""
获取分类后的人材机节点
Args:
json_file_path: JSON文件路径
project_name: 项目名称
project_guid: 项目GUID,用于区分同名项目
Returns:
tuple: (人工节点列表, 材料节点列表, 机械节点列表),每个列表包含(节点, 父级ID)元组
"""
# 获取所有人材机节点,传递project_guid参数
resource_nodes = get_resource_nodes(json_file_path, project_name, project_guid)
if not resource_nodes:
return [], [], []
# 分类存储
labor_nodes = [] # 人工节点
material_nodes = [] # 材料节点
machine_nodes = [] # 机械节点
# 递归函数,用于处理节点及其子节点
def process_node(node):
node_type = node.get("类型")
parent_id = node.get("parent_id")
# 同时支持数字类型和字符串类型
if node_type in ["人工", "2"]:
node_type = "人工" # 统一转换为字符串类型
labor_nodes.append((node, parent_id))
elif node_type in ["材料", "3"]:
node_type = "材料" # 统一转换为字符串类型
material_nodes.append((node, parent_id))
elif node_type in ["机械", "4"]:
node_type = "机械" # 统一转换为字符串类型
machine_nodes.append((node, parent_id))
# 更新类型字段
node["类型"] = node_type
# 处理子节点
if "children" in node and node["children"]:
for child in node["children"]:
# 确保子节点有父级ID
if "parent_id" not in child:
child["parent_id"] = node.get("id")
process_node(child)
# 处理所有节点
for node in resource_nodes:
process_node(node)
return labor_nodes, material_nodes, machine_nodes
def find_bill_node_by_id(node, bill_id, result=None):
"""递归查找指定ID的清单节点"""
if result is not None and result:
return result
if isinstance(node, dict):
# 检查当前节点是否是目标清单节点
node_id_matches = node.get("id") == bill_id or node.get("GUID") == bill_id
if node_id_matches:
# 检查是否是清单节点
is_bill_node = (
node.get("类型") == "8"
or node.get("类型") == "清单"
or node.get("type") == "8"
or node.get("type") == "清单"
or "清单名称" in node
or "清单全码" in node
)
if is_bill_node:
print(f"找到清单节点 ID={bill_id}, 类型={node.get('类型', node.get('type', '未知'))}")
return [node]
else:
print(
f"找到ID匹配但不是清单节点的节点: ID={bill_id}, 类型={node.get('类型', node.get('type', '未知'))}"
)
# 不返回非清单节点
# 递归查找子节点
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
result = find_bill_node_by_id(child, bill_id, result)
if result:
return result
# 检查其他字段
for key, value in node.items():
if key != "children" and isinstance(value, (dict, list)):
result = find_bill_node_by_id(value, bill_id, result)
if result:
return result
elif isinstance(node, list):
for item in node:
if isinstance(item, (dict, list)):
result = find_bill_node_by_id(item, bill_id, result)
if result:
return result
return result or []
def get_bill_cost_table(json_file_path, bill_id):
"""获取清单节点的取费表子节点"""
try:
# 读取JSON文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取projectData中的costSetting和projectDivision
project_data = data.get("projectData", {})
cost_setting = project_data.get("costSetting", {})
project_division = project_data.get("projectDivision", {})
print(f"正在查找清单节点ID: {bill_id}")
# 查找指定ID的清单节点
bill_nodes = find_bill_node_by_id(project_division, bill_id)
if not bill_nodes:
print(f"未找到ID为 '{bill_id}' 的清单节点")
return None
# 获取找到的节点
bill_node = bill_nodes[0]
# 获取取费表名称或ID - 尝试多种可能的字段名
fee_table_name = bill_node.get("取费表名称") or bill_node.get("取费表") or bill_node.get("费率表")
if not fee_table_name:
print(f"清单节点中没有'取费表名称'字段")
# 打印节点信息以便调试
print(f"清单节点字段: {', '.join(bill_node.keys())}")
# 尝试从父级节点获取取费表名称
if "parent_id" in bill_node:
parent_nodes = find_bill_node_by_id(project_division, bill_node["parent_id"])
if parent_nodes:
parent_node = parent_nodes[0]
fee_table_name = (
parent_node.get("取费表名称") or parent_node.get("取费表") or parent_node.get("费率表")
)
if fee_table_name:
print(f"从父级节点获取到取费表名称: {fee_table_name}")
if not fee_table_name:
return None
print(f"找到取费表名称: {fee_table_name}")
# 使用相同的函数查找取费表 - 与预算工程保持一致
cost_table = find_cost_table(cost_setting, fee_table_name)
if not cost_table:
print(f"未找到取费表 '{fee_table_name}'")
return None
cost_table_children = cost_table.get("children", None)
return cost_table_children
except Exception as e:
print(f"获取清单节点取费表时出错: {e}")
import traceback
traceback.print_exc() # 打印详细错误堆栈
return None
def get_bill_node_by_id(bill_id: str) -> Dict[str, Any]:
"""
根据ID从缓存中获取清单节点
Args:
bill_id: 清单节点ID
Returns:
Dict[str, Any]: 清单节点数据
"""
# 清理ID格式
clean_bill_id = str(bill_id).strip("{}").upper()
# 在缓存中查找
for cached_id, node in _bill_node_cache.items():
if str(cached_id).strip("{}").upper() == clean_bill_id:
print(f"从缓存中获取清单节点: {node.get('清单名称', '未命名')}")
return node
print(f"在缓存中未找到ID为 {bill_id} 的清单节点")
return {}
# # 测试代码
# if __name__ == "__main__":
# json_file_path = os.path.join("技改预算", "架线.json")
# project_name = "基础工程材料工地运输"
# adjustment_type = "拆除"
# # 获取并输出取费表子节点
# cost_children = get_cost_table_children(json_file_path, project_name)
# print("\n取费表子节点:")
# print(json.dumps(cost_children, ensure_ascii=False, indent=2) if cost_children else None)
# # 获取并输出工程量节点
# quantity_nodes = get_quantity_nodes(json_file_path, project_name, adjustment_type)
# print("\n工程量节点:")
# print(json.dumps(quantity_nodes, ensure_ascii=False, indent=2) if quantity_nodes else None)
# # 获取并输出分类后的人材机节点
# labor_nodes, material_nodes, machine_nodes = get_classified_resource_nodes(
# json_file_path, project_name, adjustment_type
# )
# print("\n人工节点列表:")
# for node, parent_id in labor_nodes:
# print(f"节点名称: {node.get('名称')}, 父级ID: {parent_id}")
# print("\n材料节点列表:")
# for node, parent_id in material_nodes:
# print(f"节点名称: {node.get('名称')}, 父级ID: {parent_id}")
# print("\n机械节点列表:")
# for node, parent_id in machine_nodes:
# print(f"节点名称: {node.get('名称')}, 父级ID: {parent_id}")