Files
KG_generation/supplement_kg.py
T
chentianrui 9609bb67b4 上传文件
2025-08-01 15:31:56 +08:00

233 lines
8.9 KiB
Python

"""
第四步:实现项目划分和清单节点费用预览向上汇总
"""
import copy
import glob
import json
import os
from typing import Any, Dict, List, Optional, Tuple
class ExpenseProcessor:
    """Roll the ``sum`` cost entries of expense-preview nodes up into their ancestors.

    A node is a dict that may carry ``sum`` (a list of ``{"id", "cost"}``
    entries), ``children`` (a list of nested nodes) and ``rcj``. All methods
    are stateless; none of them mutates the data passed in by the caller.
    """

    def __init__(self):
        pass

    @staticmethod
    def calculate_parent_costs(node: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the aggregated cost entries for *node* and all its descendants.

        The node's own ``sum`` entries come first (deep-copied, keeping any
        extra attributes); entries gathered from descendants are then merged
        in by ``id``: a matching id has its ``cost`` replaced by
        ``str(float(a) + float(b))``, an unseen id is appended.

        Only entries that have both ``id`` and ``cost`` are considered, so
        every returned entry carries both keys.

        :param node: expense-preview node
        :return: list of aggregated cost entries (new objects)
        :raises ValueError: if a ``cost`` that has to be added is not numeric
        :raises TypeError: if a ``cost`` that has to be added is not numeric
        """
        if not isinstance(node, dict):
            return []

        merged: List[Dict[str, Any]] = []
        # id -> first entry in ``merged`` with that id, replacing a linear
        # scan per merged item. Assumes ids are hashable (JSON scalars).
        first_by_id: Dict[Any, Dict[str, Any]] = {}

        own_items = node.get("sum")
        if isinstance(own_items, list):
            for cost_item in own_items:
                if isinstance(cost_item, dict) and "id" in cost_item and "cost" in cost_item:
                    entry = copy.deepcopy(cost_item)
                    merged.append(entry)
                    # Duplicate ids inside one ``sum`` are kept as separate
                    # entries; later additions go to the first occurrence.
                    first_by_id.setdefault(entry["id"], entry)

        children = node.get("children")
        if isinstance(children, list):
            for child in children:
                # The recursive result is a fresh list owned by this call, so
                # its entries can be adopted without another copy.
                for cost_item in ExpenseProcessor.calculate_parent_costs(child):
                    existing = first_by_id.get(cost_item["id"])
                    if existing is None:
                        merged.append(cost_item)
                        first_by_id[cost_item["id"]] = cost_item
                    else:
                        existing["cost"] = str(float(existing["cost"]) + float(cost_item["cost"]))
        return merged

    @staticmethod
    def process_node(node: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *node* whose ``sum`` holds the rolled-up costs.

        Nodes without a non-empty ``children`` list are copied unchanged,
        apart from gaining empty ``sum`` / ``children`` / ``rcj`` lists when
        those keys are missing. For nodes with children, ``sum`` is replaced
        by the aggregated entries (reduced to ``id`` and ``cost``) and every
        child is processed recursively. Values that are not dicts are
        returned as an untouched copy.

        NOTE(review): the node's own ``sum`` is part of the roll-up, so
        processing data that has already been processed adds the children's
        costs to the parent a second time.

        :param node: expense-preview node
        :return: processed copy of the node
        """
        if not isinstance(node, dict):
            return copy.deepcopy(node)

        children = node.get("children")
        if not isinstance(children, list) or not children:
            # Nothing to aggregate: only normalise the node's shape.
            leaf = copy.deepcopy(node)
            leaf.setdefault("sum", [])
            leaf.setdefault("children", [])
            leaf.setdefault("rcj", [])
            return leaf

        # Copy everything except the children, which are rebuilt below;
        # deep-copying them here would duplicate each subtree once per level.
        shell = copy.copy(node)
        shell["children"] = []
        result = copy.deepcopy(shell)

        cost_items = ExpenseProcessor.calculate_parent_costs(node)
        # Keep only id and cost on the aggregated entries.
        result["sum"] = [{"id": item["id"], "cost": item["cost"]} for item in cost_items]
        result["children"] = [ExpenseProcessor.process_node(child) for child in children]
        result.setdefault("rcj", [])
        return result

    @staticmethod
    def process_expense_preview(expense_preview: Dict[str, Any]) -> Dict[str, Any]:
        """Process every node list found two levels deep in *expense_preview*.

        The structure is walked as ``{category: {subcategory: value}}``. Each
        list-valued subcategory is replaced by its processed nodes; every
        other value (including categories that are not dicts) is deep-copied
        unchanged.

        :param expense_preview: expense-preview structure
        :return: processed copy of the structure
        """
        result: Dict[str, Any] = {}
        for category_key, category_value in expense_preview.items():
            if not isinstance(category_value, dict):
                # Scalar/list categories have no subcategories to walk.
                result[category_key] = copy.deepcopy(category_value)
                continue
            processed_category: Dict[str, Any] = {}
            for subcategory_key, subcategory_value in category_value.items():
                if isinstance(subcategory_value, list):
                    processed_category[subcategory_key] = [
                        ExpenseProcessor.process_node(item) for item in subcategory_value
                    ]
                else:
                    processed_category[subcategory_key] = copy.deepcopy(subcategory_value)
            result[category_key] = processed_category
        return result

    @staticmethod
    def _has_expense_preview(data: Any) -> bool:
        """Return True when *data* is a dict whose dict ``projectData`` has an ``expensePreview`` key."""
        if not isinstance(data, dict):
            return False
        project_data = data.get("projectData")
        return isinstance(project_data, dict) and "expensePreview" in project_data

    @classmethod
    def load_and_process_from_file(
        cls, input_path: str, output_path: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Load a JSON file, process its ``projectData.expensePreview`` and optionally save it.

        Any error is reported on stdout and turned into a ``None`` result so
        that a batch run can continue with the next file.

        :param input_path: path of the JSON file to read
        :param output_path: path to write the processed JSON to (optional)
        :return: the processed document, or ``None`` if the path
            ``projectData.expensePreview`` is missing or an error occurred
        """
        try:
            with open(input_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not cls._has_expense_preview(data):
                print(f"警告: 文件 {input_path} 中未找到 projectData.expensePreview 路径")
                return None
            # ``data`` was just loaded and is owned by this call, so it can be
            # updated in place instead of being deep-copied first.
            project_data = data["projectData"]
            project_data["expensePreview"] = cls.process_expense_preview(
                project_data["expensePreview"]
            )
            if output_path:
                # Serialise before opening the target so that a serialisation
                # failure cannot leave a truncated file behind (the output may
                # be the input file itself).
                payload = json.dumps(data, ensure_ascii=False, indent=4)
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(payload)
                print(f"处理完成,结果已保存到 {output_path}")
            return data
        except Exception as e:  # batch boundary: report and carry on
            print(f"处理文件 {input_path} 时出错: {e}")
            return None

    @classmethod
    def process_raw_data(cls, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process in-memory data without touching the file system.

        :param raw_data: document expected to contain ``projectData.expensePreview``
        :return: processed deep copy of *raw_data*
        :raises ValueError: if ``projectData.expensePreview`` cannot be found
        """
        if not cls._has_expense_preview(raw_data):
            raise ValueError("未找到 projectData.expensePreview 路径")
        processed_data = copy.deepcopy(raw_data)
        processed_data["projectData"]["expensePreview"] = cls.process_expense_preview(
            raw_data["projectData"]["expensePreview"]
        )
        return processed_data

    @classmethod
    def process_directory(cls, input_dir: str, output_dir: str) -> List[Tuple[str, str]]:
        """Process every ``*.json`` file directly inside *input_dir*.

        Each file is written to *output_dir* under the same file name; when
        both directories are the same the inputs are overwritten.

        :param input_dir: directory containing the JSON files to process
        :param output_dir: directory that receives the processed files
            (created if missing)
        :return: ``[(source path, output path), ...]`` for the files that
            were processed successfully
        """
        os.makedirs(output_dir, exist_ok=True)

        # os.listdir returns names in arbitrary order; sort for a stable run.
        json_files = []
        for file_name in sorted(os.listdir(input_dir)):
            candidate = os.path.join(input_dir, file_name)
            if file_name.lower().endswith(".json") and os.path.isfile(candidate):
                json_files.append(candidate)

        if not json_files:
            print(f"警告: 在目录 {input_dir} 中没有找到JSON文件")
            return []

        successful_files: List[Tuple[str, str]] = []
        for input_file in json_files:
            file_name = os.path.basename(input_file)
            output_file = os.path.join(output_dir, file_name)
            print(f"处理文件: {input_file}")
            processed_data = cls.load_and_process_from_file(input_file, output_file)
            if processed_data is not None:
                successful_files.append((input_file, output_file))
                print(f"✅ 成功处理: {file_name}")
            else:
                print(f"❌ 处理失败: {file_name}")
        return successful_files
def costsummary_upwards(input_dir: str, output_dir: str) -> List[Tuple[str, str]]:
    """Roll up expense-preview costs for every JSON file in a directory.

    Convenience entry point that delegates to
    ``ExpenseProcessor.process_directory``.

    :param input_dir: directory containing the JSON files to process
    :param output_dir: directory that receives the processed JSON files
    :return: successfully processed files as ``[(source path, output path), ...]``
    """
    processed_pairs = ExpenseProcessor.process_directory(input_dir, output_dir)
    return processed_pairs
if __name__ == "__main__":
    # Example run: input and output point at the same folder, so each
    # processed file is written back under its original name.
    input_directory = "final_outputs"
    output_directory = "final_outputs"

    result = costsummary_upwards(input_directory, output_directory)

    # Report the outcome of the batch.
    if not result:
        print("\n没有文件被成功处理")
    else:
        print(f"\n成功处理了 {len(result)} 个文件:")
        for pair in result:
            source_name = os.path.basename(pair[0])
            target_name = os.path.basename(pair[1])
            print(f" {source_name} -> {target_name}")