100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
import logging
|
|
import os
|
|
|
|
from app.engine.loaders import get_document_Types, get_documents
|
|
from app.engine.vectordb import get_vector_store
|
|
from app.settings import init_settings
|
|
from app.engine.retriever.CHBM25Retriever import CHBM25Retriever
|
|
from llama_index.core.ingestion import IngestionPipeline
|
|
from llama_index.core.node_parser import SentenceSplitter,MarkdownNodeParser
|
|
from llama_index.core.settings import Settings
|
|
from llama_index.core.storage import StorageContext
|
|
from llama_index.core.storage.docstore import SimpleDocumentStore
|
|
|
|
# Configure root logging at INFO level and use the root logger for this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Base directory for persisted stores; overridable via the STORAGE_DIR env var.
STORAGE_DIR = os.environ.get("STORAGE_DIR", "storage")
|
|
|
|
|
|
def get_doc_store(docType: str):
    """Return the document store to use for ``docType``.

    The store is loaded from ``STORAGE_DIR/<docType>`` when that path exists.
    Otherwise a new, empty ``SimpleDocumentStore`` is returned, because there
    is no directory to load from.

    Args:
        docType: Name of the document type; used as the sub-directory name
            under ``STORAGE_DIR``.

    Returns:
        A ``SimpleDocumentStore`` instance.
    """
    persist_dir = os.path.join(STORAGE_DIR, docType)
    if not os.path.exists(persist_dir):
        # Nothing persisted yet for this type: start with an in-memory store.
        return SimpleDocumentStore()
    return SimpleDocumentStore.from_persist_dir(persist_dir)
|
|
|
|
|
|
def run_pipeline(docstore, vector_store, documents):
    """Run the ingestion pipeline over ``documents`` and return its result.

    The pipeline applies ``MarkdownNodeParser`` followed by
    ``Settings.embed_model``, and is wired to the given ``docstore`` (with the
    ``"upserts_and_delete"`` docstore strategy) and ``vector_store``.

    Args:
        docstore: Document store handed to the pipeline.
        vector_store: Vector store handed to the pipeline.
        documents: Documents to ingest.

    Returns:
        Whatever ``IngestionPipeline.run`` returns for these documents.
    """
    # Disabled alternative splitter, kept for reference:
    #   SentenceSplitter(
    #       chunk_size=Settings.chunk_size,
    #       chunk_overlap=Settings.chunk_overlap,
    #   ),
    stages = [MarkdownNodeParser(), Settings.embed_model]
    ingestion = IngestionPipeline(
        transformations=stages,
        docstore=docstore,
        docstore_strategy="upserts_and_delete",
        vector_store=vector_store,
    )

    # Run the ingestion pipeline and hand the results back to the caller.
    return ingestion.run(show_progress=True, documents=documents)
|
|
|
|
|
|
def persist_storage(docstore, vector_store):
    """Persist ``docstore`` and ``vector_store`` under ``STORAGE_DIR``.

    Both stores are wrapped in a ``StorageContext`` whose ``persist`` method
    is then called with ``STORAGE_DIR``.

    Args:
        docstore: Document store to persist.
        vector_store: Vector store to persist.
    """
    # NOTE(review): this writes to STORAGE_DIR itself, whereas get_doc_store()
    # loads from STORAGE_DIR/<docType>. Confirm the persist path is intended.
    context = StorageContext.from_defaults(
        docstore=docstore,
        vector_store=vector_store,
    )
    context.persist(STORAGE_DIR)
|
|
|
|
|
|
def persist_BMRetriever(vector_store):
    """Build a ``CHBM25Retriever`` from the vector store's nodes and persist it.

    The output path comes from the ``BM_RETRIEVER_PATH`` environment variable
    (default ``"storage_bm"``). ``similarity_top_k`` comes from ``TOP_K``
    (default ``3``), capped at the number of nodes retrieved.

    Args:
        vector_store: Vector store whose ``get_nodes`` supplies the nodes.
    """
    retriever_dir = os.getenv("BM_RETRIEVER_PATH", "storage_bm")

    # NOTE(review): presumably get_nodes([]) returns every stored node --
    # confirm for the vector store implementation in use.
    all_nodes = vector_store.get_nodes([])

    # Never request more results than there are nodes.
    # NOTE(review): with zero nodes this yields top_k == 0 -- confirm the
    # retriever accepts that.
    requested_top_k = int(os.getenv("TOP_K", "3"))
    top_k = min(requested_top_k, len(all_nodes))

    retriever = CHBM25Retriever.from_defaults(
        similarity_top_k=top_k,
        nodes=all_nodes,
    )
    retriever.persist(retriever_dir)
|
|
|
|
|
|
def generate_datasource():
    """Ingest and persist the documents of every document type.

    After ``init_settings()``, each type returned by ``get_document_Types()``
    is processed in turn: its documents are loaded and tagged with
    ``private="false"``, run through the ingestion pipeline against that
    type's document store and vector store, and then the stores and the BM25
    retriever are persisted.
    """
    init_settings()
    logger.info("Generate index for the provided data")

    for doc_type in get_document_Types():
        documents = get_documents(doc_type)

        # Set private=false to mark the document as public (required for filtering)
        for document in documents:
            document.metadata["private"] = "false"

        # Get the stores for this type, or create new ones.
        docstore = get_doc_store(doc_type)
        vector_store = get_vector_store(doc_type)

        # Run the ingestion pipeline; the returned nodes are not needed here.
        run_pipeline(docstore, vector_store, documents)

        # Build the index and persist storage.
        persist_storage(docstore, vector_store)
        persist_BMRetriever(vector_store)

    logger.info("Finished generating the index")
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so that importing this module does not require phoenix.
    from phoenix.trace import using_project

    # Supply a default so that an unset PHOENIX_PROJECT_NAME does not raise
    # TypeError (None + str) before any work is done. The resulting project
    # name is unchanged whenever the variable is set.
    project_name = os.getenv("PHOENIX_PROJECT_NAME", "default") + "_generate"

    # Run the whole generation under the "<project>_generate" Phoenix project.
    with using_project(project_name):
        generate_datasource()
|