#!/usr/bin/env python3
"""
extract_red_text.py - Enhanced version with improved red text detection and master key alignment
"""
from __future__ import annotations
import json
import re
import sys
import logging
from collections import defaultdict
from typing import List, Dict, Optional, Any, Tuple
# attempt to import python-docx (document processing)
try:
from docx import Document
from docx.oxml.ns import qn
from docx.shared import RGBColor
except Exception as e:
raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e
# ------------------------------
# Import master_key configurations
# ------------------------------
try:
import master_key as mk
GLOBAL_SETTINGS = mk.GLOBAL_SETTINGS
EXTRA_HEADER_SYNONYMS = mk.EXTRA_HEADER_SYNONYMS
TABLE_SCHEMAS = getattr(mk, "TABLE_SCHEMAS", {})
except ImportError as e:
logging.error("Failed to import master_key.py: %s", e)
raise RuntimeError("master_key.py is required for configuration") from e
except AttributeError as e:
logging.error("Missing required configuration in master_key.py: %s", e)
raise RuntimeError("master_key.py missing required GLOBAL_SETTINGS or EXTRA_HEADER_SYNONYMS") from e
# ------------------------------
# Logging
# ------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")
# ------------------------------
# Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
# ------------------------------
def _apply_ocr_repair_rules(text: str) -> str:
    """Run every OCR-repair substitution from GLOBAL_SETTINGS over *text*.

    Rules are (pattern, replacement) pairs applied case-insensitively in
    order; a rule whose pattern fails to compile is skipped silently.
    """
    repaired = text if text else ""
    for pattern, replacement in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            repaired = re.sub(pattern, replacement, repaired, flags=re.I)
        except re.error:
            # Malformed rule in configuration: ignore it and keep going.
            continue
    return repaired
def _normalize_text(text: str) -> str:
    """Produce the readable normalized form of *text* per GLOBAL_SETTINGS.

    Applies OCR repair first, then each normalization step that the
    "normalize" config enables: smart-dash replacement, lowercasing,
    punctuation stripping and whitespace collapsing. Always strips the
    result.
    """
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    result = _apply_ocr_repair_rules(text or "")
    if cfg.get("replace_smart_dashes", False):
        for dash in ("–", "—"):
            result = result.replace(dash, "-")
    if cfg.get("lower", False):
        result = result.lower()
    if cfg.get("strip_punctuation", False):
        # Keep word chars, whitespace, hyphen, ampersand, parentheses,
        # slash and colon; everything else becomes a space.
        result = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", result)
    if cfg.get("collapse_whitespace", False):
        result = re.sub(r"\s+", " ", result)
    return result.strip()
def _compact_key(text: str) -> str:
    """Reduce *text* to a compact lookup key: normalized, then every
    non-word character removed. Returns "" for None input."""
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))
def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Map a raw header string to its canonical label via EXTRA_HEADER_SYNONYMS.

    Lookup order: compact key, then normalized key (both exact), then a
    case-insensitive scan over all synonym keys. Returns the canonical
    label, or None when nothing matches or *header_text* is empty.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # Exact lookups first: compact form takes priority over normalized.
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]
    # Fall back to a case-insensitive comparison against every key.
    normalized_lc = normalized.lower()
    compact_lc = compact.lower()
    for key, canonical in EXTRA_HEADER_SYNONYMS.items():
        key_lc = key.lower()
        if key_lc == normalized_lc or key_lc == compact_lc:
            return canonical
    return None
# ------------------------------
# Enhanced red font detection using hf_utils pattern
# ------------------------------
def _run_is_red(run) -> bool:
    """
    Enhanced red color detection for docx.run objects.
    Uses multiple methods to detect red text robustly.

    Tries four independent detection strategies in order and returns True
    on the first hit; returns False if none match. All exceptions are
    swallowed deliberately so a malformed run can never abort extraction
    (best-effort detection).
    """
    try:
        # Method 1: Check run.font.color.rgb
        col = getattr(run.font, "color", None)
        # NOTE: truthiness check on rgb also skips a falsy rgb value.
        if col is not None and getattr(col, "rgb", None):
            rgb = col.rgb
            try:
                # rgb may be sequence-like or have attributes
                if hasattr(rgb, '__getitem__'): # sequence-like
                    r, g, b = rgb[0], rgb[1], rgb[2]
                else: # attribute access
                    # `or` would skip a legitimate 0 channel, but a red run
                    # needs r >= 160 anyway, so this cannot cause a miss.
                    r = getattr(rgb, "r", None) or getattr(rgb, "red", None)
                    g = getattr(rgb, "g", None) or getattr(rgb, "green", None)
                    b = getattr(rgb, "b", None) or getattr(rgb, "blue", None)
                if r is not None and g is not None and b is not None:
                    # Tolerant heuristic: red must be noticeably higher than green/blue
                    if r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30:
                        return True
            except Exception:
                pass
    except Exception:
        pass
    # Method 2: Check raw XML color code (w:color val="RRGGBB" in the run
    # properties), using the same red-dominance thresholds as Method 1.
    try:
        rPr = run._element.rPr
        if rPr is not None:
            clr = rPr.find('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}color')
            if clr is not None:
                val = clr.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val')
                if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                    rr = int(val[:2], 16)
                    gg = int(val[2:4], 16)
                    bb = int(val[4:], 16)
                    if rr >= 160 and gg <= 120 and bb <= 120 and (rr - gg) >= 30 and (rr - bb) >= 30:
                        return True
    except Exception:
        pass
    # Method 3: Check theme color
    try:
        color = run.font.color
        if color is not None:
            theme_color = getattr(color, "theme_color", None)
            if theme_color:
                theme_str = str(theme_color).lower()
                # NOTE(review): assumes accent_2 maps to red in the active
                # theme — true for many default Office themes, not all.
                if "red" in theme_str or "accent_2" in theme_str: # Common red theme
                    return True
    except Exception:
        pass
    # Method 4: String representation fallback — catches rgb objects whose
    # str() is a hex code even when the earlier channel probes failed.
    try:
        if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
            s = str(run.font.color.rgb)
            # Look for patterns like "FF0000" or similar high-red values
            if re.search(r"[Ff]{2}0{4}|[Ee]{2}0{4}|[Dd]{2}0{4}", s):
                return True
    except Exception:
        pass
    return False
def _extract_red_text_segments(cell):
    """Collect contiguous runs of red text from each paragraph of *cell*.

    Returns a list of dicts with keys:
      'text'           - stripped concatenated text of the segment,
      'runs'           - list of (paragraph_idx, run_idx, run) tuples,
      'paragraph_idx'  - index of the paragraph the segment came from.
    A non-red run (or a red run that is only whitespace) terminates the
    current segment; paragraph boundaries also terminate segments.
    """
    segments = []

    def _flush(buffer_text, buffer_runs, p_idx):
        # Emit the accumulated segment, if any.
        if buffer_runs:
            segments.append({
                'text': buffer_text.strip(),
                'runs': list(buffer_runs),
                'paragraph_idx': p_idx
            })

    for p_idx, paragraph in enumerate(cell.paragraphs):
        buf_text = ""
        buf_runs = []
        for r_idx, run in enumerate(paragraph.runs):
            if _run_is_red(run) and run.text.strip():
                buf_text += run.text
                buf_runs.append((p_idx, r_idx, run))
            else:
                _flush(buf_text, buf_runs, p_idx)
                buf_text = ""
                buf_runs = []
        # Segment still open at the end of the paragraph.
        _flush(buf_text, buf_runs, p_idx)
    return segments
def _has_red_text(cell) -> bool:
    """Return True if any run anywhere in *cell* is red and non-blank."""
    return any(
        _run_is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
# ------------------------------
# Enhanced table processing with schema-aware header mapping
# ------------------------------
def _process_table_with_schema_mapping(table, t_index: int) -> Dict[str, Any]:
    """Convert one docx table into a plain dict with red-text annotations.

    The first row supplies the headers; each header is additionally mapped
    through EXTRA_HEADER_SYNONYMS (falling back to the raw text when no
    synonym matches). Every cell records its stripped text, its joined red
    text (or None), and per-cell red-text metadata.
    """
    nrows = len(table.rows)
    ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0
    if nrows == 0:
        # Empty table: return the same skeleton shape callers expect.
        return {
            "table_index": t_index,
            "nrows": 0,
            "ncols": 0,
            "headers": [],
            "rows": [],
            "red_cells": [],
            "mapped_headers": []
        }
    # Header row: keep both the raw text and the synonym-mapped label.
    headers = []
    mapped_headers = []
    for cell in table.rows[0].cells[:ncols]:
        cell_text = cell.text.strip()
        mapped = map_header_using_extra_synonyms(cell_text)
        if mapped:
            log.debug(f"Mapped header '{cell_text}' -> '{mapped}'")
        headers.append(cell_text)
        mapped_headers.append(mapped if mapped else cell_text)
    # Body (includes the header row, matching the raw table layout).
    rows_text = []
    rows_red_cells = []
    rows_red_metadata = []
    for row in table.rows:
        texts = []
        reds = []
        metas = []
        for cell in row.cells[:ncols]:
            texts.append(cell.text.strip())
            red_segments = _extract_red_text_segments(cell)
            if red_segments:
                joined = " ".join(
                    seg['text'] for seg in red_segments if seg['text']
                ).strip()
                reds.append(joined)
                metas.append({
                    "has_red": True,
                    "red_text": joined,
                    "segments": len(red_segments),
                    "total_red_runs": sum(len(seg['runs']) for seg in red_segments)
                })
            else:
                reds.append(None)
                metas.append({"has_red": False})
        rows_text.append(texts)
        rows_red_cells.append(reds)
        rows_red_metadata.append(metas)
    return {
        "table_index": t_index,
        "nrows": nrows,
        "ncols": ncols,
        "headers": headers,            # Original headers
        "mapped_headers": mapped_headers,  # Synonym-mapped headers
        "rows": rows_text,
        "red_cells": rows_red_cells,
        "red_metadata": rows_red_metadata  # Additional red text metadata
    }
# ------------------------------
# Extraction: paragraphs, headings, tables
# ------------------------------
def extract_from_docx(path: str) -> Dict[str, Any]:
    """Extract content from DOCX with enhanced red text detection and schema mapping.

    Parameters:
        path: filesystem path of the .docx file to open.

    Returns:
        Dict with keys "headings" (list of heading strings), "paragraphs"
        (only paragraphs that contain red runs), "tables" (one dict per
        table from _process_table_with_schema_mapping), "red_runs" (flat
        list of every red run with its location) and "meta" (summary
        counts plus the GLOBAL_SETTINGS values that were in effect).
    """
    log.info(f"Opening document: {path}")
    doc = Document(path)
    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    tables_out: List[Dict[str, Any]] = []
    # Extract headings and paragraphs with red runs
    log.info("Processing paragraphs and headings...")
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        # Identify heading level from style name if available
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        # Heading if the style looks like one OR the text begins with a
        # known section keyword (NHVAS accreditation document sections).
        is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or \
                     bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
        if is_heading:
            headings.append(text.strip())
            log.debug(f"Found heading: {text.strip()}")
        # Gather red runs in this paragraph
        paragraph_red_texts = []
        # char_cursor tracks each run's starting character offset within
        # the paragraph text, advanced for every run (red or not).
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            run_len = len(run_text)
            if _run_is_red(run) and run_text.strip():
                # Store a red run entry
                rr = {
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                    "normalized_text": _normalize_text(run_text)
                }
                red_runs.append(rr)
                paragraph_red_texts.append(run_text)
                log.debug(f"Found red text in paragraph {p_index}: '{run_text.strip()}'")
            char_cursor += run_len
        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
                "red_text_joined": " ".join(paragraph_red_texts).strip()
            })
    # Extract tables with enhanced processing
    log.info(f"Processing {len(doc.tables)} tables...")
    for t_index, table in enumerate(doc.tables):
        table_data = _process_table_with_schema_mapping(table, t_index)
        tables_out.append(table_data)
        # Log red text findings
        red_cell_count = sum(1 for row in table_data["red_cells"] for cell in row if cell)
        if red_cell_count > 0:
            log.info(f"Table {t_index}: Found {red_cell_count} cells with red text")
    # Assemble output structure
    out = {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # Enhanced metadata
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
            "total_red_cells": sum(
                sum(1 for cell in row_red_cells if cell)
                for table in tables_out
                for row_red_cells in table["red_cells"]
            ),
            "global_settings_used": {
                "normalization": GLOBAL_SETTINGS.get("normalize", {}),
                "ocr_repair_rules_count": len(GLOBAL_SETTINGS.get("ocr_repair_rules", [])),
                "synonyms_count": len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0
            }
        }
    }
    return out
# ------------------------------
# Command-line interface
# ------------------------------
def main(argv):
    """CLI entry point: ``argv = [prog, input_docx, output_json]``.

    Extracts red text from the input document, writes pretty-printed JSON
    to the output path, and logs a summary. Exits with status 2 on bad
    usage; re-raises extraction or write failures after logging them.
    """
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)
    input_docx, output_json = argv[1], argv[2]
    log.info("Starting red text extraction from: %s", input_docx)
    log.info("Using master_key configuration with %d header synonyms",
             len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0)
    try:
        result = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise
    # Save JSON pretty-printed for debugging by default
    try:
        with open(output_json, "w", encoding="utf-8") as fh:
            json.dump(result, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted data to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise
    # Print comprehensive summary
    meta = result.get("meta", {})
    log.info("=== EXTRACTION SUMMARY ===")
    for template, value in (
        ("Headings found: %d", meta.get("total_headings", 0)),
        ("Red paragraphs: %d", meta.get("total_red_paragraphs", 0)),
        ("Red runs total: %d", meta.get("total_red_runs", 0)),
        ("Tables processed: %d", meta.get("total_tables", 0)),
        ("Red cells found: %d", meta.get("total_red_cells", 0)),
        ("Header synonyms used: %d",
         meta.get("global_settings_used", {}).get("synonyms_count", 0)),
    ):
        log.info(template, value)
if __name__ == "__main__":
    main(sys.argv)
    # Echo the generated JSON so the result is visible in console/CI logs.
    # This lives INSIDE the __main__ guard so importing the module has no
    # side effects. The file was written as UTF-8 (ensure_ascii=False),
    # so read it back with an explicit encoding — relying on the locale
    # default breaks on cp1252 systems.
    if len(sys.argv) >= 3:
        try:
            with open(sys.argv[2], 'r', encoding='utf-8') as f:
                print(f"\n📄 EXTRACT_RED_TEXT OUTPUT:\n{f.read()}")
        except Exception as e:
            print(f"\n❌ Could not read output file: {e}")