新增多个启动脚本以支持不同服务的后台运行,优化对话到工单的处理逻辑,增加人力信息映射,调整日志记录机制以支持异步处理。

This commit is contained in:
2025-07-18 13:39:57 +08:00
parent 75c0992526
commit 5d5c3c0257
6 changed files with 259 additions and 86 deletions
+85 -45
View File
@@ -33,6 +33,33 @@ logging.basicConfig(
) )
logger = logging.getLogger("dialogue_to_workorder") logger = logging.getLogger("dialogue_to_workorder")
# Agent lookup table: maps the numeric id captured from an agent nickname
# (the digits matched by r'客服(\d+)') to [handler name, skill group].
# Keys are strings because they are compared against regex capture groups.
human_info={
"1116":["夏剑媛", "储能"],
"1201":["曹美芳", "配网"],
"1202":["彭珊珊", "主网"],
"1230":["龚青", "配网"],
"1544":["黄婷", "主网"],
"1546":["严琼辉", "配网"],
"1552":["吴园妹", "主网"],
"1555":["魏怡璠", "配网"],
"1789":["冷琛", "主网"],
"2142":["余国庆", "配网"],
"2144":["卢光辉", "技改"],
"2145":["万志星", "技改"],
"2233":["徐雨萍", "主网"],
"2262":["刘雨微", "主网"],
"2591":["揭敏", "主网"],
"3035":["杨玲", "主网"],
"3416":["杨苏文", "配网"],
"3417":["王琴", "配网"],
"439":["赵莉", "技改"],
"8340":["熊磊娇", "储能"],
"8442":["胡月", "配网"],
"8443":["杨淑玲", "主网"],
"8555":["胡青艳", "主网"],
"8762":["周丽华", "主网"],
}
# ================ 模型定义 ================ # ================ 模型定义 ================
class UserQuestionAndSolution(BaseModel): class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="用户的核心问题") user_question: str = Field(description="用户的核心问题")
@@ -143,6 +170,7 @@ class DialogueToWorkorder:
def get_workorder_dict(self, rows): def get_workorder_dict(self, rows):
"""从会话行中提取工单基本信息""" """从会话行中提取工单基本信息"""
# 预设字段
workorder_dict = {} workorder_dict = {}
# 创建时间 # 创建时间
@@ -158,6 +186,10 @@ class DialogueToWorkorder:
sender_nickname = row['发送者昵称'] sender_nickname = row['发送者昵称']
if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '': if sender == "坐席" and pd.notna(sender_nickname) and str(sender_nickname).strip() != '':
workorder_dict["处理坐席"] = sender_nickname workorder_dict["处理坐席"] = sender_nickname
sender_num = re.findall(r'客服(\d+)', sender_nickname)
if len(sender_num) > 0 and sender_num[0] in human_info:
workorder_dict["处理人"] = human_info[sender_num[0]][0]
workorder_dict["处理技能组"] = human_info[sender_num[0]][1]
break break
# 访客昵称 # 访客昵称
@@ -463,7 +495,28 @@ class DialogueToWorkorder:
# 更新工单字典 # 更新工单字典
base_workorder_dict.update({ # base_workorder_dict.update({
# "产品线": product_line,
# "产品名称": product_name,
# "模块名称": module_name,
# "客户问题": user_question_str,
# "问题类型": problem_type,
# "是否抱怨": "是" if is_dissatisfaction else '否',
# "抱怨内容": dissatisfaction_reasoning if is_dissatisfaction else '',
# "抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
# "是否投诉": "是" if is_complaint else '否',
# "解决方案": solution_str
# })
# workorder_list.append(base_workorder_dict)
for user_question in user_question_list:
user_question_str = user_question.user_question
solution_str = user_question.solution
# 创建新的工单字典,复制基本信息
workorder_dict = base_workorder_dict.copy()
# 更新工单字典
workorder_dict.update({
"产品线": product_line, "产品线": product_line,
"产品名称": product_name, "产品名称": product_name,
"模块名称": module_name, "模块名称": module_name,
@@ -475,29 +528,9 @@ class DialogueToWorkorder:
"是否投诉": "" if is_complaint else '', "是否投诉": "" if is_complaint else '',
"解决方案": solution_str "解决方案": solution_str
}) })
workorder_list.append(base_workorder_dict)
# for user_question in user_question_list:
# user_question_str = user_question.user_question
# solution_str = user_question.solution
# # 创建新的工单字典,复制基本信息 # 将工单添加到列表中
# workorder_dict = base_workorder_dict.copy() workorder_list.append(workorder_dict)
# # 更新工单字典
# workorder_dict.update({
# "产品线": product_line,
# "产品名称": product_name,
# "模块名称": module_name,
# "客户问题": user_question_str,
# "问题类型": problem_type,
# "是否抱怨": "是" if is_dissatisfaction else '否',
# "抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
# "是否投诉": "是" if is_complaint else '否',
# "解决方案": (solution_str + '\n存在抱怨:' + dissatisfaction_reasoning) if is_dissatisfaction else solution_str
# })
# # 将工单添加到列表中
# workorder_list.append(workorder_dict)
return workorder_list return workorder_list
@@ -513,27 +546,32 @@ class DialogueToWorkorder:
# 解析产品详情 # 解析产品详情
product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path) product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path)
# 如果指定了时间范围,则过滤数据
if start_date or end_date:
# 确保创建时间列为日期时间类型
if '创建时间' in df.columns:
df['创建时间'] = pd.to_datetime(df['创建时间'], errors='coerce')
# 按时间范围过滤
if start_date:
start_date = pd.to_datetime(start_date)
df = df[df['创建时间'] >= start_date]
logger.info(f"过滤开始时间 {start_date},剩余数据行数: {len(df)}")
if end_date:
end_date = pd.to_datetime(end_date)
df = df[df['创建时间'] <= end_date]
logger.info(f"过滤结束时间 {end_date},剩余数据行数: {len(df)}")
else:
logger.warning("数据中没有'创建时间'列,无法按时间范围过滤")
# 按会话ID分组 # 按会话ID分组
conversation_dict = self.group_conversations_by_id(df) conversation_dict = self.group_conversations_by_id(df)
# 如果指定了时间范围,则过滤数据
if start_date or end_date:
logging.info(f"过滤时间范围: {start_date}{end_date}")
# 将字符串日期转换为datetime对象
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S") if start_date else None
end_date_dt = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S") if end_date else None
new_conversation_dict = {}
for conversation_id, conversation_rows in conversation_dict.items():
# 获取会话创建时间并转换为datetime对象
create_time_str = conversation_rows[0]["创建时间"]
if isinstance(create_time_str, str):
create_time_dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
else:
# 如果已经是datetime对象则直接使用
create_time_dt = create_time_str
# 使用datetime对象进行比较
if (start_date_dt and create_time_dt < start_date_dt) or (end_date_dt and create_time_dt > end_date_dt):
continue
new_conversation_dict[conversation_id] = conversation_rows
conversation_dict = new_conversation_dict
logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话") logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话")
# 使用线程池处理每个会话 # 使用线程池处理每个会话
@@ -566,7 +604,7 @@ class DialogueToWorkorder:
columns_order = [ columns_order = [
'工单编号', '产品线', '产品名称', '模块名称', '问题类型', '工单编号', '产品线', '产品名称', '模块名称', '问题类型',
'客户问题', '解决方案', '是否抱怨', "抱怨内容", '是否投诉', '抱怨级别', '客户问题', '解决方案', '是否抱怨', "抱怨内容", '是否投诉', '抱怨级别',
'会话id', '访客昵称', '处理坐席', '创建时间' '会话id', '访客昵称', '处理坐席', "处理人", "处理技能组",'创建时间'
] ]
# 确保所有列都存在,如果不存在则添加空列 # 确保所有列都存在,如果不存在则添加空列
@@ -615,6 +653,8 @@ class DialogueToWorkorder:
'会话id': 9, '会话id': 9,
'访客昵称': 9, '访客昵称': 9,
'处理坐席': 9, '处理坐席': 9,
'处理人': 9,
'处理技能组': 9,
'创建时间': 9 '创建时间': 9
} }
@@ -640,9 +680,9 @@ def parse_arguments():
help='产品详情Excel文件路径') help='产品详情Excel文件路径')
parser.add_argument('--max_workers', type=int, default=16, parser.add_argument('--max_workers', type=int, default=16,
help='并发处理线程数,默认为16') help='并发处理线程数,默认为16')
parser.add_argument('--start_date', type=str, required=False,default="2025-05-01 00:00:00", parser.add_argument('--start_date', type=str, required=False,default="2025-06-10 16:08:00",
help='开始日期,格式为YYYY-MM-DD') help='开始日期,格式为YYYY-MM-DD')
parser.add_argument('--end_date', type=str, required=False,default="2025-05-24 23:59:59", parser.add_argument('--end_date', type=str, required=False,default="2025-06-30 23:59:59",
help='结束日期,格式为YYYY-MM-DD') help='结束日期,格式为YYYY-MM-DD')
return parser.parse_args() return parser.parse_args()
+114 -40
View File
@@ -8,6 +8,8 @@ from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional from typing import Dict, List, Any, Optional
import asyncio import asyncio
import threading
import queue
from dotenv import load_dotenv from dotenv import load_dotenv
import json import json
@@ -32,20 +34,92 @@ from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
# 定义文件锁和JSON文件路径 # 定义文件锁和JSON文件路径
file_lock = asyncio.Lock() file_lock = asyncio.Lock()
QUERY_LOG_DIR = os.path.join(os.getcwd(), "data", "query_logs") QUERY_LOG_DIR = os.path.join(os.getcwd(), "data", "query_logs")
QUERY_LOG_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json") QUERY_DATA_FILE = os.path.join(QUERY_LOG_DIR, "answer_type_logs.json")
logging.basicConfig( # 创建异步日志队列和工作线程
level=logging.INFO, log_queue = queue.Queue()
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', worker_thread = None
handlers=[
logging.StreamHandler() # 后台工作线程函数
def log_worker():
    """Consume queued query records and persist them until told to stop.

    Runs in a background thread. Every item taken from ``log_queue`` is
    passed to ``process_log_data``. A ``None`` item is the stop signal:
    once it is seen, whatever is still queued is drained without blocking
    (extra ``None`` signals are skipped) and the function returns.

    Each successful ``get`` is paired with exactly one ``task_done`` so the
    queue's unfinished-task counter stays balanced, and a failure while
    handling one item never prevents the worker from exiting.
    """
    stopping = False
    while True:
        try:
            if stopping:
                # After the stop signal only drain what is already queued.
                data = log_queue.get_nowait()
            else:
                # A finite timeout keeps the loop from blocking forever.
                data = log_queue.get(timeout=1.0)
        except queue.Empty:
            if stopping:
                break
            continue

        try:
            if data is None:
                # Stop signal (or a duplicate of it): switch to drain mode.
                stopping = True
                continue
            process_log_data(data)
        except Exception as e:
            logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True)
        finally:
            # Runs for records and stop signals alike, including on error.
            log_queue.task_done()
# 提取数据处理逻辑到单独函数
def process_log_data(data):
    """Append one record to the JSON array stored in ``QUERY_DATA_FILE``.

    The existing file content is loaded, ``data`` is appended and the whole
    array is written back. The new content is first written to a temporary
    file in the same directory and then swapped in with ``os.replace``, so
    an interruption during the write cannot leave a truncated JSON file
    behind (which would otherwise cause the next call to discard all
    previously stored records).

    Args:
        data: JSON-serialisable record to append.

    Any exception is logged and swallowed so the caller keeps running.
    """
    try:
        # Make sure the target directory exists.
        os.makedirs(os.path.dirname(QUERY_DATA_FILE), exist_ok=True)

        # Load what has been stored so far; an unparsable file starts over.
        existing_data = []
        if os.path.exists(QUERY_DATA_FILE) and os.path.getsize(QUERY_DATA_FILE) > 0:
            with open(QUERY_DATA_FILE, 'r', encoding='utf-8') as f:
                try:
                    existing_data = json.load(f)
                except json.JSONDecodeError:
                    logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_DATA_FILE}")
                    existing_data = []

        existing_data.append(data)

        # Write to a per-process temp file next to the target, then replace
        # the target in one step so readers never see a partial file.
        tmp_path = f"{QUERY_DATA_FILE}.{os.getpid()}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(existing_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, QUERY_DATA_FILE)

        logger.info(f"成功保存查询数据到: {QUERY_DATA_FILE}")
    except Exception as e:
        logger.error(f"处理日志数据时出错: {str(e)}", exc_info=True)
# 创建日志目录
os.makedirs(QUERY_LOG_DIR, exist_ok=True)
# 配置日志 - 同时输出到控制台和文件
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建文件处理器
file_handler = logging.FileHandler(
os.path.join(QUERY_LOG_DIR, "answer_type_service.log"),
encoding='utf-8'
) )
file_handler.setLevel(logging.INFO)
# 创建日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器到日志器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 设置其他库的日志级别
logging.getLogger('httpx').setLevel(logging.WARNING) logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING) logging.getLogger('openai').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# 定义请求模型 # 定义请求模型
class AnswerTypeRequest(BaseModel): class AnswerTypeRequest(BaseModel):
query: str query: str
@@ -70,13 +144,32 @@ app.add_middleware(
# 应用启动事件 # 应用启动事件
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
global worker_thread
# 确保日志目录存在 # 确保日志目录存在
os.makedirs(QUERY_LOG_DIR, exist_ok=True) os.makedirs(QUERY_LOG_DIR, exist_ok=True)
# 确保日志文件存在 # 确保日志文件存在
if not os.path.exists(QUERY_LOG_FILE): if not os.path.exists(QUERY_DATA_FILE):
async with file_lock: with open(QUERY_DATA_FILE, 'w', encoding='utf-8') as f:
with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f: json.dump([], f, ensure_ascii=False)
json.dump([], f, ensure_ascii=False)
# 启动后台工作线程
worker_thread = threading.Thread(target=log_worker, daemon=True)
worker_thread.start()
logger.info("后台日志工作线程已启动")
# 应用关闭事件
@app.on_event("shutdown")
def shutdown_event():
    """Ask the background log worker to stop and wait briefly for it."""
    global worker_thread
    if not worker_thread:
        return

    # None is the stop signal the worker loop checks for.
    log_queue.put(None)
    # Give the worker up to ten seconds to flush what is still queued.
    worker_thread.join(timeout=10.0)

    still_running = worker_thread.is_alive()
    if still_running:
        logger.warning("工作线程未在超时时间内退出")
    else:
        logger.info("后台日志工作线程已停止")
# 添加健康检查端点 # 添加健康检查端点
@app.get("/health", summary="健康检查") @app.get("/health", summary="健康检查")
@@ -89,41 +182,22 @@ async def query_type(query_type: str, workflow_run_id:str):
# 记录请求 # 记录请求
logger.info(f"接收到请求: 类型: {query_type}, workflow_run_id: {workflow_run_id}") logger.info(f"接收到请求: 类型: {query_type}, workflow_run_id: {workflow_run_id}")
# 保存 提问、问题类型、当前时间戳到json # 准备数据
timestamp = datetime.datetime.now().isoformat() timestamp = datetime.datetime.now().isoformat()
query_data = { query_data = {
"query_type": query_type, "query_type": query_type,
"timestamp": timestamp, "timestamp": timestamp,
"workflow_run_id": workflow_run_id "workflow_run_id": workflow_run_id
} }
success = True
# 将数据放入队列
try: try:
# 使用锁保护文件读写操作 log_queue.put(query_data)
async with file_lock: success = True
# 确保目录存在 logger.info(f"查询数据已加入队列,当前队列大小: {log_queue.qsize()}")
os.makedirs(os.path.dirname(QUERY_LOG_FILE), exist_ok=True)
# 读取现有数据
existing_data = []
if os.path.exists(QUERY_LOG_FILE) and os.path.getsize(QUERY_LOG_FILE) > 0:
with open(QUERY_LOG_FILE, 'r', encoding='utf-8') as f:
try:
existing_data = json.load(f)
except json.JSONDecodeError:
logger.error(f"JSON文件解析错误,将创建新文件: {QUERY_LOG_FILE}")
existing_data = []
# 添加新数据
existing_data.append(query_data)
# 写入文件
with open(QUERY_LOG_FILE, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2)
logger.info(f"成功保存查询数据到: {QUERY_LOG_FILE}")
except Exception as e: except Exception as e:
success = False success = False
logger.error(f"保存查询数据时出错: {str(e)}", exc_info=True) logger.error(f"加入队列时出错: {str(e)}", exc_info=True)
# 返回响应 # 返回响应
content = f"<strong>问题类型</strong>: {query_type}<br><strong>操作是否成功</strong>: {'成功' if success else '失败'}" content = f"<strong>问题类型</strong>: {query_type}<br><strong>操作是否成功</strong>: {'成功' if success else '失败'}"
@@ -146,4 +220,4 @@ if __name__ == "__main__":
# workers=1 # 生产环境可以增加worker数量 # workers=1 # 生产环境可以增加worker数量
# ) # )
# 生产环境可以使用以下命令启动: # 生产环境可以使用以下命令启动:
# uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 20 # uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 1
+6 -1
View File
@@ -5,7 +5,7 @@ sys.path.append(os.getcwd())
import rag2_0.dify.dify_client.dify_api as DifyApi import rag2_0.dify.dify_client.dify_api as DifyApi
import pandas as pd import pandas as pd
pd_data = pd.read_excel("data/excel/工单汇总给AI_2.xlsx") pd_data = pd.read_excel("data/excel/工单汇总(给AI)_工单拆分.xlsx")
dify_api = DifyApi.DifyApi() dify_api = DifyApi.DifyApi()
@@ -13,6 +13,7 @@ peiwang_dataset_id = dify_api.get_or_create_dataset_by_name("配网工单数据"
zhuwang_dataset_id = dify_api.get_or_create_dataset_by_name("主网工单数据") zhuwang_dataset_id = dify_api.get_or_create_dataset_by_name("主网工单数据")
jianga_dataset_id = dify_api.get_or_create_dataset_by_name("技改工单数据") jianga_dataset_id = dify_api.get_or_create_dataset_by_name("技改工单数据")
chuneng_dataset_id = dify_api.get_or_create_dataset_by_name("储能工单数据") chuneng_dataset_id = dify_api.get_or_create_dataset_by_name("储能工单数据")
xizang_dataset_id = dify_api.get_or_create_dataset_by_name("西藏工单数据")
soft_segments_list={} soft_segments_list={}
@@ -39,6 +40,10 @@ for skill_group, segments_list in soft_segments_list.items():
dataset_id = jianga_dataset_id dataset_id = jianga_dataset_id
elif skill_group == "储能": elif skill_group == "储能":
dataset_id = chuneng_dataset_id dataset_id = chuneng_dataset_id
elif skill_group == "西藏":
dataset_id = xizang_dataset_id
else:
continue
document_id = dify_api.get_document_id(dataset_id=dataset_id, document_name=f"{skill_group}工单数据") document_id = dify_api.get_document_id(dataset_id=dataset_id, document_name=f"{skill_group}工单数据")
if not document_id: if not document_id:
document_id = dify_api.upload_text_to_document(text_name=f"{skill_group}工单数据", text="", dataset_id=dataset_id) document_id = dify_api.upload_text_to_document(text_name=f"{skill_group}工单数据", text="", dataset_id=dataset_id)
+18
View File
@@ -0,0 +1,18 @@
#!/bin/bash
# Start the AnswerType uvicorn service inside a detached screen session.
# Does nothing if a session whose name contains "AnswerType" already exists.

# Absolute path of the directory that contains this script.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

SESSION_NAME="AnswerType"

# -q: only the exit status is needed, do not echo the matching line.
if screen -ls | grep -q "$SESSION_NAME"; then
    echo "Screen session '$SESSION_NAME' already exists."
else
    # SCRIPT_DIR is handed to the inner shell as $1: a single-quoted command
    # string is not expanded by this shell, and the variable is not exported,
    # so referring to $SCRIPT_DIR inside it would expand to nothing.
    # '&&' keeps uvicorn from starting in the wrong directory if cd fails.
    screen -dmS "$SESSION_NAME" bash -c '
        cd "$1" &&
        uv run uvicorn rag2_0.dify.AnswerType:app --host 0.0.0.0 --port 8003 --workers 1
    ' _ "$SCRIPT_DIR"

    echo "Started screen session '$SESSION_NAME' and executed the command."
fi
+18
View File
@@ -0,0 +1,18 @@
#!/bin/bash
# Start the DifyQueryRetrieval_api uvicorn service inside a detached screen
# session. Does nothing if a session whose name contains
# "DifyQueryRetrieval_api" already exists.

# Absolute path of the directory that contains this script.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

SESSION_NAME="DifyQueryRetrieval_api"

# -q: only the exit status is needed, do not echo the matching line.
if screen -ls | grep -q "$SESSION_NAME"; then
    echo "Screen session '$SESSION_NAME' already exists."
else
    # SCRIPT_DIR is handed to the inner shell as $1: a single-quoted command
    # string is not expanded by this shell, and the variable is not exported,
    # so referring to $SCRIPT_DIR inside it would expand to nothing.
    # '&&' keeps uvicorn from starting in the wrong directory if cd fails.
    screen -dmS "$SESSION_NAME" bash -c '
        cd "$1" &&
        uv run uvicorn rag2_0.dify.DifyQueryRetrieval_api:app --host 0.0.0.0 --port 8002 --workers 25
    ' _ "$SCRIPT_DIR"

    echo "Started screen session '$SESSION_NAME' and executed the command."
fi
+18
View File
@@ -0,0 +1,18 @@
#!/bin/bash
# Start the intent_recognition_api uvicorn service inside a detached screen
# session. Does nothing if a session whose name contains
# "intent_recognition_api" already exists.

# Absolute path of the directory that contains this script.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

SESSION_NAME="intent_recognition_api"

# -q: only the exit status is needed, do not echo the matching line.
if screen -ls | grep -q "$SESSION_NAME"; then
    echo "Screen session '$SESSION_NAME' already exists."
else
    # SCRIPT_DIR is handed to the inner shell as $1: a single-quoted command
    # string is not expanded by this shell, and the variable is not exported,
    # so referring to $SCRIPT_DIR inside it would expand to nothing.
    # '&&' keeps uvicorn from starting in the wrong directory if cd fails.
    screen -dmS "$SESSION_NAME" bash -c '
        cd "$1" &&
        uv run uvicorn rag2_0.dify.intent_recognition_api:app --host 0.0.0.0 --port 8001 --workers 25
    ' _ "$SCRIPT_DIR"

    echo "Started screen session '$SESSION_NAME' and executed the command."
fi