# learning_hub/memory_store.py
import json
import asyncio
import traceback
from datetime import datetime
from typing import List, Dict, Optional, Any

from .schemas import Delta, ReflectorOutput
from .policy_engine import PolicyEngine

# For compatibility with R2Service (assumed to be passed in at construction time).
# (It cannot be imported directly here, to avoid circular dependencies.)
class MemoryStore:
    def __init__(self, r2_service: Any, policy_engine: PolicyEngine, llm_service: Any):
        self.r2_service = r2_service
        self.policy_engine = policy_engine
        self.llm_service = llm_service  # Needed for the "distillation" step
        self.domain_files = {
            "strategy": "learning_deltas_strategy.json",
            "pattern": "learning_deltas_pattern.json",
            "indicator": "learning_deltas_indicator.json",
            "monte_carlo": "learning_deltas_monte_carlo.json",
            "general": "learning_deltas_general.json"
        }
        self.distill_threshold = 50  # Approved-delta count that triggers distillation (from point 6)
        print("✅ Learning Hub Module: Memory Store loaded")
    async def _load_deltas_from_r2(self, domain: str) -> List[Dict]:
        """Load the delta file for the given domain from R2."""
        key = self.domain_files.get(domain, self.domain_files["general"])
        try:
            response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
            deltas_list = json.loads(response['Body'].read())
            return deltas_list
        except Exception:
            # If loading fails (e.g. the file does not exist yet), return an empty list.
            print(f"ℹ️ [MemoryStore] No delta file found for {domain}; a new one will be created.")
            return []
    async def _save_deltas_to_r2(self, domain: str, deltas_list: List[Dict]):
        """Save the updated delta list for the given domain to R2."""
        key = self.domain_files.get(domain, self.domain_files["general"])
        try:
            # Ensure the list contains plain dicts before serializing.
            deltas_to_save = [d.model_dump() if isinstance(d, Delta) else d for d in deltas_list]
            data_json = json.dumps(deltas_to_save, indent=2, ensure_ascii=False).encode('utf-8')
            self.r2_service.s3_client.put_object(
                Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
            )
        except Exception as e:
            print(f"❌ [MemoryStore] Failed to save deltas to R2: {e}")
    async def save_new_delta(self,
                             reflector_output: ReflectorOutput,
                             trade_object: Dict[str, Any],
                             domain: str = "strategy"):
        """Save a new delta based on the reflector's output and the acceptance policy."""
        try:
            trade_pnl_percent = trade_object.get('pnl_percent', 0)

            # 1. Check the acceptance policy.
            is_approved, approval_reason = self.policy_engine.get_delta_acceptance(
                reflector_output,
                trade_pnl_percent
            )

            # 2. Build the delta object.
            new_delta = Delta(
                text=reflector_output.suggested_rule,
                domain=domain,
                score=reflector_output.confidence,
                evidence_refs=[trade_object.get('id', 'unknown_trade_id')],
                approved=is_approved,
                trade_strategy=trade_object.get('strategy', 'unknown'),
                exit_profile=trade_object.get('decision_data', {}).get('exit_profile', 'unknown')
            )

            # 3. Load, append, and save the deltas.
            deltas_list = await self._load_deltas_from_r2(domain)
            deltas_list.append(new_delta.model_dump())  # Use model_dump() for Pydantic models.
            await self._save_deltas_to_r2(domain, deltas_list)
            print(f"✅ [MemoryStore] Saved a new delta for {domain}. Status: {approval_reason}")

            # 4. Flag that distillation is due (it is not executed here).
            approved_count = len([d for d in deltas_list if d.get('approved')])
            if is_approved and approved_count % self.distill_threshold == 0:
                print(f"ℹ️ [MemoryStore] Reached {self.distill_threshold} approved deltas for {domain}. Distillation will be scheduled.")
                # (The Curator handles the actual check and distillation.)
        except Exception as e:
            print(f"❌ [MemoryStore] Fatal error while saving delta: {e}")
            traceback.print_exc()
    async def get_active_context(self, domain: str, query: str, top_k: int = 3) -> str:
        """Build the "active context" (playbook) to send to the model."""
        try:
            all_deltas_dicts = await self._load_deltas_from_r2(domain)

            # 1. Keep only approved deltas.
            approved_deltas = [Delta(**d) for d in all_deltas_dicts if d.get('approved', False)]
            if not approved_deltas:
                return "Playbook: No approved learning rules (Deltas) found for this domain yet."

            # 2. Retrieval scoring (simplified version):
            #    final_score = 0.6 * relevance + 0.3 * priority + 0.1 * freshness.
            scored_deltas = []
            for delta in approved_deltas:
                priority_map = {"high": 1.0, "medium": 0.6, "low": 0.2}
                priority_score = priority_map.get(delta.priority, 0.6)
                try:
                    age_days = (datetime.now() - datetime.fromisoformat(delta.created_at)).days
                    freshness_score = max(0, 1.0 - (age_days / 90.0))  # Linear decay over ~90 days.
                except Exception:
                    freshness_score = 0.5
                relevance_score = 0.5
                query_words = set(query.lower().split())
                delta_words = set(delta.text.lower().split())
                if query_words.intersection(delta_words):
                    relevance_score = 1.0
                elif delta.trade_strategy and delta.trade_strategy.lower() in query_words:
                    relevance_score = 0.8
                final_score = (0.6 * relevance_score) + (0.3 * priority_score) + (0.1 * freshness_score)
                scored_deltas.append((final_score, delta))

            # 3. Sort and take the top K.
            scored_deltas.sort(key=lambda x: x[0], reverse=True)
            top_deltas = [delta for score, delta in scored_deltas[:top_k]]

            # 4. Format the prompt (in English).
            if not top_deltas:
                return "Playbook: No relevant learning rules (Deltas) found for this query."
            playbook_header = f"Playbook (Top {len(top_deltas)} Rules - Domain: {domain}):"
            # Include the score and priority for context.
            delta_lines = [f"• {delta.text} (Score: {delta.score:.2f}, Prio: {delta.priority})" for delta in top_deltas]
            # (Distilled rules are just high-priority deltas in this implementation.)
            return "\n".join([playbook_header] + delta_lines)
        except Exception as e:
            print(f"❌ [MemoryStore] Failed to retrieve active context: {e}")
            return "Playbook: Error retrieving learning context."