Shami96 committed on
Commit f4b6b63 · verified · 1 Parent(s): 3edd648

Update extract_red_text.py

Files changed (1)
  1. extract_red_text.py +356 -328
extract_red_text.py CHANGED
@@ -1,365 +1,357 @@
 #!/usr/bin/env python3
 """
 extract_red_text.py
-Hardened version: preserves original logic/prints while improving header-label mapping,
-robustness to missing hf_utils and better synonym handling for vehicle tables.
 """
-
 import re
 import json
 import sys
 from docx import Document
 from docx.oxml.ns import qn
 
-# Try to reuse your hf_utils if available (non-breaking); otherwise fall back to local helpers.
-try:
-    from hf_utils import (
-        is_red_font,
-        normalize_text,
-        normalize_header_text,
-        get_clean_text,
-    )
-except Exception:
-    # Minimal compatible fallbacks if hf_utils is not present.
-    def normalize_text(s: str) -> str:
-        if not s:
-            return ""
-        s = re.sub(r"\u2013|\u2014", "-", s) # smart dashes
-        s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s) # keep a small set of punctuation
-        s = re.sub(r"\s+", " ", s).strip()
-        return s
-
-    def normalize_header_text(s: str) -> str:
-        return normalize_text(s).upper()
-
-    def is_red_font(run):
-        """Best-effort red detection fallback for when hf_utils isn't available."""
-        try:
-            col = getattr(run.font, "color", None)
-            if col and getattr(col, "rgb", None):
-                rgb = col.rgb
-                r, g, b = rgb[0], rgb[1], rgb[2]
-                if r > 150 and g < 120 and b < 120 and (r - max(g, b)) > 30:
-                    return True
-        except Exception:
-            pass
-        # fallback to xml check
-        try:
-            rPr = getattr(run._element, "rPr", None)
-            if rPr is not None:
-                clr = rPr.find(qn('w:color'))
-                if clr is not None:
-                    val = clr.get(qn('w:val'))
-                    if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
-                        rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
-                        if rr > 150 and gg < 120 and bb < 120 and (rr - max(gg, bb)) > 30:
-                            return True
-        except Exception:
-            pass
         return False
 
-    def get_clean_text(elem):
-        return "".join(node.text for node in elem.iter() if node.tag.endswith("}t") and node.text).strip()
 
-# Import master schemas and patterns (your file)
-from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
 
-# ---------------------------------------------------------------------
-# Low-level helpers (kept and hardened)
-# ---------------------------------------------------------------------
 def _prev_para_text(tbl):
-    """Get text from previous paragraph before table"""
     prev = tbl._tbl.getprevious()
     while prev is not None and not prev.tag.endswith("}p"):
         prev = prev.getprevious()
     if prev is None:
         return ""
     return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip()
 
 def get_table_context(tbl):
-    """Return structured context for a table"""
     heading = normalize_text(_prev_para_text(tbl))
-    headers = [normalize_text(c.text) for c in tbl.rows[0].cells if c.text.strip()] if tbl.rows else []
     col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells and r.cells[0].text.strip()]
     first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else ""
     all_cells = []
     for row in tbl.rows:
         for cell in row.cells:
-            t = normalize_text(cell.text)
-            if t:
-                all_cells.append(t)
     return {
-        "heading": heading,
-        "headers": headers,
-        "col0": col0,
-        "first_cell": first_cell,
-        "all_cells": all_cells,
-        "num_rows": len(tbl.rows),
-        "num_cols": len(tbl.rows[0].cells) if tbl.rows else 0,
     }
 
-def fuzzy_match_heading(heading, patterns):
-    """Return True if heading fuzzy-matches any regex patterns"""
-    if not heading:
-        return False
-    heading_norm = heading.upper()
-    for pattern in patterns:
-        try:
-            if re.search(pattern, heading_norm, re.IGNORECASE):
-                return True
-        except re.error:
-            if pattern.upper() in heading_norm:
-                return True
-    return False
 
-# ---------------------------------------------------------------------
-# Header-to-label synonym map: improved coverage for common OCR/header variants
-# ---------------------------------------------------------------------
-HEADER_SYNONYMS = {
-    # normalized header (upper) -> canonical label in TABLE_SCHEMAS
-    "NO": "No.",
-    "NO.": "No.",
-    "REG NO": "Registration Number",
-    "REGISTRATIONNO": "Registration Number",
-    "REGISTRATION NUMBER": "Registration Number",
-    "REGISTRATION": "Registration Number",
-    "PRINT NAME": "Print Name",
-    "NHVR OR EXEMPLAR GLOBAL AUDITOR REGISTRATION NUMBER": "NHVR or Exemplar Global Auditor Registration Number",
-    "ROADWORTHINESS CERTIFICATES": "Roadworthiness Certificates",
-    "ROADWORTHINESS CERTIFICATES (APPLICABLE FOR ENTRY AUDIT)": "Roadworthiness Certificates",
-    "MAINTENANCE RECORDS": "Maintenance Records",
-    "DAILY CHECKS": "Daily Checks",
-    "FAULT RECORDING/ REPORTING": "Fault Recording/ Reporting",
-    "FAULT RECORDING/REPORTING": "Fault Recording/ Reporting",
-    "FAULT REPAIR": "Fault Repair",
-    "WEIGHT VERIFICATION RECORDS": "Weight Verification Records",
-    "RFS SUSPENSION CERTIFICATION #": "RFS Suspension Certification #",
-    "SUSPENSION SYSTEM MAINTENANCE": "Suspension System Maintenance",
-    "TRIP RECORDS": "Trip Records",
-    "FAULT RECORDING/ REPORTING ON SUSPENSION SYSTEM": "Fault Recording/ Reporting",
-    # short forms
-    "REG NO.": "Registration Number",
-    "REGISTRATION #": "Registration Number",
-}
-
-def map_header_to_label(header_text, labels):
     """
-    Given a header_text (raw) and list of candidate labels (from schema),
-    return the best matching label or None.
     """
-    if not header_text:
-        return None
-    hnorm = normalize_header_text(header_text)
-    # exact synonym map
-    for key, lab in HEADER_SYNONYMS.items():
-        if key in hnorm:
-            # ensure lab exists in candidate labels (case-insensitive)
-            for cand in labels:
-                if normalize_header_text(cand) == normalize_header_text(lab):
-                    return cand
-            # if it isn't in labels, still return the lab (labels sometimes omit punctuation)
-            return lab
-
-    # try exact match to any candidate label
-    for cand in labels:
-        if normalize_header_text(cand) == hnorm:
-            return cand
-
-    # token overlap scoring (flexible)
-    header_words = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
-    best = (None, 0.0)
-    for cand in labels:
-        cand_words = [w for w in re.split(r"\W+", cand) if len(w) > 2]
-        if not cand_words or not header_words:
-            continue
-        common = set(w.upper() for w in header_words).intersection(set(w.upper() for w in cand_words))
-        score = len(common) / max(1, max(len(header_words), len(cand_words)))
-        if score > best[1]:
-            best = (cand, score)
-    # lower threshold for vehicle tables / noisy OCR (accept >= 0.25)
-    if best[1] >= 0.25:
-        return best[0]
-    return None
-
-# ---------------------------------------------------------------------
-# Matching / scoring logic (keeps original heuristics)
-# ---------------------------------------------------------------------
-def calculate_schema_match_score(schema_name, spec, context):
     score = 0
     reasons = []
 
-    # Vehicle registration boost
     if "Vehicle Registration" in schema_name:
-        vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension"]
-        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
-        keyword_matches = sum(1 for k in vehicle_keywords if k in table_text)
         if keyword_matches >= 2:
             score += 150
-            reasons.append(f"Vehicle Registration keywords: {keyword_matches}/5")
         elif keyword_matches >= 1:
             score += 75
-            reasons.append(f"Some Vehicle Registration keywords: {keyword_matches}/5")
 
-    # Summary boost
-    if "Summary" in schema_name and "details" in " ".join(context["headers"]).lower():
         score += 100
-        reasons.append("Summary schema with DETAILS column - perfect match")
-    if "Summary" not in schema_name and "details" in " ".join(context["headers"]).lower():
-        score -= 75
-        reasons.append("Non-summary schema penalized for DETAILS column presence")
-
-    # context exclusions & keywords
-    if spec.get("context_exclusions"):
-        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
-        for exc in spec["context_exclusions"]:
-            if exc.lower() in table_text:
-                score -= 50
-                reasons.append(f"Context exclusion penalty: '{exc}'")
 
-    if spec.get("context_keywords"):
-        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
-        matches = sum(1 for kw in spec["context_keywords"] if kw.lower() in table_text)
-        if matches:
-            score += matches * 15
-            reasons.append(f"Context keyword matches: {matches}/{len(spec['context_keywords'])}")
 
-    # direct first-cell match
-    if context["first_cell"] and context["first_cell"].upper() == schema_name.upper():
         score += 100
-        reasons.append(f"Direct first cell match: '{context['first_cell']}'")
 
-    # heading pattern
-    if spec.get("headings"):
-        for h in spec["headings"]:
-            if isinstance(h, dict):
-                text = h.get("text", "")
-            else:
-                text = h
-            if fuzzy_match_heading(context["heading"], [text]):
                 score += 50
-                reasons.append(f"Heading match: '{context['heading']}'")
                 break
 
-    # columns matching
     if spec.get("columns"):
         cols = [normalize_text(c) for c in spec["columns"]]
         matches = 0
         for col in cols:
-            if any(col.upper() in h.upper() for h in context["headers"]):
                 matches += 1
         if matches == len(cols):
             score += 60
-            reasons.append(f"All column headers match: {cols}")
         elif matches > 0:
             score += matches * 20
             reasons.append(f"Partial column matches: {matches}/{len(cols)}")
 
-    # left orientation
     if spec.get("orientation") == "left":
         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
         matches = 0
         for lbl in labels:
-            if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context["col0"]):
                 matches += 1
         if matches > 0:
             score += (matches / max(1, len(labels))) * 30
-            reasons.append(f"Left orientation label matches: {matches}/{len(labels)}")
 
-    # row1 orientation
     elif spec.get("orientation") == "row1":
         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
-        matches = 0
         for lbl in labels:
-            if any(lbl.upper() in h.upper() or h.upper() in lbl.upper() for h in context["headers"]):
-                matches += 1
-            elif any(word.upper() in " ".join(context["headers"]).upper() for word in lbl.split() if len(word) > 3):
-                matches += 0.5
         if matches > 0:
-            score += (matches / max(1, len(labels))) * 40
-            reasons.append(f"Row1 orientation header matches: {matches}/{len(labels)}")
-
-    # Declarations special cases
-    if schema_name == "Operator Declaration" and context["first_cell"].upper().startswith("PRINT"):
-        if "OPERATOR DECLARATION" in context["heading"].upper():
-            score += 80
-            reasons.append("Operator Declaration context match")
-        elif any("MANAGER" in cell.upper() for cell in context["all_cells"]):
-            score += 60
-            reasons.append("Manager found in cells (likely Operator Declaration)")
-
-    if schema_name == "NHVAS Approved Auditor Declaration" and context["first_cell"].upper().startswith("PRINT"):
-        if any("MANAGER" in cell.upper() for cell in context["all_cells"]):
-            score -= 50
-            reasons.append("Penalty: Manager found (not auditor)")
 
     return score, reasons
 
 def match_table_schema(tbl):
     context = get_table_context(tbl)
     best_match = None
-    best_score = 0
     for name, spec in TABLE_SCHEMAS.items():
-        score, reasons = calculate_schema_match_score(name, spec, context)
         if score > best_score:
             best_score = score
             best_match = name
     if best_score >= 20:
         return best_match
     return None
 
-# ---------------------------------------------------------------------
-# Multi-schema detection & extraction (keeps original behavior)
-# ---------------------------------------------------------------------
 def check_multi_schema_table(tbl):
     context = get_table_context(tbl)
-    operator_labels = [
-        "Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s",
-        "Australian Company Number", "NHVAS Manual"
-    ]
     contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"]
-    has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context["col0"])
-    has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context["col0"])
     if has_operator and has_contact:
         return ["Operator Information", "Operator contact details"]
     return None
 
 def extract_multi_schema_table(tbl, schemas):
     result = {}
     for schema_name in schemas:
         if schema_name not in TABLE_SCHEMAS:
             continue
         spec = TABLE_SCHEMAS[schema_name]
         schema_data = {}
         for ri, row in enumerate(tbl.rows):
-            if ri == 0:
                 continue
             row_label = normalize_text(row.cells[0].text)
-            belongs_to_schema = False
             matched_label = None
             for spec_label in spec.get("labels", []):
                 spec_norm = normalize_text(spec_label).upper()
                 row_norm = row_label.upper()
                 if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm:
-                    belongs_to_schema = True
                     matched_label = spec_label
                     break
-            if not belongs_to_schema:
                 continue
-            for ci, cell in enumerate(row.cells):
                 red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
                 if red_txt:
-                    schema_data.setdefault(matched_label, [])
-                    if red_txt not in schema_data[matched_label]:
-                        schema_data[matched_label].append(red_txt)
         if schema_data:
             result[schema_name] = schema_data
     return result
 
-# ---------------------------------------------------------------------
-# Extraction: special-case for Vehicle Registration tables (row1) and generic fallback
-# ---------------------------------------------------------------------
 def extract_table_data(tbl, schema_name, spec):
-    # Vehicle registration special handling
     if "Vehicle Registration" in schema_name:
         print(f" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
         labels = spec.get("labels", [])
@@ -367,12 +359,11 @@ def extract_table_data(tbl, schema_name, spec):
         seen = {lbl: set() for lbl in labels}
 
         if len(tbl.rows) < 2:
-            print(" ❌ Vehicle table has less than 2 rows")
             return {}
 
         header_row = tbl.rows[0]
         column_mapping = {}
-
         print(f" 📋 Mapping {len(header_row.cells)} header cells to labels")
 
         for col_idx, cell in enumerate(header_row.cells):
@@ -380,56 +371,62 @@ def extract_table_data(tbl, schema_name, spec):
             if not header_text:
                 continue
             print(f" Column {col_idx}: '{header_text}'")
-            mapped = map_header_to_label(header_text, labels)
-            if mapped:
-                # find exact candidate label string (preserve original label spelling if possible)
-                chosen = None
-                for cand in labels:
-                    if normalize_header_text(cand) == normalize_header_text(mapped):
-                        chosen = cand
-                        break
-                column_mapping[col_idx] = chosen or mapped
-                print(f" ✅ Mapped to: '{column_mapping[col_idx]}'")
             else:
-                # fallback: try fuzzy token overlap directly with candidate labels
-                best = None
-                best_score = 0.0
-                hwords = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
-                for cand in labels:
-                    cwords = [w for w in re.split(r"\W+", cand) if len(w) > 2]
-                    if not cwords or not hwords:
-                        continue
-                    common = set(w.upper() for w in hwords).intersection(set(w.upper() for w in cwords))
-                    score = len(common) / max(1, max(len(hwords), len(cwords)))
-                    if score > best_score:
-                        best = cand
-                        best_score = score
-                if best and best_score >= 0.25:
-                    column_mapping[col_idx] = best
-                    print(f" ✅ Fuzzy-mapped to: '{best}' (score: {best_score:.2f})")
                 else:
                     print(f" ⚠️ No mapping found for '{header_text}'")
 
         print(f" 📊 Total column mappings: {len(column_mapping)}")
 
-        # Extract red text from data rows
         for row_idx in range(1, len(tbl.rows)):
             row = tbl.rows[row_idx]
             print(f" 📌 Processing data row {row_idx}")
             for col_idx, cell in enumerate(row.cells):
-                if col_idx in column_mapping:
-                    label = column_mapping[col_idx]
-                    red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
-                    if red_txt:
-                        print(f" 🔴 Found red text in '{label}': '{red_txt}'")
-                        if red_txt not in seen.setdefault(label, set()):
-                            seen[label].add(red_txt)
-                            collected.setdefault(label, []).append(red_txt)
         result = {k: v for k, v in collected.items() if v}
         print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
         return result
 
-    # Generic fallback extraction logic
     labels = spec.get("labels", []) + [schema_name]
     collected = {lbl: [] for lbl in labels}
     seen = {lbl: set() for lbl in labels}
@@ -442,12 +439,15 @@ def extract_table_data(tbl, schema_name, spec):
             red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
             if not red_txt:
                 continue
             if by_col:
                 if ci < len(spec.get("labels", [])):
                     lbl = spec["labels"][ci]
                 else:
                     lbl = schema_name
             else:
                 raw_label = normalize_text(row.cells[0].text)
                 lbl = None
                 for spec_label in spec.get("labels", []):
@@ -463,112 +463,140 @@ def extract_table_data(tbl, schema_name, spec):
                         break
             if not lbl:
                 lbl = schema_name
-            if red_txt not in seen.setdefault(lbl, set()):
                 seen[lbl].add(red_txt)
-                collected.setdefault(lbl, []).append(red_txt)
     return {k: v for k, v in collected.items() if v}
 
-# ---------------------------------------------------------------------
-# Main extraction: process all tables then paragraphs
-# ---------------------------------------------------------------------
 def extract_red_text(input_doc):
     if isinstance(input_doc, str):
         doc = Document(input_doc)
     else:
         doc = input_doc
     out = {}
     table_count = 0
 
     for tbl in doc.tables:
         table_count += 1
         multi_schemas = check_multi_schema_table(tbl)
         if multi_schemas:
             multi_data = extract_multi_schema_table(tbl, multi_schemas)
             for schema_name, schema_data in multi_data.items():
                 if schema_data:
-                    # merge safely and dedupe
-                    existing = out.get(schema_name, {})
-                    for k, v in schema_data.items():
-                        existing.setdefault(k, [])
-                        for val in v:
-                            if val not in existing[k]:
-                                existing[k].append(val)
-                    out[schema_name] = existing
             continue
 
         schema = match_table_schema(tbl)
         if not schema:
             continue
-        spec = TABLE_SCHEMAS[schema]
         data = extract_table_data(tbl, schema, spec)
         if data:
-            existing = out.get(schema, {})
-            for k, v in data.items():
-                existing.setdefault(k, [])
-                for val in v:
-                    if val not in existing[k]:
-                        existing[k].append(val)
-            out[schema] = existing
-
-    # Paragraph red-text extraction with context
     paras = {}
     for idx, para in enumerate(doc.paragraphs):
         red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
         if not red_txt:
             continue
 
-        # find a heading context by scanning backwards
         context = None
         for j in range(idx - 1, -1, -1):
             txt = normalize_text(doc.paragraphs[j].text)
-            if txt:
-                patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"]
-                if any(re.search(p, txt, re.IGNORECASE) for p in patterns):
-                    context = txt
-                    break
 
-        # special-case date-like lines
-        if not context and re.fullmatch(PARAGRAPH_PATTERNS.get("date_line", r".*"), red_txt):
             context = "Date"
 
         if not context:
             context = "(para)"
-        paras.setdefault(context, [])
-        if red_txt not in paras[context]:
-            paras[context].append(red_txt)
 
     if paras:
         out["paragraphs"] = paras
     return out
 
-# ---------------------------------------------------------------------
-# File wrapper to support your existing calls
-# ---------------------------------------------------------------------
 def extract_red_text_filelike(input_file, output_file):
     if hasattr(input_file, "seek"):
-        input_file.seek(0)
-    doc = Document(input_file)
     result = extract_red_text(doc)
     if hasattr(output_file, "write"):
         json.dump(result, output_file, indent=2, ensure_ascii=False)
-        output_file.flush()
     else:
         with open(output_file, "w", encoding="utf-8") as f:
             json.dump(result, f, indent=2, ensure_ascii=False)
     return result
 
-# ---------------------------------------------------------------------
-# CLI entrypoint (same as before)
-# ---------------------------------------------------------------------
 if __name__ == "__main__":
     if len(sys.argv) == 3:
         input_docx = sys.argv[1]
         output_json = sys.argv[2]
-        doc = Document(input_docx)
-        word_data = extract_red_text(doc)
-        # write file (dedupe already handled in merging logic above)
-        with open(output_json, "w", encoding="utf-8") as f:
-            json.dump(word_data, f, indent=2, ensure_ascii=False)
-        print(json.dumps(word_data, indent=2, ensure_ascii=False))
     else:
         print("To use as a module: extract_red_text_filelike(input_file, output_file)")
@@ -1,365 +1,357 @@
 #!/usr/bin/env python3
 """
 extract_red_text.py
 """
 import re
 import json
 import sys
+from io import BytesIO
 from docx import Document
 from docx.oxml.ns import qn
 
+# Import schema constants (TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS)
+# Ensure master_key.py is present in same dir / importable path
+from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS
+
+
+def is_red_font(run):
+    """
+    Robust detection of 'red' font in a run.
+    Tries several sources:
+      - python-docx run.font.color.rgb (safe-guarded)
+      - raw XML rPr/w:color value (hex)
+    Returns True if color appears predominantly red.
+    """
+    # Quick guard
+    if run is None:
         return False
 
+    # 1) Try docx high-level color API if available
+    try:
+        col = getattr(run.font, "color", None)
+        if col is not None:
+            rgb_val = getattr(col, "rgb", None)
+            if rgb_val:
+                # rgb_val might be an RGBColor object or a tuple/list or hex-string
+                try:
+                    # If it's sequence-like (tuple/list) with 3 ints
+                    if isinstance(rgb_val, (tuple, list)) and len(rgb_val) == 3:
+                        rr, gg, bb = rgb_val
+                    else:
+                        # Try string representation like 'FF0000' or 'ff0000'
+                        hexstr = str(rgb_val).strip()
+                        if re.fullmatch(r"[0-9A-Fa-f]{6}", hexstr):
+                            rr, gg, bb = int(hexstr[0:2], 16), int(hexstr[2:4], 16), int(hexstr[4:6], 16)
+                        else:
+                            # unknown format - fall through to XML check
+                            rr = gg = bb = None
+                    if rr is not None:
+                        # Heuristic thresholds for 'red-ish'
+                        if rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30:
+                            return True
+                except Exception:
+                    # fall back to rPr introspection below
+                    pass
+    except Exception:
+        # ignore and continue to XML method
+        pass
+
+    # 2) Inspect raw XML run properties for <w:color w:val="RRGGBB" />
+    try:
+        rPr = getattr(run._element, "rPr", None)
+        if rPr is not None:
+            clr = rPr.find(qn('w:color'))
+            if clr is not None:
+                val = clr.get(qn('w:val')) or clr.get('w:val') or clr.get('val')
+                if val and isinstance(val, str):
+                    val = val.strip()
+                    # sometimes color is provided as 'FF0000' hex or shorthand
+                    if re.fullmatch(r"[0-9A-Fa-f]{6}", val):
+                        rr, gg, bb = int(val[0:2], 16), int(val[2:4], 16), int(val[4:6], 16)
+                        if rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30:
+                            return True
+    except Exception:
+        pass
+
+    return False
 
 
 def _prev_para_text(tbl):
+    """Return text of previous paragraph node before a given table element."""
     prev = tbl._tbl.getprevious()
     while prev is not None and not prev.tag.endswith("}p"):
         prev = prev.getprevious()
     if prev is None:
         return ""
+    # gather all text nodes under the paragraph element
     return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip()
 
+
+def normalize_text(text):
+    """Normalize text for more reliable matching (collapse whitespace)."""
+    if text is None:
+        return ""
+    return re.sub(r'\s+', ' ', text.strip())
+
+
+def fuzzy_match_heading(heading, patterns):
+    """
+    Attempt fuzzy matching of heading against regex patterns.
+    patterns is a list of pattern dicts or strings.
+    """
+    heading_norm = normalize_text(heading.upper())
+    for p in patterns:
+        if isinstance(p, dict):
+            pat = p.get("text", "")
+        else:
+            pat = p
+        try:
+            if re.search(pat, heading_norm, re.IGNORECASE):
+                return True
+        except re.error:
+            # treat as plain substring fallback
+            if pat and pat.upper() in heading_norm:
+                return True
+    return False
+
+
 def get_table_context(tbl):
+    """Return context metadata for a table to aid schema matching."""
     heading = normalize_text(_prev_para_text(tbl))
+    headers = []
+    if tbl.rows:
+        # collect header text of first row, keeping cell order
+        headers = [normalize_text(c.text) for c in tbl.rows[0].cells]
     col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells and r.cells[0].text.strip()]
     first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else ""
     all_cells = []
     for row in tbl.rows:
         for cell in row.cells:
+            text = normalize_text(cell.text)
+            if text:
+                all_cells.append(text)
     return {
+        'heading': heading,
+        'headers': headers,
+        'col0': col0,
+        'first_cell': first_cell,
+        'all_cells': all_cells,
+        'num_rows': len(tbl.rows),
+        'num_cols': len(tbl.rows[0].cells) if tbl.rows else 0
     }
 
 
+def calculate_schema_match_score(schema_name, spec, context):
     """
+    Return (score, reasons[]) for how well a table context matches a schema.
+    Heuristic-based scoring; vehicle registration and 'DETAILS' summary boosts added.
     """
     score = 0
     reasons = []
 
+    table_text = " ".join(context.get('headers', [])).lower() + " " + context.get('heading', "").lower()
+
+    # Vehicle Registration specific boost
     if "Vehicle Registration" in schema_name:
+        vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension", "roadworthiness"]
+        keyword_matches = sum(1 for kw in vehicle_keywords if kw in table_text)
         if keyword_matches >= 2:
             score += 150
+            reasons.append(f"Vehicle keywords matched: {keyword_matches}")
         elif keyword_matches >= 1:
             score += 75
+            reasons.append(f"Some vehicle keywords matched: {keyword_matches}")
 
+    # Summary DETAILS boost
+    if "Summary" in schema_name and "details" in table_text:
         score += 100
+        reasons.append("Summary with DETAILS found")
 
+    if "Summary" not in schema_name and "details" in table_text:
+        score -= 75
+        reasons.append("Non-summary schema penalized due to DETAILS column presence")
 
+    # Context exclusions
+    for exclusion in spec.get("context_exclusions", []):
+        if exclusion.lower() in table_text:
+            score -= 50
+            reasons.append(f"Context exclusion: {exclusion}")
+
+    # Context keywords positive matches
+    kw_count = 0
+    for kw in spec.get("context_keywords", []):
+        if kw.lower() in table_text:
+            kw_count += 1
+    if kw_count:
+        score += kw_count * 15
+        reasons.append(f"Context keywords matched: {kw_count}")
+
+    # First-cell exact match
+    if context.get('first_cell') and context['first_cell'].upper() == schema_name.upper():
         score += 100
+        reasons.append("Exact first cell match")
 
+    # Heading pattern match
+    for h in spec.get("headings", []) or []:
+        pat = h.get("text") if isinstance(h, dict) and h.get("text") else h
+        try:
+            if pat and re.search(pat, context.get('heading', ""), re.IGNORECASE):
                 score += 50
+                reasons.append(f"Heading regex matched: {pat}")
+                break
+        except re.error:
+            if pat and pat.lower() in context.get('heading', "").lower():
+                score += 50
+                reasons.append(f"Heading substring matched: {pat}")
                 break
 
+    # Column header matching (strict)
     if spec.get("columns"):
         cols = [normalize_text(c) for c in spec["columns"]]
         matches = 0
         for col in cols:
+            if any(col.upper() in h.upper() for h in context.get('headers', [])):
                 matches += 1
         if matches == len(cols):
             score += 60
+            reasons.append("All expected columns matched exactly")
         elif matches > 0:
             score += matches * 20
             reasons.append(f"Partial column matches: {matches}/{len(cols)}")
 
+    # Label matching for left-oriented tables
     if spec.get("orientation") == "left":
         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
         matches = 0
         for lbl in labels:
+            if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context.get('col0', [])):
                 matches += 1
         if matches > 0:
            score += (matches / max(1, len(labels))) * 30
+            reasons.append(f"Left-orientation label matches: {matches}/{len(labels)}")
 
+    # Row1 (header row) flexible matching
     elif spec.get("orientation") == "row1":
         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
+        matches = 0.0
+        header_texts = " ".join(context.get('headers', [])).upper()
         for lbl in labels:
+            label_upper = lbl.upper()
+            # exact in any header
+            if any(label_upper in h.upper() for h in context.get('headers', [])):
+                matches += 1.0
+            else:
+                # partial words from label in header_texts
+                for word in label_upper.split():
+                    if len(word) > 3 and word in header_texts:
+                        matches += 0.5
+                        break
         if matches > 0:
+            score += (matches / max(1.0, len(labels))) * 40
+            reasons.append(f"Row1 header-like matches: {matches}/{len(labels)}")
+
+    # Special handling for declaration schemas
+    if schema_name == "Operator Declaration":
+        # boost if 'print name' first cell and heading indicates operator declaration
+        if context.get('first_cell', "").upper().startswith("PRINT"):
+            if "OPERATOR DECLARATION" in context.get('heading', "").upper():
+                score += 80
+                reasons.append("Operator Declaration context & first-cell indicate match")
+            elif any("MANAGER" in c.upper() for c in context.get('all_cells', [])):
+                score += 60
+                reasons.append("Manager found in cells for Operator Declaration")
+
+    if schema_name == "NHVAS Approved Auditor Declaration":
+        if context.get('first_cell', "").upper().startswith("PRINT"):
+            # penalize where manager words appear (to reduce false positives)
+            if any("MANAGER" in c.upper() for c in context.get('all_cells', [])):
+                score -= 50
+                reasons.append("Penalty: found manager text in auditor declaration table")
 
     return score, reasons
 
+
 def match_table_schema(tbl):
+    """
+    Iterate TABLE_SCHEMAS and pick best match by score threshold.
+    Returns schema name or None when below threshold.
+    """
     context = get_table_context(tbl)
     best_match = None
+    best_score = float("-inf")
     for name, spec in TABLE_SCHEMAS.items():
+        try:
+            score, reasons = calculate_schema_match_score(name, spec, context)
+        except Exception:
+            score, reasons = 0, ["error computing score"]
         if score > best_score:
             best_score = score
             best_match = name
+    # threshold to avoid spurious picks
     if best_score >= 20:
         return best_match
     return None
 
+
 def check_multi_schema_table(tbl):
+    """
+    Identify tables that contain multiple logical schemas (e.g., Operator Information + Contact Details)
+    Return list of schema names if multi, else None.
+    """
     context = get_table_context(tbl)
+    operator_labels = ["Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s",
+                       "Australian Company Number", "NHVAS Manual"]
     contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"]
+    has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context.get('col0', []))
+    has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context.get('col0', []))
    if has_operator and has_contact:
         return ["Operator Information", "Operator contact details"]
     return None
 
+
 def extract_multi_schema_table(tbl, schemas):
+    """
+    For tables that embed multiple schema sections vertically (left orientation), split and extract.
+    Returns a dict mapping schema_name -> {label: [values,...]}
+    """
     result = {}
     for schema_name in schemas:
         if schema_name not in TABLE_SCHEMAS:
             continue
         spec = TABLE_SCHEMAS[schema_name]
         schema_data = {}
+        # iterate rows and match the left-most cell against spec labels
         for ri, row in enumerate(tbl.rows):
+            if not row.cells:
                 continue
             row_label = normalize_text(row.cells[0].text)
+            belongs = False
             matched_label = None
             for spec_label in spec.get("labels", []):
                 spec_norm = normalize_text(spec_label).upper()
                 row_norm = row_label.upper()
                 if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm:
+                    belongs = True
                     matched_label = spec_label
                     break
+            if not belongs:
                 continue
+            # gather red-text from the row's value cells (all others)
+            for ci, cell in enumerate(row.cells[1:], start=1):
                 red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
                 if red_txt:
+                    schema_data.setdefault(matched_label, []).append(red_txt)
         if schema_data:
             result[schema_name] = schema_data
     return result
 
+
 def extract_table_data(tbl, schema_name, spec):
+    """
+    Extract red text from a table for a given schema.
+    Special handling for Vehicle Registration (row1 header orientation).
+    """
+    # Vehicle Registration special-case (headers in first row)
     if "Vehicle Registration" in schema_name:
         print(f" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
         labels = spec.get("labels", [])
@@ -367,12 +359,11 @@ def extract_table_data(tbl, schema_name, spec):
         seen = {lbl: set() for lbl in labels}
 
         if len(tbl.rows) < 2:
+            print(" ❌ Vehicle table has less than 2 rows; skipping")
             return {}
 
         header_row = tbl.rows[0]
         column_mapping = {}
         print(f" 📋 Mapping {len(header_row.cells)} header cells to labels")
 
         for col_idx, cell in enumerate(header_row.cells):
@@ -380,56 +371,62 @@ def extract_table_data(tbl, schema_name, spec):
             if not header_text:
                 continue
             print(f" Column {col_idx}: '{header_text}'")
+            best_match = None
+            best_score = 0.0
+
+            for label in labels:
+                # exact match
+                if header_text.upper() == label.upper():
+                    best_match = label
+                    best_score = 1.0
+                    break
+
+                # partial token overlap scoring
+                header_words = set(word.upper() for word in header_text.split() if len(word) > 2)
+                label_words = set(word.upper() for word in label.split() if len(word) > 2)
+                if header_words and label_words:
+                    common = header_words.intersection(label_words)
+                    if common:
+                        score = len(common) / max(len(header_words), len(label_words))
+                        if score > best_score and score >= 0.35: # relaxed threshold for OCR noise
+                            best_score = score
+                            best_match = label
+
+            if best_match:
+                column_mapping[col_idx] = best_match
+                print(f" ✅ Mapped to: '{best_match}' (score: {best_score:.2f})")
             else:
+                # additional heuristics: simple substring matches
+                for label in labels:
+                    if label.lower() in header_text.lower() or header_text.lower() in label.lower():
+                        column_mapping[col_idx] = label
+                        print(f" ✅ Mapped by substring to: '{label}'")
+                        break
                 else:
                     print(f" ⚠️ No mapping found for '{header_text}'")
 
         print(f" 📊 Total column mappings: {len(column_mapping)}")
 
+        # Extract data rows
         for row_idx in range(1, len(tbl.rows)):
             row = tbl.rows[row_idx]
             print(f" 📌 Processing data row {row_idx}")
             for col_idx, cell in enumerate(row.cells):
+                if col_idx not in column_mapping:
+                    continue
+                label = column_mapping[col_idx]
+                red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
+                if red_txt:
+                    print(f" 🔴 Found red text in '{label}': '{red_txt}'")
+                    if red_txt not in seen[label]:
+                        seen[label].add(red_txt)
+                        collected[label].append(red_txt)
+
         result = {k: v for k, v in collected.items() if v}
         print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
         return result
 
+    # Generic extraction for other table types
     labels = spec.get("labels", []) + [schema_name]
     collected = {lbl: [] for lbl in labels}
     seen = {lbl: set() for lbl in labels}
@@ -442,12 +439,15 @@ def extract_table_data(tbl, schema_name, spec):
             red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
             if not red_txt:
                 continue
+
             if by_col:
+                # column-wise mapping (header labels)
                 if ci < len(spec.get("labels", [])):
                     lbl = spec["labels"][ci]
                 else:
                     lbl = schema_name
             else:
+                # left-oriented: match left label
                 raw_label = normalize_text(row.cells[0].text)
                 lbl = None
                 for spec_label in spec.get("labels", []):
@@ -463,112 +463,140 @@ def extract_table_data(tbl, schema_name, spec):
                         break
             if not lbl:
                 lbl = schema_name
+
+            if red_txt not in seen[lbl]:
                 seen[lbl].add(red_txt)
+                collected[lbl].append(red_txt)
+
     return {k: v for k, v in collected.items() if v}
 
+
 def extract_red_text(input_doc):
+    """
+    Main extraction function.
+    Accepts a docx.Document object or a path string (filename).
+    Returns dictionary of extracted red-text organized by schema.
+    """
     if isinstance(input_doc, str):
         doc = Document(input_doc)
     else:
         doc = input_doc
+
     out = {}
     table_count = 0
 
     for tbl in doc.tables:
         table_count += 1
+        # Check for multi-schema tables first
         multi_schemas = check_multi_schema_table(tbl)
         if multi_schemas:
             multi_data = extract_multi_schema_table(tbl, multi_schemas)
             for schema_name, schema_data in multi_data.items():
                 if schema_data:
+                    if schema_name in out:
+                        for k, v in schema_data.items():
+                            out[schema_name].setdefault(k, []).extend(v)
+                    else:
+                        out[schema_name] = schema_data
             continue
 
+        # match a single schema
         schema = match_table_schema(tbl)
         if not schema:
+            # no confident schema match
            continue
+        spec = TABLE_SCHEMAS.get(schema, {})
         data = extract_table_data(tbl, schema, spec)
         if data:
+            if schema in out:
+                for k, v in data.items():
+                    out[schema].setdefault(k, []).extend(v)
+            else:
+                out[schema] = data
+
+    # Paragraph-level red-text extraction (with contextual heading resolution)
     paras = {}
     for idx, para in enumerate(doc.paragraphs):
         red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
         if not red_txt:
             continue
 
+        # attempt to find nearest preceding heading paragraph (using HEADING_PATTERNS)
         context = None
         for j in range(idx - 1, -1, -1):
            txt = normalize_text(doc.paragraphs[j].text)
+            if not txt:
+                continue
+            all_patterns = HEADING_PATTERNS.get("main", []) + HEADING_PATTERNS.get("sub", [])
+            if any(re.search(p, txt, re.IGNORECASE) for p in all_patterns):
+                context = txt
+                break
 
+        # fallback: date-line mapping for 'Date' single-line red texts
+        if not context and re.fullmatch(PARAGRAPH_PATTERNS.get("date_line", r"^\s*\d{1,2}(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4}\s*$|^Date$"), red_txt):
            context = "Date"
 
        if not context:
            context = "(para)"
+
+        paras.setdefault(context, []).append(red_txt)
 
     if paras:
         out["paragraphs"] = paras
+
     return out
 
+
 def extract_red_text_filelike(input_file, output_file):
+    """
+    Accepts:
+      - input_file: file-like object (BytesIO/File) or path
+      - output_file: file-like object (opened for writing text) or path
+    Returns the parsed dictionary.
+    Writes the JSON to output_file if possible.
+    """
+    # Reset file-like if necessary
     if hasattr(input_file, "seek"):
+        try:
+            input_file.seek(0)
+        except Exception:
+            pass
+
+    # Load Document
+    if isinstance(input_file, (str, bytes)):
+        doc = Document(input_file)
+    else:
+        doc = Document(input_file)
+
     result = extract_red_text(doc)
+
+    # Write result out
     if hasattr(output_file, "write"):
         json.dump(result, output_file, indent=2, ensure_ascii=False)
+        try:
+            output_file.flush()
+        except Exception:
+            pass
     else:
         with open(output_file, "w", encoding="utf-8") as f:
             json.dump(result, f, indent=2, ensure_ascii=False)
+
     return result
 
+
 if __name__ == "__main__":
+    # Backwards-compatible script entry point
     if len(sys.argv) == 3:
         input_docx = sys.argv[1]
         output_json = sys.argv[2]
+        try:
+            doc = Document(input_docx)
+            word_data = extract_red_text(doc)
+            with open(output_json, 'w', encoding='utf-8') as f:
+                json.dump(word_data, f, indent=2, ensure_ascii=False)
+            print(json.dumps(word_data, indent=2, ensure_ascii=False))
+        except Exception as e:
+            print("Error during extraction:", e)
+            raise
     else:
         print("To use as a module: extract_red_text_filelike(input_file, output_file)")