PDF-Data_Extractor / updated_word.py
Shami96's picture
Update updated_word.py
ddb37e5 verified
raw
history blame
17.9 kB
import json
from docx import Document
from docx.shared import RGBColor
import re
def load_json(filepath):
    """Load and parse a JSON document from disk.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON content, exactly as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of the platform default so the result
    # does not depend on the machine's locale (process_hf reads JSON the same way).
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)
def flatten_json_new_system(json_data):
    """Collapse ``{schema: {field: values}}`` JSON into a single lookup dict.

    Each field is stored under several spellings: the raw field name, its
    lowercase form, its lowercase-and-stripped form, ``"<schema>.<field>"``
    and the lowercase version of that dotted key. Field names containing one
    of a few known phrases additionally get short generic aliases.

    A list holding exactly one item is unwrapped to that item; every other
    value (including empty and longer lists) is stored unchanged. Top-level
    entries whose value is not a dict are ignored. When several fields map to
    the same alias, the one processed last wins.
    """
    # Phrase looked for in the lowercased field name -> aliases to register.
    alias_rules = (
        ("print name", ("print name", "operator name", "name")),
        ("position title", ("position title", "position", "title")),
        ("accreditation number", ("accreditation number", "nhvas accreditation no")),
        ("expiry date", ("expiry date", "expiry")),
    )

    lookup = {}
    for schema_name, schema_data in json_data.items():
        if not isinstance(schema_data, dict):
            continue
        for field_name, values in schema_data.items():
            is_singleton = isinstance(values, list) and len(values) == 1
            value = values[0] if is_singleton else values

            field_lower = field_name.lower()
            # Order matters: it fixes the dict's insertion order, which later
            # lookups iterate over.
            names = [
                field_name,
                field_lower,
                field_lower.strip(),
                f"{schema_name}.{field_name}",
                f"{schema_name.lower()}.{field_lower}",
            ]
            for phrase, aliases in alias_rules:
                if phrase in field_lower:
                    names.extend(aliases)

            for name in names:
                lookup[name] = value
    return lookup
def is_red(run):
    """Tell whether a run's font colour counts as red.

    A run qualifies when its font colour's ``rgb`` equals pure red
    (255, 0, 0) or when the colour's ``theme_color`` attribute equals 1.
    If the run's colour object is falsy, that object is returned as-is.
    """
    color = run.font.color
    if not color:
        return color

    matches_rgb = color.rgb == RGBColor(255, 0, 0)
    if matches_rgb:
        return matches_rgb
    return getattr(color, "theme_color", None) == 1
def get_value_as_string(value, field_name=""):
    """Render a JSON value as replacement text.

    Non-list values go through ``str``. An empty list becomes ``""`` and a
    one-item list becomes that item's string form. A longer list is joined
    with single spaces, except when ``field_name`` mentions a company number:
    then the list itself is returned so the caller can place items one by one.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])

    lowered_name = field_name.lower()
    company_markers = ("australian company number", "company number")
    if any(marker in lowered_name for marker in company_markers):
        return value  # Kept as a list for ACN processing
    return " ".join(map(str, value))
_WORD_RE = re.compile(r'\b\w+\b')


def _significant_words(text):
    """Return the set of lowercased words in *text* longer than two characters."""
    return {word.lower() for word in _WORD_RE.findall(text) if len(word) > 2}


def find_matching_json_value(field_name, flat_json):
    """Look up the JSON value that best corresponds to a document label.

    Matching strategies are tried in this order, and the first hit wins:

    1. exact key match (after stripping the label);
    2. case-insensitive exact key match;
    3. keyword shortcuts for "print name", "position title",
       "accreditation number" and "expiry date";
    4. fuzzy word-overlap scoring, accepted at a score of 0.25 or more.

    Args:
        field_name: Label text taken from the document.
        flat_json: Flat ``{key: value}`` lookup dict with string keys.

    Returns:
        The matched value, or ``None`` when nothing matches. A matched value
        that is itself falsy (``0``, ``""``, ``[]``) is returned as-is.
    """
    field_name = field_name.strip()

    # Direct match (exact)
    if field_name in flat_json:
        print(f" βœ… Direct match found for key '{field_name}'")
        return flat_json[field_name]

    field_lower = field_name.lower()

    # Case-insensitive exact match
    for key, value in flat_json.items():
        if key.lower() == field_lower:
            print(f" βœ… Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    # Keyword shortcuts for common field names
    if "print name" in field_lower:
        for key in ["Print Name", "print name", "operator name", "name"]:
            if key in flat_json:
                print(f" βœ… Print name match: '{field_name}' -> '{key}'")
                return flat_json[key]
    if "position title" in field_lower:
        for key in ["Position Title", "position title", "position", "title"]:
            if key in flat_json:
                print(f" βœ… Position title match: '{field_name}' -> '{key}'")
                return flat_json[key]
    if "accreditation number" in field_lower:
        for key in flat_json:
            key_lower = key.lower()
            if "accreditation" in key_lower and "number" in key_lower:
                print(f" βœ… Accreditation number match: '{field_name}' -> '{key}'")
                return flat_json[key]
    if "expiry date" in field_lower:
        for key in flat_json:
            if "expiry" in key.lower():
                print(f" βœ… Expiry date match: '{field_name}' -> '{key}'")
                return flat_json[key]

    # Fuzzy matching on shared words
    field_words = _significant_words(field_name)
    if not field_words:
        return None

    best_key = None
    best_score = 0
    for key in flat_json:
        key_words = _significant_words(key)
        common_words = field_words & key_words
        if not common_words:
            continue
        # Blend Jaccard similarity with how much of the label is covered.
        similarity = len(common_words) / len(field_words | key_words)
        coverage = len(common_words) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_score = final_score
            best_key = key

    # Decide on the key, not on the value's truthiness: a best match whose
    # value is 0, False or "" is still a match, just as in the exact paths.
    if best_key is not None and best_score >= 0.25:
        print(f" βœ… Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return flat_json[best_key]

    print(f" ❌ No match found for '{field_name}'")
    return None
def get_clean_text(cell):
    """Return the cell's text: all run texts concatenated, outer whitespace removed.

    Runs are joined in document order with nothing inserted between runs or
    between paragraphs.
    """
    pieces = [
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    ]
    return "".join(pieces).strip()
def has_red_text(cell):
    """Report whether any run in the cell is red and carries non-blank text."""
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
def replace_red_text_in_cell(cell, replacement_text):
    """Overwrite red placeholder text in a cell and recolour it black.

    Within each paragraph only the first red, non-blank run is rewritten;
    later runs of that paragraph are left untouched.

    Returns:
        The number of runs that were rewritten.
    """
    rewritten = 0
    for paragraph in cell.paragraphs:
        target = next(
            (run for run in paragraph.runs if is_red(run) and run.text.strip()),
            None,
        )
        if target is None:
            continue
        target.text = replacement_text
        target.font.color.rgb = RGBColor(0, 0, 0)  # Switch to black
        rewritten += 1
    return rewritten
def handle_australian_company_number(row, company_numbers):
    """Spread the items of ``company_numbers`` over the row, one per cell.

    Item ``i`` goes to cell ``i + 1``; the row's first cell is never written.
    Items beyond the last cell are dropped, and cells without red text are
    skipped.

    Returns:
        The total number of replacements reported for the row.
    """
    placed = 0
    # zip stops at the shorter sequence, which gives the same bounds check as
    # comparing each index against the cell count.
    pairs = zip(company_numbers, row.cells[1:])
    for cell_idx, (digit, cell) in enumerate(pairs, start=1):
        if not has_red_text(cell):
            continue
        placed += replace_red_text_in_cell(cell, str(digit))
        print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}")
    return placed
def handle_nature_business_section(cell, flat_json):
    """Fill in the "Nature of the Operators Business" cell from the JSON lookup.

    Nothing happens unless the cell has red text and its text contains the
    section heading (either "operators" or "operator" spelling). The first
    JSON key containing "nature of the operators business" supplies the
    replacement text.

    Returns:
        The number of replacements made, or 0 when the cell or JSON does not
        qualify.
    """
    if not has_red_text(cell):
        return 0

    heading = get_clean_text(cell).lower()
    accepted_headings = (
        "nature of the operators business",
        "nature of the operator business",
    )
    if not any(phrase in heading for phrase in accepted_headings):
        return 0

    print(f" 🎯 Found Nature of Business section")

    # Locate the business description entry
    matching_key = next(
        (key for key in flat_json if "nature of the operators business" in key.lower()),
        None,
    )
    if matching_key is None:
        return 0

    replacement_text = get_value_as_string(flat_json[matching_key])
    updated = replace_red_text_in_cell(cell, replacement_text)
    if updated > 0:
        print(f" βœ… Updated business description")
    return updated
def handle_operator_declaration_table(table, flat_json):
    """Fill the Print Name / Position Title row of an Operator Declaration table.

    The table is scanned for a header row whose first cell mentions
    "print name" and whose second cell mentions "position title". The row
    right below it is treated as the data row: its first two cells are
    rewritten (when they contain red text) from the matching JSON entries.
    Scanning stops at the first header row found.

    Returns:
        The number of replacements made.
    """
    name_keys = ("Print Name", "print name", "Operator Declaration.Print Name")
    position_keys = ("Position Title", "position title", "Operator Declaration.Position Title")
    total = 0

    for row_idx, row in enumerate(table.rows):
        if len(row.cells) < 2:
            continue

        first_heading = get_clean_text(row.cells[0]).strip()
        second_heading = get_clean_text(row.cells[1]).strip()
        is_header_row = (
            "print name" in first_heading.lower()
            and "position title" in second_heading.lower()
        )
        if not is_header_row:
            continue

        print(f" 🎯 Found Operator Declaration table")

        # The values live in the row directly under the header, if there is one.
        if row_idx + 1 < len(table.rows):
            data_row = table.rows[row_idx + 1]
            if len(data_row.cells) >= 2:
                name_cell, position_cell = data_row.cells[0], data_row.cells[1]

                # Print Name
                if has_red_text(name_cell):
                    name_value = next(
                        (flat_json[key] for key in name_keys if key in flat_json),
                        None,
                    )
                    if name_value:
                        name_text = get_value_as_string(name_value)
                        total += replace_red_text_in_cell(name_cell, name_text)
                        print(f" βœ… Updated Print Name: '{name_text}'")

                # Position Title
                if has_red_text(position_cell):
                    position_value = next(
                        (flat_json[key] for key in position_keys if key in flat_json),
                        None,
                    )
                    if position_value:
                        position_text = get_value_as_string(position_value)
                        total += replace_red_text_in_cell(position_cell, position_text)
                        print(f" βœ… Updated Position Title: '{position_text}'")
        break

    return total
def process_tables(document, flat_json):
    """Replace red placeholder text in every table of the document.

    For each table:

    * tables with at most four rows are first offered to the Operator
      Declaration handler; if it changes anything the table is finished;
    * otherwise each row's first cell is read as the label. A "nature of the
      operators business" label goes to its dedicated handler. Any other
      label is matched against ``flat_json``: on a match every red cell in
      the row is filled (company-number lists are spread cell by cell); with
      no match, each red cell's own text is tried as the lookup label.

    Returns:
        The total number of replacements made across all tables.
    """
    company_markers = ("australian company number", "company number")
    total = 0

    for table_idx, table in enumerate(document.tables):
        print(f"\nπŸ” Processing table {table_idx + 1}:")

        # Operator Declaration tables take priority (small tables only)
        if len(table.rows) <= 4:
            declaration_replacements = handle_operator_declaration_table(table, flat_json)
            if declaration_replacements > 0:
                total += declaration_replacements
                continue

        for row_idx, row in enumerate(table.rows):
            if len(row.cells) < 1:
                continue
            key_cell = row.cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f" πŸ“Œ Row {row_idx + 1}: Key = '{key_text}'")
            key_lower = key_text.lower()

            # Nature of Business section has its own handler
            if "nature of the operators business" in key_lower:
                total += handle_nature_business_section(key_cell, flat_json)
                continue

            json_value = find_matching_json_value(key_text, flat_json)

            if json_value is None:
                # No match on the label: try each red cell's own text instead.
                for cell in row.cells:
                    if not has_red_text(cell):
                        continue
                    red_text = "".join(
                        run.text
                        for paragraph in cell.paragraphs
                        for run in paragraph.runs
                        if is_red(run)
                    )
                    if not red_text.strip():
                        continue
                    fallback_value = find_matching_json_value(red_text.strip(), flat_json)
                    if fallback_value is None:
                        continue
                    replacement_text = get_value_as_string(fallback_value)
                    cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                    total += cell_replacements
                    if cell_replacements > 0:
                        print(f" βœ… Replaced red text: '{red_text.strip()}' -> '{replacement_text}'")
                continue

            replacement_text = get_value_as_string(json_value, key_text)

            # Company numbers supplied as a list are placed one item per cell
            is_company_number = any(marker in key_lower for marker in company_markers)
            if is_company_number and isinstance(json_value, list):
                total += handle_australian_company_number(row, json_value)
                continue

            # Regular fields: fill every red cell in the row
            for cell_idx, cell in enumerate(row.cells):
                if not has_red_text(cell):
                    continue
                cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                total += cell_replacements
                if cell_replacements > 0:
                    print(f" βœ… Updated cell {cell_idx + 1}: '{replacement_text}'")

    return total
def process_paragraphs(document, flat_json):
    """Replace red placeholder text found in the document's paragraphs.

    For each paragraph the text of all red, non-blank runs is concatenated
    and used as the lookup label. On a match the first red run receives the
    replacement text and turns black, and the remaining red runs are emptied.

    Returns:
        The number of paragraphs in which a replacement was made.
    """
    total = 0
    print(f"\nπŸ” Processing paragraphs:")

    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [
            run for run in paragraph.runs
            if is_red(run) and run.text.strip()
        ]
        red_text = "".join(run.text for run in red_runs)
        if not red_text.strip():
            continue

        print(f" πŸ“Œ Paragraph {para_idx + 1}: Found red text: '{red_text.strip()}'")
        json_value = find_matching_json_value(red_text.strip(), flat_json)
        if json_value is None:
            continue

        replacement_text = get_value_as_string(json_value)
        print(f" βœ… Replacing with: '{replacement_text}'")

        # red_runs cannot be empty here: red_text was built from it and is non-blank.
        first_run, *other_runs = red_runs
        first_run.text = replacement_text
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        # Blank out whatever red runs are left
        for leftover in other_runs:
            leftover.text = ''
        total += 1

    return total
def process_hf(json_file, docx_file, output_file):
    """Fill a DOCX template's red placeholder text from a JSON file and save it.

    Args:
        json_file: Path to a JSON file, or an object with a ``read`` method.
        docx_file: Input document, passed unchanged to ``Document(...)``.
        output_file: Output target, passed unchanged to ``doc.save(...)``.

    Returns:
        None. Progress and totals are printed. Any exception raised during
        processing is caught and reported on stdout rather than propagated.
    """
    sample_size = 10
    try:
        # Load JSON
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

        # Flatten the nested JSON structure
        flat_json = flatten_json_new_system(json_data)
        print("πŸ“„ Available JSON keys (sample):")
        for key, value in sorted(flat_json.items())[:sample_size]:
            print(f" - {key}: {value}")
        # Only mention the remainder when there is one; with fewer keys than
        # the sample size the count would otherwise be negative.
        remaining = len(flat_json) - sample_size
        if remaining > 0:
            print(f" ... and {remaining} more keys\n")
        else:
            print()

        # Load DOCX; the same call serves both paths and file-like objects
        doc = Document(docx_file)

        # Process document
        print("πŸš€ Starting processing compatible with your new system...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements

        # Save output; again one call for both paths and file-like objects
        doc.save(output_file)

        print(f"\nβœ… Document saved as: {output_file}")
        print(f"βœ… Total replacements: {total_replacements}")
        print(f" πŸ“Š Tables: {table_replacements}")
        print(f" πŸ“ Paragraphs: {paragraph_replacements}")
        print(f"πŸŽ‰ Processing complete!")
    except FileNotFoundError as e:
        print(f"❌ File not found: {e}")
    except Exception as e:
        # Top-level boundary: report the failure with its traceback instead of
        # letting it propagate to the caller.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    import sys

    # Exactly three positional arguments are required after the script name.
    if len(sys.argv) != 4:
        # Show the name the script was actually invoked with instead of a
        # hard-coded file name that can go stale.
        print(f"Usage: python {sys.argv[0]} <input_docx> <updated_json> <output_docx>")
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module and is missing under e.g. ``python -S``.
        sys.exit(1)

    docx_path = sys.argv[1]
    json_path = sys.argv[2]
    output_path = sys.argv[3]
    # process_hf takes the JSON source first, then the document paths.
    process_hf(json_path, docx_path, output_path)