|
|
import logging |
|
|
import math |
|
|
import sys |
|
|
import time |
|
|
from functools import partial |
|
|
from pathlib import Path |
|
|
from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union |
|
|
|
|
|
from pydantic import BaseModel, ConfigDict, model_validator, validate_call |
|
|
|
|
|
from docling.backend.abstract_backend import AbstractDocumentBackend |
|
|
from docling.backend.asciidoc_backend import AsciiDocBackend |
|
|
from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend |
|
|
from docling.backend.html_backend import HTMLDocumentBackend |
|
|
from docling.backend.json.docling_json_backend import DoclingJSONBackend |
|
|
from docling.backend.md_backend import MarkdownDocumentBackend |
|
|
from docling.backend.msexcel_backend import MsExcelDocumentBackend |
|
|
from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend |
|
|
from docling.backend.msword_backend import MsWordDocumentBackend |
|
|
from docling.backend.xml.pubmed_backend import PubMedDocumentBackend |
|
|
from docling.backend.xml.uspto_backend import PatentUsptoDocumentBackend |
|
|
from docling.datamodel.base_models import ( |
|
|
ConversionStatus, |
|
|
DoclingComponentType, |
|
|
DocumentStream, |
|
|
ErrorItem, |
|
|
InputFormat, |
|
|
) |
|
|
from docling.datamodel.document import ( |
|
|
ConversionResult, |
|
|
InputDocument, |
|
|
_DocumentConversionInput, |
|
|
) |
|
|
from docling.datamodel.pipeline_options import PipelineOptions |
|
|
from docling.datamodel.settings import ( |
|
|
DEFAULT_PAGE_RANGE, |
|
|
DocumentLimits, |
|
|
PageRange, |
|
|
settings, |
|
|
) |
|
|
from docling.exceptions import ConversionError |
|
|
from docling.pipeline.base_pipeline import BasePipeline |
|
|
from docling.pipeline.simple_pipeline import SimplePipeline |
|
|
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline |
|
|
from docling.utils.utils import chunkify |
|
|
|
|
|
# Module-level logger, named after this module so log records can be filtered
# per component.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class FormatOption(BaseModel):
    # Bundles everything needed to convert one input format: the pipeline
    # class to run, the options handed to that pipeline, and the backend class
    # used to read the document.
    #
    # `pipeline_options` may be omitted; the "after" validator below then
    # fills it from the pipeline class' own defaults.
    pipeline_cls: Type[BasePipeline]
    pipeline_options: Optional[PipelineOptions] = None
    backend: Type[AbstractDocumentBackend]

    # Required because the fields hold arbitrary (non-pydantic) classes.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def set_optional_field_default(self) -> "FormatOption":
        """Fill in ``pipeline_options`` from the pipeline class when unset.

        Returns:
            The validated instance itself, as pydantic expects from an
            ``after`` model validator.
        """
        if self.pipeline_options is not None:
            # Caller supplied explicit options; keep them untouched.
            return self

        self.pipeline_options = self.pipeline_cls.get_default_options()
        return self
|
|
|
|
|
|
|
|
class ExcelFormatOption(FormatOption):
    # Preset pairing SimplePipeline with MsExcelDocumentBackend; both fields
    # are plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend
|
|
|
|
|
|
|
|
class WordFormatOption(FormatOption):
    # Preset pairing SimplePipeline with MsWordDocumentBackend; both fields
    # are plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend
|
|
|
|
|
|
|
|
class PowerpointFormatOption(FormatOption):
    # Preset pairing SimplePipeline with MsPowerpointDocumentBackend; both
    # fields are plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsPowerpointDocumentBackend
|
|
|
|
|
|
|
|
class MarkdownFormatOption(FormatOption):
    # Preset pairing SimplePipeline with MarkdownDocumentBackend; both fields
    # are plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MarkdownDocumentBackend
|
|
|
|
|
|
|
|
class AsciiDocFormatOption(FormatOption):
    # Preset pairing SimplePipeline with AsciiDocBackend; both fields are
    # plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = AsciiDocBackend
|
|
|
|
|
|
|
|
class HTMLFormatOption(FormatOption):
    # Preset pairing SimplePipeline with HTMLDocumentBackend; both fields are
    # plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = HTMLDocumentBackend
|
|
|
|
|
|
|
|
class PatentUsptoFormatOption(FormatOption):
    # Preset pairing SimplePipeline with PatentUsptoDocumentBackend.
    # NOTE(review): unlike the sibling presets, `backend` is annotated with
    # the concrete backend type rather than Type[AbstractDocumentBackend], so
    # only PatentUsptoDocumentBackend (sub)classes validate here -- confirm
    # this restriction is intentional.
    pipeline_cls: Type = SimplePipeline
    backend: Type[PatentUsptoDocumentBackend] = PatentUsptoDocumentBackend
|
|
|
|
|
|
|
|
class XMLPubMedFormatOption(FormatOption):
    # Preset pairing SimplePipeline with PubMedDocumentBackend; both fields
    # are plain defaults and can be overridden at construction time.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = PubMedDocumentBackend
|
|
|
|
|
|
|
|
class ImageFormatOption(FormatOption):
    # Preset pairing StandardPdfPipeline with DoclingParseV2DocumentBackend
    # (the same pair PdfFormatOption uses); both fields are plain defaults and
    # can be overridden at construction time.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend
|
|
|
|
|
|
|
|
class PdfFormatOption(FormatOption):
    # Preset pairing StandardPdfPipeline with DoclingParseV2DocumentBackend;
    # both fields are plain defaults and can be overridden at construction
    # time.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend
|
|
|
|
|
|
|
|
def _get_default_option(format: InputFormat) -> FormatOption:
    """Build the default ``FormatOption`` for a single input format.

    Only the option for the requested format is instantiated. Constructing a
    ``FormatOption`` runs its validator (which asks the pipeline class for
    default options), so building one per known format on every call would
    repeat that work needlessly -- this function is invoked once per allowed
    format when a converter is created.

    Args:
        format: The input format to look up. (The name shadows the builtin
            but is kept because callers pass it by keyword.)

    Returns:
        A freshly constructed ``FormatOption`` for ``format``.

    Raises:
        RuntimeError: If no default is configured for ``format``.
    """
    # (pipeline class, backend class) per supported format.
    format_to_default_spec = {
        InputFormat.XLSX: (SimplePipeline, MsExcelDocumentBackend),
        InputFormat.DOCX: (SimplePipeline, MsWordDocumentBackend),
        InputFormat.PPTX: (SimplePipeline, MsPowerpointDocumentBackend),
        InputFormat.MD: (SimplePipeline, MarkdownDocumentBackend),
        InputFormat.ASCIIDOC: (SimplePipeline, AsciiDocBackend),
        InputFormat.HTML: (SimplePipeline, HTMLDocumentBackend),
        InputFormat.XML_USPTO: (SimplePipeline, PatentUsptoDocumentBackend),
        InputFormat.XML_PUBMED: (SimplePipeline, PubMedDocumentBackend),
        InputFormat.IMAGE: (StandardPdfPipeline, DoclingParseV2DocumentBackend),
        InputFormat.PDF: (StandardPdfPipeline, DoclingParseV2DocumentBackend),
        InputFormat.JSON_DOCLING: (SimplePipeline, DoclingJSONBackend),
    }

    spec = format_to_default_spec.get(format)
    if spec is None:
        raise RuntimeError(f"No default options configured for {format}")

    pipeline_cls, backend = spec
    return FormatOption(pipeline_cls=pipeline_cls, backend=backend)
|
|
|
|
|
|
|
|
class DocumentConverter:
    """Convert input documents into ``ConversionResult`` objects.

    The converter keeps a per-format ``FormatOption`` (pipeline class,
    pipeline options and backend) for every allowed format and lazily
    instantiates pipelines, caching them in ``initialized_pipelines``.
    """

    _default_download_filename = "file"

    def __init__(
        self,
        allowed_formats: Optional[List[InputFormat]] = None,
        format_options: Optional[Dict[InputFormat, FormatOption]] = None,
    ):
        """Set up the allowed formats and their conversion options.

        Args:
            allowed_formats: Formats this converter accepts. ``None`` means
                every member of ``InputFormat``.
            format_options: Per-format overrides. Allowed formats without an
                entry (or with a ``None`` entry) get the default option.
                Entries for formats that are not allowed are ignored.
        """
        self.allowed_formats = (
            allowed_formats if allowed_formats is not None else list(InputFormat)
        )

        overrides = format_options or {}
        self.format_to_options = {}
        for fmt in self.allowed_formats:
            option = overrides.get(fmt)
            if option is None:
                option = _get_default_option(format=fmt)
            self.format_to_options[fmt] = option

        # Pipeline instances, keyed by pipeline class (see _get_pipeline).
        self.initialized_pipelines: Dict[Type[BasePipeline], BasePipeline] = {}

    def initialize_pipeline(self, format: InputFormat) -> None:
        """Initialize the conversion pipeline for the selected format.

        Raises:
            ConversionError: If no pipeline can be obtained for ``format``.
        """
        pipeline = self._get_pipeline(doc_format=format)
        if pipeline is None:
            raise ConversionError(
                f"No pipeline could be initialized for format {format}"
            )

    @validate_call(config=ConfigDict(strict=True))
    def convert(
        self,
        source: Union[Path, str, DocumentStream],
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> ConversionResult:
        """Convert a single document.

        Thin wrapper around ``convert_all`` that wraps ``source`` in a
        one-element list and returns the first yielded result.

        NOTE(review): if ``convert_all`` yields nothing while
        ``raises_on_error`` is false, ``next`` raises ``StopIteration`` here.
        """
        all_res = self.convert_all(
            source=[source],
            raises_on_error=raises_on_error,
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            headers=headers,
            page_range=page_range,
        )
        return next(all_res)

    @validate_call(config=ConfigDict(strict=True))
    def convert_all(
        self,
        source: Iterable[Union[Path, str, DocumentStream]],
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> Iterator[ConversionResult]:
        """Lazily convert every document in ``source``.

        Args:
            source: Paths, strings or streams to convert.
            headers: Forwarded to the conversion input unchanged.
            raises_on_error: When true, a result whose status is neither
                ``SUCCESS`` nor ``PARTIAL_SUCCESS`` raises instead of being
                yielded, and so does producing no result at all.
            max_num_pages: Page-count limit placed in ``DocumentLimits``.
            max_file_size: File-size limit placed in ``DocumentLimits``.
            page_range: Page range placed in ``DocumentLimits``.

        Yields:
            One ``ConversionResult`` per processed document.

        Raises:
            ConversionError: See ``raises_on_error``.
        """
        limits = DocumentLimits(
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            page_range=page_range,
        )
        conv_input = _DocumentConversionInput(
            path_or_stream_iterator=source, limits=limits, headers=headers
        )
        conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error)

        acceptable_statuses = {
            ConversionStatus.SUCCESS,
            ConversionStatus.PARTIAL_SUCCESS,
        }
        had_result = False
        for conv_res in conv_res_iter:
            had_result = True
            if raises_on_error and conv_res.status not in acceptable_statuses:
                raise ConversionError(
                    f"Conversion failed for: {conv_res.input.file} with status: {conv_res.status}"
                )
            yield conv_res

        if not had_result and raises_on_error:
            raise ConversionError(
                "Conversion failed because the provided file has no recognizable format or it wasn't in the list of allowed formats."
            )

    def _convert(
        self, conv_input: _DocumentConversionInput, raises_on_error: bool
    ) -> Iterator[ConversionResult]:
        """Process the input documents batch by batch, yielding each result.

        Documents are grouped with ``chunkify`` using
        ``settings.perf.doc_batch_size`` and processed one at a time; the time
        logged per document is measured from the previous yielded document (or
        from the start of the call for the first one).
        """
        process = partial(self._process_document, raises_on_error=raises_on_error)
        start_time = time.monotonic()

        for input_batch in chunkify(
            conv_input.docs(self.format_to_options),
            settings.perf.doc_batch_size,
        ):
            _log.info("Going to convert document batch...")

            for in_doc in input_batch:
                item = process(in_doc)
                now = time.monotonic()
                elapsed = now - start_time
                start_time = now
                _log.info(
                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
                )
                yield item

    def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
        """Return a (possibly cached) pipeline instance for ``doc_format``.

        Returns ``None`` when the format has no configured option or the
        option carries no pipeline options.

        NOTE(review): the cache is keyed by pipeline class only, so two
        formats sharing a pipeline class with different options replace each
        other's cached instance on every switch.
        """
        fopt = self.format_to_options.get(doc_format)
        if fopt is None:
            return None

        pipeline_class = fopt.pipeline_cls
        pipeline_options = fopt.pipeline_options
        if pipeline_options is None:
            return None

        # (Re)build when nothing is cached for this class or the cached
        # instance was created with different options.
        needs_init = (
            pipeline_class not in self.initialized_pipelines
            or self.initialized_pipelines[pipeline_class].pipeline_options
            != pipeline_options
        )
        if needs_init:
            self.initialized_pipelines[pipeline_class] = pipeline_class(
                pipeline_options=pipeline_options
            )
        return self.initialized_pipelines[pipeline_class]

    def _process_document(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Convert one document, or reject it when its format is not allowed.

        Returns:
            The pipeline's result for allowed formats; otherwise a ``SKIPPED``
            result carrying a user-input error item.

        Raises:
            ConversionError: If the format is not allowed and
                ``raises_on_error`` is true.
        """
        # The None check is kept so that a caller who reset allowed_formats to
        # None gets the "not allowed" path rather than a TypeError.
        if self.allowed_formats is not None and in_doc.format in self.allowed_formats:
            return self._execute_pipeline(in_doc, raises_on_error=raises_on_error)

        error_message = f"File format not allowed: {in_doc.file}"
        if raises_on_error:
            raise ConversionError(error_message)

        error_item = ErrorItem(
            component_type=DoclingComponentType.USER_INPUT,
            module_name="",
            error_message=error_message,
        )
        return ConversionResult(
            input=in_doc, status=ConversionStatus.SKIPPED, errors=[error_item]
        )

    def _execute_pipeline(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Run the pipeline configured for the document's format.

        Returns:
            The pipeline's result, or a bare ``FAILURE`` result when the
            document is invalid or no pipeline is available and
            ``raises_on_error`` is false.

        Raises:
            ConversionError: If the document is invalid or no pipeline is
                available and ``raises_on_error`` is true.
        """
        if in_doc.valid:
            pipeline = self._get_pipeline(in_doc.format)
            if pipeline is not None:
                return pipeline.execute(in_doc, raises_on_error=raises_on_error)
            if raises_on_error:
                raise ConversionError(
                    f"No pipeline could be initialized for {in_doc.file}."
                )
        elif raises_on_error:
            raise ConversionError(f"Input document {in_doc.file} is not valid.")

        # Both failure modes produce the same result when not raising.
        return ConversionResult(
            input=in_doc,
            status=ConversionStatus.FAILURE,
        )
|
|
|