| """ | |
| Testing for _assemble_complete_response_from_streaming_chunks | |
| - Test 1 - ModelResponse with 1 list of streaming chunks. Assert chunks are added to the streaming_chunks, after final chunk sent assert complete_streaming_response is not None | |
| - Test 2 - TextCompletionResponse with 1 list of streaming chunks. Assert chunks are added to the streaming_chunks, after final chunk sent assert complete_streaming_response is not None | |
| - Test 3 - Have multiple lists of streaming chunks, Assert that chunks are added to the correct list and that complete_streaming_response is None. After final chunk sent assert complete_streaming_response is not None | |
| - Test 4 - build a complete response when 1 chunk is poorly formatted | |
| """ | |
import json
import os
import sys
from datetime import datetime
from unittest.mock import AsyncMock

sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path

import httpx
import pytest
from respx import MockRouter

import litellm
from litellm import (
    Choices,
    Message,
    ModelResponse,
    ModelResponseStream,
    TextCompletionResponse,
    TextChoices,
)
from litellm.litellm_core_utils.logging_utils import (
    _assemble_complete_response_from_streaming_chunks,
)
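
# Behavior exercised by the tests below: _assemble_complete_response_from_streaming_chunks
# appends each chunk to the caller-supplied `streaming_chunks` list and returns None
# until it receives a chunk carrying a finish_reason, at which point it returns the
# assembled ModelResponse / TextCompletionResponse. A malformed chunk also yields
# None (see Test 4).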


@pytest.mark.parametrize("is_async", [True, False])
def test_assemble_complete_response_from_streaming_chunks_1(is_async):
| """ | |
| Test 1 - ModelResponse with 1 list of streaming chunks. Assert chunks are added to the streaming_chunks, after final chunk sent assert complete_streaming_response is not None | |
| """ | |
    request_kwargs = {
        "model": "test_model",
        "messages": [{"role": "user", "content": "Hello, world!"}],
    }

    list_streaming_chunks = []

    chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                delta=litellm.utils.Delta(
                    content="hello in response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    chunk = ModelResponseStream(**chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks,
        is_async=is_async,
    )

    # this is the 1st chunk - complete_streaming_response should be None
    print("list_streaming_chunks", list_streaming_chunks)
    print("complete_streaming_response", complete_streaming_response)
    assert complete_streaming_response is None
    assert len(list_streaming_chunks) == 1
    assert list_streaming_chunks[0] == chunk

    # Add final chunk
    chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                finish_reason="stop",
                delta=litellm.utils.Delta(
                    content="end of response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    chunk = ModelResponseStream(**chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks,
        is_async=is_async,
    )

    print("list_streaming_chunks", list_streaming_chunks)
    print("complete_streaming_response", complete_streaming_response)

    # this is the 2nd chunk - complete_streaming_response should not be None
    assert complete_streaming_response is not None
    assert len(list_streaming_chunks) == 2

    assert isinstance(complete_streaming_response, ModelResponse)
    assert isinstance(complete_streaming_response.choices[0], Choices)


@pytest.mark.parametrize("is_async", [True, False])
def test_assemble_complete_response_from_streaming_chunks_2(is_async):
| """ | |
| Test 2 - TextCompletionResponse with 1 list of streaming chunks. Assert chunks are added to the streaming_chunks, after final chunk sent assert complete_streaming_response is not None | |
| """ | |
    from litellm.utils import TextCompletionStreamWrapper

    _text_completion_stream_wrapper = TextCompletionStreamWrapper(
        completion_stream=None, model="test_model"
    )

    request_kwargs = {
        "model": "test_model",
        "messages": [{"role": "user", "content": "Hello, world!"}],
    }

    list_streaming_chunks = []

    chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                delta=litellm.utils.Delta(
                    content="hello in response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    chunk = ModelResponseStream(**chunk)
    chunk = _text_completion_stream_wrapper.convert_to_text_completion_object(chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks,
        is_async=is_async,
    )

    # this is the 1st chunk - complete_streaming_response should be None
    print("list_streaming_chunks", list_streaming_chunks)
    print("complete_streaming_response", complete_streaming_response)
    assert complete_streaming_response is None
    assert len(list_streaming_chunks) == 1
    assert list_streaming_chunks[0] == chunk

    # Add final chunk
    chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                finish_reason="stop",
                delta=litellm.utils.Delta(
                    content="end of response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    chunk = ModelResponseStream(**chunk)
    chunk = _text_completion_stream_wrapper.convert_to_text_completion_object(chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks,
        is_async=is_async,
    )

    print("list_streaming_chunks", list_streaming_chunks)
    print("complete_streaming_response", complete_streaming_response)

    # this is the 2nd chunk - complete_streaming_response should not be None
    assert complete_streaming_response is not None
    assert len(list_streaming_chunks) == 2

    assert isinstance(complete_streaming_response, TextCompletionResponse)
    assert isinstance(complete_streaming_response.choices[0], TextChoices)


@pytest.mark.parametrize("is_async", [True, False])
def test_assemble_complete_response_from_streaming_chunks_3(is_async):
    """
    Test 3 - Have multiple lists of streaming chunks. Assert that chunks are added to the correct list and that complete_streaming_response is None; after the final chunk is sent, assert complete_streaming_response is not None
    """
    request_kwargs = {
        "model": "test_model",
        "messages": [{"role": "user", "content": "Hello, world!"}],
    }

    list_streaming_chunks_1 = []
    list_streaming_chunks_2 = []

    chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                delta=litellm.utils.Delta(
                    content="hello in response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    chunk = ModelResponseStream(**chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks_1,
        is_async=is_async,
    )

    # this is the 1st chunk - complete_streaming_response should be None
    print("list_streaming_chunks_1", list_streaming_chunks_1)
    print("complete_streaming_response", complete_streaming_response)
    assert complete_streaming_response is None
    assert len(list_streaming_chunks_1) == 1
    assert list_streaming_chunks_1[0] == chunk
    assert len(list_streaming_chunks_2) == 0

    # now add a chunk to the 2nd list
    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks_2,
        is_async=is_async,
    )

    print("list_streaming_chunks_2", list_streaming_chunks_2)
    print("complete_streaming_response", complete_streaming_response)
    assert complete_streaming_response is None
    assert len(list_streaming_chunks_2) == 1
    assert list_streaming_chunks_2[0] == chunk
    assert len(list_streaming_chunks_1) == 1

    # now send the final chunk to the 1st list
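    # A minimal completion sketch, assuming the same final-chunk flow as Test 1:
    # per the module docstring for Test 3, after the final chunk is sent,
    # complete_streaming_response should not be None.
    final_chunk = {
        "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi",
        "choices": [
            litellm.utils.StreamingChoices(
                finish_reason="stop",
                delta=litellm.utils.Delta(
                    content="end of response",
                    function_call=None,
                    role=None,
                    tool_calls=None,
                ),
                index=0,
                logprobs=None,
            )
        ],
        "created": 1721353246,
        "model": "gpt-3.5-turbo",
        "object": "chat.completion.chunk",
        "system_fingerprint": None,
        "usage": None,
    }
    final_chunk = ModelResponseStream(**final_chunk)

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=final_chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks_1,
        is_async=is_async,
    )

    # the final chunk completes the 1st list; the 2nd list stays untouched
    assert complete_streaming_response is not None
    assert len(list_streaming_chunks_1) == 2
    assert len(list_streaming_chunks_2) == 1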


@pytest.mark.parametrize("is_async", [True, False])
def test_assemble_complete_response_from_streaming_chunks_4(is_async):
| """ | |
| Test 4 - build a complete response when 1 chunk is poorly formatted | |
| - Assert complete_streaming_response is None | |
| - Assert list_streaming_chunks is not empty | |
| """ | |
| request_kwargs = { | |
| "model": "test_model", | |
| "messages": [{"role": "user", "content": "Hello, world!"}], | |
| } | |
| list_streaming_chunks = [] | |
| chunk = { | |
| "id": "chatcmpl-9mWtyDnikZZoB75DyfUzWUxiiE2Pi", | |
| "choices": [ | |
| litellm.utils.StreamingChoices( | |
| finish_reason="stop", | |
| delta=litellm.utils.Delta( | |
| content="end of response", | |
| function_call=None, | |
| role=None, | |
| tool_calls=None, | |
| ), | |
| index=0, | |
| logprobs=None, | |
| ) | |
| ], | |
| "created": 1721353246, | |
| "model": "gpt-3.5-turbo", | |
| "object": "chat.completion.chunk", | |
| "system_fingerprint": None, | |
| "usage": None, | |
| } | |
| chunk = ModelResponseStream(**chunk) | |
    # make the chunk malformed by deleting its `object` attribute
    del chunk.object

    complete_streaming_response = _assemble_complete_response_from_streaming_chunks(
        result=chunk,
        start_time=datetime.now(),
        end_time=datetime.now(),
        request_kwargs=request_kwargs,
        streaming_chunks=list_streaming_chunks,
        is_async=is_async,
    )

    print("complete_streaming_response", complete_streaming_response)
    assert complete_streaming_response is None

    print("list_streaming_chunks", list_streaming_chunks)
    assert len(list_streaming_chunks) == 1
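
# Illustrative local run (the file name is an assumption; adjust to wherever
# this module lives in your checkout):
#   pytest test_assemble_streaming_responses.py -v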