Spaces:
Paused
Paused
| """Enhanced field extraction utilities for Dots.OCR text processing. | |
| This module provides improved field extraction and mapping from OCR results | |
| to structured KYB field formats with better confidence scoring and validation. | |
| """ | |
| import re | |
| import logging | |
| from typing import Optional, Dict, List, Tuple, Any | |
| from datetime import datetime | |
| from .api_models import ExtractedField, IdCardFields, MRZData | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
class EnhancedFieldExtractor:
    """Enhanced field extraction with improved confidence scoring and validation.

    Every method is a classmethod, so the extractor works without being
    instantiated (``EnhancedFieldExtractor.extract_fields(text)``) and also
    through an instance.
    """

    # Label-driven regexes per field as (pattern, base_confidence) tuples.
    # Patterns are tried in order and the first match that passes
    # _validate_field_value wins. They are applied with
    # re.IGNORECASE | re.MULTILINE, so ``[A-Z]`` also matches lowercase letters;
    # fixed-width codes therefore carry a ``(?![A-Za-z])`` guard so that they
    # cannot match the first letters of a longer word.
    FIELD_PATTERNS = {
        "document_number": [
            (r"documentnummer[:\s]*([A-Z0-9]{6,15})", 0.9),  # Dutch format
            (r"document\s*number[:\s]*([A-Z0-9]{6,15})", 0.85),  # English format
            # The label must not run straight into more letters, otherwise
            # ordinary words containing "nr" would be read as a label.
            (r"nr(?![A-Za-z])[:\s]*([A-Z0-9]{6,15})", 0.7),  # Abbreviated format
            # Same guard plus a leading word boundary: without them "identity"
            # or "valid ..." would be read as an "ID" label.
            (r"\bID(?![A-Za-z])[:\s]*([A-Z0-9]{6,15})", 0.8),  # ID format
            (r"([A-Z]{3}\d{9})", 0.75),  # Passport format (3 letters + 9 digits)
        ],
        "surname": [
            # Anchor to line and capture value up to newline to avoid spilling
            # into the next label
            (r"^\s*achternaam[:\s]*([^\r\n]+)", 0.95),  # Dutch format (line-anchored)
            (r"^\s*surname[:\s]*([^\r\n]+)", 0.9),  # English format (line-anchored)
            (r"^\s*family\s*name[:\s]*([^\r\n]+)", 0.85),  # Full English
            (r"^\s*last\s*name[:\s]*([^\r\n]+)", 0.85),  # Alternative English
        ],
        "given_names": [
            (r"^\s*voornamen[:\s]*([^\r\n]+)", 0.95),  # Dutch format (line-anchored)
            (
                r"^\s*given\s*names[:\s]*([^\r\n]+)",
                0.9,
            ),  # English format (line-anchored)
            (r"^\s*first\s*name[:\s]*([^\r\n]+)", 0.85),  # First name only
            (r"^\s*voorletters[:\s]*([^\r\n]+)", 0.75),  # Dutch initials
        ],
        "nationality": [
            (r"nationaliteit[:\s]*([A-Z]{3})(?![A-Za-z])", 0.9),  # Dutch format (3-letter code)
            (r"nationality[:\s]*([A-Z]{3})(?![A-Za-z])", 0.85),  # English format
            # NOTE: _validate_field_value only accepts exactly three letters,
            # so longer country names captured here are rejected.
            (r"nationality[:\s]*([A-Za-z\s]{3,20})", 0.7),  # Full country name
        ],
        "date_of_birth": [
            (r"geboortedatum[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.9),  # Dutch format
            (
                r"date\s*of\s*birth[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})",
                0.85,
            ),  # English format
            (r"born[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.8),  # Short English
            (r"(\d{2}[./-]\d{2}[./-]\d{4})", 0.6),  # Generic date pattern
        ],
        "gender": [
            (r"geslacht[:\s]*([MF])", 0.9),  # Dutch format
            (r"gender[:\s]*([MF])", 0.85),  # English format
            (r"sex[:\s]*([MF])", 0.8),  # Alternative English
            (r"geslacht[:\s]*(man|vrouw)", 0.7),  # Dutch full words
            (r"gender[:\s]*(male|female)", 0.7),  # English full words
        ],
        "place_of_birth": [
            (r"geboorteplaats[:\s]*([A-Za-z\s]{2,30})", 0.9),  # Dutch format
            (r"place\s*of\s*birth[:\s]*([A-Za-z\s]{2,30})", 0.85),  # English format
            (r"born\s*in[:\s]*([A-Za-z\s]{2,30})", 0.8),  # Short English
        ],
        "date_of_issue": [
            (r"uitgiftedatum[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.9),  # Dutch format
            (
                r"date\s*of\s*issue[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})",
                0.85,
            ),  # English format
            (r"issued[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.8),  # Short English
        ],
        "date_of_expiry": [
            (r"vervaldatum[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.9),  # Dutch format
            (
                r"date\s*of\s*expiry[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})",
                0.85,
            ),  # English format
            (r"expires[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})", 0.8),  # Short English
            (
                r"valid\s*until[:\s]*(\d{2}[./-]\d{2}[./-]\d{4})",
                0.8,
            ),  # Alternative English
        ],
        "personal_number": [
            (r"persoonsnummer[:\s]*(\d{9})", 0.9),  # Dutch format
            (r"personal\s*number[:\s]*(\d{9})", 0.85),  # English format
            (r"bsn[:\s]*(\d{9})", 0.9),  # Dutch BSN
            (r"social\s*security[:\s]*(\d{9})", 0.8),  # SSN format
        ],
        "document_type": [
            (r"document\s*type[:\s]*([A-Za-z\s]{3,20})", 0.8),  # English format
            (r"soort\s*document[:\s]*([A-Za-z\s]{3,20})", 0.9),  # Dutch format
            (r"(passport|paspoort)", 0.9),  # Passport
            (r"(identity\s*card|identiteitskaart)", 0.9),  # ID card
            (r"(driving\s*license|rijbewijs)", 0.9),  # Driving license
        ],
        "issuing_country": [
            (r"issuing\s*country[:\s]*([A-Z]{3})(?![A-Za-z])", 0.85),  # English format
            (r"uitgevende\s*land[:\s]*([A-Z]{3})(?![A-Za-z])", 0.9),  # Dutch format
            (r"country[:\s]*([A-Z]{3})(?![A-Za-z])", 0.7),  # Short format
        ],
        "issuing_authority": [
            (r"issuing\s*authority[:\s]*([A-Za-z\s]{3,30})", 0.8),  # English format
            (r"uitgevende\s*autoriteit[:\s]*([A-Za-z\s]{3,30})", 0.9),  # Dutch format
            (r"authority[:\s]*([A-Za-z\s]{3,30})", 0.7),  # Short format
        ],
    }

    # MRZ patterns as (pattern, base_confidence); applied with re.MULTILINE only.
    MRZ_PATTERNS = [
        # Strict formats first, allowing leading/trailing whitespace per line
        (
            r"^\s*((?:[A-Z0-9<]{44})\s*\n\s*(?:[A-Z0-9<]{44}))\s*$",
            0.95,
        ),  # TD3: Passport (2 x 44)
        (
            r"^\s*((?:[A-Z0-9<]{36})\s*\n\s*(?:[A-Z0-9<]{36}))\s*$",
            0.9,
        ),  # TD2: ID card (2 x 36)
        (
            r"^\s*((?:[A-Z0-9<]{30})\s*\n\s*(?:[A-Z0-9<]{30})\s*\n\s*(?:[A-Z0-9<]{30}))\s*$",
            0.85,
        ),  # TD1: (3 x 30)
        # Fallback generic: a line starting with P< followed by another MRZ-like line
        (r"(P<[^\r\n]+\n[^\r\n]+)", 0.85),
    ]

    @classmethod
    def extract_fields(cls, ocr_text: str) -> "IdCardFields":
        """Extract structured fields from OCR text with enhanced confidence scoring.

        For each field in ``FIELD_PATTERNS`` the patterns are tried in order;
        the first match whose captured value passes ``_validate_field_value``
        is kept and its base confidence is refined by ``_adjust_confidence``.
        Fields without a valid match are omitted.

        Args:
            ocr_text: Raw OCR text from document processing

        Returns:
            IdCardFields object with extracted field data
        """
        logger.info("Extracting fields from text of length: %d", len(ocr_text))
        fields = {}
        extraction_stats = {"total_patterns": 0, "matches_found": 0}
        search_flags = re.IGNORECASE | re.MULTILINE

        for field_name, patterns in cls.FIELD_PATTERNS.items():
            value = None
            confidence = 0.0
            for pattern, base_confidence in patterns:
                extraction_stats["total_patterns"] += 1
                match = re.search(pattern, ocr_text, search_flags)
                if not match:
                    continue
                candidate_value = match.group(1).strip()
                # Only accept a candidate that passes field-specific validation;
                # otherwise fall through to the next (lower-confidence) pattern.
                if cls._validate_field_value(field_name, candidate_value):
                    value = candidate_value
                    confidence = base_confidence
                    extraction_stats["matches_found"] += 1
                    logger.debug(
                        "Found %s: '%s' (confidence: %.2f)",
                        field_name,
                        value,
                        confidence,
                    )
                    break

            if value:
                # Apply additional confidence adjustments
                confidence = cls._adjust_confidence(
                    field_name, value, confidence, ocr_text
                )
                fields[field_name] = ExtractedField(
                    field_name=field_name,
                    value=value,
                    confidence=confidence,
                    source="ocr",
                )

        logger.info(
            "Field extraction complete: %d/%d patterns matched",
            extraction_stats["matches_found"],
            extraction_stats["total_patterns"],
        )
        return IdCardFields(**fields)

    @classmethod
    def _validate_field_value(cls, field_name: str, value: str) -> bool:
        """Validate extracted field value based on field type.

        Args:
            field_name: Name of the field
            value: Extracted value to validate

        Returns:
            True if value is valid; fields without a specific rule are
            accepted as long as the value is non-blank.
        """
        if not value or not value.strip():
            return False

        # Field-specific validation
        if field_name == "document_number":
            return 6 <= len(value) <= 15
        if field_name in ("surname", "given_names", "place_of_birth"):
            return 2 <= len(value) <= 50
        if field_name in ("nationality", "issuing_country"):
            return len(value) == 3 and value.isalpha()
        if field_name in ("date_of_birth", "date_of_issue", "date_of_expiry"):
            return cls._validate_date_format(value)
        if field_name == "gender":
            return value.upper() in ("M", "F", "MALE", "FEMALE", "MAN", "VROUW")
        if field_name == "personal_number":
            return len(value) == 9 and value.isdigit()
        return True

    @classmethod
    def _validate_date_format(cls, date_str: str) -> bool:
        """Validate a day-month-year date string.

        The string must consist of three integer parts joined by one of
        ``.``, ``/`` or ``-``, the year must lie in 1900..2100, and the
        combination must be a real calendar date (so 31.02.2020 is rejected).

        Args:
            date_str: Date string to validate

        Returns:
            True if the date is well-formed and exists on the calendar
        """
        for sep in (".", "/", "-"):
            if sep not in date_str:
                continue
            parts = date_str.split(sep)
            if len(parts) != 3:
                continue
            try:
                day, month, year = (int(part) for part in parts)
            except ValueError:
                continue
            if not 1900 <= year <= 2100:
                continue
            try:
                # datetime() raises ValueError for impossible day/month combos.
                datetime(year, month, day)
            except ValueError:
                continue
            return True
        return False

    @classmethod
    def _adjust_confidence(
        cls, field_name: str, value: str, base_confidence: float, full_text: str
    ) -> float:
        """Adjust confidence based on additional factors.

        Args:
            field_name: Name of the field
            value: Extracted value
            base_confidence: Base confidence from pattern matching
            full_text: Full OCR text for context

        Returns:
            Adjusted confidence score, clamped to the range [0.0, 1.0]
        """
        confidence = base_confidence

        # Length-based adjustments
        if field_name in ("surname", "given_names") and len(value) < 3:
            confidence *= 0.8  # Shorter names are less reliable

        # Context-based adjustments
        if field_name == "document_number" and "passport" in full_text.lower():
            confidence *= 1.1  # Higher confidence in passport context

        # Multiple occurrence bonus
        if full_text.count(value) > 1:
            confidence *= 1.05  # Slight bonus for repeated values

        # Ensure confidence stays within bounds
        return min(max(confidence, 0.0), 1.0)

    @classmethod
    def extract_mrz(cls, ocr_text: str) -> Optional["MRZData"]:
        """Extract MRZ data from OCR text with enhanced validation.

        Every pattern in ``MRZ_PATTERNS`` is tried; among the matches that pass
        ``_validate_mrz_format`` the one with the highest adjusted confidence
        is kept (the earlier pattern wins ties). Only the format type, raw text
        and confidence are populated; the individual MRZ fields are left as
        ``None``.

        Args:
            ocr_text: Raw OCR text from document processing

        Returns:
            MRZData object if MRZ detected, None otherwise
        """
        logger.info("Extracting MRZ data from OCR text")
        best_match = None
        best_confidence = 0.0

        for pattern, base_confidence in cls.MRZ_PATTERNS:
            match = re.search(pattern, ocr_text, re.MULTILINE)
            if not match:
                continue
            raw_mrz = match.group(1)
            # Validate MRZ format
            if not cls._validate_mrz_format(raw_mrz):
                continue
            # Adjust confidence based on MRZ quality
            confidence = cls._adjust_mrz_confidence(raw_mrz, base_confidence)
            if confidence > best_confidence:
                best_match = raw_mrz
                best_confidence = confidence
                logger.debug("Found MRZ with confidence %.2f", confidence)

        if not best_match:
            logger.info("No MRZ data found in OCR text")
            return None

        # Parse MRZ to determine format type
        format_type = cls._determine_mrz_format(best_match)
        # Basic checksum validation (informational only; does not block the result)
        is_valid, errors = cls._validate_mrz_checksums(best_match, format_type)
        logger.info("MRZ extracted: %s format, valid: %s", format_type, is_valid)
        if errors:
            logger.debug("MRZ validation issues: %s", errors)

        # Populate both canonical and legacy alias fields for compatibility
        return MRZData(
            document_type=format_type,
            format_type=format_type,  # legacy alias
            issuing_country=None,  # would be parsed in full impl
            surname=None,
            given_names=None,
            document_number=None,
            nationality=None,
            date_of_birth=None,
            gender=None,
            date_of_expiry=None,
            personal_number=None,
            raw_mrz=best_match,
            raw_text=best_match,  # legacy alias
            confidence=best_confidence,
        )

    @classmethod
    def _validate_mrz_format(cls, mrz_text: str) -> bool:
        """Validate basic MRZ format.

        Requires at least two lines, each of which (after removing all
        whitespace) consists solely of ``A-Z``, ``0-9`` and ``<``.

        Args:
            mrz_text: Raw MRZ text

        Returns:
            True if format is valid
        """
        lines = mrz_text.strip().split("\n")
        if len(lines) < 2:
            return False
        # Normalize whitespace and validate character set only.
        normalized_lines = (re.sub(r"\s+", "", line) for line in lines)
        return all(re.match(r"^[A-Z0-9<]+$", line) for line in normalized_lines)

    @classmethod
    def _determine_mrz_format(cls, mrz_text: str) -> str:
        """Determine MRZ format type.

        Args:
            mrz_text: Raw MRZ text

        Returns:
            "TD3" for two lines that start with ``P<`` or are 44 characters
            wide, "TD2" for two 36-character lines, "TD1" for three lines,
            otherwise "UNKNOWN".
        """
        lines = [re.sub(r"\s+", "", line) for line in mrz_text.strip().split("\n")]
        line_count = len(lines)
        line_length = len(lines[0]) if lines else 0

        # Heuristic mapping: a leading "P<" is taken as a passport even when the
        # OCR'd length is off; a 2 x 44 layout is TD3 regardless of the
        # document-code subtype in the second character.
        if line_count == 2 and (lines[0].startswith("P<") or line_length == 44):
            return "TD3"
        if line_count == 2 and line_length == 36:
            return "TD2"  # ID card format
        if line_count == 3:
            return "TD1"
        return "UNKNOWN"

    @classmethod
    def _adjust_mrz_confidence(cls, mrz_text: str, base_confidence: float) -> float:
        """Adjust MRZ confidence based on quality indicators.

        Args:
            mrz_text: Raw MRZ text
            base_confidence: Base confidence from pattern matching

        Returns:
            Adjusted confidence, clamped to the range [0.0, 1.0]
        """
        confidence = base_confidence
        # Compare whitespace-stripped lengths: the strict patterns may capture
        # padding around the line break, which must not cancel the bonus.
        lengths = {
            len(re.sub(r"\s+", "", line)) for line in mrz_text.strip().split("\n")
        }
        if len(lengths) == 1:
            confidence *= 1.05  # Bonus for consistent line lengths
        return min(max(confidence, 0.0), 1.0)

    @classmethod
    def _validate_mrz_checksums(
        cls, mrz_text: str, format_type: str
    ) -> Tuple[bool, List[str]]:
        """Validate MRZ checksums (simplified implementation).

        No check digits are computed yet; the only test is a plausibility
        check on the share of ``<`` fill characters. ``format_type`` is
        accepted for interface stability but is currently unused.

        Args:
            mrz_text: Raw MRZ text
            format_type: MRZ format type

        Returns:
            Tuple of (is_valid, list_of_errors)
        """
        # This is a simplified implementation
        # In production, you would implement full MRZ checksum validation
        errors = []
        # Basic validation - check for reasonable character distribution
        if mrz_text.count("<") > len(mrz_text) * 0.3:
            errors.append("Too many fill characters")
        # For now, assume valid if basic format is correct
        return not errors, errors
# Backward compatibility - use enhanced extractor as default
class FieldExtractor(EnhancedFieldExtractor):
    """Legacy name for EnhancedFieldExtractor; adds and overrides nothing."""