import os
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "0")  # disable hf_transfer if missing
import gradio as gr
import torch
from transformers import AutoModel, AutoTokenizer
import spaces
import tempfile
from PIL import Image
import re
from gradio.themes import Soft
from gradio.themes.utils import fonts
import fitz  # PyMuPDF for PDF processing
import gc
import base64
from io import BytesIO

# ===== Model Load =====
model_name = "deepseek-ai/DeepSeek-OCR"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Prefer FlashAttention 2 when it is installed; otherwise fall back to PyTorch's
# SDPA kernels. Checking up front matters: passing "flash_attention_2" to
# from_pretrained raises at load time when flash-attn is missing, so a
# post-load fallback would never get the chance to run.
try:
    import flash_attn  # noqa: F401
    attn_implementation = "flash_attention_2"
except Exception:
    attn_implementation = "sdpa"

model = AutoModel.from_pretrained(
    model_name,
    _attn_implementation=attn_implementation,
    trust_remote_code=True,
    use_safetensors=True,
)
model = model.eval()
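# Optional sanity check (a sketch, not required for serving): confirm which
# attention backend the loaded model ended up with.
#   print("attn implementation:", getattr(model.config, "_attn_implementation", "n/a"))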
def pdf_to_images(pdf_path, dpi=200):
    """
    Convert PDF pages to PIL Images using PyMuPDF.

    Args:
        pdf_path: Path to PDF file
        dpi: Resolution for rendering (default 200)

    Returns:
        List of PIL Image objects
    """
    images = []
    pdf_document = fitz.open(pdf_path)
    for page_num in range(len(pdf_document)):
        page = pdf_document[page_num]
        # Render page to pixmap with specified DPI (PDF's native resolution is 72 DPI)
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        pix = page.get_pixmap(matrix=mat)
        # Convert to PIL Image
        img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        images.append(img)
    pdf_document.close()
    return images
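# Minimal usage sketch (hypothetical file name): lower the DPI for faster
# rendering and smaller page images, at some cost to OCR accuracy.
#   pages = pdf_to_images("sample.pdf", dpi=150)
#   pages[0].save("page1.png")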
def extract_images_from_pdf(pdf_path):
    """
    Extract embedded images from PDF using PyMuPDF.

    Returns dict: {page_num: [list of PIL Images]}
    """
    pdf_document = fitz.open(pdf_path)
    images_by_page = {}
    for page_num in range(len(pdf_document)):
        page = pdf_document[page_num]
        image_list = page.get_images()
        page_images = []
        for img_index, img in enumerate(image_list):
            xref = img[0]
            try:
                base_image = pdf_document.extract_image(xref)
                image_bytes = base_image["image"]
                pil_image = Image.open(BytesIO(image_bytes))
                page_images.append(pil_image)
            except Exception as e:
                print(f"Could not extract image {img_index} from page {page_num}: {e}")
                continue
        if page_images:
            images_by_page[page_num] = page_images
    pdf_document.close()
    return images_by_page
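# Illustrative return shape (assumed): page indices are zero-based, and only
# pages that actually contain embedded raster images get a key.
#   {0: [<PIL.Image>, <PIL.Image>], 3: [<PIL.Image>]}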
def image_to_base64(pil_image, format='JPEG', max_size=(1200, 1200)):
    """
    Convert PIL Image to a base64 data URI for markdown embedding.
    Resizes oversized images to keep the output file size manageable.
    """
    # Resize if image is too large
    if pil_image.size[0] > max_size[0] or pil_image.size[1] > max_size[1]:
        pil_image = pil_image.copy()
        pil_image.thumbnail(max_size, Image.Resampling.LANCZOS)
    buffered = BytesIO()
    if pil_image.mode == 'RGBA' and format == 'JPEG':
        # JPEG has no alpha channel: composite onto a white background first
        rgb_image = Image.new('RGB', pil_image.size, (255, 255, 255))
        rgb_image.paste(pil_image, mask=pil_image.split()[3])
        rgb_image.save(buffered, format=format, quality=85)
    elif format == 'JPEG':
        pil_image.save(buffered, format=format, quality=85)
    else:
        pil_image.save(buffered, format=format)
    img_str = base64.b64encode(buffered.getvalue()).decode()
    return f"data:image/{format.lower()};base64,{img_str}"
def detect_figure_regions(text_result, original_image):
    """
    Detect figure regions from OCR output using bounding boxes.
    Returns list of cropped figure images.
    """
    figure_images = []
    # Pattern to detect grounding bounding boxes (if the model returns them)
    pattern = re.compile(r"<\|det\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]<\|/det\|>")
    matches = list(pattern.finditer(text_result or ""))
    if matches:
        w, h = original_image.size
        for match in matches:
            x1, y1, x2, y2 = [int(c) for c in match.groups()]
            # Convert from normalized coordinates (0-1000) to actual pixels
            x1 = int(x1 / 1000 * w)
            y1 = int(y1 / 1000 * h)
            x2 = int(x2 / 1000 * w)
            y2 = int(y2 / 1000 * h)
            # Crop the region
            cropped = original_image.crop((x1, y1, x2, y2))
            figure_images.append(cropped)
    return figure_images
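# The grounding output this regex expects looks roughly like (illustrative):
#   "<|ref|>figure<|/ref|><|det|>[[112, 84, 540, 402]]<|/det|>"
# Coordinates sit on a 0-1000 normalized grid, hence the /1000 scaling above.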
def process_single_page(image, model_runtime, tokenizer, model_size, task_type, ref_text, is_eval_mode, output_path, page_num, embed_images=True):
    """
    Process a single page/image with DeepSeek-OCR.
    Returns markdown content with embedded images inline with context.
    """
    # ===== choose task prompt =====
    if task_type == "📝 Free OCR":
        prompt = "<image>\nFree OCR."
    elif task_type == "📄 Convert to Markdown":
        prompt = "<image>\n<|grounding|>Convert the document to markdown."
    elif task_type == "📈 Parse Figure":
        prompt = "<image>\nParse the figure."
    elif task_type == "🔍 Locate Object by Reference":
        if not ref_text or ref_text.strip() == "":
            raise gr.Error("Please provide reference text for the Locate task!")
        prompt = f"<image>\nLocate <|ref|>{ref_text.strip()}<|/ref|> in the image."
    else:
        prompt = "<image>\nFree OCR."
    # save image temporarily
    temp_image_path = os.path.join(output_path, "temp_image.jpg")
    image.save(temp_image_path)
    # ===== size config =====
    size_configs = {
        "Tiny": {"base_size": 512, "image_size": 512, "crop_mode": False},
        "Small": {"base_size": 640, "image_size": 640, "crop_mode": False},
        "Base": {"base_size": 1024, "image_size": 1024, "crop_mode": False},
        "Large": {"base_size": 1280, "image_size": 1280, "crop_mode": False},
        "Gundam (Recommended)": {"base_size": 1024, "image_size": 640, "crop_mode": True},
    }
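    # Note (assumption based on DeepSeek-OCR's published size presets): with
    # crop_mode=True the page is tiled into image_size crops on a base_size
    # canvas, so "Gundam" keeps small text legible on dense pages without
    # paying for a single full high-resolution pass.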
    config = size_configs.get(model_size, size_configs["Gundam (Recommended)"])
    # ===== inference =====
    with torch.no_grad():
        plain_text_result = model_runtime.infer(
            tokenizer,
            prompt=prompt,
            image_file=temp_image_path,
            output_path=output_path,
            base_size=config["base_size"],
            image_size=config["image_size"],
            crop_mode=config["crop_mode"],
            save_results=True,
            test_compress=True,
            eval_mode=is_eval_mode,
        )
    # ===== collect markdown result =====
    markdown_result_path = os.path.join(output_path, "result.mmd")
    markdown_content = ""
    if os.path.exists(markdown_result_path):
        try:
            with open(markdown_result_path, "r", encoding="utf-8") as f:
                markdown_content = f.read()
        except Exception:
            pass
    # If no markdown file was written, fall back to the plain text result
    if not markdown_content and plain_text_result:
        markdown_content = plain_text_result
    # ===== Embed images if requested =====
    embedded_images_list = []
    if embed_images and markdown_content:
        # Crop figure regions from the rendered page using the grounding boxes
        figure_images = detect_figure_regions(plain_text_result, image)
        # Detect document type for smart placement
        is_certificate = any(word in markdown_content.lower() for word in ['sertifikat', 'certificate', 'pengesahan', 'approval'])
        is_letter = any(word in markdown_content.lower() for word in ['surat', 'letter', 'memo', 'kementerian', 'ministry'])
        has_logo = 'logo' in markdown_content.lower() or 'seal' in markdown_content.lower()
        # For certificates and official letters, embed the full page near the top
        if (is_certificate or is_letter or has_logo) and len(markdown_content.split()) > 20:
            try:
                base64_img = image_to_base64(image, format='JPEG')
                # Find the first heading or title
                lines = markdown_content.split('\n')
                insert_pos = 0
                # Look for the first significant heading (##, ###, or an all-caps line)
                for i, line in enumerate(lines):
                    stripped = line.strip()
                    if stripped.startswith('##') or (stripped.isupper() and len(stripped.split()) >= 3):
                        insert_pos = i + 1
                        break
                # Insert the page image right after the title
                if insert_pos > 0:
                    lines.insert(insert_pos, f"\n![Document]({base64_img})\n")
                    markdown_content = '\n'.join(lines)
                else:
                    # No clear heading, insert at top
                    markdown_content = f"![Document]({base64_img})\n\n" + markdown_content
            except Exception as e:
                print(f"Error embedding page image: {e}")
        # If specific figures were detected (charts, graphs), embed them inline
        elif figure_images:
            figures_markdown = "\n\n"
            for idx, fig_img in enumerate(figure_images):
                try:
                    base64_img = image_to_base64(fig_img, format='PNG')
                    figures_markdown += f"![Figure {idx + 1}]({base64_img})\n\n"
                except Exception as e:
                    print(f"Error embedding figure {idx+1}: {e}")
            # Insert after the first paragraph
            paragraphs = markdown_content.split('\n\n', 1)
            if len(paragraphs) >= 2:
                markdown_content = paragraphs[0] + figures_markdown + paragraphs[1]
            else:
                markdown_content = figures_markdown + markdown_content
        # For pages where charts/graphs are mentioned but no regions were detected
        elif any(word in markdown_content.lower() for word in ['chart', 'graph', 'diagram', 'figure']):
            try:
                base64_img = image_to_base64(image, format='JPEG')
                # Insert after the first mention of chart/graph
                for keyword in ['chart', 'graph', 'diagram', 'figure']:
                    if keyword in markdown_content.lower():
                        parts = markdown_content.lower().split(keyword, 1)
                        # Find the position in the original text
                        pos = len(parts[0])
                        # Insert the image after the current paragraph
                        para_end = markdown_content.find('\n\n', pos)
                        if para_end > 0:
                            markdown_content = markdown_content[:para_end] + f"\n\n![Figure]({base64_img})\n\n" + markdown_content[para_end + 2:]
                        else:
                            markdown_content += f"\n\n![Figure]({base64_img})\n\n"
                        break
            except Exception as e:
                print(f"Error embedding contextual image: {e}")
    # NOTE: embedded_images_list is currently always empty; it is returned to
    # keep the signature stable for callers that may collect per-page images.
    return markdown_content, plain_text_result, embedded_images_list
# ===== Main Processing Function =====
# ZeroGPU Spaces allocate a GPU per call; the spaces.GPU decorator is assumed
# here to match the `import spaces` above and the Space's ZeroGPU runtime.
@spaces.GPU
def process_pdf(pdf_file, model_size, task_type, ref_text, is_eval_mode, embed_images, progress=gr.Progress()):
    """
    Process PDF with DeepSeek-OCR and return combined markdown from all pages.
    Includes both visual images and extracted text content.
    """
    if pdf_file is None:
        return "Please upload a PDF file first.", "Please upload a PDF file first."
    # handle CPU/GPU
    if torch.cuda.is_available():
        model_runtime = model.to("cuda", dtype=torch.bfloat16)
    else:
        model_runtime = model.to("cpu", dtype=torch.float32)
    try:
        # gr.File(type="filepath") passes a plain string path; older Gradio
        # versions passed a tempfile-like object, so handle both
        pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name
        # Convert PDF to images
        progress(0, desc="Converting PDF to images...")
        images = pdf_to_images(pdf_path)
        total_pages = len(images)
        if total_pages == 0:
            return "No pages found in the PDF.", "No pages found in the PDF."
        # Extract embedded images if needed
        embedded_images = {}
        if embed_images:
            progress(0.05, desc="Extracting embedded images from PDF...")
            try:
                embedded_images = extract_images_from_pdf(pdf_path)
                print(f"Found embedded images on {len(embedded_images)} pages")
            except Exception as e:
                print(f"Could not extract embedded images: {e}")
        progress(0.1, desc=f"Found {total_pages} pages. Starting OCR...")
        # Process each page with memory management
        all_markdown_results = []
        with tempfile.TemporaryDirectory() as output_path:
            for page_num, image in enumerate(images, start=1):
                try:
                    progress(
                        (page_num / total_pages) * 0.9 + 0.1,
                        desc=f"Processing page {page_num}/{total_pages}..."
                    )
                    markdown_content, plain_text, page_embedded_imgs = process_single_page(
                        image,
                        model_runtime,
                        tokenizer,
                        model_size,
                        task_type,
                        ref_text,
                        is_eval_mode,
                        output_path,
                        page_num,
                        embed_images
                    )
| if embed_images and (page_num - 1) in embedded_images: | |
| # Insert PDF images right after the OCR'd content, but before page separator | |
| pdf_images_markdown = "\n\n" | |
| for idx, img in enumerate(embedded_images[page_num - 1]): | |
| try: | |
| base64_img = image_to_base64(img, format='PNG') | |
| # Add inline without a big header | |
| pdf_images_markdown += f"\n\n" | |
| except Exception as e: | |
| print(f"Error embedding PDF image {idx+1}: {e}") | |
| # Append to the markdown content directly (inline) | |
| markdown_content += pdf_images_markdown | |
                    # Add page separator and content
                    page_header = f"\n\n---\n\n# Page {page_num}\n\n"
                    all_markdown_results.append(page_header + markdown_content)
                    # Clear memory after each page to prevent accumulation
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    gc.collect()
                except Exception as e:
                    error_msg = f"\n\n---\n\n# Page {page_num}\n\n**⚠️ Error processing this page:** {str(e)}\n\n"
                    all_markdown_results.append(error_msg)
                    print(f"Error on page {page_num}: {str(e)}")
                    # Clear memory even on error
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    gc.collect()
                    continue
        # Combine all results
        progress(1.0, desc="Finalizing document...")
        combined_markdown = "\n\n".join(all_markdown_results)
        # Add document header with metadata
        image_status = "✅ Enabled" if embed_images else "❌ Disabled"
| final_output = f"""# 📄 Document OCR Results | |
| **Total Pages:** {total_pages} | |
| **Model Size:** {model_size} | |
| **Task Type:** {task_type} | |
| **Image Embedding:** {image_status} | |
| --- | |
| {combined_markdown} | |
| --- | |
| **End of Document** - Processed {total_pages} pages successfully. | |
| """ | |
| return final_output, final_output # Return twice: once for preview, once for raw text | |
    except Exception as e:
        error_message = f"""# ❌ Error Processing PDF

**Error:** {str(e)}

**Troubleshooting Tips:**
- Try using a smaller model size (Tiny or Small)
- Disable image embedding for faster processing
- Check if the PDF is corrupted or password-protected
- For very large PDFs (50+ pages), consider processing in batches
- Ensure you have enough GPU memory available

**Technical Details:**
```
{str(e)}
```
"""
        print(f"Fatal error: {str(e)}")
        import traceback
        traceback.print_exc()
        return error_message, error_message  # Return twice: once for preview, once for raw text
# ===== Theme and UI =====
theme = Soft(
    font=fonts.GoogleFont("Inter"),
    font_mono=fonts.GoogleFont("JetBrains Mono"),
)

custom_css = """
.gradio-container, body {
    font-family: 'Inter', ui-sans-serif, system-ui, -apple-system, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, 'Noto Sans', 'Apple Color Emoji', 'Segoe UI Emoji', 'Segoe UI Symbol', 'Noto Color Emoji' !important;
}
.prose h1 { font-weight: 800; letter-spacing: -0.02em; }
.prose h2, .prose h3 { font-weight: 700; letter-spacing: -0.01em; }
.gr-button { border-radius: 12px; font-weight: 600; }
.prose img { max-width: 100%; height: auto; border-radius: 8px; margin: 1rem 0; }
"""
# ===== Interface =====
with gr.Blocks(
    title="DeepSeek-OCR PDF Parser by Jatevo LLM Inference",
    theme=theme,
    css=custom_css,
) as demo:
    gr.Markdown(
        """
# 📄 DeepSeek-OCR PDF Parser by Jatevo LLM Inference

Upload a PDF to extract text and convert to Markdown using **DeepSeek-OCR**.
Each page is processed sequentially and combined into a single markdown document.

## ✨ Features
- 🖼️ **Image Embedding** - Charts, graphs, and figures embedded directly in markdown
- 📝 **Text Extraction** - All text content from images and charts extracted
- 📊 **Table Support** - Tables converted to markdown format
- 🔍 **Object Detection** - Locate specific elements in documents
- 🎯 **Multiple Models** - Choose speed vs. accuracy trade-off

## 📏 Model Sizes
- **Tiny** — Fastest, lower accuracy (512×512) - Best for large PDFs (30+ pages)
- **Small** — Fast, good accuracy (640×640) - Good for 15-30 pages
- **Base** — Balanced performance (1024×1024) - Good for 10-20 pages
- **Large** — Best accuracy, slower (1280×1280) - Best for <10 pages
- **Gundam (Recommended)** — Optimized for documents (1024 base, 640 image, crop mode)

## 💡 Tips
- Enable **"Embed Images"** to include charts/figures (recommended)
- Use **Tiny or Small** model for large PDFs (20+ pages)
- Processing time: ~2-5 seconds per page depending on model
- Maximum recommended: 50 pages at once
- Image embedding increases file size (~1-2 MB per page with images)
"""
    )
    with gr.Row():
        with gr.Column(scale=1):
            pdf_input = gr.File(
                label="📎 Upload PDF",
                file_types=[".pdf"],
                type="filepath"
            )
            model_size = gr.Dropdown(
                choices=["Tiny", "Small", "Base", "Large", "Gundam (Recommended)"],
                value="Small",
                label="🎯 Model Size",
                info="Use Tiny/Small for large PDFs (20+ pages)"
            )
            task_type = gr.Dropdown(
                choices=[
                    "📝 Free OCR",
                    "📄 Convert to Markdown",
                    "📈 Parse Figure",
                    "🔍 Locate Object by Reference",
                ],
                value="📄 Convert to Markdown",
                label="📋 Task Type",
            )
            ref_text_input = gr.Textbox(
                label="🔍 Reference Text (for Locate task)",
                placeholder="e.g., 'the teacher', '20-10', 'a red car'...",
                visible=False,
            )
            with gr.Row():
                eval_mode_checkbox = gr.Checkbox(
                    value=False,
                    label="⚡ Evaluation Mode",
                    info="Plain text only (faster)",
                )
                embed_images_checkbox = gr.Checkbox(
                    value=True,
                    label="🖼️ Embed Images",
                    info="Include charts/figures in output",
                )
            submit_btn = gr.Button("🚀 Process PDF", variant="primary", size="lg")
            gr.Markdown(
                """
---
### 📊 Processing Status
Watch the progress bar for real-time updates.

**Note:** Image embedding provides both:
- 👁️ Visual image (embedded as base64)
- 📝 Extracted text content (OCR'd from image)

You get the best of both worlds!
"""
            )
        with gr.Column(scale=2):
            gr.Markdown("### 📝 Markdown Output Preview")
            output_markdown_preview = gr.Markdown(
                label="Rendered Markdown",
                value="*Upload a PDF and click 'Process PDF' to see results here.*\n\n*The output will include both images and extracted text.*"
            )
            gr.Markdown("### 📄 Raw Markdown Source (Copy/Download)")
            output_text = gr.Textbox(
                label="Raw Markdown",
                lines=25,
                show_copy_button=True,
                interactive=False,
                placeholder="Markdown source will appear here... You can copy/paste this into any markdown editor."
            )

    # show/hide the reference text box based on the selected task
    def toggle_ref_text_visibility(task):
        return gr.Textbox(visible=(task == "🔍 Locate Object by Reference"))

    task_type.change(
        fn=toggle_ref_text_visibility,
        inputs=task_type,
        outputs=ref_text_input,
    )

    submit_btn.click(
        fn=process_pdf,
        inputs=[pdf_input, model_size, task_type, ref_text_input, eval_mode_checkbox, embed_images_checkbox],
        outputs=[output_markdown_preview, output_text],
    )
# ===== Launch =====
if __name__ == "__main__":
    # Queue requests so large PDFs don't overwhelm the worker mid-processing
    demo.queue(
        max_size=20,
        default_concurrency_limit=2
    )
    demo.launch(
        max_threads=40,   # Increase thread limit for better concurrency
        show_error=True,  # Show errors in UI for debugging
        share=False       # Set to True to create a public link
    )
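# Rough dependency sketch (assumed, not part of the original file; pin versions to taste):
#   pip install gradio spaces torch transformers pillow pymupdf
#   pip install flash-attn --no-build-isolation   # optional; SDPA is used as the fallback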