Spaces:
Sleeping
Sleeping
| import sys | |
| from fastchat.conversation import Conversation | |
| from server.model_workers.base import * | |
| from server.utils import get_httpx_client | |
| from fastchat import conversation as conv | |
| import json, httpx | |
| from typing import List, Dict | |
| from configs import logger, log_verbose | |
class GeminiWorker(ApiModelWorker):
    """Fastchat API model worker that proxies chat requests to Google's
    Gemini (``generativelanguage.googleapis.com``) REST API."""

    def __init__(
        self,
        *,
        controller_addr: str = None,
        worker_addr: str = None,
        model_names: List[str] = ["gemini-api"],
        **kwargs,
    ):
        kwargs.update(model_names=model_names, controller_addr=controller_addr, worker_addr=worker_addr)
        # Default context window; callers may still override via kwargs.
        kwargs.setdefault("context_len", 4096)
        super().__init__(**kwargs)

    def create_gemini_messages(self, messages: List[Dict]) -> Dict:
        """Convert OpenAI-style chat ``messages`` into a Gemini request body.

        System messages are dropped (the v1beta chat payload used here has no
        system slot).  When the conversation already contains assistant turns,
        each message keeps its role with ``assistant`` mapped to Gemini's
        ``model``; otherwise only user text is forwarded, role-less.

        Returns:
            ``{"contents": [...]}`` ready to be posted to ``generateContent``.
        """
        has_history = any(msg["role"] == "assistant" for msg in messages)
        gemini_msg = []
        for msg in messages:
            role = msg["role"]
            content = msg["content"]
            if role == "system":
                continue
            if has_history:
                if role == "assistant":
                    role = "model"
                # Append inside the branch: the original appended an unconditionally
                # referenced `transformed_msg`, which could be unbound (NameError)
                # or a stale duplicate for roles not handled by either branch.
                gemini_msg.append({"role": role, "parts": [{"text": content}]})
            elif role == "user":
                gemini_msg.append({"parts": [{"text": content}]})
            # Any other role in a history-less conversation is skipped.
        return dict(contents=gemini_msg)

    def do_chat(self, params: ApiChatParams) -> Dict:
        """Send one chat request to Gemini and stream back accumulated text.

        Yields dicts of the form ``{"error_code": 0, "text": <text so far>}``
        each time a complete JSON document has been assembled from the
        response lines, mirroring the other API workers in this package.
        """
        params.load_config(self.model_names[0])
        data = self.create_gemini_messages(messages=params.messages)
        generation_config = dict(
            temperature=params.temperature,
            topK=1,
            topP=1,
            maxOutputTokens=4096,
            stopSequences=[],
        )
        data["generationConfig"] = generation_config
        url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent" + '?key=' + params.api_key
        headers = {
            'Content-Type': 'application/json',
        }
        if log_verbose:
            logger.info(f'{self.__class__.__name__}:url: {url}')
            logger.info(f'{self.__class__.__name__}:headers: {headers}')
            logger.info(f'{self.__class__.__name__}:data: {data}')

        text = ""
        json_string = ""
        timeout = httpx.Timeout(60.0)
        # `with` ensures the client is closed -- the original leaked it.
        with get_httpx_client(timeout=timeout) as client:
            with client.stream("POST", url, headers=headers, json=data) as response:
                for line in response.iter_lines():
                    line = line.strip()
                    if not line or "[DONE]" in line:
                        continue
                    # The endpoint returns one (pretty-printed) JSON document
                    # spread over several lines; accumulate until it parses.
                    json_string += line
                    try:
                        resp = json.loads(json_string)
                    except json.JSONDecodeError:
                        # Expected while the document is still partial --
                        # keep accumulating instead of printing noise.
                        continue
                    for candidate in resp.get('candidates', []):
                        content = candidate.get('content', {})
                        for part in content.get('parts', []):
                            if 'text' in part:
                                text += part['text']
                                yield {
                                    "error_code": 0,
                                    "text": text
                                }

    def get_embeddings(self, params):
        # NOTE(review): embeddings are not implemented for this worker;
        # this stub only echoes the request.
        print("embedding")
        print(params)

    def make_conv_template(self, conv_template: str = None, model_path: str = None) -> Conversation:
        """Return a minimal fastchat Conversation template for this worker."""
        return conv.Conversation(
            name=self.model_names[0],
            system_message="You are a helpful, respectful and honest assistant.",
            messages=[],
            roles=["user", "assistant"],
            sep="\n### ",
            stop_str="###",
        )
if __name__ == "__main__":
    import uvicorn
    from fastchat.serve.base_model_worker import app
    from server.utils import MakeFastAPIOffline

    # Stand-alone launch: create the worker pointing at the local
    # controller, then serve fastchat's worker app on port 21012.
    worker = GeminiWorker(
        controller_addr="http://127.0.0.1:20001",
        worker_addr="http://127.0.0.1:21012",
    )
    # fastchat's request handlers look the worker up on this module,
    # so it must be attached there before the server starts.
    sys.modules["fastchat.serve.model_worker"].worker = worker
    MakeFastAPIOffline(app)
    uvicorn.run(app, port=21012)