"""
Image Extractor Module

This module extracts high-resolution product images and measurements from web pages.
Designed primarily for IKEA product pages but can be extended for other sites.
"""
# Standard library
import logging
import os
import re
import uuid
from dataclasses import dataclass, field
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import requests
from bs4 import BeautifulSoup
from PIL import Image
# Module-wide logging setup: timestamped records, INFO level and above.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Per-module logger so records are attributed to this module's name.
logger = logging.getLogger(__name__)
@dataclass
class ImageInfo:
    """Record describing a single extracted image.

    Without the @dataclass decorator these annotations were bare class
    attributes and no __init__ was generated, so ImageInfo(url=...) raised
    TypeError; the decorator restores the intended constructor.
    """
    url: str                     # source URL of the image
    alt: str = ""                # alt text taken from the <img> tag
    type: str = "unknown"        # role of the image: "main", "measurement", or "unknown"
    path: Optional[str] = None   # local filesystem path once downloaded, else None
    id: Optional[str] = None     # request-scoped identifier assigned during extraction
@dataclass
class ExtractionResult:
    """Aggregated results of a single webpage extraction run.

    The @dataclass decorator was missing: `field(default_factory=dict)` on a
    plain class is just a shared Field object and no __init__ exists, so
    ExtractionResult(request_id=...) raised TypeError. With the decorator each
    instance gets its own dicts.
    """
    request_id: str                                              # UUID identifying this extraction run
    images: Dict[str, "ImageInfo"] = field(default_factory=dict)  # image-id -> ImageInfo
    measurements: Dict[str, str] = field(default_factory=dict)   # label -> value, e.g. "width" -> "55 cm"
    materials: Dict[str, str] = field(default_factory=dict)      # reserved for material info
    output_dir: Optional[str] = None                             # set once images are downloaded

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the result (including nested ImageInfo objects) to plain dicts.

        Returns:
            JSON-friendly dictionary mirroring all fields of this result.
        """
        images_dict = {
            img_id: {
                "id": img_id,
                "url": img_info.url,
                "alt": img_info.alt,
                "type": img_info.type,
                "path": img_info.path
            } for img_id, img_info in self.images.items()
        }
        return {
            "request_id": self.request_id,
            "images": images_dict,
            "measurements": self.measurements,
            "materials": self.materials,
            "output_dir": self.output_dir
        }
class SrcsetParser:
    """Helper class for parsing srcset attributes from HTML img tags.

    Both methods lacked their decorators: `parse_srcset` had no `self` (so any
    instance call passed the instance as `srcset`), and `extract_f_xl_image`
    took `cls` without @classmethod (so `cls` was bound to the srcset string).
    The decorators restore the intended call signatures.
    """

    @staticmethod
    def parse_srcset(srcset: str) -> List[Dict[str, Any]]:
        """
        Parse a srcset attribute into a structured list of image URLs and descriptors.

        Args:
            srcset: The srcset attribute from an img tag

        Returns:
            List of dicts with keys "url", "descriptor", and numeric "width"
            (0 when the descriptor carries no digits).
        """
        if not srcset:
            return []
        results = []
        for part in (p.strip() for p in srcset.split(',')):
            pieces = part.split()
            if len(pieces) < 2:
                # Entries without an explicit descriptor (bare URL) are skipped.
                continue
            url, descriptor = pieces[0], pieces[1]
            # Search once and reuse the match (original ran the regex twice).
            match = re.search(r'\d+', descriptor)
            width = int(match.group(0)) if match else 0
            results.append({"url": url, "descriptor": descriptor, "width": width})
        return results

    @classmethod
    def extract_f_xl_image(cls, srcset: str) -> Optional[str]:
        """
        Extract specifically the image URL with f=xl 900w from a srcset attribute.

        Args:
            srcset: The srcset attribute from an img tag

        Returns:
            The URL with f=xl 900w descriptor, else any 900w URL, else the
            highest-resolution URL, or None if the srcset is empty.
        """
        if not srcset:
            return None
        srcset_entries = cls.parse_srcset(srcset)
        # First, look for f=xl with 900w
        for entry in srcset_entries:
            if "f=xl" in entry["url"] and entry["descriptor"] == "900w":
                return entry["url"]
        # If not found, try any 900w image
        for entry in srcset_entries:
            if entry["descriptor"] == "900w":
                return entry["url"]
        # Finally, fall back to the highest resolution available
        if srcset_entries:
            return max(srcset_entries, key=lambda e: e["width"])["url"]
        return None
class ImageDownloader:
    """Helper class for downloading images.

    `download_image` lacked @staticmethod, so calls through an instance
    (as ProductExtractor does) passed the instance as `image_url`.
    """

    @staticmethod
    def download_image(image_url: str, save_path: str) -> Optional[str]:
        """
        Download an image from URL and save it to disk.

        Args:
            image_url: URL of the image to download
            save_path: Path where the image will be saved

        Returns:
            The path to the saved image or None if download failed
        """
        try:
            # Create the target directory if needed; skip when save_path has no
            # directory component (os.makedirs("") would raise).
            directory = os.path.dirname(save_path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # Get the image content
            response = requests.get(image_url, timeout=30)
            response.raise_for_status()
            # Round-trip through PIL so corrupt payloads fail here rather than
            # being written to disk as broken files.
            img = Image.open(BytesIO(response.content))
            img.save(save_path)
            logger.info(f"Image saved to {save_path}")
            return save_path
        except requests.exceptions.RequestException as e:
            logger.error(f"Error downloading image: {e}")
            return None
        except IOError as e:
            logger.error(f"Error saving image: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while downloading image: {e}")
            return None
class WebPageFetcher:
    """Helper class for fetching web pages.

    `fetch_page` took `cls` without @classmethod, so the module's call
    `WebPageFetcher.fetch_page(url)` bound the URL to `cls` and raised
    TypeError for the missing `url` argument; the decorator fixes this.
    """

    # Desktop browser UA — some product sites serve reduced markup to unknown agents.
    DEFAULT_HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    @classmethod
    def fetch_page(cls, url: str) -> Tuple[str, BeautifulSoup]:
        """
        Fetch a web page and return its content as text and parsed BeautifulSoup.

        Args:
            url: The URL to fetch

        Returns:
            Tuple containing (raw_html, parsed_soup)

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        logger.info(f"Fetching page: {url}")
        response = requests.get(url, headers=cls.DEFAULT_HEADERS, timeout=30)
        response.raise_for_status()
        html = response.text
        # Parse HTML with BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')
        return html, soup
class ProductExtractor:
    """Main class for extracting product images and measurements from a page."""

    def __init__(self):
        # Helpers are kept as instance attributes so they can be swapped in tests.
        self.srcset_parser = SrcsetParser()
        self.image_downloader = ImageDownloader()

    def extract_images_from_url(self, url: str) -> ExtractionResult:
        """
        Extract images with preference for f=xl 900w versions from a URL.

        Args:
            url: The URL to extract images from

        Returns:
            ExtractionResult object with extracted image information

        Raises:
            requests.exceptions.RequestException: If the request fails
            ValueError: If the HTML cannot be parsed correctly
        """
        try:
            logger.info(f"Extracting images from: {url}")
            # Fetch the HTML content
            _, soup = WebPageFetcher.fetch_page(url)
            # Each run gets a fresh UUID, used to namespace image ids and output dirs.
            request_uuid = str(uuid.uuid4())
            logger.info(f"Generated request ID: {request_uuid}")
            result = ExtractionResult(request_id=request_uuid)
            # Targeted selectors for the known IKEA page layout first...
            self._extract_main_product_image(soup, result, request_uuid)
            self._extract_measurement_image(soup, result, request_uuid)
            # ...then fall back to scanning every <img srcset> if they found nothing.
            if not result.images:
                self._extract_images_general_approach(soup, result, request_uuid)
            self._extract_measurements(soup, result)
            logger.info(f"Total images found: {len(result.images)}")
            logger.info(f"Measurements extracted: {result.measurements}")
            return result
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching URL: {e}")
            raise
        except Exception as e:
            logger.error(f"Error extracting images: {e}")
            raise

    def _extract_main_product_image(self, soup: BeautifulSoup, result: ExtractionResult, request_uuid: str) -> None:
        """Extract the main product image into result.images (no-op if absent)."""
        main_image_element = soup.select_one('div[data-type="MAIN_PRODUCT_IMAGE"] img.pip-image')
        if main_image_element and main_image_element.get('srcset'):
            srcset = main_image_element.get('srcset')
            target_url = self.srcset_parser.extract_f_xl_image(srcset)
            if target_url:
                logger.info(f"Found main product image: {target_url}")
                image_id = f"{request_uuid}-main"
                result.images[image_id] = ImageInfo(
                    id=image_id,
                    url=target_url,
                    alt=main_image_element.get('alt', ''),
                    type="main"
                )

    def _extract_measurement_image(self, soup: BeautifulSoup, result: ExtractionResult, request_uuid: str) -> None:
        """Extract the measurement illustration image (no-op if absent)."""
        measurement_image_element = soup.select_one('div[data-type="MEASUREMENT_ILLUSTRATION"] img.pip-image')
        if measurement_image_element and measurement_image_element.get('srcset'):
            srcset = measurement_image_element.get('srcset')
            target_url = self.srcset_parser.extract_f_xl_image(srcset)
            if target_url:
                logger.info(f"Found measurement image: {target_url}")
                image_id = f"{request_uuid}-measurement"
                result.images[image_id] = ImageInfo(
                    id=image_id,
                    url=target_url,
                    alt=measurement_image_element.get('alt', ''),
                    type="measurement"
                )

    def _extract_images_general_approach(self, soup: BeautifulSoup, result: ExtractionResult, request_uuid: str) -> None:
        """Fallback: collect every <img> with a srcset, classifying each by context."""
        logger.info("No specific images found, trying general approach...")
        for i, img in enumerate(soup.select('img[srcset]')):
            srcset = img.get('srcset')
            target_url = self.srcset_parser.extract_f_xl_image(srcset)
            if target_url:
                img_type = self._determine_image_type(img)
                logger.info(f"Found {img_type} image: {target_url}")
                # Index suffix keeps ids unique when several images share a type.
                image_id = f"{request_uuid}-{img_type}-{i}"
                result.images[image_id] = ImageInfo(
                    id=image_id,
                    url=target_url,
                    alt=img.get('alt', ''),
                    type=img_type
                )

    def _determine_image_type(self, img_element: BeautifulSoup) -> str:
        """Classify an image as main/measurement/unknown from its surrounding markup."""
        # Walk up to the grandparent when available; the original unconditionally
        # dereferenced .parent.parent and crashed on shallow/detached elements.
        ancestor = img_element.parent
        if ancestor is not None and ancestor.parent is not None:
            ancestor = ancestor.parent
        if ancestor is None:
            return "unknown"
        parent_html = str(ancestor)
        if "MAIN_PRODUCT_IMAGE" in parent_html or "main" in parent_html.lower():
            return "main"
        elif "MEASUREMENT" in parent_html or "measurement" in parent_html.lower():
            return "measurement"
        return "unknown"

    def _extract_measurements(self, soup: BeautifulSoup, result: ExtractionResult) -> None:
        """Extract product measurements into result.measurements (label -> value)."""
        dimensions_ul = soup.select_one('ul.pip-product-dimensions__dimensions-container')
        if dimensions_ul:
            for li in dimensions_ul.select('li.pip-product-dimensions__measurement-wrapper'):
                label_span = li.select_one('span.pip-product-dimensions__measurement-name')
                if label_span:
                    # Use the stripped label text for both the key and the removal:
                    # the original removed the *unstripped* label from stripped
                    # full text, which could silently fail to match.
                    label_text = label_span.get_text(strip=True)
                    label = label_text.replace(":", "")
                    full_text = li.get_text(strip=True)
                    value = full_text.replace(label_text, '').strip()
                    result.measurements[label.lower()] = value

    def process_product_page(self, url: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """
        Process a product page to extract and save high-resolution images.

        Args:
            url: The product page URL
            output_dir: Optional custom output directory

        Returns:
            Dictionary with paths to downloaded images and other product information
        """
        # Extract images and measurements
        extraction_result = self.extract_images_from_url(url)
        # Default the output directory to a per-request folder
        if not output_dir:
            output_dir = f"output/{extraction_result.request_id}"
        extraction_result.output_dir = output_dir
        # Download every extracted image, recording the local path on success.
        # (A dead `downloaded_images` dict that was never returned has been removed.)
        for image_info in extraction_result.images.values():
            # Filename derives from the image type; extension from the URL path
            # (query string stripped), defaulting to .jpg.
            file_ext = os.path.splitext(image_info.url.split('?')[0])[1] or '.jpg'
            filename = f"{image_info.type}{file_ext}"
            save_path = os.path.join(output_dir, filename)
            image_path = self.image_downloader.download_image(image_info.url, save_path)
            if image_path:
                image_info.path = image_path
        logger.info(f"Images downloaded to directory: {output_dir}")
        return extraction_result.to_dict()
# Create a singleton instance for easy import
extractor = ProductExtractor()
# Export the main callables at module level so API code can do
# `from image_extractor import process_product_page` without touching classes.
extract_images_from_url = extractor.extract_images_from_url
process_product_page = extractor.process_product_page
download_image = ImageDownloader.download_image