Files
KG_generation/project2kg_gradio.py
chentianrui f5f26c5cf8 上传代码
2025-09-08 17:58:02 +08:00

498 lines
19 KiB
Python

import os
import shutil
import time
import gradio as gr
from pathlib import Path
import traceback
import uuid
import random
import string
# 导入各个步骤需要的函数
from project2json.project_converter import convert_project_to_json
from transform_expense_preview import process_directory
from supplement_kg import costsummary_upwards
from equipment_calculation.main import bcl_calculate
from unified_structure import batch_write_BCLresult_into_json
from build_kg_ontolo import create_KGs_from_folder, connect_to_neo4j, read_config, clear_database
from export_kg_excel import export_knowledge_graph
from neo4j import GraphDatabase
import tempfile
# Base directory; per-session working folders are created beneath it
BASE_DIR = "project2json"
TEMP_DIR = tempfile.gettempdir() # System temporary directory, used as the Excel export target
# 生成随机8位ID
def generate_session_id():
    """Return a random 8-character ID made of uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    picked = random.choices(alphabet, k=8)
    return "".join(picked)
# 创建会话工作目录(仅会话根目录)
def create_session_directories(session_id):
    """Create the root working directory of a session.

    Only the session root is created here; it is the container for the
    GUID-named per-file workspaces that are created separately.

    Args:
        session_id: Name of the folder to create under ``BASE_DIR/outputs``.

    Returns:
        A dict with the single key ``"session_root"`` mapping to the path.
    """
    root_path = os.path.join(BASE_DIR, "outputs", session_id)
    os.makedirs(root_path, exist_ok=True)
    return dict(session_root=root_path)
# 为单个上传文件创建独立的GUID临时目录,包含六个子目录
def create_file_workdirs(session_id):
    """Create a fresh GUID-named workspace for one uploaded file.

    The workspace lives at ``BASE_DIR/outputs/<session_id>/<guid>`` and
    contains six sub-folders, one per pipeline stage.

    Args:
        session_id: Session the workspace belongs to.

    Returns:
        A dict mapping ``"root"``, ``"upload_dir"``, ``"bcl_dir"``,
        ``"json_dir"``, ``"merged_dir"``, ``"bcl_results_dir"`` and
        ``"final_dir"`` to the created paths.
    """
    workspace = os.path.join(BASE_DIR, "outputs", session_id, uuid.uuid4().hex)

    # (dict key, folder name) for every stage folder, in creation order.
    stage_folders = (
        ("upload_dir", "uploads"),          # uploaded file before JSON generation
        ("bcl_dir", "bcl"),                 # BCL calculation files
        ("json_dir", "json"),               # generated JSON
        ("merged_dir", "merged"),
        ("bcl_results_dir", "bclresults"),
        ("final_dir", "final"),
    )

    layout = {"root": workspace}
    layout.update((key, os.path.join(workspace, folder)) for key, folder in stage_folders)

    for path in layout.values():
        os.makedirs(path, exist_ok=True)
    return layout
# 清理会话目录
def clean_session_directories(session_id):
    """Delete the upload and output trees that belong to a session.

    Each tree is removed only if it exists. A failure while removing one tree
    is printed and does not stop the other from being processed.

    Args:
        session_id: Session whose ``uploads`` and ``outputs`` folders under
            ``BASE_DIR`` should be removed.
    """
    targets = (
        ("上传", os.path.join(BASE_DIR, "uploads", session_id)),
        ("输出", os.path.join(BASE_DIR, "outputs", session_id)),
    )
    for label, path in targets:
        if not os.path.exists(path):
            continue
        try:
            shutil.rmtree(path)
            print(f"已清理{label}目录: {path}")
        except Exception as exc:
            print(f"清理{label}目录时出错: {exc}")
# 整合的转化流程函数,执行步骤1到步骤4
def convert_all_steps(files, progress=gr.Progress()):
    """Run the full conversion pipeline (steps 1-4) on the uploaded files.

    Every file is handled in its own GUID-named workspace and goes through:
    copy into the workspace, ``convert_project_to_json``,
    ``process_directory``, ``costsummary_upwards``, ``bcl_calculate``,
    ``batch_write_BCLresult_into_json`` and ``create_KGs_from_folder``.
    Counters from each step are accumulated into one summary message.

    Args:
        files: Uploaded files from the Gradio ``File`` component. Each item
            may be a path string or an object exposing the path as ``.name``.
        progress: Gradio progress tracker used to report the current stage.

    Returns:
        A ``(message, dropdown_update)`` tuple. ``dropdown_update`` is always
        a ``gr.update(...)``: the refreshed graph list on success, a no-op
        when nothing was uploaded or Neo4j is unreachable, and an emptied
        list when an exception occurred.
    """
    # Nothing to convert: return before creating directories or opening a
    # database connection. gr.update() leaves the dropdown untouched.
    if not files:
        return "未选择任何文件。", gr.update()

    session_id = None
    try:
        session_id = generate_session_id()
        print(f"生成会话ID: {session_id}")
        create_session_directories(session_id)

        # Connect once up front instead of once per file.
        progress(0.05, desc="步骤0: 连接Neo4j数据库")
        config = read_config()
        if not connect_to_neo4j(
            config.get("neo4j", "uri"), config.get("neo4j", "user"), config.get("neo4j", "password")
        ):
            return "转化失败:无法连接到Neo4j数据库。", gr.update()

        total_files = len(files)
        # Per-file stages share the 0.10-0.95 range so the bar never moves
        # backwards relative to the 0.05 (connect) and 0.98 (cleanup) marks.
        stage_span = 0.85 / total_files

        # Running totals across all files.
        total_converted_to_json = 0
        total_cost_files = 0
        total_bcl_json_write = 0
        total_kg_created = 0
        total_kg_expected = 0
        all_deleted_projects = []

        for idx, file in enumerate(files, start=1):
            # Accept both plain path strings and wrappers exposing ``.name``.
            src_path = getattr(file, "name", file)
            file_name = os.path.basename(src_path)
            stage_base = 0.1 + (idx - 1) * stage_span
            tag = f"[{idx}/{total_files}]"

            # Independent workspace for this file.
            fdirs = create_file_workdirs(session_id)
            upload_dir = fdirs["upload_dir"]
            json_dir = fdirs["json_dir"]
            merged_dir = fdirs["merged_dir"]
            bcl_results_dir = fdirs["bcl_results_dir"]
            final_dir = fdirs["final_dir"]

            # Step 1.1: copy the upload into the workspace.
            progress(stage_base + stage_span * 0.05, desc=f"{tag} 保存上传文件: {file_name}")
            shutil.copy(src_path, os.path.join(upload_dir, file_name))

            # Step 1.2: convert to JSON.
            progress(stage_base + stage_span * 0.15, desc=f"{tag} 步骤1: 转换为JSON")
            success, file_num = convert_project_to_json(upload_dir, json_dir)
            total_converted_to_json += file_num if success else 0

            # Step 1.3: post-process the JSON structure.
            progress(stage_base + stage_span * 0.30, desc=f"{tag} 处理JSON结构")
            process_directory(json_dir)

            # Step 2: roll costs upwards.
            progress(stage_base + stage_span * 0.45, desc=f"{tag} 步骤2: 费用向上汇总")
            result_step2 = costsummary_upwards(json_dir, merged_dir)
            total_cost_files += len(result_step2) if result_step2 else 0

            # Step 3.1: BCL calculation, using this workspace's bcl folder.
            progress(stage_base + stage_span * 0.65, desc=f"{tag} 步骤3: 计算工程量取费表")
            bcl_calculate(merged_dir, bcl_results_dir, bcl_dir_path=fdirs["bcl_dir"])

            # Step 3.2: write the BCL results back into the JSON.
            progress(stage_base + stage_span * 0.80, desc=f"{tag} 写入BCL结果到JSON")
            success_count_step3 = batch_write_BCLresult_into_json(merged_dir, bcl_results_dir, final_dir)
            total_bcl_json_write += success_count_step3 if success_count_step3 else 0

            # Step 4: create the knowledge graph(s) from this file's final folder.
            progress(stage_base + stage_span * 0.95, desc=f"{tag} 创建知识图谱")
            success_count_step4, total_count_step4, deleted_projects = create_KGs_from_folder(final_dir)
            total_kg_created += success_count_step4 if success_count_step4 else 0
            total_kg_expected += total_count_step4 if total_count_step4 else 0
            if deleted_projects:
                all_deleted_projects.extend(deleted_projects)

        # Remove the session workspace before reporting it as cleaned; the
        # ``finally`` below is only a safety net for the other exit paths.
        progress(0.98, desc="清理所有临时文件")
        clean_session_directories(session_id)
        progress(1.0, desc="转化完成")

        deleted_msg = ""
        if all_deleted_projects:
            deleted_msg = f"\n已删除 {len(all_deleted_projects)} 个同名工程:{', '.join(all_deleted_projects)}"
        result_summary = (
            f"转化完成!\n"
            f"步骤1: 成功转换 {total_converted_to_json} 个工程文件到JSON\n"
            f"步骤2: 成功处理 {total_cost_files} 个费用汇总文件\n"
            f"步骤3: 成功处理 {total_bcl_json_write} 个BCL计算结果\n"
            f"步骤4: 成功创建 {total_kg_created}/{total_kg_expected} 个知识图谱"
            f"{deleted_msg}\n"
            f"所有临时文件已清理。\n\n"
            f'请在下方选择知识图谱并点击"导出到Excel"按钮下载。'
        )

        # Refresh the dropdown with the graphs now present in the database.
        kg_list = get_engineering_data_nodes()
        return result_summary, gr.update(choices=kg_list, value=None)
    except Exception as e:
        error_msg = f"转化过程出错: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return error_msg, gr.update(choices=[], value=None)
    finally:
        # Runs on every exit path (success, early return, exception) so the
        # session workspace is never left behind. Calling it again after a
        # successful cleanup does nothing because the folders no longer exist.
        if session_id is not None:
            try:
                clean_session_directories(session_id)
            except Exception as cleanup_error:
                print(f"清理会话目录时出错: {cleanup_error}")
# 获取所有EngineeringData节点的名称
def get_engineering_data_nodes():
    """Return the ``name`` of every ``EngineeringData`` node in Neo4j.

    Connection settings are read from the ``neo4j`` section of the config
    returned by ``read_config``.

    Returns:
        A list with one ``name`` value per node, or an empty list if reading
        the config, connecting or querying fails (the error is printed).
    """
    driver = None
    try:
        config = read_config()
        uri = config.get("neo4j", "uri")
        user = config.get("neo4j", "user")
        password = config.get("neo4j", "password")
        driver = GraphDatabase.driver(uri, auth=(user, password))
        with driver.session() as session:
            query = """
            MATCH (n:EngineeringData)
            RETURN n.name as name
            """
            result = session.run(query)
            # Consume the result while the session is still open.
            return [record["name"] for record in result]
    except Exception as e:
        print(f"获取EngineeringData节点失败: {e}")
        return []
    finally:
        # Close the driver on failure as well, so a failed query does not
        # leak the connection pool.
        if driver is not None:
            driver.close()
# 刷新知识图谱列表
def refresh_kg_list():
    """Reload the knowledge-graph names for the dropdown.

    Returns:
        A ``(dropdown_update, status_message)`` tuple. The update resets the
        selection and carries the fetched names, or an empty choice list when
        none were found or an error occurred.
    """
    try:
        names = get_engineering_data_nodes()
        if not names:
            return gr.update(choices=[], value=None), "未找到任何知识图谱"
        return gr.update(choices=names, value=None), f"找到 {len(names)} 个知识图谱"
    except Exception as exc:
        failure = f"刷新知识图谱列表失败: {str(exc)}"
        print(failure)
        return gr.update(choices=[], value=None), failure
# 获取知识图谱的基本信息
def get_kg_properties(kg_name):
    """Return a text summary of one ``EngineeringData`` node.

    The summary lists the node's name plus its ``上传时间`` and ``工程类型``
    properties; missing values are shown as ``未知``.

    Args:
        kg_name: Value of the node's ``name`` property. Falsy values produce
            a prompt asking the user to select a graph first.

    Returns:
        The formatted summary, a "not found" message, or an error message
        (with traceback) if the config or connection could not be set up.
    """
    if not kg_name:
        return "请先选择一个知识图谱"

    driver = None
    try:
        config = read_config()
        uri = config.get("neo4j", "uri")
        user = config.get("neo4j", "user")
        password = config.get("neo4j", "password")
        driver = GraphDatabase.driver(uri, auth=(user, password))

        properties = []
        with driver.session() as session:
            # A failing query is reported inside the summary instead of
            # aborting the whole call.
            try:
                query = """
                MATCH (ed:EngineeringData {name: $name})
                RETURN
                    ed.name AS name,
                    ed.`上传时间` AS upload_time,
                    ed.`工程类型` AS project_type
                LIMIT 1
                """
                record = session.run(query, name=kg_name).single()
                if record is None:
                    return f"未找到名为 '{kg_name}' 的知识图谱节点"
                properties = [
                    # Prefer the stored name so the text matches the database.
                    ("知识图谱名称", record["name"] or kg_name),
                    ("上传时间", record["upload_time"] or "未知"),
                    ("工程类型", record["project_type"] or "未知"),
                ]
            except Exception as e:
                print(f"查询知识图谱属性失败: {str(e)}")
                properties = [("错误", f"查询失败: {str(e)}")]

        header = f"知识图谱 '{kg_name}' 的基本信息:\n\n"
        return header + "".join(f"{label}: {value}\n" for label, value in properties)
    except Exception as e:
        error_msg = f"获取知识图谱属性信息失败: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return error_msg
    finally:
        # Covers the early "not found" return and every error path, which
        # previously skipped driver.close().
        if driver is not None:
            driver.close()
# 导出指定的知识图谱到Excel
def export_specific_kg(kg_name, progress=gr.Progress()):
    """Mark one knowledge graph as current and export it to an Excel file.

    The function looks up the ``工程名称`` project property to use as the
    file name (falling back to ``kg_name``), sets ``current = true`` on the
    selected ``EngineeringData`` node, removes that flag from all others, and
    then calls ``export_knowledge_graph(TEMP_DIR, file_name)``.

    Args:
        kg_name: ``name`` of the ``EngineeringData`` node to export.
        progress: Gradio progress tracker.

    Returns:
        A ``(file_path, status_message, file_info_html)`` tuple. On failure
        ``file_path`` is ``None`` and ``file_info_html`` is an empty string.
    """
    if not kg_name:
        return None, "请先选择要导出的知识图谱", ""
    try:
        config = read_config()
        uri = config.get("neo4j", "uri")
        user = config.get("neo4j", "user")
        password = config.get("neo4j", "password")
        driver = GraphDatabase.driver(uri, auth=(user, password))

        software_name = None
        try:
            # Project name stored in the graph, used as the export file name.
            with driver.session() as session:
                query = """
                MATCH (ed:EngineeringData {name: $name})-[*1..3]->(pp:ProjectProperty)
                WHERE pp.name = '工程名称'
                RETURN pp.value as software_name
                LIMIT 1
                """
                record = session.run(query, name=kg_name).single()
                if record:
                    software_name = record["software_name"]
                    print(f"获取到工程名称: {software_name}")
                else:
                    print(f"未找到知识图谱 {kg_name} 的软件名称属性")

            # Flag the selected node as the current project.
            with driver.session() as session:
                query = """
                MATCH (n:EngineeringData {name: $name})
                RETURN id(n) as node_id
                """
                record = session.run(query, name=kg_name).single()
                if not record:
                    return None, f"未找到名为 {kg_name} 的知识图谱", ""
                node_id = record["node_id"]

                set_query = """
                MATCH (n:EngineeringData) WHERE id(n) = $node_id
                SET n.current = true
                """
                session.run(set_query, node_id=node_id)

                # Only one node may carry the flag.
                clear_query = """
                MATCH (n:EngineeringData) WHERE id(n) <> $node_id AND n.current = true
                REMOVE n.current
                """
                session.run(clear_query, node_id=node_id)
        finally:
            # Close before exporting (as the original did on success), and
            # also when a query raises or the node is missing.
            driver.close()

        progress(0.3, desc=f"导出知识图谱 {kg_name} 到Excel")

        # Replace characters that are not allowed in file names with "_".
        # str() guards against a non-string property value.
        illegal_chars = str.maketrans(dict.fromkeys('/\\:*?"<>|', "_"))
        file_name = str(software_name if software_name else kg_name).translate(illegal_chars)

        output_path = export_knowledge_graph(TEMP_DIR, file_name)

        # A missing path (None or empty) is treated the same as a missing file.
        if not output_path or not os.path.exists(output_path):
            return None, f"导出失败:未能生成Excel文件 {output_path}", ""
        progress(1.0, desc="导出完成")

        # Human-readable file size.
        file_size_bytes = os.path.getsize(output_path)
        if file_size_bytes < 1024:
            file_size_str = f"{file_size_bytes} B"
        elif file_size_bytes < 1024 * 1024:
            file_size_str = f"{file_size_bytes / 1024:.2f} KB"
        else:
            file_size_str = f"{file_size_bytes / (1024 * 1024):.2f} MB"

        file_info_html = f"""
        <div class="file-info">
        <span class="file-name">文件名: {file_name}.xlsx</span>
        <span class="file-size">文件大小: {file_size_str}</span>
        </div>
        """
        return output_path, f"已成功导出知识图谱 {kg_name} 到Excel文件", file_info_html
    except Exception as e:
        error_msg = f"导出知识图谱 {kg_name} 出错: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return None, error_msg, ""
# 创建Gradio界面
def create_interface():
    """Build the Gradio Blocks application.

    The page has two sections: an upload/convert area wired to
    ``convert_all_steps``, and an export area with a graph dropdown, a
    refresh button, an export button and the download/status widgets.

    Returns:
        The ``gr.Blocks`` app, ready to be launched.
    """
    with gr.Blocks(title="工程知识图谱生成工具") as app:
        gr.Markdown("# 工程知识图谱生成工具")
        with gr.Row():
            # Left: file upload and convert button.
            with gr.Column(scale=2):
                files_input = gr.File(file_count="multiple", label="上传工程文件")
                convert_btn = gr.Button("转化文件", variant="primary", size="lg")
            # Right: processing result.
            with gr.Column(scale=3):
                output_text = gr.Textbox(label="处理结果", lines=10)

        gr.Markdown("# 知识图谱导出工具")
        with gr.Row():
            # Left: graph selection and export.
            with gr.Column(scale=2):
                with gr.Row():
                    kg_dropdown = gr.Dropdown(label="选择工程知识图谱", choices=[], interactive=True)
                    refresh_btn = gr.Button("刷新列表", size="sm")
                export_btn = gr.Button(value="导出到Excel", variant="primary", interactive=True)
                kg_info = gr.Textbox(label="知识图谱属性信息", lines=10)
            # Right: download and status widgets.
            with gr.Column(scale=3):
                download_file = gr.File(
                    label="下载文件", visible=True, type="filepath", interactive=False, elem_classes=["download-file"]
                )
                export_status = gr.Textbox(label="导出状态", visible=True, interactive=False)
                # Receives the third value returned by export_specific_kg.
                file_info = gr.HTML()

        # Convert: updates the result box and refreshes the dropdown.
        convert_btn.click(convert_all_steps, inputs=[files_input], outputs=[output_text, kg_dropdown])
        # Refresh: reloads the dropdown and reports the count in the result box.
        refresh_btn.click(refresh_kg_list, inputs=[], outputs=[kg_dropdown, output_text])
        # Selection: show the chosen graph's properties.
        kg_dropdown.change(get_kg_properties, inputs=[kg_dropdown], outputs=[kg_info])
        # Selection: the export button is clickable only while a graph is selected.
        kg_dropdown.change(lambda x: gr.update(interactive=bool(x)), inputs=[kg_dropdown], outputs=[export_btn])
        # Export: export_specific_kg returns (file, status, html), so three
        # outputs are listed to match.
        export_btn.click(
            fn=export_specific_kg,
            inputs=[kg_dropdown],
            outputs=[download_file, export_status, file_info],
        )
    return app
# 主函数
def main():
    """Build the Gradio interface and serve it on 0.0.0.0:7860 without a share link."""
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    create_interface().launch(**launch_options)


if __name__ == "__main__":
    main()