Spaces:
Runtime error
Runtime error
| """ | |
| セッション管理クラス - セッション分離強化のための基盤実装 | |
| このモジュールは、Streamlitアプリケーションにおけるセッション分離を強化し、 | |
| 複数ユーザー間でのデータ混在を防ぐためのSessionManagerクラスを提供します。 | |
| """ | |
| import streamlit as st | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional, List | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
| class SessionManager: | |
| """ | |
| セッション管理クラス | |
| 各ユーザーセッションの独立性を保証し、セッション整合性の検証、 | |
| セッション情報の取得機能を提供します。 | |
| """ | |
| def __init__(self): | |
| """ | |
| SessionManagerを初期化する | |
| セッションID、ユーザーID、作成時刻を記録し、 | |
| セッション固有の識別子を設定します。 | |
| """ | |
| # セッション固有の識別子を生成 | |
| self.session_id = id(st.session_state) | |
| self.user_id = None # 後でユーザーIDが設定される | |
| self.created_at = datetime.now() | |
| self.last_validated = datetime.now() | |
| self.validation_count = 0 | |
| self.recovery_count = 0 | |
| # 検証履歴の記録用リスト | |
| self.validation_history: List[Dict[str, Any]] = [] | |
| self.recovery_history: List[Dict[str, Any]] = [] | |
| logger.info(f"SessionManager initialized - Session ID: {self.session_id}") | |
| def set_user_id(self, user_id: str): | |
| """ | |
| ユーザーIDを設定する | |
| Args: | |
| user_id (str): 設定するユーザーID | |
| """ | |
| self.user_id = user_id | |
| logger.debug(f"User ID set for session {self.session_id}: {user_id}") | |
| def validate_session_integrity(self) -> bool: | |
| """ | |
| セッション整合性を検証する | |
| 現在のセッションIDと初期化時のセッションIDを比較し、 | |
| セッションの整合性を確認します。 | |
| Returns: | |
| bool: セッションが整合している場合True、そうでなければFalse | |
| """ | |
| current_session_id = id(st.session_state) | |
| stored_session_id = st.session_state.get('_session_id') | |
| is_consistent = self.session_id == current_session_id | |
| # 検証回数をカウント | |
| self.validation_count += 1 | |
| validation_time = datetime.now() | |
| self.last_validated = validation_time | |
| # 検証履歴を記録 | |
| validation_record = { | |
| "timestamp": validation_time.isoformat(), | |
| "validation_count": self.validation_count, | |
| "original_session_id": self.session_id, | |
| "current_session_id": current_session_id, | |
| "stored_session_id": stored_session_id, | |
| "is_consistent": is_consistent, | |
| "session_keys_count": len(st.session_state.keys()), | |
| "user_id": self.user_id | |
| } | |
| # 履歴リストのサイズ制限(最新100件まで保持) | |
| if len(self.validation_history) >= 100: | |
| self.validation_history.pop(0) | |
| self.validation_history.append(validation_record) | |
| if not is_consistent: | |
| logger.warning( | |
| f"Session integrity violation detected! " | |
| f"Original: {self.session_id}, Current: {current_session_id}, " | |
| f"Stored: {stored_session_id}, Validation #{self.validation_count}" | |
| ) | |
| else: | |
| logger.debug(f"Session integrity validated successfully (count: {self.validation_count})") | |
| return is_consistent | |
| def recover_session(self): | |
| """ | |
| セッション不整合時の復旧処理 | |
| セッションIDの不整合が検出された場合に、 | |
| セッション状態を修正して整合性を回復します。 | |
| """ | |
| old_session_id = self.session_id | |
| new_session_id = id(st.session_state) | |
| recovery_time = datetime.now() | |
| # セッションIDを現在の値に更新 | |
| self.session_id = new_session_id | |
| self.recovery_count += 1 | |
| self.last_validated = recovery_time | |
| # 復旧履歴を記録 | |
| recovery_record = { | |
| "timestamp": recovery_time.isoformat(), | |
| "recovery_count": self.recovery_count, | |
| "old_session_id": old_session_id, | |
| "new_session_id": new_session_id, | |
| "user_id": self.user_id, | |
| "recovery_type": "session_id_mismatch", | |
| "session_keys_count": len(st.session_state.keys()), | |
| "validation_count_at_recovery": self.validation_count | |
| } | |
| # 履歴リストのサイズ制限(最新50件まで保持) | |
| if len(self.recovery_history) >= 50: | |
| self.recovery_history.pop(0) | |
| self.recovery_history.append(recovery_record) | |
| logger.info( | |
| f"Session recovered - Old ID: {old_session_id}, " | |
| f"New ID: {new_session_id}, Recovery count: {self.recovery_count}" | |
| ) | |
| # セッション状態にも新しいIDを記録 | |
| st.session_state._session_id = new_session_id | |
| def get_session_info(self) -> Dict[str, Any]: | |
| """ | |
| セッション情報を取得する | |
| 現在のセッション状態、検証履歴、復旧履歴などの | |
| 詳細な情報を辞書形式で返します。 | |
| Returns: | |
| Dict[str, Any]: セッション情報を含む辞書 | |
| """ | |
| current_session_id = id(st.session_state) | |
| is_consistent = self.session_id == current_session_id | |
| session_info = { | |
| "session_id": self.session_id, | |
| "current_session_id": current_session_id, | |
| "user_id": self.user_id, | |
| "created_at": self.created_at.isoformat(), | |
| "last_validated": self.last_validated.isoformat(), | |
| "validation_count": self.validation_count, | |
| "recovery_count": self.recovery_count, | |
| "is_consistent": is_consistent, | |
| "session_age_seconds": (datetime.now() - self.created_at).total_seconds(), | |
| "stored_session_id": st.session_state.get('_session_id'), | |
| "session_keys": list(st.session_state.keys()), | |
| "validation_history_count": len(self.validation_history), | |
| "recovery_history_count": len(self.recovery_history), | |
| "last_validation_result": self.validation_history[-1] if self.validation_history else None, | |
| "last_recovery_result": self.recovery_history[-1] if self.recovery_history else None | |
| } | |
| return session_info | |
| def get_validation_history(self, limit: int = 10) -> List[Dict[str, Any]]: | |
| """ | |
| 検証履歴を取得する | |
| Args: | |
| limit (int): 取得する履歴の最大件数(デフォルト: 10) | |
| Returns: | |
| List[Dict[str, Any]]: 検証履歴のリスト(新しい順) | |
| """ | |
| return self.validation_history[-limit:] if self.validation_history else [] | |
| def get_recovery_history(self, limit: int = 10) -> List[Dict[str, Any]]: | |
| """ | |
| 復旧履歴を取得する | |
| Args: | |
| limit (int): 取得する履歴の最大件数(デフォルト: 10) | |
| Returns: | |
| List[Dict[str, Any]]: 復旧履歴のリスト(新しい順) | |
| """ | |
| return self.recovery_history[-limit:] if self.recovery_history else [] | |
| def get_isolation_status(self) -> Dict[str, Any]: | |
| """ | |
| セッション分離状態を取得する | |
| 各コンポーネントの分離状態を確認し、 | |
| セッション独立性の詳細情報を返します。 | |
| Returns: | |
| Dict[str, Any]: 分離状態情報を含む辞書 | |
| """ | |
| isolation_status = { | |
| "session_isolation": { | |
| "session_manager_present": hasattr(st.session_state, '_session_manager'), | |
| "session_id_consistent": self.validate_session_integrity(), | |
| "user_id_set": self.user_id is not None | |
| }, | |
| "component_isolation": { | |
| "chat_isolated": 'chat' in st.session_state, | |
| "memory_isolated": 'memory_manager' in st.session_state, | |
| "notifications_isolated": all(key in st.session_state for key in [ | |
| 'memory_notifications', 'affection_notifications' | |
| ]), | |
| "rate_limit_isolated": 'chat' in st.session_state and | |
| 'limiter_state' in st.session_state.get('chat', {}) | |
| }, | |
| "data_integrity": { | |
| "chat_messages_count": len(st.session_state.get('chat', {}).get('messages', [])), | |
| "memory_cache_size": len(getattr(st.session_state.get('memory_manager'), 'important_words_cache', [])), | |
| "special_memories_count": len(getattr(st.session_state.get('memory_manager'), 'special_memories', [])), | |
| "pending_notifications": { | |
| "memory": len(st.session_state.get('memory_notifications', [])), | |
| "affection": len(st.session_state.get('affection_notifications', [])) | |
| } | |
| } | |
| } | |
| return isolation_status | |
| def reset_session_data(self): | |
| """ | |
| セッションデータを完全にリセットする | |
| フルリセット時に使用 | |
| """ | |
| try: | |
| # 検証履歴をクリア | |
| self.validation_history.clear() | |
| self.recovery_history.clear() | |
| # カウンターをリセット | |
| self.validation_count = 0 | |
| self.recovery_count = 0 | |
| # タイムスタンプを更新 | |
| self.last_validated = datetime.now() | |
| logger.info("SessionManagerのデータをリセットしました") | |
| except Exception as e: | |
| logger.error(f"SessionManagerリセットエラー: {e}") | |
| def get_isolation_summary(self) -> Dict[str, str]: | |
| """ | |
| セッション分離状態のサマリーを取得する | |
| デバッグ表示用の簡潔な分離状態情報を返します。 | |
| Returns: | |
| Dict[str, str]: 分離状態のサマリー情報 | |
| """ | |
| isolation_status = self.get_isolation_status() | |
| # 各カテゴリの状態を評価 | |
| session_ok = all(isolation_status["session_isolation"].values()) | |
| components_ok = all(isolation_status["component_isolation"].values()) | |
| # データ整合性の評価 | |
| data_integrity = isolation_status["data_integrity"] | |
| data_ok = ( | |
| data_integrity["chat_messages_count"] >= 0 and | |
| data_integrity["memory_cache_size"] >= 0 and | |
| data_integrity["special_memories_count"] >= 0 | |
| ) | |
| # 全体的な健全性評価 | |
| overall_health = "正常" if (session_ok and components_ok and data_ok) else "要注意" | |
| summary = { | |
| "overall_health": overall_health, | |
| "session_isolation": "✅ 正常" if session_ok else "❌ 問題あり", | |
| "component_isolation": "✅ 正常" if components_ok else "❌ 問題あり", | |
| "data_integrity": "✅ 正常" if data_ok else "❌ 問題あり", | |
| "validation_status": f"検証{self.validation_count}回/復旧{self.recovery_count}回", | |
| "session_age": f"{round((datetime.now() - self.created_at).total_seconds() / 60, 1)}分", | |
| "last_check": f"{round((datetime.now() - self.last_validated).total_seconds(), 1)}秒前" | |
| } | |
| return summary | |
| def __str__(self) -> str: | |
| """ | |
| SessionManagerの文字列表現 | |
| Returns: | |
| str: セッション情報の要約 | |
| """ | |
| return ( | |
| f"SessionManager(session_id={self.session_id}, " | |
| f"user_id={self.user_id}, " | |
| f"validations={self.validation_count}, " | |
| f"recoveries={self.recovery_count})" | |
| ) | |
| def __repr__(self) -> str: | |
| """ | |
| SessionManagerの詳細な文字列表現 | |
| Returns: | |
| str: セッション情報の詳細 | |
| """ | |
| return self.__str__() | |
| def get_session_manager() -> SessionManager: | |
| """ | |
| 現在のセッションのSessionManagerインスタンスを取得する | |
| セッション状態にSessionManagerが存在しない場合は新規作成します。 | |
| Returns: | |
| SessionManager: 現在のセッションのSessionManagerインスタンス | |
| """ | |
| if '_session_manager' not in st.session_state: | |
| st.session_state._session_manager = SessionManager() | |
| logger.info("New SessionManager created for current session") | |
| return st.session_state._session_manager | |
| def validate_session_state() -> bool: | |
| """ | |
| 現在のセッション状態を検証する | |
| SessionManagerを使用してセッション整合性をチェックし、 | |
| 不整合が検出された場合は自動復旧を試行します。 | |
| Returns: | |
| bool: セッションが整合している場合True、復旧に失敗した場合False | |
| """ | |
| try: | |
| session_manager = get_session_manager() | |
| # 基本的なセッション整合性チェック | |
| if not session_manager.validate_session_integrity(): | |
| logger.warning("Session inconsistency detected, attempting recovery...") | |
| session_manager.recover_session() | |
| # 復旧後に再度検証 | |
| if session_manager.validate_session_integrity(): | |
| logger.info("Session recovery successful") | |
| else: | |
| logger.error("Session recovery failed") | |
| return False | |
| # 追加の整合性チェック | |
| validation_results = perform_detailed_session_validation(session_manager) | |
| # 重要な不整合が検出された場合はログに記録 | |
| critical_issues = [issue for issue in validation_results if issue.get('severity') == 'critical'] | |
| if critical_issues: | |
| logger.error(f"Critical session validation issues detected: {critical_issues}") | |
| return False | |
| # 警告レベルの問題はログに記録するが、処理は継続 | |
| warning_issues = [issue for issue in validation_results if issue.get('severity') == 'warning'] | |
| if warning_issues: | |
| logger.warning(f"Session validation warnings: {warning_issues}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Session validation failed with exception: {e}") | |
| return False | |
| def perform_detailed_session_validation(session_manager: SessionManager) -> List[Dict[str, Any]]: | |
| """ | |
| 詳細なセッション検証を実行する | |
| Args: | |
| session_manager (SessionManager): 検証対象のSessionManagerインスタンス | |
| Returns: | |
| List[Dict[str, Any]]: 検証結果のリスト(問題が検出された場合のみ) | |
| """ | |
| validation_issues = [] | |
| try: | |
| # 1. セッションID比較による整合性チェック | |
| current_session_id = id(st.session_state) | |
| stored_session_id = st.session_state.get('_session_id') | |
| if session_manager.session_id != current_session_id: | |
| validation_issues.append({ | |
| "type": "session_id_mismatch", | |
| "severity": "critical", | |
| "description": "SessionManager session_id does not match current session", | |
| "details": { | |
| "manager_session_id": session_manager.session_id, | |
| "current_session_id": current_session_id | |
| } | |
| }) | |
| if stored_session_id and stored_session_id != current_session_id: | |
| validation_issues.append({ | |
| "type": "stored_session_id_mismatch", | |
| "severity": "warning", | |
| "description": "Stored session_id does not match current session", | |
| "details": { | |
| "stored_session_id": stored_session_id, | |
| "current_session_id": current_session_id | |
| } | |
| }) | |
| # 2. 必須セッション状態の存在チェック(初期化後のみ) | |
| # 初期化中の場合はこのチェックをスキップ | |
| if st.session_state.get('_initialization_complete', False): | |
| required_keys = ['user_id', 'chat', 'memory_manager'] | |
| for key in required_keys: | |
| if key not in st.session_state: | |
| validation_issues.append({ | |
| "type": "missing_required_key", | |
| "severity": "critical", | |
| "description": f"Required session state key '{key}' is missing", | |
| "details": {"missing_key": key} | |
| }) | |
| # 3. チャット状態の整合性チェック | |
| if 'chat' in st.session_state: | |
| chat_state = st.session_state.chat | |
| required_chat_keys = ['messages', 'affection', 'scene_params', 'limiter_state'] | |
| for key in required_chat_keys: | |
| if key not in chat_state: | |
| validation_issues.append({ | |
| "type": "missing_chat_key", | |
| "severity": "warning", | |
| "description": f"Required chat state key '{key}' is missing", | |
| "details": {"missing_chat_key": key} | |
| }) | |
| # 好感度の範囲チェック | |
| affection = chat_state.get('affection', 0) | |
| if not (0 <= affection <= 100): | |
| validation_issues.append({ | |
| "type": "invalid_affection_value", | |
| "severity": "warning", | |
| "description": "Affection value is out of valid range (0-100)", | |
| "details": {"affection_value": affection} | |
| }) | |
| # 4. MemoryManagerの整合性チェック | |
| if 'memory_manager' in st.session_state: | |
| memory_manager = st.session_state.memory_manager | |
| if not hasattr(memory_manager, 'important_words_cache'): | |
| validation_issues.append({ | |
| "type": "invalid_memory_manager", | |
| "severity": "warning", | |
| "description": "MemoryManager instance is missing required attributes", | |
| "details": {"missing_attribute": "important_words_cache"} | |
| }) | |
| # 5. ユーザーIDの整合性チェック | |
| session_user_id = st.session_state.get('user_id') | |
| manager_user_id = session_manager.user_id | |
| if session_user_id and manager_user_id and session_user_id != manager_user_id: | |
| validation_issues.append({ | |
| "type": "user_id_mismatch", | |
| "severity": "warning", | |
| "description": "User ID mismatch between session state and SessionManager", | |
| "details": { | |
| "session_user_id": session_user_id, | |
| "manager_user_id": manager_user_id | |
| } | |
| }) | |
| # 6. 通知リストの整合性チェック | |
| notification_keys = ['memory_notifications', 'affection_notifications'] | |
| for key in notification_keys: | |
| if key in st.session_state: | |
| notifications = st.session_state[key] | |
| if not isinstance(notifications, list): | |
| validation_issues.append({ | |
| "type": "invalid_notification_type", | |
| "severity": "warning", | |
| "description": f"Notification key '{key}' is not a list", | |
| "details": {"key": key, "type": type(notifications).__name__} | |
| }) | |
| except Exception as e: | |
| validation_issues.append({ | |
| "type": "validation_exception", | |
| "severity": "critical", | |
| "description": "Exception occurred during detailed validation", | |
| "details": {"exception": str(e)} | |
| }) | |
| return validation_issues |