""" 第二步:计算bcl结果 """ import os import argparse import re import json from typing import Dict, List, Any, Optional, Tuple from equipment_calculation.software_types import ( get_software_type, SoftwareCategory, EngineeringType, ALL_SOFTWARE_TYPES, ) from equipment_calculation.software_calculators import get_calculator def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description="工程量取费和人材机合价计算程序") # 软件类型参数 parser.add_argument( "--category", choices=["主网", "配网", "技改"], default="主网", help="软件类别(主网/配网/技改)" ) parser.add_argument("--engineering-type", choices=["预算", "清单"], default="清单", help="工程类型(预算/清单)") # 计算类型参数 parser.add_argument( "--calculate", choices=["all", "quantity", "resource"], default="quantity", help="计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)", ) # 项目参数 parser.add_argument("--project", help="项目名称,如果不指定则处理所有项目") parser.add_argument("--adjustment-type", default="调差", help="调差类型,默认为'调差'") # 输入文件参数 parser.add_argument( "--input-file", default="测试案例/主网清单变电.json", help="输入JSON文件路径,如果不指定则使用默认路径", ) # 添加输入和输出文件夹参数 parser.add_argument("--input-folder", help="输入文件夹路径,包含要处理的JSON文件") parser.add_argument("--output-folder", default="计算结果", help="输出文件夹路径,用于保存计算结果") return parser.parse_args() def parse_filename(filename: str) -> Tuple[str, str]: """ 从文件名中解析软件类别和工程类型(备用方法) :param filename: JSON文件名,例如"主网预算变电.json" :return: (category, engineering_type) 元组,例如 ("主网", "预算") """ # 移除扩展名 basename = os.path.splitext(filename)[0] # 查找类别(主网/配网/技改) category = None for cat in ["主网", "配网", "技改"]: if cat in basename: category = cat break # 查找工程类型(预算/清单) engineering_type = None for eng_type in ["预算", "清单"]: if eng_type in basename: engineering_type = eng_type break # 如果未找到,使用默认值 if not category: print(f"警告: 无法从文件名 '{filename}' 中解析软件类别,使用默认值 '主网'") category = "主网" if not engineering_type: print(f"警告: 无法从文件名 '{filename}' 中解析工程类型,使用默认值 '清单'") engineering_type = "清单" return category, engineering_type def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]: """ 从JSON文件内容中解析软件类别和工程类型 :param json_file_path: JSON文件路径 :return: (category, engineering_type) 元组,如果解析失败则返回 (None, None) """ try: with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) # 定义阶段类型映射表 budget_types = ["概预算", "定额", "估算", "概算"] list_types = ["清单", "结算", "竣工", "决算"] # 从division字段获取软件名称和阶段类型 if "division" in data: division = data["division"] print(f"找到division字段: {division}") # 使用-分割division字段 parts = division.split("-") if len(parts) >= 2: category = parts[0].strip() stage_type = parts[1].strip() # 验证软件类别 if category not in ["主网", "配网", "技改"]: print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") category = "主网" # 映射阶段类型 if any(budget_type in stage_type for budget_type in budget_types): engineering_type = "预算" elif any(list_type in stage_type for list_type in list_types): engineering_type = "清单" else: print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'") engineering_type = "清单" print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})") return category, engineering_type else: print(f"警告: division字段 '{division}' 格式不正确,无法分割") else: print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析") # 作为备选,尝试从basicData中获取 if "basicData" in data: basic_data = data["basicData"] category = basic_data.get("软件名称") engineering_type = basic_data.get("阶段类型") # 验证解析结果 if category and engineering_type: # 确保category是有效值 if category not in ["主网", "配网", "技改"]: print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") category = "主网" # 确保engineering_type是有效值 if engineering_type not in ["预算", "清单"]: print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'") engineering_type = "清单" print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}") return category, engineering_type else: print(f"警告: basicData中未找到软件名称或阶段类型") else: print(f"警告: JSON文件中未找到basicData部分") return None, None except Exception as e: print(f"解析JSON文件内容时出错: {str(e)}") return None, None def process_json_file( input_file: str, base_output_dir: str = "计算结果", category: Optional[str] = None, engineering_type: Optional[str] = None, project: Optional[str] = None, adjustment_type: str = "调差", ) -> bool: """ 处理单个JSON文件 :param input_file: 输入JSON文件路径 :param base_output_dir: 基础输出目录路径,实际结果会保存在此目录下的子文件夹中 :param category: 软件类别,如果为None则从JSON内容中解析 :param engineering_type: 工程类型,如果为None则从JSON内容中解析 :param project: 项目名称,如果为None则处理所有项目 :param adjustment_type: 调差类型 :return: 处理是否成功 """ try: # 获取文件名(不含扩展名)作为输出子文件夹的名称 filename = os.path.basename(input_file) file_basename = os.path.splitext(filename)[0] output_dir = os.path.join(base_output_dir, file_basename) # 创建输出子文件夹 os.makedirs(output_dir, exist_ok=True) print(f"创建输出目录: {output_dir}") # 如果未指定category或engineering_type,从JSON内容中解析 if category is None or engineering_type is None: # 首先尝试从JSON内容中解析 json_category, json_engineering_type = parse_json_content(input_file) if category is None: if json_category: category = json_category else: # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) print("从JSON内容解析软件名称失败,尝试从文件名解析...") parsed_category, _ = parse_filename(filename) category = parsed_category if engineering_type is None: if json_engineering_type: engineering_type = json_engineering_type else: # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) print("从JSON内容解析阶段类型失败,尝试从文件名解析...") _, parsed_engineering_type = parse_filename(filename) engineering_type = parsed_engineering_type print(f"处理文件: {input_file}") print(f" 软件类别: {category}") print(f" 工程类型: {engineering_type}") # 获取软件类型 software_type = get_software_type(category, engineering_type) print(f" 使用软件类型: {software_type.name}") # 获取计算器 calculator = get_calculator(software_type) if not calculator: print(f" 错误: 未找到软件类型 {software_type.name} 的计算器") return False # 设置输出目录 calculator.set_output_dir(output_dir) # 执行计算 print(" 开始计算工程量取费表...") calculator.calculate_quantity_fee_tables( json_file_path=input_file, project_name=project, adjustment_type=adjustment_type ) print(" 开始计算人材机合价...") calculator.calculate_resource_fee_tables( json_file_path=input_file, project_name=project, adjustment_type=adjustment_type ) print(f" 处理完成: {input_file}") print(f" 结果保存在: {output_dir}") return True except Exception as e: print(f" 处理文件 {input_file} 时出错: {str(e)}") return False def process_BCL_calculate(input_folder: str, output_folder: str) -> List[Tuple[str, bool]]: """ 处理指定文件夹中的所有JSON文件 :param input_folder: 输入文件夹路径,包含要处理的JSON文件 :param output_folder: 输出文件夹路径,用于保存计算结果 :return: 处理结果列表,格式为 [(文件路径, 是否成功), ...] """ # 确保基础输出目录存在 os.makedirs(output_folder, exist_ok=True) # 查找所有JSON文件 json_files = [] for file in os.listdir(input_folder): if file.lower().endswith(".json"): json_files.append(os.path.join(input_folder, file)) if not json_files: print(f"警告: 在目录 {input_folder} 中没有找到JSON文件") return [] # 处理每个JSON文件 results = [] for input_file in json_files: success = process_json_file( input_file=input_file, base_output_dir=output_folder, # 传递基础输出目录 category=None, # 从JSON内容中解析 engineering_type=None, # 从JSON内容中解析 project=None, # 处理所有项目 adjustment_type="调差", ) results.append((input_file, success)) return results def main(): """程序入口""" input_folder = "project2json/outputs" output_folder = "outputs-2" results = process_BCL_calculate(input_folder, output_folder) # 显示处理结果 success_count = sum(1 for _, success in results if success) print(f"\n处理完成: 成功 {success_count}/{len(results)} 个文件") if success_count < len(results): print("处理失败的文件:") for file_path, success in results: if not success: print(f" - {os.path.basename(file_path)}") if __name__ == "__main__": main()