Spaces:
Running
Running
| # hf_utils.py | |
| """ | |
| Shared helpers for HF red-text extraction / matching. | |
| Usage: | |
| from hf_utils import ( | |
| is_red_font, normalize_text, normalize_header_text, | |
| flatten_json, find_matching_json_key_and_value, | |
| get_clean_text, has_red_text, extract_red_text_segments, | |
| replace_red_text_in_cell, key_is_forbidden_for_position | |
| ) | |
| """ | |
| import re | |
| from typing import Any, Dict, Optional, Tuple | |
| from docx.shared import RGBColor | |
| # ------------------------- | |
| # Red color detection | |
| # ------------------------- | |
def _looks_red(r, g, b) -> bool:
    """Return True when an (r, g, b) triple is clearly red-dominant.

    The heuristic is deliberately tolerant: the red channel must be strong
    (>= 160), green and blue must be weak (<= 120), and red must exceed each
    of them by at least 30, so dark or slightly tinted reds still count.
    """
    return r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30


def _first_channel(rgb, *names):
    """Return the first attribute of *rgb* in *names* whose value is not None.

    A plain ``a or b`` chain would treat a legitimate channel value of 0 as
    missing, so the check is made against None explicitly.
    """
    for name in names:
        value = getattr(rgb, name, None)
        if value is not None:
            return value
    return None


def is_red_font(run) -> bool:
    """Best-effort detection of red text on a docx run-like object.

    Two sources are consulted, in order:

    1. ``run.font.color.rgb`` -- read either as a sequence (``rgb[0..2]``) or
       through ``r``/``g``/``b`` (or ``red``/``green``/``blue``) attributes.
    2. The raw ``w:color`` element under ``run._element.rPr``, whose ``w:val``
       attribute is parsed as a six-digit hex colour.

    Any error while reading either source is swallowed on purpose: the
    function answers False rather than raising when the colour cannot be
    determined.

    Returns:
        True if either source yields a colour accepted by the red heuristic,
        otherwise False.
    """
    try:
        col = getattr(run.font, "color", None)
        rgb = getattr(col, "rgb", None) if col is not None else None
        if rgb:
            try:
                # rgb may be sequence-like
                r, g, b = rgb[0], rgb[1], rgb[2]
            except (TypeError, IndexError, KeyError, AttributeError):
                # fallback attribute access; 0 is a valid channel value
                r = _first_channel(rgb, "r", "red")
                g = _first_channel(rgb, "g", "green")
                b = _first_channel(rgb, "b", "blue")
            channels_known = r is not None and g is not None and b is not None
            # When a channel is unreadable, fall through to the XML lookup
            # instead of giving up.
            if channels_known and _looks_red(r, g, b):
                return True
    except Exception:
        # Deliberate best-effort: an unreadable font colour is not an error.
        pass

    # fallback to raw XML color code if present
    w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
    try:
        rPr = run._element.rPr
        if rPr is not None:
            clr = rPr.find(w_ns + 'color')
            if clr is not None:
                val = clr.get(w_ns + 'val')
                # Values such as "auto" are not hex colours and are skipped.
                if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                    rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
                    if _looks_red(rr, gg, bb):
                        return True
    except Exception:
        # Deliberate best-effort: missing or odd XML simply means "not red".
        pass
    return False
| # ------------------------- | |
| # Text normalization | |
| # ------------------------- | |
def normalize_text(s: Optional[str]) -> str:
    """Return *s* as a single-spaced string with most punctuation removed.

    None becomes the empty string and any other value is passed through
    ``str()``. En and em dashes are turned into ASCII hyphens. Every
    character that is not a word character, whitespace, or one of
    ``# % / - ( )`` is replaced by a space. Whitespace runs are then
    collapsed and the ends trimmed.
    """
    if s is None:
        return ""
    text = str(s)
    # Unify typographic dashes before the punctuation filter runs, so they
    # survive as plain hyphens.
    for dash in ('\u2013', '\u2014'):
        text = text.replace(dash, '-')
    filtered = re.sub(r'[^\w\s\#\%\/\-\(\)]', ' ', text)
    collapsed = re.sub(r'\s+', ' ', filtered)
    return collapsed.strip()
def normalize_header_text(s: Optional[str]) -> str:
    """Return a lower-cased, punctuation-free form of a header for comparison.

    Steps, in order:
      1. Falsy input (None, "", 0, ...) yields "".
      2. The value is coerced with ``str()`` so non-string headers (for
         example a numeric header) do not crash the regex calls.
      3. Parenthesised text is dropped; slashes and backslashes become spaces.
      4. Every character that is not a word character, whitespace, ``#`` or
         ``%`` becomes a space; whitespace is collapsed; the result is
         lower-cased.
      5. A few known spelling variants are rewritten to one canonical form.
    """
    if not s:
        return ""
    t = re.sub(r'\([^)]*\)', ' ', str(s))
    t = t.replace("/", " ").replace("\\", " ")
    t = re.sub(r'[^\w\s\#\%]', ' ', t)
    t = re.sub(r'\s+', ' ', t).strip().lower()
    # Hyphens were already turned into spaces by the filter above, so
    # "sub-contractor" arrives here as "sub contractor" and needs no rule.
    canonical_forms = (
        ('registrationno', 'registration number'),
        ('registrationnumber', 'registration number'),
        ('sub contracted', 'sub contractor'),
    )
    for variant, canonical in canonical_forms:
        t = t.replace(variant, canonical)
    return t.strip()
| # ------------------------- | |
| # docx helpers | |
| # ------------------------- | |
def get_clean_text(cell) -> str:
    """Return the text of *cell* built from its paragraphs' runs.

    The run texts of each paragraph are concatenated without a separator.
    The per-paragraph strings are then joined with a single space and the
    result is stripped of leading and trailing whitespace.
    """
    paragraph_texts = [
        "".join(run.text for run in paragraph.runs)
        for paragraph in cell.paragraphs
    ]
    return " ".join(paragraph_texts).strip()
def has_red_text(cell) -> bool:
    """Return True if any run in *cell* is red and has non-blank text.

    Runs are visited paragraph by paragraph. A run for which the check
    raises is skipped, so one problematic run does not hide red text
    elsewhere in the cell.
    """
    all_runs = (run for paragraph in cell.paragraphs for run in paragraph.runs)
    for run in all_runs:
        try:
            visible_red = is_red_font(run) and run.text.strip()
        except Exception:
            continue
        if visible_red:
            return True
    return False
def extract_red_text_segments(cell):
    """Group consecutive red, non-blank runs of each paragraph into segments.

    A segment ends at the first run that is not red or whose text is blank,
    and at the end of the paragraph. Segments never span paragraphs.

    Returns:
        A list of dicts, one per segment, with keys:
          'text'          -- concatenated text of the segment's runs
          'runs'          -- list of (paragraph_index, run_index, run) tuples
          'paragraph_idx' -- index of the paragraph holding the segment
    """
    segments = []
    for p_idx, paragraph in enumerate(cell.paragraphs):
        pieces = []    # text of the red runs collected so far
        members = []   # matching (paragraph_idx, run_idx, run) tuples
        for r_idx, run in enumerate(paragraph.runs):
            if is_red_font(run) and run.text.strip():
                pieces.append(run.text)
                members.append((p_idx, r_idx, run))
                continue
            # A non-red or blank run closes any segment in progress.
            if members:
                segments.append({'text': "".join(pieces), 'runs': members, 'paragraph_idx': p_idx})
                pieces, members = [], []
        # Close a segment that ran to the end of the paragraph.
        if members:
            segments.append({'text': "".join(pieces), 'runs': members, 'paragraph_idx': p_idx})
    return segments
def replace_red_text_in_cell(cell, replacement_text: str) -> int:
    """Replace all red text in *cell* with *replacement_text*.

    The first red run receives the replacement text and is recoloured black
    (a failure to recolour is ignored). Every other red run found in the
    cell is emptied.

    Returns:
        1 if the cell contained red text and was updated, 0 otherwise.
    """
    segments = extract_red_text_segments(cell)
    if not segments:
        return 0

    # Flatten every segment into one ordered list of red runs.
    red_runs = [run for seg in segments for _, _, run in seg['runs']]
    anchor, leftovers = red_runs[0], red_runs[1:]

    anchor.text = replacement_text
    try:
        anchor.font.color.rgb = RGBColor(0, 0, 0)
    except Exception:
        pass

    for run in leftovers:
        run.text = ''
    return 1
| # ------------------------- | |
| # JSON helpers & matching | |
| # ------------------------- | |
def flatten_json(y: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
    """Flatten nested dicts into one dict keyed by dotted path and by leaf name.

    Every non-dict value is stored twice: under its dotted path (for example
    ``"a.b"``, prefixed with *prefix* when given) and under its bare leaf key
    (``"b"``) as a convenience alias. Lists and other non-dict values are
    kept as-is and are not descended into.

    A bare-key alias never overwrites an entry stored under its real full
    path, so a nested ``{"x": {"name": ...}}`` cannot clobber a top-level
    ``"name"``. When two aliases collide with each other, the later one wins.

    Args:
        y: The (possibly nested) mapping to flatten.
        prefix: Dotted path prepended to every full-path key.

    Returns:
        A new flat dict; *y* is not modified.
    """
    out: Dict[str, Any] = {}
    full_paths = set()  # keys that are genuine dotted paths, not aliases

    def _walk(node: Dict[str, Any], path: str) -> None:
        for key, val in node.items():
            dotted = f"{path}.{key}" if path else key
            if isinstance(val, dict):
                _walk(val, dotted)
                continue
            out[dotted] = val
            full_paths.add(dotted)
            # Only add the short alias when it would not shadow a real path.
            if key not in full_paths:
                out[key] = val

    _walk(y, prefix)
    return out
def find_matching_json_key_and_value(field_name: str, flat_json: Dict[str, Any]) -> Optional[Tuple[str, Any]]:
    """Find the entry of *flat_json* whose key best matches *field_name*.

    Matching stages, tried in order (the first hit wins):
      1. exact key match on the stripped field name;
      2. case-insensitive key match;
      3. equality after ``normalize_header_text`` on both sides;
      4. fuzzy scoring on word tokens longer than two characters:
         ``0.6 * Jaccard similarity + 0.4 * coverage of the field tokens``,
         or, when no tokens are shared but one normalised string contains
         the other, ``0.4 * (shorter length / longer length)``.
         The best-scoring key is returned only if its score is >= 0.35.

    Args:
        field_name: Label to look up.
        flat_json: Flat mapping of candidate keys to values.

    Returns:
        ``(key, value)`` for the chosen entry, or None when *field_name* is
        empty or nothing matches well enough.
    """
    if not field_name:
        return None
    fn = field_name.strip()

    # Stage 1: exact match.
    if fn in flat_json:
        return fn, flat_json[fn]

    # Stage 2: case-insensitive match.
    fn_lower = fn.lower()
    for k in flat_json:
        if k.lower() == fn_lower:
            return k, flat_json[k]

    # Stage 3: normalised match. Normalised keys are remembered so stage 4
    # does not have to recompute them.
    clean_field = normalize_header_text(fn)
    normalized_keys = {}
    for k in flat_json:
        nk = normalize_header_text(k)
        if nk == clean_field:
            return k, flat_json[k]
        normalized_keys[k] = nk

    # Stage 4: fuzzy token scoring.
    field_tokens = {w for w in re.findall(r'\b\w+\b', fn_lower) if len(w) > 2}
    if not field_tokens:
        return None

    best = None
    best_score = 0.0
    for k, v in flat_json.items():
        key_tokens = {w for w in re.findall(r'\b\w+\b', k.lower()) if len(w) > 2}
        if not key_tokens:
            continue
        common = field_tokens & key_tokens
        if common:
            sim = len(common) / len(field_tokens | key_tokens)
            cov = len(common) / len(field_tokens)
            score = (0.6 * sim) + (0.4 * cov)
        else:
            nk = normalized_keys[k]
            if clean_field and nk and (clean_field in nk or nk in clean_field):
                substring_score = min(len(clean_field), len(nk)) / max(len(clean_field), len(nk))
                score = 0.4 * substring_score
            else:
                score = 0.0
        # Strict '>' keeps the earliest key when several share the top score.
        if score > best_score:
            best_score = score
            best = (k, v)

    if best is not None and best_score >= 0.35:
        return best[0], best[1]
    return None
| # ------------------------- | |
| # Small safety helpers | |
| # ------------------------- | |
| _POSITION_KEY_BLACKLIST = ["attendance", "attendance list", "attendees", "attendance list (names and position titles)"] | |
| def key_is_forbidden_for_position(key: Optional[str]) -> bool: | |
| if not key: | |
| return False | |
| lk = key.lower() | |
| for b in _POSITION_KEY_BLACKLIST: | |
| if b in lk: | |
| return True | |
| return False |