# Summarizeit — Hugging Face Space (scraped page header "Spaces: Running" removed)
| import gradio as gr | |
| from markitdown import MarkItDown | |
| import google.generativeai as genai | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
# Initialize MarkItDown once at import time; the single instance is
# reused by process_with_markitdown for every conversion.
md = MarkItDown()
# Configure Gemini AI with the key from the GEMINI_KEY environment variable.
# NOTE(review): if GEMINI_KEY is unset, configure() receives None and the
# failure only surfaces later when generate_content is called — confirm
# whether an early, explicit check is wanted here.
genai.configure(api_key=os.getenv('GEMINI_KEY'))
model = genai.GenerativeModel('gemini-2.0-flash-lite-preview-02-05')
def process_with_markitdown(input_path):
    """Convert a local file path or URL to text with MarkItDown.

    The conversion runs in a worker thread so a hung converter cannot
    block the request forever; we give up after 30 seconds.

    Args:
        input_path: Local file path or http(s) URL accepted by MarkItDown.

    Returns:
        The extracted text content, or a string starting with "Error"
        describing what went wrong (callers check ``startswith('Error')``).
    """
    print(f"[DEBUG] Starting MarkItDown processing for: {input_path}")
    try:
        import concurrent.futures
        from concurrent.futures import ThreadPoolExecutor

        def convert_with_timeout():
            print("[DEBUG] Attempting MarkItDown conversion")
            result = md.convert(input_path)
            print("[DEBUG] MarkItDown conversion successful")
            if not result or not hasattr(result, 'text_content'):
                print("[DEBUG] No text content in result")
                return "Error: No text content found in document"
            return result.text_content

        # BUG FIX: the original used `with ThreadPoolExecutor() as executor:`.
        # Exiting that `with` block calls shutdown(wait=True), which blocks
        # until the worker finishes — so on timeout the function could NOT
        # actually return early; it still waited for the stuck conversion.
        # Manage the executor manually and shut it down without waiting so
        # the 30-second timeout is effective.
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(convert_with_timeout)
        try:
            result = future.result(timeout=30)  # 30 second timeout
            print("[DEBUG] Successfully got result from MarkItDown")
            return result
        except concurrent.futures.TimeoutError:
            print("[DEBUG] MarkItDown processing timed out")
            return "Error: Processing timed out after 30 seconds"
        finally:
            # wait=False lets us return immediately; an abandoned worker
            # thread finishes (or keeps hanging) in the background — Python
            # threads cannot be forcibly killed.
            executor.shutdown(wait=False)
    except Exception as e:
        print(f"[DEBUG] Error in process_with_markitdown: {str(e)}")
        return f"Error processing input: {str(e)}"
def save_uploaded_file(uploaded_file):
    """Copy a Gradio-uploaded file into a fresh temporary directory.

    Args:
        uploaded_file: Gradio file object exposing the upload's on-disk
            path via its ``.name`` attribute, or None when nothing was
            uploaded.

    Returns:
        Path of the saved copy on success, or a string starting with
        "Error" on failure (callers detect failure via
        ``startswith('Error')``).
    """
    print("[DEBUG] Starting save_uploaded_file")
    if uploaded_file is None:
        print("[DEBUG] No file uploaded")
        # BUG FIX: message must start with "Error" so the caller's
        # startswith('Error') check catches it (was "No file uploaded.",
        # which the caller would have treated as a valid path).
        return "Error: No file uploaded."
    try:
        print(f"[DEBUG] Uploaded file object type: {type(uploaded_file)}")
        print(f"[DEBUG] Uploaded file name: {uploaded_file.name}")
        # Get the actual file path from the uploaded file
        file_path = uploaded_file.name
        print(f"[DEBUG] Original file path: {file_path}")
        # Read the content directly from the original file
        try:
            with open(file_path, 'rb') as source_file:
                content = source_file.read()
            print(f"[DEBUG] Successfully read {len(content)} bytes from source file")
        except Exception as e:
            print(f"[DEBUG] Error reading source file: {str(e)}")
            return f"Error reading file: {str(e)}"
        # BUG FIX: write into a unique per-upload directory instead of
        # gettempdir()/basename, which could collide across concurrent
        # sessions uploading files with the same name and silently
        # overwrite another user's upload.
        temp_dir = tempfile.mkdtemp(prefix="summarizeit_")
        temp_filename = os.path.join(temp_dir, os.path.basename(file_path))
        with open(temp_filename, 'wb') as f:
            f.write(content)
        print(f"[DEBUG] File saved successfully at: {temp_filename}")
        return temp_filename
    except Exception as e:
        print(f"[DEBUG] Error in save_uploaded_file: {str(e)}")
        # BUG FIX: prefix with "Error" (was "An error occurred: ...",
        # invisible to the caller's startswith('Error') check).
        return f"Error: An error occurred: {str(e)}"
async def summarize_text(text):
    """Produce a concise summary of *text* via the Gemini model.

    Returns the summary string, or an "Error generating summary: ..."
    message instead of raising on API failure.

    Note: deliberately calls the blocking generate_content API even
    though this coroutine is async — the async variant of the client
    was not working as expected.
    """
    try:
        prompt = (
            "Please provide a concise summary of the following text. "
            "Focus on the main points and key takeaways:\n"
            f"{text}\n"
            "Summary:"
        )
        response = model.generate_content(prompt)
        return response.text
    except Exception as exc:
        return f"Error generating summary: {str(exc)}"
async def process_input(input_text, uploaded_file=None):
    """Summarize a URL, raw text, or uploaded file.

    Args:
        input_text: URL or raw text from the textbox; may be empty or
            None when the user only uploaded a file.
        uploaded_file: Optional Gradio file object; takes precedence
            over input_text when present.

    Returns:
        The Gemini-generated summary, or a string starting with "Error".
    """
    print("[DEBUG] Starting process_input")
    try:
        if uploaded_file is not None:
            # Handle file upload
            temp_path = save_uploaded_file(uploaded_file)
            if temp_path.startswith('Error'):
                return temp_path
            text = process_with_markitdown(temp_path)
            # Clean up temporary file (best-effort only).
            try:
                os.remove(temp_path)
            except OSError:
                # BUG FIX: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit; only ignore filesystem
                # errors from the cleanup.
                pass
        elif input_text and input_text.startswith(('http://', 'https://')):
            # Handle URL
            text = process_with_markitdown(input_text)
        elif input_text:
            # Handle direct text input
            text = input_text
        else:
            # BUG FIX: with no file and None input_text the original
            # raised AttributeError on None.startswith (and summarized
            # the empty string for ""). Fail fast with a clear message.
            return "Error: Please enter a URL, some text, or upload a file."
        if text.startswith('Error'):
            return text
        # Generate summary using Gemini AI
        summary = await summarize_text(text)
        return summary
    except Exception as e:
        return f"Error processing input: {str(e)}"
def clear_inputs():
    """Reset the text box, file upload, and summary output components."""
    blank = ""
    return [blank, None, blank]
# Create Gradio interface with drag-and-drop
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    # Page header and usage instructions.
    gr.Markdown(
        """
        # Summarizeit
        > Summarize any document! Using Gemini 2.0 Flash model.
        Enter a URL, paste text, or drag & drop a file to get a summary.
        """
    )
    # URL / raw-text input row.
    with gr.Row():
        input_text = gr.Textbox(
            label="Enter URL or text",
            placeholder="Enter a URL or paste text here...",
            scale=2
        )
    # Drag-and-drop upload row, restricted to formats MarkItDown handles.
    with gr.Row():
        file_upload = gr.File(
            label="Drop files here or click to upload",
            file_types=[
                ".pdf", ".docx", ".xlsx", ".csv", ".txt",
                ".html", ".htm", ".xml", ".json"
            ],
            file_count="single",
            scale=2
        )
    with gr.Row():
        submit_btn = gr.Button("Summarize", variant="primary")
        clear_btn = gr.Button("Clear")
    # Read-only summary output with a copy button.
    output_text = gr.Textbox(
        label="Summary",
        lines=10,
        show_copy_button=True
    )
    # Set up event handlers
    # Summarize: process_input is async; Gradio awaits it. Exposed on the
    # API as "process".
    submit_btn.click(
        fn=process_input,
        inputs=[input_text, file_upload],
        outputs=output_text,
        api_name="process"
    )
    # Clear: resets all three components to their empty states.
    clear_btn.click(
        fn=clear_inputs,
        outputs=[input_text, file_upload, output_text]
    )
    # Add examples
    # Clickable example URLs that prefill the textbox.
    gr.Examples(
        examples=[
            ["https://h3manth.com"],
            ["https://www.youtube.com/watch?v=bSHp7WVpPgc"],
            ["https://en.wikipedia.org/wiki/Three-body_problem"]
        ],
        inputs=input_text
    )
if __name__ == "__main__":
    # NOTE(review): launch(True) binds True to Blocks.launch's FIRST
    # positional parameter (`inline`), not `share` or `debug` — confirm
    # this was intended; Spaces typically call iface.launch() with no
    # arguments or iface.launch(share=True).
    iface.launch(True)