# learning_hub/statistical_analyzer.py # (محدث بالكامل - V2 - تعلم تأثير VADER) # وهو يمثل "التعلم البطيء" (الإحصائي) import json import asyncio import traceback # (إضافة) from datetime import datetime from typing import Dict, Any, List import numpy as np # (نفترض أن هذه الدوال المساعدة سيتم نقلها إلى ملف helpers.py عام) # (لأغراض هذا الملف، سنعرفها هنا مؤقتاً) def normalize_weights(weights_dict): total = sum(weights_dict.values()) if total > 0: for key in weights_dict: weights_dict[key] /= total return weights_dict def should_update_weights(history_length): return history_length % 5 == 0 # (تحديث الأوزان كل 5 صفقات) class StatisticalAnalyzer: def __init__(self, r2_service: Any, data_manager: Any): self.r2_service = r2_service self.data_manager = data_manager # (لجلب سياق السوق) # --- (هذه هي نفس متغيرات الحالة من learning_engine القديم) --- self.weights = {} # (أوزان استراتيجيات الدخول) self.performance_history = [] self.strategy_effectiveness = {} # (إحصائيات استراتيجيات الدخول) self.exit_profile_effectiveness = {} # (إحصائيات مزيج الدخول+الخروج) self.market_patterns = {} # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 self.vader_bin_effectiveness = {} # (جديد: لتتبع أداء VADER) # 🔴 --- END OF CHANGE --- 🔴 self.initialized = False self.lock = asyncio.Lock() print("✅ Learning Hub Module: Statistical Analyzer (Slow-Learner) loaded") async def initialize(self): """تهيئة المحلل الإحصائي (التعلم البطيء)""" async with self.lock: if self.initialized: return print("🔄 [StatsAnalyzer] تهيئة نظام التعلم الإحصائي (البطيء)...") try: await self.load_weights_from_r2() await self.load_performance_history() await self.load_exit_profile_effectiveness() # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 await self.load_vader_effectiveness() # 🔴 --- END OF CHANGE --- 🔴 if not self.weights or not self.strategy_effectiveness: await self.initialize_default_weights() self.initialized = True print("✅ [StatsAnalyzer] نظام التعلم الإحصائي جاهز.") except Exception as e: print(f"❌ [StatsAnalyzer] فشل التهيئة: {e}") await self.initialize_default_weights() self.initialized = True # --------------------------------------------------------------------------- # (الدوال التالية مأخوذة مباشرة من learning_engine (39).py القديم) # (مع تعديلات طفيفة) # --------------------------------------------------------------------------- async def initialize_default_weights(self): """إعادة تعيين الأوزان إلى الوضع الافتراضي""" # 🔴 --- START OF CHANGE --- 🔴 self.weights = { # 1. أوزان اختيار الاستراتيجية (MLProcessor) "strategy_weights": { "trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22, "volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10, "hybrid_ai": 0.08 }, # 2. أوزان المؤشرات العامة (MLProcessor) "indicator_weights": { "rsi": 0.2, "macd": 0.2, "bbands": 0.15, "atr": 0.1, "volume_ratio": 0.2, "vwap": 0.15 }, # 3. أوزان الأنماط العامة (MLProcessor) "pattern_weights": { "Double Bottom": 0.3, "Breakout Up": 0.3, "Uptrend": 0.2, "Near Support": 0.2, "Double Top": -0.3 # (وزن سلبي) }, # 4. أوزان كاشف الانعكاس 5m (للحارس) "reversal_indicator_weights": { "pattern": 0.4, "rsi": 0.3, "macd": 0.3 }, # 5. أوزان زناد الدخول 1m (للحارس) "entry_trigger_weights": { "cvd": 0.25, "order_book": 0.25, "ema_1m": 0.25, "macd_1m": 0.25 }, # 6. عتبة تفعيل زناد الدخول "entry_trigger_threshold": 0.75 } # 🔴 --- END OF CHANGE --- 🔴 self.strategy_effectiveness = {} self.exit_profile_effectiveness = {} self.market_patterns = {} # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 # (إعادة تعيين إحصائيات VADER أيضاً) self.vader_bin_effectiveness = { "Strong_Positive": {"total_trades": 0, "total_pnl_percent": 0}, "Positive": {"total_trades": 0, "total_pnl_percent": 0}, "Neutral": {"total_trades": 0, "total_pnl_percent": 0}, "Negative": {"total_trades": 0, "total_pnl_percent": 0}, "Strong_Negative": {"total_trades": 0, "total_pnl_percent": 0} } # 🔴 --- END OF CHANGE --- 🔴 async def load_weights_from_r2(self): key = "learning_statistical_weights.json" # (ملف جديد) try: response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) data = json.loads(response['Body'].read()) self.weights = data.get("weights", {}) # (إضافة: التحقق من وجود الأوزان الجديدة، وإلا إضافتها من الافتراضيات) if "reversal_indicator_weights" not in self.weights: defaults = await self.get_default_strategy_weights() # (سيحتوي على كل شيء) self.weights["reversal_indicator_weights"] = defaults.get("reversal_indicator_weights") self.weights["entry_trigger_weights"] = defaults.get("entry_trigger_weights") self.weights["entry_trigger_threshold"] = defaults.get("entry_trigger_threshold") print("ℹ️ [StatsAnalyzer] تم تحديث ملف الأوزان ببيانات الحارس الجديدة.") self.strategy_effectiveness = data.get("strategy_effectiveness", {}) self.market_patterns = data.get("market_patterns", {}) print(f"✅ [StatsAnalyzer] تم تحميل الأوزان والإحصائيات من R2.") except Exception as e: print(f"ℹ️ [StatsAnalyzer] فشل تحميل الأوزان ({e}). استخدام الافتراضيات.") await self.initialize_default_weights() async def save_weights_to_r2(self): key = "learning_statistical_weights.json" try: data = { "weights": self.weights, "strategy_effectiveness": self.strategy_effectiveness, "market_patterns": self.market_patterns, "last_updated": datetime.now().isoformat() } data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [StatsAnalyzer] فشل حفظ الأوزان في R2: {e}") async def load_performance_history(self): key = "learning_performance_history.json" # (مشترك) try: response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) data = json.loads(response['Body'].read()) self.performance_history = data.get("history", []) except Exception as e: self.performance_history = [] async def save_performance_history(self): key = "learning_performance_history.json" try: data = {"history": self.performance_history[-1000:]} # (آخر 1000 صفقة فقط) data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [StatsAnalyzer] فشل حفظ تاريخ الأداء: {e}") async def load_exit_profile_effectiveness(self): key = "learning_exit_profile_effectiveness.json" # (مشترك) try: response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) data = json.loads(response['Body'].read()) self.exit_profile_effectiveness = data.get("effectiveness", {}) except Exception as e: self.exit_profile_effectiveness = {} async def save_exit_profile_effectiveness(self): key = "learning_exit_profile_effectiveness.json" try: data = { "effectiveness": self.exit_profile_effectiveness, "last_updated": datetime.now().isoformat() } data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [StatsAnalyzer] فشل حفظ أداء ملف الخروج: {e}") # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 async def load_vader_effectiveness(self): """تحميل إحصائيات VADER من R2""" key = "learning_vader_effectiveness.json" # (ملف جديد) try: response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) data = json.loads(response['Body'].read()) self.vader_bin_effectiveness = data.get("effectiveness", {}) if not self.vader_bin_effectiveness: await self.initialize_default_weights() # (سيقوم بملء القيم الافتراضية) except Exception as e: # (إذا فشل، ستقوم initialize_default_weights بملء القيم الافتراضية) pass async def save_vader_effectiveness(self): """حفظ إحصائيات VADER إلى R2""" key = "learning_vader_effectiveness.json" try: data = { "effectiveness": self.vader_bin_effectiveness, "last_updated": datetime.now().isoformat() } data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [StatsAnalyzer] فشل حفظ أداء VADER: {e}") def _get_vader_bin(self, vader_score: float) -> str: """تصنيف درجة VADER الخام (-1 إلى +1) إلى سلال""" if vader_score > 0.5: return "Strong_Positive" elif vader_score > 0.05: return "Positive" elif vader_score < -0.5: return "Strong_Negative" elif vader_score < -0.05: return "Negative" else: return "Neutral" # 🔴 --- END OF CHANGE --- 🔴 async def update_statistics(self, trade_object: Dict[str, Any], close_reason: str): """ هذه هي الدالة الرئيسية التي تحدث الإحصائيات (التعلم البطيء). (تدمج update_strategy_effectiveness و update_market_patterns من الملف القديم) """ if not self.initialized: await self.initialize() try: strategy = trade_object.get('strategy', 'unknown') decision_data = trade_object.get('decision_data', {}) exit_profile = decision_data.get('exit_profile', 'unknown') combined_key = f"{strategy}_{exit_profile}" pnl_percent = trade_object.get('pnl_percent', 0) is_success = pnl_percent > 0.1 # (اعتبار الربح الطفيف نجاحاً) # 🔴 --- START OF CHANGE --- 🔴 # (استخدام بيانات السوق وقت القرار إذا كانت مخزنة، وإلا جلب الحالية) market_context = decision_data.get('market_context_at_decision', {}) if not market_context: market_context = await self.get_current_market_conditions() market_condition = market_context.get('current_trend', 'sideways_market') # (V2 - VADER Learning) جلب درجة VADER وقت القرار # (نفترض أن TradeManager حفظها هنا) vader_score_at_decision = decision_data.get('news_score', 0.0) vader_bin = self._get_vader_bin(vader_score_at_decision) # 🔴 --- END OF CHANGE --- 🔴 # --- 1. تحديث تاريخ الأداء (للتتبع العام) --- analysis_entry = { "timestamp": datetime.now().isoformat(), "trade_id": trade_object.get('id', 'N/A'), "symbol": trade_object.get('symbol', 'N/A'), "outcome": close_reason, "market_conditions": market_context, "strategy_used": strategy, "exit_profile_used": exit_profile, "pnl_percent": pnl_percent, "vader_score": vader_score_at_decision, # (إضافة) "vader_bin": vader_bin # (إضافة) } self.performance_history.append(analysis_entry) # --- 2. تحديث إحصائيات استراتيجية الدخول (strategy_effectiveness) --- if strategy not in self.strategy_effectiveness: self.strategy_effectiveness[strategy] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0} self.strategy_effectiveness[strategy]["total_trades"] += 1 self.strategy_effectiveness[strategy]["total_pnl_percent"] += pnl_percent if is_success: self.strategy_effectiveness[strategy]["successful_trades"] += 1 # --- 3. تحديث إحصائيات مزيج (الدخول + الخروج) (exit_profile_effectiveness) --- if combined_key not in self.exit_profile_effectiveness: self.exit_profile_effectiveness[combined_key] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0, "pnl_list": []} self.exit_profile_effectiveness[combined_key]["total_trades"] += 1 self.exit_profile_effectiveness[combined_key]["total_pnl_percent"] += pnl_percent self.exit_profile_effectiveness[combined_key]["pnl_list"].append(pnl_percent) if len(self.exit_profile_effectiveness[combined_key]["pnl_list"]) > 100: self.exit_profile_effectiveness[combined_key]["pnl_list"] = self.exit_profile_effectiveness[combined_key]["pnl_list"][-100:] if is_success: self.exit_profile_effectiveness[combined_key]["successful_trades"] += 1 # --- 4. تحديث إحصائيات ظروف السوق (market_patterns) --- if market_condition not in self.market_patterns: self.market_patterns[market_condition] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0} self.market_patterns[market_condition]["total_trades"] += 1 self.market_patterns[market_condition]["total_pnl_percent"] += pnl_percent if is_success: self.market_patterns[market_condition]["successful_trades"] += 1 # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 # --- 5. تحديث إحصائيات VADER --- if vader_bin not in self.vader_bin_effectiveness: # (لضمان عدم حدوث خطأ إذا كانت السلة غير موجودة) self.vader_bin_effectiveness[vader_bin] = {"total_trades": 0, "total_pnl_percent": 0} self.vader_bin_effectiveness[vader_bin]["total_trades"] += 1 self.vader_bin_effectiveness[vader_bin]["total_pnl_percent"] += pnl_percent # 🔴 --- END OF CHANGE --- 🔴 # --- 6. تكييف الأوزان والحفظ (إذا لزم الأمر) --- # (ملاحظة: نحتاج إلى إضافة منطق لتعلم أوزان الحارس هنا مستقبلاً) if should_update_weights(len(self.performance_history)): await self.adapt_weights_based_on_performance() await self.save_weights_to_r2() await self.save_performance_history() await self.save_exit_profile_effectiveness() # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 await self.save_vader_effectiveness() # (حفظ إحصائيات VADER) # 🔴 --- END OF CHANGE --- 🔴 print(f"✅ [StatsAnalyzer] تم تحديث الإحصائيات لـ {strategy} (News Bin: {vader_bin})") except Exception as e: print(f"❌ [StatsAnalyzer] فشل تحديث الإحصائيات: {e}") traceback.print_exc() async def adapt_weights_based_on_performance(self): """تكييف أوزان استراتيجيات الدخول بناءً على الأداء الإحصائي""" # (ملاحظة: هذا المنطق حالياً يكيف فقط strategy_weights) # (سنحتاج لتطويره لاحقاً ليكيف أوزان الحارس) print("🔄 [StatsAnalyzer] تكييف أوزان الاستراتيجيات (التعلم البطيء)...") try: strategy_performance = {} total_performance = 0 for strategy, data in self.strategy_effectiveness.items(): if data.get("total_trades", 0) > 2: # (يتطلب 3 صفقات على الأقل للتكيف) success_rate = data["successful_trades"] / data["total_trades"] avg_pnl = data["total_pnl_percent"] / data["total_trades"] # مقياس مركب: (معدل النجاح * 60%) + (متوسط الربح * 40%) # (يتم تقييد متوسط الربح بين -5 و +5) normalized_pnl = min(max(avg_pnl, -5.0), 5.0) / 5.0 # (من -1 إلى 1) composite_performance = (success_rate * 0.6) + (normalized_pnl * 0.4) strategy_performance[strategy] = composite_performance total_performance += composite_performance if total_performance > 0 and strategy_performance: base_weights = self.weights.get("strategy_weights", {}) for strategy, performance in strategy_performance.items(): current_weight = base_weights.get(strategy, 0.1) # (تعديل طفيف: 80% من الوزن الحالي + 20% من الأداء) new_weight = (current_weight * 0.8) + (performance * 0.2) base_weights[strategy] = max(new_weight, 0.05) # (الحد الأدنى للوزن 5%) normalize_weights(base_weights) self.weights["strategy_weights"] = base_weights print(f"✅ [StatsAnalyzer] تم تكييف الأوزان: {base_weights}") except Exception as e: print(f"❌ [StatsAnalyzer] فشل تكييف الأوزان: {e}") # --- (الدوال المساعدة لجلب البيانات - مأخوذة من الملف القديم) --- async def get_best_exit_profile(self, entry_strategy: str) -> str: """يجد أفضل ملف خروج إحصائياً لاستراتيجية دخول معينة.""" if not self.initialized or not self.exit_profile_effectiveness: return "unknown" relevant_profiles = {} for combined_key, data in self.exit_profile_effectiveness.items(): if combined_key.startswith(f"{entry_strategy}_"): if data.get("total_trades", 0) >= 3: # (يتطلب 3 صفقات) exit_profile_name = combined_key.replace(f"{entry_strategy}_", "", 1) avg_pnl = data["total_pnl_percent"] / data["total_trades"] relevant_profiles[exit_profile_name] = avg_pnl if not relevant_profiles: return "unknown" best_profile = max(relevant_profiles, key=relevant_profiles.get) return best_profile # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 async def get_statistical_vader_pnl(self, vader_score: float) -> float: """ جلب متوسط الربح/الخسارة التاريخي لدرجة VADER """ if not self.initialized: return 0.0 # (العودة بقيمة محايدة) vader_bin = self._get_vader_bin(vader_score) bin_data = self.vader_bin_effectiveness.get(vader_bin) if not bin_data or bin_data.get("total_trades", 0) < 3: # (لا توجد بيانات كافية، العودة بقيمة محايدة) return 0.0 # (إرجاع متوسط الربح/الخسارة الفعلي لهذه السلة) avg_pnl = bin_data["total_pnl_percent"] / bin_data["total_trades"] return avg_pnl # 🔴 --- END OF CHANGE --- 🔴 # 🔴 --- START OF CHANGE --- 🔴 async def get_optimized_weights(self, market_condition: str) -> Dict[str, float]: """ جلب جميع الأوزان المعدلة إحصائياً (لكل من MLProcessor والحارس). """ if not self.initialized or "strategy_weights" not in self.weights: await self.initialize() base_weights = self.weights.copy() # (يمكننا إضافة منطق تعديل الأوزان بناءً على ظروف السوق هنا) # (لكن في الوقت الحالي، سنعيد الأوزان المعدلة إحصائياً كما هي) if not base_weights: # (العودة إلى الافتراضيات إذا كانت الأوزان فارغة) return await self.get_default_strategy_weights() return base_weights # 🔴 --- END OF CHANGE --- 🔴 async def get_default_strategy_weights(self) -> Dict[str, float]: """إرجاع الأوزان الافتراضية عند الفشل""" # 🔴 --- START OF CHANGE --- 🔴 # (إرجاع كل شيء، وليس فقط أوزان الاستراتيجية) return { "strategy_weights": { "trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22, "volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10, "hybrid_ai": 0.08 }, "indicator_weights": { "rsi": 0.2, "macd": 0.2, "bbands": 0.15, "atr": 0.1, "volume_ratio": 0.2, "vwap": 0.15 }, "pattern_weights": { "Double Bottom": 0.3, "Breakout Up": 0.3, "Uptrend": 0.2, "Near Support": 0.2, "Double Top": -0.3 }, "reversal_indicator_weights": { "pattern": 0.4, "rsi": 0.3, "macd": 0.3 }, "entry_trigger_weights": { "cvd": 0.25, "order_book": 0.25, "ema_1m": 0.25, "macd_1m": 0.25 }, "entry_trigger_threshold": 0.75 } # 🔴 --- END OF CHANGE --- 🔴 async def get_current_market_conditions(self) -> Dict[str, Any]: """جلب سياق السوق الحالي (من الملف القديم)""" try: if not self.data_manager: raise ValueError("DataManager unavailable") market_context = await self.data_manager.get_market_context_async() if not market_context: raise ValueError("Market context fetch failed") # (نحتاج دالة لحساب التقلب - نفترض أنها في helpers) # volatility = calculate_market_volatility(market_context) return { "current_trend": market_context.get('market_trend', 'sideways_market'), "volatility": "medium", # (قيمة مؤقتة) "market_sentiment": market_context.get('btc_sentiment', 'NEUTRAL'), } except Exception as e: return {"current_trend": "sideways_market", "volatility": "medium", "market_sentiment": "NEUTRAL"} # 🔴 --- START OF CHANGE --- 🔴 # (تم حذف القوس } الزائد من هنا) # 🔴 --- END OF CHANGE --- 🔴