上传文件至 equipment_calculation

This commit is contained in:
2025-08-02 14:55:07 +08:00
parent e0fc238765
commit c50ebec75f
2 changed files with 1018 additions and 0 deletions
+315
View File
@@ -0,0 +1,315 @@
"""
第二步:计算bcl结果
"""
import os
import argparse
import re
import json
from typing import Dict, List, Any, Optional, Tuple
from equipment_calculation.software_types import (
get_software_type,
SoftwareCategory,
EngineeringType,
ALL_SOFTWARE_TYPES,
)
from equipment_calculation.software_calculators import get_calculator
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="工程量取费和人材机合价计算程序")
# 软件类型参数
parser.add_argument(
"--category", choices=["主网", "配网", "技改"], default="主网", help="软件类别(主网/配网/技改)"
)
parser.add_argument("--engineering-type", choices=["预算", "清单"], default="清单", help="工程类型(预算/清单)")
# 计算类型参数
parser.add_argument(
"--calculate",
choices=["all", "quantity", "resource"],
default="quantity",
help="计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)",
)
# 项目参数
parser.add_argument("--project", help="项目名称,如果不指定则处理所有项目")
parser.add_argument("--adjustment-type", default="调差", help="调差类型,默认为'调差'")
# 输入文件参数
parser.add_argument(
"--input-file",
default="测试案例/主网清单变电.json",
help="输入JSON文件路径,如果不指定则使用默认路径",
)
# 添加输入和输出文件夹参数
parser.add_argument("--input-folder", help="输入文件夹路径,包含要处理的JSON文件")
parser.add_argument("--output-folder", default="计算结果", help="输出文件夹路径,用于保存计算结果")
return parser.parse_args()
def parse_filename(filename: str) -> Tuple[str, str]:
"""
从文件名中解析软件类别和工程类型(备用方法)
:param filename: JSON文件名,例如"主网预算变电.json"
:return: (category, engineering_type) 元组,例如 ("主网", "预算")
"""
# 移除扩展名
basename = os.path.splitext(filename)[0]
# 查找类别(主网/配网/技改)
category = None
for cat in ["主网", "配网", "技改"]:
if cat in basename:
category = cat
break
# 查找工程类型(预算/清单)
engineering_type = None
for eng_type in ["预算", "清单"]:
if eng_type in basename:
engineering_type = eng_type
break
# 如果未找到,使用默认值
if not category:
print(f"警告: 无法从文件名 '{filename}' 中解析软件类别,使用默认值 '主网'")
category = "主网"
if not engineering_type:
print(f"警告: 无法从文件名 '{filename}' 中解析工程类型,使用默认值 '清单'")
engineering_type = "清单"
return category, engineering_type
def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""
从JSON文件内容中解析软件类别和工程类型
:param json_file_path: JSON文件路径
:return: (category, engineering_type) 元组,如果解析失败则返回 (None, None)
"""
try:
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 定义阶段类型映射表
budget_types = ["概预算", "定额", "估算", "概算"]
list_types = ["清单", "结算", "招标控制价", "招投标工程"]
# 从division字段获取软件名称和阶段类型
if "division" in data:
division = data["division"]
print(f"找到division字段: {division}")
# 使用-分割division字段
parts = division.split("-")
if len(parts) >= 2:
category = parts[0].strip()
stage_type = parts[1].strip()
# 验证软件类别
if category not in ["主网", "配网", "技改"]:
print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'")
category = "主网"
# 映射阶段类型
if any(budget_type in stage_type for budget_type in budget_types):
engineering_type = "预算"
elif any(list_type in stage_type for list_type in list_types):
engineering_type = "清单"
else:
print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'")
engineering_type = "清单"
print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})")
return category, engineering_type
else:
print(f"警告: division字段 '{division}' 格式不正确,无法分割")
else:
print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析")
# 作为备选,尝试从basicData中获取
if "basicData" in data:
basic_data = data["basicData"]
category = basic_data.get("软件名称")
engineering_type = basic_data.get("阶段类型")
# 验证解析结果
if category and engineering_type:
# 确保category是有效值
if category not in ["主网", "配网", "技改"]:
print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'")
category = "主网"
# 确保engineering_type是有效值
if engineering_type not in ["预算", "清单"]:
print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'")
engineering_type = "清单"
print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}")
return category, engineering_type
else:
print(f"警告: basicData中未找到软件名称或阶段类型")
else:
print(f"警告: JSON文件中未找到basicData部分")
return None, None
except Exception as e:
print(f"解析JSON文件内容时出错: {str(e)}")
return None, None
def process_json_file(
input_file: str,
base_output_dir: str = "计算结果",
category: Optional[str] = None,
engineering_type: Optional[str] = None,
project: Optional[str] = None,
adjustment_type: str = "调差",
) -> bool:
"""
处理单个JSON文件
:param input_file: 输入JSON文件路径
:param base_output_dir: 基础输出目录路径,实际结果会保存在此目录下的子文件夹中
:param category: 软件类别,如果为None则从JSON内容中解析
:param engineering_type: 工程类型,如果为None则从JSON内容中解析
:param project: 项目名称,如果为None则处理所有项目
:param adjustment_type: 调差类型
:return: 处理是否成功
"""
try:
# 获取文件名(不含扩展名)作为输出子文件夹的名称
filename = os.path.basename(input_file)
file_basename = os.path.splitext(filename)[0]
output_dir = os.path.join(base_output_dir, file_basename)
# 创建输出子文件夹
os.makedirs(output_dir, exist_ok=True)
print(f"创建输出目录: {output_dir}")
# 如果未指定category或engineering_type,从JSON内容中解析
if category is None or engineering_type is None:
# 首先尝试从JSON内容中解析
json_category, json_engineering_type = parse_json_content(input_file)
if category is None:
if json_category:
category = json_category
else:
# 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用)
print("从JSON内容解析软件名称失败,尝试从文件名解析...")
parsed_category, _ = parse_filename(filename)
category = parsed_category
if engineering_type is None:
if json_engineering_type:
engineering_type = json_engineering_type
else:
# 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用)
print("从JSON内容解析阶段类型失败,尝试从文件名解析...")
_, parsed_engineering_type = parse_filename(filename)
engineering_type = parsed_engineering_type
print(f"处理文件: {input_file}")
print(f" 软件类别: {category}")
print(f" 工程类型: {engineering_type}")
# 获取软件类型
software_type = get_software_type(category, engineering_type)
print(f" 使用软件类型: {software_type.name}")
# 获取计算器
calculator = get_calculator(software_type)
if not calculator:
print(f" 错误: 未找到软件类型 {software_type.name} 的计算器")
return False
# 设置输出目录
calculator.set_output_dir(output_dir)
# 执行计算
print(" 开始计算工程量取费表...")
calculator.calculate_quantity_fee_tables(json_file_path=input_file, project_name=project)
print(" 开始计算人材机合价...")
calculator.calculate_resource_fee_tables(json_file_path=input_file, project_name=project)
print(f" 处理完成: {input_file}")
print(f" 结果保存在: {output_dir}")
return True
except Exception as e:
print(f" 处理文件 {input_file} 时出错: {str(e)}")
return False
def process_BCL_calculate(input_folder: str, output_folder: str) -> List[Tuple[str, bool]]:
"""
处理指定文件夹中的所有JSON文件
:param input_folder: 输入文件夹路径,包含要处理的JSON文件
:param output_folder: 输出文件夹路径,用于保存计算结果
:return: 处理结果列表,格式为 [(文件路径, 是否成功), ...]
"""
# 确保基础输出目录存在
os.makedirs(output_folder, exist_ok=True)
# 查找所有JSON文件
json_files = []
for file in os.listdir(input_folder):
if file.lower().endswith(".json"):
json_files.append(os.path.join(input_folder, file))
if not json_files:
print(f"警告: 在目录 {input_folder} 中没有找到JSON文件")
return []
# 处理每个JSON文件
results = []
for input_file in json_files:
success = process_json_file(
input_file=input_file,
base_output_dir=output_folder, # 传递基础输出目录
category=None, # 从JSON内容中解析
engineering_type=None, # 从JSON内容中解析
project=None, # 处理所有项目
adjustment_type="调差",
)
results.append((input_file, success))
return results
def main():
"""程序入口"""
input_folder = "project2json/outputs/json"
output_folder = "project2json/outputs/bcl_results"
results = process_BCL_calculate(input_folder, output_folder)
# 显示处理结果
success_count = sum(1 for _, success in results if success)
print(f"\n处理完成: 成功 {success_count}/{len(results)} 个文件")
if success_count < len(results):
print("处理失败的文件:")
for file_path, success in results:
if not success:
print(f" - {os.path.basename(file_path)}")
if __name__ == "__main__":
main()
@@ -0,0 +1,703 @@
from equipment_calculation.calculator_base import CalculatorBase
from equipment_calculation.software_types import (
SoftwareType,
MAIN_GRID_BUDGET,
MAIN_GRID_BILL,
DISTRIBUTION_BUDGET,
DISTRIBUTION_BILL,
TECHNICAL_RENOVATION_BUDGET,
TECHNICAL_RENOVATION_BILL,
)
from equipment_calculation.calculation_strategy import CalculationStrategy, DefaultCalculationStrategy
import json
import os
from typing import Any, Dict, Optional, Set
import sys
import re
# 添加一个全局变量和一个缓存字典来存储清单数量
_BILL_QUANTITY = 1.0
bill_quantity_cache = {}
# 为每种软件类型定义特定的计算策略
class MainGridBudgetCalculationStrategy(DefaultCalculationStrategy):
"""主网预算计算策略"""
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 定额处理
if node_type == "0" or node_type == "定额":
supplier = str(node.get("费用类型", ""))
if supplier == "1":
node["费用类型"] = "不取费"
elif supplier == "0":
node["费用类型"] = "取费"
# 处理特征段
seg_val = node.get("特征段", "")
if isinstance(seg_val, str):
num = re.search(r"\d+", seg_val)
node["特征段"] = num.group() if num else ""
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
class MainGridBillCalculationStrategy(DefaultCalculationStrategy):
"""主网清单计算策略"""
def __init__(self):
super().__init__()
self.bill_quantity = 1.0 # 默认值
def set_bill_quantity(self, quantity):
"""设置清单数量"""
try:
self.bill_quantity = float(quantity)
print(f"设置计算策略的清单数量: {self.bill_quantity}")
except (TypeError, ValueError):
print(f"无法将 {quantity} 转换为浮点数")
def calculate_external_variable(self, var_name: str, context: Any) -> float:
"""计算表外变量,并乘以父级清单的数量"""
# 使用默认实现计算表外变量
from equipment_calculation.bcl_utils import calculator
result = calculator.calculate(var_name, context)
# 获取清单数量 - 首先从全局变量获取
global _BILL_QUANTITY # 在模块级别定义一个全局变量
parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0
print(f"\n===== 获取清单数量 =====")
print(f"从全局变量获取清单数量: {parent_quantity}")
# 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取
if parent_quantity == 1.0 and hasattr(context, "__dict__"):
module = sys.modules.get("quantity_fee_calculator")
if module and hasattr(module, "bill_quantity_cache"):
bill_cache = getattr(module, "bill_quantity_cache")
bill_id = None
# 尝试从上下文中获取bill_id
if hasattr(context, "bill_id"):
bill_id = context.bill_id
elif hasattr(context, "datasource") and context.datasource:
for item in context.datasource:
if hasattr(item, "object") and hasattr(item.object, "bill_id"):
bill_id = item.object.bill_id
break
# 如果找到bill_id并且在缓存中,使用缓存的数量
if bill_id and bill_id in bill_cache:
parent_quantity = bill_cache[bill_id]
print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}")
# 设置一个临时环境变量作为最后的手段
if parent_quantity == 1.0:
import os
env_quantity = os.environ.get("BILL_QUANTITY")
if env_quantity:
try:
parent_quantity = float(env_quantity)
print(f"从环境变量获取清单数量: {parent_quantity}")
except:
pass
print(f"===== 获取清单数量结束 =====\n")
# 将结果乘以父级清单的数量
final_result = float(result) * parent_quantity if result is not None else 0.0
print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}")
return final_result
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料", "损耗", "预算价含税"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""将取费表中的'综合单价'替换为'合价'"""
try:
# 读取输出文件
with open(output_file, "r", encoding="utf-8") as f:
data = json.load(f)
# 递归查找并替换"综合单价"为"合价"
def replace_key(obj):
if isinstance(obj, dict):
new_obj = {}
for key, value in obj.items():
# 替换键名
new_key = "合价" if key == "综合单价" else key
# 递归处理值
new_obj[new_key] = replace_key(value)
return new_obj
elif isinstance(obj, list):
return [replace_key(item) for item in obj]
else:
return obj
# 替换所有出现的"综合单价"
new_data = replace_key(data)
# 写回文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(new_data, f, ensure_ascii=False, indent=2)
print(f"已将取费表中的'综合单价'替换为'合价'")
except Exception as e:
print(f"修改取费表时出错: {e}")
class DistributionBudgetCalculationStrategy(DefaultCalculationStrategy):
"""配网预算计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加配网预算特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "材料费":
# print(f"应用配网预算特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class DistributionBillCalculationStrategy(DefaultCalculationStrategy):
"""配网清单计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加配网清单特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "机械费":
# print(f"应用配网清单特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class TechnicalRenovationBudgetCalculationStrategy(DefaultCalculationStrategy):
"""技改预算计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加技改预算特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "人工费+材料费":
# print(f"应用技改预算特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class TechnicalRenovationBillCalculationStrategy(DefaultCalculationStrategy):
"""技改清单计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加技改清单特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "人工费+材料费+机械费":
# print(f"应用技改清单特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
def calculate_external_variable(self, var_name: str, context: Any) -> float:
"""计算表外变量,并乘以父级清单的数量"""
# 使用默认实现计算表外变量
from equipment_calculation.bcl_utils import calculator
result = calculator.calculate(var_name, context)
# 获取清单数量 - 首先从全局变量获取
global _BILL_QUANTITY # 在模块级别定义一个全局变量
parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0
print(f"\n===== 获取清单数量 =====")
print(f"从全局变量获取清单数量: {parent_quantity}")
# 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取
if parent_quantity == 1.0 and hasattr(context, "__dict__"):
module = sys.modules.get("quantity_fee_calculator")
if module and hasattr(module, "bill_quantity_cache"):
bill_cache = getattr(module, "bill_quantity_cache")
bill_id = None
# 尝试从上下文中获取bill_id
if hasattr(context, "bill_id"):
bill_id = context.bill_id
elif hasattr(context, "datasource") and context.datasource:
for item in context.datasource:
if hasattr(item, "object") and hasattr(item.object, "bill_id"):
bill_id = item.object.bill_id
break
# 如果找到bill_id并且在缓存中,使用缓存的数量
if bill_id and bill_id in bill_cache:
parent_quantity = bill_cache[bill_id]
print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}")
# 设置一个临时环境变量作为最后的手段
if parent_quantity == 1.0:
import os
env_quantity = os.environ.get("BILL_QUANTITY")
if env_quantity:
try:
parent_quantity = float(env_quantity)
print(f"从环境变量获取清单数量: {parent_quantity}")
except:
pass
print(f"===== 获取清单数量结束 =====\n")
# 将结果乘以父级清单的数量
final_result = float(result) * parent_quantity if result is not None else 0.0
print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}")
return final_result
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料", "损耗", "预算价含税"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""将取费表中的'综合单价'替换为'合价'"""
try:
# 读取输出文件
with open(output_file, "r", encoding="utf-8") as f:
data = json.load(f)
# 递归查找并替换"综合单价"为"合价"
def replace_key(obj):
if isinstance(obj, dict):
new_obj = {}
for key, value in obj.items():
# 替换键名
new_key = "合价" if key == "综合单价" else key
# 递归处理值
new_obj[new_key] = replace_key(value)
return new_obj
elif isinstance(obj, list):
return [replace_key(item) for item in obj]
else:
return obj
# 替换所有出现的"综合单价"
new_data = replace_key(data)
# 写回文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(new_data, f, ensure_ascii=False, indent=2)
print(f"已将取费表中的'综合单价'替换为'合价'")
except Exception as e:
print(f"修改取费表时出错: {e}")
class MainGridBudgetCalculator(CalculatorBase):
"""主网预算计算器"""
def __init__(self):
super().__init__(MAIN_GRID_BUDGET)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建主网预算计算策略"""
return MainGridBudgetCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用主网预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理")
# 创建预处理函数
def preprocess_nodes_recursive(json_file_path):
try:
# 读取 JSON 文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取工程量数据
project_data = data.get("projectData", {})
quantity_data = project_data.get("quantity", {})
# 递归处理所有节点
def process_node(node):
# 调用计算策略的预处理方法
if isinstance(node, dict):
self.calculation_strategy.preprocess_quantity_fee_node(node)
# 处理子节点
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
process_node(child)
# 处理所有工程量节点
process_node(quantity_data)
# 写回 JSON 文件
with open(json_file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"已完成对 {json_file_path} 中所有节点的预处理")
except Exception as e:
print(f"预处理节点时出错: {e}")
# 不在这里调用预处理函数,因为我们还不知道 JSON 文件路径
# 这个函数会在 calculate_quantity_fee_tables 中调用
# preprocess_nodes_recursive(json_file_path)
# 只需实现一个空函数,返回预处理函数以便后续调用
return preprocess_nodes_recursive
def apply_resource_fee_rules(self) -> None:
"""应用主网预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用主网预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用主网预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class MainGridBillCalculator(CalculatorBase):
"""主网清单计算器"""
def __init__(self):
super().__init__(MAIN_GRID_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建主网清单计算策略"""
return MainGridBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用主网清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理")
# 创建预处理函数,这个函数会处理JSON文件中的节点
def preprocess_nodes_recursive(json_file_path):
try:
# 读取 JSON 文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取工程量数据
project_data = data.get("projectData", {})
quantity_data = project_data.get("quantity", {})
bill_data = project_data.get("bill", {})
print(f"开始预处理工程量和清单节点...")
# 递归处理所有节点
def process_node(node):
if isinstance(node, dict):
self.calculation_strategy.preprocess_quantity_fee_node(node)
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
process_node(child)
# 处理工程量节点
process_node(quantity_data)
# 也处理清单节点,因为对于清单工程,清单节点信息也很重要
process_node(bill_data)
# 写回文件
with open(json_file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"已完成对 {json_file_path} 中所有节点的预处理")
except Exception as e:
print(f"预处理节点时出错: {e}")
import traceback
traceback.print_exc()
# 返回预处理函数,供 calculate_quantity_fee_tables 调用
return preprocess_nodes_recursive
def apply_resource_fee_rules(self) -> None:
"""应用主网清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用主网清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用主网清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class DistributionBudgetCalculator(CalculatorBase):
"""配网预算计算器"""
def __init__(self):
super().__init__(DISTRIBUTION_BUDGET)
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建配网预算计算策略"""
return DistributionBudgetCalculationStrategy()
def apply_quantity_fee_rules(self) -> None:
"""应用配网预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用配网预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用配网预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用配网预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class DistributionBillCalculator(CalculatorBase):
"""配网清单计算器"""
def __init__(self):
super().__init__(DISTRIBUTION_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建配网清单计算策略"""
return DistributionBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用配网清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用配网清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用配网清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用配网清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class TechnicalRenovationBudgetCalculator(CalculatorBase):
"""技改预算计算器"""
def __init__(self):
super().__init__(TECHNICAL_RENOVATION_BUDGET)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建技改预算计算策略"""
return TechnicalRenovationBudgetCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用技改预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用技改预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用技改预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用技改预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class TechnicalRenovationBillCalculator(CalculatorBase):
"""技改清单计算器"""
def __init__(self):
super().__init__(TECHNICAL_RENOVATION_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建技改清单计算策略"""
return TechnicalRenovationBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用技改清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用技改清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用技改清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用技改清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
def get_calculator(software_type: SoftwareType) -> CalculatorBase:
"""
根据软件类型获取对应的计算器
Args:
software_type: 软件类型
Returns:
CalculatorBase: 计算器实例
"""
calculators = {
MAIN_GRID_BUDGET.name: MainGridBudgetCalculator(),
MAIN_GRID_BILL.name: MainGridBillCalculator(),
DISTRIBUTION_BUDGET.name: DistributionBudgetCalculator(),
DISTRIBUTION_BILL.name: DistributionBillCalculator(),
TECHNICAL_RENOVATION_BUDGET.name: TechnicalRenovationBudgetCalculator(),
TECHNICAL_RENOVATION_BILL.name: TechnicalRenovationBillCalculator(),
}
return calculators.get(software_type.name)