Initial commit

This commit is contained in:
ruxia
2025-06-03 13:45:48 +08:00
commit c5f565d7df
11 changed files with 11910 additions and 0 deletions
+727
View File
@@ -0,0 +1,727 @@
[
{
"指标名称": "测试1",
"指标描述": "xxx",
"code": "xxxxxxxx"
},
{
"指标名称": "测试2",
"指标描述": "xxx",
"code": "xxxxxxxx"
},
{
"指标名称": "电压等级",
"指标描述": "",
"code": ""
},
{
"指标名称": "工程编码",
"指标描述": "",
"code": ""
},
{
"指标名称": "工程名称",
"指标描述": "",
"code": ""
},
{
"指标名称": "工程时间",
"指标描述": "",
"code": ""
},
{
"指标名称": "线路曲折系数",
"指标描述": "",
"code": ""
},
{
"指标名称": "线路长度合计_折单",
"指标描述": "",
"code": ""
},
{
"指标名称": "路径长度_单回路长度",
"指标描述": "",
"code": ""
},
{
"指标名称": "路径长度_双回路长度",
"指标描述": "",
"code": ""
},
{
"指标名称": "路径长度_三回路长度",
"指标描述": "",
"code": ""
},
{
"指标名称": "路径长度_四回路长度",
"指标描述": "",
"code": ""
},
{
"指标名称": "杆塔总基数",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/基础工程】的【定额】下的【YX2-1/YX2-2/YX2-3/YX2-4/YX2-5/YX2-6/YX2-7】",
"code": "def neo4j_find_function():\n project = ProjectBuilder.build()\n result = project.get_quantities_node_by_parent_and_code('工程数据/安装工程/安装/架空输电线路本体工程/基础工程', '定额', 'YX2-1/YX2-2/YX2-3/YX2-4/YX2-5/YX2-6/YX2-7')\n print(result)"
},
{
"指标名称": "角钢塔_塔基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "角钢塔_塔材量",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【角钢】",
"code": ""
},
{
"指标名称": "角钢塔_其中:高强钢塔材量",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【角钢、高强】",
"code": ""
},
{
"指标名称": "角钢塔_塔材装材费",
"指标描述": "",
"code": ""
},
{
"指标名称": "角钢塔_塔材装材费_元",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【角钢】",
"code": ""
},
{
"指标名称": "角钢塔_其中:高强钢塔材费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "角钢塔_其中:高强钢塔材费用_元",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【角钢、高强】",
"code": ""
},
{
"指标名称": "钢管塔_塔基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "钢管塔_塔材量",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【钢管塔】",
"code": ""
},
{
"指标名称": "钢管塔_钢管价格",
"指标描述": "",
"code": ""
},
{
"指标名称": "钢管塔_钢管价格_元",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【钢管塔】",
"code": ""
},
{
"指标名称": "钢管杆_塔基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "钢管杆_塔材量",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【钢管杆】",
"code": ""
},
{
"指标名称": "钢管杆_钢管价格",
"指标描述": "",
"code": ""
},
{
"指标名称": "钢管杆_钢管价格_元",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的【主材】下的【钢管杆】",
"code": ""
},
{
"指标名称": "水泥杆基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "直线塔基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "耐张转角塔基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "海拔",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_分裂数",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_单根导线面积",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_导线量",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_其中:节能导线量",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_导线装材费",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_导线装材费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_其中:节能导线费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_其中:节能导线费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "导线及线材_导线类型",
"指标描述": "",
"code": ""
},
{
"指标名称": "设计风速",
"指标描述": "",
"code": ""
},
{
"指标名称": "覆冰厚度",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_平地",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_丘陵",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_河网",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_泥沼",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_山地",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_高山",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_沙漠",
"指标描述": "",
"code": ""
},
{
"指标名称": "地形分布_峻岭",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_普通土",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_坚土",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_松砂石",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_水坑",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_泥水坑",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_流沙坑",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_岩石爆破",
"指标描述": "",
"code": ""
},
{
"指标名称": "地质条件_岩石人工",
"指标描述": "",
"code": ""
},
{
"指标名称": "土石方总量",
"指标描述": "",
"code": ""
},
{
"指标名称": "土石方量_基坑",
"指标描述": "",
"code": ""
},
{
"指标名称": "土石方量_接地",
"指标描述": "",
"code": ""
},
{
"指标名称": "土石方量_基面",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_台阶式",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_板式",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_插入式",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_掏挖",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_岩石嵌固",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_锚杆",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_灌注桩",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_人工挖孔桩",
"指标描述": "",
"code": ""
},
{
"指标名称": "各类基础数量占总塔基数比例_其他",
"指标描述": "",
"code": ""
},
{
"指标名称": "台阶式基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "板式基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "插入式基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "掏挖基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "岩石嵌固基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "锚杆基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "灌注桩基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "人工挖孔桩基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "其他基础基数",
"指标描述": "",
"code": ""
},
{
"指标名称": "基础混凝土总量",
"指标描述": "",
"code": ""
},
{
"指标名称": "灌注桩基础混凝土量",
"指标描述": "",
"code": ""
},
{
"指标名称": "现浇基础混凝土量",
"指标描述": "",
"code": ""
},
{
"指标名称": "挖孔基础混凝土量",
"指标描述": "",
"code": ""
},
{
"指标名称": "基础护壁混凝土用量",
"指标描述": "",
"code": ""
},
{
"指标名称": "预制混凝土用量",
"指标描述": "",
"code": ""
},
{
"指标名称": "基础钢材量",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/基础工程/基础砌筑】的【主材】下的【圆钢】",
"code": ""
},
{
"指标名称": "基础钢材价格",
"指标描述": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/基础工程/基础砌筑】的【主材】下的【圆钢】",
"code": ""
},
{
"指标名称": "本体费用合计",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程人工费",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程机械费",
"指标描述": "",
"code": ""
},
{
"指标名称": "基础工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "杆塔工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "接地工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "架线工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "附件工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "辅助工程费用",
"指标描述": "",
"code": ""
},
{
"指标名称": "辅助设施工程",
"指标描述": "",
"code": ""
},
{
"指标名称": "其他费用合计",
"指标描述": "",
"code": ""
},
{
"指标名称": "建场费合计",
"指标描述": "",
"code": ""
},
{
"指标名称": "项目建设管理费合计",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:工程监理费",
"指标描述": "",
"code": ""
},
{
"指标名称": "项目建设技术服务费合计",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:项目前期工作费",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:勘察费",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:设计费",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:工程建设检测费",
"指标描述": "",
"code": ""
},
{
"指标名称": "生产准备费",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:安全文明施工费",
"指标描述": "",
"code": ""
},
{
"指标名称": "基本预备费",
"指标描述": "",
"code": ""
},
{
"指标名称": "静态投资",
"指标描述": "",
"code": ""
},
{
"指标名称": "建设期利息",
"指标描述": "",
"code": ""
},
{
"指标名称": "动态投资",
"指标描述": "",
"code": ""
},
{
"指标名称": "增值税抵扣税额",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体费用合计_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程人工费_本体_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程人工费_调试_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程机械费_本体_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "本体工程机械费_调试_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "基础工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "杆塔工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "接地工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "架线工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "附件工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "辅助工程费用_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "辅助工程费用_调试_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "辅助设施工程_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其他费用合计_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "建场费合计_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "项目建设管理费合计_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:工程监理费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "项目建设技术服务费合计_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:项目前期工作费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:勘察费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:设计费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:工程建设检测费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "生产准备费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:安全文明施工费_线路_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "其中:安全文明施工费_调试_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "基本预备费_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "静态投资_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "建设期利息_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "动态投资_元",
"指标描述": "",
"code": ""
},
{
"指标名称": "增值税抵扣税额_元",
"指标描述": "",
"code": ""
}
]
File diff suppressed because it is too large Load Diff
+577
View File
@@ -0,0 +1,577 @@
from langchain.chains import LLMChain
from langchain_openai import OpenAI
from langchain_experimental.utilities import PythonREPL
from project_implementation import ProjectBuilder
from prompt_templates import FUNCTION_CALL_PROMPT
import inspect
import project
import io
import sys
from parameter_rewriting import rewrite_query_parameters, KnowledgeGraphProcessor
import json
from langchain.agents import Tool, AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from llm import llm
# 获取ProjectTookiIt类的方法定义
def get_project_class_methods():
"""
从project模块中提取ProjectTookiIt类的方法定义
Returns:
str: 格式化后的方法定义字符串
"""
project_class_code = inspect.getsource(project.ProjectTookiIt)
lines = project_class_code.split("\n")
result_lines = []
in_class = False
skip_init = False
for line in lines:
if line.strip().startswith("class ProjectTookiIt"):
in_class = True
result_lines.append(line)
elif in_class:
if line.strip().startswith("def __init__"):
skip_init = True
elif skip_init and line.strip() and not line.startswith(" " * 8):
skip_init = False
if not skip_init:
result_lines.append(line)
return "\n".join(result_lines)
# 创建动态提示模板
project_class_methods = get_project_class_methods()
# 创建 Chain
function_call_chain = LLMChain(llm=llm, prompt=FUNCTION_CALL_PROMPT, output_key="code")
# Python 执行器
repl = PythonREPL()
# 创建知识图谱处理器实例
kg_processor = KnowledgeGraphProcessor()
# 定义搜索知识库的工具
def search_knowledge_base(query):
"""
在知识库中搜索关键词
Args:
query (str): 搜索关键词
Returns:
str: 搜索结果的JSON字符串
"""
found_data = kg_processor._search_in_kg(query)
if found_data:
return json.dumps(found_data, ensure_ascii=False, indent=2)
else:
return f"未找到与'{query}'相关的信息"
# 定义获取节点定义的工具
def get_node_definition(node_type):
"""
获取节点类型的定义
Args:
node_type (str): 节点类型名称
Returns:
str: 节点类型定义
"""
definition = kg_processor._get_node_definition(node_type)
return definition
# 创建工具列表
tools = [
Tool(
name="search_knowledge_base",
func=search_knowledge_base,
description="在知识库中搜索关键词,返回相关信息。输入应该是一个搜索关键词。",
),
Tool(
name="get_node_definition",
func=get_node_definition,
description="获取节点类型的定义,返回类型定义代码。输入应该是一个节点类型名称。",
),
]
# 创建Agent
agent = create_react_agent(llm, tools, FUNCTION_CALL_PROMPT)
# 创建Agent执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
def nl_query_to_function_call(input_data):
"""
将自然语言查询转换为函数调用并执行,或直接执行提供的代码
Args:
input_data (dict): 包含type和value的字典
{
"type": "query|code",
"value": "查询内容或代码"
}
Returns:
dict: 包含状态码、消息和数据的字典
"""
input_type = input_data.get("type", "query")
input_value = input_data.get("value", "")
max_retries = 1 # 设置最大重试次数
current_retry = 0
original_query = input_value # 保存原始查询用于RAG
print(f"\n====== 开始处理查询 ======")
print(f"查询类型: {input_type}")
print(f"查询内容: {input_value}")
while current_retry <= max_retries:
print(f"\n----- 尝试 #{current_retry + 1} -----")
# 如果type是query,使用LLM生成代码
if input_type == "query" and current_retry == 0:
# 从查询中提取关键部分
import re
# 提取【】中的内容
path_parts = re.findall(r"【([^】]+)】", input_value)
# 创建临时代码
temp_code = f'search("{input_value}")'
# 对于每个路径,只添加最后一个部分
for part in path_parts:
if "/" in part:
# 提取路径中的最后一个部分
last_part = part.split("/")[-1].strip()
if last_part:
temp_code += f'\nsearch("{last_part}")'
else:
# 如果没有/,直接使用整个部分
temp_code += f'\nsearch("{part}")'
# 获取知识库内容和节点定义
knowledge_base, node_definitions = kg_processor._get_relevant_knowledge(temp_code)
# 使用Agent执行查询
agent_response = agent_executor.invoke(
{
"query": input_value,
"project_class_methods": project_class_methods,
"KnowledgeBase": knowledge_base,
"NodeDefinition": node_definitions,
}
)
# 从Agent响应中提取代码
code = agent_response["output"]
print(f"\n生成的代码:\n{code}")
else:
print(f"\n使用重写后的代码:\n{input_value}")
code = input_value
# 保存原始代码用于返回
original_code = code
# 执行生成的函数并捕获输出
try:
# 创建一个新的命名空间来执行代码,包含必要的导入
namespace = {
"ProjectBuilder": ProjectBuilder, # 添加ProjectBuilder到命名空间
"project_implementation": __import__("project_implementation"),
"project": __import__("project"),
}
# 执行生成的代码,定义neo4j_find_function函数
exec(code, namespace)
# 重定向stdout来捕获print输出
old_stdout = sys.stdout
redirected_output = io.StringIO()
sys.stdout = redirected_output
try:
# 执行函数
print("\n执行代码...")
result = namespace["neo4j_find_function"]()
# 获取捕获的输出
output = redirected_output.getvalue().strip()
print(f"\n原始输出:\n{output}")
# 检查结果是否为空
is_empty_result = (
not output
or output.lower() == "none"
or output == "[]"
or "未找到" in output
or "[]" in output
or "None" in output
or result is None
)
# 如果结果为空,走重写流程
if is_empty_result:
print("\n查询未找到结果,尝试定位具体缺失节点...")
# 解析原始查询路径中的最后一个节点名
import re
match = re.search(r"【([^】]+)】\s*$", original_query)
missing_node = match.group(1) if match else "未知节点"
error_info = {
"error_type": "NodeNotFoundError",
"error_message": f"{missing_node} 未找到,请检查该节点是否存在。",
"missing_node": missing_node,
"original_query": original_query,
"executed_code": original_code,
}
print("结构化错误信息:")
print(json.dumps(error_info, ensure_ascii=False, indent=2))
if current_retry < max_retries:
print("\n尝试使用RAG重写查询...")
try:
# 使用RAG重写查询和代码,并传递错误信息
rewritten = rewrite_query_parameters(original_query, original_code, error_info)
print(f"\nRAG重写结果: {json.dumps(rewritten, ensure_ascii=False, indent=2)}")
# 更新查询和代码
if "query" in rewritten and "code" in rewritten and rewritten["code"] != original_code:
print("\nRAG重写成功,使用新代码重试...")
input_value = rewritten["code"] # 直接使用重写后的代码
input_type = "code" # 切换到代码模式
current_retry += 1
continue # 继续下一次循环
else:
print("\nRAG重写未产生新代码,返回原始错误")
except Exception as e:
print(f"\nRAG重写失败: {e}")
# 记录错误但继续执行
# RAG重写失败或未产生新代码,返回原始错误
query_status = (
"第一次查询失败,RAG重写也失败"
if current_retry == 0
else f"{current_retry+1}次查询失败,RAG重写也失败"
)
print(f"\n{query_status}")
return {
"code": 1,
"message": f"{missing_node} 未找到,请检查该节点是否存在。",
"data": {"value": "", "code": original_code},
"error_info": error_info,
"query_status": query_status,
}
# 清理输出,只保留有用的结果部分
clean_output = output
# 如果输出包含查询结果数量和对象引用
if "查询结果数量:" in output and "<project." in output:
# 提取查询结果部分
import re
# 尝试提取节点属性
node_match = re.search(r"找到节点: <Node.*?properties=({.*?})>", output, re.DOTALL)
if node_match:
props_str = node_match.group(1).replace("'", '"')
try:
import ast
props = ast.literal_eval(props_str)
clean_output = json.dumps(props, ensure_ascii=False, indent=2)
except:
pass
# 如果有查询结果数量信息
count_match = re.search(r"查询结果数量: (\d+)", output)
if count_match:
count = count_match.group(1)
if count == "0":
clean_output = "未找到匹配的数据。"
is_empty_result = True
elif not node_match: # 如果没有提取到节点属性但有结果
clean_output = f"找到 {count} 条匹配结果"
# 检查结果对象
if result is not None:
if isinstance(result, list):
if not result: # 空列表
is_empty_result = True
else:
# 处理非空列表
formatted_items = []
for item in result:
if hasattr(item, "__dict__"):
# 提取对象的所有属性
attrs = {k: v for k, v in item.__dict__.items() if not k.startswith("_")}
formatted_items.append(attrs)
else:
formatted_items.append(str(item))
if not is_empty_result: # 只有在不是空结果时才返回成功
query_status = (
"第一次查询成功"
if current_retry == 0
else f"{current_retry+1}次查询成功(RAG重写后)"
)
print(f"\n{query_status}")
return {
"code": 0,
"message": "成功",
"data": {
"value": json.dumps(formatted_items, ensure_ascii=False, indent=2),
"code": original_code,
},
"query_status": query_status,
}
elif hasattr(result, "__dict__"):
# 单个对象
attrs = {k: v for k, v in result.__dict__.items() if not k.startswith("_")}
if not is_empty_result: # 只有在不是空结果时才返回成功
query_status = (
"第一次查询成功" if current_retry == 0 else f"{current_retry+1}次查询成功(RAG重写后)"
)
print(f"\n{query_status}")
return {
"code": 0,
"message": "成功",
"data": {
"value": json.dumps(attrs, ensure_ascii=False, indent=2),
"code": original_code,
},
"query_status": query_status,
}
# 如果没有对象属性但有清理后的输出,且不是空结果
if (
clean_output
and clean_output.lower() != "none"
and clean_output != "[]"
and "未找到" not in clean_output
and not is_empty_result
):
query_status = (
"第一次查询成功" if current_retry == 0 else f"{current_retry+1}次查询成功(RAG重写后)"
)
print(f"\n{query_status}")
return {
"code": 0,
"message": "成功",
"data": {"value": clean_output, "code": original_code},
"query_status": query_status,
}
finally:
# 恢复stdout
sys.stdout = old_stdout
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"\n执行代码时出错: {error_details}")
# 如果走到这里,说明结果为空或未找到匹配项,应该执行RAG重写流程
print("\n查询未找到结果,尝试定位具体缺失节点...")
# 解析原始查询路径中的最后一个节点名
import re
match = re.search(r"【([^】]+)】\s*$", original_query)
missing_node = match.group(1) if match else "未知节点"
error_info = {
"error_type": "NodeNotFoundError",
"error_message": f"{missing_node} 未找到,请检查该节点是否存在。",
"missing_node": missing_node,
"original_query": original_query,
"executed_code": original_code,
}
print("结构化错误信息:")
print(json.dumps(error_info, ensure_ascii=False, indent=2))
if current_retry < max_retries:
print("\n尝试使用RAG重写查询...")
try:
# 使用RAG重写查询和代码,并传递错误信息
rewritten = rewrite_query_parameters(original_query, original_code, error_info)
print(f"\nRAG重写结果: {json.dumps(rewritten, ensure_ascii=False, indent=2)}")
# 更新查询和代码
if "query" in rewritten and "code" in rewritten and rewritten["code"] != original_code:
print("\nRAG重写成功,使用新代码重试...")
input_value = rewritten["code"] # 直接使用重写后的代码
input_type = "code" # 切换到代码模式
current_retry += 1
continue # 继续下一次循环
else:
print("\nRAG重写未产生新代码,返回原始错误")
except Exception as e:
print(f"\nRAG重写失败: {e}")
# 记录错误但继续执行
# RAG重写失败或未产生新代码,返回原始错误
query_status = (
"第一次查询失败,RAG重写也失败"
if current_retry == 0
else f"{current_retry+1}次查询失败,RAG重写也失败"
)
print(f"\n{query_status}")
return {
"code": 1,
"message": f"{missing_node} 未找到,请检查该节点是否存在。",
"data": {"value": "", "code": original_code},
"error_info": error_info,
"query_status": query_status,
}
# 如果所有重试都失败
print("\n所有重试都失败,无法找到匹配的结果")
query_status = "所有重试都失败"
return {
"code": 1,
"message": "所有重试都失败,无法找到匹配的结果",
"data": {"value": "", "code": original_code},
"query_status": query_status,
}
def format_result(result):
"""
格式化查询结果
Args:
result: 查询结果(可能为 list、dict 或其他类型)
Returns:
str: 格式化后的结果
"""
# 处理 project 对象
if hasattr(result, "__module__") and result.__module__ == "project":
# 这是一个 project 模块中的对象
attrs = {k: v for k, v in result.__dict__.items() if not k.startswith("_")}
return json.dumps(attrs, ensure_ascii=False, indent=2)
# 处理 project 对象列表
if isinstance(result, list) and all(
hasattr(item, "__module__") and item.__module__ == "project" for item in result if hasattr(item, "__module__")
):
formatted_items = []
for item in result:
if hasattr(item, "__dict__"):
attrs = {k: v for k, v in item.__dict__.items() if not k.startswith("_")}
formatted_items.append(attrs)
else:
formatted_items.append(str(item))
return json.dumps(formatted_items, ensure_ascii=False, indent=2)
# 如果结果是字符串,可能包含调试信息,需要提取有用部分
if isinstance(result, str):
# 尝试提取最终结果部分
if "[]" in result:
return "未找到匹配的数据。"
# 如果包含节点信息,提取关键部分
import re
node_match = re.search(r"找到.*?labels=.*?properties=(.*?)>", result)
if node_match:
try:
# 提取属性部分并格式化
props_str = node_match.group(1).replace("'", '"')
import ast
props = ast.literal_eval(props_str)
formatted = "找到节点:\n"
for k, v in props.items():
formatted += f" {k}: {v}\n"
return formatted
except:
pass
# 如果包含查询结果数量
count_match = re.search(r"查询结果数量: (\d+)", result)
if count_match:
count = count_match.group(1)
if count == "0":
return "未找到匹配的数据。"
# 如果是列表
if isinstance(result, list):
if not result:
return "未找到匹配的数据。"
lines = [f"找到 {len(result)} 条匹配结果:"]
for i, item in enumerate(result, 1):
lines.append(f"\n结果 {i}:")
if hasattr(item, "items"): # 检查是否有items方法(字典或类似字典的对象)
try:
for k, v in item.items():
lines.append(f" {k}: {v}")
except:
lines.append(f" {item}")
else:
lines.append(f" {item}")
return "\n".join(lines)
# 如果是字典
elif isinstance(result, dict):
lines = ["查询结果:"]
for k, v in result.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
# 其他类型
else:
return str(result)
def format_dict_or_item(item):
"""
格式化字典或其他对象
Args:
item: 字典或其他对象
Returns:
str: 格式化后的字符串
"""
if isinstance(item, dict):
formatted = ""
for key, value in item.items():
formatted += f" {key}: {value}\n"
return formatted
return str(item)
question = {
"type": "query",
"value": "查找一下【工程数据/安装工程/安装/架空输电线路本体工程/杆塔工程/杆塔组立/铁塔、钢管杆组立】的类型为【主材】的【阿巴阿巴】",
}
result = nl_query_to_function_call(question)
print(result)
+111
View File
@@ -0,0 +1,111 @@
import requests
from typing import Any, Dict, List, Optional
from langchain_core.language_models import BaseLLM
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.outputs import LLMResult, Generation
from openai import OpenAI
import httpx
import logging
# class OpenAiLLM:
# def __init__(self, url, api_key, model_name):
# self._api_key = api_key
# self._url = url
# self._model = model_name
# def generate(self, prompt):
# client = OpenAI(api_key=self._api_key, base_url=self._url)
# try:
# # 创建 Completion 请求
# completion = client.chat.completions.create(
# model=self._model,
# messages=[
# {"role": "system", "content": "You are a helpful assistant"},
# {"role": "user", "content": prompt}
# ],
# timeout=httpx.Timeout(300.0),
# temperature=0.7,
# )
# return completion.choices[0].message.content
# except Exception as e:
# logging.error(f"LLM调用出错: {e}")
# return f"模型调用失败: {str(e)}"
# llm = OpenAiLLM(
# url="http://172.20.0.145:9995/v1",
# api_key="xxx",
# model_name="deepseek-r1-distill-qwen2.5-32b",
# )
class Embedding:
def __init__(self, url, api_key, model_name):
self._api_key = api_key
self._url = url
self._model = model_name
def embed(self, text):
# 使用OpenAI客户端
client = OpenAI(api_key=self._api_key, base_url=self._url)
try:
# 调用embeddings API
response = client.embeddings.create(model=self._model, input=text, timeout=httpx.Timeout(60.0))
# 返回嵌入向量
return response.data[0].embedding
except Exception as e:
logging.error(f"嵌入模型调用出错: {e}")
raise RuntimeError(f"嵌入请求失败: {str(e)}")
embedding = Embedding(url="http://10.1.16.39:9995/v1", api_key="xxx", model_name="bge-m3")
class SiliconFlowLLM(BaseLLM):
"""自定义硅基流动大模型调用类"""
api_url: str
api_key: str
model: str
def _generate(
self,
prompts: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> LLMResult:
from langchain_core.outputs import Generation, LLMResult
generations = []
for prompt in prompts:
try:
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
}
response = requests.post(self.api_url, json=payload, headers=headers)
response.raise_for_status()
text = response.json()["choices"][0]["message"]["content"]
generations.append([Generation(text=text)])
except Exception as e:
raise Exception(f"调用硅基流动API失败: {str(e)}")
return LLMResult(generations=generations)
@property
def _llm_type(self) -> str:
return "siliconflow"
llm = SiliconFlowLLM(
api_url="https://api.siliconflow.cn/v1/chat/completions",
api_key="sk-bbeamiumkouptsrueilgufqqyuumelcsivxwjbdugqwsqhwj",
model="Qwen/Qwen2.5-72B-Instruct",
)
+46
View File
@@ -0,0 +1,46 @@
from langchain_neo4j import nl_query_to_function_call
def main():
input_str = "杆塔总基数"
import json
# 打开并读取 JSON 文件
with open("./data/data.json", "r", encoding="utf-8") as f:
json_data = json.load(f)
temp_result = None
for item in json_data:
if item["指标名称"] == input_str:
temp_result = item
break
if temp_result:
if temp_result["code"] == "":
question = {
"type": "query",
"value": temp_result["指标描述"],
}
result = nl_query_to_function_call(question)
if result["message"] == "成功":
code_result = result["data"]["code"]
temp_result["code"] = code_result
with open("./data/data.json", "w", encoding="utf-8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
print(f"已更新 code 字段")
else:
question = {
"type": "code",
"value": temp_result["code"],
}
result = nl_query_to_function_call(question)
else:
pass
# 示例使用
if __name__ == "__main__":
main()
+383
View File
@@ -0,0 +1,383 @@
import json
import ast
import inspect
from typing import Dict, Any, List, Optional, Union
from langchain.agents import Tool, initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import BaseOutputParser
from langchain.schema import BaseRetriever
from langchain.callbacks.manager import CallbackManagerForRetrieverRun
from langchain.schema import Document
from langchain.chains import LLMChain
from langchain.llms.base import BaseLLM
from prompt_templates import FUNCTION_RETURNS_LOOP_PROMPT
from llm import llm as base_llm
import project # 明确导入project模块
class CodeOnlyOutputParser(BaseOutputParser):
def parse(self, text: str) -> dict:
if "Final Answer:" in text:
code_part = text.split("Final Answer:")[-1].strip()
else:
code_part = text.strip()
return {"code": code_part}
def get_format_instructions(self) -> str:
return "只输出最终的Python代码,不要包含其他解释或内容。"
class KnowledgeGraphProcessor:
"""知识图谱处理器"""
def __init__(self, kg_file="kg_simple_hierarchy.json"):
"""初始化处理器"""
self.kg_data = self._load_kg_data(kg_file)
self.prompt = FUNCTION_RETURNS_LOOP_PROMPT
def _load_kg_data(self, file_path: str) -> Dict[str, Any]:
"""加载知识图谱数据"""
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
print(f"警告: 未找到 {file_path} 文件")
return {}
except Exception as e:
print(f"加载知识图谱数据时出错: {e}")
return {}
def _extract_parameters_from_code(self, code: str) -> List[str]:
"""从代码中提取字符串参数"""
try:
tree = ast.parse(code)
parameters = []
for node in ast.walk(tree):
if isinstance(node, ast.Str):
parameters.append(node.s)
elif isinstance(node, ast.Constant) and isinstance(node.value, str):
parameters.append(node.value)
return parameters
except Exception as e:
print(f"解析代码时出错: {e}")
return []
def _find_node_by_path(self, path: str, data: Union[Dict[str, Any], List[Any]] = None) -> Optional[Dict[str, Any]]:
"""
根据路径查找节点
Args:
path: 节点路径,格式如 "工程数据/安装过程/安装/架空输电线路本体工程/杆塔工程"
data: 当前数据节点
Returns:
找到的节点及其子节点
"""
if data is None:
data = self.kg_data
path_parts = path.split("/")
current_part = path_parts[0]
# 如果data是列表,则遍历列表中的每个元素
if isinstance(data, list):
for item in data:
result = self._find_node_by_path(path, item)
if result:
return result
return None
# 如果data是字典,则按原来的逻辑处理
if isinstance(data, dict):
# 检查当前节点是否匹配
found_node = None
for key, value in data.items():
if isinstance(value, str) and current_part in value:
found_node = data
break
# 如果在当前层级找到了节点
if found_node:
# 如果还有更深的路径部分,则继续递归查找
if len(path_parts) > 1:
# 查找子节点
if "children" in found_node:
for child in found_node["children"]:
result = self._find_node_by_path("/".join(path_parts[1:]), child)
if result:
return result
return None
else:
# 已经找到最终节点
return found_node
# 如果当前层级没找到,则查找子节点
if "children" in data:
for child in data["children"]:
result = self._find_node_by_path(path, child)
if result:
return result
return None
def _search_in_kg(self, parameter: str, data: Union[Dict[str, Any], List[Any]] = None) -> Optional[Dict[str, Any]]:
"""在知识图谱中递归搜索参数"""
# 先尝试按路径查找
if "/" in parameter:
result = self._find_node_by_path(parameter)
if result:
return result
# 如果路径查找失败,则按关键词搜索
if data is None:
data = self.kg_data
# 如果是列表,遍历每个元素
if isinstance(data, list):
for item in data:
if isinstance(item, (dict, list)):
result = self._search_in_kg(parameter, item)
if result:
return result
return None
# 如果是字典,按原来的逻辑处理
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, str) and parameter in value:
return data
elif isinstance(value, list):
for item in value:
if isinstance(item, (dict, list)):
result = self._search_in_kg(parameter, item)
if result:
return result
elif isinstance(value, dict):
result = self._search_in_kg(parameter, value)
if result:
return result
return None
def _get_node_definition(self, node_type: str) -> str:
"""获取节点类型定义"""
try:
# 检查project模块中是否存在该类
if hasattr(project, node_type):
cls = getattr(project, node_type)
source = inspect.getsource(cls)
return source
else:
return f"未找到类型 {node_type} 的定义"
except NameError:
return f"获取类型 {node_type} 定义时出错: project模块未正确导入"
except Exception as e:
return f"获取类型 {node_type} 定义时出错: {e}"
def _extract_node_types(self, data: Dict[str, Any], node_types: set):
"""递归提取所有节点类型"""
if not isinstance(data, dict):
return
for key, value in data.items():
# 所有键都可能是节点类型
node_types.add(key)
if isinstance(value, dict):
self._extract_node_types(value, node_types)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
self._extract_node_types(item, node_types)
def _get_relevant_knowledge(self, code: str) -> tuple[str, str]:
"""根据代码获取相关知识库内容和节点定义"""
parameters = self._extract_parameters_from_code(code)
knowledge_base = ""
node_definitions = ""
for param in parameters:
# 处理【】格式的路径,提取最后一个部分
if "" in param:
path_parts = []
parts = param.split("")
for part in parts:
if "" in part:
clean_part = part.split("")[0].strip()
if clean_part:
path_parts.append(clean_part)
# 对于每个路径,只取最后一个部分
for path in path_parts:
if "/" in path:
# 提取路径中的最后一个部分
last_part = path.split("/")[-1].strip()
if last_part:
found_data = self._search_in_kg(last_part)
if found_data:
knowledge_base += f"节点 '{last_part}' 相关信息:\n"
knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n"
# 提取节点类型
node_types = set()
self._extract_node_types(found_data, node_types)
for node_type in node_types:
definition = self._get_node_definition(node_type)
if definition and "未找到" not in definition and "出错" not in definition:
node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n"
else:
# 如果没有/,直接使用整个部分
found_data = self._search_in_kg(path)
if found_data:
knowledge_base += f"节点 '{path}' 相关信息:\n"
knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n"
# 提取节点类型
node_types = set()
self._extract_node_types(found_data, node_types)
for node_type in node_types:
definition = self._get_node_definition(node_type)
if definition and "未找到" not in definition and "出错" not in definition:
node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n"
else:
# 处理普通参数
found_data = self._search_in_kg(param)
if found_data:
knowledge_base += f"参数 '{param}' 相关信息:\n"
knowledge_base += json.dumps(found_data, ensure_ascii=False, indent=2) + "\n\n"
# 提取节点类型
node_types = set()
self._extract_node_types(found_data, node_types)
for node_type in node_types:
definition = self._get_node_definition(node_type)
if definition and "未找到" not in definition and "出错" not in definition:
node_definitions += f"类型 {node_type} 定义:\n{definition}\n\n"
return knowledge_base, node_definitions
def process_query(
self, original_query: str, original_code: str, error_info: Dict[str, str] = None
) -> Dict[str, str]:
"""
处理查询并尝试修复错误
Args:
original_query: 原始查询字符串
original_code: 原始代码字符串
error_info: 错误信息字典 (可选)
Returns:
Dict: 包含修改后的query和code的字典
"""
print("\n====== RAG重写查询开始 ======")
print(f"原始查询: {original_query}")
print(f"原始代码: {original_code}")
print(f"错误信息: {error_info}")
# 初始化工具
tools = [
Tool(
name="search_knowledge_base",
func=lambda x: json.dumps(self._search_in_kg(x), ensure_ascii=False),
description="用于在知识图谱中搜索节点信息",
),
Tool(
name="get_node_definition",
func=lambda x: self._get_node_definition(x),
description="获取指定类型的节点定义",
),
]
# 使用基础 LLM
llm = base_llm
# 初始化 Agent
agent_executor = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
handle_parsing_errors=True,
prompt=self.prompt,
output_parser=CodeOnlyOutputParser(),
)
# 构造输入变量
knowledge_base, node_definitions = self._get_relevant_knowledge(original_code)
print(f"知识库内容: {knowledge_base}")
print(f"节点定义: {node_definitions}")
input_vars = {
"input": original_query,
"original_query": original_query,
"error_info": json.dumps(error_info, ensure_ascii=False) if error_info else "",
"original_code": original_code,
"KnowledgeBase": knowledge_base,
"NodeDefinition": node_definitions,
}
print(f"Agent输入变量: {input_vars}")
# 调用 Agent 执行
response = agent_executor.invoke(input_vars)
print(f"Agent响应: {response}")
# 处理不同的返回格式
if isinstance(response, dict):
if "output" in response:
if isinstance(response["output"], dict) and "code" in response["output"]:
return {"query": original_query, "code": response["output"]["code"]}
elif isinstance(response["output"], str):
return {"query": original_query, "code": response["output"]}
# 如果无法解析,返回原始代码
print(f"警告: 无法解析 Agent 响应,返回原始代码")
return {"query": original_query, "code": original_code}
# 全局处理器实例(延迟初始化)
_processor = None
def _get_processor():
"""获取处理器实例(单例模式)"""
global _processor
if _processor is None:
_processor = KnowledgeGraphProcessor()
return _processor
def rewrite_query_parameters(
original_query: str, original_code: str, error_info: Dict[str, str] = None
) -> Dict[str, str]:
"""
重写查询参数的外部接口函数
Args:
original_query: 原始查询字符串
original_code: 原始代码字符串
error_info: 错误信息字典 (可选)
Returns:
Dict: 包含修改后的query和code的字典
Example:
result = rewrite_query_parameters("查询动态费用", 'search_node("动态费用")')
print(result["query"]) # 修改后的查询
print(result["code"]) # 修改后的代码
"""
processor = _get_processor()
return processor.process_query(original_query, original_code, error_info)
+613
View File
@@ -0,0 +1,613 @@
"""
软件知识图谱类定义
根据Ontology_Layer.txt文件中的知识图谱信息创建
"""
class ProjectTookiIt:
"""
项目类
描述: 代表整个项目结构的顶层容器,包含项目划分集
"""
def __init__(self):
self.project_division_set = ProjectDivisionSet() # 项目划分集对象
def get_division_item_by_path(self, path):
"""
通过路径获取项目划分对象
Args:
path (str): 以'/'分隔的多级项目划分名称路径
Returns:
ProjectDivisionItem|None: 对应的项目划分对象,如果路径不存在返回None,成功找到返回项目划分对象
Note:
当路径为空字符串时,会返回None
"""
names = path.split("/")
if not names:
raise ValueError("路径不能为空")
try:
current = self.project_division_set.get_division_item(names[0])
except KeyError:
return None
for name in names[1:]:
found = False
for child in current.children:
if child.项目名称 == name:
current = child
found = True
break
if not found:
return None
return current
def get_quantities_by_paths(self, paths_str, quantity_type=None, keyword=None):
"""
获取多个项目路径下的工程量对象
Args:
paths_str (str): 以'@@'分隔的多个项目路径字符串,每个路径以'/'分隔层级
quantity_type (str): 工程量类型('定额''主材''设备'或None表示所有类型)
keyword (str): 工程量代码或名称中的关键字(可选)
Returns:
list: 包含所有匹配的工程量对象的列表
"""
paths = paths_str.split("@@") if paths_str else []
results = []
for path in paths:
item = self.get_division_item_by_path(path.strip())
if item is None:
continue
def collect_quantities(node):
for quantity in node.get_quantities():
# 检查类型匹配
type_matched = True
if quantity_type:
if quantity_type == "定额" and not isinstance(quantity, Ration):
type_matched = False
elif quantity_type == "主材" and not isinstance(quantity, Material):
type_matched = False
elif quantity_type == "设备" and not isinstance(quantity, Equipment):
type_matched = False
# 检查关键字匹配
keyword_matched = True
if keyword:
keyword_matched = (keyword in (quantity.编码 or "")) or (keyword in (quantity.name or ""))
if type_matched and keyword_matched:
results.append(quantity)
for child in node.children:
collect_quantities(child)
collect_quantities(item)
return results
class ProjectDivisionSet:
"""
项目划分集
描述: 代表projectDivision下的第一层键和第二层建,例如"安装工程""安装"
JSON对应: projectDivision下的直接键名,如"安装工程": {...}、"安装": [...]
"""
def __init__(self):
self.category_name = None # xsd:string
self.division_items = {} # 存储项目划分项的字典,键为项目名称
def get_division_item(self, item_name):
"""
Get project division item by name
Args:
item_name (str): The name of item to find
Returns:
ProjectDivisionItem: The corresponding division item object
Raises:
KeyError: When item name does not exist
"""
if item_name not in self.division_items:
raise KeyError(f'Item name "{item_name}" does not exist')
return self.division_items[item_name]
# 工程属性查询方法
@abstractmethod
def get_project_property_by_name(self, name):
"""
通过名称获取工程属性
Args:
name (str): 工程属性名称
Returns:
str|None: 对应的工程属性值,如果不存在返回None
"""
pass
@abstractmethod
def get_project_property_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊名称获取工程属性
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
dict: 包含所有匹配的工程属性的字典,键为属性名,值为属性值
"""
pass
class ProjectDivisionItem:
"""
项目划分项
描述: 代表项目结构中的层级条目,具有自身的属性,并且可以包含子项目划分项或详细工作项
JSON对应: ProjectDivisionSet数组中的对象,或ProjectDivisionItem的children数组中type为"项目划分"的对象
"""
def __init__(self):
self.GUID = None # xsd:string
self.id = None # xsd:string
self.name = None # xsd:string
self.代码 = None # xsd:string (可选)
self.费率 = None # xsd:string
self.单位 = None # xsd:string (可选)
self.取费表id = None # xsd:string
self.颜色标记 = None # xsd:string
self.取费表 = None # xsd:string
self.合价含税 = None # xsd:string (可选)
self.type = None # xsd:string
self.专业类型 = None # xsd:string
self.资源库列表 = None # xsd:list (可选)
self.notCheck = None # xsd:string (可选)
self.children = [] # 存储子项目划分项的数组
self.quantities = [] # 存储工程量对象的数组
self.cost_set = None # xsd:CostSet
def get_cost_set(self):
"""
获取费用集对象
Returns:
CostSet: 关联的费用集对象
"""
return self.cost_set
def add_child(self, child_item):
"""
添加子项目划分项
Args:
child_item (ProjectDivisionItem): 子项目对象
Raises:
ValueError: 当child_item不是ProjectDivisionItem类型时
"""
if not isinstance(child_item, ProjectDivisionItem):
raise ValueError("child_item必须是ProjectDivisionItem类型")
self.children.append(child_item)
def get_child(self, index):
"""
根据索引获取子项目划分项
Args:
index (int): 子项目索引
Returns:
ProjectDivisionItem: 对应的子项目对象
Raises:
IndexError: 当索引超出范围时
"""
if index < 0 or index >= len(self.children):
raise IndexError(f'索引"{index}"超出范围')
return self.children[index]
def traverse(self, callback, depth=0):
"""
遍历树形结构
Args:
callback (function): 回调函数,接收当前节点和深度作为参数
depth (int): 当前深度,默认为0
"""
callback(self, depth)
for child in self.children:
child.traverse(callback, depth + 1)
def get_quantities(self):
"""
获取当前节点的ProjectQuantity及其子类(Ration, Material, Equipment)的实例
Returns:
list: 包含当前节点ProjectQuantity实例的列表
"""
return self.quantities
class ProjectQuantity:
"""
工程量
描述: 代表项目划分项(ProjectDivisionItem)下的具体工作单元或物料项,是定额、主材、设备的父类型
"""
def __init__(self):
self.id = None # xsd:string
self.类型 = None # xsd:string ("0"为定额,"1"为主材,"5"为设备)
self.name = None # xsd:string
self.编码 = None # xsd:string
self.单位 = None # xsd:string
self.数量 = None # xsd:string
self.资源库名称 = None # xsd:string
self.投标数量 = None # xsd:string (可选)
self.投标单价 = None # xsd:string (可选)
self.特征段 = None # xsd:string (可选)
self.关联父级量 = None # xsd:string (可选)
self.颜色标记 = None # xsd:string (可选)
self.单价不含税 = None
self.cost_set = None # xsd:CostSet
def get_cost_set(self):
"""
获取费用集对象
Returns:
CostSet: 关联的费用集对象
"""
return self.cost_set # xsd:string (可选)
class Ration(ProjectQuantity):
"""
定额
描述: 代表一种标准的工程量条目,通常包含详细的工、料、机消耗标准
"""
def __init__(self):
super().__init__()
self.计算式 = None # xsd:string
self.中标计算式 = None # xsd:string
self.人工费 = None # xsd:string
self.机械费 = None # xsd:string
self.甲供材料费不含税 = None # xsd:string
self.材料费 = None # xsd:string
self.定额系数 = None # xsd:string
self.人工系数 = None # xsd:string
self.材料系数 = None # xsd:string
self.机械系数 = None # xsd:string
self.定额范围 = None # xsd:string
self.定额章节名称 = None # xsd:string
self.费用类型 = None # xsd:string
self.甲供材料费含税 = None # xsd:string
self.投标合价 = None # xsd:string
self.其中甲供材料费 = None # xsd:string
self.合价不含税 = None # xsd:string
self.基价 = None # xsd:string
self.所属定额库 = None # xsd:string
class Material(ProjectQuantity):
"""
主材
描述: 代表工程中使用的主要材料
"""
def __init__(self):
super().__init__()
self.规格型号 = None # xsd:string
self.损耗率 = None # xsd:string
self.供货方 = None # xsd:string
self.集中配送 = None # xsd:string (可选)
self.单重 = None # xsd:string (可选)
self.市场价不含税 = None # xsd:string
self.市场价含税 = None # xsd:string
self.单价含税 = None # xsd:string
self.结算市场价不含税 = None # xsd:string (可选)
self.结算市场价含税 = None # xsd:string (可选)
self.基准价不含税 = None # xsd:string (可选)
self.基准价含税 = None # xsd:string (可选)
self.费用类型 = None # xsd:string
self.增值税率 = None # xsd:string (可选)
self.合价含税 = None # xsd:string
self.合价不含税 = None # xsd:string
self.线重 = None # xsd:string (可选)
self.制造长度 = None # xsd:string (可选)
self.截面积 = None # xsd:string (可选)
class Equipment(ProjectQuantity):
"""
设备
描述: 代表工程中安装或使用的设备
"""
def __init__(self):
super().__init__()
self.规格型号 = None # xsd:string
self.供货方 = None # xsd:string
self.运杂费率 = None # xsd:string (可选)
self.单价含税 = None # xsd:string
self.设备类型 = None # xsd:string (可选)
self.增值税率 = None # xsd:string (可选)
self.合价含税 = None # xsd:string
self.合价不含税 = None # xsd:string
class MaterialOrEquipment:
"""
材机
描述: 代表DetailedWorkItem中所列出的具体材料、人工或机械设备及其详细信息
"""
def __init__(self):
self.id = None # xsd:string
self.编码 = None # xsd:string
self.名称 = None # xsd:string
self.单位 = None # xsd:string
self.类型 = None # xsd:string
self.供货方 = None # xsd:string
self.预算价不含税 = None # xsd:string
self.市场价不含税 = None # xsd:string
self.预算价含税 = None # xsd:string
self.市场价含税 = None # xsd:string
self.结算预算价不含税 = None # xsd:string
self.结算市场价不含税 = None # xsd:string
self.结算预算价含税 = None # xsd:string
self.结算市场价含税 = None # xsd:string
self.暂估价 = None # xsd:string
self.拆分 = None # xsd:string
self.全口径市场价不含税 = None # xsd:string
self.全口径市场价含税 = None # xsd:string
self.商品砼 = None # xsd:string
self.数量 = None # xsd:string
self.是否未计价 = None # xsd:string
class CostSet:
"""
费用集
描述: 代表费用预览层级结构中的一个节点
"""
def __init__(self):
self.GUID = None # xsd:string
self.cost_items = {} # 存储费用项的字典,键为项目名称
def get_cost_item(self, item_name):
"""
根据名称获取费用项
Args:
item_name (str): 要查找的费用项名称
Returns:
CostItem: 对应的费用项对象
Raises:
KeyError: 当费用项名称不存在时
"""
if item_name not in self.cost_items:
raise KeyError(f'费用项名称"{item_name}"不存在')
return self.cost_items[item_name]
class CostItem:
"""
费用项
描述: 代表预览层级结构中的叶子节点,显示具体的费用项及其金额
"""
def __init__(self):
self.id = None # xsd:string
self.cost = None # xsd:string
self.sub_items = {} # 存储子费用项的字典,键为项目名称
def get_sub_item(self, item_name):
"""
根据名称获取子费用项
Args:
item_name (str): 要查找的子费用项名称
Returns:
CostItem: 对应的子费用项对象
Raises:
KeyError: 当子费用项名称不存在时
"""
if item_name not in self.sub_items:
raise KeyError(f'子费用项名称"{item_name}"不存在')
return self.sub_items[item_name]
class FeeTableTemplateSet:
"""
取费表模板集
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.feetable_template = {} # 存储费用项的字典,键为项目名称
def get_feetable_template_item(self, item_name):
"""
根据名称获取取费表模板项
Args:
item_name (str): 要查找的取费表模板项名称
Returns:
FeeTableTemplateItem: 对应的取费表模板项对象
Raises:
KeyError: 当取费表模板项名称不存在时
"""
if item_name not in self.feetable_template:
raise KeyError(f'取费表模板项名称"{item_name}"不存在')
return self.feetable_template[item_name]
class FeeTableTemplateItem:
"""
取费表模板项
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.OutlayID = None # xsd:string
self.type = None # xsd:string
self.profession = None # xsd:string
self.sub_feecollections = {} # 存储子费用项的字典,键为项目名称
def get_sub_fee(self, item_name):
"""
根据名称获取子取费
Args:
item_name (str): 要查找的子取费名称
Returns:
feecollections: 对应的子费用项对象
Raises:
KeyError: 当子费用项名称不存在时
"""
if item_name not in self.sub_feecollections:
raise KeyError(f'取费名称"{item_name}"不存在')
return self.sub_feecollections[item_name]
class FeeCollection:
"""
取费
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.serialNumber = None # xsd:string
self.code = None # xsd:string
self.base = None # xsd:string
self.rate = None # xsd:string
self.sub_feecollections = {} # 存储子费用项的字典,键为项目名称
def get_sub_feecollections(self, item_name):
"""
根据名称获取子取费
Args:
item_name (str): 要查找的子费用名称
Returns:
feecollections: 对应的费用对象
Raises:
KeyError: 当子费用名称不存在时
"""
if item_name not in self.sub_feecollections:
raise KeyError(f'取费名称"{item_name}"不存在')
return self.sub_feecollections[item_name]
class FeeScheduleSet:
"""
费用表集
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.feeschedule_template = {} # 存储费用项的字典,键为项目名称
def get_feetable_template_item(self, item_name):
"""
根据名称获取费用表项
Args:
item_name (str): 要查找的费用表项名称
Returns:
FeeScheduleItem: 对应的费用表项对象
Raises:
KeyError: 当费用表项名称不存在时
"""
if item_name not in self.feeschedule_template:
raise KeyError(f'取费表模板项名称"{item_name}"不存在')
return self.feeschedule_template[item_name]
class FeeScheduleItem:
"""
费用表项
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.sub_fees = {} # 存储子费用项的字典,键为项目名称
def get_sub_fee(self, item_name):
"""
根据名称获取子费用
Args:
item_name (str): 要查找的子费用名称
Returns:
fees: 对应的子费用项对象
Raises:
KeyError: 当子费用项名称不存在时
"""
if item_name not in self.sub_fees:
raise KeyError(f'费用名称"{item_name}"不存在')
return self.sub_fees[item_name]
class Fee:
"""
取费
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.serialNumber = None # xsd:string
self.code = None # xsd:string
self.rate = None # xsd:string
self.amount = None # xsd:string
self.sub_feecollections = {} # 存储子费用项的字典,键为项目名称
def get_sub_feecollections(self, item_name):
"""
根据名称获取子取费
Args:
item_name (str): 要查找的子费用名称
Returns:
feecollections: 对应的费用对象
Raises:
KeyError: 当子费用名称不存在时
"""
if item_name not in self.sub_feecollections:
raise KeyError(f'费用名称"{item_name}"不存在')
return self.sub_feecollections[item_name]
+455
View File
@@ -0,0 +1,455 @@
"""
软件知识图谱类定义
根据Ontology_Layer.txt文件中的知识图谱信息创建
"""
from abc import ABC, abstractmethod
import json
class ProjectTookiIt(ABC):
"""
项目类(抽象基类)
描述: 代表整个项目结构的顶层容器
"""
def __init__(self):
self.project_division_set = ProjectDivisionItem() # 项目划分集对象
# 项目划分查询方法
@abstractmethod
def get_division_item_by_path(self, path):
"""
通过路径获取项目划分对象
Args:
path (str): 以'/'分隔的多级项目划分名称路径
Returns:
ProjectDivisionItem|None: 对应的项目划分对象,如果路径不存在返回None,成功找到返回项目划分对象
Note:
当路径为空字符串时,会返回None
"""
pass
@abstractmethod
def get_division_node_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊节点名称获取项目划分对象
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配节点的列表,如果没有匹配返回空列表
"""
pass
# 工程量查询方法
@abstractmethod
def get_quantities_by_paths(self, paths_str):
"""
获取指定项目路径下的工程量对象
Args:
paths_str (str): 以'/'分隔的多级节点路径
Returns:
ProjectQuantity|None: 对应的工程量对象,如果路径不存在返回None,成功找到返回工程量对象
"""
pass
@abstractmethod
def get_quantities_node_by_parent_and_code(self, parent_path, quantity_type=None, code=None):
"""
通过父节点路径和编码获取工程量对象(定额)
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
quantity_type (str): 工程量类型('定额''主材''设备'或None表示所有类型)
code(str): 工程量编码,以'/'分隔的多个编码
Returns:
list: 包含所有匹配节点的列表,如果没有匹配返回空列表
"""
pass
@abstractmethod
def get_quantities_node_by_parent_and_name(self, parent_path, partial_name, quantity_type=None):
"""
通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备)
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
quantity_type (str): 工程量类型('定额''主材''设备'或None表示所有类型)
Returns:
list: 包含所有匹配节点的列表,如果没有匹配返回空列表
"""
pass
# 材机查询方法
@abstractmethod
def get_material_equipment_by_path(self, paths_str):
"""
通过路径获取材机对象
Args:
path (str): 以'/'分隔的多级项目划分名称路径
Returns:
list: 包含所有匹配的材机对象的列表
"""
pass
@abstractmethod
def get_material_equipment_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊名称获取材机对象
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配的材机对象的列表
"""
pass
# 取费表模板查询方法
@abstractmethod
def get_fee_template_by_path(self, paths_str):
"""
通过路径获取取费表模板
Args:
path (str): 以'/'分隔的多级项目划分名称路径
Returns:
list: 包含所有匹配的取费表模板对象的列表
"""
pass
@abstractmethod
def get_fee_template_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊名称获取取费表模板
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配的取费表模板对象的列表
"""
pass
# 费用表查询方法
@abstractmethod
def get_fee_schedule_on_auxiliary_expense_table(self, table_name, fee_name, fee: str):
"""
在辅助费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
pass
@abstractmethod
def get_fee_schedule_on_other_expense_table(self, table_name, fee_name, fee):
"""
在其它费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
pass
@abstractmethod
def get_fee_schedule_on_land_acquisition_fee_table_table(self, table_name, fee_name, fee):
"""
在其中:土地征用费表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
pass
@abstractmethod
def get_fee_schedule_on_installation_price_difference_table(self, table_name, fee_name, fee):
"""
在安装价差表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
pass
@abstractmethod
def get_fee_schedule_on_Engineering_Cost_table(self, table_name, fee_name, fee):
"""
在工程费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
pass
class ProjectDivisionItem:
"""
项目划分项
描述: 代表项目结构中的层级条目,具有自身的属性,并且可以包含子项目划分项或详细工作项
JSON对应: ProjectDivisionSet数组中的对象,或ProjectDivisionItem的children数组中type为"项目划分"的对象
"""
def __init__(self):
self.GUID = None # xsd:string
self.id = None # xsd:string
self.name = None # xsd:string
self.代码 = None # xsd:string (可选)
self.费率 = None # xsd:string
self.单位 = None # xsd:string (可选)
self.取费表id = None # xsd:string
self.颜色标记 = None # xsd:string
self.取费表 = None # xsd:string
self.合价含税 = None # xsd:string (可选)
self.type = None # xsd:string
self.专业类型 = None # xsd:string
self.资源库列表 = None # xsd:list (可选)
self.notCheck = None # xsd:string (可选)
class ProjectQuantity:
"""
工程量
描述: 代表项目划分项(ProjectDivisionItem)下的具体工作单元或物料项,是定额、主材、设备的父类型
"""
def __init__(self):
self.id = None # xsd:string
self.类型 = None # xsd:string ("0"为定额,"1"为主材,"5"为设备)
self.name = None # xsd:string
self.编码 = None # xsd:string
self.单位 = None # xsd:string
self.数量 = None # xsd:string
self.资源库名称 = None # xsd:string
self.投标数量 = None # xsd:string (可选)
self.投标单价 = None # xsd:string (可选)
self.特征段 = None # xsd:string (可选)
self.关联父级量 = None # xsd:string (可选)
self.颜色标记 = None # xsd:string (可选)
self.单价不含税 = None
self.cost_set = None # xsd:CostSet
class Ration(ProjectQuantity):
"""
定额
描述: 代表一种标准的工程量条目,通常包含详细的工、料、机消耗标准
"""
def __init__(self):
super().__init__()
self.计算式 = None # xsd:string
self.中标计算式 = None # xsd:string
self.人工费 = None # xsd:string
self.机械费 = None # xsd:string
self.甲供材料费不含税 = None # xsd:string
self.材料费 = None # xsd:string
self.定额系数 = None # xsd:string
self.人工系数 = None # xsd:string
self.材料系数 = None # xsd:string
self.机械系数 = None # xsd:string
self.定额范围 = None # xsd:string
self.定额章节名称 = None # xsd:string
self.费用类型 = None # xsd:string
self.甲供材料费含税 = None # xsd:string
self.投标合价 = None # xsd:string
self.其中甲供材料费 = None # xsd:string
self.合价不含税 = None # xsd:string
self.基价 = None # xsd:string
self.所属定额库 = None # xsd:string
def __str__(self):
"""返回定额的字符串表示"""
attrs = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
return json.dumps(attrs, ensure_ascii=False, indent=2)
class Material(ProjectQuantity):
"""
主材
描述: 代表工程中使用的主要材料
"""
def __init__(self):
super().__init__()
self.规格型号 = None # xsd:string
self.损耗率 = None # xsd:string
self.供货方 = None # xsd:string
self.集中配送 = None # xsd:string (可选)
self.单重 = None # xsd:string (可选)
self.市场价不含税 = None # xsd:string
self.市场价含税 = None # xsd:string
self.单价含税 = None # xsd:string
self.结算市场价不含税 = None # xsd:string (可选)
self.结算市场价含税 = None # xsd:string (可选)
self.基准价不含税 = None # xsd:string (可选)
self.基准价含税 = None # xsd:string (可选)
self.费用类型 = None # xsd:string
self.增值税率 = None # xsd:string (可选)
self.合价含税 = None # xsd:string
self.合价不含税 = None # xsd:string
self.线重 = None # xsd:string (可选)
self.制造长度 = None # xsd:string (可选)
self.截面积 = None # xsd:string (可选)
def __str__(self):
"""返回材料的字符串表示"""
attrs = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
return json.dumps(attrs, ensure_ascii=False, indent=2)
class Equipment(ProjectQuantity):
"""
设备
描述: 代表工程中安装或使用的设备
"""
def __init__(self):
super().__init__()
self.规格型号 = None # xsd:string
self.供货方 = None # xsd:string
self.运杂费率 = None # xsd:string (可选)
self.单价含税 = None # xsd:string
self.设备类型 = None # xsd:string (可选)
self.增值税率 = None # xsd:string (可选)
self.合价含税 = None # xsd:string
self.合价不含税 = None # xsd:string
class MaterialOrEquipment:
"""
材机
描述: 代表DetailedWorkItem中所列出的具体材料、人工或机械设备及其详细信息
"""
def __init__(self):
self.id = None # xsd:string
self.编码 = None # xsd:string
self.名称 = None # xsd:string
self.单位 = None # xsd:string
self.类型 = None # xsd:string
self.供货方 = None # xsd:string
self.预算价不含税 = None # xsd:string
self.市场价不含税 = None # xsd:string
self.预算价含税 = None # xsd:string
self.市场价含税 = None # xsd:string
self.结算预算价不含税 = None # xsd:string
self.结算市场价不含税 = None # xsd:string
self.结算预算价含税 = None # xsd:string
self.结算市场价含税 = None # xsd:string
self.暂估价 = None # xsd:string
self.拆分 = None # xsd:string
self.全口径市场价不含税 = None # xsd:string
self.全口径市场价含税 = None # xsd:string
self.商品砼 = None # xsd:string
self.数量 = None # xsd:string
self.是否未计价 = None # xsd:string
class FeeTableTemplateItem:
"""
取费表模板项
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.OutlayID = None # xsd:string
self.type = None # xsd:string
self.profession = None # xsd:string
class FeeCollection:
"""
取费
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.serialNumber = None # xsd:string
self.code = None # xsd:string
self.base = None # xsd:string
self.rate = None # xsd:string
class FeeScheduleItem:
"""
费用表项
描述:
"""
def __init__(self):
self.name = None # xsd:string
class Fee:
"""
取费
描述:
"""
def __init__(self):
self.name = None # xsd:string
self.serialNumber = None # xsd:string
self.code = None # xsd:string
self.rate = None # xsd:string
self.amount = None # xsd:string
self.输出 = None # xsd:string (可选)
self.取费基数 = None # xsd:string (可选)
self.编制依据 = None # xsd:string (可选)
self.编码 = None # xsd:string (可选)
self.备注 = None # xsd:string (可选)
self.表一显示 = None # xsd:string (可选)
self.报表输出 = None # xsd:string (可选)
self.建安合计费 = None # xsd:string (可选)
self.合计费 = None # xsd:string (可选)
self.占总计 = None # xsd:string (可选)
self.建筑费 = None # xsd:string (可选)
self.安装费 = None # xsd:string (可选)
self.设备费 = None # xsd:string (可选)
self.其他费 = None # xsd:string (可选)
self.施工费 = None # xsd:string (可选)
self.单位投资 = None # xsd:string (可选)
+957
View File
@@ -0,0 +1,957 @@
from neo4j import GraphDatabase
from project import *
import atexit
class ProjectTookiItNeo4j(ProjectTookiIt):
"""
基于Neo4j数据库的项目类实现
"""
def __init__(self):
"""
初始化Neo4j连接
Args:
uri (str): Neo4j数据库URI
user (str): 用户名
password (str): 密码
"""
uri = "bolt://172.20.0.145:7687"
user = "neo4j"
password = "password"
super().__init__()
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.session = self.driver.session()
# 初始化其他必要的数据结构
self.material_equipment_dict = {} # 材机字典,键为ID
self.fee_templates = {} # 取费表模板字典,键为ID
self.fee_schedules = {} # 费用表字典,键为ID
self.project_properties = {} # 工程属性字典
def close(self):
"""
关闭数据库连接
"""
if self.session:
self.session.close()
if self.driver:
self.driver.close()
# 通用节点查询方法
def get_node_by_path(self, path, node_labels=None):
"""
通过路径获取节点对象
Args:
path (str): 以'/'分隔的多级节点路径
node_labels (list): 节点标签列表,用于过滤结果
Returns:
dict|None: 节点数据,如果路径不存在返回None
"""
if not path:
return None
# 分割路径为各个部分
path_parts = path.split("/")
# 构建查询
if len(path_parts) == 1:
# 只有一级路径,直接查询
if node_labels:
labels_str = ":" + "|:".join(node_labels)
query = f"""
MATCH (n{labels_str})
WHERE n.name = $name
RETURN n LIMIT 1
"""
else:
query = """
MATCH (n)
WHERE n.name = $name
RETURN n LIMIT 1
"""
params = {"name": path_parts[0]}
else:
# 多级路径,构建路径查询
last_part = path_parts[-1]
if node_labels:
labels_str = ":" + "|:".join(node_labels)
query = f"""
MATCH path = (root)-[*]->(target{labels_str})
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
else:
query = """
MATCH path = (root)-[*]->(target)
WHERE target.name = $last_part
RETURN target as n LIMIT 1
"""
params = {"last_part": last_part}
try:
result = self.session.run(query, **params)
record = result.single()
if not record:
return None
return record["n"]
except Exception as e:
print(f"获取节点对象时出错: {e}")
return None
# 项目划分查询方法
def get_division_item_by_path(self, path):
"""
通过路径获取项目划分对象
"""
node_data = self.get_node_by_path(path, ["ProjectDivisionItem"])
if not node_data:
return None
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
return item
def get_division_node_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊节点名称获取项目划分对象,包括子节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配项目划分节点的列表,如果没有匹配返回空列表
"""
if not partial_name:
return []
# 使用通用方法获取父节点
parent_node_data = self.get_node_by_path(parent_path, ["ProjectDivisionItem"])
parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None
# 构建查询,根据是否有父节点ID调整查询条件,使用递归关系查询
if parent_node_id:
query = """
MATCH (p)-[:CONTAINS|HAS|RELATED_TO*]-(n:ProjectDivisionItem)
WHERE p.id = $parent_id AND n.name CONTAINS $partial_name
RETURN n LIMIT 50
"""
params = {"parent_id": parent_node_id, "partial_name": partial_name}
else:
query = """
MATCH (n:ProjectDivisionItem)
WHERE n.name CONTAINS $partial_name
RETURN n LIMIT 50
"""
params = {"partial_name": partial_name}
try:
result = self.session.run(query, **params)
items = []
for record in result:
node_data = record["n"]
item = ProjectDivisionItem()
for key, value in node_data.items():
if hasattr(item, key):
setattr(item, key, value)
items.append(item)
return items
except Exception as e:
print(f"通过父节点路径和模糊名称获取项目划分对象时出错: {e}")
return []
# 工程量查询方法
def get_quantities_by_paths(self, paths_str):
"""
获取指定项目路径下的工程量对象
Args:
paths_str (str): 以'/'分隔的多级节点路径
Returns:
ProjectQuantity|None: 对应的工程量对象,如果路径不存在返回None,成功找到返回工程量对象
"""
if not paths_str:
return None
# 使用通用方法获取节点,考虑所有可能的工程量类型
node_data = self.get_node_by_path(paths_str, ["ProjectQuantity", "Quota", "MainMaterial", "Equipment"])
if not node_data:
return None
# 根据节点标签或类型属性创建对应类型的对象
quantity = self._create_quantity_object(node_data)
# 填充属性
for key, value in node_data.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
return quantity
def get_quantities_node_by_parent_and_code(self, parent_path, quantity_type=None, code=None):
"""
通过父节点路径和编码获取工程量对象(定额),包括子节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
quantity_type (str): 工程量类型('定额''主材''设备'或None表示所有类型)
code(str): 工程量编码,以'/'分隔的多个编码
Returns:
list: 包含所有匹配节点的列表,如果没有匹配返回空列表
"""
if not code:
return []
# 使用通用方法获取父节点
parent_node_data = self.get_node_by_path(parent_path)
# 从路径中获取父节点名称
path_parts = parent_path.split("/")
parent_name = path_parts[-1]
# 处理编码,可能有多个编码用/分隔
code_parts = code.split("/")
code_conditions = []
for code_part in code_parts:
if code_part:
code_conditions.append(f"q.编码 = '{code_part}'")
if not code_conditions:
return []
code_query = " OR ".join(code_conditions)
# 根据工程量类型确定标签
node_labels = []
if quantity_type == "定额":
node_labels = ["ProjectQuantity", "Quota"]
elif quantity_type == "主材":
node_labels = ["ProjectQuantity", "MainMaterial"]
elif quantity_type == "设备":
node_labels = ["ProjectQuantity", "Equipment"]
else:
node_labels = ["ProjectQuantity"]
# 构建标签字符串
labels_str = ":" + ":".join(node_labels) if node_labels else ""
# 使用name属性进行匹配
query = f"""
MATCH (p)-[*1..5]->(q{labels_str})
WHERE p.name = $parent_name AND ({code_query})
RETURN q
LIMIT 10
"""
params = {"parent_name": parent_name}
try:
result = self.session.run(query, params)
quantities = []
for record in result:
node_data = record["q"]
quantity = self._create_quantity_object(node_data, quantity_type)
# 将节点属性赋值到对象
for key, value in node_data.items():
setattr(quantity, key, value)
# 转换为字典
if hasattr(quantity, "to_dict"):
quantities.append(quantity.to_dict())
else:
# 如果没有 to_dict 方法,就用 vars() 动态获取属性
quantities.append(vars(quantity))
return quantities
except Exception as e:
print(f"通过编码获取工程量对象时出错: {e}")
import traceback
traceback.print_exc()
return []
def get_quantities_node_by_parent_and_name(self, parent_path, partial_name, quantity_type=None):
"""
通过父节点路径、模糊节点名称和类型获取工程量对象(主材或者设备),包括子节点
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
quantity_type (str): 工程量类型('定额''主材''设备'或None表示所有类型)
Returns:
list: 包含所有匹配节点的列表,如果没有匹配返回空列表
"""
if not partial_name:
return []
# 调试输出
print(f"搜索父路径: {parent_path}")
print(f"搜索名称: {partial_name}")
print(f"搜索类型: {quantity_type}")
# 使用通用方法获取父节点
parent_node_data = self.get_node_by_path(parent_path)
if parent_node_data:
print(f"找到父节点: {parent_node_data}")
else:
print("未找到父节点")
parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None
# 根据工程量类型确定标签和类型条件
node_labels = []
type_condition = ""
if quantity_type == "定额":
node_labels = ["ProjectQuantity", "Quota"]
type_condition = "q.类型 = '0'"
elif quantity_type == "主材":
node_labels = ["ProjectQuantity", "MainMaterial"]
type_condition = "q.类型 = '1'"
elif quantity_type == "设备":
node_labels = ["ProjectQuantity", "Equipment"]
type_condition = "q.类型 = '5'"
else:
node_labels = ["ProjectQuantity"]
# 构建标签字符串
labels_str = ":" + ":".join(node_labels) if node_labels else ""
# 扩展关系类型
relationship_types = "CONTAINS|HAS|RELATED_TO|USES|BELONGS_TO"
# 构建查询 - 使用递归关系查询
if parent_node_id:
query = f"""
MATCH (p)-[:{relationship_types}*1..10]->(q{labels_str})
WHERE p.id = $parent_id AND q.name CONTAINS $partial_name
{f'AND {type_condition}' if type_condition else ''}
RETURN q LIMIT 50
"""
params = {"parent_id": parent_node_id, "partial_name": partial_name}
print(f"执行查询: {query}")
print(f"参数: {params}")
else:
query = f"""
MATCH (q{labels_str})
WHERE q.name CONTAINS $partial_name
{f'AND {type_condition}' if type_condition else ''}
RETURN q LIMIT 50
"""
params = {"partial_name": partial_name}
print(f"执行全局查询: {query}")
try:
result = self.session.run(query, **params)
quantities = []
for record in result:
node_data = record["q"]
print(f"找到节点: {node_data}")
# 创建对应类型的对象
quantity = self._create_quantity_object(node_data, quantity_type)
# 填充属性
for key, value in node_data.items():
if hasattr(quantity, key):
setattr(quantity, key, value)
quantities.append(quantity)
print(f"查询结果数量: {len(quantities)}")
return quantities
except Exception as e:
print(f"通过名称获取工程量对象时出错: {e}")
import traceback
traceback.print_exc()
return []
# 辅助方法,用于根据节点数据创建对应类型的工程量对象
def _create_quantity_object(self, node_data, quantity_type=None):
"""
根据节点数据创建对应类型的工程量对象
Args:
node_data (dict): 节点数据
quantity_type (str): 工程量类型('定额''主材''设备'或None)
Returns:
ProjectQuantity: 创建的工程量对象
"""
# 如果指定了类型,直接创建对应类型的对象
if quantity_type == "定额":
return Ration()
elif quantity_type == "主材":
return Material()
elif quantity_type == "设备":
return Equipment()
# 如果没有指定类型,尝试通过节点属性或标签判断
if "类型" in node_data:
if node_data["类型"] == "0":
return Ration()
elif node_data["类型"] == "1":
return Material()
elif node_data["类型"] == "5":
return Equipment()
# 通过标签判断
labels = list(node_data.labels) if hasattr(node_data, "labels") else []
if "Quota" in labels:
return Ration()
elif "MainMaterial" in labels:
return Material()
elif "Equipment" in labels:
return Equipment()
# 默认返回基类对象
return ProjectQuantity()
# 材机查询方法实现
def get_material_equipment_by_path(self, paths_str):
"""
通过路径获取材机对象
Args:
paths_str (str): 以'/'分隔的多级项目划分名称路径
Returns:
list: 包含所有匹配的材机对象的列表
"""
if not paths_str:
return []
# 使用通用方法获取节点
node_data = self.get_node_by_path(paths_str)
node_id = node_data["id"] if node_data and "id" in node_data else None
if not node_id:
return []
# 查询与该节点关联的所有材机对象
query = """
MATCH (p)-[r]-(m)
WHERE p.id = $node_id AND (m:MaterialOrEquipment OR m:Material OR m:Equipment)
RETURN m LIMIT 50
"""
params = {"node_id": node_id}
try:
result = self.session.run(query, **params)
materials = []
for record in result:
material = self._create_material_object(record["m"])
materials.append(material)
# 更新缓存
if hasattr(material, "id") and material.id:
self.material_equipment_dict[material.id] = material
return materials
except Exception as e:
print(f"通过路径获取材机对象时出错: {e}")
return []
def get_material_equipment_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊名称获取材机对象
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配的材机对象的列表
"""
if not partial_name:
return []
# 使用通用方法获取父节点
parent_node_data = self.get_node_by_path(parent_path)
parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None
# 构建查询,根据是否有父节点ID调整查询条件
if parent_node_id:
# 如果找到了父节点,查找与父节点有关系的材机节点
query = """
MATCH (p)-[:CONTAINS|HAS|USES|RELATED_TO]-(m)
WHERE p.id = $parent_id AND m.name CONTAINS $partial_name
AND (m:MaterialOrEquipment OR m:Material OR m:Equipment)
RETURN m LIMIT 20
"""
params = {"parent_id": parent_node_id, "partial_name": partial_name}
else:
# 如果没有找到父节点或没有提供父节点路径,只按名称查询
query = """
MATCH (m)
WHERE m.name CONTAINS $partial_name
AND (m:MaterialOrEquipment OR m:Material OR m:Equipment)
RETURN m LIMIT 20
"""
params = {"partial_name": partial_name}
try:
result = self.session.run(query, **params)
materials = []
for record in result:
material = self._create_material_object(record["m"])
materials.append(material)
# 更新缓存
if hasattr(material, "id") and material.id:
self.material_equipment_dict[material.id] = material
return materials
except Exception as e:
print(f"通过父节点路径和模糊名称获取材机对象时出错: {e}")
return []
# 辅助方法,用于创建材机对象并填充属性
def _create_material_object(self, node_data):
"""
根据节点数据创建材机对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
MaterialOrEquipment: 创建的材机对象
"""
material = MaterialOrEquipment()
# 填充属性
for key, value in node_data.items():
if hasattr(material, key):
setattr(material, key, value)
return material
# 取费表模板查询方法实现
def get_fee_template_by_path(self, paths_str):
"""
通过路径获取取费表模板
Args:
paths_str (str): 以'/'分隔的多级项目划分名称路径
Returns:
list: 包含所有匹配的取费表模板对象的列表
"""
if not paths_str:
return []
# 使用通用方法获取节点
node_data = self.get_node_by_path(paths_str)
node_id = node_data["id"] if node_data and "id" in node_data else None
if not node_id:
return []
# 查询与该节点关联的所有取费表模板
query = """
MATCH (p)-[r]-(t)
WHERE p.id = $node_id AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate')
RETURN t LIMIT 20
"""
params = {"node_id": node_id}
try:
result = self.session.run(query, **params)
templates = []
for record in result:
template = self._create_fee_template_object(record["t"])
templates.append(template)
return templates
except Exception as e:
print(f"通过路径获取取费表模板时出错: {e}")
return []
def get_fee_template_by_parent_and_name(self, parent_path, partial_name):
"""
通过父节点路径和模糊名称获取取费表模板
Args:
parent_path (str): 父节点的路径,以'/'分隔的多级节点路径
partial_name (str): 目标节点的模糊或不完整名称
Returns:
list: 包含所有匹配的取费表模板对象的列表
"""
if not partial_name:
return []
# 使用通用方法获取父节点
parent_node_data = self.get_node_by_path(parent_path)
parent_node_id = parent_node_data["id"] if parent_node_data and "id" in parent_node_data else None
# 构建查询,根据是否有父节点ID调整查询条件
if parent_node_id:
query = """
MATCH (p)-[:CONTAINS|HAS|USES|RELATED_TO]-(t)
WHERE p.id = $parent_id AND t.name CONTAINS $partial_name
AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate')
RETURN t LIMIT 20
"""
params = {"parent_id": parent_node_id, "partial_name": partial_name}
else:
query = """
MATCH (t)
WHERE t.name CONTAINS $partial_name
AND (t:FeeTableTemplate OR t.type = 'FeeTableTemplate')
RETURN t LIMIT 20
"""
params = {"partial_name": partial_name}
try:
result = self.session.run(query, **params)
templates = []
for record in result:
template = self._create_fee_template_object(record["t"])
templates.append(template)
return templates
except Exception as e:
print(f"通过父节点路径和模糊名称获取取费表模板时出错: {e}")
return []
# 辅助方法,用于创建取费表模板对象并填充属性
def _create_fee_template_object(self, node_data):
"""
根据节点数据创建取费表模板对象并填充属性
Args:
node_data (dict): 节点数据
Returns:
FeeTableTemplateItem: 创建的取费表模板对象
"""
template = FeeTableTemplateItem()
# 填充属性
for key, value in node_data.items():
if hasattr(template, key):
setattr(template, key, value)
# 更新缓存
if hasattr(template, "OutlayID") and template.OutlayID:
self.fee_templates[template.OutlayID] = template
return template
# 费用表查询方法实现
def get_fee_schedule_on_auxiliary_expense_table(self, table_name, fee_name, fee):
"""
在辅助费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值属性名
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
if not table_name or not fee_name or not fee:
return None
# 构建查询,查找辅助费用表中的特定费用 - 使用任意关系类型
query = """
MATCH (t:FeeScheduleItem)-[r]->(f:Fee)
WHERE t.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
value = fee_node.get(fee)
if value is not None:
return value
return None
except Exception as e:
print(f"在辅助费用表中查找费用时出错: {e}")
return None
def get_fee_schedule_on_other_expense_table(self, table_name, fee_name, fee):
"""
在其它费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值属性名
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
if not table_name or not fee_name or not fee:
return None
# 构建查询,查找其它费用表中的特定费用 - 使用任意关系类型
query = """
MATCH (t:FeeScheduleItem)-[r]->(f:Fee)
WHERE t.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
value = fee_node.get(fee)
if value is not None:
return value
return None
except Exception as e:
print(f"在其它费用表中查找费用时出错: {e}")
return None
def get_fee_schedule_on_land_acquisition_fee_table_table(self, table_name, fee_name, fee):
"""
在土地征用费表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值属性名
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
if not table_name or not fee_name or not fee:
return None
# 构建查询,查找土地征用费表中的特定费用 - 使用任意关系类型
query = """
MATCH (t:FeeScheduleItem)-[r]->(f:Fee)
WHERE t.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
value = fee_node.get(fee)
if value is not None:
return value
return None
except Exception as e:
print(f"在土地征用费表中查找费用时出错: {e}")
return None
def get_fee_schedule_on_installation_price_difference_table(self, table_name, fee_name, fee):
"""
在安装价差表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值属性名
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
if not table_name or not fee_name or not fee:
return None
# 构建查询,查找安装价差表中的特定费用 - 使用任意关系类型
query = """
MATCH (t:FeeScheduleItem)-[r]->(f:Fee)
WHERE t.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 1
"""
params = {"table_name": table_name, "fee_name": fee_name}
try:
result = self.session.run(query, **params)
all_records = result.data()
if len(all_records) > 0:
fee_node = all_records[0]["f"]
value = fee_node.get(fee)
if value is not None:
return value
return None
except Exception as e:
print(f"在安装价差表中查找费用时出错: {e}")
return None
def get_fee_schedule_on_Engineering_Cost_table(self, table_name, fee_name, fee):
"""
在工程费用表中查找费用
Args:
table_name (str): 费用表名称
fee_name (str): 要查找的费用名称
fee (str): 匹配的费用值属性名
Returns:
str: 匹配到的费用名称节点对应的费用值
"""
if not table_name or not fee_name or not fee:
return None
# 调试输出
print(f"查询费用表: {table_name}")
print(f"查询费用名称: {fee_name}")
print(f"查询费用属性: {fee}")
# 构建查询,使用递归关系查询,并扩展关系类型
query = """
MATCH (t:FeeScheduleItem)-[r*1..5]->(f:Fee)
WHERE t.name = $table_name AND f.name = $fee_name
RETURN f LIMIT 10
"""
params = {"table_name": table_name, "fee_name": fee_name}
print(f"执行查询: {query}")
print(f"参数: {params}")
try:
result = self.session.run(query, **params)
all_records = result.data()
print(f"查询结果数量: {len(all_records)}")
if len(all_records) > 0:
fee_node = all_records[0]["f"]
print(f"找到费用节点: {fee_node}")
# 获取节点的所有属性,用于调试
for key, value in fee_node.items():
print(f"属性: {key} = {value}")
value = fee_node.get(fee)
if value is not None:
print(f"找到费用值: {value}")
return value
else:
print(f"节点中没有属性: {fee}")
# 如果没有找到,尝试另一种查询方式,使用CONTAINS进行模糊匹配
print("尝试使用模糊匹配...")
backup_query = """
MATCH (t:FeeScheduleItem)-[r*1..5]->(f:Fee)
WHERE t.name CONTAINS $table_name AND f.name CONTAINS $fee_name
RETURN f LIMIT 10
"""
backup_result = self.session.run(backup_query, **params)
backup_records = backup_result.data()
print(f"模糊匹配结果数量: {len(backup_records)}")
if len(backup_records) > 0:
fee_node = backup_records[0]["f"]
print(f"找到费用节点(模糊匹配): {fee_node}")
# 获取节点的所有属性,用于调试
for key, value in fee_node.items():
print(f"属性: {key} = {value}")
value = fee_node.get(fee)
if value is not None:
print(f"找到费用值: {value}")
return value
else:
print(f"节点中没有属性: {fee}")
return None
except Exception as e:
print(f"在工程费用表中查找费用时出错: {e}")
import traceback
traceback.print_exc()
return None
class ProjectBuilder:
"""
项目构建器
描述: 用于构建项目对象的构建器
"""
_instance = None
@staticmethod
def build():
"""
构建并返回项目实例
Returns:
ProjectTookiItNeo4j: 创建的项目实例
"""
# 如果已经有实例,先关闭它
if ProjectBuilder._instance is not None:
ProjectBuilder._instance.close()
# 创建新实例
ProjectBuilder._instance = ProjectTookiItNeo4j()
return ProjectBuilder._instance
@staticmethod
def close():
"""
关闭当前项目实例的连接
"""
if ProjectBuilder._instance is not None:
ProjectBuilder._instance.close()
ProjectBuilder._instance = None
# 注册退出处理函数,确保程序退出时自动关闭连接
atexit.register(ProjectBuilder.close)
# project = ProjectBuilder.build()
# result = project.get_quantities_node_by_parent_and_code(
# "工程数据/安装工程/安装/架空输电线路本体工程/基础工程", "定额", "YX2-1/YX2-2/YX2-3/YX2-6/YX2-7"
# )
# print(result)
+99
View File
@@ -0,0 +1,99 @@
from langchain.prompts import PromptTemplate
FUNCTION_CALL_TEMPLATE = """
你是一个专业的Python工程师。我会给你一个用户问题,你需要将其转换为对应的Python代码
可用工具:
{tools}
工具名称: {tool_names}
你的任务是:
1. 首先从用户的{query}中去提取出关键节点信息出来
2. 将提取出来的信息在知识库中内容{KnowledgeBase}和节点属性{NodeDefinition}中进行查找对比是否正确,如果不正确,那么需要你从新解析用户的问题
3. 根据用户输入解析出来的信息来从{project_class_methods}中定义的查找方法类确定使用哪个方法进行查询
4. 每次只能调用一个方法,不要同时调用多个方法
5. 模板中Function为需要根据用户输入来判断的方法名,parameter为用户输入中解析出来的方法的参数
6. 使用固定的python模板来生成Python代码
7. 代码应该完整,可以直接执行,并且能够返回查询结果
{agent_scratchpad}
输出代码模板:
def neo4j_find_function():
project = ProjectBuilder.build()
result = project.Function(parameter)
print(result)
注意:
- 必须要严格按照给定代码模板来输出代码,不得对模板进行修改等操作。
- 代码应该简洁明了,只需要调用适当的方法并返回结果
- 如果需要对结果进行处理(如单位换算,简单四则运算等),可以添加必要的代码
- 只输出Python代码,不要添加任何解释或注释
- 不要输出任何其他无关的内容出来其他内容(包括多余的符号,字符等)
使用以下格式:
思考: [你的推理过程]
行动: 工具名称
行动输入: 工具参数
观察: 工具返回的结果
...重复以上步骤...
思考: 我现在已经收集到所有必要的信息,可以生成最终的Python代码。
Final Answer:
def neo4j_find_function():
project = ProjectBuilder.build()
result = project.Function(parameter)
print(result)
"""
FUNCTION_CALL_PROMPT = PromptTemplate.from_template(FUNCTION_CALL_TEMPLATE)
###########################################################################################################################################################################
FUNCTION_RETURNS_LOOP_TEMPLATE = """
你是一个专业的Python工程师。我会给你一个用户问题和错误信息,你需要将其转换为对应的Python代码
你的任务是:
1. 首先从用户的{original_query}中去提取出关键节点信息出来
2. 将提取出来的信息在知识库中内容{KnowledgeBase}和节点属性{NodeDefinition}中进行查找对比是否正确,如果不正确,那么需要你从新解析用户的问题
3. 根据用户输入解析出来的信息来、需要修改的代码{original_code}和代码的错误信息{error_info}来对代码和参数进行修改
4. 每次只能调用一个方法,不要同时调用多个方法
5. 模板中Function为需要根据用户输入来判断的方法名,parameter为用户输入中解析出来的方法的参数
6. 使用固定的python模板来生成Python代码
7. 代码应该完整,可以直接执行,并且能够返回查询结果
注意:
- 你的任务是对错误的代码和错误参数进行修正,而不是返回错误信息和解释
- 如果要查找的节点在知识图谱中不存在,那么请你返回一个类似的节点,而不是返回错误信息
- 你的输出只能修改后的错误Python代码,不要添加任何解释或注释
- 代码应该简洁明了,只需要调用适当的方法并返回结果
- 如果需要对结果进行处理(如单位换算,简单四则运算等),可以添加必要的代码
- 如果没有办法根据已有的信息对代码进行修改,那么只需要返回原本的错误代码即可,而不是去返回给我一段自然语言文本
请根据以下输入进行推理并修正代码:
思考: [你的推理过程]
行动: 工具名称
行动输入: 工具参数
观察: 工具返回的结果
...重复以上步骤...
思考: 我已经根据错误信息修正了参数或路径,可以生成新的代码。
Final Answer:
def neo4j_find_function():
project = ProjectBuilder.build()
result = project.Function(parameter)
print(result)
"""
FUNCTION_RETURNS_LOOP_PROMPT: PromptTemplate = PromptTemplate.from_template(FUNCTION_RETURNS_LOOP_TEMPLATE)
+18
View File
@@ -0,0 +1,18 @@
from project_implementation import ProjectBuilder
def find_specific_node():
project = ProjectBuilder.build()
table_name = "工程费用"
fee_name = "动态费用"
fee = "其他费(QTF)"
result = project.get_fee_schedule_on_Engineering_Cost_table(table_name, fee_name, fee)
print(result)
if __name__ == "__main__":
find_specific_node()