Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| extract_red_text.py - Enhanced version with improved red text detection and master key alignment | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import sys | |
| import logging | |
| from collections import defaultdict | |
| from typing import List, Dict, Optional, Any, Tuple | |
| # attempt to import python-docx (document processing) | |
| try: | |
| from docx import Document | |
| from docx.oxml.ns import qn | |
| from docx.shared import RGBColor | |
| except Exception as e: | |
| raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e | |
| # ------------------------------ | |
| # Import master_key configurations | |
| # ------------------------------ | |
| try: | |
| import master_key as mk | |
| GLOBAL_SETTINGS = mk.GLOBAL_SETTINGS | |
| EXTRA_HEADER_SYNONYMS = mk.EXTRA_HEADER_SYNONYMS | |
| TABLE_SCHEMAS = getattr(mk, "TABLE_SCHEMAS", {}) | |
| except ImportError as e: | |
| logging.error("Failed to import master_key.py: %s", e) | |
| raise RuntimeError("master_key.py is required for configuration") from e | |
| except AttributeError as e: | |
| logging.error("Missing required configuration in master_key.py: %s", e) | |
| raise RuntimeError("master_key.py missing required GLOBAL_SETTINGS or EXTRA_HEADER_SYNONYMS") from e | |
| # ------------------------------ | |
| # Logging | |
| # ------------------------------ | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") | |
| log = logging.getLogger("extract_red_text") | |
| # ------------------------------ | |
| # Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS) | |
| # ------------------------------ | |
def _apply_ocr_repair_rules(text: str) -> str:
    """Run every OCR repair substitution from GLOBAL_SETTINGS over *text*.

    Each rule is a (pattern, replacement) pair applied case-insensitively.
    A rule with an invalid regex is skipped rather than allowed to abort
    the whole repair pass.
    """
    repaired = text or ""
    for pattern, replacement in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            repaired = re.sub(pattern, replacement, repaired, flags=re.I)
        except re.error:
            continue  # malformed rule in config: ignore it
    return repaired
def _normalize_text(text: str) -> str:
    """Return the readable normalized form of *text* per GLOBAL_SETTINGS.

    OCR repair runs first; each subsequent step (dash replacement,
    lowercasing, punctuation stripping, whitespace collapsing) is applied
    only when the corresponding "normalize" flag is set in the config.
    """
    result = _apply_ocr_repair_rules(text or "")
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    if cfg.get("replace_smart_dashes", False):
        result = result.replace("–", "-").replace("—", "-")
    if cfg.get("lower", False):
        result = result.lower()
    if cfg.get("strip_punctuation", False):
        # Retain hyphen, ampersand, parentheses, slash and colon;
        # every other punctuation character becomes a space.
        result = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", result)
    if cfg.get("collapse_whitespace", False):
        result = re.sub(r"\s+", " ", result)
    return result.strip()
def _compact_key(text: str) -> str:
    """Collapse *text* into a word-characters-only key for deterministic lookup."""
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))
def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Resolve a raw header string to its canonical label.

    Lookup order against EXTRA_HEADER_SYNONYMS: compact key first, then the
    normalized key, then a case-insensitive scan of every synonym key.
    Returns the canonical label, or None when no mapping exists.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # Exact-key lookups (compact takes precedence).
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]
    # Fall back to a case-insensitive comparison over all keys.
    targets = {normalized.lower(), compact.lower()}
    for key, label in EXTRA_HEADER_SYNONYMS.items():
        if key.lower() in targets:
            return label
    return None
| # ------------------------------ | |
| # Enhanced red font detection using hf_utils pattern | |
| # ------------------------------ | |
| def _run_is_red(run) -> bool: | |
| """ | |
| Enhanced red color detection for docx.run objects. | |
| Uses multiple methods to detect red text robustly. | |
| """ | |
| try: | |
| # Method 1: Check run.font.color.rgb | |
| col = getattr(run.font, "color", None) | |
| if col is not None and getattr(col, "rgb", None): | |
| rgb = col.rgb | |
| try: | |
| # rgb may be sequence-like or have attributes | |
| if hasattr(rgb, '__getitem__'): # sequence-like | |
| r, g, b = rgb[0], rgb[1], rgb[2] | |
| else: # attribute access | |
| r = getattr(rgb, "r", None) or getattr(rgb, "red", None) | |
| g = getattr(rgb, "g", None) or getattr(rgb, "green", None) | |
| b = getattr(rgb, "b", None) or getattr(rgb, "blue", None) | |
| if r is not None and g is not None and b is not None: | |
| # Tolerant heuristic: red must be noticeably higher than green/blue | |
| if r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30: | |
| return True | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
| # Method 2: Check raw XML color code | |
| try: | |
| rPr = run._element.rPr | |
| if rPr is not None: | |
| clr = rPr.find('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}color') | |
| if clr is not None: | |
| val = clr.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val') | |
| if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val): | |
| rr = int(val[:2], 16) | |
| gg = int(val[2:4], 16) | |
| bb = int(val[4:], 16) | |
| if rr >= 160 and gg <= 120 and bb <= 120 and (rr - gg) >= 30 and (rr - bb) >= 30: | |
| return True | |
| except Exception: | |
| pass | |
| # Method 3: Check theme color | |
| try: | |
| color = run.font.color | |
| if color is not None: | |
| theme_color = getattr(color, "theme_color", None) | |
| if theme_color: | |
| theme_str = str(theme_color).lower() | |
| if "red" in theme_str or "accent_2" in theme_str: # Common red theme | |
| return True | |
| except Exception: | |
| pass | |
| # Method 4: String representation fallback | |
| try: | |
| if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None: | |
| s = str(run.font.color.rgb) | |
| # Look for patterns like "FF0000" or similar high-red values | |
| if re.search(r"[Ff]{2}0{4}|[Ee]{2}0{4}|[Dd]{2}0{4}", s): | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
def _extract_red_text_segments(cell):
    """Collect contiguous red-text segments from a table cell.

    Returns a list of dicts, each with:
      'text'          — stripped concatenation of the segment's run text
      'runs'          — list of (paragraph_idx, run_idx, run) tuples
      'paragraph_idx' — paragraph the segment belongs to
    A segment ends at any non-red (or whitespace-only) run and at the end
    of each paragraph.
    """
    segments = []

    def _flush(buffered_text, buffered_runs, p_idx):
        # Emit the buffered segment, if any runs were accumulated.
        if buffered_runs:
            segments.append({
                'text': buffered_text.strip(),
                'runs': list(buffered_runs),
                'paragraph_idx': p_idx
            })

    for p_idx, paragraph in enumerate(cell.paragraphs):
        buffered_text = ""
        buffered_runs = []
        for r_idx, run in enumerate(paragraph.runs):
            if _run_is_red(run) and run.text.strip():
                buffered_text += run.text
                buffered_runs.append((p_idx, r_idx, run))
            else:
                _flush(buffered_text, buffered_runs, p_idx)
                buffered_text = ""
                buffered_runs = []
        _flush(buffered_text, buffered_runs, p_idx)
    return segments
def _has_red_text(cell) -> bool:
    """Return True when any run anywhere in *cell* is red, non-blank text."""
    return any(
        _run_is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
| # ------------------------------ | |
| # Enhanced table processing with schema-aware header mapping | |
| # ------------------------------ | |
| def _process_table_with_schema_mapping(table, t_index: int) -> Dict[str, Any]: | |
| """Process table with enhanced header mapping using master key schemas.""" | |
| nrows = len(table.rows) | |
| ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0 | |
| if nrows == 0: | |
| return { | |
| "table_index": t_index, | |
| "nrows": 0, | |
| "ncols": 0, | |
| "headers": [], | |
| "rows": [], | |
| "red_cells": [], | |
| "mapped_headers": [] | |
| } | |
| # Process headers from first row | |
| header_row = table.rows[0] | |
| headers = [] | |
| mapped_headers = [] | |
| for c_idx, cell in enumerate(header_row.cells[:ncols]): | |
| cell_text = cell.text.strip() | |
| # Try mapping using EXTRA_HEADER_SYNONYMS first | |
| mapped = map_header_using_extra_synonyms(cell_text) | |
| if mapped: | |
| header_label = mapped | |
| log.debug(f"Mapped header '{cell_text}' -> '{mapped}'") | |
| else: | |
| header_label = cell_text | |
| headers.append(cell_text) # Original header | |
| mapped_headers.append(header_label) # Mapped header | |
| # Process all rows | |
| rows_text = [] | |
| rows_red_cells = [] | |
| rows_red_metadata = [] | |
| for r_i, row in enumerate(table.rows): | |
| row_texts = [] | |
| row_reds = [] | |
| row_red_meta = [] | |
| for c_i, cell in enumerate(row.cells[:ncols]): | |
| cell_text = cell.text.strip() | |
| # Extract red text segments with metadata | |
| red_segments = _extract_red_text_segments(cell) | |
| if red_segments: | |
| # Join all red text segments | |
| red_text_parts = [seg['text'] for seg in red_segments if seg['text']] | |
| red_text_joined = " ".join(red_text_parts).strip() | |
| # Store metadata about red text location | |
| red_metadata = { | |
| "has_red": True, | |
| "red_text": red_text_joined, | |
| "segments": len(red_segments), | |
| "total_red_runs": sum(len(seg['runs']) for seg in red_segments) | |
| } | |
| else: | |
| red_text_joined = None | |
| red_metadata = {"has_red": False} | |
| row_texts.append(cell_text) | |
| row_reds.append(red_text_joined) | |
| row_red_meta.append(red_metadata) | |
| rows_text.append(row_texts) | |
| rows_red_cells.append(row_reds) | |
| rows_red_metadata.append(row_red_meta) | |
| return { | |
| "table_index": t_index, | |
| "nrows": nrows, | |
| "ncols": ncols, | |
| "headers": headers, # Original headers | |
| "mapped_headers": mapped_headers, # Mapped headers | |
| "rows": rows_text, | |
| "red_cells": rows_red_cells, | |
| "red_metadata": rows_red_metadata # Additional red text metadata | |
| } | |
| # ------------------------------ | |
| # Extraction: paragraphs, headings, tables | |
| # ------------------------------ | |
def extract_from_docx(path: str) -> Dict[str, Any]:
    """Extract content from DOCX with enhanced red text detection and schema mapping.

    Walks every paragraph and table in the document and collects:
      - headings (style name looks like a heading, or text starts with a
        known NHVAS section keyword)
      - paragraphs containing red runs, with the red texts joined
      - per-table output from _process_table_with_schema_mapping
      - a flat list of every red run with paragraph/char positions
    Returns a JSON-serializable dict; the "meta" key carries summary counts
    and the GLOBAL_SETTINGS values that were in effect.
    """
    log.info(f"Opening document: {path}")
    doc = Document(path)
    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    tables_out: List[Dict[str, Any]] = []
    # Extract headings and paragraphs with red runs
    log.info("Processing paragraphs and headings...")
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        # Identify heading level from style name if available
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        # Heading when the style name matches, OR the text begins with a
        # known section keyword (MAINTENANCE, MASS, FATIGUE, ...).
        is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or \
            bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
        if is_heading:
            headings.append(text.strip())
            log.debug(f"Found heading: {text.strip()}")
        # Gather red runs in this paragraph
        paragraph_red_texts = []
        # Running character offset of each run within the paragraph text.
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            run_len = len(run_text)
            if _run_is_red(run) and run_text.strip():
                # Store a red run entry
                rr = {
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                    "normalized_text": _normalize_text(run_text)
                }
                red_runs.append(rr)
                paragraph_red_texts.append(run_text)
                log.debug(f"Found red text in paragraph {p_index}: '{run_text.strip()}'")
            # Advance the cursor for every run, red or not, so offsets stay true.
            char_cursor += run_len
        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
                "red_text_joined": " ".join(paragraph_red_texts).strip()
            })
    # Extract tables with enhanced processing
    log.info(f"Processing {len(doc.tables)} tables...")
    for t_index, table in enumerate(doc.tables):
        table_data = _process_table_with_schema_mapping(table, t_index)
        tables_out.append(table_data)
        # Log red text findings (a cell counts when its red text is non-empty)
        red_cell_count = sum(1 for row in table_data["red_cells"] for cell in row if cell)
        if red_cell_count > 0:
            log.info(f"Table {t_index}: Found {red_cell_count} cells with red text")
    # Assemble output structure
    out = {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # Enhanced metadata
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
            "total_red_cells": sum(
                sum(1 for cell in row_red_cells if cell)
                for table in tables_out
                for row_red_cells in table["red_cells"]
            ),
            "global_settings_used": {
                "normalization": GLOBAL_SETTINGS.get("normalize", {}),
                "ocr_repair_rules_count": len(GLOBAL_SETTINGS.get("ocr_repair_rules", [])),
                "synonyms_count": len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0
            }
        }
    }
    return out
| # ------------------------------ | |
| # Command-line interface | |
| # ------------------------------ | |
def main(argv):
    """CLI entry point: read a DOCX, write extracted red text to JSON,
    then log a summary of what was found."""
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)
    source_path, target_path = argv[1], argv[2]
    log.info("Starting red text extraction from: %s", source_path)
    log.info("Using master_key configuration with %d header synonyms",
             len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0)
    try:
        extracted = extract_from_docx(source_path)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise
    # Save JSON pretty-printed for debugging by default
    try:
        with open(target_path, "w", encoding="utf-8") as fh:
            json.dump(extracted, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted data to: %s", target_path)
    except Exception:
        log.exception("Failed to write output JSON to %s", target_path)
        raise
    # Print comprehensive summary
    meta = extracted.get("meta", {})
    log.info("=== EXTRACTION SUMMARY ===")
    for template, meta_key in (
        ("Headings found: %d", "total_headings"),
        ("Red paragraphs: %d", "total_red_paragraphs"),
        ("Red runs total: %d", "total_red_runs"),
        ("Tables processed: %d", "total_tables"),
        ("Red cells found: %d", "total_red_cells"),
    ):
        log.info(template, meta.get(meta_key, 0))
    log.info("Header synonyms used: %d",
             meta.get("global_settings_used", {}).get("synonyms_count", 0))
if __name__ == "__main__":
    main(sys.argv)
    # Echo the JSON output for quick manual verification.
    if len(sys.argv) >= 3:
        try:
            # Read back with the same encoding main() wrote the file in
            # (utf-8); relying on the platform default breaks on Windows
            # (cp1252) as soon as the JSON contains non-ASCII text.
            with open(sys.argv[2], 'r', encoding="utf-8") as f:
                print(f"\n📄 EXTRACT_RED_TEXT OUTPUT:\n{f.read()}")
        except Exception as e:
            print(f"\n❌ Could not read output file: {e}")