|
|
|
|
|
import asyncio
import json
import traceback
from datetime import datetime
from typing import List, Dict, Optional, Any

from .policy_engine import PolicyEngine
from .schemas import Delta, ReflectorOutput
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MemoryStore:
    """Persist learning "deltas" in R2 and serve the best ones as prompt context.

    Each domain maps to one JSON object in the bucket that holds a list of
    delta dicts. ``save_new_delta`` appends to that list and
    ``get_active_context`` ranks the approved entries against a query and
    renders the top ones as a short "playbook" string.
    """

    # Bucket that holds every per-domain delta file.
    bucket_name: str = "trading"

    # Weight contributed by a delta's priority label; unknown labels count as "medium".
    _PRIORITY_WEIGHTS = {"high": 1.0, "medium": 0.6, "low": 0.2}

    def __init__(self, r2_service: Any, policy_engine: "PolicyEngine", llm_service: Any):
        """Store the collaborating services and the domain -> file-name mapping.

        Args:
            r2_service: Object exposing ``s3_client`` with ``get_object`` / ``put_object``.
            policy_engine: Decides whether a new delta is approved.
            llm_service: Kept on the instance; not used by the methods in this class.
        """
        self.r2_service = r2_service
        self.policy_engine = policy_engine
        self.llm_service = llm_service

        self.domain_files = {
            "strategy": "learning_deltas_strategy.json",
            "pattern": "learning_deltas_pattern.json",
            "indicator": "learning_deltas_indicator.json",
            "monte_carlo": "learning_deltas_monte_carlo.json",
            "general": "learning_deltas_general.json",
        }

        # Number of approved deltas after which a distillation notice is printed.
        self.distill_threshold = 50
        print("✅ Learning Hub Module: Memory Store loaded (FIXED: Imported Any)")

    def _key_for(self, domain: str) -> str:
        """Return the object key for ``domain``, falling back to the "general" file."""
        return self.domain_files.get(domain, self.domain_files["general"])

    @staticmethod
    def _is_missing_object_error(error: Exception) -> bool:
        """Return True when ``error`` looks like a "no such key" response.

        Assumes a boto3-style client whose errors expose
        ``response["Error"]["Code"]``; any other error shape yields False.
        """
        response = getattr(error, "response", None)
        if not isinstance(response, dict):
            return False
        error_info = response.get("Error")
        if not isinstance(error_info, dict):
            return False
        return str(error_info.get("Code")) in ("NoSuchKey", "404")

    async def _load_deltas_from_r2(self, domain: str) -> List[Dict]:
        """Load the delta list stored in R2 for ``domain``.

        Returns:
            The decoded JSON content, or an empty list when the object cannot
            be read or parsed for any reason.
        """
        key = self._key_for(domain)
        try:
            response = self.r2_service.s3_client.get_object(Bucket=self.bucket_name, Key=key)
            return json.loads(response['Body'].read())
        except Exception as e:
            if self._is_missing_object_error(e):
                print(f"ℹ️ [MemoryStore] لم يتم العثور على ملف دلتا لـ {domain}، سيتم إنشاء واحد جديد.")
            else:
                # A real read/parse failure must not be reported as "file not found".
                print(f"⚠️ [MemoryStore] فشل تحميل ملف الدلتا لـ {domain}: {e}")
            # NOTE(review): callers cannot tell "missing" from "failed to read",
            # so a later save may overwrite an existing list after a failed read.
            return []

    async def _save_deltas_to_r2(self, domain: str, deltas_list: List[Dict]) -> bool:
        """Write ``deltas_list`` to the R2 object for ``domain`` as UTF-8 JSON.

        ``Delta`` instances in the list are converted with ``model_dump``;
        everything else is serialized as given.

        Returns:
            True when the upload call completed, False when it raised
            (the error is printed, not propagated).
        """
        key = self._key_for(domain)
        try:
            deltas_to_save = [d.model_dump() if isinstance(d, Delta) else d for d in deltas_list]
            data_json = json.dumps(deltas_to_save, indent=2, ensure_ascii=False).encode('utf-8')
            self.r2_service.s3_client.put_object(
                Bucket=self.bucket_name, Key=key, Body=data_json, ContentType="application/json"
            )
            return True
        except Exception as e:
            print(f"❌ [MemoryStore] فشل حفظ الدلتا إلى R2: {e}")
            return False

    async def save_new_delta(self,
                             reflector_output: "ReflectorOutput",
                             trade_object: Dict[str, Any],
                             domain: str = "strategy") -> None:
        """Build a new delta from the reflector output and append it to the domain file.

        The policy engine decides whether the delta is approved. The delta is
        stored either way, with its ``approved`` flag set accordingly. Errors
        are printed with a traceback and never propagated to the caller.

        Args:
            reflector_output: Supplies ``suggested_rule`` and ``confidence``.
            trade_object: Trade dict; ``pnl_percent``, ``id``, ``strategy`` and
                ``decision_data.exit_profile`` are read with defaults.
            domain: Domain whose delta file receives the new entry.
        """
        try:
            trade_pnl_percent = trade_object.get('pnl_percent', 0)

            is_approved, approval_reason = self.policy_engine.get_delta_acceptance(
                reflector_output,
                trade_pnl_percent,
            )

            new_delta = Delta(
                text=reflector_output.suggested_rule,
                domain=domain,
                score=reflector_output.confidence,
                evidence_refs=[trade_object.get('id', 'unknown_trade_id')],
                approved=is_approved,
                trade_strategy=trade_object.get('strategy', 'unknown'),
                exit_profile=trade_object.get('decision_data', {}).get('exit_profile', 'unknown'),
            )

            deltas_list = await self._load_deltas_from_r2(domain)
            deltas_list.append(new_delta.model_dump())

            # The save helper already printed the failure; do not claim success after it.
            if not await self._save_deltas_to_r2(domain, deltas_list):
                return

            print(f"✅ [MemoryStore] تم حفظ دلتا جديدة لـ {domain}. الحالة: {approval_reason}")

            # Only an approved delta can move the approved count onto a threshold multiple.
            if is_approved:
                approved_count = sum(1 for d in deltas_list if d.get('approved'))
                if approved_count % self.distill_threshold == 0:
                    print(f"ℹ️ [MemoryStore] تم الوصول إلى حد {self.distill_threshold} دلتا لـ {domain}. التقطير سيتم جدولته.")

        except Exception as e:
            print(f"❌ [MemoryStore] فشل فادح في حفظ الدلتا: {e}")
            traceback.print_exc()

    @staticmethod
    def _freshness_score(created_at: Any) -> float:
        """Map an ISO-8601 creation timestamp to a freshness score in [0, 1].

        The score decays linearly to 0 over 90 days. Values that cannot be
        parsed score a neutral 0.5.
        """
        try:
            created = datetime.fromisoformat(created_at)
            # Take "now" in the timestamp's own timezone so that both naive and
            # timezone-aware values can be subtracted without a TypeError.
            age_days = (datetime.now(tz=created.tzinfo) - created).days
        except (TypeError, ValueError):
            return 0.5
        # Clamp so a timestamp in the future cannot score above 1.
        return min(1.0, max(0.0, 1.0 - (age_days / 90.0)))

    @staticmethod
    def _relevance_score(query_words: set, delta_text: str, trade_strategy: Optional[str]) -> float:
        """Score how well a delta matches the lower-cased query words.

        1.0 when the delta text shares a word with the query, 0.8 when only the
        delta's strategy name appears in the query, otherwise 0.5.
        """
        if query_words & set(delta_text.lower().split()):
            return 1.0
        if trade_strategy and trade_strategy.lower() in query_words:
            return 0.8
        return 0.5

    async def get_active_context(self, domain: str, query: str, top_k: int = 3) -> str:
        """Return the "active context" playbook text to send to the model.

        Approved deltas for ``domain`` are ranked by
        ``0.6 * relevance + 0.3 * priority + 0.1 * freshness`` and the best
        ``top_k`` are rendered one per line under a header.

        Returns:
            The playbook text, or a fixed "Playbook: ..." message when there is
            nothing to show or an error occurred.
        """
        try:
            all_deltas_dicts = await self._load_deltas_from_r2(domain)

            approved_deltas = [Delta(**d) for d in all_deltas_dicts if d.get('approved', False)]
            if not approved_deltas:
                return "Playbook: No approved learning rules (Deltas) found for this domain yet."

            # The query does not change per delta, so tokenize it once.
            query_words = set(query.lower().split())

            scored_deltas = []
            for delta in approved_deltas:
                priority_score = self._PRIORITY_WEIGHTS.get(delta.priority, 0.6)
                freshness_score = self._freshness_score(getattr(delta, "created_at", None))
                relevance_score = self._relevance_score(query_words, delta.text, delta.trade_strategy)

                final_score = (0.6 * relevance_score) + (0.3 * priority_score) + (0.1 * freshness_score)
                scored_deltas.append((final_score, delta))

            # Stable sort: deltas with equal scores keep their stored order.
            scored_deltas.sort(key=lambda pair: pair[0], reverse=True)
            # A negative top_k would slice off the tail instead of returning nothing.
            top_deltas = [delta for _, delta in scored_deltas[:max(top_k, 0)]]

            if not top_deltas:
                return "Playbook: No relevant learning rules (Deltas) found for this query."

            playbook_header = f"Playbook (Top {len(top_deltas)} Rules - Domain: {domain}):"
            delta_lines = [f"• {delta.text} (Score: {delta.score:.2f}, Prio: {delta.priority})" for delta in top_deltas]

            return "\n".join([playbook_header] + delta_lines)

        except Exception as e:
            print(f"❌ [MemoryStore] فشل جلب السياق النشط: {e}")
            return "Playbook: Error retrieving learning context."