Initial commit from Create Llama
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
__pycache__
|
||||
storage
|
||||
.env
|
||||
output
|
||||
@@ -0,0 +1,26 @@
|
||||
# Build stage: Python runtime, Poetry, optional Chromium and the project's
# main dependencies.
FROM python:3.11 AS build

WORKDIR /app

ENV PYTHONPATH=/app

# Install Poetry
RUN curl -sSL https://install.python-poetry.org | POETRY_HOME=/opt/poetry python && \
    cd /usr/local/bin && \
    ln -s /opt/poetry/bin/poetry && \
    poetry config virtualenvs.create false

# Install Chromium for web loader
# Can disable this if you don't use the web loader to reduce the image size
# `apt-get` is used instead of `apt` (whose CLI is not stable for scripts) and
# the package lists are removed in the same layer so they don't bloat the image.
RUN apt-get update && \
    apt-get install -y chromium chromium-driver && \
    rm -rf /var/lib/apt/lists/*

# Install dependencies
# Only the dependency manifests are copied here so this layer stays cached
# until pyproject.toml / poetry.lock change.
COPY ./pyproject.toml ./poetry.lock* /app/
RUN poetry install --no-root --no-cache --only main

# ====================================
# Release stage: add the application source on top of the build stage.
FROM build AS release

COPY . .

CMD ["python", "main.py"]
|
||||
@@ -0,0 +1,101 @@
|
||||
This is a [LlamaIndex](https://www.llamaindex.ai/) project using [FastAPI](https://fastapi.tiangolo.com/) bootstrapped with [`create-llama`](https://github.com/run-llama/LlamaIndexTS/tree/main/packages/create-llama).
|
||||
|
||||
## Getting Started
|
||||
|
||||
First, set up the environment with poetry:
|
||||
|
||||
> **_Note:_** This step is not needed if you are using the dev-container.
|
||||
|
||||
```
|
||||
poetry install
|
||||
poetry shell
|
||||
```
|
||||
|
||||
Then check the parameters that have been pre-configured in the `.env` file in this directory. (E.g. you might need to configure an `OPENAI_API_KEY` if you're using OpenAI as model provider).
|
||||
|
||||
If you are using any tools or data sources, you can update their config files in the `config` folder.
|
||||
|
||||
Second, generate the embeddings of the documents in the `./data` directory (if this folder exists - otherwise, skip this step):
|
||||
|
||||
```
|
||||
poetry run generate
|
||||
```
|
||||
|
||||
Third, run the development server:
|
||||
|
||||
```
|
||||
python main.py
|
||||
```
|
||||
|
||||
The example provides two different API endpoints:
|
||||
|
||||
1. `/api/chat` - a streaming chat endpoint
|
||||
2. `/api/chat/request` - a non-streaming chat endpoint
|
||||
|
||||
You can test the streaming endpoint with the following curl request:
|
||||
|
||||
```
|
||||
curl --location 'localhost:8000/api/chat' \
|
||||
--header 'Content-Type: application/json' \
|
||||
--data '{ "messages": [{ "role": "user", "content": "Hello" }] }'
|
||||
```
|
||||
|
||||
And for the non-streaming endpoint run:
|
||||
|
||||
```
|
||||
curl --location 'localhost:8000/api/chat/request' \
|
||||
--header 'Content-Type: application/json' \
|
||||
--data '{ "messages": [{ "role": "user", "content": "Hello" }] }'
|
||||
```
|
||||
|
||||
You can start editing the API endpoints by modifying `app/api/routers/chat.py`. The endpoints auto-update as you save the file. You can delete the endpoint you're not using.
|
||||
|
||||
Open [http://localhost:8000/docs](http://localhost:8000/docs) with your browser to see the Swagger UI of the API.
|
||||
|
||||
The API allows CORS for all origins to simplify development. You can change this behavior by setting the `ENVIRONMENT` environment variable to `prod`:
|
||||
|
||||
```
|
||||
ENVIRONMENT=prod python main.py
|
||||
```
|
||||
|
||||
## Using Docker
|
||||
|
||||
1. Build an image for the FastAPI app:
|
||||
|
||||
```
|
||||
docker build -t <your_backend_image_name> .
|
||||
```
|
||||
|
||||
2. Generate embeddings:
|
||||
|
||||
Parse the data and generate the vector embeddings if the `./data` folder exists - otherwise, skip this step:
|
||||
|
||||
```
|
||||
docker run \
|
||||
--rm \
|
||||
-v $(pwd)/.env:/app/.env \ # Use ENV variables and configuration from your file-system
|
||||
-v $(pwd)/config:/app/config \
|
||||
-v $(pwd)/data:/app/data \ # Use your local folder to read the data
|
||||
-v $(pwd)/storage:/app/storage \ # Use your file system to store the vector database
|
||||
<your_backend_image_name> \
|
||||
poetry run generate
|
||||
```
|
||||
|
||||
3. Start the API:
|
||||
|
||||
```
|
||||
docker run \
|
||||
-v $(pwd)/.env:/app/.env \ # Use ENV variables and configuration from your file-system
|
||||
-v $(pwd)/config:/app/config \
|
||||
-v $(pwd)/storage:/app/storage \ # Use your file system to store the vector database
|
||||
-p 8000:8000 \
|
||||
<your_backend_image_name>
|
||||
```
|
||||
|
||||
## Learn More
|
||||
|
||||
To learn more about LlamaIndex, take a look at the following resources:
|
||||
|
||||
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
|
||||
|
||||
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
|
||||
@@ -0,0 +1,148 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import List
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
|
||||
from llama_index.core.chat_engine.types import BaseChatEngine, NodeWithScore
|
||||
from llama_index.core.llms import MessageRole
|
||||
from llama_index.core.vector_stores.types import MetadataFilter, MetadataFilters
|
||||
|
||||
from app.api.routers.events import EventCallbackHandler
|
||||
from app.api.routers.models import (
|
||||
ChatConfig,
|
||||
ChatData,
|
||||
Message,
|
||||
Result,
|
||||
SourceNodes,
|
||||
)
|
||||
from app.api.routers.vercel_response import VercelStreamResponse
|
||||
from app.api.services.llama_cloud import LLamaCloudFileService
|
||||
from app.engine import get_chat_engine
|
||||
|
||||
chat_router = r = APIRouter()
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
|
||||
|
||||
def process_response_nodes(
    nodes: List[NodeWithScore],
    background_tasks: BackgroundTasks,
):
    """
    Schedule a background download for each file that the source nodes need.

    The files are selected by `SourceNodes.get_download_files`; every one of
    them is handed to `LLamaCloudFileService.download_llamacloud_pipeline_file`
    as a FastAPI background task.
    """
    download_task = LLamaCloudFileService.download_llamacloud_pipeline_file
    for pending_file in SourceNodes.get_download_files(nodes):
        background_tasks.add_task(download_task, pending_file)
|
||||
|
||||
|
||||
# streaming endpoint - delete if not needed
|
||||
@r.post("")
async def chat(
    request: Request,
    data: ChatData,
    background_tasks: BackgroundTasks,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    """
    Streaming chat endpoint.

    Builds metadata filters from the document ids referenced in the chat,
    creates a chat engine with those filters, streams the answer to the last
    message and returns it wrapped in a `VercelStreamResponse`.

    Raises:
        HTTPException: with status 500 when anything in the pipeline fails.
    """
    try:
        last_message_content = data.get_last_message_content()
        messages = data.get_history_messages()

        doc_ids = data.get_chat_document_ids()
        filters = generate_filters(doc_ids)
        params = data.data or {}
        # The filters must be bound to a %s placeholder: passing them as a bare
        # extra argument makes the logging module fail to format the record.
        logger.info("Creating chat engine with filters: %s", filters.dict())
        # NOTE: the engine injected through `Depends` is replaced here because
        # the filters and params are only known once the request body is read.
        chat_engine = get_chat_engine(filters=filters, params=params)

        event_handler = EventCallbackHandler()
        chat_engine.callback_manager.handlers.append(event_handler)  # type: ignore

        response = await chat_engine.astream_chat(last_message_content, messages)
        process_response_nodes(response.source_nodes, background_tasks)

        return VercelStreamResponse(request, event_handler, response, data)
    except Exception as e:
        # logger.exception already attaches the active traceback
        logger.exception("Error in chat engine")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error in chat engine: {e}",
        ) from e
|
||||
|
||||
|
||||
def generate_filters(doc_ids):
    """
    Build the metadata filters used when retrieving context for a chat.

    Without document ids, only documents that are not marked private are
    matched. With document ids, a document matches when it is not marked
    private OR its `doc_id` is one of the given ids.
    """
    # "nin" ("not in") also matches documents that don't have the `private` key set
    public_only = MetadataFilter(
        key="private",
        value=["true"],
        operator="nin",  # type: ignore
    )
    if len(doc_ids) == 0:
        return MetadataFilters(filters=[public_only])

    selected_docs = MetadataFilter(
        key="doc_id",
        value=doc_ids,
        operator="in",  # type: ignore
    )
    return MetadataFilters(
        filters=[public_only, selected_docs],
        condition="or",  # type: ignore
    )
|
||||
|
||||
|
||||
# non-streaming endpoint - delete if not needed
|
||||
@r.post("/request")
async def chat_request(
    data: ChatData,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
) -> Result:
    """
    Non-streaming chat endpoint.

    Sends the last message (with the earlier messages as history) to the chat
    engine and returns the complete answer together with its source nodes.
    """
    question = data.get_last_message_content()
    history = data.get_history_messages()

    answer = await chat_engine.achat(question, history)

    reply = Message(role=MessageRole.ASSISTANT, content=answer.response)
    sources = SourceNodes.from_source_nodes(answer.source_nodes)
    return Result(result=reply, nodes=sources)
|
||||
|
||||
|
||||
@r.get("/config")
async def chat_config() -> ChatConfig:
    """
    Return the chat UI configuration.

    Starter questions are read from the `CONVERSATION_STARTERS` environment
    variable, one question per line; when the variable is unset or blank,
    `starter_questions` is None.
    """
    raw_starters = os.getenv("CONVERSATION_STARTERS")
    if not raw_starters or not raw_starters.strip():
        return ChatConfig(starter_questions=None)
    return ChatConfig(starter_questions=raw_starters.strip().split("\n"))
|
||||
|
||||
|
||||
@r.get("/config/llamacloud")
async def chat_llama_cloud_config():
    """
    Return the available LlamaCloud projects and the configured pipeline.

    The `pipeline` entry is a dict holding `LLAMA_CLOUD_INDEX_NAME` and
    `LLAMA_CLOUD_PROJECT_NAME` when both environment variables are set to
    non-empty values, otherwise None.
    """
    projects = LLamaCloudFileService.get_all_projects_with_pipelines()
    pipeline = os.getenv("LLAMA_CLOUD_INDEX_NAME")
    project = os.getenv("LLAMA_CLOUD_PROJECT_NAME")
    # Explicit conditional instead of the fragile `a and b and {...} or None` chain
    pipeline_config = (
        {"pipeline": pipeline, "project": project} if pipeline and project else None
    )
    return {
        "projects": projects,
        "pipeline": pipeline_config,
    }
|
||||
@@ -0,0 +1,149 @@
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import AsyncGenerator, Dict, Any, List, Optional
|
||||
from llama_index.core.callbacks.base import BaseCallbackHandler
|
||||
from llama_index.core.callbacks.schema import CBEventType
|
||||
from llama_index.core.tools.types import ToolOutput
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CallbackEvent(BaseModel):
|
||||
event_type: CBEventType
|
||||
payload: Optional[Dict[str, Any]] = None
|
||||
event_id: str = ""
|
||||
|
||||
def get_retrieval_message(self) -> dict | None:
|
||||
if self.payload:
|
||||
nodes = self.payload.get("nodes")
|
||||
if nodes:
|
||||
msg = f"Retrieved {len(nodes)} sources to use as context for the query"
|
||||
else:
|
||||
msg = f"Retrieving context for query: '{self.payload.get('query_str')}'"
|
||||
return {
|
||||
"type": "events",
|
||||
"data": {"title": msg},
|
||||
}
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_tool_message(self) -> dict | None:
|
||||
func_call_args = self.payload.get("function_call")
|
||||
if func_call_args is not None and "tool" in self.payload:
|
||||
tool = self.payload.get("tool")
|
||||
return {
|
||||
"type": "events",
|
||||
"data": {
|
||||
"title": f"Calling tool: {tool.name} with inputs: {func_call_args}",
|
||||
},
|
||||
}
|
||||
|
||||
def _is_output_serializable(self, output: Any) -> bool:
|
||||
try:
|
||||
json.dumps(output)
|
||||
return True
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
def get_agent_tool_response(self) -> dict | None:
|
||||
response = self.payload.get("response")
|
||||
if response is not None:
|
||||
sources = response.sources
|
||||
for source in sources:
|
||||
# Return the tool response here to include the toolCall information
|
||||
if isinstance(source, ToolOutput):
|
||||
if self._is_output_serializable(source.raw_output):
|
||||
output = source.raw_output
|
||||
else:
|
||||
output = source.content
|
||||
|
||||
return {
|
||||
"type": "tools",
|
||||
"data": {
|
||||
"toolOutput": {
|
||||
"output": output,
|
||||
"isError": source.is_error,
|
||||
},
|
||||
"toolCall": {
|
||||
"id": None, # There is no tool id in the ToolOutput
|
||||
"name": source.tool_name,
|
||||
"input": source.raw_input,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def to_response(self):
|
||||
try:
|
||||
match self.event_type:
|
||||
case "retrieve":
|
||||
return self.get_retrieval_message()
|
||||
case "function_call":
|
||||
return self.get_tool_message()
|
||||
case "agent_step":
|
||||
return self.get_agent_tool_response()
|
||||
case _:
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error in converting event to response: {e}")
|
||||
return None
|
||||
|
||||
|
||||
class EventCallbackHandler(BaseCallbackHandler):
    """
    Callback handler that queues events which can be rendered as a response
    message, and exposes them through `async_event_gen`.
    """

    _aqueue: asyncio.Queue
    # Set to True by the consumer to stop `async_event_gen` once the queue is drained
    is_done: bool = False

    def __init__(
        self,
    ):
        """Initialize the base callback handler."""
        ignored_events = [
            CBEventType.CHUNKING,
            CBEventType.NODE_PARSING,
            CBEventType.EMBEDDING,
            CBEventType.LLM,
            CBEventType.TEMPLATING,
        ]
        # The same event types are ignored for both event starts and event ends
        super().__init__(ignored_events, ignored_events)
        self._aqueue = asyncio.Queue()

    def _enqueue_if_renderable(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]],
        event_id: str,
    ) -> None:
        """Queue the event only when it converts into a response message."""
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        if event.to_response() is not None:
            self._aqueue.put_nowait(event)

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Queue the started event if it is renderable and return its id."""
        self._enqueue_if_renderable(event_type, payload, event_id)
        # The signature promises a str; previously None was returned implicitly
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Queue the finished event if it is renderable."""
        self._enqueue_if_renderable(event_type, payload, event_id)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """No-op."""

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """No-op."""

    async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
        """
        Yield queued events until `is_done` is set and the queue is empty.

        The queue is polled with a short timeout so that a change of
        `is_done` is noticed even when no further event arrives.
        """
        while not self._aqueue.empty() or not self.is_done:
            try:
                yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                pass
|
||||
@@ -0,0 +1,252 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Literal, Optional, Set
|
||||
|
||||
from llama_index.core.llms import ChatMessage, MessageRole
|
||||
from llama_index.core.schema import NodeWithScore
|
||||
from pydantic import BaseModel, Field, validator
|
||||
from pydantic.alias_generators import to_camel
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
|
||||
|
||||
# Content of an uploaded file: either inline text or references to documents.
class FileContent(BaseModel):
    type: Literal["text", "ref"]
    # If the file is pure text then the value is a string;
    # otherwise, it's a list of document IDs
    value: str | List[str]
|
||||
|
||||
|
||||
# Metadata and content of a single file attached to a chat message.
class File(BaseModel):
    id: str
    content: FileContent
    filename: str
    filesize: int  # NOTE(review): presumably the size in bytes — confirm with the client
    filetype: str  # compared against "csv" by Annotation.to_content
|
||||
|
||||
|
||||
# Data carried by a file annotation: the list of files attached to a message.
class AnnotationFileData(BaseModel):
    files: List[File] = Field(
        default=[],
        description="List of files",
    )

    class Config:
        # Schema example; it mirrors the `File` / `FileContent` models
        # (the previous example used a `csvFiles` key and a flat `content`
        # string that this model does not define).
        json_schema_extra = {
            "example": {
                "files": [
                    {
                        "content": {
                            "type": "text",
                            "value": "Name, Age\nAlice, 25\nBob, 30",
                        },
                        "filename": "example.csv",
                        "filesize": 123,
                        "id": "123",
                        "filetype": "csv",
                    }
                ]
            }
        }
        alias_generator = to_camel
|
||||
|
||||
|
||||
# An annotation attached to a chat message; `data` is either file data or a
# plain list of strings.
class Annotation(BaseModel):
    type: str
    data: AnnotationFileData | List[str]

    def to_content(self) -> str | None:
        """
        Render the annotation as extra context for the chat message.

        Returns:
            The raw content of all attached CSV files wrapped in ```csv
            fences, or None when the annotation type is unsupported, the data
            is not file data, or no CSV file is attached.
        """
        if self.type != "document_file":
            logger.warning(
                f"The annotation {self.type} is not supported for generating context content"
            )
            return None

        # `data` may also be a plain list of strings, which has no `files`
        if not isinstance(self.data, AnnotationFileData):
            logger.warning(
                "The document_file annotation does not contain file data; skipping it"
            )
            return None

        # We only support generating context content for CSV files for now
        csv_files = [file for file in self.data.files if file.filetype == "csv"]
        if not csv_files:
            return None
        return "Use data from following CSV raw content\n" + "\n".join(
            [f"```csv\n{csv_file.content.value}\n```" for csv_file in csv_files]
        )
|
||||
|
||||
|
||||
# A single chat message with an optional list of annotations.
class Message(BaseModel):
    role: MessageRole
    content: str
    # None when the message carries no annotations
    annotations: List[Annotation] | None = None
|
||||
|
||||
|
||||
# Request body of the chat endpoints: the conversation plus optional extra data.
class ChatData(BaseModel):
    messages: List[Message]
    data: Any = None

    class Config:
        json_schema_extra = {
            "example": {
                "messages": [
                    {
                        "role": "user",
                        "content": "What standards for letters exist?",
                    }
                ]
            }
        }

    @validator("messages")
    def messages_must_not_be_empty(cls, v):
        """Reject requests that contain no message."""
        if len(v) == 0:
            raise ValueError("Messages must not be empty")
        return v

    def get_last_message_content(self) -> str:
        """
        Get the content of the last message along with the data content if available.
        Fallback to use data content from previous messages

        Raises:
            ValueError: when there is no message in the chat.
        """
        if len(self.messages) == 0:
            raise ValueError("There is not any message in the chat")
        last_message = self.messages[-1]
        message_content = last_message.content
        for message in reversed(self.messages):
            if message.role != MessageRole.USER or message.annotations is None:
                continue
            # Materialise the contents: a lazy `filter` object is always truthy,
            # which would defeat the fallback to earlier messages below.
            annotation_contents = [
                content
                for content in (
                    annotation.to_content() for annotation in message.annotations
                )
                if content
            ]
            if not annotation_contents:
                continue
            annotation_text = "\n".join(annotation_contents)
            message_content = f"{message_content}\n{annotation_text}"
            break
        return message_content

    def get_history_messages(self) -> List[ChatMessage]:
        """
        Get the history messages (every message except the last one)
        """
        return [
            ChatMessage(role=message.role, content=message.content)
            for message in self.messages[:-1]
        ]

    def is_last_message_from_user(self) -> bool:
        """Tell whether the last message has the user role."""
        return self.messages[-1].role == MessageRole.USER

    def get_chat_document_ids(self) -> List[str]:
        """
        Get the document IDs from the chat messages

        Only "ref" files of `document_file` annotations on user messages are
        considered. Duplicates are removed and the order of first appearance
        is kept.
        """
        document_ids: List[str] = []
        for message in self.messages:
            if message.role != MessageRole.USER or message.annotations is None:
                continue
            for annotation in message.annotations:
                # `data` can also be a plain list of strings without `files`
                if annotation.type != "document_file" or not isinstance(
                    annotation.data, AnnotationFileData
                ):
                    continue
                for fi in annotation.data.files or []:
                    if fi.content.type != "ref":
                        continue
                    value = fi.content.value
                    if isinstance(value, str):
                        # A bare string is one id; extending the list with it
                        # would add its individual characters.
                        document_ids.append(value)
                    else:
                        document_ids.extend(value)
        # dict.fromkeys de-duplicates while keeping a deterministic order
        return list(dict.fromkeys(document_ids))
|
||||
|
||||
|
||||
# Identifies a file of a LlamaCloud pipeline. Equality and hashing are based
# on (file_name, pipeline_id) so instances can be de-duplicated in a set.
class LlamaCloudFile(BaseModel):
    file_name: str
    pipeline_id: str

    def __eq__(self, other):
        if isinstance(other, LlamaCloudFile):
            same_name = self.file_name == other.file_name
            return same_name and self.pipeline_id == other.pipeline_id
        # Let Python try the reflected comparison for foreign types
        return NotImplemented

    def __hash__(self):
        identity = (self.file_name, self.pipeline_id)
        return hash(identity)
|
||||
|
||||
|
||||
# Serializable view of a retrieved source node, including a URL to its file.
class SourceNodes(BaseModel):
    id: str
    metadata: Dict[str, Any]
    score: Optional[float]
    text: str
    url: Optional[str]

    @classmethod
    def from_source_node(cls, source_node: NodeWithScore):
        """Build an instance from a scored node, deriving the URL from its metadata."""
        metadata = source_node.node.metadata
        url = cls.get_url_from_metadata(metadata)

        return cls(
            id=source_node.node.node_id,
            metadata=metadata,
            score=source_node.score,
            text=source_node.node.text,  # type: ignore
            url=url,
        )

    @classmethod
    def get_url_from_metadata(cls, metadata: Dict[str, Any]) -> Optional[str]:
        """
        Resolve the URL under which the node's file can be fetched.

        Returns:
            A file-server URL when `file_name` is present and
            `FILESERVER_URL_PREFIX` is set; otherwise the `URL` metadata
            entry, which may be None.
        """
        url_prefix = os.getenv("FILESERVER_URL_PREFIX")
        if not url_prefix:
            logger.warning(
                "Warning: FILESERVER_URL_PREFIX not set in environment variables. Can't use file server"
            )
        file_name = metadata.get("file_name")

        if file_name and url_prefix:
            # file_name exists and file server is configured
            pipeline_id = metadata.get("pipeline_id")
            if pipeline_id and metadata.get("private") is None:
                # file is from LlamaCloud and was not ingested locally
                file_name = f"{pipeline_id}${file_name}"
                return f"{url_prefix}/output/llamacloud/{file_name}"
            is_private = metadata.get("private", "false") == "true"
            if is_private:
                return f"{url_prefix}/output/uploaded/{file_name}"
            return f"{url_prefix}/data/{file_name}"
        else:
            # fallback to URL in metadata (e.g. for websites)
            return metadata.get("URL")

    @classmethod
    def from_source_nodes(cls, source_nodes: List[NodeWithScore]):
        """Convert every scored node with `from_source_node`."""
        return [cls.from_source_node(node) for node in source_nodes]

    @staticmethod
    def get_download_files(nodes: List[NodeWithScore]) -> Set[LlamaCloudFile]:
        """
        Collect the distinct LlamaCloud files referenced by the nodes.

        A node counts when its metadata has no `private` entry and has both a
        `pipeline_id` and a `file_name`.
        """
        source_nodes = SourceNodes.from_source_nodes(nodes)
        # A set comprehension removes duplicates while collecting
        return {
            LlamaCloudFile(
                file_name=node.metadata.get("file_name"),
                pipeline_id=node.metadata.get("pipeline_id"),
            )
            for node in source_nodes
            if (
                # Only download files that come from LlamaCloud and were not ingested locally
                node.metadata.get("private") is None
                and node.metadata.get("pipeline_id") is not None
                and node.metadata.get("file_name") is not None
            )
        }
|
||||
|
||||
|
||||
# Response body of the non-streaming chat endpoint.
class Result(BaseModel):
    result: Message  # the assistant's answer
    nodes: List[SourceNodes]  # source nodes of the chat engine response
|
||||
|
||||
|
||||
# Chat configuration returned by the `/config` endpoint.
class ChatConfig(BaseModel):
    # Serialized under the camelCase key `starterQuestions`
    starter_questions: Optional[List[str]] = Field(
        default=None,
        description="List of starter questions",
        serialization_alias="starterQuestions",
    )

    class Config:
        # Example payload attached to the generated JSON schema
        json_schema_extra = {
            "example": {
                "starterQuestions": [
                    "What standards for letters exist?",
                    "What are the requirements for a letter to be considered a letter?",
                ]
            }
        }
|
||||
@@ -0,0 +1,25 @@
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.api.services.file import PrivateFileService
|
||||
|
||||
file_upload_router = r = APIRouter()
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
|
||||
|
||||
# Request body of the file upload endpoint.
class FileUploadRequest(BaseModel):
    # Passed unchanged to PrivateFileService.process_file
    base64: str
|
||||
|
||||
|
||||
@r.post("")
def upload_file(request: FileUploadRequest) -> List[str]:
    """
    Process an uploaded file and return the result of
    `PrivateFileService.process_file`.

    Raises:
        HTTPException: with status 500 when processing fails. The underlying
            error is logged but not exposed to the client.
    """
    try:
        logger.info("Processing file")
        return PrivateFileService.process_file(request.base64)
    except Exception as e:
        logger.error(f"Error processing file: {e}", exc_info=True)
        # Chain the cause so the original error stays attached to the HTTP error
        raise HTTPException(status_code=500, detail="Error processing file") from e
|
||||
@@ -0,0 +1,109 @@
|
||||
import json
|
||||
|
||||
from aiostream import stream
|
||||
from fastapi import Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from llama_index.core.chat_engine.types import StreamingAgentChatResponse
|
||||
|
||||
from app.api.routers.events import EventCallbackHandler
|
||||
from app.api.routers.models import ChatData, Message, SourceNodes
|
||||
from app.api.services.suggestion import NextQuestionSuggestion
|
||||
|
||||
|
||||
class VercelStreamResponse(StreamingResponse):
    """
    Class to convert the response from the chat engine to the streaming format expected by Vercel
    """

    # Every streamed line starts with one of these prefixes (see convert_text / convert_data)
    TEXT_PREFIX = "0:"
    DATA_PREFIX = "8:"

    @classmethod
    def convert_text(cls, token: str):
        """Encode a text token as one line: TEXT_PREFIX + JSON string + newline."""
        # Escape newlines and double quotes to avoid breaking the stream
        token = json.dumps(token)
        return f"{cls.TEXT_PREFIX}{token}\n"

    @classmethod
    def convert_data(cls, data: dict):
        """Encode a data message as one line: DATA_PREFIX + one-element JSON array + newline."""
        data_str = json.dumps(data)
        return f"{cls.DATA_PREFIX}[{data_str}]\n"

    def __init__(
        self,
        request: Request,
        event_handler: EventCallbackHandler,
        response: StreamingAgentChatResponse,
        chat_data: ChatData,
    ):
        """Create the streaming response from the output of `content_generator`."""
        content = VercelStreamResponse.content_generator(
            request, event_handler, response, chat_data
        )
        super().__init__(content=content)

    @classmethod
    async def content_generator(
        cls,
        request: Request,
        event_handler: EventCallbackHandler,
        response: StreamingAgentChatResponse,
        chat_data: ChatData,
    ):
        """
        Merge the text stream and the event stream into one stream of encoded lines.

        An empty text message is emitted before the first output, and
        streaming stops once the client has disconnected.
        """
        # Yield the text response
        async def _chat_response_generator():
            final_response = ""
            async for token in response.async_response_gen():
                final_response += token
                yield VercelStreamResponse.convert_text(token)

            # Generate questions that the user might be interested in
            conversation = chat_data.messages + [
                Message(role="assistant", content=final_response)
            ]
            questions = await NextQuestionSuggestion.suggest_next_questions(
                conversation
            )
            if len(questions) > 0:
                yield VercelStreamResponse.convert_data(
                    {
                        "type": "suggested_questions",
                        "data": questions,
                    }
                )

            # the text_generator is the leading stream, once it's finished, also finish the event stream
            event_handler.is_done = True

            # Yield the source nodes
            yield cls.convert_data(
                {
                    "type": "sources",
                    "data": {
                        "nodes": [
                            SourceNodes.from_source_node(node).dict()
                            for node in response.source_nodes
                        ]
                    },
                }
            )

        # Yield the events from the event handler
        async def _event_generator():
            async for event in event_handler.async_event_gen():
                event_response = event.to_response()
                if event_response is not None:
                    yield VercelStreamResponse.convert_data(event_response)

        # Interleave both generators; outputs are forwarded as they become available
        combine = stream.merge(_chat_response_generator(), _event_generator())
        is_stream_started = False
        async with combine.stream() as streamer:
            async for output in streamer:
                if not is_stream_started:
                    is_stream_started = True
                    # Stream a blank message to start the stream
                    yield VercelStreamResponse.convert_text("")

                yield output

                # Stop producing output when the client has gone away
                if await request.is_disconnected():
                    break
|
||||
@@ -0,0 +1,113 @@
|
||||
import base64
|
||||
import mimetypes
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from uuid import uuid4
|
||||
|
||||
from app.engine.index import get_index
|
||||
from llama_index.core import VectorStoreIndex
|
||||
from llama_index.core.ingestion import IngestionPipeline
|
||||
from llama_index.core.readers.file.base import (
|
||||
_try_loading_included_file_formats as get_file_loaders_map,
|
||||
)
|
||||
from llama_index.core.readers.file.base import (
|
||||
default_file_metadata_func,
|
||||
)
|
||||
from llama_index.core.schema import Document
|
||||
from llama_index.indices.managed.llama_cloud.base import LlamaCloudIndex
|
||||
from llama_index.readers.file import FlatReader
|
||||
|
||||
|
||||
def get_llamaparse_parser():
    """
    Return a LlamaParse parser when the file loader config enables it.

    Returns None when `use_llama_parse` is not set in the "file" section of
    the loader configuration.
    """
    # Imported inside the function, as in the rest of this module's usage
    from app.engine.loaders import load_configs
    from app.engine.loaders.file import FileLoaderConfig, llama_parse_parser

    loader_settings = FileLoaderConfig(**load_configs()["file"])
    if not loader_settings.use_llama_parse:
        return None
    return llama_parse_parser()
|
||||
|
||||
|
||||
def default_file_loaders_map():
    """
    Return the default extension-to-reader map, with `.txt` mapped to
    `FlatReader`.
    """
    loaders_by_extension = get_file_loaders_map()
    loaders_by_extension.update({".txt": FlatReader})
    return loaders_by_extension
|
||||
|
||||
|
||||
class PrivateFileService:
    """Stores uploaded files, parses them into documents and indexes them."""

    PRIVATE_STORE_PATH = "output/uploaded"

    @staticmethod
    def preprocess_base64_file(base64_content: str) -> tuple:
        """
        Split an encoded upload into its raw bytes and a file extension.

        The input is expected to consist of a header and the base64 payload
        separated by the first comma; the MIME type is read from the header
        (the text between the first ":" and the first ";").

        Returns:
            (file bytes, extension guessed from the MIME type). The extension
            is None when the MIME type is unknown to `mimetypes`.
        """
        header, data = base64_content.split(",", 1)
        mime_type = header.split(";")[0].split(":", 1)[1]
        extension = mimetypes.guess_extension(mime_type)
        # File data as bytes
        return base64.b64decode(data), extension

    @staticmethod
    def store_and_parse_file(file_data, extension) -> List[Document]:
        """
        Write the file into the private store and load it as documents.

        Raises:
            ValueError: when LlamaParse is disabled and no default reader is
                registered for the extension.
        """
        # Store file to the private directory
        os.makedirs(PrivateFileService.PRIVATE_STORE_PATH, exist_ok=True)

        # random file name
        file_name = f"{uuid4().hex}{extension}"
        file_path = Path(PrivateFileService.PRIVATE_STORE_PATH) / file_name

        # write file
        file_path.write_bytes(file_data)

        # Load file to documents
        # If LlamaParse is enabled, use it to parse the file
        # Otherwise, use the default file loaders
        reader = get_llamaparse_parser()
        if reader is None:
            reader_cls = default_file_loaders_map().get(extension)
            if reader_cls is None:
                raise ValueError(f"File extension {extension} is not supported")
            reader = reader_cls()
        documents = reader.load_data(file_path)
        # Add custom metadata
        for doc in documents:
            doc.metadata["file_name"] = file_name
            doc.metadata["private"] = "true"
        return documents

    @staticmethod
    def process_file(base64_content: str) -> List[str]:
        """
        Store, parse and index an uploaded file.

        Returns:
            The ids of the documents created from the file.
        """
        file_data, extension = PrivateFileService.preprocess_base64_file(base64_content)
        documents = PrivateFileService.store_and_parse_file(file_data, extension)

        current_index = get_index()

        # Insert the documents into the index
        if isinstance(current_index, LlamaCloudIndex):
            # LlamaCloudIndex is a managed index so we don't need to process the nodes
            # just insert the documents
            for doc in documents:
                current_index.insert(doc)
        else:
            # Run the ingestion pipeline once, and only on this branch: the
            # managed index above never uses the resulting nodes.
            pipeline = IngestionPipeline()
            nodes = pipeline.run(documents=documents)

            # Add the nodes to the index and persist it
            if current_index is None:
                current_index = VectorStoreIndex(nodes=nodes)
            else:
                current_index.insert_nodes(nodes=nodes)
            current_index.storage_context.persist(
                persist_dir=os.environ.get("STORAGE_DIR", "storage")
            )

        # Return the document ids
        return [doc.doc_id for doc in documents]
|
||||
@@ -0,0 +1,114 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import requests
|
||||
from app.api.routers.models import LlamaCloudFile
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
|
||||
|
||||
class LLamaCloudFileService:
|
||||
LLAMA_CLOUD_URL = "https://cloud.llamaindex.ai/api/v1"
|
||||
LOCAL_STORE_PATH = "output/llamacloud"
|
||||
|
||||
DOWNLOAD_FILE_NAME_TPL = "{pipeline_id}${filename}"
|
||||
|
||||
@classmethod
def get_all_projects(cls) -> List[Dict[str, Any]]:
    """Request the `/projects` resource of the LlamaCloud API."""
    return cls._make_request(f"{cls.LLAMA_CLOUD_URL}/projects")
|
||||
|
||||
@classmethod
def get_all_pipelines(cls) -> List[Dict[str, Any]]:
    """Request the `/pipelines` resource of the LlamaCloud API."""
    return cls._make_request(f"{cls.LLAMA_CLOUD_URL}/pipelines")
|
||||
|
||||
@classmethod
def get_all_projects_with_pipelines(cls) -> List[Dict[str, Any]]:
    """
    Return every project extended with a `pipelines` list holding the
    pipelines whose `project_id` matches the project's `id`.

    Any error is logged and an empty list is returned instead.
    """
    try:
        projects = cls.get_all_projects()
        pipelines = cls.get_all_pipelines()
        combined = []
        for project in projects:
            owned = [p for p in pipelines if p["project_id"] == project["id"]]
            combined.append({**project, "pipelines": owned})
        return combined
    except Exception as error:
        logger.error(f"Error listing projects and pipelines: {error}")
        return []
|
||||
|
||||
@classmethod
|
||||
def _get_files(cls, pipeline_id: str) -> List[Dict[str, Any]]:
|
||||
url = f"{cls.LLAMA_CLOUD_URL}/pipelines/{pipeline_id}/files"
|
||||
return cls._make_request(url)
|
||||
|
||||
@classmethod
|
||||
def _get_file_detail(cls, project_id: str, file_id: str) -> Dict[str, Any]:
|
||||
url = f"{cls.LLAMA_CLOUD_URL}/files/{file_id}/content?project_id={project_id}"
|
||||
return cls._make_request(url)
|
||||
|
||||
@classmethod
|
||||
def _download_file(cls, url: str, local_file_path: str):
|
||||
logger.info(f"Downloading file to {local_file_path}")
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(cls.LOCAL_STORE_PATH, exist_ok=True)
|
||||
# Download the file
|
||||
with requests.get(url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(local_file_path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
logger.info("File downloaded successfully")
|
||||
|
||||
@classmethod
|
||||
def download_llamacloud_pipeline_file(
|
||||
cls,
|
||||
file: LlamaCloudFile,
|
||||
force_download: bool = False,
|
||||
):
|
||||
file_name = file.file_name
|
||||
pipeline_id = file.pipeline_id
|
||||
|
||||
# Check is the file already exists
|
||||
downloaded_file_path = cls.get_file_path(file_name, pipeline_id)
|
||||
if os.path.exists(downloaded_file_path) and not force_download:
|
||||
logger.debug(f"File {file_name} already exists in local storage")
|
||||
return
|
||||
try:
|
||||
logger.info(f"Downloading file {file_name} for pipeline {pipeline_id}")
|
||||
files = cls._get_files(pipeline_id)
|
||||
if not files or not isinstance(files, list):
|
||||
raise Exception("No files found in LlamaCloud")
|
||||
for file_entry in files:
|
||||
if file_entry["name"] == file_name:
|
||||
file_id = file_entry["file_id"]
|
||||
project_id = file_entry["project_id"]
|
||||
file_detail = cls._get_file_detail(project_id, file_id)
|
||||
cls._download_file(file_detail["url"], downloaded_file_path)
|
||||
break
|
||||
except Exception as error:
|
||||
logger.info(f"Error fetching file from LlamaCloud: {error}")
|
||||
|
||||
@classmethod
|
||||
def get_file_name(cls, name: str, pipeline_id: str) -> str:
|
||||
return cls.DOWNLOAD_FILE_NAME_TPL.format(pipeline_id=pipeline_id, filename=name)
|
||||
|
||||
@classmethod
|
||||
def get_file_path(cls, name: str, pipeline_id: str) -> str:
|
||||
return os.path.join(cls.LOCAL_STORE_PATH, cls.get_file_name(name, pipeline_id))
|
||||
|
||||
@staticmethod
|
||||
def _make_request(
|
||||
url: str, data=None, headers: Optional[Dict] = None, method: str = "get"
|
||||
):
|
||||
if headers is None:
|
||||
headers = {
|
||||
"Accept": "application/json",
|
||||
"Authorization": f'Bearer {os.getenv("LLAMA_CLOUD_API_KEY")}',
|
||||
}
|
||||
response = requests.request(method, url, headers=headers, data=data)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
@@ -0,0 +1,48 @@
|
||||
from typing import List
|
||||
|
||||
from app.api.routers.models import Message
|
||||
from llama_index.core.prompts import PromptTemplate
|
||||
from llama_index.core.settings import Settings
|
||||
from pydantic import BaseModel
|
||||
|
||||
# Template variables: {conversation} and {number_of_questions}; both must be
# supplied as keyword arguments when the prompt is formatted.
NEXT_QUESTIONS_SUGGESTION_PROMPT = PromptTemplate(
    "You're a helpful assistant! Your task is to suggest the next question that user might ask. "
    "\nHere is the conversation history"
    "\n---------------------\n{conversation}\n---------------------"
    "\nGiven the conversation history, please give me {number_of_questions} questions that you might ask next!"
)
N_QUESTION_TO_GENERATE = 3


class NextQuestions(BaseModel):
    """A list of questions that user might ask next"""

    questions: List[str]


class NextQuestionSuggestion:
    """Generates follow-up question suggestions from a chat history."""

    @staticmethod
    async def suggest_next_questions(
        messages: List[Message],
        number_of_questions: int = N_QUESTION_TO_GENERATE,
    ) -> List[str]:
        """Ask the configured LLM for questions the user might ask next.

        Args:
            messages: Chat history, oldest first.
            number_of_questions: How many suggestions to request.

        Returns:
            The ``questions`` list of the structured LLM output.
        """
        # Reduce the cost by only using the last two messages
        last_user_message = None
        last_assistant_message = None
        for message in reversed(messages):
            # Keep only the most recent message of each role; never let an
            # older message overwrite a newer one.
            if message.role == "user" and last_user_message is None:
                last_user_message = f"User: {message.content}"
            elif message.role == "assistant" and last_assistant_message is None:
                last_assistant_message = f"Assistant: {message.content}"
            if last_user_message and last_assistant_message:
                break
        # Skip a missing side instead of rendering the literal text "None"
        conversation: str = "\n".join(
            part for part in (last_user_message, last_assistant_message) if part
        )

        output: NextQuestions = await Settings.llm.astructured_predict(
            NextQuestions,
            prompt=NEXT_QUESTIONS_SUGGESTION_PROMPT,
            conversation=conversation,
            # Must match the {number_of_questions} template variable
            number_of_questions=number_of_questions,
        )

        return output.questions
|
||||
@@ -0,0 +1,31 @@
|
||||
import os
|
||||
from llama_index.core.settings import Settings
|
||||
from llama_index.core.agent import AgentRunner
|
||||
from llama_index.core.tools.query_engine import QueryEngineTool
|
||||
from app.engine.tools import ToolFactory
|
||||
from app.engine.index import get_index
|
||||
|
||||
|
||||
def get_chat_engine(filters=None, params=None):
    """Build the agent used to answer chat requests.

    Args:
        filters: Metadata filters forwarded to the index query engine.
        params: Per-request configuration forwarded to ``get_index``.

    Returns:
        An ``AgentRunner`` with the query engine tool (when an index is
        available) plus every tool returned by ``ToolFactory.from_env()``.
    """
    system_prompt = os.getenv("SYSTEM_PROMPT")
    top_k = os.getenv("TOP_K", "3")
    tools = []

    # Add query tool if index exists; pass params through so the
    # request-specific configuration is not silently ignored
    index = get_index(params)
    if index is not None:
        query_engine = index.as_query_engine(
            similarity_top_k=int(top_k), filters=filters
        )
        query_engine_tool = QueryEngineTool.from_defaults(query_engine=query_engine)
        tools.append(query_engine_tool)

    # Add additional tools
    tools += ToolFactory.from_env()

    return AgentRunner.from_llm(
        llm=Settings.llm,
        tools=tools,
        system_prompt=system_prompt,
        verbose=True,
    )
|
||||
@@ -0,0 +1,51 @@
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
import os
|
||||
import logging
|
||||
from app.settings import init_settings
|
||||
from app.engine.loaders import get_documents
|
||||
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def generate_datasource():
    """Load the configured documents and upload them to a LlamaCloud index.

    Raises:
        ValueError: If the index name, project name or API key is not set.
    """
    init_settings()
    logger.info("Generate index for the provided data")

    index_name = os.getenv("LLAMA_CLOUD_INDEX_NAME")
    project = os.getenv("LLAMA_CLOUD_PROJECT_NAME")
    key = os.getenv("LLAMA_CLOUD_API_KEY")
    cloud_url = os.getenv("LLAMA_CLOUD_BASE_URL")
    org_id = os.getenv("LLAMA_CLOUD_ORGANIZATION_ID")

    # The three required settings must all be present
    if any(setting is None for setting in (index_name, project, key)):
        raise ValueError(
            "Please set LLAMA_CLOUD_INDEX_NAME, LLAMA_CLOUD_PROJECT_NAME and LLAMA_CLOUD_API_KEY"
            " to your environment variables or config them in .env file"
        )

    loaded_documents = get_documents()

    # Set private=false to mark the document as public (required for filtering)
    for document in loaded_documents:
        document.metadata["private"] = "false"

    LlamaCloudIndex.from_documents(
        documents=loaded_documents,
        name=index_name,
        project_name=project,
        api_key=key,
        base_url=cloud_url,
        organization_id=org_id,
    )

    logger.info("Finished generating the index")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate_datasource()
|
||||
@@ -0,0 +1,31 @@
|
||||
import logging
|
||||
import os
|
||||
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
|
||||
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
|
||||
def get_index(params=None):
    """Create a ``LlamaCloudIndex`` from request params and environment variables.

    Args:
        params: Optional mapping; its ``"llamaCloudPipeline"`` entry may carry
            ``"pipeline"`` and ``"project"`` values that take precedence over
            the ``LLAMA_CLOUD_INDEX_NAME`` / ``LLAMA_CLOUD_PROJECT_NAME``
            environment variables.

    Raises:
        ValueError: If the index name, project name or API key is missing.
    """
    pipeline_config = (params or {}).get("llamaCloudPipeline", {})
    name = pipeline_config.get("pipeline", os.getenv("LLAMA_CLOUD_INDEX_NAME"))
    project_name = pipeline_config.get("project", os.getenv("LLAMA_CLOUD_PROJECT_NAME"))
    api_key = os.getenv("LLAMA_CLOUD_API_KEY")

    if any(setting is None for setting in (name, project_name, api_key)):
        raise ValueError(
            "Please set LLAMA_CLOUD_INDEX_NAME, LLAMA_CLOUD_PROJECT_NAME and LLAMA_CLOUD_API_KEY"
            " to your environment variables or config them in .env file"
        )

    return LlamaCloudIndex(
        name=name,
        project_name=project_name,
        api_key=api_key,
        base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
        organization_id=os.getenv("LLAMA_CLOUD_ORGANIZATION_ID"),
    )
|
||||
@@ -0,0 +1,37 @@
|
||||
import logging
|
||||
|
||||
import yaml
|
||||
from app.engine.loaders.db import DBLoaderConfig, get_db_documents
|
||||
from app.engine.loaders.file import FileLoaderConfig, get_file_documents
|
||||
from app.engine.loaders.web import WebLoaderConfig, get_web_documents
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_configs():
    """Parse ``config/loaders.yaml`` and return its content."""
    with open("config/loaders.yaml") as stream:
        return yaml.safe_load(stream)
|
||||
|
||||
|
||||
def get_documents():
    """Run every loader listed in the loader config and return all documents.

    Raises:
        ValueError: If the config names a loader type other than
            ``file``, ``web`` or ``db``.
    """
    collected = []
    for loader_type, loader_config in load_configs().items():
        logger.info(
            f"Loading documents from loader: {loader_type}, config: {loader_config}"
        )
        if loader_type == "file":
            loaded = get_file_documents(FileLoaderConfig(**loader_config))
        elif loader_type == "web":
            loaded = get_web_documents(WebLoaderConfig(**loader_config))
        elif loader_type == "db":
            # The db section is a list of configs, one per database
            db_configs = [DBLoaderConfig(**cfg) for cfg in loader_config]
            loaded = get_db_documents(configs=db_configs)
        else:
            raise ValueError(f"Invalid loader type: {loader_type}")
        collected.extend(loaded)

    return collected
|
||||
@@ -0,0 +1,26 @@
|
||||
import os
|
||||
import logging
|
||||
from typing import List
|
||||
from pydantic import BaseModel, validator
|
||||
from llama_index.core.indices.vector_store import VectorStoreIndex
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DBLoaderConfig(BaseModel):
    """Settings for loading documents from one database."""

    # Connection URI handed to the database reader
    uri: str
    # Queries executed against that database, one load per query
    queries: List[str]
|
||||
|
||||
|
||||
def get_db_documents(configs: list[DBLoaderConfig]):
    """Load documents for every query of every database config.

    Args:
        configs: Database configs, each with a connection ``uri`` and a list
            of ``queries``.

    Returns:
        All documents produced by all queries, in execution order. Empty when
        there are no configs or no queries.
    """
    from llama_index.readers.database import DatabaseReader

    docs = []
    for entry in configs:
        loader = DatabaseReader(uri=entry.uri)
        for query in entry.queries:
            logger.info(f"Loading data from database with query: {query}")
            documents = loader.load_data(query=query)
            docs.extend(documents)

    # Return the accumulated list, not just the result of the last query
    return docs
|
||||
@@ -0,0 +1,79 @@
|
||||
import os
|
||||
import logging
|
||||
from typing import Dict
|
||||
from llama_parse import LlamaParse
|
||||
from pydantic import BaseModel, validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FileLoaderConfig(BaseModel):
    """Settings for loading documents from a local directory."""

    # Directory that is read; validated to exist
    data_dir: str = "data"
    # Whether files are parsed through LlamaParse
    use_llama_parse: bool = False

    @validator("data_dir")
    def data_dir_must_exist(cls, v):
        """Reject a ``data_dir`` that is not an existing directory."""
        if os.path.isdir(v):
            return v
        raise ValueError(f"Directory '{v}' does not exist")
|
||||
|
||||
|
||||
def llama_parse_parser():
    """Create a ``LlamaParse`` parser producing English markdown output.

    Raises:
        ValueError: If ``LLAMA_CLOUD_API_KEY`` is not set in the environment.
    """
    api_key_present = os.getenv("LLAMA_CLOUD_API_KEY") is not None
    if not api_key_present:
        raise ValueError(
            "LLAMA_CLOUD_API_KEY environment variable is not set. "
            "Please set it in .env file or in your shell environment then run again!"
        )
    return LlamaParse(
        result_type="markdown",
        verbose=True,
        language="en",
        ignore_errors=False,
    )
|
||||
|
||||
|
||||
def llama_parse_extractor() -> Dict[str, LlamaParse]:
    """Map every file type supported by LlamaParse to one shared parser instance."""
    from llama_parse.utils import SUPPORTED_FILE_TYPES

    shared_parser = llama_parse_parser()
    # All keys deliberately reference the same parser object
    return dict.fromkeys(SUPPORTED_FILE_TYPES, shared_parser)
|
||||
|
||||
|
||||
def get_file_documents(config: FileLoaderConfig):
    """Read all files below ``config.data_dir`` into documents.

    Returns an empty list (with a warning) when the exception originated in a
    function named ``_add_files``, which is treated as the "data dir is empty"
    case; every other error is re-raised.
    """
    from llama_index.core.readers import SimpleDirectoryReader

    try:
        extractors = None
        if config.use_llama_parse:
            # LlamaParse is async first,
            # so we need to use nest_asyncio to run it in sync mode
            import nest_asyncio

            nest_asyncio.apply()

            extractors = llama_parse_extractor()
        directory_reader = SimpleDirectoryReader(
            config.data_dir,
            recursive=True,
            filename_as_id=True,
            raise_on_error=True,
            file_extractor=extractors,
        )
        return directory_reader.load_data()
    except Exception as e:
        import sys
        import traceback

        # Inspect the innermost frame of the traceback to tell the
        # empty-data-dir failure apart from every other error
        innermost_frame = traceback.extract_tb(sys.exc_info()[2])[-1]
        if innermost_frame.name != "_add_files":
            # Raise the error if it is not the case of empty data dir
            raise e
        logger.warning(
            f"Failed to load file documents, error message: {e} . Return as empty document list."
        )
        return []
|
||||
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
import json
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class CrawlUrl(BaseModel):
    """One crawl target for the web loader."""

    # URL handed to the reader's load_data call
    base_url: str
    # Prefix passed to the site reader
    prefix: str
    # Crawl depth passed to the site reader; must be >= 0
    max_depth: int = Field(default=1, ge=0)
|
||||
|
||||
|
||||
class WebLoaderConfig(BaseModel):
    """Settings for the web loader."""

    # Extra command-line arguments for the Chrome driver; None means no extras
    driver_arguments: list[str] = Field(default=None)
    # Sites to crawl
    urls: list[CrawlUrl]
|
||||
|
||||
|
||||
def get_web_documents(config: WebLoaderConfig):
    """Crawl every configured URL with a Chrome-driven ``WholeSiteReader``.

    A new Chrome driver is created for each URL, all sharing the same options.
    """
    from llama_index.readers.web import WholeSiteReader
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options

    chrome_options = Options()
    for argument in config.driver_arguments or []:
        chrome_options.add_argument(argument)

    crawled = []
    for target in config.urls:
        driver = webdriver.Chrome(options=chrome_options)
        reader = WholeSiteReader(
            prefix=target.prefix,
            max_depth=target.max_depth,
            driver=driver,
        )
        crawled.extend(reader.load_data(target.base_url))

    return crawled
|
||||
@@ -0,0 +1,56 @@
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
import importlib
|
||||
from cachetools import cached, LRUCache
|
||||
from llama_index.core.tools.tool_spec.base import BaseToolSpec
|
||||
from llama_index.core.tools.function_tool import FunctionTool
|
||||
|
||||
|
||||
class ToolType:
    """Names of the supported tool sources (keys of the tool config file)."""

    # Tools shipped as llama_index tool packages
    LLAMAHUB = "llamahub"
    # Tools implemented inside this application
    LOCAL = "local"
|
||||
|
||||
|
||||
class ToolFactory:
    """Loads agent tools declared in ``config/tools.yaml``."""

    # Python package that hosts the tool modules of each tool source
    TOOL_SOURCE_PACKAGE_MAP = {
        ToolType.LLAMAHUB: "llama_index.tools",
        ToolType.LOCAL: "app.engine.tools",
    }

    @staticmethod
    def load_tools(tool_type: str, tool_name: str, config: dict) -> list[FunctionTool]:
        """Import one tool module and return the tools it provides.

        Args:
            tool_type: Key of ``TOOL_SOURCE_PACKAGE_MAP``.
            tool_name: ``"<package>.<ClassName>"`` for a tool spec (name
                contains ``ToolSpec``), otherwise a module exposing
                ``get_tools(**config)``.
            config: Keyword arguments for the tool; ``None`` is treated as
                an empty config.

        Raises:
            KeyError: If ``tool_type`` is unknown.
            ValueError: If the tool cannot be imported or loaded, or the
                module returns objects that are not ``FunctionTool``s.
        """
        source_package = ToolFactory.TOOL_SOURCE_PACKAGE_MAP[tool_type]
        # A YAML entry without settings parses to None
        config = config or {}
        try:
            if "ToolSpec" in tool_name:
                tool_package, tool_cls_name = tool_name.split(".")
                module_name = f"{source_package}.{tool_package}"
                module = importlib.import_module(module_name)
                tool_class = getattr(module, tool_cls_name)
                tool_spec: BaseToolSpec = tool_class(**config)
                return tool_spec.to_tool_list()
            else:
                module = importlib.import_module(f"{source_package}.{tool_name}")
                tools = module.get_tools(**config)
                if not all(isinstance(tool, FunctionTool) for tool in tools):
                    raise ValueError(
                        f"The module {module} does not contain valid tools"
                    )
                return tools
        except ImportError as e:
            raise ValueError(f"Failed to import tool {tool_name}: {e}") from e
        except AttributeError as e:
            raise ValueError(f"Failed to load tool {tool_name}: {e}") from e

    @staticmethod
    def from_env() -> list[FunctionTool]:
        """Load every tool declared in ``config/tools.yaml``.

        Returns an empty list when the file does not exist or is empty.
        """
        tools = []
        if os.path.exists("config/tools.yaml"):
            with open("config/tools.yaml", "r") as f:
                # An empty file parses to None
                tool_configs = yaml.safe_load(f) or {}
                for tool_type, config_entries in tool_configs.items():
                    # A tool type without entries parses to None
                    for tool_name, config in (config_entries or {}).items():
                        tools.extend(
                            ToolFactory.load_tools(tool_type, tool_name, config)
                        )
        return tools
|
||||
@@ -0,0 +1,36 @@
|
||||
from llama_index.core.tools.function_tool import FunctionTool
|
||||
|
||||
|
||||
def duckduckgo_search(
    query: str,
    region: str = "wt-wt",
    max_results: int = 10,
):
    """
    Use this function to search for any query in DuckDuckGo.
    Args:
        query (str): The query to search in DuckDuckGo.
        region Optional(str): The region to be used for the search in [country-language] convention, ex us-en, uk-en, ru-ru, etc...
        max_results Optional(int): The maximum number of results to be returned. Default is 10.
    """
    # The dependency is optional, so it is imported on first use
    try:
        from duckduckgo_search import DDGS
    except ImportError:
        raise ImportError(
            "duckduckgo_search package is required to use this function."
            "Please install it by running: `poetry add duckduckgo_search` or `pip install duckduckgo_search`"
        )

    with DDGS() as client:
        # Materialize the results while the client is still open
        return list(
            client.text(keywords=query, region=region, max_results=max_results)
        )
|
||||
|
||||
|
||||
def get_tools(**kwargs):
    """Return the DuckDuckGo search function wrapped as a tool (``kwargs`` are ignored)."""
    search_tool = FunctionTool.from_defaults(duckduckgo_search)
    return [search_tool]
|
||||
@@ -0,0 +1,108 @@
|
||||
import os
|
||||
import uuid
|
||||
import logging
|
||||
import requests
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from llama_index.core.tools import FunctionTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageGeneratorToolOutput(BaseModel):
    """Result returned by the image generator tool."""

    # Required flag; the two optional fields below default to None
    is_success: bool = Field(
        ..., description="Whether the image generation was successful."
    )
    image_url: Optional[str] = Field(
        None, description="The URL of the generated image."
    )
    error_message: Optional[str] = Field(
        None, description="The error message if the image generation failed."
    )
|
||||
|
||||
|
||||
class ImageGeneratorTool:
    """Generates images through the Stability AI API and stores them locally."""

    _IMG_OUTPUT_FORMAT = "webp"
    _IMG_OUTPUT_DIR = "output/tool"
    _IMG_GEN_API = "https://api.stability.ai/v2beta/stable-image/generate/core"

    def __init__(self, api_key: Optional[str] = None):
        """Read and validate the configuration.

        Args:
            api_key: Stability API key; falls back to the
                ``STABILITY_API_KEY`` environment variable when empty.

        Raises:
            ValueError: If no API key is available or
                ``FILESERVER_URL_PREFIX`` is not set.
        """
        if not api_key:
            api_key = os.getenv("STABILITY_API_KEY")
        self._api_key = api_key
        self.fileserver_url_prefix = os.getenv("FILESERVER_URL_PREFIX")
        if self._api_key is None:
            raise ValueError(
                "STABILITY_API_KEY key is required to run image generator. Get it here: https://platform.stability.ai/account/keys"
            )
        if self.fileserver_url_prefix is None:
            raise ValueError("FILESERVER_URL_PREFIX is required.")

    def _prepare_output_dir(self):
        """
        Create the output directory if it doesn't exist
        """
        # exist_ok makes a separate existence check unnecessary
        os.makedirs(self._IMG_OUTPUT_DIR, exist_ok=True)

    def _save_image(self, image_data: bytes):
        """Write the image under a random file name and return its public URL."""
        self._prepare_output_dir()
        filename = f"{uuid.uuid4()}.{self._IMG_OUTPUT_FORMAT}"
        output_path = os.path.join(self._IMG_OUTPUT_DIR, filename)
        with open(output_path, "wb") as f:
            f.write(image_data)
        # Use the prefix validated in __init__ rather than re-reading the environment
        url = f"{self.fileserver_url_prefix}/{self._IMG_OUTPUT_DIR}/{filename}"
        logger.info(f"Saved image to {output_path}.\nURL: {url}")
        return url

    def _call_stability_api(self, prompt: str):
        """POST the prompt to the image generation endpoint and return the response.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        headers = {
            "authorization": f"Bearer {self._api_key}",
            "accept": "image/*",
        }
        data = {
            "prompt": prompt,
            "output_format": self._IMG_OUTPUT_FORMAT,
        }

        response = requests.post(
            self._IMG_GEN_API,
            headers=headers,
            # Placeholder file entry, sent together with the form data
            files={"none": ""},
            data=data,
        )
        response.raise_for_status()

        return response

    def generate_image(self, prompt: str) -> ImageGeneratorToolOutput:
        """
        Use this tool to generate an image based on the prompt.
        Args:
            prompt (str): The prompt to generate the image from.
        """

        try:
            # Call the Stability API
            response = self._call_stability_api(prompt)

            # Save the image and get the URL
            image_url = self._save_image(response.content)

            return ImageGeneratorToolOutput(
                is_success=True,
                image_url=image_url,
            )
        except Exception as e:
            # Failures are reported in the tool output instead of being raised
            logger.exception(e, exc_info=True)
            return ImageGeneratorToolOutput(
                is_success=False,
                error_message=str(e),
            )
|
||||
|
||||
|
||||
def get_tools(**kwargs):
    """Return the image generator tool; ``kwargs`` are passed to ``ImageGeneratorTool``."""
    generator = ImageGeneratorTool(**kwargs)
    return [FunctionTool.from_defaults(generator.generate_image)]
|
||||
@@ -0,0 +1,143 @@
|
||||
import os
|
||||
import logging
|
||||
import base64
|
||||
import uuid
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Tuple, Dict, Optional
|
||||
from llama_index.core.tools import FunctionTool
|
||||
from e2b_code_interpreter import CodeInterpreter
|
||||
from e2b_code_interpreter.models import Logs
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InterpreterExtraResult(BaseModel):
    """One output item produced by the code interpreter."""

    # Output format name, e.g. a file extension
    type: str
    # Inline content, set for results that are not saved to disk
    content: Optional[str] = None
    # File name and URL, set for results saved to disk
    filename: Optional[str] = None
    url: Optional[str] = None
|
||||
|
||||
|
||||
class E2BToolOutput(BaseModel):
    """Result returned by the code interpreter tool."""

    # True when the executed code raised an error
    is_error: bool
    # Logs of the execution as returned by the sandbox
    logs: Logs
    # Parsed outputs of the execution
    results: List[InterpreterExtraResult] = []
|
||||
|
||||
|
||||
class E2BCodeInterpreter:
|
||||
|
||||
output_dir = "output/tool"
|
||||
|
||||
def __init__(self, api_key: str = None):
|
||||
if api_key is None:
|
||||
api_key = os.getenv("E2B_API_KEY")
|
||||
filesever_url_prefix = os.getenv("FILESERVER_URL_PREFIX")
|
||||
if not api_key:
|
||||
raise ValueError(
|
||||
"E2B_API_KEY key is required to run code interpreter. Get it here: https://e2b.dev/docs/getting-started/api-key"
|
||||
)
|
||||
if not filesever_url_prefix:
|
||||
raise ValueError(
|
||||
"FILESERVER_URL_PREFIX is required to display file output from sandbox"
|
||||
)
|
||||
|
||||
self.filesever_url_prefix = filesever_url_prefix
|
||||
self.interpreter = CodeInterpreter(api_key=api_key)
|
||||
|
||||
def __del__(self):
|
||||
self.interpreter.close()
|
||||
|
||||
def get_output_path(self, filename: str) -> str:
|
||||
# if output directory doesn't exist, create it
|
||||
if not os.path.exists(self.output_dir):
|
||||
os.makedirs(self.output_dir, exist_ok=True)
|
||||
return os.path.join(self.output_dir, filename)
|
||||
|
||||
def save_to_disk(self, base64_data: str, ext: str) -> Dict:
|
||||
filename = f"{uuid.uuid4()}.{ext}" # generate a unique filename
|
||||
buffer = base64.b64decode(base64_data)
|
||||
output_path = self.get_output_path(filename)
|
||||
|
||||
try:
|
||||
with open(output_path, "wb") as file:
|
||||
file.write(buffer)
|
||||
except IOError as e:
|
||||
logger.error(f"Failed to write to file {output_path}: {str(e)}")
|
||||
raise e
|
||||
|
||||
logger.info(f"Saved file to {output_path}")
|
||||
|
||||
return {
|
||||
"outputPath": output_path,
|
||||
"filename": filename,
|
||||
}
|
||||
|
||||
def get_file_url(self, filename: str) -> str:
|
||||
return f"{self.filesever_url_prefix}/{self.output_dir}/{filename}"
|
||||
|
||||
def parse_result(self, result) -> List[InterpreterExtraResult]:
|
||||
"""
|
||||
The result could include multiple formats (e.g. png, svg, etc.) but encoded in base64
|
||||
We save each result to disk and return saved file metadata (extension, filename, url)
|
||||
"""
|
||||
if not result:
|
||||
return []
|
||||
|
||||
output = []
|
||||
|
||||
try:
|
||||
formats = result.formats()
|
||||
results = [result[format] for format in formats]
|
||||
|
||||
for ext, data in zip(formats, results):
|
||||
match ext:
|
||||
case "png" | "svg" | "jpeg" | "pdf":
|
||||
result = self.save_to_disk(data, ext)
|
||||
filename = result["filename"]
|
||||
output.append(
|
||||
InterpreterExtraResult(
|
||||
type=ext,
|
||||
filename=filename,
|
||||
url=self.get_file_url(filename),
|
||||
)
|
||||
)
|
||||
case _:
|
||||
output.append(
|
||||
InterpreterExtraResult(
|
||||
type=ext,
|
||||
content=data,
|
||||
)
|
||||
)
|
||||
except Exception as error:
|
||||
logger.exception(error, exc_info=True)
|
||||
logger.error("Error when parsing output from E2b interpreter tool", error)
|
||||
|
||||
return output
|
||||
|
||||
def interpret(self, code: str) -> E2BToolOutput:
|
||||
"""
|
||||
Execute python code in a Jupyter notebook cell, the toll will return result, stdout, stderr, display_data, and error.
|
||||
|
||||
Parameters:
|
||||
code (str): The python code to be executed in a single cell.
|
||||
"""
|
||||
logger.info(
|
||||
f"\n{'='*50}\n> Running following AI-generated code:\n{code}\n{'='*50}"
|
||||
)
|
||||
exec = self.interpreter.notebook.exec_cell(code)
|
||||
|
||||
if exec.error:
|
||||
logger.error("Error when executing code", exec.error)
|
||||
output = E2BToolOutput(is_error=True, logs=exec.logs, results=[])
|
||||
else:
|
||||
if len(exec.results) == 0:
|
||||
output = E2BToolOutput(is_error=False, logs=exec.logs, results=[])
|
||||
else:
|
||||
results = self.parse_result(exec.results[0])
|
||||
output = E2BToolOutput(is_error=False, logs=exec.logs, results=results)
|
||||
return output
|
||||
|
||||
|
||||
def get_tools(**kwargs):
    """Return the code interpreter tool; ``kwargs`` are passed to ``E2BCodeInterpreter``."""
    code_interpreter = E2BCodeInterpreter(**kwargs)
    return [FunctionTool.from_defaults(code_interpreter.interpret)]
|
||||
@@ -0,0 +1,78 @@
|
||||
from typing import Dict, List, Tuple
|
||||
from llama_index.tools.openapi import OpenAPIToolSpec
|
||||
from llama_index.tools.requests import RequestsToolSpec
|
||||
|
||||
|
||||
class OpenAPIActionToolSpec(OpenAPIToolSpec, RequestsToolSpec):
    """
    A combination of OpenAPI and Requests tool specs that can parse OpenAPI specs and make requests.

    openapi_uri: str: The file path or URL to the OpenAPI spec.
    domain_headers: dict: Whitelist domains and the headers to use.
    """

    spec_functions = OpenAPIToolSpec.spec_functions + RequestsToolSpec.spec_functions
    # Cached parsed specs by URI
    _specs: Dict[str, Tuple[Dict, List[str]]] = {}

    def __init__(self, openapi_uri: str, domain_headers: dict = None, **kwargs):
        """Load (or reuse) the OpenAPI spec and initialise both parent specs.

        Args:
            openapi_uri: ``http(s)`` URL or ``file`` URI of the OpenAPI spec.
            domain_headers: Mapping of whitelisted domain to request headers.
                The mapping is copied; the caller's dict is left untouched.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If the spec cannot be loaded or is invalid.
        """
        # Copy so that adding the spec's servers does not mutate the argument
        domain_headers = dict(domain_headers) if domain_headers else {}
        if openapi_uri not in self._specs:
            openapi_spec, servers = self._load_openapi_spec(openapi_uri)
            self._specs[openapi_uri] = (openapi_spec, servers)
        else:
            openapi_spec, servers = self._specs[openapi_uri]

        # Add the servers to the domain headers if they are not already present
        for server in servers:
            if server not in domain_headers:
                domain_headers[server] = {}

        OpenAPIToolSpec.__init__(self, spec=openapi_spec)
        RequestsToolSpec.__init__(self, domain_headers)

    @staticmethod
    def _load_openapi_spec(uri: str) -> Tuple[Dict, List[str]]:
        """
        Load an OpenAPI spec from a URI.

        Args:
            uri (str): A file path or URL to the OpenAPI spec.

        Returns:
            Tuple[Dict, List[str]]: The parsed spec and the network locations
            (host[:port]) of the servers it declares.

        Raises:
            ValueError: If the URI scheme is unsupported, the download fails,
                or a server entry has no ``url``.
        """
        import yaml
        from urllib.parse import urlparse

        if uri.startswith("http"):
            import requests

            response = requests.get(uri)
            if response.status_code != 200:
                raise ValueError(
                    "Could not initialize OpenAPIActionToolSpec: "
                    f"Failed to load OpenAPI spec from {uri}, status code: {response.status_code}"
                )
            spec = yaml.safe_load(response.text)
        elif uri.startswith("file"):
            filepath = urlparse(uri).path
            with open(filepath, "r") as file:
                spec = yaml.safe_load(file)
        else:
            raise ValueError(
                "Could not initialize OpenAPIActionToolSpec: Invalid OpenAPI URI provided. "
                "Only HTTP and file path are supported."
            )
        # Add the servers to the whitelist
        try:
            servers = [
                urlparse(server["url"]).netloc for server in spec.get("servers", [])
            ]
        except KeyError as e:
            raise ValueError(
                "Could not initialize OpenAPIActionToolSpec: Invalid OpenAPI spec provided. "
                "Could not get `servers` from the spec."
            ) from e
        return spec, servers
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Open Meteo weather map tool spec."""
|
||||
|
||||
import logging
|
||||
import requests
|
||||
import pytz
|
||||
from llama_index.core.tools import FunctionTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OpenMeteoWeather:
    """Weather lookup backed by the Open-Meteo geocoding and forecast APIs."""

    geo_api = "https://geocoding-api.open-meteo.com/v1"
    weather_api = "https://api.open-meteo.com/v1"

    @classmethod
    def _get_geo_location(cls, location: str) -> dict:
        """Get geo location from location name.

        Returns:
            A dict with ``id``, ``name``, ``latitude`` and ``longitude`` of
            the first search result.

        Raises:
            Exception: If the geocoding request does not return status 200.
            ValueError: If the search returns no result for ``location``.
        """
        params = {"name": location, "count": 10, "language": "en", "format": "json"}
        response = requests.get(f"{cls.geo_api}/search", params=params)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch geo location: {response.status_code}")

        data = response.json()
        matches = data.get("results")
        if not matches:
            # Fail with a clear message instead of an opaque KeyError/IndexError
            raise ValueError(f"No geo location found for: {location}")
        result = matches[0]
        geo_location = {
            "id": result["id"],
            "name": result["name"],
            "latitude": result["latitude"],
            "longitude": result["longitude"],
        }
        return geo_location

    @classmethod
    def get_weather_information(cls, location: str) -> dict:
        """Use this function to get the weather of any given location.
        Note that the weather code should follow WMO Weather interpretation codes (WW):
        0: Clear sky
        1, 2, 3: Mainly clear, partly cloudy, and overcast
        45, 48: Fog and depositing rime fog
        51, 53, 55: Drizzle: Light, moderate, and dense intensity
        56, 57: Freezing Drizzle: Light and dense intensity
        61, 63, 65: Rain: Slight, moderate and heavy intensity
        66, 67: Freezing Rain: Light and heavy intensity
        71, 73, 75: Snow fall: Slight, moderate, and heavy intensity
        77: Snow grains
        80, 81, 82: Rain showers: Slight, moderate, and violent
        85, 86: Snow showers slight and heavy
        95: Thunderstorm: Slight or moderate
        96, 99: Thunderstorm with slight and heavy hail
        """
        logger.info(
            f"Calling open-meteo api to get weather information of location: {location}"
        )
        geo_location = cls._get_geo_location(location)
        timezone = pytz.timezone("UTC").zone
        params = {
            "latitude": geo_location["latitude"],
            "longitude": geo_location["longitude"],
            "current": "temperature_2m,weather_code",
            "hourly": "temperature_2m,weather_code",
            "daily": "weather_code",
            "timezone": timezone,
        }
        response = requests.get(f"{cls.weather_api}/forecast", params=params)
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch weather information: {response.status_code}"
            )
        return response.json()
|
||||
|
||||
|
||||
def get_tools(**kwargs):
    """Return the weather lookup wrapped as a tool (``kwargs`` are ignored)."""
    weather_tool = FunctionTool.from_defaults(OpenMeteoWeather.get_weather_information)
    return [weather_tool]
|
||||
@@ -0,0 +1,61 @@
|
||||
from llama_index.embeddings.openai import OpenAIEmbedding
|
||||
from llama_index.core.settings import Settings
|
||||
from typing import Dict
|
||||
import os
|
||||
|
||||
DEFAULT_MODEL = "gpt-3.5-turbo"
|
||||
DEFAULT_EMBEDDING_MODEL = "text-embedding-3-large"
|
||||
|
||||
class TSIEmbedding(OpenAIEmbedding):
    """``OpenAIEmbedding`` whose query and text engines are both set to the model name."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Use the configured model name for both engines
        engine_name = self.model_name
        self._query_engine = engine_name
        self._text_engine = engine_name
|
||||
|
||||
def llm_config_from_env() -> Dict:
    """Build the LLM keyword arguments from environment variables.

    Reads ``MODEL``, ``LLM_TEMPERATURE``, ``LLM_MAX_TOKENS``,
    ``T_SYSTEMS_LLMHUB_API_KEY`` and ``T_SYSTEMS_LLMHUB_BASE_URL``.
    """
    from llama_index.core.constants import DEFAULT_TEMPERATURE

    raw_temperature = os.getenv("LLM_TEMPERATURE", DEFAULT_TEMPERATURE)
    raw_max_tokens = os.getenv("LLM_MAX_TOKENS")

    # max_tokens stays None when the variable is not set
    if raw_max_tokens is not None:
        token_limit = int(raw_max_tokens)
    else:
        token_limit = None

    return {
        "model": os.getenv("MODEL", DEFAULT_MODEL),
        "api_key": os.getenv("T_SYSTEMS_LLMHUB_API_KEY"),
        "api_base": os.getenv("T_SYSTEMS_LLMHUB_BASE_URL"),
        "temperature": float(raw_temperature),
        "max_tokens": token_limit,
    }
|
||||
|
||||
|
||||
def embedding_config_from_env() -> Dict:
    """Build the embedding-model keyword arguments from environment variables.

    Reads ``EMBEDDING_MODEL`` (falls back to ``DEFAULT_EMBEDDING_MODEL``),
    ``EMBEDDING_DIM`` (falls back to llama-index's ``DEFAULT_EMBEDDING_DIM``),
    ``T_SYSTEMS_LLMHUB_API_KEY`` and ``T_SYSTEMS_LLMHUB_BASE_URL``.

    Returns:
        A dict with the keys ``model_name``, ``dimension``, ``api_key`` and
        ``api_base``.

    Raises:
        ValueError: If ``EMBEDDING_DIM`` is set to a non-integer value.
    """
    from llama_index.core.constants import DEFAULT_EMBEDDING_DIM

    raw_dimension = os.getenv("EMBEDDING_DIM", DEFAULT_EMBEDDING_DIM)
    dimension = None if raw_dimension is None else int(raw_dimension)

    return {
        "model_name": os.getenv("EMBEDDING_MODEL", DEFAULT_EMBEDDING_MODEL),
        "dimension": dimension,
        "api_key": os.getenv("T_SYSTEMS_LLMHUB_API_KEY"),
        "api_base": os.getenv("T_SYSTEMS_LLMHUB_BASE_URL"),
    }
|
||||
|
||||
def init_llmhub():
    """Install the LLMHub embedding model and LLM on the global ``Settings``.

    Both models are configured from environment variables via
    ``embedding_config_from_env`` and ``llm_config_from_env``.
    """
    from llama_index.llms.openai_like import OpenAILike

    llm_kwargs = llm_config_from_env()
    embedding_kwargs = embedding_config_from_env()

    Settings.embed_model = TSIEmbedding(**embedding_kwargs)

    llm = OpenAILike(
        **llm_kwargs,
        is_chat_model=True,
        is_function_calling_model=False,
        context_window=4096,
    )
    Settings.llm = llm
|
||||
@@ -0,0 +1,2 @@
|
||||
def init_observability():
    """Placeholder hook for observability setup; currently does nothing."""
    return None
|
||||
@@ -0,0 +1,172 @@
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
from llama_index.core.settings import Settings
|
||||
|
||||
|
||||
def init_settings():
    """Initialise the global llama-index ``Settings`` for the chosen provider.

    The provider is taken from the ``MODEL_PROVIDER`` environment variable.
    Afterwards the chunk size and chunk overlap are read from ``CHUNK_SIZE``
    (default 1024) and ``CHUNK_OVERLAP`` (default 20).

    Raises:
        ValueError: If ``MODEL_PROVIDER`` is unset or not a supported provider.
    """
    provider = os.getenv("MODEL_PROVIDER")

    if provider == "openai":
        init_openai()
    elif provider == "groq":
        init_groq()
    elif provider == "ollama":
        init_ollama()
    elif provider == "anthropic":
        init_anthropic()
    elif provider == "gemini":
        init_gemini()
    elif provider == "mistral":
        init_mistral()
    elif provider == "azure-openai":
        init_azure_openai()
    elif provider == "t-systems":
        # Imported only when needed so other providers do not load this module
        from .llmhub import init_llmhub

        init_llmhub()
    else:
        raise ValueError(f"Invalid model provider: {provider}")

    Settings.chunk_size = int(os.getenv("CHUNK_SIZE", "1024"))
    Settings.chunk_overlap = int(os.getenv("CHUNK_OVERLAP", "20"))
|
||||
|
||||
|
||||
def init_ollama():
    """Use an Ollama server for both the embedding model and the LLM.

    Reads ``OLLAMA_BASE_URL`` (default ``http://127.0.0.1:11434`` when unset
    or empty), ``OLLAMA_REQUEST_TIMEOUT``, ``EMBEDDING_MODEL`` and ``MODEL``.
    """
    from llama_index.embeddings.ollama import OllamaEmbedding
    from llama_index.llms.ollama.base import DEFAULT_REQUEST_TIMEOUT, Ollama

    base_url = os.getenv("OLLAMA_BASE_URL") or "http://127.0.0.1:11434"
    timeout = float(os.getenv("OLLAMA_REQUEST_TIMEOUT", DEFAULT_REQUEST_TIMEOUT))

    embed_model = OllamaEmbedding(
        base_url=base_url,
        model_name=os.getenv("EMBEDDING_MODEL"),
    )
    Settings.embed_model = embed_model

    llm = Ollama(
        base_url=base_url,
        model=os.getenv("MODEL"),
        request_timeout=timeout,
    )
    Settings.llm = llm
|
||||
|
||||
|
||||
def init_openai():
    """Use OpenAI for both the LLM and the embedding model.

    Reads ``MODEL``, ``LLM_TEMPERATURE``, ``LLM_MAX_TOKENS``,
    ``EMBEDDING_MODEL`` and ``EMBEDDING_DIM`` from the environment.
    """
    from llama_index.core.constants import DEFAULT_TEMPERATURE
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.llms.openai import OpenAI

    raw_max_tokens = os.getenv("LLM_MAX_TOKENS")
    Settings.llm = OpenAI(
        model=os.getenv("MODEL"),
        temperature=float(os.getenv("LLM_TEMPERATURE", DEFAULT_TEMPERATURE)),
        max_tokens=None if raw_max_tokens is None else int(raw_max_tokens),
    )

    raw_dimensions = os.getenv("EMBEDDING_DIM")
    Settings.embed_model = OpenAIEmbedding(
        model=os.getenv("EMBEDDING_MODEL"),
        dimensions=None if raw_dimensions is None else int(raw_dimensions),
    )
|
||||
|
||||
|
||||
def init_azure_openai():
    """Use Azure OpenAI for both the LLM and the embedding model.

    Required environment variables: ``AZURE_OPENAI_LLM_DEPLOYMENT``,
    ``AZURE_OPENAI_EMBEDDING_DEPLOYMENT``, ``AZURE_OPENAI_KEY`` and
    ``AZURE_OPENAI_ENDPOINT``. Optional: ``AZURE_OPENAI_API_VERSION`` (falls
    back to ``OPENAI_API_VERSION``), ``MODEL``, ``LLM_MAX_TOKENS``,
    ``LLM_TEMPERATURE``, ``EMBEDDING_MODEL`` and ``EMBEDDING_DIM``.

    Raises:
        KeyError: If one of the required environment variables is missing.
    """
    from llama_index.core.constants import DEFAULT_TEMPERATURE
    from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
    from llama_index.llms.azure_openai import AzureOpenAI

    llm_deployment = os.environ["AZURE_OPENAI_LLM_DEPLOYMENT"]
    embedding_deployment = os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT"]

    # Connection settings shared by the LLM and the embedding client
    shared_kwargs = {
        "api_key": os.environ["AZURE_OPENAI_KEY"],
        "azure_endpoint": os.environ["AZURE_OPENAI_ENDPOINT"],
        "api_version": os.getenv("AZURE_OPENAI_API_VERSION")
        or os.getenv("OPENAI_API_VERSION"),
    }

    raw_max_tokens = os.getenv("LLM_MAX_TOKENS")
    raw_temperature = os.getenv("LLM_TEMPERATURE", DEFAULT_TEMPERATURE)
    Settings.llm = AzureOpenAI(
        model=os.getenv("MODEL"),
        max_tokens=None if raw_max_tokens is None else int(raw_max_tokens),
        temperature=float(raw_temperature),
        deployment_name=llm_deployment,
        **shared_kwargs,
    )

    raw_dimensions = os.getenv("EMBEDDING_DIM")
    Settings.embed_model = AzureOpenAIEmbedding(
        model=os.getenv("EMBEDDING_MODEL"),
        dimensions=None if raw_dimensions is None else int(raw_dimensions),
        deployment_name=embedding_deployment,
        **shared_kwargs,
    )
|
||||
|
||||
|
||||
def init_fastembed():
    """
    Use Qdrant Fastembed as the local embedding provider.

    The short name in ``EMBEDDING_MODEL`` is translated to the full model
    name; a ``KeyError`` is raised when it is unset or not in the map.
    """
    from llama_index.embeddings.fastembed import FastEmbedEmbedding

    supported_models: Dict[str, str] = {
        # Small and multilingual
        "all-MiniLM-L6-v2": "sentence-transformers/all-MiniLM-L6-v2",
        # Large and multilingual
        "paraphrase-multilingual-mpnet-base-v2": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",  # noqa: E501
    }

    requested_model = os.getenv("EMBEDDING_MODEL")
    full_model_name = supported_models[requested_model]

    # This will download the model automatically if it is not already downloaded
    Settings.embed_model = FastEmbedEmbedding(model_name=full_model_name)
|
||||
|
||||
|
||||
def init_groq():
    """Use Groq as the LLM and FastEmbed for embeddings.

    The short name in ``MODEL`` is translated to the full Groq model id; a
    ``KeyError`` is raised when it is unset or not in the map.
    """
    from llama_index.llms.groq import Groq

    groq_model_ids: Dict[str, str] = {
        "llama3-8b": "llama3-8b-8192",
        "llama3-70b": "llama3-70b-8192",
        "mixtral-8x7b": "mixtral-8x7b-32768",
    }

    requested_model = os.getenv("MODEL")
    model_id = groq_model_ids[requested_model]
    Settings.llm = Groq(model=model_id)

    # Groq does not provide embeddings, so we use FastEmbed instead
    init_fastembed()
|
||||
|
||||
|
||||
def init_anthropic():
    """Use Anthropic as the LLM and FastEmbed for embeddings.

    The short name in ``MODEL`` is translated to the full Anthropic model id;
    a ``KeyError`` is raised when it is unset or not in the map.
    """
    from llama_index.llms.anthropic import Anthropic

    anthropic_model_ids: Dict[str, str] = {
        "claude-3-opus": "claude-3-opus-20240229",
        "claude-3-sonnet": "claude-3-sonnet-20240229",
        "claude-3-haiku": "claude-3-haiku-20240307",
        "claude-2.1": "claude-2.1",
        "claude-instant-1.2": "claude-instant-1.2",
    }

    requested_model = os.getenv("MODEL")
    model_id = anthropic_model_ids[requested_model]
    Settings.llm = Anthropic(model=model_id)

    # Anthropic does not provide embeddings, so we use FastEmbed instead
    init_fastembed()
|
||||
|
||||
|
||||
def init_gemini():
    """Use Gemini for both the LLM and the embedding model.

    ``MODEL`` and ``EMBEDDING_MODEL`` are each prefixed with ``models/``
    before being passed to the respective client.
    """
    from llama_index.embeddings.gemini import GeminiEmbedding
    from llama_index.llms.gemini import Gemini

    Settings.llm = Gemini(model=f"models/{os.getenv('MODEL')}")
    Settings.embed_model = GeminiEmbedding(
        model_name=f"models/{os.getenv('EMBEDDING_MODEL')}"
    )
|
||||
|
||||
|
||||
def init_mistral():
    """Use Mistral AI for both the LLM and the embedding model.

    The model names come from ``MODEL`` and ``EMBEDDING_MODEL``.
    """
    from llama_index.embeddings.mistralai import MistralAIEmbedding
    from llama_index.llms.mistralai import MistralAI

    llm_model = os.getenv("MODEL")
    Settings.llm = MistralAI(model=llm_model)

    embedding_model = os.getenv("EMBEDDING_MODEL")
    Settings.embed_model = MistralAIEmbedding(model_name=embedding_model)
|
||||
@@ -0,0 +1,10 @@
|
||||
file:
  # use_llama_parse: Use LlamaParse if `true`. Needs a `LLAMA_CLOUD_API_KEY` from https://cloud.llamaindex.ai set as environment variable
  use_llama_parse: true
db:
  # The configuration for the database loader, only supports MySQL and PostgreSQL databases for now.
  # uri: The URI for the database. E.g.: mysql+pymysql://user:password@localhost:3306/db or postgresql+psycopg2://user:password@localhost:5432/db
  # queries: The queries to fetch data from the database. E.g.: SELECT * FROM table
  #
  # SECURITY: never commit real credentials to this file. A real user name,
  # password and host were previously checked in here; treat that password as
  # leaked and rotate it. Put the real URI in a local, untracked copy of this
  # file (or inject it through your deployment's secret mechanism).
  - uri: mysql+pymysql://user:password@localhost:3306/db
    queries:
      - SELECT * FROM mytable
|
||||
@@ -0,0 +1,4 @@
|
||||
local:
|
||||
weather: {}
|
||||
interpreter: {}
|
||||
llamahub: {}
|
||||
Binary file not shown.
@@ -0,0 +1,64 @@
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
import logging
|
||||
import os
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import RedirectResponse
|
||||
from app.api.routers.chat import chat_router
|
||||
from app.api.routers.upload import file_upload_router
|
||||
from app.settings import init_settings
|
||||
from app.observability import init_observability
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
|
||||
app = FastAPI()

# Configure the model settings and the observability hook at import time
init_settings()
init_observability()

# Falls back to "dev" when ENVIRONMENT is not set
environment = os.getenv("ENVIRONMENT", "dev")
logger = logging.getLogger("uvicorn")

if environment == "dev":
    logger.warning("Running in development mode - allowing CORS for all origins")
    # Wide-open CORS is only enabled in the dev environment
    app.add_middleware(
        CORSMiddleware,
        allow_credentials=True,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
|
||||
|
||||
# The base URL serves no page of its own, so send visitors to the docs page
@app.get("/")
async def redirect_to_docs():
    docs_url = "/docs"
    return RedirectResponse(url=docs_url)
|
||||
|
||||
|
||||
def mount_static_files(directory, path):
    """Mount ``directory`` and each of its sub-directories as static files.

    Args:
        directory: Directory on disk to serve. Nothing is mounted when it
            does not exist.
        path: URL path at which ``directory`` itself is mounted; each
            sub-directory is mounted at ``path`` plus its relative path.
    """
    if not os.path.exists(directory):
        return

    for current_dir, _, _ in os.walk(directory):
        relative_path = os.path.relpath(current_dir, directory)
        if relative_path == ".":
            mount_path = path
        else:
            # relpath() joins with os.sep ("\\" on Windows), but a URL path
            # must always use forward slashes.
            url_suffix = relative_path.replace(os.sep, "/")
            mount_path = f"{path}/{url_suffix}"
        logger.info(f"Mounting static files '{current_dir}' at {mount_path}")
        app.mount(
            mount_path,
            StaticFiles(directory=current_dir),
            name=f"{current_dir}-static",
        )
|
||||
|
||||
|
||||
# Mount the data files to serve the file viewer
mount_static_files("data", "/api/files/data")
# Mount the output files from tools
mount_static_files("output", "/api/files/output")

app.include_router(chat_router, prefix="/api/chat")
app.include_router(file_upload_router, prefix="/api/chat/upload")

if __name__ == "__main__":
    app_host = os.getenv("APP_HOST", "0.0.0.0")
    app_port = int(os.getenv("APP_PORT", "8000"))
    # Auto-reload is only enabled in the dev environment
    reload = environment == "dev"

    uvicorn.run(app="main:app", host=app_host, port=app_port, reload=reload)
|
||||
@@ -0,0 +1,48 @@
|
||||
[tool]
|
||||
[tool.poetry]
|
||||
name = "app"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = [ "Marcus Schiesser <mail@marcusschiesser.de>" ]
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
generate = "app.engine.generate:generate_datasource"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11,<3.12"
|
||||
fastapi = "^0.109.1"
|
||||
python-dotenv = "^1.0.0"
|
||||
aiostream = "^0.5.2"
|
||||
llama-index = "0.10.58"
|
||||
cachetools = "^5.3.3"
|
||||
|
||||
[tool.poetry.dependencies.uvicorn]
|
||||
extras = [ "standard" ]
|
||||
version = "^0.23.2"
|
||||
|
||||
[tool.poetry.dependencies.llama-index-readers-database]
|
||||
version = "^0.1.3"
|
||||
|
||||
[tool.poetry.dependencies.pymysql]
|
||||
version = "^1.1.0"
|
||||
extras = [ "rsa" ]
|
||||
|
||||
[tool.poetry.dependencies.psycopg2]
|
||||
version = "^2.9.9"
|
||||
|
||||
[tool.poetry.dependencies.llama-index-indices-managed-llama-cloud]
|
||||
version = "^0.2.7"
|
||||
|
||||
[tool.poetry.dependencies.docx2txt]
|
||||
version = "^0.8"
|
||||
|
||||
[tool.poetry.dependencies.e2b_code_interpreter]
|
||||
version = "0.0.7"
|
||||
|
||||
[tool.poetry.dependencies.llama-index-agent-openai]
|
||||
version = "0.2.6"
|
||||
|
||||
[build-system]
|
||||
requires = [ "poetry-core" ]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
Reference in New Issue
Block a user