2289 lines
91 KiB
Python
2289 lines
91 KiB
Python
from neo4j import GraphDatabase
|
|
from project import *
|
|
import atexit
|
|
from typing import Tuple, List, Dict, Any, Optional
|
|
import logging
|
|
from src.config import Config
|
|
|
|
logger = logging.getLogger("project_implementation")
|
|
config = Config()
|
|
|
|
|
|
class ProjectTookiItNeo4j(ProjectTookiIt):
|
|
"""
|
|
基于Neo4j数据库的项目类实现
|
|
"""
|
|
|
|
def __init__(self, neo4j_driver):
|
|
logger.info("开始初始化Neo4j连接")
|
|
"""
|
|
初始化Neo4j连接
|
|
|
|
Args:
|
|
neo4j_driver: Neo4j驱动实例,必须提供
|
|
"""
|
|
if neo4j_driver is None:
|
|
raise ValueError("必须提供Neo4j驱动实例")
|
|
|
|
super().__init__()
|
|
|
|
# 保存驱动实例
|
|
self.driver = neo4j_driver
|
|
self.external_driver = True # 标记为外部驱动,不在close时关闭
|
|
self.session = self.driver.session()
|
|
|
|
# 初始化其他必要的数据结构
|
|
self.material_equipment_dict = {} # 材机字典,键为ID
|
|
self.fee_templates = {} # 取费表模板字典,键为ID
|
|
self.fee_schedules = {} # 费用表字典,键为ID
|
|
self.project_properties = {} # 工程属性字典
|
|
logger.info("Neo4j连接初始化完成")
|
|
|
|
def close(self):
|
|
logger.info("开始关闭数据库连接")
|
|
"""
|
|
关闭数据库连接
|
|
"""
|
|
if self.session:
|
|
self.session.close()
|
|
# 仅在非外部驱动的情况下关闭driver
|
|
if self.driver and not self.external_driver:
|
|
self.driver.close()
|
|
logger.info("数据库连接已关闭")
|
|
|
|
def debug_path(self, path):
|
|
logger.info(f"开始调试路径: {path}")
|
|
path_parts = path.split("/")
|
|
print(f"调试路径: {path}")
|
|
|
|
# 检查每一级路径是否存在
|
|
for i in range(len(path_parts)):
|
|
partial_path = "/".join(path_parts[: i + 1])
|
|
query = """
|
|
MATCH (n)
|
|
WHERE n.name = $name
|
|
RETURN n.name as name
|
|
"""
|
|
result = self.session.run(query, name=path_parts[i])
|
|
record = result.single()
|
|
exists = "存在" if record else "不存在"
|
|
print(f" 节点 '{path_parts[i]}' {exists}")
|
|
logger.info(f"路径 {path} 调试完成")
|
|
|
|
# 通用节点查询方法
|
|
def get_node_by_path(self, path, node_labels=None):
|
|
logger.info(f"开始通过路径 {path} 获取节点对象")
|
|
"""
|
|
通过路径获取节点对象
|
|
|
|
Args:
|
|
path (str): 以'/'分隔的多级节点路径
|
|
node_labels (list): 节点标签列表,用于过滤结果
|
|
|
|
Returns:
|
|
dict|None: 节点数据,如果路径不存在返回None
|
|
"""
|
|
if not path:
|
|
logger.warning("输入路径为空,无法获取节点对象")
|
|
return None
|
|
|
|
# 分割路径为各个部分
|
|
path_parts = path.split("/")
|
|
|
|
# 构建查询
|
|
if len(path_parts) == 1:
|
|
# 只有一级路径,直接查询
|
|
if node_labels:
|
|
labels_str = ":" + "|:".join(node_labels)
|
|
query = f"""
|
|
MATCH (n{labels_str})
|
|
WHERE n.name = $name
|
|
RETURN n LIMIT 1
|
|
"""
|
|
else:
|
|
query = """
|
|
MATCH (n)
|
|
WHERE n.name = $name
|
|
RETURN n LIMIT 1
|
|
"""
|
|
params = {"name": path_parts[0]}
|
|
else:
|
|
# 多级路径,构建路径查询
|
|
last_part = path_parts[-1]
|
|
if node_labels:
|
|
labels_str = ":" + "|".join(node_labels)
|
|
query = f"""
|
|
MATCH path = (root)-[*]->(target{labels_str})
|
|
WHERE target.name = $last_part
|
|
RETURN target as n LIMIT 1
|
|
"""
|
|
else:
|
|
query = """
|
|
MATCH path = (root)-[*]->(target)
|
|
WHERE target.name = $last_part
|
|
RETURN target as n LIMIT 1
|
|
"""
|
|
params = {"last_part": last_part}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
record = result.single()
|
|
|
|
if not record:
|
|
logger.warning(f"路径 {path} 不存在,未找到节点对象")
|
|
return None
|
|
|
|
logger.info(f"成功通过路径 {path} 获取节点对象")
|
|
return record["n"]
|
|
except Exception as e:
|
|
logger.error(f"获取节点对象时出错: {e}")
|
|
print(f"获取节点对象时出错: {e}")
|
|
return None
|
|
|
|
# 项目划分查询方法
|
|
def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
logger.info(f"开始通过路径 {path} 获取项目划分对象")
|
|
"""
|
|
通过路径获取项目划分对象
|
|
|
|
Args:
|
|
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回的项目划分对象数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 获取节点数据
|
|
node_data = self.get_node_by_path(path, ["ProjectDivisionItem"])
|
|
|
|
if not node_data:
|
|
status = "error"
|
|
error = f"找不到路径: {path} 上的ProjectDivisionItem节点"
|
|
logger.warning(error)
|
|
|
|
# 提取父路径
|
|
path_parts = path.split("/")
|
|
if len(path_parts) > 1:
|
|
# 去掉最后一个部分,得到父路径
|
|
parent_path = "/".join(path_parts[:-1])
|
|
parent_name = path_parts[-2] # 父节点名称
|
|
|
|
# 获取父节点
|
|
parent_node = self.get_node_by_path(parent_path)
|
|
|
|
if parent_node:
|
|
# 查询父节点下所有类型为ProjectDivisionItem的子节点
|
|
query = """
|
|
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
|
|
WHERE p.name = $parent_name
|
|
RETURN q.name as name
|
|
"""
|
|
params = {"parent_name": parent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点下的子节点时出错: {str(e)}"]
|
|
logger.error(helper_info[0])
|
|
|
|
logger.info(f"成功通过路径 {path} 获取项目划分对象")
|
|
return status, data, error, helper_info
|
|
|
|
# 创建项目划分对象并填充属性
|
|
item = ProjectDivisionItem()
|
|
for key, value in node_data.items():
|
|
if hasattr(item, key):
|
|
setattr(item, key, value)
|
|
|
|
# 将对象的属性转换为字典
|
|
for key, value in vars(item).items():
|
|
if not key.startswith("_"): # 排除私有属性
|
|
data[key] = value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取项目划分对象时出错: {e}")
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_division_node_by_parent_and_name(
|
|
self, parent_path, partial_name
|
|
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
|
|
"""
|
|
通过父节点路径和模糊节点名称获取项目划分对象,包括子节点
|
|
|
|
执行两步查询:
|
|
1. 找到对应路径的父节点
|
|
2. 在父节点下查找名称中包含模糊名称的节点
|
|
|
|
Args:
|
|
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
|
|
partial_name (str): 目标节点的模糊或不完整名称
|
|
|
|
Returns:
|
|
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (List[Dict[str, Any]]): 成功时返回的数据列表
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = []
|
|
error = ""
|
|
helper_info = []
|
|
|
|
# 检查部分名称是否为空
|
|
if not partial_name:
|
|
status = "error"
|
|
error = "节点名称不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
try:
|
|
# 第一步:找到父节点
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
# 如果找不到父节点
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到父节点路径: {parent_path}"
|
|
|
|
# 提取父路径的父节点
|
|
path_parts = parent_path.split("/")
|
|
if len(path_parts) > 1:
|
|
# 去掉最后一个部分,得到父路径的父路径
|
|
grandparent_path = "/".join(path_parts[:-1])
|
|
grandparent_name = path_parts[-2] if len(path_parts) >= 2 else ""
|
|
|
|
# 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型
|
|
query = """
|
|
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
|
|
WHERE p.name = $grandparent_name
|
|
RETURN q.name as name
|
|
"""
|
|
params = {"grandparent_name": grandparent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
|
|
# 如果没有找到任何节点,尝试更宽泛的查询
|
|
if not helper_info:
|
|
broader_query = """
|
|
MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem)
|
|
RETURN q.name as name LIMIT 50
|
|
"""
|
|
broader_result = self.session.run(broader_query, **params)
|
|
for record in broader_result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
print(f"查询辅助信息时出错: {str(e)}")
|
|
helper_info = [f"查询辅助信息时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 获取父节点名称
|
|
parent_name = parent_node_data.get("name")
|
|
if not parent_name:
|
|
status = "error"
|
|
error = "父节点数据中没有name字段"
|
|
return status, data, error, helper_info
|
|
|
|
parent_name = parent_node_data.get("name", "")
|
|
|
|
# 第二步:改用父节点名称进行查询
|
|
query = """
|
|
MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem)
|
|
WHERE toLower(n.name) CONTAINS toLower($partial_name)
|
|
RETURN n LIMIT 50
|
|
"""
|
|
params = {"parent_name": parent_name, "partial_name": partial_name.strip()}
|
|
|
|
# print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'")
|
|
result = self.session.run(query, **params)
|
|
records = list(result)
|
|
# print(f"调试返回: 找到{len(records)}条记录")
|
|
|
|
# 处理查询结果
|
|
items = []
|
|
for record in records:
|
|
node_data = record["n"]
|
|
item = ProjectDivisionItem()
|
|
for key, value in node_data.items():
|
|
if hasattr(item, key):
|
|
setattr(item, key, value)
|
|
items.append(vars(item))
|
|
|
|
# 如果没有找到匹配的节点
|
|
if not items:
|
|
status = "error"
|
|
error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点"
|
|
|
|
# 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型
|
|
helper_query = """
|
|
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
|
|
WHERE p.id = $parent_id
|
|
RETURN q.name as name
|
|
"""
|
|
helper_params = {"parent_name": parent_name}
|
|
|
|
try:
|
|
helper_result = self.session.run(helper_query, **helper_params)
|
|
for record in helper_result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
|
|
# 如果没有找到任何节点,尝试更宽泛的查询
|
|
if not helper_info:
|
|
broader_query = """
|
|
MATCH (p)-[*1..2]->(q:ProjectDivisionItem)
|
|
WHERE p.id = $parent_id
|
|
RETURN q.name as name LIMIT 50
|
|
"""
|
|
broader_result = self.session.run(broader_query, **helper_params)
|
|
for record in broader_result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
|
|
# 如果仍然没有找到,尝试使用名称而不是ID
|
|
if not helper_info:
|
|
name_query = """
|
|
MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem)
|
|
RETURN q.name as name LIMIT 50
|
|
"""
|
|
name_params = {"parent_name": parent_name}
|
|
name_result = self.session.run(name_query, **name_params)
|
|
for record in name_result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
print(f"查询辅助信息时出错: {str(e)}")
|
|
helper_info = [f"查询辅助信息时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功找到匹配节点
|
|
data = items
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
|
|
"""
|
|
直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点
|
|
|
|
Args:
|
|
name_part (str): 节点名称的部分内容
|
|
|
|
Returns:
|
|
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (List[Dict[str, Any]]): 成功时返回的数据列表
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = []
|
|
error = ""
|
|
helper_info = []
|
|
|
|
# 检查输入名称是否为空
|
|
if not name_part or name_part.strip() == "":
|
|
status = "error"
|
|
error = "输入的名称部分不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
try:
|
|
# 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点
|
|
query = """
|
|
MATCH (n:ProjectDivisionItem)
|
|
WHERE toLower(n.name) CONTAINS toLower($name_part)
|
|
RETURN n LIMIT 50
|
|
"""
|
|
params = {"name_part": name_part.strip()}
|
|
|
|
result = self.session.run(query, parameters=params)
|
|
records = list(result)
|
|
|
|
if not records:
|
|
status = "error"
|
|
error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点"
|
|
|
|
# 提供一些可能相近的节点名称作为辅助信息
|
|
broader_query = """
|
|
MATCH (n:ProjectDivisionItem)
|
|
RETURN n.name AS name LIMIT 20
|
|
"""
|
|
broader_result = self.session.run(broader_query)
|
|
for record in broader_result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 处理查询结果
|
|
items = []
|
|
for record in records:
|
|
node_data = record["n"]
|
|
item = ProjectDivisionItem()
|
|
for key, value in node_data.items():
|
|
if hasattr(item, key):
|
|
setattr(item, key, value)
|
|
|
|
# 获取节点的完整路径作为附加信息
|
|
node_path_query = """
|
|
MATCH path = (root)-[*]->(target:ProjectDivisionItem)
|
|
WHERE id(target) = $node_id
|
|
WITH [node in nodes(path) | node.name] AS path_names
|
|
RETURN path_names
|
|
"""
|
|
path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id})
|
|
path_record = path_result.single()
|
|
|
|
node_dict = vars(item)
|
|
if path_record:
|
|
node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name])
|
|
|
|
items.append(node_dict)
|
|
|
|
data = items
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 工程量查询方法
|
|
def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
获取指定项目路径下的工程量对象
|
|
|
|
Args:
|
|
paths_str (str): 以'/'分隔的多级节点路径
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
- status: 'success'或'error'
|
|
- data: 成功时返回的节点数据字典,失败时为{}
|
|
- error: 错误信息,成功时为''
|
|
- helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称)
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
if not paths_str:
|
|
status = "error"
|
|
error = "路径不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
try:
|
|
# 使用通用方法获取节点,考虑所有可能的工程量类型
|
|
node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"])
|
|
|
|
if node_data:
|
|
# 根据节点标签或类型属性创建对应类型的对象
|
|
quantity = self._create_quantity_object(node_data)
|
|
|
|
# 填充属性
|
|
for key, value in node_data.items():
|
|
if hasattr(quantity, key):
|
|
setattr(quantity, key, value)
|
|
|
|
# 将对象转为字典
|
|
data = vars(quantity)
|
|
return status, data, error, helper_info
|
|
|
|
# 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息
|
|
path_parts = paths_str.split("/")
|
|
if len(path_parts) > 1:
|
|
parent_path = "/".join(path_parts[:-1])
|
|
target_name = path_parts[-1]
|
|
|
|
# 获取父节点
|
|
parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"])
|
|
|
|
if parent_node_data:
|
|
# 查询父节点下的所有符合条件的节点
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = $parent_name AND
|
|
any(label IN labels(n) WHERE label IN $allowed_labels)
|
|
RETURN n.name as name, labels(n) as labels
|
|
"""
|
|
params = {
|
|
"parent_name": parent_node_data.get("name", ""),
|
|
"allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"],
|
|
}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append({record["name"]})
|
|
except Exception as e:
|
|
print(f"查询辅助信息时出错: {str(e)}")
|
|
helper_info = [f"查询辅助信息时出错: {str(e)}"]
|
|
|
|
status = "error"
|
|
error = f"找不到路径: {paths_str}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_quantities_node_by_parent_and_code(
|
|
self, parent_path, quantity_type=None, code=None
|
|
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
|
|
"""
|
|
通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点
|
|
|
|
执行三步查询:
|
|
1. 找到对应路径的父节点
|
|
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
|
|
3. 在找到的节点中查找编码与传入编码匹配的节点并返回
|
|
|
|
Args:
|
|
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
|
|
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
|
|
code (str): 工程量编码,以'/'分隔的多个编码
|
|
|
|
Returns:
|
|
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (List[Dict[str, Any]]): 成功时返回的数据列表
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = []
|
|
error = ""
|
|
helper_info = []
|
|
|
|
if not code:
|
|
status = "error"
|
|
error = "编码不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 验证类型参数是否有效
|
|
valid_types = ["定额", "主材", "设备", None]
|
|
if quantity_type not in valid_types:
|
|
status = "error"
|
|
error = f"无效的工程量类型: '{quantity_type}'"
|
|
helper_info = valid_types
|
|
return status, data, error, helper_info
|
|
|
|
try:
|
|
# 步骤1: 找到对应路径的父节点
|
|
parent_node = self.get_node_by_path(parent_path)
|
|
if not parent_node:
|
|
status = "error"
|
|
error = f"找不到父节点路径: {parent_path}"
|
|
|
|
# 尝试提供辅助信息 - 路径的各部分是否存在
|
|
path_parts = parent_path.split("/")
|
|
existing_parts = []
|
|
for i in range(len(path_parts)):
|
|
partial_path = "/".join(path_parts[: i + 1])
|
|
if self.get_node_by_path(partial_path):
|
|
existing_parts.append(partial_path)
|
|
helper_info = existing_parts
|
|
return status, data, error, helper_info
|
|
|
|
# 提取父节点名称用于后续查询
|
|
parent_name = parent_node.get("name", "")
|
|
|
|
# 根据工程量类型确定类型条件
|
|
type_condition = ""
|
|
|
|
if quantity_type == "定额":
|
|
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
|
|
elif quantity_type == "主材":
|
|
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
|
|
elif quantity_type == "设备":
|
|
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
|
|
else:
|
|
type_condition = "q:ProjectQuantity"
|
|
|
|
# 步骤2: 查找父节点下所有的指定类型的子节点
|
|
child_query = f"""
|
|
MATCH (p)-[*1..10]->(q)
|
|
WHERE p.name = $parent_name AND {type_condition}
|
|
RETURN q
|
|
"""
|
|
child_params = {"parent_name": parent_name}
|
|
|
|
child_result = self.session.run(child_query, **child_params)
|
|
child_nodes = [record["q"] for record in child_result]
|
|
|
|
if not child_nodes:
|
|
status = "error"
|
|
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
|
|
|
|
# 尝试提供辅助信息 - 父节点下有哪些节点名称
|
|
helper_query = """
|
|
MATCH (p)-[*1..10]->(q)
|
|
WHERE p.name = $parent_name
|
|
RETURN DISTINCT q.name as node_name
|
|
LIMIT 20
|
|
"""
|
|
helper_result = self.session.run(helper_query, **child_params)
|
|
helper_info = [record["node_name"] for record in helper_result if record["node_name"]]
|
|
return status, data, error, helper_info
|
|
|
|
# 步骤3: 查找编码匹配的节点
|
|
code_conditions = []
|
|
code_parts = code.split("/")
|
|
|
|
for code_part in code_parts:
|
|
if code_part.strip():
|
|
code_conditions.append(code_part.strip())
|
|
|
|
if not code_conditions:
|
|
status = "error"
|
|
error = "提供的编码为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 在 child_nodes 中直接查找编码匹配的节点
|
|
matching_nodes = []
|
|
|
|
for node in child_nodes:
|
|
node_code = node.get("编码", "")
|
|
if any(code_part in str(node_code) for code_part in code_conditions):
|
|
matching_nodes.append(node)
|
|
|
|
if not matching_nodes:
|
|
status = "error"
|
|
error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点"
|
|
|
|
# 尝试提供辅助信息 - 所有指定类型的子节点的编码
|
|
helper_info = [node.get("编码") for node in child_nodes if node.get("编码")]
|
|
return status, data, error, helper_info
|
|
|
|
# 处理匹配的节点,转换为字典格式
|
|
result_data = []
|
|
for node in matching_nodes:
|
|
quantity = self._create_quantity_object(node, quantity_type)
|
|
|
|
for key, value in node.items():
|
|
if hasattr(quantity, key):
|
|
setattr(quantity, key, value)
|
|
|
|
attrs = {}
|
|
for key, value in vars(quantity).items():
|
|
if not key.startswith("_"): # 排除私有属性
|
|
attrs[key] = value
|
|
|
|
result_data.append(attrs)
|
|
|
|
data = result_data
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_quantities_node_by_parent_and_name(
|
|
self, parent_path, partial_name, quantity_type=None
|
|
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
|
|
"""
|
|
通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点
|
|
|
|
执行三步查询:
|
|
1. 找到对应路径的父节点
|
|
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
|
|
3. 在找到的节点中找到名称中包含节点模糊名称的节点返回
|
|
|
|
Args:
|
|
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
|
|
partial_name (str): 目标节点的模糊或不完整名称
|
|
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
|
|
|
|
Returns:
|
|
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (List[Dict[str, Any]]): 成功时返回的数据列表
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = []
|
|
error = ""
|
|
helper_info = []
|
|
|
|
if not partial_name:
|
|
status = "error"
|
|
error = "节点名称不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 验证类型参数是否有效
|
|
valid_types = ["定额", "主材", "设备", None]
|
|
if quantity_type not in valid_types:
|
|
status = "error"
|
|
error = f"无效的工程量类型: '{quantity_type}'"
|
|
helper_info = valid_types
|
|
return status, data, error, helper_info
|
|
|
|
try:
|
|
# 步骤1: 找到对应路径的父节点
|
|
parent_node = self.get_node_by_path(parent_path)
|
|
if not parent_node:
|
|
status = "error"
|
|
error = f"找不到父节点路径: {parent_path}"
|
|
# 尝试提供辅助信息 - 路径的各部分是否存在
|
|
path_parts = parent_path.split("/")
|
|
existing_parts = []
|
|
for i in range(len(path_parts)):
|
|
partial_path = "/".join(path_parts[: i + 1])
|
|
if self.get_node_by_path(partial_path):
|
|
existing_parts.append(partial_path)
|
|
helper_info = existing_parts
|
|
return status, data, error, helper_info
|
|
|
|
# 从父节点中提取名称,用于后续查询
|
|
parent_name = parent_node.get("name", "")
|
|
|
|
# 根据工程量类型确定类型条件,不使用标签过滤
|
|
type_condition = ""
|
|
|
|
if quantity_type == "定额":
|
|
# 定额类型可能是 ProjectQuantity+Quota 或者 类型='0'
|
|
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
|
|
elif quantity_type == "主材":
|
|
# 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1'
|
|
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
|
|
elif quantity_type == "设备":
|
|
# 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5'
|
|
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
|
|
else:
|
|
# 如果没有指定类型,则查询所有ProjectQuantity节点
|
|
type_condition = "q:ProjectQuantity"
|
|
|
|
# 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点
|
|
# 使用父节点名称进行查询,不指定具体的关系类型
|
|
child_query = f"""
|
|
MATCH (p)-[*1..10]->(q)
|
|
WHERE p.name = $parent_name AND {type_condition}
|
|
RETURN q
|
|
"""
|
|
child_params = {"parent_name": parent_name}
|
|
|
|
child_result = self.session.run(child_query, **child_params)
|
|
child_nodes = [record["q"] for record in child_result]
|
|
|
|
if not child_nodes:
|
|
# 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询
|
|
path_parts = parent_path.split("/")
|
|
last_part = path_parts[-1] if path_parts else ""
|
|
|
|
fallback_query = f"""
|
|
MATCH (p)-[*1..10]->(q)
|
|
WHERE p.name CONTAINS $last_part AND {type_condition}
|
|
RETURN q LIMIT 100
|
|
"""
|
|
fallback_params = {"last_part": last_part}
|
|
|
|
fallback_result = self.session.run(fallback_query, **fallback_params)
|
|
child_nodes = [record["q"] for record in fallback_result]
|
|
|
|
if not child_nodes:
|
|
status = "error"
|
|
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
|
|
|
|
# 尝试提供辅助信息 - 父节点下有哪些类型的节点
|
|
helper_query = """
|
|
MATCH (p)-[*1..10]->(q)
|
|
WHERE p.name = $parent_name
|
|
RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type
|
|
LIMIT 20
|
|
"""
|
|
helper_params = {"parent_name": parent_name}
|
|
|
|
helper_result = self.session.run(helper_query, **helper_params)
|
|
helper_data = []
|
|
for record in helper_result:
|
|
labels = record["node_labels"] if record["node_labels"] else []
|
|
node_type = record["node_type"] if record["node_type"] else "未知"
|
|
helper_data.append(f"标签: {labels}, 类型: {node_type}")
|
|
|
|
helper_info = helper_data
|
|
return status, data, error, helper_info
|
|
|
|
# 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点
|
|
matching_nodes = []
|
|
available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息
|
|
|
|
for node in child_nodes:
|
|
node_name = node.get("name", "")
|
|
# 记录所有节点名称作为辅助信息
|
|
if node_name:
|
|
available_nodes.append(node_name)
|
|
|
|
if partial_name in str(node_name):
|
|
matching_nodes.append(node)
|
|
|
|
if not matching_nodes:
|
|
# 如果没有找到匹配的节点,尝试直接查询
|
|
direct_query = f"""
|
|
MATCH (q)
|
|
WHERE q.name CONTAINS $partial_name AND {type_condition}
|
|
RETURN q LIMIT 20
|
|
"""
|
|
direct_params = {"partial_name": partial_name}
|
|
|
|
direct_result = self.session.run(direct_query, **direct_params)
|
|
matching_nodes = [record["q"] for record in direct_result]
|
|
|
|
if not matching_nodes:
|
|
status = "error"
|
|
error = (
|
|
f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点"
|
|
)
|
|
# 提供辅助信息 - 该路径下所有可用的节点名称
|
|
helper_info = available_nodes
|
|
return status, data, error, helper_info
|
|
|
|
# 处理匹配的节点,将它们转换为对象
|
|
result_data = []
|
|
for matching_node in matching_nodes:
|
|
# 创建对应类型的对象
|
|
quantity = self._create_quantity_object(matching_node, quantity_type)
|
|
|
|
# 填充属性
|
|
for key, value in matching_node.items():
|
|
if hasattr(quantity, key):
|
|
setattr(quantity, key, value)
|
|
|
|
# 将对象的属性转换为字典
|
|
attrs = {}
|
|
for key, value in vars(quantity).items():
|
|
if not key.startswith("_"): # 排除私有属性
|
|
attrs[key] = value
|
|
|
|
result_data.append(attrs)
|
|
|
|
# 成功找到匹配节点
|
|
data = result_data
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 辅助方法,用于根据节点数据创建对应类型的工程量对象
|
|
def _create_quantity_object(self, node_data, quantity_type=None):
|
|
"""
|
|
根据节点数据创建对应类型的工程量对象
|
|
|
|
Args:
|
|
node_data (dict): 节点数据
|
|
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None)
|
|
|
|
Returns:
|
|
ProjectQuantity: 创建的工程量对象
|
|
"""
|
|
# 如果指定了类型,直接创建对应类型的对象
|
|
if quantity_type == "定额":
|
|
return Ration()
|
|
elif quantity_type == "主材":
|
|
return Material()
|
|
elif quantity_type == "设备":
|
|
return Equipment()
|
|
|
|
# 如果没有指定类型,尝试通过节点属性或标签判断
|
|
if "类型" in node_data:
|
|
if node_data["类型"] == "0":
|
|
return Ration()
|
|
elif node_data["类型"] == "1":
|
|
return Material()
|
|
elif node_data["类型"] == "5":
|
|
return Equipment()
|
|
|
|
# 通过标签判断
|
|
labels = list(node_data.labels) if hasattr(node_data, "labels") else []
|
|
if "Quota" in labels:
|
|
return Ration()
|
|
elif "MainMaterial" in labels:
|
|
return Material()
|
|
elif "Equipment" in labels:
|
|
return Equipment()
|
|
|
|
# 默认返回基类对象
|
|
return ProjectQuantity()
|
|
|
|
# 材机查询方法实现
|
|
def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
通过路径获取材机对象(MaterialOrEquipment 类型节点)
|
|
|
|
Args:
|
|
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回的节点属性数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 获取目标节点
|
|
node_data = self.get_node_by_path(path)
|
|
|
|
if not node_data:
|
|
status = "error"
|
|
error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点"
|
|
|
|
# 提取父路径
|
|
path_parts = path.split("/")
|
|
if len(path_parts) > 1:
|
|
parent_path = "/".join(path_parts[:-1]) # 父节点路径
|
|
parent_name = path_parts[-2] # 父节点名称
|
|
|
|
# 获取父节点
|
|
parent_node = self.get_node_by_path(parent_path)
|
|
if parent_node:
|
|
# 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称
|
|
query = """
|
|
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
|
|
WHERE p.name = $parent_name
|
|
RETURN m.name AS name
|
|
"""
|
|
params = {"parent_name": parent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 将节点属性转换为字典并过滤掉私有字段
|
|
for key, value in node_data.items():
|
|
if not key.startswith("_"):
|
|
data[key] = value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_material_equipment_by_parent_and_name(
|
|
self, parent_path: str, partial_name: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
通过父节点路径和模糊名称获取材机对象
|
|
|
|
Args:
|
|
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
|
|
partial_name (str): 目标节点的模糊或不完整名称
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
"""
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
if not parent_path or not partial_name:
|
|
status = "error"
|
|
error = "父节点路径或要查找的名称不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到父节点路径: {parent_path}"
|
|
|
|
# 获取祖父节点路径
|
|
path_parts = parent_path.split("/")
|
|
if len(path_parts) > 1:
|
|
grandparent_path = "/".join(path_parts[:-1])
|
|
grandparent_name = path_parts[-2]
|
|
|
|
# 查询祖父节点下的所有子节点名称作为 helper_info
|
|
query = """
|
|
MATCH (p)-[*1..1]->(child)
|
|
WHERE p.name = $grandparent_name
|
|
RETURN child.name AS name
|
|
"""
|
|
params = {"grandparent_name": grandparent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询祖父节点下的子节点失败: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
parent_id = parent_node_data.get("id")
|
|
if not parent_id:
|
|
status = "error"
|
|
error = f"父节点 {parent_path} 没有有效的 ID"
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:查找父节点下名称包含 partial_name 的 MaterialOrEquipment 节点
|
|
query = """
|
|
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
|
|
WHERE p.id = $parent_id AND m.name CONTAINS $partial_name
|
|
RETURN m LIMIT 20
|
|
"""
|
|
params = {"parent_id": parent_id, "partial_name": partial_name}
|
|
|
|
result = self.session.run(query, **params)
|
|
|
|
matched_nodes = []
|
|
for record in result:
|
|
node = record["m"]
|
|
properties = dict(node.items())
|
|
matched_nodes.append(properties)
|
|
|
|
if not matched_nodes:
|
|
status = "error"
|
|
error = f"在路径 {parent_path} 下没有找到名称包含 '{partial_name}' 的 MaterialOrEquipment 节点"
|
|
|
|
# 查询父节点下所有 MaterialOrEquipment 子节点名称作为 helper_info
|
|
query_helper = """
|
|
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
|
|
WHERE p.id = $parent_id
|
|
RETURN m.name AS name
|
|
"""
|
|
try:
|
|
result_helper = self.session.run(query_helper, parent_id=parent_id)
|
|
for record in result_helper:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点下的 MaterialOrEquipment 子节点失败: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功找到,返回第一个节点的数据
|
|
data = matched_nodes[0]
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 辅助方法,用于创建材机对象并填充属性
|
|
def _create_material_object(self, node_data):
|
|
"""
|
|
根据节点数据创建材机对象并填充属性
|
|
|
|
Args:
|
|
node_data (dict): 节点数据
|
|
|
|
Returns:
|
|
MaterialOrEquipment: 创建的材机对象
|
|
"""
|
|
material = MaterialOrEquipment()
|
|
|
|
# 填充属性
|
|
for key, value in node_data.items():
|
|
if hasattr(material, key):
|
|
setattr(material, key, value)
|
|
|
|
return material
|
|
|
|
# 取费表模板查询方法实现
|
|
def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
通过路径获取取费表模板节点
|
|
|
|
Args:
|
|
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
|
|
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回的节点属性数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 获取目标节点
|
|
node_data = self.get_node_by_path(path)
|
|
|
|
if not node_data:
|
|
status = "error"
|
|
error = f"找不到路径: {path} 上的节点"
|
|
|
|
# 提取父路径
|
|
path_parts = path.split("/")
|
|
if len(path_parts) > 1:
|
|
parent_path = "/".join(path_parts[:-1]) # 父节点路径
|
|
parent_name = path_parts[-2] # 父节点名称
|
|
|
|
# 获取父节点
|
|
parent_node = self.get_node_by_path(parent_path)
|
|
if parent_node:
|
|
# 查询父节点下所有类型为 FeeCollection 的子节点名称
|
|
query = """
|
|
MATCH (p)-[*1..1]->(f:FeeCollection)
|
|
WHERE p.name = $parent_name
|
|
RETURN f.name AS name
|
|
"""
|
|
params = {"parent_name": parent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 将节点属性转换为字典并过滤掉私有字段
|
|
for key, value in node_data.items():
|
|
if not key.startswith("_"):
|
|
data[key] = value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_fee_template_by_parent_and_name(
|
|
self, parent_path: str, node_name: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
通过父节点路径和节点名称获取取费表模板节点
|
|
|
|
Args:
|
|
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
|
|
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)
|
|
node_name (str): 目标节点的名称,例如:直接费
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回的节点属性数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 如果未找到父节点,则包含父节点的父节点下所有节点名称
|
|
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 第一段:查找父节点
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到父节点路径: {parent_path}"
|
|
|
|
# 提取父节点的父路径
|
|
parent_path_parts = parent_path.split("/")
|
|
if len(parent_path_parts) > 1:
|
|
grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径
|
|
grandparent_name = parent_path_parts[-2] # 父节点的父节点名称
|
|
|
|
# 获取父节点的父节点
|
|
grandparent_node = self.get_node_by_path(grandparent_path)
|
|
if grandparent_node:
|
|
# 查询父节点的父节点下所有子节点名称
|
|
query = """
|
|
MATCH (gp)-[*1..1]->(n)
|
|
WHERE gp.name = $grandparent_name
|
|
RETURN n.name AS name
|
|
"""
|
|
params = {"grandparent_name": grandparent_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二段:在父节点下查找目标节点
|
|
parent_node_name = parent_path.split("/")[-1] # 父节点名称
|
|
|
|
# 查询父节点下指定名称的节点
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = $parent_name AND n.name = $node_name
|
|
RETURN n
|
|
"""
|
|
params = {"parent_name": parent_node_name, "node_name": node_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
target_node = None
|
|
|
|
for record in result:
|
|
target_node = record["n"]
|
|
break
|
|
|
|
if not target_node:
|
|
status = "error"
|
|
error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点"
|
|
|
|
# 查询父节点下所有类型为 FeeCollection 的子节点名称
|
|
query = """
|
|
MATCH (p)-[*1..1]->(f:FeeCollection)
|
|
WHERE p.name = $parent_name
|
|
RETURN f.name AS name
|
|
"""
|
|
params = {"parent_name": parent_node_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 将目标节点属性转换为字典并过滤掉私有字段
|
|
for key, value in target_node.items():
|
|
if not key.startswith("_"):
|
|
data[key] = value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询目标节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 辅助方法,用于创建取费表模板对象并填充属性
|
|
def _create_fee_template_object(self, node_data):
|
|
"""
|
|
根据节点数据创建取费表模板对象并填充属性
|
|
|
|
Args:
|
|
node_data (dict): 节点数据
|
|
|
|
Returns:
|
|
FeeTableTemplateItem: 创建的取费表模板对象
|
|
"""
|
|
template = FeeTableTemplateItem()
|
|
|
|
# 填充属性
|
|
for key, value in node_data.items():
|
|
if hasattr(template, key):
|
|
setattr(template, key, value)
|
|
|
|
# 更新缓存
|
|
if hasattr(template, "OutlayID") and template.OutlayID:
|
|
self.fee_templates[template.OutlayID] = template
|
|
|
|
return template
|
|
|
|
# 费用表查询方法实现
|
|
def get_fee_schedule_on_auxiliary_expense_table(
|
|
self, table_name: str, fee_name: str, fee_attribute: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
在辅助费用表中查找费用
|
|
|
|
Args:
|
|
table_name (str): 费用表名称
|
|
fee_name (str): 要查找的费用名称(可能在多级子节点中)
|
|
fee_attribute (str): 费用值属性名
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回包含属性值的数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 第一步失败:返回"工程数据"节点下所有子节点名称
|
|
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
|
|
- 第三步失败:返回费用节点的所有可用属性名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 检查输入参数
|
|
if not table_name or not fee_name or not fee_attribute:
|
|
status = "error"
|
|
error = "输入参数不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点(费用表节点)
|
|
parent_path = f"工程数据/工程费用/{table_name}"
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到费用表节点: {parent_path}"
|
|
|
|
# 查询"工程数据/工程费用"节点下所有子节点名称
|
|
base_parent_node = self.get_node_by_path("工程数据/工程费用")
|
|
if base_parent_node:
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = '工程费用'
|
|
RETURN n.name AS name
|
|
"""
|
|
|
|
try:
|
|
result = self.session.run(query)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
|
|
fee_node = None
|
|
|
|
# 递归查找费用节点,最多查找3级深度
|
|
query = """
|
|
MATCH (p)-[*1..3]->(f)
|
|
WHERE p.name = $table_name AND f.name = $fee_name
|
|
RETURN f LIMIT 1
|
|
"""
|
|
params = {"table_name": table_name, "fee_name": fee_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
all_records = result.data()
|
|
|
|
if len(all_records) > 0:
|
|
fee_node = all_records[0]["f"]
|
|
|
|
if not fee_node:
|
|
status = "error"
|
|
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
|
|
|
|
# 查询费用表节点下所有子节点名称(递归查找)
|
|
query = """
|
|
MATCH (p)-[*1..3]->(n)
|
|
WHERE p.name = $table_name
|
|
RETURN DISTINCT n.name AS name
|
|
ORDER BY n.name
|
|
"""
|
|
params = {"table_name": table_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询费用节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 第三步:获取费用节点的属性值
|
|
try:
|
|
if fee_node and hasattr(fee_node, "get"):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
elif fee_node and isinstance(fee_node, dict):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
else:
|
|
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
|
|
fee_value = (
|
|
getattr(fee_node, fee_attribute, None)
|
|
if hasattr(fee_node, fee_attribute)
|
|
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
|
|
)
|
|
|
|
if fee_value is None:
|
|
status = "error"
|
|
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
|
|
|
|
# 返回费用节点的所有可用属性名称
|
|
try:
|
|
if hasattr(fee_node, "keys"):
|
|
helper_info = list(fee_node.keys())
|
|
elif isinstance(fee_node, dict):
|
|
helper_info = list(fee_node.keys())
|
|
else:
|
|
# 对于Neo4j Node对象
|
|
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
|
|
|
|
# 过滤掉私有属性
|
|
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
|
|
|
|
except Exception as e:
|
|
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功获取属性值,只返回属性值
|
|
data = fee_value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"获取费用属性值时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_fee_schedule_on_other_expense_table(
|
|
self, table_name: str, fee_name: str, fee_attribute: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
在其它费用表中查找费用
|
|
|
|
Args:
|
|
table_name (str): 费用表名称
|
|
fee_name (str): 要查找的费用名称(可能在多级子节点中)
|
|
fee_attribute (str): 费用值属性名
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回包含属性值的数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 第一步失败:返回"工程数据"节点下所有子节点名称
|
|
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
|
|
- 第三步失败:返回费用节点的所有可用属性名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 检查输入参数
|
|
if not table_name or not fee_name or not fee_attribute:
|
|
status = "error"
|
|
error = "输入参数不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点(费用表节点)
|
|
parent_path = f"工程数据/工程费用/{table_name}"
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到费用表节点: {parent_path}"
|
|
|
|
# 查询"工程数据/工程费用"节点下所有子节点名称
|
|
base_parent_node = self.get_node_by_path("工程数据/工程费用")
|
|
if base_parent_node:
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = '工程费用'
|
|
RETURN n.name AS name
|
|
"""
|
|
|
|
try:
|
|
result = self.session.run(query)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
|
|
fee_node = None
|
|
|
|
# 递归查找费用节点,最多查找3级深度
|
|
query = """
|
|
MATCH (p)-[*1..3]->(f)
|
|
WHERE p.name = $table_name AND f.name = $fee_name
|
|
RETURN f LIMIT 1
|
|
"""
|
|
params = {"table_name": table_name, "fee_name": fee_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
all_records = result.data()
|
|
|
|
if len(all_records) > 0:
|
|
fee_node = all_records[0]["f"]
|
|
|
|
if not fee_node:
|
|
status = "error"
|
|
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
|
|
|
|
# 查询费用表节点下所有子节点名称(递归查找)
|
|
query = """
|
|
MATCH (p)-[*1..3]->(n)
|
|
WHERE p.name = $table_name
|
|
RETURN DISTINCT n.name AS name
|
|
ORDER BY n.name
|
|
"""
|
|
params = {"table_name": table_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询费用节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 第三步:获取费用节点的属性值
|
|
try:
|
|
if fee_node and hasattr(fee_node, "get"):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
elif fee_node and isinstance(fee_node, dict):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
else:
|
|
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
|
|
fee_value = (
|
|
getattr(fee_node, fee_attribute, None)
|
|
if hasattr(fee_node, fee_attribute)
|
|
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
|
|
)
|
|
|
|
if fee_value is None:
|
|
status = "error"
|
|
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
|
|
|
|
# 返回费用节点的所有可用属性名称
|
|
try:
|
|
if hasattr(fee_node, "keys"):
|
|
helper_info = list(fee_node.keys())
|
|
elif isinstance(fee_node, dict):
|
|
helper_info = list(fee_node.keys())
|
|
else:
|
|
# 对于Neo4j Node对象
|
|
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
|
|
|
|
# 过滤掉私有属性
|
|
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
|
|
|
|
except Exception as e:
|
|
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功获取属性值,只返回属性值
|
|
data = fee_value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"获取费用属性值时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_fee_schedule_on_land_acquisition_fee_table_table(
|
|
self, table_name: str, fee_name: str, fee_attribute: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
在土地征用费表中查找费用
|
|
|
|
Args:
|
|
table_name (str): 费用表名称
|
|
fee_name (str): 要查找的费用名称(可能在多级子节点中)
|
|
fee_attribute (str): 费用值属性名
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回包含属性值的数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 第一步失败:返回"工程数据"节点下所有子节点名称
|
|
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
|
|
- 第三步失败:返回费用节点的所有可用属性名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 检查输入参数
|
|
if not table_name or not fee_name or not fee_attribute:
|
|
status = "error"
|
|
error = "输入参数不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点(费用表节点)
|
|
parent_path = f"工程数据/工程费用/{table_name}"
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到费用表节点: {parent_path}"
|
|
|
|
# 查询"工程数据/工程费用"节点下所有子节点名称
|
|
base_parent_node = self.get_node_by_path("工程数据/工程费用")
|
|
if base_parent_node:
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = '工程费用'
|
|
RETURN n.name AS name
|
|
"""
|
|
|
|
try:
|
|
result = self.session.run(query)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
|
|
fee_node = None
|
|
|
|
# 递归查找费用节点,最多查找3级深度
|
|
query = """
|
|
MATCH (p)-[*1..3]->(f)
|
|
WHERE p.name = $table_name AND f.name = $fee_name
|
|
RETURN f LIMIT 1
|
|
"""
|
|
params = {"table_name": table_name, "fee_name": fee_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
all_records = result.data()
|
|
|
|
if len(all_records) > 0:
|
|
fee_node = all_records[0]["f"]
|
|
|
|
if not fee_node:
|
|
status = "error"
|
|
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
|
|
|
|
# 查询费用表节点下所有子节点名称(递归查找)
|
|
query = """
|
|
MATCH (p)-[*1..3]->(n)
|
|
WHERE p.name = $table_name
|
|
RETURN DISTINCT n.name AS name
|
|
ORDER BY n.name
|
|
"""
|
|
params = {"table_name": table_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询费用节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 第三步:获取费用节点的属性值
|
|
try:
|
|
if fee_node and hasattr(fee_node, "get"):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
elif fee_node and isinstance(fee_node, dict):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
else:
|
|
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
|
|
fee_value = (
|
|
getattr(fee_node, fee_attribute, None)
|
|
if hasattr(fee_node, fee_attribute)
|
|
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
|
|
)
|
|
|
|
if fee_value is None:
|
|
status = "error"
|
|
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
|
|
|
|
# 返回费用节点的所有可用属性名称
|
|
try:
|
|
if hasattr(fee_node, "keys"):
|
|
helper_info = list(fee_node.keys())
|
|
elif isinstance(fee_node, dict):
|
|
helper_info = list(fee_node.keys())
|
|
else:
|
|
# 对于Neo4j Node对象
|
|
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
|
|
|
|
# 过滤掉私有属性
|
|
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
|
|
|
|
except Exception as e:
|
|
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功获取属性值,只返回属性值
|
|
data = fee_value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"获取费用属性值时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_fee_schedule_on_installation_price_difference_table(
|
|
self, table_name: str, fee_name: str, fee_attribute: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
在安装价差表中查找费用
|
|
|
|
Args:
|
|
table_name (str): 费用表名称
|
|
fee_name (str): 要查找的费用名称(可能在多级子节点中)
|
|
fee_attribute (str): 费用值属性名
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回包含属性值的数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 第一步失败:返回"工程数据"节点下所有子节点名称
|
|
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
|
|
- 第三步失败:返回费用节点的所有可用属性名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 检查输入参数
|
|
if not table_name or not fee_name or not fee_attribute:
|
|
status = "error"
|
|
error = "输入参数不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点(费用表节点)
|
|
parent_path = f"工程数据/工程费用/{table_name}"
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到费用表节点: {parent_path}"
|
|
|
|
# 查询"工程数据/工程费用"节点下所有子节点名称
|
|
base_parent_node = self.get_node_by_path("工程数据/工程费用")
|
|
if base_parent_node:
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = '工程费用'
|
|
RETURN n.name AS name
|
|
"""
|
|
|
|
try:
|
|
result = self.session.run(query)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
|
|
fee_node = None
|
|
|
|
# 递归查找费用节点,最多查找3级深度
|
|
query = """
|
|
MATCH (p)-[*1..3]->(f)
|
|
WHERE p.name = $table_name AND f.name = $fee_name
|
|
RETURN f LIMIT 1
|
|
"""
|
|
params = {"table_name": table_name, "fee_name": fee_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
all_records = result.data()
|
|
|
|
if len(all_records) > 0:
|
|
fee_node = all_records[0]["f"]
|
|
|
|
if not fee_node:
|
|
status = "error"
|
|
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
|
|
|
|
# 查询费用表节点下所有子节点名称(递归查找)
|
|
query = """
|
|
MATCH (p)-[*1..3]->(n)
|
|
WHERE p.name = $table_name
|
|
RETURN DISTINCT n.name AS name
|
|
ORDER BY n.name
|
|
"""
|
|
params = {"table_name": table_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询费用节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 第三步:获取费用节点的属性值
|
|
try:
|
|
if fee_node and hasattr(fee_node, "get"):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
elif fee_node and isinstance(fee_node, dict):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
else:
|
|
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
|
|
fee_value = (
|
|
getattr(fee_node, fee_attribute, None)
|
|
if hasattr(fee_node, fee_attribute)
|
|
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
|
|
)
|
|
|
|
if fee_value is None:
|
|
status = "error"
|
|
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
|
|
|
|
# 返回费用节点的所有可用属性名称
|
|
try:
|
|
if hasattr(fee_node, "keys"):
|
|
helper_info = list(fee_node.keys())
|
|
elif isinstance(fee_node, dict):
|
|
helper_info = list(fee_node.keys())
|
|
else:
|
|
# 对于Neo4j Node对象
|
|
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
|
|
|
|
# 过滤掉私有属性
|
|
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
|
|
|
|
except Exception as e:
|
|
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功获取属性值,只返回属性值
|
|
data = fee_value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"获取费用属性值时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
def get_fee_schedule_on_Engineering_Cost_table(
|
|
self, table_name: str, fee_name: str, fee_attribute: str
|
|
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
|
|
"""
|
|
在工程费用表中查找费用
|
|
|
|
Args:
|
|
table_name (str): 费用表名称
|
|
fee_name (str): 要查找的费用名称(可能在多级子节点中)
|
|
fee_attribute (str): 费用值属性名
|
|
|
|
Returns:
|
|
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
|
|
- status (str): 状态,'success'或'error'
|
|
- data (Dict[str, Any]): 成功时返回包含属性值的数据
|
|
- error (str): 错误时的错误信息
|
|
- helper_info (List[Any]): 辅助信息列表:
|
|
- 第一步失败:返回"工程数据"节点下所有子节点名称
|
|
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
|
|
- 第三步失败:返回费用节点的所有可用属性名称
|
|
- 否则为空
|
|
"""
|
|
# 初始化返回结果
|
|
status = "success"
|
|
data = {}
|
|
error = ""
|
|
helper_info = []
|
|
|
|
try:
|
|
# 检查输入参数
|
|
if not table_name or not fee_name or not fee_attribute:
|
|
status = "error"
|
|
error = "输入参数不能为空"
|
|
return status, data, error, helper_info
|
|
|
|
# 第一步:查找父节点(费用表节点)
|
|
parent_path = f"工程数据/工程费用/{table_name}"
|
|
parent_node_data = self.get_node_by_path(parent_path)
|
|
|
|
if not parent_node_data:
|
|
status = "error"
|
|
error = f"找不到费用表节点: {parent_path}"
|
|
|
|
# 查询"工程数据/工程费用"节点下所有子节点名称
|
|
base_parent_node = self.get_node_by_path("工程数据/工程费用")
|
|
if base_parent_node:
|
|
query = """
|
|
MATCH (p)-[*1..1]->(n)
|
|
WHERE p.name = '工程费用'
|
|
RETURN n.name AS name
|
|
"""
|
|
|
|
try:
|
|
result = self.session.run(query)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
|
|
fee_node = None
|
|
|
|
# 递归查找费用节点,最多查找3级深度
|
|
query = """
|
|
MATCH (p)-[*1..3]->(f)
|
|
WHERE p.name = $table_name AND f.name = $fee_name
|
|
RETURN f LIMIT 1
|
|
"""
|
|
params = {"table_name": table_name, "fee_name": fee_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
all_records = result.data()
|
|
|
|
if len(all_records) > 0:
|
|
fee_node = all_records[0]["f"]
|
|
|
|
if not fee_node:
|
|
status = "error"
|
|
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
|
|
|
|
# 查询费用表节点下所有子节点名称(递归查找)
|
|
query = """
|
|
MATCH (p)-[*1..3]->(n)
|
|
WHERE p.name = $table_name
|
|
RETURN DISTINCT n.name AS name
|
|
ORDER BY n.name
|
|
"""
|
|
params = {"table_name": table_name}
|
|
|
|
try:
|
|
result = self.session.run(query, **params)
|
|
for record in result:
|
|
if record["name"]:
|
|
helper_info.append(record["name"])
|
|
except Exception as e:
|
|
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"查询费用节点时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
# 第三步:获取费用节点的属性值
|
|
try:
|
|
if fee_node and hasattr(fee_node, "get"):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
elif fee_node and isinstance(fee_node, dict):
|
|
fee_value = fee_node.get(fee_attribute)
|
|
else:
|
|
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
|
|
fee_value = (
|
|
getattr(fee_node, fee_attribute, None)
|
|
if hasattr(fee_node, fee_attribute)
|
|
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
|
|
)
|
|
|
|
if fee_value is None:
|
|
status = "error"
|
|
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
|
|
|
|
# 返回费用节点的所有可用属性名称
|
|
try:
|
|
if hasattr(fee_node, "keys"):
|
|
helper_info = list(fee_node.keys())
|
|
elif isinstance(fee_node, dict):
|
|
helper_info = list(fee_node.keys())
|
|
else:
|
|
# 对于Neo4j Node对象
|
|
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
|
|
|
|
# 过滤掉私有属性
|
|
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
|
|
|
|
except Exception as e:
|
|
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
|
|
|
|
return status, data, error, helper_info
|
|
|
|
# 成功获取属性值,只返回属性值
|
|
data = fee_value
|
|
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
status = "error"
|
|
error = f"获取费用属性值时出错: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
error_details = traceback.format_exc()
|
|
print(f"查询出错: {error_details}")
|
|
|
|
status = "error"
|
|
error = f"查询失败: {str(e)}"
|
|
return status, data, error, helper_info
|
|
|
|
|
|
class ProjectBuilder:
|
|
"""
|
|
项目构建器
|
|
描述: 用于构建项目对象的构建器
|
|
"""
|
|
|
|
_instance = None
|
|
_driver = None # 存储Neo4j驱动实例
|
|
|
|
@staticmethod
|
|
def init_driver(neo4j_conf):
|
|
"""
|
|
初始化Neo4j驱动
|
|
|
|
Args:
|
|
neo4j_conf (dict): Neo4j配置信息,包含uri、username和password
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
if ProjectBuilder._driver is not None:
|
|
# 如果已经有驱动实例,先关闭它
|
|
ProjectBuilder._driver.close()
|
|
|
|
# 创建新的驱动实例
|
|
uri = neo4j_conf.get("uri")
|
|
username = neo4j_conf.get("username")
|
|
password = neo4j_conf.get("password")
|
|
ProjectBuilder._driver = GraphDatabase.driver(uri, auth=(username, password))
|
|
|
|
@staticmethod
|
|
def build():
|
|
"""
|
|
构建并返回项目实例,使用已初始化的驱动
|
|
|
|
Returns:
|
|
ProjectTookiItNeo4j: 创建的项目实例
|
|
"""
|
|
# 如果已经有实例,直接返回
|
|
if ProjectBuilder._instance is not None:
|
|
return ProjectBuilder._instance
|
|
|
|
# 检查驱动是否已初始化
|
|
if ProjectBuilder._driver is None:
|
|
raise ValueError("必须先调用ProjectBuilder.init_driver初始化Neo4j驱动")
|
|
|
|
# 创建新实例,使用已初始化的驱动
|
|
ProjectBuilder._instance = ProjectTookiItNeo4j(ProjectBuilder._driver)
|
|
return ProjectBuilder._instance
|
|
|
|
@staticmethod
|
|
def close():
|
|
"""
|
|
关闭当前项目实例和驱动的连接
|
|
"""
|
|
if ProjectBuilder._instance is not None:
|
|
ProjectBuilder._instance.close()
|
|
ProjectBuilder._instance = None
|
|
|
|
if ProjectBuilder._driver is not None:
|
|
ProjectBuilder._driver.close()
|
|
ProjectBuilder._driver = None
|
|
|
|
|
|
# 注册退出处理函数,确保程序退出时自动关闭连接
|
|
atexit.register(ProjectBuilder.close)
|