#!/usr/bin/env python3
"""
pipeline.py – safer matching and operator-declaration protections

Key improvements:
- find_matching_json_key_and_value() returns (key, value) so callers can accept/reject by key.
- Higher fuzzy thresholds for risky substitutions.
- Operator Declaration: avoid using attendance lists / unrelated keys for Position Title.
- Vehicle header mapping: stronger normalized substring/token matching for long headers.
- Preserves existing logging and all previous handlers/logic.
"""
import json
import re
from typing import Any, Tuple, Optional

from docx import Document
from docx.shared import RGBColor
# ============================================================================
# Heading patterns for document structure detection (unchanged)
# ============================================================================
HEADING_PATTERNS = {
    "main": [
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT",
    ],
    "sub": [
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined",
    ],
}
# ============================================================================
# Utility helpers
# ============================================================================
_unmatched_headers = {}

def record_unmatched_header(header: str):
    if not header:
        return
    _unmatched_headers[header] = _unmatched_headers.get(header, 0) + 1

def load_json(filepath):
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)
def flatten_json(y, prefix=''):
    out = {}
    for key, val in y.items():
        new_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            out.update(flatten_json(val, new_key))
        else:
            out[new_key] = val
            out[key] = val
    return out
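
# Illustrative example (not part of the pipeline): flattening yields both the
# dotted path and the bare leaf key, which is what the suffix and fuzzy
# matchers below rely on.
#
#   flatten_json({"Operator Declaration": {"Print Name": "J. Citizen"}})
#   # -> {"Operator Declaration.Print Name": "J. Citizen",
#   #     "Print Name": "J. Citizen"}
#
# Note that bare leaf keys can collide across sections; the last one wins.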
def is_red(run):
    color = run.font.color
    try:
        return color and ((getattr(color, "rgb", None) and color.rgb == RGBColor(255, 0, 0)) or getattr(color, "theme_color", None) == 1)
    except Exception:
        return False

def get_value_as_string(value, field_name=""):
    if isinstance(value, list):
        if len(value) == 0:
            return ""
        elif len(value) == 1:
            return str(value[0])
        else:
            # Keep lists intact for special patterns (e.g., ACN digits) but default to join
            if "australian company number" in field_name.lower() or "company number" in field_name.lower():
                return value
            return " ".join(str(v) for v in value)
    else:
        return str(value)

def get_clean_text(cell):
    text = ""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            text += run.text
    return text.strip()

def has_red_text(cell):
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            if is_red(run) and run.text.strip():
                return True
    return False

def has_red_text_in_paragraph(paragraph):
    for run in paragraph.runs:
        if is_red(run) and run.text.strip():
            return True
    return False
def normalize_header_text(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r'\([^)]*\)', ' ', s)  # remove parenthetical content
    s = s.replace("/", " ")
    s = re.sub(r'[^\w\s\#\%]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip().lower()
    # canonical tweaks
    s = s.replace('registrationno', 'registration number')
    s = s.replace('registrationnumber', 'registration number')
    s = s.replace('sub-contractor', 'sub contractor')
    s = s.replace('sub contracted', 'sub contractor')
    return s.strip()
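
# Illustrative example (not part of the pipeline):
#
#   normalize_header_text("Registration Number (incl. trailers) / Fleet No.")
#   # -> "registration number fleet no"
#
# Parentheticals are dropped first, then slashes and punctuation become
# spaces and whitespace is collapsed before the canonical tweaks run.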
# ============================================================================
# JSON matching functions
# - find_matching_json_value: (keeps behavior used elsewhere)
# - find_matching_json_key_and_value: returns (key, value) so callers can
#   decide whether to use an entry based on the matched key.
# ============================================================================
def find_matching_json_value(field_name, flat_json):
    """Legacy API: return value only (preserves existing callers)."""
    result = find_matching_json_key_and_value(field_name, flat_json)
    return result[1] if result else None
def find_matching_json_key_and_value(field_name, flat_json) -> Optional[Tuple[str, Any]]:
    """
    Return (matched_key, matched_value) or None.
    Safer thresholds: fuzzy matches require >=0.35 by default.
    """
    field_name = (field_name or "").strip()
    if not field_name:
        return None
    # Exact match
    if field_name in flat_json:
        print(f"  ✓ Direct match found for key '{field_name}'")
        return field_name, flat_json[field_name]
    # Case-insensitive exact
    for key, value in flat_json.items():
        if key.lower() == field_name.lower():
            print(f"  ✓ Case-insensitive match found for key '{field_name}' -> '{key}'")
            return key, value
    # Special-case 'print name' preference for operator vs auditor (prefer fully-qualified)
    if field_name.lower().strip() == "print name":
        operator_keys = [k for k in flat_json.keys() if "operator" in k.lower() and "print name" in k.lower()]
        auditor_keys = [k for k in flat_json.keys() if "auditor" in k.lower() and ("print name" in k.lower() or "name" in k.lower())]
        if operator_keys:
            print(f"  ✓ Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'")
            return operator_keys[0], flat_json[operator_keys[0]]
        elif auditor_keys:
            print(f"  ✓ Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'")
            return auditor_keys[0], flat_json[auditor_keys[0]]
    # Suffix match for nested keys (e.g., 'section.field')
    for key, value in flat_json.items():
        if '.' in key and key.split('.')[-1].lower() == field_name.lower():
            print(f"  ✓ Suffix match found for key '{field_name}' -> '{key}'")
            return key, value
    # Clean and exact
    clean_field = re.sub(r'[^\w\s]', ' ', field_name.lower()).strip()
    clean_field = re.sub(r'\s+', ' ', clean_field)
    for key, value in flat_json.items():
        clean_key = re.sub(r'[^\w\s]', ' ', key.lower()).strip()
        clean_key = re.sub(r'\s+', ' ', clean_key)
        if clean_field == clean_key:
            print(f"  ✓ Clean match found for key '{field_name}' -> '{key}'")
            return key, value
    # Fuzzy matching with token scoring
    field_words = set(word.lower() for word in re.findall(r'\b\w+\b', field_name) if len(word) > 2)
    if not field_words:
        return None
    best_key = None
    best_value = None
    best_score = 0.0
    for key, value in flat_json.items():
        key_words = set(word.lower() for word in re.findall(r'\b\w+\b', key) if len(word) > 2)
        if not key_words:
            continue
        common = field_words.intersection(key_words)
        if not common:
            # allow substring in normalized forms as a weaker fallback
            norm_field = normalize_header_text(field_name)
            norm_key = normalize_header_text(key)
            if norm_field and norm_key and (norm_field in norm_key or norm_key in norm_field):
                # substring score based on length ratio
                substring_score = min(len(norm_field), len(norm_key)) / max(len(norm_field), len(norm_key))
                final_score = 0.4 * substring_score
            else:
                final_score = 0.0
        else:
            similarity = len(common) / len(field_words.union(key_words))
            coverage = len(common) / len(field_words)
            final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_score = final_score
            best_key = key
            best_value = value
    # Accept only reasonable fuzzy matches (threshold 0.35)
    if best_key and best_score >= 0.35:
        print(f"  ✓ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_key, best_value
    print(f"  ✗ No match found for '{field_name}'")
    return None
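
# Illustrative example of the fuzzy scoring above (not part of the pipeline):
# for field "Operator name" vs key "Operator name (Legal entity)", the token
# sets are {operator, name} and {operator, name, legal, entity}, so
# similarity = 2/4 = 0.50, coverage = 2/2 = 1.0, and the final score is
# 0.50 * 0.6 + 1.0 * 0.4 = 0.70, comfortably above the 0.35 threshold.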
# ============================================================================
# Red text helpers (unchanged except kept robust)
# ============================================================================
def extract_red_text_segments(cell):
    red_segments = []
    for para_idx, paragraph in enumerate(cell.paragraphs):
        current_segment = ""
        segment_runs = []
        for run_idx, run in enumerate(paragraph.runs):
            if is_red(run):
                if run.text:
                    current_segment += run.text
                    segment_runs.append((para_idx, run_idx, run))
            else:
                if segment_runs:
                    red_segments.append({'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx})
                    current_segment = ""
                    segment_runs = []
        if segment_runs:
            red_segments.append({'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx})
    return red_segments
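
# Shape of each returned segment, documented here for reference:
#   {'text': 'concatenated red run text',
#    'runs': [(paragraph_idx, run_idx, run), ...],
#    'paragraph_idx': paragraph_idx}
# A segment covers a maximal run of consecutive red runs within one paragraph.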
def replace_all_red_segments(red_segments, replacement_text):
    if not red_segments:
        return 0
    if '\n' in replacement_text:
        replacement_lines = replacement_text.split('\n')
    else:
        replacement_lines = [replacement_text]
    replacements_made = 0
    first_segment = red_segments[0]
    if first_segment['runs']:
        first_run = first_segment['runs'][0][2]
        first_run.text = replacement_lines[0]
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        replacements_made = 1
        for _, _, run in first_segment['runs'][1:]:
            run.text = ''
    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''
    if len(replacement_lines) > 1 and red_segments:
        try:
            from docx.oxml import OxmlElement
            from docx.oxml.ns import qn
            first_run = red_segments[0]['runs'][0][2]
            # Append a <w:br/> plus a new <w:t> to the first run's XML for each
            # extra line; this keeps the run's formatting and avoids needing
            # the enclosing Paragraph object (a Run has no public parent).
            for line in replacement_lines[1:]:
                if line.strip():
                    br = OxmlElement('w:br')
                    first_run._element.append(br)
                    t = OxmlElement('w:t')
                    t.text = line.strip()
                    t.set(qn('xml:space'), 'preserve')
                    first_run._element.append(t)
        except Exception:
            if red_segments and red_segments[0]['runs']:
                first_run = red_segments[0]['runs'][0][2]
                first_run.text = ' '.join(replacement_lines)
                first_run.font.color.rgb = RGBColor(0, 0, 0)
    return replacements_made
def replace_single_segment(segment, replacement_text):
    if not segment['runs']:
        return False
    first_run = segment['runs'][0][2]
    first_run.text = replacement_text
    first_run.font.color.rgb = RGBColor(0, 0, 0)
    for _, _, run in segment['runs'][1:]:
        run.text = ''
    return True

def replace_red_text_in_cell(cell, replacement_text):
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    return replace_all_red_segments(red_segments, replacement_text)

# ============================================================================
# Specialized handlers (vehicle, attendance, management, operator) with fixes
# ============================================================================
def handle_australian_company_number(row, company_numbers):
    replacements_made = 0
    for i, digit in enumerate(company_numbers):
        cell_idx = i + 1
        if cell_idx < len(row.cells):
            cell = row.cells[cell_idx]
            if has_red_text(cell):
                cell_replacements = replace_red_text_in_cell(cell, str(digit))
                replacements_made += cell_replacements
                print(f"    -> Placed digit '{digit}' in cell {cell_idx + 1}")
    return replacements_made
def handle_vehicle_registration_table(table, flat_json):
    """
    Stronger header normalization + substring matching for long headers.
    Keeps existing behavior but reduces 'No mapping found' by using normalized substring matching.
    """
    replacements_made = 0
    # Build candidate vehicle_section similar to prior logic
    vehicle_section = None
    # Prefer keys explicitly mentioning 'registration' or 'vehicle'
    candidates = [(k, v) for k, v in flat_json.items() if 'registration' in k.lower() or 'vehicle' in k.lower()]
    if candidates:
        # prefer the one with the longest key match (likely most specific)
        candidates.sort(key=lambda kv: -len(kv[0]))
        vehicle_section = candidates[0][1]
    # fallback: collect flattened keys that look like vehicle columns
    if vehicle_section is None:
        potential_columns = {}
        for key, value in flat_json.items():
            lk = key.lower()
            if any(col_name in lk for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension", "trip records", "fault recording", "fault repair", "daily checks", "roadworthiness"]):
                if "." in key:
                    column_name = key.split(".")[-1]
                else:
                    column_name = key
                potential_columns[column_name] = value
        if potential_columns:
            vehicle_section = potential_columns
            print(f"  ✓ Found vehicle data from flattened keys: {list(vehicle_section.keys())}")
    if not vehicle_section:
        print(f"  ✗ Vehicle registration data not found in JSON")
        return 0
    # Normalize vehicle_section into dict of column_label -> list/value
    if isinstance(vehicle_section, list):
        # if list of dicts, pivot
        if vehicle_section and isinstance(vehicle_section[0], dict):
            flattened = {}
            for entry in vehicle_section:
                for k, v in entry.items():
                    flattened.setdefault(k, []).append(v)
            vehicle_section = flattened
        else:
            # can't interpret, bail
            vehicle_section = {}
    if not isinstance(vehicle_section, dict):
        try:
            vehicle_section = dict(vehicle_section)
        except Exception:
            vehicle_section = {}
    print(f"  ✓ Found vehicle registration data with {len(vehicle_section)} columns")
    # Find header row (look for registration + number or reg no)
    header_row_idx = -1
    header_row = None
    for row_idx, row in enumerate(table.rows):
        row_text = " ".join(get_clean_text(cell).lower() for cell in row.cells)
        if ("registration" in row_text and "number" in row_text) or "reg no" in row_text or "registration no" in row_text:
            header_row_idx = row_idx
            header_row = row
            break
    if header_row_idx == -1:
        print(f"  ✗ Could not find header row in vehicle table")
        return 0
    print(f"  ✓ Found header row at index {header_row_idx}")
    # Build master labels from vehicle_section keys
    master_labels = {}
    for orig_key in vehicle_section.keys():
        norm = normalize_header_text(str(orig_key))
        if norm:
            # if there is a collision, prefer the longer orig_key (more specific)
            if norm in master_labels:
                if len(orig_key) > len(master_labels[norm]):
                    master_labels[norm] = orig_key
            else:
                master_labels[norm] = orig_key
    # Map header cells using normalized token overlap + substring fallback
    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        if not header_text:
            continue
        header_key = header_text.strip().lower()
        if header_key in {"no", "no.", "#"}:
            continue
        norm_header = normalize_header_text(header_text)
        best_match = None
        best_score = 0.0
        # exact normalized match
        if norm_header in master_labels:
            best_match = master_labels[norm_header]
            best_score = 1.0
        else:
            # token overlap
            header_tokens = set(t for t in norm_header.split() if len(t) > 2)
            for norm_key, orig_label in master_labels.items():
                key_tokens = set(t for t in norm_key.split() if len(t) > 2)
                if not key_tokens:
                    continue
                common = header_tokens.intersection(key_tokens)
                if common:
                    score = len(common) / max(1, len(header_tokens.union(key_tokens)))
                else:
                    # substring fallback on normalized strings
                    if norm_header in norm_key or norm_key in norm_header:
                        score = min(len(norm_header), len(norm_key)) / max(len(norm_header), len(norm_key))
                    else:
                        score = 0.0
                if score > best_score:
                    best_score = score
                    best_match = orig_label
        # additional heuristic: if header contains 'roadworthiness' and any master_labels key contains that token, accept
        if not best_match:
            for norm_key, orig_label in master_labels.items():
                if 'roadworthiness' in norm_header and 'roadworthiness' in norm_key:
                    best_match = orig_label
                    best_score = 0.65
                    break
        if best_match and best_score >= 0.30:
            column_mapping[col_idx] = best_match
            print(f"  📋 Column {col_idx}: '{header_text}' -> '{best_match}' (norm:'{norm_header}' score:{best_score:.2f})")
        else:
            print(f"  ⚠️ No mapping found for '{header_text}' (norm:'{norm_header}')")
            record_unmatched_header(header_text)
    if not column_mapping:
        print(f"  ✗ No column mappings found")
        return 0
    # Determine how many rows of data to populate
    max_data_rows = 0
    for json_key, data in vehicle_section.items():
        if isinstance(data, list):
            max_data_rows = max(max_data_rows, len(data))
    print(f"  📋 Need to populate {max_data_rows} data rows")
    # Populate or add rows
    for data_row_index in range(max_data_rows):
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f"  ⚠️ Row {table_row_idx + 1} doesn't exist, adding one")
            table.add_row()
        row = table.rows[table_row_idx]
        print(f"  📋 Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx < len(row.cells):
                cell = row.cells[col_idx]
                column_data = vehicle_section.get(json_key, [])
                if isinstance(column_data, list) and data_row_index < len(column_data):
                    replacement_value = str(column_data[data_row_index])
                    cell_text = get_clean_text(cell)
                    if has_red_text(cell) or not cell_text.strip():
                        if not cell_text.strip():
                            cell.text = replacement_value
                            replacements_made += 1
                            print(f"    -> Added '{replacement_value}' to empty cell (col '{json_key}')")
                        else:
                            cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"    -> Replaced red text with '{replacement_value}' (col '{json_key}')")
    return replacements_made
def handle_attendance_list_table_enhanced(table, flat_json):
    """Same as before – preserved behavior."""
    replacements_made = 0
    attendance_patterns = ["attendance list", "names and position titles", "attendees"]
    found_attendance_row = None
    for row_idx, row in enumerate(table.rows[:3]):
        for cell_idx, cell in enumerate(row.cells):
            cell_text = get_clean_text(cell).lower()
            if any(pattern in cell_text for pattern in attendance_patterns):
                found_attendance_row = row_idx
                print(f"  🎯 ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}")
                break
        if found_attendance_row is not None:
            break
    if found_attendance_row is None:
        return 0
    attendance_value = None
    attendance_search_keys = [
        "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)",
        "Attendance List (Names and Position Titles)",
        "attendance list",
        "attendees"
    ]
    print(f"  🔍 Searching for attendance data in JSON...")
    for search_key in attendance_search_keys:
        kv = find_matching_json_key_and_value(search_key, flat_json)
        if kv:
            attendance_value = kv[1]
            print(f"  ✓ Found attendance data with key: '{kv[0]}'")
            print(f"  📋 Raw value: {attendance_value}")
            break
    if attendance_value is None:
        print(f"  ✗ No attendance data found in JSON")
        return 0
    # Find red text candidate cell
    target_cell = None
    print(f"  🔍 Scanning ALL cells in attendance table for red text...")
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if has_red_text(cell):
                red_text = ""
                for paragraph in cell.paragraphs:
                    for run in paragraph.runs:
                        if is_red(run):
                            red_text += run.text
                if red_text.strip():
                    print(f"  🎯 Found red text in row {row_idx + 1}, cell {cell_idx + 1}")
                    print(f"  📋 Red text content: '{red_text[:60]}...'")
                    red_lower = red_text.lower()
                    if any(ind in red_lower for ind in ['manager', 'director', 'auditor', '–', '-']):
                        target_cell = cell
                        print(f"  ✓ This looks like attendance data - using this cell")
                        break
        if target_cell:
            break
    if target_cell is None:
        print(f"  ⚠️ No red text found that looks like attendance data")
        return 0
    if has_red_text(target_cell):
        print(f"  🔧 Replacing red text with properly formatted attendance list...")
        if isinstance(attendance_value, list):
            attendance_list = [str(item).strip() for item in attendance_value if str(item).strip()]
        else:
            attendance_list = [str(attendance_value).strip()]
        print(f"  📋 Attendance items to add:")
        for i, item in enumerate(attendance_list):
            print(f"    {i+1}. {item}")
        replacement_text = "\n".join(attendance_list)
        cell_replacements = replace_red_text_in_cell(target_cell, replacement_text)
        replacements_made += cell_replacements
        print(f"  ✓ Added {len(attendance_list)} attendance items")
        print(f"  📋 Replacements made: {cell_replacements}")
    return replacements_made
def fix_management_summary_details_column(table, flat_json):
    """Preserve behavior but prefer scoped mgmt dicts."""
    replacements_made = 0
    print(f"  🎯 FIX: Management Summary DETAILS column processing")
    table_text = ""
    for row in table.rows[:3]:
        for cell in row.cells:
            table_text += get_clean_text(cell).lower() + " "
    mgmt_types = []
    if "mass management" in table_text or "mass" in table_text:
        mgmt_types.append("Mass Management Summary")
    if "maintenance management" in table_text or "maintenance" in table_text:
        mgmt_types.append("Maintenance Management Summary")
    if "fatigue management" in table_text or "fatigue" in table_text:
        mgmt_types.append("Fatigue Management Summary")
    if not mgmt_types:
        if any("std 5" in get_clean_text(c).lower() for r in table.rows for c in r.cells):
            mgmt_types.append("Mass Management Summary")
    if not mgmt_types:
        return 0
    for mgmt_type in mgmt_types:
        print(f"  ✓ Confirmed {mgmt_type} table processing")
        mgmt_data = flat_json.get(mgmt_type)
        if not isinstance(mgmt_data, dict):
            for key in flat_json.keys():
                if mgmt_type.split()[0].lower() in key.lower() and "summary" in key.lower():
                    mgmt_data = flat_json.get(key)
                    break
        if not isinstance(mgmt_data, dict):
            print(f"  ⚠️ No JSON management dict found for {mgmt_type}, skipping this type")
            continue
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) >= 2:
                standard_cell = row.cells[0]
                details_cell = row.cells[1]
                standard_text = get_clean_text(standard_cell).strip().lower()
                if "std 5" in standard_text or "verification" in standard_text:
                    if has_red_text(details_cell):
                        std_val = None
                        for candidate in ("Std 5. Verification", "Std 5 Verification", "Std 5", "Verification"):
                            std_val = mgmt_data.get(candidate)
                            if std_val is not None:
                                break
                        if std_val is None:
                            for k, v in mgmt_data.items():
                                if 'std 5' in k.lower() or 'verification' in k.lower():
                                    std_val = v
                                    break
                        if std_val is not None:
                            replacement_text = get_value_as_string(std_val, "Std 5. Verification")
                            cell_replacements = replace_red_text_in_cell(details_cell, replacement_text)
                            replacements_made += cell_replacements
                            if cell_replacements:
                                print(f"  ✓ Replaced Std 5. Verification details for {mgmt_type}")
                if "std 6" in standard_text or "internal review" in standard_text:
                    if has_red_text(details_cell):
                        std_val = None
                        for candidate in ("Std 6. Internal Review", "Std 6 Internal Review", "Std 6", "Internal Review"):
                            std_val = mgmt_data.get(candidate)
                            if std_val is not None:
                                break
                        if std_val is None:
                            for k, v in mgmt_data.items():
                                if 'std 6' in k.lower() or 'internal review' in k.lower():
                                    std_val = v
                                    break
                        if std_val is not None:
                            replacement_text = get_value_as_string(std_val, "Std 6. Internal Review")
                            cell_replacements = replace_red_text_in_cell(details_cell, replacement_text)
                            replacements_made += cell_replacements
                            if cell_replacements:
                                print(f"  ✓ Replaced Std 6. Internal Review details for {mgmt_type}")
    return replacements_made
# ============================================================================
# Canonical operator declaration fixer – SAFER
# ============================================================================
def fix_operator_declaration_empty_values(table, flat_json):
    replacements_made = 0
    print(f"  🎯 FIX: Operator Declaration empty values processing")
    table_context = ""
    for row in table.rows:
        for cell in row.cells:
            table_context += get_clean_text(cell).lower() + " "
    if not ("print name" in table_context and "position title" in table_context):
        return 0
    print(f"  ✓ Confirmed Operator Declaration table")

    def parse_name_and_position(value):
        if value is None:
            return None, None
        if isinstance(value, list):
            if len(value) == 0:
                return None, None
            if len(value) == 1:
                return str(value[0]).strip(), None
            # common [name, position] pattern
            first = str(value[0]).strip()
            second = str(value[1]).strip()
            if first and second:
                return first, second
            value = " ".join(str(v).strip() for v in value if str(v).strip())
        s = str(value).strip()
        if not s:
            return None, None
        parts = re.split(r'\s+[-–—]\s+|\s*,\s*|\s*\|\s*', s)
        if len(parts) >= 2:
            left = parts[0].strip()
            right = parts[1].strip()
            role_indicators = ['manager', 'auditor', 'owner', 'director', 'supervisor',
                               'coordinator', 'driver', 'operator', 'representative', 'chief']
            if any(ind in right.lower() for ind in role_indicators) or len(right.split()) <= 4:
                return left, right
            if any(ind in left.lower() for ind in role_indicators) and not any(ind in right.lower() for ind in role_indicators):
                return right, left
            return left, right
        tokens = s.split()
        if len(tokens) >= 2:
            last = tokens[-1]
            role_indicators = ['manager', 'auditor', 'owner', 'director', 'supervisor',
                               'coordinator', 'driver', 'operator', 'representative', 'chief']
            if any(ind == last.lower() for ind in role_indicators):
                return " ".join(tokens[:-1]), last
        return s, None
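
    # Illustrative behavior (hypothetical inputs, not from the pipeline):
    #   parse_name_and_position("Jane Citizen - Transport Manager")
    #   # -> ("Jane Citizen", "Transport Manager")
    #   parse_name_and_position("Jane Citizen Manager")
    #   # -> ("Jane Citizen", "Manager")  (trailing role token split off)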
    for row_idx, row in enumerate(table.rows):
        if len(row.cells) >= 2:
            cell1_text = get_clean_text(row.cells[0]).strip().lower()
            cell2_text = get_clean_text(row.cells[1]).strip().lower()
            # header detection
            if "print name" in cell1_text and "position" in cell2_text:
                print(f"  📋 Found header row at {row_idx + 1}")
                if row_idx + 1 < len(table.rows):
                    data_row = table.rows[row_idx + 1]
                    if len(data_row.cells) >= 2:
                        name_cell = data_row.cells[0]
                        position_cell = data_row.cells[1]
                        name_text = get_clean_text(name_cell).strip()
                        position_text = get_clean_text(position_cell).strip()
                        print(f"  📋 Current values: Name='{name_text}', Position='{position_text}'")
                        # Prefer exact qualified keys first (use key-aware lookup)
                        name_kv = find_matching_json_key_and_value("Operator Declaration.Print Name", flat_json) or find_matching_json_key_and_value("Print Name", flat_json)
                        position_kv = find_matching_json_key_and_value("Operator Declaration.Position Title", flat_json) or find_matching_json_key_and_value("Position Title", flat_json)
                        name_value = name_kv[1] if name_kv else None
                        name_key = name_kv[0] if name_kv else None
                        position_value = position_kv[1] if position_kv else None
                        position_key = position_kv[0] if position_kv else None
                        # parse combined cases
                        parsed_name_from_nameval, parsed_pos_from_nameval = parse_name_and_position(name_value) if name_value is not None else (None, None)
                        parsed_name_from_posval, parsed_pos_from_posval = parse_name_and_position(position_value) if position_value is not None else (None, None)
                        final_name = None
                        final_pos = None
                        if parsed_name_from_nameval:
                            final_name = parsed_name_from_nameval
                        elif name_value is not None:
                            final_name = get_value_as_string(name_value)

                        # Position acceptance policy:
                        # - Accept position_value ONLY if matched key indicates position/title OR parsed value looks like a role
                        def looks_like_role(s: str) -> bool:
                            if not s:
                                return False
                            s = s.lower()
                            roles = ['manager', 'auditor', 'owner', 'director', 'supervisor', 'coordinator', 'driver', 'operator', 'representative', 'chief']
                            # short role descriptions or containing role token
                            if any(r in s for r in roles):
                                return True
                            # single/short token likely role (<=4 tokens)
                            if len(s.split()) <= 4 and any(c.isalpha() for c in s):
                                return True
                            return False

                        # Only use position_value if the matched key strongly indicates position/title
                        use_position = False
                        if position_kv:
                            k_lower = (position_key or "").lower()
                            if "position" in k_lower or "title" in k_lower or "role" in k_lower:
                                use_position = True
                        # Avoid using attendance keys or attendance text as position source
                        if position_kv and ("attendance" in position_key.lower() or "attendance list" in position_key.lower() or "attendees" in position_key.lower()):
                            use_position = False
                        if use_position:
                            # choose parsed pos if available
                            if parsed_pos_from_posval:
                                final_pos = parsed_pos_from_posval
                            else:
                                final_pos = get_value_as_string(position_value) if position_value is not None else None
                        else:
                            # allow parsed position gleaned from name_value (if it looks like a role)
                            if parsed_pos_from_nameval and looks_like_role(parsed_pos_from_nameval):
                                final_pos = parsed_pos_from_nameval
                        # final normalization
                        if isinstance(final_name, list):
                            final_name = " ".join(str(x) for x in final_name).strip()
                        if isinstance(final_pos, list):
                            final_pos = " ".join(str(x) for x in final_pos).strip()
                        if isinstance(final_name, str):
                            final_name = final_name.strip()
                        if isinstance(final_pos, str):
                            final_pos = final_pos.strip()

                        def looks_like_person(name_str):
                            if not name_str:
                                return False
                            bad_phrases = ["pty ltd", "company", "farming", "p/l", "plc"]
                            low = name_str.lower()
                            if any(bp in low for bp in bad_phrases):
                                return False
                            return len(name_str) > 1 and any(c.isalpha() for c in name_str)

                        # Write name if empty or red
                        if (not name_text or has_red_text(name_cell)) and final_name and looks_like_person(final_name):
                            if has_red_text(name_cell):
                                replace_red_text_in_cell(name_cell, final_name)
                            else:
                                name_cell.text = final_name
                            replacements_made += 1
                            print(f"  ✓ Updated Print Name -> '{final_name}'")
                        # Write position if empty or red and final_pos appears role-like
                        if (not position_text or has_red_text(position_cell)) and final_pos and looks_like_role(final_pos):
                            if has_red_text(position_cell):
                                replace_red_text_in_cell(position_cell, final_pos)
                            else:
                                position_cell.text = final_pos
                            replacements_made += 1
                            print(f"  ✓ Updated Position Title -> '{final_pos}'")
                break
    if replacements_made > 0:
        try:
            setattr(table, "_processed_operator_declaration", True)
            print("  📋 Marked table as processed by Operator Declaration handler")
        except Exception:
            pass
    return replacements_made
def handle_multiple_red_segments_in_cell(cell, flat_json):
    replacements_made = 0
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    for i, segment in enumerate(red_segments):
        segment_text = segment['text'].strip()
        if segment_text:
            kv = find_matching_json_key_and_value(segment_text, flat_json)
            if kv:
                replacement_text = get_value_as_string(kv[1], segment_text)
                if replace_single_segment(segment, replacement_text):
                    replacements_made += 1
                    print(f"  ✓ Replaced segment {i+1}: '{segment_text}' -> '{replacement_text}'")
    return replacements_made
def handle_nature_business_multiline_fix(cell, flat_json):
    replacements_made = 0
    red_text = ""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            if is_red(run):
                red_text += run.text
    red_text = red_text.strip()
    if not red_text:
        return 0
    nature_indicators = ["transport", "logistics", "freight", "delivery", "trucking", "haulage"]
    if any(indicator in red_text.lower() for indicator in nature_indicators):
        kv = find_matching_json_key_and_value("Nature of Business", flat_json) or find_matching_json_key_and_value("Nature of the Operators Business (Summary)", flat_json)
        if kv:
            replacement_text = get_value_as_string(kv[1], "Nature of Business")
            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
            replacements_made += cell_replacements
            print(f"  ✓ Fixed Nature of Business multiline content")
    return replacements_made

def handle_management_summary_fix(cell, flat_json):
    replacements_made = 0
    red_text = ""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            if is_red(run):
                red_text += run.text
    red_text = red_text.strip()
    if not red_text:
        return 0
    management_types = ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]
    for mgmt_type in management_types:
        if mgmt_type in flat_json and isinstance(flat_json[mgmt_type], dict):
            mgmt_data = flat_json[mgmt_type]
            for std_key, std_value in mgmt_data.items():
                if isinstance(std_value, list) and std_value:
                    if len(red_text) > 10:
                        for item in std_value:
                            if red_text.lower() in str(item).lower() or str(item).lower() in red_text.lower():
                                replacement_text = "\n".join(str(i) for i in std_value)
                                cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                                replacements_made += cell_replacements
                                print(f"  ✓ Fixed {mgmt_type} - {std_key}")
                                return replacements_made
    return replacements_made
def handle_print_accreditation_section(table, flat_json):
    replacements_made = 0
    if getattr(table, "_processed_operator_declaration", False):
        print(f"  ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0
    table_context = ""
    for row in table.rows:
        for cell in row.cells:
            table_context += get_clean_text(cell).lower() + " "
    if "operator declaration" in table_context or ("print name" in table_context and "position title" in table_context):
        print(f"  ⏭️ Skipping Print Accreditation - this is an Operator Declaration table")
        return 0
    print(f"  📋 Processing Print Accreditation section")
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if has_red_text(cell):
                accreditation_fields = [
                    "(print accreditation name)",
                    "Operator name (Legal entity)",
                    "Print accreditation name"
                ]
                for field in accreditation_fields:
                    kv = find_matching_json_key_and_value(field, flat_json)
                    if kv:
                        replacement_text = get_value_as_string(kv[1], field)
                        if replacement_text.strip():
                            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"  ✓ Fixed accreditation: {kv[0]}")
                            break
    return replacements_made

def process_single_column_sections(cell, key_text, flat_json):
    replacements_made = 0
    if has_red_text(cell):
        red_text = ""
        for paragraph in cell.paragraphs:
            for run in paragraph.runs:
                if is_red(run):
                    red_text += run.text
        if red_text.strip():
            kv = find_matching_json_key_and_value(red_text.strip(), flat_json)
            if not kv:
                kv = find_matching_json_key_and_value(key_text, flat_json)
            if kv:
                section_replacement = get_value_as_string(kv[1], red_text.strip())
                cell_replacements = replace_red_text_in_cell(cell, section_replacement)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f"  ✓ Fixed single column section: '{key_text}'")
    return replacements_made
# ============================================================================
# Main table/paragraph/heading processing (preserve logic + use new helpers)
# ============================================================================
def process_tables(document, flat_json):
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\n📋 Processing table {table_idx + 1}:")
        table_text = ""
        for row in table.rows[:3]:
            for cell in row.cells:
                table_text += get_clean_text(cell).lower() + " "
        management_summary_indicators = ["mass management", "maintenance management", "fatigue management"]
        has_management = any(indicator in table_text for indicator in management_summary_indicators)
        has_details = "details" in table_text
        if has_management and has_details:
            print(f"  🔍 Detected Management Summary table")
            summary_fixes = fix_management_summary_details_column(table, flat_json)
            replacements_made += summary_fixes
            summary_replacements = 0
            for row_idx, row in enumerate(table.rows):
                for cell_idx, cell in enumerate(row.cells):
                    if has_red_text(cell):
                        for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]:
                            if mgmt_type.lower().replace(" summary", "") in table_text:
                                if mgmt_type in flat_json:
                                    mgmt_data = flat_json[mgmt_type]
                                    if isinstance(mgmt_data, dict):
                                        for std_key, std_value in mgmt_data.items():
                                            if isinstance(std_value, list) and len(std_value) > 0:
                                                red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip()
                                                for item in std_value:
                                                    if len(red_text) > 15 and red_text.lower() in str(item).lower():
                                                        replacement_text = "\n".join(str(i) for i in std_value)
                                                        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                                                        summary_replacements += cell_replacements
                                                        print(f"  ✓ Updated {std_key} with summary data")
                                                        break
                                break
                        if summary_replacements == 0:
                            cell_replacements = handle_management_summary_fix(cell, flat_json)
                            summary_replacements += cell_replacements
            replacements_made += summary_replacements
            continue
        # Vehicle tables detection
        vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension", "registration"]
        indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text)
        if indicator_count >= 2:
            print(f"  🔍 Detected Vehicle Registration table")
            vehicle_replacements = handle_vehicle_registration_table(table, flat_json)
            replacements_made += vehicle_replacements
            continue
        # Attendance
        if "attendance list" in table_text and "names and position titles" in table_text:
            print(f"  👥 Detected Attendance List table")
            attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json)
            replacements_made += attendance_replacements
            continue
        # Print Accreditation / Operator Declaration
        print_accreditation_indicators = ["print name", "position title"]
        indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text)
        if indicator_count >= 2 or ("print name" in table_text and "position title" in table_text):
            print(f"  🔍 Detected Print Accreditation/Operator Declaration table")
            declaration_fixes = fix_operator_declaration_empty_values(table, flat_json)
            replacements_made += declaration_fixes
            if not getattr(table, "_processed_operator_declaration", False):
                print_accreditation_replacements = handle_print_accreditation_section(table, flat_json)
                replacements_made += print_accreditation_replacements
            continue
        # Regular table rows handling (preserved)
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) < 1:
                continue
            key_cell = row.cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f"  📋 Row {row_idx + 1}: Key = '{key_text}'")
            kv = find_matching_json_key_and_value(key_text, flat_json)
            json_value = kv[1] if kv else None
            if json_value is not None:
                replacement_text = get_value_as_string(json_value, key_text)
                # ACN handling
                if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list):
                    cell_replacements = handle_australian_company_number(row, json_value)
                    replacements_made += cell_replacements
                # section headers
                elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows):
                    print(f"  ✓ Section header detected, checking next row...")
                    next_row = table.rows[row_idx + 1]
                    for cell_idx, cell in enumerate(next_row.cells):
                        if has_red_text(cell):
                            print(f"  ✓ Found red text in next row, cell {cell_idx + 1}")
                            if isinstance(json_value, list):
                                section_text = "\n".join(str(item) for item in json_value)
                            else:
                                section_text = replacement_text
                            cell_replacements = replace_red_text_in_cell(cell, section_text)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f"    -> Replaced section content")
                # single column
                elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))):
                    if has_red_text(key_cell):
                        cell_replacements = process_single_column_sections(key_cell, key_text, flat_json)
                        replacements_made += cell_replacements
                # key-value pairs
                else:
                    for cell_idx in range(1, len(row.cells)):
                        value_cell = row.cells[cell_idx]
                        if has_red_text(value_cell):
                            print(f"  ✓ Found red text in column {cell_idx + 1}")
                            cell_replacements = replace_red_text_in_cell(value_cell, replacement_text)
                            replacements_made += cell_replacements
            else:
                # fallback single cell red-text key
                if len(row.cells) == 1 and has_red_text(key_cell):
                    red_text = ""
                    for paragraph in key_cell.paragraphs:
                        for run in paragraph.runs:
                            if is_red(run):
                                red_text += run.text
                    if red_text.strip():
                        kv2 = find_matching_json_key_and_value(red_text.strip(), flat_json)
                        if kv2:
                            section_replacement = get_value_as_string(kv2[1], red_text.strip())
                            cell_replacements = replace_red_text_in_cell(key_cell, section_replacement)
                            replacements_made += cell_replacements
            # attempt multiple red-segments or surgical fixes
            for cell_idx in range(len(row.cells)):
                cell = row.cells[cell_idx]
                if has_red_text(cell):
                    cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json)
                    replacements_made += cell_replacements
                    if cell_replacements == 0:
                        surgical_fix = handle_nature_business_multiline_fix(cell, flat_json)
                        replacements_made += surgical_fix
                        if cell_replacements == 0:
                            management_summary_fix = handle_management_summary_fix(cell, flat_json)
                            replacements_made += management_summary_fix
    # Final operator/auditor declaration check on last few tables
    print(f"\n🎯 Final check for Declaration tables...")
    for table in document.tables[-3:]:
        if len(table.rows) <= 4:
            if getattr(table, "_processed_operator_declaration", False):
                print(f"  ⏭️ Skipping - already processed by operator declaration handler")
                continue
            declaration_fix = fix_operator_declaration_empty_values(table, flat_json)
            replacements_made += declaration_fix
    return replacements_made
def process_paragraphs(document, flat_json):
    replacements_made = 0
    print(f"\n📋 Processing paragraphs:")
    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if red_runs:
            red_text_only = "".join(run.text for run in red_runs).strip()
            print(f"  📋 Paragraph {para_idx + 1}: Found red text: '{red_text_only}'")
            kv = find_matching_json_key_and_value(red_text_only, flat_json)
            json_value = kv[1] if kv else None
            if json_value is None:
                if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper():
                    kv = find_matching_json_key_and_value("auditor signature", flat_json)
                elif "OPERATOR SIGNATURE" in red_text_only.upper():
                    kv = find_matching_json_key_and_value("operator signature", flat_json)
                json_value = kv[1] if kv else None
            if json_value is not None:
                replacement_text = get_value_as_string(json_value)
                print(f"  ✓ Replacing red text with: '{replacement_text}'")
                red_runs[0].text = replacement_text
                red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
                for run in red_runs[1:]:
                    run.text = ''
                replacements_made += 1
    return replacements_made
def process_headings(document, flat_json):
    replacements_made = 0
    print(f"\n📋 Processing headings:")
    paragraphs = document.paragraphs
    for para_idx, paragraph in enumerate(paragraphs):
        paragraph_text = paragraph.text.strip()
        if not paragraph_text:
            continue
        matched_heading = None
        for category, patterns in HEADING_PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, paragraph_text, re.IGNORECASE):
                    matched_heading = pattern
                    break
            if matched_heading:
                break
        if matched_heading:
            print(f"  📋 Found heading at paragraph {para_idx + 1}: '{paragraph_text}'")
            if has_red_text_in_paragraph(paragraph):
                print(f"  🔴 Found red text in heading itself")
                heading_replacements = process_red_text_in_paragraph(paragraph, paragraph_text, flat_json)
                replacements_made += heading_replacements
            for next_para_offset in range(1, 6):
                next_para_idx = para_idx + next_para_offset
                if next_para_idx >= len(paragraphs):
                    break
                next_paragraph = paragraphs[next_para_idx]
                next_text = next_paragraph.text.strip()
                if not next_text:
                    continue
                is_another_heading = False
                for category, patterns in HEADING_PATTERNS.items():
                    for pattern in patterns:
                        if re.search(pattern, next_text, re.IGNORECASE):
                            is_another_heading = True
                            break
                    if is_another_heading:
                        break
                if is_another_heading:
                    break
                if has_red_text_in_paragraph(next_paragraph):
                    print(f"  🔴 Found red text in paragraph {next_para_idx + 1} after heading")
                    context_replacements = process_red_text_in_paragraph(
                        next_paragraph,
                        paragraph_text,
                        flat_json
                    )
                    replacements_made += context_replacements
    return replacements_made
def process_red_text_in_paragraph(paragraph, context_text, flat_json):
    replacements_made = 0
    red_text_segments = []
    for run in paragraph.runs:
        if is_red(run) and run.text.strip():
            red_text_segments.append(run.text.strip())
    if not red_text_segments:
        return 0
    combined_red_text = " ".join(red_text_segments).strip()
    print(f"  📋 Red text found: '{combined_red_text}'")
    kv = find_matching_json_key_and_value(combined_red_text, flat_json)
    json_value = kv[1] if kv else None
    if json_value is None:
        if "NHVAS APPROVED AUDITOR" in context_text.upper():
            auditor_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"]
            for field in auditor_fields:
                kv = find_matching_json_key_and_value(field, flat_json)
                if kv:
                    print(f"  ✓ Found auditor match with field: '{kv[0]}'")
                    json_value = kv[1]
                    break
        elif "OPERATOR DECLARATION" in context_text.upper():
            operator_fields = ["operator name", "operator", "company name", "organisation name", "print name"]
            for field in operator_fields:
                kv = find_matching_json_key_and_value(field, flat_json)
                if kv:
                    print(f"  ✓ Found operator match with field: '{kv[0]}'")
                    json_value = kv[1]
                    break
    if json_value is None:
        context_queries = [f"{context_text} {combined_red_text}", combined_red_text, context_text]
        for query in context_queries:
            kv = find_matching_json_key_and_value(query, flat_json)
            if kv:
                print(f"  ✓ Found match with combined query -> {kv[0]}")
                json_value = kv[1]
                break
    if json_value is not None:
        replacement_text = get_value_as_string(json_value, combined_red_text)
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if red_runs:
            red_runs[0].text = replacement_text
            red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
            for run in red_runs[1:]:
                run.text = ''
            replacements_made = 1
            print(f"  ✓ Replaced with: '{replacement_text}'")
    else:
        print(f"  ✗ No match found for red text: '{combined_red_text}'")
    return replacements_made
# ============================================================================
# Orchestrator
# ============================================================================
def process_hf(json_file, docx_file, output_file):
    try:
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
        flat_json = flatten_json(json_data)
        print("📋 Available JSON keys (sample):")
        for i, (key, value) in enumerate(sorted(flat_json.items())):
            if i < 10:
                print(f"  - {key}: {value}")
        if len(flat_json) > 10:
            print(f"  ... and {len(flat_json) - 10} more keys\n")
        # Document() accepts both file paths and file-like objects.
        doc = Document(docx_file)
        print("🚀 Starting comprehensive document processing...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        heading_replacements = process_headings(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements + heading_replacements
        # Save unmatched headers for iterative improvement
        if _unmatched_headers:
            try:
                tmp_path = "/tmp/unmatched_headers.json"
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(_unmatched_headers, f, indent=2, ensure_ascii=False)
                print(f"✓ Unmatched headers saved to {tmp_path}")
            except Exception as e:
                print(f"⚠️ Could not save unmatched headers: {e}")
        # save() likewise accepts both paths and file-like objects.
        doc.save(output_file)
        print(f"\n✓ Document saved as: {output_file}")
        print(f"✓ Total replacements: {total_replacements}")
        print(f"  📋 Tables: {table_replacements}")
        print(f"  📋 Paragraphs: {paragraph_replacements}")
        print(f"  📋 Headings: {heading_replacements}")
        print(f"🎉 Processing complete!")
    except FileNotFoundError as e:
        print(f"✗ File not found: {e}")
    except Exception as e:
        print(f"✗ Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    import sys
    if len(sys.argv) != 4:
        print("Usage: python pipeline.py <input_docx> <updated_json> <output_docx>")
        sys.exit(1)
    docx_path = sys.argv[1]
    json_path = sys.argv[2]
    output_path = sys.argv[3]
    process_hf(json_path, docx_path, output_path)