# Standard library
import logging
import re
from io import BytesIO
from pathlib import Path
from typing import Any, Optional, Union

# Third-party: document model, DOCX parsing, XML and imaging helpers
from docling_core.types.doc import (
    DocItemLabel,
    DoclingDocument,
    DocumentOrigin,
    GroupLabel,
    ImageRef,
    NodeItem,
    TableCell,
    TableData,
)
from docx import Document
from docx.document import Document as DocxDocument
from docx.oxml.table import CT_Tc
from docx.oxml.xmlchemy import BaseOxmlElement
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from lxml import etree
from lxml.etree import XPath
from PIL import Image, UnidentifiedImageError
from typing_extensions import override

# Project-local
from docling.backend.abstract_backend import DeclarativeDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import InputDocument

# Module-level logger used by the backend defined in this file.
_log = logging.getLogger(__name__)


class MsWordDocumentBackend(DeclarativeDocumentBackend):
    """Declarative backend that converts a DOCX file into a ``DoclingDocument``.

    The document body is walked element by element. Paragraphs, headings,
    list items, tables and embedded pictures are appended to the output
    document, while ``self.parents`` records the node currently open at each
    nesting level and ``self.history`` records the style/numbering of the
    previously processed paragraphs.
    """

    @override
    def __init__(
        self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]
    ) -> None:
        """Open the DOCX and initialise the traversal state.

        Args:
            in_doc: Input document descriptor, forwarded to the base class.
            path_or_stream: In-memory stream or filesystem path of the DOCX.

        Raises:
            RuntimeError: If the document cannot be opened.
        """
        super().__init__(in_doc, path_or_stream)
        # Fully-qualified name of the ``w:val`` attribute.
        self.XML_KEY = (
            "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val"
        )
        self.xml_namespaces = {
            "w": "http://schemas.microsoft.com/office/word/2003/wordml"
        }

        self.path_or_stream: Union[BytesIO, Path] = path_or_stream
        self.valid: bool = False

        # Nesting bookkeeping: slot -1 is the (always empty) root, slots
        # 0..max_levels-1 hold the node currently open at that depth.
        self.max_levels: int = 10
        self.level_at_new_list: Optional[int] = None
        self.parents: dict[int, Optional[NodeItem]] = {
            i: None for i in range(-1, self.max_levels)
        }

        self.level = 0
        self.listIter = 0

        # Style/numbering of previously seen paragraphs; each list starts
        # with a ``None`` sentinel so the ``prev_*`` accessors never fail.
        self.history: dict[str, Any] = {
            "names": [None],
            "levels": [None],
            "numids": [None],
            "indents": [None],
        }

        self.docx_obj = None
        try:
            if isinstance(self.path_or_stream, BytesIO):
                self.docx_obj = Document(self.path_or_stream)
            elif isinstance(self.path_or_stream, Path):
                self.docx_obj = Document(str(self.path_or_stream))

            self.valid = True
        except Exception as e:
            raise RuntimeError(
                f"MsWordDocumentBackend could not load document with hash {self.document_hash}"
            ) from e

    @override
    def is_valid(self) -> bool:
        """Return whether the document was loaded successfully."""
        return self.valid

    @classmethod
    @override
    def supports_pagination(cls) -> bool:
        """Return ``False``: this backend does not expose pages."""
        return False

    @override
    def unload(self):
        """Close the input stream (if any) and drop the reference to it."""
        if isinstance(self.path_or_stream, BytesIO):
            self.path_or_stream.close()

        self.path_or_stream = None

    @classmethod
    @override
    def supported_formats(cls) -> set[InputFormat]:
        """Return the set of input formats handled by this backend."""
        return {InputFormat.DOCX}

    @override
    def convert(self) -> DoclingDocument:
        """Parses the DOCX into a structured document model.

        Returns:
            The parsed document.

        Raises:
            RuntimeError: If the backend is not in a valid state.
        """
        origin = DocumentOrigin(
            filename=self.file.name or "file",
            mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            binary_hash=self.document_hash,
        )

        doc = DoclingDocument(name=self.file.stem or "file", origin=origin)
        if self.is_valid():
            assert self.docx_obj is not None
            doc = self.walk_linear(self.docx_obj.element.body, self.docx_obj, doc)
            return doc
        else:
            raise RuntimeError(
                f"Cannot convert doc with {self.document_hash} because the backend failed to init."
            )

    def update_history(
        self,
        name: str,
        level: Optional[int],
        numid: Optional[int],
        ilevel: Optional[int],
    ):
        """Append the style name, level, numbering id and indent of a paragraph."""
        self.history["names"].append(name)
        self.history["levels"].append(level)

        self.history["numids"].append(numid)
        self.history["indents"].append(ilevel)

    def prev_name(self) -> Optional[str]:
        """Return the style name recorded most recently."""
        return self.history["names"][-1]

    def prev_level(self) -> Optional[int]:
        """Return the style level recorded most recently."""
        return self.history["levels"][-1]

    def prev_numid(self) -> Optional[int]:
        """Return the numbering id recorded most recently."""
        return self.history["numids"][-1]

    def prev_indent(self) -> Optional[int]:
        """Return the list indent level recorded most recently."""
        return self.history["indents"][-1]

    def get_level(self) -> int:
        """Return the first non-negative level that has no open parent.

        Levels are scanned in the dict's iteration order; 0 is returned when
        every tracked level is occupied.
        """
        for k, v in self.parents.items():
            if k >= 0 and v is None:
                return k
        return 0

    def _reset_parents(self, from_level: int = 0) -> None:
        """Clear the open parent of every tracked level >= ``from_level``.

        The root slot (-1) is never touched, and only keys already present in
        ``self.parents`` are visited, so the dict does not grow.
        """
        floor = max(from_level, 0)
        for key in self.parents:
            if key >= floor:
                self.parents[key] = None

    def walk_linear(
        self,
        body: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> DoclingDocument:
        """Dispatch each child of ``body`` to the matching handler.

        Args:
            body: XML element whose direct children are processed in order.
            docx_obj: The python-docx document that owns ``body``.
            doc: Output document that receives the converted items.

        Returns:
            The same ``doc`` instance, after all children were processed.
        """
        # Loop-invariant: build the namespace map and compile the XPath once.
        namespaces = {
            "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
            "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
            "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
        }
        find_blips = XPath(".//a:blip", namespaces=namespaces)

        for element in body:
            tag_name = etree.QName(element).localname
            drawing_blip = find_blips(element)

            if element.tag.endswith("tbl"):
                try:
                    self.handle_tables(element, docx_obj, doc)
                except Exception:
                    # Best effort: a broken table must not abort the whole
                    # conversion, but keep the traceback for diagnosis.
                    _log.debug(
                        "could not parse a table, broken docx table",
                        exc_info=True,
                    )

            elif drawing_blip:
                self.handle_pictures(docx_obj, drawing_blip, doc)

            elif tag_name == "sdt":
                # Structured document tag: process the paragraphs it wraps.
                sdt_content = element.find(".//w:sdtContent", namespaces=namespaces)
                if sdt_content is not None:
                    paragraphs = sdt_content.findall(".//w:p", namespaces=namespaces)
                    for p in paragraphs:
                        self.handle_text_elements(p, docx_obj, doc)

            elif tag_name == "p":
                self.handle_text_elements(element, docx_obj, doc)
            else:
                _log.debug(f"Ignoring element in DOCX with tag: {tag_name}")
        return doc

    def str_to_int(self, s: Optional[str], default: Optional[int] = 0) -> Optional[int]:
        """Convert ``s`` to ``int``.

        Returns ``None`` when ``s`` is ``None`` and ``default`` when ``s`` is
        not a valid integer literal.
        """
        if s is None:
            return None
        try:
            return int(s)
        except ValueError:
            return default

    def split_text_and_number(self, input_string: str) -> list[str]:
        """Split a string such as ``"Heading1"`` or ``"1Heading"`` in two parts.

        Returns the non-empty regex groups when the pattern matches, otherwise
        a single-element list holding ``input_string`` unchanged.
        """
        match = re.match(r"(\D+)(\d+)$|^(\d+)(\D+)", input_string)
        if match:
            parts = list(filter(None, match.groups()))
            return parts
        else:
            return [input_string]

    def get_numId_and_ilvl(
        self, paragraph: Paragraph
    ) -> tuple[Optional[int], Optional[int]]:
        """Read the numbering id and indent level from a paragraph's ``w:numPr``.

        Returns:
            ``(numId, ilvl)``; both are ``None`` when the paragraph has no
            ``w:numPr`` element, and each is ``None`` individually when its
            element is missing or its value is not an integer.
        """
        nsmap = paragraph._element.nsmap
        numPr = paragraph._element.find(".//w:numPr", namespaces=nsmap)

        if numPr is not None:
            numId_elem = numPr.find("w:numId", namespaces=nsmap)
            ilvl_elem = numPr.find("w:ilvl", namespaces=nsmap)
            numId = numId_elem.get(self.XML_KEY) if numId_elem is not None else None
            ilvl = ilvl_elem.get(self.XML_KEY) if ilvl_elem is not None else None

            return self.str_to_int(numId, None), self.str_to_int(ilvl, None)

        return None, None

    def get_label_and_level(self, paragraph: Paragraph) -> tuple[str, Optional[int]]:
        """Derive a style label and optional level from the paragraph style id.

        A missing style or style id yields ``("Normal", None)``. Ids of the
        form ``"name:level"`` are split on the colon. Ids containing
        ``"Heading"`` plus a number yield ``("Heading", number)``. Any other
        id is returned unchanged with level ``None``.
        """
        if paragraph.style is None:
            return "Normal", None
        label = paragraph.style.style_id
        if label is None:
            return "Normal", None
        if ":" in label:
            parts = label.split(":")

            if len(parts) == 2:
                return parts[0], self.str_to_int(parts[1], None)

        parts = self.split_text_and_number(label)

        if "Heading" in label and len(parts) == 2:
            parts.sort()
            label_str: str = ""
            label_level: Optional[int] = 0
            if parts[0] == "Heading":
                label_str = parts[0]
                label_level = self.str_to_int(parts[1], None)
            if parts[1] == "Heading":
                label_str = parts[1]
                label_level = self.str_to_int(parts[0], None)
            return label_str, label_level
        else:
            return label, None

    def handle_text_elements(
        self,
        element: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> None:
        """Convert one ``w:p`` element into a list item, title, heading or paragraph.

        Args:
            element: The paragraph XML element.
            docx_obj: The python-docx document that owns ``element``.
            doc: Output document that receives the converted item.
        """
        paragraph = Paragraph(element, docx_obj)

        if paragraph.text is None:
            return
        text = paragraph.text.strip()

        # Numbered-list detection is not implemented: every list item is
        # currently emitted as non-enumerated.
        is_numbered = False
        p_style_id, p_level = self.get_label_and_level(paragraph)
        numid, ilevel = self.get_numId_and_ilvl(paragraph)

        if numid == 0:
            numid = None

        if (
            numid is not None
            and ilevel is not None
            and p_style_id not in ["Title", "Heading"]
        ):
            # Paragraph carries numbering properties: treat it as a list item.
            self.add_listitem(
                doc,
                numid,
                ilevel,
                text,
                is_numbered,
            )
            self.update_history(p_style_id, p_level, numid, ilevel)
            return
        elif (
            numid is None
            and self.prev_numid() is not None
            and p_style_id not in ["Title", "Heading"]
        ):
            # The previous paragraph was a list item and this one is not:
            # close the list by clearing the levels it opened.
            if self.level_at_new_list:
                self._reset_parents(self.level_at_new_list)
                self.level = self.level_at_new_list - 1
                self.level_at_new_list = None
            else:
                self._reset_parents()
                self.level = 0

        if p_style_id in ["Title"]:
            self._reset_parents()
            self.parents[0] = doc.add_text(
                parent=None, label=DocItemLabel.TITLE, text=text
            )
        elif "Heading" in p_style_id:
            self.add_header(doc, p_level, text)
        else:
            # Known body styles ("Paragraph", "Normal", "Subtitle", "Author",
            # "DefaultText", "ListParagraph", "ListBullet", "Quote") and any
            # other style id are all emitted as plain paragraphs.
            level = self.get_level()
            doc.add_text(
                label=DocItemLabel.PARAGRAPH, parent=self.parents[level - 1], text=text
            )

        self.update_history(p_style_id, p_level, numid, ilevel)

    def add_header(
        self, doc: DoclingDocument, curr_level: Optional[int], text: str
    ) -> None:
        """Add a heading and make it the open parent at its level.

        Args:
            doc: Output document.
            curr_level: Heading level parsed from the style id, or ``None``
                when no level could be determined (a level-1 heading is then
                added at ``self.level``).
            text: Heading text.
        """
        level = self.get_level()
        if isinstance(curr_level, int):
            if curr_level > level:
                # Heading is deeper than the open hierarchy: fill the gap
                # with section groups so it has a parent chain.
                for i in range(level, curr_level):
                    self.parents[i] = doc.add_group(
                        parent=self.parents[i - 1],
                        label=GroupLabel.SECTION,
                        name=f"header-{i}",
                    )
            elif curr_level < level:
                # Heading is shallower: drop everything at or below its level.
                self._reset_parents(curr_level)

            self.parents[curr_level] = doc.add_heading(
                parent=self.parents[curr_level - 1],
                text=text,
                level=curr_level,
            )
        else:
            self.parents[self.level] = doc.add_heading(
                parent=self.parents[self.level - 1],
                text=text,
                level=1,
            )

    def _next_list_marker(self, is_numbered: bool) -> str:
        """Advance the list counter and return the marker for the next item.

        The marker is ``"<counter>."`` for numbered items and ``""`` otherwise.
        """
        self.listIter += 1
        return str(self.listIter) + "." if is_numbered else ""

    def add_listitem(
        self,
        doc: DoclingDocument,
        numid: int,
        ilevel: int,
        text: str,
        is_numbered: bool = False,
    ) -> None:
        """Add a list item, opening or closing nested list groups as needed.

        Args:
            doc: Output document.
            numid: Numbering id of the paragraph.
            ilevel: Indent level of the paragraph within its list.
            text: Item text.
            is_numbered: Whether the item should get an enumerated marker.
        """
        level = self.get_level()
        prev_indent = self.prev_indent()
        if self.prev_numid() is None:
            # Previous paragraph was not a list item: open a new list.
            self.level_at_new_list = level

            self.parents[level] = doc.add_group(
                label=GroupLabel.LIST, name="list", parent=self.parents[level - 1]
            )

            enum_marker = self._next_list_marker(is_numbered)
            doc.add_list_item(
                marker=enum_marker,
                enumerated=is_numbered,
                parent=self.parents[level],
                text=text,
            )

        elif (
            self.prev_numid() == numid
            and self.level_at_new_list is not None
            and prev_indent is not None
            and prev_indent < ilevel
        ):
            # Same list, deeper indent: open one nested group per new level.
            for i in range(
                self.level_at_new_list + prev_indent + 1,
                self.level_at_new_list + ilevel + 1,
            ):
                self.listIter = 0
                group_label = (
                    GroupLabel.ORDERED_LIST if is_numbered else GroupLabel.LIST
                )
                self.parents[i] = doc.add_group(
                    label=group_label, name="list", parent=self.parents[i - 1]
                )

            enum_marker = self._next_list_marker(is_numbered)
            doc.add_list_item(
                marker=enum_marker,
                enumerated=is_numbered,
                parent=self.parents[self.level_at_new_list + ilevel],
                text=text,
            )

        elif (
            self.prev_numid() == numid
            and self.level_at_new_list is not None
            and prev_indent is not None
            and ilevel < prev_indent
        ):
            # Same list, shallower indent: close the deeper groups.
            for k in self.parents:
                if k > self.level_at_new_list + ilevel:
                    self.parents[k] = None

            enum_marker = self._next_list_marker(is_numbered)
            doc.add_list_item(
                marker=enum_marker,
                enumerated=is_numbered,
                parent=self.parents[self.level_at_new_list + ilevel],
                text=text,
            )
            self.listIter = 0

        elif self.prev_numid() == numid or prev_indent == ilevel:
            # Same list and indent: append to the currently open group.
            enum_marker = self._next_list_marker(is_numbered)
            doc.add_list_item(
                marker=enum_marker,
                enumerated=is_numbered,
                parent=self.parents[level - 1],
                text=text,
            )

        else:
            # No rule above applies (different numbering id AND different
            # indent than the previous item). The item is not added; make
            # that visible instead of dropping it silently.
            _log.warning(
                "List item matched no insertion rule and was skipped "
                "(numid=%s, ilevel=%s)",
                numid,
                ilevel,
            )

    def handle_tables(
        self,
        element: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> None:
        """Convert a ``w:tbl`` element into a table item.

        A 1x1 table is not emitted as a table: the content of its single cell
        is processed as if it were part of the document body.

        Args:
            element: The table XML element.
            docx_obj: The python-docx document that owns ``element``.
            doc: Output document that receives the table.
        """
        table: Table = Table(element, docx_obj)
        num_rows = len(table.rows)
        num_cols = len(table.columns)
        _log.debug(f"Table grid with {num_rows} rows and {num_cols} columns")

        if num_rows == 1 and num_cols == 1:
            cell_element = table.rows[0].cells[0]
            self.walk_linear(cell_element._element, docx_obj, doc)
            return

        # Resolve every row's cells once instead of on each loop iteration;
        # the table is not modified while it is being read.
        rows = list(table.rows)
        cells_by_row = [row.cells for row in rows]

        data = TableData(num_rows=num_rows, num_cols=num_cols)
        cell_set: set[CT_Tc] = set()
        for row_idx, row in enumerate(rows):
            row_cells = cells_by_row[row_idx]
            _log.debug(f"Row index {row_idx} with {len(row_cells)} populated cells")
            col_idx = 0
            while col_idx < num_cols:
                cell: _Cell = row_cells[col_idx]
                _log.debug(
                    f" col {col_idx} grid_span {cell.grid_span} grid_cols_before {row.grid_cols_before}"
                )
                if cell._tc in cell_set:
                    # Merged cells repeat the same underlying element; emit
                    # each one only once.
                    _log.debug(" skipped since repeated content")
                    col_idx += cell.grid_span
                    continue
                else:
                    cell_set.add(cell._tc)

                # Follow the same underlying element down the column to find
                # how many rows this cell spans.
                spanned_idx = row_idx
                spanned_tc: Optional[CT_Tc] = cell._tc
                while spanned_tc == cell._tc:
                    spanned_idx += 1
                    spanned_tc = (
                        cells_by_row[spanned_idx][col_idx]._tc
                        if spanned_idx < num_rows
                        else None
                    )
                _log.debug(f" spanned before row {spanned_idx}")

                # NOTE(review): the row offsets add ``row.grid_cols_before``
                # (a column count) to a row index - kept as is, please
                # confirm this is intended.
                table_cell = TableCell(
                    text=cell.text,
                    row_span=spanned_idx - row_idx,
                    col_span=cell.grid_span,
                    start_row_offset_idx=row.grid_cols_before + row_idx,
                    end_row_offset_idx=row.grid_cols_before + spanned_idx,
                    start_col_offset_idx=col_idx,
                    end_col_offset_idx=col_idx + cell.grid_span,
                    col_header=False,
                    row_header=False,
                )
                data.table_cells.append(table_cell)
                col_idx += cell.grid_span

        level = self.get_level()
        doc.add_table(data=data, parent=self.parents[level - 1])

    @staticmethod
    def _get_docx_image(docx_obj: DocxDocument, drawing_blip: Any) -> Optional[bytes]:
        """Return the binary data referenced by the first blip, or ``None``.

        ``None`` is returned when the blip's relationship id is not present
        in the document part's relationships.
        """
        rId = drawing_blip[0].get(
            "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
        )
        if rId in docx_obj.part.rels:
            # Resolve the relationship id to the embedded image part.
            return docx_obj.part.rels[rId].target_part.blob
        return None

    def handle_pictures(
        self, docx_obj: DocxDocument, drawing_blip: Any, doc: DoclingDocument
    ) -> None:
        """Add a picture item for an element that contains a drawing blip.

        When the image data is missing or cannot be decoded, a warning is
        logged and a picture item without image data is added instead.

        Args:
            docx_obj: The python-docx document that owns the blip.
            drawing_blip: Non-empty result of the ``.//a:blip`` XPath query;
                only the first entry is used.
            doc: Output document that receives the picture.
        """
        level = self.get_level()

        image_ref: Optional[ImageRef] = None
        image_data = self._get_docx_image(docx_obj, drawing_blip)
        if image_data is not None:
            try:
                pil_image = Image.open(BytesIO(image_data))
                image_ref = ImageRef.from_pil(image=pil_image, dpi=72)
            except (UnidentifiedImageError, OSError):
                image_ref = None

        if image_ref is None:
            _log.warning("Warning: image cannot be loaded by Pillow")
            doc.add_picture(
                parent=self.parents[level - 1],
                caption=None,
            )
        else:
            doc.add_picture(
                parent=self.parents[level - 1],
                image=image_ref,
                caption=None,
            )