Files
KG_generation/equipment_calculation/计算项目划分下个工程量节点的人材机合价.py
T
chentianrui 9609bb67b4 上传文件
2025-08-01 15:31:56 +08:00

719 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import math
from typing import Dict, List, Any, Tuple
from copy import deepcopy
from equipment_calculation.item_acquisition import get_quantity_nodes, get_classified_resource_nodes, load_project_data
from bcl_utils import (
ZjMaterialOrEquipmentBCLContext,
ZjProjectBCLContext,
ZjBillBCLContext,
init_bcl_calculator,
calculator,
)
from item_acquisition import get_quantity_nodes, get_classified_resource_nodes
# 人材机节点定义合并条件常量
LABOR_MERGE_CONDITIONS = [
"编码",
"名称",
"单位",
"预算价含税",
"预算价不含税",
"市场价不含税",
"市场价含税",
"调差类型",
"专业属性",
"供货方",
"物料类材料",
]
MATERIAL_MERGE_CONDITIONS = [
"类型",
"编码",
"名称",
"单位",
"预算价不含税",
"预算价含税",
"市场价不含税",
"市场价含税",
"调差类型",
"专业属性",
"供货方",
"集中配送",
"卸车",
"保管",
"物料类材料",
]
MACHINE_MERGE_CONDITIONS = [
"编码",
"名称",
"单位",
"预算价不含税",
"预算价含税",
"市场价不含税",
"市场价含税",
"调差类型",
"专业属性",
"供货方",
"物料类材料",
]
from bcl_utils import (
ZjMaterialOrEquipmentBCLContext,
ZjProjectBCLContext,
ZjBillBCLContext,
init_bcl_calculator,
calculator,
)
def calc_rcj_count(
rcj_nodes: List[Tuple[Dict[str, Any], str]], project_children: List[Dict[str, Any]], json_file_path: str = None
) -> List[Dict[str, Any]]:
"""
计算人材机节点的数量,考虑父级消耗量
Args:
rcj_nodes: 人材机节点列表,每个元素是(节点, 父级ID)元组
project_children: 项目划分级别下的所有工程量节点
json_file_path: JSON文件路径,用于获取工程信息
Returns:
List[Dict[str, Any]]: 计算数量后的人材机节点列表
"""
result_nodes = []
# 检查rcj_nodes是否为空
if not rcj_nodes:
print("没有找到人材机节点")
return result_nodes
# 检查project_children是否为None
if project_children is None:
print("没有找到工程量节点,使用默认数量")
# 使用默认数量处理所有人材机节点
for node, parent_id in rcj_nodes:
node_copy = deepcopy(node)
node_copy["计算数量"] = node_copy.get("数量", "1.0")
# 保存父级ID
node_copy["parent_id"] = parent_id
# 由于没有找到父级节点,使用默认名称
node_copy["parent_name"] = "未知工程量节点"
result_nodes.append(node_copy)
return result_nodes
# 创建父级ID到工程量节点的映射
parent_nodes = {}
for node in project_children:
if "id" in node:
parent_nodes[node["id"]] = node
print(f"找到 {len(parent_nodes)} 个父级工程量节点")
# 初始化BCL计算器
if not init_bcl_calculator():
print("初始化BCL计算器失败,使用默认数量")
# 使用默认数量处理所有人材机节点
for node, parent_id in rcj_nodes:
node_copy = deepcopy(node)
node_copy["计算数量"] = node_copy.get("数量", "1.0")
# 保存父级ID
node_copy["parent_id"] = parent_id
# 由于没有找到父级节点,使用默认名称
node_copy["parent_name"] = "未知工程量节点"
result_nodes.append(node_copy)
return result_nodes
# 创建工程信息上下文(顶层上下文)
project_context = ZjProjectBCLContext(json_file_path=json_file_path)
# 遍历所有节点,计算数量
for node, parent_id in rcj_nodes:
node_copy = deepcopy(node)
# 保存父级ID
node_copy["parent_id"] = parent_id
# 获取父级工程量节点
parent_node = parent_nodes.get(parent_id)
if not parent_node:
print(f"未找到ID为 {parent_id} 的父级工程量节点,使用默认数量")
# 使用默认数量
node_copy["计算数量"] = node_copy.get("数量", "1.0")
# 由于没有找到父级节点,使用默认名称
node_copy["parent_name"] = "未知工程量节点"
result_nodes.append(node_copy)
continue
# 保存父级节点名称
parent_name = parent_node.get("项目名称", parent_node.get("name", "未命名工程量"))
node_copy["parent_name"] = parent_name
# 确保父级节点有数量字段
if "数量" not in parent_node or not parent_node["数量"]:
print(f"父级节点 {parent_id} 没有数量字段,使用默认数量")
parent_node["数量"] = "1.0"
# 打印父级节点信息
print(f"父级节点 {parent_id} ({parent_name}) 的数量: {parent_node.get('数量', '1.0')}")
# 创建定额上下文(中间层上下文)
ration_context = ZjBillBCLContext(prefix="定额", valueDict=parent_node, prevContext=project_context)
# 创建人材机上下文(底层上下文)
context = ZjMaterialOrEquipmentBCLContext(node_data=node, parent_node=parent_node, prevContext=ration_context)
# 根据节点类型选择不同的计算表达式
node_type = node.get("类型", "")
if node_type == "人工":
calc_expr = "_材机合并人工数量"
elif node_type == "材料":
calc_expr = "_材机合并材料数量"
elif node_type == "机械":
calc_expr = "_材机合并机械数量"
else:
# 未知类型,使用默认数量
print(f"未知节点类型 {node_type},使用默认数量")
node_copy["计算数量"] = node_copy.get("数量", "1.0")
result_nodes.append(node_copy)
continue
try:
# 使用BCL计算器计算数量
result = calculator.calculate(calc_expr, context)
# 打印调试信息
print(f"计算 {node.get('名称', '未知节点')} 的数量,表达式: {calc_expr}, 结果: {result}")
# 处理计算结果
if hasattr(result, "value"):
calculated_quantity = result.value
elif isinstance(result, (int, float)):
calculated_quantity = result
else:
# 尝试转换为浮点数
try:
calculated_quantity = float(result)
except (ValueError, TypeError):
print(f"无法将计算结果转换为浮点数: {result}")
calculated_quantity = float(node_copy.get("数量", "1.0") or "1.0")
# 如果计算结果为0,尝试使用原始数量
if calculated_quantity == 0:
orig_quantity = float(node_copy.get("数量", "0.0") or "0.0")
if orig_quantity > 0:
print(f"计算结果为0,使用原始数量: {orig_quantity}")
calculated_quantity = orig_quantity
# 设置计算数量
node_copy["计算数量"] = str(calculated_quantity)
except Exception as e:
print(f"计算节点 '{node.get('名称', '未知节点')}' 的数量时出错: {e}")
# 使用默认数量
node_copy["计算数量"] = node_copy.get("数量", "1.0")
result_nodes.append(node_copy)
return result_nodes
def generate_node_key(node: Dict[str, Any], conditions: List[str]) -> str:
"""
根据合并条件生成节点的唯一键,增加parent_id作为条件
Args:
node: 节点
conditions: 合并条件列表
Returns:
str: 节点的唯一键
"""
key_parts = []
# 添加parent_id作为合并条件的第一个元素
parent_id = node.get("parent_id", "")
key_parts.append(f"parent_id:{parent_id}")
# 添加其他合并条件
for condition in conditions:
value = node.get(condition, "")
# 确保值是字符串
if value is None:
value = ""
elif not isinstance(value, str):
value = str(value)
key_parts.append(f"{condition}:{value}")
return "|".join(key_parts)
def merge_similar_nodes(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
合并相似的节点
Args:
nodes: 节点列表
Returns:
List[Dict[str, Any]]: 合并后的节点列表
"""
if not nodes:
return []
# 按节点类型分组
nodes_by_type = {}
for node in nodes:
node_type = node.get("类型", "")
if node_type not in nodes_by_type:
nodes_by_type[node_type] = []
nodes_by_type[node_type].append(node)
merged_nodes = []
# 处理每种类型的节点
for node_type, type_nodes in nodes_by_type.items():
# 选择合并条件
if node_type == "人工":
conditions = LABOR_MERGE_CONDITIONS
elif node_type == "材料":
conditions = MATERIAL_MERGE_CONDITIONS
elif node_type == "机械":
conditions = MACHINE_MERGE_CONDITIONS
else:
# 未知类型,不合并
merged_nodes.extend(type_nodes)
continue
# 使用字典存储合并后的节点,键是根据合并条件生成的唯一键
merged_dict = {}
for node in type_nodes:
key = generate_node_key(node, conditions)
if key in merged_dict:
# 合并节点
merged_node = merged_dict[key]
# 合并数量
merged_quantity = float(merged_node.get("取整数量", "0.0") or "0.0")
node_quantity = float(node.get("取整数量", "0.0") or "0.0")
merged_node["取整数量"] = str(merged_quantity + node_quantity)
# 合并计算数量
merged_calc_quantity = float(merged_node.get("计算数量", "0.0") or "0.0")
node_calc_quantity = float(node.get("计算数量", "0.0") or "0.0")
merged_node["计算数量"] = str(merged_calc_quantity + node_calc_quantity)
# 更新原始数量
merged_orig_quantity = float(merged_node.get("数量", "0.0") or "0.0")
node_orig_quantity = float(node.get("数量", "0.0") or "0.0")
merged_node["数量"] = str(merged_orig_quantity + node_orig_quantity)
# 添加合并信息
if "合并来源" not in merged_node:
# 使用唯一标识符,如果没有id字段则使用节点本身的索引
merged_node_id = merged_node.get("id", f"node_{id(merged_node)}")
merged_node["合并来源"] = [merged_node_id]
# 添加当前节点ID到合并来源
node_id = node.get("id", f"node_{id(node)}")
merged_node["合并来源"].append(node_id)
# 更新合并数量计数
merged_node["合并数量"] = str(len(merged_node["合并来源"]))
else:
# 创建新节点
merged_dict[key] = deepcopy(node)
# 添加合并信息
node_id = node.get("id", f"node_{id(node)}")
merged_dict[key]["合并来源"] = [node_id]
merged_dict[key]["合并数量"] = "1"
# 将合并后的节点添加到结果列表
merged_nodes.extend(merged_dict.values())
return merged_nodes
def cat_rcj_count(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
处理人材机节点数量的变化并合并相似节点
Args:
rcj_nodes: 人材机节点列表
Returns:
List[Dict[str, Any]]: 处理数量变化后的人材机节点列表
"""
# 使用计算数量,不进行向上取整
processed_nodes = []
for node in rcj_nodes:
node_copy = deepcopy(node)
# 获取计算数量,直接使用,不再向上取整
calc_quantity = float(node_copy.get("计算数量", "0.0") or "0.0")
node_copy["取整数量"] = str(calc_quantity)
processed_nodes.append(node_copy)
# 合并相似节点
merged_nodes = merge_similar_nodes(processed_nodes)
return merged_nodes
def calc_rcj_fee(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
计算人材机节点的合价
Args:
rcj_nodes: 人材机节点列表
Returns:
List[Dict[str, Any]]: 计算合价后的人材机节点列表
"""
result_nodes = []
for node in rcj_nodes:
node_copy = deepcopy(node)
# 获取取整数量
quantity = float(node_copy.get("取整数量", "0.0") or "0.0")
# 获取预算价不含税和市场价不含税
budget_price = float(node_copy.get("预算价不含税", "0.0") or "0.0")
market_price = float(node_copy.get("市场价不含税", "0.0") or "0.0")
# 计算预算价合价和市场价合价
budget_total = quantity * budget_price
market_total = quantity * market_price
# 计算价差
price_diff = market_total - budget_total
# 设置合价信息
node_copy["预算价合价"] = str(budget_total)
node_copy["市场价合价"] = str(market_total)
node_copy["价差"] = str(price_diff)
result_nodes.append(node_copy)
return result_nodes
def format_rcj_output(rcj_nodes: List[Dict[str, Any]], node_type: str) -> List[Dict[str, Any]]:
"""
格式化人材机节点输出,只保留需要的字段
Args:
rcj_nodes: 人材机节点列表
node_type: 节点类型("人工"、"材料"、"机械"
Returns:
List[Dict[str, Any]]: 格式化后的节点列表
"""
formatted_nodes = []
for node in rcj_nodes:
formatted_node = {}
# 保留父级工程量节点信息
formatted_node["parent_id"] = node.get("parent_id", "")
formatted_node["parent_name"] = node.get("parent_name", "未知工程量节点")
# 根据节点类型保留特定字段
if node_type == "人工":
# 人工节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差
fields = [
"编码",
"名称",
"单位",
"取整数量",
"预算价不含税",
"市场价不含税",
"预算价合价",
"市场价合价",
"价差",
]
for field in fields:
formatted_node[field] = node.get(field, "")
elif node_type == "材料":
# 材料节点保存:供货方,编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差
fields = [
"供货方",
"编码",
"名称",
"单位",
"取整数量",
"预算价不含税",
"市场价不含税",
"预算价合价",
"市场价合价",
"价差",
]
for field in fields:
formatted_node[field] = node.get(field, "")
elif node_type == "机械":
# 机械节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差
fields = [
"编码",
"名称",
"单位",
"取整数量",
"预算价不含税",
"市场价不含税",
"预算价合价",
"市场价合价",
"价差",
]
for field in fields:
formatted_node[field] = node.get(field, "")
# 重命名取整数量为数量
if "取整数量" in formatted_node:
formatted_node["数量"] = formatted_node.pop("取整数量")
formatted_nodes.append(formatted_node)
return formatted_nodes
def calculate_rcj_fees(
json_file_path: str,
project_name: str,
adjustment_type: str = "拆除",
engineering_type: str = "预算工程",
project_guid: str = None,
) -> Dict[str, Any]:
"""
计算项目划分节点下所有人材机节点的合价
Args:
json_file_path: JSON文件路径
project_name: 项目名称
adjustment_type: 调差类型,默认为"拆除"
engineering_type: 工程类型,默认为"预算工程"
project_guid: 项目GUID,用于区分同名项目
Returns:
Dict[str, Any]: 计算结果,按工程量节点分组
"""
# 获取工程量节点,传递project_guid参数
project_children = get_quantity_nodes(json_file_path, project_name, adjustment_type, engineering_type, project_guid)
# 如果没有找到项目划分节点
if project_children is None:
print(f"警告: 未找到项目 '{project_name}' 的项目划分节点")
return {}
# 如果找到项目划分节点但没有工程量节点
if len(project_children) == 0:
print(f"警告: 项目 '{project_name}' (GUID: {project_guid}) 没有工程量节点,跳过计算")
return {}
# 获取分类后的人材机节点,传递project_guid参数
labor_nodes, material_nodes, machine_nodes = get_classified_resource_nodes(
json_file_path, project_name, adjustment_type, project_guid
)
# 打印调试信息
print(f"找到 {len(labor_nodes)} 个人工节点, {len(material_nodes)} 个材料节点, {len(machine_nodes)} 个机械节点")
# 计算人工节点的数量和合价
labor_with_count = calc_rcj_count(labor_nodes, project_children, json_file_path)
labor_with_cat = cat_rcj_count(labor_with_count)
labor_with_fee = calc_rcj_fee(labor_with_cat)
labor_formatted = format_rcj_output(labor_with_fee, "人工")
# 计算材料节点的数量和合价
material_with_count = calc_rcj_count(material_nodes, project_children, json_file_path)
material_with_cat = cat_rcj_count(material_with_count)
material_with_fee = calc_rcj_fee(material_with_cat)
material_formatted = format_rcj_output(material_with_fee, "材料")
# 计算机械节点的数量和合价
machine_with_count = calc_rcj_count(machine_nodes, project_children, json_file_path)
machine_with_cat = cat_rcj_count(machine_with_count)
machine_with_fee = calc_rcj_fee(machine_with_cat)
machine_formatted = format_rcj_output(machine_with_fee, "机械")
# 创建工程量节点ID到名称的映射
node_id_to_name = {}
for node in project_children:
if "id" in node:
node_name = node.get("项目名称", node.get("name", f"工程量节点_{node['id']}"))
node_id_to_name[node["id"]] = node_name
# 按工程量节点分组结果
result_by_parent = {}
# 处理人工节点
for node in labor_formatted:
parent_id = node.pop("parent_id", "")
parent_name = node.pop("parent_name", "未知工程量节点")
# 使用父级节点名称作为键
parent_key = node_id_to_name.get(parent_id, parent_name)
if parent_key not in result_by_parent:
result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []}
result_by_parent[parent_key]["人工节点"].append(node)
# 处理材料节点
for node in material_formatted:
parent_id = node.pop("parent_id", "")
parent_name = node.pop("parent_name", "未知工程量节点")
# 使用父级节点名称作为键
parent_key = node_id_to_name.get(parent_id, parent_name)
if parent_key not in result_by_parent:
result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []}
result_by_parent[parent_key]["材料节点"].append(node)
# 处理机械节点
for node in machine_formatted:
parent_id = node.pop("parent_id", "")
parent_name = node.pop("parent_name", "未知工程量节点")
# 使用父级节点名称作为键
parent_key = node_id_to_name.get(parent_id, parent_name)
if parent_key not in result_by_parent:
result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []}
result_by_parent[parent_key]["机械节点"].append(node)
# 计算合价只用于调试显示
for parent_key, parent_data in result_by_parent.items():
labor_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["人工节点"])
material_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["材料节点"])
machine_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["机械节点"])
print(f"工程量节点 '{parent_key}': 人工合价={labor_fee}, 材料合价={material_fee}, 机械合价={machine_fee}")
return result_by_parent
def calculate_resource_fees(
json_file_path: str,
project_name: str,
adjustment_type: str,
engineering_type: str,
project_guid: str = None,
calculation_strategy=None,
) -> str:
"""
计算人材机合价
Args:
json_file_path: JSON文件路径
project_name: 项目名称
adjustment_type: 调差类型
engineering_type: 工程类型
project_guid: 项目GUID,用于区分同名项目
calculation_strategy: 计算策略,如果为None则使用默认策略
Returns:
str: 输出文件路径,如果没有工程量节点则返回None
"""
# 如果没有提供计算策略,使用默认策略
if calculation_strategy is None:
from calculation_strategy import DefaultCalculationStrategy
calculation_strategy = DefaultCalculationStrategy()
# 获取项目划分节点的GUID
_, _, _, _, target_node = load_project_data(json_file_path, project_name, project_guid)
# 如果传入了GUID但与节点的GUID不一致,优先使用传入的GUID
node_guid = target_node.get("GUID") or target_node.get("guid", "") if target_node else ""
project_guid = project_guid if project_guid else node_guid
# 将文件名中的非法字符替换为下划线
safe_project_name = (
project_name.replace("/", "_")
.replace("\\", "_")
.replace(":", "_")
.replace("*", "_")
.replace("?", "_")
.replace('"', "_")
.replace("<", "_")
.replace(">", "_")
.replace("|", "_")
)
# 将GUID中的非法字符替换为下划线
safe_project_guid = ""
if project_guid:
safe_project_guid = (
project_guid.replace("/", "_")
.replace("\\", "_")
.replace(":", "_")
.replace("*", "_")
.replace("?", "_")
.replace('"', "_")
.replace("<", "_")
.replace(">", "_")
.replace("|", "_")
.replace("{", "")
.replace("}", "")
)
# 添加下划线作为分隔符
safe_project_guid = f"_{safe_project_guid}"
# 设置输出目录和文件名
output_dir = "计算结果"
os.makedirs(output_dir, exist_ok=True)
rcj_output_file = os.path.join(
output_dir, f"{safe_project_name}{safe_project_guid}_{adjustment_type}_rcj_fees.json"
)
# 计算人材机节点的合价,传递project_guid参数
# 这里使用 calculation_strategy 的 calculate_rcj_fees 方法
if hasattr(calculation_strategy, "calculate_rcj_fees"):
rcj_results = calculation_strategy.calculate_rcj_fees(
json_file_path, project_name, adjustment_type, engineering_type, project_guid
)
else:
# 如果计算策略没有实现 calculate_rcj_fees 方法,使用原始函数
rcj_results = calculate_rcj_fees(json_file_path, project_name, adjustment_type, engineering_type, project_guid)
# 检查是否有人材机数据 - 适应新的按工程量节点分组的结构
has_data = False
# 如果是旧结构(直接包含人工节点、材料节点、机械节点)
if any(key in rcj_results for key in ["人工节点", "材料节点", "机械节点"]):
for node_type in ["人工节点", "材料节点", "机械节点"]:
if rcj_results.get(node_type) and len(rcj_results[node_type]) > 0:
has_data = True
break
# 如果是新结构(按工程量节点分组)
else:
for parent_name, parent_data in rcj_results.items():
for node_type in ["人工节点", "材料节点", "机械节点"]:
if parent_data.get(node_type) and len(parent_data[node_type]) > 0:
has_data = True
break
if has_data:
break
if not has_data:
print(f"项目 '{project_name}' (GUID: {project_guid}) 没有人材机数据,不保存结果")
return None
# 保存人材机合价计算结果到JSON文件
with open(rcj_output_file, "w", encoding="utf-8") as f:
json.dump(rcj_results, f, ensure_ascii=False, indent=2)
print(f"\n人材机合价计算结果已保存到 {rcj_output_file}")
return rcj_output_file