Spaces:
Running
Running
| ''' | |
| Module for logging the sandbox interactions and state. | |
| ''' | |
| from concurrent.futures import ThreadPoolExecutor | |
| import json | |
| import os | |
| from typing import Any, List, Literal, Optional, TypedDict | |
| import datetime | |
| from chat_state import LOG_DIR | |
| from sandbox.sandbox_state import ChatbotSandboxState | |
| from azure.storage.blob import BlobServiceClient | |
| from sandbox.constants import AZURE_BLOB_STORAGE_CONNECTION_STRING, AZURE_BLOB_STORAGE_CONTAINER_NAME | |
| class SandboxLog(TypedDict): | |
| ''' | |
| The schema of the sandbox log stored. | |
| ''' | |
| sandbox_state: ChatbotSandboxState | |
| user_interaction_records: Optional[List[Any]] | |
| def upload_data_to_azure_storage( | |
| data: bytes, | |
| blob_name: str, | |
| write_mode: Literal['overwrite', 'append'], | |
| connection_string: str | None = AZURE_BLOB_STORAGE_CONNECTION_STRING, | |
| container_name: str = AZURE_BLOB_STORAGE_CONTAINER_NAME, | |
| ) -> None: | |
| ''' | |
| Upload data to Azure Blob Storage. | |
| ''' | |
| if not connection_string: | |
| raise ValueError("AZURE_STORAGE_CONNECTION_STRING is not set") | |
| blob_service_client = BlobServiceClient.from_connection_string(connection_string) | |
| container_client = blob_service_client.get_container_client(container_name) | |
| if write_mode == "overwrite": | |
| container_client.upload_blob( | |
| name=blob_name, | |
| data=data, | |
| overwrite=True | |
| ) | |
| elif write_mode == "append": | |
| blob_client = container_client.get_blob_client(blob=blob_name) | |
| if not blob_client.exists(): | |
| blob_client.upload_blob(data, blob_type="AppendBlob") | |
| else: | |
| blob_client.append_block(data) | |
| else: | |
| raise ValueError("Unsupported write_mode. Use 'w' for overwrite or 'a' for append.") | |
| def get_sandbox_log_blob_name(filename: str) -> str: | |
| date_str = datetime.datetime.now().strftime('%Y_%m_%d') | |
| blob_name = f"{date_str}/sandbox_logs/{filename}" | |
| return blob_name | |
| def get_conv_log_filepath( | |
| date: datetime.date, | |
| chat_mode: Literal['battle_anony', 'battle_named', 'direct'], | |
| chat_session_id: str, | |
| ) -> str: | |
| ''' | |
| Get the filepath for the conversation log. | |
| Expected directory structure: | |
| softwarearenlog/ | |
| βββ YEAR_MONTH_DAY/ | |
| βββ conv_logs/ | |
| β βββ battle_anony/ | |
| β β βββ CHATSESSIONID.json | |
| β βββ battle_named/ | |
| β β βββ CHATSESSIONID.json | |
| β βββ direct/ | |
| β βββ CHATSESSIONID.json | |
| ''' | |
| date_str = date.strftime('%Y_%m_%d') | |
| filepath = os.path.join( | |
| date_str, | |
| 'conv_logs', | |
| chat_mode, | |
| f"{chat_session_id}.json" | |
| ) | |
| return filepath | |
| def get_sandbox_log_filepath( | |
| date: datetime.date, | |
| chat_mode: Literal['battle_anony', 'battle_named', 'direct'], | |
| chat_session_id: str, | |
| ) -> str: | |
| ''' | |
| Get the filepath for the conversation log. | |
| Expected directory structure: | |
| softwarearenlog/ | |
| βββ YEAR_MONTH_DAY/ | |
| βββ conv_logs/ | |
| βββ sandbox_logs/ | |
| βββ battle/ | |
| β βββ sandbox-records-SESSIONID-A-B-EDITID.json | |
| βββ side-by-side/ | |
| β βββ sandbox-records-SESSIONID-A-B-EDITID.json | |
| βββ direct/ | |
| βββ sandbox-records-SESSIONID-A-B-EDITID.json | |
| ''' | |
| date_str = date.strftime('%Y_%m_%d') | |
| filepath = os.path.join( | |
| date_str, | |
| 'sandbox_logs', | |
| chat_mode, | |
| f"{chat_session_id}.json" | |
| ) | |
| return filepath | |
| def get_conv_log_blob_name(filename: str) -> str: | |
| date_str = datetime.datetime.now().strftime('%Y_%m_%d') | |
| blob_name = f"{date_str}/conv_logs/{filename}" | |
| return blob_name | |
| _executor = ThreadPoolExecutor(max_workers=20) | |
| def save_conv_log_to_azure_storage( | |
| filename: str, | |
| log_data: dict[str, Any], | |
| write_mode: Literal['overwrite', 'append'] = 'append', | |
| use_async: bool = True | |
| ) -> None: | |
| try: | |
| if AZURE_BLOB_STORAGE_CONNECTION_STRING: | |
| blob_name = get_conv_log_blob_name(filename) | |
| log_json: str = json.dumps( | |
| obj=log_data, | |
| default=str, | |
| ) | |
| def _run_upload(): | |
| upload_data_to_azure_storage( | |
| str.encode(log_json + "\n"), | |
| blob_name, | |
| write_mode | |
| ) | |
| if use_async: | |
| _executor.submit(_run_upload) | |
| else: | |
| _run_upload() | |
| except Exception as e: | |
| print(f"Error uploading conv log to Azure Blob Storage: {e}") | |
| def get_sandbox_log_filename(sandbox_state: ChatbotSandboxState) -> str: | |
| return ( | |
| '-'.join( | |
| [ | |
| "sandbox-logs", | |
| f"{sandbox_state['conv_id']}", # chat conv id | |
| f"{sandbox_state['enabled_round']}", # current chat round | |
| f"{sandbox_state['sandbox_run_round']}", # current sandbox round | |
| ] | |
| ) + ".json" | |
| ) | |
| def upsert_sandbox_log(filename: str, data: str) -> None: | |
| filepath = os.path.join( | |
| LOG_DIR, | |
| datetime.datetime.now().strftime('%Y_%m_%d'), # current date as 2025_02_02 | |
| 'sandbox_logs', | |
| filename | |
| ) | |
| # create directory if not exists | |
| os.makedirs(os.path.dirname(filepath), exist_ok=True) | |
| with open(filepath, "w") as fout: | |
| fout.write(data) | |
| def create_sandbox_log(sandbox_state: ChatbotSandboxState, user_interaction_records: list[Any] | None) -> SandboxLog: | |
| return { | |
| "sandbox_state": sandbox_state, | |
| "user_interaction_records": user_interaction_records, | |
| } | |
| def log_sandbox_telemetry_gradio_fn( | |
| sandbox_state: ChatbotSandboxState, | |
| sandbox_ui_value: tuple[str, bool, list[Any]] | None | |
| ) -> None: | |
| if sandbox_state is None: | |
| return | |
| sandbox_id = sandbox_state['sandbox_id'] | |
| user_interaction_records = sandbox_ui_value[2] if sandbox_ui_value else None | |
| if sandbox_id is None: | |
| return | |
| log_json = create_sandbox_log(sandbox_state, user_interaction_records) | |
| log_data = json.dumps( | |
| log_json, | |
| indent=2, | |
| default=str, | |
| ensure_ascii=False | |
| ) | |
| filename = get_sandbox_log_filename(sandbox_state) | |
| upsert_sandbox_log(filename=filename, data=log_data) | |
| # # Upload to Azure Blob Storage | |
| # if AZURE_BLOB_STORAGE_CONNECTION_STRING: | |
| # try: | |
| # blob_name = get_sandbox_log_blob_name(filename) | |
| # upload_data_to_azure_storage( | |
| # data=str.encode(log_data), | |
| # blob_name=blob_name, | |
| # write_mode='overwrite' | |
| # ) | |
| # except Exception as e: | |
| # print(f"Error uploading sandbox log to Azure Blob Storage: {e}") | |