PDF-Data_Extractor / hf_utils.py
Shami96's picture
Create hf_utils.py
97cac57 verified
raw
history blame
7.35 kB
# hf_utils.py
"""
Shared helpers for HF red-text extraction / matching.
Usage:
from hf_utils import (
is_red_font, normalize_text, normalize_header_text,
flatten_json, find_matching_json_key_and_value,
get_clean_text, has_red_text, extract_red_text_segments,
replace_red_text_in_cell, key_is_forbidden_for_position
)
"""
import re
from typing import Any, Dict, Optional, Tuple
from docx.shared import RGBColor
# -------------------------
# Red color detection
# -------------------------
# Clark-notation namespace prefix used for WordprocessingML element/attribute names.
_WORDML_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'


def _is_reddish(r, g, b) -> bool:
    """Return True when the (r, g, b) triple reads as a shade of red.

    Tolerant heuristic rather than a strict (255, 0, 0) match: the red channel
    must be at least 160, green and blue at most 120, and red must exceed each
    of them by at least 30.
    """
    return r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30


def _rgb_channel(rgb, short_name, long_name):
    """Read one colour channel from *rgb* by attribute; None when absent.

    Uses explicit ``is None`` checks (not ``or``) so that a legitimate channel
    value of 0 is not mistaken for "missing".
    """
    value = getattr(rgb, short_name, None)
    if value is None:
        value = getattr(rgb, long_name, None)
    return value


def is_red_font(run) -> bool:
    """Robust red-color detection for docx.run objects.

    Two sources are consulted in order; the first one that reports a red-ish
    colour wins:

    1. ``run.font.color.rgb`` -- read as a 3-item sequence or, when indexing
       fails, through ``r``/``g``/``b`` (or ``red``/``green``/``blue``)
       attributes.
    2. The ``w:color`` child of ``run._element.rPr``, when its ``w:val``
       attribute is a 6-digit hex string.

    The match is tolerant to slightly different reds (not strict 255,0,0);
    see ``_is_reddish`` for the thresholds.

    Args:
        run: A run-like object exposing ``font`` and/or ``_element``.

    Returns:
        True if either source yields a red-ish colour, otherwise False.
        Any error raised while probing a source is swallowed and treated as
        "this source says nothing", so the function never raises.
    """
    # Source 1: the colour exposed through the font API.
    try:
        col = getattr(run.font, "color", None)
        rgb = getattr(col, "rgb", None) if col is not None else None
        if rgb:
            try:
                # rgb may be sequence-like
                r, g, b = rgb[0], rgb[1], rgb[2]
            except Exception:
                # fallback attribute access
                r = _rgb_channel(rgb, "r", "red")
                g = _rgb_channel(rgb, "g", "green")
                b = _rgb_channel(rgb, "b", "blue")
            # An unreadable channel means this source is inconclusive; fall
            # through to the XML check instead of comparing against None.
            if r is not None and g is not None and b is not None:
                if _is_reddish(r, g, b):
                    return True
    except Exception:
        # Deliberate best-effort: a failure here must not prevent the XML check.
        pass

    # Source 2: fallback to raw XML color code if present.
    try:
        rPr = run._element.rPr
        if rPr is not None:
            clr = rPr.find(_WORDML_NS + 'color')
            if clr is not None:
                val = clr.get(_WORDML_NS + 'val')
                # Only plain RRGGBB hex values are parsed; anything else is skipped.
                if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                    rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
                    if _is_reddish(rr, gg, bb):
                        return True
    except Exception:
        # Deliberate best-effort: an unreadable element simply means "not red".
        pass
    return False
# -------------------------
# Text normalization
# -------------------------
def normalize_text(s: Optional[str]) -> str:
    """Return a cleaned, single-spaced version of *s*.

    ``None`` yields an empty string; any other value is converted with
    ``str()``. En and em dashes become a plain hyphen, every character that is
    not a word character, whitespace, or one of ``# % / - ( )`` becomes a
    space, and runs of whitespace collapse to one space with the ends trimmed.
    """
    if s is None:
        return ""
    dash_table = str.maketrans({'\u2013': '-', '\u2014': '-'})
    text = str(s).translate(dash_table)
    # Blank out everything outside the allowed character set.
    text = re.sub(r'[^\w\s\#\%\/\-\(\)]', ' ', text)
    return re.sub(r'\s+', ' ', text).strip()
# Spelling variants rewritten to one canonical form, applied in this order
# after the text has been lowercased and stripped of punctuation.
_HEADER_ALIASES = (
    ('registrationno', 'registration number'),
    ('registrationnumber', 'registration number'),
    ('sub contracted', 'sub contractor'),
)


def normalize_header_text(s: Optional[str]) -> str:
    """Return a lowercase, punctuation-free form of a header for comparison.

    Steps, in order:

    1. Falsy input (``None``, ``""``) yields ``""``.
    2. Non-string input is converted with ``str()`` (as ``normalize_text``
       does) instead of raising inside ``re.sub``.
    3. Parenthesised groups ``(...)`` are removed.
    4. Every character that is not a word character, whitespace, ``#`` or
       ``%`` becomes a space. This covers ``/``, ``\\`` and ``-``, so
       ``"Sub-Contractor"`` already becomes ``"sub contractor"`` here.
    5. Whitespace is collapsed, the ends trimmed, and the text lowercased.
    6. The variants in ``_HEADER_ALIASES`` are rewritten to their canonical
       form.

    Args:
        s: Header text, or ``None``.

    Returns:
        The normalized header; empty string for falsy input.
    """
    if not s:
        return ""
    text = s if isinstance(s, str) else str(s)
    text = re.sub(r'\([^)]*\)', ' ', text)
    text = re.sub(r'[^\w\s\#\%]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip().lower()
    for variant, canonical in _HEADER_ALIASES:
        text = text.replace(variant, canonical)
    return text
# -------------------------
# docx helpers
# -------------------------
def get_clean_text(cell) -> str:
    """Return the text of *cell* as one string.

    Run texts are concatenated within each paragraph, the paragraph strings
    are joined with a single space, and the result is stripped at both ends.
    """
    paragraph_texts = (
        "".join(piece.text for piece in para.runs)
        for para in cell.paragraphs
    )
    return " ".join(paragraph_texts).strip()
def has_red_text(cell) -> bool:
    """Report whether *cell* contains a red run with non-whitespace text.

    Runs are checked with ``is_red_font``. A run whose check raises is
    skipped rather than aborting the scan.
    """
    def _is_visible_red(candidate) -> bool:
        # Any failure while inspecting a single run counts as "not red".
        try:
            return bool(is_red_font(candidate) and candidate.text.strip())
        except Exception:
            return False

    return any(
        _is_visible_red(candidate)
        for para in cell.paragraphs
        for candidate in para.runs
    )
def extract_red_text_segments(cell):
    """Collect maximal groups of consecutive red runs from *cell*.

    A run belongs to a segment when ``is_red_font`` accepts it and its text is
    not whitespace-only; any other run (including a whitespace-only red run)
    closes the segment in progress. Segments never span paragraphs.

    Returns:
        A list of dicts, in document order, each with:
        ``'text'`` -- the concatenated text of the segment's runs;
        ``'runs'`` -- a list of ``(paragraph_idx, run_idx, run)`` tuples;
        ``'paragraph_idx'`` -- index of the paragraph holding the segment.
    """
    found = []
    for para_no, para in enumerate(cell.paragraphs):
        pieces = []   # texts of the runs in the segment being built
        members = []  # (paragraph index, run index, run) for those runs
        for run_no, run in enumerate(para.runs):
            if is_red_font(run) and run.text.strip():
                pieces.append(run.text)
                members.append((para_no, run_no, run))
                continue
            # Non-red (or blank) run: close the open segment, if any.
            if members:
                found.append({'text': "".join(pieces), 'runs': members, 'paragraph_idx': para_no})
                pieces, members = [], []
        # A segment that reaches the end of the paragraph is closed here.
        if members:
            found.append({'text': "".join(pieces), 'runs': members, 'paragraph_idx': para_no})
    return found
def replace_red_text_in_cell(cell, replacement_text: str) -> int:
    """Replace all red text in *cell* with *replacement_text*.

    The first run of the first red segment receives the replacement text and
    an attempt is made to recolour it to RGB (0, 0, 0); a failure to set the
    colour is ignored. Every other run belonging to any red segment is set to
    the empty string.

    Returns:
        1 when at least one red segment was found and replaced, otherwise 0.
    """
    segments = extract_red_text_segments(cell)
    if not segments:
        return 0

    lead_segment, *later_segments = segments
    anchor = lead_segment['runs'][0][2]
    anchor.text = replacement_text
    try:
        anchor.font.color.rgb = RGBColor(0, 0, 0)
    except Exception:
        # Recolouring is best-effort; the text replacement still stands.
        pass

    # Blank every remaining red run: the rest of the lead segment first,
    # then all runs of the later segments.
    leftovers = lead_segment['runs'][1:] + [
        entry for seg in later_segments for entry in seg['runs']
    ]
    for _, _, stale_run in leftovers:
        stale_run.text = ''
    return 1
# -------------------------
# JSON helpers & matching
# -------------------------
def flatten_json(y: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
    """Flatten a nested dict into a single-level dict.

    Every non-dict value is stored twice: under its dotted path
    (``"section.field"``, prepended with *prefix* when given) and under its
    bare key (``"field"``) as a convenience alias. Nested dicts are only
    descended into; they get no entry of their own, so an empty dict
    contributes nothing.

    Collision rules:
      * A dotted path always wins over an alias with the same name, so a
        top-level ``"name"`` is never overwritten by ``"section.name"``'s
        alias, regardless of key order.
      * When several nested leaves share a bare key, the alias holds the value
        of the last one visited.

    Args:
        y: The (possibly nested) dict to flatten.
        prefix: Dotted path prepended to every full key; empty for none.

    Returns:
        A new flat dict; keys appear in first-visited order.
    """
    out: Dict[str, Any] = {}
    full_paths = set()  # keys of ``out`` that are real dotted paths, not aliases

    def _walk(node: Dict[str, Any], path: str) -> None:
        for key, val in node.items():
            new_key = f"{path}.{key}" if path else key
            if isinstance(val, dict):
                _walk(val, new_key)
                continue
            out[new_key] = val
            full_paths.add(new_key)
            # The bare-key alias must not clobber an entry that is itself a
            # full path (e.g. a top-level key with the same name).
            if key not in full_paths:
                out[key] = val

    _walk(y, prefix)
    return out
def _match_tokens(text: str) -> set:
    """Return the distinct lowercase word tokens of *text* longer than 2 chars."""
    return {w for w in re.findall(r'\b\w+\b', text.lower()) if len(w) > 2}


def find_matching_json_key_and_value(field_name: str, flat_json: Dict[str, Any]) -> Optional[Tuple[str, Any]]:
    """Find the entry of *flat_json* whose key best matches *field_name*.

    Strategies are tried in order and the first hit is returned:

    1. Exact match on the stripped field name.
    2. Case-insensitive match.
    3. Equality after ``normalize_header_text``. Skipped when the field
       normalizes to an empty string, so that e.g. ``"(notes)"`` does not
       match an unrelated key that also normalizes to nothing.
    4. Fuzzy score over word tokens longer than two characters. For keys that
       share tokens with the field: ``0.6 * Jaccard similarity + 0.4 *
       fraction of field tokens covered``. For keys that share none:
       ``0.4 * (shorter length / longer length)`` when one normalized string
       contains the other, else 0. The first key with the highest score wins,
       provided that score is at least 0.35.

    Args:
        field_name: Label to look up; falsy values return ``None``.
        flat_json: Flat mapping of keys to values (keys are expected to be
            strings, since ``.lower()`` is called on them).

    Returns:
        ``(key, value)`` for the matched entry, or ``None`` when nothing
        matches well enough.
    """
    if not field_name:
        return None
    fn = field_name.strip()

    # 1. Exact match.
    if fn in flat_json:
        return fn, flat_json[fn]

    # 2. Case-insensitive match (lowercase the field once, not per key).
    fn_lower = fn.lower()
    for k in flat_json:
        if k.lower() == fn_lower:
            return k, flat_json[k]

    # 3. Normalized-header match; an empty normalized field carries no
    #    information and must not match keys that also normalize to "".
    clean_field = normalize_header_text(fn)
    if clean_field:
        for k in flat_json:
            if normalize_header_text(k) == clean_field:
                return k, flat_json[k]

    # 4. Fuzzy token scoring.
    field_tokens = _match_tokens(fn)
    if not field_tokens:
        return None
    best = None
    best_score = 0.0
    for k, v in flat_json.items():
        key_tokens = _match_tokens(k)
        if not key_tokens:
            continue
        common = field_tokens & key_tokens
        if common:
            sim = len(common) / len(field_tokens | key_tokens)
            cov = len(common) / len(field_tokens)
            score = (0.6 * sim) + (0.4 * cov)
        else:
            # No shared tokens: fall back to substring containment between the
            # normalized forms, weighted by how similar their lengths are.
            nk = normalize_header_text(k)
            if clean_field and nk and (clean_field in nk or nk in clean_field):
                substring_score = min(len(clean_field), len(nk)) / max(len(clean_field), len(nk))
                score = 0.4 * substring_score
            else:
                score = 0.0
        # Strict '>' keeps the earliest key among equal scores.
        if score > best_score:
            best_score = score
            best = (k, v)
    if best is not None and best_score >= 0.35:
        return best
    return None
# -------------------------
# Small safety helpers
# -------------------------
# Lowercase fragments; a key containing any of them is rejected by
# key_is_forbidden_for_position.
_POSITION_KEY_BLACKLIST = [
    "attendance",
    "attendance list",
    "attendees",
    "attendance list (names and position titles)",
]


def key_is_forbidden_for_position(key: Optional[str]) -> bool:
    """Return True when *key*, lowercased, contains a blacklisted fragment.

    Falsy keys (``None``, ``""``) are never forbidden.
    """
    if not key:
        return False
    lowered = key.lower()
    return any(fragment in lowered for fragment in _POSITION_KEY_BLACKLIST)