import copy
import json
from typing import Any, Dict, List, Optional


class ExpenseProcessor:
    """Annotate expense trees with cost totals aggregated from their leaves.

    A tree node is a dict that may carry ``id``, ``cost`` and ``children``
    keys. Every node with a non-empty ``children`` list receives a ``sum``
    key holding the leaf costs below it, merged per ``id``.
    """

    def __init__(self):
        pass

    @staticmethod
    def calculate_parent_costs(node: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the leaf costs found under *node*, merged per ``id``.

        A node whose ``children`` key is missing or empty is a leaf: it
        contributes ``{"id": ..., "cost": ...}`` (or just ``{"cost": ...}``
        when it has no ``id``), or nothing when it has no ``cost``. A node
        with children contributes only what its descendants contribute; its
        own ``cost`` key, if any, is not read.

        Entries sharing an ``id`` are merged into one, and all entries without
        an ``id`` are merged into a single id-less entry. A merged ``cost`` is
        stored as ``str(float(a) + float(b))``; an entry that was never merged
        keeps its original ``cost`` value untouched.

        :param node: tree node to aggregate; non-dict values yield ``[]``
        :return: cost entries in first-seen order
        """
        if not isinstance(node, dict):
            return []

        children = node.get("children")
        if not children:
            # Same leaf test as process_node: an empty "children" list still
            # marks a leaf, so its cost must reach the ancestors' totals.
            if "cost" not in node:
                return []
            if "id" in node:
                return [{"id": node["id"], "cost": node["cost"]}]
            return [{"cost": node["cost"]}]

        totals: List[Dict[str, Any]] = []
        # Index of the entries in ``totals`` so a merge is a dict lookup
        # rather than a scan; ids are assumed to be hashable.
        totals_by_id: Dict[Any, Dict[str, Any]] = {}
        untagged: Optional[Dict[str, Any]] = None

        for child in children:
            for cost_item in ExpenseProcessor.calculate_parent_costs(child):
                if "id" in cost_item:
                    existing = totals_by_id.get(cost_item["id"])
                else:
                    existing = untagged

                if existing is not None:
                    existing["cost"] = str(
                        float(existing["cost"]) + float(cost_item["cost"])
                    )
                    continue

                # Copy before storing so later merges never write into an
                # entry owned by a deeper recursion level.
                entry = copy.deepcopy(cost_item)
                totals.append(entry)
                if "id" in entry:
                    totals_by_id[entry["id"]] = entry
                else:
                    untagged = entry

        return totals

    @staticmethod
    def process_node(node: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *node* with ``sum`` added at every parent level.

        Leaves (no ``children`` key, or an empty one) are returned as plain
        deep copies. For a parent, every child is processed recursively and
        ``sum`` is set to the result of :meth:`calculate_parent_costs` when
        that result is non-empty. The input is never modified.

        :param node: tree node to process
        :return: independent, annotated copy of the node
        """
        if not isinstance(node, dict) or not node.get("children"):
            return copy.deepcopy(node)

        cost_items = ExpenseProcessor.calculate_parent_costs(node)

        # Rebuild key by key so the original key order is kept and the
        # children are copied once, by the recursion, instead of twice.
        result: Dict[str, Any] = {}
        for key, value in node.items():
            if key == "children":
                result[key] = [ExpenseProcessor.process_node(child) for child in value]
            else:
                result[key] = copy.deepcopy(value)

        if cost_items:
            result["sum"] = cost_items
        return result

    @staticmethod
    def process_expense_preview(expense_preview: Dict[str, Any]) -> Dict[str, Any]:
        """Process every list found two levels below *expense_preview*.

        The preview is walked as ``category -> subcategory -> value``. Each
        subcategory value that is a list has its items replaced by
        :meth:`process_node` results; everything else is copied unchanged.
        Category values that are not dicts are copied unchanged as well.

        :param expense_preview: mapping of categories to subcategory mappings
        :return: processed deep copy of the preview
        """
        result = copy.deepcopy(expense_preview)
        for category_key, category_value in expense_preview.items():
            if not isinstance(category_value, dict):
                # Nothing to descend into; the deep copy already holds it.
                continue
            for subcategory_key, subcategory_value in category_value.items():
                if isinstance(subcategory_value, list):
                    result[category_key][subcategory_key] = [
                        ExpenseProcessor.process_node(item)
                        for item in subcategory_value
                    ]
        return result

    @classmethod
    def load_and_process_from_file(
        cls, input_path: str, output_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Load JSON from a file, process it, and optionally save the result.

        :param input_path: path of the UTF-8 JSON file to read
        :param output_path: path to write the processed JSON to (optional);
            when given, a confirmation message is printed after writing
        :return: the complete processed data
        :raises ValueError: if ``projectData.expensePreview`` is missing
        """
        with open(input_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Validation and processing live in one place: process_raw_data.
        processed_data = cls.process_raw_data(data)

        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(processed_data, f, ensure_ascii=False, indent=4)
            print(f"处理完成,结果已保存到 {output_path}")

        return processed_data

    @classmethod
    def process_raw_data(cls, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process already-loaded data without any file I/O.

        :param raw_data: data that must contain ``projectData.expensePreview``
        :return: deep copy of *raw_data* whose ``expensePreview`` is processed
        :raises ValueError: if ``projectData.expensePreview`` is missing
        """
        project_data = raw_data.get("projectData") if isinstance(raw_data, dict) else None
        if not isinstance(project_data, dict) or "expensePreview" not in project_data:
            raise ValueError("未找到 projectData.expensePreview 路径")

        processed_data = copy.deepcopy(raw_data)
        processed_data["projectData"]["expensePreview"] = cls.process_expense_preview(
            project_data["expensePreview"]
        )
        return processed_data


if __name__ == "__main__":
    input_file = "架空_clean.json"  # input JSON file path
    output_file = "output.json"  # output JSON file path

    # Load and process the JSON file through the class method.
    ExpenseProcessor.load_and_process_from_file(input_file, output_file)