Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Enhanced update_docx_with_pdf.py with better JSON structure handling | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import re | |
| from typing import Optional | |
| try: | |
| from openai import OpenAI | |
| except Exception: | |
| OpenAI = None | |
# Runtime configuration
RETRIES = 3  # number of attempts made by the main model-call loop
RETRY_DELAY = 1.0  # seconds slept between consecutive attempts
DEFAULT_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")  # overridable via the OPENAI_MODEL env var
MAX_TOKENS = 4096  # sent as max_tokens on every chat completion request
TEMPERATURE = 0.0  # sent as temperature on every chat completion request
def read_any(path_or_file):
    """Return the content of a file path or of an already-open file-like object.

    Args:
        path_or_file: Either a filesystem path (opened as UTF-8 text) or any
            object exposing ``read()``.

    Returns:
        The content that was read. Bytes returned by a file-like object are
        decoded as UTF-8; anything else is returned unchanged.

    Raises:
        OSError: If a path cannot be opened.
        UnicodeDecodeError: If the data is not valid UTF-8.
    """
    if not hasattr(path_or_file, "read"):
        with open(path_or_file, "r", encoding="utf-8") as fh:
            return fh.read()

    # Rewind so a handle that was already consumed is read from the start,
    # but do not fail on streams that cannot seek (pipes, stdin) or on
    # minimal wrappers that expose only ``read``.
    seek = getattr(path_or_file, "seek", None)
    if callable(seek):
        try:
            seek(0)
        except (OSError, ValueError):
            # io.UnsupportedOperation derives from both; read from the
            # current position instead.
            pass

    content = path_or_file.read()
    if isinstance(content, (bytes, bytearray)):
        content = content.decode("utf-8")
    return content
def find_first_balanced_json(s: str) -> Optional[str]:
    """Return the first substring of *s* that is a valid JSON object.

    Every ``{`` in *s* is tried, left to right, as the start of an object.
    The first position from which a complete JSON object can be decoded
    wins, and the exact source text of that object is returned.

    Args:
        s: Arbitrary text that may embed a JSON object (for example a model
            reply wrapped in prose or a Markdown code fence).

    Returns:
        The matching substring, or ``None`` when *s* is empty or contains no
        decodable JSON object.
    """
    if not s:
        return None

    # raw_decode parses a JSON document that starts at a given index and
    # reports where it ended, so the standard parser does the brace/string
    # bookkeeping instead of a hand-written character scanner.
    decoder = json.JSONDecoder()
    start = s.find("{")
    while start != -1:
        try:
            _, end = decoder.raw_decode(s, start)
        except (ValueError, RecursionError):
            # Not a valid object from this brace; try the next one.
            pass
        else:
            return s[start:end]
        start = s.find("{", start + 1)
    return None
def call_model_and_get_raw(client, model_name: str, system_msg: str, user_msg: str):
    """Run one chat completion (system + user message) and return the reply text.

    The request uses the module-level MAX_TOKENS and TEMPERATURE settings.
    The returned text is stripped of surrounding whitespace; an empty or
    missing reply yields an empty string.
    """

    def _reply_text(response):
        # Preferred shape: chat-style message content on the first choice.
        try:
            return response.choices[0].message.content
        except Exception:
            pass
        # Fallback shape: a plain ``text`` attribute on the first choice.
        try:
            return response.choices[0].text
        except Exception:
            # Last resort: whatever the response object stringifies to.
            return str(response)

    conversation = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    reply = client.chat.completions.create(
        model=model_name,
        messages=conversation,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    text = _reply_text(reply)
    if isinstance(text, bytes):
        text = text.decode("utf-8", errors="replace")
    if not text:
        return ""
    return text.strip()
def create_enhanced_prompt(word_json, pdf_text, *, max_structure_lines=50, max_pdf_chars=100000):
    """Build the (system, user) prompt pair used to fill the JSON template.

    Args:
        word_json: Parsed JSON template whose empty fields should be filled.
        pdf_text: Text extracted from the PDF, used as the data source.
        max_structure_lines: Maximum number of structure-summary lines that
            are included in the prompt.
        max_pdf_chars: Maximum number of characters of *pdf_text* that are
            included in the prompt.

    Returns:
        A ``(system_msg, user_prompt)`` tuple of strings.
    """
    structure_analysis = []

    def analyze_structure(obj, path=""):
        # Only dicts are walked. Lists are summarised by length and are not
        # descended into.
        if not isinstance(obj, dict):
            return
        for key, value in obj.items():
            current_path = f"{path}.{key}" if path else key
            if isinstance(value, dict):
                structure_analysis.append(f" {current_path} (nested object)")
                analyze_structure(value, current_path)
            elif isinstance(value, list):
                structure_analysis.append(f" {current_path} (list with {len(value)} items)")
            elif value is None or str(value).strip() == "":
                structure_analysis.append(f" {current_path} (EMPTY - needs data)")
            else:
                structure_analysis.append(f" {current_path} (has data: {str(value)[:50]}...)")

    analyze_structure(word_json)

    system_msg = "\n".join(
        [
            "You are a precise JSON data extraction assistant.",
            "CRITICAL RULES:",
            "1. Output ONLY valid JSON - no markdown, no explanations, no extra text",
            "2. Maintain the EXACT structure provided in the template",
            "3. Only UPDATE fields that are empty or null - do not change existing data",
            "4. Extract data accurately from the PDF text provided",
            "5. If you cannot find data for a field, leave it as null or empty string",
            "6. Ensure all nested objects and arrays maintain their structure",
        ]
    )

    template_json = json.dumps(word_json, indent=2, ensure_ascii=False)
    # Keep the summary short: only the first entries are shown to the model.
    structure_summary = "\n".join(structure_analysis[:max_structure_lines])
    # Truncate very long source text so the request stays bounded.
    pdf_excerpt = pdf_text[:max_pdf_chars]

    # The two limits above used to be written as "# ..." notes inside the
    # prompt string itself, which sent them to the model as literal text.
    user_prompt = "\n".join(
        [
            "TASK: Update this JSON template with data from the PDF text.",
            "JSON TEMPLATE TO UPDATE:",
            template_json,
            "STRUCTURE ANALYSIS:",
            structure_summary,
            "PDF SOURCE TEXT:",
            pdf_excerpt,
            "EXTRACTION GUIDELINES:",
            '- For "Operator name (Legal entity)" or similar: Extract the company name',
            '- For "Date of Audit": Look for audit dates (format: DD Month YYYY or DD/MM/YYYY)',
            '- For "Auditor name": Extract auditor\'s name',
            '- For "Attendance List": Extract names and positions, format as list',
            "- For vehicle data: Extract registration numbers, maintenance info, etc.",
            "- For management summaries: Extract compliance details and findings",
            "CRITICAL: Return ONLY the updated JSON object. No other text whatsoever.",
        ]
    )
    return system_msg, user_prompt
def _write_output(output_file, payload, as_json=True):
    """Write *payload* to *output_file*, which is a path or a writable handle.

    With ``as_json`` the payload is serialised with ``json.dump`` (indent 2,
    non-ASCII kept as-is); otherwise it is written verbatim as text.
    """

    def _emit(handle):
        if as_json:
            json.dump(payload, handle, indent=2, ensure_ascii=False)
        else:
            handle.write(payload)

    if hasattr(output_file, "write"):
        _emit(output_file)
        output_file.flush()
    else:
        with open(output_file, "w", encoding="utf-8") as f:
            _emit(f)


def _save_raw_model_output(output_file, attempt, raw_text):
    """Best-effort dump of the raw model reply next to the output, for debugging."""
    if isinstance(output_file, (str, os.PathLike)):
        out_base = os.fspath(output_file)
    else:
        out_base = getattr(output_file, "name", "output")
    raw_save_path = f"{out_base}.model_raw_attempt{attempt}.txt"
    try:
        with open(raw_save_path, "w", encoding="utf-8") as f:
            f.write(raw_text)
    except (OSError, ValueError):
        # The debug dump is optional; an unwritable location must not abort
        # the actual update.
        pass


def _try_parse_json(text):
    """Return ``(True, value, None)`` if *text* is valid JSON, else ``(False, None, error)``."""
    try:
        return True, json.loads(text), None
    except (ValueError, TypeError, RecursionError) as err:
        return False, None, err


def _recover_json(raw_text, client, model_name):
    """Turn a model reply into parsed JSON: direct parse, embedded object, then a repair call.

    Returns ``(True, value)`` on success and ``(False, None)`` otherwise. A
    flag is used because ``None`` is itself a valid parsed value (JSON null).
    """
    ok, parsed, error = _try_parse_json(raw_text)
    if ok:
        print("✅ Model returned valid JSON directly.")
        return True, parsed
    print(f"⚠️ Direct parsing failed: {error}")

    # The reply may wrap the object in prose or a code fence.
    candidate = find_first_balanced_json(raw_text)
    if candidate:
        ok, parsed, error = _try_parse_json(candidate)
        if ok:
            print("✅ Successfully extracted JSON substring from model output.")
            return True, parsed
        print(f"⚠️ Substring parsing also failed: {error}")

    print("🔧 Attempting JSON repair...")
    repair_system = "You are a JSON repair specialist. Fix the JSON below to be valid. Return ONLY valid JSON, nothing else."
    repair_user = f"Fix this JSON:\n\n{raw_text}"
    try:
        repair_raw = call_model_and_get_raw(client, model_name, repair_system, repair_user)
    except Exception as repair_error:
        # Service boundary: the SDK's exception types are not known here.
        print(f"⚠️ Repair pass failed: {repair_error}")
        return False, None

    ok, parsed, error = _try_parse_json(repair_raw)
    if not ok:
        # Give the repaired reply the same tolerance for wrapping text.
        candidate = find_first_balanced_json(repair_raw)
        if candidate:
            ok, parsed, _ = _try_parse_json(candidate)
    if ok:
        print("✅ Repair pass succeeded.")
        return True, parsed
    print(f"⚠️ Repair pass failed: {error}")
    return False, None


def update_json_with_pdf(word_json_file, pdf_txt_file, output_file):
    """Fill the empty fields of a JSON template using text extracted from a PDF.

    The template and the PDF text are sent to the chat model, and the JSON it
    returns is written to *output_file*. Whenever the update cannot be
    produced (invalid template, missing API key or SDK, or every attempt
    failing), the original template is written instead, so the output is
    always populated.

    Args:
        word_json_file: Path or readable file-like object with the template.
        pdf_txt_file: Path or readable file-like object with the PDF text.
        output_file: Path or writable file-like object for the result.

    Returns:
        The parsed JSON returned by the model, or ``None`` when the original
        template was written instead.
    """
    # Load inputs
    word_json_text = read_any(word_json_file)
    pdf_txt = read_any(pdf_txt_file)

    try:
        word_json = json.loads(word_json_text)
    except (ValueError, TypeError, RecursionError) as e:
        print(f"⚠️ Input word_json is not valid JSON: {e}")
        print("Writing original to output and exiting.")
        _write_output(output_file, word_json_text, as_json=False)
        return None

    # Check API key
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("⚠️ OPENAI_API_KEY not found! Writing original JSON to output.")
        _write_output(output_file, word_json)
        return None

    if OpenAI is None:
        print("⚠️ OpenAI SDK not available. Writing original JSON to output.")
        _write_output(output_file, word_json)
        return None

    # Create enhanced prompts
    system_msg, user_prompt = create_enhanced_prompt(word_json, pdf_txt)
    print(f"📊 Original JSON has {len(json.dumps(word_json))} characters")
    print(f"📊 PDF text has {len(pdf_txt)} characters")

    client = OpenAI(api_key=api_key)
    model_name = DEFAULT_MODEL

    # Try multiple attempts with different strategies
    for attempt in range(1, RETRIES + 1):
        print(f"🛰️ Calling LLM (attempt {attempt}/{RETRIES}) with model {model_name}...")

        # Later attempts append a hint that favours validity over completeness.
        current_user_prompt = user_prompt
        if attempt == 2:
            current_user_prompt += "\n\nIMPORTANT: Focus on extracting the most obvious data first. Ensure valid JSON format."
        elif attempt >= 3:
            current_user_prompt += "\n\nFINAL ATTEMPT: Return a valid JSON object even if some fields remain empty. Prioritize JSON validity over completeness."

        try:
            raw_text = call_model_and_get_raw(client, model_name, system_msg, current_user_prompt)
        except Exception as call_error:
            # Service boundary: any SDK/network failure just costs one attempt.
            print(f"⚠️ Exception while calling model: {call_error}")
        else:
            # Save raw output for debugging
            _save_raw_model_output(output_file, attempt, raw_text)

            found, parsed = _recover_json(raw_text, client, model_name)
            if found:
                # Validation is advisory: a mismatch is reported, not rejected.
                if validate_json_structure(parsed, word_json):
                    print("✅ JSON structure validation passed.")
                else:
                    print("⚠️ JSON structure differs from template, but proceeding...")
                try:
                    _write_output(output_file, parsed)
                except (OSError, ValueError, TypeError) as write_error:
                    # Report I/O problems as such instead of as parse failures.
                    print(f"⚠️ Failed to write updated JSON: {write_error}")
                else:
                    return parsed

        # Wait before next attempt
        if attempt < RETRIES:
            time.sleep(RETRY_DELAY)

    # All attempts failed
    print("❗ All LLM attempts failed. Writing original JSON to output.")
    try:
        _write_output(output_file, word_json)
    except (OSError, ValueError, TypeError) as e:
        print(f"⚠️ Failed to write original JSON: {e}")
    else:
        print("✅ Original JSON template written to output.")
    return None
def validate_json_structure(parsed_json, original_json):
    """Check that *parsed_json* keeps the structure of the template *original_json*.

    Rules, applied recursively through dicts:
      * every key of a template dict must be present in the parsed dict
        (extra keys are allowed);
      * a template value that is ``None`` or a blank string is a placeholder
        to be filled in, so any parsed value is accepted for it;
      * any other template value must have exactly the same type in the
        parsed data.

    List contents are not inspected. Problems are reported with ``print``.

    Returns:
        ``True`` when the structure matches, ``False`` otherwise. Never raises.
    """

    def is_placeholder(value):
        # Empty template slots exist to be filled, so a change of type there
        # (null -> text, "" -> list, ...) is expected rather than a mismatch.
        return value is None or (isinstance(value, str) and not value.strip())

    def compare_structure(parsed, original, path=""):
        if is_placeholder(original):
            return True
        if type(parsed) is not type(original):
            print(f"⚠️ Type mismatch at {path}: {type(parsed)} vs {type(original)}")
            return False
        if isinstance(original, dict):
            for key, template_value in original.items():
                if key not in parsed:
                    print(f"⚠️ Missing key at {path}.{key}")
                    return False
                if not compare_structure(parsed[key], template_value, f"{path}.{key}"):
                    return False
        return True

    try:
        return compare_structure(parsed_json, original_json)
    except Exception:
        # Validation is advisory for the caller; any unexpected failure
        # (for example extreme nesting depth) counts as "does not match".
        return False
if __name__ == "__main__":
    # Command-line entry point: <word_json_file> <pdf_txt_file> <output_json_file>
    if len(sys.argv) != 4:
        # Nothing was processed and no output was produced, so report the
        # misuse on stderr with a non-zero status.
        print(
            "Usage: python update_docx_with_pdf.py <word_json_file> <pdf_txt_file> <output_json_file>",
            file=sys.stderr,
        )
        sys.exit(2)

    try:
        update_json_with_pdf(sys.argv[1], sys.argv[2], sys.argv[3])
    except Exception as e:
        # Top-level boundary: whatever went wrong, still try to leave an
        # output file behind by copying the input template unchanged.
        print(f"Unexpected exception: {e}")
        try:
            with open(sys.argv[1], "r", encoding="utf-8") as inf, open(sys.argv[3], "w", encoding="utf-8") as outf:
                outf.write(inf.read())
        except (OSError, ValueError) as copy_error:
            print(f"⚠️ Could not copy original input to output: {copy_error}")
        else:
            print("Wrote original input JSON to output due to exception.")

    # A handled run always reports success so that callers can continue.
    sys.exit(0)