Spaces:
Running
Running
File size: 17,915 Bytes
e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 5b2b3a8 e8b46b5 ddb37e5 e8b46b5 5b2b3a8 e8b46b5 ddb37e5 e8b46b5 5b2b3a8 e8b46b5 5b2b3a8 e8b46b5 5b2b3a8 e8b46b5 5b2b3a8 e8b46b5 412e2ed 5b2b3a8 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 5b2b3a8 ddb37e5 5b2b3a8 ddb37e5 5b2b3a8 ddb37e5 5b2b3a8 ddb37e5 5b2b3a8 ddb37e5 412e2ed 5b2b3a8 412e2ed 5b2b3a8 ddb37e5 5b2b3a8 ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 5efc8a5 412e2ed 5efc8a5 ddb37e5 412e2ed ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 5efc8a5 ddb37e5 e8b46b5 7755a4a e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 7755a4a e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 ddb37e5 e8b46b5 7755a4a e8b46b5 ddb37e5 7755a4a ddb37e5 7755a4a ddb37e5 7755a4a ddb37e5 7755a4a ddb37e5 7755a4a 5b2b3a8 ddb37e5 e8b46b5 7755a4a 5b2b3a8 7755a4a ddb37e5 e8b46b5 5b2b3a8 e8b46b5 5b2b3a8 e8b46b5 7755a4a 5b2b3a8 e8b46b5 ddb37e5 7755a4a e8b46b5 7755a4a ddb37e5 e8b46b5 7755a4a 5b2b3a8 7755a4a 5b2b3a8 7755a4a 412e2ed e8b46b5 a6e31ac ddb37e5 a6e31ac 7755a4a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
import json
from docx import Document
from docx.shared import RGBColor
import re
def load_json(filepath):
    """Load and parse a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON document, exactly as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode as UTF-8 explicitly (process_hf reads its JSON the same way) so
    # the result does not depend on the platform's default locale encoding.
    with open(filepath, "r", encoding="utf-8") as file:
        return json.load(file)
def flatten_json_new_system(json_data):
    """Build a single-level lookup dict from ``{schema: {field: values}}`` data.

    Top-level entries whose value is not a dict are skipped.  A one-element
    list is unwrapped to its element; every other value is stored unchanged.
    Each field is registered under several spellings (as written, lower-cased,
    lower-cased and stripped, and schema-prefixed) plus a few fixed aliases
    for well-known field names.  When two fields map to the same key, the
    one processed last supplies the value.

    Args:
        json_data: Mapping of schema name to a mapping of field name to value.

    Returns:
        dict mapping every generated key to the field's value.
    """
    # (phrase looked for in the lower-cased field name, aliases to register)
    alias_rules = (
        ("print name", ("print name", "operator name", "name")),
        ("position title", ("position title", "position", "title")),
        ("accreditation number", ("accreditation number", "nhvas accreditation no")),
        ("expiry date", ("expiry date", "expiry")),
    )

    lookup = {}
    for schema, fields in json_data.items():
        if not isinstance(fields, dict):
            continue
        for field, raw in fields.items():
            single_item_list = isinstance(raw, list) and len(raw) == 1
            entry = raw[0] if single_item_list else raw

            field_lc = field.lower()
            # Order matters: a key keeps the position of its first insertion,
            # and lookups elsewhere iterate this dict in order.
            keys = [
                field,
                field_lc,
                field_lc.strip(),
                f"{schema}.{field}",
                f"{schema.lower()}.{field_lc}",
            ]
            for phrase, aliases in alias_rules:
                if phrase in field_lc:
                    keys.extend(aliases)

            for key in keys:
                lookup[key] = entry
    return lookup
def is_red(run):
    """Tell whether a run's font colour counts as red.

    A run counts as red when its colour's ``rgb`` equals ``RGBColor(255, 0, 0)``
    or its ``theme_color`` (if the attribute exists) equals 1.

    Args:
        run: Object exposing ``font.color``.

    Returns:
        The falsy colour object itself when ``run.font.color`` is falsy,
        otherwise ``True`` or ``False``.
    """
    colour = run.font.color
    if not colour:
        return colour
    if colour.rgb == RGBColor(255, 0, 0):
        return True
    # NOTE(review): theme colour index 1 looks like DARK_1 in python-docx's
    # MSO_THEME_COLOR_INDEX rather than a red - confirm this is intended.
    return getattr(colour, "theme_color", None) == 1
def get_value_as_string(value, field_name=""):
    """Render a JSON value as the text to write into the document.

    Args:
        value: Scalar or list taken from the flattened JSON.
        field_name: Label of the field being filled; only consulted for
            lists with more than one element.

    Returns:
        ``str(value)`` for non-lists, ``""`` for an empty list, the single
        element as a string for a one-element list, and the elements joined
        by spaces for longer lists.  The one exception: a multi-element list
        for a field whose name contains "company number" is returned
        unchanged (still a list) so the caller can place it digit by digit.
    """
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""
    if len(value) == 1:
        return str(value[0])

    label = field_name.lower()
    if "australian company number" in label or "company number" in label:
        return value  # Return as list for ACN processing
    return " ".join(map(str, value))
def _significant_words(text):
    """Return the lower-cased words of *text* that are longer than two characters."""
    return {word.lower() for word in re.findall(r'\b\w+\b', text) if len(word) > 2}


def find_matching_json_value(field_name, flat_json):
    """Look up the JSON value that best corresponds to a document field label.

    Matching is attempted in this order, returning on the first hit:

    1. exact key match on the stripped label;
    2. case-insensitive key match;
    3. fixed rules for labels containing "print name", "position title",
       "accreditation number" or "expiry date";
    4. word-overlap scoring against every key (0.6 * Jaccard similarity +
       0.4 * share of the label's words found in the key), accepted when the
       best score is at least 0.25.

    Each outcome is reported with ``print``.

    Args:
        field_name: Label text taken from the document.
        flat_json: Flattened key -> value mapping.

    Returns:
        The matched value (which may itself be falsy, e.g. ``0`` or ``""``),
        or ``None`` when nothing matches.
    """
    # NOTE(review): the "✅" prefixes were restored from mis-decoded text; the
    # "β" prefix on the no-match message looks like the same kind of damage
    # but is left as found - confirm against the original file.
    field_name = field_name.strip()
    field_lower = field_name.lower()

    # Direct match (exact)
    if field_name in flat_json:
        print(f" ✅ Direct match found for key '{field_name}'")
        return flat_json[field_name]

    # Case-insensitive exact match
    for key, value in flat_json.items():
        if key.lower() == field_lower:
            print(f" ✅ Case-insensitive match found for key '{field_name}' with JSON key '{key}'")
            return value

    # Handle common variations of well-known labels
    if "print name" in field_lower:
        for key in ("Print Name", "print name", "operator name", "name"):
            if key in flat_json:
                print(f" ✅ Print name match: '{field_name}' -> '{key}'")
                return flat_json[key]
    if "position title" in field_lower:
        for key in ("Position Title", "position title", "position", "title"):
            if key in flat_json:
                print(f" ✅ Position title match: '{field_name}' -> '{key}'")
                return flat_json[key]
    if "accreditation number" in field_lower:
        for key, value in flat_json.items():
            key_lower = key.lower()
            if "accreditation" in key_lower and "number" in key_lower:
                print(f" ✅ Accreditation number match: '{field_name}' -> '{key}'")
                return value
    if "expiry date" in field_lower:
        for key, value in flat_json.items():
            if "expiry" in key.lower():
                print(f" ✅ Expiry date match: '{field_name}' -> '{key}'")
                return value

    # Fuzzy matching on shared words
    field_words = _significant_words(field_name)
    if not field_words:
        return None

    best_match = None
    best_score = 0
    best_key = None
    for key, value in flat_json.items():
        key_words = _significant_words(key)
        common_words = field_words & key_words
        if not common_words:
            continue
        similarity = len(common_words) / len(field_words | key_words)
        coverage = len(common_words) / len(field_words)
        final_score = (similarity * 0.6) + (coverage * 0.4)
        # Strict ">" keeps the earliest key on ties.
        if final_score > best_score:
            best_score = final_score
            best_match = value
            best_key = key

    # Decide on whether a key was found, not on the truthiness of its value,
    # so legitimate falsy values (0, "", False, []) are not thrown away.
    if best_key is not None and best_score >= 0.25:
        print(f" ✅ Fuzzy match found for key '{field_name}' with JSON key '{best_key}' (score: {best_score:.2f})")
        return best_match

    print(f" β No match found for '{field_name}'")
    return None
def get_clean_text(cell):
    """Return the cell's text with surrounding whitespace removed.

    The text is the concatenation of every run in every paragraph of the
    cell, in document order, with nothing inserted between paragraphs.

    Args:
        cell: Object exposing ``paragraphs``, each exposing ``runs`` with a
            ``text`` attribute.

    Returns:
        The concatenated, stripped text.
    """
    fragments = (
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
    return "".join(fragments).strip()
def has_red_text(cell):
    """Report whether any run in the cell is red and has non-blank text.

    Args:
        cell: Object exposing ``paragraphs``, each exposing ``runs``.

    Returns:
        ``True`` as soon as one red, non-blank run is found, else ``False``.
    """
    return any(
        is_red(run) and run.text.strip()
        for paragraph in cell.paragraphs
        for run in paragraph.runs
    )
def replace_red_text_in_cell(cell, replacement_text):
    """Replace the red placeholder text of each paragraph in a cell.

    For every paragraph that contains red, non-blank runs, the first such run
    receives ``replacement_text`` and is recoloured black.  Any further red,
    non-blank runs in the same paragraph are emptied, because a placeholder
    that Word has split across several runs would otherwise leave stale red
    fragments behind the replacement (process_paragraphs treats its red runs
    the same way).

    Args:
        cell: Object exposing ``paragraphs``, each exposing ``runs``.
        replacement_text: Text written into the first red run of each
            paragraph.

    Returns:
        Number of paragraphs in which a replacement was made.
    """
    replacements_made = 0
    for paragraph in cell.paragraphs:
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        if not red_runs:
            continue
        first_run, *leftover_runs = red_runs
        first_run.text = replacement_text
        first_run.font.color.rgb = RGBColor(0, 0, 0)  # Change to black
        for run in leftover_runs:
            run.text = ""
        replacements_made += 1
    return replacements_made
def handle_australian_company_number(row, company_numbers):
    """Write the ACN entries one per cell, starting at the row's second cell.

    Entry ``i`` of ``company_numbers`` targets cell ``i + 1`` (cell 0 is
    skipped).  A target cell is filled only if it exists and currently
    contains red text; each placement is reported with ``print``.

    Args:
        row: Object exposing ``cells``.
        company_numbers: Sequence of values, each converted with ``str``.

    Returns:
        Total number of replacements reported by replace_red_text_in_cell.
    """
    filled = 0
    for offset, digit in enumerate(company_numbers):
        target = offset + 1
        if target >= len(row.cells):
            continue
        cell = row.cells[target]
        if not has_red_text(cell):
            continue
        filled += replace_red_text_in_cell(cell, str(digit))
        print(f" -> Placed digit '{digit}' in cell {target + 1}")
    return filled
def handle_nature_business_section(cell, flat_json):
    """Fill the "Nature of the Operators Business" cell from the JSON data.

    Nothing is done unless the cell contains red text and its text mentions
    "nature of the operators business" (or "... operator business").  The
    value of the first JSON key containing "nature of the operators business"
    is then written over the cell's red text.

    Args:
        cell: Table cell to update.
        flat_json: Flattened key -> value mapping.

    Returns:
        Number of replacements made; 0 when the cell does not qualify or no
        matching key exists.
    """
    if not has_red_text(cell):
        return 0

    cell_text = get_clean_text(cell).lower()
    if (
        "nature of the operators business" not in cell_text
        and "nature of the operator business" not in cell_text
    ):
        return 0

    # NOTE(review): "π―" looks like a mis-decoded emoji; left as found.
    print(" π― Found Nature of Business section")

    # Use the first key that names the business description.
    for key, business_value in flat_json.items():
        if "nature of the operators business" in key.lower():
            replacement_text = get_value_as_string(business_value)
            cell_replacements = replace_red_text_in_cell(cell, replacement_text)
            if cell_replacements > 0:
                print(" ✅ Updated business description")
            return cell_replacements
    return 0
def _fill_declaration_cell(cell, flat_json, candidate_keys, label):
    """Fill one declaration cell from the first candidate key present in flat_json."""
    if not has_red_text(cell):
        return 0
    # The first key that exists decides the value, even if that value is falsy.
    value = next((flat_json[key] for key in candidate_keys if key in flat_json), None)
    if not value:
        return 0
    text = get_value_as_string(value)
    replaced = replace_red_text_in_cell(cell, text)
    print(f" ✅ Updated {label}: '{text}'")
    return replaced


def handle_operator_declaration_table(table, flat_json):
    """Fill the Print Name / Position Title row of an Operator Declaration table.

    The table is scanned for a header row whose first cell contains
    "print name" and whose second cell contains "position title".  The row
    directly below it is treated as the data row: its first two cells are
    filled from the JSON when they contain red text.  Scanning stops at the
    first header row found.

    Args:
        table: Object exposing ``rows``, each exposing ``cells``.
        flat_json: Flattened key -> value mapping.

    Returns:
        Number of replacements made (0 when no header row is found).
    """
    rows = list(table.rows)  # read once; reused for the look-ahead below
    for row_idx, row in enumerate(rows):
        cells = row.cells
        if len(cells) < 2:
            continue

        first_text = get_clean_text(cells[0]).lower()
        second_text = get_clean_text(cells[1]).lower()
        if "print name" not in first_text or "position title" not in second_text:
            continue

        # NOTE(review): "π―" looks like a mis-decoded emoji; left as found.
        print(" π― Found Operator Declaration table")
        replacements_made = 0
        if row_idx + 1 < len(rows):
            data_cells = rows[row_idx + 1].cells
            if len(data_cells) >= 2:
                replacements_made += _fill_declaration_cell(
                    data_cells[0],
                    flat_json,
                    ("Print Name", "print name", "Operator Declaration.Print Name"),
                    "Print Name",
                )
                replacements_made += _fill_declaration_cell(
                    data_cells[1],
                    flat_json,
                    ("Position Title", "position title", "Operator Declaration.Position Title"),
                    "Position Title",
                )
        return replacements_made
    return 0
def _collect_red_text(cell):
    """Concatenate the text of every red run in the cell, in document order."""
    return "".join(
        run.text
        for paragraph in cell.paragraphs
        for run in paragraph.runs
        if is_red(run)
    )


def process_tables(document, flat_json):
    """Replace red placeholder text in every table of the document.

    Tables with at most four rows are first offered to
    handle_operator_declaration_table; if that makes any replacement the
    table is considered done.  Otherwise each row is handled by the text of
    its first cell:

    * a "nature of the operators business" row goes to
      handle_nature_business_section;
    * if the first cell's text matches a JSON key, every red cell in the row
      receives that value (company-number rows with list values are placed
      digit by digit instead);
    * otherwise the red text of each cell is itself looked up in the JSON.

    Progress is reported with ``print``.

    Args:
        document: Object exposing ``tables``.
        flat_json: Flattened key -> value mapping.

    Returns:
        Total number of replacements made.
    """
    # NOTE(review): the "π" prefixes below look like mis-decoded emoji; they
    # are left as found. The "✅" prefixes were restored.
    replacements_made = 0
    for table_idx, table in enumerate(document.tables):
        print(f"\nπ Processing table {table_idx + 1}:")
        rows = list(table.rows)

        # Check for Operator Declaration table first (small tables only)
        if len(rows) <= 4:
            declaration_replacements = handle_operator_declaration_table(table, flat_json)
            if declaration_replacements > 0:
                replacements_made += declaration_replacements
                continue

        for row_idx, row in enumerate(rows):
            # Read the cell sequence once per row instead of on every index.
            cells = row.cells
            if not cells:
                continue
            key_cell = cells[0]
            key_text = get_clean_text(key_cell)
            if not key_text:
                continue
            print(f" π Row {row_idx + 1}: Key = '{key_text}'")
            key_lower = key_text.lower()

            # Handle Nature of Business section
            if "nature of the operators business" in key_lower:
                replacements_made += handle_nature_business_section(key_cell, flat_json)
                continue

            # Regular field matching
            json_value = find_matching_json_value(key_text, flat_json)
            if json_value is not None:
                is_company_number = (
                    "australian company number" in key_lower
                    or "company number" in key_lower
                )
                if is_company_number and isinstance(json_value, list):
                    # Handle Australian Company Number specially
                    replacements_made += handle_australian_company_number(row, json_value)
                else:
                    replacement_text = get_value_as_string(json_value, key_text)
                    for cell_idx, cell in enumerate(cells):
                        if not has_red_text(cell):
                            continue
                        cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                        replacements_made += cell_replacements
                        if cell_replacements > 0:
                            print(f" ✅ Updated cell {cell_idx + 1}: '{replacement_text}'")
                continue

            # No key match: try to match each cell's own red text instead.
            for cell in cells:
                if not has_red_text(cell):
                    continue
                red_text = _collect_red_text(cell).strip()
                if not red_text:
                    continue
                json_value = find_matching_json_value(red_text, flat_json)
                if json_value is None:
                    continue
                replacement_text = get_value_as_string(json_value)
                cell_replacements = replace_red_text_in_cell(cell, replacement_text)
                replacements_made += cell_replacements
                if cell_replacements > 0:
                    print(f" ✅ Replaced red text: '{red_text}' -> '{replacement_text}'")
    return replacements_made
def process_paragraphs(document, flat_json):
    """Replace red placeholder text in the document's body paragraphs.

    For each paragraph, the red, non-blank runs are concatenated and the
    result is looked up in the JSON.  On a match, the first red run receives
    the replacement text and is recoloured black, and the remaining red runs
    are emptied.  Progress is reported with ``print``.

    Args:
        document: Object exposing ``paragraphs``, each exposing ``runs``.
        flat_json: Flattened key -> value mapping.

    Returns:
        Number of paragraphs in which a replacement was made.
    """
    # NOTE(review): the "π" prefixes look like mis-decoded emoji; they are
    # left as found. The "✅" prefix was restored.
    replacements_made = 0
    print(f"\nπ Processing paragraphs:")
    for para_idx, paragraph in enumerate(document.paragraphs):
        red_runs = [run for run in paragraph.runs if is_red(run) and run.text.strip()]
        red_text = "".join(run.text for run in red_runs).strip()
        if not red_text:
            continue
        print(f" π Paragraph {para_idx + 1}: Found red text: '{red_text}'")

        json_value = find_matching_json_value(red_text, flat_json)
        if json_value is None:
            continue
        replacement_text = get_value_as_string(json_value)
        print(f" ✅ Replacing with: '{replacement_text}'")

        # red_text is non-empty, so there is at least one red run here.
        first_run, *other_runs = red_runs
        first_run.text = replacement_text
        first_run.font.color.rgb = RGBColor(0, 0, 0)
        # Clear other red runs so no fragment of the placeholder remains
        for run in other_runs:
            run.text = ''
        replacements_made += 1
    return replacements_made
def process_hf(json_file, docx_file, output_file):
    """Fill a DOCX template's red placeholder text from a JSON file.

    Loads the JSON, flattens it, processes the document's tables and then its
    paragraphs, saves the result and prints a summary.  Failures are not
    raised to the caller: they are reported with ``print`` (plus a traceback
    for anything other than ``FileNotFoundError``).

    Args:
        json_file: Path to the JSON file, or an object with a ``read`` method.
        docx_file: Value passed straight to ``Document(...)``.
        output_file: Value passed straight to ``doc.save(...)``.

    Returns:
        None.
    """
    # NOTE(review): the "π" and bare "β" prefixes look like mis-decoded
    # emoji; they are left as found. The "✅" prefixes were restored.
    sample_size = 10
    try:
        # Load JSON
        if hasattr(json_file, "read"):
            json_data = json.load(json_file)
        else:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

        # Flatten the JSON structure
        flat_json = flatten_json_new_system(json_data)
        print("π Available JSON keys (sample):")
        for key in sorted(flat_json)[:sample_size]:
            print(f" - {key}: {flat_json[key]}")
        remaining = len(flat_json) - sample_size
        if remaining > 0:
            print(f" ... and {remaining} more keys\n")
        else:
            # Nothing was left out; keep the blank separator line only.
            print()

        # Load DOCX
        doc = Document(docx_file)

        # Process document
        print("π Starting processing compatible with your new system...")
        table_replacements = process_tables(doc, flat_json)
        paragraph_replacements = process_paragraphs(doc, flat_json)
        total_replacements = table_replacements + paragraph_replacements

        # Save output
        doc.save(output_file)
        print(f"\n✅ Document saved as: {output_file}")
        print(f"✅ Total replacements: {total_replacements}")
        print(f" π Tables: {table_replacements}")
        print(f" π Paragraphs: {paragraph_replacements}")
        print(f"π Processing complete!")
    except FileNotFoundError as e:
        print(f"β File not found: {e}")
    except Exception as e:
        # Top-level boundary: report the failure instead of propagating it.
        print(f"β Error: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    import sys

    # Expect exactly three arguments after the script name.
    if len(sys.argv) != 4:
        print("Usage: python compatible_pipeline.py <input_docx> <updated_json> <output_docx>")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. when Python runs with -S).
        sys.exit(1)

    # The command line takes the DOCX first, but process_hf takes the JSON first.
    docx_path, json_path, output_path = sys.argv[1:4]
    process_hf(json_path, docx_path, output_path)