From 50f35bb0c9be73fc3aa728be4dcbb277e873cd18 Mon Sep 17 00:00:00 2001 From: wanyaokun <12345678> Date: Thu, 29 Aug 2024 19:00:25 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BC=98=E5=8C=96Web=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/api/routers/app.py | 396 ++++++++++-------- backend/app/api/routers/request/base.py | 34 +- backend/app/api/routers/request/baseConfig.py | 111 ++--- backend/app/api/routers/request/dbOrm.py | 31 +- backend/app/api/routers/request/models.py | 6 +- 5 files changed, 347 insertions(+), 231 deletions(-) diff --git a/backend/app/api/routers/app.py b/backend/app/api/routers/app.py index cb53bd2..bca5fd5 100644 --- a/backend/app/api/routers/app.py +++ b/backend/app/api/routers/app.py @@ -3,6 +3,7 @@ import json import logging import time from typing import Dict, List, Any, Optional, AsyncGenerator +from collections import deque from aiostream import stream from fastapi import APIRouter, Request @@ -13,7 +14,8 @@ from llama_index.core.callbacks import CBEventType from llama_index.core.chat_engine.types import StreamingAgentChatResponse from llama_index.core.tools import ToolOutput from pydantic import BaseModel -from app.api.routers.request.base import userMng, conversations,message,parameter +from app.api.routers.request.base import userMng, conversations,message,parameter,feedback +from app.api.routers.request.baseConfig import * from app.api.routers.request.models import ChatRequestData,ChatFileUploadRequest from app.engine import get_chat_engine import uuid @@ -102,77 +104,6 @@ class ChatCallbackEvent(BaseModel): logger.error(f"转换回应时间时发生错误,原因: {e}") return None -class ChatEventCallbackHandler(BaseCallbackHandler): - _aqueue: asyncio.Queue - is_done: bool = False - - def __init__( - self, - ): - """Initialize the base callback handler.""" - ignored_events = [ - # CBEventType.CHUNKING, - # CBEventType.NODE_PARSING, - # CBEventType.EMBEDDING, - # CBEventType.LLM, - # CBEventType.TEMPLATING, - ] - super().__init__(ignored_events, ignored_events) - self._aqueue = asyncio.Queue() - - def on_event_start( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - **kwargs: Any, - ) -> str: - logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload)) - - event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload) - if event.to_response() is not None: - self._aqueue.put_nowait(event) - - def on_event_end( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - **kwargs: Any, - ) -> None: - logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload)) - event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload) - if event.to_response() is not None: - self._aqueue.put_nowait(event) - - def start_trace(self, trace_id: Optional[str] = None) -> None: - """No-op.""" - logger.info("trace_start:{}\n".format(trace_id)) - - def end_trace( - self, - trace_id: Optional[str] = None, - trace_map: Optional[Dict[str, List[str]]] = None, - ) -> None: - """No-op.""" - logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map)) - - async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]: - while not self._aqueue.empty() or not self.is_done: - try: - yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1) - except asyncio.TimeoutError: - pass - -class IDManager: - def createID(self): - return { - "message_id" : str(uuid.uuid4()), - 'task_id':str(uuid.uuid4()), - 'workflow_run_id': str(uuid.uuid4()), - "workflow_id": str(uuid.uuid4()) - } - class DifyChatResponseEvent(BaseModel): event: str conversation_id: str @@ -180,7 +111,11 @@ class DifyChatResponseEvent(BaseModel): created_at: int = int(time.time()) task_id: str + def to_response(self): + return self.dict() + class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'workflow_started' workflow_run_id:str data:Dict[str,Any] def __init__(self,**args): @@ -196,14 +131,13 @@ class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent): }, "created_at": int(time.time()) } - args['event'] = 'workflow_started' super().__init__(**args) - + class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'workflow_finished' workflow_run_id:str data:Dict[str,Any] def __init__(self,**args): - args['event'] = 'workflow_finished' args['data'] = { "id": args['workflow_run_id'], "workflow_id": args['workflow_id'], @@ -227,41 +161,219 @@ class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent): super().__init__(**args) class Message_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'message' id:str answer:str def __init__(self,**args): args['id'] = args['message_id'] - args['event'] = 'message' super().__init__(**args) class MessageEnd_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'message_end' id:str metadata:Dict[str,Any] = {} def __init__(self,**args): args['id'] = args['message_id'] - args['event'] = 'message_end' super().__init__(**args) + +class Node_started_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'node_started' + workflow_run_id:str + data:Dict[str,Any] + def __init__(self,**args): + args['data'] = { + "id": args['nodeid'], + "node_id": args['nodeid'], + "node_type": "http-request", + "title": args['title'], + "index": args['index'], + "predecessor_node_id": args['predecessor_node_id'], + "inputs": '', + "created_at": 1724398751, + "extras": {} + } + super().__init__(**args) + +class Node_finished_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'node_finished' + workflow_run_id:str + data:Dict[str,Any] + def __init__(self,**args): + args['data'] = { + "id": args['nodeid'], + "node_id": args['nodeid'], + "node_type": "http-request", + "title": args['title'], + "index": args['index'], + "predecessor_node_id": args['predecessor_node_id'], + "inputs": '', + "process_data": '', + "outputs": '', + "status": "succeeded", + "error": '', + "elapsed_time": 0.10402441816404462, + "execution_metadata": '', + "created_at": 1724398751, + "finished_at": 1724398751, + "files": [] + } + super().__init__(**args) + +class ChatEventCallbackHandler(BaseCallbackHandler): + _aqueue: asyncio.Queue + is_done: bool = False + + def __init__(self,**params): + """Initialize the base callback handler.""" + ignored_events = [ + # CBEventType.CHUNKING, + # CBEventType.NODE_PARSING, + # CBEventType.EMBEDDING, + # CBEventType.LLM, + # CBEventType.TEMPLATING, + ] + super().__init__(ignored_events, ignored_events) + self._aqueue = asyncio.Queue() + self._response:str = '' + self._params:Dict[str,Any] = params + self._nodeStack:deque = deque() + + #添加工作流开始事件 + ids:Dict[str,Any] = self._params['ids'] + data:ChatRequestData = self._params['data'] + args = ids + args.update( + { + 'use_id': data.user, + 'query': data.query, + 'conversation_id': data.conversation_id + } + ) + wf_event = Workflow_started_DifyChatResponseEvent(**args) + if wf_event.to_response() is not None: + self._aqueue.put_nowait(wf_event) + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> str: + logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload)) + + self._nodeStack.append(event_id) + nindex = self._nodeStack.count() - 1 + + ids:Dict[str,Any] = self._params['ids'] + args = ids + args.update( + { + 'nodeid':event_id, + 'title':event_type.name, + 'index':nindex + 1, + 'predecessor_node_id': self._nodeStack[nindex - 1] if nindex > 0 else '' + } + ) + nd_event = Node_started_DifyChatResponseEvent(**args) + if nd_event.to_response() is not None: + self._aqueue.put_nowait(nd_event) + + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload)) + #self.response = payload.get("response","") + args:Dict[str,Any] = self._params['ids'] + nodeID = self._nodeStack[-1] + if nodeID == event_id: + nindex = self._nodeStack.count() - 1 + args.update( + { + 'nodeid':event_id, + 'title':event_type.name, + 'index':nindex + 1, + 'predecessor_node_id':self._nodeStack[nindex - 1] if nindex > 0 else '' + } + ) + nd_event = Node_finished_DifyChatResponseEvent(**args) + if nd_event.to_response() is not None: + self._aqueue.put_nowait(nd_event) + self._nodeStack.pop() + + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """No-op.""" + logger.info("trace_start:{}\n".format(trace_id)) + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, List[str]]] = None, + ) -> None: + """No-op.""" + logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map)) + ids:Dict[str,Any] = self._params['ids'] + data:ChatRequestData = self._params['data'] + args = ids + args.update( + { + 'response':self._response, + 'conversation_id': data.conversation_id + } + ) + wf_event = Workflow_finished_DifyChatResponseEvent(**args) + if wf_event.to_response() is not None: + self._aqueue.put_nowait(wf_event) + + + args = ids + msgEnt_event = MessageEnd_DifyChatResponseEvent(**args) + if msgEnt_event.to_response() is not None: + self._aqueue.put_nowait(msgEnt_event) + + async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]: + while not self._aqueue.empty() or not self.is_done: + try: + yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1) + except asyncio.TimeoutError: + pass + +class IDManager: + def createID(self): + return { + "message_id" : str(uuid.uuid4()), + 'task_id':str(uuid.uuid4()), + 'workflow_run_id': str(uuid.uuid4()), + "workflow_id": str(uuid.uuid4()) + } + class ChatStreamResponse(StreamingResponse): TEXT_PREFIX = "data: " DATA_PREFIX = "data: " + ids:Dict[str,Any] = {} + data:ChatRequestData = None @classmethod - def convert_text(cls, token: str): - # Escape newlines and double quotes to avoid breaking the stream - #token = json.dumps(token) - - #return f"data: {{"event": "message", "conversation_id": "80d85523-de92-4b9d-aca0-c48a5eacb068", "message_id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "created_at": 1724406492, "task_id": "802f3064-030d-42ac-a882-0e1293712d04", "id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "answer": "{token}"}}" - return "\n" - - @classmethod - def convert_data(cls, data: dict): - data_str = json.dumps(data) + def convert_Message(cls, token: str): + params = cls.ids + params.update({ + 'answer':token, + 'conversation_id':cls.data.conversation_id + }) + event = Message_DifyChatResponseEvent(**params) + data_str = json.dumps(event.dict()) return f"{cls.DATA_PREFIX}{data_str}\n\n" @classmethod - def convert_event(cls, event: DifyChatResponseEvent): - data_str = json.dumps(event.dict()) + def convert_Event(cls, data: dict): + data_str = json.dumps(data) return f"{cls.DATA_PREFIX}{data_str}\n\n" def __init__( @@ -269,8 +381,11 @@ class ChatStreamResponse(StreamingResponse): request: Request, event_handler: ChatEventCallbackHandler, response: StreamingAgentChatResponse, - data: ChatRequestData + data: ChatRequestData, + ids:Dict[str,Any] ): + ChatStreamResponse.ids = ids + ChatStreamResponse.data = data content = ChatStreamResponse.content_generator( request, event_handler, response, data ) @@ -284,41 +399,26 @@ class ChatStreamResponse(StreamingResponse): response: StreamingAgentChatResponse, data: ChatRequestData ): - ids = IDManager().createID() + # Yield the text response async def _chat_response_generator(): final_response = "" async for token in response.async_response_gen(): final_response += token - args = ids - args['answer'] = token - args['conversation_id'] = data.conversation_id - event = Message_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(event) - #yield ChatStreamResponse.convert_text(token) + yield ChatStreamResponse.convert_Message(token) # 存储消息历史 message().add(user_id=data.user,conversation_id=data.conversation_id,query=data.query,answer=final_response) # the text_generator is the leading stream, once it's finished, also finish the event stream event_handler.is_done = True - # 发送工作流结束事件 - args = ids - args['response'] = final_response - args['conversation_id'] = data.conversation_id - wf_event = Workflow_finished_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(wf_event) - - msgEnt_event = MessageEnd_DifyChatResponseEvent(**ids) - yield ChatStreamResponse.convert_event(msgEnt_event) - - + # Yield the events from the event handler async def _event_generator(): async for event in event_handler.async_event_gen(): event_response = event.to_response() if event_response is not None: - yield ChatStreamResponse.convert_text("") + yield ChatStreamResponse.convert_Event(event_response) combine = stream.merge(_chat_response_generator(), _event_generator()) is_stream_started = False @@ -327,25 +427,11 @@ class ChatStreamResponse(StreamingResponse): if not is_stream_started: is_stream_started = True - # 发送工作流开始事件 - args = ids - args['use_id'] = data.user - args['query'] = data.query - args['conversation_id'] = data.conversation_id - wf_event = Workflow_started_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(wf_event) - - # Stream a blank message to start the stream - # 发送一个空消息事件 - #yield ChatStreamResponse.convert_text("") - yield output if await request.is_disconnected(): break - - @v.post("/chat-messages") async def post_conversations(request: Request, data: ChatRequestData): userMng.findNoExistCreate(data.user) @@ -365,14 +451,15 @@ async def post_conversations(request: Request, data: ChatRequestData): chat_engine = get_chat_engine(filters=filters, params=params) # 启动聊天事件监听 - event_handler = ChatEventCallbackHandler() + ids = IDManager().createID() + event_handler = ChatEventCallbackHandler(ids = ids,data = data) chat_engine.callback_manager.handlers.append(event_handler) # type: ignore # 执行异步聊天 response = await chat_engine.astream_chat(data.query) # 返回异步消息回应 - return ChatStreamResponse(request, event_handler, response, data) + return ChatStreamResponse(request, event_handler, response, data,ids) @v.get("/messages") async def query_messages(user:str, conversation_id:str): @@ -388,8 +475,9 @@ async def query_messages(user:str, conversation_id:str): for record in records: res = record.dict() + feeds = feedback().query(res['id']) res["message_files"] = [] - res["feedback"] = '' + res["feedback"] = {'rating':feeds['rating'] } if feeds != None else '' res["retriever_resources"] = [] res["created_at"] = 1723444905 res["agent_thoughts"] = [] @@ -440,48 +528,22 @@ async def query_conversations(user:str, first_id:str = None, limit:str = None, p async def query_parameters(user:str): params = parameter().get(user) if len(params) == 0: - params = { - "opening_statement": "您好,我是配网D3造价软件小助手,您可以问我有关配网造价软件的相关问题!", - "suggested_questions": [], - "suggested_questions_after_answer": { - "enabled": False - }, - "speech_to_text": { - "enabled": False - }, - "text_to_speech": { - "enabled": False, - "language": "", - "voice": "" - }, - "retriever_resource": { - "enabled": True - }, - "annotation_reply": { - "enabled": False - }, - "more_like_this": { - "enabled": False - }, - "user_input_form": [], - "sensitive_word_avoidance": { - "enabled": False - }, - "file_upload": { - "image": { - "enabled": False, - "number_limits": 3, - "transfer_methods": [ - "remote_url" - ] - } - }, - "system_parameters": { - "image_file_size_limit": "10" - } - } + params = BaseConfig().ParamterCfg() return params +@v.post("/messages/{message_id}/feedbacks") +async def post_feedbacks(request: Request,message_id:str,params:Dict[str,Any]): + if params['rating'] =='null': + feedback().delete(message_id) + else: + condition = {'id':message_id} + results = message().query(**condition) + if len(results) > 0: + result = results[0] + feedback().add(message_id=message_id,query=result['query'], + answer=result['answer'],rating=params['rating']) + @r.post("") def upload_file(request: ChatFileUploadRequest) -> List[str]: - pass \ No newline at end of file + pass + diff --git a/backend/app/api/routers/request/base.py b/backend/app/api/routers/request/base.py index bb90305..234323b 100644 --- a/backend/app/api/routers/request/base.py +++ b/backend/app/api/routers/request/base.py @@ -25,7 +25,7 @@ class conversations: return None def add(self,id:str, user_id:str, name:str): - template = BaseConfig.ConversationCfg + template = BaseConfig().ConversationCfg() template['id'] = id template['user_id'] = user_id template['name'] = name @@ -111,7 +111,7 @@ class message: return datas def add(self,user_id:str,conversation_id:str,query:str,answer:str): - template = BaseConfig.MessageCfg + template = BaseConfig.MessageCfg() template['id'] = str(uuid.uuid4()) template['user_id'] = user_id template['conversation_id'] = conversation_id @@ -122,4 +122,34 @@ class message: def delete(self,user_id:str): dbManage.delete(self._tableName,user_id = user_id) + def query(self,**condition): + results = [] + records = dbManage.query(self._tableName,**condition) + for record in records: + results.append(record.dict()) + return results +class feedback: + def __init__(self) -> None: + self._tableName = 'feedbacks' + dbManage.createTable(self._tableName) + + def add(self,message_id:str,query:str,answer:str,rating:str): + record = { + 'message_id': message_id, + 'query': query, + 'answer': answer, + 'rating': rating, + } + dbManage.addRecord(self._tableName,record) + + def delete(self,message_id:str): + cond = {'message_id':message_id} + dbManage.delete(self._tableName,**cond) + + def query(self,message_id:str): + cond = {'message_id':message_id} + records = dbManage.query(self._tableName,**cond) + if len(records) > 0: + return records[0].dict() + return None \ No newline at end of file diff --git a/backend/app/api/routers/request/baseConfig.py b/backend/app/api/routers/request/baseConfig.py index 7dce858..eb8515a 100644 --- a/backend/app/api/routers/request/baseConfig.py +++ b/backend/app/api/routers/request/baseConfig.py @@ -1,62 +1,71 @@ +from pydantic import BaseModel +import os +class BaseConfig(BaseModel): + projectInfo:str = os.getenv("PROJECT_TITLE","您好,我是博微工程理解小助手,您可以问我有关[线路工程]工程数据的相关问题!") -class BaseConfig: - ParamterCfg = { - "opening_statement": "您好,我是配网D3造价软件小助手,您可以问我有关配网造价软件的相关问题!", - "suggested_questions": [], - "suggested_questions_after_answer": { - "enabled": False - }, - "speech_to_text": { - "enabled": False - }, - "text_to_speech": { - "enabled": False, - "language": "", - "voice": "" - }, - "retriever_resource": { - "enabled": True - }, - "annotation_reply": { - "enabled": False - }, - "more_like_this": { - "enabled": False - }, - "user_input_form": [], - "sensitive_word_avoidance": { - "enabled": False - }, - "file_upload": { - "image": { + def ParamterCfg(self): + questions = os.getenv("CONVERSATION_STARTERS", "dev") + return{ + "opening_statement": self.projectInfo, + "suggested_questions": questions.split('\n'), + "suggested_questions_after_answer": { + "enabled": False + }, + "speech_to_text": { + "enabled": False + }, + "text_to_speech": { "enabled": False, - "number_limits": 3, - "transfer_methods": [ - "remote_url" - ] + "language": "", + "voice": "" + }, + "retriever_resource": { + "enabled": True + }, + "annotation_reply": { + "enabled": False + }, + "more_like_this": { + "enabled": False + }, + "user_input_form": [], + "sensitive_word_avoidance": { + "enabled": False + }, + "file_upload": { + "image": { + "enabled": False, + "number_limits": 3, + "transfer_methods": [ + "remote_url" + ] + } + }, + "system_parameters": { + "image_file_size_limit": "10" } - }, - "system_parameters": { - "image_file_size_limit": "10" } - } + + def ConversationCfg(self): + return{ + "id": "", + 'user_id':'', + "name": "", + "inputs": {}, + "status": "normal", + "introduction": self.projectInfo, + "created_at":'' + } - ConversationCfg = { - "id": "", - 'user_id':'', - "name": "", - "inputs": {}, - "status": "normal", - "introduction": ParamterCfg['opening_statement'], - "created_at":'' - } - - - MessageCfg = { + @classmethod + def MessageCfg(cls): + return { "id": "", 'user_id':'', "conversation_id": "", "inputs": {}, "query": "", "answer": "" - } \ No newline at end of file + } + + diff --git a/backend/app/api/routers/request/dbOrm.py b/backend/app/api/routers/request/dbOrm.py index 796b90c..38af99d 100644 --- a/backend/app/api/routers/request/dbOrm.py +++ b/backend/app/api/routers/request/dbOrm.py @@ -2,7 +2,7 @@ import os from typing import Dict, List, Any from pydantic import BaseModel -from sqlalchemy import create_engine, Column, String, Integer, JSON +from sqlalchemy import create_engine, Column, String, Integer, JSON,Float from sqlalchemy.engine.reflection import Inspector from sqlalchemy.orm import sessionmaker, declarative_base @@ -24,10 +24,6 @@ class ConversationOrm(Base): if 'name' in data: self.name = data['name'] - - - - class UserOrm(Base): __tablename__ = "user" @@ -51,6 +47,14 @@ class MessagesOrm(Base): query = Column(String) answer = Column(String) +class FeedBackOrm(Base): + __tablename__ = "feedbacks" + + message_id = Column(String,primary_key=True) + query = Column(String) + answer = Column(String) + rating = Column(String) + #数据结构 class ConversationModel(BaseModel): id: str @@ -61,7 +65,6 @@ class ConversationModel(BaseModel): created_at: int class Config: - #orm_mode = True from_attributes=True @classmethod @@ -73,7 +76,6 @@ class UserModel(BaseModel): createtime: str class Config: - #orm_mode = True from_attributes=True @classmethod @@ -86,7 +88,6 @@ class ParametersModel(BaseModel): value : Dict[str, Any] class Config: - #orm_mode = True from_attributes=True @classmethod @@ -101,13 +102,25 @@ class MessagesModel(BaseModel): answer : str class Config: - #orm_mode = True from_attributes=True @classmethod def orm(cls): return MessagesOrm +class FeedBackModel(BaseModel): + message_id :str + query :str + answer :str + rating :str + + class Config: + from_attributes=True + + @classmethod + def orm(cls): + return FeedBackOrm + class DBManager: def __init__(self) -> None: DATABASE_URL = os.getenv("SQLITE_DATABASE_URL") diff --git a/backend/app/api/routers/request/models.py b/backend/app/api/routers/request/models.py index d76af75..c06204a 100644 --- a/backend/app/api/routers/request/models.py +++ b/backend/app/api/routers/request/models.py @@ -1,7 +1,7 @@ from typing import Dict, Any from pydantic import BaseModel - +from typing import Optional class ChatRequestData(BaseModel): inputs: Dict[str,Any] @@ -12,4 +12,6 @@ class ChatRequestData(BaseModel): conversation_id: str = None class ChatFileUploadRequest(BaseModel): - base64: str \ No newline at end of file + base64: str + + From 480a1f7fdc72c6c5fb8455d3ead4ef09c705becb Mon Sep 17 00:00:00 2001 From: wanyaokun <12345678> Date: Thu, 29 Aug 2024 19:03:38 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/.env.example | 1 + backend/.env.xinference | 1 + 2 files changed, 2 insertions(+) diff --git a/backend/.env.example b/backend/.env.example index 83e69c1..a549405 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -80,3 +80,4 @@ SYSTEM_PROMPT="You are a weather forecast agent. You help users to get the weath - You can install any pip package (if it exists) by running a cell with pip install. " +PROJECT_TITLE = "您好,我是博微工程理解小助手,您可以问我有关[线路工程]工程数据的相关问题!" \ No newline at end of file diff --git a/backend/.env.xinference b/backend/.env.xinference index 1dc074c..7a00d93 100644 --- a/backend/.env.xinference +++ b/backend/.env.xinference @@ -111,3 +111,4 @@ SYSTEM_PROMPT="You are a weather forecast agent. You help users to get the weath - You can install any pip package (if it exists) by running a cell with pip install. " +PROJECT_TITLE = "您好,我是博微工程理解小助手,您可以问我有关[线路工程]工程数据的相关问题!" \ No newline at end of file From 03c4eb1af18def7f9eaea6258e15d915f6c9448a Mon Sep 17 00:00:00 2001 From: wanyaokun <12345678> Date: Thu, 29 Aug 2024 19:52:53 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96ChatCallbackEvent?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/api/routers/app.py | 323 +++++++----------- backend/app/api/routers/request/baseConfig.py | 9 + 2 files changed, 141 insertions(+), 191 deletions(-) diff --git a/backend/app/api/routers/app.py b/backend/app/api/routers/app.py index bca5fd5..1bd7e9a 100644 --- a/backend/app/api/routers/app.py +++ b/backend/app/api/routers/app.py @@ -26,199 +26,144 @@ api_router = r = APIRouter() v1_router = v = APIRouter() class ChatCallbackEvent(BaseModel): - event_type: CBEventType + event_type: ChatEventType payload: Optional[Dict[str, Any]] = None - event_id: str = "" - def get_retrieval_message(self) -> dict | None: - if self.payload: - nodes = self.payload.get("nodes") - if nodes: - msg = f"根据查询检索到 {len(nodes)} 源文件" - else: - msg = f"查询检索中: '{self.payload.get('query_str')}'" - return { - "type": "events", - "data": {"title": msg}, - } - else: - return None + def get_common_param(self)-> dict: + return { + 'event': self.event_type.name, + 'conversation_id':self.payload.get("conversation_id"), + 'message_id': self.payload.get("message_id"), + 'created_at': int(time.time()), + 'task_id': self.payload.get("task_id") + } - def get_tool_message(self) -> dict | None: - func_call_args = self.payload.get("function_call") - if func_call_args is not None and "tool" in self.payload: - tool = self.payload.get("tool") - return { - "type": "events", - "data": { - "title": f"调用工具 {tool.name} ,参数: {func_call_args}", + def get_WorkflowStart_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'workflow_run_id':self.payload.get('workflow_run_id'), + 'data':{ + "id": self.payload.get('workflow_run_id'), + "workflow_id": self.payload.get('workflow_id'), + "sequence_number": 1709, + "inputs": { + "sys.query": self.payload.get('query'), + "sys.files": [], + "sys.conversation_id": self.payload.get('conversation_id'), + "sys.user_id": self.payload.get('use_id') }, + "created_at": int(time.time()) } + }) + return params - def _is_output_serializable(self, output: Any) -> bool: - try: - json.dumps(output) - return True - except TypeError: - return False + def get_WorkflowFinished_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'workflow_run_id':self.payload.get('workflow_run_id'), + 'data':{ + "id": self.payload.get('workflow_run_id'), + "workflow_id": self.payload.get('workflow_id'), + "sequence_number": 1709, + "status": "succeeded", + "outputs": { + "answer": self.payload.get('response') + }, + "error": '', + "elapsed_time": 36.03764106379822, + "total_tokens": 11707, + "total_steps": 10, + "created_by": { + "id": str(uuid.uuid4()), + "user": self.payload.get('use_id') + }, + "created_at": int(time.time()), + "finished_at": int(time.time()), + "files": [] + } + }) + return params + + def get_NodeStart_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'workflow_run_id':self.payload.get('workflow_run_id'), + 'data':{ + "id": self.payload.get('nodeid'), + "node_id": self.payload.get('nodeid'), + "node_type": "http-request", + "title": self.payload.get('title'), + "index": self.payload.get('index'), + "predecessor_node_id": self.payload.get('predecessor_node_id'), + "inputs": '', + "created_at": 1724398751, + "extras": {} + } + }) + return params - def get_agent_tool_response(self) -> dict | None: - response = self.payload.get("response") - if response is not None: - sources = response.sources - for source in sources: - # Return the tool response here to include the toolCall information - if isinstance(source, ToolOutput): - if self._is_output_serializable(source.raw_output): - output = source.raw_output - else: - output = source.content + def get_NodeFinished_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'workflow_run_id':self.payload.get('workflow_run_id'), + 'data':{ + "id": self.payload.get('nodeid'), + "node_id": self.payload.get('nodeid'), + "node_type": "http-request", + "title": self.payload.get('title'), + "index": self.payload.get('index'), + "predecessor_node_id": self.payload.get('predecessor_node_id'), + "inputs": '', + "process_data": '', + "outputs": '', + "status": "succeeded", + "error": '', + "elapsed_time": 0.10402441816404462, + "execution_metadata": '', + "created_at": 1724398751, + "finished_at": 1724398751, + "files": [] + } + }) + return params - return { - "type": "tools", - "data": { - "toolOutput": { - "output": output, - "isError": source.is_error, - }, - "toolCall": { - "id": None, # There is no tool id in the ToolOutput - "name": source.tool_name, - "input": source.raw_input, - }, - }, - } + def get_Message_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'id':self.payload.get('message_id'), + 'answer':self.payload.get('answer') + }) + return params + + def get_MessageEnd_param(self) -> dict: + params = self.get_common_param() + params.update({ + 'id':self.payload.get('message_id'), + 'metadata':self.payload.get('metadata') + }) + return params - def to_response(self): + def to_response(self)-> dict|None: try: match self.event_type: - case "retrieve": - return self.get_retrieval_message() - case "function_call": - return self.get_tool_message() - case "agent_step": - return self.get_agent_tool_response() + case "workflow_started": + return self.get_WorkflowStart_param() + case "workflow_finished": + return self.get_WorkflowFinished_param() + case "node_started": + return self.get_NodeStart_param() + case 'node_finished': + return self.get_NodeFinished_param() + case 'message': + return self.get_Message_param() + case 'message_end': + return self.get_MessageEnd_param() case _: return None except Exception as e: logger.error(f"转换回应时间时发生错误,原因: {e}") return None -class DifyChatResponseEvent(BaseModel): - event: str - conversation_id: str - message_id: str - created_at: int = int(time.time()) - task_id: str - - def to_response(self): - return self.dict() - -class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'workflow_started' - workflow_run_id:str - data:Dict[str,Any] - def __init__(self,**args): - args['data'] = { - "id": args['workflow_run_id'], - "workflow_id": args['workflow_id'], - "sequence_number": 1709, - "inputs": { - "sys.query": args['query'], - "sys.files": [], - "sys.conversation_id": args['conversation_id'], - "sys.user_id": args['use_id'] - }, - "created_at": int(time.time()) - } - super().__init__(**args) - -class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'workflow_finished' - workflow_run_id:str - data:Dict[str,Any] - def __init__(self,**args): - args['data'] = { - "id": args['workflow_run_id'], - "workflow_id": args['workflow_id'], - "sequence_number": 1709, - "status": "succeeded", - "outputs": { - "answer": args['response'] - }, - "error": '', - "elapsed_time": 36.03764106379822, - "total_tokens": 11707, - "total_steps": 10, - "created_by": { - "id": str(uuid.uuid4()), - "user": args['use_id'] - }, - "created_at": int(time.time()), - "finished_at": int(time.time()), - "files": [] - } - super().__init__(**args) - -class Message_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'message' - id:str - answer:str - def __init__(self,**args): - args['id'] = args['message_id'] - super().__init__(**args) - -class MessageEnd_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'message_end' - id:str - metadata:Dict[str,Any] = {} - def __init__(self,**args): - args['id'] = args['message_id'] - super().__init__(**args) - -class Node_started_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'node_started' - workflow_run_id:str - data:Dict[str,Any] - def __init__(self,**args): - args['data'] = { - "id": args['nodeid'], - "node_id": args['nodeid'], - "node_type": "http-request", - "title": args['title'], - "index": args['index'], - "predecessor_node_id": args['predecessor_node_id'], - "inputs": '', - "created_at": 1724398751, - "extras": {} - } - super().__init__(**args) - -class Node_finished_DifyChatResponseEvent(DifyChatResponseEvent): - event: str = 'node_finished' - workflow_run_id:str - data:Dict[str,Any] - def __init__(self,**args): - args['data'] = { - "id": args['nodeid'], - "node_id": args['nodeid'], - "node_type": "http-request", - "title": args['title'], - "index": args['index'], - "predecessor_node_id": args['predecessor_node_id'], - "inputs": '', - "process_data": '', - "outputs": '', - "status": "succeeded", - "error": '', - "elapsed_time": 0.10402441816404462, - "execution_metadata": '', - "created_at": 1724398751, - "finished_at": 1724398751, - "files": [] - } - super().__init__(**args) - class ChatEventCallbackHandler(BaseCallbackHandler): _aqueue: asyncio.Queue is_done: bool = False @@ -239,9 +184,8 @@ class ChatEventCallbackHandler(BaseCallbackHandler): self._nodeStack:deque = deque() #添加工作流开始事件 - ids:Dict[str,Any] = self._params['ids'] data:ChatRequestData = self._params['data'] - args = ids + args:Dict[str,Any] = self._params['ids'] args.update( { 'use_id': data.user, @@ -249,7 +193,7 @@ class ChatEventCallbackHandler(BaseCallbackHandler): 'conversation_id': data.conversation_id } ) - wf_event = Workflow_started_DifyChatResponseEvent(**args) + wf_event = ChatCallbackEvent(event_type = ChatEventType.WORKFLOW_START,payload = args) if wf_event.to_response() is not None: self._aqueue.put_nowait(wf_event) @@ -264,9 +208,7 @@ class ChatEventCallbackHandler(BaseCallbackHandler): self._nodeStack.append(event_id) nindex = self._nodeStack.count() - 1 - - ids:Dict[str,Any] = self._params['ids'] - args = ids + args:Dict[str,Any] = self._params['ids'] args.update( { 'nodeid':event_id, @@ -275,7 +217,7 @@ class ChatEventCallbackHandler(BaseCallbackHandler): 'predecessor_node_id': self._nodeStack[nindex - 1] if nindex > 0 else '' } ) - nd_event = Node_started_DifyChatResponseEvent(**args) + nd_event = ChatCallbackEvent(event_type = ChatEventType.NODE_START,payload = args) if nd_event.to_response() is not None: self._aqueue.put_nowait(nd_event) @@ -302,7 +244,7 @@ class ChatEventCallbackHandler(BaseCallbackHandler): 'predecessor_node_id':self._nodeStack[nindex - 1] if nindex > 0 else '' } ) - nd_event = Node_finished_DifyChatResponseEvent(**args) + nd_event = ChatCallbackEvent(event_type = ChatEventType.NODE_FINISHED,payload = args) if nd_event.to_response() is not None: self._aqueue.put_nowait(nd_event) self._nodeStack.pop() @@ -319,22 +261,21 @@ class ChatEventCallbackHandler(BaseCallbackHandler): ) -> None: """No-op.""" logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map)) - ids:Dict[str,Any] = self._params['ids'] data:ChatRequestData = self._params['data'] - args = ids + args:Dict[str,Any] = self._params['ids'] args.update( { 'response':self._response, 'conversation_id': data.conversation_id } ) - wf_event = Workflow_finished_DifyChatResponseEvent(**args) + wf_event = ChatCallbackEvent(event_type = ChatEventType.WORKFLOW_FINISHED,payload = args) if wf_event.to_response() is not None: self._aqueue.put_nowait(wf_event) - args = ids - msgEnt_event = MessageEnd_DifyChatResponseEvent(**args) + args:Dict[str,Any] = self._params['ids'] + msgEnt_event = ChatCallbackEvent(event_type = ChatEventType.MESSAGE_END,payload = args) if msgEnt_event.to_response() is not None: self._aqueue.put_nowait(msgEnt_event) @@ -367,8 +308,8 @@ class ChatStreamResponse(StreamingResponse): 'answer':token, 'conversation_id':cls.data.conversation_id }) - event = Message_DifyChatResponseEvent(**params) - data_str = json.dumps(event.dict()) + event = ChatCallbackEvent(event_type = ChatEventType.MESSAGE,payload = params) + data_str = json.dumps(event.to_response()) return f"{cls.DATA_PREFIX}{data_str}\n\n" @classmethod diff --git a/backend/app/api/routers/request/baseConfig.py b/backend/app/api/routers/request/baseConfig.py index eb8515a..d254d8a 100644 --- a/backend/app/api/routers/request/baseConfig.py +++ b/backend/app/api/routers/request/baseConfig.py @@ -1,5 +1,7 @@ from pydantic import BaseModel import os +from enum import Enum + class BaseConfig(BaseModel): projectInfo:str = os.getenv("PROJECT_TITLE","您好,我是博微工程理解小助手,您可以问我有关[线路工程]工程数据的相关问题!") @@ -69,3 +71,10 @@ class BaseConfig(BaseModel): } +class ChatEventType(str, Enum): + WORKFLOW_START = "workflow_started" + WORKFLOW_FINISHED = "workflow_finished" + NODE_START = "node_started" + NODE_FINISHED = "node_finished" + MESSAGE = "message" + MESSAGE_END = "message_end" \ No newline at end of file