1320 lines
60 KiB
Python
1320 lines
60 KiB
Python
"""
|
||
第四步:将bcl计算结果补充到json文件中
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
from copy import deepcopy
|
||
import uuid
|
||
import re
|
||
|
||
|
||
class ProjectExpenseProcessor:
|
||
def __init__(self):
|
||
self.project_data = None
|
||
|
||
def initialize_project_data(self, project_data):
|
||
"""初始化项目数据,深拷贝避免修改原始数据"""
|
||
self.project_data = deepcopy(project_data) if project_data else {}
|
||
|
||
# 添加调试信息
|
||
print(f"项目数据结构: {list(self.project_data.keys())}")
|
||
|
||
# 检查projectData是否存在
|
||
if self.project_data and "projectData" in self.project_data:
|
||
print("projectData 结构存在")
|
||
print(f"projectData 子结构: {list(self.project_data['projectData'].keys())}")
|
||
|
||
# 检查expensePreview是否存在于projectData中
|
||
if "expensePreview" in self.project_data["projectData"]:
|
||
print("expensePreview 结构存在于 projectData 中")
|
||
expense_preview = self.project_data["projectData"]["expensePreview"]
|
||
print(f"expensePreview 类别: {list(expense_preview.keys())}")
|
||
|
||
for category_name, category in expense_preview.items():
|
||
print(f"类别: {category_name}, 类型: {type(category)}")
|
||
|
||
# 处理category是字典的情况
|
||
if isinstance(category, dict):
|
||
for group_name, group in category.items():
|
||
print(f" 组: {group_name}, 类型: {type(group)}")
|
||
if isinstance(group, list):
|
||
for i, item in enumerate(group):
|
||
if isinstance(item, dict) and "GUID" in item:
|
||
print(f" 项目 {i} GUID: {item['GUID']}")
|
||
|
||
# 处理category是列表的情况
|
||
elif isinstance(category, list):
|
||
print(f" 类别 {category_name} 是列表类型,包含 {len(category)} 个项目")
|
||
for i, item in enumerate(category):
|
||
if isinstance(item, dict) and "GUID" in item:
|
||
print(f" 项目 {i} GUID: {item['GUID']}")
|
||
else:
|
||
print("警告: expensePreview 结构不存在于 projectData 中!")
|
||
else:
|
||
print("警告: projectData 结构不存在!")
|
||
|
||
return self
|
||
|
||
def check_guids_in_division(self, node=None, path=""):
|
||
"""检查项目划分中的所有GUID"""
|
||
if node is None:
|
||
if not self.project_data or "projectData" not in self.project_data:
|
||
print("警告: project_data 为空或不包含 projectData")
|
||
return
|
||
|
||
project_division = self.project_data.get("projectDivision", {})
|
||
if isinstance(project_division, dict):
|
||
for category_name, category in project_division.items():
|
||
self.check_guids_in_division(category, category_name)
|
||
return
|
||
|
||
if isinstance(node, list):
|
||
for i, item in enumerate(node):
|
||
self.check_guids_in_division(item, f"{path}[{i}]")
|
||
elif isinstance(node, dict):
|
||
if (
|
||
"GUID" in node
|
||
or "guid" in node
|
||
or (
|
||
"id" in node
|
||
and isinstance(node["id"], str)
|
||
and node["id"].startswith("{")
|
||
and node["id"].endswith("}")
|
||
)
|
||
):
|
||
guid = self.get_node_guid(node)
|
||
if guid:
|
||
guid_with_braces = "{" + guid + "}"
|
||
print(f"项目划分GUID: {guid_with_braces} 在路径: {path}")
|
||
# 尝试在费用预览中查找
|
||
expense_node = self.find_expense_preview_node(guid)
|
||
if expense_node:
|
||
print(f" ✓ 在费用预览中找到对应节点")
|
||
else:
|
||
print(f" ✗ 在费用预览中未找到对应节点")
|
||
|
||
if "children" in node:
|
||
self.check_guids_in_division(node["children"], f"{path}.children")
|
||
|
||
def find_project_division_node(self, target_guid, node=None, path=""):
|
||
"""查找指定GUID的最子级项目划分节点"""
|
||
# 将目标GUID转换为大写并去掉花括号
|
||
target_guid = target_guid.strip("{}").upper()
|
||
|
||
if node is None:
|
||
# 从 projectData.projectDivision 开始搜索
|
||
if not self.project_data or "projectData" not in self.project_data:
|
||
print("警告: project_data 为空或不包含 projectData")
|
||
return None
|
||
|
||
if "projectDivision" not in self.project_data["projectData"]:
|
||
print("警告: project_data 不包含 projectData.projectDivision")
|
||
return None
|
||
|
||
# print(f"开始在项目划分中查找GUID: {target_guid}")
|
||
for category_name, category in self.project_data["projectData"]["projectDivision"].items():
|
||
# print(f"搜索项目划分类别: {category_name}")
|
||
# 递归处理category,无论它是什么类型
|
||
result = self.find_project_division_node(
|
||
target_guid, category, f"projectData.projectDivision.{category_name}"
|
||
)
|
||
if result:
|
||
return result
|
||
# print(f"在项目划分中未找到GUID: {target_guid}")
|
||
return None
|
||
|
||
if isinstance(node, list):
|
||
for i, item in enumerate(node):
|
||
new_path = f"{path}[{i}]"
|
||
result = self.find_project_division_node(target_guid, item, new_path)
|
||
if result:
|
||
return result
|
||
elif isinstance(node, dict):
|
||
# 检查当前节点的GUID
|
||
current_guid = self.get_node_guid(node)
|
||
if current_guid:
|
||
# print(f"比较项目划分GUID: {current_guid} vs {target_guid} 在路径: {path}")
|
||
pass
|
||
if current_guid == target_guid:
|
||
# print(f"找到匹配的项目划分GUID: {current_guid} 在路径: {path}")
|
||
return node
|
||
|
||
# 递归检查所有子节点,包括children和其他字典值
|
||
for key, value in node.items():
|
||
if isinstance(value, (dict, list)):
|
||
new_path = f"{path}.{key}"
|
||
result = self.find_project_division_node(target_guid, value, new_path)
|
||
if result:
|
||
return result
|
||
return None
|
||
|
||
def find_expense_preview_node(self, target_guid, node=None, path=""):
|
||
"""
|
||
在 expensePreview 中递归查找指定 GUID 的节点
|
||
:param target_guid: 要查找的 GUID(字符串,不带 {})
|
||
:param node: 当前查找的子节点(默认从 project_data 开始)
|
||
:param path: 当前搜索路径(用于调试)
|
||
:return: 找到的节点或 None
|
||
"""
|
||
# 将目标GUID转换为大写并去掉花括号
|
||
target_guid = target_guid.strip("{}").upper()
|
||
|
||
if (
|
||
not self.project_data
|
||
or "projectData" not in self.project_data
|
||
or "expensePreview" not in self.project_data["projectData"]
|
||
):
|
||
print("警告: project_data 为空或不包含 projectData.expensePreview")
|
||
return None
|
||
|
||
# 初始调用时从顶层开始
|
||
if node is None:
|
||
# print(f"开始查找GUID: {target_guid}")
|
||
expense_preview = self.project_data["projectData"]["expensePreview"]
|
||
for category_name, category in expense_preview.items():
|
||
# 处理category是字典的情况
|
||
if isinstance(category, dict):
|
||
for group_name, group in category.items():
|
||
if isinstance(group, list):
|
||
for i, item in enumerate(group):
|
||
new_path = f"projectData.expensePreview.{category_name}.{group_name}[{i}]"
|
||
result = self.find_expense_preview_node(target_guid, item, new_path)
|
||
if result:
|
||
return result
|
||
# 处理category是列表的情况
|
||
elif isinstance(category, list):
|
||
for i, item in enumerate(category):
|
||
new_path = f"projectData.expensePreview.{category_name}[{i}]"
|
||
result = self.find_expense_preview_node(target_guid, item, new_path)
|
||
if result:
|
||
return result
|
||
# print(f"在顶层搜索中未找到GUID: {target_guid}")
|
||
return None
|
||
|
||
# 检查当前节点
|
||
current_guid = self.get_node_guid(node)
|
||
if current_guid:
|
||
# print(f"比较GUID: {current_guid} vs {target_guid} 在路径: {path}")
|
||
pass
|
||
if current_guid == target_guid:
|
||
# print(f"找到匹配的GUID: {current_guid} 在路径: {path}")
|
||
return node
|
||
|
||
# 递归检查子节点
|
||
if "children" in node and isinstance(node["children"], list):
|
||
for i, child in enumerate(node["children"]):
|
||
new_path = f"{path}.children[{i}]"
|
||
result = self.find_expense_preview_node(target_guid, child, new_path)
|
||
if result:
|
||
return result
|
||
|
||
return None
|
||
|
||
@staticmethod
|
||
def generate_id(text):
|
||
"""生成简单的ID(只保留字母数字和中文)"""
|
||
return "".join(c for c in text if c.isalnum() or "\u4e00" <= c <= "\u9fa5").upper()
|
||
|
||
def convert_calculation_results_to_children(self, calculation_results):
|
||
"""将工程量计算结果转换为标准格式"""
|
||
children = []
|
||
for node_name, costs in calculation_results.items():
|
||
node_data = {"name": node_name, "type": "工程量节点", "children": []}
|
||
for cost_type, amount in costs.items():
|
||
node_data["children"].append(
|
||
{
|
||
"id": f"{cost_type}_{self.generate_id(cost_type)}",
|
||
"cost": str(amount),
|
||
}
|
||
)
|
||
children.append(node_data)
|
||
return children
|
||
|
||
def generate_guid(self):
|
||
"""生成新的GUID"""
|
||
return "{" + str(uuid.uuid4()).upper() + "}"
|
||
|
||
@staticmethod
|
||
def _determine_project_type(project_data):
|
||
"""
|
||
根据division字段判断工程类型
|
||
:param project_data: 项目数据
|
||
:return: 'inventory' 表示清单工程,'budget' 表示预算工程
|
||
"""
|
||
# 清单工程关键词
|
||
inventory_keywords = ["清单", "结算", "招标控制价", "招投标工程", "清单计价"]
|
||
# 预算工程关键词
|
||
budget_keywords = ["概预算", "定额", "定额计价", "概算", "概预算工程"]
|
||
|
||
# 尝试从数据中获取division字段
|
||
division = None
|
||
if "division" in project_data:
|
||
division = project_data["division"]
|
||
parts = division.split("-")
|
||
|
||
# 如果找到division字段
|
||
if division:
|
||
# 去掉"主网-"前缀
|
||
if len(parts) == 2:
|
||
division_type = parts[1].strip()
|
||
else:
|
||
division_type = parts[2].strip()
|
||
|
||
# 判断是否为清单工程
|
||
for keyword in inventory_keywords:
|
||
if keyword in division_type:
|
||
print(f"根据division字段 '{division}' 判断为清单工程")
|
||
return "inventory"
|
||
|
||
# 判断是否为预算工程
|
||
for keyword in budget_keywords:
|
||
if keyword in division_type:
|
||
print(f"根据division字段 '{division}' 判断为预算工程")
|
||
return "budget"
|
||
|
||
# 如果无法通过division字段判断,则尝试通过数据结构判断
|
||
is_inventory_project = False
|
||
for key in project_data.keys():
|
||
if re.search(r"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}", key, re.IGNORECASE):
|
||
is_inventory_project = True
|
||
print("通过数据结构判断为清单工程")
|
||
break
|
||
|
||
return "inventory" if is_inventory_project else "budget"
|
||
|
||
def add_quantity_node_expense_data(self, project_guid, calculation_results):
|
||
"""添加工程量节点费用预览数据"""
|
||
try:
|
||
# 1. 查找项目划分节点
|
||
division_node = self.find_project_division_node(project_guid)
|
||
if not division_node:
|
||
print(f"未找到GUID为 {project_guid} 的项目划分节点")
|
||
return False
|
||
|
||
# 2. 查找费用预览节点
|
||
expense_node = self.find_expense_preview_node(project_guid)
|
||
if not expense_node:
|
||
print(f"未找到GUID为 {project_guid} 的费用预览节点")
|
||
return False
|
||
|
||
# 3. 确保节点有标准格式
|
||
self.ensure_standard_format(expense_node)
|
||
|
||
# 4. 判断工程类型
|
||
project_type = ProjectExpenseProcessor._determine_project_type(calculation_results)
|
||
|
||
if project_type == "inventory":
|
||
# 清单工程处理逻辑
|
||
print("使用清单工程处理逻辑")
|
||
return self._process_inventory_project(division_node, expense_node, calculation_results)
|
||
else:
|
||
# 预算工程处理逻辑
|
||
print("使用预算工程处理逻辑")
|
||
return self._process_budget_project(division_node, expense_node, calculation_results)
|
||
|
||
except Exception as e:
|
||
print(f"添加工程量节点费用预览数据失败: {str(e)}")
|
||
import traceback
|
||
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _process_budget_project(self, division_node, expense_node, calculation_results):
|
||
"""处理预算工程的费用数据"""
|
||
try:
|
||
# 清空children准备添加工程量节点
|
||
expense_node["children"] = []
|
||
|
||
# 处理每个工程量计算结果
|
||
for node_name, costs in calculation_results.items():
|
||
# 查找项目划分中对应的工程量节点
|
||
quantity_nodes = [] # 修改为列表,存储所有匹配的节点
|
||
|
||
# 检查node_name是否包含GUID格式
|
||
guid_match = re.search(
|
||
r"_([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})$",
|
||
node_name,
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
node_base_name = node_name
|
||
node_guid = None
|
||
node_id = None
|
||
|
||
# 如果节点名称包含GUID格式
|
||
if guid_match:
|
||
node_guid = guid_match.group(1)
|
||
node_base_name = node_name[: node_name.rfind("_")]
|
||
print(f"从名称中提取GUID: {node_guid}, 基础名称: {node_base_name}")
|
||
else:
|
||
# 检查是否为"工程量名称_id"格式(纯数字ID)
|
||
node_name_parts = node_name.split("_")
|
||
if len(node_name_parts) > 1 and node_name_parts[-1].isdigit():
|
||
node_id = node_name_parts[-1]
|
||
node_base_name = "_".join(node_name_parts[:-1])
|
||
print(f"从名称中提取ID: {node_id}, 基础名称: {node_base_name}")
|
||
|
||
if "children" in division_node:
|
||
guid_match_nodes = [] # 存储GUID匹配的节点
|
||
exact_match_nodes = [] # 存储完全匹配的节点
|
||
id_match_nodes = [] # 存储ID匹配的节点
|
||
name_match_nodes = [] # 存储基础名称匹配的节点
|
||
|
||
# 如果从名称中提取了GUID,先尝试为没有GUID的节点添加这个GUID
|
||
if node_guid:
|
||
for child in division_node["children"]:
|
||
child_name = child.get("项目名称") or child.get("name", "")
|
||
# 如果节点名称与基础名称匹配,但没有任何形式的GUID,则添加从名称中提取的GUID
|
||
if child_name == node_base_name and self.get_node_guid(child) is None:
|
||
self.set_node_guid(child, node_guid)
|
||
print(f"为匹配的节点 {child_name} 添加从名称中提取的GUID: {node_guid}")
|
||
|
||
for child in division_node["children"]:
|
||
child_name = child.get("项目名称") or child.get("name", "")
|
||
child_id = child.get("id", "")
|
||
child_guid = self.get_node_guid(child)
|
||
|
||
# 情况0: GUID匹配(最高优先级)
|
||
if node_guid and child_guid and node_guid.upper() == child_guid:
|
||
guid_match_nodes.append(child)
|
||
print(f"找到GUID匹配的工程量节点: {child_name} (GUID: {child_guid})")
|
||
|
||
# 情况1: 完全匹配(次高优先级)
|
||
if child_name == node_name:
|
||
exact_match_nodes.append(child)
|
||
|
||
# 情况2: 检查项目划分节点名称是否为"工程量名称_id"格式
|
||
child_name_parts = child_name.split("_")
|
||
child_base_name = child_name
|
||
child_id_from_name = None
|
||
|
||
if len(child_name_parts) > 1 and child_name_parts[-1].isdigit():
|
||
child_id_from_name = child_name_parts[-1]
|
||
child_base_name = "_".join(child_name_parts[:-1])
|
||
|
||
# 情况3: 名称和ID都匹配
|
||
if node_id and (child_id == node_id or child_id_from_name == node_id):
|
||
if child_base_name == node_base_name:
|
||
id_match_nodes.append(child)
|
||
|
||
# 情况4: 基础名称匹配(最低优先级)
|
||
elif child_base_name == node_base_name:
|
||
name_match_nodes.append(child)
|
||
|
||
# 按优先级选择匹配节点
|
||
if guid_match_nodes:
|
||
quantity_nodes = guid_match_nodes
|
||
print(f"使用GUID匹配的工程量节点: {node_name}, 找到 {len(guid_match_nodes)} 个节点")
|
||
elif exact_match_nodes:
|
||
quantity_nodes = exact_match_nodes
|
||
print(f"使用完全匹配的工程量节点: {node_name}, 找到 {len(exact_match_nodes)} 个节点")
|
||
elif id_match_nodes:
|
||
quantity_nodes = id_match_nodes
|
||
print(f"使用ID匹配的工程量节点: {node_name} (ID: {node_id}), 找到 {len(id_match_nodes)} 个节点")
|
||
elif name_match_nodes:
|
||
quantity_nodes = name_match_nodes
|
||
print(f"使用基础名称匹配的工程量节点: {node_base_name}, 找到 {len(name_match_nodes)} 个节点")
|
||
|
||
if not quantity_nodes:
|
||
print(f"未找到名称为 {node_name} 的工程量节点")
|
||
continue
|
||
|
||
# 处理所有匹配的节点
|
||
for quantity_node in quantity_nodes:
|
||
# 如果工程量节点没有GUID,使用从名称中提取的GUID或生成一个新的
|
||
existing_guid = self.get_node_guid(quantity_node)
|
||
if existing_guid is None:
|
||
if node_guid:
|
||
self.set_node_guid(quantity_node, node_guid)
|
||
print(f"使用名称中的GUID: {node_guid}")
|
||
else:
|
||
new_guid = self.generate_guid()
|
||
self.set_node_guid(quantity_node, new_guid)
|
||
print(f"生成新的GUID: {new_guid}")
|
||
else:
|
||
print(f"使用节点已有的GUID: {existing_guid}")
|
||
|
||
# 使用get_node_guid确保获取正确的GUID,无论是大写还是小写
|
||
quantity_guid = self.get_node_guid(quantity_node)
|
||
if not quantity_guid:
|
||
# 这种情况不应该发生,因为前面已经确保了节点有GUID
|
||
print(f"警告: 节点 {node_name} 没有GUID")
|
||
continue
|
||
|
||
# 创建工程量费用预览节点(使用标准格式)
|
||
quantity_expense = {"GUID": "{" + quantity_guid.upper() + "}", "sum": [], "children": [], "rcj": []}
|
||
|
||
# 添加费用项到sum
|
||
for cost_type, amount in costs.items():
|
||
quantity_expense["sum"].append({"id": f"{cost_type}", "cost": str(amount)})
|
||
|
||
# 将工程量费用预览节点添加到项目划分费用预览的children中
|
||
expense_node["children"].append(quantity_expense)
|
||
|
||
print(f"成功添加GUID为 {division_node.get('GUID', '')} 的工程量节点费用预览数据")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"处理预算工程费用数据失败: {str(e)}")
|
||
return False
|
||
|
||
def _process_inventory_project(self, division_node, expense_node, calculation_results):
|
||
"""处理清单工程的费用数据"""
|
||
try:
|
||
# 确保expense_node有children数组
|
||
if "children" not in expense_node:
|
||
expense_node["children"] = []
|
||
|
||
# 从calculation_results中移除division字段(如果存在)
|
||
if "division" in calculation_results:
|
||
division_value = calculation_results.pop("division")
|
||
print(f"移除division字段: {division_value}")
|
||
|
||
# 保存原有的清单级节点
|
||
existing_inventory_nodes = {}
|
||
if "children" in expense_node:
|
||
for child in expense_node["children"]:
|
||
if "GUID" in child:
|
||
existing_inventory_nodes[child["GUID"].strip("{}").upper()] = child
|
||
|
||
# 创建清单级节点字典,用于存储新的或更新的清单级节点
|
||
inventory_nodes = {}
|
||
|
||
# 处理计算结果中的每个清单级节点
|
||
for inventory_key, inventory_data in calculation_results.items():
|
||
# 跳过非清单级节点
|
||
if inventory_key == "division":
|
||
continue
|
||
|
||
# 从清单键中提取GUID
|
||
inventory_guid_match = re.search(
|
||
r"([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})", inventory_key, re.IGNORECASE
|
||
)
|
||
if not inventory_guid_match:
|
||
print(f"无法从键 {inventory_key} 中提取GUID,跳过")
|
||
continue
|
||
|
||
inventory_guid = inventory_guid_match.group(1)
|
||
|
||
# 在项目划分中查找对应的清单节点
|
||
inventory_node = self._find_node_by_guid(division_node, inventory_guid)
|
||
if not inventory_node:
|
||
print(f"未找到GUID为 {inventory_guid} 的清单节点")
|
||
continue
|
||
|
||
# 获取或创建清单级费用节点
|
||
inventory_guid_upper = inventory_guid.strip("{}").upper()
|
||
if inventory_guid_upper in existing_inventory_nodes:
|
||
# 使用现有的清单级节点
|
||
inventory_expense = existing_inventory_nodes[inventory_guid_upper]
|
||
print(f"使用现有的清单级节点: {inventory_guid}")
|
||
else:
|
||
# 创建新的清单级费用节点
|
||
inventory_expense = {"GUID": "{" + inventory_guid_upper + "}", "sum": [], "children": [], "rcj": []}
|
||
print(f"创建新的清单级节点: {inventory_guid}")
|
||
|
||
# 确保清单级费用节点有标准格式
|
||
self.ensure_standard_format(inventory_expense)
|
||
|
||
# 创建工程量级节点字典,用于存储新的或更新的工程量级节点
|
||
quantity_nodes = {}
|
||
|
||
# 如果清单级节点已有工程量级节点,保存它们
|
||
if "children" in inventory_expense:
|
||
for child in inventory_expense["children"]:
|
||
if "GUID" in child:
|
||
quantity_nodes[child["GUID"].strip("{}").upper()] = child
|
||
|
||
# 处理清单下的每个工程量节点
|
||
for quantity_name, quantity_costs in inventory_data.items():
|
||
# 检查quantity_name是否包含GUID格式
|
||
guid_match = re.search(
|
||
r"_([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})$",
|
||
quantity_name,
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
quantity_base_name = quantity_name
|
||
quantity_guid = None
|
||
quantity_id = None
|
||
|
||
# 如果节点名称包含GUID格式
|
||
if guid_match:
|
||
quantity_guid = guid_match.group(1)
|
||
quantity_base_name = quantity_name[: quantity_name.rfind("_")]
|
||
print(f"从名称中提取GUID: {quantity_guid}, 基础名称: {quantity_base_name}")
|
||
else:
|
||
# 检查是否为"工程量名称_id"格式(纯数字ID)
|
||
quantity_name_parts = quantity_name.split("_")
|
||
if len(quantity_name_parts) > 1 and quantity_name_parts[-1].isdigit():
|
||
quantity_id = quantity_name_parts[-1]
|
||
quantity_base_name = "_".join(quantity_name_parts[:-1])
|
||
print(f"从名称中提取ID: {quantity_id}, 基础名称: {quantity_base_name}")
|
||
|
||
# 在清单节点的children中查找工程量节点
|
||
quantity_node = None
|
||
guid_match_node = None
|
||
exact_match = None
|
||
id_match = None
|
||
name_match = None
|
||
|
||
if "children" in inventory_node:
|
||
# 如果从名称中提取了GUID,先尝试为没有GUID的节点添加这个GUID
|
||
if quantity_guid:
|
||
for child in inventory_node["children"]:
|
||
child_name = child.get("项目名称") or child.get("name", "")
|
||
# 如果节点名称与基础名称匹配,但没有任何形式的GUID,则添加从名称中提取的GUID
|
||
if child_name == quantity_base_name and self.get_node_guid(child) is None:
|
||
self.set_node_guid(child, quantity_guid)
|
||
print(f"为匹配的节点 {child_name} 添加从名称中提取的GUID: {quantity_guid}")
|
||
|
||
for child in inventory_node["children"]:
|
||
child_name = child.get("项目名称") or child.get("name", "")
|
||
child_id = child.get("id", "")
|
||
child_guid = self.get_node_guid(child)
|
||
|
||
# 情况0: GUID匹配(最高优先级)
|
||
if quantity_guid and child_guid and quantity_guid.upper() == child_guid:
|
||
guid_match_node = child
|
||
print(f"找到GUID匹配的工程量节点: {child_name} (GUID: {child_guid})")
|
||
break # 找到GUID匹配,立即使用
|
||
|
||
# 情况1: 完全匹配(次高优先级)
|
||
if child_name == quantity_name:
|
||
exact_match = child
|
||
# 不立即break,继续寻找可能的GUID匹配
|
||
|
||
# 情况2: 检查项目划分节点名称是否为"工程量名称_id"格式
|
||
child_name_parts = child_name.split("_")
|
||
child_base_name = child_name
|
||
child_id_from_name = None
|
||
|
||
if len(child_name_parts) > 1 and child_name_parts[-1].isdigit():
|
||
child_id_from_name = child_name_parts[-1]
|
||
child_base_name = "_".join(child_name_parts[:-1])
|
||
|
||
# 情况3: 名称和ID都匹配
|
||
if quantity_id and (child_id == quantity_id or child_id_from_name == quantity_id):
|
||
if child_base_name == quantity_base_name:
|
||
id_match = child
|
||
# 不立即break,继续寻找可能的更精确匹配
|
||
|
||
# 情况4: 基础名称匹配(最低优先级)
|
||
elif child_base_name == quantity_base_name:
|
||
name_match = child
|
||
# 不立即break,继续寻找更精确的匹配
|
||
|
||
# 按优先级选择最佳匹配
|
||
if guid_match_node:
|
||
quantity_node = guid_match_node
|
||
print(f"使用GUID匹配的工程量节点: {quantity_name}")
|
||
elif exact_match:
|
||
quantity_node = exact_match
|
||
print(f"使用完全匹配的工程量节点: {quantity_name}")
|
||
elif id_match:
|
||
quantity_node = id_match
|
||
print(f"使用ID匹配的工程量节点: {quantity_name} (ID: {quantity_id})")
|
||
elif name_match:
|
||
quantity_node = name_match
|
||
print(f"使用基础名称匹配的工程量节点: {quantity_base_name}")
|
||
|
||
if not quantity_node:
|
||
print(f"未找到名称为 {quantity_name} 的工程量节点")
|
||
continue
|
||
|
||
# 如果工程量节点没有GUID,使用从名称中提取的GUID或生成一个新的
|
||
existing_guid = self.get_node_guid(quantity_node)
|
||
if existing_guid is None:
|
||
if quantity_guid:
|
||
self.set_node_guid(quantity_node, quantity_guid)
|
||
print(f"使用名称中的GUID: {quantity_guid}")
|
||
else:
|
||
new_guid = self.generate_guid()
|
||
self.set_node_guid(quantity_node, new_guid)
|
||
print(f"生成新的GUID: {new_guid}")
|
||
else:
|
||
print(f"使用节点已有的GUID: {existing_guid}")
|
||
|
||
# 使用get_node_guid确保获取正确的GUID,无论是大写还是小写
|
||
quantity_guid = self.get_node_guid(quantity_node)
|
||
if quantity_guid:
|
||
quantity_guid_upper = quantity_guid
|
||
else:
|
||
# 这种情况不应该发生,因为前面已经确保了节点有GUID
|
||
print(f"警告: 节点 {quantity_name} 没有GUID")
|
||
continue
|
||
|
||
# 获取或创建工程量级费用节点
|
||
if quantity_guid_upper in quantity_nodes:
|
||
# 使用现有的工程量级节点
|
||
quantity_expense = quantity_nodes[quantity_guid_upper]
|
||
print(f"使用现有的工程量级节点: {quantity_guid}")
|
||
else:
|
||
# 创建新的工程量级费用节点
|
||
quantity_expense = {"GUID": quantity_guid, "sum": [], "children": [], "rcj": []}
|
||
print(f"创建新的工程量级节点: {quantity_guid}")
|
||
|
||
# 添加费用项到sum
|
||
for cost_type, amount in quantity_costs.items():
|
||
quantity_expense["sum"].append({"id": f"{cost_type}", "cost": str(amount)})
|
||
|
||
# 将工程量级费用节点添加到字典
|
||
quantity_nodes[quantity_guid_upper] = quantity_expense
|
||
|
||
# 将工程量级费用节点添加到清单级费用节点
|
||
inventory_expense["children"] = list(quantity_nodes.values())
|
||
|
||
# 将清单级费用节点添加到字典
|
||
inventory_nodes[inventory_guid_upper] = inventory_expense
|
||
|
||
# 将所有清单级费用节点添加到项目级费用预览的children中
|
||
# 注意:这里不覆盖原有的children,而是合并新旧节点
|
||
for guid, node in inventory_nodes.items():
|
||
# 如果已经存在于existing_inventory_nodes中,则已经处理过
|
||
if guid not in existing_inventory_nodes:
|
||
expense_node["children"].append(node)
|
||
|
||
print(f"成功添加GUID为 {division_node.get('GUID', '')} 的清单工程费用预览数据")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"处理清单工程费用数据失败: {str(e)}")
|
||
import traceback
|
||
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _find_node_by_guid(self, node, target_guid):
|
||
"""在节点及其子节点中查找指定GUID的节点"""
|
||
# 标准化GUID格式
|
||
target_guid = target_guid.strip("{}").upper()
|
||
|
||
if isinstance(node, dict):
|
||
# 检查当前节点
|
||
node_guid = self.get_node_guid(node)
|
||
if node_guid == target_guid:
|
||
return node
|
||
|
||
# 检查子节点
|
||
if "children" in node and isinstance(node["children"], list):
|
||
for child in node["children"]:
|
||
result = self._find_node_by_guid(child, target_guid)
|
||
if result:
|
||
return result
|
||
|
||
elif isinstance(node, list):
|
||
# 检查列表中的每个项
|
||
for item in node:
|
||
result = self._find_node_by_guid(item, target_guid)
|
||
if result:
|
||
return result
|
||
|
||
return None
|
||
|
||
def add_labor_material_machine_expense_data(self, project_guid, lmm_data):
|
||
"""添加人材机节点费用数据到rcj数组"""
|
||
try:
|
||
# 1. 查找项目划分节点
|
||
division_node = self.find_project_division_node(project_guid)
|
||
if not division_node:
|
||
print(f"未找到GUID为 {project_guid} 的项目划分节点")
|
||
return False
|
||
|
||
# 2. 查找费用预览节点
|
||
expense_node = self.find_expense_preview_node(project_guid)
|
||
if not expense_node:
|
||
print(f"未找到GUID为 {project_guid} 的费用预览节点")
|
||
return False
|
||
|
||
# 3. 确保节点有标准格式
|
||
self.ensure_standard_format(expense_node)
|
||
|
||
# 4. 确保rcj数组存在
|
||
if "rcj" not in expense_node:
|
||
expense_node["rcj"] = []
|
||
|
||
# 4. 处理人材机数据
|
||
# 添加人工节点
|
||
if "人工节点" in lmm_data:
|
||
for item in lmm_data["人工节点"]:
|
||
expense_node["rcj"].append(
|
||
{
|
||
"type": "人工",
|
||
"编码": item.get("编码", ""),
|
||
"名称": item.get("名称", ""),
|
||
"单位": item.get("单位", ""),
|
||
"预算价不含税": item.get("预算价不含税", ""),
|
||
"市场价不含税": item.get("市场价不含税", ""),
|
||
"预算价合价": item.get("预算价合价", ""),
|
||
"市场价合价": item.get("市场价合价", ""),
|
||
"价差": item.get("价差", ""),
|
||
"数量": item.get("数量", ""),
|
||
}
|
||
)
|
||
|
||
# 添加材料节点
|
||
if "材料节点" in lmm_data:
|
||
for item in lmm_data["材料节点"]:
|
||
expense_node["rcj"].append(
|
||
{
|
||
"type": "材料",
|
||
"供货方": item.get("供货方", ""),
|
||
"编码": item.get("编码", ""),
|
||
"名称": item.get("名称", ""),
|
||
"单位": item.get("单位", ""),
|
||
"预算价不含税": item.get("预算价不含税", ""),
|
||
"市场价不含税": item.get("市场价不含税", ""),
|
||
"预算价合价": item.get("预算价合价", ""),
|
||
"市场价合价": item.get("市场价合价", ""),
|
||
"价差": item.get("价差", ""),
|
||
"数量": item.get("数量", ""),
|
||
}
|
||
)
|
||
|
||
# 添加机械节点
|
||
if "机械节点" in lmm_data:
|
||
for item in lmm_data["机械节点"]:
|
||
expense_node["rcj"].append(
|
||
{
|
||
"type": "机械",
|
||
"编码": item.get("编码", ""),
|
||
"名称": item.get("名称", ""),
|
||
"单位": item.get("单位", ""),
|
||
"预算价不含税": item.get("预算价不含税", ""),
|
||
"市场价不含税": item.get("市场价不含税", ""),
|
||
"预算价合价": item.get("预算价合价", ""),
|
||
"市场价合价": item.get("市场价合价", ""),
|
||
"价差": item.get("价差", ""),
|
||
"数量": item.get("数量", ""),
|
||
}
|
||
)
|
||
|
||
print(f"成功添加GUID为 {project_guid} 的人材机节点费用预览数据到rcj数组")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"添加人材机节点费用预览数据失败: {str(e)}")
|
||
return False
|
||
|
||
def batch_process_expense_data(self, data_list):
|
||
"""
|
||
批量处理多个项目的费用预览数据
|
||
:param data_list: 包含项目GUID和费用数据的列表
|
||
:return: 是否全部成功处理
|
||
"""
|
||
all_success = True
|
||
for data in data_list:
|
||
project_guid = data.get("projectGuid")
|
||
calculation_results = data.get("calculationResults")
|
||
labor_material_machine_data = data.get("laborMaterialMachineData")
|
||
|
||
if calculation_results:
|
||
success1 = self.add_quantity_node_expense_data(project_guid, calculation_results)
|
||
all_success &= success1
|
||
|
||
if labor_material_machine_data:
|
||
success2 = self.add_labor_material_machine_expense_data(project_guid, labor_material_machine_data)
|
||
all_success &= success2
|
||
|
||
return all_success
|
||
|
||
def get_node_guid(self, node):
|
||
"""获取节点的GUID,无论是大写还是小写"""
|
||
if "GUID" in node:
|
||
return node["GUID"].strip("{}").upper()
|
||
elif "guid" in node:
|
||
return node["guid"].strip("{}").upper()
|
||
elif "id" in node and node["id"].startswith("{") and node["id"].endswith("}"):
|
||
# 如果id是GUID格式
|
||
return node["id"].strip("{}").upper()
|
||
return None
|
||
|
||
def set_node_guid(self, node, guid_value):
|
||
"""设置节点的GUID,统一使用大写GUID,并保留原有的guid和id(如果它们是GUID格式)"""
|
||
# 确保guid_value是标准格式(带花括号的大写GUID)
|
||
if not guid_value.startswith("{"):
|
||
guid_value = "{" + guid_value
|
||
if not guid_value.endswith("}"):
|
||
guid_value = guid_value + "}"
|
||
guid_value = guid_value.upper()
|
||
|
||
# 设置GUID属性(大写)
|
||
node["GUID"] = guid_value
|
||
|
||
# 如果已有小写guid属性,保持不变
|
||
# 如果已有id属性且是GUID格式,保持不变
|
||
# 这样可以保留原有的属性,避免数据丢失
|
||
return node
|
||
|
||
def batch_process_from_folder(self, folder_path, original_division=""):
|
||
"""
|
||
批量处理指定文件夹下的工程量和人材机JSON文件
|
||
:param folder_path: 包含JSON文件的文件夹路径
|
||
:param original_division: 原始文件中的division字段
|
||
:return: 成功处理的数量
|
||
"""
|
||
# 先列出所有项目划分中的GUID
|
||
print("\n=== 列出所有项目划分中的GUID ===")
|
||
all_guids = self.list_all_division_guids()
|
||
print("=== 列出结束 ===\n")
|
||
|
||
# 首先确保所有费用预览节点都有标准格式
|
||
print("\n=== 确保所有费用预览节点都有标准格式 ===")
|
||
standardized_count = 0
|
||
if all_guids:
|
||
for guid, name, path in all_guids:
|
||
try:
|
||
# 查找费用预览节点
|
||
expense_node = self.find_expense_preview_node(guid)
|
||
if expense_node:
|
||
self.ensure_standard_format(expense_node)
|
||
standardized_count += 1
|
||
except Exception as e:
|
||
print(f"标准化节点 {guid} 时出错: {str(e)}")
|
||
continue
|
||
print(f"已标准化 {standardized_count} 个费用预览节点")
|
||
print("=== 标准化结束 ===\n")
|
||
|
||
success_count = 0
|
||
files = os.listdir(folder_path)
|
||
|
||
guid_map = {}
|
||
|
||
# 第一步:遍历所有文件,按 GUID 分组
|
||
for filename in files:
|
||
if not filename.endswith(".json"):
|
||
continue
|
||
|
||
# 检查文件名格式
|
||
parts = filename.split("_")
|
||
if len(parts) < 2:
|
||
continue
|
||
|
||
# 尝试从文件名中提取GUID
|
||
guid = None
|
||
is_calculation_file = False
|
||
is_rcj_file = False
|
||
is_inventory_project = False # 新增变量,用于标记是否为清单工程文件
|
||
|
||
# 检查文件名是否包含GUID格式
|
||
import re
|
||
|
||
guid_match = re.search(
|
||
r"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}", filename, re.IGNORECASE
|
||
)
|
||
|
||
if guid_match:
|
||
# 新格式:项目名称_GUID_文件类型.json
|
||
guid = guid_match.group(0)
|
||
if "calculation_results" in filename:
|
||
is_calculation_file = True
|
||
elif "rcj" in filename:
|
||
is_rcj_file = True
|
||
elif "清单工程" in filename: # 新增检查,如果文件名包含"清单工程",则标记为清单工程
|
||
is_inventory_project = True
|
||
is_calculation_file = True # 清单工程也作为计算结果处理
|
||
else:
|
||
# 旧格式:假设GUID是文件名中的第二个字段
|
||
if len(parts) >= 3:
|
||
guid = parts[1] # 文件名中的第二个字段为 GUID
|
||
guid = guid.strip("{}") # 去掉可能的花括号
|
||
|
||
if "调差_预算工程" in filename:
|
||
is_calculation_file = True
|
||
elif "调差_rcj" in filename:
|
||
is_rcj_file = True
|
||
elif "清单工程" in filename: # 新增检查,如果文件名包含"清单工程",则标记为清单工程
|
||
is_inventory_project = True
|
||
is_calculation_file = True # 清单工程也作为计算结果处理
|
||
|
||
if not guid:
|
||
print(f"无法从文件名 {filename} 提取GUID,跳过")
|
||
continue
|
||
|
||
# print(f"从文件名 {filename} 提取的GUID: {guid}")
|
||
|
||
if guid not in guid_map:
|
||
guid_map[guid] = {
|
||
"calc": None,
|
||
"rcj": None,
|
||
"division": original_division,
|
||
"is_inventory": is_inventory_project,
|
||
}
|
||
|
||
if is_calculation_file:
|
||
guid_map[guid]["calc"] = os.path.join(folder_path, filename)
|
||
guid_map[guid]["is_inventory"] = is_inventory_project
|
||
elif is_rcj_file:
|
||
guid_map[guid]["rcj"] = os.path.join(folder_path, filename)
|
||
|
||
# 第二步:逐个 GUID 加载数据并调用处理函数
|
||
for guid, paths in guid_map.items():
|
||
try:
|
||
# 尝试通过GUID查找项目划分节点
|
||
division_node = self.find_project_division_node(guid)
|
||
|
||
# 如果找不到,尝试通过文件名前缀(项目名称)查找
|
||
if not division_node:
|
||
file_path = paths["calc"] or paths["rcj"]
|
||
if file_path:
|
||
filename = os.path.basename(file_path)
|
||
project_name = filename.split("_")[0] # 假设文件名第一部分是项目名称
|
||
print(f"通过GUID未找到节点,尝试通过名称 '{project_name}' 查找")
|
||
division_node = self.find_division_node_by_name(project_name)
|
||
|
||
if not division_node:
|
||
print(f"无法找到对应的项目划分节点,跳过处理 GUID: {guid}")
|
||
continue
|
||
|
||
# 使用找到的节点的GUID
|
||
actual_guid = division_node.get("GUID", "")
|
||
# print(
|
||
# f"找到项目划分节点,GUID: {actual_guid}, 名称: {division_node.get('项目名称', division_node.get('name', '未命名'))}"
|
||
# )
|
||
|
||
# 查找费用预览节点,如果不存在则创建
|
||
expense_node = self.find_expense_preview_node(actual_guid)
|
||
if not expense_node:
|
||
print(f"未找到GUID为 {actual_guid} 的费用预览节点,将在处理时创建")
|
||
|
||
calc_data = None
|
||
rcj_data = None
|
||
|
||
# 处理计算结果文件
|
||
if paths["calc"]:
|
||
try:
|
||
with open(paths["calc"], "r", encoding="utf-8") as f:
|
||
file_data = json.load(f)
|
||
|
||
# 使用原始文件中的division字段
|
||
if original_division:
|
||
paths["division"] = original_division
|
||
print(f"使用原始文件中的division字段: {original_division}")
|
||
|
||
# 判断是否为清单工程
|
||
is_inventory = paths["is_inventory"]
|
||
|
||
if is_inventory:
|
||
# 清单工程,保留原始结构
|
||
calc_data = file_data
|
||
print("检测到清单工程,保留原始数据结构")
|
||
else:
|
||
# 预算工程,检查数据格式
|
||
has_guid_key = False
|
||
for key in file_data.keys():
|
||
if re.search(
|
||
r"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}",
|
||
key,
|
||
re.IGNORECASE,
|
||
):
|
||
has_guid_key = True
|
||
break
|
||
|
||
if has_guid_key:
|
||
# 如果预算工程中有GUID键,也保留原始结构
|
||
calc_data = file_data
|
||
print("检测到预算工程中包含GUID键,保留原始数据结构")
|
||
else:
|
||
# 旧格式:直接使用
|
||
calc_data = file_data
|
||
except Exception as e:
|
||
print(f"读取工程量文件失败 {paths['calc']}: {e}")
|
||
continue
|
||
|
||
# 处理人材机文件
|
||
if paths["rcj"]:
|
||
try:
|
||
with open(paths["rcj"], "r", encoding="utf-8") as f:
|
||
file_data = json.load(f)
|
||
|
||
# 使用原始文件中的division字段
|
||
if original_division:
|
||
paths["division"] = original_division
|
||
print(f"使用原始文件中的division字段: {original_division}")
|
||
|
||
# 判断是否为清单工程
|
||
is_inventory = paths["is_inventory"]
|
||
|
||
if is_inventory:
|
||
# 清单工程,保留原始结构
|
||
rcj_data = file_data
|
||
print("检测到清单工程,保留原始数据结构")
|
||
else:
|
||
# 预算工程,检查数据格式
|
||
has_guid_key = False
|
||
for key in file_data.keys():
|
||
if re.search(
|
||
r"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}",
|
||
key,
|
||
re.IGNORECASE,
|
||
):
|
||
has_guid_key = True
|
||
break
|
||
|
||
if has_guid_key:
|
||
# 如果预算工程中有GUID键,也保留原始结构
|
||
rcj_data = file_data
|
||
print("检测到预算工程中包含GUID键,保留原始数据结构")
|
||
else:
|
||
# 旧格式:直接使用
|
||
rcj_data = file_data
|
||
except Exception as e:
|
||
print(f"读取人材机文件失败 {paths['rcj']}: {e}")
|
||
continue
|
||
|
||
# 调用处理方法
|
||
success = True
|
||
if calc_data:
|
||
# 如果有division字段,添加到calc_data中
|
||
if paths["division"]:
|
||
calc_data["division"] = paths["division"]
|
||
success &= self.add_quantity_node_expense_data(guid, calc_data)
|
||
if rcj_data:
|
||
# 如果有division字段,添加到rcj_data中
|
||
if paths["division"]:
|
||
rcj_data["division"] = paths["division"]
|
||
success &= self.add_labor_material_machine_expense_data(guid, rcj_data)
|
||
|
||
if success:
|
||
success_count += 1
|
||
print(f"✅ 成功处理 GUID: {guid}")
|
||
else:
|
||
print(f"❌ 处理 GUID: {guid} 时发生错误")
|
||
except Exception as e:
|
||
print(f"处理 GUID: {guid} 时发生异常: {str(e)}")
|
||
continue
|
||
|
||
return success_count
|
||
|
||
def export_to_json(self, pretty=True):
|
||
"""
|
||
导出为JSON字符串
|
||
:param pretty: 是否格式化输出
|
||
:return: JSON字符串
|
||
"""
|
||
return json.dumps(self.project_data, ensure_ascii=False, indent=2 if pretty else 0)
|
||
|
||
def list_all_division_guids(self):
|
||
"""列出所有项目划分中的GUID"""
|
||
if "projectData" not in self.project_data or "projectDivision" not in self.project_data["projectData"]:
|
||
print("警告: project_data 为空或不包含 projectData.projectDivision")
|
||
return
|
||
|
||
guids = []
|
||
|
||
def collect_guids(node, path=""):
|
||
"""递归收集所有GUID"""
|
||
if isinstance(node, list):
|
||
for i, item in enumerate(node):
|
||
collect_guids(item, f"{path}[{i}]")
|
||
elif isinstance(node, dict):
|
||
guid = self.get_node_guid(node)
|
||
if guid:
|
||
name = node.get("项目名称", node.get("name", "未命名"))
|
||
guid_with_braces = "{" + guid + "}"
|
||
guids.append((guid_with_braces, name, path))
|
||
|
||
# 递归检查所有子节点,包括children和其他字典值
|
||
for key, value in node.items():
|
||
if isinstance(value, (dict, list)):
|
||
collect_guids(value, f"{path}.{key}")
|
||
|
||
# 从projectDivision开始收集
|
||
collect_guids(self.project_data["projectData"]["projectDivision"], "projectData.projectDivision")
|
||
|
||
# print(f"项目划分中共有 {len(guids)} 个GUID:")
|
||
# for guid, name, path in guids:
|
||
# print(f"GUID: {guid}, 名称: {name}, 路径: {path}")
|
||
|
||
return guids
|
||
|
||
def find_division_node_by_name(self, name):
|
||
"""通过名称查找项目划分节点"""
|
||
if "projectData" not in self.project_data or "projectDivision" not in self.project_data["projectData"]:
|
||
print("警告: project_data 为空或不包含 projectData.projectDivision")
|
||
return None
|
||
|
||
def search_by_name(node):
|
||
if isinstance(node, list):
|
||
for item in node:
|
||
result = search_by_name(item)
|
||
if result:
|
||
return result
|
||
elif isinstance(node, dict):
|
||
node_name = node.get("项目名称", node.get("name", ""))
|
||
if node_name == name:
|
||
return node
|
||
|
||
if "children" in node:
|
||
result = search_by_name(node["children"])
|
||
if result:
|
||
return result
|
||
return None
|
||
|
||
for category in self.project_data["projectData"]["projectDivision"].values():
|
||
result = search_by_name(category)
|
||
if result:
|
||
return result
|
||
|
||
return None
|
||
|
||
def ensure_standard_format(self, expense_node):
|
||
"""确保费用预览节点有标准格式(sum, children, rcj)"""
|
||
# 确保sum存在
|
||
if "sum" not in expense_node:
|
||
expense_node["sum"] = []
|
||
|
||
# 如果children中有费用项(直接费用项,不是子节点),将其移至sum并清空children
|
||
if "children" in expense_node:
|
||
has_direct_cost_items = False
|
||
for child in expense_node["children"]:
|
||
if "cost" in child and "id" in child and "GUID" not in child:
|
||
# 这是直接费用项,应该移到sum中
|
||
has_direct_cost_items = True
|
||
# 检查是否已经存在相同id的项
|
||
exists = False
|
||
for item in expense_node["sum"]:
|
||
if item.get("id") == child["id"]:
|
||
exists = True
|
||
break
|
||
|
||
if not exists:
|
||
# 只保留id和cost两个属性
|
||
expense_node["sum"].append({"id": child["id"], "cost": child["cost"]})
|
||
|
||
# 如果children中只有直接费用项,清空children
|
||
if has_direct_cost_items and all("GUID" not in child for child in expense_node["children"]):
|
||
expense_node["children"] = []
|
||
else:
|
||
expense_node["children"] = []
|
||
|
||
# 确保rcj存在
|
||
if "rcj" not in expense_node:
|
||
expense_node["rcj"] = []
|
||
|
||
# 确保sum中的项只有id和cost两个属性
|
||
for i, item in enumerate(expense_node["sum"]):
|
||
if "id" in item and "cost" in item:
|
||
expense_node["sum"][i] = {"id": item["id"], "cost": item["cost"]}
|
||
|
||
return expense_node
|
||
|
||
|
||
def write_BCLresult_into_json(original_json_path, bcl_result_folder, output_json_path):
|
||
"""
|
||
将BCL计算结果写入到原始JSON文件中
|
||
|
||
:param original_json_path: 原始JSON文件路径
|
||
:param bcl_result_folder: BCL计算结果文件夹路径
|
||
:param output_json_path: 输出JSON文件路径(合并后的JSON)
|
||
:return: 是否成功处理
|
||
"""
|
||
try:
|
||
# 1. 加载原始项目数据
|
||
print(f"加载原始项目数据: {original_json_path}")
|
||
with open(original_json_path, "r", encoding="utf-8") as f:
|
||
project_data = json.load(f)
|
||
|
||
# 获取原始文件中的division字段
|
||
original_division = project_data.get("division", "")
|
||
print(f"从原始文件中获取到division字段: {original_division}")
|
||
|
||
# 2. 初始化处理器
|
||
processor = ProjectExpenseProcessor()
|
||
processor.initialize_project_data(project_data)
|
||
|
||
# 3. 检查BCL计算结果文件夹是否存在
|
||
if not os.path.exists(bcl_result_folder):
|
||
print(f"错误: BCL计算结果文件夹不存在: {bcl_result_folder}")
|
||
return False
|
||
|
||
print(f"使用BCL计算结果文件夹: {bcl_result_folder}")
|
||
|
||
# 4. 批量处理
|
||
count = processor.batch_process_from_folder(bcl_result_folder, original_division)
|
||
print(f"共成功处理了 {count} 个项目节点。")
|
||
|
||
# 5. 导出更新后的数据
|
||
print(f"保存更新后的数据: {output_json_path}")
|
||
os.makedirs(os.path.dirname(output_json_path), exist_ok=True) # 确保输出目录存在
|
||
with open(output_json_path, "w", encoding="utf-8") as f:
|
||
f.write(processor.export_to_json(pretty=True))
|
||
|
||
print(f"✅ 数据已保存至 {output_json_path}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 处理失败: {str(e)}")
|
||
import traceback
|
||
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def batch_write_BCLresult_into_json(original_folder, bcl_result_folder, output_folder):
|
||
"""
|
||
批量处理文件夹中的所有JSON文件,将BCL计算结果写入到原始JSON文件中
|
||
|
||
:param original_folder: 原始JSON文件夹路径
|
||
:param bcl_result_folder: BCL计算结果文件夹路径
|
||
:param output_folder: 输出文件夹路径(合并后的JSON)
|
||
:return: 处理成功的文件数量
|
||
"""
|
||
# 确保输出文件夹存在
|
||
os.makedirs(output_folder, exist_ok=True)
|
||
|
||
# 查找所有JSON文件
|
||
json_files = []
|
||
for file in os.listdir(original_folder):
|
||
if file.lower().endswith(".json"):
|
||
json_files.append(os.path.join(original_folder, file))
|
||
|
||
if not json_files:
|
||
print(f"警告: 在目录 {original_folder} 中没有找到JSON文件")
|
||
return 0
|
||
|
||
# 处理每个JSON文件
|
||
success_count = 0
|
||
for original_file in json_files:
|
||
# 构建输出文件路径
|
||
rel_path = os.path.relpath(original_file, original_folder)
|
||
output_file = os.path.join(output_folder, rel_path)
|
||
|
||
# 获取文件名(不含扩展名),用于查找对应的BCL计算结果文件夹
|
||
base_filename = os.path.splitext(os.path.basename(original_file))[0]
|
||
|
||
# 构建BCL计算结果文件夹路径(假设与原始文件同名)
|
||
file_bcl_result_folder = os.path.join(bcl_result_folder, base_filename)
|
||
|
||
# 如果不存在同名文件夹,使用BCL计算结果文件夹本身
|
||
if not os.path.exists(file_bcl_result_folder):
|
||
file_bcl_result_folder = bcl_result_folder
|
||
|
||
print(f"\n处理文件: {original_file}")
|
||
print(f"BCL计算结果文件夹: {file_bcl_result_folder}")
|
||
print(f"输出文件: {output_file}")
|
||
|
||
# 处理文件
|
||
if write_BCLresult_into_json(original_file, file_bcl_result_folder, output_file):
|
||
success_count += 1
|
||
|
||
return success_count
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 使用硬编码的文件路径,不需要命令行参数
|
||
original_file = "project2json/outputs/GPRB1MJL/merged/架线检修国网.json" # 原始JSON文件
|
||
bcl_result_folder = "project2json/outputs/GPRB1MJL/bclresults/架线检修国网" # BCL计算结果文件夹
|
||
output_file = "project2json/outputs/GPRB1MJL/final/架线检修国网.json" # 输出文件(合并后的JSON)
|
||
|
||
print(f"原始JSON文件: {original_file}")
|
||
print(f"BCL计算结果文件夹: {bcl_result_folder}")
|
||
print(f"输出文件: {output_file}")
|
||
|
||
success = write_BCLresult_into_json(original_file, bcl_result_folder, output_file)
|
||
if success:
|
||
print(f"\n处理成功: 数据已保存至 {output_file}")
|
||
else:
|
||
print("\n处理失败")
|