diff --git a/rag2_0/demo/dialogue_to_workorder.py b/rag2_0/demo/dialogue_to_workorder.py index 467446a..0cb2fe7 100755 --- a/rag2_0/demo/dialogue_to_workorder.py +++ b/rag2_0/demo/dialogue_to_workorder.py @@ -33,6 +33,33 @@ logging.basicConfig( ) logger = logging.getLogger("dialogue_to_workorder") +human_info={ +"1116":["夏剑媛", "储能"], +"1201":["曹美芳", "配网"], +"1202":["彭珊珊", "主网"], +"1230":["龚青", "配网"], +"1544":["黄婷", "主网"], +"1546":["严琼辉", "配网"], +"1552":["吴园妹", "主网"], +"1555":["魏怡璠", "配网"], +"1789":["冷琛", "主网"], +"2142":["余国庆", "配网"], +"2144":["卢光辉", "技改"], +"2145":["万志星", "技改"], +"2233":["徐雨萍", "主网"], +"2262":["刘雨微", "主网"], +"2591":["揭敏", "主网"], +"3035":["杨玲", "主网"], +"3416":["杨苏文", "配网"], +"3417":["王琴", "配网"], +"439":["赵莉", "技改"], +"8340":["熊磊娇", "储能"], +"8442":["胡月", "配网"], +"8443":["杨淑玲", "主网"], +"8555":["胡青艳", "主网"], +"8762":["周丽华", "主网"], +} + # ================ 模型定义 ================ class UserQuestionAndSolution(BaseModel): user_question: str = Field(description="用户的核心问题") @@ -143,6 +170,7 @@ class DialogueToWorkorder: def get_workorder_dict(self, rows): """从会话行中提取工单基本信息""" + # 预设字段 workorder_dict = {} # 创建时间 @@ -158,6 +186,10 @@ class DialogueToWorkorder: sender_nickname = row['发送者昵称'] if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': workorder_dict["处理坐席"] = sender_nickname + sender_num = re.findall(r'客服(\d+)', sender_nickname) + if len(sender_num) > 0 and sender_num[0] in human_info: + workorder_dict["处理人"] = human_info[sender_num[0]][0] + workorder_dict["处理技能组"] = human_info[sender_num[0]][1] break # 访客昵称 @@ -463,7 +495,28 @@ class DialogueToWorkorder: # 更新工单字典 - base_workorder_dict.update({ + # base_workorder_dict.update({ + # "产品线": product_line, + # "产品名称": product_name, + # "模块名称": module_name, + # "客户问题": user_question_str, + # "问题类型": problem_type, + # "是否抱怨": "是" if is_dissatisfaction else '否', + # "抱怨内容": dissatisfaction_reasoning if is_dissatisfaction else '', + # "抱怨级别": dissatisfaction_level if is_dissatisfaction else '', + # "是否投诉": "是" if is_complaint else '否', + # "解决方案": solution_str + # }) + # workorder_list.append(base_workorder_dict) + for user_question in user_question_list: + user_question_str = user_question.user_question + solution_str = user_question.solution + + # 创建新的工单字典,复制基本信息 + workorder_dict = base_workorder_dict.copy() + + # 更新工单字典 + workorder_dict.update({ "产品线": product_line, "产品名称": product_name, "模块名称": module_name, @@ -475,29 +528,9 @@ class DialogueToWorkorder: "是否投诉": "是" if is_complaint else '否', "解决方案": solution_str }) - workorder_list.append(base_workorder_dict) - # for user_question in user_question_list: - # user_question_str = user_question.user_question - # solution_str = user_question.solution - # # 创建新的工单字典,复制基本信息 - # workorder_dict = base_workorder_dict.copy() - - # # 更新工单字典 - # workorder_dict.update({ - # "产品线": product_line, - # "产品名称": product_name, - # "模块名称": module_name, - # "客户问题": user_question_str, - # "问题类型": problem_type, - # "是否抱怨": "是" if is_dissatisfaction else '否', - # "抱怨级别": dissatisfaction_level if is_dissatisfaction else '', - # "是否投诉": "是" if is_complaint else '否', - # "解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str - # }) - - # # 将工单添加到列表中 - # workorder_list.append(workorder_dict) + # 将工单添加到列表中 + workorder_list.append(workorder_dict) return workorder_list @@ -513,27 +546,32 @@ class DialogueToWorkorder: # 解析产品详情 product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path) - # 如果指定了时间范围,则过滤数据 - if start_date or end_date: - # 确保创建时间列为日期时间类型 - if '创建时间' in df.columns: - df['创建时间'] = pd.to_datetime(df['创建时间'], errors='coerce') - - # 按时间范围过滤 - if start_date: - start_date = pd.to_datetime(start_date) - df = df[df['创建时间'] >= start_date] - logger.info(f"过滤开始时间 {start_date},剩余数据行数: {len(df)}") - - if end_date: - end_date = pd.to_datetime(end_date) - df = df[df['创建时间'] <= end_date] - logger.info(f"过滤结束时间 {end_date},剩余数据行数: {len(df)}") - else: - logger.warning("数据中没有'创建时间'列,无法按时间范围过滤") - # 按会话ID分组 conversation_dict = self.group_conversations_by_id(df) + + # 如果指定了时间范围,则过滤数据 + if start_date or end_date: + logging.info(f"过滤时间范围: {start_date} 至 {end_date}") + # 将字符串日期转换为datetime对象 + start_date_dt = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S") if start_date else None + end_date_dt = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S") if end_date else None + + new_conversation_dict = {} + for conversation_id, conversation_rows in conversation_dict.items(): + # 获取会话创建时间并转换为datetime对象 + create_time_str = conversation_rows[0]["创建时间"] + if isinstance(create_time_str, str): + create_time_dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") + else: + # 如果已经是datetime对象则直接使用 + create_time_dt = create_time_str + + # 使用datetime对象进行比较 + if (start_date_dt and create_time_dt < start_date_dt) or (end_date_dt and create_time_dt > end_date_dt): + continue + new_conversation_dict[conversation_id] = conversation_rows + conversation_dict = new_conversation_dict + logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话") # 使用线程池处理每个会话 @@ -566,7 +604,7 @@ class DialogueToWorkorder: columns_order = [ '工单编号', '产品线', '产品名称', '模块名称', '问题类型', '客户问题', '解决方案', '是否抱怨', "抱怨内容", '是否投诉', '抱怨级别', - '会话id', '访客昵称', '处理坐席', '创建时间' + '会话id', '访客昵称', '处理坐席', "处理人", "处理技能组",'创建时间' ] # 确保所有列都存在,如果不存在则添加空列 @@ -615,6 +653,8 @@ class DialogueToWorkorder: '会话id': 9, '访客昵称': 9, '处理坐席': 9, + '处理人': 9, + '处理技能组': 9, '创建时间': 9 } @@ -640,9 +680,9 @@ def parse_arguments(): help='产品详情Excel文件路径') parser.add_argument('--max_workers', type=int, default=16, help='并发处理线程数,默认为16') - parser.add_argument('--start_date', type=str, required=False,default="2025-05-01 00:00:00", + parser.add_argument('--start_date', type=str, required=False,default="2025-06-10 16:08:00", help='开始日期,格式为YYYY-MM-DD') - parser.add_argument('--end_date', type=str, required=False,default="2025-05-24 23:59:59", + parser.add_argument('--end_date', type=str, required=False,default="2025-06-30 23:59:59", help='结束日期,格式为YYYY-MM-DD') return parser.parse_args() diff --git a/rag2_0/dify/AnswerType.py b/rag2_0/dify/AnswerType.py index e1d9d88..8b4be05 100644 --- a/rag2_0/dify/AnswerType.py +++ b/rag2_0/dify/AnswerType.py @@ -8,6 +8,8 @@ from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import Dict, List, Any, Optional import asyncio +import threading +import queue from dotenv import load_dotenv import json @@ -32,20 +34,92 @@ from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval # 定义文件锁和JSON文件路径 file_lock = asyncio.Lock() QUERY_LOG_DIR = os.path.join(os.getcwd(), "data", "query_logs") -QUERY_LOG_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json") +QUERY_DATA_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json") -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler() - ] +# 创建异步日志队列和工作线程 +log_queue = queue.Queue() +worker_thread = None + +# 后台工作线程函数 +def log_worker(): + while True: + try: + # 从队列获取数据,设置超时以允许线程退出 + data = log_queue.get(timeout=1.0) + if data is None: # 接收到退出信号 + # 处理剩余数据后再退出 + while not log_queue.empty(): + data = log_queue.get_nowait() + if data is None: # 跳过额外的停止信号 + continue + process_log_data(data) + break + + process_log_data(data) + log_queue.task_done() + except queue.Empty: + continue + except Exception as e: + logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True) + +# 提取数据处理逻辑到单独函数 +def process_log_data(data): + try: + # 确保目录存在 + os.makedirs(os.path.dirname(QUERY_DATA_FILE), exist_ok=True) + + # 读取现有数据 + existing_data = [] + if os.path.exists(QUERY_DATA_FILE) and os.path.getsize(QUERY_DATA_FILE) > 0: + with open(QUERY_DATA_FILE, 'r', encoding='utf-8') as f: + try: + existing_data = json.load(f) + except json.JSONDecodeError: + logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_DATA_FILE}") + existing_data = [] + + # 添加新数据 + existing_data.append(data) + + # 写入文件 + with open(QUERY_DATA_FILE, 'w', encoding='utf-8') as f: + json.dump(existing_data, f, ensure_ascii=False, indent=2) + + logger.info(f"成功保存查询数据到: {QUERY_DATA_FILE}") + except Exception as e: + logger.error(f"处理日志数据时出错: {str(e)}", exc_info=True) + +# 创建日志目录 +os.makedirs(QUERY_LOG_DIR, exist_ok=True) + +# 配置日志 - 同时输出到控制台和文件 +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) + +# 创建文件处理器 +file_handler = logging.FileHandler( + os.path.join(QUERY_LOG_DIR, "answer_type_service.log"), + encoding='utf-8' ) +file_handler.setLevel(logging.INFO) + +# 创建日志格式 +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +console_handler.setFormatter(formatter) +file_handler.setFormatter(formatter) + +# 添加处理器到日志器 +logger.addHandler(console_handler) +logger.addHandler(file_handler) + +# 设置其他库的日志级别 logging.getLogger('httpx').setLevel(logging.WARNING) logging.getLogger('openai').setLevel(logging.WARNING) -logger = logging.getLogger(__name__) - # 定义请求模型 class AnswerTypeRequest(BaseModel): query: str @@ -70,13 +144,32 @@ app.add_middleware( # 应用启动事件 @app.on_event("startup") async def startup_event(): + global worker_thread # 确保日志目录存在 os.makedirs(QUERY_LOG_DIR, exist_ok=True) # 确保日志文件存在 - if not os.path.exists(QUERY_LOG_FILE): - async with file_lock: - with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f: - json.dump([], f, ensure_ascii=False) + if not os.path.exists(QUERY_DATA_FILE): + with open(QUERY_DATA_FILE, 'w', encoding='utf-8') as f: + json.dump([], f, ensure_ascii=False) + + # 启动后台工作线程 + worker_thread = threading.Thread(target=log_worker, daemon=True) + worker_thread.start() + logger.info("后台日志工作线程已启动") + +# 应用关闭事件 +@app.on_event("shutdown") +def shutdown_event(): + global worker_thread + if worker_thread: + # 发送退出信号 + log_queue.put(None) + # 等待工作线程处理剩余数据 + worker_thread.join(timeout=10.0) + if worker_thread.is_alive(): + logger.warning("工作线程未在超时时间内退出") + else: + logger.info("后台日志工作线程已停止") # 添加健康检查端点 @app.get("/health", summary="健康检查") @@ -89,41 +182,22 @@ async def query_type(query_type: str, workflow_run_id:str): # 记录请求 logger.info(f"接收到请求: 类型: {query_type}, workflow_run_id: {workflow_run_id}") - # 保存 提问、问题类型、当前时间戳到json + # 准备数据 timestamp = datetime.datetime.now().isoformat() query_data = { "query_type": query_type, "timestamp": timestamp, "workflow_run_id": workflow_run_id } - success = True + + # 将数据放入队列 try: - # 使用锁保护文件读写操作 - async with file_lock: - # 确保目录存在 - os.makedirs(os.path.dirname(QUERY_LOG_FILE), exist_ok=True) - - # 读取现有数据 - existing_data = [] - if os.path.exists(QUERY_LOG_FILE) and os.path.getsize(QUERY_LOG_FILE) > 0: - with open(QUERY_LOG_FILE, 'r', encoding='utf-8') as f: - try: - existing_data = json.load(f) - except json.JSONDecodeError: - logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_LOG_FILE}") - existing_data = [] - - # 添加新数据 - existing_data.append(query_data) - - # 写入文件 - with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f: - json.dump(existing_data, f, ensure_ascii=False, indent=2) - - logger.info(f"成功保存查询数据到: {QUERY_LOG_FILE}") + log_queue.put(query_data) + success = True + logger.info(f"查询数据已加入队列,当前队列大小: {log_queue.qsize()}") except Exception as e: success = False - logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True) + logger.error(f"加入队列时出错: {str(e)}", exc_info=True) # 返回响应 content = f"问题类型: {query_type}
操作是否成功: {'成功' if success else '失败'}" @@ -146,4 +220,4 @@ if __name__ == "__main__": # workers=1 # 生产环境可以增加worker数量 # ) # 生产环境可以使用以下命令启动: - # uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 20 \ No newline at end of file + # uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 1 \ No newline at end of file diff --git a/rag2_0/dify/WorkorderToDify.py b/rag2_0/dify/WorkorderToDify.py index 3924140..8b007fe 100644 --- a/rag2_0/dify/WorkorderToDify.py +++ b/rag2_0/dify/WorkorderToDify.py @@ -5,7 +5,7 @@ sys.path.append(os.getcwd()) import rag2_0.dify.dify_client.dify_api as DifyApi import pandas as pd -pd_data = pd.read_excel("data/excel/工单汇总(给AI)_2.xlsx") +pd_data = pd.read_excel("data/excel/工单汇总(给AI)_工单拆分.xlsx") dify_api = DifyApi.DifyApi() @@ -13,6 +13,7 @@ peiwang_dataset_id = dify_api.get_or_create_dataset_by_name("配网工单数据" zhuwang_dataset_id = dify_api.get_or_create_dataset_by_name("主网工单数据") jianga_dataset_id = dify_api.get_or_create_dataset_by_name("技改工单数据") chuneng_dataset_id = dify_api.get_or_create_dataset_by_name("储能工单数据") +xizang_dataset_id = dify_api.get_or_create_dataset_by_name("西藏工单数据") soft_segments_list={} @@ -39,6 +40,10 @@ for skill_group, segments_list in soft_segments_list.items(): dataset_id = jianga_dataset_id elif skill_group == "储能": dataset_id = chuneng_dataset_id + elif skill_group == "西藏": + dataset_id = xizang_dataset_id + else: + continue document_id = dify_api.get_document_id(dataset_id=dataset_id, document_name=f"{skill_group}工单数据") if not document_id: document_id = dify_api.upload_text_to_document(text_name=f"{skill_group}工单数据", text="", dataset_id=dataset_id) diff --git a/start_AnswerType.sh b/start_AnswerType.sh new file mode 100644 index 0000000..0f57fb2 --- /dev/null +++ b/start_AnswerType.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# 获取当前脚本所在的绝对路径 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# 检查是否已经存在名为AnswerType的screen会话 +if screen -ls | grep "AnswerType"; then + echo "Screen session 'AnswerType' already exists." +else + # 启动一个名为AnswerType的screen会话,并在其中执行后续命令 + screen -dmS AnswerType bash -c ' + cd $SCRIPT_DIR + uv run uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 1 + ' + + # 输出提示信息 + echo "Started screen session 'AnswerType' and executed the command." +fi \ No newline at end of file diff --git a/start_DifyQueryRetrieval_api.sh b/start_DifyQueryRetrieval_api.sh new file mode 100644 index 0000000..bc5cec1 --- /dev/null +++ b/start_DifyQueryRetrieval_api.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# 获取当前脚本所在的绝对路径 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# 检查是否已经存在名为DifyQueryRetrieval_api的screen会话 +if screen -ls | grep "DifyQueryRetrieval_api"; then + echo "Screen session 'DifyQueryRetrieval_api' already exists." +else + # 启动一个名为DifyQueryRetrieval_api的screen会话,并在其中执行后续命令 + screen -dmS DifyQueryRetrieval_api bash -c ' + cd $SCRIPT_DIR + uv run uvicorn rag2_0.dify.DifyQueryRetrieval_api:app --host 0.0.0.0 --port 8002 --workers 25 + ' + + # 输出提示信息 + echo "Started screen session 'DifyQueryRetrieval_api' and executed the command." +fi \ No newline at end of file diff --git a/start_intent_recognition_api.sh b/start_intent_recognition_api.sh new file mode 100755 index 0000000..0f48a44 --- /dev/null +++ b/start_intent_recognition_api.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# 获取当前脚本所在的绝对路径 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# 检查是否已经存在名为xinference的screen会话 +if screen -ls | grep "intent_recognition_api"; then + echo "Screen session 'intent_recognition_api' already exists." +else + # 启动一个名为xinference的screen会话,并在其中执行后续命令 + screen -dmS intent_recognition_api bash -c ' + cd $SCRIPT_DIR + uv run uvicorn rag2_0.dify.intent_recognition_api:app --host 0.0.0.0 --port 8001 --workers 25 + ' + + # 输出提示信息 + echo "Started screen session 'intent_recognition_api' and executed the command." +fi \ No newline at end of file