Spaces:
Runtime error
Runtime error
| """ | |
| バッチスケジューラークラス | |
| 2時、3時、4時の時刻別バッチ処理機能を実装し、 | |
| 指定時刻の未処理リクエストを処理する機能を提供します。 | |
| """ | |
| import asyncio | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Optional | |
| import logging | |
| import traceback | |
| from letter_request_manager import RequestManager | |
| from letter_generator import LetterGenerator | |
| from async_storage_manager import AsyncStorageManager | |
| from async_rate_limiter import AsyncRateLimitManager | |
| from letter_user_manager import UserManager | |
| # ログ設定 | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class BatchSchedulerError(Exception): | |
| """バッチスケジューラー関連のエラー""" | |
| pass | |
| class BatchScheduler: | |
| """非同期バッチ処理スケジューラー""" | |
| def __init__(self, storage_manager: Optional[AsyncStorageManager] = None): | |
| """ | |
| バッチスケジューラーを初期化 | |
| Args: | |
| storage_manager: ストレージマネージャー(指定しない場合は新規作成) | |
| """ | |
| # ストレージマネージャーの初期化 | |
| if storage_manager is None: | |
| storage_path = os.getenv("STORAGE_PATH", "/mnt/data/letters.json") | |
| self.storage_manager = AsyncStorageManager(storage_path) | |
| else: | |
| self.storage_manager = storage_manager | |
| # 依存コンポーネントの初期化 | |
| self.rate_limiter = AsyncRateLimitManager(self.storage_manager) | |
| self.request_manager = RequestManager(self.storage_manager, self.rate_limiter) | |
| self.letter_generator = LetterGenerator() | |
| self.user_manager = UserManager(self.storage_manager) | |
| # 設定値 | |
| self.available_hours = [2, 3, 4] # 2時、3時、4時 | |
| self.max_concurrent_generations = int(os.getenv("MAX_CONCURRENT_GENERATIONS", "3")) | |
| self.generation_timeout = int(os.getenv("GENERATION_TIMEOUT", "300")) # 5分 | |
| self.retry_failed_requests = os.getenv("RETRY_FAILED_REQUESTS", "true").lower() == "true" | |
| logger.info(f"BatchScheduler初期化完了 - 対象時刻: {self.available_hours}") | |
| async def run_hourly_batch(self, hour: int) -> Dict[str, Any]: | |
| """ | |
| 指定時刻のバッチ処理を実行 | |
| Args: | |
| hour: 実行時刻(2, 3, 4のいずれか) | |
| Returns: | |
| Dict: バッチ処理の結果 | |
| """ | |
| start_time = datetime.now() | |
| batch_id = f"{start_time.strftime('%Y%m%d_%H%M%S')}_{hour}" | |
| logger.info(f"=== {hour}時のバッチ処理開始 (ID: {batch_id}) ===") | |
| # 時刻の検証 | |
| if hour not in self.available_hours: | |
| error_msg = f"無効な時刻が指定されました: {hour}時 (有効: {self.available_hours})" | |
| logger.error(error_msg) | |
| return { | |
| "success": False, | |
| "error": error_msg, | |
| "batch_id": batch_id, | |
| "hour": hour, | |
| "start_time": start_time.isoformat(), | |
| "end_time": datetime.now().isoformat() | |
| } | |
| try: | |
| # バッチ実行の記録開始 | |
| await self._record_batch_start(batch_id, hour, start_time) | |
| # 指定時刻の未処理リクエストを処理 | |
| result = await self.process_pending_requests_for_hour(hour) | |
| # バッチ実行の記録完了 | |
| end_time = datetime.now() | |
| await self._record_batch_completion(batch_id, hour, start_time, end_time, result) | |
| execution_time = (end_time - start_time).total_seconds() | |
| logger.info(f"=== {hour}時のバッチ処理完了 (所要時間: {execution_time:.2f}秒) ===") | |
| return { | |
| "success": True, | |
| "batch_id": batch_id, | |
| "hour": hour, | |
| "start_time": start_time.isoformat(), | |
| "end_time": end_time.isoformat(), | |
| "execution_time": execution_time, | |
| "processed_count": result.get("processed_count", 0), | |
| "success_count": result.get("success_count", 0), | |
| "failed_count": result.get("failed_count", 0), | |
| "errors": result.get("errors", []) | |
| } | |
| except Exception as e: | |
| end_time = datetime.now() | |
| execution_time = (end_time - start_time).total_seconds() | |
| error_msg = f"バッチ処理中にエラーが発生しました: {str(e)}" | |
| logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
| # エラーの記録 | |
| await self._record_batch_error(batch_id, hour, start_time, end_time, error_msg) | |
| return { | |
| "success": False, | |
| "error": error_msg, | |
| "batch_id": batch_id, | |
| "hour": hour, | |
| "start_time": start_time.isoformat(), | |
| "end_time": end_time.isoformat(), | |
| "execution_time": execution_time | |
| } | |
| async def process_pending_requests_for_hour(self, hour: int) -> Dict[str, Any]: | |
| """ | |
| 指定時刻の未処理リクエストを処理 | |
| Args: | |
| hour: 処理対象の時刻 | |
| Returns: | |
| Dict: 処理結果の詳細 | |
| """ | |
| try: | |
| # 未処理リクエストを取得 | |
| pending_requests = await self.request_manager.get_pending_requests_by_hour(hour) | |
| if not pending_requests: | |
| logger.info(f"{hour}時の未処理リクエストはありません") | |
| return { | |
| "processed_count": 0, | |
| "success_count": 0, | |
| "failed_count": 0, | |
| "errors": [] | |
| } | |
| logger.info(f"{hour}時の未処理リクエスト数: {len(pending_requests)}") | |
| # 並行処理用のセマフォ | |
| semaphore = asyncio.Semaphore(self.max_concurrent_generations) | |
| # 各リクエストを並行処理 | |
| tasks = [] | |
| for request in pending_requests: | |
| task = self._process_single_request(semaphore, request) | |
| tasks.append(task) | |
| # 全てのタスクを実行 | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # 結果を集計 | |
| success_count = 0 | |
| failed_count = 0 | |
| errors = [] | |
| for i, result in enumerate(results): | |
| if isinstance(result, Exception): | |
| failed_count += 1 | |
| error_msg = f"リクエスト処理例外: {str(result)}" | |
| errors.append({ | |
| "request_index": i, | |
| "user_id": pending_requests[i].get("user_id", "unknown"), | |
| "error": error_msg | |
| }) | |
| logger.error(error_msg) | |
| elif result.get("success", False): | |
| success_count += 1 | |
| else: | |
| failed_count += 1 | |
| errors.append({ | |
| "request_index": i, | |
| "user_id": pending_requests[i].get("user_id", "unknown"), | |
| "error": result.get("error", "不明なエラー") | |
| }) | |
| logger.info(f"処理完了 - 成功: {success_count}, 失敗: {failed_count}") | |
| return { | |
| "processed_count": len(pending_requests), | |
| "success_count": success_count, | |
| "failed_count": failed_count, | |
| "errors": errors | |
| } | |
| except Exception as e: | |
| error_msg = f"未処理リクエスト処理中にエラーが発生: {str(e)}" | |
| logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
| return { | |
| "processed_count": 0, | |
| "success_count": 0, | |
| "failed_count": 0, | |
| "errors": [{"error": error_msg}] | |
| } | |
| async def _process_single_request(self, semaphore: asyncio.Semaphore, request: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| 単一のリクエストを処理 | |
| Args: | |
| semaphore: 並行処理制御用のセマフォ | |
| request: 処理対象のリクエスト | |
| Returns: | |
| Dict: 処理結果 | |
| """ | |
| async with semaphore: | |
| user_id = request["user_id"] | |
| theme = request["theme"] | |
| date = request["date"] | |
| try: | |
| logger.info(f"手紙生成開始 - ユーザー: {user_id}, テーマ: {theme[:50]}...") | |
| # タイムアウト付きで手紙生成を実行 | |
| user_history = await self.user_manager.get_user_profile(user_id) | |
| generation_task = self.letter_generator.generate_letter(user_id, theme, user_history) | |
| letter_result = await asyncio.wait_for(generation_task, timeout=self.generation_timeout) | |
| # 生成された手紙をストレージに保存 | |
| await self._save_generated_letter(user_id, date, theme, letter_result) | |
| # リクエストを完了としてマーク | |
| await self.request_manager.mark_request_processed(user_id, date, "completed") | |
| # ユーザー履歴を更新 | |
| await self.user_manager.update_user_history(user_id, { | |
| "date": date, | |
| "theme": theme, | |
| "status": "completed", | |
| "generated_at": datetime.now().isoformat() | |
| }) | |
| logger.info(f"手紙生成完了 - ユーザー: {user_id}") | |
| return { | |
| "success": True, | |
| "user_id": user_id, | |
| "theme": theme, | |
| "generation_time": letter_result["metadata"].get("generation_time", 0) | |
| } | |
| except asyncio.TimeoutError: | |
| error_msg = f"手紙生成がタイムアウトしました({self.generation_timeout}秒)" | |
| logger.error(f"{error_msg} - ユーザー: {user_id}") | |
| await self.request_manager.mark_request_failed(user_id, date, error_msg) | |
| return { | |
| "success": False, | |
| "user_id": user_id, | |
| "error": error_msg | |
| } | |
| except Exception as e: | |
| error_msg = f"手紙生成中にエラーが発生: {str(e)}" | |
| logger.error(f"{error_msg} - ユーザー: {user_id}\n{traceback.format_exc()}") | |
| await self.request_manager.mark_request_failed(user_id, date, error_msg) | |
| return { | |
| "success": False, | |
| "user_id": user_id, | |
| "error": error_msg | |
| } | |
| async def _save_generated_letter(self, user_id: str, date: str, theme: str, letter_result: Dict[str, Any]) -> None: | |
| """ | |
| 生成された手紙をストレージに保存 | |
| Args: | |
| user_id: ユーザーID | |
| date: 日付 | |
| theme: テーマ | |
| letter_result: 生成結果 | |
| """ | |
| try: | |
| user_data = await self.storage_manager.get_user_data(user_id) | |
| # 手紙データを作成 | |
| letter_data = { | |
| "theme": theme, | |
| "content": letter_result["content"], | |
| "status": "completed", | |
| "generated_at": datetime.now().isoformat(), | |
| "metadata": letter_result["metadata"] | |
| } | |
| # ユーザーデータに追加 | |
| user_data["letters"][date] = letter_data | |
| user_data["profile"]["total_letters"] = user_data["profile"].get("total_letters", 0) + 1 | |
| user_data["profile"]["last_request"] = date | |
| # ストレージに保存 | |
| await self.storage_manager.update_user_data(user_id, user_data) | |
| logger.info(f"手紙をストレージに保存しました - ユーザー: {user_id}, 日付: {date}") | |
| except Exception as e: | |
| logger.error(f"手紙保存エラー - ユーザー: {user_id}, 日付: {date}: {str(e)}") | |
| raise | |
| async def _record_batch_start(self, batch_id: str, hour: int, start_time: datetime) -> None: | |
| """バッチ実行開始を記録""" | |
| try: | |
| system_info = await self.storage_manager.get_system_info() | |
| if "batch_runs" not in system_info: | |
| system_info["batch_runs"] = {} | |
| system_info["batch_runs"][batch_id] = { | |
| "hour": hour, | |
| "start_time": start_time.isoformat(), | |
| "status": "running" | |
| } | |
| await self.storage_manager.update_system_info(system_info) | |
| except Exception as e: | |
| logger.error(f"バッチ開始記録エラー: {str(e)}") | |
| async def _record_batch_completion(self, batch_id: str, hour: int, start_time: datetime, | |
| end_time: datetime, result: Dict[str, Any]) -> None: | |
| """バッチ実行完了を記録""" | |
| try: | |
| system_info = await self.storage_manager.get_system_info() | |
| if batch_id in system_info.get("batch_runs", {}): | |
| system_info["batch_runs"][batch_id].update({ | |
| "end_time": end_time.isoformat(), | |
| "status": "completed", | |
| "execution_time": (end_time - start_time).total_seconds(), | |
| "processed_count": result.get("processed_count", 0), | |
| "success_count": result.get("success_count", 0), | |
| "failed_count": result.get("failed_count", 0), | |
| "error_count": len(result.get("errors", [])) | |
| }) | |
| await self.storage_manager.update_system_info(system_info) | |
| except Exception as e: | |
| logger.error(f"バッチ完了記録エラー: {str(e)}") | |
| async def _record_batch_error(self, batch_id: str, hour: int, start_time: datetime, | |
| end_time: datetime, error_msg: str) -> None: | |
| """バッチ実行エラーを記録""" | |
| try: | |
| system_info = await self.storage_manager.get_system_info() | |
| if batch_id in system_info.get("batch_runs", {}): | |
| system_info["batch_runs"][batch_id].update({ | |
| "end_time": end_time.isoformat(), | |
| "status": "failed", | |
| "execution_time": (end_time - start_time).total_seconds(), | |
| "error": error_msg | |
| }) | |
| await self.storage_manager.update_system_info(system_info) | |
| except Exception as e: | |
| logger.error(f"バッチエラー記録エラー: {str(e)}") | |
| def schedule_all_hours(self) -> None: | |
| """ | |
| 全ての対象時刻でのスケジュール設定 | |
| 注意: この関数は実際のスケジューリングライブラリと組み合わせて使用する | |
| """ | |
| logger.info("バッチスケジュールを設定します") | |
| for hour in self.available_hours: | |
| logger.info(f" - {hour}:00 に手紙生成バッチを設定") | |
| # 実際のスケジューリングは外部ライブラリ(APScheduler等)で実装 | |
| # ここでは設定情報のログ出力のみ | |
| logger.info("スケジュール設定完了(実際の実行は外部スケジューラーが必要)") | |
| async def cleanup_old_data(self, days: int = 90) -> Dict[str, Any]: | |
| """ | |
| 古いデータの自動削除 | |
| Args: | |
| days: 保持日数 | |
| Returns: | |
| Dict: 削除結果 | |
| """ | |
| try: | |
| logger.info(f"{days}日以前の古いデータを削除します") | |
| # ストレージマネージャーの削除機能を使用 | |
| deleted_count = await self.storage_manager.cleanup_old_data(days) | |
| # リクエストマネージャーの削除機能も使用 | |
| deleted_requests = await self.request_manager.cleanup_old_requests(days) | |
| # バックアップの作成 | |
| backup_path = await self.storage_manager.backup_data() | |
| result = { | |
| "success": True, | |
| "deleted_letters": deleted_count, | |
| "deleted_requests": deleted_requests, | |
| "backup_created": backup_path, | |
| "cleanup_date": datetime.now().isoformat() | |
| } | |
| logger.info(f"古いデータ削除完了: {result}") | |
| return result | |
| except Exception as e: | |
| error_msg = f"古いデータ削除中にエラーが発生: {str(e)}" | |
| logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
| return { | |
| "success": False, | |
| "error": error_msg, | |
| "cleanup_date": datetime.now().isoformat() | |
| } | |
| async def get_batch_statistics(self, days: int = 7) -> Dict[str, Any]: | |
| """ | |
| バッチ処理の統計情報を取得 | |
| Args: | |
| days: 統計対象日数 | |
| Returns: | |
| Dict: 統計情報 | |
| """ | |
| try: | |
| system_info = await self.storage_manager.get_system_info() | |
| batch_runs = system_info.get("batch_runs", {}) | |
| # 指定日数以内のバッチ実行を抽出 | |
| cutoff_date = datetime.now() - timedelta(days=days) | |
| recent_batches = [] | |
| for batch_id, batch_info in batch_runs.items(): | |
| try: | |
| start_time = datetime.fromisoformat(batch_info["start_time"]) | |
| if start_time >= cutoff_date: | |
| recent_batches.append(batch_info) | |
| except (ValueError, KeyError): | |
| continue | |
| # 統計を計算 | |
| total_runs = len(recent_batches) | |
| successful_runs = len([b for b in recent_batches if b.get("status") == "completed"]) | |
| failed_runs = len([b for b in recent_batches if b.get("status") == "failed"]) | |
| total_processed = sum(b.get("processed_count", 0) for b in recent_batches) | |
| total_success = sum(b.get("success_count", 0) for b in recent_batches) | |
| total_failed = sum(b.get("failed_count", 0) for b in recent_batches) | |
| avg_execution_time = 0 | |
| if recent_batches: | |
| execution_times = [b.get("execution_time", 0) for b in recent_batches if b.get("execution_time")] | |
| if execution_times: | |
| avg_execution_time = sum(execution_times) / len(execution_times) | |
| # 時刻別統計 | |
| hourly_stats = {hour: {"runs": 0, "processed": 0, "success": 0} for hour in self.available_hours} | |
| for batch in recent_batches: | |
| hour = batch.get("hour") | |
| if hour in hourly_stats: | |
| hourly_stats[hour]["runs"] += 1 | |
| hourly_stats[hour]["processed"] += batch.get("processed_count", 0) | |
| hourly_stats[hour]["success"] += batch.get("success_count", 0) | |
| return { | |
| "period_days": days, | |
| "total_runs": total_runs, | |
| "successful_runs": successful_runs, | |
| "failed_runs": failed_runs, | |
| "success_rate": (successful_runs / total_runs * 100) if total_runs > 0 else 0, | |
| "total_processed": total_processed, | |
| "total_success": total_success, | |
| "total_failed": total_failed, | |
| "processing_success_rate": (total_success / total_processed * 100) if total_processed > 0 else 0, | |
| "avg_execution_time": avg_execution_time, | |
| "hourly_stats": hourly_stats, | |
| "generated_at": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"統計情報取得エラー: {str(e)}") | |
| return {"error": str(e)} | |
| async def test_batch_processing(self, test_hour: int = 2) -> Dict[str, Any]: | |
| """ | |
| バッチ処理のテスト実行 | |
| Args: | |
| test_hour: テスト対象の時刻 | |
| Returns: | |
| Dict: テスト結果 | |
| """ | |
| try: | |
| logger.info(f"バッチ処理テストを開始します({test_hour}時)") | |
| # API接続テスト | |
| api_status = await self.letter_generator.check_api_connections() | |
| if not api_status.get("overall", False): | |
| return { | |
| "success": False, | |
| "error": "API接続テストに失敗しました", | |
| "api_status": api_status | |
| } | |
| # テスト用のバッチ処理を実行 | |
| result = await self.run_hourly_batch(test_hour) | |
| return { | |
| "success": True, | |
| "message": "バッチ処理テスト完了", | |
| "api_status": api_status, | |
| "batch_result": result | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"バッチ処理テスト中にエラーが発生: {str(e)}" | |
| } | |
| # テスト用の関数 | |
| async def test_batch_scheduler(): | |
| """BatchSchedulerのテスト""" | |
| import tempfile | |
| import uuid | |
| # 一時ディレクトリでテスト | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| test_file = os.path.join(temp_dir, "test_letters.json") | |
| storage = AsyncStorageManager(test_file) | |
| scheduler = BatchScheduler(storage) | |
| print("=== BatchSchedulerテスト開始 ===") | |
| # テスト用のリクエストを作成 | |
| user_id = str(uuid.uuid4()) | |
| success, message = await scheduler.request_manager.submit_request(user_id, "テストテーマ", 2) | |
| print(f"✓ テストリクエスト作成: {success} - {message}") | |
| # バッチ処理テスト | |
| result = await scheduler.run_hourly_batch(2) | |
| print(f"✓ バッチ処理テスト: {result['success']}") | |
| # 統計情報テスト | |
| stats = await scheduler.get_batch_statistics() | |
| print(f"✓ 統計情報取得テスト: {stats}") | |
| # 古いデータ削除テスト | |
| cleanup_result = await scheduler.cleanup_old_data(0) # 全て削除 | |
| print(f"✓ データ削除テスト: {cleanup_result['success']}") | |
| print("=== 全てのテストが完了しました! ===") | |
| if __name__ == "__main__": | |
| asyncio.run(test_batch_scheduler()) |