# OpenTrack - Humanoid Motion Tracking Demo

This notebook demonstrates OpenTrack, an open-source humanoid motion tracking system using MuJoCo.

**Note**: Training can be resource-intensive. We'll use debug mode for quick testing.

## 1. Setup Environment

In [None]:
import os
import subprocess
import time
import glob
from pathlib import Path
from IPython.display import Video, display, HTML
import threading
import queue

# Define project directory structure
PROJECT_BASE = Path("/data/workspaces/opentrack")
DATASETS_DIR = PROJECT_BASE / "datasets"
MODELS_DIR = PROJECT_BASE / "models"
VIDEOS_DIR = PROJECT_BASE / "videos"
REPO_DIR = PROJECT_BASE / "OpenTrack"

# Create directories
for dir_path in [PROJECT_BASE, DATASETS_DIR, MODELS_DIR, VIDEOS_DIR]:
 dir_path.mkdir(parents=True, exist_ok=True)
 print(f"✓ {dir_path}")

print("\n✓ Environment setup complete!")

## 2. Clone OpenTrack Repository

In [None]:
# Clone the repository if not already cloned
if not REPO_DIR.exists():
 print(f"Cloning OpenTrack to {REPO_DIR}...")
 subprocess.run(
 ['git', 'clone', 'https://github.com/GalaxyGeneralRobotics/OpenTrack.git', str(REPO_DIR)],
 check=True
 )
 print("✓ Repository cloned successfully")
else:
 print(f"✓ Repository already exists at {REPO_DIR}")

# Change to repository directory
os.chdir(REPO_DIR)
print(f"\n✓ Working directory: {os.getcwd()}")

## 3. Install Dependencies

In [None]:
# Install PyTorch (CPU version for compatibility)
!pip install -q torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cpu

# Install OpenTrack requirements
!pip install -q -r requirements.txt

# Install additional packages for video handling
!pip install -q imageio imageio-ffmpeg

print("✓ All dependencies installed")

## 4. Download Motion Capture Data

In [None]:
from huggingface_hub import snapshot_download

# Define mocap directory in our datasets folder
mocap_dir = DATASETS_DIR / "lafan1" / "UnitreeG1"
mocap_dir.mkdir(parents=True, exist_ok=True)

repo_id = "robfiras/loco-mujoco-datasets"

print("Downloading all mocap data from Lafan1/mocap/UnitreeG1...")
print(f"Target directory: {mocap_dir}")
print("This will download all .npz files concurrently.\n")

try:
 # Use snapshot_download with allow_patterns to download only the files we need
 snapshot_path = snapshot_download(
 repo_id=repo_id,
 repo_type="dataset",
 allow_patterns="Lafan1/mocap/UnitreeG1/*.npz",
 local_dir=str(DATASETS_DIR),
 local_dir_use_symlinks=False
 )
 
 print(f"\n✓ Download complete! Files saved to: {snapshot_path}")
 
 # Verify files
 npz_files = list(mocap_dir.glob("*.npz"))
 print(f"✓ Found {len(npz_files)} .npz files in {mocap_dir}")
 
 if npz_files:
 print("\nSample files:")
 for f in sorted(npz_files)[:10]: # Show first 10 files
 print(f" - {f.name}")
 if len(npz_files) > 10:
 print(f" ... and {len(npz_files) - 10} more files")
 
 # Create symlink from OpenTrack's expected data directory to our datasets
 opentrack_data_dir = REPO_DIR / "data" / "mocap"
 opentrack_data_dir.parent.mkdir(parents=True, exist_ok=True)
 
 # Remove old symlink/directory if it exists
 if opentrack_data_dir.exists() or opentrack_data_dir.is_symlink():
 if opentrack_data_dir.is_symlink():
 opentrack_data_dir.unlink()
 else:
 import shutil
 shutil.rmtree(opentrack_data_dir)
 
 # Create symlink
 opentrack_data_dir.symlink_to(DATASETS_DIR, target_is_directory=True)
 print(f"\n✓ Created symlink: {opentrack_data_dir} -> {DATASETS_DIR}")
 
except Exception as e:
 print(f"⚠ Error downloading mocap data: {e}")
 print("\nYou may need to download manually from:")
 print("https://huggingface.co/datasets/robfiras/loco-mujoco-datasets/tree/main/Lafan1/mocap/UnitreeG1")
 print("\nOr check if you need to authenticate:")
 print(" huggingface-cli login")

## 5. Helper Functions for Background Process Management

In [None]:
def run_command_with_output(cmd, description="Running command"):
 """
 Run a command and display output in real-time
 """
 print(f"\n{'='*60}")
 print(f"{description}")
 print(f"Command: {' '.join(cmd)}")
 print(f"{'='*60}\n")
 
 process = subprocess.Popen(
 cmd,
 stdout=subprocess.PIPE,
 stderr=subprocess.STDOUT,
 text=True,
 bufsize=1,
 universal_newlines=True
 )
 
 # Read output line by line
 for line in process.stdout:
 print(line, end='')
 
 process.wait()
 
 if process.returncode == 0:
 print(f"\n✓ {description} completed successfully")
 else:
 print(f"\n⚠ {description} exited with code {process.returncode}")
 
 return process.returncode


def find_latest_experiment(exp_name_pattern):
 """
 Find the latest experiment folder matching the pattern
 """
 # Check both MODELS_DIR and REPO_DIR/logs
 search_dirs = [MODELS_DIR, REPO_DIR / "logs"]
 
 all_experiments = []
 for logs_dir in search_dirs:
 if not logs_dir.exists():
 continue
 
 # Find all matching experiments
 experiments = [
 d for d in logs_dir.iterdir() 
 if d.is_dir() and exp_name_pattern in d.name
 ]
 all_experiments.extend(experiments)
 
 if not all_experiments:
 return None
 
 # Sort by modification time and return latest
 latest = sorted(all_experiments, key=lambda x: x.stat().st_mtime, reverse=True)[0]
 return latest.name


def find_generated_videos(output_dir=None):
 """
 Find all generated video files
 """
 if output_dir is None:
 output_dir = VIDEOS_DIR
 else:
 output_dir = Path(output_dir)
 
 videos = []
 
 if output_dir.exists():
 videos = list(output_dir.glob("*.mp4")) + list(output_dir.glob("*.gif"))
 
 if not videos:
 # Try alternative locations
 alternative_dirs = [REPO_DIR, REPO_DIR / "logs", MODELS_DIR]
 for alt_dir in alternative_dirs:
 if alt_dir.exists():
 found = list(alt_dir.glob("**/*.mp4")) + list(alt_dir.glob("**/*.gif"))
 videos.extend(found)
 
 return sorted(videos, key=lambda x: x.stat().st_mtime, reverse=True)


def setup_model_output_symlink():
 """
 Create symlink from OpenTrack's logs directory to our models directory
 """
 opentrack_logs_dir = REPO_DIR / "logs"
 
 # Remove old symlink/directory if it exists
 if opentrack_logs_dir.exists() or opentrack_logs_dir.is_symlink():
 if opentrack_logs_dir.is_symlink():
 opentrack_logs_dir.unlink()
 else:
 import shutil
 # Move existing logs to MODELS_DIR first
 if opentrack_logs_dir.is_dir():
 for item in opentrack_logs_dir.iterdir():
 shutil.move(str(item), str(MODELS_DIR))
 shutil.rmtree(opentrack_logs_dir)
 
 # Create symlink
 opentrack_logs_dir.symlink_to(MODELS_DIR, target_is_directory=True)
 print(f"✓ Created symlink: {opentrack_logs_dir} -> {MODELS_DIR}")

print("✓ Helper functions loaded")

## 6. Quick Training (Debug Mode)

We'll run a quick training session in debug mode. This won't produce a well-trained model but will verify everything works.

In [None]:
# Setup symlink for model outputs
setup_model_output_symlink()

print(f"\nModels will be saved to: {MODELS_DIR}\n")

# Run quick training
cmd = [
 'python', 'train_policy.py',
 '--exp_name', 'debug',
 '--terrain_type', 'flat_terrain'
]

return_code = run_command_with_output(cmd, "Training OpenTrack (debug mode)")

# Find the experiment folder
exp_folder = find_latest_experiment('debug')
if exp_folder:
 print(f"\n✓ Training completed! Experiment: {exp_folder}")
 print(f"✓ Model saved in: {MODELS_DIR / exp_folder}")
else:
 print("\n⚠ Could not find experiment folder")

## 7. Convert Checkpoint (Brax → PyTorch)

In [None]:
# Get the latest experiment name
exp_folder = find_latest_experiment('debug')

if exp_folder:
 print(f"Converting checkpoint for: {exp_folder}")
 
 cmd = [
 'python', 'brax2torch.py',
 '--exp_name', exp_folder
 ]
 
 return_code = run_command_with_output(cmd, "Converting Brax checkpoint to PyTorch")
else:
 print("⚠ No experiment found. Run training first.")

## 8. Generate Videos (Headless Rendering)

This will run the policy and generate videos using MuJoCo's headless renderer.

In [None]:
# Get the latest experiment name
exp_folder = find_latest_experiment('debug')

if exp_folder:
 print(f"Generating videos for: {exp_folder}")
 print(f"Videos will be saved to: {VIDEOS_DIR}\n")
 
 # Use --use_renderer for headless video generation (NOT --use_viewer)
 cmd = [
 'python', 'play_policy.py',
 '--exp_name', exp_folder,
 '--use_renderer',
 # '--play_ref_motion', # Uncomment to also show reference motion
 ]
 
 return_code = run_command_with_output(cmd, "Generating videos with MuJoCo renderer")
 
 print("\n" + "="*60)
 print("Searching for generated videos...")
 print("="*60)
 
 # Wait a moment for files to be written
 time.sleep(2)
 
 # Find generated videos
 videos = find_generated_videos()
 
 # Move videos to VIDEOS_DIR if they're not already there
 if videos:
 import shutil
 moved_videos = []
 for video in videos:
 if not video.is_relative_to(VIDEOS_DIR):
 dest = VIDEOS_DIR / video.name
 shutil.copy2(video, dest)
 moved_videos.append(dest)
 print(f"Moved: {video.name} -> {dest}")
 else:
 moved_videos.append(video)
 videos = moved_videos
 
 if videos:
 print(f"\n✓ Found {len(videos)} video(s) in {VIDEOS_DIR}:")
 for video in videos:
 print(f" - {video}")
 else:
 print("\n⚠ No videos found. Checking alternative locations...")
 # Search more broadly
 all_videos = list(REPO_DIR.rglob("*.mp4")) + list(REPO_DIR.rglob("*.gif"))
 if all_videos:
 print(f"Found {len(all_videos)} video(s) in project:")
 for video in all_videos[:10]: # Show first 10
 print(f" - {video}")
else:
 print("⚠ No experiment found. Run training first.")

## 9. Display Generated Videos

In [None]:
# Find and display all generated videos from our videos directory
videos = find_generated_videos()

if not videos:
 # Try alternative search in the whole project
 videos = list(REPO_DIR.rglob("*.mp4")) + list(REPO_DIR.rglob("*.gif"))

if videos:
 print(f"Displaying {len(videos)} video(s) from {VIDEOS_DIR}:\n")
 
 for i, video_path in enumerate(videos[:5]): # Display first 5 videos
 print(f"\n{'='*60}")
 print(f"Video {i+1}: {video_path.name}")
 print(f"Location: {video_path}")
 print(f"{'='*60}")
 
 try:
 if video_path.suffix == '.mp4':
 display(Video(str(video_path), width=800, embed=True))
 elif video_path.suffix == '.gif':
 display(HTML(f''))
 except Exception as e:
 print(f"⚠ Error displaying video: {e}")
 print(f"You can download it from: {video_path}")
else:
 print("⚠ No videos found.")
 print("\nPossible reasons:")
 print("1. Training didn't complete successfully")
 print("2. Checkpoint conversion failed")
 print("3. Video generation failed")
 print("\nCheck the output of previous cells for errors.")

## 10. Optional: Generate Rough Terrain

If you want to test on rough terrain, run this cell first to generate terrain data.

In [None]:
# Generate rough terrain using Perlin noise
cmd = ['python', 'generate_terrain.py']
return_code = run_command_with_output(cmd, "Generating rough terrain")

print("\n✓ Terrain generated! You can now train with --terrain_type rough_terrain")

## 11. Optional: Full Training (Takes Much Longer)

⚠️ Warning: This will take significant time and resources. Only run if you have GPU access and time.

In [None]:
# Full training (uncomment to run)
# cmd = [
# 'python', 'train_policy.py',
# '--exp_name', 'flat_terrain_full',
# '--terrain_type', 'flat_terrain'
# ]
# 
# return_code = run_command_with_output(cmd, "Full training on flat terrain")
# 
# # Then convert and play
# exp_folder = find_latest_experiment('flat_terrain_full')
# if exp_folder:
# !python brax2torch.py --exp_name {exp_folder}
# !python play_policy.py --exp_name {exp_folder} --use_renderer

print("Full training cell ready (currently commented out)")

## Summary

This notebook demonstrates:
1. ✅ Setting up OpenTrack in a Jupyter environment
2. ✅ Running training (debug mode for quick testing)
3. ✅ Converting Brax checkpoints to PyTorch
4. ✅ Generating videos using headless MuJoCo renderer
5. ✅ Displaying videos in the notebook

**Project Structure:**
```
/data/workspaces/opentrack/
├── datasets/ # Motion capture data (.npz files)
│ └── lafan1/
│ └── UnitreeG1/
├── models/ # Trained models and checkpoints
│ └── _/
├── videos/ # Generated videos (.mp4, .gif)
└── OpenTrack/ # Cloned repository
 ├── data/mocap -> ../datasets/ (symlink)
 └── logs -> ../models/ (symlink)
```

**Next Steps:**
- All mocap data is in `/data/workspaces/opentrack/datasets/`
- Run full training with GPU support
- Test on rough terrain
- Experiment with reference motion playback
- Trained models are saved in `/data/workspaces/opentrack/models/`
- Generated videos are in `/data/workspaces/opentrack/videos/`

**Troubleshooting:**
- If videos aren't generated, check that `--use_renderer` flag is used (not `--use_viewer`)
- Ensure MuJoCo can run headless (may need `xvfb` on some systems)
- Check `/data/workspaces/opentrack/models/` directory for experiment outputs
- All data persists in `/data/workspaces/opentrack/` across notebook sessions