#!/usr/bin/env python3
"""
extract_red_text.py
Hardened version: preserves the original logic and prints while improving
header-label mapping, tolerating a missing hf_utils module, and broadening
synonym handling for vehicle tables.
"""
import re
import json
import sys
from docx import Document
from docx.oxml.ns import qn
# Try to reuse your hf_utils if available (non-breaking); otherwise fall back to local helpers.
try:
    from hf_utils import (
        is_red_font,
        normalize_text,
        normalize_header_text,
        get_clean_text,
    )
except Exception:
    # Minimal compatible fallbacks if hf_utils is not present.
    def normalize_text(s: str) -> str:
        if not s:
            return ""
        s = re.sub(r"\u2013|\u2014", "-", s)       # smart dashes -> "-"
        s = re.sub(r"[^\w\s\-\&\(\)\/:]", " ", s)  # keep a small set of punctuation
        s = re.sub(r"\s+", " ", s).strip()
        return s

    def normalize_header_text(s: str) -> str:
        return normalize_text(s).upper()

    def is_red_font(run):
        """Best-effort red detection fallback for when hf_utils isn't available."""
        try:
            col = getattr(run.font, "color", None)
            if col and getattr(col, "rgb", None):
                rgb = col.rgb
                r, g, b = rgb[0], rgb[1], rgb[2]
                if r > 150 and g < 120 and b < 120 and (r - max(g, b)) > 30:
                    return True
        except Exception:
            pass
        # Fall back to inspecting the raw run XML for a w:color attribute.
        try:
            rPr = getattr(run._element, "rPr", None)
            if rPr is not None:
                clr = rPr.find(qn('w:color'))
                if clr is not None:
                    val = clr.get(qn('w:val'))
                    if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val):
                        rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16)
                        if rr > 150 and gg < 120 and bb < 120 and (rr - max(gg, bb)) > 30:
                            return True
        except Exception:
            pass
        return False

    def get_clean_text(elem):
        return "".join(node.text for node in elem.iter() if node.tag.endswith("}t") and node.text).strip()
# Import master schemas and patterns (your file)
from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS
# ---------------------------------------------------------------------
# Low-level helpers (kept and hardened)
# ---------------------------------------------------------------------
def _prev_para_text(tbl):
"""Get text from previous paragraph before table"""
prev = tbl._tbl.getprevious()
while prev is not None and not prev.tag.endswith("}p"):
prev = prev.getprevious()
if prev is None:
return ""
return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip()
def get_table_context(tbl):
"""Return structured context for a table"""
heading = normalize_text(_prev_para_text(tbl))
headers = [normalize_text(c.text) for c in tbl.rows[0].cells if c.text.strip()] if tbl.rows else []
col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells and r.cells[0].text.strip()]
first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else ""
all_cells = []
for row in tbl.rows:
for cell in row.cells:
t = normalize_text(cell.text)
if t:
all_cells.append(t)
return {
"heading": heading,
"headers": headers,
"col0": col0,
"first_cell": first_cell,
"all_cells": all_cells,
"num_rows": len(tbl.rows),
"num_cols": len(tbl.rows[0].cells) if tbl.rows else 0,
}
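# Sketch of the context dict for a small two-column vehicle table (all values
# invented for illustration):
#   {"heading": "VEHICLE REGISTRATION NUMBERS", "headers": ["No", "Registration Number"],
#    "col0": ["No", "1", "2"], "first_cell": "No", "all_cells": [...],
#    "num_rows": 3, "num_cols": 2}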
def fuzzy_match_heading(heading, patterns):
"""Return True if heading fuzzy-matches any regex patterns"""
if not heading:
return False
heading_norm = heading.upper()
for pattern in patterns:
try:
if re.search(pattern, heading_norm, re.IGNORECASE):
return True
except re.error:
if pattern.upper() in heading_norm:
return True
return False
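# Example (the pattern is an assumption, not necessarily in HEADING_PATTERNS):
#   fuzzy_match_heading("Maintenance Management Summary", [r"MAINTENANCE\s+MANAGEMENT"])  -> True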
# ---------------------------------------------------------------------
# Header-to-label synonym map: improved coverage for common OCR/header variants
# ---------------------------------------------------------------------
HEADER_SYNONYMS = {
    # normalized header (upper) -> canonical label in TABLE_SCHEMAS
    "NO": "No.",
    "NO.": "No.",
    "REG NO": "Registration Number",
    "REGISTRATIONNO": "Registration Number",
    "REGISTRATION NUMBER": "Registration Number",
    "REGISTRATION": "Registration Number",
    "PRINT NAME": "Print Name",
    "NHVR OR EXEMPLAR GLOBAL AUDITOR REGISTRATION NUMBER": "NHVR or Exemplar Global Auditor Registration Number",
    "ROADWORTHINESS CERTIFICATES": "Roadworthiness Certificates",
    "ROADWORTHINESS CERTIFICATES (APPLICABLE FOR ENTRY AUDIT)": "Roadworthiness Certificates",
    "MAINTENANCE RECORDS": "Maintenance Records",
    "DAILY CHECKS": "Daily Checks",
    "FAULT RECORDING/ REPORTING": "Fault Recording/ Reporting",
    "FAULT RECORDING/REPORTING": "Fault Recording/ Reporting",
    "FAULT REPAIR": "Fault Repair",
    "WEIGHT VERIFICATION RECORDS": "Weight Verification Records",
    "RFS SUSPENSION CERTIFICATION #": "RFS Suspension Certification #",
    "SUSPENSION SYSTEM MAINTENANCE": "Suspension System Maintenance",
    "TRIP RECORDS": "Trip Records",
    "FAULT RECORDING/ REPORTING ON SUSPENSION SYSTEM": "Fault Recording/ Reporting",
    # short forms
    "REG NO.": "Registration Number",
    "REGISTRATION #": "Registration Number",
}
def map_header_to_label(header_text, labels):
"""
Given a header_text (raw) and list of candidate labels (from schema),
return the best matching label or None.
"""
if not header_text:
return None
hnorm = normalize_header_text(header_text)
# exact synonym map
for key, lab in HEADER_SYNONYMS.items():
if key in hnorm:
# ensure lab exists in candidate labels (case-insensitive)
for cand in labels:
if normalize_header_text(cand) == normalize_header_text(lab):
return cand
# if it isn't in labels, still return the lab (labels sometimes omit punctuation)
return lab
# try exact match to any candidate label
for cand in labels:
if normalize_header_text(cand) == hnorm:
return cand
# token overlap scoring (flexible)
header_words = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
best = (None, 0.0)
for cand in labels:
cand_words = [w for w in re.split(r"\W+", cand) if len(w) > 2]
if not cand_words or not header_words:
continue
common = set(w.upper() for w in header_words).intersection(set(w.upper() for w in cand_words))
score = len(common) / max(1, max(len(header_words), len(cand_words)))
if score > best[1]:
best = (cand, score)
# lower threshold for vehicle tables / noisy OCR (accept >= 0.25)
if best[1] >= 0.25:
return best[0]
return None
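# Illustrative mapping (the header string is a made-up OCR variant):
#   map_header_to_label("Registration No.", ["No.", "Registration Number"])
#   -> "Registration Number"   (via the "REGISTRATION" synonym entry)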
# ---------------------------------------------------------------------
# Matching / scoring logic (keeps original heuristics)
# ---------------------------------------------------------------------
def calculate_schema_match_score(schema_name, spec, context):
    score = 0
    reasons = []
    # Vehicle registration boost
    if "Vehicle Registration" in schema_name:
        vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension"]
        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
        keyword_matches = sum(1 for k in vehicle_keywords if k in table_text)
        if keyword_matches >= 2:
            score += 150
            reasons.append(f"Vehicle Registration keywords: {keyword_matches}/5")
        elif keyword_matches >= 1:
            score += 75
            reasons.append(f"Some Vehicle Registration keywords: {keyword_matches}/5")
    # Summary boost
    if "Summary" in schema_name and "details" in " ".join(context["headers"]).lower():
        score += 100
        reasons.append("Summary schema with DETAILS column - perfect match")
    if "Summary" not in schema_name and "details" in " ".join(context["headers"]).lower():
        score -= 75
        reasons.append("Non-summary schema penalized for DETAILS column presence")
    # Context exclusions & keywords
    if spec.get("context_exclusions"):
        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
        for exc in spec["context_exclusions"]:
            if exc.lower() in table_text:
                score -= 50
                reasons.append(f"Context exclusion penalty: '{exc}'")
    if spec.get("context_keywords"):
        table_text = " ".join(context["headers"]).lower() + " " + context["heading"].lower()
        matches = sum(1 for kw in spec["context_keywords"] if kw.lower() in table_text)
        if matches:
            score += matches * 15
            reasons.append(f"Context keyword matches: {matches}/{len(spec['context_keywords'])}")
    # Direct first-cell match
    if context["first_cell"] and context["first_cell"].upper() == schema_name.upper():
        score += 100
        reasons.append(f"Direct first cell match: '{context['first_cell']}'")
    # Heading pattern
    if spec.get("headings"):
        for h in spec["headings"]:
            if isinstance(h, dict):
                text = h.get("text", "")
            else:
                text = h
            if fuzzy_match_heading(context["heading"], [text]):
                score += 50
                reasons.append(f"Heading match: '{context['heading']}'")
                break
    # Columns matching
    if spec.get("columns"):
        cols = [normalize_text(c) for c in spec["columns"]]
        matches = 0
        for col in cols:
            if any(col.upper() in h.upper() for h in context["headers"]):
                matches += 1
        if matches == len(cols):
            score += 60
            reasons.append(f"All column headers match: {cols}")
        elif matches > 0:
            score += matches * 20
            reasons.append(f"Partial column matches: {matches}/{len(cols)}")
    # Left orientation
    if spec.get("orientation") == "left":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0
        for lbl in labels:
            if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context["col0"]):
                matches += 1
        if matches > 0:
            score += (matches / max(1, len(labels))) * 30
            reasons.append(f"Left orientation label matches: {matches}/{len(labels)}")
    # Row1 orientation
    elif spec.get("orientation") == "row1":
        labels = [normalize_text(lbl) for lbl in spec.get("labels", [])]
        matches = 0
        for lbl in labels:
            if any(lbl.upper() in h.upper() or h.upper() in lbl.upper() for h in context["headers"]):
                matches += 1
            elif any(word.upper() in " ".join(context["headers"]).upper() for word in lbl.split() if len(word) > 3):
                matches += 0.5
        if matches > 0:
            score += (matches / max(1, len(labels))) * 40
            reasons.append(f"Row1 orientation header matches: {matches}/{len(labels)}")
    # Declarations special cases
    if schema_name == "Operator Declaration" and context["first_cell"].upper().startswith("PRINT"):
        if "OPERATOR DECLARATION" in context["heading"].upper():
            score += 80
            reasons.append("Operator Declaration context match")
        elif any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            score += 60
            reasons.append("Manager found in cells (likely Operator Declaration)")
    if schema_name == "NHVAS Approved Auditor Declaration" and context["first_cell"].upper().startswith("PRINT"):
        if any("MANAGER" in cell.upper() for cell in context["all_cells"]):
            score -= 50
            reasons.append("Penalty: Manager found (not auditor)")
    return score, reasons
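# The scorer returns a numeric score plus human-readable reasons; a sketch,
# assuming a typical row1 TABLE_SCHEMAS entry:
#   score, reasons = calculate_schema_match_score(name, spec, get_table_context(tbl))
#   # reasons might read ["Vehicle Registration keywords: 2/5", "Partial column matches: 1/3"]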
def match_table_schema(tbl):
    context = get_table_context(tbl)
    best_match = None
    best_score = 0
    for name, spec in TABLE_SCHEMAS.items():
        score, _ = calculate_schema_match_score(name, spec, context)
        if score > best_score:
            best_score = score
            best_match = name
    # Require a minimum score before trusting the match.
    if best_score >= 20:
        return best_match
    return None
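# Minimal matching sketch (the document path is illustrative):
#   doc = Document("sample_audit.docx")
#   for tbl in doc.tables:
#       print(match_table_schema(tbl))  # a TABLE_SCHEMAS key, or None below the threshold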
# ---------------------------------------------------------------------
# Multi-schema detection & extraction (keeps original behavior)
# ---------------------------------------------------------------------
def check_multi_schema_table(tbl):
    context = get_table_context(tbl)
    operator_labels = [
        "Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s",
        "Australian Company Number", "NHVAS Manual"
    ]
    contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"]
    has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context["col0"])
    has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context["col0"])
    if has_operator and has_contact:
        return ["Operator Information", "Operator contact details"]
    return None
def extract_multi_schema_table(tbl, schemas):
    result = {}
    for schema_name in schemas:
        if schema_name not in TABLE_SCHEMAS:
            continue
        spec = TABLE_SCHEMAS[schema_name]
        schema_data = {}
        for ri, row in enumerate(tbl.rows):
            if ri == 0:
                continue
            row_label = normalize_text(row.cells[0].text)
            belongs_to_schema = False
            matched_label = None
            for spec_label in spec.get("labels", []):
                spec_norm = normalize_text(spec_label).upper()
                row_norm = row_label.upper()
                if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm:
                    belongs_to_schema = True
                    matched_label = spec_label
                    break
            if not belongs_to_schema:
                continue
            for cell in row.cells:
                red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
                if red_txt:
                    schema_data.setdefault(matched_label, [])
                    if red_txt not in schema_data[matched_label]:
                        schema_data[matched_label].append(red_txt)
        if schema_data:
            result[schema_name] = schema_data
    return result
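# Illustrative result for a mixed operator table (values invented):
#   {"Operator Information": {"NHVAS Accreditation No.": ["12345"]},
#    "Operator contact details": {"Email address": ["ops@example.com"]}}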
# ---------------------------------------------------------------------
# Extraction: special-case for Vehicle Registration tables (row1) and generic fallback
# ---------------------------------------------------------------------
def extract_table_data(tbl, schema_name, spec):
    # Vehicle registration special handling
    if "Vehicle Registration" in schema_name:
        print(" 🚗 EXTRACTION FIX: Processing Vehicle Registration table")
        labels = spec.get("labels", [])
        collected = {lbl: [] for lbl in labels}
        seen = {lbl: set() for lbl in labels}
        if len(tbl.rows) < 2:
            print(" ❌ Vehicle table has fewer than 2 rows")
            return {}
        header_row = tbl.rows[0]
        column_mapping = {}
        print(f" 📋 Mapping {len(header_row.cells)} header cells to labels")
        for col_idx, cell in enumerate(header_row.cells):
            header_text = normalize_text(cell.text).strip()
            if not header_text:
                continue
            print(f" Column {col_idx}: '{header_text}'")
            mapped = map_header_to_label(header_text, labels)
            if mapped:
                # Find the exact candidate label string (preserve original label spelling if possible).
                chosen = None
                for cand in labels:
                    if normalize_header_text(cand) == normalize_header_text(mapped):
                        chosen = cand
                        break
                column_mapping[col_idx] = chosen or mapped
                print(f" ✅ Mapped to: '{column_mapping[col_idx]}'")
            else:
                # Fallback: try fuzzy token overlap directly with the candidate labels.
                best = None
                best_score = 0.0
                hwords = [w for w in re.split(r"\W+", header_text) if len(w) > 2]
                for cand in labels:
                    cwords = [w for w in re.split(r"\W+", cand) if len(w) > 2]
                    if not cwords or not hwords:
                        continue
                    common = set(w.upper() for w in hwords).intersection(set(w.upper() for w in cwords))
                    score = len(common) / max(1, max(len(hwords), len(cwords)))
                    if score > best_score:
                        best = cand
                        best_score = score
                if best and best_score >= 0.25:
                    column_mapping[col_idx] = best
                    print(f" ✅ Fuzzy-mapped to: '{best}' (score: {best_score:.2f})")
                else:
                    print(f" ⚠️ No mapping found for '{header_text}'")
        print(f" 📊 Total column mappings: {len(column_mapping)}")
        # Extract red text from the data rows.
        for row_idx in range(1, len(tbl.rows)):
            row = tbl.rows[row_idx]
            print(f" 📌 Processing data row {row_idx}")
            for col_idx, cell in enumerate(row.cells):
                if col_idx in column_mapping:
                    label = column_mapping[col_idx]
                    red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
                    if red_txt:
                        print(f" 🔴 Found red text in '{label}': '{red_txt}'")
                        if red_txt not in seen.setdefault(label, set()):
                            seen[label].add(red_txt)
                            collected.setdefault(label, []).append(red_txt)
        result = {k: v for k, v in collected.items() if v}
        print(f" ✅ Vehicle Registration extracted: {len(result)} columns with data")
        return result
    # Generic fallback extraction logic
    labels = spec.get("labels", []) + [schema_name]
    collected = {lbl: [] for lbl in labels}
    seen = {lbl: set() for lbl in labels}
    by_col = (spec.get("orientation") == "row1")
    start_row = 1 if by_col else 0
    rows = tbl.rows[start_row:]
    for row in rows:
        for ci, cell in enumerate(row.cells):
            red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip()
            if not red_txt:
                continue
            if by_col:
                if ci < len(spec.get("labels", [])):
                    lbl = spec["labels"][ci]
                else:
                    lbl = schema_name
            else:
                raw_label = normalize_text(row.cells[0].text)
                lbl = None
                for spec_label in spec.get("labels", []):
                    if normalize_text(spec_label).upper() == raw_label.upper():
                        lbl = spec_label
                        break
                if not lbl:
                    for spec_label in spec.get("labels", []):
                        spec_norm = normalize_text(spec_label).upper()
                        raw_norm = raw_label.upper()
                        if spec_norm in raw_norm or raw_norm in spec_norm:
                            lbl = spec_label
                            break
                if not lbl:
                    lbl = schema_name
            if red_txt not in seen.setdefault(lbl, set()):
                seen[lbl].add(red_txt)
                collected.setdefault(lbl, []).append(red_txt)
    return {k: v for k, v in collected.items() if v}
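# Shape of the mapping returned by extract_table_data (values invented):
#   {"Registration Number": ["ABC123", "XYZ789"], "Weight Verification Records": ["Yes"]}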
# ---------------------------------------------------------------------
# Main extraction: process all tables then paragraphs
# ---------------------------------------------------------------------
def extract_red_text(input_doc):
    if isinstance(input_doc, str):
        doc = Document(input_doc)
    else:
        doc = input_doc
    out = {}
    table_count = 0
    for tbl in doc.tables:
        table_count += 1
        multi_schemas = check_multi_schema_table(tbl)
        if multi_schemas:
            multi_data = extract_multi_schema_table(tbl, multi_schemas)
            for schema_name, schema_data in multi_data.items():
                if schema_data:
                    # Merge safely and dedupe.
                    existing = out.get(schema_name, {})
                    for k, v in schema_data.items():
                        existing.setdefault(k, [])
                        for val in v:
                            if val not in existing[k]:
                                existing[k].append(val)
                    out[schema_name] = existing
            continue
        schema = match_table_schema(tbl)
        if not schema:
            continue
        spec = TABLE_SCHEMAS[schema]
        data = extract_table_data(tbl, schema, spec)
        if data:
            existing = out.get(schema, {})
            for k, v in data.items():
                existing.setdefault(k, [])
                for val in v:
                    if val not in existing[k]:
                        existing[k].append(val)
            out[schema] = existing
    # Paragraph red-text extraction with context
    paras = {}
    for idx, para in enumerate(doc.paragraphs):
        red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip()
        if not red_txt:
            continue
        # Find a heading context by scanning backwards.
        context = None
        for j in range(idx - 1, -1, -1):
            txt = normalize_text(doc.paragraphs[j].text)
            if txt:
                patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"]
                if any(re.search(p, txt, re.IGNORECASE) for p in patterns):
                    context = txt
                    break
        # Special-case date-like lines.
        if not context and re.fullmatch(PARAGRAPH_PATTERNS.get("date_line", r".*"), red_txt):
            context = "Date"
        if not context:
            context = "(para)"
        paras.setdefault(context, [])
        if red_txt not in paras[context]:
            paras[context].append(red_txt)
    if paras:
        out["paragraphs"] = paras
    return out
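# Minimal usage sketch (the .docx path is an assumption for illustration):
#   data = extract_red_text("filled_audit_template.docx")
#   data.get("Operator Information")  # {"Operator name (Legal entity)": ["..."], ...}
#   data.get("paragraphs")            # {"<heading context>": ["<red text>", ...]}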
# ---------------------------------------------------------------------
# File wrapper to support your existing calls
# ---------------------------------------------------------------------
def extract_red_text_filelike(input_file, output_file):
if hasattr(input_file, "seek"):
input_file.seek(0)
doc = Document(input_file)
result = extract_red_text(doc)
if hasattr(output_file, "write"):
json.dump(result, output_file, indent=2, ensure_ascii=False)
output_file.flush()
else:
with open(output_file, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
return result
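# Example call with file objects (paths are illustrative):
#   with open("audit.docx", "rb") as src, open("red_text.json", "w", encoding="utf-8") as dst:
#       extract_red_text_filelike(src, dst)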
# ---------------------------------------------------------------------
# CLI entrypoint (same as before)
# ---------------------------------------------------------------------
if __name__ == "__main__":
    if len(sys.argv) == 3:
        input_docx = sys.argv[1]
        output_json = sys.argv[2]
        doc = Document(input_docx)
        word_data = extract_red_text(doc)
        # Write the file (dedupe is already handled in the merging logic above).
        with open(output_json, "w", encoding="utf-8") as f:
            json.dump(word_data, f, indent=2, ensure_ascii=False)
        print(json.dumps(word_data, indent=2, ensure_ascii=False))
    else:
        print("Usage: python extract_red_text.py <input.docx> <output.json>")
        print("To use as a module: extract_red_text_filelike(input_file, output_file)")