Spaces:
Running
Running
| import os | |
| import io | |
| import json | |
| import time | |
| from typing import List, Tuple, Dict, Any, Optional | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import gradio as gr | |
| import numpy as np | |
# =========================
# Config
# =========================
# Location of the logo asset (not referenced by the code visible in this file).
LOGO_IMAGE_PATH = './assets/logo.jpg'

# <link> tag handed to gr.Blocks(head=...) so the page loads the "Inter" font
# family that the custom CSS asks for.
GOOGLE_FONTS_URL = (
    "<link href='https://fonts.googleapis.com/css2"
    "?family=Inter:wght@400;500;600;700&display=swap'"
    " rel='stylesheet'>"
)
# Lazy-load OCR models to reduce startup time and memory.
# One PaddleOCR instance is cached per language code so that a request for a
# different language never receives a model that was built for another one.
_ocr_models: Dict[str, Any] = {}


def get_ocr_model(lang: str = "en"):
    """Return a PaddleOCR instance for *lang*, creating it on first use.

    Args:
        lang: PaddleOCR language pack code such as 'en', 'ch', 'fr', 'german'.

    Returns:
        The cached PaddleOCR instance for that language.
    """
    cached = _ocr_models.get(lang)
    if cached is not None:
        return cached

    # The Spaces container will download the model weights on first run and
    # cache them.
    from paddleocr import PaddleOCR  # import here to avoid heavy import at startup

    model = PaddleOCR(use_angle_cls=True, lang=lang, show_log=False)
    _ocr_models[lang] = model
    return model
def pdf_page_to_image(pdf_doc: fitz.Document, page_index: int, dpi: int = 300) -> Image.Image:
    """Render one page of an open PyMuPDF document as a PIL image.

    Args:
        pdf_doc: An open PyMuPDF document.
        page_index: Zero-based index handed to ``load_page``.
        dpi: Target rendering resolution.

    Returns:
        A PIL image built in "RGB" mode from the rendered pixmap samples.
    """
    # 72 dpi is the PDF default, so dpi / 72 is the scale factor to apply.
    scale = dpi / 72.0
    pixmap = pdf_doc.load_page(page_index).get_pixmap(
        matrix=fitz.Matrix(scale, scale),
        alpha=False,  # no alpha channel, matching the "RGB" mode below
    )
    return Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
def run_paddle_ocr_on_image(image: Image.Image, lang: str = "en") -> Tuple[str, List[Dict[str, Any]]]:
    """Run PaddleOCR on a PIL image and collect the recognised lines.

    Args:
        image: The image to recognise; it is converted to a numpy array first.
        lang: Language code forwarded to ``get_ocr_model``.

    Returns:
        A tuple ``(text, items)``: ``text`` is every recognised line joined
        with newlines, and ``items`` holds one dict per detection with the
        keys ``bbox``, ``text`` and ``confidence``.
    """
    engine = get_ocr_model(lang=lang)
    # PaddleOCR works on numpy arrays rather than PIL images.
    raw_result = engine.ocr(np.array(image), cls=True)

    # PaddleOCR returns a list per image: [[(box, (text, conf)), ...]].
    # An entry may be None, in which case it is skipped.
    items: List[Dict[str, Any]] = [
        {"bbox": det[0], "text": det[1][0], "confidence": float(det[1][1])}
        for page_result in raw_result
        if page_result is not None
        for det in page_result
    ]
    return "\n".join(item["text"] for item in items), items
def _open_pdf(file_obj):
    """Open *file_obj* with PyMuPDF, or return None when no PDF data is available."""
    # Raw bytes: open straight from memory.
    if isinstance(file_obj, (bytes, bytearray)):
        return fitz.open(stream=bytes(file_obj), filetype="pdf") if file_obj else None

    # Gradio may pass a path or a tempfile.NamedTemporaryFile-like with .name
    if isinstance(file_obj, (str, os.PathLike)):
        pdf_path = os.fspath(file_obj)
    else:
        pdf_path = getattr(file_obj, "name", None)

    # Only trust string paths: a file object's .name can be an int descriptor.
    if isinstance(pdf_path, str) and os.path.exists(pdf_path):
        return fitz.open(pdf_path)

    # No usable path: fall back to reading the content from the object itself.
    file_bytes = file_obj.read() if hasattr(file_obj, "read") else None
    if not file_bytes:
        return None
    return fitz.open(stream=file_bytes, filetype="pdf")


def extract_text_from_pdf(file_obj, dpi: int = 300, max_pages: Optional[int] = None, lang: str = "en") -> Tuple[str, str, Dict[str, Any]]:
    """Run OCR over the pages of a PDF.

    Args:
        file_obj: A filesystem path, raw PDF bytes, or an object exposing
            ``.name`` and/or ``.read()``.
        dpi: Resolution used when rendering each page to an image.
        max_pages: Upper bound on the number of pages to process; ``None``
            processes all pages. Negative values are treated as 0.
        lang: Language code forwarded to the OCR step.

    Returns:
        A tuple ``(combined_text, json_payload, stats)``. ``json_payload`` is
        a JSON string of the form ``{"pages": [{"page": n, "items": [...]}]}``.
        On failure the text is empty, the payload has no pages, and ``stats``
        is ``{"error": <message>}``; the function does not raise for
        processing errors.
    """
    empty_payload = json.dumps({"pages": []}, ensure_ascii=False)
    if file_obj is None:
        return "", empty_payload, {"error": "No file provided"}

    start_time = time.time()
    try:
        pdf_doc = _open_pdf(file_obj)
        if pdf_doc is None:
            return "", empty_payload, {"error": "Could not read file"}

        # try/finally guarantees the document is closed even when rendering
        # or OCR fails part-way through.
        try:
            total_pages = pdf_doc.page_count
            num_pages = total_pages
            if max_pages is not None:
                # Clamp so a negative limit cannot yield a negative count.
                num_pages = max(0, min(total_pages, max_pages))

            all_text_lines: List[str] = []
            pages_payload: List[Dict[str, Any]] = []
            for page_index in range(num_pages):
                image = pdf_page_to_image(pdf_doc, page_index, dpi=dpi)
                page_text, page_items = run_paddle_ocr_on_image(image, lang=lang)
                all_text_lines.append(page_text)
                pages_payload.append({
                    "page": page_index + 1,
                    "items": page_items,
                })

            # Pages with no recognised text are left out of the combined text.
            combined_text = "\n\n".join(t for t in all_text_lines if t)
            json_payload = json.dumps({"pages": pages_payload}, ensure_ascii=False)
            stats = {
                "pages_processed": num_pages,
                "total_pages": total_pages,
                "processing_time": round(time.time() - start_time, 2),
                "dpi": dpi,
                "language": lang,
            }
            return combined_text, json_payload, stats
        finally:
            pdf_doc.close()
    except Exception as e:
        # Boundary handler: callers expect an error entry in stats, not a raise.
        return "", empty_payload, {"error": str(e)}
def handle_pdf_ocr(pdf_file: str) -> Tuple[str, str, str]:
    """Main handler for PDF OCR processing.

    Args:
        pdf_file: The uploaded PDF as delivered by the Gradio file input.

    Returns:
        A tuple ``(text, json_data, stats_text)`` where ``stats_text`` is a
        Markdown summary of the processing statistics.

    Raises:
        gr.Error: If no file was given, if extraction reported an error, or if
            an unexpected exception occurred while processing.
    """
    if not pdf_file:
        raise gr.Error("Please upload a PDF file first.")

    try:
        print(f"Processing PDF: {pdf_file}")
        start_time = time.time()
        text, json_data, stats = extract_text_from_pdf(pdf_file, dpi=300, max_pages=None, lang="en")
        duration = time.time() - start_time
        print(f"PDF processing completed in {duration:.2f} seconds.")
    except Exception as e:
        error_msg = f"Error processing PDF: {str(e)}"
        print(error_msg)
        # Chain the cause so the original traceback stays available in logs.
        raise gr.Error(error_msg) from e

    # Raised outside the try block so this error is not caught and re-wrapped
    # with a second "Error processing PDF:" prefix.
    if "error" in stats:
        error_msg = f"Processing failed: {stats['error']}"
        print(error_msg)
        raise gr.Error(error_msg)

    # Format stats for display
    stats_text = f"""**Processing Statistics:**
- Pages processed: {stats.get('pages_processed', 0)}/{stats.get('total_pages', 0)}
- Processing time: {stats.get('processing_time', 0)}s
- DPI: {stats.get('dpi', 300)}
- Language: {stats.get('language', 'en')}"""
    return text, json_data, stats_text
# =========================
# CSS & UI
# =========================
# Stylesheet passed to gr.Blocks(css=...). The class selectors are attached
# either through the raw HTML snippets or through elem_classes in the UI code;
# #text_output and #json_output match the elem_id values of the two result
# components. .stats-box is not referenced by the UI code in this file.
custom_css = """
/* Global fonts */
body, .gradio-container {
font-family: "Inter", "Segoe UI", "Roboto", sans-serif;
}
.app-header {
text-align: center;
max-width: 900px;
margin: 0 auto 20px !important;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 12px;
color: white;
}
.app-header h1 {
margin: 0;
font-size: 2.5rem;
font-weight: 700;
}
.app-header p {
margin: 10px 0 0 0;
opacity: 0.9;
font-size: 1.1rem;
}
.gradio-container {
padding: 20px 0 !important;
max-width: 1200px;
margin: 0 auto;
}
.upload-section {
background: #f8fafc;
border: 2px dashed #cbd5e1;
border-radius: 12px;
padding: 30px;
text-align: center;
margin: 20px 0;
}
.upload-section:hover {
border-color: #667eea;
background: #f1f5f9;
}
.results-section {
margin-top: 20px;
}
.stats-box {
background: #f0f9ff;
border: 1px solid #0ea5e9;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
}
#text_output {
min-height: 300px;
font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace;
font-size: 14px;
line-height: 1.6;
}
#json_output {
min-height: 200px;
font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace;
font-size: 12px;
}
.process-btn {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
border: none !important;
padding: 12px 30px !important;
border-radius: 8px !important;
font-weight: 600 !important;
font-size: 16px !important;
}
.process-btn:hover {
transform: translateY(-2px);
box-shadow: 0 8px 25px rgba(102, 126, 234, 0.3);
}
.notice {
background: #fef3c7;
border: 1px solid #f59e0b;
border-radius: 8px;
padding: 15px;
margin: 20px 0;
color: #92400e;
}
.api-section {
background: #f1f5f9;
border-radius: 8px;
padding: 20px;
margin: 20px 0;
border-left: 4px solid #667eea;
}
"""
# Build the Gradio UI: upload + trigger on the left, tabbed results on the right.
# NOTE: the emoji prefixes of the user-facing labels were mis-encoded in the
# source (rendered as a stray Greek letter) and have been removed from the
# strings below; restore the intended glyphs if they are known.
with gr.Blocks(head=GOOGLE_FONTS_URL, css=custom_css, theme=gr.themes.Soft()) as demo:
    # Header
    gr.HTML("""
    <div class="app-header">
        <h1>PDF OCR Extractor</h1>
        <p>Extract text from PDF documents using PaddleOCR + PyMuPDF</p>
    </div>
    """)

    # Notice
    gr.HTML("""
    <div class="notice">
        <strong>Tip:</strong> This tool processes PDFs by rendering each page as a high-resolution image (300 DPI) and then applying OCR.
        For best results, use clear, well-scanned PDFs with good contrast.
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=1):
            # Upload section. A real container carries the CSS class: separate
            # gr.HTML('<div>') / gr.HTML('</div>') components cannot wrap the
            # components placed between them.
            with gr.Group(elem_classes=["upload-section"]):
                pdf_input = gr.File(
                    label="Upload PDF File",
                    file_types=[".pdf"],
                    file_count="single",
                    elem_id="pdf_upload",
                )

            # Process button
            process_btn = gr.Button(
                "Extract Text",
                variant="primary",
                elem_classes=["process-btn"],
                scale=2,
            )

            # API section
            gr.HTML("""
            <div class="api-section">
                <h3>API Usage</h3>
                <p><strong>Endpoint:</strong> <code>/predict</code></p>
                <p><strong>Input:</strong> PDF file</p>
                <p><strong>Output:</strong> Extracted text</p>
            </div>
            """)

        # Results section (class applied on the column itself, see above).
        with gr.Column(scale=2, elem_classes=["results-section"]):
            with gr.Tabs():
                with gr.Tab("Extracted Text"):
                    text_output = gr.Textbox(
                        label="Extracted Text",
                        lines=20,
                        elem_id="text_output",
                        placeholder="Extracted text will appear here...",
                    )
                with gr.Tab("JSON Data"):
                    json_output = gr.Code(
                        label="Detailed OCR Results (JSON)",
                        language="json",
                        elem_id="json_output",
                    )
                with gr.Tab("Statistics"):
                    stats_output = gr.Markdown(
                        label="Processing Statistics"
                    )

    # Event handlers
    process_btn.click(
        fn=handle_pdf_ocr,
        inputs=[pdf_input],
        outputs=[text_output, json_output, stats_output],
        api_name="predict",
    )

    # Auto-process on file upload. Hidden from the API so that the public
    # "predict" endpoint name is registered exactly once (by the button).
    pdf_input.change(
        fn=handle_pdf_ocr,
        inputs=[pdf_input],
        outputs=[text_output, json_output, stats_output],
        api_name=False,
    )
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable, else 7860.
    listen_port = int(os.getenv("PORT", "7860"))

    # Enable the request queue (at most 6 entries), then start the server on
    # all interfaces without creating a public share link.
    queued_app = demo.queue(max_size=6)
    queued_app.launch(server_name="0.0.0.0", server_port=listen_port, share=False)