prthm11's picture
Update app.py
b53e673 verified
raw
history blame
136 kB
from flask import Flask, request, jsonify, render_template, send_from_directory, send_file
import cv2, json,base64,io,os,tempfile,logging, re
import numpy as np
from unstructured.partition.pdf import partition_pdf
from PIL import Image, ImageOps, ImageEnhance
from dotenv import load_dotenv
from werkzeug.utils import secure_filename
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
from pdf2image import convert_from_path, convert_from_bytes
from typing import Dict, TypedDict, Optional, Any, List, Tuple
from collections import defaultdict
from langgraph.graph import StateGraph, END
import uuid
import shutil, time, functools
from io import BytesIO
from pathlib import Path
from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts
from difflib import get_close_matches
import torch
from transformers import AutoImageProcessor, AutoModel
import torch
import json
import cv2
from imagededup.methods import PHash
from image_match.goldberg import ImageSignature
import sys
import math
import hashlib
# Hugging Face model id of the DINOv2 checkpoint loaded below.
DINOV2_MODEL = "facebook/dinov2-small"
# For PHash normalization when combining scores: assumed max hamming bits (typical phash=64)
MAX_PHASH_BITS = 64
# ----------------------
# INITIALIZE MODELS
# ----------------------
# NOTE: everything in this section runs at import time.
print("Initializing models and helpers...")
# Use CUDA when torch reports it as available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if DEVICE.type == "cpu":
    # On CPU, limit torch to 4 threads.
    torch.set_num_threads(4)
# Image processor + model pair used by get_dinov2_embedding_from_pil().
dinov2_processor = AutoImageProcessor.from_pretrained(DINOV2_MODEL)
dinov2_model = AutoModel.from_pretrained(DINOV2_MODEL)
dinov2_model.to(DEVICE)
dinov2_model.eval()
# Shared perceptual-hash and image-signature helpers.
phash = PHash()
gis = ImageSignature()
# Load variables from a .env file into the process environment.
load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
# NOTE(review): groq_api_key is read here but not passed to ChatGroq below;
# presumably ChatGroq reads GROQ_API_KEY from the environment itself — confirm.
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0,
    max_tokens=None,
)
app = Flask(__name__)
# Windows-style relative asset paths; kept unchanged for any code that still reads them.
backdrop_images_path = r"app\blocks\Backdrops"
sprite_images_path = r"app\blocks\sprites"
code_blocks_image_path = r"app\blocks\code_blocks"
count = 0
from pathlib import Path
# Root directory for all assets and outputs. Defaults to "/app" (the value that
# was previously always forced) but can now be overridden with APP_BASE_DIR;
# before, the environment lookup was immediately overwritten and had no effect.
BASE_DIR = Path(os.getenv("APP_BASE_DIR", "/app"))
# Log directory, overridable with LOGS_DIR; created right away so the logging
# FileHandler can open its file.
LOGS_DIR = Path(os.getenv("LOGS_DIR", "/tmp/logs")).resolve()
LOGS_DIR.mkdir(parents=True, exist_ok=True)
STATIC_DIR = BASE_DIR / "static"
GEN_PROJECT_DIR = BASE_DIR / "generated_projects"
# Block assets live under BASE_DIR/blocks.
BLOCKS_DIR = BASE_DIR / "blocks"
BACKDROP_DIR = BLOCKS_DIR / "Backdrops"
SPRITE_DIR = BLOCKS_DIR / "sprites"
CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks"
SOUND_DIR = BLOCKS_DIR / "sound"
# Outputs are rooted under BASE_DIR as well.
OUTPUT_DIR = BASE_DIR / "outputs"
# Global variables to hold the model and index, loaded only once.
MODEL = None
FAISS_INDEX = None
IMAGE_PATHS = None
# Create every working directory up front (no-op when it already exists).
for d in (
    BLOCKS_DIR,
    STATIC_DIR,
    GEN_PROJECT_DIR,
    BACKDROP_DIR,
    SPRITE_DIR,
    CODE_BLOCKS_DIR,
    SOUND_DIR,
    OUTPUT_DIR,
):
    d.mkdir(parents=True, exist_ok=True)
def log_execution_time(func):
    """Decorator that logs how long each call to ``func`` took.

    The wrapped function's return value is passed through unchanged. The
    duration is logged at INFO level through the module-level ``logger`` only
    when the call returns normally; an exception propagates without a log line.
    """
    @functools.wraps(func)
    def timed(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        elapsed = time.time() - began
        logger.info(f"⏱ {func.__name__} executed in {elapsed:.2f} seconds")
        return outcome

    return timed
# Configure logging: INFO level, emitted both to LOGS_DIR/app.log and to a
# stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(str(LOGS_DIR / "app.log")),
        logging.StreamHandler()
    ]
)
# Module-level logger; log_execution_time and the catalog loader write to it.
logger = logging.getLogger(__name__)
class GameState(TypedDict):
    """Typed dictionary describing the state handled by this app.

    NOTE(review): presumably this is the state object passed between the
    LangGraph nodes defined later in the file — confirm. The per-field
    semantics are not established in this part of the file.
    """
    project_json: dict
    description: str
    project_id: str
    project_image: str
    pseudo_code: dict
    action_plan: Optional[Dict]
    temporary_node: Optional[Dict]
    page_count: int
    processing: bool
    temp_pseudo_code: list
# System prompt for the main agent: turns OCR text of Scratch 3.0 blocks into
# pseudocode JSON.
# The newline escape below is written as `\\n` on purpose: in a normal (non-raw)
# string a bare backslash-n becomes a real line break, so the model would never
# see the literal two-character sequence it is being told to emit.
SYSTEM_PROMPT ="""Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce precisely formatted pseudocode JSON.
### Core Role
- Treat this as an OCR refinement task: the input may contain typos or spacing issues.
- Intelligently correct OCR mistakes to align with valid Scratch 3.0 block syntax.
### Universal Rules
1. **Code Detection:** If no Scratch blocks are detected, the `pseudocode` value must be "No Code-blocks".
2. **Script Ownership:** Determine the target from "Script for:". If it matches a `Stage_costumes` name, set `name_variable` to "Stage".
3. **Pseudocode Structure:**
- The pseudocode must be a single JSON string with `\\n` for newlines.
- Indent nested blocks with 4 spaces.
- Every script (hat block) and every C-block (if, repeat, forever) MUST have a corresponding `end` at the correct indentation level.
4. **Formatting Syntax:**
- Numbers & Text: `(5)`, `(hello)`
- Variables & Dropdowns: `[score v]`, `[space v]`
- Reporters: `((x position))`
- Booleans: `<condition>`
5. **Final Output:** Your response must ONLY be the valid JSON object and nothing else."""
# System prompt for the secondary agent (agent_json_resolver): asks the model
# to repair malformed JSON into the fixed {"refined_logic": {...}} shape.
SYSTEM_PROMPT_JSON_CORRECTOR = """
You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format.
REQUIRED OUTPUT FORMAT:
{
"refined_logic": {
"name_variable": "sprite_name_here",
"pseudocode": "pseudocode_string_here"
}
}
RULES:
1. Extract the sprite name and pseudocode from the input
2. Return ONLY valid JSON in the exact format above
3. No explanations, no extra text, no other fields
4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode
"""
# Main agent for Scratch 3.0: the shared LLM driven by SYSTEM_PROMPT, with no tools.
agent = create_react_agent(
model=llm,
tools=[], # No specific tools are defined here, but could be added later
prompt=SYSTEM_PROMPT
)
# Second agent on the same LLM, driven by the JSON-corrector prompt, with no tools.
agent_json_resolver = create_react_agent(
model=llm,
tools=[], # No specific tools are defined here, but could be added later
prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
# -----------------------
# SERIALIZABLE HELPER
# -----------------------
def make_json_serializable(obj):
    """Return a JSON-friendly copy of ``obj``, converting recursively.

    ``None`` and plain ``str``/``int``/``float``/``bool`` values are returned
    as-is, numpy arrays become nested lists, dict keys are stringified, and
    lists/tuples become lists. Anything else is converted through its own
    ``tolist()`` when that works, and through ``str()`` as a last resort.
    """
    # Values the json module already understands.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[str(key)] = make_json_serializable(value)
        return converted
    if isinstance(obj, (list, tuple)):
        return list(map(make_json_serializable, obj))
    # Some objects (e.g. numpy scalars, image-match signatures) expose tolist().
    try:
        return obj.tolist()
    except Exception:
        # No usable tolist(): fall back to the string form.
        return str(obj)
# -----------------------
# BASE64 <-> PIL
# -----------------------
def pil_to_base64(pil_img, fmt="PNG"):
    """Serialise ``pil_img`` in format ``fmt`` and return it as a base64 string."""
    with io.BytesIO() as stream:
        pil_img.save(stream, format=fmt)
        encoded_bytes = base64.b64encode(stream.getvalue())
    return encoded_bytes.decode("utf-8")
def base64_to_pil(b64):
    """Decode a base64 string and open it as an image.

    Returns the opened image, or ``None`` (after printing the error) when the
    input cannot be decoded or opened.
    """
    try:
        raw_bytes = base64.b64decode(b64)
        stream = io.BytesIO(raw_bytes)
        return Image.open(stream)
    except Exception as e:
        print(f"[base64_to_pil] Error: {e}")
    return None
# -----------------------
# PIL helpers
# -----------------------
def load_image_pil(path):
    """Open the image at ``path``; print the error and return ``None`` on failure."""
    opened = None
    try:
        opened = Image.open(path)
    except Exception as e:
        print(f"[load_image_pil] Could not open {path}: {e}")
    return opened
def add_background(pil_img, bg_color=(255,255,255), size=None):
    """Composite ``pil_img`` onto a solid RGB canvas.

    Args:
        pil_img: source image; ``None`` is returned unchanged as ``None``.
        bg_color: RGB colour of the canvas.
        size: canvas size ``(width, height)``; defaults to the image's own size.

    Returns:
        The new RGB image, or ``None`` if anything fails (the error is printed).
    """
    if pil_img is None:
        return None
    try:
        canvas_size = pil_img.size if size is None else size
        canvas = Image.new("RGB", canvas_size, bg_color)
        rgba = pil_img.convert("RGBA")
        # Centre the image on the canvas; equal sizes give an offset of (0, 0).
        offset = (
            (canvas_size[0] - rgba.size[0]) // 2,
            (canvas_size[1] - rgba.size[1]) // 2,
        )
        # The alpha band is the paste mask, so transparent pixels show bg_color.
        alpha = rgba.split()[3]
        canvas.paste(rgba.convert("RGB"), offset, mask=alpha)
        return canvas
    except Exception as e:
        print(f"[add_background] Error: {e}")
        return None
def preprocess_for_hash(pil_img, size=(256,256)):
    """Prepare an image for perceptual hashing.

    The image is converted to RGB, then grayscale, histogram-equalised and
    resized to ``size``. Returns a ``uint8`` numpy array, or ``None`` on any
    failure (the error is printed).
    """
    try:
        gray = ImageOps.grayscale(pil_img.convert("RGB"))
        resized = ImageOps.equalize(gray).resize(size)
        pixels = np.array(resized)
        return pixels.astype(np.uint8)
    except Exception as e:
        print(f"[preprocess_for_hash] Error: {e}")
        return None
def preprocess_for_model(pil_img):
    """Return ``pil_img`` converted to RGB, or ``None`` on failure.

    Every mode (RGBA, L, anything else) is handled the same way: a plain
    conversion to RGB. Errors are printed and reported as ``None``.
    """
    try:
        # Touch .mode first so an object without it fails here, as before.
        _ = pil_img.mode
        return pil_img.convert("RGB")
    except Exception as e:
        print(f"[preprocess_for_model] Error: {e}")
        return None
def get_dinov2_embedding_from_pil(pil_img):
    """Compute an L2-normalised DINOv2 embedding for an image.

    Uses the module-level ``dinov2_processor`` / ``dinov2_model`` on ``DEVICE``
    and takes position 0 of ``last_hidden_state`` (the CLS token) as the
    embedding.

    Returns:
        A float numpy vector of unit length, or ``None`` when the input is
        ``None``, the norm is zero/NaN, or any step raises (error is printed).
    """
    if pil_img is None:
        return None
    try:
        model_inputs = dinov2_processor(images=pil_img, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            model_outputs = dinov2_model(**model_inputs)
        # CLS token embedding
        cls_vector = model_outputs.last_hidden_state[:, 0, :].squeeze(0).cpu().numpy()
        length = np.linalg.norm(cls_vector)
        if length == 0 or np.isnan(length):
            return None
        unit_vector = cls_vector / length
        return unit_vector.astype(float)
    except Exception as e:
        print(f"[get_dinov2_embedding_from_pil] Error: {e}")
        return None
# -----------------------
# OpenCV enhancement (accepts PIL)
# -----------------------
def pil_to_bgr_np(pil_img):
    """Convert an image to a numpy array in OpenCV's BGR channel order."""
    rgb_pixels = np.array(pil_img.convert("RGB"))
    bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
    return bgr_pixels
def bgr_np_to_pil(bgr_np):
    """Convert a BGR numpy array back into an image (RGB channel order)."""
    return Image.fromarray(cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB))
def upscale_image_cv(bgr_np, scale=2):
    """Resize a BGR array by ``scale`` using bicubic interpolation.

    Args:
        bgr_np: image array whose first two dimensions are (height, width).
        scale: multiplier applied to both dimensions. Integer factors behave
            exactly as before; fractional factors (e.g. 1.5) are now accepted
            because the target size is rounded to whole pixels, which
            ``cv2.resize`` requires.

    Returns:
        The resized array.
    """
    height, width = bgr_np.shape[:2]
    target_size = (int(round(width * scale)), int(round(height * scale)))
    return cv2.resize(bgr_np, target_size, interpolation=cv2.INTER_CUBIC)
def reduce_noise_cv(bgr_np):
    """Denoise a colour image with OpenCV's non-local-means filter.

    The fixed arguments 10, 10, 7, 21 are passed positionally after the
    ``None`` destination, exactly as tuned in this file.
    """
    denoised = cv2.fastNlMeansDenoisingColored(bgr_np, None, 10, 10, 7, 21)
    return denoised
def sharpen_cv(bgr_np):
    """Sharpen an image by convolving it with a 3x3 kernel (centre 5, cross -1)."""
    sharpen_kernel = np.array([
        [0, -1, 0],
        [-1, 5, -1],
        [0, -1, 0],
    ])
    # ddepth=-1 is passed as the second argument, as in the original call.
    return cv2.filter2D(bgr_np, -1, sharpen_kernel)
def enhance_contrast_cv(bgr_np):
    """Boost contrast by a factor of 1.5 and return the result as a BGR array.

    The array is converted to an RGB image so ``ImageEnhance.Contrast`` can be
    applied, then converted back to BGR.
    """
    as_rgb_image = Image.fromarray(cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB))
    boosted = ImageEnhance.Contrast(as_rgb_image).enhance(1.5)
    return cv2.cvtColor(np.array(boosted), cv2.COLOR_RGB2BGR)
def process_image_cv2_from_pil(pil_img, scale=2):
    """Run the full enhancement pipeline on an image.

    Steps, in order: convert to BGR, upscale by ``scale`` (skipped when
    ``scale`` is 1), denoise, sharpen, enhance contrast, convert back to an
    image. Returns ``None`` (after printing the error) if any step fails.
    """
    try:
        frame = pil_to_bgr_np(pil_img)
        if scale != 1:
            frame = upscale_image_cv(frame, scale=scale)
        frame = reduce_noise_cv(frame)
        frame = sharpen_cv(frame)
        frame = enhance_contrast_cv(frame)
        return bgr_np_to_pil(frame)
    except Exception as e:
        print(f"[process_image_cv2_from_pil] Error: {e}")
        return None
# cosine similarity
def cosine_similarity(a, b):
    """Return the cosine similarity of two vectors as a Python float.

    The dot product is divided by the product of the two norms, so the result
    is a true cosine even when the inputs are not unit-length. For vectors
    that are already normalised this equals the plain dot product. If either
    vector has zero length the similarity is defined as 0.0.
    """
    denominator = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denominator == 0.0:
        return 0.0
    return float(np.dot(a, b) / denominator)
# --------------------------
# Hybrid Selection of Best Match
# --------------------------
def run_query_search_flow(
query_path: Optional[str] = None,
query_b64: Optional[str] = None,
processed_dir: str = "./processed",
embeddings_dict: Dict[str, np.ndarray] = None,
hash_dict: Dict[str, Any] = None,
signature_obj_map: Dict[str, Any] = None,
gis: Any = None,
phash: Any = None,
MAX_PHASH_BITS: int = 64,
k: int = 10,
) -> Tuple[
List[Tuple[str, float]],
List[Tuple[str, Any, float]],
List[Tuple[str, Any, float]],
List[Tuple[str, float, float, float, float]],
]:
"""
Run the full query/search flow (base64 -> preprocess -> embed -> scoring).
Accepts either query_path (file on disk) OR query_b64 (base64 string). If both are
provided, query_b64 takes precedence.
Returns:
embedding_results_sorted,
phash_results_sorted,
imgmatch_results_sorted,
combined_results_sorted
"""
# Validate inputs
if (query_path is None or query_path == "") and (query_b64 is None or query_b64 == ""):
raise ValueError("Either query_path or query_b64 must be provided.")
# Ensure processed_dir exists
os.makedirs(processed_dir, exist_ok=True)
print("\n--- Query/Search Phase ---")
# 1) Load query image (prefer base64 if provided)
if query_b64:
# base64 provided directly -> decode to PIL
query_from_b64 = base64_to_pil(query_b64)
if query_from_b64 is None:
raise RuntimeError("Could not decode provided base64 query. Exiting.")
query_pil_orig = query_from_b64
else:
# load from disk
if not os.path.exists(query_path):
raise FileNotFoundError(f"Query image not found: {query_path}")
query_pil_orig = load_image_pil(query_path)
if query_pil_orig is None:
raise RuntimeError("Could not load query image from path. Exiting.")
# also create a base64 roundtrip for robustness (keep original behaviour)
try:
query_b64 = pil_to_base64(query_pil_orig, fmt="PNG")
except Exception as e:
raise RuntimeError(f"Could not base64 query from disk image: {e}")
# keep decoded copy for consistency
query_from_b64 = base64_to_pil(query_b64)
if query_from_b64 is None:
raise RuntimeError("Could not decode query base64 after roundtrip. Exiting.")
# At this point, query_from_b64 is a PIL.Image we can continue with
# 2) Preprocess with OpenCV enhancement (best-effort; fallback to base64-decoded image)
enhanced_query_pil = process_image_cv2_from_pil(query_from_b64, scale=2)
if enhanced_query_pil is None:
print("[Query] OpenCV enhancement failed; falling back to base64-decoded image.")
enhanced_query_pil = query_from_b64
# Save the enhanced query (best-effort)
query_enhanced_path = os.path.join(processed_dir, "query_enhanced.png")
try:
enhanced_query_pil.save(query_enhanced_path, format="PNG")
except Exception:
try:
enhanced_query_pil.convert("RGB").save(query_enhanced_path, format="PNG")
except Exception:
print("[Warning] Could not save enhanced query image for inspection.")
# 3) Query embedding (preprocess -> model)
prepped = preprocess_for_model(enhanced_query_pil)
query_emb = get_dinov2_embedding_from_pil(prepped)
if query_emb is None:
raise RuntimeError("Could not compute query embedding. Exiting.")
# 4) Query phash computation
query_hash_arr = preprocess_for_hash(enhanced_query_pil)
if query_hash_arr is None:
raise RuntimeError("Could not compute query phash array. Exiting.")
query_phash = phash.encode_image(image_array=query_hash_arr)
# 5) Query signature generation (best-effort)
query_sig = None
query_sig_path = os.path.join(processed_dir, "query_for_sig.png")
try:
enhanced_query_pil.save(query_sig_path, format="PNG")
except Exception:
try:
enhanced_query_pil.convert("RGB").save(query_sig_path, format="PNG")
except Exception:
query_sig_path = None
if query_sig_path:
try:
query_sig = gis.generate_signature(query_sig_path)
except Exception as e:
print(f"[ImageSignature] failed for query: {e}")
query_sig = None
# -----------------------
# Prepare stored data arrays
# -----------------------
embeddings_dict = embeddings_dict or {}
hash_dict = hash_dict or {}
signature_obj_map = signature_obj_map or {}
image_paths = list(embeddings_dict.keys())
image_embeddings = np.array(list(embeddings_dict.values()), dtype=float) if embeddings_dict else np.array([])
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
try:
return float(np.dot(a, b))
except Exception:
return -1.0
# Collections
embedding_results: List[Tuple[str, float]] = []
phash_results: List[Tuple[str, Any, float]] = []
imgmatch_results: List[Tuple[str, Any, float]] = []
combined_results: List[Tuple[str, float, float, float, float]] = []
# Iterate stored images and compute similarities
for idx, path in enumerate(image_paths):
# Embedding similarity
try:
stored_emb = image_embeddings[idx]
emb_sim = cosine_similarity(query_emb, stored_emb)
except Exception:
emb_sim = -1.0
embedding_results.append((path, emb_sim))
# PHash similarity (Hamming -> normalized sim)
try:
stored_ph = hash_dict.get(path)
if stored_ph is not None:
hd = phash.hamming_distance(query_phash, stored_ph)
ph_sim = max(0.0, 1.0 - (hd / float(MAX_PHASH_BITS)))
else:
hd = None
ph_sim = 0.0
except Exception:
hd = None
ph_sim = 0.0
phash_results.append((path, hd, ph_sim))
# Image signature similarity (normalized distance -> similarity)
try:
stored_sig = signature_obj_map.get(path)
if stored_sig is not None and query_sig is not None:
dist = gis.normalized_distance(stored_sig, query_sig)
im_sim = max(0.0, 1.0 - dist)
else:
dist = None
im_sim = 0.0
except Exception:
dist = None
im_sim = 0.0
imgmatch_results.append((path, dist, im_sim))
# Combined score: average of the three (embedding is clamped into [0,1])
emb_clamped = max(0.0, min(1.0, emb_sim))
combined = (emb_clamped + ph_sim + im_sim) / 3.0
combined_results.append((path, combined, emb_clamped, ph_sim, im_sim))
# -----------------------
# Sort results
# -----------------------
embedding_results.sort(key=lambda x: x[1], reverse=True)
phash_results_sorted = sorted(phash_results, key=lambda x: (x[2] is not None, x[2]), reverse=True)
imgmatch_results_sorted = sorted(imgmatch_results, key=lambda x: (x[2] is not None, x[2]), reverse=True)
combined_results.sort(key=lambda x: x[1], reverse=True)
# -----------------------
# Print Top-K results
# -----------------------
print("\nTop results by DINOv2 Embeddings:")
for i, (path, score) in enumerate(embedding_results[:k], start=1):
print(f"Rank {i}: {path} | Cosine: {score:.4f}")
print("\nTop results by PHash (Hamming distance & normalized sim):")
for i, (path, hd, sim) in enumerate(phash_results_sorted[:k], start=1):
print(f"Rank {i}: {path} | Hamming: {hd} | NormSim: {sim:.4f}")
print("\nTop results by ImageSignature (normalized similarity = 1 - distance):")
for i, (path, dist, sim) in enumerate(imgmatch_results_sorted[:k], start=1):
print(f"Rank {i}: {path} | NormDist: {dist} | NormSim: {sim:.4f}")
print("\nTop results by Combined Score (avg of embedding|phash|image-match):")
for i, (path, combined, emb_clamped, ph_sim, im_sim) in enumerate(combined_results[:k], start=1):
print(f"Rank {i}: {path} | Combined: {combined:.4f} | emb: {emb_clamped:.4f} | phash_sim: {ph_sim:.4f} | imgmatch_sim: {im_sim:.4f}")
print("\nSearch complete.")
# Return sorted lists for programmatic consumption
return embedding_results, phash_results_sorted, imgmatch_results_sorted, combined_results
# --------------------------
# Choose best candidate helper
# --------------------------
from collections import defaultdict
import math
def choose_top_candidates(embedding_results, phash_results, imgmatch_results, top_k=10,
                          method_weights=(0.5, 0.3, 0.2), verbose=True):
    """
    Combine three similarity rankings and report the best candidates.

    Args:
        embedding_results: list of (path, emb_sim) where emb_sim is roughly in [-1,1]
            (negatives are clamped to 0 before normalisation).
        phash_results: list of (path, hamming, ph_sim) where ph_sim is in [0,1].
        imgmatch_results: list of (path, dist, im_sim) where im_sim is in [0,1].
        top_k: number of entries kept in each "top" list.
        method_weights: weights for (emb, phash, imgmatch) in the weighted average.
        verbose: print the rankings when True.

    Returns:
        dict with the min-max normalised maps ("emb_norm", "ph_norm", "im_norm"),
        the top-k lists of three fusion methods ("weighted_topk", "rank_sum_topk",
        "harmonic_topk"), the candidates present in the top-k of every metric
        ("consensus_topk", sorted by path for a deterministic order) and the
        full score maps of each method.

    Missing and NaN scores always normalise to 0.0, including when all valid
    scores of a metric are identical.
    """
    # Build dicts for quick lookup; embedding scores are clamped at 0 ("no similarity").
    emb_map = {p: max(0.0, float(s)) for p, s in embedding_results}
    ph_map = {p: float(sim) for p, _, sim in phash_results}
    im_map = {p: float(sim) for p, _, sim in imgmatch_results}

    # Universe of candidates (union)
    all_paths = sorted(set(emb_map) | set(ph_map) | set(im_map))

    def is_valid(value):
        return value is not None and not math.isnan(value)

    # --- Normalize each metric across candidates to [0,1] ---
    def normalize_map(m):
        present = [m[p] for p in all_paths if p in m and is_valid(m[p])]
        if not present:
            return {p: 0.0 for p in all_paths}
        vmin, vmax = min(present), max(present)
        constant = vmax == vmin
        norm = {}
        for p in all_paths:
            v = m.get(p)
            if not is_valid(v):
                # missing or NaN -> no evidence for this candidate
                norm[p] = 0.0
            elif constant:
                # every valid value is the same -> all valid candidates tie at 1.0
                norm[p] = 1.0
            else:
                scaled = (v - vmin) / (vmax - vmin)
                if scaled < 0:
                    scaled = 0.0
                if scaled > 1:
                    scaled = 1.0
                norm[p] = scaled
        return norm

    emb_norm = normalize_map(emb_map)
    ph_norm = normalize_map(ph_map)
    im_norm = normalize_map(im_map)

    # --- Method A: Normalized weighted average ---
    w_emb, w_ph, w_im = method_weights
    weighted_scores = {
        p: w_emb * emb_norm[p] + w_ph * ph_norm[p] + w_im * im_norm[p]
        for p in all_paths
    }
    top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

    # --- Method B: Rank-sum (Borda) ---
    def ranks_from_map(m_norm):
        # bigger score => better (smaller) 1-based rank; ties keep path order
        ordered = sorted(m_norm.items(), key=lambda x: x[1], reverse=True)
        return {p: position for position, (p, _) in enumerate(ordered, start=1)}

    rank_emb = ranks_from_map(emb_norm)
    rank_ph = ranks_from_map(ph_norm)
    rank_im = ranks_from_map(im_norm)
    rank_sum = {p: rank_emb[p] + rank_ph[p] + rank_im[p] for p in all_paths}
    top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k]  # smaller is better

    # --- Method C: Harmonic mean of the normalized scores ---
    # A zero on any metric makes the harmonic mean zero, which penalises
    # candidates that are missing or weak on one signal.
    harm_scores = {}
    for p in all_paths:
        a, b, c = emb_norm[p], ph_norm[p], im_norm[p]
        if a == 0 or b == 0 or c == 0:
            harm_scores[p] = 0.0
        else:
            harm_scores[p] = 3.0 / ((1.0 / a) + (1.0 / b) + (1.0 / c))
    top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

    # --- Consensus set: items that appear in top-K of each metric individually ---
    def topk_set_by_map(m_norm, k=top_k):
        return {p for p, _ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]}

    cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k)

    # Build readable outputs
    result = {
        "emb_norm": emb_norm,
        "ph_norm": ph_norm,
        "im_norm": im_norm,
        "weighted_topk": top_weighted,
        "rank_sum_topk": top_rank_sum,
        "harmonic_topk": top_harm,
        "consensus_topk": sorted(cons_set),
        "weighted_scores_full": weighted_scores,
        "rank_sum_full": rank_sum,
        "harmonic_full": harm_scores
    }
    if verbose:
        print("\nTop by Weighted Normalized Average (weights emb,ph,img = {:.2f},{:.2f},{:.2f}):".format(w_emb, w_ph, w_im))
        for i, (p, s) in enumerate(result["weighted_topk"], start=1):
            print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}")
        print("\nTop by Rank-sum (lower is better):")
        for i, (p, s) in enumerate(result["rank_sum_topk"], start=1):
            print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}")
        print("\nTop by Harmonic mean (requires non-zero on all metrics):")
        for i, (p, s) in enumerate(result["harmonic_topk"], start=1):
            print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}")
        print("\nConsensus (in top-{0} of ALL metrics): {1}".format(top_k, result["consensus_topk"]))
    return result
def is_subpath(path: str, base: str) -> bool:
    """Tell whether ``path`` lies inside ``base`` (or is ``base`` itself).

    Both paths are made absolute and normalised first; on Windows the
    comparison is case-insensitive. Any error while comparing (for example
    paths on different drives) yields False.
    """
    def canonical(raw: str) -> str:
        cleaned = os.path.normpath(os.path.abspath(raw))
        return cleaned.lower() if os.name == "nt" else cleaned

    try:
        candidate = canonical(path)
        root = canonical(base)
        return os.path.commonpath([candidate, root]) == root
    except Exception:
        return False
# Helper function to load the block catalog from a JSON file
def _load_block_catalog(block_type: str) -> Dict:
    """
    Loads the Scratch block catalog stored as '<BLOCKS_DIR>/{block_type}.json'.

    Args:
        block_type: file stem of the catalog (the '.json' suffix is appended here).

    Returns:
        The parsed catalog, or {} on any error (missing file, malformed JSON,
        or any other failure). Every failure is logged.
    """
    catalog_path = BLOCKS_DIR / f"{block_type}.json"
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding.
        text = catalog_path.read_text(encoding="utf-8")  # raises FileNotFoundError if missing
        catalog = json.loads(text)  # raises JSONDecodeError if malformed
        logger.info(f"Successfully loaded block catalog from {catalog_path}")
        return catalog
    except FileNotFoundError:
        logger.error(f"Error: Block catalog file not found at {catalog_path}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {catalog_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error loading {catalog_path}: {e}")
    # Every error path ends here: honour the documented contract and return an
    # empty catalog rather than falling through with None.
    return {}
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None:
"""
Search a single catalog (with keys "description" and "blocks": List[dict])
for a block whose 'op_code' matches the given opcode.
Returns the block dict or None if not found.
"""
for block in catalog_data["blocks"]:
if block.get("op_code") == opcode: return block
return None
# Helper function to find a block in all catalogs by opcode
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
"""
Search across multiple catalogs for a given opcode.
Returns the first matching block dict or None.
"""
for catalog in all_catalogs:
blk = get_block_by_opcode(catalog, opcode)
if blk is not None: return blk
return None
def variable_intialization(project_data):
    """
    Updates variable and broadcast definitions in a Scratch project JSON,
    populating the 'variables' and 'broadcasts' sections of the Stage target
    and extracting initial values for variables.

    Every target's blocks are walked recursively:
      * a "data_setvariableto" block registers its variable with the value
        taken from its VALUE input ("" when no value can be extracted,
        including when the block has no VALUE input at all);
      * any 3-item list starting with 12 (a variable reference) registers the
        variable with an empty value unless its id is already known;
      * BROADCAST_INPUT / BROADCAST_OPTION entries register broadcasts.

    Args: project_data (dict): The loaded JSON data of the Scratch project.
    Returns: dict: The same project dict, updated in place. Returned unchanged
        (after printing an error) when the project has no Stage target.
    """
    stage_target = next(
        (target for target in project_data['targets'] if target.get('isStage')),
        None,
    )
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data

    # Ensure 'variables' and 'broadcasts' exist in the Stage target
    variables = stage_target.setdefault("variables", {})
    broadcasts = stage_target.setdefault("broadcasts", {})

    def extract_initial_value(value_input):
        """Return the literal carried by a VALUE input, or "" if there is none."""
        # A missing or too-short input used to raise on value_input[1].
        if not isinstance(value_input, list) or len(value_input) < 2:
            return ""
        primary = value_input[1]
        if isinstance(primary, list) and len(primary) > 1:
            if primary[0] == 10:
                return str(primary[1])
            # Primary slot is a variable reference (12): use the literal (10)
            # in the third slot when there is one.
            if primary[0] == 12 and len(value_input) > 2:
                fallback = value_input[2]
                if isinstance(fallback, list) and len(fallback) > 1 and fallback[0] == 10:
                    return str(fallback[1])
            return ""
        if isinstance(primary, (str, int, float)):
            return str(primary)
        return ""

    # Helper function to recursively find and update variable/broadcast fields
    def process_dict(obj):
        if isinstance(obj, dict):
            # "data_setvariableto" blocks carry the variable's initial value
            if obj.get("opcode") == "data_setvariableto":
                variable_field = (obj.get("fields") or {}).get("VARIABLE")
                value_input = (obj.get("inputs") or {}).get("VALUE")
                if variable_field and isinstance(variable_field, list) and len(variable_field) == 2:
                    var_name, var_id = variable_field
                    variables[var_id] = [var_name, extract_initial_value(value_input)]
            for key, value in obj.items():
                # Broadcast definitions in 'inputs' (BROADCAST_INPUT)
                if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \
                        isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11:
                    broadcasts[value[1][2]] = value[1][1]
                # Broadcast definitions in 'fields' (BROADCAST_OPTION)
                elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2:
                    broadcasts[value[1]] = value[0]
                # Recurse into nested dictionaries or lists
                process_dict(value)
        elif isinstance(obj, list):
            for item in obj:
                # Variable references in 'inputs' (like [12, "score", "id"])
                if isinstance(item, list) and len(item) == 3 and item[0] == 12:
                    var_name, var_id = item[1], item[2]
                    if var_id not in variables:
                        variables[var_id] = [var_name, ""]
                process_dict(item)

    # Iterate through all targets to process their blocks
    for target in project_data['targets']:
        if "blocks" in target:
            for block_data in target["blocks"].values():
                process_dict(block_data)
    return project_data
def deduplicate_variables(project_data):
    """
    Collapse Stage variables that share a name down to a single entry.

    For each name the most recently seen entry wins, except that an entry with
    an empty value never replaces one whose value is non-empty. The surviving
    entry keeps its own variable id.

    Args: project_data (dict): The loaded JSON data of the Scratch project.
    Returns: dict: The same project dict with the Stage's 'variables' rebuilt.
        Returned untouched when there is no Stage target (an error is printed)
        or the Stage has no 'variables' key.
    """
    stage_target = next(
        (target for target in project_data['targets'] if target.get('isStage')),
        None,
    )
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data
    if "variables" not in stage_target:
        return project_data  # nothing to deduplicate

    # name -> (id, value) of the entry currently chosen for that name
    chosen = {}
    for var_id, var_info in stage_target["variables"].items():
        name, value = var_info[0], var_info[1]
        current = chosen.get(name)
        # Replace unless that would swap a non-empty value for an empty one.
        if current is None or value != "" or current[1] == "":
            chosen[name] = (var_id, value)

    stage_target["variables"] = {
        kept_id: [name, kept_value]
        for name, (kept_id, kept_value) in chosen.items()
    }
    return project_data
def variable_adder_main(project_data):
    """
    Declare the project's variables and then deduplicate them.

    Calls variable_intialization() (defined earlier in this file) and feeds
    its result to deduplicate_variables().

    Args: project_data (dict): The loaded JSON data of the Scratch project.
    Returns: dict | None: The processed project JSON, or None when either step
        raised (variable_adder_node treats None as "nothing was added").
    """
    try:
        declare_variable_json = variable_intialization(project_data)
        print("declare_variable_json------->", declare_variable_json)
    except Exception as e:
        print(f"Error error in the variable initialization opcodes: {e}")
        # Without the initialised JSON there is nothing to deduplicate. Stop here
        # instead of tripping over the unbound name in the next step.
        return None
    try:
        processed_json = deduplicate_variables(declare_variable_json)
        print("processed_json------->", processed_json)
        return processed_json
    except Exception as e:
        print(f"Error in the variable deduplication step: {e}")
        return None
# # --- Global variable for the block catalog ---
# ALL_SCRATCH_BLOCKS_CATALOG = {}
# BLOCK_CATALOG_PATH = "blocks" # Define the path to your JSON file
# HAT_BLOCKS_PATH = "hat_blocks" # Path to the hat blocks JSON file
# STACK_BLOCKS_PATH = "stack_blocks" # Path to the stack blocks JSON file
# REPORTER_BLOCKS_PATH = "reporter_blocks" # Path to the reporter blocks JSON file
# BOOLEAN_BLOCKS_PATH = "boolean_blocks" # Path to the boolean blocks JSON file
# C_BLOCKS_PATH = "c_blocks" # Path to the C blocks JSON file
# CAP_BLOCKS_PATH = "cap_blocks" # Path to the cap blocks JSON file
# # Load the block catalogs from their respective JSON files
# hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
# hat_description = hat_block_data["description"]
# #hat_description = hat_block_data.get("description", "No description available")
# # hat_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in hat_block_data["blocks"]])
# hat_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in hat_block_data.get("blocks", [])
# ]) if isinstance(hat_block_data.get("blocks"), list) else " No blocks information available."
# #hat_opcodes_functionalities = os.path.join(BLOCKS_DIR, "hat_blocks.txt")
# print("Hat blocks loaded successfully.", hat_description)
# boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
# boolean_description = boolean_block_data["description"]
# # boolean_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in boolean_block_data["blocks"]])
# boolean_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in boolean_block_data.get("blocks", [])
# ]) if isinstance(boolean_block_data.get("blocks"), list) else " No blocks information available."
# #boolean_opcodes_functionalities = os.path.join(BLOCKS_DIR, "boolean_blocks.txt")
# c_block_data = _load_block_catalog(C_BLOCKS_PATH)
# c_description = c_block_data["description"]
# # c_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in c_block_data["blocks"]])
# c_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in c_block_data.get("blocks", [])
# ]) if isinstance(c_block_data.get("blocks"), list) else " No blocks information available."
# #c_opcodes_functionalities = os.path.join(BLOCKS_DIR, "c_blocks.txt")
# cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
# cap_description = cap_block_data["description"]
# # cap_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in cap_block_data["blocks"]])
# cap_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in cap_block_data.get("blocks", [])
# ]) if isinstance(cap_block_data.get("blocks"), list) else " No blocks information available."
# #cap_opcodes_functionalities = os.path.join(BLOCKS_DIR, "cap_blocks.txt")
# reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
# reporter_description = reporter_block_data["description"]
# # reporter_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in reporter_block_data["blocks"]])
# reporter_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in reporter_block_data.get("blocks", [])
# ]) if isinstance(reporter_block_data.get("blocks"), list) else " No blocks information available."
# #reporter_opcodes_functionalities = os.path.join(BLOCKS_DIR, "reporter_blocks.txt")
# stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
# stack_description = stack_block_data["description"]
# # stack_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in stack_block_data["blocks"]])
# stack_opcodes_functionalities = "\n".join([
# # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
# for block in stack_block_data.get("blocks", [])
# ]) if isinstance(stack_block_data.get("blocks"), list) else " No blocks information available."
# #stack_opcodes_functionalities = os.path.join(BLOCKS_DIR, "stack_blocks.txt")
# # This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
# ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
def extract_json_from_llm_response(raw_response: str) -> dict:
    """
    Pull a JSON object out of a raw LLM reply and parse it.

    Two strategies are tried in order:
      1. the contents of the first fenced code block (optionally tagged json);
      2. the text between the first '{' and the last '}' of the whole reply.

    Raises:
        json.JSONDecodeError: when neither strategy yields parseable JSON, so
            the caller can hand the reply to the corrector agent.
    """
    logger.debug("Attempting to extract JSON from raw LLM response...")

    # Strategy 1: fenced markdown block.
    fenced = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response)
    if fenced is not None:
        logger.debug("Found JSON inside a markdown block.")
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError as e:
            # Not fatal: the brace scan below gets a chance.
            logger.warning(f"Failed to parse JSON from markdown block: {e}")

    # Strategy 2: outermost braces of the full reply.
    logger.debug("Markdown block not found or failed. Searching for outermost braces.")
    try:
        start = raw_response.find('{')
        stop = raw_response.rfind('}')
        if start == -1 or stop == -1 or start >= stop:
            logger.error("Could not find a valid JSON structure (outermost braces).")
            raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0)
        return json.loads(raw_response[start : stop + 1])
    except json.JSONDecodeError as e:
        logger.error(f"Final JSON parsing attempt failed: {e}")
        # Propagate so the calling logic can invoke the corrector agent.
        raise
def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str:
    """
    Re-encode a base64 image as JPEG at the highest quality that fits max_kb.

    Args:
        clean_b64_str: Base64 image payload WITHOUT a ``data:`` prefix. Whitespace
            is removed and missing ``=`` padding is restored before decoding.
        max_kb: Upper bound for the encoded JPEG, measured as bytes / 1024.

    Returns:
        Base64 string (no ``data:`` prefix) of the JPEG. Never None. It may still
        be larger than max_kb when even quality 20 does not fit.

    Raises:
        ValueError: if the input is not valid base64 or cannot be opened as an image.
    """
    def _encode_jpeg(image, quality):
        # Some PIL builds raise OSError with optimize=True; retry without it.
        buf = io.BytesIO()
        try:
            image.save(buf, format="JPEG", quality=quality, optimize=True)
        except OSError:
            buf = io.BytesIO()
            image.save(buf, format="JPEG", quality=quality)
        return buf.getvalue()

    # sanitize
    clean = re.sub(r"\s+", "", clean_b64_str).strip()
    # fix padding
    missing = len(clean) % 4
    if missing:
        clean += "=" * (4 - missing)
    try:
        image_data = base64.b64decode(clean)
    except Exception as e:
        raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e
    try:
        img = Image.open(io.BytesIO(image_data))
        img.load()
    except Exception as e:
        raise ValueError("Could not open image from base64") from e

    # JPEG has no alpha channel: flatten transparent images onto white.
    has_alpha = img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info)
    if has_alpha:
        # Convert to RGBA first so the last band really is alpha. For a palette
        # image the last band of split() is the palette index, which is not a
        # valid transparency mask.
        rgba = img.convert("RGBA")
        background = Image.new("RGB", rgba.size, (255, 255, 255))
        background.paste(rgba, mask=rgba.split()[-1])
        img = background
    elif img.mode != "RGB":
        img = img.convert("RGB")

    # Binary search over JPEG quality for the best encoding that fits.
    low, high = 20, 95
    best_bytes = None
    while low <= high:
        mid = (low + high) // 2
        encoded = _encode_jpeg(img, mid)
        if len(encoded) / 1024.0 <= max_kb:
            best_bytes = encoded
            low = mid + 1
        else:
            high = mid - 1

    # Nothing fit: fall back to the smallest encoding we are willing to produce.
    if best_bytes is None:
        best_bytes = _encode_jpeg(img, 20)
    return base64.b64encode(best_bytes).decode("utf-8")
def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str:
    """
    Normalise an image into a ``data:<mime>;base64,<payload>`` URI for the model.

    Args:
        raw_b64: One of
            - a data URI ``data:image/png;base64,...``
            - a plain base64 string (assumed to be PNG)
            - a PIL Image (encoded as JPEG)
            - a list of the above (only the first element is used)
        max_bytes_threshold: Maximum length, in bytes, of the base64 payload.
            Larger payloads are re-encoded as JPEG via reduce_image_size_to_limit().

    Returns:
        A data URI string, or "" when the input is empty.

    Raises:
        TypeError: if the input is neither a string nor a PIL Image.
        ValueError: propagated from reduce_image_size_to_limit() for undecodable input.
    """
    # normalize input
    if not raw_b64:
        return ""
    if isinstance(raw_b64, list):
        raw_b64 = raw_b64[0] if raw_b64 else ""
    if not raw_b64:
        return ""
    if isinstance(raw_b64, Image.Image):
        buf = io.BytesIO()
        # convert to RGB and save as JPEG to keep consistent
        img = raw_b64.convert("RGB")
        img.save(buf, format="JPEG")
        clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        mime = "image/jpeg"
        return f"data:{mime};base64,{clean_b64}"
    if not isinstance(raw_b64, str):
        raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}")

    # detect mime if present; otherwise default to png
    m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL)
    if m:
        mime = m.group(1)
        clean_b64 = m.group(2)
    else:
        # no prefix; assume png by default
        mime = "image/png"
        clean_b64 = raw_b64

    # sanitize base64 string
    clean_b64 = re.sub(r"\s+", "", clean_b64).strip()
    missing = len(clean_b64) % 4
    if missing:
        clean_b64 += "=" * (4 - missing)

    original_size_bytes = len(clean_b64.encode("utf-8"))
    # debug print
    print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}")

    if original_size_bytes > max_bytes_threshold:
        # The threshold applies to the base64 text, which is 4/3 the size of the
        # binary JPEG. Budget the JPEG at 3/4 of the threshold so the re-encoded
        # payload actually ends up under it.
        jpeg_budget_kb = max(1, (max_bytes_threshold * 3 // 4) // 1024)
        # reduced_clean is plain base64 (no prefix)
        reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=jpeg_budget_kb)
        print(f"Reduced base64 size (bytes): {len(reduced_clean)}, mime: image/jpeg")
        return f"data:image/jpeg;base64,{reduced_clean}"

    # otherwise return original with its mime prefix (ensure prefix exists)
    return f"data:{mime};base64,{clean_b64}"
# Opcodes accepted by validate_and_fix_opcodes(); anything else is mapped to
# its closest entry here or dropped.
SCRATCH_OPCODES = [
    'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto',
    'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection',
    'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby',
    'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs',
    'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto',
    'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait',
    'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby',
    'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide',
    'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone',
    'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto',
    'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until',
    'control_stop', 'control_create_clone_of', 'control_delete_this_clone',
    'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist',
    'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable',
    'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer',
    'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals',
    'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains',
    'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor',
    'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat',
    'control_forever', 'control_if', 'control_if_else', 'control_repeat_until',
    'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername',
    'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto',
    'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness',
    'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000',
    'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply',
    'operator_divide', 'operator_random', 'operator_join', 'operator_letterof',
    'operator_length', 'operator_mod', 'operator_round', 'operator_mathop',
    'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist',
    'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed',
    'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan',
    'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition'
]


def validate_and_fix_opcodes(opcode_counts):
    """
    Normalise a list of {"opcode", "count"} entries against SCRATCH_OPCODES.

    Unknown opcodes are replaced by their closest match (difflib, cutoff 0.6,
    case-sensitive). Entries with no close match, or whose opcode is missing or
    not a string, are skipped. Counts of entries that end up with the same
    opcode are summed.

    Args:
        opcode_counts: iterable of dicts with an "opcode" key and an optional
            "count" key (defaults to 1).

    Returns:
        list[dict]: one {"opcode", "count"} dict per distinct valid opcode, in
        order of first appearance.
    """
    totals = {}
    for item in opcode_counts:
        opcode = item.get("opcode")
        count = item.get("count", 1)
        if not isinstance(opcode, str):
            # get_close_matches() raises TypeError on non-strings; treat a
            # missing opcode like any other unrecognisable one.
            print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.")
            continue
        if opcode not in SCRATCH_OPCODES:
            # Find closest match (case-sensitive)
            match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6)
            if not match:
                print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.")
                continue
            print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'")
            opcode = match[0]
        # Merge duplicates (including ones created by the correction above).
        totals[opcode] = totals.get(opcode, 0) + count
    return [{"opcode": k, "count": v} for k, v in totals.items()]
def format_scratch_pseudo_code(code_string):
    """
    Re-indent Scratch pseudo-code according to its block structure.

    Lines whose first word is 'when', 'forever', 'if' or 'repeat' open a block
    (the following lines are indented one level deeper); 'end' closes a block;
    'else' is aligned with its 'if' and leaves the nesting level unchanged.
    Keywords are recognised only as the first word of a line, so block text
    such as 'if on edge, bounce' or 'say (friend)' does not affect nesting.
    Blank lines are dropped.

    NOTE(review): each level is one space here, while the LLM prompt in this
    file asks for 4-space indentation. Confirm which one block_builder expects.

    Args:
        code_string (str): A string containing Scratch pseudo-code with
            potentially inconsistent indentation.
    Returns:
        str: The correctly formatted and indented pseudo-code string.
    """
    indent_unit = ' '
    # First words that increase indentation for the NEXT line
    opening_keywords = ('when', 'forever', 'if', 'repeat')
    formatted_lines = []
    indent_level = 0
    for line in code_string.strip().split('\n'):
        stripped_line = line.strip()
        if not stripped_line:
            continue
        leading_word = re.match(r'[A-Za-z]+', stripped_line)
        keyword = leading_word.group(0) if leading_word else ''
        if keyword == 'else':
            # 'else' sits at the level of its 'if'; the body after it keeps
            # the current level, so indent_level is not touched.
            formatted_lines.append(indent_unit * max(0, indent_level - 1) + stripped_line)
            continue
        if keyword == 'end':
            # Close the block before emitting the 'end' line itself.
            indent_level = max(0, indent_level - 1)
        formatted_lines.append(indent_unit * indent_level + stripped_line)
        # 'if on edge, bounce' is a plain stack block, not a C-block.
        is_edge_bounce = stripped_line.lower().startswith('if on edge')
        if keyword in opening_keywords and not is_edge_bounce:
            indent_level += 1
    return '\n'.join(formatted_lines)
# Node 1: pseudo-code generator node (turns the current page image into pseudocode JSON)
def pseudo_generator_node(state: GameState):
    """
    Ask the vision LLM to transcribe the current page image into pseudocode.

    Reads state["project_image"][state["page_count"]], sends it together with a
    formatting prompt to `agent`, and parses the JSON reply. If the reply is not
    parseable JSON, `agent_json_resolver` is asked once to repair it. If that
    also fails, a "No Code-blocks" payload with an empty target name is used so
    the node still returns a well-formed result instead of failing on an
    unbound variable.

    Writes:
        state["pseudo_code"]: the parsed {"refined_logic": {...}} dict.
        state["temp_pseudo_code"]: the same dict appended to the running list.

    Returns:
        The updated state.
    """
    logger.info("--- Running plan_logic_aligner_node ---")
    image = state.get("project_image", "")
    project_json = state["project_json"]
    cnt = state["page_count"]
    print(f"The page number recived at the pseudo_generator node:-----> {cnt}")
    sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")]
    # Get costumes separately for Stage and Sprites
    stage_costumes = [
        c["name"]
        for t in project_json["targets"] if t.get("isStage")
        for c in t.get("costumes", [])
    ]
    refinement_prompt = f"""
You are an expert Scratch 3.0 programmer. Your task is to analyze an image of Scratch code blocks and convert it into a structured JSON object containing precise pseudocode.
---
## CONTEXT
- **Available Sprites:** {', '.join(sprite_names)}
- **Available Stage Costumes:** {', '.join(stage_costumes)}
---
## INSTRUCTIONS
1. **Identify the Target:** Find the text "Script for:" in the image to determine the target sprite or stage.
2. **Apply Stage Rule:** If the identified target name exactly matches any name in the `Available Stage Costumes` list, you MUST set the output `name_variable` to `"Stage"`. Otherwise, use the identified target name.
3. **Handle No Code:** If no Scratch blocks are visible in the image, return the specified "No Code-blocks" JSON format.
4. **Generate Pseudocode:** If blocks are present, convert them to pseudocode according to the rules below.
5. **Output ONLY JSON:** Your entire response must be a single, valid JSON object inside a ```json code block and nothing else.
---
## PSEUDOCODE FORMATTING RULES
- **Numbers & Text:** Enclose in parentheses. `(10)`, `(-50)`, `(hello)`.
- **Variables & Dropdowns:** Enclose in square brackets with ` v`. `[score v]`, `[space v]`.
- **Reporter Blocks:** Enclose in double parentheses. `((x position))`.
- **Boolean Conditions:** Enclose in angle brackets. `<((score)) > (50)>`, `<not <touching [edge v]?>>`.
- **Specific Block Exceptions:** Self-contained blocks like `if on edge, bounce`, `next costume`, and `hide` should be written as-is, without any parentheses or brackets.
- **Line Breaks:** Use `\n` to separate each block onto a new line. The entire pseudocode must be a single JSON string.
- **Indentation:** Use **4 spaces** to indent blocks nested inside C-Blocks (like `if`, `if else`, `repeat`, `forever`).
- **Termination:**
- **Every script** (starting with a hat block) MUST conclude with `end`.
- **Every C-Block** (`if`, `repeat`, `forever`) MUST also have its own corresponding `end` at the correct indentation level. This is critical.
---
## REQUIRED JSON FORMAT
If code blocks are found:
```json
{{
"refined_logic": {{
"name_variable": "Name_Identified_From_Instructions",
"pseudocode": "Your fully formatted pseudocode as a single string with \\n newlines."
}}
}}
````
If no code blocks are found:
```json
{{
"refined_logic": {{
"name_variable": "Name_Identified_From_Instructions",
"pseudocode": "No Code-blocks"
}}
}}
```
-----
## EXAMPLES
**Example 1: Looping and Conditionals**
```
when green flag clicked
go to x: (240) y: (-100)
set [speed v] to (-5)
forever
change x by ([speed v])
if <((x position)) < (-240)> then
go to x: (240) y: (-100)
end
end
end
```
**Example 2: Events and Broadcasting**
```
when I receive [Game Over v]
if <((score)) > (([High Score v]))> then
set [High Score v] to ([score v])
end
switch backdrop to [Game Over v]
end
```
"""
    image_input = {
        "type": "image_url",
        "image_url": {
            # "url": f"data:image/png;base64,{image}"
            "url": clean_base64_for_model(image[cnt])
        }
    }
    content = [
        {"type": "text", "text": refinement_prompt},
        image_input
    ]
    # Stays None when the failure happens before the model reply is available.
    llm_output_raw = None
    try:
        # Invoke the main agent for logic refinement and relationship identification
        response = agent.invoke({"messages": [{"role": "user", "content": content}]})
        llm_output_raw = response["messages"][-1].content.strip()
        print(f"llm_output_raw: {response}")
        parsed_llm_output = extract_json_from_llm_response(llm_output_raw)
        result = parsed_llm_output
        print(f"result:\n\n {result}")
    except json.JSONDecodeError as error_json:
        raw_for_correction = llm_output_raw if llm_output_raw is not None else 'No response available'
        correction_prompt = f"""
Fix this malformed response and return only the corrected JSON:
Input: {raw_for_correction}
Extract the sprite name and pseudocode, then return in this exact format:
{{
"refined_logic": {{
"name_variable": "sprite_name",
"pseudocode": "pseudocode_here"
}}
}}
"""
        try:
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content)
            result = corrected_output
            print(f"result:\n\n {result}")
        except Exception as e_corr:
            logger.error(f"Failed to correct JSON output for even after retry: {e_corr}")
            # Neither attempt produced usable JSON. Fall back to the prompt's own
            # "no code" payload so `result` is always bound below.
            result = {"refined_logic": {"name_variable": "", "pseudocode": "No Code-blocks"}}
    # Update the original action_plan in the state with the refined version
    state["pseudo_code"] = result
    state["temp_pseudo_code"] += [result]
    Data = state["temp_pseudo_code"]
    print(f"[OVREALL REFINED PSEUDO CODE LOGIC]: {result}")
    print(f"[OVREALL LISTS OF LOGICS]: {Data}")
    logger.info("Plan refinement and block relation analysis completed for all plans.")
    return state
# Node2: Node Optimizer node
def node_optimizer(state: GameState):
    """
    Turn the page's pseudocode into an action plan annotated with opcode counts.

    Steps:
      1. Run separate_scripts() over state["pseudo_code"]["refined_logic"]["pseudocode"]
         and store the result back in place.
      2. Build state["action_plan"] with transform_logic_to_action_flow().
      3. For every plan of every sprite, attach "opcode_counts" computed by
         analyze_opcode_counts() from the plan's "logic", and store the rebuilt
         per-sprite mapping as state["action_plan"] when it is non-empty.

    Returns:
        The state. On failure the error is logged and the state is still
        returned, like the other graph nodes in this file, rather than None.
    """
    logger.info("--- Running Node Optimizer Node ---")
    raw = state.get("pseudo_code", {})
    refined_logic_data = raw.get("refined_logic", {})
    pseudo = refined_logic_data.get("pseudocode", "")
    try:
        refined_logic_data["pseudocode"] = separate_scripts(str(pseudo))
        # Write the separated scripts back so later nodes see them.
        state["pseudo_code"]["refined_logic"] = refined_logic_data
        print(f"[The pseudo_code generated here]: { state['pseudo_code']}")
        state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"])
        print(f"[The action plan generated here]: { state['action_plan']}")
        action_flow = state.get("action_plan", {})
        # The plan may or may not be wrapped in an "action_overall_flow" key.
        if action_flow.get("action_overall_flow", {}) == {}:
            plan_data = action_flow.items()
        else:
            plan_data = action_flow.get("action_overall_flow", {}).items()
        refined_flow: Dict[str, Any] = {}
        for sprite, sprite_data in plan_data:
            refined_plans = []
            for plan in sprite_data.get("plans", []):
                logic = plan.get("logic", "")
                plan["opcode_counts"] = analyze_opcode_counts(str(logic))
                refined_plans.append(plan)
            refined_flow[sprite] = {
                "description": sprite_data.get("description", ""),
                "plans": refined_plans
            }
        if refined_flow:
            state["action_plan"] = refined_flow
        logger.info("Node Optimization completed.")
        return state
    except Exception as e:
        logger.error(f"Error in Node Optimizer Node: {e}")
        return state
# Node 5: block_builder_node
def overall_block_builder_node_2(state: GameState):
    """
    Build Scratch blocks for every plan in state["action_plan"] and merge them
    into the matching target of state["project_json"].

    For each sprite (or the Stage) named in the action plan that exists in the
    project, each plan's "logic" is re-indented with format_scratch_pseudo_code()
    and handed to block_builder() together with its "opcode_counts". Top-level
    blocks get x/y canvas positions (y grows by 150 per script) and the result
    is merged into the target's "blocks" dict. A failure in one plan is logged
    and does not stop the remaining plans.

    Returns:
        The updated state (unchanged when there is no action plan).
    """
    logger.info("--- Running OverallBlockBuilderNode ---")
    print("--- Running OverallBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    # --- Sprite and Stage Target Mapping ---
    # .get() keeps targets without an "isStage" key from raising KeyError;
    # they are treated as sprites, as elsewhere in this file.
    sprite_map = {target["name"]: target for target in targets if not target.get("isStage")}
    stage_target = next((target for target in targets if target.get("isStage")), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target
    action_plan = state.get("action_plan", {})
    print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2))
    if not action_plan:
        logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.")
        return state
    # Initialize offsets for script placement on the Scratch canvas
    script_y_offset = {}
    script_x_offset_per_sprite = dict.fromkeys(sprite_map, 0)
    # This handles potential variations in the action_plan structure.
    if action_plan.get("action_overall_flow", {}) == {}:
        plan_data = action_plan.items()
    else:
        plan_data = action_plan.get("action_overall_flow", {}).items()
    # --- Process each sprite's action plan ---
    for sprite_name, sprite_actions_data in plan_data:
        if sprite_name not in sprite_map:
            continue
        current_sprite_target = sprite_map[sprite_name]
        current_sprite_target.setdefault("blocks", {})
        script_y_offset.setdefault(sprite_name, 0)
        for plan_entry in sprite_actions_data.get("plans", []):
            logic_sequence = str(plan_entry["logic"])
            opcode_counts = plan_entry.get("opcode_counts", {})
            refined_indent_logic = format_scratch_pseudo_code(logic_sequence)
            print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n")
            try:
                generated_blocks = block_builder(opcode_counts, refined_indent_logic)
                # Ensure generated_blocks is a dictionary
                if not isinstance(generated_blocks, dict):
                    logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.")
                    continue  # Skip to next plan_entry if output is not a dictionary
                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]
                # Update block positions for top-level script
                for block_id, block_data in generated_blocks.items():
                    if block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                        script_y_offset[sprite_name] += 150  # Increment for next script
                current_sprite_target["blocks"].update(generated_blocks)
                print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}")
                state["iteration_count"] = 0
                logger.info(f"Action blocks added for sprite '{sprite_name}' by OverallBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
    state["project_json"] = project_json
    return state
# Node 6: variable adder node
def variable_adder_node(state: GameState):
    """
    Run variable_adder_main() on the project and advance to the next page.

    state["project_json"] is replaced by the processed project when one is
    returned, and kept as-is when variable_adder_main() returns None.
    state["page_count"] is incremented in both cases. Any exception is logged
    and re-raised.
    """
    logger.info("--- Running Variable Adder Node ---")
    project_json = state["project_json"]
    try:
        updated_project_json = variable_adder_main(project_json)
        if updated_project_json is None:
            print("Variable adder unable to add any variable inside the project!")
            state["project_json"] = project_json
        else:
            print("Variable added inside the project successfully!")
            state["project_json"] = updated_project_json
        state["page_count"] += 1
    except Exception as e:
        logger.error(f"Error in variable adder node while updating project_json': {e}")
        raise
    return state
# Node 7: layer order correction node
def layer_order_correction(state: GameState):
    """
    Renumber layerOrder so every sprite has a unique value >= 1.

    All non-Stage targets are renumbered 1, 2, 3, ... in the order they appear
    in project_json["targets"], regardless of their previous values. Every Stage
    target is set to layerOrder 0. Errors are logged and the state is
    returned either way.
    """
    logger.info("--- Running Layer Order Correction Node ---")
    try:
        targets = state.get("project_json", {}).get("targets", [])

        # Sprites first: sequential numbering starting at 1.
        sprites = [t for t in targets if not t.get("isStage", False)]
        layer = 0
        for sprite in sprites:
            layer += 1
            previous = sprite.get("layerOrder", None)
            sprite["layerOrder"] = layer
            logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {previous} -> {layer}")

        # The Stage always stays at the bottom.
        for stage in (t for t in targets if t.get("isStage", False)):
            stage["layerOrder"] = 0

        state["project_json"]["targets"] = targets
        logger.info("Layer Order Correction completed successfully.")
    except Exception as e:
        logger.error(f"Error in Layer Order Correction Node: {e}")
    return state
# Node 8: processed page node (decides whether more pages remain)
def processed_page_node(state: GameState):
    """
    Flag whether there are still pages left to process.

    Sets state["processing"] to True while state["page_count"] is smaller than
    the number of entries in state["project_image"], and to False otherwise.
    """
    logger.info("--- Processing the Pages Node ---")
    pages = state.get("project_image", "")
    page_index = state["page_count"]
    print(f"The page processed for page:--------------> {page_index}")
    state["processing"] = bool(page_index < len(pages))
    return state
# def extract_images_from_pdf(pdf_stream: io.BytesIO):
# ''' Extract images from PDF and generate structured sprite JSON '''
# manipulated_json = {}
# img_elements = []
# try:
# if isinstance(pdf_stream, io.BytesIO):
# # use a random ID since there's no filename
# pdf_id = uuid.uuid4().hex
# else:
# pdf_id = os.path.splitext(os.path.basename(pdf_stream))[0]
# try:
# elements = partition_pdf(
# file=pdf_stream,
# strategy="hi_res",
# # strategy="fast",
# extract_image_block_types=["Image"],
# hi_res_model_name="yolox",
# extract_image_block_to_payload=True,
# )
# print(f"ELEMENTS")
# except Exception as e:
# raise RuntimeError(
# f"❌ Failed to extract images from PDF: {str(e)}")
# file_elements = [element.to_dict() for element in elements]
# print(f"========== file elements: \n{file_elements}")
# sprite_count = 1
# for el in file_elements:
# img_b64 = el["metadata"].get("image_base64")
# if not img_b64:
# continue
# manipulated_json[f"Sprite {sprite_count}"] = {
# "base64": el["metadata"]["image_base64"],
# "file-path": pdf_id,
# }
# sprite_count += 1
# return manipulated_json
# except Exception as e:
# raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}")
def extract_images_from_pdf(pdf_stream, output_dir):
    """
    Extract the images of a PDF and return them as a sprite dictionary.

    The PDF is partitioned with unstructured's partition_pdf() (hi_res strategy,
    yolox model), which writes the image blocks to BLOCKS_DIR. Every element
    that reports an image path is read back and base64-encoded.

    NOTE(review): `output_dir` is accepted but never used; images are written to
    BLOCKS_DIR. Confirm whether callers expect them under `output_dir`.

    Args:
        pdf_stream: file-like object passed to partition_pdf(file=...).
        output_dir: currently unused (see note above).

    Returns:
        dict: "Sprite N" -> {"base64", "file-path", "pdf-id", "image-uuid"},
        numbered from 1 in extraction order.

    Raises:
        RuntimeError: wrapping any error raised during extraction or reading;
            the original exception is chained as the cause.
    """
    manipulated_json = {}
    try:
        # One random id shared by every image extracted from this PDF.
        pdf_id = uuid.uuid4().hex
        elements = partition_pdf(
            file=pdf_stream,
            strategy="hi_res",
            extract_image_block_types=["Image"],
            hi_res_model_name="yolox",
            extract_image_block_to_payload=False,
            extract_image_block_output_dir=BLOCKS_DIR,
        )
        file_elements = [element.to_dict() for element in elements]
        sprite_count = 1
        for el in file_elements:
            img_path = el["metadata"].get("image_path")
            # skip if no image_path was returned
            if not img_path:
                continue
            with open(img_path, "rb") as f:
                base_file = base64.b64encode(f.read()).decode("utf-8")
            image_uuid = str(uuid.uuid4())
            manipulated_json[f"Sprite {sprite_count}"] = {
                "base64": base_file,
                "file-path": img_path,
                "pdf-id": pdf_id,
                "image-uuid": image_uuid,
            }
            sprite_count += 1
        return manipulated_json
    except Exception as e:
        raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") from e
def _load_optional_json(path):
    """Best-effort load of an optional JSON side-car file.

    Returns the parsed content, or an empty dict when the file is missing or
    cannot be read/parsed (a warning is printed instead of failing).
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        print(f"⚠ Could not load {path}: {e!r}")
        return {}


def _reference_paths(embedding_json):
    """Return the normalized reference image paths that have an embedding.

    Accepts either a ``{path: embedding-or-dict}`` mapping or a list of
    record dicts; entries without a usable path/embedding are skipped.
    """
    paths = []
    if isinstance(embedding_json, dict):
        for p, emb in embedding_json.items():
            if isinstance(emb, dict):
                vector = emb.get("embedding") or emb.get("embeddings") or emb.get("emb")
                if vector is None:
                    continue
            elif not isinstance(emb, list):
                continue
            paths.append(os.path.normpath(str(p)))
    elif isinstance(embedding_json, list):
        for item in embedding_json:
            if not isinstance(item, dict):
                continue
            p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path")
            emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb")
            if p is None or emb is None:
                continue
            paths.append(os.path.normpath(str(p)))
    return paths


def _clean_sprite_payloads(sprites_data):
    """Return ``(sprite_ids, cleaned_base64)`` for every entry of sprites_data.

    Any ``data:image/...;base64,`` prefix is stripped. Each payload is decoded
    once so that a malformed image fails here, before any matching starts.
    """
    sprite_ids, cleaned = [], []
    for sid, sprite in sprites_data.items():
        raw_b64 = sprite["base64"].split(",")[-1]
        Image.open(BytesIO(base64.b64decode(raw_b64))).convert("RGB")
        sprite_ids.append(sid)
        cleaned.append(raw_b64)
    return sprite_ids, cleaned


def _match_sprites(sprite_ids, sprite_b64_clean, top_k, method_weights=(0.5, 0.3, 0.2)):
    """Pick one reference image per sprite via the hybrid search + re-ranker.

    Returns ``(per_sprite_indices, per_sprite_scores, paths_list)`` where each
    per-sprite entry is a list holding at most one index into ``paths_list``
    (and its weighted score).
    """
    embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json")
    hash_path = os.path.join(BLOCKS_DIR, "phash_data.json")
    signature_path = os.path.join(BLOCKS_DIR, "signature_data.json")

    # The embeddings file is the primary index: a corrupt file is an error,
    # a missing one simply yields no matches.
    embedding_json = {}
    if os.path.exists(embeddings_path):
        with open(embeddings_path, "r", encoding="utf-8") as f:
            embedding_json = json.load(f)
    # Both side-cars are optional; defaulting to {} keeps the search call
    # below valid when a file is absent or unreadable.
    hash_data = _load_optional_json(hash_path)
    sig_data = _load_optional_json(signature_path)

    paths_list = _reference_paths(embedding_json)
    if not paths_list:
        print("⚠ No reference images/embeddings found (this test harness may be running without data)")
        return [[] for _ in sprite_ids], [[] for _ in sprite_ids], []

    # First occurrence wins, matching list.index() semantics.
    first_index = {}
    for idx, ref_path in enumerate(paths_list):
        first_index.setdefault(ref_path, idx)

    per_sprite_final_indices = []
    per_sprite_final_scores = []
    for sprite_id, query_b64 in zip(sprite_ids, sprite_b64_clean):
        embedding_results, phash_results, imgmatch_results, _combined_results = run_query_search_flow(
            query_b64=query_b64,
            processed_dir=BLOCKS_DIR,
            embeddings_dict=embedding_json,
            hash_dict=hash_data,
            signature_obj_map=sig_data,
            gis=gis,
            phash=phash,
            MAX_PHASH_BITS=64,
            k=5,
        )
        rerank_result = choose_top_candidates(
            embedding_results, phash_results, imgmatch_results,
            top_k=top_k, method_weights=method_weights, verbose=True,
        )
        # Prefer the best-weighted candidate all methods agree on; otherwise
        # fall back to the weighted top-1.
        consensus = rerank_result["consensus_topk"]
        if len(consensus) > 0:
            final = max(consensus, key=lambda p: rerank_result["weighted_scores_full"].get(p, 0.0))
        elif rerank_result["weighted_topk"]:
            final = rerank_result["weighted_topk"][0][0]
        else:
            final = None

        idx = first_index.get(final) if final is not None else None
        if idx is None:
            per_sprite_final_indices.append([])
            per_sprite_final_scores.append([])
            continue
        score = rerank_result["weighted_scores_full"].get(final, 0.0)
        per_sprite_final_indices.append([idx])
        per_sprite_final_scores.append([score])
        print(f"Sprite '{sprite_id}' FINAL selected: {final} (index {idx}) score={score:.4f}")
    return per_sprite_final_indices, per_sprite_final_scores, paths_list


def _display_like_windows_no_lead(p):
    """For logs only: render a Path with backslashes and no leading slash."""
    s = p.as_posix()
    if s.startswith("/"):
        s = s[1:]
    return s.replace("/", "\\")


def _is_subpath(child, parent):
    """Return True when ``child`` is ``parent`` or located underneath it."""
    try:
        child.relative_to(parent)
        return True
    except ValueError:
        return False


def _copy_folder_files(folder, dest, matched_filename, json_name, kind, display):
    """Copy every regular file of ``folder`` into ``dest`` (best effort).

    The matched image and the folder's metadata JSON (``json_name``) are
    skipped; failures are logged and do not abort the remaining copies.
    """
    try:
        entries = list(folder.iterdir())
    except OSError as e:
        entries = []
        print(f" ✗ Failed to list files in {display}: {repr(e)}")
    print(f" Files in {kind} folder: {[p.name for p in entries]}")
    for p in entries:
        fname = p.name
        if fname in (matched_filename, json_name):
            print(f" Skipping {fname} (matched image or {json_name})")
            continue
        if not p.is_file():
            print(f" Skipping {fname} (not a file)")
            continue
        dst = dest / fname
        try:
            shutil.copy2(str(p), str(dst))
            print(f" ✓ Copied {kind} asset: {p} -> {dst}")
        except OSError as e:
            print(f" ✗ Failed to copy {kind} asset {p}: {repr(e)}")


def _stage_target(costumes, sounds, current_costume):
    """Build a Scratch 3 Stage target with the given costumes and sounds."""
    return {
        "isStage": True,
        "name": "Stage",
        "objName": "Stage",
        "variables": {},
        "lists": {},
        "broadcasts": {},
        "blocks": {},
        "comments": {},
        "currentCostume": current_costume,
        "costumes": costumes,
        "sounds": sounds,
        "volume": 100,
        "layerOrder": 0,
        "tempo": 60,
        "videoTransparency": 50,
        "videoState": "on",
        "textToSpeechLanguage": None
    }


def _merged_stage_target(backdrop_data):
    """Merge all matched stage targets into a single Stage target.

    Costumes are de-duplicated on ``(name, assetId)`` in first-seen order;
    sounds are taken from the first stage only.
    """
    all_costumes, sounds = [], []
    seen_costumes = set()
    for i, bd in enumerate(backdrop_data):
        for costume in bd.get("costumes", []):
            key = (costume.get("name"), costume.get("assetId"))
            if key not in seen_costumes:
                seen_costumes.add(key)
                all_costumes.append(costume)
        if i == 0:
            sounds = bd.get("sounds", [])
    return _stage_target(all_costumes, sounds, 1 if len(all_costumes) > 1 else 0)


def _default_stage_target(project_folder):
    """Copy the bundled default backdrop + sound and return a Stage using them."""
    default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg"
    default_backdrop_path = BACKDROP_DIR / default_backdrop_name
    # The sound keeps its own md5ext name so it neither overwrites the SVG
    # nor breaks the "md5ext" reference in the stage's sound entry.
    default_backdrop_sound_name = "83a9787d4cb6f3b7632b4ddfebf74367.wav"
    default_backdrop_sound = BACKDROP_DIR / default_backdrop_sound_name
    try:
        shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name))
        logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}")
        shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name))
        logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}")
    except Exception as e:
        logger.error(f"❌ Failed to copy default backdrop: {e}")
    costumes = [
        {
            "assetId": default_backdrop_name.split(".")[0],
            "name": "defaultBackdrop",
            "md5ext": default_backdrop_name,
            "dataFormat": "svg",
            "rotationCenterX": 240,
            "rotationCenterY": 180
        }
    ]
    sounds = [
        {
            "name": "pop",
            "assetId": "83a9787d4cb6f3b7632b4ddfebf74367",
            "dataFormat": "wav",
            "format": "",
            "rate": 48000,
            "sampleCount": 1123,
            "md5ext": default_backdrop_sound_name
        }
    ]
    return _stage_target(costumes, sounds, 0)


def similarity_matching(sprites_data: dict, project_folder: str, top_k: int = 1, min_similarity: float = None) -> str:
    """Match extracted sprite images to reference assets and write project.json.

    Each sprite is matched against the reference index stored in
    ``BLOCKS_DIR``. For every matched reference folder the assets are copied
    into ``project_folder``: sprite folders contribute their ``sprite.json``
    target, backdrop folders contribute the stage targets of their
    ``project.json``. When no backdrop matched, a default stage is used.

    Args:
        sprites_data: Mapping ``sprite id -> {"base64": <image>, ...}``; the
            base64 value may carry a ``data:image/...;base64,`` prefix.
        project_folder: Destination directory (created if missing).
        top_k: Forwarded to the re-ranker as its ``top_k``.
        min_similarity: Accepted for interface compatibility; currently not
            applied to the selection.

    Returns:
        Path of the written ``project.json``.
    """
    print("🔍 Running similarity matching…")
    os.makedirs(project_folder, exist_ok=True)
    project_folder_p = Path(project_folder)
    project_json_path = os.path.join(project_folder, "project.json")

    sprite_ids, sprite_b64_clean = _clean_sprite_payloads(sprites_data)
    per_sprite_matched_indices, _per_sprite_scores, paths_list = _match_sprites(
        sprite_ids, sprite_b64_clean, top_k, method_weights=(0.5, 0.3, 0.2)
    )

    # =========================================
    # Copy matched sprite assets + collect data
    # =========================================
    sprite_base_p = Path(os.path.normpath(str(SPRITE_DIR))).resolve(strict=False)
    backdrop_base_p = Path(os.path.normpath(str(BACKDROP_DIR))).resolve(strict=False)
    project_data = []
    backdrop_data = []
    copied_sprite_folders = set()
    copied_backdrop_folders = set()

    # Unique indices in ascending order, so each reference is handled once.
    matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst})
    print("matched_indices------------------>", matched_indices)
    for matched_idx in matched_indices:
        if not (0 <= matched_idx < len(paths_list)):
            print(f" ⚠ matched_idx {matched_idx} out of range, skipping")
            continue
        matched_image_path = paths_list[matched_idx]
        matched_path_p = Path(matched_image_path).resolve(strict=False)
        matched_folder_p = matched_path_p.parent
        matched_filename = matched_path_p.name
        matched_folder_display = _display_like_windows_no_lead(matched_folder_p)
        print(f"Processing matched image: {matched_image_path}")
        print(f" - Folder: {matched_folder_display}")
        print(f" - Sprite path: {_display_like_windows_no_lead(sprite_base_p)}")
        print(f" - Backdrop path: {_display_like_windows_no_lead(backdrop_base_p)}")
        print(f" - Filename: {matched_filename}")
        # Canonical key so a folder is only copied once per kind.
        folder_key = matched_folder_p.as_posix()

        # ---------- SPRITE ----------
        if _is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders:
            print(f"Processing SPRITE folder: {matched_folder_display}")
            copied_sprite_folders.add(folder_key)
            sprite_json_path = matched_folder_p / "sprite.json"
            print("sprite_json_path----------------------->", sprite_json_path)
            print("copied sprite folder----------------------->", copied_sprite_folders)
            if sprite_json_path.exists() and sprite_json_path.is_file():
                try:
                    with sprite_json_path.open("r", encoding="utf-8") as f:
                        sprite_info = json.load(f)
                    project_data.append(sprite_info)
                    print(f" ✓ Successfully read sprite.json from {matched_folder_display}")
                except Exception as e:
                    print(f" ✗ Failed to read sprite.json in {matched_folder_display}: {repr(e)}")
            else:
                print(f" ⚠ No sprite.json in {matched_folder_display}")
            _copy_folder_files(
                matched_folder_p, project_folder_p, matched_filename,
                "sprite.json", "sprite", matched_folder_display,
            )

        # ---------- BACKDROP ----------
        if _is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders:
            print(f"Processing BACKDROP folder: {matched_folder_display}")
            copied_backdrop_folders.add(folder_key)
            print("backdrop_base_path----------------------->", _display_like_windows_no_lead(backdrop_base_p))
            print("copied backdrop folder----------------------->", copied_backdrop_folders)
            # Unlike sprites, the matched backdrop image itself is copied.
            backdrop_src = matched_folder_p / matched_filename
            backdrop_dst = project_folder_p / matched_filename
            if backdrop_src.exists() and backdrop_src.is_file():
                try:
                    shutil.copy2(str(backdrop_src), str(backdrop_dst))
                    print(f" ✓ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}")
                except Exception as e:
                    print(f" ✗ Failed to copy matched backdrop image {backdrop_src}: {repr(e)}")
            else:
                print(f" ⚠ Matched backdrop source not found: {backdrop_src}")
            _copy_folder_files(
                matched_folder_p, project_folder_p, matched_filename,
                "project.json", "backdrop", matched_folder_display,
            )
            # Collect the stage target(s) declared by the backdrop's project.json.
            pj = matched_folder_p / "project.json"
            if pj.exists() and pj.is_file():
                try:
                    with pj.open("r", encoding="utf-8") as f:
                        bd_json = json.load(f)
                    stage_count = 0
                    for tgt in bd_json.get("targets", []):
                        if tgt.get("isStage"):
                            backdrop_data.append(tgt)
                            stage_count += 1
                    print(f" ✓ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)")
                except Exception as e:
                    print(f" ✗ Failed to read project.json in {matched_folder_display}: {repr(e)}")
            else:
                print(f" ⚠ No project.json in {matched_folder_display}")
        print("---")

    final_project = {
        "targets": [], "monitors": [], "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
        }
    }
    # Sprite targets (non-stage) keep their match order; the stage goes first.
    final_project["targets"].extend(
        spr for spr in project_data if not spr.get("isStage", False)
    )
    if backdrop_data:
        stage_obj = _merged_stage_target(backdrop_data)
    else:
        logger.warning("⚠️ No backdrop matched. Using default static backdrop.")
        stage_obj = _default_stage_target(project_folder)
    final_project["targets"].insert(0, stage_obj)

    with open(project_json_path, "w", encoding="utf-8") as f:
        json.dump(final_project, f, indent=2)
    return project_json_path
# ''' Appends all the lists and paths from the JSON files and picks the best match's path. '''
# def similarity_matching(sprites_data: dict, project_folder: str, top_k: int = 1, min_similarity: float = None) -> str:
# print("🔍 Running similarity matching…")
# import os
# import json
# os.makedirs(project_folder, exist_ok=True)
# backdrop_base_path = os.path.normpath(str(BACKDROP_DIR))
# sprite_base_path = os.path.normpath(str(SPRITE_DIR))
# code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR))
# project_json_path = os.path.join(project_folder, "project.json")
# # -------------------------
# # Build sprite images list (BytesIO) from sprites_data
# # -------------------------
# sprite_ids, sprite_base64 = [], []
# for sid, sprite in sprites_data.items():
# sprite_ids.append(sid)
# sprite_base64.append(sprite["base64"])
# sprite_images_bytes = []
# for b64 in sprite_base64:
# img = Image.open(BytesIO(base64.b64decode(b64.split(",")[-1]))).convert("RGB")
# buffer = BytesIO()
# img.save(buffer, format="PNG")
# buffer.seek(0)
# sprite_images_bytes.append(buffer)
# # -----------------------------------------
# # Hybrid Similarity Matching System
# # -----------------------------------------
# def hybrid_similarity_matching(sprite_images_bytes, sprite_ids,
# min_similarity=None, top_k=5, method_weights=(0.5, 0.3, 0.2)):
# """
# Hybrid similarity matching using DINOv2 embeddings, perceptual hashing, and image signatures
# Args:
# sprite_images_bytes: List of image bytes
# sprite_ids: List of sprite identifiers
# blocks_dir: Directory containing reference blocks
# min_similarity: Minimum similarity threshold
# top_k: Number of top matches to return
# method_weights: Weights for (embedding, phash, image_signature) methods
# Returns:
# per_sprite_matched_indices, per_sprite_scores, paths_list
# """
# import imagehash as phash
# from image_match.goldberg import ImageSignature
# import math
# from collections import defaultdict
# # Load reference data
# embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json")
# hash_path = os.path.join(BLOCKS_DIR, "phash_data.json")
# signature_path = os.path.join(BLOCKS_DIR, "signature_data.json")
# # Load embeddings
# with open(embeddings_path, "r", encoding="utf-8") as f:
# embedding_json = json.load(f)
# # Load phash data (if exists)
# hash_dict = {}
# if os.path.exists(hash_path):
# with open(hash_path, "r", encoding="utf-8") as f:
# hash_data = json.load(f)
# for path, hash_str in hash_data.items():
# try:
# hash_dict[path] = phash.hex_to_hash(hash_str)
# except:
# pass
# # Load signature data (if exists)
# signature_dict = {}
# gis = ImageSignature()
# if os.path.exists(signature_path):
# with open(signature_path, "r", encoding="utf-8") as f:
# sig_data = json.load(f)
# for path, sig_list in sig_data.items():
# try:
# signature_dict[path] = np.array(sig_list)
# except:
# pass
# # Parse embeddings
# paths_list = []
# embeddings_list = []
# if isinstance(embedding_json, dict):
# for p, emb in embedding_json.items():
# if isinstance(emb, dict):
# maybe_emb = emb.get("embedding") or emb.get("embeddings") or emb.get("emb")
# if maybe_emb is None:
# continue
# arr = np.asarray(maybe_emb, dtype=np.float32)
# elif isinstance(emb, list):
# arr = np.asarray(emb, dtype=np.float32)
# else:
# continue
# paths_list.append(os.path.normpath(str(p)))
# embeddings_list.append(arr)
# elif isinstance(embedding_json, list):
# for item in embedding_json:
# if not isinstance(item, dict):
# continue
# p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path")
# emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb")
# if p is None or emb is None:
# continue
# paths_list.append(os.path.normpath(str(p)))
# embeddings_list.append(np.asarray(emb, dtype=np.float32))
# if len(paths_list) == 0:
# raise RuntimeError("No reference images/embeddings found")
# ref_matrix = np.vstack(embeddings_list).astype(np.float32)
# # Process input sprites
# # init_dinov2()
# per_sprite_matched_indices = []
# per_sprite_scores = []
# for i, (sprite_bytes, sprite_id) in enumerate(zip(sprite_images_bytes, sprite_ids)):
# print(f"Processing sprite {i+1}/{len(sprite_ids)}: {sprite_id}")
# # Convert bytes to PIL for processing
# sprite_pil = Image.open(sprite_bytes)
# if sprite_pil is None:
# per_sprite_matched_indices.append([])
# per_sprite_scores.append([])
# continue
# # Enhance image
# enhanced_sprite = process_image_cv2_from_pil(sprite_pil, scale=2)
# if enhanced_sprite is None:
# enhanced_sprite = sprite_pil
# # 1. Compute DINOv2 embedding
# sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite))
# if sprite_emb is None:
# sprite_emb = np.zeros(ref_matrix.shape[1])
# # 2. Compute perceptual hash
# sprite_hash_arr = preprocess_for_hash(enhanced_sprite)
# sprite_phash = None
# if sprite_hash_arr is not None:
# try:
# sprite_phash = phash.encode_image(image_array=sprite_hash_arr)
# except:
# pass
# # 3. Compute image signature
# sprite_sig = None
# try:
# temp_path = f"temp_sprite_{i}.png"
# enhanced_sprite.save(temp_path, format="PNG")
# sprite_sig = gis.generate_signature(temp_path)
# os.remove(temp_path)
# except:
# pass
# # Calculate similarities for all reference images
# embedding_results = []
# phash_results = []
# signature_results = []
# for j, ref_path in enumerate(paths_list):
# # Embedding similarity
# try:
# ref_emb = ref_matrix[j]
# emb_sim = float(np.dot(sprite_emb, ref_emb))
# emb_sim = max(0.0, emb_sim) # Clamp negative values
# except:
# emb_sim = 0.0
# embedding_results.append((ref_path, emb_sim))
# # Phash similarity
# ph_sim = 0.0
# if sprite_phash is not None and ref_path in hash_dict:
# try:
# ref_hash = hash_dict[ref_path]
# hd = phash.hamming_distance(sprite_phash, ref_hash)
# ph_sim = max(0.0, 1.0 - (hd / 64.0)) # Normalize to [0,1]
# except:
# pass
# phash_results.append((ref_path, ph_sim))
# # Signature similarity
# sig_sim = 0.0
# if sprite_sig is not None and ref_path in signature_dict:
# try:
# ref_sig = signature_dict[ref_path]
# dist = gis.normalized_distance(ref_sig, sprite_sig)
# sig_sim = max(0.0, 1.0 - dist)
# except:
# pass
# signature_results.append((ref_path, sig_sim))
# # Combine similarities using weighted approach
# def normalize_scores(scores):
# """Normalize scores to [0,1] range"""
# if not scores:
# return {}
# vals = [s for _, s in scores if not math.isnan(s)]
# if not vals:
# return {p: 0.0 for p, _ in scores}
# vmin, vmax = min(vals), max(vals)
# if vmax == vmin:
# return {p: 1.0 if s == vmax else 0.0 for p, s in scores}
# return {p: (s - vmin) / (vmax - vmin) for p, s in scores}
# # Normalize each method's scores
# emb_norm = normalize_scores(embedding_results)
# ph_norm = normalize_scores(phash_results)
# sig_norm = normalize_scores(signature_results)
# # Calculate weighted combined scores
# w_emb, w_ph, w_sig = method_weights
# combined_scores = []
# for ref_path in paths_list:
# combined_score = (w_emb * emb_norm.get(ref_path, 0.0) +
# w_ph * ph_norm.get(ref_path, 0.0) +
# w_sig * sig_norm.get(ref_path, 0.0))
# combined_scores.append((ref_path, combined_score))
# # Sort by combined score and apply thresholds
# combined_scores.sort(key=lambda x: x[1], reverse=True)
# # Filter by minimum similarity if specified
# if min_similarity is not None:
# combined_scores = [(p, s) for p, s in combined_scores if s >= float(min_similarity)]
# # Get top-k matches
# top_matches = combined_scores[:int(top_k)]
# # Convert to indices and scores
# matched_indices = []
# matched_scores = []
# for ref_path, score in top_matches:
# try:
# idx = paths_list.index(ref_path)
# matched_indices.append(idx)
# matched_scores.append(score)
# except ValueError:
# continue
# per_sprite_matched_indices.append(matched_indices)
# per_sprite_scores.append(matched_scores)
# print(f"Sprite '{sprite_id}' matched {len(matched_indices)} references with scores: {matched_scores}")
# return per_sprite_matched_indices, per_sprite_scores, paths_list
# def choose_top_candidates_advanced(embedding_results, phash_results, imgmatch_results, top_k=10,
# method_weights=(0.5, 0.3, 0.2), verbose=True):
# """
# Advanced candidate selection using multiple ranking methods
# Args:
# embedding_results: list of (path, emb_sim)
# phash_results: list of (path, hamming, ph_sim)
# imgmatch_results: list of (path, dist, im_sim)
# top_k: number of top candidates to return
# method_weights: weights for (emb, phash, imgmatch)
# verbose: whether to print detailed results
# Returns:
# dict with top candidates from different methods and final selection
# """
# import math
# from collections import defaultdict
# # Build dicts for quick lookup
# emb_map = {p: float(s) for p, s in embedding_results}
# ph_map = {p: float(sim) for p, _, sim in phash_results}
# im_map = {p: float(sim) for p, _, sim in imgmatch_results}
# # Universe of candidates (union)
# all_paths = sorted(set(list(emb_map.keys()) + list(ph_map.keys()) + list(im_map.keys())))
# # Normalize each metric across candidates to [0,1]
# def normalize_map(m):
# vals = [m.get(p, None) for p in all_paths]
# present = [v for v in vals if v is not None and not math.isnan(v)]
# if not present:
# return {p: 0.0 for p in all_paths}
# vmin, vmax = min(present), max(present)
# if vmax == vmin:
# return {p: (1.0 if (m.get(p, None) is not None) else 0.0) for p in all_paths}
# norm = {}
# for p in all_paths:
# v = m.get(p, None)
# if v is None or math.isnan(v):
# norm[p] = 0.0
# else:
# norm[p] = max(0.0, min(1.0, (v - vmin) / (vmax - vmin)))
# return norm
# # For embeddings, clamp negatives to 0 first
# emb_map_clamped = {p: max(0.0, v) for p, v in emb_map.items()}
# emb_norm = normalize_map(emb_map_clamped)
# ph_norm = normalize_map(ph_map)
# im_norm = normalize_map(im_map)
# # Method A: Normalized weighted average
# w_emb, w_ph, w_im = method_weights
# weighted_scores = {}
# for p in all_paths:
# weighted_scores[p] = (w_emb * emb_norm.get(p, 0.0)
# + w_ph * ph_norm.get(p, 0.0)
# + w_im * im_norm.get(p, 0.0))
# top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
# # Method B: Rank-sum (Borda)
# def ranks_from_map(m_norm):
# items = sorted(m_norm.items(), key=lambda x: x[1], reverse=True)
# ranks = {}
# for i, (p, _) in enumerate(items):
# ranks[p] = i + 1 # 1-based
# worst = len(items) + 1
# for p in all_paths:
# if p not in ranks:
# ranks[p] = worst
# return ranks
# rank_emb = ranks_from_map(emb_norm)
# rank_ph = ranks_from_map(ph_norm)
# rank_im = ranks_from_map(im_norm)
# rank_sum = {}
# for p in all_paths:
# rank_sum[p] = rank_emb.get(p, 9999) + rank_ph.get(p, 9999) + rank_im.get(p, 9999)
# top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k] # smaller is better
# # Method C: Harmonic mean
# harm_scores = {}
# for p in all_paths:
# a = emb_norm.get(p, 0.0)
# b = ph_norm.get(p, 0.0)
# c = im_norm.get(p, 0.0)
# if a + b + c == 0 or a == 0 or b == 0 or c == 0:
# harm = 0.0
# else:
# harm = 3.0 / ((1.0/a) + (1.0/b) + (1.0/c))
# harm_scores[p] = harm
# top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
# # Consensus set: items in top-K of each metric
# def topk_set_by_map(m_norm, k=top_k):
# return set([p for p,_ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]])
# cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k)
# result = {
# "emb_norm": emb_norm,
# "ph_norm": ph_norm,
# "im_norm": im_norm,
# "weighted_topk": top_weighted,
# "rank_sum_topk": top_rank_sum,
# "harmonic_topk": top_harm,
# "consensus_topk": list(cons_set),
# "weighted_scores_full": weighted_scores,
# "rank_sum_full": rank_sum,
# "harmonic_full": harm_scores
# }
# if verbose:
# print(f"\nTop by Weighted Average (weights emb,ph,img = {w_emb:.2f},{w_ph:.2f},{w_im:.2f}):")
# for i,(p,s) in enumerate(result["weighted_topk"], start=1):
# print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}")
# print("\nTop by Rank-sum (lower is better):")
# for i,(p,s) in enumerate(result["rank_sum_topk"], start=1):
# print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}")
# print("\nTop by Harmonic mean:")
# for i,(p,s) in enumerate(result["harmonic_topk"], start=1):
# print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}")
# print(f"\nConsensus (in top-{top_k} of ALL metrics): {result['consensus_topk']}")
# # Final selection logic
# final = None
# if len(result["consensus_topk"]) > 0:
# # Choose best-weighted among consensus
# consensus = result["consensus_topk"]
# best = max(consensus, key=lambda p: result["weighted_scores_full"].get(p, 0.0))
# final = best
# else:
# final = result["weighted_topk"][0][0] if result["weighted_topk"] else None
# result["final_selection"] = final
# return result
# # Use hybrid matching system
# # BLOCKS_DIR = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks"
# per_sprite_matched_indices, per_sprite_scores, paths_list = hybrid_similarity_matching(
# sprite_images_bytes, sprite_ids, min_similarity, top_k, method_weights=(0.5, 0.3, 0.2)
# )
# # =========================================
# # Copy matched sprite assets + collect data
# # =========================================
# project_data = []
# backdrop_data = []
# copied_sprite_folders = set()
# copied_backdrop_folders = set()
# # Flatten unique matched indices to process copying once per folder
# matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst})
# print("matched_indices------------------>",matched_indices)
# import shutil
# import json
# import os
# from pathlib import Path
# # normalize base paths once before the loop
# sprite_base_p = Path(sprite_base_path).resolve(strict=False)
# backdrop_base_p = Path(backdrop_base_path).resolve(strict=False)
# project_folder_p = Path(project_folder)
# project_folder_p.mkdir(parents=True, exist_ok=True)
# copied_sprite_folders = set()
# copied_backdrop_folders = set()
# def display_like_windows_no_lead(p: Path) -> str:
# """
# For human-readable logs only — convert Path to a string like:
# "app\\blocks\\Backdrops\\Castle 2.sb3" (no leading slash).
# """
# s = p.as_posix() # forward-slash string, safe for Path objects
# if s.startswith("/"):
# s = s[1:]
# return s.replace("/", "\\")
# def is_subpath(child: Path, parent: Path) -> bool:
# """Robust membership test: is child under parent?"""
# try:
# # use non-strict resolve only if needed, but avoid exceptions
# child.relative_to(parent)
# return True
# except Exception:
# return False
# # Flatten unique matched indices (if not already)
# matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst})
# print("matched_indices------------------>", matched_indices)
# for matched_idx in matched_indices:
# # defensive check
# if not (0 <= matched_idx < len(paths_list)):
# print(f" ⚠ matched_idx {matched_idx} out of range, skipping")
# continue
# matched_image_path = paths_list[matched_idx]
# matched_path_p = Path(matched_image_path).resolve(strict=False) # keep as Path
# matched_folder_p = matched_path_p.parent # Path object
# matched_filename = matched_path_p.name
# # Prepare display-only string (do NOT reassign matched_folder_p)
# matched_folder_display = display_like_windows_no_lead(matched_folder_p)
# print(f"Processing matched image: {matched_image_path}")
# print(f" - Folder: {matched_folder_display}")
# print(f" - Sprite path: {display_like_windows_no_lead(sprite_base_p)}")
# print(f" - Backdrop path: {display_like_windows_no_lead(backdrop_base_p)}")
# print(f" - Filename: {matched_filename}")
# # Use a canonical string to store in the copied set (POSIX absolute-ish)
# folder_key = matched_folder_p.as_posix()
# # ---------- SPRITE ----------
# if is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders:
# print(f"Processing SPRITE folder: {matched_folder_display}")
# copied_sprite_folders.add(folder_key)
# sprite_json_path = matched_folder_p / "sprite.json"
# print("sprite_json_path----------------------->", sprite_json_path)
# print("copied sprite folder----------------------->", copied_sprite_folders)
# if sprite_json_path.exists() and sprite_json_path.is_file():
# try:
# with sprite_json_path.open("r", encoding="utf-8") as f:
# sprite_info = json.load(f)
# project_data.append(sprite_info)
# print(f" ✓ Successfully read sprite.json from {matched_folder_display}")
# except Exception as e:
# print(f" ✗ Failed to read sprite.json in {matched_folder_display}: {repr(e)}")
# else:
# print(f" ⚠ No sprite.json in {matched_folder_display}")
# # copy non-matching files from the sprite folder (except matched image and sprite.json)
# try:
# sprite_files = list(matched_folder_p.iterdir())
# except Exception as e:
# sprite_files = []
# print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}")
# print(f" Files in sprite folder: {[p.name for p in sprite_files]}")
# for p in sprite_files:
# fname = p.name
# if fname in (matched_filename, "sprite.json"):
# print(f" Skipping {fname} (matched image or sprite.json)")
# continue
# if p.is_file():
# dst = project_folder_p / fname
# try:
# shutil.copy2(str(p), str(dst))
# print(f" ✓ Copied sprite asset: {p} -> {dst}")
# except Exception as e:
# print(f" ✗ Failed to copy sprite asset {p}: {repr(e)}")
# else:
# print(f" Skipping {fname} (not a file)")
# # ---------- BACKDROP ----------
# if is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders:
# print(f"Processing BACKDROP folder: {matched_folder_display}")
# copied_backdrop_folders.add(folder_key)
# print("backdrop_base_path----------------------->", display_like_windows_no_lead(backdrop_base_p))
# print("copied backdrop folder----------------------->", copied_backdrop_folders)
# # copy matched backdrop image
# backdrop_src = matched_folder_p / matched_filename
# backdrop_dst = project_folder_p / matched_filename
# if backdrop_src.exists() and backdrop_src.is_file():
# try:
# shutil.copy2(str(backdrop_src), str(backdrop_dst))
# print(f" ✓ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}")
# except Exception as e:
# print(f" ✗ Failed to copy matched backdrop image {backdrop_src}: {repr(e)}")
# else:
# print(f" ⚠ Matched backdrop source not found: {backdrop_src}")
# # copy other files from folder (skip project.json and matched image)
# try:
# backdrop_files = list(matched_folder_p.iterdir())
# except Exception as e:
# backdrop_files = []
# print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}")
# print(f" Files in backdrop folder: {[p.name for p in backdrop_files]}")
# for p in backdrop_files:
# fname = p.name
# if fname in (matched_filename, "project.json"):
# print(f" Skipping {fname} (matched image or project.json)")
# continue
# if p.is_file():
# dst = project_folder_p / fname
# try:
# shutil.copy2(str(p), str(dst))
# print(f" ✓ Copied backdrop asset: {p} -> {dst}")
# except Exception as e:
# print(f" ✗ Failed to copy backdrop asset {p}: {repr(e)}")
# else:
# print(f" Skipping {fname} (not a file)")
# # read project.json to extract Stage/targets
# pj = matched_folder_p / "project.json"
# if pj.exists() and pj.is_file():
# try:
# with pj.open("r", encoding="utf-8") as f:
# bd_json = json.load(f)
# stage_count = 0
# for tgt in bd_json.get("targets", []):
# if tgt.get("isStage"):
# backdrop_data.append(tgt)
# stage_count += 1
# print(f" ✓ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)")
# except Exception as e:
# print(f" ✗ Failed to read project.json in {matched_folder_display}: {repr(e)}")
# else:
# print(f" ⚠ No project.json in {matched_folder_display}")
# print("---")
# final_project = {
# "targets": [], "monitors": [], "extensions": [],
# "meta": {
# "semver": "3.0.0",
# "vm": "11.3.0",
# "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
# }
# }
# # Add sprite targets (non-stage)
# for spr in project_data:
# if not spr.get("isStage", False):
# final_project["targets"].append(spr)
# # then backdrop as the Stage
# if backdrop_data:
# all_costumes, sounds = [], []
# seen_costumes = set()
# for i, bd in enumerate(backdrop_data):
# for costume in bd.get("costumes", []):
# # Create a unique key for the costume
# key = (costume.get("name"), costume.get("assetId"))
# if key not in seen_costumes:
# seen_costumes.add(key)
# all_costumes.append(costume)
# if i == 0:
# sounds = bd.get("sounds", [])
# stage_obj={
# "isStage": True,
# "name": "Stage",
# "objName": "Stage",
# "variables": {},
# "lists": {},
# "broadcasts": {},
# "blocks": {},
# "comments": {},
# "currentCostume": 1 if len(all_costumes) > 1 else 0,
# "costumes": all_costumes,
# "sounds": sounds,
# "volume": 100,
# "layerOrder": 0,
# "tempo": 60,
# "videoTransparency": 50,
# "videoState": "on",
# "textToSpeechLanguage": None
# }
# final_project["targets"].insert(0, stage_obj)
# else:
# logger.warning("⚠️ No backdrop matched. Using default static backdrop.")
# default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg"
# default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg"
# default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav"
# default_backdrop_sound_name = "cd21514d0531fdffb22204e0ec5ed84a.svg"
# try:
# shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name))
# logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}")
# shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name))
# logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}")
# except Exception as e:
# logger.error(f"❌ Failed to copy default backdrop: {e}")
# stage_obj={
# "isStage": True,
# "name": "Stage",
# "objName": "Stage",
# "variables": {},
# "lists": {},
# "broadcasts": {},
# "blocks": {},
# "comments": {},
# "currentCostume": 0,
# "costumes": [
# {
# "assetId": default_backdrop_name.split(".")[0],
# "name": "defaultBackdrop",
# "md5ext": default_backdrop_name,
# "dataFormat": "svg",
# "rotationCenterX": 240,
# "rotationCenterY": 180
# }
# ],
# "sounds": [
# {
# "name": "pop",
# "assetId": "83a9787d4cb6f3b7632b4ddfebf74367",
# "dataFormat": "wav",
# "format": "",
# "rate": 48000,
# "sampleCount": 1123,
# "md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav"
# }
# ],
# "volume": 100,
# "layerOrder": 0,
# "tempo": 60,
# "videoTransparency": 50,
# "videoState": "on",
# "textToSpeechLanguage": None
# }
# final_project["targets"].insert(0, stage_obj)
# with open(project_json_path, 'w') as f:
# json.dump(final_project, f, indent=2)
# return project_json_path
def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300):
    """
    Render an in-memory PDF to images via a temporary file on disk.

    The stream is rewound, spooled into a temporary ``.pdf`` file and that
    file is handed to ``convert_from_path``. The temporary file is removed
    again before returning, whether or not the conversion succeeded.

    Args:
        pdf_stream (io.BytesIO): Seekable stream holding the PDF bytes.
        dpi (int): Resolution forwarded to ``convert_from_path``.

    Returns:
        Whatever ``convert_from_path`` returns for the spooled file.
    """
    # Ensure we are at the start of the stream
    pdf_stream.seek(0)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(pdf_stream.read())
        tmp_pdf_path = tmp_pdf.name
    # The handle is closed above so the converter can reopen the file by name.
    try:
        return convert_from_path(tmp_pdf_path, dpi=dpi)
    finally:
        # delete=False means nobody else cleans this up; removal is best-effort
        # so a cleanup problem never masks the conversion result or error.
        try:
            os.remove(tmp_pdf_path)
        except OSError:
            pass
def delay_for_tpm_node(state: GameState):
    """
    Graph node that blocks for a fixed interval and hands the state back unchanged.

    NOTE(review): the name suggests the pause exists to stay under a
    tokens-per-minute limit of the LLM provider - confirm.
    """
    pause_seconds = 10  # Adjust the delay as needed
    logger.info("--- Running DelayForTPMNode ---")
    time.sleep(pause_seconds)
    logger.info("Delay completed.")
    return state
# Build the LangGraph workflow: register each node under its graph name,
# in the same order as before, then mark the node every run starts from.
workflow = StateGraph(GameState)
for _node_name, _node_fn in (
    ("pseudo_generator", pseudo_generator_node),
    ("Node_optimizer", node_optimizer),
    ("layer_optimizer", layer_order_correction),
    ("block_builder", overall_block_builder_node_2),
    ("variable_initializer", variable_adder_node),
    ("page_processed", processed_page_node),
):
    workflow.add_node(_node_name, _node_fn)
# Keep the module namespace free of the loop helpers.
del _node_name, _node_fn
workflow.set_entry_point("page_processed")
# Conditional branching from the start
def decide_next_step(state: GameState):
    """
    Choose the node that follows "page_processed".

    Returns "pseudo_generator" while the state's "processing" entry is truthy
    (a missing entry counts as False); otherwise returns "layer_optimizer".
    """
    keep_processing = state.get("processing", False)
    return "pseudo_generator" if keep_processing else "layer_optimizer"
# After "page_processed", decide_next_step names the branch to follow; each
# branch label maps onto the node of the same name.
workflow.add_conditional_edges(
    "page_processed",
    decide_next_step,
    {branch: branch for branch in ("pseudo_generator", "layer_optimizer")},
)
# Main chain: the per-page pipeline loops back to "page_processed", while
# "layer_optimizer" is the last step before the graph ends.
for _edge_from, _edge_to in (
    ("pseudo_generator", "Node_optimizer"),
    ("Node_optimizer", "block_builder"),
    ("block_builder", "variable_initializer"),
    ("variable_initializer", "page_processed"),
    ("layer_optimizer", END),
):
    workflow.add_edge(_edge_from, _edge_to)
# Keep the module namespace free of the loop helpers.
del _edge_from, _edge_to
app_graph = workflow.compile()
# ============== Helper function to Upscale an Image ============== #
def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image:
    """
    Enlarge a PIL image by multiplying both dimensions by `scale`.

    Resizing uses the LANCZOS filter. Any failure is logged and the
    original image is returned unchanged instead of raising.
    """
    try:
        src_w, src_h = image.size
        target_size = (src_w * scale, src_h * scale)
        enlarged = image.resize(target_size, Image.LANCZOS)
        logger.info(f"✅ Upscaled image to {target_size}")
    except Exception as e:
        logger.error(f"❌ Error during image upscaling: {str(e)}")
        return image
    return enlarged
@log_execution_time
def create_sb3_archive(project_folder, project_id):
    """
    Zips the project folder and renames the archive to an .sb3 file.

    The archive is written next to GEN_PROJECT_DIR/<project_id> as
    ``<project_id>.zip`` and then moved to ``<project_id>.sb3``.

    Args:
        project_folder (str): The path to the directory containing the project.json and assets.
        project_id (str): The unique ID for the project, used for naming the .sb3 file.
    Returns:
        str: The path to the created .sb3 file, or None if an error occurred.
    """
    print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------")
    output_filename = GEN_PROJECT_DIR / project_id
    print(" --------------------------------------- output_filename ---------------------------------------",output_filename)
    zip_path = None
    sb3_path = None
    try:
        # 1. Zip the folder contents; pass a plain string as the archive base name.
        zip_path = shutil.make_archive(str(output_filename), 'zip', root_dir=project_folder)
        print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder)
        logger.info(f"Project folder zipped to: {zip_path}")
        # 2. Rename the .zip file to .sb3
        sb3_path = f"{output_filename}.sb3"
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename raises on Windows when the target already exists.
        os.replace(zip_path, sb3_path)
        print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path)
        logger.info(f"Renamed {zip_path} to {sb3_path}")
        return sb3_path
    except Exception as e:
        logger.error(f"Error creating SB3 archive for {project_id}: {e}")
        # Clean up any partial files if an error occurs
        for leftover in (zip_path, sb3_path):
            if leftover and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError as cleanup_error:
                    logger.error(f"Could not remove partial file {leftover}: {cleanup_error}")
        # Callers test the result for truthiness, so a failure must never
        # hand back the (now deleted) .sb3 path.
        return None
def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> Optional[str]:
    """
    Stores a PDF as GEN_PROJECT_DIR/<project_id>/<project_id>.pdf.

    Args:
        pdf_stream (io.BytesIO): In-memory PDF. Its whole buffer is written,
            regardless of the current read position. Any other object with a
            ``read`` method is copied from its current position, and
            anything else is treated as a filesystem path and copied.
        project_id (str): Your unique project identifier.
    Returns:
        str: Path to the stored PDF in the generated directory,
        or None if something went wrong (the error is logged).
    """
    try:
        # 1) Build the destination directory and base filename
        output_dir = GEN_PROJECT_DIR / project_id
        output_dir.mkdir(parents=True, exist_ok=True)
        print(f"\n--------------------------------output_dir {output_dir}")
        # 2) Define the target PDF path
        target_pdf = output_dir / f"{project_id}.pdf"
        print(f"\n--------------------------------target_pdf {target_pdf}")
        # 3) Write/copy the PDF
        if isinstance(pdf_stream, io.BytesIO):
            with open(target_pdf, "wb") as f:
                # getbuffer() exposes the full payload without copying it
                f.write(pdf_stream.getbuffer())
        elif hasattr(pdf_stream, "read"):
            # Generic binary file object (e.g. an open file handle)
            with open(target_pdf, "wb") as f:
                shutil.copyfileobj(pdf_stream, f)
        else:
            # Legacy call style: a path to an existing PDF on disk
            shutil.copy2(pdf_stream, target_pdf)
        print(f"Copied PDF from {pdf_stream} to {target_pdf}")
        logger.info(f"Copied PDF from {pdf_stream} to {target_pdf}")
        return str(target_pdf)
    except Exception as e:
        logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True)
        return None
@app.route('/')
def index():
    """Serve the landing page rendered from the 'app_index.html' template."""
    landing_page = render_template('app_index.html')
    return landing_page
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
    """
    Send GEN_PROJECT_DIR/<project_id>.sb3 as a file attachment.

    Responds with a JSON error and status 404 when that path does not exist.
    """
    archive_path = GEN_PROJECT_DIR / f"{project_id}.sb3"
    if archive_path.exists():
        return send_file(
            archive_path,
            as_attachment=True,
            download_name=archive_path.name,
        )
    return jsonify({"error": "Scratch project file not found"}), 404
@app.route("/download_pdf/<project_id>", methods=["GET"])
def download_pdf(project_id):
    """
    Send GEN_PROJECT_DIR/<project_id>/<project_id>.pdf as a file attachment.

    Responds with a JSON error and status 404 when no such regular file exists.
    """
    pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf"
    # is_file() also rejects a directory that happens to carry the name,
    # which send_file could not serve anyway.
    if not pdf_path.is_file():
        # This endpoint serves the PDF, so say so instead of reusing the
        # .sb3 endpoint's wording.
        return jsonify({"error": "PDF file not found"}), 404
    return send_file(
        pdf_path,
        as_attachment=True,
        download_name=pdf_path.name
    )
@app.route("/download_sound/<sound_id>", methods=["GET"])
def download_sound(sound_id):
    """
    Send SOUND_DIR/<sound_id>.wav as a file attachment.

    Responds with a JSON error and status 404 when that path does not exist.
    """
    wav_path = SOUND_DIR / f"{sound_id}.wav"
    if wav_path.exists():
        return send_file(
            wav_path,
            as_attachment=True,
            download_name=wav_path.name,
        )
    return jsonify({"error": "Scratch project sound file not found"}), 404
# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """
    Turn an uploaded PDF into a Scratch .sb3 project.

    Expects multipart form-data with the PDF under the key 'pdf_file'.
    Steps, in order:
      1. store a copy of the PDF with save_pdf_to_generated_dir,
      2. extract images with extract_images_from_pdf,
      3. run similarity_matching, which yields the path of the project JSON skeleton,
      4. render the PDF pages to images and run them through app_graph,
      5. overwrite the skeleton with the final project JSON and pack it
         with create_sb3_archive.

    Returns:
        A JSON response: status 400 when the file part is missing or has an
        empty filename, status 500 when any step fails, and the default
        success status with the download link under "test_url" otherwise.
        The "output_json", "sprites" and "project_output_json" fields are
        literal placeholder strings, kept as-is for existing clients.
    """
    # Bound up-front so the except handler can always reference it, even
    # when the failure happens before an ID was generated.
    project_id = None
    try:
        logger.info("Received request to process PDF.")
        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
        pdf_file = request.files['pdf_file']
        if pdf_file.filename == '':
            return jsonify({"error": "Empty filename"}), 400
        # ================================================= #
        #   Generate Random UUID for project folder name    #
        # ================================================= #
        project_id = uuid.uuid4().hex
        project_folder = OUTPUT_DIR / project_id
        pdf_bytes = pdf_file.read()
        pdf_stream = io.BytesIO(pdf_bytes)
        logger.info(f"Read uploaded PDF into memory: {len(pdf_bytes)} bytes")

        start_time = time.time()
        pdf = save_pdf_to_generated_dir(pdf_stream, project_id)
        logger.info(f"Saved uploaded PDF to: {pdf_file}: {pdf}")
        print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream)
        total_time = time.time() - start_time
        print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n")

        start_time = time.time()
        output_path = extract_images_from_pdf(pdf_stream,project_folder)
        print(" --------------------------------------- zip_path_str ---------------------------------------", output_path)
        total_time = time.time() - start_time
        print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n")

        start_time = time.time()
        project_output = similarity_matching(output_path, project_folder)
        logger.info(f"Similarity matching finished, project skeleton at: {project_output}")
        total_time = time.time() - start_time
        print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n")

        with open(project_output, 'r') as f:
            project_skeleton = json.load(f)

        # pdf_stream is always the BytesIO created above, so it is rendered
        # through the stream helper (which rewinds the stream itself).
        images = convert_pdf_stream_to_images(pdf_stream, dpi=300)

        #updating logic here [Dev Patel]
        initial_state_dict = {
            "project_json": project_skeleton,
            "description": "The pseudo code for the script",
            "project_id": project_id,
            "project_image": images,
            "action_plan": {},
            "pseudo_code": {},
            "temporary_node": {},
            "processing":True,
            "page_count": 0,
            "temp_pseudo_code":[],
        }
        final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200})
        final_project_json = final_state_dict['project_json']  # Access as dict

        # Save the *final* filled project JSON, overwriting the skeleton
        with open(project_output, "w") as f:
            json.dump(final_project_json, f, indent=2)
        logger.info(f"Final project JSON saved to {project_output}")

        # --- Create the .sb3 file ---
        sb3_file_path = create_sb3_archive(project_folder, project_id)

        # Built before branching: both the success and the failure response
        # report the download URL.
        download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}"
        pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}"

        if sb3_file_path:
            logger.info(f"Successfully created SB3 file: {sb3_file_path}")
            # Instead of returning the local path, return a URL to the download endpoint
            print(f"DOWNLOAD_URL: {download_url}")
            print(f"PDF_URL: {pdf_url}")
            return jsonify({
                "message": "✅ PDF processed successfully",
                "output_json": "output_path",
                "sprites": "result",
                "project_output_json": "project_output",
                "test_url": download_url
            })
        else:
            return jsonify({
                "message": "❌ Scanned images are not clear please retry!",
                "isError": True,
                "output_json": "output_path",
                "sprites": "result",
                "project_output_json": "project_output",
                "test_url": download_url
            }), 500
    except Exception as e:
        logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True)
        return jsonify({
            "message": "❌ Scanned images are not clear please retry!",
            "isError": True,
            "output_json": "output_path",
            "sprites": "result",
            "project_output_json": "project_output",
            "test_url": "download_url"
        }), 500
if __name__ == '__main__':
    # os.makedirs("outputs", exist_ok=True) #== commented by P
    # The server listens on all interfaces, and Flask's debug mode turns on
    # the interactive debugger, which lets a client execute arbitrary code.
    # It is therefore off unless explicitly requested, e.g. FLASK_DEBUG=1
    # for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)