Files
KG_generation/equipment_calculation/quantity_fee_calculator.py
T
chentianrui 0a4dedda1c 更新代码
2025-10-14 16:13:18 +08:00

1137 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import sys
from memory_profiler import profile
from typing import Dict, List, Any, Optional, Tuple, Set
from equipment_calculation.expressioncalculator import ExpressionCalculator
from equipment_calculation.bcl_utils import (
ZjQuantityBCLContext,
ZjProjectBCLContext,
ZjBillBCLContext,
init_bcl_calculator,
BCLVariant,
calculator,
create_project_contexts,
BCLDataSourceItem,
BCLDataSourceContext,
create_list_from_node,
create_node_from_type,
create_material_or_equipment_from_node,
)
from equipment_calculation.item_acquisition import (
get_cost_table_children,
get_quantity_nodes,
get_bill_cost_table,
find_cost_table,
get_bill_node_by_id,
load_project_data,
get_classified_resource_nodes,
)
# 缓存已计算过的费用
calculated_fees = {}
# # 添加一个全局变量和一个缓存字典来存储清单数量
# _BILL_QUANTITY = 1.0
# bill_quantity_cache = {}
# 主网架线处理地形系数
def process_DXdata(json_data):
"""
处理 projectData 中的线路特征段数据,计算每条 bpBillZhXsTable 记录的加权值,
并返回一个字典列表,每个字典包含 '特征段' 和对应的 ItemName: 计算结果。
:param json_data: 解析后的 JSON 数据(字典格式)
:return: 字典列表,例如 [{"特征段": "1", "工地运输...": "21.8"}, ...]
"""
result_list = []
# 遍历每个线路特征段
for segment in json_data.get("projectData", {}).get("线路特征段", []):
seg_name = segment.get("Name", "")
# 提取特征段中的数字,例如 "特征段1" -> "1"
seg_number = "".join(filter(str.isdigit, seg_name))
if not seg_number:
seg_number = "0" # 默认值
# 获取 bpBillZhXsTable 列表
table_list = segment.get("bpBillZhXsTable", [])
for item in table_list:
try:
# 提取字段并转换为浮点数
KnapScale = float(item.get("KnapScale", 0))
Knap = float(item.get("Knap", 0))
HillScale = float(item.get("HillScale", 0))
Hill = float(item.get("Hill", 0))
EdelweissScale = float(item.get("EdelweissScale", 0))
edelweiss = float(item.get("edelweiss", 0))
MountainScale = float(item.get("MountainScale", 0))
Mountain = float(item.get("Mountain", 0))
SloughScale = float(item.get("SloughScale", 0))
Slough = float(item.get("Slough", 0))
RiverScale = float(item.get("RiverScale", 0))
River = float(item.get("River", 0))
DesertScale = float(item.get("DesertScale", 0))
Desert = float(item.get("Desert", 0))
# 执行计算
total = (
(KnapScale * Knap)
+ (HillScale * Hill)
+ (EdelweissScale * edelweiss)
+ (MountainScale * Mountain)
+ (SloughScale * Slough)
+ (RiverScale * River)
+ (DesertScale * Desert)
) * 0.01
# 保留一位小数,格式化为字符串
# calculated_value = f"{total:.8f}"
# 获取 ItemName
item_name = item.get("ItemName", "未知项目")
# 构建结果字典
item_dict = {"特征段": seg_number, item_name: total}
# 添加到结果列表
result_list.append(item_dict)
except (ValueError, TypeError) as e:
print(f"数据转换错误,跳过该项: {e}")
continue
return result_list
# # 技改处理地形系数
# def JG_process_DXdata(json_data):
# 在create_list_from_node函数后添加一个包装函数
def create_list_from_node_with_bill_quantity(node, quantity=None):
"""创建清单对象并设置数量,不使用全局变量"""
from equipment_calculation.bcl_utils import create_list_from_node
bill_obj = create_list_from_node(node)
# 设置清单数量
if quantity is not None:
bill_quantity = quantity
elif "数量" in node:
try:
bill_quantity = float(node["数量"])
except (ValueError, TypeError):
bill_quantity = 1.0
else:
bill_quantity = 1.0
# 设置到对象属性
if hasattr(bill_obj, "quantity"):
bill_obj.quantity = bill_quantity
else:
# 如果对象没有quantity属性,尝试设置到其他可能的属性
try:
setattr(bill_obj, "quantity", bill_quantity)
except:
pass
print(f"设置清单对象数量: {bill_quantity}")
return bill_obj
# 递归查找取费表中包含"取费基数"的节点
def find_fee_base_nodes(node: Dict[str, Any], result: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""
递归查找取费表中包含"取费基数"的节点或有子节点的费用项
Args:
node: 取费表节点
result: 结果列表
Returns:
List[Dict[str, Any]]: 包含"取费基数"的节点或有子节点的费用项列表
"""
if result is None:
result = []
if isinstance(node, dict):
# 检查当前节点是否包含"取费基数"字段或有子节点
has_children = "children" in node and isinstance(node["children"], list) and node["children"]
has_code = "代码" in node
if "取费基数" in node or (has_children and has_code):
result.append(node)
# 递归检查子节点
for key, value in node.items():
if key == "children" and isinstance(value, list):
for child in value:
find_fee_base_nodes(child, result)
elif isinstance(value, (dict, list)):
find_fee_base_nodes(value, result)
elif isinstance(node, list):
for item in node:
find_fee_base_nodes(item, result)
return result
# 查找取费表中的费用项
def find_fee_item_by_code(cost_table: Dict[str, Any], code: str) -> Optional[Dict[str, Any]]:
"""
在取费表中查找指定代码的费用项
Args:
cost_table: 取费表
code: 费用代码
Returns:
Optional[Dict[str, Any]]: 找到的费用项,如果未找到则返回None
"""
def search_in_node(node):
if isinstance(node, dict):
if node.get("代码") == code:
return node
for key, value in node.items():
if isinstance(value, (dict, list)):
result = search_in_node(value)
if result:
return result
elif isinstance(node, list):
for item in node:
result = search_in_node(item)
if result:
return result
return None
return search_in_node(cost_table)
# 计算表外变量
def calculate_external_variable(var_name: str, context: ZjQuantityBCLContext, calculation_strategy=None) -> float:
"""
使用BCL计算器计算表外变量
Args:
var_name: 变量名
context: BCL上下文
calculation_strategy: 计算策略对象
Returns:
float: 计算结果
"""
try:
# 如果有计算策略,使用计算策略的方法
if calculation_strategy:
# print(f"使用计算策略 {calculation_strategy.__class__.__name__} 计算表外变量: {var_name}")
# 直接调用计算策略的方法,而不是递归调用自己
return calculation_strategy.calculate_external_variable(var_name, context)
# 否则使用默认实现
print(f"使用默认方法计算表外变量: {var_name}")
result = calculator.calculate(var_name, context)
# 检查结果类型,如果是 BCLVariant,尝试转换为浮点数
if hasattr(result, "__class__") and result.__class__.__name__ == "BCLVariant":
# 根据错误信息,BCLVariant 格式似乎是 Variant(BCLVariantType.FLOAT, 0.0)
# 尝试访问 value 属性或使用适当的方法获取值
try:
# 尝试不同的方法获取值
if hasattr(result, "value"):
return float(result.value)
elif hasattr(result, "get_value"):
return float(result.get_value())
elif hasattr(result, "__getitem__"):
# 如果 BCLVariant 支持索引访问,尝试获取第二个元素
return float(result[1])
else:
# 尝试直接转换为字符串,然后解析值
str_result = str(result)
# 假设格式是 Variant(BCLVariantType.FLOAT, 0.0)
# 尝试提取括号中的第二个值
import re
match = re.search(r"Variant\([^,]+,\s*([^)]+)\)", str_result)
if match:
return float(match.group(1))
# 如果上述方法都失败,尝试直接转换
return float(result)
except (ValueError, TypeError, AttributeError, IndexError) as e:
print(f"无法将 BCLVariant 类型的结果转换为浮点数: {result}, 错误: {e}")
# 打印更多调试信息
print(f"BCLVariant 类型: {type(result)}")
print(f"BCLVariant 属性: {dir(result)}")
return 0.0
# 如果不是 BCLVariant,尝试直接转换为浮点数
return float(result) if result is not None else 0.0
except Exception as e:
print(f"计算表外变量 '{var_name}' 时出错: {e}")
return 0.0
# 计算表内费用
def calculate_internal_fee(
fee_code: str,
cost_table: Dict[str, Any],
project_node: Dict[str, Any],
context: ZjQuantityBCLContext,
in_calculation: set = None,
calculation_strategy=None,
) -> float:
"""
计算表内费用
Args:
fee_code: 费用代码
cost_table: 取费表
project_node: 项目划分节点
context: BCL上下文
in_calculation: 正在计算中的费用代码集合,用于检测循环依赖
calculation_strategy: 计算策略对象
Returns:
float: 计算结果
"""
# 初始化正在计算的集合
if in_calculation is None:
in_calculation = set()
# 检查是否存在循环依赖
if fee_code in in_calculation:
print(f"检测到循环依赖: {fee_code}")
return 0.0
# 将当前费用代码添加到正在计算的集合中
in_calculation.add(fee_code)
# 检查缓存中是否已有计算结果
cache_key = f"{fee_code}_{project_node.get('id', '')}"
if cache_key in calculated_fees:
# 从正在计算的集合中移除当前费用代码
in_calculation.remove(fee_code)
return calculated_fees[cache_key]
# 查找费用项
fee_item = find_fee_item_by_code(cost_table, fee_code)
if not fee_item:
print(f"未找到代码为 '{fee_code}' 的费用项")
# 从正在计算的集合中移除当前费用代码
in_calculation.remove(fee_code)
return 0.0
# 获取取费基数和费率
fee_base = fee_item.get("取费基数")
fee_rate = fee_item.get("费率(%)") or fee_item.get("费率")
# 尝试转换费率为浮点数
try:
if fee_rate is None or (isinstance(fee_rate, str) and fee_rate.strip() == ""):
rate = 1.0
else:
rate = float(fee_rate) / 100.0
except (ValueError, TypeError):
print(f"无法将费率 '{fee_rate}' 转换为浮点数,使用默认值1.0")
rate = 1.0
# 如果有取费基数,直接计算
if fee_base:
base_value = calculate_fee_base(
fee_base, cost_table, project_node, context, in_calculation, calculation_strategy
)
# 确保 base_value 不为 None
base_value = base_value if base_value is not None else 0.0
result = base_value * rate
else:
# 如果没有取费基数,检查是否有子节点
children = fee_item.get("children", [])
if children:
# 计算所有子节点的费用之和
result = 0.0
for child in children:
child_code = child.get("代码", "")
if child_code:
child_result = calculate_internal_fee(
child_code, cost_table, project_node, context, in_calculation, calculation_strategy
)
# 确保 child_result 不为 None
child_result = child_result if child_result is not None else 0.0
result += child_result
else:
print(f"费用项 '{fee_code}' 没有取费基数且没有子节点")
result = 0.0
# 缓存计算结果
calculated_fees[cache_key] = result
# 从正在计算的集合中移除当前费用代码
in_calculation.remove(fee_code)
return result
# 计算取费基数
def calculate_fee_base(
fee_base: str,
cost_table: Dict[str, Any],
project_node: Dict[str, Any],
context: ZjQuantityBCLContext,
in_calculation: set = None,
calculation_strategy=None,
) -> float:
"""
计算取费基数
Args:
fee_base: 取费基数表达式
cost_table: 取费表
project_node: 项目划分节点
context: BCL上下文
in_calculation: 正在计算中的费用代码集合,用于检测循环依赖
calculation_strategy: 计算策略对象
Returns:
float: 计算结果
"""
# 初始化正在计算的集合
if in_calculation is None:
in_calculation = set()
# 检查是否是直接引用另一个费用项的代码
if fee_base in calculated_fees:
return calculated_fees[fee_base]
# 创建表达式计算器
expr_calculator = ExpressionCalculator()
parse_success = expr_calculator.parse_expression(fee_base)
if not parse_success:
print(f"测试表达式解析失败: {expr_calculator.get_last_error()}")
return None
# 获取表达式中的变量
variables = expr_calculator.variables
variable_values = {}
# 处理每个变量
for var in variables:
# 检查是否为表内费用代码
if var in calculated_fees:
# 使用缓存的计算结果
variable_values[var] = calculated_fees[var]
elif find_fee_item_by_code(cost_table, var):
# 计算表内费用
value = calculate_internal_fee(var, cost_table, project_node, context, in_calculation, calculation_strategy)
# 确保值不为 None
variable_values[var] = value if value is not None else 0.0
else:
# 处理表外变量
value = calculate_external_variable(var, context, calculation_strategy)
# 确保值不为 None
variable_values[var] = value if value is not None else 0.0
# 计算最终结果
try:
result, value = expr_calculator.evaluate(variable_values)
# 确保结果不为 None
return value if result else 0.0
except Exception as e:
print(f"计算表达式 '{fee_base}' 时出错: {e}")
return 0.0
# 计算项目节点的所有费用
def calculate_all_fees(
project_node: Dict[str, Any],
cost_table: Dict[str, Any],
json_file_path: str = None,
engineering_type: str = None,
calculation_strategy=None,
json_data: dict | None = None,
) -> Dict[str, float]:
"""
计算项目节点的所有费用,确保清单数量正确传递
"""
results = {}
# 1. 工程信息上下文(优先使用传入的 json_data)
project_context = create_project_contexts(json_file_path=json_file_path, json_data=json_data)
if json_data is not None:
data = json_data
else:
with open(json_file_path, "r", encoding="utf-8") as file:
data = json.load(file)
processed_data = process_DXdata(data)
DXITEM = [BCLDataSourceItem(item) for item in processed_data]
# 2. 构建数据源链式上下文
if engineering_type == "清单工程":
# 获取清单节点
bill_id = project_node.get("bill_id")
bill_name = project_node.get("bill_name")
bill_node_data = project_node.get("bill_node")
if not bill_node_data:
bill_node_data = {
"id": bill_id,
"GUID": bill_id,
"清单名称": bill_name,
"取费表名称": project_node.get("取费表名称"),
"清单全码": project_node.get("清单全码", ""),
"编码": project_node.get("清单编码", ""),
"单位": project_node.get("单位", ""),
}
# 使用包装函数创建清单对象
bill_obj = create_list_from_node_with_bill_quantity(bill_node_data)
# 创建工程量对象
project_obj = create_node_from_type(project_node)
dxitem_context = BCLDataSourceContext(DXITEM, project_context)
dxitem_context.variables["@特征段地形系数"] = BCLVariant(DXITEM)
# 递归数据源链式上下文
billItem = BCLDataSourceItem(bill_obj)
# 将清单数量直接设置到billItem对象上
if hasattr(bill_obj, "quantity"):
billItem.bill_quantity = bill_obj.quantity
print(f"设置清单数据源项数量: {billItem.bill_quantity}")
# 如果无法创建工程量对象,则跳过并返回当前已计算结果,避免未定义的 QuantityItem
if not project_obj:
print(
f"警告: 无法从项目节点创建工程量对象,跳过该节点: "
f"{project_node.get('项目名称', project_node.get('name', '未知节点'))}"
)
return results
QuantityItem = BCLDataSourceItem(project_obj, billItem)
childItems = []
if project_obj.children:
for moe in project_obj.children:
childItems.append(BCLDataSourceItem(moe, QuantityItem))
QuantityItem.set_childs(childItems)
bill_context = BCLDataSourceContext([billItem], dxitem_context)
# # 构建人材机数据源项(若存在)并接入上下文
# rcj_nodes_for_parent = project_node.get("_rcj_nodes", [])
# moe_items = []
# if rcj_nodes_for_parent:
# try:
# for rcj_node in rcj_nodes_for_parent:
# node_obj = create_material_or_equipment_from_node(rcj_node)
# moe_items.append(BCLDataSourceItem(node_obj, QuantityItem))
# except Exception as e:
# print(f"构建人材机数据源项出错: {e}")
# items_chain = [QuantityItem] + moe_items if moe_items else [QuantityItem]
context = BCLDataSourceContext([QuantityItem], bill_context)
# 如果计算策略存在,设置清单数量
if calculation_strategy and hasattr(calculation_strategy, "set_bill_quantity"):
if hasattr(bill_obj, "quantity"):
calculation_strategy.set_bill_quantity(bill_obj.quantity)
print(f"设置计算策略的清单数量: {bill_obj.quantity}")
else:
# 非清单工程 - 工程信息 -> 工程量节点
project_obj = create_node_from_type(project_node)
dxitem_context = BCLDataSourceContext(DXITEM, project_context)
dxitem_context.variables["@特征段地形系数"] = BCLVariant(DXITEM)
# 如果无法创建工程量对象,则跳过并返回当前已计算结果,避免未定义的 QuantityItem
if not project_obj:
print(
f"警告: 无法从项目节点创建工程量对象,跳过该节点: "
f"{project_node.get('项目名称', project_node.get('name', '未知节点'))}"
)
return results
QuantityItem = BCLDataSourceItem(project_obj)
childItems = []
if project_obj.children:
for moe in project_obj.children:
childItems.append(BCLDataSourceItem(moe, QuantityItem))
QuantityItem.set_childs(childItems)
# # 构建人材机数据源项(若存在)并接入上下文
# rcj_nodes_for_parent = project_node.get("_rcj_nodes", [])
# moe_items = []
# if rcj_nodes_for_parent:
# try:
# for rcj_node in rcj_nodes_for_parent:
# node_obj = create_material_or_equipment_from_node(rcj_node)
# moe_items.append(BCLDataSourceItem(node_obj, QuantityItem))
# except Exception as e:
# print(f"构建人材机数据源项出错: {e}")
# items_chain = [QuantityItem] + moe_items if moe_items else [QuantityItem]
context = BCLDataSourceContext([QuantityItem], dxitem_context)
# 查找包含取费基数的节点
fee_base_nodes = find_fee_base_nodes(cost_table)
# 计算每个费用项
for node in fee_base_nodes:
fee_name = node.get("费用名称", node.get("name", "未知费用"))
fee_code = node.get("代码", "")
fee_base = node.get("取费基数", "")
fee_rate = node.get("费率(%)") or node.get("费率")
# 尝试转换费率为浮点数
try:
if fee_rate is None or (isinstance(fee_rate, str) and fee_rate.strip() == ""):
rate = 1.0
else:
rate = float(fee_rate) / 100.0
except (ValueError, TypeError):
print(f"无法将费率 '{fee_rate}' 转换为浮点数,使用默认值1.0")
rate = 1.0
if fee_base:
# 计算取费基数
base_value = calculate_fee_base(fee_base, cost_table, project_node, context, None, calculation_strategy)
# 应用费率
result = base_value * rate
results[fee_name] = result
# 缓存计算结果
if fee_code:
calculated_fees[fee_code] = result
elif fee_code:
# 如果没有取费基数但有代码,尝试计算
result = calculate_internal_fee(fee_code, cost_table, project_node, context, None, calculation_strategy)
results[fee_name] = result
return results
# 计算工程量取费表
##原始代码
# def calculate_quantity_fees(
# json_file_path: str, project_name: str, adjustment_type: str = None, engineering_type: str = None
# ) -> str:
# """
# 计算工程量取费表
# Args:
# json_file_path: JSON文件路径
# project_name: 项目名称
# adjustment_type: 调差类型
# engineering_type: 工程类型
# Returns:
# str: 输出文件路径
# """
# # 设置输出目录和文件名
# output_dir = "计算结果"
# os.makedirs(output_dir, exist_ok=True)
# output_file = os.path.join(
# output_dir, f"{project_name}_{adjustment_type}_{engineering_type}_calculation_results.json"
# )
# # 初始化BCL计算器
# if not init_bcl_calculator():
# return None
# # 根据工程类型获取取费表和项目节点
# if engineering_type == "预算工程":
# # 预算工程 - 获取项目划分节点的取费表
# cost_table_children = get_cost_table_children(json_file_path, project_name)
# project_children = get_quantity_nodes(json_file_path, project_name, adjustment_type, engineering_type)
# if not cost_table_children or not project_children:
# print(f"未找到项目 '{project_name}' 的取费表或项目划分节点")
# return None
# elif engineering_type == "清单工程":
# # 清单工程 - 获取清单节点的取费表
# project_children = get_quantity_nodes(json_file_path, project_name, adjustment_type, engineering_type)
# if not project_children:
# print(f"未找到项目 '{project_name}' 的项目划分节点")
# return None
# # 为每个工程量节点找到对应的清单节点取费表
# cost_tables = {}
# for project_node in project_children:
# node_name = project_node.get("项目名称", project_node.get("name", "未知节点"))
# # 获取清单节点信息
# bill_id = project_node.get("bill_id")
# fee_table_name = project_node.get("取费表名称")
# if not bill_id:
# print(f"工程量节点 '{node_name}' 缺少清单节点ID")
# continue
# if not fee_table_name:
# print(f"工程量节点 '{node_name}' 没有取费表名称")
# continue
# # 使用清单节点的取费表名称直接查找取费表
# if bill_id not in cost_tables:
# print(f"查找清单节点 '{project_node.get('bill_name', '未命名清单')}' 的取费表: {fee_table_name}")
# # 查找取费表
# with open(json_file_path, "r", encoding="utf-8") as f:
# data = json.load(f)
# project_data = data.get("projectData", {})
# cost_setting = project_data.get("costSetting", {})
# # 查找取费表
# cost_table = find_cost_table(cost_setting, fee_table_name)
# if not cost_table:
# print(f"未找到取费表 '{fee_table_name}'")
# continue
# cost_table_children = cost_table.get("children", None)
# if not cost_table_children:
# print(f"取费表 '{fee_table_name}' 没有子节点")
# continue
# cost_tables[bill_id] = cost_table_children
# print(f"成功获取清单节点 '{project_node.get('bill_name', '未命名清单')}' 的取费表")
# else:
# print(f"不支持的工程类型: {engineering_type}")
# return None
# # 初始化缓存
# calculated_fees.clear()
# # 计算每个项目节点的费用
# all_results = {}
# for project_node in project_children:
# # 为每个节点清空缓存,确保计算独立
# calculated_fees.clear()
# node_name = project_node.get("项目名称", project_node.get("name", "未知节点"))
# # 根据工程类型获取对应的取费表
# if engineering_type == "预算工程":
# # 预算工程 - 使用项目划分节点的取费表
# node_results = calculate_all_fees(
# project_node, {"children": cost_table_children}, json_file_path, engineering_type
# )
# else: # 清单工程
# # 清单工程 - 使用清单节点的取费表
# parent_id = project_node.get("parent_id")
# if parent_id and parent_id in cost_tables:
# node_results = calculate_all_fees(
# project_node, {"children": cost_tables[parent_id]}, json_file_path, engineering_type
# )
# else:
# print(f"无法获取工程量节点 '{node_name}' 的取费表")
# continue
# all_results[node_name] = node_results
# # 输出计算结果
# print(f"费用计算结果:")
# for fee_name, fee_value in node_results.items():
# print(f" {fee_name}: {fee_value}")
# # 保存计算结果到JSON文件
# with open(output_file, "w", encoding="utf-8") as f:
# json.dump(all_results, f, ensure_ascii=False, indent=2)
# print(f"\n计算结果已保存到 {output_file}")
# return output_file
def calculate_quantity_fees(
json_file_path: str,
project_name: str,
engineering_type: str = None,
project_guid: str = None,
calculation_strategy=None,
software_type: str = None,
json_data: dict | None = None,
) -> str:
"""
计算工程量取费表,不使用全局变量,并按清单节点组织结果
"""
# 如果没有提供计算策略,使用默认策略
if calculation_strategy is None:
from equipment_calculation.calculation_strategy import DefaultCalculationStrategy
calculation_strategy = DefaultCalculationStrategy()
# 设置输出目录和文件名
# 检查calculation_strategy是否有get_output_dir方法
if hasattr(calculation_strategy, "get_output_dir"):
output_dir = calculation_strategy.get_output_dir()
else:
# 兼容旧代码,使用默认目录
output_dir = "计算结果"
os.makedirs(output_dir, exist_ok=True)
# 获取项目划分节点的GUID
_, _, _, _, target_node = load_project_data(json_file_path, project_name, project_guid)
if target_node:
node_guid = target_node.get("GUID") or target_node.get("guid", "")
# 如果传入了GUID但与节点的GUID不一致,优先使用传入的GUID
project_guid = project_guid if project_guid else node_guid
# 将文件名中的非法字符替换为下划线
safe_project_name = (
project_name.replace("/", "_")
.replace("\\", "_")
.replace(":", "_")
.replace("*", "_")
.replace("?", "_")
.replace('"', "_")
.replace("<", "_")
.replace(">", "_")
.replace("|", "_")
)
# 将GUID中的非法字符替换为下划线
safe_project_guid = ""
if project_guid:
safe_project_guid = (
project_guid.replace("/", "_")
.replace("\\", "_")
.replace(":", "_")
.replace("*", "_")
.replace("?", "_")
.replace('"', "_")
.replace("<", "_")
.replace(">", "_")
.replace("|", "_")
.replace("{", "")
.replace("}", "")
)
# 添加下划线作为分隔符
safe_project_guid = f"_{safe_project_guid}"
output_file = os.path.join(
output_dir,
f"{safe_project_name}{safe_project_guid}_{engineering_type}_calculation_results.json",
)
# 创建一个字典来存储清单数量
bill_quantities = {}
# 创建一个字典来存储清单信息
bill_info = {}
# 根据工程类型获取取费表和项目节点
if engineering_type == "预算工程":
# 预算工程 - 获取项目划分节点的取费表,传递project_guid参数
cost_table_children = get_cost_table_children(json_file_path, project_name, project_guid)
project_children = get_quantity_nodes(
json_file_path, project_name, engineering_type, project_guid, software_type
)
if not cost_table_children or not project_children:
print(f"未找到项目 '{project_name}' (GUID: {project_guid}) 的取费表或项目划分节点")
return None
elif engineering_type == "清单工程":
# 清单工程 - 获取清单节点的取费表,传递project_guid参数
project_children = get_quantity_nodes(
json_file_path, project_name, engineering_type, project_guid, software_type
)
if not project_children:
print(f"未找到项目 '{project_name}' (GUID: {project_guid}) 的项目划分节点")
return None
# 为每个工程量节点找到对应的清单节点取费表
cost_tables = {}
# 准备一次性 JSON 数据(清单工程需要从 projectData 中获取 costSetting 与 bill
if json_data is not None:
_data_all = json_data
else:
with open(json_file_path, "r", encoding="utf-8") as f:
_data_all = json.load(f)
_project_data_all = _data_all.get("projectData", {})
_cost_setting_all = _project_data_all.get("costSetting", {})
_bill_data_all = _project_data_all.get("bill", {})
for project_node in project_children:
node_name = project_node.get("项目名称", project_node.get("name", "未知节点"))
# 获取清单节点信息
bill_id = project_node.get("bill_id")
bill_name = project_node.get("bill_name", "未命名清单")
fee_table_name = project_node.get("取费表名称")
if not bill_id:
print(f"工程量节点 '{node_name}' 缺少清单节点ID")
continue
if not fee_table_name:
print(f"工程量节点 '{node_name}' 没有取费表名称")
continue
# 保存清单信息
if bill_id not in bill_info:
bill_info[bill_id] = {
"name": bill_name,
"fee_table_name": fee_table_name,
"guid": bill_id.replace("{", "").replace("}", "") if bill_id else "",
}
# 使用清单节点的取费表名称直接查找取费表
if bill_id not in cost_tables:
print(f"查找清单节点 '{bill_name}' 的取费表: {fee_table_name}")
# 查找取费表:使用已加载的数据
cost_table = find_cost_table(_cost_setting_all, fee_table_name)
if not cost_table:
print(f"未找到取费表 '{fee_table_name}'")
continue
cost_table_children = cost_table.get("children", None)
if not cost_table_children:
print(f"取费表 '{fee_table_name}' 没有子节点")
continue
cost_tables[bill_id] = cost_table_children
print(f"成功获取清单节点 '{bill_name}' 的取费表")
# 遍历项目中的清单节点,获取并保存清单数量(使用已加载的数据)
bill_data = _bill_data_all
# 递归函数获取所有清单节点
def get_bill_nodes(node):
if isinstance(node, dict):
# 检查是否是清单节点
is_bill = (
node.get("类型") == "8" or node.get("类型") == "清单" or "清单名称" in node or "清单编码" in node
)
if is_bill and ("id" in node or "GUID" in node):
# 强制使用GUID而不是ID
bill_id = node.get("GUID") or node.get("guid") or node.get("id")
quantity = 1.0 # 默认值
if "数量" in node:
try:
quantity = float(node["数量"])
except (TypeError, ValueError):
pass
# 保存到字典
bill_quantities[bill_id] = quantity
# 补充清单信息
if bill_id not in bill_info:
bill_info[bill_id] = {
"name": node.get("清单名称", "未命名清单"),
"fee_table_name": node.get("取费表名称", ""),
"guid": bill_id.replace("{", "").replace("}", "") if bill_id else "",
}
print(f"保存清单 {bill_id} 的数量: {quantity}")
# 递归处理子节点
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
get_bill_nodes(child)
# 获取和设置清单数量
get_bill_nodes(bill_data)
else:
print(f"不支持的工程类型: {engineering_type}")
return None
# 预取并分派人材机节点到各工程量节点上,供后续上下文使用
try:
labor_nodes, material_nodes, machine_nodes = get_classified_resource_nodes(
json_file_path, project_name, project_guid
)
# 合并三类节点,形成父ID到节点列表的映射
rcj_all = []
if labor_nodes:
rcj_all.extend([n for n, _ in labor_nodes])
if material_nodes:
rcj_all.extend([n for n, _ in material_nodes])
if machine_nodes:
rcj_all.extend([n for n, _ in machine_nodes])
rcj_by_parent: Dict[str, List[Dict[str, Any]]] = {}
# 注意:get_classified_resource_nodes 返回的是 (node, parent_id) 元组列表
def add_to_map(pairs):
if not pairs:
return
for node, parent_id in pairs:
if parent_id is None:
continue
key = str(parent_id)
rcj_by_parent.setdefault(key, []).append(node)
add_to_map(labor_nodes)
add_to_map(material_nodes)
add_to_map(machine_nodes)
# 将对应的人材机列表写入每个工程量节点
if project_children:
for node in project_children:
pid_candidates = [
node.get("GUID"),
node.get("guid"),
node.get("id"),
]
rcj_list = []
for pid in pid_candidates:
if pid and str(pid) in rcj_by_parent:
rcj_list = rcj_by_parent.get(str(pid), [])
break
if rcj_list:
node["_rcj_nodes"] = rcj_list
except Exception as e:
print(f"预取人材机节点失败: {e}")
# 在获取 project_children 后添加预处理代码
if project_children and calculation_strategy and hasattr(calculation_strategy, "preprocess_quantity_fee_node"):
print(f"对项目节点进行预处理...")
# 递归处理所有节点
def process_node(node):
calculation_strategy.preprocess_quantity_fee_node(node)
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
process_node(child)
# 处理每个项目节点
for node in project_children:
process_node(node)
# 初始化缓存
calculated_fees.clear() # 使用全局缓存,而不是计算策略中的缓存
# 创建一个新的结果结构,按清单节点组织
structured_results = {}
# 辅助函数:处理ID,移除花括号并保留完整ID
def process_id(id_str):
if not id_str:
return ""
# 移除花括号
return id_str.replace("{", "").replace("}", "")
# 计算每个项目节点的费用
for project_node in project_children:
# 为每个节点清空缓存,确保计算独立
calculated_fees.clear()
node_name = project_node.get("项目名称", project_node.get("name", "未知节点"))
# 强制使用GUID而不是ID
node_id = project_node.get("GUID") or project_node.get("guid") or project_node.get("id", "")
# 处理节点ID
processed_node_id = process_id(node_id)
# 创建唯一的节点键
node_key = node_name
if processed_node_id:
node_key = f"{node_name}_{processed_node_id}"
# 添加清单数量信息到项目节点
if engineering_type == "清单工程":
# 强制使用GUID而不是ID
bill_id = project_node.get("bill_guid") or project_node.get("bill_id")
if bill_id in bill_quantities:
project_node["bill_quantity"] = bill_quantities[bill_id]
print(f"为工程量节点 '{node_name}' 设置清单数量: {bill_quantities[bill_id]}")
# 如果计算策略支持,设置清单数量
if calculation_strategy and hasattr(calculation_strategy, "set_bill_quantity"):
calculation_strategy.set_bill_quantity(bill_quantities[bill_id])
# 根据工程类型获取对应的取费表
if engineering_type == "预算工程":
# 预算工程 - 使用项目划分节点的取费表
node_results = calculation_strategy.calculate_all_fees(
project_node,
{"children": cost_table_children},
json_file_path,
engineering_type,
json_data=json_data,
)
# 直接使用节点键作为结果键
structured_results[node_key] = node_results
else: # 清单工程
# 清单工程 - 使用清单节点的取费表
# 强制使用GUID而不是ID
bill_id = project_node.get("bill_guid") or project_node.get("bill_id")
if bill_id and bill_id in cost_tables:
node_results = calculation_strategy.calculate_all_fees(
project_node,
{"children": cost_tables[bill_id]},
json_file_path,
engineering_type,
json_data=json_data,
)
# 获取清单信息
bill_name = bill_info.get(bill_id, {}).get("name", "未命名清单")
# 处理清单ID - 强制使用GUID
processed_bill_id = process_id(bill_id)
# 创建清单的唯一键 - 强制使用GUID
bill_key = bill_name
if processed_bill_id:
# 获取清单的guid而不是id
bill_guid = bill_info.get(bill_id, {}).get("guid", processed_bill_id)
if not bill_guid:
bill_guid = processed_bill_id
bill_key = f"{bill_name}_{bill_guid}"
# 如果清单节点不存在于结果中,创建它
if bill_key not in structured_results:
structured_results[bill_key] = {}
# 将工程量节点的计算结果添加到对应的清单节点下
structured_results[bill_key][node_key] = node_results
else:
print(f"无法获取工程量节点 '{node_name}' 的取费表")
continue
# 输出计算结果
# print(f"费用计算结果:")
# for fee_name, fee_value in node_results.items():
# print(f" {fee_name}: {fee_value}")
# 保存计算结果到JSON文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(structured_results, f, ensure_ascii=False, indent=2)
print(f"\n计算结果已保存到 {output_file}")
# 如果有计算策略,应用后处理规则
if calculation_strategy:
calculation_strategy.post_process_quantity_fees(output_file, project_name)
return output_file