216 lines
7.5 KiB
Python
216 lines
7.5 KiB
Python
import psycopg2
|
|
from psycopg2 import sql
|
|
import os
|
|
import json
|
|
from datetime import timezone, timedelta
|
|
|
|
class PgSql:
|
|
"""
|
|
用于连接和操作 PostgreSQL 数据库的类。
|
|
|
|
该类封装了数据库连接、关闭连接以及执行特定查询的方法,
|
|
主要用于从 Dify 应用相关的表中获取数据。
|
|
"""
|
|
def __init__(self):
|
|
"""
|
|
初始化 PgSql 实例并建立数据库连接。
|
|
"""
|
|
self.connection = None
|
|
self.connect_sql()
|
|
|
|
def connect_sql(self):
|
|
"""
|
|
连接到 PostgreSQL 数据库。
|
|
|
|
使用预定义的凭据连接到 'dify' 数据库。
|
|
如果连接失败,会打印错误信息。
|
|
"""
|
|
try:
|
|
# 连接数据库
|
|
self.connection = psycopg2.connect(
|
|
user="postgres",
|
|
password="difyai123456",
|
|
host="172.20.0.145",
|
|
port=5432,
|
|
database="dify"
|
|
)
|
|
|
|
except (Exception, psycopg2.Error) as error:
|
|
print("Error while connecting to PostgreSQL", error)
|
|
|
|
def close_connection(self):
|
|
"""
|
|
关闭当前的 PostgreSQL 数据库连接。
|
|
|
|
如果存在活动的连接,则关闭它并打印确认信息。
|
|
"""
|
|
if self.connection:
|
|
self.connection.close()
|
|
print("PostgreSQL connection is closed")
|
|
|
|
|
|
def get_appinfo(self, appid:str)->dict | None:
|
|
"""
|
|
根据应用 ID 从 'apps' 表中获取应用信息。
|
|
|
|
Args:
|
|
appid: 目标应用的 ID。
|
|
|
|
Returns:
|
|
一个字典,其中键是列名,值是对应的应用数据。
|
|
如果未找到应用或发生错误,则返回 None。
|
|
"""
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT * FROM apps WHERE id = %s
|
|
""",
|
|
(appid,)
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
colnames = [desc[0] for desc in cursor.description]
|
|
return dict(zip(colnames, result))
|
|
return None
|
|
except (Exception, psycopg2.Error) as error:
|
|
print("Error while getting tenant_id by appid", error)
|
|
|
|
|
|
def get_messages_info(self, appid:str, query:str)->dict | None:
|
|
"""
|
|
根据应用 ID 和查询内容从 'messages' 表中获取消息信息。
|
|
|
|
Args:
|
|
appid: 目标应用的 ID。
|
|
query: 用户查询的具体内容。
|
|
|
|
Returns:
|
|
一个字典,其中键是列名,值是对应的消息数据。
|
|
如果未找到消息或发生错误,则返回 None。
|
|
"""
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT * FROM messages WHERE app_id = %s AND query = %s ORDER BY created_at DESC
|
|
""",
|
|
(appid, query)
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
colnames = [desc[0] for desc in cursor.description]
|
|
return dict(zip(colnames, result))
|
|
return None
|
|
except (Exception, psycopg2.Error) as error:
|
|
print("Error while getting messages_info", error)
|
|
|
|
def get_messages_info_by_id(self, message_id:str)->dict | None:
|
|
"""
|
|
根据消息 ID 从 'messages' 表中获取消息信息。
|
|
"""
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT * FROM messages WHERE id = %s
|
|
""",
|
|
(message_id, )
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
colnames = [desc[0] for desc in cursor.description]
|
|
return dict(zip(colnames, result))
|
|
return None
|
|
except (Exception, psycopg2.Error) as error:
|
|
print("Error while getting messages_info", error)
|
|
|
|
def get_workflow_node_executions_info(self, workflow_run_id:str)->list[dict] | None:
|
|
"""
|
|
根据工作流运行 ID 从 'workflow_node_executions' 表中获取节点执行信息。
|
|
|
|
Args:
|
|
workflow_run_id: 目标工作流运行的 ID。
|
|
|
|
Returns:
|
|
一个字典,其中键是列名,值是对应的节点执行数据。
|
|
如果未找到执行信息或发生错误,则返回 None。
|
|
"""
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT * FROM workflow_node_executions WHERE workflow_run_id = %s
|
|
""",
|
|
(workflow_run_id,)
|
|
)
|
|
result = cursor.fetchall()
|
|
if result:
|
|
colnames = [desc[0] for desc in cursor.description]
|
|
return [dict(zip(colnames, row)) for row in result]
|
|
return None
|
|
except (Exception, psycopg2.Error) as error:
|
|
print("Error while getting workflow_node_executions_info", error)
|
|
|
|
class DifyTool:
|
|
"""
|
|
提供用于获取 Dify 应用调试信息的工具类。
|
|
|
|
该类利用 PgSql 类从数据库中检索与特定应用和查询相关的
|
|
应用信息、消息详情以及工作流节点执行情况。
|
|
"""
|
|
@staticmethod
|
|
def get_message_debug_info_id(message_id:str)->dict | None:
|
|
"""
|
|
根据消息 ID 从 'messages' 表中获取消息信息。
|
|
"""
|
|
dify_pgsql = PgSql()
|
|
messages_info = dify_pgsql.get_messages_info_by_id(message_id)
|
|
if not messages_info:
|
|
return None
|
|
workflow_node_executions_info = dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id'])
|
|
if not workflow_node_executions_info:
|
|
return None
|
|
return {
|
|
"messages_info": messages_info,
|
|
"workflow_node_executions_info": workflow_node_executions_info
|
|
}
|
|
|
|
|
|
@staticmethod
|
|
def get_message_debug_info(appid:str, query:str)->dict:
|
|
"""
|
|
获取指定应用和查询相关的调试信息。
|
|
|
|
此静态方法会创建一个临时的 PgSql 实例来查询数据库,
|
|
然后聚合应用信息、消息信息和工作流节点执行信息。
|
|
|
|
Args:
|
|
appid: 目标应用的 ID。
|
|
query: 用户查询的具体内容。
|
|
|
|
Returns:
|
|
一个包含 "appinfo", "messages_info", 和
|
|
"workflow_node_executions_info"键的字典,分别对应
|
|
查询到的应用数据、消息数据和节点执行数据。
|
|
"""
|
|
dify_pgsql = PgSql()
|
|
appinfo = dify_pgsql.get_appinfo(appid)
|
|
if not appinfo:
|
|
return None
|
|
messages_info = dify_pgsql.get_messages_info(appid, query)
|
|
if not messages_info:
|
|
return None
|
|
workflow_node_executions_info = dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id'])
|
|
if not workflow_node_executions_info:
|
|
return None
|
|
return {
|
|
"appinfo": appinfo,
|
|
"messages_info": messages_info,
|
|
"workflow_node_executions_info": workflow_node_executions_info
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print(DifyTool.get_message_debug_info("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?"))
|