Spaces:
Paused
Paused
| """ | |
| 1. Default permissions for members in a team - allowed to call /key/info and /key/health | |
| - Create a team, create a member in a team (role = "user") | |
| Invalid Permissions: | |
| - User tries creating a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries editing a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries deleting a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries regenerating a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| Valid Permissions: | |
| - User tries calling /key/info with team_id, expect to get valid response | |
| 2. Permissions - members allowed to edit, delete keys but not allowed to create keys | |
| - Create a team with member_permissions = ["/key/update", "/key/delete", "/key/info"] | |
| - Create a member in the team with role = "user" | |
| Valid Permissions: | |
| - User tries editing a key with team_id = team_id -> expect to pass. Valid Permissions | |
| - User tries deleting a key with team_id = team_id -> expect to pass. Valid Permissions | |
| - User tries creating a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries regenerating a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries calling /key/info with team_id, expect to get valid response | |
| 3. Permissions - members allowed to create keys but not allowed to edit, delete keys | |
| - Create a team with member_permissions = ["/key/generate"] | |
| - Create a member in the team with role = "user" | |
| Valid Permissions: | |
| - User tries creating a key with team_id = team_id -> expect to pass. Valid Permissions | |
| Invalid Permissions: | |
| - User tries editing a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries deleting a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| - User tries regenerating a key with team_id = team_id -> expect to fail. Invalid Permissions | |
| """ | |
| import pytest | |
| import asyncio | |
| import aiohttp, openai | |
| import uuid | |
| import json | |
| from litellm.proxy._types import ProxyErrorTypes | |
| from typing import Optional | |
# API key sent as the Bearer token for the privileged setup calls in these tests
# (creating teams, users and keys).
# NOTE(review): presumably must match the master key the proxy under test was
# started with — confirm against the proxy config.
LITELLM_MASTER_KEY = "sk-1234"
async def create_team(session, key, member_permissions=None):
    """
    Create a team by POSTing to the proxy's /team/new endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post`` that
            can be used with ``async with``.
        key: API key sent as the Bearer token.
        member_permissions: value sent as ``team_member_permissions``
            (serialized as null when left as None).

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: with the raw response text when the status is not 200.
    """
    endpoint = "http://0.0.0.0:4000/team/new"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    payload = dict(team_member_permissions=member_permissions)

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        # The body text is always read first so it is available for the error.
        body_text = await resp.text()
        if resp.status != 200:
            raise Exception(body_text)
        return await resp.json()
async def create_user(session, key, user_id, team_id=None):
    """
    Create a user by POSTing to the proxy's /user/new endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        key: API key sent as the Bearer token.
        user_id: identifier sent as ``user_id``.
        team_id: when truthy, also sent as ``team_id``; omitted otherwise.

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: with the raw response text when the status is not 200.
    """
    endpoint = "http://0.0.0.0:4000/user/new"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    # Only include team_id in the payload when the caller supplied one.
    payload = {"user_id": user_id, **({"team_id": team_id} if team_id else {})}

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status != 200:
            raise Exception(body_text)
        return await resp.json()
async def add_team_member(session, key, team_id, user_id, role="user"):
    """
    Add a member to a team by POSTing to the proxy's /team/member_add endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        key: API key sent as the Bearer token.
        team_id: team the member is added to.
        user_id: user id of the new member.
        role: role assigned to the member (defaults to "user").

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: with the raw response text when the status is not 200.
    """
    endpoint = "http://0.0.0.0:4000/team/member_add"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    member = {"role": role, "user_id": user_id}
    payload = {"team_id": team_id, "member": member}
    print("Adding team member with data: ", payload)

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status != 200:
            raise Exception(body_text)
        return await resp.json()
async def generate_key(session, key, team_id=None, user_id=None):
    """
    Request a new key by POSTing to the proxy's /key/generate endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        key: API key sent as the Bearer token.
        team_id: when truthy, sent as ``team_id``.
        user_id: when truthy, sent as ``user_id``.

    Returns:
        The parsed JSON body on HTTP 200; otherwise a dict of the form
        ``{"status": <http status>, "error": <raw response text>}`` so callers
        can assert on failures without catching exceptions.
    """
    endpoint = "http://0.0.0.0:4000/key/generate"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    # Drop the optional fields the caller did not supply.
    optional_fields = {"team_id": team_id, "user_id": user_id}
    payload = {name: value for name, value in optional_fields.items() if value}

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status == 200:
            return await resp.json()
        return {"status": resp.status, "error": body_text}
async def key_info(session, key, key_id):
    """
    Fetch information about a key via GET /key/info.

    The key being looked up is handed to the HTTP client through ``params``
    rather than interpolated into the URL, so characters that are special in
    a query string (``&``, ``#``, ``+``, spaces, ...) are encoded correctly.

    Args:
        session: HTTP client session exposing an aiohttp-style ``get`` that
            accepts ``headers`` and ``params``.
        key: API key sent as the Bearer token.
        key_id: the key to look up, sent as the ``key`` query parameter.

    Returns:
        The parsed JSON body on HTTP 200; otherwise a dict of the form
        ``{"status": <http status>, "error": <raw response text>}``.
    """
    url = "http://0.0.0.0:4000/key/info"
    headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    async with session.get(url, headers=headers, params={"key": key_id}) as response:
        status = response.status
        response_text = await response.text()
        if status != 200:
            return {"status": status, "error": response_text}
        return await response.json()
async def update_key(
    session: aiohttp.ClientSession,
    key: str,
    key_id: str,
    team_id: Optional[str] = None,
):
    """
    Update a key by POSTing to the proxy's /key/update endpoint.

    The request always sets ``metadata`` to ``{"updated": True}`` on the
    target key.

    Args:
        session: HTTP client session used to send the request.
        key: key to use for authentication (sent as the Bearer token).
        key_id: key to update.
        team_id: when truthy, also sent as ``team_id``.

    Returns:
        The parsed JSON body on HTTP 200; otherwise a dict of the form
        ``{"status": <http status>, "error": <raw response text>}``.
    """
    endpoint = "http://0.0.0.0:4000/key/update"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    payload = {"key": key_id, "metadata": {"updated": True}}
    if team_id:
        payload.update(team_id=team_id)

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status == 200:
            return await resp.json()
        return {"status": resp.status, "error": body_text}
async def delete_key(session, key, key_id):
    """
    Delete a key by POSTing to the proxy's /key/delete endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        key: API key sent as the Bearer token.
        key_id: the key to delete; sent as the single element of ``keys``.

    Returns:
        The parsed JSON body on HTTP 200; otherwise a dict of the form
        ``{"status": <http status>, "error": <raw response text>}``.
    """
    endpoint = "http://0.0.0.0:4000/key/delete"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    payload = dict(keys=[key_id])

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status == 200:
            return await resp.json()
        return {"status": resp.status, "error": body_text}
async def regenerate_key(session, key, key_id, team_id=None):
    """
    Regenerate a key by POSTing to the proxy's /key/regenerate endpoint.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        key: API key sent as the Bearer token.
        key_id: the key to regenerate, sent as ``key``.
        team_id: when truthy, also sent as ``team_id``.

    Returns:
        The parsed JSON body on HTTP 200; otherwise a dict of the form
        ``{"status": <http status>, "error": <raw response text>}``.
    """
    endpoint = "http://0.0.0.0:4000/key/regenerate"
    request_headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    payload = {"key": key_id, **({"team_id": team_id} if team_id else {})}

    async with session.post(endpoint, headers=request_headers, json=payload) as resp:
        body_text = await resp.text()
        if resp.status == 200:
            return await resp.json()
        return {"status": resp.status, "error": body_text}
def _assert_team_member_permission_error(result, failure_message):
    """Assert that *result* is a 401 whose error type is the team-member permission error."""
    assert "status" in result and result["status"] == 401, failure_message
    error_data = json.loads(result["error"])
    print("error response =", json.dumps(error_data, indent=4))
    assert (
        error_data["error"]["type"]
        == ProxyErrorTypes.team_member_permission_error.value
    ), "Error should be a team member permission error"


@pytest.mark.asyncio
async def test_default_member_permissions():
    """
    Test default permissions for members in a team.

    A team is created without explicit member permissions. A regular member
    must then be rejected (401, team member permission error) when creating,
    editing, deleting or regenerating a team key, and must be allowed to
    call /key/info on it.
    """
    async with aiohttp.ClientSession() as session:
        master_key = LITELLM_MASTER_KEY

        # Create a team
        team_data = await create_team(session=session, key=master_key)
        team_id = team_data["team_id"]

        # Create a team key (as admin) for the member to act on
        team_key_data = await generate_key(
            session=session,
            key=master_key,
            team_id=team_id,
        )
        team_key = team_key_data["key"]

        # Create a user that belongs to the team
        user_data = await create_user(
            session=session,
            key=master_key,
            user_id=f"user_{uuid.uuid4().hex[:8]}",
            team_id=team_id,
        )
        user_id = user_data["user_id"]
        print("New user data: ", user_data)

        # Create a user key
        user_key_data = await generate_key(
            session=session,
            key=master_key,
            user_id=user_id,
        )
        print("new user key: ", user_key_data)
        user_key = user_key_data["key"]

        # Test invalid permissions
        # User tries creating a key with team_id
        print("Regular team member trying to create a key with team_id. Expecting error.")
        create_result = await generate_key(
            session=session,
            key=user_key,
            team_id=team_id,
        )
        print("result: ", create_result)
        _assert_team_member_permission_error(
            create_result, "User should not be able to create keys for team"
        )

        # User tries editing a key with team_id
        # NOTE(review): this sends a placeholder team id rather than team_id,
        # while the module docstring describes "team_id = team_id" — confirm
        # which scenario is intended.
        print("Regular team member trying to edit a key with team_id. Expecting error.")
        update_result = await update_key(
            session=session,
            key=user_key,
            key_id=team_key,
            team_id="ATTACKER_TEAM_ID",
        )
        _assert_team_member_permission_error(
            update_result, "User should not be able to update keys for team"
        )

        # User tries deleting a key with team_id
        print("Regular team member trying to delete a key with team_id. Expecting error.")
        delete_result = await delete_key(
            session=session,
            key=user_key,
            key_id=team_key,
        )
        _assert_team_member_permission_error(
            delete_result, "User should not be able to delete keys for team"
        )

        # User tries regenerating a key with team_id
        print("Regular team member trying to regenerate a key with team_id. Expecting error.")
        regenerate_result = await regenerate_key(
            session=session,
            key=user_key,
            key_id=team_key,
        )
        _assert_team_member_permission_error(
            regenerate_result, "User should not be able to regenerate keys for team"
        )

        # Test valid permissions
        # User tries calling /key/info with team_id
        print("Regular team member trying to get key info with team_id. Expecting success.")
        info_result = await key_info(
            session=session,
            key=user_key,
            key_id=team_key,
        )
        print("info result =", info_result)
        # The helpers only add a "status" field on non-200 responses.
        assert "status" not in info_result, "Team member should be able to get key info"
@pytest.mark.asyncio
async def test_edit_delete_permissions():
    """
    Test permissions - members allowed to edit, delete keys but not allowed to create keys.

    The team is created with member permissions
    ["/key/update", "/key/delete", "/key/info"]. A regular member must be able
    to update and delete a team key, and must be rejected when creating or
    regenerating one.
    """
    async with aiohttp.ClientSession() as session:
        master_key = LITELLM_MASTER_KEY

        # Create a team with specific member permissions
        team_data = await create_team(
            session=session,
            key=master_key,
            member_permissions=["/key/update", "/key/delete", "/key/info"],
        )
        team_id = team_data["team_id"]

        # Create a user in team=team_id
        user_data = await create_user(
            session=session,
            key=master_key,
            user_id=f"user_{uuid.uuid4().hex[:8]}",
            team_id=team_id,
        )
        user_id = user_data["user_id"]

        # Generate an admin key for the team; this is the key the member acts on
        admin_key_data = await generate_key(
            session=session,
            key=master_key,
            team_id=team_id,
        )
        key_id = admin_key_data["key"]

        # Create a user key
        user_key_data = await generate_key(
            session=session,
            key=master_key,
            user_id=user_id,
        )
        user_key = user_key_data["key"]

        # Test valid permissions
        # User tries editing a key with team_id
        update_result = await update_key(
            session=session,
            key=user_key,
            key_id=key_id,
            team_id=team_id,
        )
        assert "status" not in update_result, "User should be able to update keys for team"

        # Test invalid permissions
        # User tries creating a key with team_id
        create_result = await generate_key(
            session=session,
            key=user_key,
            team_id=team_id,
        )
        assert "status" in create_result and create_result["status"] != 200, "User should not be able to create keys for team"

        # User tries regenerating a key with team_id.
        # This must run while the key still exists; otherwise the request
        # could fail because the key is gone rather than because the
        # permission is missing.
        regenerate_result = await regenerate_key(
            session=session,
            key=user_key,
            key_id=key_id,
            team_id=team_id,
        )
        assert "status" in regenerate_result and regenerate_result["status"] != 200, "User should not be able to regenerate keys for team"

        # User tries deleting a key with team_id - tested last because it
        # removes the key the other checks operate on.
        delete_result = await delete_key(
            session=session,
            key=user_key,
            key_id=key_id,
        )
        assert "status" not in delete_result, "User should be able to delete keys for team"
@pytest.mark.asyncio
async def test_create_permissions():
    """
    Test permissions - members allowed to create keys but not allowed to edit, delete keys.

    The team is created with member permissions ["/key/generate"]. A regular
    member must be able to create a team key, and must be rejected when
    updating, deleting or regenerating one.
    """
    async with aiohttp.ClientSession() as session:
        master_key = LITELLM_MASTER_KEY

        # Create a team with specific member permissions
        team_data = await create_team(
            session=session,
            key=master_key,
            member_permissions=["/key/generate"],
        )
        team_id = team_data["team_id"]

        # Add a user to the team via /team/member_add
        user_id = f"user_{uuid.uuid4().hex[:8]}"
        await add_team_member(
            session=session,
            key=master_key,
            team_id=team_id,
            user_id=user_id,
            role="user",
        )

        # Generate an admin key for the team; this is the key the member acts on
        admin_key_data = await generate_key(
            session=session,
            key=master_key,
            team_id=team_id,
        )
        key_id = admin_key_data["key"]

        # Create a user key
        user_key_data = await generate_key(
            session=session,
            key=master_key,
            user_id=user_id,
        )
        user_key = user_key_data["key"]

        # Test valid permissions
        # User tries creating a key with team_id
        create_result = await generate_key(
            session=session,
            key=user_key,
            team_id=team_id,
        )
        print("success, user created key for team=", create_result)
        assert "key" in create_result, "User should be able to create keys for team"
        assert create_result["team_id"] == team_id, "User should be able to create keys for team"
        assert "status" not in create_result, "User should be able to create keys for team"

        # Test invalid permissions
        # User tries editing a key with team_id
        update_result = await update_key(
            session=session,
            key=user_key,
            key_id=key_id,
            team_id=team_id,
        )
        assert "status" in update_result and update_result["status"] != 200, "User should not be able to update keys for team"

        # User tries deleting a key with team_id
        delete_result = await delete_key(
            session=session,
            key=user_key,
            key_id=key_id,
        )
        assert "status" in delete_result and delete_result["status"] != 200, "User should not be able to delete keys for team"

        # User tries regenerating a key with team_id
        regenerate_result = await regenerate_key(
            session=session,
            key=user_key,
            key_id=key_id,
            team_id=team_id,
        )
        assert "status" in regenerate_result and regenerate_result["status"] != 200, "User should not be able to regenerate keys for team"