Spaces:
Running
Running
| import asyncio | |
| import json | |
| import time | |
| import traceback | |
| from datetime import datetime, timedelta | |
| from helpers import safe_float_conversion, _apply_patience_logic | |
| class TradeManager: | |
| def __init__(self, r2_service, learning_engine=None, data_manager=None): | |
| self.r2_service = r2_service | |
| self.learning_engine = learning_engine | |
| self.data_manager = data_manager | |
| self.monitoring_tasks = {} | |
| self.is_running = False | |
| self.monitoring_errors = {} | |
| self.max_consecutive_errors = 5 | |
| async def open_trade(self, symbol, decision, current_price): | |
| try: | |
| portfolio_state = await self.r2_service.get_portfolio_state_async() | |
| available_capital = portfolio_state.get("current_capital_usd", 0) | |
| if available_capital < 1: | |
| print(f"❌ رأس المال غير كافي لفتح صفقة لـ {symbol}: {available_capital}") | |
| return None | |
| expected_target_minutes = decision.get('expected_target_minutes', 15) | |
| expected_target_minutes = max(5, min(expected_target_minutes, 45)) | |
| expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat() | |
| strategy = decision.get('strategy') | |
| if not strategy or strategy == 'unknown': | |
| strategy = 'GENERIC' | |
| trades = await self.r2_service.get_open_trades_async() | |
| new_trade = { | |
| "id": str(int(datetime.now().timestamp())), | |
| "symbol": symbol, | |
| "entry_price": current_price, | |
| "entry_timestamp": datetime.now().isoformat(), | |
| "decision_data": decision, | |
| "status": "OPEN", | |
| "stop_loss": decision.get("stop_loss"), | |
| "take_profit": decision.get("take_profit"), | |
| "trade_type": decision.get("trade_type"), | |
| "position_size_usd": available_capital, | |
| "expected_target_minutes": expected_target_minutes, | |
| "expected_target_time": expected_target_time, | |
| "is_monitored": True, | |
| "strategy": strategy, | |
| "monitoring_started": False | |
| } | |
| trades.append(new_trade) | |
| await self.r2_service.save_open_trades_async(trades) | |
| portfolio_state["invested_capital_usd"] = available_capital | |
| portfolio_state["current_capital_usd"] = 0.0 | |
| portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1 | |
| await self.r2_service.save_portfolio_state_async(portfolio_state) | |
| await self.r2_service.save_system_logs_async({ | |
| "new_trade_opened": True, | |
| "symbol": symbol, | |
| "position_size": available_capital, | |
| "expected_minutes": expected_target_minutes, | |
| "trade_type": decision.get("trade_type", "LONG"), | |
| "strategy": strategy | |
| }) | |
| print(f"✅ تم فتح صفقة جديدة لـ {symbol} باستراتيجية {strategy}") | |
| return new_trade | |
| except Exception as e: | |
| print(f"❌ فشل فتح صفقة لـ {symbol}: {e}") | |
| await self.r2_service.save_system_logs_async({ | |
| "trade_open_failed": True, | |
| "symbol": symbol, | |
| "error": str(e) | |
| }) | |
| raise | |
| async def close_trade(self, trade_to_close, close_price, reason="إغلاق بالنظام"): | |
| try: | |
| symbol = trade_to_close.get('symbol') | |
| trade_to_close['status'] = 'CLOSED' | |
| trade_to_close['close_price'] = close_price | |
| trade_to_close['close_timestamp'] = datetime.now().isoformat() | |
| trade_to_close['is_monitored'] = False | |
| entry_price = trade_to_close['entry_price'] | |
| position_size = trade_to_close['position_size_usd'] | |
| trade_type = trade_to_close.get('trade_type', 'LONG') | |
| strategy = trade_to_close.get('strategy', 'unknown') | |
| pnl = 0.0 | |
| pnl_percent = 0.0 | |
| if entry_price and entry_price > 0 and close_price and close_price > 0: | |
| try: | |
| if trade_type == 'LONG': | |
| pnl_percent = ((close_price - entry_price) / entry_price) * 100 | |
| pnl = position_size * (pnl_percent / 100) | |
| elif trade_type == 'SHORT': | |
| pnl_percent = ((entry_price - close_price) / entry_price) * 100 | |
| pnl = position_size * (pnl_percent / 100) | |
| except (TypeError, ZeroDivisionError) as calc_error: | |
| pnl = 0.0 | |
| pnl_percent = 0.0 | |
| trade_to_close['pnl_usd'] = pnl | |
| trade_to_close['pnl_percent'] = pnl_percent | |
| await self._archive_closed_trade(trade_to_close) | |
| await self._update_trade_summary(trade_to_close) | |
| portfolio_state = await self.r2_service.get_portfolio_state_async() | |
| current_capital = portfolio_state.get("current_capital_usd", 0) | |
| invested_capital = portfolio_state.get("invested_capital_usd", 0) | |
| new_capital = current_capital + position_size + pnl | |
| portfolio_state["current_capital_usd"] = new_capital | |
| portfolio_state["invested_capital_usd"] = 0.0 | |
| if pnl > 0: | |
| portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1 | |
| portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl | |
| elif pnl < 0: | |
| portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl) | |
| await self.r2_service.save_portfolio_state_async(portfolio_state) | |
| open_trades = await self.r2_service.get_open_trades_async() | |
| trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')] | |
| await self.r2_service.save_open_trades_async(trades_to_keep) | |
| # إزالة من المهام النشطة | |
| if symbol in self.monitoring_tasks: | |
| del self.monitoring_tasks[symbol] | |
| if symbol in self.monitoring_errors: | |
| del self.monitoring_errors[symbol] | |
| await self.r2_service.save_system_logs_async({ | |
| "trade_closed": True, | |
| "symbol": symbol, | |
| "entry_price": entry_price, | |
| "close_price": close_price, | |
| "pnl_usd": pnl, | |
| "pnl_percent": pnl_percent, | |
| "new_capital": new_capital, | |
| "strategy": strategy, | |
| "position_size": position_size, | |
| "trade_type": trade_type, | |
| "reason": reason | |
| }) | |
| if self.learning_engine and self.learning_engine.initialized: | |
| await self.learning_engine.analyze_trade_outcome(trade_to_close, reason) | |
| print(f"✅ تم إغلاق صفقة {symbol} - السبب: {reason} - الربح: {pnl_percent:+.2f}%") | |
| return True | |
| except Exception as e: | |
| print(f"❌ فشل إغلاق صفقة {trade_to_close.get('symbol')}: {e}") | |
| await self.r2_service.save_system_logs_async({ | |
| "trade_close_failed": True, | |
| "symbol": trade_to_close.get('symbol'), | |
| "error": str(e) | |
| }) | |
| raise | |
| async def update_trade(self, trade_to_update, re_analysis_decision): | |
| try: | |
| symbol = trade_to_update.get('symbol') | |
| if re_analysis_decision.get('new_stop_loss'): | |
| trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss'] | |
| if re_analysis_decision.get('new_take_profit'): | |
| trade_to_update['take_profit'] = re_analysis_decision['new_take_profit'] | |
| new_expected_minutes = re_analysis_decision.get('new_expected_minutes') | |
| if new_expected_minutes: | |
| new_expected_minutes = max(5, min(new_expected_minutes, 45)) | |
| trade_to_update['expected_target_minutes'] = new_expected_minutes | |
| trade_to_update['expected_target_time'] = (datetime.now() + timedelta(minutes=new_expected_minutes)).isoformat() | |
| original_strategy = trade_to_update.get('strategy') | |
| if not original_strategy or original_strategy == 'unknown': | |
| original_strategy = re_analysis_decision.get('strategy', 'GENERIC') | |
| trade_to_update['strategy'] = original_strategy | |
| trade_to_update['decision_data'] = re_analysis_decision | |
| trade_to_update['is_monitored'] = True | |
| open_trades = await self.r2_service.get_open_trades_async() | |
| for i, trade in enumerate(open_trades): | |
| if trade.get('id') == trade_to_update.get('id'): | |
| open_trades[i] = trade_to_update | |
| break | |
| await self.r2_service.save_open_trades_async(open_trades) | |
| await self.r2_service.save_system_logs_async({ | |
| "trade_updated": True, | |
| "symbol": symbol, | |
| "new_expected_minutes": new_expected_minutes, | |
| "action": "UPDATE_TRADE", | |
| "strategy": original_strategy | |
| }) | |
| print(f"✅ تم تحديث صفقة {symbol}") | |
| return True | |
| except Exception as e: | |
| print(f"❌ فشل تحديث صفقة {trade_to_update.get('symbol')}: {e}") | |
| raise | |
| async def immediate_close_trade(self, symbol, close_price, reason="المراقبة الفورية"): | |
| try: | |
| open_trades = await self.r2_service.get_open_trades_async() | |
| trade_to_close = None | |
| for trade in open_trades: | |
| if trade['symbol'] == symbol and trade['status'] == 'OPEN': | |
| trade_to_close = trade | |
| break | |
| if not trade_to_close: | |
| print(f"⚠️ لم يتم العثور على صفقة مفتوحة لـ {symbol}") | |
| return False | |
| await self.close_trade(trade_to_close, close_price, reason) | |
| return True | |
| except Exception as e: | |
| print(f"❌ فشل الإغلاق الفوري لـ {symbol}: {e}") | |
| return False | |
| async def start_trade_monitoring(self): | |
| self.is_running = True | |
| print("🔍 بدء مراقبة الصفقات...") | |
| while self.is_running: | |
| try: | |
| open_trades = await self.r2_service.get_open_trades_async() | |
| current_time = time.time() | |
| for trade in open_trades: | |
| symbol = trade['symbol'] | |
| # تخطي الصفقات التي تجاوزت حد الأخطاء | |
| if self.monitoring_errors.get(symbol, 0) >= self.max_consecutive_errors: | |
| if symbol in self.monitoring_tasks: | |
| del self.monitoring_tasks[symbol] | |
| continue | |
| # بدء المراقبة إذا لم تكن نشطة | |
| if symbol not in self.monitoring_tasks: | |
| task = asyncio.create_task(self._monitor_single_trade(trade)) | |
| self.monitoring_tasks[symbol] = { | |
| 'task': task, | |
| 'start_time': current_time, | |
| 'trade': trade | |
| } | |
| trade['monitoring_started'] = True | |
| # تنظيف المهام المنتهية | |
| current_symbols = {trade['symbol'] for trade in open_trades} | |
| for symbol in list(self.monitoring_tasks.keys()): | |
| if symbol not in current_symbols: | |
| task_info = self.monitoring_tasks[symbol] | |
| if not task_info['task'].done(): | |
| task_info['task'].cancel() | |
| del self.monitoring_tasks[symbol] | |
| if symbol in self.monitoring_errors: | |
| del self.monitoring_errors[symbol] | |
| await asyncio.sleep(10) | |
| except Exception as error: | |
| print(f"❌ خطأ في مراقبة الصفقات: {error}") | |
| await asyncio.sleep(30) | |
| async def _monitor_single_trade(self, trade): | |
| symbol = trade['symbol'] | |
| max_monitoring_time = 3600 # أقصى وقت مراقبة: ساعة واحدة | |
| print(f"🔍 بدء مراقبة الصفقة: {symbol}") | |
| while (symbol in self.monitoring_tasks and | |
| self.is_running and | |
| self.monitoring_errors.get(symbol, 0) < self.max_consecutive_errors): | |
| try: | |
| start_time = time.time() | |
| if not self.data_manager: | |
| print(f"⚠️ DataManager غير متوفر لـ {symbol}") | |
| await asyncio.sleep(15) | |
| continue | |
| # الحصول على السعر الحالي مع مهلة زمنية | |
| try: | |
| current_price = await asyncio.wait_for( | |
| self.data_manager.get_latest_price_async(symbol), | |
| timeout=10 | |
| ) | |
| except asyncio.TimeoutError: | |
| print(f"⏰ مهلة انتظار السعر لـ {symbol}") | |
| self._increment_monitoring_error(symbol) | |
| await asyncio.sleep(15) | |
| continue | |
| if not current_price: | |
| print(f"⚠️ لم يتم الحصول على سعر لـ {symbol}") | |
| self._increment_monitoring_error(symbol) | |
| await asyncio.sleep(15) | |
| continue | |
| entry_price = trade['entry_price'] | |
| stop_loss = trade.get('stop_loss') | |
| take_profit = trade.get('take_profit') | |
| should_close, close_reason = False, "" | |
| # التحقق من شروط الإغلاق | |
| if stop_loss and current_price <= stop_loss: | |
| should_close, close_reason = True, f"وصول وقف الخسارة: {current_price} <= {stop_loss}" | |
| elif take_profit and current_price >= take_profit: | |
| should_close, close_reason = True, f"وصول جني الأرباح: {current_price} >= {take_profit}" | |
| # تحديث وقف الخسارة الديناميكي | |
| if not should_close and current_price > entry_price: | |
| dynamic_stop = current_price * 0.98 | |
| if dynamic_stop > (stop_loss or 0): | |
| trade['stop_loss'] = dynamic_stop | |
| # إغلاق الصفقة إذا لزم الأمر | |
| if should_close: | |
| if self.r2_service.acquire_lock(): | |
| try: | |
| await self.immediate_close_trade(symbol, current_price, close_reason) | |
| except Exception as close_error: | |
| print(f"❌ فشل الإغلاق التلقائي لـ {symbol}: {close_error}") | |
| finally: | |
| self.r2_service.release_lock() | |
| break | |
| # إعادة تعيين عداد الأخطاء عند النجاح | |
| if symbol in self.monitoring_errors: | |
| self.monitoring_errors[symbol] = 0 | |
| # التحقق من وقت المراقبة الطويل | |
| monitoring_duration = time.time() - start_time | |
| if monitoring_duration > max_monitoring_time: | |
| print(f"🕒 انتهى وقت مراقبة الصفقة {symbol}") | |
| break | |
| await asyncio.sleep(15) | |
| except Exception as error: | |
| error_count = self._increment_monitoring_error(symbol) | |
| print(f"❌ خطأ في مراقبة {symbol} (الخطأ #{error_count}): {error}") | |
| if error_count >= self.max_consecutive_errors: | |
| print(f"🚨 إيقاف مراقبة {symbol} بسبب الأخطاء المتتالية") | |
| await self.r2_service.save_system_logs_async({ | |
| "monitoring_stopped": True, | |
| "symbol": symbol, | |
| "error_count": error_count, | |
| "error": str(error) | |
| }) | |
| break | |
| await asyncio.sleep(30) | |
| print(f"🛑 توقيف مراقبة الصفقة: {symbol}") | |
| def _increment_monitoring_error(self, symbol): | |
| if symbol not in self.monitoring_errors: | |
| self.monitoring_errors[symbol] = 0 | |
| self.monitoring_errors[symbol] += 1 | |
| return self.monitoring_errors[symbol] | |
| def stop_monitoring(self): | |
| self.is_running = False | |
| print("🛑 إيقاف جميع مهام المراقبة...") | |
| for symbol, task_info in self.monitoring_tasks.items(): | |
| if not task_info['task'].done(): | |
| task_info['task'].cancel() | |
| print(f"✅ تم إلغاء مهمة مراقبة {symbol}") | |
| self.monitoring_tasks.clear() | |
| self.monitoring_errors.clear() | |
| async def _archive_closed_trade(self, closed_trade): | |
| try: | |
| key = "closed_trades_history.json" | |
| try: | |
| response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) | |
| history = json.loads(response['Body'].read()) | |
| except Exception: | |
| history = [] | |
| history.append(closed_trade) | |
| # حفظ آخر 1000 صفقة فقط | |
| if len(history) > 1000: | |
| history = history[-1000:] | |
| data_json = json.dumps(history, indent=2).encode('utf-8') | |
| self.r2_service.s3_client.put_object( | |
| Bucket="trading", Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| except Exception as e: | |
| print(f"❌ فشل أرشفة الصفقة: {e}") | |
| async def _update_trade_summary(self, closed_trade): | |
| try: | |
| key = "trade_summary.json" | |
| try: | |
| response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) | |
| summary = json.loads(response['Body'].read()) | |
| except Exception: | |
| summary = { | |
| "total_trades": 0, "winning_trades": 0, "losing_trades": 0, | |
| "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0, | |
| "avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0, | |
| "largest_win": 0.0, "largest_loss": 0.0, | |
| "last_updated": datetime.now().isoformat() | |
| } | |
| pnl = closed_trade.get('pnl_usd', 0.0) | |
| summary['total_trades'] += 1 | |
| if pnl >= 0: | |
| summary['winning_trades'] += 1 | |
| summary['total_profit_usd'] += pnl | |
| if pnl > summary.get('largest_win', 0): | |
| summary['largest_win'] = pnl | |
| else: | |
| summary['losing_trades'] += 1 | |
| summary['total_loss_usd'] += abs(pnl) | |
| if abs(pnl) > summary.get('largest_loss', 0): | |
| summary['largest_loss'] = abs(pnl) | |
| if summary['total_trades'] > 0: | |
| summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100 | |
| if summary['winning_trades'] > 0: | |
| summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades'] | |
| if summary['losing_trades'] > 0: | |
| summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades'] | |
| summary['last_updated'] = datetime.now().isoformat() | |
| data_json = json.dumps(summary, indent=2).encode('utf-8') | |
| self.r2_service.s3_client.put_object( | |
| Bucket="trading", Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| except Exception as e: | |
| print(f"❌ فشل تحديث ملخص التداول: {e}") | |
| async def get_open_trades(self): | |
| try: | |
| return await self.r2_service.get_open_trades_async() | |
| except Exception as e: | |
| print(f"❌ فشل جلب الصفقات المفتوحة: {e}") | |
| return [] | |
| async def get_trade_by_symbol(self, symbol): | |
| try: | |
| open_trades = await self.get_open_trades() | |
| for trade in open_trades: | |
| if trade['symbol'] == symbol and trade['status'] == 'OPEN': | |
| return trade | |
| return None | |
| except Exception as e: | |
| print(f"❌ فشل البحث عن صفقة {symbol}: {e}") | |
| return None | |
| def get_monitoring_status(self): | |
| return { | |
| 'is_running': self.is_running, | |
| 'active_tasks': len(self.monitoring_tasks), | |
| 'monitoring_errors': dict(self.monitoring_errors), | |
| 'max_consecutive_errors': self.max_consecutive_errors | |
| } |