Update deployer/gradio_generator.py
Browse files- deployer/gradio_generator.py +51 -44
deployer/gradio_generator.py
CHANGED
|
@@ -1,64 +1,71 @@
|
|
| 1 |
import os
|
| 2 |
-
import shutil
|
| 3 |
import tempfile
|
|
|
|
| 4 |
import zipfile
|
| 5 |
-
import asyncio
|
| 6 |
-
|
| 7 |
-
from core_creator.voice_to_app import VoiceToAppCreator
|
| 8 |
-
from deployer.simulator_interface import VirtualRobot
|
| 9 |
-
|
| 10 |
-
async def _run_pipeline(idea: str) -> dict:
|
| 11 |
-
"""
|
| 12 |
-
Asynchronously run the voice-to-app pipeline.
|
| 13 |
-
Returns the assets dict from VoiceToAppCreator.run_pipeline().
|
| 14 |
-
"""
|
| 15 |
-
creator = VoiceToAppCreator(idea)
|
| 16 |
-
return creator.run_pipeline()
|
| 17 |
|
| 18 |
def deploy_callback(idea: str):
|
| 19 |
"""
|
| 20 |
-
|
| 21 |
-
into a ZIP, and returns
|
|
|
|
| 22 |
"""
|
| 23 |
try:
|
| 24 |
-
#
|
| 25 |
-
|
| 26 |
-
|
| 27 |
-
except RuntimeError:
|
| 28 |
-
loop = None
|
| 29 |
-
|
| 30 |
-
if loop and loop.is_running():
|
| 31 |
-
assets = asyncio.run_coroutine_threadsafe(_run_pipeline(idea), loop).result()
|
| 32 |
-
else:
|
| 33 |
-
assets = asyncio.new_event_loop().run_until_complete(_run_pipeline(idea))
|
| 34 |
|
| 35 |
-
#
|
| 36 |
-
|
| 37 |
-
|
| 38 |
|
| 39 |
-
|
| 40 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 41 |
|
| 42 |
-
#
|
| 43 |
-
|
| 44 |
-
|
| 45 |
-
|
| 46 |
|
| 47 |
-
|
|
|
|
|
|
|
| 48 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
| 49 |
-
for root, _, files in os.walk(
|
| 50 |
for fname in files:
|
| 51 |
-
|
| 52 |
-
|
| 53 |
-
zf.write(
|
| 54 |
|
| 55 |
-
return f"β
Generated app: {
|
| 56 |
|
| 57 |
except Exception as e:
|
| 58 |
-
return f"β
|
|
|
|
| 59 |
|
| 60 |
-
def robot_behavior(
|
| 61 |
"""
|
| 62 |
-
|
|
|
|
| 63 |
"""
|
| 64 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import os
|
|
|
|
| 2 |
import tempfile
|
| 3 |
+
import shutil
|
| 4 |
import zipfile
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
|
| 6 |
def deploy_callback(idea: str):
|
| 7 |
"""
|
| 8 |
+
Generates a minimal robot app based on the user's idea, packages it
|
| 9 |
+
into a ZIP in the system temp directory, and returns a status message
|
| 10 |
+
along with the path to the ZIP file.
|
| 11 |
"""
|
| 12 |
try:
|
| 13 |
+
# Sanitize and build a unique directory name
|
| 14 |
+
title = idea.strip().lower().replace(" ", "_") or "app"
|
| 15 |
+
base_dir = os.path.join(tempfile.gettempdir(), f"robosage_{title}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 16 |
|
| 17 |
+
# Clean out any old generation
|
| 18 |
+
shutil.rmtree(base_dir, ignore_errors=True)
|
| 19 |
+
os.makedirs(base_dir, exist_ok=True)
|
| 20 |
|
| 21 |
+
# 1) Create main.py with a simple on_command stub
|
| 22 |
+
main_py = os.path.join(base_dir, "main.py")
|
| 23 |
+
with open(main_py, "w") as f:
|
| 24 |
+
f.write(f"""def on_command(cmd):
|
| 25 |
+
cmd_lower = cmd.strip().lower()
|
| 26 |
+
if "hello" in cmd_lower:
|
| 27 |
+
return "π€ Hello there!"
|
| 28 |
+
else:
|
| 29 |
+
return f"π€ I don't understand: '{{cmd}}'"
|
| 30 |
+
""")
|
| 31 |
|
| 32 |
+
# 2) Create a README
|
| 33 |
+
readme = os.path.join(base_dir, "README.md")
|
| 34 |
+
with open(readme, "w") as f:
|
| 35 |
+
f.write(f"# RoboSage App\nGenerated for idea: {idea}\n\nRun `main.on_command(...)` to simulate.")
|
| 36 |
|
| 37 |
+
# 3) Zip the directory
|
| 38 |
+
zip_name = f"robosage_{title}.zip"
|
| 39 |
+
zip_path = os.path.join(tempfile.gettempdir(), zip_name)
|
| 40 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
| 41 |
+
for root, _, files in os.walk(base_dir):
|
| 42 |
for fname in files:
|
| 43 |
+
fullpath = os.path.join(root, fname)
|
| 44 |
+
arcname = os.path.relpath(fullpath, base_dir)
|
| 45 |
+
zf.write(fullpath, arcname)
|
| 46 |
|
| 47 |
+
return f"β
Generated app: {idea}", zip_path
|
| 48 |
|
| 49 |
except Exception as e:
|
| 50 |
+
return f"β Error: {e}", None
|
| 51 |
+
|
| 52 |
|
| 53 |
+
def robot_behavior(cmd: str) -> str:
|
| 54 |
"""
|
| 55 |
+
Simulate a robot response. You can swap this out for your
|
| 56 |
+
core_creator or simulator_interface logic if desired.
|
| 57 |
"""
|
| 58 |
+
# Very basic echo-like behavior
|
| 59 |
+
lower = cmd.strip().lower()
|
| 60 |
+
if "hello" in lower:
|
| 61 |
+
return "π€ *waves* Hello there!"
|
| 62 |
+
elif lower:
|
| 63 |
+
return f"π€ I don't know how to '{cmd}'."
|
| 64 |
+
else:
|
| 65 |
+
return "β Please say something."
|
| 66 |
+
|
| 67 |
+
|
| 68 |
+
# Optional entrypoint if you want to import a launcher function
|
| 69 |
+
def launch_gradio_app():
|
| 70 |
+
from app import main
|
| 71 |
+
main()
|