# Source: Trad / trade_manager.py (uploaded by Riy777)
# Commit 605065a: "Update trade_manager.py" (28.8 kB)
# trade_manager.py (Updated to integrate LearningHubManager)
import asyncio
import json
import time
import traceback
from datetime import datetime, timedelta
from helpers import safe_float_conversion, _apply_patience_logic
# (The _DynamicExitEngine and pandas/ta imports remain unchanged)
# pandas / pandas_ta are optional: they are only needed to compute the ATR used
# by the ATR trailing-stop exit profile. PANDAS_TA_AVAILABLE records whether
# both imports succeeded so the rest of the module can degrade gracefully.
try:
    import pandas as pd
    import pandas_ta as ta
    PANDAS_TA_AVAILABLE = True
except ImportError:
    PANDAS_TA_AVAILABLE = False
    print("โš ๏ธ pandas_ta not available. ATR Trailing Stop will be disabled.")
# 🔴 (The _DynamicExitEngine class remains unchanged from the previous version)
class _DynamicExitEngine:
    """Evaluate exit rules for an open LONG trade.

    Every ``evaluate``/``_evaluate_*`` coroutine returns a
    ``(should_close, reason, trade)`` tuple. ``trade`` is the same dict that
    was passed in; the break-even and ATR rules may raise its
    ``dynamic_stop_loss`` in place.
    """

    def __init__(self, data_manager):
        """Store the data manager whose ``exchange`` is used to fetch OHLCV candles."""
        self.data_manager = data_manager

    @staticmethod
    def _check_static_levels(trade: dict, current_price: float, dynamic_label: str):
        """Return the exit reason if a stored price level is hit, else ``None``.

        Levels are checked in order: hard stop loss, take profit, dynamic stop.
        A level that is missing, ``None`` or ``0`` is treated as not configured.
        """
        hard_stop = trade.get('stop_loss')
        if hard_stop and current_price <= hard_stop:
            return f"Hard Stop Loss hit: {current_price} <= {hard_stop}"
        take_profit = trade.get('take_profit')
        if take_profit and current_price >= take_profit:
            return f"Final Take Profit hit: {current_price} >= {take_profit}"
        dynamic_stop = trade.get('dynamic_stop_loss')
        if dynamic_stop and current_price <= dynamic_stop:
            return f"{dynamic_label} hit: {current_price} <= {dynamic_stop}"
        return None

    async def evaluate_exit(self, trade: dict, current_price: float):
        """Decide whether the trade should be closed at ``current_price``.

        The stored stop/target levels are checked first. If none is hit, the
        rule selected by ``decision_data['exit_profile']`` runs:
        ``ATR_TRAILING`` -> ATR trailing stop, ``TIME_BASED`` -> time stop,
        anything else (``FIXED_TARGET``, ``SIGNAL_BASED``, unknown) -> break-even.

        Only the ATR profile depends on pandas_ta; the other profiles keep
        working when it is not installed. Any unexpected error falls back to
        the plain stop/target check so a trade is never left unprotected.
        """
        try:
            decision_data = trade.get('decision_data') or {}
            exit_profile = decision_data.get('exit_profile', 'FIXED_TARGET')
            exit_params = decision_data.get('exit_parameters') or {}

            reason = self._check_static_levels(trade, current_price, "Dynamic Trailing Stop")
            if reason:
                return True, reason, trade

            if exit_profile == "ATR_TRAILING":
                return await self._evaluate_atr_trailing(trade, current_price, exit_params)
            if exit_profile == "TIME_BASED":
                return await self._evaluate_time_based(trade, current_price, exit_params)
            return await self._evaluate_break_even(
                trade, current_price, exit_params.get("break_even_trigger_percent", 0)
            )
        except Exception as e:
            print(f"โŒ Error in DynamicExitEngine for {trade.get('symbol')}: {e}")
            traceback.print_exc()
            return await self._evaluate_fixed_exit(trade, current_price)

    async def _evaluate_fixed_exit(self, trade: dict, current_price: float):
        """Check only the stored stop loss, take profit and dynamic stop."""
        reason = self._check_static_levels(trade, current_price, "Dynamic Stop")
        if reason:
            return True, reason, trade
        return False, "No exit criteria met", trade

    async def _evaluate_atr_trailing(self, trade: dict, current_price: float, params: dict):
        """Raise ``dynamic_stop_loss`` to ``price - ATR * multiplier`` when that is higher.

        Never signals a close itself; the raised stop is enforced by the
        level check on the next evaluation. Returns without changes when
        pandas_ta is missing, candles are unavailable or the ATR is invalid.
        """
        if not PANDAS_TA_AVAILABLE:
            return False, "ATR unavailable (pandas_ta missing)", trade
        try:
            atr_period = params.get("atr_period", 14)
            atr_multiplier = params.get("atr_multiplier", 2.0)
            break_even_trigger_percent = params.get("break_even_trigger_percent", 0) or 0

            # Extra candles beyond the ATR window let the indicator warm up.
            ohlcv_data = await self._fetch_ohlcv_for_atr(trade['symbol'], '15m', atr_period + 50)
            if ohlcv_data is None:
                return False, "ATR data unavailable", trade

            df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=atr_period)
            if df['atr'].dropna().empty:
                return False, "ATR calculation failed", trade

            current_atr = df['atr'].iloc[-1]
            if current_atr is None or pd.isna(current_atr) or current_atr == 0:
                return False, "Invalid ATR value", trade

            new_atr_stop = float(current_price - (current_atr * atr_multiplier))
            # The key may exist with a None value, so .get(key, 0) is not enough.
            current_dynamic_stop = trade.get('dynamic_stop_loss') or 0
            entry_price = trade.get('entry_price')

            if break_even_trigger_percent > 0 and entry_price:
                trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
                if current_price >= trigger_price:
                    # Once the trigger is reached the stop never sits below entry.
                    new_atr_stop = max(new_atr_stop, entry_price)

            # The trailing stop only ever moves up.
            if new_atr_stop > current_dynamic_stop:
                trade['dynamic_stop_loss'] = new_atr_stop
            return False, "ATR Trailing logic executed", trade
        except Exception as e:
            print(f"โŒ Error in ATR Trailing logic for {trade.get('symbol')}: {e}")
            return False, "ATR logic failed", trade

    async def _evaluate_time_based(self, trade: dict, current_price: float, params: dict):
        """Signal a close once ``exit_after_minutes`` have elapsed since entry."""
        exit_after_minutes = params.get("exit_after_minutes", 0) or 0
        if exit_after_minutes <= 0:
            return False, "Time stop not configured", trade

        entry_timestamp = trade.get('entry_timestamp')
        if not entry_timestamp:
            return False, "Time stop skipped: entry timestamp missing", trade

        entry_time = datetime.fromisoformat(entry_timestamp)
        # Use the entry's own tzinfo so naive and aware timestamps both work.
        now = datetime.now(entry_time.tzinfo)
        elapsed_minutes = (now - entry_time).total_seconds() / 60
        if elapsed_minutes >= exit_after_minutes:
            return True, f"Time Stop hit: {elapsed_minutes:.1f}m >= {exit_after_minutes}m", trade
        return False, "Time stop not hit", trade

    async def _evaluate_break_even(self, trade: dict, current_price: float, break_even_trigger_percent: float):
        """Move ``dynamic_stop_loss`` up to the entry price once the trigger gain is reached.

        Never signals a close itself.
        """
        if not break_even_trigger_percent or break_even_trigger_percent <= 0:
            return False, "Break-even not configured", trade

        entry_price = trade.get('entry_price')
        if not entry_price:
            return False, "Break-even skipped: entry price missing", trade

        # The key may exist with a None value, so .get(key, 0) is not enough.
        current_dynamic_stop = trade.get('dynamic_stop_loss') or 0
        if current_dynamic_stop < entry_price:
            trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
            if current_price >= trigger_price:
                trade['dynamic_stop_loss'] = entry_price
        return False, "Break-even logic executed", trade

    async def _fetch_ohlcv_for_atr(self, symbol, timeframe, limit):
        """Fetch ``limit`` candles through the exchange client, or ``None`` on any problem.

        The exchange call is synchronous, so it runs in the default executor
        to keep the event loop responsive. Fewer than ``limit`` candles is
        treated as unavailable data.
        """
        try:
            if not self.data_manager or not self.data_manager.exchange:
                return None
            loop = asyncio.get_running_loop()
            ohlcv_data = await loop.run_in_executor(
                None,
                lambda: self.data_manager.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            )
            if ohlcv_data and len(ohlcv_data) >= limit:
                return ohlcv_data
            return None
        except Exception as e:
            print(f"โš ๏ธ Failed to fetch OHLCV for ATR ({symbol}): {e}")
            return None
# 🔴 (End of unchanged _DynamicExitEngine class)
class TradeManager:
    """Open, update, monitor and close SPOT (LONG-only) trades.

    Trade and portfolio state is persisted through ``r2_service``; each open
    trade gets its own monitoring task that polls the price and asks the
    ``_DynamicExitEngine`` whether to close. Closed trades are archived,
    summarised, and handed to the optional learning hub.
    """

    def __init__(self, r2_service, learning_hub=None, data_manager=None, state_manager=None):
        """Wire up collaborators and create empty monitoring state."""
        self.r2_service = r2_service
        self.learning_hub = learning_hub  # (Changed from learning_engine)
        self.data_manager = data_manager
        self.state_manager = state_manager
        # symbol -> {'task': asyncio.Task, 'start_time': float, 'trade_object': dict}
        self.monitoring_tasks = {}
        self.is_running = False
        # symbol -> consecutive monitoring error count
        self.monitoring_errors = {}
        self.max_consecutive_errors = 5
        self.exit_engine = _DynamicExitEngine(self.data_manager)

    async def open_trade(self, symbol, decision, current_price, decision_context: dict = None):
        """Open a LONG trade with all available capital and persist it.

        Args:
            symbol: Market symbol to trade.
            decision: Decision dict (``trade_type``, ``stop_loss``,
                ``take_profit``, ``strategy``, ``exit_profile``, ...).
            current_price: Price recorded as the entry price.
            decision_context: Optional dict whose ``market`` and ``indicators``
                entries are stored with the decision for later analysis.

        Returns:
            The new trade dict, or ``None`` when the trade is rejected
            (SHORT requested, or less than 1 USD of capital available).

        Raises:
            Exception: Any persistence error, after it has been logged.
        """
        try:
            decision_context = decision_context or {}

            trade_type = decision.get("trade_type", "LONG")
            if trade_type == "SHORT":
                print(f"โš ๏ธ SHORT trade rejected for {symbol}. SPOT only system.")
                await self.r2_service.save_system_logs_async({
                    "trade_open_rejected": True, "reason": "SHORT trade not allowed", "symbol": symbol
                })
                return None

            portfolio_state = await self.r2_service.get_portfolio_state_async()
            available_capital = portfolio_state.get("current_capital_usd", 0)
            if available_capital < 1:
                print(f"โŒ Insufficient capital for {symbol}: {available_capital}")
                return None

            # Clamp the expected holding time to the 5-45 minute window.
            expected_target_minutes = decision.get('expected_target_minutes', 15)
            expected_target_minutes = max(5, min(expected_target_minutes, 45))
            expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()

            strategy = decision.get('strategy', 'GENERIC')
            exit_profile = decision.get('exit_profile', 'FIXED_TARGET')

            # Store the decision together with the context it was made in.
            decision_data_for_trade = decision.copy()
            decision_data_for_trade['market_context_at_decision'] = decision_context.get('market', {})
            decision_data_for_trade['indicators_at_decision'] = decision_context.get('indicators', {})

            trades = await self.r2_service.get_open_trades_async()
            new_trade = {
                "id": str(int(datetime.now().timestamp())),
                "symbol": symbol,
                "entry_price": current_price,
                "entry_timestamp": datetime.now().isoformat(),
                "decision_data": decision_data_for_trade,
                "status": "OPEN",
                "stop_loss": decision.get("stop_loss"),
                "take_profit": decision.get("take_profit"),
                # The dynamic stop starts at the hard stop and is only raised later.
                "dynamic_stop_loss": decision.get("stop_loss"),
                "trade_type": "LONG",
                "position_size_usd": available_capital,
                "expected_target_minutes": expected_target_minutes,
                "expected_target_time": expected_target_time,
                "is_monitored": True,
                "strategy": strategy,
                "monitoring_started": False
            }
            trades.append(new_trade)
            await self.r2_service.save_open_trades_async(trades)

            portfolio_state["invested_capital_usd"] = available_capital
            portfolio_state["current_capital_usd"] = 0.0
            portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
            await self.r2_service.save_portfolio_state_async(portfolio_state)

            await self.r2_service.save_system_logs_async({
                "new_trade_opened": True, "symbol": symbol, "position_size": available_capital,
                "strategy": strategy, "exit_profile": exit_profile
            })
            print(f"โœ… New LONG trade opened for {symbol} (Strategy: {strategy} | Exit Profile: {exit_profile})")
            return new_trade
        except Exception as e:
            print(f"โŒ Failed to open trade for {symbol}: {e}")
            await self.r2_service.save_system_logs_async({
                "trade_open_failed": True, "symbol": symbol, "error": str(e)
            })
            raise

    async def close_trade(self, trade_to_close, close_price, reason="System Close"):
        """Close a trade, settle PnL into the portfolio and trigger learning.

        The trade dict is updated in place (status, close price/time, PnL),
        archived, removed from the open-trades list, and its monitoring
        bookkeeping is dropped. A learning-hub failure is logged but does not
        turn an already persisted close into a reported failure.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Any error while persisting the close, after logging it.
        """
        try:
            symbol = trade_to_close.get('symbol')
            trade_to_close['status'] = 'CLOSED'
            trade_to_close['close_price'] = close_price
            trade_to_close['close_timestamp'] = datetime.now().isoformat()
            trade_to_close['is_monitored'] = False

            entry_price = trade_to_close['entry_price']
            position_size = trade_to_close['position_size_usd']
            strategy = trade_to_close.get('strategy', 'unknown')

            pnl = 0.0
            pnl_percent = 0.0
            if entry_price and entry_price > 0 and close_price and close_price > 0:
                try:
                    pnl_percent = ((close_price - entry_price) / entry_price) * 100
                    pnl = position_size * (pnl_percent / 100)
                except (TypeError, ZeroDivisionError):
                    pnl = 0.0
                    pnl_percent = 0.0
            trade_to_close['pnl_usd'] = pnl
            trade_to_close['pnl_percent'] = pnl_percent

            await self._archive_closed_trade(trade_to_close)
            await self._update_trade_summary(trade_to_close)

            portfolio_state = await self.r2_service.get_portfolio_state_async()
            current_capital = portfolio_state.get("current_capital_usd", 0)
            new_capital = current_capital + position_size + pnl
            portfolio_state["current_capital_usd"] = new_capital
            portfolio_state["invested_capital_usd"] = 0.0
            if pnl > 0:
                portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
                portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
            elif pnl < 0:
                portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
            await self.r2_service.save_portfolio_state_async(portfolio_state)

            open_trades = await self.r2_service.get_open_trades_async()
            trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
            await self.r2_service.save_open_trades_async(trades_to_keep)

            # Dropping the entry makes the monitor loop for this symbol exit;
            # the task is not cancelled because this may be running inside it.
            self.monitoring_tasks.pop(symbol, None)
            self.monitoring_errors.pop(symbol, None)

            await self.r2_service.save_system_logs_async({
                "trade_closed": True, "symbol": symbol, "pnl_usd": pnl, "pnl_percent": pnl_percent,
                "new_capital": new_capital, "strategy": strategy, "trade_type": "LONG", "reason": reason
            })

            if self.learning_hub and self.learning_hub.initialized:
                print(f"๐Ÿง  Triggering Learning Hub for {symbol}...")
                try:
                    await self.learning_hub.analyze_trade_and_learn(trade_to_close, reason)
                except Exception as learn_error:
                    # The trade is already closed and persisted at this point.
                    print(f"[WARN] Learning Hub analysis failed for {symbol}: {learn_error}")
                    traceback.print_exc()

            print(f"โœ… Trade closed for {symbol} - Reason: {reason} - PnL: {pnl_percent:+.2f}%")
            return True
        except Exception as e:
            print(f"โŒ Failed to close trade for {trade_to_close.get('symbol')}: {e}")
            await self.r2_service.save_system_logs_async({
                "trade_close_failed": True, "symbol": trade_to_close.get('symbol'), "error": str(e)
            })
            raise

    async def update_trade(self, trade_to_update, re_analysis_decision):
        """Apply a re-analysis decision to an open trade and persist it.

        When ``action`` is ``"UPDATE_TRADE"`` the stop loss, take profit,
        dynamic stop, exit profile and exit parameters are replaced (these
        keys are required in that case). The updated trade is also handed to
        the running monitor task so the new levels take effect.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Any error (including a missing required key), after logging.
        """
        try:
            symbol = trade_to_update.get('symbol')
            decision_data = trade_to_update.get('decision_data')
            if not isinstance(decision_data, dict):
                decision_data = {}
                trade_to_update['decision_data'] = decision_data

            if re_analysis_decision.get('action') == "UPDATE_TRADE":
                trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
                trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']
                trade_to_update['dynamic_stop_loss'] = re_analysis_decision['new_stop_loss']
                decision_data['exit_profile'] = re_analysis_decision['new_exit_profile']
                decision_data['exit_parameters'] = re_analysis_decision['new_exit_parameters']
                print(f" ๐Ÿ”„ {symbol}: Exit profile updated to {re_analysis_decision['new_exit_profile']}")

            new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
            if new_expected_minutes:
                new_expected_minutes = max(5, min(new_expected_minutes, 45))
                trade_to_update['expected_target_minutes'] = new_expected_minutes
                trade_to_update['expected_target_time'] = (
                    datetime.now() + timedelta(minutes=new_expected_minutes)
                ).isoformat()

            # The strategy chosen at entry wins over the re-analysis one.
            original_strategy = trade_to_update.get('strategy', re_analysis_decision.get('strategy', 'GENERIC'))
            trade_to_update['strategy'] = original_strategy
            decision_data['reasoning'] = re_analysis_decision.get('reasoning')
            trade_to_update['is_monitored'] = True
            trade_to_update['trade_type'] = "LONG"

            open_trades = await self.r2_service.get_open_trades_async()
            for i, trade in enumerate(open_trades):
                if trade.get('id') == trade_to_update.get('id'):
                    open_trades[i] = trade_to_update
                    break
            await self.r2_service.save_open_trades_async(open_trades)

            # The monitor works on a private copy; publish the new object so
            # it reloads the updated levels on its next iteration.
            task_info = self.monitoring_tasks.get(symbol)
            if task_info is not None:
                task_info['trade_object'] = trade_to_update

            await self.r2_service.save_system_logs_async({
                "trade_updated": True, "symbol": symbol, "action": "UPDATE_TRADE",
                "strategy": original_strategy,
                "new_exit_profile": re_analysis_decision.get('new_exit_profile', 'N/A')
            })
            print(f"โœ… Trade updated for {symbol}")
            return True
        except Exception as e:
            print(f"โŒ Failed to update trade {trade_to_update.get('symbol')}: {e}")
            raise

    async def immediate_close_trade(self, symbol, close_price, reason="Immediate Monitor"):
        """Close the open trade for ``symbol``; return ``False`` if none exists or closing fails."""
        try:
            open_trades = await self.r2_service.get_open_trades_async()
            trade_to_close = next(
                (t for t in open_trades if t['symbol'] == symbol and t['status'] == 'OPEN'), None
            )
            if not trade_to_close:
                print(f"โš ๏ธ No open trade found for {symbol} to close immediately.")
                return False
            await self.close_trade(trade_to_close, close_price, reason)
            return True
        except Exception as e:
            print(f"โŒ Failed to immediate_close {symbol}: {e}")
            return False

    async def start_trade_monitoring(self):
        """Supervise per-trade monitor tasks until ``stop_monitoring`` is called.

        Every 10 seconds: start a monitor for each open trade that has none,
        skip symbols that reached the error limit, and cancel monitors whose
        trade is no longer open. A loop error is logged and retried after 30 s.
        """
        self.is_running = True
        print(f"๐Ÿ” Starting trade monitoring (Dynamic Exit Engine) (Pandas: {PANDAS_TA_AVAILABLE})...")
        while self.is_running:
            try:
                open_trades = await self.r2_service.get_open_trades_async()
                current_time = time.time()
                for trade in open_trades:
                    symbol = trade['symbol']
                    if self.monitoring_errors.get(symbol, 0) >= self.max_consecutive_errors:
                        self.monitoring_tasks.pop(symbol, None)
                        continue
                    if symbol not in self.monitoring_tasks:
                        task = asyncio.create_task(self._monitor_single_trade(trade))
                        self.monitoring_tasks[symbol] = {
                            'task': task, 'start_time': current_time, 'trade_object': trade
                        }
                        trade['monitoring_started'] = True

                current_symbols = {trade['symbol'] for trade in open_trades}
                # Iterate a snapshot: entries are removed while looping.
                for symbol in list(self.monitoring_tasks.keys()):
                    if symbol not in current_symbols:
                        task_info = self.monitoring_tasks[symbol]
                        if not task_info['task'].done():
                            task_info['task'].cancel()
                        del self.monitoring_tasks[symbol]
                        self.monitoring_errors.pop(symbol, None)
                await asyncio.sleep(10)
            except Exception as error:
                print(f"โŒ Error in main trade monitoring loop: {error}")
                await asyncio.sleep(30)

    async def _monitor_single_trade(self, trade_object):
        """Poll the price for one trade and close it when the exit engine says so.

        Runs until the trade's entry disappears from ``monitoring_tasks``,
        monitoring is stopped, the error limit is reached, or an exit signal
        has been handled. Evaluation is paused while the state manager's
        ``trade_analysis_lock`` is held.
        """
        symbol = trade_object['symbol']
        source_trade = trade_object
        # Work on a copy so trailing-stop updates stay local to this task.
        local_trade = trade_object.copy()
        exit_profile = local_trade.get('decision_data', {}).get('exit_profile', 'N/A')
        print(f"๐Ÿ” Starting tactical monitoring: {symbol} (Profile: {exit_profile})")

        while (symbol in self.monitoring_tasks and
               self.is_running and
               self.monitoring_errors.get(symbol, 0) < self.max_consecutive_errors):
            try:
                if self.state_manager and self.state_manager.trade_analysis_lock.locked():
                    print(f"โธ๏ธ [Monitor] Pausing {symbol} (Strategic re-analysis active...)")
                    await asyncio.sleep(10)
                    continue

                # Reload when update_trade published a newer trade object.
                task_info = self.monitoring_tasks.get(symbol)
                latest_trade = task_info.get('trade_object') if task_info else None
                if latest_trade is not None and latest_trade is not source_trade:
                    source_trade = latest_trade
                    local_trade = latest_trade.copy()

                if not self.data_manager:
                    await asyncio.sleep(15)
                    continue

                try:
                    current_price = await asyncio.wait_for(
                        self.data_manager.get_latest_price_async(symbol), timeout=10
                    )
                except asyncio.TimeoutError:
                    print(f"โฐ Price timeout for {symbol}")
                    self._increment_monitoring_error(symbol)
                    await asyncio.sleep(15)
                    continue

                if not current_price:
                    self._increment_monitoring_error(symbol)
                    await asyncio.sleep(15)
                    continue

                should_close, close_reason, local_trade = await self.exit_engine.evaluate_exit(
                    local_trade,
                    current_price
                )

                if should_close:
                    print(f"๐Ÿ›‘ [Monitor] Exit signal for {symbol}: {close_reason}")
                    # If the lock is not obtained the loop simply ends; the
                    # supervisor restarts monitoring while the trade is open.
                    if self.r2_service.acquire_lock():
                        try:
                            latest_trade_data = await self.r2_service.get_trade_by_symbol_async(symbol)
                            if latest_trade_data and latest_trade_data['status'] == 'OPEN':
                                await self.immediate_close_trade(
                                    symbol, current_price, f"Tactical Monitor: {close_reason}"
                                )
                            else:
                                print(f"โš ๏ธ {symbol} no longer open, tactical close cancelled.")
                        except Exception as close_error:
                            print(f"โŒ Auto-close failed for {symbol}: {close_error}")
                        finally:
                            self.r2_service.release_lock()
                    break

                # A clean iteration resets the consecutive-error counter.
                if symbol in self.monitoring_errors:
                    self.monitoring_errors[symbol] = 0
                await asyncio.sleep(60)
            except Exception as error:
                error_count = self._increment_monitoring_error(symbol)
                print(f"โŒ Error monitoring {symbol} (Err #{error_count}): {error}")
                traceback.print_exc()
                if error_count >= self.max_consecutive_errors:
                    print(f"๐Ÿšจ Stopping monitoring for {symbol} due to repeated errors")
                    await self.r2_service.save_system_logs_async({
                        "monitoring_stopped": True, "symbol": symbol,
                        "error_count": error_count, "error": str(error)
                    })
                    break
                await asyncio.sleep(30)

        print(f"๐Ÿ›‘ Stopping tactical monitoring for: {symbol}")
        self.monitoring_tasks.pop(symbol, None)
        self.monitoring_errors.pop(symbol, None)

    def _increment_monitoring_error(self, symbol):
        """Increase and return the consecutive-error count for ``symbol``."""
        self.monitoring_errors[symbol] = self.monitoring_errors.get(symbol, 0) + 1
        return self.monitoring_errors[symbol]

    def stop_monitoring(self):
        """Stop the supervisor loop, cancel every monitor task and clear state."""
        self.is_running = False
        print("๐Ÿ›‘ Stopping all monitoring tasks...")
        for task_info in self.monitoring_tasks.values():
            if not task_info['task'].done():
                task_info['task'].cancel()
        self.monitoring_tasks.clear()
        self.monitoring_errors.clear()

    @staticmethod
    def _is_missing_object_error(error):
        """Return ``True`` when ``error`` means the stored object does not exist.

        Recognises the ``response['Error']['Code']`` shape carried by S3-style
        client errors, plus plain ``KeyError``/``FileNotFoundError``.
        NOTE(review): assumes ``s3_client`` is boto3-compatible -- confirm.
        """
        if isinstance(error, (KeyError, FileNotFoundError)):
            return True
        response = getattr(error, 'response', None)
        if not isinstance(response, dict):
            return False
        error_info = response.get('Error') or {}
        return str(error_info.get('Code', '')) in ('NoSuchKey', '404', 'NotFound')

    async def _load_json_object(self, key, default_factory):
        """Read and decode a JSON object from the ``trading`` bucket.

        Returns ``default_factory()`` when the object is missing or is not
        valid JSON. Any other failure (for example a network error) is
        re-raised so callers do not overwrite existing data with a default.
        """
        def _read():
            response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
            return json.loads(response['Body'].read())

        loop = asyncio.get_running_loop()
        try:
            # The client call blocks, so keep it off the event loop.
            return await loop.run_in_executor(None, _read)
        except ValueError:
            return default_factory()
        except Exception as error:
            if self._is_missing_object_error(error):
                return default_factory()
            raise

    async def _save_json_object(self, key, payload):
        """Encode ``payload`` as indented JSON and store it in the ``trading`` bucket."""
        data_json = json.dumps(payload, indent=2).encode('utf-8')

        def _write():
            self.r2_service.s3_client.put_object(
                Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
            )

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _write)

    async def _archive_closed_trade(self, closed_trade):
        """Append the trade to ``closed_trades_history.json`` (last 1000 kept).

        Best effort: failures are logged, never raised. If the existing
        history cannot be read, nothing is written so it is not lost.
        """
        try:
            key = "closed_trades_history.json"
            history = await self._load_json_object(key, list)
            history.append(closed_trade)
            history = history[-1000:]
            await self._save_json_object(key, history)
        except Exception as e:
            print(f"โŒ Failed to archive trade: {e}")

    async def _update_trade_summary(self, closed_trade):
        """Fold the trade's PnL into ``trade_summary.json``.

        A trade counts as a win only when ``pnl_usd > 0`` and as a loss only
        when ``pnl_usd < 0`` (the same rule ``close_trade`` applies to the
        portfolio); a break-even trade only increases ``total_trades``.
        Best effort: failures are logged, never raised.
        """
        try:
            key = "trade_summary.json"

            def _empty_summary():
                return {
                    "total_trades": 0, "winning_trades": 0, "losing_trades": 0,
                    "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
                    "avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
                    "largest_win": 0.0, "largest_loss": 0.0,
                    "last_updated": datetime.now().isoformat()
                }

            summary = await self._load_json_object(key, _empty_summary)
            pnl = closed_trade.get('pnl_usd', 0.0)

            summary['total_trades'] += 1
            if pnl > 0:
                summary['winning_trades'] += 1
                summary['total_profit_usd'] += pnl
                summary['largest_win'] = max(summary.get('largest_win', 0), pnl)
            elif pnl < 0:
                summary['losing_trades'] += 1
                summary['total_loss_usd'] += abs(pnl)
                summary['largest_loss'] = max(summary.get('largest_loss', 0), abs(pnl))

            if summary['total_trades'] > 0:
                summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100
            if summary['winning_trades'] > 0:
                summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades']
            if summary['losing_trades'] > 0:
                summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades']
            summary['last_updated'] = datetime.now().isoformat()

            await self._save_json_object(key, summary)
        except Exception as e:
            print(f"โŒ Failed to update trade summary: {e}")

    async def get_open_trades(self):
        """Return the persisted open trades, or ``[]`` if they cannot be loaded."""
        try:
            return await self.r2_service.get_open_trades_async()
        except Exception as e:
            print(f"โŒ Failed to get open trades: {e}")
            return []

    async def get_trade_by_symbol(self, symbol):
        """Return the first OPEN trade for ``symbol``, or ``None``."""
        try:
            open_trades = await self.get_open_trades()
            return next((t for t in open_trades if t['symbol'] == symbol and t['status'] == 'OPEN'), None)
        except Exception as e:
            print(f"โŒ Failed to get trade by symbol {symbol}: {e}")
            return None

    def get_monitoring_status(self):
        """Return a snapshot of the monitoring state as a plain dict."""
        return {
            'is_running': self.is_running,
            'active_tasks': len(self.monitoring_tasks),
            'monitoring_errors': dict(self.monitoring_errors),
            'max_consecutive_errors': self.max_consecutive_errors
        }
# Import-time banner: reports that the module loaded and whether pandas_ta is available.
print(f"โœ… Trade Manager loaded - V3 (Integrated LearningHub Trigger & Context Injection / Pandas: {PANDAS_TA_AVAILABLE})")