# sentiment_news.py (محدث V3.2 - توسيع نافذة البحث إلى 12 ساعة) import os, asyncio import httpx from gnews import GNews import feedparser from datetime import datetime, timedelta, timezone import time # # 🔴 تم التعديل: توسيع قائمة مصادر RSS لتشمل تغطية أوسع للعملات البديلة # CRYPTO_RSS_FEEDS = { "Cointelegraph": "https://cointelegraph.com/rss", "CoinDesk": "https://www.coindesk.com/arc/outboundfeeds/rss/", "CryptoSlate": "https://cryptoslate.com/feed/", "NewsBTC": "https://www.newsbtc.com/feed/", "Bitcoin.com": "https://news.bitcoin.com/feed/", "The Block": "https://www.theblock.co/rss.xml", "Decrypt": "https://decrypt.co/feed", "AMBCrypto": "https://ambcrypto.com/feed/", "CryptoPotato": "https://cryptopotato.com/feed/", "U.Today": "https://u.today/rss" } class NewsFetcher: def __init__(self): self.http_client = httpx.AsyncClient( timeout=10.0, follow_redirects=True, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.9', 'Cache-Control': 'no-cache' } ) # 🔴 --- START OF CHANGE (V3.2) --- 🔴 # (توسيع نافذة البحث إلى 12 ساعة وزيادة النتائج) self.gnews = GNews(language='en', country='US', period='12h', max_results=15) # 🔴 --- END OF CHANGE --- 🔴 self.rss_cache = {} self.cache_lock = asyncio.Lock() self.cache_duration = timedelta(seconds=300) # (5 دقائق) async def _fetch_from_gnews(self, symbol: str) -> list: try: base_symbol = symbol.split("/")[0] # نستخدم الكلمات المفتاحية السلبية لمحاولة عزل أخبار العملة نفسها query = f'"{base_symbol}" cryptocurrency -bitcoin -ethereum -BTC -ETH' # GNews تعمل بشكل متزامن، لذا نستخدم to_thread news_items = await asyncio.to_thread(self.gnews.get_news, query) formatted_items = [] for item in news_items: # GNews توفر تاريخ النشر كنص (مفلتر مسبقاً) published_text = item.get('published date', 'Recent') formatted_items.append({ 'title': item.get('title', 'No Title'), 'description': item.get('description', 'No Description'), 'source': item.get('source', {}).get('title', 'GNews'), 'published': published_text # تمرير التاريخ/الوقت }) return formatted_items except Exception as e: print(f"Failed to fetch specific news from GNews for {symbol}: {e}") return [] async def _get_cached_rss_feed(self, feed_url: str, source_name: str): """ (جديد V3.1) دالة مساعدة لجلب ملف RSS مع تخزين مؤقت (لمدة 5 دقائق). هذا يمنع خطأ 429 عند طلب نفس المصدر لعدة عملات. """ async with self.cache_lock: current_time = datetime.now(timezone.utc) # 1. التحقق من الذاكرة المؤقتة if feed_url in self.rss_cache: cached_data, cache_time = self.rss_cache[feed_url] if (current_time - cache_time) < self.cache_duration: # print(f" [NewsCache] Using cached data for {source_name}") return cached_data # (إرجاع البيانات المخزنة) # 2. (إذا لم يكن في الذاكرة أو كان قديماً) الجلب من المصدر # print(f" [NewsCache] Fetching fresh data for {source_name}...") max_redirects = 2 current_url = feed_url response_text = None try: for attempt in range(max_redirects): try: response = await self.http_client.get(current_url) response.raise_for_status() response_text = response.text break except httpx.HTTPStatusError as e: if e.response.status_code in [301, 302, 307, 308] and 'Location' in e.response.headers: current_url = e.response.headers['Location'] continue else: # (التعامل مع 429 هنا أيضاً، لا تخزن الخطأ) if e.response.status_code == 429: print(f" ⚠️ [NewsCache] Rate limited (429) by {source_name}. Skipping for this cycle.") # (سنخزن "فارغ" لمنع إعادة المحاولة الفورية) self.rss_cache[feed_url] = ([], current_time) return [] raise if response_text is None: raise ValueError("Failed to fetch RSS data after redirects") feed = feedparser.parse(response_text) entries = feed.entries # 3. تخزين النتيجة الجديدة self.rss_cache[feed_url] = (entries, current_time) return entries except Exception as e: print(f" ❌ [NewsCache] Failed to fetch/parse {source_name}: {e}") # (لا تخزن الخطأ، للسماح بإعادة المحاولة في الدورة القادمة) if feed_url in self.rss_cache: del self.rss_cache[feed_url] return [] async def _fetch_from_rss_feed(self, feed_url: str, source_name: str, symbol: str) -> list: try: base_symbol = symbol.split('/')[0] # (الجلب من الذاكرة المؤقتة أو المصدر) feed_entries = await self._get_cached_rss_feed(feed_url, source_name) if not feed_entries: return [] news_items = [] search_term = base_symbol.lower() # 🔴 --- START OF CHANGE (V3.2) --- 🔴 # (حساب الوقت قبل 12 ساعة (باستخدام التوقيت العالمي UTC)) twelve_hours_ago = datetime.now(timezone.utc) - timedelta(hours=12) # 🔴 --- END OF CHANGE --- 🔴 for entry in feed_entries: title = entry.title.lower() if hasattr(entry, 'title') else '' summary = entry.summary.lower() if hasattr(entry, 'summary') else entry.description.lower() if hasattr(entry, 'description') else '' # التحقق من تاريخ النشر published_tuple = entry.get('published_parsed') if not published_tuple: continue # تخطي الخبر إذا لم يكن له تاريخ نشر # تحويل تاريخ النشر إلى كائن datetime مع دعم التوقيت العالمي (UTC) try: published_time = datetime.fromtimestamp(time.mktime(published_tuple), timezone.utc) except Exception: # في حال فشل التحويل، نفترض أنه قديم continue # 🔴 --- START OF CHANGE (V3.2) --- 🔴 # (تطبيق الفلتر الزمني (آخر 12 ساعة) وفلتر اسم العملة) if (search_term in title or search_term in summary) and (published_time >= twelve_hours_ago): # 🔴 --- END OF CHANGE --- 🔴 news_items.append({ 'title': entry.title, 'description': summary, 'source': source_name, 'published': published_time.isoformat() # إرسال التاريخ بتنسيق ISO }) return news_items except Exception as e: print(f"Failed to fetch specific news from {source_name} RSS for {symbol}: {e}") return [] async def get_news_for_symbol(self, symbol: str) -> str: base_symbol = symbol.split("/")[0] # إنشاء قائمة المهام (GNews + جميع مصادر RSS) tasks = [self._fetch_from_gnews(symbol)] for name, url in CRYPTO_RSS_FEEDS.items(): tasks.append(self._fetch_from_rss_feed(url, name, symbol)) results = await asyncio.gather(*tasks, return_exceptions=True) all_news_text = [] for result in results: if isinstance(result, Exception): continue for item in result: # نستخدم الفلتر الثانوي للتأكد من أن الخبر يركز فعلاً على العملة if self._is_directly_relevant_to_symbol(item, base_symbol): title = item.get('title', 'No Title') description = item.get('description', 'No Description') source = item.get('source', 'Unknown Source') published = item.get('published', '') # الحصول على التاريخ/الوقت news_entry = f"[{source}] {title}. {description}" if published: news_entry += f" (Published: {published})" all_news_text.append(news_entry) if not all_news_text: # 🔴 --- START OF CHANGE (V3.2) --- 🔴 return f"No specific news found for {base_symbol} in the last 12 hours." # 🔴 --- END OF CHANGE --- 🔴 # أخذ أهم 5 أخبار (الأكثر حداثة أو صلة) important_news = all_news_text[:5] return " | ".join(important_news) def _is_directly_relevant_to_symbol(self, news_item, base_symbol): """ فلتر ثانوي للتأكد من أن الخبر ليس مجرد ذكر عابر للعملة، بل يتعلق فعلاً بجوانب التداول أو السوق. """ title = news_item.get('title', '').lower() description = news_item.get('description', '').lower() symbol_lower = base_symbol.lower() # يجب أن يكون الرمز موجوداً في العنوان أو الوصف if symbol_lower not in title and symbol_lower not in description: return False # يجب أن يحتوي الخبر على كلمات مفتاحية تدل على أنه خبر مالي/تداولي crypto_keywords = [ 'crypto', 'cryptocurrency', 'token', 'blockchain', 'price', 'market', 'trading', 'exchange', 'defi', 'coin', 'digital currency', 'altcoin', 'airdrop', 'listing', 'partnership', 'update', 'mainnet', 'protocol' ] return any(keyword in title or keyword in description for keyword in crypto_keywords) # --- (تم تنقيح هذا الكلاس ليعكس الواقع) --- class SentimentAnalyzer: def __init__(self, data_manager): self.data_manager = data_manager async def get_market_sentiment(self): """ جلب سياق السوق العام (يعتمد بالكامل على DataManager). """ try: # هذه الدالة تجلب (BTC/ETH/Fear&Greed) فقط market_context = await self.data_manager.get_market_context_async() if not market_context: return await self.get_fallback_market_context() return market_context except Exception as e: print(f"Failed to get market sentiment: {e}") return await self.get_fallback_market_context() async def get_fallback_market_context(self): """ (محدث) سياق احتياطي مبسط يعكس الواقع (بدون بيانات حيتان عامة). """ return { 'timestamp': datetime.now().isoformat(), 'btc_sentiment': 'NEUTRAL', 'fear_and_greed_index': 50, 'sentiment_class': 'NEUTRAL', 'market_trend': 'UNKNOWN', 'data_quality': 'LOW' } # 🔴 --- START OF CHANGE (V3.2) --- 🔴 print("✅ Sentiment News loaded - V3.2 (Expanded 12h Window)") # 🔴 --- END OF CHANGE --- 🔴