Spaces:
Running
Running
| # ml_engine/patterns.py | |
| import pandas as pd | |
| import numpy as np | |
class ChartPatternAnalyzer:
    """Rule-based chart pattern detection over OHLCV candle data.

    Four detectors are run per timeframe (double top/bottom, breakout,
    moving-average trend, support/resistance proximity). The
    highest-confidence hit is kept for each timeframe, and the best of
    those across all timeframes becomes the overall result.

    Every detector returns ``{'detected': False}`` on a miss, or a dict with
    the keys ``detected``, ``pattern``, ``confidence``, ``direction`` and
    ``details`` on a hit.
    """

    def __init__(self):
        # Nothing in this class currently reads or writes this cache.
        self.pattern_cache = {}

    async def detect_chart_patterns(self, ohlcv_data):
        """Detect chart patterns for every timeframe in *ohlcv_data*.

        Args:
            ohlcv_data: Mapping of timeframe label to a sequence of candles.
                Each candle must be convertible to the six columns
                ``timestamp, open, high, low, close, volume``. Timeframes
                with fewer than 20 candles (or a falsy value) are skipped.

        Returns:
            dict with ``pattern_detected``, ``pattern_confidence`` and
            ``predicted_direction`` (taken from the highest-confidence
            timeframe), ``timeframe_analysis`` (per-timeframe results keyed
            by timeframe) and ``all_patterns`` (the same results as a list).
            On an unexpected error the partially filled dict is returned.
        """
        patterns = {
            'pattern_detected': 'no_clear_pattern',
            'pattern_confidence': 0,
            'predicted_direction': 'neutral',
            'timeframe_analysis': {},
            'all_patterns': []
        }
        try:
            for timeframe, candles in ohlcv_data.items():
                if not candles or len(candles) < 20:
                    continue
                dataframe = self._create_dataframe(candles)
                timeframe_pattern = await self._analyze_timeframe_patterns(dataframe, timeframe)
                patterns['timeframe_analysis'][timeframe] = timeframe_pattern
                patterns['all_patterns'].append(timeframe_pattern)
                # Strict '>' keeps the earliest timeframe when confidences tie.
                if timeframe_pattern['confidence'] > patterns['pattern_confidence']:
                    patterns.update({
                        'pattern_detected': timeframe_pattern['pattern'],
                        'pattern_confidence': timeframe_pattern['confidence'],
                        'predicted_direction': timeframe_pattern['direction']
                    })
            return patterns
        except Exception as e:
            # Best-effort boundary: report and return whatever was collected.
            print(f"❌ خطأ في اكتشاف الأنماط: {e}")
            return patterns

    def _create_dataframe(self, candles):
        """Build a DataFrame from raw candle rows.

        Args:
            candles: Sequence of six-field rows ordered as
                ``timestamp, open, high, low, close, volume``.

        Returns:
            DataFrame with those six columns, the five price/volume columns
            cast to float. An empty DataFrame is returned if construction or
            conversion fails.
        """
        columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        numeric_columns = columns[1:]
        try:
            df = pd.DataFrame(candles, columns=columns)
            df[numeric_columns] = df[numeric_columns].astype(float)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    async def _analyze_timeframe_patterns(self, dataframe, timeframe):
        """Run all detectors on one timeframe and keep the best hit.

        Args:
            dataframe: Candle DataFrame with ``close``, ``high`` and ``low``
                columns; fewer than 20 rows yields the default result.
            timeframe: Label copied into the result's ``timeframe`` field.

        Returns:
            dict with ``pattern``, ``confidence``, ``direction``,
            ``timeframe`` and ``details``. Defaults to ``no_clear_pattern``
            with confidence 0 when nothing is detected or an error occurs.
        """
        pattern_info = {
            'pattern': 'no_clear_pattern',
            'confidence': 0,
            'direction': 'neutral',
            'timeframe': timeframe,
            'details': {}
        }
        try:
            if dataframe is None or dataframe.empty or len(dataframe) < 20:
                return pattern_info
            closes = dataframe['close'].values
            highs = dataframe['high'].values
            lows = dataframe['low'].values
            # Detector order matters: max() keeps the first entry on a
            # confidence tie, so earlier detectors win ties.
            candidates = [
                self._detect_double_pattern(highs, lows, closes),
                self._detect_breakout_pattern(highs, lows, closes),
                self._detect_trend_pattern(dataframe),
                self._detect_support_resistance(highs, lows, closes),
            ]
            patterns_detected = [c for c in candidates if c['detected']]
            if patterns_detected:
                best_pattern = max(patterns_detected, key=lambda x: x['confidence'])
                pattern_info.update({
                    'pattern': best_pattern['pattern'],
                    'confidence': best_pattern['confidence'],
                    'direction': best_pattern.get('direction', 'neutral'),
                    'details': best_pattern.get('details', {})
                })
            return pattern_info
        except Exception as e:
            print(f"❌ خطأ في تحليل الأنماط للإطار {timeframe}: {e}")
            return pattern_info

    def _detect_double_pattern(self, highs, lows, closes):
        """Detect a double top or double bottom in the last 15 candles.

        The two largest highs (or two smallest lows) of the window are
        compared: they must be within 2% of each other, 3 to 10 candles
        apart, and the latest close must sit below both highs (or above
        both lows). A double top takes precedence over a double bottom.

        Returns:
            Hit dict with confidence 0.75, or ``{'detected': False}``; any
            exception is also reported as a miss.
        """
        try:
            if len(highs) < 15:
                return {'detected': False}
            recent_highs = highs[-15:]
            recent_lows = lows[-15:]
            # Window positions of the two highest highs / two lowest lows,
            # put back into chronological order.
            high_indices = np.sort(np.argsort(recent_highs)[-2:])
            low_indices = np.sort(np.argsort(recent_lows)[:2])
            double_top = False
            double_bottom = False
            if len(high_indices) == 2:
                high1 = recent_highs[high_indices[0]]
                high2 = recent_highs[high_indices[1]]
                time_diff = high_indices[1] - high_indices[0]
                if (abs(high1 - high2) / high1 < 0.02 and
                        3 <= time_diff <= 10 and
                        closes[-1] < min(high1, high2)):
                    double_top = True
            if len(low_indices) == 2:
                low1 = recent_lows[low_indices[0]]
                low2 = recent_lows[low_indices[1]]
                time_diff = low_indices[1] - low_indices[0]
                if (abs(low1 - low2) / low1 < 0.02 and
                        3 <= time_diff <= 10 and
                        closes[-1] > max(low1, low2)):
                    double_bottom = True
            if double_top:
                return {
                    'detected': True,
                    'pattern': 'Double Top',
                    'confidence': 0.75,
                    'direction': 'down',
                    'details': {
                        'resistance_level': np.mean([high1, high2]),
                        'breakdown_level': min(lows[-5:])
                    }
                }
            if double_bottom:
                return {
                    'detected': True,
                    'pattern': 'Double Bottom',
                    'confidence': 0.75,
                    'direction': 'up',
                    'details': {
                        'support_level': np.mean([low1, low2]),
                        'breakout_level': max(highs[-5:])
                    }
                }
            return {'detected': False}
        except Exception:
            # Detectors are best-effort: a failure counts as "no pattern".
            return {'detected': False}

    def _detect_breakout_pattern(self, highs, lows, closes):
        """Detect a close beyond the prior 20-candle range.

        Resistance and support are the max high and min low of candles
        ``[-25:-5]`` (the five most recent candles are excluded). A hit
        requires the latest close to exceed resistance by more than 1% or to
        fall below support by more than 1%.

        Returns:
            Hit dict with confidence 0.8 and a target 5% beyond the broken
            level, or ``{'detected': False}``; any exception is also reported
            as a miss.
        """
        try:
            if len(highs) < 25:
                return {'detected': False}
            current_price = closes[-1]
            resistance = np.max(highs[-25:-5])
            support = np.min(lows[-25:-5])
            if current_price > resistance * 1.01:
                return {
                    'detected': True,
                    'pattern': 'Breakout Up',
                    'confidence': 0.8,
                    'direction': 'up',
                    'details': {
                        'breakout_level': resistance,
                        'target_level': resistance * 1.05
                    }
                }
            if current_price < support * 0.99:
                return {
                    'detected': True,
                    'pattern': 'Breakout Down',
                    'confidence': 0.8,
                    'direction': 'down',
                    'details': {
                        'breakdown_level': support,
                        'target_level': support * 0.95
                    }
                }
            return {'detected': False}
        except Exception:
            # Detectors are best-effort: a failure counts as "no pattern".
            return {'detected': False}

    def _detect_trend_pattern(self, dataframe):
        """Detect an up- or downtrend from stacked simple moving averages.

        Means of the last 5, 13 and 21 closes must be strictly ordered and
        the latest close must lie beyond the short mean in the trend
        direction. Confidence is ``0.3 + 10 * trend_strength`` capped at 0.8,
        where ``trend_strength`` is the short/long gap relative to the long
        mean.

        Returns:
            Hit dict, or ``{'detected': False}``; any exception is also
            reported as a miss.
        """
        try:
            if dataframe is None or dataframe.empty or len(dataframe) < 20:
                return {'detected': False}
            closes = dataframe['close'].values
            ma_short = np.mean(closes[-5:])
            ma_medium = np.mean(closes[-13:])
            # NOTE: with exactly 20 rows this slice holds only 20 closes, so
            # the "21-period" mean is effectively a 20-period one.
            ma_long = np.mean(closes[-21:])
            if ma_short > ma_medium > ma_long and closes[-1] > ma_short:
                trend_strength = (ma_short - ma_long) / ma_long
                confidence = min(0.3 + trend_strength * 10, 0.8)
                return {
                    'detected': True,
                    'pattern': 'Uptrend',
                    'confidence': confidence,
                    'direction': 'up',
                    'details': {
                        'trend_strength': trend_strength,
                        'support_level': ma_medium
                    }
                }
            if ma_short < ma_medium < ma_long and closes[-1] < ma_short:
                trend_strength = (ma_long - ma_short) / ma_long
                confidence = min(0.3 + trend_strength * 10, 0.8)
                return {
                    'detected': True,
                    'pattern': 'Downtrend',
                    'confidence': confidence,
                    'direction': 'down',
                    'details': {
                        'trend_strength': trend_strength,
                        'resistance_level': ma_medium
                    }
                }
            return {'detected': False}
        except Exception:
            # Detectors are best-effort: a failure counts as "no pattern".
            return {'detected': False}

    def _detect_support_resistance(self, highs, lows, closes):
        """Detect whether the latest close sits near the 20-candle extremes.

        ``position`` is the latest close expressed as a fraction of the range
        between the min low and max high of the last 20 candles. Below 0.2 is
        reported as "Near Support" (direction up), above 0.8 as
        "Near Resistance" (direction down).

        Returns:
            Hit dict with confidence 0.6, or ``{'detected': False}``. A
            window whose range is not strictly positive (flat prices, NaN
            or inconsistent highs/lows) is reported as a miss.
        """
        try:
            if len(highs) < 20:
                return {'detected': False}
            current_price = closes[-1]
            resistance_level = np.max(highs[-20:])
            support_level = np.min(lows[-20:])
            price_range = resistance_level - support_level
            # Guard the division below: a zero range would yield NaN/inf
            # (and a NumPy RuntimeWarning). 'not (x > 0)' also rejects NaN.
            if not (price_range > 0):
                return {'detected': False}
            position = (current_price - support_level) / price_range
            if position < 0.2:
                return {
                    'detected': True,
                    'pattern': 'Near Support',
                    'confidence': 0.6,
                    'direction': 'up',
                    'details': {
                        'support_level': support_level,
                        'resistance_level': resistance_level,
                        'position': position
                    }
                }
            if position > 0.8:
                return {
                    'detected': True,
                    'pattern': 'Near Resistance',
                    'confidence': 0.6,
                    'direction': 'down',
                    'details': {
                        'support_level': support_level,
                        'resistance_level': resistance_level,
                        'position': position
                    }
                }
            return {'detected': False}
        except Exception:
            # Detectors are best-effort: a failure counts as "no pattern".
            return {'detected': False}
# Runs at import time: prints a confirmation that this module was loaded.
print("✅ ML Module: Chart Pattern Analyzer loaded")