"""Run pipeline-parallelism schedule simulations selected via Hydra config.

``cfg.strategy`` picks a strategy. Each strategy builds a ``ScheduleConfig``
from the shared config fields plus its own strategy-specific options. It then
generates the schedule with the matching generator, executes it, and hands it
to ``visualize_pipeline_parallelism_dash`` on ``cfg.visualization_port``.
"""

from typing import Any, Callable, Dict

import hydra
from omegaconf import DictConfig, OmegaConf

from src.execution_model import ScheduleConfig
from src.strategies import (
    generate_1f1b_interleave_overlap_schedule,
    generate_1f1b_interleave_schedule,
    generate_1f1b_overlap_schedule,
    generate_1f1b_schedule,
    generate_zero_bubble_1p_schedule,
    generate_dualpipe_schedule,
)
from src.visualizer import visualize_pipeline_parallelism_dash


def _op_times_from_cfg(cfg: DictConfig) -> Any:
    """Return ``cfg.op_times`` as a plain Python container, or None if absent/null."""
    # getattr with a default also covers struct-mode configs, where a missing
    # key raises ConfigAttributeError (an AttributeError subclass).
    op_times = getattr(cfg, "op_times", None)
    if op_times is None:
        # A key explicitly set to null must not reach to_container, which
        # rejects non-OmegaConf inputs.
        return None
    # resolve=True so ${...} interpolations inside op_times are passed on as
    # concrete values rather than literal interpolation strings.
    return OmegaConf.to_container(op_times, resolve=True)


def _simulate(
    cfg: DictConfig,
    generate_schedule: Callable[[ScheduleConfig], Any],
    **schedule_kwargs: Any,
) -> None:
    """Build a ScheduleConfig, then generate, execute and visualize a schedule.

    Args:
        cfg: Hydra config. Reads num_devices, num_stages, num_batches,
            p2p_latency, optional op_times, and visualization_port.
        generate_schedule: Strategy-specific schedule generator.
        **schedule_kwargs: Strategy-specific ScheduleConfig fields
            (e.g. ``placement_strategy``, ``split_backward``). Only the fields
            a strategy passes are forwarded; whatever defaults ScheduleConfig
            defines apply to the rest.
    """
    schedule_config = ScheduleConfig(
        num_devices=cfg.num_devices,
        num_stages=cfg.num_stages,
        num_batches=cfg.num_batches,
        p2p_latency=cfg.p2p_latency,
        op_times=_op_times_from_cfg(cfg),
        **schedule_kwargs,
    )
    schedule = generate_schedule(schedule_config)
    schedule.execute()
    visualize_pipeline_parallelism_dash(schedule, port=cfg.visualization_port)


def run_1f1b(cfg: DictConfig) -> None:
    """Run 1F1B pipeline parallelism simulation."""
    _simulate(cfg, generate_1f1b_schedule, placement_strategy="standard")


def run_interleave(cfg: DictConfig) -> None:
    """Run interleaved pipeline parallelism simulation."""
    _simulate(
        cfg, generate_1f1b_interleave_schedule, placement_strategy="interleave"
    )


def run_zero_bubble_1p(cfg: DictConfig) -> None:
    """Run zero bubble 1P pipeline parallelism simulation."""
    _simulate(cfg, generate_zero_bubble_1p_schedule, split_backward=True)


def run_1f1b_overlap(cfg: DictConfig) -> None:
    """Run 1F1B overlap pipeline parallelism simulation."""
    _simulate(cfg, generate_1f1b_overlap_schedule, split_backward=False)


def run_1f1b_interleave_overlap(cfg: DictConfig) -> None:
    """Run 1F1B interleave overlapped pipeline parallelism simulation."""
    _simulate(
        cfg,
        generate_1f1b_interleave_overlap_schedule,
        placement_strategy="interleave",
    )


def run_dualpipe(cfg: DictConfig) -> None:
    """Run DualPipe pipeline parallelism simulation."""
    _simulate(
        cfg,
        generate_dualpipe_schedule,
        split_backward=True,
        placement_strategy="dualpipe",
    )


# Maps each accepted ``cfg.strategy`` value to its runner.
_STRATEGY_RUNNERS: Dict[str, Callable[[DictConfig], None]] = {
    "1f1b": run_1f1b,
    "interleave": run_interleave,
    "zb1p": run_zero_bubble_1p,
    "1f1b_overlap": run_1f1b_overlap,
    "1f1b_interleave_overlap": run_1f1b_interleave_overlap,
    "dualpipe": run_dualpipe,
}


@hydra.main(config_path="conf", config_name="config", version_base=None)
def main(cfg: DictConfig) -> None:
    """Run pipeline parallelism simulation with the specified configuration.

    Raises:
        ValueError: If ``cfg.strategy`` is not one of the supported strategies.
    """
    print(f"Running with configuration: {cfg}")
    try:
        runner = _STRATEGY_RUNNERS[cfg.strategy]
    except KeyError:
        raise ValueError(f"Unknown strategy: {cfg.strategy}") from None
    runner(cfg)


if __name__ == "__main__":
    main()