import json import os import math from typing import Dict, List, Any, Tuple from copy import deepcopy from memory_profiler import profile from equipment_calculation.item_acquisition import get_quantity_nodes, get_classified_resource_nodes, load_project_data # 修改后的代码:先将字典转换为对象 from equipment_calculation.bcl_utils import create_node_from_type, create_material_or_equipment_from_node from equipment_calculation.bcl_utils import ( ZjMaterialOrEquipmentBCLContext, ZjProjectBCLContext, BCLVariant, ZjBillBCLContext, init_bcl_calculator, create_project_contexts, calculator, ) from equipment_calculation.bcl_utils import BCLDataSourceItem, BCLDataSourceContext from equipment_calculation.item_acquisition import get_quantity_nodes, get_classified_resource_nodes # 人材机节点定义合并条件常量 LABOR_MERGE_CONDITIONS = [ "编码", "名称", "单位", "预算价含税", "预算价不含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "物料类材料", ] MATERIAL_MERGE_CONDITIONS = [ "类型", "编码", "名称", "单位", "预算价不含税", "预算价含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "集中配送", "卸车", "保管", "物料类材料", ] MACHINE_MERGE_CONDITIONS = [ "编码", "名称", "单位", "预算价不含税", "预算价含税", "市场价不含税", "市场价含税", "调差类型", "专业属性", "供货方", "物料类材料", ] def process_DXdata(json_data): """ 处理 projectData 中的线路特征段数据,计算每条 bpBillZhXsTable 记录的加权值, 并返回一个字典列表,每个字典包含 '特征段' 和对应的 ItemName: 计算结果。 :param json_data: 解析后的 JSON 数据(字典格式) :return: 字典列表,例如 [{"特征段": "1", "工地运输...": "21.8"}, ...] """ result_list = [] # 遍历每个线路特征段 for segment in json_data.get("projectData", {}).get("线路特征段", []): seg_name = segment.get("Name", "") # 提取特征段中的数字,例如 "特征段1" -> "1" seg_number = "".join(filter(str.isdigit, seg_name)) if not seg_number: seg_number = "0" # 默认值 # 获取 bpBillZhXsTable 列表 table_list = segment.get("bpBillZhXsTable", []) for item in table_list: try: # 提取字段并转换为浮点数 KnapScale = float(item.get("KnapScale", 0)) Knap = float(item.get("Knap", 0)) HillScale = float(item.get("HillScale", 0)) Hill = float(item.get("Hill", 0)) EdelweissScale = float(item.get("EdelweissScale", 0)) edelweiss = float(item.get("edelweiss", 0)) MountainScale = float(item.get("MountainScale", 0)) Mountain = float(item.get("Mountain", 0)) SloughScale = float(item.get("SloughScale", 0)) Slough = float(item.get("Slough", 0)) RiverScale = float(item.get("RiverScale", 0)) River = float(item.get("River", 0)) DesertScale = float(item.get("DesertScale", 0)) Desert = float(item.get("Desert", 0)) # 执行计算 total = ( (KnapScale * Knap) + (HillScale * Hill) + (EdelweissScale * edelweiss) + (MountainScale * Mountain) + (SloughScale * Slough) + (RiverScale * River) + (DesertScale * Desert) ) * 0.01 # 保留一位小数,格式化为字符串 calculated_value = f"{total:.1f}" # 获取 ItemName item_name = item.get("ItemName", "未知项目") # 构建结果字典 item_dict = {"特征段": seg_number, item_name: calculated_value} # 添加到结果列表 result_list.append(item_dict) except (ValueError, TypeError) as e: print(f"数据转换错误,跳过该项: {e}") continue return result_list # 在 resource_fee_calculator.py 文件中修改 calc_rcj_count 函数中的相关代码 def calc_rcj_count( rcj_nodes: List[Tuple[Dict[str, Any], str]], project_children: List[Dict[str, Any]], json_file_path: str = None, json_data: dict | None = None, ) -> List[Dict[str, Any]]: """ 计算人材机节点的数量,考虑父级消耗量 Args: rcj_nodes: 人材机节点列表,每个元素是(节点, 父级ID)元组 project_children: 项目划分级别下的所有工程量节点 json_file_path: JSON文件路径,用于获取工程信息 Returns: List[Dict[str, Any]]: 计算数量后的人材机节点列表 """ result_nodes = [] # 检查rcj_nodes是否为空 if not rcj_nodes: print("没有找到人材机节点") return result_nodes # 检查project_children是否为None if project_children is None: print("没有找到工程量节点,使用默认数量") # 使用默认数量处理所有人材机节点 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) node_copy["计算数量"] = node_copy.get("数量", "1.0") # 保存父级ID node_copy["parent_id"] = parent_id # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) return result_nodes # 创建父级ID到工程量节点的映射 parent_nodes = {} for node in project_children: # 如果节点没有id,使用节点的名称作为id node_id = node.get("id") if not node_id: # 使用节点名称或其他唯一标识符作为ID node_id = node.get("项目名称", node.get("name", "")) # 如果还是没有,使用内存地址作为唯一标识符 if not node_id: node_id = f"node_{id(node)}" # 将生成的ID添加到节点中 node["id"] = node_id print(f"为工程量节点 '{node.get('项目名称', node.get('name', '未命名'))}' 分配ID: {node_id}") parent_nodes[node_id] = node node_name = node.get("项目名称", node.get("name", "未命名")) print(f"工程量节点: ID={node_id}, 名称={node_name}") print(f"找到 {len(parent_nodes)} 个父级工程量节点") # 如果只有一个工程量节点,为所有没有父级ID的人材机节点分配这个工程量节点作为父级 default_parent_node = None default_parent_id = None if len(parent_nodes) == 1: default_parent_id = list(parent_nodes.keys())[0] default_parent_node = list(parent_nodes.values())[0] print( f"只有一个工程量节点,将用作默认父级: ID={default_parent_id}, 名称={default_parent_node.get('项目名称', default_parent_node.get('name', '未命名'))}" ) # 添加调试信息,打印人材机节点的父级ID for i, (node, parent_id) in enumerate(rcj_nodes): node_name = node.get("名称", "未命名") # 如果人材机节点没有父级ID且有默认父级,分配默认父级 if parent_id is None and default_parent_id is not None: rcj_nodes[i] = (node, default_parent_id) parent_id = default_parent_id print(f"人材机节点: 名称={node_name}, 分配默认父级ID={parent_id}") else: print(f"人材机节点: 名称={node_name}, 父级ID={parent_id}") # 初始化BCL计算器 if not init_bcl_calculator(): print("初始化BCL计算器失败,使用默认数量") # 使用默认数量处理所有人材机节点 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) node_copy["计算数量"] = node_copy.get("数量", "1.0") # 保存父级ID node_copy["parent_id"] = parent_id # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) return result_nodes # 创建工程信息上下文(顶层上下文,优先使用 json_data) project_context = create_project_contexts(json_file_path=json_file_path, json_data=json_data) # 遍历所有节点,计算数量 for node, parent_id in rcj_nodes: node_copy = deepcopy(node) # 保存父级ID node_copy["parent_id"] = parent_id # 获取父级工程量节点 parent_node = parent_nodes.get(parent_id) if not parent_node: print(f"未找到ID为 {parent_id} 的父级工程量节点,使用默认数量") # 使用默认数量 node_copy["计算数量"] = node_copy.get("数量", "1.0") # 由于没有找到父级节点,使用默认名称 node_copy["parent_name"] = "未知工程量节点" result_nodes.append(node_copy) continue # 保存父级节点名称 parent_name = parent_node.get("项目名称", parent_node.get("name", "未命名工程量")) node_copy["parent_name"] = parent_name # 确保父级节点有数量字段 if "数量" not in parent_node or not parent_node["数量"]: print(f"父级节点 {parent_id} 没有数量字段,使用默认数量") parent_node["数量"] = "1.0" # 打印父级节点信息 print(f"父级节点 {parent_id} ({parent_name}) 的数量: {parent_node.get('数量', '1.0')}") # 原始代码 # ration_context = ZjBillBCLContext(prefix="定额", valueDict=parent_node, prevContext=project_context) # context = ZjMaterialOrEquipmentBCLContext(node_data=node, parent_node=parent_node, prevContext=ration_context) # 将父级节点转换为对象 parent_obj = create_node_from_type(parent_node) # 将人材机节点转换为对象 node_obj = create_material_or_equipment_from_node(node) # 使用对象创建数据源项 QuantityItem = BCLDataSourceItem(parent_obj) moeItem = BCLDataSourceItem(node_obj) context = BCLDataSourceContext([QuantityItem, moeItem], project_context) # 根据节点类型选择不同的计算表达式 node_type = node.get("类型", node.get("type", "")) if node_type == "人工" or node_type == "2": calc_expr = "_材机合并人工数量" elif node_type == "材料" or node_type == "3": calc_expr = "_材机合并材料数量" elif node_type == "机械" or node_type == "4": calc_expr = "_材机合并机械数量" else: # 未知类型,使用默认数量 print(f"未知节点类型 {node_type},使用默认数量") node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) continue try: # 使用BCL计算器计算数量 result = calculator.calculate(calc_expr, context) # 打印调试信息 print(f"计算 {node.get('名称', '未知节点')} 的数量,表达式: {calc_expr}, 结果: {result}") # 处理计算结果 if hasattr(result, "value"): calculated_quantity = result.value elif isinstance(result, (int, float)): calculated_quantity = result else: # 尝试转换为浮点数 try: calculated_quantity = float(result) except (ValueError, TypeError): print(f"无法将计算结果转换为浮点数: {result}") calculated_quantity = float(node_copy.get("数量", "1.0") or "1.0") # 如果计算结果为0,尝试使用原始数量 if calculated_quantity == 0: orig_quantity = float(node_copy.get("数量", "0.0") or "0.0") if orig_quantity > 0: print(f"计算结果为0") calculated_quantity = 0 # 设置计算数量 node_copy["计算数量"] = str(calculated_quantity) except Exception as e: print(f"计算节点 '{node.get('名称', '未知节点')}' 的数量时出错: {e}") # 使用默认数量 node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) return result_nodes def generate_node_key(node: Dict[str, Any], conditions: List[str]) -> str: """ 根据合并条件生成节点的唯一键,不再使用parent_id作为条件 """ key_parts = [] # 添加合并条件 for condition in conditions: # 排除与父节点相关的字段 if condition in ["parent_id", "parent_name", "id", "合并来源", "合并数量", "计算数量", "取整数量"]: print(f" 排除字段: {condition}") continue value = node.get(condition, "") # 确保值是字符串并进行标准化处理 if value is None: value = "" print(f" 字段 {condition} 的值为None,设置为空字符串") elif isinstance(value, (int, float)): # 对数值进行标准化处理,保留固定小数位 value = f"{float(value):.4f}" print(f" 字段 {condition} 的值是数值,标准化为: {value}") elif isinstance(value, str): # 对字符串进行标准化处理,去除空格 original = value value = value.strip() if original != value: print(f" 字段 {condition} 的值是字符串,去除空格: '{original}' -> '{value}'") else: # 其他类型转换为字符串 original = value value = str(value).strip() print(f" 字段 {condition} 的值是其他类型,转换为字符串: {type(original)} -> '{value}'") key_parts.append(f"{condition}:{value}") result = "|".join(key_parts) return result # 测试代码************************** def debug_node_differences(node1, node2, conditions): """比较两个节点在合并条件上的差异""" print(f"\n比较节点: {node1.get('名称', '未知')} 和 {node2.get('名称', '未知')}") for condition in conditions: val1 = node1.get(condition, "") val2 = node2.get(condition, "") if val1 != val2: print(f" 条件 '{condition}' 不同: '{val1}' != '{val2}'") # 进一步分析差异类型 if isinstance(val1, str) and isinstance(val2, str): if val1.strip() == val2.strip(): print(f" 差异仅在空格") elif val1.lower() == val2.lower(): print(f" 差异仅在大小写") # ************************** def merge_similar_nodes(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 合并相似的节点 """ print(f"\n进入merge_similar_nodes函数,节点数量: {len(nodes)}") if not nodes: return [] # 按节点类型分组 nodes_by_type = {} for node in nodes: node_type = node.get("类型", "") if node_type not in nodes_by_type: nodes_by_type[node_type] = [] nodes_by_type[node_type].append(node) print(f"节点类型分组: {list(nodes_by_type.keys())}") merged_nodes = [] # 处理每种类型的节点 for node_type, type_nodes in nodes_by_type.items(): print(f"\n处理节点类型: {node_type}, 数量: {len(type_nodes)}") # 选择合并条件 if node_type == "人工" or node_type == "2": conditions = LABOR_MERGE_CONDITIONS elif node_type == "材料" or node_type == "3": conditions = MATERIAL_MERGE_CONDITIONS elif node_type == "机械" or node_type == "4": conditions = MACHINE_MERGE_CONDITIONS else: # 未知类型,不合并 print(f"未知节点类型: {node_type}, 不合并") merged_nodes.extend(type_nodes) continue print(f"使用合并条件: {conditions}") # 检查节点是否有所有合并条件字段 for i, node in enumerate(type_nodes): print(f"\n节点 {i+1}: {node.get('名称')}") missing_fields = [] for condition in conditions: if condition not in node: missing_fields.append(condition) if missing_fields: print(f" 缺少字段: {missing_fields}") # 打印节点的所有字段 print(" 节点字段:") for key, value in node.items(): print(f" {key}: {value}") # 使用字典存储合并后的节点,键是根据合并条件生成的唯一键 merged_dict = {} # 检查是否有parent_id或parent_name字段 has_parent_fields = False for node in type_nodes: if "parent_id" in node or "parent_name" in node: has_parent_fields = True print( f"发现parent字段: parent_id={node.get('parent_id', '无')}, parent_name={node.get('parent_name', '无')}" ) break if has_parent_fields: print("警告: 节点包含parent字段,这可能导致合并失败。考虑从合并键中排除这些字段。") for i, node in enumerate(type_nodes): print(f"\n处理节点 {i+1}: {node.get('名称')}") # 生成唯一键前,打印节点的合并条件字段 print(" 合并条件字段值:") for condition in conditions: value = node.get(condition, "") print(f" {condition}: '{value}' (类型: {type(value)})") key = generate_node_key(node, conditions) print(f" 生成的唯一键: {key}") # 检查键是否在字典中 if key in merged_dict: print(f" 键已存在,合并节点") # 合并节点 merged_node = merged_dict[key] # 打印合并前的数量 merged_quantity = float(merged_node.get("取整数量", "0.0") or "0.0") node_quantity = float(node.get("取整数量", "0.0") or "0.0") print(f" 合并数量: {merged_quantity} + {node_quantity} = {merged_quantity + node_quantity}") # 合并数量 merged_node["取整数量"] = str(merged_quantity + node_quantity) # 合并计算数量 merged_calc_quantity = float(merged_node.get("计算数量", "0.0") or "0.0") node_calc_quantity = float(node.get("计算数量", "0.0") or "0.0") merged_node["计算数量"] = str(merged_calc_quantity + node_calc_quantity) # 更新原始数量 merged_orig_quantity = float(merged_node.get("数量", "0.0") or "0.0") node_orig_quantity = float(node.get("数量", "0.0") or "0.0") merged_node["数量"] = str(merged_orig_quantity + node_orig_quantity) # 添加合并信息 if "合并来源" not in merged_node: # 使用唯一标识符,如果没有id字段则使用节点本身的索引 merged_node_id = merged_node.get("id", f"node_{id(merged_node)}") merged_node["合并来源"] = [merged_node_id] # 添加当前节点ID到合并来源 node_id = node.get("id", f"node_{id(node)}") merged_node["合并来源"].append(node_id) # 更新合并数量计数 merged_node["合并数量"] = str(len(merged_node["合并来源"])) else: print(f" 键不存在,创建新节点") # 创建新节点 merged_dict[key] = deepcopy(node) # 添加合并信息 node_id = node.get("id", f"node_{id(node)}") merged_dict[key]["合并来源"] = [node_id] merged_dict[key]["合并数量"] = "1" # 检查合并结果 print(f"\n合并后节点数量: {len(merged_dict)}") for key, node in merged_dict.items(): print(f" 键: {key}") print(f" 节点: {node.get('名称')}, 取整数量: {node.get('取整数量')}") if "合并来源" in node and len(node["合并来源"]) > 1: print(f" 合并来源数量: {len(node['合并来源'])}") # 将合并后的节点添加到结果列表 merged_nodes.extend(merged_dict.values()) return merged_nodes def cat_rcj_count(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 处理人材机节点数量的变化并合并相似节点 """ print(f"进入cat_rcj_count函数,节点数量: {len(rcj_nodes)}") for node in rcj_nodes: print(f"节点: {node.get('名称')}, 计算数量: {node.get('计算数量')}") # 使用计算数量,不进行向上取整 processed_nodes = [] for node in rcj_nodes: node_copy = deepcopy(node) # 获取计算数量,直接使用,不再向上取整 calc_quantity = float(node_copy.get("计算数量", "0.0") or "0.0") node_copy["取整数量"] = str(calc_quantity) processed_nodes.append(node_copy) print(f"处理后节点数量: {len(processed_nodes)}") # 合并相似节点 merged_nodes = merge_similar_nodes(processed_nodes) print(f"合并后节点数量: {len(merged_nodes)}") for node in merged_nodes: print(f"合并后节点: {node.get('名称')}, 取整数量: {node.get('取整数量')}") return merged_nodes def calc_rcj_fee(rcj_nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 计算人材机节点的合价 Args: rcj_nodes: 人材机节点列表 Returns: List[Dict[str, Any]]: 计算合价后的人材机节点列表 """ result_nodes = [] for node in rcj_nodes: node_copy = deepcopy(node) # 获取取整数量 quantity = float(node_copy.get("取整数量", "0.0") or "0.0") # 获取预算价不含税和市场价不含税 budget_price = float(node_copy.get("预算价不含税", "0.0") or "0.0") market_price = float(node_copy.get("市场价不含税", "0.0") or "0.0") # 计算预算价合价和市场价合价 budget_total = quantity * budget_price market_total = quantity * market_price # 计算价差 price_diff = market_total - budget_total # 设置合价信息 node_copy["预算价合价"] = str(budget_total) node_copy["市场价合价"] = str(market_total) node_copy["价差"] = str(price_diff) result_nodes.append(node_copy) return result_nodes def format_rcj_output(rcj_nodes: List[Dict[str, Any]], node_type: str) -> List[Dict[str, Any]]: """ 格式化人材机节点输出,只保留需要的字段,不再保留父级节点信息 Args: rcj_nodes: 人材机节点列表 node_type: 节点类型("人工"、"材料"、"机械") Returns: List[Dict[str, Any]]: 格式化后的节点列表 """ formatted_nodes = [] for node in rcj_nodes: formatted_node = {} # 根据节点类型保留特定字段 if node_type == "人工": # 人工节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") elif node_type == "材料": # 材料节点保存:供货方,编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "供货方", "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") elif node_type == "机械": # 机械节点保存:编码,名称,单位,数量,预算价不含税,市场价不含税,预算价合价,市场价合价,价差 fields = [ "编码", "名称", "单位", "取整数量", "预算价不含税", "市场价不含税", "预算价合价", "市场价合价", "价差", ] for field in fields: formatted_node[field] = node.get(field, "") # 重命名取整数量为数量 if "取整数量" in formatted_node: formatted_node["数量"] = formatted_node.pop("取整数量") formatted_nodes.append(formatted_node) return formatted_nodes def calculate_rcj_fees( json_file_path: str, project_name: str, project_guid: str = None, json_data: dict | None = None, ) -> Dict[str, Any]: """ 计算项目划分节点下所有人材机节点的合价 Args: json_file_path: JSON文件路径 project_name: 项目名称 project_guid: 项目GUID,用于区分同名项目 Returns: Dict[str, Any]: 计算结果,包含人工节点、材料节点和机械节点 """ # 加载项目数据 data, project_data, cost_setting, project_division, target_node = load_project_data( json_file_path, project_name, project_guid, json_data=json_data ) # 如果没有找到项目划分节点 if not target_node: print(f"警告: 未找到项目 '{project_name}' 的项目划分节点") return {} # 获取项目子节点 project_children = target_node.get("children", []) if not project_children: print(f"警告: 项目 '{project_name}' (GUID: {project_guid}) 没有工程量节点,跳过计算") return {} # 直接从工程量节点中提取人材机节点,保持父子关系 labor_nodes = [] # 人工节点 (节点, 父级节点) 元组列表 material_nodes = [] # 材料节点 (节点, 父级节点) 元组列表 machine_nodes = [] # 机械节点 (节点, 父级节点) 元组列表 # 工具:类型判断与资源拆分上提 def _is_resource_type(t): return t in ("人工", "材料", "机械", "2", "3", "4") def _normalize_type(t): return {"2": "人工", "3": "材料", "4": "机械"}.get(t, t) def _flatten_resource_splits(n, inherited_type=None, qty_factor: float = 1.0): """ 将可能带 children 的人材机节点递归拆平: - 若是人材机且有children:不计入父,children 继承本节点类型继续拆分; - 若是人材机且无children:作为叶子返回; - 若非人材机:递归其children以发现资源叶子。 返回叶子资源节点列表(dict)。 """ results = [] if not isinstance(n, dict): return results n_type = _normalize_type(n.get("类型") or n.get("type")) children = n.get("children") or [] if _is_resource_type(n_type): # 当前节点是人材机 current_type = _normalize_type(inherited_type or n_type) # 当前资源节点自身数量,默认1 try: self_qty = float((n.get("数量") or "1").strip()) except Exception: self_qty = 1.0 new_factor = qty_factor * self_qty if children: for ch in children: results.extend(_flatten_resource_splits(ch, current_type, new_factor)) else: leaf = deepcopy(n) leaf["类型"] = current_type # 叶子数量 = 父链乘积 * 自身(若叶子也有数量则已计入 new_factor) try: # 如果叶子本身还有数量字段,已包含在 new_factor 中,这里直接覆盖为乘积 leaf["数量"] = str(new_factor) except Exception: leaf["数量"] = str(new_factor) results.append(leaf) else: # 非资源,继续向下查找 for ch in children: results.extend(_flatten_resource_splits(ch, inherited_type, qty_factor)) return results # 递归处理工程量节点,提取人材机节点(对children内的人材机执行拆分上提) def process_nodes(nodes): for node in nodes: # 如果是定额节点,处理其材机列表和children if node.get("类型") == "定额" or node.get("类型") == "0": print(f"找到定额节点: {node.get('项目名称', node.get('name', '未命名'))}") # 处理材机列表 if "材机列表" in node and node["材机列表"]: print( f"定额节点 '{node.get('项目名称', node.get('name', '未命名'))}' 有材机列表,数量: {len(node['材机列表'])}" ) for rcj_node in node["材机列表"]: node_type = rcj_node.get("类型", rcj_node.get("type")) # 同时支持数字类型和字符串类型 if node_type in ["人工", "2"]: labor_nodes.append((rcj_node, node)) print( f"从材机列表找到人工节点: {rcj_node.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) elif node_type in ["材料", "3"]: material_nodes.append((rcj_node, node)) print( f"从材机列表找到材料节点: {rcj_node.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) elif node_type in ["机械", "4"]: machine_nodes.append((rcj_node, node)) print( f"从材机列表找到机械节点: {rcj_node.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) else: print(f"定额节点 '{node.get('项目名称', node.get('name', '未命名'))}' 没有材机列表") # 处理children中的人材机节点 if "children" in node and node["children"]: print( f"定额节点 '{node.get('项目名称', node.get('name', '未命名'))}' 有子节点,数量: {len(node['children'])}" ) for child in node["children"]: # 将children中的资源节点拆分为叶子资源,排除中间父级 flattened = _flatten_resource_splits(child) for leaf in flattened: leaf_type = _normalize_type(leaf.get("类型") or leaf.get("type")) if leaf_type == "人工": labor_nodes.append((leaf, node)) print( f"从children(拆分后)找到人工节点: {leaf.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) elif leaf_type == "材料": material_nodes.append((leaf, node)) print( f"从children(拆分后)找到材料节点: {leaf.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) elif leaf_type == "机械": machine_nodes.append((leaf, node)) print( f"从children(拆分后)找到机械节点: {leaf.get('名称', '未命名')}, 父节点: {node.get('项目名称', node.get('name', '未命名'))}" ) else: print(f"定额节点 '{node.get('项目名称', node.get('name', '未命名'))}' 没有子节点") # 递归处理子节点 if "children" in node and node["children"]: process_nodes(node["children"]) # 处理所有工程量节点 process_nodes(project_children) print(f"找到 {len(labor_nodes)} 个人工节点, {len(material_nodes)} 个材料节点, {len(machine_nodes)} 个机械节点") # 如果仍然没有找到人材机节点,返回空结果 if not labor_nodes and not material_nodes and not machine_nodes: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有人材机数据,不保存结果") return {} # 修改calc_rcj_count函数调用,直接传递父级节点对象而不是ID def calc_rcj_count_with_parent(rcj_nodes): """ 计算人材机节点的数量,考虑父级消耗量 Args: rcj_nodes: 人材机节点列表,每个元素是(节点, 父级节点)元组 Returns: List[Dict[str, Any]]: 计算数量后的人材机节点列表 """ result_nodes = [] # 检查rcj_nodes是否为空 if not rcj_nodes: print("没有找到人材机节点") return result_nodes # # 初始化BCL计算器 # if not init_bcl_calculator(): # print("初始化BCL计算器失败,使用默认数量") # # 使用默认数量处理所有人材机节点 # for node, parent_node in rcj_nodes: # node_copy = deepcopy(node) # node_copy["计算数量"] = node_copy.get("数量", "1.0") # # 保存父级信息 # node_copy["parent_id"] = parent_node.get("id", "") # node_copy["parent_name"] = parent_node.get("项目名称", parent_node.get("name", "未命名工程量")) # result_nodes.append(node_copy) # return result_nodes # 创建工程信息上下文(顶层上下文,优先使用 json_data) project_context = create_project_contexts(json_file_path=json_file_path, json_data=json_data) if json_data is not None: data = json_data else: with open(json_file_path, "r", encoding="utf-8") as file: data = json.load(file) processed_data = process_DXdata(data) DXITEM = [BCLDataSourceItem(item) for item in processed_data] # 遍历所有节点,计算数量 for node, parent_node in rcj_nodes: node_copy = deepcopy(node) # 保存父级信息 parent_id = parent_node.get("id", "") parent_name = parent_node.get("项目名称", parent_node.get("name", "未命名工程量")) node_copy["parent_id"] = parent_id node_copy["parent_name"] = parent_name # 确保父级节点有数量字段 if "数量" not in parent_node or not parent_node["数量"]: print(f"父级节点 {parent_name} 没有数量字段,使用默认数量") parent_node["数量"] = "1.0" # 打印父级节点信息 print(f"父级节点 {parent_name} 的数量: {parent_node.get('数量', '1.0')}") # 确保父级节点的类型字段正确设置 if "类型" in parent_node: parent_type = parent_node["类型"] print(f"父级节点 {parent_name} 的类型: {parent_type}") else: # 如果父级节点没有类型字段,默认设置为"定额" parent_node["类型"] = "定额" print(f"父级节点 {parent_name} 没有类型字段,设置为默认类型: 定额") # 处理 parent_node 的 "特征段" 字段 import re segment_value = parent_node.get("特征段", "") if isinstance(segment_value, str): match = re.search(r"\d+", segment_value) # 提取第一个连续数字 if match: parent_node["特征段"] = match.group() # 例如 "特征1" -> "1" else: parent_node["特征段"] = "" # 无法提取数字则清空或设为默认值 # 将父级节点转换为对象 parent_obj = create_node_from_type(parent_node) # 确保父级对象的type属性正确设置 if hasattr(parent_obj, "type") and not parent_obj.type: parent_obj.type = parent_node.get("类型", "定额") print(f"设置父级对象 {parent_name} 的type属性为: {parent_obj.type}") # 将人材机节点转换为对象 node_obj = create_material_or_equipment_from_node(node) dxitem_context = BCLDataSourceContext(DXITEM, project_context) dxitem_context.variables["@特征段地形系数"] = BCLVariant(DXITEM) # 使用对象创建数据源项 QuantityItem = BCLDataSourceItem(parent_obj, None, "定额") moeItem = BCLDataSourceItem(node_obj, QuantityItem) if hasattr(parent_obj, "type"): print(f"父级对象type属性值: {parent_obj.type}") # 创建数据源上下文 context = BCLDataSourceContext([moeItem], dxitem_context) # 根据节点类型选择不同的计算表达式 node_type = node.get("类型", node.get("type", "")) if node_type in ["人工", "2"]: calc_expr = "_材机合并人工数量" elif node_type in ["材料", "3"]: calc_expr = "_材机合并材料数量" elif node_type in ["机械", "4"]: calc_expr = "_材机合并机械数量" else: # 未知类型,使用默认数量 print(f"未知节点类型 {node_type},使用默认数量") node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) continue try: # 使用BCL计算器计算数量 result = calculator.calculate(calc_expr, context) # 打印调试信息 print(f"计算 {node.get('名称', '未知节点')} 的数量,表达式: {calc_expr}, 结果: {result}") # 处理计算结果 if hasattr(result, "value"): calculated_quantity = result.value elif isinstance(result, (int, float)): calculated_quantity = result else: # 尝试转换为浮点数 try: calculated_quantity = float(result) except (ValueError, TypeError): print(f"无法将计算结果转换为浮点数: {result}") calculated_quantity = float(node_copy.get("数量", "1.0") or "1.0") # 如果计算结果为0,使用原始数量 if calculated_quantity == 0: orig_quantity = float(node_copy.get("数量", "0.0") or "0.0") if orig_quantity > 0: print(f"计算结果为0,使用原始数量: {orig_quantity}") calculated_quantity = orig_quantity # 设置计算数量 node_copy["计算数量"] = str(calculated_quantity) except Exception as e: print(f"计算节点 '{node.get('名称', '未知节点')}' 的数量时出错: {e}") # 使用默认数量 node_copy["计算数量"] = node_copy.get("数量", "1.0") result_nodes.append(node_copy) return result_nodes # 计算人工节点的数量和合价 labor_with_count = calc_rcj_count_with_parent(labor_nodes) print(f"calc_rcj_count_with_parent 返回节点数量: {len(labor_with_count)}") labor_with_cat = cat_rcj_count(labor_with_count) print(f"cat_rcj_count 返回节点数量: {len(labor_with_cat)}") labor_with_fee = calc_rcj_fee(labor_with_cat) print(f"calc_rcj_fee 返回节点数量: {len(labor_with_fee)}") labor_formatted = format_rcj_output(labor_with_fee, "人工") print(f"format_rcj_output 返回节点数量: {len(labor_formatted)}") # 计算材料节点的数量和合价 material_with_count = calc_rcj_count_with_parent(material_nodes) material_with_cat = cat_rcj_count(material_with_count) material_with_fee = calc_rcj_fee(material_with_cat) material_formatted = format_rcj_output(material_with_fee, "材料") # 计算机械节点的数量和合价 machine_with_count = calc_rcj_count_with_parent(machine_nodes) machine_with_cat = cat_rcj_count(machine_with_count) machine_with_fee = calc_rcj_fee(machine_with_cat) machine_formatted = format_rcj_output(machine_with_fee, "机械") # 直接返回合并后的结果,不再按工程量节点分组 result = {"人工节点": labor_formatted, "材料节点": material_formatted, "机械节点": machine_formatted} # 计算合价只用于调试显示 labor_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in labor_formatted) material_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in material_formatted) machine_fee = sum(float(node.get("预算价合价", "0.0") or "0.0") for node in machine_formatted) print(f"项目 '{project_name}': 人工合价={labor_fee}, 材料合价={material_fee}, 机械合价={machine_fee}") return result def calculate_resource_fees( json_file_path: str, project_name: str, project_guid: str = None, calculation_strategy=None, json_data: dict | None = None, ) -> str: """ 计算人材机合价 Args: json_file_path: JSON文件路径 project_name: 项目名称 project_guid: 项目GUID,用于区分同名项目 calculation_strategy: 计算策略,如果为None则使用默认策略 Returns: str: 输出文件路径,如果没有工程量节点则返回None """ # 如果没有提供计算策略,使用默认策略 if calculation_strategy is None: from equipment_calculation.calculation_strategy import DefaultCalculationStrategy calculation_strategy = DefaultCalculationStrategy() # 获取项目划分节点的GUID _, _, _, _, target_node = load_project_data(json_file_path, project_name, project_guid, json_data=json_data) # 如果传入了GUID但与节点的GUID不一致,优先使用传入的GUID node_guid = target_node.get("GUID") or target_node.get("guid", "") if target_node else "" project_guid = project_guid if project_guid else node_guid # 将文件名中的非法字符替换为下划线 safe_project_name = ( project_name.replace("/", "_") .replace("\\", "_") .replace(":", "_") .replace("*", "_") .replace("?", "_") .replace('"', "_") .replace("<", "_") .replace(">", "_") .replace("|", "_") ) # 将GUID中的非法字符替换为下划线 safe_project_guid = "" if project_guid: safe_project_guid = ( project_guid.replace("/", "_") .replace("\\", "_") .replace(":", "_") .replace("*", "_") .replace("?", "_") .replace('"', "_") .replace("<", "_") .replace(">", "_") .replace("|", "_") .replace("{", "") .replace("}", "") ) # 添加下划线作为分隔符 safe_project_guid = f"_{safe_project_guid}" # 设置输出目录和文件名 # 检查calculation_strategy是否有get_output_dir方法 if hasattr(calculation_strategy, "get_output_dir"): output_dir = calculation_strategy.get_output_dir() else: # 兼容旧代码,使用默认目录 output_dir = "计算结果" os.makedirs(output_dir, exist_ok=True) rcj_output_file = os.path.join(output_dir, f"{safe_project_name}{safe_project_guid}_rcj_fees.json") # 计算人材机节点的合价,传递project_guid参数 # 这里使用 calculation_strategy 的 calculate_rcj_fees 方法 if hasattr(calculation_strategy, "calculate_rcj_fees"): try: rcj_results = calculation_strategy.calculate_rcj_fees( json_file_path, project_name, project_guid, json_data=json_data ) except TypeError: # 向后兼容:旧策略不支持 json_data 形参 rcj_results = calculation_strategy.calculate_rcj_fees(json_file_path, project_name, project_guid) else: # 如果计算策略没有实现 calculate_rcj_fees 方法,使用原始函数 rcj_results = calculate_rcj_fees(json_file_path, project_name, project_guid, json_data=json_data) # 检查是否有人材机数据 has_data = False for node_type in ["人工节点", "材料节点", "机械节点"]: if rcj_results.get(node_type) and len(rcj_results[node_type]) > 0: has_data = True break if not has_data: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有人材机数据,不保存结果") return None # 保存人材机合价计算结果到JSON文件 with open(rcj_output_file, "w", encoding="utf-8") as f: json.dump(rcj_results, f, ensure_ascii=False, indent=2) print(f"\n人材机合价计算结果已保存到 {rcj_output_file}") return rcj_output_file