Files
KG_generation/cost_comparison.py
chentianrui 6afa368745 上传代码
2025-10-17 18:18:26 +08:00

376 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
批量对比多个 calculation_results.json 与 project_data.json 中的费用
"""
import json
import os
import re
from difflib import SequenceMatcher
def extract_guid_from_filename(filename):
    """Return the first GUID found in ``filename``, upper-cased.

    A GUID here is the 8-4-4-4-12 hexadecimal form; matching is
    case-insensitive.  Returns ``None`` when the name contains no GUID.
    """
    guid_regex = r"([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})"
    found = re.search(guid_regex, filename, re.IGNORECASE)
    if found is None:
        return None
    return found.group(1).upper()
def normalize_cost_name(name):
    """Reduce a cost name to a canonical form used for fuzzy matching.

    Steps, in order:
    1. Drop a trailing ``_suffix`` such as ``_GJJ``.  Because ``\\w`` also
       matches ``_``, the cut starts at the earliest underscore that is
       followed only by word characters up to the end of the string.
    2. Remove every remaining non-word character (spaces, punctuation).
    3. Lower-case the result.
    """
    without_suffix = re.sub(r"_\w+$", "", name)
    word_chars_only = re.sub(r"[^\w]", "", without_suffix)
    return word_chars_only.lower()
def calculate_similarity(a, b):
    """Return the ``difflib.SequenceMatcher`` ratio of ``a`` and ``b``.

    The ratio is a float in ``[0, 1]``; identical inputs give ``1.0``.
    """
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()
def load_calculation_results(json_file_path, project_type: str = "budget"):
    """Read a calculation-result JSON file and total its numeric values by name.

    Expected layouts:
    - ``"budget"`` (default, and any value other than ``"inventory"``):
      two levels, ``{node: {cost_name: number, ...}, ...}``.
    - ``"inventory"``: three levels,
      ``{category: {item: {metric_name: number, ...}}}``.

    In both cases the innermost dictionaries are walked and every numeric
    value is added to the running total for its key.  Entries that do not fit
    the expected layout (non-dict containers, non-numeric values) are skipped.

    :param json_file_path: path of the UTF-8 encoded JSON file.
    :param project_type: ``"inventory"`` or ``"budget"``.
    :return: ``{name: float_total}``; empty when the top-level JSON value is
        not an object.
    """
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # A top-level list/scalar/null has no named nodes to aggregate.
    if not isinstance(data, dict):
        return {}

    # Descend to the innermost dictionaries, keeping the file's own order:
    # one level down for budget files, two levels down for inventory files.
    nesting = 2 if project_type == "inventory" else 1
    layer = [data]
    for _ in range(nesting):
        layer = [
            child
            for parent in layer
            for child in parent.values()
            if isinstance(child, dict)
        ]

    total_costs = {}
    for leaf in layer:
        for name, value in leaf.items():
            # bool is a subclass of int; JSON true/false are flags, not money.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            total_costs[name] = total_costs.get(name, 0) + float(value)
    return total_costs
def find_node_by_guid(expense_preview, target_guid):
    """Depth-first search of ``expensePreview`` for the node with a given GUID.

    Every dictionary that carries a non-empty string ``"GUID"`` is compared
    with ``target_guid``.  Both sides are compared after trimming surrounding
    whitespace and braces and upper-casing, so ``"{abc}"`` and ``"ABC"`` are
    considered equal.

    A GUID node is expected to hold three lists:
    - ``sum``: cost entries (``id`` / ``cost``) -- this is what is returned;
    - ``children``: nested nodes, searched recursively;
    - ``rcj``: other data, not interpreted here.

    :param expense_preview: arbitrarily nested dict/list structure.
    :param target_guid: GUID to look for.
    :return: the matching node's ``"sum"`` value (``[]`` when the key is
        absent), or ``None`` when no node matches.
    """

    def normalize(guid):
        return guid.strip().strip("{}").strip().upper()

    wanted = normalize(target_guid) if isinstance(target_guid, str) else target_guid

    def search(node):
        """Return the ``sum`` list of the first matching node below ``node``."""
        if isinstance(node, dict):
            guid = node.get("GUID", "")
            if isinstance(guid, str) and guid and normalize(guid) == wanted:
                return node.get("sum", [])

            # Descend into ``children`` first so nested GUID nodes are found
            # before unrelated sibling keys are explored.
            children = node.get("children")
            children_searched = isinstance(children, list)
            if children_searched:
                found = search(children)
                if found is not None:
                    return found

            # Then walk every other value, which covers wrapper levels such as
            # {"建筑工程": {"拆除": [...]}}.  ``children`` is skipped when it
            # was already searched: visiting it twice would double the work at
            # every nesting level without changing the result.
            for key, value in node.items():
                if children_searched and key == "children":
                    continue
                found = search(value)
                if found is not None:
                    return found
            return None

        if isinstance(node, list):
            for item in node:
                found = search(item)
                if found is not None:
                    return found
            return None

        # Scalars (str / number / None) cannot contain nodes.
        return None

    return search(expense_preview)
def load_project_data_and_find_costs(project_json_path, target_guid):
    """Read the project JSON and return the reference costs of one GUID node.

    The project type is determined and reported, but budget and inventory
    projects are currently processed the same way.

    The node is looked up under ``projectData.expensePreview``; each entry of
    its ``sum`` list contributes ``id`` (with a trailing ``_suffix`` removed)
    and ``cost`` (converted to ``float``).

    :param project_json_path: path of the UTF-8 encoded project JSON file.
    :param target_guid: GUID of the node whose costs are wanted.
    :return: ``{clean_id: cost}``; empty when the node is not found or has no
        cost entries.  A missing, null or unparsable ``cost`` counts as
        ``0.0``; entries without an ``id`` are ignored.  When several entries
        share the same cleaned id, the last one wins.
    """
    with open(project_json_path, "r", encoding="utf-8") as f:
        project_data = json.load(f)

    project_type = _determine_project_type(project_data)
    if project_type == "budget":
        print("项目类型:预算工程(按现有逻辑处理)")
    else:
        print("项目类型:清单工程(暂按预算同样逻辑处理,若需差异化请告知)")

    # "projectData" may be missing or null in the file.
    project_section = project_data.get("projectData")
    if isinstance(project_section, dict):
        expense_preview = project_section.get("expensePreview", {})
    else:
        expense_preview = {}

    sum_items = find_node_by_guid(expense_preview, target_guid)
    if not sum_items:
        print(f"未找到 GUID 为 {target_guid} 的节点")
        return {}

    external_costs = {}
    for item in sum_items:
        if not isinstance(item, dict):
            continue
        item_id = item.get("id", "")
        if not item_id:
            continue
        try:
            cost_val = float(item.get("cost", "0"))
        except (TypeError, ValueError):
            # TypeError covers a JSON null (or other non-numeric type) cost.
            cost_val = 0.0
        clean_id = re.sub(r"_\w+$", "", str(item_id))
        external_costs[clean_id] = cost_val
    return external_costs
def match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6):
    """Pair calculated costs with reference costs by name and compare them.

    Matching is done on normalised names and each reference item is used at
    most once:

    1. Exact pass: a calculated item whose normalised name equals that of an
       unused reference item is paired with it (similarity ``1.0``).  Doing
       this first prevents an earlier, merely similar name from taking the
       reference item that belongs to a later exact match.
    2. Fuzzy pass: every remaining calculated item, in input order, takes the
       most similar unused reference item if the score reaches
       ``similarity_threshold``.

    :param calc_costs: ``{name: value}`` of calculated costs.
    :param ext_costs: ``{name: value}`` of reference costs.
    :param similarity_threshold: minimum similarity for a pair to be accepted.
    :return: list of row dicts with keys ``项目``, ``参考值``, ``计算值``,
        ``差异`` (calculated minus reference), ``原数据项`` and ``相似度``.
        Rows follow the order of ``calc_costs``; unmatched reference items are
        appended afterwards with the calculated-side fields set to ``None``.
    """
    # Normalise each name once instead of once per candidate pair.
    calc_keys = {name: normalize_cost_name(name) for name in calc_costs}
    ext_keys = {name: normalize_cost_name(name) for name in ext_costs}

    matched_ext = set()
    exact_pairs = {}
    # An exact match scores 1.0, so it is only acceptable if the threshold
    # allows that score at all.
    if similarity_threshold <= 1.0:
        unclaimed_by_key = {}
        for ext_name, ext_key in ext_keys.items():
            unclaimed_by_key.setdefault(ext_key, []).append(ext_name)
        for calc_name, calc_key in calc_keys.items():
            candidates = unclaimed_by_key.get(calc_key)
            if candidates:
                ext_name = candidates.pop(0)
                exact_pairs[calc_name] = ext_name
                matched_ext.add(ext_name)

    comparison = []
    for calc_name, calc_value in calc_costs.items():
        if calc_name in exact_pairs:
            best_match = exact_pairs[calc_name]
            best_score = 1.0
        else:
            best_match = None
            best_score = 0
            calc_key = calc_keys[calc_name]
            for ext_name, ext_key in ext_keys.items():
                if ext_name in matched_ext:
                    continue
                score = calculate_similarity(calc_key, ext_key)
                if score > best_score:
                    best_score = score
                    best_match = ext_name

        # ``is not None`` rather than truthiness: an empty-string key is a
        # legitimate reference name.
        if best_match is not None and best_score >= similarity_threshold:
            ext_value = ext_costs[best_match]
            comparison.append(
                {
                    "项目": calc_name,
                    "参考值": ext_value,
                    "计算值": calc_value,
                    "差异": calc_value - ext_value,
                    "原数据项": best_match,
                    "相似度": best_score,
                }
            )
            matched_ext.add(best_match)
        else:
            comparison.append(
                {
                    "项目": calc_name,
                    "参考值": None,
                    "计算值": calc_value,
                    "差异": None,
                    "原数据项": None,
                    "相似度": best_score,
                }
            )

    # Reference items nobody claimed are reported on their own rows.
    for ext_name, ext_value in ext_costs.items():
        if ext_name not in matched_ext:
            comparison.append(
                {
                    "项目": None,
                    "参考值": ext_value,
                    "计算值": None,
                    "差异": None,
                    "原数据项": ext_name,
                    "相似度": None,
                }
            )
    return comparison
def save_comparison_to_txt(comparison, output_txt_path):
    """Write the comparison rows to a UTF-8 text report.

    The report has a header line, a 120-character dashed rule, and one line
    per row with the columns ``项目``, ``参考值``, ``计算值``, ``差异`` and
    ``原数据项``.  Numbers are printed with two decimals; ``None`` becomes an
    empty cell.  The ``相似度`` field of a row is not written.

    Header and data rows share one layout (left-aligned cells padded to
    20/25/25/25/30 characters and separated by a single space), so every data
    cell starts under its heading.  Cells are padded but never truncated: a
    long name is written in full and only shifts the rest of its own line.

    :param comparison: list of row dicts as produced by the matching step.
    :param output_txt_path: destination file; overwritten if it exists.
    """
    row_format = "{:<20} {:<25} {:<25} {:<25} {:<30}\n"

    def amount(value):
        return "" if value is None else f"{value:.2f}"

    lines = [
        row_format.format("项目", "参考值", "计算值", "差异", "原数据项"),
        "-" * 120 + "\n",
    ]
    for item in comparison:
        lines.append(
            row_format.format(
                item["项目"] or "",
                amount(item["参考值"]),
                amount(item["计算值"]),
                amount(item["差异"]),
                item["原数据项"] or "",
            )
        )

    with open(output_txt_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"✅ 对比结果已保存至: {output_txt_path}")
def _determine_project_type(data):
    """Classify a project as a budget or an inventory (bill-of-quantities) one.

    The fields "项目类型", "工程类型" and "工程类别" of ``data["basicData"]``
    are examined in that order.  The first one holding a non-blank string that
    appears in the mapping below decides the result; a value that is present
    but not in the mapping is reported and the next field is tried.  Missing,
    blank or non-string values are ignored.

    :param data: parsed project JSON (normally a dict).
    :return: ``'inventory'`` for inventory projects, ``'budget'`` for budget
        projects.  ``'budget'`` is also the default when nothing conclusive is
        found, which keeps the existing comparison logic unchanged.
    """
    # Known type names mapped to the two standard kinds (预算 / 清单).
    PROJECT_TYPE_MAPPING = {
        "概预算工程": "预算",
        "初步设计概算": "预算",
        "可行性研究投资估算": "预算",
        "施工图预算": "预算",
        "配网定额计价": "预算",
        "招标控制价": "清单",
        "投标报价": "清单",
        "招投标工程": "清单",
        "配网清单招投标计价": "清单",
    }

    basic_data = data.get("basicData") if isinstance(data, dict) else None
    if not isinstance(basic_data, dict):
        basic_data = {}

    for field in ("项目类型", "工程类型", "工程类别"):
        raw_value = basic_data.get(field)
        if not isinstance(raw_value, str):
            continue
        engineering_type = raw_value.strip()
        if not engineering_type:
            continue

        mapped_type = PROJECT_TYPE_MAPPING.get(engineering_type)
        if mapped_type == "预算":
            print(f"根据项目类型 '{engineering_type}' 判断为预算工程")
            return "budget"
        if mapped_type == "清单":
            print(f"根据项目类型 '{engineering_type}' 判断为清单工程")
            return "inventory"
        print(f"项目类型 '{engineering_type}' 未在映射中定义,跳过")

    print("未能可靠判断项目类型,默认按预算工程处理")
    return "budget"
def compare_costs_batch(calc_results_folder: str, project_data_json_path: str):
    """Compare every ``*_calculation_results.json`` file with the project JSON.

    - ``calc_results_folder`` is scanned recursively for result files; they
      are processed in sorted order so repeated runs behave the same.
    - The GUID embedded in each file name selects the reference node in the
      project JSON.
    - One ``.txt`` report per result file is written to a
      ``comparison_results`` folder created inside ``calc_results_folder``.
    - The project type (budget / inventory) is read from the project JSON and
      decides how result files are parsed; if it cannot be read, budget is
      assumed.

    A failure while handling one file is printed and does not stop the batch.

    :param calc_results_folder: folder containing the calculation results.
    :param project_data_json_path: path of the project JSON file.
    """
    output_folder = os.path.join(calc_results_folder, "comparison_results")
    os.makedirs(output_folder, exist_ok=True)

    # File-name marker of result files (adjust to the actual naming scheme).
    result_file_keyword = "_calculation_results.json"

    # Read the project JSON once up front, only to decide the parsing mode.
    # Deliberately broad: any failure here falls back to budget mode.
    try:
        with open(project_data_json_path, "r", encoding="utf-8") as f:
            project_data_for_type = json.load(f)
        project_type = _determine_project_type(project_data_for_type)
    except Exception as e:
        print(f"读取项目文件以判定类型失败,将默认按预算处理,错误: {e}")
        project_type = "budget"

    type_label = "清单" if project_type == "inventory" else "预算"
    print(f"开始批量处理文件夹: {calc_results_folder}(项目类型: {type_label})")

    # Collect result files from all sub-directories.  Sorting the directory
    # list in place makes os.walk descend in a stable order.
    matched_files = []
    for root, dirs, files in os.walk(calc_results_folder):
        dirs.sort()
        for filename in sorted(files):
            if filename.endswith(".json") and result_file_keyword in filename:
                matched_files.append(os.path.join(root, filename))
    if not matched_files:
        print("未在任何子目录中发现 '*_calculation_results.json' 文件,请确认 BCL 结果输出位置与命名。")

    processed_count = 0
    for calc_json_path in matched_files:
        filename = os.path.basename(calc_json_path)
        try:
            guid = extract_guid_from_filename(filename)
            if not guid:
                print(f"⚠️ 无法从文件名提取 GUID: {filename}")
                continue
            print(f"\n📄 处理文件: {filename}")
            print(f" 提取 GUID: {guid}")

            # Calculated costs, parsed according to the project type.
            calc_costs = load_calculation_results(calc_json_path, project_type=project_type)
            print(f" 加载 {len(calc_costs)} 个计算费用项")

            # Reference costs from the project JSON.
            ext_costs = load_project_data_and_find_costs(project_data_json_path, guid)
            print(f" 找到 {len(ext_costs)} 个参考费用项")

            comparison = match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6)

            # The report takes the result file's base name, in the output folder.
            base_name = os.path.splitext(filename)[0]
            output_txt_path = os.path.join(output_folder, base_name + ".txt")
            save_comparison_to_txt(comparison, output_txt_path)
            processed_count += 1
        except Exception as e:
            # Per-file boundary: report the problem and carry on with the rest.
            print(f"❌ 处理文件 {filename} 时出错: {e}")

    print(f"\n✅ 批量处理完成!共处理 {processed_count} 个文件。")
    print(f"📊 所有对比结果已保存至: {output_folder}")
# --------------------------
# Test entry point: run this file directly
# --------------------------
def _main(
    calc_dir=r"data/input/bclresults/2022通用定额细分3.1.16",
    proj_json=r"data/input/merged/2022通用定额细分3.1.16.json",
):
    """Run the batch cost comparison without command-line arguments.

    The defaults are sample paths for a local test run; edit them or pass
    other locations when calling.  Nothing is compared unless ``calc_dir`` is
    an existing directory and ``proj_json`` is an existing file.

    :param calc_dir: folder holding the BCL calculation results.
    :param proj_json: path of the merged project JSON file.
    """
    if not os.path.isdir(calc_dir):
        print(f"❌ BCL 结果文件夹不存在: {calc_dir}")
        return
    if not os.path.isfile(proj_json):
        print(f"❌ 项目 JSON 文件不存在: {proj_json}")
        return

    print(f"✅ BCL 计算结果目录: {calc_dir}")
    print(f"✅ 项目 JSON 文件: {proj_json}")
    print("🚀 开始执行费用对比...")
    compare_costs_batch(calc_dir, proj_json)
    print("🎉 费用对比完成!")


if __name__ == "__main__":
    _main()