优化Web事件代码

This commit is contained in:
wanyaokun
2024-08-29 19:00:25 +08:00
parent 131d6ef1d1
commit 50f35bb0c9
5 changed files with 347 additions and 231 deletions
+229 -167
View File
@@ -3,6 +3,7 @@ import json
import logging
import time
from typing import Dict, List, Any, Optional, AsyncGenerator
from collections import deque
from aiostream import stream
from fastapi import APIRouter, Request
@@ -13,7 +14,8 @@ from llama_index.core.callbacks import CBEventType
from llama_index.core.chat_engine.types import StreamingAgentChatResponse
from llama_index.core.tools import ToolOutput
from pydantic import BaseModel
from app.api.routers.request.base import userMng, conversations,message,parameter
from app.api.routers.request.base import userMng, conversations,message,parameter,feedback
from app.api.routers.request.baseConfig import *
from app.api.routers.request.models import ChatRequestData,ChatFileUploadRequest
from app.engine import get_chat_engine
import uuid
@@ -102,77 +104,6 @@ class ChatCallbackEvent(BaseModel):
logger.error(f"转换回应时间时发生错误,原因: {e}")
return None
class ChatEventCallbackHandler(BaseCallbackHandler):
_aqueue: asyncio.Queue
is_done: bool = False
def __init__(
self,
):
"""Initialize the base callback handler."""
ignored_events = [
# CBEventType.CHUNKING,
# CBEventType.NODE_PARSING,
# CBEventType.EMBEDDING,
# CBEventType.LLM,
# CBEventType.TEMPLATING,
]
super().__init__(ignored_events, ignored_events)
self._aqueue = asyncio.Queue()
def on_event_start(
self,
event_type: CBEventType,
payload: Optional[Dict[str, Any]] = None,
event_id: str = "",
**kwargs: Any,
) -> str:
logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload))
event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
if event.to_response() is not None:
self._aqueue.put_nowait(event)
def on_event_end(
self,
event_type: CBEventType,
payload: Optional[Dict[str, Any]] = None,
event_id: str = "",
**kwargs: Any,
) -> None:
logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload))
event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
if event.to_response() is not None:
self._aqueue.put_nowait(event)
def start_trace(self, trace_id: Optional[str] = None) -> None:
"""No-op."""
logger.info("trace_start:{}\n".format(trace_id))
def end_trace(
self,
trace_id: Optional[str] = None,
trace_map: Optional[Dict[str, List[str]]] = None,
) -> None:
"""No-op."""
logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map))
async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]:
while not self._aqueue.empty() or not self.is_done:
try:
yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
except asyncio.TimeoutError:
pass
class IDManager:
def createID(self):
return {
"message_id" : str(uuid.uuid4()),
'task_id':str(uuid.uuid4()),
'workflow_run_id': str(uuid.uuid4()),
"workflow_id": str(uuid.uuid4())
}
class DifyChatResponseEvent(BaseModel):
event: str
conversation_id: str
@@ -180,7 +111,11 @@ class DifyChatResponseEvent(BaseModel):
created_at: int = int(time.time())
task_id: str
def to_response(self):
return self.dict()
class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'workflow_started'
workflow_run_id:str
data:Dict[str,Any]
def __init__(self,**args):
@@ -196,14 +131,13 @@ class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent):
},
"created_at": int(time.time())
}
args['event'] = 'workflow_started'
super().__init__(**args)
class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'workflow_finished'
workflow_run_id:str
data:Dict[str,Any]
def __init__(self,**args):
args['event'] = 'workflow_finished'
args['data'] = {
"id": args['workflow_run_id'],
"workflow_id": args['workflow_id'],
@@ -227,41 +161,219 @@ class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent):
super().__init__(**args)
class Message_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'message'
id:str
answer:str
def __init__(self,**args):
args['id'] = args['message_id']
args['event'] = 'message'
super().__init__(**args)
class MessageEnd_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'message_end'
id:str
metadata:Dict[str,Any] = {}
def __init__(self,**args):
args['id'] = args['message_id']
args['event'] = 'message_end'
super().__init__(**args)
class Node_started_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'node_started'
workflow_run_id:str
data:Dict[str,Any]
def __init__(self,**args):
args['data'] = {
"id": args['nodeid'],
"node_id": args['nodeid'],
"node_type": "http-request",
"title": args['title'],
"index": args['index'],
"predecessor_node_id": args['predecessor_node_id'],
"inputs": '',
"created_at": 1724398751,
"extras": {}
}
super().__init__(**args)
class Node_finished_DifyChatResponseEvent(DifyChatResponseEvent):
event: str = 'node_finished'
workflow_run_id:str
data:Dict[str,Any]
def __init__(self,**args):
args['data'] = {
"id": args['nodeid'],
"node_id": args['nodeid'],
"node_type": "http-request",
"title": args['title'],
"index": args['index'],
"predecessor_node_id": args['predecessor_node_id'],
"inputs": '',
"process_data": '',
"outputs": '',
"status": "succeeded",
"error": '',
"elapsed_time": 0.10402441816404462,
"execution_metadata": '',
"created_at": 1724398751,
"finished_at": 1724398751,
"files": []
}
super().__init__(**args)
class ChatEventCallbackHandler(BaseCallbackHandler):
_aqueue: asyncio.Queue
is_done: bool = False
def __init__(self,**params):
"""Initialize the base callback handler."""
ignored_events = [
# CBEventType.CHUNKING,
# CBEventType.NODE_PARSING,
# CBEventType.EMBEDDING,
# CBEventType.LLM,
# CBEventType.TEMPLATING,
]
super().__init__(ignored_events, ignored_events)
self._aqueue = asyncio.Queue()
self._response:str = ''
self._params:Dict[str,Any] = params
self._nodeStack:deque = deque()
#添加工作流开始事件
ids:Dict[str,Any] = self._params['ids']
data:ChatRequestData = self._params['data']
args = ids
args.update(
{
'use_id': data.user,
'query': data.query,
'conversation_id': data.conversation_id
}
)
wf_event = Workflow_started_DifyChatResponseEvent(**args)
if wf_event.to_response() is not None:
self._aqueue.put_nowait(wf_event)
def on_event_start(
self,
event_type: CBEventType,
payload: Optional[Dict[str, Any]] = None,
event_id: str = "",
**kwargs: Any,
) -> str:
logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload))
self._nodeStack.append(event_id)
nindex = self._nodeStack.count() - 1
ids:Dict[str,Any] = self._params['ids']
args = ids
args.update(
{
'nodeid':event_id,
'title':event_type.name,
'index':nindex + 1,
'predecessor_node_id': self._nodeStack[nindex - 1] if nindex > 0 else ''
}
)
nd_event = Node_started_DifyChatResponseEvent(**args)
if nd_event.to_response() is not None:
self._aqueue.put_nowait(nd_event)
def on_event_end(
self,
event_type: CBEventType,
payload: Optional[Dict[str, Any]] = None,
event_id: str = "",
**kwargs: Any,
) -> None:
logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload))
#self.response = payload.get("response","")
args:Dict[str,Any] = self._params['ids']
nodeID = self._nodeStack[-1]
if nodeID == event_id:
nindex = self._nodeStack.count() - 1
args.update(
{
'nodeid':event_id,
'title':event_type.name,
'index':nindex + 1,
'predecessor_node_id':self._nodeStack[nindex - 1] if nindex > 0 else ''
}
)
nd_event = Node_finished_DifyChatResponseEvent(**args)
if nd_event.to_response() is not None:
self._aqueue.put_nowait(nd_event)
self._nodeStack.pop()
def start_trace(self, trace_id: Optional[str] = None) -> None:
"""No-op."""
logger.info("trace_start:{}\n".format(trace_id))
def end_trace(
self,
trace_id: Optional[str] = None,
trace_map: Optional[Dict[str, List[str]]] = None,
) -> None:
"""No-op."""
logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map))
ids:Dict[str,Any] = self._params['ids']
data:ChatRequestData = self._params['data']
args = ids
args.update(
{
'response':self._response,
'conversation_id': data.conversation_id
}
)
wf_event = Workflow_finished_DifyChatResponseEvent(**args)
if wf_event.to_response() is not None:
self._aqueue.put_nowait(wf_event)
args = ids
msgEnt_event = MessageEnd_DifyChatResponseEvent(**args)
if msgEnt_event.to_response() is not None:
self._aqueue.put_nowait(msgEnt_event)
async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]:
while not self._aqueue.empty() or not self.is_done:
try:
yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
except asyncio.TimeoutError:
pass
class IDManager:
def createID(self):
return {
"message_id" : str(uuid.uuid4()),
'task_id':str(uuid.uuid4()),
'workflow_run_id': str(uuid.uuid4()),
"workflow_id": str(uuid.uuid4())
}
class ChatStreamResponse(StreamingResponse):
TEXT_PREFIX = "data: "
DATA_PREFIX = "data: "
ids:Dict[str,Any] = {}
data:ChatRequestData = None
@classmethod
def convert_text(cls, token: str):
# Escape newlines and double quotes to avoid breaking the stream
#token = json.dumps(token)
#return f"data: {{"event": "message", "conversation_id": "80d85523-de92-4b9d-aca0-c48a5eacb068", "message_id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "created_at": 1724406492, "task_id": "802f3064-030d-42ac-a882-0e1293712d04", "id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "answer": "{token}"}}"
return "\n"
@classmethod
def convert_data(cls, data: dict):
data_str = json.dumps(data)
def convert_Message(cls, token: str):
params = cls.ids
params.update({
'answer':token,
'conversation_id':cls.data.conversation_id
})
event = Message_DifyChatResponseEvent(**params)
data_str = json.dumps(event.dict())
return f"{cls.DATA_PREFIX}{data_str}\n\n"
@classmethod
def convert_event(cls, event: DifyChatResponseEvent):
data_str = json.dumps(event.dict())
def convert_Event(cls, data: dict):
data_str = json.dumps(data)
return f"{cls.DATA_PREFIX}{data_str}\n\n"
def __init__(
@@ -269,8 +381,11 @@ class ChatStreamResponse(StreamingResponse):
request: Request,
event_handler: ChatEventCallbackHandler,
response: StreamingAgentChatResponse,
data: ChatRequestData
data: ChatRequestData,
ids:Dict[str,Any]
):
ChatStreamResponse.ids = ids
ChatStreamResponse.data = data
content = ChatStreamResponse.content_generator(
request, event_handler, response, data
)
@@ -284,41 +399,26 @@ class ChatStreamResponse(StreamingResponse):
response: StreamingAgentChatResponse,
data: ChatRequestData
):
ids = IDManager().createID()
# Yield the text response
async def _chat_response_generator():
final_response = ""
async for token in response.async_response_gen():
final_response += token
args = ids
args['answer'] = token
args['conversation_id'] = data.conversation_id
event = Message_DifyChatResponseEvent(**args)
yield ChatStreamResponse.convert_event(event)
#yield ChatStreamResponse.convert_text(token)
yield ChatStreamResponse.convert_Message(token)
# 存储消息历史
message().add(user_id=data.user,conversation_id=data.conversation_id,query=data.query,answer=final_response)
# the text_generator is the leading stream, once it's finished, also finish the event stream
event_handler.is_done = True
# 发送工作流结束事件
args = ids
args['response'] = final_response
args['conversation_id'] = data.conversation_id
wf_event = Workflow_finished_DifyChatResponseEvent(**args)
yield ChatStreamResponse.convert_event(wf_event)
msgEnt_event = MessageEnd_DifyChatResponseEvent(**ids)
yield ChatStreamResponse.convert_event(msgEnt_event)
# Yield the events from the event handler
async def _event_generator():
async for event in event_handler.async_event_gen():
event_response = event.to_response()
if event_response is not None:
yield ChatStreamResponse.convert_text("")
yield ChatStreamResponse.convert_Event(event_response)
combine = stream.merge(_chat_response_generator(), _event_generator())
is_stream_started = False
@@ -327,25 +427,11 @@ class ChatStreamResponse(StreamingResponse):
if not is_stream_started:
is_stream_started = True
# 发送工作流开始事件
args = ids
args['use_id'] = data.user
args['query'] = data.query
args['conversation_id'] = data.conversation_id
wf_event = Workflow_started_DifyChatResponseEvent(**args)
yield ChatStreamResponse.convert_event(wf_event)
# Stream a blank message to start the stream
# 发送一个空消息事件
#yield ChatStreamResponse.convert_text("")
yield output
if await request.is_disconnected():
break
@v.post("/chat-messages")
async def post_conversations(request: Request, data: ChatRequestData):
userMng.findNoExistCreate(data.user)
@@ -365,14 +451,15 @@ async def post_conversations(request: Request, data: ChatRequestData):
chat_engine = get_chat_engine(filters=filters, params=params)
# 启动聊天事件监听
event_handler = ChatEventCallbackHandler()
ids = IDManager().createID()
event_handler = ChatEventCallbackHandler(ids = ids,data = data)
chat_engine.callback_manager.handlers.append(event_handler) # type: ignore
# 执行异步聊天
response = await chat_engine.astream_chat(data.query)
# 返回异步消息回应
return ChatStreamResponse(request, event_handler, response, data)
return ChatStreamResponse(request, event_handler, response, data,ids)
@v.get("/messages")
async def query_messages(user:str, conversation_id:str):
@@ -388,8 +475,9 @@ async def query_messages(user:str, conversation_id:str):
for record in records:
res = record.dict()
feeds = feedback().query(res['id'])
res["message_files"] = []
res["feedback"] = ''
res["feedback"] = {'rating':feeds['rating'] } if feeds != None else ''
res["retriever_resources"] = []
res["created_at"] = 1723444905
res["agent_thoughts"] = []
@@ -440,48 +528,22 @@ async def query_conversations(user:str, first_id:str = None, limit:str = None, p
async def query_parameters(user:str):
params = parameter().get(user)
if len(params) == 0:
params = {
"opening_statement": "您好,我是配网D3造价软件小助手,您可以问我有关配网造价软件的相关问题!",
"suggested_questions": [],
"suggested_questions_after_answer": {
"enabled": False
},
"speech_to_text": {
"enabled": False
},
"text_to_speech": {
"enabled": False,
"language": "",
"voice": ""
},
"retriever_resource": {
"enabled": True
},
"annotation_reply": {
"enabled": False
},
"more_like_this": {
"enabled": False
},
"user_input_form": [],
"sensitive_word_avoidance": {
"enabled": False
},
"file_upload": {
"image": {
"enabled": False,
"number_limits": 3,
"transfer_methods": [
"remote_url"
]
}
},
"system_parameters": {
"image_file_size_limit": "10"
}
}
params = BaseConfig().ParamterCfg()
return params
@v.post("/messages/{message_id}/feedbacks")
async def post_feedbacks(request: Request,message_id:str,params:Dict[str,Any]):
if params['rating'] =='null':
feedback().delete(message_id)
else:
condition = {'id':message_id}
results = message().query(**condition)
if len(results) > 0:
result = results[0]
feedback().add(message_id=message_id,query=result['query'],
answer=result['answer'],rating=params['rating'])
@r.post("")
def upload_file(request: ChatFileUploadRequest) -> List[str]:
pass
pass