File size: 13,198 Bytes
e8b46b5
24ad2d2
 
 
704d2a2
 
1055fe1
704d2a2
1055fe1
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4b6b63
704d2a2
f4b6b63
704d2a2
 
f4b6b63
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a2fc08
704d2a2
 
 
 
 
 
 
 
f4b6b63
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4b6b63
 
704d2a2
f4b6b63
704d2a2
 
 
 
f4b6b63
 
 
 
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a2fc08
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e8b46b5
 
704d2a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4b6b63
704d2a2
 
 
 
 
 
 
 
1055fe1
704d2a2
 
 
 
 
 
 
f4b6b63
1055fe1
704d2a2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
#!/usr/bin/env python3
"""
extract_red_text.py

Extract red-coloured text (runs, paragraphs and table cells), headings and
table structure from a .docx file and write the result as JSON.

Usage:
    python extract_red_text.py input.docx output.json
"""

from __future__ import annotations
import json
import re
import sys
import logging
from collections import defaultdict
from typing import List, Dict, Optional, Any

# attempt to import python-docx (document processing); it is a hard
# requirement — fail fast with an actionable message if it is missing
try:
    from docx import Document
    from docx.oxml.ns import qn
    from docx.shared import RGBColor
except Exception as e:
    raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e

# ------------------------------
# Import master_key GLOBAL_SETTINGS and optional EXTRA_HEADER_SYNONYMS.
# If the optional master_key module is absent (or raises on import), fall
# back to a self-contained default configuration so the script still runs.
# ------------------------------
try:
    import master_key as mk
    GLOBAL_SETTINGS = getattr(mk, "GLOBAL_SETTINGS", {})
    EXTRA_HEADER_SYNONYMS = getattr(mk, "EXTRA_HEADER_SYNONYMS", None)
except Exception:
    # Fallback settings mirroring the expected master_key.GLOBAL_SETTINGS shape.
    GLOBAL_SETTINGS = {
        # text normalization switches consumed by _normalize_text()
        "normalize": {
            "lower": True,
            "strip_punctuation": True,
            "collapse_whitespace": True,
            "replace_smart_dashes": True
        },
        # (pattern, replacement) pairs applied case-insensitively by
        # _apply_ocr_repair_rules() to undo common OCR misreads
        "ocr_repair_rules": [
            (r"\s*\(\s*Yes\s*/\s*No\s*\)", " (Yes/No)"),
            (r"R[e3]gistrat[i1]on", "Registration"),
            (r"Prin?t", "Print"),
            (r"Accredi[ta]tion", "Accreditation"),
            (r"[^\w\s\-\&\(\)\/:]", " "),
        ],
        # delimiters for splitting compound header strings
        "split_on": [" – ", " - ", ";", "\n", " / "],
        # recognises leading date-like prefixes (e.g. "1st March", "01/02/2024")
        "date_like_pattern": r"^\s*(\d{1,2}(st|nd|rd|th)?\s+[A-Za-z]+|\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\s+[A-Za-z]{3,})",
        # minimum fuzzy-match scores per priority tier — TODO confirm the
        # consumer of these thresholds lives in master_key / downstream code
        "fuzzy_thresholds": {"high_priority": 70, "medium_priority": 60, "low_priority": 45},
        "fuzzy_algorithm": "token_set_ratio",
    }
    EXTRA_HEADER_SYNONYMS = None

# Provide an internal default synonyms map (compact keys -> canonical label)
# This is used only if master_key.EXTRA_HEADER_SYNONYMS is not defined.
_DEFAULT_EXTRA_HEADER_SYNONYMS = {
    # Compact key: canonical label
    # Examples from your logs (long/noisy headers)
    "roadworthinesscertificatesapplicableforentryaudit": "Roadworthiness Certificates",
    "roadworthinesscertificates": "Roadworthiness Certificates",
    "rfsuspensioncertificationn/aifnotapplicable": "RFS Suspension Certification #",
    "rfsuspensioncertification": "RFS Suspension Certification #",
    "maintenanceRecordsrecorddaterangeofrecordsreviewed".lower(): "Maintenance Records",
    "maintenancerecords": "Maintenance Records",
    "faultrecordingreportingonsuspensionsystemdaterange".lower(): "Fault Recording/ Reporting",
    "faultrecordingreporting": "Fault Recording/ Reporting",
    "faultrepairdaterange": "Fault Repair",
    "triprecordsdaterange": "Trip Records",
    # Add common variations
    "registrationnumber": "Registration Number",
    "registrationnumbernumber": "Registration Number",
    "subcontractor(yesno)": "Sub-contractor (Yes/No)",
    "sub-contractor(yes/no)": "Sub-contractor (Yes/No)",
    "nhvrorexemplarglobalauditorregistrationnumber": "NHVR or Exemplar Global Auditor Registration Number",
    "printname": "Print Name",
    "print": "Print Name",
}

# If master_key did not provide EXTRA_HEADER_SYNONYMS, fall back to the
# internal default map defined above.
if EXTRA_HEADER_SYNONYMS is None:
    EXTRA_HEADER_SYNONYMS = _DEFAULT_EXTRA_HEADER_SYNONYMS

# ------------------------------
# Logging: timestamped INFO-level output for this script's logger
# ------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")

# ------------------------------
# Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
# ------------------------------
def _apply_ocr_repair_rules(text: str) -> str:
    """Run every configured OCR-repair regex over *text* (case-insensitive).

    Rules come from GLOBAL_SETTINGS["ocr_repair_rules"]; a rule with an
    invalid pattern is silently skipped rather than aborting the repair.
    """
    repaired = text if text else ""
    for pattern, replacement in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            repaired = re.sub(pattern, replacement, repaired, flags=re.I)
        except re.error:
            continue  # malformed pattern in config — leave text untouched by this rule
    return repaired

def _normalize_text(text: str) -> str:
    """Return *text* OCR-repaired then normalized per GLOBAL_SETTINGS["normalize"].

    Each normalization step (smart-dash replacement, lowercasing, punctuation
    stripping, whitespace collapsing) is applied only when its flag is set.
    """
    result = _apply_ocr_repair_rules(text or "")
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    if cfg.get("replace_smart_dashes", False):
        result = result.replace("–", "-").replace("—", "-")
    if cfg.get("lower", False):
        result = result.lower()
    if cfg.get("strip_punctuation", False):
        # Hyphen, ampersand, parentheses, slash and colon survive; every other
        # punctuation character becomes a space.
        result = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", result)
    if cfg.get("collapse_whitespace", False):
        result = re.sub(r"\s+", " ", result)
    return result.strip()

def _compact_key(text: str) -> str:
    """Reduce *text* to a deterministic lookup key: normalized, then stripped
    of every non-word character."""
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))

def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Deterministically map a (possibly noisy) table header to its canonical
    label via EXTRA_HEADER_SYNONYMS.

    Lookup order: compact key, then normalized key, then a case-insensitive
    scan over every known key. Returns None when no mapping is known.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # direct lookups first — cheapest and most common path
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]
    # last resort: case-insensitive comparison against each known key
    for key, label in EXTRA_HEADER_SYNONYMS.items():
        folded = key.lower()
        if folded == normalized.lower() or folded == compact.lower():
            return label
    return None

# ------------------------------
# Helpers to detect red font runs robustly
# ------------------------------
def _run_is_red(run) -> bool:
    """
    Detect if a run is red. python-docx represents color by run.font.color.
    We check RGB if available, or theme color 'red' as fallback.
    """
    try:
        color = run.font.color
        if color is None:
            return False
        # If RGB is specified
        rgb = getattr(color, "rgb", None)
        if rgb is not None:
            # rgb is a docx.shared.RGBColor or similar. Representable as 'FF0000' or integer tuple
            hexval = ''.join("{:02X}".format(c) for c in rgb) if isinstance(rgb, (tuple, list)) else str(rgb)
            # accept strings containing 'FF0000' or '0000FF'? (we want red)
            # Accept any color where red component is high and others low-ish
            try:
                # If hex-like 'FF0000' -> interpret
                hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval)
                if len(hex_clean) >= 6:
                    r = int(hex_clean[-6:-4], 16)
                    g = int(hex_clean[-4:-2], 16)
                    b = int(hex_clean[-2:], 16)
                    if r >= 150 and g < 120 and b < 120:
                        return True
            except Exception:
                pass
        # fallback: theme color or color.theme_color value
        theme_color = getattr(color, "theme_color", None)
        if theme_color:
            try:
                if str(theme_color).lower().find("red") != -1:
                    return True
            except Exception:
                pass
    except Exception:
        pass
    # final heuristic: if run.font.color.rgb as string contains 'FF' prefix and '00' for others
    try:
        if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
            s = str(run.font.color.rgb)
            if "FF" in s and "0000" in s:
                return True
    except Exception:
        pass
    return False

# ------------------------------
# Extraction: paragraphs, headings, tables
# ------------------------------
def _cell_red_text(cell) -> Optional[str]:
    """Join the stripped text of every red run in *cell*, or None if the cell
    contains no red runs."""
    red_parts: List[str] = []
    for cpara in cell.paragraphs:
        for run in cpara.runs:
            if _run_is_red(run) and (run.text or "").strip():
                red_parts.append((run.text or "").strip())
    return " ".join(red_parts) if red_parts else None


def _extract_table(table, t_index: int) -> Dict[str, Any]:
    """Flatten one docx table into header labels, a cell-text matrix, and a
    parallel matrix of per-cell red text (None where a cell has no red runs)."""
    nrows = len(table.rows)
    ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0

    # Treat the first row as the header row; map noisy header text to a
    # canonical label via EXTRA_HEADER_SYNONYMS where possible.
    headers: List[str] = []
    if nrows > 0:
        for cell in table.rows[0].cells:
            cell_text = cell.text.strip()
            headers.append(map_header_using_extra_synonyms(cell_text) or cell_text)

    rows_text: List[List[str]] = []
    rows_red_cells: List[List[Optional[str]]] = []
    for row in table.rows:
        rows_text.append([cell.text.strip() for cell in row.cells])
        rows_red_cells.append([_cell_red_text(cell) for cell in row.cells])

    return {
        "table_index": t_index,
        "nrows": nrows,
        "ncols": ncols,
        "headers": headers,
        "rows": rows_text,
        "red_cells": rows_red_cells,
    }


def extract_from_docx(path: str) -> Dict[str, Any]:
    """
    Extract headings, red-font text and tables from the .docx file at *path*.

    Returns a JSON-serialisable dict with keys:
        headings   – list of heading paragraph texts
        paragraphs – paragraphs containing at least one red run
        tables     – flattened tables (see _extract_table)
        red_runs   – every red run with its paragraph/char position
        meta       – summary counts plus the source file path

    Raises whatever ``docx.Document`` raises for unreadable input.
    """
    doc = Document(path)
    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []

    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        # A paragraph counts as a heading when its style looks like one, or
        # its text starts with a known NHVAS section keyword.
        is_heading = (
            bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I))
            or bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
        )
        if is_heading:
            headings.append(text.strip())

        # Collect red runs, tracking each run's character offset within the
        # paragraph (offset advances over non-red runs too).
        paragraph_red_texts: List[str] = []
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            if _run_is_red(run) and run_text.strip():
                red_runs.append({
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                })
                paragraph_red_texts.append(run_text)
            char_cursor += len(run_text)
        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
            })

    tables_out = [_extract_table(table, t_index) for t_index, table in enumerate(doc.tables)]

    return {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # helpful metadata for downstream processing
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
        },
    }

# ------------------------------
# Command-line interface
# ------------------------------
def main(argv):
    """CLI entry point: extract red text from argv[1] (a .docx) and write the
    result as pretty-printed JSON to argv[2]. Exits with status 2 on bad usage."""
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)
    input_docx, output_json = argv[1], argv[2]

    log.info("Extracting red text from: %s", input_docx)
    try:
        extraction = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise

    # Pretty-printed JSON by default to ease debugging of the output.
    try:
        with open(output_json, "w", encoding="utf-8") as handle:
            json.dump(extraction, handle, ensure_ascii=False, indent=2)
        log.info("Saved extracted word JSON to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise

    # Short summary for logs / quick verification.
    log.info(
        "Headings found: %d, Red paragraphs: %d, Tables: %d, Red runs: %d",
        len(extraction.get("headings", [])),
        len(extraction.get("paragraphs", [])),
        len(extraction.get("tables", [])),
        len(extraction.get("red_runs", [])),
    )

# Script entry point: pass the raw argv list through to main()
if __name__ == "__main__":
    main(sys.argv)