Trad / LLM.py
Riy777's picture
Update LLM.py
60efff2
raw
history blame
40.8 kB
# LLM.py (Updated to integrate LearningHub and English-only prompts)
import os, traceback, asyncio, json, time
import re # ✅ استيراد مكتبة re
from datetime import datetime
from functools import wraps
from backoff import on_exception, expo
from openai import OpenAI, RateLimitError, APITimeoutError
import numpy as np
from sentiment_news import NewsFetcher
# ✅ تعديل الاستيراد: parse_json_from_response لم يعد مستخدماً هنا بشكل مباشر لتحليل استجابة النموذج الرئيسية
from helpers import validate_required_fields, format_technical_indicators, format_strategy_scores, format_candle_data_for_pattern_analysis, format_whale_analysis_for_llm, parse_json_from_response
from ml_engine.processor import safe_json_parse # ✅ الإصلاح: استيراد المحلل الآمن من الموجه الجديد
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY")
PRIMARY_MODEL = "nvidia/llama-3.1-nemotron-ultra-253b-v1"
class PatternAnalysisEngine:
# --- (هذا الكلاس جزء من LLM.py ومطلوب لتحليل الشموع) ---
def __init__(self, llm_service):
self.llm = llm_service
def _format_chart_data_for_llm(self, ohlcv_data):
"""تنسيق شامل لبيانات الشموع الخام لتحليل الأنماط"""
if not ohlcv_data:
return "Insufficient chart data for pattern analysis"
try:
# استخدام جميع الأطر الزمنية المتاحة مع البيانات الخام
all_timeframes = []
for timeframe, candles in ohlcv_data.items():
if candles and len(candles) >= 10: # تخفيف الشرط من 20 إلى 10 شموع
# تمرير البيانات الخام مباشرة للنموذج
raw_candle_summary = self._format_raw_candle_data(candles, timeframe)
all_timeframes.append(f"=== {timeframe.upper()} TIMEFRAME ({len(candles)} CANDLES) ===\n{raw_candle_summary}")
return "\n\n".join(all_timeframes) if all_timeframes else "No sufficient timeframe data available"
except Exception as e:
return f"Error formatting chart data: {str(e)}"
def _format_raw_candle_data(self, candles, timeframe):
"""تنسيق بيانات الشموع الخام بشكل مفصل للنموذج"""
try:
if len(candles) < 10:
return f"Only {len(candles)} candles available - insufficient for deep pattern analysis"
# أخذ آخر 50 شمعة كحد أقصى لتجنب السياق الطويل جداً
analysis_candles = candles[-50:] if len(candles) > 50 else candles
summary = []
summary.append(f"Total candles: {len(candles)} (showing last {len(analysis_candles)})")
summary.append("Recent candles (newest to oldest):")
# عرض آخر 15 شمعة بالتفصيل
for i in range(min(15, len(analysis_candles))):
idx = len(analysis_candles) - 1 - i
candle = analysis_candles[idx]
# تحويل الطابع الزمني
try:
timestamp = datetime.fromtimestamp(candle[0] / 1000).strftime('%Y-%m-%d %H:%M:%S')
except:
timestamp = "unknown"
open_price, high, low, close, volume = candle[1], candle[2], candle[3], candle[4], candle[5]
candle_type = "🟢 BULLISH" if close > open_price else "🔴 BEARISH" if close < open_price else "⚪ NEUTRAL"
body_size = abs(close - open_price)
body_percent = (body_size / open_price * 100) if open_price > 0 else 0
wick_upper = high - max(open_price, close)
wick_lower = min(open_price, close) - low
total_range = high - low
if total_range > 0:
body_ratio = (body_size / total_range) * 100
upper_wick_ratio = (wick_upper / total_range) * 100
lower_wick_ratio = (wick_lower / total_range) * 100
else:
body_ratio = upper_wick_ratio = lower_wick_ratio = 0
summary.append(f"{i+1:2d}. {timestamp} | {candle_type}")
summary.append(f" O:{open_price:.8f} H:{high:.8f} L:{low:.8f} C:{close:.8f}")
summary.append(f" Body: {body_percent:.2f}% | Body/Range: {body_ratio:.1f}%")
summary.append(f" Wicks: Upper {upper_wick_ratio:.1f}% / Lower {lower_wick_ratio:.1f}%")
summary.append(f" Volume: {volume:,.0f}")
# إضافة تحليل إحصائي
if len(analysis_candles) >= 20:
stats = self._calculate_candle_statistics(analysis_candles)
summary.append(f"\n📊 STATISTICAL ANALYSIS:")
summary.append(f"• Price Change: {stats['price_change']:+.2f}%")
summary.append(f"• Average Body Size: {stats['avg_body']:.4f}%")
summary.append(f"• Volatility (ATR): {stats['atr']:.6f}")
summary.append(f"• Trend: {stats['trend']}")
summary.append(f"• Support: {stats['support']:.6f}")
summary.append(f"• Resistance: {stats['resistance']:.6f}")
return "\n".join(summary)
except Exception as e:
return f"Error formatting raw candle data: {str(e)}"
def _calculate_candle_statistics(self, candles):
"""حساب الإحصائيات الأساسية للشموع"""
try:
closes = [c[4] for c in candles]
opens = [c[1] for c in candles]
highs = [c[2] for c in candles]
lows = [c[3] for c in candles]
# حساب التغير في السعر
first_close = closes[0]
last_close = closes[-1]
price_change = ((last_close - first_close) / first_close) * 100
# حساب متوسط حجم الجسم
body_sizes = [abs(close - open) for open, close in zip(opens, closes)]
avg_body = (sum(body_sizes) / len(body_sizes)) / first_close * 100 if first_close > 0 else 0 # Handle potential ZeroDivisionError
# حساب ATR مبسط
true_ranges = []
for i in range(1, len(candles)):
high, low, prev_close = highs[i], lows[i], closes[i-1]
tr1 = high - low
tr2 = abs(high - prev_close)
tr3 = abs(low - prev_close)
true_ranges.append(max(tr1, tr2, tr3))
atr = sum(true_ranges) / len(true_ranges) if true_ranges else 0
# تحديد الاتجاه
if price_change > 3:
trend = "STRONG UPTREND"
elif price_change > 1:
trend = "UPTREND"
elif price_change < -3:
trend = "STRONG DOWNTREND"
elif price_change < -1:
trend = "DOWNTREND"
else:
trend = "SIDEWAYS"
# مستويات الدعم والمقاومة المبسطة
support = min(lows)
resistance = max(highs)
return {
'price_change': price_change,
'avg_body': avg_body,
'atr': atr,
'trend': trend,
'support': support,
'resistance': resistance
}
except Exception as e:
# Provide default values in case of calculation error
return {
'price_change': 0,
'avg_body': 0,
'atr': 0,
'trend': 'UNKNOWN',
'support': 0,
'resistance': 0
}
async def analyze_chart_patterns(self, symbol, ohlcv_data):
"""(تم تركها فارغة عمداً لأن النموذج الضخم يقوم بها الآن)"""
pass
def _parse_pattern_response(self, response_text):
"""(تم تركها فارغة عمداً)"""
pass
class LLMService:
def __init__(self, api_key=NVIDIA_API_KEY, model_name=PRIMARY_MODEL, temperature=0.7):
self.api_key = api_key
self.model_name = model_name
self.temperature = temperature
self.client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=self.api_key)
self.news_fetcher = NewsFetcher()
self.pattern_engine = PatternAnalysisEngine(self)
self.semaphore = asyncio.Semaphore(5)
self.r2_service = None # (Set from app.py)
# 🔴 --- START OF CHANGE --- 🔴
# Renamed from self.learning_engine to self.learning_hub
self.learning_hub = None # (Set from app.py, expects LearningHubManager)
# 🔴 --- END OF CHANGE --- 🔴
def _rate_limit_nvidia_api(func):
@wraps(func)
@on_exception(expo, RateLimitError, max_tries=5)
async def wrapper(*args, **kwargs):
return await func(*args, **kwargs)
return wrapper
async def get_trading_decision(self, data_payload: dict):
try:
symbol = data_payload.get('symbol', 'unknown')
target_strategy = data_payload.get('target_strategy', 'GENERIC')
ohlcv_data = data_payload.get('raw_ohlcv') or data_payload.get('ohlcv')
if not ohlcv_data:
print(f"⚠️ No candle data for {symbol} - skipping analysis")
return None
total_candles = sum(len(data) for data in ohlcv_data.values() if data) if ohlcv_data else 0
timeframes_count = len([tf for tf, data in ohlcv_data.items() if data and len(data) >= 10]) if ohlcv_data else 0
if total_candles < 30:
print(f" ⚠️ Insufficient candle data for {symbol}: {total_candles} candles")
return None
valid_timeframes = [tf for tf, candles in ohlcv_data.items() if candles and len(candles) >= 5]
if not valid_timeframes:
print(f" ⚠️ No valid timeframes for {symbol}")
return None
news_text = await self.news_fetcher.get_news_for_symbol(symbol)
whale_data = data_payload.get('whale_data', {})
# 🔴 --- START OF CHANGE --- 🔴
# (Fetch learning context from the new hub)
statistical_feedback = "No statistical learning data yet."
active_context_playbook = "No active learning rules available."
if self.learning_hub and self.learning_hub.initialized:
# 1. Get Statistical Feedback (Slow-learner)
statistical_feedback = await self.learning_hub.get_statistical_feedback_for_llm(target_strategy)
# 2. Get Active Context / Deltas (Fast-learner)
active_context_playbook = await self.learning_hub.get_active_context_for_llm(
domain="strategy",
query=f"{target_strategy} {symbol}" # (Query with strategy and symbol)
)
# (Pass new context to the prompt creator)
prompt = self._create_comprehensive_trading_prompt(
data_payload,
news_text,
None,
whale_data,
statistical_feedback,
active_context_playbook
)
# 🔴 --- END OF CHANGE --- 🔴
if self.r2_service:
analysis_data = {
'symbol': symbol,
'current_price': data_payload.get('current_price'),
'enhanced_final_score': data_payload.get('enhanced_final_score'),
'target_strategy': target_strategy,
'statistical_feedback': statistical_feedback,
'active_context_playbook': active_context_playbook,
'whale_data_available': whale_data.get('data_available', False),
'total_candles': total_candles,
'timeframes_count': timeframes_count,
'timestamp': datetime.now().isoformat()
}
await self.r2_service.save_llm_prompts_async(
symbol, 'comprehensive_trading_decision_v3_hub', prompt, analysis_data
)
async with self.semaphore:
response = await self._call_llm(prompt)
decision_dict = self._parse_llm_response_enhanced(response, target_strategy, symbol)
if decision_dict:
decision_dict['model_source'] = self.model_name
decision_dict['whale_data_integrated'] = whale_data.get('data_available', False)
decision_dict['total_candles_analyzed'] = total_candles
decision_dict['timeframes_analyzed'] = timeframes_count
return decision_dict
else:
print(f"❌ LLM parsing failed for {symbol} - no fallback decisions")
return None
except Exception as e:
print(f"❌ Error in get_trading_decision for {data_payload.get('symbol', 'unknown')}: {e}")
traceback.print_exc()
return None
def _parse_llm_response_enhanced(self, response_text: str, fallback_strategy: str, symbol: str) -> dict:
try:
json_str = parse_json_from_response(response_text)
if not json_str:
print(f"❌ Failed to extract JSON from LLM response for {symbol}")
return None
decision_data = safe_json_parse(json_str)
if not decision_data:
print(f"❌ Failed to parse JSON (safe_json_parse) for {symbol}: {response_text}")
return None
# (This check is for the trading decision, not the reflector response)
if fallback_strategy == "reflection" or fallback_strategy == "distillation":
# (If this is a reflector/curator call, just return the data)
return decision_data
required_fields = [
'action', 'reasoning', 'risk_assessment', 'stop_loss', 'take_profit',
'expected_target_minutes', 'confidence_level', 'pattern_identified_by_llm',
'exit_profile', 'exit_parameters'
]
if not validate_required_fields(decision_data, required_fields):
print(f"❌ Missing required fields in LLM response for {symbol}")
missing = [f for f in required_fields if f not in decision_data]
print(f" MIA: {missing}")
return None
if not isinstance(decision_data['exit_parameters'], dict):
print(f"❌ 'exit_parameters' is not a valid dict for {symbol}")
return None
action = decision_data.get('action')
if action not in ['BUY', 'HOLD']:
print(f"⚠️ LLM suggested unsupported action ({action}) for {symbol}. Forcing HOLD.")
decision_data['action'] = 'HOLD'
if decision_data['action'] == 'BUY':
decision_data['trade_type'] = 'LONG'
else:
decision_data['trade_type'] = None
strategy_value = decision_data.get('strategy')
if not strategy_value or strategy_value == 'unknown':
decision_data['strategy'] = fallback_strategy
return decision_data
except Exception as e:
print(f"❌ Error parsing LLM response for {symbol}: {e}")
return None
async def _get_pattern_analysis(self, data_payload):
try:
symbol = data_payload['symbol']
ohlcv_data = data_payload.get('raw_ohlcv') or data_payload.get('ohlcv')
if ohlcv_data:
# (This is a placeholder, as PatternAnalysisEngine.analyze_chart_patterns is not implemented)
return None
return None
except Exception as e:
print(f"❌ Pattern analysis failed for {data_payload.get('symbol')}: {e}")
return None
# 🔴 --- START OF PROMPT CHANGE --- 🔴
def _create_comprehensive_trading_prompt(
self,
payload: dict,
news_text: str,
pattern_analysis: dict, # (This is the old system, now deprecated, but we leave the arg)
whale_data: dict,
statistical_feedback: str, # (NEW from Hub)
active_context_playbook: str # (NEW from Hub)
) -> str:
symbol = payload.get('symbol', 'N/A')
current_price = payload.get('current_price', 'N/A')
reasons = payload.get('reasons_for_candidacy', [])
sentiment_data = payload.get('sentiment_data', {})
advanced_indicators = payload.get('advanced_indicators', {})
strategy_scores = payload.get('strategy_scores', {})
recommended_strategy = payload.get('recommended_strategy', 'N/A')
target_strategy = payload.get('target_strategy', 'GENERIC')
enhanced_final_score = payload.get('enhanced_final_score', 0)
enhanced_score_display = f"{enhanced_final_score:.3f}" if isinstance(enhanced_final_score, (int, float)) else str(enhanced_final_score)
indicators_summary = format_technical_indicators(advanced_indicators)
strategies_summary = format_strategy_scores(strategy_scores, recommended_strategy)
whale_analysis_section = format_whale_analysis_for_llm(whale_data)
ohlcv_data = payload.get('raw_ohlcv') or payload.get('ohlcv', {})
candle_data_section = self._format_candle_data_comprehensive(ohlcv_data)
market_context_section = self._format_market_context(sentiment_data)
# (New sections from the Learning Hub)
statistical_feedback_section = f"🧠 STATISTICAL FEEDBACK (Slow-Learner):\n{statistical_feedback}"
playbook_section = f"📚 LEARNING PLAYBOOK (Fast-Learner Active Rules):\n{active_context_playbook}"
prompt = f"""
COMPREHENSIVE TRADING ANALYSIS FOR {symbol}
🚨 IMPORTANT SYSTEM CONSTRAINT: This is a SPOT TRADING system ONLY. Decisions MUST be limited to BUY (LONG) or HOLD. SHORT selling is NOT possible.
🎯 STRATEGY CONTEXT:
* Target Strategy: {target_strategy}
* Recommended Strategy: {recommended_strategy}
* Current Price: ${current_price}
* Enhanced System Score: {enhanced_score_display}
--- LEARNING HUB INPUT (CRITICAL) ---
{playbook_section}
{statistical_feedback_section}
--- END OF LEARNING INPUT ---
📊 TECHNICAL INDICATORS (ALL TIMEFRAMES):
{indicators_summary}
📈 RAW CANDLE DATA SUMMARY & STATISTICS (FOR YOUR PATTERN ANALYSIS):
{candle_data_section}
{chr(10)}--- END OF CANDLE DATA ---{chr(10)}
🎯 STRATEGY ANALYSIS (System's recommendation based on various factors):
{strategies_summary}
🐋 WHALE ACTIVITY ANALYSIS:
{whale_analysis_section}
🌍 MARKET CONTEXT:
{market_context_section if market_context_section and "No market context" not in market_context_section else "Market context data not available for this analysis."}
📰 LATEST NEWS:
{news_text if news_text else "No significant news found"}
📋 REASONS FOR SYSTEM CANDIDACY (Layer 1 & 2 Screening):
{chr(10).join([f"• {reason}" for reason in reasons]) if reasons else "No specific reasons provided"}
---
🎯 TRADING DECISION INSTRUCTIONS (SPOT ONLY - LLM MUST ANALYZE PATTERNS AND DEFINE EXIT STRATEGY):
1. **PERFORM CHART PATTERN ANALYSIS:** Based *ONLY* on the provided 'RAW CANDLE DATA SUMMARY & STATISTICS' section, identify relevant chart patterns (Triangles, Flags, etc.) and candlestick patterns (Engulfing, Doji, etc.).
2. **INTEGRATE ALL DATA:** Combine YOUR pattern analysis with technicals, strategy analysis, whale activity, market context, news, and (most importantly) the 'LEARNING HUB INPUT'.
3. **ADHERE STRICTLY TO SPOT TRADING RULES:** Only decide 'BUY' (LONG) or 'HOLD'. DO NOT suggest 'SELL'.
4. **DEFINE EXIT STRATEGY (CRITICAL):** If (and only if) action is 'BUY', you MUST define the dynamic exit strategy (Exit Profile) and its parameters. This profile will be executed by a separate tactical bot.
* `"exit_profile"`: Choose one: "ATR_TRAILING" (Recommended for trends/breakouts), "FIXED_TARGET" (Recommended for mean reversion/scalping), "TIME_BASED" (Exit after X minutes regardless).
* `"exit_parameters"`: Define parameters for the chosen profile, respecting the 'Statistical Feedback'.
* For "ATR_TRAILING": {{"atr_multiplier": 2.0, "atr_period": 14, "break_even_trigger_percent": 1.5}} (break_even_trigger_percent moves stop to entry when profit hits 1.5%)
* For "FIXED_TARGET": {{"time_stop_minutes": 120}} (Hard stop if target not hit in 120 mins)
* For "TIME_BASED": {{"exit_after_minutes": 60}}
5. **DEFINE HARD STOPS:** You must still provide the initial "hard" stop_loss (catastrophic failure stop) and the final "take_profit" target. The dynamic exit profile operates *within* these boundaries.
6. **SELF-CRITIQUE (Point 4 of Plan):** After defining the JSON, perform a self-critique. List potential failure modes for your decision and confirm your final answer.
OUTPUT FORMAT (JSON - SPOT ONLY - INCLUDE EXIT PROFILE AND SELF-CRITIQUE):
{{
"action": "BUY/HOLD",
"reasoning": "Detailed explanation integrating ALL data sources, starting with the patterns identified from the candle summary, and justifying the BUY or HOLD decision. Explain *why* the chosen exit_profile is appropriate, considering the Learning Hub feedback.",
"pattern_identified_by_llm": "Name of the primary pattern(s) identified (e.g., 'Bull Flag on 1H', 'No Clear Pattern')",
"pattern_influence": "Explain how the identified pattern(s) influenced the decision.",
"risk_assessment": "low/medium/high",
"stop_loss": 0.000000,
"take_profit": 0.000000,
"exit_profile": "FIXED_TARGET",
"exit_parameters": {{ "time_stop_minutes": 120 }},
"expected_target_minutes": 15,
"confidence_level": 0.85,
"strategy": "{target_strategy}",
"whale_influence": "How whale data influenced the BUY/HOLD decision",
"key_support_level": 0.000000,
"key_resistance_level": 0.000000,
"risk_reward_ratio": 2.5,
"self_critique": {{
"failure_modes": [
"What is the first reason this decision could fail? (e.g., 'The identified pattern is a false breakout.')",
"What is the second reason? (e.g., 'Whale data shows distribution, contradicting the technicals.')"
],
"confidence_adjustment_reason": "Brief reason if confidence was adjusted post-critique."
}}
}}
"""
return prompt
# 🔴 --- END OF PROMPT CHANGE --- 🔴
def _format_candle_data_comprehensive(self, ohlcv_data):
"""تنسيق شامل لبيانات الشموع الخام"""
if not ohlcv_data:
return "No raw candle data available for analysis"
try:
timeframes_available = []
total_candles = 0
for timeframe, candles in ohlcv_data.items():
if candles and len(candles) >= 5:
timeframes_available.append(f"{timeframe.upper()} ({len(candles)} candles)")
total_candles += len(candles)
if not timeframes_available:
return "Insufficient candle data across all timeframes"
summary = f"📊 Available Timeframes: {', '.join(timeframes_available)}\n"
summary += f"📈 Total Candles Available: {total_candles}\n\n"
raw_candle_analysis_text = self.pattern_engine._format_chart_data_for_llm(ohlcv_data)
summary += raw_candle_analysis_text
return summary
except Exception as e:
return f"Error formatting raw candle data: {str(e)}"
def _analyze_timeframe_candles(self, candles, timeframe):
"""تحليل الشموع لإطار زمني محدد - (تستخدم داخلياً بواسطة _format_raw_candle_data)"""
try:
if len(candles) < 10:
return f"Insufficient data ({len(candles)} candles)"
recent_candles = candles[-15:]
closes = [c[4] for c in recent_candles]
opens = [c[1] for c in recent_candles]
highs = [c[2] for c in recent_candles]
lows = [c[3] for c in recent_candles]
volumes = [c[5] for c in recent_candles]
current_price = closes[-1]
first_price = closes[0]
price_change = ((current_price - first_price) / first_price) * 100 if first_price > 0 else 0
if price_change > 2: trend = "🟢 UPTREND"
elif price_change < -2: trend = "🔴 DOWNTREND"
else: trend = "⚪ SIDEWAYS"
high_max = max(highs)
low_min = min(lows)
volatility = ((high_max - low_min) / low_min) * 100 if low_min > 0 else 0
avg_volume = sum(volumes) / len(volumes) if volumes else 1
current_volume = volumes[-1] if volumes else 0
volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1
green_candles = sum(1 for i in range(len(closes)) if closes[i] > opens[i])
red_candles = len(closes) - green_candles
candle_ratio = green_candles / len(closes) if closes else 0
analysis = [
f"📈 Trend: {trend} ({price_change:+.2f}%)",
f"🌊 Volatility: {volatility:.2f}%",
f"📦 Volume: {volume_ratio:.2f}x average",
f"🕯️ Candles: {green_candles}🟢/{red_candles}🔴 ({candle_ratio:.1%} green)",
f"💰 Range: {low_min:.6f} - {high_max:.6f}",
f"🎯 Current: {current_price:.6f}"
]
return "\n".join(analysis)
except Exception as e:
return f"Analysis error: {str(e)}"
def _format_market_context(self, sentiment_data):
"""تنسيق سياق السوق"""
if not sentiment_data or sentiment_data.get('data_quality', 'LOW') == 'LOW':
return "Market context data not available or incomplete."
btc_sentiment = sentiment_data.get('btc_sentiment', 'N/A')
fear_greed = sentiment_data.get('fear_and_greed_index', 'N/A')
market_trend = sentiment_data.get('market_trend', 'N/A')
lines = [
f"• Bitcoin Sentiment: {btc_sentiment}",
f"• Fear & Greed Index: {fear_greed} ({sentiment_data.get('sentiment_class', 'Neutral')})",
f"• Overall Market Trend: {market_trend.replace('_', ' ').title() if isinstance(market_trend, str) else 'N/A'}"
]
general_whale = sentiment_data.get('general_whale_activity', {})
if general_whale and general_whale.get('sentiment') != 'NEUTRAL':
whale_sentiment = general_whale.get('sentiment', 'N/A')
critical_alert = general_whale.get('critical_alert', False)
lines.append(f"• General Whale Sentiment: {whale_sentiment.replace('_', ' ').title() if isinstance(whale_sentiment, str) else 'N/A'}")
if critical_alert:
lines.append(" ⚠️ CRITICAL WHALE ALERT ACTIVE")
return "\n".join(lines)
async def re_analyze_trade_async(self, trade_data: dict, processed_data: dict):
try:
symbol = trade_data['symbol']
original_strategy = trade_data.get('strategy', 'GENERIC')
ohlcv_data = processed_data.get('raw_ohlcv') or processed_data.get('ohlcv')
if not ohlcv_data:
print(f"⚠️ No updated candle data for {symbol} - skipping re-analysis")
return None
news_text = await self.news_fetcher.get_news_for_symbol(symbol)
pattern_analysis = await self._get_pattern_analysis(processed_data)
whale_data = processed_data.get('whale_data', {})
# 🔴 --- START OF CHANGE --- 🔴
# (Fetch learning context from the new hub for re-analysis)
statistical_feedback = "No statistical learning data yet."
active_context_playbook = "No active learning rules available."
if self.learning_hub and self.learning_hub.initialized:
# 1. Get Statistical Feedback (Slow-learner)
statistical_feedback = await self.learning_hub.get_statistical_feedback_for_llm(original_strategy)
# 2. Get Active Context / Deltas (Fast-learner)
active_context_playbook = await self.learning_hub.get_active_context_for_llm(
domain="strategy",
query=f"{original_strategy} {symbol} re-analysis"
)
# (Pass new context to the prompt creator)
prompt = self._create_re_analysis_prompt(
trade_data,
processed_data,
news_text,
pattern_analysis,
whale_data,
statistical_feedback,
active_context_playbook
)
# 🔴 --- END OF CHANGE --- 🔴
if self.r2_service:
analysis_data = {
'symbol': symbol,
'entry_price': trade_data.get('entry_price'),
'current_price': processed_data.get('current_price'),
'original_strategy': original_strategy,
'statistical_feedback': statistical_feedback,
'active_context_playbook': active_context_playbook,
'whale_data_available': whale_data.get('data_available', False)
}
await self.r2_service.save_llm_prompts_async(
symbol, 'trade_reanalysis_v3_hub', prompt, analysis_data
)
async with self.semaphore:
response = await self._call_llm(prompt)
re_analysis_dict = self._parse_re_analysis_response(response, original_strategy, symbol)
if re_analysis_dict:
re_analysis_dict['model_source'] = self.model_name
re_analysis_dict['whale_data_integrated'] = whale_data.get('data_available', False)
return re_analysis_dict
else:
print(f"❌ LLM re-analysis parsing failed for {symbol}")
return None
except Exception as e:
print(f"❌ Error in LLM re-analysis: {e}")
traceback.print_exc()
return None
def _parse_re_analysis_response(self, response_text: str, fallback_strategy: str, symbol: str) -> dict:
try:
json_str = parse_json_from_response(response_text)
if not json_str:
return None
decision_data = safe_json_parse(json_str)
if not decision_data:
print(f"❌ Failed to parse JSON (safe_json_parse) for re-analysis of {symbol}: {response_text}")
return None
action = decision_data.get('action')
if action not in ['HOLD', 'CLOSE_TRADE', 'UPDATE_TRADE']:
print(f"⚠️ LLM suggested unsupported re-analysis action ({action}) for {symbol}. Forcing HOLD.")
decision_data['action'] = 'HOLD'
if action == 'UPDATE_TRADE':
required_update_fields = ['new_stop_loss', 'new_take_profit', 'new_exit_profile', 'new_exit_parameters']
if not validate_required_fields(decision_data, required_update_fields):
print(f"❌ Missing required fields for UPDATE_TRADE for {symbol}")
missing = [f for f in required_update_fields if f not in decision_data]
print(f" MIA: {missing}")
decision_data['action'] = 'HOLD'
elif not isinstance(decision_data['new_exit_parameters'], dict):
print(f"❌ 'new_exit_parameters' is not a valid dict for {symbol}")
decision_data['action'] = 'HOLD'
strategy_value = decision_data.get('strategy')
if not strategy_value or strategy_value == 'unknown':
decision_data['strategy'] = fallback_strategy
return decision_data
except Exception as e:
print(f"Error parsing re-analysis response for {symbol}: {e}")
return None
# 🔴 --- START OF PROMPT CHANGE --- 🔴
def _create_re_analysis_prompt(
self,
trade_data: dict,
processed_data: dict,
news_text: str,
pattern_analysis: dict,
whale_data: dict,
statistical_feedback: str, # (NEW from Hub)
active_context_playbook: str # (NEW from Hub)
) -> str:
symbol = trade_data.get('symbol', 'N/A')
entry_price = trade_data.get('entry_price', 'N/A')
current_price = processed_data.get('current_price', 'N/A')
strategy = trade_data.get('strategy', 'GENERIC')
current_exit_profile = trade_data.get('decision_data', {}).get('exit_profile', 'N/A')
current_exit_params = json.dumps(trade_data.get('decision_data', {}).get('exit_parameters', {}))
# (New sections from the Learning Hub)
statistical_feedback_section = f"🧠 STATISTICAL FEEDBACK (Slow-Learner):\n{statistical_feedback}"
playbook_section = f"📚 LEARNING PLAYBOOK (Fast-Learner Active Rules):\n{active_context_playbook}"
try:
price_change = ((current_price - entry_price) / entry_price) * 100 if entry_price else 0
price_change_display = f"{price_change:+.2f}%"
except (TypeError, ZeroDivisionError):
price_change_display = "N/A"
indicators_summary = format_technical_indicators(processed_data.get('advanced_indicators', {}))
pattern_summary = self._format_pattern_analysis(pattern_analysis) if pattern_analysis else "Pattern analysis data not available for re-analysis."
whale_analysis_section = format_whale_analysis_for_llm(whale_data)
market_context_section = self._format_market_context(processed_data.get('sentiment_data', {}))
ohlcv_data = processed_data.get('raw_ohlcv') or processed_data.get('ohlcv', {})
candle_data_section = self._format_candle_data_comprehensive(ohlcv_data)
prompt = f"""
TRADE RE-ANALYSIS FOR {symbol} (SPOT ONLY - Currently Open LONG Position)
🚨 IMPORTANT SYSTEM CONSTRAINT: This is a SPOT TRADING system ONLY. The open trade is LONG. Re-analysis should decide to HOLD, CLOSE, or UPDATE this LONG position. SHORT selling is NOT possible.
📊 CURRENT TRADE CONTEXT:
* Strategy: {strategy}
* Entry Price: {entry_price} (LONG position)
* Current Price: {current_price}
* Current Performance: {price_change_display}
* Trade Age: {trade_data.get('hold_duration_minutes', 'N/A')} minutes
* Current Exit Profile: {current_exit_profile}
* Current Exit Parameters: {current_exit_params}
--- LEARNING HUB INPUT (CRITICAL) ---
{playbook_section}
{statistical_feedback_section}
--- END OF LEARNING INPUT ---
🔄 UPDATED TECHNICAL ANALYSIS:
{indicators_summary}
📈 UPDATED RAW CANDLE DATA SUMMARY & STATISTICS:
{candle_data_section}
{chr(10)}--- END OF CANDLE DATA ---{chr(10)}
🔍 UPDATED PATTERN ANALYSIS RESULTS (From System):
{pattern_summary}
🐋 UPDATED WHALE ACTIVITY:
{whale_analysis_section}
🌍 UPDATED MARKET CONTEXT:
{market_context_section if market_context_section and "No market context" not in market_context_section else "Market context data not available for this re-analysis."}
📰 LATEST NEWS:
{news_text if news_text else "No significant news found"}
---
🎯 RE-ANALYSIS INSTRUCTIONS (SPOT - LONG POSITION):
1. **ANALYZE UPDATED DATA:** Evaluate if the original LONG thesis still holds based on the updated raw candle data, technicals, patterns, whale activity, market context, and (most importantly) the 'LEARNING HUB INPUT'.
2. **DECIDE ACTION (HOLD/CLOSE/UPDATE):** Based on the comprehensive analysis, decide whether to HOLD, CLOSE_TRADE (exit the LONG position), or UPDATE_TRADE (adjust SL/TP and/or the Exit Profile for the LONG position).
3. **IF UPDATING (CRITICAL):** If action is UPDATE_TRADE, you MUST provide:
* `new_stop_loss` (New hard stop)
* `new_take_profit` (New final target)
* `new_exit_profile`: (e.g., "ATR_TRAILING") - Can be the same or different.
* `new_exit_parameters`: (e.g., {{"atr_multiplier": 1.5}}) - Must match the new profile.
4. **SELF-CRITIQUE:** Perform a self-critique. What is the risk of this re-analysis decision?
CRITICAL: The decision must be one of HOLD, CLOSE_TRADE, or UPDATE_TRADE for the existing LONG position.
OUTPUT FORMAT (JSON - SPOT RE-ANALYSIS):
{{
"action": "HOLD/CLOSE_TRADE/UPDATE_TRADE",
"reasoning": "Comprehensive justification for HOLD, CLOSE, or UPDATE of the LONG position, based on updated analysis. If UPDATE, explain why the new exit profile/parameters are better, referencing the Learning Hub input.",
"new_stop_loss": 0.000000,
"new_take_profit": 0.000000,
"new_exit_profile": "None",
"new_exit_parameters": {{}},
"new_expected_minutes": 15,
"confidence_level": 0.85,
"strategy": "{strategy}",
"whale_influence_reanalysis": "How updated whale data influenced the decision",
"pattern_influence_reanalysis": "How updated candle patterns AND provided patterns influenced the decision",
"risk_adjustment": "low/medium/high",
"self_critique": {{
"failure_modes": [
"What is the primary risk of this new decision? (e.g., 'Holding this position increases exposure to market volatility.')",
"What is the second risk? (e.g., 'Closing now might miss a future rebound.')"
],
"confidence_adjustment_reason": "Brief reason if confidence was adjusted post-critique."
}}
}}
"""
return prompt
# 🔴 --- END OF PROMPT CHANGE --- 🔴
def _format_pattern_analysis(self, pattern_analysis):
"""Helper to format pattern analysis for the LLM"""
if not pattern_analysis or not pattern_analysis.get('pattern_detected') or pattern_analysis.get('pattern_detected') == 'no_clear_pattern':
return "No clear chart pattern detected by the system."
pattern = pattern_analysis.get('pattern_detected', 'N/A')
confidence = pattern_analysis.get('pattern_confidence', 0)
direction = pattern_analysis.get('predicted_direction', 'N/A')
timeframe = pattern_analysis.get('timeframe', 'N/A') # (This key might not exist, need to check patterns.py)
# (Assuming timeframe is part of the top-level analysis)
tf_display = f"on {timeframe} timeframe" if timeframe != 'N/A' else ""
return f"System Pattern Analysis: Detected '{pattern}' {tf_display} with {confidence:.2f} confidence. Predicted direction: {direction}."
@_rate_limit_nvidia_api
async def _call_llm(self, prompt: str) -> str:
try:
for attempt in range(2):
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
temperature=self.temperature,
seed=int(time.time()),
max_tokens=4000
)
content = response.choices[0].message.content
if content and '{' in content and '}' in content:
return content
else:
print(f"⚠️ LLM returned invalid content (attempt {attempt+1}): {content[:100]}...")
if attempt == 0: await asyncio.sleep(1)
except (RateLimitError, APITimeoutError) as e:
print(f"❌ LLM API Error (Rate Limit/Timeout): {e}. Retrying via backoff...")
raise
except Exception as e:
print(f"❌ Unexpected LLM API error (attempt {attempt+1}): {e}")
if attempt == 0: await asyncio.sleep(2)
elif attempt == 1: raise
print("❌ LLM failed to return valid content after retries.")
return ""
except Exception as e:
print(f"❌ Final failure in _call_llm after backoff retries: {e}")
raise
print("✅ LLM Service loaded - V3 (Integrated Learning Hub, English-only Prompts, Self-Critique)")