Files
langchain_projectagent/project_implementation.py
T
2025-06-24 08:34:57 +08:00

2266 lines
90 KiB
Python

from neo4j import GraphDatabase
from project import *
import atexit
from typing import Tuple, List, Dict, Any, Optional
import logging
logger = logging.getLogger("project_implementation")
class ProjectTookiItNeo4j(ProjectTookiIt):
"""
基于Neo4j数据库的项目类实现
"""
def __init__(self):
logger.info('开始初始化Neo4j连接')
"""
初始化Neo4j连接
Args:
uri (str): Neo4j数据库URI
user (str): 用户名
password (str): 密码
"""
uri = "bolt://localhost:7487"
user = "neo4j"
password = "password"
super().__init__()
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.session = self.driver.session()
# 初始化其他必要的数据结构
self.material_equipment_dict = {} # 材机字典,键为ID
self.fee_templates = {} # 取费表模板字典,键为ID
self.fee_schedules = {} # 费用表字典,键为ID
self.project_properties = {} # 工程属性字典
logger.info('Neo4j连接初始化完成')
def close(self):
logger.info('开始关闭数据库连接')
"""
关闭数据库连接
"""
if self.session:
self.session.close()
if self.driver:
self.driver.close()
logger.info('数据库连接已关闭')
def debug_path(self, path):
logger.info(f'开始调试路径: {path}')
path_parts = path.split("/")
print(f"调试路径: {path}")
# 检查每一级路径是否存在
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
query = """
MATCH (n)
WHERE n.name = $name
RETURN n.name as name
"""
result = self.session.run(query, name=path_parts[i])
record = result.single()
exists = "存在" if record else "不存在"
print(f" 节点 '{path_parts[i]}' {exists}")
logger.info(f'路径 {path} 调试完成')
# 通用节点查询方法
def get_node_by_path(self, path, node_labels=None):
logger.info(f'开始通过路径 {path} 获取节点对象')
"""
通过路径获取节点对象
Args:
path (str): 以'/'分隔的多级节点路径
node_labels (list): 节点标签列表,用于过滤结果
Returns:
dict|None: 节点数据,如果路径不存在返回None
"""
if not path:
logger.warning('输入路径为空,无法获取节点对象')
return None
# 分割路径为各个部分
path_parts = path.split("/")
# 构建查询
if len(path_parts) == 1:
# 只有一级路径,直接查询
if node_labels:
labels_str = ":" + "|:".join(node_labels)
query = f"""
MATCH (n{labels_str})
WHERE n.name = $name
RETURN n LIMIT 1
"""
else:
query = """
MATCH (n)
WHERE n.name = $name
RETURN n LIMIT 1
"""
params = {"name": path_parts[0]}
else:
# 多级路径,构建路径查询
last_part = path_parts[-1]
if node_labels:
labels_str = ":" + "|".join(node_labels)
query = f"""
MATCH path = (root)-[*]->(target{labels_str})
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
else:
query = """
MATCH path = (root)-[*]->(target)
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
params = {"last_part": last_part}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
logger.warning(f'路径 {path} 不存在,未找到节点对象')
return None
logger.info(f'成功通过路径 {path} 获取节点对象')
return record["n"]
except Exception as e:
logger.error(f'获取节点对象时出错: {e}')
print(f"获取节点对象时出错: {e}")
return None
# 项目划分查询方法
def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]:
logger.info(f'开始通过路径 {path} 获取项目划分对象')
"""
通过路径获取项目划分对象
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的项目划分对象数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取节点数据
node_data = self.get_node_by_path(path, ["ProjectDivisionItem"])
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的ProjectDivisionItem节点"
logger.warning(error)
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
# 去掉最后一个部分,得到父路径
parent_path = "/".join(path_parts[:-1])
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为ProjectDivisionItem的子节点
query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.name = $parent_name
RETURN q.name as name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的子节点时出错: {str(e)}"]
logger.error(helper_info[0])
logger.info(f'成功通过路径 {path} 获取项目划分对象')
return status, data, error, helper_info
# 创建项目划分对象并填充属性
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
# 将对象的属性转换为字典
for key, value in vars(item).items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
logger.error(f'获取项目划分对象时出错: {e}')
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_division_node_by_parent_and_name(
self, parent_path, partial_name
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径和模糊节点名称获取项目划分对象,包括子节点
执行两步查询:
1. 找到对应路径的父节点
2. 在父节点下查找名称中包含模糊名称的节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
# 检查部分名称是否为空
if not partial_name:
status = "error"
error = "节点名称不能为空"
return status, data, error, helper_info
try:
# 第一步:找到父节点
parent_node_data = self.get_node_by_path(parent_path)
# 如果找不到父节点
if not parent_node_data:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 提取父路径的父节点
path_parts = parent_path.split("/")
if len(path_parts) > 1:
# 去掉最后一个部分,得到父路径的父路径
grandparent_path = "/".join(path_parts[:-1])
grandparent_name = path_parts[-2] if len(path_parts) >= 2 else ""
# 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型
query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.name = $grandparent_name
RETURN q.name as name
"""
params = {"grandparent_name": grandparent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
# 如果没有找到任何节点,尝试更宽泛的查询
if not helper_info:
broader_query = """
MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem)
RETURN q.name as name LIMIT 50
"""
broader_result = self.session.run(broader_query, **params)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
return status, data, error, helper_info
# 获取父节点名称
parent_name = parent_node_data.get("name")
if not parent_name:
status = "error"
error = "父节点数据中没有name字段"
return status, data, error, helper_info
parent_name = parent_node_data.get("name", "")
# 第二步:改用父节点名称进行查询
query = """
MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem)
WHERE toLower(n.name) CONTAINS toLower($partial_name)
RETURN n LIMIT 50
"""
params = {"parent_name": parent_name, "partial_name": partial_name.strip()}
# print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'")
result = self.session.run(query, **params)
records = list(result)
# print(f"调试返回: 找到{len(records)}条记录")
# 处理查询结果
items = []
for record in records:
node_data = record["n"]
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
items.append(vars(item))
# 如果没有找到匹配的节点
if not items:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点"
# 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型
helper_query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.id = $parent_id
RETURN q.name as name
"""
helper_params = {"parent_name": parent_name}
try:
helper_result = self.session.run(helper_query, **helper_params)
for record in helper_result:
if record["name"]:
helper_info.append(record["name"])
# 如果没有找到任何节点,尝试更宽泛的查询
if not helper_info:
broader_query = """
MATCH (p)-[*1..2]->(q:ProjectDivisionItem)
WHERE p.id = $parent_id
RETURN q.name as name LIMIT 50
"""
broader_result = self.session.run(broader_query, **helper_params)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
# 如果仍然没有找到,尝试使用名称而不是ID
if not helper_info:
name_query = """
MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem)
RETURN q.name as name LIMIT 50
"""
name_params = {"parent_name": parent_name}
name_result = self.session.run(name_query, **name_params)
for record in name_result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
return status, data, error, helper_info
# 成功找到匹配节点
data = items
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点
Args:
name_part (str): 节点名称的部分内容
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
# 检查输入名称是否为空
if not name_part or name_part.strip() == "":
status = "error"
error = "输入的名称部分不能为空"
return status, data, error, helper_info
try:
# 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点
query = """
MATCH (n:ProjectDivisionItem)
WHERE toLower(n.name) CONTAINS toLower($name_part)
RETURN n LIMIT 50
"""
params = {"name_part": name_part.strip()}
result = self.session.run(query, parameters=params)
records = list(result)
if not records:
status = "error"
error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点"
# 提供一些可能相近的节点名称作为辅助信息
broader_query = """
MATCH (n:ProjectDivisionItem)
RETURN n.name AS name LIMIT 20
"""
broader_result = self.session.run(broader_query)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
return status, data, error, helper_info
# 处理查询结果
items = []
for record in records:
node_data = record["n"]
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
# 获取节点的完整路径作为附加信息
node_path_query = """
MATCH path = (root)-[*]->(target:ProjectDivisionItem)
WHERE id(target) = $node_id
WITH [node in nodes(path) | node.name] AS path_names
RETURN path_names
"""
path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id})
path_record = path_result.single()
node_dict = vars(item)
if path_record:
node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name])
items.append(node_dict)
data = items
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 工程量查询方法
def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
获取指定项目路径下的工程量对象
Args:
paths_str (str): 以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]:
- status: 'success'或'error'
- data: 成功时返回的节点数据字典,失败时为{}
- error: 错误信息,成功时为''
- helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称)
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
if not paths_str:
status = "error"
error = "路径不能为空"
return status, data, error, helper_info
try:
# 使用通用方法获取节点,考虑所有可能的工程量类型
node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"])
if node_data:
# 根据节点标签或类型属性创建对应类型的对象
quantity = self._create_quantity_object(node_data)
# 填充属性
for key, value in node_data.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
# 将对象转为字典
data = vars(quantity)
return status, data, error, helper_info
# 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息
path_parts = paths_str.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1])
target_name = path_parts[-1]
# 获取父节点
parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"])
if parent_node_data:
# 查询父节点下的所有符合条件的节点
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = $parent_name AND
any(label IN labels(n) WHERE label IN $allowed_labels)
RETURN n.name as name, labels(n) as labels
"""
params = {
"parent_name": parent_node_data.get("name", ""),
"allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"],
}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append({record["name"]})
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
status = "error"
error = f"找不到路径: {paths_str}"
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_quantities_node_by_parent_and_code(
self, parent_path, quantity_type=None, code=None
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点
执行三步查询:
1. 找到对应路径的父节点
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
3. 在找到的节点中查找编码与传入编码匹配的节点并返回
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
code (str): 工程量编码,以'/'分隔的多个编码
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
if not code:
status = "error"
error = "编码不能为空"
return status, data, error, helper_info
# 验证类型参数是否有效
valid_types = ["定额", "主材", "设备", None]
if quantity_type not in valid_types:
status = "error"
error = f"无效的工程量类型: '{quantity_type}'"
helper_info = valid_types
return status, data, error, helper_info
try:
# 步骤1: 找到对应路径的父节点
parent_node = self.get_node_by_path(parent_path)
if not parent_node:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 尝试提供辅助信息 - 路径的各部分是否存在
path_parts = parent_path.split("/")
existing_parts = []
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
if self.get_node_by_path(partial_path):
existing_parts.append(partial_path)
helper_info = existing_parts
return status, data, error, helper_info
# 提取父节点名称用于后续查询
parent_name = parent_node.get("name", "")
# 根据工程量类型确定类型条件
type_condition = ""
if quantity_type == "定额":
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
elif quantity_type == "主材":
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
elif quantity_type == "设备":
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
else:
type_condition = "q:ProjectQuantity"
# 步骤2: 查找父节点下所有的指定类型的子节点
child_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name AND {type_condition}
RETURN q
"""
child_params = {"parent_name": parent_name}
child_result = self.session.run(child_query, **child_params)
child_nodes = [record["q"] for record in child_result]
if not child_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
# 尝试提供辅助信息 - 父节点下有哪些节点名称
helper_query = """
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name
RETURN DISTINCT q.name as node_name
LIMIT 20
"""
helper_result = self.session.run(helper_query, **child_params)
helper_info = [record["node_name"] for record in helper_result if record["node_name"]]
return status, data, error, helper_info
# 步骤3: 查找编码匹配的节点
code_conditions = []
code_parts = code.split("/")
for code_part in code_parts:
if code_part.strip():
code_conditions.append(code_part.strip())
if not code_conditions:
status = "error"
error = "提供的编码为空"
return status, data, error, helper_info
# 在 child_nodes 中直接查找编码匹配的节点
matching_nodes = []
for node in child_nodes:
node_code = node.get("编码", "")
if any(code_part in str(node_code) for code_part in code_conditions):
matching_nodes.append(node)
if not matching_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点"
# 尝试提供辅助信息 - 所有指定类型的子节点的编码
helper_info = [node.get("编码") for node in child_nodes if node.get("编码")]
return status, data, error, helper_info
# 处理匹配的节点,转换为字典格式
result_data = []
for node in matching_nodes:
quantity = self._create_quantity_object(node, quantity_type)
for key, value in node.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
attrs = {}
for key, value in vars(quantity).items():
if not key.startswith("_"): # 排除私有属性
attrs[key] = value
result_data.append(attrs)
data = result_data
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_quantities_node_by_parent_and_name(
self, parent_path, partial_name, quantity_type=None
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点
执行三步查询:
1. 找到对应路径的父节点
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
3. 在找到的节点中找到名称中包含节点模糊名称的节点返回
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
if not partial_name:
status = "error"
error = "节点名称不能为空"
return status, data, error, helper_info
# 验证类型参数是否有效
valid_types = ["定额", "主材", "设备", None]
if quantity_type not in valid_types:
status = "error"
error = f"无效的工程量类型: '{quantity_type}'"
helper_info = valid_types
return status, data, error, helper_info
try:
# 步骤1: 找到对应路径的父节点
parent_node = self.get_node_by_path(parent_path)
if not parent_node:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 尝试提供辅助信息 - 路径的各部分是否存在
path_parts = parent_path.split("/")
existing_parts = []
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
if self.get_node_by_path(partial_path):
existing_parts.append(partial_path)
helper_info = existing_parts
return status, data, error, helper_info
# 从父节点中提取名称,用于后续查询
parent_name = parent_node.get("name", "")
# 根据工程量类型确定类型条件,不使用标签过滤
type_condition = ""
if quantity_type == "定额":
# 定额类型可能是 ProjectQuantity+Quota 或者 类型='0'
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
elif quantity_type == "主材":
# 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1'
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
elif quantity_type == "设备":
# 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5'
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
else:
# 如果没有指定类型,则查询所有ProjectQuantity节点
type_condition = "q:ProjectQuantity"
# 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点
# 使用父节点名称进行查询,不指定具体的关系类型
child_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name AND {type_condition}
RETURN q
"""
child_params = {"parent_name": parent_name}
child_result = self.session.run(child_query, **child_params)
child_nodes = [record["q"] for record in child_result]
if not child_nodes:
# 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询
path_parts = parent_path.split("/")
last_part = path_parts[-1] if path_parts else ""
fallback_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name CONTAINS $last_part AND {type_condition}
RETURN q LIMIT 100
"""
fallback_params = {"last_part": last_part}
fallback_result = self.session.run(fallback_query, **fallback_params)
child_nodes = [record["q"] for record in fallback_result]
if not child_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
# 尝试提供辅助信息 - 父节点下有哪些类型的节点
helper_query = """
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name
RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type
LIMIT 20
"""
helper_params = {"parent_name": parent_name}
helper_result = self.session.run(helper_query, **helper_params)
helper_data = []
for record in helper_result:
labels = record["node_labels"] if record["node_labels"] else []
node_type = record["node_type"] if record["node_type"] else "未知"
helper_data.append(f"标签: {labels}, 类型: {node_type}")
helper_info = helper_data
return status, data, error, helper_info
# 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点
matching_nodes = []
available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息
for node in child_nodes:
node_name = node.get("name", "")
# 记录所有节点名称作为辅助信息
if node_name:
available_nodes.append(node_name)
if partial_name in str(node_name):
matching_nodes.append(node)
if not matching_nodes:
# 如果没有找到匹配的节点,尝试直接查询
direct_query = f"""
MATCH (q)
WHERE q.name CONTAINS $partial_name AND {type_condition}
RETURN q LIMIT 20
"""
direct_params = {"partial_name": partial_name}
direct_result = self.session.run(direct_query, **direct_params)
matching_nodes = [record["q"] for record in direct_result]
if not matching_nodes:
status = "error"
error = (
f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点"
)
# 提供辅助信息 - 该路径下所有可用的节点名称
helper_info = available_nodes
return status, data, error, helper_info
# 处理匹配的节点,将它们转换为对象
result_data = []
for matching_node in matching_nodes:
# 创建对应类型的对象
quantity = self._create_quantity_object(matching_node, quantity_type)
# 填充属性
for key, value in matching_node.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
# 将对象的属性转换为字典
attrs = {}
for key, value in vars(quantity).items():
if not key.startswith("_"): # 排除私有属性
attrs[key] = value
result_data.append(attrs)
# 成功找到匹配节点
data = result_data
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于根据节点数据创建对应类型的工程量对象
def _create_quantity_object(self, node_data, quantity_type=None):
"""
根据节点数据创建对应类型的工程量对象
Args:
node_data (dict): 节点数据
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None)
Returns:
ProjectQuantity: 创建的工程量对象
"""
# 如果指定了类型,直接创建对应类型的对象
if quantity_type == "定额":
return Ration()
elif quantity_type == "主材":
return Material()
elif quantity_type == "设备":
return Equipment()
# 如果没有指定类型,尝试通过节点属性或标签判断
if "类型" in node_data:
if node_data["类型"] == "0":
return Ration()
elif node_data["类型"] == "1":
return Material()
elif node_data["类型"] == "5":
return Equipment()
# 通过标签判断
labels = list(node_data.labels) if hasattr(node_data, "labels") else []
if "Quota" in labels:
return Ration()
elif "MainMaterial" in labels:
return Material()
elif "Equipment" in labels:
return Equipment()
# 默认返回基类对象
return ProjectQuantity()
# 材机查询方法实现
def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过路径获取材机对象(MaterialOrEquipment 类型节点)
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取目标节点
node_data = self.get_node_by_path(path)
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点"
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1]) # 父节点路径
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称
query = """
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
WHERE p.name = $parent_name
RETURN m.name AS name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将节点属性转换为字典并过滤掉私有字段
for key, value in node_data.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_material_equipment_by_parent_and_name(
self, parent_path: str, partial_name: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过父节点路径和模糊名称获取材机对象
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
"""
status = "success"
data = {}
error = ""
helper_info = []
try:
if not parent_path or not partial_name:
status = "error"
error = "父节点路径或要查找的名称不能为空"
return status, data, error, helper_info
# 第一步:查找父节点
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 获取祖父节点路径
path_parts = parent_path.split("/")
if len(path_parts) > 1:
grandparent_path = "/".join(path_parts[:-1])
grandparent_name = path_parts[-2]
# 查询祖父节点下的所有子节点名称作为 helper_info
query = """
MATCH (p)-[*1..1]->(child)
WHERE p.name = $grandparent_name
RETURN child.name AS name
"""
params = {"grandparent_name": grandparent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询祖父节点下的子节点失败: {str(e)}"]
return status, data, error, helper_info
parent_id = parent_node_data.get("id")
if not parent_id:
status = "error"
error = f"父节点 {parent_path} 没有有效的 ID"
return status, data, error, helper_info
# 第二步:查找父节点下名称包含 partial_name 的 MaterialOrEquipment 节点
query = """
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
WHERE p.id = $parent_id AND m.name CONTAINS $partial_name
RETURN m LIMIT 20
"""
params = {"parent_id": parent_id, "partial_name": partial_name}
result = self.session.run(query, **params)
matched_nodes = []
for record in result:
node = record["m"]
properties = dict(node.items())
matched_nodes.append(properties)
if not matched_nodes:
status = "error"
error = f"在路径 {parent_path} 下没有找到名称包含 '{partial_name}' 的 MaterialOrEquipment 节点"
# 查询父节点下所有 MaterialOrEquipment 子节点名称作为 helper_info
query_helper = """
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
WHERE p.id = $parent_id
RETURN m.name AS name
"""
try:
result_helper = self.session.run(query_helper, parent_id=parent_id)
for record in result_helper:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的 MaterialOrEquipment 子节点失败: {str(e)}"]
return status, data, error, helper_info
# 成功找到,返回第一个节点的数据
data = matched_nodes[0]
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于创建材机对象并填充属性
def _create_material_object(self, node_data):
"""
根据节点数据创建材机对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
MaterialOrEquipment: 创建的材机对象
"""
material = MaterialOrEquipment()
# 填充属性
for key, value in node_data.items():
if hasattr(material, key):
setattr(material, key, value)
return material
# 取费表模板查询方法实现
def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过路径获取取费表模板节点
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取目标节点
node_data = self.get_node_by_path(path)
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的节点"
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1]) # 父节点路径
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为 FeeCollection 的子节点名称
query = """
MATCH (p)-[*1..1]->(f:FeeCollection)
WHERE p.name = $parent_name
RETURN f.name AS name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将节点属性转换为字典并过滤掉私有字段
for key, value in node_data.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_template_by_parent_and_name(
self, parent_path: str, node_name: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过父节点路径和节点名称获取取费表模板节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)
node_name (str): 目标节点的名称,例如:直接费
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到父节点,则包含父节点的父节点下所有节点名称
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一段:查找父节点
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 提取父节点的父路径
parent_path_parts = parent_path.split("/")
if len(parent_path_parts) > 1:
grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径
grandparent_name = parent_path_parts[-2] # 父节点的父节点名称
# 获取父节点的父节点
grandparent_node = self.get_node_by_path(grandparent_path)
if grandparent_node:
# 查询父节点的父节点下所有子节点名称
query = """
MATCH (gp)-[*1..1]->(n)
WHERE gp.name = $grandparent_name
RETURN n.name AS name
"""
params = {"grandparent_name": grandparent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二段:在父节点下查找目标节点
parent_node_name = parent_path.split("/")[-1] # 父节点名称
# 查询父节点下指定名称的节点
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = $parent_name AND n.name = $node_name
RETURN n
"""
params = {"parent_name": parent_node_name, "node_name": node_name}
try:
result = self.session.run(query, **params)
target_node = None
for record in result:
target_node = record["n"]
break
if not target_node:
status = "error"
error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点"
# 查询父节点下所有类型为 FeeCollection 的子节点名称
query = """
MATCH (p)-[*1..1]->(f:FeeCollection)
WHERE p.name = $parent_name
RETURN f.name AS name
"""
params = {"parent_name": parent_node_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将目标节点属性转换为字典并过滤掉私有字段
for key, value in target_node.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询目标节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于创建取费表模板对象并填充属性
def _create_fee_template_object(self, node_data):
"""
根据节点数据创建取费表模板对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
FeeTableTemplateItem: 创建的取费表模板对象
"""
template = FeeTableTemplateItem()
# 填充属性
for key, value in node_data.items():
if hasattr(template, key):
setattr(template, key, value)
# 更新缓存
if hasattr(template, "OutlayID") and template.OutlayID:
self.fee_templates[template.OutlayID] = template
return template
# 费用表查询方法实现
def get_fee_schedule_on_auxiliary_expense_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在辅助费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_other_expense_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在其它费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_land_acquisition_fee_table_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在土地征用费表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_installation_price_difference_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在安装价差表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_Engineering_Cost_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在工程费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
class ProjectBuilder:
"""
项目构建器
描述: 用于构建项目对象的构建器
"""
_instance = None
@staticmethod
def build():
"""
构建并返回项目实例
Returns:
ProjectTookiItNeo4j: 创建的项目实例
"""
# 如果已经有实例,先关闭它
if ProjectBuilder._instance is not None:
ProjectBuilder._instance.close()
# 创建新实例
ProjectBuilder._instance = ProjectTookiItNeo4j()
return ProjectBuilder._instance
@staticmethod
def close():
"""
关闭当前项目实例的连接
"""
if ProjectBuilder._instance is not None:
ProjectBuilder._instance.close()
ProjectBuilder._instance = None
# 注册退出处理函数,确保程序退出时自动关闭连接
atexit.register(ProjectBuilder.close)
# project = ProjectBuilder.build()
# status, data, error, helper_info = project.get_division_by_name("安装工程")
# print(status)
# print(data)
# print(error)
# print(helper_info)