"""Export Dify conversation logs from PostgreSQL to Markdown files.

For a conversation, ``conversation.md`` lists every message (query, answer,
feedback rating, creation time rendered in UTC+8). Each message's workflow
run is written to ``workflow_run_logs/<workflow_run_id>.md`` with the
inputs / process data / outputs of every node execution.
"""
import json
import os
from datetime import timedelta, timezone

import psycopg2
from psycopg2 import sql

# Timezone used when rendering message timestamps in conversation.md.
UTC_PLUS_8 = timezone(timedelta(hours=8))

# Connection settings: key -> (environment variable, fallback value).
# A keyword argument to PgSql() wins over the environment variable, which
# wins over the fallback.
# NOTE(review): a hard-coded password in source control is a credential
# leak; prefer setting DIFY_PG_PASSWORD and removing the literal fallback.
_DB_DEFAULTS = {
    "user": ("DIFY_PG_USER", "postgres"),
    "password": ("DIFY_PG_PASSWORD", "difyai123456"),
    "host": ("DIFY_PG_HOST", "172.20.0.145"),
    "port": ("DIFY_PG_PORT", "5432"),
    "database": ("DIFY_PG_DATABASE", "dify"),
}


def _compose_query(template, filters):
    """Fill the ``{}`` in *template* with the AND of *filters*.

    Args:
        template: SQL text containing exactly one ``{}`` where the joined
            conditions go.
        filters: sequence of ``(sql_fragment, value)`` pairs; each fragment
            holds exactly one ``%s`` placeholder bound to ``value``.

    Returns:
        ``(query, params)`` ready for ``cursor.execute``.
    """
    where = sql.SQL(" AND ").join([sql.SQL(fragment) for fragment, _ in filters])
    return sql.SQL(template).format(where), [value for _, value in filters]


def _format_utc_plus_8(timestamp):
    """Render *timestamp* as ``YYYY-mm-dd HH:MM:SS`` in UTC+8.

    A naive timestamp is assumed to be UTC, as the original script did.
    """
    if timestamp.tzinfo is None:
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    return timestamp.astimezone(UTC_PLUS_8).strftime("%Y-%m-%d %H:%M:%S")


def _load_json(value):
    """Decode *value* if it is JSON text; pass already-decoded values through."""
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def _write_node_execution(file, record):
    """Append one ``workflow_node_executions`` row to *file* as Markdown.

    Raises whatever JSON decoding raises for malformed columns; the caller
    treats each row as best-effort.
    """
    node_id, _run_id, node_type, title, inputs, process_data, outputs, finished_at = record
    file.write(f"# Node Type: {node_type}(**{title}**)\n\n")
    file.write(f"Node ID: {node_id}\n\n")

    # Pretty-print each JSON column that is present, in the original order.
    decoded = {}
    for label, raw in (("Inputs", inputs), ("process_data", process_data), ("outputs", outputs)):
        if raw is None:
            continue
        decoded[label] = _load_json(raw)
        pretty = json.dumps(decoded[label], ensure_ascii=False, indent=4)
        file.write(f"{label}: \n```json\n{pretty}\n```\n\n")

    # Extract the LLM answer returned in the body of the RagasHTTP node.
    if node_type == "http-request" and title == "RagasHTTP":
        outputs_data = decoded.get("outputs")
        body = outputs_data.get("body") if isinstance(outputs_data, dict) else None
        if body:  # previously a missing body crashed json.loads({})
            answer = _load_json(body)[0].get("answer", {})
            file.write(f"answer: \n\n{answer}\n\n")

    file.write(f"Finished At: {finished_at}\n\n")


class PgSql:
    """Thin read-only accessor for the Dify PostgreSQL database.

    Usable as a context manager so the connection is always closed::

        with PgSql() as pgsql:
            pgsql.find_message(...)

    Query methods print errors and return ``None`` instead of raising.
    """

    def __init__(self, user=None, password=None, host=None, port=None, database=None):
        """Resolve connection settings and connect immediately.

        Each argument defaults to its ``DIFY_PG_*`` environment variable and
        then to the built-in fallback in ``_DB_DEFAULTS``.
        """
        overrides = {"user": user, "password": password, "host": host,
                     "port": port, "database": database}
        self.db_config = {}
        for key, (env_var, fallback) in _DB_DEFAULTS.items():
            value = overrides[key]
            self.db_config[key] = value if value is not None else os.environ.get(env_var, fallback)
        self.db_config["port"] = int(self.db_config["port"])
        self.connection = None
        self.connect_sql()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close_connection()
        return False

    def connect_sql(self):
        """Open the connection; on failure print the error and leave it None."""
        try:
            self.connection = psycopg2.connect(**self.db_config)
            # Every query here is read-only. Autocommit keeps one failed query
            # from leaving the session in an aborted transaction (which would
            # make every later query fail) and avoids idle open transactions.
            self.connection.autocommit = True
        except Exception as error:  # includes psycopg2.Error
            print("Error while connecting to PostgreSQL", error)

    def find_rating_by_message_id(self, message_id):
        """Return the ``rating`` column from message_feedbacks for a message, or None."""
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "SELECT rating FROM message_feedbacks WHERE message_id = %s;",
                    (message_id,),
                )
                record = cursor.fetchone()
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None
        return record[0] if record else None

    def find_message(self, conversation_id, app_id, output_dir, date=None):
        """Write ``<output_dir>/conversation.md`` for one conversation.

        Args:
            conversation_id: conversation to export.
            app_id: optional app filter; falsy means no filter.
            output_dir: existing directory receiving the Markdown files.
            date: optional ``DATE(created_at)`` filter; falsy means no filter.
                NOTE(review): this compares the stored (assumed UTC) date,
                while timestamps are rendered in UTC+8 — confirm intent.

        Returns:
            Path of the last workflow log written, or None when no message
            had a workflow run or an error occurred.
        """
        filters = [("conversation_id = %s", conversation_id)]
        if app_id:
            filters.append(("app_id = %s", app_id))
        if date:
            filters.append(("DATE(created_at) = %s", date))
        query, params = _compose_query(
            "SELECT id, conversation_id, query, answer, workflow_run_id, created_at "
            "FROM messages WHERE {} ORDER BY created_at ASC;",
            filters,
        )

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                records = cursor.fetchall()

            output_file = os.path.join(output_dir, "conversation.md")
            last_workflow_file = None
            with open(output_file, 'w', encoding='utf-8') as file:
                for msg_id, _conv_id, user_query, answer, workflow_run_id, created_at in records:
                    create_data = _format_utc_plus_8(created_at)
                    message_rating = self.find_rating_by_message_id(msg_id)
                    if message_rating:
                        file.write(f"## ({message_rating}) Query: {user_query}\n\n")
                    else:
                        file.write(f"## Query: {user_query}\n\n")
                    file.write(f"Workflow_run_id: {workflow_run_id}\n\n")
                    file.write(f"create_data: {create_data}\n\n")
                    file.write(f"**Answer:**\n\n{answer}\n\n")
                    # Messages without a workflow run have nothing to dump
                    # (previously this produced a bogus "None.md").
                    if workflow_run_id is not None:
                        last_workflow_file = self.find_workflow_node_executions(
                            workflow_run_id, output_dir)
            return last_workflow_file
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None

    def find_conversation_ids_by_message_date(self, date, app_id=None):
        """Return distinct conversation ids with a message on *date*.

        Optionally restricts the query to *app_id*. Returns None on error.
        """
        filters = [("DATE(created_at) = %s", date)]
        if app_id:
            filters.append(("app_id = %s", app_id))
        query, params = _compose_query(
            "SELECT DISTINCT conversation_id FROM messages WHERE {};", filters)
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                rows = cursor.fetchall()
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None
        # DISTINCT already de-duplicates; dict.fromkeys is a cheap,
        # order-preserving safety net.
        return list(dict.fromkeys(row[0] for row in rows))

    def find_conversation_ids_by_date(self, date, app_id):
        """Return ids of 'web-app' conversations whose ``DATE(updated_at)`` is *date*.

        Optionally restricts the query to *app_id*. Each id is also printed.
        Returns None on error.
        """
        filters = [("DATE(updated_at) = %s", date)]
        if app_id:
            filters.append(("app_id = %s", app_id))
        query, params = _compose_query(
            "SELECT id FROM conversations WHERE {} AND invoke_from = 'web-app';", filters)
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                rows = cursor.fetchall()
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None
        ids = [row[0] for row in rows]
        for conversation_id in ids:
            print(conversation_id)
        return ids

    def find_workflow_node_executions(self, workflow_run_id, output_dir_):
        """Write ``<output_dir_>/workflow_run_logs/<workflow_run_id>.md``.

        Node executions are written in ``finished_at`` order. A row that
        fails to render is reported and skipped; the remaining rows are
        still written.

        Returns:
            The written file path, or None on a database/file error.
        """
        query = (
            "SELECT id, workflow_run_id, node_type, title, inputs, process_data, "
            "outputs, finished_at FROM workflow_node_executions "
            "WHERE workflow_run_id = %s ORDER BY finished_at ASC;"
        )
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, (workflow_run_id,))
                records = cursor.fetchall()

            log_dir = os.path.join(f"{output_dir_}", "workflow_run_logs")
            os.makedirs(log_dir, exist_ok=True)
            output_file = os.path.join(log_dir, f"{workflow_run_id}.md")
            with open(output_file, 'w', encoding='utf-8') as file:
                for record in records:
                    try:
                        _write_node_execution(file, record)
                    except Exception as e:  # best-effort per node
                        print(f"file write has error:{e}")
            return output_file
        except Exception as error:
            print("Error while fetching data from PostgreSQL", error)
            return None

    def close_connection(self):
        """Close the connection if open; safe to call more than once."""
        if self.connection is not None:
            self.connection.close()
            self.connection = None


def export_by_conversation_id(conversation_id, app_id: str):
    """Export one conversation to ``./conversion_infos/<conversation_id>/``.

    Returns the path of the last workflow log written (see
    ``PgSql.find_message``). The connection is always closed; previously
    ``close_connection`` sat after ``return`` and never ran.
    """
    output_dir = os.path.join(".", "conversion_infos", f"{conversation_id}")
    os.makedirs(output_dir, exist_ok=True)
    with PgSql() as pgsql:
        return pgsql.find_message(conversation_id=conversation_id, app_id=app_id,
                                  output_dir=output_dir)


def export_by_data(data: str, app_id: str):
    """Export every conversation with a message on date *data* (e.g. "2024-08-21").

    Output goes to ``./conversion_infos/<data>/<conversation_id>/``. The
    parameter name ``data`` is kept for caller compatibility; it is a date.
    """
    with PgSql() as pgsql:
        # None (query error) is treated as "no conversations" instead of
        # crashing the loop.
        conversation_ids = pgsql.find_conversation_ids_by_message_date(data, app_id) or []
        for conversation_id in conversation_ids:
            output_dir = os.path.join(".", "conversion_infos", data, f"{conversation_id}")
            os.makedirs(output_dir, exist_ok=True)
            pgsql.find_message(conversation_id=conversation_id, app_id=app_id,
                               output_dir=output_dir, date=data)


# Script entry point.
if __name__ == '__main__':
    # export_by_data("2024-08-21", "your_app_id")
    a = export_by_conversation_id("d0369d15-d253-4145-9602-bf6b0e569702", "")
    print(a)