469 lines
17 KiB
Python
469 lines
17 KiB
Python
import os
|
|
import subprocess
|
|
import time
|
|
from gradio_client import file
|
|
import requests
|
|
import json
|
|
import shutil
|
|
import threading
|
|
import atexit
|
|
import tempfile
|
|
|
|
# ==================== BCL 工具配置 ====================
|
|
# 相对于本脚本所在目录的 BwZipBCLTool.exe 路径(位于 `project2json/解压计算配置工具/` 下)
|
|
BCL_TOOL_RELATIVE_PATH = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"解压计算配置工具",
|
|
"BwZipBCLTool.exe",
|
|
)
|
|
# ======================================================
|
|
|
|
# ==================== 配置区 ====================
|
|
JAVA_WORKING_DIR = "D:/eclipseworkspace/bwyAnalysis2.3.2/analysis-server"
|
|
JAR_NAME = "booway-analysis-server-null.jar"
|
|
SERVER_URL = "http://localhost:8090/api/doAnalysis"
|
|
# ================================================
|
|
|
|
# 全局变量:Java 进程
|
|
java_process = None
|
|
|
|
|
|
def start_java_server():
|
|
"""启动 Java 服务"""
|
|
global java_process
|
|
|
|
if java_process is not None and java_process.poll() is None:
|
|
print("✅ Java 服务已在运行。")
|
|
return True
|
|
|
|
try:
|
|
jar_path = os.path.join(JAVA_WORKING_DIR, JAR_NAME)
|
|
if not os.path.exists(jar_path):
|
|
print(f"❌ JAR 文件不存在: {jar_path}")
|
|
return False
|
|
if not os.path.exists(JAVA_WORKING_DIR):
|
|
print(f"❌ Java 工作目录不存在: {JAVA_WORKING_DIR}")
|
|
return False
|
|
|
|
print(f"📁 启动 Java 服务: {jar_path}")
|
|
cmd = ["java", "-Dfile.encoding=UTF-8", "-jar", JAR_NAME]
|
|
java_process = subprocess.Popen(
|
|
cmd,
|
|
cwd=JAVA_WORKING_DIR,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
)
|
|
|
|
def log_output(pipe, prefix):
|
|
for line in iter(pipe.readline, ""):
|
|
if line.strip():
|
|
print(f"{prefix}{line.strip()}")
|
|
|
|
threading.Thread(target=log_output, args=(java_process.stdout, "【OUT】 "), daemon=True).start()
|
|
threading.Thread(target=log_output, args=(java_process.stderr, "【ERR】 "), daemon=True).start()
|
|
|
|
print("⏳ 等待服务启动(等待 20 秒)...")
|
|
time.sleep(20)
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ 启动 Java 服务失败: {e}")
|
|
return False
|
|
|
|
|
|
def check_server_ready(timeout=30):
|
|
"""检查服务是否可用"""
|
|
print("🔍 检测服务是否就绪...")
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
try:
|
|
requests.get("http://localhost:8090/", timeout=3)
|
|
print("✅ 服务已就绪!")
|
|
return True
|
|
except:
|
|
time.sleep(1)
|
|
print("❌ 服务启动超时。")
|
|
return False
|
|
|
|
|
|
def convert_json_to_readable(input_file, output_file=None):
|
|
"""
|
|
升级版:支持编码检测和容错
|
|
"""
|
|
if output_file is None:
|
|
base_name = os.path.splitext(input_file)[0]
|
|
output_file = f"{base_name}_pretty.json"
|
|
|
|
# 确保输出目录存在
|
|
os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else ".", exist_ok=True)
|
|
|
|
encodings = ["utf-8", "gbk", "gb18030", "latin1"] # 常见编码
|
|
data = None
|
|
|
|
for enc in encodings:
|
|
try:
|
|
with open(input_file, "r", encoding=enc) as f:
|
|
data = json.load(f)
|
|
print(f"✔ 使用编码 {enc} 成功读取 {input_file}")
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
except Exception as e:
|
|
print(f"❌ 读取失败 {input_file}: {e}")
|
|
return None
|
|
|
|
if data is None:
|
|
print(f"❌ 所有编码尝试失败: {input_file}")
|
|
return None
|
|
|
|
try:
|
|
with open(output_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=4)
|
|
print(f"✅ 美化成功: {output_file}")
|
|
return output_file
|
|
except Exception as e:
|
|
print(f"❌ 写入失败: {e}")
|
|
return None
|
|
|
|
|
|
def _parse_version_key(name: str):
|
|
"""将形如 '1.20' 的版本字符串转换为可比较的元组。(主, 次, 补丁...)
|
|
非法格式将返回一个极小值以便被忽略。
|
|
"""
|
|
try:
|
|
parts = [int(p) for p in name.strip().split(".") if p.isdigit()]
|
|
if not parts:
|
|
return (-1,)
|
|
return tuple(parts)
|
|
except Exception:
|
|
return (-1,)
|
|
|
|
|
|
def _copy_dir_contents(src_dir: str, dst_dir: str):
|
|
"""将 src_dir 下的所有文件/子目录复制到 dst_dir(不包含 src_dir 这一层目录)。"""
|
|
os.makedirs(dst_dir, exist_ok=True)
|
|
for entry in os.listdir(src_dir):
|
|
s = os.path.join(src_dir, entry)
|
|
d = os.path.join(dst_dir, entry)
|
|
if os.path.isdir(s):
|
|
# 复制子目录
|
|
if os.path.exists(d):
|
|
shutil.rmtree(d)
|
|
shutil.copytree(s, d)
|
|
else:
|
|
shutil.copy2(s, d)
|
|
|
|
|
|
def run_bcl_tool_and_copy_best(
|
|
project_path: str,
|
|
final_copy_dir: str,
|
|
desktop_base: str = None,
|
|
exe_path: str = None,
|
|
exts: tuple[str, ...] | None = None,
|
|
cleanup_temp: bool = True,
|
|
):
|
|
"""
|
|
运行 BwZipBCLTool.exe 解析 BCL,并将临时输出目录下“版本号最大”的子文件夹内容复制到 final_copy_dir。
|
|
|
|
参数:
|
|
project_path: .zwqd 工程文件路径或包含该文件的目录
|
|
final_copy_dir: 版本号最大的文件夹内的所有文件复制到的最终目录
|
|
desktop_base: 临时输出目录所在的桌面路径,默认为当前用户桌面
|
|
exe_path: BwZipBCLTool.exe 的完整路径,默认使用 BCL_TOOL_RELATIVE_PATH
|
|
|
|
返回:
|
|
(bool, str): (是否成功, 最佳版本目录路径或错误信息)
|
|
"""
|
|
try:
|
|
exe_path = exe_path or BCL_TOOL_RELATIVE_PATH
|
|
if not os.path.exists(exe_path):
|
|
return False, f"未找到 BCL 工具: {exe_path}"
|
|
|
|
# 构建候选工程文件列表
|
|
# 默认:尝试目录内所有文件;若传入 exts,则只筛选指定后缀
|
|
candidates: list[str] = []
|
|
if os.path.isfile(project_path):
|
|
candidates = [project_path]
|
|
elif os.path.isdir(project_path):
|
|
names = sorted(os.listdir(project_path))
|
|
if exts:
|
|
lower_exts = tuple(e.lower() for e in exts)
|
|
candidates.extend([os.path.join(project_path, n) for n in names if n.lower().endswith(lower_exts)])
|
|
else:
|
|
candidates.extend(
|
|
[os.path.join(project_path, n) for n in names if os.path.isfile(os.path.join(project_path, n))]
|
|
)
|
|
else:
|
|
return False, f"路径不存在: {project_path}"
|
|
|
|
if not candidates:
|
|
return False, f"在 '{project_path}' 未找到可供解析的工程文件"
|
|
|
|
last_error = None
|
|
for eng_file in candidates:
|
|
# 临时目录放到桌面
|
|
if desktop_base is None:
|
|
desktop_base = os.path.join(os.path.expanduser("~"), "Desktop")
|
|
os.makedirs(desktop_base, exist_ok=True)
|
|
temp_out_dir = tempfile.mkdtemp(prefix="bcl_", dir=desktop_base)
|
|
|
|
print(f"🚀 调用 BCL 工具: {exe_path}")
|
|
print(f" ├─ 工程文件: {eng_file}")
|
|
print(f" └─ 临时输出: {temp_out_dir}")
|
|
|
|
try:
|
|
# 运行外部工具
|
|
result = subprocess.run(
|
|
[exe_path, eng_file, temp_out_dir],
|
|
capture_output=True,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
shell=False,
|
|
)
|
|
if result.returncode != 0:
|
|
stderr = (result.stderr or "").strip()
|
|
stdout = (result.stdout or "").strip()
|
|
last_error = f"BCL 工具执行失败 (code={result.returncode})\nSTDOUT: {stdout}\nSTDERR: {stderr}"
|
|
continue
|
|
|
|
# 在临时目录下查找版本号子目录
|
|
subdirs = [d for d in os.listdir(temp_out_dir) if os.path.isdir(os.path.join(temp_out_dir, d))]
|
|
if not subdirs:
|
|
last_error = f"BCL 工具未在输出目录生成任何子目录: {temp_out_dir}"
|
|
continue
|
|
|
|
best_dir_name = max(subdirs, key=_parse_version_key)
|
|
best_dir_path = os.path.join(temp_out_dir, best_dir_name)
|
|
print(f"🏆 发现最佳版本目录: {best_dir_name}")
|
|
|
|
# 若下层只有单一子目录则继续下探;若存在多个子目录,且其名称像版本号(如 1.2.13),则选取版本号最大者继续下探
|
|
def _descend_if_single_subdir(path: str) -> str:
|
|
try:
|
|
while True:
|
|
entries = [os.path.join(path, e) for e in os.listdir(path)]
|
|
files = [p for p in entries if os.path.isfile(p)]
|
|
dirs = [p for p in entries if os.path.isdir(p)]
|
|
if files:
|
|
return path
|
|
if len(dirs) == 1:
|
|
path = dirs[0]
|
|
continue
|
|
if len(dirs) > 1:
|
|
# 如果多子目录都像版本号,则选择最大的版本继续下探
|
|
dir_names = [os.path.basename(d) for d in dirs]
|
|
|
|
def looks_like_version(name: str) -> bool:
|
|
parts = name.split(".")
|
|
if not parts:
|
|
return False
|
|
return all(p.isdigit() for p in parts)
|
|
|
|
version_like = [d for d, n in zip(dirs, dir_names) if looks_like_version(n)]
|
|
if version_like:
|
|
best_dir = max(version_like, key=lambda p: _parse_version_key(os.path.basename(p)))
|
|
path = best_dir
|
|
continue
|
|
# 否则停止下探,返回当前层
|
|
return path
|
|
return path
|
|
except Exception:
|
|
return path
|
|
|
|
copy_root = _descend_if_single_subdir(best_dir_path)
|
|
|
|
# 复制内容到最终目录
|
|
os.makedirs(final_copy_dir, exist_ok=True)
|
|
_copy_dir_contents(copy_root, final_copy_dir)
|
|
print(f"✅ 已将最佳版本目录内的文件复制到: {final_copy_dir}")
|
|
|
|
return True, copy_root
|
|
finally:
|
|
if cleanup_temp:
|
|
try:
|
|
shutil.rmtree(temp_out_dir, ignore_errors=True)
|
|
# print(f"🧹 已清理临时目录: {temp_out_dir}")
|
|
except Exception:
|
|
pass
|
|
|
|
# 所有候选均失败
|
|
return False, (last_error or "未找到可用的工程文件以执行 BCL 工具")
|
|
except Exception as e:
|
|
return False, f"运行 BCL 工具出错: {e}"
|
|
|
|
|
|
def convert_project_to_json(input_folder, output_folder, best_version_dest_folder=None):
|
|
"""
|
|
将工程文件夹转换为JSON,并移动到输出文件夹(含美化)
|
|
|
|
参数:
|
|
input_folder: 工程文件所在目录(包含 .zwzj 文件)
|
|
output_folder: 转换后的 JSON 文件输出目录
|
|
best_version_dest_folder: 若提供,则会在流程开始前运行 BCL 工具,并将最佳版本目录内容复制到该路径
|
|
|
|
返回:
|
|
tuple(bool, int): (是否成功, 成功转移并美化的文件数量)
|
|
"""
|
|
transferred_count = 0 # 记录成功转移的文件数
|
|
success = False # 整体是否成功
|
|
|
|
# --- 路径预处理 ---
|
|
try:
|
|
input_folder = os.path.abspath(input_folder)
|
|
output_folder = os.path.abspath(output_folder)
|
|
except Exception as e:
|
|
print(f"❌ 路径解析失败: {e}")
|
|
return False, 0
|
|
|
|
print(f"📁 输入工程目录: {input_folder}")
|
|
print(f"📁 输出 JSON 目录: {output_folder}")
|
|
# -----------------
|
|
|
|
# 1. 检查输入目录
|
|
if not os.path.exists(input_folder):
|
|
print(f"❌ 输入目录不存在: {input_folder}")
|
|
return False, 0
|
|
if not os.path.isdir(input_folder):
|
|
print(f"❌ 输入路径不是目录: {input_folder}")
|
|
return False, 0
|
|
|
|
# 2. 确保输出目录存在
|
|
try:
|
|
os.makedirs(output_folder, exist_ok=True)
|
|
output_folder = os.path.abspath(output_folder)
|
|
except Exception as e:
|
|
print(f"❌ 创建输出目录失败: {e}")
|
|
return False, 0
|
|
|
|
# 3. 若用户指定了 BCL 最终复制目录,则先进行 BCL 解析与复制
|
|
if best_version_dest_folder:
|
|
ok, info = run_bcl_tool_and_copy_best(input_folder, best_version_dest_folder)
|
|
if ok:
|
|
print(f"🧩 BCL 解析已完成,最佳版本输出目录: {info}")
|
|
else:
|
|
print(f"⚠️ BCL 解析步骤失败: {info}")
|
|
|
|
# 4. 启动 Java 服务
|
|
if not start_java_server():
|
|
return False, 0
|
|
if not check_server_ready():
|
|
return False, 0
|
|
|
|
# 5. 调用 API 开始分析
|
|
params = {"filePath": input_folder.replace("\\", "/")}
|
|
print(f"🌐 调用 API: {SERVER_URL}?{requests.compat.urlencode(params)}")
|
|
|
|
try:
|
|
response = requests.get(SERVER_URL, params=params, timeout=180)
|
|
print(f"📡 响应状态码: {response.status_code}")
|
|
if response.status_code != 200:
|
|
print(f"❌ API 调用失败: {response.text}")
|
|
return False, 0
|
|
result = response.json()
|
|
if not result.get("success", False):
|
|
print(f"❌ 服务返回失败: {result.get('msg', '未知错误')}")
|
|
return False, 0
|
|
except Exception as e:
|
|
print(f"❌ 调用 API 出错: {e}")
|
|
return False, 0
|
|
|
|
# 6. 等待并检测 JSON 文件生成完成(改进版)
|
|
print("⏳ 正在等待 JSON 文件生成完成...")
|
|
json_files = []
|
|
max_wait_time = 600
|
|
check_interval = 2
|
|
|
|
start_time = time.time()
|
|
while time.time() - start_time < max_wait_time:
|
|
# 获取目录中的所有文件
|
|
all_files = os.listdir(input_folder)
|
|
# 获取JSON文件列表
|
|
current_jsons = [f for f in all_files if f.endswith(".json")]
|
|
# 获取非JSON文件列表
|
|
non_json_files = [f for f in all_files if not f.endswith(".json")]
|
|
|
|
# 计算文件数量
|
|
json_count = len(current_jsons)
|
|
non_json_count = len(non_json_files)
|
|
|
|
print(
|
|
f" ├─ 第 {int(time.time()-start_time)} 秒: 发现 {json_count} 个 JSON 文件, {non_json_count} 个非 JSON 文件"
|
|
)
|
|
|
|
# 如果JSON文件和非JSON文件数量相等且都大于0,认为处理完成
|
|
if json_count > 0 and json_count == non_json_count:
|
|
json_files = current_jsons
|
|
print(f"✅ 文件生成完成,JSON文件和非JSON文件数量相等(各{json_count}个)。")
|
|
break
|
|
|
|
time.sleep(check_interval)
|
|
else:
|
|
json_files = [f for f in os.listdir(input_folder) if f.endswith(".json")]
|
|
if json_files:
|
|
print(f"⚠️ 警告:等待超时,但已发现 {len(json_files)} 个 JSON 文件,继续处理。")
|
|
else:
|
|
print(f"❌ 在 {max_wait_time} 秒内未生成任何 JSON 文件。")
|
|
return False, 0
|
|
|
|
# 7. 查找所有 .json 文件
|
|
json_files = [f for f in os.listdir(input_folder) if f.endswith(".json")]
|
|
if not json_files:
|
|
print(f"❌ 未在 {input_folder} 中生成 JSON 文件。")
|
|
return False, 0
|
|
|
|
print(f"📄 发现 {len(json_files)} 个 JSON 文件: {json_files}")
|
|
|
|
# 8. 转换为可读格式并移动到输出目录
|
|
transferred_count = 0
|
|
for file_name in json_files:
|
|
src_path = os.path.join(input_folder, file_name)
|
|
readable_path = os.path.join(output_folder, f"{os.path.splitext(file_name)[0]}.json")
|
|
|
|
if convert_json_to_readable(src_path, readable_path):
|
|
transferred_count += 1
|
|
else:
|
|
print(f"⚠️ 跳过文件: {file_name}")
|
|
|
|
print(f"✅ 共转移并美化 {transferred_count}/{len(json_files)} 个 JSON 文件到: {output_folder}")
|
|
|
|
# ✅ 返回成功状态和实际转移数量
|
|
success = True
|
|
return success, transferred_count
|
|
|
|
|
|
def cleanup():
|
|
"""清理 Java 进程"""
|
|
global java_process
|
|
if java_process and java_process.poll() is None:
|
|
print("\n🛑 正在关闭 Java 服务...")
|
|
java_process.terminate()
|
|
try:
|
|
java_process.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
java_process.kill()
|
|
print("✅ Java 服务已关闭。")
|
|
|
|
|
|
# 注册退出钩子
|
|
atexit.register(cleanup)
|
|
|
|
|
|
# ==================== 使用示例 ====================
|
|
if __name__ == "__main__":
|
|
# 示例调用
|
|
input_dir = r"data/input/uploads"
|
|
output_dir = r"data/input/json"
|
|
best_bcl_dest = r"data/input/bcl"
|
|
|
|
success, _count = convert_project_to_json(input_dir, output_dir, best_bcl_dest)
|
|
|
|
if success:
|
|
print("\n🎉 ✅ 全部流程执行成功!")
|
|
else:
|
|
print("\n❌ 流程执行失败,请检查日志。")
|
|
|
|
input("\n按回车键退出...")
|