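# Gemini provider handler: translates an OpenAI-style completion() call into a
# google.generativeai request and maps the SDK response back onto litellm's
# ModelResponse.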
import os, types, traceback, copy
import json
from enum import Enum
import time
from typing import Callable, Optional
from litellm.utils import ModelResponse, get_secret, Choices, Message, Usage
import litellm
import sys, httpx
from .prompt_templates.factory import prompt_factory, custom_prompt


class GeminiError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        # attach synthetic httpx request/response objects so the error exposes
        # httpx-style attributes (status_code, response) to callers
        self.request = httpx.Request(
            method="POST",
            url="https://developers.generativeai.google/api/python/google/generativeai/chat",
        )
        self.response = httpx.Response(status_code=status_code, request=self.request)
        super().__init__(
            self.message
        )  # Call the base class constructor with the parameters it needs


class GeminiConfig:
    """
    Reference: https://ai.google.dev/api/python/google/generativeai/GenerationConfig

    The class `GeminiConfig` provides configuration for the Gemini API's interface. Here are the parameters:

    - `candidate_count` (int): Number of generated responses to return.
    - `stop_sequences` (List[str]): The set of character sequences (up to 5) that will stop output generation. If specified, the API will stop at the first appearance of a stop sequence. The stop sequence will not be included as part of the response.
    - `max_output_tokens` (int): The maximum number of tokens to include in a candidate. If unset, this will default to the output_token_limit specified in the model's specification.
    - `temperature` (float): Controls the randomness of the output. Note: The default value varies by model; see the Model.temperature attribute of the Model returned by the genai.get_model function. Values can range from [0.0, 1.0], inclusive. A value closer to 1.0 will produce responses that are more varied and creative, while a value closer to 0.0 will typically result in more straightforward responses from the model.
    - `top_p` (float): Optional. The maximum cumulative probability of tokens to consider when sampling.
    - `top_k` (int): Optional. The maximum number of tokens to consider when sampling.
    """

    candidate_count: Optional[int] = None
    stop_sequences: Optional[list] = None
    max_output_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None

    def __init__(
        self,
        candidate_count: Optional[int] = None,
        stop_sequences: Optional[list] = None,
        max_output_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
    ) -> None:
        # store any explicitly passed (non-None) values as class-level defaults
        locals_ = locals()
        for key, value in locals_.items():
            if key != "self" and value is not None:
                setattr(self.__class__, key, value)

    @classmethod
    def get_config(cls):
        # return all non-dunder, non-callable, non-None class attributes,
        # i.e. the currently active config values
        return {
            k: v
            for k, v in cls.__dict__.items()
            if not k.startswith("__")
            and not isinstance(
                v,
                (
                    types.FunctionType,
                    types.BuiltinFunctionType,
                    classmethod,
                    staticmethod,
                ),
            )
            and v is not None
        }
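
# Illustrative only (hypothetical values): instantiating GeminiConfig stores any
# non-None arguments as class-level defaults, which get_config() then returns:
#
#   litellm.GeminiConfig(temperature=0.2, top_k=40)
#   litellm.GeminiConfig.get_config()  # -> {"temperature": 0.2, "top_k": 40}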


def completion(
    model: str,
    messages: list,
    model_response: ModelResponse,
    print_verbose: Callable,
    api_key,
    encoding,
    logging_obj,
    custom_prompt_dict: dict,
    acompletion: bool = False,
    optional_params=None,
    litellm_params=None,
    logger_fn=None,
):
    try:
        import google.generativeai as genai
    except ImportError:
        raise Exception(
            "Importing google.generativeai failed, please run 'pip install -q google-generativeai'"
        )
    genai.configure(api_key=api_key)

    if model in custom_prompt_dict:
        # check if the model has a registered custom prompt
        model_prompt_details = custom_prompt_dict[model]
        prompt = custom_prompt(
            role_dict=model_prompt_details["roles"],
            initial_prompt_value=model_prompt_details["initial_prompt_value"],
            final_prompt_value=model_prompt_details["final_prompt_value"],
            messages=messages,
        )
    else:
        prompt = prompt_factory(
            model=model, messages=messages, custom_llm_provider="gemini"
        )

    ## Load Config
    inference_params = copy.deepcopy(optional_params)
    inference_params.pop(
        "stream", None
    )  # this Gemini call does not support streaming, so we handle it by fake streaming in main.py
    config = litellm.GeminiConfig.get_config()
    for k, v in config.items():
        if (
            k not in inference_params
        ):  # completion(top_k=3) > gemini_config(top_k=3) <- allows for dynamic variables to be passed in
            inference_params[k] = v
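    # e.g. if GeminiConfig set top_k=10 but the caller passed top_k=3, the
    # caller's value (3) wins; config values only fill in missing keys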

    ## LOGGING
    logging_obj.pre_call(
        input=prompt,
        api_key="",
        additional_args={"complete_input_dict": {"inference_params": inference_params}},
    )

    ## COMPLETION CALL
    try:
        _model = genai.GenerativeModel(f"models/{model}")
        response = _model.generate_content(
            contents=prompt,
            generation_config=genai.types.GenerationConfig(**inference_params),
        )
    except Exception as e:
        raise GeminiError(
            message=str(e),
            status_code=500,
        )

    ## LOGGING
    logging_obj.post_call(
        input=prompt,
        api_key="",
        original_response=response,
        additional_args={"complete_input_dict": {}},
    )
    print_verbose(f"raw model_response: {response}")

    ## RESPONSE OBJECT
    completion_response = response
    try:
        choices_list = []
        for idx, item in enumerate(completion_response.candidates):
            if len(item.content.parts) > 0:
                message_obj = Message(content=item.content.parts[0].text)
            else:
                message_obj = Message(content=None)
            choice_obj = Choices(index=idx, message=message_obj)  # zero-indexed, matching the OpenAI format
            choices_list.append(choice_obj)
        model_response["choices"] = choices_list
    except Exception:
        traceback.print_exc()
        # the genai response object carries no HTTP status_code, so report a generic 500
        raise GeminiError(message=traceback.format_exc(), status_code=500)
    try:
        completion_response = model_response["choices"][0]["message"].get("content")
        if completion_response is None:
            raise Exception
    except Exception:
        original_response = f"response: {response}"
        if hasattr(response, "candidates"):
            original_response = f"response: {response.candidates}"
            if "SAFETY" in original_response:
                original_response += "\nThe candidate content was flagged for safety reasons."
            elif "RECITATION" in original_response:
                original_response += "\nThe candidate content was flagged for recitation reasons."
        raise GeminiError(
            status_code=400,
            message=f"No response received. Original response - {original_response}",
        )

    ## CALCULATING USAGE
    prompt_str = ""
    for m in messages:
        if isinstance(m["content"], str):
            prompt_str += m["content"]
        elif isinstance(m["content"], list):
            for content in m["content"]:
                if content["type"] == "text":
                    prompt_str += content["text"]
    prompt_tokens = len(encoding.encode(prompt_str))
    completion_tokens = len(
        encoding.encode(model_response["choices"][0]["message"].get("content", ""))
    )

    model_response["created"] = int(time.time())
    model_response["model"] = "gemini/" + model
    usage = Usage(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    model_response.usage = usage
    return model_response
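
# Illustrative only (not part of the original module): a minimal direct call,
# assuming `encoding` is a tiktoken-style tokenizer and `logging_obj` is a
# litellm Logging object supplied by the caller:
#
#   resp = completion(
#       model="gemini-pro",
#       messages=[{"role": "user", "content": "Hello"}],
#       model_response=ModelResponse(),
#       print_verbose=print,
#       api_key=os.environ["GEMINI_API_KEY"],
#       encoding=encoding,
#       logging_obj=logging_obj,
#       custom_prompt_dict={},
#       optional_params={},
#   )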


def embedding():
    # logic for parsing in - calling - parsing out model embedding calls
    pass