"""🤖 Agentic RAG Agent - 您的AI知识助手! 这个高级示例展示了如何构建一个复杂的RAG(检索增强生成)系统, 利用向量搜索和LLMs从任何知识库中提供深入见解。 该代理可以: - 处理和理解来自多个来源的文档(PDF、网站、文本文件) - 使用向量嵌入构建可搜索的知识库 - 跨会话维护对话上下文和记忆 - 为其响应提供相关引用和来源 - 生成摘要并提取关键见解 - 回答后续问题和澄清 可以尝试的示例查询: - "本文档的关键要点是什么?" - "你能总结主要论点和支持证据吗?" - "有哪些重要地统计数据和发现?" - "这与[主题X]有什么关系?" - "这个分析有哪些局限性或空白?" - "你能更详细地解释[概念X]吗?" - "其他来源支持或反驳这些主张吗?" 该代理使用: - 向量相似性搜索进行相关文档检索 - 对话记忆用于上下文响应 - 引用跟踪用于来源归属 - 动态知识库更新 查看README了解如何运行应用程序。 """ import json from pathlib import Path from textwrap import dedent from agno.document.chunking.document import DocumentChunking from agno.memory.workflow import WorkflowMemory from agno.models.deepseek import DeepSeek from agno.models.message import Message from agno.run.response import RunResponse from agno.storage.base import Storage from agno.storage.sqlite import SqliteStorage from agno.utils.log import logger from agno.workflow import Workflow from dotenv import load_dotenv # 加载.env文件 load_dotenv() import os from typing import Optional, Iterator, Dict, Any, Union, List from agno.agent import Agent, AgentMemory from agno.embedder.openai import OpenAIEmbedder from agno.knowledge import AgentKnowledge from agno.memory.classifier import MemoryClassifier from agno.memory.db.sqlite import SqliteMemoryDb from agno.memory.manager import MemoryManager from agno.memory.summarizer import MemorySummarizer from agno.models.openai import OpenAIChat from agno.storage.json import JsonStorage from agno.vectordb.lancedb import LanceDb from agno.vectordb.search import SearchType from agno.document.reader.json_reader import JSONReader from agno.document.reader.csv_reader import CSVReader from agno.document.reader.pdf_reader import PDFReader from agno.document.reader.text_reader import TextReader #db_url = "postgresql+psycopg://ai:ai@localhost:5532/ai" api_key = os.getenv("API_KEY") embedding_model = os.getenv("EMBEDDING_MODEL") embedding_baseUrl = os.getenv("EMBEDDING_BASE_URL") model_baseUrl = os.getenv("MODEL_BASE_URL") cwd = Path(__file__).parent.resolve() tmp = cwd.joinpath("tmp") if not tmp.exists(): tmp.mkdir(exist_ok=True, parents=True) work_context = "" def get_sofeware_work_context() -> str: """返回当前用户使用软件时所处环境.""" global work_context return work_context def set_sofeware_work_context(context : str): global work_context work_context = context def get_reader(file_type: str): """Return appropriate reader based on file type.""" readers = { "pdf": PDFReader(), "csv": CSVReader(), "txt": TextReader(), "md": TextReader(), "json": JSONReader(), } return readers.get(file_type.lower(), None) def get_model_by_provider(provider: str, model_name: str, temperature: float = None): """根据提供商获取对应的模型实例""" if provider == "openai": model = OpenAIChat(id=model_name, base_url=model_baseUrl, api_key=api_key, temperature=temperature) model.role_map = { "system": "system", "user": "user", "assistant": "assistant", "tool": "tool", "model": "assistant", } return model # elif provider == "google": # return Gemini(id=model_name) # elif provider == "anthropic": # return Claude(id=model_name) # elif provider == "groq": # return Groq(id=model_name) elif provider == "deepseek": return DeepSeek(id=model_name, base_url=model_baseUrl, api_key=api_key) else: raise ValueError(f"Unsupported model provider: {provider}") def initialize_memory(model) -> AgentMemory: """初始化并返回配置好的AgentMemory实例""" return AgentMemory( db=SqliteMemoryDb( table_name="agent_memory", db_file=os.getenv("MEMORY_DB_FILE", "tmp/agent_memory.db"), ), # 在Sqlite中持久化记忆 classifier=MemoryClassifier(model=model), summarizer=MemorySummarizer(model=model), manager=MemoryManager(model=model), create_user_memories=True, # 存储用户偏好 #create_session_summary=True, # 存储对话摘要 ) def initialize_vector_db() -> LanceDb: """初始化并返回配置好的LanceDb实例""" return LanceDb( table_name="knowledge", uri=os.getenv("VECTOR_DB_PATH", "tmp/knowledgedb"), search_type=SearchType.hybrid, embedder=OpenAIEmbedder(id=embedding_model, base_url=embedding_baseUrl, api_key=api_key) ) def initialize_mingci_vector_db() -> LanceDb: """初始化并返回配置好的LanceDb实例""" return LanceDb( table_name="mingci", uri=os.getenv("MINGCI_VECTOR_DB_PATH", "tmp/mingcidb"), search_type=SearchType.vector, embedder=OpenAIEmbedder(id=embedding_model, base_url=embedding_baseUrl, api_key=api_key) ) def initialize_knowledge_base() -> AgentKnowledge: """初始化并返回配置好的AgentKnowledge实例""" return AgentKnowledge( vector_db=initialize_vector_db(), num_documents=3, # 检索3个最相关的文档 chunking_strategy=DocumentChunking( chunk_size=500, overlap=50, ), # 固定大小分块 optimize_on=1000, # 每1000条数据进行向量优化 reader=TextReader(), # 默认文本读取器 ) def initialize_mingci_knowledge_base() -> AgentKnowledge: """初始化并返回配置好的AgentKnowledge实例""" return AgentKnowledge( vector_db=initialize_mingci_vector_db(), num_documents=1, # 检索1个最相关的文档 chunking_strategy=DocumentChunking( chunk_size=500, overlap=50, ), # 固定大小分块 optimize_on=1000, # 每1000条数据进行向量优化 reader=TextReader(), # 默认文本读取器 ) def get_question_agent( model_id: str = "openai:gpt-4o", user_id: Optional[str] = None, session_id: Optional[str] = None, debug_mode: bool = True, ) -> Agent: """获取一个带有记忆功能的Questions代理。""" """获取一个带有记忆功能的Agentic RAG代理。""" # 解析模型提供商和名称 provider, model_name = model_id.split(":") model = get_model_by_provider(provider, model_name, temperature=0) # 初始化记忆系统 #memory = initialize_memory(model) # 初始化知识库 knowledge_base = initialize_mingci_knowledge_base() description = """ # 电力造价软件操作问题转换工具(严格模式) ## 核心指令 你只能输出与**电力造价软件操作**直接相关的专业问题改写结果,禁止任何解释或中间步骤。 ## 角色定义 你是一个**严格的问题转换工具**,仅将用户问题转换为电力造价专业表述,**禁止任何额外输出**。 ## 绝对禁止事项 - ❌ 禁止强调软件 - ❌ 禁止添加解释性文字(如“注:”“提示:”等) - ❌ 禁止反问句(如“请问”“您想了解什么?”) - ❌ 禁止分步过程说明(如“下一步需要...”) - ❌ 禁止使用人称代词(你/我/您) ## 必须执行的操作流程 1. **关键词提取** - 立即识别问题中的**基础词语**(动词/名词/形容词) - 格式示例:`"项目" "划分" "cost" "control"` - **不输出**,直接进入下一步 2. **专业术语查询** - 强制自动执行 `search_knowledge_base` 工具 - 等待返回结果后继续 3. **问题改写** - 严格使用知识库返回的**专业术语**(带引号,如`"工程量清单"`) - 如果问题仅是一个**专业术语**,则用户是想知道该**专业术语**的操作入口 - 仅输出最终改写结果 ## 输出规范 - **唯一允许格式**: `[改写后的专业问题]` - **错误示例**(将触发终止): ❌ `"涉及...管理"` ❌ `"请问您需要..."` ❌ `"注:已提取关键词..."` ## 违规处罚 若违反上述规则,立即终止并输出: `[ERROR_STRICT_MODE: 检测到禁止内容]` ## 正确案例 ✅ `"在哪里查看'项目划分'?"` ✅ `"如何编辑'项目划分'的'取费表'属性?"` --- **立即执行**:输入问题后,自动完成`提取→查询→改写`,**仅输出最终结果**。 """ instructions = """ 1. 识别关键词 请识别问题中的关键词,需要中英文均有,可以适量补充不在问题中但相关的关键词。 关键词尽量切分为动词、名词、或形容词等单独的词,不要长词组(目的是更好的匹配检索到语义相关但表述不同的相关资料)。 关键词间以空格分割,比如: "关键词1" "关键词2" "keyword3" "keyword4" 2.搜索知识库中的电力造价专业专有名词 必须始终使用工具 search_knowledge_base 来搜索出所有和关键词相关的电力造价专业及软件业务对象、业务属性的专有名词。 3. 改写用户问题 请结合知识库中返回的电力造价专业专有名词改写用户的输入问题,将问题中的同义词替换成专业词汇,然后将用户问题改写成用电力造价专业描述方式输出完整清晰的问句。 4. 注意事项 不需要反问,不需要补充背景,仅输出第3步中最终改写后的问句。 注意:如果用户问题中有知识库返回的电力造价专业专有名词,请务必将该专业名词用引号包起来,以保留该专业名词完整,不要被拆分成多个基础词语。 """ # 创建代理 rag_agent: Agent = Agent( name="博微用户问题理解助手", session_id=session_id, # 跟踪会话ID以实现持久对话 user_id=user_id, model=model, storage=JsonStorage(dir_path=os.getenv("SESSION_STORAGE_PATH", "tmp/question_agent_sessions_json")), # 持久化会话数据 #memory=memory, # 为代理添加记忆功能 knowledge=knowledge_base, # 添加知识库 description=description, #instructions=instructions, search_knowledge=True, # 此设置赋予模型搜索知识库信息的工具 markdown=True, # 此设置告诉模型以markdown格式格式化消息 show_tool_calls=True, add_datetime_to_instructions=True, debug_mode=debug_mode, read_tool_call_history=True, num_history_responses=3, save_response_to_file=str(tmp.joinpath("msg/question_{message}_{run_id}.md")), ) return rag_agent def get_answer_agent( model_id: str = "openai:gpt-4o", user_id: Optional[str] = None, session_id: Optional[str] = None, debug_mode: bool = True, ) -> Agent: """获取一个带有记忆功能的Questions代理。""" """获取一个带有记忆功能的Agentic RAG代理。""" # 解析模型提供商和名称 provider, model_name = model_id.split(":") model = get_model_by_provider(provider, model_name, 0.3) # 初始化记忆系统 memory = initialize_memory(model) # 初始化知识库 knowledge_base = initialize_knowledge_base() description = """ 你是一个智能助手,专门为[博微配网计价通D3软件]提供使用支持。你的任务是帮助用户理解和使用这个复杂的配电网工程造价软件系统。 软件特点 1.多页面架构:软件由多个功能页面组成 2.复杂控件布局:每个页面包含多种控件(如列表控件、TAB控件、按钮等) 3.业务对象丰富:涉及"取费表"、"项目划分"、"工程量"等多种业务对象 4.操作多样:支持"添加"、"修改"、"删除"、"导入"、"导出"等多种操作 """ instructions = """ 1. 理解用户问题 - 用户正在使用软件过程中遇到问题,向您请求帮助 - 只从用户问题识别中提到的业务对象(如"如何设置取费费率"→"取费表") - 只从用户问题识别业务对象的属性字段(如"如何设置取费费率"→"费率") - 只从用户问题识别用户想要执行的操作(如"如何设置取费费率"→"设置") - 判断问题类型(功能入口、操作步骤、错误处理等) - **不输出**,直接进入下一步 2. 改写问题 - 将用户问题改写为包含以下要素的标准查询: - [问题类型] : [操作类型] + [业务对象] + [属性] - **不输出** - [属性]只有用户问题中明确包含才改写,否则为未知。 - [问题类型]、[操作类型]、[业务对象]为必须输入,如果缺少任一个都需追问用户补全才能进入下一步。 示例: 原始问题:"如何设置取费费率?" 改写后:"操作步骤 : 设置 - 取费 - 费率" 3.搜索知识库 - 必须始终使用工具 search_knowledge_base 来搜索知识库 - 在回应前彻底分析所有返回的文档 - 如果返回多个文档,需连贯地综合信息 - **不输出**,直接进入下一步 4. 上下文管理: - 使用工具 get_chat_history 保持对话连续性 - 相关时引用之前的交互 - 记录用户偏好和之前的澄清 5. 结果呈现要求 - 用户所处环境如下: {sofeware_work_context} - 以 makedown 格式输出,注意换行和排版 - 避免使用'根据我的知识'或'取决于信息'等模糊表述 7. 特殊情况处理 - 如果问题不明确,可以反问请求澄清 - 如果知识库搜索无结果,则直接明确回复不知道 - 对于错误提示,先解释含义再直接回复无法解决 """ # 创建代理 rag_agent: Agent = Agent( name="博微软件AI助手", session_id=session_id, # 跟踪会话ID以实现持久对话 user_id=user_id, model=model, storage=JsonStorage(dir_path=os.getenv("SESSION_STORAGE_PATH", "tmp/answer_agent_sessions_json")), # 持久化会话数据 memory=memory, # 为代理添加记忆功能 knowledge=knowledge_base, # 添加知识库 description=description, instructions=instructions, search_knowledge=True, # 此设置赋予模型搜索知识库信息的工具 #read_chat_history=True, # 此设置赋予模型获取聊天历史的工具 # tools=[DuckDuckGoTools()], markdown=True, # 此设置告诉模型以markdown格式格式化消息 # add_chat_history_to_messages=True, show_tool_calls=True, add_history_to_messages=True, # 将聊天历史添加到消息中 add_datetime_to_instructions=True, add_name_to_instructions=True, debug_mode=debug_mode, read_tool_call_history=True, num_history_responses=3, save_response_to_file=str(tmp.joinpath("msg/answer_{message}_{run_id}.md")), ) return rag_agent class QuestionAndAnswerGenerator(Workflow): description: str = dedent("""\ An intelligent blog post generator that creates engaging, well-researched content. This workflow orchestrates multiple AI agents to research, analyze, and craft compelling blog posts that combine journalistic rigor with engaging storytelling. The system excels at creating content that is both informative and optimized for digital consumption. """) question: Agent answer: Agent def __init__( self, *, name: Optional[str] = None, workflow_id: Optional[str] = None, description: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, session_name: Optional[str] = None, session_state: Optional[Dict[str, Any]] = None, memory: Optional[WorkflowMemory] = None, storage: Optional[Storage] = None, extra_data: Optional[Dict[str, Any]] = None, debug_mode: bool = False, monitoring: bool = False, telemetry: bool = True, ): super().__init__( name=name, workflow_id=workflow_id, description=description, user_id=user_id, session_id=session_id, session_name=session_name, session_state=session_state, memory=memory, storage=storage, extra_data=extra_data, debug_mode=debug_mode, monitoring=monitoring, telemetry=telemetry, ) def run( self, message: Optional[Union[str, List, Dict, Message]] = None, stream: bool = False, ) -> Iterator[RunResponse]: logger.info(f"Generating a blog post on: {message}") # Run the writer and yield the response question_spsponse: RunResponse = self.question.run(message, stream=stream); if(question_spsponse is None and question_spsponse.content is None): yield RunResponse( content=f"对不起, question 发生错误: {str(question_spsponse.error)}", error=question_spsponse.error, tools=question_spsponse.tools, ) return answer_spsponse: RunResponse = self.question.run(question_spsponse.content, stream=stream); if (answer_spsponse is None and answer_spsponse.content is None): yield RunResponse( content=f"对不起, question 发生错误: {str(answer_spsponse.error)}", error=answer_spsponse.error, tools=answer_spsponse.tools, ) return yield answer_spsponse def get_workflow( model_id: str = "openai:gpt-4o", user_id: Optional[str] = None, session_id: Optional[str] = None, debug_mode: bool = True, ) -> QuestionAndAnswerGenerator: qa_workflow = QuestionAndAnswerGenerator( user_id=user_id, session_id=session_id, #storage=SqliteStorage( # table_name="investment_report_workflows", # db_file="tmp/agno_workflows.db", # auto_upgrade_schema=True, #), memory=WorkflowMemory(), debug_mode=debug_mode, ) qa_workflow.question = get_question_agent(model_id, user_id, session_id, debug_mode) qa_workflow.answer = get_answer_agent(model_id, user_id, session_id, debug_mode) return qa_workflow mingci_knowledge_base = initialize_mingci_knowledge_base() def search_mingci_knowledge(query: str) -> str: """Use this function to search the mingci knowledge for information about a query. Args: query: The query to search for. Returns: str: A string containing the response from the mingci knowledge. """ global mingci_knowledge_base from agno.document import Document docs: List[Document] = mingci_knowledge_base.search(query=query, num_documents=1) if len(docs) == 0: return "No documents found" return "\n\n".join([doc.content for doc in docs]) def get_agentic_rag_agent( model_id: str = "openai:gpt-4o", user_id: Optional[str] = None, session_id: Optional[str] = None, debug_mode: bool = True, ) -> Agent: """获取一个带有记忆功能的Agentic RAG代理。""" # 解析模型提供商和名称 provider, model_name = model_id.split(":") model = get_model_by_provider(provider, model_name, 0.3) # 初始化记忆系统 memory = initialize_memory(model) # 初始化知识库 knowledge_base = initialize_knowledge_base() description=""" 你是一个智能助手,专门为[博微配网计价通D3软件]提供使用支持。你的任务是帮助用户理解和使用这个复杂的配电网工程造价软件系统, 交谈中的"它"指定当前软件。 软件特点 1.多页面架构:软件整个界面由多个编辑页面组成,通过顶部的页签切换页面 2.复杂控件布局:每个编辑页面内包含多种控件(如列表控件、TAB控件、树列表控件、按钮等) 3.业务对象丰富:每个控件可能用于展示和编辑零至多个业务对象。(如"取费表"、"项目划分"、"工程量"等业务对象) 4.操作多样:支持"添加"、"修改"、"删除"、"导入"、"导出"、"右键"等多种操作 """ instructions=""" 1. **关键词提取** - 立即识别问题中的**基础词语**(动词/名词/形容词) - 格式示例:`"项目" "划分" "cost" "control"` - **不输出**,直接进入下一步 2. **专业术语查询** - 强制自动执行 `search_mingci_knowledge` 工具 - 等待返回结果后继续 3. **问题改写** - 问题改写严格仅使用知识库返回的**专业术语**和**同义词**,必须完全匹配才能替换,不使用上下文等其他信息(带引号,如`"工程量清单"`) - 如果问题仅是一个**专业术语**,则用户是想知道该**专业术语**的操作入口 4. **问题结构化** - 判断问题类型(功能入口、操作步骤、错误处理等) - 将第三步问题改写的问句解析为包含以下要素的标准查询: - [问题类型] : [操作类型] + [业务对象] + [业务属性] - **不输出** - [业务属性]只有用户问题中明确包含才改写,否则为未知。 - [问题类型]、[操作类型]、[业务对象]为必须输入,如果缺少任一个都需追问用户补全才能进入下一步。 示例: 原始问题:"如何设置取费费率?" 改写后:"操作步骤 : 设置 - 取费 - 费率" 5.**搜索知识库** - 必须始终使用工具 search_knowledge_base 来搜索知识库 - 在回应前彻底分析所有返回的文档 - 如果返回多个文档,需连贯地综合信息 - **不输出**,直接进入下一步 6. **上下文管理** - 使用工具 get_chat_history 保持对话连续性 - 相关时引用之前的交互 - 记录用户偏好和之前的澄清 - 用户消息中的{context}中的软件环境上下文如下: {软件上下文} 7. **结果呈现要求** - 避免使用'根据我的知识'或'取决于信息'等模糊表述 - 注意 在当前环境下"工程"和"项目"并非同义词,请勿相互替换 - 回答问题时请必须从用户消息中的"软件上下文"中读取当前页面信息,然后从当前页面指引用户如何继续操作 - **首先以 makedown 格式以向用户确认的口吻输出改写后的完整问句** (如 您是想询问:`改写后的问句`) - **其次换行后以 makedown 格式输出回答信息** 8. 特殊情况处理 - 如果问题不明确,可以反问请求澄清 - 如果搜索知识库返回"No documents found"无结果,则直接明确回复当前知识库中不存在该知识 - 对于错误提示,直接告知错误原因即可 """ # 创建代理 agentic_rag_agent: Agent = Agent( name="博微软件AI助手", session_id=session_id, # 跟踪会话ID以实现持久对话 user_id=user_id, model=model, storage=JsonStorage(dir_path=os.getenv("SESSION_STORAGE_PATH", "tmp/agent_sessions_json")), # 持久化会话数据 memory=memory, # 为代理添加记忆功能 knowledge=knowledge_base, # 添加知识库 description=description, instructions=instructions, context={"软件上下文": get_sofeware_work_context}, add_context=True, search_knowledge=True, # 此设置赋予模型搜索知识库信息的工具 read_chat_history=True, # 此设置赋予模型获取聊天历史的工具 tools=[search_mingci_knowledge], markdown=True, # 此设置告诉模型以markdown格式格式化消息 # add_chat_history_to_messages=True, show_tool_calls=True, #add_history_to_messages=True, # 将聊天历史添加到消息中 add_datetime_to_instructions=True, add_name_to_instructions=True, debug_mode=debug_mode, #read_tool_call_history=True, num_history_responses=3, save_response_to_file=str(tmp.joinpath("msg/answer_{message}_{run_id}.md")), ) return agentic_rag_agent