Spaces:
Runtime error
Runtime error
File size: 16,751 Bytes
a73fa4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 |
"""
非同期レート制限管理クラス
1日1回制限とAPI呼び出し制限を管理し、
デバッグモード時の制限緩和機能を提供します。
"""
import asyncio
import os
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple
import logging
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RateLimitError(Exception):
"""レート制限エラー"""
pass
class AsyncRateLimitManager:
"""非同期レート制限管理クラス"""
def __init__(self, storage_manager, max_requests: int = 1):
self.storage = storage_manager
# 設定値(環境変数から取得、デフォルト値あり)
self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1"))
self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10"))
self.debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true"
# デバッグモード時の制限緩和
if self.debug_mode:
self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10"))
self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100"))
logger.info("デバッグモードが有効です。制限が緩和されています")
logger.info(f"レート制限設定 - 1日のリクエスト上限: {self.max_daily_requests}, API呼び出し上限: {self.max_api_calls_per_day}")
async def check_daily_request_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]:
"""1日のリクエスト制限をチェック"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のリクエスト数を取得
daily_requests = user_data["rate_limits"]["daily_requests"]
today_requests = daily_requests.get(today, 0)
# 制限チェック
is_allowed = today_requests < self.max_daily_requests
limit_info = {
"today_requests": today_requests,
"max_requests": self.max_daily_requests,
"remaining": max(0, self.max_daily_requests - today_requests),
"reset_time": self._get_next_reset_time(),
"debug_mode": self.debug_mode
}
if not is_allowed:
logger.warning(f"ユーザー {user_id} の1日のリクエスト制限に達しました ({today_requests}/{self.max_daily_requests})")
return is_allowed, limit_info
except Exception as e:
logger.error(f"1日のリクエスト制限チェックエラー: {e}")
# エラー時は制限を適用
return False, {"error": str(e)}
async def check_api_call_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]:
"""API呼び出し制限をチェック"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のAPI呼び出し数を取得
api_calls = user_data["rate_limits"]["api_calls"]
today_calls = api_calls.get(today, 0)
# 制限チェック
is_allowed = today_calls < self.max_api_calls_per_day
limit_info = {
"today_calls": today_calls,
"max_calls": self.max_api_calls_per_day,
"remaining": max(0, self.max_api_calls_per_day - today_calls),
"reset_time": self._get_next_reset_time(),
"debug_mode": self.debug_mode
}
if not is_allowed:
logger.warning(f"ユーザー {user_id} のAPI呼び出し制限に達しました ({today_calls}/{self.max_api_calls_per_day})")
return is_allowed, limit_info
except Exception as e:
logger.error(f"API呼び出し制限チェックエラー: {e}")
# エラー時は制限を適用
return False, {"error": str(e)}
async def record_request(self, user_id: str) -> None:
"""リクエストを記録"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のリクエスト数を増加
if "daily_requests" not in user_data["rate_limits"]:
user_data["rate_limits"]["daily_requests"] = {}
user_data["rate_limits"]["daily_requests"][today] = \
user_data["rate_limits"]["daily_requests"].get(today, 0) + 1
# プロファイルの最終リクエスト日を更新
user_data["profile"]["last_request"] = today
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} のリクエストを記録しました")
except Exception as e:
logger.error(f"リクエスト記録エラー: {e}")
raise RateLimitError(f"リクエストの記録に失敗しました: {e}")
async def record_api_call(self, user_id: str, api_type: str = "general") -> None:
"""API呼び出しを記録"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のAPI呼び出し数を増加
if "api_calls" not in user_data["rate_limits"]:
user_data["rate_limits"]["api_calls"] = {}
user_data["rate_limits"]["api_calls"][today] = \
user_data["rate_limits"]["api_calls"].get(today, 0) + 1
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} のAPI呼び出し ({api_type}) を記録しました")
except Exception as e:
logger.error(f"API呼び出し記録エラー: {e}")
raise RateLimitError(f"API呼び出しの記録に失敗しました: {e}")
async def get_user_limits_status(self, user_id: str) -> Dict[str, Any]:
"""ユーザーの制限状況を取得"""
try:
# リクエスト制限の確認
request_allowed, request_info = await self.check_daily_request_limit(user_id)
# API呼び出し制限の確認
api_allowed, api_info = await self.check_api_call_limit(user_id)
# 次回リクエスト可能時刻の計算
next_request_time = None
if not request_allowed:
next_request_time = self._get_next_reset_time()
return {
"request_limit": {
"allowed": request_allowed,
"info": request_info
},
"api_limit": {
"allowed": api_allowed,
"info": api_info
},
"next_request_time": next_request_time,
"debug_mode": self.debug_mode
}
except Exception as e:
logger.error(f"制限状況取得エラー: {e}")
return {"error": str(e)}
async def reset_daily_counters(self) -> int:
"""1日のカウンターをリセット(古いデータを削除)"""
try:
# 7日以上前のデータを削除
cutoff_date = datetime.now() - timedelta(days=7)
cutoff_str = cutoff_date.strftime("%Y-%m-%d")
all_users = await self.storage.get_all_users()
reset_count = 0
for user_id in all_users:
user_data = await self.storage.get_user_data(user_id)
# 古い1日のリクエストデータを削除
daily_requests = user_data["rate_limits"]["daily_requests"]
dates_to_delete = [date for date in daily_requests.keys() if date < cutoff_str]
for date in dates_to_delete:
del daily_requests[date]
reset_count += 1
# 古いAPI呼び出しデータを削除
api_calls = user_data["rate_limits"]["api_calls"]
dates_to_delete = [date for date in api_calls.keys() if date < cutoff_str]
for date in dates_to_delete:
del api_calls[date]
reset_count += 1
if reset_count > 0:
await self.storage.update_user_data(user_id, user_data)
if reset_count > 0:
logger.info(f"{reset_count}件の古い制限データをリセットしました")
return reset_count
except Exception as e:
logger.error(f"カウンターリセットエラー: {e}")
return 0
def _get_next_reset_time(self) -> str:
"""次のリセット時刻を取得(翌日の0時)"""
tomorrow = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
return tomorrow.isoformat()
async def is_request_allowed(self, user_id: str) -> Tuple[bool, str]:
"""リクエストが許可されているかチェック(統合チェック)"""
try:
# 1日のリクエスト制限チェック
request_allowed, request_info = await self.check_daily_request_limit(user_id)
if not request_allowed:
remaining_time = self._calculate_remaining_time()
return False, f"1日のリクエスト制限に達しています。次回リクエスト可能時刻: {remaining_time}"
# API呼び出し制限チェック
api_allowed, api_info = await self.check_api_call_limit(user_id)
if not api_allowed:
remaining_time = self._calculate_remaining_time()
return False, f"API呼び出し制限に達しています。次回リクエスト可能時刻: {remaining_time}"
return True, "リクエスト可能です"
except Exception as e:
logger.error(f"リクエスト許可チェックエラー: {e}")
return False, f"制限チェック中にエラーが発生しました: {e}"
def _calculate_remaining_time(self) -> str:
"""次回リクエスト可能までの残り時間を計算"""
now = datetime.now()
tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
remaining = tomorrow - now
hours = remaining.seconds // 3600
minutes = (remaining.seconds % 3600) // 60
return f"{hours}時間{minutes}分後"
async def get_rate_limit_stats(self) -> Dict[str, Any]:
"""レート制限の統計情報を取得"""
try:
all_users = await self.storage.get_all_users()
today = datetime.now().strftime("%Y-%m-%d")
total_requests_today = 0
total_api_calls_today = 0
active_users_today = 0
for user_id in all_users:
user_data = await self.storage.get_user_data(user_id)
# 今日のリクエスト数
daily_requests = user_data["rate_limits"]["daily_requests"]
user_requests_today = daily_requests.get(today, 0)
total_requests_today += user_requests_today
# 今日のAPI呼び出し数
api_calls = user_data["rate_limits"]["api_calls"]
user_api_calls_today = api_calls.get(today, 0)
total_api_calls_today += user_api_calls_today
# アクティブユーザー数
if user_requests_today > 0 or user_api_calls_today > 0:
active_users_today += 1
return {
"total_users": len(all_users),
"active_users_today": active_users_today,
"total_requests_today": total_requests_today,
"total_api_calls_today": total_api_calls_today,
"max_daily_requests": self.max_daily_requests,
"max_api_calls_per_day": self.max_api_calls_per_day,
"debug_mode": self.debug_mode,
"date": today
}
except Exception as e:
logger.error(f"統計情報取得エラー: {e}")
return {"error": str(e)}
def is_debug_mode(self) -> bool:
"""デバッグモードかどうかを確認"""
return self.debug_mode
async def set_debug_mode(self, enabled: bool) -> None:
"""デバッグモードの設定(動的変更)"""
self.debug_mode = enabled
if enabled:
self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10"))
self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100"))
logger.info("デバッグモードを有効にしました")
else:
self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1"))
self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10"))
logger.info("デバッグモードを無効にしました")
async def force_reset_user_limits(self, user_id: str) -> None:
"""特定ユーザーの制限を強制リセット(デバッグ用)"""
if not self.debug_mode:
raise RateLimitError("デバッグモードでのみ利用可能です")
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日の制限をリセット
user_data["rate_limits"]["daily_requests"][today] = 0
user_data["rate_limits"]["api_calls"][today] = 0
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} の制限を強制リセットしました")
except Exception as e:
logger.error(f"強制リセットエラー: {e}")
raise RateLimitError(f"制限のリセットに失敗しました: {e}")
# テスト用の関数
async def test_rate_limit_manager():
"""RateLimitManagerのテスト"""
import tempfile
import uuid
from async_storage_manager import AsyncStorageManager
# 一時ディレクトリでテスト
with tempfile.TemporaryDirectory() as temp_dir:
test_file = os.path.join(temp_dir, "test_letters.json")
storage = AsyncStorageManager(test_file)
rate_limiter = AsyncRateLimitManager(storage)
print("=== RateLimitManagerテスト開始 ===")
user_id = str(uuid.uuid4())
# 初回リクエストチェック
allowed, message = await rate_limiter.is_request_allowed(user_id)
print(f"✓ 初回リクエストチェック: {allowed} - {message}")
# リクエスト記録
await rate_limiter.record_request(user_id)
print("✓ リクエスト記録成功")
# API呼び出し記録
await rate_limiter.record_api_call(user_id, "groq")
print("✓ API呼び出し記録成功")
# 制限状況確認
status = await rate_limiter.get_user_limits_status(user_id)
print(f"✓ 制限状況確認: {status}")
# 統計情報取得
stats = await rate_limiter.get_rate_limit_stats()
print(f"✓ 統計情報取得: {stats}")
# デバッグモードテスト
await rate_limiter.set_debug_mode(True)
print("✓ デバッグモード有効化成功")
# 強制リセットテスト(デバッグモード時のみ)
await rate_limiter.force_reset_user_limits(user_id)
print("✓ 強制リセット成功")
print("=== 全てのテストが完了しました! ===")
if __name__ == "__main__":
asyncio.run(test_rate_limit_manager()) |