diff --git a/agentic_rag.py b/agentic_rag.py
index a5a9987..908e87d 100644
--- a/agentic_rag.py
+++ b/agentic_rag.py
@@ -28,17 +28,26 @@
 查看README了解如何运行应用程序。
 """
 
+import json
 from pathlib import Path
+from textwrap import dedent
 
 from agno.document.chunking.document import DocumentChunking
+from agno.memory.workflow import WorkflowMemory
 from agno.models.deepseek import DeepSeek
+from agno.models.message import Message
+from agno.run.response import RunResponse
+from agno.storage.base import Storage
+from agno.storage.sqlite import SqliteStorage
+from agno.utils.log import logger
+from agno.workflow import Workflow
 
 from dotenv import load_dotenv
 # 加载.env文件
 load_dotenv()
 
 import os
-from typing import Optional
+from typing import Optional, Iterator, Dict, Any, Union, List
 
 from agno.agent import Agent, AgentMemory
 from agno.embedder.openai import OpenAIEmbedder
@@ -126,7 +135,7 @@ def initialize_memory(model) -> AgentMemory:
         create_user_memories=True,  # 存储用户偏好
         #create_session_summary=True,  # 存储对话摘要
     )
-    
+
 def initialize_vector_db() -> LanceDb:
     """初始化并返回配置好的LanceDb实例"""
     return LanceDb(
@@ -141,7 +150,7 @@ def initialize_mingci_vector_db() -> LanceDb:
     return LanceDb(
         table_name="mingci",
         uri=os.getenv("MINGCI_VECTOR_DB_PATH", "tmp/mingcidb"),
-        search_type=SearchType.hybrid,
+        search_type=SearchType.keyword,
         embedder=OpenAIEmbedder(id=embedding_model, base_url=embedding_baseUrl, api_key=api_key)
     )
 
@@ -171,6 +180,333 @@ def initialize_mingci_knowledge_base() -> AgentKnowledge:
         reader=TextReader(),  # 默认文本读取器
     )
 
+def get_question_agent(
+    model_id: str = "openai:gpt-4o",
+    user_id: Optional[str] = None,
+    session_id: Optional[str] = None,
+    debug_mode: bool = True,
+) -> Agent:
+    """Build the question-understanding agent (first stage of the QA workflow).
+
+    The agent is prompted to rewrite a colloquial or abbreviated user question
+    into a complete professional question plus a structured JSON query.
+
+    Args:
+        model_id: Model spec in "provider:model_name" form.
+        user_id: Optional user ID forwarded to the agent.
+        session_id: Optional session ID forwarded to the agent.
+        debug_mode: Forwarded to the agent's ``debug_mode`` setting.
+
+    Returns:
+        The configured Agent.
+
+    Raises:
+        ValueError: If ``model_id`` contains no ":" separator.
+    """
+    # Split on the first ":" only, so model names that contain ":" stay intact.
+    provider, model_name = model_id.split(":", 1)
+    model = get_model_by_provider(provider, model_name)
+
+    # Initialize the memory system.
+    memory = initialize_memory(model)
+
+    # NOTE(review): the prompt below asks the model to look up domain terms;
+    # confirm this should not be initialize_mingci_knowledge_base() instead.
+    knowledge_base = initialize_knowledge_base()
+
+    description = """
+    你是一个智能助手,专门为电力造价软件[博微配网计价通D3软件]提供使用支持。你的任务是与理解用户的问题,
+    将用户口语化、简化、省略化的电力造价的专业问题改写成对电力造价软件[博微配网计价通D3软件]的书面化、完整的电力专业问题的问句。
+    """
+
+    instructions = """
+    1. 提取关键词
+    请提取问题中的关键词,需要中英文均有,可以适量补充不在问题中但相关的关键词。
+    关键词尽量切分为动词、名词、或形容词等单独的词,不要长词组(目的是更好的匹配检索到语义相关但表述不同的相关资料)。
+    关键词间以空格分割,比如: "关键词1" "关键词2" "keyword3" "keyword4"
+    2.搜索知识库中的电力造价专业专有名词
+    必须始终使用工具 search_knowledge_base 来搜索出所有和关键词相关的电力造价专业及软件业务对象、业务属性的专有名词。
+    3. 改写用户问题
+    请结合知识库中返回的电力造价专业专有名词改写用户的输入问题,将问题中的简写、缩写、口语转化成专业词汇后,电力造价专业描述方式输出完整清晰的问句。
+    4. 结构化问题
+    解析改写后的用户问题,判断用户咨询问题的类型(功能入口、操作步骤、错误处理等)
+    然后将用户问题转化为JSON格式的标准查询
+    例如:"如何设置取费费率" 转化后:
+    {
+    "问题类型":"操作步骤",
+    "操作方法":"设置",
+    "业务对象":"取费",
+    "业务属性":"费率"
+    }
+    "问题类型"、"操作方法"、"业务对象"、"业务属性"如果在用户问题中识别不出来则为空。
+    """
+
+    # Create the agent.
+    rag_agent: Agent = Agent(
+        name="博微用户问题理解助手",
+        session_id=session_id,  # track the session ID for persistent conversations
+        user_id=user_id,
+        model=model,
+        # Persist session data.
+        storage=JsonStorage(
+            dir_path=os.getenv("SESSION_STORAGE_PATH", "tmp/question_agent_sessions_json")
+        ),
+        memory=memory,  # give the agent memory
+        knowledge=knowledge_base,  # attach the knowledge base
+        description=description,
+        instructions=instructions,
+        search_knowledge=True,  # gives the model a tool to search the knowledge base
+        markdown=True,  # tells the model to format messages as markdown
+        show_tool_calls=True,
+        add_datetime_to_instructions=True,
+        debug_mode=debug_mode,
+        read_tool_call_history=True,
+        num_history_responses=3,
+        save_response_to_file=str(tmp.joinpath("msg/question_{message}_{run_id}.md")),
+    )
+
+    return rag_agent
+
+def get_answer_agent(
+    model_id: str = "openai:gpt-4o",
+    user_id: Optional[str] = None,
+    session_id: Optional[str] = None,
+    debug_mode: bool = True,
+) -> Agent:
+    """Build the answering agent (second stage of the QA workflow).
+
+    The agent is prompted to answer usage questions about the software from
+    its knowledge base. Agent memory is currently disabled for this agent.
+
+    Args:
+        model_id: Model spec in "provider:model_name" form.
+        user_id: Optional user ID forwarded to the agent.
+        session_id: Optional session ID forwarded to the agent.
+        debug_mode: Forwarded to the agent's ``debug_mode`` setting.
+
+    Returns:
+        The configured Agent.
+
+    Raises:
+        ValueError: If ``model_id`` contains no ":" separator.
+    """
+    # Split on the first ":" only, so model names that contain ":" stay intact.
+    provider, model_name = model_id.split(":", 1)
+    model = get_model_by_provider(provider, model_name)
+
+    # Memory is disabled for this agent for now.
+    # memory = initialize_memory(model)
+
+    # NOTE(review): confirm this "mingci" knowledge base is the one the answering
+    # prompt should search, rather than initialize_knowledge_base().
+    knowledge_base = initialize_mingci_knowledge_base()
+
+    description = """
+    你是一个智能助手,专门为[博微配网计价通D3软件]提供使用支持。你的任务是帮助用户理解和使用这个复杂的配电网工程造价软件系统。
+    软件特点
+    1.多页面架构:软件由多个功能页面组成
+    2.复杂控件布局:每个页面包含多种控件(如列表控件、TAB控件、按钮等)
+    3.业务对象丰富:涉及"取费表"、"项目划分"、"工程量"等多种业务对象
+    4.操作多样:支持"添加"、"修改"、"删除"、"导入"、"导出"等多种操作
+    """
+
+    # NOTE(review): "{sofeware_work_context}" below is passed on verbatim; nothing
+    # in this function substitutes it. Confirm whether it should be filled in.
+    instructions = """
+    1. 理解用户问题
+    用户正在使用软件过程中遇到问题,向您请求帮助
+    用户所处环境如下:
+    {sofeware_work_context}
+    只从用户问题识别中提到的业务对象(如"如何设置取费费率"→"取费表")
+    只从用户问题识别业务对象的属性字段(如"如何设置取费费率"→"费率")
+    只从用户问题识别用户想要执行的操作(如"如何设置取费费率"→"设置")
+    判断问题类型(功能入口、操作步骤、错误处理等)
+    2. 改写问题
+    将用户问题改写为包含以下要素的标准查询:
+    [问题类型] : [操作类型] + [业务对象] + [属性]
+    [属性]只有用户问题中明确包含才改写,否则为未知。
+    [问题类型]、[操作类型]、[业务对象]为必须输入,如果缺少任一个都需追问用户补全才能进入下一步。
+    示例:
+    原始问题:"如何设置取费费率?"
+    改写后:"操作步骤 : 设置 - 取费 - 费率"
+    3.搜索知识库
+    必须始终使用工具 search_knowledge_base 来搜索知识库
+    在回应前彻底分析所有返回的文档
+    如果返回多个文档,需连贯地综合信息
+    4. 上下文管理:
+    使用工具 get_chat_history 保持对话连续性
+    相关时引用之前的交互
+    记录用户偏好和之前的澄清
+    5. 结果呈现要求
+    以 markdown 格式输出,注意换行和排版
+    避免使用'根据我的知识'或'取决于信息'等模糊表述
+    6. 特殊情况处理
+    如果问题不明确,可以反问请求澄清
+    如果知识库搜索无结果,则直接明确回复不知道
+    对于错误提示,先解释含义再直接回复无法解决
+    """
+
+    # Create the agent.
+    rag_agent: Agent = Agent(
+        name="博微软件AI助手",
+        session_id=session_id,  # track the session ID for persistent conversations
+        user_id=user_id,
+        model=model,
+        # Persist session data.
+        storage=JsonStorage(
+            dir_path=os.getenv("SESSION_STORAGE_PATH", "tmp/answer_agent_sessions_json")
+        ),
+        # memory=memory,  # give the agent memory
+        knowledge=knowledge_base,  # attach the knowledge base
+        description=description,
+        instructions=instructions,
+        search_knowledge=True,  # gives the model a tool to search the knowledge base
+        # read_chat_history=True,  # gives the model a tool to read the chat history
+        # tools=[DuckDuckGoTools()],
+        markdown=True,  # tells the model to format messages as markdown
+        # add_chat_history_to_messages=True,
+        show_tool_calls=True,
+        add_history_to_messages=True,  # add the chat history to the messages
+        add_datetime_to_instructions=True,
+        add_name_to_instructions=True,
+        debug_mode=debug_mode,
+        read_tool_call_history=True,
+        num_history_responses=3,
+        save_response_to_file=str(tmp.joinpath("msg/answer_{message}_{run_id}.md")),
+    )
+
+    return rag_agent
+
+def _failed_stage_response(stage: str, response: Optional[RunResponse]) -> RunResponse:
+    """Build the error RunResponse yielded when a workflow stage produced no content."""
+    # ``response`` may be None, so read its fields defensively.
+    error = getattr(response, "error", None)
+    # NOTE(review): assumes RunResponse accepts an ``error`` keyword — confirm
+    # against the installed agno version.
+    return RunResponse(
+        content=f"对不起, {stage} 发生错误: {str(error)}",
+        error=error,
+        tools=getattr(response, "tools", None),
+    )
+
+class QuestionAndAnswerGenerator(Workflow):
+    """Two-stage workflow: the question agent rewrites the user's question, then
+    the answer agent answers the rewritten question."""
+
+    description: str = dedent("""\
+        A two-stage question-answering workflow for the 博微配网计价通D3 software.
+        The question agent rewrites the user's question into a complete, structured
+        query, and the answer agent answers that query from its knowledge base.
+        """)
+    # Stage agents; both are assigned by get_workflow() after construction.
+    question: Agent
+    answer: Agent
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        workflow_id: Optional[str] = None,
+        description: Optional[str] = None,
+        user_id: Optional[str] = None,
+        session_id: Optional[str] = None,
+        session_name: Optional[str] = None,
+        session_state: Optional[Dict[str, Any]] = None,
+        memory: Optional[WorkflowMemory] = None,
+        storage: Optional[Storage] = None,
+        extra_data: Optional[Dict[str, Any]] = None,
+        debug_mode: bool = False,
+        monitoring: bool = False,
+        telemetry: bool = True,
+    ):
+        """Forward every keyword argument unchanged to ``Workflow.__init__``."""
+        super().__init__(
+            name=name,
+            workflow_id=workflow_id,
+            description=description,
+            user_id=user_id,
+            session_id=session_id,
+            session_name=session_name,
+            session_state=session_state,
+            memory=memory,
+            storage=storage,
+            extra_data=extra_data,
+            debug_mode=debug_mode,
+            monitoring=monitoring,
+            telemetry=telemetry,
+        )
+
+    def run(
+        self,
+        message: Optional[Union[str, List, Dict, Message]] = None,
+        stream: bool = False,
+    ) -> Iterator[RunResponse]:
+        """Run the question agent, then feed its output to the answer agent.
+
+        Args:
+            message: The user's question.
+            stream: Whether to stream the answer agent's response. The question
+                agent never streams because its full content is the next input.
+
+        Yields:
+            The answer agent's response(s), or a single error response when a
+            stage returns no content.
+        """
+        logger.info(f"Running the question/answer workflow for: {message}")
+
+        question_response: RunResponse = self.question.run(message, stream=False)
+        if question_response is None or question_response.content is None:
+            yield _failed_stage_response("question", question_response)
+            return
+
+        if stream:
+            # Streaming yields the answer agent's chunks as they arrive.
+            yield from self.answer.run(question_response.content, stream=True)
+            return
+
+        answer_response: RunResponse = self.answer.run(
+            question_response.content, stream=False
+        )
+        if answer_response is None or answer_response.content is None:
+            yield _failed_stage_response("answer", answer_response)
+            return
+
+        yield answer_response
+
+def get_workflow(
+    model_id: str = "openai:gpt-4o",
+    user_id: Optional[str] = None,
+    session_id: Optional[str] = None,
+    debug_mode: bool = True,
+) -> QuestionAndAnswerGenerator:
+    """Create the QA workflow and attach its question and answer agents.
+
+    Args:
+        model_id: Model spec in "provider:model_name" form, used for both agents.
+        user_id: Optional user ID given to the workflow and both agents.
+        session_id: Optional session ID given to the workflow and both agents.
+        debug_mode: Debug flag given to the workflow and both agents.
+
+    Returns:
+        The workflow with its ``question`` and ``answer`` agents assigned.
+    """
+    qa_workflow = QuestionAndAnswerGenerator(
+        user_id=user_id,
+        session_id=session_id,
+        # Workflow storage is currently disabled:
+        # storage=SqliteStorage(
+        #     table_name="investment_report_workflows",
+        #     db_file="tmp/agno_workflows.db",
+        #     auto_upgrade_schema=True,
+        # ),
+        memory=WorkflowMemory(),
+        debug_mode=debug_mode,
+    )
+    qa_workflow.question = get_question_agent(model_id, user_id, session_id, debug_mode)
+    qa_workflow.answer = get_answer_agent(model_id, user_id, session_id, debug_mode)
+
+    return qa_workflow
+
 def get_agentic_rag_agent(
     model_id: str = "openai:gpt-4o",
     user_id: Optional[str] = None,
diff --git a/app.py b/app.py
index a79c58f..671168e 100644
--- a/app.py
+++ b/app.py
@@ -1,10 +1,12 @@
+from typing import Optional
+
 from dotenv import load_dotenv
 # 加载.env文件
 load_dotenv()
 
 import threading
 import nest_asyncio
-from agentic_rag import get_agentic_rag_agent
+from agentic_rag import get_agentic_rag_agent, get_workflow
 from agno.utils.log import logger
 from ui import (
     initialize_ui,
@@ -14,7 +16,6 @@ from ui import (
 )
 from utils import (
     add_message,
-    session_selector_widget,
 )
 import streamlit as st
 from extra_streamlit_components import CookieManager
@@ -24,9 +25,13 @@ nest_asyncio.apply()
 
 lock = threading.Lock()
 
-def initialize_agent(model_id: str):
+def initialize_agent(model_id: str, session_id: Optional[str] = None):
     """Initialize or retrieve the Agentic RAG."""
     lock.acquire()
     try:
+        # Resolved inside the try so a failure here takes the same path as creation.
+        if session_id is None:
+            session_id = st.session_state.get("agentic_rag_agent_session_id")
+
         if (
             not "agentic_rag_agent" in st.session_state
@@ -34,9 +39,13 @@ def initialize_agent(model_id: str):
             or st.session_state.get("current_model") != model_id
         ):
             logger.info(f"---*--- Creating {model_id} Agent ---*---")
-            agent = get_agentic_rag_agent(
+            #agent = get_agentic_rag_agent(
+            #    model_id=model_id,
+            #    session_id=session_id,
+            #)
+            agent = get_workflow(
                 model_id=model_id,
-                session_id=st.session_state.get("agentic_rag_agent_session_id"),
+                session_id=session_id,
             )
             st.session_state["agentic_rag_agent"] = agent
             st.session_state["current_model"] = model_id
@@ -127,7 +136,7 @@ def main():
             response = ""
             try:
                 # Run the agent and stream the response
-                run_response = agentic_rag_agent.run(question, stream=True)
+                run_response = agentic_rag_agent.run(message=question, stream=False)
                 for _resp_chunk in run_response:
                     # Display tool calls if available
                     #if _resp_chunk.tools and len(_resp_chunk.tools) > 0:
@@ -148,7 +157,47 @@ def main():
     ####################################################################
     # Session selector
     ####################################################################
-    session_selector_widget(agentic_rag_agent, model_id)
+    if agentic_rag_agent.storage:
+        agent_sessions = agentic_rag_agent.storage.get_all_sessions()
+        # Get session names if available, otherwise use IDs
+        session_options = []
+        for session in agent_sessions:
+            session_id = session.session_id
+            session_name = (
+                session.session_data.get("session_name", None)
+                if session.session_data
+                else None
+            )
+            display_name = session_name if session_name else session_id
+            session_options.append({"id": session_id, "display": display_name})
+
+        # Display session selector
+        # selected_session = st.sidebar.selectbox(
+        #     "会话",
+        #     options=[s["display"] for s in session_options],
+        #     key="session_selector",
+        # )
+        # Find the selected session ID
+        # selected_session_id = next(
+        #     s["id"] for s in session_options if s["display"] == selected_session
+        # )
+        # With the selector disabled, fall back to the first stored session;
+        # None (no sessions) keeps the check below from hitting an unbound name.
+        selected_session_id = session_options[0]["id"] if session_options else None
+
+        if (
+            selected_session_id is not None
+            and "agentic_rag_agent_session_id" in st.session_state
+            and st.session_state["agentic_rag_agent_session_id"] != selected_session_id
+        ):
+            logger.info(
+                f"---*--- Loading {model_id} run: {selected_session_id} ---*---"
+            )
+            st.session_state["agentic_rag_agent"] = initialize_agent(
+                model_id=model_id,
+                session_id=selected_session_id,
+            )
+            st.rerun()
     #rename_session_widget(agentic_rag_agent)
 
     ####################################################################
diff --git a/utils.py b/utils.py
index 58ca9bb..8b86c1c 100644
--- a/utils.py
+++ b/utils.py
@@ -1,4 +1,5 @@
 from dotenv import load_dotenv
+
 # 加载.env文件
 load_dotenv()
 
@@ -108,48 +109,6 @@ def rename_session_widget(agent: Agent) -> None:
             st.session_state.session_edit_mode = False
             st.rerun()
 
-
-def session_selector_widget(agent: Agent, model_id: str) -> None:
-    """Display a session selector in the sidebar"""
-
-    if agent.storage:
-        agent_sessions = agent.storage.get_all_sessions()
-        # Get session names if available, otherwise use IDs
-        session_options = []
-        for session in agent_sessions:
-            session_id = session.session_id
-            session_name = (
-                session.session_data.get("session_name", None)
-                if session.session_data
-                else None
-            )
-            display_name = session_name if session_name else session_id
-            session_options.append({"id": session_id, "display": display_name})
-
-        # Display session selector
-        #selected_session = st.sidebar.selectbox(
-        #    "会话",
-        #    options=[s["display"] for s in session_options],
-        #    key="session_selector",
-        #)
-        # Find the selected session ID
-        #selected_session_id = next(
-        #    s["id"] for s in session_options if s["display"] == selected_session
-        #)
-        if len(session_options) > 0:
-            selected_session_id = session_options[0]["id"]
-
-        if st.session_state["agentic_rag_agent_session_id"] != selected_session_id:
-            logger.info(
-                f"---*--- Loading {model_id} run: {selected_session_id} ---*---"
-            )
-            st.session_state["agentic_rag_agent"] = get_agentic_rag_agent(
-                model_id=model_id,
-                session_id=selected_session_id,
-            )
-            st.rerun()
-
-
 def about_widget() -> None:
     """Display an about section in the sidebar"""
     st.sidebar.markdown("---")