Spaces:
Running
Running
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from matplotlib.patches import Rectangle | |
| from typing import List, Dict, Literal | |
| def visualize_pipeline_parallelism( | |
| schedule: Dict[int, List[Dict]], | |
| schedule_type: Literal["simple", "1f1b"] = "1f1b", | |
| output_file: str = "pipeline_visualization.png", | |
| ): | |
| """ | |
| Visualize pipeline parallelism scheduling. | |
| Args: | |
| schedule: Dictionary mapping device IDs to lists of tasks. | |
| Each task is a dictionary with keys: | |
| - 'type': 'forward' or 'backward' | |
| - 'batch': batch number | |
| - 'start_time': start time of the task | |
| - 'duration': duration of the task | |
| schedule_type: Type of scheduling algorithm used ("simple" or "1f1b") | |
| output_file: Path to save the visualization | |
| """ | |
| # Colors for forward and backward passes | |
| forward_color = "royalblue" | |
| backward_color = "lightgreen" | |
| empty_color = "lightgray" | |
| # Find the number of stages (devices) | |
| num_stages = len(schedule) | |
| # Find the maximum time in the schedule | |
| max_time = 0 | |
| for device in schedule: | |
| for task in schedule[device]: | |
| end_time = task["start_time"] + task["duration"] | |
| if end_time > max_time: | |
| max_time = end_time | |
| # Create figure and axis | |
| fig, ax = plt.subplots(figsize=(15, 5)) | |
| # Plot the schedule | |
| for device_idx, device in enumerate(schedule): | |
| for task in schedule[device]: | |
| color = forward_color if task["type"] == "forward" else backward_color | |
| rect = Rectangle( | |
| (task["start_time"], device_idx), | |
| task["duration"], | |
| 0.8, | |
| edgecolor="black", | |
| facecolor=color, | |
| alpha=0.8, | |
| ) | |
| ax.add_patch(rect) | |
| # Add text (batch number) | |
| ax.text( | |
| task["start_time"] + task["duration"] / 2, | |
| device_idx + 0.4, | |
| str(task["batch"]), | |
| ha="center", | |
| va="center", | |
| fontsize=10, | |
| fontweight="bold", | |
| color="white" if task["type"] == "forward" else "black", | |
| ) | |
| # Set axis limits and labels | |
| ax.set_xlim(0, max_time * 1.05) | |
| ax.set_ylim(-0.2, num_stages + 0.2) | |
| ax.set_yticks(np.arange(num_stages) + 0.4) | |
| ax.set_yticklabels([f"Device {i+1}" for i in range(num_stages)]) | |
| ax.set_xlabel("Time") | |
| ax.set_title(f"Pipeline Parallelism Schedule ({schedule_type})") | |
| # Add a legend | |
| forward_patch = Rectangle((0, 0), 1, 1, facecolor=forward_color) | |
| backward_patch = Rectangle((0, 0), 1, 1, facecolor=backward_color) | |
| ax.legend( | |
| [forward_patch, backward_patch], | |
| ["Forward Pass", "Backward Pass"], | |
| loc="upper center", | |
| bbox_to_anchor=(0.5, -0.15), | |
| ncol=2, | |
| ) | |
| # Add grid | |
| ax.grid(True, linestyle="--", alpha=0.7) | |
| # Save the figure | |
| plt.tight_layout() | |
| plt.savefig(output_file, dpi=300, bbox_inches="tight") | |
| plt.close() | |
| print(f"Visualization saved to {output_file}") | |