|
|
from flask import Flask, request, jsonify, render_template, send_from_directory, send_file |
|
|
import cv2, json,base64,io,os,tempfile,logging, re |
|
|
import numpy as np |
|
|
from unstructured.partition.pdf import partition_pdf |
|
|
from PIL import Image, ImageOps, ImageEnhance |
|
|
from dotenv import load_dotenv |
|
|
from werkzeug.utils import secure_filename |
|
|
from langchain_groq import ChatGroq |
|
|
from langgraph.prebuilt import create_react_agent |
|
|
from pdf2image import convert_from_path, convert_from_bytes |
|
|
from typing import Dict, TypedDict, Optional, Any, List, Tuple |
|
|
from collections import defaultdict |
|
|
from langgraph.graph import StateGraph, END |
|
|
import uuid |
|
|
import shutil, time, functools |
|
|
from io import BytesIO |
|
|
from pathlib import Path |
|
|
from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts |
|
|
from difflib import get_close_matches |
|
|
import torch

from transformers import AutoImageProcessor, AutoModel

from imagededup.methods import PHash

from image_match.goldberg import ImageSignature

import sys

import math

import hashlib
|
|
|
|
|
|
|
|
|
|
|
DINOV2_MODEL = "facebook/dinov2-small" |
|
|
|
|
|
|
|
|
MAX_PHASH_BITS = 64 |
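# imagededup's PHash yields a 64-bit hash; Hamming distances are divided by this to get a similarity in [0, 1].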
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("Initializing models and helpers...") |
|
|
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
if DEVICE.type == "cpu": |
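    # Cap torch's intra-op CPU threads when no GPU is available (4 is a tuning choice, not a requirement).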
|
|
torch.set_num_threads(4) |
|
|
|
|
|
dinov2_processor = AutoImageProcessor.from_pretrained(DINOV2_MODEL) |
|
|
dinov2_model = AutoModel.from_pretrained(DINOV2_MODEL) |
|
|
dinov2_model.to(DEVICE) |
|
|
dinov2_model.eval() |
|
|
|
|
|
phash = PHash() |
|
|
gis = ImageSignature() |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
groq_api_key = os.getenv("GROQ_API_KEY") |
|
|
|
|
|
llm = ChatGroq( |
|
|
model="meta-llama/llama-4-scout-17b-16e-instruct", |
|
|
temperature=0, |
|
|
    max_tokens=None,
    api_key=groq_api_key,
)
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
backdrop_images_path = r"app\blocks\Backdrops" |
|
|
sprite_images_path = r"app\blocks\sprites" |
|
|
code_blocks_image_path = r"app\blocks\code_blocks" |
|
|
|
|
|
count = 0 |
|
|
|
|
|
# Base directory for app assets; APP_BASE_DIR can override the container default of /app.
BASE_DIR = Path(os.getenv("APP_BASE_DIR", "/app"))
|
|
LOGS_DIR = Path(os.getenv("LOGS_DIR", "/tmp/logs")).resolve() |
|
|
LOGS_DIR.mkdir(parents=True, exist_ok=True) |
|
|
STATIC_DIR = BASE_DIR / "static" |
|
|
GEN_PROJECT_DIR = BASE_DIR / "generated_projects" |
|
|
|
|
|
BLOCKS_DIR = BASE_DIR / "blocks" |
|
|
BACKDROP_DIR = BLOCKS_DIR / "Backdrops" |
|
|
SPRITE_DIR = BLOCKS_DIR / "sprites" |
|
|
CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks" |
|
|
SOUND_DIR = BLOCKS_DIR / "sound" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
OUTPUT_DIR = BASE_DIR / "outputs" |
|
|
|
|
|
|
|
|
MODEL = None |
|
|
FAISS_INDEX = None |
|
|
IMAGE_PATHS = None |
|
|
|
|
|
|
|
|
for d in ( |
|
|
BLOCKS_DIR, |
|
|
STATIC_DIR, |
|
|
GEN_PROJECT_DIR, |
|
|
BACKDROP_DIR, |
|
|
SPRITE_DIR, |
|
|
CODE_BLOCKS_DIR, |
|
|
SOUND_DIR, |
|
|
OUTPUT_DIR, |
|
|
): |
|
|
d.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
def log_execution_time(func): |
|
|
@functools.wraps(func) |
|
|
def wrapper(*args, **kwargs): |
|
|
start_time = time.time() |
|
|
result = func(*args, **kwargs) |
|
|
end_time = time.time() |
|
|
logger.info(f"⏱ {func.__name__} executed in {end_time - start_time:.2f} seconds") |
|
|
return result |
|
|
return wrapper |
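
# Usage sketch: wrap any pipeline step or route handler to log its wall-clock time, e.g.
#   @log_execution_time
#   def build_project(...): ...
# (logger is defined below; it is only looked up when the wrapped function actually runs.)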
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format="%(asctime)s [%(levelname)s] %(message)s", |
|
|
handlers=[ |
|
|
logging.FileHandler(str(LOGS_DIR / "app.log")), |
|
|
logging.StreamHandler() |
|
|
] |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class GameState(TypedDict): |
|
|
project_json: dict |
|
|
description: str |
|
|
project_id: str |
|
|
project_image: str |
|
|
pseudo_code: dict |
|
|
action_plan: Optional[Dict] |
|
|
temporary_node: Optional[Dict] |
|
|
page_count: int |
|
|
processing: bool |
|
|
    temp_pseudo_code: list
    iteration_count: int
|
|
|
|
|
SYSTEM_PROMPT ="""Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce precisely formatted pseudocode JSON. |
|
|
|
|
|
### Core Role |
|
|
- Treat this as an OCR refinement task: the input may contain typos or spacing issues. |
|
|
- Intelligently correct OCR mistakes to align with valid Scratch 3.0 block syntax. |
|
|
|
|
|
### Universal Rules |
|
|
1. **Code Detection:** If no Scratch blocks are detected, the `pseudocode` value must be "No Code-blocks". |
|
|
2. **Script Ownership:** Determine the target from "Script for:". If it matches a `Stage_costumes` name, set `name_variable` to "Stage". |
|
|
3. **Pseudocode Structure:** |
|
|
- The pseudocode must be a single JSON string with `\n` for newlines. |
|
|
- Indent nested blocks with 4 spaces. |
|
|
- Every script (hat block) and every C-block (if, repeat, forever) MUST have a corresponding `end` at the correct indentation level. |
|
|
4. **Formatting Syntax:** |
|
|
- Numbers & Text: `(5)`, `(hello)` |
|
|
- Variables & Dropdowns: `[score v]`, `[space v]` |
|
|
- Reporters: `((x position))` |
|
|
- Booleans: `<condition>` |
|
|
5. **Final Output:** Your response must ONLY be the valid JSON object and nothing else.""" |
|
|
|
|
|
SYSTEM_PROMPT_JSON_CORRECTOR = """ |
|
|
You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format. |
|
|
|
|
|
REQUIRED OUTPUT FORMAT: |
|
|
{ |
|
|
"refined_logic": { |
|
|
"name_variable": "sprite_name_here", |
|
|
"pseudocode": "pseudocode_string_here" |
|
|
} |
|
|
} |
|
|
|
|
|
RULES: |
|
|
1. Extract the sprite name and pseudocode from the input |
|
|
2. Return ONLY valid JSON in the exact format above |
|
|
3. No explanations, no extra text, no other fields |
|
|
4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode |
|
|
""" |
|
|
|
|
|
|
|
|
agent = create_react_agent( |
|
|
model=llm, |
|
|
tools=[], |
|
|
prompt=SYSTEM_PROMPT |
|
|
) |
|
|
|
|
|
agent_json_resolver = create_react_agent( |
|
|
model=llm, |
|
|
tools=[], |
|
|
prompt=SYSTEM_PROMPT_JSON_CORRECTOR |
|
|
) |
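
# Two single-purpose agents: `agent` turns a code-block image into pseudocode JSON, while
# `agent_json_resolver` is only invoked as a fallback to repair malformed JSON responses.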
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_json_serializable(obj): |
|
|
"""Recursively convert numpy and other objects into JSON-serializable types.""" |
|
|
if obj is None: |
|
|
return None |
|
|
if isinstance(obj, (str, int, float, bool)): |
|
|
return obj |
|
|
if isinstance(obj, np.ndarray): |
|
|
return obj.tolist() |
|
|
if isinstance(obj, dict): |
|
|
return {str(k): make_json_serializable(v) for k, v in obj.items()} |
|
|
if isinstance(obj, (list, tuple)): |
|
|
return [make_json_serializable(v) for v in obj] |
|
|
|
|
|
try: |
|
|
return obj.tolist() |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
return str(obj) |
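
# Example: make_json_serializable({"emb": np.float32(0.5), "ids": np.array([1, 2])})
# returns {"emb": 0.5, "ids": [1, 2]}.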
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pil_to_base64(pil_img, fmt="PNG"): |
|
|
buffer = io.BytesIO() |
|
|
pil_img.save(buffer, format=fmt) |
|
|
return base64.b64encode(buffer.getvalue()).decode("utf-8") |
|
|
|
|
|
def base64_to_pil(b64): |
|
|
try: |
|
|
data = base64.b64decode(b64) |
|
|
return Image.open(io.BytesIO(data)) |
|
|
except Exception as e: |
|
|
print(f"[base64_to_pil] Error: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_image_pil(path): |
|
|
try: |
|
|
return Image.open(path) |
|
|
except Exception as e: |
|
|
print(f"[load_image_pil] Could not open {path}: {e}") |
|
|
return None |
|
|
|
|
|
def add_background(pil_img, bg_color=(255,255,255), size=None): |
|
|
if pil_img is None: |
|
|
return None |
|
|
try: |
|
|
target = size if size is not None else pil_img.size |
|
|
bg = Image.new("RGB", target, bg_color) |
|
|
img_rgba = pil_img.convert("RGBA") |
|
|
if img_rgba.size != target: |
|
|
x = (target[0] - img_rgba.size[0]) // 2 |
|
|
y = (target[1] - img_rgba.size[1]) // 2 |
|
|
else: |
|
|
x, y = 0, 0 |
|
|
mask = img_rgba.split()[3] if img_rgba.mode == "RGBA" else None |
|
|
bg.paste(img_rgba.convert("RGB"), (x,y), mask=mask) |
|
|
return bg |
|
|
except Exception as e: |
|
|
print(f"[add_background] Error: {e}") |
|
|
return None |
|
|
|
|
|
def preprocess_for_hash(pil_img, size=(256,256)): |
|
|
try: |
|
|
img = pil_img.convert("RGB") |
|
|
img = ImageOps.grayscale(img) |
|
|
img = ImageOps.equalize(img) |
|
|
img = img.resize(size) |
|
|
return np.array(img).astype(np.uint8) |
|
|
except Exception as e: |
|
|
print(f"[preprocess_for_hash] Error: {e}") |
|
|
return None |
|
|
|
|
|
def preprocess_for_model(pil_img):
    """Normalize any PIL image mode (RGBA, L, P, ...) to RGB for the DINOv2 processor."""
    try:
        return pil_img.convert("RGB")
    except Exception as e:
        print(f"[preprocess_for_model] Error: {e}")
        return None
|
|
|
|
|
def get_dinov2_embedding_from_pil(pil_img): |
|
|
try: |
|
|
if pil_img is None: |
|
|
return None |
|
|
inputs = dinov2_processor(images=pil_img, return_tensors="pt").to(DEVICE) |
|
|
with torch.no_grad(): |
|
|
outputs = dinov2_model(**inputs) |
|
|
|
|
|
emb = outputs.last_hidden_state[:,0,:].squeeze(0).cpu().numpy() |
|
|
n = np.linalg.norm(emb) |
|
|
if n == 0 or np.isnan(n): |
|
|
return None |
|
|
return (emb / n).astype(float) |
|
|
except Exception as e: |
|
|
print(f"[get_dinov2_embedding_from_pil] Error: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pil_to_bgr_np(pil_img): |
|
|
arr = np.array(pil_img.convert("RGB")) |
|
|
return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
def bgr_np_to_pil(bgr_np): |
|
|
rgb = cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB) |
|
|
return Image.fromarray(rgb) |
|
|
|
|
|
def upscale_image_cv(bgr_np, scale=2): |
|
|
h,w = bgr_np.shape[:2] |
|
|
return cv2.resize(bgr_np, (w*scale, h*scale), interpolation=cv2.INTER_CUBIC) |
|
|
|
|
|
def reduce_noise_cv(bgr_np): |
|
|
return cv2.fastNlMeansDenoisingColored(bgr_np, None, 10,10,7,21) |
|
|
|
|
|
def sharpen_cv(bgr_np): |
|
|
kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]]) |
|
|
return cv2.filter2D(bgr_np, -1, kernel) |
|
|
|
|
|
def enhance_contrast_cv(bgr_np): |
|
|
pil_img = Image.fromarray(cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB)) |
|
|
enhancer = ImageEnhance.Contrast(pil_img) |
|
|
enhanced = enhancer.enhance(1.5) |
|
|
return cv2.cvtColor(np.array(enhanced), cv2.COLOR_RGB2BGR) |
|
|
|
|
|
def process_image_cv2_from_pil(pil_img, scale=2): |
|
|
try: |
|
|
bgr = pil_to_bgr_np(pil_img) |
|
|
bgr = upscale_image_cv(bgr, scale=scale) if scale != 1 else bgr |
|
|
bgr = reduce_noise_cv(bgr) |
|
|
bgr = sharpen_cv(bgr) |
|
|
bgr = enhance_contrast_cv(bgr) |
|
|
return bgr_np_to_pil(bgr) |
|
|
except Exception as e: |
|
|
print(f"[process_image_cv2_from_pil] Error: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def cosine_similarity(a, b): |
|
|
return float(np.dot(a, b)) |
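
# Embeddings from get_dinov2_embedding_from_pil are L2-normalized, so the plain dot product
# above already equals the cosine similarity.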
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_query_search_flow( |
|
|
query_path: Optional[str] = None, |
|
|
query_b64: Optional[str] = None, |
|
|
processed_dir: str = "./processed", |
|
|
embeddings_dict: Dict[str, np.ndarray] = None, |
|
|
hash_dict: Dict[str, Any] = None, |
|
|
signature_obj_map: Dict[str, Any] = None, |
|
|
gis: Any = None, |
|
|
phash: Any = None, |
|
|
MAX_PHASH_BITS: int = 64, |
|
|
k: int = 10, |
|
|
) -> Tuple[ |
|
|
List[Tuple[str, float]], |
|
|
List[Tuple[str, Any, float]], |
|
|
List[Tuple[str, Any, float]], |
|
|
List[Tuple[str, float, float, float, float]], |
|
|
]: |
|
|
""" |
|
|
Run the full query/search flow (base64 -> preprocess -> embed -> scoring). |
|
|
Accepts either query_path (file on disk) OR query_b64 (base64 string). If both are |
|
|
provided, query_b64 takes precedence. |
|
|
|
|
|
Returns: |
|
|
embedding_results_sorted, |
|
|
phash_results_sorted, |
|
|
imgmatch_results_sorted, |
|
|
combined_results_sorted |
|
|
""" |
|
|
|
|
|
|
|
|
if (query_path is None or query_path == "") and (query_b64 is None or query_b64 == ""): |
|
|
raise ValueError("Either query_path or query_b64 must be provided.") |
|
|
|
|
|
|
|
|
os.makedirs(processed_dir, exist_ok=True) |
|
|
|
|
|
print("\n--- Query/Search Phase ---") |
|
|
|
|
|
|
|
|
if query_b64: |
|
|
|
|
|
query_from_b64 = base64_to_pil(query_b64) |
|
|
if query_from_b64 is None: |
|
|
raise RuntimeError("Could not decode provided base64 query. Exiting.") |
|
|
query_pil_orig = query_from_b64 |
|
|
else: |
|
|
|
|
|
if not os.path.exists(query_path): |
|
|
raise FileNotFoundError(f"Query image not found: {query_path}") |
|
|
query_pil_orig = load_image_pil(query_path) |
|
|
if query_pil_orig is None: |
|
|
raise RuntimeError("Could not load query image from path. Exiting.") |
|
|
|
|
|
|
|
|
try: |
|
|
query_b64 = pil_to_base64(query_pil_orig, fmt="PNG") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Could not base64 query from disk image: {e}") |
|
|
|
|
|
query_from_b64 = base64_to_pil(query_b64) |
|
|
if query_from_b64 is None: |
|
|
raise RuntimeError("Could not decode query base64 after roundtrip. Exiting.") |
|
|
|
|
|
|
|
|
|
|
|
enhanced_query_pil = process_image_cv2_from_pil(query_from_b64, scale=2) |
|
|
if enhanced_query_pil is None: |
|
|
print("[Query] OpenCV enhancement failed; falling back to base64-decoded image.") |
|
|
enhanced_query_pil = query_from_b64 |
|
|
|
|
|
|
|
|
query_enhanced_path = os.path.join(processed_dir, "query_enhanced.png") |
|
|
try: |
|
|
enhanced_query_pil.save(query_enhanced_path, format="PNG") |
|
|
except Exception: |
|
|
try: |
|
|
enhanced_query_pil.convert("RGB").save(query_enhanced_path, format="PNG") |
|
|
except Exception: |
|
|
print("[Warning] Could not save enhanced query image for inspection.") |
|
|
|
|
|
|
|
|
prepped = preprocess_for_model(enhanced_query_pil) |
|
|
query_emb = get_dinov2_embedding_from_pil(prepped) |
|
|
if query_emb is None: |
|
|
raise RuntimeError("Could not compute query embedding. Exiting.") |
|
|
|
|
|
|
|
|
query_hash_arr = preprocess_for_hash(enhanced_query_pil) |
|
|
if query_hash_arr is None: |
|
|
raise RuntimeError("Could not compute query phash array. Exiting.") |
|
|
query_phash = phash.encode_image(image_array=query_hash_arr) |
|
|
|
|
|
|
|
|
query_sig = None |
|
|
query_sig_path = os.path.join(processed_dir, "query_for_sig.png") |
|
|
try: |
|
|
enhanced_query_pil.save(query_sig_path, format="PNG") |
|
|
except Exception: |
|
|
try: |
|
|
enhanced_query_pil.convert("RGB").save(query_sig_path, format="PNG") |
|
|
except Exception: |
|
|
query_sig_path = None |
|
|
|
|
|
if query_sig_path: |
|
|
try: |
|
|
query_sig = gis.generate_signature(query_sig_path) |
|
|
except Exception as e: |
|
|
print(f"[ImageSignature] failed for query: {e}") |
|
|
query_sig = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
embeddings_dict = embeddings_dict or {} |
|
|
hash_dict = hash_dict or {} |
|
|
signature_obj_map = signature_obj_map or {} |
|
|
|
|
|
image_paths = list(embeddings_dict.keys()) |
|
|
image_embeddings = np.array(list(embeddings_dict.values()), dtype=float) if embeddings_dict else np.array([]) |
|
|
|
|
|
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: |
|
|
try: |
|
|
return float(np.dot(a, b)) |
|
|
except Exception: |
|
|
return -1.0 |
|
|
|
|
|
|
|
|
embedding_results: List[Tuple[str, float]] = [] |
|
|
phash_results: List[Tuple[str, Any, float]] = [] |
|
|
imgmatch_results: List[Tuple[str, Any, float]] = [] |
|
|
combined_results: List[Tuple[str, float, float, float, float]] = [] |
|
|
|
|
|
|
|
|
for idx, path in enumerate(image_paths): |
|
|
|
|
|
try: |
|
|
stored_emb = image_embeddings[idx] |
|
|
emb_sim = cosine_similarity(query_emb, stored_emb) |
|
|
except Exception: |
|
|
emb_sim = -1.0 |
|
|
embedding_results.append((path, emb_sim)) |
|
|
|
|
|
|
|
|
try: |
|
|
stored_ph = hash_dict.get(path) |
|
|
if stored_ph is not None: |
|
|
hd = phash.hamming_distance(query_phash, stored_ph) |
|
|
ph_sim = max(0.0, 1.0 - (hd / float(MAX_PHASH_BITS))) |
|
|
else: |
|
|
hd = None |
|
|
ph_sim = 0.0 |
|
|
except Exception: |
|
|
hd = None |
|
|
ph_sim = 0.0 |
|
|
phash_results.append((path, hd, ph_sim)) |
|
|
|
|
|
|
|
|
try: |
|
|
stored_sig = signature_obj_map.get(path) |
|
|
if stored_sig is not None and query_sig is not None: |
|
|
dist = gis.normalized_distance(stored_sig, query_sig) |
|
|
im_sim = max(0.0, 1.0 - dist) |
|
|
else: |
|
|
dist = None |
|
|
im_sim = 0.0 |
|
|
except Exception: |
|
|
dist = None |
|
|
im_sim = 0.0 |
|
|
imgmatch_results.append((path, dist, im_sim)) |
|
|
|
|
|
|
|
|
emb_clamped = max(0.0, min(1.0, emb_sim)) |
|
|
combined = (emb_clamped + ph_sim + im_sim) / 3.0 |
|
|
combined_results.append((path, combined, emb_clamped, ph_sim, im_sim)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
embedding_results.sort(key=lambda x: x[1], reverse=True) |
|
|
phash_results_sorted = sorted(phash_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) |
|
|
imgmatch_results_sorted = sorted(imgmatch_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) |
|
|
combined_results.sort(key=lambda x: x[1], reverse=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("\nTop results by DINOv2 Embeddings:") |
|
|
for i, (path, score) in enumerate(embedding_results[:k], start=1): |
|
|
print(f"Rank {i}: {path} | Cosine: {score:.4f}") |
|
|
|
|
|
print("\nTop results by PHash (Hamming distance & normalized sim):") |
|
|
for i, (path, hd, sim) in enumerate(phash_results_sorted[:k], start=1): |
|
|
print(f"Rank {i}: {path} | Hamming: {hd} | NormSim: {sim:.4f}") |
|
|
|
|
|
print("\nTop results by ImageSignature (normalized similarity = 1 - distance):") |
|
|
for i, (path, dist, sim) in enumerate(imgmatch_results_sorted[:k], start=1): |
|
|
print(f"Rank {i}: {path} | NormDist: {dist} | NormSim: {sim:.4f}") |
|
|
|
|
|
print("\nTop results by Combined Score (avg of embedding|phash|image-match):") |
|
|
for i, (path, combined, emb_clamped, ph_sim, im_sim) in enumerate(combined_results[:k], start=1): |
|
|
print(f"Rank {i}: {path} | Combined: {combined:.4f} | emb: {emb_clamped:.4f} | phash_sim: {ph_sim:.4f} | imgmatch_sim: {im_sim:.4f}") |
|
|
|
|
|
print("\nSearch complete.") |
|
|
|
|
|
|
|
|
return embedding_results, phash_results_sorted, imgmatch_results_sorted, combined_results |
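
# Usage sketch (reference-data names are hypothetical):
#   emb_res, ph_res, im_res, comb_res = run_query_search_flow(
#       query_b64=sprite_b64,
#       embeddings_dict=ref_embeddings,    # {path: np.ndarray}, L2-normalized
#       hash_dict=ref_phashes,             # {path: phash hex string}
#       signature_obj_map=ref_signatures,  # {path: ImageSignature signature}
#       gis=gis, phash=phash, k=5,
#   )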
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def choose_top_candidates(embedding_results, phash_results, imgmatch_results, top_k=10, |
|
|
method_weights=(0.5, 0.3, 0.2), verbose=True): |
|
|
""" |
|
|
embedding_results: list of (path, emb_sim) where emb_sim roughly in [-1,1] (we'll clamp to 0..1) |
|
|
phash_results: list of (path, hamming, ph_sim) where ph_sim in [0,1] |
|
|
imgmatch_results: list of (path, dist, im_sim) where im_sim in [0,1] |
|
|
method_weights: weights for (emb, phash, imgmatch) when using weighted average |
|
|
returns dict with top candidates from three methods and diagnostics |
|
|
""" |
|
|
|
|
|
emb_map = {p: float(s) for p, s in embedding_results} |
|
|
ph_map = {p: float(sim) for p, _, sim in phash_results} |
|
|
im_map = {p: float(sim) for p, _, sim in imgmatch_results} |
|
|
|
|
|
|
|
|
all_paths = sorted(set(list(emb_map.keys()) + list(ph_map.keys()) + list(im_map.keys()))) |
|
|
|
|
|
|
|
|
def normalize_map(m): |
|
|
vals = [m.get(p, None) for p in all_paths] |
|
|
|
|
|
present = [v for v in vals if v is not None and not math.isnan(v)] |
|
|
if not present: |
|
|
return {p: 0.0 for p in all_paths} |
|
|
vmin, vmax = min(present), max(present) |
|
|
if vmax == vmin: |
|
|
|
|
|
return {p: (1.0 if (m.get(p, None) is not None) else 0.0) for p in all_paths} |
|
|
norm = {} |
|
|
for p in all_paths: |
|
|
v = m.get(p, None) |
|
|
if v is None or math.isnan(v): |
|
|
norm[p] = 0.0 |
|
|
else: |
|
|
norm[p] = (v - vmin) / (vmax - vmin) |
|
|
|
|
|
if norm[p] < 0: norm[p] = 0.0 |
|
|
if norm[p] > 1: norm[p] = 1.0 |
|
|
return norm |
|
|
|
|
|
|
|
|
emb_map_clamped = {} |
|
|
for p, v in emb_map.items(): |
|
|
|
|
|
emb_map_clamped[p] = max(0.0, v) |
|
|
|
|
|
emb_norm = normalize_map(emb_map_clamped) |
|
|
ph_norm = normalize_map(ph_map) |
|
|
im_norm = normalize_map(im_map) |
|
|
|
|
|
|
|
|
w_emb, w_ph, w_im = method_weights |
|
|
weighted_scores = {} |
|
|
for p in all_paths: |
|
|
weighted_scores[p] = (w_emb * emb_norm.get(p, 0.0) |
|
|
+ w_ph * ph_norm.get(p, 0.0) |
|
|
+ w_im * im_norm.get(p, 0.0)) |
|
|
|
|
|
top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] |
|
|
|
|
|
|
|
|
|
|
|
def ranks_from_map(m_norm): |
|
|
|
|
|
items = sorted(m_norm.items(), key=lambda x: x[1], reverse=True) |
|
|
ranks = {} |
|
|
for i, (p, _) in enumerate(items): |
|
|
ranks[p] = i + 1 |
|
|
|
|
|
worst = len(items) + 1 |
|
|
for p in all_paths: |
|
|
if p not in ranks: |
|
|
ranks[p] = worst |
|
|
return ranks |
|
|
|
|
|
rank_emb = ranks_from_map(emb_norm) |
|
|
rank_ph = ranks_from_map(ph_norm) |
|
|
rank_im = ranks_from_map(im_norm) |
|
|
|
|
|
rank_sum = {} |
|
|
for p in all_paths: |
|
|
rank_sum[p] = rank_emb.get(p, 9999) + rank_ph.get(p, 9999) + rank_im.get(p, 9999) |
|
|
top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k] |
|
|
|
|
|
|
|
|
harm_scores = {} |
|
|
for p in all_paths: |
|
|
a = emb_norm.get(p, 0.0) |
|
|
b = ph_norm.get(p, 0.0) |
|
|
c = im_norm.get(p, 0.0) |
|
|
|
|
|
if a + b + c == 0: |
|
|
harm = 0.0 |
|
|
else: |
|
|
|
|
|
if a == 0 or b == 0 or c == 0: |
|
|
harm = 0.0 |
|
|
else: |
|
|
harm = 3.0 / ((1.0/a) + (1.0/b) + (1.0/c)) |
|
|
harm_scores[p] = harm |
|
|
top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] |
|
|
|
|
|
|
|
|
def topk_set_by_map(m_norm, k=top_k): |
|
|
return set([p for p,_ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]]) |
|
|
cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k) |
|
|
|
|
|
|
|
|
result = { |
|
|
"emb_norm": emb_norm, |
|
|
"ph_norm": ph_norm, |
|
|
"im_norm": im_norm, |
|
|
"weighted_topk": top_weighted, |
|
|
"rank_sum_topk": top_rank_sum, |
|
|
"harmonic_topk": top_harm, |
|
|
"consensus_topk": list(cons_set), |
|
|
"weighted_scores_full": weighted_scores, |
|
|
"rank_sum_full": rank_sum, |
|
|
"harmonic_full": harm_scores |
|
|
} |
|
|
|
|
|
if verbose: |
|
|
print("\nTop by Weighted Normalized Average (weights emb,ph,img = {:.2f},{:.2f},{:.2f}):".format(w_emb, w_ph, w_im)) |
|
|
for i,(p,s) in enumerate(result["weighted_topk"], start=1): |
|
|
print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") |
|
|
|
|
|
print("\nTop by Rank-sum (lower is better):") |
|
|
for i,(p,s) in enumerate(result["rank_sum_topk"], start=1): |
|
|
print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}") |
|
|
|
|
|
print("\nTop by Harmonic mean (requires non-zero on all metrics):") |
|
|
for i,(p,s) in enumerate(result["harmonic_topk"], start=1): |
|
|
print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") |
|
|
|
|
|
print("\nConsensus (in top-{0} of ALL metrics): {1}".format(top_k, result["consensus_topk"])) |
|
|
|
|
|
return result |
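
# The three fusion strategies (weighted average, rank-sum, harmonic mean) are deliberately
# redundant: consensus_topk keeps only candidates that appear in the top-k of every metric,
# which is the most conservative signal when the individual scores disagree.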
|
|
|
|
|
def is_subpath(path: str, base: str) -> bool: |
|
|
"""Return True if path is inside base (works across OSes).""" |
|
|
try: |
|
|
p = os.path.normpath(os.path.abspath(path)) |
|
|
b = os.path.normpath(os.path.abspath(base)) |
|
|
if os.name == "nt": p = p.lower(); b = b.lower() |
|
|
return os.path.commonpath([p, b]) == b |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
|
|
|
def _load_block_catalog(block_type: str) -> Dict: |
|
|
""" |
|
|
    Loads the Scratch block catalog named '{block_type}.json'
|
|
from the <project_root>/blocks/ folder. Returns {} on any error. |
|
|
""" |
|
|
catalog_path = BLOCKS_DIR / f"{block_type}.json" |
|
|
|
|
|
try: |
|
|
text = catalog_path.read_text() |
|
|
catalog = json.loads(text) |
|
|
logger.info(f"Successfully loaded block catalog from {catalog_path}") |
|
|
return catalog |
|
|
except FileNotFoundError: |
|
|
logger.error(f"Error: Block catalog file not found at {catalog_path}") |
|
|
except json.JSONDecodeError as e: |
|
|
logger.error(f"Error decoding JSON from {catalog_path}: {e}") |
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected error loading {catalog_path}: {e}") |
|
|
|
|
|
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: |
|
|
""" |
|
|
Search a single catalog (with keys "description" and "blocks": List[dict]) |
|
|
for a block whose 'op_code' matches the given opcode. |
|
|
Returns the block dict or None if not found. |
|
|
""" |
|
|
    for block in catalog_data.get("blocks", []):
|
|
if block.get("op_code") == opcode: return block |
|
|
return None |
|
|
|
|
|
|
|
|
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None: |
|
|
""" |
|
|
Search across multiple catalogs for a given opcode. |
|
|
Returns the first matching block dict or None. |
|
|
""" |
|
|
for catalog in all_catalogs: |
|
|
blk = get_block_by_opcode(catalog, opcode) |
|
|
if blk is not None: return blk |
|
|
return None |
|
|
|
|
|
def variable_intialization(project_data): |
|
|
""" |
|
|
Updates variable and broadcast definitions in a Scratch project JSON, |
|
|
populating the 'variables' and 'broadcasts' sections of the Stage target |
|
|
and extracting initial values for variables. |
|
|
Args: project_data (dict): The loaded JSON data of the Scratch project. |
|
|
Returns: dict: The updated project JSON data. |
|
|
""" |
|
|
|
|
|
stage_target = None |
|
|
for target in project_data['targets']: |
|
|
if target.get('isStage'): |
|
|
stage_target = target |
|
|
break |
|
|
if stage_target is None: |
|
|
print("Error: Stage target not found in the project data.") |
|
|
return project_data |
|
|
|
|
|
if "variables" not in stage_target: |
|
|
stage_target["variables"] = {} |
|
|
if "broadcasts" not in stage_target: |
|
|
stage_target["broadcasts"] = {} |
|
|
|
|
|
|
|
|
def process_dict(obj): |
|
|
if isinstance(obj, dict): |
|
|
|
|
|
if obj.get("opcode") == "data_setvariableto": |
|
|
variable_field = obj.get("fields", {}).get("VARIABLE") |
|
|
value_input = obj.get("inputs", {}).get("VALUE") |
|
|
|
|
|
if variable_field and isinstance(variable_field, list) and len(variable_field) == 2: |
|
|
var_name = variable_field[0] |
|
|
var_id = variable_field[1] |
|
|
|
|
|
initial_value = "" |
|
|
if value_input and isinstance(value_input, list) and len(value_input) > 1 and \ |
|
|
isinstance(value_input[1], list) and len(value_input[1]) > 1: |
|
|
if value_input[1][0] == 10: |
|
|
initial_value = str(value_input[1][1]) |
|
|
elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10: |
|
|
initial_value = str(value_input[2][1]) |
|
|
elif isinstance(value_input[1], (str, int, float)): |
|
|
initial_value = str(value_input[1]) |
|
|
stage_target["variables"][var_id] = [var_name, initial_value] |
|
|
|
|
|
for key, value in obj.items(): |
|
|
|
|
|
if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \ |
|
|
isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11: |
|
|
broadcast_name = value[1][1] |
|
|
broadcast_id = value[1][2] |
|
|
stage_target["broadcasts"][broadcast_id] = broadcast_name |
|
|
|
|
|
|
|
|
elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2: |
|
|
broadcast_name = value[0] |
|
|
broadcast_id = value[1] |
|
|
stage_target["broadcasts"][broadcast_id] = broadcast_name |
|
|
|
|
|
|
|
|
process_dict(value) |
|
|
|
|
|
elif isinstance(obj, list): |
|
|
for i, item in enumerate(obj): |
|
|
|
|
|
if isinstance(item, list) and len(item) == 3 and item[0] == 12: |
|
|
var_name = item[1] |
|
|
var_id = item[2] |
|
|
if var_id not in stage_target["variables"]: |
|
|
stage_target["variables"][var_id] = [var_name, ""] |
|
|
process_dict(item) |
|
|
|
|
|
|
|
|
for target in project_data['targets']: |
|
|
if "blocks" in target: |
|
|
for block_id, block_data in target["blocks"].items(): |
|
|
process_dict(block_data) |
|
|
|
|
|
return project_data |
|
|
|
|
|
def deduplicate_variables(project_data): |
|
|
""" |
|
|
Removes duplicate variable entries in the 'variables' dictionary of the Stage target, |
|
|
prioritizing entries with non-empty values. |
|
|
Args: project_data (dict): The loaded JSON data of the Scratch project. |
|
|
Returns: dict: The updated project JSON data with deduplicated variables. |
|
|
""" |
|
|
stage_target = None |
|
|
for target in project_data['targets']: |
|
|
if target.get('isStage'): |
|
|
stage_target = target |
|
|
break |
|
|
|
|
|
if stage_target is None: |
|
|
print("Error: Stage target not found in the project data.") |
|
|
return project_data |
|
|
|
|
|
if "variables" not in stage_target: |
|
|
return project_data |
|
|
|
|
|
resolved_variables = {} |
|
|
|
|
|
for var_id, var_info in stage_target["variables"].items(): |
|
|
var_name = var_info[0] |
|
|
var_value = var_info[1] |
|
|
|
|
|
if var_name not in resolved_variables: |
|
|
|
|
|
resolved_variables[var_name] = [var_id, var_name, var_value] |
|
|
else: |
|
|
|
|
|
existing_id, existing_name, existing_value = resolved_variables[var_name] |
|
|
|
|
|
if var_value != "" and existing_value == "": |
|
|
resolved_variables[var_name] = [var_id, var_name, var_value] |
|
|
elif var_value != "" and existing_value != "": |
|
|
resolved_variables[var_name] = [var_id, var_name, var_value] |
|
|
elif var_value == "" and existing_value == "": |
|
|
|
|
|
resolved_variables[var_name] = [var_id, var_name, var_value] |
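            # Net effect: the most recently seen entry wins, except that an empty value
            # never replaces a non-empty one.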
|
|
|
|
|
|
|
|
new_variables_dict = {} |
|
|
for var_name, var_data in resolved_variables.items(): |
|
|
var_id_to_keep = var_data[0] |
|
|
var_name_to_keep = var_data[1] |
|
|
var_value_to_keep = var_data[2] |
|
|
new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep] |
|
|
stage_target["variables"] = new_variables_dict |
|
|
return project_data |
|
|
|
|
|
def variable_adder_main(project_data):
    try:
        declare_variable_json = variable_intialization(project_data)
        print("declare_variable_json------->", declare_variable_json)
    except Exception as e:
        print(f"Error in the variable initialization step: {e}")
        declare_variable_json = project_data

    try:
        processed_json = deduplicate_variables(declare_variable_json)
        print("processed_json------->", processed_json)
        return processed_json
    except Exception as e:
        print(f"Error in the variable deduplication step: {e}")
        return declare_variable_json
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_json_from_llm_response(raw_response: str) -> dict: |
|
|
""" |
|
|
Finds and parses the first valid JSON object from a raw LLM response string. |
|
|
""" |
|
|
logger.debug("Attempting to extract JSON from raw LLM response...") |
|
|
|
|
|
|
|
|
match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response) |
|
|
if match: |
|
|
json_string = match.group(1) |
|
|
logger.debug("Found JSON inside a markdown block.") |
|
|
try: |
|
|
return json.loads(json_string) |
|
|
except json.JSONDecodeError as e: |
|
|
logger.warning(f"Failed to parse JSON from markdown block: {e}") |
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Markdown block not found or failed. Searching for outermost braces.") |
|
|
try: |
|
|
first_brace = raw_response.find('{') |
|
|
last_brace = raw_response.rfind('}') |
|
|
if first_brace != -1 and last_brace != -1 and first_brace < last_brace: |
|
|
json_string = raw_response[first_brace : last_brace + 1] |
|
|
return json.loads(json_string) |
|
|
else: |
|
|
logger.error("Could not find a valid JSON structure (outermost braces).") |
|
|
raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0) |
|
|
except json.JSONDecodeError as e: |
|
|
logger.error(f"Final JSON parsing attempt failed: {e}") |
|
|
|
|
|
raise |
|
|
|
|
|
def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str: |
|
|
""" |
|
|
Input: clean_b64_str = BASE64 STRING (no data: prefix) |
|
|
Output: BASE64 STRING (no data: prefix), sized as close as possible to max_kb KB. |
|
|
Guarantees: returns a valid base64 string (never None). May still be larger than max_kb |
|
|
if saving at lowest quality cannot get under the limit. |
|
|
""" |
|
|
|
|
|
clean = re.sub(r"\s+", "", clean_b64_str).strip() |
|
|
|
|
|
missing = len(clean) % 4 |
|
|
if missing: |
|
|
clean += "=" * (4 - missing) |
|
|
|
|
|
try: |
|
|
image_data = base64.b64decode(clean) |
|
|
except Exception as e: |
|
|
raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e |
|
|
|
|
|
try: |
|
|
img = Image.open(io.BytesIO(image_data)) |
|
|
img.load() |
|
|
except Exception as e: |
|
|
raise ValueError("Could not open image from base64") from e |
|
|
|
|
|
|
|
|
if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info): |
|
|
background = Image.new("RGB", img.size, (255, 255, 255)) |
|
|
background.paste(img, mask=img.split()[-1] if img.mode != "RGB" else None) |
|
|
img = background |
|
|
elif img.mode != "RGB": |
|
|
img = img.convert("RGB") |
|
|
|
|
|
low, high = 20, 95 |
|
|
best_bytes = None |
|
|
|
|
|
while low <= high: |
|
|
mid = (low + high) // 2 |
|
|
buf = io.BytesIO() |
|
|
try: |
|
|
img.save(buf, format="JPEG", quality=mid, optimize=True) |
|
|
except OSError: |
|
|
|
|
|
buf = io.BytesIO() |
|
|
img.save(buf, format="JPEG", quality=mid) |
|
|
size_kb = len(buf.getvalue()) / 1024.0 |
|
|
if size_kb <= max_kb: |
|
|
best_bytes = buf.getvalue() |
|
|
low = mid + 1 |
|
|
else: |
|
|
high = mid - 1 |
|
|
|
|
|
|
|
|
if best_bytes is None: |
|
|
buf = io.BytesIO() |
|
|
try: |
|
|
img.save(buf, format="JPEG", quality=20, optimize=True) |
|
|
except OSError: |
|
|
buf = io.BytesIO() |
|
|
img.save(buf, format="JPEG", quality=20) |
|
|
best_bytes = buf.getvalue() |
|
|
|
|
|
return base64.b64encode(best_bytes).decode("utf-8") |
|
|
|
|
|
|
|
|
def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str: |
|
|
""" |
|
|
Accepts: raw_b64 can be: |
|
|
- a data URI 'data:image/png;base64,...' |
|
|
- a plain base64 string |
|
|
- a PIL Image |
|
|
- a list containing the above (take first) |
|
|
Returns: a data URI string 'data:<mime>;base64,<base64>' guaranteed to be syntactically valid. |
|
|
""" |
|
|
|
|
|
if not raw_b64: |
|
|
return "" |
|
|
|
|
|
if isinstance(raw_b64, list): |
|
|
raw_b64 = raw_b64[0] if raw_b64 else "" |
|
|
if not raw_b64: |
|
|
return "" |
|
|
|
|
|
if isinstance(raw_b64, Image.Image): |
|
|
buf = io.BytesIO() |
|
|
|
|
|
img = raw_b64.convert("RGB") |
|
|
img.save(buf, format="JPEG") |
|
|
clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8") |
|
|
mime = "image/jpeg" |
|
|
return f"data:{mime};base64,{clean_b64}" |
|
|
|
|
|
if not isinstance(raw_b64, str): |
|
|
raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}") |
|
|
|
|
|
|
|
|
m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL) |
|
|
if m: |
|
|
mime = m.group(1) |
|
|
clean_b64 = m.group(2) |
|
|
else: |
|
|
|
|
|
mime = "image/png" |
|
|
clean_b64 = raw_b64 |
|
|
|
|
|
|
|
|
clean_b64 = re.sub(r"\s+", "", clean_b64).strip() |
|
|
missing = len(clean_b64) % 4 |
|
|
if missing: |
|
|
clean_b64 += "=" * (4 - missing) |
|
|
|
|
|
original_size_bytes = len(clean_b64.encode("utf-8")) |
|
|
|
|
|
print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}") |
|
|
|
|
|
if original_size_bytes > max_bytes_threshold: |
|
|
|
|
|
reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=4000) |
|
|
|
|
|
print(f"Reduced base64 size (bytes): {original_size_bytes}, mime: {mime}") |
|
|
return f"data:image/jpeg;base64,{reduced_clean}" |
|
|
|
|
|
|
|
|
return f"data:{mime};base64,{clean_b64}" |
|
|
|
|
|
SCRATCH_OPCODES = [ |
|
|
'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto', |
|
|
'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection', |
|
|
'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby', |
|
|
'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs', |
|
|
'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto', |
|
|
'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait', |
|
|
'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby', |
|
|
'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide', |
|
|
'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone', |
|
|
'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto', |
|
|
'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until', |
|
|
'control_stop', 'control_create_clone_of', 'control_delete_this_clone', |
|
|
'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist', |
|
|
'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable', |
|
|
'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer', |
|
|
'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals', |
|
|
'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains', |
|
|
'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor', |
|
|
'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat', |
|
|
'control_forever', 'control_if', 'control_if_else', 'control_repeat_until', |
|
|
'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername', |
|
|
'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto', |
|
|
'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness', |
|
|
'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000', |
|
|
'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply', |
|
|
'operator_divide', 'operator_random', 'operator_join', 'operator_letterof', |
|
|
'operator_length', 'operator_mod', 'operator_round', 'operator_mathop', |
|
|
'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist', |
|
|
'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed', |
|
|
'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan', |
|
|
'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition' |
|
|
] |
|
|
|
|
|
def validate_and_fix_opcodes(opcode_counts): |
|
|
""" |
|
|
Ensures all opcodes are valid. If an opcode is invalid, replace with closest match. |
|
|
""" |
|
|
corrected_list = [] |
|
|
for item in opcode_counts: |
|
|
opcode = item.get("opcode") |
|
|
count = item.get("count", 1) |
|
|
|
|
|
if opcode not in SCRATCH_OPCODES: |
|
|
|
|
|
match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6) |
|
|
if match: |
|
|
print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'") |
|
|
opcode = match[0] |
|
|
else: |
|
|
print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.") |
|
|
continue |
|
|
|
|
|
corrected_list.append({"opcode": opcode, "count": count}) |
|
|
|
|
|
|
|
|
merged = {} |
|
|
for item in corrected_list: |
|
|
merged[item["opcode"]] = merged.get(item["opcode"], 0) + item["count"] |
|
|
|
|
|
return [{"opcode": k, "count": v} for k, v in merged.items()] |
|
|
|
|
|
def format_scratch_pseudo_code(code_string): |
|
|
""" |
|
|
Parses and formats Scratch pseudo-code with correct indentation, |
|
|
specifically handling if/else/end structures correctly. |
|
|
|
|
|
Args: |
|
|
code_string (str): A string containing Scratch pseudo-code with |
|
|
potentially inconsistent indentation. |
|
|
|
|
|
Returns: |
|
|
str: The correctly formatted and indented pseudo-code string. |
|
|
""" |
|
|
lines = code_string.strip().split('\n') |
|
|
formatted_lines = [] |
|
|
indent_level = 0 |
|
|
|
|
|
|
|
|
indent_keywords = ['when', 'forever', 'if', 'repeat', 'else'] |
|
|
|
|
|
|
|
|
unindent_keywords = ['end', 'else'] |
|
|
|
|
|
for line in lines: |
|
|
stripped_line = line.strip() |
|
|
if not stripped_line: |
|
|
continue |
|
|
|
|
|
|
|
|
if any(keyword in stripped_line for keyword in unindent_keywords): |
|
|
|
|
|
if 'else' in stripped_line: |
|
|
|
|
|
indentation = ' ' * (indent_level -1) |
|
|
formatted_lines.append(indentation + stripped_line) |
|
|
continue |
|
|
|
|
|
|
|
|
indent_level = max(0, indent_level - 1) |
|
|
|
|
|
indentation = ' ' * indent_level |
|
|
formatted_lines.append(indentation + stripped_line) |
|
|
|
|
|
|
|
|
if any(keyword in stripped_line for keyword in indent_keywords): |
|
|
|
|
|
if 'else' not in stripped_line: |
|
|
indent_level += 1 |
|
|
|
|
|
return '\n'.join(formatted_lines) |
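
# Note: keyword detection above is substring-based, so a stack block such as "if on edge, bounce"
# is also treated as an indenting "if"; treat the result as best-effort indentation, not a strict parse.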
|
|
|
|
|
|
|
|
def pseudo_generator_node(state: GameState): |
|
|
logger.info("--- Running plan_logic_aligner_node ---") |
|
|
image = state.get("project_image", "") |
|
|
project_json = state["project_json"] |
|
|
cnt =state["page_count"] |
|
|
print(f"The page number recived at the pseudo_generator node:-----> {cnt}") |
|
|
|
|
|
|
|
|
target_names = [t["name"] for t in project_json["targets"]] |
|
|
stage_names = [t["name"] for t in project_json["targets"] if t.get("isStage")] |
|
|
sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")] |
|
|
|
|
|
stage_costumes = [ |
|
|
c["name"] |
|
|
for t in project_json["targets"] if t.get("isStage") |
|
|
for c in t.get("costumes", []) |
|
|
] |
|
|
refinement_prompt = f""" |
|
|
You are an expert Scratch 3.0 programmer. Your task is to analyze an image of Scratch code blocks and convert it into a structured JSON object containing precise pseudocode. |
|
|
|
|
|
--- |
|
|
## CONTEXT |
|
|
- **Available Sprites:** {', '.join(sprite_names)} |
|
|
- **Available Stage Costumes:** {', '.join(stage_costumes)} |
|
|
|
|
|
--- |
|
|
## INSTRUCTIONS |
|
|
1. **Identify the Target:** Find the text "Script for:" in the image to determine the target sprite or stage. |
|
|
2. **Apply Stage Rule:** If the identified target name exactly matches any name in the `Available Stage Costumes` list, you MUST set the output `name_variable` to `"Stage"`. Otherwise, use the identified target name. |
|
|
3. **Handle No Code:** If no Scratch blocks are visible in the image, return the specified "No Code-blocks" JSON format. |
|
|
4. **Generate Pseudocode:** If blocks are present, convert them to pseudocode according to the rules below. |
|
|
5. **Output ONLY JSON:** Your entire response must be a single, valid JSON object inside a ```json code block and nothing else. |
|
|
|
|
|
--- |
|
|
## PSEUDOCODE FORMATTING RULES |
|
|
- **Numbers & Text:** Enclose in parentheses. `(10)`, `(-50)`, `(hello)`. |
|
|
- **Variables & Dropdowns:** Enclose in square brackets with ` v`. `[score v]`, `[space v]`. |
|
|
- **Reporter Blocks:** Enclose in double parentheses. `((x position))`. |
|
|
- **Boolean Conditions:** Enclose in angle brackets. `<((score)) > (50)>`, `<not <touching [edge v]?>>`. |
|
|
- **Specific Block Exceptions:** Self-contained blocks like `if on edge, bounce`, `next costume`, and `hide` should be written as-is, without any parentheses or brackets. |
|
|
- **Line Breaks:** Use `\n` to separate each block onto a new line. The entire pseudocode must be a single JSON string. |
|
|
- **Indentation:** Use **4 spaces** to indent blocks nested inside C-Blocks (like `if`, `if else`, `repeat`, `forever`). |
|
|
- **Termination:** |
|
|
- **Every script** (starting with a hat block) MUST conclude with `end`. |
|
|
- **Every C-Block** (`if`, `repeat`, `forever`) MUST also have its own corresponding `end` at the correct indentation level. This is critical. |
|
|
|
|
|
--- |
|
|
## REQUIRED JSON FORMAT |
|
|
If code blocks are found: |
|
|
```json |
|
|
{{ |
|
|
"refined_logic": {{ |
|
|
"name_variable": "Name_Identified_From_Instructions", |
|
|
"pseudocode": "Your fully formatted pseudocode as a single string with \\n newlines." |
|
|
}} |
|
|
}} |
|
|
```
|
|
|
|
|
If no code blocks are found: |
|
|
|
|
|
```json |
|
|
{{ |
|
|
"refined_logic": {{ |
|
|
"name_variable": "Name_Identified_From_Instructions", |
|
|
"pseudocode": "No Code-blocks" |
|
|
}} |
|
|
}} |
|
|
``` |
|
|
|
|
|
----- |
|
|
|
|
|
## EXAMPLES |
|
|
|
|
|
**Example 1: Looping and Conditionals** |
|
|
|
|
|
``` |
|
|
when green flag clicked |
|
|
go to x: (240) y: (-100) |
|
|
set [speed v] to (-5) |
|
|
forever |
|
|
change x by ([speed v]) |
|
|
if <((x position)) < (-240)> then |
|
|
go to x: (240) y: (-100) |
|
|
end |
|
|
end |
|
|
end |
|
|
``` |
|
|
|
|
|
**Example 2: Events and Broadcasting** |
|
|
|
|
|
``` |
|
|
when I receive [Game Over v] |
|
|
if <((score)) > (([High Score v]))> then |
|
|
set [High Score v] to ([score v]) |
|
|
end |
|
|
switch backdrop to [Game Over v] |
|
|
end |
|
|
``` |
|
|
""" |
|
|
image_input = { |
|
|
"type": "image_url", |
|
|
"image_url": { |
|
|
|
|
|
"url": clean_base64_for_model(image[cnt]) |
|
|
} |
|
|
} |
|
|
|
|
|
content = [ |
|
|
{"type": "text", "text": refinement_prompt}, |
|
|
image_input |
|
|
] |
|
|
|
|
|
try: |
|
|
|
|
|
response = agent.invoke({"messages": [{"role": "user", "content": content}]}) |
|
|
llm_output_raw = response["messages"][-1].content.strip() |
|
|
print(f"llm_output_raw: {response}") |
|
|
parsed_llm_output = extract_json_from_llm_response(llm_output_raw) |
|
|
result = parsed_llm_output |
|
|
print(f"result:\n\n {result}") |
|
|
|
|
|
except json.JSONDecodeError as error_json: |
|
|
correction_prompt = f""" |
|
|
Fix this malformed response and return only the corrected JSON: |
|
|
|
|
|
Input: {llm_output_raw if 'llm_output_raw' in locals() else 'No response available'} |
|
|
|
|
|
Extract the sprite name and pseudocode, then return in this exact format: |
|
|
{{ |
|
|
"refined_logic": {{ |
|
|
"name_variable": "sprite_name", |
|
|
"pseudocode": "pseudocode_here" |
|
|
}} |
|
|
}} |
|
|
""" |
|
|
try: |
|
|
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) |
|
|
corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content) |
|
|
result = corrected_output |
|
|
print(f"result:\n\n {result}") |
|
|
        except Exception as e_corr:
            logger.error(f"Failed to correct JSON output even after retry: {e_corr}")
            # Fall back to a minimal, well-formed result so the rest of the pipeline does not crash.
            result = {"refined_logic": {"name_variable": "Unknown", "pseudocode": "No pseudocode found"}}
|
|
|
|
|
|
|
|
|
|
|
state["pseudo_code"] = result |
|
|
state["temp_pseudo_code"] += [result] |
|
|
Data = state["temp_pseudo_code"] |
|
|
print(f"[OVREALL REFINED PSEUDO CODE LOGIC]: {result}") |
|
|
print(f"[OVREALL LISTS OF LOGICS]: {Data}") |
|
|
logger.info("Plan refinement and block relation analysis completed for all plans.") |
|
|
return state |
|
|
|
|
|
|
|
|
def node_optimizer(state: GameState): |
|
|
logger.info("--- Running Node Optimizer Node ---") |
|
|
project_json = state["project_json"] |
|
|
raw = state.get("pseudo_code", {}) |
|
|
refined_logic_data = raw.get("refined_logic", {}) |
|
|
sprite_name = refined_logic_data.get("name_variable", "<unknown>") |
|
|
pseudo = refined_logic_data.get("pseudocode", "") |
|
|
    # Re-purposed as a name -> name map over all targets; the scalar value read above is not used further.
    sprite_name = {}
|
|
project_json_targets = state.get("project_json", {}).get("targets", []) |
|
|
for target in project_json_targets: |
|
|
sprite_name[target["name"]] = target["name"] |
|
|
action_flow = state.get("action_plan", {}) |
|
|
|
|
|
try: |
|
|
refined_logic_data["pseudocode"] = separate_scripts(str(pseudo)) |
|
|
|
|
|
state["pseudo_code"]["refined_logic"] = refined_logic_data |
|
|
print(f"[The pseudo_code generated here]: { state['pseudo_code']}") |
|
|
state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"]) |
|
|
print(f"[The action plan generated here]: { state['action_plan']}") |
|
|
|
|
|
action_flow = state.get("action_plan", {}) |
|
|
if action_flow.get("action_overall_flow", {}) == {}: |
|
|
plan_data = action_flow.items() |
|
|
else: |
|
|
plan_data = action_flow.get("action_overall_flow", {}).items() |
|
|
|
|
|
refined_flow: Dict[str, Any] = {} |
|
|
for sprite, sprite_data in plan_data: |
|
|
refined_plans = [] |
|
|
for plan in sprite_data.get("plans", []): |
|
|
logic = plan.get("logic", "") |
|
|
plan["opcode_counts"]= analyze_opcode_counts(str(logic)) |
|
|
refined_plans.append(plan) |
|
|
refined_flow[sprite] = { |
|
|
"description": sprite_data.get("description", ""), |
|
|
"plans": refined_plans |
|
|
} |
|
|
if refined_flow: |
|
|
state["action_plan"] = refined_flow |
|
|
logger.info("Node Optimization completed.") |
|
|
|
|
|
return state |
|
|
    except Exception as e:
        logger.error(f"Error in Node Optimizer Node: {e}")
        return state
|
|
|
|
|
|
|
|
def overall_block_builder_node_2(state: GameState): |
|
|
logger.info("--- Running OverallBlockBuilderNode ---") |
|
|
print("--- Running OverallBlockBuilderNode ---") |
|
|
project_json = state["project_json"] |
|
|
targets = project_json["targets"] |
|
|
|
|
|
sprite_map = {target["name"]: target for target in targets if not target["isStage"]} |
|
|
stage_target = next((target for target in targets if target["isStage"]), None) |
|
|
if stage_target: |
|
|
sprite_map[stage_target["name"]] = stage_target |
|
|
|
|
|
action_plan = state.get("action_plan", {}) |
|
|
print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) |
|
|
if not action_plan: |
|
|
logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") |
|
|
return state |
|
|
|
|
|
|
|
|
script_y_offset = {} |
|
|
script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} |
|
|
|
|
|
|
|
|
if action_plan.get("action_overall_flow", {}) == {}: |
|
|
plan_data = action_plan.items() |
|
|
else: |
|
|
plan_data = action_plan.get("action_overall_flow", {}).items() |
|
|
|
|
|
|
|
|
all_sprite_names = list(sprite_map.keys()) |
|
|
all_variable_names = {} |
|
|
all_list_names = {} |
|
|
all_broadcast_messages = {} |
|
|
|
|
|
for target in targets: |
|
|
for var_id, var_info in target.get("variables", {}).items(): |
|
|
all_variable_names[var_info[0]] = var_id |
|
|
for list_id, list_info in target.get("lists", {}).items(): |
|
|
all_list_names[list_info[0]] = list_id |
|
|
for broadcast_id, broadcast_name in target.get("broadcasts", {}).items(): |
|
|
all_broadcast_messages[broadcast_name] = broadcast_id |
|
|
|
|
|
|
|
|
for sprite_name, sprite_actions_data in plan_data: |
|
|
if sprite_name in sprite_map: |
|
|
current_sprite_target = sprite_map[sprite_name] |
|
|
if "blocks" not in current_sprite_target: |
|
|
current_sprite_target["blocks"] = {} |
|
|
|
|
|
if sprite_name not in script_y_offset: |
|
|
script_y_offset[sprite_name] = 0 |
|
|
|
|
|
for plan_entry in sprite_actions_data.get("plans", []): |
|
|
logic_sequence = str(plan_entry["logic"]) |
|
|
opcode_counts = plan_entry.get("opcode_counts", {}) |
|
|
refined_indent_logic = format_scratch_pseudo_code(logic_sequence) |
|
|
print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n") |
|
|
try: |
|
|
generated_blocks = block_builder(opcode_counts, refined_indent_logic) |
|
|
|
|
|
|
|
|
if not isinstance(generated_blocks, dict): |
|
|
logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.") |
|
|
continue |
|
|
|
|
|
if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): |
|
|
logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") |
|
|
generated_blocks = generated_blocks["blocks"] |
|
|
|
|
|
|
|
|
for block_id, block_data in generated_blocks.items(): |
|
|
if block_data.get("topLevel"): |
|
|
block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) |
|
|
block_data["y"] = script_y_offset[sprite_name] |
|
|
script_y_offset[sprite_name] += 150 |
|
|
|
|
|
current_sprite_target["blocks"].update(generated_blocks) |
|
|
print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}") |
|
|
state["iteration_count"] = 0 |
|
|
logger.info(f"Action blocks added for sprite '{sprite_name}' by OverallBlockBuilderNode.") |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") |
|
|
state["project_json"] = project_json |
|
|
return state |
|
|
|
|
|
|
|
|
def variable_adder_node(state: GameState): |
|
|
logger.info("--- Running Variable Adder Node ---") |
|
|
project_json = state["project_json"] |
|
|
try: |
|
|
updated_project_json = variable_adder_main(project_json) |
|
|
if updated_project_json is not None: |
|
|
print("Variable added inside the project successfully!") |
|
|
state["project_json"]=updated_project_json |
|
|
else: |
|
|
print("Variable adder unable to add any variable inside the project!") |
|
|
state["project_json"]=project_json |
|
|
state["page_count"] +=1 |
|
|
return state |
|
|
except Exception as e: |
|
|
logger.error(f"Error in variable adder node while updating project_json': {e}") |
|
|
raise |
|
|
|
|
|
|
|
|
def layer_order_correction(state: GameState): |
|
|
""" |
|
|
Ensures that all sprites (isStage: false) have unique layerOrder values >= 1. |
|
|
If duplicates are found, they are reassigned sequentially. |
|
|
""" |
|
|
logger.info("--- Running Layer Order Correction Node ---") |
|
|
try: |
|
|
project_json = state.get("project_json", {}) |
|
|
targets = project_json.get("targets", []) |
|
|
|
|
|
|
|
|
sprites = [t for t in targets if not t.get("isStage", False)] |
|
|
|
|
|
|
|
|
for idx, sprite in enumerate(sprites, start=1): |
|
|
old_lo = sprite.get("layerOrder", None) |
|
|
sprite["layerOrder"] = idx |
|
|
logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {old_lo} -> {idx}") |
|
|
|
|
|
|
|
|
for target in targets: |
|
|
if target.get("isStage", False): |
|
|
target["layerOrder"] = 0 |
|
|
|
|
|
|
|
|
state["project_json"]["targets"] = targets |
|
|
logger.info("Layer Order Correction completed successfully.") |
|
|
|
|
|
return state |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in Layer Order Correction Node: {e}") |
|
|
return state |
|
|
|
|
|
|
|
|
def processed_page_node(state: GameState): |
|
|
logger.info("--- Processing the Pages Node ---") |
|
|
image = state.get("project_image", "") |
|
|
cnt =state["page_count"] |
|
|
print(f"The page processed for page:--------------> {cnt}") |
|
|
if cnt<len(image): |
|
|
state["processing"]= True |
|
|
else: |
|
|
state["processing"]= False |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_images_from_pdf(pdf_stream, output_dir): |
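    # NOTE: output_dir is currently unused; unstructured writes the extracted images to BLOCKS_DIR below.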
|
|
manipulated_json = {} |
|
|
try: |
|
|
pdf_id = uuid.uuid4().hex |
|
|
elements = partition_pdf( |
|
|
file=pdf_stream, |
|
|
strategy="hi_res", |
|
|
extract_image_block_types=["Image"], |
|
|
hi_res_model_name="yolox", |
|
|
extract_image_block_to_payload=False, |
|
|
extract_image_block_output_dir=BLOCKS_DIR, |
|
|
) |
|
|
file_elements = [element.to_dict() for element in elements] |
|
|
sprite_count = 1 |
|
|
for el in file_elements: |
|
|
img_path = el["metadata"].get("image_path") |
|
|
|
|
|
|
|
|
if not img_path: |
|
|
continue |
|
|
|
|
|
with open(img_path, "rb") as f: |
|
|
base_file = base64.b64encode(f.read()).decode("utf-8") |
|
|
|
|
|
image_uuid = str(uuid.uuid4()) |
|
|
manipulated_json[f"Sprite {sprite_count}"] = { |
|
|
"base64": base_file, |
|
|
"file-path": img_path, |
|
|
"pdf-id": pdf_id, |
|
|
"image-uuid": image_uuid, |
|
|
} |
|
|
|
|
|
sprite_count += 1 |
|
|
|
|
|
return manipulated_json |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
def similarity_matching(sprites_data: dict, project_folder: str, top_k: int = 1, min_similarity: float = None) -> str: |
|
|
print("🔍 Running similarity matching…") |
|
|
    # All modules needed here (os, json, numpy, torch, PIL, imagededup, transformers,
    # image_match, io, base64, pathlib, cv2, sys, math, hashlib, typing) are already
    # imported at module level, so no local re-imports are required.
|
|
os.makedirs(project_folder, exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backdrop_base_path = os.path.normpath(str(BACKDROP_DIR)) |
|
|
sprite_base_path = os.path.normpath(str(SPRITE_DIR)) |
|
|
code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR)) |
|
|
|
|
|
|
|
|
project_json_path = os.path.join(project_folder, "project.json") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sprite_ids, sprite_base64 = [], [] |
|
|
for sid, sprite in sprites_data.items(): |
|
|
sprite_ids.append(sid) |
|
|
sprite_base64.append(sprite["base64"]) |
|
|
|
|
|
sprite_images_bytes = [] |
|
|
sprite_b64_clean = [] |
|
|
for b64 in sprite_base64: |
|
|
|
|
|
raw_b64 = b64.split(",")[-1] |
|
|
sprite_b64_clean.append(raw_b64) |
|
|
|
|
|
|
|
|
img = Image.open(BytesIO(base64.b64decode(raw_b64))).convert("RGB") |
|
|
buffer = BytesIO() |
|
|
img.save(buffer, format="PNG") |
|
|
buffer.seek(0) |
|
|
sprite_images_bytes.append(buffer) |
|
|
|
|
|
    def hybrid_similarity_matching(sprite_images_bytes, sprite_ids, min_similarity=None, top_k=5, method_weights=(0.5, 0.3, 0.2)):
        # Precomputed reference data for every image under BLOCKS_DIR.
        embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json")
|
|
hash_path = os.path.join(BLOCKS_DIR, "phash_data.json") |
|
|
signature_path = os.path.join(BLOCKS_DIR, "signature_data.json") |
|
|
|
|
|
|
|
|
embedding_json = {} |
|
|
if os.path.exists(embeddings_path): |
|
|
with open(embeddings_path, "r", encoding="utf-8") as f: |
|
|
embedding_json = json.load(f) |
|
|
|
|
|
|
|
|
        # Load the precomputed perceptual-hash table; keep it empty if the file is
        # missing or unreadable so later lookups simply find no hash candidates.
        hash_data = {}
        hash_dict = {}
        if os.path.exists(hash_path):
            try:
                with open(hash_path, "r", encoding="utf-8") as f:
                    hash_data = json.load(f)
                hash_dict = dict(hash_data)
            except Exception:
                logger.warning("Could not load phash_data.json; continuing without hash data.")
|
|
|
|
|
|
|
|
signature_dict = {} |
|
|
sig_data = {} |
|
|
if os.path.exists(signature_path): |
|
|
try: |
|
|
with open(signature_path, "r", encoding="utf-8") as f: |
|
|
sig_data = json.load(f) |
|
|
for path, sig_list in sig_data.items(): |
|
|
try: |
|
|
signature_dict[path] = np.array(sig_list) |
|
|
except Exception: |
|
|
pass |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
paths_list = [] |
|
|
embeddings_list = [] |
|
|
if isinstance(embedding_json, dict): |
|
|
for p, emb in embedding_json.items(): |
|
|
if isinstance(emb, dict): |
|
|
maybe_emb = emb.get("embedding") or emb.get("embeddings") or emb.get("emb") |
|
|
if maybe_emb is None: |
|
|
continue |
|
|
arr = np.asarray(maybe_emb, dtype=np.float32) |
|
|
elif isinstance(emb, list): |
|
|
arr = np.asarray(emb, dtype=np.float32) |
|
|
else: |
|
|
continue |
|
|
paths_list.append(os.path.normpath(str(p))) |
|
|
embeddings_list.append(arr) |
|
|
elif isinstance(embedding_json, list): |
|
|
for item in embedding_json: |
|
|
if not isinstance(item, dict): |
|
|
continue |
|
|
p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path") |
|
|
emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb") |
|
|
if p is None or emb is None: |
|
|
continue |
|
|
paths_list.append(os.path.normpath(str(p))) |
|
|
embeddings_list.append(np.asarray(emb, dtype=np.float32)) |
|
|
|
|
|
if len(paths_list) == 0: |
|
|
print("⚠ No reference images/embeddings found (this test harness may be running without data)") |
|
|
|
|
|
return [[] for _ in sprite_images_bytes], [[] for _ in sprite_images_bytes], [] |
|
|
|
|
|
ref_matrix = np.vstack(embeddings_list).astype(np.float32) |
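
        # ref_matrix stacks one reference embedding per row; row i corresponds to
        # paths_list[i], so an index into the matrix maps straight back to a file path.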
|
|
|
|
|
|
|
|
sprite_emb_list = [] |
|
|
sprite_phash_list = [] |
|
|
sprite_sig_list = [] |
|
|
per_sprite_final_indices = [] |
|
|
per_sprite_final_scores = [] |
|
|
per_sprite_rerank_debug = [] |
|
|
for i, sprite_bytes in enumerate(sprite_images_bytes): |
|
|
sprite_pil = Image.open(sprite_bytes) |
|
|
enhanced_sprite = process_image_cv2_from_pil(sprite_pil, scale=2) or sprite_pil |
|
|
|
|
|
|
|
|
sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite)) |
|
|
sprite_emb = sprite_emb if sprite_emb is not None else np.zeros(ref_matrix.shape[1]) |
|
|
sprite_emb_list.append(sprite_emb) |
|
|
|
|
|
sprite_hash_arr = preprocess_for_hash(enhanced_sprite) |
|
|
sprite_phash = None |
|
|
            if sprite_hash_arr is not None:
                try:
                    sprite_phash = phash.encode_image(image_array=sprite_hash_arr)
                except Exception:
                    pass
            sprite_phash_list.append(sprite_phash)
|
|
|
|
|
sprite_sig = None |
|
|
embedding_results, phash_results, imgmatch_results, combined_results = run_query_search_flow( |
|
|
query_b64=sprite_b64_clean[i], |
|
|
processed_dir=BLOCKS_DIR, |
|
|
embeddings_dict=embedding_json, |
|
|
hash_dict=hash_data, |
|
|
signature_obj_map=sig_data, |
|
|
gis=gis, |
|
|
phash=phash, |
|
|
MAX_PHASH_BITS=64, |
|
|
k=5 |
|
|
) |
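
            # Hybrid re-ranking: the three candidate lists above come from DINOv2
            # embeddings, perceptual hashing and image-match signatures respectively.
            # choose_top_candidates (defined elsewhere in this module) is assumed to
            # blend them with method_weights (0.5/0.3/0.2 here) into a weighted score
            # and to report candidates all methods agree on as "consensus_topk".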
|
|
|
|
|
rerank_result = choose_top_candidates(embedding_results, phash_results, imgmatch_results, |
|
|
top_k=top_k, method_weights=method_weights, verbose=True) |
|
|
per_sprite_rerank_debug.append(rerank_result) |
|
|
|
|
|
|
|
|
final = None |
|
|
if len(rerank_result["consensus_topk"]) > 0: |
|
|
consensus = rerank_result["consensus_topk"] |
|
|
best = max(consensus, key=lambda p: rerank_result["weighted_scores_full"].get(p, 0.0)) |
|
|
final = best |
|
|
else: |
|
|
final = rerank_result["weighted_topk"][0][0] if rerank_result["weighted_topk"] else None |
|
|
|
|
|
|
|
|
if final is not None and final in paths_list: |
|
|
idx = paths_list.index(final) |
|
|
score = rerank_result["weighted_scores_full"].get(final, 0.0) |
|
|
per_sprite_final_indices.append([idx]) |
|
|
per_sprite_final_scores.append([score]) |
|
|
print(f"Sprite '{sprite_ids}' FINAL selected: {final} (index {idx}) score={score:.4f}") |
|
|
else: |
|
|
per_sprite_final_indices.append([]) |
|
|
per_sprite_final_scores.append([]) |
|
|
|
|
|
return per_sprite_final_indices, per_sprite_final_scores, paths_list |
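
    # hybrid_similarity_matching returns, per sprite, at most one matched index into
    # paths_list plus its weighted score, together with paths_list itself so callers
    # can translate indices back to reference image paths.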
|
|
|
|
|
|
|
|
|
|
|
per_sprite_matched_indices, per_sprite_scores, paths_list = hybrid_similarity_matching( |
|
|
sprite_images_bytes, sprite_ids, min_similarity, top_k, method_weights=(0.5, 0.3, 0.2) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    # Collect sprite / backdrop definitions for the matched reference images.
    project_data = []
    backdrop_data = []
    copied_sprite_folders = set()
    copied_backdrop_folders = set()

    # shutil, json, os and pathlib.Path are already imported at module level.
    sprite_base_p = Path(sprite_base_path).resolve(strict=False)
    backdrop_base_p = Path(backdrop_base_path).resolve(strict=False)
    project_folder_p = Path(project_folder)
    project_folder_p.mkdir(parents=True, exist_ok=True)

    def display_like_windows_no_lead(p: Path) -> str:
        """
        For human-readable logs only — convert a Path to a string like
        "app\\blocks\\Backdrops\\Castle 2.sb3" (no leading slash).
        """
        s = p.as_posix()
        if s.startswith("/"):
            s = s[1:]
        return s.replace("/", "\\")

    def is_subpath(child: Path, parent: Path) -> bool:
        """Robust membership test: is child located under parent?"""
        try:
            child.relative_to(parent)
            return True
        except Exception:
            return False

    # Deduplicate and sort all matched reference indices across sprites.
    matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst})
    print("matched_indices------------------>", matched_indices)
|
|
|
|
|
for matched_idx in matched_indices: |
|
|
|
|
|
if not (0 <= matched_idx < len(paths_list)): |
|
|
print(f" ⚠ matched_idx {matched_idx} out of range, skipping") |
|
|
continue |
|
|
|
|
|
matched_image_path = paths_list[matched_idx] |
|
|
matched_path_p = Path(matched_image_path).resolve(strict=False) |
|
|
matched_folder_p = matched_path_p.parent |
|
|
matched_filename = matched_path_p.name |
|
|
|
|
|
|
|
|
matched_folder_display = display_like_windows_no_lead(matched_folder_p) |
|
|
|
|
|
print(f"Processing matched image: {matched_image_path}") |
|
|
print(f" - Folder: {matched_folder_display}") |
|
|
print(f" - Sprite path: {display_like_windows_no_lead(sprite_base_p)}") |
|
|
print(f" - Backdrop path: {display_like_windows_no_lead(backdrop_base_p)}") |
|
|
print(f" - Filename: {matched_filename}") |
|
|
|
|
|
|
|
|
folder_key = matched_folder_p.as_posix() |
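
        # Each matched folder is handled at most once. Sprite folders contribute their
        # sprite.json plus asset files; backdrop folders contribute the matched image,
        # their remaining assets and the Stage target taken from their project.json.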
|
|
|
|
|
|
|
|
if is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders: |
|
|
print(f"Processing SPRITE folder: {matched_folder_display}") |
|
|
copied_sprite_folders.add(folder_key) |
|
|
|
|
|
sprite_json_path = matched_folder_p / "sprite.json" |
|
|
print("sprite_json_path----------------------->", sprite_json_path) |
|
|
print("copied sprite folder----------------------->", copied_sprite_folders) |
|
|
if sprite_json_path.exists() and sprite_json_path.is_file(): |
|
|
try: |
|
|
with sprite_json_path.open("r", encoding="utf-8") as f: |
|
|
sprite_info = json.load(f) |
|
|
project_data.append(sprite_info) |
|
|
print(f" ✓ Successfully read sprite.json from {matched_folder_display}") |
|
|
except Exception as e: |
|
|
print(f" ✗ Failed to read sprite.json in {matched_folder_display}: {repr(e)}") |
|
|
else: |
|
|
print(f" ⚠ No sprite.json in {matched_folder_display}") |
|
|
|
|
|
|
|
|
try: |
|
|
sprite_files = list(matched_folder_p.iterdir()) |
|
|
except Exception as e: |
|
|
sprite_files = [] |
|
|
print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}") |
|
|
|
|
|
print(f" Files in sprite folder: {[p.name for p in sprite_files]}") |
|
|
for p in sprite_files: |
|
|
fname = p.name |
|
|
if fname in (matched_filename, "sprite.json"): |
|
|
print(f" Skipping {fname} (matched image or sprite.json)") |
|
|
continue |
|
|
if p.is_file(): |
|
|
dst = project_folder_p / fname |
|
|
try: |
|
|
shutil.copy2(str(p), str(dst)) |
|
|
print(f" ✓ Copied sprite asset: {p} -> {dst}") |
|
|
except Exception as e: |
|
|
print(f" ✗ Failed to copy sprite asset {p}: {repr(e)}") |
|
|
else: |
|
|
print(f" Skipping {fname} (not a file)") |
|
|
|
|
|
|
|
|
if is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders: |
|
|
print(f"Processing BACKDROP folder: {matched_folder_display}") |
|
|
copied_backdrop_folders.add(folder_key) |
|
|
print("backdrop_base_path----------------------->", display_like_windows_no_lead(backdrop_base_p)) |
|
|
print("copied backdrop folder----------------------->", copied_backdrop_folders) |
|
|
|
|
|
|
|
|
backdrop_src = matched_folder_p / matched_filename |
|
|
backdrop_dst = project_folder_p / matched_filename |
|
|
if backdrop_src.exists() and backdrop_src.is_file(): |
|
|
try: |
|
|
shutil.copy2(str(backdrop_src), str(backdrop_dst)) |
|
|
print(f" ✓ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}") |
|
|
except Exception as e: |
|
|
print(f" ✗ Failed to copy matched backdrop image {backdrop_src}: {repr(e)}") |
|
|
else: |
|
|
print(f" ⚠ Matched backdrop source not found: {backdrop_src}") |
|
|
|
|
|
|
|
|
try: |
|
|
backdrop_files = list(matched_folder_p.iterdir()) |
|
|
except Exception as e: |
|
|
backdrop_files = [] |
|
|
print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}") |
|
|
|
|
|
print(f" Files in backdrop folder: {[p.name for p in backdrop_files]}") |
|
|
for p in backdrop_files: |
|
|
fname = p.name |
|
|
if fname in (matched_filename, "project.json"): |
|
|
print(f" Skipping {fname} (matched image or project.json)") |
|
|
continue |
|
|
if p.is_file(): |
|
|
dst = project_folder_p / fname |
|
|
try: |
|
|
shutil.copy2(str(p), str(dst)) |
|
|
print(f" ✓ Copied backdrop asset: {p} -> {dst}") |
|
|
except Exception as e: |
|
|
print(f" ✗ Failed to copy backdrop asset {p}: {repr(e)}") |
|
|
else: |
|
|
print(f" Skipping {fname} (not a file)") |
|
|
|
|
|
|
|
|
pj = matched_folder_p / "project.json" |
|
|
if pj.exists() and pj.is_file(): |
|
|
try: |
|
|
with pj.open("r", encoding="utf-8") as f: |
|
|
bd_json = json.load(f) |
|
|
stage_count = 0 |
|
|
for tgt in bd_json.get("targets", []): |
|
|
if tgt.get("isStage"): |
|
|
backdrop_data.append(tgt) |
|
|
stage_count += 1 |
|
|
print(f" ✓ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)") |
|
|
except Exception as e: |
|
|
print(f" ✗ Failed to read project.json in {matched_folder_display}: {repr(e)}") |
|
|
else: |
|
|
print(f" ⚠ No project.json in {matched_folder_display}") |
|
|
|
|
|
print("---") |
|
|
|
|
|
final_project = { |
|
|
"targets": [], "monitors": [], "extensions": [], |
|
|
"meta": { |
|
|
"semver": "3.0.0", |
|
|
"vm": "11.3.0", |
|
|
"agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" |
|
|
} |
|
|
} |
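
    # Minimal Scratch 3 project skeleton: matched sprites are appended below and a
    # single Stage target (real backdrops if any were matched, otherwise a default
    # backdrop) is inserted at position 0.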
|
|
|
|
|
|
|
|
|
|
|
for spr in project_data: |
|
|
if not spr.get("isStage", False): |
|
|
final_project["targets"].append(spr) |
|
|
|
|
|
if backdrop_data: |
|
|
all_costumes, sounds = [], [] |
|
|
seen_costumes = set() |
|
|
for i, bd in enumerate(backdrop_data): |
|
|
for costume in bd.get("costumes", []): |
|
|
key = (costume.get("name"), costume.get("assetId")) |
|
|
if key not in seen_costumes: |
|
|
seen_costumes.add(key) |
|
|
all_costumes.append(costume) |
|
|
if i == 0: |
|
|
sounds = bd.get("sounds", []) |
|
|
stage_obj={ |
|
|
"isStage": True, |
|
|
"name": "Stage", |
|
|
"objName": "Stage", |
|
|
"variables": {}, |
|
|
"lists": {}, |
|
|
"broadcasts": {}, |
|
|
"blocks": {}, |
|
|
"comments": {}, |
|
|
"currentCostume": 1 if len(all_costumes) > 1 else 0, |
|
|
"costumes": all_costumes, |
|
|
"sounds": sounds, |
|
|
"volume": 100, |
|
|
"layerOrder": 0, |
|
|
"tempo": 60, |
|
|
"videoTransparency": 50, |
|
|
"videoState": "on", |
|
|
"textToSpeechLanguage": None |
|
|
} |
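
        # By convention the Stage is the first entry in "targets", hence index 0.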
|
|
final_project["targets"].insert(0, stage_obj) |
|
|
else: |
|
|
logger.warning("⚠️ No backdrop matched. Using default static backdrop.") |
|
|
        default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg"
        default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg"
        default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav"
        default_backdrop_sound_name = "83a9787d4cb6f3b7632b4ddfebf74367.wav"
|
|
try: |
|
|
shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name)) |
|
|
logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}") |
|
|
shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name)) |
|
|
logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}") |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Failed to copy default backdrop: {e}") |
|
|
stage_obj={ |
|
|
"isStage": True, |
|
|
"name": "Stage", |
|
|
"objName": "Stage", |
|
|
"variables": {}, |
|
|
"lists": {}, |
|
|
"broadcasts": {}, |
|
|
"blocks": {}, |
|
|
"comments": {}, |
|
|
"currentCostume": 0, |
|
|
"costumes": [ |
|
|
{ |
|
|
"assetId": default_backdrop_name.split(".")[0], |
|
|
"name": "defaultBackdrop", |
|
|
"md5ext": default_backdrop_name, |
|
|
"dataFormat": "svg", |
|
|
"rotationCenterX": 240, |
|
|
"rotationCenterY": 180 |
|
|
} |
|
|
], |
|
|
"sounds": [ |
|
|
{ |
|
|
"name": "pop", |
|
|
"assetId": "83a9787d4cb6f3b7632b4ddfebf74367", |
|
|
"dataFormat": "wav", |
|
|
"format": "", |
|
|
"rate": 48000, |
|
|
"sampleCount": 1123, |
|
|
"md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav" |
|
|
} |
|
|
], |
|
|
"volume": 100, |
|
|
"layerOrder": 0, |
|
|
"tempo": 60, |
|
|
"videoTransparency": 50, |
|
|
"videoState": "on", |
|
|
"textToSpeechLanguage": None |
|
|
} |
|
|
final_project["targets"].insert(0, stage_obj) |
|
|
|
|
|
with open(project_json_path, 'w') as f: |
|
|
json.dump(final_project, f, indent=2) |
|
|
|
|
|
    return project_json_path

|
def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300):
    pdf_stream.seek(0)

    # pdf2image needs a real file on disk, so spill the stream to a temporary PDF.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(pdf_stream.read())
        tmp_pdf_path = tmp_pdf.name

    try:
        images = convert_from_path(tmp_pdf_path, dpi=dpi)
    finally:
        # The temporary file is only needed for the conversion; remove it afterwards.
        try:
            os.remove(tmp_pdf_path)
        except OSError:
            pass
    return images
|
|
|
|
|
def delay_for_tpm_node(state: GameState):
    logger.info("--- Running DelayForTPMNode ---")
    # Fixed pause between LLM-heavy steps, presumably to stay under the provider's
    # tokens-per-minute (TPM) limit, as the node name suggests.
    time.sleep(10)
    logger.info("Delay completed.")
    return state
|
|
|
|
|
|
|
|
workflow = StateGraph(GameState) |
|
|
workflow.add_node("pseudo_generator", pseudo_generator_node) |
|
|
workflow.add_node("Node_optimizer", node_optimizer) |
|
|
workflow.add_node("layer_optimizer", layer_order_correction) |
|
|
workflow.add_node("block_builder", overall_block_builder_node_2) |
|
|
workflow.add_node("variable_initializer", variable_adder_node) |
|
|
workflow.add_node("page_processed", processed_page_node) |
|
|
|
|
|
workflow.set_entry_point("page_processed") |
|
|
|
|
|
def decide_next_step(state: GameState): |
|
|
if state.get("processing", False): |
|
|
return "pseudo_generator" |
|
|
else: |
|
|
return "layer_optimizer" |
|
|
|
|
|
workflow.add_conditional_edges( |
|
|
"page_processed", |
|
|
decide_next_step, |
|
|
{ |
|
|
"pseudo_generator": "pseudo_generator", |
|
|
"layer_optimizer": "layer_optimizer" |
|
|
} |
|
|
) |
|
|
|
|
|
workflow.add_edge("pseudo_generator", "Node_optimizer") |
|
|
workflow.add_edge("Node_optimizer", "block_builder") |
|
|
workflow.add_edge("block_builder", "variable_initializer") |
|
|
workflow.add_edge("variable_initializer", "page_processed") |
|
|
workflow.add_edge("layer_optimizer", END) |
|
|
|
|
|
app_graph = workflow.compile() |
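
# Graph shape: page_processed loops through pseudo_generator -> Node_optimizer ->
# block_builder -> variable_initializer and back, once per PDF page; when every page
# has been consumed, decide_next_step routes to layer_optimizer and the run ends.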
|
|
|
|
|
|
|
|
def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image: |
|
|
""" |
|
|
Upscales a PIL image by a given scale factor. |
|
|
""" |
|
|
try: |
|
|
width, height = image.size |
|
|
new_size = (width * scale, height * scale) |
|
|
upscaled_image = image.resize(new_size, Image.LANCZOS) |
|
|
logger.info(f"✅ Upscaled image to {new_size}") |
|
|
return upscaled_image |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error during image upscaling: {str(e)}") |
|
|
return image |
|
|
|
|
|
@log_execution_time |
|
|
def create_sb3_archive(project_folder, project_id): |
|
|
""" |
|
|
Zips the project folder and renames it to an .sb3 file. |
|
|
|
|
|
Args: |
|
|
project_folder (str): The path to the directory containing the project.json and assets. |
|
|
project_id (str): The unique ID for the project, used for naming the .sb3 file. |
|
|
|
|
|
Returns: |
|
|
str: The path to the created .sb3 file, or None if an error occurred. |
|
|
""" |
|
|
print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------") |
|
|
output_filename = GEN_PROJECT_DIR / project_id |
|
|
print(" --------------------------------------- output_filename ---------------------------------------",output_filename) |
|
|
zip_path = None |
|
|
sb3_path = None |
|
|
try: |
|
|
zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder) |
|
|
print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder) |
|
|
logger.info(f"Project folder zipped to: {zip_path}") |
|
|
|
|
|
|
|
|
sb3_path = f"{output_filename}.sb3" |
|
|
os.rename(zip_path, sb3_path) |
|
|
print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path) |
|
|
logger.info(f"Renamed {zip_path} to {sb3_path}") |
|
|
|
|
|
return sb3_path |
|
|
except Exception as e: |
|
|
logger.error(f"Error creating SB3 archive for {project_id}: {e}") |
|
|
|
|
|
if zip_path and os.path.exists(zip_path): |
|
|
os.remove(zip_path) |
|
|
if sb3_path and os.path.exists(sb3_path): |
|
|
os.remove(sb3_path) |
|
|
return sb3_path |
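
# Note: an .sb3 file is just a ZIP archive whose root holds project.json and the
# referenced costume/sound assets, which is why make_archive + rename is sufficient.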
|
|
|
|
|
|
|
|
|
|
|
def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> str: |
|
|
""" |
|
|
Copies the PDF at `pdf_stream` into GEN_PROJECT_DIR/project_id/, |
|
|
renaming it to <project_id>.pdf. |
|
|
|
|
|
Args: |
|
|
pdf_stream (io.BytesIO): Any existing stream to a PDF file. |
|
|
project_id (str): Your unique project identifier. |
|
|
|
|
|
Returns: |
|
|
str: Path to the copied PDF in the generated directory, |
|
|
or None if something went wrong. |
|
|
""" |
|
|
|
|
|
try: |
|
|
|
|
|
output_dir = GEN_PROJECT_DIR / project_id |
|
|
output_dir.mkdir(parents=True, exist_ok=True) |
|
|
print(f"\n--------------------------------output_dir {output_dir}") |
|
|
|
|
|
|
|
|
target_pdf = output_dir / f"{project_id}.pdf" |
|
|
print(f"\n--------------------------------target_pdf {target_pdf}") |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(pdf_stream, io.BytesIO): |
|
|
with open(target_pdf, "wb") as f: |
|
|
f.write(pdf_stream.getbuffer()) |
|
|
else: |
|
|
shutil.copy2(pdf_stream, target_pdf) |
|
|
print(f"Copied PDF from {pdf_stream} → {target_pdf}") |
|
|
logger.info(f"Copied PDF from {pdf_stream} → {target_pdf}") |
|
|
|
|
|
|
|
|
|
|
|
return str(target_pdf) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True) |
|
|
return None |
|
|
|
|
|
@app.route('/') |
|
|
def index(): |
|
|
return render_template('app_index.html') |
|
|
|
|
|
@app.route("/download_sb3/<project_id>", methods=["GET"]) |
|
|
def download_sb3(project_id): |
|
|
sb3_path = GEN_PROJECT_DIR / f"{project_id}.sb3" |
|
|
if not sb3_path.exists(): |
|
|
return jsonify({"error": "Scratch project file not found"}), 404 |
|
|
|
|
|
return send_file( |
|
|
sb3_path, |
|
|
as_attachment=True, |
|
|
download_name=sb3_path.name |
|
|
) |
|
|
|
|
|
@app.route("/download_pdf/<project_id>", methods=["GET"]) |
|
|
def download_pdf(project_id): |
|
|
pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf" |
|
|
if not pdf_path.exists(): |
|
|
return jsonify({"error": "Scratch project file not found"}), 404 |
|
|
|
|
|
return send_file( |
|
|
pdf_path, |
|
|
as_attachment=True, |
|
|
download_name=pdf_path.name |
|
|
) |
|
|
|
|
|
@app.route("/download_sound/<sound_id>", methods=["GET"]) |
|
|
def download_sound(sound_id): |
|
|
sound_path = SOUND_DIR / f"{sound_id}.wav" |
|
|
if not sound_path.exists(): |
|
|
return jsonify({"error": "Scratch project sound file not found"}), 404 |
|
|
|
|
|
return send_file( |
|
|
sound_path, |
|
|
as_attachment=True, |
|
|
download_name=sound_path.name |
|
|
) |
|
|
|
|
|
|
|
|
@app.route('/process_pdf', methods=['POST']) |
|
|
def process_pdf():
    project_id = None  # defined up-front so the error handler below can always reference it
    try:
        logger.info("Received request to process PDF.")
|
|
if 'pdf_file' not in request.files: |
|
|
logger.warning("No PDF file found in request.") |
|
|
return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400 |
|
|
|
|
|
pdf_file = request.files['pdf_file'] |
|
|
if pdf_file.filename == '': |
|
|
return jsonify({"error": "Empty filename"}), 400 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
project_id = str(uuid.uuid4()).replace('-', '') |
|
|
|
|
|
project_folder = OUTPUT_DIR / project_id |
|
|
|
|
|
pdf_bytes = pdf_file.read() |
|
|
pdf_stream = io.BytesIO(pdf_bytes) |
|
|
logger.info(f"Saved uploaded PDF to: {pdf_stream}") |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
pdf= save_pdf_to_generated_dir(pdf_stream, project_id) |
|
|
logger.info(f"Saved uploaded PDF to: {pdf_file}: {pdf}") |
|
|
print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream) |
|
|
total_time = time.time() - start_time |
|
|
print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n") |
|
|
start_time = time.time() |
|
|
|
|
|
        sprites_data = extract_images_from_pdf(pdf_stream, project_folder)
        print(" --------------------------------------- extracted sprites ---------------------------------------", list(sprites_data.keys()))
        total_time = time.time() - start_time
        print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n")
        start_time = time.time()

        project_output = similarity_matching(sprites_data, project_folder)
|
|
logger.info("Received request to process PDF.") |
|
|
total_time = time.time() - start_time |
|
|
print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n") |
|
|
|
|
|
with open(project_output, 'r') as f: |
|
|
project_skeleton = json.load(f) |
|
|
|
|
|
if isinstance(pdf_stream, io.BytesIO): |
|
|
images = convert_pdf_stream_to_images(pdf_stream, dpi=300) |
|
|
else: |
|
|
images = convert_from_path(pdf_stream, dpi=300) |
|
|
|
|
|
|
|
|
initial_state_dict = { |
|
|
"project_json": project_skeleton, |
|
|
"description": "The pseudo code for the script", |
|
|
"project_id": project_id, |
|
|
"project_image": images, |
|
|
"action_plan": {}, |
|
|
"pseudo_code": {}, |
|
|
"temporary_node": {}, |
|
|
"processing":True, |
|
|
"page_count": 0, |
|
|
"temp_pseudo_code":[], |
|
|
} |
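
        # Initial GameState for the graph: one rendered image per PDF page; page_count
        # and processing drive the per-page loop, while the pseudo-code / action-plan
        # fields are expected to be filled in by the generator nodes as the graph runs.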
|
|
|
|
|
final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200}) |
|
|
final_project_json = final_state_dict['project_json'] |
|
|
|
|
|
|
|
|
|
|
|
with open(project_output, "w") as f: |
|
|
json.dump(final_project_json, f, indent=2) |
|
|
logger.info(f"Final project JSON saved to {project_output}") |
|
|
|
|
|
|
|
|
sb3_file_path = create_sb3_archive(project_folder, project_id) |
|
|
|
|
|
if sb3_file_path: |
|
|
logger.info(f"Successfully created SB3 file: {sb3_file_path}") |
|
|
|
|
|
download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}" |
|
|
pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}" |
|
|
print(f"DOWNLOAD_URL: {download_url}") |
|
|
print(f"PDF_URL: {pdf_url}") |
|
|
|
|
|
return jsonify({ |
|
|
"message": "✅ PDF processed successfully", |
|
|
"output_json": "output_path", |
|
|
"sprites": "result", |
|
|
"project_output_json": "project_output", |
|
|
"test_url": download_url |
|
|
}) |
|
|
else: |
|
|
return jsonify({ |
|
|
"message": "❌ Scanned images are not clear please retry!", |
|
|
"isError": True, |
|
|
"output_json": "output_path", |
|
|
"sprites": "result", |
|
|
"project_output_json": "project_output", |
|
|
"test_url": download_url |
|
|
}), 500 |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True) |
|
|
return jsonify({ |
|
|
"message": "❌ Scanned images are not clear please retry!", |
|
|
"isError": True, |
|
|
"output_json": "output_path", |
|
|
"sprites": "result", |
|
|
"project_output_json": "project_output", |
|
|
"test_url": "download_url" |
|
|
}), 500 |
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|
|
app.run(host='0.0.0.0', port=7860, debug=True) |
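
# Example request (hypothetical filename; the app listens on 0.0.0.0:7860):
#   curl -X POST -F "pdf_file=@game_pages.pdf" http://localhost:7860/process_pdf
# A successful response includes a test_url pointing at /download_sb3/<project_id>.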