"""
Cost statistics.
"""
|
||
|
||
import os
|
||
import shutil
|
||
import time
|
||
from pathlib import Path
|
||
import traceback
|
||
import uuid
|
||
import random
|
||
import string
|
||
import csv
|
||
|
||
# 导入各个步骤需要的函数
|
||
from project2json.project_converter import convert_project_to_json
|
||
from transform_expense_preview import process_directory
|
||
from supplement_kg import costsummary_upwards
|
||
from equipment_calculation.main import bcl_calculate, parse_json_content
|
||
from cost_comparison import compare_costs_batch
|
||
import tempfile
|
||
|
||
# Root folder under which per-session output directories are created.
BASE_DIR = "project2json"
# System temporary directory, as reported by the tempfile module.
TEMP_DIR = tempfile.gettempdir()
|
||
|
||
|
||
# 生成随机8位ID
|
||
def generate_session_id():
    """Return a random 8-character ID drawn from uppercase ASCII letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    picked = random.choices(alphabet, k=8)
    return "".join(picked)
|
||
|
||
|
||
# 创建会话工作目录(仅会话根目录)
|
||
def create_session_directories(session_id):
    """Create the root directory of a session and return it in a dict.

    Only the session root (``BASE_DIR/outputs/<session_id>``) is created here;
    it is the parent that later holds one GUID-named work directory per file.

    Returns:
        ``{"session_root": <path>}``.
    """
    session_root = os.path.join(BASE_DIR, "outputs", session_id)
    os.makedirs(session_root, exist_ok=True)
    return dict(session_root=session_root)
|
||
|
||
|
||
# 为单个上传文件创建独立的GUID临时目录,包含六个子目录
|
||
def create_file_workdirs(session_id):
    """Create a fresh GUID-named work directory for one input file.

    The directory lives at ``BASE_DIR/outputs/<session_id>/<guid>`` and gets
    six sub-folders. Every directory is created on disk before returning.

    Returns:
        Dict mapping ``root``, ``upload_dir``, ``bcl_dir``, ``json_dir``,
        ``merged_dir``, ``bcl_results_dir`` and ``final_dir`` to their paths.
    """
    root = os.path.join(BASE_DIR, "outputs", session_id, uuid.uuid4().hex)

    # (dict key, folder name) pairs, in the order they appear in the result.
    layout = (
        ("upload_dir", "uploads"),  # source files, before JSON conversion
        ("bcl_dir", "bcl"),  # BCL calculation files
        ("json_dir", "json"),  # generated JSON files
        ("merged_dir", "merged"),
        ("bcl_results_dir", "bclresults"),
        ("final_dir", "final"),
    )

    dirs = {"root": root}
    for key, folder in layout:
        dirs[key] = os.path.join(root, folder)

    for path in dirs.values():
        os.makedirs(path, exist_ok=True)

    return dirs
|
||
|
||
|
||
# 整合的转化流程函数(无前端),执行步骤1到步骤3,并将步骤3.2/4替换为 compare_costs_batch
|
||
def convert_all_steps(files):
    """Run the conversion pipeline on each given project file; return the success count.

    Per-file steps:
      1) copy the file into its own work directory and convert it to JSON
      2) post-process the JSON and roll costs upwards (output in merged_dir)
      3) run the BCL calculation (output in bcl_results_dir)
      4) call compare_costs_batch(bcl_results_dir, project_json)

    Args:
        files: list of inputs; each item is either a path string or an object
            that exposes its path as ``.name``. ``None`` or an empty list
            means there is nothing to do.

    Returns:
        Number of files for which every step completed. 0 when there is no
        input, or when something fails outside the per-file loop (the error
        is printed, not raised).
    """
    try:
        # Check for empty input first so that no session directory is left
        # behind on disk when there is nothing to process.
        total_files = len(files) if files else 0
        if total_files == 0:
            print("未选择任何文件。")
            return 0

        session_id = generate_session_id()
        print(f"生成会话ID: {session_id}")
        create_session_directories(session_id)

        total_success = 0

        for idx, file in enumerate(files, start=1):
            # Fallback label: the handler below must never see an unbound or
            # stale name when resolving the real file name is what failed.
            file_name = repr(file)
            try:
                source_path = file if isinstance(file, str) else file.name
                file_name = os.path.basename(source_path)
                print(f"[{idx}/{total_files}] 处理文件: {file_name}")

                # Each file gets its own GUID-named work directory.
                fdirs = create_file_workdirs(session_id)
                upload_dir = fdirs["upload_dir"]
                json_dir = fdirs["json_dir"]
                merged_dir = fdirs["merged_dir"]
                bcl_results_dir = fdirs["bcl_results_dir"]
                bcl_dir = fdirs["bcl_dir"]

                # Step 1.1: copy the input into the work directory.
                shutil.copy(source_path, os.path.join(upload_dir, file_name))

                # Step 1.2: convert to JSON.
                success, _file_num = convert_project_to_json(upload_dir, json_dir, bcl_dir)
                if not success:
                    raise RuntimeError("转换为JSON失败")

                # Step 1.3: post-process the JSON structure.
                process_directory(json_dir)

                # Step 2: roll costs upwards; an empty result is only a warning.
                if not costsummary_upwards(json_dir, merged_dir):
                    print("警告:未生成任何汇总JSON")

                # Step 3.1: BCL calculation.
                bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=bcl_dir)

                # Pick the project JSON from merged_dir (the first one listed).
                # NOTE(review): os.listdir order is arbitrary, so "first" is
                # not deterministic when several JSON files exist.
                merged_jsons = [
                    os.path.join(merged_dir, f) for f in os.listdir(merged_dir) if f.lower().endswith(".json")
                ]
                if not merged_jsons:
                    raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
                project_data_json_path = merged_jsons[0]

                # Step 3.2/4: cost comparison.
                compare_costs_batch(bcl_results_dir, project_data_json_path)

                total_success += 1
            except Exception as fe:
                # One bad file must not stop the batch: report and move on.
                print(f"处理文件 {file_name} 失败: {fe}\n{traceback.format_exc()}")

        print(f"成功执行 {total_success} 个文件。")
        return total_success
    except Exception as e:
        error_msg = f"执行流程出错: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return 0
|
||
|
||
|
||
# ========= 批处理与命令行入口(文件末尾追加) =========
|
||
def create_named_workdirs(output_base_dir: str, base_name: str):
    """Create a '<base_name>_<GUID>' work directory tree under output_base_dir.

    Args:
        output_base_dir: directory that will contain the new work directory.
        base_name: prefix of the work directory name; a random hex GUID is
            appended after an underscore.

    Returns:
        Dict mapping ``root``, ``upload_dir``, ``bcl_dir``, ``json_dir``,
        ``merged_dir``, ``bcl_results_dir`` and ``final_dir`` to their paths.
        All of them exist on disk when the function returns.
    """
    root = os.path.join(output_base_dir, f"{base_name}_{uuid.uuid4().hex}")

    # (dict key, folder name) pairs, in the order they appear in the result.
    layout = (
        ("upload_dir", "uploads"),
        ("bcl_dir", "bcl"),
        ("json_dir", "json"),
        ("merged_dir", "merged"),
        ("bcl_results_dir", "bclresults"),
        ("final_dir", "final"),
    )

    dirs = {"root": root}
    dirs.update((key, os.path.join(root, folder)) for key, folder in layout)

    for path in dirs.values():
        os.makedirs(path, exist_ok=True)
    return dirs
|
||
|
||
|
||
def run_batch_with_io(input_dir: str, output_dir: str) -> int:
    """Process every regular file directly inside input_dir; write under output_dir.

    Each file gets its own '<stem>_<GUID>' work directory below output_dir
    and goes through: copy -> JSON conversion -> JSON post-processing ->
    upward cost summary -> BCL calculation -> cost comparison. A failure in
    one file is printed and does not stop the remaining files.

    Returns:
        Number of files for which every step completed.
    """
    src_root = os.path.abspath(input_dir)
    dst_root = os.path.abspath(output_dir)
    os.makedirs(dst_root, exist_ok=True)

    # Only regular files at the top level of the input directory qualify.
    candidates = []
    for entry in os.listdir(src_root):
        full_path = os.path.join(src_root, entry)
        if os.path.isfile(full_path):
            candidates.append(full_path)

    if not candidates:
        print(f"输入目录无文件: {src_root}")
        return 0

    total = len(candidates)
    succeeded = 0

    for position, source in enumerate(candidates, start=1):
        base_name = os.path.basename(source)
        stem, _ext = os.path.splitext(base_name)
        print(f"[{position}/{total}] 处理文件: {base_name}")

        try:
            # 1) dedicated work area for this project
            work = create_named_workdirs(dst_root, stem)
            upload_dir = work["upload_dir"]
            json_dir = work["json_dir"]
            merged_dir = work["merged_dir"]
            bcl_results_dir = work["bcl_results_dir"]
            bcl_dir = work["bcl_dir"]

            # 2) copy the input into the work area
            shutil.copy(source, os.path.join(upload_dir, base_name))

            # 3) convert to JSON (bcl_dir is handed to the converter as well)
            converted, _file_num = convert_project_to_json(upload_dir, json_dir, bcl_dir)
            if not converted:
                raise RuntimeError("转换为JSON失败")

            # 4) post-process the JSON structure
            process_directory(json_dir)

            # 5) roll costs upwards; an empty result is only a warning
            summary = costsummary_upwards(json_dir, merged_dir)
            if not summary:
                print("警告:未生成任何汇总JSON")

            # 6) BCL calculation, output into bcl_results_dir
            bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=bcl_dir)

            # 7) pick the first JSON listed in merged_dir as the project JSON
            json_names = [name for name in os.listdir(merged_dir) if name.lower().endswith(".json")]
            if not json_names:
                raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
            project_json = os.path.join(merged_dir, json_names[0])

            # 8) cost comparison
            compare_costs_batch(bcl_results_dir, project_json)

            succeeded += 1
        except Exception as fe:
            print(f"处理文件 {base_name} 失败: {fe}\n{traceback.format_exc()}")

    print(f"成功执行 {succeeded}/{total} 个文件。输出根目录: {dst_root}")
    return succeeded
|
||
|
||
|
||
# ========= 新增:嵌套目录遍历与结果抽查/汇总 =========
|
||
def _find_leaf_dirs(root_dir: str):
    """Return every directory under root_dir (root_dir included) that has no sub-directory.

    Paths come back in os.walk order. A root that cannot be walked yields an
    empty list, since os.walk ignores listing errors by default.
    """
    return [folder for folder, subdirs, _files in os.walk(root_dir) if not subdirs]
|
||
|
||
|
||
def _iter_comparison_results_dirs(bcl_results_dir: str):
    """Recursively collect directories named 'comparison_results' under bcl_results_dir.

    Returns:
        List of paths built by joining onto bcl_results_dir, so they are
        absolute only when bcl_results_dir itself is absolute.
    """
    target = "comparison_results"
    matches = []
    for parent, subdirs, _files in os.walk(bcl_results_dir):
        # Names within one directory are unique, so a membership test is
        # enough to decide whether this parent contributes a match.
        if target in subdirs:
            matches.append(os.path.join(parent, target))
    return matches
|
||
|
||
|
||
def _parse_diff_value_from_line(line: str) -> float | None:
|
||
"""从一行对齐文本中解析“差异”列的值。
|
||
|
||
文本列宽(在 cost_comparison.save_comparison_to_txt 中定义):
|
||
- 项目: 20
|
||
- 参考值: 25
|
||
- 计算值: 25
|
||
- 差异: 25 <-- 我们需要的列
|
||
- 原数据项: 30
|
||
"""
|
||
try:
|
||
# 按固定宽度切片
|
||
start = 20 + 25 + 25
|
||
end = start + 25
|
||
cell = line[start:end].strip()
|
||
if not cell:
|
||
return None
|
||
# 去掉可能的对齐空格,再解析为 float
|
||
val = float(cell)
|
||
return val
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _evaluate_comparison_folder(comp_dir: str, sample_k: int = 10) -> str:
    """Spot-check up to sample_k random TXT files in comp_dir and return a verdict.

    For each sampled file the first two lines (header and separator) are
    skipped and the "difference" cell of every remaining line is parsed.

    Args:
        comp_dir: directory holding the comparison TXT files.
        sample_k: maximum number of files to inspect.

    Returns:
        "有问题" when the folder has no TXT file, when a sampled file cannot
        be read, or when any parsed difference is not zero (NaN included);
        otherwise "正确". Both 0.0 and -0.0 count as zero.
    """
    txt_files = [os.path.join(comp_dir, f) for f in os.listdir(comp_dir) if f.lower().endswith(".txt")]
    if not txt_files:
        # Nothing to verify: flag it rather than silently pass.
        return "有问题"

    random.shuffle(txt_files)
    subset = txt_files[: min(sample_k, len(txt_files))]

    for path in subset:
        try:
            with open(path, "r", encoding="utf-8") as f:
                # Stream the file instead of loading every line into memory.
                for line_no, line in enumerate(f):
                    if line_no < 2:
                        continue  # header row and separator row
                    diff_val = _parse_diff_value_from_line(line)
                    if diff_val is None:
                        continue
                    # `!= 0.0` rather than `abs(x) > 0.0`: a NaN difference
                    # compares False under `>` and would pass as correct,
                    # whereas `nan != 0.0` is True.
                    if diff_val != 0.0:
                        return "有问题"
        except Exception:
            # An unreadable or undecodable file is treated as a failed check.
            return "有问题"

    return "正确"
|
||
|
||
|
||
def _append_results_to_csv(records: list[tuple[str, str, str, str]], csv_path: str):
    """Append (file path, software category, project type, result) rows to a CSV.

    The file is opened in append mode so earlier rows are kept. The header
    row is written when the file is new or exists but is still empty (for
    example a pre-created placeholder), so the CSV never starts headerless.

    Args:
        records: rows to append, one 4-tuple per row.
        csv_path: target CSV file; its parent directory must already exist.
    """
    header = ["文件路径", "软件类别", "项目类型", "计算结果"]
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, "a", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(header)
        writer.writerows(list(row) for row in records)
|
||
|
||
|
||
def run_nested_batch_with_io(
    input_root: str, output_root: str, sample_k: int = 10, summary_csv_path: str | None = None
) -> int:
    """Process every file in every leaf folder under input_root and spot-check the results.

    Each file gets its own '<stem>_<GUID>' work directory below output_root
    and goes through: copy -> JSON conversion -> JSON post-processing ->
    upward cost summary -> BCL calculation -> cost comparison. After the
    comparison, every 'comparison_results' directory found under the file's
    bclresults folder is spot-checked (up to sample_k TXT files each) and one
    row per directory is appended to the summary CSV immediately, so rows
    already written survive a later failure.

    Rows passed to the CSV are (path, software category, project type,
    result). When no 'comparison_results' directory exists, a single row with
    the bclresults path and result "有问题" is written instead.

    Args:
        input_root: root of the nested input tree.
        output_root: directory under which work directories are created.
        sample_k: maximum number of TXT files inspected per comparison folder.
        summary_csv_path: CSV to append to; defaults to
            '<output_root>/comparison_validation_summary.csv'. The fixed
            default name plus append mode keeps rows from earlier runs.

    Returns:
        Number of files for which every step completed. A failing file is
        printed and skipped; it does not stop the batch.
    """
    input_root = os.path.abspath(input_root)
    output_root = os.path.abspath(output_root)
    os.makedirs(output_root, exist_ok=True)

    if summary_csv_path is None:
        summary_csv_path = os.path.join(output_root, "comparison_validation_summary.csv")
    else:
        # An explicit path may point anywhere: make sure its folder exists.
        csv_dir = os.path.dirname(summary_csv_path)
        if csv_dir:
            os.makedirs(csv_dir, exist_ok=True)

    leaf_dirs = _find_leaf_dirs(input_root)
    if not leaf_dirs:
        print(f"未找到任何叶子目录: {input_root}")
        return 0

    print(f"共发现 {len(leaf_dirs)} 个最深层级子文件夹,开始逐一处理……")

    total_success = 0

    for didx, leaf in enumerate(leaf_dirs, start=1):
        # Regular files directly inside this leaf folder.
        files = [os.path.join(leaf, f) for f in os.listdir(leaf) if os.path.isfile(os.path.join(leaf, f))]
        if not files:
            print(f"[{didx}/{len(leaf_dirs)}] 叶子目录无文件,跳过: {leaf}")
            continue

        print(f"[{didx}/{len(leaf_dirs)}] 处理叶子目录: {leaf}({len(files)} 个文件)")

        for fidx, file_path in enumerate(files, start=1):
            base_name = os.path.basename(file_path)
            stem = os.path.splitext(base_name)[0]
            print(f" - ({fidx}/{len(files)}) 处理文件: {base_name}")

            try:
                # Dedicated work area for this project.
                fdirs = create_named_workdirs(output_root, stem)
                upload_dir = fdirs["upload_dir"]
                json_dir = fdirs["json_dir"]
                merged_dir = fdirs["merged_dir"]
                bcl_results_dir = fdirs["bcl_results_dir"]
                bcl_dir = fdirs["bcl_dir"]

                # Copy the input into the work area.
                shutil.copy(file_path, os.path.join(upload_dir, base_name))

                # Convert to JSON.
                success, _file_num = convert_project_to_json(upload_dir, json_dir, bcl_dir)
                if not success:
                    raise RuntimeError("转换为JSON失败")

                # Post-process the JSON structure.
                process_directory(json_dir)

                # Roll costs upwards (return value not needed here).
                costsummary_upwards(json_dir, merged_dir)

                # BCL calculation.
                bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=bcl_dir)

                # Pick the first JSON listed in merged_dir as the project JSON.
                # NOTE(review): os.listdir order is arbitrary, so "first" is
                # not deterministic when several JSON files exist.
                merged_jsons = [
                    os.path.join(merged_dir, nf) for nf in os.listdir(merged_dir) if nf.lower().endswith(".json")
                ]
                if not merged_jsons:
                    raise FileNotFoundError("在 merged_dir 中未找到项目 JSON")
                project_data_json_path = merged_jsons[0]

                # Cost comparison.
                compare_costs_batch(bcl_results_dir, project_data_json_path)

                # Read software category / project type from the project JSON.
                # A failure here must not discard the comparison verdict, so
                # fall back to "未知" -- but say so instead of hiding it.
                try:
                    category, project_type, _engineering_type = parse_json_content(project_data_json_path)
                except Exception as pe:
                    print(f" ⚠️ 解析软件类别/项目类型失败,已记为“未知”: {pe}")
                    category, project_type = "未知", "未知"

                # Spot-check every comparison_results directory and write each
                # verdict to the CSV right away.
                comp_dirs = _iter_comparison_results_dirs(bcl_results_dir)
                if not comp_dirs:
                    _append_results_to_csv([(bcl_results_dir, category, project_type, "有问题")], summary_csv_path)
                else:
                    for comp_dir in comp_dirs:
                        parent_dir = os.path.dirname(comp_dir)
                        result = _evaluate_comparison_folder(comp_dir, sample_k=sample_k)
                        _append_results_to_csv([(parent_dir, category, project_type, result)], summary_csv_path)

                total_success += 1
            except Exception as fe:
                print(f" ❌ 处理文件 {base_name} 失败: {fe}\n{traceback.format_exc()}")

    print(f"\n📄 抽查结果 CSV 路径: {summary_csv_path}")

    print(f"\n✅ 嵌套目录批处理完成。成功处理工程数: {total_success}")
    return total_success
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the nested batch on hard-coded paths and report
    # the success count and elapsed seconds. Edit the paths before running.
    input_path = r"E:\文件\LLM_model\RAG\code\Engineering_data_KG-1\KG_generation\data\input\uploads"  # root of the nested input tree
    output_path = r"data/input"  # output root directory
    summary_csv = None  # optional summary CSV path; None selects the default

    started_at = time.time()
    try:
        succeeded = run_nested_batch_with_io(
            input_path,
            output_path,
            sample_k=10,
            summary_csv_path=summary_csv,
        )
        elapsed = int(time.time() - started_at)
        print(f"\n🎉 ✅ 处理完成,共成功 {succeeded} 个。耗时: {elapsed} 秒")
    except Exception as exc:
        # Print the traceback, then re-raise so the failure still propagates.
        print(f"\n❌ 执行失败: {exc}\n{traceback.format_exc()}")
        raise
|