import asyncio import re import os import torch if torch.version.cuda == '11.8': os.environ["TRITON_PTXAS_PATH"] = "/usr/local/cuda-11.8/bin/ptxas" os.environ['VLLM_USE_V1'] = '0' os.environ["CUDA_VISIBLE_DEVICES"] = '0' from vllm import AsyncLLMEngine, SamplingParams from vllm.engine.arg_utils import AsyncEngineArgs from vllm.model_executor.models.registry import ModelRegistry import time from deepseek_ocr import DeepseekOCRForCausalLM from PIL import Image, ImageDraw, ImageFont, ImageOps import numpy as np from tqdm import tqdm from process.ngram_norepeat import NoRepeatNGramLogitsProcessor from process.image_process import DeepseekOCRProcessor from config import MODEL_PATH, INPUT_PATH, OUTPUT_PATH, PROMPT, CROP_MODE ModelRegistry.register_model("DeepseekOCRForCausalLM", DeepseekOCRForCausalLM) def load_image(image_path): try: image = Image.open(image_path) corrected_image = ImageOps.exif_transpose(image) return corrected_image except Exception as e: print(f"error: {e}") try: return Image.open(image_path) except: return None def re_match(text): pattern = r'(<\|ref\|>(.*?)<\|/ref\|><\|det\|>(.*?)<\|/det\|>)' matches = re.findall(pattern, text, re.DOTALL) mathes_image = [] mathes_other = [] for a_match in matches: if '<|ref|>image<|/ref|>' in a_match[0]: mathes_image.append(a_match[0]) else: mathes_other.append(a_match[0]) return matches, mathes_image, mathes_other def extract_coordinates_and_label(ref_text, image_width, image_height): try: label_type = ref_text[1] cor_list = eval(ref_text[2]) except Exception as e: print(e) return None return (label_type, cor_list) def draw_bounding_boxes(image, refs): image_width, image_height = image.size img_draw = image.copy() draw = ImageDraw.Draw(img_draw) overlay = Image.new('RGBA', img_draw.size, (0, 0, 0, 0)) draw2 = ImageDraw.Draw(overlay) # except IOError: font = ImageFont.load_default() img_idx = 0 for i, ref in enumerate(refs): try: result = extract_coordinates_and_label(ref, image_width, image_height) if result: label_type, points_list = result color = (np.random.randint(0, 200), np.random.randint(0, 200), np.random.randint(0, 255)) color_a = color + (20, ) for points in points_list: x1, y1, x2, y2 = points x1 = int(x1 / 999 * image_width) y1 = int(y1 / 999 * image_height) x2 = int(x2 / 999 * image_width) y2 = int(y2 / 999 * image_height) if label_type == 'image': try: cropped = image.crop((x1, y1, x2, y2)) cropped.save(f"{OUTPUT_PATH}/images/{img_idx}.jpg") except Exception as e: print(e) pass img_idx += 1 try: if label_type == 'title': draw.rectangle([x1, y1, x2, y2], outline=color, width=4) draw2.rectangle([x1, y1, x2, y2], fill=color_a, outline=(0, 0, 0, 0), width=1) else: draw.rectangle([x1, y1, x2, y2], outline=color, width=2) draw2.rectangle([x1, y1, x2, y2], fill=color_a, outline=(0, 0, 0, 0), width=1) text_x = x1 text_y = max(0, y1 - 15) text_bbox = draw.textbbox((0, 0), label_type, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] draw.rectangle([text_x, text_y, text_x + text_width, text_y + text_height], fill=(255, 255, 255, 30)) draw.text((text_x, text_y), label_type, font=font, fill=color) except: pass except: continue img_draw.paste(overlay, (0, 0), overlay) return img_draw def process_image_with_refs(image, ref_texts): result_image = draw_bounding_boxes(image, ref_texts) return result_image async def stream_generate(image=None, prompt=''): engine_args = AsyncEngineArgs( model=MODEL_PATH, hf_overrides={"architectures": ["DeepseekOCRForCausalLM"]}, block_size=256, max_model_len=8192, enforce_eager=False, trust_remote_code=True, tensor_parallel_size=1, gpu_memory_utilization=0.75, ) engine = AsyncLLMEngine.from_engine_args(engine_args) logits_processors = [NoRepeatNGramLogitsProcessor(ngram_size=30, window_size=90, whitelist_token_ids= {128821, 128822})] #whitelist: