Files

441 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from dotenv import load_dotenv
import os
import json
import datetime
import pandas as pd
import sys
from rag2_0.dify.dify_tool import DifyTool
import requests
class DifyExporter:
"""
Dify数据导出工具,用于从Dify系统中导出对话和消息数据
支持按日期范围过滤消息,可以指定开始日期和结束日期
"""
def __init__(self, app_id=None, start_date=None, end_date=None):
"""
初始化DifyExporter实例
Args:
app_id: Dify应用ID,默认为None
start_date: 开始日期时间,格式为YYYY-MM-DD HH,默认为None(不限制开始日期)
end_date: 结束日期时间,格式为YYYY-MM-DD HH,默认为None(不限制结束日期)
Note:
数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示
因此输入的start_date和end_date应该是UTC+8时区的时间
"""
# 设置默认值
self.app_id = app_id or "72d03c7d-8bea-42f9-9e8d-cdfb9480f372"
# 设置日期过滤,转换为datetime对象
self.start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d %H") if start_date else None
self.end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d %H") if end_date else None
# 初始化工具类
self.dify_tool = DifyTool()
# 初始化数据存储
self.message_info_list = []
# 设置AnswerType服务地址
self.answer_type_url = f"http://172.20.0.145:8003"
def process_message_chain(self, messages):
"""
处理消息链,按照时间顺序重新组织消息
Args:
messages: 消息列表
Returns:
按时间顺序组织的消息列表
"""
message_chain = {}
for message in messages:
if message["parent_message_id"] in message_chain:
message_chain[message["parent_message_id"]].append(message)
else:
message_chain[message["parent_message_id"]] = [message]
message_chain_new = []
for message in message_chain.values():
if len(message) == 1:
message_chain_new.append(message[0])
else:
query_list = {}
for msg in message:
if msg['query'] in query_list:
if query_list[msg['query']]["created_at"] < msg['created_at']:
query_list[msg['query']] = msg
else:
query_list[msg['query']] = msg
for msg in query_list.values():
message_chain_new.append(msg)
return message_chain_new
def get_remark(self, msg_debug_info):
"""
获取备注
"""
intent_node_execution_info = [node_execution_info for node_execution_info in msg_debug_info['workflow_node_executions_info']
if node_execution_info["title"] == "意图识别结果解析"]
if len(intent_node_execution_info) == 0:
return ""
if intent_node_execution_info[0]["outputs"] is None:
return ""
intent_result = json.loads(intent_node_execution_info[0]["outputs"])
vertical_classification = intent_result.get("vertical_classification", "")
sub_classification = intent_result.get("sub_classification", "")
if sub_classification == "固定话术类":
return "使用固定话术"
worker_node_execution_info = [node_execution_info for node_execution_info in msg_debug_info['workflow_node_executions_info']
if node_execution_info["title"] == "检索工单数据"]
if len(worker_node_execution_info) != 0:
return "检索工单"
return ""
def get_node_info_by_title(self, workflow_node_executions_info:list, title:str) -> dict:
"""
获取指定标题的节点信息
"""
if workflow_node_executions_info is None:
return None
for node_execution in workflow_node_executions_info:
if node_execution["title"] == title:
return node_execution
return None
def get_wiki_list(self, msg_debug_info) -> list:
"""
获取检索到的词条列表
"""
wiki_list = []
if msg_debug_info['workflow_node_executions_info'] is None:
return []
node_execution = self.get_node_info_by_title(msg_debug_info['workflow_node_executions_info'], "提取处理后的知识")
if node_execution is not None:
if node_execution["outputs"] is None:
return []
outputs = json.loads(node_execution["outputs"])
source_kno = outputs["source_kno"]
knowledge_list_metadata = outputs["knowledge_list_metadata"]
for knowledge in knowledge_list_metadata:
document_name = knowledge['metadata']['document_name']
doc_metadata = knowledge['metadata']['doc_metadata']
if doc_metadata is None or doc_metadata.get("workorder_time", None) is not None:
wiki_list.append(document_name.split("/")[-1])
else:
dataset_name = knowledge['metadata']['dataset_name']
wiki_list.append(f"{dataset_name} - {document_name.split('/')[-1]}")
return wiki_list
lock_node_execution = self.get_node_info_by_title(msg_debug_info['workflow_node_executions_info'], "软件锁知识")
if lock_node_execution is not None:
if lock_node_execution["outputs"] is None:
return []
outputs = json.loads(lock_node_execution["outputs"])
if 'json' in outputs:
source_kno = outputs['json'][0]['retrieve_result']
else:
source_kno = outputs['result']['value']
for knowledge in source_kno:
document_name = knowledge['metadata']['document_name']
wiki_list.append(document_name.split("/")[-1])
if "锁信息查询" not in wiki_list:
wiki_list.append("锁信息查询")
if "软件锁注册、激活、查锁、试用锁延期" not in wiki_list:
wiki_list.append("软件锁注册、激活、查锁、试用锁延期")
return wiki_list
return []
def get_query_type_from_service(self, workflow_run_id):
"""
从HTTP服务获取查询类型
Args:
workflow_run_id: 工作流运行ID
Returns:
查询类型字符串,如果获取失败则返回空字符串
"""
try:
url = f"{self.answer_type_url}/query_by_workflow_id?workflow_run_id={workflow_run_id}"
response = requests.get(url, timeout=2)
if response.status_code == 200:
data = response.json()
if data.get("data") and len(data["data"]) > 0:
return data["data"][0]["query_type"]
return ""
except Exception as e:
print(f"获取查询类型时出错: {e}")
return ""
def get_dislike_reason_from_service(self, workflow_run_id):
"""
从HTTP服务获取查询类型
Args:
workflow_run_id: 工作流运行ID
Returns:
查询类型字符串,如果获取失败则返回空字符串
"""
try:
url = f"{self.answer_type_url}/dislike_by_workflow_id?workflow_run_id={workflow_run_id}"
response = requests.get(url, timeout=2)
if response.status_code == 200:
data = response.json()
if data.get("data") and len(data["data"]) > 0:
return data["data"][0]["dislike_reason"]
return ""
except Exception as e:
print(f"获取查询类型时出错: {e}")
return ""
def extract_message_info(self, message):
"""
从消息中提取信息
Note:
数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间显示
Args:
message: 消息对象
Returns:
包含消息信息的字典
"""
msg_id = message["id"]
msg_inputs = message["inputs"]
user_name = msg_inputs.get("user_name", "")
current_softname = msg_inputs.get("current_softname", "")
msg_query = message["query"]
msg_answer = message["answer"]
msg_answer = msg_answer.split("----------------------------------------")[0]
# 将UTC+0时间转换为UTC+8时间
created_at_utc = message['created_at']
created_at_utc8 = created_at_utc + datetime.timedelta(hours=8)
created_at = created_at_utc8.strftime("%Y-%m-%d %H:%M")
msg_debug_info = self.dify_tool.get_message_debug_info_by_id(msg_id)
if not msg_debug_info:
return None
wiki_list = self.get_wiki_list(msg_debug_info)
if len(wiki_list) ==0:
wiki_list_str = self.get_remark(msg_debug_info)
else:
wiki_list = list(set(wiki_list))
wiki_list_str = "\n".join(wiki_list)
rating = self.dify_tool.get_message_rating(msg_id)
# 从HTTP服务获取query_type
workflow_run_id = message['workflow_run_id']
query_type = self.get_query_type_from_service(workflow_run_id)
dislike_reason = self.get_dislike_reason_from_service(workflow_run_id)
return {
"msg_id": msg_id,
"提问": msg_query,
"当前软件": current_softname,
"回答": msg_answer,
"提问人": user_name,
"提问时间": created_at,
"评价": rating,
"问题分类": query_type,
"检索到的词条": wiki_list_str,
"点踩原因": dislike_reason
}
def process_conversations(self):
"""
处理会话数据,支持按日期范围过滤消息,精确到小时
Note:
数据库中的created_at是UTC+0时间,会自动转换为UTC+8时间进行过滤
Returns:
处理后的消息信息列表
"""
conversations = self.dify_tool.get_app_conversations(appid=self.app_id)
for conversation in conversations:
messages = self.dify_tool.get_conversation_messages(conversation_id=conversation['conversation_id'])
message_chain_new = self.process_message_chain(messages)
if len(message_chain_new) != len(messages):
print(f"过滤了{len(messages) - len(message_chain_new)}条消息,会话ID{conversation['conversation_id']}")
for message in message_chain_new:
# 将UTC+0时间转换为UTC+8时间
created_at_utc = message['created_at']
created_at_utc8 = created_at_utc + datetime.timedelta(hours=8)
# 应用日期时间过滤
if self.start_date and created_at_utc8 < self.start_date:
continue
if self.end_date and created_at_utc8 > self.end_date:
continue
message_info = self.extract_message_info(message)
if message_info:
self.message_info_list.append(message_info)
return self.message_info_list
def save_to_excel(self, message_info_list, output_file):
"""
将消息信息列表保存到Excel文件
Args:
message_info_list: 消息信息列表
output_file: 输出文件路径
Returns:
输出文件路径
"""
# 创建DataFrame
df = pd.DataFrame(message_info_list)
# 设置列的顺序
columns_order = [
"msg_id","当前软件", "提问", "回答", "提问人", "提问时间",
"评价", "问题分类", "检索到的词条", "点踩原因"
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in df.columns:
df[col] = None
# 按指定顺序重排列
df = df[columns_order]
# 确保目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 创建ExcelWriter对象,用于设置Excel样式
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 写入数据
df.to_excel(writer, index=False, sheet_name='Dify对话记录')
# 获取工作簿和工作表
workbook = writer.book
worksheet = writer.sheets['Dify对话记录']
# 设置行高(20磅 ≈ 26.67像素)
for row in worksheet.iter_rows():
worksheet.row_dimensions[row[0].row].height = 20
# 设置列宽
column_widths = {
"msg_id": 15,
"当前软件": 15,
"提问": 40,
"回答": 60,
"提问人": 15,
"提问时间": 15,
"评价": 10,
"问题分类": 20,
"检索到的词条": 40,
"点踩原因": 20
}
# 应用列宽设置
for i, column in enumerate(columns_order):
col_letter = chr(65 + i) # A, B, C, ...
if i >= 26: # 超过Z的情况
col_letter = chr(64 + i // 26) + chr(65 + i % 26)
worksheet.column_dimensions[col_letter].width = column_widths[column]
print(f"结果已保存到 {output_file}")
return output_file
def export(self, output_file=None):
"""
执行导出流程
Args:
output_file: 输出文件路径,默认为None(自动生成文件名)
Returns:
处理后的消息信息列表
Note:
如果在初始化时指定了start_date或end_date,则只会导出指定日期时间范围内的消息
数据库中的时间是UTC+0时区,会自动转换为UTC+8时区进行过滤和显示
"""
# 处理会话数据
self.process_conversations()
# 如果指定了输出文件,保存结果
if output_file or len(self.message_info_list) > 0:
# 如果没有指定输出文件,则使用默认文件名
if output_file is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# 如果指定了日期范围,则在文件名中体现
date_suffix = ""
if self.start_date:
# 格式化日期对象为字符串
formatted_start = self.start_date.strftime("%Y-%m-%d_%H")
date_suffix += f"_from_{formatted_start}"
if self.end_date:
# 格式化日期对象为字符串
formatted_end = self.end_date.strftime("%Y-%m-%d_%H")
date_suffix += f"_to_{formatted_end}"
output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export{date_suffix}_{timestamp}.xlsx")
# 保存到Excel文件
self.save_to_excel(self.message_info_list, output_file)
return self.message_info_list
# 示例用法
if __name__ == "__main__":
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description='Dify数据导出工具')
parser.add_argument('--output', '-o', type=str, default="data/excel/dify_export.xlsx",
help='输出Excel文件路径')
parser.add_argument('--app_id', '-a', type=str, default="72d03c7d-8bea-42f9-9e8d-cdfb9480f372",
help='Dify应用ID')
parser.add_argument('--start_date', '-s', type=str, default="2025-09-10 00",
help='开始日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 14表示2025年7月8日14时(UTC+8时区)')
parser.add_argument('--end_date', '-e', type=str, default="2025-09-19 23",
help='结束日期时间,格式为YYYY-MM-DD HH,例如2025-07-08 18表示2025年7月8日18时(UTC+8时区)')
args = parser.parse_args()
load_dotenv()
# 设置环境变量
os.environ["DIFY_PG_HOST"] = "172.20.0.145"
os.environ["DIFY_PG_PORT"] = "5432"
os.environ["DIFY_PG_USER"] = "postgres"
os.environ["DIFY_PG_PASSWORD"] = "difyai123456"
os.environ["DIFY_PG_DATABASE"] = "dify"
# 创建导出器实例
exporter = DifyExporter(
app_id=args.app_id,
start_date=args.start_date,
end_date=args.end_date
)
# 执行导出
results = exporter.export(output_file=args.output)
# 打印结果
print(f"导出了 {len(results)} 条消息信息")