Shami96 committed on
Commit 704d2a2 · verified · 1 Parent(s): 9880bcc

Update extract_red_text.py

Files changed (1):
  extract_red_text.py (+316 -577)
extract_red_text.py CHANGED
@@ -2,601 +2,340 @@
  """
  extract_red_text.py
  """
- import re
  import json
  import sys
- from io import BytesIO
- from docx import Document
- from docx.oxml.ns import qn
-
- # Import schema constants (TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS)
- # Ensure master_key.py is present in same dir / importable path
- from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS, GLOBAL_SETTINGS
-

- def is_red_font(run):
      """
-     Robust detection of 'red' font in a run.
-     Tries several sources:
-       - python-docx run.font.color.rgb (safe-guarded)
-       - raw XML rPr/w:color value (hex)
-     Returns True if color appears predominantly red.
      """
-     # Quick guard
-     if run is None:
-         return False

-     # 1) Try docx high-level color API if available
      try:
-         col = getattr(run.font, "color", None)
-         if col is not None:
-             rgb_val = getattr(col, "rgb", None)
-             if rgb_val:
-                 # rgb_val might be an RGBColor object or a tuple/list or hex-string
-                 try:
-                     # If it's sequence-like (tuple/list) with 3 ints
-                     if isinstance(rgb_val, (tuple, list)) and len(rgb_val) == 3:
-                         rr, gg, bb = rgb_val
-                     else:
-                         # Try string representation like 'FF0000' or 'ff0000'
-                         hexstr = str(rgb_val).strip()
-                         if re.fullmatch(r"[0-9A-Fa-f]{6}", hexstr):
-                             rr, gg, bb = int(hexstr[0:2], 16), int(hexstr[2:4], 16), int(hexstr[4:6], 16)
-                         else:
-                             # unknown format - fall through to XML check
-                             rr = gg = bb = None
-                     if rr is not None:
-                         # Heuristic thresholds for 'red-ish'
-                         if rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30:
-                             return True
-                 except Exception:
-                     # fall back to rPr introspection below
-                     pass
      except Exception:
-         # ignore and continue to XML method
          pass
-
-     # 2) Inspect raw XML run properties for <w:color w:val="RRGGBB" />
      try:
-         rPr = getattr(run._element, "rPr", None)
-         if rPr is not None:
-             clr = rPr.find(qn('w:color'))
-             if clr is not None:
-                 val = clr.get(qn('w:val')) or clr.get('w:val') or clr.get('val')
-                 if val and isinstance(val, str):
-                     val = val.strip()
-                     # sometimes color is provided as 'FF0000' hex or shorthand
-                     if re.fullmatch(r"[0-9A-Fa-f]{6}", val):
-                         rr, gg, bb = int(val[0:2], 16), int(val[2:4], 16), int(val[4:6], 16)
-                         if rr > 150 and gg < 120 and bb < 120 and (rr - gg) > 30 and (rr - bb) > 30:
-                             return True
      except Exception:
          pass
-
-     return False
-
-
- def _prev_para_text(tbl):
-     """Return text of previous paragraph node before a given table element."""
-     prev = tbl._tbl.getprevious()
-     while prev is not None and not prev.tag.endswith("}p"):
-         prev = prev.getprevious()
-     if prev is None:
-         return ""
-     # gather all text nodes under the paragraph element
-     return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip()
-
-
- def normalize_text(text):
-     """Normalize text for more reliable matching (collapse whitespace)."""
-     if text is None:
-         return ""
-     return re.sub(r'\s+', ' ', text.strip())
-
-
- def fuzzy_match_heading(heading, patterns):
-     """
-     Attempt fuzzy matching of heading against regex patterns.
-     patterns is a list of pattern dicts or strings.
-     """
-     heading_norm = normalize_text(heading.upper())
-     for p in patterns:
-         if isinstance(p, dict):
-             pat = p.get("text", "")
-         else:
-             pat = p
-         try:
-             if re.search(pat, heading_norm, re.IGNORECASE):
-                 return True
-         except re.error:
-             # treat as plain substring fallback
-             if pat and pat.upper() in heading_norm:
-                 return True
      return False

-
- def get_table_context(tbl):
-     """Return context metadata for a table to aid schema matching."""
-     heading = normalize_text(_prev_para_text(tbl))
-     headers = []
-     if tbl.rows:
-         # collect header text of first row, keeping cell order
-         headers = [normalize_text(c.text) for c in tbl.rows[0].cells]
-     col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells and r.cells[0].text.strip()]
-     first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else ""
-     all_cells = []
-     for row in tbl.rows:
-         for cell in row.cells:
-             text = normalize_text(cell.text)
-             if text:
-                 all_cells.append(text)
-     return {
-         'heading': heading,
-         'headers': headers,
-         'col0': col0,
-         'first_cell': first_cell,
-         'all_cells': all_cells,
-         'num_rows': len(tbl.rows),
-         'num_cols': len(tbl.rows[0].cells) if tbl.rows else 0
-     }
-
-
- def calculate_schema_match_score(schema_name, spec, context):
-     """
-     Return (score, reasons[]) for how well a table context matches a schema.
-     Heuristic-based scoring; vehicle registration and 'DETAILS' summary boosts added.
-     """
-     score = 0
-     reasons = []
-
-     table_text = " ".join(context.get('headers', [])).lower() + " " + context.get('heading', "").lower()
-
-     # Vehicle Registration specific boost
-     if "Vehicle Registration" in schema_name:
-         vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension", "roadworthiness"]
-         keyword_matches = sum(1 for kw in vehicle_keywords if kw in table_text)
-         if keyword_matches >= 2:
-             score += 150
-             reasons.append(f"Vehicle keywords matched: {keyword_matches}")
-         elif keyword_matches >= 1:
-             score += 75
-             reasons.append(f"Some vehicle keywords matched: {keyword_matches}")
-
-     # Summary DETAILS boost
-     if "Summary" in schema_name and "details" in table_text:
-         score += 100
-         reasons.append("Summary with DETAILS found")
-
-     if "Summary" not in schema_name and "details" in table_text:
-         score -= 75
-         reasons.append("Non-summary schema penalized due to DETAILS column presence")
-
-     # Context exclusions
-     for exclusion in spec.get("context_exclusions", []):
-         if exclusion.lower() in table_text:
-             score -= 50
-             reasons.append(f"Context exclusion: {exclusion}")
-
-     # Context keywords positive matches
-     kw_count = 0
-     for kw in spec.get("context_keywords", []):
-         if kw.lower() in table_text:
-             kw_count += 1
-     if kw_count:
-         score += kw_count * 15
-         reasons.append(f"Context keywords matched: {kw_count}")
-
-     # First-cell exact match
-     if context.get('first_cell') and context['first_cell'].upper() == schema_name.upper():
-         score += 100
-         reasons.append("Exact first cell match")
-
-     # Heading pattern match
-     for h in spec.get("headings", []) or []:
-         pat = h.get("text") if isinstance(h, dict) and h.get("text") else h
-         try:
-             if pat and re.search(pat, context.get('heading', ""), re.IGNORECASE):
-                 score += 50
-                 reasons.append(f"Heading regex matched: {pat}")
-                 break
-         except re.error:
-             if pat and pat.lower() in context.get('heading', "").lower():
-                 score += 50
-                 reasons.append(f"Heading substring matched: {pat}")
-                 break
-
-     # Column header matching (strict)
-     if spec.get("columns"):
-         cols = [normalize_text(c) for c in spec["columns"]]
-         matches = 0
-         for col in cols:
-             if any(col.upper() in h.upper() for h in context.get('headers', [])):
-                 matches += 1
-         if matches == len(cols):
-             score += 60
-             reasons.append("All expected columns matched exactly")
-         elif matches > 0:
-             score += matches * 20
-             reasons.append(f"Partial column matches: {matches}/{len(cols)}")
-
-     # Label matching for left-oriented tables
-     if spec.get("orientation") == "left":
-         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
-         matches = 0
-         for lbl in labels:
-             if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context.get('col0', [])):
-                 matches += 1
-         if matches > 0:
-             score += (matches / max(1, len(labels))) * 30
-             reasons.append(f"Left-orientation label matches: {matches}/{len(labels)}")
-
-     # Row1 (header row) flexible matching
-     elif spec.get("orientation") == "row1":
-         labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
-         matches = 0.0
-         header_texts = " ".join(context.get('headers', [])).upper()
-         for lbl in labels:
-             label_upper = lbl.upper()
-             # exact in any header
-             if any(label_upper in h.upper() for h in context.get('headers', [])):
-                 matches += 1.0
-             else:
-                 # partial words from label in header_texts
-                 for word in label_upper.split():
-                     if len(word) > 3 and word in header_texts:
-                         matches += 0.5
-                         break
-         if matches > 0:
-             score += (matches / max(1.0, len(labels))) * 40
-             reasons.append(f"Row1 header-like matches: {matches}/{len(labels)}")
-
-     # Special handling for declaration schemas
-     if schema_name == "Operator Declaration":
-         # boost if 'print name' first cell and heading indicates operator declaration
-         if context.get('first_cell', "").upper().startswith("PRINT"):
-             if "OPERATOR DECLARATION" in context.get('heading', "").upper():
-                 score += 80
-                 reasons.append("Operator Declaration context & first-cell indicate match")
-             elif any("MANAGER" in c.upper() for c in context.get('all_cells', [])):
-                 score += 60
-                 reasons.append("Manager found in cells for Operator Declaration")
-
-     if schema_name == "NHVAS Approved Auditor Declaration":
-         if context.get('first_cell', "").upper().startswith("PRINT"):
-             # penalize where manager words appear (to reduce false positives)
-             if any("MANAGER" in c.upper() for c in context.get('all_cells', [])):
-                 score -= 50
-                 reasons.append("Penalty: found manager text in auditor declaration table")
-
-     return score, reasons
-
-
- def match_table_schema(tbl):
-     """
-     Iterate TABLE_SCHEMAS and pick best match by score threshold.
-     Returns schema name or None when below threshold.
-     """
-     context = get_table_context(tbl)
-     best_match = None
-     best_score = float("-inf")
-     for name, spec in TABLE_SCHEMAS.items():
-         try:
-             score, reasons = calculate_schema_match_score(name, spec, context)
-         except Exception:
-             score, reasons = 0, ["error computing score"]
-         if score > best_score:
-             best_score = score
-             best_match = name
-     # threshold to avoid spurious picks
-     if best_score >= 20:
-         return best_match
-     return None
-
-
- def check_multi_schema_table(tbl):
-     """
-     Identify tables that contain multiple logical schemas (e.g., Operator Information + Contact Details).
-     Return list of schema names if multi, else None.
-     """
-     context = get_table_context(tbl)
-     operator_labels = ["Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s",
-                        "Australian Company Number", "NHVAS Manual"]
-     contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"]
-     has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context.get('col0', []))
-     has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context.get('col0', []))
-     if has_operator and has_contact:
-         return ["Operator Information", "Operator contact details"]
-     return None
-
-
- def extract_multi_schema_table(tbl, schemas):
-     """
-     For tables that embed multiple schema sections vertically (left orientation), split and extract.
-     Returns a dict mapping schema_name -> {label: [values, ...]}
-     """
-     result = {}
-     for schema_name in schemas:
-         if schema_name not in TABLE_SCHEMAS:
-             continue
-         spec = TABLE_SCHEMAS[schema_name]
-         schema_data = {}
-         # iterate rows and match the left-most cell against spec labels
-         for ri, row in enumerate(tbl.rows):
-             if not row.cells:
-                 continue
-             row_label = normalize_text(row.cells[0].text)
-             belongs = False
-             matched_label = None
-             for spec_label in spec.get("labels", []):
-                 spec_norm = normalize_text(spec_label).upper()
-                 row_norm = row_label.upper()
-                 if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm:
-                     belongs = True
-                     matched_label = spec_label
-                     break
-             if not belongs:
-                 continue
-             # gather red-text from the row's value cells (all others)
-             for ci, cell in enumerate(row.cells[1:], start=1):
-                 red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
-                 if red_txt:
-                     schema_data.setdefault(matched_label, []).append(red_txt)
-         if schema_data:
-             result[schema_name] = schema_data
-     return result
-
-
- def extract_table_data(tbl, schema_name, spec):
-     """
-     Extract red text from a table for a given schema.
-     Special handling for Vehicle Registration (row1 header orientation).
-     """
-     # Vehicle Registration special-case (headers in first row)
-     if "Vehicle Registration" in schema_name:
-         print(f" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
-         labels = spec.get("labels", [])
-         collected = {lbl: [] for lbl in labels}
-         seen = {lbl: set() for lbl in labels}
-
-         if len(tbl.rows) < 2:
-             print(" ❌ Vehicle table has less than 2 rows; skipping")
-             return {}
-
-         header_row = tbl.rows[0]
-         column_mapping = {}
-         print(f" 📋 Mapping {len(header_row.cells)} header cells to labels")
-
-         for col_idx, cell in enumerate(header_row.cells):
-             header_text = normalize_text(cell.text).strip()
-             if not header_text:
-                 continue
-             print(f" Column {col_idx}: '{header_text}'")
-             best_match = None
-             best_score = 0.0
-
-             for label in labels:
-                 # exact match
-                 if header_text.upper() == label.upper():
-                     best_match = label
-                     best_score = 1.0
-                     break
-
-                 # partial token overlap scoring
-                 header_words = set(word.upper() for word in header_text.split() if len(word) > 2)
-                 label_words = set(word.upper() for word in label.split() if len(word) > 2)
-                 if header_words and label_words:
-                     common = header_words.intersection(label_words)
-                     if common:
-                         score = len(common) / max(len(header_words), len(label_words))
-                         if score > best_score and score >= 0.35:  # relaxed threshold for OCR noise
-                             best_score = score
-                             best_match = label
-
-             if best_match:
-                 column_mapping[col_idx] = best_match
-                 print(f" ✅ Mapped to: '{best_match}' (score: {best_score:.2f})")
-             else:
-                 # additional heuristics: simple substring matches
-                 for label in labels:
-                     if label.lower() in header_text.lower() or header_text.lower() in label.lower():
-                         column_mapping[col_idx] = label
-                         print(f" ✅ Mapped by substring to: '{label}'")
-                         break
                  else:
-                     print(f" ⚠️ No mapping found for '{header_text}'")
-
-         print(f" 📊 Total column mappings: {len(column_mapping)}")
-
-         # Extract data rows
-         for row_idx in range(1, len(tbl.rows)):
-             row = tbl.rows[row_idx]
-             print(f" 📌 Processing data row {row_idx}")
-             for col_idx, cell in enumerate(row.cells):
-                 if col_idx not in column_mapping:
-                     continue
-                 label = column_mapping[col_idx]
-                 red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
-                 if red_txt:
-                     print(f" 🔴 Found red text in '{label}': '{red_txt}'")
-                     if red_txt not in seen[label]:
-                         seen[label].add(red_txt)
-                         collected[label].append(red_txt)
-
-         result = {k: v for k, v in collected.items() if v}
-         print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
-         return result
-
-     # Generic extraction for other table types
-     labels = spec.get("labels", []) + [schema_name]
-     collected = {lbl: [] for lbl in labels}
-     seen = {lbl: set() for lbl in labels}
-     by_col = (spec.get("orientation") == "row1")
-     start_row = 1 if by_col else 0
-     rows = tbl.rows[start_row:]
-
-     for ri, row in enumerate(rows):
-         for ci, cell in enumerate(row.cells):
-             red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
-             if not red_txt:
-                 continue
-
-             if by_col:
-                 # column-wise mapping (header labels)
-                 if ci < len(spec.get("labels", [])):
-                     lbl = spec["labels"][ci]
-                 else:
-                     lbl = schema_name
-             else:
-                 # left-oriented: match left label
-                 raw_label = normalize_text(row.cells[0].text)
-                 lbl = None
-                 for spec_label in spec.get("labels", []):
-                     if normalize_text(spec_label).upper() == raw_label.upper():
-                         lbl = spec_label
-                         break
-                 if not lbl:
-                     for spec_label in spec.get("labels", []):
-                         spec_norm = normalize_text(spec_label).upper()
-                         raw_norm = raw_label.upper()
-                         if spec_norm in raw_norm or raw_norm in spec_norm:
-                             lbl = spec_label
-                             break
-                 if not lbl:
-                     lbl = schema_name
-
-             if red_txt not in seen[lbl]:
-                 seen[lbl].add(red_txt)
-                 collected[lbl].append(red_txt)
-
-     return {k: v for k, v in collected.items() if v}
-
-
- def extract_red_text(input_doc):
-     """
-     Main extraction function.
-     Accepts a docx.Document object or a path string (filename).
-     Returns dictionary of extracted red-text organized by schema.
-     """
-     if isinstance(input_doc, str):
-         doc = Document(input_doc)
-     else:
-         doc = input_doc
-
-     out = {}
-     table_count = 0
-
-     for tbl in doc.tables:
-         table_count += 1
-         # Check for multi-schema tables first
-         multi_schemas = check_multi_schema_table(tbl)
-         if multi_schemas:
-             multi_data = extract_multi_schema_table(tbl, multi_schemas)
-             for schema_name, schema_data in multi_data.items():
-                 if schema_data:
-                     if schema_name in out:
-                         for k, v in schema_data.items():
-                             out[schema_name].setdefault(k, []).extend(v)
-                     else:
-                         out[schema_name] = schema_data
-             continue
-
-         # match a single schema
-         schema = match_table_schema(tbl)
-         if not schema:
-             # no confident schema match
-             continue
-         spec = TABLE_SCHEMAS.get(schema, {})
-         data = extract_table_data(tbl, schema, spec)
-         if data:
-             if schema in out:
-                 for k, v in data.items():
-                     out[schema].setdefault(k, []).extend(v)
-             else:
-                 out[schema] = data
-
-     # Paragraph-level red-text extraction (with contextual heading resolution)
-     paras = {}
-     for idx, para in enumerate(doc.paragraphs):
-         red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
-         if not red_txt:
-             continue
-
-         # attempt to find nearest preceding heading paragraph (using HEADING_PATTERNS)
-         context = None
-         for j in range(idx - 1, -1, -1):
-             txt = normalize_text(doc.paragraphs[j].text)
-             if not txt:
-                 continue
-             all_patterns = HEADING_PATTERNS.get("main", []) + HEADING_PATTERNS.get("sub", [])
-             if any(re.search(p, txt, re.IGNORECASE) for p in all_patterns):
-                 context = txt
-                 break
-
-         # fallback: date-line mapping for 'Date' single-line red texts
-         if not context and re.fullmatch(PARAGRAPH_PATTERNS.get("date_line", r"^\s*\d{1,2}(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4}\s*$|^Date$"), red_txt):
-             context = "Date"
-
-         if not context:
-             context = "(para)"
-
-         paras.setdefault(context, []).append(red_txt)
-
-     if paras:
-         out["paragraphs"] = paras
-
      return out


- def extract_red_text_filelike(input_file, output_file):
-     """
-     Accepts:
-       - input_file: file-like object (BytesIO/File) or path
-       - output_file: file-like object (opened for writing text) or path
-     Returns the parsed dictionary.
-     Writes the JSON to output_file if possible.
-     """
-     # Reset file-like if necessary
-     if hasattr(input_file, "seek"):
-         try:
-             input_file.seek(0)
-         except Exception:
-             pass
-
-     # Load Document
-     if isinstance(input_file, (str, bytes)):
-         doc = Document(input_file)
-     else:
-         doc = Document(input_file)
-
-     result = extract_red_text(doc)
-
-     # Write result out
-     if hasattr(output_file, "write"):
-         json.dump(result, output_file, indent=2, ensure_ascii=False)
-         try:
-             output_file.flush()
-         except Exception:
-             pass
-     else:
-         with open(output_file, "w", encoding="utf-8") as f:
-             json.dump(result, f, indent=2, ensure_ascii=False)
-
-     return result

  if __name__ == "__main__":
-     # Backwards-compatible script entry point
-     if len(sys.argv) == 3:
-         input_docx = sys.argv[1]
-         output_json = sys.argv[2]
-         try:
-             doc = Document(input_docx)
-             word_data = extract_red_text(doc)
-             with open(output_json, 'w', encoding='utf-8') as f:
-                 json.dump(word_data, f, indent=2, ensure_ascii=False)
-             print(json.dumps(word_data, indent=2, ensure_ascii=False))
-         except Exception as e:
-             print("Error during extraction:", e)
-             raise
-     else:
-         print("To use as a module: extract_red_text_filelike(input_file, output_file)")
  """
  extract_red_text.py
  """
+
+ from __future__ import annotations
  import json
+ import re
  import sys
+ import logging
+ from collections import defaultdict
+ from typing import List, Dict, Optional, Any
+
+ # attempt to import python-docx (document processing)
+ try:
+     from docx import Document
+     from docx.oxml.ns import qn
+     from docx.shared import RGBColor
+ except Exception as e:
+     raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e
+
+ # ------------------------------
+ # Import master_key GLOBAL_SETTINGS and optional EXTRA_HEADER_SYNONYMS
+ # ------------------------------
+ try:
+     import master_key as mk
+     GLOBAL_SETTINGS = getattr(mk, "GLOBAL_SETTINGS", {})
+     EXTRA_HEADER_SYNONYMS = getattr(mk, "EXTRA_HEADER_SYNONYMS", None)
+ except Exception:
+     GLOBAL_SETTINGS = {
+         "normalize": {
+             "lower": True,
+             "strip_punctuation": True,
+             "collapse_whitespace": True,
+             "replace_smart_dashes": True
+         },
+         "ocr_repair_rules": [
+             (r"\s*\(\s*Yes\s*/\s*No\s*\)", " (Yes/No)"),
+             (r"R[e3]gistrat[i1]on", "Registration"),
+             (r"Prin?t", "Print"),
+             (r"Accredi[ta]tion", "Accreditation"),
+             (r"[^\w\s\-\&\(\)\/:]", " "),
+         ],
+         "split_on": [" – ", " - ", ";", "\n", " / "],
+         "date_like_pattern": r"^\s*(\d{1,2}(st|nd|rd|th)?\s+[A-Za-z]+|\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\s+[A-Za-z]{3,})",
+         "fuzzy_thresholds": {"high_priority": 70, "medium_priority": 60, "low_priority": 45},
+         "fuzzy_algorithm": "token_set_ratio",
+     }
+     EXTRA_HEADER_SYNONYMS = None
+
+ # Provide an internal default synonyms map (compact keys -> canonical label)
+ # This is used only if master_key.EXTRA_HEADER_SYNONYMS is not defined.
+ _DEFAULT_EXTRA_HEADER_SYNONYMS = {
+     # Compact key: canonical label
+     # Examples from your logs (long/noisy headers)
+     "roadworthinesscertificatesapplicableforentryaudit": "Roadworthiness Certificates",
+     "roadworthinesscertificates": "Roadworthiness Certificates",
+     "rfsuspensioncertificationn/aifnotapplicable": "RFS Suspension Certification #",
+     "rfsuspensioncertification": "RFS Suspension Certification #",
+     "maintenanceRecordsrecorddaterangeofrecordsreviewed".lower(): "Maintenance Records",
+     "maintenancerecords": "Maintenance Records",
+     "faultrecordingreportingonsuspensionsystemdaterange".lower(): "Fault Recording/ Reporting",
+     "faultrecordingreporting": "Fault Recording/ Reporting",
+     "faultrepairdaterange": "Fault Repair",
+     "triprecordsdaterange": "Trip Records",
+     # Add common variations
+     "registrationnumber": "Registration Number",
+     "registrationnumbernumber": "Registration Number",
+     "subcontractor(yesno)": "Sub-contractor (Yes/No)",
+     "sub-contractor(yes/no)": "Sub-contractor (Yes/No)",
+     "nhvrorexemplarglobalauditorregistrationnumber": "NHVR or Exemplar Global Auditor Registration Number",
+     "printname": "Print Name",
+     "print": "Print Name",
+ }
+
+ # If mk provided EXTRA_HEADER_SYNONYMS, use it (but ensure keys are compacted similarly)
+ if EXTRA_HEADER_SYNONYMS is None:
+     EXTRA_HEADER_SYNONYMS = _DEFAULT_EXTRA_HEADER_SYNONYMS
+
+ # ------------------------------
+ # Logging
+ # ------------------------------
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
+ log = logging.getLogger("extract_red_text")
+
+ # ------------------------------
+ # Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
+ # ------------------------------
+ def _apply_ocr_repair_rules(text: str) -> str:
+     s = text or ""
+     for pat, repl in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
+         try:
+             s = re.sub(pat, repl, s, flags=re.I)
+         except re.error:
+             # skip invalid rule
+             continue
+     return s
+
+ def _normalize_text(text: str) -> str:
+     """Normalize text according to GLOBAL_SETTINGS (readable normalized form)."""
+     s = _apply_ocr_repair_rules(text or "")
+     norm_cfg = GLOBAL_SETTINGS.get("normalize", {})
+     if norm_cfg.get("replace_smart_dashes", False):
+         s = s.replace("–", "-").replace("—", "-")
+     if norm_cfg.get("lower", False):
+         s = s.lower()
+     if norm_cfg.get("strip_punctuation", False):
+         # keep hyphen, ampersand, parentheses, slash, colon; drop other punctuation
+         s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s)
+     if norm_cfg.get("collapse_whitespace", False):
+         s = re.sub(r"\s+", " ", s)
+     return s.strip()
+
+ def _compact_key(text: str) -> str:
+     """Create compact key (no non-word chars) for deterministic lookup."""
+     if text is None:
+         return ""
+     normalized = _normalize_text(text)
+     return re.sub(r"[^\w]", "", normalized)

+ def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
      """
+     Try deterministic mapping using EXTRA_HEADER_SYNONYMS.
+     Return canonical label if found, else None.
      """
+     if not header_text:
+         return None
+     normalized = _normalize_text(header_text)
+     compact = _compact_key(header_text)
+     # try compact key
+     if compact in EXTRA_HEADER_SYNONYMS:
+         return EXTRA_HEADER_SYNONYMS[compact]
+     # try normalized key directly
+     if normalized in EXTRA_HEADER_SYNONYMS:
+         return EXTRA_HEADER_SYNONYMS[normalized]
+     # also try case-insensitive match on keys
+     for k, v in EXTRA_HEADER_SYNONYMS.items():
+         if k.lower() == normalized.lower() or k.lower() == compact.lower():
+             return v
+     return None

+ # ------------------------------
+ # Helpers to detect red font runs robustly
+ # ------------------------------
+ def _run_is_red(run) -> bool:
+     """
+     Detect if a run is red. python-docx represents color by run.font.color.
+     We check RGB if available, or theme color 'red' as fallback.
+     """
      try:
+         color = run.font.color
+         if color is None:
+             return False
+         # If RGB is specified
+         rgb = getattr(color, "rgb", None)
+         if rgb is not None:
+             # rgb is a docx.shared.RGBColor or similar; representable as 'FF0000' or an integer tuple
+             hexval = ''.join("{:02X}".format(c) for c in rgb) if isinstance(rgb, (tuple, list)) else str(rgb)
+             # accept hex strings like 'FF0000': red component high, the others low-ish
+             try:
+                 # If hex-like 'FF0000' -> interpret
+                 hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval)
+                 if len(hex_clean) >= 6:
+                     r = int(hex_clean[-6:-4], 16)
+                     g = int(hex_clean[-4:-2], 16)
+                     b = int(hex_clean[-2:], 16)
+                     if r >= 150 and g < 120 and b < 120:
+                         return True
+             except Exception:
+                 pass
+         # fallback: theme color or color.theme_color value
+         theme_color = getattr(color, "theme_color", None)
+         if theme_color:
+             try:
+                 if str(theme_color).lower().find("red") != -1:
+                     return True
+             except Exception:
+                 pass
      except Exception:
          pass
+     # final heuristic: if run.font.color.rgb as string contains 'FF' prefix and '00' for others
      try:
+         if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
+             s = str(run.font.color.rgb)
+             if "FF" in s and "0000" in s:
+                 return True
      except Exception:
          pass
      return False

+ # ------------------------------
+ # Extraction: paragraphs, headings, tables
+ # ------------------------------
+ def extract_from_docx(path: str) -> Dict[str, Any]:
+     doc = Document(path)
+     headings: List[str] = []
+     paragraphs_red: List[Dict[str, Any]] = []
+     red_runs: List[Dict[str, Any]] = []
+     tables_out: List[Dict[str, Any]] = []
+
+     # extract headings and paragraphs with red runs
+     for p_index, para in enumerate(doc.paragraphs):
+         text = para.text or ""
+         # identify heading level from style name if available
+         style_name = getattr(para.style, "name", "") if para.style is not None else ""
+         is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
+         if is_heading:
+             headings.append(text.strip())
+
+         # gather red runs in this paragraph
+         paragraph_red_texts = []
+         char_cursor = 0
+         for run in para.runs:
+             run_text = run.text or ""
+             run_len = len(run_text)
+             if _run_is_red(run) and run_text.strip():
+                 # store a red run entry
+                 rr = {
+                     "text": run_text,
+                     "paragraph_index": p_index,
+                     "char_index": char_cursor,
+                     "style_name": style_name
+                 }
+                 red_runs.append(rr)
+                 paragraph_red_texts.append(run_text)
+             char_cursor += run_len
+         if paragraph_red_texts:
+             paragraphs_red.append({
+                 "paragraph_index": p_index,
+                 "text": text,
+                 "red_texts": paragraph_red_texts,
+                 "style_name": style_name
+             })
+
+     # extract tables
+     for t_index, table in enumerate(doc.tables):
+         # convert table to simple cell-text matrix
+         nrows = len(table.rows)
+         ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0
+         headers = []
+         rows_text = []
+         rows_red_cells = []
+
+         # Attempt to treat first row as header if cells look like headers (bold or all-caps)
+         header_row = table.rows[0] if nrows > 0 else None
+
+         # build header texts & apply header mapping
+         if header_row:
+             for c_idx, cell in enumerate(header_row.cells):
+                 cell_text = cell.text.strip()
+                 # normalize & map using EXTRA_HEADER_SYNONYMS
+                 mapped = map_header_using_extra_synonyms(cell_text)
+                 if mapped:
+                     header_label = mapped
                  else:
+                     header_label = cell_text
+                 headers.append(header_label)
+
+         # process all rows -> list of lists
+         for r_i, row in enumerate(table.rows):
+             row_texts = []
+             row_reds = []
+             for c_i, cell in enumerate(row.cells):
+                 ct = cell.text.strip()
+                 # gather red text from runs in this cell
+                 red_in_cell = []
+                 # docx cell may have paragraphs
+                 for cpara in cell.paragraphs:
+                     for run in cpara.runs:
+                         if _run_is_red(run) and (run.text or "").strip():
+                             red_in_cell.append((run.text or "").strip())
+                 # compact red text into a single string if multiple runs present
+                 red_text_joined = " ".join(red_in_cell) if red_in_cell else None
+                 row_texts.append(ct)
+                 row_reds.append(red_text_joined)
+             rows_text.append(row_texts)
+             rows_red_cells.append(row_reds)
+
+         tables_out.append({
+             "table_index": t_index,
+             "nrows": nrows,
+             "ncols": ncols,
+             "headers": headers,
+             "rows": rows_text,
+             "red_cells": rows_red_cells
+         })
+
+     # assemble output structure
+     out = {
+         "headings": headings,
+         "paragraphs": paragraphs_red,
+         "tables": tables_out,
+         "red_runs": red_runs,
+         # helpful metadata for downstream processing
+         "meta": {
+             "source_file": path,
+             "total_headings": len(headings),
+             "total_red_paragraphs": len(paragraphs_red),
+             "total_tables": len(tables_out),
+             "total_red_runs": len(red_runs)
+         }
+     }
      return out

+ # ------------------------------
+ # Command-line interface
+ # ------------------------------
+ def main(argv):
+     if len(argv) < 3:
+         print("Usage: python extract_red_text.py input.docx output.json")
+         sys.exit(2)
+     input_docx = argv[1]
+     output_json = argv[2]
+
+     log.info("Extracting red text from: %s", input_docx)
+     try:
+         result = extract_from_docx(input_docx)
+     except Exception as exc:
+         log.exception("Failed to extract from docx: %s", exc)
+         raise

+     # Save JSON pretty-printed for debugging by default
+     try:
+         with open(output_json, "w", encoding="utf-8") as fh:
+             json.dump(result, fh, ensure_ascii=False, indent=2)
+         log.info("Saved extracted word JSON to: %s", output_json)
+     except Exception:
+         log.exception("Failed to write output JSON to %s", output_json)
+         raise

+     # Print a short summary for logs / quick verification
+     log.info("Headings found: %d, Red paragraphs: %d, Tables: %d, Red runs: %d",
+              len(result.get("headings", [])),
+              len(result.get("paragraphs", [])),
+              len(result.get("tables", [])),
+              len(result.get("red_runs", []))
+     )

  if __name__ == "__main__":
+     main(sys.argv)
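
After this change, the module's public surface is `extract_from_docx(path)` plus the two-argument CLI shown above. A minimal usage sketch for quick verification; the `audit.docx` path is a hypothetical placeholder, and the keys accessed are the ones this commit writes into the output dict:

```python
# Hypothetical smoke test of the new structure-first output.
from extract_red_text import extract_from_docx

result = extract_from_docx("audit.docx")  # placeholder input path

# Top-level keys produced by this commit: headings, paragraphs, tables, red_runs, meta
print(result["meta"]["total_red_runs"])
for para in result["paragraphs"]:
    print(para["paragraph_index"], para["red_texts"])
for table in result["tables"]:
    print(table["headers"], table["red_cells"])
```

Equivalently, from the shell: `python extract_red_text.py input.docx output.json`.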