#!/usr/bin/env python3
import re
import json
import sys
from docx import Document
from docx.oxml.ns import qn
try:
from .master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
except ImportError:
# When running as a script directly
from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
MONTHS = r"(January|February|March|April|May|June|July|August|September|October|November|December|Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)"
DATE_RE = re.compile(rf"\b(\d{{1,2}})\s*(st|nd|rd|th)?\s+{MONTHS}\s+\d{{4}}\b", re.I)
DATE_NUM_RE = re.compile(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b")
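# Illustrative matches (hypothetical OCR text): DATE_RE catches "13 th September 2023"
# and "1 Jan 2024"; DATE_NUM_RE catches "13/09/2023" and "1-9-23".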
# permissive inline label regexes: allow OCR-space noise and varied punctuation
ACCRED_RE = re.compile(r"\bAccreditation\s*Number[:\s\-–]*([A-Za-z0-9\s\.\-/,]{2,})", re.I)
EXPIRY_RE = re.compile(r"\bExpiry\s*Date[:\s\-–]*([A-Za-z0-9\s\.\-/,]{2,})", re.I)
# Parent name aliases to prevent Mass Management vs Mass Management Summary mismatches
AMBIGUOUS_PARENTS = [
("Mass Management Summary", "Mass Management"),
("Mass Management", "Mass Management Summary"),
]
def get_red_text(cell):
reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text]
reds = coalesce_numeric_runs(reds)
return normalize_text(" ".join(reds)) if reds else ""
def _compact_digits(s: str) -> str:
# "5 1 0 6 6" -> "51066"
return re.sub(r"(?<=\d)\s+(?=\d)", "", s)
def _fix_ordinal_space(s: str) -> str:
# "13 th" -> "13th"
return re.sub(r"\b(\d+)\s+(st|nd|rd|th)\b", r"\1\2", s, flags=re.I)
def normalize_header_label(s: str) -> str:
    """Normalize a header/label by stripping parentheticals & punctuation."""
    s = re.sub(r"\s+", " ", s.strip())
    # remove content in parentheses/brackets
    s = re.sub(r"\([^)]*\)", "", s)
    s = re.sub(r"\[[^]]*\]", "", s)
    # unify dashes, pad slashes, turn non-breaking spaces into plain ones, then re-collapse
    s = s.replace("–", "-").replace("—", "-").replace("/", " / ").replace("\u00a0", " ")
    return re.sub(r"\s+", " ", s).strip()
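# Worked example (hypothetical header text):
#   normalize_header_label("Fault Recording/Reporting (on suspension)")
#   -> "Fault Recording / Reporting"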
# Canonical label aliases for Vehicle/Maintenance/General headers
LABEL_ALIASES = {
# Vehicle Registration (Maintenance)
"roadworthiness certificates": "Roadworthiness Certificates",
"maintenance records": "Maintenance Records",
"daily checks": "Daily Checks",
"fault recording / reporting": "Fault Recording/ Reporting",
"fault repair": "Fault Repair",
# Vehicle Registration (Mass)
"sub contracted vehicles statement of compliance": "Sub-contracted Vehicles Statement of Compliance",
"weight verification records": "Weight Verification Records",
"rfs suspension certification #": "RFS Suspension Certification #",
"suspension system maintenance": "Suspension System Maintenance",
"trip records": "Trip Records",
"fault recording/ reporting on suspension system": "Fault Recording/ Reporting on Suspension System",
# Common
"registration number": "Registration Number",
"no.": "No.",
"sub contractor": "Sub contractor",
"sub-contractor": "Sub contractor",
}
def looks_like_operator_declaration(context):
"""True iff heading says Operator Declaration and headers include Print Name + Position Title."""
heading = (context.get("heading") or "").strip().lower()
headers = " ".join(context.get("headers") or []).lower()
return (
"operator declaration" in heading
and "print name" in headers
and "position" in headers
and "title" in headers
)
def looks_like_auditor_declaration(context):
heading = (context.get("heading") or "").strip().lower()
headers = " ".join(context.get("headers") or []).lower()
return (
"auditor declaration" in heading
and "print name" in headers
and ("nhvr" in headers or "auditor registration number" in headers)
)
# --- NEW: header-only fallback that ignores headings and just keys on the two column names
def extract_operator_declaration_by_headers_from_end(doc):
"""
Scan tables from the end; if a table's first row contains both
'Print Name' AND 'Position Title' (case-insensitive), extract red text
from the data rows into:
{"Print Name": [...], "Position Title": [...]}
"""
for tbl in reversed(doc.tables):
if len(tbl.rows) < 2:
continue # need header + at least one data row
headers_norm = [normalize_header_label(c.text).lower() for c in tbl.rows[0].cells]
has_print = any("print name" in h for h in headers_norm)
has_pos_tit = any(("position title" in h) or ("position" in h and "title" in h) for h in headers_norm)
if not (has_print and has_pos_tit):
continue
idx_print = next((i for i, h in enumerate(headers_norm) if "print name" in h), None)
idx_pos = next((i for i, h in enumerate(headers_norm) if "position title" in h), None)
if idx_pos is None:
idx_pos = next((i for i, h in enumerate(headers_norm) if ("position" in h and "title" in h)), None)
result = {"Print Name": [], "Position Title": []}
for row in tbl.rows[1:]:
if idx_print is not None and idx_print < len(row.cells):
cell = row.cells[idx_print]
reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text]
reds = coalesce_numeric_runs(reds)
txt = normalize_text(" ".join(reds))
if txt:
result["Print Name"].append(txt)
if idx_pos is not None and idx_pos < len(row.cells):
cell = row.cells[idx_pos]
reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text]
reds = coalesce_numeric_runs(reds)
txt = normalize_text(" ".join(reds))
if txt:
result["Position Title"].append(txt)
if result["Print Name"] or result["Position Title"]:
return {k: v for k, v in result.items() if v}
return None
# --- end NEW helper
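# Shape of a successful fallback extraction (names are hypothetical):
#   {"Print Name": ["Jane Citizen"], "Position Title": ["Transport Manager"]}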
def canonicalize_label(s: str) -> str:
key = normalize_header_label(s).lower()
key = re.sub(r"\s+", " ", key)
return LABEL_ALIASES.get(key, s)
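# e.g. canonicalize_label("Sub contracted vehicles statement of compliance") returns the
# canonical "Sub-contracted Vehicles Statement of Compliance"; unknown labels pass through unchanged.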
def bag_similarity(a: str, b: str) -> float:
"""Loose bag-of-words similarity for header↔label matching."""
aw = {w for w in re.split(r"[^A-Za-z0-9#]+", normalize_header_label(a).lower()) if len(w) > 2 or w in {"#","no"}}
bw = {w for w in re.split(r"[^A-Za-z0-9#]+", normalize_header_label(b).lower()) if len(w) > 2 or w in {"#","no"}}
if not aw or not bw:
return 0.0
inter = len(aw & bw)
return inter / max(len(aw), len(bw))
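# Worked example: bag_similarity("Weight Verification Records", "weight verification")
# -> 2 shared words / max(3, 2) = ~0.67, which clears the 0.40 mapping threshold used below.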
def coalesce_numeric_runs(text_list):
"""
    If a cell yields ['4','5','6','9','8','7','1','2','3'] etc., join consecutive single-character digit runs.
Returns ['456987123'] instead of many singles. Non-digit tokens are preserved.
"""
out, buf = [], []
for t in text_list:
if len(t) == 1 and t.isdigit():
buf.append(t)
else:
if buf:
out.append("".join(buf))
buf = []
out.append(t)
if buf:
out.append("".join(buf))
return out
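# Worked example (hypothetical runs): coalesce_numeric_runs(["4", "5", "6", "Fleet", "1", "2"])
# -> ["456", "Fleet", "12"]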
def is_red_font(run):
"""Enhanced red font detection with better color checking"""
col = run.font.color
if col and col.rgb:
r, g, b = col.rgb
if r > 150 and g < 100 and b < 100 and (r-g) > 30 and (r-b) > 30:
return True
rPr = getattr(run._element, "rPr", None)
if rPr is not None:
clr = rPr.find(qn('w:color'))
if clr is not None:
val = clr.get(qn('w:val'))
if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
if rr > 150 and gg < 100 and bb < 100 and (rr-gg) > 30 and (rr-bb) > 30:
return True
return False
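# Illustrative thresholds (hypothetical colors): pure red FF0000 -> (255, 0, 0) is accepted;
# dark maroon 800000 -> (128, 0, 0) is rejected because r is not > 150.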
def _prev_para_text(tbl):
"""Get text from previous paragraph before table"""
prev = tbl._tbl.getprevious()
while prev is not None and not prev.tag.endswith("}p"):
prev = prev.getprevious()
if prev is None:
return ""
return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip()
def normalize_text(text):
"""Normalize text for better matching"""
return re.sub(r'\s+', ' ', text.strip())
def fuzzy_match_heading(heading, patterns):
"""Check if heading matches any pattern with fuzzy matching"""
heading_norm = normalize_text(heading.upper())
for pattern in patterns:
if re.search(pattern, heading_norm, re.IGNORECASE):
return True
return False
def get_table_context(tbl):
"""Get comprehensive context information for table"""
    heading = normalize_text(_prev_para_text(tbl))
    headers = [normalize_text(c.text) for c in tbl.rows[0].cells if c.text.strip()] if tbl.rows else []
    col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells and r.cells[0].text.strip()]
    first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else ""
all_cells = []
for row in tbl.rows:
for cell in row.cells:
text = normalize_text(cell.text)
if text:
all_cells.append(text)
return {
'heading': heading,
'headers': headers,
'col0': col0,
'first_cell': first_cell,
'all_cells': all_cells,
'num_rows': len(tbl.rows),
'num_cols': len(tbl.rows[0].cells) if tbl.rows else 0
}
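# A returned context dict looks like (hypothetical table):
#   {'heading': 'MASS MANAGEMENT', 'headers': ['Registration Number', 'Trip Records'],
#    'col0': [...], 'first_cell': 'Registration Number', 'all_cells': [...],
#    'num_rows': 5, 'num_cols': 4}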
def calculate_schema_match_score(schema_name, spec, context):
"""Enhanced calculate match score - IMPROVED for Vehicle Registration tables"""
score = 0
reasons = []
# 🎯 VEHICLE REGISTRATION BOOST
if "Vehicle Registration" in schema_name:
vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension"]
table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower()
keyword_matches = sum(1 for keyword in vehicle_keywords if keyword in table_text)
if keyword_matches >= 2:
score += 150 # Very high boost for vehicle tables
reasons.append(f"Vehicle Registration keywords: {keyword_matches}/5")
elif keyword_matches >= 1:
score += 75 # Medium boost
reasons.append(f"Some Vehicle Registration keywords: {keyword_matches}/5")
# 🎯 SUMMARY TABLE BOOST (existing logic)
if "Summary" in schema_name and "details" in " ".join(context['headers']).lower():
score += 100
reasons.append(f"Summary schema with DETAILS column - perfect match")
if "Summary" not in schema_name and "details" in " ".join(context['headers']).lower():
score -= 75
reasons.append(f"Non-summary schema penalized for DETAILS column presence")
# Context exclusions
if spec.get("context_exclusions"):
table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower()
for exclusion in spec["context_exclusions"]:
if exclusion.lower() in table_text:
score -= 50
reasons.append(f"Context exclusion penalty: '{exclusion}' found")
# Context keywords
if spec.get("context_keywords"):
table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower()
keyword_matches = 0
for keyword in spec["context_keywords"]:
if keyword.lower() in table_text:
keyword_matches += 1
if keyword_matches > 0:
score += keyword_matches * 15
reasons.append(f"Context keyword matches: {keyword_matches}/{len(spec['context_keywords'])}")
# Direct first cell match
if context['first_cell'] and context['first_cell'].upper() == schema_name.upper():
score += 100
reasons.append(f"Direct first cell match: '{context['first_cell']}'")
# Heading pattern matching
if spec.get("headings"):
for h in spec["headings"]:
if fuzzy_match_heading(context['heading'], [h["text"]]):
score += 50
reasons.append(f"Heading match: '{context['heading']}'")
break
# Column header matching
if spec.get("columns"):
cols = [normalize_text(col) for col in spec["columns"]]
matches = 0
for col in cols:
if any(col.upper() in h.upper() for h in context['headers']):
matches += 1
if matches == len(cols):
score += 60
reasons.append(f"All column headers match: {cols}")
elif matches > 0:
score += matches * 20
reasons.append(f"Partial column matches: {matches}/{len(cols)}")
# Label matching for left-oriented tables
if spec.get("orientation") == "left":
labels = [normalize_text(lbl) for lbl in spec["labels"]]
matches = 0
for lbl in labels:
if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context['col0']):
matches += 1
if matches > 0:
score += (matches / len(labels)) * 30
reasons.append(f"Left orientation label matches: {matches}/{len(labels)}")
# 🎯 ENHANCED Label matching for row1-oriented tables (Vehicle Registration)
elif spec.get("orientation") == "row1":
labels = [normalize_text(lbl) for lbl in spec["labels"]]
matches = 0
for lbl in labels:
if any(lbl.upper() in h.upper() or h.upper() in lbl.upper() for h in context['headers']):
matches += 1
elif any(word.upper() in " ".join(context['headers']).upper() for word in lbl.split() if len(word) > 3):
matches += 0.5 # Partial credit
if matches > 0:
score += (matches / len(labels)) * 40
reasons.append(f"Row1 orientation header matches: {matches}/{len(labels)}")
# Special handling for Declaration tables (existing logic)
if schema_name == "Operator Declaration" and context['first_cell'].upper() == "PRINT NAME":
if "OPERATOR DECLARATION" in context['heading'].upper():
score += 80
reasons.append("Operator Declaration context match")
elif any("MANAGER" in cell.upper() for cell in context['all_cells']):
score += 60
reasons.append("Manager found in cells (likely Operator Declaration)")
if schema_name == "NHVAS Approved Auditor Declaration" and context['first_cell'].upper() == "PRINT NAME":
if any("MANAGER" in cell.upper() for cell in context['all_cells']):
score -= 50
reasons.append("Penalty: Manager found (not auditor)")
return score, reasons
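# Example scoring (hypothetical table): headers mentioning "registration" and
# "weight verification" trigger the +150 Vehicle Registration boost, which alone
# clears the >= 20 acceptance threshold applied in match_table_schema below.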
def match_table_schema(tbl):
"""Improved table schema matching with explicit Attendance/Operator/Auditor guards."""
context = get_table_context(tbl)
heading_low = (context.get("heading") or "").strip().lower()
headers_norm = [normalize_header_label(h).lower() for h in context.get("headers", [])]
has_print = any("print name" in h for h in headers_norm)
has_pos = any(("position title" in h) or ("position" in h and "title" in h) for h in headers_norm)
has_namecol = any(("name" in h) and ("print name" not in h) for h in headers_norm)
has_poscol = any("position" in h for h in headers_norm)
has_aud_hint = any(("auditor" in h) or ("auditor registration" in h) for h in headers_norm)
# Force-guard: explicit headings
if "operator declaration" in heading_low and has_print and has_pos:
return "Operator Declaration"
if "auditor declaration" in heading_low and has_print:
return "NHVAS Approved Auditor Declaration"
if ("attendance" in heading_low or "attendees" in heading_low) and has_namecol and has_poscol:
return "Attendance List (Names and Position Titles)"
# Priority: auditor if signature columns + auditor hints
if has_print and has_aud_hint:
return "NHVAS Approved Auditor Declaration"
    # Classic 2-col signature table -> Operator Declaration
if has_print and has_pos:
return "Operator Declaration"
# Heuristic fallbacks
if looks_like_auditor_declaration(context):
return "NHVAS Approved Auditor Declaration"
if looks_like_operator_declaration(context):
return "Operator Declaration"
# Score-based fallback
best_match, best_score = None, 0
for name, spec in TABLE_SCHEMAS.items():
score, _ = calculate_schema_match_score(name, spec, context)
if score > best_score:
best_score, best_match = score, name
return best_match if best_score >= 20 else None
def check_multi_schema_table(tbl):
"""Check if table contains multiple schemas and split appropriately"""
context = get_table_context(tbl)
operator_labels = ["Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s",
"Australian Company Number", "NHVAS Manual"]
contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"]
has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context['col0'])
has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context['col0'])
if has_operator and has_contact:
return ["Operator Information", "Operator contact details"]
return None
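# e.g. one table whose first column mixes "Operator name (Legal entity)" rows with
# "Email address" rows is split into ["Operator Information", "Operator contact details"].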
def extract_multi_schema_table(tbl, schemas):
"""Extract data from table with multiple schemas"""
result = {}
for schema_name in schemas:
if schema_name not in TABLE_SCHEMAS:
continue
spec = TABLE_SCHEMAS[schema_name]
schema_data = {}
for ri, row in enumerate(tbl.rows):
if ri == 0:
continue
row_label = normalize_text(row.cells[0].text)
belongs_to_schema = False
matched_label = None
for spec_label in spec["labels"]:
spec_norm = normalize_text(spec_label).upper()
row_norm = row_label.upper()
if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm:
belongs_to_schema = True
matched_label = spec_label
break
if not belongs_to_schema:
continue
            for cell in row.cells:
red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
if red_txt:
if matched_label not in schema_data:
schema_data[matched_label] = []
if red_txt not in schema_data[matched_label]:
schema_data[matched_label].append(red_txt)
if schema_data:
result[schema_name] = schema_data
return result
def extract_table_data(tbl, schema_name, spec):
"""Extract red text data from table based on schema – per-row repeats for specific tables."""
# ───────────────────────────────────────────────────────────────────────────
# OPERATOR DECLARATION (row1 headers: Print Name | Position Title)
# ───────────────────────────────────────────────────────────────────────────
if schema_name == "Operator Declaration":
print(f" 🧾 EXTRACTION FIX: Processing Operator Declaration table")
labels = spec["labels"] # ["Print Name", "Position Title"]
canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels}
collected = {lbl: [] for lbl in labels}
if len(tbl.rows) < 2:
print(f" ❌ Operator Declaration table has less than 2 rows")
return {}
# map header cells β†’ labels (row1 orientation)
header_row = tbl.rows[0]
column_mapping = {}
print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels")
for col_idx, cell in enumerate(header_row.cells):
raw_h = normalize_text(cell.text)
header_text = normalize_header_label(raw_h)
if not header_text:
continue
print(f" Column {col_idx}: '{raw_h}'")
# alias/canonical first
canon = canonicalize_label(header_text)
if canon in canonical_labels:
best_label = canonical_labels[canon]
print(f" βœ… Mapped to: '{best_label}' (alias)")
column_mapping[col_idx] = best_label
continue
# else bag-of-words similarity
best_label, best_score = None, 0.0
for canon_lab, original_lab in canonical_labels.items():
s = bag_similarity(header_text, canon_lab)
if s > best_score:
best_score, best_label = s, original_lab
if best_label and best_score >= 0.40:
print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})")
column_mapping[col_idx] = best_label
else:
print(f" ⚠️ No mapping found for '{raw_h}'")
print(f" πŸ“Š Total column mappings: {len(column_mapping)}")
# collect red text from the (usually single) data row
for row_idx in range(1, len(tbl.rows)):
row = tbl.rows[row_idx]
print(f" πŸ“Œ Processing data row {row_idx}")
for col_idx, cell in enumerate(row.cells):
if col_idx not in column_mapping:
continue
label = column_mapping[col_idx]
reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text]
if not reds:
continue
reds = coalesce_numeric_runs(reds)
red_txt = normalize_text(" ".join(reds))
if not red_txt:
continue
print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'")
collected[label].append(red_txt)
result = {k: v for k, v in collected.items() if v}
print(f" βœ… Operator Declaration extracted: {len(result)} columns with data")
return result
# ───────────────────────────────────────────────────────────────────────────
# A) Vehicle Registration tables (per-row accumulation; NO dedupe)
# ───────────────────────────────────────────────────────────────────────────
if "Vehicle Registration" in schema_name:
print(f" πŸš— EXTRACTION FIX: Processing Vehicle Registration table")
labels = spec["labels"]
canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels}
collected = {lbl: [] for lbl in labels} # ← keep every row value
unmapped_bucket = {}
if len(tbl.rows) < 2:
print(f" ❌ Vehicle table has less than 2 rows")
return {}
header_row = tbl.rows[0]
column_mapping = {}
print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels")
for col_idx, cell in enumerate(header_row.cells):
raw_h = normalize_text(cell.text)
header_text = normalize_header_label(raw_h)
if not header_text:
continue
print(f" Column {col_idx}: '{raw_h}'")
# Try alias/canonical first
canon = canonicalize_label(header_text)
if canon in canonical_labels:
best_label = canonical_labels[canon]
print(f" βœ… Mapped to: '{best_label}' (alias)")
column_mapping[col_idx] = best_label
continue
# Else bag-of-words similarity
best_label, best_score = None, 0.0
for canon_lab, original_lab in canonical_labels.items():
s = bag_similarity(header_text, canon_lab)
if s > best_score:
best_score, best_label = s, original_lab
if best_label and best_score >= 0.40:
print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})")
column_mapping[col_idx] = best_label
else:
print(f" ⚠️ No mapping found for '{raw_h}'")
unmapped_bucket[raw_h] = []
print(f" πŸ“Š Total column mappings: {len(column_mapping)}")
header_texts = [normalize_text(hc.text) for hc in header_row.cells]
for row_idx in range(1, len(tbl.rows)):
row = tbl.rows[row_idx]
print(f" πŸ“Œ Processing data row {row_idx}")
for col_idx, cell in enumerate(row.cells):
reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text]
if not reds:
continue
reds = coalesce_numeric_runs(reds)
red_txt = normalize_text(" ".join(reds))
if not red_txt:
continue
if col_idx in column_mapping:
label = column_mapping[col_idx]
print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'")
collected[label].append(red_txt) # ← append every occurrence
else:
header_name = header_texts[col_idx] if col_idx < len(header_texts) else f"(unmapped col {col_idx})"
unmapped_bucket.setdefault(header_name, []).append(red_txt)
result = {k: v for k, v in collected.items() if v}
if unmapped_bucket:
result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v})
print(f" βœ… Vehicle Registration extracted: {len(result)} columns with data")
return result
# ───────────────────────────────────────────────────────────────────────────
# B) Driver / Scheduler Records Examined (per-row accumulation; NO dedupe)
# ───────────────────────────────────────────────────────────────────────────
if "Driver / Scheduler" in schema_name:
print(f" πŸ‘€ EXTRACTION FIX: Processing Driver / Scheduler table")
labels = spec["labels"]
canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels}
collected = {lbl: [] for lbl in labels} # ← keep every row value
unmapped_bucket = {}
if len(tbl.rows) < 2:
print(f" ❌ Driver/Scheduler table has less than 2 rows")
return {}
header_row = tbl.rows[0]
column_mapping = {}
print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels")
for col_idx, cell in enumerate(header_row.cells):
raw_h = normalize_text(cell.text)
header_text = normalize_header_label(raw_h)
if not header_text:
continue
print(f" Column {col_idx}: '{raw_h}'")
# Try alias/canonical first (rarely used here, but safe)
canon = canonicalize_label(header_text)
if canon in canonical_labels:
best_label = canonical_labels[canon]
print(f" βœ… Mapped to: '{best_label}' (alias)")
column_mapping[col_idx] = best_label
continue
# Else bag-of-words similarity (good for long headings)
best_label, best_score = None, 0.0
for canon_lab, original_lab in canonical_labels.items():
s = bag_similarity(header_text, canon_lab)
if s > best_score:
best_score, best_label = s, original_lab
if best_label and best_score >= 0.40:
print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})")
column_mapping[col_idx] = best_label
else:
print(f" ⚠️ No mapping found for '{raw_h}'")
unmapped_bucket[raw_h] = []
print(f" πŸ“Š Total column mappings: {len(column_mapping)}")
header_texts = [normalize_text(hc.text) for hc in header_row.cells]
for row_idx in range(1, len(tbl.rows)):
row = tbl.rows[row_idx]
print(f" πŸ“Œ Processing data row {row_idx}")
for col_idx, cell in enumerate(row.cells):
reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text]
if not reds:
continue
reds = coalesce_numeric_runs(reds)
red_txt = normalize_text(" ".join(reds))
if not red_txt:
continue
if col_idx in column_mapping:
label = column_mapping[col_idx]
print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'")
collected[label].append(red_txt) # ← append every occurrence
else:
header_name = header_texts[col_idx] if col_idx < len(header_texts) else f"(unmapped col {col_idx})"
unmapped_bucket.setdefault(header_name, []).append(red_txt)
result = {k: v for k, v in collected.items() if v}
if unmapped_bucket:
result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v})
print(f" βœ… Driver / Scheduler extracted: {len(result)} columns with data")
return result
# ───────────────────────────────────────────────────────────────────────────
# ATTENDANCE LIST (keep red-only; avoid duplicates; prefer whole-cell lines)
# ───────────────────────────────────────────────────────────────────────────
if "Attendance List" in schema_name:
items, seen = [], set()
# header sniff
hdr = [normalize_text(c.text).lower() for c in (tbl.rows[0].cells if tbl.rows else [])]
start_row = 1 if (any("name" in h for h in hdr) and any("position" in h for h in hdr)) else 0
for row in tbl.rows[start_row:]:
# collect red text from each cell
reds = [get_red_text(c) for c in row.cells]
reds = [r for r in reds if r]
if not reds:
continue
# if first cell already contains "Name - Position", use it as-is
if " - " in reds[0]:
entry = reds[0]
else:
# typical 2 columns: Name | Position
if len(reds) >= 2:
entry = f"{reds[0]} - {reds[1]}"
else:
entry = reds[0]
entry = normalize_text(entry)
# collapse accidental double-ups like "A - B - A - B"
parts = [p.strip() for p in entry.split(" - ") if p.strip()]
if len(parts) >= 4 and parts[:2] == parts[2:4]:
entry = " - ".join(parts[:2])
if entry and entry not in seen:
seen.add(entry)
items.append(entry)
return {schema_name: items} if items else {}
# ───────────────────────────────────────────────────────────────────────────
# ACCREDITATION VEHICLE SUMMARY (pairwise label/value per row)
# Expected labels in spec["labels"]:
# ["Number of powered vehicles", "Number of trailing vehicles"]
# ───────────────────────────────────────────────────────────────────────────
if schema_name == "Accreditation Vehicle Summary":
labels = spec["labels"]
canonical_labels = {normalize_header_label(lbl).lower(): lbl for lbl in labels}
collected = {lbl: [] for lbl in labels}
def map_label(txt):
t = normalize_header_label(txt).lower()
if t in canonical_labels:
return canonical_labels[t]
# loose fallback
best, score = None, 0.0
for canon, original in canonical_labels.items():
s = bag_similarity(t, canon)
if s > score:
best, score = original, s
return best if score >= 0.40 else None
for row in tbl.rows:
# iterate label/value pairs across the row: (0,1), (2,3), ...
i = 0
while i + 1 < len(row.cells):
lbl_txt = normalize_text(row.cells[i].text)
val_txt = get_red_text(row.cells[i + 1])
mlabel = map_label(lbl_txt)
if mlabel and val_txt:
collected[mlabel].append(val_txt)
i += 2
return {k: v for k, v in collected.items() if v}
# ───────────────────────────────────────────────────────────────────────────
# C) Generic tables (unchanged: WITH dedupe)
# ───────────────────────────────────────────────────────────────────────────
labels = spec["labels"] + [schema_name]
collected = {lbl: [] for lbl in labels}
seen = {lbl: set() for lbl in labels}
by_col = (spec.get("orientation") == "row1")
start_row = 1 if by_col else 0
rows = tbl.rows[start_row:]
    for row in rows:
for ci, cell in enumerate(row.cells):
reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text]
if not reds:
continue
reds = coalesce_numeric_runs(reds)
red_txt = normalize_text(" ".join(reds))
if not red_txt:
continue
if by_col:
if ci < len(spec["labels"]):
lbl = spec["labels"][ci]
else:
lbl = schema_name
else:
raw_label = normalize_text(row.cells[0].text)
lbl = None
for spec_label in spec["labels"]:
if normalize_text(spec_label).upper() == raw_label.upper():
lbl = spec_label
break
if not lbl:
a_raw = normalize_header_label(raw_label).upper()
for spec_label in spec["labels"]:
a_spec = normalize_header_label(spec_label).upper()
if a_spec in a_raw or a_raw in a_spec:
lbl = spec_label
break
if not lbl:
lbl = schema_name
if red_txt not in seen[lbl]:
seen[lbl].add(red_txt)
collected[lbl].append(red_txt)
return {k: v for k, v in collected.items() if v}
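# Typical result for a generic left-oriented table (values are hypothetical):
#   {"Operator name (Legal entity)": ["ACME Haulage Pty Ltd"],
#    "NHVAS Accreditation No.": ["51066"]}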
def _try_extract_nature_inline_labels(tbl, out_dict):
# Check context
prev = normalize_text(_prev_para_text(tbl)).lower()
if "nature of the operators business" not in prev:
return False
acc_val, exp_val, para_bits = None, None, []
for row in tbl.rows[1:]:
row_text = " ".join(normalize_text(c.text) for c in row.cells if c.text.strip())
if not row_text:
continue
low = row_text.lower()
def _red_from_row():
vals = []
for c in row.cells:
for p in c.paragraphs:
reds = [r.text for r in p.runs if is_red_font(r) and r.text.strip()]
if reds:
vals.extend(reds)
return normalize_text(" ".join(coalesce_numeric_runs(vals)))
if low.startswith("accreditation number"):
v = _red_from_row() or normalize_text(row_text.split(":", 1)[-1])
acc_val = _compact_digits(v) if v else acc_val
continue
if low.startswith("expiry date"):
v = _red_from_row() or normalize_text(row_text.split(":", 1)[-1])
exp_val = _fix_ordinal_space(v) if v else exp_val
continue
# otherwise narrative line
para_bits.append(row_text)
if not (para_bits or acc_val or exp_val):
return False
sec = out_dict.setdefault("Nature of the Operators Business (Summary)", {})
if para_bits:
sec.setdefault("Nature of the Operators Business (Summary):", []).append(
normalize_text(" ".join(para_bits))
)
if acc_val:
sec.setdefault("Accreditation Number", []).append(acc_val)
if exp_val:
sec.setdefault("Expiry Date", []).append(exp_val)
return True
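# On success this seeds out_dict like (hypothetical values):
#   {"Nature of the Operators Business (Summary)": {
#       "Nature of the Operators Business (Summary):": ["Interstate general freight ..."],
#       "Accreditation Number": ["51066"], "Expiry Date": ["13th September 2023"]}}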
def extract_red_text(input_doc):
# input_doc: docx.Document object or file path
if isinstance(input_doc, str):
doc = Document(input_doc)
else:
doc = input_doc
out = {}
table_count = 0
for tbl in doc.tables:
table_count += 1
# Nature-of-business inline labels, if present as table rows
if _try_extract_nature_inline_labels(tbl, out):
continue
multi_schemas = check_multi_schema_table(tbl)
if multi_schemas:
multi_data = extract_multi_schema_table(tbl, multi_schemas)
for schema_name, schema_data in multi_data.items():
if schema_data:
if schema_name in out:
for k, v in schema_data.items():
if k in out[schema_name]:
out[schema_name][k].extend(v)
else:
out[schema_name][k] = v
else:
out[schema_name] = schema_data
continue
schema = match_table_schema(tbl)
if not schema:
continue
spec = TABLE_SCHEMAS[schema]
data = extract_table_data(tbl, schema, spec)
if data:
if schema in out:
for k, v in data.items():
if k in out[schema]:
out[schema][k].extend(v)
else:
out[schema][k] = v
else:
out[schema] = data
# paragraphs (FIX: do not return early; build full 'paras' then attach)
paras = {}
for idx, para in enumerate(doc.paragraphs):
red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
if not red_txt:
continue
context = None
for j in range(idx-1, -1, -1):
txt = normalize_text(doc.paragraphs[j].text)
if txt:
all_patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"]
if any(re.search(p, txt, re.IGNORECASE) for p in all_patterns):
context = txt
break
if not context and re.fullmatch(PARAGRAPH_PATTERNS["date_line"], red_txt):
context = "Date"
if not context:
context = "(para)"
paras.setdefault(context, []).append(red_txt)
if paras:
out["paragraphs"] = paras
# Fallback: ensure we capture the last-page Operator Declaration by headers
if "Operator Declaration" not in out:
op_dec = extract_operator_declaration_by_headers_from_end(doc)
if op_dec:
out["Operator Declaration"] = op_dec
    # -- Handle ambiguous parents without creating unwanted duplicates --
# Prefer the "Summary" variant when both keys derive from the same Std-style content.
summary_pairs = [
("Mass Management Summary", "Mass Management"),
("Maintenance Management Summary", "Maintenance Management"),
("Fatigue Management Summary", "Fatigue Management"),
]
for summary_key, alt_key in summary_pairs:
# if only alt exists, consider promoting it to the summary name
if alt_key in out and summary_key not in out:
# only promote if the alt content looks like a standards/details map
alt_section = out.get(alt_key)
if isinstance(alt_section, dict) and any(k.strip().startswith("Std") for k in alt_section.keys()):
out[summary_key] = alt_section
del out[alt_key]
continue
# if both exist, merge alt into summary (avoiding duplicates)
if summary_key in out and alt_key in out:
s = out[summary_key] or {}
a = out[alt_key] or {}
# Only auto-merge when both are dicts and look like Std mappings (safe heuristic)
if isinstance(s, dict) and isinstance(a, dict) and \
(any(k.strip().startswith("Std") for k in s.keys()) or any(k.strip().startswith("Std") for k in a.keys())):
for k, v in a.items():
if not v:
continue
if k in s:
# append unique items
if isinstance(s[k], list) and isinstance(v, list):
for item in v:
if item not in s[k]:
s[k].append(item)
else:
# fallback: convert to lists
s.setdefault(k, [])
for item in (v if isinstance(v, list) else [v]):
if item not in s[k]:
s[k].append(item)
else:
s[k] = v if isinstance(v, list) else [v]
out[summary_key] = s
# remove the alt key to avoid duplicate sections
del out[alt_key]
    # -- add Accreditation Number and Expiry Date from Nature paragraph (do NOT edit the paragraph) --
for sec_key, section in list(out.items()):
if not isinstance(section, dict):
continue
if re.fullmatch(r"Nature of the Operators Business \(Summary\)", sec_key, flags=re.I):
# find the main paragraph field "...(Summary):"
para_field = None
for k in section.keys():
if re.search(r"\(Summary\):\s*$", k):
para_field = k
break # <- break only when found
if not para_field:
continue
raw = section.get(para_field)
if isinstance(raw, list):
para = " ".join(str(x) for x in raw)
else:
para = str(raw or "")
m_acc = ACCRED_RE.search(para)
m_exp = EXPIRY_RE.search(para)
# labeled matches
if m_acc:
v = _compact_digits(_fix_ordinal_space(normalize_text(m_acc.group(1))))
if v:
section.setdefault("Accreditation Number", []).append(v)
if m_exp:
v = _compact_digits(_fix_ordinal_space(normalize_text(m_exp.group(1))))
if v:
section.setdefault("Expiry Date", []).append(v)
# fallback when labels are missing but values appear at the end
acc_missing = not section.get("Accreditation Number")
exp_missing = not section.get("Expiry Date")
if acc_missing or exp_missing:
# 1) Try to find the last date-like token (wordy month or numeric)
last_date_match = None
# prefer textual month matches (allowing OCR noise like "22 nd September 2023" or "202 3")
month_rx = re.compile(rf"\b\d{{1,2}}\s*(?:st|nd|rd|th)?\s+{MONTHS}\s+\d{{2,4}}\b", re.I)
for md in month_rx.finditer(para):
last_date_match = md
                # fallback: the stricter textual date pattern, then numeric forms (dd/mm/yyyy or dd-mm-yyyy)
                if not last_date_match:
                    for md in DATE_RE.finditer(para):
                        last_date_match = md
                if not last_date_match:
                    for md in DATE_NUM_RE.finditer(para):
                        last_date_match = md
# 2) If we found a candidate expiry date, normalise and use it
if last_date_match:
date_txt = last_date_match.group(0)
# fix noisy ordinals/spacing and collapsed digit noise (e.g., "202 3" -> "2023")
date_txt = _fix_ordinal_space(date_txt)
date_txt = re.sub(r"\b(20)\s?(\d{2})\b", r"\1\2", date_txt)
date_txt = re.sub(r"\b(19)\s?(\d{2})\b", r"\1\2", date_txt)
if exp_missing:
section.setdefault("Expiry Date", []).append(normalize_text(date_txt))
# 3) If accreditation is missing, try to extract digits immediately *before* the date
if acc_missing:
before = para[: last_date_match.start()].strip()
# look for long digit run (allow spaces between digits)
m_num = re.search(r"(\d[\d\s]{3,16}\d)\s*$", before)
if m_num:
num_txt = _compact_digits(normalize_text(m_num.group(1)))
if num_txt:
section.setdefault("Accreditation Number", []).append(num_txt)
# 4) If we still didn't find an accreditation number, try scanning entire paragraph for the longest digit run
if acc_missing:
# collect digit-like tokens, collapse internal spaces and pick the longest
digit_tokens = [ _compact_digits(t) for t in re.findall(r"[\d\s]{4,}", para) ]
digit_tokens = [d for d in digit_tokens if len(re.sub(r'\D','',d)) >= 5] # require >=5 digits
if digit_tokens:
# choose the longest / most plausible digits (deterministic)
digit_tokens.sort(key=lambda s: (-len(re.sub(r'\D','',s)), s))
section.setdefault("Accreditation Number", []).append(digit_tokens[0])
# 5) If expiry still missing, do a broad textual month search anywhere in the paragraph
if exp_missing:
broad_month_rx = re.compile(rf"\b\d{{1,2}}\s*(?:st|nd|rd|th)?\s+{MONTHS}\s+\d{{2,4}}\b|\b{MONTHS}\s+\d{{2,4}}\b", re.I)
md_any = list(broad_month_rx.finditer(para))
if md_any:
candidate = md_any[-1].group(0)
candidate = _fix_ordinal_space(candidate)
candidate = re.sub(r"\b(20)\s?(\d{2})\b", r"\1\2", candidate)
if candidate:
section.setdefault("Expiry Date", []).append(normalize_text(candidate))
    # -- STRONGER: canonicalise & merge "X Summary" <-> "X" variants (case-insensitive) --
def _base_name(k: str) -> str:
# remove trailing "summary" and punctuation, normalise spaces
if not isinstance(k, str):
return ""
b = re.sub(r"[\(\)\[\]\:]+", " ", k)
b = re.sub(r"\bsummary\b\s*[:\-]*", "", b, flags=re.I)
b = re.sub(r"\s+", " ", b).strip().lower()
return b
# Build index: base -> list of original keys
base_index = {}
for key in list(out.keys()):
base = _base_name(key)
if not base:
continue
base_index.setdefault(base, []).append(key)
# For each base that maps to >1 key, merge into the Summary-preferring canonical key
for base, keys in base_index.items():
if len(keys) < 2:
continue
# prefer a key that explicitly contains 'summary' (case-insensitive)
canonical = None
for k in keys:
if re.search(r"\bsummary\b", k, re.I):
canonical = k
break
# else pick the lexicographically first (deterministic)
canonical = canonical or sorted(keys, key=lambda s: s.lower())[0]
# merge everything else into canonical
for k in keys:
if k == canonical:
continue
src = out.get(k)
dst = out.get(canonical)
# only merge dict-like Std mappings (safe-guard)
if isinstance(dst, dict) and isinstance(src, dict):
for std_key, vals in src.items():
if not vals:
continue
if std_key in dst:
# append unique items preserving order
for v in vals if isinstance(vals, list) else [vals]:
if v not in dst[std_key]:
dst[std_key].append(v)
else:
dst[std_key] = list(vals) if isinstance(vals, list) else [vals]
out[canonical] = dst
# remove source key
del out[k]
else:
# If not both dicts, prefer keeping canonical and drop duplicates conservatively
if k in out:
del out[k]
return out
def extract_red_text_filelike(input_file, output_file):
"""
Accepts:
input_file: file-like object (BytesIO/File) or path
output_file: file-like object (opened for writing text) or path
"""
if hasattr(input_file, "seek"):
input_file.seek(0)
doc = Document(input_file)
result = extract_red_text(doc)
if hasattr(output_file, "write"):
json.dump(result, output_file, indent=2, ensure_ascii=False)
output_file.flush()
else:
with open(output_file, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
return result
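# Sketch of file-like usage (assumes an uploaded .docx held in memory):
#   from io import BytesIO, StringIO
#   buf, out = BytesIO(docx_bytes), StringIO()   # docx_bytes is hypothetical
#   data = extract_red_text_filelike(buf, out)   # 'out' now holds the JSON text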
if __name__ == "__main__":
# Support both script and app/file-like usage
if len(sys.argv) == 3:
input_docx = sys.argv[1]
output_json = sys.argv[2]
doc = Document(input_docx)
word_data = extract_red_text(doc)
with open(output_json, 'w', encoding='utf-8') as f:
json.dump(word_data, f, indent=2, ensure_ascii=False)
print(json.dumps(word_data, indent=2, ensure_ascii=False))
else:
print("To use as a module: extract_red_text_filelike(input_file, output_file)")