Spaces:
Running
on
Zero
Running
on
Zero
| # Install FlashAttention | |
| import subprocess | |
| subprocess.run( | |
| "pip install flash-attn==2.7.4.post1 --no-build-isolation", | |
| env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"}, | |
| shell=True, | |
| ) | |
| import base64 | |
| from collections import Counter | |
| from io import BytesIO | |
| import re | |
| from PIL import Image, ImageDraw | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2_5_VLProcessor | |
| from qwen_vl_utils import process_vision_info, smart_resize | |
| repo_id = "hal-utokyo/MangaLMM" | |
| processor = Qwen2_5_VLProcessor.from_pretrained(repo_id) | |
| # pre-load | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| model = Qwen2_5_VLForConditionalGeneration.from_pretrained( | |
| repo_id, | |
| torch_dtype=torch.bfloat16, | |
| attn_implementation="flash_attention_2", | |
| device_map=device, | |
| ) | |
| def pil2base64(image: Image.Image) -> str: | |
| buffered = BytesIO() | |
| image.save(buffered, format="PNG") | |
| return base64.b64encode(buffered.getvalue()).decode() | |
| def bbox2d_to_quad(bbox_2d): | |
| xmin, ymin, xmax, ymax = bbox_2d | |
| return [xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax] | |
| def normalize_repeated_symbols(text): | |
| text = re.sub(r'([~\~\〜\-\ー]+)', lambda m: m.group(1)[0], text) | |
| text = re.sub(r'[~~〜]', '~', text) | |
| text = re.sub(r'[-ー]', '-', text) | |
| return text | |
| def normalize_punctuation(text): | |
| conversion_map = { | |
| "!": "!", | |
| "?": "?", | |
| "…": "..." | |
| } | |
| text = re.sub("|".join(map(re.escape, conversion_map.keys())), lambda m: conversion_map[m.group()], text) | |
| text = re.sub(r'[・・.]', '・', text) | |
| return text | |
| def restore_chouon(text): | |
| # hirakana + katakana + kanji | |
| # jp_range = r"ぁ-んァ-ン一-龯㐀-䶵" # \u3400-\u4DBF = r"㐀-䶵" | |
| # Extended Unicode version: covers Hiragana, Katakana, and a wide range of Kanji (including Extension A) | |
| jp_range = r"\u3040-\u309F\u30A0-\u30FF\u3400-\u4DBF\u4E00-\u9FFF" | |
| pattern = rf"(?<=[{jp_range}])-(?=[{jp_range}])" | |
| return re.sub(pattern, "ー", text) | |
| def process_text(text: str) -> str: | |
| text = re.sub(r"[\s\u3000]+", "", text) | |
| text = normalize_repeated_symbols(text) | |
| text = normalize_punctuation(text) | |
| text = restore_chouon(text) | |
| return text | |
| def parse_ocr_text(text: str) -> list[list]: | |
| if not text.strip(): | |
| return [] | |
| # handle escape | |
| text = text.replace('\\"', '"') | |
| # find \n\t{ ... } blocks | |
| blocks = re.findall(r"\n\t\{.*?\}", text, re.DOTALL) | |
| # extract OCR text and bounding box | |
| ocrs = [] | |
| for block in blocks: | |
| block = block.strip() # remove \n\t | |
| bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', block, flags=re.DOTALL) | |
| text_match = re.search( | |
| r'"text_content"\s*:\s*"([^"]*)"', block, flags=re.DOTALL | |
| ) | |
| if bbox_match and text_match: | |
| try: | |
| bbox_list = [int(x.strip()) for x in bbox_match.group(1).split(",")] | |
| content = process_text(text_match.group(1)) | |
| quad = bbox2d_to_quad(bbox_list) | |
| ocrs.append([content, quad]) | |
| except: | |
| continue | |
| # remove duplicates (sometimes the model generates the same text multiple times) | |
| counter = Counter([ocr[0] for ocr in ocrs]) | |
| ocrs = [ocr for ocr in ocrs if counter[ocr[0]] < 10] | |
| return ocrs | |
| def inference_fn( | |
| image: Image.Image | None, | |
| text: str | None, | |
| # progress=gr.Progress(track_tqdm=True), | |
| ) -> tuple[str, str, Image.Image | None]: | |
| if image is None: | |
| gr.Warning("Please upload an image!", duration=10) | |
| return "Please upload an image!", "Please upload an image!", None | |
| if image.width * image.height > 2116800: | |
| gr.Warning("The image size is too large! We resize it to smaller size.", duration=10) | |
| resized_height, resized_width = smart_resize( | |
| height=image.height, | |
| width=image.width, | |
| factor=processor.image_processor.patch_size * processor.image_processor.merge_size, | |
| min_pixels=processor.image_processor.min_pixels, | |
| max_pixels=processor.image_processor.max_pixels, | |
| ) | |
| image = image.resize((resized_width, resized_height), resample=Image.Resampling.BICUBIC) | |
| if text is None or text.strip() == "": | |
| # OCR | |
| text = "Please perform OCR on this image and output the recognized Japanese text along with its position (grounding)." | |
| base64_image = pil2base64(image) | |
| messages = [ | |
| {"role": "user", "content": [ | |
| {"type": "image", "image": f"data:image;base64,{base64_image}"}, | |
| {"type": "text", "text": text}, | |
| ]}, | |
| ] | |
| text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| image_inputs, video_inputs = process_vision_info(messages) | |
| inputs = processor( | |
| text=[text], | |
| images=image_inputs, | |
| videos=video_inputs, | |
| padding=True, | |
| return_tensors="pt", | |
| ) | |
| inputs = inputs.to(model.device) | |
| generated_ids = model.generate(**inputs, max_new_tokens=4096) | |
| generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)] | |
| raw_output = processor.batch_decode( | |
| generated_ids_trimmed, | |
| skip_special_tokens=True, | |
| clean_up_tokenization_spaces=False, | |
| )[0] | |
| result_image = image_inputs[0].copy() | |
| ocrs = parse_ocr_text(raw_output) | |
| if not ocrs: | |
| return raw_output, "OCR feature was not performed.", result_image | |
| draw = ImageDraw.Draw(result_image) | |
| ocr_texts = [] | |
| for ocr_text, quad in ocrs: | |
| ocr_texts.append(f'{ocr_text} ({quad[0]}, {quad[1]}, {quad[4]}, {quad[5]})') | |
| for i in range(4): | |
| start_point = quad[i*2:i*2+2] | |
| end_point = quad[i*2+2:i*2+4] if i < 3 else quad[:2] | |
| draw.line(start_point + end_point, fill="red", width=4) | |
| draw.polygon(quad, outline="red", width=4) | |
| # draw.text((quad[0], quad[1]), ocr_text, fill="red") | |
| ocr_texts_str = "\n".join(ocr_texts) | |
| return "No question was entered.", ocr_texts_str, result_image | |
| with gr.Blocks() as demo: | |
| gr.Markdown("""# MangaLMM Official Demo | |
|  | |
| We propose MangaVQA and MangaLMM, which are a benchmark and a specialized LMM for multimodal manga understanding. | |
| This demo uses our [MangaLMM model](https://huggingface.co/hal-utokyo/MangaLMM) to perform OCR on an image of manga panels and answer a question about the image. | |
| Please ensure that the image contains fewer than 2116800 pixels. (e.g. 1600x1200, 1920x1080, etc.) If more, we resize it to smaller size. | |
| *Note: This model is for research purposes only and may return incorrect results. Please use it at your own risk.* | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_button = gr.Button(value="Submit") | |
| input_text = gr.Textbox( | |
| label="Input Text", lines=5, max_lines=5, | |
| placeholder="Please enter a question about your image.\nEmpty text will perform OCR.", | |
| ) | |
| input_image = gr.Image(label="Input Image", image_mode="RGB", type="pil") | |
| with gr.Column(): | |
| vqa_text = gr.Textbox(label="VQA Result", lines=2, max_lines=2) | |
| ocr_text = gr.Textbox(label="OCR Result", lines=3, max_lines=3) | |
| ocr_image = gr.Image(label="OCR Result", type="pil", show_label=False) | |
| input_button.click( | |
| fn=inference_fn, | |
| inputs=[input_image, input_text], | |
| outputs=[vqa_text, ocr_text, ocr_image], | |
| ) | |
| ocr_examples = gr.Examples( | |
| examples=[], | |
| fn=inference_fn, | |
| inputs=[input_image, input_text], | |
| outputs=[vqa_text, ocr_text, ocr_image], | |
| cache_examples=False, | |
| ) | |
| demo.queue().launch() | |