""" 将Neo4j知识图谱导出为树状结构的txt文件 """ import os from neo4j import GraphDatabase import configparser from typing import Dict, List, Any, Set def read_config(config_file="config.ini"): """读取配置文件""" config = configparser.ConfigParser() config.read(config_file, encoding="utf-8") return config def connect_to_neo4j(): """连接到Neo4j数据库""" config = read_config() uri = config["neo4j"]["uri"] user = config["neo4j"]["user"] password = config["neo4j"]["password"] return GraphDatabase.driver(uri, auth=(user, password)) def query_kg_structure(driver): """查询知识图谱的结构""" with driver.session() as session: # 查询所有节点及其关系 query = """ MATCH (n) OPTIONAL MATCH (n)-[r:HAS_CHILD|USE]->(m) RETURN id(n) as node_id, labels(n)[0] as node_type, n.name as node_name, id(m) as child_id, labels(m)[0] as child_type, m.name as child_name, type(r) as relationship_type """ result = session.run(query) nodes = {} relationships = [] for record in result: node_id = record["node_id"] if node_id not in nodes: nodes[node_id] = { "id": node_id, "type": record["node_type"], "name": record["node_name"] or "", "children": [], } if record["child_id"] is not None: child_id = record["child_id"] if child_id not in nodes: nodes[child_id] = { "id": child_id, "type": record["child_type"], "name": record["child_name"] or "", "children": [], } relationships.append({"source": node_id, "target": child_id, "type": record["relationship_type"]}) return nodes, relationships def build_tree(nodes, relationships): """构建树状结构""" # 为每个节点添加子节点 for rel in relationships: source_id = rel["source"] target_id = rel["target"] if source_id in nodes and target_id in nodes: # 避免添加重复的子节点 if target_id not in [child["id"] for child in nodes[source_id]["children"]]: nodes[source_id]["children"].append(nodes[target_id]) # 找出根节点(没有父节点的节点) child_ids = set(rel["target"] for rel in relationships) root_ids = [node_id for node_id in nodes if node_id not in child_ids] # 优先选择EngineeringData类型的节点作为根节点 engineering_data_roots = [node_id for node_id in root_ids if nodes[node_id]["type"] == "EngineeringData"] if engineering_data_roots: roots = [nodes[node_id] for node_id in engineering_data_roots] else: roots = [nodes[node_id] for node_id in root_ids] return roots def export_tree_to_txt(roots, output_file="kg_tree_structure.txt"): """将树状结构导出为txt文件""" with open(output_file, "w", encoding="utf-8") as f: for root in roots: write_node(f, root, 0) print(f"知识图谱树状结构已导出到: {output_file}") def write_node(file, node, level): """递归写入节点及其子节点""" indent = " " * level node_info = f"{node['type']}" if node["name"]: node_info += f": {node['name']}" file.write(f"{indent}{node_info}\n") # 递归写入子节点 for child in node["children"]: write_node(file, child, level + 1) def export_kg_tree(output_file="kg_tree_structure.txt"): """导出知识图谱树状结构的主函数""" try: driver = connect_to_neo4j() nodes, relationships = query_kg_structure(driver) if not nodes: print("知识图谱中没有找到节点") return roots = build_tree(nodes, relationships) if not roots: print("无法确定知识图谱的根节点") return export_tree_to_txt(roots, output_file) except Exception as e: print(f"导出知识图谱树状结构时出错: {str(e)}") finally: if driver: driver.close() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="导出Neo4j知识图谱的树状结构") parser.add_argument("--output", "-o", default="kg_tree_structure.txt", help="输出文件路径") args = parser.parse_args() export_kg_tree(args.output)