#!/usr/bin/env python3
"""
Enhanced NHVAS PDF to DOCX JSON Merger
Comprehensive extraction and mapping from PDF to DOCX structure
(keep pipeline intact; fix spacing, operator info mapping, vehicle-reg header mapping, date fallback)
"""
import json
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from collections import OrderedDict


def _nz(x):
    """Return x unchanged if it is a non-blank string, else ''."""
    return x if isinstance(x, str) and x.strip() else ""


def _fix_ocr_date_noise(date_str: str) -> str:
    """Clean up OCR date noise and standardize date format."""
    if not date_str:
        return ""
    # Remove common OCR artifacts
    cleaned = re.sub(r'\s+', ' ', date_str.strip())
    cleaned = re.sub(r'[^\w\s/\-]', '', cleaned)
    # Try full dates like "21st October 2022" first; checking the looser
    # month/year pattern before this would silently drop the day component.
    date_match = re.search(r'(\d{1,2})(?:st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})', cleaned)
    if date_match:
        return f"{date_match.group(1)} {date_match.group(2)} {date_match.group(3)}"
    # Then try month/year patterns
    month_year_match = re.search(r'([A-Za-z]+)\s+(\d{4})', cleaned)
    if month_year_match:
        return f"{month_year_match.group(1)} {month_year_match.group(2)}"
    # Return the cleaned version if no specific pattern is found
    return cleaned
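# Illustrative behaviour (assumed inputs, shown for orientation only):
#   _fix_ocr_date_noise("  21st   October  2022!!") -> "21 October 2022"
#   _fix_ocr_date_noise("October 2022")             -> "October 2022"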
SUMMARY_SECTIONS = {
    "MAINTENANCE MANAGEMENT": "Maintenance Management Summary",
    "MASS MANAGEMENT": "Mass Management Summary",
    "FATIGUE MANAGEMENT": "Fatigue Management Summary",
}

# ───────────────────────────── helpers: text cleanup & label matching ─────────────────────────────
def _canon_header(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r"\s+", " ", str(s)).strip().lower()
    s = s.replace("–", "-").replace("—", "-")  # normalize en/em dashes
    s = re.sub(r"[/]+", " / ", s)
    s = re.sub(r"[^a-z0-9#/ ]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s
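# Illustrative behaviour (assumed inputs):
#   _canon_header("Fault Recording/Reporting") -> "fault recording / reporting"
#   _canon_header("Reg. No.")                  -> "reg no"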
# Header aliases -> internal keys we already use later during mapping.
# Keys must be in canonical form (as produced by _canon_header), which strips
# punctuation and hyphens - e.g. "Reg.#" canonicalizes to "reg #" and
# "Sub-contractor" to "sub contractor".
_VEH_HEADER_ALIASES = {
    # common
    "registration number": "registration",
    "reg no": "registration",
    "reg #": "registration",
    "no": "no",
    # maintenance table
    "roadworthiness certificates": "roadworthiness",
    "maintenance records": "maintenance_records",
    "daily checks": "daily_checks",
    "fault recording reporting": "fault_recording",
    "fault recording / reporting": "fault_recording",
    "fault repair": "fault_repair",
    # mass table
    "sub contractor": "sub_contractor",
    "sub contracted vehicles statement of compliance": "sub_comp",
    "weight verification records": "weight_verification",
    "rfs suspension certification #": "rfs_certification",
    "rfs suspension certification number": "rfs_certification",
    "suspension system maintenance": "suspension_maintenance",
    "trip records": "trip_records",
    "fault recording reporting on suspension system": "fault_reporting_suspension",
    "fault recording / reporting on suspension system": "fault_reporting_suspension",
}
# --- helpers ---
def build_vehicle_sections(extracted: dict) -> dict:
    """Build arrays for the Maintenance and Mass tables. Maintenance uses recorded rows to include ALL entries."""
    maint = {
        "Registration Number": [],
        "Roadworthiness Certificates": [],
        "Maintenance Records": [],
        "Daily Checks": [],
        "Fault Recording/ Reporting": [],
        "Fault Repair": [],
    }
    mass = {
        "Registration Number": [],
        "Weight Verification Records": [],
        "RFS Suspension Certification #": [],
        "Suspension System Maintenance": [],
        "Trip Records": [],
        "Fault Recording/ Reporting on Suspension System": [],
    }
    # Prefer authoritative maintenance rows captured during parsing (spans all pages)
    if extracted.get("_maint_rows"):
        for row in extracted["_maint_rows"]:
            maint["Registration Number"].append(_smart_space(row.get("registration", "")))
            maint["Roadworthiness Certificates"].append(_nz(row.get("roadworthiness", "")))
            maint["Maintenance Records"].append(_nz(row.get("maintenance_records", "")))
            maint["Daily Checks"].append(_nz(row.get("daily_checks", "")))
            maint["Fault Recording/ Reporting"].append(_nz(row.get("fault_recording", "")))
            maint["Fault Repair"].append(_nz(row.get("fault_repair", "")))
    else:
        # Fallback to the vehicles map (older behavior)
        for v in extracted.get("vehicles", []) or []:
            if not v.get("registration"):
                continue
            if v.get("seen_in_maintenance") or any(
                v.get(k) for k in ["roadworthiness", "maintenance_records", "daily_checks", "fault_recording", "fault_repair"]
            ):
                rw = _nz(v.get("roadworthiness", ""))
                mr = _nz(v.get("maintenance_records", ""))
                dc = _nz(v.get("daily_checks", ""))
                fr = _nz(v.get("fault_recording", ""))
                rp = _nz(v.get("fault_repair", ""))
                if not mr and dc: mr = dc
                if not rp and fr: rp = fr
                if not fr and rp: fr = rp
                maint["Registration Number"].append(_smart_space(v["registration"]))
                maint["Roadworthiness Certificates"].append(rw)
                maint["Maintenance Records"].append(mr)
                maint["Daily Checks"].append(dc)
                maint["Fault Recording/ Reporting"].append(fr)
                maint["Fault Repair"].append(rp)
    # Mass stays as-is (from vehicles)
    for v in extracted.get("vehicles", []) or []:
        if not v.get("registration"):
            continue
        if v.get("seen_in_mass") or any(
            v.get(k) for k in ["weight_verification", "rfs_certification", "suspension_maintenance", "trip_records", "fault_reporting_suspension"]
        ):
            mass["Registration Number"].append(_smart_space(v["registration"]))
            mass["Weight Verification Records"].append(_nz(v.get("weight_verification", "")))
            mass["RFS Suspension Certification #"].append(_nz(v.get("rfs_certification", "")))
            mass["Suspension System Maintenance"].append(_nz(v.get("suspension_maintenance", "")))
            mass["Trip Records"].append(_nz(v.get("trip_records", "")))
            mass["Fault Recording/ Reporting on Suspension System"].append(_nz(v.get("fault_reporting_suspension", "")))
    return {
        "Vehicle Registration Numbers Maintenance": maint,
        "Vehicle Registration Numbers Mass": mass,
    }
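# A minimal sketch of the returned shape (illustrative values, not real output):
#   {
#       "Vehicle Registration Numbers Maintenance": {"Registration Number": ["XV64HT"], ...},
#       "Vehicle Registration Numbers Mass": {"Registration Number": [], ...},
#   }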
def _map_header_indices(headers: list[str]) -> dict:
    """Return {internal_key: column_index} by matching/aliasing header text."""
    idx = {}
    for i, h in enumerate(headers or []):
        ch = _canon_header(h)
        # try a direct alias
        if ch in _VEH_HEADER_ALIASES:
            idx[_VEH_HEADER_ALIASES[ch]] = i
            continue
        # relax a little for 'registration number' variants
        if "registration" in ch and "number" in ch:
            idx["registration"] = i
            continue
        if "roadworthiness" in ch:
            idx["roadworthiness"] = i
            continue
        if "maintenance" in ch and "records" in ch:
            idx["maintenance_records"] = i
            continue
        if "daily" in ch and "check" in ch:
            idx["daily_checks"] = i
            continue
        if "fault" in ch and "record" in ch and "suspension" not in ch:
            # maintenance fault-recording column
            if "repair" in ch:
                idx["fault_repair"] = i
            else:
                idx["fault_recording"] = i
            continue
        if "weight" in ch and "verification" in ch:
            idx["weight_verification"] = i
            continue
        if "rfs" in ch and "certification" in ch:
            idx["rfs_certification"] = i
            continue
        if "suspension" in ch and "maintenance" in ch:
            idx["suspension_maintenance"] = i
            continue
        if "trip" in ch and "record" in ch:
            idx["trip_records"] = i
            continue
        if "fault" in ch and "report" in ch and "suspension" in ch:
            idx["fault_reporting_suspension"] = i
            continue
    return idx
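# Illustrative behaviour (assumed headers):
#   _map_header_indices(["No.", "Registration Number", "Daily Checks"])
#   -> {"no": 0, "registration": 1, "daily_checks": 2}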
def _canon(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r"\s+", " ", str(s)).strip().lower()
    s = re.sub(r"[^a-z0-9#]+", " ", s)
    return re.sub(r"\s+", " ", s).strip()


def _smart_space(s: str) -> str:
    if not s:
        return s
    s = str(s)
    # Insert spaces at typical OCR glue points
    s = re.sub(r'([a-z])([A-Z])', r'\1 \2', s)
    s = re.sub(r'([A-Za-z])(\d)', r'\1 \2', s)
    s = re.sub(r'(\d)([A-Za-z])', r'\1 \2', s)
    s = re.sub(r'([A-Z]{2,})(\d)', r'\1 \2', s)
    # Fix common glued tokens
    s = s.replace("POBox", "PO Box")
    # Compact ordinals back together: "9 th" -> "9th" (a space after the ordinal,
    # if one follows, is preserved since the regex does not consume it)
    s = re.sub(r'\b(\d+)\s*(st|nd|rd|th)\b', r'\1\2', s)
    s = re.sub(r"\s+", " ", s).strip()
    return s
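# Illustrative behaviour (assumed inputs):
#   _smart_space("POBox123Sydney") -> "PO Box 123 Sydney"
#   _smart_space("9 th October")   -> "9th October"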
def looks_like_plate(s: str) -> bool:
    if not s:
        return False
    t = re.sub(r"[\s-]", "", str(s).upper())
    if not (5 <= len(t) <= 8):
        return False
    if not re.fullmatch(r"[A-Z0-9]+", t):
        return False
    if sum(c.isalpha() for c in t) < 2:
        return False
    if sum(c.isdigit() for c in t) < 2:
        return False
    if t in {"ENTRY", "YES", "NO", "N/A", "NA"}:
        return False
    return True
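# Illustrative behaviour:
#   looks_like_plate("XV 64 HT") -> True   (5-8 alphanumerics with >=2 letters and >=2 digits)
#   looks_like_plate("ENTRY")    -> False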
def is_dateish(s: str) -> bool:
    if not s:
        return False
    s = _smart_space(s)
    # tokens like 03/22, 20/02/2023, 01.02.21, 2023-02-20
    # (trailing components allow up to 4 digits so 4-digit years match too)
    return bool(re.search(r"\b\d{1,4}(?:[./-]\d{1,4}){1,2}\b", s))


def extract_date_tokens(s: str) -> list[str]:
    if not s:
        return []
    s = _smart_space(s)
    return re.findall(r"\b\d{1,4}(?:[./-]\d{1,4}){1,2}\b", s)
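# Illustrative behaviour (assumed input):
#   extract_date_tokens("serviced 20/02/2023 and 03/22") -> ["20/02/2023", "03/22"]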
def _clean_list(vals: List[str]) -> List[str]:
    out = []
    for v in vals:
        v = _smart_space(v)
        if v:
            out.append(v)
    return out


def _looks_like_manual_value(s: str) -> bool:
    if not s:
        return False
    s = s.strip()
    # reject pure digits (e.g., "51902") and very short tokens
    if re.fullmatch(r"\d{3,}", s):
        return False
    # accept if it has any letters or typical version hints
    return bool(re.search(r"[A-Za-z]", s))


def _looks_like_company(s: str) -> bool:
    """Very light validation to avoid capturing labels as values."""
    if not s:
        return False
    s = _smart_space(s)
    # at least two words containing letters (e.g., "Kangaroo Transport")
    return bool(re.search(r"[A-Za-z]{2,}\s+[A-Za-z&]{2,}", s))
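# Illustrative behaviour:
#   _looks_like_company("Kangaroo Transport Pty Ltd") -> True
#   _looks_like_company("ACN")                        -> False  (single word)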
# ───────────────────────────── label index (non-summary only; no values) ─────────────────────────────
LABEL_INDEX: Dict[str, Dict[str, Dict[str, Any]]] = {
    "Audit Information": {
        "Date of Audit": {"alts": ["Date of Audit"]},
        "Location of audit": {"alts": ["Location of audit", "Location"]},
        "Auditor name": {"alts": ["Auditor name", "Auditor"]},
        "Audit Matrix Identifier (Name or Number)": {"alts": ["Audit Matrix Identifier (Name or Number)", "Audit Matrix Identifier"]},
        "Auditor Exemplar Global Reg No.": {"alts": ["Auditor Exemplar Global Reg No."]},
        "NHVR Auditor Registration Number": {"alts": ["NHVR Auditor Registration Number"]},
        "expiry Date:": {"alts": ["expiry Date:", "Expiry Date:"]},
    },
    "Operator Information": {
        "Operator name (Legal entity)": {"alts": ["Operator name (Legal entity)", "Operator's Name (legal entity)"]},
        "NHVAS Accreditation No. (If applicable)": {"alts": ["NHVAS Accreditation No. (If applicable)", "NHVAS Accreditation No."]},
        "Registered trading name/s": {"alts": ["Registered trading name/s", "Trading name/s"]},
        "Australian Company Number": {"alts": ["Australian Company Number", "ACN"]},
        "NHVAS Manual (Policies and Procedures) developed by": {"alts": [
            "NHVAS Manual (Policies and Procedures) developed by",
            "NHVAS Manual developed by",
            "Manual developed by"
        ]},
    },
    "Operator contact details": {
        "Operator business address": {"alts": ["Operator business address", "Business address"]},
        "Operator Postal address": {"alts": ["Operator Postal address", "Postal address"]},
        "Email address": {"alts": ["Email address", "Email"]},
        "Operator Telephone Number": {"alts": ["Operator Telephone Number", "Telephone", "Phone"]},
    },
    "Attendance List (Names and Position Titles)": {
        "Attendance List (Names and Position Titles)": {"alts": ["Attendance List (Names and Position Titles)", "Attendance List"]},
    },
    "Nature of the Operators Business (Summary)": {
        "Nature of the Operators Business (Summary):": {"alts": ["Nature of the Operators Business (Summary):"]},
    },
    "Accreditation Vehicle Summary": {
        "Number of powered vehicles": {"alts": ["Number of powered vehicles"]},
        "Number of trailing vehicles": {"alts": ["Number of trailing vehicles"]},
    },
    "Accreditation Driver Summary": {
        "Number of drivers in BFM": {"alts": ["Number of drivers in BFM"]},
        "Number of drivers in AFM": {"alts": ["Number of drivers in AFM"]},
    },
    "Vehicle Registration Numbers Maintenance": {
        "No.": {"alts": ["No.", "No"]},
        "Registration Number": {"alts": ["Registration Number", "Registration"]},
        "Roadworthiness Certificates": {"alts": ["Roadworthiness Certificates", "Roadworthiness"]},
        "Maintenance Records": {"alts": ["Maintenance Records"]},
        "Daily Checks": {"alts": ["Daily Checks", "Daily Check"]},
        "Fault Recording/ Reporting": {"alts": ["Fault Recording/ Reporting", "Fault Recording / Reporting"]},
        "Fault Repair": {"alts": ["Fault Repair"]},
    },
    "Vehicle Registration Numbers Mass": {
        "No.": {"alts": ["No.", "No"]},
        "Registration Number": {"alts": ["Registration Number", "Registration"]},
        "Sub contractor": {"alts": ["Sub contractor", "Sub-contractor"]},
        "Sub-contracted Vehicles Statement of Compliance": {"alts": ["Sub-contracted Vehicles Statement of Compliance"]},
        "Weight Verification Records": {"alts": ["Weight Verification Records"]},
        "RFS Suspension Certification #": {"alts": ["RFS Suspension Certification #", "RFS Suspension Certification Number"]},
        "Suspension System Maintenance": {"alts": ["Suspension System Maintenance"]},
        "Trip Records": {"alts": ["Trip Records"]},
        "Fault Recording/ Reporting on Suspension System": {"alts": ["Fault Recording/ Reporting on Suspension System"]},
    },
    "Driver / Scheduler Records Examined": {
        "No.": {"alts": ["No.", "No"]},
        "Driver / Scheduler Name": {"alts": ["Driver / Scheduler Name"]},
        "Driver TLIF Course # Completed": {"alts": ["Driver TLIF Course # Completed"]},
        "Scheduler TLIF Course # Completed": {"alts": ["Scheduler TLIF Course # Completed"]},
        "Medical Certificates (Current Yes/No) Date of expiry": {"alts": ["Medical Certificates (Current Yes/No) Date of expiry"]},
        "Roster / Schedule / Safe Driving Plan (Date Range)": {"alts": ["Roster / Schedule / Safe Driving Plan (Date Range)"]},
        "Fit for Duty Statement Completed (Yes/No)": {"alts": ["Fit for Duty Statement Completed (Yes/No)"]},
        "Work Diary Pages (Page Numbers) Electronic Work Diary Records (Date Range)": {"alts": ["Work Diary Pages (Page Numbers) Electronic Work Diary Records (Date Range)"]},
    },
    "NHVAS Approved Auditor Declaration": {
        "Print Name": {"alts": ["Print Name"]},
        "NHVR or Exemplar Global Auditor Registration Number": {"alts": ["NHVR or Exemplar Global Auditor Registration Number"]},
    },
    "Audit Declaration dates": {
        "Audit was conducted on": {"alts": ["Audit was conducted on"]},
        "Unconditional CARs closed out on:": {"alts": ["Unconditional CARs closed out on:"]},
        "Conditional CARs to be closed out by:": {"alts": ["Conditional CARs to be closed out by:"]},
    },
    "Print accreditation name": {
        "(print accreditation name)": {"alts": ["(print accreditation name)"]},
    },
    "Operator Declaration": {
        "Print Name": {"alts": ["Print Name"]},
        "Position Title": {"alts": ["Position Title"]},
    },
}
class NHVASMerger:
    def __init__(self):
        self.debug_mode = True
        self._vehicle_by_reg = OrderedDict()

    def log_debug(self, msg: str):
        if self.debug_mode:
            print(f"🔍 {msg}")

    def normalize_std_label(self, label: str) -> str:
        if not label:
            return ""
        base = re.sub(r"\([^)]*\)", "", label)
        base = re.sub(r"\s+", " ", base).strip()
        m = re.match(r"^(Std\s*\d+\.\s*[^:]+?)\s*$", base, flags=re.IGNORECASE)
        return m.group(1).strip() if m else base
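    # Illustrative behaviour (assumed input):
    #   normalize_std_label("Std 5. Records (V)") -> "Std 5. Records"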
    def _pick_nearby(self, row, anchor_idx: int | None, want: str = "plate", window: int = 3) -> str:
        """Return the best cell for a field by looking at the anchor index and nearby columns.
        want ∈ {"plate", "date", "rf", "yn"}"""
        def cell(i):
            if i is None or i < 0 or i >= len(row):
                return ""
            v = row[i]
            return v.strip() if isinstance(v, str) else str(v).strip()

        # 1) try the anchor cell
        cand = cell(anchor_idx)
        if want == "plate" and looks_like_plate(cand):
            return _smart_space(cand)
        if want == "date" and is_dateish(cand):
            return _smart_space(cand)
        if want == "rf":
            m = re.search(r"\bRF\s*\d+\b", cand, re.I)
            if m:
                return _smart_space(m.group(0))
        if want == "yn" and cand.strip().lower() in {"yes", "no"}:
            return cand.strip().title()
        # 2) scan a window around the anchor
        if anchor_idx is not None:
            for offset in range(1, window + 1):
                for i in (anchor_idx - offset, anchor_idx + offset):
                    c = cell(i)
                    if not c:
                        continue
                    if want == "plate" and looks_like_plate(c):
                        return _smart_space(c)
                    if want == "date" and is_dateish(c):
                        return _smart_space(c)
                    if want == "rf":
                        m = re.search(r"\bRF\s*\d+\b", c, re.I)
                        if m:
                            return _smart_space(m.group(0))
                    if want == "yn" and c.strip().lower() in {"yes", "no"}:
                        return c.strip().title()
        # 3) last resort: scan the whole row
        joined = " ".join(str(c or "") for c in row)
        if want == "plate":
            for tok in joined.split():
                if looks_like_plate(tok):
                    return _smart_space(tok)
        if want == "date":
            tok = extract_date_tokens(joined)
            return tok[0] if tok else ""
        if want == "rf":
            m = re.search(r"\bRF\s*\d+\b", joined, re.I)
            return _smart_space(m.group(0)) if m else ""
        if want == "yn":
            j = f" {joined.lower()} "
            if " yes " in j:
                return "Yes"
            if " no " in j:
                return "No"
        return ""
    def _force_fill_maintenance_from_tables(self, pdf_data: Dict, merged: Dict) -> None:
        """Overwrite Maintenance arrays by scanning ALL maintenance tables across pages."""
        maint = merged.get("Vehicle Registration Numbers Maintenance")
        if not isinstance(maint, dict):
            return
        tables = (pdf_data.get("extracted_data") or {}).get("all_tables") or []
        regs, rw, mr, dc, fr, rp = [], [], [], [], [], []
        for t in tables:
            hdrs = [_canon_header(h or "") for h in t.get("headers") or []]
            if not hdrs:
                continue
            # detect a maintenance table
            txt = " ".join(hdrs)
            if ("registration" not in txt) or not any(
                k in txt for k in ["maintenance records", "daily", "fault recording", "fault repair", "roadworthiness"]
            ):
                continue

            def fidx(pred):
                for i, h in enumerate(hdrs):
                    if pred(h):
                        return i
                return None

            reg_i = fidx(lambda h: "registration" in h)
            rw_i = fidx(lambda h: "roadworthiness" in h)
            mr_i = fidx(lambda h: "maintenance" in h and "record" in h)
            dc_i = fidx(lambda h: "daily" in h and "check" in h)
            fr_i = fidx(lambda h: "fault" in h and "record" in h and "suspension" not in h)
            rp_i = fidx(lambda h: "fault" in h and "repair" in h)
            for r in t.get("data") or []:
                def cell(i):
                    if i is None or i >= len(r):
                        return ""
                    v = r[i]
                    return v.strip() if isinstance(v, str) else str(v).strip()

                plate = _smart_space(cell(reg_i))
                if not plate or not looks_like_plate(plate):
                    continue
                v_rw = _nz(cell(rw_i))
                v_mr = _nz(cell(mr_i))
                v_dc = _nz(cell(dc_i))
                v_fr = _nz(cell(fr_i))
                v_rp = _nz(cell(rp_i))
                # sensible fallbacks
                if not v_mr and v_dc: v_mr = v_dc
                if not v_rp and v_fr: v_rp = v_fr
                if not v_fr and v_rp: v_fr = v_rp
                regs.append(plate); rw.append(v_rw); mr.append(v_mr)
                dc.append(v_dc); fr.append(v_fr); rp.append(v_rp)
        if regs:  # overwrite arrays only if we found rows
            maint["Registration Number"] = regs
            maint["Roadworthiness Certificates"] = rw
            maint["Maintenance Records"] = mr
            maint["Daily Checks"] = dc
            maint["Fault Recording/ Reporting"] = fr
            maint["Fault Repair"] = rp
    def _collapse_multiline_headers(self, headers: List[str], data_rows: List[List[str]]):
        """
        Merge header continuation rows (when the first data rows are not numeric '1.', '2.', ...)
        into the main headers, then return (merged_headers, remaining_data_rows).
        """
        merged = [_smart_space(h or "") for h in (headers or [])]
        consumed = 0
        header_frags: List[List[str]] = []
        # Collect up to 5 leading rows that look like header fragments
        for r in data_rows[:5]:
            first = (str(r[0]).strip() if r else "")
            if re.match(r"^\d+\.?$", first):
                break  # real data starts
            consumed += 1
            header_frags.append(r)
        # Merge every collected fragment row into merged
        for frag in header_frags:
            for i, cell in enumerate(frag):
                cell_txt = _smart_space(str(cell or "").strip())
                if not cell_txt:
                    continue
                if i >= len(merged):
                    merged.append(cell_txt)
                else:
                    merged[i] = (merged[i] + " " + cell_txt).strip()
        return merged, data_rows[consumed:]
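    # A minimal sketch (assumed inputs): headers ["Registration", ""] with a leading
    # fragment row ["Number", "Roadworthiness"] collapse to
    # ["Registration Number", "Roadworthiness"], and that fragment row is removed
    # from the returned data rows.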
    def _first_attendance_name_title(self, att_list: List[str]) -> Optional[tuple[str, str]]:
        """Return (print_name, position_title) from the first 'Name - Title' in attendance."""
        if not att_list:
            return None
        # First "Name - Title", stopping before the next "Name -"
        pat = re.compile(
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,3})\s*-\s*(.*?)(?=(?:\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,3}\s*-\s*)|$)'
        )
        for item in att_list:
            s = _smart_space(str(item))
            m = pat.search(s)
            if m:
                name = _smart_space(m.group(1))
                title = _smart_space(m.group(2))
                return name, title
        return None
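    # Illustrative behaviour (assumed input; "Jane Doe" is a hypothetical name):
    #   _first_attendance_name_title(["Jane Doe - Transport Manager"])
    #   -> ("Jane Doe", "Transport Manager")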
    # ───────────────────────────── summary tables (unchanged logic) ─────────────────────────────
    def build_summary_maps(self, pdf_json: dict) -> dict:
        """Enhanced summary mapping that correctly identifies detailed summary tables."""
        out = {v: {} for v in SUMMARY_SECTIONS.values()}
        try:
            tables = pdf_json["extracted_data"]["all_tables"]
        except Exception:
            return out
        self.log_debug(f"Processing {len(tables)} tables total")
        for i, t in enumerate(tables):
            page = t.get("page", "?")
            headers = [re.sub(r"\s+", " ", (h or "")).strip().upper() for h in t.get("headers", [])]
            if not headers:
                continue
            data_rows = t.get("data", [])
            if not data_rows:
                continue
            self.log_debug(f"Table {i} (page {page}): headers = {headers[:5]}")
            # Check for a DETAILS column - but be flexible about it
            table_header_text = " ".join(headers).upper()
            has_details_column = "DETAILS" in table_header_text
            self.log_debug(f"  Has DETAILS column: {has_details_column}")
            if not has_details_column:
                # This might still be a summary table without an explicit "DETAILS" header:
                # look for a management type plus detailed content.
                has_management_keyword = any(
                    keyword in table_header_text
                    for keyword in ["MAINTENANCE MANAGEMENT", "MASS MANAGEMENT", "FATIGUE MANAGEMENT"]
                )
                if not has_management_keyword:
                    self.log_debug("  Skipping - no DETAILS column and no management keywords")
                    continue
                else:
                    self.log_debug("  No DETAILS column but has management keyword - checking content...")
            # Look for "Std X." patterns in the first column
            has_standards = False
            is_detailed_summary = False
            section_type = None
            sample_content = []
            for row in data_rows[:3]:  # Check the first few rows
                if not row:
                    continue
                first_cell = str(row[0]).strip()
                if re.match(r"Std\s+\d+\.", first_cell, re.IGNORECASE):
                    has_standards = True
                    self.log_debug(f"  Found standard: {first_cell}")
                # Check the remaining cells in this row for detailed content
                # (distinct loop variable so the outer table index `i` is not clobbered)
                for col in range(1, len(row)):
                    cell_content = str(row[col]).strip()
                    if len(cell_content) > 50:  # Detailed content is much longer
                        sample_content.append(cell_content[:200])  # Store a sample for analysis
                        if not re.match(r"^[VNC\s]*$", cell_content):
                            is_detailed_summary = True
                            self.log_debug(f"  Found detailed content: {cell_content[:100]}...")
                            break
            self.log_debug(f"  Has standards: {has_standards}, Is detailed: {is_detailed_summary}")
            if not has_standards:
                self.log_debug("  Skipping - no standards found")
                continue
            if not is_detailed_summary:
                self.log_debug("  Skipping - not a detailed summary (content too short or just V/NC)")
                continue
            # Identify the management type from headers AND content
            if "MAINTENANCE" in table_header_text:
                section_type = "Maintenance Management Summary"
            elif "MASS" in table_header_text:
                section_type = "Mass Management Summary"
            elif "FATIGUE" in table_header_text:
                section_type = "Fatigue Management Summary"
            else:
                # Fallback: analyze the actual standard content to determine the type
                combined_content = " ".join(sample_content).lower()
                self.log_debug(f"  Analyzing content keywords: {combined_content[:200]}...")
                # Identify by content keywords
                if any(keyword in combined_content for keyword in [
                    "daily check", "fault", "maintenance", "repair", "service", "workshop"
                ]):
                    section_type = "Maintenance Management Summary"
                    self.log_debug("  Identified as Maintenance by content")
                elif any(keyword in combined_content for keyword in [
                    "mass", "weight", "verification", "vehicle", "load", "gauge"
                ]):
                    section_type = "Mass Management Summary"
                    self.log_debug("  Identified as Mass by content")
                elif any(keyword in combined_content for keyword in [
                    "fatigue", "scheduling", "rostering", "duty", "medical", "driver"
                ]):
                    section_type = "Fatigue Management Summary"
                    self.log_debug("  Identified as Fatigue by content")
            if not section_type:
                self.log_debug(f"  Could not determine section type for table with headers: {headers[:3]}")
                continue
            self.log_debug(f"  → Processing {section_type} table from page {page}")
            # Extract the data from the detailed content
            standards_found = 0
            for row in data_rows:
                if not row:
                    continue
                left = str(row[0]) if len(row) >= 1 else ""
                # Find the details content (longest cell in the row)
                details_content = ""
                for col in range(1, len(row)):
                    cell_content = str(row[col]).strip()
                    if len(cell_content) > len(details_content):
                        details_content = cell_content
                left_norm = self.normalize_std_label(left)
                if left_norm and details_content and len(details_content) > 50:
                    prev = out[section_type].get(left_norm, "")
                    merged_text = (prev + " " + details_content).strip() if prev else details_content.strip()
                    out[section_type][left_norm] = merged_text
                    standards_found += 1
                    self.log_debug(f"    Added {left_norm}: {details_content[:100]}...")
            self.log_debug(f"  Extracted {standards_found} standards from {section_type}")
        # Convert to list format as expected by the rest of the code
        for sec in out:
            out[sec] = {k: [_smart_space(v)] for k, v in out[sec].items() if v}
        self.log_debug(f"Summary maps built: {list(out.keys())}")
        for section_name, data in out.items():
            if data:
                self.log_debug(f"  ✓ {section_name}: {len(data)} standards found - {list(data.keys())[:3]}...")
            else:
                self.log_debug(f"  ✗ {section_name}: No data found")
        return out
    # ───────────────────────────── NEW: find a cell by label in tables ─────────────────────────────
    def _find_table_value(self, tables: List[Dict], label_variants: List[str]) -> Optional[str]:
        targets = {_canon(v) for v in label_variants}
        for t in tables:
            data = t.get("data", [])
            if not data:
                continue
            for row in data:
                if not row:
                    continue
                key = _canon(str(row[0]))
                if key in targets:
                    vals = [str(c).strip() for c in row[1:] if str(c).strip()]
                    if vals:
                        return _smart_space(" ".join(vals))
        return None
    # ───────────────────────────── comprehensive extraction (minimal changes) ─────────────────────────────
    def extract_from_pdf_comprehensive(self, pdf_data: Dict) -> Dict[str, Any]:
        self._vehicle_by_reg.clear()
        extracted = {}
        extracted_data = pdf_data.get("extracted_data", {})
        tables = extracted_data.get("all_tables", [])
        # Capture "Audit was conducted on" from tables; ignore the placeholder "Date"
        awd = self._find_table_value(
            tables,
            LABEL_INDEX["Audit Declaration dates"]["Audit was conducted on"]["alts"]
        )
        if awd:
            awd = _smart_space(awd)
            if re.search(r"\d", awd) and not re.fullmatch(r"date", awd, re.I):
                extracted["audit_conducted_date"] = awd
        # 1) Audit Information (table first)
        audit_info = extracted_data.get("audit_information", {})
        if audit_info:
            extracted["audit_info"] = {
                "date_of_audit": _smart_space(audit_info.get("DateofAudit", "")),
                "location": _smart_space(audit_info.get("Locationofaudit", "")),
                "auditor_name": _smart_space(audit_info.get("Auditorname", "")),
                "matrix_id": _smart_space(audit_info.get("AuditMatrixIdentifier (Name or Number)", "")),
            }
        # If missing, try a generic table lookup
        for label, meta in LABEL_INDEX.get("Audit Information", {}).items():
            if label == "expiry Date:":  # not used in the DOCX template
                continue
            val = self._find_table_value(tables, meta.get("alts", [label]))
            if val:
                extracted.setdefault("audit_info", {})
                if _canon(label) == _canon("Date of Audit"):
                    extracted["audit_info"]["date_of_audit"] = val
                elif _canon(label) == _canon("Location of audit"):
                    extracted["audit_info"]["location"] = val
                elif _canon(label) == _canon("Auditor name"):
                    extracted["audit_info"]["auditor_name"] = val
                elif _canon(label) == _canon("Audit Matrix Identifier (Name or Number)"):
                    extracted["audit_info"]["matrix_id"] = val
        # 2) Operator Information (prefer table rows)
        operator_info = extracted_data.get("operator_information", {})
        if operator_info:
            extracted["operator_info"] = {
                "name": "",
                "trading_name": _smart_space(operator_info.get("trading_name", "")),
                "acn": _smart_space(operator_info.get("company_number", "")),
                "manual": _smart_space(operator_info.get("nhvas_accreditation", "")),
                "business_address": _smart_space(operator_info.get("business_address", "")),
                "postal_address": _smart_space(operator_info.get("postal_address", "")),
                "email": operator_info.get("email", ""),
                "phone": _smart_space(operator_info.get("phone", "")),
            }
        # Fill operator info via table lookup
        for label, meta in LABEL_INDEX.get("Operator Information", {}).items():
            val = self._find_table_value(tables, meta.get("alts", [label]))
            if not val:
                continue
            if _canon(label) == _canon("Operator name (Legal entity)") and _looks_like_company(val):
                extracted.setdefault("operator_info", {})
                extracted["operator_info"]["name"] = val
            elif _canon(label) == _canon("Registered trading name/s"):
                extracted.setdefault("operator_info", {})
                extracted["operator_info"]["trading_name"] = val
            elif _canon(label) == _canon("Australian Company Number"):
                extracted.setdefault("operator_info", {})
                extracted["operator_info"]["acn"] = val
            elif _canon(label) == _canon("NHVAS Manual (Policies and Procedures) developed by"):
                extracted.setdefault("operator_info", {})
                if _looks_like_manual_value(val):
                    extracted["operator_info"]["manual"] = val
        # 3) Generic table parsing (unchanged logic for other sections)
        self._extract_table_data(tables, extracted)
        # 4) Text parsing (kept, but spacing applied)
        self._extract_text_content(extracted_data.get("all_text_content", []), extracted)
        # Vehicle tables sometimes fail to land in all_tables; parse from text as a fallback
        self._extract_vehicle_tables_from_text(extracted_data.get("all_text_content", []), extracted)
        # 5) Vehicle/Driver data (kept)
        self._extract_vehicle_driver_data(extracted_data, extracted)
        # 6) Detailed mgmt (kept)
        self._extract_detailed_management_data(extracted_data, extracted)
        return extracted
    # ───────────────────────────── table classifiers ─────────────────────────────
    # replaces the previous version of _extract_table_data
    def _extract_table_data(self, tables: List[Dict], extracted: Dict):
        for table in tables:
            headers = table.get("headers", []) or []
            data_rows = table.get("data", []) or []
            if not data_rows:
                continue
            page_num = table.get("page", 0)
            self.log_debug(f"Processing table on page {page_num} with headers: {headers[:3]}...")
            # NEW: check for a single-column Nature of Business table
            if (len(headers) == 1 and
                    "nature of the operators business" in str(headers[0]).lower() and
                    len(data_rows) > 0 and len(data_rows[0]) > 0):
                text = str(data_rows[0][0])
                self.log_debug(f"Found Nature of Business table with text: {text[:100]}...")
                # Extract the inline expiry date and accreditation number
                m_exp = re.search(r"\b(?:Mass and Maintenance\s+)?Expiry\s*Date[:\s-]*([0-9\.\/\-]+)", text, flags=re.I)
                m_acc = re.search(r"\bAccreditation\s*Number[:\s-]*([A-Za-z0-9\s\-\/]+)", text, flags=re.I)
                if m_exp:
                    exp_date = m_exp.group(1).strip()
                    extracted.setdefault("business_summary_extras", {})["expiry_date"] = exp_date
                    self.log_debug(f"Extracted expiry date: {exp_date}")
                if m_acc:
                    acc_num = m_acc.group(1).strip()
                    extracted.setdefault("business_summary_extras", {})["accreditation_number"] = acc_num
                    self.log_debug(f"Extracted accreditation number: {acc_num}")
                # Store the clean text (without the inline date/number)
                clean_text = re.sub(r"\s*(?:Mass and Maintenance\s+)?Expiry\s*Date[:\s-]*[0-9\.\/\-]+", "", text, flags=re.I)
                clean_text = re.sub(r"\s*Accreditation\s*Number[:\s-]*[A-Za-z0-9\s\-\/]+", "", clean_text, flags=re.I)
                extracted["business_summary"] = clean_text.strip()
                continue
            # 🔧 NEW: collapse possible multi-line headers once up front
            collapsed_headers, collapsed_rows = self._collapse_multiline_headers(headers, data_rows)
            # 🔧 Try vehicle tables FIRST using either raw or collapsed headers
            if self._is_vehicle_registration_table(headers) or self._is_vehicle_registration_table(collapsed_headers):
                # always extract with the collapsed headers/rows so we see "Registration Number", etc.
                self._extract_vehicle_registration_table(collapsed_headers, collapsed_rows, extracted, page_num)
                continue
            # the rest keep their existing order/logic (use the original headers/rows)
            if self._is_audit_info_table(headers):
                self._extract_audit_info_table(data_rows, extracted)
            elif self._is_operator_info_table(headers):
                self._extract_operator_info_table(data_rows, extracted)
            elif self._is_attendance_table(headers):
                self._extract_attendance_table(data_rows, extracted)
            elif self._is_vehicle_summary_table(headers):
                self._extract_vehicle_summary_table(data_rows, extracted)
            elif self._is_driver_table(headers):
                self._extract_driver_table(headers, data_rows, extracted)
            elif self._is_management_compliance_table(headers):
                self._extract_management_table(data_rows, extracted, headers)
    def _is_audit_info_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return any(t in txt for t in ["audit", "date", "location", "auditor"])

    def _is_operator_info_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return any(t in txt for t in ["operator", "company", "trading", "address"])

    def _is_attendance_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return "attendance" in txt

    def _is_vehicle_summary_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return any(t in txt for t in ["powered vehicles", "trailing vehicles", "drivers in bfm"])

    def _is_vehicle_registration_table(self, headers: List[str]) -> bool:
        if not headers:
            return False
        ch = [_canon_header(h) for h in headers]
        has_reg = any(
            ("registration" in h) or re.search(r"\breg(?:istration)?\b", h) or ("reg" in h and "no" in h)
            for h in ch
        )
        # tokens below are in canonical (hyphen-free) form, matching _canon_header output
        others = ["roadworthiness", "maintenance records", "daily checks", "fault recording", "fault repair",
                  "sub contractor", "weight verification", "rfs suspension", "suspension system maintenance",
                  "trip records", "fault recording reporting on suspension system", "fault reporting suspension"]
        has_signal = any(any(tok in h for tok in others) for h in ch)
        return has_reg and has_signal

    def _is_driver_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return any(t in txt for t in ["driver", "scheduler", "tlif", "medical"])

    def _is_management_compliance_table(self, headers: List[str]) -> bool:
        txt = " ".join(str(h) for h in headers).lower()
        return any(t in txt for t in ["maintenance management", "mass management", "fatigue management"])
    def _extract_vehicle_tables_from_text(self, text_pages: List[Dict], extracted: Dict):
        # flatten text
        lines = []
        for p in text_pages or []:
            for ln in re.split(r"\s*\n\s*", p.get("text", "")):
                ln = _smart_space(ln)
                if ln:
                    lines.append(ln)
        maint_rows, mass_rows = [], []
        rf_pat = re.compile(r"\bRF\s*\d+\b", re.IGNORECASE)
        for ln in lines:
            # find the first token that looks like a rego
            tokens = ln.split()
            reg = next((t for t in tokens if looks_like_plate(t)), None)
            if not reg:
                continue
            # everything after the reg on that line
            tail = _smart_space(ln.split(reg, 1)[1]) if reg in ln else ""
            dates = extract_date_tokens(tail)
            has_rf = bool(rf_pat.search(ln)) or "suspension" in ln.lower()
            if has_rf:
                m_rf = rf_pat.search(ln)
                rfs = m_rf.group(0).upper().replace(" ", "") if m_rf else ""
                wv = dates[0] if len(dates) > 0 else ""
                rest = dates[1:]
                yn = "Yes" if " yes " in f" {ln.lower()} " else ("No" if " no " in f" {ln.lower()} " else "")
                mass_rows.append({
                    "registration": reg,
                    "sub_contractor": yn,
                    "sub_comp": yn,
                    "weight_verification": wv,
                    "rfs_certification": rfs or ("N/A" if "n/a" in ln.lower() else ""),
                    "suspension_maintenance": rest[0] if len(rest) > 0 else "",
                    "trip_records": rest[1] if len(rest) > 1 else "",
                    "fault_reporting_suspension": rest[2] if len(rest) > 2 else "",
                })
            else:
                # map the first 5 date-like tokens in a sensible order; fallbacks keep the table consistent
                rw = dates[0] if len(dates) > 0 else ""
                mr = dates[1] if len(dates) > 1 else ""
                dc = dates[2] if len(dates) > 2 else ""
                fr = dates[3] if len(dates) > 3 else ""
                rp = dates[4] if len(dates) > 4 else ""
                maint_rows.append({
                    "registration": reg,
                    "roadworthiness": rw,
                    "maintenance_records": mr or dc,
                    "daily_checks": dc,
                    "fault_recording": fr or rp,
                    "fault_repair": rp or fr,
                })
        # after building maint_rows and mass_rows, merge them into the shared vehicles list
        vlist = extracted.setdefault("vehicles", [])  # ensure it always exists
        if maint_rows or mass_rows:
            for r in maint_rows:
                r["section"] = "maintenance"
                vlist.append(r)
            for r in mass_rows:
                r["section"] = "mass"
                vlist.append(r)
            self.log_debug(f"Vehicle rows (text fallback): maint={len(maint_rows)} mass={len(mass_rows)} total={len(vlist)}")
        else:
            self.log_debug("Vehicle rows (text fallback): none detected.")
    # ───────────────────────────── simple extractors (spacing applied) ─────────────────────────────
    def _extract_audit_info_table(self, data_rows: List[List], extracted: Dict):
        ai = extracted.setdefault("audit_info", {})
        for row in data_rows:
            if len(row) < 2:
                continue
            key = _canon(row[0])
            val = _smart_space(" ".join(str(c).strip() for c in row[1:] if str(c).strip()))
            if not val:
                continue
            if "date" in key and "audit" in key:
                ai["date_of_audit"] = val
            elif "location" in key:
                ai["location"] = val
            elif "auditor" in key and "name" in key:
                ai["auditor_name"] = val
            elif "matrix" in key:
                ai["matrix_id"] = val

    def _extract_operator_info_table(self, data_rows: List[List], extracted: Dict):
        oi = extracted.setdefault("operator_info", {})
        for row in data_rows:
            if len(row) < 2:
                continue
            key = _canon(row[0])
            val = _smart_space(" ".join(str(c).strip() for c in row[1:] if str(c).strip()))
            if not val:
                continue
            if "operator" in key and "name" in key and _looks_like_company(val):
                oi["name"] = val
            elif "trading" in key:
                oi["trading_name"] = val
            elif "australian" in key and "company" in key:
                oi["acn"] = val
            elif "business" in key and "address" in key:
                oi["business_address"] = val
            elif "postal" in key and "address" in key:
                oi["postal_address"] = val
            elif "email" in key:
                oi["email"] = val
            elif "telephone" in key or "phone" in key:
                oi["phone"] = val
            elif "manual" in key or ("nhvas" in key and "manual" in key) or "developed" in key:
                if _looks_like_manual_value(val):
                    oi["manual"] = val

    def _extract_attendance_table(self, data_rows: List[List], extracted: Dict):
        lst = []
        for row in data_rows:
            if not row:
                continue
            cells = [str(c).strip() for c in row if str(c).strip()]
            if not cells:
                continue
            lst.append(_smart_space(" ".join(cells)))
        if lst:
            extracted["attendance"] = lst

    def _extract_vehicle_summary_table(self, data_rows: List[List], extracted: Dict):
        vs = extracted.setdefault("vehicle_summary", {})
        for row in data_rows:
            if len(row) < 2:
                continue
            key = _canon(row[0])
            value = ""
            for c in row[1:]:
                if str(c).strip():
                    value = _smart_space(str(c).strip())
                    break
            if not value:
                continue
            if "powered" in key and "vehicle" in key:
                vs["powered_vehicles"] = value
            elif "trailing" in key and "vehicle" in key:
                vs["trailing_vehicles"] = value
            elif "drivers" in key and "bfm" in key:
                vs["drivers_bfm"] = value
            elif "drivers" in key and "afm" in key:
                vs["drivers_afm"] = value
    # ▶▶ REPLACED: column mapping by headers
    def _extract_vehicle_registration_table(self, headers, rows, extracted, page_num):
        ch = [_canon_header(h) for h in (headers or [])]
        alias = _map_header_indices(headers or [])

        # header indices (may be misaligned vs data; that's OK, we'll search near them)
        def idx_of(*needles):
            for i, h in enumerate(ch):
                if all(n in h for n in needles):
                    return i
            return None

        def first_idx(*cands):
            # coalesce on `is not None` so a legitimate column index of 0 survives
            # (a plain `or` chain would discard index 0 as falsy)
            for c in cands:
                if c is not None:
                    return c
            return None

        reg_i = first_idx(alias.get("registration"), idx_of("registration number"), idx_of("registration"), idx_of("reg", "no"))
        rw_i = first_idx(alias.get("roadworthiness"), idx_of("roadworthiness"))
        maint_i = first_idx(alias.get("maintenance_records"), idx_of("maintenance", "records"))
        daily_i = first_idx(alias.get("daily_checks"), idx_of("daily", "check"))
        fr_i = first_idx(alias.get("fault_recording"), idx_of("fault", "recording"))
        rep_i = first_idx(alias.get("fault_repair"), idx_of("fault", "repair"))
        weight_i = first_idx(alias.get("weight_verification"), idx_of("weight", "verification"))
        rfs_i = first_idx(alias.get("rfs_certification"), idx_of("rfs", "certification"))
        susp_i = first_idx(alias.get("suspension_maintenance"), idx_of("suspension", "maintenance"))
        trip_i = first_idx(alias.get("trip_records"), idx_of("trip", "records"))
        frs_i = first_idx(alias.get("fault_reporting_suspension"), idx_of("fault", "reporting", "suspension"))
        # classify the table type by header signals
        is_maint = any("roadworthiness" in h or "maintenance records" in h or ("daily" in h and "check" in h) or "fault repair" in h for h in ch)
        is_mass = any("weight verification" in h or "rfs" in h or "suspension system" in h or "trip records" in h or "reporting on suspension" in h for h in ch)
        maint_rows = extracted.setdefault("_maint_rows", []) if is_maint else None
        added = 0
        for r in rows or []:
            # tolerant plate pick (handles misaligned columns)
            reg = self._pick_nearby(r, reg_i, "plate", window=4)
            if not reg or not looks_like_plate(reg):
                continue
            # collect values using tolerant picks
            if is_maint:
                rw = self._pick_nearby(r, rw_i, "date", window=4)
                mr = self._pick_nearby(r, maint_i, "date", window=4)
                dc = self._pick_nearby(r, daily_i, "date", window=4)
                fr = self._pick_nearby(r, fr_i, "date", window=4)
                rep = self._pick_nearby(r, rep_i, "date", window=4)
                # sensible fallbacks
                if not mr and dc: mr = dc
                if not rep and fr: rep = fr
                if not fr and rep: fr = rep
            else:  # mass or mixed
                wv = self._pick_nearby(r, weight_i, "date", window=4)
                rfs = self._pick_nearby(r, rfs_i, "rf", window=5)
                sm = self._pick_nearby(r, susp_i, "date", window=4)
                tr = self._pick_nearby(r, trip_i, "date", window=4)
                frs = self._pick_nearby(r, frs_i, "date", window=4)
                yn1 = self._pick_nearby(r, idx_of("sub", "contractor"), "yn", window=3) or ""
                yn2 = self._pick_nearby(r, idx_of("sub contracted vehicles statement of compliance"), "yn", window=3) or yn1
            # merge into the vehicle map
            v = self._vehicle_by_reg.get(reg)
            if v is None:
                v = {"registration": reg}
                self._vehicle_by_reg[reg] = v
                added += 1
            if is_maint:
                v["seen_in_maintenance"] = True
                if rw: v.setdefault("roadworthiness", rw)
                if mr: v.setdefault("maintenance_records", mr)
                if dc: v.setdefault("daily_checks", dc)
                if fr: v.setdefault("fault_recording", fr)
                if rep: v.setdefault("fault_repair", rep)
                if maint_rows is not None:
                    maint_rows.append({
                        "registration": reg,
                        "roadworthiness": rw,
                        "maintenance_records": mr or dc,
                        "daily_checks": dc,
                        "fault_recording": fr or rep,
                        "fault_repair": rep or fr,
                    })
            else:
                v["seen_in_mass"] = True
                if yn1: v.setdefault("sub_contractor", yn1)
                if yn2: v.setdefault("sub_comp", yn2)
                if wv: v.setdefault("weight_verification", wv)
                if rfs: v.setdefault("rfs_certification", _smart_space(rfs).upper().replace(" ", ""))
                if sm: v.setdefault("suspension_maintenance", sm)
                if tr: v.setdefault("trip_records", tr)
                if frs: v.setdefault("fault_reporting_suspension", frs)
        extracted["vehicles"] = list(self._vehicle_by_reg.values())
        return added
    def _extract_driver_table(self, headers: List[str], data_rows: List[List], extracted: Dict):
        """Enhanced header-driven extraction for Driver / Scheduler Records."""
        drivers = []
        self.log_debug(f"Driver table has {len(data_rows)} rows")
        # Skip header continuation rows - look for the first row that starts with a number.
        # Use None as the sentinel so data legitimately starting at row 0 is not reported as missing.
        actual_data_start = None
        for i, row in enumerate(data_rows):
            if row and str(row[0]).strip().startswith(('1.', '1')):
                actual_data_start = i
                self.log_debug(f"Found actual data starting at row {i}")
                break
        if actual_data_start is None:
            self.log_debug("Warning: Could not find numbered data rows")
            actual_data_start = 0
        # Process only the actual data rows (skip header continuation rows)
        for row_idx, row in enumerate(data_rows[actual_data_start:], start=actual_data_start):
            if not row:
                continue
            self.log_debug(f"Processing data row {row_idx}: {row}")
            # Check whether this is a numbered row (1., 2., etc.)
            first_cell = str(row[0]).strip()
            if not (first_cell.endswith('.') and first_cell[:-1].isdigit()):
                self.log_debug(f"Skipping row {row_idx} - not a numbered data row")
                continue
            # Based on the raw data structure, extract from approximate fixed positions
            name = None
            driver_tlif = ""
            scheduler_tlif = ""
            medical = ""
            roster = ""
            fit_duty = ""
            work_diary = ""
            # Look for the name in columns around index 3-4
            for i in range(2, min(6, len(row))):
                candidate = _smart_space(str(row[i]).strip())
                if (candidate and
                        len(candidate) > 3 and
                        any(c.isalpha() for c in candidate) and
                        candidate.lower() not in ['entry', 'n/a', 'yes', 'no', 'name'] and
                        not candidate.isdigit() and
                        not candidate.endswith('.')):
                    name = candidate
                    self.log_debug(f"Found name at column {i}: {name}")
                    break
            if not name:
                self.log_debug(f"Skipping row {row_idx} - no valid name found")
                continue
            # Extract other fields from approximate positions based on the raw data
            # Driver TLIF around column 6
            for i in range(5, min(8, len(row))):
                val = str(row[i]).strip()
                if val and val.lower() in ['yes', 'no']:
                    driver_tlif = val.title()
                    break
            # Scheduler TLIF around column 9
            for i in range(8, min(12, len(row))):
                val = str(row[i]).strip()
                if val and val.lower() in ['yes', 'no']:
                    scheduler_tlif = val.title()
                    break
            # Medical around column 12
            for i in range(11, min(15, len(row))):
                val = _smart_space(str(row[i]).strip())
                if val and val.lower() not in ['', 'entry']:
                    medical = val
                    break
            # Roster around column 15
            for i in range(14, min(18, len(row))):
                val = _smart_space(str(row[i]).strip())
                if val:
                    roster = val
                    break
            # Fit for Duty around column 18
            for i in range(17, min(21, len(row))):
                val = str(row[i]).strip()
                if val and val.lower() in ['yes', 'no']:
                    fit_duty = val.title()
                    break
            # Work Diary around column 21
            for i in range(20, min(len(row), 24)):
                val = _smart_space(str(row[i]).strip())
                if val:
                    work_diary = val
                    break
            d = {
                "name": name,
                "driver_tlif": driver_tlif,
                "scheduler_tlif": scheduler_tlif,
                "medical_expiry": medical,
                "roster_schedule": roster,
                "fit_for_duty": fit_duty,
                "work_diary": work_diary,
            }
            drivers.append(d)
            self.log_debug(f"Added driver: {d}")
        if drivers:
            extracted["drivers_detailed"] = drivers
            self.log_debug(f"Driver rows extracted: {len(drivers)}")
        else:
            self.log_debug("No drivers extracted")
    def _extract_management_table(self, data_rows: List[List], extracted: Dict, headers: List[str]):
        txt = " ".join(str(h) for h in headers).lower()
        comp = {}
        for row in data_rows:
            if len(row) < 2:
                continue
            std = str(row[0]).strip()
            val = _smart_space(str(row[1]).strip())
            if std.startswith("Std") and val:
                comp[std] = val
        if comp:
            if "maintenance" in txt:
                extracted["maintenance_compliance"] = comp
            elif "mass" in txt:
                extracted["mass_compliance"] = comp
            elif "fatigue" in txt:
                extracted["fatigue_compliance"] = comp
    def _extract_text_content(self, text_pages: List[Dict], extracted: Dict) -> None:
        all_text = " ".join(page.get("text", "") for page in text_pages)
        all_text = _smart_space(all_text)
        # ------- Nature of business (positional, robust to collapsed newlines) ----------
        heading_rx = re.compile(r"(Nature of the Operators? Business(?:\s*\(Summary\))?\s*[:\-]?)", flags=re.I)
        start_m = heading_rx.search(all_text)
        if start_m:
            start_idx = start_m.end()
            # Stop phrases (no newlines). Use word boundaries where appropriate so we don't
            # accidentally cut on substrings inside words.
            stop_phrases = [
                r"\bAccreditation Vehicle Summary\b",
                r"\bACCREDITATION VEHICLE SUMMARY\b",
                r"\bAUDIT OBSERVATIONS\b",
                r"\bNHVAS AUDIT SUMMARY REPORT\b",
                r"\bPage\s+\d+\s+of\s+\d+\b",
                r"\bVehicle Registration Numbers\b",
                r"\bVehicle Registration Numbers of Records Examined\b",
                r"\bAUDIT SUMMARY REPORT\b",
            ]
            # Find the earliest occurrence of any stop phrase after the heading
            next_idx = None
            for sp in stop_phrases:
                m = re.search(sp, all_text[start_idx:], flags=re.I)
                if m:
                    idx = start_idx + m.start()
                    if next_idx is None or idx < next_idx:
                        next_idx = idx
            # Slice from the end of the heading to the earliest stop phrase (or limit to a reasonable length)
            end_idx = next_idx if next_idx is not None else min(len(all_text), start_idx + 4000)
            candidate = all_text[start_idx:end_idx].strip()
            # Defensive trimming of trailing uppercase boilerplate or table header noise
            candidate = re.sub(
                r"(Mass and Maintenance Expiry Date:|ACCREDITATION DRIVER SUMMARY|ACCREDITATION VEHICLE SUMMARY|AUDIT OBSERVATIONS|NHVAS AUDIT SUMMARY REPORT|STD\s+\d+\.).*$",
                "",
                candidate,
                flags=re.I | re.DOTALL,
            )
            candidate = re.sub(r"\s+", " ", candidate).strip()
            if 20 < len(candidate) < 5000:
                extracted["business_summary"] = candidate
            # Extract Accreditation Number / Expiry only if they appear inline in this small block
            m_acc = re.search(r"\bAccreditation\s*Number[:\s-]*([A-Za-z0-9\s\-\/]+)", candidate, flags=re.I)
            m_exp = re.search(r"\b(?:Mass and Maintenance\s+)?Expiry\s*Date[:\s-]*([A-Za-z0-9\s,\/\-\.]+)", candidate, flags=re.I)
            if m_acc:
                acc = re.sub(r"\s+", " ", m_acc.group(1)).strip()
                acc = re.sub(r"[^\d]", "", acc) or acc
                extracted.setdefault("business_summary_extras", {})["accreditation_number"] = acc
            if m_exp:
                exp = _fix_ocr_date_noise(m_exp.group(1).strip())
                extracted.setdefault("business_summary_extras", {})["expiry_date"] = _smart_space(exp)
        # --- fallback (preserve previous behaviour for other templates) ---
        if "business_summary" not in extracted:
            patt = [
                r"Nature of the Operators? Business.*?:\s*(.*?)(?:Accreditation Number|Expiry Date|$)",
                r"Nature of.*?Business.*?Summary.*?:\s*(.*?)(?:Accreditation|$)"
            ]
            for p in patt:
                m = re.search(p, all_text, re.IGNORECASE | re.DOTALL)
                if m:
                    txt = re.sub(r'\s+', ' ', m.group(1).strip())
                    txt = re.sub(r'\s*(Accreditation Number.*|Expiry Date.*)', '', txt, flags=re.IGNORECASE)
                    if len(txt) > 50:
                        extracted["business_summary"] = txt
                        break
        # --- audit conducted date (unchanged) ---
        for p in [
            r"Audit was conducted on\s+([0-9]+(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4})",
            r"DATE\s+([0-9]+(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4})",
            r"AUDITOR SIGNATURE\s+DATE\s+([0-9]+(?:st|nd|rd|th)?\s+[A-Za-z]+\s+\d{4})"
        ]:
            m = re.search(p, all_text, re.IGNORECASE)
            if m:
                extracted["audit_conducted_date"] = _smart_space(m.group(1).strip())
                break
        # --- print accreditation name (unchanged) ---
        for p in [
            r"\(print accreditation name\)\s*([A-Za-z0-9\s&().,'/\-]+?)(?:\s+DOES|\s+does|\n|$)",
            r"print accreditation name.*?\n\s*([A-Za-z0-9\s&().,'/\-]+?)(?:\s+DOES|\s+does|\n|$)"
        ]:
            m = re.search(p, all_text, re.IGNORECASE)
            if m:
                extracted["print_accreditation_name"] = _smart_space(m.group(1).strip())
                break
        # --- Vehicle/driver simple numbers (unchanged) ---
        for p in [
            r"Number of powered vehicles\s+(\d+)",
            r"powered vehicles\s+(\d+)",
            r"Number of trailing vehicles\s+(\d+)",
            r"trailing vehicles\s+(\d+)",
            r"Number of drivers in BFM\s+(\d+)",
            r"drivers in BFM\s+(\d+)"
        ]:
            m = re.search(p, all_text, re.IGNORECASE)
            if m:
                val = m.group(1)
                if "powered" in p:
                    extracted.setdefault("vehicle_summary", {})["powered_vehicles"] = val
                elif "trailing" in p:
                    extracted.setdefault("vehicle_summary", {})["trailing_vehicles"] = val
                elif "bfm" in p.lower():
                    extracted.setdefault("vehicle_summary", {})["drivers_bfm"] = val

    def _extract_detailed_management_data(self, extracted_data: Dict, extracted: Dict):
        all_tables = extracted_data.get("all_tables", [])
        for table in all_tables:
            headers = table.get("headers", [])
            data_rows = table.get("data", [])
            page_num = table.get("page", 0)
            if self._has_details_column(headers):
                section = self._identify_management_section(headers)
                if section:
                    self._extract_management_details(data_rows, extracted, section)
            elif 6 <= page_num <= 15:
                self._extract_summary_by_content(data_rows, headers, extracted, page_num)

    def _extract_summary_by_content(self, data_rows: List[List], headers: List[str], extracted: Dict, page_num: int):
        section_type = (
            "maintenance" if 6 <= page_num <= 9
            else "mass" if 10 <= page_num <= 12
            else "fatigue" if 13 <= page_num <= 15
            else None
        )
        if not section_type:
            return
        details_key = f"{section_type}_summary_details"
        extracted[details_key] = {}
        for row in data_rows:
            if len(row) < 2:
                continue
            standard = str(row[0]).strip()
            details = _smart_space(str(row[1]).strip())
            if standard.startswith("Std") and details and len(details) > 10:
                m = re.search(r"Std\s+(\d+)\.\s*([^(]+)", standard)
                if m:
                    key = f"Std {m.group(1)}. {m.group(2).strip()}"
                    extracted[details_key][key] = details
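        # Illustrative example (hypothetical row): on page 7, a row like
        #   ["Std 2. Maintenance Schedules and Methods (V)", "Schedules are documented and ..."]
        # is stored under
        #   extracted["maintenance_summary_details"]["Std 2. Maintenance Schedules and Methods"]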

    def _has_details_column(self, headers: List[str]) -> bool:
        return "details" in " ".join(str(h) for h in headers).lower()

    def _identify_management_section(self, headers: List[str]) -> Optional[str]:
        txt = " ".join(str(h) for h in headers).lower()
        if "maintenance" in txt:
            return "maintenance"
        if "mass" in txt:
            return "mass"
        if "fatigue" in txt:
            return "fatigue"
        return None

    def _extract_management_details(self, data_rows: List[List], extracted: Dict, section: str):
        details_key = f"{section}_details"
        extracted[details_key] = {}
        for row in data_rows:
            if len(row) < 2:
                continue
            standard = str(row[0]).strip()
            details = _smart_space(str(row[1]).strip())
            if standard.startswith("Std") and details and details != "V" and len(details) > 10:
                m = re.search(r"Std\s+\d+\.\s*([^(]+)", standard)
                if m:
                    extracted[details_key][m.group(1).strip()] = details

    def _extract_vehicle_driver_data(self, extracted_data: Dict, extracted: Dict):
        vehicle_regs = extracted_data.get("vehicle_registrations", [])
        if vehicle_regs:
            extracted["vehicle_registrations"] = vehicle_regs
        driver_records = extracted_data.get("driver_records", [])
        if driver_records:
            extracted["driver_records"] = driver_records

    def map_vehicle_registration_arrays(self, pdf_extracted: Dict, merged: Dict):
        """Extract and map vehicle registration data (Maintenance + Mass) to DOCX arrays."""
        vehicles_src = []
        # Prefer rows we parsed ourselves (header-based). Fall back to curated list if present.
        if "vehicles" in pdf_extracted and isinstance(pdf_extracted["vehicles"], list):
            vehicles_src = pdf_extracted["vehicles"]
        elif "vehicle_registrations" in pdf_extracted and isinstance(pdf_extracted["vehicle_registrations"], list):
            # Normalize curated structure (list of dicts with keys like 'registration_number', etc.)
            for row in pdf_extracted["vehicle_registrations"]:
                if not isinstance(row, dict):
                    continue
                v = {
                    "registration": _smart_space(row.get("registration_number") or row.get("registration") or ""),
                    # Maintenance table columns (names as seen in curated JSON)
                    "roadworthiness": _smart_space(row.get("roadworthiness_certificates", "")),
                    "maintenance_records": _smart_space(row.get("maintenance_records", "")),
                    "daily_checks": _smart_space(row.get("daily_checks", "")),
                    "fault_recording": _smart_space(row.get("fault_recording_reporting", "")),
                    "fault_repair": _smart_space(row.get("fault_repair", "")),
                    # Mass table columns (in case the curated list ever includes them)
                    "sub_contractor": _smart_space(row.get("sub_contractor", "")),
                    "sub_comp": _smart_space(row.get("sub_contracted_vehicles_statement_of_compliance", "")),
                    "weight_verification": _smart_space(row.get("weight_verification_records", "")),
                    "rfs_certification": _smart_space(row.get("rfs_suspension_certification", row.get("rfs_suspension_certification_#", ""))),
                    "suspension_maintenance": _smart_space(row.get("suspension_system_maintenance", "")),
                    "trip_records": _smart_space(row.get("trip_records", "")),
                    "fault_reporting_suspension": _smart_space(row.get("fault_recording_reporting_on_suspension_system", "")),
                }
                if v["registration"]:
                    vehicles_src.append(v)
        if not vehicles_src:
            return  # nothing to map
        # Build column arrays
        regs = []
        roadworthiness = []
        maint_records = []
        daily_checks = []
        fault_recording = []
        fault_repair = []
        sub_contractors = []
        sub_comp = []  # collected above but previously never emitted; now mapped below
        weight_verification = []
        rfs_certification = []
        suspension_maintenance = []
        trip_records = []
        fault_reporting_suspension = []
        for v in vehicles_src:
            reg = _smart_space(v.get("registration", "")).strip()
            if not reg:
                continue
            regs.append(reg)
            roadworthiness.append(_smart_space(v.get("roadworthiness", "")).strip())
            maint_records.append(_smart_space(v.get("maintenance_records", "")).strip())
            daily_checks.append(_smart_space(v.get("daily_checks", "")).strip())
            fault_recording.append(_smart_space(v.get("fault_recording", "")).strip())
            fault_repair.append(_smart_space(v.get("fault_repair", "")).strip())
            sub_contractors.append(_smart_space(v.get("sub_contractor", "")).strip())
            sub_comp.append(_smart_space(v.get("sub_comp", "")).strip())
            weight_verification.append(_smart_space(v.get("weight_verification", "")).strip())
            rfs_certification.append(_smart_space(v.get("rfs_certification", "")).strip())
            suspension_maintenance.append(_smart_space(v.get("suspension_maintenance", "")).strip())
            trip_records.append(_smart_space(v.get("trip_records", "")).strip())
            fault_reporting_suspension.append(_smart_space(v.get("fault_reporting_suspension", "")).strip())
        # Update Maintenance table arrays (if present in template)
        if "Vehicle Registration Numbers Maintenance" in merged and regs:
            m = merged["Vehicle Registration Numbers Maintenance"]
            m["Registration Number"] = regs
            m["Roadworthiness Certificates"] = roadworthiness
            m["Maintenance Records"] = maint_records
            m["Daily Checks"] = daily_checks
            m["Fault Recording/ Reporting"] = fault_recording
            m["Fault Repair"] = fault_repair
        # Update Mass table arrays (if present in template)
        if "Vehicle Registration Numbers Mass" in merged and regs:
            ms = merged["Vehicle Registration Numbers Mass"]
            ms["Registration Number"] = regs
            ms["Sub contractor"] = sub_contractors
            # Key name assumed to match the DOCX template header for this column
            ms["Sub contracted Vehicles Statement of Compliance"] = sub_comp
            ms["Weight Verification Records"] = weight_verification
            ms["RFS Suspension Certification #"] = rfs_certification
            ms["Suspension System Maintenance"] = suspension_maintenance
            ms["Trip Records"] = trip_records
            ms["Fault Recording/ Reporting on Suspension System"] = fault_reporting_suspension
        self.log_debug(f"Updated vehicle registration arrays for {len(regs)} vehicles")

    # ───────────────────────────── map to DOCX (apply spacing + safe fallbacks) ─────────────────────────────
    def map_to_docx_structure(self, pdf_extracted: Dict, docx_data: Dict, pdf_data: Dict) -> Dict:
        merged = json.loads(json.dumps(docx_data))  # deep copy so the template is never mutated
        # Audit Information
        if "audit_info" in pdf_extracted and "Audit Information" in merged:
            ai = pdf_extracted["audit_info"]
            if ai.get("date_of_audit"):
                merged["Audit Information"]["Date of Audit"] = [_smart_space(ai["date_of_audit"])]
            if ai.get("location"):
                merged["Audit Information"]["Location of audit"] = [_smart_space(ai["location"])]
            if ai.get("auditor_name"):
                merged["Audit Information"]["Auditor name"] = [_smart_space(ai["auditor_name"])]
            if ai.get("matrix_id"):
                merged["Audit Information"]["Audit Matrix Identifier (Name or Number)"] = [_smart_space(ai["matrix_id"])]
        # Operator Information
        if "operator_info" in pdf_extracted and "Operator Information" in merged:
            op = pdf_extracted["operator_info"]
            if op.get("name") and _looks_like_company(op["name"]):
                merged["Operator Information"]["Operator name (Legal entity)"] = [_smart_space(op["name"])]
            if op.get("trading_name"):
                merged["Operator Information"]["Registered trading name/s"] = [_smart_space(op["trading_name"])]
            if op.get("acn"):
                merged["Operator Information"]["Australian Company Number"] = [_smart_space(op["acn"])]
            if op.get("manual"):
                merged["Operator Information"]["NHVAS Manual (Policies and Procedures) developed by"] = [_smart_space(op["manual"])]
        # Contact details
        if "operator_info" in pdf_extracted and "Operator contact details" in merged:
            op = pdf_extracted["operator_info"]
            if op.get("business_address"):
                merged["Operator contact details"]["Operator business address"] = [_smart_space(op["business_address"])]
            if op.get("postal_address"):
                merged["Operator contact details"]["Operator Postal address"] = [_smart_space(op["postal_address"])]
            if op.get("email"):
                merged["Operator contact details"]["Email address"] = [op["email"]]
            if op.get("phone"):
                merged["Operator contact details"]["Operator Telephone Number"] = [_smart_space(op["phone"])]
        # Attendance: split combined "Name - Position Name - Position" entries, then trim to template size
        if "attendance" in pdf_extracted and "Attendance List (Names and Position Titles)" in merged:
            # Get expected count from DOCX template
            docx_attendance = merged["Attendance List (Names and Position Titles)"]["Attendance List (Names and Position Titles)"]
            expected_count = len(docx_attendance) if isinstance(docx_attendance, list) else 1
            # Get PDF attendance (already processed by existing extraction)
            pdf_attendance_raw = pdf_extracted["attendance"]
            # The PDF may have combined entries like "Grant Pontifex - Manager Jodie Jones - Auditor"
            separated_attendance = []
            for entry in pdf_attendance_raw:
                if " - " in entry:
                    # Match "First Last - Title First Last - Title" (two-word names, one-word titles only)
                    match = re.search(r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*-\s*([A-Z][a-z]+)\s+([A-Z][a-z]+\s+[A-Z][a-z]+)\s*-\s*([A-Z][a-z]+)', entry)
                    if match:
                        person1 = f"{match.group(1)} - {match.group(2)}"
                        person2 = f"{match.group(3)} - {match.group(4)}"
                        separated_attendance.extend([person1, person2])
                    else:
                        # If the pattern doesn't match, keep it as a single entry
                        separated_attendance.append(entry)
                else:
                    separated_attendance.append(entry)
            # Limit to the count the template expects
            final_attendance = separated_attendance[:expected_count]
            self.log_debug(f"Attendance: DOCX expects {expected_count}, PDF has {len(separated_attendance)}, using: {final_attendance}")
            merged["Attendance List (Names and Position Titles)"]["Attendance List (Names and Position Titles)"] = _clean_list(final_attendance)
        # Business summary
        if "business_summary" in pdf_extracted and "Nature of the Operators Business (Summary)" in merged:
            # Clean the main text by removing any inline expiry-date pattern
            business_text = pdf_extracted["business_summary"]
            clean_text = re.sub(r"\s*(?:Mass and Maintenance\s+)?Expiry Date[:\s]*[A-Za-z0-9\s,\/\-\.]+", "", business_text, flags=re.I)
            merged["Nature of the Operators Business (Summary)"]["Nature of the Operators Business (Summary):"] = [_smart_space(clean_text)]
            # Override with values extracted inline from the summary block
            extras = pdf_extracted.get("business_summary_extras", {})
            if extras.get("expiry_date"):
                merged["Nature of the Operators Business (Summary)"]["Expiry Date"] = [extras["expiry_date"]]
            if extras.get("accreditation_number"):
                merged["Nature of the Operators Business (Summary)"]["Accreditation Number"] = [extras["accreditation_number"]]
        # Vehicle summary
        if "vehicle_summary" in pdf_extracted:
            vs = pdf_extracted["vehicle_summary"]
            if "Accreditation Vehicle Summary" in merged:
                if vs.get("powered_vehicles"):
                    merged["Accreditation Vehicle Summary"]["Number of powered vehicles"] = [vs["powered_vehicles"]]
                if vs.get("trailing_vehicles"):
                    merged["Accreditation Vehicle Summary"]["Number of trailing vehicles"] = [vs["trailing_vehicles"]]
            if "Accreditation Driver Summary" in merged:
                if vs.get("drivers_bfm"):
                    merged["Accreditation Driver Summary"]["Number of drivers in BFM"] = [vs["drivers_bfm"]]
                if vs.get("drivers_afm"):
                    merged["Accreditation Driver Summary"]["Number of drivers in AFM"] = [vs["drivers_afm"]]
        # Summary sections (unchanged behavior): exact key match first, then match by Std number
        summary_maps = self.build_summary_maps(pdf_data)
        for section_name, std_map in summary_maps.items():
            if section_name in merged and std_map:
                for detail_key, details_list in std_map.items():
                    if detail_key in merged[section_name]:
                        merged[section_name][detail_key] = details_list
                        continue
                    for docx_key in list(merged[section_name].keys()):
                        m1 = re.search(r"Std\s+(\d+)", detail_key)
                        m2 = re.search(r"Std\s+(\d+)", docx_key)
                        if m1 and m2 and m1.group(1) == m2.group(1):
                            merged[section_name][docx_key] = details_list
                            break
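                    # Illustrative fallback (hypothetical keys): an extracted key
                    # "Std 5. Records and Documentation (V)" fills the template key
                    # "Std 5. Records and Documentation" because both carry the number 5.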
        # Vehicle registration arrays via consolidated builder
        sections = build_vehicle_sections(pdf_extracted)
        if "Vehicle Registration Numbers Maintenance" in merged:
            merged["Vehicle Registration Numbers Maintenance"].update(
                sections["Vehicle Registration Numbers Maintenance"]
            )
        if "Vehicle Registration Numbers Mass" in merged:
            merged["Vehicle Registration Numbers Mass"].update(
                sections["Vehicle Registration Numbers Mass"]
            )
        # Driver / Scheduler records: map every driver column
        if "drivers_detailed" in pdf_extracted and "Driver / Scheduler Records Examined" in merged:
            drivers = pdf_extracted["drivers_detailed"]
            merged["Driver / Scheduler Records Examined"]["Driver / Scheduler Name"] = [d.get("name", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Driver TLIF Course # Completed"] = [d.get("driver_tlif", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Scheduler TLIF Course # Completed"] = [d.get("scheduler_tlif", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Medical Certificates (Current Yes/No) Date of expiry"] = [d.get("medical_expiry", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Roster / Schedule / Safe Driving Plan (Date Range)"] = [d.get("roster_schedule", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Fit for Duty Statement Completed (Yes/No)"] = [d.get("fit_for_duty", "") for d in drivers]
            merged["Driver / Scheduler Records Examined"]["Work Diary Pages (Page Numbers) Electronic Work Diary Records (Date Range)"] = [d.get("work_diary", "") for d in drivers]
        # Print accreditation name (robust: acc_name is always assigned, so no UnboundLocalError)
        if "Print accreditation name" in merged:
            acc_name = _smart_space(pdf_extracted.get("print_accreditation_name") or "")
            if not acc_name:
                oi = pdf_extracted.get("operator_info") or {}
                acc_name = _smart_space(oi.get("name") or "") or _smart_space(oi.get("trading_name") or "")
            if acc_name:
                merged["Print accreditation name"]["(print accreditation name)"] = [acc_name]
        # Audit Declaration dates: prefer explicit extracted date; fall back to audit_info; ignore literal "Date"
        if "Audit Declaration dates" in merged:
            def _real_date(s: Optional[str]) -> bool:
                return bool(s and re.search(r"\d", s) and not re.fullmatch(r"date", s.strip(), re.I))
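            # Illustrative behaviour: _real_date("21 October 2022") -> True;
            # _real_date("Date") -> False; _real_date("") and _real_date(None) -> False.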
            val = pdf_extracted.get("audit_conducted_date")
            if not _real_date(val):
                val = (pdf_extracted.get("audit_info", {}) or {}).get("date_of_audit")
            if _real_date(val):
                merged["Audit Declaration dates"]["Audit was conducted on"] = [_smart_space(val)]
        # Operator Declaration: the page-22 signature image may be missing, so derive from the first Attendance "Name - Title"
        if "Operator Declaration" in merged:
            # If an explicit operator declaration exists, use it
            if "operator_declaration" in pdf_extracted:
                od = pdf_extracted["operator_declaration"]
                pn = _smart_space(od.get("print_name", ""))
                pt = _smart_space(od.get("position_title", ""))
                if pn:
                    merged["Operator Declaration"]["Print Name"] = [pn]
                if pt:
                    merged["Operator Declaration"]["Position Title"] = [pt]
            else:
                # Fallback: first "Name - Title" from Attendance
                nt = self._first_attendance_name_title(pdf_extracted.get("attendance", []))
                if nt:
                    merged["Operator Declaration"]["Print Name"] = [nt[0]]
                    merged["Operator Declaration"]["Position Title"] = [nt[1]]
        # Paragraphs: fill company name for the 3 management headings; set the 2 dates
        if "paragraphs" in merged:
            paras = merged["paragraphs"]
            audit_date = (
                pdf_extracted.get("audit_conducted_date")
                or pdf_extracted.get("audit_info", {}).get("date_of_audit")
            )
            # Prefer accreditation name, else operator legal name, else trading name
            company_name = (
                _smart_space(pdf_extracted.get("print_accreditation_name") or "")
                or _smart_space(pdf_extracted.get("operator_info", {}).get("name") or "")
                or _smart_space(pdf_extracted.get("operator_info", {}).get("trading_name") or "")
            )
            # Update the three layered headings
            for key in ("MAINTENANCE MANAGEMENT", "MASS MANAGEMENT", "FATIGUE MANAGEMENT"):
                if key in paras and company_name:
                    paras[key] = [company_name]
            # Second-last page: date under page heading
            if "NHVAS APPROVED AUDITOR DECLARATION" in paras and audit_date:
                paras["NHVAS APPROVED AUDITOR DECLARATION"] = [_smart_space(audit_date)]
            # Last page: date under the long acknowledgement paragraph
            ack_key = ("I hereby acknowledge and agree with the findings detailed in this NHVAS Audit Summary Report. "
                       "I have read and understand the conditions applicable to the Scheme, including the NHVAS Business Rules and Standards.")
            if ack_key in paras and audit_date:
                paras[ack_key] = [_smart_space(audit_date)]
        self._force_fill_maintenance_from_tables(pdf_data, merged)
        return merged

    # ───────────────────────────── merge & CLI (unchanged) ─────────────────────────────
    def merge_pdf_to_docx(self, docx_data: Dict, pdf_data: Dict) -> Dict:
        self.log_debug("Starting comprehensive PDF extraction...")
        pdf_extracted = self.extract_from_pdf_comprehensive(pdf_data)
        self.log_debug(f"Extracted PDF data keys: {list(pdf_extracted.keys())}")
        self.log_debug("Mapping to DOCX structure...")
        merged_data = self.map_to_docx_structure(pdf_extracted, docx_data, pdf_data)
        for section_name, section_data in docx_data.items():
            if isinstance(section_data, dict):
                for label in section_data:
                    if (section_name in merged_data and
                            label in merged_data[section_name] and
                            merged_data[section_name][label] != docx_data[section_name][label]):
                        print(f"✓ Updated {section_name}.{label}: {merged_data[section_name][label]}")
        return merged_data

    def process_files(self, docx_file: str, pdf_file: str, output_file: str):
        try:
            print(f"Loading DOCX JSON from: {docx_file}")
            with open(docx_file, 'r', encoding='utf-8') as f:
                docx_data = json.load(f)
            print(f"Loading PDF JSON from: {pdf_file}")
            with open(pdf_file, 'r', encoding='utf-8') as f:
                pdf_data = json.load(f)
            print("Merging PDF data into DOCX structure...")
            merged_data = self.merge_pdf_to_docx(docx_data, pdf_data)
            print(f"Saving merged data to: {output_file}")
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(merged_data, f, indent=2, ensure_ascii=False)
            print("✓ Merge completed successfully!")
            return merged_data
        except Exception as e:
            print(f"✗ Error processing files: {str(e)}")
            import traceback
            traceback.print_exc()
            raise


def main():
    if len(sys.argv) != 4:
        print("Usage: python nhvas_merger.py <docx_json_file> <pdf_json_file> <output_file>")
        print("Example: python nhvas_merger.py docx_template.json pdf_extracted.json merged_output.json")
        sys.exit(1)
    docx_file = sys.argv[1]
    pdf_file = sys.argv[2]
    output_file = sys.argv[3]
    for file_path in [docx_file, pdf_file]:
        if not Path(file_path).exists():
            print(f"✗ File not found: {file_path}")
            sys.exit(1)
    merger = NHVASMerger()
    merger.process_files(docx_file, pdf_file, output_file)


if __name__ == "__main__":
    main()