import json from docx import Document from docx.shared import RGBColor import re # Heading patterns for document structure detection HEADING_PATTERNS = { "main": [ r"NHVAS\s+Audit\s+Summary\s+Report", r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT", r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT" ], "sub": [ r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS", r"MAINTENANCE\s+MANAGEMENT", r"MASS\s+MANAGEMENT", r"FATIGUE\s+MANAGEMENT", r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings", r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS", r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS", r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined", r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)", r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION", r"Operator\s+Declaration", r"Operator\s+Information", r"Driver\s*/\s*Scheduler\s+Records\s+Examined" ] } # ============================================================================ # UTILITY FUNCTIONS # ============================================================================ def load_json(filepath): with open(filepath, 'r') as file: return json.load(file) def flatten_json(y, prefix=''): out = {} for key, val in y.items(): new_key = f"{prefix}.{key}" if prefix else key if isinstance(val, dict): out.update(flatten_json(val, new_key)) else: out[new_key] = val out[key] = val return out def is_red(run): color = run.font.color return color and (color.rgb == RGBColor(255, 0, 0) or getattr(color, "theme_color", None) == 1) def get_value_as_string(value, field_name=""): if isinstance(value, list): if len(value) == 0: return "" elif len(value) == 1: return str(value[0]) else: if "australian company number" in field_name.lower() or "company number" in field_name.lower(): return value else: return " ".join(str(v) for v in value) else: return str(value) def get_clean_text(cell): text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: text += run.text return text.strip() def has_red_text(cell): for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run) and run.text.strip(): return True return False def has_red_text_in_paragraph(paragraph): for run in paragraph.runs: if is_red(run) and run.text.strip(): return True return False # ============================================================================ # JSON MATCHING FUNCTIONS # ============================================================================ def find_matching_json_value(field_name, flat_json): """Find matching value in JSON with multiple strategies""" field_name = field_name.strip() # Try exact match first if field_name in flat_json: print(f" ✅ Direct match found for key '{field_name}'") return flat_json[field_name] # Try case-insensitive exact match for key, value in flat_json.items(): if key.lower() == field_name.lower(): print(f" ✅ Case-insensitive match found for key '{field_name}' with JSON key '{key}'") return value # Better Print Name detection for operator vs auditor if field_name.lower().strip() == "print name": operator_keys = [k for k in flat_json.keys() if "operator" in k.lower() and "print name" in k.lower()] auditor_keys = [k for k in flat_json.keys() if "auditor" in k.lower() and ("print name" in k.lower() or "name" in k.lower())] if operator_keys: print(f" ✅ Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'") return flat_json[operator_keys[0]] elif auditor_keys: print(f" ✅ Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'") return flat_json[auditor_keys[0]] # Try suffix matching (for nested keys like "section.field") for key, value in flat_json.items(): if '.' in key and key.split('.')[-1].lower() == field_name.lower(): print(f" ✅ Suffix match found for key '{field_name}' with JSON key '{key}'") return value # Try partial matching - remove parentheses and special chars clean_field = re.sub(r'[^\w\s]', ' ', field_name.lower()).strip() clean_field = re.sub(r'\s+', ' ', clean_field) for key, value in flat_json.items(): clean_key = re.sub(r'[^\w\s]', ' ', key.lower()).strip() clean_key = re.sub(r'\s+', ' ', clean_key) if clean_field == clean_key: print(f" ✅ Clean match found for key '{field_name}' with JSON key '{key}'") return value # Enhanced fuzzy matching with better scoring field_words = set(word.lower() for word in re.findall(r'\b\w+\b', field_name) if len(word) > 2) if not field_words: return None best_match = None best_score = 0 best_key = None for key, value in flat_json.items(): key_words = set(word.lower() for word in re.findall(r'\b\w+\b', key) if len(word) > 2) if not key_words: continue # Calculate similarity score common_words = field_words.intersection(key_words) if common_words: # Use Jaccard similarity: intersection / union similarity = len(common_words) / len(field_words.union(key_words)) # Bonus for high word coverage in field_name coverage = len(common_words) / len(field_words) final_score = (similarity * 0.6) + (coverage * 0.4) if final_score > best_score: best_score = final_score best_match = value best_key = key if best_match and best_score >= 0.25: print(f" ✅ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})") return best_match print(f" ❌ No match found for '{field_name}'") return None # ============================================================================ # RED TEXT PROCESSING FUNCTIONS # ============================================================================ def extract_red_text_segments(cell): """Extract red text segments from a cell""" red_segments = [] for para_idx, paragraph in enumerate(cell.paragraphs): current_segment = "" segment_runs = [] for run_idx, run in enumerate(paragraph.runs): if is_red(run): if run.text: current_segment += run.text segment_runs.append((para_idx, run_idx, run)) else: # End of current red segment if segment_runs: red_segments.append({ 'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx }) current_segment = "" segment_runs = [] # Handle segment at end of paragraph if segment_runs: red_segments.append({ 'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx }) return red_segments def replace_all_red_segments(red_segments, replacement_text): """Replace all red segments with replacement text""" if not red_segments: return 0 if '\n' in replacement_text: replacement_lines = replacement_text.split('\n') else: replacement_lines = [replacement_text] replacements_made = 0 if red_segments and replacement_lines: first_segment = red_segments[0] if first_segment['runs']: first_run = first_segment['runs'][0][2] first_run.text = replacement_lines[0] first_run.font.color.rgb = RGBColor(0, 0, 0) replacements_made = 1 for _, _, run in first_segment['runs'][1:]: run.text = '' for segment in red_segments[1:]: for _, _, run in segment['runs']: run.text = '' if len(replacement_lines) > 1 and red_segments: try: first_run = red_segments[0]['runs'][0][2] paragraph = first_run.element.getparent() for line in replacement_lines[1:]: if line.strip(): from docx.oxml import OxmlElement br = OxmlElement('w:br') first_run.element.append(br) new_run = paragraph.add_run(line.strip()) new_run.font.color.rgb = RGBColor(0, 0, 0) except: if red_segments and red_segments[0]['runs']: first_run = red_segments[0]['runs'][0][2] first_run.text = ' '.join(replacement_lines) first_run.font.color.rgb = RGBColor(0, 0, 0) return replacements_made def replace_single_segment(segment, replacement_text): """Replace a single red text segment""" if not segment['runs']: return False first_run = segment['runs'][0][2] first_run.text = replacement_text first_run.font.color.rgb = RGBColor(0, 0, 0) for _, _, run in segment['runs'][1:]: run.text = '' return True def replace_red_text_in_cell(cell, replacement_text): """Replace red text in a cell with replacement text""" red_segments = extract_red_text_segments(cell) if not red_segments: return 0 return replace_all_red_segments(red_segments, replacement_text) # ============================================================================ # SPECIALIZED TABLE HANDLERS # ============================================================================ def handle_australian_company_number(row, company_numbers): """Handle Australian Company Number digit placement""" replacements_made = 0 for i, digit in enumerate(company_numbers): cell_idx = i + 1 if cell_idx < len(row.cells): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = replace_red_text_in_cell(cell, str(digit)) replacements_made += cell_replacements print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}") return replacements_made def handle_vehicle_registration_table(table, flat_json): """Handle vehicle registration table data replacement""" replacements_made = 0 # Try to find vehicle registration data vehicle_section = None for key, value in flat_json.items(): if "vehicle registration numbers of records examined" in key.lower(): if isinstance(value, dict): vehicle_section = value print(f" ✅ Found vehicle data in key: '{key}'") break if not vehicle_section: potential_columns = {} for key, value in flat_json.items(): if any(col_name in key.lower() for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension"]): if "." in key: column_name = key.split(".")[-1] else: column_name = key potential_columns[column_name] = value if potential_columns: vehicle_section = potential_columns print(f" ✅ Found vehicle data from flattened keys: {list(vehicle_section.keys())}") else: print(f" ❌ Vehicle registration data not found in JSON") return 0 print(f" ✅ Found vehicle registration data with {len(vehicle_section)} columns") # Find header row header_row_idx = -1 header_row = None for row_idx, row in enumerate(table.rows): row_text = "".join(get_clean_text(cell).lower() for cell in row.cells) if "registration" in row_text and "number" in row_text: header_row_idx = row_idx header_row = row break if header_row_idx == -1: print(f" ❌ Could not find header row in vehicle table") return 0 print(f" ✅ Found header row at index {header_row_idx}") # Enhanced column mapping column_mapping = {} for col_idx, cell in enumerate(header_row.cells): header_text = get_clean_text(cell).strip() if not header_text or header_text.lower() == "no.": continue best_match = None best_score = 0 normalized_header = header_text.lower().replace("(", " (").replace(")", ") ").strip() for json_key in vehicle_section.keys(): normalized_json = json_key.lower().strip() if normalized_header == normalized_json: best_match = json_key best_score = 1.0 break header_words = set(word.lower() for word in normalized_header.split() if len(word) > 2) json_words = set(word.lower() for word in normalized_json.split() if len(word) > 2) if header_words and json_words: common_words = header_words.intersection(json_words) score = len(common_words) / max(len(header_words), len(json_words)) if score > best_score and score >= 0.3: best_score = score best_match = json_key header_clean = normalized_header.replace(" ", "").replace("-", "").replace("(", "").replace(")", "") json_clean = normalized_json.replace(" ", "").replace("-", "").replace("(", "").replace(")", "") if header_clean in json_clean or json_clean in header_clean: if len(header_clean) > 5 and len(json_clean) > 5: substring_score = min(len(header_clean), len(json_clean)) / max(len(header_clean), len(json_clean)) if substring_score > best_score and substring_score >= 0.6: best_score = substring_score best_match = json_key if best_match: column_mapping[col_idx] = best_match print(f" 📌 Column {col_idx + 1} ('{header_text}') -> '{best_match}' (score: {best_score:.2f})") if not column_mapping: print(f" ❌ No column mappings found") return 0 # Determine data rows needed max_data_rows = 0 for json_key, data in vehicle_section.items(): if isinstance(data, list): max_data_rows = max(max_data_rows, len(data)) print(f" 📌 Need to populate {max_data_rows} data rows") # Process data rows for data_row_index in range(max_data_rows): table_row_idx = header_row_idx + 1 + data_row_index if table_row_idx >= len(table.rows): print(f" ⚠️ Row {table_row_idx + 1} doesn't exist - table only has {len(table.rows)} rows") print(f" ➕ Adding new row for vehicle {data_row_index + 1}") new_row = table.add_row() print(f" ✅ Successfully added row {len(table.rows)} to the table") row = table.rows[table_row_idx] print(f" 📌 Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})") for col_idx, json_key in column_mapping.items(): if col_idx < len(row.cells): cell = row.cells[col_idx] column_data = vehicle_section.get(json_key, []) if isinstance(column_data, list) and data_row_index < len(column_data): replacement_value = str(column_data[data_row_index]) cell_text = get_clean_text(cell) if has_red_text(cell) or not cell_text.strip(): if not cell_text.strip(): cell.text = replacement_value replacements_made += 1 print(f" -> Added '{replacement_value}' to empty cell (column '{json_key}')") else: cell_replacements = replace_red_text_in_cell(cell, replacement_value) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced red text with '{replacement_value}' (column '{json_key}')") return replacements_made def handle_attendance_list_table_enhanced(table, flat_json): """Enhanced Attendance List processing with better detection""" replacements_made = 0 # Check multiple patterns for attendance list attendance_patterns = [ "attendance list", "names and position titles", "attendees" ] # Scan all cells in the first few rows for attendance list indicators found_attendance_row = None for row_idx, row in enumerate(table.rows[:3]): # Check first 3 rows for cell_idx, cell in enumerate(row.cells): cell_text = get_clean_text(cell).lower() # Check if this cell contains attendance list header if any(pattern in cell_text for pattern in attendance_patterns): found_attendance_row = row_idx print(f" 🎯 ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}") break if found_attendance_row is not None: break if found_attendance_row is None: return 0 # Look for attendance data in JSON attendance_value = None attendance_search_keys = [ "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)", "Attendance List (Names and Position Titles)", "attendance list", "attendees" ] print(f" 🔍 Searching for attendance data in JSON...") for search_key in attendance_search_keys: attendance_value = find_matching_json_value(search_key, flat_json) if attendance_value is not None: print(f" ✅ Found attendance data with key: '{search_key}'") print(f" 📊 Raw value: {attendance_value}") break if attendance_value is None: print(f" ❌ No attendance data found in JSON") return 0 # Look for red text in ALL cells of the table target_cell = None print(f" 🔍 Scanning ALL cells in attendance table for red text...") for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): print(f" 🎯 Found red text in row {row_idx + 1}, cell {cell_idx + 1}") # Get the red text to see if it looks like attendance data red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text print(f" 📋 Red text content: '{red_text[:50]}...'") # Check if this red text looks like attendance data (contains names/manager/etc) red_text_lower = red_text.lower() if any(indicator in red_text_lower for indicator in ['manager', 'herbig', 'palin', '–', '-']): target_cell = cell print(f" ✅ This looks like attendance data - using this cell") break if target_cell is not None: break # If no red text found that looks like attendance data, return if target_cell is None: print(f" ⚠️ No red text found that looks like attendance data") return 0 # Replace red text with properly formatted attendance list if has_red_text(target_cell): print(f" 🔧 Replacing red text with properly formatted attendance list...") # Ensure attendance_value is a list if isinstance(attendance_value, list): attendance_list = [str(item).strip() for item in attendance_value if str(item).strip()] else: attendance_list = [str(attendance_value).strip()] print(f" 📝 Attendance items to add:") for i, item in enumerate(attendance_list): print(f" {i+1}. {item}") # Replace with line-separated attendance list replacement_text = "\n".join(attendance_list) cell_replacements = replace_red_text_in_cell(target_cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Added {len(attendance_list)} attendance items") print(f" 📊 Replacements made: {cell_replacements}") return replacements_made def fix_management_summary_details_column(table, flat_json): """Fix the DETAILS column in Management Summary table""" replacements_made = 0 print(f" 🎯 FIX: Management Summary DETAILS column processing") # Check if this is a Management Summary table table_text = "" for row in table.rows[:2]: for cell in row.cells: table_text += get_clean_text(cell).lower() + " " if not ("mass management" in table_text and "details" in table_text): return 0 print(f" ✅ Confirmed Mass Management Summary table") # Process each row looking for Std 5. and Std 6. with red text for row_idx, row in enumerate(table.rows): if len(row.cells) >= 2: standard_cell = row.cells[0] details_cell = row.cells[1] standard_text = get_clean_text(standard_cell).strip() # Look for Std 5. Verification and Std 6. Internal Review specifically if "Std 5." in standard_text and "Verification" in standard_text: if has_red_text(details_cell): print(f" 🔍 Found Std 5. Verification with red text") json_value = find_matching_json_value("Std 5. Verification", flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, "Std 5. Verification") cell_replacements = replace_red_text_in_cell(details_cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Replaced Std 5. Verification details") elif "Std 6." in standard_text and "Internal Review" in standard_text: if has_red_text(details_cell): print(f" 🔍 Found Std 6. Internal Review with red text") json_value = find_matching_json_value("Std 6. Internal Review", flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, "Std 6. Internal Review") cell_replacements = replace_red_text_in_cell(details_cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Replaced Std 6. Internal Review details") return replacements_made def fix_operator_declaration_empty_values(table, flat_json): """Fix Operator Declaration table when values are empty or need updating""" replacements_made = 0 print(f" 🎯 FIX: Operator Declaration empty values processing") # Check if this is an Operator Declaration table table_context = "" for row in table.rows: for cell in row.cells: table_context += get_clean_text(cell).lower() + " " if not ("print name" in table_context and "position title" in table_context): return 0 print(f" ✅ Confirmed Operator Declaration table") # Find the data row with Print Name and Position Title for row_idx, row in enumerate(table.rows): if len(row.cells) >= 2: cell1_text = get_clean_text(row.cells[0]).strip().lower() cell2_text = get_clean_text(row.cells[1]).strip().lower() # Check if this is the header row if "print name" in cell1_text and "position" in cell2_text: print(f" 📌 Found header row at {row_idx + 1}") # Look for the data row (next row) if row_idx + 1 < len(table.rows): data_row = table.rows[row_idx + 1] if len(data_row.cells) >= 2: name_cell = data_row.cells[0] position_cell = data_row.cells[1] # Check if cells are empty or have red text name_text = get_clean_text(name_cell).strip() position_text = get_clean_text(position_cell).strip() print(f" 📋 Current values: Name='{name_text}', Position='{position_text}'") # Get the Operator Declaration section data operator_declaration = find_matching_json_value("Operator Declaration", flat_json) if operator_declaration and isinstance(operator_declaration, dict): print(f" 🔍 Found Operator Declaration data: {operator_declaration}") # Update Print Name if "Print Name" in operator_declaration: print_name_value = operator_declaration["Print Name"] if isinstance(print_name_value, list) and print_name_value: new_name = str(print_name_value[0]).strip() if new_name and "Pty Ltd" not in new_name and "Company" not in new_name: name_cell.text = new_name replacements_made += 1 print(f" ✅ Updated Print Name: '{name_text}' -> '{new_name}'") # Update Position Title if "Position Title" in operator_declaration: position_value = operator_declaration["Position Title"] if isinstance(position_value, list) and position_value: new_position = str(position_value[0]).strip() if new_position: position_cell.text = new_position replacements_made += 1 print(f" ✅ Updated Position Title: '{position_text}' -> '{new_position}'") else: print(f" ❌ No Operator Declaration section found in JSON") # Fallback: try individual fields name_value = find_matching_json_value("Operator Declaration.Print Name", flat_json) if name_value: new_name = get_value_as_string(name_value).strip() if new_name and "Pty Ltd" not in new_name: name_cell.text = new_name replacements_made += 1 print(f" ✅ Updated Print Name (fallback): '{new_name}'") position_value = find_matching_json_value("Operator Declaration.Position Title", flat_json) if position_value: new_position = get_value_as_string(position_value).strip() if new_position: position_cell.text = new_position replacements_made += 1 print(f" ✅ Updated Position Title (fallback): '{new_position}'") break return replacements_made def handle_multiple_red_segments_in_cell(cell, flat_json): """Handle multiple red text segments within a single cell""" replacements_made = 0 red_segments = extract_red_text_segments(cell) if not red_segments: return 0 # Try to match each segment individually for i, segment in enumerate(red_segments): segment_text = segment['text'].strip() if segment_text: json_value = find_matching_json_value(segment_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, segment_text) if replace_single_segment(segment, replacement_text): replacements_made += 1 print(f" ✅ Replaced segment {i+1}: '{segment_text}' -> '{replacement_text}'") return replacements_made def handle_nature_business_multiline_fix(cell, flat_json): """Handle Nature of Business multiline red text""" replacements_made = 0 # Extract red text to check if it looks like nature of business red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text red_text = red_text.strip() if not red_text: return 0 # Check if this looks like nature of business content nature_indicators = ["transport", "logistics", "freight", "delivery", "trucking", "haulage"] if any(indicator in red_text.lower() for indicator in nature_indicators): # Try to find nature of business in JSON nature_value = find_matching_json_value("Nature of Business", flat_json) if nature_value is not None: replacement_text = get_value_as_string(nature_value, "Nature of Business") cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Fixed Nature of Business multiline content") return replacements_made def handle_management_summary_fix(cell, flat_json): """Handle Management Summary content fixes""" replacements_made = 0 # Extract red text red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text red_text = red_text.strip() if not red_text: return 0 # Look for management summary data in new schema format management_types = ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"] for mgmt_type in management_types: if mgmt_type in flat_json: mgmt_data = flat_json[mgmt_type] if isinstance(mgmt_data, dict): # Try to match red text with any standard in this management type for std_key, std_value in mgmt_data.items(): if isinstance(std_value, list) and std_value: # Check if red text matches this standard if len(red_text) > 10: for item in std_value: if red_text.lower() in str(item).lower() or str(item).lower() in red_text.lower(): replacement_text = "\n".join(str(i) for i in std_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Fixed {mgmt_type} - {std_key}") return replacements_made return replacements_made def handle_operator_declaration_fix(table, flat_json): """Handle small Operator/Auditor Declaration tables""" replacements_made = 0 if len(table.rows) > 4: # Only process small tables return 0 # Get table context table_text = "" for row in table.rows: for cell in row.cells: table_text += get_clean_text(cell).lower() + " " # Check if this is a declaration table if not ("print name" in table_text or "signature" in table_text or "date" in table_text): return 0 print(f" 🎯 Processing declaration table") # Process each cell with red text for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): # Try common declaration fields declaration_fields = [ "Print Name", "Position Title", "Signature", "Date", "Operator Declaration.Print Name", "Operator Declaration.Position Title", "NHVAS Approved Auditor Declaration.Print Name" ] replaced = False for field in declaration_fields: field_value = find_matching_json_value(field, flat_json) if field_value is not None: replacement_text = get_value_as_string(field_value, field) if replacement_text.strip(): cell_replacements = replace_red_text_in_cell(cell, replacement_text) if cell_replacements > 0: replacements_made += cell_replacements print(f" ✅ Fixed declaration field: {field}") replaced = True break # If no specific field match, try generic signature/date if not replaced: red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if "signature" in red_text.lower(): cell_replacements = replace_red_text_in_cell(cell, "[Signature]") replacements_made += cell_replacements elif "date" in red_text.lower(): cell_replacements = replace_red_text_in_cell(cell, "[Date]") replacements_made += cell_replacements return replacements_made def handle_print_accreditation_section(table, flat_json): """Handle Print Accreditation section""" replacements_made = 0 print(f" 📋 Processing Print Accreditation section") for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): # Try print accreditation fields accreditation_fields = [ "(print accreditation name)", "Print Name", "Operator name (Legal entity)" ] for field in accreditation_fields: field_value = find_matching_json_value(field, flat_json) if field_value is not None: replacement_text = get_value_as_string(field_value, field) if replacement_text.strip(): cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" ✅ Fixed accreditation: {field}") break return replacements_made def process_single_column_sections(cell, key_text, flat_json): """Process single column sections with red text""" replacements_made = 0 if has_red_text(cell): red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): # Try direct matching first section_value = find_matching_json_value(red_text.strip(), flat_json) if section_value is None: # Try key-based matching section_value = find_matching_json_value(key_text, flat_json) if section_value is not None: section_replacement = get_value_as_string(section_value, red_text.strip()) cell_replacements = replace_red_text_in_cell(cell, section_replacement) replacements_made += cell_replacements if cell_replacements > 0: print(f" ✅ Fixed single column section: '{key_text}'") return replacements_made def process_tables(document, flat_json): """Process all tables in the document with comprehensive fixes""" replacements_made = 0 for table_idx, table in enumerate(document.tables): print(f"\n🔍 Processing table {table_idx + 1}:") # Get table context table_text = "" for row in table.rows[:3]: for cell in row.cells: table_text += get_clean_text(cell).lower() + " " # Detect Management Summary tables management_summary_indicators = ["mass management", "maintenance management", "fatigue management"] has_management = any(indicator in table_text for indicator in management_summary_indicators) has_details = "details" in table_text if has_management and has_details: print(f" 📋 Detected Management Summary table") summary_fixes = fix_management_summary_details_column(table, flat_json) replacements_made += summary_fixes # Process remaining red text in management summary summary_replacements = 0 for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): # Try direct matching with the new schema names first for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]: if mgmt_type.lower().replace(" summary", "") in table_text: # Look for this standard in the JSON if mgmt_type in flat_json: mgmt_data = flat_json[mgmt_type] if isinstance(mgmt_data, dict): # Find matching standard for std_key, std_value in mgmt_data.items(): if isinstance(std_value, list) and len(std_value) > 0: # Check if red text matches this standard data red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip() for item in std_value: if len(red_text) > 15 and red_text.lower() in str(item).lower(): replacement_text = "\n".join(str(i) for i in std_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) summary_replacements += cell_replacements print(f" ✅ Updated {std_key} with summary data") break break # Fallback to existing method if summary_replacements == 0: cell_replacements = handle_management_summary_fix(cell, flat_json) summary_replacements += cell_replacements replacements_made += summary_replacements continue # Detect Vehicle Registration tables vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension"] indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text) if indicator_count >= 2: print(f" 🚗 Detected Vehicle Registration table") vehicle_replacements = handle_vehicle_registration_table(table, flat_json) replacements_made += vehicle_replacements continue # Detect Attendance List tables if "attendance list" in table_text and "names and position titles" in table_text: print(f" 👥 Detected Attendance List table") attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json) replacements_made += attendance_replacements continue # Detect Print Accreditation tables print_accreditation_indicators = ["print name", "position title"] indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text) if indicator_count >= 1: print(f" 📋 Detected Print Accreditation table") # Check for declaration tables that need fixing if "print name" in table_text and "position" in table_text: declaration_fixes = fix_operator_declaration_empty_values(table, flat_json) replacements_made += declaration_fixes print_accreditation_replacements = handle_print_accreditation_section(table, flat_json) replacements_made += print_accreditation_replacements continue # Process regular table rows for row_idx, row in enumerate(table.rows): if len(row.cells) < 1: continue key_cell = row.cells[0] key_text = get_clean_text(key_cell) if not key_text: continue print(f" 📌 Row {row_idx + 1}: Key = '{key_text}'") json_value = find_matching_json_value(key_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, key_text) # Handle Australian Company Number if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list): cell_replacements = handle_australian_company_number(row, json_value) replacements_made += cell_replacements # Handle section headers elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows): print(f" ✅ Section header detected, checking next row...") next_row = table.rows[row_idx + 1] for cell_idx, cell in enumerate(next_row.cells): if has_red_text(cell): print(f" ✅ Found red text in next row, cell {cell_idx + 1}") if isinstance(json_value, list): replacement_text = "\n".join(str(item) for item in json_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced section content") # Handle single column sections elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))): if has_red_text(key_cell): cell_replacements = process_single_column_sections(key_cell, key_text, flat_json) replacements_made += cell_replacements # Handle regular key-value pairs else: for cell_idx in range(1, len(row.cells)): value_cell = row.cells[cell_idx] if has_red_text(value_cell): print(f" ✅ Found red text in column {cell_idx + 1}") cell_replacements = replace_red_text_in_cell(value_cell, replacement_text) replacements_made += cell_replacements else: # Fallback processing for unmatched keys if len(row.cells) == 1 and has_red_text(key_cell): red_text = "" for paragraph in key_cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): section_value = find_matching_json_value(red_text.strip(), flat_json) if section_value is not None: section_replacement = get_value_as_string(section_value, red_text.strip()) cell_replacements = replace_red_text_in_cell(key_cell, section_replacement) replacements_made += cell_replacements # Process red text in all cells for cell_idx in range(len(row.cells)): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json) replacements_made += cell_replacements # Apply fixes if no replacements made if cell_replacements == 0: surgical_fix = handle_nature_business_multiline_fix(cell, flat_json) replacements_made += surgical_fix if cell_replacements == 0: management_summary_fix = handle_management_summary_fix(cell, flat_json) replacements_made += management_summary_fix # Handle Operator/Auditor Declaration tables (check last few tables) print(f"\n🎯 Final check for Declaration tables...") for table in document.tables[-3:]: if len(table.rows) <= 4: declaration_fix = handle_operator_declaration_fix(table, flat_json) replacements_made += declaration_fix return replacements_made def process_paragraphs(document, flat_json): """Process all paragraphs in the document""" replacements_made = 0 print(f"\n🔍 Processing paragraphs:") for para_idx, paragraph in enumerate(document.paragraphs): red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_text_only = "".join(run.text for run in red_runs).strip() print(f" 📌 Paragraph {para_idx + 1}: Found red text: '{red_text_only}'") json_value = find_matching_json_value(red_text_only, flat_json) if json_value is None: # Enhanced pattern matching for signatures and dates if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper(): json_value = find_matching_json_value("auditor signature", flat_json) elif "OPERATOR SIGNATURE" in red_text_only.upper(): json_value = find_matching_json_value("operator signature", flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value) print(f" ✅ Replacing red text with: '{replacement_text}'") red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 return replacements_made def process_headings(document, flat_json): """Process headings and their related content""" replacements_made = 0 print(f"\n🔍 Processing headings:") paragraphs = document.paragraphs for para_idx, paragraph in enumerate(paragraphs): paragraph_text = paragraph.text.strip() if not paragraph_text: continue # Check if this is a heading matched_heading = None for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, paragraph_text, re.IGNORECASE): matched_heading = pattern break if matched_heading: break if matched_heading: print(f" 📌 Found heading at paragraph {para_idx + 1}: '{paragraph_text}'") # Check current heading paragraph if has_red_text_in_paragraph(paragraph): print(f" 🔴 Found red text in heading itself") heading_replacements = process_red_text_in_paragraph(paragraph, paragraph_text, flat_json) replacements_made += heading_replacements # Look ahead for related content for next_para_offset in range(1, 6): next_para_idx = para_idx + next_para_offset if next_para_idx >= len(paragraphs): break next_paragraph = paragraphs[next_para_idx] next_text = next_paragraph.text.strip() if not next_text: continue # Stop if we hit another heading is_another_heading = False for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, next_text, re.IGNORECASE): is_another_heading = True break if is_another_heading: break if is_another_heading: break # Process red text with context if has_red_text_in_paragraph(next_paragraph): print(f" 🔴 Found red text in paragraph {next_para_idx + 1} after heading") context_replacements = process_red_text_in_paragraph( next_paragraph, paragraph_text, flat_json ) replacements_made += context_replacements return replacements_made def process_red_text_in_paragraph(paragraph, context_text, flat_json): """Process red text within a paragraph using context""" replacements_made = 0 red_text_segments = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_segments.append(run.text.strip()) if not red_text_segments: return 0 combined_red_text = " ".join(red_text_segments).strip() print(f" 🔍 Red text found: '{combined_red_text}'") json_value = None # Direct matching json_value = find_matching_json_value(combined_red_text, flat_json) # Context-based matching if json_value is None: if "NHVAS APPROVED AUDITOR" in context_text.upper(): auditor_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"] for field in auditor_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" ✅ Found auditor match with field: '{field}'") break elif "OPERATOR DECLARATION" in context_text.upper(): operator_fields = ["operator name", "operator", "company name", "organisation name", "print name"] for field in operator_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" ✅ Found operator match with field: '{field}'") break # Combined context queries if json_value is None: context_queries = [ f"{context_text} {combined_red_text}", combined_red_text, context_text ] for query in context_queries: json_value = find_matching_json_value(query, flat_json) if json_value is not None: print(f" ✅ Found match with combined query") break # Replace if match found if json_value is not None: replacement_text = get_value_as_string(json_value, combined_red_text) red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made = 1 print(f" ✅ Replaced with: '{replacement_text}'") else: print(f" ❌ No match found for red text: '{combined_red_text}'") return replacements_made def force_red_text_replacement(document, flat_json): """Force replacement of any remaining red text by trying ALL JSON values""" replacements_made = 0 print(f"\n🎯 FORCE FIX: Scanning for any remaining red text...") # Collect all possible replacement values from JSON all_values = {} for key, value in flat_json.items(): if value: value_str = get_value_as_string(value, key) if value_str and isinstance(value_str, str) and value_str.strip(): all_values[key] = value_str.strip() # Store individual items from lists for partial matching if isinstance(value, list): for i, item in enumerate(value): item_str = str(item).strip() if item else "" if item_str: all_values[f"{key}_item_{i}"] = item_str print(f" Found {len(all_values)} potential replacement values") # Process all tables for table_idx, table in enumerate(document.tables): for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): print(f" 🔍 Found red text in Table {table_idx + 1}, Row {row_idx + 1}, Cell {cell_idx + 1}") # Extract all red text from this cell red_text_parts = [] for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_parts.append(run.text.strip()) combined_red_text = " ".join(red_text_parts).strip() print(f" Red text: '{combined_red_text}'") # Find best match best_match = None best_key = None # Exact matching for key, value in all_values.items(): if combined_red_text.lower() == value.lower(): best_match = value best_key = key break # Partial matching if not best_match: for key, value in all_values.items(): if (len(value) > 3 and value.lower() in combined_red_text.lower()) or \ (len(combined_red_text) > 3 and combined_red_text.lower() in value.lower()): best_match = value best_key = key break # Word-by-word matching for names/dates if not best_match: red_words = set(word.lower() for word in combined_red_text.split() if len(word) > 2) best_score = 0 for key, value in all_values.items(): value_words = set(word.lower() for word in str(value).split() if len(word) > 2) if red_words and value_words: common_words = red_words.intersection(value_words) if common_words: score = len(common_words) / len(red_words) if score > best_score and score >= 0.5: # At least 50% match best_score = score best_match = value best_key = key # Replace if we found a match if best_match: print(f" ✅ Replacing with: '{best_match}' (from key: '{best_key}')") cell_replacements = replace_red_text_in_cell(cell, best_match) replacements_made += cell_replacements print(f" Made {cell_replacements} replacements") else: print(f" ❌ No suitable replacement found") # Process all paragraphs for para_idx, paragraph in enumerate(document.paragraphs): if has_red_text_in_paragraph(paragraph): red_text_parts = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_parts.append(run.text.strip()) combined_red_text = " ".join(red_text_parts).strip() if combined_red_text: print(f" 🔍 Found red text in Paragraph {para_idx + 1}: '{combined_red_text}'") # Same matching logic as above best_match = None best_key = None # Exact match for key, value in all_values.items(): if combined_red_text.lower() == value.lower(): best_match = value best_key = key break # Partial match if not best_match: for key, value in all_values.items(): if (len(value) > 3 and value.lower() in combined_red_text.lower()) or \ (len(combined_red_text) > 3 and combined_red_text.lower() in value.lower()): best_match = value best_key = key break # Word match if not best_match: red_words = set(word.lower() for word in combined_red_text.split() if len(word) > 2) best_score = 0 for key, value in all_values.items(): value_words = set(word.lower() for word in str(value).split() if len(word) > 2) if red_words and value_words: common_words = red_words.intersection(value_words) if common_words: score = len(common_words) / len(red_words) if score > best_score and score >= 0.5: best_score = score best_match = value best_key = key # Replace if found if best_match: print(f" ✅ Replacing with: '{best_match}' (from key: '{best_key}')") red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_runs[0].text = best_match red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 print(f" Made 1 paragraph replacement") else: print(f" ❌ No suitable replacement found") return replacements_made def process_hf(json_file, docx_file, output_file): """Main processing function with comprehensive error handling""" try: # Load JSON if hasattr(json_file, "read"): json_data = json.load(json_file) else: with open(json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) flat_json = flatten_json(json_data) print("📄 Available JSON keys (sample):") for i, (key, value) in enumerate(sorted(flat_json.items())): if i < 10: print(f" - {key}: {value}") print(f" ... and {len(flat_json) - 10} more keys\n") # Load DOCX if hasattr(docx_file, "read"): doc = Document(docx_file) else: doc = Document(docx_file) # Process document with all fixes print("🚀 Starting comprehensive document processing...") table_replacements = process_tables(doc, flat_json) paragraph_replacements = process_paragraphs(doc, flat_json) heading_replacements = process_headings(doc, flat_json) # Final force fix for any remaining red text force_replacements = force_red_text_replacement(doc, flat_json) total_replacements = table_replacements + paragraph_replacements + heading_replacements + force_replacements # Save output if hasattr(output_file, "write"): doc.save(output_file) else: doc.save(output_file) print(f"\n✅ Document saved as: {output_file}") print(f"✅ Total replacements: {total_replacements}") print(f" 📊 Tables: {table_replacements}") print(f" 📝 Paragraphs: {paragraph_replacements}") print(f" 📋 Headings: {heading_replacements}") print(f" 🎯 Force fixes: {force_replacements}") print(f"🎉 Processing complete!") except FileNotFoundError as e: print(f"❌ File not found: {e}") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() if __name__ == "__main__": import sys if len(sys.argv) != 4: print("Usage: python pipeline.py ") exit(1) docx_path = sys.argv[1] json_path = sys.argv[2] output_path = sys.argv[3] process_hf(json_path, docx_path, output_path)