1189 lines
46 KiB
Python
1189 lines
46 KiB
Python
import pandas as pd
|
||
from configparser import ConfigParser
|
||
from neo4j import GraphDatabase
|
||
from datetime import datetime
|
||
import re
|
||
import os
|
||
from typing import Dict, List, Any
|
||
|
||
|
||
# ------------------------ 通用工具函数 ------------------------
|
||
def load_neo4j_config(config_file: str = "config.ini") -> Dict[str, str]:
|
||
"""从config.ini加载Neo4j配置"""
|
||
config = ConfigParser()
|
||
config.read(config_file)
|
||
return {
|
||
"uri": config.get("neo4j", "uri"),
|
||
"user": config.get("neo4j", "user"),
|
||
"password": config.get("neo4j", "password"),
|
||
}
|
||
|
||
|
||
def get_project_name(driver) -> str:
|
||
"""获取工程名称"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '工程名称'})
|
||
RETURN n.value as project_name
|
||
LIMIT 1
|
||
"""
|
||
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
record = result.single()
|
||
if record:
|
||
return record["project_name"]
|
||
|
||
# 如果没有找到,尝试使用旧的查询方式(向后兼容)
|
||
old_query = """
|
||
MATCH (n:ProjectProperty {name: '工程名称'})
|
||
RETURN n.value as project_name
|
||
LIMIT 1
|
||
"""
|
||
result = session.run(old_query)
|
||
record = result.single()
|
||
if record:
|
||
return record["project_name"]
|
||
|
||
return "" # 返回空字符串而非None
|
||
|
||
|
||
def get_project_type(driver) -> str:
|
||
"""
|
||
获取工程类型(清单工程或预算工程)
|
||
1. 优先检查ProjectProperty中name为'执行规范'的节点的value属性:
|
||
- 包含"清单"字样的为清单工程
|
||
- 包含"预规"字样的为预算工程
|
||
2. 如果未找到'执行规范',则查找name为'工程类型'的节点的value属性,
|
||
并通过预定义的映射字典判断工程类型。
|
||
"""
|
||
|
||
project_type_mapping = {
|
||
"招投标工程": "清单工程",
|
||
"概预算工程": "预算工程",
|
||
"定额计价": "预算工程",
|
||
"清单计价": "清单工程",
|
||
}
|
||
|
||
# 查询1:优先查找 '执行规范'
|
||
query_standard = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '执行规范'})
|
||
RETURN n.value AS standard
|
||
LIMIT 1
|
||
"""
|
||
|
||
with driver.session() as session:
|
||
# 执行第一个查询
|
||
result = session.run(query_standard)
|
||
record = result.single()
|
||
if record and record["standard"]:
|
||
standard = record["standard"]
|
||
if "清单" in standard:
|
||
return "清单工程"
|
||
elif "预规" in standard:
|
||
return "预算工程"
|
||
|
||
# 如果 '执行规范' 未找到或不匹配,尝试查找 '工程类型'
|
||
query_type = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD]->(n:ProjectProperty {name: '工程类型'})
|
||
RETURN n.value AS type_value
|
||
LIMIT 1
|
||
"""
|
||
result = session.run(query_type)
|
||
record = result.single()
|
||
if record and record["type_value"]:
|
||
type_value = record["type_value"]
|
||
# 查找映射字典
|
||
if type_value in project_type_mapping:
|
||
return project_type_mapping[type_value]
|
||
else:
|
||
# 如果字典中没有该值,可以打印警告或返回默认值
|
||
print(f"Warning: '工程类型' 值 '{type_value}' 未在 project_type_mapping 中定义,使用默认值 '预算工程'")
|
||
# 您也可以选择抛出异常或返回 None
|
||
|
||
# 如果两种方式都找不到,尝试旧的兼容查询(可选保留)
|
||
old_query = """
|
||
MATCH (n:ProjectProperty {name: '执行规范'})
|
||
RETURN n.value AS standard
|
||
LIMIT 1
|
||
"""
|
||
result = session.run(old_query)
|
||
record = result.single()
|
||
if record and record["standard"]:
|
||
standard = record["standard"]
|
||
if "清单" in standard:
|
||
return "清单工程"
|
||
elif "预规" in standard:
|
||
return "预算工程"
|
||
|
||
# 最终默认返回
|
||
return "预算工程"
|
||
|
||
|
||
def create_excel_writer(project_name: str = "") -> tuple:
|
||
"""创建Excel写入器并返回写入器和文件路径"""
|
||
if project_name:
|
||
output_file = f"{project_name}.xlsx"
|
||
else:
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
output_file = f"knowledge_graph_export_{timestamp}.xlsx"
|
||
|
||
# 确保目录存在 - 只有当output_file包含路径时才创建目录
|
||
output_dir = os.path.dirname(output_file)
|
||
if output_dir: # 只有当路径不为空时才创建目录
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
writer = pd.ExcelWriter(output_file, engine="openpyxl")
|
||
return writer, output_file
|
||
|
||
|
||
def custom_sort_key(sortid: str) -> tuple:
|
||
"""
|
||
为 sortid 创建排序键,支持 '1', '1.1', '1.1.1' 格式
|
||
"""
|
||
if not sortid:
|
||
return (float("inf"),) # 空值排最后
|
||
try:
|
||
return tuple(int(x) for x in sortid.split("."))
|
||
except ValueError:
|
||
# 如果无法转为数字,则按字符串排序(兜底)
|
||
return tuple(float("inf") for _ in sortid.split("."))
|
||
|
||
|
||
# ------------------------ 项目划分导出函数 ------------------------
|
||
def fetch_project_division_items(driver, project_type: str) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取ProjectDivisionItem节点数据,根据工程类型调整查询
|
||
project_type: 工程类型,"清单工程"或"预算工程"
|
||
"""
|
||
print(f"开始获取项目划分数据,工程类型: {project_type}")
|
||
|
||
if project_type == "清单工程":
|
||
# 清单工程需要获取ProjectDivisionItem和List两种节点
|
||
# 修改查询,确保正确获取List节点,并且只获取当前工程的节点
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n)
|
||
WHERE n:ProjectDivisionItem OR n:List
|
||
RETURN n.sortid as sortid, n.name as name, labels(n) as labels, properties(n) as properties
|
||
"""
|
||
else:
|
||
# 预算工程只获取ProjectDivisionItem节点,并且只获取当前工程的节点
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n:ProjectDivisionItem)
|
||
RETURN n.sortid as sortid, n.name as name, labels(n) as labels, properties(n) as properties
|
||
"""
|
||
|
||
print(f"执行查询: {query}")
|
||
|
||
results = []
|
||
node_count = 0
|
||
list_count = 0
|
||
pdi_count = 0
|
||
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
for record in result:
|
||
node_count += 1
|
||
props = record["properties"]
|
||
# --- 只移除已单独提取的字段 ---
|
||
props.pop("sortid", None)
|
||
props.pop("name", None)
|
||
# ✅ 不再 pop("序号", None) —— 保留它!
|
||
|
||
# 获取节点标签
|
||
labels = record["labels"]
|
||
label_str = ",".join(labels) if labels else ""
|
||
|
||
# 检查是否有必要的字段
|
||
sortid = record["sortid"]
|
||
name = record["name"]
|
||
|
||
if not sortid or not name:
|
||
print(f"警告: 跳过缺少必要字段的节点,标签: {label_str}")
|
||
continue
|
||
|
||
item = {
|
||
"sortid": sortid,
|
||
"name": name,
|
||
"节点类型": label_str, # 添加节点类型信息
|
||
**props,
|
||
} # 包含 "序号" 在内所有其他属性
|
||
|
||
# 统计不同类型的节点
|
||
if "List" in label_str:
|
||
list_count += 1
|
||
elif "ProjectDivisionItem" in label_str:
|
||
pdi_count += 1
|
||
|
||
results.append(item)
|
||
|
||
# 输出调试信息
|
||
if node_count <= 10 or node_count % 100 == 0:
|
||
print(f"找到节点: 类型={label_str}, name={name}, sortid={sortid}")
|
||
|
||
print(f"共查询到 {node_count} 个节点,处理后保留 {len(results)} 个节点")
|
||
print(f"其中 List 类型节点: {list_count} 个, ProjectDivisionItem 类型节点: {pdi_count} 个")
|
||
|
||
# 重要修改:不再按专业类型分组,而是直接返回所有节点
|
||
if project_type == "清单工程":
|
||
# 对于清单工程,直接按sortid排序所有节点
|
||
results.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
|
||
return results
|
||
else:
|
||
# 对于预算工程,保持原有的分组排序逻辑
|
||
# === 排序逻辑:先按 '专业类型' 分组,再按 sortid 排序 ===
|
||
group_key = "专业类型"
|
||
|
||
if not results:
|
||
return []
|
||
|
||
# 获取所有唯一的"专业类型"值,并排序(按字母顺序或自定义)
|
||
unique_groups = sorted(set(item.get(group_key, "") for item in results))
|
||
print(f"找到的专业类型: {unique_groups}")
|
||
|
||
sorted_results = []
|
||
for group in unique_groups:
|
||
group_items = [item for item in results if item.get(group_key) == group]
|
||
# ✅ 排序只依赖 sortid,不依赖"序号"
|
||
group_items.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
|
||
sorted_results.extend(group_items)
|
||
|
||
return sorted_results
|
||
|
||
|
||
def export_project_division(writer: pd.ExcelWriter, driver, project_type: str) -> None:
|
||
"""
|
||
导出项目划分工作簿(不显示 sortid 列)
|
||
project_type: 工程类型,"清单工程"或"预算工程"
|
||
"""
|
||
print(f"正在导出项目划分数据({project_type})...")
|
||
data = fetch_project_division_items(driver, project_type)
|
||
|
||
print(f"从fetch_project_division_items获取到 {len(data)} 条数据")
|
||
|
||
if not data:
|
||
print("警告:未获取到项目划分数据,将创建空表。")
|
||
df = pd.DataFrame(columns=["序号", "name", "代码", "编码", "节点类型"])
|
||
else:
|
||
# 输出前10条数据的节点类型,用于调试
|
||
for i, item in enumerate(data[:10]):
|
||
print(f"数据{i+1}: 类型={item.get('节点类型', 'N/A')}, name={item.get('name', 'N/A')}")
|
||
|
||
# 创建完整的DataFrame
|
||
df = pd.DataFrame(data)
|
||
print(f"创建DataFrame后有 {len(df)} 行")
|
||
|
||
# 检查不同类型的节点数量
|
||
list_nodes = df[df["节点类型"].str.contains("List", na=False)]
|
||
pdi_nodes = df[df["节点类型"].str.contains("ProjectDivisionItem", na=False)]
|
||
print(f"其中List类型节点有 {len(list_nodes)} 个,ProjectDivisionItem类型节点有 {len(pdi_nodes)} 个")
|
||
|
||
# === 定义希望显示的固定列 ===
|
||
fixed_cols = ["序号", "name", "代码", "编码", "节点类型"]
|
||
|
||
# === 排除的列(如 sortid)===
|
||
excluded_cols = {"sortid"}
|
||
|
||
# 获取其他列,但排除 sortid
|
||
other_cols = [col for col in df.columns if col not in fixed_cols and col not in excluded_cols]
|
||
|
||
# 构建最终列顺序
|
||
cols = fixed_cols + other_cols
|
||
|
||
# 确保只保留实际存在的列
|
||
cols = [col for col in cols if col in df.columns]
|
||
df = df[cols]
|
||
|
||
print(f"最终DataFrame有 {len(df)} 行")
|
||
|
||
# 导出所有节点到"项目划分"工作簿
|
||
df.to_excel(writer, sheet_name="项目划分_清单", index=False)
|
||
print(f"✅ 已导出 {len(df)} 条项目划分数据")
|
||
print(f"📊 表头列:{df.columns.tolist()}")
|
||
|
||
# 如果是清单工程,创建单独的工作簿
|
||
if project_type == "清单工程" and len(data) > 0:
|
||
# 1. 只导出List节点到"清单节点"工作簿
|
||
df_list = pd.DataFrame([item for item in data if "List" in item.get("节点类型", "")])
|
||
if len(df_list) > 0:
|
||
# 排除sortid列
|
||
if "sortid" in df_list.columns:
|
||
df_list = df_list.drop(columns=["sortid"])
|
||
|
||
df_list.to_excel(writer, sheet_name="清单", index=False)
|
||
print(f"✅ 额外导出 {len(df_list)} 条清单节点数据到'清单节点'工作簿")
|
||
|
||
# 2. 只导出ProjectDivisionItem节点到"项目划分节点"工作簿
|
||
df_pdi = pd.DataFrame([item for item in data if "ProjectDivisionItem" in item.get("节点类型", "")])
|
||
if len(df_pdi) > 0:
|
||
# 排除sortid列
|
||
if "sortid" in df_pdi.columns:
|
||
df_pdi = df_pdi.drop(columns=["sortid"])
|
||
|
||
df_pdi.to_excel(writer, sheet_name="项目划分", index=False)
|
||
print(f"✅ 额外导出 {len(df_pdi)} 条项目划分节点数据到'项目划分节点'工作簿")
|
||
|
||
|
||
# ------------------------ 工程属性导出函数 ------------------------
|
||
def fetch_project_properties(driver) -> List[Dict[str, Any]]:
|
||
"""获取ProjectProperty节点数据(包含 sortid)"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(ps:ProjectPropertySet)-[:HAS_CHILD*0..]->(n:ProjectProperty)
|
||
RETURN n.sortid as sortid, n.name as name, n.value as value
|
||
ORDER BY n.sortid // 可选:在数据库层初步排序
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
for record in result:
|
||
results.append({"sortid": record["sortid"], "name": record["name"], "value": record["value"]})
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (n:ProjectProperty)
|
||
RETURN n.sortid as sortid, n.name as name, n.value as value
|
||
ORDER BY n.sortid
|
||
"""
|
||
result = session.run(old_query)
|
||
for record in result:
|
||
results.append({"sortid": record["sortid"], "name": record["name"], "value": record["value"]})
|
||
|
||
# 在 Python 中按 sortid 排序(确保复杂格式正确)
|
||
results.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
|
||
return results
|
||
|
||
|
||
def export_project_properties(writer: pd.ExcelWriter, driver) -> None:
|
||
"""导出工程属性工作簿"""
|
||
print("正在导出工程属性数据...")
|
||
data = fetch_project_properties(driver)
|
||
|
||
if not data:
|
||
print("警告:未获取到工程属性数据,将创建空表。")
|
||
df = pd.DataFrame(columns=["sortid", "name", "value"])
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
# 确保列顺序
|
||
df = df[["name", "value"]]
|
||
|
||
df.to_excel(writer, sheet_name="工程属性", index=False)
|
||
print(f"已导出 {len(df)} 条工程属性数据")
|
||
|
||
|
||
# ------------------------ 工程量导出函数 ------------------------
|
||
def fetch_project_quantities(driver) -> List[Dict[str, Any]]:
|
||
"""获取ProjectQuantity节点数据"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(n:ProjectQuantity)
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
|
||
RETURN
|
||
n.name as name,
|
||
n.sortid as sortid,
|
||
COALESCE(parent.GUID, parent.guid, '') AS parent_id, // 优先取 GUID,其次 guid,都无则为空字符串
|
||
properties(n) as properties
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
for record in result:
|
||
props = record["properties"]
|
||
props.pop("name", None)
|
||
props.pop("sortid", None)
|
||
results.append(
|
||
{
|
||
"name": record["name"],
|
||
"sortid": record["sortid"],
|
||
"父级GUID": record["parent_id"],
|
||
**props,
|
||
}
|
||
)
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (n:ProjectQuantity)
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
|
||
RETURN
|
||
n.name as name,
|
||
n.sortid as sortid,
|
||
COALESCE(parent.GUID, parent.guid, '') AS parent_id,
|
||
properties(n) as properties
|
||
"""
|
||
result = session.run(old_query)
|
||
for record in result:
|
||
props = record["properties"]
|
||
props.pop("name", None)
|
||
props.pop("sortid", None)
|
||
results.append(
|
||
{
|
||
"name": record["name"],
|
||
"sortid": record["sortid"],
|
||
"父级GUID": record["parent_id"],
|
||
**props,
|
||
}
|
||
)
|
||
|
||
# 首先按父级GUID排序,再按当前节点的sortid排序
|
||
results.sort(
|
||
key=lambda x: (
|
||
custom_sort_key(str(x["父级GUID"]) if x["父级GUID"] else ""),
|
||
custom_sort_key(str(x["sortid"])),
|
||
)
|
||
)
|
||
return results
|
||
|
||
|
||
def export_project_quantities(writer: pd.ExcelWriter, driver) -> None:
|
||
"""导出工程量工作簿(不显示 sortid 列)"""
|
||
print("正在导出工程量数据...")
|
||
data = fetch_project_quantities(driver)
|
||
|
||
if not data:
|
||
print("警告:未获取到工程量数据,将创建空表。")
|
||
# 创建空表时也不包含 sortid
|
||
fixed_cols = ["父级GUID", "特征段", "name", "规格型号", "单位", "计算式", "数量"]
|
||
df = pd.DataFrame(columns=fixed_cols)
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
|
||
# === 定义希望显示的固定列(按期望顺序)===
|
||
fixed_cols = [
|
||
"父级GUID",
|
||
"特征段",
|
||
"name",
|
||
"规格型号",
|
||
"单位",
|
||
"计算式",
|
||
"数量",
|
||
]
|
||
|
||
# === 排除的列(如不需要显示的字段)===
|
||
excluded_cols = {"sortid"} # 可以添加更多如 "id", "临时字段" 等
|
||
|
||
# === 获取其余列:在 df.columns 中但不在 fixed_cols 且不在 excluded_cols ===
|
||
other_cols = [col for col in df.columns if col not in fixed_cols and col not in excluded_cols]
|
||
|
||
# === 构建最终列顺序 ===
|
||
cols = fixed_cols + other_cols
|
||
|
||
# 确保只保留实际存在的列
|
||
cols = [col for col in cols if col in df.columns]
|
||
df = df[cols]
|
||
|
||
# 导出到 Excel
|
||
df.to_excel(writer, sheet_name="工程量", index=False)
|
||
print(f"✅ 已导出 {len(df)} 条工程量数据")
|
||
print(f"📊 表头列:{df.columns.tolist()}")
|
||
|
||
|
||
# ------------------------ 人材机导出函数 ------------------------
|
||
def fetch_materials_equipments(driver) -> List[Dict[str, Any]]:
|
||
"""获取MaterialOrEquipment节点数据"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(pds:ProjectDivisionSet)-[:HAS_CHILD]->(pdt:ProjectDivisionTree)-[:HAS_CHILD*0..]->(pq:ProjectQuantity)-[:HAS_CHILD]->(n:MaterialOrEquipment)
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
|
||
RETURN n.id as id, n.name as name, n.sortid as sortid, COALESCE(parent.GUID, parent.guid, '') AS parent_id, properties(n) as properties
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
for record in result:
|
||
props = record["properties"]
|
||
props.pop("id", None)
|
||
props.pop("name", None)
|
||
props.pop("sortid", None)
|
||
results.append(
|
||
{
|
||
"id": record["id"],
|
||
"name": record["name"],
|
||
"sortid": record["sortid"],
|
||
"父级GUID": record["parent_id"] if record["parent_id"] else "",
|
||
**props,
|
||
}
|
||
)
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (n:MaterialOrEquipment)
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(n)
|
||
RETURN n.id as id, n.name as name, n.sortid as sortid, COALESCE(parent.GUID, parent.guid, '') AS parent_id, properties(n) as properties
|
||
"""
|
||
result = session.run(old_query)
|
||
for record in result:
|
||
props = record["properties"]
|
||
props.pop("id", None)
|
||
props.pop("name", None)
|
||
props.pop("sortid", None)
|
||
results.append(
|
||
{
|
||
"id": record["id"],
|
||
"name": record["name"],
|
||
"sortid": record["sortid"],
|
||
"父级GUID": record["parent_id"] if record["parent_id"] else "",
|
||
**props,
|
||
}
|
||
)
|
||
|
||
# 首先按父级GUID排序,再按当前节点的sortid排序
|
||
results.sort(key=lambda x: (custom_sort_key(str(x["父级GUID"])), custom_sort_key(str(x["sortid"]))))
|
||
|
||
return results
|
||
|
||
|
||
def export_materials_equipments(writer: pd.ExcelWriter, driver) -> None:
|
||
"""导出入材机工作簿"""
|
||
print("正在导出入材机数据...")
|
||
data = fetch_materials_equipments(driver)
|
||
|
||
if not data:
|
||
print("警告:未获取到入材机数据,将创建空表。")
|
||
df = pd.DataFrame(columns=["父级GUID", "sortid", "id", "name"])
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
|
||
# 父级GUID, sortid, id, name, 其他属性
|
||
cols = ["父级GUID", "id", "name"] + [
|
||
col for col in df.columns if col not in ["父级GUID", "sortid", "id", "name"]
|
||
]
|
||
df = df[cols]
|
||
|
||
df.to_excel(writer, sheet_name="人材机", index=False)
|
||
print(f"已导出 {len(df)} 条人材机数据")
|
||
|
||
|
||
# ------------------------ 费用预览导出函数 ------------------------
|
||
def fetch_cost_set_items(driver, parent_label: str, project_type: str) -> tuple:
|
||
"""
|
||
获取 CostSet 节点数据,正确提取父节点的中文字段
|
||
project_type: 工程类型,"清单工程"或"预算工程"
|
||
"""
|
||
if project_type == "清单工程":
|
||
if parent_label == "ProjectDivisionItem":
|
||
# 清单工程需要考虑ProjectDivisionItem和List两种节点
|
||
child_node = "(child) WHERE child:ProjectDivisionItem OR child:List"
|
||
parent_node = "(parent) WHERE parent:ProjectDivisionItem OR parent:List"
|
||
extra_return = """
|
||
child.专业类型 AS parent_specialty,
|
||
child.取费表 AS parent_fee_table,
|
||
child.序号 AS parent_xuhao,
|
||
labels(child) AS child_labels
|
||
"""
|
||
parent_cols = ["专业类型", "取费表", "序号", "节点类型"]
|
||
elif parent_label == "ProjectQuantity":
|
||
child_node = "(child:ProjectQuantity)"
|
||
parent_node = "(parent:ProjectQuantity)"
|
||
extra_return = """
|
||
child.编码 AS parent_code,
|
||
labels(child) AS child_labels
|
||
"""
|
||
parent_cols = ["编码", "节点类型"]
|
||
else:
|
||
raise ValueError(f"不支持的父节点类型: {parent_label}")
|
||
else:
|
||
# 预算工程的原有逻辑
|
||
if parent_label == "ProjectDivisionItem":
|
||
child_node = "(child:ProjectDivisionItem)"
|
||
parent_node = "(parent:ProjectDivisionItem)"
|
||
extra_return = """
|
||
child.专业类型 AS parent_specialty,
|
||
child.取费表 AS parent_fee_table,
|
||
child.序号 AS parent_xuhao
|
||
"""
|
||
parent_cols = ["专业类型", "取费表", "序号"]
|
||
elif parent_label == "ProjectQuantity":
|
||
child_node = "(child:ProjectQuantity)"
|
||
parent_node = "(parent:ProjectQuantity)"
|
||
extra_return = """
|
||
child.编码 AS parent_code
|
||
"""
|
||
parent_cols = ["编码"]
|
||
else:
|
||
raise ValueError(f"不支持的父节点类型: {parent_label}")
|
||
|
||
# ✅ 关键:MATCH 父子关系,并提取字段,添加对current=true的过滤
|
||
query = f"""
|
||
MATCH (ed:EngineeringData {{current: true}})-[:HAS_CHILD*0..]->{child_node}
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(child)
|
||
OPTIONAL MATCH (child)-[:USE]->(costSet:CostSet)
|
||
OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem:CostItem)
|
||
|
||
RETURN
|
||
child.name AS child_name,
|
||
COALESCE(costSet.GUID, toString(id(child))) AS guid,
|
||
COALESCE(costSet.sortid, '999999') AS cost_sortid,
|
||
{extra_return.strip()},
|
||
costItem.name AS cost_item_name,
|
||
costItem.cost AS cost_value
|
||
ORDER BY COALESCE(costSet.sortid, '999999')
|
||
"""
|
||
|
||
results = {}
|
||
all_cost_names = []
|
||
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
|
||
for record in result:
|
||
child_name = record["child_name"]
|
||
guid = record["guid"]
|
||
cost_sortid = record["cost_sortid"]
|
||
|
||
if guid not in results:
|
||
row = {"sortid": cost_sortid, "GUID": guid, "工程名称": child_name}
|
||
|
||
# ✅ 使用 record 中的字段名(来自 extra_return 的别名)
|
||
if parent_label == "ProjectDivisionItem":
|
||
row["专业类型"] = record["parent_specialty"]
|
||
row["取费表"] = record["parent_fee_table"]
|
||
row["序号"] = record["parent_xuhao"]
|
||
if project_type == "清单工程":
|
||
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
|
||
elif parent_label == "ProjectQuantity":
|
||
row["编码"] = record["parent_code"]
|
||
if project_type == "清单工程":
|
||
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
|
||
|
||
results[guid] = row
|
||
|
||
raw_name = record["cost_item_name"]
|
||
if raw_name is not None:
|
||
cost_name = raw_name.split("_")[0] if "_" in raw_name else raw_name
|
||
if cost_name not in all_cost_names:
|
||
all_cost_names.append(cost_name)
|
||
results[guid][cost_name] = record["cost_value"]
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = f"""
|
||
MATCH {child_node}
|
||
OPTIONAL MATCH (parent)-[:HAS_CHILD]->(child)
|
||
OPTIONAL MATCH (child)-[:USE]->(costSet:CostSet)
|
||
OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem:CostItem)
|
||
|
||
RETURN
|
||
child.name AS child_name,
|
||
COALESCE(costSet.GUID, toString(id(child))) AS guid,
|
||
COALESCE(costSet.sortid, '999999') AS cost_sortid,
|
||
{extra_return.strip()},
|
||
costItem.name AS cost_item_name,
|
||
costItem.cost AS cost_value
|
||
ORDER BY COALESCE(costSet.sortid, '999999')
|
||
"""
|
||
result = session.run(old_query)
|
||
|
||
for record in result:
|
||
child_name = record["child_name"]
|
||
guid = record["guid"]
|
||
cost_sortid = record["cost_sortid"]
|
||
|
||
if guid not in results:
|
||
row = {"sortid": cost_sortid, "GUID": guid, "工程名称": child_name}
|
||
|
||
# ✅ 使用 record 中的字段名(来自 extra_return 的别名)
|
||
if parent_label == "ProjectDivisionItem":
|
||
row["专业类型"] = record["parent_specialty"]
|
||
row["取费表"] = record["parent_fee_table"]
|
||
row["序号"] = record["parent_xuhao"]
|
||
if project_type == "清单工程":
|
||
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
|
||
elif parent_label == "ProjectQuantity":
|
||
row["编码"] = record["parent_code"]
|
||
if project_type == "清单工程":
|
||
row["节点类型"] = ",".join(record["child_labels"]) if record["child_labels"] else ""
|
||
|
||
results[guid] = row
|
||
|
||
raw_name = record["cost_item_name"]
|
||
if raw_name is not None:
|
||
cost_name = raw_name.split("_")[0] if "_" in raw_name else raw_name
|
||
if cost_name not in all_cost_names:
|
||
all_cost_names.append(cost_name)
|
||
results[guid][cost_name] = record["cost_value"]
|
||
|
||
# 排序
|
||
data = list(results.values())
|
||
data.sort(key=lambda x: custom_sort_key(str(x["sortid"])))
|
||
|
||
# 构建列顺序
|
||
base_cols = ["sortid", "GUID", "工程名称"]
|
||
if parent_label == "ProjectDivisionItem":
|
||
base_cols += ["专业类型", "取费表", "序号"]
|
||
if project_type == "清单工程":
|
||
base_cols += ["节点类型"]
|
||
elif parent_label == "ProjectQuantity":
|
||
base_cols += ["编码"]
|
||
if project_type == "清单工程":
|
||
base_cols += ["节点类型"]
|
||
|
||
final_columns = base_cols + all_cost_names
|
||
|
||
return data, final_columns
|
||
|
||
|
||
def export_cost_set_preview(
|
||
writer: pd.ExcelWriter, driver, parent_label: str, sheet_name: str, project_type: str
|
||
) -> None:
|
||
"""
|
||
导出费用预览工作簿(动态列名):按 sortid 排序,但不显示 sortid 列
|
||
project_type: 工程类型,"清单工程"或"预算工程"
|
||
"""
|
||
print(f"正在导出 {sheet_name} 数据({project_type})...")
|
||
|
||
try:
|
||
data, column_order = fetch_cost_set_items(driver, parent_label, project_type)
|
||
except Exception as e:
|
||
print(f"获取数据失败: {e}")
|
||
return
|
||
|
||
if not data:
|
||
print(f"警告: 没有找到 {sheet_name} 相关数据")
|
||
# 构建空表,排除 sortid
|
||
cols_without_sortid = [col for col in column_order if col != "sortid"]
|
||
df = pd.DataFrame(columns=cols_without_sortid)
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
# 补全缺失列
|
||
for col in column_order:
|
||
if col not in df.columns:
|
||
df[col] = None
|
||
# 按原始顺序排列(包含 sortid)
|
||
df = df[column_order]
|
||
# === 关键:移除 sortid 列 ===
|
||
df = df.drop(columns=["sortid"]) # 或使用:df = df[[col for col in df.columns if col != "sortid"]]
|
||
|
||
df.to_excel(writer, sheet_name=sheet_name, index=False)
|
||
print(f"✅ 已导出 {len(df)} 条 {sheet_name} 数据")
|
||
print(f"📊 列顺序: {df.columns.tolist()}")
|
||
|
||
|
||
# ------------------------ 材机分析导出函数 ------------------------
|
||
def fetch_material_machine_items(driver) -> tuple:
|
||
"""获取MaterialandmachineCostItem节点数据(修正版:避免去重)"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD*0..]->(parent:ProjectDivisionItem)-[:USE]->(costSet:CostSet)-[:HAS_CHILD]->(item:MaterialandmachineCostItem)
|
||
RETURN
|
||
costSet.GUID AS guid,
|
||
costSet.sortid AS sortid,
|
||
parent.name AS parent_name,
|
||
properties(item) AS properties
|
||
ORDER BY costSet.sortid
|
||
"""
|
||
|
||
results = []
|
||
all_columns = set()
|
||
column_order = [] # 保持字段首次出现顺序
|
||
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
|
||
for record in result:
|
||
guid = record["guid"]
|
||
sortid = record["sortid"]
|
||
parent_name = record["parent_name"]
|
||
props = record["properties"]
|
||
|
||
# 构造一行数据
|
||
row = {"sortid": sortid, "GUID": guid, "项目划分名称": parent_name}
|
||
|
||
# 添加 properties 中的每个字段
|
||
for key, value in props.items():
|
||
if key not in all_columns:
|
||
all_columns.add(key)
|
||
column_order.append(key)
|
||
row[key] = value
|
||
|
||
results.append(row)
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (parent:ProjectDivisionItem)-[:USE]->(costSet:CostSet)-[:HAS_CHILD]->(item:MaterialandmachineCostItem)
|
||
RETURN
|
||
costSet.GUID AS guid,
|
||
costSet.sortid AS sortid,
|
||
parent.name AS parent_name,
|
||
properties(item) AS properties
|
||
ORDER BY costSet.sortid
|
||
"""
|
||
result = session.run(old_query)
|
||
|
||
for record in result:
|
||
guid = record["guid"]
|
||
sortid = record["sortid"]
|
||
parent_name = record["parent_name"]
|
||
props = record["properties"]
|
||
|
||
# 构造一行数据
|
||
row = {"sortid": sortid, "GUID": guid, "项目划分名称": parent_name}
|
||
|
||
# 添加 properties 中的每个字段
|
||
for key, value in props.items():
|
||
if key not in all_columns:
|
||
all_columns.add(key)
|
||
column_order.append(key)
|
||
row[key] = value
|
||
|
||
results.append(row)
|
||
|
||
# === 关键修改:复合排序键 ===
|
||
# 先按 GUID 排序,再在每组内按 sortid 排序(使用 custom_sort_key)
|
||
results.sort(
|
||
key=lambda x: (x["GUID"], custom_sort_key(str(x["sortid"]))) # 先按 GUID 字符串排序 # 再按 sortid 的自定义顺序
|
||
)
|
||
|
||
# 最终列顺序
|
||
final_columns = ["sortid", "GUID", "项目划分名称"] + column_order
|
||
return results, final_columns
|
||
|
||
|
||
def export_material_machine_analysis(writer: pd.ExcelWriter, driver) -> None:
|
||
"""导出材机分析工作簿(修正版):按 sortid 排序,但不显示 sortid 列"""
|
||
print("正在导出材机分析数据...")
|
||
|
||
try:
|
||
data, column_order = fetch_material_machine_items(driver)
|
||
except Exception as e:
|
||
print(f"获取材机分析数据失败: {e}")
|
||
return
|
||
|
||
if not data:
|
||
print("警告: 没有找到材机分析相关数据")
|
||
# 创建空表,也不包含 sortid
|
||
df = pd.DataFrame(columns=[col for col in column_order if col != "sortid"])
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
|
||
# 确保所有列都存在(防止某些行缺少字段)
|
||
for col in column_order:
|
||
if col not in df.columns:
|
||
df[col] = None
|
||
|
||
# === 关键:移除 sortid 列(但排序已生效)===
|
||
cols_to_keep = [col for col in column_order if col != "sortid"]
|
||
|
||
# 重新排序并过滤列
|
||
df = df[cols_to_keep]
|
||
|
||
df.to_excel(writer, sheet_name="材机分析", index=False)
|
||
print(f"✅ 已导出 {len(df)} 条材机分析数据,列顺序: {df.columns.tolist()}")
|
||
|
||
|
||
# ------------------------ 取费表导出函数 ------------------------
|
||
def fetch_all_fee_collections(driver) -> List[Dict[str, Any]]:
|
||
"""递归获取所有FeeCollection节点数据,保留原始属性名(英文),加入 template_name 和 sortid"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(templateSet:FeeTableTemplateSet)-[:HAS_CHILD]->(templateItem:FeeTableTemplateItem)
|
||
OPTIONAL MATCH path=(templateItem)-[:HAS_CHILD*]->(feeCollection:FeeCollection)
|
||
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
|
||
RETURN templateItem.name as template_name,
|
||
feeCollection,
|
||
length(path) as depth
|
||
ORDER BY template_name, depth
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
|
||
for record in result:
|
||
fee_collection = record["feeCollection"]
|
||
if fee_collection is None:
|
||
continue
|
||
|
||
# 节点属性转为字典(原始英文属性名)
|
||
props = dict(fee_collection.items())
|
||
|
||
# 添加额外字段(template_name 和 depth),也用英文
|
||
row = {
|
||
"template_name": record["template_name"], # 原始英文字段
|
||
"sortid": props.get("sortid"), # 用于排序和展示
|
||
"depth": record["depth"], # 用于分组排序,导出前会删除
|
||
}
|
||
# 合并节点原始属性(保留英文名)
|
||
row.update(props)
|
||
results.append(row)
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (templateSet:FeeTableTemplateSet)-[:HAS_CHILD]->(templateItem:FeeTableTemplateItem)
|
||
OPTIONAL MATCH path=(templateItem)-[:HAS_CHILD*]->(feeCollection:FeeCollection)
|
||
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
|
||
RETURN templateItem.name as template_name,
|
||
feeCollection,
|
||
length(path) as depth
|
||
ORDER BY template_name, depth
|
||
"""
|
||
result = session.run(old_query)
|
||
|
||
for record in result:
|
||
fee_collection = record["feeCollection"]
|
||
if fee_collection is None:
|
||
continue
|
||
|
||
# 节点属性转为字典(原始英文属性名)
|
||
props = dict(fee_collection.items())
|
||
|
||
# 添加额外字段(template_name 和 depth),也用英文
|
||
row = {
|
||
"template_name": record["template_name"], # 原始英文字段
|
||
"sortid": props.get("sortid"), # 用于排序和展示
|
||
"depth": record["depth"], # 用于分组排序,导出前会删除
|
||
}
|
||
# 合并节点原始属性(保留英文名)
|
||
row.update(props)
|
||
results.append(row)
|
||
|
||
return results
|
||
|
||
|
||
def export_fee_table(writer, driver) -> None:
|
||
"""导出取费表工作簿:按 sortid 排序,不显示 sortid,固定 name、序号、代码 三列在前"""
|
||
print("正在导出取费表数据...")
|
||
data = fetch_all_fee_collections(driver)
|
||
|
||
if not data:
|
||
print("警告: 没有找到取费表相关数据")
|
||
# 创建空 DataFrame,包含固定列
|
||
df = pd.DataFrame(columns=["序号", "name", "代码"])
|
||
else:
|
||
df = pd.DataFrame(data)
|
||
|
||
# === 确保 sortid 存在,用于排序 ===
|
||
if "sortid" not in df.columns:
|
||
df["sortid"] = ""
|
||
|
||
# 生成排序键
|
||
df["sort_key"] = df["sortid"].apply(custom_sort_key)
|
||
|
||
# 按 template_name 分组,组内按 sort_key 排序
|
||
if "template_name" in df.columns:
|
||
df = df.sort_values(by=["template_name", "sort_key"]).reset_index(drop=True)
|
||
else:
|
||
df = df.sort_values(by=["sort_key"]).reset_index(drop=True)
|
||
|
||
# 删除辅助列
|
||
df = df.drop(["sort_key", "depth"], axis=1, errors="ignore")
|
||
|
||
# === 固定前置列 ===
|
||
fixed_front_cols = ["序号", "name", "代码"]
|
||
|
||
# 补全缺失的固定列(避免 KeyError)
|
||
for col in fixed_front_cols:
|
||
if col not in df.columns:
|
||
df[col] = None
|
||
|
||
# 构建最终列顺序:固定列 + 其他非 sortid 列
|
||
remaining_cols = [col for col in df.columns if col not in fixed_front_cols and col != "sortid"]
|
||
final_columns = fixed_front_cols + remaining_cols
|
||
|
||
df = df[final_columns]
|
||
|
||
# 导出到 Excel
|
||
df.to_excel(writer, sheet_name="取费表", index=False)
|
||
|
||
# 安全获取 template_name 数量
|
||
num_templates = df["template_name"].nunique() if not df.empty and "template_name" in df.columns else 0
|
||
print(f"✅ 已导出 {len(df)} 条取费表数据,来自 {num_templates} 个取费表")
|
||
print(f"📊 表头顺序: {df.columns.tolist()}")
|
||
|
||
|
||
# ------------------------ 工程费用导出函数 ------------------------
|
||
def fetch_fee_schedule_items(driver) -> List[Dict[str, str]]:
|
||
"""获取所有FeeScheduleItem节点"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem)
|
||
RETURN feeScheduleItem.name as name
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query)
|
||
for record in result:
|
||
results.append({"name": record["name"]})
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem)
|
||
RETURN feeScheduleItem.name as name
|
||
"""
|
||
result = session.run(old_query)
|
||
for record in result:
|
||
results.append({"name": record["name"]})
|
||
|
||
return results
|
||
|
||
|
||
def fetch_fees_by_schedule_item(driver, schedule_item_name: str) -> List[Dict[str, Any]]:
|
||
"""获取指定FeeScheduleItem下的Fee节点数据,动态提取所有属性"""
|
||
query = """
|
||
MATCH (ed:EngineeringData {current: true})-[:HAS_CHILD]->(feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem {name: $schedule_item_name})
|
||
MATCH path=(feeScheduleItem)-[:HAS_CHILD*]->(fee:Fee)
|
||
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
|
||
RETURN fee.sortid as sortid,
|
||
fee // 返回整个节点
|
||
"""
|
||
|
||
results = []
|
||
with driver.session() as session:
|
||
result = session.run(query, schedule_item_name=schedule_item_name)
|
||
for record in result:
|
||
fee_node = record["fee"]
|
||
props = dict(fee_node.items()) if fee_node is not None else {}
|
||
row = {"sortid": record["sortid"]}
|
||
row.update(props)
|
||
results.append(row)
|
||
|
||
# 如果没有找到结果,尝试使用旧的查询方式(向后兼容)
|
||
if not results:
|
||
old_query = """
|
||
MATCH (feeScheduleSet:FeeScheduleSet)-[:HAS_CHILD]->(feeScheduleItem:FeeScheduleItem {name: $schedule_item_name})
|
||
MATCH path=(feeScheduleItem)-[:HAS_CHILD*]->(fee:Fee)
|
||
WHERE ALL(r in relationships(path) WHERE type(r) = 'HAS_CHILD')
|
||
RETURN fee.sortid as sortid,
|
||
fee // 返回整个节点
|
||
"""
|
||
result = session.run(old_query, schedule_item_name=schedule_item_name)
|
||
for record in result:
|
||
fee_node = record["fee"]
|
||
props = dict(fee_node.items()) if fee_node is not None else {}
|
||
row = {"sortid": record["sortid"]}
|
||
row.update(props)
|
||
results.append(row)
|
||
|
||
return results
|
||
|
||
|
||
def export_fee_schedules(writer: pd.ExcelWriter, driver) -> None:
|
||
"""导出费用计划工作簿:按 sortid 排序,不显示 sortid,固定 name、序号、代码 三列在前"""
|
||
print("正在导出费用计划数据...")
|
||
schedule_items = fetch_fee_schedule_items(driver)
|
||
|
||
if not schedule_items:
|
||
print("警告: 没有找到费用计划相关数据")
|
||
return
|
||
|
||
# === 固定前置列 ===
|
||
fixed_front_cols = ["序号", "name", "代码"]
|
||
|
||
for item in schedule_items:
|
||
schedule_item_name = item["name"]
|
||
print(f"正在导出 '{schedule_item_name}' 费用计划...")
|
||
|
||
# 获取该 FeeScheduleItem 下的所有 Fee 节点(动态属性)
|
||
fees = fetch_fees_by_schedule_item(driver, schedule_item_name)
|
||
|
||
if not fees:
|
||
print(f"警告: 没有找到 '{schedule_item_name}' 相关的费用数据")
|
||
# 创建空 DataFrame,只包含固定列
|
||
df = pd.DataFrame(columns=fixed_front_cols)
|
||
else:
|
||
df = pd.DataFrame(fees)
|
||
|
||
# === 排序:使用 sortid 排序(关键:排序后才移除)===
|
||
if "sortid" in df.columns:
|
||
df["sort_key"] = df["sortid"].apply(custom_sort_key)
|
||
df = df.sort_values(by=["sort_key"]).reset_index(drop=True)
|
||
df = df.drop(["sort_key"], axis=1)
|
||
|
||
# === 构建列顺序:name、序号、代码 在前,其余非 sortid 列在后 ===
|
||
remaining_cols = [col for col in df.columns if col not in fixed_front_cols and col != "sortid"]
|
||
# 确保 fixed_front_cols 中存在的列才保留(避免 KeyError)
|
||
cols_in_df = [col for col in fixed_front_cols if col in df.columns]
|
||
final_columns = cols_in_df + remaining_cols
|
||
|
||
df = df[final_columns]
|
||
|
||
# sheet 名称限制(最大 31 字符)
|
||
sheet_name = schedule_item_name
|
||
if len(sheet_name) > 31:
|
||
sheet_name = sheet_name[:28] + "..."
|
||
|
||
df.to_excel(writer, sheet_name=sheet_name, index=False)
|
||
print(f"✅ 已导出 {len(df)} 条 '{schedule_item_name}' 费用计划数据")
|
||
print(f"📊 表头顺序: {df.columns.tolist()}")
|
||
|
||
|
||
# ------------------------ 主函数 ------------------------
|
||
def export_knowledge_graph(output_dir: str = "", software_name: str = "") -> str:
|
||
"""
|
||
主导出函数
|
||
|
||
参数:
|
||
output_dir: 输出目录路径
|
||
software_name: 软件名称,优先用作文件名
|
||
|
||
返回:
|
||
str: 导出文件的完整路径
|
||
"""
|
||
# 加载配置并创建驱动
|
||
config = load_neo4j_config()
|
||
driver = GraphDatabase.driver(config["uri"], auth=(config["user"], config["password"]))
|
||
|
||
try:
|
||
# 获取工程类型(清单工程或预算工程)
|
||
project_type = get_project_type(driver)
|
||
print(f"检测到工程类型: {project_type}")
|
||
|
||
# 如果提供了软件名称,优先使用软件名称
|
||
if software_name:
|
||
project_name = software_name
|
||
print(f"使用提供的软件名称作为文件名: {project_name}")
|
||
else:
|
||
# 获取工程名称
|
||
project_name = get_project_name(driver)
|
||
if not project_name:
|
||
print("警告:未找到工程名称,将使用默认文件名")
|
||
|
||
# 如果有指定输出目录,添加到文件名前
|
||
if output_dir:
|
||
if not project_name:
|
||
project_name = "knowledge_graph_export"
|
||
project_name = os.path.join(output_dir, project_name)
|
||
|
||
# 创建Excel写入器和获取文件路径
|
||
writer, output_path = create_excel_writer(project_name)
|
||
|
||
# 导出项目划分工作簿(根据工程类型)
|
||
export_project_division(writer, driver, project_type)
|
||
|
||
# 导出工程量工作簿
|
||
export_project_properties(writer, driver)
|
||
|
||
# 导出工程量工作簿
|
||
export_project_quantities(writer, driver)
|
||
|
||
# 导出人材机工作簿
|
||
export_materials_equipments(writer, driver)
|
||
|
||
# 导出费用预览工作簿(根据工程类型)
|
||
export_cost_set_preview(writer, driver, "ProjectDivisionItem", "项目划分_费用预览", project_type)
|
||
export_cost_set_preview(writer, driver, "ProjectQuantity", "工程量_费用预览", project_type)
|
||
|
||
# 导出材机分析工作簿
|
||
export_material_machine_analysis(writer, driver)
|
||
|
||
# 导出取费表工作簿
|
||
export_fee_table(writer, driver)
|
||
|
||
# 导出费用计划工作簿
|
||
export_fee_schedules(writer, driver)
|
||
|
||
# 保存Excel文件
|
||
writer.close()
|
||
print(f"导出完成,文件已保存到: {os.path.abspath(output_path)}")
|
||
return output_path
|
||
|
||
finally:
|
||
driver.close()
|
||
|
||
|
||
# if __name__ == "__main__":
|
||
# # 执行导出,可以指定输出目录
|
||
# export_knowledge_graph() # 默认当前目录
|