Spaces:
Running
Running
| # learning_hub/memory_store.py | |
| import json | |
| import asyncio | |
| from datetime import datetime | |
| # 🔴 --- START OF CHANGE --- 🔴 | |
| from typing import List, Dict, Optional, Any # (Import Any here) | |
| # 🔴 --- END OF CHANGE --- 🔴 | |
| from .schemas import Delta, ReflectorOutput | |
| from .policy_engine import PolicyEngine | |
| # للتوافق مع R2Service (نفترض أنه سيتم تمريره) | |
| # (لا يمكننا استيراده مباشرة لتجنب التبعيات الدائرية) | |
| class MemoryStore: | |
| # (The __init__ signature now correctly uses the imported Any) | |
| def __init__(self, r2_service: Any, policy_engine: PolicyEngine, llm_service: Any): | |
| self.r2_service = r2_service | |
| self.policy_engine = policy_engine | |
| self.llm_service = llm_service # نحتاجه لعملية "التقطير" (Distillation) | |
| self.domain_files = { | |
| "strategy": "learning_deltas_strategy.json", | |
| "pattern": "learning_deltas_pattern.json", | |
| "indicator": "learning_deltas_indicator.json", | |
| "monte_carlo": "learning_deltas_monte_carlo.json", | |
| "general": "learning_deltas_general.json" | |
| } | |
| self.distill_threshold = 50 # (من النقطة 6) | |
| print("✅ Learning Hub Module: Memory Store loaded (FIXED: Imported Any)") | |
| async def _load_deltas_from_r2(self, domain: str) -> List[Dict]: | |
| """تحميل ملف الدلتا المحدد من R2""" | |
| key = self.domain_files.get(domain, self.domain_files["general"]) | |
| try: | |
| response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) | |
| deltas_list = json.loads(response['Body'].read()) | |
| return deltas_list | |
| except Exception as e: | |
| # إذا فشل التحميل (مثل الملف غير موجود)، أعد قائمة فارغة | |
| print(f"ℹ️ [MemoryStore] لم يتم العثور على ملف دلتا لـ {domain}، سيتم إنشاء واحد جديد.") | |
| return [] | |
| async def _save_deltas_to_r2(self, domain: str, deltas_list: List[Dict]): | |
| """حفظ ملف الدلتا المحدث إلى R2""" | |
| key = self.domain_files.get(domain, self.domain_files["general"]) | |
| try: | |
| # (Ensure list contains dicts before saving) | |
| deltas_to_save = [d.model_dump() if isinstance(d, Delta) else d for d in deltas_list] | |
| data_json = json.dumps(deltas_to_save, indent=2, ensure_ascii=False).encode('utf-8') | |
| self.r2_service.s3_client.put_object( | |
| Bucket="trading", Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| except Exception as e: | |
| print(f"❌ [MemoryStore] فشل حفظ الدلتا إلى R2: {e}") | |
| async def save_new_delta(self, | |
| reflector_output: ReflectorOutput, | |
| trade_object: Dict[str, Any], | |
| domain: str = "strategy"): | |
| """ | |
| حفظ "دلتا" جديدة بناءً على مخرجات المنعكس وسياسة القبول. | |
| """ | |
| try: | |
| trade_pnl_percent = trade_object.get('pnl_percent', 0) | |
| # 1. التحقق من سياسة القبول | |
| is_approved, approval_reason = self.policy_engine.get_delta_acceptance( | |
| reflector_output, | |
| trade_pnl_percent | |
| ) | |
| # 2. إنشاء كائن الدلتا | |
| new_delta = Delta( | |
| text=reflector_output.suggested_rule, | |
| domain=domain, | |
| score=reflector_output.confidence, | |
| evidence_refs=[trade_object.get('id', 'unknown_trade_id')], | |
| approved=is_approved, | |
| trade_strategy=trade_object.get('strategy', 'unknown'), | |
| exit_profile=trade_object.get('decision_data', {}).get('exit_profile', 'unknown') | |
| ) | |
| # 3. تحميل، إضافة، وحفظ الدلتا | |
| deltas_list = await self._load_deltas_from_r2(domain) | |
| deltas_list.append(new_delta.model_dump()) # Use model_dump() for pydantic models | |
| await self._save_deltas_to_r2(domain, deltas_list) | |
| print(f"✅ [MemoryStore] تم حفظ دلتا جديدة لـ {domain}. الحالة: {approval_reason}") | |
| # 4. تفعيل عملية "التقطير" (لا يتم تنفيذها هنا مباشرة) | |
| if len([d for d in deltas_list if d.get('approved')]) % self.distill_threshold == 0 and is_approved: | |
| print(f"ℹ️ [MemoryStore] تم الوصول إلى حد {self.distill_threshold} دلتا لـ {domain}. التقطير سيتم جدولته.") | |
| # (Curator will handle the actual check and distillation) | |
| except Exception as e: | |
| print(f"❌ [MemoryStore] فشل فادح في حفظ الدلتا: {e}") | |
| traceback.print_exc() | |
| async def get_active_context(self, domain: str, query: str, top_k: int = 3) -> str: | |
| """ | |
| جلب "السياق النشط" (Active Context) لإرساله إلى النموذج. | |
| """ | |
| try: | |
| all_deltas_dicts = await self._load_deltas_from_r2(domain) | |
| # 1. تصفية الدلتا المعتمدة فقط | |
| approved_deltas = [Delta(**d) for d in all_deltas_dicts if d.get('approved', False)] | |
| if not approved_deltas: | |
| # (Return English text for consistency) | |
| return "Playbook: No approved learning rules (Deltas) found for this domain yet." | |
| # 2. خوارزمية الاسترجاع (نسخة مبسطة) | |
| scored_deltas = [] | |
| for delta in approved_deltas: | |
| priority_map = {"high": 1.0, "medium": 0.6, "low": 0.2} | |
| priority_score = priority_map.get(delta.priority, 0.6) | |
| try: | |
| age_days = (datetime.now() - datetime.fromisoformat(delta.created_at)).days | |
| freshness_score = max(0, 1.0 - (age_days / 90.0)) | |
| except Exception: | |
| freshness_score = 0.5 | |
| relevance_score = 0.5 | |
| query_words = set(query.lower().split()) | |
| delta_words = set(delta.text.lower().split()) | |
| if query_words.intersection(delta_words): | |
| relevance_score = 1.0 | |
| elif delta.trade_strategy and delta.trade_strategy.lower() in query_words: | |
| relevance_score = 0.8 | |
| final_score = (0.6 * relevance_score) + (0.3 * priority_score) + (0.1 * freshness_score) | |
| scored_deltas.append((final_score, delta)) | |
| # 3. فرز واختيار أفضل K | |
| scored_deltas.sort(key=lambda x: x[0], reverse=True) | |
| top_deltas = [delta for score, delta in scored_deltas[:top_k]] | |
| # 4. تنسيق الموجه (باللغة الإنجليزية) | |
| if not top_deltas: | |
| return "Playbook: No relevant learning rules (Deltas) found for this query." | |
| playbook_header = f"Playbook (Top {len(top_deltas)} Rules - Domain: {domain}):" | |
| # (Added score for context) | |
| delta_lines = [f"• {delta.text} (Score: {delta.score:.2f}, Prio: {delta.priority})" for delta in top_deltas] | |
| # (Distilled rules are just high-priority deltas in this implementation) | |
| return "\n".join([playbook_header] + delta_lines) | |
| except Exception as e: | |
| print(f"❌ [MemoryStore] فشل جلب السياق النشط: {e}") | |
| # (Return English text for consistency) | |
| return "Playbook: Error retrieving learning context." |