Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import openpyxl | |
| from openpyxl.utils.dataframe import dataframe_to_rows | |
| from openpyxl.styles import Font, PatternFill, Border, Side, Alignment | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Tuple, Optional | |
| import re | |
| from pathlib import Path | |
| import logging | |
| from datetime import datetime | |
| import numpy as np | |
# Optional imports with graceful fallback.
# Each optional backend sets a module-level *_AVAILABLE flag that the
# converter checks before calling into that library.
try:
    import camelot  # For advanced table extraction
    CAMELOT_AVAILABLE = True
except ImportError:
    CAMELOT_AVAILABLE = False
    print("⚠️ Camelot not installed. Run: pip install camelot-py[cv]")

try:
    import tabula  # Alternative table extraction
    TABULA_AVAILABLE = True
except ImportError:
    TABULA_AVAILABLE = False
    print("⚠️ Tabula not installed. Run: pip install tabula-py")

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class TextBlock:
    """
    One piece of extracted page text together with its layout and font info.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the converter constructs it.
    """
    # The extracted text (the converter strips surrounding whitespace first).
    text: str
    # Left / top coordinate, taken from bbox[0] / bbox[1] of the source span.
    x: float
    y: float
    # Extent, computed as bbox[2] - bbox[0] and bbox[3] - bbox[1].
    width: float
    height: float
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    # Page the text came from; the converter passes 1-based page numbers.
    page_num: int = 1
    # Identifier assigned by the extractor, e.g. "p1_b0_l0_s0" or "p1_fallback".
    block_id: str = ""
@dataclass
class TableData:
    """
    One extracted table plus bookkeeping about where and how it was found.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the extraction methods construct it.
    """
    # Table content as rows of cell values; the first row is treated as the
    # header when has_header is True.
    data: List[List[str]]
    # Bounding box of the table; the Camelot and Tabula paths store the
    # placeholder (0, 0, 100, 100) instead of a real box.
    bbox: Tuple[float, float, float, float]
    # Page the table came from; the converter passes 1-based page numbers.
    page_num: int
    # Score used to pick a winner between duplicate tables (higher wins).
    confidence: float = 0.0
    has_header: bool = True
class PDFToExcelConverter:
    """
    Enhanced PDF to Excel converter with multiple extraction methods
    for better accuracy and handling of complex documents.

    Text is read with PyMuPDF. Tables are collected from PyMuPDF and, when
    installed, Camelot and Tabula, then de-duplicated by content. The result
    is written to an .xlsx workbook in one of three layouts ('structured',
    'combined', 'separate_sheets') followed by a 'Summary' sheet.
    """

    def __init__(self):
        # Check available extraction methods
        available_methods = ['pymupdf']  # Always available
        if CAMELOT_AVAILABLE:
            available_methods.append('camelot')
        if TABULA_AVAILABLE:
            available_methods.append('tabula')
        self.extraction_methods = available_methods
        self.output_formats = {
            'separate_sheets': 'Each table and text section on separate sheets',
            'combined': 'All content combined logically',
            'structured': 'Maintain document structure with proper formatting'
        }
        # Log available methods
        logger.info(f"Available extraction methods: {', '.join(available_methods)}")

    def extract_text_blocks_advanced(self, page, page_num: int) -> List["TextBlock"]:
        """
        Advanced text extraction with better formatting detection

        Walks the ``page.get_text("dict")`` structure and emits one TextBlock
        per non-empty span. If anything in that walk raises, the spans
        collected so far are discarded and the whole-page fallback is used.

        Args:
            page: PyMuPDF page object.
            page_num: Page number stored on every block (callers pass 1-based).

        Returns:
            List of TextBlock objects, possibly empty.
        """
        text_blocks = []
        try:
            # Method 1: Dictionary-based extraction (most detailed)
            page_dict = page.get_text("dict")
            for block_idx, block in enumerate(page_dict.get("blocks", [])):
                if block.get("type", 1) != 0:  # Skip non-text blocks
                    continue
                for line_idx, line in enumerate(block.get("lines", [])):
                    for span_idx, span in enumerate(line.get("spans", [])):
                        text_content = span.get("text", "").strip()
                        if not text_content:
                            continue
                        bbox = span["bbox"]
                        flags = span.get("flags", 0)
                        # Enhanced font detection: trust either the span flag
                        # bits or a hint in the font name.
                        font_name = span.get("font", "Arial")
                        font_size = span.get("size", 12)
                        is_bold = bool(flags & 16) or "bold" in font_name.lower()
                        is_italic = bool(flags & 2) or "italic" in font_name.lower()
                        text_blocks.append(TextBlock(
                            text=text_content,
                            x=bbox[0], y=bbox[1],
                            width=bbox[2] - bbox[0],
                            height=bbox[3] - bbox[1],
                            font_size=font_size,
                            font_name=font_name,
                            is_bold=is_bold,
                            is_italic=is_italic,
                            page_num=page_num,
                            block_id=f"p{page_num}_b{block_idx}_l{line_idx}_s{span_idx}"
                        ))
        except Exception as e:
            logger.warning(f"Advanced text extraction failed for page {page_num}: {e}")
            # Fallback to simple extraction
            text_blocks = self._extract_text_simple_fallback(page, page_num)
        return text_blocks

    def _extract_text_simple_fallback(self, page, page_num: int) -> List["TextBlock"]:
        """
        Fallback text extraction method

        Returns at most one TextBlock holding the page's whole plain text,
        sized to the page rectangle and tagged with default font values.
        Errors are logged and yield an empty list.
        """
        text_blocks = []
        try:
            text = page.get_text()
            if text.strip():
                # Create a single text block for the entire page content
                rect = page.rect
                text_blocks.append(TextBlock(
                    text=text.strip(),
                    x=0, y=0,
                    width=rect.width,
                    height=rect.height,
                    font_size=12,
                    font_name="Arial",
                    page_num=page_num,
                    block_id=f"p{page_num}_fallback"
                ))
        except Exception as e:
            logger.error(f"Fallback text extraction failed for page {page_num}: {e}")
        return text_blocks

    def extract_tables_multiple_methods(self, pdf_path: str, page_num: int) -> List["TableData"]:
        """
        Extract tables using multiple methods and combine results

        Args:
            pdf_path: Path of the PDF file.
            page_num: 1-based page number.

        Returns:
            Tables from every available backend with content duplicates
            collapsed (see ``_deduplicate_tables``).
        """
        all_tables = []
        # Method 1: PyMuPDF built-in table detection
        all_tables.extend(self._extract_tables_pymupdf(pdf_path, page_num))
        # Method 2: Camelot (if available)
        if CAMELOT_AVAILABLE:
            try:
                all_tables.extend(self._extract_tables_camelot(pdf_path, page_num))
            except Exception as e:
                logger.warning(f"Camelot extraction failed: {e}")
        # Method 3: Tabula (if available)
        if TABULA_AVAILABLE:
            try:
                all_tables.extend(self._extract_tables_tabula(pdf_path, page_num))
            except Exception as e:
                logger.warning(f"Tabula extraction failed: {e}")
        # Remove duplicates and return best tables
        return self._deduplicate_tables(all_tables)

    def _extract_tables_pymupdf(self, pdf_path: str, page_num: int) -> List["TableData"]:
        """
        Extract tables using PyMuPDF

        Opens the document itself (in a ``with`` block so it is closed even
        when extraction raises), cleans every cell to a stripped string and
        drops rows that are entirely empty. Failures are logged, not raised.
        """
        tables = []
        try:
            with fitz.open(pdf_path) as doc:
                page = doc[page_num - 1]  # Convert to 0-based index
                for i, table in enumerate(page.find_tables()):
                    try:
                        table_data = table.extract()
                        if not table_data:
                            continue
                        # Clean the table data
                        cleaned_data = []
                        for row in table_data:
                            cleaned_row = [
                                "" if cell is None else str(cell).strip()
                                for cell in row
                            ]
                            if any(cleaned_row):  # Only add non-empty rows
                                cleaned_data.append(cleaned_row)
                        if cleaned_data:
                            tables.append(TableData(
                                data=cleaned_data,
                                bbox=table.bbox,
                                page_num=page_num,
                                confidence=0.8,  # PyMuPDF generally reliable
                                has_header=True
                            ))
                    except Exception as e:
                        logger.warning(f"Error extracting PyMuPDF table {i}: {e}")
        except Exception as e:
            logger.error(f"PyMuPDF table extraction failed: {e}")
        return tables

    def _extract_tables_camelot(self, pdf_path: str, page_num: int) -> List["TableData"]:
        """
        Extract tables using Camelot (only if available)

        Returns an empty list when Camelot is not installed or fails.
        """
        if not CAMELOT_AVAILABLE:
            return []
        tables = []
        try:
            # Camelot works with page numbers (1-based)
            camelot_tables = camelot.read_pdf(pdf_path, pages=str(page_num), flavor='lattice')
            for table in camelot_tables:
                df = table.df
                if df.empty:
                    continue
                # Convert DataFrame to list of lists
                table_data = df.values.tolist()
                # Add headers if they exist
                if not df.columns.empty:
                    table_data.insert(0, df.columns.tolist())
                tables.append(TableData(
                    data=table_data,
                    bbox=(0, 0, 100, 100),  # Placeholder: no real bbox is recorded here
                    page_num=page_num,
                    confidence=table.accuracy / 100.0 if hasattr(table, 'accuracy') else 0.7,
                    has_header=True
                ))
        except Exception as e:
            logger.warning(f"Camelot extraction failed: {e}")
        return tables

    def _extract_tables_tabula(self, pdf_path: str, page_num: int) -> List["TableData"]:
        """
        Extract tables using Tabula (only if available)

        Returns an empty list when Tabula is not installed or fails.
        """
        if not TABULA_AVAILABLE:
            return []
        tables = []
        try:
            # Tabula works with page numbers (1-based)
            tabula_tables = tabula.read_pdf(pdf_path, pages=page_num, multiple_tables=True)
            for df in tabula_tables:
                if df.empty:
                    continue
                # Convert DataFrame to list of lists
                table_data = df.fillna('').values.tolist()
                # Add headers
                table_data.insert(0, df.columns.tolist())
                tables.append(TableData(
                    data=table_data,
                    bbox=(0, 0, 100, 100),  # Placeholder: no real bbox is recorded here
                    page_num=page_num,
                    confidence=0.7,
                    has_header=True
                ))
        except Exception as e:
            logger.warning(f"Tabula extraction failed: {e}")
        return tables

    def _deduplicate_tables(self, tables: List["TableData"]) -> List["TableData"]:
        """
        Remove duplicate tables by comparing content

        When two tables are similar, the one with the higher confidence is
        kept; on a tie the earlier one stays. A winning replacement is moved
        to the end of the result list.
        """
        if not tables:
            return tables
        unique_tables = []
        for table in tables:
            is_duplicate = False
            for existing_table in unique_tables:
                if self._tables_are_similar(table, existing_table):
                    # Keep the one with higher confidence
                    if table.confidence > existing_table.confidence:
                        unique_tables.remove(existing_table)
                        unique_tables.append(table)
                    is_duplicate = True
                    # Stop now: the list was just mutated, so continuing to
                    # iterate over it would be unsafe.
                    break
            if not is_duplicate:
                unique_tables.append(table)
        return unique_tables

    def _tables_are_similar(self, table1: "TableData", table2: "TableData", threshold: float = 0.8) -> bool:
        """
        Check if two tables are similar (likely duplicates)

        Tables must have the same number of rows and the same first-row
        width. Similarity is the share of cells that are equal after
        ``str().strip().lower()``; the tables match when it reaches
        ``threshold``.
        """
        if len(table1.data) != len(table2.data):
            return False
        if not table1.data or not table2.data:
            return False
        # Compare dimensions
        if len(table1.data[0]) != len(table2.data[0]):
            return False
        # Compare content similarity
        total_cells = len(table1.data) * len(table1.data[0])
        matching_cells = sum(
            1
            for row1, row2 in zip(table1.data, table2.data)
            for cell1, cell2 in zip(row1, row2)
            if str(cell1).strip().lower() == str(cell2).strip().lower()
        )
        similarity = matching_cells / total_cells if total_cells > 0 else 0
        return similarity >= threshold

    def process_pdf_to_excel(self, pdf_path: str, output_path: str, format_type: str = 'structured') -> str:
        """
        Convert PDF to Excel with enhanced processing

        Args:
            pdf_path: Path of the PDF to read.
            output_path: Path of the .xlsx file to write.
            format_type: 'structured', 'combined' or 'separate_sheets'.

        Returns:
            The path of the written workbook.

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
        """
        logger.info(f"Starting PDF to Excel conversion: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        # Extract content from PDF
        pdf_content = self._extract_comprehensive_content(pdf_path)
        # Create Excel workbook
        output_path = self._create_excel_workbook(pdf_content, output_path, format_type)
        logger.info(f"Successfully converted PDF to Excel: {output_path}")
        return output_path

    def _extract_comprehensive_content(self, pdf_path: str) -> Dict[str, Any]:
        """
        Extract all content from PDF using multiple methods

        Returns a dict with 'pages' (one entry per page holding text blocks,
        tables, images and page size), 'total_pages' and 'metadata'. Errors
        are logged and re-raised; the document is closed either way.
        """
        content = {
            'pages': [],
            'total_pages': 0,
            'metadata': {}
        }
        try:
            with fitz.open(pdf_path) as doc:
                content['total_pages'] = doc.page_count
                content['metadata'] = doc.metadata
                logger.info(f"Processing {doc.page_count} pages...")
                for page_index in range(doc.page_count):
                    page = doc[page_index]
                    page_num = page_index + 1  # 1-based for helpers and output
                    logger.info(f"Processing page {page_num}/{doc.page_count}")
                    content['pages'].append({
                        'page_number': page_num,
                        # Extract text blocks
                        'text_blocks': self.extract_text_blocks_advanced(page, page_num),
                        # Extract tables using multiple methods
                        'tables': self.extract_tables_multiple_methods(pdf_path, page_num),
                        # Extract images (basic)
                        'images': self._extract_images_basic(page, page_num),
                        'page_width': page.rect.width,
                        'page_height': page.rect.height
                    })
        except Exception as e:
            logger.error(f"Error extracting PDF content: {e}")
            raise
        return content

    def _extract_images_basic(self, page, page_num: int) -> List[Dict]:
        """
        Basic image extraction for reference

        Records one dict per entry of ``page.get_images()``. The 'bbox' key
        holds that raw entry unchanged, not a computed bounding box.
        """
        images = []
        try:
            for i, img in enumerate(page.get_images()):
                images.append({
                    'index': i,
                    'page': page_num,
                    'bbox': img  # Simplified
                })
        except Exception as e:
            logger.warning(f"Image extraction failed for page {page_num}: {e}")
        return images

    def _create_excel_workbook(self, content: Dict[str, Any], output_path: str, format_type: str) -> str:
        """
        Create Excel workbook with proper formatting

        Writes the content sheets for ``format_type``, appends the summary
        sheet, then re-opens the saved file to apply styling. Any
        unrecognised ``format_type`` is written as 'separate_sheets'.
        """
        builders = {
            'structured': self._create_structured_workbook,
            'combined': self._create_combined_workbook,
            'separate_sheets': self._create_separate_sheets_workbook,
        }
        builder = builders.get(format_type)
        if builder is None:
            logger.warning(f"Unknown format_type {format_type!r}; using 'separate_sheets'")
            builder = self._create_separate_sheets_workbook
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            builder(content, writer)
            # Add summary sheet
            self._add_summary_sheet(content, writer)
        # Apply formatting (needs the file written and closed first)
        self._apply_excel_formatting(output_path)
        return output_path

    @staticmethod
    def _unique_column_names(names: List[Any]) -> List[Any]:
        """Return ``names`` with repeats renamed to ``<name>_2``, ``<name>_3``, ..."""
        seen = set()
        unique = []
        for name in names:
            candidate = name
            suffix = 1
            while candidate in seen:
                suffix += 1
                candidate = f"{name}_{suffix}"
            seen.add(candidate)
            unique.append(candidate)
        return unique

    def _table_to_dataframe(self, table: "TableData", unique_columns: bool = False) -> pd.DataFrame:
        """
        Build a DataFrame from a TableData.

        The first row becomes the column labels only when ``has_header`` is
        True; otherwise every row is kept as data. Short rows are padded with
        empty strings so ragged input does not raise. With
        ``unique_columns`` repeated header names are made distinct.
        """
        rows = [list(row) for row in table.data]
        if not rows:
            return pd.DataFrame()
        width = max(len(row) for row in rows)
        rows = [row + [''] * (width - len(row)) for row in rows]
        if not table.has_header:
            return pd.DataFrame(rows)
        header, body = rows[0], rows[1:]
        if unique_columns:
            header = self._unique_column_names(header)
        return pd.DataFrame(body, columns=header)

    def _create_structured_workbook(self, content: Dict[str, Any], writer):
        """
        Create structured workbook maintaining document flow

        Per page: one sheet per table ("P<page>_Table<n>") followed by one
        sheet per text group ("P<page>_Text<n>").
        """
        for page_data in content['pages']:
            page_num = page_data['page_number']
            # Process tables first
            table_count = 0
            for table in page_data['tables']:
                if table.data:
                    df = self._table_to_dataframe(table)
                    sheet_name = f"P{page_num}_Table{table_count + 1}"[:31]
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
                    table_count += 1
            # Process text content
            if page_data['text_blocks']:
                # Group text blocks by formatting
                text_groups = self._group_text_blocks(page_data['text_blocks'])
                for i, group in enumerate(text_groups):
                    if group['content'].strip():
                        text_df = pd.DataFrame([{
                            'Content': group['content'],
                            'Font_Size': group.get('font_size', 12),
                            'Is_Bold': group.get('is_bold', False),
                            'Position_X': group.get('x', 0),
                            'Position_Y': group.get('y', 0)
                        }])
                        sheet_name = f"P{page_num}_Text{i + 1}"[:31]
                        text_df.to_excel(writer, sheet_name=sheet_name, index=False)

    def _create_combined_workbook(self, content: Dict[str, Any], writer):
        """
        Create combined workbook with all tables and text together

        Writes 'All_Tables' (every table stacked, tagged with Source_Page and
        Table_Index) and 'All_Text' (one row per page); each sheet is only
        written when it has content.
        """
        all_tables = []
        all_text = []
        for page_data in content['pages']:
            page_num = page_data['page_number']
            # Collect all tables
            for i, table in enumerate(page_data['tables']):
                if table.data:
                    # Unique labels are required for the concat below.
                    df = self._table_to_dataframe(table, unique_columns=True)
                    df['Source_Page'] = page_num
                    df['Table_Index'] = i + 1
                    all_tables.append(df)
            # Collect all text
            text_content = '\n'.join(block.text for block in page_data['text_blocks'])
            if text_content.strip():
                all_text.append({
                    'Page': page_num,
                    'Content': text_content.strip()
                })
        # Write combined tables
        if all_tables:
            combined_tables = pd.concat(all_tables, ignore_index=True)
            combined_tables.to_excel(writer, sheet_name='All_Tables', index=False)
        # Write combined text
        if all_text:
            pd.DataFrame(all_text).to_excel(writer, sheet_name='All_Text', index=False)

    def _create_separate_sheets_workbook(self, content: Dict[str, Any], writer):
        """
        Create workbook with each element on separate sheets

        Tables go to "Table_<n>" and each page's text to "Text_<n>", with
        both counters running across the whole document.
        """
        table_counter = 1
        text_counter = 1
        for page_data in content['pages']:
            page_num = page_data['page_number']
            # Each table gets its own sheet
            for table in page_data['tables']:
                if table.data:
                    df = self._table_to_dataframe(table)
                    sheet_name = f"Table_{table_counter}"[:31]
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
                    table_counter += 1
            # Page text gets its own sheet
            if page_data['text_blocks']:
                text_content = '\n'.join(block.text for block in page_data['text_blocks'])
                if text_content.strip():
                    text_df = pd.DataFrame([{'Page': page_num, 'Content': text_content}])
                    sheet_name = f"Text_{text_counter}"[:31]
                    text_df.to_excel(writer, sheet_name=sheet_name, index=False)
                    text_counter += 1

    def _group_text_blocks(self, text_blocks: List["TextBlock"]) -> List[Dict]:
        """
        Group text blocks by formatting, in reading order

        Blocks are sorted top-to-bottom then left-to-right. Consecutive
        blocks join the current group while their font size is within 2 of
        the group's first block and the bold flag matches; their texts are
        joined with single spaces. Each group keeps the font size, bold flag
        and x/y of its first block.
        """
        if not text_blocks:
            return []
        # Sort by position (top to bottom, left to right)
        sorted_blocks = sorted(text_blocks, key=lambda b: (b.y, b.x))
        groups = []
        current_group = None
        for block in sorted_blocks:
            belongs = (
                current_group is not None
                and abs(current_group['font_size'] - block.font_size) < 2
                and current_group['is_bold'] == block.is_bold
            )
            if belongs:
                current_group['content'] += ' ' + block.text
                continue
            # Start new group (the first block always lands here, so no
            # group ever begins with a stray separator space)
            if current_group is not None and current_group['content'].strip():
                groups.append(current_group)
            current_group = {
                'content': block.text,
                'font_size': block.font_size,
                'is_bold': block.is_bold,
                'x': block.x,
                'y': block.y
            }
        # Add last group
        if current_group is not None and current_group['content'].strip():
            groups.append(current_group)
        return groups

    def _add_summary_sheet(self, content: Dict[str, Any], writer):
        """
        Add summary sheet with document statistics

        Missing metadata or an empty title is reported as 'Unknown'.
        """
        total_tables = sum(len(page['tables']) for page in content['pages'])
        total_text_blocks = sum(len(page['text_blocks']) for page in content['pages'])
        metadata = content.get('metadata') or {}
        summary_data = {
            'Statistic': [
                'Total Pages',
                'Total Tables',
                'Total Text Blocks',
                'Processing Date',
                'Document Title'
            ],
            'Value': [
                content['total_pages'],
                total_tables,
                total_text_blocks,
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                metadata.get('title') or 'Unknown'
            ]
        }
        summary_df = pd.DataFrame(summary_data)
        summary_df.to_excel(writer, sheet_name='Summary', index=False)

    def _apply_excel_formatting(self, file_path: str):
        """
        Apply formatting to the Excel file

        Styles the first row of every sheet as a header and sizes each column
        to its longest value (capped at 50). Formatting is best-effort: any
        failure is logged as a warning and the file is left as written.
        """
        try:
            wb = openpyxl.load_workbook(file_path)
            # Define styles
            header_font = Font(bold=True, color="FFFFFF")
            header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
            thin = Side(style='thin')
            border = Border(left=thin, right=thin, top=thin, bottom=thin)
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                # Format headers
                if ws.max_row > 0:
                    for cell in ws[1]:
                        cell.font = header_font
                        cell.fill = header_fill
                        cell.alignment = Alignment(horizontal='center', vertical='center')
                        cell.border = border
                # Auto-adjust column widths
                for column in ws.columns:
                    column_letter = column[0].column_letter
                    # Empty cells contribute nothing to the width.
                    max_length = max(
                        (len(str(cell.value)) for cell in column if cell.value is not None),
                        default=0,
                    )
                    ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
            wb.save(file_path)
        except Exception as e:
            logger.warning(f"Could not apply formatting: {e}")
# Usage example and main function
def install_dependencies():
    """
    Print installation instructions for missing dependencies

    Nothing is installed; this only prints a status report. Core packages
    are always listed as installed (they are marked available
    unconditionally), while the optional ones reflect the module-level
    CAMELOT_AVAILABLE / TABULA_AVAILABLE flags.
    """
    print("📦 INSTALLATION INSTRUCTIONS:")
    print("=" * 50)
    # (display name, install command, available?)
    core_packages = [
        ("PyMuPDF", "pip install PyMuPDF", True),
        ("pandas", "pip install pandas", True),
        ("openpyxl", "pip install openpyxl", True),
        ("numpy", "pip install numpy", True),
    ]
    optional_packages = [
        ("camelot-py", "pip install camelot-py[cv]", CAMELOT_AVAILABLE),
        ("tabula-py", "pip install tabula-py", TABULA_AVAILABLE),
    ]
    print("\n✅ CORE PACKAGES (Required):")
    for name, cmd, available in core_packages:
        status = "✅ Installed" if available else "❌ Missing"
        print(f" {name}: {status}")
        if not available:
            print(f" Install: {cmd}")
    print("\n🔧 OPTIONAL PACKAGES (For better table extraction):")
    for name, cmd, available in optional_packages:
        status = "✅ Installed" if available else "❌ Missing"
        print(f" {name}: {status}")
        if not available:
            print(f" Install: {cmd}")
    print("\n💡 INSTALL ALL AT ONCE:")
    print("pip install PyMuPDF pandas openpyxl numpy camelot-py[cv] tabula-py")
    print("\n" + "=" * 50)
def main():
    """
    Main function to demonstrate usage

    Prints the dependency report, then converts the hard-coded "input.pdf"
    to "output.xlsx" in the 'structured' layout. A missing input file or a
    failed conversion is reported on stdout instead of being raised.
    """
    print("🚀 Enhanced PDF to Excel Converter")
    print("=" * 40)
    # Show installation status
    install_dependencies()
    converter = PDFToExcelConverter()
    # Example usage
    pdf_path = "input.pdf"  # Replace with your PDF path
    output_path = "output.xlsx"  # Replace with desired output path
    try:
        # Check if PDF file exists
        if not os.path.exists(pdf_path):
            print(f"\n❌ PDF file not found: {pdf_path}")
            print("Please update the 'pdf_path' variable with your actual PDF file path.")
            return
        print(f"\n📄 Converting: {pdf_path}")
        result = converter.process_pdf_to_excel(
            pdf_path=pdf_path,
            output_path=output_path,
            format_type='structured'  # Options: 'structured', 'combined', 'separate_sheets'
        )
        print(f"✅ Conversion completed successfully: {result}")
    except Exception as e:
        # Top-level boundary of the demo script: report and give hints.
        print(f"❌ Conversion failed: {e}")
        print("\n🛠️ TROUBLESHOOTING:")
        print("1. Make sure all required packages are installed")
        print("2. Check that your PDF file exists and is readable")
        print("3. Ensure you have write permissions for the output directory")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()