# Files
# 2025-03-31 15:17:47 +08:00
#
# 317 lines
# 12 KiB
# Python

import psycopg2
from psycopg2 import sql
import os
import json
from datetime import timezone, timedelta
class PgSql:
    """Export conversation data from a PostgreSQL database to Markdown files.

    The class opens one connection on construction and offers read-only
    lookups against the ``messages``, ``message_feedbacks``,
    ``conversations`` and ``workflow_node_executions`` tables.  Database
    errors are reported with ``print`` and turned into "empty" results
    (``None`` or ``[]``) instead of being raised, so a batch export keeps
    going after a single failure.
    """

    # Time zone used when rendering message timestamps (UTC+8).
    _DISPLAY_TZ = timezone(timedelta(hours=8))

    def __init__(self, *, user="postgres", password="difyai123456",
                 host="172.20.0.145", port=5432, database="dify"):
        """Store the connection settings and connect immediately.

        All arguments are keyword-only and optional; the defaults are the
        values that used to be hard-coded, so ``PgSql()`` behaves as before.

        NOTE(review): the defaults embed real-looking credentials in source
        control.  Prefer passing them in from configuration.
        """
        self._connection_params = {
            "user": user,
            "password": password,
            "host": host,
            "port": port,
            "database": database,
        }
        self.connection = None
        self.connect_sql()

    def connect_sql(self):
        """Open the database connection and store it on ``self.connection``.

        On failure the error is printed and ``self.connection`` keeps its
        previous value (``None`` right after construction).
        """
        try:
            self.connection = psycopg2.connect(**self._connection_params)
        except Exception as error:  # psycopg2.Error is a subclass of Exception
            print("Error while connecting to PostgreSQL", error)

    def _run_query(self, query, params, fetch_one=False):
        """Execute ``query`` with ``params`` and return the fetched row(s).

        The cursor is always closed.  When the database rejects the
        statement, the transaction is rolled back before re-raising so that
        later queries on the same connection are not refused because the
        transaction is in an aborted state.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            return cursor.fetchone() if fetch_one else cursor.fetchall()
        except psycopg2.Error:
            self.connection.rollback()
            raise
        finally:
            cursor.close()

    @staticmethod
    def _decode_json(value):
        """Return ``value`` parsed as JSON when it is text, else unchanged.

        Text/bytes are decoded with ``json.loads``; anything else (for
        example a dict the driver already decoded) is passed through.
        """
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    @staticmethod
    def _json_block(label, value):
        """Render ``value`` as a labelled, fenced, pretty-printed JSON block."""
        rendered = json.dumps(value, ensure_ascii=False, indent=4)
        return f"{label}: \n```json\n{rendered}\n```\n\n"

    @classmethod
    def _format_local_time(cls, moment):
        """Format a datetime as ``YYYY-MM-DD HH:MM:SS`` in UTC+8.

        Naive datetimes are treated as UTC before conversion.
        """
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(cls._DISPLAY_TZ).strftime("%Y-%m-%d %H:%M:%S")

    def find_rating_by_message_id(self, message_id):
        """Return the feedback rating stored for ``message_id``.

        Returns ``None`` when there is no feedback row or the query fails.
        """
        try:
            query = sql.SQL(
                "SELECT rating FROM message_feedbacks WHERE message_id = %s;"
            )
            record = self._run_query(query, (message_id,), fetch_one=True)
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None
        return record[0] if record else None

    def find_message(self, conversation_id, app_id, output_dir, date=None):
        """Write every message of a conversation to ``conversation.md``.

        Messages are selected by ``conversation_id`` and, when given (truthy),
        additionally by ``app_id`` and by ``date`` (matched against
        ``DATE(created_at)``).  For each message the per-node workflow log
        is also written via ``find_workflow_node_executions``.

        NOTE(review): the ``date`` filter uses the timestamp as stored, while
        the rendered ``create_data`` is shifted to UTC+8 -- confirm that the
        two are meant to disagree around midnight.

        Returns:
            The path returned for the last message's workflow log, or
            ``None`` when the conversation has no matching messages or an
            error occurred.
        """
        # conversation_id is always filtered on; the others are optional.
        conditions = ["conversation_id = %s"]
        query_params = [conversation_id]
        if app_id:
            conditions.append("app_id = %s")
            query_params.append(app_id)
        if date:
            conditions.append("DATE(created_at) = %s")
            query_params.append(date)

        try:
            query = sql.SQL(
                "SELECT id, conversation_id, query, answer, workflow_run_id, created_at "
                "FROM messages WHERE {conditions} ORDER BY created_at ASC;"
            ).format(
                conditions=sql.SQL(" AND ").join([sql.SQL(c) for c in conditions])
            )
            records = self._run_query(query, tuple(query_params))

            output_file = os.path.join(output_dir, "conversation.md")
            # Stays None when there are no rows (previously an unbound name).
            last_log_path = None
            with open(output_file, 'w', encoding='utf-8') as file:
                for record in records:
                    msg_id, _conversation, question, answer, workflow_run_id, created_at = record
                    create_data = self._format_local_time(created_at)
                    message_rating = self.find_rating_by_message_id(msg_id)
                    if message_rating:
                        file.write(f"## ({message_rating}) Query: {question}\n\n")
                    else:
                        file.write(f"## Query: {question}\n\n")
                    file.write(f"Workflow_run_id: {workflow_run_id}\n\n")
                    file.write(f"create_data: {create_data}\n\n")
                    file.write(f"**Answer:**\n\n{answer}\n\n")
                    # Dump the node-level log of this message's workflow run.
                    last_log_path = self.find_workflow_node_executions(workflow_run_id, output_dir)
            return last_log_path
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None

    def find_conversation_ids_by_message_date(self, date, app_id=None):
        """Return the distinct conversation ids having a message on ``date``.

        When ``app_id`` is truthy the lookup is restricted to that app.
        Returns an empty list when nothing matches or the query fails, so
        callers can always iterate the result.
        """
        try:
            if app_id:
                query = sql.SQL(
                    "SELECT DISTINCT conversation_id FROM messages "
                    "WHERE DATE(created_at) = %s AND app_id = %s;"
                )
                query_params = (date, app_id)
            else:
                query = sql.SQL(
                    "SELECT DISTINCT conversation_id FROM messages "
                    "WHERE DATE(created_at) = %s;"
                )
                query_params = (date,)
            rows = self._run_query(query, query_params)
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return []
        # DISTINCT already de-duplicates; dict.fromkeys is a cheap safety net
        # that also keeps the order stable.
        return list(dict.fromkeys(row[0] for row in rows))

    def find_conversation_ids_by_date(self, date, app_id):
        """Return ids of ``web-app`` conversations last updated on ``date``.

        When ``app_id`` is truthy the lookup is restricted to that app.
        Every id found is also printed.  Returns an empty list when nothing
        matches or the query fails.
        """
        try:
            if app_id:
                query = sql.SQL(
                    "SELECT id FROM conversations "
                    "WHERE DATE(updated_at) = %s "
                    "AND invoke_from = 'web-app' "
                    "AND app_id = %s;"
                )
                query_params = (date, app_id)
            else:
                query = sql.SQL(
                    "SELECT id FROM conversations "
                    "WHERE DATE(updated_at) = %s "
                    "AND invoke_from = 'web-app';"
                )
                query_params = (date,)
            rows = self._run_query(query, query_params)
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return []
        found_ids = [row[0] for row in rows]
        for found_id in found_ids:
            print(found_id)
        return found_ids

    def find_workflow_node_executions(self, workflow_run_id, output_dir_):
        """Write the node executions of one workflow run to a Markdown file.

        The file is ``<output_dir_>/workflow_run_logs/<workflow_run_id>.md``
        and lists each node (ordered by ``finished_at``) with its inputs,
        process data and outputs as pretty-printed JSON.  For the
        ``http-request`` node titled ``RagasHTTP`` the ``answer`` field of
        the first element of the response body is written as well.

        A failure while rendering one node is printed and the export moves
        on to the next node.

        Returns:
            The path of the written file, or ``None`` when the query or the
            file creation fails.
        """
        try:
            query = sql.SQL(
                "SELECT id, workflow_run_id, node_type, title, inputs, "
                "process_data, outputs, finished_at "
                "FROM workflow_node_executions "
                "WHERE workflow_run_id = %s "
                "ORDER BY finished_at ASC;"
            )
            records = self._run_query(query, (workflow_run_id,))

            logs_dir = os.path.join(f"{output_dir_}", "workflow_run_logs")
            os.makedirs(logs_dir, exist_ok=True)
            output_file = os.path.join(logs_dir, f"{workflow_run_id}.md")

            with open(output_file, 'w', encoding='utf-8') as file:
                for record in records:
                    node_id, _run_id, node_type, title, inputs, process_data, outputs, finished_at = record
                    try:
                        file.write(f"# Node Type: {node_type}(**{title}**)\n\n")
                        file.write(f"Node ID: {node_id}\n\n")
                        if inputs is not None:
                            file.write(self._json_block("Inputs", self._decode_json(inputs)))
                        if process_data is not None:
                            file.write(self._json_block("process_data", self._decode_json(process_data)))
                        outputs_data = None
                        if outputs is not None:
                            outputs_data = self._decode_json(outputs)
                            file.write(self._json_block("outputs", outputs_data))
                        # Extract the LLM answer from the RagasHTTP response;
                        # skipped when the node produced no outputs.
                        if outputs_data is not None and node_type == "http-request" and title == "RagasHTTP":
                            body = outputs_data.get("body", {})
                            answer = self._decode_json(body)[0].get('answer', {})
                            file.write(f"answer: \n\n{answer}\n\n")
                        file.write(f"Finished At: {finished_at}\n\n")
                    except Exception as e:
                        print(f"file write has error:{e}")
            return output_file
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None

    def close_connection(self):
        """Close the database connection if one was opened."""
        if self.connection:
            self.connection.close()
def export_by_conversation_id(conversation_id, app_id: str):
    """Export one conversation to ``./conversion_infos/<conversation_id>/``.

    Args:
        conversation_id: Id of the conversation to export.
        app_id: Restrict the export to this app; a falsy value (e.g. ``""``)
            disables the app filter.

    Returns:
        Whatever ``PgSql.find_message`` returns for the conversation.
    """
    pgsql = PgSql()
    try:
        output_dir = os.path.join(".", "conversion_infos", f"{conversation_id}")
        os.makedirs(output_dir, exist_ok=True)
        return pgsql.find_message(conversation_id=conversation_id, app_id=app_id, output_dir=output_dir)
    finally:
        # Previously placed after the return statement and therefore never
        # executed; finally guarantees the connection is released.
        pgsql.close_connection()
def export_by_data(data: str, app_id: str):
    """Export every conversation that has a message on the given date.

    Each conversation is written to
    ``./conversion_infos/<data>/<conversation_id>/``.

    Args:
        data: The date to export, passed to the database as the value
            compared with ``DATE(created_at)`` (e.g. ``"2024-08-21"``).
        app_id: Restrict the export to this app; a falsy value disables the
            app filter.
    """
    pgsql = PgSql()
    try:
        # The lookup may hand back None when the query fails; treat that as
        # "nothing to export" instead of crashing on iteration.
        list_conversation = pgsql.find_conversation_ids_by_message_date(data, app_id) or []
        for conversation_id in list_conversation:
            output_dir = os.path.join(".", "conversion_infos", data, f"{conversation_id}")
            os.makedirs(output_dir, exist_ok=True)
            pgsql.find_message(conversation_id=conversation_id, app_id=app_id, output_dir=output_dir, date=data)
    finally:
        # Release the connection even if an export step raises.
        pgsql.close_connection()
# Script entry point: export a single conversation and print the result.
if __name__ == '__main__':
    # Alternative: export a whole day for one app, e.g.
    # export_by_data("2024-08-21", "your_app_id")
    print(export_by_conversation_id("d0369d15-d253-4145-9602-bf6b0e569702", ""))