|
|
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify, send_from_directory |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from unstructured.partition.pdf import partition_pdf |
|
|
import json |
|
|
import base64 |
|
|
import io |
|
|
import os |
|
|
from PIL import Image, ImageEnhance, ImageDraw |
|
|
from imutils.perspective import four_point_transform |
|
|
from dotenv import load_dotenv |
|
|
import pytesseract |
|
|
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq |
|
|
from langchain_community.document_loaders.image_captions import ImageCaptionLoader |
|
|
from werkzeug.utils import secure_filename |
|
|
import tempfile |
|
|
import torch |
|
|
from langchain_groq import ChatGroq |
|
|
from langgraph.prebuilt import create_react_agent |
|
|
import logging, time |
|
|
|
|
|
|
|
|
# Configure root logging once at import time: DEBUG and above go to both a
# file ("app.log", relative to the working directory) and the console.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)

# Module-level logger used by all request handlers and pipeline functions below.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Pull environment variables (API keys, reference-folder overrides) from .env.
load_dotenv()

# NOTE(review): groq_api_key is read but never passed anywhere in this file;
# presumably ChatGroq picks GROQ_API_KEY up from the environment itself —
# confirm before removing this line.
groq_api_key = os.getenv("GROQ_API_KEY")

# Groq-hosted Llama 4 model used for sprite naming/description (temperature 0
# for deterministic captions; max_tokens=None leaves the provider default).
llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

# Windows-specific binary locations for Tesseract OCR and Poppler.
# NOTE(review): these hard-coded paths will not exist in a Linux container —
# confirm the deployment target.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
poppler_path = r"C:\poppler-23.11.0\Library\bin"

# NOTE(review): `count` appears unused anywhere in this module — candidate for removal.
count = 0

# Output directory layout: scanned pages, detected sprite images, and JSON dumps.
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

# Create all output folders up front; exist_ok makes this idempotent.
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
|
|
# Load the SmolVLM-256M processor and model once at import time, pinned to CPU.
# Any failure here aborts app startup with a RuntimeError.
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct")

    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct",
        # hasattr(torch, "bfloat16") is true on any modern torch build, so
        # bfloat16 is the dtype effectively used; float32 is a legacy fallback.
        torch_dtype=torch.bfloat16 if hasattr(
            torch, "bfloat16") else torch.float32,
        _attn_implementation="eager"
    ).to("cpu")
except Exception as e:
    raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Caption *image* with the module-level SmolVLM-256M model.

    The processor requires exactly one ``<image>`` placeholder in the text
    prompt; one is prepended automatically when missing.  Any failure is
    reported by returning the error text instead of raising.
    """
    try:
        # Guarantee the placeholder token the processor expects.
        if "<image>" not in prompt:
            prompt = f"<image> {prompt.strip()}"

        placeholder_count = prompt.count("<image>")
        if placeholder_count != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 <image> token. Found {placeholder_count}")

        # Tokenize/pack the (image, text) pair and run CPU generation.
        batch = smolvlm256m_processor(
            images=[image], text=[prompt], return_tensors="pt").to("cpu")
        generated_ids = smolvlm256m_model.generate(**batch, max_new_tokens=100)
        return smolvlm256m_processor.decode(generated_ids[0], skip_special_tokens=True)
    except Exception as e:
        # Deliberate best-effort contract: callers receive the error as text.
        return f"❌ Error during caption generation: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract embedded images from a PDF and generate structured sprite JSON.

    Pipeline:
      1. Partition the PDF with unstructured's hi_res strategy, capturing
         image blocks as base64 payloads.
      2. Dump the raw elements to ``extracted.json`` in a per-PDF subfolder.
      3. For each image element, save a PNG copy and ask the Groq agent for a
         short name and a description.
      4. Merge the results into ``extracted_sprites.json`` (appending to any
         sprites from previous runs of the same PDF).

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored; kept for interface compatibility — the
            path is always recomputed inside the per-PDF JSON subfolder.

    Returns:
        Tuple of (path to extracted_sprites.json, sprite dict).

    Raises:
        RuntimeError: on any extraction, I/O, or unexpected failure.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]
        # NOTE(review): backslash normalization assumes a Windows host — confirm.
        pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

        # Per-PDF output folders for detected images and JSON artifacts.
        extracted_image_subdir = os.path.join(
            DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        # The caller-supplied path is deliberately overridden so all artifacts
        # stay inside the managed OUTPUTS tree.
        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,
            )
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to extract images from PDF: {str(e)}") from e

        try:
            start_time = time.perf_counter()
            with open(output_json_path, "w") as f:
                json.dump([element.to_dict()
                           for element in elements], f, indent=4)
            elapsed = time.perf_counter() - start_time
            logger.info(f"✅ extracted.json write in {elapsed:.2f} seconds")
        except Exception as e:
            raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") from e

        try:
            with open(output_json_path, 'r') as file:
                file_elements = json.load(file)
        except Exception as e:
            raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}") from e

        manipulated_json = {}

        system_prompt = """
        You are an expert in visual scene understanding.
        Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.

        Guidelines:
        - Focus only the images given in Square Shape.
        - Don't Consider Blank areas in Image as.
        - Don't include generic summary or explanation outside the fields.
        Return only string.
        """

        # Tool-less ReAct agent: used purely as a prompted chat wrapper around llm.
        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=system_prompt
        )

        # Resume sprite numbering after any sprites from a previous run.
        if os.path.exists(final_json_path):
            with open(final_json_path, "r") as existing_file:
                existing_sprites = json.load(existing_file)
            # BUG FIX: previously the existing sprites were loaded only to
            # compute the next index and were then dropped when the file was
            # rewritten below, losing all prior entries. Seed the output dict
            # with them so appending actually appends.
            manipulated_json.update(existing_sprites)
            existing_keys = [int(k.replace("Sprite ", ""))
                             for k in existing_sprites.keys()]
            start_count = max(existing_keys, default=0) + 1
        else:
            start_count = 1

        sprite_count = start_count
        start_time = time.perf_counter()
        for i, element in enumerate(file_elements):
            if "image_base64" in element["metadata"]:
                try:
                    image_data = base64.b64decode(
                        element["metadata"]["image_base64"])
                    image = Image.open(io.BytesIO(image_data)).convert("RGB")
                    # BUG FIX: removed image.show() — it spawned a blocking
                    # external viewer per image, which is a debugging leftover
                    # unusable in a server process.
                    # BUG FIX: name files by sprite_count (matches the JSON
                    # keys) instead of i+1, which could overwrite images from
                    # a previous run when appending.
                    image_path = os.path.join(
                        extracted_image_subdir, f"Sprite_{sprite_count}.png")
                    image.save(image_path)
                    with open(image_path, "rb") as image_file:
                        image_bytes = image_file.read()
                    img_base64 = base64.b64encode(image_bytes).decode("utf-8")

                    # (Removed a dead nested clean_caption_output() helper that
                    # was re-defined every iteration and never called.)

                    prompt_description = "Give a brief Captioning."
                    prompt_name = "give a short name caption of this Image."

                    # First agent call: free-form description of the sprite.
                    content1 = [
                        {
                            "type": "text",
                            "text": f"{prompt_description}"
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{img_base64}"
                            }
                        }
                    ]
                    response1 = agent.invoke(
                        {"messages": [{"role": "user", "content": content1}]})
                    print(response1)
                    description = response1["messages"][-1].content

                    # Second agent call: short display name for the sprite.
                    content2 = [
                        {
                            "type": "text",
                            "text": f"{prompt_name}"
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{img_base64}"
                            }
                        }
                    ]

                    response2 = agent.invoke(
                        {"messages": [{"role": "user", "content": content2}]})
                    print(response2)
                    name = response2["messages"][-1].content

                    manipulated_json[f"Sprite {sprite_count}"] = {
                        "name": name,
                        "base64": element["metadata"]["image_base64"],
                        "file-path": pdf_dir_path,
                        "description": description
                    }
                    sprite_count += 1
                except Exception as e:
                    # Best-effort per sprite: log and continue with the rest.
                    print(f"⚠️ Error processing Sprite {i+1}: {str(e)}")
        elapsed = time.perf_counter() - start_time
        logger.info(f"✅ extracted_sprites.json write in {elapsed:.2f} seconds")

        with open(final_json_path, "w") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)

        print(f"✅ Manipulated sprite JSON saved: {final_json_path}")
        return final_json_path, manipulated_json

    except Exception as e:
        raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") from e
|
|
|
|
|
|
|
|
def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites against reference images and build a Scratch project.

    Reads the sprite JSON written by extract_images_from_pdf, embeds each
    sprite image with OpenCLIP, compares against precomputed reference
    embeddings (OUTPUTS/embeddings.json), copies the best-matching sprite and
    backdrop assets into a fresh per-run project folder, and writes a
    Scratch 3 project.json there.

    Args:
        input_json_path: Path to the extracted_sprites.json file.

    Returns:
        Path of the written project.json.
    """
    # Heavy/optional dependencies are imported lazily so the module can load
    # without them.
    import uuid
    import shutil
    import tempfile
    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
    from matplotlib.offsetbox import OffsetImage, AnnotationBbox
    from io import BytesIO

    logger.info("🔍 Running similarity matching...")

    # Reference image roots; overridable via environment for deployment.
    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
    sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
    image_dirs = [backdrop_images_path, sprite_images_path]

    # Unique per-run output folder for the assembled project.
    # NOTE(review): lowercase "outputs" here vs. the module-level OUTPUT_FOLDER
    # "OUTPUTS" — on a case-sensitive filesystem these are different
    # directories; confirm which is intended.
    random_id = str(uuid.uuid4()).replace('-', '')
    project_folder = os.path.join("outputs", f"project_{random_id}")

    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    with open(input_json_path, 'r') as f:
        sprites_data = json.load(f)

    # Collect ids, caption texts, and base64 payloads for every sprite.
    sprite_ids, texts, sprite_base64 = [], [], []
    start_time = time.perf_counter()
    for sid, sprite in sprites_data.items():
        sprite_ids.append(sid)
        texts.append(
            "This is " + sprite.get("description", sprite.get("name", "")))
        sprite_base64.append(sprite["base64"])
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Append Sprite's Name and Description in {elapsed:.2f} seconds")

    clip_embd = OpenCLIPEmbeddings()

    # Walk both reference roots and gather every image file path.
    folder_image_paths = []
    for image_dir in image_dirs:
        for root, _, files in os.walk(image_dir):
            for fname in files:
                if fname.lower().endswith((".png", ".jpg", ".jpeg")):
                    folder_image_paths.append(os.path.join(root, fname))

    # Decode each sprite's base64 payload to a temp PNG so OpenCLIP can embed it.
    temp_dir = tempfile.mkdtemp()
    sprite_image_paths = []
    start_time = time.perf_counter()
    for idx, b64 in enumerate(sprite_base64):
        # Tolerate data-URL prefixes ("data:image/...;base64,") by keeping the
        # part after the last comma.
        image_data = base64.b64decode(b64.split(",")[-1])
        img = Image.open(BytesIO(image_data)).convert("RGB")
        temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
        img.save(temp_path)
        sprite_image_paths.append(temp_path)
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Decoded Sprite Base64 in {elapsed:.2f} seconds")

    sprite_features = clip_embd.embed_image(sprite_image_paths)

    # Precomputed reference embeddings; assumed to be aligned 1:1 with the
    # order of folder_image_paths — TODO confirm the generator keeps them in sync.
    with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
        embedding_json = json.load(f)

    img_matrix = np.array([img["embeddings"] for img in embedding_json])
    sprite_matrix = np.array(sprite_features)

    if sprite_matrix.size == 0 or img_matrix.size == 0:
        raise RuntimeError("❌ No valid embeddings found for sprites or reference images.")

    # Similarity via dot product (embeddings presumably L2-normalized — confirm).
    try:
        similarity = np.matmul(sprite_matrix, img_matrix.T)
    except ValueError as ve:
        if "matmul" in str(ve) and "size" in str(ve):
            logger.error("❌ Matrix multiplication failed due to shape mismatch. Likely due to empty or invalid embeddings.")
            raise RuntimeError("Matrix shape mismatch: CLIP embedding input is invalid or empty.")
        else:
            raise
    # For each sprite, index of its best-matching reference image.
    most_similar_indices = np.argmax(similarity, axis=1)

    project_data = []
    copied_folders = set()  # each matched folder is processed at most once

    # Pass 1: copy sprite assets and collect sprite.json for each match.
    for sprite_idx, matched_idx in enumerate(most_similar_indices):
        matched_image_path = folder_image_paths[matched_idx]
        matched_image_path = os.path.normpath(matched_image_path)

        matched_folder = os.path.dirname(matched_image_path)
        folder_name = os.path.basename(matched_folder)

        if matched_folder in copied_folders:
            continue
        copied_folders.add(matched_folder)
        logger.info(f"Matched image path: {matched_image_path}")

        sprite_json_path = os.path.join(matched_folder, 'sprite.json')
        if not os.path.exists(sprite_json_path):
            logger.warning(f"sprite.json not found in: {matched_folder}")
            continue

        with open(sprite_json_path, 'r') as f:
            sprite_data = json.load(f)
        print(f"SPRITE DATA: \n{sprite_data}")

        # Copy every asset except the matched image itself and sprite.json.
        for fname in os.listdir(matched_folder):
            fpath = os.path.join(matched_folder, fname)
            if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'sprite.json'}:
                shutil.copy2(fpath, os.path.join(project_folder, fname))
                logger.info(f"Copied Sprite asset: {fname}")
        project_data.append(sprite_data)

    backdrop_data = []

    # Pass 2: matches under the backdrop root contribute Stage targets.
    # NOTE(review): unlike pass 1 there is no copied-folder dedupe here, so a
    # backdrop folder matched by several sprites is copied/collected more than
    # once — confirm whether that is intended.
    for backdrop_idx, matched_idx in enumerate(most_similar_indices):
        matched_image_path = os.path.normpath(folder_image_paths[matched_idx])

        if matched_image_path.startswith(os.path.normpath(backdrop_images_path)):
            matched_folder = os.path.dirname(matched_image_path)
            folder_name = os.path.basename(matched_folder)

            logger.info(f"Backdrop matched image: {matched_image_path}")

            # Copy backdrop assets except the matched image and project.json.
            for fname in os.listdir(matched_folder):
                fpath = os.path.join(matched_folder, fname)
                if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'project.json'}:
                    shutil.copy2(fpath, os.path.join(project_folder, fname))
                    logger.info(f"Copied Backdrop asset: {fname}")

            # Harvest Stage targets from the backdrop's own project.json.
            backdrop_json_path = os.path.join(matched_folder, 'project.json')
            if os.path.exists(backdrop_json_path):
                with open(backdrop_json_path, 'r') as f:
                    backdrop_json_data = json.load(f)
                print(f"SPRITE DATA: \n{backdrop_json_data}")
                if "targets" in backdrop_json_data:
                    for target in backdrop_json_data["targets"]:
                        if target.get("isStage") == True:
                            backdrop_data.append(target)
            else:
                logger.warning(f"project.json not found in: {matched_folder}")

    # Scratch 3 project skeleton; targets are filled in below.
    final_project = {
        "targets": [],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent"
        }
    }
    start_time = time.perf_counter()
    # Non-stage sprite targets go in first.
    for sprite in project_data:
        if not sprite.get("isStage", False):
            final_project["targets"].append(sprite)
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Append sprite 'targets' in {elapsed:.2f} seconds")

    # Merge all harvested stage costumes into a single synthetic Stage target;
    # sounds are taken from the first backdrop only.
    if backdrop_data:
        all_costumes, sounds = [], []
        for idx, bd in enumerate(backdrop_data):
            all_costumes.extend(bd.get("costumes", []))
            if idx == 0 and "sounds" in bd:
                sounds = bd["sounds"]
        final_project["targets"].append({
            "isStage": True,
            "name": "Stage",
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 1 if len(all_costumes) > 1 else 0,
            "costumes": all_costumes,
            "sounds": sounds,
            "volume": 100,
            "layerOrder": 0,
            "tempo": 60,
            "videoTransparency": 50,
            "videoState": "on",
            "textToSpeechLanguage": None
        })

    with open(project_json_path, 'w') as f:
        json.dump(final_project, f, indent=2)

    logger.info(f"🎉 Final project saved: {project_json_path}")
    return project_json_path
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Serve the application's landing page."""
    template_name = 'app_index.html'
    return render_template(template_name)
|
|
|
|
|
|
|
|
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """Accept a PDF upload (form-data key 'pdf_file') and save it to a temp dir.

    Returns 400 when the file is missing or unnamed, 500 on unexpected
    failure, otherwise a JSON summary.

    NOTE(review): the extraction pipeline is not wired in yet — the success
    response below still returns placeholder strings ("output_path",
    "result", ...) instead of calling extract_images_from_pdf /
    similarity_matching on the saved file. Confirm and complete.
    """
    try:
        logger.info("Received request to process PDF.")
        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400

        pdf_file = request.files['pdf_file']
        if pdf_file.filename == '':
            return jsonify({"error": "Empty filename"}), 400

        # Sanitize the client-supplied name and stage the upload in a fresh
        # temporary directory.
        filename = secure_filename(pdf_file.filename)
        temp_dir = tempfile.mkdtemp()
        saved_pdf_path = os.path.join(temp_dir, filename)
        pdf_file.save(saved_pdf_path)

        logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")

        # (Removed a duplicated "Received request" log line and a dead
        # `json_path = None` assignment.)

        return jsonify({
            "message": "✅ PDF processed successfully",
            "output_json": "output_path",
            "sprites": "result",
            "project_output_json": "project_output",
            "test_url": r"https://prthm11-scratch-vision-game.hf.space/download_sb3/Event_test"
        })
    except Exception as e:
        logger.exception("❌ Failed to process PDF")
        return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
|
|
|
|
|
|
|
|
@app.route("/download_sb3/<project_id>", methods=["GET"]) |
|
|
def download_sb3(project_id): |
|
|
""" |
|
|
Allows users to download the generated .sb3 Scratch project file. |
|
|
""" |
|
|
sb3_filename = f"{project_id}.sb3" |
|
|
sb3_filepath = os.path.join("game_samples", sb3_filename) |
|
|
|
|
|
try: |
|
|
if os.path.exists(sb3_filepath): |
|
|
logger.info(f"Serving SB3 file for project ID: {project_id}") |
|
|
|
|
|
return send_from_directory( |
|
|
directory="game_samples", |
|
|
path=sb3_filename, |
|
|
as_attachment=True, |
|
|
download_name=sb3_filename |
|
|
) |
|
|
else: |
|
|
logger.warning(f"SB3 file not found for ID: {project_id}") |
|
|
return jsonify({"error": "Scratch project file not found"}), 404 |
|
|
except Exception as e: |
|
|
logger.error(f"Error serving SB3 file for ID {project_id}: {e}") |
|
|
return jsonify({"error": "Failed to retrieve Scratch project file"}), 500 |
|
|
|
|
|
if __name__ == '__main__':
    # Development entry point on all interfaces, port 7860.
    # NOTE(review): debug=True enables the Werkzeug debugger/reloader, which
    # allows arbitrary code execution — must be disabled in production.
    app.run(host='0.0.0.0', port=7860, debug=True)