import json import os import math from typing import Dict, List, Any, Tuple from copy import deepcopy from equipment_calculation.item_acquisition import get_quantity_nodes, get_classified_resource_nodes, load_project_data from bcl_utils import ( ZjMaterialOrEquipmentBCLContext, ZjProjectBCLContext, ZjBillBCLContext, init_bcl_calculator, calculator, ) from item_acquisition import get_quantity_nodes, get_classified_resource_nodes # 人材机节点定义合并条件常量 LABOR_MERGE_CONDITIONS = [ "编码", "名称", "单位", "预算价含税", "预算价不含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "物料类材料", ] MATERIAL_MERGE_CONDITIONS = [ "类型", "编码", "名称", "单位", "预算价不含税", "预算价含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "集中配送", "卸车", "保管", "物料类材料", ] MACHINE_MERGE_CONDITIONS = [ "编码", "名称", "单位", "预算价不含税", "预算价含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "物料类材料", ] from bcl_utils import ( ZjMaterialOrEquipmentBCLContext, ZjProjectBCLContext, ZjBillBCLContext, init_bcl_calculator, calculator, ) def calc_rcj_count( rcj_nodes: List[Tuple[Dict[str, Any], str]], project_children: List[Dict[str, Any]], json_file_path: str = None ) -> List[Dict[str, Any]]: """ 计算人材机节点的数量,考虑父级消耗量 Args: rcj_nodes: 人材机节点列表,每个元素是(节点, 父级ID)元组 project_children: 项目划分级别下的所有工程量节点 json_file_path: JSON文件路径,用于获取工程信息 Returns: List[Dict[str, Any]]: 计算数量后的人材机节点列表 """ result_nodes = [] # 检查rcj_nodes是否为空 if not rcj_nodes: print("没有找到人材机节点") return result_nodes # 检查project_children是否为None if project_children is None: print("没有找到工程量节点,使用默认数量") # 使用默认数量处理所有人材机节点 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) node_copy["计算数量"] = node_copy.get("数量", "1.0") # 保存父级ID node_copy["parent_id"] = parent_id # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) return result_nodes # 创建父级ID到工程量节点的映射 parent_nodes = {} for node in project_children: if "id" in node: parent_nodes[node["id"]] = node print(f"找到 {len(parent_nodes)} 个父级工程量节点") # 初始化BCL计算器 if not init_bcl_calculator(): print("初始化BCL计算器失败,使用默认数量") # 使用默认数量处理所有人材机节点 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) node_copy["计算数量"] = node_copy.get("数量", "1.0") # 保存父级ID node_copy["parent_id"] = parent_id # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) return result_nodes # 创建工程信息上下文(顶层上下文) project_context = ZjProjectBCLContext(json_file_path=json_file_path) # 遍历所有节点,计算数量 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) # 保存父级ID node_copy["parent_id"] = parent_id # 获取父级工程量节点 parent_node = parent_nodes.get(parent_id) if not parent_node: print(f"未找到ID为 {parent_id} 的父级工程量节点,使用默认数量") # 使用默认数量 node_copy["计算数量"] = node_copy.get("数量", "1.0") # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) continue # 保存父级节点名称 parent_name = parent_node.get("项目名称", parent_node.get("name", "未命名工程量")) node_copy["parent_name"] = parent_name # 确保父级节点有数量字段 if "数量" not in parent_node or not parent_node["数量"]: print(f"父级节点 {parent_id} 没有数量字段,使用默认数量") parent_node["数量"] = "1.0" # 打印父级节点信息 print(f"父级节点 {parent_id} ({parent_name}) 的数量: {parent_node.get('数量', '1.0')}") # 创建定额上下文(中间层上下文) ration_context = ZjBillBCLContext(prefix="定额", valueDict=parent_node, prevContext=project_context) # 创建人材机上下文(底层上下文) context = ZjMaterialOrEquipmentBCLContext(node_data=node, parent_node=parent_node, prevContext=ration_context) # 根据节点类型选择不同的计算表达式 node_type = node.get("类型", "") if node_type == "人工": calc_expr = "_材机合并人工数量" elif node_type == "材料": calc_expr = "_材机合并材料数量" elif node_type == "机械": calc_expr = "_材机合并机械数量" else: # 未知类型,使用默认数量 print(f"未知节点类型 {node_type},使用默认数量") node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) continue try: # 使用BCL计算器计算数量 result = calculator.calculate(calc_expr, context) # 打印调试信息 print(f"计算 {node.get('名称', '未知节点')} 的数量,表达式: {calc_expr}, 结果: {result}") # 处理计算结果 if hasattr(result, "value"): calculated_quantity = result.value elif isinstance(result, (int, float)): calculated_quantity = result else: # 尝试转换为浮点数 try: calculated_quantity = float(result) except (ValueError, TypeError): print(f"无法将计算结果转换为浮点数: {result}") calculated_quantity = float(node_copy.get("数量", "1.0") or "1.0") # 如果计算结果为0,尝试使用原始数量 if calculated_quantity == 0: orig_quantity = float(node_copy.get("数量", "0.0") or "0.0") if orig_quantity > 0: print(f"计算结果为0,使用原始数量: {orig_quantity}") calculated_quantity = orig_quantity # 设置计算数量 node_copy["计算数量"] = str(calculated_quantity) except Exception as e: print(f"计算节点 '{node.get('名称', '未知节点')}' 的数量时出错: {e}") # 使用默认数量 node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) return result_nodes def generate_node_key(node: Dict[str, Any], conditions: List[str]) -> str: """ 根据合并条件生成节点的唯一键,增加parent_id作为条件 Args: node: 节点 conditions: 合并条件列表 Returns: str: 节点的唯一键 """ key_parts = [] # 添加parent_id作为合并条件的第一个元素 parent_id = node.get("parent_id", "") key_parts.append(f"parent_id:{parent_id}") # 添加其他合并条件 for condition in conditions: value = node.get(condition, "") # 确保值是字符串 if value is None: value = "" elif not isinstance(value, str): value = str(value) key_parts.append(f"{condition}:{value}") return "|".join(key_parts) def merge_similar_nodes(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 合并相似的节点 Args: nodes: 节点列表 Returns: List[Dict[str, Any]]: 合并后的节点列表 """ if not nodes: return [] # 按节点类型分组 nodes_by_type = {} for node in nodes: node_type = node.get("类型", "") if node_type not in nodes_by_type: nodes_by_type[node_type] = [] nodes_by_type[node_type].append(node) merged_nodes = [] # 处理每种类型的节点 for node_type, type_nodes in nodes_by_type.items(): # 选择合并条件 if node_type == "人工": conditions = LABOR_MERGE_CONDITIONS elif node_type == "材料": conditions = MATERIAL_MERGE_CONDITIONS elif node_type == "机械": conditions = MACHINE_MERGE_CONDITIONS else: # 未知类型,不合并 merged_nodes.extend(type_nodes) continue # 使用字典存储合并后的节点,键是根据合并条件生成的唯一键 merged_dict = {} for node in type_nodes: key = generate_node_key(node, conditions) if key in merged_dict: # 合并节点 merged_node = merged_dict[key] # 合并数量 merged_quantity = float(merged_node.get("取整数量", "0.0") or "0.0") node_quantity = float(node.get("取整数量", "0.0") or "0.0") merged_node["取整数量"] = str(merged_quantity + node_quantity) # 合并计算数量 merged_calc_quantity = float(merged_node.get("计算数量", "0.0") or "0.0") node_calc_quantity = float(node.get("计算数量", "0.0") or "0.0") merged_node["计算数量"] = str(merged_calc_quantity + node_calc_quantity) # 更新原始数量 merged_orig_quantity = float(merged_node.get("数量", "0.0") or "0.0") node_orig_quantity = float(node.get("数量", "0.0") or "0.0") merged_node["数量"] = str(merged_orig_quantity + node_orig_quantity) # 添加合并信息 if "合并来源" not in merged_node: # 使用唯一标识符,如果没有id字段则使用节点本身的索引 merged_node_id = merged_node.get("id", f"node_{id(merged_node)}") merged_node["合并来源"] = [merged_node_id] # 添加当前节点ID到合并来源 node_id = node.get("id", f"node_{id(node)}") merged_node["合并来源"].append(node_id) # 更新合并数量计数 merged_node["合并数量"] = str(len(merged_node["合并来源"])) else: # 创建新节点 merged_dict[key] = deepcopy(node) # 添加合并信息 node_id = node.get("id", f"node_{id(node)}") merged_dict[key]["合并来源"] = [node_id] merged_dict[key]["合并数量"] = "1" # 将合并后的节点添加到结果列表 merged_nodes.extend(merged_dict.values()) return merged_nodes def cat_rcj_count(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 处理人材机节点数量的变化并合并相似节点 Args: rcj_nodes: 人材机节点列表 Returns: List[Dict[str, Any]]: 处理数量变化后的人材机节点列表 """ # 使用计算数量,不进行向上取整 processed_nodes = [] for node in rcj_nodes: node_copy = deepcopy(node) # 获取计算数量,直接使用,不再向上取整 calc_quantity = float(node_copy.get("计算数量", "0.0") or "0.0") node_copy["取整数量"] = str(calc_quantity) processed_nodes.append(node_copy) # 合并相似节点 merged_nodes = merge_similar_nodes(processed_nodes) return merged_nodes def calc_rcj_fee(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 计算人材机节点的合价 Args: rcj_nodes: 人材机节点列表 Returns: List[Dict[str, Any]]: 计算合价后的人材机节点列表 """ result_nodes = [] for node in rcj_nodes: node_copy = deepcopy(node) # 获取取整数量 quantity = float(node_copy.get("取整数量", "0.0") or "0.0") # 获取预算价不含税和市场价不含税 budget_price = float(node_copy.get("预算价不含税", "0.0") or "0.0") market_price = float(node_copy.get("市场价不含税", "0.0") or "0.0") # 计算预算价合价和市场价合价 budget_total = quantity * budget_price market_total = quantity * market_price # 计算价差 price_diff = market_total - budget_total # 设置合价信息 node_copy["预算价合价"] = str(budget_total) node_copy["市场价合价"] = str(market_total) node_copy["价差"] = str(price_diff) result_nodes.append(node_copy) return result_nodes def format_rcj_output(rcj_nodes: List[Dict[str, Any]], node_type: str) -> List[Dict[str, Any]]: """ 格式化人材机节点输出,只保留需要的字段 Args: rcj_nodes: 人材机节点列表 node_type: 节点类型("人工"、"材料"、"机械") Returns: List[Dict[str, Any]]: 格式化后的节点列表 """ formatted_nodes = [] for node in rcj_nodes: formatted_node = {} # 保留父级工程量节点信息 formatted_node["parent_id"] = node.get("parent_id", "") formatted_node["parent_name"] = node.get("parent_name", "未知工程量节点") # 根据节点类型保留特定字段 if node_type == "人工": # 人工节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") elif node_type == "材料": # 材料节点保存:供货方,编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "供货方", "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") elif node_type == "机械": # 机械节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") # 重命名取整数量为数量 if "取整数量" in formatted_node: formatted_node["数量"] = formatted_node.pop("取整数量") formatted_nodes.append(formatted_node) return formatted_nodes def calculate_rcj_fees( json_file_path: str, project_name: str, adjustment_type: str = "拆除", engineering_type: str = "预算工程", project_guid: str = None, ) -> Dict[str, Any]: """ 计算项目划分节点下所有人材机节点的合价 Args: json_file_path: JSON文件路径 project_name: 项目名称 adjustment_type: 调差类型,默认为"拆除" engineering_type: 工程类型,默认为"预算工程" project_guid: 项目GUID,用于区分同名项目 Returns: Dict[str, Any]: 计算结果,按工程量节点分组 """ # 获取工程量节点,传递project_guid参数 project_children = get_quantity_nodes(json_file_path, project_name, adjustment_type, engineering_type, project_guid) # 如果没有找到项目划分节点 if project_children is None: print(f"警告: 未找到项目 '{project_name}' 的项目划分节点") return {} # 如果找到项目划分节点但没有工程量节点 if len(project_children) == 0: print(f"警告: 项目 '{project_name}' (GUID: {project_guid}) 没有工程量节点,跳过计算") return {} # 获取分类后的人材机节点,传递project_guid参数 labor_nodes, material_nodes, machine_nodes = get_classified_resource_nodes( json_file_path, project_name, adjustment_type, project_guid ) # 打印调试信息 print(f"找到 {len(labor_nodes)} 个人工节点, {len(material_nodes)} 个材料节点, {len(machine_nodes)} 个机械节点") # 计算人工节点的数量和合价 labor_with_count = calc_rcj_count(labor_nodes, project_children, json_file_path) labor_with_cat = cat_rcj_count(labor_with_count) labor_with_fee = calc_rcj_fee(labor_with_cat) labor_formatted = format_rcj_output(labor_with_fee, "人工") # 计算材料节点的数量和合价 material_with_count = calc_rcj_count(material_nodes, project_children, json_file_path) material_with_cat = cat_rcj_count(material_with_count) material_with_fee = calc_rcj_fee(material_with_cat) material_formatted = format_rcj_output(material_with_fee, "材料") # 计算机械节点的数量和合价 machine_with_count = calc_rcj_count(machine_nodes, project_children, json_file_path) machine_with_cat = cat_rcj_count(machine_with_count) machine_with_fee = calc_rcj_fee(machine_with_cat) machine_formatted = format_rcj_output(machine_with_fee, "机械") # 创建工程量节点ID到名称的映射 node_id_to_name = {} for node in project_children: if "id" in node: node_name = node.get("项目名称", node.get("name", f"工程量节点_{node['id']}")) node_id_to_name[node["id"]] = node_name # 按工程量节点分组结果 result_by_parent = {} # 处理人工节点 for node in labor_formatted: parent_id = node.pop("parent_id", "") parent_name = node.pop("parent_name", "未知工程量节点") # 使用父级节点名称作为键 parent_key = node_id_to_name.get(parent_id, parent_name) if parent_key not in result_by_parent: result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []} result_by_parent[parent_key]["人工节点"].append(node) # 处理材料节点 for node in material_formatted: parent_id = node.pop("parent_id", "") parent_name = node.pop("parent_name", "未知工程量节点") # 使用父级节点名称作为键 parent_key = node_id_to_name.get(parent_id, parent_name) if parent_key not in result_by_parent: result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []} result_by_parent[parent_key]["材料节点"].append(node) # 处理机械节点 for node in machine_formatted: parent_id = node.pop("parent_id", "") parent_name = node.pop("parent_name", "未知工程量节点") # 使用父级节点名称作为键 parent_key = node_id_to_name.get(parent_id, parent_name) if parent_key not in result_by_parent: result_by_parent[parent_key] = {"人工节点": [], "材料节点": [], "机械节点": []} result_by_parent[parent_key]["机械节点"].append(node) # 计算合价只用于调试显示 for parent_key, parent_data in result_by_parent.items(): labor_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["人工节点"]) material_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["材料节点"]) machine_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in parent_data["机械节点"]) print(f"工程量节点 '{parent_key}': 人工合价={labor_fee}, 材料合价={material_fee}, 机械合价={machine_fee}") return result_by_parent def calculate_resource_fees( json_file_path: str, project_name: str, adjustment_type: str, engineering_type: str, project_guid: str = None, calculation_strategy=None, ) -> str: """ 计算人材机合价 Args: json_file_path: JSON文件路径 project_name: 项目名称 adjustment_type: 调差类型 engineering_type: 工程类型 project_guid: 项目GUID,用于区分同名项目 calculation_strategy: 计算策略,如果为None则使用默认策略 Returns: str: 输出文件路径,如果没有工程量节点则返回None """ # 如果没有提供计算策略,使用默认策略 if calculation_strategy is None: from calculation_strategy import DefaultCalculationStrategy calculation_strategy = DefaultCalculationStrategy() # 获取项目划分节点的GUID _, _, _, _, target_node = load_project_data(json_file_path, project_name, project_guid) # 如果传入了GUID但与节点的GUID不一致,优先使用传入的GUID node_guid = target_node.get("GUID") or target_node.get("guid", "") if target_node else "" project_guid = project_guid if project_guid else node_guid # 将文件名中的非法字符替换为下划线 safe_project_name = ( project_name.replace("/", "_") .replace("\\", "_") .replace(":", "_") .replace("*", "_") .replace("?", "_") .replace('"', "_") .replace("<", "_") .replace(">", "_") .replace("|", "_") ) # 将GUID中的非法字符替换为下划线 safe_project_guid = "" if project_guid: safe_project_guid = ( project_guid.replace("/", "_") .replace("\\", "_") .replace(":", "_") .replace("*", "_") .replace("?", "_") .replace('"', "_") .replace("<", "_") .replace(">", "_") .replace("|", "_") .replace("{", "") .replace("}", "") ) # 添加下划线作为分隔符 safe_project_guid = f"_{safe_project_guid}" # 设置输出目录和文件名 output_dir = "计算结果" os.makedirs(output_dir, exist_ok=True) rcj_output_file = os.path.join( output_dir, f"{safe_project_name}{safe_project_guid}_{adjustment_type}_rcj_fees.json" ) # 计算人材机节点的合价,传递project_guid参数 # 这里使用 calculation_strategy 的 calculate_rcj_fees 方法 if hasattr(calculation_strategy, "calculate_rcj_fees"): rcj_results = calculation_strategy.calculate_rcj_fees( json_file_path, project_name, adjustment_type, engineering_type, project_guid ) else: # 如果计算策略没有实现 calculate_rcj_fees 方法,使用原始函数 rcj_results = calculate_rcj_fees(json_file_path, project_name, adjustment_type, engineering_type, project_guid) # 检查是否有人材机数据 - 适应新的按工程量节点分组的结构 has_data = False # 如果是旧结构(直接包含人工节点、材料节点、机械节点) if any(key in rcj_results for key in ["人工节点", "材料节点", "机械节点"]): for node_type in ["人工节点", "材料节点", "机械节点"]: if rcj_results.get(node_type) and len(rcj_results[node_type]) > 0: has_data = True break # 如果是新结构(按工程量节点分组) else: for parent_name, parent_data in rcj_results.items(): for node_type in ["人工节点", "材料节点", "机械节点"]: if parent_data.get(node_type) and len(parent_data[node_type]) > 0: has_data = True break if has_data: break if not has_data: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有人材机数据,不保存结果") return None # 保存人材机合价计算结果到JSON文件 with open(rcj_output_file, "w", encoding="utf-8") as f: json.dump(rcj_results, f, ensure_ascii=False, indent=2) print(f"\n人材机合价计算结果已保存到 {rcj_output_file}") return rcj_output_file