Trad / trade_manager.py
Riy777's picture
Update trade_manager.py
3f42b72
raw
history blame
16.9 kB
# trade_manager.py (V13.1 - Tactical Sniper with Full Reporting)
import asyncio
import time
import json
import traceback
from datetime import datetime, timezone
from typing import Dict, List, Deque, Optional
from collections import deque
import numpy as np
import ccxt.async_support as ccxtasync
# استيراد التبعيات الضرورية للتحليل اللحظي
try:
from ml_engine.titan_engine import TitanEngine
from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
TitanEngine = None
ChartPatternAnalyzer = None
print("⚠️ [TradeManager] Warning: ML engines not found. Sniper mode will be limited.")
class TacticalData:
"""حاوية بيانات تكتيكية سريعة للمراقبة اللحظية"""
def __init__(self, symbol):
self.symbol = symbol
self.order_book = None
self.trades = deque(maxlen=1000) # آخر 1000 صفقة لتتبع الزخم
# نحتاج فريم 5 دقائق بشكل أساسي للزناد، و 1 ساعة للسياق
self.ohlcv = {tf: deque(maxlen=300) for tf in ['5m', '1h']}
self.last_update = time.time()
def update_ohlcv(self, tf, candles):
if not candles: return
current = self.ohlcv.get(tf)
if current is None: return
# دمج الشموع الجديدة بكفاءة
if len(current) == 0:
current.extend(candles)
else:
last_ts = current[-1][0]
new_candles = [c for c in candles if c[0] > last_ts]
current.extend(new_candles)
# تحديث الشمعة الأخيرة إذا كانت لا تزال مفتوحة
if candles[-1][0] == last_ts:
current[-1] = candles[-1]
def is_ready_for_analysis(self):
# نحتاج على الأقل 100 شمعة 5 دقائق لتحليل فني معقول
return len(self.ohlcv['5m']) >= 100
class TradeManager:
def __init__(self, r2_service, data_manager, titan_engine=None, processor=None):
self.r2 = r2_service
self.dm = data_manager
# استخدام محركات خاصة للحارس لضمان عدم تضارب الذاكرة مع المستكشف
self.titan = TitanEngine() if TitanEngine else None
self.pattern_analyzer = ChartPatternAnalyzer() if ChartPatternAnalyzer else None
self.is_running = False
self.watchlist = {} # العملات قيد المراقبة للدخول
self.open_positions = {} # العملات التي تم الدخول فيها
self.tasks = {}
self.data_cache = {}
self.lock = asyncio.Lock()
self.exchange = None
# إعدادات القناص
self.SNIPER_ENTRY_THRESHOLD = 0.75 # عتبة دخول لحظية مرتفعة
self.MOMENTUM_CONFIRMATION = True # تطلب تأكيد الزخم من دفتر الطلبات
print(f"⚔️ [Sniper] Tactical Trade Manager Initialized. Entry Threshold: {self.SNIPER_ENTRY_THRESHOLD}")
async def initialize_sentry_exchanges(self):
"""تهيئة اتصال المنصة الخاص بالحارس"""
try:
if not self.exchange:
self.exchange = ccxtasync.kucoin({
'enableRateLimit': True,
'timeout': 15000, # تايم آوت أقصر للاستجابة السريعة
'options': {'defaultType': 'spot'}
})
await self.exchange.load_markets()
# تهيئة المحركات اللحظية
if self.titan and not self.titan.initialized: await self.titan.initialize()
if self.pattern_analyzer and not self.pattern_analyzer.initialized: await self.pattern_analyzer.initialize()
except Exception as e:
print(f"❌ [Sniper Init Error] {e}")
async def start_sentry_loops(self):
"""بدء حلقة المراقبة الرئيسية"""
self.is_running = True
print("🔭 [Sniper] Sentry loops started.")
while self.is_running:
try:
# 1. إدارة قائمة المراقبة (للدخول)
async with self.lock:
watch_targets = list(self.watchlist.keys())
for sym in watch_targets:
if sym not in self.tasks:
print(f"🎯 [Sniper] Locking on target: {sym}")
self.data_cache[sym] = TacticalData(sym)
self.tasks[sym] = asyncio.create_task(self._monitor_for_entry(sym))
# 2. إدارة الصفقات المفتوحة (للخروج)
# (سيتم إضافتها لاحقاً: self._monitor_open_positions())
await asyncio.sleep(5) # دورة تفقد رئيسية سريعة
except Exception as e:
print(f"⚠️ [Sentry Loop Error] {e}")
await asyncio.sleep(5)
async def stop_sentry_loops(self):
self.is_running = False
for t in self.tasks.values(): t.cancel()
if self.exchange: await self.exchange.close()
print("🛡️ [Sniper] Sentry stood down.")
async def update_sentry_watchlist(self, candidates):
"""تحديث قائمة الأهداف من المستكشف"""
async with self.lock:
# إضافة أهداف جديدة فقط
for cand in candidates:
sym = cand['symbol']
if sym not in self.watchlist and sym not in self.open_positions:
self.watchlist[sym] = cand
print(f"📋 [Sniper] New target acquired: {sym}")
# ==================================================================
# 🎯 منطق القناص (Entry Logic)
# ==================================================================
async def _monitor_for_entry(self, symbol):
"""مراقبة لحظية مكثفة لهدف محدد بغرض الدخول"""
print(f"👁️ [Sniper] Monitoring {symbol} for tactical entry...")
consecutive_signals = 0
while self.is_running and symbol in self.watchlist:
try:
# 1. تحديث البيانات التكتيكية (5m + 1h + OrderBook)
tfs = ['5m', '1h']
tasks = [self.exchange.fetch_ohlcv(symbol, tf, limit=150) for tf in tfs]
tasks.append(self.exchange.fetch_order_book(symbol, limit=50))
results = await asyncio.gather(*tasks, return_exceptions=True)
data = self.data_cache[symbol]
valid_update = True
for i, res in enumerate(results[:-1]):
if isinstance(res, list):
data.update_ohlcv(tfs[i], res)
else:
valid_update = False
if isinstance(results[-1], dict):
data.order_book = results[-1]
if not valid_update or not data.is_ready_for_analysis():
await asyncio.sleep(10) # انتظار تجميع البيانات
continue
# 2. التحليل اللحظي الجديد (Fresh Analysis)
# أ. تحليل Titan اللحظي (5m)
titan_score = 0.0
if self.titan:
# تجهيز البيانات لـ Titan (يحتاج قاموس بجميع الفريمات المتاحة)
titan_packet = {tf: list(data.ohlcv[tf]) for tf in tfs}
titan_res = await asyncio.to_thread(self.titan.predict, titan_packet)
titan_score = titan_res.get('score', 0.0)
# ب. تحليل الأنماط اللحظي (5m)
pattern_score = 0.0
bullish_pattern = False
if self.pattern_analyzer:
# تحليل آخر 100 شمعة 5 دقائق فقط
candles_5m = list(data.ohlcv['5m'])[-100:]
pat_res = await self.pattern_analyzer.detect_chart_patterns({'5m': candles_5m})
pattern_score = pat_res.get('pattern_confidence', 0.0)
if pat_res.get('pattern_detected') and 'BULLISH' in pat_res['pattern_detected'].upper():
bullish_pattern = True
# ج. تحليل زخم دفتر الطلبات (Order Book Imbalance)
ob_imbalance = self._calculate_ob_imbalance(data.order_book)
momentum_confirmed = ob_imbalance > 0.15 # طلبات الشراء أكثر بـ 15%
# 3. قرار الزناد (The Trigger)
# الشروط: Titan قوي جدًا OR (Titan جيد AND نمط إيجابي AND زخم دفتر طلبات)
sniper_fired = False
trigger_reason = ""
if titan_score >= self.SNIPER_ENTRY_THRESHOLD:
sniper_fired = True
trigger_reason = f"Titan Surge ({titan_score:.2f})"
elif titan_score >= 0.65 and bullish_pattern and momentum_confirmed:
sniper_fired = True
trigger_reason = f"Tactical Combo (Titan:{titan_score:.2f} + Pattern + OB:{ob_imbalance:.2f})"
if sniper_fired:
consecutive_signals += 1
print(f"🔥 [Sniper] {symbol} Signal detected! ({consecutive_signals}/2) - {trigger_reason}")
# نطلب تأكيد إشارتين متتاليتين لتقليل الإشارات الكاذبة
if consecutive_signals >= 2:
await self._execute_buy(symbol, trigger_reason, titan_score)
break # الخروج من حلقة المراقبة بعد الشراء
else:
consecutive_signals = max(0, consecutive_signals - 1) # تبريد العداد
await asyncio.sleep(30) # مراقبة كل 30 ثانية (سريع)
except asyncio.CancelledError:
break
except Exception as e:
print(f"⚠️ [Monitor Error] {symbol}: {e}")
await asyncio.sleep(60) # تهدئة عند الخطأ
def _calculate_ob_imbalance(self, ob):
"""حساب عدم توازن دفتر الطلبات (ميل نحو الشراء أو البيع)"""
if not ob or 'bids' not in ob or 'asks' not in ob: return 0.0
# حساب حجم أفضل 10 طلبات وعروض
bids_vol = sum(b[1] for b in ob['bids'][:10])
asks_vol = sum(a[1] for a in ob['asks'][:10])
if (bids_vol + asks_vol) == 0: return 0.0
return (bids_vol - asks_vol) / (bids_vol + asks_vol)
# ==================================================================
# 🚀 التنفيذ وإدارة الصفقات (Execution & Mgmt)
# ==================================================================
async def _execute_buy(self, symbol, reason, score):
"""تنفيذ الشراء الافتراضي وتسجيل الصفقة"""
try:
# 1. جلب السعر الحالي للتنفيذ
ticker = await self.exchange.fetch_ticker(symbol)
entry_price = ticker['last']
# 2. حساب حجم الصفقة (إدارة مخاطر بسيطة: 10% من رأس المال الوهمي)
portfolio = await self.r2.get_portfolio_state_async()
capital = portfolio.get('current_capital_usd', 1000.0)
trade_amount_usd = capital * 0.10
quantity = trade_amount_usd / entry_price
# 3. إنشاء سجل الصفقة
trade = {
"symbol": symbol,
"status": "OPEN",
"entry_price": entry_price,
"quantity": quantity,
"invested_usd": trade_amount_usd,
"entry_time": datetime.now(timezone.utc).isoformat(),
"entry_reason": reason,
"entry_score": score,
# أهداف أولية (يمكن للحارس تعديلها لاحقاً)
"tp_price": entry_price * 1.025, # هدف 2.5%
"sl_price": entry_price * 0.98, # وقف 2%
"highest_price": entry_price # لتتبع الوقف المتحرك
}
# 4. نقل من قائمة المراقبة إلى الصفقات المفتوحة
async with self.lock:
if symbol in self.watchlist:
del self.watchlist[symbol]
self.open_positions[symbol] = trade
# 5. حفظ في R2 (اختياري: يمكن حفظ الصفقات المفتوحة أيضاً)
# await self.r2.save_open_trades_async(list(self.open_positions.values()))
print(f"🚀 [EXECUTED] BUY {symbol} @ {entry_price} | Reason: {reason}")
# بدء مراقبة هذه الصفقة المفتوحة فوراً
asyncio.create_task(self._monitor_open_position(symbol))
except Exception as e:
print(f"❌ [Execution Failed] {symbol}: {e}")
async def _monitor_open_position(self, symbol):
"""حارس الأرباح: مراقبة صفقة مفتوحة حتى الإغلاق"""
print(f"🛡️ [Guardian] Protecting position: {symbol}")
while self.is_running and symbol in self.open_positions:
try:
trade = self.open_positions[symbol]
ticker = await self.exchange.fetch_ticker(symbol)
current_price = ticker['last']
# تحديث أعلى سعر وصل له لتفعيل الوقف المتحرك
if current_price > trade['highest_price']:
trade['highest_price'] = current_price
# تفعيل وقف متحرك إذا تجاوز الربح 1%
if current_price >= trade['entry_price'] * 1.01:
new_sl = current_price * 0.995 # حجز 0.5% تحت أعلى قمة
if new_sl > trade['sl_price']:
trade['sl_price'] = new_sl
print(f"🛡️ [Guardian] {symbol} Trailing SL moved to {new_sl:.4f}")
# التحقق من شروط الخروج
close_reason = None
if current_price >= trade['tp_price']:
close_reason = "TAKE_PROFIT"
elif current_price <= trade['sl_price']:
close_reason = "STOP_LOSS"
if close_reason:
await self._execute_sell(symbol, current_price, close_reason)
break
await asyncio.sleep(5) # مراقبة لصيقة جداً للصفقات المفتوحة
except Exception as e:
print(f"⚠️ [Guardian Error] {symbol}: {e}")
await asyncio.sleep(10)
async def _execute_sell(self, symbol, exit_price, reason):
"""تنفيذ البيع وتسجيل التقارير النهائية"""
async with self.lock:
trade = self.open_positions.pop(symbol, None)
if trade:
# 1. حساب النتائج
gross_pnl = (exit_price - trade['entry_price']) * trade['quantity']
# خصم رسوم وهمية (0.1% دخول + 0.1% خروج)
fees = (trade['invested_usd'] + (exit_price * trade['quantity'])) * 0.001
net_pnl = gross_pnl - fees
pnl_percent = (net_pnl / trade['invested_usd']) * 100
# 2. تحديث بيانات الصفقة
trade.update({
"status": "CLOSED",
"exit_price": exit_price,
"exit_time": datetime.now(timezone.utc).isoformat(),
"exit_reason": reason,
"pnl_usd": net_pnl,
"pnl_percent": pnl_percent,
"fees_paid": fees
})
print(f"💰 [CLOSED] {symbol} ({reason}) | PnL: ${net_pnl:.2f} ({pnl_percent:.2f}%)")
# 3. حفظ التقارير عبر R2 (النظام الجديد)
# أ. حفظ الصفقة المغلقة وتحديث ملخص التداول
await self.r2.save_closed_trade_async(trade)
# ب. تحديث رصيد المحفظة
await self.r2.update_portfolio_balance_async(net_pnl)
if symbol in self.tasks:
del self.tasks[symbol]