Spaces:
Paused
Paused
| """Image and PDF preprocessing utilities for Dots.OCR. | |
| This module handles PDF to image conversion, image preprocessing, | |
| and multi-page document processing for the Dots.OCR model. | |
| """ | |
| import os | |
| import logging | |
| from typing import List, Tuple, Optional, Union | |
| from pathlib import Path | |
| import io | |
| import fitz # PyMuPDF | |
| import numpy as np | |
| from PIL import Image, ImageOps | |
| import cv2 | |
# Configure logging
logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Read a positive integer setting from the environment.

    A misconfigured variable must not make the module fail at import time,
    so unusable values are reported and replaced by the default.

    Args:
        name: Name of the environment variable to read.
        default: Value used when the variable is unset, blank, not an
            integer, or smaller than 1.

    Returns:
        The parsed integer, or ``default`` when the variable is unusable.
    """
    raw = os.getenv(name)
    # Unset and blank (e.g. `VAR=` in a deployment config) both mean "default".
    if raw is None or not raw.strip():
        return default
    try:
        value = int(raw)
    except ValueError:
        logger.warning(f"Ignoring non-integer {name}={raw!r}; using default {default}")
        return default
    if value < 1:
        logger.warning(f"Ignoring non-positive {name}={raw!r}; using default {default}")
        return default
    return value


# Environment variable configuration
PDF_DPI = _env_int("DOTS_OCR_PDF_DPI", 300)
PDF_MAX_PAGES = _env_int("DOTS_OCR_PDF_MAX_PAGES", 10)
# DOTS_OCR_IMAGE_MAX_SIZE is expressed in megabytes; the constant holds bytes.
IMAGE_MAX_SIZE = _env_int("DOTS_OCR_IMAGE_MAX_SIZE", 10) * 1024 * 1024  # 10MB default
class ImagePreprocessor:
    """Handles image preprocessing for Dots.OCR model.

    Images are oriented according to their EXIF data, converted to RGB and
    resized so that both sides are multiples of ``divisor`` and, whenever the
    configured limits allow it, the pixel count lies between ``min_pixels``
    and ``max_pixels``.
    """

    def __init__(
        self, min_pixels: int = 3136, max_pixels: int = 11289600, divisor: int = 28
    ):
        """Initialize the image preprocessor.

        Args:
            min_pixels: Minimum pixel count for images
            max_pixels: Maximum pixel count for images
            divisor: Required divisor for image dimensions
        """
        self.min_pixels = min_pixels
        self.max_pixels = max_pixels
        self.divisor = divisor

    def _target_size(self, width: int, height: int) -> Tuple[int, int]:
        """Compute the final size for an image of ``width`` x ``height``.

        The image is first scaled (preserving aspect ratio) into the
        ``[min_pixels, max_pixels]`` range, then each side is snapped to a
        multiple of ``divisor``. Sides are rounded up unless that would
        exceed ``max_pixels``, in which case they are rounded down.

        Args:
            width: Current image width in pixels
            height: Current image height in pixels

        Returns:
            ``(new_width, new_height)``, both multiples of ``divisor``

        Raises:
            ValueError: If either dimension is not positive
        """
        if width <= 0 or height <= 0:
            raise ValueError(f"Cannot preprocess an empty image ({width}x{height})")

        step = self.divisor
        pixels = width * height
        if pixels < self.min_pixels:
            scale = (self.min_pixels / pixels) ** 0.5
        elif pixels > self.max_pixels:
            scale = (self.max_pixels / pixels) ** 0.5
        else:
            scale = 1.0

        # Truncation can reach 0 for extremely thin images; keep >= 1 pixel.
        scaled_w = max(1, int(width * scale))
        scaled_h = max(1, int(height * scale))

        # Preferred result: round each side up to the next multiple.
        new_w = -(-scaled_w // step) * step
        new_h = -(-scaled_h // step) * step
        if new_w * new_h <= self.max_pixels:
            return new_w, new_h

        # Rounding up overshot the pixel budget, so round down instead.
        new_w = max(step, scaled_w // step * step)
        new_h = max(step, scaled_h // step * step)
        if new_w * new_h > self.max_pixels:
            # Only reachable when one side was forced up to `step` (extreme
            # aspect ratio): shrink the longer side to fit the budget.
            if new_w >= new_h:
                new_w = max(step, self.max_pixels // new_h // step * step)
            else:
                new_h = max(step, self.max_pixels // new_w // step * step)
        return new_w, new_h

    def preprocess_image(self, image: Image.Image) -> Image.Image:
        """Preprocess an image to meet model requirements.

        Args:
            image: Input PIL Image

        Returns:
            Preprocessed PIL Image in RGB mode whose dimensions are
            multiples of ``divisor``

        Raises:
            ValueError: If the image has a zero-sized dimension
        """
        # Auto-orient before any other operation: convert() returns a new
        # image object that may no longer carry the source file's
        # orientation metadata.
        image = ImageOps.exif_transpose(image)

        # Convert to RGB if necessary
        if image.mode != "RGB":
            image = image.convert("RGB")

        width, height = image.size
        current_pixels = width * height
        logger.info(f"Original image size: {width}x{height} ({current_pixels} pixels)")

        # Resize once, straight to the final size, instead of scaling and
        # then re-sampling a second time for the divisor adjustment.
        new_width, new_height = self._target_size(width, height)
        if (new_width, new_height) != (width, height):
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            logger.info(
                f"Resized image to {new_width}x{new_height} "
                f"(divisible by {self.divisor})"
            )
        return image

    def crop_by_roi(
        self, image: Image.Image, roi: Tuple[float, float, float, float]
    ) -> Image.Image:
        """Crop image using ROI coordinates.

        Coordinates outside the image are clamped to its bounds, and a
        right/bottom edge that lies before the left/top edge is clamped to
        it, which yields an empty crop.

        Args:
            image: Input PIL Image
            roi: ROI coordinates as (x1, y1, x2, y2) normalized to [0, 1]

        Returns:
            Cropped PIL Image
        """
        x1, y1, x2, y2 = roi
        width, height = image.size

        # Convert normalized coordinates to pixel coordinates
        x1_px = int(x1 * width)
        y1_px = int(y1 * height)
        x2_px = int(x2 * width)
        y2_px = int(y2 * height)

        # Ensure coordinates are within image bounds
        x1_px = max(0, min(x1_px, width))
        y1_px = max(0, min(y1_px, height))
        x2_px = max(x1_px, min(x2_px, width))
        y2_px = max(y1_px, min(y2_px, height))

        if x2_px == x1_px or y2_px == y1_px:
            # Make the cause visible; an empty image cannot be preprocessed.
            logger.warning(f"ROI {roi} selects an empty region of the image")

        # Crop the image
        cropped = image.crop((x1_px, y1_px, x2_px, y2_px))
        logger.info(f"Cropped image to {x2_px - x1_px}x{y2_px - y1_px} pixels")
        return cropped
class PDFProcessor:
    """Handles PDF to image conversion and multi-page processing."""

    def __init__(self, dpi: int = PDF_DPI, max_pages: int = PDF_MAX_PAGES):
        """Initialize the PDF processor.

        Args:
            dpi: DPI for PDF to image conversion
            max_pages: Maximum number of pages to process
        """
        self.dpi = dpi
        self.max_pages = max_pages

    def pdf_to_images(self, pdf_data: bytes) -> List[Image.Image]:
        """Convert PDF to list of images.

        Only the first ``max_pages`` pages are rendered; any remaining pages
        are skipped and a warning is logged.

        Args:
            pdf_data: PDF file data as bytes

        Returns:
            List of PIL Images, one per rendered page

        Raises:
            RuntimeError: If the PDF cannot be opened or rendered; the
                underlying exception is attached as ``__cause__``
        """
        pdf_document = None
        try:
            # Open PDF from bytes
            pdf_document = fitz.open(stream=pdf_data, filetype="pdf")
            total_pages = len(pdf_document)

            # Limit number of pages to process
            num_pages = min(total_pages, self.max_pages)
            if num_pages < total_pages:
                logger.warning(
                    f"PDF has {total_pages} pages; only the first {num_pages} "
                    "will be processed"
                )
            logger.info(f"Processing {num_pages} pages from PDF")

            # The zoom matrix is the same for every page, so build it once.
            zoom = self.dpi / 72  # 72 is default DPI
            mat = fitz.Matrix(zoom, zoom)

            images = []
            for page_num in range(num_pages):
                page = pdf_document[page_num]
                # Convert page to image
                pix = page.get_pixmap(matrix=mat)
                # Convert to PIL Image
                img_data = pix.tobytes("png")
                image = Image.open(io.BytesIO(img_data))
                images.append(image)
                logger.info(f"Converted page {page_num + 1} to image: {image.size}")
            return images
        except Exception as e:
            logger.error(f"Failed to convert PDF to images: {e}")
            raise RuntimeError(f"PDF conversion failed: {e}") from e
        finally:
            # Release the document even when rendering fails part-way.
            if pdf_document is not None:
                pdf_document.close()

    def is_pdf(self, file_data: bytes) -> bool:
        """Check if file data is a PDF.

        The ``%PDF-`` marker is searched for in the first 1024 bytes rather
        than only at offset 0, so files with a few leading bytes (BOM,
        whitespace, transfer artefacts) before the header are still detected.

        Args:
            file_data: File data as bytes

        Returns:
            True if file is a PDF
        """
        return b"%PDF-" in file_data[:1024]

    def get_pdf_page_count(self, pdf_data: bytes) -> int:
        """Get the number of pages in a PDF.

        Args:
            pdf_data: PDF file data as bytes

        Returns:
            Number of pages in the PDF, or 0 if the data cannot be opened
            as a PDF (the failure is logged, not raised)
        """
        pdf_document = None
        try:
            pdf_document = fitz.open(stream=pdf_data, filetype="pdf")
            return len(pdf_document)
        except Exception as e:
            logger.error(f"Failed to get PDF page count: {e}")
            return 0
        finally:
            if pdf_document is not None:
                pdf_document.close()
class DocumentProcessor:
    """Main document processing class that handles both images and PDFs."""

    def __init__(self):
        """Initialize the document processor."""
        self.image_preprocessor = ImagePreprocessor()
        self.pdf_processor = PDFProcessor()

    def process_document(
        self, file_data: bytes, roi: Optional[Tuple[float, float, float, float]] = None
    ) -> List[Image.Image]:
        """Process a document (image or PDF) and return preprocessed images.

        Pages or images that fail to preprocess are logged and skipped; the
        call only fails when nothing at all could be processed.

        Args:
            file_data: Document file data as bytes
            roi: Optional ROI coordinates as (x1, y1, x2, y2) normalized to [0, 1]

        Returns:
            List of preprocessed PIL Images

        Raises:
            RuntimeError: If the data cannot be opened as a PDF or an image,
                or if no image could be preprocessed
        """
        # Check if it's a PDF
        if self.pdf_processor.is_pdf(file_data):
            logger.info("Processing PDF document")
            images = self.pdf_processor.pdf_to_images(file_data)
        else:
            # Process as image
            logger.info("Processing image document")
            try:
                images = [Image.open(io.BytesIO(file_data))]
            except Exception as e:
                logger.error(f"Failed to open image: {e}")
                raise RuntimeError(f"Image processing failed: {e}") from e

        # Preprocess each image
        processed_images = []
        for i, image in enumerate(images):
            try:
                # Apply ROI cropping if provided
                if roi is not None:
                    # Orient the image first so the normalized ROI refers to
                    # the upright image rather than the stored, possibly
                    # rotated, raster. NOTE(review): assumes callers express
                    # the ROI in display orientation - confirm.
                    image = ImageOps.exif_transpose(image)
                    image = self.image_preprocessor.crop_by_roi(image, roi)
                # Preprocess image for model requirements
                processed_image = self.image_preprocessor.preprocess_image(image)
                processed_images.append(processed_image)
                logger.info(f"Processed image {i + 1}: {processed_image.size}")
            except Exception as e:
                # Continue with other images even if one fails
                logger.error(f"Failed to preprocess image {i + 1}: {e}")

        if not processed_images:
            raise RuntimeError("No images could be processed from the document")
        logger.info(f"Successfully processed {len(processed_images)} images")
        return processed_images

    def validate_file_size(self, file_data: bytes) -> bool:
        """Validate that file size is within limits.

        Args:
            file_data: File data as bytes

        Returns:
            True if the data is no larger than ``IMAGE_MAX_SIZE`` bytes
        """
        file_size = len(file_data)
        if file_size > IMAGE_MAX_SIZE:
            logger.warning(f"File size {file_size} exceeds limit {IMAGE_MAX_SIZE}")
            return False
        return True

    def get_document_info(self, file_data: bytes) -> dict:
        """Get information about the document.

        Args:
            file_data: Document file data as bytes

        Returns:
            Dictionary with ``file_size`` (bytes), ``is_pdf`` and
            ``page_count``; non-PDF data is always reported as one page
        """
        info = {
            "file_size": len(file_data),
            "is_pdf": self.pdf_processor.is_pdf(file_data),
            "page_count": 1,
        }
        if info["is_pdf"]:
            info["page_count"] = self.pdf_processor.get_pdf_page_count(file_data)
        return info
# Module-wide DocumentProcessor, created on first use
_document_processor: Optional[DocumentProcessor] = None


def get_document_processor() -> DocumentProcessor:
    """Return the shared DocumentProcessor, creating it on the first call."""
    global _document_processor
    shared = _document_processor
    if shared is not None:
        return shared
    shared = DocumentProcessor()
    _document_processor = shared
    return shared
def process_document(
    file_data: bytes, roi: Optional[Tuple[float, float, float, float]] = None
) -> List[Image.Image]:
    """Preprocess a document with the shared processor.

    Args:
        file_data: Document file data as bytes
        roi: Optional ROI coordinates as (x1, y1, x2, y2) normalized to [0, 1]

    Returns:
        List of preprocessed PIL Images
    """
    return get_document_processor().process_document(file_data, roi)
def validate_file_size(file_data: bytes) -> bool:
    """Check the size of ``file_data`` with the shared processor.

    Args:
        file_data: File data as bytes

    Returns:
        True if file size is acceptable
    """
    return get_document_processor().validate_file_size(file_data)
def get_document_info(file_data: bytes) -> dict:
    """Describe a document using the shared processor.

    Args:
        file_data: Document file data as bytes

    Returns:
        Dictionary with document information
    """
    return get_document_processor().get_document_info(file_data)