import json from docx import Document from docx.shared import RGBColor import re # Your original heading patterns (unchanged) HEADING_PATTERNS = { "main": [ r"NHVAS\s+Audit\s+Summary\s+Report", r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT", r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT" ], "sub": [ r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS", r"MAINTENANCE\s+MANAGEMENT", r"MASS\s+MANAGEMENT", r"FATIGUE\s+MANAGEMENT", r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings", r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS", r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS", r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined", r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)", r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION", r"Operator\s+Declaration", r"Operator\s+Information", r"Driver\s*/\s*Scheduler\s+Records\s+Examined" ] } def load_json(filepath): with open(filepath, 'r') as file: return json.load(file) def flatten_json(y, prefix=''): out = {} for key, val in y.items(): new_key = f"{prefix}.{key}" if prefix else key if isinstance(val, dict): out.update(flatten_json(val, new_key)) else: out[new_key] = val out[key] = val return out def is_red(run): color = run.font.color return color and (color.rgb == RGBColor(255, 0, 0) or getattr(color, "theme_color", None) == 1) def get_value_as_string(value, field_name=""): if isinstance(value, list): if len(value) == 0: return "" elif len(value) == 1: return str(value[0]) else: if "australian company number" in field_name.lower() or "company number" in field_name.lower(): return value else: return " ".join(str(v) for v in value) else: return str(value) def find_matching_json_value(field_name, flat_json): """Your original matching function with minimal improvements""" field_name = field_name.strip() # Try exact match first if field_name in flat_json: print(f" āœ… Direct match found for key '{field_name}'") return flat_json[field_name] # Try case-insensitive exact match for key, value in flat_json.items(): if key.lower() == field_name.lower(): print(f" āœ… Case-insensitive match found for key '{field_name}' with JSON key '{key}'") return value # šŸŽÆ MINIMAL IMPROVEMENT: Better Print Name detection for operator vs auditor if field_name.lower().strip() == "print name": # Look in the flat_json keys to see what context we're in operator_keys = [k for k in flat_json.keys() if "operator" in k.lower() and "print name" in k.lower()] auditor_keys = [k for k in flat_json.keys() if "auditor" in k.lower() and ("print name" in k.lower() or "name" in k.lower())] # If we have operator-specific keys, prefer those in operator context if operator_keys: print(f" āœ… Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'") return flat_json[operator_keys[0]] elif auditor_keys: print(f" āœ… Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'") return flat_json[auditor_keys[0]] # Try suffix matching (for nested keys like "section.field") for key, value in flat_json.items(): if '.' in key and key.split('.')[-1].lower() == field_name.lower(): print(f" āœ… Suffix match found for key '{field_name}' with JSON key '{key}'") return value # Try partial matching - remove parentheses and special chars clean_field = re.sub(r'[^\w\s]', ' ', field_name.lower()).strip() clean_field = re.sub(r'\s+', ' ', clean_field) for key, value in flat_json.items(): clean_key = re.sub(r'[^\w\s]', ' ', key.lower()).strip() clean_key = re.sub(r'\s+', ' ', clean_key) if clean_field == clean_key: print(f" āœ… Clean match found for key '{field_name}' with JSON key '{key}'") return value # Enhanced fuzzy matching with better scoring field_words = set(word.lower() for word in re.findall(r'\b\w+\b', field_name) if len(word) > 2) if not field_words: return None best_match = None best_score = 0 best_key = None for key, value in flat_json.items(): key_words = set(word.lower() for word in re.findall(r'\b\w+\b', key) if len(word) > 2) if not key_words: continue # Calculate similarity score common_words = field_words.intersection(key_words) if common_words: # Use Jaccard similarity: intersection / union similarity = len(common_words) / len(field_words.union(key_words)) # Bonus for high word coverage in field_name coverage = len(common_words) / len(field_words) final_score = (similarity * 0.6) + (coverage * 0.4) if final_score > best_score: best_score = final_score best_match = value best_key = key if best_match and best_score >= 0.25: print(f" āœ… Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})") return best_match print(f" āŒ No match found for '{field_name}'") return None def get_clean_text(cell): text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: text += run.text return text.strip() def has_red_text(cell): for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run) and run.text.strip(): return True return False def extract_red_text_segments(cell): """Your original red text extraction (unchanged)""" red_segments = [] for para_idx, paragraph in enumerate(cell.paragraphs): current_segment = "" segment_runs = [] for run_idx, run in enumerate(paragraph.runs): if is_red(run): if run.text: current_segment += run.text segment_runs.append((para_idx, run_idx, run)) else: # End of current red segment if segment_runs: red_segments.append({ 'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx }) current_segment = "" segment_runs = [] # Handle segment at end of paragraph if segment_runs: red_segments.append({ 'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx }) return red_segments def replace_red_text_in_cell(cell, replacement_text): """Your original replacement function (unchanged)""" red_segments = extract_red_text_segments(cell) if not red_segments: return 0 if len(red_segments) > 1: replacements_made = 0 for segment in red_segments: segment_text = segment['text'].strip() if segment_text: pass if replacements_made == 0: return replace_all_red_segments(red_segments, replacement_text) return replace_all_red_segments(red_segments, replacement_text) def replace_all_red_segments(red_segments, replacement_text): """Your original function (unchanged)""" if not red_segments: return 0 if '\n' in replacement_text: replacement_lines = replacement_text.split('\n') else: replacement_lines = [replacement_text] replacements_made = 0 if red_segments and replacement_lines: first_segment = red_segments[0] if first_segment['runs']: first_run = first_segment['runs'][0][2] first_run.text = replacement_lines[0] first_run.font.color.rgb = RGBColor(0, 0, 0) replacements_made = 1 for _, _, run in first_segment['runs'][1:]: run.text = '' for segment in red_segments[1:]: for _, _, run in segment['runs']: run.text = '' if len(replacement_lines) > 1 and red_segments: try: first_run = red_segments[0]['runs'][0][2] paragraph = first_run.element.getparent() for line in replacement_lines[1:]: if line.strip(): from docx.oxml import OxmlElement, ns br = OxmlElement('w:br') first_run.element.append(br) new_run = paragraph.add_run(line.strip()) new_run.font.color.rgb = RGBColor(0, 0, 0) except: if red_segments and red_segments[0]['runs']: first_run = red_segments[0]['runs'][0][2] first_run.text = ' '.join(replacement_lines) first_run.font.color.rgb = RGBColor(0, 0, 0) return replacements_made def replace_single_segment(segment, replacement_text): """Your original function (unchanged)""" if not segment['runs']: return False first_run = segment['runs'][0][2] first_run.text = replacement_text first_run.font.color.rgb = RGBColor(0, 0, 0) for _, _, run in segment['runs'][1:]: run.text = '' return True def handle_multiple_red_segments_in_cell(cell, flat_json): """Your original function (unchanged)""" red_segments = extract_red_text_segments(cell) if not red_segments: return 0 print(f" šŸ” Found {len(red_segments)} red text segments in cell") replacements_made = 0 unmatched_segments = [] for i, segment in enumerate(red_segments): segment_text = segment['text'].strip() if not segment_text: continue print(f" Segment {i+1}: '{segment_text[:50]}...'") json_value = find_matching_json_value(segment_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, segment_text) if isinstance(json_value, list) and len(json_value) > 1: replacement_text = "\n".join(str(item) for item in json_value if str(item).strip()) success = replace_single_segment(segment, replacement_text) if success: replacements_made += 1 print(f" āœ… Replaced segment '{segment_text[:30]}...' with '{replacement_text[:30]}...'") else: unmatched_segments.append(segment) print(f" ā³ No individual match for segment '{segment_text[:30]}...'") if unmatched_segments and replacements_made == 0: combined_text = " ".join(seg['text'] for seg in red_segments).strip() print(f" šŸ”„ Trying combined text match: '{combined_text[:50]}...'") json_value = find_matching_json_value(combined_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, combined_text) if isinstance(json_value, list) and len(json_value) > 1: replacement_text = "\n".join(str(item) for item in json_value if str(item).strip()) replacements_made = replace_all_red_segments(red_segments, replacement_text) print(f" āœ… Replaced combined text with '{replacement_text[:50]}...'") return replacements_made # šŸŽÆ SURGICAL FIX 1: Handle Nature of Business multi-line red text def handle_nature_business_multiline_fix(cell, flat_json): """SURGICAL FIX: Handle multi-line red text in Nature of Business cells""" if not has_red_text(cell): return 0 # Check if this cell contains "Nature of the Operators Business" cell_text = get_clean_text(cell).lower() if "nature of the operators business" not in cell_text and "nature of the operator business" not in cell_text: return 0 print(f" šŸŽÆ SURGICAL FIX: Nature of Business multi-line processing") # Look for sub-fields like "Accreditation Number:" and "Expiry Date:" red_segments = extract_red_text_segments(cell) replacements_made = 0 # Try to replace each segment individually first for segment in red_segments: segment_text = segment['text'].strip() if not segment_text: continue json_value = find_matching_json_value(segment_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, segment_text) success = replace_single_segment(segment, replacement_text) if success: replacements_made += 1 print(f" āœ… Fixed segment: '{segment_text[:30]}...'") # If no individual matches, try combined approach if replacements_made == 0 and red_segments: combined_text = " ".join(seg['text'] for seg in red_segments).strip() json_value = find_matching_json_value(combined_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, combined_text) replacements_made = replace_all_red_segments(red_segments, replacement_text) print(f" āœ… Fixed combined text") return replacements_made # šŸŽÆ SURGICAL FIX 2: Handle Operator Declaration table with context awareness def handle_operator_declaration_fix(table, flat_json): """SURGICAL FIX: Handle Operator Declaration Print Name and Position Title with better context detection""" replacements_made = 0 # Build table context to understand what type of declaration this is table_context = "" for row in table.rows: for cell in row.cells: table_context += get_clean_text(cell).lower() + " " # Determine if this is an operator declaration vs auditor declaration is_operator_declaration = any(keyword in table_context for keyword in [ "hereby acknowledge", "findings detailed", "management system", "accreditation to be shared", "operator signature" ]) is_auditor_declaration = any(keyword in table_context for keyword in [ "nhvas approved auditor", "auditor registration", "hereby certify", "auditor signature" ]) # Process the table based on context for row_idx, row in enumerate(table.rows): if len(row.cells) >= 2: cell1_text = get_clean_text(row.cells[0]).strip() cell2_text = get_clean_text(row.cells[1]).strip() # Check if this is a header row with Print Name and Position Title if ("print name" in cell1_text.lower() and "position title" in cell2_text.lower() and len(table.rows) <= 4): # Small table only context_type = "Operator" if is_operator_declaration else ("Auditor" if is_auditor_declaration else "Unknown") print(f" šŸŽÆ SURGICAL FIX: {context_type} Declaration table detected") # Look for the data row (should be next row) if row_idx + 1 < len(table.rows): data_row = table.rows[row_idx + 1] if len(data_row.cells) >= 2: name_cell = data_row.cells[0] position_cell = data_row.cells[1] # Fix Print Name based on context if has_red_text(name_cell): name_value = None if is_operator_declaration: # Try operator-specific fields first for field_attempt in ["Operator Declaration.Print Name", "operator.print name", "Print Name"]: name_value = find_matching_json_value(field_attempt, flat_json) if name_value is not None: break elif is_auditor_declaration: # Try auditor-specific fields first for field_attempt in ["NHVAS Approved Auditor Declaration.Print Name", "auditor name", "auditor", "Print Name"]: name_value = find_matching_json_value(field_attempt, flat_json) if name_value is not None: break else: # Fallback to generic name_value = find_matching_json_value("Print Name", flat_json) if name_value is not None: name_text = get_value_as_string(name_value) cell_replacements = replace_red_text_in_cell(name_cell, name_text) replacements_made += cell_replacements print(f" āœ… Fixed {context_type} Print Name: '{name_text}'") # Fix Position Title based on context if has_red_text(position_cell): position_value = None if is_operator_declaration: # Try operator-specific fields first for field_attempt in ["Operator Declaration.Position Title", "operator.position title", "Position Title"]: position_value = find_matching_json_value(field_attempt, flat_json) if position_value is not None: break elif is_auditor_declaration: # Try auditor registration number for auditor declarations for field_attempt in ["NHVR or Exemplar Global Auditor Registration Number", "auditor registration", "registration number"]: position_value = find_matching_json_value(field_attempt, flat_json) if position_value is not None: break else: # Fallback to generic position_value = find_matching_json_value("Position Title", flat_json) if position_value is not None: position_text = get_value_as_string(position_value) cell_replacements = replace_red_text_in_cell(position_cell, position_text) replacements_made += cell_replacements print(f" āœ… Fixed {context_type} Position/Registration: '{position_text}'") break # Found the table, stop looking return replacements_made def handle_australian_company_number(row, company_numbers): """Your original function (unchanged)""" replacements_made = 0 for i, digit in enumerate(company_numbers): cell_idx = i + 1 if cell_idx < len(row.cells): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = replace_red_text_in_cell(cell, str(digit)) replacements_made += cell_replacements print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}") return replacements_made def handle_vehicle_registration_table(table, flat_json): """Your original function (unchanged)""" replacements_made = 0 # Try to find vehicle registration data vehicle_section = None for key, value in flat_json.items(): if "vehicle registration numbers of records examined" in key.lower(): if isinstance(value, dict): vehicle_section = value print(f" āœ… Found vehicle data in key: '{key}'") break if not vehicle_section: potential_columns = {} for key, value in flat_json.items(): if any(col_name in key.lower() for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension"]): if "." in key: column_name = key.split(".")[-1] else: column_name = key potential_columns[column_name] = value if potential_columns: vehicle_section = potential_columns print(f" āœ… Found vehicle data from flattened keys: {list(vehicle_section.keys())}") else: print(f" āŒ Vehicle registration data not found in JSON") return 0 print(f" āœ… Found vehicle registration data with {len(vehicle_section)} columns") # Find header row header_row_idx = -1 header_row = None for row_idx, row in enumerate(table.rows): row_text = "".join(get_clean_text(cell).lower() for cell in row.cells) if "registration" in row_text and "number" in row_text: header_row_idx = row_idx header_row = row break if header_row_idx == -1: print(f" āŒ Could not find header row in vehicle table") return 0 print(f" āœ… Found header row at index {header_row_idx}") # Enhanced column mapping column_mapping = {} for col_idx, cell in enumerate(header_row.cells): header_text = get_clean_text(cell).strip() if not header_text or header_text.lower() == "no.": continue best_match = None best_score = 0 normalized_header = header_text.lower().replace("(", " (").replace(")", ") ").strip() for json_key in vehicle_section.keys(): normalized_json = json_key.lower().strip() if normalized_header == normalized_json: best_match = json_key best_score = 1.0 break header_words = set(word.lower() for word in normalized_header.split() if len(word) > 2) json_words = set(word.lower() for word in normalized_json.split() if len(word) > 2) if header_words and json_words: common_words = header_words.intersection(json_words) score = len(common_words) / max(len(header_words), len(json_words)) if score > best_score and score >= 0.3: best_score = score best_match = json_key header_clean = normalized_header.replace(" ", "").replace("-", "").replace("(", "").replace(")", "") json_clean = normalized_json.replace(" ", "").replace("-", "").replace("(", "").replace(")", "") if header_clean in json_clean or json_clean in header_clean: if len(header_clean) > 5 and len(json_clean) > 5: substring_score = min(len(header_clean), len(json_clean)) / max(len(header_clean), len(json_clean)) if substring_score > best_score and substring_score >= 0.6: best_score = substring_score best_match = json_key if best_match: column_mapping[col_idx] = best_match print(f" šŸ“Œ Column {col_idx + 1} ('{header_text}') -> '{best_match}' (score: {best_score:.2f})") if not column_mapping: print(f" āŒ No column mappings found") return 0 # Determine data rows needed max_data_rows = 0 for json_key, data in vehicle_section.items(): if isinstance(data, list): max_data_rows = max(max_data_rows, len(data)) print(f" šŸ“Œ Need to populate {max_data_rows} data rows") # Process data rows for data_row_index in range(max_data_rows): table_row_idx = header_row_idx + 1 + data_row_index if table_row_idx >= len(table.rows): print(f" āš ļø Row {table_row_idx + 1} doesn't exist - table only has {len(table.rows)} rows") print(f" āž• Adding new row for vehicle {data_row_index + 1}") new_row = table.add_row() print(f" āœ… Successfully added row {len(table.rows)} to the table") row = table.rows[table_row_idx] print(f" šŸ“Œ Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})") for col_idx, json_key in column_mapping.items(): if col_idx < len(row.cells): cell = row.cells[col_idx] column_data = vehicle_section.get(json_key, []) if isinstance(column_data, list) and data_row_index < len(column_data): replacement_value = str(column_data[data_row_index]) cell_text = get_clean_text(cell) if has_red_text(cell) or not cell_text.strip(): if not cell_text.strip(): cell.text = replacement_value replacements_made += 1 print(f" -> Added '{replacement_value}' to empty cell (column '{json_key}')") else: cell_replacements = replace_red_text_in_cell(cell, replacement_value) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced red text with '{replacement_value}' (column '{json_key}')") return replacements_made def handle_print_accreditation_section(table, flat_json): """Your original function (unchanged)""" replacements_made = 0 print_data = flat_json.get("print accreditation name.print accreditation name", []) if not isinstance(print_data, list) or len(print_data) < 2: return 0 name_value = print_data[0] position_value = print_data[1] print(f" šŸ“‹ Print accreditation data: Name='{name_value}', Position='{position_value}'") for row_idx, row in enumerate(table.rows): if len(row.cells) >= 2: cell1_text = get_clean_text(row.cells[0]).lower() cell2_text = get_clean_text(row.cells[1]).lower() if "print name" in cell1_text and "position title" in cell2_text: print(f" šŸ“ Found header row {row_idx + 1}: '{cell1_text}' | '{cell2_text}'") if row_idx + 1 < len(table.rows): data_row = table.rows[row_idx + 1] if len(data_row.cells) >= 2: if has_red_text(data_row.cells[0]): cell_replacements = replace_red_text_in_cell(data_row.cells[0], name_value) replacements_made += cell_replacements if cell_replacements > 0: print(f" āœ… Replaced Print Name: '{name_value}'") if has_red_text(data_row.cells[1]): cell_replacements = replace_red_text_in_cell(data_row.cells[1], position_value) replacements_made += cell_replacements if cell_replacements > 0: print(f" āœ… Replaced Position Title: '{position_value}'") break return replacements_made def process_single_column_sections(cell, field_name, flat_json): """Your original function (unchanged)""" json_value = find_matching_json_value(field_name, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, field_name) if isinstance(json_value, list) and len(json_value) > 1: replacement_text = "\n".join(str(item) for item in json_value) if has_red_text(cell): print(f" āœ… Replacing red text in single-column section: '{field_name}'") print(f" āœ… Replacement text:\n{replacement_text}") cell_replacements = replace_red_text_in_cell(cell, replacement_text) if cell_replacements > 0: print(f" -> Replaced with: '{replacement_text[:100]}...'") return cell_replacements return 0 def handle_attendance_list_table_enhanced(table, flat_json): """Enhanced Attendance List processing with better detection""" replacements_made = 0 # Check multiple patterns for attendance list attendance_patterns = [ "attendance list", "names and position titles", "attendees" ] # Scan all cells in the first few rows for attendance list indicators found_attendance_row = None found_attendance_cell = None for row_idx, row in enumerate(table.rows[:3]): # Check first 3 rows for cell_idx, cell in enumerate(row.cells): cell_text = get_clean_text(cell).lower() # Check if this cell contains attendance list header if any(pattern in cell_text for pattern in attendance_patterns): found_attendance_row = row_idx found_attendance_cell = cell_idx print(f" šŸŽÆ ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}") break if found_attendance_row is not None: break if found_attendance_row is None: return 0 # šŸ”§ FIX: Look for attendance data in JSON attendance_value = None attendance_search_keys = [ "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)", "Attendance List (Names and Position Titles)", "attendance list", "attendees" ] print(f" šŸ” Searching for attendance data in JSON...") for search_key in attendance_search_keys: attendance_value = find_matching_json_value(search_key, flat_json) if attendance_value is not None: print(f" āœ… Found attendance data with key: '{search_key}'") print(f" šŸ“Š Raw value: {attendance_value}") break if attendance_value is None: print(f" āŒ No attendance data found in JSON") return 0 # šŸ”§ CRITICAL FIX: Look for red text in ALL cells of the table, not just the header target_cell = None print(f" šŸ” Scanning ALL cells in attendance table for red text...") for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): print(f" šŸŽÆ Found red text in row {row_idx + 1}, cell {cell_idx + 1}") # Get the red text to see if it looks like attendance data red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text print(f" šŸ“‹ Red text content: '{red_text[:50]}...'") # Check if this red text looks like attendance data (contains names/manager/etc) red_text_lower = red_text.lower() if any(indicator in red_text_lower for indicator in ['manager', 'herbig', 'palin', '–', '-']): target_cell = cell print(f" āœ… This looks like attendance data - using this cell") break if target_cell is not None: break # If no red text found that looks like attendance data, return if target_cell is None: print(f" āš ļø No red text found that looks like attendance data") return 0 # šŸ”§ CRITICAL FIX: ONLY replace red text, preserve everything else if has_red_text(target_cell): print(f" šŸ”§ Replacing ONLY red text in target cell...") # Format the attendance data for replacement if isinstance(attendance_value, list): # Join with line breaks for Word formatted_attendance = '\n'.join(str(item).strip() for item in attendance_value if str(item).strip()) else: formatted_attendance = str(attendance_value) print(f" šŸ“ Replacement text:\n{formatted_attendance}") # Use the existing replace_red_text_in_cell function which preserves non-red text cell_replacements = replace_red_text_in_cell(target_cell, formatted_attendance) replacements_made += cell_replacements print(f" āœ… Replaced only red text, preserved other content") print(f" šŸ“Š Replacements made: {cell_replacements}") return replacements_made # šŸŽÆ FINAL FIX 2: Generic Management Summary fix for ALL types (Mass, Fatigue, Maintenance) def handle_management_summary_fix(cell, flat_json): """FINAL FIX: Handle ANY Management Summary section (Mass/Fatigue/Maintenance) - RED TEXT ONLY""" if not has_red_text(cell): return 0 # Check if this cell contains any Management Summary cell_text = get_clean_text(cell).lower() # Detect which type of management summary this is management_type = None if "mass management" in cell_text and "summary" in cell_text: management_type = "Mass Management" elif "fatigue management" in cell_text and "summary" in cell_text: management_type = "Fatigue Management" elif "maintenance management" in cell_text and "summary" in cell_text: management_type = "Maintenance Management" if not management_type: return 0 print(f" šŸŽÆ FINAL FIX: {management_type} Summary processing - RED TEXT ONLY") # ONLY process red text segments, not the entire cell text red_segments = extract_red_text_segments(cell) replacements_made = 0 # Try to replace ONLY the red text segments for segment in red_segments: segment_text = segment['text'].strip() if not segment_text: continue print(f" šŸ” Processing red text segment: '{segment_text[:50]}...'") # Try multiple variations based on the management type summary_value = None field_attempts = [ f"{management_type} Summary of Audit findings", f"{management_type} Summary", f"{management_type.lower()} summary", management_type.lower(), segment_text # Also try the exact red text ] # Also try variations without "Management" base_type = management_type.replace(" Management", "") field_attempts.extend([ f"{base_type} Management Summary of Audit findings", f"{base_type} Summary of Audit findings", f"{base_type} Summary", f"{base_type.lower()} summary" ]) for field_attempt in field_attempts: summary_value = find_matching_json_value(field_attempt, flat_json) if summary_value is not None: print(f" āœ… Found match with field: '{field_attempt}'") break if summary_value is not None: replacement_text = get_value_as_string(summary_value, segment_text) if isinstance(summary_value, list): replacement_text = "\n".join(str(item) for item in summary_value if str(item).strip()) success = replace_single_segment(segment, replacement_text) if success: replacements_made += 1 print(f" āœ… Fixed {management_type} Summary segment: '{segment_text[:30]}...' -> '{replacement_text[:30]}...'") else: print(f" āŒ No match found for red text: '{segment_text[:30]}...'") # If no individual segment matches, try combined approach on red text only if replacements_made == 0 and red_segments: combined_red_text = " ".join(seg['text'] for seg in red_segments).strip() print(f" šŸ”„ Trying combined red text match: '{combined_red_text[:50]}...'") # Try combined text matching with all field variations field_attempts = [ f"{management_type} Summary of Audit findings", f"{management_type} Summary", f"{management_type.lower()} summary", combined_red_text ] base_type = management_type.replace(" Management", "") field_attempts.extend([ f"{base_type} Management Summary of Audit findings", f"{base_type} Summary of Audit findings", f"{base_type} Summary" ]) for field_attempt in field_attempts: summary_value = find_matching_json_value(field_attempt, flat_json) if summary_value is not None: replacement_text = get_value_as_string(summary_value, combined_red_text) if isinstance(summary_value, list): replacement_text = "\n".join(str(item) for item in summary_value if str(item).strip()) replacements_made = replace_all_red_segments(red_segments, replacement_text) print(f" āœ… Fixed {management_type} Summary combined red text with field: '{field_attempt}'") break return replacements_made def process_tables(document, flat_json): """Your original function with ALL surgical fixes added""" replacements_made = 0 for table_idx, table in enumerate(document.tables): print(f"\nšŸ” Processing table {table_idx + 1}:") # Your original logic table_text = "" for row in table.rows[:3]: for cell in row.cells: table_text += get_clean_text(cell).lower() + " " # Enhanced vehicle registration detection vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension"] indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text) if indicator_count >= 2: print(f" šŸš— Detected Vehicle Registration table") vehicle_replacements = handle_vehicle_registration_table(table, flat_json) replacements_made += vehicle_replacements continue # šŸŽÆ FINAL FIX 1: Enhanced attendance list detection if "attendance list" in table_text and "names and position titles" in table_text: print(f" šŸ‘„ Detected Attendance List table") attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json) replacements_made += attendance_replacements continue # Enhanced print accreditation detection print_accreditation_indicators = ["print name", "position title"] indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text) if indicator_count >= 1: print(f" šŸ“‹ Detected Print Accreditation table") print_accreditation_replacements = handle_print_accreditation_section(table, flat_json) replacements_made += print_accreditation_replacements continue # Your existing row processing for row_idx, row in enumerate(table.rows): if len(row.cells) < 1: continue key_cell = row.cells[0] key_text = get_clean_text(key_cell) if not key_text: continue print(f" šŸ“Œ Row {row_idx + 1}: Key = '{key_text}'") json_value = find_matching_json_value(key_text, flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value, key_text) # Enhanced ACN handling if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list): cell_replacements = handle_australian_company_number(row, json_value) replacements_made += cell_replacements # Enhanced section header handling elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows): print(f" āœ… Section header detected, checking next row for content...") next_row = table.rows[row_idx + 1] for cell_idx, cell in enumerate(next_row.cells): if has_red_text(cell): print(f" āœ… Found red text in next row, cell {cell_idx + 1}") if isinstance(json_value, list): replacement_text = "\n".join(str(item) for item in json_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced section content with: '{replacement_text[:100]}...'") elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))): if has_red_text(key_cell): cell_replacements = process_single_column_sections(key_cell, key_text, flat_json) replacements_made += cell_replacements else: for cell_idx in range(1, len(row.cells)): value_cell = row.cells[cell_idx] if has_red_text(value_cell): print(f" āœ… Found red text in column {cell_idx + 1}") cell_replacements = replace_red_text_in_cell(value_cell, replacement_text) replacements_made += cell_replacements else: # Enhanced fallback processing for unmatched keys if len(row.cells) == 1 and has_red_text(key_cell): red_text = "" for paragraph in key_cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): section_value = find_matching_json_value(red_text.strip(), flat_json) if section_value is not None: section_replacement = get_value_as_string(section_value, red_text.strip()) cell_replacements = replace_red_text_in_cell(key_cell, section_replacement) replacements_made += cell_replacements # Enhanced red text processing for all cells for cell_idx in range(len(row.cells)): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json) replacements_made += cell_replacements # šŸŽÆ SURGICAL FIX 1: Only if no replacements were made if cell_replacements == 0: surgical_fix = handle_nature_business_multiline_fix(cell, flat_json) replacements_made += surgical_fix # šŸŽÆ FINAL FIX 2: Only if still no replacements were made, try ANY Management Summary fix if cell_replacements == 0 and surgical_fix == 0: management_summary_fix = handle_management_summary_fix(cell, flat_json) replacements_made += management_summary_fix # šŸŽÆ SURGICAL FIX 3: Handle Operator Declaration tables (only check last few tables) print(f"\nšŸŽÆ SURGICAL FIX: Checking for Operator/Auditor Declaration tables...") for table in document.tables[-3:]: # Only check last 3 tables if len(table.rows) <= 4: # Only small tables declaration_fix = handle_operator_declaration_fix(table, flat_json) replacements_made += declaration_fix return replacements_made def process_paragraphs(document, flat_json): """Your original function (unchanged)""" replacements_made = 0 print(f"\nšŸ” Processing paragraphs:") for para_idx, paragraph in enumerate(document.paragraphs): red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: full_text = paragraph.text.strip() red_text_only = "".join(run.text for run in red_runs).strip() print(f" šŸ“Œ Paragraph {para_idx + 1}: Found red text: '{red_text_only}'") # Your existing matching logic json_value = find_matching_json_value(red_text_only, flat_json) if json_value is None: # Enhanced pattern matching for signatures and dates if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper(): json_value = find_matching_json_value("auditor signature", flat_json) elif "OPERATOR SIGNATURE" in red_text_only.upper(): json_value = find_matching_json_value("operator signature", flat_json) if json_value is not None: replacement_text = get_value_as_string(json_value) print(f" āœ… Replacing red text with: '{replacement_text}'") red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 return replacements_made def process_headings(document, flat_json): """Your original function (unchanged)""" replacements_made = 0 print(f"\nšŸ” Processing headings:") paragraphs = document.paragraphs for para_idx, paragraph in enumerate(paragraphs): paragraph_text = paragraph.text.strip() if not paragraph_text: continue # Enhanced heading detection matched_heading = None for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, paragraph_text, re.IGNORECASE): matched_heading = pattern break if matched_heading: break if matched_heading: print(f" šŸ“Œ Found heading at paragraph {para_idx + 1}: '{paragraph_text}'") # Check current heading paragraph if has_red_text_in_paragraph(paragraph): print(f" šŸ”“ Found red text in heading itself") heading_replacements = process_red_text_in_paragraph(paragraph, paragraph_text, flat_json) replacements_made += heading_replacements # Enhanced: Look further ahead for related content for next_para_offset in range(1, 6): # Extended range next_para_idx = para_idx + next_para_offset if next_para_idx >= len(paragraphs): break next_paragraph = paragraphs[next_para_idx] next_text = next_paragraph.text.strip() if not next_text: continue # Stop if we hit another heading is_another_heading = False for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, next_text, re.IGNORECASE): is_another_heading = True break if is_another_heading: break if is_another_heading: break # Process red text with enhanced context if has_red_text_in_paragraph(next_paragraph): print(f" šŸ”“ Found red text in paragraph {next_para_idx + 1} after heading: '{next_text[:50]}...'") context_replacements = process_red_text_in_paragraph( next_paragraph, paragraph_text, flat_json ) replacements_made += context_replacements return replacements_made def has_red_text_in_paragraph(paragraph): """Your original function (unchanged)""" for run in paragraph.runs: if is_red(run) and run.text.strip(): return True return False def process_red_text_in_paragraph(paragraph, context_text, flat_json): """Your original function (unchanged)""" replacements_made = 0 red_text_segments = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_segments.append(run.text.strip()) if not red_text_segments: return 0 combined_red_text = " ".join(red_text_segments).strip() print(f" šŸ” Red text found: '{combined_red_text}'") json_value = None # Strategy 1: Direct matching json_value = find_matching_json_value(combined_red_text, flat_json) # Strategy 2: Enhanced context-based matching if json_value is None: if "NHVAS APPROVED AUDITOR" in context_text.upper(): auditor_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"] for field in auditor_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" āœ… Found auditor match with field: '{field}'") break elif "OPERATOR DECLARATION" in context_text.upper(): operator_fields = ["operator name", "operator", "company name", "organisation name", "print name"] for field in operator_fields: json_value = find_matching_json_value(field, flat_json) if json_value is not None: print(f" āœ… Found operator match with field: '{field}'") break # Strategy 3: Enhanced context combination if json_value is None: context_queries = [ f"{context_text} {combined_red_text}", combined_red_text, context_text ] for query in context_queries: json_value = find_matching_json_value(query, flat_json) if json_value is not None: print(f" āœ… Found match with combined query: '{query[:50]}...'") break # Replace if match found if json_value is not None: replacement_text = get_value_as_string(json_value, combined_red_text) red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made = 1 print(f" āœ… Replaced with: '{replacement_text}'") else: print(f" āŒ No match found for red text: '{combined_red_text}'") return replacements_made def force_red_text_replacement(document, flat_json): """Force replacement of any remaining red text by trying ALL JSON values - FIXED""" replacements_made = 0 print(f"\nšŸŽÆ FORCE FIX: Scanning for any remaining red text...") # Collect ALL possible replacement values from JSON - FIXED to handle lists properly all_values = {} for key, value in flat_json.items(): if value: # Convert value to string properly value_str = get_value_as_string(value, key) # Only add if we have a valid string if value_str and isinstance(value_str, str) and value_str.strip(): all_values[key] = value_str.strip() # Also store individual items from lists for partial matching if isinstance(value, list): for i, item in enumerate(value): item_str = str(item).strip() if item else "" if item_str: all_values[f"{key}_item_{i}"] = item_str print(f" Found {len(all_values)} potential replacement values") # Process all tables for table_idx, table in enumerate(document.tables): for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): print(f" šŸ” Found red text in Table {table_idx + 1}, Row {row_idx + 1}, Cell {cell_idx + 1}") # Extract all red text from this cell red_text_parts = [] for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_parts.append(run.text.strip()) combined_red_text = " ".join(red_text_parts).strip() print(f" Red text: '{combined_red_text}'") # Try to find a match best_match = None best_key = None # First try exact matching for key, value in all_values.items(): if combined_red_text.lower() == value.lower(): best_match = value best_key = key break # If no exact match, try partial matching if not best_match: for key, value in all_values.items(): # Try if red text contains this value or vice versa if (len(value) > 3 and value.lower() in combined_red_text.lower()) or \ (len(combined_red_text) > 3 and combined_red_text.lower() in value.lower()): best_match = value best_key = key break # If still no match, try word-by-word matching for names/dates if not best_match: red_words = set(word.lower() for word in combined_red_text.split() if len(word) > 2) best_score = 0 for key, value in all_values.items(): value_words = set(word.lower() for word in str(value).split() if len(word) > 2) if red_words and value_words: common_words = red_words.intersection(value_words) if common_words: score = len(common_words) / len(red_words) if score > best_score and score >= 0.5: # At least 50% match best_score = score best_match = value best_key = key # Replace if we found a match if best_match: print(f" āœ… Replacing with: '{best_match}' (from key: '{best_key}')") cell_replacements = replace_red_text_in_cell(cell, best_match) replacements_made += cell_replacements print(f" Made {cell_replacements} replacements") else: print(f" āŒ No suitable replacement found") # Process all paragraphs for para_idx, paragraph in enumerate(document.paragraphs): if has_red_text_in_paragraph(paragraph): red_text_parts = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_parts.append(run.text.strip()) combined_red_text = " ".join(red_text_parts).strip() if combined_red_text: print(f" šŸ” Found red text in Paragraph {para_idx + 1}: '{combined_red_text}'") # Same matching logic as above best_match = None best_key = None # Exact match for key, value in all_values.items(): if combined_red_text.lower() == value.lower(): best_match = value best_key = key break # Partial match if not best_match: for key, value in all_values.items(): if (len(value) > 3 and value.lower() in combined_red_text.lower()) or \ (len(combined_red_text) > 3 and combined_red_text.lower() in value.lower()): best_match = value best_key = key break # Word match if not best_match: red_words = set(word.lower() for word in combined_red_text.split() if len(word) > 2) best_score = 0 for key, value in all_values.items(): value_words = set(word.lower() for word in str(value).split() if len(word) > 2) if red_words and value_words: common_words = red_words.intersection(value_words) if common_words: score = len(common_words) / len(red_words) if score > best_score and score >= 0.5: best_score = score best_match = value best_key = key # Replace if found if best_match: print(f" āœ… Replacing with: '{best_match}' (from key: '{best_key}')") red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_runs[0].text = best_match red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 print(f" Made 1 paragraph replacement") else: print(f" āŒ No suitable replacement found") return replacements_made def process_hf(json_file, docx_file, output_file): """Your original main function with force fix added at the end""" try: # Load JSON if hasattr(json_file, "read"): json_data = json.load(json_file) else: with open(json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) flat_json = flatten_json(json_data) print("šŸ“„ Available JSON keys (sample):") for i, (key, value) in enumerate(sorted(flat_json.items())): if i < 10: print(f" - {key}: {value}") print(f" ... and {len(flat_json) - 10} more keys\n") # Load DOCX if hasattr(docx_file, "read"): doc = Document(docx_file) else: doc = Document(docx_file) # Your original processing with surgical fixes print("šŸš€ Starting processing with minimal surgical fixes...") table_replacements = process_tables(doc, flat_json) paragraph_replacements = process_paragraphs(doc, flat_json) heading_replacements = process_headings(doc, flat_json) # šŸŽÆ ADD THIS: Force fix for any remaining red text force_replacements = force_red_text_replacement(doc, flat_json) total_replacements = table_replacements + paragraph_replacements + heading_replacements + force_replacements # Save output if hasattr(output_file, "write"): doc.save(output_file) else: doc.save(output_file) print(f"\nāœ… Document saved as: {output_file}") print(f"āœ… Total replacements: {total_replacements}") print(f" šŸ“Š Tables: {table_replacements}") print(f" šŸ“ Paragraphs: {paragraph_replacements}") print(f" šŸ“‹ Headings: {heading_replacements}") print(f" šŸŽÆ Force fixes: {force_replacements}") print(f"šŸŽ‰ Processing complete!") except FileNotFoundError as e: print(f"āŒ File not found: {e}") except Exception as e: print(f"āŒ Error: {e}") import traceback traceback.print_exc() if __name__ == "__main__": import sys if len(sys.argv) != 4: print("Usage: python pipeline.py ") exit(1) docx_path = sys.argv[1] json_path = sys.argv[2] output_path = sys.argv[3] process_hf(json_path, docx_path, output_path)