删除 equipment_calculation/software_calculators.py

This commit is contained in:
2025-08-02 14:54:33 +08:00
parent 377f7012f0
commit e0fc238765
@@ -1,703 +0,0 @@
from equipment_calculation.calculator_base import CalculatorBase
from equipment_calculation.software_types import (
SoftwareType,
MAIN_GRID_BUDGET,
MAIN_GRID_BILL,
DISTRIBUTION_BUDGET,
DISTRIBUTION_BILL,
TECHNICAL_RENOVATION_BUDGET,
TECHNICAL_RENOVATION_BILL,
)
from equipment_calculation.calculation_strategy import CalculationStrategy, DefaultCalculationStrategy
import json
import os
from typing import Any, Dict, Optional, Set
import sys
import re
# 添加一个全局变量和一个缓存字典来存储清单数量
_BILL_QUANTITY = 1.0
bill_quantity_cache = {}
# 为每种软件类型定义特定的计算策略
class MainGridBudgetCalculationStrategy(DefaultCalculationStrategy):
"""主网预算计算策略"""
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 定额处理
if node_type == "0" or node_type == "定额":
supplier = str(node.get("费用类型", ""))
if supplier == "1":
node["费用类型"] = "不取费"
elif supplier == "0":
node["费用类型"] = "取费"
# 处理特征段
seg_val = node.get("特征段", "")
if isinstance(seg_val, str):
num = re.search(r"\d+", seg_val)
node["特征段"] = num.group() if num else ""
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
class MainGridBillCalculationStrategy(DefaultCalculationStrategy):
"""主网清单计算策略"""
def __init__(self):
super().__init__()
self.bill_quantity = 1.0 # 默认值
def set_bill_quantity(self, quantity):
"""设置清单数量"""
try:
self.bill_quantity = float(quantity)
print(f"设置计算策略的清单数量: {self.bill_quantity}")
except (TypeError, ValueError):
print(f"无法将 {quantity} 转换为浮点数")
def calculate_external_variable(self, var_name: str, context: Any) -> float:
"""计算表外变量,并乘以父级清单的数量"""
# 使用默认实现计算表外变量
from equipment_calculation.bcl_utils import calculator
result = calculator.calculate(var_name, context)
# 获取清单数量 - 首先从全局变量获取
global _BILL_QUANTITY # 在模块级别定义一个全局变量
parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0
print(f"\n===== 获取清单数量 =====")
print(f"从全局变量获取清单数量: {parent_quantity}")
# 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取
if parent_quantity == 1.0 and hasattr(context, "__dict__"):
module = sys.modules.get("quantity_fee_calculator")
if module and hasattr(module, "bill_quantity_cache"):
bill_cache = getattr(module, "bill_quantity_cache")
bill_id = None
# 尝试从上下文中获取bill_id
if hasattr(context, "bill_id"):
bill_id = context.bill_id
elif hasattr(context, "datasource") and context.datasource:
for item in context.datasource:
if hasattr(item, "object") and hasattr(item.object, "bill_id"):
bill_id = item.object.bill_id
break
# 如果找到bill_id并且在缓存中,使用缓存的数量
if bill_id and bill_id in bill_cache:
parent_quantity = bill_cache[bill_id]
print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}")
# 设置一个临时环境变量作为最后的手段
if parent_quantity == 1.0:
import os
env_quantity = os.environ.get("BILL_QUANTITY")
if env_quantity:
try:
parent_quantity = float(env_quantity)
print(f"从环境变量获取清单数量: {parent_quantity}")
except:
pass
print(f"===== 获取清单数量结束 =====\n")
# 将结果乘以父级清单的数量
final_result = float(result) * parent_quantity if result is not None else 0.0
print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}")
return final_result
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料", "损耗", "预算价含税"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""将取费表中的'综合单价'替换为'合价'"""
try:
# 读取输出文件
with open(output_file, "r", encoding="utf-8") as f:
data = json.load(f)
# 递归查找并替换"综合单价"为"合价"
def replace_key(obj):
if isinstance(obj, dict):
new_obj = {}
for key, value in obj.items():
# 替换键名
new_key = "合价" if key == "综合单价" else key
# 递归处理值
new_obj[new_key] = replace_key(value)
return new_obj
elif isinstance(obj, list):
return [replace_key(item) for item in obj]
else:
return obj
# 替换所有出现的"综合单价"
new_data = replace_key(data)
# 写回文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(new_data, f, ensure_ascii=False, indent=2)
print(f"已将取费表中的'综合单价'替换为'合价'")
except Exception as e:
print(f"修改取费表时出错: {e}")
class DistributionBudgetCalculationStrategy(DefaultCalculationStrategy):
"""配网预算计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加配网预算特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "材料费":
# print(f"应用配网预算特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class DistributionBillCalculationStrategy(DefaultCalculationStrategy):
"""配网清单计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加配网清单特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "机械费":
# print(f"应用配网清单特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class TechnicalRenovationBudgetCalculationStrategy(DefaultCalculationStrategy):
"""技改预算计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加技改预算特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "人工费+材料费":
# print(f"应用技改预算特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
class TechnicalRenovationBillCalculationStrategy(DefaultCalculationStrategy):
"""技改清单计算策略"""
# def calculate_fee_base(
# self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None
# ) -> float:
# """计算取费基数,可以重写以添加技改清单特定的计算规则"""
# # 示例:如果取费基数是特定值,使用特定的计算规则
# if fee_base == "人工费+材料费+机械费":
# print(f"应用技改清单特定的取费基数计算规则: {fee_base}")
# # 这里可以添加特定的计算逻辑
# # 暂时仍然使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
# # 对于其他取费基数,使用默认实现
# return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation)
def calculate_external_variable(self, var_name: str, context: Any) -> float:
"""计算表外变量,并乘以父级清单的数量"""
# 使用默认实现计算表外变量
from equipment_calculation.bcl_utils import calculator
result = calculator.calculate(var_name, context)
# 获取清单数量 - 首先从全局变量获取
global _BILL_QUANTITY # 在模块级别定义一个全局变量
parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0
print(f"\n===== 获取清单数量 =====")
print(f"从全局变量获取清单数量: {parent_quantity}")
# 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取
if parent_quantity == 1.0 and hasattr(context, "__dict__"):
module = sys.modules.get("quantity_fee_calculator")
if module and hasattr(module, "bill_quantity_cache"):
bill_cache = getattr(module, "bill_quantity_cache")
bill_id = None
# 尝试从上下文中获取bill_id
if hasattr(context, "bill_id"):
bill_id = context.bill_id
elif hasattr(context, "datasource") and context.datasource:
for item in context.datasource:
if hasattr(item, "object") and hasattr(item.object, "bill_id"):
bill_id = item.object.bill_id
break
# 如果找到bill_id并且在缓存中,使用缓存的数量
if bill_id and bill_id in bill_cache:
parent_quantity = bill_cache[bill_id]
print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}")
# 设置一个临时环境变量作为最后的手段
if parent_quantity == 1.0:
import os
env_quantity = os.environ.get("BILL_QUANTITY")
if env_quantity:
try:
parent_quantity = float(env_quantity)
print(f"从环境变量获取清单数量: {parent_quantity}")
except:
pass
print(f"===== 获取清单数量结束 =====\n")
# 将结果乘以父级清单的数量
final_result = float(result) * parent_quantity if result is not None else 0.0
print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}")
return final_result
def preprocess_quantity_fee_node(self, node: dict) -> None:
"""
工程量取费表节点预处理规则:
- 类型为"1""主材"时,供货方映射,主材属性补全
- 类型为"5""设备"时,供货方映射,设备类型映射
"""
node_type = str(node.get("类型", ""))
# 主材处理
if node_type == "1" or node_type == "主材":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 补全主材属性
for key in ["拆分", "设备性材料", "损耗", "预算价含税"]:
if key not in node:
node[key] = 0
# 设备处理
if node_type == "5" or node_type == "设备":
supplier = str(node.get("供货方", ""))
if supplier == "1":
node["供货方"] = "甲供"
elif supplier == "2":
node["供货方"] = "乙供"
# 设备类型映射
if str(node.get("设备类型", "")) == "0":
node["设备类型"] = "普通设备"
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""将取费表中的'综合单价'替换为'合价'"""
try:
# 读取输出文件
with open(output_file, "r", encoding="utf-8") as f:
data = json.load(f)
# 递归查找并替换"综合单价"为"合价"
def replace_key(obj):
if isinstance(obj, dict):
new_obj = {}
for key, value in obj.items():
# 替换键名
new_key = "合价" if key == "综合单价" else key
# 递归处理值
new_obj[new_key] = replace_key(value)
return new_obj
elif isinstance(obj, list):
return [replace_key(item) for item in obj]
else:
return obj
# 替换所有出现的"综合单价"
new_data = replace_key(data)
# 写回文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(new_data, f, ensure_ascii=False, indent=2)
print(f"已将取费表中的'综合单价'替换为'合价'")
except Exception as e:
print(f"修改取费表时出错: {e}")
class MainGridBudgetCalculator(CalculatorBase):
"""主网预算计算器"""
def __init__(self):
super().__init__(MAIN_GRID_BUDGET)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建主网预算计算策略"""
return MainGridBudgetCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用主网预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理")
# 创建预处理函数
def preprocess_nodes_recursive(json_file_path):
try:
# 读取 JSON 文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取工程量数据
project_data = data.get("projectData", {})
quantity_data = project_data.get("quantity", {})
# 递归处理所有节点
def process_node(node):
# 调用计算策略的预处理方法
if isinstance(node, dict):
self.calculation_strategy.preprocess_quantity_fee_node(node)
# 处理子节点
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
process_node(child)
# 处理所有工程量节点
process_node(quantity_data)
# 写回 JSON 文件
with open(json_file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"已完成对 {json_file_path} 中所有节点的预处理")
except Exception as e:
print(f"预处理节点时出错: {e}")
# 不在这里调用预处理函数,因为我们还不知道 JSON 文件路径
# 这个函数会在 calculate_quantity_fee_tables 中调用
# preprocess_nodes_recursive(json_file_path)
# 只需实现一个空函数,返回预处理函数以便后续调用
return preprocess_nodes_recursive
def apply_resource_fee_rules(self) -> None:
"""应用主网预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用主网预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用主网预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class MainGridBillCalculator(CalculatorBase):
"""主网清单计算器"""
def __init__(self):
super().__init__(MAIN_GRID_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建主网清单计算策略"""
return MainGridBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用主网清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理")
# 创建预处理函数,这个函数会处理JSON文件中的节点
def preprocess_nodes_recursive(json_file_path):
try:
# 读取 JSON 文件
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 获取工程量数据
project_data = data.get("projectData", {})
quantity_data = project_data.get("quantity", {})
bill_data = project_data.get("bill", {})
print(f"开始预处理工程量和清单节点...")
# 递归处理所有节点
def process_node(node):
if isinstance(node, dict):
self.calculation_strategy.preprocess_quantity_fee_node(node)
if "children" in node and isinstance(node["children"], list):
for child in node["children"]:
process_node(child)
# 处理工程量节点
process_node(quantity_data)
# 也处理清单节点,因为对于清单工程,清单节点信息也很重要
process_node(bill_data)
# 写回文件
with open(json_file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"已完成对 {json_file_path} 中所有节点的预处理")
except Exception as e:
print(f"预处理节点时出错: {e}")
import traceback
traceback.print_exc()
# 返回预处理函数,供 calculate_quantity_fee_tables 调用
return preprocess_nodes_recursive
def apply_resource_fee_rules(self) -> None:
"""应用主网清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用主网清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用主网清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class DistributionBudgetCalculator(CalculatorBase):
"""配网预算计算器"""
def __init__(self):
super().__init__(DISTRIBUTION_BUDGET)
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建配网预算计算策略"""
return DistributionBudgetCalculationStrategy()
def apply_quantity_fee_rules(self) -> None:
"""应用配网预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用配网预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用配网预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用配网预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class DistributionBillCalculator(CalculatorBase):
"""配网清单计算器"""
def __init__(self):
super().__init__(DISTRIBUTION_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建配网清单计算策略"""
return DistributionBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用配网清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用配网清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用配网清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用配网清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class TechnicalRenovationBudgetCalculator(CalculatorBase):
"""技改预算计算器"""
def __init__(self):
super().__init__(TECHNICAL_RENOVATION_BUDGET)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建技改预算计算策略"""
return TechnicalRenovationBudgetCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用技改预算特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用技改预算特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用技改预算特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用技改预算特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
class TechnicalRenovationBillCalculator(CalculatorBase):
"""技改清单计算器"""
def __init__(self):
super().__init__(TECHNICAL_RENOVATION_BILL)
def create_calculation_strategy(self) -> CalculationStrategy:
"""创建技改清单计算策略"""
return TechnicalRenovationBillCalculationStrategy()
def set_output_dir(self, output_dir):
"""
设置计算结果的输出目录
:param output_dir: 输出目录的路径
"""
self.output_dir = output_dir
def apply_quantity_fee_rules(self) -> None:
"""应用技改清单特定的工程量取费规则"""
print(f"应用{self.software_type.name}特定的工程量取费规则")
def apply_resource_fee_rules(self) -> None:
"""应用技改清单特定的人材机合价规则"""
print(f"应用{self.software_type.name}特定的人材机合价规则")
def post_process_quantity_fees(self, output_file: str, project_name: str) -> None:
"""应用技改清单特定的工程量取费后处理规则"""
print(f"应用{self.software_type.name}特定的工程量取费后处理规则")
def post_process_resource_fees(self, output_file: str, project_name: str) -> None:
"""应用技改清单特定的人材机合价后处理规则"""
print(f"应用{self.software_type.name}特定的人材机合价后处理规则")
def get_calculator(software_type: SoftwareType) -> CalculatorBase:
"""
根据软件类型获取对应的计算器
Args:
software_type: 软件类型
Returns:
CalculatorBase: 计算器实例
"""
calculators = {
MAIN_GRID_BUDGET.name: MainGridBudgetCalculator(),
MAIN_GRID_BILL.name: MainGridBillCalculator(),
DISTRIBUTION_BUDGET.name: DistributionBudgetCalculator(),
DISTRIBUTION_BILL.name: DistributionBillCalculator(),
TECHNICAL_RENOVATION_BUDGET.name: TechnicalRenovationBudgetCalculator(),
TECHNICAL_RENOVATION_BILL.name: TechnicalRenovationBillCalculator(),
}
return calculators.get(software_type.name)