# PDF-Data_Extractor / extract_red_text.py
# Hugging Face file-viewer residue (uploader: Shami96, commit 704d2a2 "verified",
# "Update extract_red_text.py", raw / history / blame, 13.2 kB) — converted to
# comments so the module parses as Python.
#!/usr/bin/env python3
"""
extract_red_text.py
"""
from __future__ import annotations
import json
import re
import sys
import logging
from collections import defaultdict
from typing import List, Dict, Optional, Any
# attempt to import python-docx (document processing)
try:
from docx import Document
from docx.oxml.ns import qn
from docx.shared import RGBColor
except Exception as e:
raise RuntimeError("python-docx is required. Install with: pip install python-docx") from e
# ------------------------------
# Configuration bootstrap.
# Prefer master_key.GLOBAL_SETTINGS / master_key.EXTRA_HEADER_SYNONYMS when the
# module is importable; otherwise fall back to the built-in defaults below.
# ------------------------------
try:
    import master_key as mk
except Exception:
    mk = None

if mk is not None:
    GLOBAL_SETTINGS = getattr(mk, "GLOBAL_SETTINGS", {})
    EXTRA_HEADER_SYNONYMS = getattr(mk, "EXTRA_HEADER_SYNONYMS", None)
else:
    GLOBAL_SETTINGS = {
        "normalize": {
            "lower": True,
            "strip_punctuation": True,
            "collapse_whitespace": True,
            "replace_smart_dashes": True,
        },
        "ocr_repair_rules": [
            (r"\s*\(\s*Yes\s*/\s*No\s*\)", " (Yes/No)"),
            (r"R[e3]gistrat[i1]on", "Registration"),
            (r"Prin?t", "Print"),
            (r"Accredi[ta]tion", "Accreditation"),
            (r"[^\w\s\-\&\(\)\/:]", " "),
        ],
        "split_on": [" – ", " - ", ";", "\n", " / "],
        "date_like_pattern": r"^\s*(\d{1,2}(st|nd|rd|th)?\s+[A-Za-z]+|\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\s+[A-Za-z]{3,})",
        "fuzzy_thresholds": {"high_priority": 70, "medium_priority": 60, "low_priority": 45},
        "fuzzy_algorithm": "token_set_ratio",
    }
    EXTRA_HEADER_SYNONYMS = None

# Built-in synonym map (compact key -> canonical label). Only consulted when
# master_key does not define EXTRA_HEADER_SYNONYMS. Keys are already in
# compacted (lowercase, word-chars-only) form to match _compact_key() output.
_DEFAULT_EXTRA_HEADER_SYNONYMS = {
    # Long/noisy headers observed in real audit documents
    "roadworthinesscertificatesapplicableforentryaudit": "Roadworthiness Certificates",
    "roadworthinesscertificates": "Roadworthiness Certificates",
    "rfsuspensioncertificationn/aifnotapplicable": "RFS Suspension Certification #",
    "rfsuspensioncertification": "RFS Suspension Certification #",
    "maintenancerecordsrecorddaterangeofrecordsreviewed": "Maintenance Records",
    "maintenancerecords": "Maintenance Records",
    "faultrecordingreportingonsuspensionsystemdaterange": "Fault Recording/ Reporting",
    "faultrecordingreporting": "Fault Recording/ Reporting",
    "faultrepairdaterange": "Fault Repair",
    "triprecordsdaterange": "Trip Records",
    # Common variations
    "registrationnumber": "Registration Number",
    "registrationnumbernumber": "Registration Number",
    "subcontractor(yesno)": "Sub-contractor (Yes/No)",
    "sub-contractor(yes/no)": "Sub-contractor (Yes/No)",
    "nhvrorexemplarglobalauditorregistrationnumber": "NHVR or Exemplar Global Auditor Registration Number",
    "printname": "Print Name",
    "print": "Print Name",
}

# Fall back to the built-in map when master_key supplied nothing.
if EXTRA_HEADER_SYNONYMS is None:
    EXTRA_HEADER_SYNONYMS = _DEFAULT_EXTRA_HEADER_SYNONYMS
# ------------------------------
# Logging
# ------------------------------
# Configured once at import time. Note: basicConfig is a no-op when the root
# logger already has handlers, so an embedding application keeps its own setup.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("extract_red_text")
# ------------------------------
# Normalization & OCR-repair utilities (aligned to GLOBAL_SETTINGS)
# ------------------------------
def _apply_ocr_repair_rules(text: str) -> str:
s = text or ""
for pat, repl in GLOBAL_SETTINGS.get("ocr_repair_rules", []):
try:
s = re.sub(pat, repl, s, flags=re.I)
except re.error:
# skip invalid rule
continue
return s
def _normalize_text(text: str) -> str:
    """Return *text* OCR-repaired and normalized per GLOBAL_SETTINGS["normalize"].

    Each normalization step (smart-dash replacement, lowercasing, punctuation
    stripping, whitespace collapsing) is applied only if enabled in config.
    """
    cfg = GLOBAL_SETTINGS.get("normalize", {})
    result = _apply_ocr_repair_rules(text or "")
    if cfg.get("replace_smart_dashes", False):
        for dash in ("–", "—"):
            result = result.replace(dash, "-")
    if cfg.get("lower", False):
        result = result.lower()
    if cfg.get("strip_punctuation", False):
        # keep hyphen, ampersand, parentheses, slash, colon; blank out the rest
        result = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", result)
    if cfg.get("collapse_whitespace", False):
        result = re.sub(r"\s+", " ", result)
    return result.strip()
def _compact_key(text: str) -> str:
    """Collapse *text* into a deterministic lookup key.

    The key is the normalized text with every non-word character removed;
    ``None`` maps to the empty string.
    """
    if text is None:
        return ""
    return re.sub(r"[^\w]", "", _normalize_text(text))
def map_header_using_extra_synonyms(header_text: str) -> Optional[str]:
    """Deterministically map a raw table header to its canonical label.

    Looks the header up in EXTRA_HEADER_SYNONYMS by compact key first, then
    by normalized text, then by a case-insensitive scan of all keys.
    Returns the canonical label, or None when nothing matches.
    """
    if not header_text:
        return None
    normalized = _normalize_text(header_text)
    compact = _compact_key(header_text)
    # exact lookups: compact key takes precedence over the normalized form
    for candidate in (compact, normalized):
        if candidate in EXTRA_HEADER_SYNONYMS:
            return EXTRA_HEADER_SYNONYMS[candidate]
    # last resort: case-insensitive comparison against every key
    norm_cf = normalized.lower()
    comp_cf = compact.lower()
    for key, label in EXTRA_HEADER_SYNONYMS.items():
        if key.lower() in (norm_cf, comp_cf):
            return label
    return None
# ------------------------------
# Helpers to detect red font runs robustly
# ------------------------------
def _run_is_red(run) -> bool:
"""
Detect if a run is red. python-docx represents color by run.font.color.
We check RGB if available, or theme color 'red' as fallback.
"""
try:
color = run.font.color
if color is None:
return False
# If RGB is specified
rgb = getattr(color, "rgb", None)
if rgb is not None:
# rgb is a docx.shared.RGBColor or similar. Representable as 'FF0000' or integer tuple
hexval = ''.join("{:02X}".format(c) for c in rgb) if isinstance(rgb, (tuple, list)) else str(rgb)
# accept strings containing 'FF0000' or '0000FF'? (we want red)
# Accept any color where red component is high and others low-ish
try:
# If hex-like 'FF0000' -> interpret
hex_clean = re.sub(r"[^0-9A-Fa-f]", "", hexval)
if len(hex_clean) >= 6:
r = int(hex_clean[-6:-4], 16)
g = int(hex_clean[-4:-2], 16)
b = int(hex_clean[-2:], 16)
if r >= 150 and g < 120 and b < 120:
return True
except Exception:
pass
# fallback: theme color or color.theme_color value
theme_color = getattr(color, "theme_color", None)
if theme_color:
try:
if str(theme_color).lower().find("red") != -1:
return True
except Exception:
pass
except Exception:
pass
# final heuristic: if run.font.color.rgb as string contains 'FF' prefix and '00' for others
try:
if hasattr(run.font.color, "rgb") and run.font.color.rgb is not None:
s = str(run.font.color.rgb)
if "FF" in s and "0000" in s:
return True
except Exception:
pass
return False
# ------------------------------
# Extraction: paragraphs, headings, tables
# ------------------------------
def extract_from_docx(path: str) -> Dict[str, Any]:
    """Extract red-colored text from the .docx file at *path*.

    Walks every body paragraph and table in document order and collects:
      - headings (detected by style name or a leading section keyword),
      - paragraphs that contain at least one non-empty red run,
      - every red run with its paragraph and character position,
      - each table as a text matrix plus a parallel matrix of per-cell red text.

    Returns a JSON-serializable dict with keys ``headings``, ``paragraphs``,
    ``tables``, ``red_runs`` and ``meta``. Raises whatever ``Document(path)``
    raises for unreadable/invalid files.
    """
    doc = Document(path)
    headings: List[str] = []
    paragraphs_red: List[Dict[str, Any]] = []
    red_runs: List[Dict[str, Any]] = []
    tables_out: List[Dict[str, Any]] = []
    # extract headings and paragraphs with red runs
    for p_index, para in enumerate(doc.paragraphs):
        text = para.text or ""
        # identify heading level from style name if available
        style_name = getattr(para.style, "name", "") if para.style is not None else ""
        # heading if the style looks like one, OR the text starts with a known
        # audit-document section keyword (domain-specific heuristic)
        is_heading = bool(re.search(r"Heading\s*\d+|HEADING|TITLE|SUBTITLE", style_name, flags=re.I)) or bool(re.search(r"^(MAINTENANCE|MASS|FATIGUE|NHVAS|Vehicle Registration|CORRECTIVE)", text, flags=re.I))
        if is_heading:
            headings.append(text.strip())
        # gather red runs in this paragraph
        paragraph_red_texts = []
        # running character offset of the current run within the paragraph
        char_cursor = 0
        for run in para.runs:
            run_text = run.text or ""
            run_len = len(run_text)
            if _run_is_red(run) and run_text.strip():
                # store a red run entry
                rr = {
                    "text": run_text,
                    "paragraph_index": p_index,
                    "char_index": char_cursor,
                    "style_name": style_name
                }
                red_runs.append(rr)
                paragraph_red_texts.append(run_text)
            # advance even for non-red runs so char_index stays correct
            char_cursor += run_len
        if paragraph_red_texts:
            paragraphs_red.append({
                "paragraph_index": p_index,
                "text": text,
                "red_texts": paragraph_red_texts,
                "style_name": style_name
            })
    # extract tables
    for t_index, table in enumerate(doc.tables):
        # convert table to simple cell-text matrix
        nrows = len(table.rows)
        # NOTE(review): python-docx repeats merged cells in row.cells, so
        # ncols reflects the grid width rather than distinct cells — confirm
        # this is the intended semantics for downstream consumers.
        ncols = max(len(row.cells) for row in table.rows) if nrows > 0 else 0
        headers = []
        rows_text = []
        rows_red_cells = []
        # Attempt to treat first row as header if cells look like headers (bold or all-caps)
        header_row = table.rows[0] if nrows > 0 else None
        # build header texts & apply header mapping
        if header_row:
            for c_idx, cell in enumerate(header_row.cells):
                cell_text = cell.text.strip()
                # normalize & map using EXTRA_HEADER_SYNONYMS; fall back to the
                # raw cell text when no canonical label is known
                mapped = map_header_using_extra_synonyms(cell_text)
                if mapped:
                    header_label = mapped
                else:
                    header_label = cell_text
                headers.append(header_label)
        # process all rows -> list of lists (the header row is included here too)
        for r_i, row in enumerate(table.rows):
            row_texts = []
            row_reds = []
            for c_i, cell in enumerate(row.cells):
                ct = cell.text.strip()
                # gather red text from runs in this cell
                red_in_cell = []
                # docx cell may have paragraphs
                for cpara in cell.paragraphs:
                    for run in cpara.runs:
                        if _run_is_red(run) and (run.text or "").strip():
                            red_in_cell.append((run.text or "").strip())
                # compact red text into a single string if multiple runs present;
                # None marks a cell with no red text at all
                red_text_joined = " ".join(red_in_cell) if red_in_cell else None
                row_texts.append(ct)
                row_reds.append(red_text_joined)
            rows_text.append(row_texts)
            rows_red_cells.append(row_reds)
        tables_out.append({
            "table_index": t_index,
            "nrows": nrows,
            "ncols": ncols,
            "headers": headers,
            "rows": rows_text,
            "red_cells": rows_red_cells
        })
    # assemble output structure
    out = {
        "headings": headings,
        "paragraphs": paragraphs_red,
        "tables": tables_out,
        "red_runs": red_runs,
        # helpful metadata for downstream processing
        "meta": {
            "source_file": path,
            "total_headings": len(headings),
            "total_red_paragraphs": len(paragraphs_red),
            "total_tables": len(tables_out),
            "total_red_runs": len(red_runs)
        }
    }
    return out
# ------------------------------
# Command-line interface
# ------------------------------
def main(argv):
    """CLI entry point: ``argv`` is ``[prog, input.docx, output.json]``.

    Exits with status 2 on bad usage; re-raises extraction/write failures
    after logging them.
    """
    if len(argv) < 3:
        print("Usage: python extract_red_text.py input.docx output.json")
        sys.exit(2)
    input_docx, output_json = argv[1], argv[2]
    log.info("Extracting red text from: %s", input_docx)
    try:
        result = extract_from_docx(input_docx)
    except Exception as exc:
        log.exception("Failed to extract from docx: %s", exc)
        raise
    # Persist pretty-printed JSON so the output is easy to inspect by hand.
    try:
        with open(output_json, "w", encoding="utf-8") as fh:
            json.dump(result, fh, ensure_ascii=False, indent=2)
        log.info("Saved extracted word JSON to: %s", output_json)
    except Exception:
        log.exception("Failed to write output JSON to %s", output_json)
        raise
    # Short summary for logs / quick verification
    counts = (
        len(result.get("headings", [])),
        len(result.get("paragraphs", [])),
        len(result.get("tables", [])),
        len(result.get("red_runs", [])),
    )
    log.info("Headings found: %d, Red paragraphs: %d, Tables: %d, Red runs: %d", *counts)


if __name__ == "__main__":
    main(sys.argv)