Spaces:
Running
Running
| # ml_engine/processor.py (Updated to pass LearningHubManager) | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime | |
| import asyncio | |
| import json | |
| import re | |
| # (Import components from the same folder) | |
| from .indicators import AdvancedTechnicalAnalyzer | |
| from .monte_carlo import MonteCarloAnalyzer | |
| from .patterns import ChartPatternAnalyzer | |
| from .strategies import MultiStrategyEngine | |
class MLProcessor:
    """Build an enriched, scored analysis dict for each candidate symbol.

    The processor combines a basic layer-1 score with technical indicators,
    a Monte Carlo price distribution, chart-pattern detection, preloaded whale
    data and strategy scores, then blends them into ``enhanced_final_score``.
    """

    def __init__(self, market_context, data_manager, learning_hub):
        """Create the analyzers used by the scoring pipeline.

        Args:
            market_context: Forwarded to the strategy engine on every
                evaluation.
            data_manager: Stored and handed to ``MultiStrategyEngine``.
            learning_hub: Stored and handed to ``MultiStrategyEngine``
                (this parameter was previously called ``learning_engine``).
        """
        self.market_context = market_context
        self.data_manager = data_manager
        self.learning_hub = learning_hub
        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        # The strategy engine receives the hub so it can use it when scoring.
        self.strategy_engine = MultiStrategyEngine(data_manager, learning_hub)
        self.monte_carlo_analyzer = MonteCarloAnalyzer()
        self.pattern_analyzer = ChartPatternAnalyzer()
        # Not used by any method of this class; retained because other code
        # may rely on the attribute.
        self.whale_data_semaphore = asyncio.Semaphore(2)

    @staticmethod
    def _symbol_label(raw_data):
        """Return the symbol of ``raw_data`` for log messages, or 'unknown'."""
        try:
            return raw_data.get('symbol', 'unknown')
        except AttributeError:
            # raw_data is None or not a mapping; logging must never raise.
            return 'unknown'

    async def process_and_score_symbol_enhanced(self, raw_data, preloaded_whale_data: dict = None):
        """Run the full analysis pipeline for one symbol.

        Args:
            raw_data: Mapping with at least ``'symbol'`` and ``'ohlcv'``
                (``'ohlcv'`` maps timeframe -> list of candles).
            preloaded_whale_data: Optional mapping of symbol -> whale data.
                When falsy, the result records that preloading is disabled.

        Returns:
            The enriched analysis dict, or ``None`` when ``raw_data`` is
            empty, has no OHLCV data, the basic processing fails, or an
            unexpected error occurs before the advanced stage. If the
            advanced stage itself raises, the partially enriched dict is
            returned instead of ``None``.
        """
        try:
            if not raw_data or not raw_data.get('ohlcv'):
                return None

            symbol = raw_data['symbol']
            base_analysis = await self.process_and_score_symbol(raw_data)
            if not base_analysis:
                return None

            try:
                # Technical indicators, only for timeframes with >= 20 candles.
                advanced_indicators = {}
                ohlcv_available = raw_data.get('ohlcv', {})
                for timeframe, candles in ohlcv_available.items():
                    if candles and len(candles) >= 20:
                        dataframe = self._create_dataframe(candles)
                        advanced_indicators[timeframe] = (
                            self.technical_analyzer.calculate_all_indicators(dataframe, timeframe)
                        )
                base_analysis['advanced_indicators'] = advanced_indicators

                # Monte Carlo price distribution; a falsy result is stored as
                # None with a probability of 0.
                monte_carlo_results = await self.monte_carlo_analyzer.generate_1h_price_distribution(
                    ohlcv_available
                )
                if monte_carlo_results:
                    base_analysis['monte_carlo_distribution'] = monte_carlo_results
                    base_analysis['monte_carlo_probability'] = monte_carlo_results.get('probability_of_gain', 0)
                else:
                    base_analysis['monte_carlo_distribution'] = None
                    base_analysis['monte_carlo_probability'] = 0
                base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results

                # Chart patterns.
                base_analysis['pattern_analysis'] = await self.pattern_analyzer.detect_chart_patterns(
                    ohlcv_available
                )

                # Whale data is never fetched here, only looked up.
                if preloaded_whale_data:
                    base_analysis['whale_data'] = preloaded_whale_data.get(
                        symbol, {'data_available': False, 'reason': 'Not preloaded'}
                    )
                else:
                    base_analysis['whale_data'] = {'data_available': False, 'reason': 'Preloading disabled'}

                # Strategy scoring (the engine was constructed with the learning hub).
                strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(
                    base_analysis, self.market_context
                )
                base_analysis['strategy_scores'] = strategy_scores
                base_analysis['base_strategy_scores'] = base_scores

                if base_scores:
                    best_strategy_name, best_strategy_score = max(
                        base_scores.items(), key=lambda item: item[1]
                    )
                    base_analysis['recommended_strategy'] = best_strategy_name
                    base_analysis['strategy_confidence'] = best_strategy_score
                    # Weak winners are not trusted as a target strategy.
                    base_analysis['target_strategy'] = (
                        best_strategy_name if best_strategy_score > 0.3 else 'GENERIC'
                    )

                base_analysis['enhanced_final_score'] = self._calculate_enhanced_final_score(base_analysis)
                return base_analysis
            except Exception as strategy_error:
                # Best effort: keep whatever was computed before the failure.
                print(f"β Error in advanced analysis for {symbol}: {strategy_error}")
                return base_analysis
        except Exception as error:
            print(f"β Fatal error in enhanced processing for {self._symbol_label(raw_data)}: {error}")
            return None

    def _create_dataframe(self, candles):
        """Convert raw candles into a timestamp-indexed, sorted DataFrame.

        Args:
            candles: Sequence of 6-item rows
                ``[timestamp_ms, open, high, low, close, volume]``.

        Returns:
            A DataFrame with float price/volume columns indexed by timestamp,
            or an empty DataFrame when ``candles`` is empty or malformed.
        """
        price_columns = ['open', 'high', 'low', 'close', 'volume']
        try:
            if not candles:
                return pd.DataFrame()
            df = pd.DataFrame(candles, columns=['timestamp'] + price_columns)
            df[price_columns] = df[price_columns].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"β Error creating DataFrame: {e}")
            return pd.DataFrame()

    @staticmethod
    def _monte_carlo_component(analysis):
        """Return probability-of-gain scaled by a VaR-based risk penalty."""
        mc_distribution = analysis.get('monte_carlo_distribution')
        if not mc_distribution:
            return 0
        current_price = analysis.get('current_price', 1)
        if not current_price or current_price <= 0:
            return 0
        prob_gain = mc_distribution.get('probability_of_gain', 0) or 0
        risk_metrics = mc_distribution.get('risk_metrics') or {}
        var_95_value = risk_metrics.get('VaR_95_value', 0) or 0
        # VaR relative to price: > 5% halves the score, > 3% scales it by 0.8.
        normalized_var = var_95_value / current_price
        if normalized_var > 0.05:
            risk_penalty = 0.5
        elif normalized_var > 0.03:
            risk_penalty = 0.8
        else:
            risk_penalty = 1.0
        return prob_gain * risk_penalty

    @staticmethod
    def _whale_component(analysis):
        """Return the whale signal confidence when it is actionable, else 0."""
        whale_data = analysis.get('whale_data')
        if not (whale_data and whale_data.get('data_available')):
            return 0
        signal = whale_data.get('trading_signal') or {}
        confidence = signal.get('confidence', 0) or 0
        # Only non-HOLD signals with confidence >= 0.5 contribute.
        if signal.get('action') != 'HOLD' and confidence >= 0.5:
            return confidence
        return 0

    def _calculate_enhanced_final_score(self, analysis):
        """Blend the available score components into a value in [0, 1].

        Components (weight): base score (0.20), Monte Carlo (0.25), pattern
        confidence (0.25), strategy confidence (0.15), whale confidence
        (0.15). Components that are not positive are left out and the
        remaining weights are re-normalised.

        Missing or ``None`` sub-dicts (``pattern_analysis``, ``risk_metrics``,
        ``trading_signal``) are treated as empty rather than aborting the
        whole calculation.

        Returns:
            The clamped weighted average, ``0`` when no component is
            positive, or ``analysis['final_score']`` (default 0) if an
            unexpected error occurs.
        """
        try:
            base_score = analysis.get('final_score', 0) or 0
            pattern_info = analysis.get('pattern_analysis') or {}
            pattern_confidence = pattern_info.get('pattern_confidence', 0) or 0
            strategy_confidence = analysis.get('strategy_confidence', 0) or 0
            monte_carlo_score = self._monte_carlo_component(analysis)
            whale_confidence = self._whale_component(analysis)

            weighted_components = [
                (base_score, 0.20),
                (monte_carlo_score, 0.25),
                (pattern_confidence, 0.25),
                (strategy_confidence, 0.15),
                (whale_confidence, 0.15),
            ]
            active = [(value, weight) for value, weight in weighted_components if value > 0]
            if not active:
                return 0

            total_weight = sum(weight for _, weight in active)
            if total_weight == 0:
                return 0
            enhanced_score = sum(value * weight for value, weight in active) / total_weight
            return min(max(enhanced_score, 0.0), 1.0)
        except Exception as e:
            print(f"β Error calculating enhanced score: {e}")
            return analysis.get('final_score', 0)

    async def process_and_score_symbol(self, raw_data):
        """Build the basic analysis dict from layer-1 data.

        Args:
            raw_data: Mapping with ``'symbol'`` and ``'ohlcv'``; optional
                ``'current_price'``, ``'layer1_score'``,
                ``'reasons_for_candidacy'`` and ``'successful_timeframes'``.

        Returns:
            The basic analysis dict (``final_score`` and
            ``enhanced_final_score`` both start as the layer-1 score), or
            ``None`` when the input is empty, has no OHLCV data, or is
            malformed.
        """
        if not raw_data:
            return None
        try:
            symbol = raw_data['symbol']
            ohlcv_data = raw_data.get('ohlcv')
            if not ohlcv_data:
                return None
            layer1_score = raw_data.get('layer1_score', 0)
            final_score = layer1_score
            return {
                'symbol': symbol,
                'current_price': raw_data.get('current_price', 0),
                'final_score': final_score,
                'enhanced_final_score': final_score,
                'reasons_for_candidacy': raw_data.get('reasons_for_candidacy', []),
                'layer1_score': layer1_score,
                'ohlcv': ohlcv_data,
                'successful_timeframes': raw_data.get('successful_timeframes', 0),
            }
        except Exception as error:
            print(f"β Error in basic symbol processing {self._symbol_label(raw_data)}: {error}")
            return None

    def filter_top_candidates(self, candidates, number_of_candidates=10):
        """Return the highest-scoring candidates and print a summary of each.

        Args:
            candidates: Iterable of analysis dicts; non-dict entries
                (including ``None``) are ignored.
            number_of_candidates: Maximum number of candidates to keep.

        Returns:
            Up to ``number_of_candidates`` dicts sorted by
            ``enhanced_final_score`` in descending order; an empty list when
            there is nothing valid to rank.
        """
        valid_candidates = [c for c in candidates if isinstance(c, dict)]
        if not valid_candidates:
            print("β No valid candidates to filter")
            return []

        top_candidates = sorted(
            valid_candidates,
            key=lambda c: c.get('enhanced_final_score', 0) or 0,
            reverse=True,
        )[:number_of_candidates]

        print(f"ποΈ Top {len(top_candidates)} Candidates:")
        for i, c in enumerate(top_candidates):
            score = c.get('enhanced_final_score', 0) or 0
            strategy = c.get('recommended_strategy', 'GENERIC')
            mc_dist = c.get('monte_carlo_distribution')
            # pattern_analysis may be stored as None; treat that as "no pattern".
            pattern = (c.get('pattern_analysis') or {}).get('pattern_detected', 'no_pattern')
            symbol = c.get('symbol', 'UNKNOWN')
            timeframes = c.get('successful_timeframes', 0)

            print(f" {i+1}. {symbol}: π {score:.3f} | TFs: {timeframes}/6")
            if mc_dist:
                mc_pi_90 = mc_dist.get('prediction_interval_90', [0, 0])
                mc_var = (mc_dist.get('risk_metrics') or {}).get('VaR_95_value', 0)
                print(f" π― MonteCarlo: 90% PI [{mc_pi_90[0]:.4f} - {mc_pi_90[1]:.4f}] | VaR: ${mc_var:.4f}")
            print(f" π― Strategy: {strategy} | Pattern: {pattern}")

            whale_data = c.get('whale_data')
            if whale_data and whale_data.get('data_available'):
                signal = whale_data.get('trading_signal') or {}
                print(f" π Whale: {signal.get('action', 'HOLD')} (Conf: {signal.get('confidence', 0):.2f})")
        return top_candidates

    async def process_multiple_symbols_parallel(self, symbols_data_list, preloaded_whale_data: dict = None, max_concurrent=5):
        """Process many symbols concurrently with a bounded level of parallelism.

        Args:
            symbols_data_list: Iterable of ``raw_data`` mappings.
            preloaded_whale_data: Forwarded to
                ``process_and_score_symbol_enhanced`` for every symbol.
            max_concurrent: Maximum number of symbols processed at once.

        Returns:
            The analysis dicts of all symbols that produced one (``None``
            results are dropped), in input order.

        Raises:
            Exception: The first exception (in input order) raised while
                processing a symbol, re-raised after all tasks have finished.
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_symbol_with_semaphore(symbol_data):
            async with semaphore:
                try:
                    return await self.process_and_score_symbol_enhanced(symbol_data, preloaded_whale_data)
                except Exception as e:
                    # Returned (not raised) so every task runs to completion
                    # before the caller sees the failure.
                    return e

        batch_tasks = [
            asyncio.create_task(process_symbol_with_semaphore(sd)) for sd in symbols_data_list
        ]
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False)

        successful_results = []
        for result in batch_results:
            if isinstance(result, Exception):
                raise result
            if isinstance(result, dict):
                successful_results.append(result)
        return successful_results
def safe_json_parse(json_string):
    """Parse JSON leniently, repairing common formatting mistakes.

    The input is first parsed as-is. If that fails, a best-effort repair is
    applied (single quotes -> double quotes, escaped quotes unescaped,
    newlines/tabs -> spaces, bare keys quoted, trailing commas removed) and
    parsing is retried once.

    Args:
        json_string: The text to parse. Non-string input is converted with
            ``str()`` before the repair step instead of raising.

    Returns:
        The decoded object, or ``None`` when the input is falsy or cannot be
        parsed even after repair.
    """
    if not json_string:
        return None
    try:
        return json.loads(json_string)
    except (json.JSONDecodeError, TypeError):
        # TypeError: json.loads only accepts str/bytes/bytearray; fall through
        # to the str()-based repair path below rather than crashing.
        pass

    repaired = str(json_string).replace("'", '"')
    repaired = re.sub(r'\\"', '"', repaired)
    repaired = re.sub(r'[\n\t]', ' ', repaired)
    # Quote bare object keys (words directly before a colon).
    repaired = re.sub(r'(?<!")(\b\w+\b)(?=\s*:)', r'"\1"', repaired)
    # Normalise spacing before JSON literals.
    repaired = re.sub(r':\s*(\btrue\b|\bfalse\b|\bnull\b)(?=[,\s}])', r': \1', repaired)
    # Drop trailing commas before a closing brace/bracket.
    repaired = re.sub(r',\s*([}\]])', r'\1', repaired)
    try:
        return json.loads(repaired)
    except json.JSONDecodeError:
        return None
# Import-time banner: executed once, when this module is loaded.
print("β ML Processor loaded - (Patched to pass LearningHub)")