import argparse
from typing import Dict, List, Optional, Set, Tuple

from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoRAConfig,
                         ModelConfig, ParallelConfig, SchedulerConfig,
                         SpeculativeConfig, VisionLanguageConfig)
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                        make_async)
logger = init_logger(__name__)


class GPUExecutor(ExecutorBase):

    def __init__(
        self,
        args: argparse.Namespace,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        load_config: LoadConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig],
        speculative_config: Optional[SpeculativeConfig],
    ) -> None:
        self.args = args
        self.model_config = model_config
        self.cache_config = cache_config
        self.lora_config = lora_config
        self.load_config = load_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.vision_language_config = vision_language_config
        self.speculative_config = speculative_config

        self._init_executor()

    def _init_executor(self) -> None:
        """Initialize the worker and load the model.

        If speculative decoding is enabled, we instead create the speculative
        worker.
        """
        if self.speculative_config is None:
            self._init_non_spec_worker()
        else:
            self._init_spec_worker()

    def _init_non_spec_worker(self):
        # Lazy import the Worker to avoid importing torch.cuda/xformers
        # before CUDA_VISIBLE_DEVICES is set in the Worker.
        # from vllm.worker.worker import Worker
        from serve.worker import Worker

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        self.driver_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
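        # NOTE: unlike upstream vllm.worker.worker.Worker, whose load_model()
        # takes no arguments, the local serve.worker.Worker imported above is
        # assumed to accept the parsed CLI args so loading can be customized.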
        self.driver_worker.load_model(self.args)

    def _init_spec_worker(self):
        """Initialize a SpecDecodeWorker, using a draft model for proposals."""
        assert self.speculative_config is not None

        from vllm.spec_decode.multi_step_worker import MultiStepWorker
        from vllm.spec_decode.spec_decode_worker import SpecDecodeWorker
        from vllm.worker.worker import Worker

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())

        target_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )

        draft_worker = MultiStepWorker(
            model_config=self.speculative_config.draft_model_config,
            parallel_config=self.speculative_config.draft_parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )
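
        # The draft (proposer) worker cheaply generates candidate tokens each
        # step; the target (scorer) worker then verifies them with the full
        # model in one forward pass, preserving the target model's output
        # distribution while amortizing its cost over several tokens.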
        spec_decode_worker = SpecDecodeWorker.from_workers(
            proposer_worker=draft_worker, scorer_worker=target_worker)

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        self.driver_worker = spec_decode_worker

        # Load model handled in spec decode worker.
        self.driver_worker.init_device()
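
    # KV-cache sizing: the underlying worker profiles a forward pass at the
    # maximum configured batch size to measure peak activation memory, then
    # computes how many KV-cache blocks fit in the remaining GPU memory
    # budget (and how many fit in CPU swap space).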
    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()
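
    # Each block holds the keys/values for `cache_config.block_size` tokens
    # (16 by default in vLLM), so num_gpu_blocks * block_size bounds the
    # total number of tokens resident in the GPU KV cache at once.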
    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker."""
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log at the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
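
    # Block-mapping arguments: blocks_to_swap_in maps CPU block -> GPU block
    # (resuming preempted sequences), blocks_to_swap_out maps GPU -> CPU, and
    # blocks_to_copy maps a source GPU block to its destination blocks for
    # copy-on-write when forked sequences (e.g. beam search) diverge.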
    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        num_lookahead_slots: int,
    ) -> List[SamplerOutput]:
        output = self.driver_worker.execute_model(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=num_lookahead_slots,
        )
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # GPUExecutor will always be healthy as long as it's running.
        return
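

# GPUExecutorAsync wraps the synchronous driver worker for use from an
# asyncio event loop: vllm.utils.make_async runs the blocking execute_model
# call in a background thread so the loop is not stalled by the forward pass.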
class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase):

    async def execute_model_async(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> SamplerOutput:
        output = await make_async(self.driver_worker.execute_model)(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy)
        return output
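

# --- Usage sketch (illustrative only, not exercised by this module) --------
# A minimal sketch of the lifecycle the engine drives, assuming the config
# objects are built elsewhere (in vLLM this is typically done via
# EngineArgs.create_engine_config(); the exact factory varies by version) and
# that `args` is the parsed namespace serve.worker.Worker expects:
#
#   executor = GPUExecutor(
#       args=args,
#       model_config=model_config,
#       cache_config=cache_config,
#       parallel_config=parallel_config,
#       scheduler_config=scheduler_config,
#       device_config=device_config,
#       load_config=load_config,
#       lora_config=None,
#       vision_language_config=None,
#       speculative_config=None,
#   )
#   num_gpu_blocks, num_cpu_blocks = executor.determine_num_available_blocks()
#   executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#   outputs = executor.execute_model(
#       seq_group_metadata_list=scheduled_seq_groups,
#       blocks_to_swap_in={},
#       blocks_to_swap_out={},
#       blocks_to_copy={},
#       num_lookahead_slots=0,
#   )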