实现多工程数据存储支持

This commit is contained in:
wanyaokun
2024-08-13 13:10:52 +08:00
parent 3a81a83033
commit 7e58a1a223
9 changed files with 97 additions and 51 deletions
+17 -19
View File
@@ -5,7 +5,7 @@ load_dotenv()
import logging
import os
from app.engine.loaders import get_documents
from app.engine.loaders import get_document_Types, get_documents
from app.engine.vectordb import get_vector_store
from app.settings import init_settings
from llama_index.core.ingestion import IngestionPipeline
@@ -19,17 +19,16 @@ logger = logging.getLogger()
STORAGE_DIR = os.getenv("STORAGE_DIR", "storage")
def get_doc_store():
def get_doc_store(docType:str):
# If the storage directory is there, load the document store from it.
# If not, set up an in-memory document store since we can't load from a directory that doesn't exist.
if os.path.exists(STORAGE_DIR):
return SimpleDocumentStore.from_persist_dir(STORAGE_DIR)
storeDir = os.path.join(STORAGE_DIR,docType)
if os.path.exists(storeDir):
return SimpleDocumentStore.from_persist_dir(storeDir)
else:
return SimpleDocumentStore()
def run_pipeline(docstore, vector_store, documents):
pipeline = IngestionPipeline(
transformations=[
@@ -49,7 +48,6 @@ def run_pipeline(docstore, vector_store, documents):
return nodes
def persist_storage(docstore, vector_store):
storage_context = StorageContext.from_defaults(
docstore=docstore,
@@ -57,28 +55,28 @@ def persist_storage(docstore, vector_store):
)
storage_context.persist(STORAGE_DIR)
def generate_datasource():
init_settings()
logger.info("Generate index for the provided data")
# Get the stores and documents or create new ones
documents = get_documents()
# Set private=false to mark the document as public (required for filtering)
for doc in documents:
doc.metadata["private"] = "false"
docstore = get_doc_store()
vector_store = get_vector_store()
docTypes = get_document_Types()
for docType in docTypes:
documents = get_documents(docType)
# Set private=false to mark the document as public (required for filtering)
for doc in documents:
doc.metadata["private"] = "false"
docstore = get_doc_store(docType)
vector_store = get_vector_store(docType)
# Run the ingestion pipeline
_ = run_pipeline(docstore, vector_store, documents)
# Run the ingestion pipeline
_ = run_pipeline(docstore, vector_store, documents)
# Build the index and persist storage
persist_storage(docstore, vector_store)
# Build the index and persist storage
persist_storage(docstore, vector_store)
logger.info("Finished generating the index")
if __name__ == "__main__":
from phoenix.trace import using_project
with using_project(os.getenv("PHOENIX_PROJECT_NAME") + "_generate") as obj: