优化 DifyExporter 类:新增按日期范围过滤消息的功能,支持 UTC+8 时区的时间转换,并更新导出文件名以包含日期范围信息;同时重构消息处理逻辑。

This commit is contained in:
2025-07-09 16:50:35 +08:00
parent ffd0d52076
commit ec3db656a5
2 changed files with 104 additions and 45 deletions
+14 -6
View File
@@ -40,13 +40,18 @@ class PgSql:
如果连接失败,会抛出异常。
"""
try:
self.DIFY_PG_USER = os.getenv("DIFY_PG_USER")
self.DIFY_PG_PASSWORD = os.getenv("DIFY_PG_PASSWORD")
self.DIFY_PG_HOST = os.getenv("DIFY_PG_HOST")
self.DIFY_PG_PORT = os.getenv("DIFY_PG_PORT")
self.DIFY_PG_DATABASE = os.getenv("DIFY_PG_DATABASE")
# 连接数据库
self.connection = psycopg2.connect(
user=os.getenv("DIFY_PG_USER"),
password=os.getenv("DIFY_PG_PASSWORD"),
host=os.getenv("DIFY_PG_HOST"),
port=os.getenv("DIFY_PG_PORT"),
database=os.getenv("DIFY_PG_DATABASE")
user=self.DIFY_PG_USER,
password=self.DIFY_PG_PASSWORD,
host=self.DIFY_PG_HOST,
port=self.DIFY_PG_PORT,
database=self.DIFY_PG_DATABASE
)
except (Exception, psycopg2.Error) as error:
@@ -273,7 +278,10 @@ class DifyTool:
return None
workflow_node_executions_info = self.dify_pgsql.get_workflow_node_executions_info(messages_info['workflow_run_id'])
if not workflow_node_executions_info:
return None
return {
"messages_info": messages_info,
"workflow_node_executions_info": None
}
return {
"messages_info": messages_info,
"workflow_node_executions_info": workflow_node_executions_info
+90 -39
View File
@@ -13,14 +13,22 @@ from rag2_0.dify.dify_tool import PgSql, DifyTool
class DifyExporter:
"""
Dify数据导出工具,用于从Dify系统中导出对话和消息数据
支持按日期范围过滤消息,可以指定开始日期和结束日期
"""
def __init__(self, app_id=None, query_log_file=None):
def __init__(self, app_id=None, query_log_file=None, start_date=None, end_date=None):
"""
初始化DifyExporter实例
Args:
app_id: Dify应用ID,默认为None
query_log_file: 查询日志文件路径,默认为None
start_date: 开始日期时间,格式为YYYY-MM-DD HH,默认为None(不限制开始日期)
end_date: 结束日期时间,格式为YYYY-MM-DD HH,默认为None(不限制结束日期)
Note:
数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示
因此输入的start_date和end_date应该是UTC+8时区的时间
"""
# 设置默认值
self.app_id = app_id or "72d03c7d-8bea-42f9-9e8d-cdfb9480f372"
@@ -29,6 +37,10 @@ class DifyExporter:
self.query_log_dir = os.path.join(os.getcwd(), "data", "query_logs")
self.query_log_file = query_log_file or os.path.join(self.query_log_dir, "answer_type_logs.json")
# 设置日期过滤
self.start_date = start_date
self.end_date = end_date
# 初始化工具类
self.dify_pgsql = PgSql()
self.dify_tool = DifyTool()
@@ -76,37 +88,28 @@ class DifyExporter:
message_chain[message["parent_message_id"]] = [message]
message_chain_new = []
current_message_id = None
processed_ids = set() # 防止无限循环
while True:
# 获取当前父消息ID对应的所有消息
msg_list = message_chain.get(current_message_id, [])
# 如果没有消息或已处理过该ID,则退出循环
if not msg_list or current_message_id in processed_ids:
break
# 记录已处理的ID
if current_message_id is not None:
processed_ids.add(current_message_id)
# 使用max()函数找出创建时间最新的消息
new_msg = max(msg_list, key=lambda x: x["created_at"]) if msg_list else None
# 将最新消息添加到结果列表,并更新当前消息ID
if new_msg:
message_chain_new.append(new_msg)
current_message_id = new_msg["id"]
for message in message_chain.values():
if len(message) == 1:
message_chain_new.append(message[0])
else:
break
query_list = {}
for msg in message:
if msg['query'] in query_list:
if query_list[msg['query']]["created_at"] < msg['created_at']:
query_list[msg['query']] = msg
else:
query_list[msg['query']] = msg
for msg in query_list.values():
message_chain_new.append(msg)
return message_chain_new
def extract_message_info(self, message):
"""
从消息中提取信息
Note:
数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间显示
Args:
message: 消息对象
@@ -118,19 +121,27 @@ class DifyExporter:
user_name = msg_inputs.get("user_name", "")
msg_query = message["query"]
msg_answer = message["answer"]
created_at = message['created_at'].strftime("%Y-%m-%d")
# 将UTC+0时间转换为UTC+8时间
created_at_utc = message['created_at']
created_at_utc8 = created_at_utc + datetime.timedelta(hours=8)
created_at = created_at_utc8.strftime("%Y-%m-%d %H:%M")
msg_debug_info = self.dify_tool.get_message_debug_info_by_id(msg_id)
if not msg_debug_info:
return None
wiki_list = []
for node_execution in msg_debug_info['workflow_node_executions_info']:
if node_execution["title"] == "提取处理后的知识":
source_kno = json.loads(node_execution["outputs"])["source_kno"]
knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"]
for knowledge in knowledge_list_metadata:
document_name = knowledge['metadata']['document_name']
wiki_list.append(document_name.split("/")[-1])
wiki_list = []
if msg_debug_info['workflow_node_executions_info'] is not None:
for node_execution in msg_debug_info['workflow_node_executions_info']:
if node_execution["title"] == "提取处理后的知识":
if node_execution["outputs"] is None:
break
source_kno = json.loads(node_execution["outputs"])["source_kno"]
knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"]
for knowledge in knowledge_list_metadata:
document_name = knowledge['metadata']['document_name']
wiki_list.append(document_name.split("/")[-1])
wiki_list = list(set(wiki_list))
wiki_list_str = "\n".join(wiki_list)
@@ -153,17 +164,37 @@ class DifyExporter:
def process_conversations(self):
"""
处理会话数据
处理会话数据,支持按日期范围过滤消息,精确到小时
Note:
数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间进行过滤
Returns:
处理后的消息信息列表
"""
conversations = self.dify_pgsql.get_app_conversations(appid=self.app_id)
for conversation in conversations:
if conversation['conversation_id'] == '10d04219-0359-42f7-b9da-2ba039bf87a2':
breakpoint()
messages = self.dify_pgsql.get_conversation_messages(conversation_id=conversation['conversation_id'])
message_chain_new = self.process_message_chain(messages)
if len(message_chain_new) != len(messages):
print(f"过滤了{len(messages) - len(message_chain_new)}条消息,会话ID{conversation['conversation_id']}")
for message in message_chain_new:
# 将UTC+0时间转换为UTC+8时间
created_at_utc = message['created_at']
created_at_utc8 = created_at_utc + datetime.timedelta(hours=8)
# 提取消息的创建日期时间,精确到小时
created_at_hour = created_at_utc8.strftime("%Y-%m-%d %H")
# 应用日期时间过滤
if self.start_date and created_at_hour < self.start_date:
continue
if self.end_date and created_at_hour > self.end_date:
continue
message_info = self.extract_message_info(message)
if message_info:
self.message_info_list.append(message_info)
@@ -246,6 +277,10 @@ class DifyExporter:
Returns:
处理后的消息信息列表
Note:
如果在初始化时指定了start_date或end_date,则只会导出指定日期时间范围内的消息
数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示
"""
# 加载查询日志
self.load_query_logs()
@@ -258,7 +293,17 @@ class DifyExporter:
# 如果没有指定输出文件,则使用默认文件名
if output_file is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export_{timestamp}.xlsx")
# 如果指定了日期范围,则在文件名中体现
date_suffix = ""
if self.start_date:
# 将空格替换为下划线,使文件名更规范
formatted_start = self.start_date.replace(" ", "_")
date_suffix += f"_from_{formatted_start}"
if self.end_date:
# 将空格替换为下划线,使文件名更规范
formatted_end = self.end_date.replace(" ", "_")
date_suffix += f"_to_{formatted_end}"
output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export{date_suffix}_{timestamp}.xlsx")
# 保存到Excel文件
self.save_to_excel(self.message_info_list, output_file)
@@ -276,8 +321,12 @@ if __name__ == "__main__":
help='输出Excel文件路径')
parser.add_argument('--app_id', '-a', type=str, default=None,
help='Dify应用ID')
parser.add_argument('--query_log_file', '-q', type=str, default=None,
parser.add_argument('--query_log_file', '-q', type=str, default="data/query_logs/answer_type_logs.json",
help='查询日志文件路径')
parser.add_argument('--start_date', '-s', type=str, default=None,
help='开始日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 14表示2025年7月8日14时(UTC+8时区)')
parser.add_argument('--end_date', '-e', type=str, default=None,
help='结束日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 18表示2025年7月8日18时(UTC+8时区)')
args = parser.parse_args()
@@ -292,7 +341,9 @@ if __name__ == "__main__":
# 创建导出器实例
exporter = DifyExporter(
app_id=args.app_id,
query_log_file=args.query_log_file
query_log_file=args.query_log_file,
start_date=args.start_date,
end_date=args.end_date
)
# 执行导出