PDF-Data_Extractor / extract_red_text.py
Shami96's picture
Update extract_red_text.py
f4b6b63 verified
raw
history blame
23.6 kB
#!/usr/bin/env python3
"""
extract_red_text.py
"""
import re
import json
import sys
from io import BytesIO
from docx import Document
from docx.oxml.ns import qn
# Import schema constants (TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS)
# Ensure master_key.py is present in same dir / importable path
from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS
def _is_reddish(rr, gg, bb):
    """Return True when the channel values (0-255) look predominantly red.

    Heuristic: a strong red channel, weak green and blue channels, and red
    leading each of them by more than 30.
    """
    return rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30


def _hex_to_rgb(value):
    """Convert a six-digit hex colour such as 'FF0000' into an (r, g, b) tuple.

    Returns None when *value* (after ``str()`` and whitespace stripping) is
    not exactly six hexadecimal digits.
    """
    hexstr = str(value).strip()
    if not re.fullmatch(r"[0-9A-Fa-f]{6}", hexstr):
        return None
    return int(hexstr[0:2], 16), int(hexstr[2:4], 16), int(hexstr[4:6], 16)


def is_red_font(run):
    """
    Robust detection of 'red' font in a run.

    Tries two sources in order:
    - the high-level ``run.font.color.rgb`` value (a 3-item tuple/list or
      anything whose ``str()`` is a six-digit hex string)
    - the raw XML run properties, i.e. ``<w:color w:val="RRGGBB"/>``

    Returns True if either source yields a colour that passes the red-ish
    heuristic, otherwise False (including when ``run`` is None).
    Both lookups are best-effort: any exception raised while inspecting the
    run is treated as "colour not available from this source".
    """
    if run is None:
        return False

    # 1) High-level colour API.
    try:
        col = getattr(run.font, "color", None)
        rgb_val = getattr(col, "rgb", None) if col is not None else None
        if rgb_val:
            if isinstance(rgb_val, (tuple, list)) and len(rgb_val) == 3:
                rgb = tuple(rgb_val)
            else:
                # Unknown formats yield None and fall through to the XML check.
                rgb = _hex_to_rgb(rgb_val)
            if rgb is not None and _is_reddish(*rgb):
                return True
    except Exception:
        # Deliberately best-effort: fall back to the raw XML inspection below.
        pass

    # 2) Raw XML run properties: <w:color w:val="RRGGBB" />
    try:
        rPr = getattr(run._element, "rPr", None)
        clr = rPr.find(qn('w:color')) if rPr is not None else None
        if clr is not None:
            val = clr.get(qn('w:val')) or clr.get('w:val') or clr.get('val')
            if val and isinstance(val, str):
                rgb = _hex_to_rgb(val)
                if rgb is not None and _is_reddish(*rgb):
                    return True
    except Exception:
        # Deliberately best-effort: an unreadable element means "not red".
        pass

    return False
def _prev_para_text(tbl):
    """Return the stripped text of the closest paragraph element preceding a table.

    Walks backwards over the siblings of ``tbl._tbl`` until an element whose
    tag ends in ``}p`` is found, then concatenates the text of every
    descendant element whose tag ends in ``}t``. Returns "" when no such
    paragraph precedes the table.
    """
    sibling = tbl._tbl.getprevious()
    while sibling is not None:
        if sibling.tag.endswith("}p"):
            break
        sibling = sibling.getprevious()
    else:
        # Ran out of siblings without meeting a paragraph element.
        return ""

    pieces = []
    for element in sibling.iter():
        if element.tag.endswith("}t") and element.text:
            pieces.append(element.text)
    return "".join(pieces).strip()
def normalize_text(text):
    """Return *text* with outer whitespace removed and inner runs collapsed to one space.

    ``None`` is mapped to the empty string so callers can normalise optional
    values without a separate check.
    """
    if text is None:
        return ""
    # Splitting on whitespace discards leading/trailing whitespace and treats
    # every whitespace run as a single separator.
    return " ".join(text.split())
def fuzzy_match_heading(heading, patterns):
    """
    Attempt fuzzy matching of heading against regex patterns.

    Args:
        heading: heading text to test; ``None`` is treated as an empty heading.
        patterns: iterable of pattern entries. Each entry is either a regex
            string or a dict whose ``"text"`` value is the regex string.

    Returns:
        True as soon as one pattern matches the normalised, upper-cased
        heading (case-insensitively); False otherwise.

    Entries without a usable pattern (missing/empty ``"text"``, non-string
    values) are skipped: an empty regex would otherwise match every heading.
    A pattern that is not a valid regex is retried as a plain,
    case-insensitive substring.
    """
    heading_norm = normalize_text(heading).upper()
    for entry in patterns or ():
        pat = entry.get("text", "") if isinstance(entry, dict) else entry
        if not pat or not isinstance(pat, str):
            continue
        try:
            if re.search(pat, heading_norm, re.IGNORECASE):
                return True
        except re.error:
            # Not a valid regex: treat it as a plain substring instead.
            if pat.upper() in heading_norm:
                return True
    return False
def get_table_context(tbl):
    """Return context metadata for a table to aid schema matching.

    The returned dict contains:
        heading    - normalised text of the paragraph preceding the table
        headers    - normalised text of each cell in the first row, in order
        col0       - normalised first-cell text of every row whose first cell
                     is non-blank
        first_cell - normalised text of the very first cell ("" if absent)
        all_cells  - normalised text of every non-empty cell, row by row
        num_rows   - number of rows
        num_cols   - number of cells in the first row (0 for an empty table)
    """
    heading = normalize_text(_prev_para_text(tbl))

    # Read each row's cells exactly once and reuse them below; the rows/cells
    # accessors are otherwise re-evaluated for every lookup.
    grid = [row.cells for row in tbl.rows]
    first_row = grid[0] if grid else ()

    headers = [normalize_text(cell.text) for cell in first_row]
    col0 = [
        normalize_text(cells[0].text)
        for cells in grid
        if cells and cells[0].text.strip()
    ]
    # Guard against a first row without cells instead of raising IndexError.
    first_cell = normalize_text(first_row[0].text) if first_row else ""

    all_cells = []
    for cells in grid:
        for cell in cells:
            text = normalize_text(cell.text)
            if text:
                all_cells.append(text)

    return {
        'heading': heading,
        'headers': headers,
        'col0': col0,
        'first_cell': first_cell,
        'all_cells': all_cells,
        'num_rows': len(grid),
        'num_cols': len(first_row),
    }
def calculate_schema_match_score(schema_name, spec, context):
    """
    Return (score, reasons[]) for how well a table context matches a schema.

    Args:
        schema_name: name of the candidate schema.
        spec: schema definition dict; the keys read here are
            ``context_exclusions``, ``context_keywords``, ``headings``,
            ``columns``, ``orientation`` and ``labels``.
        context: table context dict; the keys read here are ``headers``,
            ``heading``, ``first_cell``, ``col0`` and ``all_cells``.

    Returns:
        A ``(score, reasons)`` tuple. ``score`` is a number (it may be a
        float once label ratios are added) and ``reasons`` lists a short
        explanation for every adjustment that was applied.

    Heuristic-based scoring; vehicle registration and 'DETAILS' summary
    boosts are included. Heading entries without a usable string pattern
    are skipped rather than handed to ``re.search``.
    """
    score = 0
    reasons = []

    headers = context.get('headers', [])
    headers_upper = [h.upper() for h in headers]
    heading = context.get('heading', "")
    first_cell_upper = (context.get('first_cell') or "").upper()
    all_cells = context.get('all_cells', [])
    table_text = " ".join(headers).lower() + " " + heading.lower()

    # Vehicle Registration specific boost
    if "Vehicle Registration" in schema_name:
        vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension", "roadworthiness"]
        keyword_matches = sum(1 for kw in vehicle_keywords if kw in table_text)
        if keyword_matches >= 2:
            score += 150
            reasons.append(f"Vehicle keywords matched: {keyword_matches}")
        elif keyword_matches >= 1:
            score += 75
            reasons.append(f"Some vehicle keywords matched: {keyword_matches}")

    # Summary DETAILS boost / non-summary penalty
    if "details" in table_text:
        if "Summary" in schema_name:
            score += 100
            reasons.append("Summary with DETAILS found")
        else:
            score -= 75
            reasons.append("Non-summary schema penalized due to DETAILS column presence")

    # Context exclusions
    for exclusion in spec.get("context_exclusions", []):
        if exclusion.lower() in table_text:
            score -= 50
            reasons.append(f"Context exclusion: {exclusion}")

    # Context keywords positive matches
    kw_count = sum(1 for kw in spec.get("context_keywords", []) if kw.lower() in table_text)
    if kw_count:
        score += kw_count * 15
        reasons.append(f"Context keywords matched: {kw_count}")

    # First-cell exact match
    if first_cell_upper and first_cell_upper == schema_name.upper():
        score += 100
        reasons.append("Exact first cell match")

    # Heading pattern match (only the first matching pattern counts)
    for h in spec.get("headings", []) or []:
        pat = h.get("text") if isinstance(h, dict) else h
        if not pat or not isinstance(pat, str):
            # A dict without "text" (or any non-string entry) has no pattern.
            continue
        try:
            if re.search(pat, heading, re.IGNORECASE):
                score += 50
                reasons.append(f"Heading regex matched: {pat}")
                break
        except re.error:
            # Invalid regex: fall back to a case-insensitive substring test.
            if pat.lower() in heading.lower():
                score += 50
                reasons.append(f"Heading substring matched: {pat}")
                break

    # Column header matching (strict)
    if spec.get("columns"):
        cols = [normalize_text(c) for c in spec["columns"]]
        matches = sum(
            1 for col in cols
            if any(col.upper() in h for h in headers_upper)
        )
        if matches == len(cols):
            score += 60
            reasons.append("All expected columns matched exactly")
        elif matches > 0:
            score += matches * 20
            reasons.append(f"Partial column matches: {matches}/{len(cols)}")

    orientation = spec.get("orientation")
    if orientation == "left":
        # Label matching for left-oriented tables (either string may contain the other)
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        col0_upper = [c.upper() for c in context.get('col0', [])]
        matches = 0
        for lbl in labels:
            label_upper = lbl.upper()
            if any(label_upper in c or c in label_upper for c in col0_upper):
                matches += 1
        if matches > 0:
            score += (matches / max(1, len(labels))) * 30
            reasons.append(f"Left-orientation label matches: {matches}/{len(labels)}")
    elif orientation == "row1":
        # Row1 (header row) flexible matching
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0.0
        header_texts = " ".join(headers).upper()
        for lbl in labels:
            label_upper = lbl.upper()
            if any(label_upper in h for h in headers_upper):
                # whole label found inside a header cell
                matches += 1.0
            elif any(len(word) > 3 and word in header_texts for word in label_upper.split()):
                # at least one significant word of the label appears in the headers
                matches += 0.5
        if matches > 0:
            score += (matches / max(1.0, len(labels))) * 40
            reasons.append(f"Row1 header-like matches: {matches}/{len(labels)}")

    # Special handling for declaration schemas
    if schema_name == "Operator Declaration":
        # boost if 'print name' first cell and heading indicates operator declaration
        if first_cell_upper.startswith("PRINT"):
            if "OPERATOR DECLARATION" in heading.upper():
                score += 80
                reasons.append("Operator Declaration context & first-cell indicate match")
            elif any("MANAGER" in c.upper() for c in all_cells):
                score += 60
                reasons.append("Manager found in cells for Operator Declaration")

    if schema_name == "NHVAS Approved Auditor Declaration":
        if first_cell_upper.startswith("PRINT"):
            # penalize where manager words appear (to reduce false positives)
            if any("MANAGER" in c.upper() for c in all_cells):
                score -= 50
                reasons.append("Penalty: found manager text in auditor declaration table")

    return score, reasons
def match_table_schema(tbl):
    """
    Score the table against every entry of TABLE_SCHEMAS and pick the winner.

    Returns the name of the highest-scoring schema when its score reaches
    the acceptance threshold of 20, otherwise None. A schema whose scoring
    raises is counted with a score of 0. On ties the schema seen first wins.
    """
    context = get_table_context(tbl)
    winner, top_score = None, float("-inf")

    for candidate in TABLE_SCHEMAS:
        try:
            candidate_score, _ = calculate_schema_match_score(candidate, TABLE_SCHEMAS[candidate], context)
        except Exception:
            candidate_score = 0
        if candidate_score > top_score:
            winner, top_score = candidate, candidate_score

    # threshold to avoid spurious picks
    return winner if top_score >= 20 else None
def check_multi_schema_table(tbl):
    """
    Detect a table that stacks the operator-information and contact-details sections.

    The first-column texts of the table are searched (case-insensitively,
    as substrings) for known operator labels and known contact labels.
    Returns ["Operator Information", "Operator contact details"] when both
    groups are present, otherwise None.
    """
    operator_markers = (
        "Operator name (Legal entity)",
        "NHVAS Accreditation No.",
        "Registered trading name/s",
        "Australian Company Number",
        "NHVAS Manual",
    )
    contact_markers = (
        "Operator business address",
        "Operator Postal address",
        "Email address",
        "Operator Telephone Number",
    )
    left_column = [cell.upper() for cell in get_table_context(tbl).get('col0', [])]

    def mentions(markers):
        # True when any first-column cell contains any of the markers.
        for cell in left_column:
            for marker in markers:
                if marker.upper() in cell:
                    return True
        return False

    if mentions(operator_markers) and mentions(contact_markers):
        return ["Operator Information", "Operator contact details"]
    return None
def extract_multi_schema_table(tbl, schemas):
    """
    For tables that embed multiple schema sections vertically (left orientation), split and extract.

    Args:
        tbl: table whose rows carry a label in the first cell and values in
            the remaining cells.
        schemas: schema names to extract; names missing from TABLE_SCHEMAS
            are ignored.

    Returns:
        A dict mapping schema_name -> {label: [values, ...]} containing the
        red text found in the value cells of every row whose label matches
        one of the schema's labels. Schemas without any red text are omitted.

    A row label matches a schema label when either normalised, upper-cased
    string contains the other. Rows with a blank label and blank schema
    labels are skipped: the empty string is a substring of everything and
    would otherwise file unlabeled rows under the first label of each schema.
    """
    # Normalise every row label once; it is reused for each requested schema.
    labelled_rows = []
    for row in tbl.rows:
        cells = row.cells
        if not cells:
            continue
        row_norm = normalize_text(cells[0].text).upper()
        if not row_norm:
            continue
        labelled_rows.append((row_norm, cells[1:]))

    result = {}
    for schema_name in schemas:
        if schema_name not in TABLE_SCHEMAS:
            continue
        spec = TABLE_SCHEMAS[schema_name]
        spec_labels = []
        for spec_label in spec.get("labels", []):
            spec_norm = normalize_text(spec_label).upper()
            if spec_norm:
                spec_labels.append((spec_label, spec_norm))

        schema_data = {}
        for row_norm, value_cells in labelled_rows:
            matched_label = None
            for spec_label, spec_norm in spec_labels:
                if spec_norm in row_norm or row_norm in spec_norm:
                    matched_label = spec_label
                    break
            if matched_label is None:
                continue
            # gather red-text from the row's value cells (all but the label cell)
            for cell in value_cells:
                red_txt = "".join(
                    run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)
                ).strip()
                if red_txt:
                    schema_data.setdefault(matched_label, []).append(red_txt)
        if schema_data:
            result[schema_name] = schema_data
    return result
def _cell_red_text(cell):
    """Return the concatenated, stripped text of all red runs inside *cell*."""
    return "".join(
        run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)
    ).strip()


def _best_label_for_header(header_text, labels):
    """Pick the label that best matches a header cell; return (label, score).

    An exact case-insensitive match wins immediately with score 1.0.
    Otherwise labels are ranked by word overlap (words longer than two
    characters) and the best one scoring at least 0.35 is returned.
    Returns (None, 0.0) when nothing qualifies.
    """
    header_upper = header_text.upper()
    # Loop-invariant: the header's word set does not depend on the label.
    header_words = {word.upper() for word in header_text.split() if len(word) > 2}
    best_match, best_score = None, 0.0
    for label in labels:
        if header_upper == label.upper():
            return label, 1.0
        label_words = {word.upper() for word in label.split() if len(word) > 2}
        if header_words and label_words:
            common = header_words & label_words
            if common:
                score = len(common) / max(len(header_words), len(label_words))
                if score > best_score and score >= 0.35:  # relaxed threshold for OCR noise
                    best_score = score
                    best_match = label
    return best_match, best_score


def _extract_vehicle_registration(tbl, spec):
    """Extract red text from a Vehicle Registration table (labels in the first row).

    Header cells are mapped to the schema labels, then red text of each data
    row is collected per mapped column, skipping values already seen for
    that label. Progress is reported with ``print``.
    """
    print(" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
    labels = spec.get("labels", [])
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    rows = list(tbl.rows)
    if len(rows) < 2:
        print(" ❌ Vehicle table has less than 2 rows; skipping")
        return {}

    header_cells = rows[0].cells
    column_mapping = {}
    print(f" 📋 Mapping {len(header_cells)} header cells to labels")
    for col_idx, cell in enumerate(header_cells):
        header_text = normalize_text(cell.text).strip()
        if not header_text:
            continue
        print(f" Column {col_idx}: '{header_text}'")
        best_match, best_score = _best_label_for_header(header_text, labels)
        if best_match:
            column_mapping[col_idx] = best_match
            print(f" ✅ Mapped to: '{best_match}' (score: {best_score:.2f})")
            continue
        # additional heuristic: simple substring match in either direction
        header_lower = header_text.lower()
        for label in labels:
            if label.lower() in header_lower or header_lower in label.lower():
                column_mapping[col_idx] = label
                print(f" ✅ Mapped by substring to: '{label}'")
                break
        else:
            print(f" ⚠️ No mapping found for '{header_text}'")
    print(f" 📊 Total column mappings: {len(column_mapping)}")

    # Extract data rows
    for row_idx in range(1, len(rows)):
        print(f" 📌 Processing data row {row_idx}")
        for col_idx, cell in enumerate(rows[row_idx].cells):
            if col_idx not in column_mapping:
                continue
            label = column_mapping[col_idx]
            red_txt = _cell_red_text(cell)
            if red_txt:
                print(f" 🔴 Found red text in '{label}': '{red_txt}'")
                if red_txt not in seen[label]:
                    seen[label].add(red_txt)
                    collected[label].append(red_txt)
    result = {k: v for k, v in collected.items() if v}
    print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
    return result


def _resolve_row_label(raw_label, normalized_labels, fallback):
    """Map the first-cell text of a row onto a schema label.

    *normalized_labels* is a list of ``(label, NORMALISED_UPPER)`` pairs.
    An exact match is preferred, then a substring match in either direction.
    Blank row labels and blank schema labels never match, because the empty
    string is a substring of every string; *fallback* is returned instead.
    """
    raw_norm = raw_label.upper()
    if raw_norm:
        for label, label_norm in normalized_labels:
            if label_norm == raw_norm:
                return label
        for label, label_norm in normalized_labels:
            if label_norm and (label_norm in raw_norm or raw_norm in label_norm):
                return label
    return fallback


def extract_table_data(tbl, schema_name, spec):
    """
    Extract red text from a table for a given schema.

    Args:
        tbl: the table to read.
        schema_name: name of the matched schema; also used as the catch-all
            label for red text that cannot be attributed to a schema label.
        spec: schema definition dict (``labels`` and ``orientation`` are read).

    Returns:
        A dict mapping label -> list of distinct red-text values in the
        order found; labels without red text are omitted.

    Schemas whose name contains "Vehicle Registration" are handled by a
    dedicated header-mapping routine. For other schemas, ``orientation ==
    "row1"`` attributes a cell to the label at the same column index (the
    first row is skipped as the header); any other orientation attributes
    it to the label matching the row's first cell.
    """
    # Vehicle Registration special-case (headers in first row)
    if "Vehicle Registration" in schema_name:
        return _extract_vehicle_registration(tbl, spec)

    # Generic extraction for other table types
    spec_labels = list(spec.get("labels", []))
    labels = spec_labels + [schema_name]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    by_col = (spec.get("orientation") == "row1")
    start_row = 1 if by_col else 0

    # Only the left-oriented path compares row labels against schema labels.
    normalized_labels = [] if by_col else [
        (lbl, normalize_text(lbl).upper()) for lbl in spec_labels
    ]

    for row in tbl.rows[start_row:]:
        cells = row.cells
        row_label = None  # resolved lazily, once per row
        for ci, cell in enumerate(cells):
            red_txt = _cell_red_text(cell)
            if not red_txt:
                continue
            if by_col:
                # column-wise mapping (header labels)
                lbl = spec_labels[ci] if ci < len(spec_labels) else schema_name
            else:
                # left-oriented: match the row's left label
                if row_label is None:
                    row_label = _resolve_row_label(
                        normalize_text(cells[0].text), normalized_labels, schema_name
                    )
                lbl = row_label
            if red_txt not in seen[lbl]:
                seen[lbl].add(red_txt)
                collected[lbl].append(red_txt)
    return {k: v for k, v in collected.items() if v}
def _merge_schema_data(out, schema_name, data):
    """Merge ``{label: [values]}`` into ``out[schema_name]``, extending existing lists."""
    if not data:
        return
    if schema_name in out:
        for label, values in data.items():
            out[schema_name].setdefault(label, []).extend(values)
    else:
        out[schema_name] = data


def _nearest_heading_text(paragraphs, idx):
    """Return the normalised text of the closest heading before ``paragraphs[idx]``.

    A paragraph counts as a heading when its text matches any of the "main"
    or "sub" HEADING_PATTERNS (case-insensitive search). Blank paragraphs are
    skipped. Returns None when no heading precedes the paragraph.
    """
    all_patterns = HEADING_PATTERNS.get("main", []) + HEADING_PATTERNS.get("sub", [])
    for j in range(idx - 1, -1, -1):
        txt = normalize_text(paragraphs[j].text)
        if not txt:
            continue
        if any(re.search(p, txt, re.IGNORECASE) for p in all_patterns):
            return txt
    return None


def extract_red_text(input_doc):
    """
    Main extraction function.

    Args:
        input_doc: an already-loaded document object (anything exposing
            ``tables`` and ``paragraphs``), or a value to hand to
            ``Document()`` such as a filename string.

    Returns:
        A dict of extracted red text organised by schema name. Red text found
        in body paragraphs is stored under the ``"paragraphs"`` key, grouped
        by the nearest preceding heading, by ``"Date"`` for date-like lines
        without a heading, or by ``"(para)"`` otherwise.
    """
    if hasattr(input_doc, "tables") and hasattr(input_doc, "paragraphs"):
        doc = input_doc
    else:
        doc = Document(input_doc)

    out = {}
    for tbl in doc.tables:
        # Check for multi-schema tables first
        multi_schemas = check_multi_schema_table(tbl)
        if multi_schemas:
            multi_data = extract_multi_schema_table(tbl, multi_schemas)
            for schema_name, schema_data in multi_data.items():
                _merge_schema_data(out, schema_name, schema_data)
            continue

        # match a single schema
        schema = match_table_schema(tbl)
        if not schema:
            # no confident schema match
            continue
        spec = TABLE_SCHEMAS.get(schema, {})
        _merge_schema_data(out, schema, extract_table_data(tbl, schema, spec))

    # Paragraph-level red-text extraction (with contextual heading resolution)
    paras = {}
    # Read the paragraph list once and index into the same list during the
    # backward heading scan, instead of re-reading doc.paragraphs per step.
    paragraphs = list(doc.paragraphs)
    for idx, para in enumerate(paragraphs):
        red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
        if not red_txt:
            continue

        context = _nearest_heading_text(paragraphs, idx)

        # fallback: date-line mapping for 'Date' single-line red texts
        if not context and re.fullmatch(PARAGRAPH_PATTERNS.get("date_line", r"^\s*\d{1,2}(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4}\s*$|^Date$"), red_txt):
            context = "Date"
        if not context:
            context = "(para)"
        paras.setdefault(context, []).append(red_txt)

    if paras:
        out["paragraphs"] = paras
    return out
def extract_red_text_filelike(input_file, output_file):
    """
    Extract red text from a .docx source and write the result as JSON.

    Accepts:
    - input_file: a path, a file-like object (BytesIO/File), or the raw
      bytes of a .docx file (wrapped in a BytesIO before loading)
    - output_file: file-like object (opened for writing text) or path

    Returns the parsed dictionary.
    The JSON is written to output_file: directly when it has a ``write``
    method, otherwise by opening it as a UTF-8 text file.
    """
    if isinstance(input_file, (bytes, bytearray)):
        # Raw document content: give the loader a seekable stream.
        input_file = BytesIO(input_file)
    elif hasattr(input_file, "seek"):
        # Rewind streams that may already have been read; best-effort only,
        # since not every stream supports seeking.
        try:
            input_file.seek(0)
        except Exception:
            pass

    # Load Document
    doc = Document(input_file)
    result = extract_red_text(doc)

    # Write result out
    if hasattr(output_file, "write"):
        json.dump(result, output_file, indent=2, ensure_ascii=False)
        # Flushing is optional: some writable objects do not support it.
        try:
            output_file.flush()
        except Exception:
            pass
    else:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
    return result
if __name__ == "__main__":
    # Backwards-compatible script entry point:
    #   <script> <input.docx> <output.json>
    # Any other argument count only prints the module-usage hint.
    if len(sys.argv) != 3:
        print("To use as a module: extract_red_text_filelike(input_file, output_file)")
    else:
        input_docx, output_json = sys.argv[1], sys.argv[2]
        try:
            doc = Document(input_docx)
            word_data = extract_red_text(doc)
            with open(output_json, 'w', encoding='utf-8') as f:
                json.dump(word_data, f, indent=2, ensure_ascii=False)
            # Echo the extracted data to stdout as well.
            print(json.dumps(word_data, indent=2, ensure_ascii=False))
        except Exception as e:
            # Report the failure, then let the original exception propagate.
            print("Error during extraction:", e)
            raise