Spaces:
Sleeping
Sleeping
| """ | |
| PDF Processor Component | |
| Processes PDF files to extract text and metadata | |
| """ | |
import os
import re
import warnings
from typing import List, Dict, Optional, Any
from datetime import datetime
from pathlib import Path

# PDF processing libraries
import pypdf

# Optional back-ends. PDF_ENHANCED is True only when BOTH pdfplumber and
# PyMuPDF (imported under the name ``fitz``) import successfully; if either
# import raises ImportError the flag is False.
try:
    import pdfplumber
    import fitz  # PyMuPDF
except ImportError:
    PDF_ENHANCED = False
else:
    PDF_ENHANCED = True

# Installs an "ignore" filter with no category/module restriction.
warnings.filterwarnings('ignore')
class PDFProcessor:
    """
    Processes PDF files to extract text, metadata, and structure.

    Supports multiple PDF processing libraries for better compatibility.
    Every public method reports failures by returning a dictionary that
    contains an ``'error'`` key instead of raising (the one exception is
    ``split_pdf_text``, which raises ``ValueError`` for a non-positive
    ``chunk_size``).
    """

    # Maps the method names accepted by ``extract_text_from_file`` to the
    # keys used in ``self.libraries``.
    _LIBRARY_KEYS = {
        'pdfplumber': 'pdfplumber',
        'pymupdf': 'PyMuPDF',
        'pypdf': 'pypdf',
    }

    # Order in which the 'auto' mode tries the extraction back-ends.
    _AUTO_ORDER = ('pdfplumber', 'pymupdf', 'pypdf')

    # Unicode typographic ligatures (U+FB00..U+FB04) and their plain-ASCII
    # expansions. Written as escapes so the table survives re-encoding.
    _LIGATURE_TABLE = str.maketrans({
        '\ufb00': 'ff',
        '\ufb01': 'fi',
        '\ufb02': 'fl',
        '\ufb03': 'ffi',
        '\ufb04': 'ffl',
    })

    def __init__(self, config=None):
        """
        Create a processor.

        Args:
            config: Optional configuration object. ``split_pdf_text`` and
                ``get_processing_stats`` read ``CHUNK_SIZE`` and
                ``CHUNK_OVERLAP`` from it. When omitted, ``Config`` is
                imported from the sibling ``config`` module; if that import
                fails, ``self.config`` is ``None`` and built-in defaults
                (1000 / 200) are used.
        """
        # Import Config only when needed to avoid dependency issues
        if config is None:
            try:
                from .config import Config
                self.config = Config()
            except ImportError:
                # Fallback to None if Config cannot be imported
                self.config = None
        else:
            self.config = config

        self.supported_formats = ['.pdf']

        # Probe each optional back-end on its own: having only one of
        # pdfplumber / PyMuPDF installed must not disable the other.
        self.libraries = {
            'pypdf': True,
            'pdfplumber': self._can_import('pdfplumber'),
            'PyMuPDF': self._can_import('fitz'),
        }
        print(f"PDF Processor initialized with libraries: {[k for k, v in self.libraries.items() if v]}")

    @staticmethod
    def _can_import(module_name: str) -> bool:
        """Return True if ``module_name`` imports without ImportError."""
        import importlib

        try:
            importlib.import_module(module_name)
        except ImportError:
            return False
        return True

    def extract_text_from_file(self, file_path: str, method: str = 'auto') -> Dict[str, Any]:
        """
        Extract text from PDF file

        Args:
            file_path: Path to PDF file (``str`` or ``os.PathLike``)
            method: Extraction method ('auto', 'pypdf', 'pdfplumber', 'pymupdf')

        Returns:
            Dictionary with extracted text and metadata, or a dictionary with
            an ``'error'`` key. In 'auto' mode, when every back-end fails the
            error dictionary also carries ``'details'``: a mapping of method
            name to that method's error message.
        """
        file_path = os.fspath(file_path)
        if not os.path.exists(file_path):
            return {'error': f"File not found: {file_path}"}
        if not file_path.lower().endswith('.pdf'):
            return {'error': f"Not a PDF file: {file_path}"}

        try:
            print(f"Processing PDF: {os.path.basename(file_path)}")

            if method != 'auto':
                return self._extract_with_method(file_path, method)

            # Try the back-ends in order of preference; the first result
            # without an 'error' key wins.
            failures = {}
            for candidate in self._AUTO_ORDER:
                if not self.libraries.get(self._LIBRARY_KEYS[candidate]):
                    continue
                result = self._extract_with_method(file_path, candidate)
                if result and not result.get('error'):
                    return result
                failures[candidate] = result.get('error') if result else 'no result'

            # If all methods fail, return error
            return {'error': 'All extraction methods failed', 'details': failures}
        except Exception as e:
            return {'error': f"Error processing PDF: {str(e)}"}

    def _extract_with_method(self, file_path: str, method: str) -> Dict[str, Any]:
        """
        Extract text using a specific method

        Args:
            file_path: Path to PDF file
            method: Extraction method ('pdfplumber', 'pymupdf' or 'pypdf')

        Returns:
            Dictionary with extracted text and metadata. Unknown or
            unavailable methods, and any exception raised by the back-end,
            are reported through an ``'error'`` key.
        """
        extractors = {
            'pdfplumber': self._extract_with_pdfplumber,
            'pymupdf': self._extract_with_pymupdf,
            'pypdf': self._extract_with_pypdf,
        }
        try:
            extractor = extractors.get(method)
            if extractor is None or not self.libraries[self._LIBRARY_KEYS[method]]:
                return {'error': f"Method {method} not available"}
            return extractor(file_path)
        except Exception as e:
            return {'error': f"Error with method {method}: {str(e)}"}

    @staticmethod
    def _build_result(text_content: List[str], metadata: Dict[str, Any], file_path: str) -> Dict[str, Any]:
        """Join the collected text sections and assemble the result dictionary."""
        full_text = '\n\n'.join(text_content)
        return {
            'text': full_text,
            'metadata': metadata,
            'word_count': len(full_text.split()),
            'char_count': len(full_text),
            'extracted_at': datetime.now().isoformat(),
            'file_path': file_path
        }

    def _extract_with_pdfplumber(self, file_path: str) -> Dict[str, Any]:
        """
        Extract text using pdfplumber (best for tables and layout)

        Page text and every extracted table are emitted as separate sections;
        metadata counts pages, tables and images.
        """
        import pdfplumber

        text_content = []
        metadata = {
            'method': 'pdfplumber',
            'pages': 0,
            'tables': 0,
            'images': 0
        }
        with pdfplumber.open(file_path) as pdf:
            metadata['pages'] = len(pdf.pages)
            for page_num, page in enumerate(pdf.pages, start=1):
                # Extract text (skip pages with no or whitespace-only text)
                page_text = page.extract_text()
                if page_text and page_text.strip():
                    text_content.append(f"--- Page {page_num} ---\n{page_text}")

                # Count tables and add their content
                tables = page.extract_tables()
                if tables:
                    metadata['tables'] += len(tables)
                    for table in tables:
                        table_text = self._format_table(table)
                        text_content.append(f"--- Table on Page {page_num} ---\n{table_text}")

                # Count images
                if hasattr(page, 'images'):
                    metadata['images'] += len(page.images)

        return self._build_result(text_content, metadata, file_path)

    def _extract_with_pymupdf(self, file_path: str) -> Dict[str, Any]:
        """
        Extract text using PyMuPDF (fast and accurate)

        Metadata counts pages, images and links.
        """
        import fitz

        doc = fitz.open(file_path)
        try:
            text_content = []
            metadata = {
                'method': 'pymupdf',
                'pages': len(doc),
                'images': 0,
                'links': 0
            }
            for page_num, page in enumerate(doc, start=1):
                # Extract text
                page_text = page.get_text()
                if page_text and page_text.strip():
                    text_content.append(f"--- Page {page_num} ---\n{page_text}")
                # Count images and links
                metadata['images'] += len(page.get_images())
                metadata['links'] += len(page.get_links())
        finally:
            # Release the document even when a page fails to parse.
            doc.close()

        return self._build_result(text_content, metadata, file_path)

    def _extract_with_pypdf(self, file_path: str) -> Dict[str, Any]:
        """Extract text using pypdf (basic but reliable)"""
        text_content = []
        metadata = {
            'method': 'pypdf',
            'pages': 0
        }
        with open(file_path, 'rb') as file:
            pdf_reader = pypdf.PdfReader(file)
            metadata['pages'] = len(pdf_reader.pages)
            for page_num, page in enumerate(pdf_reader.pages, start=1):
                page_text = page.extract_text()
                # Guard against a None result before calling .strip()
                if page_text and page_text.strip():
                    text_content.append(f"--- Page {page_num} ---\n{page_text}")

        return self._build_result(text_content, metadata, file_path)

    def _format_table(self, table: List[List[Optional[str]]]) -> str:
        """
        Format a table for text output

        Args:
            table: Rows of cells; a cell may be None

        Returns:
            One line per non-empty row, cells joined with ' | '. None cells
            become empty strings. Returns "" for an empty table.
        """
        if not table:
            return ""
        return '\n'.join(
            ' | '.join('' if cell is None else str(cell) for cell in row)
            for row in table
            if row  # Skip empty rows
        )

    def extract_text_from_bytes(self, pdf_bytes: bytes, filename: str = "uploaded.pdf") -> Dict[str, Any]:
        """
        Extract text from PDF bytes (for uploaded files)

        The bytes are written to a temporary ``.pdf`` file, processed with
        ``extract_text_from_file`` and the temporary file is always removed
        afterwards, so the ``'file_path'`` in the result no longer exists.

        Args:
            pdf_bytes: PDF file bytes
            filename: Original filename

        Returns:
            Dictionary with extracted text and metadata; on success the
            metadata additionally holds ``original_filename`` and
            ``file_size``.
        """
        import tempfile

        tmp_path = None
        try:
            # Save bytes to temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(pdf_bytes)

            # Extract text
            result = self.extract_text_from_file(tmp_path)

            # Update metadata
            if 'metadata' in result:
                result['metadata']['original_filename'] = filename
                result['metadata']['file_size'] = len(pdf_bytes)
            return result
        except Exception as e:
            return {'error': f"Error processing PDF bytes: {str(e)}"}
        finally:
            # Clean up on every path, including failures above.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    def validate_pdf(self, file_path: str) -> Dict[str, Any]:
        """
        Validate PDF file

        Args:
            file_path: Path to PDF file (``str`` or ``os.PathLike``)

        Returns:
            ``{'valid': True, 'pages', 'encrypted', 'file_size', 'file_path'}``
            when pypdf can open the file and count its pages, otherwise
            ``{'valid': False, 'error': ...}``.
        """
        try:
            file_path = os.fspath(file_path)
            if not os.path.exists(file_path):
                return {'valid': False, 'error': 'File not found'}
            if not file_path.lower().endswith('.pdf'):
                return {'valid': False, 'error': 'Not a PDF file'}

            # Try to open with pypdf
            with open(file_path, 'rb') as file:
                pdf_reader = pypdf.PdfReader(file)
                page_count = len(pdf_reader.pages)
                is_encrypted = pdf_reader.is_encrypted

            return {
                'valid': True,
                'pages': page_count,
                'encrypted': is_encrypted,
                'file_size': os.path.getsize(file_path),
                'file_path': file_path
            }
        except Exception as e:
            return {'valid': False, 'error': str(e)}

    def get_pdf_metadata(self, file_path: str) -> Dict[str, Any]:
        """
        Extract metadata from PDF

        Document metadata is collected on a best-effort basis: a failure in
        either library is ignored and the remaining sources are still used.

        Args:
            file_path: Path to PDF file

        Returns:
            PDF metadata merged from pypdf, PyMuPDF (when available) and the
            file system, or ``{'error': ...}`` if the file cannot be stat'ed.
        """
        try:
            metadata = {}

            # Try pypdf first
            try:
                with open(file_path, 'rb') as file:
                    info = pypdf.PdfReader(file).metadata
                    if info:
                        metadata.update({
                            'title': info.get('/Title', ''),
                            'author': info.get('/Author', ''),
                            'subject': info.get('/Subject', ''),
                            'creator': info.get('/Creator', ''),
                            'producer': info.get('/Producer', ''),
                            'creation_date': info.get('/CreationDate', ''),
                            'modification_date': info.get('/ModDate', '')
                        })
            except Exception:
                # Deliberate best-effort: continue with the other sources.
                pass

            # Try PyMuPDF for additional metadata
            if self.libraries['PyMuPDF']:
                try:
                    import fitz

                    doc = fitz.open(file_path)
                    try:
                        doc_metadata = doc.metadata
                    finally:
                        doc.close()
                    if doc_metadata:
                        metadata.update({
                            'format': doc_metadata.get('format', ''),
                            'encryption': doc_metadata.get('encryption', ''),
                            'keywords': doc_metadata.get('keywords', '')
                        })
                except Exception:
                    # Deliberate best-effort: continue with file-system data.
                    pass

            # Add file system metadata
            stat = os.stat(file_path)
            metadata.update({
                'file_size': stat.st_size,
                'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'accessed': datetime.fromtimestamp(stat.st_atime).isoformat()
            })
            return metadata
        except Exception as e:
            return {'error': f"Error extracting metadata: {str(e)}"}

    @staticmethod
    def _find_chunk_end(text: str, start: int, end: int) -> int:
        """
        Pick a natural break inside ``text[start:end]``.

        Preference order: last '.', last blank line, last newline. A break is
        only accepted when it lies strictly after ``start``; otherwise ``end``
        is returned unchanged.
        """
        for marker in ('.', '\n\n', '\n'):
            position = text.rfind(marker, start, end)
            if position > start:
                return position + len(marker)
        return end

    def split_pdf_text(self, text: str, chunk_size: Optional[int] = None,
                       chunk_overlap: Optional[int] = None) -> List[str]:
        """
        Split PDF text into chunks for processing

        Args:
            text: Extracted text
            chunk_size: Size of each chunk (defaults to ``config.CHUNK_SIZE``
                or 1000 when no config is set)
            chunk_overlap: Overlap between chunks (defaults to
                ``config.CHUNK_OVERLAP`` or 200 when no config is set);
                negative values are treated as 0

        Returns:
            List of whitespace-stripped, non-empty text chunks. Text that
            already fits into one chunk is returned unchanged as a
            single-element list.

        Raises:
            ValueError: If the text needs splitting and ``chunk_size`` is
                not positive.
        """
        # Use provided values or defaults if config is None
        if chunk_size is None:
            chunk_size = self.config.CHUNK_SIZE if self.config else 1000
        if chunk_overlap is None:
            chunk_overlap = self.config.CHUNK_OVERLAP if self.config else 200

        if len(text) <= chunk_size:
            return [text]
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        chunk_overlap = max(chunk_overlap, 0)

        chunks = []
        text_length = len(text)
        start = 0
        while start < text_length:
            end = min(start + chunk_size, text_length)
            if end < text_length:
                end = self._find_chunk_end(text, start, end)

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            # The tail has been emitted; stepping back by the overlap would
            # only produce a chunk fully contained in the previous one.
            if end >= text_length:
                break

            # Always move forward: if the overlap would put the next window
            # at or before the current start, continue right after this chunk.
            next_start = end - chunk_overlap
            start = next_start if next_start > start else end
        return chunks

    def clean_text(self, text: str) -> str:
        """
        Clean extracted text

        Collapses whitespace, removes ``Page <n>`` markers, e-mail addresses
        and http(s) URLs, and expands the Unicode ligatures U+FB00..U+FB04
        to plain letters.

        Args:
            text: Raw extracted text

        Returns:
            Cleaned text ("" for empty or None input)
        """
        if not text:
            return ""

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove page headers/footers (basic)
        text = re.sub(r'Page \d+', '', text)
        # Remove email addresses (optional)
        text = re.sub(r'\S+@\S+', '', text)
        # Remove URLs (optional)
        text = re.sub(r'https?://\S+', '', text)
        # Fix common OCR/extraction artefacts: ligature glyphs -> letters
        text = text.translate(self._LIGATURE_TABLE)
        # The removals above can leave doubled spaces behind
        text = re.sub(r' {2,}', ' ', text)
        return text.strip()

    def get_processing_stats(self) -> Dict[str, Any]:
        """
        Get PDF processing statistics

        Returns:
            Processing statistics: library availability, supported formats,
            the module-level ``PDF_ENHANCED`` flag and the effective chunking
            configuration. The library and format collections are copies.
        """
        return {
            'available_libraries': dict(self.libraries),
            'supported_formats': list(self.supported_formats),
            'enhanced_features': PDF_ENHANCED,
            'config': {
                'chunk_size': self.config.CHUNK_SIZE if self.config else 1000,
                'chunk_overlap': self.config.CHUNK_OVERLAP if self.config else 200
            }
        }