import gradio as gr
import os
import logging
import re
from io import BytesIO

# Document processing imports
import PyPDF2
from docx import Document as DocxDocument
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Local imports
try:
    from .utils import getconfig
except ImportError:  # fallback when run directly as a script rather than as a package module
    from utils import getconfig

config = getconfig("params.cfg")
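# Illustrative params.cfg layout matching the fallbacks used below (the real
# file may define more sections; getconfig is assumed to return a
# configparser-style object):
#   [chunking]
#   chunk_size = 700
#   chunk_overlap = 50
#   separators = \n\n,\n,. ,! ,? , ,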
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Document extraction helpers
def extract_text_from_pdf_bytes(file_content: bytes) -> tuple[str, dict]:
    """Extract text from PDF bytes (in memory)."""
    try:
        pdf_reader = PyPDF2.PdfReader(BytesIO(file_content))
        text = ""
        metadata = {"total_pages": len(pdf_reader.pages)}
        for page_num, page in enumerate(pdf_reader.pages):
            page_text = page.extract_text()
            text += f"\n--- Page {page_num + 1} ---\n{page_text}"
        return text, metadata
    except Exception as e:
        logger.error(f"PDF extraction error: {str(e)}")
        raise Exception(f"Failed to extract text from PDF: {str(e)}") from e
def extract_text_from_docx_bytes(file_content: bytes) -> tuple[str, dict]:
    """Extract text from DOCX bytes (in memory)."""
    try:
        doc = DocxDocument(BytesIO(file_content))
        text = ""
        metadata = {"total_paragraphs": 0}
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                text += f"{paragraph.text}\n"
                metadata["total_paragraphs"] += 1
        return text, metadata
    except Exception as e:
        logger.error(f"DOCX extraction error: {str(e)}")
        raise Exception(f"Failed to extract text from DOCX: {str(e)}") from e
def clean_and_chunk_text(text: str) -> str:
    """Clean text and split into chunks, returning formatted context."""
    # Basic text cleaning: collapse runs of spaces/tabs and excess blank lines,
    # but keep newlines intact since the splitter uses them as separators
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = text.strip()

    # Get chunking parameters from config
    chunk_size = config.getint('chunking', 'chunk_size', fallback=700)
    chunk_overlap = config.getint('chunking', 'chunk_overlap', fallback=50)
    separators_str = config.get('chunking', 'separators', fallback='\n\n,\n,. ,! ,? , ,')
    # Decode escaped newlines coming from the config file; do not strip the
    # entries, since whitespace separators are meaningful to the splitter
    separators = [s.encode().decode('unicode_escape') for s in separators_str.split(',')]

    # Split text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=separators,
        is_separator_regex=False
    )
    chunks = text_splitter.split_text(text)

    # Label each chunk and join them into a single context string
    context_parts = []
    for i, chunk_text in enumerate(chunks):
        context_parts.append(f"[Chunk {i+1}]: {chunk_text}")
    return "\n\n".join(context_parts)
def ingest(file):
    """Main ingestion function - processes the file and returns context directly."""
    if file is None:
        return "No file uploaded"
    try:
        with open(file.name, 'rb') as f:
            file_content = f.read()
        filename = os.path.basename(file.name)

        # Extract text based on file type (in memory)
        file_extension = os.path.splitext(filename)[1].lower()
        if file_extension == '.pdf':
            text, extraction_metadata = extract_text_from_pdf_bytes(file_content)
        elif file_extension == '.docx':
            text, extraction_metadata = extract_text_from_docx_bytes(file_content)
        else:
            raise ValueError(f"Unsupported file type: {file_extension}")

        # Clean and chunk text
        context = clean_and_chunk_text(text)
        logger.info(f"Successfully processed document {filename}: {len(text)} characters")
        return context
    except Exception as e:
        logger.error(f"Document processing failed: {str(e)}")
        raise Exception(f"Processing failed: {str(e)}") from e
if __name__ == "__main__":
    ui = gr.Interface(
        fn=ingest,
        inputs=gr.File(
            label="Document Upload",
            file_types=[".pdf", ".docx"]
        ),
        outputs=gr.Textbox(
            label="Processed Context",
            lines=15,
            show_copy_button=True
        ),
        title="ChatFed Ingestion Module",
        description="Processes PDF or DOCX files and returns chunked text context. Intended for use in RAG pipelines as an MCP server with other ChatFed modules (i.e., context supplied to the generation service).",
        api_name="ingest"
    )
    ui.launch(
        server_name="0.0.0.0",
        server_port=7860,
        # mcp_server=True,
        show_error=True
    )
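# Illustrative client call once the app is running (assumes the gradio_client
# package; "report.pdf" is a hypothetical local file):
#   from gradio_client import Client, handle_file
#   client = Client("http://localhost:7860/")
#   context = client.predict(handle_file("report.pdf"), api_name="/ingest")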