Files
KG_generation/compare_BclCost.py
chentianrui 0a4dedda1c 更新代码
2025-10-14 16:13:18 +08:00

434 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
统计费用
"""
import os
import shutil
import time
from pathlib import Path
import traceback
import uuid
import random
import string
import csv
# 导入各个步骤需要的函数
from project2json.project_converter import convert_project_to_json
from transform_expense_preview import process_directory
from supplement_kg import costsummary_upwards
from equipment_calculation.main import bcl_calculate, parse_json_content
from cost_comparison import compare_costs_batch
import tempfile
# 基础目录
BASE_DIR = "project2json"
TEMP_DIR = tempfile.gettempdir() # 使用临时目录
# 生成随机8位ID
def generate_session_id():
return "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
# 创建会话工作目录(仅会话根目录)
def create_session_directories(session_id):
# 仅创建会话根目录用于容纳每个文件的独立GUID临时目录
session_root = os.path.join(BASE_DIR, "outputs", session_id)
os.makedirs(session_root, exist_ok=True)
return {
"session_root": session_root,
}
# 为单个上传文件创建独立的GUID临时目录,包含六个子目录
def create_file_workdirs(session_id):
file_guid = uuid.uuid4().hex
root = os.path.join(BASE_DIR, "outputs", session_id, file_guid)
dirs = {
"root": root,
"upload_dir": os.path.join(root, "uploads"), # 生成json前的上传文件夹
"bcl_dir": os.path.join(root, "bcl"), # bcl计算文件文件夹(备用,当前流程未直接使用)
"json_dir": os.path.join(root, "json"), # 生成后的json文件夹
"merged_dir": os.path.join(root, "merged"),
"bcl_results_dir": os.path.join(root, "bclresults"),
"final_dir": os.path.join(root, "final"),
}
for d in dirs.values():
os.makedirs(d, exist_ok=True)
return dirs
# 整合的转化流程函数(无前端),执行步骤1到步骤3,并将步骤3.2/4替换为 compare_costs_batch
def convert_all_steps(files):
"""处理传入的工程文件列表,输出成功执行的文件数量。
流程:
1) 保存上传 -> 转JSON
2) 处理JSON结构并费用向上汇总(merged_dir)
3) BCL计算(输出到 bcl_results_dir
4) 调用 compare_costs_batch(bcl_results_dir, project_json) 进行费用对比
"""
try:
session_id = generate_session_id()
print(f"生成会话ID: {session_id}")
session_dirs = create_session_directories(session_id)
session_root = session_dirs["session_root"]
total_files = len(files) if files else 0
if total_files == 0:
print("未选择任何文件。")
return 0
total_success = 0
for idx, file in enumerate(files, start=1):
try:
file_name = os.path.basename(file if isinstance(file, str) else file.name)
print(f"[{idx}/{total_files}] 处理文件: {file_name}")
# 创建该文件的独立GUID临时目录
fdirs = create_file_workdirs(session_id)
upload_dir = fdirs["upload_dir"]
json_dir = fdirs["json_dir"]
merged_dir = fdirs["merged_dir"]
bcl_results_dir = fdirs["bcl_results_dir"]
# 步骤1.1: 保存上传的该文件
save_path = os.path.join(upload_dir, file_name)
shutil.copy(file if isinstance(file, str) else file.name, save_path)
# 步骤1.2: 转换为JSON
success, file_num = convert_project_to_json(upload_dir, json_dir, fdirs["bcl_dir"])
if not success:
raise RuntimeError("转换为JSON失败")
# 步骤1.3: 处理JSON文件结构
process_directory(json_dir)
# 步骤2: 费用向上汇总
result_step2 = costsummary_upwards(json_dir, merged_dir)
if not result_step2:
print("警告:未生成任何汇总JSON")
# 步骤3.1: 计算工程量取费表
bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=fdirs["bcl_dir"])
# 选择项目JSON:从 merged_dir 中选择一个主 JSON(若多个则取第一个)
merged_jsons = [
os.path.join(merged_dir, f) for f in os.listdir(merged_dir) if f.lower().endswith(".json")
]
if not merged_jsons:
raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
project_data_json_path = merged_jsons[0]
# 步骤3.2/4: 调用 compare_costs_batch 进行费用对比
compare_costs_batch(bcl_results_dir, project_data_json_path)
total_success += 1
except Exception as fe:
print(f"处理文件 {file_name} 失败: {fe}\n{traceback.format_exc()}")
print(f"成功执行 {total_success} 个文件。")
return total_success
except Exception as e:
error_msg = f"执行流程出错: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return 0
# ========= 批处理与命令行入口(文件末尾追加) =========
def create_named_workdirs(output_base_dir: str, base_name: str):
"""在输出根目录下创建 '<工程名>_<GUID>' 的工作目录结构。"""
guid = uuid.uuid4().hex
root_name = f"{base_name}_{guid}"
root = os.path.join(output_base_dir, root_name)
dirs = {
"root": root,
"upload_dir": os.path.join(root, "uploads"),
"bcl_dir": os.path.join(root, "bcl"),
"json_dir": os.path.join(root, "json"),
"merged_dir": os.path.join(root, "merged"),
"bcl_results_dir": os.path.join(root, "bclresults"),
"final_dir": os.path.join(root, "final"),
}
for d in dirs.values():
os.makedirs(d, exist_ok=True)
return dirs
def run_batch_with_io(input_dir: str, output_dir: str) -> int:
"""批量处理输入目录下的工程文件,输出到指定的输出根目录。"""
input_dir = os.path.abspath(input_dir)
output_dir = os.path.abspath(output_dir)
os.makedirs(output_dir, exist_ok=True)
candidates = [
os.path.join(input_dir, f) for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))
]
if not candidates:
print(f"输入目录无文件: {input_dir}")
return 0
total_success = 0
for idx, file_path in enumerate(candidates, start=1):
base_name = os.path.basename(file_path)
stem = os.path.splitext(base_name)[0]
print(f"[{idx}/{len(candidates)}] 处理文件: {base_name}")
try:
# 1) 为该工程创建独立工作区
fdirs = create_named_workdirs(output_dir, stem)
upload_dir = fdirs["upload_dir"]
json_dir = fdirs["json_dir"]
merged_dir = fdirs["merged_dir"]
bcl_results_dir = fdirs["bcl_results_dir"]
bcl_dir = fdirs["bcl_dir"]
# 2) 保存上传
save_path = os.path.join(upload_dir, base_name)
shutil.copy(file_path, save_path)
# 3) 转换为JSON(并将 BCL 最佳版本复制到 bcl_dir
success, _file_num = convert_project_to_json(upload_dir, json_dir, bcl_dir)
if not success:
raise RuntimeError("转换为JSON失败")
# 4) 处理JSON结构
process_directory(json_dir)
# 5) 费用向上汇总
result_step2 = costsummary_upwards(json_dir, merged_dir)
if not result_step2:
print("警告:未生成任何汇总JSON")
# 6) 计算工程量取费表(读取 bcl_dir,输出到 bcl_results_dir
bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=bcl_dir)
# 7) 选择一个项目 JSON 进行费用对比
merged_jsons = [os.path.join(merged_dir, f) for f in os.listdir(merged_dir) if f.lower().endswith(".json")]
if not merged_jsons:
raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
project_data_json_path = merged_jsons[0]
# 8) 费用对比
compare_costs_batch(bcl_results_dir, project_data_json_path)
total_success += 1
except Exception as fe:
print(f"处理文件 {base_name} 失败: {fe}\n{traceback.format_exc()}")
print(f"成功执行 {total_success}/{len(candidates)} 个文件。输出根目录: {output_dir}")
return total_success
# ========= 新增:嵌套目录遍历与结果抽查/汇总 =========
def _find_leaf_dirs(root_dir: str):
"""找到 root_dir 下的所有最深层级子文件夹(没有进一步子目录的文件夹)。"""
leaf_dirs = []
for current, dirs, _files in os.walk(root_dir):
# 只要该目录没有子目录,则视为叶子目录
if not dirs:
leaf_dirs.append(current)
return leaf_dirs
def _iter_comparison_results_dirs(bcl_results_dir: str):
"""在 bcl_results_dir 下递归查找名为 'comparison_results' 的目录,返回其绝对路径列表。"""
found = []
for current, dirs, _files in os.walk(bcl_results_dir):
for d in dirs:
if d == "comparison_results":
found.append(os.path.join(current, d))
return found
def _parse_diff_value_from_line(line: str) -> float | None:
"""从一行对齐文本中解析“差异”列的值。
文本列宽(在 cost_comparison.save_comparison_to_txt 中定义):
- 项目: 20
- 参考值: 25
- 计算值: 25
- 差异: 25 <-- 我们需要的列
- 原数据项: 30
"""
try:
# 按固定宽度切片
start = 20 + 25 + 25
end = start + 25
cell = line[start:end].strip()
if not cell:
return None
# 去掉可能的对齐空格,再解析为 float
val = float(cell)
return val
except Exception:
return None
def _evaluate_comparison_folder(comp_dir: str, sample_k: int = 10) -> str:
"""随机抽查 comp_dir 下的 TXT 文件,若任意“差异”列存在非 0/-0 值则判定为“有问题”,否则为“正确”。"""
txt_files = [os.path.join(comp_dir, f) for f in os.listdir(comp_dir) if f.lower().endswith(".txt")]
if not txt_files:
# 无文件视为有问题(或按需 "正确"),这里更稳妥地标记为有问题
return "有问题"
random.shuffle(txt_files)
subset = txt_files[: min(sample_k, len(txt_files))]
for path in subset:
try:
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
# 跳过前两行(表头和分隔线)
for line in lines[2:]:
diff_val = _parse_diff_value_from_line(line)
if diff_val is None:
continue
# 允许 -0.0 视为 0
if abs(diff_val) > 0.0:
return "有问题"
except Exception:
return "有问题"
return "正确"
def _append_results_to_csv(records: list[tuple[str, str, str, str]], csv_path: str):
"""将 (文件路径, 软件类别, 项目类型, 计算结果) 记录写入 CSV(追加/创建)。"""
header = ["文件路径", "软件类别", "项目类型", "计算结果"]
file_exists = os.path.exists(csv_path)
with open(csv_path, "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(header)
for row in records:
writer.writerow(list(row))
def run_nested_batch_with_io(
input_root: str, output_root: str, sample_k: int = 10, summary_csv_path: str | None = None
) -> int:
"""遍历嵌套目录中每个最深层级的子文件夹,按现有流程处理其中所有工程文件。
并在完成每个工程的第 8 步(费用对比)后,定位 bclresults 下的所有 comparison_results 文件夹,随机抽查 10 个 TXT,
将判定结果汇总到一个 CSV(列:文件路径、工程名称、计算结果)。
"""
input_root = os.path.abspath(input_root)
output_root = os.path.abspath(output_root)
os.makedirs(output_root, exist_ok=True)
# 提前确定并创建汇总 CSV 路径,后续逐条写入,避免中途异常导致整体丢失
# 需求:多次运行不要覆盖历史数据 -> 当未显式指定时,采用固定文件名并使用追加写入
if summary_csv_path is None:
summary_csv_path = os.path.join(output_root, "comparison_validation_summary.csv")
else:
# 若显式指定了路径,确保其父目录存在
csv_dir = os.path.dirname(summary_csv_path)
if csv_dir:
os.makedirs(csv_dir, exist_ok=True)
leaf_dirs = _find_leaf_dirs(input_root)
if not leaf_dirs:
print(f"未找到任何叶子目录: {input_root}")
return 0
print(f"共发现 {len(leaf_dirs)} 个最深层级子文件夹,开始逐一处理……")
total_success = 0
for didx, leaf in enumerate(leaf_dirs, start=1):
# 收集该叶子目录下的所有文件
files = [os.path.join(leaf, f) for f in os.listdir(leaf) if os.path.isfile(os.path.join(leaf, f))]
if not files:
print(f"[{didx}/{len(leaf_dirs)}] 叶子目录无文件,跳过: {leaf}")
continue
print(f"[{didx}/{len(leaf_dirs)}] 处理叶子目录: {leaf}{len(files)} 个文件)")
for fidx, file_path in enumerate(files, start=1):
base_name = os.path.basename(file_path)
stem = os.path.splitext(base_name)[0]
print(f" - ({fidx}/{len(files)}) 处理文件: {base_name}")
try:
# 为该工程创建独立工作区
fdirs = create_named_workdirs(output_root, stem)
upload_dir = fdirs["upload_dir"]
json_dir = fdirs["json_dir"]
merged_dir = fdirs["merged_dir"]
bcl_results_dir = fdirs["bcl_results_dir"]
bcl_dir = fdirs["bcl_dir"]
# 保存上传
save_path = os.path.join(upload_dir, base_name)
shutil.copy(file_path, save_path)
# 转 JSON
success, _file_num = convert_project_to_json(upload_dir, json_dir, bcl_dir)
if not success:
raise RuntimeError("转换为JSON失败")
# 处理 JSON
process_directory(json_dir)
# 费用向上汇总
_ = costsummary_upwards(json_dir, merged_dir)
# BCL 计算
bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=bcl_dir)
# 选择一个项目 JSON 进行费用对比
merged_jsons = [
os.path.join(merged_dir, nf) for nf in os.listdir(merged_dir) if nf.lower().endswith(".json")
]
if not merged_jsons:
raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
project_data_json_path = merged_jsons[0]
# 费用对比
compare_costs_batch(bcl_results_dir, project_data_json_path)
# 从项目 JSON 中解析 软件类别/项目类型
try:
category, project_type, engineering_type = parse_json_content(project_data_json_path)
except Exception:
category, project_type = "未知", "未知"
# 对比完成后:在 bclresults 下寻找 comparison_results 目录,并进行抽查
comp_dirs = _iter_comparison_results_dirs(bcl_results_dir)
if not comp_dirs:
# 若未找到任何 comparison_results,则记录为有问题(立即写入 CSV)
_append_results_to_csv([(bcl_results_dir, category, project_type, "有问题")], summary_csv_path)
else:
for comp_dir in comp_dirs:
parent_dir = os.path.dirname(comp_dir)
result = _evaluate_comparison_folder(comp_dir, sample_k=sample_k)
_append_results_to_csv([(parent_dir, category, project_type, result)], summary_csv_path)
total_success += 1
except Exception as fe:
print(f" ❌ 处理文件 {base_name} 失败: {fe}\n{traceback.format_exc()}")
print(f"\n📄 抽查结果 CSV 路径: {summary_csv_path}")
print(f"\n✅ 嵌套目录批处理完成。成功处理工程数: {total_success}")
return total_success
if __name__ == "__main__":
input_path = r"E:\文件\LLM_model\RAG\code\Engineering_data_KG-1\KG_generation\data\input\uploads" # 请修改为你的实际输入嵌套目录根
output_path = r"data/input" # 请修改为你的实际输出根目录
summary_csv = None # 可选:指定汇总 CSV 路径;为 None 时自动生成
t0 = time.time()
try:
count = run_nested_batch_with_io(input_path, output_path, sample_k=10, summary_csv_path=summary_csv)
print(f"\n🎉 ✅ 处理完成,共成功 {count} 个。耗时: {int(time.time() - t0)} 秒")
except Exception as e:
print(f"\n❌ 执行失败: {e}\n{traceback.format_exc()}")
raise