File size: 7,804 Bytes
fcaa164 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
import copy
import logging
import warnings
from pathlib import Path
from typing import Iterable, Optional, Union
from docling_core.types.doc import DocItemLabel
from docling_ibm_models.layoutmodel.layout_predictor import LayoutPredictor
from PIL import Image
from docling.datamodel.base_models import BoundingBox, Cluster, LayoutPrediction, Page
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import AcceleratorOptions
from docling.datamodel.settings import settings
from docling.models.base_model import BasePageModel
from docling.utils.accelerator_utils import decide_device
from docling.utils.layout_postprocessor import LayoutPostprocessor
from docling.utils.profiling import TimeRecorder
from docling.utils.visualization import draw_clusters
_log = logging.getLogger(__name__)
class LayoutModel(BasePageModel):
_model_repo_folder = "ds4sd--docling-models"
_model_path = "model_artifacts/layout"
TEXT_ELEM_LABELS = [
DocItemLabel.TEXT,
DocItemLabel.FOOTNOTE,
DocItemLabel.CAPTION,
DocItemLabel.CHECKBOX_UNSELECTED,
DocItemLabel.CHECKBOX_SELECTED,
DocItemLabel.SECTION_HEADER,
DocItemLabel.PAGE_HEADER,
DocItemLabel.PAGE_FOOTER,
DocItemLabel.CODE,
DocItemLabel.LIST_ITEM,
DocItemLabel.FORMULA,
]
PAGE_HEADER_LABELS = [DocItemLabel.PAGE_HEADER, DocItemLabel.PAGE_FOOTER]
TABLE_LABELS = [DocItemLabel.TABLE, DocItemLabel.DOCUMENT_INDEX]
FIGURE_LABEL = DocItemLabel.PICTURE
FORMULA_LABEL = DocItemLabel.FORMULA
CONTAINER_LABELS = [DocItemLabel.FORM, DocItemLabel.KEY_VALUE_REGION]
def __init__(
self, artifacts_path: Optional[Path], accelerator_options: AcceleratorOptions
):
device = decide_device(accelerator_options.device)
if artifacts_path is None:
artifacts_path = self.download_models() / self._model_path
else:
# will become the default in the future
if (artifacts_path / self._model_repo_folder).exists():
artifacts_path = (
artifacts_path / self._model_repo_folder / self._model_path
)
elif (artifacts_path / self._model_path).exists():
warnings.warn(
"The usage of artifacts_path containing directly "
f"{self._model_path} is deprecated. Please point "
"the artifacts_path to the parent containing "
f"the {self._model_repo_folder} folder.",
DeprecationWarning,
stacklevel=3,
)
artifacts_path = artifacts_path / self._model_path
self.layout_predictor = LayoutPredictor(
artifact_path=str(artifacts_path),
device=device,
num_threads=accelerator_options.num_threads,
)
@staticmethod
def download_models(
local_dir: Optional[Path] = None,
force: bool = False,
progress: bool = False,
) -> Path:
from huggingface_hub import snapshot_download
from huggingface_hub.utils import disable_progress_bars
if not progress:
disable_progress_bars()
download_path = snapshot_download(
repo_id="ds4sd/docling-models",
force_download=force,
local_dir=local_dir,
revision="v2.1.0",
)
return Path(download_path)
def draw_clusters_and_cells_side_by_side(
self, conv_res, page, clusters, mode_prefix: str, show: bool = False
):
"""
Draws a page image side by side with clusters filtered into two categories:
- Left: Clusters excluding FORM, KEY_VALUE_REGION, and PICTURE.
- Right: Clusters including FORM, KEY_VALUE_REGION, and PICTURE.
Includes label names and confidence scores for each cluster.
"""
scale_x = page.image.width / page.size.width
scale_y = page.image.height / page.size.height
# Filter clusters for left and right images
exclude_labels = {
DocItemLabel.FORM,
DocItemLabel.KEY_VALUE_REGION,
DocItemLabel.PICTURE,
}
left_clusters = [c for c in clusters if c.label not in exclude_labels]
right_clusters = [c for c in clusters if c.label in exclude_labels]
# Create a deep copy of the original image for both sides
left_image = copy.deepcopy(page.image)
right_image = copy.deepcopy(page.image)
# Draw clusters on both images
draw_clusters(left_image, left_clusters, scale_x, scale_y)
draw_clusters(right_image, right_clusters, scale_x, scale_y)
# Combine the images side by side
combined_width = left_image.width * 2
combined_height = left_image.height
combined_image = Image.new("RGB", (combined_width, combined_height))
combined_image.paste(left_image, (0, 0))
combined_image.paste(right_image, (left_image.width, 0))
if show:
combined_image.show()
else:
out_path: Path = (
Path(settings.debug.debug_output_path)
/ f"debug_{conv_res.input.file.stem}"
)
out_path.mkdir(parents=True, exist_ok=True)
out_file = out_path / f"{mode_prefix}_layout_page_{page.page_no:05}.png"
combined_image.save(str(out_file), format="png")
def __call__(
self, conv_res: ConversionResult, page_batch: Iterable[Page]
) -> Iterable[Page]:
for page in page_batch:
assert page._backend is not None
if not page._backend.is_valid():
yield page
else:
with TimeRecorder(conv_res, "layout"):
assert page.size is not None
page_image = page.get_image(scale=1.0)
assert page_image is not None
clusters = []
for ix, pred_item in enumerate(
self.layout_predictor.predict(page_image)
):
label = DocItemLabel(
pred_item["label"]
.lower()
.replace(" ", "_")
.replace("-", "_")
) # Temporary, until docling-ibm-model uses docling-core types
cluster = Cluster(
id=ix,
label=label,
confidence=pred_item["confidence"],
bbox=BoundingBox.model_validate(pred_item),
cells=[],
)
clusters.append(cluster)
if settings.debug.visualize_raw_layout:
self.draw_clusters_and_cells_side_by_side(
conv_res, page, clusters, mode_prefix="raw"
)
# Apply postprocessing
processed_clusters, processed_cells = LayoutPostprocessor(
page.cells, clusters, page.size
).postprocess()
# processed_clusters, processed_cells = clusters, page.cells
page.cells = processed_cells
page.predictions.layout = LayoutPrediction(
clusters=processed_clusters
)
if settings.debug.visualize_layout:
self.draw_clusters_and_cells_side_by_side(
conv_res, page, processed_clusters, mode_prefix="postprocessed"
)
yield page
|