import os
import random
import shutil
import tempfile
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import fitz  # PyMuPDF
import gradio as gr
from huggingface_hub import DatasetCard, DatasetCardData, HfApi

from dataset_card_template import DATASET_CARD_TEMPLATE
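
# Use hf_transfer for faster uploads to the Hub. This flag only takes effect
# if the hf_transfer package is installed (assumed to be in the Space's
# requirements).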
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"


def process_pdf(pdf_file, sample_size, temp_dir):
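    """Render a single PDF's pages to PNG images in `temp_dir`.

    Converts all pages when `sample_size` is 0, otherwise a random sample of
    up to `sample_size` pages. Returns `(image_paths, error_message)`; on
    failure the list is empty and the error message is set.
    """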
    try:
        pdf_path = pdf_file.name
        doc = fitz.open(pdf_path)
        total_pages = len(doc)
        pages_to_convert = (
            total_pages if sample_size == 0 else min(sample_size, total_pages)
        )
        selected_pages = (
            sorted(random.sample(range(total_pages), pages_to_convert))
            if 0 < sample_size < total_pages
            else range(total_pages)
        )
        images = []
        for page_num in selected_pages:
            page = doc[page_num]
            pix = page.get_pixmap()  # renders at PyMuPDF's default 72 dpi
            image_path = os.path.join(
                temp_dir, f"{os.path.basename(pdf_path)}_page_{page_num + 1}.png"
            )
            pix.save(image_path)
            images.append(image_path)
        doc.close()
        return images, None
    except Exception as e:
        return [], f"Error processing {pdf_file.name}: {str(e)}"


def pdf_to_images(pdf_files, sample_size, temp_dir, progress=gr.Progress()):
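    """Convert a batch of PDFs to page images in parallel, with UI progress.

    Each PDF is processed in its own worker thread; PDFs that fail to convert
    are skipped and reported via `gr.Info` instead of aborting the batch.
    """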
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    progress(0, desc="Starting conversion")
    all_images = []
    skipped_pdfs = []

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        future_to_pdf = {
            executor.submit(process_pdf, pdf_file, sample_size, temp_dir): pdf_file
            for pdf_file in pdf_files
        }
        for future in progress.tqdm(
            as_completed(future_to_pdf), total=len(pdf_files), desc="Converting PDFs"
        ):
            pdf_file = future_to_pdf[future]
            images, error = future.result()
            if error:
                skipped_pdfs.append(pdf_file.name)
                gr.Info(error)
            else:
                all_images.extend(images)

    message = f"Saved {len(all_images)} images to temporary directory"
    if skipped_pdfs:
        message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}"
    return all_images, message


def get_size_category(num_images):
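    """Map an image count to a Hugging Face `size_categories` value."""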
    if num_images < 1000:
        return "n<1K"
    elif num_images < 10000:
        return "1K<n<10K"
    elif num_images < 100000:
        return "10K<n<100K"
    elif num_images < 1000000:
        return "100K<n<1M"
    else:
        return "n>1M"


def process_pdfs(
    pdf_files,
    sample_size,
    hf_repo,
    create_zip,
    private_repo,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
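    """End-to-end pipeline behind the submit button.

    Converts the uploaded PDFs to images, optionally zips them, and, when a
    repo name is given, uploads everything plus a generated dataset card to a
    Hugging Face dataset repo. Returns `(gallery_images, zip_path, status)`.
    `oauth_token` is injected by Gradio from the login session because of its
    type annotation; it is not listed in the click handler's `inputs`.
    """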
    if not pdf_files:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ No PDF files uploaded. Please upload at least one PDF file."
            ),
        )
    if oauth_token is None:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset."
            ),
        )
    try:
        temp_dir = tempfile.mkdtemp()
        images_dir = os.path.join(temp_dir, "images")
        os.makedirs(images_dir)

        progress(0, desc="Starting PDF processing")
        images, message = pdf_to_images(pdf_files, sample_size, images_dir)

        zip_path = None
        if create_zip:
            # Create a zip file of the images
            zip_path = os.path.join(temp_dir, "converted_images.zip")
            with zipfile.ZipFile(zip_path, "w") as zipf:
                progress(0, desc="Zipping images")
                for image in progress.tqdm(images, desc="Zipping images"):
                    zipf.write(image, os.path.basename(image))
            message += f"\nCreated zip file with {len(images)} images"
        if hf_repo:
            try:
                hf_api = HfApi(token=oauth_token.token)
                hf_api.create_repo(
                    hf_repo,
                    repo_type="dataset",
                    private=private_repo,
                    exist_ok=True,  # don't fail if the repo already exists
                )
                hf_api.upload_large_folder(
                    folder_path=temp_dir,
                    repo_id=hf_repo,
                    repo_type="dataset",
                    # path_in_repo="images",
                )
                # Determine size category for the dataset card metadata
                size_category = get_size_category(len(images))
                # Create DatasetCardData instance
                card_data = DatasetCardData(
                    tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"],
                    size_categories=[size_category],
                )
                # Create and populate the dataset card
                card = DatasetCard.from_template(
                    card_data,
                    template_path=None,  # Use default template
                    hf_repo=hf_repo,
                    num_images=len(images),
                    num_pdfs=len(pdf_files),
                    sample_size=sample_size if sample_size > 0 else "All pages",
                    creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                )
                # Add our custom content to the card
                card.text = DATASET_CARD_TEMPLATE.format(
                    hf_repo=hf_repo,
                    num_images=len(images),
                    num_pdfs=len(pdf_files),
                    sample_size=sample_size if sample_size > 0 else "All pages",
                    creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    size_category=size_category,
                )
                # Push the card first, then report success
                card.push_to_hub(hf_repo, token=oauth_token.token)
                repo_url = f"https://huggingface.co/datasets/{hf_repo}"
                message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})"
            except Exception as e:
                message += f"\nFailed to upload to Hugging Face: {str(e)}"
        return images, zip_path, message
    except Exception as e:
        # Clean up only on failure: on success the files must stay on disk for
        # the gallery and download components to serve them.
        if "temp_dir" in locals():
            shutil.rmtree(temp_dir)
        return None, None, f"An error occurred: {str(e)}"


# Define the Gradio interface
with gr.Blocks() as demo:
    gr.HTML(
        """<h1 style='text-align: center;'> PDFs to Page Images Converter</h1>
        <center><i> 📁 Convert PDFs to an image dataset, splitting pages into individual images 📁 </i></center>"""
    )
    gr.Markdown(
        """
        This app allows you to:

        1. Upload one or more PDF files
        2. Convert each page of the PDFs into separate image files
        3. (Optionally) sample a specific number of pages from each PDF
        4. (Optionally) create a downloadable ZIP file of the converted images
        5. (Optionally) upload the images to a Hugging Face dataset repository
        """
    )
    with gr.Row():
        gr.LoginButton(size="sm")
    with gr.Row():
        pdf_files = gr.File(
            # Gradio expects bare extensions (".pdf"), not glob patterns
            file_count="multiple", label="Upload PDF(s)", file_types=[".pdf"]
        )
    with gr.Row():
        sample_size = gr.Number(
            value=0,
            precision=0,  # return an int page count rather than a float
            label="Pages per PDF (0 for all pages)",
            info="Specify how many pages to convert from each PDF. Use 0 to convert all pages.",
        )
        hf_repo = gr.Textbox(
            label="Hugging Face Repo",
            placeholder="username/repo-name",
            info="Enter the Hugging Face repository name in the format 'username/repo-name'",
        )
    with gr.Row():
        create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False)
        private_repo = gr.Checkbox(label="Make repository private?", value=False)

    with gr.Accordion("View converted images", open=False):
        output_gallery = gr.Gallery(label="Converted Images")

    status_text = gr.Markdown(label="Status")
    download_button = gr.File(label="Download Converted Images")
    submit_button = gr.Button("Convert PDFs to page images")
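
    # Note: `oauth_token` is deliberately absent from `inputs` below; Gradio
    # injects the logged-in user's token automatically because process_pdfs
    # annotates the parameter as `gr.OAuthToken | None`.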
    submit_button.click(
        process_pdfs,
        inputs=[pdf_files, sample_size, hf_repo, create_zip, private_repo],
        outputs=[output_gallery, download_button, status_text],
    )

# Launch the app
demo.launch(debug=True)