"""Fill red placeholder text in a DOCX file with values taken from a JSON file.

Red runs in table cells and body paragraphs are treated as placeholders.
Each one is matched against a flattened view of the JSON data and, when a
value is found, overwritten with that value and recoloured black.
"""

import json
import re
import sys
import traceback

from docx import Document
from docx.shared import RGBColor

_RED = RGBColor(255, 0, 0)
_BLACK = RGBColor(0, 0, 0)

# Compiled once; the fuzzy matcher tokenises every JSON key on every lookup.
_WORD_RE = re.compile(r"\b\w+\b")

# (substring looked for in the lower-cased field name, alias keys to add).
# Order matters: later fields overwrite aliases written by earlier ones.
_FIELD_ALIASES = (
    ("print name", ("print name", "operator name", "name")),
    ("position title", ("position title", "position", "title")),
    ("accreditation number", ("accreditation number", "nhvas accreditation no")),
    ("expiry date", ("expiry date", "expiry")),
)


def load_json(filepath):
    """Load and return the JSON document stored at *filepath* (read as UTF-8)."""
    with open(filepath, "r", encoding="utf-8") as file:
        return json.load(file)


def flatten_json_new_system(json_data):
    """Flatten ``{schema: {field: values}}`` into a single lookup dict.

    For every field several keys are written: the field name as-is,
    lower-cased, lower-cased and stripped, ``"schema.field"`` and its
    lower-cased form, plus the short aliases listed in ``_FIELD_ALIASES``.
    A one-element list is unwrapped to its element; longer (or empty) lists
    and non-list values are stored unchanged. Top-level entries whose value
    is not a dict are ignored. When two fields produce the same key, the
    one processed last wins.
    """
    flat_json = {}

    for schema_name, schema_data in json_data.items():
        if not isinstance(schema_data, dict):
            continue

        for field_name, values in schema_data.items():
            if isinstance(values, list) and len(values) == 1:
                value = values[0]
            else:
                value = values

            field_lower = field_name.lower()

            flat_json[field_name] = value
            flat_json[field_lower] = value
            flat_json[field_lower.strip()] = value
            flat_json[f"{schema_name}.{field_name}"] = value
            flat_json[f"{schema_name.lower()}.{field_lower}"] = value

            for needle, aliases in _FIELD_ALIASES:
                if needle in field_lower:
                    for alias in aliases:
                        flat_json[alias] = value

    return flat_json


def is_red(run):
    """Return True when *run*'s font colour is considered red.

    A run counts as red when its RGB colour is exactly (255, 0, 0) or when
    the colour's ``theme_color`` equals 1.
    """
    color = run.font.color
    if not color:
        return False
    # NOTE(review): theme index 1 looks like DARK_1 in python-docx's
    # MSO_THEME_COLOR enum rather than a red accent - confirm this is intended.
    return color.rgb == _RED or getattr(color, "theme_color", None) == 1


def get_value_as_string(value, field_name=""):
    """Convert a JSON value to the text that should be written into the document.

    Non-list values go through ``str()``. An empty list becomes ``""`` and a
    one-element list becomes ``str`` of that element. A longer list is joined
    with single spaces, except when *field_name* mentions "company number":
    then the list itself is returned so the caller can spread its items over
    several cells.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])
    # "australian company number" also contains "company number".
    if "company number" in field_name.lower():
        return value
    return " ".join(str(item) for item in value)


def _significant_words(text):
    """Return the lower-cased words of *text* that are longer than two characters."""
    return {word.lower() for word in _WORD_RE.findall(text) if len(word) > 2}


def find_matching_json_value(field_name, flat_json):
    """Look up the value for a document label in the flattened JSON.

    Strategies are tried in order and the first hit is returned:

    1. exact key match (after stripping *field_name*);
    2. case-insensitive key match;
    3. hard-coded rules for "print name", "position title",
       "accreditation number" and "expiry date";
    4. fuzzy word-overlap match, accepted when the score
       ``0.6 * jaccard + 0.4 * coverage`` is at least 0.25.

    Returns the matched value, or ``None`` when nothing matches. Progress is
    reported with ``print``.
    """
    field_name = field_name.strip()

    if field_name in flat_json:
        print(f" ✅ Direct match found for key '{field_name}'")
        return flat_json[field_name]

    field_lower = field_name.lower()

    for key, value in flat_json.items():
        if key.lower() == field_lower:
            print(f" ✅ Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    if "print name" in field_lower:
        for key in ("Print Name", "print name", "operator name", "name"):
            if key in flat_json:
                print(f" ✅ Print name match: '{field_name}' -> '{key}'")
                return flat_json[key]

    if "position title" in field_lower:
        for key in ("Position Title", "position title", "position", "title"):
            if key in flat_json:
                print(f" ✅ Position title match: '{field_name}' -> '{key}'")
                return flat_json[key]

    if "accreditation number" in field_lower:
        for key in flat_json:
            key_lower = key.lower()
            if "accreditation" in key_lower and "number" in key_lower:
                print(f" ✅ Accreditation number match: '{field_name}' -> '{key}'")
                return flat_json[key]

    if "expiry date" in field_lower:
        for key in flat_json:
            if "expiry" in key.lower():
                print(f" ✅ Expiry date match: '{field_name}' -> '{key}'")
                return flat_json[key]

    field_words = _significant_words(field_name)
    if not field_words:
        return None

    best_match = None
    best_score = 0
    best_key = None

    for key, value in flat_json.items():
        key_words = _significant_words(key)
        if not key_words:
            continue

        common_words = field_words & key_words
        if not common_words:
            continue

        similarity = len(common_words) / len(field_words | key_words)
        coverage = len(common_words) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)

        # Strict ">" keeps the first key seen among equally good candidates.
        if final_score > best_score:
            best_score = final_score
            best_match = value
            best_key = key

    # A falsy best value (e.g. "" or []) is deliberately treated as "no match".
    if best_match and best_score >= 0.25:
        print(f" ✅ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_match

    print(f" ❌ No match found for '{field_name}'")
    return None


def get_clean_text(cell):
    """Return the concatenated text of every run in *cell*, stripped."""
    return "".join(
        run.text for paragraph in cell.paragraphs for run in paragraph.runs
    ).strip()


def has_red_text(cell):
    """Return True when *cell* contains at least one red run with non-blank text."""
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )


def replace_red_text_in_cell(cell, replacement_text):
    """Replace the red placeholder text in *cell* with *replacement_text*.

    The first non-blank red run receives the replacement and is recoloured
    black; every other non-blank red run in the cell is emptied. This is the
    same policy ``process_paragraphs`` applies, and it prevents a placeholder
    that the editor split across several runs (or paragraphs) from leaving
    stale fragments or duplicated values behind.

    Returns 1 when a replacement was made, otherwise 0.
    """
    red_runs = [
        run
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run) and run.text.strip()
    ]
    if not red_runs:
        return 0

    first_run, *other_runs = red_runs
    first_run.text = replacement_text
    first_run.font.color.rgb = _BLACK
    for run in other_runs:
        run.text = ""

    return 1


def handle_australian_company_number(row, company_numbers):
    """Spread the items of *company_numbers* over the cells of *row*.

    Item ``i`` goes into cell ``i + 1`` (cell 0 is skipped), but only when
    that cell exists and currently holds red text. Returns the number of
    replacements made.
    """
    replacements_made = 0
    cells = row.cells  # evaluate once; the row rebuilds this on every access

    for i, digit in enumerate(company_numbers):
        cell_idx = i + 1
        if cell_idx >= len(cells):
            continue

        cell = cells[cell_idx]
        if has_red_text(cell):
            replacements_made += replace_red_text_in_cell(cell, str(digit))
            print(f" -> Placed digit '{digit}' in cell {cell_idx + 1}")

    return replacements_made


def handle_nature_business_section(cell, flat_json):
    """Fill the "Nature of the Operators Business" cell, if *cell* is one.

    Nothing happens unless the cell has red text and its text contains
    "nature of the operators business" or "nature of the operator business".
    The value comes from the first JSON key containing
    "nature of the operators business". Returns the number of replacements.
    """
    if not has_red_text(cell):
        return 0

    cell_text = get_clean_text(cell).lower()
    if (
        "nature of the operators business" not in cell_text
        and "nature of the operator business" not in cell_text
    ):
        return 0

    print(f" 🎯 Found Nature of Business section")

    for key in flat_json:
        if "nature of the operators business" not in key.lower():
            continue

        replacement_text = get_value_as_string(flat_json[key])
        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
        if cell_replacements > 0:
            print(f" ✅ Updated business description")
        return cell_replacements

    return 0


def _fill_declaration_cell(cell, flat_json, candidate_keys, label):
    """Fill one Operator Declaration data cell from the first present key."""
    if not has_red_text(cell):
        return 0

    value = next(
        (flat_json[key] for key in candidate_keys if key in flat_json), None
    )
    if not value:
        return 0

    text = get_value_as_string(value)
    cell_replacements = replace_red_text_in_cell(cell, text)
    print(f" ✅ Updated {label}: '{text}'")
    return cell_replacements


def handle_operator_declaration_table(table, flat_json):
    """Fill the Print Name / Position Title row of an Operator Declaration table.

    The table is scanned for a header row whose first cell mentions
    "print name" and whose second cell mentions "position title". Only the
    first such header is handled; the row directly below it is treated as
    the data row. Returns the number of replacements made (0 when the table
    has no such header).
    """
    replacements_made = 0
    rows = list(table.rows)

    for row_idx, row in enumerate(rows):
        cells = row.cells
        if len(cells) < 2:
            continue

        first_text = get_clean_text(cells[0]).lower()
        second_text = get_clean_text(cells[1]).lower()
        if "print name" not in first_text or "position title" not in second_text:
            continue

        print(f" 🎯 Found Operator Declaration table")

        if row_idx + 1 < len(rows):
            data_cells = rows[row_idx + 1].cells
            if len(data_cells) >= 2:
                replacements_made += _fill_declaration_cell(
                    data_cells[0],
                    flat_json,
                    ("Print Name", "print name", "Operator Declaration.Print Name"),
                    "Print Name",
                )
                replacements_made += _fill_declaration_cell(
                    data_cells[1],
                    flat_json,
                    ("Position Title", "position title", "Operator Declaration.Position Title"),
                    "Position Title",
                )
        break

    return replacements_made


def _replace_unmatched_red_cells(cells, flat_json):
    """Match each red cell by its own red text when the row label found nothing."""
    replacements_made = 0

    for cell in cells:
        if not has_red_text(cell):
            continue

        red_text = "".join(
            run.text
            for paragraph in cell.paragraphs
            for run in paragraph.runs
            if is_red(run)
        ).strip()
        if not red_text:
            continue

        json_value = find_matching_json_value(red_text, flat_json)
        if json_value is None:
            continue

        replacement_text = get_value_as_string(json_value)
        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
        replacements_made += cell_replacements
        if cell_replacements > 0:
            print(f" ✅ Replaced red text: '{red_text}' -> '{replacement_text}'")

    return replacements_made


def _process_table_row(row, row_idx, flat_json):
    """Fill the red cells of one table row; return the number of replacements."""
    cells = row.cells  # evaluate once; the row rebuilds this on every access
    if len(cells) < 1:
        return 0

    key_cell = cells[0]
    key_text = get_clean_text(key_cell)
    if not key_text:
        return 0

    print(f" 📌 Row {row_idx + 1}: Key = '{key_text}'")
    key_lower = key_text.lower()

    if "nature of the operators business" in key_lower:
        return handle_nature_business_section(key_cell, flat_json)

    json_value = find_matching_json_value(key_text, flat_json)
    if json_value is None:
        return _replace_unmatched_red_cells(cells, flat_json)

    # "australian company number" also contains "company number".
    if "company number" in key_lower and isinstance(json_value, list):
        return handle_australian_company_number(row, json_value)

    replacement_text = get_value_as_string(json_value, key_text)
    replacements_made = 0
    for cell_idx, cell in enumerate(cells):
        if not has_red_text(cell):
            continue
        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
        replacements_made += cell_replacements
        if cell_replacements > 0:
            print(f" ✅ Updated cell {cell_idx + 1}: '{replacement_text}'")

    return replacements_made


def process_tables(document, flat_json):
    """Fill red placeholder text in every table of *document*.

    Tables with at most four rows are first offered to
    ``handle_operator_declaration_table``; when that makes a replacement the
    table is considered done. Otherwise each row is handled using the text of
    its first cell as the lookup label. Returns the total number of
    replacements made.
    """
    replacements_made = 0

    for table_idx, table in enumerate(document.tables):
        print(f"\n🔍 Processing table {table_idx + 1}:")

        if len(table.rows) <= 4:
            declaration_replacements = handle_operator_declaration_table(table, flat_json)
            if declaration_replacements > 0:
                replacements_made += declaration_replacements
                continue

        for row_idx, row in enumerate(table.rows):
            replacements_made += _process_table_row(row, row_idx, flat_json)

    return replacements_made


def process_paragraphs(document, flat_json):
    """Fill red placeholder text in the body paragraphs of *document*.

    The non-blank red runs of a paragraph are concatenated and looked up as
    one label. On a match the first red run receives the value and turns
    black, and the remaining red runs are emptied. Returns the number of
    paragraphs that were updated.
    """
    replacements_made = 0
    print(f"\n🔍 Processing paragraphs:")

    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [
            run for run in paragraph.runs if is_red(run) and run.text.strip()
        ]
        red_text = "".join(run.text for run in red_runs).strip()
        if not red_text:
            continue

        print(f" 📌 Paragraph {para_idx + 1}: Found red text: '{red_text}'")

        json_value = find_matching_json_value(red_text, flat_json)
        if json_value is None:
            continue

        replacement_text = get_value_as_string(json_value)
        print(f" ✅ Replacing with: '{replacement_text}'")

        first_run, *other_runs = red_runs
        first_run.text = replacement_text
        first_run.font.color.rgb = _BLACK
        for run in other_runs:
            run.text = ""
        replacements_made += 1

    return replacements_made


def process_hf(json_file, docx_file, output_file):
    """Fill the red placeholders of a DOCX file from JSON and save the result.

    Args:
        json_file: Path to a JSON file, or an object with a ``read`` method.
        docx_file: Path or file-like object, passed straight to ``Document``.
        output_file: Path or file-like object, passed straight to ``save``.

    All exceptions are caught and reported with ``print`` (plus a traceback
    for anything other than ``FileNotFoundError``); nothing is re-raised and
    the function always returns ``None``.
    """
    try:
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            json_data = load_json(json_file)

        flat_json = flatten_json_new_system(json_data)

        print("📄 Available JSON keys (sample):")
        for key in sorted(flat_json)[:10]:
            print(f" - {key}: {flat_json[key]}")
        remaining = len(flat_json) - 10
        # Only mention further keys when there are some; the unconditional
        # message used to report a negative count for small inputs.
        if remaining > 0:
            print(f" ... and {remaining} more keys\n")
        else:
            print()

        # Document() and save() accept both paths and file-like objects.
        doc = Document(docx_file)

        print("🚀 Starting processing compatible with your new system...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements

        doc.save(output_file)

        print(f"\n✅ Document saved as: {output_file}")
        print(f"✅ Total replacements: {total_replacements}")
        print(f" 📊 Tables: {table_replacements}")
        print(f" 📝 Paragraphs: {paragraph_replacements}")
        print(f"🎉 Processing complete!")

    except FileNotFoundError as e:
        print(f"❌ File not found: {e}")
    except Exception as e:
        # Top-level boundary: report and swallow so callers never see a raise.
        print(f"❌ Error: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: python compatible_pipeline.py INPUT_DOCX INPUT_JSON OUTPUT_DOCX")
        sys.exit(1)

    docx_path, json_path, output_path = sys.argv[1:4]
    process_hf(json_path, docx_path, output_path)