# hf_utils.py
"""
Shared helpers for HF red-text extraction / matching.

Usage:
    from hf_utils import (
        is_red_font,
        normalize_text,
        normalize_header_text,
        flatten_json,
        find_matching_json_key_and_value,
        get_clean_text,
        has_red_text,
        extract_red_text_segments,
        replace_red_text_in_cell,
        key_is_forbidden_for_position
    )
"""
import re
from typing import Any, Dict, List, Optional, Tuple

try:
    from docx.shared import RGBColor
except ImportError:
    # python-docx is only needed to recolor replaced text. Keeping the import
    # optional lets the pure text / JSON helpers be used (and tested) without it.
    RGBColor = None

# WordprocessingML namespace prefix used when reading raw run properties.
_W_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
_HEX_COLOR_RE = re.compile(r"[0-9A-Fa-f]{6}")
_TOKEN_RE = re.compile(r"\b\w+\b")


# -------------------------
# Red color detection
# -------------------------
def _looks_red(r, g, b) -> bool:
    """Return True when (r, g, b) is a "reddish" colour.

    Tolerant heuristic: red must be strong and noticeably higher than
    green/blue, so slightly-off reds are accepted (not strict 255,0,0).
    """
    return r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30


def _first_attr(obj, *names):
    """Return the first attribute in *names* whose value is not None.

    A plain ``a or b`` chain would treat a legitimate channel value of 0 as
    missing, so the check is done explicitly against None.
    """
    for name in names:
        value = getattr(obj, name, None)
        if value is not None:
            return value
    return None


def is_red_font(run) -> bool:
    """Robust red-color detection for docx.run objects.

    - checks run.font.color.rgb when available
    - checks run._element.rPr/w:color hex val
    - tolerant to slightly different reds (not strict 255,0,0).

    Returns True if either source reports a reddish colour, else False.
    Any error while inspecting the run is treated as "not red" for that source.
    """
    try:
        col = getattr(run.font, "color", None)
        rgb = getattr(col, "rgb", None) if col is not None else None
        if rgb:
            try:
                # rgb may be sequence-like
                r, g, b = rgb[0], rgb[1], rgb[2]
            except Exception:
                # fallback attribute access
                r = _first_attr(rgb, "r", "red")
                g = _first_attr(rgb, "g", "green")
                b = _first_attr(rgb, "b", "blue")
            resolved = r is not None and g is not None and b is not None
            # When the channels cannot be resolved, fall through to the raw
            # XML check below instead of giving up early.
            if resolved and _looks_red(r, g, b):
                return True
    except Exception:
        # Best effort: unexpected colour objects must not break detection.
        pass

    # fallback to raw XML color code if present
    try:
        rPr = run._element.rPr
        if rPr is not None:
            clr = rPr.find(_W_NS + "color")
            if clr is not None:
                val = clr.get(_W_NS + "val")
                # Only 6-digit hex values are inspected; anything else is skipped.
                if val and _HEX_COLOR_RE.fullmatch(val):
                    rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
                    if _looks_red(rr, gg, bb):
                        return True
    except Exception:
        # Best effort: runs without the expected XML structure are "not red".
        pass

    return False


# -------------------------
# Text normalization
# -------------------------
def normalize_text(s: Optional[str]) -> str:
    """Normalize free text for comparison.

    En/em dashes become '-', every character other than word characters,
    whitespace and ``# % / - ( )`` becomes a space, and whitespace runs are
    collapsed. Case is preserved. ``None`` yields ``""``.
    """
    if s is None:
        return ""
    s = str(s)
    s = s.replace('\u2013', '-').replace('\u2014', '-')
    s = re.sub(r'[^\w\s\#\%\/\-\(\)]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip()
    return s


def normalize_header_text(s: Optional[str]) -> str:
    """Normalize a table header / JSON key into a lowercase comparison form.

    Parenthesised parts are dropped, slashes and punctuation (except ``#`` and
    ``%``) become spaces, whitespace is collapsed and the result lowercased.
    A few known header variants are then mapped onto one canonical spelling.
    Falsy input yields ``""``.
    """
    if not s:
        return ""
    # Coerce like normalize_text does, so non-string keys do not raise.
    t = re.sub(r'\([^)]*\)', ' ', str(s))
    t = t.replace("/", " ").replace("\\", " ")
    t = re.sub(r'[^\w\s\#\%]', ' ', t)
    t = re.sub(r'\s+', ' ', t).strip().lower()
    t = t.replace('registrationno', 'registration number')
    t = t.replace('registrationnumber', 'registration number')
    # Hyphens were already turned into spaces above, so "sub-contractor" has
    # become "sub contractor" by now and needs no dedicated replacement.
    t = t.replace('sub contracted', 'sub contractor')
    return t.strip()


# -------------------------
# docx helpers
# -------------------------
def get_clean_text(cell) -> str:
    """Return the cell text: runs concatenated per paragraph, paragraphs
    joined with a single space, outer whitespace stripped."""
    paragraphs = ("".join(run.text for run in paragraph.runs)
                  for paragraph in cell.paragraphs)
    return " ".join(paragraphs).strip()


def has_red_text(cell) -> bool:
    """Return True if any run in the cell is red and has non-blank text."""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            try:
                if is_red_font(run) and run.text.strip():
                    return True
            except Exception:
                # A malformed run must not hide red text in later runs.
                continue
    return False


def _make_segment(runs: List[tuple], p_idx: int) -> Dict[str, Any]:
    """Build one segment record from consecutive (p_idx, r_idx, run) tuples."""
    return {
        'text': "".join(run.text for _, _, run in runs),
        'runs': list(runs),
        'paragraph_idx': p_idx,
    }


def extract_red_text_segments(cell):
    """Group consecutive red runs of each paragraph into segments.

    Returns a list of dicts with keys ``'text'`` (concatenated run text),
    ``'runs'`` (list of ``(paragraph_idx, run_idx, run)`` tuples) and
    ``'paragraph_idx'``. A segment never spans paragraphs. A run that is not
    red, or whose text is blank, ends the current segment and is not included
    in any segment.
    """
    segments = []
    for p_idx, paragraph in enumerate(cell.paragraphs):
        pending = []
        for r_idx, run in enumerate(paragraph.runs):
            if is_red_font(run) and run.text.strip():
                pending.append((p_idx, r_idx, run))
                continue
            if pending:
                segments.append(_make_segment(pending, p_idx))
                pending = []
        # Flush a segment that runs to the end of the paragraph.
        if pending:
            segments.append(_make_segment(pending, p_idx))
    return segments


def replace_red_text_in_cell(cell, replacement_text: str) -> int:
    """Replace all red text in the cell with *replacement_text*.

    The replacement is written into the first run of the first red segment,
    which is recolored black when possible; every other red run collected by
    ``extract_red_text_segments`` is emptied.

    Returns 1 if the cell contained red text (and was modified), otherwise 0.
    """
    segments = extract_red_text_segments(cell)
    if not segments:
        return 0

    first = segments[0]
    first_run = first['runs'][0][2]
    first_run.text = replacement_text
    if RGBColor is not None:
        try:
            first_run.font.color.rgb = RGBColor(0, 0, 0)
        except Exception:
            # Recoloring is cosmetic; the text replacement above still stands.
            pass

    for _, _, run in first['runs'][1:]:
        run.text = ''
    for seg in segments[1:]:
        for _, _, run in seg['runs']:
            run.text = ''
    return 1


# -------------------------
# JSON helpers & matching
# -------------------------
def flatten_json(y: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
    """Flatten nested dicts into a single-level dict.

    Every non-dict leaf is stored twice: under its dotted path
    (``"parent.child"``) and under its bare key (``"child"``). When bare keys
    collide, the leaf visited last wins.
    """
    out = {}
    for key, val in y.items():
        new_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            out.update(flatten_json(val, new_key))
        else:
            out[new_key] = val
            out[key] = val
    return out


def find_matching_json_key_and_value(field_name: str, flat_json: Dict[str, Any]) -> Optional[Tuple[str, Any]]:
    """Find the entry of *flat_json* that best matches *field_name*.

    Matching is tried in order: exact key, case-insensitive key, equal
    ``normalize_header_text`` form, then a fuzzy score. The fuzzy score is
    ``0.6 * jaccard + 0.4 * coverage`` over tokens longer than 2 characters,
    or, when no token is shared, ``0.4 * length ratio`` if one normalized
    string contains the other. The best fuzzy candidate is accepted only with
    a score of at least 0.35; ties keep the earliest key.

    Returns ``(key, value)`` or ``None`` when nothing matches.
    """
    if not field_name:
        return None
    fn = field_name.strip()

    if fn in flat_json:
        return fn, flat_json[fn]

    fn_lower = fn.lower()
    for k in flat_json:
        if k.lower() == fn_lower:
            return k, flat_json[k]

    clean_field = normalize_header_text(fn)
    for k in flat_json:
        if normalize_header_text(k) == clean_field:
            return k, flat_json[k]

    field_tokens = {w for w in _TOKEN_RE.findall(fn_lower) if len(w) > 2}
    if not field_tokens:
        return None

    best = None
    best_score = 0.0
    for k, v in flat_json.items():
        key_tokens = {w for w in _TOKEN_RE.findall(k.lower()) if len(w) > 2}
        if not key_tokens:
            continue
        common = field_tokens & key_tokens
        if common:
            sim = len(common) / len(field_tokens | key_tokens)
            cov = len(common) / len(field_tokens)
            score = (0.6 * sim) + (0.4 * cov)
        else:
            # No shared token: fall back to normalized substring containment.
            nk = normalize_header_text(k)
            if clean_field and nk and (clean_field in nk or nk in clean_field):
                shorter, longer = sorted((len(clean_field), len(nk)))
                score = 0.4 * (shorter / longer)
            else:
                score = 0.0
        if score > best_score:
            best_score = score
            best = (k, v)

    if best and best_score >= 0.35:
        return best[0], best[1]
    return None


# -------------------------
# Small safety helpers
# -------------------------
_POSITION_KEY_BLACKLIST = ["attendance", "attendance list", "attendees", "attendance list (names and position titles)"]


def key_is_forbidden_for_position(key: Optional[str]) -> bool:
    """Return True if *key* contains (case-insensitively) any entry of
    ``_POSITION_KEY_BLACKLIST``. Falsy keys are never forbidden."""
    if not key:
        return False
    lk = key.lower()
    return any(b in lk for b in _POSITION_KEY_BLACKLIST)