Files
KG_generation/cost_comparison.py
T
2025-08-22 18:13:09 +08:00

235 lines
8.0 KiB
Python

"""
批量对比多个 calculation_results.json 与 project_data.json 中的费用
"""
import json
import os
import re
from difflib import SequenceMatcher
def extract_guid_from_filename(filename):
    """Return the first GUID embedded in ``filename``, upper-cased.

    The GUID is searched case-insensitively anywhere in the name, in the
    8-4-4-4-12 hexadecimal layout. ``None`` is returned when the name
    contains no such token.
    """
    found = re.search(
        r"([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})",
        filename,
        flags=re.IGNORECASE,
    )
    if found is None:
        return None
    return found.group(1).upper()
def normalize_cost_name(name):
    """Reduce a cost name to a canonical lowercase key for fuzzy matching.

    Three steps are applied in order:

    1. Remove a trailing suffix such as ``_GJJ``: an underscore followed
       only by regex word characters up to the end of the string. Word
       characters include the underscore itself, so the removed part
       starts at the earliest underscore from which only word characters
       remain (``a_b_c`` becomes ``a``).
    2. Delete every remaining non-word character (punctuation, spaces).
    3. Lower-case the result.
    """
    without_suffix = re.sub(r"_\w+$", "", name)
    word_chars_only = re.sub(r"[^\w]", "", without_suffix)
    return word_chars_only.lower()
def calculate_similarity(a, b):
    """Return the ``difflib.SequenceMatcher`` ratio between ``a`` and ``b``.

    No junk filter is used. The result is a float between 0.0 (nothing in
    common) and 1.0 (identical sequences).
    """
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()
def load_calculation_results(json_file_path):
    """Read a calculation-results JSON file and total each cost across nodes.

    The file is read as a two-level mapping: an outer mapping whose values
    are themselves mappings from cost name to value. Values that share a
    cost name are added together, whichever outer entry they come from.

    Returns a dict mapping each cost name to its summed value.
    """
    with open(json_file_path, "r", encoding="utf-8") as handle:
        per_node = json.load(handle)

    totals = {}
    for node_costs in per_node.values():
        for cost_name, amount in node_costs.items():
            running = totals.get(cost_name, 0)
            totals[cost_name] = running + amount
    return totals
def find_node_by_guid(expense_preview, target_guid):
    """Find the node identified by ``target_guid`` and return its children.

    ``expense_preview`` is a mapping whose list-valued entries hold trees of
    node dicts; every other entry is ignored. Each node may carry a
    ``"GUID"`` string and a ``"children"`` list. Trees are searched
    depth-first in mapping order and the first matching node wins.

    GUIDs are compared after stripping surrounding braces and upper-casing,
    and the same normalisation is applied to ``target_guid``, so
    ``"{abc}"``, ``"abc"`` and ``"ABC"`` all identify the same node.

    Args:
        expense_preview: Mapping of category name to a list of node dicts.
        target_guid: GUID to look for, with or without braces, any case.

    Returns:
        The matching node's list of children (``[]`` when the node has no
        usable ``"children"`` list), or ``None`` when no node matches.
    """

    def clean(guid):
        # A missing, null or non-string GUID normalises to "" (never matches).
        return guid.strip("{}").upper() if isinstance(guid, str) else ""

    wanted = clean(target_guid)
    # An empty target would otherwise "match" every node without a GUID.
    if not wanted or not isinstance(expense_preview, dict):
        return None

    def search(nodes):
        for node in nodes:
            if not isinstance(node, dict):
                continue
            children = node.get("children")
            if clean(node.get("GUID")) == wanted:
                return children if isinstance(children, list) else []
            # Only descend into real lists; "children": null is a leaf.
            if isinstance(children, list):
                found = search(children)
                if found is not None:
                    return found
        return None

    for nodes in expense_preview.values():
        if isinstance(nodes, list):
            found = search(nodes)
            if found is not None:
                return found
    return None
def load_project_data_and_find_costs(project_json_path, target_guid):
    """Load the project JSON and return the costs listed under one GUID node.

    The node is looked up with ``find_node_by_guid`` inside
    ``projectData.expensePreview``. Each child of that node contributes one
    entry, keyed by its ``"id"`` with a trailing ``_<word characters>``
    suffix removed and valued by its ``"cost"`` converted to ``float``.

    Details worth knowing:

    * Children without an ``"id"`` are skipped.
    * A cost that cannot be converted (missing text, null, malformed
      string) is recorded as ``0.0`` rather than aborting the whole file.
    * Ids that become identical after the suffix is removed overwrite each
      other; the last child wins.
    * The "not found" message is printed both when no node matches and
      when the matching node has no children.

    Args:
        project_json_path: Path of the project JSON file (UTF-8).
        target_guid: GUID of the node whose children hold the costs.

    Returns:
        Dict mapping cleaned cost id to cost value; empty when nothing
        was found.
    """
    with open(project_json_path, "r", encoding="utf-8") as f:
        project_data = json.load(f)

    # ``or {}`` also covers keys that are present but null in the JSON.
    project_section = project_data.get("projectData") or {}
    expense_preview = project_section.get("expensePreview") or {}
    children = find_node_by_guid(expense_preview, target_guid)
    if not children:
        print(f"未找到 GUID 为 {target_guid} 的节点")
        return {}

    external_costs = {}
    for item in children:
        item_id = item.get("id", "")
        if not item_id:
            continue
        try:
            cost_val = float(item.get("cost", "0"))
        except (TypeError, ValueError):
            # TypeError: float(None) for a null cost; ValueError: bad text.
            cost_val = 0.0
        external_costs[re.sub(r"_\w+$", "", item_id)] = cost_val
    return external_costs
def match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6):
    """Pair calculated costs with reference costs by name similarity.

    Matching is greedy and order-dependent: calculated items are handled in
    ``calc_costs`` order, and each one claims the not-yet-claimed reference
    name with the highest similarity between the two normalised names. The
    first candidate wins a tie. The pair is accepted only if that
    similarity reaches ``similarity_threshold``.

    Args:
        calc_costs: Mapping of calculated cost name to value.
        ext_costs: Mapping of reference cost name to value.
        similarity_threshold: Minimum similarity for a pair to be accepted.

    Returns:
        A list of row dicts with the keys ``项目`` (calculated name),
        ``参考值`` (reference value), ``计算值`` (calculated value), ``差异``
        (calculated minus reference), ``原数据项`` (reference name) and
        ``相似度`` (similarity). Calculated items come first, in input
        order, with the reference fields set to ``None`` when unmatched.
        Reference items nobody claimed follow, with the calculated fields
        and the similarity set to ``None``.
    """
    # Normalise every reference name once instead of once per (calc, ext) pair.
    normalized_ext = {ext_name: normalize_cost_name(ext_name) for ext_name in ext_costs}

    comparison = []
    matched_ext = set()
    for calc_name, calc_value in calc_costs.items():
        calc_key = normalize_cost_name(calc_name)
        best_match = None
        best_score = 0
        for ext_name, ext_key in normalized_ext.items():
            if ext_name in matched_ext:
                continue
            score = calculate_similarity(calc_key, ext_key)
            if score > best_score:
                best_score = score
                best_match = ext_name

        # Identity check: an empty-string reference name is still a match.
        if best_match is not None and best_score >= similarity_threshold:
            ext_value = ext_costs[best_match]
            comparison.append(
                {
                    "项目": calc_name,
                    "参考值": ext_value,
                    "计算值": calc_value,
                    "差异": calc_value - ext_value,
                    "原数据项": best_match,
                    "相似度": best_score,
                }
            )
            matched_ext.add(best_match)
        else:
            comparison.append(
                {
                    "项目": calc_name,
                    "参考值": None,
                    "计算值": calc_value,
                    "差异": None,
                    "原数据项": None,
                    "相似度": best_score,
                }
            )

    # Append the reference items that no calculated item claimed.
    comparison.extend(
        {
            "项目": None,
            "参考值": ext_value,
            "计算值": None,
            "差异": None,
            "原数据项": ext_name,
            "相似度": None,
        }
        for ext_name, ext_value in ext_costs.items()
        if ext_name not in matched_ext
    )
    return comparison
def save_comparison_to_txt(comparison, output_txt_path):
    """Write the comparison rows to a fixed-width UTF-8 text table.

    The file starts with a header line and a dashed rule, followed by one
    line per row. Columns are 20, 25, 25, 25 and 30 characters wide and are
    separated by a single space in the header and in every row, so values
    line up under their titles. Numbers are written with two decimals,
    ``None`` becomes an empty cell, and over-long cells are truncated to
    the column width. The similarity field is not written.

    Args:
        comparison: Row dicts as produced by ``match_and_compare_costs``.
        output_txt_path: Destination path; an existing file is overwritten.
    """

    def text_cell(value, width):
        # Pad, then cut, so every cell is exactly ``width`` characters.
        return (value or "").ljust(width)[:width]

    def number_cell(value, width):
        rendered = f"{value:.2f}" if value is not None else ""
        return rendered.ljust(width)[:width]

    lines = [
        f"{'项目':<20} {'参考值':<25} {'计算值':<25} {'差异':<25} {'原数据项':<30}\n",
        "-" * 120 + "\n",
    ]
    for item in comparison:
        cells = (
            text_cell(item["项目"], 20),
            number_cell(item["参考值"], 25),
            number_cell(item["计算值"], 25),
            number_cell(item["差异"], 25),
            text_cell(item["原数据项"], 30),
        )
        # Same single-space separator as the header keeps columns aligned.
        lines.append(" ".join(cells) + "\n")

    with open(output_txt_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"✅ 对比结果已保存至: {output_txt_path}")
def main(
    calc_results_folder="project2json/outputs/bclresults/变电检修国网",
    project_data_json_path="project2json/outputs/json/变电检修国网.json",
    output_folder="project2json/outputs/comparison_results",
    result_file_keyword="_calculation_results.json",
    similarity_threshold=0.6,
):
    """Compare every calculation-results file in a folder with the project JSON.

    For each file in ``calc_results_folder`` whose name ends with ``.json``
    and contains ``result_file_keyword``, the GUID is taken from the file
    name, the calculated costs are loaded, the reference costs for that
    GUID are read from ``project_data_json_path``, and the comparison is
    written to ``output_folder`` as ``<input name without extension>.txt``.

    A failure in one file is reported and does not stop the batch. Called
    without arguments, the function uses the original hard-coded paths.

    Args:
        calc_results_folder: Folder holding the calculation-results files.
        project_data_json_path: Project JSON used as the reference source.
        output_folder: Folder for the text reports; created if missing.
        result_file_keyword: Substring a file name must contain to be used.
        similarity_threshold: Minimum name similarity for a matched pair.
    """
    os.makedirs(output_folder, exist_ok=True)

    print(f"开始批量处理文件夹: {calc_results_folder}")
    processed_count = 0
    # Sorted so that log output is reproducible between runs.
    for filename in sorted(os.listdir(calc_results_folder)):
        if not filename.endswith(".json") or result_file_keyword not in filename:
            continue
        calc_json_path = os.path.join(calc_results_folder, filename)
        try:
            # Extract the GUID that identifies the node in the project JSON.
            guid = extract_guid_from_filename(filename)
            if not guid:
                print(f"⚠️ 无法从文件名提取 GUID: {filename}")
                continue
            print(f"\n📄 处理文件: {filename}")
            print(f" 提取 GUID: {guid}")

            # Load the calculated costs.
            calc_costs = load_calculation_results(calc_json_path)
            print(f" 加载 {len(calc_costs)} 个计算费用项")

            # Fetch the reference costs from the project JSON.
            ext_costs = load_project_data_and_find_costs(project_data_json_path, guid)
            print(f" 找到 {len(ext_costs)} 个参考费用项")

            comparison = match_and_compare_costs(
                calc_costs, ext_costs, similarity_threshold=similarity_threshold
            )

            # The report keeps the input's base name, in the output folder.
            base_name = os.path.splitext(filename)[0]
            output_txt_path = os.path.join(output_folder, base_name + ".txt")
            save_comparison_to_txt(comparison, output_txt_path)
            processed_count += 1
        except Exception as e:
            # Batch boundary: report the failure and carry on with the rest.
            print(f"❌ 处理文件 {filename} 时出错: {e}")

    print(f"\n✅ 批量处理完成!共处理 {processed_count} 个文件。")
    print(f"📊 所有对比结果已保存至: {output_folder}")
# Run the batch comparison only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()