import json
import os
from copy import deepcopy

import numpy as np

# import torch
# import torchvision.transforms as T
# from FlagEmbedding import BGEM3FlagModel
from marker.config.parser import ConfigParser
from marker.converters.pdf import PdfConverter
from marker.output import text_from_rendered
from PIL import Image

# from torchvision.transforms.functional import InterpolationMode
# from transformers import AutoFeatureExtractor, AutoModel
# from utils.src.presentation import Presentation, SlidePage
# from utils.src.utils import is_image_path, pjoin

pjoin = os.path.join

# device_count = torch.cuda.device_count()


# def prs_dedup(
#     presentation: Presentation,
#     model: BGEM3FlagModel,
#     batchsize: int = 32,
#     threshold: float = 0.8,
# ) -> list[SlidePage]:
#     """
#     Deduplicate slides in a presentation based on text similarity.
#     Args:
#         presentation (Presentation): The presentation object containing slides.
#         model: The model used for generating text embeddings.
#         batchsize (int): The batch size for processing slides.
#         threshold (float): The similarity threshold for deduplication.
#     Returns:
#         list: A list of removed duplicate slides.
#     """
#     text_embeddings = get_text_embedding(
#         [i.to_text() for i in presentation.slides], model, batchsize
#     )
#     pre_embedding = text_embeddings[0]
#     slide_idx = 1
#     duplicates = []
#     while slide_idx < len(presentation):
#         cur_embedding = text_embeddings[slide_idx]
#         if torch.cosine_similarity(pre_embedding, cur_embedding, -1) > threshold:
#             duplicates.append(slide_idx - 1)
#         slide_idx += 1
#         pre_embedding = cur_embedding
#     return [presentation.slides.pop(i) for i in reversed(duplicates)]

# def get_text_model(device: str = None) -> BGEM3FlagModel:
#     """
#     Initialize and return a text model.
#     Args:
#         device (str): The device to run the model on.
#     Returns:
#         BGEM3FlagModel: The initialized text model.
#     """
#     return BGEM3FlagModel(
#         "BAAI/bge-m3",
#         use_fp16=True,
#         device=device,
#     )


# def get_image_model(device: str = None):
#     """
#     Initialize and return an image model and its feature extractor.
#     Args:
#         device (str): The device to run the model on.
#     Returns:
#         tuple: A tuple containing the feature extractor and the image model.
#     """
#     model_base = "google/vit-base-patch16-224-in21k"
#     return (
#         AutoFeatureExtractor.from_pretrained(
#             model_base,
#             torch_dtype=torch.float16,
#             device_map=device,
#         ),
#         AutoModel.from_pretrained(
#             model_base,
#             torch_dtype=torch.float16,
#             device_map=device,
#         ).eval(),
#     )

def parse_pdf(
    pdf_path: str,
    output_path: str = None,
    model_lst: list = None,
    save_file: bool = True,
) -> str:
    """
    Parse a PDF file and extract text and images.

    Args:
        pdf_path (str): The path to the PDF file.
        output_path (str): The directory to save the extracted content; required when save_file is True.
        model_lst (list): The marker models used for processing the PDF (passed to PdfConverter as artifact_dict).
        save_file (bool): Whether to write the markdown, images, and metadata to output_path.

    Returns:
        str: The full text extracted from the PDF. If save_file is False,
        a tuple (full_text, rendered) is returned instead.
    """
    if save_file:
        os.makedirs(output_path, exist_ok=True)
    config_parser = ConfigParser(
        {
            "output_format": "markdown",
        }
    )
    converter = PdfConverter(
        config=config_parser.generate_config_dict(),
        artifact_dict=model_lst,
        processor_list=config_parser.get_processors(),
        renderer=config_parser.get_renderer(),
    )
    rendered = converter(pdf_path)
    full_text, _, images = text_from_rendered(rendered)
    if not save_file:
        return full_text, rendered
    with open(pjoin(output_path, "source.md"), "w+", encoding="utf-8") as f:
        f.write(full_text)
    for filename, image in images.items():
        image_filepath = os.path.join(output_path, filename)
        image.save(image_filepath, "JPEG")
    with open(pjoin(output_path, "meta.json"), "w+", encoding="utf-8") as f:
        f.write(json.dumps(rendered.metadata, indent=4))
    return full_text

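# Example usage of parse_pdf (a minimal sketch, not exercised here; assumes the
# installed marker-pdf version exposes marker.models.create_model_dict for
# building the model dict passed as model_lst -- adjust to your marker version):
#
#     from marker.models import create_model_dict
#
#     models = create_model_dict()
#     text = parse_pdf("paper.pdf", output_path="output/paper", model_lst=models)
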
# def get_text_embedding(
#     text: list[str], model: BGEM3FlagModel, batchsize: int = 32
# ) -> list[torch.Tensor]:
#     """
#     Generate text embeddings for a list of text strings.
#     Args:
#         text (list[str]): A list of text strings.
#         model: The model used for generating embeddings.
#         batchsize (int): The batch size for processing text.
#     Returns:
#         list: A list of text embeddings.
#     """
#     if isinstance(text, str):
#         return torch.tensor(model.encode(text)["dense_vecs"]).to(model.device)
#     result = []
#     for i in range(0, len(text), batchsize):
#         result.extend(
#             torch.tensor(model.encode(text[i : i + batchsize])["dense_vecs"]).to(
#                 model.device
#             )
#         )
#     return result


# def get_image_embedding(
#     image_dir: str, extractor, model, batchsize: int = 16
# ) -> dict[str, torch.Tensor]:
#     """
#     Generate image embeddings for images in a directory.
#     Args:
#         image_dir (str): The directory containing images.
#         extractor: The feature extractor for images.
#         model: The model used for generating embeddings.
#         batchsize (int): The batch size for processing images.
#     Returns:
#         dict: A dictionary mapping image filenames to their embeddings.
#     """
#     transform = T.Compose(
#         [
#             T.Resize(int((256 / 224) * extractor.size["height"])),
#             T.CenterCrop(extractor.size["height"]),
#             T.ToTensor(),
#             T.Normalize(mean=extractor.image_mean, std=extractor.image_std),
#         ]
#     )
#     inputs = []
#     embeddings = []
#     images = [i for i in sorted(os.listdir(image_dir)) if is_image_path(i)]
#     for file in images:
#         image = Image.open(pjoin(image_dir, file)).convert("RGB")
#         inputs.append(transform(image))
#         if len(inputs) % batchsize == 0 or file == images[-1]:
#             batch = {"pixel_values": torch.stack(inputs).to(model.device)}
#             embeddings.extend(model(**batch).last_hidden_state.detach())
#             inputs.clear()
#     return {image: embedding.flatten() for image, embedding in zip(images, embeddings)}


# def images_cosine_similarity(embeddings: list[torch.Tensor]) -> torch.Tensor:
#     """
#     Calculate the cosine similarity matrix for a list of embeddings.
#     Args:
#         embeddings (list[torch.Tensor]): A list of image embeddings.
#     Returns:
#         torch.Tensor: An NxN similarity matrix.
#     """
#     embeddings = [embedding for embedding in embeddings]
#     sim_matrix = torch.zeros((len(embeddings), len(embeddings)))
#     for i in range(len(embeddings)):
#         for j in range(i + 1, len(embeddings)):
#             sim_matrix[i, j] = sim_matrix[j, i] = torch.cosine_similarity(
#                 embeddings[i], embeddings[j], -1
#             )
#     return sim_matrix

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

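# IMAGENET_MEAN / IMAGENET_STD are the standard ImageNet normalization statistics.
# A minimal sketch of the preprocessing they would typically feed (assumes the
# torchvision.transforms import commented out above is restored; this parallels,
# but is not identical to, the extractor-driven transform in get_image_embedding):
#
#     transform = T.Compose(
#         [
#             T.Resize(256),
#             T.CenterCrop(224),
#             T.ToTensor(),
#             T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
#         ]
#     )
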
# def average_distance(
#     similarity: torch.Tensor, idx: int, cluster_idx: list[int]
# ) -> float:
#     """
#     Calculate the average distance between a point (idx) and a cluster (cluster_idx).
#     Args:
#         similarity (np.ndarray): The similarity matrix.
#         idx (int): The index of the point.
#         cluster_idx (list): The indices of the cluster.
#     Returns:
#         float: The average distance.
#     """
#     if idx in cluster_idx:
#         return 0
#     total_similarity = 0
#     for idx_in_cluster in cluster_idx:
#         total_similarity += similarity[idx, idx_in_cluster]
#     return total_similarity / len(cluster_idx)


# def get_cluster(similarity: np.ndarray, sim_bound: float = 0.65):
#     """
#     Cluster points based on similarity.
#     Args:
#         similarity (np.ndarray): The similarity matrix.
#         sim_bound (float): The similarity threshold for clustering.
#     Returns:
#         list: A list of clusters.
#     """
#     num_points = similarity.shape[0]
#     clusters = []
#     sim_copy = deepcopy(similarity)
#     added = [False] * num_points
#     while True:
#         max_avg_dist = sim_bound
#         best_cluster = None
#         best_point = None
#         for c in clusters:
#             for point_idx in range(num_points):
#                 if added[point_idx]:
#                     continue
#                 avg_dist = average_distance(sim_copy, point_idx, c)
#                 if avg_dist > max_avg_dist:
#                     max_avg_dist = avg_dist
#                     best_cluster = c
#                     best_point = point_idx
#         if best_point is not None:
#             best_cluster.append(best_point)
#             added[best_point] = True
#             similarity[best_point, :] = 0
#             similarity[:, best_point] = 0
#         else:
#             if similarity.max() < sim_bound:
#                 break
#             i, j = np.unravel_index(np.argmax(similarity), similarity.shape)
#             clusters.append([int(i), int(j)])
#             added[i] = True
#             added[j] = True
#             similarity[i, :] = 0
#             similarity[:, i] = 0
#             similarity[j, :] = 0
#             similarity[:, j] = 0
#     return clusters
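

# How the commented-out image pipeline above would fit together once re-enabled
# (a sketch only; assumes get_image_model, get_image_embedding,
# images_cosine_similarity, and get_cluster are uncommented and a CUDA device is
# available; the image directory is whatever output_path parse_pdf wrote to):
#
#     extractor, vit = get_image_model("cuda")
#     embeddings = get_image_embedding("output/paper", extractor, vit)
#     sim = images_cosine_similarity(list(embeddings.values()))
#     clusters = get_cluster(sim.numpy(), sim_bound=0.65)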