mari-chat-3 / async_rate_limiter.py
sirochild's picture
Upload 57 files
a73fa4e verified
raw
history blame
16.8 kB
"""
非同期レート制限管理クラス
1日1回制限とAPI呼び出し制限を管理し、
デバッグモード時の制限緩和機能を提供します。
"""
import asyncio
import os
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple
import logging
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RateLimitError(Exception):
"""レート制限エラー"""
pass
class AsyncRateLimitManager:
"""非同期レート制限管理クラス"""
def __init__(self, storage_manager, max_requests: int = 1):
self.storage = storage_manager
# 設定値(環境変数から取得、デフォルト値あり)
self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1"))
self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10"))
self.debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true"
# デバッグモード時の制限緩和
if self.debug_mode:
self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10"))
self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100"))
logger.info("デバッグモードが有効です。制限が緩和されています")
logger.info(f"レート制限設定 - 1日のリクエスト上限: {self.max_daily_requests}, API呼び出し上限: {self.max_api_calls_per_day}")
async def check_daily_request_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]:
"""1日のリクエスト制限をチェック"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のリクエスト数を取得
daily_requests = user_data["rate_limits"]["daily_requests"]
today_requests = daily_requests.get(today, 0)
# 制限チェック
is_allowed = today_requests < self.max_daily_requests
limit_info = {
"today_requests": today_requests,
"max_requests": self.max_daily_requests,
"remaining": max(0, self.max_daily_requests - today_requests),
"reset_time": self._get_next_reset_time(),
"debug_mode": self.debug_mode
}
if not is_allowed:
logger.warning(f"ユーザー {user_id} の1日のリクエスト制限に達しました ({today_requests}/{self.max_daily_requests})")
return is_allowed, limit_info
except Exception as e:
logger.error(f"1日のリクエスト制限チェックエラー: {e}")
# エラー時は制限を適用
return False, {"error": str(e)}
async def check_api_call_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]:
"""API呼び出し制限をチェック"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のAPI呼び出し数を取得
api_calls = user_data["rate_limits"]["api_calls"]
today_calls = api_calls.get(today, 0)
# 制限チェック
is_allowed = today_calls < self.max_api_calls_per_day
limit_info = {
"today_calls": today_calls,
"max_calls": self.max_api_calls_per_day,
"remaining": max(0, self.max_api_calls_per_day - today_calls),
"reset_time": self._get_next_reset_time(),
"debug_mode": self.debug_mode
}
if not is_allowed:
logger.warning(f"ユーザー {user_id} のAPI呼び出し制限に達しました ({today_calls}/{self.max_api_calls_per_day})")
return is_allowed, limit_info
except Exception as e:
logger.error(f"API呼び出し制限チェックエラー: {e}")
# エラー時は制限を適用
return False, {"error": str(e)}
async def record_request(self, user_id: str) -> None:
"""リクエストを記録"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のリクエスト数を増加
if "daily_requests" not in user_data["rate_limits"]:
user_data["rate_limits"]["daily_requests"] = {}
user_data["rate_limits"]["daily_requests"][today] = \
user_data["rate_limits"]["daily_requests"].get(today, 0) + 1
# プロファイルの最終リクエスト日を更新
user_data["profile"]["last_request"] = today
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} のリクエストを記録しました")
except Exception as e:
logger.error(f"リクエスト記録エラー: {e}")
raise RateLimitError(f"リクエストの記録に失敗しました: {e}")
async def record_api_call(self, user_id: str, api_type: str = "general") -> None:
"""API呼び出しを記録"""
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日のAPI呼び出し数を増加
if "api_calls" not in user_data["rate_limits"]:
user_data["rate_limits"]["api_calls"] = {}
user_data["rate_limits"]["api_calls"][today] = \
user_data["rate_limits"]["api_calls"].get(today, 0) + 1
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} のAPI呼び出し ({api_type}) を記録しました")
except Exception as e:
logger.error(f"API呼び出し記録エラー: {e}")
raise RateLimitError(f"API呼び出しの記録に失敗しました: {e}")
async def get_user_limits_status(self, user_id: str) -> Dict[str, Any]:
"""ユーザーの制限状況を取得"""
try:
# リクエスト制限の確認
request_allowed, request_info = await self.check_daily_request_limit(user_id)
# API呼び出し制限の確認
api_allowed, api_info = await self.check_api_call_limit(user_id)
# 次回リクエスト可能時刻の計算
next_request_time = None
if not request_allowed:
next_request_time = self._get_next_reset_time()
return {
"request_limit": {
"allowed": request_allowed,
"info": request_info
},
"api_limit": {
"allowed": api_allowed,
"info": api_info
},
"next_request_time": next_request_time,
"debug_mode": self.debug_mode
}
except Exception as e:
logger.error(f"制限状況取得エラー: {e}")
return {"error": str(e)}
async def reset_daily_counters(self) -> int:
"""1日のカウンターをリセット(古いデータを削除)"""
try:
# 7日以上前のデータを削除
cutoff_date = datetime.now() - timedelta(days=7)
cutoff_str = cutoff_date.strftime("%Y-%m-%d")
all_users = await self.storage.get_all_users()
reset_count = 0
for user_id in all_users:
user_data = await self.storage.get_user_data(user_id)
# 古い1日のリクエストデータを削除
daily_requests = user_data["rate_limits"]["daily_requests"]
dates_to_delete = [date for date in daily_requests.keys() if date < cutoff_str]
for date in dates_to_delete:
del daily_requests[date]
reset_count += 1
# 古いAPI呼び出しデータを削除
api_calls = user_data["rate_limits"]["api_calls"]
dates_to_delete = [date for date in api_calls.keys() if date < cutoff_str]
for date in dates_to_delete:
del api_calls[date]
reset_count += 1
if reset_count > 0:
await self.storage.update_user_data(user_id, user_data)
if reset_count > 0:
logger.info(f"{reset_count}件の古い制限データをリセットしました")
return reset_count
except Exception as e:
logger.error(f"カウンターリセットエラー: {e}")
return 0
def _get_next_reset_time(self) -> str:
"""次のリセット時刻を取得(翌日の0時)"""
tomorrow = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
return tomorrow.isoformat()
async def is_request_allowed(self, user_id: str) -> Tuple[bool, str]:
"""リクエストが許可されているかチェック(統合チェック)"""
try:
# 1日のリクエスト制限チェック
request_allowed, request_info = await self.check_daily_request_limit(user_id)
if not request_allowed:
remaining_time = self._calculate_remaining_time()
return False, f"1日のリクエスト制限に達しています。次回リクエスト可能時刻: {remaining_time}"
# API呼び出し制限チェック
api_allowed, api_info = await self.check_api_call_limit(user_id)
if not api_allowed:
remaining_time = self._calculate_remaining_time()
return False, f"API呼び出し制限に達しています。次回リクエスト可能時刻: {remaining_time}"
return True, "リクエスト可能です"
except Exception as e:
logger.error(f"リクエスト許可チェックエラー: {e}")
return False, f"制限チェック中にエラーが発生しました: {e}"
def _calculate_remaining_time(self) -> str:
"""次回リクエスト可能までの残り時間を計算"""
now = datetime.now()
tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
remaining = tomorrow - now
hours = remaining.seconds // 3600
minutes = (remaining.seconds % 3600) // 60
return f"{hours}時間{minutes}分後"
async def get_rate_limit_stats(self) -> Dict[str, Any]:
"""レート制限の統計情報を取得"""
try:
all_users = await self.storage.get_all_users()
today = datetime.now().strftime("%Y-%m-%d")
total_requests_today = 0
total_api_calls_today = 0
active_users_today = 0
for user_id in all_users:
user_data = await self.storage.get_user_data(user_id)
# 今日のリクエスト数
daily_requests = user_data["rate_limits"]["daily_requests"]
user_requests_today = daily_requests.get(today, 0)
total_requests_today += user_requests_today
# 今日のAPI呼び出し数
api_calls = user_data["rate_limits"]["api_calls"]
user_api_calls_today = api_calls.get(today, 0)
total_api_calls_today += user_api_calls_today
# アクティブユーザー数
if user_requests_today > 0 or user_api_calls_today > 0:
active_users_today += 1
return {
"total_users": len(all_users),
"active_users_today": active_users_today,
"total_requests_today": total_requests_today,
"total_api_calls_today": total_api_calls_today,
"max_daily_requests": self.max_daily_requests,
"max_api_calls_per_day": self.max_api_calls_per_day,
"debug_mode": self.debug_mode,
"date": today
}
except Exception as e:
logger.error(f"統計情報取得エラー: {e}")
return {"error": str(e)}
def is_debug_mode(self) -> bool:
"""デバッグモードかどうかを確認"""
return self.debug_mode
async def set_debug_mode(self, enabled: bool) -> None:
"""デバッグモードの設定(動的変更)"""
self.debug_mode = enabled
if enabled:
self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10"))
self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100"))
logger.info("デバッグモードを有効にしました")
else:
self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1"))
self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10"))
logger.info("デバッグモードを無効にしました")
async def force_reset_user_limits(self, user_id: str) -> None:
"""特定ユーザーの制限を強制リセット(デバッグ用)"""
if not self.debug_mode:
raise RateLimitError("デバッグモードでのみ利用可能です")
try:
user_data = await self.storage.get_user_data(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# 今日の制限をリセット
user_data["rate_limits"]["daily_requests"][today] = 0
user_data["rate_limits"]["api_calls"][today] = 0
await self.storage.update_user_data(user_id, user_data)
logger.info(f"ユーザー {user_id} の制限を強制リセットしました")
except Exception as e:
logger.error(f"強制リセットエラー: {e}")
raise RateLimitError(f"制限のリセットに失敗しました: {e}")
# テスト用の関数
async def test_rate_limit_manager():
"""RateLimitManagerのテスト"""
import tempfile
import uuid
from async_storage_manager import AsyncStorageManager
# 一時ディレクトリでテスト
with tempfile.TemporaryDirectory() as temp_dir:
test_file = os.path.join(temp_dir, "test_letters.json")
storage = AsyncStorageManager(test_file)
rate_limiter = AsyncRateLimitManager(storage)
print("=== RateLimitManagerテスト開始 ===")
user_id = str(uuid.uuid4())
# 初回リクエストチェック
allowed, message = await rate_limiter.is_request_allowed(user_id)
print(f"✓ 初回リクエストチェック: {allowed} - {message}")
# リクエスト記録
await rate_limiter.record_request(user_id)
print("✓ リクエスト記録成功")
# API呼び出し記録
await rate_limiter.record_api_call(user_id, "groq")
print("✓ API呼び出し記録成功")
# 制限状況確認
status = await rate_limiter.get_user_limits_status(user_id)
print(f"✓ 制限状況確認: {status}")
# 統計情報取得
stats = await rate_limiter.get_rate_limit_stats()
print(f"✓ 統計情報取得: {stats}")
# デバッグモードテスト
await rate_limiter.set_debug_mode(True)
print("✓ デバッグモード有効化成功")
# 強制リセットテスト(デバッグモード時のみ)
await rate_limiter.force_reset_user_limits(user_id)
print("✓ 強制リセット成功")
print("=== 全てのテストが完了しました! ===")
if __name__ == "__main__":
asyncio.run(test_rate_limit_manager())