565 lines
20 KiB
Python
565 lines
20 KiB
Python
from flask import Flask, render_template, jsonify, request, send_file
|
|
from neo4j import GraphDatabase
|
|
from anytree import Node
|
|
import json
|
|
import configparser
|
|
import os
|
|
import pandas as pd
|
|
from io import BytesIO
|
|
|
|
app = Flask(__name__, template_folder="templates")
|
|
|
|
|
|
# 读取配置文件
|
|
def read_config(config_file="config.ini"):
|
|
"""读取配置文件"""
|
|
config = configparser.ConfigParser()
|
|
config.read(config_file, encoding="utf-8")
|
|
return config
|
|
|
|
|
|
# 获取Neo4j连接配置
|
|
config = read_config()
|
|
NEO4J_URI = config["neo4j"]["uri"]
|
|
NEO4J_USERNAME = config["neo4j"]["user"]
|
|
NEO4J_PASSWORD = config["neo4j"]["password"]
|
|
|
|
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
|
|
|
|
|
|
class KnowledgeGraphService:
|
|
@staticmethod
|
|
def get_hierarchy():
|
|
"""获取层次结构数据"""
|
|
# 修改查询,获取所有节点和它们之间的关系,包括HAS_CHILD和USE关系
|
|
query = """
|
|
MATCH path = (root:EngineeringData)-[:HAS_CHILD|USE*]->(node)
|
|
UNWIND range(0, length(path)-1) AS i
|
|
WITH path, i
|
|
MATCH (parent)-[r:HAS_CHILD|USE]->(child)
|
|
WHERE parent = nodes(path)[i] AND child = nodes(path)[i+1]
|
|
RETURN DISTINCT
|
|
labels(parent)[0] AS parent_type,
|
|
id(parent) AS parent_id,
|
|
labels(child)[0] AS child_type,
|
|
id(child) AS child_id,
|
|
parent.name AS parent_name,
|
|
child.name AS child_name,
|
|
type(r) AS relationship_type,
|
|
child.amount as amount,
|
|
child.unit as unit,
|
|
child.unitPrice as unit_price,
|
|
child.totalPrice as total_price
|
|
"""
|
|
|
|
with driver.session() as session:
|
|
result = session.run(query)
|
|
return [
|
|
{
|
|
"parent_type": record["parent_type"],
|
|
"parent_id": record["parent_id"],
|
|
"child_type": record["child_type"],
|
|
"child_id": record["child_id"],
|
|
"parent_name": record["parent_name"],
|
|
"child_name": record["child_name"],
|
|
"relationship_type": record["relationship_type"],
|
|
"amount": record.get("amount"),
|
|
"unit": record.get("unit"),
|
|
"unit_price": record.get("unit_price"),
|
|
"total_price": record.get("total_price"),
|
|
}
|
|
for record in result
|
|
]
|
|
|
|
@staticmethod
|
|
def build_tree_structure():
|
|
"""构建树状结构"""
|
|
hierarchy_data = KnowledgeGraphService.get_hierarchy()
|
|
nodes = {}
|
|
|
|
# 创建节点
|
|
for item in hierarchy_data:
|
|
parent_id = item["parent_id"]
|
|
child_id = item["child_id"]
|
|
|
|
# 创建父节点
|
|
if parent_id not in nodes:
|
|
parent_label = (
|
|
f"{item['parent_type']}: {item['parent_name']}" if item["parent_name"] else item["parent_type"]
|
|
)
|
|
nodes[parent_id] = {
|
|
"id": parent_id,
|
|
"label": parent_label,
|
|
"type": item["parent_type"],
|
|
"name": item["parent_name"],
|
|
"children": [],
|
|
}
|
|
|
|
# 创建子节点
|
|
if child_id not in nodes:
|
|
child_label = (
|
|
f"{item['child_type']}: {item['child_name']}" if item["child_name"] else item["child_type"]
|
|
)
|
|
# 添加费用相关属性
|
|
child_node = {
|
|
"id": child_id,
|
|
"label": child_label,
|
|
"type": item["child_type"],
|
|
"name": item["child_name"],
|
|
"children": [],
|
|
}
|
|
|
|
# 如果有费用相关属性,添加到节点中
|
|
if item.get("amount") is not None:
|
|
child_node["amount"] = item["amount"]
|
|
if item.get("unit") is not None:
|
|
child_node["unit"] = item["unit"]
|
|
if item.get("unit_price") is not None:
|
|
child_node["unitPrice"] = item["unit_price"]
|
|
if item.get("total_price") is not None:
|
|
child_node["totalPrice"] = item["total_price"]
|
|
|
|
nodes[child_id] = child_node
|
|
|
|
# 建立父子关系
|
|
if child_id not in [child["id"] for child in nodes[parent_id]["children"]]:
|
|
nodes[parent_id]["children"].append(nodes[child_id])
|
|
|
|
# 找到根节点 - 明确查找EngineeringData类型的节点作为根节点
|
|
roots = []
|
|
for node_id, node in nodes.items():
|
|
if node["type"] == "EngineeringData":
|
|
roots.append(node)
|
|
|
|
# 如果没有找到EngineeringData节点,则使用原来的方法查找根节点
|
|
if not roots:
|
|
all_children_ids = set()
|
|
for node in nodes.values():
|
|
for child in node["children"]:
|
|
all_children_ids.add(child["id"])
|
|
|
|
roots = [node for node in nodes.values() if node["id"] not in all_children_ids]
|
|
|
|
# 为每个节点添加费用预览子节点(如果有费用数据)
|
|
KnowledgeGraphService.add_cost_preview_nodes(nodes)
|
|
|
|
return {"roots": roots, "all_nodes": nodes}
|
|
|
|
@staticmethod
|
|
def add_cost_preview_nodes(nodes):
|
|
"""为有费用数据的节点添加费用预览子节点"""
|
|
for node_id, node in nodes.items():
|
|
# 检查子节点是否有费用数据
|
|
cost_items = []
|
|
for child in node.get("children", []):
|
|
if any(key in child for key in ["amount", "unitPrice", "totalPrice"]):
|
|
cost_items.append(child)
|
|
|
|
# 如果有费用数据,创建费用预览节点
|
|
if cost_items:
|
|
cost_preview_node = {
|
|
"id": f"cost_preview_{node_id}",
|
|
"label": "费用预览",
|
|
"type": "CostPreview",
|
|
"name": "费用预览",
|
|
"children": cost_items.copy(), # 复制费用项作为子节点
|
|
}
|
|
node["children"].append(cost_preview_node)
|
|
|
|
@staticmethod
|
|
def get_node_details(node_id):
|
|
"""获取节点详细信息"""
|
|
query = """
|
|
MATCH (n)
|
|
WHERE id(n) = $node_id
|
|
RETURN n, labels(n) as labels
|
|
"""
|
|
|
|
with driver.session() as session:
|
|
result = session.run(query, node_id=int(node_id))
|
|
record = result.single()
|
|
|
|
if record:
|
|
node = record["n"]
|
|
labels = record["labels"]
|
|
|
|
# 获取所有属性
|
|
properties = dict(node)
|
|
|
|
return {"id": node_id, "labels": labels, "properties": properties}
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_children_details(node_id):
|
|
"""获取子节点详细信息"""
|
|
query = """
|
|
MATCH (parent)-[:HAS_CHILD|USE]->(child)
|
|
WHERE id(parent) = $node_id
|
|
RETURN child, labels(child) as labels, id(child) as child_id
|
|
"""
|
|
|
|
with driver.session() as session:
|
|
result = session.run(query, node_id=int(node_id))
|
|
children = []
|
|
|
|
for record in result:
|
|
child = record["child"]
|
|
labels = record["labels"]
|
|
child_id = record["child_id"]
|
|
|
|
children.append({"id": child_id, "labels": labels, "properties": dict(child)})
|
|
|
|
return children
|
|
|
|
@staticmethod
|
|
def get_cost_preview_data(node_id):
|
|
"""获取节点关联的费用数据(CostSet及其子节点)"""
|
|
# 首先获取节点类型,以便根据不同类型使用不同的查询
|
|
type_query = """
|
|
MATCH (node)
|
|
WHERE id(node) = $node_id
|
|
RETURN labels(node) as node_labels
|
|
"""
|
|
|
|
with driver.session() as session:
|
|
type_result = session.run(type_query, node_id=int(node_id))
|
|
type_record = type_result.single()
|
|
node_types = type_record["node_labels"] if type_record else []
|
|
|
|
# 根据节点类型选择适当的查询
|
|
# 查询节点通过USE关系连接的CostSet
|
|
query = """
|
|
MATCH (node)-[:USE]->(costSet:CostSet)
|
|
WHERE id(node) = $node_id
|
|
OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem)
|
|
RETURN costSet, id(costSet) as cost_set_id, costSet.name as cost_set_name,
|
|
costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels
|
|
"""
|
|
|
|
result = session.run(query, node_id=int(node_id))
|
|
|
|
# 存储CostSet节点信息
|
|
cost_sets = {}
|
|
cost_items = []
|
|
|
|
for record in result:
|
|
# 处理CostSet节点
|
|
if record.get("cost_set_id") is not None and record.get("cost_set_id") not in cost_sets:
|
|
cost_set = record["costSet"]
|
|
cost_sets[record["cost_set_id"]] = {
|
|
"id": record["cost_set_id"],
|
|
"name": record.get("cost_set_name", "费用集"),
|
|
"properties": dict(cost_set),
|
|
}
|
|
|
|
# 处理CostItem或MaterialandmachineCostItem节点
|
|
if record.get("costItem") is not None:
|
|
cost_item = record["costItem"]
|
|
cost_item_labels = record.get("cost_item_labels", [])
|
|
|
|
# 构建费用项属性
|
|
properties = dict(cost_item)
|
|
|
|
cost_items.append(
|
|
{
|
|
"id": record["cost_item_id"],
|
|
"labels": cost_item_labels,
|
|
"properties": properties,
|
|
"cost_set_id": record["cost_set_id"],
|
|
}
|
|
)
|
|
|
|
# 如果没有找到CostSet,检查是否有关联的CostItem
|
|
if not cost_sets and not cost_items:
|
|
# 检查节点是否直接关联CostItem
|
|
direct_cost_query = """
|
|
MATCH (node)-[:HAS_CHILD]->(costItem)
|
|
WHERE id(node) = $node_id AND
|
|
(costItem:CostItem OR costItem:MaterialandmachineCostItem)
|
|
RETURN costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels
|
|
"""
|
|
|
|
direct_cost_result = session.run(direct_cost_query, node_id=int(node_id))
|
|
for record in direct_cost_result:
|
|
if record.get("costItem") is not None:
|
|
cost_item = record["costItem"]
|
|
cost_item_labels = record.get("cost_item_labels", [])
|
|
|
|
cost_items.append(
|
|
{"id": record["cost_item_id"], "labels": cost_item_labels, "properties": dict(cost_item)}
|
|
)
|
|
|
|
# 如果仍然没有找到费用项,检查节点自身是否有费用相关属性
|
|
if not cost_items:
|
|
direct_query = """
|
|
MATCH (node)
|
|
WHERE id(node) = $node_id AND
|
|
(node.amount IS NOT NULL OR
|
|
node.unitPrice IS NOT NULL OR
|
|
node.totalPrice IS NOT NULL OR
|
|
node.cost IS NOT NULL)
|
|
RETURN node, id(node) as node_id, labels(node) as node_labels
|
|
"""
|
|
|
|
direct_result = session.run(direct_query, node_id=int(node_id))
|
|
for record in direct_result:
|
|
if record.get("node") is not None:
|
|
node = record["node"]
|
|
properties = dict(node)
|
|
|
|
cost_items.append(
|
|
{
|
|
"id": record["node_id"],
|
|
"labels": record.get("node_labels", []),
|
|
"properties": properties,
|
|
}
|
|
)
|
|
|
|
return {"cost_sets": list(cost_sets.values()), "cost_items": cost_items}
|
|
|
|
@staticmethod
|
|
def export_tree_to_excel(node_id=None):
|
|
"""将整个工程导出为Excel格式"""
|
|
# 获取EngineeringData根节点
|
|
eng_data = KnowledgeGraphService.get_engineering_data_node()
|
|
if not eng_data:
|
|
return None, "未找到工程根节点"
|
|
|
|
# 获取根节点详情
|
|
root_id = eng_data["id"]
|
|
root_name = eng_data["name"]
|
|
|
|
# 递归获取所有子节点
|
|
tree_data = KnowledgeGraphService.get_full_tree_structure(root_id)
|
|
|
|
# 将树状结构转换为扁平结构,适合Excel展示
|
|
flat_data = KnowledgeGraphService.flatten_tree_structure(tree_data)
|
|
|
|
# 创建DataFrame
|
|
df = pd.DataFrame(flat_data)
|
|
|
|
# 创建Excel文件
|
|
output = BytesIO()
|
|
with pd.ExcelWriter(output, engine="openpyxl") as writer:
|
|
df.to_excel(writer, index=False)
|
|
|
|
output.seek(0)
|
|
return output, root_name
|
|
|
|
@staticmethod
|
|
def get_full_tree_structure(node_id):
|
|
"""递归获取节点及其所有子节点的完整树状结构"""
|
|
# 获取节点详情
|
|
node_details = KnowledgeGraphService.get_node_details(node_id)
|
|
if not node_details:
|
|
return None
|
|
|
|
# 获取子节点
|
|
children = KnowledgeGraphService.get_children_details(node_id)
|
|
|
|
# 构建树节点
|
|
tree_node = {
|
|
"id": node_id,
|
|
"type": node_details.get("labels", [""])[0] if node_details.get("labels") else "",
|
|
"name": node_details.get("properties", {}).get("name", ""),
|
|
"properties": node_details.get("properties", {}),
|
|
"children": [],
|
|
}
|
|
|
|
# 递归处理子节点
|
|
for child in children:
|
|
# 跳过费用预览相关节点
|
|
if (child.get("labels") and any("Cost" in label for label in child.get("labels"))) or (
|
|
child.get("properties", {}).get("name", "").find("费用预览") != -1
|
|
):
|
|
continue
|
|
|
|
child_tree = KnowledgeGraphService.get_full_tree_structure(child["id"])
|
|
if child_tree:
|
|
tree_node["children"].append(child_tree)
|
|
|
|
return tree_node
|
|
|
|
@staticmethod
|
|
def flatten_tree_structure(tree_node, level=0, parent_path=""):
|
|
"""将树状结构转换为扁平结构,适合Excel展示"""
|
|
if not tree_node:
|
|
return []
|
|
|
|
# 当前节点的路径
|
|
current_path = f"{parent_path}/{tree_node['name']}" if parent_path else tree_node["name"]
|
|
|
|
# 当前节点的数据
|
|
node_data = {
|
|
"层级": level,
|
|
"路径": current_path,
|
|
"节点类型": tree_node["type"],
|
|
"节点名称": tree_node["name"],
|
|
}
|
|
|
|
# 添加其他属性
|
|
properties = tree_node.get("properties", {})
|
|
for key, value in properties.items():
|
|
if key not in ["name"]: # 排除已包含的属性
|
|
node_data[key] = value
|
|
|
|
# 当前节点及其所有子节点的扁平数据
|
|
flat_data = [node_data]
|
|
|
|
# 递归处理子节点
|
|
for child in tree_node.get("children", []):
|
|
flat_data.extend(KnowledgeGraphService.flatten_tree_structure(child, level + 1, current_path))
|
|
|
|
return flat_data
|
|
|
|
@staticmethod
|
|
def get_engineering_data_node():
|
|
"""获取EngineeringData类型的根节点"""
|
|
query = """
|
|
MATCH (n:EngineeringData)
|
|
RETURN id(n) as node_id, n.name as name
|
|
LIMIT 1
|
|
"""
|
|
|
|
with driver.session() as session:
|
|
result = session.run(query)
|
|
record = result.single()
|
|
|
|
if record:
|
|
return {"id": record["node_id"], "name": record["name"] or "工程"}
|
|
return None
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return render_template("html_template.html")
|
|
|
|
|
|
@app.route("/api/tree")
|
|
def get_tree():
|
|
"""获取树状结构API"""
|
|
try:
|
|
tree_data = KnowledgeGraphService.build_tree_structure()
|
|
return jsonify(tree_data)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/node/<int:node_id>")
|
|
def get_node_info(node_id):
|
|
"""获取节点信息API"""
|
|
try:
|
|
node_details = KnowledgeGraphService.get_node_details(node_id)
|
|
children_details = KnowledgeGraphService.get_children_details(node_id)
|
|
|
|
if node_details:
|
|
return jsonify({"node": node_details, "children": children_details})
|
|
else:
|
|
return jsonify({"error": "Node not found"}), 404
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/node/<int:node_id>/cost")
|
|
def get_node_cost_info(node_id):
|
|
"""获取节点费用信息API"""
|
|
try:
|
|
cost_data = KnowledgeGraphService.get_cost_preview_data(node_id)
|
|
return jsonify(cost_data)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/export/<int:node_id>")
|
|
def export_node_tree(node_id):
|
|
"""导出整个工程知识图谱为Excel,每个一级子节点为一个工作表"""
|
|
try:
|
|
# 获取EngineeringData根节点
|
|
eng_data = KnowledgeGraphService.get_engineering_data_node()
|
|
if not eng_data:
|
|
return jsonify({"error": "未找到工程根节点"}), 404
|
|
|
|
root_id = eng_data["id"]
|
|
root_name = eng_data["name"]
|
|
|
|
# 获取根节点的一级子节点
|
|
children = KnowledgeGraphService.get_children_details(root_id)
|
|
|
|
# 创建Excel文件
|
|
output = BytesIO()
|
|
with pd.ExcelWriter(output, engine="openpyxl") as writer:
|
|
# 为每个一级子节点创建一个工作表
|
|
for child in children:
|
|
# 跳过费用相关节点
|
|
if child.get("labels") and any("Cost" in label for label in child.get("labels")):
|
|
continue
|
|
|
|
# 获取该子节点的完整树结构
|
|
tree_data = KnowledgeGraphService.get_full_tree_structure(child["id"])
|
|
if not tree_data:
|
|
continue
|
|
|
|
# 将树结构转换为扁平结构
|
|
flat_data = KnowledgeGraphService.flatten_tree_structure(tree_data)
|
|
|
|
# 创建DataFrame并写入工作表
|
|
df = pd.DataFrame(flat_data)
|
|
sheet_name = child.get("properties", {}).get("name", f"Sheet{len(writer.sheets) + 1}")
|
|
# 工作表名称不能超过31个字符且不能包含特殊字符
|
|
sheet_name = (
|
|
sheet_name[:31]
|
|
.replace(":", "")
|
|
.replace("\\", "")
|
|
.replace("/", "")
|
|
.replace("?", "")
|
|
.replace("*", "")
|
|
.replace("[", "")
|
|
.replace("]", "")
|
|
)
|
|
df.to_excel(writer, sheet_name=sheet_name, index=False)
|
|
|
|
# 添加一个总览工作表
|
|
overview_tree = KnowledgeGraphService.get_full_tree_structure(root_id)
|
|
overview_data = KnowledgeGraphService.flatten_tree_structure(overview_tree)
|
|
overview_df = pd.DataFrame(overview_data)
|
|
overview_df.to_excel(writer, sheet_name="总览", index=False)
|
|
|
|
output.seek(0)
|
|
return send_file(
|
|
output,
|
|
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
as_attachment=True,
|
|
download_name=f"{root_name}.xlsx",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/export_root")
|
|
def export_root_tree():
|
|
"""获取根节点并导出为Excel"""
|
|
try:
|
|
# 获取根节点
|
|
eng_data = KnowledgeGraphService.get_engineering_data_node()
|
|
if not eng_data:
|
|
return jsonify({"error": "未找到工程根节点"}), 404
|
|
|
|
# 导出Excel
|
|
output, root_name = KnowledgeGraphService.export_tree_to_excel()
|
|
if not output:
|
|
return jsonify({"error": root_name}), 500
|
|
|
|
# 返回Excel文件
|
|
return send_file(
|
|
output,
|
|
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
as_attachment=True,
|
|
download_name=f"{root_name}.xlsx",
|
|
)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(debug=True, host="0.0.0.0", port=5000)
|