498 lines
19 KiB
Python
498 lines
19 KiB
Python
import os
|
|
import shutil
|
|
import time
|
|
import gradio as gr
|
|
from pathlib import Path
|
|
import traceback
|
|
import uuid
|
|
import random
|
|
import string
|
|
|
|
# 导入各个步骤需要的函数
|
|
from project2json.project_converter import convert_project_to_json
|
|
from transform_expense_preview import process_directory
|
|
from supplement_kg import costsummary_upwards
|
|
from equipment_calculation.main import bcl_calculate
|
|
from unified_structure import batch_write_BCLresult_into_json
|
|
from build_kg_ontolo import create_KGs_from_folder, connect_to_neo4j, read_config, clear_database
|
|
from export_kg_excel import export_knowledge_graph
|
|
from neo4j import GraphDatabase
|
|
import tempfile
|
|
|
|
# 基础目录
|
|
BASE_DIR = "project2json"
|
|
TEMP_DIR = tempfile.gettempdir() # 使用临时目录
|
|
|
|
|
|
# 生成随机8位ID
|
|
def generate_session_id():
|
|
return "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
|
|
|
|
|
# 创建会话工作目录(仅会话根目录)
|
|
def create_session_directories(session_id):
|
|
# 仅创建会话根目录用于容纳每个文件的独立GUID临时目录
|
|
session_root = os.path.join(BASE_DIR, "outputs", session_id)
|
|
os.makedirs(session_root, exist_ok=True)
|
|
return {
|
|
"session_root": session_root,
|
|
}
|
|
|
|
|
|
# 为单个上传文件创建独立的GUID临时目录,包含六个子目录
|
|
def create_file_workdirs(session_id):
|
|
file_guid = uuid.uuid4().hex
|
|
root = os.path.join(BASE_DIR, "outputs", session_id, file_guid)
|
|
dirs = {
|
|
"root": root,
|
|
"upload_dir": os.path.join(root, "uploads"), # 生成json前的上传文件夹
|
|
"bcl_dir": os.path.join(root, "bcl"), # bcl计算文件文件夹(备用,当前流程未直接使用)
|
|
"json_dir": os.path.join(root, "json"), # 生成后的json文件夹
|
|
"merged_dir": os.path.join(root, "merged"),
|
|
"bcl_results_dir": os.path.join(root, "bclresults"),
|
|
"final_dir": os.path.join(root, "final"),
|
|
}
|
|
|
|
for d in dirs.values():
|
|
os.makedirs(d, exist_ok=True)
|
|
|
|
return dirs
|
|
|
|
|
|
# 清理会话目录
|
|
def clean_session_directories(session_id):
|
|
upload_dir = os.path.join(BASE_DIR, "uploads", session_id)
|
|
output_dir = os.path.join(BASE_DIR, "outputs", session_id)
|
|
|
|
# 清理上传目录
|
|
if os.path.exists(upload_dir):
|
|
try:
|
|
shutil.rmtree(upload_dir)
|
|
print(f"已清理上传目录: {upload_dir}")
|
|
except Exception as e:
|
|
print(f"清理上传目录时出错: {e}")
|
|
|
|
# 清理输出目录
|
|
if os.path.exists(output_dir):
|
|
try:
|
|
shutil.rmtree(output_dir)
|
|
print(f"已清理输出目录: {output_dir}")
|
|
except Exception as e:
|
|
print(f"清理输出目录时出错: {e}")
|
|
|
|
|
|
# 整合的转化流程函数,执行步骤1到步骤4
|
|
def convert_all_steps(files, progress=gr.Progress()):
|
|
try:
|
|
# 生成会话ID并创建会话根目录
|
|
session_id = generate_session_id()
|
|
print(f"生成会话ID: {session_id}")
|
|
session_dirs = create_session_directories(session_id)
|
|
session_root = session_dirs["session_root"]
|
|
|
|
# 连接Neo4j(提前连接,避免逐文件重复连接)
|
|
progress(0.05, desc="步骤0: 连接Neo4j数据库")
|
|
config = read_config()
|
|
if not connect_to_neo4j(
|
|
config.get("neo4j", "uri"), config.get("neo4j", "user"), config.get("neo4j", "password")
|
|
):
|
|
clean_session_directories(session_id)
|
|
return "转化失败:无法连接到Neo4j数据库。", []
|
|
|
|
total_files = len(files) if files else 0
|
|
if total_files == 0:
|
|
clean_session_directories(session_id)
|
|
return "未选择任何文件。", []
|
|
|
|
# 结果累计
|
|
total_converted_to_json = 0
|
|
total_cost_files = 0
|
|
total_bcl_json_write = 0
|
|
total_kg_created = 0
|
|
total_kg_expected = 0
|
|
all_deleted_projects = []
|
|
|
|
# 逐文件处理
|
|
for idx, file in enumerate(files, start=1):
|
|
file_name = os.path.basename(file.name)
|
|
stage_base = (idx - 1) / total_files
|
|
stage_span = 0.9 / total_files # 从0.1到1.0之间分配给各文件
|
|
|
|
# 创建该文件的独立GUID临时目录
|
|
fdirs = create_file_workdirs(session_id)
|
|
upload_dir = fdirs["upload_dir"]
|
|
json_dir = fdirs["json_dir"]
|
|
merged_dir = fdirs["merged_dir"]
|
|
bcl_results_dir = fdirs["bcl_results_dir"]
|
|
final_dir = fdirs["final_dir"]
|
|
|
|
# 步骤1.1: 保存上传的该文件
|
|
progress(stage_base + stage_span * 0.05, desc=f"[{idx}/{total_files}] 保存上传文件: {file_name}")
|
|
save_path = os.path.join(upload_dir, file_name)
|
|
shutil.copy(file.name, save_path)
|
|
|
|
# 步骤1.2: 转换为JSON
|
|
progress(stage_base + stage_span * 0.15, desc=f"[{idx}/{total_files}] 步骤1: 转换为JSON")
|
|
success, file_num = convert_project_to_json(upload_dir, json_dir)
|
|
total_converted_to_json += file_num if success else 0
|
|
|
|
# 步骤1.3: 处理JSON文件结构
|
|
progress(stage_base + stage_span * 0.30, desc=f"[{idx}/{total_files}] 处理JSON结构")
|
|
process_directory(json_dir)
|
|
|
|
# 步骤2: 费用向上汇总
|
|
progress(stage_base + stage_span * 0.45, desc=f"[{idx}/{total_files}] 步骤2: 费用向上汇总")
|
|
result_step2 = costsummary_upwards(json_dir, merged_dir)
|
|
total_cost_files += len(result_step2) if result_step2 else 0
|
|
|
|
# 步骤3.1: 计算工程量取费表
|
|
progress(stage_base + stage_span * 0.65, desc=f"[{idx}/{total_files}] 步骤3: 计算工程量取费表")
|
|
# 传入该文件工作区的 bcl 目录
|
|
bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=fdirs["bcl_dir"])
|
|
|
|
# 步骤3.2: 将BCL结果写入JSON
|
|
progress(stage_base + stage_span * 0.80, desc=f"[{idx}/{total_files}] 写入BCL结果到JSON")
|
|
success_count_step3 = batch_write_BCLresult_into_json(merged_dir, bcl_results_dir, final_dir)
|
|
total_bcl_json_write += success_count_step3 if success_count_step3 else 0
|
|
|
|
# 步骤4: 写入知识图谱(针对该文件的final目录)
|
|
progress(stage_base + stage_span * 0.95, desc=f"[{idx}/{total_files}] 创建知识图谱")
|
|
success_count_step4, total_count_step4, deleted_projects = create_KGs_from_folder(final_dir)
|
|
total_kg_created += success_count_step4 if success_count_step4 else 0
|
|
total_kg_expected += total_count_step4 if total_count_step4 else 0
|
|
if deleted_projects:
|
|
all_deleted_projects.extend(deleted_projects)
|
|
|
|
# 清理所有会话目录
|
|
progress(0.98, desc="清理所有临时文件")
|
|
clean_session_directories(session_id)
|
|
|
|
progress(1.0, desc="转化完成")
|
|
|
|
# 汇总结果
|
|
deleted_msg = ""
|
|
if all_deleted_projects:
|
|
deleted_msg = f"\n已删除 {len(all_deleted_projects)} 个同名工程:{', '.join(all_deleted_projects)}"
|
|
|
|
result_summary = (
|
|
f"转化完成!\n"
|
|
f"步骤1: 成功转换 {total_converted_to_json} 个工程文件到JSON\n"
|
|
f"步骤2: 成功处理 {total_cost_files} 个费用汇总文件\n"
|
|
f"步骤3: 成功处理 {total_bcl_json_write} 个BCL计算结果\n"
|
|
f"步骤4: 成功创建 {total_kg_created}/{total_kg_expected} 个知识图谱"
|
|
f"{deleted_msg}\n"
|
|
f"所有临时文件已清理。\n\n"
|
|
f'请在下方选择知识图谱并点击"导出到Excel"按钮下载。'
|
|
)
|
|
|
|
# 获取知识图谱列表
|
|
kg_list = get_engineering_data_nodes()
|
|
|
|
# 返回结果并刷新知识图谱列表
|
|
return result_summary, gr.update(choices=kg_list, value=None)
|
|
except Exception as e:
|
|
error_msg = f"转化过程出错: {str(e)}\n{traceback.format_exc()}"
|
|
print(error_msg)
|
|
# 如果发生错误,尝试清理会话目录(如果session_id已定义)
|
|
if "session_id" in locals():
|
|
try:
|
|
clean_session_directories(session_id)
|
|
except Exception as cleanup_error:
|
|
print(f"清理会话目录时出错: {cleanup_error}")
|
|
return error_msg, gr.update(choices=[], value=None)
|
|
|
|
|
|
# 获取所有EngineeringData节点的名称
|
|
def get_engineering_data_nodes():
|
|
try:
|
|
# 加载配置并创建驱动
|
|
config = read_config()
|
|
uri = config.get("neo4j", "uri")
|
|
user = config.get("neo4j", "user")
|
|
password = config.get("neo4j", "password")
|
|
|
|
driver = GraphDatabase.driver(uri, auth=(user, password))
|
|
|
|
with driver.session() as session:
|
|
# 查询所有EngineeringData节点
|
|
query = """
|
|
MATCH (n:EngineeringData)
|
|
RETURN n.name as name
|
|
"""
|
|
result = session.run(query)
|
|
nodes = [record["name"] for record in result]
|
|
|
|
driver.close()
|
|
return nodes
|
|
except Exception as e:
|
|
print(f"获取EngineeringData节点失败: {e}")
|
|
return []
|
|
|
|
|
|
# 刷新知识图谱列表
|
|
def refresh_kg_list():
|
|
try:
|
|
nodes = get_engineering_data_nodes()
|
|
if nodes:
|
|
return gr.update(choices=nodes, value=None), f"找到 {len(nodes)} 个知识图谱"
|
|
else:
|
|
return gr.update(choices=[], value=None), "未找到任何知识图谱"
|
|
except Exception as e:
|
|
error_msg = f"刷新知识图谱列表失败: {str(e)}"
|
|
print(error_msg)
|
|
return gr.update(choices=[], value=None), error_msg
|
|
|
|
|
|
# 获取知识图谱的基本信息
|
|
def get_kg_properties(kg_name):
|
|
if not kg_name:
|
|
return "请先选择一个知识图谱"
|
|
|
|
try:
|
|
# 加载配置并创建驱动
|
|
config = read_config() # 假设 read_config 已定义
|
|
uri = config.get("neo4j", "uri")
|
|
user = config.get("neo4j", "user")
|
|
password = config.get("neo4j", "password")
|
|
|
|
driver = GraphDatabase.driver(uri, auth=(user, password))
|
|
|
|
properties = []
|
|
|
|
with driver.session() as session:
|
|
# 查询具有中文属性名的 EngineeringData 节点
|
|
try:
|
|
query = """
|
|
MATCH (ed:EngineeringData {name: $name})
|
|
RETURN
|
|
ed.name AS name,
|
|
ed.`上传时间` AS upload_time,
|
|
ed.`工程类型` AS project_type
|
|
LIMIT 1
|
|
"""
|
|
result = session.run(query, name=kg_name)
|
|
record = result.single()
|
|
|
|
if not record:
|
|
return f"未找到名为 '{kg_name}' 的知识图谱节点"
|
|
|
|
# 构建属性列表
|
|
# 知识图谱名称(使用 record["name"],确保与数据库一致)
|
|
properties.append({"name": "知识图谱名称", "value": record["name"] or kg_name})
|
|
|
|
# 上传时间
|
|
upload_time = record["upload_time"]
|
|
properties.append({"name": "上传时间", "value": upload_time or "未知"})
|
|
|
|
# 工程类型
|
|
project_type = record["project_type"]
|
|
properties.append({"name": "工程类型", "value": project_type or "未知"})
|
|
|
|
except Exception as e:
|
|
print(f"查询知识图谱属性失败: {str(e)}")
|
|
properties = [{"name": "错误", "value": f"查询失败: {str(e)}"}]
|
|
|
|
driver.close()
|
|
|
|
# 格式化输出信息
|
|
info_text = f"知识图谱 '{kg_name}' 的基本信息:\n\n"
|
|
for prop in properties:
|
|
info_text += f"{prop['name']}: {prop['value']}\n"
|
|
|
|
return info_text
|
|
|
|
except Exception as e:
|
|
error_msg = f"获取知识图谱属性信息失败: {str(e)}\n{traceback.format_exc()}"
|
|
print(error_msg)
|
|
return error_msg
|
|
|
|
|
|
# 导出指定的知识图谱到Excel
|
|
def export_specific_kg(kg_name, progress=gr.Progress()):
|
|
if not kg_name:
|
|
return None, "请先选择要导出的知识图谱", ""
|
|
|
|
try:
|
|
# 加载配置并创建驱动
|
|
config = read_config() # 假设 read_config 已定义
|
|
uri = config.get("neo4j", "uri")
|
|
user = config.get("neo4j", "user")
|
|
password = config.get("neo4j", "password")
|
|
|
|
driver = GraphDatabase.driver(uri, auth=(user, password))
|
|
|
|
# 获取软件名称作为文件名
|
|
software_name = None
|
|
with driver.session() as session:
|
|
# 查询软件名称
|
|
query = """
|
|
MATCH (ed:EngineeringData {name: $name})-[*1..3]->(pp:ProjectProperty)
|
|
WHERE pp.name = '工程名称'
|
|
RETURN pp.value as software_name
|
|
LIMIT 1
|
|
"""
|
|
result = session.run(query, name=kg_name)
|
|
record = result.single()
|
|
if record:
|
|
software_name = record["software_name"]
|
|
print(f"获取到工程名称: {software_name}")
|
|
else:
|
|
print(f"未找到知识图谱 {kg_name} 的软件名称属性")
|
|
|
|
# 设置当前工程名称
|
|
with driver.session() as session:
|
|
# 查询指定EngineeringData节点的GUID
|
|
query = """
|
|
MATCH (n:EngineeringData {name: $name})
|
|
RETURN id(n) as node_id
|
|
"""
|
|
result = session.run(query, name=kg_name)
|
|
record = result.single()
|
|
if not record:
|
|
driver.close()
|
|
return None, f"未找到名为 {kg_name} 的知识图谱", ""
|
|
|
|
node_id = record["node_id"]
|
|
|
|
# 设置当前工程
|
|
set_query = """
|
|
MATCH (n:EngineeringData) WHERE id(n) = $node_id
|
|
SET n.current = true
|
|
"""
|
|
session.run(set_query, node_id=node_id)
|
|
|
|
# 清除其他工程的current标记
|
|
clear_query = """
|
|
MATCH (n:EngineeringData) WHERE id(n) <> $node_id AND n.current = true
|
|
REMOVE n.current
|
|
"""
|
|
session.run(clear_query, node_id=node_id)
|
|
|
|
driver.close()
|
|
|
|
# 导出当前工程到Excel
|
|
progress(0.3, desc=f"导出知识图谱 {kg_name} 到Excel")
|
|
|
|
# 使用软件名称作为文件名,如果没有则使用知识图谱名称
|
|
file_name = software_name if software_name else kg_name
|
|
# 替换文件名中的非法字符
|
|
file_name = (
|
|
file_name.replace("/", "_")
|
|
.replace("\\", "_")
|
|
.replace(":", "_")
|
|
.replace("*", "_")
|
|
.replace("?", "_")
|
|
.replace('"', "_")
|
|
.replace("<", "_")
|
|
.replace(">", "_")
|
|
.replace("|", "_")
|
|
)
|
|
|
|
# 直接将软件名称传递给export_knowledge_graph函数
|
|
output_path = export_knowledge_graph(TEMP_DIR, file_name)
|
|
|
|
# 确保文件存在
|
|
if not os.path.exists(output_path):
|
|
return None, f"导出失败:未能生成Excel文件 {output_path}", ""
|
|
|
|
progress(1.0, desc="导出完成")
|
|
|
|
# 获取文件大小
|
|
file_size_bytes = os.path.getsize(output_path)
|
|
# 转换为可读格式
|
|
if file_size_bytes < 1024:
|
|
file_size_str = f"{file_size_bytes} B"
|
|
elif file_size_bytes < 1024 * 1024:
|
|
file_size_str = f"{file_size_bytes / 1024:.2f} KB"
|
|
else:
|
|
file_size_str = f"{file_size_bytes / (1024 * 1024):.2f} MB"
|
|
|
|
# 创建文件信息HTML
|
|
file_info_html = f"""
|
|
<div class="file-info">
|
|
<span class="file-name">文件名: {file_name}.xlsx</span>
|
|
<span class="file-size">文件大小: {file_size_str}</span>
|
|
</div>
|
|
"""
|
|
|
|
# 返回下载按钮的值、状态消息和文件信息HTML
|
|
return output_path, f"已成功导出知识图谱 {kg_name} 到Excel文件", file_info_html
|
|
except Exception as e:
|
|
error_msg = f"导出知识图谱 {kg_name} 出错: {str(e)}\n{traceback.format_exc()}"
|
|
print(error_msg)
|
|
return None, error_msg, ""
|
|
|
|
|
|
# 创建Gradio界面
|
|
def create_interface():
|
|
with gr.Blocks(title="工程知识图谱生成工具") as app:
|
|
gr.Markdown("# 工程知识图谱生成工具")
|
|
# gr.Markdown("上传工程文件,生成知识图谱并导出到Excel")
|
|
|
|
with gr.Row():
|
|
# 左侧:文件上传和转化按钮
|
|
with gr.Column(scale=2):
|
|
files_input = gr.File(file_count="multiple", label="上传工程文件")
|
|
convert_btn = gr.Button("转化文件", variant="primary", size="lg")
|
|
|
|
# 右侧:处理结果输出
|
|
with gr.Column(scale=3):
|
|
output_text = gr.Textbox(label="处理结果", lines=10)
|
|
|
|
# 添加分隔标题
|
|
gr.Markdown("# 知识图谱导出工具")
|
|
|
|
with gr.Row():
|
|
# 左侧:选择和导出功能
|
|
with gr.Column(scale=2):
|
|
# 添加下拉框和刷新按钮
|
|
with gr.Row():
|
|
kg_dropdown = gr.Dropdown(label="选择工程知识图谱", choices=[], interactive=True)
|
|
refresh_btn = gr.Button("刷新列表", size="sm")
|
|
|
|
export_btn = gr.Button(value="导出到Excel", variant="primary", interactive=True)
|
|
kg_info = gr.Textbox(label="知识图谱属性信息", lines=10)
|
|
|
|
# 右侧:日志信息区域
|
|
with gr.Column(scale=3):
|
|
download_file = gr.File(
|
|
label="下载文件", visible=True, type="filepath", interactive=False, elem_classes=["download-file"]
|
|
)
|
|
export_status = gr.Textbox(label="导出状态", visible=True, interactive=False)
|
|
|
|
# 设置按钮点击事件
|
|
convert_btn.click(convert_all_steps, inputs=[files_input], outputs=[output_text, kg_dropdown])
|
|
|
|
# 刷新按钮点击事件
|
|
refresh_btn.click(refresh_kg_list, inputs=[], outputs=[kg_dropdown, output_text])
|
|
|
|
# 下拉框选择事件
|
|
kg_dropdown.change(get_kg_properties, inputs=[kg_dropdown], outputs=[kg_info])
|
|
|
|
# 设置导出按钮的初始状态
|
|
kg_dropdown.change(lambda x: gr.update(interactive=bool(x)), inputs=[kg_dropdown], outputs=[export_btn])
|
|
|
|
# 导出按钮点击事件 - 先导出文件,然后显示下载组件
|
|
export_btn.click(fn=export_specific_kg, inputs=[kg_dropdown], outputs=[download_file, export_status])
|
|
|
|
return app
|
|
|
|
|
|
# 主函数
|
|
def main():
|
|
# 确保基础目录存在
|
|
# os.makedirs(os.path.join(BASE_DIR, "uploads"), exist_ok=True)
|
|
# os.makedirs(os.path.join(BASE_DIR, "outputs"), exist_ok=True)
|
|
|
|
# 创建并启动Gradio界面
|
|
app = create_interface()
|
|
app.launch(
|
|
share=False,
|
|
server_name="0.0.0.0",
|
|
server_port=7860,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|