PDF-Data_Extractor / updated_word.py
Shami96's picture
Update updated_word.py
f144bc7 verified
raw
history blame
65.1 kB
import json
from docx import Document
from docx.shared import RGBColor
import re
# Regular expressions used to recognise headings in the report, grouped by
# level: "main" holds the report-title patterns, "sub" the section headings.
HEADING_PATTERNS = dict(
    main=[
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT",
    ],
    sub=[
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined",
    ],
)
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def load_json(filepath):
    """Read the JSON document stored at ``filepath`` and return the parsed object.

    The file is decoded explicitly as UTF-8 instead of relying on the
    platform's default encoding, so non-ASCII text (names, dashes, ...) loads
    the same way on every operating system.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)
def flatten_json(y, prefix=''):
    """Flatten a nested dict into a single-level dict.

    Every non-dict value is stored twice: once under its dotted path
    (``"section.field"``) and once under its bare key (``"field"``).  Nested
    dicts are flattened recursively and are not stored themselves.  When two
    leaves share a bare key, the one visited last wins.
    """
    flat = {}
    for name, item in y.items():
        dotted = name if not prefix else f"{prefix}.{name}"
        if not isinstance(item, dict):
            flat[dotted] = item
            flat[name] = item
            continue
        flat.update(flatten_json(item, dotted))
    return flat
def is_red(run):
    """Tell whether ``run`` is formatted with the "red" placeholder colour.

    A run counts as red when its font colour is exactly RGB (255, 0, 0) or
    when the colour object reports ``theme_color == 1``.  A falsy colour
    object is returned unchanged (and is therefore falsy for callers).
    """
    font_color = run.font.color
    if not font_color:
        return font_color
    explicit_red = font_color.rgb == RGBColor(255, 0, 0)
    # NOTE(review): theme colour index 1 is treated as red here - confirm
    # which theme slot that value denotes in the templates being processed.
    return explicit_red or getattr(font_color, "theme_color", None) == 1
def get_value_as_string(value, field_name=""):
    """Convert a JSON value into the text that should be written to the document.

    * non-list values -> ``str(value)``
    * empty list -> ``""``
    * single-item list -> ``str`` of that item
    * longer list -> items joined with single spaces, EXCEPT when
      ``field_name`` mentions a company number: then the list itself is
      returned untouched so the caller can place the digits one per cell.
    """
    if not isinstance(value, list):
        return str(value)
    if len(value) == 0:
        return ""
    if len(value) == 1:
        return str(value[0])

    lowered_name = field_name.lower()
    company_markers = ("australian company number", "company number")
    if any(marker in lowered_name for marker in company_markers):
        return value
    return " ".join(map(str, value))
def get_clean_text(cell):
    """Return the concatenated text of every run in ``cell``, stripped.

    Runs are joined without any separator, across all paragraphs of the cell.
    """
    pieces = [run.text for paragraph in cell.paragraphs for run in paragraph.runs]
    return "".join(pieces).strip()
def has_red_text(cell):
    """Return True when any run in ``cell`` is red and holds non-blank text."""
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
def has_red_text_in_paragraph(paragraph):
    """Return True when any run of ``paragraph`` is red and holds non-blank text."""
    return any(is_red(run) and run.text.strip() for run in paragraph.runs)
# ============================================================================
# JSON MATCHING FUNCTIONS
# ============================================================================
def find_matching_json_value(field_name, flat_json):
    """Look up the JSON value that best corresponds to ``field_name``.

    Strategies are tried in order and the first hit wins:

    1. exact key
    2. case-insensitive key
    3. special case for "print name": first key mentioning "operator" and
       "print name", otherwise first key mentioning "auditor" and "name"
    4. last dotted component of a key equals the field (case-insensitive)
    5. equality after punctuation is replaced by spaces and whitespace collapsed
    6. word-overlap score (0.6 * Jaccard + 0.4 * field coverage) of at least
       0.25, considering only words longer than two characters

    Returns the matched value, or ``None`` when nothing matches.  A fuzzy
    match whose value is falsy is also reported as ``None``.
    """

    def _normalise(text):
        # Punctuation -> spaces, then squeeze runs of whitespace.
        spaced = re.sub(r'[^\w\s]', ' ', text.lower()).strip()
        return re.sub(r'\s+', ' ', spaced)

    def _significant_words(text):
        return {token.lower() for token in re.findall(r'\b\w+\b', text) if len(token) > 2}

    field_name = field_name.strip()
    wanted = field_name.lower()

    # 1. exact key
    if field_name in flat_json:
        print(f" βœ… Direct match found for key '{field_name}'")
        return flat_json[field_name]

    # 2. case-insensitive key
    for key, value in flat_json.items():
        if key.lower() == wanted:
            print(f" βœ… Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    # 3. "Print Name" exists for both operator and auditor; prefer the operator.
    if wanted.strip() == "print name":
        operator_keys = []
        auditor_keys = []
        for key in flat_json.keys():
            lowered_key = key.lower()
            if "operator" in lowered_key and "print name" in lowered_key:
                operator_keys.append(key)
            if "auditor" in lowered_key and ("print name" in lowered_key or "name" in lowered_key):
                auditor_keys.append(key)
        if operator_keys:
            print(f" βœ… Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'")
            return flat_json[operator_keys[0]]
        if auditor_keys:
            print(f" βœ… Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'")
            return flat_json[auditor_keys[0]]

    # 4. last component of a dotted key
    for key, value in flat_json.items():
        if '.' not in key:
            continue
        if key.split('.')[-1].lower() == wanted:
            print(f" βœ… Suffix match found for key '{field_name}' with JSON key '{key}'")
            return value

    # 5. punctuation-insensitive equality
    clean_field = _normalise(field_name)
    for key, value in flat_json.items():
        if _normalise(key) == clean_field:
            print(f" βœ… Clean match found for key '{field_name}' with JSON key '{key}'")
            return value

    # 6. word-overlap scoring
    field_words = _significant_words(field_name)
    if not field_words:
        return None

    best_match, best_key, best_score = None, None, 0
    for key, value in flat_json.items():
        key_words = _significant_words(key)
        if not key_words:
            continue
        shared = field_words & key_words
        if not shared:
            continue
        similarity = len(shared) / len(field_words | key_words)
        coverage = len(shared) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_match, best_key, best_score = value, key, final_score

    if best_match and best_score >= 0.25:
        print(f" βœ… Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_match

    print(f" ❌ No match found for '{field_name}'")
    return None
# ============================================================================
# RED TEXT PROCESSING FUNCTIONS
# ============================================================================
def extract_red_text_segments(cell):
    """Collect the contiguous stretches of red text found in ``cell``.

    Within each paragraph, consecutive red runs are merged into one segment;
    a non-red run (or the end of the paragraph) closes the segment.  Red runs
    with empty text are skipped without closing the segment.

    Returns a list of dicts with keys ``'text'`` (merged text), ``'runs'``
    (list of ``(paragraph_index, run_index, run)`` tuples) and
    ``'paragraph_idx'``.
    """
    found = []
    for p_idx, para in enumerate(cell.paragraphs):
        pending_text = ""
        pending_runs = []
        for r_idx, run in enumerate(para.runs):
            if is_red(run):
                if run.text:
                    pending_text += run.text
                    pending_runs.append((p_idx, r_idx, run))
                continue
            # A non-red run terminates whatever segment is open.
            if pending_runs:
                found.append({
                    'text': pending_text,
                    'runs': pending_runs,
                    'paragraph_idx': p_idx,
                })
                pending_text = ""
                pending_runs = []
        # Segment still open when the paragraph ends.
        if pending_runs:
            found.append({
                'text': pending_text,
                'runs': pending_runs,
                'paragraph_idx': p_idx,
            })
    return found
def replace_all_red_segments(red_segments, replacement_text):
    """Replace all red segments with replacement text.

    The first run of the first segment receives the replacement and is
    recoloured black; every other run of every segment is blanked.  When
    ``replacement_text`` contains newlines, the first line becomes the run's
    text and each following non-blank line (stripped) is appended to the same
    run after a line break, so all lines share the run's formatting.

    Args:
        red_segments: list of segment dicts as produced by
            ``extract_red_text_segments`` (each has a ``'runs'`` list of
            ``(paragraph_idx, run_idx, run)`` tuples).
        replacement_text: text to write; ``'\\n'`` separates lines.

    Returns:
        1 when the replacement was written, otherwise 0.
    """
    if not red_segments:
        return 0

    if '\n' in replacement_text:
        replacement_lines = replacement_text.split('\n')
    else:
        replacement_lines = [replacement_text]

    black = RGBColor(0, 0, 0)
    replacements_made = 0
    first_runs = red_segments[0]['runs']

    if first_runs:
        first_run = first_runs[0][2]
        first_run.text = replacement_lines[0]
        first_run.font.color.rgb = black
        replacements_made = 1
        for _, _, run in first_runs[1:]:
            run.text = ''

    # Everything after the first segment is simply emptied.
    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''

    if first_runs and len(replacement_lines) > 1:
        first_run = first_runs[0][2]
        try:
            # Use the Run API (break + text) rather than calling paragraph
            # methods on the underlying XML parent element, which does not
            # provide ``add_run``.
            for line in replacement_lines[1:]:
                line = line.strip()
                if line:
                    first_run.add_break()
                    first_run.add_text(line)
        except Exception:
            # Best effort: fall back to a single space-joined line rather
            # than aborting the whole document run.
            first_run.text = ' '.join(replacement_lines)
            first_run.font.color.rgb = black

    return replacements_made
def replace_single_segment(segment, replacement_text):
    """Write ``replacement_text`` into one red segment.

    The segment's first run gets the text and turns black; its remaining runs
    are emptied.  Returns False when the segment has no runs, True otherwise.
    """
    runs = segment['runs']
    if not runs:
        return False

    (_, _, lead_run), *trailing = runs
    lead_run.text = replacement_text
    lead_run.font.color.rgb = RGBColor(0, 0, 0)
    for _, _, leftover in trailing:
        leftover.text = ''
    return True
def replace_red_text_in_cell(cell, replacement_text):
    """Swap the red text of ``cell`` for ``replacement_text``.

    Returns the value of ``replace_all_red_segments`` for the cell's red
    segments, or 0 when the cell has none.
    """
    segments = extract_red_text_segments(cell)
    return replace_all_red_segments(segments, replacement_text) if segments else 0
# ============================================================================
# SPECIALIZED TABLE HANDLERS
# ============================================================================
def handle_australian_company_number(row, company_numbers):
    """Spread the company-number digits over the cells of ``row``.

    Digit ``i`` (0-based) targets cell ``i + 1``; the first cell is never
    written.  A digit is only placed when its target cell exists and
    currently contains red text.  Returns the number of replacements made.
    """
    total = 0
    for position, digit in enumerate(company_numbers, start=1):
        if position >= len(row.cells):
            continue
        target = row.cells[position]
        if not has_red_text(target):
            continue
        total += replace_red_text_in_cell(target, str(digit))
        print(f" -> Placed digit '{digit}' in cell {position + 1}")
    return total
def handle_vehicle_registration_table(table, flat_json):
    """Handle vehicle registration table data replacement.

    Steps, in order:

    1. Locate the vehicle column data in ``flat_json`` (a dict stored under a
       key mentioning "vehicle registration numbers of records examined", or
       failing that a dict assembled from individual column-like keys).
    2. Find the table's header row (first row whose text contains both
       "registration" and "number").
    3. Map header columns to JSON columns (exact, word-overlap, substring).
    4. Write one value per data row below the header, adding rows to the
       table when it is shorter than the longest JSON column.

    Args:
        table: table object exposing ``rows`` and ``add_row()``
            (presumably a python-docx table - confirm against caller).
        flat_json: mapping of JSON keys to values.

    Returns:
        Number of replacements made; 0 when the data, the header row or the
        column mapping cannot be determined.
    """
    replacements_made = 0
    # Try to find vehicle registration data
    vehicle_section = None
    for key, value in flat_json.items():
        if "vehicle registration numbers of records examined" in key.lower():
            # Only a dict-valued entry is accepted as the section.
            if isinstance(value, dict):
                vehicle_section = value
                print(f" βœ… Found vehicle data in key: '{key}'")
                break
    if not vehicle_section:
        # Fallback: rebuild the section from flattened keys that look like
        # vehicle-table columns, keyed by the last dotted component.
        potential_columns = {}
        for key, value in flat_json.items():
            if any(col_name in key.lower() for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension"]):
                if "." in key:
                    column_name = key.split(".")[-1]
                else:
                    column_name = key
                potential_columns[column_name] = value
        if potential_columns:
            vehicle_section = potential_columns
            print(f" βœ… Found vehicle data from flattened keys: {list(vehicle_section.keys())}")
        else:
            print(f" ❌ Vehicle registration data not found in JSON")
            return 0
    print(f" βœ… Found vehicle registration data with {len(vehicle_section)} columns")
    # Find header row
    header_row_idx = -1
    header_row = None
    for row_idx, row in enumerate(table.rows):
        # Cell texts are concatenated without a separator before searching.
        row_text = "".join(get_clean_text(cell).lower() for cell in row.cells)
        if "registration" in row_text and "number" in row_text:
            header_row_idx = row_idx
            header_row = row
            break
    if header_row_idx == -1:
        print(f" ❌ Could not find header row in vehicle table")
        return 0
    print(f" βœ… Found header row at index {header_row_idx}")
    # Enhanced column mapping
    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        # Skip blank headers and the row-number column.
        if not header_text or header_text.lower() == "no.":
            continue
        best_match = None
        best_score = 0
        normalized_header = header_text.lower().replace("(", " (").replace(")", ") ").strip()
        for json_key in vehicle_section.keys():
            normalized_json = json_key.lower().strip()
            # Strategy 1: exact (case-insensitive) equality ends the search.
            if normalized_header == normalized_json:
                best_match = json_key
                best_score = 1.0
                break
            # Strategy 2: share of common words (> 2 chars), threshold 0.3.
            header_words = set(word.lower() for word in normalized_header.split() if len(word) > 2)
            json_words = set(word.lower() for word in normalized_json.split() if len(word) > 2)
            if header_words and json_words:
                common_words = header_words.intersection(json_words)
                score = len(common_words) / max(len(header_words), len(json_words))
                if score > best_score and score >= 0.3:
                    best_score = score
                    best_match = json_key
            # Strategy 3: substring containment after dropping spaces,
            # hyphens and parentheses; scored by length ratio, threshold 0.6.
            # NOTE(review): nesting of this step relative to the word-overlap
            # step was reconstructed from a paste without indentation.
            header_clean = normalized_header.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
            json_clean = normalized_json.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
            if header_clean in json_clean or json_clean in header_clean:
                if len(header_clean) > 5 and len(json_clean) > 5:
                    substring_score = min(len(header_clean), len(json_clean)) / max(len(header_clean), len(json_clean))
                    if substring_score > best_score and substring_score >= 0.6:
                        best_score = substring_score
                        best_match = json_key
        if best_match:
            column_mapping[col_idx] = best_match
            print(f" πŸ“Œ Column {col_idx + 1} ('{header_text}') -> '{best_match}' (score: {best_score:.2f})")
    if not column_mapping:
        print(f" ❌ No column mappings found")
        return 0
    # Determine data rows needed
    max_data_rows = 0
    for json_key, data in vehicle_section.items():
        # Only list-valued columns contribute to the row count.
        if isinstance(data, list):
            max_data_rows = max(max_data_rows, len(data))
    print(f" πŸ“Œ Need to populate {max_data_rows} data rows")
    # Process data rows
    for data_row_index in range(max_data_rows):
        # Data rows are assumed to start directly below the header row.
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f" ⚠️ Row {table_row_idx + 1} doesn't exist - table only has {len(table.rows)} rows")
            print(f" βž• Adding new row for vehicle {data_row_index + 1}")
            new_row = table.add_row()
            print(f" βœ… Successfully added row {len(table.rows)} to the table")
        row = table.rows[table_row_idx]
        print(f" πŸ“Œ Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx < len(row.cells):
                cell = row.cells[col_idx]
                column_data = vehicle_section.get(json_key, [])
                if isinstance(column_data, list) and data_row_index < len(column_data):
                    replacement_value = str(column_data[data_row_index])
                    cell_text = get_clean_text(cell)
                    # Only touch cells that are empty or still hold red text.
                    if has_red_text(cell) or not cell_text.strip():
                        if not cell_text.strip():
                            cell.text = replacement_value
                            replacements_made += 1
                            print(f" -> Added '{replacement_value}' to empty cell (column '{json_key}')")
                        else:
                            cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f" -> Replaced red text with '{replacement_value}' (column '{json_key}')")
    return replacements_made
def handle_attendance_list_table_enhanced(table, flat_json):
    """Fill the attendance-list cell of ``table`` from ``flat_json``.

    The table is accepted only when one of its first three rows mentions an
    attendance header.  The attendance value is then looked up in the JSON
    under several candidate keys, and the first red cell whose text contains
    one of the hard-coded attendance indicators is overwritten with the
    attendance items, one per line.

    Returns the number of replacements made (0 when any step fails).
    """
    header_markers = ("attendance list", "names and position titles", "attendees")
    json_candidates = (
        "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)",
        "Attendance List (Names and Position Titles)",
        "attendance list",
        "attendees",
    )
    content_indicators = ['manager', 'herbig', 'palin', '–', '-']

    # Step 1: confirm the table has an attendance header near the top.
    header_row = None
    for r_idx, row in enumerate(table.rows[:3]):
        for c_idx, cell in enumerate(row.cells):
            lowered = get_clean_text(cell).lower()
            if any(marker in lowered for marker in header_markers):
                header_row = r_idx
                print(f" 🎯 ENHANCED: Found Attendance List in row {r_idx + 1}, cell {c_idx + 1}")
                break
        if header_row is not None:
            break
    if header_row is None:
        return 0

    # Step 2: fetch the attendance data, first candidate key that matches.
    attendance_value = None
    print(" πŸ” Searching for attendance data in JSON...")
    for candidate in json_candidates:
        attendance_value = find_matching_json_value(candidate, flat_json)
        if attendance_value is not None:
            print(f" βœ… Found attendance data with key: '{candidate}'")
            print(f" πŸ“Š Raw value: {attendance_value}")
            break
    if attendance_value is None:
        print(" ❌ No attendance data found in JSON")
        return 0

    # Step 3: pick the first red cell that looks like attendance content.
    target_cell = None
    print(" πŸ” Scanning ALL cells in attendance table for red text...")
    for r_idx, row in enumerate(table.rows):
        for c_idx, cell in enumerate(row.cells):
            if not has_red_text(cell):
                continue
            print(f" 🎯 Found red text in row {r_idx + 1}, cell {c_idx + 1}")
            red_text = "".join(
                run.text
                for paragraph in cell.paragraphs
                for run in paragraph.runs
                if is_red(run)
            )
            print(f" πŸ“‹ Red text content: '{red_text[:50]}...'")
            red_lowered = red_text.lower()
            if any(indicator in red_lowered for indicator in content_indicators):
                target_cell = cell
                print(" βœ… This looks like attendance data - using this cell")
                break
        if target_cell is not None:
            break
    if target_cell is None:
        print(" ⚠️ No red text found that looks like attendance data")
        return 0

    # Step 4: write the attendance items, one per line.
    replacements_made = 0
    if has_red_text(target_cell):
        print(" πŸ”§ Replacing red text with properly formatted attendance list...")
        if isinstance(attendance_value, list):
            attendance_list = [str(entry).strip() for entry in attendance_value if str(entry).strip()]
        else:
            attendance_list = [str(attendance_value).strip()]
        print(" πŸ“ Attendance items to add:")
        for number, entry in enumerate(attendance_list, start=1):
            print(f" {number}. {entry}")
        changed = replace_red_text_in_cell(target_cell, "\n".join(attendance_list))
        replacements_made += changed
        print(f" βœ… Added {len(attendance_list)} attendance items")
        print(f" πŸ“Š Replacements made: {changed}")
    return replacements_made
def fix_management_summary_details_column(table, flat_json):
    """Fill the DETAILS cell of the "Std 5." / "Std 6." rows of a Mass Management table.

    The table qualifies only when the text of its first two rows contains
    both "mass management" and "details".  For every row with at least two
    cells whose first cell names "Std 5. Verification" or "Std 6. Internal
    Review", red text in the second cell is replaced by the matching JSON
    value.  Returns the number of replacements made.
    """
    print(" 🎯 FIX: Management Summary DETAILS column processing")

    header_text = "".join(
        get_clean_text(cell).lower() + " "
        for row in table.rows[:2]
        for cell in row.cells
    )
    if "mass management" not in header_text or "details" not in header_text:
        return 0
    print(" βœ… Confirmed Mass Management Summary table")

    # (number marker, title marker, JSON lookup name) - checked in this order.
    standards = (
        ("Std 5.", "Verification", "Std 5. Verification"),
        ("Std 6.", "Internal Review", "Std 6. Internal Review"),
    )

    total = 0
    for row in table.rows:
        if len(row.cells) < 2:
            continue
        details_cell = row.cells[1]
        label = get_clean_text(row.cells[0]).strip()
        for number_marker, title_marker, lookup_name in standards:
            if number_marker not in label or title_marker not in label:
                continue
            if has_red_text(details_cell):
                print(f" πŸ” Found {lookup_name} with red text")
                json_value = find_matching_json_value(lookup_name, flat_json)
                if json_value is not None:
                    replacement_text = get_value_as_string(json_value, lookup_name)
                    total += replace_red_text_in_cell(details_cell, replacement_text)
                    print(f" βœ… Replaced {lookup_name} details")
            # The first standard whose markers match claims the row.
            break
    return total
def fix_operator_declaration_empty_values(table, flat_json):
    """Fix Operator Declaration table when values are empty or need updating.

    The table qualifies when its combined text contains both "print name"
    and "position title".  The row whose first two cells read like
    "Print Name" / "Position..." is treated as the header, and the row
    directly below it as the data row.  The data row's first two cells are
    overwritten (whole-cell assignment, regardless of colour) with the
    operator's name and position taken from ``flat_json``.

    Returns:
        Number of cells updated.
    """
    replacements_made = 0
    print(f" 🎯 FIX: Operator Declaration empty values processing")
    # Check if this is an Operator Declaration table
    table_context = ""
    for row in table.rows:
        for cell in row.cells:
            table_context += get_clean_text(cell).lower() + " "
    if not ("print name" in table_context and "position title" in table_context):
        return 0
    print(f" βœ… Confirmed Operator Declaration table")
    # Find the data row with Print Name and Position Title
    for row_idx, row in enumerate(table.rows):
        if len(row.cells) >= 2:
            cell1_text = get_clean_text(row.cells[0]).strip().lower()
            cell2_text = get_clean_text(row.cells[1]).strip().lower()
            # Check if this is the header row
            if "print name" in cell1_text and "position" in cell2_text:
                print(f" πŸ“Œ Found header row at {row_idx + 1}")
                # Look for the data row (next row)
                if row_idx + 1 < len(table.rows):
                    data_row = table.rows[row_idx + 1]
                    if len(data_row.cells) >= 2:
                        name_cell = data_row.cells[0]
                        position_cell = data_row.cells[1]
                        # Check if cells are empty or have red text
                        # (the current texts are only used for logging below)
                        name_text = get_clean_text(name_cell).strip()
                        position_text = get_clean_text(position_cell).strip()
                        print(f" πŸ“‹ Current values: Name='{name_text}', Position='{position_text}'")
                        # Get the Operator Declaration section data
                        operator_declaration = find_matching_json_value("Operator Declaration", flat_json)
                        # Preferred path: the whole section is available as a dict.
                        if operator_declaration and isinstance(operator_declaration, dict):
                            print(f" πŸ” Found Operator Declaration data: {operator_declaration}")
                            # Update Print Name
                            if "Print Name" in operator_declaration:
                                print_name_value = operator_declaration["Print Name"]
                                if isinstance(print_name_value, list) and print_name_value:
                                    new_name = str(print_name_value[0]).strip()
                                    # Values containing "Pty Ltd" or "Company" are rejected,
                                    # presumably because they are company names rather than a person - confirm.
                                    if new_name and "Pty Ltd" not in new_name and "Company" not in new_name:
                                        name_cell.text = new_name
                                        replacements_made += 1
                                        print(f" βœ… Updated Print Name: '{name_text}' -> '{new_name}'")
                            # Update Position Title
                            if "Position Title" in operator_declaration:
                                position_value = operator_declaration["Position Title"]
                                if isinstance(position_value, list) and position_value:
                                    new_position = str(position_value[0]).strip()
                                    if new_position:
                                        position_cell.text = new_position
                                        replacements_made += 1
                                        print(f" βœ… Updated Position Title: '{position_text}' -> '{new_position}'")
                        else:
                            print(f" ❌ No Operator Declaration section found in JSON")
                            # Fallback: try individual fields
                            name_value = find_matching_json_value("Operator Declaration.Print Name", flat_json)
                            if name_value:
                                new_name = get_value_as_string(name_value).strip()
                                # Unlike the dict path, only "Pty Ltd" is filtered here.
                                if new_name and "Pty Ltd" not in new_name:
                                    name_cell.text = new_name
                                    replacements_made += 1
                                    print(f" βœ… Updated Print Name (fallback): '{new_name}'")
                            position_value = find_matching_json_value("Operator Declaration.Position Title", flat_json)
                            if position_value:
                                new_position = get_value_as_string(position_value).strip()
                                if new_position:
                                    position_cell.text = new_position
                                    replacements_made += 1
                                    print(f" βœ… Updated Position Title (fallback): '{new_position}'")
                # Stop scanning once the header row has been handled.
                # NOTE(review): the nesting level of this break was
                # reconstructed from a paste without indentation - confirm.
                break
    return replacements_made
def handle_multiple_red_segments_in_cell(cell, flat_json):
    """Replace each red segment of ``cell`` independently.

    Every segment's own (stripped) text is used as the lookup key in
    ``flat_json``; segments without text or without a JSON match are left
    alone.  Returns the number of segments replaced.
    """
    segments = extract_red_text_segments(cell)
    if not segments:
        return 0

    replaced = 0
    for number, segment in enumerate(segments, start=1):
        segment_text = segment['text'].strip()
        if not segment_text:
            continue
        json_value = find_matching_json_value(segment_text, flat_json)
        if json_value is None:
            continue
        replacement_text = get_value_as_string(json_value, segment_text)
        if replace_single_segment(segment, replacement_text):
            replaced += 1
            print(f" βœ… Replaced segment {number}: '{segment_text}' -> '{replacement_text}'")
    return replaced
def handle_nature_business_multiline_fix(cell, flat_json):
    """Replace red text that reads like a "Nature of Business" description.

    The cell's red text qualifies when it mentions one of a fixed set of
    transport-related keywords; it is then replaced by the JSON value found
    for "Nature of Business".  Returns the number of replacements made.
    """
    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    ).strip()
    if not red_text:
        return 0

    keywords = ("transport", "logistics", "freight", "delivery", "trucking", "haulage")
    lowered = red_text.lower()
    if not any(keyword in lowered for keyword in keywords):
        return 0

    nature_value = find_matching_json_value("Nature of Business", flat_json)
    if nature_value is None:
        return 0

    replacement_text = get_value_as_string(nature_value, "Nature of Business")
    changed = replace_red_text_in_cell(cell, replacement_text)
    print(" βœ… Fixed Nature of Business multiline content")
    return changed
def handle_management_summary_fix(cell, flat_json):
    """Replace a cell's red text with the management-summary entry it belongs to.

    For each of the three management summary sections present in
    ``flat_json`` as a dict, every non-empty list-valued standard is
    inspected; when the red text (longer than 10 characters) contains, or is
    contained in, one of the list items (case-insensitive), the cell's red
    text is replaced by all items of that standard joined with newlines and
    the function returns immediately.

    Returns the number of replacements made (0 when nothing matched).
    """
    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    ).strip()
    if not red_text:
        return 0
    # Short snippets are never matched, so there is nothing to search for.
    if len(red_text) <= 10:
        return 0

    needle = red_text.lower()
    sections = ("Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary")
    for mgmt_type in sections:
        mgmt_data = flat_json.get(mgmt_type)
        if not isinstance(mgmt_data, dict):
            continue
        for std_key, std_value in mgmt_data.items():
            if not (isinstance(std_value, list) and std_value):
                continue
            for entry in std_value:
                entry_text = str(entry).lower()
                if needle in entry_text or entry_text in needle:
                    replacement_text = "\n".join(str(piece) for piece in std_value)
                    changed = replace_red_text_in_cell(cell, replacement_text)
                    print(f" βœ… Fixed {mgmt_type} - {std_key}")
                    return changed
    return 0
def handle_operator_declaration_fix(table, flat_json):
    """Fill red cells of a small (at most four rows) declaration table.

    The table qualifies when its text mentions "print name", "signature" or
    "date".  For each red cell a fixed list of declaration fields is tried
    in order against ``flat_json``; the first one that yields non-blank text
    and a successful replacement wins.  Cells left unreplaced get the
    placeholder "[Signature]" or "[Date]" when their red text mentions the
    respective word.  Returns the number of replacements made.
    """
    if len(table.rows) > 4:
        return 0

    table_text = "".join(
        get_clean_text(cell).lower() + " "
        for row in table.rows
        for cell in row.cells
    )
    if not any(marker in table_text for marker in ("print name", "signature", "date")):
        return 0
    print(" 🎯 Processing declaration table")

    declaration_fields = (
        "Print Name", "Position Title", "Signature", "Date",
        "Operator Declaration.Print Name", "Operator Declaration.Position Title",
        "NHVAS Approved Auditor Declaration.Print Name",
    )

    total = 0
    for row in table.rows:
        for cell in row.cells:
            if not has_red_text(cell):
                continue

            replaced = False
            for field in declaration_fields:
                field_value = find_matching_json_value(field, flat_json)
                if field_value is None:
                    continue
                replacement_text = get_value_as_string(field_value, field)
                if not replacement_text.strip():
                    continue
                changed = replace_red_text_in_cell(cell, replacement_text)
                if changed > 0:
                    total += changed
                    print(f" βœ… Fixed declaration field: {field}")
                    replaced = True
                    break
            if replaced:
                continue

            # No JSON field applied: fall back to generic placeholders.
            red_lowered = "".join(
                run.text
                for paragraph in cell.paragraphs
                for run in paragraph.runs
                if is_red(run)
            ).lower()
            if "signature" in red_lowered:
                total += replace_red_text_in_cell(cell, "[Signature]")
            elif "date" in red_lowered:
                total += replace_red_text_in_cell(cell, "[Date]")
    return total
def handle_print_accreditation_section(table, flat_json):
    """Fill the red cells of a print-accreditation table.

    For every red cell, a fixed list of field names is looked up in
    ``flat_json`` in order; each non-blank value found is written to the
    cell, and the search for that cell stops at the first replacement that
    reports success.  Returns the number of replacements made.
    """
    candidate_fields = (
        "(print accreditation name)",
        "Print Name",
        "Operator name (Legal entity)",
    )
    total = 0
    print(" πŸ“‹ Processing Print Accreditation section")
    for row in table.rows:
        for cell in row.cells:
            if not has_red_text(cell):
                continue
            for field in candidate_fields:
                field_value = find_matching_json_value(field, flat_json)
                if field_value is None:
                    continue
                replacement_text = get_value_as_string(field_value, field)
                if not replacement_text.strip():
                    continue
                changed = replace_red_text_in_cell(cell, replacement_text)
                total += changed
                if changed > 0:
                    print(f" βœ… Fixed accreditation: {field}")
                    break
    return total
def process_single_column_sections(cell, key_text, flat_json):
    """Replace the red text of a single-column section cell.

    The red text itself is tried as the JSON lookup key first; ``key_text``
    is the fallback key.  Returns the number of replacements made.
    """
    if not has_red_text(cell):
        return 0

    red_text = "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    )
    lookup = red_text.strip()
    if not lookup:
        return 0

    section_value = find_matching_json_value(lookup, flat_json)
    if section_value is None:
        section_value = find_matching_json_value(key_text, flat_json)
    if section_value is None:
        return 0

    section_replacement = get_value_as_string(section_value, lookup)
    changed = replace_red_text_in_cell(cell, section_replacement)
    if changed > 0:
        print(f" βœ… Fixed single column section: '{key_text}'")
    return changed
def process_tables(document, flat_json):
    """Process all tables in the document with comprehensive fixes.

    Each table is classified from the text of its first three rows and
    handed to one specialised handler (management summary, vehicle
    registration, attendance list, print accreditation); tables matching
    none of these are processed row by row as key/value tables, using the
    first cell of each row as the JSON lookup key.  Finally the last three
    tables of the document are re-checked as declaration tables.

    NOTE(review): this function was recovered from a paste without
    indentation; the nesting of a few ``break``/``else`` statements is a
    best-effort reconstruction and should be confirmed against the
    original file.

    Returns:
        Total number of replacements reported by the handlers.
    """
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\nπŸ” Processing table {table_idx + 1}:")
        # Get table context
        table_text = ""
        for row in table.rows[:3]:
            for cell in row.cells:
                table_text += get_clean_text(cell).lower() + " "
        # Detect Management Summary tables
        management_summary_indicators = ["mass management", "maintenance management", "fatigue management"]
        has_management = any(indicator in table_text for indicator in management_summary_indicators)
        has_details = "details" in table_text
        if has_management and has_details:
            print(f" πŸ“‹ Detected Management Summary table")
            summary_fixes = fix_management_summary_details_column(table, flat_json)
            replacements_made += summary_fixes
            # Process remaining red text in management summary
            summary_replacements = 0
            for row_idx, row in enumerate(table.rows):
                for cell_idx, cell in enumerate(row.cells):
                    if has_red_text(cell):
                        # Try direct matching with the new schema names first
                        for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]:
                            # e.g. "mass management summary" -> "mass management"
                            if mgmt_type.lower().replace(" summary", "") in table_text:
                                # Look for this standard in the JSON
                                if mgmt_type in flat_json:
                                    mgmt_data = flat_json[mgmt_type]
                                    if isinstance(mgmt_data, dict):
                                        # Find matching standard
                                        for std_key, std_value in mgmt_data.items():
                                            if isinstance(std_value, list) and len(std_value) > 0:
                                                # Check if red text matches this standard data
                                                red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip()
                                                for item in std_value:
                                                    if len(red_text) > 15 and red_text.lower() in str(item).lower():
                                                        replacement_text = "\n".join(str(i) for i in std_value)
                                                        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                                                        summary_replacements += cell_replacements
                                                        print(f" βœ… Updated {std_key} with summary data")
                                                        break
                                # Only the first management type named by the table is considered.
                                break
                        # Fallback to existing method
                        # (only while nothing in this table has been replaced yet)
                        if summary_replacements == 0:
                            cell_replacements = handle_management_summary_fix(cell, flat_json)
                            summary_replacements += cell_replacements
            replacements_made += summary_replacements
            continue
        # Detect Vehicle Registration tables
        vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension"]
        indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text)
        # At least two of the column names must be present.
        if indicator_count >= 2:
            print(f" πŸš— Detected Vehicle Registration table")
            vehicle_replacements = handle_vehicle_registration_table(table, flat_json)
            replacements_made += vehicle_replacements
            continue
        # Detect Attendance List tables
        if "attendance list" in table_text and "names and position titles" in table_text:
            print(f" πŸ‘₯ Detected Attendance List table")
            attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json)
            replacements_made += attendance_replacements
            continue
        # Detect Print Accreditation tables
        print_accreditation_indicators = ["print name", "position title"]
        indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text)
        if indicator_count >= 1:
            print(f" πŸ“‹ Detected Print Accreditation table")
            # Check for declaration tables that need fixing
            if "print name" in table_text and "position" in table_text:
                declaration_fixes = fix_operator_declaration_empty_values(table, flat_json)
                replacements_made += declaration_fixes
            print_accreditation_replacements = handle_print_accreditation_section(table, flat_json)
            replacements_made += print_accreditation_replacements
            continue
        # Process regular table rows
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) < 1:
                continue
            # The first cell of the row is the lookup key.
            key_cell = row.cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f" πŸ“Œ Row {row_idx + 1}: Key = '{key_text}'")
            json_value = find_matching_json_value(key_text, flat_json)
            if json_value is not None:
                replacement_text = get_value_as_string(json_value, key_text)
                # Handle Australian Company Number
                if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list):
                    cell_replacements = handle_australian_company_number(row, json_value)
                    replacements_made += cell_replacements
                # Handle section headers
                # (the value belongs in the red cells of the following row)
                elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows):
                    print(f" βœ… Section header detected, checking next row...")
                    next_row = table.rows[row_idx + 1]
                    for cell_idx, cell in enumerate(next_row.cells):
                        if has_red_text(cell):
                            print(f" βœ… Found red text in next row, cell {cell_idx + 1}")
                            if isinstance(json_value, list):
                                replacement_text = "\n".join(str(item) for item in json_value)
                            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f" -> Replaced section content")
                # Handle single column sections
                # (one cell only, or no red text in any cell after the first)
                elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))):
                    if has_red_text(key_cell):
                        cell_replacements = process_single_column_sections(key_cell, key_text, flat_json)
                        replacements_made += cell_replacements
                # Handle regular key-value pairs
                else:
                    for cell_idx in range(1, len(row.cells)):
                        value_cell = row.cells[cell_idx]
                        if has_red_text(value_cell):
                            print(f" βœ… Found red text in column {cell_idx + 1}")
                            cell_replacements = replace_red_text_in_cell(value_cell, replacement_text)
                            replacements_made += cell_replacements
            else:
                # Fallback processing for unmatched keys
                if len(row.cells) == 1 and has_red_text(key_cell):
                    red_text = ""
                    for paragraph in key_cell.paragraphs:
                        for run in paragraph.runs:
                            if is_red(run):
                                red_text += run.text
                    if red_text.strip():
                        section_value = find_matching_json_value(red_text.strip(), flat_json)
                        if section_value is not None:
                            section_replacement = get_value_as_string(section_value, red_text.strip())
                            cell_replacements = replace_red_text_in_cell(key_cell, section_replacement)
                            replacements_made += cell_replacements
                # Process red text in all cells
                # NOTE(review): assumed to belong to the unmatched-key branch - confirm.
                for cell_idx in range(len(row.cells)):
                    cell = row.cells[cell_idx]
                    if has_red_text(cell):
                        cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json)
                        replacements_made += cell_replacements
                        # Apply fixes if no replacements made
                        if cell_replacements == 0:
                            surgical_fix = handle_nature_business_multiline_fix(cell, flat_json)
                            replacements_made += surgical_fix
                        # Both fallbacks test the same per-segment count; the
                        # result of the first fallback is not taken into account.
                        if cell_replacements == 0:
                            management_summary_fix = handle_management_summary_fix(cell, flat_json)
                            replacements_made += management_summary_fix
    # Handle Operator/Auditor Declaration tables (check last few tables)
    print(f"\n🎯 Final check for Declaration tables...")
    for table in document.tables[-3:]:
        if len(table.rows) <= 4:
            declaration_fix = handle_operator_declaration_fix(table, flat_json)
            replacements_made += declaration_fix
    return replacements_made
def process_paragraphs(document, flat_json):
    """Replace red placeholder text found in the document's paragraphs.

    For every paragraph, the non-blank red runs are concatenated and looked
    up with ``find_matching_json_value``.  If that lookup yields nothing, red
    text mentioning "AUDITOR SIGNATURE" or "DATE" is retried under the key
    "auditor signature", and red text mentioning "OPERATOR SIGNATURE" under
    "operator signature".

    Args:
        document: object exposing ``paragraphs``; each paragraph exposes
            ``runs`` (presumably a python-docx ``Document``).
        flat_json: flattened JSON mapping passed through to the lookup helper.

    Returns:
        Number of paragraphs whose red text was replaced.
    """
    print("\nπŸ” Processing paragraphs:")
    total = 0
    for number, para in enumerate(document.paragraphs, start=1):
        highlighted = [r for r in para.runs if is_red(r) and r.text.strip()]
        if not highlighted:
            continue

        placeholder = "".join(r.text for r in highlighted).strip()
        print(f" πŸ“Œ Paragraph {number}: Found red text: '{placeholder}'")

        match = find_matching_json_value(placeholder, flat_json)
        if match is None:
            # Signature/date placeholders rarely match a key verbatim, so
            # fall back to the canonical signature field names.
            shouted = placeholder.upper()
            if "AUDITOR SIGNATURE" in shouted or "DATE" in shouted:
                match = find_matching_json_value("auditor signature", flat_json)
            elif "OPERATOR SIGNATURE" in shouted:
                match = find_matching_json_value("operator signature", flat_json)
        if match is None:
            continue

        new_text = get_value_as_string(match)
        print(f" βœ… Replacing red text with: '{new_text}'")

        # The first red run carries the whole replacement (recoloured black);
        # the remaining red runs are blanked.
        first, *rest = highlighted
        first.text = new_text
        first.font.color.rgb = RGBColor(0, 0, 0)
        for leftover in rest:
            leftover.text = ''
        total += 1
    return total
def process_headings(document, flat_json):
    """Fill red text in known headings and in the paragraphs that follow them.

    A paragraph counts as a heading when its stripped text matches any regex
    in ``HEADING_PATTERNS`` (case-insensitive).  Red text inside the heading
    itself is processed first; then up to five following paragraphs are
    examined, skipping blank ones and stopping at the next heading.  The
    heading's text is passed along as lookup context.

    Args:
        document: object exposing ``paragraphs`` (presumably a python-docx
            ``Document``).
        flat_json: flattened JSON mapping passed through to
            ``process_red_text_in_paragraph``.

    Returns:
        Sum of the replacement counts reported by
        ``process_red_text_in_paragraph``.
    """

    def looks_like_heading(text):
        # True as soon as any pattern of any category matches.
        return any(
            re.search(pattern, text, re.IGNORECASE)
            for patterns in HEADING_PATTERNS.values()
            for pattern in patterns
        )

    print("\nπŸ” Processing headings:")
    total = 0
    all_paragraphs = document.paragraphs
    paragraph_count = len(all_paragraphs)

    for idx, heading in enumerate(all_paragraphs):
        heading_text = heading.text.strip()
        if not heading_text or not looks_like_heading(heading_text):
            continue

        print(f" πŸ“Œ Found heading at paragraph {idx + 1}: '{heading_text}'")

        # Red text embedded in the heading paragraph itself.
        if has_red_text_in_paragraph(heading):
            print(" πŸ”΄ Found red text in heading itself")
            total += process_red_text_in_paragraph(heading, heading_text, flat_json)

        # Look ahead at most five paragraphs for content owned by this heading.
        for follow_idx in range(idx + 1, min(idx + 6, paragraph_count)):
            follower = all_paragraphs[follow_idx]
            follower_text = follower.text.strip()
            if not follower_text:
                continue
            if looks_like_heading(follower_text):
                # The next section starts here; nothing further belongs to us.
                break
            if has_red_text_in_paragraph(follower):
                print(f" πŸ”΄ Found red text in paragraph {follow_idx + 1} after heading")
                total += process_red_text_in_paragraph(follower, heading_text, flat_json)

    return total
def process_red_text_in_paragraph(paragraph, context_text, flat_json):
    """Replace the red text of one paragraph, using a heading as lookup context.

    Lookups are tried in this order; the first one that returns a value wins:

    1. the red text itself;
    2. if ``context_text`` mentions "NHVAS APPROVED AUDITOR" or
       "OPERATOR DECLARATION", a fixed list of auditor / operator field names;
    3. ``"<context_text> <red text>"`` and then ``context_text`` alone.

    Args:
        paragraph: object exposing ``runs`` (presumably a python-docx
            ``Paragraph``).
        context_text: text of the heading the paragraph belongs to.
        flat_json: flattened JSON mapping handed to
            ``find_matching_json_value``.

    Returns:
        1 if the red text was replaced, otherwise 0.
    """
    red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
    if not red_runs:
        return 0

    combined_red_text = " ".join(run.text.strip() for run in red_runs).strip()
    print(f" πŸ” Red text found: '{combined_red_text}'")

    # Direct matching
    json_value = find_matching_json_value(combined_red_text, flat_json)

    # Context-based matching: declaration sections hold a name whose
    # placeholder seldom resembles any JSON key.
    if json_value is None:
        context_upper = context_text.upper()
        if "NHVAS APPROVED AUDITOR" in context_upper:
            role = "auditor"
            candidate_fields = ["auditor name", "auditor", "nhvas auditor", "approved auditor", "print name"]
        elif "OPERATOR DECLARATION" in context_upper:
            role = "operator"
            candidate_fields = ["operator name", "operator", "company name", "organisation name", "print name"]
        else:
            role = None
            candidate_fields = []
        for field in candidate_fields:
            json_value = find_matching_json_value(field, flat_json)
            if json_value is not None:
                print(f" βœ… Found {role} match with field: '{field}'")
                break

    # Combined context queries.  The bare red text is not retried here: it
    # was already looked up by the direct match above.
    if json_value is None:
        for query in (f"{context_text} {combined_red_text}", context_text):
            json_value = find_matching_json_value(query, flat_json)
            if json_value is not None:
                print(f" βœ… Found match with combined query")
                break

    if json_value is None:
        print(f" ❌ No match found for red text: '{combined_red_text}'")
        return 0

    replacement_text = get_value_as_string(json_value, combined_red_text)
    # get_value_as_string returns the raw list for multi-item "company number"
    # fields; a run's text must be a string, so join the items here.
    if isinstance(replacement_text, (list, tuple)):
        replacement_text = " ".join(str(item) for item in replacement_text)

    # The first red run receives the replacement (recoloured black); the
    # remaining red runs are blanked.
    red_runs[0].text = replacement_text
    red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
    for run in red_runs[1:]:
        run.text = ''
    print(f" βœ… Replaced with: '{replacement_text}'")
    return 1
def _collect_replacement_values(flat_json):
    """Build a ``{key: stripped string}`` map of every usable JSON value."""
    all_values = {}
    for key, value in flat_json.items():
        if not value:
            continue
        value_str = get_value_as_string(value, key)
        # get_value_as_string may hand back a list (company-number fields);
        # only real, non-blank strings are usable as replacement text.
        if value_str and isinstance(value_str, str) and value_str.strip():
            all_values[key] = value_str.strip()
        # Store individual items from lists for partial matching
        if isinstance(value, list):
            for i, item in enumerate(value):
                item_str = str(item).strip() if item else ""
                if item_str:
                    all_values[f"{key}_item_{i}"] = item_str
    return all_values


def _find_best_replacement(red_text, all_values):
    """Return ``(value, key)`` of the candidate best matching *red_text*.

    Strategies, in order: case-insensitive equality; substring containment in
    either direction (the contained string must be longer than 3 characters);
    overlap of words longer than 2 characters, where at least half of the red
    words must appear in the candidate.  Returns ``(None, None)`` when no
    strategy finds a match.
    """
    red_lower = red_text.lower()

    # Exact matching
    for key, value in all_values.items():
        if red_lower == value.lower():
            return value, key

    # Partial matching
    red_is_long = len(red_text) > 3
    for key, value in all_values.items():
        value_lower = value.lower()
        if (len(value) > 3 and value_lower in red_lower) or \
                (red_is_long and red_lower in value_lower):
            return value, key

    # Word-by-word matching for names/dates
    red_words = {word.lower() for word in red_text.split() if len(word) > 2}
    best_match = None
    best_key = None
    best_score = 0
    if red_words:
        for key, value in all_values.items():
            value_words = {word.lower() for word in value.split() if len(word) > 2}
            common_words = red_words & value_words
            if common_words:
                score = len(common_words) / len(red_words)
                if score > best_score and score >= 0.5:  # At least 50% match
                    best_score = score
                    best_match = value
                    best_key = key
    return best_match, best_key


def force_red_text_replacement(document, flat_json):
    """Force replacement of any remaining red text by trying ALL JSON values.

    Every table cell and every paragraph that still contains red text is
    matched against all string values of ``flat_json`` (plus the individual
    items of list values) and, if a candidate is found, rewritten with it.

    Args:
        document: object exposing ``tables`` and ``paragraphs`` (presumably a
            python-docx ``Document``).
        flat_json: flattened JSON mapping of field names to values.

    Returns:
        Total replacements made: the counts returned by
        ``replace_red_text_in_cell`` for cells plus one per replaced paragraph.
    """
    replacements_made = 0
    print(f"\n🎯 FORCE FIX: Scanning for any remaining red text...")

    # Collect all possible replacement values from JSON
    all_values = _collect_replacement_values(flat_json)
    print(f" Found {len(all_values)} potential replacement values")

    # Process all tables
    for table_idx, table in enumerate(document.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                if not has_red_text(cell):
                    continue
                print(f" πŸ” Found red text in Table {table_idx + 1}, Row {row_idx + 1}, Cell {cell_idx + 1}")

                # Extract all red text from this cell
                red_text_parts = [
                    run.text.strip()
                    for paragraph in cell.paragraphs
                    for run in paragraph.runs
                    if is_red(run) and run.text.strip()
                ]
                combined_red_text = " ".join(red_text_parts).strip()
                print(f" Red text: '{combined_red_text}'")

                best_match, best_key = _find_best_replacement(combined_red_text, all_values)
                if best_match:
                    print(f" βœ… Replacing with: '{best_match}' (from key: '{best_key}')")
                    cell_replacements = replace_red_text_in_cell(cell, best_match)
                    replacements_made += cell_replacements
                    print(f" Made {cell_replacements} replacements")
                else:
                    print(f" ❌ No suitable replacement found")

    # Process all paragraphs
    for para_idx, paragraph in enumerate(document.paragraphs):
        if not has_red_text_in_paragraph(paragraph):
            continue
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        combined_red_text = " ".join(run.text.strip() for run in red_runs).strip()
        if not combined_red_text:
            continue
        print(f" πŸ” Found red text in Paragraph {para_idx + 1}: '{combined_red_text}'")

        best_match, best_key = _find_best_replacement(combined_red_text, all_values)
        if best_match:
            print(f" βœ… Replacing with: '{best_match}' (from key: '{best_key}')")
            # The first red run carries the replacement (recoloured black);
            # the remaining red runs are blanked.
            red_runs[0].text = best_match
            red_runs[0].font.color.rgb = RGBColor(0, 0, 0)
            for run in red_runs[1:]:
                run.text = ''
            replacements_made += 1
            print(f" Made 1 paragraph replacement")
        else:
            print(f" ❌ No suitable replacement found")

    return replacements_made
def process_hf(json_file, docx_file, output_file):
    """Fill a DOCX template's red placeholder text from JSON data and save it.

    Runs the table, paragraph and heading passes followed by the final
    force-replacement pass, then writes the document and prints a summary.

    Args:
        json_file: path to a JSON file, or an object with a ``read`` method.
        docx_file: template handed unchanged to ``Document(...)``.
        output_file: destination handed unchanged to ``doc.save(...)``.

    Returns:
        None.  Errors are reported on stdout rather than raised: a missing
        file prints a "File not found" line, any other exception prints an
        "Error" line followed by the traceback.
    """
    try:
        # Load JSON
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
        flat_json = flatten_json(json_data)

        # Preview only the first few keys.
        sample_size = 10
        print("πŸ“„ Available JSON keys (sample):")
        for key, value in sorted(flat_json.items())[:sample_size]:
            print(f" - {key}: {value}")
        remaining = len(flat_json) - sample_size
        if remaining > 0:
            print(f" ... and {remaining} more keys\n")
        else:
            # Nothing was left out of the preview, so do not report a zero or
            # negative count; keep the blank separator line.
            print()

        # Load DOCX.  The original branched on hasattr(docx_file, "read") but
        # made the identical call in both arms.
        doc = Document(docx_file)

        # Process document with all fixes
        print("πŸš€ Starting comprehensive document processing...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        heading_replacements = process_headings(doc, flat_json)
        # Final force fix for any remaining red text
        force_replacements = force_red_text_replacement(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements + heading_replacements + force_replacements

        # Save output (same call for a path or a writable object).
        doc.save(output_file)

        print(f"\nβœ… Document saved as: {output_file}")
        print(f"βœ… Total replacements: {total_replacements}")
        print(f" πŸ“Š Tables: {table_replacements}")
        print(f" πŸ“ Paragraphs: {paragraph_replacements}")
        print(f" πŸ“‹ Headings: {heading_replacements}")
        print(f" 🎯 Force fixes: {force_replacements}")
        print(f"πŸŽ‰ Processing complete!")
    except FileNotFoundError as e:
        print(f"❌ File not found: {e}")
    except Exception as e:
        # Top-level boundary: report the failure with its traceback instead
        # of propagating it to the caller.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Command-line entry point: <input_docx> <updated_json> <output_docx>.
    import sys

    if len(sys.argv) != 4:
        # NOTE(review): the usage text names pipeline.py -- confirm the
        # intended script name.
        print("Usage: python pipeline.py <input_docx> <updated_json> <output_docx>")
        # sys.exit rather than the bare exit() helper, which is injected by
        # the site module and is not guaranteed to exist.
        sys.exit(1)

    docx_path, json_path, output_path = sys.argv[1:4]
    # process_hf takes the JSON source first, then the template.
    process_hf(json_path, docx_path, output_path)