新增AnswerType.py文件,创建FastAPI应用以收集用户提问数据类型,添加健康检查和异步检索API,优化日志记录和错误处理。同时,新增DifyExporter类用于导出Dify系统中的对话和消息数据,支持从查询日志加载数据并保存为Excel文件。

This commit is contained in:
2025-07-09 09:12:06 +08:00
parent 9636ed3ed2
commit ffd0d52076
3 changed files with 525 additions and 1 deletions
+149
View File
@@ -0,0 +1,149 @@
# from gevent import monkey
# monkey.patch_all()
import os
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import asyncio
from dotenv import load_dotenv
import json
import time
import datetime
import logging
# 加载环境变量
load_dotenv()
def main(query: str) -> dict:
query = query.strip()
escaped_query = json.dumps(query, ensure_ascii=False)
return {
"format_query": escaped_query,
}
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
# 定义文件锁和JSON文件路径
file_lock = asyncio.Lock()
QUERY_LOG_DIR = os.path.join(os.getcwd(), "data", "query_logs")
QUERY_LOG_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# 定义请求模型
class AnswerTypeRequest(BaseModel):
query: str
query_type: str
# 创建FastAPI应用
app = FastAPI(
title="提问数据类型",
description="收集用户提问数据类型",
version="1.0"
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 应用启动事件
@app.on_event("startup")
async def startup_event():
# 确保日志目录存在
os.makedirs(QUERY_LOG_DIR, exist_ok=True)
# 确保日志文件存在
if not os.path.exists(QUERY_LOG_FILE):
async with file_lock:
with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f:
json.dump([], f, ensure_ascii=False)
# 添加健康检查端点
@app.get("/health", summary="健康检查")
async def health_check():
return {"status": "ok"}
@app.get("/query_type", summary="异步检索API")
async def query_type(query: str, query_type: str):
try:
# 记录请求
logger.info(f"接收到请求: {query}, 类型: {query_type}")
# 保存 提问、问题类型、当前时间戳到json
timestamp = datetime.datetime.now().isoformat()
query_data = {
"query": query,
"query_type": query_type,
"timestamp": timestamp
}
success = True
try:
# 使用锁保护文件读写操作
async with file_lock:
# 确保目录存在
os.makedirs(os.path.dirname(QUERY_LOG_FILE), exist_ok=True)
# 读取现有数据
existing_data = []
if os.path.exists(QUERY_LOG_FILE) and os.path.getsize(QUERY_LOG_FILE) > 0:
with open(QUERY_LOG_FILE, 'r', encoding='utf-8') as f:
try:
existing_data = json.load(f)
except json.JSONDecodeError:
logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_LOG_FILE}")
existing_data = []
# 添加新数据
existing_data.append(query_data)
# 写入文件
with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2)
logger.info(f"成功保存查询数据到: {QUERY_LOG_FILE}")
except Exception as e:
success = False
logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True)
# 返回响应
content = f"<strong>当前提问</strong>: {query}<br><strong>问题类型</strong>: {query_type}<br><strong>操作是否成功</strong>: {'成功' if success else '失败'}"
return HTMLResponse(content=content)
except Exception as e:
logger.error(f"处理请求时出错: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"处理请求时出错: {str(e)}")
if __name__ == "__main__":
# 使用Uvicorn运行FastAPI应用
import uvicorn
uvicorn.run("rag2_0.dify.AnswerType:app", host="0.0.0.0", port=8003, reload=False, workers=1, log_level="info")
# # 使用uvicorn启动服务
# import uvicorn
# uvicorn.run(
# "rag2_0.dify.intent_recognition_api:app",
# host="0.0.0.0",
# port=8001,
# reload=False, # 开发环境启用热重载
# workers=1 # 生产环境可以增加worker数量
# )
# 生产环境可以使用以下命令启动:
# uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 20
+73 -1
View File
@@ -4,6 +4,9 @@ import psycopg2
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import ChatClient
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
@@ -167,6 +170,76 @@ class PgSql:
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting workflow_node_executions_info: {error}")
def get_app_conversations(self, appid:str)->list[str] | None:
"""
根据应用 ID 从 'conversations' 表中获取应用会话信息。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT DISTINCT conversation_id
FROM messages
WHERE app_id = %s AND invoke_from != 'debugger';
""",
(appid,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting app_conversations: {error}")
def get_conversation_messages(self, conversation_id:str)->list[dict] | None:
"""
根据会话 ID 从 'messages' 表中获取会话消息信息。
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT * FROM messages WHERE conversation_id = %s AND status = 'normal'
""",
(conversation_id,)
)
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return None
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting conversation_messages: {error}")
def get_message_rating(self, msg_id):
"""
通过msg_id从message_feedbacks中找到对应的rating。
:param msg_id: 消息ID (UUID格式)
:return: rating 字符串
"""
with self.pg_sql_lock:
rating = None
try:
with self.connection.cursor() as cursor:
# 构建查询语句
cursor.execute("""
SELECT rating
FROM message_feedbacks
WHERE message_id = %s
""",
(msg_id,))
# 执行查询
row = cursor.fetchone()
if row:
rating = row[0]
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error while getting conversation_messages: {error}")
return rating
class DifyTool:
"""
@@ -388,7 +461,6 @@ content: "{content}"
avg_score = total_score / valid_scores if valid_scores > 0 else 0
return retrieve_title, max_score, min_score, avg_score
class NewWorkflowChat(BaseWorkflowChat):
"""
新工作流对话类,用于调用新工作流发送对话并解析获取相关数据
+303
View File
@@ -0,0 +1,303 @@
from dotenv import load_dotenv
import os
import json
import datetime
import pandas as pd
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.dify_tool import PgSql, DifyTool
class DifyExporter:
"""
Dify数据导出工具,用于从Dify系统中导出对话和消息数据
"""
def __init__(self, app_id=None, query_log_file=None):
"""
初始化DifyExporter实例
Args:
app_id: Dify应用ID,默认为None
query_log_file: 查询日志文件路径,默认为None
"""
# 设置默认值
self.app_id = app_id or "72d03c7d-8bea-42f9-9e8d-cdfb9480f372"
# 设置查询日志文件路径
self.query_log_dir = os.path.join(os.getcwd(), "data", "query_logs")
self.query_log_file = query_log_file or os.path.join(self.query_log_dir, "answer_type_logs.json")
# 初始化工具类
self.dify_pgsql = PgSql()
self.dify_tool = DifyTool()
# 初始化数据存储
self.message_info_list = []
self.query_logs = {}
def load_query_logs(self):
"""
从文件加载查询日志
"""
try:
with open(self.query_log_file, 'r', encoding='utf-8') as f:
query_logs_list = json.load(f)
# 创建字典来存储每个查询的最新记录
for record in query_logs_list:
query = record['query']
timestamp = record.get('timestamp')
# 如果查询不在字典中或者当前记录的时间戳更新,则更新字典
if query not in self.query_logs or (timestamp and self.query_logs.get(query, {}).get('timestamp') and
datetime.datetime.fromisoformat(timestamp) >
datetime.datetime.fromisoformat(self.query_logs[query]['timestamp'])):
self.query_logs[query] = record
return True
except Exception as e:
print(f"加载查询日志失败: {e}")
return False
def process_message_chain(self, messages):
"""
处理消息链,按照时间顺序重新组织消息
Args:
messages: 消息列表
Returns:
按时间顺序组织的消息列表
"""
message_chain = {}
for message in messages:
if message["parent_message_id"] in message_chain:
message_chain[message["parent_message_id"]].append(message)
else:
message_chain[message["parent_message_id"]] = [message]
message_chain_new = []
current_message_id = None
processed_ids = set() # 防止无限循环
while True:
# 获取当前父消息ID对应的所有消息
msg_list = message_chain.get(current_message_id, [])
# 如果没有消息或已处理过该ID,则退出循环
if not msg_list or current_message_id in processed_ids:
break
# 记录已处理的ID
if current_message_id is not None:
processed_ids.add(current_message_id)
# 使用max()函数找出创建时间最新的消息
new_msg = max(msg_list, key=lambda x: x["created_at"]) if msg_list else None
# 将最新消息添加到结果列表,并更新当前消息ID
if new_msg:
message_chain_new.append(new_msg)
current_message_id = new_msg["id"]
else:
break
return message_chain_new
def extract_message_info(self, message):
"""
从消息中提取信息
Args:
message: 消息对象
Returns:
包含消息信息的字典
"""
msg_id = message["id"]
msg_inputs = message["inputs"]
user_name = msg_inputs.get("user_name", "")
msg_query = message["query"]
msg_answer = message["answer"]
created_at = message['created_at'].strftime("%Y-%m-%d")
msg_debug_info = self.dify_tool.get_message_debug_info_by_id(msg_id)
if not msg_debug_info:
return None
wiki_list = []
for node_execution in msg_debug_info['workflow_node_executions_info']:
if node_execution["title"] == "提取处理后的知识":
source_kno = json.loads(node_execution["outputs"])["source_kno"]
knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"]
for knowledge in knowledge_list_metadata:
document_name = knowledge['metadata']['document_name']
wiki_list.append(document_name.split("/")[-1])
wiki_list = list(set(wiki_list))
wiki_list_str = "\n".join(wiki_list)
if wiki_list_str == "":
wiki_list_str = ""
rating = self.dify_pgsql.get_message_rating(msg_id)
# 直接通过字典键获取query_type
query_type = self.query_logs.get(msg_query, {}).get('query_type', "")
return {
"msg_id": msg_id,
"提问": msg_query,
"回答": msg_answer,
"提问人": user_name,
"提问时间": created_at,
"评价": rating,
"问题分类": query_type,
"检索到的词条": wiki_list_str
}
def process_conversations(self):
"""
处理会话数据
Returns:
处理后的消息信息列表
"""
conversations = self.dify_pgsql.get_app_conversations(appid=self.app_id)
for conversation in conversations:
messages = self.dify_pgsql.get_conversation_messages(conversation_id=conversation['conversation_id'])
message_chain_new = self.process_message_chain(messages)
for message in message_chain_new:
message_info = self.extract_message_info(message)
if message_info:
self.message_info_list.append(message_info)
return self.message_info_list
def save_to_excel(self, message_info_list, output_file):
"""
将消息信息列表保存到Excel文件
Args:
message_info_list: 消息信息列表
output_file: 输出文件路径
Returns:
输出文件路径
"""
# 创建DataFrame
df = pd.DataFrame(message_info_list)
# 设置列的顺序
columns_order = [
"msg_id", "提问", "回答", "提问人", "提问时间",
"评价", "问题分类", "检索到的词条"
]
# 确保所有列都存在,如果不存在则添加空列
for col in columns_order:
if col not in df.columns:
df[col] = None
# 按指定顺序重排列
df = df[columns_order]
# 确保目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 创建ExcelWriter对象,用于设置Excel样式
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 写入数据
df.to_excel(writer, index=False, sheet_name='Dify对话记录')
# 获取工作簿和工作表
workbook = writer.book
worksheet = writer.sheets['Dify对话记录']
# 设置行高(20磅 ≈ 26.67像素)
for row in worksheet.iter_rows():
worksheet.row_dimensions[row[0].row].height = 20
# 设置列宽
column_widths = {
"msg_id": 15,
"提问": 40,
"回答": 60,
"提问人": 15,
"提问时间": 15,
"评价": 10,
"问题分类": 20,
"检索到的词条": 40
}
# 应用列宽设置
for i, column in enumerate(columns_order):
col_letter = chr(65 + i) # A, B, C, ...
if i >= 26: # 超过Z的情况
col_letter = chr(64 + i // 26) + chr(65 + i % 26)
worksheet.column_dimensions[col_letter].width = column_widths[column]
print(f"结果已保存到 {output_file}")
return output_file
def export(self, output_file=None):
"""
执行导出流程
Args:
output_file: 输出文件路径,默认为None(自动生成文件名)
Returns:
处理后的消息信息列表
"""
# 加载查询日志
self.load_query_logs()
# 处理会话数据
self.process_conversations()
# 如果指定了输出文件,保存结果
if output_file or len(self.message_info_list) > 0:
# 如果没有指定输出文件,则使用默认文件名
if output_file is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export_{timestamp}.xlsx")
# 保存到Excel文件
self.save_to_excel(self.message_info_list, output_file)
return self.message_info_list
# 示例用法
if __name__ == "__main__":
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description='Dify数据导出工具')
parser.add_argument('--output', '-o', type=str, default="data/excel/dify_export.xlsx",
help='输出Excel文件路径')
parser.add_argument('--app_id', '-a', type=str, default=None,
help='Dify应用ID')
parser.add_argument('--query_log_file', '-q', type=str, default=None,
help='查询日志文件路径')
args = parser.parse_args()
load_dotenv()
# 设置环境变量
os.environ["DIFY_PG_HOST"] = "10.1.16.39"
os.environ["DIFY_PG_PORT"] = "5432"
os.environ["DIFY_PG_USER"] = "postgres"
os.environ["DIFY_PG_PASSWORD"] = "difyai123456"
os.environ["DIFY_PG_DATABASE"] = "dify"
# 创建导出器实例
exporter = DifyExporter(
app_id=args.app_id,
query_log_file=args.query_log_file
)
# 执行导出
results = exporter.export(output_file=args.output)
# 打印结果
print(f"导出了 {len(results)} 条消息信息")