Files
langchain_projectagent/project_implementation.py
T
2025-06-27 22:36:20 +08:00

3062 lines
121 KiB
Python

from neo4j import GraphDatabase
from project import *
import atexit
from typing import Tuple, List, Dict, Any, Optional
import logging
from src.config import Config
logger = logging.getLogger("project_implementation")
config = Config()
class ProjectToolkitNeo4j(ProjectToolkit):
"""
基于Neo4j数据库的项目类实现
"""
def __init__(self, neo4j_driver):
logger.info("开始初始化Neo4j连接")
"""
初始化Neo4j连接
Args:
neo4j_driver: Neo4j驱动实例,必须提供
"""
if neo4j_driver is None:
raise ValueError("必须提供Neo4j驱动实例")
super().__init__()
# 保存驱动实例
self.driver = neo4j_driver
self.external_driver = True # 标记为外部驱动,不在close时关闭
self.session = self.driver.session()
# 初始化其他必要的数据结构
self.material_equipment_dict = {} # 材机字典,键为ID
self.fee_templates = {} # 取费表模板字典,键为ID
self.fee_schedules = {} # 费用表字典,键为ID
self.project_properties = {} # 工程属性字典
logger.info("Neo4j连接初始化完成")
def close(self):
logger.info("开始关闭数据库连接")
"""
关闭数据库连接
"""
if self.session:
self.session.close()
# 仅在非外部驱动的情况下关闭driver
if self.driver and not self.external_driver:
self.driver.close()
logger.info("数据库连接已关闭")
def debug_path(self, path):
logger.info(f"开始调试路径: {path}")
path_parts = path.split("/")
print(f"调试路径: {path}")
# 检查每一级路径是否存在
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
query = """
MATCH (n)
WHERE n.name = $name
RETURN n.name as name
"""
result = self.session.run(query, name=path_parts[i])
record = result.single()
exists = "存在" if record else "不存在"
print(f" 节点 '{path_parts[i]}' {exists}")
logger.info(f"路径 {path} 调试完成")
# 通用节点查询方法
def get_node_by_path(self, path, node_labels=None):
logger.info(f"开始通过路径 {path} 获取节点对象")
"""
通过路径获取节点对象
Args:
path (str): 以'/'分隔的多级节点路径
node_labels (list): 节点标签列表,用于过滤结果
Returns:
dict|None: 节点数据,如果路径不存在返回None
"""
if not path:
logger.warning("输入路径为空,无法获取节点对象")
return None
# 分割路径为各个部分
path_parts = path.split("/")
# 构建查询
if len(path_parts) == 1:
# 只有一级路径,直接查询
if node_labels:
labels_str = ":" + "|:".join(node_labels)
query = f"""
MATCH (n{labels_str})
WHERE n.name = $name
RETURN n LIMIT 1
"""
else:
query = """
MATCH (n)
WHERE n.name = $name
RETURN n LIMIT 1
"""
params = {"name": path_parts[0]}
else:
# 多级路径,构建路径查询
last_part = path_parts[-1]
if node_labels:
labels_str = ":" + "|".join(node_labels)
query = f"""
MATCH path = (root)-[*]->(target{labels_str})
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
else:
query = """
MATCH path = (root)-[*]->(target)
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
params = {"last_part": last_part}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
logger.warning(f"路径 {path} 不存在,未找到节点对象")
return None
logger.info(f"成功通过路径 {path} 获取节点对象")
return record["n"]
except Exception as e:
logger.error(f"获取节点对象时出错: {e}")
print(f"获取节点对象时出错: {e}")
return None
# 项目划分查询方法
# 项目划分查询方法
def get_division_item_by_path(self, path) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过路径获取项目划分对象
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的项目划分对象数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表,如果未找到节点,则包含父节点下所有ProjectDivisionItem节点
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取节点数据
node_data = self.get_node_by_path(path, ["ProjectDivisionItem"])
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的ProjectDivisionItem节点"
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
# 去掉最后一个部分,得到父路径
parent_path = "/".join(path_parts[:-1])
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为ProjectDivisionItem的子节点
query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.name = $parent_name
RETURN q.name as name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 创建项目划分对象并填充属性
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
# 将对象的属性转换为字典
for key, value in vars(item).items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_division_node_by_parent_and_name(
self, parent_path, partial_name
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径和模糊节点名称获取项目划分对象,包括子节点
执行两步查询:
1. 找到对应路径的父节点
2. 在父节点下查找名称中包含模糊名称的节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
# 检查部分名称是否为空
if not partial_name:
status = "error"
error = "节点名称不能为空"
return status, data, error, helper_info
try:
# 第一步:找到父节点
parent_node_data = self.get_node_by_path(parent_path)
# 如果找不到父节点
if not parent_node_data:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 提取父路径的父节点
path_parts = parent_path.split("/")
if len(path_parts) > 1:
# 去掉最后一个部分,得到父路径的父路径
grandparent_path = "/".join(path_parts[:-1])
grandparent_name = path_parts[-2] if len(path_parts) >= 2 else ""
# 查询父路径的父节点下所有类型为ProjectDivisionItem的节点,不指定关系类型
query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.name = $grandparent_name
RETURN q.name as name
"""
params = {"grandparent_name": grandparent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
# 如果没有找到任何节点,尝试更宽泛的查询
if not helper_info:
broader_query = """
MATCH (p {name: $grandparent_name})-[*1..2]->(q:ProjectDivisionItem)
RETURN q.name as name LIMIT 50
"""
broader_result = self.session.run(broader_query, **params)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
return status, data, error, helper_info
# 获取父节点名称
parent_name = parent_node_data.get("name")
if not parent_name:
status = "error"
error = "父节点数据中没有name字段"
return status, data, error, helper_info
parent_name = parent_node_data.get("name", "")
# 第二步:改用父节点名称进行查询
query = """
MATCH (p {name: $parent_name})-[*1..1]-(n:ProjectDivisionItem)
WHERE toLower(n.name) CONTAINS toLower($partial_name)
RETURN n LIMIT 50
"""
params = {"parent_name": parent_name, "partial_name": partial_name.strip()}
# print(f"调试参数: parent_name='{parent_name}', partial_name='{partial_name}'")
result = self.session.run(query, **params)
records = list(result)
# print(f"调试返回: 找到{len(records)}条记录")
# 处理查询结果
items = []
for record in records:
node_data = record["n"]
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
items.append(vars(item))
# 如果没有找到匹配的节点
if not items:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到名称包含 '{partial_name}' 的节点"
# 查询父节点下所有类型为ProjectDivisionItem的节点作为辅助信息,不指定关系类型
helper_query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.id = $parent_id
RETURN q.name as name
"""
helper_params = {"parent_name": parent_name}
try:
helper_result = self.session.run(helper_query, **helper_params)
for record in helper_result:
if record["name"]:
helper_info.append(record["name"])
# 如果没有找到任何节点,尝试更宽泛的查询
if not helper_info:
broader_query = """
MATCH (p)-[*1..2]->(q:ProjectDivisionItem)
WHERE p.id = $parent_id
RETURN q.name as name LIMIT 50
"""
broader_result = self.session.run(broader_query, **helper_params)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
# 如果仍然没有找到,尝试使用名称而不是ID
if not helper_info:
name_query = """
MATCH (p {name: $parent_name})-[*1..2]->(q:ProjectDivisionItem)
RETURN q.name as name LIMIT 50
"""
name_params = {"parent_name": parent_name}
name_result = self.session.run(name_query, **name_params)
for record in name_result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
return status, data, error, helper_info
# 成功找到匹配节点
data = items
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_division_by_name(self, name_part) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
直接查找节点类型为ProjectDivisionItem且name字段包含输入名称的节点
Args:
name_part (str): 节点名称的部分内容
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
# 检查输入名称是否为空
if not name_part or name_part.strip() == "":
status = "error"
error = "输入的名称部分不能为空"
return status, data, error, helper_info
try:
# 直接查询所有类型为ProjectDivisionItem且name包含输入名称的节点
query = """
MATCH (n:ProjectDivisionItem)
WHERE toLower(n.name) CONTAINS toLower($name_part)
RETURN n LIMIT 50
"""
params = {"name_part": name_part.strip()}
result = self.session.run(query, parameters=params)
records = list(result)
if not records:
status = "error"
error = f"找不到名称包含 '{name_part}' 的ProjectDivisionItem节点"
# 提供一些可能相近的节点名称作为辅助信息
broader_query = """
MATCH (n:ProjectDivisionItem)
RETURN n.name AS name LIMIT 20
"""
broader_result = self.session.run(broader_query)
for record in broader_result:
if record["name"]:
helper_info.append(record["name"])
return status, data, error, helper_info
# 处理查询结果
items = []
for record in records:
node_data = record["n"]
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
# 获取节点的完整路径作为附加信息
node_path_query = """
MATCH path = (root)-[*]->(target:ProjectDivisionItem)
WHERE id(target) = $node_id
WITH [node in nodes(path) | node.name] AS path_names
RETURN path_names
"""
path_result = self.session.run(node_path_query, parameters={"node_id": node_data.id})
path_record = path_result.single()
node_dict = vars(item)
if path_record:
node_dict["完整路径"] = "/".join([name for name in path_record["path_names"] if name])
items.append(node_dict)
data = items
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 工程量查询方法
def get_quantities_by_paths(self, paths_str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
获取指定项目路径下的工程量对象
Args:
paths_str (str): 以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]:
- status: 'success'或'error'
- data: 成功时返回的节点数据字典,失败时为{}
- error: 错误信息,成功时为''
- helper_info: 辅助信息列表(当目标节点不存在时返回父节点下的所有ProjectQuantity节点名称)
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
if not paths_str:
status = "error"
error = "路径不能为空"
return status, data, error, helper_info
try:
# 使用通用方法获取节点,考虑所有可能的工程量类型
node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"])
if node_data:
# 根据节点标签或类型属性创建对应类型的对象
quantity = self._create_quantity_object(node_data)
# 填充属性
for key, value in node_data.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
# 将对象转为字典
data = vars(quantity)
return status, data, error, helper_info
# 如果找不到节点,尝试获取父节点下的所有ProjectQuantity节点作为辅助信息
path_parts = paths_str.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1])
target_name = path_parts[-1]
# 获取父节点
parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"])
if parent_node_data:
# 查询父节点下的所有符合条件的节点
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = $parent_name AND
any(label IN labels(n) WHERE label IN $allowed_labels)
RETURN n.name as name, labels(n) as labels
"""
params = {
"parent_name": parent_node_data.get("name", ""),
"allowed_labels": ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"],
}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append({record["name"]})
except Exception as e:
print(f"查询辅助信息时出错: {str(e)}")
helper_info = [f"查询辅助信息时出错: {str(e)}"]
status = "error"
error = f"找不到路径: {paths_str}"
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_quantities_node_by_parent_and_code(
self, parent_path, quantity_type=None, code=None
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径和编码获取工程量对象(定额、主材或设备),包括子节点
执行三步查询:
1. 找到对应路径的父节点
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
3. 在找到的节点中查找编码与传入编码匹配的节点并返回
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
code (str): 工程量编码,以'/'分隔的多个编码
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
if not code:
status = "error"
error = "编码不能为空"
return status, data, error, helper_info
# 验证类型参数是否有效
valid_types = ["定额", "主材", "设备", None]
if quantity_type not in valid_types:
status = "error"
error = f"无效的工程量类型: '{quantity_type}'"
helper_info = valid_types
return status, data, error, helper_info
try:
# 步骤1: 找到对应路径的父节点
parent_node = self.get_node_by_path(parent_path)
if not parent_node:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 尝试提供辅助信息 - 路径的各部分是否存在
path_parts = parent_path.split("/")
existing_parts = []
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
if self.get_node_by_path(partial_path):
existing_parts.append(partial_path)
helper_info = existing_parts
return status, data, error, helper_info
# 提取父节点名称用于后续查询
parent_name = parent_node.get("name", "")
# 根据工程量类型确定类型条件
type_condition = ""
if quantity_type == "定额":
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
elif quantity_type == "主材":
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
elif quantity_type == "设备":
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
else:
type_condition = "q:ProjectQuantity"
# 步骤2: 查找父节点下所有的指定类型的子节点
child_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name AND {type_condition}
RETURN q
"""
child_params = {"parent_name": parent_name}
child_result = self.session.run(child_query, **child_params)
child_nodes = [record["q"] for record in child_result]
if not child_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
# 尝试提供辅助信息 - 父节点下有哪些节点名称
helper_query = """
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name
RETURN DISTINCT q.name as node_name
LIMIT 20
"""
helper_result = self.session.run(helper_query, **child_params)
helper_info = [record["node_name"] for record in helper_result if record["node_name"]]
return status, data, error, helper_info
# 步骤3: 查找编码匹配的节点
code_conditions = []
code_parts = code.split("/")
for code_part in code_parts:
if code_part.strip():
code_conditions.append(code_part.strip())
if not code_conditions:
status = "error"
error = "提供的编码为空"
return status, data, error, helper_info
# 在 child_nodes 中直接查找编码匹配的节点
matching_nodes = []
for node in child_nodes:
node_code = node.get("编码", "")
if any(code_part in str(node_code) for code_part in code_conditions):
matching_nodes.append(node)
if not matching_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到编码匹配的节点"
# 尝试提供辅助信息 - 所有指定类型的子节点的编码
helper_info = [node.get("编码") for node in child_nodes if node.get("编码")]
return status, data, error, helper_info
# 处理匹配的节点,转换为字典格式
result_data = []
for node in matching_nodes:
quantity = self._create_quantity_object(node, quantity_type)
for key, value in node.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
attrs = {}
for key, value in vars(quantity).items():
if not key.startswith("_"): # 排除私有属性
attrs[key] = value
result_data.append(attrs)
data = result_data
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_quantities_node_by_parent_and_name(
self, parent_path, partial_name, quantity_type=None
) -> Tuple[str, List[Dict[str, Any]], str, List[Any]]:
"""
通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点
执行三步查询:
1. 找到对应路径的父节点
2. 找到父节点下多级子节点中节点类型为参数中节点类型的所有节点
3. 在找到的节点中找到名称中包含节点模糊名称的节点返回
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None表示所有类型)
Returns:
Tuple[str, List[Dict[str, Any]], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (List[Dict[str, Any]]): 成功时返回的数据列表
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = []
error = ""
helper_info = []
if not partial_name:
status = "error"
error = "节点名称不能为空"
return status, data, error, helper_info
# 验证类型参数是否有效
valid_types = ["定额", "主材", "设备", None]
if quantity_type not in valid_types:
status = "error"
error = f"无效的工程量类型: '{quantity_type}'"
helper_info = valid_types
return status, data, error, helper_info
try:
# 步骤1: 找到对应路径的父节点
parent_node = self.get_node_by_path(parent_path)
if not parent_node:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 尝试提供辅助信息 - 路径的各部分是否存在
path_parts = parent_path.split("/")
existing_parts = []
for i in range(len(path_parts)):
partial_path = "/".join(path_parts[: i + 1])
if self.get_node_by_path(partial_path):
existing_parts.append(partial_path)
helper_info = existing_parts
return status, data, error, helper_info
# 从父节点中提取名称,用于后续查询
parent_name = parent_node.get("name", "")
# 根据工程量类型确定类型条件,不使用标签过滤
type_condition = ""
if quantity_type == "定额":
# 定额类型可能是 ProjectQuantity+Quota 或者 类型='0'
type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
elif quantity_type == "主材":
# 主材类型可能是 ProjectQuantity+MainMaterial 或者 类型='1'
type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
elif quantity_type == "设备":
# 设备类型可能是 ProjectQuantity+Equipment 或者 类型='5'
type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
else:
# 如果没有指定类型,则查询所有ProjectQuantity节点
type_condition = "q:ProjectQuantity"
# 步骤2: 找到父节点下所有的子节点,类型为指定类型的节点
# 使用父节点名称进行查询,不指定具体的关系类型
child_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name AND {type_condition}
RETURN q
"""
child_params = {"parent_name": parent_name}
child_result = self.session.run(child_query, **child_params)
child_nodes = [record["q"] for record in child_result]
if not child_nodes:
# 如果没有找到节点,尝试直接使用父节点路径的最后一部分进行查询
path_parts = parent_path.split("/")
last_part = path_parts[-1] if path_parts else ""
fallback_query = f"""
MATCH (p)-[*1..10]->(q)
WHERE p.name CONTAINS $last_part AND {type_condition}
RETURN q LIMIT 100
"""
fallback_params = {"last_part": last_part}
fallback_result = self.session.run(fallback_query, **fallback_params)
child_nodes = [record["q"] for record in fallback_result]
if not child_nodes:
status = "error"
error = f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 的节点"
# 尝试提供辅助信息 - 父节点下有哪些类型的节点
helper_query = """
MATCH (p)-[*1..10]->(q)
WHERE p.name = $parent_name
RETURN DISTINCT labels(q) as node_labels, q.类型 as node_type
LIMIT 20
"""
helper_params = {"parent_name": parent_name}
helper_result = self.session.run(helper_query, **helper_params)
helper_data = []
for record in helper_result:
labels = record["node_labels"] if record["node_labels"] else []
node_type = record["node_type"] if record["node_type"] else "未知"
helper_data.append(f"标签: {labels}, 类型: {node_type}")
helper_info = helper_data
return status, data, error, helper_info
# 步骤3: 在找到的节点中找到名称中包含节点模糊名称的节点
matching_nodes = []
available_nodes = [] # 用于存储所有可用节点的名称,作为辅助信息
for node in child_nodes:
node_name = node.get("name", "")
# 记录所有节点名称作为辅助信息
if node_name:
available_nodes.append(node_name)
if partial_name in str(node_name):
matching_nodes.append(node)
if not matching_nodes:
# 如果没有找到匹配的节点,尝试直接查询
direct_query = f"""
MATCH (q)
WHERE q.name CONTAINS $partial_name AND {type_condition}
RETURN q LIMIT 20
"""
direct_params = {"partial_name": partial_name}
direct_result = self.session.run(direct_query, **direct_params)
matching_nodes = [record["q"] for record in direct_result]
if not matching_nodes:
status = "error"
error = (
f"在父节点 '{parent_name}' 下找不到类型为 '{quantity_type}' 且名称包含 '{partial_name}' 的节点"
)
# 提供辅助信息 - 该路径下所有可用的节点名称
helper_info = available_nodes
return status, data, error, helper_info
# 处理匹配的节点,将它们转换为对象
result_data = []
for matching_node in matching_nodes:
# 创建对应类型的对象
quantity = self._create_quantity_object(matching_node, quantity_type)
# 填充属性
for key, value in matching_node.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
# 将对象的属性转换为字典
attrs = {}
for key, value in vars(quantity).items():
if not key.startswith("_"): # 排除私有属性
attrs[key] = value
result_data.append(attrs)
# 成功找到匹配节点
data = result_data
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于根据节点数据创建对应类型的工程量对象
def _create_quantity_object(self, node_data, quantity_type=None):
"""
根据节点数据创建对应类型的工程量对象
Args:
node_data (dict): 节点数据
quantity_type (str): 工程量类型('定额'、'主材'、'设备'或None)
Returns:
ProjectQuantity: 创建的工程量对象
"""
# 如果指定了类型,直接创建对应类型的对象
if quantity_type == "定额":
return Ration()
elif quantity_type == "主材":
return Material()
elif quantity_type == "设备":
return Equipment()
# 如果没有指定类型,尝试通过节点属性或标签判断
if "类型" in node_data:
if node_data["类型"] == "0":
return Ration()
elif node_data["类型"] == "1":
return Material()
elif node_data["类型"] == "5":
return Equipment()
# 通过标签判断
labels = list(node_data.labels) if hasattr(node_data, "labels") else []
if "Quota" in labels:
return Ration()
elif "MainMaterial" in labels:
return Material()
elif "Equipment" in labels:
return Equipment()
# 默认返回基类对象
return ProjectQuantity()
# 材机查询方法实现
def get_material_equipment_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过路径获取材机对象(MaterialOrEquipment 类型节点)
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取目标节点
node_data = self.get_node_by_path(path)
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的 MaterialOrEquipment 节点"
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1]) # 父节点路径
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为 MaterialOrEquipment 的子节点名称
query = """
MATCH (p)-[*1..1]->(m:MaterialOrEquipment)
WHERE p.name = $parent_name
RETURN m.name AS name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的材机子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将节点属性转换为字典并过滤掉私有字段
for key, value in node_data.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_material_equipment_by_parent_and_code(self, parent_path, code):
"""
通过父节点路径和编码获取材机对象
Args:
parent_path (str): 父节点的部分路径,以'/'分隔的多级节点路径
code (str): 目标节点的编码属性值
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到目标节点,则包含父节点下所有 MaterialOrEquipment 节点编码
"""
status = "success"
data = {}
error = ""
helper_info = []
try:
if not parent_path or not code:
status = "error"
error = "父节点路径或要查找的编码不能为空"
return status, data, error, helper_info
# 获取父节点路径的最后一部分
path_parts = parent_path.split("/")
last_part = path_parts[-1]
# 构建Cypher查询,查找包含指定路径名称的节点,然后找到其子孙节点中编码匹配的MaterialOrEquipment节点
# 使用任意类型的关系 [*0..10],不指定关系类型
query = """
MATCH (p)-[*0..10]->(m:MaterialOrEquipment)
WHERE p.name CONTAINS $parent_name AND m.编码 = $code
RETURN m LIMIT 5
"""
params = {"parent_name": last_part, "code": code}
result = self.session.run(query, **params)
matched_nodes = []
for record in result:
node = record["m"]
properties = dict(node.items())
matched_nodes.append(properties)
if not matched_nodes:
status = "error"
error = f"在路径包含 '{parent_path}' 的节点下没有找到编码为 '{code}' 的 MaterialOrEquipment 节点"
# 查询相似路径下的MaterialOrEquipment节点的编码作为辅助信息
helper_query = """
MATCH (p)-[*0..10]->(m:MaterialOrEquipment)
WHERE p.name CONTAINS $parent_name
RETURN m.编码 AS code, m.name AS name
LIMIT 5
"""
try:
helper_result = self.session.run(helper_query, parent_name=last_part)
for record in helper_result:
if record["code"] and record["name"]:
helper_info.append(f"{record['code']} - {record['name']}")
except Exception as e:
helper_info = [f"查询相似编码失败: {str(e)}"]
return status, data, error, helper_info
# 成功找到,返回第一个节点的数据
data = matched_nodes[0]
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于创建材机对象并填充属性
def _create_material_object(self, node_data):
"""
根据节点数据创建材机对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
MaterialOrEquipment: 创建的材机对象
"""
material = MaterialOrEquipment()
# 填充属性
for key, value in node_data.items():
if hasattr(material, key):
setattr(material, key, value)
return material
# 取费表模板查询方法实现
def get_fee_template_by_path(self, path: str) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过路径获取取费表模板节点
Args:
path (str): 项目划分节点的路径,以'/'分隔的多级节点路径
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)/直接费
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 获取目标节点
node_data = self.get_node_by_path(path)
if not node_data:
status = "error"
error = f"找不到路径: {path} 上的节点"
# 提取父路径
path_parts = path.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1]) # 父节点路径
parent_name = path_parts[-2] # 父节点名称
# 获取父节点
parent_node = self.get_node_by_path(parent_path)
if parent_node:
# 查询父节点下所有类型为 FeeCollection 的子节点名称
query = """
MATCH (p)-[*1..1]->(f:FeeCollection)
WHERE p.name = $parent_name
RETURN f.name AS name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将节点属性转换为字典并过滤掉私有字段
for key, value in node_data.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_template_by_parent_and_name(
self, parent_path: str, node_name: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过父节点路径和节点名称获取取费表模板节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
例如:工程数据/取费表模板/余物清理/线路取费表(余物清理)
node_name (str): 目标节点的名称,例如:直接费
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回的节点属性数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 如果未找到父节点,则包含父节点的父节点下所有节点名称
- 如果未找到目标节点,则包含父节点下所有 FeeCollection 节点名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一段:查找父节点
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到父节点路径: {parent_path}"
# 提取父节点的父路径
parent_path_parts = parent_path.split("/")
if len(parent_path_parts) > 1:
grandparent_path = "/".join(parent_path_parts[:-1]) # 父节点的父路径
grandparent_name = parent_path_parts[-2] # 父节点的父节点名称
# 获取父节点的父节点
grandparent_node = self.get_node_by_path(grandparent_path)
if grandparent_node:
# 查询父节点的父节点下所有子节点名称
query = """
MATCH (gp)-[*1..1]->(n)
WHERE gp.name = $grandparent_name
RETURN n.name AS name
"""
params = {"grandparent_name": grandparent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点的父节点下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二段:在父节点下查找目标节点
parent_node_name = parent_path.split("/")[-1] # 父节点名称
# 查询父节点下指定名称的节点
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = $parent_name AND n.name = $node_name
RETURN n
"""
params = {"parent_name": parent_node_name, "node_name": node_name}
try:
result = self.session.run(query, **params)
target_node = None
for record in result:
target_node = record["n"]
break
if not target_node:
status = "error"
error = f"在父节点 {parent_node_name} 下找不到名称为 {node_name} 的节点"
# 查询父节点下所有类型为 FeeCollection 的子节点名称
query = """
MATCH (p)-[*1..1]->(f:FeeCollection)
WHERE p.name = $parent_name
RETURN f.name AS name
"""
params = {"parent_name": parent_node_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的FeeCollection子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 将目标节点属性转换为字典并过滤掉私有字段
for key, value in target_node.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询目标节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 辅助方法,用于创建取费表模板对象并填充属性
def _create_fee_template_object(self, node_data):
"""
根据节点数据创建取费表模板对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
FeeTableTemplateItem: 创建的取费表模板对象
"""
template = FeeTableTemplateItem()
# 填充属性
for key, value in node_data.items():
if hasattr(template, key):
setattr(template, key, value)
# 更新缓存
if hasattr(template, "OutlayID") and template.OutlayID:
self.fee_templates[template.OutlayID] = template
return template
# 费用表查询方法实现
def get_fee_schedule_on_auxiliary_expense_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在辅助费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_other_expense_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在其它费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_land_acquisition_fee_table_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在土地征用费表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_installation_price_difference_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在安装价差表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_schedule_on_Engineering_Cost_table(
self, table_name: str, fee_name: str, fee_attribute: str
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
在工程费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称(可能在多级子节点中)
fee_attribute (str): 费用值属性名
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表:
- 第一步失败:返回"工程数据"节点下所有子节点名称
- 第二步失败:返回费用表节点下所有子节点名称(递归查找)
- 第三步失败:返回费用节点的所有可用属性名称
- 否则为空
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 检查输入参数
if not table_name or not fee_name or not fee_attribute:
status = "error"
error = "输入参数不能为空"
return status, data, error, helper_info
# 第一步:查找父节点(费用表节点)
parent_path = f"工程数据/工程费用/{table_name}"
parent_node_data = self.get_node_by_path(parent_path)
if not parent_node_data:
status = "error"
error = f"找不到费用表节点: {parent_path}"
# 查询"工程数据/工程费用"节点下所有子节点名称
base_parent_node = self.get_node_by_path("工程数据/工程费用")
if base_parent_node:
query = """
MATCH (p)-[*1..1]->(n)
WHERE p.name = '工程费用'
RETURN n.name AS name
"""
try:
result = self.session.run(query)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询工程费用下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在费用表节点下查找费用节点(可能在多级子节点中)
fee_node = None
# 递归查找费用节点,最多查找3级深度
query = """
MATCH (p)-[*1..3]->(f)
WHERE p.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
if not fee_node:
status = "error"
error = f"在费用表 {table_name} 下找不到费用节点: {fee_name}"
# 查询费用表节点下所有子节点名称(递归查找)
query = """
MATCH (p)-[*1..3]->(n)
WHERE p.name = $table_name
RETURN DISTINCT n.name AS name
ORDER BY n.name
"""
params = {"table_name": table_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询费用表下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询费用节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:获取费用节点的属性值
try:
if fee_node and hasattr(fee_node, "get"):
fee_value = fee_node.get(fee_attribute)
elif fee_node and isinstance(fee_node, dict):
fee_value = fee_node.get(fee_attribute)
else:
# 如果fee_node是Neo4j Node对象,尝试直接访问属性
fee_value = (
getattr(fee_node, fee_attribute, None)
if hasattr(fee_node, fee_attribute)
else fee_node.get(fee_attribute) if hasattr(fee_node, "get") else None
)
if fee_value is None:
status = "error"
error = f"费用节点 {fee_name} 中找不到属性: {fee_attribute}"
# 返回费用节点的所有可用属性名称
try:
if hasattr(fee_node, "keys"):
helper_info = list(fee_node.keys())
elif isinstance(fee_node, dict):
helper_info = list(fee_node.keys())
else:
# 对于Neo4j Node对象
helper_info = list(fee_node.keys()) if hasattr(fee_node, "keys") else []
# 过滤掉私有属性
helper_info = [attr for attr in helper_info if not attr.startswith("_")]
except Exception as e:
helper_info = [f"获取费用节点属性列表时出错: {str(e)}"]
return status, data, error, helper_info
# 成功获取属性值,只返回属性值
data = fee_value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"获取费用属性值时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 工程属性查询方法
def get_project_property(self, property_name):
"""
通过属性名称获取工程属性
Args:
property_name (str): 属性名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
status = "success"
data = {}
error = ""
helper_info = []
try:
# 构建查询,查找父节点类型为ProjectPropertySet下类型为ProjectProperty的节点
query = """
MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty)
WHERE property.name = $property_name
RETURN property
LIMIT 1
"""
params = {"property_name": property_name}
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"找不到名称为 '{property_name}' 的工程属性节点"
# 查询类似的节点名称作为辅助信息
similar_query = """
MATCH (parent:ProjectPropertySet)-[*1..1]->(property:ProjectProperty)
RETURN property.name AS name
LIMIT 5
"""
try:
similar_result = self.session.run(similar_query)
for similar_record in similar_result:
if similar_record["name"]:
helper_info.append(similar_record["name"])
except Exception as e:
helper_info = [f"查询类似节点时出错: {str(e)}"]
return status, data, error, helper_info
# 获取节点属性
property_node = record["property"]
# 将节点所有属性添加到返回数据中
for key, value in property_node.items():
if not key.startswith("_"):
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询工程属性时出错: {str(e)}"
return status, data, error, helper_info
# 项目划分查找取费表
def get_fee_table_by_project_division(
self, project_division_path, fee_name
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过项目划分路径和取费名称查找费用
Args:
project_division_path(str): 项目划分节点的路径
fee_name (str): 取费名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一步:查找项目划分路径下的ProjectDivisionItem节点
division_node = self.get_node_by_path(project_division_path, ["ProjectDivisionItem"])
if not division_node:
status = "error"
error = f"找不到路径: {project_division_path} 上的ProjectDivisionItem节点"
# 提取父路径
path_parts = project_division_path.split("/")
if len(path_parts) > 1:
parent_path = "/".join(path_parts[:-1])
parent_name = path_parts[-2]
# 获取父节点下所有ProjectDivisionItem节点名称作为辅助信息
query = """
MATCH (p)-[*1..1]->(q:ProjectDivisionItem)
WHERE p.name = $parent_name
RETURN q.name as name
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, **params)
for record in result:
if record["name"]:
helper_info.append(record["name"])
except Exception as e:
helper_info = [f"查询父节点下的子节点时出错: {str(e)}"]
return status, data, error, helper_info
# 第二步:在项目划分节点的子节点中查找属性name为"费用预览集"且关系为USE的节点
query = """
MATCH (p:ProjectDivisionItem)-[r:USE]->(c:CostSet)
WHERE id(p) = $division_node_id AND c.name = "费用预览集"
RETURN c
LIMIT 1
"""
params = {"division_node_id": division_node.id}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在项目划分节点下找不到name为'费用预览集'且关系为USE的CostSet节点"
# 查询项目划分节点下的所有子节点作为辅助信息
helper_query = """
MATCH (p:ProjectDivisionItem)-[r]->(c)
WHERE id(p) = $division_node_id
RETURN type(r) as relation_type, c.name as name, labels(c) as labels
LIMIT 5
"""
helper_params = {"division_node_id": division_node.id}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
relation_type = helper_record.get("relation_type", "未知关系")
name = helper_record.get("name", "未知名称")
labels = helper_record.get("labels", [])
helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}")
return status, data, error, helper_info
cost_set_node = record["c"]
except Exception as e:
status = "error"
error = f"查询CostSet节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点
query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE id(c) = $cost_set_id AND i.name = $fee_name
RETURN i
LIMIT 1
"""
params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点"
# 查询该CostSet下所有CostItem节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE id(c) = $cost_set_id
RETURN i.name AS name
LIMIT 5
"""
helper_params = {"cost_set_id": cost_set_node.id}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_item_node = record["i"]
# 将CostItem节点的所有属性添加到返回数据中
for key, value in cost_item_node.items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询CostItem节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 清单查找取费表
def get_fee_table_by_list(
self, parent_path, list_code, list_unit, fee_name
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过清单名称和取费名称查找费用
Args:
parent_path (str): 父级路径
list_code (str): 清单编号
list_unit (str): 清单单位
fee_name (str): 取费名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一步:查找父路径下类型为List的节点,且编码和单位匹配
query = """
MATCH (p)-[*0..10]->(l:List)
WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $list_unit
RETURN l
LIMIT 1
"""
params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "list_unit": list_unit}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {list_unit} 的List节点"
# 查询路径下的List节点作为辅助信息
helper_query = """
MATCH (p)-[*0..10]->(l:List)
WHERE p.name CONTAINS $parent_name
RETURN l.编码 AS code, l.单位 AS unit, l.name AS name
LIMIT 5
"""
helper_params = {"parent_name": parent_path.split("/")[-1]}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
code = helper_record.get("code", "")
unit = helper_record.get("unit", "")
name = helper_record.get("name", "")
helper_info.append(f"{code} - {unit} - {name}")
return status, data, error, helper_info
list_node = record["l"]
except Exception as e:
status = "error"
error = f"查询List节点时出错: {str(e)}"
return status, data, error, helper_info
# 第二步:在list节点的子节点中查找属性name为"费用预览集"且关系为USE的节点
query = """
MATCH (l:List)-[r:USE]->(c:CostSet)
WHERE id(l) = $list_node_id AND c.name = "费用预览集"
RETURN c
LIMIT 1
"""
params = {"list_node_id": list_node.id}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在List节点下找不到name为'费用预览集'且关系为USE的CostSet节点"
# 查询List节点下的所有子节点作为辅助信息
helper_query = """
MATCH (l:List)-[r]->(c)
WHERE id(l) = $list_node_id
RETURN type(r) as relation_type, c.name as name, labels(c) as labels
LIMIT 5
"""
helper_params = {"list_node_id": list_node.id}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
relation_type = helper_record.get("relation_type", "未知关系")
name = helper_record.get("name", "未知名称")
labels = helper_record.get("labels", [])
helper_info.append(f"关系类型: {relation_type}, 节点名称: {name}, 节点标签: {labels}")
return status, data, error, helper_info
cost_set_node = record["c"]
except Exception as e:
status = "error"
error = f"查询CostSet节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点
query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE id(c) = $cost_set_id AND i.name = $fee_name
RETURN i
LIMIT 1
"""
params = {"cost_set_id": cost_set_node.id, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在CostSet节点下找不到名称为 {fee_name} 的CostItem节点"
# 查询该CostSet下所有CostItem节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE id(c) = $cost_set_id
RETURN i.name AS name
LIMIT 5
"""
helper_params = {"cost_set_id": cost_set_node.id}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_item_node = record["i"]
# 将CostItem节点的所有属性添加到返回数据中
for key, value in cost_item_node.items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询CostItem节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 定额节点查找合价
def get_fee_table_by_quoto_code(
self, parent_path, quantity_type, code, fee_name
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过父级路径、工程量类型、定额编码和取费名称查找费用
Args:
parent_path (str): 父级路径
quantity_type (str): 工程量类型
code (str): 定额编码
fee_name (str): 取费名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一步:查找父路径下类型为Quota的节点,且编码匹配
# 根据工程量类型构建节点类型条件
node_type_condition = ""
if quantity_type == "定额":
node_type_condition = "(q:ProjectQuantity:Quota OR q.类型 = '0')"
else:
status = "error"
error = f"工程量类型必须为'定额'"
return status, data, error, helper_info
query = """
MATCH (p)-[*0..10]->(q)
WHERE p.name CONTAINS $parent_name AND q.编码 = $code AND {type_condition}
RETURN q
LIMIT 1
""".format(
type_condition=node_type_condition
)
params = {"parent_name": parent_path.split("/")[-1], "code": code}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在路径 {parent_path} 下找不到编码为 {code} 的Quota节点"
# 查询路径下的Quota节点作为辅助信息
helper_query = """
MATCH (p)-[*0..10]->(q)
WHERE p.name CONTAINS $parent_name AND {type_condition}
RETURN q.编码 AS code, q.name AS name
LIMIT 5
""".format(
type_condition=node_type_condition
)
helper_params = {"parent_name": parent_path.split("/")[-1]}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
code_val = helper_record.get("code", "")
name = helper_record.get("name", "")
helper_info.append(f"{code_val} - {name}")
return status, data, error, helper_info
quota_node = record["q"]
# 获取GUID
guid = quota_node.get("GUID")
if not guid:
status = "error"
error = f"Quota节点没有GUID属性"
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询Quota节点时出错: {str(e)}"
return status, data, error, helper_info
# 第二步:查找GUID与Quota的GUID匹配的CostSet节点
query = """
MATCH (c:CostSet)
WHERE c.name = $guid
RETURN c
LIMIT 1
"""
params = {"guid": guid}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"找不到与GUID: {guid} 匹配的CostSet节点"
# 查找一些CostSet节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)
RETURN c.name AS name
LIMIT 5
"""
helper_result = self.session.run(helper_query)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_set_node = record["c"]
except Exception as e:
status = "error"
error = f"查询CostSet节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点
query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE c.name = $guid AND i.name = $fee_name
RETURN i
LIMIT 1
"""
params = {"guid": guid, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点"
# 查询该CostSet下所有CostItem节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE c.name = $guid
RETURN i.name AS name
LIMIT 5
"""
helper_params = {"guid": guid}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_item_node = record["i"]
# 将CostItem节点的所有属性添加到返回数据中
for key, value in cost_item_node.items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询CostItem节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
def get_fee_table_by_quantities_name(
self, parent_path, quantity_type, quantity_name, fee_name
) -> Tuple[str, Dict[str, Any], str, List[Any]]:
"""
通过父级路径、工程量类型、工程量名称和取费名称查找费用
Args:
parent_path (str): 父级路径
quantity_type (str): 工程量类型
quantity_name (str): 工程量名称
fee_name (str): 取费名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 第一步:根据工程量类型构建节点类型条件
node_type_condition = ""
if quantity_type == "主材":
node_type_condition = "(q:ProjectQuantity:MainMaterial OR q.类型 = '1')"
elif quantity_type == "设备":
node_type_condition = "(q:ProjectQuantity:Equipment OR q.类型 = '5')"
else:
status = "error"
error = f"工程量类型必须为'主材'或'设备'"
helper_info = ["主材", "设备"]
return status, data, error, helper_info
# 查找父路径下指定类型的节点,且名称匹配
query = """
MATCH (p)-[*0..10]->(q)
WHERE p.name CONTAINS $parent_name AND q.name CONTAINS $quantity_name AND {type_condition}
RETURN q
LIMIT 1
""".format(
type_condition=node_type_condition
)
params = {"parent_name": parent_path.split("/")[-1], "quantity_name": quantity_name}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = (
f"在路径 {parent_path} 下找不到类型为 {quantity_type} 且名称包含 {quantity_name} 的工程量节点"
)
# 查询路径下的工程量节点作为辅助信息
helper_query = """
MATCH (p)-[*0..10]->(q)
WHERE p.name CONTAINS $parent_name AND {type_condition}
RETURN q.name AS name
LIMIT 5
""".format(
type_condition=node_type_condition
)
helper_params = {"parent_name": parent_path.split("/")[-1]}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
name = helper_record.get("name", "")
helper_info.append(name)
return status, data, error, helper_info
quantity_node = record["q"]
# 获取GUID
guid = quantity_node.get("GUID")
if not guid:
status = "error"
error = f"工程量节点没有GUID属性"
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询工程量节点时出错: {str(e)}"
return status, data, error, helper_info
# 第二步:查找GUID与工程量节点的GUID匹配的CostSet节点
query = """
MATCH (c:CostSet)
WHERE c.name = $guid
RETURN c
LIMIT 1
"""
params = {"guid": guid}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"找不到与GUID: {guid} 匹配的CostSet节点"
# 查找一些CostSet节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)
RETURN c.name AS name
LIMIT 5
"""
helper_result = self.session.run(helper_query)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_set_node = record["c"]
except Exception as e:
status = "error"
error = f"查询CostSet节点时出错: {str(e)}"
return status, data, error, helper_info
# 第三步:在CostSet节点的子节点中查找名称为fee_name的CostItem节点
query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE c.name = $guid AND i.name = $fee_name
RETURN i
LIMIT 1
"""
params = {"guid": guid, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在CostSet节点 {guid} 下找不到名称为 {fee_name} 的CostItem节点"
# 查询该CostSet下所有CostItem节点名称作为辅助信息
helper_query = """
MATCH (c:CostSet)-[*1..1]->(i:CostItem)
WHERE c.name = $guid
RETURN i.name AS name
LIMIT 5
"""
helper_params = {"guid": guid}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
if helper_record["name"]:
helper_info.append(helper_record["name"])
return status, data, error, helper_info
cost_item_node = record["i"]
# 将CostItem节点的所有属性添加到返回数据中
for key, value in cost_item_node.items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询CostItem节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info
# 材机节点查找市场价
def get_fee_by_material_equipment_code(self, parent_path, material_equipment_code, fee_name):
"""
通过父级路径、材机编码和取费名称查找费用
Args:
parent_path (str): 父级路径
material_equipment_code (str): 材机编码
fee_name (str): 取费名称
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
pass
# 查找清单节点
def get_fee_table_by_list_name(self, parent_path, list_code, unit):
"""
通过父级路径、清单编号和单位查找清单
Args:
parent_path (str): 父级路径
list_code (str): 清单编号
unit (str): 单位
Returns:
Tuple[str, Dict[str, Any], str, List[Any]]: 包含状态、数据、错误信息和辅助信息的元组
- status (str): 状态,'success'或'error'
- data (Dict[str, Any]): 成功时返回包含属性值的数据
- error (str): 错误时的错误信息
- helper_info (List[Any]): 辅助信息列表
"""
# 初始化返回结果
status = "success"
data = {}
error = ""
helper_info = []
try:
# 查找父路径下类型为List的节点,且编码和单位匹配
query = """
MATCH (p)-[*0..10]->(l:List)
WHERE p.name CONTAINS $parent_name AND l.编码 = $list_code AND l.单位 = $unit
RETURN l
LIMIT 1
"""
params = {"parent_name": parent_path.split("/")[-1], "list_code": list_code, "unit": unit}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
status = "error"
error = f"在路径 {parent_path} 下找不到编码为 {list_code} 且单位为 {unit} 的List节点"
# 查询路径下的List节点作为辅助信息
helper_query = """
MATCH (p)-[*0..10]->(l:List)
WHERE p.name CONTAINS $parent_name
RETURN l.编码 AS code, l.单位 AS unit, l.name AS name
LIMIT 5
"""
helper_params = {"parent_name": parent_path.split("/")[-1]}
helper_result = self.session.run(helper_query, **helper_params)
for helper_record in helper_result:
code = helper_record.get("code", "")
unit_val = helper_record.get("unit", "")
name = helper_record.get("name", "")
helper_info.append(f"{code} - {unit_val} - {name}")
return status, data, error, helper_info
list_node = record["l"]
# 将节点的所有属性添加到返回数据中
for key, value in list_node.items():
if not key.startswith("_"): # 排除私有属性
data[key] = value
return status, data, error, helper_info
except Exception as e:
status = "error"
error = f"查询List节点时出错: {str(e)}"
return status, data, error, helper_info
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"查询出错: {error_details}")
status = "error"
error = f"查询失败: {str(e)}"
return status, data, error, helper_info