diff --git a/backend/app/api/routers/app.py b/backend/app/api/routers/app.py index cb53bd2..bca5fd5 100644 --- a/backend/app/api/routers/app.py +++ b/backend/app/api/routers/app.py @@ -3,6 +3,7 @@ import json import logging import time from typing import Dict, List, Any, Optional, AsyncGenerator +from collections import deque from aiostream import stream from fastapi import APIRouter, Request @@ -13,7 +14,8 @@ from llama_index.core.callbacks import CBEventType from llama_index.core.chat_engine.types import StreamingAgentChatResponse from llama_index.core.tools import ToolOutput from pydantic import BaseModel -from app.api.routers.request.base import userMng, conversations,message,parameter +from app.api.routers.request.base import userMng, conversations,message,parameter,feedback +from app.api.routers.request.baseConfig import * from app.api.routers.request.models import ChatRequestData,ChatFileUploadRequest from app.engine import get_chat_engine import uuid @@ -102,77 +104,6 @@ class ChatCallbackEvent(BaseModel): logger.error(f"转换回应时间时发生错误,原因: {e}") return None -class ChatEventCallbackHandler(BaseCallbackHandler): - _aqueue: asyncio.Queue - is_done: bool = False - - def __init__( - self, - ): - """Initialize the base callback handler.""" - ignored_events = [ - # CBEventType.CHUNKING, - # CBEventType.NODE_PARSING, - # CBEventType.EMBEDDING, - # CBEventType.LLM, - # CBEventType.TEMPLATING, - ] - super().__init__(ignored_events, ignored_events) - self._aqueue = asyncio.Queue() - - def on_event_start( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - **kwargs: Any, - ) -> str: - logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload)) - - event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload) - if event.to_response() is not None: - self._aqueue.put_nowait(event) - - def on_event_end( - self, - event_type: CBEventType, - payload: Optional[Dict[str, Any]] = None, - event_id: str = "", - **kwargs: Any, - ) -> None: - logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload)) - event = ChatCallbackEvent(event_id=event_id, event_type=event_type, payload=payload) - if event.to_response() is not None: - self._aqueue.put_nowait(event) - - def start_trace(self, trace_id: Optional[str] = None) -> None: - """No-op.""" - logger.info("trace_start:{}\n".format(trace_id)) - - def end_trace( - self, - trace_id: Optional[str] = None, - trace_map: Optional[Dict[str, List[str]]] = None, - ) -> None: - """No-op.""" - logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map)) - - async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]: - while not self._aqueue.empty() or not self.is_done: - try: - yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1) - except asyncio.TimeoutError: - pass - -class IDManager: - def createID(self): - return { - "message_id" : str(uuid.uuid4()), - 'task_id':str(uuid.uuid4()), - 'workflow_run_id': str(uuid.uuid4()), - "workflow_id": str(uuid.uuid4()) - } - class DifyChatResponseEvent(BaseModel): event: str conversation_id: str @@ -180,7 +111,11 @@ class DifyChatResponseEvent(BaseModel): created_at: int = int(time.time()) task_id: str + def to_response(self): + return self.dict() + class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'workflow_started' workflow_run_id:str data:Dict[str,Any] def __init__(self,**args): @@ -196,14 +131,13 @@ class Workflow_started_DifyChatResponseEvent(DifyChatResponseEvent): }, "created_at": int(time.time()) } - args['event'] = 'workflow_started' super().__init__(**args) - + class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'workflow_finished' workflow_run_id:str data:Dict[str,Any] def __init__(self,**args): - args['event'] = 'workflow_finished' args['data'] = { "id": args['workflow_run_id'], "workflow_id": args['workflow_id'], @@ -227,41 +161,219 @@ class Workflow_finished_DifyChatResponseEvent(DifyChatResponseEvent): super().__init__(**args) class Message_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'message' id:str answer:str def __init__(self,**args): args['id'] = args['message_id'] - args['event'] = 'message' super().__init__(**args) class MessageEnd_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'message_end' id:str metadata:Dict[str,Any] = {} def __init__(self,**args): args['id'] = args['message_id'] - args['event'] = 'message_end' super().__init__(**args) + +class Node_started_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'node_started' + workflow_run_id:str + data:Dict[str,Any] + def __init__(self,**args): + args['data'] = { + "id": args['nodeid'], + "node_id": args['nodeid'], + "node_type": "http-request", + "title": args['title'], + "index": args['index'], + "predecessor_node_id": args['predecessor_node_id'], + "inputs": '', + "created_at": 1724398751, + "extras": {} + } + super().__init__(**args) + +class Node_finished_DifyChatResponseEvent(DifyChatResponseEvent): + event: str = 'node_finished' + workflow_run_id:str + data:Dict[str,Any] + def __init__(self,**args): + args['data'] = { + "id": args['nodeid'], + "node_id": args['nodeid'], + "node_type": "http-request", + "title": args['title'], + "index": args['index'], + "predecessor_node_id": args['predecessor_node_id'], + "inputs": '', + "process_data": '', + "outputs": '', + "status": "succeeded", + "error": '', + "elapsed_time": 0.10402441816404462, + "execution_metadata": '', + "created_at": 1724398751, + "finished_at": 1724398751, + "files": [] + } + super().__init__(**args) + +class ChatEventCallbackHandler(BaseCallbackHandler): + _aqueue: asyncio.Queue + is_done: bool = False + + def __init__(self,**params): + """Initialize the base callback handler.""" + ignored_events = [ + # CBEventType.CHUNKING, + # CBEventType.NODE_PARSING, + # CBEventType.EMBEDDING, + # CBEventType.LLM, + # CBEventType.TEMPLATING, + ] + super().__init__(ignored_events, ignored_events) + self._aqueue = asyncio.Queue() + self._response:str = '' + self._params:Dict[str,Any] = params + self._nodeStack:deque = deque() + + #添加工作流开始事件 + ids:Dict[str,Any] = self._params['ids'] + data:ChatRequestData = self._params['data'] + args = ids + args.update( + { + 'use_id': data.user, + 'query': data.query, + 'conversation_id': data.conversation_id + } + ) + wf_event = Workflow_started_DifyChatResponseEvent(**args) + if wf_event.to_response() is not None: + self._aqueue.put_nowait(wf_event) + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> str: + logger.info("event_start:{} type:{} payload:{}\n".format(event_id, event_type, payload)) + + self._nodeStack.append(event_id) + nindex = self._nodeStack.count() - 1 + + ids:Dict[str,Any] = self._params['ids'] + args = ids + args.update( + { + 'nodeid':event_id, + 'title':event_type.name, + 'index':nindex + 1, + 'predecessor_node_id': self._nodeStack[nindex - 1] if nindex > 0 else '' + } + ) + nd_event = Node_started_DifyChatResponseEvent(**args) + if nd_event.to_response() is not None: + self._aqueue.put_nowait(nd_event) + + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + logger.info("event_end:{} type:{} payload:{}\n".format(event_id, event_type, payload)) + #self.response = payload.get("response","") + args:Dict[str,Any] = self._params['ids'] + nodeID = self._nodeStack[-1] + if nodeID == event_id: + nindex = self._nodeStack.count() - 1 + args.update( + { + 'nodeid':event_id, + 'title':event_type.name, + 'index':nindex + 1, + 'predecessor_node_id':self._nodeStack[nindex - 1] if nindex > 0 else '' + } + ) + nd_event = Node_finished_DifyChatResponseEvent(**args) + if nd_event.to_response() is not None: + self._aqueue.put_nowait(nd_event) + self._nodeStack.pop() + + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """No-op.""" + logger.info("trace_start:{}\n".format(trace_id)) + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, List[str]]] = None, + ) -> None: + """No-op.""" + logger.info("trace_end:{} trace_map:{}\n".format(trace_id, trace_map)) + ids:Dict[str,Any] = self._params['ids'] + data:ChatRequestData = self._params['data'] + args = ids + args.update( + { + 'response':self._response, + 'conversation_id': data.conversation_id + } + ) + wf_event = Workflow_finished_DifyChatResponseEvent(**args) + if wf_event.to_response() is not None: + self._aqueue.put_nowait(wf_event) + + + args = ids + msgEnt_event = MessageEnd_DifyChatResponseEvent(**args) + if msgEnt_event.to_response() is not None: + self._aqueue.put_nowait(msgEnt_event) + + async def async_event_gen(self) -> AsyncGenerator[ChatCallbackEvent, None]: + while not self._aqueue.empty() or not self.is_done: + try: + yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1) + except asyncio.TimeoutError: + pass + +class IDManager: + def createID(self): + return { + "message_id" : str(uuid.uuid4()), + 'task_id':str(uuid.uuid4()), + 'workflow_run_id': str(uuid.uuid4()), + "workflow_id": str(uuid.uuid4()) + } + class ChatStreamResponse(StreamingResponse): TEXT_PREFIX = "data: " DATA_PREFIX = "data: " + ids:Dict[str,Any] = {} + data:ChatRequestData = None @classmethod - def convert_text(cls, token: str): - # Escape newlines and double quotes to avoid breaking the stream - #token = json.dumps(token) - - #return f"data: {{"event": "message", "conversation_id": "80d85523-de92-4b9d-aca0-c48a5eacb068", "message_id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "created_at": 1724406492, "task_id": "802f3064-030d-42ac-a882-0e1293712d04", "id": "16a06b1b-a89b-49c0-bc15-123bd999f6d6", "answer": "{token}"}}" - return "\n" - - @classmethod - def convert_data(cls, data: dict): - data_str = json.dumps(data) + def convert_Message(cls, token: str): + params = cls.ids + params.update({ + 'answer':token, + 'conversation_id':cls.data.conversation_id + }) + event = Message_DifyChatResponseEvent(**params) + data_str = json.dumps(event.dict()) return f"{cls.DATA_PREFIX}{data_str}\n\n" @classmethod - def convert_event(cls, event: DifyChatResponseEvent): - data_str = json.dumps(event.dict()) + def convert_Event(cls, data: dict): + data_str = json.dumps(data) return f"{cls.DATA_PREFIX}{data_str}\n\n" def __init__( @@ -269,8 +381,11 @@ class ChatStreamResponse(StreamingResponse): request: Request, event_handler: ChatEventCallbackHandler, response: StreamingAgentChatResponse, - data: ChatRequestData + data: ChatRequestData, + ids:Dict[str,Any] ): + ChatStreamResponse.ids = ids + ChatStreamResponse.data = data content = ChatStreamResponse.content_generator( request, event_handler, response, data ) @@ -284,41 +399,26 @@ class ChatStreamResponse(StreamingResponse): response: StreamingAgentChatResponse, data: ChatRequestData ): - ids = IDManager().createID() + # Yield the text response async def _chat_response_generator(): final_response = "" async for token in response.async_response_gen(): final_response += token - args = ids - args['answer'] = token - args['conversation_id'] = data.conversation_id - event = Message_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(event) - #yield ChatStreamResponse.convert_text(token) + yield ChatStreamResponse.convert_Message(token) # 存储消息历史 message().add(user_id=data.user,conversation_id=data.conversation_id,query=data.query,answer=final_response) # the text_generator is the leading stream, once it's finished, also finish the event stream event_handler.is_done = True - # 发送工作流结束事件 - args = ids - args['response'] = final_response - args['conversation_id'] = data.conversation_id - wf_event = Workflow_finished_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(wf_event) - - msgEnt_event = MessageEnd_DifyChatResponseEvent(**ids) - yield ChatStreamResponse.convert_event(msgEnt_event) - - + # Yield the events from the event handler async def _event_generator(): async for event in event_handler.async_event_gen(): event_response = event.to_response() if event_response is not None: - yield ChatStreamResponse.convert_text("") + yield ChatStreamResponse.convert_Event(event_response) combine = stream.merge(_chat_response_generator(), _event_generator()) is_stream_started = False @@ -327,25 +427,11 @@ class ChatStreamResponse(StreamingResponse): if not is_stream_started: is_stream_started = True - # 发送工作流开始事件 - args = ids - args['use_id'] = data.user - args['query'] = data.query - args['conversation_id'] = data.conversation_id - wf_event = Workflow_started_DifyChatResponseEvent(**args) - yield ChatStreamResponse.convert_event(wf_event) - - # Stream a blank message to start the stream - # 发送一个空消息事件 - #yield ChatStreamResponse.convert_text("") - yield output if await request.is_disconnected(): break - - @v.post("/chat-messages") async def post_conversations(request: Request, data: ChatRequestData): userMng.findNoExistCreate(data.user) @@ -365,14 +451,15 @@ async def post_conversations(request: Request, data: ChatRequestData): chat_engine = get_chat_engine(filters=filters, params=params) # 启动聊天事件监听 - event_handler = ChatEventCallbackHandler() + ids = IDManager().createID() + event_handler = ChatEventCallbackHandler(ids = ids,data = data) chat_engine.callback_manager.handlers.append(event_handler) # type: ignore # 执行异步聊天 response = await chat_engine.astream_chat(data.query) # 返回异步消息回应 - return ChatStreamResponse(request, event_handler, response, data) + return ChatStreamResponse(request, event_handler, response, data,ids) @v.get("/messages") async def query_messages(user:str, conversation_id:str): @@ -388,8 +475,9 @@ async def query_messages(user:str, conversation_id:str): for record in records: res = record.dict() + feeds = feedback().query(res['id']) res["message_files"] = [] - res["feedback"] = '' + res["feedback"] = {'rating':feeds['rating'] } if feeds != None else '' res["retriever_resources"] = [] res["created_at"] = 1723444905 res["agent_thoughts"] = [] @@ -440,48 +528,22 @@ async def query_conversations(user:str, first_id:str = None, limit:str = None, p async def query_parameters(user:str): params = parameter().get(user) if len(params) == 0: - params = { - "opening_statement": "您好,我是配网D3造价软件小助手,您可以问我有关配网造价软件的相关问题!", - "suggested_questions": [], - "suggested_questions_after_answer": { - "enabled": False - }, - "speech_to_text": { - "enabled": False - }, - "text_to_speech": { - "enabled": False, - "language": "", - "voice": "" - }, - "retriever_resource": { - "enabled": True - }, - "annotation_reply": { - "enabled": False - }, - "more_like_this": { - "enabled": False - }, - "user_input_form": [], - "sensitive_word_avoidance": { - "enabled": False - }, - "file_upload": { - "image": { - "enabled": False, - "number_limits": 3, - "transfer_methods": [ - "remote_url" - ] - } - }, - "system_parameters": { - "image_file_size_limit": "10" - } - } + params = BaseConfig().ParamterCfg() return params +@v.post("/messages/{message_id}/feedbacks") +async def post_feedbacks(request: Request,message_id:str,params:Dict[str,Any]): + if params['rating'] =='null': + feedback().delete(message_id) + else: + condition = {'id':message_id} + results = message().query(**condition) + if len(results) > 0: + result = results[0] + feedback().add(message_id=message_id,query=result['query'], + answer=result['answer'],rating=params['rating']) + @r.post("") def upload_file(request: ChatFileUploadRequest) -> List[str]: - pass \ No newline at end of file + pass + diff --git a/backend/app/api/routers/request/base.py b/backend/app/api/routers/request/base.py index bb90305..234323b 100644 --- a/backend/app/api/routers/request/base.py +++ b/backend/app/api/routers/request/base.py @@ -25,7 +25,7 @@ class conversations: return None def add(self,id:str, user_id:str, name:str): - template = BaseConfig.ConversationCfg + template = BaseConfig().ConversationCfg() template['id'] = id template['user_id'] = user_id template['name'] = name @@ -111,7 +111,7 @@ class message: return datas def add(self,user_id:str,conversation_id:str,query:str,answer:str): - template = BaseConfig.MessageCfg + template = BaseConfig.MessageCfg() template['id'] = str(uuid.uuid4()) template['user_id'] = user_id template['conversation_id'] = conversation_id @@ -122,4 +122,34 @@ class message: def delete(self,user_id:str): dbManage.delete(self._tableName,user_id = user_id) + def query(self,**condition): + results = [] + records = dbManage.query(self._tableName,**condition) + for record in records: + results.append(record.dict()) + return results +class feedback: + def __init__(self) -> None: + self._tableName = 'feedbacks' + dbManage.createTable(self._tableName) + + def add(self,message_id:str,query:str,answer:str,rating:str): + record = { + 'message_id': message_id, + 'query': query, + 'answer': answer, + 'rating': rating, + } + dbManage.addRecord(self._tableName,record) + + def delete(self,message_id:str): + cond = {'message_id':message_id} + dbManage.delete(self._tableName,**cond) + + def query(self,message_id:str): + cond = {'message_id':message_id} + records = dbManage.query(self._tableName,**cond) + if len(records) > 0: + return records[0].dict() + return None \ No newline at end of file diff --git a/backend/app/api/routers/request/baseConfig.py b/backend/app/api/routers/request/baseConfig.py index 7dce858..eb8515a 100644 --- a/backend/app/api/routers/request/baseConfig.py +++ b/backend/app/api/routers/request/baseConfig.py @@ -1,62 +1,71 @@ +from pydantic import BaseModel +import os +class BaseConfig(BaseModel): + projectInfo:str = os.getenv("PROJECT_TITLE","您好,我是博微工程理解小助手,您可以问我有关[线路工程]工程数据的相关问题!") -class BaseConfig: - ParamterCfg = { - "opening_statement": "您好,我是配网D3造价软件小助手,您可以问我有关配网造价软件的相关问题!", - "suggested_questions": [], - "suggested_questions_after_answer": { - "enabled": False - }, - "speech_to_text": { - "enabled": False - }, - "text_to_speech": { - "enabled": False, - "language": "", - "voice": "" - }, - "retriever_resource": { - "enabled": True - }, - "annotation_reply": { - "enabled": False - }, - "more_like_this": { - "enabled": False - }, - "user_input_form": [], - "sensitive_word_avoidance": { - "enabled": False - }, - "file_upload": { - "image": { + def ParamterCfg(self): + questions = os.getenv("CONVERSATION_STARTERS", "dev") + return{ + "opening_statement": self.projectInfo, + "suggested_questions": questions.split('\n'), + "suggested_questions_after_answer": { + "enabled": False + }, + "speech_to_text": { + "enabled": False + }, + "text_to_speech": { "enabled": False, - "number_limits": 3, - "transfer_methods": [ - "remote_url" - ] + "language": "", + "voice": "" + }, + "retriever_resource": { + "enabled": True + }, + "annotation_reply": { + "enabled": False + }, + "more_like_this": { + "enabled": False + }, + "user_input_form": [], + "sensitive_word_avoidance": { + "enabled": False + }, + "file_upload": { + "image": { + "enabled": False, + "number_limits": 3, + "transfer_methods": [ + "remote_url" + ] + } + }, + "system_parameters": { + "image_file_size_limit": "10" } - }, - "system_parameters": { - "image_file_size_limit": "10" } - } + + def ConversationCfg(self): + return{ + "id": "", + 'user_id':'', + "name": "", + "inputs": {}, + "status": "normal", + "introduction": self.projectInfo, + "created_at":'' + } - ConversationCfg = { - "id": "", - 'user_id':'', - "name": "", - "inputs": {}, - "status": "normal", - "introduction": ParamterCfg['opening_statement'], - "created_at":'' - } - - - MessageCfg = { + @classmethod + def MessageCfg(cls): + return { "id": "", 'user_id':'', "conversation_id": "", "inputs": {}, "query": "", "answer": "" - } \ No newline at end of file + } + + diff --git a/backend/app/api/routers/request/dbOrm.py b/backend/app/api/routers/request/dbOrm.py index 796b90c..38af99d 100644 --- a/backend/app/api/routers/request/dbOrm.py +++ b/backend/app/api/routers/request/dbOrm.py @@ -2,7 +2,7 @@ import os from typing import Dict, List, Any from pydantic import BaseModel -from sqlalchemy import create_engine, Column, String, Integer, JSON +from sqlalchemy import create_engine, Column, String, Integer, JSON,Float from sqlalchemy.engine.reflection import Inspector from sqlalchemy.orm import sessionmaker, declarative_base @@ -24,10 +24,6 @@ class ConversationOrm(Base): if 'name' in data: self.name = data['name'] - - - - class UserOrm(Base): __tablename__ = "user" @@ -51,6 +47,14 @@ class MessagesOrm(Base): query = Column(String) answer = Column(String) +class FeedBackOrm(Base): + __tablename__ = "feedbacks" + + message_id = Column(String,primary_key=True) + query = Column(String) + answer = Column(String) + rating = Column(String) + #数据结构 class ConversationModel(BaseModel): id: str @@ -61,7 +65,6 @@ class ConversationModel(BaseModel): created_at: int class Config: - #orm_mode = True from_attributes=True @classmethod @@ -73,7 +76,6 @@ class UserModel(BaseModel): createtime: str class Config: - #orm_mode = True from_attributes=True @classmethod @@ -86,7 +88,6 @@ class ParametersModel(BaseModel): value : Dict[str, Any] class Config: - #orm_mode = True from_attributes=True @classmethod @@ -101,13 +102,25 @@ class MessagesModel(BaseModel): answer : str class Config: - #orm_mode = True from_attributes=True @classmethod def orm(cls): return MessagesOrm +class FeedBackModel(BaseModel): + message_id :str + query :str + answer :str + rating :str + + class Config: + from_attributes=True + + @classmethod + def orm(cls): + return FeedBackOrm + class DBManager: def __init__(self) -> None: DATABASE_URL = os.getenv("SQLITE_DATABASE_URL") diff --git a/backend/app/api/routers/request/models.py b/backend/app/api/routers/request/models.py index d76af75..c06204a 100644 --- a/backend/app/api/routers/request/models.py +++ b/backend/app/api/routers/request/models.py @@ -1,7 +1,7 @@ from typing import Dict, Any from pydantic import BaseModel - +from typing import Optional class ChatRequestData(BaseModel): inputs: Dict[str,Any] @@ -12,4 +12,6 @@ class ChatRequestData(BaseModel): conversation_id: str = None class ChatFileUploadRequest(BaseModel): - base64: str \ No newline at end of file + base64: str + +