import os
import json
import time
import yaml
import random
import shortuuid
import requests
from typing import Optional
import boto3
from glob import glob
from tqdm import tqdm

# API setting constants
API_MAX_RETRY = 50
API_RETRY_SLEEP = 10
API_ERROR_OUTPUT = None

registered_api_completion = {}
registered_engine_completion = {}


def register_api(api_type):
    def decorator(func):
        registered_api_completion[api_type] = func
        return func
    return decorator


def register_engine(engine_type):
    def decorator(func):
        registered_engine_completion[engine_type] = func
        return func
    return decorator

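# Illustrative sketch (not part of the original module): the two registries map a
# string key to a completion callable so callers can dispatch on a config value.
# The "my_api" key and the call below are assumptions for illustration only.
#
#   @register_api("my_api")
#   def my_chat_completion(model, messages, temperature, max_tokens, **kwargs):
#       ...
#
#   completion_fn = registered_api_completion["my_api"]
#   output = completion_fn(model="my-model", messages=[...], temperature=0.0, max_tokens=1024)
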
def load_questions(question_file: str):
    """Load questions from a file."""
    questions = []
    with open(question_file, "r") as ques_file:
        for line in ques_file:
            if line:
                questions.append(json.loads(line))
    return questions

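# The question file is JSONL: one JSON object per line. The schema is not enforced
# here; elsewhere in this module records are keyed by a "uid" field, so a line
# might look like this (field names other than "uid" are hypothetical):
#   {"uid": "q_0001", "instruction": "Write a function that ...", "environment": "python"}
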
def load_model_answers(answer_dir: str):
    """Load model answers.

    The return value is a python dict of type:
    Dict[model_name: str -> Dict[uid: int -> answer: dict]]
    """
    if not os.path.exists(answer_dir):
        return {}
    filenames = []
    for folder in os.listdir(answer_dir):
        if not os.path.isdir(os.path.join(answer_dir, folder)):
            continue
        if not os.path.exists(os.path.join(answer_dir, folder, "generation.jsonl")):
            continue
        filenames.append(os.path.join(answer_dir, folder, "generation.jsonl"))
    filenames.sort()
    model_answers = {}
    for filename in filenames:
        # Use parent directory name as model name
        model_name = os.path.basename(os.path.dirname(filename))
        answer = {}
        with open(filename) as fin:
            for line in fin:
                line = json.loads(line)
                answer[line["uid"]] = line
        model_answers[model_name] = answer
    return model_answers

def load_model_judgements(answer_dir: str):
    """Load model judgements.

    The return value is a python dict of type:
    Dict[model_name: str -> Dict[uid: int -> answer: dict]]
    """
    filenames = glob(os.path.join(answer_dir, "*.jsonl"))
    filenames.sort()
    model_answers = {}
    for filename in filenames:
        model_name = os.path.basename(filename)[:-6]
        answer = {}
        with open(filename) as fin:
            for line in fin:
                line = json.loads(line)
                answer[line["uid"]] = line
        model_answers[model_name] = answer
    return model_answers

def load_model_answers_and_execution_results(data_dir: str):
    """Load model answers and execution results.

    The return value is a python dict of type:
    Dict[model_name: str -> Dict[uid: int -> answer: dict]]
    """
    filenames = []
    for folder in os.listdir(data_dir):
        if not os.path.isdir(os.path.join(data_dir, folder)):
            continue
        if not os.path.exists(os.path.join(data_dir, folder, "execution_results.jsonl")):
            continue
        filenames.append(os.path.join(data_dir, folder, "execution_results.jsonl"))
    filenames.sort()
    model_answers = {}
    for filename in filenames:
        # Use parent directory name as model name
        model_name = os.path.basename(os.path.dirname(filename))
        answer = {}
        with open(filename) as fin:
            for line in fin:
                line = json.loads(line)
                answer[line["uid"]] = line
        model_answers[model_name] = answer
    return model_answers

def load_id_to_model_answers(answer_dir: str):
    """Load model answers, keyed by question uid.

    The return value is a python dict of type:
    Dict[uid: int -> Dict[model_name: str -> answer: dict]]
    """
    filenames = glob(os.path.join(answer_dir, "*.jsonl"))
    filenames.sort()
    model_answers = {}
    for filename in filenames:
        # Strip the ".jsonl" suffix to get the model name
        model_name = os.path.basename(filename)[:-6]
        with open(filename) as fin:
            for line in fin:
                line = json.loads(line)
                if line["uid"] in model_answers:
                    model_answers[line["uid"]][model_name] = line
                else:
                    model_answers[line["uid"]] = {model_name: line}
    return model_answers

def get_endpoint(endpoint_list):
    if endpoint_list is None:
        return None
    # randomly pick one
    api_dict = random.choice(endpoint_list)
    return api_dict

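# `endpoint_list` is assumed to be a list of api_dict entries like the ones the
# completion functions below read ("api_base", "api_key", optional "model_name"),
# e.g. (hypothetical values):
#   [{"api_base": "https://api.example.com/v1", "api_key": "sk-...", "model_name": "my-model"}]
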
# load config args from config yaml files
def make_config(config_file: str) -> dict:
    config_kwargs = {}
    with open(config_file, "r") as f:
        config_kwargs = yaml.load(f, Loader=yaml.SafeLoader)
    return config_kwargs

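# The YAML is parsed into a plain dict; the keys are whatever the caller expects.
# A minimal sketch (field names are illustrative, not a fixed schema):
#   temperature: 0.0
#   max_tokens: 4096
#   endpoints:
#     - api_base: https://api.example.com/v1
#       api_key: sk-...
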
def chat_completion_openai(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    import openai

    if api_dict:
        client = openai.OpenAI(
            base_url=api_dict["api_base"],
            api_key=api_dict["api_key"],
        )
    else:
        client = openai.OpenAI()

    if api_dict and "model_name" in api_dict:
        model = api_dict["model_name"]

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            completion = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            output = {
                "answer": completion.choices[0].message.content
            }
            break
        except openai.RateLimitError as e:
            time.sleep(API_RETRY_SLEEP)
        except openai.BadRequestError as e:
            break
        except KeyError:
            break
    return output

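# Usage sketch (values illustrative): the call retries up to API_MAX_RETRY times on
# rate limits and returns {"answer": ...} on success, or API_ERROR_OUTPUT (None) if
# every attempt fails.
#   result = chat_completion_openai(
#       model="gpt-4o",
#       messages=[{"role": "user", "content": "Hello"}],
#       temperature=0.0,
#       max_tokens=256,
#   )
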
def chat_completion_openai_streaming(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    """Streaming version of OpenAI completion that yields tokens as they arrive."""
    import openai

    if api_dict:
        client = openai.OpenAI(
            base_url=api_dict["api_base"],
            api_key=api_dict["api_key"],
        )
    else:
        client = openai.OpenAI()

    if api_dict and "model_name" in api_dict:
        model = api_dict["model_name"]

    try:
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content
    except Exception as e:
        print(f"Error in streaming completion: {e}")
        yield f"Error: {str(e)}"

def chat_completion_openai_thinking(model, messages, api_dict=None, **kwargs):
    import openai

    if api_dict:
        client = openai.OpenAI(
            api_key=api_dict["api_key"],
            base_url=api_dict["api_base"],
        )
    else:
        client = openai.OpenAI()

    output = API_ERROR_OUTPUT
    for i in range(API_MAX_RETRY):
        try:
            completion = client.chat.completions.create(
                model=model,
                messages=messages,
                reasoning_effort=kwargs.get("reasoning_effort", "medium"),
            )
            output = {
                "answer": completion.choices[0].message.content
            }
            break
        except openai.RateLimitError as e:
            time.sleep(API_RETRY_SLEEP)
        except openai.BadRequestError as e:
            break
        except KeyError:
            break
    return output

def chat_completion_deepseek_reasoner(messages, api_dict, **kwargs):
    import urllib.request

    chat_endpoint_headers = {
        "User-Agent": "curl/8.7.1",
        "Authorization": "Bearer {}".format(api_dict["api_key"]),
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    chat_endpoint_url = "https://api.deepseek.com/chat/completions"
    req_body = {
        "messages": messages,
        "model": "deepseek-reasoner",
        "stream": False,
    }
    req_data = json.dumps(req_body).encode("utf-8")

    output = API_ERROR_OUTPUT
    for i in range(API_MAX_RETRY):
        try:
            req = urllib.request.Request(
                chat_endpoint_url,
                headers=chat_endpoint_headers.copy(),
                data=req_data,
            )
            with urllib.request.urlopen(req) as res:
                res_data = res.read()
            res_body = json.loads(res_data.decode("utf-8"))
            output = {
                "thought": res_body["choices"][0]["message"]["reasoning_content"],
                "answer": res_body["choices"][0]["message"]["content"],
            }
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_deepseek(messages, max_tokens, api_dict, **kwargs):
    import urllib.request

    chat_endpoint_headers = {
        "User-Agent": "curl/8.7.1",
        "Authorization": "Bearer {}".format(api_dict["api_key"]),
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    chat_endpoint_url = "https://api.deepseek.com/chat/completions"
    req_body = {
        "messages": messages,
        "model": "deepseek-chat",
        "stream": False,
        "max_tokens": max_tokens,
    }
    req_data = json.dumps(req_body).encode("utf-8")

    output = API_ERROR_OUTPUT
    for i in range(API_MAX_RETRY):
        try:
            req = urllib.request.Request(
                chat_endpoint_url,
                headers=chat_endpoint_headers.copy(),
                data=req_data,
            )
            with urllib.request.urlopen(req) as res:
                res_data = res.read()
            res_body = json.loads(res_data.decode("utf-8"))
            output = {
                "answer": res_body["choices"][0]["message"]["content"],
            }
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_anthropic(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    import anthropic

    if api_dict:
        api_key = api_dict["api_key"]
    else:
        api_key = os.environ["ANTHROPIC_API_KEY"]

    sys_msg = ""
    if messages[0]["role"] == "system":
        sys_msg = messages[0]["content"]
        messages = messages[1:]

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            c = anthropic.Anthropic(api_key=api_key)
            response = c.messages.create(
                model=model,
                messages=messages,
                stop_sequences=[anthropic.HUMAN_PROMPT],
                max_tokens=max_tokens,
                temperature=temperature,
                system=sys_msg,
            )
            output = {
                "answer": response.content[0].text
            }
            break
        except anthropic.APIError as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_anthropic_thinking(model, messages, max_tokens, budget_tokens, **kwargs):
    import anthropic

    client = anthropic.Anthropic(
        timeout=1200,
    )

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=max_tokens,
                thinking={
                    "type": "enabled",
                    "budget_tokens": budget_tokens,
                },
                messages=messages,
            )
            output = {
                "thought": response.content[0].thinking,
                "answer": response.content[1].text,
            }
            break
        except anthropic.APIError as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_mistral(model, messages, temperature, max_tokens, **kwargs):
    from mistralai.client import MistralClient
    from mistralai.models.chat_completion import ChatMessage
    from mistralai.exceptions import MistralException

    api_key = os.environ["MISTRAL_API_KEY"]
    client = MistralClient(api_key=api_key)

    prompts = [ChatMessage(role=message["role"], content=message["content"]) for message in messages]

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            chat_response = client.chat(
                model=model,
                messages=prompts,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            output = {
                "answer": chat_response.choices[0].message.content
            }
            break
        except MistralException as e:
            break
    return output

def chat_completion_xai(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    import xai_sdk

    client = xai_sdk.Client(api_key=api_dict["api_key"], api_host=api_dict["api_base"]).compat

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=0.95,
            )
            output_text = ""
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    output_text += chunk.choices[0].delta.content
            output = {
                "answer": output_text
            }
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_litellm(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    import litellm

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            response = litellm.completion(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            output = {
                "answer": response.choices[0].message.content
            }
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_litellm_streaming(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    """Streaming version of litellm completion."""
    import litellm

    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content
    except Exception as e:
        print(f"Error in litellm streaming completion: {e}")
        yield f"Error: {str(e)}"

def chat_completion_anthropic_streaming(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    """Streaming version of Anthropic completion."""
    import anthropic

    if api_dict:
        client = anthropic.Anthropic(api_key=api_dict["api_key"])
    else:
        client = anthropic.Anthropic()

    try:
        # Convert messages to Anthropic format
        system_message = ""
        conversation_messages = []
        for msg in messages:
            if msg["role"] == "system":
                system_message = msg["content"]
            else:
                conversation_messages.append(msg)

        stream = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            temperature=temperature,
            system=system_message if system_message else None,
            messages=conversation_messages,
            stream=True,
        )
        for chunk in stream:
            if chunk.type == "content_block_delta" and chunk.delta.text:
                yield chunk.delta.text
    except Exception as e:
        print(f"Error in Anthropic streaming completion: {e}")
        yield f"Error: {str(e)}"

def http_completion_gemini(model, messages, **kwargs):
    import requests

    api_key = os.environ["GEMINI_API_KEY"]

    safety_settings = [
        {
            "category": "HARM_CATEGORY_HARASSMENT",
            "threshold": "BLOCK_NONE"
        },
        {
            "category": "HARM_CATEGORY_HATE_SPEECH",
            "threshold": "BLOCK_NONE"
        },
        {
            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "threshold": "BLOCK_NONE"
        },
        {
            "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
            "threshold": "BLOCK_NONE"
        },
    ]

    sys_prompt = None
    if messages[0]["role"] == "system":
        sys_prompt = {
            "parts": [
                {"text": messages[0]["content"]}
            ]
        }
        messages = messages[1:]

    role_map = {"user": "user",
                "assistant": "model"}
    conv = [{"parts": [{"text": turn["content"]}], "role": role_map[turn["role"]]} for turn in messages]

    json_request = {
        "contents": conv,
        "safetySettings": safety_settings,
        "systemInstruction": sys_prompt,
    }

    gen_config = {}
    if "temperature" in kwargs:
        gen_config["temperature"] = kwargs["temperature"]
    if "max_tokens" in kwargs:
        gen_config["maxOutputTokens"] = kwargs["max_tokens"]
    if gen_config:
        json_request["generationConfig"] = gen_config

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            response = requests.post(
                f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}",
                json=json_request,
            )
        except Exception as e:
            print(f"**API REQUEST ERROR** Reason: {e}.")
            time.sleep(API_RETRY_SLEEP)
            continue

        if response.status_code != 200:
            print(f"**API REQUEST ERROR** Reason: status code {response.status_code}.")
            time.sleep(API_RETRY_SLEEP)
            continue

        try:
            output = {
                "answer": response.json()["candidates"][0]["content"]["parts"][0]["text"],
            }
            break
        except KeyError as e:
            print(response.json())
    return output

def vertex_completion_gemini(model, messages, project_id, regions, **kwargs):
    import requests
    import subprocess

    output = API_ERROR_OUTPUT

    # Obtain the access token using the gcloud CLI
    access_token = subprocess.check_output(
        ["gcloud", "auth", "application-default", "print-access-token"],
        text=True
    ).strip()

    if messages[0]["role"] == "system":
        data = {
            "systemInstruction": {
                "role": "system",  # ignored by the Vertex API (04/18/2025)
                "parts": [{
                    "text": messages[0]["content"]
                }]
            },
        }
        messages = messages[1:]
    else:
        data = {}

    role_map = {
        "user": "user",
        "assistant": "model"
    }
    messages = [{"parts": [{"text": turn["content"]}], "role": role_map[turn["role"]]} for turn in messages]

    url = (
        f"https://us-central1-aiplatform.googleapis.com/v1/projects/"
        f"{project_id}/locations/{regions}/publishers/google/models/"
        f"{model}:generateContent"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    data = data | {
        "contents": messages,
    }
    if "temperature" in kwargs or "max_tokens" in kwargs:
        gen_config = {}
        if "temperature" in kwargs:
            gen_config["temperature"] = kwargs["temperature"]
        if "max_tokens" in kwargs:
            gen_config["maxOutputTokens"] = kwargs["max_tokens"]
        data["generationConfig"] = gen_config

    response = requests.post(url, json=data, headers=headers)
    try:
        output = {
            "answer": response.json()["candidates"][0]["content"]["parts"][0]["text"],
        }
    except KeyError as e:
        print(type(e), e)
        print(response.json())
    return output

def chat_completion_cohere(model, messages, temperature, max_tokens, **kwargs):
    import cohere

    co = cohere.Client(os.environ["COHERE_API_KEY"])
    assert len(messages) > 0

    template_map = {"system": "SYSTEM",
                    "assistant": "CHATBOT",
                    "user": "USER"}

    assert messages[-1]["role"] == "user"
    prompt = messages[-1]["content"]

    if len(messages) > 1:
        history = []
        for message in messages[:-1]:
            history.append({"role": template_map[message["role"]], "message": message["content"]})
    else:
        history = None

    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            response = co.chat(
                message=prompt,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                chat_history=history,
            )
            output = {
                "answer": response.text
            }
            break
        except cohere.core.api_error.ApiError as e:
            raise
        except Exception as e:
            break
    return output

def chat_completion_meta(model, messages, temperature, max_tokens, api_dict, **kwargs):
    assert api_dict

    texts = [{"role": m["role"],
              "text": m["content"]} for m in messages]

    output = ""
    for _ in range(API_MAX_RETRY):
        try:
            res = requests.post(
                f"{api_dict['api_base']}/chat_stream_completions?access_token={api_dict['api_key']}",
                stream=True,
                headers={"Content-Type": "application/json"},
                json={
                    "model": model,
                    "chunks_delimited": True,
                    "messages": texts,
                    "options": {
                        "max_tokens": max_tokens,
                        "generation_algorithm": "top_p",
                        "top_p": 1,
                        "temperature": temperature,
                    },
                },
                timeout=30,
            )
            if res.status_code == 200:
                for line in res.iter_lines():
                    if line:
                        part = json.loads(line.decode("utf-8"))
                        if "text" in part:
                            output += part["text"]
                break
            else:
                print(f"**API REQUEST ERROR** Code: {res.status_code}")
                time.sleep(API_RETRY_SLEEP)
        except Exception as e:
            print("**API REQUEST ERROR** Reason: Unknown.")
            time.sleep(API_RETRY_SLEEP)
            continue

    return {
        "answer": output
    }

def batch_submit_sglang(
    executor,
    tokenizer,
    temperature,
    max_tokens,
    all_context,
    max_context_length=None,
    end_think_token=None,
):
    print(f"DEBUG: sglang_completion_qwq: max_context_length: {max_context_length}")
    sampling_params = {
        "temperature": temperature,
        "skip_special_tokens": False,
        "max_new_tokens": max_tokens - 1,
        "no_stop_trim": True,
    }

    batch_prompt_token_ids = []
    batch_uids = []
    uid_to_prompt = {}
    uid_to_response = {}
    for context in all_context:
        prompt_token_ids = tokenizer.apply_chat_template(
            context['turns'],
            add_generation_prompt=True,
            tokenize=True,
        )
        if max_context_length and (len(prompt_token_ids) + max_tokens) > max_context_length:
            print(f"DEBUG: sglang_completion_qwq: context length ({len(prompt_token_ids) + max_tokens}) > max_context_length ({max_context_length}), skip this context")
            continue
        batch_prompt_token_ids.append(prompt_token_ids)
        batch_uids.append(context['uid'])
        uid_to_prompt[context['uid']] = context['turns']

    err_msg = f"ERROR: len(batch_prompt_token_ids): {len(batch_prompt_token_ids)} != len(batch_uids): {len(batch_uids)}"
    assert len(batch_prompt_token_ids) == len(batch_uids), err_msg

    _ = executor.submit(
        prompt_token_ids=batch_prompt_token_ids,
        sampling_params=[sampling_params] * len(batch_uids),
        keys=batch_uids,
    )

    for request in tqdm(executor.as_completed(), total=len(batch_uids)):
        uid = request.key()
        result = request.result()
        raw_response = tokenizer.decode(
            result['output_ids'],
            skip_special_tokens=True,
        )
        if end_think_token:
            thought, _, ans = raw_response.partition(end_think_token)
            if ans == "":
                # No end-of-think marker found: keep the full response as the answer
                uid_to_response[uid] = {"thought": thought, "answer": raw_response}
            else:
                uid_to_response[uid] = {"thought": thought, "answer": ans}
        else:
            uid_to_response[uid] = {"answer": raw_response}

    # assert len(uid_to_response) == len(all_context), f"ERROR: len output ({len(uid_to_response)}) != len input ({len(all_context)})"
    return uid_to_response

def _infer_cuda_tp_world_size():
    cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None)
    if cuda_devices is None:
        tp_world_size = 8
    else:
        tp_world_size = len(cuda_devices.split(","))
    return tp_world_size

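# Example (hypothetical environment): with CUDA_VISIBLE_DEVICES="0,1,2,3" this
# returns 4; when the variable is unset it falls back to a tensor-parallel size of 8.
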
def download_model(model: str, max_workers: int = 64):
    import subprocess

    env = os.environ.copy()
    env["HF_HUB_ENABLE_HF_TRANSFER"] = "0"
    cmd = [
        "huggingface-cli",
        "download",
        f"--max-workers={max_workers}",
        model,
    ]
    try:
        subprocess.run(cmd, env=env, check=True)
        print(f"Successfully downloaded model '{model}' with {max_workers} max workers.")
    except subprocess.CalledProcessError as e:
        print(f"Error occurred while downloading the model: {e}")

def sglang_completion(
    model,
    batch_context,
    answer_file,
    temperature,
    max_tokens=32768,
    end_think_token=None,
    **kwargs,
):
    from transformers import AutoTokenizer
    from utils.sglang_server import SGLangServerExecutor
    import re

    tokenizer = AutoTokenizer.from_pretrained(model)

    uids = [context['uid'] for context in batch_context]
    prompts = [context['instruction'] for context in batch_context]
    code_envs = [context['environment'] for context in batch_context]
    processed_context = [
        {
            "uid": uids[i],
            "turns": [{
                "content": prompts[i],
                "role": "user",
            }]
        }
        for i in tqdm(range(len(uids)))
    ]

    download_model(model=model)
    server_args = {
        "model_path": model,
        "dtype": "auto",
        "tp_size": _infer_cuda_tp_world_size(),
        "mem_fraction_static": 0.7,
        "max_prefill_tokens": max_tokens,
        "max_workers": 256,
        "server_port": 30000,
    }
    executor = SGLangServerExecutor(
        **server_args,
    )
    print(f"DEBUG: sglang_completion: model: {model}")

    uid_to_response = batch_submit_sglang(
        executor=executor,
        tokenizer=tokenizer,
        temperature=temperature,
        max_tokens=max_tokens,
        all_context=processed_context,
        end_think_token=end_think_token,
    )
    executor.join()
    print("DEBUG: sglang_completion: done, sleep 10 seconds...")
    time.sleep(10)

    num_null = sum(
        [uid_to_response[uid]['answer'] is None for uid in uids if uid in uid_to_response]
    )
    print(f"Number of null responses: {num_null}")

    records = []
    for i, context in enumerate(processed_context):
        uid = context['uid']
        if uid not in uid_to_response:
            continue
        answer_data = uid_to_response[uid]
        record = {
            "uid": uid,
            "ans_id": shortuuid.uuid(),
            "model": kwargs.get("model_display_name", model),
            "messages": context['turns'] + [
                {"content": answer_data, "role": "assistant"}
            ],
            "environment": code_envs[i],
            "tstamp": time.time(),
            "metadata": {},
        }
        records.append(record)

    with open(answer_file, 'w', encoding='utf-8') as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=True) + '\n')

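# Each line of `answer_file` is one JSON record with the fields assembled above
# ("uid", "ans_id", "model", "messages", "environment", "tstamp", "metadata").
# Presumably this is the generation.jsonl layout that load_model_answers() reads
# back and re-keys by "uid".
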
def chat_completion_aws_bedrock_claude(messages, api_dict=None, aws_region="us-west-2", **kwargs):
    """
    Call the AWS Bedrock API for a chat completion with an Anthropic Claude model.

    Args:
        messages (list): Chat messages, optionally starting with a system message
        api_dict (dict, optional): API configuration dictionary with AWS credentials
        aws_region (str, optional): AWS region, defaults to "us-west-2"
        **kwargs: Must include "temperature", "max_tokens", and "model_id"

    Returns:
        dict: {"answer": generated text} on success, otherwise API_ERROR_OUTPUT
    """
    # Configure AWS client if api_dict provided
    if api_dict is not None:
        bedrock_rt_client = boto3.client(
            service_name='bedrock-runtime',
            region_name=aws_region,
            aws_access_key_id=api_dict.get('aws_access_key_id'),
            aws_secret_access_key=api_dict.get('aws_secret_access_key')
        )
    else:
        bedrock_rt_client = boto3.client(
            service_name='bedrock-runtime',
            region_name=aws_region,
        )

    output = API_ERROR_OUTPUT

    # Get generation settings from kwargs
    temperature = kwargs["temperature"]
    max_tokens = kwargs["max_tokens"]
    model = kwargs["model_id"]

    sys_msg = ""
    if messages[0]["role"] == "system":
        sys_msg = messages[0]["content"]
        messages = messages[1:]

    # Retry logic for API calls
    for _ in range(API_MAX_RETRY):
        try:
            # Prepare request body
            prompt_json = {
                "system": sys_msg,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": temperature,
                "anthropic_version": "bedrock-2023-05-31",
                "stop_sequences": ["Human"]
            }
            # Call Bedrock API
            response = bedrock_rt_client.invoke_model(
                body=json.dumps(prompt_json),
                modelId=model,
                accept='application/json',
                contentType='application/json'
            )
            # Parse response
            response_body = json.loads(response.get('body').read())
            output = {"answer": response_body.get("content")[0].get("text")}
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_aws_bedrock_mistral(messages, api_dict=None, aws_region="us-west-2", **kwargs):
    """
    Call the AWS Bedrock API for a chat completion with a Mistral model.

    Args:
        messages (list): Chat messages to flatten into a single instruction prompt
        api_dict (dict, optional): API configuration dictionary with AWS credentials
        aws_region (str, optional): AWS region, defaults to "us-west-2"
        **kwargs: Must include "temperature", "max_tokens", and "model_id"

    Returns:
        dict: {"answer": generated text} on success, otherwise API_ERROR_OUTPUT
    """
    # Configure AWS client if api_dict provided
    if api_dict is not None:
        bedrock_rt_client = boto3.client(
            service_name='bedrock-runtime',
            region_name=aws_region,
            aws_access_key_id=api_dict.get('aws_access_key_id'),
            aws_secret_access_key=api_dict.get('aws_secret_access_key')
        )
    else:
        bedrock_rt_client = boto3.client(
            service_name='bedrock-runtime',
            region_name=aws_region,
        )

    output = API_ERROR_OUTPUT

    # Get generation settings from kwargs
    temperature = kwargs["temperature"]
    max_tokens = kwargs["max_tokens"]
    model = kwargs["model_id"]

    # Retry logic for API calls
    for _ in range(API_MAX_RETRY):
        try:
            # Format the prompt in Mistral's instruction format
            prompt = "\n".join([content for message in messages for content in message["content"]])
            formatted_prompt = f"<s>[INST] {prompt.strip()} [/INST]"
            body = {
                "prompt": formatted_prompt,
                "max_tokens": max_tokens,
                "stop": ["Human:"],
                "temperature": temperature,
            }
            # Call Bedrock API
            response = bedrock_rt_client.invoke_model(
                body=json.dumps(body),
                modelId=model,
                accept='application/json',
                contentType='application/json'
            )
            # Parse response
            response_body = json.loads(response.get('body').read())
            if "pixtral-large" in model:  # e.g. us.mistral.pixtral-large-2502-v1:0
                output = {"answer": response_body.get("choices")[0].get("message").get("content")}
            else:
                output = {"answer": response_body.get("outputs")[0].get("text")}
            break
        except Exception as e:
            time.sleep(API_RETRY_SLEEP)
    return output

def chat_completion_mistral_streaming(model, messages, temperature, max_tokens, api_dict=None, **kwargs):
    """Streaming version of Mistral completion (served through an OpenAI-compatible endpoint)."""
    import openai

    if api_dict:
        client = openai.OpenAI(
            base_url=api_dict["api_base"],
            api_key=api_dict["api_key"],
        )
    else:
        client = openai.OpenAI()

    try:
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content
    except Exception as e:
        print(f"Error in Mistral streaming completion: {e}")
        yield f"Error: {str(e)}"

def chat_completion_gemini_streaming(model, messages, **kwargs):
    """Streaming version of Gemini completion."""
    import google.generativeai as genai

    try:
        # Configure the API
        genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
        # Create model
        model_genai = genai.GenerativeModel(model)
        # Convert messages to Gemini format
        conversation = model_genai.start_chat(history=[])
        # Get the last user message
        last_user_message = None
        for msg in messages:
            if msg["role"] == "user":
                last_user_message = msg["content"]
        if not last_user_message:
            yield "Error: No user message found"
            return
        # Stream the response
        response = conversation.send_message(last_user_message, stream=True)
        for chunk in response:
            if chunk.text:
                yield chunk.text
    except Exception as e:
        print(f"Error in Gemini streaming completion: {e}")
        yield f"Error: {str(e)}"