Trad / app.py
Riy777's picture
Update app.py
8d3a60a
raw
history blame
13.6 kB
# app.py (V11.1 - Full Integration with Memory Management for HF Spaces)
import os
import traceback
import signal
import sys
import uvicorn
import asyncio
import json
import gc
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks
from datetime import datetime
from typing import List, Dict, Any
# --- استيراد الخدمات الأساسية للنظام ---
try:
from r2 import R2Service
from LLM import LLMService
from ml_engine.data_manager import DataManager
from ml_engine.processor import MLProcessor
from learning_hub.hub_manager import LearningHubManager
from trade_manager import TradeManager
except ImportError as e:
print(f"❌ [App] خطأ فادح في استيراد الوحدات الأساسية: {e}")
sys.exit(1)
# --- المتغيرات العالمية (Global Instances) ---
r2_service_global = None
data_manager_global = None
llm_service_global = None
learning_hub_global = None
trade_manager_global = None
ml_processor_global = None
MARKET_STATE_OK = True
# --- مدير حالة النظام ---
class StateManager:
def __init__(self):
self.initialization_complete = False
self.initialization_error = None
self.services_initialized = {
'r2': False, 'data': False, 'llm': False,
'hub': False, 'processor': False, 'trade': False
}
def set_service_initialized(self, service_name):
self.services_initialized[service_name] = True
if all(self.services_initialized.values()):
self.initialization_complete = True
print("🎯 [System] اكتملت تهيئة جميع الخدمات بنجاح.")
async def wait_for_initialization(self, timeout=120):
"""انتظار آمن لاكتمال التهيئة مع مهلة زمنية"""
start_time = time.time()
while not self.initialization_complete:
if time.time() - start_time > timeout:
print("⚠️ [System] تجاوزت مهلة انتظار التهيئة.")
return False
if self.initialization_error:
print(f"❌ [System] توقف الانتظار بسبب خطأ في التهيئة: {self.initialization_error}")
return False
await asyncio.sleep(2)
return True
state_manager = StateManager()
# --- دالة التهيئة المركزية ---
async def initialize_services():
global r2_service_global, data_manager_global, llm_service_global
global learning_hub_global, trade_manager_global, ml_processor_global
try:
print("🚀 [System V11.1] بدء تهيئة الخدمات (وضع توفير الذاكرة)...")
# 1. الطبقة الأساسية (R2 & Data)
r2_service_global = R2Service()
state_manager.set_service_initialized('r2')
# تحميل قاعدة العقود (إن وجدت)
contracts_db = await r2_service_global.load_contracts_db_async() or {}
data_manager_global = DataManager(contracts_db, whale_monitor=None, r2_service=r2_service_global)
await data_manager_global.initialize()
state_manager.set_service_initialized('data')
# 2. طبقة الذكاء (LLM & Learning Hub)
llm_service_global = LLMService()
llm_service_global.r2_service = r2_service_global
learning_hub_global = LearningHubManager(r2_service_global, llm_service_global, data_manager_global)
await learning_hub_global.initialize()
llm_service_global.learning_hub = learning_hub_global # ربط عكسي
state_manager.set_service_initialized('llm')
state_manager.set_service_initialized('hub')
# 3. طبقة التنفيذ والمعالجة
ml_processor_global = MLProcessor(
market_context=None, # سيتم تحديثه في كل دورة
data_manager=data_manager_global,
learning_hub=learning_hub_global
)
state_manager.set_service_initialized('processor')
trade_manager_global = TradeManager(
r2_service_global, learning_hub_global, data_manager_global,
state_manager, callback_on_close=run_bot_cycle_wrapper
)
await trade_manager_global.initialize_sentry_exchanges()
state_manager.set_service_initialized('trade')
return True
except Exception as e:
error_msg = f"فشل تهيئة الخدمات: {e}"
print(f"❌ [System] {error_msg}")
traceback.print_exc()
state_manager.initialization_error = error_msg
return False
# --- دورة المستكشف الرئيسية (مع إدارة الذاكرة) ---
async def run_explorer_cycle():
"""
دورة المستكشف الكاملة (Layer 1).
تقوم بتحميل النماذج عند البدء، وتفريغها عند الانتهاء لتوفير الذاكرة.
"""
if not await state_manager.wait_for_initialization():
print("⏳ [Explorer] الخدمات غير جاهزة بعد. تم تخطي الدورة.")
return
print("\n🔭 [Explorer V11.1] بدء دورة استكشاف جديدة...")
# 🔴 1. [إدارة الذاكرة] إعادة تحميل النماذج قبل البدء
print("🔄 [Memory] التأكد من تحميل نماذج ML للدورة الحالية...")
try:
if data_manager_global.pattern_analyzer and not data_manager_global.pattern_analyzer.initialized:
await data_manager_global.pattern_analyzer.initialize()
if data_manager_global.layer1_ranker and not data_manager_global.layer1_ranker.model:
await data_manager_global.layer1_ranker.initialize()
except Exception as e:
print(f"⚠️ [Memory] تحذير أثناء إعادة تحميل النماذج: {e}")
try:
# أ. تحديث سياق السوق
market_ctx = await data_manager_global.get_market_context_async()
ml_processor_global.market_context = market_ctx
if market_ctx.get('market_trend') == 'bear_market' and market_ctx.get('fear_and_greed_index', 50) < 20:
print("🐻 [Explorer] السوق في حالة خوف شديد. قد يتم تقليل النشاط.")
# ب. الغربلة الأولية السريعة (Layer 1.1)
candidates_l1 = await data_manager_global.layer1_rapid_screening()
if not candidates_l1:
print("😴 [Explorer] لم يتم العثور على مرشحين في الغربلة الأولية.")
return
# ج. التحليل العميق المتوازي (Layer 1.2 & 1.3)
print(f"🔬 [Explorer] تحليل عميق لـ {len(candidates_l1)} عملة...")
analyzed_candidates = []
data_queue = asyncio.Queue(maxsize=5) # طابور صغير لتوفير الذاكرة
# تشغيل المنتج (جلب البيانات) والمستهلك (التحليل) بالتوازي
producer_task = asyncio.create_task(data_manager_global.stream_ohlcv_data(candidates_l1, data_queue))
while True:
batch = await data_queue.get()
if batch is None:
data_queue.task_done()
break
# تحليل الدفعة
tasks = [ml_processor_global.process_and_score_symbol_enhanced(c) for c in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for res in results:
if res and isinstance(res, dict):
# شرط التأهل للقرار النهائي: درجة >= 60%
if res.get('enhanced_final_score', 0) >= 0.60:
analyzed_candidates.append(res)
data_queue.task_done()
await producer_task
# د. الترشيح النهائي والاختيار (Layer 1.4 LLM)
if analyzed_candidates:
analyzed_candidates.sort(key=lambda x: x['enhanced_final_score'], reverse=True)
top_candidates = analyzed_candidates[:5] # نرسل أفضل 5 فقط
print(f"🧠 [Explorer] إرسال أفضل {len(top_candidates)} عملة إلى LLM...")
watchlist = []
for cand in top_candidates:
# استدعاء LLM لاتخاذ قرار استراتيجي
decision = await llm_service_global.get_trading_decision(cand)
if decision and decision.get('action') == 'WATCH':
watchlist.append({
'symbol': cand['symbol'],
'strategy_hint': decision.get('strategy_to_watch', 'GENERIC'),
'llm_decision_context': {'decision': decision},
'explorer_score': cand.get('enhanced_final_score', 0)
})
print(f" ✅ [LLM] وافق على {cand['symbol']} (Strategy: {decision.get('strategy_to_watch')})")
else:
print(f" ❌ [LLM] رفض {cand['symbol']}")
# تحديث قائمة الحارس
await trade_manager_global.update_sentry_watchlist(watchlist)
else:
print("📉 [Explorer] لا يوجد مرشحين مؤهلين بعد التحليل العميق.")
await trade_manager_global.update_sentry_watchlist([])
except Exception as e:
print(f"❌ [Explorer Error] حدث خطأ غير متوقع أثناء الدورة: {e}")
traceback.print_exc()
finally:
# 🔴 2. [إدارة الذاكرة] تنظيف الذاكرة بعد انتهاء الدورة
print("🧹 [System] بدء تنظيف الذاكرة وإسبات النماذج...")
if data_manager_global.pattern_analyzer:
data_manager_global.pattern_analyzer.clear_memory()
if data_manager_global.layer1_ranker:
data_manager_global.layer1_ranker.clear_memory()
gc.collect() # إجبار النظام على تحرير الذاكرة غير المستخدمة
print("✅ [System] اكتمل التنظيف. النظام في وضع الخمول (Idle).")
# غلاف للدورة ليتوافق مع callback مدير الصفقات
async def run_bot_cycle_wrapper():
await run_explorer_cycle()
# --- مهام الخلفية المجدولة ---
async def scheduled_tasks_loop():
"""حلقة المهام المجدولة (مثل الدورة الرئيسية كل فترة)"""
await state_manager.wait_for_initialization()
while True:
try:
# تشغيل دورة المستكشف كل 15 دقيقة (مثلاً)
await asyncio.sleep(900)
if not trade_manager_global.sentry_watchlist: # إذا كانت القائمة فارغة، نبحث عن فرص جديدة
await run_explorer_cycle()
except asyncio.CancelledError: break
except Exception as e:
print(f"⚠️ [Scheduler] خطأ في المهام المجدولة: {e}")
await asyncio.sleep(60)
# --- إعداد تطبيق FastAPI ---
@asynccontextmanager
async def lifespan(app: FastAPI):
# عند البدء
print("🏁 [Lifespan] بدء تشغيل النظام...")
init_task = asyncio.create_task(initialize_services())
yield # هنا يعمل التطبيق
# عند الإيقاف
print("🛑 [Lifespan] إيقاف النظام...")
if trade_manager_global: await trade_manager_global.stop_sentry_loops()
if data_manager_global: await data_manager_global.close()
print("👋 [Lifespan] تم الإيقاف بنجاح.")
app = FastAPI(lifespan=lifespan, title="AI Trading Bot V11.1", version="11.1.0")
# --- نقاط النهاية (API Endpoints) ---
@app.get("/")
async def root():
return {
"status": "running" if state_manager.initialization_complete else "initializing",
"system": "Explorer-Sentry-Executor V11.1 (Memory Optimized)",
"timestamp": datetime.now().isoformat()
}
@app.get("/run-cycle")
async def trigger_cycle(background_tasks: BackgroundTasks):
"""تشغيل دورة مستكشف يدوياً في الخلفية"""
if not state_manager.initialization_complete:
raise HTTPException(status_code=503, detail="System initializing...")
background_tasks.add_task(run_explorer_cycle)
return {"message": "Explorer cycle triggered in background"}
@app.get("/status")
async def get_status():
"""جلب حالة النظام والحارس"""
sentry_status = {}
if trade_manager_global:
sentry_status = trade_manager_global.get_sentry_status()
return {
"initialization": state_manager.initialization_complete,
"services": state_manager.services_initialized,
"sentry": sentry_status,
"memory_mode": "On-Demand Loading Active"
}
@app.on_event("startup")
async def startup_event():
"""تشغيل حلقات المراقبة الخلفية بعد بدء التطبيق"""
asyncio.create_task(scheduled_tasks_loop())
# تشغيل حلقة الحارس إذا كان مهيأ
if trade_manager_global:
asyncio.create_task(trade_manager_global.start_sentry_and_monitoring_loops())
if __name__ == "__main__":
# تشغيل الخادم
uvicorn.run(app, host="0.0.0.0", port=7860)