# NOTE(review): Hugging Face Spaces page residue ("Spaces: Sleeping") removed
# from executable scope — preserved here as a comment so the module parses.
import base64
import copy
import io
import json
import os
import re
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
import requests
from PIL import Image
@dataclass
class TextBlock:
    """A positioned span of text extracted from a PDF page.

    Coordinates are in PDF points with the origin at the top-left of the
    page (PyMuPDF convention — TODO confirm against caller usage).
    The `@dataclass` decorator is required: `to_dict` relies on
    `dataclasses.asdict`, and the field defaults below only become
    constructor defaults through the generated `__init__`.
    """
    text: str
    x: float
    y: float
    width: float
    height: float
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    block_id: str = ""  # stable id like "p0-b1-l2-s3" assigned by the extractor

    def to_dict(self) -> Dict[str, Any]:
        """Convert TextBlock to dictionary."""
        return asdict(self)
@dataclass
class ImageData:
    """An image extracted from a PDF page, carried as base64-encoded PNG.

    The `@dataclass` decorator is required: `to_dict` uses
    `dataclasses.asdict` and the `format` default needs the generated
    `__init__`.
    """
    index: int          # position of the image within the page's image list
    base64_data: str    # PNG bytes, base64-encoded
    bbox: tuple         # (x0, y0, x1, y1) placement rectangle on the page
    width: float
    height: float
    format: str = "PNG"

    def to_dict(self) -> Dict[str, Any]:
        """Convert ImageData to dictionary."""
        return asdict(self)
@dataclass
class TableData:
    """A table detected on a PDF page, as a rectangular grid of cell strings.

    The `@dataclass` decorator is required because `to_dict` delegates to
    `dataclasses.asdict`.
    """
    bbox: tuple             # (x0, y0, x1, y1) table rectangle on the page
    data: List[List[str]]   # rows of cell text; empty cells are ""
    rows: int
    columns: int            # width of the widest row

    def to_dict(self) -> Dict[str, Any]:
        """Convert TableData to dictionary."""
        return asdict(self)
@dataclass
class PageData:
    """All content extracted from a single PDF page.

    `to_dict` is written by hand (rather than `asdict`) so the nested
    TextBlock/ImageData/TableData objects serialize through their own
    `to_dict` methods. The `@dataclass` decorator is required for the
    generated `__init__` and the count-field defaults.
    """
    page_number: int               # 1-based
    text_blocks: List[TextBlock]
    images: List[ImageData]
    tables: List[TableData]
    page_width: float              # in PDF points
    page_height: float             # in PDF points
    word_count: int = 0
    character_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert PageData to a JSON-serializable dictionary."""
        return {
            "page_number": self.page_number,
            "text_blocks": [block.to_dict() for block in self.text_blocks],
            "images": [img.to_dict() for img in self.images],
            "tables": [table.to_dict() for table in self.tables],
            "page_width": self.page_width,
            "page_height": self.page_height,
            "word_count": self.word_count,
            "character_count": self.character_count,
        }
class PDFToJSONConverter:
    """Convert PDF documents into structured JSON using PyMuPDF.

    Extracts positioned text blocks (with font metadata), embedded images
    (base64-encoded PNG), and detected tables from every page, and
    serializes the result. Hugging Face model enhancement is stubbed but
    not implemented.
    """

    def __init__(self, huggingface_token: str = None):
        """Initialize the converter.

        Args:
            huggingface_token: Optional Hugging Face API token. Used only
                to build request headers for (future) model calls.
        """
        self.hf_token = huggingface_token
        # Only attach an Authorization header when a token was supplied;
        # a header whose value is None is not a valid header for requests.
        self.hf_headers = (
            {"Authorization": f"Bearer {huggingface_token}"}
            if huggingface_token
            else {}
        )
        # Candidate models for future enhancement steps (not called yet).
        self.models = {
            "document_layout": "microsoft/layoutlm-base-uncased",
            "table_detection": "microsoft/table-transformer-detection",
            "ocr": "microsoft/trocr-base-printed",
            "math_detection": "facebook/detr-resnet-50",
        }
        self.hf_inference_url = "https://api-inference.huggingface.co/models"

    def pdf_to_base64(self, pdf_path: str) -> str:
        """Return the PDF file's bytes encoded as a base64 string.

        Raises:
            Exception: wrapping any I/O failure, with the cause chained.
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except Exception as e:
            raise Exception(f"Error converting PDF to base64: {str(e)}") from e

    def extract_pdf_content(self, pdf_path: str) -> Dict[str, Any]:
        """Extract all content from a PDF into a nested, JSON-ready dict.

        Returns:
            Dict with "document_info", "document_statistics", and "pages"
            (a list of serialized PageData dicts).

        Raises:
            Exception: wrapping any failure (missing file, unreadable or
                empty PDF), with the cause chained.
        """
        doc = None
        try:
            if not os.path.exists(pdf_path):
                raise FileNotFoundError(f"PDF file not found: {pdf_path}")
            doc = fitz.open(pdf_path)
            if doc is None:
                raise RuntimeError("Failed to open PDF document")
            if doc.page_count == 0:
                raise ValueError("PDF document has no pages")
            print(f"📄 PDF opened successfully: {doc.page_count} pages")
            pages_data = []
            document_stats = {
                "total_pages": doc.page_count,
                "total_words": 0,
                "total_characters": 0,
                "total_images": 0,
                "total_tables": 0,
            }
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    print(f"📄 Processing page {page_num + 1}/{doc.page_count}")
                    # Preferred path: "dict" extraction keeps fonts/positions.
                    # Fall back to plain block extraction on any failure.
                    try:
                        page_dict = page.get_text("dict")
                        text_blocks = self._extract_text_blocks_from_dict(page_dict, page_num)
                    except Exception as e:
                        print(f"⚠️ Dict method failed for page {page_num + 1}, "
                              f"falling back to simple text extraction: {e}")
                        text_blocks = self._extract_text_blocks_simple(page, page_num)
                    images = self._extract_images_safely(page, doc, page_num)
                    tables = self._detect_tables_safely(page)
                    page_rect = page.rect
                    page_text = " ".join(block.text for block in text_blocks)
                    word_count = len(page_text.split())
                    char_count = len(page_text)
                    page_data = PageData(
                        page_number=page_num + 1,
                        text_blocks=text_blocks,
                        images=images,
                        tables=tables,
                        page_width=page_rect.width,
                        page_height=page_rect.height,
                        word_count=word_count,
                        character_count=char_count,
                    )
                    pages_data.append(page_data)
                    document_stats["total_words"] += word_count
                    document_stats["total_characters"] += char_count
                    document_stats["total_images"] += len(images)
                    document_stats["total_tables"] += len(tables)
                except Exception as e:
                    print(f"❌ Error processing page {page_num + 1}: {e}")
                    # Keep page numbering contiguous: insert an empty page
                    # (595x842 pt = A4) for any page that failed to process.
                    pages_data.append(PageData(
                        page_number=page_num + 1,
                        text_blocks=[],
                        images=[],
                        tables=[],
                        page_width=595,
                        page_height=842,
                        word_count=0,
                        character_count=0,
                    ))
            return {
                "document_info": {
                    "filename": os.path.basename(pdf_path),
                    "file_size": os.path.getsize(pdf_path),
                    "conversion_timestamp": self._get_current_timestamp(),
                    "converter_version": "1.0.0",
                },
                "document_statistics": document_stats,
                "pages": [page.to_dict() for page in pages_data],
            }
        except Exception as e:
            raise Exception(f"Error extracting PDF content: {str(e)}") from e
        finally:
            # Always release the document handle, even on failure.
            if doc is not None:
                try:
                    doc.close()
                    print("✅ PDF document closed successfully")
                except Exception as e:
                    print(f"⚠️ Error closing PDF document: {e}")

    def _extract_text_blocks_from_dict(self, page_dict: dict, page_num: int) -> List[TextBlock]:
        """Build TextBlocks from PyMuPDF "dict" output, keeping font details."""
        text_blocks = []
        for block_idx, block in enumerate(page_dict.get("blocks", [])):
            # Image blocks have no "lines" key — skip them here.
            if "lines" not in block:
                continue
            for line_idx, line in enumerate(block["lines"]):
                for span_idx, span in enumerate(line["spans"]):
                    text_content = span.get("text", "").strip()
                    if not text_content:
                        continue
                    bbox = span["bbox"]
                    font_name = span.get("font", "Arial")
                    flags = span.get("flags", 0)
                    # Bold/italic come from the font name or the PyMuPDF
                    # span flag bits (16 = bold, 2 = italic); coerce the
                    # bitmask result to a real bool for the dataclass field.
                    is_bold = bool("bold" in font_name.lower() or flags & 16)
                    is_italic = bool("italic" in font_name.lower() or flags & 2)
                    text_blocks.append(TextBlock(
                        text=text_content,
                        x=round(bbox[0], 2),
                        y=round(bbox[1], 2),
                        width=round(bbox[2] - bbox[0], 2),
                        height=round(bbox[3] - bbox[1], 2),
                        font_size=round(span.get("size", 12), 2),
                        font_name=font_name,
                        is_bold=is_bold,
                        is_italic=is_italic,
                        block_id=f"p{page_num}-b{block_idx}-l{line_idx}-s{span_idx}",
                    ))
        return text_blocks

    def _extract_text_blocks_simple(self, page, page_num: int) -> List[TextBlock]:
        """Fallback text extraction using PyMuPDF "blocks" tuples.

        Positions are approximated by splitting each block's bbox evenly
        across its lines; font metadata is not available in this mode, so
        defaults (Arial, 12pt) are recorded.
        """
        text_blocks = []
        try:
            blocks_data = page.get_text("blocks")
            for block_idx, block in enumerate(blocks_data):
                if block[6] == 0:  # tuple item 6 is the block type; 0 = text
                    text = block[4].strip()
                    if text:
                        x0, y0, x1, y1 = block[0], block[1], block[2], block[3]
                        lines = text.split('\n')
                        line_height = (y1 - y0) / max(len(lines), 1)
                        for line_idx, line in enumerate(lines):
                            if line.strip():
                                text_blocks.append(TextBlock(
                                    text=line.strip(),
                                    x=round(x0, 2),
                                    y=round(y0 + (line_idx * line_height), 2),
                                    width=round(x1 - x0, 2),
                                    height=round(line_height, 2),
                                    font_size=12.0,
                                    font_name="Arial",
                                    is_bold=False,
                                    is_italic=False,
                                    block_id=f"p{page_num}-simple-b{block_idx}-l{line_idx}",
                                ))
        except Exception as e:
            print(f"⚠️ Simple text block extraction failed: {e}")
        return text_blocks

    def _extract_images_safely(self, page, doc, page_num) -> List[ImageData]:
        """Extract page images as base64 PNG; failures skip the image only."""
        images = []
        try:
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    # An image may appear at several rects; use the first
                    # placement, skip images not actually placed on the page.
                    img_rects = list(page.get_image_rects(xref))
                    if not img_rects:
                        continue
                    bbox = img_rects[0]
                    pix = fitz.Pixmap(doc, xref)
                    # n - alpha >= 4 means a CMYK-like colorspace that
                    # cannot be written to PNG directly — skip those.
                    if pix.n - pix.alpha < 4:
                        img_base64 = base64.b64encode(pix.tobytes("png")).decode()
                        images.append(ImageData(
                            index=img_index,
                            base64_data=img_base64,
                            bbox=(round(bbox.x0, 2), round(bbox.y0, 2),
                                  round(bbox.x1, 2), round(bbox.y1, 2)),
                            width=round(bbox.x1 - bbox.x0, 2),
                            height=round(bbox.y1 - bbox.y0, 2),
                            format="PNG",
                        ))
                    pix = None  # drop the Pixmap reference promptly
                except Exception as e:
                    print(f"⚠️ Error extracting image {img_index} on page {page_num+1}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in image extraction for page {page_num+1}: {e}")
        return images

    def _detect_tables_safely(self, page) -> List[TableData]:
        """Detect tables via PyMuPDF's find_tables(); failures are logged
        and skipped so one bad table never aborts a page."""
        tables = []
        try:
            tabs = page.find_tables()
            for tab_index, tab in enumerate(tabs):
                try:
                    table_data = tab.extract()
                    if table_data:
                        # Normalize cells to stripped strings and drop
                        # rows that are entirely empty.
                        cleaned_data = []
                        for row in table_data:
                            cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                            if any(cleaned_row):
                                cleaned_data.append(cleaned_row)
                        if cleaned_data:
                            tables.append(TableData(
                                bbox=(round(tab.bbox.x0, 2), round(tab.bbox.y0, 2),
                                      round(tab.bbox.x1, 2), round(tab.bbox.y1, 2)),
                                data=cleaned_data,
                                rows=len(cleaned_data),
                                columns=max(len(row) for row in cleaned_data),
                            ))
                except Exception as e:
                    print(f"⚠️ Error extracting table {tab_index}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in table detection: {e}")
        return tables

    def convert_to_json(self, pdf_content: Dict[str, Any], output_path: str = None,
                        pretty_print: bool = True, include_base64_images: bool = True) -> str:
        """Serialize extracted PDF content to a JSON string.

        Args:
            pdf_content: Output of extract_pdf_content (not mutated).
            output_path: If given, the JSON is also written to this file.
            pretty_print: Indent the JSON for readability.
            include_base64_images: When False, image payloads are replaced
                with a placeholder to shrink the output.

        Returns:
            The JSON document as a string.
        """
        print("🔄 Converting to JSON format...")
        try:
            if include_base64_images:
                # Shallow copy is enough: only a new top-level key is added.
                json_content = dict(pdf_content)
            else:
                # Deep-copy before stripping payloads so the caller's nested
                # page/image dicts are not mutated (the previous shallow
                # copy shared them).
                json_content = copy.deepcopy(pdf_content)
                for page in json_content["pages"]:
                    for image in page["images"]:
                        image["base64_data"] = "[Base64 data removed - set include_base64_images=True to include]"
            json_content["conversion_options"] = {
                "pretty_print": pretty_print,
                "include_base64_images": include_base64_images,
                "json_schema_version": "1.0",
            }
            if pretty_print:
                json_string = json.dumps(json_content, indent=2, ensure_ascii=False)
            else:
                json_string = json.dumps(json_content, ensure_ascii=False)
            if output_path:
                # Saving is best-effort: a write failure is reported but the
                # JSON string is still returned to the caller.
                try:
                    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(json_string)
                    print(f"✅ JSON saved to: {output_path}")
                    print(f"📊 File size: {len(json_string):,} characters")
                except Exception as e:
                    print(f"⚠️ Error saving JSON to {output_path}: {e}")
            return json_string
        except Exception as e:
            raise Exception(f"Error converting to JSON: {str(e)}") from e

    def create_json_summary(self, pdf_content: Dict[str, Any]) -> Dict[str, Any]:
        """Build a lightweight summary (counts, dimensions, text sample)
        of extracted content without the full payload."""
        summary = {
            "document_info": pdf_content.get("document_info", {}),
            "document_statistics": pdf_content.get("document_statistics", {}),
            "page_summaries": [],
        }
        for page in pdf_content.get("pages", []):
            blocks = page["text_blocks"]
            # First three blocks, capped at 200 chars, as a preview.
            sample = " ".join(b["text"] for b in blocks[:3])[:200] + "..." if blocks else ""
            summary["page_summaries"].append({
                "page_number": page["page_number"],
                "text_blocks_count": len(blocks),
                "images_count": len(page["images"]),
                "tables_count": len(page["tables"]),
                "word_count": page["word_count"],
                "character_count": page["character_count"],
                "page_dimensions": {
                    "width": page["page_width"],
                    "height": page["page_height"],
                },
                "sample_text": sample,
            })
        return summary

    def _get_current_timestamp(self) -> str:
        """Return the current local time as "YYYY-MM-DD HH:MM:SS"."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def process_pdf_to_json(self, pdf_path: str, output_path: str = None,
                            pretty_print: bool = True, include_base64_images: bool = True,
                            create_summary: bool = False, use_hf_models: bool = False) -> str:
        """End-to-end pipeline: extract a PDF and serialize it to JSON.

        Args:
            pdf_path: Path to the input PDF (must exist).
            output_path: Optional path for the JSON output file.
            pretty_print: Indent the JSON output.
            include_base64_images: Embed image data in the JSON.
            create_summary: Also write "<output>_summary.json".
            use_hf_models: Placeholder flag; enhancement not implemented.

        Returns:
            The JSON document as a string.

        Raises:
            FileNotFoundError: If pdf_path does not exist.
        """
        print(f"📄 Processing PDF to JSON: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        print("📄 Extracting PDF content...")
        pdf_content = self.extract_pdf_content(pdf_path)
        if use_hf_models and self.hf_token:
            print("🤖 Attempting to enhance with Hugging Face models...")
            try:
                print("Note: Hugging Face model integration requires further implementation.")
            except Exception as e:
                print(f"⚠️ Hugging Face enhancement failed: {e}")
        print("🔄 Converting to JSON...")
        json_content = self.convert_to_json(
            pdf_content,
            output_path,
            pretty_print,
            include_base64_images,
        )
        if create_summary and output_path:
            summary_path = output_path.replace('.json', '_summary.json')
            summary_data = self.create_json_summary(pdf_content)
            summary_json = json.dumps(summary_data, indent=2, ensure_ascii=False)
            # Summary write is best-effort; the main JSON is still returned.
            try:
                with open(summary_path, 'w', encoding='utf-8') as f:
                    f.write(summary_json)
                print(f"✅ Summary JSON saved to: {summary_path}")
            except Exception as e:
                print(f"⚠️ Error saving summary: {e}")
        print("✅ Processing complete!")
        return json_content
def main():
    """Demonstrate PDF-to-JSON conversion on a sample file.

    Reads an optional Hugging Face token from the HF_API_TOKEN environment
    variable, converts a hard-coded PDF path, and prints a short preview.
    """
    HF_TOKEN = os.getenv("HF_API_TOKEN")
    converter = PDFToJSONConverter(huggingface_token=HF_TOKEN)
    pdf_path = "new-pdf.pdf"  # Change this to your PDF file path
    output_path = "converted_document.json"  # Output JSON file path
    try:
        json_content = converter.process_pdf_to_json(
            pdf_path=pdf_path,
            output_path=output_path,
            pretty_print=True,               # Format JSON with indentation
            include_base64_images=True,      # Set False to reduce file size
            create_summary=True,             # Create additional summary file
            use_hf_models=False,             # True to use HuggingFace models
        )
        print(f"✅ Successfully converted '{pdf_path}' to '{output_path}'")
        print(f"📊 JSON length: {len(json_content):,} characters")
        print(f"📄 Open '{output_path}' to view the structured JSON data!")
        print("\n📋 JSON Preview (first 500 characters):")
        print("-" * 50)
        print(json_content[:500] + "..." if len(json_content) > 500 else json_content)
    except FileNotFoundError as e:
        print(f"❌ Error: {e}")
        print("Please ensure the PDF file exists at the specified path.")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()