#!/usr/bin/env python3
"""extract_red_text.py

Extract red-font text (headings, paragraphs and table cells) from a .docx
file and write it to JSON for downstream processing.
"""

from __future__ import annotations

import json
import re
import sys
import logging
from typing import List, Dict, Optional, Any

# Attempt to import python-docx (document processing).
try:
    from docx import Document
except Exception as e:
    raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e

# ------------------------------
# Import master_key GLOBAL_SETTINGS and optional EXTRA_HEADER_SYNONYMS
# ------------------------------
try:
    import master_key as mk
    GLOBAL_SETTINGS = getattr(mk, "GLOBAL_SETTINGS", {})
    EXTRA_HEADER_SYNONYMS = getattr(mk, "EXTRA_HEADER_SYNONYMS", None)
except Exception:
    GLOBAL_SETTINGS = {
        "normalize": {
            "lower": True,
            "strip_punctuation": True,
            "collapse_whitespace": True,
            "replace_smart_dashes": True,
        },
        "ocr_repair_rules": [
            (r"\s*\(\s*Yes\s*/\s*No\s*\)", " (Yes/No)"),
            (r"R[e3]gistrat[i1]on", "Registration"),
            (r"Prin?t", "Print"),
            (r"Accredi[ta]tion", "Accreditation"),
            (r"[^\w\s\-\&\(\)\/:]", " "),
        ],
        "split_on": [" – ", " - ", ";", "\n", " / "],
        "date_like_pattern": r"^\s*(\d{1,2}(st|nd|rd|th)?\s+[A-Za-z]+|\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\s+[A-Za-z]{3,})",
        "fuzzy_thresholds": {"high_priority": 70, "medium_priority": 60, "low_priority": 45},
        "fuzzy_algorithm": "token_set_ratio",
    }
    EXTRA_HEADER_SYNONYMS = None

# Internal default synonyms map (compact key -> canonical label).
# Used only if master_key.EXTRA_HEADER_SYNONYMS is not defined.
_DEFAULT_EXTRA_HEADER_SYNONYMS = {
    # Long/noisy headers observed in the logs
    "roadworthinesscertificatesapplicableforentryaudit": "Roadworthiness Certificates",
    "roadworthinesscertificates": "Roadworthiness Certificates",
    "rfsuspensioncertificationn/aifnotapplicable": "RFS Suspension Certification #",
    "rfsuspensioncertification": "RFS Suspension Certification #",
    "maintenancerecordsrecorddaterangeofrecordsreviewed": "Maintenance Records",
    "maintenancerecords": "Maintenance Records",
    "faultrecordingreportingonsuspensionsystemdaterange": "Fault Recording/ Reporting",
    "faultrecordingreporting": "Fault Recording/ Reporting",
    "faultrepairdaterange": "Fault Repair",
    "triprecordsdaterange": "Trip Records",
    # Common variations
    "registrationnumber": "Registration Number",
    "registrationnumbernumber": "Registration Number",
    "subcontractor(yesno)": "Sub-contractor (Yes/No)",
    "sub-contractor(yes/no)": "Sub-contractor (Yes/No)",
    "nhvrorexemplarglobalauditorregistrationnumber": "NHVR or Exemplar Global Auditor Registration Number",
    "printname": "Print Name",
    "print": "Print Name",
}

# Fall back to the internal map when master_key does not provide one.
# Keys supplied by master_key are used as-is; they are expected to already
# be in compact form.
if EXTRA_HEADER_SYNONYMS is None:
    EXTRA_HEADER_SYNONYMS = _DEFAULT_EXTRA_HEADER_SYNONYMS

# ------------------------------
# Logging
# ------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")

# ------------------------------
# Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
# ------------------------------
def _apply_ocr_repair_rules(text: str) -> str:
    """Apply the regex repair rules from GLOBAL_SETTINGS, skipping invalid patterns."""
    s = text or ""
    for pat, repl in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            s = re.sub(pat, repl, s, flags=re.I)
        except re.error:
            # Skip an invalid rule rather than abort the whole repair pass.
            continue
    return s
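
# Illustrative sketch only (assumes the fallback ocr_repair_rules above are in
# effect): shows one repair pass end-to-end. This helper is not called by the
# extraction pipeline and is safe to delete.
def _demo_ocr_repair() -> None:
    """Tiny demonstration of the OCR repair pass."""
    fixed = _apply_ocr_repair_rules("R3gistration Number ( Yes / No )")
    # With the fallback rules this yields "Registration Number (Yes/No)".
    log.debug("OCR repair demo: %r", fixed)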
def _normalize_text(text: str) -> str:
    """Normalize text according to GLOBAL_SETTINGS (readable normalized form)."""
    s = _apply_ocr_repair_rules(text or "")
    norm_cfg = GLOBAL_SETTINGS.get("normalize", {})
    if norm_cfg.get("replace_smart_dashes", False):
        s = s.replace("–", "-").replace("—", "-")
    if norm_cfg.get("lower", False):
        s = s.lower()
    if norm_cfg.get("strip_punctuation", False):
        # Keep hyphen, ampersand, parentheses, slash, colon; drop other punctuation.
        s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s)
    if norm_cfg.get("collapse_whitespace", False):
        s = re.sub(r"\s+", " ", s)
    return s.strip()


def _compact_key(text: str) -> str:
    """Create a compact key (no non-word characters) for deterministic lookup."""
    if text is None:
        return ""
    normalized = _normalize_text(text)
    return re.sub(r"[^\w]", "", normalized)


def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """
    Try deterministic mapping using EXTRA_HEADER_SYNONYMS.
    Return the canonical label if found, else None.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # Try the compact key first.
    if compact in EXTRA_HEADER_SYNONYMS:
        return EXTRA_HEADER_SYNONYMS[compact]
    # Then the normalized key directly.
    if normalized in EXTRA_HEADER_SYNONYMS:
        return EXTRA_HEADER_SYNONYMS[normalized]
    # Finally, a case-insensitive match on the keys.
    for k, v in EXTRA_HEADER_SYNONYMS.items():
        if k.lower() == normalized.lower() or k.lower() == compact.lower():
            return v
    return None
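
# Illustrative sketch only (assumes the fallback GLOBAL_SETTINGS and the
# default synonym map above): a noisy header collapses to a compact key,
# which then resolves to a canonical label. Not called by the pipeline.
def _demo_header_mapping() -> None:
    """Demonstrates the normalize -> compact -> lookup chain; safe to delete."""
    noisy = "Registrat1on  Number"
    assert _compact_key(noisy) == "registrationnumber"
    assert map_header_using_extra_synonyms(noisy) == "Registration Number"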
# ------------------------------
# Helpers to detect red font runs robustly
# ------------------------------
def _run_is_red(run) -> bool:
    """
    Detect whether a run is red. python-docx exposes colour via run.font.color.
    We check the RGB value if available, falling back to the theme colour.
    """
    try:
        color = run.font.color
        if color is None:
            return False
        # If an explicit RGB value is specified...
        rgb = getattr(color, "rgb", None)
        if rgb is not None:
            # rgb is a docx.shared.RGBColor (a tuple subclass) or similar;
            # render it as hex, e.g. 'FF0000'.
            hexval = "".join("{:02X}".format(c) for c in rgb) if isinstance(rgb, (tuple, list)) else str(rgb)
            # Accept any colour whose red component is high and the others low-ish.
            try:
                hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval)
                if len(hex_clean) >= 6:
                    r = int(hex_clean[-6:-4], 16)
                    g = int(hex_clean[-4:-2], 16)
                    b = int(hex_clean[-2:], 16)
                    if r >= 150 and g < 120 and b < 120:
                        return True
            except Exception:
                pass
        # Fallback: a theme colour whose name mentions red.
        theme_color = getattr(color, "theme_color", None)
        if theme_color:
            try:
                if str(theme_color).lower().find("red") != -1:
                    return True
            except Exception:
                pass
    except Exception:
        pass
    # Final heuristic: a hex string of pure red, 'FF0000'. Checking both ends
    # avoids misclassifying blue ('0000FF'), which also contains 'FF' and '0000'.
    try:
        if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
            s = str(run.font.color.rgb).upper()
            if s.startswith("FF") and s.endswith("0000"):
                return True
    except Exception:
        pass
    return False
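
# Illustrative sketch only: the RGB component test above restated in isolation,
# so the thresholds can be probed without a .docx file. The colour strings in
# the example comment are hypothetical. Not called by the pipeline.
def _demo_red_threshold(hexval: str) -> bool:
    """Re-statement of the component check used in _run_is_red; safe to delete."""
    hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval)
    if len(hex_clean) < 6:
        return False
    r = int(hex_clean[-6:-4], 16)
    g = int(hex_clean[-4:-2], 16)
    b = int(hex_clean[-2:], 16)
    return r >= 150 and g < 120 and b < 120
# e.g. _demo_red_threshold("FF0000") -> True, "C00000" -> True, "0000FF" -> False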
# ------------------------------
# Extraction: paragraphs, headings, tables
# ------------------------------
def extract_from_docx(path: str) -> Dict[str, Any]:
    doc = Document(path)

    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    tables_out: List[Dict[str, Any]] = []

    # Extract headings and paragraphs with red runs.
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        # Identify a heading from the style name, if available, or from
        # well-known section keywords at the start of the paragraph.
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or \
            bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
        if is_heading:
            headings.append(text.strip())

        # Gather red runs in this paragraph, tracking character offsets.
        paragraph_red_texts = []
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            run_len = len(run_text)
            if _run_is_red(run) and run_text.strip():
                # Store a red run entry.
                rr = {
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                }
                red_runs.append(rr)
                paragraph_red_texts.append(run_text)
            char_cursor += run_len

        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
            })

    # Extract tables.
    for t_index, table in enumerate(doc.tables):
        # Convert the table to a simple cell-text matrix.
        nrows = len(table.rows)
        ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0
        headers = []
        rows_text = []
        rows_red_cells = []

        # Treat the first row as the header row, mapping each cell to a
        # canonical label where a synonym is known.
        header_row = table.rows[0] if nrows > 0 else None
        if header_row:
            for cell in header_row.cells:
                cell_text = cell.text.strip()
                # Normalize & map using EXTRA_HEADER_SYNONYMS.
                mapped = map_header_using_extra_synonyms(cell_text)
                headers.append(mapped if mapped else cell_text)

        # Process all rows into parallel text / red-text matrices.
        for row in table.rows:
            row_texts = []
            row_reds = []
            for cell in row.cells:
                ct = cell.text.strip()
                # Gather red text from the runs in this cell; a cell may
                # contain several paragraphs.
                red_in_cell = []
                for cpara in cell.paragraphs:
                    for run in cpara.runs:
                        if _run_is_red(run) and (run.text or "").strip():
                            red_in_cell.append((run.text or "").strip())
                # Join multiple red runs into a single string.
                red_text_joined = " ".join(red_in_cell) if red_in_cell else None
                row_texts.append(ct)
                row_reds.append(red_text_joined)
            rows_text.append(row_texts)
            rows_red_cells.append(row_reds)

        tables_out.append({
            "table_index": t_index,
            "nrows": nrows,
            "ncols": ncols,
            "headers": headers,
            "rows": rows_text,
            "red_cells": rows_red_cells,
        })

    # Assemble the output structure.
    out = {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # Helpful metadata for downstream processing.
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
        },
    }
    return out


# ------------------------------
# Command-line interface
# ------------------------------
def main(argv):
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)

    input_docx = argv[1]
    output_json = argv[2]
    log.info("Extracting red text from: %s", input_docx)
    try:
        result = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise

    # Save the JSON pretty-printed for debugging by default.
    try:
        with open(output_json, "w", encoding="utf-8") as fh:
            json.dump(result, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted word JSON to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise

    # Print a short summary for logs / quick verification.
    log.info(
        "Headings found: %d, Red paragraphs: %d, Tables: %d, Red runs: %d",
        len(result.get("headings", [])),
        len(result.get("paragraphs", [])),
        len(result.get("tables", [])),
        len(result.get("red_runs", [])),
    )


if __name__ == "__main__":
    main(sys.argv)
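
# Example invocation (file names are hypothetical):
#   python extract_red_text.py "NHVAS_Audit_Summary.docx" extracted_red.json
# The resulting JSON mirrors the structure assembled in extract_from_docx():
# top-level "headings", "paragraphs", "tables", "red_runs" and "meta".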