Files

414 lines
17 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
from rag2_0.dify.dify_client import ChatClient
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from threading import Lock
class ContentSource(BaseModel):
score: int = Field(description="相关性分数")
reason: str = Field(description="评分理由")
class DifyTool:
class PgSql:
"""
用于连接和操作 PostgreSQL 数据库的类。
该类封装了数据库连接、关闭连接以及执行特定查询的方法,
主要用于从 Dify 应用相关的表中获取数据。
"""
def __init__(self):
self.pg_sql_lock = Lock()
"""
初始化 PgSql 实例并建立数据库连接。
"""
self.connection = None
self.connect_sql()
def connect_sql(self):
"""
连接到 PostgreSQL 数据库。
使用预定义的凭据连接到 'dify' 数据库。
如果连接失败,会抛出异常。
"""
try:
self.DIFY_PG_USER = os.getenv("DIFY_PG_USER")
self.DIFY_PG_PASSWORD = os.getenv("DIFY_PG_PASSWORD")
self.DIFY_PG_HOST = os.getenv("DIFY_PG_HOST")
self.DIFY_PG_PORT = os.getenv("DIFY_PG_PORT")
self.DIFY_PG_DATABASE = os.getenv("DIFY_PG_DATABASE")
# 连接数据库
self.connection = psycopg2.connect(
user=self.DIFY_PG_USER,
password=self.DIFY_PG_PASSWORD,
host=self.DIFY_PG_HOST,
port=self.DIFY_PG_PORT,
database=self.DIFY_PG_DATABASE
)
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while connecting to PostgreSQL: {error}")
def close_connection(self):
"""
关闭当前的 PostgreSQL 数据库连接。
如果存在活动的连接,则关闭它。
"""
with self.pg_sql_lock:
if self.connection:
self.connection.close()
self.connection = None
def get_appinfo(self, appid:str)->dict | None:
"""
根据应用 ID 从 'apps' 表中获取应用信息。
Args:
appid: 目标应用的 ID。
Returns:
一个字典,其中键是列名,值是对应的应用数据。
如果未找到应用或发生错误,则返回 None。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM apps WHERE id = %s
""",
(appid,)
)
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting tenant_id by appid: {error}")
def get_messages_info(self, appid:str, query:str)->dict | None:
"""
根据应用 ID 和查询内容从 'messages' 表中获取消息信息。
Args:
appid: 目标应用的 ID。
query: 用户查询的具体内容。
Returns:
一个字典,其中键是列名,值是对应的消息数据。
如果未找到消息或发生错误,则返回 None。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM messages WHERE app_id = %s AND query = %s ORDER BY created_at DESC
""",
(appid, query)
)
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting messages_info: {error}")
def get_messages_info_by_id(self, message_id:str)->dict | None:
"""
根据消息 ID 从 'messages' 表中获取消息信息。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM messages WHERE id = %s
""",
(message_id, )
)
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting messages_info by id: {error}")
def get_workflow_node_executions_info(self, workflow_run_id:str)->list[dict] | None:
"""
根据工作流运行 ID 从 'workflow_node_executions' 表中获取节点执行信息。
Args:
workflow_run_id: 目标工作流运行的 ID。
Returns:
一个字典,其中键是列名,值是对应的节点执行数据。
如果未找到执行信息或发生错误,则返回 None。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM workflow_node_executions WHERE workflow_run_id = %s
""",
(workflow_run_id,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting workflow_node_executions_info: {error}")
def get_app_conversations(self, appid:str)->list[str] | None:
"""
根据应用 ID 从 'conversations' 表中获取应用会话信息。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT DISTINCT conversation_id
FROM messages
WHERE app_id = %s AND invoke_from != 'debugger';
""",
(appid,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting app_conversations: {error}")
def get_conversation_messages(self, conversation_id:str)->list[dict] | None:
"""
根据会话 ID 从 'messages' 表中获取会话消息信息。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM messages WHERE conversation_id = %s AND status = 'normal'
""",
(conversation_id,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting conversation_messages: {error}")
def get_message_rating(self, msg_id):
"""
通过msg_id从message_feedbacks中找到对应的rating。
:param msg_id: 消息ID (UUID格式)
:return: rating 字符串
"""
with self.pg_sql_lock:
rating = None
try:
with self.connection.cursor() as cursor:
# 构建查询语句
cursor.execute("""
SELECT rating
FROM message_feedbacks
WHERE message_id = %s
""",
(msg_id,))
# 执行查询
row = cursor.fetchone()
if row:
rating = row[0]
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting conversation_messages: {error}")
return rating
def get_workflow_run_info(self, workflow_run_id):
"""
通过msg_id从message_feedbacks中找到对应的rating。
:param msg_id: 消息ID (UUID格式)
:return: rating 字符串
"""
with self.pg_sql_lock:
rating = None
try:
with self.connection.cursor() as cursor:
# 构建查询语句
cursor.execute("""
SELECT * FROM workflow_runs WHERE id=%s;
""",
(workflow_run_id,))
# 执行查询
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting conversation_messages: {error}")
return None
def execute_custom_sql(self, sql, params=None, fetch_type='all'):
"""
执行自定义的SQL查询或命令。
Args:
sql: 要执行的SQL语句
params: SQL参数(可选),用于参数化查询
fetch_type: 结果获取类型,可选值:'all'(返回所有行), 'one'(返回单行), 'none'(不返回结果)
Returns:
根据fetch_type返回查询结果:
- fetch_type='all': 返回包含所有行的列表,每行是一个字典
- fetch_type='one': 返回单行字典,如果没有结果则返回None
- fetch_type='none': 返回受影响的行数
Raises:
Exception: 如果执行SQL时发生错误
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
# 执行SQL语句
cursor.execute(sql, params or ())
# 根据fetch_type处理结果
if fetch_type == 'all':
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return []
elif fetch_type == 'one':
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
return None
elif fetch_type == 'none':
# 对于UPDATE, INSERT, DELETE等操作,返回受影响的行数
return cursor.rowcount
else:
raise ValueError(f"不支持的fetch_type: {fetch_type}")
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error executing custom SQL: {error}")
"""
提供用于获取 Dify 应用调试信息的工具类。
该类利用 PgSql 类从数据库中检索与特定应用和查询相关的
应用信息、消息详情以及工作流节点执行情况。
"""
def __init__(self):
self.dify_pgsql = DifyTool.PgSql()
def __del__(self):
"""
析构函数,在对象被销毁时自动关闭数据库连接。
确保在对象生命周期结束时释放数据库资源。
"""
try:
self.dify_pgsql.close_connection()
except Exception as e:
# 析构函数中的异常不应该传播,所以这里只是简单记录
print(f"关闭数据库连接时出错: {e}")
def get_message_debug_info_by_id(self, message_id:str)->dict | None:
"""
根据消息 ID 从 'messages' 表中获取消息信息。
"""
try:
messages_info = self.dify_pgsql.get_messages_info_by_id(message_id)
if not messages_info:
return None
workflow_node_executions_info = self.dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id'])
if not workflow_node_executions_info:
return {
"messages_info": messages_info,
"workflow_node_executions_info": None
}
return {
"messages_info": messages_info,
"workflow_node_executions_info": workflow_node_executions_info
}
except Exception as e:
raise Exception(f"Error in get_message_debug_info_by_id: {e}")
def get_message_debug_info_by_query(self, appid:str, query:str)->dict:
"""
获取指定应用和查询相关的调试信息。
此静态方法会创建一个临时的 PgSql 实例来查询数据库,
然后聚合应用信息、消息信息和工作流节点执行信息。
Args:
appid: 目标应用的 ID。
query: 用户查询的具体内容。
Returns:
一个包含 "appinfo", "messages_info", 和
"workflow_node_executions_info"键的字典,分别对应
查询到的应用数据、消息数据和节点执行数据。
"""
try:
appinfo = self.dify_pgsql.get_appinfo(appid)
if not appinfo:
return None
messages_info = self.dify_pgsql.get_messages_info(appid, query)
if not messages_info:
return None
workflow_node_executions_info = self.dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id'])
if not workflow_node_executions_info:
return None
return {
"appinfo": appinfo,
"messages_info": messages_info,
"workflow_node_executions_info": workflow_node_executions_info
}
except Exception as e:
raise Exception(f"Error in get_message_debug_info_by_query: {e}")
def get_app_conversations(self, appid:str)->list[str] | None:
"""
根据应用 ID 从 'conversations' 表中获取应用会话信息。
"""
return self.dify_pgsql.get_app_conversations(appid)
def get_conversation_messages(self, conversation_id:str):
"""
根据会话 ID 从 'messages' 表中获取会话消息信息。
"""
return self.dify_pgsql.get_conversation_messages(conversation_id)
def get_workflow_node_executions_info(self, workflow_run_id:str):
return self.dify_pgsql.get_workflow_node_executions_info(workflow_run_id)
def get_message_rating(self, msg_id):
return self.dify_pgsql.get_message_rating(msg_id)
def get_workflow_run_info(self, workflow_run_id):
return self.dify_pgsql.get_workflow_run_info(workflow_run_id)