Trad / app.py
Riy777's picture
Update app.py
2e2955f
raw
history blame
16.9 kB
# app.py (V14.0 - Unified Smart Cycle - Full Production Version)
import os, sys, traceback, asyncio, gc, json
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks
# --- استيراد الوحدات الأساسية ---
try:
from r2 import R2Service
from data_manager import DataManager
from ml_engine.processor import MLProcessor
from trade_manager import TradeManager
from LLM import LLMService
from learning_hub.hub_manager import LearningHubManager
# وحدات الطبقة الثانية
from whale_monitor.core import EnhancedWhaleMonitor
from sentiment_news import NewsFetcher
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
except ImportError as e:
# إيقاف النظام في حال فشل استيراد أي مكون حيوي
sys.exit(f"❌ Fatal Init Error: {e}")
# --- المتغيرات العامة (Global Instances) ---
r2 = None
data_manager = None
ml_processor = None
trade_manager = None
llm_service = None
learning_hub = None
whale_monitor = None
news_fetcher = None
vader = None
class SystemState:
def __init__(self):
self.ready = False
self.cycle_running = False
sys_state = SystemState()
# ==============================================================================
# 🚀 تهيئة النظام الكاملة (Full System Initialization)
# ==============================================================================
async def initialize_system():
global r2, data_manager, ml_processor, trade_manager, llm_service, learning_hub
global whale_monitor, news_fetcher, vader
if sys_state.ready: return
print("\n🔌 [System V14] Initializing Smart Core...")
try:
# 1. الخدمات الأساسية والبنية التحتية
r2 = R2Service()
contracts = await r2.load_contracts_db_async()
whale_monitor = EnhancedWhaleMonitor(contracts_db=contracts, r2_service=r2)
data_manager = DataManager(contracts, whale_monitor, r2)
await data_manager.initialize()
news_fetcher = NewsFetcher()
vader = SentimentIntensityAnalyzer()
# 2. العقل المركزي (LLM) ومحور التعلم
llm_service = LLMService()
llm_service.r2_service = r2 # لربط حفظ البرومبتات
# سيتم ربط learning_hub لاحقاً لتجنب التبعية الدائرية
learning_hub = LearningHubManager(r2, llm_service, data_manager)
await learning_hub.initialize()
llm_service.learning_hub = learning_hub # الربط العكسي
# 3. محركات التحليل والتنفيذ
ml_processor = MLProcessor(None, data_manager, learning_hub)
await ml_processor.initialize()
trade_manager = TradeManager(r2, data_manager, ml_processor.titan, ml_processor.pattern_analyzer)
await trade_manager.initialize_sentry_exchanges()
# بدء حلقات المراقبة الخلفية للحارس
asyncio.create_task(trade_manager.start_sentry_loops())
sys_state.ready = True
print("✅ [System] All Systems GO. Ready for action.")
except Exception as e:
print(f"❌ [Init Failed] {e}")
traceback.print_exc()
sys.exit(1) # خروج اضطراري في حال فشل التهيئة
# ==============================================================================
# 🧠 الدورة الموحدة الذكية (Unified Smart Cycle Entry Point)
# ==============================================================================
async def run_unified_cycle():
"""
نقطة الدخول الوحيدة للدورات.
تقرر تلقائياً بين وضع 'البحث عن فرص' ووضع 'إدارة الصفقات المفتوحة'.
"""
if not sys_state.ready:
print("⏳ [Cycle Skipped] System not ready yet.")
return
if sys_state.cycle_running:
print("⏳ [Cycle Skipped] Another cycle is already running.")
return
# محاولة الحصول على القفل لمنع تداخل العمليات
if not r2.acquire_lock():
print("🔒 [Cycle Skipped] Locked by another process.")
return
sys_state.cycle_running = True
start_time = datetime.now()
try:
# فحص حالة الحارس: هل هناك صفقات نشطة؟
open_trades = list(trade_manager.open_positions.values())
if open_trades:
# --- 🛡️ الفرع الاستراتيجي: وضع إعادة التقييم ---
print(f"\n⚔️ [Unified Cycle] Active trades detected ({len(open_trades)}). Engaging STRATEGIC RE-ANALYSIS mode.")
await _run_reanalysis_mode(open_trades)
else:
# --- 🔭 فرع المستكشف: وضع البحث عن فرص ---
print("\n🔭 [Unified Cycle] No active trades. Engaging EXPLORER mode.")
await _run_explorer_mode()
except Exception as e:
print(f"❌ [Cycle Error] An unexpected error occurred: {e}")
traceback.print_exc()
finally:
# تنظيف وإطلاق القفل دائماً
r2.release_lock()
sys_state.cycle_running = False
gc.collect() # تنظيف الذاكرة
print(f"🏁 [Cycle Finished] Duration: {(datetime.now() - start_time).total_seconds():.1f}s")
# ==============================================================================
# 🛡️ الفرع 1: وضع إعادة التقييم الاستراتيجي (Strategic Re-analysis Mode)
# ==============================================================================
async def _run_reanalysis_mode(open_trades):
"""إعادة تقييم جميع الصفقات المفتوحة باستخدام العقل الكلي"""
for trade in open_trades:
symbol = trade['symbol']
print(f" ⚖️ Re-evaluating {symbol} with Omniscient Brain...")
try:
# 1. جمع البيانات الطازجة (Fresh Data Snapshot)
current_price = await data_manager.get_latest_price_async(symbol)
whale_data = await whale_monitor.get_symbol_whale_activity(symbol)
news_text = await news_fetcher.get_news_for_symbol(symbol)
# محاولة الحصول على تحديث سريع لدرجة تيتان (اختياري، يعتمد على توفر البيانات)
# هنا نستخدم 0 كقيمة افتراضية إذا لم نتمكن من إجراء تحليل كامل سريعاً
titan_score = 0.0
current_data_packet = {
'symbol': symbol,
'current_price': current_price,
'titan_score': titan_score,
'whale_data': whale_data,
'news_text': news_text
}
# 2. استشارة العقل الكلي (Omniscient Brain Consultation)
decision = await llm_service.re_analyze_trade_async(trade, current_data_packet)
# 3. تنفيذ أوامر العقل فوراً
if decision:
action = decision.get('action')
reason = decision.get('reasoning', 'Strategic update by Brain')
if action == 'EMERGENCY_EXIT':
print(f" 🚨 BRAIN COMMAND: IMMEDIATE EMERGENCY EXIT for {symbol}!")
await trade_manager.execute_emergency_exit(symbol, f"Brain Command: {reason}")
elif action == 'UPDATE_TARGETS':
new_tp = decision.get('new_tp')
new_sl = decision.get('new_sl')
if new_tp or new_sl:
print(f" 🎯 BRAIN COMMAND: Update targets for {symbol} (TP: {new_tp}, SL: {new_sl})")
await trade_manager.update_trade_targets(symbol, new_tp, new_sl, f"Brain Update: {reason}")
else:
print(f" ⚠️ Brain requested target update for {symbol} but provided no values.")
else:
# HOLD or unknown action
print(f" ✅ Brain verdict for {symbol}: HOLD. Continuing strategy. ({reason[:50]}...)")
else:
print(f" ⚠️ Failed to get a valid re-analysis decision from Brain for {symbol}.")
except Exception as e:
print(f" ❌ Error re-evaluating {symbol}: {e}")
# ==============================================================================
# 🔭 الفرع 2: وضع المستكشف (The 4-Layer Explorer Mode)
# ==============================================================================
async def _run_explorer_mode():
"""تنفيذ دورة البحث الكاملة عبر الطبقات الأربع"""
try:
# ---------------------------------------------------------
# Layer 1: Rapid Hybrid Screening (Titan + Patterns + Simple MC)
# ---------------------------------------------------------
print("\n--- 🛡️ Layer 1: Rapid Screening ---")
raw_candidates = await data_manager.layer1_rapid_screening()
if not raw_candidates:
print(" ⚠️ No initial candidates found in Layer 1.")
return
l1_passed = []
data_queue = asyncio.Queue()
# بدء بث البيانات ومعالجتها بالتوازي
producer = asyncio.create_task(data_manager.stream_ohlcv_data(raw_candidates, data_queue))
while True:
batch = await data_queue.get()
if batch is None:
data_queue.task_done()
break
for raw_data in batch:
# المعالجة الأولية باستخدام MLProcessor
res = await ml_processor.process_and_score_symbol_enhanced(raw_data)
# عتبة مرور أولية مخففة (مثلاً 0.50) للسماح بمرور عدد كافٍ للطبقة الثانية
if res and res.get('enhanced_final_score', 0.0) >= 0.50:
l1_passed.append(res)
data_queue.task_done()
await producer
# ترتيب واختيار أفضل 10 فقط للتحليل العميق
l1_passed.sort(key=lambda x: x['enhanced_final_score'], reverse=True)
layer2_candidates = l1_passed[:10]
print(f"✅ Layer 1 Complete. {len(layer2_candidates)} candidates advanced to Layer 2.")
if not layer2_candidates: return
# ---------------------------------------------------------
# Layer 2: Deep Analysis (Whales + News + Advanced MC)
# ---------------------------------------------------------
print("\n--- 🐳 Layer 2: Deep Analysis ---")
l2_scored = []
for cand in layer2_candidates:
symbol = cand['symbol']
print(f" 🔎 Deep analyzing {symbol}...")
# A. تحليل الحيتان
whale_data = await whale_monitor.get_symbol_whale_activity(symbol)
# B. تحليل الأخبار والمشاعر
news_text = await news_fetcher.get_news_for_symbol(symbol)
# تحليل VADER بسيط للمساعدة في الترتيب الأولي (النموذج الضخم سيقرأ النص الخام لاحقاً)
news_score = vader.polarity_scores(news_text)['compound'] if news_text else 0.0
# C. حساب "النقاط المعززة" (Enhanced Score) لترتيب المرشحين
# المعادلة: 50% درجة الطبقة الأولى + 30% بونص حيتان + 20% بونص أخبار
l1_score = cand['enhanced_final_score']
whale_bonus = 0.15 if whale_data.get('trading_signal', {}).get('action') in ['BUY', 'STRONG_BUY'] else 0.0
news_bonus = 0.10 if news_score > 0.25 else (-0.10 if news_score < -0.25 else 0.0)
final_l2_score = min(1.0, max(0.0, l1_score + whale_bonus + news_bonus))
# تحديث سجل المرشح بكل البيانات الجديدة (ليراها النموذج الضخم)
cand.update({
'layer2_score': final_l2_score,
'whale_data': whale_data,
'news_text': news_text,
'news_score_raw': news_score
})
l2_scored.append(cand)
# اختيار أفضل 5 للطبقة الثالثة
l2_scored.sort(key=lambda x: x['layer2_score'], reverse=True)
layer3_candidates = l2_scored[:5]
print(f"✅ Layer 2 Complete. Top 5 candidates selected for Brain validation.")
# ---------------------------------------------------------
# Layer 3: The Brain Filter (LLM Validation)
# ---------------------------------------------------------
print("\n--- 🧠 Layer 3: Omniscient Brain Validation ---")
approved_targets = []
for cand in layer3_candidates:
symbol = cand['symbol']
print(f" ⚖️ Consulting Brain for {symbol}...")
# استشارة العقل الكلي بوجبة البيانات الكاملة
decision = await llm_service.get_trading_decision(cand)
if decision and decision.get('action') == 'WATCH':
confidence = decision.get('confidence_level', 0)
print(f" 🎉 APPROVED by Brain! Confidence: {confidence:.2f}")
cand['llm_decision'] = decision
approved_targets.append(cand)
else:
reason = decision.get('reasoning', 'Unknown reason') if decision else 'Brain Consultation Failed'
print(f" 🛑 REJECTED by Brain: {reason[:60]}...")
# ---------------------------------------------------------
# Layer 4: Active Sentry Handover
# ---------------------------------------------------------
print("\n--- 🛡️ Layer 4: Sentry Handover ---")
if approved_targets:
print(f"🚀 Handing over {len(approved_targets)} elite targets to Sentry for tactical execution.")
await trade_manager.update_sentry_watchlist(approved_targets)
else:
print("😴 No candidates passed all 4 layers this cycle. Sentry remains on standby.")
except Exception as e:
print(f"❌ [Explorer Mode Error] {e}")
traceback.print_exc()
# ==============================================================================
# 🔥 تطبيق FastAPI (نقاط النهاية)
# ==============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
# تهيئة النظام عند البدء
await initialize_system()
yield
# تنظيف الموارد عند الإغلاق
if trade_manager: await trade_manager.stop_sentry_loops()
if data_manager: await data_manager.close()
print("👋 System Shutdown Gracefully.")
app = FastAPI(lifespan=lifespan, title="Titan 4-Layer Hybrid System V14")
@app.get("/")
def root():
return {
"status": "Smart Hybrid System Online",
"initialized": sys_state.ready,
"mode": "Re-analysis" if trade_manager and trade_manager.open_positions else "Explorer"
}
@app.get("/run-cycle")
async def trigger_cycle(background_tasks: BackgroundTasks):
"""نقطة الاستدعاء الخارجية (مثلاً كل 15 دقيقة)"""
if not sys_state.ready:
raise HTTPException(status_code=503, detail="System is still initializing...")
# إضافة الدورة الموحدة إلى مهام الخلفية
background_tasks.add_task(run_unified_cycle)
return {"message": "Unified smart cycle triggered in background."}
@app.get("/status")
async def get_status():
"""جلب حالة النظام التفصيلية"""
return {
"initialized": sys_state.ready,
"cycle_running": sys_state.cycle_running,
"open_positions_count": len(trade_manager.open_positions) if trade_manager else 0,
"open_positions": list(trade_manager.open_positions.keys()) if trade_manager else [],
"watchlist_count": len(trade_manager.watchlist) if trade_manager else 0,
"watchlist": list(trade_manager.watchlist.keys()) if trade_manager else []
}
if __name__ == "__main__":
import uvicorn
# تشغيل السيرفر
uvicorn.run(app, host="0.0.0.0", port=7860)