Files
KG_generation/equipment_calculation/main.py
T
2025-08-18 15:14:37 +08:00

288 lines
11 KiB
Python

import os
import json
from typing import Dict, List, Any, Optional, Tuple
from equipment_calculation.software_types import (
get_software_type,
SoftwareCategory,
EngineeringType,
ALL_SOFTWARE_TYPES,
)
from equipment_calculation.software_calculators import get_calculator
# 软件类别名称映射字典,将各种变体映射到标准类别
CATEGORY_MAPPING = {
# 主网及其变体
"主网": "主网",
"主网工程": "主网",
"主网项目": "主网",
# 配网及其变体
"配网": "配网",
"配网造价": "配网",
"配网清单": "配网",
# 技改及其变体
"技改": "技改",
"技改工程": "技改",
"技改项目": "技改",
"技改造价": "技改",
"技改清单": "技改",
}
def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""
从JSON文件内容中解析软件类别和工程类型
:param json_file_path: JSON文件路径
:return: (category, engineering_type) 元组,如果解析失败则返回 (None, None)
"""
try:
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 定义阶段类型映射表
budget_types = ["概预算", "定额", "定额计价", "概算", "概预算工程"]
list_types = ["清单", "结算", "招标控制价", "招投标工程", "清单计价"]
# 从division字段获取软件名称和阶段类型
if "division" in data:
division = data["division"]
print(f"找到division字段: {division}")
# 使用-分割division字段
parts = division.split("-")
if len(parts) == 2:
category = parts[0].strip()
stage_type = parts[1].strip()
elif len(parts) == 3:
category = parts[0].strip()
stage_type = parts[2].strip()
else:
print(f"警告: division字段 '{division}' 格式不正确,无法分割")
# 可以选择返回默认值或抛出异常,这里以返回默认值为例
category = "主网"
engineering_type = "清单"
print(f"使用默认值: 软件名称={category}, 阶段类型={engineering_type}")
return category, engineering_type
# 使用映射字典规范化软件类别
if category in CATEGORY_MAPPING:
category = CATEGORY_MAPPING[category]
else:
print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'")
category = "主网"
# 映射阶段类型
if any(budget_type in stage_type for budget_type in budget_types):
engineering_type = "预算"
elif any(list_type in stage_type for list_type in list_types):
engineering_type = "清单"
else:
print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'")
engineering_type = "清单"
print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})")
return category, engineering_type
else:
print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析")
# 作为备选,尝试从basicData中获取
if "basicData" in data:
basic_data = data["basicData"]
category = basic_data.get("软件名称")
engineering_type = basic_data.get("阶段类型")
# 验证解析结果
if category and engineering_type:
# 使用映射字典规范化软件类别
if category in CATEGORY_MAPPING:
category = CATEGORY_MAPPING[category]
else:
print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'")
category = "主网"
# 确保engineering_type是有效值
if engineering_type not in ["预算", "清单"]:
print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'")
engineering_type = "清单"
print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}")
return category, engineering_type
else:
print(f"警告: basicData中未找到软件名称或阶段类型")
else:
print(f"警告: JSON文件中未找到basicData部分")
return None, None
except Exception as e:
print(f"解析JSON文件内容时出错: {str(e)}")
return None, None
def process_json_file(
json_file_path: str, output_dir: str, calculate_type: str = "all", project_name: str = None
) -> bool:
"""
处理单个JSON文件
Args:
json_file_path: JSON文件路径
output_dir: 输出目录
calculate_type: 计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)
project_name: 项目名称,如果不指定则处理所有项目
Returns:
bool: 处理是否成功
"""
try:
# 从JSON文件内容中解析软件类别和工程类型
category, engineering_type = parse_json_content(json_file_path)
# 如果解析失败,使用默认值
if category is None:
category = "主网"
print(f"无法从文件中解析软件类别,使用默认值: {category}")
if engineering_type is None:
engineering_type = "预算"
print(f"无法从文件中解析工程类型,使用默认值: {engineering_type}")
# 获取软件类型
software_type = get_software_type(category, engineering_type)
print(f"使用软件类型: {software_type.name}")
# 获取计算器
calculator = get_calculator(software_type)
if not calculator:
print(f"错误: 未找到软件类型 {software_type.name} 的计算器")
return False
# 获取文件名(不含扩展名),并替换空格和特殊字符
base_filename = os.path.basename(json_file_path).replace(".json", "")
# 创建安全的目录名(替换空格和特殊字符)
safe_dirname = base_filename.replace(" ", "_").replace("\\", "_").replace("/", "_")
# 设置自定义输出目录
custom_output_dir = os.path.join(output_dir, safe_dirname)
# 确保输出目录存在
try:
os.makedirs(custom_output_dir, exist_ok=True)
print(f"创建输出目录: {custom_output_dir}")
except Exception as e:
print(f"创建输出目录失败: {str(e)}")
# 尝试使用更安全的目录名
safe_dirname = f"project_{hash(base_filename) % 10000}"
custom_output_dir = os.path.join(output_dir, safe_dirname)
os.makedirs(custom_output_dir, exist_ok=True)
print(f"使用备用输出目录: {custom_output_dir}")
# 检查计算器是否有set_output_dir方法,如果有则直接设置输出目录
if hasattr(calculator, "set_output_dir"):
calculator.set_output_dir(custom_output_dir)
print(f"已设置计算器输出目录: {custom_output_dir}")
# 根据计算类型执行计算
if calculate_type in ["all", "quantity"]:
print("开始计算工程量取费表...")
calculator.calculate_quantity_fee_tables(
json_file_path=json_file_path,
project_name=project_name,
)
if calculate_type in ["all", "resource"]:
print("开始计算人材机合价...")
calculator.calculate_resource_fee_tables(json_file_path=json_file_path, project_name=project_name)
else:
# 创建一个新的计算器,复制原始计算器的属性
class CustomOutputCalculator:
def __init__(self, original_calculator, custom_dir):
self.original_calculator = original_calculator
self.custom_dir = custom_dir
def get_output_dir(self):
return self.custom_dir
def __getattr__(self, name):
return getattr(self.original_calculator, name)
# 创建自定义输出目录的计算器
custom_calculator = CustomOutputCalculator(calculator, custom_output_dir)
# 根据计算类型执行计算
if calculate_type in ["all", "quantity"]:
print("开始计算工程量取费表...")
custom_calculator.calculate_quantity_fee_tables(
json_file_path=json_file_path,
project_name=project_name,
)
if calculate_type in ["all", "resource"]:
print("开始计算人材机合价...")
custom_calculator.calculate_resource_fee_tables(
json_file_path=json_file_path, project_name=project_name
)
print(f"文件 {json_file_path} 处理完成")
return True
except Exception as e:
print(f"处理文件 {json_file_path} 时出错: {str(e)}")
import traceback
traceback.print_exc()
return False
def process_directory(input_dir: str, output_dir: str, calculate_type: str = "all") -> None:
"""
批量处理目录中的JSON文件
Args:
input_dir: 输入目录路径
output_dir: 输出目录路径
calculate_type: 计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)
"""
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# 获取目录中的所有JSON文件
json_files = [f for f in os.listdir(input_dir) if f.lower().endswith(".json")]
if not json_files:
print(f"警告: 在目录 {input_dir} 中未找到JSON文件")
return
print(f"找到 {len(json_files)} 个JSON文件,开始处理...")
# 处理每个JSON文件
success_count = 0
for i, json_file in enumerate(json_files, 1):
json_file_path = os.path.join(input_dir, json_file)
print(f"处理文件 {i}/{len(json_files)}: {json_file}")
if process_json_file(json_file_path, output_dir, calculate_type):
success_count += 1
print(f"处理完成,成功: {success_count}/{len(json_files)}")
def bcl_calculate(input_dir: str, output_dir: str, calculate_type: str = "all") -> None:
"""
主函数,处理指定目录中的所有JSON文件
Args:
input_dir: 输入目录路径
output_dir: 输出目录路径
calculate_type: 计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)
"""
print(f"开始处理目录: {input_dir}")
print(f"输出目录: {output_dir}")
print(f"计算类型: {calculate_type}")
process_directory(input_dir, output_dir, calculate_type)
print("所有文件处理完成")