|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
import threading |
|
|
import time |
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
|
|
from openai import OpenAI, Stream |
|
|
|
|
|
from camel.configs import SGLANG_API_PARAMS, SGLangConfig |
|
|
from camel.messages import OpenAIMessage |
|
|
from camel.models import BaseModelBackend |
|
|
from camel.types import ( |
|
|
ChatCompletion, |
|
|
ChatCompletionChunk, |
|
|
ModelType, |
|
|
) |
|
|
from camel.utils import BaseTokenCounter, OpenAITokenCounter |
|
|
|
|
|
|
|
|
class SGLangModel(BaseModelBackend): |
|
|
r"""SGLang service interface. |
|
|
|
|
|
Args: |
|
|
model_type (Union[ModelType, str]): Model for which a backend is |
|
|
created. |
|
|
model_config_dict (Optional[Dict[str, Any]], optional): A dictionary |
|
|
that will be fed into:obj:`openai.ChatCompletion.create()`. If |
|
|
:obj:`None`, :obj:`SGLangConfig().as_dict()` will be used. |
|
|
(default: :obj:`None`) |
|
|
api_key (Optional[str], optional): The API key for authenticating with |
|
|
the model service. SGLang doesn't need API key, it would be ignored |
|
|
if set. (default: :obj:`None`) |
|
|
url (Optional[str], optional): The url to the model service. If not |
|
|
provided, :obj:`"http://127.0.0.1:30000/v1"` will be used. |
|
|
(default: :obj:`None`) |
|
|
token_counter (Optional[BaseTokenCounter], optional): Token counter to |
|
|
use for the model. If not provided, :obj:`OpenAITokenCounter( |
|
|
ModelType.GPT_4O_MINI)` will be used. |
|
|
(default: :obj:`None`) |
|
|
|
|
|
Reference: https://sgl-project.github.io/backend/openai_api_completions.html |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
model_type: Union[ModelType, str], |
|
|
model_config_dict: Optional[Dict[str, Any]] = None, |
|
|
api_key: Optional[str] = None, |
|
|
url: Optional[str] = None, |
|
|
token_counter: Optional[BaseTokenCounter] = None, |
|
|
) -> None: |
|
|
if model_config_dict is None: |
|
|
model_config_dict = SGLangConfig().as_dict() |
|
|
|
|
|
self.server_process = None |
|
|
self.last_run_time: Optional[float] = ( |
|
|
None |
|
|
) |
|
|
self._lock = threading.Lock() |
|
|
self._inactivity_thread: Optional[threading.Thread] = None |
|
|
|
|
|
super().__init__( |
|
|
model_type, model_config_dict, api_key, url, token_counter |
|
|
) |
|
|
|
|
|
self._client = None |
|
|
|
|
|
if self._url: |
|
|
|
|
|
self._client = OpenAI( |
|
|
timeout=180, |
|
|
max_retries=3, |
|
|
api_key="Set-but-ignored", |
|
|
base_url=self._url, |
|
|
) |
|
|
|
|
|
def _start_server(self) -> None: |
|
|
from sglang.utils import ( |
|
|
execute_shell_command, |
|
|
wait_for_server, |
|
|
) |
|
|
|
|
|
try: |
|
|
if not self._url: |
|
|
cmd = ( |
|
|
f"python -m sglang.launch_server " |
|
|
f"--model-path {self.model_type} " |
|
|
f"--port 30000 " |
|
|
f"--host 0.0.0.0" |
|
|
) |
|
|
|
|
|
server_process = execute_shell_command(cmd) |
|
|
wait_for_server("http://localhost:30000") |
|
|
self._url = "http://127.0.0.1:30000/v1" |
|
|
self.server_process = server_process |
|
|
|
|
|
self._inactivity_thread = threading.Thread( |
|
|
target=self._monitor_inactivity, daemon=True |
|
|
) |
|
|
self._inactivity_thread.start() |
|
|
self.last_run_time = time.time() |
|
|
|
|
|
self._client = OpenAI( |
|
|
timeout=180, |
|
|
max_retries=3, |
|
|
api_key="Set-but-ignored", |
|
|
base_url=self._url, |
|
|
) |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to start SGLang server: {e}") from e |
|
|
|
|
|
def _ensure_server_running(self) -> None: |
|
|
r"""Ensures that the server is running. If not, starts the server.""" |
|
|
with self._lock: |
|
|
if self.server_process is None: |
|
|
self._start_server() |
|
|
|
|
|
def _monitor_inactivity(self): |
|
|
r"""Monitor whether the server process has been inactive for over 10 |
|
|
minutes. |
|
|
""" |
|
|
from sglang.utils import terminate_process |
|
|
|
|
|
while True: |
|
|
|
|
|
time.sleep(10) |
|
|
|
|
|
with self._lock: |
|
|
|
|
|
if self.last_run_time and ( |
|
|
time.time() - self.last_run_time > 600 |
|
|
): |
|
|
if self.server_process: |
|
|
terminate_process(self.server_process) |
|
|
self.server_process = None |
|
|
self._client = None |
|
|
logging.info( |
|
|
"Server process terminated due to inactivity." |
|
|
) |
|
|
break |
|
|
|
|
|
@property |
|
|
def token_counter(self) -> BaseTokenCounter: |
|
|
r"""Initialize the token counter for the model backend. |
|
|
|
|
|
Returns: |
|
|
BaseTokenCounter: The token counter following the model's |
|
|
tokenization style. |
|
|
""" |
|
|
if not self._token_counter: |
|
|
self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI) |
|
|
return self._token_counter |
|
|
|
|
|
def check_model_config(self): |
|
|
r"""Check whether the model configuration contains any |
|
|
unexpected arguments to SGLang API. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the model configuration dictionary contains any |
|
|
unexpected arguments to OpenAI API. |
|
|
""" |
|
|
for param in self.model_config_dict: |
|
|
if param not in SGLANG_API_PARAMS: |
|
|
raise ValueError( |
|
|
f"Unexpected argument `{param}` is " |
|
|
"input into SGLang model backend." |
|
|
) |
|
|
|
|
|
def run( |
|
|
self, |
|
|
messages: List[OpenAIMessage], |
|
|
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: |
|
|
r"""Runs inference of OpenAI chat completion. |
|
|
|
|
|
Args: |
|
|
messages (List[OpenAIMessage]): Message list with the chat history |
|
|
in OpenAI API format. |
|
|
|
|
|
Returns: |
|
|
Union[ChatCompletion, Stream[ChatCompletionChunk]]: |
|
|
`ChatCompletion` in the non-stream mode, or |
|
|
`Stream[ChatCompletionChunk]` in the stream mode. |
|
|
""" |
|
|
|
|
|
|
|
|
self._ensure_server_running() |
|
|
|
|
|
with self._lock: |
|
|
|
|
|
self.last_run_time = time.time() |
|
|
|
|
|
if self._client is None: |
|
|
raise RuntimeError( |
|
|
"Client is not initialized. Ensure the server is running." |
|
|
) |
|
|
|
|
|
response = self._client.chat.completions.create( |
|
|
messages=messages, |
|
|
model=self.model_type, |
|
|
**self.model_config_dict, |
|
|
) |
|
|
|
|
|
return response |
|
|
|
|
|
@property |
|
|
def stream(self) -> bool: |
|
|
r"""Returns whether the model is in stream mode, which sends partial |
|
|
results each time. |
|
|
|
|
|
Returns: |
|
|
bool: Whether the model is in stream mode. |
|
|
""" |
|
|
return self.model_config_dict.get('stream', False) |
|
|
|