Spaces:
Running
Running
| import io | |
| import mimetypes | |
| from typing import Tuple, Optional, BinaryIO | |
| from PIL import Image, ImageOps | |
| # Import PyMuPDF for PDF processing | |
| try: | |
| import fitz # PyMuPDF for PDF processing | |
| FITZ_AVAILABLE = True | |
| except ImportError: | |
| try: | |
| from PyMuPDF import fitz # Alternative import method | |
| FITZ_AVAILABLE = True | |
| except ImportError: | |
| try: | |
| import PyMuPDF as fitz # Another alternative import | |
| FITZ_AVAILABLE = True | |
| except ImportError: | |
| fitz = None # PDF processing will be disabled | |
| FITZ_AVAILABLE = False | |
| import tempfile | |
| import os | |
class ImagePreprocessor:
    """Service for preprocessing various image formats before storage.

    PNG and JPEG uploads are passed through untouched; every other input
    (PDF, TIFF, HEIC/HEIF, WebP, GIF, or anything Pillow can open) is
    re-encoded as PNG or JPEG.  All methods are static, so they can be
    called on the class (``ImagePreprocessor.preprocess_image(...)``) or on
    an instance.
    """

    # Configuration options for performance tuning
    PDF_ZOOM_FACTOR = 1.5  # Reduce from 2.0 for better performance
    PDF_COMPRESS_LEVEL = 6  # PNG compression level (0-9, higher = smaller but slower)
    PDF_QUALITY_MODE = 'balanced'  # 'fast', 'balanced', or 'quality'

    # PyMuPDF availability flag (set by the import probe at module level)
    FITZ_AVAILABLE = FITZ_AVAILABLE

    SUPPORTED_IMAGE_MIME_TYPES = {
        'image/png',
        'image/jpeg',
        'image/jpg',
        'image/heic',
        'image/heif',
        'image/webp',
        'image/gif',
        'image/tiff',
        'image/tif',
        'application/pdf',
    }

    # MIME types that are stored as-is.
    _PASSTHROUGH_MIME_TYPES = frozenset({'image/png', 'image/jpeg', 'image/jpg'})

    # Pillow modes that are always flattened to RGB before encoding.
    _RGB_CONVERT_MODES = frozenset(
        {'RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F'}
    )

    # Modes the JPEG encoder can write directly; anything else is converted.
    _JPEG_SAFE_MODES = frozenset({'1', 'L', 'RGB', 'RGBX', 'YCbCr'})

    # ISO-BMFF major brands found at bytes 8-12 of HEIC / HEIF files.
    _HEIC_BRANDS = frozenset(
        {b'heic', b'heix', b'hevc', b'hevx', b'heim', b'heis'}
    )
    _HEIF_BRANDS = frozenset({b'mif1', b'msf1'})

    @staticmethod
    def _sniff_mime_type(file_content: bytes) -> Optional[str]:
        """Return the MIME type implied by the leading magic bytes, or None."""
        if not file_content:
            return None
        head = bytes(file_content[:16])

        if head.startswith(b'\x89PNG\r\n\x1a\n'):
            return 'image/png'
        if head.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if head.startswith((b'\x49\x49\x2a\x00', b'\x4d\x4d\x00\x2a')):
            return 'image/tiff'
        if head.startswith(b'%PDF'):
            return 'application/pdf'
        if head.startswith((b'GIF87a', b'GIF89a')):
            return 'image/gif'
        if head.startswith(b'RIFF') and head[8:12] == b'WEBP':
            return 'image/webp'
        # The size of the leading ``ftyp`` box varies between encoders, so
        # match on the box type and brand instead of a fixed size prefix.
        if head[4:8] == b'ftyp':
            brand = head[8:12]
            if brand in ImagePreprocessor._HEIC_BRANDS:
                return 'image/heic'
            if brand in ImagePreprocessor._HEIF_BRANDS:
                return 'image/heif'
        return None

    @staticmethod
    def detect_mime_type(file_content: bytes, filename: str) -> str:
        """Detect MIME type from file content and filename.

        The content signature is checked first because extensions are
        frequently wrong (e.g. a HEIC photo renamed to ``.jpg``).  When the
        content carries no recognised signature the filename extension is
        used, and ``application/octet-stream`` is the final fallback.

        Args:
            file_content: Raw file content (may be empty)
            filename: Original filename (may be empty)
        Returns:
            The detected MIME type string
        """
        sniffed = ImagePreprocessor._sniff_mime_type(file_content)
        if sniffed:
            return sniffed

        if filename:
            guessed, _ = mimetypes.guess_type(filename)
            if guessed:
                return guessed

        return 'application/octet-stream'

    @staticmethod
    def needs_preprocessing(mime_type: str) -> bool:
        """Check if the file needs preprocessing (anything but PNG/JPEG)."""
        return mime_type not in ImagePreprocessor._PASSTHROUGH_MIME_TYPES

    @staticmethod
    def preprocess_image(
        file_content: bytes,
        filename: str,
        target_format: str = 'PNG',
        quality: int = 95
    ) -> Tuple[bytes, str, str]:
        """
        Preprocess image and return processed content, new filename, and MIME type

        Args:
            file_content: Raw file content
            filename: Original filename
            target_format: Target format ('PNG' or 'JPEG', case-insensitive;
                any other value falls back to JPEG)
            quality: JPEG quality (1-100, only used for JPEG)
        Returns:
            Tuple of (processed_content, new_filename, mime_type)
        Raises:
            ValueError: If the file cannot be converted
        """
        mime_type = ImagePreprocessor.detect_mime_type(file_content, filename)

        if not ImagePreprocessor.needs_preprocessing(mime_type):
            # No preprocessing needed
            return file_content, filename, mime_type

        handlers = {
            'application/pdf': ImagePreprocessor._process_pdf,
            'image/tiff': ImagePreprocessor._process_tiff,
            'image/tif': ImagePreprocessor._process_tiff,
            'image/heic': ImagePreprocessor._process_heic,
            'image/heif': ImagePreprocessor._process_heic,
            'image/webp': ImagePreprocessor._process_webp,
            'image/gif': ImagePreprocessor._process_gif,
        }
        # Unknown types are handed to Pillow as a last resort.
        handler = handlers.get(mime_type, ImagePreprocessor._process_generic)

        try:
            return handler(file_content, filename, target_format, quality)
        except Exception as e:
            raise ValueError(f"Failed to preprocess {mime_type} file: {str(e)}") from e

    @staticmethod
    def configure_pdf_processing(zoom_factor: float = 1.5, compress_level: int = 6, quality_mode: str = 'balanced'):
        """
        Configure PDF processing performance settings

        Args:
            zoom_factor: PDF zoom factor (1.0 = original size, 2.0 = 2x size)
                Lower values = faster processing, lower quality
                Higher values = slower processing, higher quality
                Only used in 'balanced' mode.
            compress_level: PNG compression level (0-9)
                Lower values = faster compression, larger files
                Higher values = slower compression, smaller files
                Only used in 'balanced' mode.
            quality_mode: Processing mode ('fast', 'balanced', 'quality');
                any other value is treated as 'balanced'
        Raises:
            ValueError: If zoom_factor is not positive or compress_level is
                outside 0-9 (checked only when those values are applied)
        """
        if quality_mode == 'fast':
            ImagePreprocessor.PDF_ZOOM_FACTOR = 1.0
            ImagePreprocessor.PDF_COMPRESS_LEVEL = 3
        elif quality_mode == 'quality':
            ImagePreprocessor.PDF_ZOOM_FACTOR = 2.0
            ImagePreprocessor.PDF_COMPRESS_LEVEL = 9
        else:  # balanced
            # Fail here rather than on the first PDF that gets rendered.
            if zoom_factor <= 0:
                raise ValueError("zoom_factor must be positive")
            if not 0 <= compress_level <= 9:
                raise ValueError("compress_level must be between 0 and 9")
            ImagePreprocessor.PDF_ZOOM_FACTOR = zoom_factor
            ImagePreprocessor.PDF_COMPRESS_LEVEL = compress_level

        # Record the active mode so the class attribute reflects reality.
        ImagePreprocessor.PDF_QUALITY_MODE = quality_mode

        print(f"PDF processing configured: zoom={ImagePreprocessor.PDF_ZOOM_FACTOR}, "
              f"compression={ImagePreprocessor.PDF_COMPRESS_LEVEL}, mode={quality_mode}")

    @staticmethod
    def _encode_image(
        img,
        filename: str,
        target_format: str,
        quality: int,
        compress_level: Optional[int] = None
    ) -> Tuple[bytes, str, str]:
        """Encode a Pillow image and build the (content, filename, MIME) triple.

        Args:
            img: An opened Pillow image
            filename: Original filename; its extension is replaced
            target_format: 'PNG' (case-insensitive) or anything else for JPEG
            quality: JPEG quality (only used for JPEG)
            compress_level: Explicit PNG zlib level; when None the PNG is
                written with ``optimize=True`` instead
        """
        wants_png = str(target_format).upper() == 'PNG'

        # Flatten modes the encoders cannot (or should not) store directly.
        must_convert = img.mode in ImagePreprocessor._RGB_CONVERT_MODES
        if not wants_png and img.mode not in ImagePreprocessor._JPEG_SAFE_MODES:
            must_convert = True
        if must_convert:
            img = img.convert('RGB')

        output_buffer = io.BytesIO()
        if wants_png:
            if compress_level is None:
                img.save(output_buffer, format='PNG', optimize=True)
            else:
                # Pillow forces level 9 whenever optimize=True, so optimize
                # is omitted to let the explicit level take effect.
                img.save(output_buffer, format='PNG', compress_level=compress_level)
            new_mime_type = 'image/png'
            new_extension = '.png'
        else:
            img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
            new_mime_type = 'image/jpeg'
            new_extension = '.jpg'

        # Generate new filename
        base_name = os.path.splitext(filename)[0]
        new_filename = f"{base_name}{new_extension}"

        return output_buffer.getvalue(), new_filename, new_mime_type

    @staticmethod
    def _convert_with_pillow(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int,
        label: str
    ) -> Tuple[bytes, str, str]:
        """Open *file_content* with Pillow and re-encode it.

        Only the first frame of a multi-frame image is written, because the
        image is saved without ``save_all``.  *label* names the source format
        in the error message.
        """
        try:
            img = Image.open(io.BytesIO(file_content))
            return ImagePreprocessor._encode_image(img, filename, target_format, quality)
        except Exception as e:
            raise ValueError(f"Failed to process {label}: {str(e)}") from e

    @staticmethod
    def _process_pdf(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process PDF files by rasterizing the first page.

        Raises:
            ValueError: If PyMuPDF is unavailable, the PDF has no pages, or
                rendering/encoding fails
        """
        if not ImagePreprocessor.FITZ_AVAILABLE:
            raise ValueError("PDF processing is not available. PyMuPDF is not installed.")

        try:
            print(f"Starting PDF processing for {filename}...")

            # Open PDF with PyMuPDF
            pdf_document = fitz.open(stream=file_content, filetype="pdf")
            try:
                if len(pdf_document) == 0:
                    raise ValueError("PDF has no pages")

                print(f"PDF opened successfully, processing page 1 of {len(pdf_document)}...")

                # Get first page
                page = pdf_document[0]

                # Use configurable zoom factor for performance tuning
                zoom = ImagePreprocessor.PDF_ZOOM_FACTOR
                mat = fitz.Matrix(zoom, zoom)

                print(f"Rendering page at {zoom}x zoom...")

                # Render page to image with optimized settings
                pix = page.get_pixmap(
                    matrix=mat,
                    alpha=False,  # No alpha channel needed
                    colorspace="rgb"  # Force RGB colorspace
                )

                print(f"Page rendered, size: {pix.width}x{pix.height}")

                img_data = pix.tobytes("png")
                del pix  # Free memory
            finally:
                # Close the document even when rendering fails.
                pdf_document.close()

            # Convert to PIL Image
            img = Image.open(io.BytesIO(img_data))

            # Convert to RGB if needed
            if img.mode in ('RGBA', 'LA', 'P'):
                img = img.convert('RGB')

            print(f"Image converted to RGB, mode: {img.mode}")

            content, new_filename, new_mime_type = ImagePreprocessor._encode_image(
                img,
                filename,
                target_format,
                quality,
                compress_level=ImagePreprocessor.PDF_COMPRESS_LEVEL,
            )

            print(f"PDF processing completed: {filename} -> {new_filename}")

            return content, new_filename, new_mime_type

        except Exception as e:
            print(f"PDF processing failed: {str(e)}")
            raise ValueError(f"Failed to process PDF: {str(e)}") from e

    @staticmethod
    def _process_tiff(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process TIFF/GeoTIFF files by rendering RGB view"""
        return ImagePreprocessor._convert_with_pillow(
            file_content, filename, target_format, quality, 'TIFF'
        )

    @staticmethod
    def _process_heic(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process HEIC/HEIF files.

        NOTE(review): decoding relies on Pillow being able to open HEIF
        data; presumably a HEIF opener plugin is registered elsewhere in the
        application -- confirm.
        """
        return ImagePreprocessor._convert_with_pillow(
            file_content, filename, target_format, quality, 'HEIC'
        )

    @staticmethod
    def _process_webp(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process WebP files"""
        return ImagePreprocessor._convert_with_pillow(
            file_content, filename, target_format, quality, 'WebP'
        )

    @staticmethod
    def _process_gif(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process GIF files (static only; animated GIFs keep their first frame)"""
        return ImagePreprocessor._convert_with_pillow(
            file_content, filename, target_format, quality, 'GIF'
        )

    @staticmethod
    def _process_generic(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Generic processing for other formats"""
        return ImagePreprocessor._convert_with_pillow(
            file_content, filename, target_format, quality, 'generic format'
        )