Trad / app.py
Riy777's picture
Update app.py
c88a877
raw
history blame
7.19 kB
# app.py (V12.3 - Titan Orchestrator + Detailed Diagnostics)
import os
import traceback
import signal
import sys
import uvicorn
import asyncio
import json
import gc
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks
from datetime import datetime
from typing import List, Dict, Any
# --- Service imports ---
# Every project service below is required. If any of them cannot be imported,
# the process exits right away with a short, readable message.
try:
    from r2 import R2Service
    from LLM import LLMService
    from data_manager import DataManager
    from ml_engine.processor import MLProcessor
    from learning_hub.hub_manager import LearningHubManager
    from trade_manager import TradeManager
except ImportError as import_error:
    fatal_message = f"❌ Fatal Error: {import_error}"
    sys.exit(fatal_message)
# --- Global variables ---
# Module-level handles to the running services. They all start as None and
# are assigned by initialize_services().
r2_service_global = data_manager_global = llm_service_global = None
learning_hub_global = trade_manager_global = ml_processor_global = None
# --- State manager ---
class StateManager:
    """Track which services have finished starting up.

    ``initialization_complete`` becomes ``True`` once at least
    ``required_services`` distinct service names have been registered.
    """

    def __init__(self, required_services: int = 5) -> None:
        """Create an empty tracker.

        Args:
            required_services: How many distinct service names must be
                registered before the system counts as initialized.
        """
        self.required_services = required_services
        self.initialization_complete = False
        # Maps service name -> True for every service registered so far.
        self.services_initialized = {}

    def set_service_initialized(self, service_name: str) -> None:
        """Record ``service_name`` as started and update the readiness flag.

        The readiness banner is printed only on the call that flips
        ``initialization_complete`` to ``True``.
        """
        self.services_initialized[service_name] = True
        if self.initialization_complete:
            # Readiness was already announced; do not repeat the banner.
            return
        if len(self.services_initialized) >= self.required_services:
            self.initialization_complete = True
            print("🎯 [System] All services initialized. Ready for external triggers.")


# Shared tracker instance used by the rest of this module.
state_manager = StateManager()
# --- Central initialization ---
async def initialize_services():
    """Create and start every service, one stage after another.

    Each stage assigns its module-level ``*_global`` handle and then
    registers itself with ``state_manager``.

    Returns:
        ``True`` when every stage finished; ``False`` if any stage raised
        (the error and its traceback are printed).
    """
    global r2_service_global, data_manager_global, llm_service_global
    global learning_hub_global, trade_manager_global, ml_processor_global

    try:
        print("🚀 [System V12.3] Starting Titan-Powered Initialization...")

        # Stage 1: R2 service.
        r2_service_global = R2Service()
        state_manager.set_service_initialized('r2')

        # Stage 2: data manager, seeded with the loaded contracts database
        # (an empty dict when the load returns nothing).
        loaded_contracts = await r2_service_global.load_contracts_db_async()
        contracts_db = loaded_contracts or {}
        data_manager_global = DataManager(contracts_db, None, r2_service_global)
        await data_manager_global.initialize()
        state_manager.set_service_initialized('data')

        # Stage 3: LLM service and the learning hub built on top of it.
        llm_service_global = LLMService()
        llm_service_global.r2_service = r2_service_global
        learning_hub_global = LearningHubManager(
            r2_service_global, llm_service_global, data_manager_global
        )
        await learning_hub_global.initialize()
        state_manager.set_service_initialized('hub')

        # Stage 4: ML processor.
        ml_processor_global = MLProcessor(None, data_manager_global, learning_hub_global)
        await ml_processor_global.initialize()
        state_manager.set_service_initialized('processor')

        # Stage 5: trade manager, which receives the processor's `titan` attribute.
        trade_manager_global = TradeManager(
            r2_service_global, data_manager_global, titan_engine=ml_processor_global.titan
        )
        await trade_manager_global.initialize_sentry_exchanges()
        state_manager.set_service_initialized('trade')
    except Exception as exc:
        print(f"❌ [Init Error] {exc}")
        traceback.print_exc()
        return False
    else:
        return True
# --- Explorer cycle (Titan Explorer Cycle) ---
def _print_rejected_diagnostics(scored_results, threshold):
    """Print a score breakdown for the ten highest-scoring results."""
    print(f"\n🔍 [Debug] أفضل 10 مرفوضين (العتبة: {threshold}):")
    ranked = sorted(
        scored_results,
        key=lambda item: item.get('enhanced_final_score', 0.0),
        reverse=True,
    )
    for i, cand in enumerate(ranked[:10]):
        final = cand.get('enhanced_final_score', 0.0)
        comps = cand.get('components', {})
        t_s = comps.get('titan_score', 0.0)
        p_s = comps.get('patterns_score', 0.0)
        m_s = comps.get('mc_score', 0.0)
        # Detailed line: final score [Titan | patterns | Monte Carlo]
        print(f" #{i+1} {cand['symbol']:<10}: {final:.4f} [🐺Titan:{t_s:.2f} | 📊Pat:{p_s:.2f} | 🎲MC:{m_s:.2f}]")
    print("-" * 60)


async def run_explorer_cycle():
    """Run one screening-and-scoring cycle and update the sentry watchlist.

    Steps:
        1. Ask the data manager for Layer 1 candidates.
        2. Stream batches for those candidates through a bounded queue and
           score every item with the ML processor.
        3. Send up to five of the best results that reached
           ``TITAN_ENTRY_THRESHOLD`` to the trade manager, or print
           diagnostics when nothing passed.

    The cycle is skipped while the system is still initializing. Any
    exception is printed with its traceback and not re-raised. The streaming
    task is always cancelled and awaited if it is still running when the
    cycle ends, so a failure while scoring cannot leave it orphaned.
    """
    if not state_manager.initialization_complete:
        print("⏳ [Cycle Skipped] System still initializing...")
        return

    print(f"\n🔭 [Explorer V12.3] Cycle started at {datetime.now().strftime('%H:%M:%S')}")
    producer = None
    try:
        # 1. Rapid screening (Layer 1)
        candidates = await data_manager_global.layer1_rapid_screening()
        if not candidates:
            print("😴 [Explorer] No candidates found in Layer 1.")
            return

        # 2. Deep analysis (Layer 2 - Titan Hybrid)
        print(f"🔬 [Titan Hybrid] analyzing {len(candidates)} candidates...")
        titan_candidates = []
        all_scored_debug = []
        data_queue = asyncio.Queue(maxsize=10)
        producer = asyncio.create_task(
            data_manager_global.stream_ohlcv_data(candidates, data_queue)
        )
        while True:
            batch = await data_queue.get()
            if batch is None:
                # A None batch ends the stream.
                data_queue.task_done()
                break
            for raw_data in batch:
                res = await ml_processor_global.process_and_score_symbol_enhanced(raw_data)
                if not res:
                    continue
                score = res.get('enhanced_final_score', 0.0)
                all_scored_debug.append(res)
                if score >= data_manager_global.TITAN_ENTRY_THRESHOLD:
                    print(f" 🌟 [Titan Approved] {res['symbol']} Score: {score:.4f}")
                    titan_candidates.append(res)
            data_queue.task_done()
        await producer

        # 3. Final update for the sentry
        if titan_candidates:
            # Same defaulted lookup as used when the score was first read.
            titan_candidates.sort(
                key=lambda x: x.get('enhanced_final_score', 0.0), reverse=True
            )
            top_picks = titan_candidates[:5]
            print(f"✅ [Explorer] Sending {len(top_picks)} to Sentry.")
            await trade_manager_global.update_sentry_watchlist(top_picks)
        else:
            print("📉 [Explorer] Titan rejected all candidates this cycle.")
            # Detailed diagnostic print of the best rejected candidates.
            # NOTE(review): the diagnostics are treated as part of this
            # rejected-all branch because the listing is labelled as
            # rejected candidates — confirm against the intended layout.
            if all_scored_debug:
                _print_rejected_diagnostics(
                    all_scored_debug, data_manager_global.TITAN_ENTRY_THRESHOLD
                )
    except Exception as e:
        print(f"❌ [Cycle Error] {e}")
        traceback.print_exc()
    finally:
        # If the cycle ended early, stop the streaming task instead of
        # leaving it running (it could otherwise block on the full queue).
        if producer is not None and not producer.done():
            producer.cancel()
            await asyncio.gather(producer, return_exceptions=True)
        gc.collect()
# --- FastAPI Setup ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start service initialization at startup and release services at shutdown.

    Initialization runs as a background task so the application can start
    serving immediately. On exit, a still-running initialization is cancelled,
    then the trade manager's sentry loops are stopped and the data manager is
    closed (each only if it was created).
    """
    # Keep a reference to the task: the event loop only holds weak references
    # to tasks, so an unreferenced task may be garbage-collected mid-run.
    init_task = asyncio.create_task(initialize_services())
    try:
        yield
    finally:
        if not init_task.done():
            # Shutting down before initialization finished: stop it first so
            # it does not keep creating services while they are being closed.
            init_task.cancel()
            await asyncio.gather(init_task, return_exceptions=True)
        if trade_manager_global:
            await trade_manager_global.stop_sentry_loops()
        if data_manager_global:
            await data_manager_global.close()
        print("👋 [System] Shutdown complete.")
# The application object; start-up and shutdown are handled by `lifespan`.
app = FastAPI(
    title="Titan Trading Bot V12.3",
    lifespan=lifespan,
)
@app.get("/")
async def root():
    """Return a status payload that includes the initialization flag."""
    is_initialized = state_manager.initialization_complete
    return {"status": "Titan Online", "initialized": is_initialized}
@app.get("/run-cycle")
async def trigger_cycle(background_tasks: BackgroundTasks):
    """Queue one explorer cycle as a background task.

    Raises:
        HTTPException: 503 while initialization has not completed.
    """
    if state_manager.initialization_complete:
        background_tasks.add_task(run_explorer_cycle)
        return {"message": "Cycle triggered"}
    raise HTTPException(status_code=503, detail="System initializing...")
if __name__ == "__main__":
    # Run the app with uvicorn when this file is executed directly.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )