#!/usr/bin/env python3
"""
Fixed PDF Data Extractor - addresses key issues in comprehensive_extract.py

Key fixes:
1. Better table extraction and cleaning
2. Improved key-value pair extraction
3. More robust text processing
4. Enhanced vehicle registration extraction
5. Better date/number pattern recognition
"""
import json
import logging
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

import pandas as pd

try:
    import pdfplumber
    HAS_PDFPLUMBER = True
except ImportError:
    HAS_PDFPLUMBER = False

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fixed_pdf_extractor")

class FixedPDFExtractor:
    def __init__(self):
        logger.info("Initializing Fixed PDF Extractor")
    def extract_everything(self, pdf_path: str) -> Dict[str, Any]:
        if not HAS_PDFPLUMBER:
            raise RuntimeError("pdfplumber is required. Install with: pip install pdfplumber")
        logger.info(f"Processing PDF: {pdf_path}")
        result = {
            "document_info": {
                "filename": Path(pdf_path).name,
                "total_pages": 0,
                "extraction_timestamp": datetime.now().isoformat()
            },
            "extracted_data": {
                "all_text_content": [],
                "all_tables": [],
                "key_value_pairs": {},
                "audit_information": {},
                "operator_information": {},
                "vehicle_registrations": [],
                "driver_records": [],
                "compliance_summary": {},
                "dates_and_numbers": {}
            }
        }
        all_text_blocks, all_tables = [], []
        with pdfplumber.open(pdf_path) as pdf:
            result["document_info"]["total_pages"] = len(pdf.pages)
            for page_num, page in enumerate(pdf.pages, 1):
                logger.info(f"Processing page {page_num}")
                # Extract text with better handling
                page_text = self._extract_page_text(page)
                if page_text:
                    all_text_blocks.append({
                        "page": page_num,
                        "text": page_text,
                        "word_count": len(page_text.split())
                    })
                # Extract tables with improved cleaning
                tables = self._extract_page_tables(page, page_num)
                all_tables.extend(tables)
        result["extracted_data"]["all_text_content"] = all_text_blocks
        result["extracted_data"]["all_tables"] = all_tables
        # Process extracted data with improved methods
        combined_text = "\n\n".join(b["text"] for b in all_text_blocks)
        result["extracted_data"]["key_value_pairs"] = self._extract_key_value_pairs_improved(combined_text)
        result["extracted_data"]["audit_information"] = self._extract_audit_info(combined_text, all_tables)
        result["extracted_data"]["operator_information"] = self._extract_operator_info(combined_text, all_tables)
        result["extracted_data"]["vehicle_registrations"] = self._extract_vehicle_registrations(all_tables)
        result["extracted_data"]["driver_records"] = self._extract_driver_records(all_tables)
        result["extracted_data"]["compliance_summary"] = self._extract_compliance_summary(combined_text, all_tables)
        result["extracted_data"]["dates_and_numbers"] = self._extract_dates_and_numbers_improved(combined_text)
        # Generate summary
        result["extraction_summary"] = {
            "text_blocks_found": len(all_text_blocks),
            "tables_found": len(all_tables),
            "key_value_pairs_found": len(result["extracted_data"]["key_value_pairs"]),
            "vehicle_registrations_found": len(result["extracted_data"]["vehicle_registrations"]),
            "driver_records_found": len(result["extracted_data"]["driver_records"]),
            "total_characters": len(combined_text),
            "processing_timestamp": datetime.now().isoformat()
        }
        logger.info("Extraction completed!")
        return result

    def _extract_page_text(self, page) -> Optional[str]:
        """Extract text from a page, collapsing whitespace and blank lines."""
        try:
            text = page.extract_text()
            if text:
                # Collapse horizontal whitespace, then drop empty lines
                text = re.sub(r'[ \t]+', ' ', text.strip())
                text = re.sub(r'\n\s*\n', '\n', text)
                return text
        except Exception as e:
            logger.warning(f"Failed to extract text from page: {e}")
        return None
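
    # Illustrative example (hypothetical input): "Operator   name:\tABC\n\n\nTransport"
    # becomes "Operator name: ABC\nTransport" - runs of spaces and tabs collapse
    # to a single space, and blank lines between paragraphs are removed.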

    def _extract_page_tables(self, page, page_num: int) -> List[Dict]:
        """Extract tables from a page with improved cell cleaning."""
        tables = []
        try:
            raw_tables = page.extract_tables()
            if raw_tables:
                for table_idx, table in enumerate(raw_tables):
                    cleaned_table = self._clean_table_improved(table)
                    if cleaned_table:
                        tables.append({
                            "page": page_num,
                            "table_index": table_idx + 1,
                            "headers": cleaned_table[0],
                            "data": cleaned_table[1:] if len(cleaned_table) > 1 else [],
                            "raw_data": cleaned_table,
                            "row_count": len(cleaned_table) - 1 if len(cleaned_table) > 1 else 0,
                            "column_count": len(cleaned_table[0])
                        })
        except Exception as e:
            logger.warning(f"Failed to extract tables from page {page_num}: {e}")
        return tables

    def _clean_table_improved(self, table: List[List]) -> List[List[str]]:
        """Clean a raw table: normalise cells and drop fully empty rows."""
        if not table:
            return []
        cleaned = []
        for row in table:
            cleaned_row = []
            for cell in row:
                if cell is None:
                    cleaned_cell = ""
                else:
                    cleaned_cell = str(cell).strip()
                    cleaned_cell = re.sub(r'\s+', ' ', cleaned_cell)
                    cleaned_cell = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', cleaned_cell)
                cleaned_row.append(cleaned_cell)
            # Keep the row only if at least one cell has content
            if any(cell.strip() for cell in cleaned_row):
                cleaned.append(cleaned_row)
        return cleaned
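
    # Illustrative example (hypothetical input): a raw pdfplumber row such as
    # [None, "  ABC\n123 ", "\x07ok"] comes back as ["", "ABC 123", "ok"] -
    # None becomes "", internal whitespace collapses, control characters are
    # stripped, and a row containing only empty cells is dropped entirely.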

    def _extract_key_value_pairs_improved(self, text: str) -> Dict[str, str]:
        """Extract key-value pairs from free text, filtering obvious junk."""
        pairs: Dict[str, str] = {}
        # Normalize line endings for regex stability
        t = text.replace('\r', '\n')
        # Pattern 1: colon-separated pairs (key: value)
        pattern1 = re.compile(
            r'([A-Za-z][\w\s()/\-.]{2,80}?):\s*([^\n\r:][^\n\r]*)'
        )
        for key, val in pattern1.findall(t):
            k = key.strip()
            v = val.strip()
            # Filter junk: empty or very long values, or pure separators
            if not v or len(v) > 200:
                continue
            if re.fullmatch(r'[-_/.]+', v):
                continue
            # Avoid capturing the next key as value by trimming trailing key-like tokens
            v = re.sub(r'\s+[A-Z][\w\s()/\-.]{2,40}:$', '', v).strip()
            # Skip values that are just long digit runs (likely id lists without meaning)
            if re.fullmatch(r'\d{6,}', v):
                continue
            pairs[k] = v
        # Pattern 2: inline "Key - Value" pairs separated by a hyphen or dash
        pattern2 = re.compile(r'([A-Za-z][\w\s()/\-.]{2,80}?)\s*[–—-]\s*([^\n\r]+)')
        for key, val in pattern2.findall(t):
            k = key.strip()
            v = val.strip()
            if v and len(v) <= 200 and not re.fullmatch(r'\d{6,}', v):
                pairs.setdefault(k, v)
        return pairs
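
    # Illustrative example (hypothetical lines): "Date of Audit: 12/05/2021"
    # yields {"Date of Audit": "12/05/2021"} via pattern1, while
    # "Auditor - J. Smith" is picked up by pattern2; since pattern2 uses
    # setdefault, colon-separated matches take precedence.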

    def _extract_audit_info(self, text: str, tables: List[Dict]) -> Dict[str, Any]:
        """Extract audit-specific information, preferring tables over raw text."""
        audit_info: Dict[str, Any] = {}
        # Prefer tables
        for table in tables:
            headers = [str(h).lower() for h in table.get("headers", [])]
            joined = ' '.join(headers)
            if "audit information" in joined or "auditinformation" in joined:
                for row in table.get("data", []):
                    if len(row) >= 2 and row[0] and row[1]:
                        key = str(row[0]).strip()
                        value = str(row[1]).strip()
                        # Skip numbered list rows (e.g., "1.", "2)")
                        if re.match(r'^\s*\d+\s*[.)]\s*$', key):
                            continue
                        if key and value:
                            audit_info[key] = value
        # Fall back to text patterns
        candidates = {
            "Date of Audit": r'Date\s+of\s+Audit[:\s]*([^\n\r]+)',
            "Location of audit": r'Location\s+of\s+audit[:\s]*([^\n\r]+)',
            "Auditor name": r'Auditor\s+name[:\s]*([^\n\r]+)',
            "Audit Matrix Identifier (Name or Number)": r'Audit\s+Matrix\s+Identifier.*?[:\s]*([^\n\r]+)',
        }
        for k, pat in candidates.items():
            if k not in audit_info:
                m = re.search(pat, text, re.IGNORECASE)
                if m:
                    audit_info[k] = m.group(1).strip()
        return audit_info

    def _extract_operator_info(self, text: str, tables: List[Dict]) -> Dict[str, Any]:
        """Extract operator information, preferring tables over raw text."""
        operator_info: Dict[str, Any] = {}
        # Look for operator information in tables first
        for table in tables:
            headers = [str(h).lower() for h in table.get("headers", [])]
            joined = ' '.join(headers)
            if ("operatorinformation" in joined or
                    "operator information" in joined or
                    "operatorcontactdetails" in joined):
                for row in table.get("data", []):
                    if len(row) >= 2 and row[0] and row[1]:
                        key = str(row[0]).strip()
                        value = str(row[1]).strip()
                        if not (key and value):
                            continue
                        # Map table labels to clean key names
                        kl = key.lower()
                        if "operator name" in kl:
                            operator_info["operator_name"] = value
                        elif "trading name" in kl:
                            operator_info["trading_name"] = value
                        elif "company number" in kl:
                            if len(row) > 2:
                                # The number may be split across several cells
                                company_parts = [str(r).strip() for r in row[1:] if str(r).strip()]
                                operator_info["company_number"] = "".join(company_parts)
                            else:
                                operator_info["company_number"] = value
                        elif "business address" in kl:
                            operator_info["business_address"] = value
                        elif "postal address" in kl:
                            operator_info["postal_address"] = value
                        elif "email" in kl:
                            operator_info["email"] = value
                        elif "telephone" in kl or "phone" in kl:
                            operator_info["phone"] = value
                        elif "nhvas accreditation" in kl:
                            operator_info["nhvas_accreditation"] = value
                        elif "nhvas manual" in kl:
                            operator_info["nhvas_manual"] = value
        # Extract from text patterns as backup
        patterns = {
            'operator_name': r'Operator\s*name[:\s\(]*([^\n\r\)]+?)(?=\s*NHVAS|\s*Registered|$)',
            'trading_name': r'Registered\s*trading\s*name[:\s\/]*([^\n\r]+?)(?=\s*Australian|$)',
            'company_number': r'Australian\s*Company\s*Number[:\s]*([0-9\s]+?)(?=\s*NHVAS|$)',
            'business_address': r'Operator\s*business\s*address[:\s]*([^\n\r]+?)(?=\s*Operator\s*Postal|$)',
            'postal_address': r'Operator\s*Postal\s*address[:\s]*([^\n\r]+?)(?=\s*Email|$)',
            'email': r'Email\s*address[:\s]*([^\s\n\r]+)',
            'phone': r'Operator\s*Telephone\s*Number[:\s]*([^\s\n\r]+)',
            'nhvas_accreditation': r'NHVAS\s*Accreditation\s*No\.[:\s\(]*([^\n\r\)]+)',
        }
        for key, pattern in patterns.items():
            if key not in operator_info:  # Only use text if not found in tables
                match = re.search(pattern, text, re.IGNORECASE)
                if match:
                    value = match.group(1).strip()
                    if value and len(value) < 200:
                        if key == 'company_number':
                            value = re.sub(r'\s+', '', value)
                        operator_info[key] = value
        return operator_info
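
    # Illustrative example: a table row ["Operator name", "ACME Haulage Pty Ltd"]
    # (hypothetical values) lands in operator_info["operator_name"]; the text
    # patterns only fire for keys the tables did not supply.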

    def _extract_vehicle_registrations(self, tables: List[Dict]) -> List[Dict]:
        """Extract vehicle registration records from tables."""
        vehicles: List[Dict[str, Any]] = []
        for table in tables:
            headers = [str(h).lower() for h in table.get("headers", [])]
            # Look for vehicle registration tables
            if any(keyword in ' '.join(headers) for keyword in ['registration', 'vehicle', 'number']):
                reg_col = None
                for i, header in enumerate(headers):
                    if 'registration' in header and 'number' in header:
                        reg_col = i
                        break
                if reg_col is not None:
                    for row in table.get("data", []):
                        if len(row) > reg_col and row[reg_col]:
                            reg_num = str(row[reg_col]).strip()
                            # Validate registration format (letters/numbers)
                            if re.match(r'^[A-Z]{1,3}\s*\d{1,3}\s*[A-Z]{0,3}$', reg_num):
                                vehicle_info = {"registration_number": reg_num}
                                # Add other columns as additional info
                                for i, header in enumerate(table.get("headers", [])):
                                    if i < len(row) and i != reg_col:
                                        vehicle_info[str(header)] = str(row[i]).strip()
                                vehicles.append(vehicle_info)
        return vehicles
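
    # Illustrative example: cell values like "XYZ 123" or "AB 12 CD"
    # (hypothetical plates) pass the format check, while free-text cells
    # such as "see attached list" are skipped.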

    def _extract_driver_records(self, tables: List[Dict]) -> List[Dict]:
        """Extract driver records from tables."""
        drivers: List[Dict[str, Any]] = []
        for table in tables:
            headers = [str(h).lower() for h in table.get("headers", [])]
            # Look for driver/scheduler tables
            if any(keyword in ' '.join(headers) for keyword in ['driver', 'scheduler', 'name']):
                name_col = None
                for i, header in enumerate(headers):
                    if 'name' in header:
                        name_col = i
                        break
                if name_col is not None:
                    for row in table.get("data", []):
                        if len(row) > name_col and row[name_col]:
                            name = str(row[name_col]).strip()
                            # Basic validation: require at least two alphabetic words
                            if re.match(r'^[A-Za-z\s]{2,}$', name) and len(name.split()) >= 2:
                                driver_info = {"name": name}
                                # Add other columns
                                for i, header in enumerate(table.get("headers", [])):
                                    if i < len(row) and i != name_col:
                                        driver_info[str(header)] = str(row[i]).strip()
                                drivers.append(driver_info)
        return drivers

    def _extract_compliance_summary(self, text: str, tables: List[Dict]) -> Dict[str, Any]:
        """Extract compliance standards and code definitions."""
        compliance = {
            "standards_compliance": {},
            "compliance_codes": {},
            "audit_results": []
        }
        # Look for compliance tables
        for table in tables:
            headers = [str(h).lower() for h in table.get("headers", [])]
            if any(keyword in ' '.join(headers) for keyword in ['compliance', 'standard', 'requirement']):
                for row in table.get("data", []):
                    if len(row) >= 2:
                        standard = str(row[0]).strip()
                        code = str(row[1]).strip()
                        if standard.startswith('Std') and code in ['V', 'NC', 'SFI', 'NAP', 'NA']:
                            compliance["standards_compliance"][standard] = code
        # Extract compliance code definitions from the surrounding text
        code_patterns = {
            'V': r'\bV\b\s+([^\n\r]+)',
            'NC': r'\bNC\b\s+([^\n\r]+)',
            'SFI': r'\bSFI\b\s+([^\n\r]+)',
            'NAP': r'\bNAP\b\s+([^\n\r]+)',
            'NA': r'\bNA\b\s+([^\n\r]+)',
        }
        for code, pattern in code_patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                compliance["compliance_codes"][code] = match.group(1).strip()
        return compliance
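
    # Illustrative example: a row ["Std 1. Maintenance", "V"] (hypothetical)
    # is recorded as {"Std 1. Maintenance": "V"}; rows whose code is not one
    # of V/NC/SFI/NAP/NA are ignored.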

    def _extract_dates_and_numbers_improved(self, text: str) -> Dict[str, Any]:
        """Extract dates, registration/phone numbers, emails and references."""
        result = {
            "dates": [],
            "registration_numbers": [],
            "phone_numbers": [],
            "email_addresses": [],
            "reference_numbers": []
        }
        # Date patterns
        date_patterns = [
            r'\b(\d{1,2}(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4})\b',
            r'\b(\d{1,2}/\d{1,2}/\d{4})\b',
            r'\b(\d{1,2}-\d{1,2}-\d{4})\b',
            r'\b(\d{1,2}\.\d{1,2}\.\d{4})\b',
        ]
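        # Illustrative matches (hypothetical values): "3rd March 2021",
        # "12/05/2020", "1-6-2019" and "01.06.2019" are each captured by
        # one of the patterns above.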
        for pattern in date_patterns:
            result["dates"].extend(re.findall(pattern, text))
        # Registration numbers (Australian format-ish)
        reg_pattern = r'\b([A-Z]{1,3}\s*\d{1,3}\s*[A-Z]{0,3})\b'
        result["registration_numbers"] = list(set(re.findall(reg_pattern, text)))
        # Phone numbers (AU); allow a space after +61 and avoid matching
        # inside longer digit runs (a leading \b never matches before "+")
        phone_pattern = r'(?<!\d)((?:\+61\s?|0)[2-9]\s?\d{4}\s?\d{4})\b'
        result["phone_numbers"] = list(set(re.findall(phone_pattern, text)))
        # Email addresses
        email_pattern = r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b'
        result["email_addresses"] = list(set(re.findall(email_pattern, text)))
        # Reference numbers
        ref_patterns = [
            (r'RF(?:S)?\s*#?\s*(\d+)', 'RFS_Certifications'),
            (r'NHVAS\s+Accreditation\s+No\.?\s*(\d+)', 'NHVAS_Numbers'),
            (r'Registration\s+Number\s*#?\s*(\d+)', 'Registration_Numbers'),
        ]
        for pattern, key in ref_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                result["reference_numbers"].extend([f"{key}: {m}" for m in matches])
        return result
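
# Illustrative programmatic use without the CLI (the filename is hypothetical
# and assumes a local PDF exists):
#
#     extractor = FixedPDFExtractor()
#     data = extractor.extract_everything("audit_report.pdf")
#     print(data["extraction_summary"]["tables_found"])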

def save_results(results: Dict[str, Any], output_path: str):
    """Save results to a JSON file."""
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        logger.info(f"Results saved to {output_path}")
    except Exception as e:
        logger.error(f"Failed to save results: {e}")

def export_to_excel(results: Dict[str, Any], excel_path: str):
    """Export results to Excel with one sheet per record type."""
    try:
        with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
            # Summary sheet
            summary_data = []
            extraction_summary = results.get("extraction_summary", {})
            for key, value in extraction_summary.items():
                summary_data.append({"Metric": key.replace("_", " ").title(), "Value": value})
            pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False)
            # Key-value pairs
            kv_pairs = results.get("extracted_data", {}).get("key_value_pairs", {})
            if kv_pairs:
                kv_df = pd.DataFrame(list(kv_pairs.items()), columns=['Key', 'Value'])
                kv_df.to_excel(writer, sheet_name='Key_Value_Pairs', index=False)
            # Vehicle registrations
            vehicles = results.get("extracted_data", {}).get("vehicle_registrations", [])
            if vehicles:
                pd.DataFrame(vehicles).to_excel(writer, sheet_name='Vehicle_Registrations', index=False)
            # Driver records
            drivers = results.get("extracted_data", {}).get("driver_records", [])
            if drivers:
                pd.DataFrame(drivers).to_excel(writer, sheet_name='Driver_Records', index=False)
            # Compliance summary
            compliance = results.get("extracted_data", {}).get("compliance_summary", {})
            if compliance.get("standards_compliance"):
                comp_df = pd.DataFrame(list(compliance["standards_compliance"].items()),
                                       columns=['Standard', 'Compliance_Code'])
                comp_df.to_excel(writer, sheet_name='Compliance_Standards', index=False)
        logger.info(f"Results exported to Excel: {excel_path}")
    except Exception as e:
        logger.error(f"Failed to export to Excel: {e}")

def main():
    if len(sys.argv) < 2:
        print("Usage: python fixed_pdf_extractor.py <pdf_path>")
        sys.exit(1)
    pdf_path = Path(sys.argv[1])
    if not pdf_path.exists():
        print(f"PDF not found: {pdf_path}")
        sys.exit(1)
    print("Fixed PDF Data Extractor")
    print("=" * 50)
    extractor = FixedPDFExtractor()
    results = extractor.extract_everything(str(pdf_path))
    base = pdf_path.stem
    output_dir = pdf_path.parent
    # Save outputs (save_results and export_to_excel are module-level helpers,
    # not class methods)
    json_path = output_dir / f"{base}_comprehensive_data.json"
    excel_path = output_dir / f"{base}_fixed_extraction.xlsx"
    save_results(results, str(json_path))
    export_to_excel(results, str(excel_path))
    print("\nOUTPUT FILES:")
    print(f"  JSON Data: {json_path}")
    print(f"  Excel Data: {excel_path}")
    print("\nFIXED EXTRACTION COMPLETE!")

if __name__ == "__main__":
    main()