# NOTE(review): the lines "Spaces:" / "Running" below this point in the original
# paste were export artifacts, not part of the Python source; kept as comments.
| #!/usr/bin/env python3 | |
| """ | |
| extract_red_text.py | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import sys | |
| import logging | |
| from collections import defaultdict | |
| from typing import List, Dict, Optional, Any | |
| # attempt to import python-docx (document processing) | |
| try: | |
| from docx import Document | |
| from docx.oxml.ns import qn | |
| from docx.shared import RGBColor | |
| except Exception as e: | |
| raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e | |
# ------------------------------
# Import master_key GLOBAL_SETTINGS and optional EXTRA_HEADER_SYNONYMS
# ------------------------------
try:
    import master_key as mk
    GLOBAL_SETTINGS = getattr(mk, "GLOBAL_SETTINGS", {})
    EXTRA_HEADER_SYNONYMS = getattr(mk, "EXTRA_HEADER_SYNONYMS", None)
except Exception:
    # master_key is unavailable: fall back to the built-in defaults below
    GLOBAL_SETTINGS = {
        "normalize": {
            "lower": True,
            "strip_punctuation": True,
            "collapse_whitespace": True,
            "replace_smart_dashes": True,
        },
        "ocr_repair_rules": [
            (r"\s*\(\s*Yes\s*/\s*No\s*\)", " (Yes/No)"),
            (r"R[e3]gistrat[i1]on", "Registration"),
            (r"Prin?t", "Print"),
            (r"Accredi[ta]tion", "Accreditation"),
            (r"[^\w\s\-\&\(\)\/:]", " "),
        ],
        "split_on": [" – ", " - ", ";", "\n", " / "],
        "date_like_pattern": r"^\s*(\d{1,2}(st|nd|rd|th)?\s+[A-Za-z]+|\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\s+[A-Za-z]{3,})",
        "fuzzy_thresholds": {"high_priority": 70, "medium_priority": 60, "low_priority": 45},
        "fuzzy_algorithm": "token_set_ratio",
    }
    EXTRA_HEADER_SYNONYMS = None

# Built-in synonym table (compact header key -> canonical label). Only consulted
# when master_key does not supply EXTRA_HEADER_SYNONYMS.
_DEFAULT_EXTRA_HEADER_SYNONYMS = {
    # Long/noisy headers observed in real documents
    "roadworthinesscertificatesapplicableforentryaudit": "Roadworthiness Certificates",
    "roadworthinesscertificates": "Roadworthiness Certificates",
    "rfsuspensioncertificationn/aifnotapplicable": "RFS Suspension Certification #",
    "rfsuspensioncertification": "RFS Suspension Certification #",
    "maintenancerecordsrecorddaterangeofrecordsreviewed": "Maintenance Records",
    "maintenancerecords": "Maintenance Records",
    "faultrecordingreportingonsuspensionsystemdaterange": "Fault Recording/ Reporting",
    "faultrecordingreporting": "Fault Recording/ Reporting",
    "faultrepairdaterange": "Fault Repair",
    "triprecordsdaterange": "Trip Records",
    # Common short variations
    "registrationnumber": "Registration Number",
    "registrationnumbernumber": "Registration Number",
    "subcontractor(yesno)": "Sub-contractor (Yes/No)",
    "sub-contractor(yes/no)": "Sub-contractor (Yes/No)",
    "nhvrorexemplarglobalauditorregistrationnumber": "NHVR or Exemplar Global Auditor Registration Number",
    "printname": "Print Name",
    "print": "Print Name",
}

# Fall back to the built-in table when master_key supplied nothing.
if EXTRA_HEADER_SYNONYMS is None:
    EXTRA_HEADER_SYNONYMS = _DEFAULT_EXTRA_HEADER_SYNONYMS
# ------------------------------
# Logging
# ------------------------------
# Module-level logger; basicConfig is a no-op if the host app configured logging first.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")
| # ------------------------------ | |
| # Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS) | |
| # ------------------------------ | |
def _apply_ocr_repair_rules(text: str) -> str:
    """Apply every configured OCR-repair regex to *text*, case-insensitively.

    Rules come from GLOBAL_SETTINGS["ocr_repair_rules"]; a rule whose pattern
    fails to compile is skipped rather than aborting the whole repair pass.
    """
    repaired = text or ""
    for pattern, replacement in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
        try:
            repaired = re.sub(pattern, replacement, repaired, flags=re.I)
        except re.error:
            continue  # malformed rule: ignore it and keep going
    return repaired
def _normalize_text(text: str) -> str:
    """Return a readable normalized form of *text* per GLOBAL_SETTINGS["normalize"].

    OCR repair runs first; then smart-dash replacement, lowercasing,
    punctuation stripping and whitespace collapsing are each applied only
    when enabled in the config.
    """
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    out = _apply_ocr_repair_rules(text or "")
    if cfg.get("replace_smart_dashes", False):
        out = out.replace("–", "-").replace("—", "-")
    if cfg.get("lower", False):
        out = out.lower()
    if cfg.get("strip_punctuation", False):
        # keep hyphen, ampersand, parentheses, slash, colon; drop other punctuation
        out = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", out)
    if cfg.get("collapse_whitespace", False):
        out = re.sub(r"\s+", " ", out)
    return out.strip()
def _compact_key(text: str) -> str:
    """Normalize *text* and strip every non-word character.

    Produces the compact deterministic key used for synonym lookups;
    None yields the empty string.
    """
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))
def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Deterministically map a (possibly noisy) header to its canonical label.

    Looks the header up in EXTRA_HEADER_SYNONYMS by compact key, then by
    normalized text, then by a case-insensitive scan over all keys.
    Returns the canonical label, or None when nothing matches.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # direct hits: compact form first, then the normalized form
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]
    # last resort: case-insensitive comparison against every synonym key
    normalized_cf = normalized.lower()
    compact_cf = compact.lower()
    for key, label in EXTRA_HEADER_SYNONYMS.items():
        key_cf = key.lower()
        if key_cf == normalized_cf or key_cf == compact_cf:
            return label
    return None
| # ------------------------------ | |
| # Helpers to detect red font runs robustly | |
| # ------------------------------ | |
| def _run_is_red(run) -> bool: | |
| """ | |
| Detect if a run is red. python-docx represents color by run.font.color. | |
| We check RGB if available, or theme color 'red' as fallback. | |
| """ | |
| try: | |
| color = run.font.color | |
| if color is None: | |
| return False | |
| # If RGB is specified | |
| rgb = getattr(color, "rgb", None) | |
| if rgb is not None: | |
| # rgb is a docx.shared.RGBColor or similar. Representable as 'FF0000' or integer tuple | |
| hexval = ''.join("{:02X}".format(c) for c in rgb) if isinstance(rgb, (tuple, list)) else str(rgb) | |
| # accept strings containing 'FF0000' or '0000FF'? (we want red) | |
| # Accept any color where red component is high and others low-ish | |
| try: | |
| # If hex-like 'FF0000' -> interpret | |
| hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval) | |
| if len(hex_clean) >= 6: | |
| r = int(hex_clean[-6:-4], 16) | |
| g = int(hex_clean[-4:-2], 16) | |
| b = int(hex_clean[-2:], 16) | |
| if r >= 150 and g < 120 and b < 120: | |
| return True | |
| except Exception: | |
| pass | |
| # fallback: theme color or color.theme_color value | |
| theme_color = getattr(color, "theme_color", None) | |
| if theme_color: | |
| try: | |
| if str(theme_color).lower().find("red") != -1: | |
| return True | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
| # final heuristic: if run.font.color.rgb as string contains 'FF' prefix and '00' for others | |
| try: | |
| if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None: | |
| s = str(run.font.color.rgb) | |
| if "FF" in s and "0000" in s: | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
| # ------------------------------ | |
| # Extraction: paragraphs, headings, tables | |
| # ------------------------------ | |
def extract_from_docx(path: str) -> Dict[str, Any]:
    """
    Extract headings, red-font paragraphs/runs and table contents from a .docx.

    Parameters:
        path: filesystem path of the input document.

    Returns a dict with keys:
        headings   -- texts of paragraphs that look like headings
        paragraphs -- paragraphs containing at least one red run
        tables     -- per-table cell-text matrix plus parallel red-text matrix
        red_runs   -- every red run with its paragraph index and char offset
        meta       -- summary counts and the source path
    """
    doc = Document(path)
    headings, paragraphs_red, red_runs = _collect_paragraphs(doc)
    tables_out = _collect_tables(doc)
    return {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # helpful metadata for downstream processing
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs),
        },
    }


def _collect_paragraphs(doc):
    """Scan document paragraphs; return (headings, paragraphs_with_red, red_runs)."""
    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    # hoisted out of the loop: heading detection by style name or leading keyword
    heading_style_re = re.compile(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", flags=re.I)
    heading_text_re = re.compile(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", flags=re.I)
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        if heading_style_re.search(style_name) or heading_text_re.search(text):
            headings.append(text.strip())
        paragraph_red_texts = []
        char_cursor = 0  # character offset of the current run within the paragraph
        for run in para.runs:
            run_text = run.text or ""
            if _run_is_red(run) and run_text.strip():
                red_runs.append({
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name,
                })
                paragraph_red_texts.append(run_text)
            char_cursor += len(run_text)
        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name,
            })
    return headings, paragraphs_red, red_runs


def _collect_tables(doc):
    """Convert each table into a cell-text matrix plus a parallel red-text matrix."""
    tables_out: List[Dict[str, Any]] = []
    for t_index, table in enumerate(doc.tables):
        nrows = len(table.rows)
        ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0
        # first row is treated as the header row; map noisy header text to canonical labels
        headers = []
        if nrows > 0:
            for cell in table.rows[0].cells:
                cell_text = cell.text.strip()
                headers.append(map_header_using_extra_synonyms(cell_text) or cell_text)
        rows_text = []
        rows_red_cells = []
        for row in table.rows:
            row_texts = []
            row_reds = []
            for cell in row.cells:
                # a cell may span several paragraphs; collect red runs from all of them
                red_in_cell = [
                    (run.text or "").strip()
                    for cpara in cell.paragraphs
                    for run in cpara.runs
                    if _run_is_red(run) and (run.text or "").strip()
                ]
                row_texts.append(cell.text.strip())
                # join multiple red runs into one string; None marks a cell with no red text
                row_reds.append(" ".join(red_in_cell) if red_in_cell else None)
            rows_text.append(row_texts)
            rows_red_cells.append(row_reds)
        tables_out.append({
            "table_index": t_index,
            "nrows": nrows,
            "ncols": ncols,
            "headers": headers,
            "rows": rows_text,
            "red_cells": rows_red_cells,
        })
    return tables_out
| # ------------------------------ | |
| # Command-line interface | |
| # ------------------------------ | |
def main(argv):
    """CLI entry point: read argv[1] (.docx), write extracted JSON to argv[2].

    Exits with status 2 on bad usage; re-raises extraction or write failures
    after logging them so the caller sees a non-zero exit.
    """
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)
    input_docx, output_json = argv[1], argv[2]

    log.info("Extracting red text from: %s", input_docx)
    try:
        result = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise

    # Save JSON pretty-printed for debugging by default
    try:
        with open(output_json, "w", encoding="utf-8") as fh:
            json.dump(result, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted word JSON to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise

    # Short summary for logs / quick verification
    log.info(
        "Headings found: %d, Red paragraphs: %d, Tables: %d, Red runs: %d",
        len(result.get("headings", [])),
        len(result.get("paragraphs", [])),
        len(result.get("tables", [])),
        len(result.get("red_runs", [])),
    )


if __name__ == "__main__":
    main(sys.argv)