diff --git a/rag2_0/dify/AnswerType.py b/rag2_0/dify/AnswerType.py new file mode 100644 index 0000000..74f3187 --- /dev/null +++ b/rag2_0/dify/AnswerType.py @@ -0,0 +1,149 @@ +# from gevent import monkey +# monkey.patch_all() + +import os +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, HTMLResponse +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field +from typing import Dict, List, Any, Optional +import asyncio + +from dotenv import load_dotenv +import json +import time +import datetime +import logging +# 加载环境变量 +load_dotenv() + +def main(query: str) -> dict: + query = query.strip() + escaped_query = json.dumps(query, ensure_ascii=False) + return { + "format_query": escaped_query, + } + + +import sys +sys.path.append(os.getcwd()) +from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval + +# 定义文件锁和JSON文件路径 +file_lock = asyncio.Lock() +QUERY_LOG_DIR = os.path.join(os.getcwd(), "data", "query_logs") +QUERY_LOG_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json") + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() + ] +) +logging.getLogger('httpx').setLevel(logging.WARNING) +logging.getLogger('openai').setLevel(logging.WARNING) + +logger = logging.getLogger(__name__) + +# 定义请求模型 +class AnswerTypeRequest(BaseModel): + query: str + query_type: str + +# 创建FastAPI应用 +app = FastAPI( + title="提问数据类型", + description="收集用户提问数据类型", + version="1.0" +) + +# 添加CORS中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# 应用启动事件 +@app.on_event("startup") +async def startup_event(): + # 确保日志目录存在 + os.makedirs(QUERY_LOG_DIR, exist_ok=True) + # 确保日志文件存在 + if not os.path.exists(QUERY_LOG_FILE): + async with file_lock: + with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f: + json.dump([], f, ensure_ascii=False) + +# 添加健康检查端点 +@app.get("/health", summary="健康检查") +async def health_check(): + return {"status": "ok"} + +@app.get("/query_type", summary="异步检索API") +async def query_type(query: str, query_type: str): + try: + # 记录请求 + logger.info(f"接收到请求: {query}, 类型: {query_type}") + + # 保存 提问、问题类型、当前时间戳到json + timestamp = datetime.datetime.now().isoformat() + query_data = { + "query": query, + "query_type": query_type, + "timestamp": timestamp + } + success = True + try: + # 使用锁保护文件读写操作 + async with file_lock: + # 确保目录存在 + os.makedirs(os.path.dirname(QUERY_LOG_FILE), exist_ok=True) + + # 读取现有数据 + existing_data = [] + if os.path.exists(QUERY_LOG_FILE) and os.path.getsize(QUERY_LOG_FILE) > 0: + with open(QUERY_LOG_FILE, 'r', encoding='utf-8') as f: + try: + existing_data = json.load(f) + except json.JSONDecodeError: + logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_LOG_FILE}") + existing_data = [] + + # 添加新数据 + existing_data.append(query_data) + + # 写入文件 + with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f: + json.dump(existing_data, f, ensure_ascii=False, indent=2) + + logger.info(f"成功保存查询数据到: {QUERY_LOG_FILE}") + except Exception as e: + success = False + logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True) + + # 返回响应 + content = f"当前提问: {query}
问题类型: {query_type}
操作是否成功: {'成功' if success else '失败'}" + return HTMLResponse(content=content) + except Exception as e: + logger.error(f"处理请求时出错: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"处理请求时出错: {str(e)}") + +if __name__ == "__main__": + # 使用Uvicorn运行FastAPI应用 + import uvicorn + uvicorn.run("rag2_0.dify.AnswerType:app", host="0.0.0.0", port=8003, reload=False, workers=1, log_level="info") + # # 使用uvicorn启动服务 + # import uvicorn + # uvicorn.run( + # "rag2_0.dify.intent_recognition_api:app", + # host="0.0.0.0", + # port=8001, + # reload=False, # 开发环境启用热重载 + # workers=1 # 生产环境可以增加worker数量 + # ) + # 生产环境可以使用以下命令启动: + # uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 20 \ No newline at end of file diff --git a/rag2_0/dify/dify_tool.py b/rag2_0/dify/dify_tool.py index 50d2e01..0c50101 100755 --- a/rag2_0/dify/dify_tool.py +++ b/rag2_0/dify/dify_tool.py @@ -4,6 +4,9 @@ import psycopg2 import os import json from concurrent.futures import ThreadPoolExecutor, as_completed + +import sys +sys.path.append(os.getcwd()) from rag2_0.dify.dify_client import ChatClient from pydantic import BaseModel, Field from langchain.output_parsers import PydanticOutputParser @@ -167,6 +170,76 @@ class PgSql: return None except (Exception, psycopg2.Error) as error: raise Exception(f"Error while getting workflow_node_executions_info: {error}") + + def get_app_conversations(self, appid:str)->list[str] | None: + """ + 根据应用 ID 从 'conversations' 表中获取应用会话信息。 + """ + with self.pg_sql_lock: + try: + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT DISTINCT conversation_id + FROM messages + WHERE app_id = %s AND invoke_from != 'debugger'; + """, + (appid,) + ) + result = cursor.fetchall() + if result: + colnames = [desc[0] for desc in cursor.description] + return [dict(zip(colnames, row)) for row in result] + return None + except (Exception, psycopg2.Error) as error: + raise Exception(f"Error while getting app_conversations: {error}") + + def get_conversation_messages(self, conversation_id:str)->list[dict] | None: + """ + 根据会话 ID 从 'messages' 表中获取会话消息信息。 + """ + with self.pg_sql_lock: + try: + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT * FROM messages WHERE conversation_id = %s AND status = 'normal' + """, + (conversation_id,) + ) + result = cursor.fetchall() + if result: + colnames = [desc[0] for desc in cursor.description] + return [dict(zip(colnames, row)) for row in result] + return None + except (Exception, psycopg2.Error) as error: + raise Exception(f"Error while getting conversation_messages: {error}") + + def get_message_rating(self, msg_id): + """ + 通过msg_id从message_feedbacks中找到对应的rating。 + :param msg_id: 消息ID (UUID格式) + :return: rating 字符串 + """ + with self.pg_sql_lock: + rating = None + try: + with self.connection.cursor() as cursor: + # 构建查询语句 + cursor.execute(""" + SELECT rating + FROM message_feedbacks + WHERE message_id = %s + """, + (msg_id,)) + # 执行查询 + row = cursor.fetchone() + + if row: + rating = row[0] + except (Exception, psycopg2.Error) as error: + raise Exception(f"Error while getting conversation_messages: {error}") + return rating class DifyTool: """ @@ -388,7 +461,6 @@ content: "{content}" avg_score = total_score / valid_scores if valid_scores > 0 else 0 return retrieve_title, max_score, min_score, avg_score - class NewWorkflowChat(BaseWorkflowChat): """ 新工作流对话类,用于调用新工作流发送对话并解析获取相关数据 diff --git a/rag2_0/dify/export_new_dify.py b/rag2_0/dify/export_new_dify.py new file mode 100644 index 0000000..a145391 --- /dev/null +++ b/rag2_0/dify/export_new_dify.py @@ -0,0 +1,303 @@ +from dotenv import load_dotenv +import os +import json +import datetime +import pandas as pd + + +import sys +sys.path.append(os.getcwd()) +from rag2_0.dify.dify_tool import PgSql, DifyTool + + +class DifyExporter: + """ + Dify数据导出工具,用于从Dify系统中导出对话和消息数据 + """ + def __init__(self, app_id=None, query_log_file=None): + """ + 初始化DifyExporter实例 + + Args: + app_id: Dify应用ID,默认为None + query_log_file: 查询日志文件路径,默认为None + """ + # 设置默认值 + self.app_id = app_id or "72d03c7d-8bea-42f9-9e8d-cdfb9480f372" + + # 设置查询日志文件路径 + self.query_log_dir = os.path.join(os.getcwd(), "data", "query_logs") + self.query_log_file = query_log_file or os.path.join(self.query_log_dir, "answer_type_logs.json") + + # 初始化工具类 + self.dify_pgsql = PgSql() + self.dify_tool = DifyTool() + + # 初始化数据存储 + self.message_info_list = [] + self.query_logs = {} + + def load_query_logs(self): + """ + 从文件加载查询日志 + """ + try: + with open(self.query_log_file, 'r', encoding='utf-8') as f: + query_logs_list = json.load(f) + # 创建字典来存储每个查询的最新记录 + for record in query_logs_list: + query = record['query'] + timestamp = record.get('timestamp') + # 如果查询不在字典中或者当前记录的时间戳更新,则更新字典 + if query not in self.query_logs or (timestamp and self.query_logs.get(query, {}).get('timestamp') and + datetime.datetime.fromisoformat(timestamp) > + datetime.datetime.fromisoformat(self.query_logs[query]['timestamp'])): + self.query_logs[query] = record + return True + except Exception as e: + print(f"加载查询日志失败: {e}") + return False + + def process_message_chain(self, messages): + """ + 处理消息链,按照时间顺序重新组织消息 + + Args: + messages: 消息列表 + + Returns: + 按时间顺序组织的消息列表 + """ + message_chain = {} + for message in messages: + if message["parent_message_id"] in message_chain: + message_chain[message["parent_message_id"]].append(message) + else: + message_chain[message["parent_message_id"]] = [message] + + message_chain_new = [] + current_message_id = None + processed_ids = set() # 防止无限循环 + + while True: + # 获取当前父消息ID对应的所有消息 + msg_list = message_chain.get(current_message_id, []) + + # 如果没有消息或已处理过该ID,则退出循环 + if not msg_list or current_message_id in processed_ids: + break + + # 记录已处理的ID + if current_message_id is not None: + processed_ids.add(current_message_id) + + # 使用max()函数找出创建时间最新的消息 + new_msg = max(msg_list, key=lambda x: x["created_at"]) if msg_list else None + + # 将最新消息添加到结果列表,并更新当前消息ID + if new_msg: + message_chain_new.append(new_msg) + current_message_id = new_msg["id"] + else: + break + + return message_chain_new + + def extract_message_info(self, message): + """ + 从消息中提取信息 + + Args: + message: 消息对象 + + Returns: + 包含消息信息的字典 + """ + msg_id = message["id"] + msg_inputs = message["inputs"] + user_name = msg_inputs.get("user_name", "") + msg_query = message["query"] + msg_answer = message["answer"] + created_at = message['created_at'].strftime("%Y-%m-%d") + msg_debug_info = self.dify_tool.get_message_debug_info_by_id(msg_id) + if not msg_debug_info: + return None + wiki_list = [] + + for node_execution in msg_debug_info['workflow_node_executions_info']: + if node_execution["title"] == "提取处理后的知识": + source_kno = json.loads(node_execution["outputs"])["source_kno"] + knowledge_list_metadata = json.loads(node_execution["outputs"])["knowledge_list_metadata"] + for knowledge in knowledge_list_metadata: + document_name = knowledge['metadata']['document_name'] + wiki_list.append(document_name.split("/")[-1]) + + wiki_list = list(set(wiki_list)) + wiki_list_str = "\n".join(wiki_list) + if wiki_list_str == "": + wiki_list_str = "无" + rating = self.dify_pgsql.get_message_rating(msg_id) + # 直接通过字典键获取query_type + query_type = self.query_logs.get(msg_query, {}).get('query_type', "") + + return { + "msg_id": msg_id, + "提问": msg_query, + "回答": msg_answer, + "提问人": user_name, + "提问时间": created_at, + "评价": rating, + "问题分类": query_type, + "检索到的词条": wiki_list_str + } + + def process_conversations(self): + """ + 处理会话数据 + + Returns: + 处理后的消息信息列表 + """ + conversations = self.dify_pgsql.get_app_conversations(appid=self.app_id) + for conversation in conversations: + messages = self.dify_pgsql.get_conversation_messages(conversation_id=conversation['conversation_id']) + message_chain_new = self.process_message_chain(messages) + + for message in message_chain_new: + message_info = self.extract_message_info(message) + if message_info: + self.message_info_list.append(message_info) + + return self.message_info_list + + def save_to_excel(self, message_info_list, output_file): + """ + 将消息信息列表保存到Excel文件 + + Args: + message_info_list: 消息信息列表 + output_file: 输出文件路径 + + Returns: + 输出文件路径 + """ + # 创建DataFrame + df = pd.DataFrame(message_info_list) + + # 设置列的顺序 + columns_order = [ + "msg_id", "提问", "回答", "提问人", "提问时间", + "评价", "问题分类", "检索到的词条" + ] + + # 确保所有列都存在,如果不存在则添加空列 + for col in columns_order: + if col not in df.columns: + df[col] = None + + # 按指定顺序重排列 + df = df[columns_order] + + # 确保目录存在 + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + # 创建ExcelWriter对象,用于设置Excel样式 + with pd.ExcelWriter(output_file, engine='openpyxl') as writer: + # 写入数据 + df.to_excel(writer, index=False, sheet_name='Dify对话记录') + + # 获取工作簿和工作表 + workbook = writer.book + worksheet = writer.sheets['Dify对话记录'] + + # 设置行高(20磅 ≈ 26.67像素) + for row in worksheet.iter_rows(): + worksheet.row_dimensions[row[0].row].height = 20 + + # 设置列宽 + column_widths = { + "msg_id": 15, + "提问": 40, + "回答": 60, + "提问人": 15, + "提问时间": 15, + "评价": 10, + "问题分类": 20, + "检索到的词条": 40 + } + + # 应用列宽设置 + for i, column in enumerate(columns_order): + col_letter = chr(65 + i) # A, B, C, ... + if i >= 26: # 超过Z的情况 + col_letter = chr(64 + i // 26) + chr(65 + i % 26) + worksheet.column_dimensions[col_letter].width = column_widths[column] + + print(f"结果已保存到 {output_file}") + + return output_file + + def export(self, output_file=None): + """ + 执行导出流程 + + Args: + output_file: 输出文件路径,默认为None(自动生成文件名) + + Returns: + 处理后的消息信息列表 + """ + # 加载查询日志 + self.load_query_logs() + + # 处理会话数据 + self.process_conversations() + + # 如果指定了输出文件,保存结果 + if output_file or len(self.message_info_list) > 0: + # 如果没有指定输出文件,则使用默认文件名 + if output_file is None: + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + output_file = os.path.join(os.getcwd(), "data", "excel", f"dify_export_{timestamp}.xlsx") + + # 保存到Excel文件 + self.save_to_excel(self.message_info_list, output_file) + + return self.message_info_list + + +# 示例用法 +if __name__ == "__main__": + import argparse + + # 解析命令行参数 + parser = argparse.ArgumentParser(description='Dify数据导出工具') + parser.add_argument('--output', '-o', type=str, default="data/excel/dify_export.xlsx", + help='输出Excel文件路径') + parser.add_argument('--app_id', '-a', type=str, default=None, + help='Dify应用ID') + parser.add_argument('--query_log_file', '-q', type=str, default=None, + help='查询日志文件路径') + + args = parser.parse_args() + + load_dotenv() + # 设置环境变量 + os.environ["DIFY_PG_HOST"] = "10.1.16.39" + os.environ["DIFY_PG_PORT"] = "5432" + os.environ["DIFY_PG_USER"] = "postgres" + os.environ["DIFY_PG_PASSWORD"] = "difyai123456" + os.environ["DIFY_PG_DATABASE"] = "dify" + + # 创建导出器实例 + exporter = DifyExporter( + app_id=args.app_id, + query_log_file=args.query_log_file + ) + + # 执行导出 + results = exporter.export(output_file=args.output) + + # 打印结果 + print(f"导出了 {len(results)} 条消息信息") + \ No newline at end of file