""" 非同期レート制限管理クラス 1日1回制限とAPI呼び出し制限を管理し、 デバッグモード時の制限緩和機能を提供します。 """ import asyncio import os from datetime import datetime, timedelta from typing import Dict, Any, Optional, Tuple import logging # ログ設定 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RateLimitError(Exception): """レート制限エラー""" pass class AsyncRateLimitManager: """非同期レート制限管理クラス""" def __init__(self, storage_manager, max_requests: int = 1): self.storage = storage_manager # 設定値(環境変数から取得、デフォルト値あり) self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1")) self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10")) self.debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true" # デバッグモード時の制限緩和 if self.debug_mode: self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10")) self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100")) logger.info("デバッグモードが有効です。制限が緩和されています") logger.info(f"レート制限設定 - 1日のリクエスト上限: {self.max_daily_requests}, API呼び出し上限: {self.max_api_calls_per_day}") async def check_daily_request_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]: """1日のリクエスト制限をチェック""" try: user_data = await self.storage.get_user_data(user_id) today = datetime.now().strftime("%Y-%m-%d") # 今日のリクエスト数を取得 daily_requests = user_data["rate_limits"]["daily_requests"] today_requests = daily_requests.get(today, 0) # 制限チェック is_allowed = today_requests < self.max_daily_requests limit_info = { "today_requests": today_requests, "max_requests": self.max_daily_requests, "remaining": max(0, self.max_daily_requests - today_requests), "reset_time": self._get_next_reset_time(), "debug_mode": self.debug_mode } if not is_allowed: logger.warning(f"ユーザー {user_id} の1日のリクエスト制限に達しました ({today_requests}/{self.max_daily_requests})") return is_allowed, limit_info except Exception as e: logger.error(f"1日のリクエスト制限チェックエラー: {e}") # エラー時は制限を適用 return False, {"error": str(e)} async def check_api_call_limit(self, user_id: str) -> Tuple[bool, Dict[str, Any]]: """API呼び出し制限をチェック""" try: user_data = await self.storage.get_user_data(user_id) today = datetime.now().strftime("%Y-%m-%d") # 今日のAPI呼び出し数を取得 api_calls = user_data["rate_limits"]["api_calls"] today_calls = api_calls.get(today, 0) # 制限チェック is_allowed = today_calls < self.max_api_calls_per_day limit_info = { "today_calls": today_calls, "max_calls": self.max_api_calls_per_day, "remaining": max(0, self.max_api_calls_per_day - today_calls), "reset_time": self._get_next_reset_time(), "debug_mode": self.debug_mode } if not is_allowed: logger.warning(f"ユーザー {user_id} のAPI呼び出し制限に達しました ({today_calls}/{self.max_api_calls_per_day})") return is_allowed, limit_info except Exception as e: logger.error(f"API呼び出し制限チェックエラー: {e}") # エラー時は制限を適用 return False, {"error": str(e)} async def record_request(self, user_id: str) -> None: """リクエストを記録""" try: user_data = await self.storage.get_user_data(user_id) today = datetime.now().strftime("%Y-%m-%d") # 今日のリクエスト数を増加 if "daily_requests" not in user_data["rate_limits"]: user_data["rate_limits"]["daily_requests"] = {} user_data["rate_limits"]["daily_requests"][today] = \ user_data["rate_limits"]["daily_requests"].get(today, 0) + 1 # プロファイルの最終リクエスト日を更新 user_data["profile"]["last_request"] = today await self.storage.update_user_data(user_id, user_data) logger.info(f"ユーザー {user_id} のリクエストを記録しました") except Exception as e: logger.error(f"リクエスト記録エラー: {e}") raise RateLimitError(f"リクエストの記録に失敗しました: {e}") async def record_api_call(self, user_id: str, api_type: str = "general") -> None: """API呼び出しを記録""" try: user_data = await self.storage.get_user_data(user_id) today = datetime.now().strftime("%Y-%m-%d") # 今日のAPI呼び出し数を増加 if "api_calls" not in user_data["rate_limits"]: user_data["rate_limits"]["api_calls"] = {} user_data["rate_limits"]["api_calls"][today] = \ user_data["rate_limits"]["api_calls"].get(today, 0) + 1 await self.storage.update_user_data(user_id, user_data) logger.info(f"ユーザー {user_id} のAPI呼び出し ({api_type}) を記録しました") except Exception as e: logger.error(f"API呼び出し記録エラー: {e}") raise RateLimitError(f"API呼び出しの記録に失敗しました: {e}") async def get_user_limits_status(self, user_id: str) -> Dict[str, Any]: """ユーザーの制限状況を取得""" try: # リクエスト制限の確認 request_allowed, request_info = await self.check_daily_request_limit(user_id) # API呼び出し制限の確認 api_allowed, api_info = await self.check_api_call_limit(user_id) # 次回リクエスト可能時刻の計算 next_request_time = None if not request_allowed: next_request_time = self._get_next_reset_time() return { "request_limit": { "allowed": request_allowed, "info": request_info }, "api_limit": { "allowed": api_allowed, "info": api_info }, "next_request_time": next_request_time, "debug_mode": self.debug_mode } except Exception as e: logger.error(f"制限状況取得エラー: {e}") return {"error": str(e)} async def reset_daily_counters(self) -> int: """1日のカウンターをリセット(古いデータを削除)""" try: # 7日以上前のデータを削除 cutoff_date = datetime.now() - timedelta(days=7) cutoff_str = cutoff_date.strftime("%Y-%m-%d") all_users = await self.storage.get_all_users() reset_count = 0 for user_id in all_users: user_data = await self.storage.get_user_data(user_id) # 古い1日のリクエストデータを削除 daily_requests = user_data["rate_limits"]["daily_requests"] dates_to_delete = [date for date in daily_requests.keys() if date < cutoff_str] for date in dates_to_delete: del daily_requests[date] reset_count += 1 # 古いAPI呼び出しデータを削除 api_calls = user_data["rate_limits"]["api_calls"] dates_to_delete = [date for date in api_calls.keys() if date < cutoff_str] for date in dates_to_delete: del api_calls[date] reset_count += 1 if reset_count > 0: await self.storage.update_user_data(user_id, user_data) if reset_count > 0: logger.info(f"{reset_count}件の古い制限データをリセットしました") return reset_count except Exception as e: logger.error(f"カウンターリセットエラー: {e}") return 0 def _get_next_reset_time(self) -> str: """次のリセット時刻を取得(翌日の0時)""" tomorrow = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) return tomorrow.isoformat() async def is_request_allowed(self, user_id: str) -> Tuple[bool, str]: """リクエストが許可されているかチェック(統合チェック)""" try: # 1日のリクエスト制限チェック request_allowed, request_info = await self.check_daily_request_limit(user_id) if not request_allowed: remaining_time = self._calculate_remaining_time() return False, f"1日のリクエスト制限に達しています。次回リクエスト可能時刻: {remaining_time}" # API呼び出し制限チェック api_allowed, api_info = await self.check_api_call_limit(user_id) if not api_allowed: remaining_time = self._calculate_remaining_time() return False, f"API呼び出し制限に達しています。次回リクエスト可能時刻: {remaining_time}" return True, "リクエスト可能です" except Exception as e: logger.error(f"リクエスト許可チェックエラー: {e}") return False, f"制限チェック中にエラーが発生しました: {e}" def _calculate_remaining_time(self) -> str: """次回リクエスト可能までの残り時間を計算""" now = datetime.now() tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) remaining = tomorrow - now hours = remaining.seconds // 3600 minutes = (remaining.seconds % 3600) // 60 return f"{hours}時間{minutes}分後" async def get_rate_limit_stats(self) -> Dict[str, Any]: """レート制限の統計情報を取得""" try: all_users = await self.storage.get_all_users() today = datetime.now().strftime("%Y-%m-%d") total_requests_today = 0 total_api_calls_today = 0 active_users_today = 0 for user_id in all_users: user_data = await self.storage.get_user_data(user_id) # 今日のリクエスト数 daily_requests = user_data["rate_limits"]["daily_requests"] user_requests_today = daily_requests.get(today, 0) total_requests_today += user_requests_today # 今日のAPI呼び出し数 api_calls = user_data["rate_limits"]["api_calls"] user_api_calls_today = api_calls.get(today, 0) total_api_calls_today += user_api_calls_today # アクティブユーザー数 if user_requests_today > 0 or user_api_calls_today > 0: active_users_today += 1 return { "total_users": len(all_users), "active_users_today": active_users_today, "total_requests_today": total_requests_today, "total_api_calls_today": total_api_calls_today, "max_daily_requests": self.max_daily_requests, "max_api_calls_per_day": self.max_api_calls_per_day, "debug_mode": self.debug_mode, "date": today } except Exception as e: logger.error(f"統計情報取得エラー: {e}") return {"error": str(e)} def is_debug_mode(self) -> bool: """デバッグモードかどうかを確認""" return self.debug_mode async def set_debug_mode(self, enabled: bool) -> None: """デバッグモードの設定(動的変更)""" self.debug_mode = enabled if enabled: self.max_daily_requests = int(os.getenv("DEBUG_MAX_DAILY_REQUESTS", "10")) self.max_api_calls_per_day = int(os.getenv("DEBUG_MAX_API_CALLS", "100")) logger.info("デバッグモードを有効にしました") else: self.max_daily_requests = int(os.getenv("MAX_DAILY_REQUESTS", "1")) self.max_api_calls_per_day = int(os.getenv("MAX_API_CALLS_PER_DAY", "10")) logger.info("デバッグモードを無効にしました") async def force_reset_user_limits(self, user_id: str) -> None: """特定ユーザーの制限を強制リセット(デバッグ用)""" if not self.debug_mode: raise RateLimitError("デバッグモードでのみ利用可能です") try: user_data = await self.storage.get_user_data(user_id) today = datetime.now().strftime("%Y-%m-%d") # 今日の制限をリセット user_data["rate_limits"]["daily_requests"][today] = 0 user_data["rate_limits"]["api_calls"][today] = 0 await self.storage.update_user_data(user_id, user_data) logger.info(f"ユーザー {user_id} の制限を強制リセットしました") except Exception as e: logger.error(f"強制リセットエラー: {e}") raise RateLimitError(f"制限のリセットに失敗しました: {e}") # テスト用の関数 async def test_rate_limit_manager(): """RateLimitManagerのテスト""" import tempfile import uuid from async_storage_manager import AsyncStorageManager # 一時ディレクトリでテスト with tempfile.TemporaryDirectory() as temp_dir: test_file = os.path.join(temp_dir, "test_letters.json") storage = AsyncStorageManager(test_file) rate_limiter = AsyncRateLimitManager(storage) print("=== RateLimitManagerテスト開始 ===") user_id = str(uuid.uuid4()) # 初回リクエストチェック allowed, message = await rate_limiter.is_request_allowed(user_id) print(f"✓ 初回リクエストチェック: {allowed} - {message}") # リクエスト記録 await rate_limiter.record_request(user_id) print("✓ リクエスト記録成功") # API呼び出し記録 await rate_limiter.record_api_call(user_id, "groq") print("✓ API呼び出し記録成功") # 制限状況確認 status = await rate_limiter.get_user_limits_status(user_id) print(f"✓ 制限状況確認: {status}") # 統計情報取得 stats = await rate_limiter.get_rate_limit_stats() print(f"✓ 統計情報取得: {stats}") # デバッグモードテスト await rate_limiter.set_debug_mode(True) print("✓ デバッグモード有効化成功") # 強制リセットテスト(デバッグモード時のみ) await rate_limiter.force_reset_user_limits(user_id) print("✓ 強制リセット成功") print("=== 全てのテストが完了しました! ===") if __name__ == "__main__": asyncio.run(test_rate_limit_manager())