Files
KG_generation/project2json/project_converter.py
T
chentianrui f5f26c5cf8 上传代码
2025-09-08 17:58:02 +08:00

469 lines
17 KiB
Python

import os
import subprocess
import time
from gradio_client import file
import requests
import json
import shutil
import threading
import atexit
import tempfile
# ==================== BCL 工具配置 ====================
# 相对于本脚本所在目录的 BwZipBCLTool.exe 路径(位于 `project2json/解压计算配置工具/` 下)
BCL_TOOL_RELATIVE_PATH = os.path.join(
os.path.dirname(__file__),
"解压计算配置工具",
"BwZipBCLTool.exe",
)
# ======================================================
# ==================== 配置区 ====================
JAVA_WORKING_DIR = "D:/eclipseworkspace/bwyAnalysis2.3.2/analysis-server"
JAR_NAME = "booway-analysis-server-null.jar"
SERVER_URL = "http://localhost:8090/api/doAnalysis"
# ================================================
# 全局变量:Java 进程
java_process = None
def start_java_server():
"""启动 Java 服务"""
global java_process
if java_process is not None and java_process.poll() is None:
print("✅ Java 服务已在运行。")
return True
try:
jar_path = os.path.join(JAVA_WORKING_DIR, JAR_NAME)
if not os.path.exists(jar_path):
print(f"❌ JAR 文件不存在: {jar_path}")
return False
if not os.path.exists(JAVA_WORKING_DIR):
print(f"❌ Java 工作目录不存在: {JAVA_WORKING_DIR}")
return False
print(f"📁 启动 Java 服务: {jar_path}")
cmd = ["java", "-Dfile.encoding=UTF-8", "-jar", JAR_NAME]
java_process = subprocess.Popen(
cmd,
cwd=JAVA_WORKING_DIR,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
def log_output(pipe, prefix):
for line in iter(pipe.readline, ""):
if line.strip():
print(f"{prefix}{line.strip()}")
threading.Thread(target=log_output, args=(java_process.stdout, "【OUT】 "), daemon=True).start()
threading.Thread(target=log_output, args=(java_process.stderr, "【ERR】 "), daemon=True).start()
print("⏳ 等待服务启动(等待 20 秒)...")
time.sleep(20)
return True
except Exception as e:
print(f"❌ 启动 Java 服务失败: {e}")
return False
def check_server_ready(timeout=30):
"""检查服务是否可用"""
print("🔍 检测服务是否就绪...")
start_time = time.time()
while time.time() - start_time < timeout:
try:
requests.get("http://localhost:8090/", timeout=3)
print("✅ 服务已就绪!")
return True
except:
time.sleep(1)
print("❌ 服务启动超时。")
return False
def convert_json_to_readable(input_file, output_file=None):
"""
升级版:支持编码检测和容错
"""
if output_file is None:
base_name = os.path.splitext(input_file)[0]
output_file = f"{base_name}_pretty.json"
# 确保输出目录存在
os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else ".", exist_ok=True)
encodings = ["utf-8", "gbk", "gb18030", "latin1"] # 常见编码
data = None
for enc in encodings:
try:
with open(input_file, "r", encoding=enc) as f:
data = json.load(f)
print(f"✔ 使用编码 {enc} 成功读取 {input_file}")
break
except UnicodeDecodeError:
continue
except Exception as e:
print(f"❌ 读取失败 {input_file}: {e}")
return None
if data is None:
print(f"❌ 所有编码尝试失败: {input_file}")
return None
try:
with open(output_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print(f"✅ 美化成功: {output_file}")
return output_file
except Exception as e:
print(f"❌ 写入失败: {e}")
return None
def _parse_version_key(name: str):
"""将形如 '1.20' 的版本字符串转换为可比较的元组。(主, 次, 补丁...)
非法格式将返回一个极小值以便被忽略。
"""
try:
parts = [int(p) for p in name.strip().split(".") if p.isdigit()]
if not parts:
return (-1,)
return tuple(parts)
except Exception:
return (-1,)
def _copy_dir_contents(src_dir: str, dst_dir: str):
"""将 src_dir 下的所有文件/子目录复制到 dst_dir(不包含 src_dir 这一层目录)。"""
os.makedirs(dst_dir, exist_ok=True)
for entry in os.listdir(src_dir):
s = os.path.join(src_dir, entry)
d = os.path.join(dst_dir, entry)
if os.path.isdir(s):
# 复制子目录
if os.path.exists(d):
shutil.rmtree(d)
shutil.copytree(s, d)
else:
shutil.copy2(s, d)
def run_bcl_tool_and_copy_best(
project_path: str,
final_copy_dir: str,
desktop_base: str = None,
exe_path: str = None,
exts: tuple[str, ...] | None = None,
cleanup_temp: bool = True,
):
"""
运行 BwZipBCLTool.exe 解析 BCL,并将临时输出目录下“版本号最大”的子文件夹内容复制到 final_copy_dir。
参数:
project_path: .zwqd 工程文件路径或包含该文件的目录
final_copy_dir: 版本号最大的文件夹内的所有文件复制到的最终目录
desktop_base: 临时输出目录所在的桌面路径,默认为当前用户桌面
exe_path: BwZipBCLTool.exe 的完整路径,默认使用 BCL_TOOL_RELATIVE_PATH
返回:
(bool, str): (是否成功, 最佳版本目录路径或错误信息)
"""
try:
exe_path = exe_path or BCL_TOOL_RELATIVE_PATH
if not os.path.exists(exe_path):
return False, f"未找到 BCL 工具: {exe_path}"
# 构建候选工程文件列表
# 默认:尝试目录内所有文件;若传入 exts,则只筛选指定后缀
candidates: list[str] = []
if os.path.isfile(project_path):
candidates = [project_path]
elif os.path.isdir(project_path):
names = sorted(os.listdir(project_path))
if exts:
lower_exts = tuple(e.lower() for e in exts)
candidates.extend([os.path.join(project_path, n) for n in names if n.lower().endswith(lower_exts)])
else:
candidates.extend(
[os.path.join(project_path, n) for n in names if os.path.isfile(os.path.join(project_path, n))]
)
else:
return False, f"路径不存在: {project_path}"
if not candidates:
return False, f"在 '{project_path}' 未找到可供解析的工程文件"
last_error = None
for eng_file in candidates:
# 临时目录放到桌面
if desktop_base is None:
desktop_base = os.path.join(os.path.expanduser("~"), "Desktop")
os.makedirs(desktop_base, exist_ok=True)
temp_out_dir = tempfile.mkdtemp(prefix="bcl_", dir=desktop_base)
print(f"🚀 调用 BCL 工具: {exe_path}")
print(f" ├─ 工程文件: {eng_file}")
print(f" └─ 临时输出: {temp_out_dir}")
try:
# 运行外部工具
result = subprocess.run(
[exe_path, eng_file, temp_out_dir],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
shell=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
last_error = f"BCL 工具执行失败 (code={result.returncode})\nSTDOUT: {stdout}\nSTDERR: {stderr}"
continue
# 在临时目录下查找版本号子目录
subdirs = [d for d in os.listdir(temp_out_dir) if os.path.isdir(os.path.join(temp_out_dir, d))]
if not subdirs:
last_error = f"BCL 工具未在输出目录生成任何子目录: {temp_out_dir}"
continue
best_dir_name = max(subdirs, key=_parse_version_key)
best_dir_path = os.path.join(temp_out_dir, best_dir_name)
print(f"🏆 发现最佳版本目录: {best_dir_name}")
# 若下层只有单一子目录则继续下探;若存在多个子目录,且其名称像版本号(如 1.2.13),则选取版本号最大者继续下探
def _descend_if_single_subdir(path: str) -> str:
try:
while True:
entries = [os.path.join(path, e) for e in os.listdir(path)]
files = [p for p in entries if os.path.isfile(p)]
dirs = [p for p in entries if os.path.isdir(p)]
if files:
return path
if len(dirs) == 1:
path = dirs[0]
continue
if len(dirs) > 1:
# 如果多子目录都像版本号,则选择最大的版本继续下探
dir_names = [os.path.basename(d) for d in dirs]
def looks_like_version(name: str) -> bool:
parts = name.split(".")
if not parts:
return False
return all(p.isdigit() for p in parts)
version_like = [d for d, n in zip(dirs, dir_names) if looks_like_version(n)]
if version_like:
best_dir = max(version_like, key=lambda p: _parse_version_key(os.path.basename(p)))
path = best_dir
continue
# 否则停止下探,返回当前层
return path
return path
except Exception:
return path
copy_root = _descend_if_single_subdir(best_dir_path)
# 复制内容到最终目录
os.makedirs(final_copy_dir, exist_ok=True)
_copy_dir_contents(copy_root, final_copy_dir)
print(f"✅ 已将最佳版本目录内的文件复制到: {final_copy_dir}")
return True, copy_root
finally:
if cleanup_temp:
try:
shutil.rmtree(temp_out_dir, ignore_errors=True)
# print(f"🧹 已清理临时目录: {temp_out_dir}")
except Exception:
pass
# 所有候选均失败
return False, (last_error or "未找到可用的工程文件以执行 BCL 工具")
except Exception as e:
return False, f"运行 BCL 工具出错: {e}"
def convert_project_to_json(input_folder, output_folder, best_version_dest_folder=None):
"""
将工程文件夹转换为JSON,并移动到输出文件夹(含美化)
参数:
input_folder: 工程文件所在目录(包含 .zwzj 文件)
output_folder: 转换后的 JSON 文件输出目录
best_version_dest_folder: 若提供,则会在流程开始前运行 BCL 工具,并将最佳版本目录内容复制到该路径
返回:
tuple(bool, int): (是否成功, 成功转移并美化的文件数量)
"""
transferred_count = 0 # 记录成功转移的文件数
success = False # 整体是否成功
# --- 路径预处理 ---
try:
input_folder = os.path.abspath(input_folder)
output_folder = os.path.abspath(output_folder)
except Exception as e:
print(f"❌ 路径解析失败: {e}")
return False, 0
print(f"📁 输入工程目录: {input_folder}")
print(f"📁 输出 JSON 目录: {output_folder}")
# -----------------
# 1. 检查输入目录
if not os.path.exists(input_folder):
print(f"❌ 输入目录不存在: {input_folder}")
return False, 0
if not os.path.isdir(input_folder):
print(f"❌ 输入路径不是目录: {input_folder}")
return False, 0
# 2. 确保输出目录存在
try:
os.makedirs(output_folder, exist_ok=True)
output_folder = os.path.abspath(output_folder)
except Exception as e:
print(f"❌ 创建输出目录失败: {e}")
return False, 0
# 3. 若用户指定了 BCL 最终复制目录,则先进行 BCL 解析与复制
if best_version_dest_folder:
ok, info = run_bcl_tool_and_copy_best(input_folder, best_version_dest_folder)
if ok:
print(f"🧩 BCL 解析已完成,最佳版本输出目录: {info}")
else:
print(f"⚠️ BCL 解析步骤失败: {info}")
# 4. 启动 Java 服务
if not start_java_server():
return False, 0
if not check_server_ready():
return False, 0
# 5. 调用 API 开始分析
params = {"filePath": input_folder.replace("\\", "/")}
print(f"🌐 调用 API: {SERVER_URL}?{requests.compat.urlencode(params)}")
try:
response = requests.get(SERVER_URL, params=params, timeout=180)
print(f"📡 响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"❌ API 调用失败: {response.text}")
return False, 0
result = response.json()
if not result.get("success", False):
print(f"❌ 服务返回失败: {result.get('msg', '未知错误')}")
return False, 0
except Exception as e:
print(f"❌ 调用 API 出错: {e}")
return False, 0
# 6. 等待并检测 JSON 文件生成完成(改进版)
print("⏳ 正在等待 JSON 文件生成完成...")
json_files = []
max_wait_time = 6000
check_interval = 2
start_time = time.time()
while time.time() - start_time < max_wait_time:
# 获取目录中的所有文件
all_files = os.listdir(input_folder)
# 获取JSON文件列表
current_jsons = [f for f in all_files if f.endswith(".json")]
# 获取非JSON文件列表
non_json_files = [f for f in all_files if not f.endswith(".json")]
# 计算文件数量
json_count = len(current_jsons)
non_json_count = len(non_json_files)
print(
f" ├─ 第 {int(time.time()-start_time)} 秒: 发现 {json_count} 个 JSON 文件, {non_json_count} 个非 JSON 文件"
)
# 如果JSON文件和非JSON文件数量相等且都大于0,认为处理完成
if json_count > 0 and json_count == non_json_count:
json_files = current_jsons
print(f"✅ 文件生成完成,JSON文件和非JSON文件数量相等(各{json_count}个)。")
break
time.sleep(check_interval)
else:
json_files = [f for f in os.listdir(input_folder) if f.endswith(".json")]
if json_files:
print(f"⚠️ 警告:等待超时,但已发现 {len(json_files)} 个 JSON 文件,继续处理。")
else:
print(f"❌ 在 {max_wait_time} 秒内未生成任何 JSON 文件。")
return False, 0
# 7. 查找所有 .json 文件
json_files = [f for f in os.listdir(input_folder) if f.endswith(".json")]
if not json_files:
print(f"❌ 未在 {input_folder} 中生成 JSON 文件。")
return False, 0
print(f"📄 发现 {len(json_files)} 个 JSON 文件: {json_files}")
# 8. 转换为可读格式并移动到输出目录
transferred_count = 0
for file_name in json_files:
src_path = os.path.join(input_folder, file_name)
readable_path = os.path.join(output_folder, f"{os.path.splitext(file_name)[0]}.json")
if convert_json_to_readable(src_path, readable_path):
transferred_count += 1
else:
print(f"⚠️ 跳过文件: {file_name}")
print(f"✅ 共转移并美化 {transferred_count}/{len(json_files)} 个 JSON 文件到: {output_folder}")
# ✅ 返回成功状态和实际转移数量
success = True
return success, transferred_count
def cleanup():
"""清理 Java 进程"""
global java_process
if java_process and java_process.poll() is None:
print("\n🛑 正在关闭 Java 服务...")
java_process.terminate()
try:
java_process.wait(timeout=5)
except subprocess.TimeoutExpired:
java_process.kill()
print("✅ Java 服务已关闭。")
# 注册退出钩子
atexit.register(cleanup)
# ==================== 使用示例 ====================
if __name__ == "__main__":
# 示例调用
input_dir = r"data/output/uploads"
output_dir = r"data/output/json"
best_bcl_dest = r"data/output/bcl"
success, _count = convert_project_to_json(input_dir, output_dir, best_bcl_dest)
if success:
print("\n🎉 ✅ 全部流程执行成功!")
else:
print("\n❌ 流程执行失败,请检查日志。")
input("\n按回车键退出...")