File size: 5,217 Bytes
a303b0f 02817fd a303b0f add3bed a303b0f add3bed 3e464ec a303b0f add3bed 02817fd add3bed 3e464ec a303b0f add3bed a303b0f add3bed a303b0f 02817fd add3bed 3e464ec add3bed 3e464ec add3bed 3e464ec add3bed a303b0f add3bed a303b0f add3bed a303b0f add3bed a303b0f add3bed 3e464ec add3bed 3e464ec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
import os
import json
import tempfile
from datetime import datetime, timezone
from huggingface_hub import hf_hub_download, upload_file
from huggingface_hub.utils import HfHubHTTPError
# Constant for the dataset repository, configurable via environment variable
DATASET_REPO = os.getenv("HF_DATASET_REPO", "Jofthomas/geoguessr_game_of_the_day")
BLOCK_MULTIPLE_GAMES = os.getenv("BLOCK_MULTIPLE_GAMES", "True").lower() == "true"
def get_todays_records_path() -> str:
"""Gets the path for today's game records file, e.g., 'records/2025-10-03.json'."""
date_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
return f"records/{date_str}.json"
def get_todays_games(token: str) -> list:
"""
Downloads and reads the game records for the current day from the HF Hub.
Returns an empty list if the file for today doesn't exist yet.
"""
filepath = get_todays_records_path()
try:
# Use the provided token for read access
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=filepath,
repo_type="dataset",
token=token,
)
with open(local_path, "r", encoding="utf-8") as f:
return json.load(f)
except HfHubHTTPError as e:
if e.response.status_code == 404:
return [] # No games played today yet, which is normal.
else:
print(f"Error downloading daily records: {e}")
raise # Re-raise other HTTP errors
except Exception as e:
print(f"An unexpected error occurred while getting today's games: {e}")
return []
def has_user_played_today(username: str, todays_games: list) -> bool:
"""Checks if a user has completed a game today."""
for game in todays_games:
if game.get("username") == username and game.get("completed", False):
return True
return False
def get_user_game_today(username: str, todays_games: list) -> dict:
"""Gets the user's game record for today, if it exists."""
for game in todays_games:
if game.get("username") == username:
return game
return None
def update_game_record(username: str, round_data: dict = None, final_score: float = None, final_ai_score: float = None):
"""
Updates or creates a game record for a user after each round.
This ensures data is recorded incrementally and prevents abuse.
Args:
username: The player's username
round_data: Single round details to append to the record
final_score: Final total human score (only set when game is complete)
final_ai_score: Final total AI score (only set when game is complete)
"""
write_token = os.getenv("HF_TOKEN", "")
if not write_token:
print("Warning: Server HF_TOKEN not set. Cannot record game data.")
return
try:
# Fetch the latest records
todays_games = get_todays_games(token=write_token)
# Find existing game record for this user today
existing_game = get_user_game_today(username, todays_games)
if existing_game:
# Update existing record
if round_data:
if "rounds" not in existing_game:
existing_game["rounds"] = []
existing_game["rounds"].append(round_data)
if final_score is not None:
existing_game["human_score"] = int(round(final_score))
existing_game["completed"] = True
if final_ai_score is not None:
existing_game["ai_score"] = int(round(final_ai_score))
else:
# Create new game record
game_record = {
"username": username,
"human_score": 0, # Will be updated when game completes
"ai_score": 0, # Will be updated when game completes
"timestamp": datetime.now(timezone.utc).isoformat(),
"rounds": [round_data] if round_data else [],
"completed": False
}
todays_games.append(game_record)
filepath_in_repo = get_todays_records_path()
with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json", encoding="utf-8") as tmp_file:
json.dump(todays_games, tmp_file, indent=2)
tmp_file_path = tmp_file.name
upload_file(
path_or_fileobj=tmp_file_path,
path_in_repo=filepath_in_repo,
repo_id=DATASET_REPO,
repo_type="dataset",
token=write_token,
commit_message=f"Update game for {username}"
)
print(f"Successfully updated game record for {username}")
except Exception as e:
print(f"Error updating game record for {username}: {e}")
finally:
if 'tmp_file_path' in locals() and os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
def record_game(username: str, score: float, rounds_data: list = None, ai_score: float = None):
"""
Legacy function - now just calls update_game_record with final score.
Kept for backwards compatibility.
"""
update_game_record(username, final_score=score, final_ai_score=ai_score)
|