File size: 16,571 Bytes
e8b46b5
24ad2d2
47f7e99
24ad2d2
704d2a2
 
1055fe1
704d2a2
1055fe1
704d2a2
 
47f7e99
704d2a2
 
 
 
 
 
 
 
 
 
47f7e99
704d2a2
 
 
47f7e99
 
 
 
 
 
 
 
 
704d2a2
 
 
 
 
 
 
 
 
 
 
47f7e99
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
47f7e99
704d2a2
 
 
 
 
 
 
 
 
47f7e99
704d2a2
 
 
 
 
 
 
 
f4b6b63
704d2a2
f4b6b63
704d2a2
 
f4b6b63
704d2a2
 
47f7e99
704d2a2
 
47f7e99
704d2a2
 
 
47f7e99
704d2a2
 
 
47f7e99
704d2a2
 
 
 
47f7e99
704d2a2
7a2fc08
704d2a2
47f7e99
704d2a2
 
 
47f7e99
 
704d2a2
f4b6b63
47f7e99
 
 
 
704d2a2
47f7e99
 
 
 
 
 
 
 
 
 
 
704d2a2
 
 
47f7e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
704d2a2
f4b6b63
 
47f7e99
 
f4b6b63
704d2a2
 
47f7e99
 
704d2a2
f4b6b63
 
47f7e99
f4b6b63
 
47f7e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
704d2a2
 
 
 
47f7e99
 
704d2a2
47f7e99
704d2a2
 
 
 
 
47f7e99
 
704d2a2
 
47f7e99
 
704d2a2
47f7e99
 
 
704d2a2
 
47f7e99
704d2a2
47f7e99
704d2a2
 
47f7e99
704d2a2
 
 
47f7e99
704d2a2
47f7e99
704d2a2
 
 
 
47f7e99
 
704d2a2
 
 
47f7e99
 
704d2a2
47f7e99
704d2a2
 
 
 
 
47f7e99
 
704d2a2
 
47f7e99
 
704d2a2
47f7e99
 
 
 
 
 
 
704d2a2
47f7e99
704d2a2
 
 
 
 
47f7e99
704d2a2
 
 
 
 
47f7e99
 
 
 
 
 
 
 
 
 
704d2a2
 
47f7e99
e8b46b5
 
704d2a2
 
 
 
 
 
 
47f7e99
704d2a2
 
 
47f7e99
 
 
 
704d2a2
 
 
 
 
f4b6b63
704d2a2
 
 
 
47f7e99
704d2a2
 
 
1055fe1
47f7e99
 
 
 
 
 
 
 
 
ab82879
1055fe1
ab82879
47f7e99
ab82879
47f7e99
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
#!/usr/bin/env python3
"""
extract_red_text.py - Enhanced version with improved red text detection and master key alignment
"""

from __future__ import annotations
import json
import re
import sys
import logging
from collections import defaultdict
from typing import List, Dict, Optional, Any, Tuple

# attempt to import python-docx (document processing)
try:
    from docx import Document
    from docx.oxml.ns import qn
    from docx.shared import RGBColor
except Exception as e:
    raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e

# ------------------------------
# Import master_key configurations
# ------------------------------
try:
    import master_key as mk
    GLOBAL_SETTINGS = mk.GLOBAL_SETTINGS
    EXTRA_HEADER_SYNONYMS = mk.EXTRA_HEADER_SYNONYMS
    TABLE_SCHEMAS = getattr(mk, "TABLE_SCHEMAS", {})
except ImportError as e:
    logging.error("Failed to import master_key.py: %s", e)
    raise RuntimeError("master_key.py is required for configuration") from e
except AttributeError as e:
    logging.error("Missing required configuration in master_key.py: %s", e)
    raise RuntimeError("master_key.py missing required GLOBAL_SETTINGS or EXTRA_HEADER_SYNONYMS") from e

# ------------------------------
# Logging
# ------------------------------
# Configure the root logger once for the whole script: timestamped,
# INFO-level messages. `log` is this module's named logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")

# ------------------------------
# Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
# ------------------------------
def _apply_ocr_repair_rules(text: str) -> str:
    """Run every configured OCR repair substitution over *text*.

    Rules come from GLOBAL_SETTINGS["ocr_repair_rules"] as
    (pattern, replacement) pairs, applied case-insensitively in order.
    Patterns that fail to compile are silently skipped.
    """
    repaired = text or ""
    for pattern, replacement in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            repaired = re.sub(pattern, replacement, repaired, flags=re.I)
        except re.error:
            # Malformed pattern in the config — ignore this rule.
            continue
    return repaired

def _normalize_text(text: str) -> str:
    """Produce the readable normalized form of *text* per GLOBAL_SETTINGS.

    OCR repairs run first; the individual normalization steps below are
    each gated by a flag in GLOBAL_SETTINGS["normalize"].
    """
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    result = _apply_ocr_repair_rules(text or "")

    if cfg.get("replace_smart_dashes", False):
        # en/em dashes become plain ASCII hyphens
        result = result.replace("–", "-").replace("—", "-")
    if cfg.get("lower", False):
        result = result.lower()
    if cfg.get("strip_punctuation", False):
        # keep hyphen, ampersand, parentheses, slash, colon; drop the rest
        result = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", result)
    if cfg.get("collapse_whitespace", False):
        result = re.sub(r"\s+", " ", result)

    return result.strip()

def _compact_key(text: str) -> str:
    """Collapse *text* to word characters only, for deterministic lookups."""
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))

def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Map a raw header string to its canonical label via EXTRA_HEADER_SYNONYMS.

    Lookup order: compact key, then normalized key, then a case-insensitive
    scan over every synonym key. Returns None when nothing matches.
    """
    if not header_text:
        return None

    compact = _compact_key(header_text)
    normalized = _normalize_text(header_text)

    # Direct hits first: compact key, then the normalized form.
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]

    # Last resort: fold case on both sides and scan the whole table.
    for key, canonical in EXTRA_HEADER_SYNONYMS.items():
        folded = key.lower()
        if folded == normalized.lower() or folded == compact.lower():
            return canonical

    return None

# ------------------------------
# Enhanced red font detection using hf_utils pattern
# ------------------------------
def _run_is_red(run) -> bool:
    """
    Enhanced red color detection for docx.run objects.
    Uses multiple methods to detect red text robustly.
    """
    try:
        # Method 1: Check run.font.color.rgb
        col = getattr(run.font, "color", None)
        if col is not None and getattr(col, "rgb", None):
            rgb = col.rgb
            try:
                # rgb may be sequence-like or have attributes
                if hasattr(rgb, '__getitem__'):  # sequence-like
                    r, g, b = rgb[0], rgb[1], rgb[2]
                else:  # attribute access
                    r = getattr(rgb, "r", None) or getattr(rgb, "red", None)
                    g = getattr(rgb, "g", None) or getattr(rgb, "green", None)
                    b = getattr(rgb, "b", None) or getattr(rgb, "blue", None)
                
                if r is not None and g is not None and b is not None:
                    # Tolerant heuristic: red must be noticeably higher than green/blue
                    if r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30:
                        return True
            except Exception:
                pass
    except Exception:
        pass

    # Method 2: Check raw XML color code
    try:
        rPr = run._element.rPr
        if rPr is not None:
            clr = rPr.find('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}color')
            if clr is not None:
                val = clr.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val')
                if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                    rr = int(val[:2], 16)
                    gg = int(val[2:4], 16)
                    bb = int(val[4:], 16)
                    if rr >= 160 and gg <= 120 and bb <= 120 and (rr - gg) >= 30 and (rr - bb) >= 30:
                        return True
    except Exception:
        pass

    # Method 3: Check theme color
    try:
        color = run.font.color
        if color is not None:
            theme_color = getattr(color, "theme_color", None)
            if theme_color:
                theme_str = str(theme_color).lower()
                if "red" in theme_str or "accent_2" in theme_str:  # Common red theme
                    return True
    except Exception:
        pass

    # Method 4: String representation fallback
    try:
        if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
            s = str(run.font.color.rgb)
            # Look for patterns like "FF0000" or similar high-red values
            if re.search(r"[Ff]{2}0{4}|[Ee]{2}0{4}|[Dd]{2}0{4}", s):
                return True
    except Exception:
        pass

    return False

def _extract_red_text_segments(cell):
    """Collect contiguous stretches of red text from a table *cell*.

    Returns a list of dicts, one per contiguous red stretch, carrying the
    concatenated (stripped) text, the contributing (para_idx, run_idx, run)
    tuples, and the paragraph index the stretch belongs to.
    """
    segments = []

    def _flush(buffer_text, buffer_runs, para_idx):
        # Record the pending stretch, if any runs accumulated.
        if buffer_runs:
            segments.append({
                'text': buffer_text.strip(),
                'runs': list(buffer_runs),
                'paragraph_idx': para_idx,
            })

    for p_idx, paragraph in enumerate(cell.paragraphs):
        pending_text = ""
        pending_runs = []

        for r_idx, run in enumerate(paragraph.runs):
            if _run_is_red(run) and run.text.strip():
                pending_text += run.text
                pending_runs.append((p_idx, r_idx, run))
            else:
                # A non-red (or blank) run terminates the current stretch.
                _flush(pending_text, pending_runs, p_idx)
                pending_text = ""
                pending_runs = []

        # A stretch can also end with the paragraph itself.
        _flush(pending_text, pending_runs, p_idx)

    return segments

def _has_red_text(cell) -> bool:
    """Return True if any run anywhere in *cell* is red and non-blank."""
    return any(
        _run_is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )

# ------------------------------
# Enhanced table processing with schema-aware header mapping
# ------------------------------
def _process_table_with_schema_mapping(table, t_index: int) -> Dict[str, Any]:
    """Walk one docx table, capturing cell text, red text and red metadata.

    Returns a dict with the original first-row headers, their synonym-mapped
    labels, every row's cell text, the joined red text found per cell (or
    None), and per-cell red-text metadata.
    """
    nrows = len(table.rows)
    ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0

    if nrows == 0:
        # Degenerate table: emit an empty-but-complete record.
        return {
            "table_index": t_index,
            "nrows": 0,
            "ncols": 0,
            "headers": [],
            "rows": [],
            "red_cells": [],
            "mapped_headers": []
        }

    # First row is treated as the header row; try to canonicalize each label.
    headers = []
    mapped_headers = []
    for c_idx, cell in enumerate(table.rows[0].cells[:ncols]):
        cell_text = cell.text.strip()
        mapped = map_header_using_extra_synonyms(cell_text)
        if mapped:
            log.debug(f"Mapped header '{cell_text}' -> '{mapped}'")
        headers.append(cell_text)                                # as written in the doc
        mapped_headers.append(mapped if mapped else cell_text)   # canonical label

    rows_text = []
    rows_red_cells = []
    rows_red_metadata = []

    for r_i, row in enumerate(table.rows):
        row_texts = []
        row_reds = []
        row_red_meta = []

        for c_i, cell in enumerate(row.cells[:ncols]):
            red_segments = _extract_red_text_segments(cell)

            if red_segments:
                # Join all non-empty red stretches into one string.
                joined = " ".join(
                    seg['text'] for seg in red_segments if seg['text']
                ).strip()
                meta = {
                    "has_red": True,
                    "red_text": joined,
                    "segments": len(red_segments),
                    "total_red_runs": sum(len(seg['runs']) for seg in red_segments)
                }
            else:
                joined = None
                meta = {"has_red": False}

            row_texts.append(cell.text.strip())
            row_reds.append(joined)
            row_red_meta.append(meta)

        rows_text.append(row_texts)
        rows_red_cells.append(row_reds)
        rows_red_metadata.append(row_red_meta)

    return {
        "table_index": t_index,
        "nrows": nrows,
        "ncols": ncols,
        "headers": headers,                # Original headers
        "mapped_headers": mapped_headers,  # Mapped headers
        "rows": rows_text,
        "red_cells": rows_red_cells,
        "red_metadata": rows_red_metadata  # Additional red text metadata
    }

# ------------------------------
# Extraction: paragraphs, headings, tables
# ------------------------------
def extract_from_docx(path: str) -> Dict[str, Any]:
    """Extract headings, red paragraph runs and table data from a DOCX file.

    Parameters
    ----------
    path : str
        Filesystem path of the .docx document to read.

    Returns
    -------
    Dict[str, Any]
        Keys: "headings", "paragraphs" (only paragraphs containing red runs),
        "tables" (per-table dicts from _process_table_with_schema_mapping),
        "red_runs" (every red run with position info), and "meta" (summary
        counts plus a snapshot of the GLOBAL_SETTINGS used).
    """
    log.info(f"Opening document: {path}")
    doc = Document(path)

    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    tables_out: List[Dict[str, Any]] = []

    # --- paragraphs & headings -------------------------------------------
    log.info("Processing paragraphs and headings...")
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""

        # A paragraph counts as a heading when its style name says so, or
        # when it starts with one of the known NHVAS section keywords.
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or \
                    bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))

        if is_heading:
            headings.append(text.strip())
            log.debug(f"Found heading: {text.strip()}")

        # Record each red run with its character offset inside the paragraph.
        paragraph_red_texts = []
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            if _run_is_red(run) and run_text.strip():
                red_runs.append({
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                    "normalized_text": _normalize_text(run_text)
                })
                paragraph_red_texts.append(run_text)
                log.debug(f"Found red text in paragraph {p_index}: '{run_text.strip()}'")
            char_cursor += len(run_text)

        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
                "red_text_joined": " ".join(paragraph_red_texts).strip()
            })

    # --- tables -----------------------------------------------------------
    log.info(f"Processing {len(doc.tables)} tables...")
    for t_index, table in enumerate(doc.tables):
        table_data = _process_table_with_schema_mapping(table, t_index)
        tables_out.append(table_data)

        red_cell_count = sum(1 for row in table_data["red_cells"] for cell in row if cell)
        if red_cell_count > 0:
            log.info(f"Table {t_index}: Found {red_cell_count} cells with red text")

    # Count red cells across all tables.  (BUGFIX: the previous expression
    # iterated an undefined name `row`, raising NameError whenever the
    # document contained any table.)
    total_red_cells = sum(
        1
        for table in tables_out
        for row in table["red_cells"]
        for cell in row
        if cell
    )

    # Assemble the output structure.
    return {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
            "total_red_cells": total_red_cells,
            "global_settings_used": {
                "normalization": GLOBAL_SETTINGS.get("normalize", {}),
                "ocr_repair_rules_count": len(GLOBAL_SETTINGS.get("ocr_repair_rules", [])),
                "synonyms_count": len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0
            }
        }
    }

# ------------------------------
# Command-line interface
# ------------------------------
def main(argv):
    """CLI entry point: extract red text from argv[1] into JSON at argv[2].

    Exits with status 2 when fewer than two arguments are supplied.
    """
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)

    input_docx = argv[1]
    output_json = argv[2]

    log.info("Starting red text extraction from: %s", input_docx)
    log.info("Using master_key configuration with %d header synonyms", 
             len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0)

    try:
        result = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise

    # Persist the result pretty-printed so it is easy to inspect by hand.
    try:
        with open(output_json, "w", encoding="utf-8") as fh:
            json.dump(result, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted data to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise

    # Final summary of everything found during extraction.
    meta = result.get("meta", {})
    log.info("=== EXTRACTION SUMMARY ===")
    for template, meta_key in (
        ("Headings found: %d", "total_headings"),
        ("Red paragraphs: %d", "total_red_paragraphs"),
        ("Red runs total: %d", "total_red_runs"),
        ("Tables processed: %d", "total_tables"),
        ("Red cells found: %d", "total_red_cells"),
    ):
        log.info(template, meta.get(meta_key, 0))
    log.info("Header synonyms used: %d",
             meta.get("global_settings_used", {}).get("synonyms_count", 0))

if __name__ == "__main__":
    # Run the extraction CLI (exits with status 2 on bad usage).
    main(sys.argv)
    # Echo the generated JSON back to stdout so the result can be verified
    # directly from the console. Best effort only: a missing or unreadable
    # output file is reported, not fatal.
    if len(sys.argv) >= 3:
        try:
            with open(sys.argv[2], 'r') as f: 
                print(f"\n📄 EXTRACT_RED_TEXT OUTPUT:\n{f.read()}")
        except Exception as e:
            print(f"\n❌ Could not read output file: {e}")