#!/usr/bin/env python3 """ extract_red_text.py - Enhanced version with improved red text detection and master key alignment """ from __future__ import annotations import json import re import sys import logging from collections import defaultdict from typing import List, Dict, Optional, Any, Tuple # attempt to import python-docx (document processing) try: from docx import Document from docx.oxml.ns import qn from docx.shared import RGBColor except Exception as e: raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e # ------------------------------ # Import master_key configurations # ------------------------------ try: import master_key as mk GLOBAL_SETTINGS = mk.GLOBAL_SETTINGS EXTRA_HEADER_SYNONYMS = mk.EXTRA_HEADER_SYNONYMS TABLE_SCHEMAS = getattr(mk, "TABLE_SCHEMAS", {}) except ImportError as e: logging.error("Failed to import master_key.py: %s", e) raise RuntimeError("master_key.py is required for configuration") from e except AttributeError as e: logging.error("Missing required configuration in master_key.py: %s", e) raise RuntimeError("master_key.py missing required GLOBAL_SETTINGS or EXTRA_HEADER_SYNONYMS") from e # ------------------------------ # Logging # ------------------------------ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") log = logging.getLogger("extract_red_text") # ------------------------------ # Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS) # ------------------------------ def _apply_ocr_repair_rules(text: str) -> str: """Apply OCR repair rules from GLOBAL_SETTINGS.""" s = text or "" for pat, repl in GLOBAL_SETTINGS.get("ocr_repair_rules", []): try: s = re.sub(pat, repl, s, flags=re.I) except re.error: # skip invalid rule continue return s def _normalize_text(text: str) -> str: """Normalize text according to GLOBAL_SETTINGS (readable normalized form).""" s = _apply_ocr_repair_rules(text or "") norm_cfg = GLOBAL_SETTINGS.get("normalize", {}) if norm_cfg.get("replace_smart_dashes", False): s = s.replace("–", "-").replace("—", "-") if norm_cfg.get("lower", False): s = s.lower() if norm_cfg.get("strip_punctuation", False): # keep hyphen, ampersand, parentheses, slash, colon; drop other punctuation s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s) if norm_cfg.get("collapse_whitespace", False): s = re.sub(r"\s+", " ", s) return s.strip() def _compact_key(text: str) -> str: """Create compact key (no non-word chars) for deterministic lookup.""" if text is None: return "" normalized = _normalize_text(text) return re.sub(r"[^\w]", "", normalized) def map_header_using_extra_synonyms(header_text: str) -> Optional[str]: """ Try deterministic mapping using EXTRA_HEADER_SYNONYMS. Return canonical label if found, else None. """ if not header_text: return None normalized = _normalize_text(header_text) compact = _compact_key(header_text) # try compact key if compact in EXTRA_HEADER_SYNONYMS: return EXTRA_HEADER_SYNONYMS[compact] # try normalized key directly if normalized in EXTRA_HEADER_SYNONYMS: return EXTRA_HEADER_SYNONYMS[normalized] # also try case-insensitive match on keys for k, v in EXTRA_HEADER_SYNONYMS.items(): if k.lower() == normalized.lower() or k.lower() == compact.lower(): return v return None # ------------------------------ # Enhanced red font detection using hf_utils pattern # ------------------------------ def _run_is_red(run) -> bool: """ Enhanced red color detection for docx.run objects. Uses multiple methods to detect red text robustly. """ try: # Method 1: Check run.font.color.rgb col = getattr(run.font, "color", None) if col is not None and getattr(col, "rgb", None): rgb = col.rgb try: # rgb may be sequence-like or have attributes if hasattr(rgb, '__getitem__'): # sequence-like r, g, b = rgb[0], rgb[1], rgb[2] else: # attribute access r = getattr(rgb, "r", None) or getattr(rgb, "red", None) g = getattr(rgb, "g", None) or getattr(rgb, "green", None) b = getattr(rgb, "b", None) or getattr(rgb, "blue", None) if r is not None and g is not None and b is not None: # Tolerant heuristic: red must be noticeably higher than green/blue if r >= 160 and g <= 120 and b <= 120 and (r - g) >= 30 and (r - b) >= 30: return True except Exception: pass except Exception: pass # Method 2: Check raw XML color code try: rPr = run._element.rPr if rPr is not None: clr = rPr.find('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}color') if clr is not None: val = clr.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val') if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val): rr = int(val[:2], 16) gg = int(val[2:4], 16) bb = int(val[4:], 16) if rr >= 160 and gg <= 120 and bb <= 120 and (rr - gg) >= 30 and (rr - bb) >= 30: return True except Exception: pass # Method 3: Check theme color try: color = run.font.color if color is not None: theme_color = getattr(color, "theme_color", None) if theme_color: theme_str = str(theme_color).lower() if "red" in theme_str or "accent_2" in theme_str: # Common red theme return True except Exception: pass # Method 4: String representation fallback try: if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None: s = str(run.font.color.rgb) # Look for patterns like "FF0000" or similar high-red values if re.search(r"[Ff]{2}0{4}|[Ee]{2}0{4}|[Dd]{2}0{4}", s): return True except Exception: pass return False def _extract_red_text_segments(cell): """Extract red text segments from a table cell.""" segments = [] for p_idx, paragraph in enumerate(cell.paragraphs): current_text = "" current_runs = [] for r_idx, run in enumerate(paragraph.runs): if _run_is_red(run) and run.text.strip(): current_text += run.text current_runs.append((p_idx, r_idx, run)) else: # End of red segment if current_runs: segments.append({ 'text': current_text.strip(), 'runs': current_runs.copy(), 'paragraph_idx': p_idx }) current_text = "" current_runs = [] # Handle segment at end of paragraph if current_runs: segments.append({ 'text': current_text.strip(), 'runs': current_runs.copy(), 'paragraph_idx': p_idx }) return segments def _has_red_text(cell) -> bool: """Check if a cell contains any red text.""" for paragraph in cell.paragraphs: for run in paragraph.runs: if _run_is_red(run) and run.text.strip(): return True return False # ------------------------------ # Enhanced table processing with schema-aware header mapping # ------------------------------ def _process_table_with_schema_mapping(table, t_index: int) -> Dict[str, Any]: """Process table with enhanced header mapping using master key schemas.""" nrows = len(table.rows) ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0 if nrows == 0: return { "table_index": t_index, "nrows": 0, "ncols": 0, "headers": [], "rows": [], "red_cells": [], "mapped_headers": [] } # Process headers from first row header_row = table.rows[0] headers = [] mapped_headers = [] for c_idx, cell in enumerate(header_row.cells[:ncols]): cell_text = cell.text.strip() # Try mapping using EXTRA_HEADER_SYNONYMS first mapped = map_header_using_extra_synonyms(cell_text) if mapped: header_label = mapped log.debug(f"Mapped header '{cell_text}' -> '{mapped}'") else: header_label = cell_text headers.append(cell_text) # Original header mapped_headers.append(header_label) # Mapped header # Process all rows rows_text = [] rows_red_cells = [] rows_red_metadata = [] for r_i, row in enumerate(table.rows): row_texts = [] row_reds = [] row_red_meta = [] for c_i, cell in enumerate(row.cells[:ncols]): cell_text = cell.text.strip() # Extract red text segments with metadata red_segments = _extract_red_text_segments(cell) if red_segments: # Join all red text segments red_text_parts = [seg['text'] for seg in red_segments if seg['text']] red_text_joined = " ".join(red_text_parts).strip() # Store metadata about red text location red_metadata = { "has_red": True, "red_text": red_text_joined, "segments": len(red_segments), "total_red_runs": sum(len(seg['runs']) for seg in red_segments) } else: red_text_joined = None red_metadata = {"has_red": False} row_texts.append(cell_text) row_reds.append(red_text_joined) row_red_meta.append(red_metadata) rows_text.append(row_texts) rows_red_cells.append(row_reds) rows_red_metadata.append(row_red_meta) return { "table_index": t_index, "nrows": nrows, "ncols": ncols, "headers": headers, # Original headers "mapped_headers": mapped_headers, # Mapped headers "rows": rows_text, "red_cells": rows_red_cells, "red_metadata": rows_red_metadata # Additional red text metadata } # ------------------------------ # Extraction: paragraphs, headings, tables # ------------------------------ def extract_from_docx(path: str) -> Dict[str, Any]: """Extract content from DOCX with enhanced red text detection and schema mapping.""" log.info(f"Opening document: {path}") doc = Document(path) headings: List[str] = [] paragraphs_red: List[Dict[str, Any]] = [] red_runs: List[Dict[str, Any]] = [] tables_out: List[Dict[str, Any]] = [] # Extract headings and paragraphs with red runs log.info("Processing paragraphs and headings...") for p_index, para in enumerate(doc.paragraphs): text = para.text or "" # Identify heading level from style name if available style_name = getattr(para.style, "name", "") if para.style is not None else "" is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or \ bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I)) if is_heading: headings.append(text.strip()) log.debug(f"Found heading: {text.strip()}") # Gather red runs in this paragraph paragraph_red_texts = [] char_cursor = 0 for run in para.runs: run_text = run.text or "" run_len = len(run_text) if _run_is_red(run) and run_text.strip(): # Store a red run entry rr = { "text": run_text, "paragraph_index": p_index, "char_index": char_cursor, "style_name": style_name, "normalized_text": _normalize_text(run_text) } red_runs.append(rr) paragraph_red_texts.append(run_text) log.debug(f"Found red text in paragraph {p_index}: '{run_text.strip()}'") char_cursor += run_len if paragraph_red_texts: paragraphs_red.append({ "paragraph_index": p_index, "text": text, "red_texts": paragraph_red_texts, "style_name": style_name, "red_text_joined": " ".join(paragraph_red_texts).strip() }) # Extract tables with enhanced processing log.info(f"Processing {len(doc.tables)} tables...") for t_index, table in enumerate(doc.tables): table_data = _process_table_with_schema_mapping(table, t_index) tables_out.append(table_data) # Log red text findings red_cell_count = sum(1 for row in table_data["red_cells"] for cell in row if cell) if red_cell_count > 0: log.info(f"Table {t_index}: Found {red_cell_count} cells with red text") # Assemble output structure out = { "headings": headings, "paragraphs": paragraphs_red, "tables": tables_out, "red_runs": red_runs, # Enhanced metadata "meta": { "source_file": path, "total_headings": len(headings), "total_red_paragraphs": len(paragraphs_red), "total_tables": len(tables_out), "total_red_runs": len(red_runs), "total_red_cells": sum( sum(1 for cell in row for cell in table["red_cells"] if cell) for table in tables_out ), "global_settings_used": { "normalization": GLOBAL_SETTINGS.get("normalize", {}), "ocr_repair_rules_count": len(GLOBAL_SETTINGS.get("ocr_repair_rules", [])), "synonyms_count": len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0 } } } return out # ------------------------------ # Command-line interface # ------------------------------ def main(argv): if len(argv) < 3: print("Usage: python extract_red_text.py input.docx output.json") sys.exit(2) input_docx = argv[1] output_json = argv[2] log.info("Starting red text extraction from: %s", input_docx) log.info("Using master_key configuration with %d header synonyms", len(EXTRA_HEADER_SYNONYMS) if EXTRA_HEADER_SYNONYMS else 0) try: result = extract_from_docx(input_docx) except Exception as exc: log.exception("Failed to extract from docx: %s", exc) raise # Save JSON pretty-printed for debugging by default try: with open(output_json, "w", encoding="utf-8") as fh: json.dump(result, fh, ensure_ascii=False, indent=2) log.info("Saved extracted data to: %s", output_json) except Exception: log.exception("Failed to write output JSON to %s", output_json) raise # Print comprehensive summary meta = result.get("meta", {}) log.info("=== EXTRACTION SUMMARY ===") log.info("Headings found: %d", meta.get("total_headings", 0)) log.info("Red paragraphs: %d", meta.get("total_red_paragraphs", 0)) log.info("Red runs total: %d", meta.get("total_red_runs", 0)) log.info("Tables processed: %d", meta.get("total_tables", 0)) log.info("Red cells found: %d", meta.get("total_red_cells", 0)) log.info("Header synonyms used: %d", meta.get("global_settings_used", {}).get("synonyms_count", 0)) if __name__ == "__main__": main(sys.argv) # Print output for verification if len(sys.argv) >= 3: try: with open(sys.argv[2], 'r') as f: print(f"\nšŸ“„ EXTRACT_RED_TEXT OUTPUT:\n{f.read()}") except Exception as e: print(f"\nāŒ Could not read output file: {e}")