diff --git a/equipment_calculation/main.py b/equipment_calculation/main.py new file mode 100644 index 0000000..831f60f --- /dev/null +++ b/equipment_calculation/main.py @@ -0,0 +1,315 @@ +""" +第二步:计算bcl结果 +""" + +import os +import argparse +import re +import json +from typing import Dict, List, Any, Optional, Tuple +from equipment_calculation.software_types import ( + get_software_type, + SoftwareCategory, + EngineeringType, + ALL_SOFTWARE_TYPES, +) +from equipment_calculation.software_calculators import get_calculator + + +def parse_arguments(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description="工程量取费和人材机合价计算程序") + + # 软件类型参数 + parser.add_argument( + "--category", choices=["主网", "配网", "技改"], default="主网", help="软件类别(主网/配网/技改)" + ) + + parser.add_argument("--engineering-type", choices=["预算", "清单"], default="清单", help="工程类型(预算/清单)") + + # 计算类型参数 + parser.add_argument( + "--calculate", + choices=["all", "quantity", "resource"], + default="quantity", + help="计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)", + ) + + # 项目参数 + parser.add_argument("--project", help="项目名称,如果不指定则处理所有项目") + + parser.add_argument("--adjustment-type", default="调差", help="调差类型,默认为'调差'") + + # 输入文件参数 + parser.add_argument( + "--input-file", + default="测试案例/主网清单变电.json", + help="输入JSON文件路径,如果不指定则使用默认路径", + ) + + # 添加输入和输出文件夹参数 + parser.add_argument("--input-folder", help="输入文件夹路径,包含要处理的JSON文件") + parser.add_argument("--output-folder", default="计算结果", help="输出文件夹路径,用于保存计算结果") + + return parser.parse_args() + + +def parse_filename(filename: str) -> Tuple[str, str]: + """ + 从文件名中解析软件类别和工程类型(备用方法) + + :param filename: JSON文件名,例如"主网预算变电.json" + :return: (category, engineering_type) 元组,例如 ("主网", "预算") + """ + # 移除扩展名 + basename = os.path.splitext(filename)[0] + + # 查找类别(主网/配网/技改) + category = None + for cat in ["主网", "配网", "技改"]: + if cat in basename: + category = cat + break + + # 查找工程类型(预算/清单) + engineering_type = None + for eng_type in ["预算", "清单"]: + if eng_type in basename: + engineering_type = eng_type + break + + # 如果未找到,使用默认值 + if not category: + print(f"警告: 无法从文件名 '{filename}' 中解析软件类别,使用默认值 '主网'") + category = "主网" + + if not engineering_type: + print(f"警告: 无法从文件名 '{filename}' 中解析工程类型,使用默认值 '清单'") + engineering_type = "清单" + + return category, engineering_type + + +def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]: + """ + 从JSON文件内容中解析软件类别和工程类型 + + :param json_file_path: JSON文件路径 + :return: (category, engineering_type) 元组,如果解析失败则返回 (None, None) + """ + try: + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 定义阶段类型映射表 + budget_types = ["概预算", "定额", "估算", "概算"] + list_types = ["清单", "结算", "招标控制价", "招投标工程"] + + # 从division字段获取软件名称和阶段类型 + if "division" in data: + division = data["division"] + print(f"找到division字段: {division}") + + # 使用-分割division字段 + parts = division.split("-") + if len(parts) >= 2: + category = parts[0].strip() + stage_type = parts[1].strip() + + # 验证软件类别 + if category not in ["主网", "配网", "技改"]: + print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") + category = "主网" + + # 映射阶段类型 + if any(budget_type in stage_type for budget_type in budget_types): + engineering_type = "预算" + elif any(list_type in stage_type for list_type in list_types): + engineering_type = "清单" + else: + print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'") + engineering_type = "清单" + + print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})") + return category, engineering_type + else: + print(f"警告: division字段 '{division}' 格式不正确,无法分割") + else: + print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析") + + # 作为备选,尝试从basicData中获取 + if "basicData" in data: + basic_data = data["basicData"] + category = basic_data.get("软件名称") + engineering_type = basic_data.get("阶段类型") + + # 验证解析结果 + if category and engineering_type: + # 确保category是有效值 + if category not in ["主网", "配网", "技改"]: + print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") + category = "主网" + + # 确保engineering_type是有效值 + if engineering_type not in ["预算", "清单"]: + print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'") + engineering_type = "清单" + + print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}") + return category, engineering_type + else: + print(f"警告: basicData中未找到软件名称或阶段类型") + else: + print(f"警告: JSON文件中未找到basicData部分") + + return None, None + + except Exception as e: + print(f"解析JSON文件内容时出错: {str(e)}") + return None, None + + +def process_json_file( + input_file: str, + base_output_dir: str = "计算结果", + category: Optional[str] = None, + engineering_type: Optional[str] = None, + project: Optional[str] = None, + adjustment_type: str = "调差", +) -> bool: + """ + 处理单个JSON文件 + + :param input_file: 输入JSON文件路径 + :param base_output_dir: 基础输出目录路径,实际结果会保存在此目录下的子文件夹中 + :param category: 软件类别,如果为None则从JSON内容中解析 + :param engineering_type: 工程类型,如果为None则从JSON内容中解析 + :param project: 项目名称,如果为None则处理所有项目 + :param adjustment_type: 调差类型 + :return: 处理是否成功 + """ + try: + # 获取文件名(不含扩展名)作为输出子文件夹的名称 + filename = os.path.basename(input_file) + file_basename = os.path.splitext(filename)[0] + output_dir = os.path.join(base_output_dir, file_basename) + + # 创建输出子文件夹 + os.makedirs(output_dir, exist_ok=True) + print(f"创建输出目录: {output_dir}") + + # 如果未指定category或engineering_type,从JSON内容中解析 + if category is None or engineering_type is None: + # 首先尝试从JSON内容中解析 + json_category, json_engineering_type = parse_json_content(input_file) + + if category is None: + if json_category: + category = json_category + else: + # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) + print("从JSON内容解析软件名称失败,尝试从文件名解析...") + parsed_category, _ = parse_filename(filename) + category = parsed_category + + if engineering_type is None: + if json_engineering_type: + engineering_type = json_engineering_type + else: + # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) + print("从JSON内容解析阶段类型失败,尝试从文件名解析...") + _, parsed_engineering_type = parse_filename(filename) + engineering_type = parsed_engineering_type + + print(f"处理文件: {input_file}") + print(f" 软件类别: {category}") + print(f" 工程类型: {engineering_type}") + + # 获取软件类型 + software_type = get_software_type(category, engineering_type) + print(f" 使用软件类型: {software_type.name}") + + # 获取计算器 + calculator = get_calculator(software_type) + if not calculator: + print(f" 错误: 未找到软件类型 {software_type.name} 的计算器") + return False + + # 设置输出目录 + calculator.set_output_dir(output_dir) + + # 执行计算 + print(" 开始计算工程量取费表...") + calculator.calculate_quantity_fee_tables(json_file_path=input_file, project_name=project) + + print(" 开始计算人材机合价...") + calculator.calculate_resource_fee_tables(json_file_path=input_file, project_name=project) + + print(f" 处理完成: {input_file}") + print(f" 结果保存在: {output_dir}") + return True + + except Exception as e: + print(f" 处理文件 {input_file} 时出错: {str(e)}") + return False + + +def process_BCL_calculate(input_folder: str, output_folder: str) -> List[Tuple[str, bool]]: + """ + 处理指定文件夹中的所有JSON文件 + + :param input_folder: 输入文件夹路径,包含要处理的JSON文件 + :param output_folder: 输出文件夹路径,用于保存计算结果 + :return: 处理结果列表,格式为 [(文件路径, 是否成功), ...] + """ + # 确保基础输出目录存在 + os.makedirs(output_folder, exist_ok=True) + + # 查找所有JSON文件 + json_files = [] + for file in os.listdir(input_folder): + if file.lower().endswith(".json"): + json_files.append(os.path.join(input_folder, file)) + + if not json_files: + print(f"警告: 在目录 {input_folder} 中没有找到JSON文件") + return [] + + # 处理每个JSON文件 + results = [] + for input_file in json_files: + success = process_json_file( + input_file=input_file, + base_output_dir=output_folder, # 传递基础输出目录 + category=None, # 从JSON内容中解析 + engineering_type=None, # 从JSON内容中解析 + project=None, # 处理所有项目 + adjustment_type="调差", + ) + + results.append((input_file, success)) + + return results + + +def main(): + """程序入口""" + + input_folder = "project2json/outputs/json" + output_folder = "project2json/outputs/bcl_results" + + results = process_BCL_calculate(input_folder, output_folder) + + # 显示处理结果 + success_count = sum(1 for _, success in results if success) + print(f"\n处理完成: 成功 {success_count}/{len(results)} 个文件") + + if success_count < len(results): + print("处理失败的文件:") + for file_path, success in results: + if not success: + print(f" - {os.path.basename(file_path)}") + + +if __name__ == "__main__": + main() diff --git a/equipment_calculation/software_calculators.py b/equipment_calculation/software_calculators.py new file mode 100644 index 0000000..d2fef44 --- /dev/null +++ b/equipment_calculation/software_calculators.py @@ -0,0 +1,703 @@ +from equipment_calculation.calculator_base import CalculatorBase +from equipment_calculation.software_types import ( + SoftwareType, + MAIN_GRID_BUDGET, + MAIN_GRID_BILL, + DISTRIBUTION_BUDGET, + DISTRIBUTION_BILL, + TECHNICAL_RENOVATION_BUDGET, + TECHNICAL_RENOVATION_BILL, +) +from equipment_calculation.calculation_strategy import CalculationStrategy, DefaultCalculationStrategy +import json +import os +from typing import Any, Dict, Optional, Set +import sys +import re + +# 添加一个全局变量和一个缓存字典来存储清单数量 +_BILL_QUANTITY = 1.0 +bill_quantity_cache = {} + + +# 为每种软件类型定义特定的计算策略 +class MainGridBudgetCalculationStrategy(DefaultCalculationStrategy): + """主网预算计算策略""" + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 定额处理 + if node_type == "0" or node_type == "定额": + supplier = str(node.get("费用类型", "")) + if supplier == "1": + node["费用类型"] = "不取费" + elif supplier == "0": + node["费用类型"] = "取费" + + # 处理特征段 + seg_val = node.get("特征段", "") + if isinstance(seg_val, str): + num = re.search(r"\d+", seg_val) + node["特征段"] = num.group() if num else "" + + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + +class MainGridBillCalculationStrategy(DefaultCalculationStrategy): + """主网清单计算策略""" + + def __init__(self): + super().__init__() + self.bill_quantity = 1.0 # 默认值 + + def set_bill_quantity(self, quantity): + """设置清单数量""" + try: + self.bill_quantity = float(quantity) + print(f"设置计算策略的清单数量: {self.bill_quantity}") + except (TypeError, ValueError): + print(f"无法将 {quantity} 转换为浮点数") + + def calculate_external_variable(self, var_name: str, context: Any) -> float: + """计算表外变量,并乘以父级清单的数量""" + # 使用默认实现计算表外变量 + from equipment_calculation.bcl_utils import calculator + + result = calculator.calculate(var_name, context) + + # 获取清单数量 - 首先从全局变量获取 + global _BILL_QUANTITY # 在模块级别定义一个全局变量 + parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 + + print(f"\n===== 获取清单数量 =====") + print(f"从全局变量获取清单数量: {parent_quantity}") + + # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 + if parent_quantity == 1.0 and hasattr(context, "__dict__"): + module = sys.modules.get("quantity_fee_calculator") + if module and hasattr(module, "bill_quantity_cache"): + bill_cache = getattr(module, "bill_quantity_cache") + bill_id = None + + # 尝试从上下文中获取bill_id + if hasattr(context, "bill_id"): + bill_id = context.bill_id + elif hasattr(context, "datasource") and context.datasource: + for item in context.datasource: + if hasattr(item, "object") and hasattr(item.object, "bill_id"): + bill_id = item.object.bill_id + break + + # 如果找到bill_id并且在缓存中,使用缓存的数量 + if bill_id and bill_id in bill_cache: + parent_quantity = bill_cache[bill_id] + print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") + + # 设置一个临时环境变量作为最后的手段 + if parent_quantity == 1.0: + import os + + env_quantity = os.environ.get("BILL_QUANTITY") + if env_quantity: + try: + parent_quantity = float(env_quantity) + print(f"从环境变量获取清单数量: {parent_quantity}") + except: + pass + + print(f"===== 获取清单数量结束 =====\n") + + # 将结果乘以父级清单的数量 + final_result = float(result) * parent_quantity if result is not None else 0.0 + print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") + return final_result + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """将取费表中的'综合单价'替换为'合价'""" + try: + # 读取输出文件 + with open(output_file, "r", encoding="utf-8") as f: + data = json.load(f) + + # 递归查找并替换"综合单价"为"合价" + def replace_key(obj): + if isinstance(obj, dict): + new_obj = {} + for key, value in obj.items(): + # 替换键名 + new_key = "合价" if key == "综合单价" else key + # 递归处理值 + new_obj[new_key] = replace_key(value) + return new_obj + elif isinstance(obj, list): + return [replace_key(item) for item in obj] + else: + return obj + + # 替换所有出现的"综合单价" + new_data = replace_key(data) + + # 写回文件 + with open(output_file, "w", encoding="utf-8") as f: + json.dump(new_data, f, ensure_ascii=False, indent=2) + + print(f"已将取费表中的'综合单价'替换为'合价'") + except Exception as e: + print(f"修改取费表时出错: {e}") + + +class DistributionBudgetCalculationStrategy(DefaultCalculationStrategy): + """配网预算计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加配网预算特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "材料费": + # print(f"应用配网预算特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class DistributionBillCalculationStrategy(DefaultCalculationStrategy): + """配网清单计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加配网清单特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "机械费": + # print(f"应用配网清单特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class TechnicalRenovationBudgetCalculationStrategy(DefaultCalculationStrategy): + """技改预算计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加技改预算特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "人工费+材料费": + # print(f"应用技改预算特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class TechnicalRenovationBillCalculationStrategy(DefaultCalculationStrategy): + """技改清单计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加技改清单特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "人工费+材料费+机械费": + # print(f"应用技改清单特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + def calculate_external_variable(self, var_name: str, context: Any) -> float: + """计算表外变量,并乘以父级清单的数量""" + # 使用默认实现计算表外变量 + from equipment_calculation.bcl_utils import calculator + + result = calculator.calculate(var_name, context) + + # 获取清单数量 - 首先从全局变量获取 + global _BILL_QUANTITY # 在模块级别定义一个全局变量 + parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 + + print(f"\n===== 获取清单数量 =====") + print(f"从全局变量获取清单数量: {parent_quantity}") + + # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 + if parent_quantity == 1.0 and hasattr(context, "__dict__"): + module = sys.modules.get("quantity_fee_calculator") + if module and hasattr(module, "bill_quantity_cache"): + bill_cache = getattr(module, "bill_quantity_cache") + bill_id = None + + # 尝试从上下文中获取bill_id + if hasattr(context, "bill_id"): + bill_id = context.bill_id + elif hasattr(context, "datasource") and context.datasource: + for item in context.datasource: + if hasattr(item, "object") and hasattr(item.object, "bill_id"): + bill_id = item.object.bill_id + break + + # 如果找到bill_id并且在缓存中,使用缓存的数量 + if bill_id and bill_id in bill_cache: + parent_quantity = bill_cache[bill_id] + print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") + + # 设置一个临时环境变量作为最后的手段 + if parent_quantity == 1.0: + import os + + env_quantity = os.environ.get("BILL_QUANTITY") + if env_quantity: + try: + parent_quantity = float(env_quantity) + print(f"从环境变量获取清单数量: {parent_quantity}") + except: + pass + + print(f"===== 获取清单数量结束 =====\n") + + # 将结果乘以父级清单的数量 + final_result = float(result) * parent_quantity if result is not None else 0.0 + print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") + return final_result + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """将取费表中的'综合单价'替换为'合价'""" + try: + # 读取输出文件 + with open(output_file, "r", encoding="utf-8") as f: + data = json.load(f) + + # 递归查找并替换"综合单价"为"合价" + def replace_key(obj): + if isinstance(obj, dict): + new_obj = {} + for key, value in obj.items(): + # 替换键名 + new_key = "合价" if key == "综合单价" else key + # 递归处理值 + new_obj[new_key] = replace_key(value) + return new_obj + elif isinstance(obj, list): + return [replace_key(item) for item in obj] + else: + return obj + + # 替换所有出现的"综合单价" + new_data = replace_key(data) + + # 写回文件 + with open(output_file, "w", encoding="utf-8") as f: + json.dump(new_data, f, ensure_ascii=False, indent=2) + + print(f"已将取费表中的'综合单价'替换为'合价'") + except Exception as e: + print(f"修改取费表时出错: {e}") + + +class MainGridBudgetCalculator(CalculatorBase): + """主网预算计算器""" + + def __init__(self): + super().__init__(MAIN_GRID_BUDGET) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建主网预算计算策略""" + return MainGridBudgetCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用主网预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") + + # 创建预处理函数 + def preprocess_nodes_recursive(json_file_path): + try: + # 读取 JSON 文件 + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取工程量数据 + project_data = data.get("projectData", {}) + quantity_data = project_data.get("quantity", {}) + + # 递归处理所有节点 + def process_node(node): + # 调用计算策略的预处理方法 + if isinstance(node, dict): + self.calculation_strategy.preprocess_quantity_fee_node(node) + # 处理子节点 + if "children" in node and isinstance(node["children"], list): + for child in node["children"]: + process_node(child) + + # 处理所有工程量节点 + process_node(quantity_data) + + # 写回 JSON 文件 + with open(json_file_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print(f"已完成对 {json_file_path} 中所有节点的预处理") + except Exception as e: + print(f"预处理节点时出错: {e}") + + # 不在这里调用预处理函数,因为我们还不知道 JSON 文件路径 + # 这个函数会在 calculate_quantity_fee_tables 中调用 + # preprocess_nodes_recursive(json_file_path) + + # 只需实现一个空函数,返回预处理函数以便后续调用 + return preprocess_nodes_recursive + + def apply_resource_fee_rules(self) -> None: + """应用主网预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用主网预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用主网预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class MainGridBillCalculator(CalculatorBase): + """主网清单计算器""" + + def __init__(self): + super().__init__(MAIN_GRID_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建主网清单计算策略""" + return MainGridBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用主网清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") + + # 创建预处理函数,这个函数会处理JSON文件中的节点 + def preprocess_nodes_recursive(json_file_path): + try: + # 读取 JSON 文件 + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取工程量数据 + project_data = data.get("projectData", {}) + quantity_data = project_data.get("quantity", {}) + bill_data = project_data.get("bill", {}) + + print(f"开始预处理工程量和清单节点...") + + # 递归处理所有节点 + def process_node(node): + if isinstance(node, dict): + self.calculation_strategy.preprocess_quantity_fee_node(node) + if "children" in node and isinstance(node["children"], list): + for child in node["children"]: + process_node(child) + + # 处理工程量节点 + process_node(quantity_data) + # 也处理清单节点,因为对于清单工程,清单节点信息也很重要 + process_node(bill_data) + + # 写回文件 + with open(json_file_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print(f"已完成对 {json_file_path} 中所有节点的预处理") + except Exception as e: + print(f"预处理节点时出错: {e}") + import traceback + + traceback.print_exc() + + # 返回预处理函数,供 calculate_quantity_fee_tables 调用 + return preprocess_nodes_recursive + + def apply_resource_fee_rules(self) -> None: + """应用主网清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用主网清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用主网清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class DistributionBudgetCalculator(CalculatorBase): + """配网预算计算器""" + + def __init__(self): + super().__init__(DISTRIBUTION_BUDGET) + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建配网预算计算策略""" + return DistributionBudgetCalculationStrategy() + + def apply_quantity_fee_rules(self) -> None: + """应用配网预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用配网预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用配网预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用配网预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class DistributionBillCalculator(CalculatorBase): + """配网清单计算器""" + + def __init__(self): + super().__init__(DISTRIBUTION_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建配网清单计算策略""" + return DistributionBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用配网清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用配网清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用配网清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用配网清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class TechnicalRenovationBudgetCalculator(CalculatorBase): + """技改预算计算器""" + + def __init__(self): + super().__init__(TECHNICAL_RENOVATION_BUDGET) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建技改预算计算策略""" + return TechnicalRenovationBudgetCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用技改预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用技改预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用技改预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用技改预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class TechnicalRenovationBillCalculator(CalculatorBase): + """技改清单计算器""" + + def __init__(self): + super().__init__(TECHNICAL_RENOVATION_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建技改清单计算策略""" + return TechnicalRenovationBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用技改清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用技改清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用技改清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用技改清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +def get_calculator(software_type: SoftwareType) -> CalculatorBase: + """ + 根据软件类型获取对应的计算器 + + Args: + software_type: 软件类型 + + Returns: + CalculatorBase: 计算器实例 + """ + calculators = { + MAIN_GRID_BUDGET.name: MainGridBudgetCalculator(), + MAIN_GRID_BILL.name: MainGridBillCalculator(), + DISTRIBUTION_BUDGET.name: DistributionBudgetCalculator(), + DISTRIBUTION_BILL.name: DistributionBillCalculator(), + TECHNICAL_RENOVATION_BUDGET.name: TechnicalRenovationBudgetCalculator(), + TECHNICAL_RENOVATION_BILL.name: TechnicalRenovationBillCalculator(), + } + + return calculators.get(software_type.name)