"""Document loaders for medical guideline RAG ingestion (PDF via LlamaParse, Markdown)."""
# Import required libraries
import os
from pathlib import Path
from typing import List, Optional

from langchain.schema import Document
from llama_index.core import SimpleDirectoryReader
from llama_parse import LlamaParse

from .config import logger
def load_pdf_documents(pdf_path: Path, api_key: Optional[str] = None) -> List[Document]:
    """
    Load and process PDF documents from medical guidelines using LlamaParse.

    Excellent for borderless tables and complex medical document layouts.
    Extracts disease and provider from the directory structure.

    Directory structure expected: data/new_data/PROVIDER/file.pdf
    Example: data/new_data/SASLT/SASLT_2021.pdf

    Args:
        pdf_path: Path to the PDF file.
        api_key: LlamaCloud API key. If None, reads from the
            LLAMA_CLOUD_API_KEY environment variable.
            Get your API key from: https://cloud.llamaindex.ai/api-key

    Returns:
        List of Document objects with metadata
        (source, disease, provider, page_number, document_index).

    Raises:
        FileNotFoundError: If pdf_path does not exist.
        ValueError: If no API key is provided or found in the environment.
    """
    try:
        # Validate file exists before contacting the parsing service.
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found at {pdf_path}")

        # Extract provider from directory structure: the provider is the
        # PDF's parent directory (e.g. data/new_data/SASLT/x.pdf -> SASLT).
        path_parts = pdf_path.parts
        disease = "HBV"  # Default disease for this system
        provider = "unknown"
        if len(path_parts) >= 2:
            provider = path_parts[-2]
            # A parent named 'new_data' means the file sits directly in the
            # new_data folder, so no provider can be inferred.
            if provider.lower() == "new_data":
                provider = "unknown"

        # Get API key from parameter or environment variable.
        llama_api_key = api_key or os.getenv("LLAMA_CLOUD_API_KEY")
        if not llama_api_key:
            raise ValueError(
                "LlamaCloud API key not found. Please provide api_key parameter or set "
                "LLAMA_CLOUD_API_KEY environment variable. "
                "Get your key from: https://cloud.llamaindex.ai/api-key"
            )

        # Initialize LlamaParse with settings tuned for medical documents.
        parser = LlamaParse(
            api_key=llama_api_key,
            result_type="markdown",  # or "text" for plain text
            verbose=True,
            language="en",
            # Steer the parser toward the structures that matter most in
            # clinical guidelines (tables, recommendations, citations).
            parsing_instruction=(
                "This is a medical guideline document. "
                "Pay special attention to tables (including borderless tables), "
                "clinical recommendations, dosage information, and reference citations. "
                "Preserve table structure and maintain hierarchical headings."
            ),
            # Use the service-side cache for faster re-processing.
            invalidate_cache=False,
            do_not_cache=False,
            fast_mode=False,  # Deep parsing for better accuracy
            # Split by page so each result maps to one PDF page — this is
            # what makes the page_number metadata below meaningful.
            split_by_page=True,
        )

        logger.info(f"Parsing PDF with LlamaParse: {pdf_path.name}")

        # Route the PDF through LlamaParse via SimpleDirectoryReader.
        file_extractor = {".pdf": parser}
        reader = SimpleDirectoryReader(
            input_files=[str(pdf_path)],
            file_extractor=file_extractor
        )

        # With split_by_page=True each returned document is one page.
        llama_documents = reader.load_data()

        # Convert LlamaIndex documents to LangChain Document objects.
        documents = []
        for doc_idx, llama_doc in enumerate(llama_documents):
            # Prefer the parser-reported page number; fall back to a
            # 1-based index when that metadata key is missing.
            page_num = llama_doc.metadata.get('page_number', doc_idx + 1)
            processed_doc = Document(
                page_content=llama_doc.text.strip(),
                metadata={
                    "source": pdf_path.name,
                    "disease": disease,
                    "provider": provider,
                    "page_number": page_num,
                    "document_index": doc_idx,
                    # Preserve any additional metadata from LlamaParse,
                    # without letting it overwrite the keys set above.
                    **{k: v for k, v in llama_doc.metadata.items()
                       if k not in ['source', 'disease', 'provider', 'page_number', 'document_index']}
                }
            )
            documents.append(processed_doc)

        logger.info(f"Loaded {len(documents)} pages from PDF: {pdf_path.name} (Disease: {disease}, Provider: {provider})")
        return documents

    except Exception as e:
        # Log at this boundary, then re-raise so callers can decide policy.
        logger.error(f"Error loading PDF documents from {pdf_path}: {str(e)}")
        raise
def load_pdf_documents_advanced(
    pdf_path: Path,
    api_key: Optional[str] = None,
    premium_mode: bool = False
) -> List[Document]:
    """
    Advanced PDF loader with premium features for complex medical documents.

    Args:
        pdf_path: Path to the PDF file.
        api_key: LlamaCloud API key. If None, reads from the
            LLAMA_CLOUD_API_KEY environment variable.
        premium_mode: Use premium GPT-4o mode for highest accuracy
            (costs more).

    Returns:
        List of Document objects with enhanced metadata
        (source, disease, provider, page_number, document_index,
        parser, premium_mode).

    Raises:
        FileNotFoundError: If pdf_path does not exist.
        ValueError: If no API key is provided or found in the environment.
    """
    try:
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found at {pdf_path}")

        # Provider = parent directory of the PDF (see load_pdf_documents).
        path_parts = pdf_path.parts
        disease = "HBV"
        provider = path_parts[-2] if len(path_parts) >= 2 else "unknown"
        if provider.lower() == "new_data":
            provider = "unknown"

        llama_api_key = api_key or os.getenv("LLAMA_CLOUD_API_KEY")
        if not llama_api_key:
            raise ValueError("LlamaCloud API key required")

        # Advanced parser configuration.
        parser = LlamaParse(
            api_key=llama_api_key,
            result_type="markdown",
            verbose=True,
            language="en",
            # Premium mode uses GPT-4o for better accuracy.
            premium_mode=premium_mode,
            # Detailed parsing instructions for medical content.
            parsing_instruction=(
                "Medical guideline document with complex tables. Instructions:\n"
                "0. Keep the original text intact without changing anything\n"
                "1. Preserve all table structures, especially borderless tables\n"
                "2. Maintain hierarchical organization of sections and subsections\n"
                "3. Keep dosage tables and treatment algorithms intact\n"
                "4. Preserve reference numbers and citations\n"
                "5. Identify and mark clinical recommendation levels\n"
                "6. Extract figures and their captions accurately"
            ),
            # Capture page screenshots for reference.
            take_screenshot=True,
            # Use the service-side cache; deep (non-fast) parsing.
            invalidate_cache=False,
            do_not_cache=False,
            fast_mode=False,
            # Critical: split by page for accurate page numbering.
            split_by_page=True,
        )

        file_extractor = {".pdf": parser}
        reader = SimpleDirectoryReader(
            input_files=[str(pdf_path)],
            file_extractor=file_extractor
        )

        logger.info(f"Parsing PDF with LlamaParse (Premium: {premium_mode}): {pdf_path.name}")
        llama_documents = reader.load_data()

        documents = []
        for doc_idx, llama_doc in enumerate(llama_documents):
            # Page number from parser metadata, else 1-based index.
            page_num = llama_doc.metadata.get('page_number', doc_idx + 1)

            # Enhanced metadata.
            # NOTE(review): document_index is 1-based here but 0-based in
            # load_pdf_documents — confirm which downstream consumers expect
            # before unifying.
            metadata = {
                "source": pdf_path.name,
                "disease": disease,
                "provider": provider,
                "page_number": page_num,
                "document_index": doc_idx + 1,
                "parser": "llamaparse",
                "premium_mode": premium_mode
            }

            # Merge additional LlamaIndex metadata, never overwriting ours.
            if hasattr(llama_doc, 'metadata'):
                for key, value in llama_doc.metadata.items():
                    if key not in metadata:
                        metadata[key] = value

            processed_doc = Document(
                page_content=llama_doc.text.strip(),
                metadata=metadata
            )
            documents.append(processed_doc)

        logger.info(f"Loaded {len(documents)} pages from PDF: {pdf_path.name}")
        return documents

    except Exception as e:
        # Log at this boundary, then re-raise so callers can decide policy.
        logger.error(f"Error loading PDF documents from {pdf_path}: {str(e)}")
        raise
# Batch processing function for multiple PDFs
def load_multiple_pdfs(
    pdf_directory: Path,
    api_key: Optional[str] = None,
    file_pattern: str = "*.pdf"
) -> List[Document]:
    """
    Load multiple PDF files from a directory.

    Best-effort batch: a file that fails to parse is logged and skipped
    rather than aborting the whole run.

    Args:
        pdf_directory: Directory containing PDF files.
        api_key: LlamaCloud API key, forwarded to load_pdf_documents.
        file_pattern: Glob pattern for PDF files (default: "*.pdf").

    Returns:
        List of all documents from all successfully processed PDFs.
    """
    all_documents: List[Document] = []
    # Sort for a deterministic processing order — Path.glob order is
    # filesystem-dependent and unspecified.
    pdf_files = sorted(pdf_directory.glob(file_pattern))
    logger.info(f"Found {len(pdf_files)} PDF files to process")

    for pdf_path in pdf_files:
        try:
            documents = load_pdf_documents(pdf_path, api_key=api_key)
            all_documents.extend(documents)
            logger.info(f"Successfully processed: {pdf_path.name}")
        except Exception as e:
            # Record the failure and keep going with the remaining files.
            logger.error(f"Failed to process {pdf_path.name}: {str(e)}")
            continue

    logger.info(f"Total documents loaded: {len(all_documents)}")
    return all_documents
def load_markdown_documents(md_path: Path) -> List[Document]:
    """
    Load a single Markdown medical guideline as one Document.

    The provider is inferred from the directory layout
    data/new_data/PROVIDER/file.md (e.g. data/new_data/SASLT/guidelines.md);
    a file sitting directly in new_data gets provider "unknown".

    Args:
        md_path: Path to the Markdown file.

    Returns:
        A single-element list containing one Document whose metadata
        carries source, disease, provider and page_number.

    Raises:
        FileNotFoundError: If md_path does not exist.
    """
    try:
        if not md_path.exists():
            raise FileNotFoundError(f"Markdown file not found at {md_path}")

        # Provider = parent directory name, unless the file lives directly
        # under new_data (then no provider can be inferred).
        parts = md_path.parts
        disease = "HBV"  # Default disease for this system
        provider = parts[-2] if len(parts) >= 2 else "unknown"
        if provider.lower() == "new_data":
            provider = "unknown"

        # Whole-file read; a markdown guideline is treated as one page.
        content = md_path.read_text(encoding='utf-8')

        doc = Document(
            page_content=content,
            metadata={
                "source": md_path.name,
                "disease": disease,
                "provider": provider,
                "page_number": 1,
            },
        )
        logger.info(f"Loaded Markdown document: {md_path.name} (Disease: {disease}, Provider: {provider})")
        return [doc]

    except Exception as e:
        logger.error(f"Error loading Markdown document from {md_path}: {str(e)}")
        raise