#!/usr/bin/env python3
"""
pipeline.py — safer matching and operator-declaration protections

Key improvements:
- find_matching_json_key_and_value() returns (key, value) so callers can accept/reject by key.
- Higher fuzzy thresholds for risky substitutions.
- Operator Declaration: avoid using attendance lists / unrelated keys for Position Title.
- Vehicle header mapping: stronger normalized substring/token matching for long headers.
- Preserves existing logging and all previous handlers/logic.
"""
import json
import re
from typing import Any, Tuple, Optional

from docx import Document
from docx.shared import RGBColor

# ============================================================================
# Heading patterns for document structure detection (unchanged)
# ============================================================================
HEADING_PATTERNS = {
    "main": [
        r"NHVAS\s+Audit\s+Summary\s+Report",
        r"NATIONAL\s+HEAVY\s+VEHICLE\s+ACCREDITATION\s+AUDIT\s+SUMMARY\s+REPORT",
        r"NHVAS\s+AUDIT\s+SUMMARY\s+REPORT"
    ],
    "sub": [
        r"AUDIT\s+OBSERVATIONS\s+AND\s+COMMENTS",
        r"MAINTENANCE\s+MANAGEMENT",
        r"MASS\s+MANAGEMENT",
        r"FATIGUE\s+MANAGEMENT",
        r"Fatigue\s+Management\s+Summary\s+of\s+Audit\s+findings",
        r"MAINTENANCE\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"MASS\s+MANAGEMENT\s+SUMMARY\s+OF\s+AUDIT\s+FINDINGS",
        r"Vehicle\s+Registration\s+Numbers\s+of\s+Records\s+Examined",
        r"CORRECTIVE\s+ACTION\s+REQUEST\s+\(CAR\)",
        r"NHVAS\s+APPROVED\s+AUDITOR\s+DECLARATION",
        r"Operator\s+Declaration",
        r"Operator\s+Information",
        r"Driver\s*/\s*Scheduler\s+Records\s+Examined"
    ]
}

# ============================================================================
# Utility helpers
# ============================================================================

# header text -> count of times it failed to map to any JSON column
_unmatched_headers = {}


def record_unmatched_header(header: str):
    """Tally a table header that could not be mapped to a JSON key (for diagnostics)."""
    if not header:
        return
    _unmatched_headers[header] = _unmatched_headers.get(header, 0) + 1


def load_json(filepath):
    """Load and parse a UTF-8 encoded JSON file, returning the parsed object."""
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)


def flatten_json(y, prefix=''):
    """Flatten a nested dict into dotted keys ('section.field').

    Each leaf is stored under BOTH its fully-qualified dotted key and its bare
    leaf key to enable short-name lookups.
    NOTE: bare-key entries from later sections overwrite earlier ones when leaf
    names collide; callers that care should use the dotted key.
    """
    out = {}
    for key, val in y.items():
        new_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            out.update(flatten_json(val, new_key))
        else:
            out[new_key] = val
            out[key] = val
    return out


def is_red(run):
    """Return True when a run is colored red (RGB FF0000) or uses theme_color == 1.

    NOTE(review): the theme_color == 1 comparison assumes that enum value maps
    to the template's red accent — confirm against the source documents.
    """
    color = run.font.color
    try:
        return color and ((getattr(color, "rgb", None) and color.rgb == RGBColor(255, 0, 0)) or getattr(color, "theme_color", None) == 1)
    except Exception:
        return False


def get_value_as_string(value, field_name=""):
    """Coerce a JSON value to a string for insertion into the document.

    Lists are joined with spaces, EXCEPT for company-number fields, where the
    list is returned intact so digits can be placed cell-by-cell.
    """
    if isinstance(value, list):
        if len(value) == 0:
            return ""
        elif len(value) == 1:
            return str(value[0])
        else:
            # Keep lists intact for special patterns (e.g., ACN digits) but default to join
            if "australian company number" in field_name.lower() or "company number" in field_name.lower():
                return value
            return " ".join(str(v) for v in value)
    else:
        return str(value)


def get_clean_text(cell):
    """Concatenate all run text in a table cell and strip surrounding whitespace."""
    text = ""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            text += run.text
    return text.strip()


def has_red_text(cell):
    """Return True when any run in the cell is red and contains non-blank text."""
    for paragraph in cell.paragraphs:
        for run in paragraph.runs:
            if is_red(run) and run.text.strip():
                return True
    return False


def has_red_text_in_paragraph(paragraph):
    """Return True when any run in the paragraph is red and contains non-blank text."""
    for run in paragraph.runs:
        if is_red(run) and run.text.strip():
            return True
    return False


def normalize_header_text(s: str) -> str:
    """Canonicalize a header string for fuzzy comparison.

    Drops parenthetical content, strips punctuation, collapses whitespace,
    lowercases, then applies domain-specific spelling tweaks.
    """
    if not s:
        return ""
    s = re.sub(r'\([^)]*\)', ' ', s)  # remove parenthetical content
    s = s.replace("/", " ")
    s = re.sub(r'[^\w\s\#\%]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip().lower()
    # canonical tweaks
    s = s.replace('registrationno', 'registration number')
    s = s.replace('registrationnumber', 'registration number')
    # FIX: removed a dead 'sub-contractor' replacement — the punctuation strip
    # above already converts '-' to a space, so that pattern can never occur here.
    s = s.replace('sub contracted', 'sub contractor')
    return s.strip()
# ============================================================================
# JSON matching functions
# - find_matching_json_value: (keeps behavior used elsewhere)
# - find_matching_json_key_and_value: returns (key, value) so callers can
#   decide whether to use an entry based on the matched key.
# ============================================================================

def find_matching_json_value(field_name, flat_json):
    """Legacy API: return value only (preserves existing callers)."""
    result = find_matching_json_key_and_value(field_name, flat_json)
    return result[1] if result else None


def find_matching_json_key_and_value(field_name, flat_json) -> Optional[Tuple[str, Any]]:
    """
    Return (matched_key, matched_value) or None.

    Match order: exact -> case-insensitive -> 'print name' special case ->
    dotted-suffix -> punctuation-cleaned -> token fuzzy with normalized
    substring fallback. Safer thresholds: fuzzy matches require >=0.35 by default.
    """
    field_name = (field_name or "").strip()
    if not field_name:
        return None

    # Exact match
    if field_name in flat_json:
        print(f" ✅ Direct match found for key '{field_name}'")
        return field_name, flat_json[field_name]

    # Case-insensitive exact
    for key, value in flat_json.items():
        if key.lower() == field_name.lower():
            print(f" ✅ Case-insensitive match found for key '{field_name}' -> '{key}'")
            return key, value

    # Special-case 'print name' preference for operator vs auditor (prefer fully-qualified)
    if field_name.lower().strip() == "print name":
        operator_keys = [k for k in flat_json.keys() if "operator" in k.lower() and "print name" in k.lower()]
        auditor_keys = [k for k in flat_json.keys() if "auditor" in k.lower() and ("print name" in k.lower() or "name" in k.lower())]
        if operator_keys:
            print(f" ✅ Operator Print Name match: '{field_name}' -> '{operator_keys[0]}'")
            return operator_keys[0], flat_json[operator_keys[0]]
        elif auditor_keys:
            print(f" ✅ Auditor Name match: '{field_name}' -> '{auditor_keys[0]}'")
            return auditor_keys[0], flat_json[auditor_keys[0]]

    # Suffix match for nested keys (e.g., 'section.field')
    for key, value in flat_json.items():
        if '.' in key and key.split('.')[-1].lower() == field_name.lower():
            print(f" ✅ Suffix match found for key '{field_name}' -> '{key}'")
            return key, value

    # Clean and exact
    clean_field = re.sub(r'[^\w\s]', ' ', field_name.lower()).strip()
    clean_field = re.sub(r'\s+', ' ', clean_field)
    for key, value in flat_json.items():
        clean_key = re.sub(r'[^\w\s]', ' ', key.lower()).strip()
        clean_key = re.sub(r'\s+', ' ', clean_key)
        if clean_field == clean_key:
            print(f" ✅ Clean match found for key '{field_name}' -> '{key}'")
            return key, value

    # Fuzzy matching with token scoring
    field_words = set(word.lower() for word in re.findall(r'\b\w+\b', field_name) if len(word) > 2)
    if not field_words:
        return None
    # FIX: hoisted loop-invariant normalization of the field name out of the
    # per-key loop (the original recomputed it for every key without common tokens).
    norm_field = normalize_header_text(field_name)
    best_key = None
    best_value = None
    best_score = 0.0
    for key, value in flat_json.items():
        key_words = set(word.lower() for word in re.findall(r'\b\w+\b', key) if len(word) > 2)
        if not key_words:
            continue
        common = field_words.intersection(key_words)
        if not common:
            # allow substring in normalized forms as a weaker fallback
            norm_key = normalize_header_text(key)
            if norm_field and norm_key and (norm_field in norm_key or norm_key in norm_field):
                # substring score based on length ratio
                substring_score = min(len(norm_field), len(norm_key)) / max(len(norm_field), len(norm_key))
                final_score = 0.4 * substring_score
            else:
                final_score = 0.0
        else:
            similarity = len(common) / len(field_words.union(key_words))
            coverage = len(common) / len(field_words)
            final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_score = final_score
            best_key = key
            best_value = value

    # Accept only reasonable fuzzy matches (threshold 0.35)
    if best_key and best_score >= 0.35:
        print(f" ✅ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_key, best_value

    print(f" ❌ No match found for '{field_name}'")
    return None
# ============================================================================
# Red text helpers (unchanged except kept robust)
# ============================================================================

def extract_red_text_segments(cell):
    """Collect contiguous runs of red text per paragraph in a cell.

    Returns a list of dicts: {'text': str, 'runs': [(para_idx, run_idx, run)],
    'paragraph_idx': int}. A non-red run terminates the current segment.
    """
    red_segments = []
    for para_idx, paragraph in enumerate(cell.paragraphs):
        current_segment = ""
        segment_runs = []
        for run_idx, run in enumerate(paragraph.runs):
            if is_red(run):
                if run.text:
                    current_segment += run.text
                    segment_runs.append((para_idx, run_idx, run))
            else:
                if segment_runs:
                    red_segments.append({'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx})
                    current_segment = ""
                    segment_runs = []
        if segment_runs:
            red_segments.append({'text': current_segment, 'runs': segment_runs.copy(), 'paragraph_idx': para_idx})
    return red_segments


def replace_all_red_segments(red_segments, replacement_text):
    """Replace every red segment with replacement_text (recolored black).

    The first red run receives line 1; all other red runs are blanked. Extra
    replacement lines are appended to the first run as w:br + w:t elements.
    Returns the number of replacements made (0 or 1).
    """
    if not red_segments:
        return 0
    if '\n' in replacement_text:
        replacement_lines = replacement_text.split('\n')
    else:
        replacement_lines = [replacement_text]
    replacements_made = 0
    first_segment = red_segments[0]
    if first_segment['runs']:
        first_run = first_segment['runs'][0][2]
        first_run.text = replacement_lines[0]
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        replacements_made = 1
        for _, _, run in first_segment['runs'][1:]:
            run.text = ''
    for segment in red_segments[1:]:
        for _, _, run in segment['runs']:
            run.text = ''
    if len(replacement_lines) > 1 and red_segments:
        try:
            # FIX: the previous code called add_run() on the raw lxml CT_P
            # element returned by getparent(), which has no such method, so the
            # multi-line path always raised and fell back to space-joining.
            # Append explicit <w:br/> + <w:t> children to the first run instead,
            # which keeps real line breaks and inherits the run's (black) color.
            first_run = red_segments[0]['runs'][0][2]
            from docx.oxml import OxmlElement
            from docx.oxml.ns import qn
            for line in replacement_lines[1:]:
                if line.strip():
                    br = OxmlElement('w:br')
                    first_run._element.append(br)
                    t = OxmlElement('w:t')
                    t.set(qn('xml:space'), 'preserve')
                    t.text = line.strip()
                    first_run._element.append(t)
        except Exception:
            # Last-resort fallback: flatten the lines into the first run.
            if red_segments and red_segments[0]['runs']:
                first_run = red_segments[0]['runs'][0][2]
                first_run.text = ' '.join(replacement_lines)
                first_run.font.color.rgb = RGBColor(0, 0, 0)
    return replacements_made


def replace_single_segment(segment, replacement_text):
    """Replace one red segment's text with replacement_text, recolored black."""
    if not segment['runs']:
        return False
    first_run = segment['runs'][0][2]
    first_run.text = replacement_text
    first_run.font.color.rgb = RGBColor(0, 0, 0)
    for _, _, run in segment['runs'][1:]:
        run.text = ''
    return True


def replace_red_text_in_cell(cell, replacement_text):
    """Replace all red text in a cell with replacement_text; returns count made."""
    red_segments = extract_red_text_segments(cell)
    if not red_segments:
        return 0
    return replace_all_red_segments(red_segments, replacement_text)

# ============================================================================
# Specialized handlers (vehicle, attendance, management, operator) with fixes
# ============================================================================

def handle_australian_company_number(row, company_numbers):
    """Place each ACN digit into consecutive cells (starting at cell 2) that hold red text."""
    replacements_made = 0
    for i, digit in enumerate(company_numbers):
        cell_idx = i + 1
        if cell_idx < len(row.cells):
            cell = row.cells[cell_idx]
            if has_red_text(cell):
                cell_replacements = replace_red_text_in_cell(cell, str(digit))
                replacements_made += cell_replacements
                print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}")
    return replacements_made


def handle_vehicle_registration_table(table, flat_json):
    """
    Stronger header normalization + substring matching for long headers.
    Keeps existing behavior but reduces 'No mapping found' by using normalized
    substring matching.
    """
    replacements_made = 0
    # Build candidate vehicle_section similar to prior logic
    vehicle_section = None
    # Prefer keys explicitly mentioning 'registration' or 'vehicle'
    candidates = [(k, v) for k, v in flat_json.items() if 'registration' in k.lower() or 'vehicle' in k.lower()]
    if candidates:
        # prefer the one with longest key match (likely most specific)
        candidates.sort(key=lambda kv: -len(kv[0]))
        vehicle_section = candidates[0][1]
    # fallback: collect flattened keys that look like vehicle columns
    if vehicle_section is None:
        potential_columns = {}
        for key, value in flat_json.items():
            lk = key.lower()
            if any(col_name in lk for col_name in ["registration number", "sub-contractor", "weight verification", "rfs suspension", "trip records", "fault recording", "fault repair", "daily checks", "roadworthiness"]):
                if "." in key:
                    column_name = key.split(".")[-1]
                else:
                    column_name = key
                potential_columns[column_name] = value
        if potential_columns:
            vehicle_section = potential_columns
            print(f" ✅ Found vehicle data from flattened keys: {list(vehicle_section.keys())}")
    if not vehicle_section:
        print(f" ❌ Vehicle registration data not found in JSON")
        return 0

    # Normalize vehicle_section into dict of column_label -> list/value
    if isinstance(vehicle_section, list):
        # if list of dicts, pivot
        if vehicle_section and isinstance(vehicle_section[0], dict):
            flattened = {}
            for entry in vehicle_section:
                for k, v in entry.items():
                    flattened.setdefault(k, []).append(v)
            vehicle_section = flattened
        else:
            # can't interpret, bail
            vehicle_section = {}
    if not isinstance(vehicle_section, dict):
        try:
            vehicle_section = dict(vehicle_section)
        except Exception:
            vehicle_section = {}
    print(f" ✅ Found vehicle registration data with {len(vehicle_section)} columns")

    # Find header row (look for registration + number or reg no)
    header_row_idx = -1
    header_row = None
    for row_idx, row in enumerate(table.rows):
        row_text = " ".join(get_clean_text(cell).lower() for cell in row.cells)
        if ("registration" in row_text and "number" in row_text) or "reg no" in row_text or "registration no" in row_text:
            header_row_idx = row_idx
            header_row = row
            break
    if header_row_idx == -1:
        print(f" ❌ Could not find header row in vehicle table")
        return 0
    print(f" ✅ Found header row at index {header_row_idx}")

    # Build master labels from vehicle_section keys
    master_labels = {}
    for orig_key in vehicle_section.keys():
        norm = normalize_header_text(str(orig_key))
        if norm:
            # if there is collision, prefer longer orig_key (more specific)
            if norm in master_labels:
                if len(orig_key) > len(master_labels[norm]):
                    master_labels[norm] = orig_key
            else:
                master_labels[norm] = orig_key

    # Map header cells using normalized token overlap + substring fallback
    column_mapping = {}
    for col_idx, cell in enumerate(header_row.cells):
        header_text = get_clean_text(cell).strip()
        if not header_text:
            continue
        header_key = header_text.strip().lower()
        if header_key in {"no", "no.", "#"}:
            continue
        norm_header = normalize_header_text(header_text)
        best_match = None
        best_score = 0.0
        # exact normalized match
        if norm_header in master_labels:
            best_match = master_labels[norm_header]
            best_score = 1.0
        else:
            # token overlap
            header_tokens = set(t for t in norm_header.split() if len(t) > 2)
            for norm_key, orig_label in master_labels.items():
                key_tokens = set(t for t in norm_key.split() if len(t) > 2)
                if not key_tokens:
                    continue
                common = header_tokens.intersection(key_tokens)
                if common:
                    score = len(common) / max(1, len(header_tokens.union(key_tokens)))
                else:
                    # substring fallback on normalized strings
                    if norm_header in norm_key or norm_key in norm_header:
                        score = min(len(norm_header), len(norm_key)) / max(len(norm_header), len(norm_key))
                    else:
                        score = 0.0
                if score > best_score:
                    best_score = score
                    best_match = orig_label
        # additional heuristic: if header contains 'roadworthiness' and any master_labels key contains that token, accept
        if not best_match:
            for norm_key, orig_label in master_labels.items():
                if 'roadworthiness' in norm_header and 'roadworthiness' in norm_key:
                    best_match = orig_label
                    best_score = 0.65
                    break
        if best_match and best_score >= 0.30:
            column_mapping[col_idx] = best_match
            print(f" 📌 Column {col_idx}: '{header_text}' -> '{best_match}' (norm:'{norm_header}' score:{best_score:.2f})")
        else:
            print(f" ⚠️ No mapping found for '{header_text}' (norm:'{norm_header}')")
            record_unmatched_header(header_text)
    if not column_mapping:
        print(f" ❌ No column mappings found")
        return 0

    # Determine how many rows of data to populate
    max_data_rows = 0
    for json_key, data in vehicle_section.items():
        if isinstance(data, list):
            max_data_rows = max(max_data_rows, len(data))
    print(f" 📌 Need to populate {max_data_rows} data rows")

    # Populate or add rows
    for data_row_index in range(max_data_rows):
        table_row_idx = header_row_idx + 1 + data_row_index
        if table_row_idx >= len(table.rows):
            print(f" ⚠️ Row {table_row_idx + 1} doesn't exist, adding one")
            table.add_row()
        row = table.rows[table_row_idx]
        print(f" 📌 Processing data row {table_row_idx + 1} (vehicle {data_row_index + 1})")
        for col_idx, json_key in column_mapping.items():
            if col_idx < len(row.cells):
                cell = row.cells[col_idx]
                column_data = vehicle_section.get(json_key, [])
                if isinstance(column_data, list) and data_row_index < len(column_data):
                    replacement_value = str(column_data[data_row_index])
                    cell_text = get_clean_text(cell)
                    if has_red_text(cell) or not cell_text.strip():
                        if not cell_text.strip():
                            cell.text = replacement_value
                            replacements_made += 1
                            print(f" -> Added '{replacement_value}' to empty cell (col '{json_key}')")
                        else:
                            cell_replacements = replace_red_text_in_cell(cell, replacement_value)
                            replacements_made += cell_replacements
                            if cell_replacements > 0:
                                print(f" -> Replaced red text with '{replacement_value}' (col '{json_key}')")
    return replacements_made


def handle_attendance_list_table_enhanced(table, flat_json):
    """Same as before — preserved behavior."""
    replacements_made = 0
    attendance_patterns = ["attendance list", "names and position titles", "attendees"]
    found_attendance_row = None
    # Only the first three rows are checked for the attendance heading.
    for row_idx, row in enumerate(table.rows[:3]):
        for cell_idx, cell in enumerate(row.cells):
            cell_text = get_clean_text(cell).lower()
            if any(pattern in cell_text for pattern in attendance_patterns):
                found_attendance_row = row_idx
                print(f" 🎯 ENHANCED: Found Attendance List in row {row_idx + 1}, cell {cell_idx + 1}")
                break
        if found_attendance_row is not None:
            break
    if found_attendance_row is None:
        return 0
    attendance_value = None
    attendance_search_keys = [
        "Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)",
        "Attendance List (Names and Position Titles)",
        "attendance list",
        "attendees"
    ]
    print(f" 🔍 Searching for attendance data in JSON...")
    for search_key in attendance_search_keys:
        kv = find_matching_json_key_and_value(search_key, flat_json)
        if kv:
            attendance_value = kv[1]
            print(f" ✅ Found attendance data with key: '{kv[0]}'")
            print(f" 📊 Raw value: {attendance_value}")
            break
    if attendance_value is None:
        print(f" ❌ No attendance data found in JSON")
        return 0
    # Find red text candidate cell
    target_cell = None
    print(f" 🔍 Scanning ALL cells in attendance table for red text...")
    for row_idx, row in enumerate(table.rows):
        for cell_idx, cell in enumerate(row.cells):
            if has_red_text(cell):
                red_text = ""
                for paragraph in cell.paragraphs:
                    for run in paragraph.runs:
                        if is_red(run):
                            red_text += run.text
                if red_text.strip():
                    print(f" 🎯 Found red text in row {row_idx + 1}, cell {cell_idx + 1}")
                    print(f" 📋 Red text content: '{red_text[:60]}...'")
                    red_lower = red_text.lower()
                    # Heuristic: attendance entries usually carry a role word or a dash separator.
                    if any(ind in red_lower for ind in ['manager', 'director', 'auditor', '–', '-']):
                        target_cell = cell
                        print(f" ✅ This looks like attendance data - using this cell")
                        break
        if target_cell:
            break
    if target_cell is None:
        print(f" ⚠️ No red text found that looks like attendance data")
        return 0
    if has_red_text(target_cell):
        print(f" 🔧 Replacing red text with properly formatted attendance list...")
        if isinstance(attendance_value, list):
            attendance_list = [str(item).strip() for item in attendance_value if str(item).strip()]
        else:
            attendance_list = [str(attendance_value).strip()]
        print(f" 📝 Attendance items to add:")
        for i, item in enumerate(attendance_list):
            print(f" {i+1}. {item}")
        replacement_text = "\n".join(attendance_list)
        cell_replacements = replace_red_text_in_cell(target_cell, replacement_text)
        replacements_made += cell_replacements
        print(f" ✅ Added {len(attendance_list)} attendance items")
        print(f" 📊 Replacements made: {cell_replacements}")
    return replacements_made


def fix_management_summary_details_column(table, flat_json):
    """CORRECTED VERSION: Replace red text with UPDATED values from JSON (not old extracted values)"""
    replacements_made = 0
    print(f" 🎯 FIX: Management Summary DETAILS column processing")
    print(f" 📋 NOTE: JSON contains UPDATED values to replace red text with")
    # Determine which type of management summary this is
    table_text = ""
    for row in table.rows[:3]:
        for cell in row.cells:
            table_text += get_clean_text(cell).lower() + " "
    mgmt_types = []
    if "mass management" in table_text or "mass" in table_text:
        mgmt_types.append("Mass Management Summary")
    if "maintenance management" in table_text or "maintenance" in table_text:
        mgmt_types.append("Maintenance Management Summary")
    if "fatigue management" in table_text or "fatigue" in table_text:
        mgmt_types.append("Fatigue Management Summary")
    # Fallback detection
    if not mgmt_types:
        if any("std 5" in get_clean_text(c).lower() for r in table.rows for c in r.cells):
            mgmt_types.append("Mass Management Summary")
    if not mgmt_types:
        print(f" ⚠️ Could not determine management summary type")
        return 0

    for mgmt_type in mgmt_types:
        print(f" ✅ Confirmed {mgmt_type} table processing")
        # Build management data dict from flattened keys - these contain UPDATED values
        mgmt_data = {}
        # Look for flattened keys like "Mass Management Summary.Std 5. Verification"
        # IMPORTANT: Prioritize longer, more detailed values over shorter ones
        for key, value in flat_json.items():
            if key.startswith(mgmt_type + "."):
                # Extract the standard part (after the management type)
                std_key = key[len(mgmt_type) + 1:]  # Remove "Mass Management Summary." prefix
                # Check if this is a longer, more detailed version than what we already have
                if std_key in mgmt_data:
                    # Compare value lengths - prefer longer, more detailed content
                    existing_value = mgmt_data[std_key]
                    existing_length = len(str(existing_value)) if not isinstance(existing_value, list) else len(str(existing_value[0]) if existing_value else "")
                    new_length = len(str(value)) if not isinstance(value, list) else len(str(value[0]) if value else "")
                    if new_length > existing_length:
                        mgmt_data[std_key] = value
                        print(f" ✅ UPDATED to longer standard: '{std_key}' = {value}")
                    else:
                        print(f" ⏭️ Keeping existing longer standard: '{std_key}'")
                else:
                    mgmt_data[std_key] = value
                    print(f" ✅ Found UPDATED standard: '{std_key}' = {value}")
        if not mgmt_data:
            print(f" ⚠️ No UPDATED JSON data found for {mgmt_type}")
            continue
        print(f" 📋 Processing {mgmt_type} with {len(mgmt_data)} updated standards: {list(mgmt_data.keys())}")

        # Process each row looking for red text in details column
        print(f" 🔍 Analyzing all {len(table.rows)} rows in table:")
        for row_idx, row in enumerate(table.rows):
            if len(row.cells) >= 2:
                standard_cell = row.cells[0]
                details_cell = row.cells[1]
                standard_text = get_clean_text(standard_cell).strip()
                details_text = get_clean_text(details_cell).strip()
                standard_text_lower = standard_text.lower()
                print(f" 📋 Row {row_idx + 1}:")
                print(f" 📄 Standard: '{standard_text}'")
                print(f" 📄 Current Details: '{details_text[:50]}...' (length: {len(details_text)})")
                print(f" 🔴 Has red text (OLD data): {has_red_text(details_cell)}")
                # Skip header rows - be more specific about what constitutes a header
                header_indicators = ["standard", "requirement", "details", mgmt_type.lower().split()[0]]
                if any(header in standard_text_lower for header in header_indicators) and len(standard_text) < 50:
                    print(f" ⏭️ Skipping header row")
                    continue
                # IMPORTANT: We want to replace red text (old data) with updated data from JSON
                if not has_red_text(details_cell):
                    print(f" ⏭️ No red text found in details cell (already updated?), skipping")
                    continue
                print(f" 🎯 PROCESSING row {row_idx + 1} - REPLACING OLD red text with NEW data")
                # Extract current red text (this is the OLD data we're replacing)
                red_segments = extract_red_text_segments(details_cell)
                current_red_text = ""
                for segment in red_segments:
                    current_red_text += segment['text']
                print(f" 🔴 Current red text (OLD): '{current_red_text[:100]}...'")

                # Find the UPDATED replacement value from JSON
                replacement_value = None
                matched_std = None
                # Strategy 1: Extract standard number and match
                std_match = re.search(r'std\s*(\d+)', standard_text_lower)
                if std_match:
                    std_num = std_match.group(1)
                    print(f" 🎯 Looking for UPDATED Standard {std_num} data")
                    for std_key, std_value in mgmt_data.items():
                        if f"std {std_num}" in std_key.lower():
                            replacement_value = std_value
                            matched_std = std_key
                            print(f" ✅ Found UPDATED data for std {std_num}: '{std_key}'")
                            break
                # Strategy 2: Keyword-based matching if std number doesn't work
                if not replacement_value:
                    print(f" 🔍 No std number match, trying keyword matching for UPDATED data...")
                    keyword_mappings = {
                        "daily check": ["Std 1. Daily Check", "Daily Check"],
                        "verification": ["Std 5. Verification", "Verification"],
                        "internal review": ["Std 6. Internal Review", "Std 7. Internal Review", "Std 5. Internal Review", "Internal Review"],
                        "fault recording": ["Std 2. Fault Recording", "Fault Recording/ Reporting"],
                        "fault repair": ["Std 3. Fault Repair", "Fault Repair"],
                        "maintenance schedules": ["Std 4. Maintenance Schedules", "Maintenance Schedules"],
                        "responsibilities": ["Std 1. Responsibilities", "Std 6. Responsibilities"],
                        "vehicle control": ["Std 2. Vehicle Control", "Vehicle Control"],
                        "vehicle use": ["Std 3. Vehicle Use", "Vehicle Use"],
                        "records and documentation": ["Std 4. Records", "Std 5. Records", "Records and Documentation"],
                        "training": ["Std 8. Training", "Std 3. Training", "Training"],
                        "suspension": ["Std 8. Maintenance of Suspension", "Suspension"],
                        "scheduling": ["Std 1. Scheduling", "Scheduling"],
                        "health and wellbeing": ["Std 2. Health", "Health and wellbeing"],
                        "workplace conditions": ["Std 7. Workplace", "Workplace conditions"]
                    }
                    for keyword, candidates in keyword_mappings.items():
                        if keyword in standard_text_lower:
                            replacement_value = find_best_standard_value(mgmt_data, candidates)
                            if replacement_value:
                                matched_std = f"{keyword} related"
                                print(f" ✅ Found UPDATED data for keyword '{keyword}'")
                                break
                # Strategy 3: Try exact standard name matching
                if not replacement_value:
                    print(f" 🔍 Trying exact standard name matching for UPDATED data...")
                    # Clean the standard text for better matching
                    clean_standard = re.sub(r'\([^)]*\)', '', standard_text).strip()
                    for std_key, std_value in mgmt_data.items():
                        # Try partial matching
                        if (clean_standard.lower() in std_key.lower() or std_key.lower() in clean_standard.lower()):
                            replacement_value = std_value
                            matched_std = std_key
                            print(f" ✅ Found UPDATED data via partial match: '{std_key}'")
                            break

                # Apply replacement if found
                if replacement_value:
                    # Handle list values properly
                    if isinstance(replacement_value, list):
                        if len(replacement_value) == 1:
                            replacement_text = str(replacement_value[0])
                        else:
                            replacement_text = "\n".join(str(item) for item in replacement_value)
                    else:
                        replacement_text = str(replacement_value)
                    print(f" 🎯 REPLACING old red text with UPDATED data: '{replacement_text[:100]}...'")
                    # Use robust red text replacement
                    cell_replacements = replace_red_text_in_cell(details_cell, replacement_text)
                    # FALLBACK: If replace_red_text_in_cell fails, try manual replacement
                    if cell_replacements == 0:
                        print(f" ⚠️ Standard replacement failed, trying manual approach...")
                        # Try to replace red text manually
                        for paragraph in details_cell.paragraphs:
                            for run in paragraph.runs:
                                if is_red(run) and run.text.strip():
                                    print(f" 🔧 Manually replacing red run: '{run.text[:50]}...'")
                                    run.text = replacement_text
                                    run.font.color.rgb = RGBColor(0, 0, 0)
                                    cell_replacements = 1
                                    break
                            if cell_replacements > 0:
                                break
                    replacements_made += cell_replacements
                    if cell_replacements > 0:
                        print(f" ✅ SUCCESSFULLY UPDATED '{standard_text}' with NEW data in {mgmt_type}")
                        print(f" 📋 Used UPDATED data from: '{matched_std}'")
                        # Verify the replacement worked
                        new_details_text = get_clean_text(details_cell).strip()
                        print(f" 🔍 NEW details text: '{new_details_text[:100]}...'")
                        print(f" 🎉 OLD red text replaced with UPDATED data!")
                    else:
                        print(f" ❌ Failed to replace red text in cell")
                        print(f" 🔍 Cell still contains OLD data: '{get_clean_text(details_cell)[:100]}...'")
                else:
                    print(f" ⚠️ No UPDATED replacement found for '{standard_text}' in {mgmt_type}")
                    print(f" 📋 Available UPDATED standards: {list(mgmt_data.keys())}")
                    # FALLBACK: Try to find ANY available standard that might fit
                    if mgmt_data and current_red_text:
                        print(f" 🔄 Trying fallback - any available UPDATED standard...")
                        # Use the first available standard as a fallback
                        first_std_key = list(mgmt_data.keys())[0]
                        fallback_value = mgmt_data[first_std_key]
                        if isinstance(fallback_value, list):
                            fallback_text = "\n".join(str(item) for item in fallback_value)
                        else:
                            fallback_text = str(fallback_value)
                        print(f" 🔄 Using fallback UPDATED data: '{fallback_text[:100]}...'")
                        cell_replacements = replace_red_text_in_cell(details_cell, fallback_text)
                        if cell_replacements > 0:
                            replacements_made += cell_replacements
                            print(f" ✅ Applied fallback UPDATED data successfully")
            else:
                print(f" ⚠️ Row {row_idx + 1} has insufficient columns ({len(row.cells)})")
    print(f" 📊 Total management summary UPDATES: {replacements_made}")
    return replacements_made


def find_best_standard_value(mgmt_data, candidate_keys):
    """ENHANCED: Find the best matching value for a standard from management data"""
    print(f" 🔍 Searching for candidates: {candidate_keys}")
    print(f" 📋 In available keys: {list(mgmt_data.keys())}")
    # Direct match
    for candidate in candidate_keys:
        if candidate in mgmt_data:
            print(f" ✅ Direct match found: '{candidate}'")
            return mgmt_data[candidate]
    # Case insensitive match
    for candidate in candidate_keys:
        for key, value in mgmt_data.items():
            if candidate.lower() == key.lower():
                print(f" ✅ Case-insensitive match found: '{key}' for '{candidate}'")
                return value
    # Partial match (contains)
    for candidate in candidate_keys:
        for key, value in mgmt_data.items():
            if candidate.lower() in key.lower() or key.lower() in candidate.lower():
                print(f" ✅ Partial match found: '{key}' for '{candidate}'")
                return value
    # Extract number and match by number
    for candidate in candidate_keys:
        candidate_num = re.search(r'(\d+)', candidate)
        if candidate_num:
            for key, value in mgmt_data.items():
                key_num = re.search(r'(\d+)', key)
                if key_num and candidate_num.group(1) == key_num.group(1):
                    print(f" ✅ Number match found: '{key}' for '{candidate}'")
                    return value
    print(f" ❌ No match found for any candidate")
    return None

# ============================================================================
# Canonical operator declaration fixer — SAFER
# ============================================================================

def fix_operator_declaration_empty_values(table, flat_json):
    """
    FIXED: Properly distinguish between auditor and operator data for Operator Declaration table
    """
    replacements_made = 0
print(f" 🎯 FIX: Operator Declaration empty values processing") # Verify this is actually an operator declaration table table_context = "" for row in table.rows: for cell in row.cells: table_context += get_clean_text(cell).lower() + " " if not ("print name" in table_context and "position title" in table_context): return 0 print(f" ✅ Confirmed Operator Declaration table") def parse_name_and_position(value): """Enhanced parsing for name/position combinations""" if value is None: return None, None if isinstance(value, list): if len(value) == 0: return None, None if len(value) == 1: # Check if single item looks like "Name - Position" format single_item = str(value[0]).strip() if ' - ' in single_item: parts = single_item.split(' - ', 1) if len(parts) == 2: return parts[0].strip(), parts[1].strip() return single_item, None # Handle [name, position] pattern or multiple attendance entries if len(value) == 2: first = str(value[0]).strip() second = str(value[1]).strip() # Check if both look like names (attendance list pattern) if (' ' in first and ' ' in second and not any(role in first.lower() for role in ['manager', 'director', 'auditor', 'officer']) and not any(role in second.lower() for role in ['manager', 'director', 'auditor', 'officer'])): # This is likely attendance list data, return first name only return first, None return first, second # Multiple items - check if it's attendance list format attendance_like = any(' - ' in str(item) for item in value) if attendance_like: # Extract first person's name from attendance format first_entry = str(value[0]).strip() if ' - ' in first_entry: return first_entry.split(' - ')[0].strip(), first_entry.split(' - ')[1].strip() return first_entry, None # Join list elements as fallback value = " ".join(str(v).strip() for v in value if str(v).strip()) s = str(value).strip() if not s: return None, None # Split on common separators separators = [r'\s+[-–—]\s+', r'\s*,\s*', r'\s*\|\s*', r'\s*;\s*'] parts = None for sep_pattern in 
separators: parts = re.split(sep_pattern, s) if len(parts) >= 2: break if parts and len(parts) >= 2: left = parts[0].strip() right = parts[1].strip() # Check which part is more likely to be a position role_indicators = ['manager', 'auditor', 'owner', 'director', 'supervisor', 'coordinator', 'driver', 'operator', 'representative', 'chief', 'president', 'ceo', 'cfo', 'secretary', 'treasurer', 'officer', 'compliance'] right_has_role = any(ind in right.lower() for ind in role_indicators) left_has_role = any(ind in left.lower() for ind in role_indicators) if right_has_role and not left_has_role: return left, right # Standard: name, position elif left_has_role and not right_has_role: return right, left # Reversed: position, name else: # Default to left=name, right=position return left, right # Look for single word position at end tokens = s.split() if len(tokens) >= 2: last_token = tokens[-1].lower() role_indicators = ['manager', 'auditor', 'owner', 'director', 'supervisor', 'coordinator', 'driver', 'operator', 'representative', 'chief', 'officer'] if any(ind == last_token for ind in role_indicators): return " ".join(tokens[:-1]), tokens[-1] return s, None def looks_like_role(s: str) -> bool: """Check if string looks like a job role/position""" if not s: return False s = s.lower().strip() # Common role words roles = ['manager', 'auditor', 'owner', 'director', 'supervisor', 'coordinator', 'driver', 'operator', 'representative', 'chief', 'president', 'ceo', 'cfo', 'secretary', 'treasurer', 'officer'] # Direct role match if any(role in s for role in roles): return True # Short descriptive terms (likely roles) if len(s.split()) <= 3 and any(c.isalpha() for c in s) and len(s) > 1: return True return False def looks_like_person_name(s: str) -> bool: """Check if string looks like a person's name""" if not s: return False s = s.strip() # Exclude company-like terms company_terms = ['pty ltd', 'ltd', 'inc', 'corp', 'company', 'llc', 'plc'] s_lower = s.lower() if any(term in 
s_lower for term in company_terms): return False # Should have letters and reasonable length if len(s) > 1 and any(c.isalpha() for c in s): return True return False # Process the table for row_idx, row in enumerate(table.rows): if len(row.cells) >= 2: cell1_text = get_clean_text(row.cells[0]).strip().lower() cell2_text = get_clean_text(row.cells[1]).strip().lower() # Detect header row if "print name" in cell1_text and "position" in cell2_text: print(f" 📌 Found header row at {row_idx + 1}") # Process data row (next row after header) if row_idx + 1 < len(table.rows): data_row = table.rows[row_idx + 1] if len(data_row.cells) >= 2: name_cell = data_row.cells[0] position_cell = data_row.cells[1] current_name = get_clean_text(name_cell).strip() current_position = get_clean_text(position_cell).strip() print(f" 📋 Current values: Name='{current_name}', Position='{current_position}'") # IMPROVED: More comprehensive search for operator declaration data final_name = None final_position = None # IMPROVED: Better strategy to find OPERATOR (not auditor) data final_name = None final_position = None # Strategy 1: Look specifically in Attendance List for operator names attendance_kv = find_matching_json_key_and_value("Attendance List (Names and Position Titles)", flat_json) if attendance_kv and attendance_kv[1]: attendance_data = attendance_kv[1] print(f" 📋 Found attendance data: {attendance_data}") # Parse attendance list to find non-auditor names if isinstance(attendance_data, list): for entry in attendance_data: entry_str = str(entry).strip() if 'auditor' not in entry_str.lower() and entry_str: # Parse this entry for name and position parsed_name, parsed_pos = parse_name_and_position(entry_str) if parsed_name and looks_like_person_name(parsed_name): final_name = parsed_name if parsed_pos and looks_like_role(parsed_pos): final_position = parsed_pos break # Strategy 2: If no good name from attendance, try nested attendance keys if not final_name: nested_attendance_kv = 
find_matching_json_key_and_value("Attendance List (Names and Position Titles).Attendance List (Names and Position Titles)", flat_json) if nested_attendance_kv and nested_attendance_kv[1]: nested_data = nested_attendance_kv[1] print(f" 📋 Found nested attendance data: {nested_data}") if isinstance(nested_data, list): for entry in nested_data: entry_str = str(entry).strip() if 'auditor' not in entry_str.lower() and entry_str: parsed_name, parsed_pos = parse_name_and_position(entry_str) if parsed_name and looks_like_person_name(parsed_name): final_name = parsed_name if parsed_pos and looks_like_role(parsed_pos): final_position = parsed_pos break # Strategy 3: Direct operator declaration keys (with filtering) if not final_name: search_strategies = [ ("Operator Declaration.Print Name", "Operator Declaration.Position Title"), ("Print Name", "Position Title"), ] for name_key_pattern, pos_key_pattern in search_strategies: name_kv = find_matching_json_key_and_value(name_key_pattern, flat_json) pos_kv = find_matching_json_key_and_value(pos_key_pattern, flat_json) if name_kv and name_kv[1]: # Filter out auditor names potential_name = str(name_kv[1]).strip() # Skip if this is clearly auditor data if name_kv[0] and 'auditor' in name_kv[0].lower(): continue # Skip common auditor names that appear in our data auditor_names = ['greg dyer', 'greg', 'dyer'] if any(aud_name in potential_name.lower() for aud_name in auditor_names): continue name_from_val, pos_from_val = parse_name_and_position(name_kv[1]) if name_from_val and looks_like_person_name(name_from_val): # Additional check - avoid auditor names if not any(aud_name in name_from_val.lower() for aud_name in auditor_names): final_name = name_from_val if pos_from_val and looks_like_role(pos_from_val): final_position = pos_from_val if pos_kv and pos_kv[1] and not final_position: # Only use if key doesn't indicate auditor data if not (pos_kv[0] and 'auditor' in pos_kv[0].lower()): pos_val = str(pos_kv[1]).strip() if 
looks_like_role(pos_val) and 'auditor' not in pos_val.lower(): final_position = pos_val if final_name: break # Strategy 4: Last resort - search all keys but with strict filtering if not final_name: print(f" 🔍 Searching all keys with strict operator filtering...") for key, value in flat_json.items(): key_lower = key.lower() # Skip keys that clearly relate to auditor if 'auditor' in key_lower: continue # Look for operator-related keys if (("operator" in key_lower and "name" in key_lower) or ("print name" in key_lower and "operator" in key_lower)): if value and looks_like_person_name(str(value)): potential_name = str(value).strip() # Skip auditor names auditor_names = ['greg dyer', 'greg', 'dyer'] if not any(aud_name in potential_name.lower() for aud_name in auditor_names): name_from_val, pos_from_val = parse_name_and_position(value) if name_from_val and looks_like_person_name(name_from_val): final_name = name_from_val if pos_from_val and looks_like_role(pos_from_val): final_position = pos_from_val break # Clean up final values if isinstance(final_name, (list, tuple)): final_name = " ".join(str(x) for x in final_name).strip() if isinstance(final_position, (list, tuple)): final_position = " ".join(str(x) for x in final_position).strip() final_name = str(final_name).strip() if final_name else None final_position = str(final_position).strip() if final_position else None print(f" 🎯 Final extracted values: Name='{final_name}', Position='{final_position}'") # Update name cell if needed if (not current_name or has_red_text(name_cell)) and final_name and looks_like_person_name(final_name): if has_red_text(name_cell): replace_red_text_in_cell(name_cell, final_name) else: name_cell.text = final_name replacements_made += 1 print(f" ✅ Updated Print Name -> '{final_name}'") # Update position cell if needed if (not current_position or has_red_text(position_cell)) and final_position and looks_like_role(final_position): if has_red_text(position_cell): 
replace_red_text_in_cell(position_cell, final_position) else: position_cell.text = final_position replacements_made += 1 print(f" ✅ Updated Position Title -> '{final_position}'") break # Found and processed the header row # Mark table as processed if replacements_made > 0: try: setattr(table, "_processed_operator_declaration", True) print(" 🔖 Marked table as processed by Operator Declaration handler") except Exception: pass return replacements_made def handle_multiple_red_segments_in_cell(cell, flat_json): replacements_made = 0 red_segments = extract_red_text_segments(cell) if not red_segments: return 0 for i, segment in enumerate(red_segments): segment_text = segment['text'].strip() if segment_text: kv = find_matching_json_key_and_value(segment_text, flat_json) if kv: replacement_text = get_value_as_string(kv[1], segment_text) if replace_single_segment(segment, replacement_text): replacements_made += 1 print(f" ✅ Replaced segment {i+1}: '{segment_text}' -> '{replacement_text}'") return replacements_made def handle_nature_business_multiline_fix(cell, flat_json): replacements_made = 0 red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text red_text = red_text.strip() if not red_text: return 0 nature_indicators = ["transport", "logistics", "freight", "delivery", "trucking", "haulage"] if any(indicator in red_text.lower() for indicator in nature_indicators): kv = find_matching_json_key_and_value("Nature of Business", flat_json) or find_matching_json_key_and_value("Nature of the Operators Business (Summary)", flat_json) if kv: replacement_text = get_value_as_string(kv[1], "Nature of Business") cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Fixed Nature of Business multiline content") return replacements_made def handle_management_summary_fix(cell, flat_json): replacements_made = 0 red_text = "" for paragraph in cell.paragraphs: for run in 
paragraph.runs: if is_red(run): red_text += run.text red_text = red_text.strip() if not red_text: return 0 management_types = ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"] for mgmt_type in management_types: if mgmt_type in flat_json and isinstance(flat_json[mgmt_type], dict): mgmt_data = flat_json[mgmt_type] for std_key, std_value in mgmt_data.items(): if isinstance(std_value, list) and std_value: if len(red_text) > 10: for item in std_value: if red_text.lower() in str(item).lower() or str(item).lower() in red_text.lower(): replacement_text = "\n".join(str(i) for i in std_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements print(f" ✅ Fixed {mgmt_type} - {std_key}") return replacements_made return replacements_made def handle_print_accreditation_section(table, flat_json): replacements_made = 0 if getattr(table, "_processed_operator_declaration", False): print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table") return 0 table_context = "" for row in table.rows: for cell in row.cells: table_context += get_clean_text(cell).lower() + " " if "operator declaration" in table_context or ("print name" in table_context and "position title" in table_context): print(f" ⏭️ Skipping Print Accreditation - this is an Operator Declaration table") return 0 print(f" 📋 Processing Print Accreditation section") for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): accreditation_fields = [ "(print accreditation name)", "Operator name (Legal entity)", "Print accreditation name" ] for field in accreditation_fields: kv = find_matching_json_key_and_value(field, flat_json) if kv: replacement_text = get_value_as_string(kv[1], field) if replacement_text.strip(): cell_replacements = replace_red_text_in_cell(cell, replacement_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" ✅ 
Fixed accreditation: {kv[0]}") break return replacements_made def process_single_column_sections(cell, key_text, flat_json): replacements_made = 0 if has_red_text(cell): red_text = "" for paragraph in cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): kv = find_matching_json_key_and_value(red_text.strip(), flat_json) if not kv: kv = find_matching_json_key_and_value(key_text, flat_json) if kv: section_replacement = get_value_as_string(kv[1], red_text.strip()) cell_replacements = replace_red_text_in_cell(cell, section_replacement) replacements_made += cell_replacements if cell_replacements > 0: print(f" ✅ Fixed single column section: '{key_text}'") return replacements_made # ============================================================================ # Main table/paragraph/heading processing (preserve logic + use new helpers) # ============================================================================ def process_tables(document, flat_json): replacements_made = 0 for table_idx, table in enumerate(document.tables): print(f"\n🔍 Processing table {table_idx + 1}:") table_text = "" for row in table.rows[:3]: for cell in row.cells: table_text += get_clean_text(cell).lower() + " " management_summary_indicators = ["mass management", "maintenance management", "fatigue management"] has_management = any(indicator in table_text for indicator in management_summary_indicators) has_details = "details" in table_text if has_management and has_details: print(f" 📋 Detected Management Summary table") summary_fixes = fix_management_summary_details_column(table, flat_json) replacements_made += summary_fixes summary_replacements = 0 for row_idx, row in enumerate(table.rows): for cell_idx, cell in enumerate(row.cells): if has_red_text(cell): for mgmt_type in ["Mass Management Summary", "Maintenance Management Summary", "Fatigue Management Summary"]: if mgmt_type.lower().replace(" summary", "") in table_text: if mgmt_type in flat_json: 
mgmt_data = flat_json[mgmt_type] if isinstance(mgmt_data, dict): for std_key, std_value in mgmt_data.items(): if isinstance(std_value, list) and len(std_value) > 0: red_text = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red(run)).strip() for item in std_value: if len(red_text) > 15 and red_text.lower() in str(item).lower(): replacement_text = "\n".join(str(i) for i in std_value) cell_replacements = replace_red_text_in_cell(cell, replacement_text) summary_replacements += cell_replacements print(f" ✅ Updated {std_key} with summary data") break break if summary_replacements == 0: cell_replacements = handle_management_summary_fix(cell, flat_json) summary_replacements += cell_replacements replacements_made += summary_replacements continue # Vehicle tables detection vehicle_indicators = ["registration number", "sub-contractor", "weight verification", "rfs suspension", "registration"] indicator_count = sum(1 for indicator in vehicle_indicators if indicator in table_text) if indicator_count >= 2: print(f" 🚗 Detected Vehicle Registration table") vehicle_replacements = handle_vehicle_registration_table(table, flat_json) replacements_made += vehicle_replacements continue # Attendance if "attendance list" in table_text and "names and position titles" in table_text: print(f" 👥 Detected Attendance List table") attendance_replacements = handle_attendance_list_table_enhanced(table, flat_json) replacements_made += attendance_replacements continue # Print Accreditation / Operator Declaration print_accreditation_indicators = ["print name", "position title"] indicator_count = sum(1 for indicator in print_accreditation_indicators if indicator in table_text) if indicator_count >= 2 or ("print name" in table_text and "position title" in table_text): print(f" 📋 Detected Print Accreditation/Operator Declaration table") declaration_fixes = fix_operator_declaration_empty_values(table, flat_json) replacements_made += declaration_fixes if not getattr(table, 
"_processed_operator_declaration", False): print_accreditation_replacements = handle_print_accreditation_section(table, flat_json) replacements_made += print_accreditation_replacements continue # Regular table rows handling (preserved) for row_idx, row in enumerate(table.rows): if len(row.cells) < 1: continue key_cell = row.cells[0] key_text = get_clean_text(key_cell) if not key_text: continue print(f" 📌 Row {row_idx + 1}: Key = '{key_text}'") kv = find_matching_json_key_and_value(key_text, flat_json) json_value = kv[1] if kv else None if json_value is not None: replacement_text = get_value_as_string(json_value, key_text) # ACN handling if ("australian company number" in key_text.lower() or "company number" in key_text.lower()) and isinstance(json_value, list): cell_replacements = handle_australian_company_number(row, json_value) replacements_made += cell_replacements # section headers elif ("attendance list" in key_text.lower() or "nature of" in key_text.lower()) and row_idx + 1 < len(table.rows): print(f" ✅ Section header detected, checking next row...") next_row = table.rows[row_idx + 1] for cell_idx, cell in enumerate(next_row.cells): if has_red_text(cell): print(f" ✅ Found red text in next row, cell {cell_idx + 1}") if isinstance(json_value, list): section_text = "\n".join(str(item) for item in json_value) else: section_text = replacement_text cell_replacements = replace_red_text_in_cell(cell, section_text) replacements_made += cell_replacements if cell_replacements > 0: print(f" -> Replaced section content") # single column elif len(row.cells) == 1 or (len(row.cells) > 1 and not any(has_red_text(row.cells[i]) for i in range(1, len(row.cells)))): if has_red_text(key_cell): cell_replacements = process_single_column_sections(key_cell, key_text, flat_json) replacements_made += cell_replacements # key-value pairs else: for cell_idx in range(1, len(row.cells)): value_cell = row.cells[cell_idx] if has_red_text(value_cell): print(f" ✅ Found red text in column 
{cell_idx + 1}") cell_replacements = replace_red_text_in_cell(value_cell, replacement_text) replacements_made += cell_replacements else: # fallback single cell red-text key if len(row.cells) == 1 and has_red_text(key_cell): red_text = "" for paragraph in key_cell.paragraphs: for run in paragraph.runs: if is_red(run): red_text += run.text if red_text.strip(): kv2 = find_matching_json_key_and_value(red_text.strip(), flat_json) if kv2: section_replacement = get_value_as_string(kv2[1], red_text.strip()) cell_replacements = replace_red_text_in_cell(key_cell, section_replacement) replacements_made += cell_replacements # attempt multiple red-segments or surgical fixes for cell_idx in range(len(row.cells)): cell = row.cells[cell_idx] if has_red_text(cell): cell_replacements = handle_multiple_red_segments_in_cell(cell, flat_json) replacements_made += cell_replacements if cell_replacements == 0: surgical_fix = handle_nature_business_multiline_fix(cell, flat_json) replacements_made += surgical_fix if cell_replacements == 0: management_summary_fix = handle_management_summary_fix(cell, flat_json) replacements_made += management_summary_fix # Final operator/auditor declaration check on last few tables print(f"\n🎯 Final check for Declaration tables...") for table in document.tables[-3:]: if len(table.rows) <= 4: if getattr(table, "_processed_operator_declaration", False): print(f" ⏭️ Skipping - already processed by operator declaration handler") continue declaration_fix = fix_operator_declaration_empty_values(table, flat_json) replacements_made += declaration_fix return replacements_made def process_paragraphs(document, flat_json): replacements_made = 0 print(f"\n🔍 Processing paragraphs:") for para_idx, paragraph in enumerate(document.paragraphs): red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: red_text_only = "".join(run.text for run in red_runs).strip() print(f" 📌 Paragraph {para_idx + 1}: Found red text: '{red_text_only}'") kv = 
find_matching_json_key_and_value(red_text_only, flat_json) json_value = kv[1] if kv else None if json_value is None: if "AUDITOR SIGNATURE" in red_text_only.upper() or "DATE" in red_text_only.upper(): kv = find_matching_json_key_and_value("auditor signature", flat_json) elif "OPERATOR SIGNATURE" in red_text_only.upper(): kv = find_matching_json_key_and_value("operator signature", flat_json) json_value = kv[1] if kv else None if json_value is not None: replacement_text = get_value_as_string(json_value) print(f" ✅ Replacing red text with: '{replacement_text}'") red_runs[0].text = replacement_text red_runs[0].font.color.rgb = RGBColor(0, 0, 0) for run in red_runs[1:]: run.text = '' replacements_made += 1 return replacements_made def process_headings(document, flat_json): """ FIXED: Better heading processing with proper red text replacement """ replacements_made = 0 print(f"\n🔍 Processing headings:") paragraphs = document.paragraphs # Extract the correct operator name from the JSON data operator_name = None for key, value in flat_json.items(): if "operator name" in key.lower() and "legal entity" in key.lower(): if isinstance(value, list) and value: operator_name = str(value[0]).strip() else: operator_name = str(value).strip() break if not operator_name: # Fallback - try other operator name keys for key, value in flat_json.items(): if ("operator" in key.lower() and "name" in key.lower()) or key.lower() == "operator name": if isinstance(value, list) and value: operator_name = str(value[0]).strip() elif value: operator_name = str(value).strip() break print(f" 📋 Using operator name: '{operator_name}'") for para_idx, paragraph in enumerate(paragraphs): paragraph_text = paragraph.text.strip() if not paragraph_text: continue matched_heading = None for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, paragraph_text, re.IGNORECASE): matched_heading = pattern break if matched_heading: break if matched_heading: print(f" 📌 Found 
heading at paragraph {para_idx + 1}: '{paragraph_text}'") # Check if the heading itself has red text if has_red_text_in_paragraph(paragraph): print(f" 🔴 Found red text in heading itself") heading_replacements = process_red_text_in_heading_paragraph(paragraph, paragraph_text, flat_json, operator_name) replacements_made += heading_replacements # Look for red text in paragraphs immediately following this heading for next_para_offset in range(1, 6): next_para_idx = para_idx + next_para_offset if next_para_idx >= len(paragraphs): break next_paragraph = paragraphs[next_para_idx] next_text = next_paragraph.text.strip() if not next_text: continue # Stop if we hit another heading is_another_heading = False for category, patterns in HEADING_PATTERNS.items(): for pattern in patterns: if re.search(pattern, next_text, re.IGNORECASE): is_another_heading = True break if is_another_heading: break if is_another_heading: break if has_red_text_in_paragraph(next_paragraph): print(f" 🔴 Found red text in paragraph {next_para_idx + 1} after heading") context_replacements = process_red_text_in_context_paragraph( next_paragraph, paragraph_text, flat_json, operator_name ) replacements_made += context_replacements return replacements_made def process_red_text_in_heading_paragraph(paragraph, paragraph_text, flat_json, operator_name): """Process red text found in heading paragraphs - FIXED""" replacements_made = 0 red_text_segments = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_segments.append(run.text.strip()) if not red_text_segments: return 0 combined_red_text = " ".join(red_text_segments).strip() print(f" 🔍 Red text found in heading: '{combined_red_text}'") replacement_value = None # Determine what to replace based on heading context if any(mgmt_type in paragraph_text.upper() for mgmt_type in ["MAINTENANCE MANAGEMENT", "MASS MANAGEMENT", "FATIGUE MANAGEMENT"]): # For management section headings, replace with operator name if operator_name: replacement_value = 
operator_name print(f" ✅ Using operator name for management section: '{operator_name}'") elif "NHVAS APPROVED AUDITOR DECLARATION" in paragraph_text.upper(): # For auditor declarations, look for auditor name auditor_name = None for key, value in flat_json.items(): if "auditor" in key.lower() and "name" in key.lower(): if isinstance(value, list) and value: auditor_name = str(value[0]).strip() elif value: auditor_name = str(value).strip() break if auditor_name: replacement_value = auditor_name print(f" ✅ Using auditor name: '{auditor_name}'") elif "OPERATOR DECLARATION" in paragraph_text.upper(): # For operator declarations, use operator name if operator_name: replacement_value = operator_name print(f" ✅ Using operator name for operator declaration: '{operator_name}'") else: # For other headings, try to find a relevant match # First try direct match kv = find_matching_json_key_and_value(combined_red_text, flat_json) if kv: replacement_value = get_value_as_string(kv[1], combined_red_text) else: # Try contextual search with heading context_queries = [f"{paragraph_text} {combined_red_text}", combined_red_text, paragraph_text] for query in context_queries: kv = find_matching_json_key_and_value(query, flat_json) if kv: replacement_value = get_value_as_string(kv[1], combined_red_text) print(f" ✅ Found match with combined query: {kv[0]}") break # FIXED: Apply the replacement if we found a suitable value if replacement_value: red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: # Replace the first red run with the new text red_runs[0].text = replacement_value red_runs[0].font.color.rgb = RGBColor(0, 0, 0) # Clear subsequent red runs for run in red_runs[1:]: run.text = '' replacements_made = 1 print(f" ✅ Replaced heading red text with: '{replacement_value}'") else: print(f" ❌ No suitable replacement found for: '{combined_red_text}'") return replacements_made def process_red_text_in_context_paragraph(paragraph, heading_text, flat_json, 
operator_name): """Process red text found in paragraphs following headings - FIXED""" replacements_made = 0 red_text_segments = [] for run in paragraph.runs: if is_red(run) and run.text.strip(): red_text_segments.append(run.text.strip()) if not red_text_segments: return 0 combined_red_text = " ".join(red_text_segments).strip() print(f" 🔍 Red text found: '{combined_red_text}'") replacement_value = None # Determine what to replace based on heading context if any(mgmt_type in heading_text.upper() for mgmt_type in ["MAINTENANCE MANAGEMENT", "MASS MANAGEMENT", "FATIGUE MANAGEMENT"]): # For management section headings, replace with operator name if operator_name: replacement_value = operator_name print(f" ✅ Using operator name for management section: '{operator_name}'") elif "NHVAS APPROVED AUDITOR DECLARATION" in heading_text.upper(): # For auditor declarations, look for auditor name auditor_name = None for key, value in flat_json.items(): if "auditor" in key.lower() and "name" in key.lower(): if isinstance(value, list) and value: auditor_name = str(value[0]).strip() elif value: auditor_name = str(value).strip() break if auditor_name: replacement_value = auditor_name print(f" ✅ Using auditor name: '{auditor_name}'") elif "OPERATOR DECLARATION" in heading_text.upper(): # For operator declarations, use operator name if operator_name: replacement_value = operator_name print(f" ✅ Using operator name for operator declaration: '{operator_name}'") else: # For other headings, try to find a relevant match # First try direct match kv = find_matching_json_key_and_value(combined_red_text, flat_json) if kv: replacement_value = get_value_as_string(kv[1], combined_red_text) else: # Try contextual search with heading context_queries = [f"{heading_text} {combined_red_text}", combined_red_text, heading_text] for query in context_queries: kv = find_matching_json_key_and_value(query, flat_json) if kv: replacement_value = get_value_as_string(kv[1], combined_red_text) print(f" ✅ Found match 
with combined query: {kv[0]}") break # FIXED: Apply the replacement if we found a suitable value if replacement_value: red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()] if red_runs: # Replace the first red run with the new text red_runs[0].text = replacement_value red_runs[0].font.color.rgb = RGBColor(0, 0, 0) # Clear subsequent red runs for run in red_runs[1:]: run.text = '' replacements_made = 1 print(f" ✅ Replaced context red text with: '{replacement_value}'") else: print(f" ❌ No suitable replacement found for: '{combined_red_text}'") return replacements_made # ============================================================================ # Orchestrator # ============================================================================ def process_hf(json_file, docx_file, output_file): try: if hasattr(json_file, "read"): json_data = json.load(json_file) else: with open(json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) flat_json = flatten_json(json_data) print("📄 Available JSON keys (sample):") for i, (key, value) in enumerate(sorted(flat_json.items())): if i < 10: print(f" - {key}: {value}") print(f" ... 
and {len(flat_json) - 10} more keys\n") if hasattr(docx_file, "read"): doc = Document(docx_file) else: doc = Document(docx_file) print("🚀 Starting comprehensive document processing...") table_replacements = process_tables(doc, flat_json) paragraph_replacements = process_paragraphs(doc, flat_json) heading_replacements = process_headings(doc, flat_json) total_replacements = table_replacements + paragraph_replacements + heading_replacements # Save unmatched headers for iterative improvement if _unmatched_headers: try: tmp_path = "/tmp/unmatched_headers.json" with open(tmp_path, 'w', encoding='utf-8') as f: json.dump(_unmatched_headers, f, indent=2, ensure_ascii=False) print(f"✅ Unmatched headers saved to {tmp_path}") except Exception as e: print(f"⚠️ Could not save unmatched headers: {e}") if hasattr(output_file, "write"): doc.save(output_file) else: doc.save(output_file) print(f"\n✅ Document saved as: {output_file}") print(f"✅ Total replacements: {total_replacements}") print(f" 📊 Tables: {table_replacements}") print(f" 📝 Paragraphs: {paragraph_replacements}") print(f" 📋 Headings: {heading_replacements}") print(f"🎉 Processing complete!") except FileNotFoundError as e: print(f"❌ File not found: {e}") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() if __name__ == "__main__": import sys if len(sys.argv) != 4: print("Usage: python pipeline.py ") exit(1) docx_path = sys.argv[1] json_path = sys.argv[2] output_path = sys.argv[3] process_hf(json_path, docx_path, output_path)