from flask import Flask, request, jsonify, render_template, send_from_directory, send_file import cv2, json,base64,io,os,tempfile,logging, re import numpy as np from unstructured.partition.pdf import partition_pdf from PIL import Image, ImageOps, ImageEnhance from dotenv import load_dotenv from werkzeug.utils import secure_filename from langchain_groq import ChatGroq from langgraph.prebuilt import create_react_agent from pdf2image import convert_from_path, convert_from_bytes from typing import Dict, TypedDict, Optional, Any, List, Tuple from collections import defaultdict from langgraph.graph import StateGraph, END import uuid import shutil, time, functools from io import BytesIO from pathlib import Path from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts from difflib import get_close_matches import torch from transformers import AutoImageProcessor, AutoModel import torch import json import cv2 from imagededup.methods import PHash from image_match.goldberg import ImageSignature import sys import math import hashlib # DINOv2 model id DINOV2_MODEL = "facebook/dinov2-small" # For PHash normalization when combining scores: assumed max hamming bits (typical phash=64) MAX_PHASH_BITS = 64 # ---------------------- # INITIALIZE MODELS # ---------------------- print("Initializing models and helpers...") DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") if DEVICE.type == "cpu": torch.set_num_threads(4) dinov2_processor = AutoImageProcessor.from_pretrained(DINOV2_MODEL) dinov2_model = AutoModel.from_pretrained(DINOV2_MODEL) dinov2_model.to(DEVICE) dinov2_model.eval() phash = PHash() gis = ImageSignature() load_dotenv() # os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY") groq_api_key = os.getenv("GROQ_API_KEY") llm = ChatGroq( model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=None, ) app = Flask(__name__) backdrop_images_path = r"app\blocks\Backdrops" sprite_images_path = r"app\blocks\sprites" code_blocks_image_path = r"app\blocks\code_blocks" count = 0 from pathlib import Path BASE_DIR = Path(os.getenv("APP_BASE_DIR", Path(__file__).resolve().parent)) # fallback to code location BASE_DIR = Path("/app") LOGS_DIR = Path(os.getenv("LOGS_DIR", "/tmp/logs")).resolve() LOGS_DIR.mkdir(parents=True, exist_ok=True) STATIC_DIR = BASE_DIR / "static" GEN_PROJECT_DIR = BASE_DIR / "generated_projects" # adjust BLOCKS_DIR etc: BLOCKS_DIR = BASE_DIR / "blocks" BACKDROP_DIR = BLOCKS_DIR / "Backdrops" SPRITE_DIR = BLOCKS_DIR / "sprites" CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks" SOUND_DIR = BLOCKS_DIR / "sound" # BASE_DIR = Path("/app") # BLOCKS_DIR = BASE_DIR / "blocks" # BACKDROP_DIR = BLOCKS_DIR / "Backdrops" # SPRITE_DIR = BLOCKS_DIR / "sprites" # CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks" # # === new: outputs rooted under BASE_DIR === OUTPUT_DIR = BASE_DIR / "outputs" # Global variables to hold the model and index, loaded only once. MODEL = None FAISS_INDEX = None IMAGE_PATHS = None # make all of them in one go for d in ( BLOCKS_DIR, STATIC_DIR, GEN_PROJECT_DIR, BACKDROP_DIR, SPRITE_DIR, CODE_BLOCKS_DIR, SOUND_DIR, OUTPUT_DIR, ): d.mkdir(parents=True, exist_ok=True) def log_execution_time(func): @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() logger.info(f"⏱ {func.__name__} executed in {end_time - start_time:.2f} seconds") return result return wrapper # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(str(LOGS_DIR / "app.log")), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class GameState(TypedDict): project_json: dict description: str project_id: str project_image: str pseudo_code: dict action_plan: Optional[Dict] temporary_node: Optional[Dict] page_count: int processing: bool temp_pseudo_code: list SYSTEM_PROMPT ="""Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce precisely formatted pseudocode JSON. ### Core Role - Treat this as an OCR refinement task: the input may contain typos or spacing issues. - Intelligently correct OCR mistakes to align with valid Scratch 3.0 block syntax. ### Universal Rules 1. **Code Detection:** If no Scratch blocks are detected, the `pseudocode` value must be "No Code-blocks". 2. **Script Ownership:** Determine the target from "Script for:". If it matches a `Stage_costumes` name, set `name_variable` to "Stage". 3. **Pseudocode Structure:** - The pseudocode must be a single JSON string with `\n` for newlines. - Indent nested blocks with 4 spaces. - Every script (hat block) and every C-block (if, repeat, forever) MUST have a corresponding `end` at the correct indentation level. 4. **Formatting Syntax:** - Numbers & Text: `(5)`, `(hello)` - Variables & Dropdowns: `[score v]`, `[space v]` - Reporters: `((x position))` - Booleans: `` 5. **Final Output:** Your response must ONLY be the valid JSON object and nothing else.""" SYSTEM_PROMPT_JSON_CORRECTOR = """ You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format. REQUIRED OUTPUT FORMAT: { "refined_logic": { "name_variable": "sprite_name_here", "pseudocode": "pseudocode_string_here" } } RULES: 1. Extract the sprite name and pseudocode from the input 2. Return ONLY valid JSON in the exact format above 3. No explanations, no extra text, no other fields 4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode """ # Main agent of the system agent for Scratch 3.0 agent = create_react_agent( model=llm, tools=[], # No specific tools are defined here, but could be added later prompt=SYSTEM_PROMPT ) agent_json_resolver = create_react_agent( model=llm, tools=[], # No specific tools are defined here, but could be added later prompt=SYSTEM_PROMPT_JSON_CORRECTOR ) # ----------------------- # SERIALIZABLE HELPER # ----------------------- def make_json_serializable(obj): """Recursively convert numpy and other objects into JSON-serializable types.""" if obj is None: return None if isinstance(obj, (str, int, float, bool)): return obj if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, dict): return {str(k): make_json_serializable(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [make_json_serializable(v) for v in obj] # some image-match signatures may contain numpy, so try .tolist try: return obj.tolist() except Exception: pass # fallback to string return str(obj) # ----------------------- # BASE64 <-> PIL # ----------------------- def pil_to_base64(pil_img, fmt="PNG"): buffer = io.BytesIO() pil_img.save(buffer, format=fmt) return base64.b64encode(buffer.getvalue()).decode("utf-8") def base64_to_pil(b64): try: data = base64.b64decode(b64) return Image.open(io.BytesIO(data)) except Exception as e: print(f"[base64_to_pil] Error: {e}") return None # ----------------------- # PIL helpers # ----------------------- def load_image_pil(path): try: return Image.open(path) except Exception as e: print(f"[load_image_pil] Could not open {path}: {e}") return None def add_background(pil_img, bg_color=(255,255,255), size=None): if pil_img is None: return None try: target = size if size is not None else pil_img.size bg = Image.new("RGB", target, bg_color) img_rgba = pil_img.convert("RGBA") if img_rgba.size != target: x = (target[0] - img_rgba.size[0]) // 2 y = (target[1] - img_rgba.size[1]) // 2 else: x, y = 0, 0 mask = img_rgba.split()[3] if img_rgba.mode == "RGBA" else None bg.paste(img_rgba.convert("RGB"), (x,y), mask=mask) return bg except Exception as e: print(f"[add_background] Error: {e}") return None def preprocess_for_hash(pil_img, size=(256,256)): try: img = pil_img.convert("RGB") img = ImageOps.grayscale(img) img = ImageOps.equalize(img) img = img.resize(size) return np.array(img).astype(np.uint8) except Exception as e: print(f"[preprocess_for_hash] Error: {e}") return None def preprocess_for_model(pil_img): try: if pil_img.mode == "RGBA": pil_img = pil_img.convert("RGB") elif pil_img.mode == "L": pil_img = pil_img.convert("RGB") else: pil_img = pil_img.convert("RGB") return pil_img except Exception as e: print(f"[preprocess_for_model] Error: {e}") return None def get_dinov2_embedding_from_pil(pil_img): try: if pil_img is None: return None inputs = dinov2_processor(images=pil_img, return_tensors="pt").to(DEVICE) with torch.no_grad(): outputs = dinov2_model(**inputs) # CLS token embedding emb = outputs.last_hidden_state[:,0,:].squeeze(0).cpu().numpy() n = np.linalg.norm(emb) if n == 0 or np.isnan(n): return None return (emb / n).astype(float) except Exception as e: print(f"[get_dinov2_embedding_from_pil] Error: {e}") return None # ----------------------- # OpenCV enhancement (accepts PIL) # ----------------------- def pil_to_bgr_np(pil_img): arr = np.array(pil_img.convert("RGB")) return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) def bgr_np_to_pil(bgr_np): rgb = cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB) return Image.fromarray(rgb) def upscale_image_cv(bgr_np, scale=2): h,w = bgr_np.shape[:2] return cv2.resize(bgr_np, (w*scale, h*scale), interpolation=cv2.INTER_CUBIC) def reduce_noise_cv(bgr_np): return cv2.fastNlMeansDenoisingColored(bgr_np, None, 10,10,7,21) def sharpen_cv(bgr_np): kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]]) return cv2.filter2D(bgr_np, -1, kernel) def enhance_contrast_cv(bgr_np): pil_img = Image.fromarray(cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB)) enhancer = ImageEnhance.Contrast(pil_img) enhanced = enhancer.enhance(1.5) return cv2.cvtColor(np.array(enhanced), cv2.COLOR_RGB2BGR) def process_image_cv2_from_pil(pil_img, scale=2): try: bgr = pil_to_bgr_np(pil_img) bgr = upscale_image_cv(bgr, scale=scale) if scale != 1 else bgr bgr = reduce_noise_cv(bgr) bgr = sharpen_cv(bgr) bgr = enhance_contrast_cv(bgr) return bgr_np_to_pil(bgr) except Exception as e: print(f"[process_image_cv2_from_pil] Error: {e}") return None # cosine similarity def cosine_similarity(a, b): return float(np.dot(a, b)) # -------------------------- # Hybrid Selection of Best Match # -------------------------- def run_query_search_flow( query_path: Optional[str] = None, query_b64: Optional[str] = None, processed_dir: str = "./processed", embeddings_dict: Dict[str, np.ndarray] = None, hash_dict: Dict[str, Any] = None, signature_obj_map: Dict[str, Any] = None, gis: Any = None, phash: Any = None, MAX_PHASH_BITS: int = 64, k: int = 10, ) -> Tuple[ List[Tuple[str, float]], List[Tuple[str, Any, float]], List[Tuple[str, Any, float]], List[Tuple[str, float, float, float, float]], ]: """ Run the full query/search flow (base64 -> preprocess -> embed -> scoring). Accepts either query_path (file on disk) OR query_b64 (base64 string). If both are provided, query_b64 takes precedence. Returns: embedding_results_sorted, phash_results_sorted, imgmatch_results_sorted, combined_results_sorted """ # Validate inputs if (query_path is None or query_path == "") and (query_b64 is None or query_b64 == ""): raise ValueError("Either query_path or query_b64 must be provided.") # Ensure processed_dir exists os.makedirs(processed_dir, exist_ok=True) print("\n--- Query/Search Phase ---") # 1) Load query image (prefer base64 if provided) if query_b64: # base64 provided directly -> decode to PIL query_from_b64 = base64_to_pil(query_b64) if query_from_b64 is None: raise RuntimeError("Could not decode provided base64 query. Exiting.") query_pil_orig = query_from_b64 else: # load from disk if not os.path.exists(query_path): raise FileNotFoundError(f"Query image not found: {query_path}") query_pil_orig = load_image_pil(query_path) if query_pil_orig is None: raise RuntimeError("Could not load query image from path. Exiting.") # also create a base64 roundtrip for robustness (keep original behaviour) try: query_b64 = pil_to_base64(query_pil_orig, fmt="PNG") except Exception as e: raise RuntimeError(f"Could not base64 query from disk image: {e}") # keep decoded copy for consistency query_from_b64 = base64_to_pil(query_b64) if query_from_b64 is None: raise RuntimeError("Could not decode query base64 after roundtrip. Exiting.") # At this point, query_from_b64 is a PIL.Image we can continue with # 2) Preprocess with OpenCV enhancement (best-effort; fallback to base64-decoded image) enhanced_query_pil = process_image_cv2_from_pil(query_from_b64, scale=2) if enhanced_query_pil is None: print("[Query] OpenCV enhancement failed; falling back to base64-decoded image.") enhanced_query_pil = query_from_b64 # Save the enhanced query (best-effort) query_enhanced_path = os.path.join(processed_dir, "query_enhanced.png") try: enhanced_query_pil.save(query_enhanced_path, format="PNG") except Exception: try: enhanced_query_pil.convert("RGB").save(query_enhanced_path, format="PNG") except Exception: print("[Warning] Could not save enhanced query image for inspection.") # 3) Query embedding (preprocess -> model) prepped = preprocess_for_model(enhanced_query_pil) query_emb = get_dinov2_embedding_from_pil(prepped) if query_emb is None: raise RuntimeError("Could not compute query embedding. Exiting.") # 4) Query phash computation query_hash_arr = preprocess_for_hash(enhanced_query_pil) if query_hash_arr is None: raise RuntimeError("Could not compute query phash array. Exiting.") query_phash = phash.encode_image(image_array=query_hash_arr) # 5) Query signature generation (best-effort) query_sig = None query_sig_path = os.path.join(processed_dir, "query_for_sig.png") try: enhanced_query_pil.save(query_sig_path, format="PNG") except Exception: try: enhanced_query_pil.convert("RGB").save(query_sig_path, format="PNG") except Exception: query_sig_path = None if query_sig_path: try: query_sig = gis.generate_signature(query_sig_path) except Exception as e: print(f"[ImageSignature] failed for query: {e}") query_sig = None # ----------------------- # Prepare stored data arrays # ----------------------- embeddings_dict = embeddings_dict or {} hash_dict = hash_dict or {} signature_obj_map = signature_obj_map or {} image_paths = list(embeddings_dict.keys()) image_embeddings = np.array(list(embeddings_dict.values()), dtype=float) if embeddings_dict else np.array([]) def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: try: return float(np.dot(a, b)) except Exception: return -1.0 # Collections embedding_results: List[Tuple[str, float]] = [] phash_results: List[Tuple[str, Any, float]] = [] imgmatch_results: List[Tuple[str, Any, float]] = [] combined_results: List[Tuple[str, float, float, float, float]] = [] # Iterate stored images and compute similarities for idx, path in enumerate(image_paths): # Embedding similarity try: stored_emb = image_embeddings[idx] emb_sim = cosine_similarity(query_emb, stored_emb) except Exception: emb_sim = -1.0 embedding_results.append((path, emb_sim)) # PHash similarity (Hamming -> normalized sim) try: stored_ph = hash_dict.get(path) if stored_ph is not None: hd = phash.hamming_distance(query_phash, stored_ph) ph_sim = max(0.0, 1.0 - (hd / float(MAX_PHASH_BITS))) else: hd = None ph_sim = 0.0 except Exception: hd = None ph_sim = 0.0 phash_results.append((path, hd, ph_sim)) # Image signature similarity (normalized distance -> similarity) try: stored_sig = signature_obj_map.get(path) if stored_sig is not None and query_sig is not None: dist = gis.normalized_distance(stored_sig, query_sig) im_sim = max(0.0, 1.0 - dist) else: dist = None im_sim = 0.0 except Exception: dist = None im_sim = 0.0 imgmatch_results.append((path, dist, im_sim)) # Combined score: average of the three (embedding is clamped into [0,1]) emb_clamped = max(0.0, min(1.0, emb_sim)) combined = (emb_clamped + ph_sim + im_sim) / 3.0 combined_results.append((path, combined, emb_clamped, ph_sim, im_sim)) # ----------------------- # Sort results # ----------------------- embedding_results.sort(key=lambda x: x[1], reverse=True) phash_results_sorted = sorted(phash_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) imgmatch_results_sorted = sorted(imgmatch_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) combined_results.sort(key=lambda x: x[1], reverse=True) # ----------------------- # Print Top-K results # ----------------------- print("\nTop results by DINOv2 Embeddings:") for i, (path, score) in enumerate(embedding_results[:k], start=1): print(f"Rank {i}: {path} | Cosine: {score:.4f}") print("\nTop results by PHash (Hamming distance & normalized sim):") for i, (path, hd, sim) in enumerate(phash_results_sorted[:k], start=1): print(f"Rank {i}: {path} | Hamming: {hd} | NormSim: {sim:.4f}") print("\nTop results by ImageSignature (normalized similarity = 1 - distance):") for i, (path, dist, sim) in enumerate(imgmatch_results_sorted[:k], start=1): print(f"Rank {i}: {path} | NormDist: {dist} | NormSim: {sim:.4f}") print("\nTop results by Combined Score (avg of embedding|phash|image-match):") for i, (path, combined, emb_clamped, ph_sim, im_sim) in enumerate(combined_results[:k], start=1): print(f"Rank {i}: {path} | Combined: {combined:.4f} | emb: {emb_clamped:.4f} | phash_sim: {ph_sim:.4f} | imgmatch_sim: {im_sim:.4f}") print("\nSearch complete.") # Return sorted lists for programmatic consumption return embedding_results, phash_results_sorted, imgmatch_results_sorted, combined_results # -------------------------- # Choose best candidate helper # -------------------------- from collections import defaultdict import math def choose_top_candidates(embedding_results, phash_results, imgmatch_results, top_k=10, method_weights=(0.5, 0.3, 0.2), verbose=True): """ embedding_results: list of (path, emb_sim) where emb_sim roughly in [-1,1] (we'll clamp to 0..1) phash_results: list of (path, hamming, ph_sim) where ph_sim in [0,1] imgmatch_results: list of (path, dist, im_sim) where im_sim in [0,1] method_weights: weights for (emb, phash, imgmatch) when using weighted average returns dict with top candidates from three methods and diagnostics """ # Build dicts for quick lookup emb_map = {p: float(s) for p, s in embedding_results} ph_map = {p: float(sim) for p, _, sim in phash_results} im_map = {p: float(sim) for p, _, sim in imgmatch_results} # Universe of candidates (union) all_paths = sorted(set(list(emb_map.keys()) + list(ph_map.keys()) + list(im_map.keys()))) # --- Normalize each metric across candidates to [0,1] --- def normalize_map(m): vals = [m.get(p, None) for p in all_paths] # treat missing as None present = [v for v in vals if v is not None and not math.isnan(v)] if not present: return {p: 0.0 for p in all_paths} vmin, vmax = min(present), max(present) if vmax == vmin: # constant -> map present values to 1.0, missing to 0 return {p: (1.0 if (m.get(p, None) is not None) else 0.0) for p in all_paths} norm = {} for p in all_paths: v = m.get(p, None) if v is None or math.isnan(v): norm[p] = 0.0 else: norm[p] = (v - vmin) / (vmax - vmin) # clamp if norm[p] < 0: norm[p] = 0.0 if norm[p] > 1: norm[p] = 1.0 return norm # For embeddings, clamp negatives to 0 first (optional) emb_map_clamped = {} for p, v in emb_map.items(): # common approach: embeddings are cosine in [-1,1]; clamp negatives to 0 to treat as no-sim emb_map_clamped[p] = max(0.0, v) emb_norm = normalize_map(emb_map_clamped) ph_norm = normalize_map(ph_map) im_norm = normalize_map(im_map) # --- Method A: Normalized weighted average --- w_emb, w_ph, w_im = method_weights weighted_scores = {} for p in all_paths: weighted_scores[p] = (w_emb * emb_norm.get(p, 0.0) + w_ph * ph_norm.get(p, 0.0) + w_im * im_norm.get(p, 0.0)) top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] # --- Method B: Rank-sum (Borda) --- # compute ranks per metric (higher value => better rank 1) def ranks_from_map(m_norm): # bigger is better items = sorted(m_norm.items(), key=lambda x: x[1], reverse=True) ranks = {} for i, (p, _) in enumerate(items): ranks[p] = i + 1 # 1-based # missing entries get worst rank (len+1) worst = len(items) + 1 for p in all_paths: if p not in ranks: ranks[p] = worst return ranks rank_emb = ranks_from_map(emb_norm) rank_ph = ranks_from_map(ph_norm) rank_im = ranks_from_map(im_norm) rank_sum = {} for p in all_paths: rank_sum[p] = rank_emb.get(p, 9999) + rank_ph.get(p, 9999) + rank_im.get(p, 9999) top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k] # smaller is better # --- Method C: Harmonic mean of the normalized scores (penalizes missing/low values) --- harm_scores = {} for p in all_paths: a = emb_norm.get(p, 0.0) b = ph_norm.get(p, 0.0) c = im_norm.get(p, 0.0) # avoid zeros -> harmonic is defined for positive values, but we want to allow zero => it will be 0 if a + b + c == 0: harm = 0.0 else: # harmonic mean for three values: 3 / (1/a + 1/b + 1/c), but if any is zero, result is 0 if a == 0 or b == 0 or c == 0: harm = 0.0 else: harm = 3.0 / ((1.0/a) + (1.0/b) + (1.0/c)) harm_scores[p] = harm top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] # --- Consensus set: items that appear in top-K of each metric individually --- def topk_set_by_map(m_norm, k=top_k): return set([p for p,_ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]]) cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k) # Build readable outputs result = { "emb_norm": emb_norm, "ph_norm": ph_norm, "im_norm": im_norm, "weighted_topk": top_weighted, "rank_sum_topk": top_rank_sum, "harmonic_topk": top_harm, "consensus_topk": list(cons_set), "weighted_scores_full": weighted_scores, "rank_sum_full": rank_sum, "harmonic_full": harm_scores } if verbose: print("\nTop by Weighted Normalized Average (weights emb,ph,img = {:.2f},{:.2f},{:.2f}):".format(w_emb, w_ph, w_im)) for i,(p,s) in enumerate(result["weighted_topk"], start=1): print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") print("\nTop by Rank-sum (lower is better):") for i,(p,s) in enumerate(result["rank_sum_topk"], start=1): print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}") print("\nTop by Harmonic mean (requires non-zero on all metrics):") for i,(p,s) in enumerate(result["harmonic_topk"], start=1): print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") print("\nConsensus (in top-{0} of ALL metrics): {1}".format(top_k, result["consensus_topk"])) return result def is_subpath(path: str, base: str) -> bool: """Return True if path is inside base (works across OSes).""" try: p = os.path.normpath(os.path.abspath(path)) b = os.path.normpath(os.path.abspath(base)) if os.name == "nt": p = p.lower(); b = b.lower() return os.path.commonpath([p, b]) == b except Exception: return False # Helper function to load the block catalog from a JSON file def _load_block_catalog(block_type: str) -> Dict: """ Loads the Scratch block catalog named '{block_type}_blocks.json' from the /blocks/ folder. Returns {} on any error. """ catalog_path = BLOCKS_DIR / f"{block_type}.json" try: text = catalog_path.read_text() # will raise FileNotFoundError if missing catalog = json.loads(text) # will raise JSONDecodeError if malformed logger.info(f"Successfully loaded block catalog from {catalog_path}") return catalog except FileNotFoundError: logger.error(f"Error: Block catalog file not found at {catalog_path}") except json.JSONDecodeError as e: logger.error(f"Error decoding JSON from {catalog_path}: {e}") except Exception as e: logger.error(f"Unexpected error loading {catalog_path}: {e}") def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: """ Search a single catalog (with keys "description" and "blocks": List[dict]) for a block whose 'op_code' matches the given opcode. Returns the block dict or None if not found. """ for block in catalog_data["blocks"]: if block.get("op_code") == opcode: return block return None # Helper function to find a block in all catalogs by opcode def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None: """ Search across multiple catalogs for a given opcode. Returns the first matching block dict or None. """ for catalog in all_catalogs: blk = get_block_by_opcode(catalog, opcode) if blk is not None: return blk return None def variable_intialization(project_data): """ Updates variable and broadcast definitions in a Scratch project JSON, populating the 'variables' and 'broadcasts' sections of the Stage target and extracting initial values for variables. Args: project_data (dict): The loaded JSON data of the Scratch project. Returns: dict: The updated project JSON data. """ stage_target = None for target in project_data['targets']: if target.get('isStage'): stage_target = target break if stage_target is None: print("Error: Stage target not found in the project data.") return project_data # Ensure 'variables' and 'broadcasts' exist in the Stage target if "variables" not in stage_target: stage_target["variables"] = {} if "broadcasts" not in stage_target: stage_target["broadcasts"] = {} # Helper function to recursively find and update variable/broadcast fields def process_dict(obj): if isinstance(obj, dict): # Check for "data_setvariableto" opcode to extract initial values if obj.get("opcode") == "data_setvariableto": variable_field = obj.get("fields", {}).get("VARIABLE") value_input = obj.get("inputs", {}).get("VALUE") if variable_field and isinstance(variable_field, list) and len(variable_field) == 2: var_name = variable_field[0] var_id = variable_field[1] initial_value = "" if value_input and isinstance(value_input, list) and len(value_input) > 1 and \ isinstance(value_input[1], list) and len(value_input[1]) > 1: if value_input[1][0] == 10: initial_value = str(value_input[1][1]) elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10: initial_value = str(value_input[2][1]) elif isinstance(value_input[1], (str, int, float)): initial_value = str(value_input[1]) stage_target["variables"][var_id] = [var_name, initial_value] for key, value in obj.items(): # Process broadcast definitions in 'inputs' (BROADCAST_INPUT) if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \ isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11: broadcast_name = value[1][1] broadcast_id = value[1][2] stage_target["broadcasts"][broadcast_id] = broadcast_name # Process broadcast definitions in 'fields' (BROADCAST_OPTION) elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2: broadcast_name = value[0] broadcast_id = value[1] stage_target["broadcasts"][broadcast_id] = broadcast_name # Recursively call for nested dictionaries or lists process_dict(value) elif isinstance(obj, list): for i, item in enumerate(obj): # Process variable references in 'inputs' (like [12, "score", "id"]) if isinstance(item, list) and len(item) == 3 and item[0] == 12: var_name = item[1] var_id = item[2] if var_id not in stage_target["variables"]: stage_target["variables"][var_id] = [var_name, ""] process_dict(item) # Iterate through all targets to process their blocks for target in project_data['targets']: if "blocks" in target: for block_id, block_data in target["blocks"].items(): process_dict(block_data) return project_data def deduplicate_variables(project_data): """ Removes duplicate variable entries in the 'variables' dictionary of the Stage target, prioritizing entries with non-empty values. Args: project_data (dict): The loaded JSON data of the Scratch project. Returns: dict: The updated project JSON data with deduplicated variables. """ stage_target = None for target in project_data['targets']: if target.get('isStage'): stage_target = target break if stage_target is None: print("Error: Stage target not found in the project data.") return project_data if "variables" not in stage_target: return project_data # No variables to deduplicate resolved_variables = {} for var_id, var_info in stage_target["variables"].items(): var_name = var_info[0] var_value = var_info[1] if var_name not in resolved_variables: # If the variable name is not yet seen, add it resolved_variables[var_name] = [var_id, var_name, var_value] else: # If the variable name is already seen, decide which one to keep existing_id, existing_name, existing_value = resolved_variables[var_name] # Prioritize the entry with a non-empty value if var_value != "" and existing_value == "": resolved_variables[var_name] = [var_id, var_name, var_value] elif var_value != "" and existing_value != "": resolved_variables[var_name] = [var_id, var_name, var_value] elif var_value == "" and existing_value == "": # If both are empty, just keep the current one (arbitrary) resolved_variables[var_name] = [var_id, var_name, var_value] # Reconstruct the 'variables' dictionary using the resolved entries new_variables_dict = {} for var_name, var_data in resolved_variables.items(): var_id_to_keep = var_data[0] var_name_to_keep = var_data[1] var_value_to_keep = var_data[2] new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep] stage_target["variables"] = new_variables_dict return project_data def variable_adder_main(project_data): try: declare_variable_json= variable_intialization(project_data) print("declare_variable_json------->",declare_variable_json) except Exception as e: print(f"Error error in the variable initialization opcodes: {e}") try: processed_json= deduplicate_variables(declare_variable_json) print("processed_json------->",processed_json) return processed_json except Exception as e: print(f"Error error in the variable initialization opcodes: {e}") # # --- Global variable for the block catalog --- # ALL_SCRATCH_BLOCKS_CATALOG = {} # BLOCK_CATALOG_PATH = "blocks" # Define the path to your JSON file # HAT_BLOCKS_PATH = "hat_blocks" # Path to the hat blocks JSON file # STACK_BLOCKS_PATH = "stack_blocks" # Path to the stack blocks JSON file # REPORTER_BLOCKS_PATH = "reporter_blocks" # Path to the reporter blocks JSON file # BOOLEAN_BLOCKS_PATH = "boolean_blocks" # Path to the boolean blocks JSON file # C_BLOCKS_PATH = "c_blocks" # Path to the C blocks JSON file # CAP_BLOCKS_PATH = "cap_blocks" # Path to the cap blocks JSON file # # Load the block catalogs from their respective JSON files # hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH) # hat_description = hat_block_data["description"] # #hat_description = hat_block_data.get("description", "No description available") # # hat_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in hat_block_data["blocks"]]) # hat_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in hat_block_data.get("blocks", []) # ]) if isinstance(hat_block_data.get("blocks"), list) else " No blocks information available." # #hat_opcodes_functionalities = os.path.join(BLOCKS_DIR, "hat_blocks.txt") # print("Hat blocks loaded successfully.", hat_description) # boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH) # boolean_description = boolean_block_data["description"] # # boolean_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in boolean_block_data["blocks"]]) # boolean_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in boolean_block_data.get("blocks", []) # ]) if isinstance(boolean_block_data.get("blocks"), list) else " No blocks information available." # #boolean_opcodes_functionalities = os.path.join(BLOCKS_DIR, "boolean_blocks.txt") # c_block_data = _load_block_catalog(C_BLOCKS_PATH) # c_description = c_block_data["description"] # # c_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in c_block_data["blocks"]]) # c_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in c_block_data.get("blocks", []) # ]) if isinstance(c_block_data.get("blocks"), list) else " No blocks information available." # #c_opcodes_functionalities = os.path.join(BLOCKS_DIR, "c_blocks.txt") # cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH) # cap_description = cap_block_data["description"] # # cap_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in cap_block_data["blocks"]]) # cap_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in cap_block_data.get("blocks", []) # ]) if isinstance(cap_block_data.get("blocks"), list) else " No blocks information available." # #cap_opcodes_functionalities = os.path.join(BLOCKS_DIR, "cap_blocks.txt") # reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH) # reporter_description = reporter_block_data["description"] # # reporter_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in reporter_block_data["blocks"]]) # reporter_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in reporter_block_data.get("blocks", []) # ]) if isinstance(reporter_block_data.get("blocks"), list) else " No blocks information available." # #reporter_opcodes_functionalities = os.path.join(BLOCKS_DIR, "reporter_blocks.txt") # stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH) # stack_description = stack_block_data["description"] # # stack_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in stack_block_data["blocks"]]) # stack_opcodes_functionalities = "\n".join([ # # f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}" # for block in stack_block_data.get("blocks", []) # ]) if isinstance(stack_block_data.get("blocks"), list) else " No blocks information available." # #stack_opcodes_functionalities = os.path.join(BLOCKS_DIR, "stack_blocks.txt") # # This makes ALL_SCRATCH_BLOCKS_CATALOG available globally # ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH) def extract_json_from_llm_response(raw_response: str) -> dict: """ Finds and parses the first valid JSON object from a raw LLM response string. """ logger.debug("Attempting to extract JSON from raw LLM response...") # 1. Look for a JSON markdown block first match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response) if match: json_string = match.group(1) logger.debug("Found JSON inside a markdown block.") try: return json.loads(json_string) except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON from markdown block: {e}") # Fall through to the next method if parsing fails # 2. If no block is found (or it failed), find the outermost braces logger.debug("Markdown block not found or failed. Searching for outermost braces.") try: first_brace = raw_response.find('{') last_brace = raw_response.rfind('}') if first_brace != -1 and last_brace != -1 and first_brace < last_brace: json_string = raw_response[first_brace : last_brace + 1] return json.loads(json_string) else: logger.error("Could not find a valid JSON structure (outermost braces).") raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0) except json.JSONDecodeError as e: logger.error(f"Final JSON parsing attempt failed: {e}") # Re-raise the exception to be caught by the calling logic (to invoke the corrector agent) raise def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str: """ Input: clean_b64_str = BASE64 STRING (no data: prefix) Output: BASE64 STRING (no data: prefix), sized as close as possible to max_kb KB. Guarantees: returns a valid base64 string (never None). May still be larger than max_kb if saving at lowest quality cannot get under the limit. """ # sanitize clean = re.sub(r"\s+", "", clean_b64_str).strip() # fix padding missing = len(clean) % 4 if missing: clean += "=" * (4 - missing) try: image_data = base64.b64decode(clean) except Exception as e: raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e try: img = Image.open(io.BytesIO(image_data)) img.load() except Exception as e: raise ValueError("Could not open image from base64") from e # convert alpha -> RGB because JPEG doesn't support alpha if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info): background = Image.new("RGB", img.size, (255, 255, 255)) background.paste(img, mask=img.split()[-1] if img.mode != "RGB" else None) img = background elif img.mode != "RGB": img = img.convert("RGB") low, high = 20, 95 best_bytes = None # binary search for best quality while low <= high: mid = (low + high) // 2 buf = io.BytesIO() try: img.save(buf, format="JPEG", quality=mid, optimize=True) except OSError: # some PIL builds/channels may throw on optimize=True; fallback without optimize buf = io.BytesIO() img.save(buf, format="JPEG", quality=mid) size_kb = len(buf.getvalue()) / 1024.0 if size_kb <= max_kb: best_bytes = buf.getvalue() low = mid + 1 else: high = mid - 1 # if never found a quality <= max_kb, use the smallest we created (quality = 20) if best_bytes is None: buf = io.BytesIO() try: img.save(buf, format="JPEG", quality=20, optimize=True) except OSError: buf = io.BytesIO() img.save(buf, format="JPEG", quality=20) best_bytes = buf.getvalue() return base64.b64encode(best_bytes).decode("utf-8") def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str: """ Accepts: raw_b64 can be: - a data URI 'data:image/png;base64,...' - a plain base64 string - a PIL Image - a list containing the above (take first) Returns: a data URI string 'data:;base64,' guaranteed to be syntactically valid. """ # normalize input if not raw_b64: return "" if isinstance(raw_b64, list): raw_b64 = raw_b64[0] if raw_b64 else "" if not raw_b64: return "" if isinstance(raw_b64, Image.Image): buf = io.BytesIO() # convert to RGB and save as JPEG to keep consistent img = raw_b64.convert("RGB") img.save(buf, format="JPEG") clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8") mime = "image/jpeg" return f"data:{mime};base64,{clean_b64}" if not isinstance(raw_b64, str): raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}") # detect mime if present; otherwise default to png m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL) if m: mime = m.group(1) clean_b64 = m.group(2) else: # no prefix; assume png by default (you can change to jpeg if you prefer) mime = "image/png" clean_b64 = raw_b64 # sanitize base64 string clean_b64 = re.sub(r"\s+", "", clean_b64).strip() missing = len(clean_b64) % 4 if missing: clean_b64 += "=" * (4 - missing) original_size_bytes = len(clean_b64.encode("utf-8")) # debug print print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}") if original_size_bytes > max_bytes_threshold: # reduce and return JPEG prefixed data URI (JPEG tends to compress better for photos) reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=4000) # reduced_clean is plain base64 (no prefix) print(f"Reduced base64 size (bytes): {original_size_bytes}, mime: {mime}") return f"data:image/jpeg;base64,{reduced_clean}" # otherwise return original with its mime prefix (ensure prefix exists) return f"data:{mime};base64,{clean_b64}" SCRATCH_OPCODES = [ 'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto', 'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection', 'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby', 'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs', 'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto', 'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait', 'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby', 'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide', 'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone', 'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto', 'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until', 'control_stop', 'control_create_clone_of', 'control_delete_this_clone', 'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist', 'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable', 'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer', 'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals', 'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains', 'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor', 'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat', 'control_forever', 'control_if', 'control_if_else', 'control_repeat_until', 'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername', 'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto', 'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness', 'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000', 'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply', 'operator_divide', 'operator_random', 'operator_join', 'operator_letterof', 'operator_length', 'operator_mod', 'operator_round', 'operator_mathop', 'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist', 'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed', 'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan', 'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition' ] def validate_and_fix_opcodes(opcode_counts): """ Ensures all opcodes are valid. If an opcode is invalid, replace with closest match. """ corrected_list = [] for item in opcode_counts: opcode = item.get("opcode") count = item.get("count", 1) if opcode not in SCRATCH_OPCODES: # Find closest match (case-sensitive) match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6) if match: print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'") opcode = match[0] else: print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.") continue corrected_list.append({"opcode": opcode, "count": count}) # Merge duplicates after correction merged = {} for item in corrected_list: merged[item["opcode"]] = merged.get(item["opcode"], 0) + item["count"] return [{"opcode": k, "count": v} for k, v in merged.items()] def format_scratch_pseudo_code(code_string): """ Parses and formats Scratch pseudo-code with correct indentation, specifically handling if/else/end structures correctly. Args: code_string (str): A string containing Scratch pseudo-code with potentially inconsistent indentation. Returns: str: The correctly formatted and indented pseudo-code string. """ lines = code_string.strip().split('\n') formatted_lines = [] indent_level = 0 # Keywords that increase indentation for the NEXT line indent_keywords = ['when', 'forever', 'if', 'repeat', 'else'] # Keywords that decrease indentation for the CURRENT line unindent_keywords = ['end', 'else'] for line in lines: stripped_line = line.strip() if not stripped_line: continue # Check for keywords that should un-indent the current line if any(keyword in stripped_line for keyword in unindent_keywords): # Special case for 'else': it should align with its 'if' if 'else' in stripped_line: # Decrease indentation for 'else' and its following lines indentation = ' ' * (indent_level -1) formatted_lines.append(indentation + stripped_line) continue # For 'end', decrease the level before formatting indent_level = max(0, indent_level - 1) indentation = ' ' * indent_level formatted_lines.append(indentation + stripped_line) # Check for keywords that should indent the next line if any(keyword in stripped_line for keyword in indent_keywords): # 'else' both un-indents and indents, so the level remains the same for the next block if 'else' not in stripped_line: indent_level += 1 return '\n'.join(formatted_lines) # Node 1: Logic updating if any issue here def pseudo_generator_node(state: GameState): logger.info("--- Running plan_logic_aligner_node ---") image = state.get("project_image", "") project_json = state["project_json"] cnt =state["page_count"] print(f"The page number recived at the pseudo_generator node:-----> {cnt}") # MODIFICATION 1: Include 'Stage' in the list of names to plan for. # It's crucial to ensure 'Stage' is always present for its global role. target_names = [t["name"] for t in project_json["targets"]] stage_names = [t["name"] for t in project_json["targets"] if t.get("isStage")] sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")] # Get costumes separately for Stage and Sprites stage_costumes = [ c["name"] for t in project_json["targets"] if t.get("isStage") for c in t.get("costumes", []) ] refinement_prompt = f""" You are an expert Scratch 3.0 programmer. Your task is to analyze an image of Scratch code blocks and convert it into a structured JSON object containing precise pseudocode. --- ## CONTEXT - **Available Sprites:** {', '.join(sprite_names)} - **Available Stage Costumes:** {', '.join(stage_costumes)} --- ## INSTRUCTIONS 1. **Identify the Target:** Find the text "Script for:" in the image to determine the target sprite or stage. 2. **Apply Stage Rule:** If the identified target name exactly matches any name in the `Available Stage Costumes` list, you MUST set the output `name_variable` to `"Stage"`. Otherwise, use the identified target name. 3. **Handle No Code:** If no Scratch blocks are visible in the image, return the specified "No Code-blocks" JSON format. 4. **Generate Pseudocode:** If blocks are present, convert them to pseudocode according to the rules below. 5. **Output ONLY JSON:** Your entire response must be a single, valid JSON object inside a ```json code block and nothing else. --- ## PSEUDOCODE FORMATTING RULES - **Numbers & Text:** Enclose in parentheses. `(10)`, `(-50)`, `(hello)`. - **Variables & Dropdowns:** Enclose in square brackets with ` v`. `[score v]`, `[space v]`. - **Reporter Blocks:** Enclose in double parentheses. `((x position))`. - **Boolean Conditions:** Enclose in angle brackets. `<((score)) > (50)>`, `>`. - **Specific Block Exceptions:** Self-contained blocks like `if on edge, bounce`, `next costume`, and `hide` should be written as-is, without any parentheses or brackets. - **Line Breaks:** Use `\n` to separate each block onto a new line. The entire pseudocode must be a single JSON string. - **Indentation:** Use **4 spaces** to indent blocks nested inside C-Blocks (like `if`, `if else`, `repeat`, `forever`). - **Termination:** - **Every script** (starting with a hat block) MUST conclude with `end`. - **Every C-Block** (`if`, `repeat`, `forever`) MUST also have its own corresponding `end` at the correct indentation level. This is critical. --- ## REQUIRED JSON FORMAT If code blocks are found: ```json {{ "refined_logic": {{ "name_variable": "Name_Identified_From_Instructions", "pseudocode": "Your fully formatted pseudocode as a single string with \\n newlines." }} }} ```` If no code blocks are found: ```json {{ "refined_logic": {{ "name_variable": "Name_Identified_From_Instructions", "pseudocode": "No Code-blocks" }} }} ``` ----- ## EXAMPLES **Example 1: Looping and Conditionals** ``` when green flag clicked go to x: (240) y: (-100) set [speed v] to (-5) forever change x by ([speed v]) if <((x position)) < (-240)> then go to x: (240) y: (-100) end end end ``` **Example 2: Events and Broadcasting** ``` when I receive [Game Over v] if <((score)) > (([High Score v]))> then set [High Score v] to ([score v]) end switch backdrop to [Game Over v] end ``` """ image_input = { "type": "image_url", "image_url": { # "url": f"data:image/png;base64,{image}" "url": clean_base64_for_model(image[cnt]) } } content = [ {"type": "text", "text": refinement_prompt}, image_input ] try: # Invoke the main agent for logic refinement and relationship identification response = agent.invoke({"messages": [{"role": "user", "content": content}]}) llm_output_raw = response["messages"][-1].content.strip() print(f"llm_output_raw: {response}") parsed_llm_output = extract_json_from_llm_response(llm_output_raw) result = parsed_llm_output print(f"result:\n\n {result}") except json.JSONDecodeError as error_json: correction_prompt = f""" Fix this malformed response and return only the corrected JSON: Input: {llm_output_raw if 'llm_output_raw' in locals() else 'No response available'} Extract the sprite name and pseudocode, then return in this exact format: {{ "refined_logic": {{ "name_variable": "sprite_name", "pseudocode": "pseudocode_here" }} }} """ try: correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content) result = corrected_output print(f"result:\n\n {result}") except Exception as e_corr: logger.error(f"Failed to correct JSON output for even after retry: {e_corr}") # Update the original action_plan in the state with the refined version state["pseudo_code"] = result state["temp_pseudo_code"] += [result] Data = state["temp_pseudo_code"] print(f"[OVREALL REFINED PSEUDO CODE LOGIC]: {result}") print(f"[OVREALL LISTS OF LOGICS]: {Data}") logger.info("Plan refinement and block relation analysis completed for all plans.") return state # Node2: Node Optimizer node def node_optimizer(state: GameState): logger.info("--- Running Node Optimizer Node ---") project_json = state["project_json"] raw = state.get("pseudo_code", {}) refined_logic_data = raw.get("refined_logic", {}) sprite_name = refined_logic_data.get("name_variable", "") pseudo = refined_logic_data.get("pseudocode", "") sprite_name = {} project_json_targets = state.get("project_json", {}).get("targets", []) for target in project_json_targets: sprite_name[target["name"]] = target["name"] action_flow = state.get("action_plan", {}) try: refined_logic_data["pseudocode"] = separate_scripts(str(pseudo)) # Step 4: If you want to update the `state` dictionary with the new refined_logic_data state["pseudo_code"]["refined_logic"] = refined_logic_data print(f"[The pseudo_code generated here]: { state['pseudo_code']}") state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"]) print(f"[The action plan generated here]: { state['action_plan']}") action_flow = state.get("action_plan", {}) if action_flow.get("action_overall_flow", {}) == {}: plan_data = action_flow.items() else: plan_data = action_flow.get("action_overall_flow", {}).items() refined_flow: Dict[str, Any] = {} for sprite, sprite_data in plan_data: refined_plans = [] for plan in sprite_data.get("plans", []): logic = plan.get("logic", "") plan["opcode_counts"]= analyze_opcode_counts(str(logic)) refined_plans.append(plan) refined_flow[sprite] = { "description": sprite_data.get("description", ""), "plans": refined_plans } if refined_flow: state["action_plan"] = refined_flow logger.info("Node Optimization completed.") return state except Exception as e: logger.error(f"Error in Node Optimizer Node: {e}") # Node 5: block_builder_node def overall_block_builder_node_2(state: GameState): logger.info("--- Running OverallBlockBuilderNode ---") print("--- Running OverallBlockBuilderNode ---") project_json = state["project_json"] targets = project_json["targets"] # --- Sprite and Stage Target Mapping --- sprite_map = {target["name"]: target for target in targets if not target["isStage"]} stage_target = next((target for target in targets if target["isStage"]), None) if stage_target: sprite_map[stage_target["name"]] = stage_target action_plan = state.get("action_plan", {}) print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) if not action_plan: logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") return state # Initialize offsets for script placement on the Scratch canvas script_y_offset = {} script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} # This handles potential variations in the action_plan structure. if action_plan.get("action_overall_flow", {}) == {}: plan_data = action_plan.items() else: plan_data = action_plan.get("action_overall_flow", {}).items() # --- Extract global project context for LLM --- all_sprite_names = list(sprite_map.keys()) all_variable_names = {} all_list_names = {} all_broadcast_messages = {} for target in targets: for var_id, var_info in target.get("variables", {}).items(): all_variable_names[var_info[0]] = var_id # Store name -> ID mapping (e.g., "myVariable": "myVarId123") for list_id, list_info in target.get("lists", {}).items(): all_list_names[list_info[0]] = list_id # Store name -> ID mapping for broadcast_id, broadcast_name in target.get("broadcasts", {}).items(): all_broadcast_messages[broadcast_name] = broadcast_id # Store name -> ID mapping # --- Process each sprite's action plan --- for sprite_name, sprite_actions_data in plan_data: if sprite_name in sprite_map: current_sprite_target = sprite_map[sprite_name] if "blocks" not in current_sprite_target: current_sprite_target["blocks"] = {} if sprite_name not in script_y_offset: script_y_offset[sprite_name] = 0 for plan_entry in sprite_actions_data.get("plans", []): logic_sequence = str(plan_entry["logic"]) opcode_counts = plan_entry.get("opcode_counts", {}) refined_indent_logic = format_scratch_pseudo_code(logic_sequence) print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n") try: generated_blocks = block_builder(opcode_counts, refined_indent_logic) # Ensure generated_blocks is a dictionary if not isinstance(generated_blocks, dict): logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.") continue # Skip to next plan_entry if output is not a dictionary if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") generated_blocks = generated_blocks["blocks"] # Update block positions for top-level script for block_id, block_data in generated_blocks.items(): if block_data.get("topLevel"): block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) block_data["y"] = script_y_offset[sprite_name] script_y_offset[sprite_name] += 150 # Increment for next script current_sprite_target["blocks"].update(generated_blocks) print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}") state["iteration_count"] = 0 logger.info(f"Action blocks added for sprite '{sprite_name}' by OverallBlockBuilderNode.") except Exception as e: logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") state["project_json"] = project_json return state # Node 6: variable adder node def variable_adder_node(state: GameState): logger.info("--- Running Variable Adder Node ---") project_json = state["project_json"] try: updated_project_json = variable_adder_main(project_json) if updated_project_json is not None: print("Variable added inside the project successfully!") state["project_json"]=updated_project_json else: print("Variable adder unable to add any variable inside the project!") state["project_json"]=project_json state["page_count"] +=1 return state except Exception as e: logger.error(f"Error in variable adder node while updating project_json': {e}") raise # Node 7: variable adder node def layer_order_correction(state: GameState): """ Ensures that all sprites (isStage: false) have unique layerOrder values >= 1. If duplicates are found, they are reassigned sequentially. """ logger.info("--- Running Layer Order Correction Node ---") try: project_json = state.get("project_json", {}) targets = project_json.get("targets", []) # Collect all sprites (ignore Stage) sprites = [t for t in targets if not t.get("isStage", False)] # Reassign layerOrder sequentially (starting from 1) for idx, sprite in enumerate(sprites, start=1): old_lo = sprite.get("layerOrder", None) sprite["layerOrder"] = idx logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {old_lo} -> {idx}") # Stage always remains 0 for target in targets: if target.get("isStage", False): target["layerOrder"] = 0 # Update state state["project_json"]["targets"] = targets logger.info("Layer Order Correction completed successfully.") return state except Exception as e: logger.error(f"Error in Layer Order Correction Node: {e}") return state # Node 8: variable adder node def processed_page_node(state: GameState): logger.info("--- Processing the Pages Node ---") image = state.get("project_image", "") cnt =state["page_count"] print(f"The page processed for page:--------------> {cnt}") if cnt str: print("πŸ” Running similarity matching…") import os import json import numpy as np import torch from PIL import Image, ImageOps, ImageEnhance from imagededup.methods import PHash from transformers import AutoImageProcessor, AutoModel import io import base64 from pathlib import Path import cv2 # hashing & image-match from image_match.goldberg import ImageSignature import sys import math import hashlib from typing import List, Tuple os.makedirs(project_folder, exist_ok=True) # backdrop_base_path = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks\Backdrops" # sprite_base_path = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks\sprites" # code_blocks_path = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks\code_blocks" backdrop_base_path = os.path.normpath(str(BACKDROP_DIR)) sprite_base_path = os.path.normpath(str(SPRITE_DIR)) code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR)) # out_path = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks\out_json" project_json_path = os.path.join(project_folder, "project.json") # ------------------------- # Build sprite images list (BytesIO) from sprites_data # ------------------------- sprite_ids, sprite_base64 = [], [] for sid, sprite in sprites_data.items(): sprite_ids.append(sid) sprite_base64.append(sprite["base64"]) sprite_images_bytes = [] sprite_b64_clean = [] # <<< new: store cleaned base64 strings for b64 in sprite_base64: # remove possible "data:image/..;base64," prefix raw_b64 = b64.split(",")[-1] sprite_b64_clean.append(raw_b64) # decode into BytesIO for local processing img = Image.open(BytesIO(base64.b64decode(raw_b64))).convert("RGB") buffer = BytesIO() img.save(buffer, format="PNG") buffer.seek(0) sprite_images_bytes.append(buffer) def hybrid_similarity_matching(sprite_images_bytes, sprite_ids, min_similarity=None, top_k=5, method_weights=(0.5,0.3,0.2)): from PIL import Image # Local safe defaults embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json") hash_path = os.path.join(BLOCKS_DIR, "phash_data.json") signature_path = os.path.join(BLOCKS_DIR, "signature_data.json") # Load embeddings embedding_json = {} if os.path.exists(embeddings_path): with open(embeddings_path, "r", encoding="utf-8") as f: embedding_json = json.load(f) # Load phash data (if exists) -> ensure hash_dict variable exists hash_dict = {} if os.path.exists(hash_path): try: with open(hash_path, "r", encoding="utf-8") as f: hash_data = json.load(f) for path, hash_str in hash_data.items(): try: hash_dict[path] = hash_str except Exception: pass except Exception: pass # Load signature data (if exists) -> ensure signature_dict exists signature_dict = {} sig_data = {} if os.path.exists(signature_path): try: with open(signature_path, "r", encoding="utf-8") as f: sig_data = json.load(f) for path, sig_list in sig_data.items(): try: signature_dict[path] = np.array(sig_list) except Exception: pass except Exception: pass # Parse embeddings into lists paths_list = [] embeddings_list = [] if isinstance(embedding_json, dict): for p, emb in embedding_json.items(): if isinstance(emb, dict): maybe_emb = emb.get("embedding") or emb.get("embeddings") or emb.get("emb") if maybe_emb is None: continue arr = np.asarray(maybe_emb, dtype=np.float32) elif isinstance(emb, list): arr = np.asarray(emb, dtype=np.float32) else: continue paths_list.append(os.path.normpath(str(p))) embeddings_list.append(arr) elif isinstance(embedding_json, list): for item in embedding_json: if not isinstance(item, dict): continue p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path") emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb") if p is None or emb is None: continue paths_list.append(os.path.normpath(str(p))) embeddings_list.append(np.asarray(emb, dtype=np.float32)) if len(paths_list) == 0: print("⚠ No reference images/embeddings found (this test harness may be running without data)") # Return empty results gracefully return [[] for _ in sprite_images_bytes], [[] for _ in sprite_images_bytes], [] ref_matrix = np.vstack(embeddings_list).astype(np.float32) # Batch: Get all sprite embeddings, phash, sigs first sprite_emb_list = [] sprite_phash_list = [] sprite_sig_list = [] per_sprite_final_indices = [] per_sprite_final_scores = [] per_sprite_rerank_debug = [] for i, sprite_bytes in enumerate(sprite_images_bytes): sprite_pil = Image.open(sprite_bytes) enhanced_sprite = process_image_cv2_from_pil(sprite_pil, scale=2) or sprite_pil # sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite)) or np.zeros(ref_matrix.shape[1]) # sprite_emb_list.append(sprite_emb) sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite)) sprite_emb = sprite_emb if sprite_emb is not None else np.zeros(ref_matrix.shape[1]) sprite_emb_list.append(sprite_emb) # Perceptual hash sprite_hash_arr = preprocess_for_hash(enhanced_sprite) sprite_phash = None if sprite_hash_arr is not None: try: sprite_phash = phash.encode_image(image_array=sprite_hash_arr) except: pass sprite_phash_list.append(sprite_phash) # Signature sprite_sig = None embedding_results, phash_results, imgmatch_results, combined_results = run_query_search_flow( query_b64=sprite_b64_clean[i], processed_dir=BLOCKS_DIR, embeddings_dict=embedding_json, hash_dict=hash_data, signature_obj_map=sig_data, gis=gis, phash=phash, MAX_PHASH_BITS=64, k=5 ) # Call the advanced re-ranker rerank_result = choose_top_candidates(embedding_results, phash_results, imgmatch_results, top_k=top_k, method_weights=method_weights, verbose=True) per_sprite_rerank_debug.append(rerank_result) # Selection logic: prefer consensus, else weighted top-1 final = None if len(rerank_result["consensus_topk"]) > 0: consensus = rerank_result["consensus_topk"] best = max(consensus, key=lambda p: rerank_result["weighted_scores_full"].get(p, 0.0)) final = best else: final = rerank_result["weighted_topk"][0][0] if rerank_result["weighted_topk"] else None # Store index and score for downstream use if final is not None and final in paths_list: idx = paths_list.index(final) score = rerank_result["weighted_scores_full"].get(final, 0.0) per_sprite_final_indices.append([idx]) per_sprite_final_scores.append([score]) print(f"Sprite '{sprite_ids}' FINAL selected: {final} (index {idx}) score={score:.4f}") else: per_sprite_final_indices.append([]) per_sprite_final_scores.append([]) return per_sprite_final_indices, per_sprite_final_scores, paths_list#, per_sprite_rerank_debug # Use hybrid matching system per_sprite_matched_indices, per_sprite_scores, paths_list = hybrid_similarity_matching( sprite_images_bytes, sprite_ids, min_similarity, top_k, method_weights=(0.5, 0.3, 0.2) ) # ========================================= # Copy matched sprite assets + collect data # ========================================= project_data = [] backdrop_data = [] copied_sprite_folders = set() copied_backdrop_folders = set() matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) print("matched_indices------------------>",matched_indices) import shutil import json import os from pathlib import Path # normalize base paths once before the loop sprite_base_p = Path(sprite_base_path).resolve(strict=False) backdrop_base_p = Path(backdrop_base_path).resolve(strict=False) project_folder_p = Path(project_folder) project_folder_p.mkdir(parents=True, exist_ok=True) copied_sprite_folders = set() copied_backdrop_folders = set() def display_like_windows_no_lead(p: Path) -> str: """ For human-readable logs only β€” convert Path to a string like: "app\\blocks\\Backdrops\\Castle 2.sb3" (no leading slash). """ s = p.as_posix() # forward-slash string, safe for Path objects if s.startswith("/"): s = s[1:] return s.replace("/", "\\") def is_subpath(child: Path, parent: Path) -> bool: """Robust membership test: is child under parent?""" try: # use non-strict resolve only if needed, but avoid exceptions child.relative_to(parent) return True except Exception: return False # Flatten unique matched indices (if not already) matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) print("matched_indices------------------>", matched_indices) for matched_idx in matched_indices: # defensive check if not (0 <= matched_idx < len(paths_list)): print(f" ⚠ matched_idx {matched_idx} out of range, skipping") continue matched_image_path = paths_list[matched_idx] matched_path_p = Path(matched_image_path).resolve(strict=False) # keep as Path matched_folder_p = matched_path_p.parent # Path object matched_filename = matched_path_p.name # Prepare display-only string (do NOT reassign matched_folder_p) matched_folder_display = display_like_windows_no_lead(matched_folder_p) print(f"Processing matched image: {matched_image_path}") print(f" - Folder: {matched_folder_display}") print(f" - Sprite path: {display_like_windows_no_lead(sprite_base_p)}") print(f" - Backdrop path: {display_like_windows_no_lead(backdrop_base_p)}") print(f" - Filename: {matched_filename}") # Use a canonical string to store in the copied set (POSIX absolute-ish) folder_key = matched_folder_p.as_posix() # ---------- SPRITE ---------- if is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders: print(f"Processing SPRITE folder: {matched_folder_display}") copied_sprite_folders.add(folder_key) sprite_json_path = matched_folder_p / "sprite.json" print("sprite_json_path----------------------->", sprite_json_path) print("copied sprite folder----------------------->", copied_sprite_folders) if sprite_json_path.exists() and sprite_json_path.is_file(): try: with sprite_json_path.open("r", encoding="utf-8") as f: sprite_info = json.load(f) project_data.append(sprite_info) print(f" βœ“ Successfully read sprite.json from {matched_folder_display}") except Exception as e: print(f" βœ— Failed to read sprite.json in {matched_folder_display}: {repr(e)}") else: print(f" ⚠ No sprite.json in {matched_folder_display}") # copy non-matching files from the sprite folder (except matched image and sprite.json) try: sprite_files = list(matched_folder_p.iterdir()) except Exception as e: sprite_files = [] print(f" βœ— Failed to list files in {matched_folder_display}: {repr(e)}") print(f" Files in sprite folder: {[p.name for p in sprite_files]}") for p in sprite_files: fname = p.name if fname in (matched_filename, "sprite.json"): print(f" Skipping {fname} (matched image or sprite.json)") continue if p.is_file(): dst = project_folder_p / fname try: shutil.copy2(str(p), str(dst)) print(f" βœ“ Copied sprite asset: {p} -> {dst}") except Exception as e: print(f" βœ— Failed to copy sprite asset {p}: {repr(e)}") else: print(f" Skipping {fname} (not a file)") # ---------- BACKDROP ---------- if is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders: print(f"Processing BACKDROP folder: {matched_folder_display}") copied_backdrop_folders.add(folder_key) print("backdrop_base_path----------------------->", display_like_windows_no_lead(backdrop_base_p)) print("copied backdrop folder----------------------->", copied_backdrop_folders) # copy matched backdrop image backdrop_src = matched_folder_p / matched_filename backdrop_dst = project_folder_p / matched_filename if backdrop_src.exists() and backdrop_src.is_file(): try: shutil.copy2(str(backdrop_src), str(backdrop_dst)) print(f" βœ“ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}") except Exception as e: print(f" βœ— Failed to copy matched backdrop image {backdrop_src}: {repr(e)}") else: print(f" ⚠ Matched backdrop source not found: {backdrop_src}") # copy other files from folder (skip project.json and matched image) try: backdrop_files = list(matched_folder_p.iterdir()) except Exception as e: backdrop_files = [] print(f" βœ— Failed to list files in {matched_folder_display}: {repr(e)}") print(f" Files in backdrop folder: {[p.name for p in backdrop_files]}") for p in backdrop_files: fname = p.name if fname in (matched_filename, "project.json"): print(f" Skipping {fname} (matched image or project.json)") continue if p.is_file(): dst = project_folder_p / fname try: shutil.copy2(str(p), str(dst)) print(f" βœ“ Copied backdrop asset: {p} -> {dst}") except Exception as e: print(f" βœ— Failed to copy backdrop asset {p}: {repr(e)}") else: print(f" Skipping {fname} (not a file)") # read project.json to extract Stage/targets pj = matched_folder_p / "project.json" if pj.exists() and pj.is_file(): try: with pj.open("r", encoding="utf-8") as f: bd_json = json.load(f) stage_count = 0 for tgt in bd_json.get("targets", []): if tgt.get("isStage"): backdrop_data.append(tgt) stage_count += 1 print(f" βœ“ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)") except Exception as e: print(f" βœ— Failed to read project.json in {matched_folder_display}: {repr(e)}") else: print(f" ⚠ No project.json in {matched_folder_display}") print("---") final_project = { "targets": [], "monitors": [], "extensions": [], "meta": { "semver": "3.0.0", "vm": "11.3.0", "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" } } # Add sprite targets (non-stage) for spr in project_data: if not spr.get("isStage", False): final_project["targets"].append(spr) if backdrop_data: all_costumes, sounds = [], [] seen_costumes = set() for i, bd in enumerate(backdrop_data): for costume in bd.get("costumes", []): key = (costume.get("name"), costume.get("assetId")) if key not in seen_costumes: seen_costumes.add(key) all_costumes.append(costume) if i == 0: sounds = bd.get("sounds", []) stage_obj={ "isStage": True, "name": "Stage", "objName": "Stage", "variables": {}, "lists": {}, "broadcasts": {}, "blocks": {}, "comments": {}, "currentCostume": 1 if len(all_costumes) > 1 else 0, "costumes": all_costumes, "sounds": sounds, "volume": 100, "layerOrder": 0, "tempo": 60, "videoTransparency": 50, "videoState": "on", "textToSpeechLanguage": None } final_project["targets"].insert(0, stage_obj) else: logger.warning("⚠️ No backdrop matched. Using default static backdrop.") default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg" default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav" default_backdrop_sound_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" try: shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name)) logger.info(f"βœ… Default backdrop copied to project: {default_backdrop_name}") shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name)) logger.info(f"βœ… Default backdrop sound copied to project: {default_backdrop_sound_name}") except Exception as e: logger.error(f"❌ Failed to copy default backdrop: {e}") stage_obj={ "isStage": True, "name": "Stage", "objName": "Stage", "variables": {}, "lists": {}, "broadcasts": {}, "blocks": {}, "comments": {}, "currentCostume": 0, "costumes": [ { "assetId": default_backdrop_name.split(".")[0], "name": "defaultBackdrop", "md5ext": default_backdrop_name, "dataFormat": "svg", "rotationCenterX": 240, "rotationCenterY": 180 } ], "sounds": [ { "name": "pop", "assetId": "83a9787d4cb6f3b7632b4ddfebf74367", "dataFormat": "wav", "format": "", "rate": 48000, "sampleCount": 1123, "md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav" } ], "volume": 100, "layerOrder": 0, "tempo": 60, "videoTransparency": 50, "videoState": "on", "textToSpeechLanguage": None } final_project["targets"].insert(0, stage_obj) with open(project_json_path, 'w') as f: json.dump(final_project, f, indent=2) return project_json_path # ''' It appends all the list and paths from json files and pick the best match's path''' # def similarity_matching(sprites_data: dict, project_folder: str, top_k: int = 1, min_similarity: float = None) -> str: # print("πŸ” Running similarity matching…") # import os # import json # os.makedirs(project_folder, exist_ok=True) # backdrop_base_path = os.path.normpath(str(BACKDROP_DIR)) # sprite_base_path = os.path.normpath(str(SPRITE_DIR)) # code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR)) # project_json_path = os.path.join(project_folder, "project.json") # # ------------------------- # # Build sprite images list (BytesIO) from sprites_data # # ------------------------- # sprite_ids, sprite_base64 = [], [] # for sid, sprite in sprites_data.items(): # sprite_ids.append(sid) # sprite_base64.append(sprite["base64"]) # sprite_images_bytes = [] # for b64 in sprite_base64: # img = Image.open(BytesIO(base64.b64decode(b64.split(",")[-1]))).convert("RGB") # buffer = BytesIO() # img.save(buffer, format="PNG") # buffer.seek(0) # sprite_images_bytes.append(buffer) # # ----------------------------------------- # # Hybrid Similarity Matching System # # ----------------------------------------- # def hybrid_similarity_matching(sprite_images_bytes, sprite_ids, # min_similarity=None, top_k=5, method_weights=(0.5, 0.3, 0.2)): # """ # Hybrid similarity matching using DINOv2 embeddings, perceptual hashing, and image signatures # Args: # sprite_images_bytes: List of image bytes # sprite_ids: List of sprite identifiers # blocks_dir: Directory containing reference blocks # min_similarity: Minimum similarity threshold # top_k: Number of top matches to return # method_weights: Weights for (embedding, phash, image_signature) methods # Returns: # per_sprite_matched_indices, per_sprite_scores, paths_list # """ # import imagehash as phash # from image_match.goldberg import ImageSignature # import math # from collections import defaultdict # # Load reference data # embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json") # hash_path = os.path.join(BLOCKS_DIR, "phash_data.json") # signature_path = os.path.join(BLOCKS_DIR, "signature_data.json") # # Load embeddings # with open(embeddings_path, "r", encoding="utf-8") as f: # embedding_json = json.load(f) # # Load phash data (if exists) # hash_dict = {} # if os.path.exists(hash_path): # with open(hash_path, "r", encoding="utf-8") as f: # hash_data = json.load(f) # for path, hash_str in hash_data.items(): # try: # hash_dict[path] = phash.hex_to_hash(hash_str) # except: # pass # # Load signature data (if exists) # signature_dict = {} # gis = ImageSignature() # if os.path.exists(signature_path): # with open(signature_path, "r", encoding="utf-8") as f: # sig_data = json.load(f) # for path, sig_list in sig_data.items(): # try: # signature_dict[path] = np.array(sig_list) # except: # pass # # Parse embeddings # paths_list = [] # embeddings_list = [] # if isinstance(embedding_json, dict): # for p, emb in embedding_json.items(): # if isinstance(emb, dict): # maybe_emb = emb.get("embedding") or emb.get("embeddings") or emb.get("emb") # if maybe_emb is None: # continue # arr = np.asarray(maybe_emb, dtype=np.float32) # elif isinstance(emb, list): # arr = np.asarray(emb, dtype=np.float32) # else: # continue # paths_list.append(os.path.normpath(str(p))) # embeddings_list.append(arr) # elif isinstance(embedding_json, list): # for item in embedding_json: # if not isinstance(item, dict): # continue # p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path") # emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb") # if p is None or emb is None: # continue # paths_list.append(os.path.normpath(str(p))) # embeddings_list.append(np.asarray(emb, dtype=np.float32)) # if len(paths_list) == 0: # raise RuntimeError("No reference images/embeddings found") # ref_matrix = np.vstack(embeddings_list).astype(np.float32) # # Process input sprites # # init_dinov2() # per_sprite_matched_indices = [] # per_sprite_scores = [] # for i, (sprite_bytes, sprite_id) in enumerate(zip(sprite_images_bytes, sprite_ids)): # print(f"Processing sprite {i+1}/{len(sprite_ids)}: {sprite_id}") # # Convert bytes to PIL for processing # sprite_pil = Image.open(sprite_bytes) # if sprite_pil is None: # per_sprite_matched_indices.append([]) # per_sprite_scores.append([]) # continue # # Enhance image # enhanced_sprite = process_image_cv2_from_pil(sprite_pil, scale=2) # if enhanced_sprite is None: # enhanced_sprite = sprite_pil # # 1. Compute DINOv2 embedding # sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite)) # if sprite_emb is None: # sprite_emb = np.zeros(ref_matrix.shape[1]) # # 2. Compute perceptual hash # sprite_hash_arr = preprocess_for_hash(enhanced_sprite) # sprite_phash = None # if sprite_hash_arr is not None: # try: # sprite_phash = phash.encode_image(image_array=sprite_hash_arr) # except: # pass # # 3. Compute image signature # sprite_sig = None # try: # temp_path = f"temp_sprite_{i}.png" # enhanced_sprite.save(temp_path, format="PNG") # sprite_sig = gis.generate_signature(temp_path) # os.remove(temp_path) # except: # pass # # Calculate similarities for all reference images # embedding_results = [] # phash_results = [] # signature_results = [] # for j, ref_path in enumerate(paths_list): # # Embedding similarity # try: # ref_emb = ref_matrix[j] # emb_sim = float(np.dot(sprite_emb, ref_emb)) # emb_sim = max(0.0, emb_sim) # Clamp negative values # except: # emb_sim = 0.0 # embedding_results.append((ref_path, emb_sim)) # # Phash similarity # ph_sim = 0.0 # if sprite_phash is not None and ref_path in hash_dict: # try: # ref_hash = hash_dict[ref_path] # hd = phash.hamming_distance(sprite_phash, ref_hash) # ph_sim = max(0.0, 1.0 - (hd / 64.0)) # Normalize to [0,1] # except: # pass # phash_results.append((ref_path, ph_sim)) # # Signature similarity # sig_sim = 0.0 # if sprite_sig is not None and ref_path in signature_dict: # try: # ref_sig = signature_dict[ref_path] # dist = gis.normalized_distance(ref_sig, sprite_sig) # sig_sim = max(0.0, 1.0 - dist) # except: # pass # signature_results.append((ref_path, sig_sim)) # # Combine similarities using weighted approach # def normalize_scores(scores): # """Normalize scores to [0,1] range""" # if not scores: # return {} # vals = [s for _, s in scores if not math.isnan(s)] # if not vals: # return {p: 0.0 for p, _ in scores} # vmin, vmax = min(vals), max(vals) # if vmax == vmin: # return {p: 1.0 if s == vmax else 0.0 for p, s in scores} # return {p: (s - vmin) / (vmax - vmin) for p, s in scores} # # Normalize each method's scores # emb_norm = normalize_scores(embedding_results) # ph_norm = normalize_scores(phash_results) # sig_norm = normalize_scores(signature_results) # # Calculate weighted combined scores # w_emb, w_ph, w_sig = method_weights # combined_scores = [] # for ref_path in paths_list: # combined_score = (w_emb * emb_norm.get(ref_path, 0.0) + # w_ph * ph_norm.get(ref_path, 0.0) + # w_sig * sig_norm.get(ref_path, 0.0)) # combined_scores.append((ref_path, combined_score)) # # Sort by combined score and apply thresholds # combined_scores.sort(key=lambda x: x[1], reverse=True) # # Filter by minimum similarity if specified # if min_similarity is not None: # combined_scores = [(p, s) for p, s in combined_scores if s >= float(min_similarity)] # # Get top-k matches # top_matches = combined_scores[:int(top_k)] # # Convert to indices and scores # matched_indices = [] # matched_scores = [] # for ref_path, score in top_matches: # try: # idx = paths_list.index(ref_path) # matched_indices.append(idx) # matched_scores.append(score) # except ValueError: # continue # per_sprite_matched_indices.append(matched_indices) # per_sprite_scores.append(matched_scores) # print(f"Sprite '{sprite_id}' matched {len(matched_indices)} references with scores: {matched_scores}") # return per_sprite_matched_indices, per_sprite_scores, paths_list # def choose_top_candidates_advanced(embedding_results, phash_results, imgmatch_results, top_k=10, # method_weights=(0.5, 0.3, 0.2), verbose=True): # """ # Advanced candidate selection using multiple ranking methods # Args: # embedding_results: list of (path, emb_sim) # phash_results: list of (path, hamming, ph_sim) # imgmatch_results: list of (path, dist, im_sim) # top_k: number of top candidates to return # method_weights: weights for (emb, phash, imgmatch) # verbose: whether to print detailed results # Returns: # dict with top candidates from different methods and final selection # """ # import math # from collections import defaultdict # # Build dicts for quick lookup # emb_map = {p: float(s) for p, s in embedding_results} # ph_map = {p: float(sim) for p, _, sim in phash_results} # im_map = {p: float(sim) for p, _, sim in imgmatch_results} # # Universe of candidates (union) # all_paths = sorted(set(list(emb_map.keys()) + list(ph_map.keys()) + list(im_map.keys()))) # # Normalize each metric across candidates to [0,1] # def normalize_map(m): # vals = [m.get(p, None) for p in all_paths] # present = [v for v in vals if v is not None and not math.isnan(v)] # if not present: # return {p: 0.0 for p in all_paths} # vmin, vmax = min(present), max(present) # if vmax == vmin: # return {p: (1.0 if (m.get(p, None) is not None) else 0.0) for p in all_paths} # norm = {} # for p in all_paths: # v = m.get(p, None) # if v is None or math.isnan(v): # norm[p] = 0.0 # else: # norm[p] = max(0.0, min(1.0, (v - vmin) / (vmax - vmin))) # return norm # # For embeddings, clamp negatives to 0 first # emb_map_clamped = {p: max(0.0, v) for p, v in emb_map.items()} # emb_norm = normalize_map(emb_map_clamped) # ph_norm = normalize_map(ph_map) # im_norm = normalize_map(im_map) # # Method A: Normalized weighted average # w_emb, w_ph, w_im = method_weights # weighted_scores = {} # for p in all_paths: # weighted_scores[p] = (w_emb * emb_norm.get(p, 0.0) # + w_ph * ph_norm.get(p, 0.0) # + w_im * im_norm.get(p, 0.0)) # top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] # # Method B: Rank-sum (Borda) # def ranks_from_map(m_norm): # items = sorted(m_norm.items(), key=lambda x: x[1], reverse=True) # ranks = {} # for i, (p, _) in enumerate(items): # ranks[p] = i + 1 # 1-based # worst = len(items) + 1 # for p in all_paths: # if p not in ranks: # ranks[p] = worst # return ranks # rank_emb = ranks_from_map(emb_norm) # rank_ph = ranks_from_map(ph_norm) # rank_im = ranks_from_map(im_norm) # rank_sum = {} # for p in all_paths: # rank_sum[p] = rank_emb.get(p, 9999) + rank_ph.get(p, 9999) + rank_im.get(p, 9999) # top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k] # smaller is better # # Method C: Harmonic mean # harm_scores = {} # for p in all_paths: # a = emb_norm.get(p, 0.0) # b = ph_norm.get(p, 0.0) # c = im_norm.get(p, 0.0) # if a + b + c == 0 or a == 0 or b == 0 or c == 0: # harm = 0.0 # else: # harm = 3.0 / ((1.0/a) + (1.0/b) + (1.0/c)) # harm_scores[p] = harm # top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] # # Consensus set: items in top-K of each metric # def topk_set_by_map(m_norm, k=top_k): # return set([p for p,_ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]]) # cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k) # result = { # "emb_norm": emb_norm, # "ph_norm": ph_norm, # "im_norm": im_norm, # "weighted_topk": top_weighted, # "rank_sum_topk": top_rank_sum, # "harmonic_topk": top_harm, # "consensus_topk": list(cons_set), # "weighted_scores_full": weighted_scores, # "rank_sum_full": rank_sum, # "harmonic_full": harm_scores # } # if verbose: # print(f"\nTop by Weighted Average (weights emb,ph,img = {w_emb:.2f},{w_ph:.2f},{w_im:.2f}):") # for i,(p,s) in enumerate(result["weighted_topk"], start=1): # print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") # print("\nTop by Rank-sum (lower is better):") # for i,(p,s) in enumerate(result["rank_sum_topk"], start=1): # print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}") # print("\nTop by Harmonic mean:") # for i,(p,s) in enumerate(result["harmonic_topk"], start=1): # print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") # print(f"\nConsensus (in top-{top_k} of ALL metrics): {result['consensus_topk']}") # # Final selection logic # final = None # if len(result["consensus_topk"]) > 0: # # Choose best-weighted among consensus # consensus = result["consensus_topk"] # best = max(consensus, key=lambda p: result["weighted_scores_full"].get(p, 0.0)) # final = best # else: # final = result["weighted_topk"][0][0] if result["weighted_topk"] else None # result["final_selection"] = final # return result # # Use hybrid matching system # # BLOCKS_DIR = r"D:\DEV PATEL\2025\scratch_VLM\scratch_agent\blocks" # per_sprite_matched_indices, per_sprite_scores, paths_list = hybrid_similarity_matching( # sprite_images_bytes, sprite_ids, min_similarity, top_k, method_weights=(0.5, 0.3, 0.2) # ) # # ========================================= # # Copy matched sprite assets + collect data # # ========================================= # project_data = [] # backdrop_data = [] # copied_sprite_folders = set() # copied_backdrop_folders = set() # # Flatten unique matched indices to process copying once per folder # matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) # print("matched_indices------------------>",matched_indices) # import shutil # import json # import os # from pathlib import Path # # normalize base paths once before the loop # sprite_base_p = Path(sprite_base_path).resolve(strict=False) # backdrop_base_p = Path(backdrop_base_path).resolve(strict=False) # project_folder_p = Path(project_folder) # project_folder_p.mkdir(parents=True, exist_ok=True) # copied_sprite_folders = set() # copied_backdrop_folders = set() # def display_like_windows_no_lead(p: Path) -> str: # """ # For human-readable logs only β€” convert Path to a string like: # "app\\blocks\\Backdrops\\Castle 2.sb3" (no leading slash). # """ # s = p.as_posix() # forward-slash string, safe for Path objects # if s.startswith("/"): # s = s[1:] # return s.replace("/", "\\") # def is_subpath(child: Path, parent: Path) -> bool: # """Robust membership test: is child under parent?""" # try: # # use non-strict resolve only if needed, but avoid exceptions # child.relative_to(parent) # return True # except Exception: # return False # # Flatten unique matched indices (if not already) # matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) # print("matched_indices------------------>", matched_indices) # for matched_idx in matched_indices: # # defensive check # if not (0 <= matched_idx < len(paths_list)): # print(f" ⚠ matched_idx {matched_idx} out of range, skipping") # continue # matched_image_path = paths_list[matched_idx] # matched_path_p = Path(matched_image_path).resolve(strict=False) # keep as Path # matched_folder_p = matched_path_p.parent # Path object # matched_filename = matched_path_p.name # # Prepare display-only string (do NOT reassign matched_folder_p) # matched_folder_display = display_like_windows_no_lead(matched_folder_p) # print(f"Processing matched image: {matched_image_path}") # print(f" - Folder: {matched_folder_display}") # print(f" - Sprite path: {display_like_windows_no_lead(sprite_base_p)}") # print(f" - Backdrop path: {display_like_windows_no_lead(backdrop_base_p)}") # print(f" - Filename: {matched_filename}") # # Use a canonical string to store in the copied set (POSIX absolute-ish) # folder_key = matched_folder_p.as_posix() # # ---------- SPRITE ---------- # if is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders: # print(f"Processing SPRITE folder: {matched_folder_display}") # copied_sprite_folders.add(folder_key) # sprite_json_path = matched_folder_p / "sprite.json" # print("sprite_json_path----------------------->", sprite_json_path) # print("copied sprite folder----------------------->", copied_sprite_folders) # if sprite_json_path.exists() and sprite_json_path.is_file(): # try: # with sprite_json_path.open("r", encoding="utf-8") as f: # sprite_info = json.load(f) # project_data.append(sprite_info) # print(f" βœ“ Successfully read sprite.json from {matched_folder_display}") # except Exception as e: # print(f" βœ— Failed to read sprite.json in {matched_folder_display}: {repr(e)}") # else: # print(f" ⚠ No sprite.json in {matched_folder_display}") # # copy non-matching files from the sprite folder (except matched image and sprite.json) # try: # sprite_files = list(matched_folder_p.iterdir()) # except Exception as e: # sprite_files = [] # print(f" βœ— Failed to list files in {matched_folder_display}: {repr(e)}") # print(f" Files in sprite folder: {[p.name for p in sprite_files]}") # for p in sprite_files: # fname = p.name # if fname in (matched_filename, "sprite.json"): # print(f" Skipping {fname} (matched image or sprite.json)") # continue # if p.is_file(): # dst = project_folder_p / fname # try: # shutil.copy2(str(p), str(dst)) # print(f" βœ“ Copied sprite asset: {p} -> {dst}") # except Exception as e: # print(f" βœ— Failed to copy sprite asset {p}: {repr(e)}") # else: # print(f" Skipping {fname} (not a file)") # # ---------- BACKDROP ---------- # if is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders: # print(f"Processing BACKDROP folder: {matched_folder_display}") # copied_backdrop_folders.add(folder_key) # print("backdrop_base_path----------------------->", display_like_windows_no_lead(backdrop_base_p)) # print("copied backdrop folder----------------------->", copied_backdrop_folders) # # copy matched backdrop image # backdrop_src = matched_folder_p / matched_filename # backdrop_dst = project_folder_p / matched_filename # if backdrop_src.exists() and backdrop_src.is_file(): # try: # shutil.copy2(str(backdrop_src), str(backdrop_dst)) # print(f" βœ“ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}") # except Exception as e: # print(f" βœ— Failed to copy matched backdrop image {backdrop_src}: {repr(e)}") # else: # print(f" ⚠ Matched backdrop source not found: {backdrop_src}") # # copy other files from folder (skip project.json and matched image) # try: # backdrop_files = list(matched_folder_p.iterdir()) # except Exception as e: # backdrop_files = [] # print(f" βœ— Failed to list files in {matched_folder_display}: {repr(e)}") # print(f" Files in backdrop folder: {[p.name for p in backdrop_files]}") # for p in backdrop_files: # fname = p.name # if fname in (matched_filename, "project.json"): # print(f" Skipping {fname} (matched image or project.json)") # continue # if p.is_file(): # dst = project_folder_p / fname # try: # shutil.copy2(str(p), str(dst)) # print(f" βœ“ Copied backdrop asset: {p} -> {dst}") # except Exception as e: # print(f" βœ— Failed to copy backdrop asset {p}: {repr(e)}") # else: # print(f" Skipping {fname} (not a file)") # # read project.json to extract Stage/targets # pj = matched_folder_p / "project.json" # if pj.exists() and pj.is_file(): # try: # with pj.open("r", encoding="utf-8") as f: # bd_json = json.load(f) # stage_count = 0 # for tgt in bd_json.get("targets", []): # if tgt.get("isStage"): # backdrop_data.append(tgt) # stage_count += 1 # print(f" βœ“ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)") # except Exception as e: # print(f" βœ— Failed to read project.json in {matched_folder_display}: {repr(e)}") # else: # print(f" ⚠ No project.json in {matched_folder_display}") # print("---") # final_project = { # "targets": [], "monitors": [], "extensions": [], # "meta": { # "semver": "3.0.0", # "vm": "11.3.0", # "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" # } # } # # Add sprite targets (non-stage) # for spr in project_data: # if not spr.get("isStage", False): # final_project["targets"].append(spr) # # then backdrop as the Stage # if backdrop_data: # all_costumes, sounds = [], [] # seen_costumes = set() # for i, bd in enumerate(backdrop_data): # for costume in bd.get("costumes", []): # # Create a unique key for the costume # key = (costume.get("name"), costume.get("assetId")) # if key not in seen_costumes: # seen_costumes.add(key) # all_costumes.append(costume) # if i == 0: # sounds = bd.get("sounds", []) # stage_obj={ # "isStage": True, # "name": "Stage", # "objName": "Stage", # "variables": {}, # "lists": {}, # "broadcasts": {}, # "blocks": {}, # "comments": {}, # "currentCostume": 1 if len(all_costumes) > 1 else 0, # "costumes": all_costumes, # "sounds": sounds, # "volume": 100, # "layerOrder": 0, # "tempo": 60, # "videoTransparency": 50, # "videoState": "on", # "textToSpeechLanguage": None # } # final_project["targets"].insert(0, stage_obj) # else: # logger.warning("⚠️ No backdrop matched. Using default static backdrop.") # default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg" # default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" # default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav" # default_backdrop_sound_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" # try: # shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name)) # logger.info(f"βœ… Default backdrop copied to project: {default_backdrop_name}") # shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name)) # logger.info(f"βœ… Default backdrop sound copied to project: {default_backdrop_sound_name}") # except Exception as e: # logger.error(f"❌ Failed to copy default backdrop: {e}") # stage_obj={ # "isStage": True, # "name": "Stage", # "objName": "Stage", # "variables": {}, # "lists": {}, # "broadcasts": {}, # "blocks": {}, # "comments": {}, # "currentCostume": 0, # "costumes": [ # { # "assetId": default_backdrop_name.split(".")[0], # "name": "defaultBackdrop", # "md5ext": default_backdrop_name, # "dataFormat": "svg", # "rotationCenterX": 240, # "rotationCenterY": 180 # } # ], # "sounds": [ # { # "name": "pop", # "assetId": "83a9787d4cb6f3b7632b4ddfebf74367", # "dataFormat": "wav", # "format": "", # "rate": 48000, # "sampleCount": 1123, # "md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav" # } # ], # "volume": 100, # "layerOrder": 0, # "tempo": 60, # "videoTransparency": 50, # "videoState": "on", # "textToSpeechLanguage": None # } # final_project["targets"].insert(0, stage_obj) # with open(project_json_path, 'w') as f: # json.dump(final_project, f, indent=2) # return project_json_path def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300): # Ensure we are at the start of the stream pdf_stream.seek(0) with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf: tmp_pdf.write(pdf_stream.read()) tmp_pdf_path = tmp_pdf.name # Now use convert_from_path on the temp file images = convert_from_path(tmp_pdf_path, dpi=dpi) return images def delay_for_tpm_node(state: GameState): logger.info("--- Running DelayForTPMNode ---") time.sleep(10) # Adjust the delay as needed logger.info("Delay completed.") return state # Build the LangGraph workflow workflow = StateGraph(GameState) workflow.add_node("pseudo_generator", pseudo_generator_node) workflow.add_node("Node_optimizer", node_optimizer) workflow.add_node("layer_optimizer", layer_order_correction) workflow.add_node("block_builder", overall_block_builder_node_2) workflow.add_node("variable_initializer", variable_adder_node) workflow.add_node("page_processed", processed_page_node) workflow.set_entry_point("page_processed") # Conditional branching from the start def decide_next_step(state: GameState): if state.get("processing", False): return "pseudo_generator" else: return "layer_optimizer"#END workflow.add_conditional_edges( "page_processed", decide_next_step, { "pseudo_generator": "pseudo_generator", "layer_optimizer": "layer_optimizer" } ) # Main chain workflow.add_edge("pseudo_generator", "Node_optimizer") workflow.add_edge("Node_optimizer", "block_builder") workflow.add_edge("block_builder", "variable_initializer") workflow.add_edge("variable_initializer", "page_processed") workflow.add_edge("layer_optimizer", END) app_graph = workflow.compile() # ============== Helper function to Upscale an Image ============== # def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image: """ Upscales a PIL image by a given scale factor. """ try: width, height = image.size new_size = (width * scale, height * scale) upscaled_image = image.resize(new_size, Image.LANCZOS) logger.info(f"βœ… Upscaled image to {new_size}") return upscaled_image except Exception as e: logger.error(f"❌ Error during image upscaling: {str(e)}") return image @log_execution_time def create_sb3_archive(project_folder, project_id): """ Zips the project folder and renames it to an .sb3 file. Args: project_folder (str): The path to the directory containing the project.json and assets. project_id (str): The unique ID for the project, used for naming the .sb3 file. Returns: str: The path to the created .sb3 file, or None if an error occurred. """ print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------") output_filename = GEN_PROJECT_DIR / project_id print(" --------------------------------------- output_filename ---------------------------------------",output_filename) zip_path = None sb3_path = None try: zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder) print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder) logger.info(f"Project folder zipped to: {zip_path}") # 2. Rename the .zip file to .sb3 sb3_path = f"{output_filename}.sb3" os.rename(zip_path, sb3_path) print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path) logger.info(f"Renamed {zip_path} to {sb3_path}") return sb3_path except Exception as e: logger.error(f"Error creating SB3 archive for {project_id}: {e}") # Clean up any partial files if an error occurs if zip_path and os.path.exists(zip_path): os.remove(zip_path) if sb3_path and os.path.exists(sb3_path): os.remove(sb3_path) return sb3_path #{ changes -> pdf_stream replacement of pdf_path # def save_pdf_to_generated_dir(pdf_path: str, project_id: str) -> str: def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> str: """ Copies the PDF at `pdf_stream` into GEN_PROJECT_DIR/project_id/, renaming it to .pdf. Args: pdf_stream (io.BytesIO): Any existing stream to a PDF file. project_id (str): Your unique project identifier. Returns: str: Path to the copied PDF in the generated directory, or None if something went wrong. """ # } try: # 1) Build the destination directory and base filename output_dir = GEN_PROJECT_DIR / project_id output_dir.mkdir(parents=True, exist_ok=True) print(f"\n--------------------------------output_dir {output_dir}") # 2) Define the target PDF path target_pdf = output_dir / f"{project_id}.pdf" print(f"\n--------------------------------target_pdf {target_pdf}") # 3) Copy the PDF # { # shutil.copy2(pdf_path, target_pdf) if isinstance(pdf_stream, io.BytesIO): with open(target_pdf, "wb") as f: f.write(pdf_stream.getbuffer()) else: shutil.copy2(pdf_stream, target_pdf) print(f"Copied PDF from {pdf_stream} β†’ {target_pdf}") logger.info(f"Copied PDF from {pdf_stream} β†’ {target_pdf}") # } return str(target_pdf) except Exception as e: logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True) return None @app.route('/') def index(): return render_template('app_index.html') @app.route("/download_sb3/", methods=["GET"]) def download_sb3(project_id): sb3_path = GEN_PROJECT_DIR / f"{project_id}.sb3" if not sb3_path.exists(): return jsonify({"error": "Scratch project file not found"}), 404 return send_file( sb3_path, as_attachment=True, download_name=sb3_path.name ) @app.route("/download_pdf/", methods=["GET"]) def download_pdf(project_id): pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf" if not pdf_path.exists(): return jsonify({"error": "Scratch project file not found"}), 404 return send_file( pdf_path, as_attachment=True, download_name=pdf_path.name ) @app.route("/download_sound/", methods=["GET"]) def download_sound(sound_id): sound_path = SOUND_DIR / f"{sound_id}.wav" if not sound_path.exists(): return jsonify({"error": "Scratch project sound file not found"}), 404 return send_file( sound_path, as_attachment=True, download_name=sound_path.name ) # API endpoint @app.route('/process_pdf', methods=['POST']) def process_pdf(): try: logger.info("Received request to process PDF.") if 'pdf_file' not in request.files: logger.warning("No PDF file found in request.") return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400 pdf_file = request.files['pdf_file'] if pdf_file.filename == '': return jsonify({"error": "Empty filename"}), 400 # ================================================= # # Generate Random UUID for project folder name # # ================================================= # project_id = str(uuid.uuid4()).replace('-', '') # project_folder = os.path.join("outputs", f"{project_id}") project_folder = OUTPUT_DIR / project_id pdf_bytes = pdf_file.read() pdf_stream = io.BytesIO(pdf_bytes) logger.info(f"Saved uploaded PDF to: {pdf_stream}") # pdf= save_pdf_to_generated_dir(saved_pdf_path, project_id) start_time = time.time() pdf= save_pdf_to_generated_dir(pdf_stream, project_id) logger.info(f"Saved uploaded PDF to: {pdf_file}: {pdf}") print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream) total_time = time.time() - start_time print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n") start_time = time.time() # output_path = extract_images_from_pdf(pdf_stream) output_path = extract_images_from_pdf(pdf_stream,project_folder) print(" --------------------------------------- zip_path_str ---------------------------------------", output_path) total_time = time.time() - start_time print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n") start_time = time.time() project_output = similarity_matching(output_path, project_folder) logger.info("Received request to process PDF.") total_time = time.time() - start_time print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n") with open(project_output, 'r') as f: project_skeleton = json.load(f) if isinstance(pdf_stream, io.BytesIO): images = convert_pdf_stream_to_images(pdf_stream, dpi=300) else: images = convert_from_path(pdf_stream, dpi=300) #updating logic here [Dev Patel] initial_state_dict = { "project_json": project_skeleton, "description": "The pseudo code for the script", "project_id": project_id, "project_image": images, "action_plan": {}, "pseudo_code": {}, "temporary_node": {}, "processing":True, "page_count": 0, "temp_pseudo_code":[], } final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200}) final_project_json = final_state_dict['project_json'] # Access as dict #final_project_json = project_skeleton # Save the *final* filled project JSON, overwriting the skeleton with open(project_output, "w") as f: json.dump(final_project_json, f, indent=2) logger.info(f"Final project JSON saved to {project_output}") # --- Call the new function to create the .sb3 file --- sb3_file_path = create_sb3_archive(project_folder, project_id) if sb3_file_path: logger.info(f"Successfully created SB3 file: {sb3_file_path}") # Instead of returning the local path, return a URL to the download endpoint download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}" pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}" print(f"DOWNLOAD_URL: {download_url}") print(f"PDF_URL: {pdf_url}") # return jsonify({"message": "Procesed PDF and Game sb3 generated successfully", "project_id": project_id, "download_url": download_url}) return jsonify({ "message": "βœ… PDF processed successfully", "output_json": "output_path", "sprites": "result", "project_output_json": "project_output", "test_url": download_url }) else: return jsonify({ "message": "❌ Scanned images are not clear please retry!", "isError": True, "output_json": "output_path", "sprites": "result", "project_output_json": "project_output", "test_url": download_url }), 500 except Exception as e: logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True) return jsonify({ "message": "❌ Scanned images are not clear please retry!", "isError": True, "output_json": "output_path", "sprites": "result", "project_output_json": "project_output", "test_url": "download_url" }), 500 if __name__ == '__main__': # os.makedirs("outputs", exist_ok=True) #== commented by P app.run(host='0.0.0.0', port=7860, debug=True)