Spaces:
Running
Running
| # trade_manager.py (Updated to integrate LearningHubManager) | |
| import asyncio | |
| import json | |
| import time | |
| import traceback | |
| from datetime import datetime, timedelta | |
| from helpers import safe_float_conversion, _apply_patience_logic | |
| # (The _DynamicExitEngine and pandas/ta imports remain unchanged) | |
| try: | |
| import pandas as pd | |
| import pandas_ta as ta | |
| PANDAS_TA_AVAILABLE = True | |
| except ImportError: | |
| PANDAS_TA_AVAILABLE = False | |
| print("โ ๏ธ pandas_ta not available. ATR Trailing Stop will be disabled.") | |
| # ๐ด (The _DynamicExitEngine class remains unchanged from the previous version) | |
class _DynamicExitEngine:
    """Exit-decision engine for open LONG trades.

    Every ``evaluate_*`` coroutine returns a ``(should_close, reason, trade)``
    tuple.  ``trade`` is the same dict that was passed in; it is mutated in
    place when a trailing or break-even stop is raised
    (``trade['dynamic_stop_loss']``).
    """

    def __init__(self, data_manager):
        # Only ``data_manager.exchange.fetch_ohlcv`` is used (for ATR candles).
        self.data_manager = data_manager

    @staticmethod
    def _check_price_levels(trade: dict, current_price: float, dynamic_label: str):
        """Return an exit tuple if a stop/target level is hit, otherwise ``None``.

        Falsy levels (``None`` or ``0``) are treated as "not configured".
        ``dynamic_label`` is the wording used in the dynamic-stop message.
        """
        hard_stop = trade.get('stop_loss')
        if hard_stop and current_price <= hard_stop:
            return True, f"Hard Stop Loss hit: {current_price} <= {hard_stop}", trade

        take_profit = trade.get('take_profit')
        if take_profit and current_price >= take_profit:
            return True, f"Final Take Profit hit: {current_price} >= {take_profit}", trade

        dynamic_stop = trade.get('dynamic_stop_loss')
        if dynamic_stop and current_price <= dynamic_stop:
            return True, f"{dynamic_label} hit: {current_price} <= {dynamic_stop}", trade

        return None

    async def evaluate_exit(self, trade: dict, current_price: float):
        """Evaluate whether the trade should be closed at ``current_price``.

        The hard stop, take profit and dynamic stop are checked first.  If none
        is hit, the trade's ``decision_data['exit_profile']`` selects the
        follow-up logic:

        * ``ATR_TRAILING`` -> :meth:`_evaluate_atr_trailing`
        * ``TIME_BASED``   -> :meth:`_evaluate_time_based`
        * anything else (``FIXED_TARGET``, ``SIGNAL_BASED``, unknown)
          -> :meth:`_evaluate_break_even`

        When ``pandas_ta`` is unavailable, or any error occurs, only the fixed
        levels are evaluated via :meth:`_evaluate_fixed_exit`.
        """
        if not PANDAS_TA_AVAILABLE:
            return await self._evaluate_fixed_exit(trade, current_price)

        try:
            # ``or {}`` also covers keys that are present but set to None.
            decision_data = trade.get('decision_data') or {}
            exit_profile = decision_data.get('exit_profile', 'FIXED_TARGET')
            exit_params = decision_data.get('exit_parameters') or {}

            level_hit = self._check_price_levels(trade, current_price, "Dynamic Trailing Stop")
            if level_hit is not None:
                return level_hit

            if exit_profile == "ATR_TRAILING":
                return await self._evaluate_atr_trailing(trade, current_price, exit_params)
            if exit_profile == "TIME_BASED":
                return await self._evaluate_time_based(trade, current_price, exit_params)

            # FIXED_TARGET, SIGNAL_BASED (no dedicated logic) and unknown
            # profiles all fall back to the break-even rule.
            return await self._evaluate_break_even(
                trade, current_price, exit_params.get("break_even_trigger_percent", 0)
            )
        except Exception as exc:
            print(f"โ Error in DynamicExitEngine for {trade.get('symbol')}: {exc}")
            traceback.print_exc()
            return await self._evaluate_fixed_exit(trade, current_price)

    async def _evaluate_fixed_exit(self, trade: dict, current_price: float):
        """Check only the stored stop-loss / take-profit / dynamic-stop levels."""
        level_hit = self._check_price_levels(trade, current_price, "Dynamic Stop")
        if level_hit is not None:
            return level_hit
        return False, "No exit criteria met", trade

    async def _evaluate_atr_trailing(self, trade: dict, current_price: float, params: dict):
        """Raise ``trade['dynamic_stop_loss']`` using an ATR-based trailing stop.

        Reads ``atr_period`` (default 14), ``atr_multiplier`` (default 2.0) and
        ``break_even_trigger_percent`` (default 0) from ``params``.  Never
        signals a close itself: it only moves the dynamic stop upwards, which
        :meth:`evaluate_exit` checks on the next call.
        """
        try:
            atr_period = params.get("atr_period", 14)
            atr_multiplier = params.get("atr_multiplier", 2.0)
            break_even_trigger_percent = params.get("break_even_trigger_percent", 0) or 0

            candles = await self._fetch_ohlcv_for_atr(trade['symbol'], '15m', atr_period + 50)
            if candles is None:
                return False, "ATR data unavailable", trade

            df = pd.DataFrame(
                candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
            )
            df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=atr_period)
            if df['atr'].dropna().empty:
                return False, "ATR calculation failed", trade

            current_atr = df['atr'].iloc[-1]
            # A NaN ATR would silently yield a NaN stop; reject it explicitly.
            if current_atr is None or pd.isna(current_atr) or current_atr == 0:
                return False, "Invalid ATR value", trade

            new_atr_stop = current_price - (current_atr * atr_multiplier)
            # The key may exist with a None value, so ``.get(key, 0)`` is not enough.
            current_dynamic_stop = trade.get('dynamic_stop_loss') or 0
            entry_price = trade.get('entry_price')

            if break_even_trigger_percent > 0 and entry_price:
                trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
                if current_price >= trigger_price:
                    # Once the trigger is reached the stop never sits below entry.
                    new_atr_stop = max(new_atr_stop, entry_price)

            # The trailing stop only ever moves up.
            if new_atr_stop > current_dynamic_stop:
                trade['dynamic_stop_loss'] = new_atr_stop

            return False, "ATR Trailing logic executed", trade
        except Exception as exc:
            print(f"โ Error in ATR Trailing logic for {trade.get('symbol')}: {exc}")
            return False, "ATR logic failed", trade

    async def _evaluate_time_based(self, trade: dict, current_price: float, params: dict):
        """Signal a close once ``params['exit_after_minutes']`` have elapsed since entry."""
        exit_after_minutes = params.get("exit_after_minutes", 0) or 0
        if exit_after_minutes <= 0:
            return False, "Time stop not configured", trade

        entry_timestamp = trade.get('entry_timestamp')
        if not entry_timestamp:
            return False, "Entry timestamp missing", trade

        entry_time = datetime.fromisoformat(entry_timestamp)
        # Match the timestamp's awareness so naive and tz-aware values both work.
        now = datetime.now(entry_time.tzinfo)
        elapsed_minutes = (now - entry_time).total_seconds() / 60

        if elapsed_minutes >= exit_after_minutes:
            return True, f"Time Stop hit: {elapsed_minutes:.1f}m >= {exit_after_minutes}m", trade
        return False, "Time stop not hit", trade

    async def _evaluate_break_even(self, trade: dict, current_price: float, break_even_trigger_percent: float):
        """Move the dynamic stop up to the entry price once the trigger gain is reached.

        Never signals a close itself; it only updates ``trade['dynamic_stop_loss']``.
        """
        if not break_even_trigger_percent or break_even_trigger_percent <= 0:
            return False, "Break-even not configured", trade

        entry_price = trade.get('entry_price')
        if not entry_price:
            return False, "Entry price unavailable", trade

        # The key may exist with a None value, so ``.get(key, 0)`` is not enough.
        current_dynamic_stop = trade.get('dynamic_stop_loss') or 0
        if current_dynamic_stop < entry_price:
            trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
            if current_price >= trigger_price:
                trade['dynamic_stop_loss'] = entry_price

        return False, "Break-even logic executed", trade

    async def _fetch_ohlcv_for_atr(self, symbol, timeframe, limit):
        """Fetch ``limit`` candles via the exchange in a worker thread.

        Returns the candle list, or ``None`` when no exchange is configured,
        fewer than ``limit`` candles come back, or the request fails.
        """
        try:
            if not self.data_manager or not self.data_manager.exchange:
                return None

            exchange = self.data_manager.exchange
            # ``fetch_ohlcv`` is called synchronously, so keep it off the event loop.
            loop = asyncio.get_running_loop()
            candles = await loop.run_in_executor(
                None,
                lambda: exchange.fetch_ohlcv(symbol, timeframe, limit=limit),
            )

            if candles and len(candles) >= limit:
                return candles
            return None
        except Exception as exc:
            print(f"โ ๏ธ Failed to fetch OHLCV for ATR ({symbol}): {exc}")
            return None
| # ๐ด (End of unchanged _DynamicExitEngine class) | |
class TradeManager:
    """Open, update, monitor and close SPOT (LONG-only) trades.

    State is persisted through ``r2_service``; exit decisions are delegated to
    ``_DynamicExitEngine``; closed trades are forwarded to the optional
    ``learning_hub`` for post-trade analysis.
    """

    def __init__(self, r2_service, learning_hub=None, data_manager=None, state_manager=None):
        self.r2_service = r2_service
        self.learning_hub = learning_hub
        self.data_manager = data_manager
        self.state_manager = state_manager
        # symbol -> {'task': asyncio.Task, 'start_time': float, 'trade_object': dict}
        self.monitoring_tasks = {}
        self.is_running = False
        # symbol -> consecutive monitoring error count
        self.monitoring_errors = {}
        self.max_consecutive_errors = 5
        self.exit_engine = _DynamicExitEngine(self.data_manager)

    async def open_trade(self, symbol, decision, current_price, decision_context: dict = None):
        """Open a LONG trade using all available capital.

        Args:
            symbol: Market symbol to trade.
            decision: Decision dict (``trade_type``, ``stop_loss``,
                ``take_profit``, ``strategy``, ``exit_profile``,
                ``expected_target_minutes`` ...).
            current_price: Entry price recorded on the trade.
            decision_context: Optional dict with ``market`` / ``indicators``
                snapshots stored alongside the decision for later analysis.

        Returns:
            The new trade dict, or ``None`` when the trade is rejected
            (SHORT request or less than 1 USD of capital).

        Raises:
            Exception: Any persistence error is logged and re-raised.
        """
        try:
            trade_type = decision.get("trade_type", "LONG")
            if trade_type == "SHORT":
                print(f"โ ๏ธ SHORT trade rejected for {symbol}. SPOT only system.")
                await self.r2_service.save_system_logs_async({
                    "trade_open_rejected": True, "reason": "SHORT trade not allowed", "symbol": symbol
                })
                return None

            portfolio_state = await self.r2_service.get_portfolio_state_async()
            available_capital = portfolio_state.get("current_capital_usd", 0)
            if available_capital < 1:
                print(f"โ Insufficient capital for {symbol}: {available_capital}")
                return None

            # Clamp the expected holding time to the 5-45 minute window.
            expected_target_minutes = decision.get('expected_target_minutes', 15)
            expected_target_minutes = max(5, min(expected_target_minutes, 45))
            expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()

            strategy = decision.get('strategy', 'GENERIC')
            exit_profile = decision.get('exit_profile', 'FIXED_TARGET')

            # Store the decision together with the context it was made in.
            # A missing/None context is tolerated so older callers keep working.
            context = decision_context or {}
            decision_data_for_trade = decision.copy()
            decision_data_for_trade['market_context_at_decision'] = context.get('market', {})
            decision_data_for_trade['indicators_at_decision'] = context.get('indicators', {})

            trades = await self.r2_service.get_open_trades_async()
            opened_at = datetime.now()
            new_trade = {
                "id": str(int(opened_at.timestamp())),
                "symbol": symbol,
                "entry_price": current_price,
                "entry_timestamp": opened_at.isoformat(),
                "decision_data": decision_data_for_trade,
                "status": "OPEN",
                "stop_loss": decision.get("stop_loss"),
                "take_profit": decision.get("take_profit"),
                # The dynamic stop starts at the hard stop and is raised by the exit engine.
                "dynamic_stop_loss": decision.get("stop_loss"),
                "trade_type": "LONG",
                "position_size_usd": available_capital,
                "expected_target_minutes": expected_target_minutes,
                "expected_target_time": expected_target_time,
                "is_monitored": True,
                "strategy": strategy,
                "monitoring_started": False,
            }
            trades.append(new_trade)
            await self.r2_service.save_open_trades_async(trades)

            # The whole balance is moved into the position.
            portfolio_state["invested_capital_usd"] = available_capital
            portfolio_state["current_capital_usd"] = 0.0
            portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
            await self.r2_service.save_portfolio_state_async(portfolio_state)

            await self.r2_service.save_system_logs_async({
                "new_trade_opened": True, "symbol": symbol, "position_size": available_capital,
                "strategy": strategy, "exit_profile": exit_profile
            })
            print(f"โ New LONG trade opened for {symbol} (Strategy: {strategy} | Exit Profile: {exit_profile})")
            return new_trade
        except Exception as exc:
            print(f"โ Failed to open trade for {symbol}: {exc}")
            await self.r2_service.save_system_logs_async({
                "trade_open_failed": True, "symbol": symbol, "error": str(exc)
            })
            raise

    async def close_trade(self, trade_to_close, close_price, reason="System Close"):
        """Close a trade, settle PnL into the portfolio and trigger learning.

        Mutates ``trade_to_close`` (status, close price/timestamp, PnL fields),
        archives it, updates the summary and portfolio, removes it from the
        open-trades list and drops its monitoring bookkeeping.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Any failure while closing/persisting is logged and
                re-raised.  A Learning Hub failure is logged only, because the
                trade is already fully closed at that point.
        """
        try:
            symbol = trade_to_close.get('symbol')
            trade_to_close['status'] = 'CLOSED'
            trade_to_close['close_price'] = close_price
            trade_to_close['close_timestamp'] = datetime.now().isoformat()
            trade_to_close['is_monitored'] = False

            entry_price = trade_to_close['entry_price']
            position_size = trade_to_close['position_size_usd']
            strategy = trade_to_close.get('strategy', 'unknown')

            pnl = 0.0
            pnl_percent = 0.0
            if entry_price and entry_price > 0 and close_price and close_price > 0:
                try:
                    pnl_percent = ((close_price - entry_price) / entry_price) * 100
                    pnl = position_size * (pnl_percent / 100)
                except (TypeError, ZeroDivisionError):
                    pnl = 0.0
                    pnl_percent = 0.0

            trade_to_close['pnl_usd'] = pnl
            trade_to_close['pnl_percent'] = pnl_percent

            await self._archive_closed_trade(trade_to_close)
            await self._update_trade_summary(trade_to_close)

            portfolio_state = await self.r2_service.get_portfolio_state_async()
            current_capital = portfolio_state.get("current_capital_usd", 0)
            new_capital = current_capital + position_size + pnl
            portfolio_state["current_capital_usd"] = new_capital
            portfolio_state["invested_capital_usd"] = 0.0
            if pnl > 0:
                portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
                portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
            elif pnl < 0:
                portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
            await self.r2_service.save_portfolio_state_async(portfolio_state)

            open_trades = await self.r2_service.get_open_trades_async()
            closed_id = trade_to_close.get('id')
            remaining_trades = [t for t in open_trades if t.get('id') != closed_id]
            await self.r2_service.save_open_trades_async(remaining_trades)

            # Dropping the entry also ends the symbol's monitor loop.
            self.monitoring_tasks.pop(symbol, None)
            self.monitoring_errors.pop(symbol, None)

            await self.r2_service.save_system_logs_async({
                "trade_closed": True, "symbol": symbol, "pnl_usd": pnl, "pnl_percent": pnl_percent,
                "new_capital": new_capital, "strategy": strategy, "trade_type": "LONG", "reason": reason
            })

            if self.learning_hub and self.learning_hub.initialized:
                print(f"๐ง Triggering Learning Hub for {symbol}...")
                try:
                    await self.learning_hub.analyze_trade_and_learn(trade_to_close, reason)
                except Exception as learn_error:
                    # The trade is already closed and persisted; a learning
                    # failure must not be reported as a failed close.
                    print(f"Learning Hub analysis failed for {symbol}: {learn_error}")
                    traceback.print_exc()

            print(f"โ Trade closed for {symbol} - Reason: {reason} - PnL: {pnl_percent:+.2f}%")
            return True
        except Exception as exc:
            print(f"โ Failed to close trade for {trade_to_close.get('symbol')}: {exc}")
            await self.r2_service.save_system_logs_async({
                "trade_close_failed": True, "symbol": trade_to_close.get('symbol'), "error": str(exc)
            })
            raise

    async def update_trade(self, trade_to_update, re_analysis_decision):
        """Apply a re-analysis decision to an open trade and persist it.

        When ``action == "UPDATE_TRADE"`` the stops, take profit, exit profile
        and exit parameters are replaced (``new_*`` keys are required in that
        case).  The reasoning is always refreshed.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            symbol = trade_to_update.get('symbol')

            # Guarantee a decision_data dict so the writes below cannot fail.
            decision_data = trade_to_update.get('decision_data')
            if not isinstance(decision_data, dict):
                decision_data = {}
                trade_to_update['decision_data'] = decision_data

            if re_analysis_decision.get('action') == "UPDATE_TRADE":
                trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
                trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']
                trade_to_update['dynamic_stop_loss'] = re_analysis_decision['new_stop_loss']
                decision_data['exit_profile'] = re_analysis_decision['new_exit_profile']
                decision_data['exit_parameters'] = re_analysis_decision['new_exit_parameters']
                print(f" ๐ {symbol}: Exit profile updated to {re_analysis_decision['new_exit_profile']}")

            new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
            if new_expected_minutes:
                # Same 5-45 minute clamp as when the trade was opened.
                new_expected_minutes = max(5, min(new_expected_minutes, 45))
                trade_to_update['expected_target_minutes'] = new_expected_minutes
                trade_to_update['expected_target_time'] = (
                    datetime.now() + timedelta(minutes=new_expected_minutes)
                ).isoformat()

            # The original strategy wins; the re-analysis one is only a fallback.
            original_strategy = trade_to_update.get('strategy', re_analysis_decision.get('strategy', 'GENERIC'))
            trade_to_update['strategy'] = original_strategy
            decision_data['reasoning'] = re_analysis_decision.get('reasoning')
            trade_to_update['is_monitored'] = True
            trade_to_update['trade_type'] = "LONG"

            open_trades = await self.r2_service.get_open_trades_async()
            target_id = trade_to_update.get('id')
            for index, stored_trade in enumerate(open_trades):
                if stored_trade.get('id') == target_id:
                    open_trades[index] = trade_to_update
                    break
            await self.r2_service.save_open_trades_async(open_trades)

            await self.r2_service.save_system_logs_async({
                "trade_updated": True, "symbol": symbol, "action": "UPDATE_TRADE",
                "strategy": original_strategy,
                "new_exit_profile": re_analysis_decision.get('new_exit_profile', 'N/A')
            })
            print(f"โ Trade updated for {symbol}")
            return True
        except Exception as exc:
            print(f"โ Failed to update trade {trade_to_update.get('symbol')}: {exc}")
            raise

    async def immediate_close_trade(self, symbol, close_price, reason="Immediate Monitor"):
        """Close the OPEN trade for ``symbol``; return ``False`` if none exists or closing fails."""
        try:
            open_trades = await self.r2_service.get_open_trades_async()
            trade_to_close = next(
                (t for t in open_trades if t.get('symbol') == symbol and t.get('status') == 'OPEN'),
                None,
            )
            if not trade_to_close:
                print(f"โ ๏ธ No open trade found for {symbol} to close immediately.")
                return False
            await self.close_trade(trade_to_close, close_price, reason)
            return True
        except Exception as exc:
            print(f"โ Failed to immediate_close {symbol}: {exc}")
            return False

    async def start_trade_monitoring(self):
        """Supervisor loop: keep one monitor task per open trade until stopped.

        Every 10 seconds it starts monitors for newly opened trades and cancels
        monitors whose symbol is no longer in the open-trades list.  After an
        error in the loop it waits 30 seconds before retrying.
        """
        self.is_running = True
        print(f"๐ Starting trade monitoring (Dynamic Exit Engine) (Pandas: {PANDAS_TA_AVAILABLE})...")
        while self.is_running:
            try:
                open_trades = await self.r2_service.get_open_trades_async()
                current_time = time.time()

                for trade in open_trades:
                    symbol = trade['symbol']
                    if self.monitoring_errors.get(symbol, 0) >= self.max_consecutive_errors:
                        # Too many consecutive failures: do not (re)start a monitor.
                        self.monitoring_tasks.pop(symbol, None)
                        continue
                    if symbol not in self.monitoring_tasks:
                        task = asyncio.create_task(self._monitor_single_trade(trade))
                        self.monitoring_tasks[symbol] = {
                            'task': task, 'start_time': current_time, 'trade_object': trade
                        }
                        trade['monitoring_started'] = True

                # Cancel monitors whose trade is no longer open.
                current_symbols = {trade['symbol'] for trade in open_trades}
                for symbol in list(self.monitoring_tasks):
                    if symbol in current_symbols:
                        continue
                    task_info = self.monitoring_tasks.pop(symbol)
                    if not task_info['task'].done():
                        task_info['task'].cancel()
                    self.monitoring_errors.pop(symbol, None)

                await asyncio.sleep(10)
            except Exception as error:
                print(f"โ Error in main trade monitoring loop: {error}")
                await asyncio.sleep(30)

    async def _monitor_single_trade(self, trade_object):
        """Poll the price of one trade and close it when the exit engine says so.

        Runs until the trade is closed, monitoring is stopped, the symbol is
        removed from ``monitoring_tasks`` or ``max_consecutive_errors`` is hit.
        """
        symbol = trade_object['symbol']
        # NOTE(review): the monitor works on a private copy, so changes written
        # by update_trade() do not appear to reach a running monitor - confirm
        # whether that is intended.
        local_trade = trade_object.copy()
        exit_profile = (local_trade.get('decision_data') or {}).get('exit_profile', 'N/A')
        print(f"๐ Starting tactical monitoring: {symbol} (Profile: {exit_profile})")

        while (symbol in self.monitoring_tasks
               and self.is_running
               and self.monitoring_errors.get(symbol, 0) < self.max_consecutive_errors):
            try:
                # Stand down while a re-analysis holds the lock.
                if self.state_manager and self.state_manager.trade_analysis_lock.locked():
                    print(f"โธ๏ธ [Monitor] Pausing {symbol} (Strategic re-analysis active...)")
                    await asyncio.sleep(10)
                    continue

                if not self.data_manager:
                    await asyncio.sleep(15)
                    continue

                try:
                    current_price = await asyncio.wait_for(
                        self.data_manager.get_latest_price_async(symbol), timeout=10
                    )
                except asyncio.TimeoutError:
                    print(f"โฐ Price timeout for {symbol}")
                    self._increment_monitoring_error(symbol)
                    await asyncio.sleep(15)
                    continue

                if not current_price:
                    self._increment_monitoring_error(symbol)
                    await asyncio.sleep(15)
                    continue

                should_close, close_reason, local_trade = await self.exit_engine.evaluate_exit(
                    local_trade, current_price
                )

                if should_close:
                    print(f"๐ [Monitor] Exit signal for {symbol}: {close_reason}")
                    if self.r2_service.acquire_lock():
                        try:
                            # Re-check the stored state: the trade may have been
                            # closed elsewhere in the meantime.
                            latest_trade_data = await self.r2_service.get_trade_by_symbol_async(symbol)
                            if latest_trade_data and latest_trade_data.get('status') == 'OPEN':
                                await self.immediate_close_trade(
                                    symbol, current_price, f"Tactical Monitor: {close_reason}"
                                )
                            else:
                                print(f"โ ๏ธ {symbol} no longer open, tactical close cancelled.")
                        except Exception as close_error:
                            print(f"โ Auto-close failed for {symbol}: {close_error}")
                        finally:
                            self.r2_service.release_lock()
                    # Leave the loop either way; the supervisor starts a new
                    # monitor if the trade is still open.
                    break

                # A clean iteration resets the consecutive-error counter.
                if symbol in self.monitoring_errors:
                    self.monitoring_errors[symbol] = 0
                await asyncio.sleep(60)
            except Exception as error:
                error_count = self._increment_monitoring_error(symbol)
                print(f"โ Error monitoring {symbol} (Err #{error_count}): {error}")
                traceback.print_exc()
                if error_count >= self.max_consecutive_errors:
                    print(f"๐จ Stopping monitoring for {symbol} due to repeated errors")
                    await self.r2_service.save_system_logs_async({
                        "monitoring_stopped": True, "symbol": symbol,
                        "error_count": error_count, "error": str(error)
                    })
                    break
                await asyncio.sleep(30)

        print(f"๐ Stopping tactical monitoring for: {symbol}")
        self.monitoring_tasks.pop(symbol, None)
        self.monitoring_errors.pop(symbol, None)

    def _increment_monitoring_error(self, symbol):
        """Increase and return the consecutive-error count for ``symbol``."""
        updated_count = self.monitoring_errors.get(symbol, 0) + 1
        self.monitoring_errors[symbol] = updated_count
        return updated_count

    def stop_monitoring(self):
        """Stop the supervisor loop, cancel all monitor tasks and clear bookkeeping."""
        self.is_running = False
        print("๐ Stopping all monitoring tasks...")
        for task_info in self.monitoring_tasks.values():
            if not task_info['task'].done():
                task_info['task'].cancel()
        self.monitoring_tasks.clear()
        self.monitoring_errors.clear()

    def _load_json_object(self, key, default):
        """Load a JSON document from the ``trading`` bucket.

        Returns ``default`` when the object does not exist or holds invalid
        JSON.  Any other read failure is re-raised so callers do not overwrite
        stored data after a transient error.
        """
        # NOTE(review): this is a synchronous client call inside async code paths.
        try:
            response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
        except Exception as exc:
            # Assumes botocore-style errors exposing ``response['Error']['Code']``
            # - TODO confirm against the actual client used by r2_service.
            error_info = getattr(exc, 'response', None)
            error_code = ''
            if isinstance(error_info, dict):
                error_code = str((error_info.get('Error') or {}).get('Code', ''))
            if error_code in ('NoSuchKey', '404', 'NotFound'):
                return default
            raise
        try:
            return json.loads(response['Body'].read())
        except ValueError:
            # Corrupt or undecodable content: start over from the default.
            return default

    async def _archive_closed_trade(self, closed_trade):
        """Append the trade to ``closed_trades_history.json`` (last 1000 kept).

        Best effort: failures are printed, never raised.
        """
        try:
            key = "closed_trades_history.json"
            history = self._load_json_object(key, [])
            history.append(closed_trade)
            history = history[-1000:]
            data_json = json.dumps(history, indent=2).encode('utf-8')
            self.r2_service.s3_client.put_object(
                Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
            )
        except Exception as exc:
            print(f"โ Failed to archive trade: {exc}")

    async def _update_trade_summary(self, closed_trade):
        """Fold the trade's PnL into the aggregate ``trade_summary.json``.

        Best effort: failures are printed, never raised.
        """
        try:
            key = "trade_summary.json"
            empty_summary = {
                "total_trades": 0, "winning_trades": 0, "losing_trades": 0,
                "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
                "avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
                "largest_win": 0.0, "largest_loss": 0.0,
                "last_updated": datetime.now().isoformat()
            }
            summary = self._load_json_object(key, empty_summary)

            pnl = closed_trade.get('pnl_usd', 0.0)
            summary['total_trades'] += 1
            # NOTE(review): a break-even trade (pnl == 0) counts as a win here,
            # while close_trade() only counts pnl > 0 as a win - confirm intent.
            if pnl >= 0:
                summary['winning_trades'] += 1
                summary['total_profit_usd'] += pnl
                summary['largest_win'] = max(summary.get('largest_win', 0), pnl)
            else:
                summary['losing_trades'] += 1
                summary['total_loss_usd'] += abs(pnl)
                summary['largest_loss'] = max(summary.get('largest_loss', 0), abs(pnl))

            if summary['total_trades'] > 0:
                summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100
            if summary['winning_trades'] > 0:
                summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades']
            if summary['losing_trades'] > 0:
                summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades']
            summary['last_updated'] = datetime.now().isoformat()

            data_json = json.dumps(summary, indent=2).encode('utf-8')
            self.r2_service.s3_client.put_object(
                Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
            )
        except Exception as exc:
            print(f"โ Failed to update trade summary: {exc}")

    async def get_open_trades(self):
        """Return the stored open trades, or ``[]`` if they cannot be read."""
        try:
            return await self.r2_service.get_open_trades_async()
        except Exception as exc:
            print(f"โ Failed to get open trades: {exc}")
            return []

    async def get_trade_by_symbol(self, symbol):
        """Return the OPEN trade for ``symbol``, or ``None`` if there is none."""
        try:
            open_trades = await self.get_open_trades()
            return next(
                (t for t in open_trades if t.get('symbol') == symbol and t.get('status') == 'OPEN'),
                None,
            )
        except Exception as exc:
            print(f"โ Failed to get trade by symbol {symbol}: {exc}")
            return None

    def get_monitoring_status(self):
        """Return a snapshot of the monitoring state."""
        return {
            'is_running': self.is_running,
            'active_tasks': len(self.monitoring_tasks),
            'monitoring_errors': dict(self.monitoring_errors),
            'max_consecutive_errors': self.max_consecutive_errors
        }
| print(f"โ Trade Manager loaded - V3 (Integrated LearningHub Trigger & Context Injection / Pandas: {PANDAS_TA_AVAILABLE})") |