Spaces:
Sleeping
Sleeping
| # src/ingestion_orchestrator/orchestrator.py | |
| from src.data_loader.loader import load_documents | |
| from src.document_processor.processor import process_documents | |
| from src.embedding_generator.embedder import EmbeddingGenerator | |
| from src.vector_store_manager.chroma_manager import ChromaManager | |
| from config.settings import DOCS_FOLDER | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class IngestionOrchestrator:
    """
    Orchestrates the end-to-end data ingestion pipeline.

    The pipeline loads raw documents from a folder, processes them into
    chunks, and hands those chunks to the vector store manager.
    """

    def __init__(self) -> None:
        """
        Initializes the embedding generator and the vector store manager.

        Raises:
            Exception: Any error raised while constructing either component.
                It is logged at CRITICAL level (with traceback) and then
                re-raised unchanged.
        """
        try:
            self.embedding_generator = EmbeddingGenerator()
            # The vector store manager is constructed with the embedding
            # generator created above.
            self.vector_store_manager = ChromaManager(self.embedding_generator)
            logger.info("Initialized ingestion orchestrator components.")
        except Exception as e:
            # exc_info=True attaches the traceback to the log record so the
            # root cause is visible even if the caller swallows the error.
            logger.critical(
                "Failed to initialize ingestion orchestrator components: %s",
                e,
                exc_info=True,
            )
            # Bare ``raise`` re-raises the active exception as-is.
            raise

    def run_ingestion_pipeline(self, docs_folder: str = DOCS_FOLDER) -> None:
        """
        Runs the complete ingestion pipeline: loads, processes, and embeds documents.

        The pipeline stops early (after logging a warning) when no documents
        are loaded or when processing yields no chunks.

        Args:
            docs_folder: The folder containing the source documents.
        """
        logger.info("Starting ingestion pipeline from folder: %s", docs_folder)

        # 1. Load documents
        # --- Financial Ministry Adaptation ---
        # TODO: Implement logic to identify *new* or *modified* documents
        # instead of reloading everything each time for efficiency.
        # Handle potential large number of files efficiently.
        # ------------------------------------
        raw_documents = load_documents(docs_folder)
        if not raw_documents:
            logger.warning("No documents loaded. Ingestion pipeline finished.")
            return

        # 2. Process documents (split and extract metadata)
        processed_chunks = process_documents(raw_documents)
        if not processed_chunks:
            logger.warning("No processed chunks generated. Ingestion pipeline finished.")
            return

        # 3. Add documents to the vector store
        # The add_documents method handles embedding internally
        # --- Financial Ministry Adaptation ---
        # TODO: Implement logic for updating or deleting documents if the
        # source data changed. This requires comparing current source data
        # with what's in ChromaDB (e.g., by source path and modification date
        # or version). Use the vector_store_manager's update_documents and
        # delete_documents methods.
        # TODO: Implement batching for adding documents to avoid overwhelming
        # ChromaDB or the backend.
        # ------------------------------------
        self.vector_store_manager.add_documents(processed_chunks)
        logger.info("Ingestion pipeline finished successfully.")

    # --- Financial Ministry Adaptation ---
    # TODO: Add methods for handling updates and deletions specifically.
    # def update_changed_documents(self, changed_files: List[str]): pass
    # def delete_removed_documents(self, removed_files: List[str]): pass
    # ------------------------------------