# ml_engine/processor.py (Updated to pass LearningHubManager)
import pandas as pd
import numpy as np
from datetime import datetime
import asyncio
import json
import re
# (Import components from the same folder)
from .indicators import AdvancedTechnicalAnalyzer
from .monte_carlo import MonteCarloAnalyzer
from .patterns import ChartPatternAnalyzer
from .strategies import MultiStrategyEngine
class MLProcessor:
    # 🔴 --- START OF CHANGE --- 🔴
def __init__(self, market_context, data_manager, learning_hub): # (Changed from learning_engine)
self.market_context = market_context
self.data_manager = data_manager
self.learning_hub = learning_hub # (Changed from learning_engine)
self.technical_analyzer = AdvancedTechnicalAnalyzer()
# (Pass the hub to the strategy engine)
self.strategy_engine = MultiStrategyEngine(data_manager, learning_hub)
self.monte_carlo_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = ChartPatternAnalyzer()
self.whale_data_semaphore = asyncio.Semaphore(2)
        # 🔴 --- END OF CHANGE --- 🔴
async def process_and_score_symbol_enhanced(self, raw_data, preloaded_whale_data: dict = None):
"""
(Unchanged logic, but now self.strategy_engine uses the learning_hub)
"""
try:
if not raw_data or not raw_data.get('ohlcv'):
return None
symbol = raw_data['symbol']
base_analysis = await self.process_and_score_symbol(raw_data)
if not base_analysis:
return None
try:
# Calculate Indicators
advanced_indicators = {}
ohlcv_available = raw_data.get('ohlcv', {})
for timeframe, candles in ohlcv_available.items():
if candles and len(candles) >= 20:
dataframe = self._create_dataframe(candles)
indicators = self.technical_analyzer.calculate_all_indicators(dataframe, timeframe)
advanced_indicators[timeframe] = indicators
base_analysis['advanced_indicators'] = advanced_indicators
# Monte Carlo (Phase 1)
monte_carlo_results = await self.monte_carlo_analyzer.generate_1h_price_distribution(ohlcv_available)
if monte_carlo_results:
base_analysis['monte_carlo_distribution'] = monte_carlo_results
base_analysis['monte_carlo_probability'] = monte_carlo_results.get('probability_of_gain', 0)
base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results
else:
base_analysis['monte_carlo_distribution'] = None
base_analysis['monte_carlo_probability'] = 0
base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results
# Pattern Analysis
pattern_analysis = await self.pattern_analyzer.detect_chart_patterns(ohlcv_available)
base_analysis['pattern_analysis'] = pattern_analysis
# Whale Data
if preloaded_whale_data:
base_analysis['whale_data'] = preloaded_whale_data.get(symbol, {'data_available': False, 'reason': 'Not preloaded'})
else:
base_analysis['whale_data'] = {'data_available': False, 'reason': 'Preloading disabled'}
                # 🔴 (This call now uses the Learning Hub via strategy_engine)
strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
base_analysis['strategy_scores'] = strategy_scores
base_analysis['base_strategy_scores'] = base_scores
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
base_analysis['recommended_strategy'] = best_strategy_name
base_analysis['strategy_confidence'] = best_strategy_score
base_analysis['target_strategy'] = best_strategy_name if best_strategy_score > 0.3 else 'GENERIC'
enhanced_score = self._calculate_enhanced_final_score(base_analysis)
base_analysis['enhanced_final_score'] = enhanced_score
return base_analysis
except Exception as strategy_error:
print(f"❌ Error in advanced analysis for {symbol}: {strategy_error}")
return base_analysis
except Exception as error:
print(f"❌ Fatal error in enhanced processing for {raw_data.get('symbol', 'unknown')}: {error}")
return None
def _create_dataframe(self, candles):
# (This function remains unchanged)
try:
if not candles: return pd.DataFrame()
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
return df
except Exception as e:
print(f"❌ Error creating DataFrame: {e}")
return pd.DataFrame()
def _calculate_enhanced_final_score(self, analysis):
# (This function remains unchanged)
try:
base_score = analysis.get('final_score', 0)
pattern_confidence = analysis.get('pattern_analysis', {}).get('pattern_confidence', 0)
strategy_confidence = analysis.get('strategy_confidence', 0)
mc_distribution = analysis.get('monte_carlo_distribution')
monte_carlo_score = 0
if mc_distribution:
prob_gain = mc_distribution.get('probability_of_gain', 0)
var_95_value = mc_distribution.get('risk_metrics', {}).get('VaR_95_value', 0)
current_price = analysis.get('current_price', 1)
if current_price > 0:
normalized_var = var_95_value / current_price
risk_penalty = 1.0
if normalized_var > 0.05: risk_penalty = 0.5
elif normalized_var > 0.03: risk_penalty = 0.8
monte_carlo_score = prob_gain * risk_penalty
else:
monte_carlo_score = 0
whale_confidence = 0
whale_data = analysis.get('whale_data')
if whale_data and whale_data.get('data_available'):
signal = whale_data.get('trading_signal', {})
if signal.get('action') != 'HOLD' and signal.get('confidence', 0) >= 0.5:
whale_confidence = signal.get('confidence', 0)
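            # Blend whichever components are available; nominal weights are
            # base 0.20, Monte Carlo 0.25, pattern 0.25, strategy 0.15, whale 0.15,
            # and the result is renormalized by the sum of the weights actually used.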
components = []
weights = []
if base_score > 0: components.append(base_score); weights.append(0.20)
if monte_carlo_score > 0: components.append(monte_carlo_score); weights.append(0.25)
if pattern_confidence > 0: components.append(pattern_confidence); weights.append(0.25)
if strategy_confidence > 0: components.append(strategy_confidence); weights.append(0.15)
if whale_confidence > 0: components.append(whale_confidence); weights.append(0.15)
if not components: return 0
total_weight = sum(weights)
if total_weight == 0: return 0
enhanced_score = sum(comp * weight for comp, weight in zip(components, weights)) / total_weight
return min(max(enhanced_score, 0.0), 1.0)
except Exception as e:
print(f"❌ Error calculating enhanced score: {e}")
return analysis.get('final_score', 0)
async def process_and_score_symbol(self, raw_data):
# (This function remains unchanged)
try:
symbol = raw_data['symbol']
ohlcv_data = raw_data.get('ohlcv')
if not ohlcv_data: return None
current_price = raw_data.get('current_price', 0)
layer1_score = raw_data.get('layer1_score', 0)
reasons = raw_data.get('reasons_for_candidacy', [])
final_score = layer1_score
successful_timeframes = raw_data.get('successful_timeframes', 0)
return {
'symbol': symbol, 'current_price': current_price, 'final_score': final_score,
'enhanced_final_score': final_score, 'reasons_for_candidacy': reasons,
'layer1_score': layer1_score, 'ohlcv': ohlcv_data,
'successful_timeframes': successful_timeframes
}
except Exception as error:
print(f"❌ Error in basic symbol processing {raw_data.get('symbol', 'unknown')}: {error}")
return None
def filter_top_candidates(self, candidates, number_of_candidates=10):
# (This function remains unchanged)
valid_candidates = [c for c in candidates if c is not None and isinstance(c, dict)]
if not valid_candidates: print("❌ No valid candidates to filter"); return []
sorted_candidates = sorted(valid_candidates, key=lambda c: c.get('enhanced_final_score', 0), reverse=True)
top_candidates = sorted_candidates[:number_of_candidates]
print(f"πŸŽ–οΈ Top {len(top_candidates)} Candidates:")
for i, c in enumerate(top_candidates):
score = c.get('enhanced_final_score', 0); strategy = c.get('recommended_strategy', 'GENERIC'); mc_dist = c.get('monte_carlo_distribution'); pattern = c.get('pattern_analysis', {}).get('pattern_detected', 'no_pattern'); symbol = c.get('symbol', 'UNKNOWN'); timeframes = c.get('successful_timeframes', 0)
print(f" {i+1}. {symbol}: πŸ“Š {score:.3f} | TFs: {timeframes}/6")
if mc_dist:
mc_pi_90 = mc_dist.get('prediction_interval_90', [0,0]); mc_var = mc_dist.get('risk_metrics', {}).get('VaR_95_value', 0)
print(f" 🎯 MonteCarlo: 90% PI [{mc_pi_90[0]:.4f} - {mc_pi_90[1]:.4f}] | VaR: ${mc_var:.4f}")
print(f" 🎯 Strategy: {strategy} | Pattern: {pattern}")
whale_data = c.get('whale_data')
if whale_data and whale_data.get('data_available'):
signal = whale_data.get('trading_signal', {}); print(f" πŸ‹ Whale: {signal.get('action', 'HOLD')} (Conf: {signal.get('confidence', 0):.2f})")
return top_candidates
async def process_multiple_symbols_parallel(self, symbols_data_list, preloaded_whale_data: dict, max_concurrent=5):
# (This function remains unchanged)
semaphore = asyncio.Semaphore(max_concurrent)
tasks_results = []
        async def process_symbol_with_semaphore(symbol_data):
            async with semaphore:
                try:
                    return await self.process_and_score_symbol_enhanced(symbol_data, preloaded_whale_data)
                except Exception as e:
                    # Return the exception instead of raising so that gather() still completes;
                    # it is re-raised in the result loop below, failing the whole batch.
                    return e
try:
batch_tasks = [asyncio.create_task(process_symbol_with_semaphore(sd)) for sd in symbols_data_list]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False)
successful_results = []
for result in batch_results:
if isinstance(result, Exception): raise result
if isinstance(result, dict): successful_results.append(result)
return successful_results
except Exception as error:
raise error
def safe_json_parse(json_string):
# (This function remains unchanged)
if not json_string: return None
try:
return json.loads(json_string)
except json.JSONDecodeError:
try:
            s = str(json_string).replace("'", '"')                                   # single quotes -> double quotes
            s = re.sub(r'\\"', '"', s)                                               # drop backslash-escapes before quotes
            s = re.sub(r'[\n\t]', ' ', s)                                            # collapse newlines/tabs
            s = re.sub(r'(?<!")(\b\w+\b)(?=\s*:)', r'"\1"', s)                       # quote bare keys
            s = re.sub(r':\s*(\btrue\b|\bfalse\b|\bnull\b)(?=[,\s}])', r': \1', s)   # normalize literal spacing
            s = re.sub(r',\s*([}\]])', r'\1', s)                                     # drop trailing commas
return json.loads(s)
except json.JSONDecodeError: return None
print("βœ… ML Processor loaded - (Patched to pass LearningHub)")