# Trad / ml_engine / patterns.py
# Riy777 · Create patterns.py · 0bf918d · 11.4 kB
# ml_engine/patterns.py
import pandas as pd
import numpy as np
class ChartPatternAnalyzer:
    """Rule-based chart-pattern detector for OHLCV candle data.

    Each timeframe is scanned by four independent detectors (double
    top/bottom, breakout, moving-average trend, support/resistance
    proximity). The highest-confidence hit is kept per timeframe, and the
    highest-confidence timeframe becomes the overall result.
    """

    # Fewest candles a timeframe must contain before it is analysed.
    MIN_CANDLES = 20
    # Column order expected for every candle row.
    CANDLE_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    # Columns that are cast to float after loading.
    NUMERIC_COLUMNS = ['open', 'high', 'low', 'close', 'volume']

    def __init__(self):
        # Not read or written by any method of this class.
        self.pattern_cache = {}

    async def detect_chart_patterns(self, ohlcv_data):
        """Detect chart patterns on every timeframe and report the strongest.

        Args:
            ohlcv_data: Mapping of timeframe label to a sequence of candle
                rows laid out as (timestamp, open, high, low, close, volume).
                Timeframes with fewer than ``MIN_CANDLES`` rows are skipped.

        Returns:
            A dict with the keys ``pattern_detected``, ``pattern_confidence``
            and ``predicted_direction`` (taken from the timeframe with the
            highest confidence), ``timeframe_analysis`` (per-timeframe
            results keyed by label) and ``all_patterns`` (the same results
            as a list). Errors are printed, never raised; a timeframe that
            fails is skipped without affecting the others.
        """
        patterns = {
            'pattern_detected': 'no_clear_pattern',
            'pattern_confidence': 0,
            'predicted_direction': 'neutral',
            'timeframe_analysis': {},
            'all_patterns': []
        }

        try:
            timeframes = list(ohlcv_data.items())
        except (AttributeError, TypeError) as e:
            # ohlcv_data is None or not a mapping: report it and return the defaults.
            print(f"❌ خطأ في اكتشاف الأنماط: {e}")
            return patterns

        for timeframe, candles in timeframes:
            # Errors are isolated per timeframe so that one malformed entry
            # cannot prevent the remaining timeframes from being analysed.
            try:
                # Use len() rather than truthiness: numpy arrays and
                # DataFrames raise on bool() when they hold several items.
                if candles is None or len(candles) < self.MIN_CANDLES:
                    continue

                dataframe = self._create_dataframe(candles)
                timeframe_pattern = await self._analyze_timeframe_patterns(dataframe, timeframe)
                patterns['timeframe_analysis'][timeframe] = timeframe_pattern
                patterns['all_patterns'].append(timeframe_pattern)

                if timeframe_pattern['confidence'] > patterns['pattern_confidence']:
                    patterns.update({
                        'pattern_detected': timeframe_pattern['pattern'],
                        'pattern_confidence': timeframe_pattern['confidence'],
                        'predicted_direction': timeframe_pattern['direction']
                    })
            except Exception as e:
                print(f"❌ خطأ في اكتشاف الأنماط: {e}")

        return patterns

    def _create_dataframe(self, candles):
        """Build a float-typed OHLCV DataFrame from raw candle rows.

        Returns an empty DataFrame (after printing the error) when the rows
        cannot be loaded into the six expected columns or cast to float.
        """
        try:
            df = pd.DataFrame(candles, columns=self.CANDLE_COLUMNS)
            df[self.NUMERIC_COLUMNS] = df[self.NUMERIC_COLUMNS].astype(float)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    async def _analyze_timeframe_patterns(self, dataframe, timeframe):
        """Run every detector on one timeframe and keep the best hit.

        Args:
            dataframe: DataFrame with at least ``high``, ``low`` and
                ``close`` columns.
            timeframe: Label copied into the result.

        Returns:
            A dict with ``pattern``, ``confidence``, ``direction``,
            ``timeframe`` and ``details``. The neutral default is returned
            when the frame is missing, too short, nothing is detected, or an
            error occurs (the error is printed).
        """
        pattern_info = {
            'pattern': 'no_clear_pattern',
            'confidence': 0,
            'direction': 'neutral',
            'timeframe': timeframe,
            'details': {}
        }

        try:
            if dataframe is None or dataframe.empty or len(dataframe) < self.MIN_CANDLES:
                return pattern_info

            closes = dataframe['close'].values
            highs = dataframe['high'].values
            lows = dataframe['low'].values

            # Detector order matters only for ties: max() keeps the first
            # of several equally confident candidates.
            candidates = [
                result
                for result in (
                    self._detect_double_pattern(highs, lows, closes),
                    self._detect_breakout_pattern(highs, lows, closes),
                    self._detect_trend_pattern(dataframe),
                    self._detect_support_resistance(highs, lows, closes),
                )
                if result['detected']
            ]

            if candidates:
                best_pattern = max(candidates, key=lambda c: c['confidence'])
                pattern_info.update({
                    'pattern': best_pattern['pattern'],
                    'confidence': best_pattern['confidence'],
                    'direction': best_pattern.get('direction', 'neutral'),
                    'details': best_pattern.get('details', {})
                })

            return pattern_info
        except Exception as e:
            print(f"❌ خطأ في تحليل الأنماط للإطار {timeframe}: {e}")
            return pattern_info

    def _detect_double_pattern(self, highs, lows, closes):
        """Detect a double top or double bottom in the last 15 candles.

        The two highest highs (or two lowest lows) must lie within 2% of
        each other and 3 to 10 candles apart, and the latest close must sit
        below both peaks (or above both troughs). A double top takes
        precedence when both shapes qualify.

        Returns:
            ``{'detected': False}`` or a result dict with confidence 0.75.
        """
        try:
            if len(highs) < 15:
                return {'detected': False}

            recent_highs = highs[-15:]
            recent_lows = lows[-15:]

            # Positions of the two extremes, put back in chronological order.
            high_indices = np.sort(np.argsort(recent_highs)[-2:])
            low_indices = np.sort(np.argsort(recent_lows)[:2])

            double_top = False
            double_bottom = False

            if len(high_indices) == 2:
                first, second = high_indices
                high1, high2 = recent_highs[first], recent_highs[second]
                double_top = (
                    abs(high1 - high2) / high1 < 0.02
                    and 3 <= second - first <= 10
                    and closes[-1] < min(high1, high2)
                )

            if len(low_indices) == 2:
                first, second = low_indices
                low1, low2 = recent_lows[first], recent_lows[second]
                double_bottom = (
                    abs(low1 - low2) / low1 < 0.02
                    and 3 <= second - first <= 10
                    and closes[-1] > max(low1, low2)
                )

            if double_top:
                return {
                    'detected': True,
                    'pattern': 'Double Top',
                    'confidence': 0.75,
                    'direction': 'down',
                    'details': {
                        'resistance_level': np.mean([high1, high2]),
                        'breakdown_level': min(lows[-5:])
                    }
                }
            if double_bottom:
                return {
                    'detected': True,
                    'pattern': 'Double Bottom',
                    'confidence': 0.75,
                    'direction': 'up',
                    'details': {
                        'support_level': np.mean([low1, low2]),
                        'breakout_level': max(highs[-5:])
                    }
                }

            return {'detected': False}
        except Exception as e:
            print(f"❌ خطأ في كشف النمط المزدوج: {e}")
            return {'detected': False}

    def _detect_breakout_pattern(self, highs, lows, closes):
        """Detect a close more than 1% outside the prior 20-candle range.

        The reference range is built from candles -25 to -6, so the five
        most recent candles do not move the level they are tested against.
        At least 25 candles are required.

        Returns:
            ``{'detected': False}`` or a result dict with confidence 0.8.
        """
        try:
            if len(highs) < 25:
                return {'detected': False}

            current_price = closes[-1]
            resistance = np.max(highs[-25:-5])
            support = np.min(lows[-25:-5])

            if current_price > resistance * 1.01:
                return {
                    'detected': True,
                    'pattern': 'Breakout Up',
                    'confidence': 0.8,
                    'direction': 'up',
                    'details': {
                        'breakout_level': resistance,
                        'target_level': resistance * 1.05
                    }
                }
            if current_price < support * 0.99:
                return {
                    'detected': True,
                    'pattern': 'Breakout Down',
                    'confidence': 0.8,
                    'direction': 'down',
                    'details': {
                        'breakdown_level': support,
                        'target_level': support * 0.95
                    }
                }

            return {'detected': False}
        except Exception as e:
            print(f"❌ خطأ في كشف نمط الاختراق: {e}")
            return {'detected': False}

    def _detect_trend_pattern(self, dataframe):
        """Detect an up- or downtrend from stacked 5/13/21-candle averages.

        A trend is reported when the three averages are strictly ordered
        and the latest close lies beyond the short average. Confidence is
        ``0.3 + 10 * strength``, capped at 0.8, where strength is the
        relative gap between the short and long averages.

        Returns:
            ``{'detected': False}`` or a result dict.
        """
        try:
            if dataframe is None or dataframe.empty or len(dataframe) < self.MIN_CANDLES:
                return {'detected': False}

            closes = dataframe['close'].values
            ma_short = np.mean(closes[-5:])
            ma_medium = np.mean(closes[-13:])
            # With exactly 20 rows this slice simply averages all of them.
            ma_long = np.mean(closes[-21:])

            if ma_short > ma_medium > ma_long and closes[-1] > ma_short:
                trend_strength = (ma_short - ma_long) / ma_long
                return {
                    'detected': True,
                    'pattern': 'Uptrend',
                    'confidence': min(0.3 + trend_strength * 10, 0.8),
                    'direction': 'up',
                    'details': {
                        'trend_strength': trend_strength,
                        'support_level': ma_medium
                    }
                }
            if ma_short < ma_medium < ma_long and closes[-1] < ma_short:
                trend_strength = (ma_long - ma_short) / ma_long
                return {
                    'detected': True,
                    'pattern': 'Downtrend',
                    'confidence': min(0.3 + trend_strength * 10, 0.8),
                    'direction': 'down',
                    'details': {
                        'trend_strength': trend_strength,
                        'resistance_level': ma_medium
                    }
                }

            return {'detected': False}
        except Exception as e:
            print(f"❌ خطأ في كشف نمط الاتجاه: {e}")
            return {'detected': False}

    def _detect_support_resistance(self, highs, lows, closes):
        """Report when price sits in the outer 20% of its 20-candle range.

        ``position`` is 0 at the lowest low and 1 at the highest high of
        the last 20 candles; below 0.2 is "Near Support" (direction up),
        above 0.8 is "Near Resistance" (direction down).

        Returns:
            ``{'detected': False}`` or a result dict with confidence 0.6.
        """
        try:
            if len(highs) < 20:
                return {'detected': False}

            current_price = closes[-1]
            resistance_level = np.max(highs[-20:])
            support_level = np.min(lows[-20:])
            price_range = resistance_level - support_level

            # A flat (or NaN) range has no meaningful position and would
            # otherwise trigger a 0/0 division.
            if not price_range > 0:
                return {'detected': False}

            position = (current_price - support_level) / price_range

            if position < 0.2:
                pattern, direction = 'Near Support', 'up'
            elif position > 0.8:
                pattern, direction = 'Near Resistance', 'down'
            else:
                return {'detected': False}

            return {
                'detected': True,
                'pattern': pattern,
                'confidence': 0.6,
                'direction': direction,
                'details': {
                    'support_level': support_level,
                    'resistance_level': resistance_level,
                    'position': position
                }
            }
        except Exception as e:
            print(f"❌ خطأ في كشف مستويات الدعم والمقاومة: {e}")
            return {'detected': False}
# Load confirmation: printed each time this module's top-level code runs (i.e. on import).
print("✅ ML Module: Chart Pattern Analyzer loaded")