""" 第四步:实现项目划分和清单节点费用预览向上汇总 """ import json import os import glob from typing import Dict, List, Any, Tuple import copy class ExpenseProcessor: def __init__(self): pass @staticmethod def calculate_parent_costs(node: Dict[str, Any]) -> List[Dict[str, Any]]: """ 计算节点的汇总费用 :param node: 费用预览节点 :return: 汇总后的费用项列表 """ result_nodes = [] processed_ids = set() # 首先处理节点自身的sum数组 if "sum" in node and isinstance(node["sum"], list): for cost_item in node["sum"]: if "id" in cost_item and "cost" in cost_item: result_nodes.append(copy.deepcopy(cost_item)) processed_ids.add(cost_item["id"]) # 然后处理children节点 if "children" in node and isinstance(node["children"], list): for child in node["children"]: child_costs = ExpenseProcessor.calculate_parent_costs(child) # 合并子节点的费用 for cost_item in child_costs: if "id" in cost_item: # 查找是否已存在相同id的费用项 found = False for existing in result_nodes: if "id" in existing and existing["id"] == cost_item["id"]: # 如果存在,累加cost值 existing["cost"] = str(float(existing["cost"]) + float(cost_item["cost"])) found = True break # 如果不存在,添加新的费用项 if not found: result_nodes.append(copy.deepcopy(cost_item)) else: # 处理没有id的费用项(不常见) found = False for existing in result_nodes: if "id" not in existing: existing["cost"] = str(float(existing["cost"]) + float(cost_item["cost"])) found = True break if not found: result_nodes.append(copy.deepcopy(cost_item)) return result_nodes @staticmethod def process_node(node: Dict[str, Any]) -> Dict[str, Any]: """ 处理单个节点,计算汇总费用并更新sum数组 :param node: 费用预览节点 :return: 处理后的节点 """ result = copy.deepcopy(node) # 如果没有children,则不需要汇总 if "children" not in node or not node["children"]: # 确保节点有标准格式 if "sum" not in result: result["sum"] = [] if "children" not in result: result["children"] = [] if "rcj" not in result: result["rcj"] = [] return result # 计算汇总费用 cost_items = ExpenseProcessor.calculate_parent_costs(node) # 更新sum数组 if cost_items: # 确保只保留id和cost两个属性 result["sum"] = [{"id": item["id"], "cost": item["cost"]} for item in cost_items if "id" in item] else: result["sum"] = [] # 递归处理子节点 result["children"] = [ExpenseProcessor.process_node(child) for child in node["children"]] # 确保rcj数组存在 if "rcj" not in result: result["rcj"] = [] return result @staticmethod def process_expense_preview(expense_preview: Dict[str, Any]) -> Dict[str, Any]: """ 处理整个费用预览结构 :param expense_preview: 费用预览结构 :return: 处理后的费用预览结构 """ result = copy.deepcopy(expense_preview) for category_key, category_value in expense_preview.items(): for subcategory_key, subcategory_value in category_value.items(): if isinstance(subcategory_value, list): result[category_key][subcategory_key] = [ ExpenseProcessor.process_node(item) for item in subcategory_value ] return result @classmethod def load_and_process_from_file(cls, input_path: str, output_path: str = None) -> Dict[str, Any]: """ 从文件加载 JSON 并处理 :param input_path: 输入文件路径 :param output_path: 输出文件路径(可选) :return: 处理后的完整数据 """ try: with open(input_path, "r", encoding="utf-8") as f: data = json.load(f) if "projectData" in data and "expensePreview" in data["projectData"]: processed_data = copy.deepcopy(data) processed_data["projectData"]["expensePreview"] = cls.process_expense_preview( data["projectData"]["expensePreview"] ) if output_path: with open(output_path, "w", encoding="utf-8") as f: json.dump(processed_data, f, ensure_ascii=False, indent=4) print(f"处理完成,结果已保存到 {output_path}") return processed_data else: print(f"警告: 文件 {input_path} 中未找到 projectData.expensePreview 路径") return None except Exception as e: print(f"处理文件 {input_path} 时出错: {str(e)}") return None @classmethod def process_raw_data(cls, raw_data: Dict[str, Any]) -> Dict[str, Any]: """ 直接处理原始数据(不涉及文件读写) :param raw_data: 原始数据,格式应包含 projectData.expensePreview :return: 处理后的数据 """ if "projectData" in raw_data and "expensePreview" in raw_data["projectData"]: processed_data = copy.deepcopy(raw_data) processed_data["projectData"]["expensePreview"] = cls.process_expense_preview( raw_data["projectData"]["expensePreview"] ) return processed_data else: raise ValueError("未找到 projectData.expensePreview 路径") @classmethod def process_directory(cls, input_dir: str, output_dir: str) -> List[Tuple[str, str]]: """ 处理指定目录中的所有JSON文件 :param input_dir: 输入目录路径,包含要处理的JSON文件 :param output_dir: 输出目录路径,处理后的JSON文件将保存在这里 :return: 成功处理的文件列表,格式为 [(源文件路径, 输出文件路径), ...] """ # 确保输出目录存在 os.makedirs(output_dir, exist_ok=True) # 查找所有JSON文件 json_files = [] for file in os.listdir(input_dir): if file.lower().endswith(".json"): json_files.append(os.path.join(input_dir, file)) if not json_files: print(f"警告: 在目录 {input_dir} 中没有找到JSON文件") return [] # 处理每个JSON文件 successful_files = [] for input_file in json_files: file_name = os.path.basename(input_file) output_file = os.path.join(output_dir, file_name) print(f"处理文件: {input_file}") processed_data = cls.load_and_process_from_file(input_file, output_file) if processed_data: successful_files.append((input_file, output_file)) print(f"✅ 成功处理: {file_name}") else: print(f"❌ 处理失败: {file_name}") return successful_files def costsummary_upwards(input_dir: str, output_dir: str) -> List[Tuple[str, str]]: """ 处理指定目录中的所有JSON文件,实现项目划分和清单节点费用预览向上汇总 :param input_dir: 输入目录路径,包含要处理的JSON文件 :param output_dir: 输出目录路径,处理后的JSON文件将保存在这里 :return: 成功处理的文件列表,格式为 [(源文件路径, 输出文件路径), ...] """ return ExpenseProcessor.process_directory(input_dir, output_dir) if __name__ == "__main__": # 使用示例 input_directory = "final_outputs" # 输入JSON文件夹路径 output_directory = "final_outputs" # 输出JSON文件夹路径 # 处理整个文件夹 result = costsummary_upwards(input_directory, output_directory) # 显示处理结果 if result: print(f"\n成功处理了 {len(result)} 个文件:") for src, dst in result: print(f" {os.path.basename(src)} -> {os.path.basename(dst)}") else: print("\n没有文件被成功处理")