import json import os from copy import deepcopy from typing import Dict, Any, Optional, Set # 添加全局缓存 _bill_node_cache = {} def find_project_division_node(node, project_name, project_guid=None, result=None): """递归查找指定项目名称的项目划分节点,可选匹配GUID""" if result is not None and result: return result if isinstance(node, dict): # 检查是否是项目划分节点 is_project_division = node.get("type") == "项目划分" # 检查项目名称是否匹配 name_matches = node.get("项目名称") == project_name # 检查GUID是否匹配(如果提供了GUID) guid_matches = True if project_guid: node_guid = node.get("GUID") or node.get("guid") guid_matches = node_guid == project_guid # 如果是项目划分节点,且名称和GUID都匹配(或没有提供GUID) if is_project_division and name_matches and guid_matches: return [node] # 递归查找子节点 for key, value in node.items(): if isinstance(value, (dict, list)): result = find_project_division_node(value, project_name, project_guid, result) if result: return result elif isinstance(node, list): for item in node: if isinstance(item, (dict, list)): result = find_project_division_node(item, project_name, project_guid, result) if result: return result return result or [] def find_cost_table(cost_setting, table_name): """在costSetting中查找指定名称的取费表""" if isinstance(cost_setting, dict): if cost_setting.get("name") == table_name: return cost_setting for key, value in cost_setting.items(): if isinstance(value, (dict, list)): result = find_cost_table(value, table_name) if result: return result elif isinstance(cost_setting, list): for item in cost_setting: if isinstance(item, (dict, list)): result = find_cost_table(item, table_name) if result: return result return None def map_quantity_node_types(node): """映射工程量节点的类型和费用类型""" if not isinstance(node, dict): return node # 复制节点以避免修改原始数据 node_copy = deepcopy(node) # 先根据“配件判定规则”标准化类型: # 若 节点的 type/类型 == "配件" 且 配型 == "配件",则检查 配件类型: # - 配件类型 == "主材" -> 作为 主材 节点 # - 配件类型 == "配件" -> 作为 设备 节点 raw_type = node_copy.get("类型") if node_copy.get("类型") is not None else node_copy.get("type") raw_peixing = node_copy.get("配型") # 放宽条件:当 类型/type 为“配件”,且(配型为“配件”或缺省),并且存在“配件类型”时,按配件类型映射 if str(raw_type) == "配件" and (raw_peixing in (None, "", "配件")): pj_type = node_copy.get("配件类型") if pj_type == "主材": node_copy["类型"] = "主材" node_copy["type"] = "主材" elif pj_type == "配件": node_copy["类型"] = "设备" node_copy["type"] = "设备" # 映射类型字段 if "类型" in node_copy: type_mapping = {"0": "定额", "1": "主材", "5": "设备"} if node_copy["类型"] in type_mapping: node_copy["类型"] = type_mapping[node_copy["类型"]] # 映射费用类型字段 if "费用类型" in node_copy: if node_copy["费用类型"] == "0": node_copy["费用类型"] = "取费" # 如果已经是"取费"则保持不变 # 不在此处递归 children,避免对子节点进行类型标准化,交由各层独立处理 # if isinstance(node_copy.get("children"), list): # node_copy["children"] = [map_quantity_node_types(child) for child in node_copy["children"]] return node_copy def map_resource_node_types(node): """映射人材机节点的类型""" if not isinstance(node, dict): return node # 复制节点以避免修改原始数据 node_copy = deepcopy(node) # 统一标准化类型字段:同时规范化 '类型' 与 'type',输出为中文(人工/材料/机械) type_mapping = {"2": "人工", "3": "材料", "4": "机械"} # 取原始值(可能存在任一字段) raw_t = node_copy.get("类型") if node_copy.get("类型") is not None else node_copy.get("type") if raw_t is not None: # 若为代码,转换为中文;若已为中文则保持 normalized = type_mapping.get(str(raw_t), raw_t) node_copy["类型"] = normalized node_copy["type"] = normalized # 递归处理子节点 if "children" in node_copy and node_copy["children"]: node_copy["children"] = [map_resource_node_types(child) for child in node_copy["children"]] return node_copy def extract_resource_nodes(node, parent_id=None): """ 提取人材机节点并保持父子结构 Args: node: 节点 parent_id: 父级节点ID Returns: list: 提取的人材机节点列表,每个节点包含parent_id字段 """ if not isinstance(node, dict): return [] # 返回空列表而不是None,便于后续处理 # 类型工具 def _is_resource_type(t): return t in ("人工", "材料", "机械", "2", "3", "4") str2code = {"人工": "2", "材料": "3", "机械": "4"} code2str = {v: k for k, v in str2code.items()} # 获取当前节点ID current_id = node.get("id") or node.get("GUID") or node.get("guid") resource_nodes = [] # 检查是否是人材机节点(支持字符串和数字类型、双字段) node_type = node.get("类型") or node.get("type") is_resource_node = _is_resource_type(node_type) # 合并处理材机列表和children字段 # 处理逻辑: # - 若为资源节点且存在children:不返回父节点;其children继承父类型后继续递归提取 # - 若为资源节点且无children:返回该节点 # - 若非资源节点:常规递归处理children/材机列表 # 优先处理“材机列表” child_nodes = [] if "材机列表" in node and node["材机列表"]: child_nodes = node["材机列表"] elif "children" in node and node["children"]: child_nodes = node["children"] if is_resource_node: if child_nodes: # 父为资源且有子:上提子项,子项继承父类型(同时设置中英文字段),并递归 parent_code = str2code.get(node_type, node_type) parent_str = code2str.get(node_type, node_type) for child in child_nodes: child_copy = deepcopy(child) if isinstance(child_copy, dict): child_copy["parent_id"] = current_id child_copy["type"] = parent_code child_copy["类型"] = parent_str sub_resources = extract_resource_nodes(child_copy, current_id) if sub_resources: resource_nodes.extend(sub_resources) else: # 基本类型直接忽略 continue else: # 父为资源且无子:直接计入 node_copy = deepcopy(node) node_copy["parent_id"] = parent_id # 统一双字段 node_copy["type"] = str2code.get(node_type, node_type) node_copy["类型"] = code2str.get(node_type, node_type) resource_nodes.append(node_copy) else: # 非资源节点:递归其子节点 for child in child_nodes: child_copy = deepcopy(child) if isinstance(child_copy, dict): child_copy["parent_id"] = current_id sub_resources = extract_resource_nodes(child_copy, current_id) if sub_resources: resource_nodes.extend(sub_resources) return resource_nodes def process_project_children(children, software_type=None): """处理项目子节点,分离工程量节点和人材机节点,支持嵌套的工程量节点""" if not children: return None, None quantity_nodes = [] resource_nodes = [] # 内部工具:在人材机存在拆分时展开其子节点,删除中间父级人材机节点 def _flatten_resource_splits_in_place(node: dict): """ 递归+迭代地展开任意深度的人材机拆分,依据新规则: - 材料:仅当 "拆分"=1 时展开子项并删除父节点;否则保留父节点、删除 children - 机械:仅当 "拆分"=0 时保留父节点、删除 children;否则展开子项并删除父节点 - 人工:始终展开(无拆分控制) - 商品砼=1:最高优先级,保留父节点,删除 children - 同时处理 node["children"] 和 node["材机列表"] """ if not isinstance(node, dict): return # 提前退出:无 children 且无 材机列表 if ("children" not in node or not node["children"]) and ("材机列表" not in node or not node["材机列表"]): return def _is_resource_type(t): if t in ("人工", "材料", "机械"): return True return str(t) in ("2", "3", "4") # 类型映射 str2code = {"人工": "2", "材料": "3", "机械": "4"} code2str = {v: k for k, v in str2code.items()} def _get_normalized_type(t): """将类型统一转为中文(用于判断材料/机械/人工)""" if t in ("人工", "材料", "机械"): return t if str(t) == "2": return "人工" if str(t) == "3": return "材料" if str(t) == "4": return "机械" return None def _process_node_list(node_list): if not node_list: return node_list, False changed = False new_list = [] for ch in node_list: if isinstance(ch, dict): t = ch.get("类型") or ch.get("type") normalized_type = _get_normalized_type(t) # 只处理人材机类型 if normalized_type and ch.get("children"): # 规则1:商品砼=1 → 保留父节点,清空 children(最高优先级) sg_concrete = ch.get("商品砼") if sg_concrete == "1" or sg_concrete == 1: ch.pop("children", None) new_list.append(ch) changed = True continue # 规则2:根据类型和“拆分”字段决定行为 split_flag = ch.get("拆分") should_expand = False if normalized_type == "人工": # 人工:始终展开 should_expand = True elif normalized_type == "材料": # 材料:仅当 拆分=1 时展开 should_expand = split_flag == "1" or split_flag == 1 elif normalized_type == "机械": # 机械:仅当 拆分≠0 时展开(即 拆分=0 时不展开) should_expand = not (split_flag == "0" or split_flag == 0) if should_expand: # 展开:上提 children,丢弃父节点;保持子级原有 type/类型 不变 for gc in ch.get("children", []) or []: new_list.append(gc) changed = True continue else: # 不展开:保留父节点,删除 children ch.pop("children", None) new_list.append(ch) changed = True continue # 非人材机类型,或无人材机 children:递归处理并保留 _flatten_resource_splits_in_place(ch) new_list.append(ch) else: new_list.append(ch) return new_list, changed # 迭代直到稳定 changed = True while changed: changed = False if "children" in node and node["children"]: new_children, c1 = _process_node_list(node["children"]) if c1: node["children"] = new_children changed = True if "材机列表" in node and node["材机列表"]: new_materials, c2 = _process_node_list(node["材机列表"]) if c2: node["材机列表"] = new_materials changed = True for child in children: # 先复制两份:一份用于工程量树(quantity_node),一份用于资源提取(resource_node_src) quantity_node = deepcopy(child) resource_node_src = deepcopy(child) # 额外保留一份原始工程量节点用于最终输出,保持原样不作修改 original_quantity_node = deepcopy(child) # 在两份结构中都先执行“人材机拆分展开”,以便: # 1) 工程量树不保留中间的人材机父节点 # 2) 资源提取时直接提取到展开后的子资源,且不包含中间父节点 _flatten_resource_splits_in_place(quantity_node) _flatten_resource_splits_in_place(resource_node_src) # 应用工程量节点类型映射 quantity_node = map_quantity_node_types(quantity_node) # 在“技改/主网”下,对定额节点做嵌套检测(children 或 材机列表 中包含 定额/主材/设备 或 配件类型为 主材/配件) if software_type in ("技改", "主网"): def _type_is_quota_material_equipment(t): # 支持代码与中文类型名 return t in ("定额", "主材", "设备", "0", "1", "5") def _collect_nested_matches(node): matches = [] # 检查 children for ch in node.get("children", []) or []: if isinstance(ch, dict): t = ch.get("类型") or ch.get("type") pj_t = ch.get("配件类型") if _type_is_quota_material_equipment(t) or pj_t in ("主材", "配件"): matches.append( { "来源": "children", "名称": ch.get("项目名称") or ch.get("清单名称") or ch.get("name"), "类型": t, "配件类型": pj_t, "id": ch.get("GUID") or ch.get("guid") or ch.get("id"), } ) # 递归继续检查更深层的嵌套 sub = _collect_nested_matches(ch) if sub: matches.extend(sub) # 检查 材机列表(如存在) for it in node.get("材机列表", []) or []: if isinstance(it, dict): t = it.get("类型") or it.get("type") pj_t = it.get("配件类型") if _type_is_quota_material_equipment(t) or pj_t in ("主材", "配件"): matches.append( { "来源": "材机列表", "名称": it.get("项目名称") or it.get("清单名称") or it.get("name") or it.get("材料名称"), "类型": t, "配件类型": pj_t, "id": it.get("GUID") or it.get("guid") or it.get("id"), } ) return matches node_type = quantity_node.get("类型") or quantity_node.get("type") if node_type in ("定额", "0"): nested_matches = _collect_nested_matches(quantity_node) if nested_matches: quantity_node["嵌套检测"] = { "存在嵌套": True, "命中数量": len(nested_matches), "命中项": nested_matches, } # 提取人材机节点 - 基于已展开的 resource_node_src 提取 resources = extract_resource_nodes(resource_node_src) if resources: # 应用人材机节点类型映射 resources = [map_resource_node_types(resource) for resource in resources] resource_nodes.extend(resources) # 从工程量节点中移除材机列表 if "材机列表" in quantity_node: del quantity_node["材机列表"] # 清理子节点中的人材机节点 clean_resource_nodes_from_quantity(quantity_node) # 🔥 核心修改:仅当当前节点是"定额"时,才递归处理其children并提取嵌套的工程量节点 current_node_type = quantity_node.get("类型") or quantity_node.get("type") is_quota_node = current_node_type in ("定额", "0") sub_quantity_nodes = None sub_resource_nodes = None if is_quota_node and "children" in quantity_node and quantity_node["children"]: # 处理定额的人材机节点 # 关键:递归输入改为原始副本的 children,避免上一层标准化对子层“原始类型字段”的影响 sub_quantity_nodes, sub_resource_nodes = process_project_children( (original_quantity_node.get("children") or []), software_type ) # 更新当前节点的 children(保留结构) if sub_quantity_nodes: quantity_node["children"] = sub_quantity_nodes # 关键:将子层的工程量节点(如主材/设备)直接并入当前层输出, # 并确保类型为标准化后的中文值(主材/设备),避免后续 BCL 过滤不到 # 不修改原始子节点类型,仅做深拷贝并并入 normalized_children = [deepcopy(n) for n in sub_quantity_nodes] quantity_nodes.extend(normalized_children) # 调试:打印并入的子工程量节点类型 try: for ch in normalized_children: nm = ( ch.get("项目名称") or ch.get("清单名称") or ch.get("name") or ch.get("材料名称") or "<未命名>" ) tp = ch.get("类型") or ch.get("type") # print(f"[DEBUG] 并入子工程量节点: 名称={nm} | 类型={tp}") except Exception: pass else: quantity_node.pop("children", None) # 合并资源节点 if sub_resource_nodes: resource_nodes.extend(sub_resource_nodes) # else: # 非定额节点:不递归提取子节点作为工程量,不处理 sub_quantity_nodes # 对于定额节点:将“材机列表”里符合条件的项(主材/设备或配件类型为主材/配件)也作为工程量节点并入 if is_quota_node: mj_items = (quantity_node.get("材机列表") or []) if isinstance(quantity_node, dict) else [] for it in mj_items: if not isinstance(it, dict): continue t = it.get("类型") or it.get("type") pj_t = it.get("配件类型") # 类型代码转中文 if t in ("1", "5"): t = {"1": "主材", "5": "设备"}[t] new_type = None if t in ("主材", "设备"): new_type = t elif pj_t == "主材": new_type = "主材" elif pj_t == "配件": new_type = "设备" if new_type: # 不改写原始字段,直接将条目拷贝加入,类型由下游对象构造派生 new_node = deepcopy(it) # 修正:并入前应用拆分展开逻辑,确保 商品砼=1 的资源不再携带 children _flatten_resource_splits_in_place(new_node) quantity_nodes.append(new_node) try: nm = ( new_node.get("项目名称") or new_node.get("清单名称") or new_node.get("name") or new_node.get("材料名称") or "<未命名>" ) # print(f"[DEBUG] 从材机列表并入工程量节点: 名称={nm} | 类型={t} | 派生类型={new_type}") except Exception: pass # 🔥 最后:将当前节点本身加入 quantity_nodes # 为满足“工程量节点输出保持原样”,此处使用 original_quantity_node 代替处理后的 quantity_node # 规则: # - 若当前节点为“主材/设备(1/5)”,直接计入工程量节点; # - 若为“定额(0)”,沿用既有两个条件之一成立时计入; def _children_have_no_children(n: dict) -> bool: ch = (n.get("children") or []) if isinstance(n, dict) else [] # 若无 children,则视作“没有更深层 children” if not ch: return True for c in ch: if isinstance(c, dict) and (c.get("children") or []): return False return True def _mj_has_hit(n: dict) -> bool: items = (n.get("材机列表") or []) if isinstance(n, dict) else [] for it in items: if not isinstance(it, dict): continue t = it.get("类型") or it.get("type") pj_t = it.get("配件类型") if t in ("定额", "主材", "设备", "0", "1", "5") or pj_t in ("主材", "配件"): return True return False # 主材/设备/一笔性费用直接计入(以标准化类型入列,避免后续 BCL 过滤不到) if current_node_type in ("主材", "设备", "一笔性费用", "1", "5"): to_append = deepcopy(original_quantity_node) # 修正:对即将入列的节点应用人材机拆分展开逻辑, # 以生效“商品砼=1:保留父节点并去除其 children”的规则 _flatten_resource_splits_in_place(to_append) quantity_nodes.append(to_append) try: nm = ( to_append.get("项目名称") or to_append.get("清单名称") or to_append.get("name") or to_append.get("材料名称") or "<未命名>" ) tp = to_append.get("类型") or to_append.get("type") # print(f"[DEBUG] 入列工程量节点(主材/设备): 名称={nm} | 类型={tp}") except Exception: pass # 定额按条件计入 elif is_quota_node: if _children_have_no_children(quantity_node) or _mj_has_hit(quantity_node): to_append = deepcopy(original_quantity_node) # 修正:定额节点入列时也先应用拆分展开逻辑, # 保证“商品砼=1 的资源子项不含 children” _flatten_resource_splits_in_place(to_append) quantity_nodes.append(to_append) try: nm = ( to_append.get("项目名称") or to_append.get("清单名称") or to_append.get("name") or to_append.get("材料名称") or "<未命名>" ) tp = to_append.get("类型") or to_append.get("type") # print(f"[DEBUG] 入列工程量节点(定额): 名称={nm} | 类型={tp}") except Exception: pass # 始终返回列表,避免上层出现 None 导致判定困难 return (quantity_nodes or [], resource_nodes or []) def clean_resource_nodes_from_quantity(node): """递归清理工程量节点中的人材机节点""" if not isinstance(node, dict): return # 清理材机列表 if "材机列表" in node: del node["材机列表"] # 清理子节点中的人材机节点 if "children" in node and node["children"]: # 过滤掉人材机类型的子节点 node["children"] = [ c for c in node["children"] if not (isinstance(c, dict) and c.get("类型") in ["人工", "材料", "机械"]) ] # 递归清理剩余子节点 for child in node["children"]: clean_resource_nodes_from_quantity(child) # 如果没有子节点了,删除children属性 if not node["children"]: del node["children"] def load_project_data(json_file_path, project_name, project_guid=None, json_data=None): """加载JSON数据并获取目标项目节点""" try: # 支持透传已加载的 JSON 数据,避免重复读取 if json_data is not None: data = json_data else: # 读取JSON文件 with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) # 获取projectData中的costSetting和projectDivision project_data = data.get("projectData", {}) cost_setting = project_data.get("costSetting", {}) project_division = project_data.get("projectDivision", {}) # 查找指定项目名称和GUID的节点 target_nodes = find_project_division_node(project_division, project_name, project_guid) if not target_nodes: if project_guid: print(f"未找到项目名称为 '{project_name}' 且GUID为 '{project_guid}' 的节点") else: print(f"未找到项目名称为 '{project_name}' 的节点") return None, None, None, None, None # 获取找到的节点 target_node = target_nodes[0] return data, project_data, cost_setting, project_division, target_node except Exception as e: print(f"加载项目数据时出错: {e}") return None, None, None, None, None def get_cost_table_children(json_file_path, project_name, project_guid=None, json_data=None): """获取取费表子节点""" try: # 加载项目数据,传递project_guid参数 data, project_data, cost_setting, project_division, target_node = load_project_data( json_file_path, project_name, project_guid, json_data=json_data ) if not target_node: return None # 获取取费表名称 fee_table_name = target_node.get("取费表") if not fee_table_name: print(f"目标项目划分节点中没有'取费表'字段") return None # 查找对应的取费表 cost_table = find_cost_table(cost_setting, fee_table_name) cost_table_children = cost_table.get("children", None) if cost_table else None return cost_table_children except Exception as e: print(f"获取取费表子节点时出错: {e}") return None def get_quantity_nodes(json_file_path, project_name, engineering_type, project_guid=None, software_type=None): """ 获取工程量节点 Args: json_file_path: JSON文件路径 project_name: 项目名称 engineering_type: 工程类型 project_guid: 项目GUID,用于区分同名项目 Returns: list: 工程量节点列表,如果没有找到项目划分节点则返回None,如果找到项目划分节点但没有工程量节点则返回空列表 """ try: # 加载项目数据,传递project_guid参数 data, project_data, cost_setting, project_division, target_node = load_project_data( json_file_path, project_name, project_guid ) if not target_node: return None # 没有找到项目划分节点 # 处理项目子节点 project_children = target_node.get("children", None) if not project_children: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有子节点(工程量节点)") return [] # 找到项目划分节点,但没有子节点(工程量节点) # 读取项目划分节点上的“专业类型”属性 project_specialty_type = target_node.get("专业类型") # 如果是清单工程,需要先找到清单节点,然后获取其下的工程量节点 if engineering_type == "清单工程": quantity_nodes = [] print(f"开始处理清单工程,项目名称: {project_name}") # 递归查找真正的清单节点 def find_true_bill_nodes(node_list, parent_path=""): result = [] for node in node_list: # 检查是否是清单节点 is_bill_node = ( node.get("类型") == "8" or node.get("类型") == "清单" or node.get("type") == "8" or node.get("type") == "清单" or "清单名称" in node or "清单全码" in node ) node_name = node.get("清单名称", node.get("项目名称", "未命名")) # 强制使用GUID而不是ID node_id = node.get("GUID") or node.get("guid") or node.get("id", "未知ID") current_path = f"{parent_path}/{node_name}" if parent_path else node_name if is_bill_node: print(f"找到清单节点: ID={node_id}, 名称={node_name}, 路径={current_path}") # 检查清单节点是否有取费表名称 has_fee_table = "取费表名称" in node or "取费表" in node if has_fee_table: fee_table_name = node.get("取费表名称", node.get("取费表", "未知")) print(f"清单节点有取费表: {fee_table_name}") # 添加到缓存 if node_id != "未知ID": _bill_node_cache[node_id] = node print(f"缓存清单节点: ID={node_id}") result.append(node) else: print(f"警告:清单节点没有取费表名称: {node_name}") # 递归处理子节点 if "children" in node and node["children"]: result.extend(find_true_bill_nodes(node["children"], current_path)) return result # 查找所有真正的清单节点 bill_nodes = find_true_bill_nodes(project_children) print(f"找到 {len(bill_nodes)} 个有效清单节点") # 处理每个清单节点 for bill_node in bill_nodes: # 强制使用GUID而不是ID bill_id = bill_node.get("GUID") or bill_node.get("guid") or bill_node.get("id") bill_name = bill_node.get("清单名称", bill_node.get("项目名称", "未命名清单")) bill_children = bill_node.get("children", []) if bill_children: # 处理清单节点的子节点,获取工程量节点 bill_quantity_nodes, _ = process_project_children(bill_children, software_type) if bill_quantity_nodes: print(f"清单节点 '{bill_name}' 下找到 {len(bill_quantity_nodes)} 个工程量节点") # 为每个工程量节点设置清单节点信息 for node in bill_quantity_nodes: # 存储关键信息而不是整个对象 # 强制使用GUID而不是ID node["bill_guid"] = bill_id # 新增GUID字段 node["bill_id"] = bill_id # 保持兼容性 node["bill_name"] = bill_name node["取费表名称"] = bill_node.get("取费表名称", bill_node.get("取费表", "")) # 设置parent_id以保持兼容性 node["parent_id"] = bill_id print(f"设置工程量节点 '{node.get('项目名称', '未命名')}' 的清单节点信息") # 添加这一行,将完整的清单节点添加到工程量节点中 node["bill_node"] = bill_node # 继承项目划分的“专业类型”到工程量节点(若存在且节点未设置) if project_specialty_type is not None and "专业类型" not in node: node["专业类型"] = project_specialty_type quantity_nodes.extend(bill_quantity_nodes) else: print(f"清单节点 '{bill_name}' 下未找到工程量节点") else: print(f"清单节点 '{bill_name}' 没有子节点") # 如果没有找到任何工程量节点,尝试直接获取定额节点 if not quantity_nodes: print("未找到清单节点下的工程量节点,尝试直接获取定额节点...") def find_quota_nodes(node_list): result = [] for node in node_list: # 检查是否是定额节点 is_quota_node = node.get("类型") == "定额" or node.get("type") == "定额" if is_quota_node: result.append(node) # 递归处理子节点 if "children" in node and node["children"]: result.extend(find_quota_nodes(node["children"])) return result # 查找所有定额节点 quota_nodes = find_quota_nodes(project_children) if quota_nodes: print(f"找到 {len(quota_nodes)} 个定额节点") # 检查这些定额节点是否有清单节点信息 for node in quota_nodes: parent_node = None parent_id = node.get("parent_id") # 在项目中查找父节点 def find_node_by_id(node_list, node_id): for n in node_list: # 强制使用GUID而不是ID进行匹配 node_guid = n.get("GUID") or n.get("guid") or n.get("id") if node_guid == node_id: return n if "children" in n and n["children"]: found = find_node_by_id(n["children"], node_id) if found: return found return None if parent_id: parent_node = find_node_by_id(project_children, parent_id) # 如果找到父节点且是清单节点,使用其信息 if parent_node: is_bill_node = ( parent_node.get("类型") == "8" or parent_node.get("类型") == "清单" or parent_node.get("type") == "8" or parent_node.get("type") == "清单" or "清单名称" in parent_node or "清单全码" in parent_node ) if is_bill_node: # 强制使用GUID而不是ID bill_id = parent_node.get("GUID") or parent_node.get("guid") or parent_node.get("id") bill_name = parent_node.get("清单名称", parent_node.get("项目名称", "未命名清单")) # 强制使用GUID而不是ID node["bill_guid"] = bill_id # 新增GUID字段 node["bill_id"] = bill_id # 保持兼容性 node["bill_name"] = bill_name node["取费表名称"] = parent_node.get("取费表名称", parent_node.get("取费表", "")) # 设置parent_id以保持兼容性 node["parent_id"] = bill_id print(f"为定额节点 '{node.get('项目名称', '未命名')}' 设置清单节点信息") quantity_nodes.extend(quota_nodes) return quantity_nodes else: # 预算工程 - 直接获取工程量节点 quantity_nodes, _ = process_project_children(project_children, software_type) # 如果没有工程量节点,返回空列表而不是None if not quantity_nodes: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有工程量节点") return [] # 继承项目划分的“专业类型”到工程量节点(若存在且节点未设置) if project_specialty_type is not None: for node in quantity_nodes: if isinstance(node, dict) and "专业类型" not in node: node["专业类型"] = project_specialty_type return quantity_nodes except Exception as e: print(f"获取工程量节点时出错: {e}") import traceback traceback.print_exc() # 打印详细错误堆栈 return None def get_resource_nodes(json_file_path, project_name, project_guid=None): """ 获取人材机节点 Args: json_file_path: JSON文件路径 project_name: 项目名称 project_guid: 项目GUID,用于区分同名项目 Returns: list: 人材机节点列表 """ try: # 加载项目数据,传递project_guid参数 data, project_data, cost_setting, project_division, target_node = load_project_data( json_file_path, project_name, project_guid ) if not target_node: return None # 获取项目子节点 project_children = target_node.get("children", None) if not project_children: print(f"项目 '{project_name}' (GUID: {project_guid}) 没有子节点") return [] # 提取人材机节点 resource_nodes = [] for child in project_children: # 提取人材机节点并保持父子结构 nodes = extract_resource_nodes(child) if nodes: resource_nodes.extend(nodes) # 映射节点类型 resource_nodes = [map_resource_node_types(node) for node in resource_nodes] return resource_nodes except Exception as e: print(f"获取人材机节点时出错: {e}") return None def get_classified_resource_nodes(json_file_path, project_name, project_guid=None): """ 获取分类后的人材机节点 Args: json_file_path: JSON文件路径 project_name: 项目名称 project_guid: 项目GUID,用于区分同名项目 Returns: tuple: (人工节点列表, 材料节点列表, 机械节点列表),每个列表包含(节点, 父级ID)元组 """ # 获取所有人材机节点,传递project_guid参数 resource_nodes = get_resource_nodes(json_file_path, project_name, project_guid) if not resource_nodes: return [], [], [] # 分类存储 labor_nodes = [] # 人工节点 material_nodes = [] # 材料节点 machine_nodes = [] # 机械节点 # 递归函数,用于处理节点及其子节点 def process_node(node): node_type = node.get("类型") parent_id = node.get("parent_id") # 同时支持数字类型和字符串类型 if node_type in ["人工", "2"]: node_type = "人工" # 统一转换为字符串类型 labor_nodes.append((node, parent_id)) elif node_type in ["材料", "3"]: node_type = "材料" # 统一转换为字符串类型 material_nodes.append((node, parent_id)) elif node_type in ["机械", "4"]: node_type = "机械" # 统一转换为字符串类型 machine_nodes.append((node, parent_id)) # 更新类型字段 node["类型"] = node_type # 处理子节点 if "children" in node and node["children"]: for child in node["children"]: # 确保子节点有父级ID if "parent_id" not in child: child["parent_id"] = node.get("id") process_node(child) # 处理所有节点 for node in resource_nodes: process_node(node) return labor_nodes, material_nodes, machine_nodes def find_bill_node_by_id(node, bill_id, result=None): """递归查找指定ID的清单节点""" if result is not None and result: return result if isinstance(node, dict): # 检查当前节点是否是目标清单节点 node_id_matches = node.get("id") == bill_id or node.get("GUID") == bill_id if node_id_matches: # 检查是否是清单节点 is_bill_node = ( node.get("类型") == "8" or node.get("类型") == "清单" or node.get("type") == "8" or node.get("type") == "清单" or "清单名称" in node or "清单全码" in node ) if is_bill_node: print(f"找到清单节点 ID={bill_id}, 类型={node.get('类型', node.get('type', '未知'))}") return [node] else: print( f"找到ID匹配但不是清单节点的节点: ID={bill_id}, 类型={node.get('类型', node.get('type', '未知'))}" ) # 不返回非清单节点 # 递归查找子节点 if "children" in node and isinstance(node["children"], list): for child in node["children"]: result = find_bill_node_by_id(child, bill_id, result) if result: return result # 检查其他字段 for key, value in node.items(): if key != "children" and isinstance(value, (dict, list)): result = find_bill_node_by_id(value, bill_id, result) if result: return result elif isinstance(node, list): for item in node: if isinstance(item, (dict, list)): result = find_bill_node_by_id(item, bill_id, result) if result: return result return result or [] def get_bill_cost_table(json_file_path, bill_id): """获取清单节点的取费表子节点""" try: # 读取JSON文件 with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) # 获取projectData中的costSetting和projectDivision project_data = data.get("projectData", {}) cost_setting = project_data.get("costSetting", {}) project_division = project_data.get("projectDivision", {}) print(f"正在查找清单节点ID: {bill_id}") # 查找指定ID的清单节点 bill_nodes = find_bill_node_by_id(project_division, bill_id) if not bill_nodes: print(f"未找到ID为 '{bill_id}' 的清单节点") return None # 获取找到的节点 bill_node = bill_nodes[0] # 获取取费表名称或ID - 尝试多种可能的字段名 fee_table_name = bill_node.get("取费表名称") or bill_node.get("取费表") or bill_node.get("费率表") if not fee_table_name: print(f"清单节点中没有'取费表名称'字段") # 打印节点信息以便调试 print(f"清单节点字段: {', '.join(bill_node.keys())}") # 尝试从父级节点获取取费表名称 if "parent_id" in bill_node: parent_nodes = find_bill_node_by_id(project_division, bill_node["parent_id"]) if parent_nodes: parent_node = parent_nodes[0] fee_table_name = ( parent_node.get("取费表名称") or parent_node.get("取费表") or parent_node.get("费率表") ) if fee_table_name: print(f"从父级节点获取到取费表名称: {fee_table_name}") if not fee_table_name: return None print(f"找到取费表名称: {fee_table_name}") # 使用相同的函数查找取费表 - 与预算工程保持一致 cost_table = find_cost_table(cost_setting, fee_table_name) if not cost_table: print(f"未找到取费表 '{fee_table_name}'") return None cost_table_children = cost_table.get("children", None) return cost_table_children except Exception as e: print(f"获取清单节点取费表时出错: {e}") import traceback traceback.print_exc() # 打印详细错误堆栈 return None def get_bill_node_by_id(bill_id: str) -> Dict[str, Any]: """ 根据ID从缓存中获取清单节点 Args: bill_id: 清单节点ID Returns: Dict[str, Any]: 清单节点数据 """ # 清理ID格式 clean_bill_id = str(bill_id).strip("{}").upper() # 在缓存中查找 for cached_id, node in _bill_node_cache.items(): if str(cached_id).strip("{}").upper() == clean_bill_id: print(f"从缓存中获取清单节点: {node.get('清单名称', '未命名')}") return node print(f"在缓存中未找到ID为 {bill_id} 的清单节点") return {} # if __name__ == "__main__": # # 测试参数(使用新合并的JSON文件) # json_file_path = "project2json/outputs/merged/变电检修国网-副本.json" # project_name = "1.12新增卸车保管" # project_guid = "{0952D46D-E5C7-4412-88FF-93517EF2633C}" # def _node_name(n: dict): # # 尽可能稳健地取名称字段 # for k in ("名称", "name", "材料名称", "定额名称", "项目名称", "title"): # if isinstance(n, dict) and n.get(k): # return n.get(k) # return n.get("newGuid") or n.get("GUID") or n.get("id") or "<无名称>" # def _node_type(n: dict): # return n.get("类型") or n.get("type") or "<无类型>" # def _node_id(n: dict): # return n.get("GUID") or n.get("guid") or n.get("id") or n.get("parent_id") or "<无ID>" # def _print_nodes(nodes, title): # print(f"{title} 共 {len(nodes) if nodes else 0} 个:") # if not nodes: # return # for i, item in enumerate(nodes, 1): # n = item[0] if isinstance(item, tuple) and item and isinstance(item[0], dict) else item # try: # print(f" {i:02d}. 名称={_node_name(n)} | 类型={_node_type(n)} | ID={_node_id(n)}") # except Exception: # print(f" {i:02d}. {item}") # def _check_resource_parents_with_children(nodes): # """统计是否存在类型为人/材/机且仍带children的父节点""" # def _is_res(t): # return t in ("人工", "材料", "机械", "2", "3", "4") # bad = [] # for item in nodes or []: # n = item[0] if isinstance(item, tuple) and item and isinstance(item[0], dict) else item # if isinstance(n, dict) and _is_res(n.get("类型") or n.get("type")) and n.get("children"): # bad.append(n) # if bad: # print("[警告] 发现仍存在带children的人材机父节点,这些父节点应被跳过,子节点应上提:") # _print_nodes(bad, "异常父级人材机节点") # else: # print("[校验] 未发现带children的人材机父节点,扁平化正常。") # print(f"测试项目: {project_name}") # print(f"文件路径: {json_file_path}") # print(f"项目GUID: {project_guid}") # print("-" * 50) # # 获取工程量节点 # print("获取工程量节点...") # quantity_nodes = get_quantity_nodes(json_file_path, project_name, "预算工程", project_guid, software_type="技改") # _print_nodes(quantity_nodes or [], "工程量节点") # # 获取人材机节点(应为:父为人材机且有children时,父不计,子上提并继承父类型) # print("获取人材机节点...") # resource_nodes = get_resource_nodes(json_file_path, project_name, project_guid) # _print_nodes(resource_nodes or [], "人材机节点") # _check_resource_parents_with_children(resource_nodes or []) # # 获取分类后的人材机节点 # print("获取分类后的人材机节点...") # labor_nodes, material_nodes, machinery_nodes = get_classified_resource_nodes( # json_file_path, project_name, project_guid # ) # _print_nodes(labor_nodes or [], "人工节点") # _print_nodes(material_nodes or [], "材料节点") # _print_nodes(machinery_nodes or [], "机械节点") # print("-" * 50) # print("测试完成!")