Spaces:
Running
Running
#!/usr/bin/env python3
"""
extract_red_text.py
Hardened version: preserves the original logic and prints while improving header-to-label
mapping, robustness when hf_utils is missing, and synonym handling for vehicle tables.
"""
| import re | |
| import json | |
| import sys | |
| from docx import Document | |
| from docx.oxml.ns import qn | |
| # Try to reuse your hf_utils if available (non-breaking); otherwise fall back to local helpers. | |
try:
    from hf_utils import (
        is_red_font,
        normalize_text,
        normalize_header_text,
        get_clean_text,
    )
except Exception:
    # hf_utils could not be imported: define minimal stand-ins under the same
    # names so the rest of this module keeps working.

    def normalize_text(s: str) -> str:
        """Collapse *s* to single-spaced text with a small punctuation whitelist.

        En/em dashes become ``-``; every character that is not a word
        character, whitespace or one of ``- & ( ) / :`` becomes a space;
        whitespace runs are squeezed and the ends trimmed. Falsy input
        yields ``""``.
        """
        if not s:
            return ""
        dashed = re.sub(r"\u2013|\u2014", "-", s)  # smart dashes
        filtered = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", dashed)  # keep a small set of punctuation
        return re.sub(r"\s+", " ", filtered).strip()

    def normalize_header_text(s: str) -> str:
        """Return ``normalize_text(s)`` upper-cased."""
        return normalize_text(s).upper()

    def _rgb_is_red(r, g, b):
        """Heuristic: strong red channel that clearly dominates green and blue."""
        return r > 150 and g < 120 and b < 120 and (r - max(g, b)) > 30

    def is_red_font(run):
        """Best-effort red detection fallback for when hf_utils isn't available.

        First inspects ``run.font.color.rgb``; if that is inconclusive, reads
        the ``w:color`` value from the run's XML properties. Any error in
        either probe is swallowed and treated as "not red".
        """
        try:
            color = getattr(run.font, "color", None)
            rgb = getattr(color, "rgb", None) if color else None
            if rgb and _rgb_is_red(rgb[0], rgb[1], rgb[2]):
                return True
        except Exception:
            pass
        # fallback to xml check
        try:
            rPr = getattr(run._element, "rPr", None)
            clr = rPr.find(qn('w:color')) if rPr is not None else None
            val = clr.get(qn('w:val')) if clr is not None else None
            if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                channels = [int(val[i:i + 2], 16) for i in (0, 2, 4)]
                if _rgb_is_red(*channels):
                    return True
        except Exception:
            pass
        return False

    def get_clean_text(elem):
        """Join the text of every ``…}t`` node under *elem* and strip the ends."""
        pieces = [
            node.text
            for node in elem.iter()
            if node.tag.endswith("}t") and node.text
        ]
        return "".join(pieces).strip()
| # Import master schemas and patterns (your file) | |
| from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS | |
| # --------------------------------------------------------------------- | |
| # Low-level helpers (kept and hardened) | |
| # --------------------------------------------------------------------- | |
def _prev_para_text(tbl):
    """Return the text of the nearest paragraph sibling preceding *tbl*.

    Walks backwards over the siblings of ``tbl._tbl`` until an element whose
    tag ends in ``}p`` is found, then concatenates the text of its ``…}t``
    descendants. Returns ``""`` when no such sibling exists.
    """
    node = tbl._tbl.getprevious()
    while node is not None:
        if node.tag.endswith("}p"):
            fragments = [
                child.text
                for child in node.iter()
                if child.tag.endswith("}t") and child.text
            ]
            return "".join(fragments).strip()
        node = node.getprevious()
    return ""
def get_table_context(tbl):
    """Summarise *tbl* as a dict of normalised text used for schema matching.

    Keys:
        heading     normalised text of the paragraph preceding the table
        headers     normalised text of the non-blank cells in the first row
        col0        normalised first-cell text of every row whose first cell is non-blank
        first_cell  normalised text of the top-left cell ("" for a table without rows)
        all_cells   normalised text of every cell that is non-empty after normalisation
        num_rows    number of rows
        num_cols    number of cells in the first row (0 for a table without rows)
    """
    rows = tbl.rows
    # Materialise each row's cells once; every summary below reuses them.
    grid = [row.cells for row in rows]
    top = grid[0] if grid else None

    if top is None:
        headers, first_cell, num_cols = [], "", 0
    else:
        headers = [normalize_text(cell.text) for cell in top if cell.text.strip()]
        first_cell = normalize_text(top[0].text)
        num_cols = len(top)

    col0 = [
        normalize_text(cells[0].text)
        for cells in grid
        if cells and cells[0].text.strip()
    ]
    normalised = (normalize_text(cell.text) for cells in grid for cell in cells)
    all_cells = [text for text in normalised if text]

    return {
        "heading": normalize_text(_prev_para_text(tbl)),
        "headers": headers,
        "col0": col0,
        "first_cell": first_cell,
        "all_cells": all_cells,
        "num_rows": len(rows),
        "num_cols": num_cols,
    }
def fuzzy_match_heading(heading, patterns):
    """Return True if *heading* matches any entry of *patterns*.

    Each pattern is tried as a case-insensitive regular expression against
    the upper-cased heading. A pattern that is not a valid regex is instead
    tested as a plain, upper-cased substring. An empty heading never matches.
    """
    if not heading:
        return False
    upper_heading = heading.upper()

    def _hits(pattern):
        try:
            return bool(re.search(pattern, upper_heading, re.IGNORECASE))
        except re.error:
            # Not a valid regex: treat it as literal text.
            return pattern.upper() in upper_heading

    return any(_hits(pattern) for pattern in patterns)
| # --------------------------------------------------------------------- | |
| # Header-to-label synonym map: improved coverage for common OCR/header variants | |
| # --------------------------------------------------------------------- | |
# Canonical labels that several header spellings collapse onto.
_LBL_REGISTRATION = "Registration Number"
_LBL_ROADWORTHINESS = "Roadworthiness Certificates"
_LBL_FAULT_RECORDING = "Fault Recording/ Reporting"

# Upper-case header text -> canonical label in TABLE_SCHEMAS.
# Insertion order is kept exactly as before because map_header_to_label
# iterates this mapping.
HEADER_SYNONYMS = {
    # row number column
    "NO": "No.",
    "NO.": "No.",
    # registration number variants
    "REG NO": _LBL_REGISTRATION,
    "REGISTRATIONNO": _LBL_REGISTRATION,
    "REGISTRATION NUMBER": _LBL_REGISTRATION,
    "REGISTRATION": _LBL_REGISTRATION,
    # declaration tables
    "PRINT NAME": "Print Name",
    "NHVR OR EXEMPLAR GLOBAL AUDITOR REGISTRATION NUMBER": "NHVR or Exemplar Global Auditor Registration Number",
    # maintenance columns
    "ROADWORTHINESS CERTIFICATES": _LBL_ROADWORTHINESS,
    "ROADWORTHINESS CERTIFICATES (APPLICABLE FOR ENTRY AUDIT)": _LBL_ROADWORTHINESS,
    "MAINTENANCE RECORDS": "Maintenance Records",
    "DAILY CHECKS": "Daily Checks",
    "FAULT RECORDING/ REPORTING": _LBL_FAULT_RECORDING,
    "FAULT RECORDING/REPORTING": _LBL_FAULT_RECORDING,
    "FAULT REPAIR": "Fault Repair",
    # mass columns
    "WEIGHT VERIFICATION RECORDS": "Weight Verification Records",
    "RFS SUSPENSION CERTIFICATION #": "RFS Suspension Certification #",
    "SUSPENSION SYSTEM MAINTENANCE": "Suspension System Maintenance",
    "TRIP RECORDS": "Trip Records",
    "FAULT RECORDING/ REPORTING ON SUSPENSION SYSTEM": _LBL_FAULT_RECORDING,
    # short forms
    "REG NO.": _LBL_REGISTRATION,
    "REGISTRATION #": _LBL_REGISTRATION,
}
def map_header_to_label(header_text, labels):
    """Map a raw table header to the best matching schema label.

    Matching proceeds from most to least specific:

    1. the normalised header equals a (normalised) HEADER_SYNONYMS key;
    2. the normalised header equals a normalised candidate label;
    3. the normalised header contains a synonym key, trying the longest keys
       first so that a specific phrase wins over a short one;
    4. token overlap between the header and each candidate label.

    Args:
        header_text: Raw header cell text.
        labels: Candidate labels from the schema.

    Returns:
        The matching entry of *labels*; the canonical synonym label when the
        synonym's label is not among *labels*; or None when nothing matches.
    """
    if not header_text:
        return None
    hnorm = normalize_header_text(header_text)

    def _resolve(label):
        # Prefer the schema's own spelling of the label; fall back to the
        # synonym's label because schema labels sometimes omit punctuation.
        wanted = normalize_header_text(label)
        for cand in labels:
            if normalize_header_text(cand) == wanted:
                return cand
        return label

    # Normalise the keys exactly like the header. Otherwise keys containing
    # characters the normaliser strips (e.g. "NO.", "REGISTRATION #") could
    # never be found in the normalised header.
    synonyms = {}
    for key, lab in HEADER_SYNONYMS.items():
        norm_key = normalize_header_text(key)
        if norm_key:
            synonyms.setdefault(norm_key, lab)

    # 1. exact synonym
    if hnorm in synonyms:
        return _resolve(synonyms[hnorm])

    # 2. exact match to any candidate label
    for cand in labels:
        if normalize_header_text(cand) == hnorm:
            return cand

    # 3. synonym contained in the header, longest key first. Short keys such
    #    as "NO" must match as a whole word, so they no longer hijack
    #    "REG NO" or any header that merely contains those letters. Longer
    #    keys may still match inside run-together OCR text.
    min_substring_len = 4
    for key in sorted(synonyms, key=len, reverse=True):
        whole_phrase = re.search(r"(?<!\w)" + re.escape(key) + r"(?!\w)", hnorm)
        if whole_phrase or (len(key) >= min_substring_len and key in hnorm):
            return _resolve(synonyms[key])

    # 4. token overlap scoring (flexible)
    header_words = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
    if not header_words:
        return None
    header_tokens = {w.upper() for w in header_words}
    best_label, best_score = None, 0.0
    for cand in labels:
        cand_words = [w for w in re.split(r"\W+", cand) if len(w) > 2]
        if not cand_words:
            continue
        common = header_tokens.intersection(w.upper() for w in cand_words)
        score = len(common) / max(1, max(len(header_words), len(cand_words)))
        if score > best_score:
            best_label, best_score = cand, score
    # lower threshold for vehicle tables / noisy OCR (accept >= 0.25)
    if best_score >= 0.25:
        return best_label
    return None
| # --------------------------------------------------------------------- | |
| # Matching / scoring logic (keeps original heuristics) | |
| # --------------------------------------------------------------------- | |
def calculate_schema_match_score(schema_name, spec, context):
    """Score how well the table described by *context* fits one schema.

    Args:
        schema_name: Name of the schema being considered.
        spec: The schema's specification dict (optional keys read here:
            context_exclusions, context_keywords, headings, columns,
            orientation, labels).
        context: Table summary as produced by get_table_context.

    Returns:
        ``(score, reasons)`` where *reasons* lists, in order, a short
        explanation for every adjustment applied to the score.
    """
    total = 0
    notes = []
    header_blob = " ".join(context["headers"])

    def bump(points, note):
        nonlocal total
        total += points
        notes.append(note)

    def table_text():
        return header_blob.lower() + " " + context["heading"].lower()

    # Vehicle registration boost
    if "Vehicle Registration" in schema_name:
        text = table_text()
        vehicle_keywords = ("registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension")
        hits = sum(1 for kw in vehicle_keywords if kw in text)
        if hits >= 2:
            bump(150, f"Vehicle Registration keywords: {hits}/5")
        elif hits >= 1:
            bump(75, f"Some Vehicle Registration keywords: {hits}/5")

    # Summary boost / non-summary penalty for a DETAILS column
    if "details" in header_blob.lower():
        if "Summary" in schema_name:
            bump(100, "Summary schema with DETAILS column - perfect match")
        else:
            bump(-75, "Non-summary schema penalized for DETAILS column presence")

    # context exclusions & keywords
    exclusions = spec.get("context_exclusions")
    if exclusions:
        text = table_text()
        for exc in exclusions:
            if exc.lower() in text:
                bump(-50, f"Context exclusion penalty: '{exc}'")

    keywords = spec.get("context_keywords")
    if keywords:
        text = table_text()
        hits = sum(1 for kw in keywords if kw.lower() in text)
        if hits:
            bump(hits * 15, f"Context keyword matches: {hits}/{len(keywords)}")

    # direct first-cell match
    first_cell = context["first_cell"]
    if first_cell and first_cell.upper() == schema_name.upper():
        bump(100, f"Direct first cell match: '{first_cell}'")

    # heading pattern: only the first matching heading counts
    for entry in spec.get("headings") or ():
        pattern = entry.get("text", "") if isinstance(entry, dict) else entry
        heading = context["heading"]
        if fuzzy_match_heading(heading, [pattern]):
            bump(50, f"Heading match: '{heading}'")
            break

    # columns matching
    if spec.get("columns"):
        cols = [normalize_text(c) for c in spec["columns"]]
        upper_headers = [h.upper() for h in context["headers"]]
        found = sum(1 for col in cols if any(col.upper() in h for h in upper_headers))
        if found == len(cols):
            bump(60, f"All column headers match: {cols}")
        elif found > 0:
            bump(found * 20, f"Partial column matches: {found}/{len(cols)}")

    orientation = spec.get("orientation")
    if orientation == "left":
        # labels are expected in the first column
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        col0_upper = [c.upper() for c in context["col0"]]
        found = 0
        for lbl in labels:
            lbl_upper = lbl.upper()
            if any(lbl_upper in c or c in lbl_upper for c in col0_upper):
                found += 1
        if found > 0:
            bump((found / max(1, len(labels))) * 30, f"Left orientation label matches: {found}/{len(labels)}")
    elif orientation == "row1":
        # labels are expected in the header row; a shared long word earns half credit
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        upper_headers = [h.upper() for h in context["headers"]]
        joined_upper = header_blob.upper()
        found = 0
        for lbl in labels:
            lbl_upper = lbl.upper()
            if any(lbl_upper in h or h in lbl_upper for h in upper_headers):
                found += 1
            elif any(word.upper() in joined_upper for word in lbl.split() if len(word) > 3):
                found += 0.5
        if found > 0:
            bump((found / max(1, len(labels))) * 40, f"Row1 orientation header matches: {found}/{len(labels)}")

    # Declarations special cases
    if schema_name == "Operator Declaration" and first_cell.upper().startswith("PRINT"):
        if "OPERATOR DECLARATION" in context["heading"].upper():
            bump(80, "Operator Declaration context match")
        elif any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            bump(60, "Manager found in cells (likely Operator Declaration)")
    if schema_name == "NHVAS Approved Auditor Declaration" and first_cell.upper().startswith("PRINT"):
        if any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            bump(-50, "Penalty: Manager found (not auditor)")

    return total, notes
def match_table_schema(tbl):
    """Return the name of the best-scoring schema for *tbl*, or None.

    Every entry of TABLE_SCHEMAS is scored against the table's context. The
    first schema with the strictly highest positive score wins, provided
    that score is at least 20.
    """
    context = get_table_context(tbl)
    top_name, top_score = None, 0
    for name, spec in TABLE_SCHEMAS.items():
        score, _reasons = calculate_schema_match_score(name, spec, context)
        if score > top_score:
            top_name, top_score = name, score
    return top_name if top_score >= 20 else None
| # --------------------------------------------------------------------- | |
| # Multi-schema detection & extraction (keeps original behavior) | |
| # --------------------------------------------------------------------- | |
def check_multi_schema_table(tbl):
    """Detect a table that combines operator information and contact details.

    Returns ``["Operator Information", "Operator contact details"]`` when the
    table's first column mentions at least one label from each group
    (case-insensitive substring test); otherwise returns None.
    """
    operator_labels = (
        "Operator name (Legal entity)",
        "NHVAS Accreditation No.",
        "Registered trading name/s",
        "Australian Company Number",
        "NHVAS Manual",
    )
    contact_labels = (
        "Operator business address",
        "Operator Postal address",
        "Email address",
        "Operator Telephone Number",
    )
    first_column = [cell.upper() for cell in get_table_context(tbl)["col0"]]

    def _mentions_any(labels):
        needles = [lbl.upper() for lbl in labels]
        return any(needle in cell for cell in first_column for needle in needles)

    if _mentions_any(operator_labels) and _mentions_any(contact_labels):
        return ["Operator Information", "Operator contact details"]
    return None
def extract_multi_schema_table(tbl, schemas):
    """Extract red text from a table whose rows belong to several schemas.

    The first row is treated as a header and skipped. Every other row is
    assigned, per schema, to the first schema label whose normalised text
    equals, contains, or is contained in the row's first-cell text. All red
    text found in that row is then collected under the label, without
    duplicates.

    Rows with no cells or with an empty first cell are skipped. An empty
    string is a substring of every label, so such rows used to be filed
    under the first label of every schema.

    Args:
        tbl: Table object exposing ``rows`` / ``cells`` / ``paragraphs`` / ``runs``.
        schemas: Schema names to extract; names missing from TABLE_SCHEMAS
            are ignored.

    Returns:
        ``{schema_name: {label: [red_text, ...]}}``, containing only schemas
        and labels that yielded at least one value.
    """
    # Read each data row once: (upper-cased row label, de-duplicated red texts).
    data_rows = []
    for ri, row in enumerate(tbl.rows):
        if ri == 0:
            continue
        cells = row.cells
        if not cells:
            continue
        row_norm = normalize_text(cells[0].text).upper()
        if not row_norm:
            continue
        red_values = []
        for cell in cells:
            red_txt = "".join(
                run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)
            ).strip()
            if red_txt and red_txt not in red_values:
                red_values.append(red_txt)
        if red_values:
            data_rows.append((row_norm, red_values))

    result = {}
    for schema_name in schemas:
        if schema_name not in TABLE_SCHEMAS:
            continue
        spec_labels = [
            (lbl, normalize_text(lbl).upper())
            for lbl in TABLE_SCHEMAS[schema_name].get("labels", [])
        ]
        schema_data = {}
        for row_norm, red_values in data_rows:
            matched_label = None
            for lbl, spec_norm in spec_labels:
                # Guard against a label that normalises to "" matching everything.
                if spec_norm and (spec_norm in row_norm or row_norm in spec_norm):
                    matched_label = lbl
                    break
            if matched_label is None:
                continue
            bucket = schema_data.setdefault(matched_label, [])
            for red_txt in red_values:
                if red_txt not in bucket:
                    bucket.append(red_txt)
        if schema_data:
            result[schema_name] = schema_data
    return result
| # --------------------------------------------------------------------- | |
| # Extraction: special-case for Vehicle Registration tables (row1) and generic fallback | |
| # --------------------------------------------------------------------- | |
def _red_text_in_cell(cell):
    """Return the concatenated, stripped text of all red runs in *cell*."""
    return "".join(
        run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)
    ).strip()


def _match_row_label(raw_label, spec_labels):
    """Return the schema label matching a row's first-cell text, or None.

    An exact (case-insensitive, normalised) match is preferred over a
    containment match. Empty text never matches: an empty string is a
    substring of every label and would otherwise select the first one.
    """
    raw_norm = raw_label.upper()
    if not raw_norm:
        return None
    normalised = [(lbl, normalize_text(lbl).upper()) for lbl in spec_labels]
    for lbl, spec_norm in normalised:
        if spec_norm == raw_norm:
            return lbl
    for lbl, spec_norm in normalised:
        if spec_norm and (spec_norm in raw_norm or raw_norm in spec_norm):
            return lbl
    return None


def _extract_vehicle_registration(tbl, labels):
    """Extract red text column by column from a Vehicle Registration table."""
    print(" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    rows = tbl.rows
    if len(rows) < 2:
        print(" ❌ Vehicle table has less than 2 rows")
        return {}

    header_cells = rows[0].cells
    column_mapping = {}
    print(f" 📋 Mapping {len(header_cells)} header cells to labels")
    for col_idx, cell in enumerate(header_cells):
        header_text = normalize_text(cell.text).strip()
        if not header_text:
            continue
        print(f" Column {col_idx}: '{header_text}'")
        # map_header_to_label already ends with token-overlap scoring, so a
        # second fuzzy pass here could never succeed and has been dropped.
        mapped = map_header_to_label(header_text, labels)
        if not mapped:
            print(f" ⚠️ No mapping found for '{header_text}'")
            continue
        # preserve the schema's own spelling of the label if possible
        canonical = normalize_header_text(mapped)
        chosen = next(
            (cand for cand in labels if normalize_header_text(cand) == canonical),
            None,
        )
        column_mapping[col_idx] = chosen or mapped
        print(f" ✅ Mapped to: '{column_mapping[col_idx]}'")
    print(f" 📊 Total column mappings: {len(column_mapping)}")

    # Extract red text from data rows
    for row_idx in range(1, len(rows)):
        print(f" 📌 Processing data row {row_idx}")
        for col_idx, cell in enumerate(rows[row_idx].cells):
            label = column_mapping.get(col_idx)
            if label is None:
                continue
            red_txt = _red_text_in_cell(cell)
            if not red_txt:
                continue
            print(f" 🔴 Found red text in '{label}': '{red_txt}'")
            if red_txt not in seen.setdefault(label, set()):
                seen[label].add(red_txt)
                collected.setdefault(label, []).append(red_txt)

    result = {k: v for k, v in collected.items() if v}
    print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
    return result


def extract_table_data(tbl, schema_name, spec):
    """Extract red text from *tbl* according to the matched schema.

    Vehicle Registration schemas map each header cell to a label and collect
    red text per column. All other schemas use the generic logic:

    * ``orientation == "row1"``: the first row is a header; the value in
      column *i* belongs to ``spec["labels"][i]``, or to *schema_name* when
      there are more columns than labels.
    * otherwise: the row's first cell names the label (exact match first,
      then containment); rows with no recognisable label, including an empty
      first cell, are filed under *schema_name*.

    Returns:
        ``{label: [red_text, ...]}`` with duplicates removed and empty labels
        omitted.
    """
    # Vehicle registration special handling
    if "Vehicle Registration" in schema_name:
        return _extract_vehicle_registration(tbl, spec.get("labels", []))

    # Generic fallback extraction logic
    spec_labels = spec.get("labels", [])
    labels = spec_labels + [schema_name]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    by_col = spec.get("orientation") == "row1"
    start_row = 1 if by_col else 0

    for row in tbl.rows[start_row:]:
        cells = row.cells
        row_label = None  # resolved lazily, at most once per row
        for ci, cell in enumerate(cells):
            red_txt = _red_text_in_cell(cell)
            if not red_txt:
                continue
            if by_col:
                lbl = spec_labels[ci] if ci < len(spec_labels) else schema_name
            else:
                if row_label is None:
                    raw_label = normalize_text(cells[0].text)
                    row_label = _match_row_label(raw_label, spec_labels) or schema_name
                lbl = row_label
            if red_txt not in seen.setdefault(lbl, set()):
                seen[lbl].add(red_txt)
                collected.setdefault(lbl, []).append(red_txt)

    return {k: v for k, v in collected.items() if v}
| # --------------------------------------------------------------------- | |
| # Main extraction: process all tables then paragraphs | |
| # --------------------------------------------------------------------- | |
def extract_red_text(input_doc):
    """Collect all red text from a document, grouped by table schema and heading.

    Tables are processed first. A table recognised as a combined
    operator/contact table is split across its schemas; any other table is
    matched to a single schema, and tables that match none are skipped.
    Paragraph red text is then grouped under the nearest preceding paragraph
    that matches a main or sub heading pattern. Without such a heading, the
    text goes under "Date" if it fully matches the ``date_line`` pattern, or
    under "(para)" otherwise.

    Args:
        input_doc: Path to a .docx file (str) or an already-opened document.

    Returns:
        ``{schema_name: {label: [values]}}`` plus, when any paragraph holds
        red text, a ``"paragraphs"`` entry mapping context to values. All
        value lists are free of duplicates.
    """
    doc = Document(input_doc) if isinstance(input_doc, str) else input_doc
    out = {}

    def _merge(schema_name, data):
        # merge safely and dedupe
        bucket = out.setdefault(schema_name, {})
        for label, values in data.items():
            merged = bucket.setdefault(label, [])
            for val in values:
                if val not in merged:
                    merged.append(val)

    for tbl in doc.tables:
        multi_schemas = check_multi_schema_table(tbl)
        if multi_schemas:
            multi_data = extract_multi_schema_table(tbl, multi_schemas)
            for schema_name, schema_data in multi_data.items():
                if schema_data:
                    _merge(schema_name, schema_data)
            continue
        schema = match_table_schema(tbl)
        if not schema:
            continue
        data = extract_table_data(tbl, schema, TABLE_SCHEMAS[schema])
        if data:
            _merge(schema, data)

    # Paragraph red-text extraction with context.
    # `doc.paragraphs` builds a fresh list on each access, so indexing it
    # inside a backward scan per red paragraph was needlessly expensive. One
    # forward pass that remembers the most recent heading gives the same
    # "nearest preceding heading" result.
    heading_patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"]
    # No catch-all default: a missing "date_line" pattern must not label
    # every heading-less paragraph as a date.
    date_pattern = PARAGRAPH_PATTERNS.get("date_line")
    paras = {}
    last_heading = None
    for para in doc.paragraphs:
        red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
        if red_txt:
            context = last_heading
            # special-case date-like lines
            if not context and date_pattern and re.fullmatch(date_pattern, red_txt):
                context = "Date"
            if not context:
                context = "(para)"
            bucket = paras.setdefault(context, [])
            if red_txt not in bucket:
                bucket.append(red_txt)
        # This paragraph may itself be the heading for the ones that follow.
        txt = normalize_text(para.text)
        if txt and any(re.search(p, txt, re.IGNORECASE) for p in heading_patterns):
            last_heading = txt

    if paras:
        out["paragraphs"] = paras
    return out
| # --------------------------------------------------------------------- | |
| # File wrapper to support your existing calls | |
| # --------------------------------------------------------------------- | |
def extract_red_text_filelike(input_file, output_file):
    """Extract red text from *input_file* and write it as JSON to *output_file*.

    Args:
        input_file: Path or file-like object accepted by ``Document``; it is
            rewound first when it has a ``seek`` method.
        output_file: Object with a ``write`` method (written to and flushed),
            or a path that is opened for writing as UTF-8.

    Returns:
        The extracted data, as returned by extract_red_text.
    """
    if hasattr(input_file, "seek"):
        input_file.seek(0)
    result = extract_red_text(Document(input_file))

    dump_options = {"indent": 2, "ensure_ascii": False}
    if not hasattr(output_file, "write"):
        # A path was given rather than an open stream.
        with open(output_file, "w", encoding="utf-8") as handle:
            json.dump(result, handle, **dump_options)
        return result

    json.dump(result, output_file, **dump_options)
    output_file.flush()
    return result
| # --------------------------------------------------------------------- | |
| # CLI entrypoint (same as before) | |
| # --------------------------------------------------------------------- | |
if __name__ == "__main__":
    # CLI usage: extract_red_text.py <input.docx> <output.json>
    if len(sys.argv) != 3:
        print("To use as a module: extract_red_text_filelike(input_file, output_file)")
    else:
        input_docx, output_json = sys.argv[1], sys.argv[2]
        doc = Document(input_docx)
        word_data = extract_red_text(doc)
        # write file (values are already deduplicated during extraction)
        with open(output_json, "w", encoding="utf-8") as f:
            json.dump(word_data, f, indent=2, ensure_ascii=False)
        # echo the same JSON to stdout
        print(json.dumps(word_data, indent=2, ensure_ascii=False))