Spaces:
Running
Running
| # extract_text_from_pdf.py | |
| import os | |
| import torch | |
| import spaces | |
| from PyPDF2 import PdfReader | |
| from accelerate import Accelerator | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from tqdm import tqdm | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| class PDFTextExtractor: | |
| """ | |
| A class to handle PDF text extraction and preprocessing for podcast preparation. | |
| """ | |
| def __init__(self, pdf_path, output_path): | |
| """ | |
| Initialize the PDFTextExtractor with paths and model details. | |
| Args: | |
| pdf_path (str): Path to the PDF file. | |
| output_path (str): Path to save the cleaned text file. | |
| model_name (str): Name of the model to use for text processing. | |
| """ | |
| model_name="meta-llama/Llama-3.2-1B-Instruct" | |
| self.pdf_path = pdf_path | |
| self.output_path = output_path | |
| self.max_chars = 100000 | |
| self.chunk_size = 1000 | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| # Initialize model and tokenizer | |
| self.accelerator = Accelerator() | |
| self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to(self.device) | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer) | |
| # System prompt for text processing | |
| self.system_prompt = """ | |
| You are a world class text pre-processor, here is the raw data from a PDF, please parse and return it in a way that is crispy and usable to send to a podcast writer. | |
| Be smart and aggressive with removing details; you're only cleaning up the text without summarizing. | |
| Here is the text: | |
| """ | |
| def validate_pdf(self): | |
| """Check if the file exists and is a valid PDF.""" | |
| if not os.path.exists(self.pdf_path): | |
| print(f"Error: File not found at path: {self.pdf_path}") | |
| return False | |
| if not self.pdf_path.lower().endswith('.pdf'): | |
| print("Error: File is not a PDF") | |
| return False | |
| return True | |
| def extract_text(self): | |
| """Extract text from the PDF, limited by max_chars.""" | |
| if not self.validate_pdf(): | |
| return None | |
| with open(self.pdf_path, 'rb') as file: | |
| pdf_reader = PdfReader(file) | |
| num_pages = len(pdf_reader.pages) | |
| print(f"Processing PDF with {num_pages} pages...") | |
| extracted_text = [] | |
| total_chars = 0 | |
| for page_num in range(num_pages): | |
| page = pdf_reader.pages[page_num] | |
| text = page.extract_text() or "" | |
| if total_chars + len(text) > self.max_chars: | |
| remaining_chars = self.max_chars - total_chars | |
| extracted_text.append(text[:remaining_chars]) | |
| print(f"Reached {self.max_chars} character limit at page {page_num + 1}") | |
| break | |
| extracted_text.append(text) | |
| total_chars += len(text) | |
| print(f"Processed page {page_num + 1}/{num_pages}") | |
| final_text = '\n'.join(extracted_text) | |
| print(f"Extraction complete! Total characters: {len(final_text)}") | |
| return final_text | |
| def create_word_bounded_chunks(self, text): | |
| """Split text into chunks around the target size.""" | |
| words = text.split() | |
| chunks = [] | |
| current_chunk = [] | |
| current_length = 0 | |
| for word in words: | |
| word_length = len(word) + 1 # +1 for the space | |
| if current_length + word_length > self.chunk_size and current_chunk: | |
| chunks.append(' '.join(current_chunk)) | |
| current_chunk = [word] | |
| current_length = word_length | |
| else: | |
| current_chunk.append(word) | |
| current_length += word_length | |
| if current_chunk: | |
| chunks.append(' '.join(current_chunk)) | |
| return chunks | |
| def process_chunk(self, text_chunk): | |
| """Process a text chunk with the model and return the cleaned text.""" | |
| conversation = [ | |
| {"role": "system", "content": self.system_prompt}, | |
| {"role": "user", "content": text_chunk} | |
| ] | |
| prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False) | |
| inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device) | |
| with torch.no_grad(): | |
| output = self.model.generate(**inputs, temperature=0.7, top_p=0.9, max_new_tokens=512) | |
| processed_text = self.tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt):].strip() | |
| return processed_text | |
| def clean_and_save_text(self): | |
| """Extract, clean, and save processed text to a file.""" | |
| extracted_text = self.extract_text() | |
| if not extracted_text: | |
| return None | |
| chunks = self.create_word_bounded_chunks(extracted_text) | |
| processed_text = "" | |
| with open(self.output_path, 'w', encoding='utf-8') as out_file: | |
| for chunk_num, chunk in enumerate(tqdm(chunks, desc="Processing chunks")): | |
| processed_chunk = self.process_chunk(chunk) | |
| processed_text += processed_chunk + "\n" | |
| out_file.write(processed_chunk + "\n") | |
| out_file.flush() | |
| print(f"\nExtracted and cleaned text has been saved to {self.output_path}") | |
| return self.output_path | |