arena / sandbox /sandbox_telemetry.py
terryyz
init
fe3f5b0
raw
history blame
6.91 kB
'''
Module for logging the sandbox interactions and state.
'''
from concurrent.futures import ThreadPoolExecutor
import json
import os
from typing import Any, List, Literal, Optional, TypedDict
import datetime
from chat_state import LOG_DIR
from sandbox.sandbox_state import ChatbotSandboxState
from azure.storage.blob import BlobServiceClient
from sandbox.constants import AZURE_BLOB_STORAGE_CONNECTION_STRING, AZURE_BLOB_STORAGE_CONTAINER_NAME
class SandboxLog(TypedDict):
    '''
    The schema of the sandbox log stored.

    One log entry pairs a sandbox state with the user interaction records
    captured alongside it.
    '''
    # State of the sandbox at the time the log entry was created.
    sandbox_state: ChatbotSandboxState
    # Interaction records taken from the sandbox UI value; may be None.
    user_interaction_records: Optional[List[Any]]
def upload_data_to_azure_storage(
    data: bytes,
    blob_name: str,
    write_mode: Literal['overwrite', 'append'],
    connection_string: str | None = AZURE_BLOB_STORAGE_CONNECTION_STRING,
    container_name: str = AZURE_BLOB_STORAGE_CONTAINER_NAME,
) -> None:
    '''
    Upload data to Azure Blob Storage.

    Args:
        data: Raw bytes to store.
        blob_name: Name of the blob inside the container.
        write_mode: 'overwrite' replaces the blob with ``data``; 'append'
            adds ``data`` to the blob, creating it as an append blob when it
            does not exist yet.
        connection_string: Storage account connection string.
        container_name: Name of the target container.

    Raises:
        ValueError: If ``connection_string`` is empty/None, or if
            ``write_mode`` is neither 'overwrite' nor 'append'.

    Errors raised by the Azure SDK are not caught here.
    '''
    if not connection_string:
        raise ValueError("AZURE_STORAGE_CONNECTION_STRING is not set")
    # Reject a bad mode before building any client so the caller gets an
    # error message that names the values this function really accepts.
    if write_mode not in ("overwrite", "append"):
        raise ValueError(
            f"Unsupported write_mode {write_mode!r}. Use 'overwrite' or 'append'."
        )

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    if write_mode == "overwrite":
        container_client.upload_blob(
            name=blob_name,
            data=data,
            overwrite=True
        )
        return

    # write_mode == "append"
    blob_client = container_client.get_blob_client(blob=blob_name)
    # NOTE(review): exists() followed by upload_blob() is not atomic; two
    # concurrent first writes to the same blob could both take the create
    # path. Confirm whether callers can hit this.
    if not blob_client.exists():
        blob_client.upload_blob(data, blob_type="AppendBlob")
    else:
        blob_client.append_block(data)
def get_sandbox_log_blob_name(filename: str) -> str:
    '''
    Build the blob name for a sandbox log file.

    The result is ``YEAR_MONTH_DAY/sandbox_logs/FILENAME`` where the date
    part comes from ``datetime.datetime.now()``.
    '''
    today = datetime.datetime.now()
    return f"{today:%Y_%m_%d}/sandbox_logs/{filename}"
def get_conv_log_filepath(
    date: datetime.date,
    chat_mode: Literal['battle_anony', 'battle_named', 'direct'],
    chat_session_id: str,
) -> str:
    '''
    Build the relative path of a conversation log file.

    The returned path starts at the date directory and is joined with
    ``os.path.join``. Expected directory structure (the top-level
    ``softwarearenlog/`` directory is not part of the returned path):

        softwarearenlog/
        `-- YEAR_MONTH_DAY/
            `-- conv_logs/
                |-- battle_anony/
                |   `-- CHATSESSIONID.json
                |-- battle_named/
                |   `-- CHATSESSIONID.json
                `-- direct/
                    `-- CHATSESSIONID.json
    '''
    day_dir = f"{date:%Y_%m_%d}"
    log_file = f"{chat_session_id}.json"
    return os.path.join(day_dir, 'conv_logs', chat_mode, log_file)
def get_sandbox_log_filepath(
    date: datetime.date,
    chat_mode: Literal['battle_anony', 'battle_named', 'direct'],
    chat_session_id: str,
) -> str:
    '''
    Build the relative path of a sandbox log file.

    The returned path starts at the date directory and is joined with
    ``os.path.join``:

        YEAR_MONTH_DAY/
        `-- sandbox_logs/
            `-- CHAT_MODE/
                `-- CHATSESSIONID.json

    NOTE(review): earlier documentation for this function listed the mode
    directories as battle/, side-by-side/ and direct/ with files named
    sandbox-records-SESSIONID-A-B-EDITID.json. The code uses ``chat_mode``
    and ``chat_session_id`` verbatim - confirm which layout is intended.
    '''
    day_dir = f"{date:%Y_%m_%d}"
    log_file = f"{chat_session_id}.json"
    return os.path.join(day_dir, 'sandbox_logs', chat_mode, log_file)
def get_conv_log_blob_name(filename: str) -> str:
    '''
    Build the blob name for a conversation log file.

    The result is ``YEAR_MONTH_DAY/conv_logs/FILENAME`` where the date part
    comes from ``datetime.datetime.now()``.
    '''
    today = datetime.datetime.now()
    return f"{today:%Y_%m_%d}/conv_logs/{filename}"
# Module-level thread pool (at most 20 workers) that
# save_conv_log_to_azure_storage submits uploads to when use_async is True.
_executor = ThreadPoolExecutor(max_workers=20)
def save_conv_log_to_azure_storage(
    filename: str,
    log_data: dict[str, Any],
    write_mode: Literal['overwrite', 'append'] = 'append',
    use_async: bool = True
) -> None:
    '''
    Serialize a conversation log entry and upload it to Azure Blob Storage.

    The entry is written as one JSON line (values that are not JSON
    serializable are converted with ``str``). Nothing happens when
    AZURE_BLOB_STORAGE_CONNECTION_STRING is not set.

    This is best effort: any failure is printed and never raised to the
    caller, whether the upload runs inline or on the thread pool.

    Args:
        filename: File name used to build the blob name.
        log_data: The log entry to serialize.
        write_mode: 'overwrite' or 'append', passed to the upload function.
        use_async: When True the upload is submitted to the module thread
            pool; when False it runs in the calling thread.
    '''
    if not AZURE_BLOB_STORAGE_CONNECTION_STRING:
        return

    try:
        blob_name = get_conv_log_blob_name(filename)
        payload = (json.dumps(obj=log_data, default=str) + "\n").encode()

        def _run_upload() -> None:
            # The handler lives inside the worker: an exception raised in a
            # submitted task is stored on its Future, which nobody inspects,
            # so an outer try/except would never see async failures.
            try:
                upload_data_to_azure_storage(payload, blob_name, write_mode)
            except Exception as e:
                print(f"Error uploading conv log to Azure Blob Storage: {e}")

        if use_async:
            _executor.submit(_run_upload)
        else:
            _run_upload()
    except Exception as e:
        # Covers serialization errors and failures to schedule the task.
        print(f"Error uploading conv log to Azure Blob Storage: {e}")
def get_sandbox_log_filename(sandbox_state: ChatbotSandboxState) -> str:
    '''
    Build the file name of the sandbox log for the given state.

    The name has the form
    ``sandbox-logs-CONVID-ENABLEDROUND-SANDBOXRUNROUND.json``.
    '''
    conv_id = sandbox_state['conv_id']  # chat conv id
    chat_round = sandbox_state['enabled_round']  # current chat round
    run_round = sandbox_state['sandbox_run_round']  # current sandbox round
    return f"sandbox-logs-{conv_id}-{chat_round}-{run_round}.json"
def upsert_sandbox_log(filename: str, data: str) -> None:
    '''
    Write a sandbox log to ``LOG_DIR/YEAR_MONTH_DAY/sandbox_logs/FILENAME``.

    The directory is created when missing and an existing file with the
    same name is overwritten.

    Args:
        filename: Name of the log file.
        data: Text content to write.
    '''
    filepath = os.path.join(
        LOG_DIR,
        datetime.datetime.now().strftime('%Y_%m_%d'),  # current date as 2025_02_02
        'sandbox_logs',
        filename
    )
    # create directory if not exists
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    # Pin UTF-8: the log text is produced with ensure_ascii=False and may
    # contain non-ASCII characters that the platform's default encoding
    # cannot represent.
    with open(filepath, "w", encoding="utf-8") as fout:
        fout.write(data)
def create_sandbox_log(sandbox_state: ChatbotSandboxState, user_interaction_records: list[Any] | None) -> SandboxLog:
    '''
    Bundle a sandbox state and its interaction records into one log entry.
    '''
    return SandboxLog(
        sandbox_state=sandbox_state,
        user_interaction_records=user_interaction_records,
    )
def log_sandbox_telemetry_gradio_fn(
    sandbox_state: ChatbotSandboxState,
    sandbox_ui_value: tuple[str, bool, list[Any]] | None
) -> None:
    '''
    Write the sandbox state and its interaction records to a local log file.

    Returns without writing anything when ``sandbox_state`` is None or when
    its ``sandbox_id`` is None.
    '''
    if sandbox_state is None:
        return

    sandbox_id = sandbox_state['sandbox_id']
    # The interaction records are the third element of the UI value.
    if sandbox_ui_value:
        interaction_records = sandbox_ui_value[2]
    else:
        interaction_records = None
    if sandbox_id is None:
        return

    serialized = json.dumps(
        create_sandbox_log(sandbox_state, interaction_records),
        indent=2,
        default=str,
        ensure_ascii=False
    )
    upsert_sandbox_log(
        filename=get_sandbox_log_filename(sandbox_state),
        data=serialized
    )
    # NOTE: uploading the sandbox log to Azure Blob Storage (blob name from
    # get_sandbox_log_blob_name, write_mode='overwrite') is disabled; only
    # the local file is written.