From 699ef24d453e51c64e51e4b7ab585d9332b0e779 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:51:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20kg=5Fvisualization.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kg_visualization.py | 493 -------------------------------------------- 1 file changed, 493 deletions(-) delete mode 100644 kg_visualization.py diff --git a/kg_visualization.py b/kg_visualization.py deleted file mode 100644 index a41551d..0000000 --- a/kg_visualization.py +++ /dev/null @@ -1,493 +0,0 @@ -from flask import Flask, render_template, jsonify, request, send_file -from neo4j import GraphDatabase -from anytree import Node -import json -import configparser -import os -import pandas as pd -from io import BytesIO - -app = Flask(__name__, template_folder="templates") - - -# 读取配置文件 -def read_config(config_file="config.ini"): - """读取配置文件""" - config = configparser.ConfigParser() - config.read(config_file, encoding="utf-8") - return config - - -# 获取Neo4j连接配置 -config = read_config() -NEO4J_URI = config["neo4j"]["uri"] -NEO4J_USERNAME = config["neo4j"]["user"] -NEO4J_PASSWORD = config["neo4j"]["password"] - -driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) - - -class KnowledgeGraphService: - @staticmethod - def get_hierarchy(): - """获取层次结构数据""" - # 修改查询,获取所有节点和它们之间的关系,包括HAS_CHILD和USE关系 - query = """ - MATCH path = (root:EngineeringData)-[:HAS_CHILD|USE*]->(node) - UNWIND range(0, length(path)-1) AS i - WITH path, i - MATCH (parent)-[r:HAS_CHILD|USE]->(child) - WHERE parent = nodes(path)[i] AND child = nodes(path)[i+1] - RETURN DISTINCT - labels(parent)[0] AS parent_type, - id(parent) AS parent_id, - labels(child)[0] AS child_type, - id(child) AS child_id, - parent.name AS parent_name, - child.name AS child_name, - type(r) AS relationship_type, - child.amount as amount, - child.unit as unit, - child.unitPrice as unit_price, - child.totalPrice as total_price - """ - - with driver.session() as session: - result = session.run(query) - return [ - { - "parent_type": record["parent_type"], - "parent_id": record["parent_id"], - "child_type": record["child_type"], - "child_id": record["child_id"], - "parent_name": record["parent_name"], - "child_name": record["child_name"], - "relationship_type": record["relationship_type"], - "amount": record.get("amount"), - "unit": record.get("unit"), - "unit_price": record.get("unit_price"), - "total_price": record.get("total_price"), - } - for record in result - ] - - @staticmethod - def build_tree_structure(): - """构建树状结构""" - hierarchy_data = KnowledgeGraphService.get_hierarchy() - nodes = {} - - # 创建节点 - for item in hierarchy_data: - parent_id = item["parent_id"] - child_id = item["child_id"] - - # 创建父节点 - if parent_id not in nodes: - parent_label = ( - f"{item['parent_type']}: {item['parent_name']}" if item["parent_name"] else item["parent_type"] - ) - nodes[parent_id] = { - "id": parent_id, - "label": parent_label, - "type": item["parent_type"], - "name": item["parent_name"], - "children": [], - } - - # 创建子节点 - if child_id not in nodes: - child_label = ( - f"{item['child_type']}: {item['child_name']}" if item["child_name"] else item["child_type"] - ) - # 添加费用相关属性 - child_node = { - "id": child_id, - "label": child_label, - "type": item["child_type"], - "name": item["child_name"], - "children": [], - } - - # 如果有费用相关属性,添加到节点中 - if item.get("amount") is not None: - child_node["amount"] = item["amount"] - if item.get("unit") is not None: - child_node["unit"] = item["unit"] - if item.get("unit_price") is not None: - child_node["unitPrice"] = item["unit_price"] - if item.get("total_price") is not None: - child_node["totalPrice"] = item["total_price"] - - nodes[child_id] = child_node - - # 建立父子关系 - if child_id not in [child["id"] for child in nodes[parent_id]["children"]]: - nodes[parent_id]["children"].append(nodes[child_id]) - - # 找到根节点 - 明确查找EngineeringData类型的节点作为根节点 - roots = [] - for node_id, node in nodes.items(): - if node["type"] == "EngineeringData": - roots.append(node) - - # 如果没有找到EngineeringData节点,则使用原来的方法查找根节点 - if not roots: - all_children_ids = set() - for node in nodes.values(): - for child in node["children"]: - all_children_ids.add(child["id"]) - - roots = [node for node in nodes.values() if node["id"] not in all_children_ids] - - # 为每个节点添加费用预览子节点(如果有费用数据) - KnowledgeGraphService.add_cost_preview_nodes(nodes) - - return {"roots": roots, "all_nodes": nodes} - - @staticmethod - def add_cost_preview_nodes(nodes): - """为有费用数据的节点添加费用预览子节点""" - for node_id, node in nodes.items(): - # 检查子节点是否有费用数据 - cost_items = [] - for child in node.get("children", []): - if any(key in child for key in ["amount", "unitPrice", "totalPrice"]): - cost_items.append(child) - - # 如果有费用数据,创建费用预览节点 - if cost_items: - cost_preview_node = { - "id": f"cost_preview_{node_id}", - "label": "费用预览", - "type": "CostPreview", - "name": "费用预览", - "children": cost_items.copy(), # 复制费用项作为子节点 - } - node["children"].append(cost_preview_node) - - @staticmethod - def get_node_details(node_id): - """获取节点详细信息""" - query = """ - MATCH (n) - WHERE id(n) = $node_id - RETURN n, labels(n) as labels - """ - - with driver.session() as session: - result = session.run(query, node_id=int(node_id)) - record = result.single() - - if record: - node = record["n"] - labels = record["labels"] - - # 获取所有属性 - properties = dict(node) - - return {"id": node_id, "labels": labels, "properties": properties} - return None - - @staticmethod - def get_children_details(node_id): - """获取子节点详细信息""" - query = """ - MATCH (parent)-[:HAS_CHILD|USE]->(child) - WHERE id(parent) = $node_id - RETURN child, labels(child) as labels, id(child) as child_id - """ - - with driver.session() as session: - result = session.run(query, node_id=int(node_id)) - children = [] - - for record in result: - child = record["child"] - labels = record["labels"] - child_id = record["child_id"] - - children.append({"id": child_id, "labels": labels, "properties": dict(child)}) - - return children - - @staticmethod - def get_cost_preview_data(node_id): - """获取节点关联的费用数据(CostSet及其子节点)""" - # 首先获取节点类型,以便根据不同类型使用不同的查询 - type_query = """ - MATCH (node) - WHERE id(node) = $node_id - RETURN labels(node) as node_labels - """ - - with driver.session() as session: - type_result = session.run(type_query, node_id=int(node_id)) - type_record = type_result.single() - node_types = type_record["node_labels"] if type_record else [] - - # 根据节点类型选择适当的查询 - # 查询节点通过USE关系连接的CostSet - query = """ - MATCH (node)-[:USE]->(costSet:CostSet) - WHERE id(node) = $node_id - OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem) - RETURN costSet, id(costSet) as cost_set_id, costSet.name as cost_set_name, - costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels - """ - - result = session.run(query, node_id=int(node_id)) - - # 存储CostSet节点信息 - cost_sets = {} - cost_items = [] - - for record in result: - # 处理CostSet节点 - if record.get("cost_set_id") is not None and record.get("cost_set_id") not in cost_sets: - cost_set = record["costSet"] - cost_sets[record["cost_set_id"]] = { - "id": record["cost_set_id"], - "name": record.get("cost_set_name", "费用集"), - "properties": dict(cost_set), - } - - # 处理CostItem或MaterialandmachineCostItem节点 - if record.get("costItem") is not None: - cost_item = record["costItem"] - cost_item_labels = record.get("cost_item_labels", []) - - # 构建费用项属性 - properties = dict(cost_item) - - cost_items.append( - { - "id": record["cost_item_id"], - "labels": cost_item_labels, - "properties": properties, - "cost_set_id": record["cost_set_id"], - } - ) - - # 如果没有找到CostSet,检查是否有关联的CostItem - if not cost_sets and not cost_items: - # 检查节点是否直接关联CostItem - direct_cost_query = """ - MATCH (node)-[:HAS_CHILD]->(costItem) - WHERE id(node) = $node_id AND - (costItem:CostItem OR costItem:MaterialandmachineCostItem) - RETURN costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels - """ - - direct_cost_result = session.run(direct_cost_query, node_id=int(node_id)) - for record in direct_cost_result: - if record.get("costItem") is not None: - cost_item = record["costItem"] - cost_item_labels = record.get("cost_item_labels", []) - - cost_items.append( - {"id": record["cost_item_id"], "labels": cost_item_labels, "properties": dict(cost_item)} - ) - - # 如果仍然没有找到费用项,检查节点自身是否有费用相关属性 - if not cost_items: - direct_query = """ - MATCH (node) - WHERE id(node) = $node_id AND - (node.amount IS NOT NULL OR - node.unitPrice IS NOT NULL OR - node.totalPrice IS NOT NULL OR - node.cost IS NOT NULL) - RETURN node, id(node) as node_id, labels(node) as node_labels - """ - - direct_result = session.run(direct_query, node_id=int(node_id)) - for record in direct_result: - if record.get("node") is not None: - node = record["node"] - properties = dict(node) - - cost_items.append( - { - "id": record["node_id"], - "labels": record.get("node_labels", []), - "properties": properties, - } - ) - - return {"cost_sets": list(cost_sets.values()), "cost_items": cost_items} - - @staticmethod - def export_tree_to_excel(node_id=None): - """将整个工程导出为Excel格式""" - # 获取EngineeringData根节点 - eng_data = KnowledgeGraphService.get_engineering_data_node() - if not eng_data: - return None, "未找到工程根节点" - - # 获取根节点详情 - root_id = eng_data["id"] - root_name = eng_data["name"] - - # 递归获取所有子节点 - tree_data = KnowledgeGraphService.get_full_tree_structure(root_id) - - # 将树状结构转换为扁平结构,适合Excel展示 - flat_data = KnowledgeGraphService.flatten_tree_structure(tree_data) - - # 创建DataFrame - df = pd.DataFrame(flat_data) - - # 创建Excel文件 - output = BytesIO() - with pd.ExcelWriter(output, engine="openpyxl") as writer: - df.to_excel(writer, index=False) - - output.seek(0) - return output, root_name - - @staticmethod - def get_full_tree_structure(node_id): - """递归获取节点及其所有子节点的完整树状结构""" - # 获取节点详情 - node_details = KnowledgeGraphService.get_node_details(node_id) - if not node_details: - return None - - # 获取子节点 - children = KnowledgeGraphService.get_children_details(node_id) - - # 构建树节点 - tree_node = { - "id": node_id, - "type": node_details.get("labels", [""])[0] if node_details.get("labels") else "", - "name": node_details.get("properties", {}).get("name", ""), - "properties": node_details.get("properties", {}), - "children": [], - } - - # 递归处理子节点 - for child in children: - # 跳过费用预览相关节点 - if (child.get("labels") and any("Cost" in label for label in child.get("labels"))) or ( - child.get("properties", {}).get("name", "").find("费用预览") != -1 - ): - continue - - child_tree = KnowledgeGraphService.get_full_tree_structure(child["id"]) - if child_tree: - tree_node["children"].append(child_tree) - - return tree_node - - @staticmethod - def flatten_tree_structure(tree_node, level=0, parent_path=""): - """将树状结构转换为扁平结构,适合Excel展示""" - if not tree_node: - return [] - - # 当前节点的路径 - current_path = f"{parent_path}/{tree_node['name']}" if parent_path else tree_node["name"] - - # 当前节点的数据 - node_data = { - "层级": level, - "路径": current_path, - "节点类型": tree_node["type"], - "节点名称": tree_node["name"], - } - - # 添加其他属性 - properties = tree_node.get("properties", {}) - for key, value in properties.items(): - if key not in ["name"]: # 排除已包含的属性 - node_data[key] = value - - # 当前节点及其所有子节点的扁平数据 - flat_data = [node_data] - - # 递归处理子节点 - for child in tree_node.get("children", []): - flat_data.extend(KnowledgeGraphService.flatten_tree_structure(child, level + 1, current_path)) - - return flat_data - - @staticmethod - def get_engineering_data_node(): - """获取EngineeringData类型的根节点""" - query = """ - MATCH (n:EngineeringData) - RETURN id(n) as node_id, n.name as name - LIMIT 1 - """ - - with driver.session() as session: - result = session.run(query) - record = result.single() - - if record: - return {"id": record["node_id"], "name": record["name"] or "工程"} - return None - - -@app.route("/") -def index(): - return render_template("html_template.html") - - -@app.route("/api/tree") -def get_tree(): - """获取树状结构API""" - try: - tree_data = KnowledgeGraphService.build_tree_structure() - return jsonify(tree_data) - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/node/") -def get_node_info(node_id): - """获取节点信息API""" - try: - node_details = KnowledgeGraphService.get_node_details(node_id) - children_details = KnowledgeGraphService.get_children_details(node_id) - - if node_details: - return jsonify({"node": node_details, "children": children_details}) - else: - return jsonify({"error": "Node not found"}), 404 - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/node//cost") -def get_node_cost_info(node_id): - """获取节点费用信息API""" - try: - cost_data = KnowledgeGraphService.get_cost_preview_data(node_id) - return jsonify(cost_data) - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/export/") -def export_node_tree(node_id): - """导出整个工程知识图谱为Excel""" - try: - # 忽略传入的node_id参数,总是导出整个工程 - excel_file, filename = KnowledgeGraphService.export_tree_to_excel(None) - if excel_file: - return send_file( - excel_file, - mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - as_attachment=True, - download_name=f"{filename}.xlsx", - ) - else: - return jsonify({"error": "无法生成Excel文件"}), 500 - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=5000)