import json
import os
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional

# GUID in 8-4-4-4-12 hexadecimal form (case-insensitive).
_GUID_RE = re.compile(
    r"([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})",
    re.IGNORECASE,
)
# Trailing "_SUFFIX" such as "_GJJ" or "_BZHF". Because ``\w`` also matches
# "_", the match begins at the earliest underscore after which only word
# characters remain.
_ID_SUFFIX_RE = re.compile(r"_\w+$")
# Any character that is not a word character.
_NON_WORD_RE = re.compile(r"[^\w]")

# Column layout of the text report: project, reference, calculated,
# difference, original item.
_COLUMN_TITLES = ("项目", "参考值", "计算值", "差异", "原数据项")
_COLUMN_WIDTHS = (20, 25, 25, 25, 30)

# Default input files used when ``main`` is called without arguments.
_DEFAULT_CALCULATION_JSON_PATH = (
    "project2json/outputs/bclresult/一般土建_496A54BB-8A38-4BE1-B116-AD4780E6874A_预算工程_calculation_results.json"
)
_DEFAULT_PROJECT_DATA_JSON_PATH = "project2json/outputs/json/220kV变电站工程_readable.json"


def extract_guid_from_filename(filename: str) -> Optional[str]:
    """Return the first GUID found in *filename*, upper-cased, or None.

    The GUID must be in 8-4-4-4-12 hexadecimal form; matching is
    case-insensitive and may occur anywhere in the string.
    """
    match = _GUID_RE.search(filename)
    return match.group(1).upper() if match else None


def normalize_cost_name(name: str) -> str:
    """Normalize a cost name so that two spellings can be compared.

    A trailing ``_SUFFIX`` (e.g. ``_GJJ``) is removed first, then every
    non-word character is dropped and the result is lower-cased. Note that
    the suffix pattern cuts from the earliest underscore that is followed
    only by word characters, so ``"a_b_GJJ"`` becomes ``"a"``.
    """
    name = _ID_SUFFIX_RE.sub("", name)
    name = _NON_WORD_RE.sub("", name)
    return name.lower()


def calculate_similarity(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` ratio of *a* and *b* (0.0 to 1.0)."""
    return SequenceMatcher(None, a, b).ratio()


def load_calculation_results(json_file_path: str) -> Dict[str, Any]:
    """Read the calculation-result JSON and sum each cost across all nodes.

    The file is expected to hold a mapping of node name to a mapping of
    cost name to numeric value. Top-level entries that are not mappings are
    skipped rather than crashing the whole load.

    Returns:
        A dict mapping each cost name to the sum of its values.
    """
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    total_costs: Dict[str, Any] = {}
    for cost_dict in data.values():
        if not isinstance(cost_dict, dict):
            continue
        for cost_name, value in cost_dict.items():
            total_costs[cost_name] = total_costs.get(cost_name, 0) + value
    return total_costs


def _normalize_guid(value: Any) -> str:
    """Return *value* without surrounding braces, upper-cased ('' if not a str)."""
    if not isinstance(value, str):
        return ""
    return value.strip().strip("{}").upper()


def find_node_by_guid(expense_preview: Dict[str, Any], target_guid: str) -> Optional[List[Any]]:
    """Recursively look up the node whose GUID equals *target_guid*.

    *expense_preview* is walked as category -> subcategory -> list of
    items, and each item's ``children`` list is searched depth-first.
    GUIDs are compared without braces and case-insensitively on both sides.

    Returns:
        The ``children`` list of the matching node (an empty list when the
        node has none), or None when no node matches.
    """
    wanted = _normalize_guid(target_guid)
    if not wanted:
        # An empty target would otherwise "match" any item lacking a GUID.
        return None

    def search_recursive(items: List[Any]) -> Optional[List[Any]]:
        """Depth-first search of *items* for the wanted GUID."""
        for item in items:
            if not isinstance(item, dict):
                continue
            # A null or missing GUID normalizes to '' and never matches.
            if _normalize_guid(item.get("GUID")) == wanted:
                return item.get("children") or []
            children = item.get("children")
            if isinstance(children, list):
                result = search_recursive(children)
                if result is not None:
                    return result
        return None

    # Each top-level category (e.g. "建筑工程") ...
    for category_data in expense_preview.values():
        if not isinstance(category_data, dict):
            continue
        # ... holds subcategories (e.g. "建筑", "安装") with item lists.
        for items in category_data.values():
            if isinstance(items, list):
                result = search_recursive(items)
                if result is not None:
                    return result
    return None


def load_project_data_and_find_costs(project_json_path: str, target_guid: str) -> Dict[str, float]:
    """Read the project JSON and collect the costs listed under a GUID.

    The node is located via ``projectData.expensePreview``. For each child
    that has a non-empty string ``id``, the trailing ``_SUFFIX`` is removed
    from the id and the ``cost`` is parsed as a float; a missing, null or
    unparsable cost counts as 0.0. Children without an id (possibly
    subtotal rows) are ignored.

    Returns:
        A dict mapping cleaned id to cost, or an empty dict (after printing
        a notice) when the GUID is not found or has no children.
    """
    with open(project_json_path, "r", encoding="utf-8") as f:
        project_data = json.load(f)

    expense_preview = (project_data.get("projectData") or {}).get("expensePreview") or {}
    children = find_node_by_guid(expense_preview, target_guid)
    if not children:
        print(f"未找到 GUID 为 {target_guid} 的节点")
        return {}

    external_costs: Dict[str, float] = {}
    for item in children:
        if not isinstance(item, dict):
            continue
        item_id = item.get("id", "")
        if not isinstance(item_id, str) or not item_id:
            continue
        try:
            cost_val = float(item.get("cost", "0"))
        except (TypeError, ValueError):
            # TypeError covers a JSON null cost, ValueError a malformed string.
            cost_val = 0.0
        clean_id = _ID_SUFFIX_RE.sub("", item_id)  # drop suffixes such as _GJJ
        external_costs[clean_id] = cost_val
    return external_costs


def match_and_compare_costs(
    calc_costs: Dict[str, Any],
    ext_costs: Dict[str, Any],
    similarity_threshold: float = 0.6,
) -> List[Dict[str, Any]]:
    """Pair calculated costs with reference costs by name similarity.

    For every calculated cost, in order, the not-yet-matched reference name
    with the highest similarity of normalized names is chosen (first one
    wins on ties). The pair is accepted when the score reaches
    *similarity_threshold*; each reference name is used at most once.

    Returns:
        A list of row dicts with the keys "项目", "计算值", "参考值",
        "差异", "匹配项" and "相似度": one row per calculated cost, followed
        by one row per reference cost that was never matched.
    """
    # Normalize every reference name once instead of once per comparison.
    normalized_ext = {ext_name: normalize_cost_name(ext_name) for ext_name in ext_costs}

    comparison: List[Dict[str, Any]] = []
    matched_ext = set()

    for calc_name, calc_value in calc_costs.items():
        normalized_calc = normalize_cost_name(calc_name)
        best_match = None
        best_score = 0
        for ext_name, normalized_name in normalized_ext.items():
            if ext_name in matched_ext:
                continue
            score = calculate_similarity(normalized_calc, normalized_name)
            if score > best_score:
                best_score = score
                best_match = ext_name

        # Compare against None: an empty-string key is a valid match.
        if best_match is not None and best_score >= similarity_threshold:
            ext_value = ext_costs[best_match]
            comparison.append(
                {
                    "项目": calc_name,
                    "计算值": calc_value,
                    "参考值": ext_value,
                    "差异": calc_value - ext_value,
                    "匹配项": best_match,
                    "相似度": best_score,
                }
            )
            matched_ext.add(best_match)
        else:
            comparison.append(
                {
                    "项目": calc_name,
                    "计算值": calc_value,
                    "参考值": None,
                    "差异": None,
                    "匹配项": None,
                    "相似度": best_score,
                }
            )

    # Append reference items that no calculated cost claimed.
    for ext_name, ext_value in ext_costs.items():
        if ext_name not in matched_ext:
            comparison.append(
                {
                    "项目": None,
                    "计算值": None,
                    "参考值": ext_value,
                    "差异": None,
                    "匹配项": ext_name,
                    "相似度": None,
                }
            )

    return comparison


def _fit(text: str, width: int) -> str:
    """Left-align *text* in exactly *width* characters, truncating if longer."""
    return text.ljust(width)[:width]


def save_comparison_to_txt(comparison: List[Dict[str, Any]], output_txt_path: str) -> None:
    """Write the comparison rows to a fixed-width text table.

    Columns are, in order: project, reference value, calculated value,
    difference (two decimals) and original reference item. Data rows use
    the same single-space column separator as the header, so a field that
    fills its full width cannot run into the next column. None values are
    written as empty fields.
    """
    lines = [
        " ".join(_fit(title, width) for title, width in zip(_COLUMN_TITLES, _COLUMN_WIDTHS)) + "\n",
        # Rule spans all columns plus the separators between them.
        "-" * (sum(_COLUMN_WIDTHS) + len(_COLUMN_WIDTHS) - 1) + "\n",
    ]
    for item in comparison:
        cells = (
            item["项目"] or "",
            "" if item["参考值"] is None else str(item["参考值"]),
            "" if item["计算值"] is None else str(item["计算值"]),
            "" if item["差异"] is None else f"{item['差异']:.2f}",
            item["匹配项"] or "",
        )
        lines.append(" ".join(_fit(cell, width) for cell, width in zip(cells, _COLUMN_WIDTHS)) + "\n")

    with open(output_txt_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"对比结果已保存至: {output_txt_path}")


def main(
    calculation_json_path: str = _DEFAULT_CALCULATION_JSON_PATH,
    project_data_json_path: str = _DEFAULT_PROJECT_DATA_JSON_PATH,
) -> None:
    """Compare calculated costs against the project's reference costs.

    The GUID is taken from *calculation_json_path*, the matching node is
    looked up in *project_data_json_path*, and the comparison is written
    next to the calculation file with a ``.txt`` extension.

    Raises:
        ValueError: If no GUID can be extracted from the calculation path.
    """
    # 1. Extract the GUID from the file name.
    guid = extract_guid_from_filename(calculation_json_path)
    if not guid:
        raise ValueError("无法从文件名中提取 GUID")
    print(f"提取到 GUID: {guid}")

    # 2. Load and aggregate the calculated costs.
    calc_costs = load_calculation_results(calculation_json_path)
    print(f"共加载 {len(calc_costs)} 个费用项")

    # 3. Look up the reference costs for that GUID in the project data.
    ext_costs = load_project_data_and_find_costs(project_data_json_path, guid)
    print(f"从 project_data 中找到 {len(ext_costs)} 个参考费用项")

    # 4. Match and compare.
    comparison = match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6)

    # 5. Write the report to a .txt file sharing the calculation file's base name.
    base_name = os.path.splitext(calculation_json_path)[0]
    output_txt_path = base_name + ".txt"
    save_comparison_to_txt(comparison, output_txt_path)


if __name__ == "__main__":
    main()