File size: 7,933 Bytes
59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef 59250c3 11e97ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# learning_hub/memory_store.py
import json
import asyncio
from datetime import datetime
# 🔴 --- START OF CHANGE --- 🔴
from typing import List, Dict, Optional, Any # (Import Any here)
# 🔴 --- END OF CHANGE --- 🔴
from .schemas import Delta, ReflectorOutput
from .policy_engine import PolicyEngine
# للتوافق مع R2Service (نفترض أنه سيتم تمريره)
# (لا يمكننا استيراده مباشرة لتجنب التبعيات الدائرية)
class MemoryStore:
# (The __init__ signature now correctly uses the imported Any)
def __init__(self, r2_service: Any, policy_engine: PolicyEngine, llm_service: Any):
self.r2_service = r2_service
self.policy_engine = policy_engine
self.llm_service = llm_service # نحتاجه لعملية "التقطير" (Distillation)
self.domain_files = {
"strategy": "learning_deltas_strategy.json",
"pattern": "learning_deltas_pattern.json",
"indicator": "learning_deltas_indicator.json",
"monte_carlo": "learning_deltas_monte_carlo.json",
"general": "learning_deltas_general.json"
}
self.distill_threshold = 50 # (من النقطة 6)
print("✅ Learning Hub Module: Memory Store loaded (FIXED: Imported Any)")
async def _load_deltas_from_r2(self, domain: str) -> List[Dict]:
"""تحميل ملف الدلتا المحدد من R2"""
key = self.domain_files.get(domain, self.domain_files["general"])
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
deltas_list = json.loads(response['Body'].read())
return deltas_list
except Exception as e:
# إذا فشل التحميل (مثل الملف غير موجود)، أعد قائمة فارغة
print(f"ℹ️ [MemoryStore] لم يتم العثور على ملف دلتا لـ {domain}، سيتم إنشاء واحد جديد.")
return []
async def _save_deltas_to_r2(self, domain: str, deltas_list: List[Dict]):
"""حفظ ملف الدلتا المحدث إلى R2"""
key = self.domain_files.get(domain, self.domain_files["general"])
try:
# (Ensure list contains dicts before saving)
deltas_to_save = [d.model_dump() if isinstance(d, Delta) else d for d in deltas_list]
data_json = json.dumps(deltas_to_save, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ [MemoryStore] فشل حفظ الدلتا إلى R2: {e}")
async def save_new_delta(self,
reflector_output: ReflectorOutput,
trade_object: Dict[str, Any],
domain: str = "strategy"):
"""
حفظ "دلتا" جديدة بناءً على مخرجات المنعكس وسياسة القبول.
"""
try:
trade_pnl_percent = trade_object.get('pnl_percent', 0)
# 1. التحقق من سياسة القبول
is_approved, approval_reason = self.policy_engine.get_delta_acceptance(
reflector_output,
trade_pnl_percent
)
# 2. إنشاء كائن الدلتا
new_delta = Delta(
text=reflector_output.suggested_rule,
domain=domain,
score=reflector_output.confidence,
evidence_refs=[trade_object.get('id', 'unknown_trade_id')],
approved=is_approved,
trade_strategy=trade_object.get('strategy', 'unknown'),
exit_profile=trade_object.get('decision_data', {}).get('exit_profile', 'unknown')
)
# 3. تحميل، إضافة، وحفظ الدلتا
deltas_list = await self._load_deltas_from_r2(domain)
deltas_list.append(new_delta.model_dump()) # Use model_dump() for pydantic models
await self._save_deltas_to_r2(domain, deltas_list)
print(f"✅ [MemoryStore] تم حفظ دلتا جديدة لـ {domain}. الحالة: {approval_reason}")
# 4. تفعيل عملية "التقطير" (لا يتم تنفيذها هنا مباشرة)
if len([d for d in deltas_list if d.get('approved')]) % self.distill_threshold == 0 and is_approved:
print(f"ℹ️ [MemoryStore] تم الوصول إلى حد {self.distill_threshold} دلتا لـ {domain}. التقطير سيتم جدولته.")
# (Curator will handle the actual check and distillation)
except Exception as e:
print(f"❌ [MemoryStore] فشل فادح في حفظ الدلتا: {e}")
traceback.print_exc()
async def get_active_context(self, domain: str, query: str, top_k: int = 3) -> str:
"""
جلب "السياق النشط" (Active Context) لإرساله إلى النموذج.
"""
try:
all_deltas_dicts = await self._load_deltas_from_r2(domain)
# 1. تصفية الدلتا المعتمدة فقط
approved_deltas = [Delta(**d) for d in all_deltas_dicts if d.get('approved', False)]
if not approved_deltas:
# (Return English text for consistency)
return "Playbook: No approved learning rules (Deltas) found for this domain yet."
# 2. خوارزمية الاسترجاع (نسخة مبسطة)
scored_deltas = []
for delta in approved_deltas:
priority_map = {"high": 1.0, "medium": 0.6, "low": 0.2}
priority_score = priority_map.get(delta.priority, 0.6)
try:
age_days = (datetime.now() - datetime.fromisoformat(delta.created_at)).days
freshness_score = max(0, 1.0 - (age_days / 90.0))
except Exception:
freshness_score = 0.5
relevance_score = 0.5
query_words = set(query.lower().split())
delta_words = set(delta.text.lower().split())
if query_words.intersection(delta_words):
relevance_score = 1.0
elif delta.trade_strategy and delta.trade_strategy.lower() in query_words:
relevance_score = 0.8
final_score = (0.6 * relevance_score) + (0.3 * priority_score) + (0.1 * freshness_score)
scored_deltas.append((final_score, delta))
# 3. فرز واختيار أفضل K
scored_deltas.sort(key=lambda x: x[0], reverse=True)
top_deltas = [delta for score, delta in scored_deltas[:top_k]]
# 4. تنسيق الموجه (باللغة الإنجليزية)
if not top_deltas:
return "Playbook: No relevant learning rules (Deltas) found for this query."
playbook_header = f"Playbook (Top {len(top_deltas)} Rules - Domain: {domain}):"
# (Added score for context)
delta_lines = [f"• {delta.text} (Score: {delta.score:.2f}, Prio: {delta.priority})" for delta in top_deltas]
# (Distilled rules are just high-priority deltas in this implementation)
return "\n".join([playbook_header] + delta_lines)
except Exception as e:
print(f"❌ [MemoryStore] فشل جلب السياق النشط: {e}")
# (Return English text for consistency)
return "Playbook: Error retrieving learning context." |