import logging
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple, Union

from docling_core.types.doc import (
    DoclingDocument,
    DocumentOrigin,
    GroupLabel,
    ImageRef,
    TableCell,
    TableData,
)
from openpyxl import Workbook, load_workbook
from openpyxl.cell.cell import Cell
from openpyxl.drawing.image import Image
from openpyxl.worksheet.worksheet import Worksheet
from PIL import Image as PILImage
from pydantic import BaseModel

from docling.backend.abstract_backend import DeclarativeDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import InputDocument

_log = logging.getLogger(__name__)


class ExcelCell(BaseModel):
    row: int
    col: int
    text: str
    row_span: int
    col_span: int


class ExcelTable(BaseModel):
    num_rows: int
    num_cols: int
    data: List[ExcelCell]
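
# Note: ExcelCell coordinates are 0-based and relative to the table's own
# top-left corner (see _find_table_bounds, which stores ri - start_row and
# rj - start_col), not to the worksheet; spans come from merged cell ranges.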


class MsExcelDocumentBackend(DeclarativeDocumentBackend):
    def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]):
        super().__init__(in_doc, path_or_stream)

        # Initialise the parents for the document hierarchy
        self.max_levels = 10

        self.parents: Dict[int, Any] = {}
        for i in range(-1, self.max_levels):
            self.parents[i] = None

        self.workbook = None
        try:
            if isinstance(self.path_or_stream, BytesIO):
                self.workbook = load_workbook(filename=self.path_or_stream)

            elif isinstance(self.path_or_stream, Path):
                self.workbook = load_workbook(filename=str(self.path_or_stream))

            self.valid = True
        except Exception as e:
            self.valid = False

            raise RuntimeError(
                f"MsExcelDocumentBackend could not load document with hash {self.document_hash}"
            ) from e

    def is_valid(self) -> bool:
        _log.info(f"valid: {self.valid}")
        return self.valid

    @classmethod
    def supports_pagination(cls) -> bool:
        return True

    def unload(self):
        if isinstance(self.path_or_stream, BytesIO):
            self.path_or_stream.close()

        self.path_or_stream = None

    @classmethod
    def supported_formats(cls) -> Set[InputFormat]:
        return {InputFormat.XLSX}

    def convert(self) -> DoclingDocument:
        origin = DocumentOrigin(
            filename=self.file.name or "file.xlsx",
            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            binary_hash=self.document_hash,
        )

        doc = DoclingDocument(name=self.file.stem or "file.xlsx", origin=origin)

        if self.is_valid():
            doc = self._convert_workbook(doc)
        else:
            raise RuntimeError(
                f"Cannot convert doc with {self.document_hash} because the backend failed to initialize."
            )

        return doc

    def _convert_workbook(self, doc: DoclingDocument) -> DoclingDocument:
        if self.workbook is not None:
            # Iterate over all sheets; each sheet becomes a section group
            for sheet_name in self.workbook.sheetnames:
                _log.info(f"Processing sheet: {sheet_name}")

                sheet = self.workbook[sheet_name]

                self.parents[0] = doc.add_group(
                    parent=None,
                    label=GroupLabel.SECTION,
                    name=f"sheet: {sheet_name}",
                )

                doc = self._convert_sheet(doc, sheet)
        else:
            _log.error("Workbook is not initialized.")

        return doc

    def _convert_sheet(self, doc: DoclingDocument, sheet: Worksheet):
        doc = self._find_tables_in_sheet(doc, sheet)

        doc = self._find_images_in_sheet(doc, sheet)

        return doc

    def _find_tables_in_sheet(self, doc: DoclingDocument, sheet: Worksheet):
        tables = self._find_data_tables(sheet)

        for excel_table in tables:
            num_rows = excel_table.num_rows
            num_cols = excel_table.num_cols

            table_data = TableData(
                num_rows=num_rows,
                num_cols=num_cols,
                table_cells=[],
            )

            for excel_cell in excel_table.data:
                cell = TableCell(
                    text=excel_cell.text,
                    row_span=excel_cell.row_span,
                    col_span=excel_cell.col_span,
                    start_row_offset_idx=excel_cell.row,
                    end_row_offset_idx=excel_cell.row + excel_cell.row_span,
                    start_col_offset_idx=excel_cell.col,
                    end_col_offset_idx=excel_cell.col + excel_cell.col_span,
                    col_header=False,
                    row_header=False,
                )
                table_data.table_cells.append(cell)

            doc.add_table(data=table_data, parent=self.parents[0])

        return doc

    def _find_data_tables(self, sheet: Worksheet):
        """
        Find all compact rectangular data tables in a sheet.
        """
        tables = []
        visited: Set[Tuple[int, int]] = set()

        for ri, row in enumerate(sheet.iter_rows(values_only=False)):
            for rj, cell in enumerate(row):
                if cell.value is None or (ri, rj) in visited:
                    continue

                table_bounds, visited_cells = self._find_table_bounds(
                    sheet, ri, rj, visited
                )

                visited.update(visited_cells)
                tables.append(table_bounds)

        return tables
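
    # Each non-empty, unvisited cell seeds a new table: the scan grows it down
    # and right until it reaches empty, unmerged cells, and every covered
    # coordinate is marked as visited. For example, a sheet with one block of
    # values in A1:B3 and another in D5:F6 produces two separate ExcelTable
    # objects rather than one table spanning the gap.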

    def _find_table_bounds(
        self,
        sheet: Worksheet,
        start_row: int,
        start_col: int,
        visited: Set[Tuple[int, int]],
    ):
        """
        Determine the bounds of a compact rectangular table.

        Returns:
        - An ExcelTable with the bounds and data.
        - A set of visited cell coordinates.
        """
        _log.info("find_table_bounds")

        max_row = self._find_table_bottom(sheet, start_row, start_col)
        max_col = self._find_table_right(sheet, start_row, start_col)

        data = []
        visited_cells = set()
        for ri in range(start_row, max_row + 1):
            for rj in range(start_col, max_col + 1):
                cell = sheet.cell(row=ri + 1, column=rj + 1)  # openpyxl is 1-based

                # Check whether the cell belongs to a merged range
                row_span = 1
                col_span = 1

                for merged_range in sheet.merged_cells.ranges:
                    if (
                        merged_range.min_row <= ri + 1
                        and ri + 1 <= merged_range.max_row
                        and merged_range.min_col <= rj + 1
                        and rj + 1 <= merged_range.max_col
                    ):
                        row_span = merged_range.max_row - merged_range.min_row + 1
                        col_span = merged_range.max_col - merged_range.min_col + 1
                        break

                if (ri, rj) not in visited_cells:
                    data.append(
                        ExcelCell(
                            row=ri - start_row,
                            col=rj - start_col,
                            text=str(cell.value),
                            row_span=row_span,
                            col_span=col_span,
                        )
                    )

                # Mark every cell covered by the span as visited
                for span_row in range(ri, ri + row_span):
                    for span_col in range(rj, rj + col_span):
                        visited_cells.add((span_row, span_col))

        return (
            ExcelTable(
                num_rows=max_row + 1 - start_row,
                num_cols=max_col + 1 - start_col,
                data=data,
            ),
            visited_cells,
        )
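
    # A cell covered by a merged range inherits the full span of that range:
    # a header merged across A1:C1, for instance, is emitted once as the cell
    # at (row=0, col=0) with col_span=3, while the remaining covered positions
    # are suppressed through visited_cells.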

    def _find_table_bottom(self, sheet: Worksheet, start_row: int, start_col: int):
        """Find the bottom boundary of the table."""
        max_row = start_row

        while max_row < sheet.max_row - 1:
            cell = sheet.cell(row=max_row + 2, column=start_col + 1)

            merged_range = next(
                (mr for mr in sheet.merged_cells.ranges if cell.coordinate in mr),
                None,
            )

            if cell.value is None and not merged_range:
                break

            if merged_range:
                max_row = max(max_row, merged_range.max_row - 1)
            else:
                max_row += 1

        return max_row

    def _find_table_right(self, sheet: Worksheet, start_row: int, start_col: int):
        """Find the right boundary of the table."""
        max_col = start_col

        while max_col < sheet.max_column - 1:
            cell = sheet.cell(row=start_row + 1, column=max_col + 2)

            merged_range = next(
                (mr for mr in sheet.merged_cells.ranges if cell.coordinate in mr),
                None,
            )

            if cell.value is None and not merged_range:
                break

            if merged_range:
                max_col = max(max_col, merged_range.max_col - 1)
            else:
                max_col += 1

        return max_col
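
    # Both boundary scans walk outward from the seed cell (down the seed column,
    # right along the seed row) and stop at the first cell that is empty and not
    # part of a merged range; when a merged range is hit, the boundary jumps to
    # the end of that range in a single step.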

    def _find_images_in_sheet(
        self, doc: DoclingDocument, sheet: Worksheet
    ) -> DoclingDocument:
        # Iterate over the images anchored to this sheet
        for idx, image in enumerate(sheet._images):
            try:
                pil_image = PILImage.open(image.ref)

                doc.add_picture(
                    parent=self.parents[0],
                    image=ImageRef.from_pil(image=pil_image, dpi=72),
                    caption=None,
                )
            except Exception:
                _log.error("Could not extract an image from the Excel sheet")

        """
        for idx, chart in enumerate(sheet._charts):  # type: ignore
            try:
                chart_path = f"chart_{idx + 1}.png"
                _log.info(
                    f"Chart found, but dynamic rendering is required for: {chart_path}"
                )

                _log.info(f"Chart {idx + 1}:")

                # Chart type
                # _log.info(f"Type: {type(chart).__name__}")
                print(f"Type: {type(chart).__name__}")

                # Extract series data
                for series_idx, series in enumerate(chart.series):
                    # _log.info(f"Series {series_idx + 1}:")
                    print(f"Series {series_idx + 1} type: {type(series).__name__}")
                    # print(f"x-values: {series.xVal}")
                    # print(f"y-values: {series.yVal}")

                    print(f"xval type: {type(series.xVal).__name__}")

                    xvals = []
                    for _ in series.xVal.numLit.pt:
                        print(f"xval type: {type(_).__name__}")
                        if hasattr(_, 'v'):
                            xvals.append(_.v)

                    print(f"x-values: {xvals}")

                    yvals = []
                    for _ in series.yVal:
                        if hasattr(_, 'v'):
                            yvals.append(_.v)

                    print(f"y-values: {yvals}")

            except Exception as exc:
                print(exc)
                continue
        """

        return doc
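

# Minimal usage sketch (assumption: docling's high-level DocumentConverter API;
# "report.xlsx" is a hypothetical path). The converter selects this backend for
# InputFormat.XLSX inputs:
#
#     from docling.document_converter import DocumentConverter
#
#     result = DocumentConverter().convert("report.xlsx")
#     print(result.document.export_to_markdown())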