mari-chat-3 / session_manager.py
sirochild's picture
Upload 57 files
a73fa4e verified
raw
history blame
20.7 kB
"""
セッション管理クラス - セッション分離強化のための基盤実装
このモジュールは、Streamlitアプリケーションにおけるセッション分離を強化し、
複数ユーザー間でのデータ混在を防ぐためのSessionManagerクラスを提供します。
"""
import streamlit as st
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
logger = logging.getLogger(__name__)
class SessionManager:
"""
セッション管理クラス
各ユーザーセッションの独立性を保証し、セッション整合性の検証、
セッション情報の取得機能を提供します。
"""
def __init__(self):
"""
SessionManagerを初期化する
セッションID、ユーザーID、作成時刻を記録し、
セッション固有の識別子を設定します。
"""
# セッション固有の識別子を生成
self.session_id = id(st.session_state)
self.user_id = None # 後でユーザーIDが設定される
self.created_at = datetime.now()
self.last_validated = datetime.now()
self.validation_count = 0
self.recovery_count = 0
# 検証履歴の記録用リスト
self.validation_history: List[Dict[str, Any]] = []
self.recovery_history: List[Dict[str, Any]] = []
logger.info(f"SessionManager initialized - Session ID: {self.session_id}")
def set_user_id(self, user_id: str):
"""
ユーザーIDを設定する
Args:
user_id (str): 設定するユーザーID
"""
self.user_id = user_id
logger.debug(f"User ID set for session {self.session_id}: {user_id}")
def validate_session_integrity(self) -> bool:
"""
セッション整合性を検証する
現在のセッションIDと初期化時のセッションIDを比較し、
セッションの整合性を確認します。
Returns:
bool: セッションが整合している場合True、そうでなければFalse
"""
current_session_id = id(st.session_state)
stored_session_id = st.session_state.get('_session_id')
is_consistent = self.session_id == current_session_id
# 検証回数をカウント
self.validation_count += 1
validation_time = datetime.now()
self.last_validated = validation_time
# 検証履歴を記録
validation_record = {
"timestamp": validation_time.isoformat(),
"validation_count": self.validation_count,
"original_session_id": self.session_id,
"current_session_id": current_session_id,
"stored_session_id": stored_session_id,
"is_consistent": is_consistent,
"session_keys_count": len(st.session_state.keys()),
"user_id": self.user_id
}
# 履歴リストのサイズ制限(最新100件まで保持)
if len(self.validation_history) >= 100:
self.validation_history.pop(0)
self.validation_history.append(validation_record)
if not is_consistent:
logger.warning(
f"Session integrity violation detected! "
f"Original: {self.session_id}, Current: {current_session_id}, "
f"Stored: {stored_session_id}, Validation #{self.validation_count}"
)
else:
logger.debug(f"Session integrity validated successfully (count: {self.validation_count})")
return is_consistent
def recover_session(self):
"""
セッション不整合時の復旧処理
セッションIDの不整合が検出された場合に、
セッション状態を修正して整合性を回復します。
"""
old_session_id = self.session_id
new_session_id = id(st.session_state)
recovery_time = datetime.now()
# セッションIDを現在の値に更新
self.session_id = new_session_id
self.recovery_count += 1
self.last_validated = recovery_time
# 復旧履歴を記録
recovery_record = {
"timestamp": recovery_time.isoformat(),
"recovery_count": self.recovery_count,
"old_session_id": old_session_id,
"new_session_id": new_session_id,
"user_id": self.user_id,
"recovery_type": "session_id_mismatch",
"session_keys_count": len(st.session_state.keys()),
"validation_count_at_recovery": self.validation_count
}
# 履歴リストのサイズ制限(最新50件まで保持)
if len(self.recovery_history) >= 50:
self.recovery_history.pop(0)
self.recovery_history.append(recovery_record)
logger.info(
f"Session recovered - Old ID: {old_session_id}, "
f"New ID: {new_session_id}, Recovery count: {self.recovery_count}"
)
# セッション状態にも新しいIDを記録
st.session_state._session_id = new_session_id
def get_session_info(self) -> Dict[str, Any]:
"""
セッション情報を取得する
現在のセッション状態、検証履歴、復旧履歴などの
詳細な情報を辞書形式で返します。
Returns:
Dict[str, Any]: セッション情報を含む辞書
"""
current_session_id = id(st.session_state)
is_consistent = self.session_id == current_session_id
session_info = {
"session_id": self.session_id,
"current_session_id": current_session_id,
"user_id": self.user_id,
"created_at": self.created_at.isoformat(),
"last_validated": self.last_validated.isoformat(),
"validation_count": self.validation_count,
"recovery_count": self.recovery_count,
"is_consistent": is_consistent,
"session_age_seconds": (datetime.now() - self.created_at).total_seconds(),
"stored_session_id": st.session_state.get('_session_id'),
"session_keys": list(st.session_state.keys()),
"validation_history_count": len(self.validation_history),
"recovery_history_count": len(self.recovery_history),
"last_validation_result": self.validation_history[-1] if self.validation_history else None,
"last_recovery_result": self.recovery_history[-1] if self.recovery_history else None
}
return session_info
def get_validation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
検証履歴を取得する
Args:
limit (int): 取得する履歴の最大件数(デフォルト: 10)
Returns:
List[Dict[str, Any]]: 検証履歴のリスト(新しい順)
"""
return self.validation_history[-limit:] if self.validation_history else []
def get_recovery_history(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
復旧履歴を取得する
Args:
limit (int): 取得する履歴の最大件数(デフォルト: 10)
Returns:
List[Dict[str, Any]]: 復旧履歴のリスト(新しい順)
"""
return self.recovery_history[-limit:] if self.recovery_history else []
def get_isolation_status(self) -> Dict[str, Any]:
"""
セッション分離状態を取得する
各コンポーネントの分離状態を確認し、
セッション独立性の詳細情報を返します。
Returns:
Dict[str, Any]: 分離状態情報を含む辞書
"""
isolation_status = {
"session_isolation": {
"session_manager_present": hasattr(st.session_state, '_session_manager'),
"session_id_consistent": self.validate_session_integrity(),
"user_id_set": self.user_id is not None
},
"component_isolation": {
"chat_isolated": 'chat' in st.session_state,
"memory_isolated": 'memory_manager' in st.session_state,
"notifications_isolated": all(key in st.session_state for key in [
'memory_notifications', 'affection_notifications'
]),
"rate_limit_isolated": 'chat' in st.session_state and
'limiter_state' in st.session_state.get('chat', {})
},
"data_integrity": {
"chat_messages_count": len(st.session_state.get('chat', {}).get('messages', [])),
"memory_cache_size": len(getattr(st.session_state.get('memory_manager'), 'important_words_cache', [])),
"special_memories_count": len(getattr(st.session_state.get('memory_manager'), 'special_memories', [])),
"pending_notifications": {
"memory": len(st.session_state.get('memory_notifications', [])),
"affection": len(st.session_state.get('affection_notifications', []))
}
}
}
return isolation_status
def reset_session_data(self):
"""
セッションデータを完全にリセットする
フルリセット時に使用
"""
try:
# 検証履歴をクリア
self.validation_history.clear()
self.recovery_history.clear()
# カウンターをリセット
self.validation_count = 0
self.recovery_count = 0
# タイムスタンプを更新
self.last_validated = datetime.now()
logger.info("SessionManagerのデータをリセットしました")
except Exception as e:
logger.error(f"SessionManagerリセットエラー: {e}")
def get_isolation_summary(self) -> Dict[str, str]:
"""
セッション分離状態のサマリーを取得する
デバッグ表示用の簡潔な分離状態情報を返します。
Returns:
Dict[str, str]: 分離状態のサマリー情報
"""
isolation_status = self.get_isolation_status()
# 各カテゴリの状態を評価
session_ok = all(isolation_status["session_isolation"].values())
components_ok = all(isolation_status["component_isolation"].values())
# データ整合性の評価
data_integrity = isolation_status["data_integrity"]
data_ok = (
data_integrity["chat_messages_count"] >= 0 and
data_integrity["memory_cache_size"] >= 0 and
data_integrity["special_memories_count"] >= 0
)
# 全体的な健全性評価
overall_health = "正常" if (session_ok and components_ok and data_ok) else "要注意"
summary = {
"overall_health": overall_health,
"session_isolation": "✅ 正常" if session_ok else "❌ 問題あり",
"component_isolation": "✅ 正常" if components_ok else "❌ 問題あり",
"data_integrity": "✅ 正常" if data_ok else "❌ 問題あり",
"validation_status": f"検証{self.validation_count}回/復旧{self.recovery_count}回",
"session_age": f"{round((datetime.now() - self.created_at).total_seconds() / 60, 1)}分",
"last_check": f"{round((datetime.now() - self.last_validated).total_seconds(), 1)}秒前"
}
return summary
def __str__(self) -> str:
"""
SessionManagerの文字列表現
Returns:
str: セッション情報の要約
"""
return (
f"SessionManager(session_id={self.session_id}, "
f"user_id={self.user_id}, "
f"validations={self.validation_count}, "
f"recoveries={self.recovery_count})"
)
def __repr__(self) -> str:
"""
SessionManagerの詳細な文字列表現
Returns:
str: セッション情報の詳細
"""
return self.__str__()
def get_session_manager() -> SessionManager:
"""
現在のセッションのSessionManagerインスタンスを取得する
セッション状態にSessionManagerが存在しない場合は新規作成します。
Returns:
SessionManager: 現在のセッションのSessionManagerインスタンス
"""
if '_session_manager' not in st.session_state:
st.session_state._session_manager = SessionManager()
logger.info("New SessionManager created for current session")
return st.session_state._session_manager
def validate_session_state() -> bool:
"""
現在のセッション状態を検証する
SessionManagerを使用してセッション整合性をチェックし、
不整合が検出された場合は自動復旧を試行します。
Returns:
bool: セッションが整合している場合True、復旧に失敗した場合False
"""
try:
session_manager = get_session_manager()
# 基本的なセッション整合性チェック
if not session_manager.validate_session_integrity():
logger.warning("Session inconsistency detected, attempting recovery...")
session_manager.recover_session()
# 復旧後に再度検証
if session_manager.validate_session_integrity():
logger.info("Session recovery successful")
else:
logger.error("Session recovery failed")
return False
# 追加の整合性チェック
validation_results = perform_detailed_session_validation(session_manager)
# 重要な不整合が検出された場合はログに記録
critical_issues = [issue for issue in validation_results if issue.get('severity') == 'critical']
if critical_issues:
logger.error(f"Critical session validation issues detected: {critical_issues}")
return False
# 警告レベルの問題はログに記録するが、処理は継続
warning_issues = [issue for issue in validation_results if issue.get('severity') == 'warning']
if warning_issues:
logger.warning(f"Session validation warnings: {warning_issues}")
return True
except Exception as e:
logger.error(f"Session validation failed with exception: {e}")
return False
def perform_detailed_session_validation(session_manager: SessionManager) -> List[Dict[str, Any]]:
"""
詳細なセッション検証を実行する
Args:
session_manager (SessionManager): 検証対象のSessionManagerインスタンス
Returns:
List[Dict[str, Any]]: 検証結果のリスト(問題が検出された場合のみ)
"""
validation_issues = []
try:
# 1. セッションID比較による整合性チェック
current_session_id = id(st.session_state)
stored_session_id = st.session_state.get('_session_id')
if session_manager.session_id != current_session_id:
validation_issues.append({
"type": "session_id_mismatch",
"severity": "critical",
"description": "SessionManager session_id does not match current session",
"details": {
"manager_session_id": session_manager.session_id,
"current_session_id": current_session_id
}
})
if stored_session_id and stored_session_id != current_session_id:
validation_issues.append({
"type": "stored_session_id_mismatch",
"severity": "warning",
"description": "Stored session_id does not match current session",
"details": {
"stored_session_id": stored_session_id,
"current_session_id": current_session_id
}
})
# 2. 必須セッション状態の存在チェック(初期化後のみ)
# 初期化中の場合はこのチェックをスキップ
if st.session_state.get('_initialization_complete', False):
required_keys = ['user_id', 'chat', 'memory_manager']
for key in required_keys:
if key not in st.session_state:
validation_issues.append({
"type": "missing_required_key",
"severity": "critical",
"description": f"Required session state key '{key}' is missing",
"details": {"missing_key": key}
})
# 3. チャット状態の整合性チェック
if 'chat' in st.session_state:
chat_state = st.session_state.chat
required_chat_keys = ['messages', 'affection', 'scene_params', 'limiter_state']
for key in required_chat_keys:
if key not in chat_state:
validation_issues.append({
"type": "missing_chat_key",
"severity": "warning",
"description": f"Required chat state key '{key}' is missing",
"details": {"missing_chat_key": key}
})
# 好感度の範囲チェック
affection = chat_state.get('affection', 0)
if not (0 <= affection <= 100):
validation_issues.append({
"type": "invalid_affection_value",
"severity": "warning",
"description": "Affection value is out of valid range (0-100)",
"details": {"affection_value": affection}
})
# 4. MemoryManagerの整合性チェック
if 'memory_manager' in st.session_state:
memory_manager = st.session_state.memory_manager
if not hasattr(memory_manager, 'important_words_cache'):
validation_issues.append({
"type": "invalid_memory_manager",
"severity": "warning",
"description": "MemoryManager instance is missing required attributes",
"details": {"missing_attribute": "important_words_cache"}
})
# 5. ユーザーIDの整合性チェック
session_user_id = st.session_state.get('user_id')
manager_user_id = session_manager.user_id
if session_user_id and manager_user_id and session_user_id != manager_user_id:
validation_issues.append({
"type": "user_id_mismatch",
"severity": "warning",
"description": "User ID mismatch between session state and SessionManager",
"details": {
"session_user_id": session_user_id,
"manager_user_id": manager_user_id
}
})
# 6. 通知リストの整合性チェック
notification_keys = ['memory_notifications', 'affection_notifications']
for key in notification_keys:
if key in st.session_state:
notifications = st.session_state[key]
if not isinstance(notifications, list):
validation_issues.append({
"type": "invalid_notification_type",
"severity": "warning",
"description": f"Notification key '{key}' is not a list",
"details": {"key": key, "type": type(notifications).__name__}
})
except Exception as e:
validation_issues.append({
"type": "validation_exception",
"severity": "critical",
"description": "Exception occurred during detailed validation",
"details": {"exception": str(e)}
})
return validation_issues