import logging
from pathlib import Path
from typing import List, Dict, Union, Optional
import re
import html
import openai
import requests
from PyPDF2 import PdfReader
from gradio_client import Client

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file with robust error handling.

    Args:
        file_path: Path to the PDF file

    Returns:
        Extracted text as a string

    Raises:
        ValueError: If file doesn't exist or isn't readable
        RuntimeError: If text extraction fails
    """
    try:
        if not Path(file_path).exists():
            raise ValueError(f"PDF file not found: {file_path}")

        reader = PdfReader(file_path)
        text_content = []

        for page_num, page in enumerate(reader.pages, 1):
            try:
                text = page.extract_text()
                # extract_text() can return None for image-only pages
                if text and text.strip():
                    text_content.append(text)
                else:
                    logger.warning(f"Page {page_num} appears to be empty or unreadable")
            except Exception as e:
                logger.error(f"Error extracting text from page {page_num}: {str(e)}")
                continue

        if not text_content:
            raise RuntimeError("No readable text found in PDF")

        return "\n\n".join(text_content)

    except Exception as e:
        logger.error(f"PDF extraction failed: {str(e)}")
        raise RuntimeError(f"Failed to process PDF: {str(e)}") from e

def format_content(text: str, format_type: str) -> str:
    """
    Format extracted text into the specified output format.

    Args:
        text: Raw text content
        format_type: Output format ('txt', 'md', 'html')

    Returns:
        Formatted text string

    Raises:
        ValueError: If format type is invalid
    """
    if not isinstance(text, str):
        raise ValueError("Input text must be a string")

    # Clean up common PDF extraction artifacts
    text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
    text = re.sub(r'(?<=[.!?])\s+', '\n\n', text)  # Split sentences into paragraphs
    text = text.strip()

    if format_type.lower() == 'txt':
        return text

    elif format_type.lower() == 'md':
        paragraphs = text.split('\n\n')
        md_text = []
        for para in paragraphs:
            # Treat capitalized lines without sentence punctuation as headers
            if re.match(r'^[A-Z][^.!?]*$', para.strip()):
                md_text.append(f"## {para.strip()}")
            else:
                md_text.append(para.strip())
        return '\n\n'.join(md_text)

    elif format_type.lower() == 'html':
        paragraphs = text.split('\n\n')
        html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
        for para in paragraphs:
            # Escape the text so PDF content can't inject markup
            escaped = html.escape(para.strip())
            if re.match(r'^[A-Z][^.!?]*$', para.strip()):
                html_parts.append(f"<h2>{escaped}</h2>")
            else:
                html_parts.append(f"<p>{escaped}</p>")
        html_parts.extend(['</body>', '</html>'])
        return '\n'.join(html_parts)

    else:
        raise ValueError(f"Unsupported format type: {format_type}")

def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
    """
    Split text into overlapping chunks that fit within model context windows.

    Args:
        text: Input text to split
        chunk_size: Maximum size of each chunk
        overlap: Number of characters to overlap between chunks

    Returns:
        List of text snippets

    Raises:
        ValueError: If chunk_size is too small or text is empty
    """
    if not text:
        raise ValueError("Input text is empty")
    if chunk_size < 1000:
        raise ValueError("Chunk size must be at least 1000 characters")

    # Split into paragraphs first
    paragraphs = text.split('\n\n')
    chunks = []
    current_chunk = []
    current_size = 0

    for para in paragraphs:
        para_size = len(para)
        if current_size + para_size <= chunk_size:
            current_chunk.append(para)
            current_size += para_size + 2  # +2 for the '\n\n' separator
        else:
            if current_chunk:
                chunks.append('\n\n'.join(current_chunk))
            # Start new chunk with overlap from the previous one
            if chunks:
                overlap_text = chunks[-1][-overlap:] if overlap > 0 else ""
                current_chunk = [overlap_text, para]
                current_size = len(overlap_text) + para_size + 2
            else:
                current_chunk = [para]
                current_size = para_size

    # Add the last chunk if it exists
    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))

    return chunks

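# Usage sketch: boundaries fall on paragraph breaks, and each new chunk
# repeats the tail of the previous one so no passage loses its context
# ("long_text" is a placeholder variable):
#
#     snippets = split_into_snippets(long_text, chunk_size=4000, overlap=200)
#     # snippets[1] starts with up to 200 trailing chars of snippets[0]
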
def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
    """
    Build formatted prompts for each text chunk.

    Args:
        chunks: List of text chunks
        custom_prompt: Optional custom instruction

    Returns:
        List of formatted prompt strings
    """
    default_prompt = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations
Please maintain the original meaning while being concise."""

    instruction = custom_prompt if custom_prompt else default_prompt
    prompts = []
    for i, chunk in enumerate(chunks, 1):
        prompt = f"""### Instruction
{instruction}
### Input Text (Part {i} of {len(chunks)})
{chunk}
### End of Input Text
Please provide your summary below:"""
        prompts.append(prompt)
    return prompts

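# Usage sketch: each chunk becomes a self-contained prompt, so parts can be
# summarized independently and merged afterwards:
#
#     prompts = build_prompts(snippets)  # default summarization instruction
#     prompts = build_prompts(snippets, "Extract action items only.")
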
def process_with_model(
    prompt: str,
    model_choice: str,
    api_key: Optional[str] = None,
    oauth_token: Optional[str] = None
) -> str:
    """
    Process text with the selected model.

    Args:
        prompt: Input prompt
        model_choice: Selected model name
        api_key: OpenAI API key for GPT models
        oauth_token: Hugging Face token for other models

    Returns:
        Generated summary

    Raises:
        ValueError: If required credentials are missing
        RuntimeError: If model processing fails
    """
    try:
        if 'gpt' in model_choice.lower():
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")
            # Legacy (pre-1.0) openai SDK interface; requires openai<1.0
            openai.api_key = api_key
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            return response.choices[0].message.content
        else:  # Hugging Face models
            if not oauth_token:
                raise ValueError("Hugging Face token required")
            headers = {"Authorization": f"Bearer {oauth_token}"}
            # Map model choice to actual model ID
            model_map = {
                "Claude-3": "anthropic/claude-3-opus-20240229",
                "Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1"
            }
            model_id = model_map.get(model_choice)
            if not model_id:
                raise ValueError(f"Unknown model: {model_choice}")
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{model_id}",
                headers=headers,
                json={"inputs": prompt},
                timeout=120
            )
            if response.status_code != 200:
                raise RuntimeError(f"Model API error: {response.text}")
            return response.json()[0]["generated_text"]
    except Exception as e:
        logger.error(f"Model processing failed: {str(e)}")
        raise RuntimeError(f"Failed to process with model: {str(e)}") from e

def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
    """
    Validate API keys for different services.

    Args:
        openai_key: OpenAI API key
        hf_token: Hugging Face token

    Returns:
        Dictionary with validation results
    """
    results = {"openai": False, "huggingface": False}

    if openai_key:
        try:
            # Legacy (pre-1.0) openai SDK call; an invalid key raises here
            openai.api_key = openai_key
            openai.Model.list()
            results["openai"] = True
        except Exception as e:
            logger.debug(f"OpenAI key validation failed: {str(e)}")

    if hf_token:
        try:
            response = requests.get(
                "https://huggingface.co/api/models",
                headers={"Authorization": f"Bearer {hf_token}"},
                timeout=10
            )
            results["huggingface"] = response.status_code == 200
        except Exception as e:
            logger.debug(f"Hugging Face token validation failed: {str(e)}")

    return results

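if __name__ == "__main__":
    # Minimal end-to-end sketch wiring the helpers together; the file name,
    # model choice, and environment variable here are placeholders, not part
    # of the app itself (requires openai<1.0 and a PDF on disk).
    import os
    import sys

    pdf_path = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    raw = extract_text_from_pdf(pdf_path)
    snippets = split_into_snippets(format_content(raw, 'txt'))
    for p in build_prompts(snippets):
        print(process_with_model(p, "GPT-3.5", api_key=os.environ.get("OPENAI_API_KEY")))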