|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from datetime import datetime |
|
|
import asyncio |
|
|
import json |
|
|
import re |
|
|
|
|
|
|
|
|
from .indicators import AdvancedTechnicalAnalyzer |
|
|
from .monte_carlo import MonteCarloAnalyzer |
|
|
from .patterns import ChartPatternAnalyzer |
|
|
from .strategies import MultiStrategyEngine |
|
|
|
|
|
class MLProcessor:
    """Score trading-symbol candidates by combining several analysis engines.

    For each symbol the processor builds a base analysis dict from the raw
    data, then enriches it with technical indicators, a Monte Carlo price
    distribution, chart patterns, preloaded whale data and strategy scores,
    and finally blends those signals into ``enhanced_final_score``.
    """

    def __init__(self, market_context, data_manager, learning_hub):
        """Wire up the analysis engines.

        Args:
            market_context: Passed through to the strategy engine on every
                evaluation.
            data_manager: Provides the shared pattern analyzer through its
                ``pattern_analyzer`` attribute; may be ``None``.
            learning_hub: Handed to ``MultiStrategyEngine``.
        """
        self.market_context = market_context
        self.data_manager = data_manager
        self.learning_hub = learning_hub

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.strategy_engine = MultiStrategyEngine(data_manager, learning_hub)
        self.monte_carlo_analyzer = MonteCarloAnalyzer()

        # Prefer the analyzer owned by the data manager. getattr() keeps a
        # data manager without that attribute on the fallback path instead of
        # raising AttributeError.
        shared_analyzer = getattr(self.data_manager, 'pattern_analyzer', None) if self.data_manager else None
        if shared_analyzer:
            self.pattern_analyzer = shared_analyzer
            print("✅ [MLProcessor V6.4] تم ربط محرك الأنماط V8 (الرئيسي).")
        else:
            print("⚠️ [MLProcessor V6.4] DataManager أو محرك V8 غير متاح. العودة إلى الوضع الآمن.")
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=None)

        # NOTE(review): not used by any method visible in this class.
        self.whale_data_semaphore = asyncio.Semaphore(2)

    @staticmethod
    def _as_dict(value):
        """Return *value* when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _symbol_label(raw_data):
        """Return a printable symbol name even when *raw_data* is not a dict."""
        if isinstance(raw_data, dict):
            return raw_data.get('symbol', 'unknown')
        return 'unknown'

    async def process_and_score_symbol_enhanced(self, raw_data, preloaded_whale_data: dict = None):
        """Build the base analysis for one symbol and enrich it.

        Args:
            raw_data: Dict with at least ``'symbol'`` and ``'ohlcv'``
                (a mapping of timeframe -> list of candles).
            preloaded_whale_data: Optional mapping of symbol -> whale data.

        Returns:
            The enriched analysis dict; the un-enriched base analysis if any
            enrichment step raises; ``None`` when there is no OHLCV data, the
            base step fails, or an unexpected error occurs.
        """
        try:
            if not raw_data or not raw_data.get('ohlcv'):
                return None

            symbol = raw_data['symbol']

            base_analysis = await self.process_and_score_symbol(raw_data)
            if not base_analysis:
                return None

            try:
                ohlcv_available = raw_data.get('ohlcv', {})

                # Indicators are computed only for timeframes with >= 20 candles.
                advanced_indicators = {}
                for timeframe, candles in ohlcv_available.items():
                    if candles and len(candles) >= 20:
                        dataframe = self._create_dataframe(candles)
                        advanced_indicators[timeframe] = self.technical_analyzer.calculate_all_indicators(
                            dataframe, timeframe
                        )
                base_analysis['advanced_indicators'] = advanced_indicators

                monte_carlo_results = await self.monte_carlo_analyzer.generate_1h_price_distribution(ohlcv_available)
                if monte_carlo_results:
                    base_analysis['monte_carlo_distribution'] = monte_carlo_results
                    base_analysis['monte_carlo_probability'] = monte_carlo_results.get('probability_of_gain', 0)
                else:
                    base_analysis['monte_carlo_distribution'] = None
                    base_analysis['monte_carlo_probability'] = 0
                # Stored in both cases, exactly as before.
                base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results

                base_analysis['pattern_analysis'] = await self.pattern_analyzer.detect_chart_patterns(ohlcv_available)

                if preloaded_whale_data:
                    base_analysis['whale_data'] = preloaded_whale_data.get(
                        symbol, {'data_available': False, 'reason': 'Not preloaded'}
                    )
                else:
                    base_analysis['whale_data'] = {'data_available': False, 'reason': 'Preloading disabled'}

                strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(
                    base_analysis, self.market_context
                )
                base_analysis['strategy_scores'] = strategy_scores
                base_analysis['base_strategy_scores'] = base_scores

                if base_scores:
                    best_strategy_name, best_strategy_score = max(base_scores.items(), key=lambda item: item[1])
                    base_analysis['recommended_strategy'] = best_strategy_name
                    base_analysis['strategy_confidence'] = best_strategy_score
                    # Weak winners are not targeted specifically.
                    base_analysis['target_strategy'] = best_strategy_name if best_strategy_score > 0.3 else 'GENERIC'

                base_analysis['enhanced_final_score'] = self._calculate_enhanced_final_score(base_analysis)
                return base_analysis

            except Exception as strategy_error:
                # Best effort: keep whatever was collected so far.
                print(f"❌ Error in advanced analysis for {symbol}: {strategy_error}")
                return base_analysis

        except Exception as error:
            print(f"❌ Fatal error in enhanced processing for {self._symbol_label(raw_data)}: {error}")
            return None

    def _create_dataframe(self, candles):
        """Convert raw candles into a time-indexed OHLCV DataFrame.

        Args:
            candles: Sequence of 6-item rows
                ``[timestamp_ms, open, high, low, close, volume]``.

        Returns:
            A DataFrame indexed by ascending timestamp with float columns, or
            an empty DataFrame when *candles* is empty or cannot be converted.
        """
        price_columns = ['open', 'high', 'low', 'close', 'volume']
        try:
            if not candles:
                return pd.DataFrame()
            df = pd.DataFrame(candles, columns=['timestamp'] + price_columns)
            df[price_columns] = df[price_columns].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"❌ Error creating DataFrame: {e}")
            return pd.DataFrame()

    @staticmethod
    def _monte_carlo_component(mc_distribution, current_price):
        """Return the Monte Carlo score: gain probability scaled by a VaR penalty."""
        if not mc_distribution or not current_price or current_price <= 0:
            return 0
        prob_gain = mc_distribution.get('probability_of_gain') or 0
        var_95_value = MLProcessor._as_dict(mc_distribution.get('risk_metrics')).get('VaR_95_value') or 0

        normalized_var = var_95_value / current_price
        if normalized_var > 0.05:
            risk_penalty = 0.5
        elif normalized_var > 0.03:
            risk_penalty = 0.8
        else:
            risk_penalty = 1.0

        # Only probabilities above 50% contribute; 0.5 -> 0.0 and 1.0 -> 1.0.
        normalized_prob_score = max(0.0, (prob_gain - 0.5) * 2)
        return normalized_prob_score * risk_penalty

    @staticmethod
    def _whale_component(whale_data):
        """Return the whale signal confidence when it is actionable, else 0."""
        if not whale_data or not whale_data.get('data_available'):
            return 0
        signal = MLProcessor._as_dict(whale_data.get('trading_signal'))
        confidence = signal.get('confidence') or 0
        if signal.get('action') != 'HOLD' and confidence >= 0.5:
            return confidence
        return 0

    @staticmethod
    def _exhaustion_penalty(analysis):
        """Return a multiplier < 1.0 for symbols that already rallied hard and are overbought."""
        price_change_24h = analysis.get('price_change_24h') or 0
        indicators = MLProcessor._as_dict(analysis.get('advanced_indicators'))
        rsi_1d = MLProcessor._as_dict(indicators.get('1d')).get('rsi')
        rsi_4h = MLProcessor._as_dict(indicators.get('4h')).get('rsi')
        # A missing RSI is treated as neutral (50).
        rsi_1d = 50 if rsi_1d is None else rsi_1d
        rsi_4h = 50 if rsi_4h is None else rsi_4h

        if price_change_24h > 60 and (rsi_1d > 80 or rsi_4h > 80):
            return 0.4
        if price_change_24h > 40 and (rsi_1d > 75 or rsi_4h > 75):
            return 0.6
        return 1.0

    def _calculate_enhanced_final_score(self, analysis):
        """(Updated V6.6) Blend all signals into one score in [0, 1].

        Uses the statistical news score (learned actual profit/loss) instead
        of the raw VADER sentiment. Components with a value <= 0 are left out
        and the remaining weights are renormalized; the news component is
        always included. The blended score is then multiplied by an
        exhaustion penalty and clamped to [0, 1].

        Missing or ``None`` sub-structures are treated as "no signal" so that
        one absent field no longer discards every other component.

        Returns:
            The enhanced score, or ``analysis['final_score']`` (default 0) if
            the calculation raises.
        """
        try:
            base_score = analysis.get('final_score') or 0
            pattern_confidence = self._as_dict(analysis.get('pattern_analysis')).get('pattern_confidence') or 0
            strategy_confidence = analysis.get('strategy_confidence') or 0

            # Learned PnL is clamped to [-3, 3] and mapped linearly onto [0, 1].
            statistical_pnl = analysis.get('statistical_news_pnl') or 0.0
            clamped_pnl = max(min(statistical_pnl, 3.0), -3.0)
            normalized_news_score = (clamped_pnl + 3.0) / 6.0

            monte_carlo_score = self._monte_carlo_component(
                analysis.get('monte_carlo_distribution'), analysis.get('current_price', 1)
            )
            whale_confidence = self._whale_component(analysis.get('whale_data'))

            optional_components = [
                (base_score, 0.15),
                (monte_carlo_score, 0.20),
                (pattern_confidence, 0.20),
                (strategy_confidence, 0.15),
                (whale_confidence, 0.20),
            ]
            active = [(value, weight) for value, weight in optional_components if value > 0]
            # News is always present, so the total weight is never zero.
            active.append((normalized_news_score, 0.10))

            total_weight = sum(weight for _, weight in active)
            enhanced_score = sum(value * weight for value, weight in active) / total_weight

            final_penalized_score = enhanced_score * self._exhaustion_penalty(analysis)
            return min(max(final_penalized_score, 0.0), 1.0)

        except Exception as e:
            print(f"❌ Error calculating enhanced score: {e}")
            return analysis.get('final_score', 0)

    async def process_and_score_symbol(self, raw_data):
        """(Updated V6.3) Build the base analysis dict, including ``price_change_24h``.

        The base ``final_score`` (and the initial ``enhanced_final_score``)
        is simply the incoming ``layer1_score``.

        Returns:
            The base analysis dict, or ``None`` when OHLCV data is missing or
            *raw_data* is malformed.
        """
        try:
            symbol = raw_data['symbol']
            ohlcv_data = raw_data.get('ohlcv')
            if not ohlcv_data:
                return None

            layer1_score = raw_data.get('layer1_score', 0)
            return {
                'symbol': symbol,
                'current_price': raw_data.get('current_price', 0),
                'final_score': layer1_score,
                'enhanced_final_score': layer1_score,
                'reasons_for_candidacy': raw_data.get('reasons_for_candidacy', []),
                'layer1_score': layer1_score,
                'ohlcv': ohlcv_data,
                'successful_timeframes': raw_data.get('successful_timeframes', 0),
                'price_change_24h': raw_data.get('price_change_24h', 0),
            }
        except Exception as error:
            print(f"❌ Error in basic symbol processing {self._symbol_label(raw_data)}: {error}")
            return None

    def filter_top_candidates(self, candidates, number_of_candidates=10):
        """Return the highest-scoring candidates and print a summary of each.

        Args:
            candidates: Iterable of analysis dicts; non-dict entries are ignored.
            number_of_candidates: Maximum number of candidates to return.

        Returns:
            The candidates sorted by ``enhanced_final_score`` descending,
            truncated to *number_of_candidates*; ``[]`` if none are valid.
        """
        valid_candidates = [c for c in candidates if isinstance(c, dict)]
        if not valid_candidates:
            print("❌ No valid candidates to filter")
            return []

        # A None score sorts as 0 instead of breaking the comparison.
        sorted_candidates = sorted(
            valid_candidates, key=lambda c: c.get('enhanced_final_score') or 0, reverse=True
        )
        top_candidates = sorted_candidates[:number_of_candidates]

        print(f"🎖️ Top {len(top_candidates)} Candidates:")
        for i, c in enumerate(top_candidates):
            score = c.get('enhanced_final_score') or 0
            strategy = c.get('recommended_strategy', 'GENERIC')
            mc_dist = c.get('monte_carlo_distribution')
            pattern = self._as_dict(c.get('pattern_analysis')).get('pattern_detected', 'no_pattern')
            symbol = c.get('symbol', 'UNKNOWN')
            timeframes = c.get('successful_timeframes', 0)
            print(f" {i+1}. {symbol}: 📊 {score:.3f} | TFs: {timeframes}/6")

            if mc_dist:
                mc_pi_90 = mc_dist.get('prediction_interval_90') or [0, 0]
                if len(mc_pi_90) < 2:
                    mc_pi_90 = [0, 0]
                mc_var = self._as_dict(mc_dist.get('risk_metrics')).get('VaR_95_value') or 0
                print(f" 🎯 MonteCarlo: 90% PI [{mc_pi_90[0]:.4f} - {mc_pi_90[1]:.4f}] | VaR: ${mc_var:.4f}")

            print(f" 🎯 Strategy: {strategy} | Pattern: {pattern}")

            whale_data = c.get('whale_data')
            if whale_data and whale_data.get('data_available'):
                signal = self._as_dict(whale_data.get('trading_signal'))
                print(f" 🐋 Whale: {signal.get('action', 'HOLD')} (Conf: {signal.get('confidence') or 0:.2f})")

            if c.get('statistical_news_pnl') is not None:
                print(f" 📰 News (Learned PnL): {c['statistical_news_pnl']:.2f}%")

        return top_candidates

    async def process_multiple_symbols_parallel(self, symbols_data_list, preloaded_whale_data: dict, max_concurrent=5):
        """Run the enhanced analysis for many symbols with bounded concurrency.

        Args:
            symbols_data_list: Iterable of raw-data dicts, one per symbol.
            preloaded_whale_data: Mapping passed to every per-symbol call.
            max_concurrent: Upper bound on simultaneous analyses; values
                below 1 are raised to 1 so the semaphore cannot deadlock.

        Returns:
            The analysis dicts of the symbols that produced one, in input order.

        Raises:
            Exception: The first exception (in input order) raised by a
                per-symbol analysis, re-raised after all tasks have finished.
        """
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        async def process_symbol_with_semaphore(symbol_data):
            async with semaphore:
                try:
                    return await self.process_and_score_symbol_enhanced(symbol_data, preloaded_whale_data)
                except Exception as e:
                    # Returned rather than raised so every task runs to completion.
                    return e

        batch_tasks = [asyncio.create_task(process_symbol_with_semaphore(sd)) for sd in symbols_data_list]
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False)

        successful_results = []
        for result in batch_results:
            if isinstance(result, Exception):
                raise result
            if isinstance(result, dict):
                successful_results.append(result)
        return successful_results
|
|
|
|
|
def safe_json_parse(json_string):
    """Parse JSON leniently, returning ``None`` instead of raising.

    Strict parsing is tried first. If that fails, a few textual repairs are
    applied (single quotes -> double quotes, un-escaping ``\\"``, newlines and
    tabs -> spaces, quoting bare keys, dropping trailing commas) and the
    result is parsed again.

    Args:
        json_string: The text to parse. Non-string input is converted with
            ``str()`` and goes through the repair path.

    Returns:
        The decoded value, or ``None`` when the input is empty/falsy or still
        invalid after the repairs.
    """
    if not json_string:
        return None
    try:
        return json.loads(json_string)
    except (json.JSONDecodeError, TypeError):
        # TypeError: json.loads() only accepts str/bytes/bytearray. The repair
        # path below stringifies the input, so fall through to it.
        pass

    try:
        s = str(json_string).replace("'", '"')
        s = re.sub(r'\\"', '"', s)
        s = re.sub(r'[\n\t]', ' ', s)
        # Quote bare object keys: {a: 1} -> {"a": 1}
        s = re.sub(r'(?<!")(\b\w+\b)(?=\s*:)', r'"\1"', s)
        # Normalize spacing before JSON literals after a colon.
        s = re.sub(r':\s*(\btrue\b|\bfalse\b|\bnull\b)(?=[,\s}])', r': \1', s)
        # Drop trailing commas before a closing brace/bracket.
        s = re.sub(r',\s*([}\]])', r'\1', s)
        return json.loads(s)
    except json.JSONDecodeError:
        return None
|
|
|
|
|
# Import-time banner: printed once when this module is first loaded.
print("✅ ML Processor loaded - V6.6 (Statistical News Score)")