Shami96 committed on
Commit
5737d0c
Β·
verified Β·
1 Parent(s): 2debd4d

Update extract_red_text.py

Browse files
Files changed (1) hide show
  1. extract_red_text.py +278 -15
extract_red_text.py CHANGED
@@ -5,6 +5,33 @@ import sys
5
  from docx import Document
6
  from docx.oxml.ns import qn
7
  from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  def normalize_header_label(s: str) -> str:
10
  """Normalize a header/label by stripping parentheticals & punctuation."""
@@ -314,28 +341,47 @@ def calculate_schema_match_score(schema_name, spec, context):
314
  return score, reasons
315
 
316
  def match_table_schema(tbl):
317
- """Improved table schema matching with scoring system"""
318
  context = get_table_context(tbl)
319
- # Auditor Declaration first
320
- if ("print name" in " ".join(context.get("headers", [])).lower() and
321
- "auditor" in " ".join(context.get("headers", [])).lower()):
 
 
 
 
 
 
 
 
 
 
322
  return "NHVAS Approved Auditor Declaration"
323
- # NEW: prioritize Auditor Declaration to avoid misclassification
 
 
 
 
 
 
 
 
 
 
 
324
  if looks_like_auditor_declaration(context):
325
  return "NHVAS Approved Auditor Declaration"
326
- # hard-match Operator Declaration first (high priority, avoids misclassification)
327
  if looks_like_operator_declaration(context):
328
  return "Operator Declaration"
329
- best_match = None
330
- best_score = 0
 
331
  for name, spec in TABLE_SCHEMAS.items():
332
- score, reasons = calculate_schema_match_score(name, spec, context)
333
  if score > best_score:
334
- best_score = score
335
- best_match = name
336
- if best_score >= 20:
337
- return best_match
338
- return None
339
 
340
  def check_multi_schema_table(tbl):
341
  """Check if table contains multiple schemas and split appropriately"""
@@ -613,6 +659,81 @@ def extract_table_data(tbl, schema_name, spec):
613
  result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v})
614
  print(f" βœ… Driver / Scheduler extracted: {len(result)} columns with data")
615
  return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
616
 
617
  # ───────────────────────────────────────────────────────────────────────────
618
  # C) Generic tables (unchanged: WITH dedupe)
@@ -661,6 +782,55 @@ def extract_table_data(tbl, schema_name, spec):
661
  collected[lbl].append(red_txt)
662
 
663
  return {k: v for k, v in collected.items() if v}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
664
 
665
  def extract_red_text(input_doc):
666
  # input_doc: docx.Document object or file path
@@ -672,6 +842,9 @@ def extract_red_text(input_doc):
672
  table_count = 0
673
  for tbl in doc.tables:
674
  table_count += 1
 
 
 
675
  multi_schemas = check_multi_schema_table(tbl)
676
  if multi_schemas:
677
  multi_data = extract_multi_schema_table(tbl, multi_schemas)
@@ -730,6 +903,96 @@ def extract_red_text(input_doc):
730
  if op_dec:
731
  out["Operator Declaration"] = op_dec
732
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
733
  return out
734
 
735
  def extract_red_text_filelike(input_file, output_file):
@@ -761,4 +1024,4 @@ if __name__ == "__main__":
761
  json.dump(word_data, f, indent=2, ensure_ascii=False)
762
  print(json.dumps(word_data, indent=2, ensure_ascii=False))
763
  else:
764
- print("To use as a module: extract_red_text_filelike(input_file, output_file)")
 
5
  from docx import Document
6
  from docx.oxml.ns import qn
7
  from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
8
+ import unicodedata # if not already imported
9
+
10
+ MONTHS = r"(January|February|March|April|May|June|July|August|September|October|November|December|Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)"
11
+ DATE_RE = re.compile(rf"\b(\d{{1,2}})\s*(st|nd|rd|th)?\s+{MONTHS}\s+\d{{4}}\b", re.I)
12
+ DATE_NUM_RE = re.compile(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b")
13
+
14
+ # Inline sub-label regexes for Nature paragraph
15
+ ACCRED_RE = re.compile(r"\bAccreditation\s*Number[:\s-]*([A-Za-z0-9\s/-]{2,})", re.I)
16
+ EXPIRY_RE = re.compile(r"\bExpiry\s*Date[:\s-]*([A-Za-z0-9\s,/-]{2,})", re.I)
17
+
18
+ # Parent name aliases to prevent Mass Management vs Mass Management Summary mismatches
19
+ AMBIGUOUS_PARENTS = [
20
+ ("Mass Management Summary", "Mass Management"),
21
+ ("Mass Management", "Mass Management Summary"),
22
+ ]
23
+ def get_red_text(cell):
24
+ reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text]
25
+ reds = coalesce_numeric_runs(reds)
26
+ return normalize_text(" ".join(reds)) if reds else ""
27
+
28
+ def _compact_digits(s: str) -> str:
29
+ # "5 1 0 6 6" -> "51066"
30
+ return re.sub(r"(?<=\d)\s+(?=\d)", "", s)
31
+
32
+ def _fix_ordinal_space(s: str) -> str:
33
+ # "13 th" -> "13th"
34
+ return re.sub(r"\b(\d+)\s+(st|nd|rd|th)\b", r"\1\2", s, flags=re.I)
35
 
36
  def normalize_header_label(s: str) -> str:
37
  """Normalize a header/label by stripping parentheticals & punctuation."""
 
341
  return score, reasons
342
 
343
  def match_table_schema(tbl):
344
+ """Improved table schema matching with explicit Attendance/Operator/Auditor guards."""
345
  context = get_table_context(tbl)
346
+ heading_low = (context.get("heading") or "").strip().lower()
347
+ headers_norm = [normalize_header_label(h).lower() for h in context.get("headers", [])]
348
+
349
+ has_print = any("print name" in h for h in headers_norm)
350
+ has_pos = any(("position title" in h) or ("position" in h and "title" in h) for h in headers_norm)
351
+ has_namecol = any(("name" in h) and ("print name" not in h) for h in headers_norm)
352
+ has_poscol = any("position" in h for h in headers_norm)
353
+ has_aud_hint = any(("auditor" in h) or ("auditor registration" in h) for h in headers_norm)
354
+
355
+ # Force-guard: explicit headings
356
+ if "operator declaration" in heading_low and has_print and has_pos:
357
+ return "Operator Declaration"
358
+ if "auditor declaration" in heading_low and has_print:
359
  return "NHVAS Approved Auditor Declaration"
360
+ if ("attendance" in heading_low or "attendees" in heading_low) and has_namecol and has_poscol:
361
+ return "Attendance List (Names and Position Titles)"
362
+
363
+ # Priority: auditor if signature columns + auditor hints
364
+ if has_print and has_aud_hint:
365
+ return "NHVAS Approved Auditor Declaration"
366
+
367
+ # Classic 2-col signature table β†’ Operator Declaration
368
+ if has_print and has_pos:
369
+ return "Operator Declaration"
370
+
371
+ # Heuristic fallbacks
372
  if looks_like_auditor_declaration(context):
373
  return "NHVAS Approved Auditor Declaration"
 
374
  if looks_like_operator_declaration(context):
375
  return "Operator Declaration"
376
+
377
+ # Score-based fallback
378
+ best_match, best_score = None, 0
379
  for name, spec in TABLE_SCHEMAS.items():
380
+ score, _ = calculate_schema_match_score(name, spec, context)
381
  if score > best_score:
382
+ best_score, best_match = score, name
383
+ return best_match if best_score >= 20 else None
384
+
 
 
385
 
386
  def check_multi_schema_table(tbl):
387
  """Check if table contains multiple schemas and split appropriately"""
 
659
  result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v})
660
  print(f" βœ… Driver / Scheduler extracted: {len(result)} columns with data")
661
  return result
662
+ # ───────────────────────────────────────────────────────────────────────────
663
+ # ATTENDANCE LIST (keep red-only; avoid duplicates; prefer whole-cell lines)
664
+ # ───────────────────────────────────────────────────────────────────────────
665
+ if "Attendance List" in schema_name:
666
+ items, seen = [], set()
667
+
668
+ # header sniff
669
+ hdr = [normalize_text(c.text).lower() for c in (tbl.rows[0].cells if tbl.rows else [])]
670
+ start_row = 1 if (any("name" in h for h in hdr) and any("position" in h for h in hdr)) else 0
671
+
672
+ for row in tbl.rows[start_row:]:
673
+ # collect red text from each cell
674
+ reds = [get_red_text(c) for c in row.cells]
675
+ reds = [r for r in reds if r]
676
+
677
+ if not reds:
678
+ continue
679
+
680
+ # if first cell already contains "Name - Position", use it as-is
681
+ if " - " in reds[0]:
682
+ entry = reds[0]
683
+ else:
684
+ # typical 2 columns: Name | Position
685
+ if len(reds) >= 2:
686
+ entry = f"{reds[0]} - {reds[1]}"
687
+ else:
688
+ entry = reds[0]
689
+
690
+ entry = normalize_text(entry)
691
+
692
+ # collapse accidental double-ups like "A - B - A - B"
693
+ parts = [p.strip() for p in entry.split(" - ") if p.strip()]
694
+ if len(parts) >= 4 and parts[:2] == parts[2:4]:
695
+ entry = " - ".join(parts[:2])
696
+
697
+ if entry and entry not in seen:
698
+ seen.add(entry)
699
+ items.append(entry)
700
+
701
+ return {schema_name: items} if items else {}
702
+
703
+ # ───────────────────────────────────────────────────────────────────────────
704
+ # ACCREDITATION VEHICLE SUMMARY (pairwise label/value per row)
705
+ # Expected labels in spec["labels"]:
706
+ # ["Number of powered vehicles", "Number of trailing vehicles"]
707
+ # ───────────────────────────────────────────────────────────────────────────
708
+ if schema_name == "Accreditation Vehicle Summary":
709
+ labels = spec["labels"]
710
+ canonical_labels = {normalize_header_label(lbl).lower(): lbl for lbl in labels}
711
+ collected = {lbl: [] for lbl in labels}
712
+
713
+ def map_label(txt):
714
+ t = normalize_header_label(txt).lower()
715
+ if t in canonical_labels:
716
+ return canonical_labels[t]
717
+ # loose fallback
718
+ best, score = None, 0.0
719
+ for canon, original in canonical_labels.items():
720
+ s = bag_similarity(t, canon)
721
+ if s > score:
722
+ best, score = original, s
723
+ return best if score >= 0.40 else None
724
+
725
+ for row in tbl.rows:
726
+ # iterate label/value pairs across the row: (0,1), (2,3), ...
727
+ i = 0
728
+ while i + 1 < len(row.cells):
729
+ lbl_txt = normalize_text(row.cells[i].text)
730
+ val_txt = get_red_text(row.cells[i + 1])
731
+ mlabel = map_label(lbl_txt)
732
+ if mlabel and val_txt:
733
+ collected[mlabel].append(val_txt)
734
+ i += 2
735
+
736
+ return {k: v for k, v in collected.items() if v}
737
 
738
  # ───────────────────────────────────────────────────────────────────────────
739
  # C) Generic tables (unchanged: WITH dedupe)
 
782
  collected[lbl].append(red_txt)
783
 
784
  return {k: v for k, v in collected.items() if v}
785
+ def _try_extract_nature_inline_labels(tbl, out_dict):
786
+ # Check context
787
+ prev = normalize_text(_prev_para_text(tbl)).lower()
788
+ if "nature of the operators business" not in prev:
789
+ return False
790
+
791
+ acc_val, exp_val, para_bits = None, None, []
792
+
793
+ for row in tbl.rows[1:]:
794
+ row_text = " ".join(normalize_text(c.text) for c in row.cells if c.text.strip())
795
+ if not row_text:
796
+ continue
797
+ low = row_text.lower()
798
+
799
+ def _red_from_row():
800
+ vals = []
801
+ for c in row.cells:
802
+ for p in c.paragraphs:
803
+ reds = [r.text for r in p.runs if is_red_font(r) and r.text.strip()]
804
+ if reds:
805
+ vals.extend(reds)
806
+ return normalize_text(" ".join(coalesce_numeric_runs(vals)))
807
+
808
+ if low.startswith("accreditation number"):
809
+ v = _red_from_row() or normalize_text(row_text.split(":", 1)[-1])
810
+ acc_val = _compact_digits(v) if v else acc_val
811
+ continue
812
+
813
+ if low.startswith("expiry date"):
814
+ v = _red_from_row() or normalize_text(row_text.split(":", 1)[-1])
815
+ exp_val = _fix_ordinal_space(v) if v else exp_val
816
+ continue
817
+
818
+ # otherwise narrative line
819
+ para_bits.append(row_text)
820
+
821
+ if not (para_bits or acc_val or exp_val):
822
+ return False
823
+
824
+ sec = out_dict.setdefault("Nature of the Operators Business (Summary)", {})
825
+ if para_bits:
826
+ sec.setdefault("Nature of the Operators Business (Summary):", []).append(
827
+ normalize_text(" ".join(para_bits))
828
+ )
829
+ if acc_val:
830
+ sec.setdefault("Accreditation Number", []).append(acc_val)
831
+ if exp_val:
832
+ sec.setdefault("Expiry Date", []).append(exp_val)
833
+ return True
834
 
835
  def extract_red_text(input_doc):
836
  # input_doc: docx.Document object or file path
 
842
  table_count = 0
843
  for tbl in doc.tables:
844
  table_count += 1
845
+ # Nature-of-business inline labels, if present as table rows
846
+ if _try_extract_nature_inline_labels(tbl, out):
847
+ continue
848
  multi_schemas = check_multi_schema_table(tbl)
849
  if multi_schemas:
850
  multi_data = extract_multi_schema_table(tbl, multi_schemas)
 
903
  if op_dec:
904
  out["Operator Declaration"] = op_dec
905
 
906
+ # β€”β€” Handle ambiguous parents without creating unwanted duplicates β€”β€”
907
+ # Only create aliases for legitimately different content, not summary tables
908
+ summary_sections = {k for k in out.keys() if "Summary" in k}
909
+
910
+ processed_pairs = set()
911
+ for a, b in AMBIGUOUS_PARENTS:
912
+ pair_key = tuple(sorted([a, b]))
913
+ if pair_key in processed_pairs:
914
+ continue
915
+ processed_pairs.add(pair_key)
916
+
917
+ # Skip if one is a Summary table and the other isn't - these should remain separate
918
+ if ("Summary" in a) != ("Summary" in b):
919
+ continue
920
+
921
+ if a in out and b in out:
922
+ # Both exist - check if they have identical content
923
+ if out[a] == out[b]:
924
+ # Remove the duplicate (prefer Summary version if available)
925
+ to_remove = b if "Summary" in a else a
926
+ del out[to_remove]
927
+ elif a in out and b not in out:
928
+ # Only create alias if not a summary table and names are different
929
+ if a != b and "Summary" not in a and len(out[a]) > 1:
930
+ out[b] = out[a]
931
+ elif b in out and a not in out:
932
+ # Only create alias if not a summary table and names are different
933
+ if a != b and "Summary" not in b and len(out[b]) > 1:
934
+ out[a] = out[b]
935
+
936
+ # β€”β€” add Accreditation Number and Expiry Date from Nature paragraph (do NOT edit the paragraph) β€”β€”
937
+ for sec_key, section in list(out.items()):
938
+ if not isinstance(section, dict):
939
+ continue
940
+ if re.fullmatch(r"Nature of the Operators Business \(Summary\)", sec_key, flags=re.I):
941
+ # find the main paragraph field "...(Summary):"
942
+ para_field = None
943
+ for k in section.keys():
944
+ if re.search(r"\(Summary\):\s*$", k):
945
+ para_field = k
946
+ break # <- break only when found
947
+ if not para_field:
948
+ continue
949
+
950
+ raw = section.get(para_field)
951
+ if isinstance(raw, list):
952
+ para = " ".join(str(x) for x in raw)
953
+ else:
954
+ para = str(raw or "")
955
+
956
+ m_acc = ACCRED_RE.search(para)
957
+ m_exp = EXPIRY_RE.search(para)
958
+
959
+ # labeled matches
960
+ if m_acc:
961
+ v = _compact_digits(_fix_ordinal_space(normalize_text(m_acc.group(1))))
962
+ if v:
963
+ section.setdefault("Accreditation Number", []).append(v)
964
+ if m_exp:
965
+ v = _compact_digits(_fix_ordinal_space(normalize_text(m_exp.group(1))))
966
+ if v:
967
+ section.setdefault("Expiry Date", []).append(v)
968
+
969
+ # fallback when labels are missing but values appear at the end
970
+ acc_missing = not section.get("Accreditation Number")
971
+ exp_missing = not section.get("Expiry Date")
972
+
973
+ if acc_missing or exp_missing:
974
+ # find the last date-like token (wordy month or numeric)
975
+ last_date_match = None
976
+ for md in DATE_RE.finditer(para):
977
+ last_date_match = md
978
+ if not last_date_match:
979
+ for md in DATE_NUM_RE.finditer(para):
980
+ last_date_match = md
981
+
982
+ if last_date_match:
983
+ if exp_missing:
984
+ date_txt = _fix_ordinal_space(last_date_match.group(0))
985
+ section.setdefault("Expiry Date", []).append(normalize_text(date_txt))
986
+
987
+ if acc_missing:
988
+ # take digits immediately before the date
989
+ before = para[: last_date_match.start()]
990
+ m_num = re.search(r"(\d[\d\s]{3,12})\s*$", before)
991
+ if m_num:
992
+ num_txt = _compact_digits(normalize_text(m_num.group(1)))
993
+ if num_txt:
994
+ section.setdefault("Accreditation Number", []).append(num_txt)
995
+
996
  return out
997
 
998
  def extract_red_text_filelike(input_file, output_file):
 
1024
  json.dump(word_data, f, indent=2, ensure_ascii=False)
1025
  print(json.dumps(word_data, indent=2, ensure_ascii=False))
1026
  else:
1027
+ print("To use as a module: extract_red_text_filelike(input_file, output_file)")