PDF-Data_Extractor / updated_word.py
Shami96's picture
Update updated_word.py
560885b verified
raw
history blame
78 kB
#!/usr/bin/env python3
"""
pipeline.py — safer matching and operator-declaration protections
Key improvements:
- find_matching_json_key_and_value() returns (key, value) so callers can accept/reject by key.
- Higher fuzzy thresholds for risky substitutions.
- Operator Declaration: avoid using attendance lists / unrelated keys for Position Title.
- Vehicle header mapping: stronger normalized substring/ token matching for long headers.
- Preserves existing logging and all previous handlers/logic.
"""
import json
from docx import Document
from docx.shared import RGBColor
import re
from typing import Any, Tuple, Optional
# ============================================================================
# Heading patterns for document structure detection (unchanged)
# ============================================================================
# Regular expressions for the headings this module looks for, grouped by level:
#   "main" - report title variants
#   "sub"  - section headings inside the report
# Whitespace between words is matched with \s+ so any run of whitespace
# between the words still matches.
HEADING_PATTERNS = {
    "main": [
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT",
    ],
    "sub": [
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined",
    ],
}
# ============================================================================
# Utility helpers
# ============================================================================
# Tally of header texts that could not be mapped: header text -> times seen.
_unmatched_headers = {}


def record_unmatched_header(header: str):
    """Increment the module-level tally for ``header``.

    Falsy headers (``None``, empty string) are ignored.
    """
    if header:
        seen_so_far = _unmatched_headers.get(header, 0)
        _unmatched_headers[header] = seen_so_far + 1
def load_json(filepath):
    """Parse the UTF-8 encoded JSON file at ``filepath`` and return the result."""
    with open(filepath, 'r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed
def flatten_json(y, prefix=''):
    """Flatten nested dictionaries into a single-level dictionary.

    Nested dicts are walked recursively. Every non-dict value is stored twice:
    under its dotted path (``"section.field"``) and under its bare key
    (``"field"``). When the same bare key occurs in several sections, the
    entry written last overwrites the earlier ones.

    Args:
        y: Dictionary to flatten.
        prefix: Dotted path of the enclosing keys; empty at the top level.

    Returns:
        A new flat dictionary.
    """
    flat = {}
    for name, item in y.items():
        dotted = name if not prefix else f"{prefix}.{name}"
        if not isinstance(item, dict):
            flat[dotted] = item
            flat[name] = item  # short alias for lookups by bare field name
            continue
        flat.update(flatten_json(item, dotted))
    return flat
def is_red(run):
    """Tell whether ``run`` is coloured red.

    A run counts as red when its font colour has an ``rgb`` equal to
    ``RGBColor(255, 0, 0)`` or a ``theme_color`` equal to 1. A falsy colour
    object is returned as-is (falsy); any exception raised while inspecting
    the colour yields ``False``.
    """
    color = run.font.color
    try:
        if not color:
            return color
        rgb = getattr(color, "rgb", None)
        if rgb and rgb == RGBColor(255, 0, 0):
            return True
        # NOTE(review): confirm which theme colour the index 1 stands for.
        return getattr(color, "theme_color", None) == 1
    except Exception:
        return False
def get_value_as_string(value, field_name=""):
    """Convert a JSON value to the text that should be written into the document.

    * non-list values -> ``str(value)``
    * empty list -> ``""``
    * single-item list -> ``str`` of that item
    * longer list -> items joined with single spaces, except when
      ``field_name`` mentions "company number": then the list itself is
      returned unchanged so the caller can place the entries individually.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])
    # "company number" also covers "australian company number".
    if "company number" in field_name.lower():
        return value
    return " ".join(map(str, value))
def get_clean_text(cell):
    """Return the text of every run in ``cell`` concatenated, with outer whitespace stripped."""
    pieces = [run.text for paragraph in cell.paragraphs for run in paragraph.runs]
    return "".join(pieces).strip()
def has_red_text(cell):
    """Return True when any run in ``cell`` is red and has non-blank text."""
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
def has_red_text_in_paragraph(paragraph):
    """Return True when any run in ``paragraph`` is red and has non-blank text."""
    return any(is_red(run) and run.text.strip() for run in paragraph.runs)
def normalize_header_text(s: str) -> str:
    """Reduce a table header to a canonical, comparable form.

    Steps, in order: drop parenthesised content, turn "/" into a space,
    replace every character that is not a word character, whitespace, "#" or
    "%" with a space, collapse whitespace, lower-case, then apply a few
    canonical spellings. Falsy input yields "".
    """
    if not s:
        return ""
    without_parens = re.sub(r'\([^)]*\)', ' ', s)
    without_slashes = without_parens.replace("/", " ")
    cleaned = re.sub(r'[^\w\s\#\%]', ' ', without_slashes)
    normalized = re.sub(r'\s+', ' ', cleaned).strip().lower()
    # Canonical spellings, applied in this order.
    canonical_forms = (
        ('registrationno', 'registration number'),
        ('registrationnumber', 'registration number'),
        ('sub-contractor', 'sub contractor'),
        ('sub contracted', 'sub contractor'),
    )
    for variant, canonical in canonical_forms:
        normalized = normalized.replace(variant, canonical)
    return normalized.strip()
# ============================================================================
# JSON matching functions
# - find_matching_json_value: (keeps behavior used elsewhere)
# - find_matching_json_key_and_value: returns (key, value) so callers can
# decide whether to use an entry based on the matched key.
# ============================================================================
def find_matching_json_value(field_name, flat_json):
    """Legacy API: look up ``field_name`` and return only the matched value.

    Delegates to ``find_matching_json_key_and_value`` and drops the key.
    Returns ``None`` when nothing matches.
    """
    match = find_matching_json_key_and_value(field_name, flat_json)
    if not match:
        return None
    return match[1]
def find_matching_json_key_and_value(field_name, flat_json) -> Optional[Tuple[str, Any]]:
    """
    Find the JSON entry that corresponds to ``field_name``.

    Strategies are tried in this order and the first hit wins:
      1. exact key
      2. case-insensitive key
      3. for "print name" only: first key mentioning "operator" and
         "print name", otherwise first key mentioning "auditor" and "name"
      4. last dotted component of a nested key equals the field name
      5. equality after replacing punctuation with spaces
      6. fuzzy token scoring (words longer than two characters); a key with
         no shared words can still score through normalized-substring
         containment. The best score must reach 0.35.

    Returns (matched_key, matched_value), or None when the field name is
    blank or nothing matches.
    """
    field_name = (field_name or "").strip()
    if not field_name:
        return None

    def squash(text):
        # lower-case, punctuation -> space, collapse whitespace
        spaced = re.sub(r'[^\w\s]', ' ', text.lower()).strip()
        return re.sub(r'\s+', ' ', spaced)

    def long_words(text):
        # words of three or more characters, lower-cased
        return {word.lower() for word in re.findall(r'\b\w+\b', text) if len(word) > 2}

    # 1. Exact key
    if field_name in flat_json:
        print(f" βœ… Direct match found for key '{field_name}'")
        return field_name, flat_json[field_name]

    wanted = field_name.lower()

    # 2. Case-insensitive key
    for key, value in flat_json.items():
        if key.lower() == wanted:
            print(f" βœ… Case-insensitive match found for key '{field_name}' -> '{key}'")
            return key, value

    # 3. "print name": operator entry preferred over auditor entry
    if wanted.strip() == "print name":
        operator_key = next(
            (k for k in flat_json if "operator" in k.lower() and "print name" in k.lower()),
            None,
        )
        if operator_key is not None:
            print(f" βœ… Operator Print Name match: '{field_name}' -> '{operator_key}'")
            return operator_key, flat_json[operator_key]
        # "print name" contains "name", so testing for "name" covers both forms
        auditor_key = next(
            (k for k in flat_json if "auditor" in k.lower() and "name" in k.lower()),
            None,
        )
        if auditor_key is not None:
            print(f" βœ… Auditor Name match: '{field_name}' -> '{auditor_key}'")
            return auditor_key, flat_json[auditor_key]

    # 4. Suffix of a dotted key
    for key, value in flat_json.items():
        if '.' in key and key.split('.')[-1].lower() == wanted:
            print(f" βœ… Suffix match found for key '{field_name}' -> '{key}'")
            return key, value

    # 5. Equality once punctuation is ignored
    clean_field = squash(field_name)
    for key, value in flat_json.items():
        if squash(key) == clean_field:
            print(f" βœ… Clean match found for key '{field_name}' -> '{key}'")
            return key, value

    # 6. Fuzzy token scoring
    field_words = long_words(field_name)
    if not field_words:
        return None

    best_key, best_value, best_score = None, None, 0.0
    for key, value in flat_json.items():
        key_words = long_words(key)
        if not key_words:
            continue
        shared = field_words & key_words
        if shared:
            similarity = len(shared) / len(field_words | key_words)
            coverage = len(shared) / len(field_words)
            score = (similarity * 0.6) + (coverage * 0.4)
        else:
            # weaker fallback: one normalized form contained in the other
            norm_field = normalize_header_text(field_name)
            norm_key = normalize_header_text(key)
            if norm_field and norm_key and (norm_field in norm_key or norm_key in norm_field):
                length_ratio = min(len(norm_field), len(norm_key)) / max(len(norm_field), len(norm_key))
                score = 0.4 * length_ratio
            else:
                score = 0.0
        if score > best_score:
            best_key, best_value, best_score = key, value, score

    if best_key and best_score >= 0.35:
        print(f" βœ… Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_key, best_value

    print(f" ❌ No match found for '{field_name}'")
    return None
# ============================================================================
# Red text helpers (unchanged except kept robust)
# ============================================================================
def extract_red_text_segments(cell):
    """Group the red runs of ``cell`` into contiguous segments.

    Within each paragraph, consecutive red runs with non-empty text are
    collected into one segment; a non-red run closes the open segment. Red
    runs with empty text are skipped without closing it. Segments never span
    paragraphs.

    Returns:
        A list of dicts with keys ``'text'`` (concatenated run text),
        ``'runs'`` (list of ``(paragraph_index, run_index, run)`` tuples) and
        ``'paragraph_idx'``.
    """
    segments = []
    for p_index, para in enumerate(cell.paragraphs):
        pending_text = ""
        pending_runs = []
        for r_index, run in enumerate(para.runs):
            if not is_red(run):
                # a non-red run ends whatever segment is open
                if pending_runs:
                    segments.append({'text': pending_text, 'runs': pending_runs, 'paragraph_idx': p_index})
                    pending_text, pending_runs = "", []
                continue
            if run.text:
                pending_text += run.text
                pending_runs.append((p_index, r_index, run))
        # segment still open at the end of the paragraph
        if pending_runs:
            segments.append({'text': pending_text, 'runs': pending_runs, 'paragraph_idx': p_index})
    return segments
def replace_all_red_segments(red_segments, replacement_text):
    """Replace all red segments of a cell with ``replacement_text``.

    The first run of the first segment receives the first line of the
    replacement and is recoloured black; every other red run is blanked.
    Further non-blank lines of the replacement are appended to that same run,
    each preceded by a line break, so the text stays in order.

    The previous implementation called ``add_run`` on the run's parent XML
    element, which has no such method. The resulting exception always sent
    multi-line replacements into the fallback that joins the lines with
    spaces, so line breaks were never produced.

    Args:
        red_segments: Segments as produced by ``extract_red_text_segments``.
        replacement_text: New text; ``"\\n"`` separates lines.

    Returns:
        1 when the first segment had a run to write into, otherwise 0.
    """
    if not red_segments:
        return 0

    # str.split returns a one-item list when there is no newline.
    replacement_lines = replacement_text.split('\n')
    first_runs = red_segments[0]['runs']
    replacements_made = 0

    if first_runs:
        first_run = first_runs[0][2]
        first_run.text = replacement_lines[0]
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        replacements_made = 1
        for _, _, run in first_runs[1:]:
            run.text = ''

    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''

    extra_lines = [line.strip() for line in replacement_lines[1:] if line.strip()]
    if extra_lines and first_runs:
        first_run = first_runs[0][2]
        try:
            for line in extra_lines:
                first_run.add_break()
                first_run.add_text(line)
        except Exception:
            # Best effort: keep all the content on one line rather than lose it.
            first_run.text = ' '.join(replacement_lines)
            first_run.font.color.rgb = RGBColor(0, 0, 0)

    return replacements_made
def replace_single_segment(segment, replacement_text):
    """Write ``replacement_text`` into one red segment.

    The segment's first run receives the text and is recoloured black; the
    remaining runs are blanked. Returns False when the segment has no runs,
    True otherwise.
    """
    runs = segment['runs']
    if not runs:
        return False
    lead_run = runs[0][2]
    lead_run.text = replacement_text
    lead_run.font.color.rgb = RGBColor(0, 0, 0)
    for entry in runs[1:]:
        entry[2].text = ''
    return True
def replace_red_text_in_cell(cell, replacement_text):
    """Replace the red text of ``cell`` with ``replacement_text``.

    Returns 0 when the cell has no red segments; otherwise returns whatever
    ``replace_all_red_segments`` reports.
    """
    segments = extract_red_text_segments(cell)
    return replace_all_red_segments(segments, replacement_text) if segments else 0
# ============================================================================
# Specialized handlers (vehicle, attendance, management, operator) with fixes
# ============================================================================
def handle_australian_company_number(row, company_numbers):
    """Place one company-number entry per cell, starting at the row's second cell.

    Entry ``i`` (0-based) targets cell ``i + 1``. A cell is only written when
    it exists and currently holds red text.

    Returns:
        Sum of the replacement counts reported for the written cells.
    """
    total = 0
    for position, digit in enumerate(company_numbers, start=1):
        if position >= len(row.cells):
            continue
        target = row.cells[position]
        if not has_red_text(target):
            continue
        total += replace_red_text_in_cell(target, str(digit))
        print(f" -> Placed digit '{digit}' in cell {position + 1}")
    return total
# Substrings that mark a flattened JSON key as a vehicle-table column.
_VEHICLE_COLUMN_HINTS = (
    "registration number", "sub-contractor", "weight verification", "rfs suspension",
    "trip records", "fault recording", "fault repair", "daily checks", "roadworthiness",
)


def _is_vehicle_table_shape(value):
    """True for a non-empty dict, or a non-empty list whose first item is a dict."""
    if isinstance(value, dict):
        return bool(value)
    return isinstance(value, list) and bool(value) and isinstance(value[0], dict)


def _select_vehicle_section(flat_json):
    """Pick the JSON value that holds the vehicle table, or None.

    Keys mentioning 'registration' or 'vehicle' are tried longest first, but
    only a value that is already table-shaped is accepted. Otherwise the
    column dict is rebuilt from flattened keys that look like vehicle columns.
    """
    named = sorted(
        ((k, v) for k, v in flat_json.items() if 'registration' in k.lower() or 'vehicle' in k.lower()),
        key=lambda kv: -len(kv[0]),
    )
    for _key, value in named:
        if _is_vehicle_table_shape(value):
            return value

    columns = {}
    for key, value in flat_json.items():
        lowered = key.lower()
        if any(hint in lowered for hint in _VEHICLE_COLUMN_HINTS):
            # last dotted component is the column label
            columns[key.split(".")[-1]] = value
    if columns:
        print(f" βœ… Found vehicle data from flattened keys: {list(columns.keys())}")
        return columns
    return None


def _match_vehicle_header(norm_header, master_labels):
    """Return ``(label, score)`` for the JSON column that best fits a normalized header."""
    if norm_header in master_labels:
        return master_labels[norm_header], 1.0

    best_match, best_score = None, 0.0
    header_tokens = {t for t in norm_header.split() if len(t) > 2}
    for norm_key, orig_label in master_labels.items():
        key_tokens = {t for t in norm_key.split() if len(t) > 2}
        if not key_tokens:
            continue
        common = header_tokens & key_tokens
        if common:
            score = len(common) / max(1, len(header_tokens | key_tokens))
        elif norm_header in norm_key or norm_key in norm_header:
            # substring fallback, scored by length ratio
            score = min(len(norm_header), len(norm_key)) / max(len(norm_header), len(norm_key))
        else:
            score = 0.0
        if score > best_score:
            best_match, best_score = orig_label, score

    if not best_match:
        # last resort: both sides mention roadworthiness
        for norm_key, orig_label in master_labels.items():
            if 'roadworthiness' in norm_header and 'roadworthiness' in norm_key:
                return orig_label, 0.65
    return best_match, best_score


def handle_vehicle_registration_table(table, flat_json):
    """
    Fill the vehicle registration table from the JSON data.

    Steps: choose the vehicle section, pivot a list of row dicts into
    column -> values, locate the header row, map header cells to JSON columns
    (exact normalized match, then token overlap, then substring), and write
    one table row per data row. Rows are added to the table when it is too
    short. A cell is written only when it is empty or holds red text.

    Fix over the previous version: the longest 'registration'/'vehicle' key
    used to be accepted whatever its value. A plain list of strings or a
    scalar then collapsed to zero columns and the flattened-keys fallback was
    never tried. Candidates are now accepted only when already table-shaped.

    Returns:
        Number of replacements made (0 when data, header row or column
        mapping cannot be found).
    """
    replacements_made = 0

    vehicle_section = _select_vehicle_section(flat_json)
    if not vehicle_section:
        print(" ❌ Vehicle registration data not found in JSON")
        return 0

    # A list of row dicts becomes column -> list of values.
    if isinstance(vehicle_section, list):
        pivoted = {}
        for entry in vehicle_section:
            if not isinstance(entry, dict):
                continue
            for k, v in entry.items():
                pivoted.setdefault(k, []).append(v)
        vehicle_section = pivoted
    print(f" βœ… Found vehicle registration data with {len(vehicle_section)} columns")

    # Find header row (look for registration + number or reg no)
    header_row_idx = -1
    header_row = None
    for row_idx, row in enumerate(table.rows):
        row_text = " ".join(get_clean_text(cell).lower() for cell in row.cells)
        if (("registration" in row_text and "number" in row_text)
                or "reg no" in row_text or "registration no" in row_text):
            header_row_idx = row_idx
            header_row = row
            break
    if header_row_idx == -1:
        print(" ❌ Could not find header row in vehicle table")
        return 0
    print(f" βœ… Found header row at index {header_row_idx}")

    # normalized label -> original JSON key; on collision the longer key wins
    master_labels = {}
    for orig_key in vehicle_section:
        norm = normalize_header_text(str(orig_key))
        if not norm:
            continue
        current = master_labels.get(norm)
        if current is None or len(str(orig_key)) > len(str(current)):
            master_labels[norm] = orig_key

    # Map header cells to JSON columns
    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        if not header_text:
            continue
        if header_text.lower() in {"no", "no.", "#"}:
            continue  # row-number column
        norm_header = normalize_header_text(header_text)
        best_match, best_score = _match_vehicle_header(norm_header, master_labels)
        if best_match and best_score >= 0.30:
            column_mapping[col_idx] = best_match
            print(f" πŸ“Œ Column {col_idx}: '{header_text}' -> '{best_match}' (norm:'{norm_header}' score:{best_score:.2f})")
        else:
            print(f" ⚠️ No mapping found for '{header_text}' (norm:'{norm_header}')")
            record_unmatched_header(header_text)
    if not column_mapping:
        print(" ❌ No column mappings found")
        return 0

    # The longest list-valued column decides how many rows are written.
    max_data_rows = max(
        (len(data) for data in vehicle_section.values() if isinstance(data, list)),
        default=0,
    )
    print(f" πŸ“Œ Need to populate {max_data_rows} data rows")

    # Populate or add rows
    for data_row_index in range(max_data_rows):
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f" ⚠️ Row {table_row_idx + 1} doesn't exist, adding one")
            table.add_row()
        row = table.rows[table_row_idx]
        cells = row.cells  # read once per row
        print(f" πŸ“Œ Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx >= len(cells):
                continue
            column_data = vehicle_section.get(json_key, [])
            if not isinstance(column_data, list) or data_row_index >= len(column_data):
                continue
            cell = cells[col_idx]
            replacement_value = str(column_data[data_row_index])
            if not get_clean_text(cell).strip():
                cell.text = replacement_value
                replacements_made += 1
                print(f" -> Added '{replacement_value}' to empty cell (col '{json_key}')")
            elif has_red_text(cell):
                cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f" -> Replaced red text with '{replacement_value}' (col '{json_key}')")
    return replacements_made
def handle_attendance_list_table_enhanced(table, flat_json):
    """Replace the red placeholder of an attendance-list table with JSON data.

    The table is treated as an attendance table when one of its first three
    rows mentions an attendance heading. The attendance value is looked up in
    ``flat_json`` under several key spellings. The first cell whose red text
    looks like attendance data (mentions manager/director/auditor or contains
    a dash) is rewritten, one attendee per line.

    Fix over the previous version: the en-dash indicator was a mis-encoded
    three-character literal, so red text that used a real en dash as the
    name/position separator was never recognised. It is now ``'\\u2013'``.

    Returns:
        Number of replacements made; 0 when the table is not an attendance
        table, no data is found, or no suitable red cell exists.
    """
    replacements_made = 0

    attendance_patterns = ["attendance list", "names and position titles", "attendees"]
    found_attendance_row = None
    for row_idx, row in enumerate(table.rows[:3]):
        for cell_idx, cell in enumerate(row.cells):
            cell_text = get_clean_text(cell).lower()
            if any(pattern in cell_text for pattern in attendance_patterns):
                found_attendance_row = row_idx
                print(f" 🎯 ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}")
                break
        if found_attendance_row is not None:
            break
    if found_attendance_row is None:
        return 0

    attendance_value = None
    attendance_search_keys = [
        "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)",
        "Attendance List (Names and Position Titles)",
        "attendance list",
        "attendees",
    ]
    print(" πŸ” Searching for attendance data in JSON...")
    for search_key in attendance_search_keys:
        kv = find_matching_json_key_and_value(search_key, flat_json)
        if kv:
            attendance_value = kv[1]
            print(f" βœ… Found attendance data with key: '{kv[0]}'")
            print(f" πŸ“Š Raw value: {attendance_value}")
            break
    if attendance_value is None:
        print(" ❌ No attendance data found in JSON")
        return 0

    # Role words or a dash (en dash or hyphen) mark red text as attendance data.
    attendance_indicators = ('manager', 'director', 'auditor', '\u2013', '-')

    target_cell = None
    print(" πŸ” Scanning ALL cells in attendance table for red text...")
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if not has_red_text(cell):
                continue
            red_text = "".join(
                run.text
                for paragraph in cell.paragraphs
                for run in paragraph.runs
                if is_red(run)
            )
            if not red_text.strip():
                continue
            print(f" 🎯 Found red text in row {row_idx + 1}, cell {cell_idx + 1}")
            print(f" πŸ“‹ Red text content: '{red_text[:60]}...'")
            red_lower = red_text.lower()
            if any(ind in red_lower for ind in attendance_indicators):
                target_cell = cell
                print(" βœ… This looks like attendance data - using this cell")
                break
        if target_cell is not None:
            break
    if target_cell is None:
        print(" ⚠️ No red text found that looks like attendance data")
        return 0

    if has_red_text(target_cell):
        print(" πŸ”§ Replacing red text with properly formatted attendance list...")
        if isinstance(attendance_value, list):
            attendance_list = [str(item).strip() for item in attendance_value if str(item).strip()]
        else:
            attendance_list = [str(attendance_value).strip()]
        print(" πŸ“ Attendance items to add:")
        for i, item in enumerate(attendance_list):
            print(f" {i+1}. {item}")
        # one attendee per line
        replacement_text = "\n".join(attendance_list)
        cell_replacements = replace_red_text_in_cell(target_cell, replacement_text)
        replacements_made += cell_replacements
        print(f" βœ… Added {len(attendance_list)} attendance items")
        print(f" πŸ“Š Replacements made: {cell_replacements}")
    return replacements_made
# Keyword found in the table text -> JSON section name, in detection order.
_MGMT_TABLE_KINDS = (
    ("mass", "Mass Management Summary"),
    ("maintenance", "Maintenance Management Summary"),
    ("fatigue", "Fatigue Management Summary"),
)

# Keyword heuristic used when a section has to be collected from loose "std " keys.
_MGMT_FALLBACK_TERMS = {
    "mass": ("verification", "internal review"),
    "maintenance": ("daily check", "internal review"),
    "fatigue": ("internal review",),
}

# (markers looked for in the first cell, candidate JSON keys, label), tried in order.
_MGMT_STANDARD_RULES = (
    (("std 5", "verification"),
     ["Std 5. Verification", "Std 5 Verification", "Std 5", "Verification"],
     "Std 5. Verification"),
    (("std 6", "internal review"),
     ["Std 6. Internal Review", "Std 6 Internal Review", "Std 6", "Internal Review"],
     "Std 6. Internal Review"),
    (("std 1", "daily check"),
     ["Std 1. Daily Check", "Std 1 Daily Check", "Std 1", "Daily Check"],
     "Std 1. Daily Check"),
    (("std 7",),
     ["Std 7. Internal Review", "Std 7 Internal Review", "Std 7"],
     "Std 7. Internal Review"),
)


def _find_management_section(mgmt_type, flat_json):
    """Return a dict of standard -> value for ``mgmt_type``, or None.

    Tried in order: the exact key, a key that contains (or is contained in)
    the section name, the section's dotted children
    (``"<section>.<standard>"``), and finally loose ``"std "`` keys selected
    by keyword. Only non-empty dict values are accepted from the first two
    steps, so a scalar or list under a matching key cannot block the later
    steps.
    """
    direct = flat_json.get(mgmt_type)
    if isinstance(direct, dict) and direct:
        return direct

    mgmt_lower = mgmt_type.lower()
    for key, value in flat_json.items():
        key_lower = key.lower()
        if not (mgmt_lower in key_lower or key_lower in mgmt_lower):
            continue
        if isinstance(value, dict) and value:
            print(f" βœ… Found data using key variation: '{key}'")
            return value

    # Rebuild the section from flattened "<section>.<child>" entries.
    prefix = mgmt_lower + "."
    collected = {
        key[len(prefix):]: value
        for key, value in flat_json.items()
        if key[:len(prefix)].lower() == prefix and key[len(prefix):]
    }

    if not collected:
        wanted_terms = [
            term
            for kind, terms in _MGMT_FALLBACK_TERMS.items()
            if kind in mgmt_lower
            for term in terms
        ]
        collected = {
            key: value
            for key, value in flat_json.items()
            if "std " in key.lower() and any(term in key.lower() for term in wanted_terms)
        }

    if collected:
        print(f" βœ… Collected individual standard data: {list(collected.keys())}")
        return collected
    return None


def fix_management_summary_details_column(table, flat_json):
    """Fill the DETAILS column of a management summary table.

    The kind of summary (mass, maintenance, fatigue; several may apply) is
    detected from the text of the first three rows. For each kind the JSON
    section is located, then every row whose first cell names a known
    standard gets the red text in its second cell replaced with the matching
    value.

    Fix over the previous version: a non-dict value found by the direct or
    key-variation lookup used to stop the search, and the type was then
    skipped because a dict is required. Such values are now ignored so the
    fallbacks still run. The section is also rebuilt from its dotted child
    keys before the keyword heuristic is tried.

    Returns:
        Total number of replacements made.
    """
    replacements_made = 0
    print(" 🎯 FIX: Management Summary DETAILS column processing")

    # Determine which type(s) of management summary this is
    table_text = "".join(
        get_clean_text(cell).lower() + " "
        for row in table.rows[:3]
        for cell in row.cells
    )
    mgmt_types = [section for keyword, section in _MGMT_TABLE_KINDS if keyword in table_text]

    # Fallback detection
    if not mgmt_types:
        if any("std 5" in get_clean_text(c).lower() for r in table.rows for c in r.cells):
            mgmt_types.append("Mass Management Summary")
    if not mgmt_types:
        print(" ⚠️ Could not determine management summary type")
        return 0

    for mgmt_type in mgmt_types:
        print(f" βœ… Confirmed {mgmt_type} table processing")
        mgmt_data = _find_management_section(mgmt_type, flat_json)
        if not mgmt_data:
            print(f" ⚠️ No JSON management dict found for {mgmt_type}, skipping this type")
            continue

        for row in table.rows:
            cells = row.cells  # read once per row
            if len(cells) < 2:
                continue
            details_cell = cells[1]
            standard_text = get_clean_text(cells[0]).strip().lower()
            # Skip header rows
            if any(word in standard_text for word in ("standard", "requirement", "details")):
                continue
            # The first rule whose marker appears in the row decides the standard.
            rule = next(
                (r for r in _MGMT_STANDARD_RULES if any(marker in standard_text for marker in r[0])),
                None,
            )
            if rule is None or not has_red_text(details_cell):
                continue
            _markers, candidates, label = rule
            std_val = find_best_standard_value(mgmt_data, list(candidates))
            if not std_val:
                continue
            replacement_text = get_value_as_string(std_val, label)
            cell_replacements = replace_red_text_in_cell(details_cell, replacement_text)
            replacements_made += cell_replacements
            if cell_replacements:
                print(f" βœ… Replaced {label} details for {mgmt_type}")
    return replacements_made
def find_best_standard_value(mgmt_data, candidate_keys):
    """Find the value in ``mgmt_data`` that best matches one of ``candidate_keys``.

    ``candidate_keys`` is ordered from most to least preferred. An exact key
    match is tried first. After that, case-insensitive substring containment
    (either direction) is tried candidate by candidate, so a preferred
    candidate always beats a vaguer one regardless of the dict's key order.
    Empty keys and empty candidates are ignored in the substring pass because
    the empty string is contained in everything.

    Returns:
        The matched value, or None when nothing matches.
    """
    for candidate in candidate_keys:
        if candidate in mgmt_data:
            return mgmt_data[candidate]

    lowered_items = [(str(key).lower(), value) for key, value in mgmt_data.items()]
    for candidate in candidate_keys:
        wanted = candidate.lower()
        if not wanted:
            continue
        for key_lower, value in lowered_items:
            if key_lower and (wanted in key_lower or key_lower in wanted):
                return value
    return None
# ============================================================================
# Canonical operator declaration fixer — SAFER
# ============================================================================
def fix_operator_declaration_empty_values(table, flat_json):
"""
FIXED: Properly distinguish between auditor and operator data for Operator Declaration table
"""
replacements_made = 0
print(f" 🎯 FIX: Operator Declaration empty values processing")
# Verify this is actually an operator declaration table
table_context = ""
for row in table.rows:
for cell in row.cells:
table_context += get_clean_text(cell).lower() + " "
if not ("print name" in table_context and "position title" in table_context):
return 0
print(f" βœ… Confirmed Operator Declaration table")
def parse_name_and_position(value):
    """Split a raw JSON value into ``(name, position)``.

    Accepts ``None``, a string, or a list:
      * one-item list: "Name - Position" is split at the first " - ",
        otherwise the item is the name.
      * two-item list: two multi-word items without role words are treated
        as two attendee names and only the first is returned; otherwise the
        pair is ``[name, position]``.
      * longer list: if any item contains " - " only the first entry is
        parsed; otherwise the items are joined and parsed as one string.
    Strings are split on a spaced dash (hyphen, en dash or em dash), a
    comma, a pipe or a semicolon. If exactly one side contains a role word,
    that side is the position. Without a separator, a trailing role word is
    taken as the position.

    Fixes over the previous version: the dash character class was a
    mis-encoded literal, so en and em dashes never acted as separators; and
    attendance entries were split on every " - ", which truncated positions
    that themselves contain " - ".

    Returns:
        ``(name, position)``; either element may be None.
    """
    list_role_words = ('manager', 'director', 'auditor', 'officer')

    def mentions_any(text, words):
        lowered = text.lower()
        return any(word in lowered for word in words)

    if value is None:
        return None, None

    if isinstance(value, list):
        if not value:
            return None, None
        if len(value) == 1:
            single_item = str(value[0]).strip()
            if ' - ' in single_item:
                name, _, position = single_item.partition(' - ')
                return name.strip(), position.strip()
            return single_item, None
        if len(value) == 2:
            first = str(value[0]).strip()
            second = str(value[1]).strip()
            # Two multi-word entries without role words: likely two attendee names.
            if (' ' in first and ' ' in second
                    and not mentions_any(first, list_role_words)
                    and not mentions_any(second, list_role_words)):
                return first, None
            return first, second
        # Longer list in "Name - Position" style: use the first person only.
        if any(' - ' in str(item) for item in value):
            first_entry = str(value[0]).strip()
            if ' - ' in first_entry:
                name, _, position = first_entry.partition(' - ')
                return name.strip(), position.strip()
            return first_entry, None
        value = " ".join(str(v).strip() for v in value if str(v).strip())

    s = str(value).strip()
    if not s:
        return None, None

    # Separators in priority order; the dash class is hyphen, en dash, em dash.
    separators = (r'\s+[-\u2013\u2014]\s+', r'\s*,\s*', r'\s*\|\s*', r'\s*;\s*')
    parts = None
    for sep_pattern in separators:
        parts = re.split(sep_pattern, s)
        if len(parts) >= 2:
            break

    if parts and len(parts) >= 2:
        left = parts[0].strip()
        right = parts[1].strip()
        role_indicators = ('manager', 'auditor', 'owner', 'director', 'supervisor',
                           'coordinator', 'driver', 'operator', 'representative', 'chief',
                           'president', 'ceo', 'cfo', 'secretary', 'treasurer', 'officer',
                           'compliance')
        left_has_role = mentions_any(left, role_indicators)
        right_has_role = mentions_any(right, role_indicators)
        if left_has_role and not right_has_role:
            return right, left  # reversed order: "Position, Name"
        return left, right  # standard order, also the default when ambiguous

    # No separator: a trailing role word is taken as the position.
    tokens = s.split()
    if len(tokens) >= 2:
        trailing_roles = ('manager', 'auditor', 'owner', 'director', 'supervisor',
                          'coordinator', 'driver', 'operator', 'representative', 'chief',
                          'officer')
        if tokens[-1].lower() in trailing_roles:
            return " ".join(tokens[:-1]), tokens[-1]
    return s, None
def looks_like_role(s: str) -> bool:
    """Heuristic: does ``s`` look like a job role or position title?

    True when the lower-cased text contains a known role word, or when it is
    a short phrase: at most three words, more than one character, and at
    least one letter. Falsy input is never a role.
    """
    if not s:
        return False
    text = s.lower().strip()
    role_words = ('manager', 'auditor', 'owner', 'director', 'supervisor',
                  'coordinator', 'driver', 'operator', 'representative', 'chief',
                  'president', 'ceo', 'cfo', 'secretary', 'treasurer', 'officer')
    mentions_role = any(word in text for word in role_words)
    short_phrase = (
        len(text.split()) <= 3
        and any(ch.isalpha() for ch in text)
        and len(text) > 1
    )
    return mentions_role or short_phrase
def looks_like_person_name(s: str) -> bool:
    """Heuristically decide whether ``s`` could be a person's name.

    A value is rejected when it is empty, when it contains a company-style
    term (``Pty Ltd``, ``Inc``, ``Corp``, ``Company`` ...) as a whole word, or
    when it has no letters / is a single character. Everything else is
    accepted.

    Company terms are matched on word boundaries rather than as raw
    substrings, so ordinary names that merely contain such letters
    ("Vincent", "Prince", "Lincoln" all contain "inc") are no longer
    misclassified as companies.
    """
    if not s:
        return False
    s = s.strip()
    # Whole-word company markers. The longer spellings (incorporated,
    # corporation, corporate) are listed explicitly because they used to be
    # caught by the substring test on "inc" / "corp".
    company_pattern = (
        r"\b(?:pty\s+ltd|ltd|inc|incorporated|corp|corporation|corporate|"
        r"company|llc|plc)\b"
    )
    if re.search(company_pattern, s.lower()):
        return False
    # Must carry at least one letter and be longer than a single character.
    return len(s) > 1 and any(c.isalpha() for c in s)
# Process the table
for row_idx, row in enumerate(table.rows):
if len(row.cells) >= 2:
cell1_text = get_clean_text(row.cells[0]).strip().lower()
cell2_text = get_clean_text(row.cells[1]).strip().lower()
# Detect header row
if "print name" in cell1_text and "position" in cell2_text:
print(f" πŸ“Œ Found header row at {row_idx + 1}")
# Process data row (next row after header)
if row_idx + 1 < len(table.rows):
data_row = table.rows[row_idx + 1]
if len(data_row.cells) >= 2:
name_cell = data_row.cells[0]
position_cell = data_row.cells[1]
current_name = get_clean_text(name_cell).strip()
current_position = get_clean_text(position_cell).strip()
print(f" πŸ“‹ Current values: Name='{current_name}', Position='{current_position}'")
# IMPROVED: More comprehensive search for operator declaration data
final_name = None
final_position = None
# IMPROVED: Better strategy to find OPERATOR (not auditor) data
final_name = None
final_position = None
# Strategy 1: Look specifically in Attendance List for operator names
attendance_kv = find_matching_json_key_and_value("Attendance List (Names and Position Titles)", flat_json)
if attendance_kv and attendance_kv[1]:
attendance_data = attendance_kv[1]
print(f" πŸ“‹ Found attendance data: {attendance_data}")
# Parse attendance list to find non-auditor names
if isinstance(attendance_data, list):
for entry in attendance_data:
entry_str = str(entry).strip()
if 'auditor' not in entry_str.lower() and entry_str:
# Parse this entry for name and position
parsed_name, parsed_pos = parse_name_and_position(entry_str)
if parsed_name and looks_like_person_name(parsed_name):
final_name = parsed_name
if parsed_pos and looks_like_role(parsed_pos):
final_position = parsed_pos
break
# Strategy 2: If no good name from attendance, try nested attendance keys
if not final_name:
nested_attendance_kv = find_matching_json_key_and_value("Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)", flat_json)
if nested_attendance_kv and nested_attendance_kv[1]:
nested_data = nested_attendance_kv[1]
print(f" πŸ“‹ Found nested attendance data: {nested_data}")
if isinstance(nested_data, list):
for entry in nested_data:
entry_str = str(entry).strip()
if 'auditor' not in entry_str.lower() and entry_str:
parsed_name, parsed_pos = parse_name_and_position(entry_str)
if parsed_name and looks_like_person_name(parsed_name):
final_name = parsed_name
if parsed_pos and looks_like_role(parsed_pos):
final_position = parsed_pos
break
# Strategy 3: Direct operator declaration keys (with filtering)
if not final_name:
search_strategies = [
("Operator Declaration.Print Name", "Operator Declaration.Position Title"),
("Print Name", "Position Title"),
]
for name_key_pattern, pos_key_pattern in search_strategies:
name_kv = find_matching_json_key_and_value(name_key_pattern, flat_json)
pos_kv = find_matching_json_key_and_value(pos_key_pattern, flat_json)
if name_kv and name_kv[1]:
# Filter out auditor names
potential_name = str(name_kv[1]).strip()
# Skip if this is clearly auditor data
if name_kv[0] and 'auditor' in name_kv[0].lower():
continue
# Skip common auditor names that appear in our data
auditor_names = ['greg dyer', 'greg', 'dyer']
if any(aud_name in potential_name.lower() for aud_name in auditor_names):
continue
name_from_val, pos_from_val = parse_name_and_position(name_kv[1])
if name_from_val and looks_like_person_name(name_from_val):
# Additional check - avoid auditor names
if not any(aud_name in name_from_val.lower() for aud_name in auditor_names):
final_name = name_from_val
if pos_from_val and looks_like_role(pos_from_val):
final_position = pos_from_val
if pos_kv and pos_kv[1] and not final_position:
# Only use if key doesn't indicate auditor data
if not (pos_kv[0] and 'auditor' in pos_kv[0].lower()):
pos_val = str(pos_kv[1]).strip()
if looks_like_role(pos_val) and 'auditor' not in pos_val.lower():
final_position = pos_val
if final_name:
break
# Strategy 4: Last resort - search all keys but with strict filtering
if not final_name:
print(f" πŸ” Searching all keys with strict operator filtering...")
for key, value in flat_json.items():
key_lower = key.lower()
# Skip keys that clearly relate to auditor
if 'auditor' in key_lower:
continue
# Look for operator-related keys
if (("operator" in key_lower and "name" in key_lower) or
("print name" in key_lower and "operator" in key_lower)):
if value and looks_like_person_name(str(value)):
potential_name = str(value).strip()
# Skip auditor names
auditor_names = ['greg dyer', 'greg', 'dyer']
if not any(aud_name in potential_name.lower() for aud_name in auditor_names):
name_from_val, pos_from_val = parse_name_and_position(value)
if name_from_val and looks_like_person_name(name_from_val):
final_name = name_from_val
if pos_from_val and looks_like_role(pos_from_val):
final_position = pos_from_val
break
# Clean up final values
if isinstance(final_name, (list, tuple)):
final_name = " ".join(str(x) for x in final_name).strip()
if isinstance(final_position, (list, tuple)):
final_position = " ".join(str(x) for x in final_position).strip()
final_name = str(final_name).strip() if final_name else None
final_position = str(final_position).strip() if final_position else None
print(f" 🎯 Final extracted values: Name='{final_name}', Position='{final_position}'")
# Update name cell if needed
if (not current_name or has_red_text(name_cell)) and final_name and looks_like_person_name(final_name):
if has_red_text(name_cell):
replace_red_text_in_cell(name_cell, final_name)
else:
name_cell.text = final_name
replacements_made += 1
print(f" βœ… Updated Print Name -> '{final_name}'")
# Update position cell if needed
if (not current_position or has_red_text(position_cell)) and final_position and looks_like_role(final_position):
if has_red_text(position_cell):
replace_red_text_in_cell(position_cell, final_position)
else:
position_cell.text = final_position
replacements_made += 1
print(f" βœ… Updated Position Title -> '{final_position}'")
break # Found and processed the header row
# Mark table as processed
if replacements_made > 0:
try:
setattr(table, "_processed_operator_declaration", True)
print(" πŸ”– Marked table as processed by Operator Declaration handler")
except Exception:
pass
return replacements_made
def handle_multiple_red_segments_in_cell(cell, flat_json):
    """Replace each red text segment of ``cell`` with its matching JSON value.

    Segments come from ``extract_red_text_segments``; each one is looked up by
    its stripped ``'text'`` entry and, when a match exists, rewritten through
    ``replace_single_segment``.

    Returns:
        The number of segments for which ``replace_single_segment`` reported
        success (0 when the cell yields no segments).
    """
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0

    replaced_count = 0
    for i, segment in enumerate(red_segments):
        segment_text = segment['text'].strip()
        if not segment_text:
            continue  # nothing to look up for blank segments
        kv = find_matching_json_key_and_value(segment_text, flat_json)
        if not kv:
            continue
        replacement_text = get_value_as_string(kv[1], segment_text)
        if replace_single_segment(segment, replacement_text):
            replaced_count += 1
            print(f" βœ… Replaced segment {i+1}: '{segment_text}' -> '{replacement_text}'")
    return replaced_count
def handle_nature_business_multiline_fix(cell, flat_json):
    """Replace red "nature of business"-style text in ``cell`` with the JSON value.

    The red runs of the cell are concatenated; if that text mentions one of
    the transport-related indicator words, the value found under
    "Nature of Business" (or, failing that, "Nature of the Operators Business
    (Summary)") replaces the cell's red text.

    Returns:
        The count reported by ``replace_red_text_in_cell``, or 0 when the cell
        has no red text, no indicator word, or no JSON match.
    """
    replacements_made = 0
    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    ).strip()
    if not red_text:
        return 0

    lowered = red_text.lower()
    nature_indicators = ("transport", "logistics", "freight", "delivery", "trucking", "haulage")
    if not any(indicator in lowered for indicator in nature_indicators):
        return replacements_made

    # Prefer the short key; fall back to the long summary key.
    kv = find_matching_json_key_and_value("Nature of Business", flat_json)
    if not kv:
        kv = find_matching_json_key_and_value("Nature of the Operators Business (Summary)", flat_json)
    if not kv:
        return replacements_made

    replacement_text = get_value_as_string(kv[1], "Nature of Business")
    replacements_made += replace_red_text_in_cell(cell, replacement_text)
    print(f" βœ… Fixed Nature of Business multiline content")
    return replacements_made
def handle_management_summary_fix(cell, flat_json):
    """Replace red text in ``cell`` with a matching management-summary list.

    The red runs of the cell are concatenated. For each of the three
    management summary keys whose value in ``flat_json`` is a dict, every
    non-empty list entry is scanned; the first list containing an item that
    overlaps the red text (either string contains the other,
    case-insensitively) is joined with newlines and written into the cell.

    Blank list items are skipped: an empty string is a substring of every
    string, so it would otherwise "match" any red text.

    Returns:
        The count reported by ``replace_red_text_in_cell`` for the first
        match, or 0 when there is no usable red text or no match.
    """
    replacements_made = 0
    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    ).strip()
    # Matching is only attempted for red text longer than 10 characters, so
    # shorter (or empty) text can never produce a replacement.
    if len(red_text) <= 10:
        return 0

    red_lower = red_text.lower()
    management_types = ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]
    for mgmt_type in management_types:
        if mgmt_type not in flat_json or not isinstance(flat_json[mgmt_type], dict):
            continue
        for std_key, std_value in flat_json[mgmt_type].items():
            if not (isinstance(std_value, list) and std_value):
                continue
            for item in std_value:
                item_lower = str(item).lower()
                if not item_lower.strip():
                    continue  # "" is contained in everything; not a real match
                if red_lower in item_lower or item_lower in red_lower:
                    replacement_text = "\n".join(str(i) for i in std_value)
                    cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                    replacements_made += cell_replacements
                    print(f" βœ… Fixed {mgmt_type} - {std_key}")
                    # Stop at the first matching standard.
                    return replacements_made
    return replacements_made
def handle_print_accreditation_section(table, flat_json):
    """Fill red cells of a "print accreditation" table from ``flat_json``.

    The table is skipped (returning 0) when it carries the
    ``_processed_operator_declaration`` marker, or when its text mentions
    "operator declaration" or both "print name" and "position title". For every
    remaining cell with red text, the accreditation lookup keys are tried in
    order and the first non-blank value that yields a replacement is used.

    Returns:
        The sum of the counts reported by ``replace_red_text_in_cell``.
    """
    if getattr(table, "_processed_operator_declaration", False):
        print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0

    # Lower-cased text of the whole table, used only for classification.
    table_context = "".join(
        get_clean_text(cell).lower() + " "
        for row in table.rows
        for cell in row.cells
    )
    has_name_and_title = "print name" in table_context and "position title" in table_context
    if "operator declaration" in table_context or has_name_and_title:
        print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0

    print(f" πŸ“‹ Processing Print Accreditation section")
    accreditation_fields = (
        "(print accreditation name)",
        "Operator name (Legal entity)",
        "Print accreditation name",
    )
    replacements_made = 0
    for row in table.rows:
        for cell in row.cells:
            if not has_red_text(cell):
                continue
            for field in accreditation_fields:
                kv = find_matching_json_key_and_value(field, flat_json)
                if not kv:
                    continue
                replacement_text = get_value_as_string(kv[1], field)
                if not replacement_text.strip():
                    continue
                cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f" βœ… Fixed accreditation: {kv[0]}")
                    break  # this cell is done; move on to the next one
    return replacements_made
def process_single_column_sections(cell, key_text, flat_json):
    """Replace the red text of a single-column ``cell`` with its JSON value.

    The lookup is attempted with the cell's own red text first and with
    ``key_text`` second; the first hit supplies the replacement.

    Returns:
        The count reported by ``replace_red_text_in_cell``, or 0 when the cell
        has no red text or nothing matches.
    """
    replacements_made = 0
    if not has_red_text(cell):
        return replacements_made

    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    )
    lookup_text = red_text.strip()
    if not lookup_text:
        return replacements_made

    kv = find_matching_json_key_and_value(lookup_text, flat_json)
    if not kv:
        # Fall back to the row's key text.
        kv = find_matching_json_key_and_value(key_text, flat_json)
    if not kv:
        return replacements_made

    section_replacement = get_value_as_string(kv[1], lookup_text)
    cell_replacements = replace_red_text_in_cell(cell, section_replacement)
    replacements_made += cell_replacements
    if cell_replacements > 0:
        print(f" βœ… Fixed single column section: '{key_text}'")
    return replacements_made
# ============================================================================
# Main table/paragraph/heading processing (preserve logic + use new helpers)
# ============================================================================
def process_tables(document, flat_json):
    """Fill red placeholder text in every table of ``document`` from ``flat_json``.

    Each table is classified from the lower-cased text of its first three rows
    and routed to one specialised path: management summary, vehicle
    registration, attendance list, or print accreditation / operator
    declaration. Tables matching none of these are processed row by row, using
    the first cell of each row as the lookup key. Afterwards the last three
    tables (those with at most four rows) are passed to the operator
    declaration handler once more unless already marked as processed.

    Args:
        document: object exposing ``tables``; each table exposes ``rows`` and
            each row ``cells`` (presumably a python-docx ``Document`` - confirm
            against the caller).
        flat_json: mapping of JSON keys to replacement values.

    Returns:
        The running total of the replacement counts reported by the handlers.

    NOTE(review): the nesting of a few statements below is ambiguous in the
    reviewed listing; the spots marked "confirm nesting" should be checked
    against the authoritative file.
    """
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\nπŸ” Processing table {table_idx + 1}:")
        # Classification text: only the first three rows are sampled.
        table_text = ""
        for row in table.rows[:3]:
            for cell in row.cells:
                table_text += get_clean_text(cell).lower() + " "
        management_summary_indicators = ["mass management", "maintenance management", "fatigue management"]
        has_management = any(indicator in table_text for indicator in management_summary_indicators)
        has_details = "details" in table_text
        if has_management and has_details:
            print(f" πŸ“‹ Detected Management Summary table")
            summary_fixes = fix_management_summary_details_column(table, flat_json)
            replacements_made += summary_fixes
            summary_replacements = 0
            for row_idx, row in enumerate(table.rows):
                for cell_idx, cell in enumerate(row.cells):
                    if has_red_text(cell):
                        for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]:
                            # e.g. "Mass Management Summary" -> "mass management"
                            if mgmt_type.lower().replace(" summary", "") in table_text:
                                if mgmt_type in flat_json:
                                    mgmt_data = flat_json[mgmt_type]
                                    if isinstance(mgmt_data, dict):
                                        for std_key, std_value in mgmt_data.items():
                                            if isinstance(std_value, list) and len(std_value) > 0:
                                                red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip()
                                                for item in std_value:
                                                    # Only red text longer than 15 chars is matched here.
                                                    if len(red_text) > 15 and red_text.lower() in str(item).lower():
                                                        replacement_text = "\n".join(str(i) for i in std_value)
                                                        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                                                        summary_replacements += cell_replacements
                                                        print(f" βœ… Updated {std_key} with summary data")
                                                        break
                                # NOTE(review): confirm nesting - assumed to stop scanning
                                # further management types once this table's type was handled.
                                break
                        # Fallback is gated on the table-wide counter, not on this cell alone.
                        if summary_replacements == 0:
                            cell_replacements = handle_management_summary_fix(cell, flat_json)
                            summary_replacements += cell_replacements
            replacements_made += summary_replacements
            continue
        # Vehicle tables detection
        vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension", "registration"]
        # "registration number" also contains "registration", so that phrase alone counts twice.
        indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text)
        if indicator_count >= 2:
            print(f" πŸš— Detected Vehicle Registration table")
            vehicle_replacements = handle_vehicle_registration_table(table, flat_json)
            replacements_made += vehicle_replacements
            continue
        # Attendance
        if "attendance list" in table_text and "names and position titles" in table_text:
            print(f" πŸ‘₯ Detected Attendance List table")
            attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json)
            replacements_made += attendance_replacements
            continue
        # Print Accreditation / Operator Declaration
        print_accreditation_indicators = ["print name", "position title"]
        indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text)
        if indicator_count >= 2 or ("print name" in table_text and "position title" in table_text):
            print(f" πŸ“‹ Detected Print Accreditation/Operator Declaration table")
            declaration_fixes = fix_operator_declaration_empty_values(table, flat_json)
            replacements_made += declaration_fixes
            # Only try the accreditation handler when the table is not already
            # marked as processed by the operator declaration handler.
            if not getattr(table, "_processed_operator_declaration", False):
                print_accreditation_replacements = handle_print_accreditation_section(table, flat_json)
                replacements_made += print_accreditation_replacements
            continue
        # Regular table rows handling (preserved)
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) < 1:
                continue
            key_cell = row.cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f" πŸ“Œ Row {row_idx + 1}: Key = '{key_text}'")
            kv = find_matching_json_key_and_value(key_text, flat_json)
            json_value = kv[1] if kv else None
            if json_value is not None:
                replacement_text = get_value_as_string(json_value, key_text)
                # ACN handling
                if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list):
                    cell_replacements = handle_australian_company_number(row, json_value)
                    replacements_made += cell_replacements
                # section headers: the value lives in the row below the header
                elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows):
                    print(f" βœ… Section header detected, checking next row...")
                    next_row = table.rows[row_idx + 1]
                    for cell_idx, cell in enumerate(next_row.cells):
                        if has_red_text(cell):
                            print(f" βœ… Found red text in next row, cell {cell_idx + 1}")
                            if isinstance(json_value, list):
                                section_text = "\n".join(str(item) for item in json_value)
                            else:
                                section_text = replacement_text
                            cell_replacements = replace_red_text_in_cell(cell, section_text)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f" -> Replaced section content")
                # single column (or no red text in any value column)
                elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))):
                    if has_red_text(key_cell):
                        cell_replacements = process_single_column_sections(key_cell, key_text, flat_json)
                        replacements_made += cell_replacements
                # key-value pairs: every red value cell gets the same replacement text
                else:
                    for cell_idx in range(1, len(row.cells)):
                        value_cell = row.cells[cell_idx]
                        if has_red_text(value_cell):
                            print(f" βœ… Found red text in column {cell_idx + 1}")
                            cell_replacements = replace_red_text_in_cell(value_cell, replacement_text)
                            replacements_made += cell_replacements
            else:
                # fallback single cell red-text key
                if len(row.cells) == 1 and has_red_text(key_cell):
                    red_text = ""
                    for paragraph in key_cell.paragraphs:
                        for run in paragraph.runs:
                            if is_red(run):
                                red_text += run.text
                    if red_text.strip():
                        kv2 = find_matching_json_key_and_value(red_text.strip(), flat_json)
                        if kv2:
                            section_replacement = get_value_as_string(kv2[1], red_text.strip())
                            cell_replacements = replace_red_text_in_cell(key_cell, section_replacement)
                            replacements_made += cell_replacements
                # attempt multiple red-segments or surgical fixes
                # NOTE(review): confirm nesting - assumed to belong to the
                # "no JSON match for the row key" branch.
                for cell_idx in range(len(row.cells)):
                    cell = row.cells[cell_idx]
                    if has_red_text(cell):
                        cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json)
                        replacements_made += cell_replacements
                        if cell_replacements == 0:
                            surgical_fix = handle_nature_business_multiline_fix(cell, flat_json)
                            replacements_made += surgical_fix
                        # Gated on the segment pass only; the surgical fix result is not consulted.
                        if cell_replacements == 0:
                            management_summary_fix = handle_management_summary_fix(cell, flat_json)
                            replacements_made += management_summary_fix
    # Final operator/auditor declaration check on last few tables
    print(f"\n🎯 Final check for Declaration tables...")
    for table in document.tables[-3:]:
        if len(table.rows) <= 4:
            if getattr(table, "_processed_operator_declaration", False):
                print(f" ⏭️ Skipping - already processed by operator declaration handler")
                continue
            declaration_fix = fix_operator_declaration_empty_values(table, flat_json)
            replacements_made += declaration_fix
    return replacements_made
def process_paragraphs(document, flat_json):
    """Replace red text in the document's paragraphs with values from ``flat_json``.

    For each paragraph, the non-blank red runs are joined and looked up. When
    the lookup yields no value, signature-style text falls back to the
    "auditor signature" / "operator signature" keys. On success the first red
    run receives the replacement (recoloured black) and the remaining red runs
    are emptied.

    Returns:
        The number of paragraphs in which a replacement was written.
    """
    replacements_made = 0
    print(f"\nπŸ” Processing paragraphs:")
    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if not red_runs:
            continue

        red_text_only = "".join(run.text for run in red_runs).strip()
        print(f" πŸ“Œ Paragraph {para_idx + 1}: Found red text: '{red_text_only}'")

        kv = find_matching_json_key_and_value(red_text_only, flat_json)
        json_value = kv[1] if kv else None
        if json_value is None:
            # Signature/date placeholders get a dedicated lookup key.
            upper_text = red_text_only.upper()
            if "AUDITOR SIGNATURE" in upper_text or "DATE" in upper_text:
                kv = find_matching_json_key_and_value("auditor signature", flat_json)
            elif "OPERATOR SIGNATURE" in upper_text:
                kv = find_matching_json_key_and_value("operator signature", flat_json)
            json_value = kv[1] if kv else None
        if json_value is None:
            continue

        replacement_text = get_value_as_string(json_value)
        print(f" βœ… Replacing red text with: '{replacement_text}'")
        lead_run, *trailing_runs = red_runs
        lead_run.text = replacement_text
        lead_run.font.color.rgb = RGBColor(0, 0, 0)
        for leftover in trailing_runs:
            leftover.text = ''
        replacements_made += 1
    return replacements_made
def _first_nonempty_text(value):
    """Return ``value`` as stripped text (first element for lists), or None if empty."""
    if isinstance(value, list):
        value = value[0] if value else None
    if not value:
        return None
    return str(value).strip() or None


def _is_known_heading(text):
    """Return True when ``text`` matches any regex in ``HEADING_PATTERNS``."""
    return any(
        re.search(pattern, text, re.IGNORECASE)
        for patterns in HEADING_PATTERNS.values()
        for pattern in patterns
    )


def process_headings(document, flat_json):
    """Replace red text in recognised headings and the paragraphs following them.

    The operator name is first resolved from ``flat_json``: keys containing
    both "operator name" and "legal entity" are preferred, then the first key
    containing "operator" and "name". Empty values (``None``, ``""``, ``[]``)
    are never stringified, so "None" / "[]" can no longer end up as the
    operator name; in the preferred lookup, keys with empty values are skipped
    so a later populated legal-entity key can still be used.

    Every paragraph matching ``HEADING_PATTERNS`` is then handled: red text in
    the heading itself goes to ``process_red_text_in_heading_paragraph``, and
    red text in up to five following paragraphs (stopping at the next heading,
    skipping blank paragraphs) goes to ``process_red_text_in_context_paragraph``.

    Args:
        document: object exposing ``paragraphs``.
        flat_json: mapping of JSON keys to replacement values.

    Returns:
        The sum of the replacement counts reported by the two helpers.
    """
    replacements_made = 0
    print(f"\nπŸ” Processing headings:")
    paragraphs = document.paragraphs

    # Preferred source: "Operator name (Legal entity)"-style keys.
    operator_name = None
    for key, value in flat_json.items():
        key_lower = key.lower()
        if "operator name" in key_lower and "legal entity" in key_lower:
            operator_name = _first_nonempty_text(value)
            if operator_name:
                break
    if not operator_name:
        # Fallback: the first key mentioning both "operator" and "name" decides.
        for key, value in flat_json.items():
            key_lower = key.lower()
            if "operator" in key_lower and "name" in key_lower:
                operator_name = _first_nonempty_text(value)
                break
    print(f" πŸ“‹ Using operator name: '{operator_name}'")

    for para_idx, paragraph in enumerate(paragraphs):
        paragraph_text = paragraph.text.strip()
        if not paragraph_text or not _is_known_heading(paragraph_text):
            continue
        print(f" πŸ“Œ Found heading at paragraph {para_idx + 1}: '{paragraph_text}'")

        # Red text inside the heading itself.
        if has_red_text_in_paragraph(paragraph):
            print(f" πŸ”΄ Found red text in heading itself")
            replacements_made += process_red_text_in_heading_paragraph(
                paragraph, paragraph_text, flat_json, operator_name
            )

        # Red text in the (at most five) paragraphs right after the heading.
        last_idx = min(para_idx + 6, len(paragraphs))
        for next_para_idx in range(para_idx + 1, last_idx):
            next_paragraph = paragraphs[next_para_idx]
            next_text = next_paragraph.text.strip()
            if not next_text:
                continue
            if _is_known_heading(next_text):
                break  # the next section starts here
            if has_red_text_in_paragraph(next_paragraph):
                print(f" πŸ”΄ Found red text in paragraph {next_para_idx + 1} after heading")
                replacements_made += process_red_text_in_context_paragraph(
                    next_paragraph,
                    paragraph_text,
                    flat_json,
                    operator_name
                )
    return replacements_made
def process_red_text_in_paragraph(paragraph, context_text, flat_json):
    """Replace the red text of ``paragraph`` using ``context_text`` as a hint.

    Lookup order: the red text itself; then, depending on whether the context
    mentions the auditor or operator declaration, a fixed list of field names;
    finally three combined queries built from the context and the red text.
    The first red run receives the replacement (recoloured black) and the
    other red runs are emptied.

    Returns:
        1 when a replacement was written, otherwise 0.
    """
    red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
    if not red_runs:
        return 0

    combined_red_text = " ".join(run.text.strip() for run in red_runs).strip()
    print(f" πŸ” Red text found: '{combined_red_text}'")

    kv = find_matching_json_key_and_value(combined_red_text, flat_json)
    json_value = kv[1] if kv else None

    if json_value is None:
        # Declaration sections: try well-known field names in order.
        context_upper = context_text.upper()
        if "NHVAS APPROVED AUDITOR" in context_upper:
            for field in ("auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"):
                kv = find_matching_json_key_and_value(field, flat_json)
                if kv:
                    print(f" βœ… Found auditor match with field: '{kv[0]}'")
                    json_value = kv[1]
                    break
        elif "OPERATOR DECLARATION" in context_upper:
            for field in ("operator name", "operator", "company name", "organisation name", "print name"):
                kv = find_matching_json_key_and_value(field, flat_json)
                if kv:
                    print(f" βœ… Found operator match with field: '{kv[0]}'")
                    json_value = kv[1]
                    break

    if json_value is None:
        # Last resort: context-qualified queries.
        for query in (f"{context_text} {combined_red_text}", combined_red_text, context_text):
            kv = find_matching_json_key_and_value(query, flat_json)
            if kv:
                print(f" βœ… Found match with combined query -> {kv[0]}")
                json_value = kv[1]
                break

    if json_value is None:
        print(f" ❌ No match found for red text: '{combined_red_text}'")
        return 0

    replacement_text = get_value_as_string(json_value, combined_red_text)
    lead_run, *trailing_runs = red_runs
    lead_run.text = replacement_text
    lead_run.font.color.rgb = RGBColor(0, 0, 0)
    for leftover in trailing_runs:
        leftover.text = ''
    print(f" βœ… Replaced with: '{replacement_text}'")
    return 1
def process_red_text_in_context_paragraph(paragraph, heading_text, flat_json, operator_name):
    """Replace red text in a paragraph that follows the heading ``heading_text``.

    The replacement is chosen from the heading: management headings and the
    operator declaration use ``operator_name``; the auditor declaration uses
    the value of the first key containing "auditor" and "name"; any other
    heading falls back to a JSON lookup on the red text and then on
    heading-qualified queries. The first red run receives the replacement
    (recoloured black) and the other red runs are emptied.

    Returns:
        1 when a replacement was written, otherwise 0.
    """
    red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
    if not red_runs:
        return 0

    combined_red_text = " ".join(run.text.strip() for run in red_runs).strip()
    print(f" πŸ” Red text found: '{combined_red_text}'")

    heading_upper = heading_text.upper()
    management_headings = ("MAINTENANCE MANAGEMENT", "MASS MANAGEMENT", "FATIGUE MANAGEMENT")
    replacement_value = None

    if any(label in heading_upper for label in management_headings):
        # Management sections carry the operator name.
        if operator_name:
            replacement_value = operator_name
            print(f" βœ… Using operator name for management section: '{operator_name}'")
    elif "NHVAS APPROVED AUDITOR DECLARATION" in heading_upper:
        # Only the first auditor-name key is consulted.
        auditor_name = None
        for key, value in flat_json.items():
            key_lower = key.lower()
            if "auditor" not in key_lower or "name" not in key_lower:
                continue
            if isinstance(value, list) and value:
                auditor_name = str(value[0]).strip()
            elif value:
                auditor_name = str(value).strip()
            break
        if auditor_name:
            replacement_value = auditor_name
            print(f" βœ… Using auditor name: '{auditor_name}'")
    elif "OPERATOR DECLARATION" in heading_upper:
        if operator_name:
            replacement_value = operator_name
            print(f" βœ… Using operator name for operator declaration: '{operator_name}'")
    else:
        # Generic heading: direct lookup first, then heading-qualified queries.
        kv = find_matching_json_key_and_value(combined_red_text, flat_json)
        if kv:
            replacement_value = get_value_as_string(kv[1], combined_red_text)
        else:
            for query in (f"{heading_text} {combined_red_text}", combined_red_text, heading_text):
                kv = find_matching_json_key_and_value(query, flat_json)
                if kv:
                    replacement_value = get_value_as_string(kv[1], combined_red_text)
                    print(f" βœ… Found match with combined query: {kv[0]}")
                    break

    if not replacement_value:
        print(f" ❌ No suitable replacement found for: '{combined_red_text}'")
        return 0

    lead_run, *trailing_runs = red_runs
    lead_run.text = replacement_value
    lead_run.font.color.rgb = RGBColor(0, 0, 0)
    for leftover in trailing_runs:
        leftover.text = ''
    print(f" βœ… Replaced with: '{replacement_value}'")
    return 1
# ============================================================================
# Orchestrator
# ============================================================================
def process_hf(json_file, docx_file, output_file):
    """Fill a DOCX template from a JSON file and save the result.

    Loads and flattens the JSON, opens the document, runs the table,
    paragraph and heading passes, writes any recorded unmatched headers to
    ``/tmp/unmatched_headers.json`` (best effort) and saves the document.

    Args:
        json_file: path to a JSON file, or an object with ``read``.
        docx_file: DOCX source handed to ``Document`` (path or file-like).
        output_file: save target handed to ``doc.save`` (path or file-like).

    Returns:
        None. Errors are reported on stdout (with a traceback for unexpected
        ones) instead of being raised.
    """
    try:
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
        flat_json = flatten_json(json_data)

        print("πŸ“„ Available JSON keys (sample):")
        for key, value in sorted(flat_json.items())[:10]:
            print(f" - {key}: {value}")
        # Only mention the remainder when there is one (avoids a negative count).
        remaining_keys = len(flat_json) - 10
        if remaining_keys > 0:
            print(f" ... and {remaining_keys} more keys\n")
        else:
            print()

        # Document() is given the path or the file-like object unchanged.
        doc = Document(docx_file)

        print("πŸš€ Starting comprehensive document processing...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        heading_replacements = process_headings(doc, flat_json)
        # NOTE: there used to be an extra process_red_text_in_paragraph(...)
        # call here with undefined `paragraph` / `context_text` arguments. It
        # raised NameError on every run, which the handler below swallowed, so
        # the document was never saved.
        total_replacements = table_replacements + paragraph_replacements + heading_replacements

        # Save unmatched headers for iterative improvement
        if _unmatched_headers:
            try:
                tmp_path = "/tmp/unmatched_headers.json"
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(_unmatched_headers, f, indent=2, ensure_ascii=False)
                print(f"βœ… Unmatched headers saved to {tmp_path}")
            except Exception as e:
                # Diagnostics only; never block saving the document.
                print(f"⚠️ Could not save unmatched headers: {e}")

        # doc.save() is given the path or the file-like object unchanged.
        doc.save(output_file)

        print(f"\nβœ… Document saved as: {output_file}")
        print(f"βœ… Total replacements: {total_replacements}")
        print(f" πŸ“Š Tables: {table_replacements}")
        print(f" πŸ“ Paragraphs: {paragraph_replacements}")
        print(f" πŸ“‹ Headings: {heading_replacements}")
        print(f"πŸŽ‰ Processing complete!")
    except FileNotFoundError as e:
        print(f"❌ File not found: {e}")
    except Exception as e:
        # Top-level boundary: report and keep the caller alive.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    import sys

    # Expect exactly three positional arguments after the script name.
    if len(sys.argv) != 4:
        print("Usage: python pipeline.py <input_docx> <updated_json> <output_docx>")
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module and is missing when Python runs without it.
        sys.exit(1)
    docx_path = sys.argv[1]
    json_path = sys.argv[2]
    output_path = sys.argv[3]
    # process_hf takes the JSON source first, then the DOCX template.
    process_hf(json_path, docx_path, output_path)