Files
KG_generation/transform_expense_preview.py
T
chentianrui 6afa368745 上传代码
2025-10-17 18:18:26 +08:00

1022 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import re
import uuid
def _determine_project_type(data):
    """Classify a project as "主网", "配网" or "技改" from its ``basicData``.

    The category text is read from ``basicData["软件类别"]``. When that entry
    is missing, blank, or not a string, ``basicData["软件名称"]`` is used
    instead. Surrounding whitespace is ignored before the lookup.

    :param data: project payload; expected to be a dict holding a
        ``basicData`` dict
    :return: "主网", "配网" or "技改"; ``None`` when the payload has no usable
        category text or the text is not a known spelling
    """
    # Known spellings of each category, mapped to the canonical name.
    category_mapping = {
        # 主网 and its variants
        "主网": "主网",
        "主网工程": "主网",
        "主网项目": "主网",
        # 配网 and its variants
        "配网": "配网",
        "配网造价": "配网",
        "配网清单": "配网",
        # 技改 and its variants
        "技改": "技改",
        "技改工程": "技改",
        "技改项目": "技改",
        "技改造价": "技改",
        "技改清单": "技改",
    }

    if not isinstance(data, dict):
        return None
    basic_data = data.get("basicData")
    if not isinstance(basic_data, dict):
        return None

    for field in ("软件类别", "软件名称"):
        raw = basic_data.get(field)
        # Only text can name a category; numbers, nulls and blank strings are
        # treated as "not provided" so the next field gets a chance.
        if isinstance(raw, str) and raw.strip():
            return category_mapping.get(raw.strip())
    return None
# Default entries merged into projectData.projectInfo, keyed by project type.
# Every listed field defaults to an empty string; to give a field another
# default, replace the ``dict.fromkeys`` call with an explicit dict literal.
PROJECT_INFO_ADDITIONS = {
    # 主网 (main grid) specific fields.
    "主网": dict.fromkeys(
        (
            "阶段类型",
            "是否结算量差工程",
        ),
        "",
    ),
    # 配网 (distribution grid): nothing is added at the moment.
    "配网": {},
    # 技改 (technical renovation): price-adjustment coefficients and the main
    # material delivery rate.
    "技改": dict.fromkeys(
        (
            "建筑材机按系数调差",
            "建筑修缮材机按系数调差",
            "建筑拆除材机按系数调差",
            "建筑拆除人工调差系数",
            "安装拆除机械调差系数",
            "安装拆除人工调差系数",
            "安装拆除材料调差系数",
            "安装人工调差系数",
            "安装材料调差系数",
            "安装机械调差系数",
            "建筑人工调差系数",
            "主材配送费费率",
        ),
        "",
    ),
}
def add_project_info_fields(data):
    """Fill in missing ``projectData.projectInfo`` fields for the project type.

    The project type is detected with ``_determine_project_type`` and the
    fields to add come from ``PROJECT_INFO_ADDITIONS``. Nothing happens when
    the type cannot be detected (including when detection raises) or when the
    type has no fields configured.

    ``projectData`` is created when absent, and ``projectInfo`` is replaced by
    a fresh dict when it is absent or not a dict. Existing keys are never
    overwritten.

    :param data: project payload, modified in place
    :return: ``None``
    """
    try:
        detected_type = _determine_project_type(data)
    except Exception:
        detected_type = None

    defaults = PROJECT_INFO_ADDITIONS.get(detected_type) if detected_type else None
    if not defaults:
        return

    container = data.setdefault("projectData", {})
    info = container.get("projectInfo")
    if not isinstance(info, dict):
        info = container["projectInfo"] = {}

    for field, default in defaults.items():
        # setdefault only writes when the key is absent.
        info.setdefault(field, default)
def add_adjustment_type_to_engineering_nodes(data):
    """Tag quantity nodes under ``projectData.projectDivision`` with "调差类型".

    A quantity node is a dict whose ``"type"`` or ``"类型"`` is one of
    "定额", "主材", "设备", or whose ``"类型"`` stringifies to "0", "1" or "5".

    The label is derived from the dict keys on the path down to the node:

    - section: the innermost enclosing key "建筑工程" gives the base "建筑",
      "安装工程" gives "安装";
    - subsection: the innermost enclosing key among "拆除", "建筑", "安装",
      "清理项目".

    Under "拆除" the label is "拆除" + base ("拆除建筑" / "拆除安装"); under
    the other three subsections it is the base alone. Nodes with no section
    or no subsection on their path are left alone, and an existing
    "调差类型" is never overwritten.

    NOTE(review): earlier documentation of this function described labels
    such as "建筑拆除" and a "清理" suffix; the implementation has never
    produced those. Confirm which form downstream consumers expect.

    Any exception is swallowed so that a malformed tree cannot break the
    surrounding conversion.

    :param data: project payload, modified in place
    :return: ``None``
    """
    section_bases = {"建筑工程": "建筑", "安装工程": "安装"}
    subsection_suffixes = {"拆除": "拆除", "建筑": "", "安装": "", "清理项目": ""}
    quantity_kinds = ("定额", "主材", "设备")
    quantity_codes = ("0", "1", "5")  # numeric encodings of the kinds above

    def _is_quantity_node(candidate):
        if not isinstance(candidate, dict):
            return False
        localized = candidate.get("类型")
        return (
            candidate.get("type") in quantity_kinds
            or localized in quantity_kinds
            or str(localized) in quantity_codes
        )

    def _label_for(base, subsection):
        suffix = subsection_suffixes.get(subsection)
        if suffix is None:
            # Unknown or missing subsection: stay silent rather than guess.
            return None
        if suffix == "拆除":
            return "拆除" + base
        return base + suffix

    def _walk(current, key=None, base=None, subsection=None):
        # The key under which `current` is stored may switch the context.
        base = section_bases.get(key, base)
        if key in subsection_suffixes:
            subsection = key

        if isinstance(current, list):
            for element in current:
                _walk(element, key, base, subsection)
            return
        if not isinstance(current, dict):
            return

        if base and "调差类型" not in current and _is_quantity_node(current):
            label = _label_for(base, subsection)
            if label is not None:
                current["调差类型"] = label

        # Snapshot the items: the dict may just have gained a key.
        for child_key, child in list(current.items()):
            _walk(child, child_key, base, subsection)

    try:
        division = data.get("projectData", {}).get("projectDivision")
        if isinstance(division, dict):
            _walk(division)
    except Exception:
        # Best effort only; never interrupt the main conversion.
        pass
def _fix_split_flag_without_children(root_node):
    """Reset "拆分" to "0" on nodes that claim to be split but have no children.

    Every dict in the tree is inspected. When a dict has a "拆分" entry whose
    string form is "1" and has no "children" key at all, the entry is
    rewritten to the string "0". A dict that has a "children" key, even an
    empty one, is left as it is. All errors are swallowed.

    :param root_node: dict/list tree, modified in place
    :return: ``None``
    """

    def _visit(current):
        if isinstance(current, list):
            for element in current:
                _visit(element)
            return
        if not isinstance(current, dict):
            return

        try:
            childless = "children" not in current
            if childless and "拆分" in current and str(current.get("拆分")) == "1":
                current["拆分"] = "0"
        except Exception:
            pass

        for value in list(current.values()):
            _visit(value)

    try:
        _visit(root_node)
    except Exception:
        pass
def _normalize_project_division_guid_keys(project_division_root):
    """Rename lowercase "guid" keys to "GUID" throughout the given tree.

    A dict is changed only when it has "guid" and does not already have
    "GUID"; the value is kept and the renamed key ends up last in the dict.
    Dicts holding both keys are left untouched. Only the tree passed in is
    walked. All errors are swallowed.

    :param project_division_root: dict/list tree, modified in place
    :return: ``None``
    """

    def _rename(current):
        if isinstance(current, dict):
            if "guid" in current and "GUID" not in current:
                try:
                    current["GUID"] = current.pop("guid")
                except Exception:
                    # Leave this dict as it is and keep going.
                    pass
            for child in list(current.values()):
                _rename(child)
        elif isinstance(current, list):
            for element in current:
                _rename(element)

    try:
        _rename(project_division_root)
    except Exception:
        # Best effort only; never interrupt the main conversion.
        pass
def _index_expense_preview(expense_preview):
    """Index the original expensePreview by GUID.

    :param expense_preview: ``projectData.expensePreview`` as loaded from JSON.
        Entries stored under a "{GUID}"-shaped key that lack their own "guid"
        get one added (the key itself), so later steps can rely on it.
    :return: ``(guid_to_data, guid_to_nested_guids)`` where the first maps a
        GUID spelling to the dict carrying it and the second maps a parent
        GUID to the GUIDs found beneath it
    """
    guid_to_data = {}
    guid_to_nested_guids = {}

    def _collect(obj, parent_guid=None):
        if isinstance(obj, dict):
            guid = obj.get("guid")
            # Only text GUIDs can be indexed (upper-casing anything else fails).
            if guid and isinstance(guid, str):
                guid_to_data[guid] = obj
                # Also index the upper-cased spelling to tolerate case drift.
                guid_to_data[guid.upper()] = obj
                if parent_guid:
                    guid_to_nested_guids.setdefault(parent_guid, []).append(guid)
            else:
                guid = None
            enclosing = guid if guid else parent_guid
            for value in obj.values():
                _collect(value, enclosing)
        elif isinstance(obj, list):
            for element in obj:
                _collect(element, parent_guid)

    for category_data in expense_preview.values():
        if not isinstance(category_data, dict):
            _collect(category_data)
            continue
        for key, item_data in category_data.items():
            if key.startswith("{") and key.endswith("}"):
                # Bill-of-quantities layout: the key itself is the GUID.
                parent_guid = key.strip("{}")
                _collect(item_data, parent_guid)
                if isinstance(item_data, dict):
                    if "guid" not in item_data:
                        item_data["guid"] = key
                    guid_to_data[parent_guid] = item_data
            elif isinstance(item_data, dict) and "guid" in item_data:
                # Plain layout: descriptive key, GUID stored inside the entry.
                guid = item_data["guid"]
                stripped = guid.strip("{}")
                for alias in (guid, stripped, guid.upper(), stripped.upper()):
                    guid_to_data[alias] = item_data
            else:
                _collect(item_data)
    return guid_to_data, guid_to_nested_guids


def _preview_from_division(project_division, guid_to_data, guid_to_nested_guids):
    """Build the new expensePreview from ``projectDivision["工程量"]``.

    Each non-empty list under "工程量" becomes one category; its "项目划分"
    entries are converted with ``build_project_hierarchy``.
    """
    preview = {}
    processed_guids = set()
    quantities = project_division.get("工程量")
    if not isinstance(quantities, dict):
        return preview
    for specialty_type, specialty_items in quantities.items():
        if not (isinstance(specialty_items, list) and specialty_items):
            continue
        print(f"处理专业类型: {specialty_type}")
        bucket = preview.setdefault(specialty_type, [])
        for item in specialty_items:
            # Stray non-dict entries are skipped instead of aborting the file.
            if not isinstance(item, dict) or item.get("type") != "项目划分":
                continue
            hierarchy = build_project_hierarchy(
                item, guid_to_data, guid_to_nested_guids, processed_guids
            )
            if hierarchy:
                bucket.append(hierarchy)
    return preview


def _preview_from_original(expense_preview):
    """Fallback: flatten the original expensePreview into per-category lists.

    Entries carrying a "guid" are copied with that key renamed to "GUID";
    for entries without one, their direct dict children carrying a "guid"
    are copied the same way.
    """

    def _with_upper_guid(entry):
        converted = {"GUID": entry["guid"]}
        converted.update((k, v) for k, v in entry.items() if k != "guid")
        return converted

    preview = {}
    for category, category_data in expense_preview.items():
        bucket = preview.setdefault(category, [])
        if not isinstance(category_data, dict):
            continue
        for item_data in category_data.values():
            if not isinstance(item_data, dict):
                continue
            if "guid" in item_data:
                bucket.append(_with_upper_guid(item_data))
                continue
            for nested_data in item_data.values():
                if isinstance(nested_data, dict) and "guid" in nested_data:
                    bucket.append(_with_upper_guid(nested_data))
    return preview


def transform_expense_preview(input_file, output_file):
    """Rebuild ``projectData.expensePreview`` to follow ``projectDivision``.

    Steps:

    1. Remove every projectDivision node flagged ``"删除": "1"`` (or ``1``)
       and rename "guid" keys to "GUID" inside projectDivision.
    2. Index the original expensePreview by GUID.
    3. Generate a new expensePreview from the "项目划分" entries of
       ``projectDivision["工程量"]``, attaching the indexed data; when that
       yields nothing, fall back to a flattened copy of the original.
    4. Drop self-referencing children, repair "拆分" flags, add projectInfo
       defaults and "调差类型" tags, then write the result.

    Any exception is caught and reported on stdout; in that case nothing is
    written to ``output_file``.

    :param input_file: path of the JSON file to read
    :param output_file: path of the JSON file to write (may equal
        ``input_file``)
    :return: ``None``
    """
    print(f"正在读取文件: {input_file}")
    try:
        with open(input_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        print("JSON文件加载成功")

        original_expense_preview = data.get("projectData", {}).get("expensePreview", {})
        project_division = data.get("projectData", {}).get("projectDivision", {})
        print(f"原始expensePreview中的顶级分类: {list(original_expense_preview.keys())}")
        print(f"projectDivision中的顶级分类: {list(project_division.keys())}")

        # Sentinel meaning "drop this node". Using None for that would also
        # drop genuine JSON nulls, which carry no delete flag.
        deleted = object()

        def _filter_deleted_nodes(obj):
            if isinstance(obj, dict):
                if obj.get("删除") in ("1", 1):
                    return deleted
                kept = {}
                for key, value in obj.items():
                    filtered = _filter_deleted_nodes(value)
                    if filtered is not deleted:
                        kept[key] = filtered
                return kept
            if isinstance(obj, list):
                survivors = []
                for element in obj:
                    filtered = _filter_deleted_nodes(element)
                    if filtered is not deleted:
                        survivors.append(filtered)
                return survivors
            return obj

        cleaned_project_division = _filter_deleted_nodes(project_division)
        if cleaned_project_division is deleted or not cleaned_project_division:
            cleaned_project_division = {}
        if cleaned_project_division != project_division:
            print("已根据 '删除' 标记清理 projectDivision 中的节点")
        project_division = cleaned_project_division

        _normalize_project_division_guid_keys(project_division)
        # Write the cleaned tree back so later steps and the saved file agree.
        if "projectData" in data:
            data["projectData"]["projectDivision"] = project_division

        guid_to_data, guid_to_nested_guids = _index_expense_preview(original_expense_preview)
        print(f"找到 {len(guid_to_data)} 个GUID映射")
        print(f"找到 {len(guid_to_nested_guids)} 个嵌套GUID关系")

        new_expense_preview = _preview_from_division(
            project_division, guid_to_data, guid_to_nested_guids
        )
        if not new_expense_preview:
            print("未从projectDivision中找到数据,保留原始结构")
            new_expense_preview = _preview_from_original(original_expense_preview)

        remove_self_references(new_expense_preview)
        print(f"新expensePreview中的顶级分类: {list(new_expense_preview.keys())}")
        data["projectData"]["expensePreview"] = new_expense_preview

        # Repair nodes marked as split that have no children.
        try:
            _fix_split_flag_without_children(data.get("projectData", {}).get("projectDivision", {}))
            _fix_split_flag_without_children(data.get("projectData", {}).get("expensePreview", {}))
        except Exception:
            pass

        add_project_info_fields(data)
        add_adjustment_type_to_engineering_nodes(data)

        print(f"正在保存文件: {output_file}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print("转换完成!")
    except Exception as e:
        print(f"处理过程中出错: {str(e)}")
def _copy_preview_attributes(source, target):
    """Copy expense-preview attributes from ``source`` onto ``target``.

    Skipped: the "guid" key (the node already carries "GUID") and any key
    shaped like "{...}" (those are nested GUID entries, not attributes).
    A list under "children" is copied element by element, keeping only dicts
    that have an "id" or a "cost" key.
    """
    for key, value in source.items():
        if key == "guid":
            continue
        if isinstance(key, str) and key.startswith("{") and key.endswith("}"):
            continue
        if key == "children" and isinstance(value, list):
            kept = target.setdefault("children", [])
            kept.extend(
                child
                for child in value
                if isinstance(child, dict) and ("id" in child or "cost" in child)
            )
        else:
            target[key] = value


def build_project_hierarchy(item, guid_to_data, guid_to_nested_guids, processed_guids=None):
    """Build the expense-preview node for one projectDivision item.

    The node starts as ``{"GUID": <item GUID>}``, receives the attributes
    indexed for that GUID (looked up without braces), then the nested GUID
    nodes recorded for it, then the nodes of the item's own "项目划分"
    children.

    :param item: projectDivision dict with a "GUID" and optional "children"
    :param guid_to_data: GUID -> original expense-preview dict
    :param guid_to_nested_guids: parent GUID -> list of nested GUIDs
    :param processed_guids: optional set of GUIDs already emitted; updated in
        place. GUIDs are recorded in braced form "{...}" so that braced and
        unbraced spellings of the same GUID are recognised as one.
    :return: the new node, or ``None`` when the item has no GUID or was
        already processed
    """
    guid = item.get("GUID")
    if not guid:
        return None

    guid_stripped = guid.strip("{}")
    canonical = "{" + guid_stripped + "}"
    if processed_guids is not None:
        if guid in processed_guids or canonical in processed_guids:
            return None
        processed_guids.update((guid, canonical))

    project_node = {"GUID": guid}
    if guid_stripped in guid_to_data:
        _copy_preview_attributes(guid_to_data[guid_stripped], project_node)

    # Attach the GUID nodes that were nested under this one in the original.
    build_nested_hierarchy(
        project_node, guid_stripped, guid_to_data, guid_to_nested_guids, processed_guids
    )

    children = item.get("children", [])
    if children:
        if "children" not in project_node:
            project_node["children"] = []
        for child in children:
            if not isinstance(child, dict) or child.get("type") != "项目划分":
                continue
            child_node = build_project_hierarchy(
                child, guid_to_data, guid_to_nested_guids, processed_guids
            )
            if not child_node:
                continue
            child_guid = child_node.get("GUID")
            # Never hang a node beneath itself, whatever the brace spelling.
            if isinstance(child_guid, str) and child_guid.strip("{}") == guid_stripped:
                continue
            project_node["children"].append(child_node)
    return project_node


def build_nested_hierarchy(node, guid, guid_to_data, guid_to_nested_guids, processed_guids=None):
    """Recursively append the GUID nodes nested under ``guid`` to ``node``.

    When ``guid`` has an entry in ``guid_to_nested_guids``, ``node`` is given
    a "children" list (if it has none) and one child per nested GUID is
    appended. Each child is ``{"GUID": "{...}"}`` plus the attributes indexed
    for that GUID, followed by its own nested nodes.

    GUIDs are compared without braces and tracked in ``processed_guids`` in
    braced form, so a GUID that is already braced is neither duplicated nor
    attached beneath itself.

    :param node: node dict to extend in place
    :param guid: key of ``node`` in ``guid_to_nested_guids``
    :param guid_to_data: GUID -> original expense-preview dict
    :param guid_to_nested_guids: parent GUID -> list of nested GUIDs
    :param processed_guids: optional set of GUIDs already emitted; updated in
        place
    :return: ``None``
    """
    if guid not in guid_to_nested_guids:
        return

    children = node.setdefault("children", [])
    own = guid.strip("{}")
    for nested_guid in guid_to_nested_guids[guid]:
        normalized = nested_guid.strip("{}")
        if normalized == own:
            continue  # self-reference
        braced = "{" + normalized + "}"
        if processed_guids is not None:
            if braced in processed_guids:
                continue
            processed_guids.add(braced)

        nested_node = {"GUID": braced}
        if nested_guid in guid_to_data:
            _copy_preview_attributes(guid_to_data[nested_guid], nested_node)
        build_nested_hierarchy(
            nested_node, nested_guid, guid_to_data, guid_to_nested_guids, processed_guids
        )
        children.append(nested_node)
def remove_self_references(expense_preview):
    """Strip self-referencing children from every node of the expense preview.

    Each list-valued category is walked and every entry is handed to
    ``remove_self_references_from_node``; categories that are not lists are
    ignored.

    :param expense_preview: mapping of category name to list of nodes,
        modified in place
    :return: ``None``
    """
    for nodes in expense_preview.values():
        if not isinstance(nodes, list):
            continue
        for node in nodes:
            remove_self_references_from_node(node)
def remove_self_references_from_node(node):
    """Recursively delete children whose GUID equals their parent's GUID.

    GUIDs are considered equal when they are identical or when they match
    after leading and trailing braces are stripped, so "{X}", "X" and "{{X}}"
    all name the same node. Children that survive and carry a GUID are
    processed recursively; children without a GUID are kept untouched.
    Nothing happens when ``node`` is not a dict, has no GUID, or has no list
    under "children".

    :param node: expense-preview node, modified in place
    :return: ``None``
    """
    if not isinstance(node, dict):
        return
    guid = node.get("GUID")
    children = node.get("children")
    if not guid or not isinstance(children, list):
        return

    survivors = []
    for child in children:
        child_guid = child.get("GUID") if isinstance(child, dict) else None
        if child_guid:
            if child_guid == guid or child_guid.strip("{}") == guid.strip("{}"):
                continue  # self-reference: drop it together with its subtree
            remove_self_references_from_node(child)
        survivors.append(child)

    # Replace the contents in place so other references to the list stay valid.
    children[:] = survivors
def find_node_in_expense_preview(expense_preview, target_guid):
    """Return the first expense-preview node whose "GUID" equals ``target_guid``.

    Categories are searched in order; within a category each top-level entry
    is searched with ``find_node``. Categories that are not lists are skipped.

    :param expense_preview: mapping of category name to list of nodes
    :param target_guid: GUID to look for (compared exactly)
    :return: the matching node, or ``None`` when there is none
    """
    hits = (
        find_node(entry, target_guid)
        for entries in expense_preview.values()
        if isinstance(entries, list)
        for entry in entries
    )
    # The generators are lazy, so the search stops at the first match.
    return next((hit for hit in hits if hit), None)
def find_node(node, target_guid):
    """Depth-first search for the node whose "GUID" equals ``target_guid``.

    ``node`` itself is checked first, then its "children" in order, each
    searched recursively.

    :param node: node dict with optional "children"
    :param target_guid: GUID to look for (compared exactly)
    :return: the first matching node, or ``None`` when there is none
    """
    if node.get("GUID") == target_guid:
        return node
    hits = (find_node(child, target_guid) for child in node.get("children", []))
    # The generator is lazy, so siblings after the first match are not visited.
    return next((hit for hit in hits if hit), None)
def transform_json_types(input_file_path, output_file_path=None):
    """Convert coded field values of a 主网 project file to readable labels.

    Processing order:

    1. Remove every projectDivision node flagged ``"删除": "1"`` (or ``1``)
       and rename "guid" keys to "GUID" inside projectDivision. A failure in
       this step is reported on stdout and the conversion continues.
    2. Walk the whole document and, for each dict:

       - map "类型" codes to names ("0" -> "定额", "8" -> "清单", ...);
       - for codes "0"/"1"/"5", move "id" to "GUID" and map "费用类型";
       - for codes "1"/"5", map "供货方"; for code "5", map "设备类型";
       - add "type" when absent, equal to "类型" except that "材料" becomes
         "消材";
       - map "定额范围" and "脚手架计取" values "1"/"0" to their labels;
       - reduce a "特征段" string such as "特征1" or "特征段1" to "1".
    3. Repair "拆分" flags, add projectInfo defaults and "调差类型" tags.
    4. Write the result as UTF-8 JSON.

    :param input_file_path: path of the JSON file to read
    :param output_file_path: path to write; ``None`` overwrites the input file
    :return: the converted document
    """
    type_mapping = {
        "8": "清单",
        "0": "定额",
        "1": "主材",
        "5": "设备",
        "2": "人工",
        "3": "材料",
        "4": "机械",
        "16": "一笔性费用",
    }
    device_type_mapping = {"0": "普通设备"}
    supplier_mapping = {"1": "甲供", "2": "乙供"}
    fee_type_mapping = {"0": "取费", "1": "不取费"}
    quota_scope_mapping = {"1": "预算", "0": "概算"}
    scaffold_mapping = {"1": "计取", "0": "不计取"}
    quantity_codes = ("0", "1", "5")  # 定额 / 主材 / 设备
    feature_pattern = re.compile(r"^\s*特征(?:段)?\s*(\d+)\s*$")

    with open(input_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Sentinel meaning "drop this node". Using None for that would also drop
    # genuine JSON nulls, which carry no delete flag.
    deleted = object()

    def _filter_deleted_nodes(obj):
        if isinstance(obj, dict):
            if obj.get("删除") in ("1", 1):
                return deleted
            kept = {}
            for key, value in obj.items():
                filtered = _filter_deleted_nodes(value)
                if filtered is not deleted:
                    kept[key] = filtered
            return kept
        if isinstance(obj, list):
            survivors = []
            for element in obj:
                filtered = _filter_deleted_nodes(element)
                if filtered is not deleted:
                    survivors.append(filtered)
            return survivors
        return obj

    try:
        pd = data.get("projectData", {}).get("projectDivision", {})
        cleaned_pd = _filter_deleted_nodes(pd)
        if cleaned_pd is deleted or not cleaned_pd:
            cleaned_pd = {}
        if cleaned_pd != pd:
            print("[主网] 已根据 '删除' 标记清理 projectDivision 中的节点")
        _normalize_project_division_guid_keys(cleaned_pd)
        if "projectData" in data:
            data["projectData"]["projectDivision"] = cleaned_pd
    except Exception as exc:
        # Cleanup is best effort, but say so instead of failing silently.
        print(f"[主网] 清理 projectDivision 时出错: {exc}")

    def _relabel(obj, key, mapping, strip=False):
        """Replace obj[key] by its label when its string form is a known code."""
        code = str(obj[key])
        if strip:
            code = code.strip()
        if code in mapping:
            obj[key] = mapping[code]

    def traverse(obj):
        if isinstance(obj, list):
            for element in obj:
                traverse(element)
            return
        if not isinstance(obj, dict):
            return

        if "类型" in obj:
            # Decisions below use the original code, not the mapped name.
            current_type = str(obj["类型"])
            if current_type in type_mapping:
                obj["类型"] = type_mapping[current_type]
            if current_type in quantity_codes:
                if "id" in obj:
                    obj["GUID"] = obj.pop("id")
                if "费用类型" in obj:
                    _relabel(obj, "费用类型", fee_type_mapping)
            if current_type in ("1", "5") and "供货方" in obj:
                _relabel(obj, "供货方", supplier_mapping)
            if current_type == "5" and "设备类型" in obj:
                _relabel(obj, "设备类型", device_type_mapping)
            if "type" not in obj:
                obj["type"] = "消材" if obj["类型"] == "材料" else obj["类型"]

        if "定额范围" in obj:
            _relabel(obj, "定额范围", quota_scope_mapping, strip=True)
        if "脚手架计取" in obj:
            _relabel(obj, "脚手架计取", scaffold_mapping, strip=True)

        segment = obj.get("特征段")
        if isinstance(segment, str):
            matched = feature_pattern.match(segment.strip())
            if matched:
                # Written back as a string so the JSON keeps it quoted.
                obj["特征段"] = matched.group(1)

        for value in obj.values():
            traverse(value)

    traverse(data)

    # Repair nodes marked as split that have no children (key trees only).
    try:
        _fix_split_flag_without_children(data.get("projectData", {}).get("projectDivision", {}))
        _fix_split_flag_without_children(data.get("projectData", {}).get("expensePreview", {}))
    except Exception:
        pass

    add_project_info_fields(data)
    add_adjustment_type_to_engineering_nodes(data)

    if output_file_path is None:
        output_file_path = input_file_path
    with open(output_file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return data
def add_missing_guids_to_nodes(file_path):
    """Give every 定额/主材/设备 node under ``projectData`` a GUID if it has none.

    A node qualifies when its "type" is "定额", "主材" or "设备" and it has
    neither a "guid" nor a "GUID" key. The generated value is an upper-case
    UUID4 wrapped in braces, stored under the lowercase key "guid". The file
    is rewritten in place, whether or not anything was generated.

    Errors are reported on stdout with a traceback and are not re-raised.

    :param file_path: path of the JSON file to update in place
    :return: ``None``
    """
    guid_bearing_types = ["定额", "主材", "设备"]

    def _assign(node):
        """Process ``node`` recursively; return the number of GUIDs created."""
        created = 0
        if isinstance(node, list):
            for element in node:
                created += _assign(element)
            return created
        if not isinstance(node, dict):
            return created

        node_type = node.get("type", "")
        has_guid = "guid" in node or "GUID" in node
        if node_type in guid_bearing_types and not has_guid:
            new_guid = "{" + str(uuid.uuid4()).upper() + "}"
            node["guid"] = new_guid
            created += 1
            print(f"为{node_type}节点生成GUID: {new_guid}")

        for value in node.values():
            if isinstance(value, (dict, list)):
                created += _assign(value)
        return created

    try:
        print(f"正在为缺少GUID的节点生成GUID: {file_path}")
        with open(file_path, "r", encoding="utf-8") as source:
            data = json.load(source)

        generated_count = _assign(data["projectData"]) if "projectData" in data else 0

        with open(file_path, "w", encoding="utf-8") as sink:
            json.dump(data, sink, ensure_ascii=False, indent=2)
        print(f"✅ 成功为 {generated_count} 个节点生成了GUID")
    except Exception as exc:
        print(f"❌ 为节点生成GUID时出错: {exc}")
        import traceback

        traceback.print_exc()
def process_directory(directory_path):
    """Convert every ``*.json`` file directly inside ``directory_path`` in place.

    Each file is loaded once to detect its project type, then:

    - "主网" files go through ``transform_json_types``;
    - "配网" and "技改" files go through ``transform_expense_preview``
      followed by ``add_missing_guids_to_nodes``;
    - files of unknown type are skipped.

    Files are processed in sorted name order. A failure on one file is
    reported on stdout and does not stop the batch.

    :param directory_path: directory holding the JSON files; a missing path
        or a path that is not a directory is reported and nothing is done
    :return: ``None``
    """
    print(f"开始处理目录: {directory_path}")

    # isdir (not exists): a regular file here would make os.listdir raise.
    if not os.path.isdir(directory_path):
        print(f"错误: 目录 {directory_path} 不存在")
        return

    # Sorted so that runs are reproducible; os.listdir order is arbitrary.
    json_files = sorted(
        name for name in os.listdir(directory_path) if name.lower().endswith(".json")
    )
    if not json_files:
        print(f"警告: 目录 {directory_path} 中没有找到JSON文件")
        return
    print(f"找到 {len(json_files)} 个JSON文件")

    for json_file in json_files:
        file_path = os.path.join(directory_path, json_file)
        print(f"\n处理文件: {file_path}")
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            project_type = _determine_project_type(data)
            if not project_type:
                print("无法确定项目类型,跳过处理")
                continue

            print(f"检测到项目类型: {project_type}")
            if project_type == "主网":
                print("应用主网转换...")
                transform_json_types(file_path)  # overwrites the file
            elif project_type in ["配网", "技改"]:
                print(f"应用{project_type}转换...")
                transform_expense_preview(file_path, file_path)  # overwrites the file
                add_missing_guids_to_nodes(file_path)
            else:
                print(f"未知项目类型: {project_type},跳过处理")
        except Exception as e:
            print(f"处理文件 {file_path} 时出错: {str(e)}")

    print("\n批量处理完成!")
if __name__ == "__main__":
    # Script entry point: batch-convert every JSON file in the default input
    # directory, overwriting each file in place.
    #
    # To convert a single file instead, call
    # transform_expense_preview(input_file, output_file) with explicit paths.
    json_directory = "data/input/json"
    process_directory(json_directory)