Files
agno_agentic_rag/ui.py
T
2025-04-08 14:10:12 +08:00

222 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from typing import List
import streamlit as st
from agno.agent import Agent
from agno.utils.log import logger
from agentic_rag import work_context, set_sofeware_work_context
from utils import (
CUSTOM_CSS,
add_message,
export_chat_history,
)
def initialize_ui():
"""Initialize Streamlit UI configuration"""
st.set_page_config(
page_title="智能检索增强生成",
page_icon="💎",
layout="wide",
initial_sidebar_state="expanded",
)
st.markdown(CUSTOM_CSS, unsafe_allow_html=True)
def show_header():
"""Display application header"""
st.markdown("<h1 class='main-title'>博微配网计价通D3软件 </h1>", unsafe_allow_html=True)
#st.markdown(
# "<p class='subtitle'>由Agno驱动的智能研究助手</p>",
# unsafe_allow_html=True,
#)
def set_current_page(page : str):
set_sofeware_work_context({
"软件": "博微配网计价通D3软件",
"工程文件": "广州配网造价工程",
"已打开页面": ["工程信息", "取费设置", "组合件", "工程量", "材机分析", "工程费用", "报表输出"],
"当前页面": page,
})
def show_tabs():
"""Display tabs in main content area"""
tabs = [
{"name": "主页", "icon": "💬", "image": "static/images/主页.png"},
{"name": "工程信息", "icon": "📚", "image": "static/images/工程信息.png"},
{"name": "取费设置", "icon": "⚙️", "image": "static/images/取费设置.png"},
{"name": "组合件", "icon": "⚙️", "image": "static/images/组合件.png"},
{"name": "工程量", "icon": "⚙️", "image": "static/images/工程量.png"},
{"name": "材机分析", "icon": "⚙️", "image": "static/images/材机分析.png"},
{"name": "工程费用", "icon": "⚙️", "image": "static/images/工程费用.png"},
{"name": "报表输出", "icon": "⚙️", "image": "static/images/报表输出.png"},
]
tabNames = ["主页", "工程信息", "取费设置", "组合件", "工程量", "材机分析", "工程费用", "报表输出"]
selected_tab = st.radio("页面导航:", options=tabNames, index=0, horizontal=True)
if selected_tab in tabNames:
set_current_page(selected_tab)
st.image(f"static/images/{selected_tab}.png")
#tab_objects = st.tabs([f"{tab['name']}" for tab in tabs])
# with tab_objects[0]:
# st.image(tabs[0]["image"])
# with tab_objects[1]:
# st.image(tabs[1]["image"])
# with tab_objects[2]:
# st.image(tabs[2]["image"])
# with tab_objects[3]:
# st.image(tabs[3]["image"])
# with tab_objects[4]:
# st.image(tabs[4]["image"])
# with tab_objects[5]:
# st.image(tabs[5]["image"])
# with tab_objects[6]:
# st.image(tabs[6]["image"])
# with tab_objects[7]:
# st.image(tabs[7]["image"])
# for index, value in enumerate(tabs):
# with tab_objects[index]:
# st.image(value["image"])
# for index, value in enumerate(tabs):
# if tabs[index]["name"] == selected_tab:
# set_current_page(value["name"])
# model_options = {
# "Qwen2.5-72B": "openai:Qwen/Qwen2.5-72B-Instruct",
# "o3-mini": "openai:o3-mini",
# "gpt-4o": "openai:gpt-4o",
# "gemini-2.0-flash-exp": "google:gemini-2.0-flash-exp",
# "claude-3-5-sonnet": "anthropic:claude-3-5-sonnet-20241022",
# "llama-3.3-70b": "groq:llama-3.3-70b-versatile",
# }
model_options = dict(item.split('=') for item in os.getenv("MODEL_LIST", "Qwen2.5-72B=openai:Qwen/Qwen2.5-72B-Instruct").split('&'))
def get_modul_option(id: int = 0) -> str:
"""Return the selected module option"""
return model_options[list(model_options.keys())[id]]
def show_model_selector(index: int = 0) -> str:
"""Display model selection dialog"""
selected_model = st.selectbox(
"选择模型",
options=list(model_options.keys()),
index=index,
key="model_selector",
)
session.set("model_index", selected_model)
return model_options[selected_model]
def show_dialog_components(agent: Agent):
"""Display all sidebar components in a dialog"""
with st.expander("⚙️ 设置", expanded=False):
st.markdown("#### 📚 文档管理")
input_url = st.text_input("添加URL到知识库")
if (
input_url and not prompt and not st.session_state.knowledge_base_initialized
): # Only load if KB not initialized
if input_url not in st.session_state.loaded_urls:
alert = st.sidebar.info("Processing URLs...", icon="️")
if input_url.lower().endswith(".pdf"):
try:
# Download PDF to temporary file
response = requests.get(input_url, stream=True, verify=False)
response.raise_for_status()
with tempfile.NamedTemporaryFile(
suffix=".pdf", delete=False
) as tmp_file:
for chunk in response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
tmp_path = tmp_file.name
reader = PDFReader()
docs: List[Document] = reader.read(tmp_path)
# Clean up temporary file
os.unlink(tmp_path)
except Exception as e:
st.sidebar.error(f"Error processing PDF: {str(e)}")
docs = []
else:
scraper = WebsiteReader(max_links=2, max_depth=1)
docs: List[Document] = scraper.read(input_url)
if docs:
agentic_rag_agent.knowledge.load_documents(docs, upsert=True)
st.session_state.loaded_urls.add(input_url)
st.sidebar.success("URL已添加到知识库")
else:
st.sidebar.error("无法处理提供的URL")
alert.empty()
else:
st.sidebar.info("URL已加载到知识库")
# 修正缩进,使其与上下文一致
uploaded_file = st.sidebar.file_uploader(
"添加文档(.pdf.csv.json.md或.txt)", key="file_upload"
)
if (
uploaded_file and not prompt and not st.session_state.knowledge_base_initialized
): # Only load if KB not initialized
file_identifier = f"{uploaded_file.name}_{uploaded_file.size}"
if file_identifier not in st.session_state.loaded_files:
alert = st.sidebar.info("正在处理文档...", icon="️")
file_type = uploaded_file.name.split(".")[-1].lower()
reader = get_reader(file_type)
if reader:
docs = reader.read(uploaded_file)
agentic_rag_agent.knowledge.load_documents(docs, upsert=True)
st.session_state.loaded_files.add(file_identifier)
st.sidebar.success(f"{uploaded_file.name}已添加到知识库")
st.session_state.knowledge_base_initialized = True
alert.empty()
else:
st.sidebar.info(f"{uploaded_file.name}已加载到知识库")
if st.sidebar.button("清空知识库"):
agentic_rag_agent.knowledge.vector_db.delete()
st.session_state.loaded_urls.clear()
st.session_state.loaded_files.clear()
st.session_state.knowledge_base_initialized = False # Reset initialization flag
st.sidebar.success("知识库已清空")
def show_sample_questions():
"""Display sample questions section"""
st.markdown("#### ❓ 示例问题")
if st.button("📝 总结"):
add_message(
"user",
"你能总结一下当前知识库中的内容吗(使用`search_knowledge_base`工具)?",
)
def show_utility_buttons():
"""Display utility buttons section"""
st.markdown("#### 🛠️ 工具")
col1, col2 = st.columns([1, 1])
with col1:
if st.button("🔄 新对话", use_container_width=True):
restart_agent()
with col2:
if st.download_button(
"💾 导出会话",
export_chat_history(),
file_name="rag_chat_history.md",
mime="text/markdown",
use_container_width=True,
):
st.sidebar.success("会话历史已导出!")
def show_chat_history(agent: Agent):
pass
def restart_agent():
"""Reset the agent and clear chat history"""
logger.debug("---*--- Restarting agent ---*---")
st.session_state["agentic_rag_agent"] = None
st.session_state["agentic_rag_agent_session_id"] = None
st.session_state["messages"] = []
st.rerun()