# NOTE: page-scrape residue ("Spaces: Running") -- apparently a hosting status banner, not part of the program.
| import json | |
| from docx import Document | |
| from docx.shared import RGBColor | |
| import re | |
def load_json(filepath):
    """Load and parse the JSON file at ``filepath``.

    The file is decoded explicitly as UTF-8 (the same encoding
    ``process_hf`` uses) so the result does not depend on the platform's
    default locale encoding.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The value produced by ``json.load`` for the file's content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filepath, "r", encoding="utf-8") as handle:
        return json.load(handle)
def flatten_json_new_system(json_data):
    """Flatten ``{schema: {field: values}}`` into a single lookup dict.

    Top-level entries whose value is not a dict are ignored. A list holding
    exactly one item is unwrapped to that item; every other value is stored
    unchanged. Each field is registered under several keys: the raw name,
    its lower-cased form, its lower-cased and stripped form, and the
    ``schema.field`` form in both raw and lower case. Fields whose
    lower-cased name contains one of the known markers are also stored
    under a few short aliases.

    Args:
        json_data: Mapping of schema name to a mapping of field name to value.

    Returns:
        dict mapping every generated key to the (possibly unwrapped) value.
        Later fields overwrite earlier ones when keys collide.
    """
    # marker substring -> alias keys written when the marker is found
    alias_table = (
        ("print name", ("print name", "operator name", "name")),
        ("position title", ("position title", "position", "title")),
        ("accreditation number", ("accreditation number", "nhvas accreditation no")),
        ("expiry date", ("expiry date", "expiry")),
    )

    flattened = {}
    for schema, fields in json_data.items():
        if not isinstance(fields, dict):
            continue
        for field, raw in fields.items():
            # Single-item lists collapse to their only element.
            unwrap = isinstance(raw, list) and len(raw) == 1
            entry = raw[0] if unwrap else raw

            field_lc = field.lower()
            flattened[field] = entry
            flattened[field_lc] = entry
            flattened[field_lc.strip()] = entry
            flattened[f"{schema}.{field}"] = entry
            flattened[f"{schema.lower()}.{field_lc}"] = entry

            for marker, aliases in alias_table:
                if marker in field_lc:
                    for alias in aliases:
                        flattened[alias] = entry
    return flattened
def is_red(run):
    """Tell whether a run's font colour counts as red.

    A run counts as red when its colour's ``rgb`` equals
    ``RGBColor(255, 0, 0)`` or its ``theme_color`` attribute (if present)
    equals 1. When ``run.font.color`` is falsy, that falsy object itself is
    returned rather than ``False``.

    NOTE(review): which theme slot the value 1 denotes is not visible here;
    confirm that it really corresponds to a red theme colour.
    """
    font_color = run.font.color
    if not font_color:
        return font_color
    if font_color.rgb == RGBColor(255, 0, 0):
        return True
    return getattr(font_color, "theme_color", None) == 1
def get_value_as_string(value, field_name=""):
    """Render a JSON value as replacement text.

    Non-list values go through ``str``. An empty list becomes ``""`` and a
    one-item list becomes ``str`` of that item. Longer lists are joined
    with single spaces, except when ``field_name`` mentions a company
    number: then the list itself is returned untouched so the caller can
    place the items one by one.

    Args:
        value: The value to render.
        field_name: Label of the field being filled; only inspected for
            lists with more than one item.

    Returns:
        A string, or the original list for multi-item company-number values.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])

    # "company number" also covers "australian company number".
    if "company number" in field_name.lower():
        return value
    return " ".join(map(str, value))
def _significant_words(text):
    """Return the lower-cased words of ``text`` longer than two characters."""
    return {word.lower() for word in re.findall(r'\b\w+\b', text) if len(word) > 2}


def find_matching_json_value(field_name, flat_json):
    """Find the JSON value that best corresponds to a document field label.

    Strategies are tried in order and the first hit wins:

    1. exact key match on the stripped label;
    2. case-insensitive key match;
    3. keyword shortcuts for labels containing "print name",
       "position title", "accreditation number" or "expiry date";
    4. fuzzy word-overlap scoring (0.6 * Jaccard similarity + 0.4 * share
       of the label's words found in the key); the best key is accepted
       when its score is at least 0.25.

    A fuzzy match is accepted based on its score alone, so a matched value
    that happens to be falsy (``0``, ``""``, ``[]``) is returned just as
    the exact-match paths return it, rather than being reported as
    "no match".

    Args:
        field_name: Label text taken from the document.
        flat_json: Flat key -> value mapping to search.

    Returns:
        The matched value, or ``None`` when nothing matches.
    """
    field_name = field_name.strip()

    # 1. Exact key.
    if field_name in flat_json:
        print(f" β Direct match found for key '{field_name}'")
        return flat_json[field_name]

    # 2. Same key ignoring case.
    field_lower = field_name.lower()
    for key, value in flat_json.items():
        if key.lower() == field_lower:
            print(f" β Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    # 3. Keyword shortcuts for common labels.
    if "print name" in field_lower:
        for key in ("Print Name", "print name", "operator name", "name"):
            if key in flat_json:
                print(f" β Print name match: '{field_name}' -> '{key}'")
                return flat_json[key]

    if "position title" in field_lower:
        for key in ("Position Title", "position title", "position", "title"):
            if key in flat_json:
                print(f" β Position title match: '{field_name}' -> '{key}'")
                return flat_json[key]

    if "accreditation number" in field_lower:
        for key, value in flat_json.items():
            key_lower = key.lower()
            if "accreditation" in key_lower and "number" in key_lower:
                print(f" β Accreditation number match: '{field_name}' -> '{key}'")
                return value

    if "expiry date" in field_lower:
        for key, value in flat_json.items():
            if "expiry" in key.lower():
                print(f" β Expiry date match: '{field_name}' -> '{key}'")
                return value

    # 4. Fuzzy word overlap.
    field_words = _significant_words(field_name)
    if not field_words:
        return None

    best_key = None
    best_score = 0
    for key in flat_json:
        key_words = _significant_words(key)
        common_words = field_words & key_words
        if not common_words:
            continue
        similarity = len(common_words) / len(field_words | key_words)
        coverage = len(common_words) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)
        if final_score > best_score:
            best_score = final_score
            best_key = key

    # Decide on the key, not on the truthiness of the value, so falsy
    # values are not mistaken for "nothing found".
    if best_key is not None and best_score >= 0.25:
        print(f" β Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return flat_json[best_key]

    print(f" β No match found for '{field_name}'")
    return None
def get_clean_text(cell):
    """Return the text of every run in the cell, joined and stripped.

    Runs are concatenated in document order with no separator, not even
    between paragraphs; only the final result is stripped.
    """
    pieces = (
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
    return "".join(pieces).strip()
def has_red_text(cell):
    """Return True when at least one run in the cell is red and non-blank."""
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
def replace_red_text_in_cell(cell, replacement_text):
    """Overwrite red placeholder text in a cell and recolour it black.

    In each paragraph of the cell, the first run that is red and non-blank
    has its text set to ``replacement_text`` and its colour set to black.
    Any later red runs in the same paragraph are left untouched.

    Args:
        cell: Table cell whose paragraphs are scanned.
        replacement_text: Text written into each selected run.

    Returns:
        Number of runs rewritten (at most one per paragraph).
    """
    rewritten = 0
    for paragraph in cell.paragraphs:
        target = next(
            (run for run in paragraph.runs if is_red(run) and run.text.strip()),
            None,
        )
        if target is None:
            continue
        target.text = replacement_text
        target.font.color.rgb = RGBColor(0, 0, 0)  # back to black
        rewritten += 1
    return rewritten
def handle_australian_company_number(row, company_numbers):
    """Place the company-number items into consecutive cells of a row.

    Item ``i`` (0-based) goes to cell ``i + 1``, so the row's first cell is
    skipped. Items whose target cell does not exist, or whose target cell
    has no red text, are skipped.

    Args:
        row: Table row exposing ``cells``.
        company_numbers: Iterable of items (digits) to place, in order.

    Returns:
        Total number of run replacements made.
    """
    placed = 0
    cells = row.cells
    for target, digit in enumerate(company_numbers, start=1):
        if target >= len(cells):
            continue
        cell = cells[target]
        if not has_red_text(cell):
            continue
        placed += replace_red_text_in_cell(cell, str(digit))
        print(f" -> Placed digit '{digit}' in cell {target + 1}")
    return placed
def handle_nature_business_section(cell, flat_json):
    """Fill the "Nature of the Operators Business" cell from the JSON data.

    Nothing happens unless the cell has red text and its text contains
    either heading variant ("operators" or "operator"). The first JSON key
    containing "nature of the operators business" (case-insensitive)
    supplies the replacement text.

    Args:
        cell: Candidate table cell.
        flat_json: Flat key -> value mapping.

    Returns:
        Number of run replacements made; 0 when the cell does not qualify
        or no JSON key matches.
    """
    if not has_red_text(cell):
        return 0

    lowered = get_clean_text(cell).lower()
    headings = ("nature of the operators business", "nature of the operator business")
    if not any(heading in lowered for heading in headings):
        return 0

    print(f" π― Found Nature of Business section")
    for json_key, json_value in flat_json.items():
        if "nature of the operators business" not in json_key.lower():
            continue
        new_text = get_value_as_string(json_value)
        changed = replace_red_text_in_cell(cell, new_text)
        if changed > 0:
            print(f" β Updated business description")
        # The cell is known to hold red text, so the first matching key
        # always settles the outcome.
        return changed
    return 0
def handle_operator_declaration_table(table, flat_json):
    """Fill the Print Name / Position Title row of an Operator Declaration table.

    The table is scanned for the first row whose first cell mentions
    "print name" and whose second cell mentions "position title". The row
    right below it, if it exists and has at least two cells, is treated as
    the data row: red text in its first cell is replaced with the print
    name and red text in its second cell with the position title. Scanning
    stops after the first header row found.

    Args:
        table: Table exposing ``rows``, each row exposing ``cells``.
        flat_json: Flat key -> value mapping.

    Returns:
        Number of run replacements made (0 when no header row is found).
    """
    rows = table.rows
    for position, header in enumerate(rows):
        header_cells = header.cells
        if len(header_cells) < 2:
            continue

        first_label = get_clean_text(header_cells[0]).lower()
        second_label = get_clean_text(header_cells[1]).lower()
        if "print name" not in first_label or "position title" not in second_label:
            continue

        print(f" π― Found Operator Declaration table")
        filled = 0
        if position + 1 < len(rows):
            data_cells = rows[position + 1].cells
            if len(data_cells) >= 2:
                name_cell, position_cell = data_cells[0], data_cells[1]

                if has_red_text(name_cell):
                    name_keys = ("Print Name", "print name", "Operator Declaration.Print Name")
                    name_value = next((flat_json[k] for k in name_keys if k in flat_json), None)
                    if name_value:
                        name_text = get_value_as_string(name_value)
                        filled += replace_red_text_in_cell(name_cell, name_text)
                        print(f" β Updated Print Name: '{name_text}'")

                if has_red_text(position_cell):
                    title_keys = ("Position Title", "position title", "Operator Declaration.Position Title")
                    position_value = next((flat_json[k] for k in title_keys if k in flat_json), None)
                    if position_value:
                        position_text = get_value_as_string(position_value)
                        filled += replace_red_text_in_cell(position_cell, position_text)
                        print(f" β Updated Position Title: '{position_text}'")
        # Only the first header row is ever handled.
        return filled
    return 0
def process_tables(document, flat_json):
    """Replace red placeholder text in every table of the document.

    For each table:

    * Tables with at most four rows are first offered to
      ``handle_operator_declaration_table``; if that makes any
      replacement, the rest of the table is skipped.
    * Otherwise each row is keyed by the text of its first cell. Rows with
      no cells or an empty key are skipped.
    * A "nature of the operators business" key is delegated to
      ``handle_nature_business_section``.
    * If the key matches a JSON value, a company-number key with a list
      value is spread over the row's cells one item per cell; any other
      match is written into every cell of the row that has red text.
    * If the key matches nothing, each red cell's own red text is used as
      the lookup key instead.

    Args:
        document: Document exposing ``tables``.
        flat_json: Flat key -> value mapping.

    Returns:
        Total number of run replacements made across all tables.
    """
    total = 0
    for table_no, table in enumerate(document.tables, start=1):
        print(f"\nπ Processing table {table_no}:")

        # Small tables may be the Operator Declaration table.
        if len(table.rows) <= 4:
            declared = handle_operator_declaration_table(table, flat_json)
            if declared > 0:
                total += declared
                continue

        for row_no, row in enumerate(table.rows, start=1):
            cells = row.cells
            if len(cells) < 1:
                continue
            label_cell = cells[0]
            label = get_clean_text(label_cell)
            if not label:
                continue
            print(f" π Row {row_no}: Key = '{label}'")
            label_lc = label.lower()

            if "nature of the operators business" in label_lc:
                total += handle_nature_business_section(label_cell, flat_json)
                continue

            matched = find_matching_json_value(label, flat_json)

            if matched is None:
                # No value for the row label: look each red placeholder up
                # by its own text instead.
                for cell in cells:
                    if not has_red_text(cell):
                        continue
                    placeholder = "".join(
                        run.text
                        for paragraph in cell.paragraphs
                        for run in paragraph.runs
                        if is_red(run)
                    ).strip()
                    if not placeholder:
                        continue
                    candidate = find_matching_json_value(placeholder, flat_json)
                    if candidate is None:
                        continue
                    new_text = get_value_as_string(candidate)
                    changed = replace_red_text_in_cell(cell, new_text)
                    total += changed
                    if changed > 0:
                        print(f" β Replaced red text: '{placeholder}' -> '{new_text}'")
                continue

            new_text = get_value_as_string(matched, label)

            # "company number" also covers "australian company number".
            if "company number" in label_lc and isinstance(matched, list):
                total += handle_australian_company_number(row, matched)
                continue

            for cell_no, cell in enumerate(cells, start=1):
                if not has_red_text(cell):
                    continue
                changed = replace_red_text_in_cell(cell, new_text)
                total += changed
                if changed > 0:
                    print(f" β Updated cell {cell_no}: '{new_text}'")
    return total
def process_paragraphs(document, flat_json):
    """Replace red placeholder text found in the document's paragraphs.

    Within each paragraph, the text of all red, non-blank runs is
    concatenated and used as the lookup key. On a match, the replacement
    is written into the first of those runs (recoloured black) and the
    remaining red runs are emptied.

    Args:
        document: Document exposing ``paragraphs``.
        flat_json: Flat key -> value mapping.

    Returns:
        Number of paragraphs in which a replacement was made.
    """
    replaced = 0
    print(f"\nπ Processing paragraphs:")
    for number, paragraph in enumerate(document.paragraphs, start=1):
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        placeholder = "".join(run.text for run in red_runs).strip()
        if not placeholder:
            continue

        print(f" π Paragraph {number}: Found red text: '{placeholder}'")
        match = find_matching_json_value(placeholder, flat_json)
        if match is None:
            continue

        new_text = get_value_as_string(match)
        print(f" β Replacing with: '{new_text}'")

        # The first red run carries the new text; the others are blanked.
        head, *tail = red_runs
        head.text = new_text
        head.font.color.rgb = RGBColor(0, 0, 0)
        for leftover in tail:
            leftover.text = ''
        replaced += 1
    return replaced
def process_hf(json_file, docx_file, output_file):
    """Fill a DOCX template's red placeholder text from a JSON file.

    Steps: load the JSON, flatten it with ``flatten_json_new_system``,
    open the document, run ``process_tables`` and ``process_paragraphs``,
    then save the result and print a summary.

    This is a best-effort entry point: every exception is caught and
    reported on stdout (with a traceback for anything other than a missing
    file) instead of being raised to the caller.

    Args:
        json_file: Path to a JSON file, or an object with ``read()``.
        docx_file: Source document; passed as-is to ``Document``.
        output_file: Save destination; passed as-is to ``Document.save``.

    Returns:
        None.
    """
    try:
        # Load JSON from a readable object or from a path.
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

        flat_json = flatten_json_new_system(json_data)

        print("π Available JSON keys (sample):")
        sample_size = 10
        for key in sorted(flat_json)[:sample_size]:
            print(f" - {key}: {flat_json[key]}")
        # Only report leftover keys when there are some; with fewer than
        # ten keys the count would otherwise come out negative.
        remaining = len(flat_json) - sample_size
        if remaining > 0:
            print(f" ... and {remaining} more keys\n")
        else:
            print()

        # The same call serves both path and file-like inputs.
        doc = Document(docx_file)

        print("π Starting processing compatible with your new system...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements

        # Likewise, one save call serves both path and writable targets.
        doc.save(output_file)

        print(f"\nβ Document saved as: {output_file}")
        print(f"β Total replacements: {total_replacements}")
        print(f" π Tables: {table_replacements}")
        print(f" π Paragraphs: {paragraph_replacements}")
        print(f"π Processing complete!")
    except FileNotFoundError as e:
        print(f"β File not found: {e}")
    except Exception as e:
        # Top-level boundary: report the failure instead of propagating it.
        print(f"β Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Command-line entry point: <input_docx> <updated_json> <output_docx>.
    import sys

    if len(sys.argv) != 4:
        print("Usage: python compatible_pipeline.py <input_docx> <updated_json> <output_docx>")
        # sys.exit is always available; the bare exit() helper is injected
        # by the site module and may be absent (e.g. under `python -S`).
        sys.exit(1)

    docx_path, json_path, output_path = sys.argv[1:4]
    # process_hf takes the JSON source first, unlike the CLI argument order.
    process_hf(json_path, docx_path, output_path)