""" 批量对比多个 calculation_results.json 与 project_data.json 中的费用 """ import json import os import re from difflib import SequenceMatcher def extract_guid_from_filename(filename): """从文件名中提取 GUID""" pattern = r"([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})" match = re.search(pattern, filename, re.IGNORECASE) return match.group(1).upper() if match else None def normalize_cost_name(name): """标准化费用名称,便于匹配""" name = re.sub(r"_\w+$", "", name) # 去除后缀如 _GJJ name = re.sub(r"[^\w]", "", name) # 去除符号 return name.lower() def calculate_similarity(a, b): """计算两个字符串的相似度""" return SequenceMatcher(None, a, b).ratio() def load_calculation_results(json_file_path, project_type: str = "budget"): """读取计算结果 JSON,返回费用总和字典 - budget(预算工程): 期望结构为 { node: { cost_name: number, ... }, ... } 逐项累加 cost_name -> number。 - inventory(清单工程): 结构通常为 { category: { item: { metric_name: number, ... } } } 将最内层 metrics(如“合价/直接费/人工费/材料费/机械费/措施费/间接费/安全文明施工费”等)按 metric_name 汇总累计。 """ with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) total_costs = {} if project_type == "inventory": # 三级结构:大类 -> 清单项 -> 指标dict for _, items in (data or {}).items(): if isinstance(items, dict): for _, metrics in items.items(): if isinstance(metrics, dict): for metric_name, metric_val in metrics.items(): # 仅累计数值型 if isinstance(metric_val, (int, float)): total_costs[metric_name] = total_costs.get(metric_name, 0) + float(metric_val) else: # 预算:两级结构:节点 -> 费用名: 数值 for _, cost_dict in (data or {}).items(): if isinstance(cost_dict, dict): for cost_name, value in cost_dict.items(): if isinstance(value, (int, float)): total_costs[cost_name] = total_costs.get(cost_name, 0) + float(value) return total_costs def find_node_by_guid(expense_preview, target_guid): """在 expensePreview 中递归查找 GUID 对应的节点,并返回其 sum 列表用于对比。 适配新结构:每个 GUID 节点包含三个列表: - sum: 费用条目(含 id/cost)——用于对比 - children: 递归子节点列表(继续下钻) - rcj: 其他数据(此处忽略) """ def search_recursive(node): """统一遍历任意层级结构(dict/list),匹配 GUID 并返回其 children 列表""" if isinstance(node, dict): # 当前节点如果带 GUID,先判断自身是否命中 current_guid = node.get("GUID", "") if isinstance(current_guid, str) and current_guid: current_guid = current_guid.strip("{}").upper() if current_guid == target_guid: # 命中后返回 sum 列表作为对比项来源 return node.get("sum", []) # 若存在 children,优先深入 children(继续查找更深层 GUID) if "children" in node and isinstance(node["children"], list): result = search_recursive(node["children"]) if result is not None: return result # 继续遍历其它所有键的值,以覆盖类似 {"建筑工程": {"拆除": [ ... ]}} 的层级 for value in node.values(): result = search_recursive(value) if result is not None: return result return None if isinstance(node, list): for item in node: result = search_recursive(item) if result is not None: return result return None # 其他类型(如 str/number/None)无需处理 return None return search_recursive(expense_preview) def load_project_data_and_find_costs(project_json_path, target_guid): """读取 project_data.json 并查找对应 GUID 的费用列表 集成项目类型判断: - 若判定为预算工程(budget):沿用当前逻辑 - 若判定为清单工程(inventory):暂时也沿用相同逻辑(如需差异处理可再扩展) """ with open(project_json_path, "r", encoding="utf-8") as f: project_data = json.load(f) # 判断项目类型 project_type = _determine_project_type(project_data) if project_type == "budget": print("项目类型:预算工程(按现有逻辑处理)") else: print("项目类型:清单工程(暂按预算同样逻辑处理,若需差异化请告知)") expense_preview = project_data.get("projectData", {}).get("expensePreview", {}) children = find_node_by_guid(expense_preview, target_guid) if not children: print(f"未找到 GUID 为 {target_guid} 的节点") return {} external_costs = {} for item in children: item_id = item.get("id", "") cost_str = item.get("cost", "0") try: cost_val = float(cost_str) except ValueError: cost_val = 0.0 if item_id: clean_id = re.sub(r"_\w+$", "", item_id) external_costs[clean_id] = cost_val return external_costs def match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6): """匹配两个费用列表并对比""" comparison = [] matched_ext = set() for calc_name, calc_value in calc_costs.items(): best_match = None best_score = 0 for ext_name in ext_costs: if ext_name in matched_ext: continue score = calculate_similarity(normalize_cost_name(calc_name), normalize_cost_name(ext_name)) if score > best_score: best_score = score best_match = ext_name if best_match and best_score >= similarity_threshold: ext_value = ext_costs[best_match] difference = calc_value - ext_value comparison.append( { "项目": calc_name, "参考值": ext_value, "计算值": calc_value, "差异": difference, "原数据项": best_match, "相似度": best_score, } ) matched_ext.add(best_match) else: comparison.append( { "项目": calc_name, "参考值": None, "计算值": calc_value, "差异": None, "原数据项": None, "相似度": best_score, } ) # 添加未匹配的参考项 for ext_name, ext_value in ext_costs.items(): if ext_name not in matched_ext: comparison.append( { "项目": None, "参考值": ext_value, "计算值": None, "差异": None, "原数据项": ext_name, "相似度": None, } ) return comparison def save_comparison_to_txt(comparison, output_txt_path): """保存对比结果到 TXT 文件""" with open(output_txt_path, "w", encoding="utf-8") as f: f.write(f"{'项目':<20} {'参考值':<25} {'计算值':<25} {'差异':<25} {'原数据项':<30}\n") f.write("-" * 120 + "\n") for item in comparison: project = (item["项目"] or "").ljust(20)[:20] ref = f"{item['参考值']:.2f}" if item["参考值"] is not None else "" ref = ref.ljust(25)[:25] calc = f"{item['计算值']:.2f}" if item["计算值"] is not None else "" calc = calc.ljust(25)[:25] diff = f"{item['差异']:.2f}" if item["差异"] is not None else "" diff = diff.ljust(25)[:25] original = (item["原数据项"] or "").ljust(30)[:30] f.write(f"{project}{ref}{calc}{diff}{original}\n") print(f"✅ 对比结果已保存至: {output_txt_path}") def _determine_project_type(data): """ 根据basicData中的"项目类型"或"工程类型"判断工程类型 :param data: 项目数据 :return: 'inventory' 表示清单工程,'budget' 表示预算工程 """ # 项目类型名称映射字典:将各种变体映射到标准类型(预算/清单) PROJECT_TYPE_MAPPING = { "概预算工程": "预算", "初步设计概算": "预算", "可行性研究投资估算": "预算", "施工图预算": "预算", "配网定额计价": "预算", "招标控制价": "清单", "投标报价": "清单", "招投标工程": "清单", "配网清单招投标计价": "清单", } # 获取 basicData basic_data = data.get("basicData") or {} # 尝试获取 "项目类型",若不存在则尝试获取 "工程类型" engineering_type = basic_data.get("项目类型") or basic_data.get("工程类型") or basic_data.get("工程类别") if engineering_type: # 去除前后空格 engineering_type = engineering_type.strip() # 查找映射 mapped_type = PROJECT_TYPE_MAPPING.get(engineering_type) if mapped_type == "预算": print(f"根据项目类型 '{engineering_type}' 判断为预算工程") return "budget" elif mapped_type == "清单": print(f"根据项目类型 '{engineering_type}' 判断为清单工程") return "inventory" else: print(f"项目类型 '{engineering_type}' 未在映射中定义,跳过") # 默认按预算工程处理,以保持当前对比逻辑不变 print("未能可靠判断项目类型,默认按预算工程处理") return "budget" def compare_costs_batch(calc_results_folder: str, project_data_json_path: str): """批量对比 calculation_results.json 与项目 JSON。 - 输出目录:在 calc_results_folder 下创建 comparison_results 保存结果。 - 根据 project_data_json_path 判定工程类型(预算/清单),以选择解析方式。 """ # 输出对比结果的文件夹放在 calc_results_folder 内 output_folder = os.path.join(calc_results_folder, "comparison_results") os.makedirs(output_folder, exist_ok=True) # 支持的文件名关键词(可根据实际命名调整) result_file_keyword = "_calculation_results.json" # 预读取项目 JSON 并判定工程类型(供计算结果解析使用) try: with open(project_data_json_path, "r", encoding="utf-8") as f: project_data_for_type = json.load(f) project_type = _determine_project_type(project_data_for_type) except Exception as e: print(f"读取项目文件以判定类型失败,将默认按预算处理,错误: {e}") project_type = "budget" print(f"开始批量处理文件夹: {calc_results_folder}(项目类型: { '清单' if project_type=='inventory' else '预算' })") processed_count = 0 # 递归扫描 calc_results_folder 下所有子目录,寻找结果文件 matched_files = [] for root, _dirs, files in os.walk(calc_results_folder): for filename in files: if filename.endswith(".json") and result_file_keyword in filename: matched_files.append(os.path.join(root, filename)) if not matched_files: print("未在任何子目录中发现 '*_calculation_results.json' 文件,请确认 BCL 结果输出位置与命名。") for calc_json_path in matched_files: filename = os.path.basename(calc_json_path) try: # 提取 GUID guid = extract_guid_from_filename(filename) if not guid: print(f"⚠️ 无法从文件名提取 GUID: {filename}") continue print(f"\n📄 处理文件: {filename}") print(f" 提取 GUID: {guid}") # 读取计算结果(按工程类型解析) calc_costs = load_calculation_results(calc_json_path, project_type=project_type) print(f" 加载 {len(calc_costs)} 个计算费用项") # 从主 JSON 获取参考费用 ext_costs = load_project_data_and_find_costs(project_data_json_path, guid) print(f" 找到 {len(ext_costs)} 个参考费用项") # 对比 comparison = match_and_compare_costs(calc_costs, ext_costs, similarity_threshold=0.6) # 生成输出文件名(与原 JSON 同名,但输出到指定文件夹) base_name = os.path.splitext(filename)[0] output_txt_path = os.path.join(output_folder, base_name + ".txt") # 保存结果 save_comparison_to_txt(comparison, output_txt_path) processed_count += 1 except Exception as e: print(f"❌ 处理文件 {filename} 时出错: {e}") print(f"\n✅ 批量处理完成!共处理 {processed_count} 个文件。") print(f"📊 所有对比结果已保存至: {output_folder}") # -------------------------- # 测试入口:直接运行本文件 # -------------------------- def _main(): """直接运行费用对比,无需命令行或输入""" # ✅ 在这里直接填写你要测试的路径(可自行修改) calc_dir = r"data/input/bclresults/2022通用定额细分3.1.16" proj_json = r"data/input/merged/2022通用定额细分3.1.16.json" # 检查路径是否存在 if not os.path.exists(calc_dir): print(f"❌ BCL 结果文件夹不存在: {calc_dir}") return if not os.path.exists(proj_json): print(f"❌ 项目 JSON 文件不存在: {proj_json}") return print(f"✅ BCL 计算结果目录: {calc_dir}") print(f"✅ 项目 JSON 文件: {proj_json}") print("🚀 开始执行费用对比...") # 调用你的主函数 compare_costs_batch(calc_dir, proj_json) print("🎉 费用对比完成!") if __name__ == "__main__": _main()