|
|
import json
|
|
|
import re
|
|
|
from typing import Any, Dict
|
|
|
import logging
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
hat_block_data = [
|
|
|
{"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
|
|
|
{"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
|
|
|
{"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
|
|
|
]
|
|
|
boolean_block_data = [
|
|
|
{"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
|
|
|
{"opcode": "sensing_touchingobject", "text": "<touching [object]?>"},
|
|
|
{"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
|
|
|
]
|
|
|
c_block_data = [
|
|
|
{"opcode": "control_forever", "text": "forever"},
|
|
|
{"opcode": "control_if", "text": "if < > then"},
|
|
|
{"opcode": "control_repeat", "text": "repeat ( )"},
|
|
|
]
|
|
|
cap_block_data = [
|
|
|
{"opcode": "control_stop", "text": "stop [all]"},
|
|
|
]
|
|
|
reporter_block_data = [
|
|
|
{"opcode": "motion_xposition", "text": "(x position)"},
|
|
|
{"opcode": "motion_yposition", "text": "(y position)"},
|
|
|
{"opcode": "data_variable", "text": "(variable)"},
|
|
|
{"opcode": "sensing_answer", "text": "(answer)"},
|
|
|
]
|
|
|
stack_block_data = [
|
|
|
{"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
|
|
|
{"opcode": "motion_changeyby", "text": "change y by ( )"},
|
|
|
{"opcode": "motion_setx", "text": "set x to ( )"},
|
|
|
{"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
|
|
|
{"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
|
|
|
{"opcode": "looks_hide", "text": "hide"},
|
|
|
{"opcode": "looks_show", "text": "show"},
|
|
|
{"opcode": "event_broadcast", "text": "broadcast [message]"},
|
|
|
]
|
|
|
|
|
|
|
|
|
all_opcodes_list = []
|
|
|
for category_data in [
|
|
|
hat_block_data,
|
|
|
boolean_block_data,
|
|
|
c_block_data,
|
|
|
cap_block_data,
|
|
|
reporter_block_data,
|
|
|
stack_block_data,
|
|
|
]:
|
|
|
all_opcodes_list.extend(category_data)
|
|
|
|
|
|
|
|
|
def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
|
|
|
"""Extracts JSON from an LLM response string."""
|
|
|
try:
|
|
|
json_match = re.search(r"```json\n(.*)\n```", response_text, re.DOTALL)
|
|
|
if json_match:
|
|
|
return json.loads(json_match.group(1))
|
|
|
return json.loads(response_text)
|
|
|
except json.JSONDecodeError as e:
|
|
|
logger.error(f"Failed to decode JSON: {e} from response: {response_text}")
|
|
|
raise
|
|
|
|
|
|
|
|
|
def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
"""
|
|
|
For each plan in state["action_plan"]["action_overall_flow"], calls the LLM agent
|
|
|
to analyze the `logic` string and return a list of {opcode, count} for each category.
|
|
|
"""
|
|
|
logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")
|
|
|
game_description = state.get("description", "No game description provided.")
|
|
|
sprite_name = {target["name"]: target["name"] for target in state["project_json"]["targets"]}
|
|
|
|
|
|
action_flow = state.get("action_plan", {}).get("action_overall_flow", {})
|
|
|
if not action_flow:
|
|
|
logger.warning("No action_overall_flow found; skipping.")
|
|
|
return state
|
|
|
|
|
|
|
|
|
hat_description = "Blocks that start a script when an event happens."
|
|
|
hat_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in hat_block_data])
|
|
|
|
|
|
boolean_description = "Blocks that report a true or false value and fit into hexagonal inputs."
|
|
|
boolean_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in boolean_block_data])
|
|
|
|
|
|
c_description = "Blocks that run scripts inside them repeatedly or conditionally."
|
|
|
c_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in c_block_data])
|
|
|
|
|
|
cap_description = "Blocks that end a script."
|
|
|
cap_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in cap_block_data])
|
|
|
|
|
|
reporter_description = "Blocks that report a value (number or string) and fit into rounded inputs."
|
|
|
reporter_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in reporter_block_data])
|
|
|
|
|
|
stack_description = "Blocks that perform a main action in a script."
|
|
|
stack_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in stack_block_data])
|
|
|
|
|
|
refined_flow: Dict[str, Any] = {}
|
|
|
for sprite, sprite_data in action_flow.items():
|
|
|
refined_plans = []
|
|
|
for plan in sprite_data.get("plans", []):
|
|
|
logic = plan.get("logic", "")
|
|
|
event = plan.get("event", "")
|
|
|
|
|
|
|
|
|
|
|
|
opcode_counts = {
|
|
|
"motion": [],
|
|
|
"control": [],
|
|
|
"operator": [],
|
|
|
"sensing": [],
|
|
|
"looks": [],
|
|
|
"sounds": [],
|
|
|
"events": [],
|
|
|
"data": [],
|
|
|
}
|
|
|
|
|
|
|
|
|
temp_opcode_counts = {}
|
|
|
|
|
|
|
|
|
if event:
|
|
|
event_opcode = event.replace('v', '').strip()
|
|
|
temp_opcode_counts[event_opcode] = temp_opcode_counts.get(event_opcode, 0) + 1
|
|
|
|
|
|
|
|
|
|
|
|
for block_info in all_opcodes_list:
|
|
|
opcode = block_info["opcode"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
block_text_escaped = re.escape(block_info["text"])
|
|
|
|
|
|
|
|
|
block_text_pattern = block_text_escaped.replace(r"\[key\]", r".*?").replace(r"\[message\]", r".*?").replace(r"\[object\]", r".*?").replace(r"\( \)", r".*?")
|
|
|
block_text_pattern = block_text_pattern.replace(r"\[variable\]", r".*?")
|
|
|
|
|
|
|
|
|
if opcode == "control_if":
|
|
|
if_regex = r"if <.+?> then"
|
|
|
if_else_regex = r"if <.+?> then\n.*else"
|
|
|
|
|
|
if re.search(if_else_regex, logic, re.DOTALL):
|
|
|
temp_opcode_counts["control_if_else"] = temp_opcode_counts.get("control_if_else", 0) + 1
|
|
|
elif re.search(if_regex, logic, re.DOTALL):
|
|
|
temp_opcode_counts["control_if"] = temp_opcode_counts.get("control_if", 0) + 1
|
|
|
continue
|
|
|
|
|
|
if opcode == "control_forever" and "forever" in logic:
|
|
|
temp_opcode_counts[opcode] = temp_opcode_counts.get(opcode, 0) + 1
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
count = len(re.findall(block_text_pattern, logic))
|
|
|
if count > 0:
|
|
|
temp_opcode_counts[opcode] = temp_opcode_counts.get(opcode, 0) + count
|
|
|
|
|
|
|
|
|
def add_to_category(category_list, opcode_name, count):
|
|
|
if count > 0:
|
|
|
category_list.append({"opcode": opcode_name, "count": count})
|
|
|
|
|
|
for opcode, count in temp_opcode_counts.items():
|
|
|
if opcode.startswith("motion_"):
|
|
|
add_to_category(opcode_counts["motion"], opcode, count)
|
|
|
elif opcode.startswith("control_"):
|
|
|
add_to_category(opcode_counts["control"], opcode, count)
|
|
|
elif opcode.startswith("operator_"):
|
|
|
add_to_category(opcode_counts["operator"], opcode, count)
|
|
|
elif opcode.startswith("sensing_"):
|
|
|
add_to_category(opcode_counts["sensing"], opcode, count)
|
|
|
elif opcode.startswith("looks_"):
|
|
|
add_to_category(opcode_counts["looks"], opcode, count)
|
|
|
elif opcode.startswith("sounds_"):
|
|
|
add_to_category(opcode_counts["sounds"], opcode, count)
|
|
|
elif opcode.startswith("event_"):
|
|
|
add_to_category(opcode_counts["events"], opcode, count)
|
|
|
elif opcode.startswith("data_"):
|
|
|
add_to_category(opcode_counts["data"], opcode, count)
|
|
|
|
|
|
|
|
|
plan["opcode_counts"] = opcode_counts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
plan["motion"] = []
|
|
|
plan["control"] = []
|
|
|
plan["operator"] = []
|
|
|
plan["sensing"] = []
|
|
|
plan["looks"] = []
|
|
|
plan["sounds"] = []
|
|
|
plan["events"] = []
|
|
|
plan["data"] = []
|
|
|
|
|
|
|
|
|
for category, opcodes_list in opcode_counts.items():
|
|
|
for item in opcodes_list:
|
|
|
|
|
|
plan[category].extend([item['opcode']] * item['count'])
|
|
|
|
|
|
|
|
|
refined_plans.append(plan)
|
|
|
|
|
|
refined_flow[sprite] = {
|
|
|
"description": sprite_data.get("description", ""),
|
|
|
"plans": refined_plans
|
|
|
}
|
|
|
|
|
|
state["temporary_node"] = refined_flow
|
|
|
print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
|
|
|
logger.info("=== OPCODE COUTER LOGIC completed ===")
|
|
|
return state |