From ec3db656a5099619b1219018ab1a770a665b329b Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Wed, 9 Jul 2025 16:50:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DifyExporter=E7=B1=BB?= =?UTF-8?q?=EF=BC=8C=E6=96=B0=E5=A2=9E=E6=8C=89=E6=97=A5=E6=9C=9F=E8=8C=83?= =?UTF-8?q?=E5=9B=B4=E8=BF=87=E6=BB=A4=E6=B6=88=E6=81=AF=E7=9A=84=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81UTC+8=E6=97=B6=E5=8C=BA?= =?UTF-8?q?=E7=9A=84=E6=97=B6=E9=97=B4=E8=BD=AC=E6=8D=A2=EF=BC=8C=E5=B9=B6?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=AF=BC=E5=87=BA=E6=96=87=E4=BB=B6=E5=90=8D?= =?UTF-8?q?=E4=BB=A5=E5=8C=85=E5=90=AB=E6=97=A5=E6=9C=9F=E8=8C=83=E5=9B=B4?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E3=80=82=E5=90=8C=E6=97=B6=EF=BC=8C=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag2_0/dify/dify_tool.py | 20 +++-- rag2_0/dify/export_new_dify.py | 129 +++++++++++++++++++++++---------- 2 files changed, 104 insertions(+), 45 deletions(-) diff --git a/rag2_0/dify/dify_tool.py b/rag2_0/dify/dify_tool.py index 0c50101..fbb3400 100755 --- a/rag2_0/dify/dify_tool.py +++ b/rag2_0/dify/dify_tool.py @@ -40,13 +40,18 @@ class PgSql: 如果连接失败,会抛出异常。 """ try: + self.DIFY_PG_USER = os.getenv("DIFY_PG_USER") + self.DIFY_PG_PASSWORD = os.getenv("DIFY_PG_PASSWORD") + self.DIFY_PG_HOST = os.getenv("DIFY_PG_HOST") + self.DIFY_PG_PORT = os.getenv("DIFY_PG_PORT") + self.DIFY_PG_DATABASE = os.getenv("DIFY_PG_DATABASE") # 连接数据库 self.connection = psycopg2.connect( - user=os.getenv("DIFY_PG_USER"), - password=os.getenv("DIFY_PG_PASSWORD"), - host=os.getenv("DIFY_PG_HOST"), - port=os.getenv("DIFY_PG_PORT"), - database=os.getenv("DIFY_PG_DATABASE") + user=self.DIFY_PG_USER, + password=self.DIFY_PG_PASSWORD, + host=self.DIFY_PG_HOST, + port=self.DIFY_PG_PORT, + database=self.DIFY_PG_DATABASE ) except (Exception, psycopg2.Error) as error: @@ -273,7 +278,10 @@ class DifyTool: return None workflow_node_executions_info = self.dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id']) if not workflow_node_executions_info: - return None + return { + "messages_info": messages_info, + "workflow_node_executions_info": None + } return { "messages_info": messages_info, "workflow_node_executions_info": workflow_node_executions_info diff --git a/rag2_0/dify/export_new_dify.py b/rag2_0/dify/export_new_dify.py index a145391..cf9e999 100644 --- a/rag2_0/dify/export_new_dify.py +++ b/rag2_0/dify/export_new_dify.py @@ -13,14 +13,22 @@ from rag2_0.dify.dify_tool import PgSql, DifyTool class DifyExporter: """ Dify数据导出工具,用于从Dify系统中导出对话和消息数据 + + 支持按日期范围过滤消息,可以指定开始日期和结束日期 """ - def __init__(self, app_id=None, query_log_file=None): + def __init__(self, app_id=None, query_log_file=None, start_date=None, end_date=None): """ 初始化DifyExporter实例 Args: app_id: Dify应用ID,默认为None query_log_file: 查询日志文件路径,默认为None + start_date: 开始日期时间,格式为YYYY-MM-DD HH,默认为None(不限制开始日期) + end_date: 结束日期时间,格式为YYYY-MM-DD HH,默认为None(不限制结束日期) + + Note: + 数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示 + 因此输入的start_date和end_date应该是UTC+8时区的时间 """ # 设置默认值 self.app_id = app_id or "72d03c7d-8bea-42f9-9e8d-cdfb9480f372" @@ -29,6 +37,10 @@ class DifyExporter: self.query_log_dir = os.path.join(os.getcwd(), "data", "query_logs") self.query_log_file = query_log_file or os.path.join(self.query_log_dir, "answer_type_logs.json") + # 设置日期过滤 + self.start_date = start_date + self.end_date = end_date + # 初始化工具类 self.dify_pgsql = PgSql() self.dify_tool = DifyTool() @@ -76,37 +88,28 @@ class DifyExporter: message_chain[message["parent_message_id"]] = [message] message_chain_new = [] - current_message_id = None - processed_ids = set() # 防止无限循环 - - while True: - # 获取当前父消息ID对应的所有消息 - msg_list = message_chain.get(current_message_id, []) - - # 如果没有消息或已处理过该ID,则退出循环 - if not msg_list or current_message_id in processed_ids: - break - - # 记录已处理的ID - if current_message_id is not None: - processed_ids.add(current_message_id) - - # 使用max()函数找出创建时间最新的消息 - new_msg = max(msg_list, key=lambda x: x["created_at"]) if msg_list else None - - # 将最新消息添加到结果列表,并更新当前消息ID - if new_msg: - message_chain_new.append(new_msg) - current_message_id = new_msg["id"] + for message in message_chain.values(): + if len(message) == 1: + message_chain_new.append(message[0]) else: - break - + query_list = {} + for msg in message: + if msg['query'] in query_list: + if query_list[msg['query']]["created_at"] < msg['created_at']: + query_list[msg['query']] = msg + else: + query_list[msg['query']] = msg + for msg in query_list.values(): + message_chain_new.append(msg) return message_chain_new def extract_message_info(self, message): """ 从消息中提取信息 + Note: + 数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间显示 + Args: message: 消息对象 @@ -118,19 +121,27 @@ class DifyExporter: user_name = msg_inputs.get("user_name", "") msg_query = message["query"] msg_answer = message["answer"] - created_at = message['created_at'].strftime("%Y-%m-%d") + + # 将UTC+0时间转换为UTC+8时间 + created_at_utc = message['created_at'] + created_at_utc8 = created_at_utc + datetime.timedelta(hours=8) + created_at = created_at_utc8.strftime("%Y-%m-%d %H:%M") + msg_debug_info = self.dify_tool.get_message_debug_info_by_id(msg_id) if not msg_debug_info: return None - wiki_list = [] - for node_execution in msg_debug_info['workflow_node_executions_info']: - if node_execution["title"] == "提取处理后的知识": - source_kno = json.loads(node_execution["outputs"])["source_kno"] - knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"] - for knowledge in knowledge_list_metadata: - document_name = knowledge['metadata']['document_name'] - wiki_list.append(document_name.split("/")[-1]) + wiki_list = [] + if msg_debug_info['workflow_node_executions_info'] is not None: + for node_execution in msg_debug_info['workflow_node_executions_info']: + if node_execution["title"] == "提取处理后的知识": + if node_execution["outputs"] is None: + break + source_kno = json.loads(node_execution["outputs"])["source_kno"] + knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"] + for knowledge in knowledge_list_metadata: + document_name = knowledge['metadata']['document_name'] + wiki_list.append(document_name.split("/")[-1]) wiki_list = list(set(wiki_list)) wiki_list_str = "\n".join(wiki_list) @@ -153,17 +164,37 @@ class DifyExporter: def process_conversations(self): """ - 处理会话数据 + 处理会话数据,支持按日期范围过滤消息,精确到小时 + + Note: + 数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间进行过滤 Returns: 处理后的消息信息列表 """ conversations = self.dify_pgsql.get_app_conversations(appid=self.app_id) for conversation in conversations: + if conversation['conversation_id'] == '10d04219-0359-42f7-b9da-2ba039bf87a2': + breakpoint() messages = self.dify_pgsql.get_conversation_messages(conversation_id=conversation['conversation_id']) message_chain_new = self.process_message_chain(messages) - + if len(message_chain_new) != len(messages): + print(f"过滤了{len(messages) - len(message_chain_new)}条消息,会话ID:{conversation['conversation_id']}") + for message in message_chain_new: + # 将UTC+0时间转换为UTC+8时间 + created_at_utc = message['created_at'] + created_at_utc8 = created_at_utc + datetime.timedelta(hours=8) + + # 提取消息的创建日期时间,精确到小时 + created_at_hour = created_at_utc8.strftime("%Y-%m-%d %H") + + # 应用日期时间过滤 + if self.start_date and created_at_hour < self.start_date: + continue + if self.end_date and created_at_hour > self.end_date: + continue + message_info = self.extract_message_info(message) if message_info: self.message_info_list.append(message_info) @@ -246,6 +277,10 @@ class DifyExporter: Returns: 处理后的消息信息列表 + + Note: + 如果在初始化时指定了start_date或end_date,则只会导出指定日期时间范围内的消息 + 数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示 """ # 加载查询日志 self.load_query_logs() @@ -258,7 +293,17 @@ class DifyExporter: # 如果没有指定输出文件,则使用默认文件名 if output_file is None: timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export_{timestamp}.xlsx") + # 如果指定了日期范围,则在文件名中体现 + date_suffix = "" + if self.start_date: + # 将空格替换为下划线,使文件名更规范 + formatted_start = self.start_date.replace(" ", "_") + date_suffix += f"_from_{formatted_start}" + if self.end_date: + # 将空格替换为下划线,使文件名更规范 + formatted_end = self.end_date.replace(" ", "_") + date_suffix += f"_to_{formatted_end}" + output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export{date_suffix}_{timestamp}.xlsx") # 保存到Excel文件 self.save_to_excel(self.message_info_list, output_file) @@ -276,8 +321,12 @@ if __name__ == "__main__": help='输出Excel文件路径') parser.add_argument('--app_id', '-a', type=str, default=None, help='Dify应用ID') - parser.add_argument('--query_log_file', '-q', type=str, default=None, + parser.add_argument('--query_log_file', '-q', type=str, default="data/query_logs/answer_type_logs.json", help='查询日志文件路径') + parser.add_argument('--start_date', '-s', type=str, default=None, + help='开始日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 14表示2025年7月8日14时(UTC+8时区)') + parser.add_argument('--end_date', '-e', type=str, default=None, + help='结束日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 18表示2025年7月8日18时(UTC+8时区)') args = parser.parse_args() @@ -292,7 +341,9 @@ if __name__ == "__main__": # 创建导出器实例 exporter = DifyExporter( app_id=args.app_id, - query_log_file=args.query_log_file + query_log_file=args.query_log_file, + start_date=args.start_date, + end_date=args.end_date ) # 执行导出