#!/usr/bin/env python3
"""
extract_red_text.py

Hardened version: preserves original logic/prints while improving header-label
mapping, robustness to missing hf_utils and better synonym handling for
vehicle tables.

The module walks the tables and paragraphs of a .docx document, collects the
text of every run that is rendered in a red font, and groups that text under
the schema/label names declared in ``master_key.TABLE_SCHEMAS``.
"""

import re
import json
import sys

from docx import Document
from docx.oxml.ns import qn

# Try to reuse your hf_utils if available (non-breaking); otherwise fall back
# to local helpers.
try:
    from hf_utils import (
        is_red_font,
        normalize_text,
        normalize_header_text,
        get_clean_text,
    )
except Exception:
    # Minimal compatible fallbacks if hf_utils is not present.  The broad
    # ``except`` is deliberate: any failure while importing hf_utils should
    # degrade to the local helpers rather than abort the extraction.

    def normalize_text(s: str) -> str:
        """Collapse whitespace and drop punctuation outside a small allow-list."""
        if not s:
            return ""
        s = re.sub(r"\u2013|\u2014", "-", s)  # smart dashes
        s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s)  # keep a small set of punctuation
        s = re.sub(r"\s+", " ", s).strip()
        return s

    def normalize_header_text(s: str) -> str:
        """Return ``normalize_text(s)`` upper-cased."""
        return normalize_text(s).upper()

    def _is_reddish(r: int, g: int, b: int) -> bool:
        """Return True when the RGB triple is dominated by its red channel."""
        return r > 150 and g < 120 and b < 120 and (r - max(g, b)) > 30

    def is_red_font(run):
        """Best-effort red detection fallback for when hf_utils isn't available."""
        # First try the python-docx font API.
        try:
            col = getattr(run.font, "color", None)
            if col and getattr(col, "rgb", None):
                rgb = col.rgb
                if _is_reddish(rgb[0], rgb[1], rgb[2]):
                    return True
        except Exception:
            # Best effort only: fall through to the raw XML check below.
            pass

        # Fallback to xml check: read the w:color/@w:val hex string directly.
        try:
            rPr = getattr(run._element, "rPr", None)
            if rPr is not None:
                clr = rPr.find(qn('w:color'))
                if clr is not None:
                    val = clr.get(qn('w:val'))
                    if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                        rr = int(val[:2], 16)
                        gg = int(val[2:4], 16)
                        bb = int(val[4:], 16)
                        if _is_reddish(rr, gg, bb):
                            return True
        except Exception:
            pass
        return False

    def get_clean_text(elem):
        """Join the text of every ``w:t``-style node under *elem* and strip it."""
        return "".join(
            node.text
            for node in elem.iter()
            if node.tag.endswith("}t") and node.text
        ).strip()


# Import master schemas and patterns (your file)
from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS


# ---------------------------------------------------------------------
# Low-level helpers (kept and hardened)
# ---------------------------------------------------------------------
def _prev_para_text(tbl):
    """Get text from previous paragraph before table"""
    prev = tbl._tbl.getprevious()
    # Skip preceding siblings that are not paragraph elements.
    while prev is not None and not prev.tag.endswith("}p"):
        prev = prev.getprevious()
    if prev is None:
        return ""
    return "".join(
        node.text
        for node in prev.iter()
        if node.tag.endswith("}t") and node.text
    ).strip()


def _red_text(cell):
    """Return the concatenated, stripped text of every red run in *cell*."""
    return "".join(
        run.text
        for para in cell.paragraphs
        for run in para.runs
        if is_red_font(run)
    ).strip()


def get_table_context(tbl):
    """Return structured context for a table

    The returned dict has the keys ``heading`` (normalized text of the
    paragraph preceding the table), ``headers`` (normalized non-blank cells of
    the first row), ``col0`` (normalized non-blank first cell of each row),
    ``first_cell``, ``all_cells`` (every non-empty normalized cell),
    ``num_rows`` and ``num_cols`` (cell count of the first row).
    """
    heading = normalize_text(_prev_para_text(tbl))

    # Read rows/cells once; they are reused by every field below.
    rows = list(tbl.rows)
    row_cells = [row.cells for row in rows]
    first_row = row_cells[0] if row_cells else []

    headers = [normalize_text(c.text) for c in first_row if c.text.strip()]
    col0 = [
        normalize_text(cells[0].text)
        for cells in row_cells
        if cells and cells[0].text.strip()
    ]
    # Guard against a first row without cells instead of raising IndexError.
    first_cell = normalize_text(first_row[0].text) if first_row else ""

    all_cells = []
    for cells in row_cells:
        for cell in cells:
            t = normalize_text(cell.text)
            if t:
                all_cells.append(t)

    return {
        "heading": heading,
        "headers": headers,
        "col0": col0,
        "first_cell": first_cell,
        "all_cells": all_cells,
        "num_rows": len(rows),
        "num_cols": len(first_row),
    }


def fuzzy_match_heading(heading, patterns):
    """Return True if heading fuzzy-matches any regex patterns

    Each pattern is tried as a case-insensitive regex; a pattern that fails to
    compile is compared as a plain upper-cased substring instead.
    """
    if not heading:
        return False
    heading_norm = heading.upper()
    for pattern in patterns:
        try:
            if re.search(pattern, heading_norm, re.IGNORECASE):
                return True
        except re.error:
            if pattern.upper() in heading_norm:
                return True
    return False


# ---------------------------------------------------------------------
# Header-to-label synonym map: improved coverage for common OCR/header variants
# ---------------------------------------------------------------------
HEADER_SYNONYMS = {
    # normalized header (upper) -> canonical label in TABLE_SCHEMAS
    "NO": "No.",
    "NO.": "No.",
    "REG NO": "Registration Number",
    "REGISTRATIONNO": "Registration Number",
    "REGISTRATION NUMBER": "Registration Number",
    "REGISTRATION": "Registration Number",
    "PRINT NAME": "Print Name",
    "NHVR OR EXEMPLAR GLOBAL AUDITOR REGISTRATION NUMBER": "NHVR or Exemplar Global Auditor Registration Number",
    "ROADWORTHINESS CERTIFICATES": "Roadworthiness Certificates",
    "ROADWORTHINESS CERTIFICATES (APPLICABLE FOR ENTRY AUDIT)": "Roadworthiness Certificates",
    "MAINTENANCE RECORDS": "Maintenance Records",
    "DAILY CHECKS": "Daily Checks",
    "FAULT RECORDING/ REPORTING": "Fault Recording/ Reporting",
    "FAULT RECORDING/REPORTING": "Fault Recording/ Reporting",
    "FAULT REPAIR": "Fault Repair",
    "WEIGHT VERIFICATION RECORDS": "Weight Verification Records",
    "RFS SUSPENSION CERTIFICATION #": "RFS Suspension Certification #",
    "SUSPENSION SYSTEM MAINTENANCE": "Suspension System Maintenance",
    "TRIP RECORDS": "Trip Records",
    "FAULT RECORDING/ REPORTING ON SUSPENSION SYSTEM": "Fault Recording/ Reporting",
    # short forms
    "REG NO.": "Registration Number",
    "REGISTRATION #": "Registration Number",
}


def _synonym_label(hnorm):
    """Return the HEADER_SYNONYMS label that best matches normalized header *hnorm*.

    An exact match on the normalized key wins.  Otherwise the longest key that
    occurs in *hnorm* on whole-word boundaries is used, so a short key such as
    "NO" cannot capture "REG NO" and "REGISTRATION" cannot capture the longer
    auditor-registration header.  Returns None when nothing matches.
    """
    if not hnorm:
        return None
    best_label = None
    best_len = 0
    for key, label in HEADER_SYNONYMS.items():
        # Normalize keys with the same function as the header so punctuation
        # that the normalizer strips (".", "#") cannot prevent a match.
        nkey = normalize_header_text(key)
        if not nkey:
            continue
        if nkey == hnorm:
            return label
        if len(nkey) > best_len and re.search(
            r"(?<!\w)" + re.escape(nkey) + r"(?!\w)", hnorm
        ):
            best_label, best_len = label, len(nkey)
    return best_label


def map_header_to_label(header_text, labels):
    """
    Given a header_text (raw) and list of candidate labels (from schema),
    return the best matching label or None.

    Resolution order: synonym map, exact normalized match against a candidate,
    then token-overlap scoring (accepted at a score of 0.25 or more).
    """
    if not header_text:
        return None
    hnorm = normalize_header_text(header_text)

    # synonym map (exact first, then longest whole-word match)
    synonym = _synonym_label(hnorm)
    if synonym is not None:
        target = normalize_header_text(synonym)
        # ensure the label exists in candidate labels (case-insensitive)
        for cand in labels:
            if normalize_header_text(cand) == target:
                return cand
        # if it isn't in labels, still return the synonym label
        # (labels sometimes omit punctuation)
        return synonym

    # try exact match to any candidate label
    for cand in labels:
        if normalize_header_text(cand) == hnorm:
            return cand

    # token overlap scoring (flexible)
    header_words = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
    if not header_words:
        return None
    header_set = {w.upper() for w in header_words}

    best_label = None
    best_score = 0.0
    for cand in labels:
        cand_words = [w for w in re.split(r"\W+", cand) if len(w) > 2]
        if not cand_words:
            continue
        common = header_set.intersection(w.upper() for w in cand_words)
        score = len(common) / max(1, max(len(header_words), len(cand_words)))
        if score > best_score:
            best_label, best_score = cand, score

    # lower threshold for vehicle tables / noisy OCR (accept >= 0.25)
    if best_score >= 0.25:
        return best_label
    return None


# ---------------------------------------------------------------------
# Matching / scoring logic (keeps original heuristics)
# ---------------------------------------------------------------------
def calculate_schema_match_score(schema_name, spec, context):
    """Score how well *context* (from get_table_context) fits one schema.

    Returns a ``(score, reasons)`` tuple where *reasons* is a list of
    human-readable strings explaining each adjustment applied to the score.
    """
    score = 0
    reasons = []

    headers = context["headers"]
    heading = context["heading"]
    first_cell = context["first_cell"]

    # Joined header text is reused by several heuristics below.
    joined_headers = " ".join(headers)
    headers_lower = joined_headers.lower()
    headers_upper = joined_headers.upper()
    table_text = headers_lower + " " + heading.lower()

    # Vehicle registration boost
    if "Vehicle Registration" in schema_name:
        vehicle_keywords = [
            "registration",
            "vehicle",
            "sub-contractor",
            "weight verification",
            "rfs suspension",
        ]
        keyword_matches = sum(1 for k in vehicle_keywords if k in table_text)
        if keyword_matches >= 2:
            score += 150
            reasons.append(f"Vehicle Registration keywords: {keyword_matches}/5")
        elif keyword_matches >= 1:
            score += 75
            reasons.append(f"Some Vehicle Registration keywords: {keyword_matches}/5")

    # Summary boost / non-summary penalty when a DETAILS column is present
    if "details" in headers_lower:
        if "Summary" in schema_name:
            score += 100
            reasons.append("Summary schema with DETAILS column - perfect match")
        else:
            score -= 75
            reasons.append("Non-summary schema penalized for DETAILS column presence")

    # context exclusions & keywords
    if spec.get("context_exclusions"):
        for exc in spec["context_exclusions"]:
            if exc.lower() in table_text:
                score -= 50
                reasons.append(f"Context exclusion penalty: '{exc}'")

    if spec.get("context_keywords"):
        matches = sum(1 for kw in spec["context_keywords"] if kw.lower() in table_text)
        if matches:
            score += matches * 15
            reasons.append(
                f"Context keyword matches: {matches}/{len(spec['context_keywords'])}"
            )

    # direct first-cell match
    if first_cell and first_cell.upper() == schema_name.upper():
        score += 100
        reasons.append(f"Direct first cell match: '{first_cell}'")

    # heading pattern (only the first matching heading counts)
    if spec.get("headings"):
        for h in spec["headings"]:
            text = h.get("text", "") if isinstance(h, dict) else h
            if fuzzy_match_heading(heading, [text]):
                score += 50
                reasons.append(f"Heading match: '{heading}'")
                break

    # columns matching
    if spec.get("columns"):
        cols = [normalize_text(c) for c in spec["columns"]]
        matches = 0
        for col in cols:
            if any(col.upper() in h.upper() for h in headers):
                matches += 1
        if matches == len(cols):
            score += 60
            reasons.append(f"All column headers match: {cols}")
        elif matches > 0:
            score += matches * 20
            reasons.append(f"Partial column matches: {matches}/{len(cols)}")

    # left orientation: labels are expected in the first column
    if spec.get("orientation") == "left":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0
        for lbl in labels:
            if any(
                lbl.upper() in c.upper() or c.upper() in lbl.upper()
                for c in context["col0"]
            ):
                matches += 1
        if matches > 0:
            score += (matches / max(1, len(labels))) * 30
            reasons.append(f"Left orientation label matches: {matches}/{len(labels)}")

    # row1 orientation: labels are expected in the first row
    elif spec.get("orientation") == "row1":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0
        for lbl in labels:
            if any(
                lbl.upper() in h.upper() or h.upper() in lbl.upper()
                for h in headers
            ):
                matches += 1
            elif any(
                word.upper() in headers_upper
                for word in lbl.split()
                if len(word) > 3
            ):
                # A shared long word counts as half a match.
                matches += 0.5
        if matches > 0:
            score += (matches / max(1, len(labels))) * 40
            reasons.append(f"Row1 orientation header matches: {matches}/{len(labels)}")

    # Declarations special cases
    starts_with_print = first_cell.upper().startswith("PRINT")
    if schema_name == "Operator Declaration" and starts_with_print:
        if "OPERATOR DECLARATION" in heading.upper():
            score += 80
            reasons.append("Operator Declaration context match")
        elif any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            score += 60
            reasons.append("Manager found in cells (likely Operator Declaration)")

    if schema_name == "NHVAS Approved Auditor Declaration" and starts_with_print:
        if any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            score -= 50
            reasons.append("Penalty: Manager found (not auditor)")

    return score, reasons


def match_table_schema(tbl):
    """Return the name of the best-scoring schema for *tbl*, or None.

    A schema is only returned when its score is strictly positive and at
    least 20.
    """
    context = get_table_context(tbl)
    best_match = None
    best_score = 0
    for name, spec in TABLE_SCHEMAS.items():
        score, _reasons = calculate_schema_match_score(name, spec, context)
        if score > best_score:
            best_score = score
            best_match = name
    if best_score >= 20:
        return best_match
    return None


# ---------------------------------------------------------------------
# Multi-schema detection & extraction (keeps original behavior)
# ---------------------------------------------------------------------
def check_multi_schema_table(tbl):
    """Detect a table that holds both operator information and contact details.

    Returns ``["Operator Information", "Operator contact details"]`` when the
    first column contains at least one label from each group, else None.
    """
    context = get_table_context(tbl)
    operator_labels = [
        "Operator name (Legal entity)",
        "NHVAS Accreditation No.",
        "Registered trading name/s",
        "Australian Company Number",
        "NHVAS Manual",
    ]
    contact_labels = [
        "Operator business address",
        "Operator Postal address",
        "Email address",
        "Operator Telephone Number",
    ]
    has_operator = any(
        any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels)
        for cell in context["col0"]
    )
    has_contact = any(
        any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels)
        for cell in context["col0"]
    )
    if has_operator and has_contact:
        return ["Operator Information", "Operator contact details"]
    return None


def extract_multi_schema_table(tbl, schemas):
    """Extract red text from a table whose rows belong to several schemas.

    The first row is skipped.  Each remaining row is assigned to a schema when
    its first-cell text equals, contains, or is contained in one of that
    schema's labels.  Returns ``{schema_name: {label: [red texts]}}`` with
    empty schemas omitted.
    """
    # Read the data rows once; they are revisited for every schema.
    data_rows = []
    for row in list(tbl.rows)[1:]:
        cells = row.cells
        if not cells:
            continue
        row_norm = normalize_text(cells[0].text).upper()
        # An empty label would be "contained in" every schema label and would
        # be filed under the first label of each schema, so skip such rows.
        if not row_norm:
            continue
        data_rows.append((row_norm, cells))

    result = {}
    for schema_name in schemas:
        if schema_name not in TABLE_SCHEMAS:
            continue
        spec = TABLE_SCHEMAS[schema_name]
        schema_data = {}
        for row_norm, cells in data_rows:
            matched_label = None
            for spec_label in spec.get("labels", []):
                spec_norm = normalize_text(spec_label).upper()
                if not spec_norm:
                    continue
                if spec_norm in row_norm or row_norm in spec_norm:
                    matched_label = spec_label
                    break
            if matched_label is None:
                continue
            for cell in cells:
                red_txt = _red_text(cell)
                if red_txt:
                    values = schema_data.setdefault(matched_label, [])
                    if red_txt not in values:
                        values.append(red_txt)
        if schema_data:
            result[schema_name] = schema_data
    return result


# ---------------------------------------------------------------------
# Extraction: special-case for Vehicle Registration tables (row1) and generic fallback
# ---------------------------------------------------------------------
def _extract_vehicle_registration(tbl, spec):
    """Extract red text column-by-column from a Vehicle Registration table."""
    print(f" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
    labels = spec.get("labels", [])
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}

    rows = list(tbl.rows)
    if len(rows) < 2:
        print(" ❌ Vehicle table has less than 2 rows")
        return {}

    header_cells = rows[0].cells
    column_mapping = {}
    print(f" 📋 Mapping {len(header_cells)} header cells to labels")
    for col_idx, cell in enumerate(header_cells):
        header_text = normalize_text(cell.text).strip()
        if not header_text:
            continue
        print(f" Column {col_idx}: '{header_text}'")
        mapped = map_header_to_label(header_text, labels)
        if mapped:
            # find exact candidate label string (preserve original label
            # spelling if possible)
            mapped_norm = normalize_header_text(mapped)
            chosen = next(
                (c for c in labels if normalize_header_text(c) == mapped_norm),
                None,
            )
            column_mapping[col_idx] = chosen or mapped
            print(f" ✅ Mapped to: '{column_mapping[col_idx]}'")
        else:
            # map_header_to_label already applies token-overlap scoring with
            # the same 0.25 threshold, so repeating it here cannot succeed.
            print(f" ⚠️ No mapping found for '{header_text}'")

    print(f" 📊 Total column mappings: {len(column_mapping)}")

    # Extract red text from data rows
    for row_idx in range(1, len(rows)):
        print(f" 📌 Processing data row {row_idx}")
        for col_idx, cell in enumerate(rows[row_idx].cells):
            if col_idx not in column_mapping:
                continue
            label = column_mapping[col_idx]
            red_txt = _red_text(cell)
            if not red_txt:
                continue
            print(f" 🔴 Found red text in '{label}': '{red_txt}'")
            # NOTE(review): values are de-duplicated per column, so two rows
            # with the same value collapse into one entry - confirm intended.
            if red_txt not in seen.setdefault(label, set()):
                seen[label].add(red_txt)
                collected.setdefault(label, []).append(red_txt)

    result = {k: v for k, v in collected.items() if v}
    print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
    return result


def _row_label(raw_label, spec_labels, default):
    """Resolve a row's first-cell text to a schema label.

    Exact (case-insensitive, normalized) matches win over containment
    matches; *default* is returned when nothing matches or the row label is
    empty.
    """
    raw_norm = raw_label.upper()
    # An empty label is a substring of everything; never let it match.
    if not raw_norm:
        return default
    normalized = [(lbl, normalize_text(lbl).upper()) for lbl in spec_labels]
    for lbl, spec_norm in normalized:
        if spec_norm == raw_norm:
            return lbl
    for lbl, spec_norm in normalized:
        if spec_norm and (spec_norm in raw_norm or raw_norm in spec_norm):
            return lbl
    return default


def _extract_generic(tbl, schema_name, spec):
    """Extract red text for any schema without dedicated handling."""
    spec_labels = spec.get("labels", [])
    labels = spec_labels + [schema_name]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}

    # "row1" schemas carry their labels in the header row, so data is keyed by
    # column index; otherwise the label is read from each row's first cell.
    by_col = spec.get("orientation") == "row1"
    start_row = 1 if by_col else 0

    for row in list(tbl.rows)[start_row:]:
        cells = row.cells
        row_lbl = None  # resolved lazily, once per row
        for ci, cell in enumerate(cells):
            red_txt = _red_text(cell)
            if not red_txt:
                continue
            if by_col:
                lbl = spec_labels[ci] if ci < len(spec_labels) else schema_name
            else:
                if row_lbl is None:
                    row_lbl = _row_label(
                        normalize_text(cells[0].text), spec_labels, schema_name
                    )
                lbl = row_lbl
            if red_txt not in seen.setdefault(lbl, set()):
                seen[lbl].add(red_txt)
                collected.setdefault(lbl, []).append(red_txt)

    return {k: v for k, v in collected.items() if v}


def extract_table_data(tbl, schema_name, spec):
    """Extract red text from *tbl* according to *spec*.

    Returns ``{label: [red texts]}`` containing only labels that received at
    least one value.  Schemas whose name contains "Vehicle Registration" use
    header-to-label column mapping; everything else uses the generic logic.
    """
    # Vehicle registration special handling
    if "Vehicle Registration" in schema_name:
        return _extract_vehicle_registration(tbl, spec)
    # Generic fallback extraction logic
    return _extract_generic(tbl, schema_name, spec)


# ---------------------------------------------------------------------
# Main extraction: process all tables then paragraphs
# ---------------------------------------------------------------------
def _merge_schema_data(out, schema_name, data):
    """Merge ``{label: [values]}`` into ``out[schema_name]`` without duplicates."""
    existing = out.get(schema_name, {})
    for label, values in data.items():
        bucket = existing.setdefault(label, [])
        for val in values:
            if val not in bucket:
                bucket.append(val)
    out[schema_name] = existing


def extract_red_text(input_doc):
    """Extract all red text from a document.

    *input_doc* is either a path string (opened with ``Document``) or an
    already-loaded document object.  Returns a dict keyed by schema name, plus
    a ``"paragraphs"`` key mapping a heading context (or ``"Date"`` /
    ``"(para)"``) to the red text found in body paragraphs.
    """
    if isinstance(input_doc, str):
        doc = Document(input_doc)
    else:
        doc = input_doc

    out = {}
    for tbl in doc.tables:
        multi_schemas = check_multi_schema_table(tbl)
        if multi_schemas:
            multi_data = extract_multi_schema_table(tbl, multi_schemas)
            for schema_name, schema_data in multi_data.items():
                if schema_data:
                    # merge safely and dedupe
                    _merge_schema_data(out, schema_name, schema_data)
            continue

        schema = match_table_schema(tbl)
        if not schema:
            continue
        data = extract_table_data(tbl, schema, TABLE_SCHEMAS[schema])
        if data:
            _merge_schema_data(out, schema, data)

    # Paragraph red-text extraction with context
    heading_patterns = list(HEADING_PATTERNS.get("main", [])) + list(
        HEADING_PATTERNS.get("sub", [])
    )
    # Only treat a line as a date when a pattern is actually configured; a
    # catch-all default would label every context-less paragraph "Date".
    date_pattern = PARAGRAPH_PATTERNS.get("date_line")

    paras = {}
    current_heading = None  # most recent preceding paragraph matching a heading
    for para in doc.paragraphs:
        red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
        if red_txt:
            context = current_heading
            # special-case date-like lines
            if not context and date_pattern and re.fullmatch(date_pattern, red_txt):
                context = "Date"
            if not context:
                context = "(para)"
            bucket = paras.setdefault(context, [])
            if red_txt not in bucket:
                bucket.append(red_txt)

        # Update the heading context after handling this paragraph, so a
        # paragraph is never used as its own context.
        txt = normalize_text(para.text)
        if txt and any(re.search(p, txt, re.IGNORECASE) for p in heading_patterns):
            current_heading = txt

    if paras:
        out["paragraphs"] = paras
    return out


# ---------------------------------------------------------------------
# File wrapper to support your existing calls
# ---------------------------------------------------------------------
def extract_red_text_filelike(input_file, output_file):
    """Extract red text from *input_file* and write it as JSON to *output_file*.

    *input_file* is rewound first when it supports ``seek``.  *output_file*
    may be an object with ``write`` or a path to open for writing.  Returns the
    extracted dict.
    """
    if hasattr(input_file, "seek"):
        input_file.seek(0)
    doc = Document(input_file)
    result = extract_red_text(doc)
    if hasattr(output_file, "write"):
        json.dump(result, output_file, indent=2, ensure_ascii=False)
        output_file.flush()
    else:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
    return result


# ---------------------------------------------------------------------
# CLI entrypoint (same as before)
# ---------------------------------------------------------------------
def main(argv=None):
    """Run the CLI: ``extract_red_text.py <input.docx> <output.json>``."""
    args = sys.argv if argv is None else argv
    if len(args) == 3:
        input_docx = args[1]
        output_json = args[2]
        doc = Document(input_docx)
        word_data = extract_red_text(doc)
        # write file (dedupe already handled in merging logic above)
        with open(output_json, "w", encoding="utf-8") as f:
            json.dump(word_data, f, indent=2, ensure_ascii=False)
        print(json.dumps(word_data, indent=2, ensure_ascii=False))
    else:
        print("To use as a module: extract_red_text_filelike(input_file, output_file)")


if __name__ == "__main__":
    main()