import psycopg2 from psycopg2 import sql import os import json from datetime import timezone, timedelta class PgSql: """ 用于连接和操作 PostgreSQL 数据库的类。 该类封装了数据库连接、关闭连接以及执行特定查询的方法, 主要用于从 Dify 应用相关的表中获取数据。 """ def __init__(self): """ 初始化 PgSql 实例并建立数据库连接。 """ self.connection = None self.connect_sql() def connect_sql(self): """ 连接到 PostgreSQL 数据库。 使用预定义的凭据连接到 'dify' 数据库。 如果连接失败,会打印错误信息。 """ try: # 连接数据库 self.connection = psycopg2.connect( user="postgres", password="difyai123456", host="172.20.0.145", port=5432, database="dify" ) except (Exception, psycopg2.Error) as error: print("Error while connecting to PostgreSQL", error) def close_connection(self): """ 关闭当前的 PostgreSQL 数据库连接。 如果存在活动的连接,则关闭它并打印确认信息。 """ if self.connection: self.connection.close() print("PostgreSQL connection is closed") def get_appinfo(self, appid:str)->dict | None: """ 根据应用 ID 从 'apps' 表中获取应用信息。 Args: appid: 目标应用的 ID。 Returns: 一个字典,其中键是列名,值是对应的应用数据。 如果未找到应用或发生错误,则返回 None。 """ try: with self.connection.cursor() as cursor: cursor.execute( """ SELECT * FROM apps WHERE id = %s """, (appid,) ) result = cursor.fetchone() if result: colnames = [desc[0] for desc in cursor.description] return dict(zip(colnames, result)) return None except (Exception, psycopg2.Error) as error: print("Error while getting tenant_id by appid", error) def get_messages_info(self, appid:str, query:str)->dict | None: """ 根据应用 ID 和查询内容从 'messages' 表中获取消息信息。 Args: appid: 目标应用的 ID。 query: 用户查询的具体内容。 Returns: 一个字典,其中键是列名,值是对应的消息数据。 如果未找到消息或发生错误,则返回 None。 """ try: with self.connection.cursor() as cursor: cursor.execute( """ SELECT * FROM messages WHERE app_id = %s AND query = %s ORDER BY created_at DESC """, (appid, query) ) result = cursor.fetchone() if result: colnames = [desc[0] for desc in cursor.description] return dict(zip(colnames, result)) return None except (Exception, psycopg2.Error) as error: print("Error while getting messages_info", error) def get_messages_info_by_id(self, message_id:str)->dict | None: """ 根据消息 ID 从 'messages' 表中获取消息信息。 """ try: with self.connection.cursor() as cursor: cursor.execute( """ SELECT * FROM messages WHERE id = %s """, (message_id, ) ) result = cursor.fetchone() if result: colnames = [desc[0] for desc in cursor.description] return dict(zip(colnames, result)) return None except (Exception, psycopg2.Error) as error: print("Error while getting messages_info", error) def get_workflow_node_executions_info(self, workflow_run_id:str)->list[dict] | None: """ 根据工作流运行 ID 从 'workflow_node_executions' 表中获取节点执行信息。 Args: workflow_run_id: 目标工作流运行的 ID。 Returns: 一个字典,其中键是列名,值是对应的节点执行数据。 如果未找到执行信息或发生错误,则返回 None。 """ try: with self.connection.cursor() as cursor: cursor.execute( """ SELECT * FROM workflow_node_executions WHERE workflow_run_id = %s """, (workflow_run_id,) ) result = cursor.fetchall() if result: colnames = [desc[0] for desc in cursor.description] return [dict(zip(colnames, row)) for row in result] return None except (Exception, psycopg2.Error) as error: print("Error while getting workflow_node_executions_info", error) class DifyTool: """ 提供用于获取 Dify 应用调试信息的工具类。 该类利用 PgSql 类从数据库中检索与特定应用和查询相关的 应用信息、消息详情以及工作流节点执行情况。 """ @staticmethod def get_message_debug_info_by_id(message_id:str)->dict | None: """ 根据消息 ID 从 'messages' 表中获取消息信息。 """ dify_pgsql = PgSql() messages_info = dify_pgsql.get_messages_info_by_id(message_id) if not messages_info: return None workflow_node_executions_info = dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id']) if not workflow_node_executions_info: return None return { "messages_info": messages_info, "workflow_node_executions_info": workflow_node_executions_info } @staticmethod def get_message_debug_info_by_query(appid:str, query:str)->dict: """ 获取指定应用和查询相关的调试信息。 此静态方法会创建一个临时的 PgSql 实例来查询数据库, 然后聚合应用信息、消息信息和工作流节点执行信息。 Args: appid: 目标应用的 ID。 query: 用户查询的具体内容。 Returns: 一个包含 "appinfo", "messages_info", 和 "workflow_node_executions_info"键的字典,分别对应 查询到的应用数据、消息数据和节点执行数据。 """ dify_pgsql = PgSql() appinfo = dify_pgsql.get_appinfo(appid) if not appinfo: return None messages_info = dify_pgsql.get_messages_info(appid, query) if not messages_info: return None workflow_node_executions_info = dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id']) if not workflow_node_executions_info: return None return { "appinfo": appinfo, "messages_info": messages_info, "workflow_node_executions_info": workflow_node_executions_info } if __name__ == "__main__": print(DifyTool.get_message_debug_info_by_query("ccf92b97-2789-4a3f-90e0-135a869a37c5", "电力建设计价通软件,导入结算后没有暂列金怎么办?要手动添加么?"))