"""Streamlit UI components for the agentic RAG application.

Covers page setup, page navigation, model selection, knowledge-base
management (URLs and uploaded documents) and chat utility buttons.
"""

import os
import shutil
import tempfile
from typing import Dict, List, Optional
from urllib.parse import urlsplit
from urllib.request import urlopen

import streamlit as st
from agno.agent import Agent
from agno.utils.log import logger

from agentic_rag import work_context, set_sofeware_work_context
from utils import (
    CUSTOM_CSS,
    add_message,
    export_chat_history,
)

# Navigation entries: the home page followed by the editable pages, in order.
_HOME_PAGE = "主页"
_EDIT_PAGES: List[str] = [
    "工程信息",
    "取费设置",
    "组合件",
    "工程量",
    "材机分析",
    "工程费用",
    "报表输出",
]

# Model list used when MODEL_LIST is unset or contains no usable entry.
_DEFAULT_MODEL_LIST = "Qwen2.5-72B=openai:Qwen/Qwen2.5-72B-Instruct"

# Upper bound (seconds) for downloading a PDF from a user-supplied URL.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def initialize_ui():
    """Configure the Streamlit page and inject the application's custom CSS."""
    st.set_page_config(
        page_title="智能检索增强生成",
        page_icon="💎",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.markdown(CUSTOM_CSS, unsafe_allow_html=True)


def show_header():
    """Display the application header."""
    # NOTE(review): in the source this literal was unterminated and its
    # surrounding markup appears to have been lost; ``unsafe_allow_html=True``
    # was commented out. Only the surviving text is rendered here. Restore the
    # original HTML (and ``unsafe_allow_html=True``) if styled output is needed.
    st.markdown("由Agno驱动的智能研究助手")


def set_current_page(page: str):
    """Publish the software work context with *page* as the current page.

    Args:
        page: Name of the page currently shown to the user.
    """
    set_sofeware_work_context(
        {
            "软件": "博微配网计价通D3软件",
            "工程文件": "广州配网造价工程",
            # Pass a copy so the callee cannot mutate the shared constant.
            "所有编辑页面": list(_EDIT_PAGES),
            "当前页面": page,
        }
    )


def show_tabs():
    """Display the page navigation and the screenshot of the selected page.

    The selected page is also recorded through ``set_current_page``.
    """
    page_names = [_HOME_PAGE, *_EDIT_PAGES]
    selected_tab = st.radio(
        "页面导航:", options=page_names, index=0, horizontal=True
    )
    # Defensive guard: only react to a value that is one of the known pages.
    if selected_tab in page_names:
        set_current_page(selected_tab)
        st.image(f"static/images/{selected_tab}.png")


def _parse_model_list(raw: str) -> Dict[str, str]:
    """Parse ``"label=identifier&label=identifier"`` into an ordered mapping.

    Only the first ``=`` of each entry separates label and identifier, so
    identifiers may themselves contain ``=``. Blank entries (for example from
    a trailing ``&``) are skipped; malformed entries are logged and skipped
    instead of aborting module import.

    Args:
        raw: The raw ``&``-separated list.

    Returns:
        Mapping of display label to model identifier, in input order.
    """
    models: Dict[str, str] = {}
    for entry in raw.split("&"):
        if not entry.strip():
            continue
        label, separator, identifier = entry.partition("=")
        label, identifier = label.strip(), identifier.strip()
        if not (separator and label and identifier):
            logger.warning("Ignoring malformed MODEL_LIST entry: %r", entry)
            continue
        models[label] = identifier
    return models


# Selectable models: display label -> model identifier string.
# Configure through the MODEL_LIST environment variable as "&"-separated
# "label=identifier" pairs, for example:
#   MODEL_LIST="Qwen2.5-72B=openai:Qwen/Qwen2.5-72B-Instruct&gpt-4o=openai:gpt-4o"
model_options = _parse_model_list(
    os.getenv("MODEL_LIST", _DEFAULT_MODEL_LIST)
) or _parse_model_list(_DEFAULT_MODEL_LIST)


def get_modul_option(id: int = 0) -> str:
    """Return the model identifier at position *id* in ``model_options``.

    Args:
        id: Zero-based position in ``model_options`` (parameter name kept for
            backward compatibility although it shadows the builtin).

    Raises:
        IndexError: If *id* is outside the range of configured models.
    """
    return list(model_options.values())[id]


def show_model_selector(index: int = 0) -> str:
    """Display the model selection box and return the chosen identifier.

    Args:
        index: Position of the option that is preselected.

    Returns:
        The model identifier mapped to the selected label.
    """
    labels = list(model_options)
    selected_model = st.selectbox(
        "选择模型",
        options=labels,
        index=index,
        key="model_selector",
    )
    # The original called ``session.set(...)`` on a name that is neither
    # defined nor imported in this module (NameError at runtime).
    # NOTE(review): the position is stored (not the label) because the key is
    # "model_index" and ``index`` must be an int — confirm against the readers
    # of this key.
    st.session_state["model_index"] = labels.index(selected_model)
    return model_options[selected_model]


def _init_knowledge_state() -> None:
    """Create the knowledge-base bookkeeping entries in session state if absent."""
    if "loaded_urls" not in st.session_state:
        st.session_state["loaded_urls"] = set()
    if "loaded_files" not in st.session_state:
        st.session_state["loaded_files"] = set()
    if "knowledge_base_initialized" not in st.session_state:
        st.session_state["knowledge_base_initialized"] = False


def _read_pdf_from_url(url: str) -> list:
    """Download the PDF at *url* to a temporary file and parse it.

    The temporary file is removed whether or not parsing succeeds.

    Raises:
        ValueError: If *url* is not an http(s) URL.
    """
    # NOTE(review): import path follows the agno 1.x package layout — confirm
    # against the installed agno version.
    from agno.document.reader.pdf_reader import PDFReader

    # The URL is user-supplied: refuse schemes such as file:// outright.
    if urlsplit(url).scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {url}")

    tmp_path = None
    try:
        # TLS certificates are verified here (the original request disabled
        # verification); HTTP error statuses raise from urlopen.
        with urlopen(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS) as response:
            with tempfile.NamedTemporaryFile(
                suffix=".pdf", delete=False
            ) as tmp_file:
                tmp_path = tmp_file.name
                shutil.copyfileobj(response, tmp_file)
        return PDFReader().read(tmp_path)
    finally:
        if tmp_path is not None:
            os.unlink(tmp_path)


def _add_url_to_knowledge(agent: Agent, input_url: str) -> None:
    """Load *input_url* into the agent's knowledge base and report the result."""
    if input_url in st.session_state.loaded_urls:
        st.sidebar.info("URL已加载到知识库")
        return

    alert = st.sidebar.info("Processing URLs...", icon="ℹ️")
    try:
        if input_url.lower().endswith(".pdf"):
            try:
                docs = _read_pdf_from_url(input_url)
            except Exception as e:  # UI boundary: report instead of crashing
                logger.exception("Failed to process PDF from %s", input_url)
                st.sidebar.error(f"Error processing PDF: {str(e)}")
                docs = []
        else:
            # NOTE(review): import path follows the agno 1.x package layout —
            # confirm against the installed agno version.
            from agno.document.reader.website_reader import WebsiteReader

            scraper = WebsiteReader(max_links=2, max_depth=1)
            docs = scraper.read(input_url)

        if docs:
            agent.knowledge.load_documents(docs, upsert=True)
            st.session_state.loaded_urls.add(input_url)
            st.sidebar.success("URL已添加到知识库")
        else:
            st.sidebar.error("无法处理提供的URL")
    finally:
        # Always clear the progress notice, even when loading raised.
        alert.empty()


def _add_file_to_knowledge(agent: Agent, uploaded_file) -> None:
    """Load *uploaded_file* into the agent's knowledge base and report the result."""
    file_identifier = f"{uploaded_file.name}_{uploaded_file.size}"
    if file_identifier in st.session_state.loaded_files:
        st.sidebar.info(f"{uploaded_file.name}已加载到知识库")
        return

    alert = st.sidebar.info("正在处理文档...", icon="ℹ️")
    try:
        file_type = uploaded_file.name.split(".")[-1].lower()
        # NOTE(review): ``get_reader`` is neither defined nor imported in this
        # module as shown; it must be brought into scope (its home module is
        # not visible from here) before uploads can work.
        reader = get_reader(file_type)
        if reader:
            docs = reader.read(uploaded_file)
            agent.knowledge.load_documents(docs, upsert=True)
            st.session_state.loaded_files.add(file_identifier)
            st.sidebar.success(f"{uploaded_file.name}已添加到知识库")
            st.session_state.knowledge_base_initialized = True
        else:
            logger.warning("No reader available for file type %r", file_type)
    finally:
        # Always clear the progress notice, even when loading raised.
        alert.empty()


def show_dialog_components(agent: Agent, prompt: Optional[str] = None):
    """Display the settings expander and knowledge-base management widgets.

    Args:
        agent: Agent whose ``knowledge`` receives the loaded documents and
            whose vector database is cleared on request.
        prompt: Pending chat input, if any. Documents are only loaded while
            this is empty. Defaults to ``None`` (the original referenced an
            undefined ``prompt`` name).
    """
    _init_knowledge_state()

    with st.expander("⚙️ 设置", expanded=False):
        st.markdown("#### 📚 文档管理")

        input_url = st.text_input("添加URL到知识库")
        # Only load if there is no pending prompt and the KB is not initialized.
        if (
            input_url
            and not prompt
            and not st.session_state.knowledge_base_initialized
        ):
            _add_url_to_knowledge(agent, input_url)

        uploaded_file = st.sidebar.file_uploader(
            "添加文档(.pdf,.csv,.json,.md或.txt)", key="file_upload"
        )
        if (
            uploaded_file
            and not prompt
            and not st.session_state.knowledge_base_initialized
        ):
            _add_file_to_knowledge(agent, uploaded_file)

        if st.sidebar.button("清空知识库"):
            agent.knowledge.vector_db.delete()
            st.session_state.loaded_urls.clear()
            st.session_state.loaded_files.clear()
            # Reset the flag so documents can be loaded again.
            st.session_state.knowledge_base_initialized = False
            st.sidebar.success("知识库已清空")


def show_sample_questions():
    """Display the sample questions section."""
    st.markdown("#### ❓ 示例问题")
    if st.button("📝 总结"):
        add_message(
            "user",
            "你能总结一下当前知识库中的内容吗(使用`search_knowledge_base`工具)?",
        )


def show_utility_buttons():
    """Display the utility buttons: start a new chat and export the history."""
    st.markdown("#### 🛠️ 工具")
    col1, col2 = st.columns([1, 1])
    with col1:
        if st.button("🔄 新对话", use_container_width=True):
            restart_agent()
    with col2:
        if st.download_button(
            "💾 导出会话",
            export_chat_history(),
            file_name="rag_chat_history.md",
            mime="text/markdown",
            use_container_width=True,
        ):
            st.sidebar.success("会话历史已导出!")


def show_chat_history(agent: Agent):
    """Placeholder for rendering the chat history; currently does nothing."""
    pass


def restart_agent():
    """Reset the agent and clear chat history, then rerun the app."""
    logger.debug("---*--- Restarting agent ---*---")
    st.session_state["agentic_rag_agent"] = None
    st.session_state["agentic_rag_agent_session_id"] = None
    st.session_state["messages"] = []
    st.rerun()