Files
KG_generation/export_kg_excel.py
2025-08-18 15:14:37 +08:00

1189 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
from configparser import ConfigParser
from neo4j import GraphDatabase
from datetime import datetime
import re
import os
from typing import Dict, List, Any
# ------------------------ 通用工具函数 ------------------------
def load_neo4j_config(config_file: str = "config.ini") -> Dict[str, str]:
"""从config.ini加载Neo4j配置"""
config = ConfigParser()
config.read(config_file)
return {
"uri": config.get("neo4j", "uri"),
"user": config.get("neo4j", "user"),
"password": config.get("neo4j", "password"),
}
def get_project_name(driver) -> str:
"""获取工程名称"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '工程名称'})
RETURN n.value as project_name
LIMIT 1
"""
with driver.session() as session:
result = session.run(query)
record = result.single()
if record:
return record["project_name"]
# 如果没有找到,尝试使用旧的查询方式(向后兼容)
old_query = """
MATCH (n:ProjectProperty {name: '工程名称'})
RETURN n.value as project_name
LIMIT 1
"""
result = session.run(old_query)
record = result.single()
if record:
return record["project_name"]
return "" # 返回空字符串而非None
def get_project_type(driver) -> str:
"""
获取工程类型(清单工程或预算工程)
1. 优先检查ProjectProperty中name为'执行规范'的节点的value属性:
- 包含"清单"字样的为清单工程
- 包含"预规"字样的为预算工程
2. 如果未找到'执行规范',则查找name为'工程类型'的节点的value属性,
并通过预定义的映射字典判断工程类型。
"""
project_type_mapping = {
"招投标工程": "清单工程",
"概预算工程": "预算工程",
"定额计价": "预算工程",
"清单计价": "清单工程",
}
# 查询1:优先查找 '执行规范'
query_standard = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '执行规范'})
RETURN n.value AS standard
LIMIT 1
"""
with driver.session() as session:
# 执行第一个查询
result = session.run(query_standard)
record = result.single()
if record and record["standard"]:
standard = record["standard"]
if "清单" in standard:
return "清单工程"
elif "预规" in standard:
return "预算工程"
# 如果 '执行规范' 未找到或不匹配,尝试查找 '工程类型'
query_type = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '工程类型'})
RETURN n.value AS type_value
LIMIT 1
"""
result = session.run(query_type)
record = result.single()
if record and record["type_value"]:
type_value = record["type_value"]
# 查找映射字典
if type_value in project_type_mapping:
return project_type_mapping[type_value]
else:
# 如果字典中没有该值,可以打印警告或返回默认值
print(f"Warning: '工程类型' 值 '{type_value}' 未在 project_type_mapping 中定义,使用默认值 '预算工程'")
# 您也可以选择抛出异常或返回 None
# 如果两种方式都找不到,尝试旧的兼容查询(可选保留)
old_query = """
MATCH (n:ProjectProperty {name: '执行规范'})
RETURN n.value AS standard
LIMIT 1
"""
result = session.run(old_query)
record = result.single()
if record and record["standard"]:
standard = record["standard"]
if "清单" in standard:
return "清单工程"
elif "预规" in standard:
return "预算工程"
# 最终默认返回
return "预算工程"
def create_excel_writer(project_name: str = "") -> tuple:
"""创建Excel写入器并返回写入器和文件路径"""
if project_name:
output_file = f"{project_name}.xlsx"
else:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"knowledge_graph_export_{timestamp}.xlsx"
# 确保目录存在 - 只有当output_file包含路径时才创建目录
output_dir = os.path.dirname(output_file)
if output_dir: # 只有当路径不为空时才创建目录
os.makedirs(output_dir, exist_ok=True)
writer = pd.ExcelWriter(output_file, engine="openpyxl")
return writer, output_file
def custom_sort_key(sortid: str) -> tuple:
"""
为 sortid 创建排序键,支持 '1', '1.1', '1.1.1' 格式
"""
if not sortid:
return (float("inf"),) # 空值排最后
try:
return tuple(int(x) for x in sortid.split("."))
except ValueError:
# 如果无法转为数字,则按字符串排序(兜底)
return tuple(float("inf") for _ in sortid.split("."))
# ------------------------ 项目划分导出函数 ------------------------
def fetch_project_division_items(driver, project_type: str) -> List[Dict[str, Any]]:
"""
获取ProjectDivisionItem节点数据,根据工程类型调整查询
project_type: 工程类型,"清单工程"或"预算工程"
"""
print(f"开始获取项目划分数据,工程类型: {project_type}")
if project_type == "清单工程":
# 清单工程需要获取ProjectDivisionItem和List两种节点
# 修改查询,确保正确获取List节点,并且只获取当前工程的节点
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n)
WHERE n:ProjectDivisionItem OR n:List
RETURN n.sortid as sortid, n.name as name, labels(n) as labels, properties(n) as properties
"""
else:
# 预算工程只获取ProjectDivisionItem节点,并且只获取当前工程的节点
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n:ProjectDivisionItem)
RETURN n.sortid as sortid, n.name as name, labels(n) as labels, properties(n) as properties
"""
print(f"执行查询: {query}")
results = []
node_count = 0
list_count = 0
pdi_count = 0
with driver.session() as session:
result = session.run(query)
for record in result:
node_count += 1
props = record["properties"]
# --- 只移除已单独提取的字段 ---
props.pop("sortid", None)
props.pop("name", None)
# ✅ 不再 pop("序号", None) —— 保留它!
# 获取节点标签
labels = record["labels"]
label_str = ",".join(labels) if labels else ""
# 检查是否有必要的字段
sortid = record["sortid"]
name = record["name"]
if not sortid or not name:
print(f"警告: 跳过缺少必要字段的节点,标签: {label_str}")
continue
item = {
"sortid": sortid,
"name": name,
"节点类型": label_str, # 添加节点类型信息
**props,
} # 包含 "序号" 在内所有其他属性
# 统计不同类型的节点
if "List" in label_str:
list_count += 1
elif "ProjectDivisionItem" in label_str:
pdi_count += 1
results.append(item)
# 输出调试信息
if node_count <= 10 or node_count % 100 == 0:
print(f"找到节点: 类型={label_str}, name={name}, sortid={sortid}")
print(f"共查询到 {node_count} 个节点,处理后保留 {len(results)} 个节点")
print(f"其中 List 类型节点: {list_count} 个, ProjectDivisionItem 类型节点: {pdi_count} 个")
# 重要修改:不再按专业类型分组,而是直接返回所有节点
if project_type == "清单工程":
# 对于清单工程,直接按sortid排序所有节点
results.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
return results
else:
# 对于预算工程,保持原有的分组排序逻辑
# === 排序逻辑:先按 '专业类型' 分组,再按 sortid 排序 ===
group_key = "专业类型"
if not results:
return []
# 获取所有唯一的"专业类型"值,并排序(按字母顺序或自定义)
unique_groups = sorted(set(item.get(group_key, "") for item in results))
print(f"找到的专业类型: {unique_groups}")
sorted_results = []
for group in unique_groups:
group_items = [item for item in results if item.get(group_key) == group]
# ✅ 排序只依赖 sortid,不依赖"序号"
group_items.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
sorted_results.extend(group_items)
return sorted_results
def export_project_division(writer: pd.ExcelWriter, driver, project_type: str) -> None:
"""
导出项目划分工作簿(不显示 sortid 列)
project_type: 工程类型,"清单工程"或"预算工程"
"""
print(f"正在导出项目划分数据({project_type}...")
data = fetch_project_division_items(driver, project_type)
print(f"从fetch_project_division_items获取到 {len(data)} 条数据")
if not data:
print("警告:未获取到项目划分数据,将创建空表。")
df = pd.DataFrame(columns=["序号", "name", "代码", "编码", "节点类型"])
else:
# 输出前10条数据的节点类型,用于调试
for i, item in enumerate(data[:10]):
print(f"数据{i+1}: 类型={item.get('节点类型', 'N/A')}, name={item.get('name', 'N/A')}")
# 创建完整的DataFrame
df = pd.DataFrame(data)
print(f"创建DataFrame后有 {len(df)} 行")
# 检查不同类型的节点数量
list_nodes = df[df["节点类型"].str.contains("List", na=False)]
pdi_nodes = df[df["节点类型"].str.contains("ProjectDivisionItem", na=False)]
print(f"其中List类型节点有 {len(list_nodes)} 个,ProjectDivisionItem类型节点有 {len(pdi_nodes)} 个")
# === 定义希望显示的固定列 ===
fixed_cols = ["序号", "name", "代码", "编码", "节点类型"]
# === 排除的列(如 sortid===
excluded_cols = {"sortid"}
# 获取其他列,但排除 sortid
other_cols = [col for col in df.columns if col not in fixed_cols and col not in excluded_cols]
# 构建最终列顺序
cols = fixed_cols + other_cols
# 确保只保留实际存在的列
cols = [col for col in cols if col in df.columns]
df = df[cols]
print(f"最终DataFrame有 {len(df)} 行")
# 导出所有节点到"项目划分"工作簿
df.to_excel(writer, sheet_name="项目划分_清单", index=False)
print(f"✅ 已导出 {len(df)} 条项目划分数据")
print(f"📊 表头列:{df.columns.tolist()}")
# 如果是清单工程,创建单独的工作簿
if project_type == "清单工程" and len(data) > 0:
# 1. 只导出List节点到"清单节点"工作簿
df_list = pd.DataFrame([item for item in data if "List" in item.get("节点类型", "")])
if len(df_list) > 0:
# 排除sortid列
if "sortid" in df_list.columns:
df_list = df_list.drop(columns=["sortid"])
df_list.to_excel(writer, sheet_name="清单", index=False)
print(f"✅ 额外导出 {len(df_list)} 条清单节点数据到'清单节点'工作簿")
# 2. 只导出ProjectDivisionItem节点到"项目划分节点"工作簿
df_pdi = pd.DataFrame([item for item in data if "ProjectDivisionItem" in item.get("节点类型", "")])
if len(df_pdi) > 0:
# 排除sortid列
if "sortid" in df_pdi.columns:
df_pdi = df_pdi.drop(columns=["sortid"])
df_pdi.to_excel(writer, sheet_name="项目划分", index=False)
print(f"✅ 额外导出 {len(df_pdi)} 条项目划分节点数据到'项目划分节点'工作簿")
# ------------------------ 工程属性导出函数 ------------------------
def fetch_project_properties(driver) -> List[Dict[str, Any]]:
"""获取ProjectProperty节点数据(包含 sortid"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD*0..]->(n:ProjectProperty)
RETURN n.sortid as sortid, n.name as name, n.value as value
ORDER BY n.sortid // 可选:在数据库层初步排序
"""
results = []
with driver.session() as session:
result = session.run(query)
for record in result:
results.append({"sortid": record["sortid"], "name": record["name"], "value": record["value"]})
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (n:ProjectProperty)
RETURN n.sortid as sortid, n.name as name, n.value as value
ORDER BY n.sortid
"""
result = session.run(old_query)
for record in result:
results.append({"sortid": record["sortid"], "name": record["name"], "value": record["value"]})
# 在 Python 中按 sortid 排序(确保复杂格式正确)
results.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
return results
def export_project_properties(writer: pd.ExcelWriter, driver) -> None:
"""导出工程属性工作簿"""
print("正在导出工程属性数据...")
data = fetch_project_properties(driver)
if not data:
print("警告:未获取到工程属性数据,将创建空表。")
df = pd.DataFrame(columns=["sortid", "name", "value"])
else:
df = pd.DataFrame(data)
# 确保列顺序
df = df[["name", "value"]]
df.to_excel(writer, sheet_name="工程属性", index=False)
print(f"已导出 {len(df)} 条工程属性数据")
# ------------------------ 工程量导出函数 ------------------------
def fetch_project_quantities(driver) -> List[Dict[str, Any]]:
"""获取ProjectQuantity节点数据"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n:ProjectQuantity)
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
RETURN
n.name as name,
n.sortid as sortid,
COALESCE(parent.GUID, parent.guid, '') AS parent_id, // 优先取 GUID,其次 guid,都无则为空字符串
properties(n) as properties
"""
results = []
with driver.session() as session:
result = session.run(query)
for record in result:
props = record["properties"]
props.pop("name", None)
props.pop("sortid", None)
results.append(
{
"name": record["name"],
"sortid": record["sortid"],
"父级GUID": record["parent_id"],
**props,
}
)
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (n:ProjectQuantity)
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
RETURN
n.name as name,
n.sortid as sortid,
COALESCE(parent.GUID, parent.guid, '') AS parent_id,
properties(n) as properties
"""
result = session.run(old_query)
for record in result:
props = record["properties"]
props.pop("name", None)
props.pop("sortid", None)
results.append(
{
"name": record["name"],
"sortid": record["sortid"],
"父级GUID": record["parent_id"],
**props,
}
)
# 首先按父级GUID排序,再按当前节点的sortid排序
results.sort(
key=lambda x: (
custom_sort_key(str(x["父级GUID"]) if x["父级GUID"] else ""),
custom_sort_key(str(x["sortid"])),
)
)
return results
def export_project_quantities(writer: pd.ExcelWriter, driver) -> None:
"""导出工程量工作簿(不显示 sortid 列)"""
print("正在导出工程量数据...")
data = fetch_project_quantities(driver)
if not data:
print("警告:未获取到工程量数据,将创建空表。")
# 创建空表时也不包含 sortid
fixed_cols = ["父级GUID", "特征段", "name", "规格型号", "单位", "计算式", "数量"]
df = pd.DataFrame(columns=fixed_cols)
else:
df = pd.DataFrame(data)
# === 定义希望显示的固定列(按期望顺序)===
fixed_cols = [
"父级GUID",
"特征段",
"name",
"规格型号",
"单位",
"计算式",
"数量",
]
# === 排除的列(如不需要显示的字段)===
excluded_cols = {"sortid"} # 可以添加更多如 "id", "临时字段" 等
# === 获取其余列:在 df.columns 中但不在 fixed_cols 且不在 excluded_cols ===
other_cols = [col for col in df.columns if col not in fixed_cols and col not in excluded_cols]
# === 构建最终列顺序 ===
cols = fixed_cols + other_cols
# 确保只保留实际存在的列
cols = [col for col in cols if col in df.columns]
df = df[cols]
# 导出到 Excel
df.to_excel(writer, sheet_name="工程量", index=False)
print(f"✅ 已导出 {len(df)} 条工程量数据")
print(f"📊 表头列:{df.columns.tolist()}")
# ------------------------ 人材机导出函数 ------------------------
def fetch_materials_equipments(driver) -> List[Dict[str, Any]]:
"""获取MaterialOrEquipment节点数据"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(pq:ProjectQuantity)-[:HAS_CHILD]->(n:MaterialOrEquipment)
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
RETURN n.id as id, n.name as name, n.sortid as sortid, COALESCE(parent.GUID, parent.guid, '') AS parent_id, properties(n) as properties
"""
results = []
with driver.session() as session:
result = session.run(query)
for record in result:
props = record["properties"]
props.pop("id", None)
props.pop("name", None)
props.pop("sortid", None)
results.append(
{
"id": record["id"],
"name": record["name"],
"sortid": record["sortid"],
"父级GUID": record["parent_id"] if record["parent_id"] else "",
**props,
}
)
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (n:MaterialOrEquipment)
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
RETURN n.id as id, n.name as name, n.sortid as sortid, COALESCE(parent.GUID, parent.guid, '') AS parent_id, properties(n) as properties
"""
result = session.run(old_query)
for record in result:
props = record["properties"]
props.pop("id", None)
props.pop("name", None)
props.pop("sortid", None)
results.append(
{
"id": record["id"],
"name": record["name"],
"sortid": record["sortid"],
"父级GUID": record["parent_id"] if record["parent_id"] else "",
**props,
}
)
# 首先按父级GUID排序,再按当前节点的sortid排序
results.sort(key=lambda x: (custom_sort_key(str(x["父级GUID"])), custom_sort_key(str(x["sortid"]))))
return results
def export_materials_equipments(writer: pd.ExcelWriter, driver) -> None:
"""导出入材机工作簿"""
print("正在导出入材机数据...")
data = fetch_materials_equipments(driver)
if not data:
print("警告:未获取到入材机数据,将创建空表。")
df = pd.DataFrame(columns=["父级GUID", "sortid", "id", "name"])
else:
df = pd.DataFrame(data)
# 父级GUID, sortid, id, name, 其他属性
cols = ["父级GUID", "id", "name"] + [
col for col in df.columns if col not in ["父级GUID", "sortid", "id", "name"]
]
df = df[cols]
df.to_excel(writer, sheet_name="人材机", index=False)
print(f"已导出 {len(df)} 条人材机数据")
# ------------------------ 费用预览导出函数 ------------------------
def fetch_cost_set_items(driver, parent_label: str, project_type: str) -> tuple:
"""
获取 CostSet 节点数据,正确提取父节点的中文字段
project_type: 工程类型,"清单工程"或"预算工程"
"""
if project_type == "清单工程":
if parent_label == "ProjectDivisionItem":
# 清单工程需要考虑ProjectDivisionItem和List两种节点
child_node = "(child) WHERE child:ProjectDivisionItem OR child:List"
parent_node = "(parent) WHERE parent:ProjectDivisionItem OR parent:List"
extra_return = """
child.专业类型 AS parent_specialty,
child.取费表 AS parent_fee_table,
child.序号 AS parent_xuhao,
labels(child) AS child_labels
"""
parent_cols = ["专业类型", "取费表", "序号", "节点类型"]
elif parent_label == "ProjectQuantity":
child_node = "(child:ProjectQuantity)"
parent_node = "(parent:ProjectQuantity)"
extra_return = """
child.编码 AS parent_code,
labels(child) AS child_labels
"""
parent_cols = ["编码", "节点类型"]
else:
raise ValueError(f"不支持的父节点类型: {parent_label}")
else:
# 预算工程的原有逻辑
if parent_label == "ProjectDivisionItem":
child_node = "(child:ProjectDivisionItem)"
parent_node = "(parent:ProjectDivisionItem)"
extra_return = """
child.专业类型 AS parent_specialty,
child.取费表 AS parent_fee_table,
child.序号 AS parent_xuhao
"""
parent_cols = ["专业类型", "取费表", "序号"]
elif parent_label == "ProjectQuantity":
child_node = "(child:ProjectQuantity)"
parent_node = "(parent:ProjectQuantity)"
extra_return = """
child.编码 AS parent_code
"""
parent_cols = ["编码"]
else:
raise ValueError(f"不支持的父节点类型: {parent_label}")
# ✅ 关键:MATCH 父子关系,并提取字段,添加对current=true的过滤
query = f"""
MATCH (ed:EngineeringData {{current: true}})-[:HAS_CHILD*0..]->{child_node}
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(child)
OPTIONAL MATCH (child)-[:USE]->(costSet:CostSet)
OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem:CostItem)
RETURN
child.name AS child_name,
COALESCE(costSet.GUID, toString(id(child))) AS guid,
COALESCE(costSet.sortid, '999999') AS cost_sortid,
{extra_return.strip()},
costItem.name AS cost_item_name,
costItem.cost AS cost_value
ORDER BY COALESCE(costSet.sortid, '999999')
"""
results = {}
all_cost_names = []
with driver.session() as session:
result = session.run(query)
for record in result:
child_name = record["child_name"]
guid = record["guid"]
cost_sortid = record["cost_sortid"]
if guid not in results:
row = {"sortid": cost_sortid, "GUID": guid, "工程名称": child_name}
# ✅ 使用 record 中的字段名(来自 extra_return 的别名)
if parent_label == "ProjectDivisionItem":
row["专业类型"] = record["parent_specialty"]
row["取费表"] = record["parent_fee_table"]
row["序号"] = record["parent_xuhao"]
if project_type == "清单工程":
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
elif parent_label == "ProjectQuantity":
row["编码"] = record["parent_code"]
if project_type == "清单工程":
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
results[guid] = row
raw_name = record["cost_item_name"]
if raw_name is not None:
cost_name = raw_name.split("_")[0] if "_" in raw_name else raw_name
if cost_name not in all_cost_names:
all_cost_names.append(cost_name)
results[guid][cost_name] = record["cost_value"]
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = f"""
MATCH {child_node}
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(child)
OPTIONAL MATCH (child)-[:USE]->(costSet:CostSet)
OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem:CostItem)
RETURN
child.name AS child_name,
COALESCE(costSet.GUID, toString(id(child))) AS guid,
COALESCE(costSet.sortid, '999999') AS cost_sortid,
{extra_return.strip()},
costItem.name AS cost_item_name,
costItem.cost AS cost_value
ORDER BY COALESCE(costSet.sortid, '999999')
"""
result = session.run(old_query)
for record in result:
child_name = record["child_name"]
guid = record["guid"]
cost_sortid = record["cost_sortid"]
if guid not in results:
row = {"sortid": cost_sortid, "GUID": guid, "工程名称": child_name}
# ✅ 使用 record 中的字段名(来自 extra_return 的别名)
if parent_label == "ProjectDivisionItem":
row["专业类型"] = record["parent_specialty"]
row["取费表"] = record["parent_fee_table"]
row["序号"] = record["parent_xuhao"]
if project_type == "清单工程":
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
elif parent_label == "ProjectQuantity":
row["编码"] = record["parent_code"]
if project_type == "清单工程":
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
results[guid] = row
raw_name = record["cost_item_name"]
if raw_name is not None:
cost_name = raw_name.split("_")[0] if "_" in raw_name else raw_name
if cost_name not in all_cost_names:
all_cost_names.append(cost_name)
results[guid][cost_name] = record["cost_value"]
# 排序
data = list(results.values())
data.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
# 构建列顺序
base_cols = ["sortid", "GUID", "工程名称"]
if parent_label == "ProjectDivisionItem":
base_cols += ["专业类型", "取费表", "序号"]
if project_type == "清单工程":
base_cols += ["节点类型"]
elif parent_label == "ProjectQuantity":
base_cols += ["编码"]
if project_type == "清单工程":
base_cols += ["节点类型"]
final_columns = base_cols + all_cost_names
return data, final_columns
def export_cost_set_preview(
writer: pd.ExcelWriter, driver, parent_label: str, sheet_name: str, project_type: str
) -> None:
"""
导出费用预览工作簿(动态列名):按 sortid 排序,但不显示 sortid 列
project_type: 工程类型,"清单工程"或"预算工程"
"""
print(f"正在导出 {sheet_name} 数据({project_type}...")
try:
data, column_order = fetch_cost_set_items(driver, parent_label, project_type)
except Exception as e:
print(f"获取数据失败: {e}")
return
if not data:
print(f"警告: 没有找到 {sheet_name} 相关数据")
# 构建空表,排除 sortid
cols_without_sortid = [col for col in column_order if col != "sortid"]
df = pd.DataFrame(columns=cols_without_sortid)
else:
df = pd.DataFrame(data)
# 补全缺失列
for col in column_order:
if col not in df.columns:
df[col] = None
# 按原始顺序排列(包含 sortid)
df = df[column_order]
# === 关键:移除 sortid 列 ===
df = df.drop(columns=["sortid"]) # 或使用:df = df[[col for col in df.columns if col != "sortid"]]
df.to_excel(writer, sheet_name=sheet_name, index=False)
print(f"✅ 已导出 {len(df)}{sheet_name} 数据")
print(f"📊 列顺序: {df.columns.tolist()}")
# ------------------------ 材机分析导出函数 ------------------------
def fetch_material_machine_items(driver) -> tuple:
"""获取MaterialandmachineCostItem节点数据(修正版:避免去重)"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD*0..]->(parent:ProjectDivisionItem)-[:USE]->(costSet:CostSet)-[:HAS_CHILD]->(item:MaterialandmachineCostItem)
RETURN
costSet.GUID AS guid,
costSet.sortid AS sortid,
parent.name AS parent_name,
properties(item) AS properties
ORDER BY costSet.sortid
"""
results = []
all_columns = set()
column_order = [] # 保持字段首次出现顺序
with driver.session() as session:
result = session.run(query)
for record in result:
guid = record["guid"]
sortid = record["sortid"]
parent_name = record["parent_name"]
props = record["properties"]
# 构造一行数据
row = {"sortid": sortid, "GUID": guid, "项目划分名称": parent_name}
# 添加 properties 中的每个字段
for key, value in props.items():
if key not in all_columns:
all_columns.add(key)
column_order.append(key)
row[key] = value
results.append(row)
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (parent:ProjectDivisionItem)-[:USE]->(costSet:CostSet)-[:HAS_CHILD]->(item:MaterialandmachineCostItem)
RETURN
costSet.GUID AS guid,
costSet.sortid AS sortid,
parent.name AS parent_name,
properties(item) AS properties
ORDER BY costSet.sortid
"""
result = session.run(old_query)
for record in result:
guid = record["guid"]
sortid = record["sortid"]
parent_name = record["parent_name"]
props = record["properties"]
# 构造一行数据
row = {"sortid": sortid, "GUID": guid, "项目划分名称": parent_name}
# 添加 properties 中的每个字段
for key, value in props.items():
if key not in all_columns:
all_columns.add(key)
column_order.append(key)
row[key] = value
results.append(row)
# === 关键修改:复合排序键 ===
# 先按 GUID 排序,再在每组内按 sortid 排序(使用 custom_sort_key
results.sort(
key=lambda x: (x["GUID"], custom_sort_key(str(x["sortid"]))) # 先按 GUID 字符串排序 # 再按 sortid 的自定义顺序
)
# 最终列顺序
final_columns = ["sortid", "GUID", "项目划分名称"] + column_order
return results, final_columns
def export_material_machine_analysis(writer: pd.ExcelWriter, driver) -> None:
"""导出材机分析工作簿(修正版):按 sortid 排序,但不显示 sortid 列"""
print("正在导出材机分析数据...")
try:
data, column_order = fetch_material_machine_items(driver)
except Exception as e:
print(f"获取材机分析数据失败: {e}")
return
if not data:
print("警告: 没有找到材机分析相关数据")
# 创建空表,也不包含 sortid
df = pd.DataFrame(columns=[col for col in column_order if col != "sortid"])
else:
df = pd.DataFrame(data)
# 确保所有列都存在(防止某些行缺少字段)
for col in column_order:
if col not in df.columns:
df[col] = None
# === 关键:移除 sortid 列(但排序已生效)===
cols_to_keep = [col for col in column_order if col != "sortid"]
# 重新排序并过滤列
df = df[cols_to_keep]
df.to_excel(writer, sheet_name="材机分析", index=False)
print(f"✅ 已导出 {len(df)} 条材机分析数据,列顺序: {df.columns.tolist()}")
# ------------------------ 取费表导出函数 ------------------------
def fetch_all_fee_collections(driver) -> List[Dict[str, Any]]:
"""递归获取所有FeeCollection节点数据,保留原始属性名(英文),加入 template_name 和 sortid"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(templateSet:FeeTableTemplateSet)-[:HAS_CHILD]->(templateItem:FeeTableTemplateItem)
OPTIONAL MATCH path=(templateItem)-[:HAS_CHILD*]->(feeCollection:FeeCollection)
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
RETURN templateItem.name as template_name,
feeCollection,
length(path) as depth
ORDER BY template_name, depth
"""
results = []
with driver.session() as session:
result = session.run(query)
for record in result:
fee_collection = record["feeCollection"]
if fee_collection is None:
continue
# 节点属性转为字典(原始英文属性名)
props = dict(fee_collection.items())
# 添加额外字段(template_name 和 depth),也用英文
row = {
"template_name": record["template_name"], # 原始英文字段
"sortid": props.get("sortid"), # 用于排序和展示
"depth": record["depth"], # 用于分组排序,导出前会删除
}
# 合并节点原始属性(保留英文名)
row.update(props)
results.append(row)
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (templateSet:FeeTableTemplateSet)-[:HAS_CHILD]->(templateItem:FeeTableTemplateItem)
OPTIONAL MATCH path=(templateItem)-[:HAS_CHILD*]->(feeCollection:FeeCollection)
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
RETURN templateItem.name as template_name,
feeCollection,
length(path) as depth
ORDER BY template_name, depth
"""
result = session.run(old_query)
for record in result:
fee_collection = record["feeCollection"]
if fee_collection is None:
continue
# 节点属性转为字典(原始英文属性名)
props = dict(fee_collection.items())
# 添加额外字段(template_name 和 depth),也用英文
row = {
"template_name": record["template_name"], # 原始英文字段
"sortid": props.get("sortid"), # 用于排序和展示
"depth": record["depth"], # 用于分组排序,导出前会删除
}
# 合并节点原始属性(保留英文名)
row.update(props)
results.append(row)
return results
def export_fee_table(writer, driver) -> None:
"""导出取费表工作簿:按 sortid 排序,不显示 sortid,固定 name、序号、代码 三列在前"""
print("正在导出取费表数据...")
data = fetch_all_fee_collections(driver)
if not data:
print("警告: 没有找到取费表相关数据")
# 创建空 DataFrame,包含固定列
df = pd.DataFrame(columns=["序号", "name", "代码"])
else:
df = pd.DataFrame(data)
# === 确保 sortid 存在,用于排序 ===
if "sortid" not in df.columns:
df["sortid"] = ""
# 生成排序键
df["sort_key"] = df["sortid"].apply(custom_sort_key)
# 按 template_name 分组,组内按 sort_key 排序
if "template_name" in df.columns:
df = df.sort_values(by=["template_name", "sort_key"]).reset_index(drop=True)
else:
df = df.sort_values(by=["sort_key"]).reset_index(drop=True)
# 删除辅助列
df = df.drop(["sort_key", "depth"], axis=1, errors="ignore")
# === 固定前置列 ===
fixed_front_cols = ["序号", "name", "代码"]
# 补全缺失的固定列(避免 KeyError)
for col in fixed_front_cols:
if col not in df.columns:
df[col] = None
# 构建最终列顺序:固定列 + 其他非 sortid 列
remaining_cols = [col for col in df.columns if col not in fixed_front_cols and col != "sortid"]
final_columns = fixed_front_cols + remaining_cols
df = df[final_columns]
# 导出到 Excel
df.to_excel(writer, sheet_name="取费表", index=False)
# 安全获取 template_name 数量
num_templates = df["template_name"].nunique() if not df.empty and "template_name" in df.columns else 0
print(f"✅ 已导出 {len(df)} 条取费表数据,来自 {num_templates} 个取费表")
print(f"📊 表头顺序: {df.columns.tolist()}")
# ------------------------ 工程费用导出函数 ------------------------
def fetch_fee_schedule_items(driver) -> List[Dict[str, str]]:
"""获取所有FeeScheduleItem节点"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem)
RETURN feeScheduleItem.name as name
"""
results = []
with driver.session() as session:
result = session.run(query)
for record in result:
results.append({"name": record["name"]})
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem)
RETURN feeScheduleItem.name as name
"""
result = session.run(old_query)
for record in result:
results.append({"name": record["name"]})
return results
def fetch_fees_by_schedule_item(driver, schedule_item_name: str) -> List[Dict[str, Any]]:
"""获取指定FeeScheduleItem下的Fee节点数据,动态提取所有属性"""
query = """
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem {name: $schedule_item_name})
MATCH path=(feeScheduleItem)-[:HAS_CHILD*]->(fee:Fee)
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
RETURN fee.sortid as sortid,
fee // 返回整个节点
"""
results = []
with driver.session() as session:
result = session.run(query, schedule_item_name=schedule_item_name)
for record in result:
fee_node = record["fee"]
props = dict(fee_node.items()) if fee_node is not None else {}
row = {"sortid": record["sortid"]}
row.update(props)
results.append(row)
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
if not results:
old_query = """
MATCH (feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem {name: $schedule_item_name})
MATCH path=(feeScheduleItem)-[:HAS_CHILD*]->(fee:Fee)
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
RETURN fee.sortid as sortid,
fee // 返回整个节点
"""
result = session.run(old_query, schedule_item_name=schedule_item_name)
for record in result:
fee_node = record["fee"]
props = dict(fee_node.items()) if fee_node is not None else {}
row = {"sortid": record["sortid"]}
row.update(props)
results.append(row)
return results
def export_fee_schedules(writer: pd.ExcelWriter, driver) -> None:
"""导出费用计划工作簿:按 sortid 排序,不显示 sortid,固定 name、序号、代码 三列在前"""
print("正在导出费用计划数据...")
schedule_items = fetch_fee_schedule_items(driver)
if not schedule_items:
print("警告: 没有找到费用计划相关数据")
return
# === 固定前置列 ===
fixed_front_cols = ["序号", "name", "代码"]
for item in schedule_items:
schedule_item_name = item["name"]
print(f"正在导出 '{schedule_item_name}' 费用计划...")
# 获取该 FeeScheduleItem 下的所有 Fee 节点(动态属性)
fees = fetch_fees_by_schedule_item(driver, schedule_item_name)
if not fees:
print(f"警告: 没有找到 '{schedule_item_name}' 相关的费用数据")
# 创建空 DataFrame,只包含固定列
df = pd.DataFrame(columns=fixed_front_cols)
else:
df = pd.DataFrame(fees)
# === 排序:使用 sortid 排序(关键:排序后才移除)===
if "sortid" in df.columns:
df["sort_key"] = df["sortid"].apply(custom_sort_key)
df = df.sort_values(by=["sort_key"]).reset_index(drop=True)
df = df.drop(["sort_key"], axis=1)
# === 构建列顺序:name、序号、代码 在前,其余非 sortid 列在后 ===
remaining_cols = [col for col in df.columns if col not in fixed_front_cols and col != "sortid"]
# 确保 fixed_front_cols 中存在的列才保留(避免 KeyError)
cols_in_df = [col for col in fixed_front_cols if col in df.columns]
final_columns = cols_in_df + remaining_cols
df = df[final_columns]
# sheet 名称限制(最大 31 字符)
sheet_name = schedule_item_name
if len(sheet_name) > 31:
sheet_name = sheet_name[:28] + "..."
df.to_excel(writer, sheet_name=sheet_name, index=False)
print(f"✅ 已导出 {len(df)} 条 '{schedule_item_name}' 费用计划数据")
print(f"📊 表头顺序: {df.columns.tolist()}")
# ------------------------ 主函数 ------------------------
def export_knowledge_graph(output_dir: str = "", software_name: str = "") -> str:
"""
主导出函数
参数:
output_dir: 输出目录路径
software_name: 软件名称,优先用作文件名
返回:
str: 导出文件的完整路径
"""
# 加载配置并创建驱动
config = load_neo4j_config()
driver = GraphDatabase.driver(config["uri"], auth=(config["user"], config["password"]))
try:
# 获取工程类型(清单工程或预算工程)
project_type = get_project_type(driver)
print(f"检测到工程类型: {project_type}")
# 如果提供了软件名称,优先使用软件名称
if software_name:
project_name = software_name
print(f"使用提供的软件名称作为文件名: {project_name}")
else:
# 获取工程名称
project_name = get_project_name(driver)
if not project_name:
print("警告:未找到工程名称,将使用默认文件名")
# 如果有指定输出目录,添加到文件名前
if output_dir:
if not project_name:
project_name = "knowledge_graph_export"
project_name = os.path.join(output_dir, project_name)
# 创建Excel写入器和获取文件路径
writer, output_path = create_excel_writer(project_name)
# 导出项目划分工作簿(根据工程类型)
export_project_division(writer, driver, project_type)
# 导出工程量工作簿
export_project_properties(writer, driver)
# 导出工程量工作簿
export_project_quantities(writer, driver)
# 导出人材机工作簿
export_materials_equipments(writer, driver)
# 导出费用预览工作簿(根据工程类型)
export_cost_set_preview(writer, driver, "ProjectDivisionItem", "项目划分_费用预览", project_type)
export_cost_set_preview(writer, driver, "ProjectQuantity", "工程量_费用预览", project_type)
# 导出材机分析工作簿
export_material_machine_analysis(writer, driver)
# 导出取费表工作簿
export_fee_table(writer, driver)
# 导出费用计划工作簿
export_fee_schedules(writer, driver)
# 保存Excel文件
writer.close()
print(f"导出完成,文件已保存到: {os.path.abspath(output_path)}")
return output_path
finally:
driver.close()
# if __name__ == "__main__":
# # 执行导出,可以指定输出目录
# export_knowledge_graph() # 默认当前目录