Trad / learning_hub /reflector.py
Riy777's picture
Create reflector.py
ed6a20b
raw
history blame
6.8 kB
# learning_hub/reflector.py
import json
import traceback
from typing import Dict, Any, TYPE_CHECKING
from .schemas import TraceLog, ReflectorOutput
from .memory_store import MemoryStore
# (Use TYPE_CHECKING so LLMService is imported for type checkers only, avoiding a real circular import at runtime)
if TYPE_CHECKING:
from LLM import LLMService
class Reflector:
    """Analyze a closed trade with the LLM and store the suggested learning rule ("Delta").

    The reflector builds a trace of the trade (decision context, environment
    context, actual outcome), asks the LLM service for a structured reflection,
    validates it against ``ReflectorOutput`` and hands it to the memory store.
    """

    # PnL (in percent) a trade must EXCEED to be pre-labelled a success in the prompt.
    SUCCESS_PNL_THRESHOLD_PERCENT = 0.1

    # Ordered (domain, keywords) pairs: the first domain with a matching keyword wins.
    _DOMAIN_KEYWORDS = (
        ("pattern", ("pattern", "triangle", "flag")),
        ("indicator", ("indicator", "rsi", "macd")),
        ("monte_carlo", ("monte_carlo", "garch", "simulation")),
        ("strategy", ("strategy", "exit", "entry")),
    )

    # Strategies whose Deltas fall back to the "strategy" domain when no keyword matches.
    _STRATEGY_DOMAIN_STRATEGIES = ("trend_following", "mean_reversion", "breakout_momentum")

    def __init__(self, llm_service: 'LLMService', memory_store: 'MemoryStore'):
        """Store the collaborators and announce that the module is loaded.

        Args:
            llm_service: Service exposing ``_call_llm`` and
                ``_parse_llm_response_enhanced`` (both are used by this class).
            memory_store: Store exposing the async ``save_new_delta`` method.
        """
        self.llm_service = llm_service
        self.memory_store = memory_store
        print("✅ Learning Hub Module: Reflector (Fast-Learner) loaded")

    async def analyze_trade_outcome(self, trade_object: Dict[str, Any], close_reason: str) -> None:
        """Analyze the trade outcome with the LLM and save the resulting Delta.

        (Implements Point 2 & 4 of the 16-point plan.)

        Args:
            trade_object: The closed trade. Keys read here: ``decision_data``,
                ``symbol`` and ``strategy``; the prompt builder reads more.
            close_reason: Why the trade was closed; recorded in the trace.

        This is a best-effort step: every failure (empty LLM response,
        unparsable response, schema validation error, store error) is caught,
        printed with a traceback, and NOT re-raised.
        """
        try:
            # 1. Create the TraceLog.
            # 'market_context_at_decision' and 'indicators_at_decision' are read
            # from 'decision_data'; when they are absent, empty dicts are used.
            # `or {}` also covers an explicit None stored under 'decision_data'.
            decision_data = trade_object.get('decision_data') or {}
            trace_log = TraceLog(
                decision_context=decision_data,
                market_context_at_decision=decision_data.get('market_context_at_decision', {}),
                indicators_at_decision=decision_data.get('indicators_at_decision', {}),
                closed_trade_object=trade_object,
                actual_outcome_reason=close_reason
            )

            # 2. Create the Reflector prompt (English only).
            prompt = self._create_reflector_prompt(trace_log)

            # 3. Call the LLM.
            response_text = await self.llm_service._call_llm(prompt)
            if not response_text:
                raise ValueError("Reflector LLM call returned no response.")

            # 4. Parse the response with the LLM service's own parser.
            reflector_json = self.llm_service._parse_llm_response_enhanced(
                response_text,
                fallback_strategy="reflection",
                symbol=trade_object.get('symbol', 'N/A')
            )
            if not reflector_json:
                raise ValueError(f"Failed to parse Reflector LLM response: {response_text}")

            # Validate against the schema; a mismatch raises and is handled below.
            reflector_output = ReflectorOutput(**reflector_json)

            # 5. Determine the 'Domain' for the Delta.
            strategy = trade_object.get('strategy', 'general')
            domain = self._determine_domain(strategy, reflector_output.error_mode)

            # 6. Save the suggested 'Delta' to the Memory Store.
            # (Original author's note: MemoryStore uses PolicyEngine to decide
            # on auto-approval — not visible from this file.)
            await self.memory_store.save_new_delta(
                reflector_output=reflector_output,
                trade_object=trade_object,
                domain=domain
            )
            print(f"✅ [Reflector] Successfully analyzed {trade_object.get('symbol')}. New Delta created.")
        except Exception as e:
            # Top-level boundary: reflection must never propagate an error to the caller.
            print(f"❌ [Reflector] Failed to analyze trade outcome for {trade_object.get('symbol')}: {e}")
            traceback.print_exc()

    def _determine_domain(self, strategy: str, error_mode: str) -> str:
        """Determine the domain the suggested Delta belongs to.

        Keywords are matched case-insensitively and must start at a token
        boundary (start of text, or after any character that is not a letter or
        digit, e.g. ``_`` or a space). Plain substring matching would classify
        "mean_reversion..." as an indicator issue because "reversion" contains
        "rsi". Suffixes are still allowed ("patterns", "exits").

        Args:
            strategy: Strategy name of the trade; used only as a fallback.
            error_mode: Error mode / success factor text from the reflection.
                ``None`` is treated as empty text.

        Returns:
            One of "pattern", "indicator", "monte_carlo", "strategy", "general".
        """
        import re  # local import: keeps this block self-contained

        text = "" if error_mode is None else str(error_mode).lower()
        for domain, keywords in self._DOMAIN_KEYWORDS:
            for keyword in keywords:
                if re.search(r"(?<![a-z0-9])" + re.escape(keyword), text):
                    return domain

        # Default to the strategy's domain.
        if strategy in self._STRATEGY_DOMAIN_STRATEGIES:
            return "strategy"
        return "general"

    @staticmethod
    def _safe_float(value: Any, default: float = 0.0) -> float:
        """Return ``value`` as a float, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _create_reflector_prompt(self, trace_log: 'TraceLog') -> str:
        """Create the (English-only) prompt for the LLM to act as a Reflector.

        (Implements Point 4 - Reflector prompt.)

        Args:
            trace_log: Object providing ``closed_trade_object`` (a dict),
                ``market_context_at_decision``, ``indicators_at_decision`` and
                ``actual_outcome_reason``.

        Returns:
            The full prompt text, including the JSON output schema in which
            ``success`` is pre-filled from the trade's PnL.
        """
        trade = trace_log.closed_trade_object
        # `or {}` covers a 'decision_data' key that is present but None.
        decision = trade.get('decision_data') or {}

        reasoning = decision.get('reasoning', 'N/A')
        if reasoning is None:
            reasoning = 'N/A'
        reasoning = str(reasoning)[:200]  # keep the prompt short

        # A missing / None / non-numeric PnL is treated as 0 instead of raising.
        pnl_percent = self._safe_float(trade.get('pnl_percent', 0))

        # Initial success label: PnL must exceed the threshold.
        is_success = pnl_percent > self.SUCCESS_PNL_THRESHOLD_PERCENT

        # default=str: values that are not JSON-native are rendered via str()
        # so one odd value cannot abort the whole reflection.
        market_context = json.dumps(trace_log.market_context_at_decision, default=str)
        indicators = json.dumps(trace_log.indicators_at_decision, default=str)

        prompt = f"""
SYSTEM: You are an expert trading analyst Reflector. Your task is to analyze a completed trade "Trace" and determine the cause of success or failure. You must suggest a concise "Rule" (Delta) (max 25 words) to improve future performance.
--- TRACE LOG START ---
1. **Original Decision Context (What we decided):**
* Strategy Used: {trade.get('strategy', 'N/A')}
* Exit Profile: {decision.get('exit_profile', 'N/A')}
* Reasoning (at entry): {reasoning}...
* Entry Price: {trade.get('entry_price')}
* Initial Stop Loss: {trade.get('stop_loss')}
* Initial Take Profit: {trade.get('take_profit')}
2. **Environment Context (When we decided):**
* Market Context: {market_context}
* Key Indicators: {indicators}
3. **Actual Outcome (What happened):**
* Close Price: {trade.get('close_price')}
* Final PnL: {pnl_percent:+.2f}%
* Close Reason: {trace_log.actual_outcome_reason}
* Trade Duration: {trade.get('hold_duration_minutes', 'N/A')} minutes
--- TRACE LOG END ---
TASK: Analyze the Trace above.
1. Compare the "Actual Outcome" with the "Original Decision Context".
2. Was the original decision correct given what happened?
3. Was the "Close Reason" optimal? (e.g., Did it close too early? Too late?)
4. Identify the primary "Error Mode" (e.g., 'ignored_volatility', 'premature_exit_by_ATR') or "Success Factor" (e.g., 'correct_pattern_identification').
5. Suggest ONE concise "Rule" (Delta) (max 25 words) to improve performance.
OUTPUT FORMAT (JSON Only - Adhere strictly to this schema):
{{
"success": {str(is_success).lower()},
"score": 0.0,
"error_mode": "Short description of the error mode or success factor (e.g., 'ignored_high_market_volatility').",
"suggested_rule": "The concise 25-word rule for future use (e.g., 'If market volatility is HIGH, increase ATR multiplier for stop loss.').",
"confidence": 0.0
}}
"""
        return prompt