Trad / sentiment_news.py
Riy777's picture
Update sentiment_news.py
03dd882 verified
# sentiment_news.py (محدث V3.1 - إصلاح 429 بالتخزين المؤقت الذكي)
import os, asyncio
import httpx
from gnews import GNews
import feedparser
from datetime import datetime, timedelta, timezone
import time
#
# 🔴 تم التعديل: توسيع قائمة مصادر RSS لتشمل تغطية أوسع للعملات البديلة
#
CRYPTO_RSS_FEEDS = {
"Cointelegraph": "https://cointelegraph.com/rss",
"CoinDesk": "https://www.coindesk.com/arc/outboundfeeds/rss/",
"CryptoSlate": "https://cryptoslate.com/feed/",
"NewsBTC": "https://www.newsbtc.com/feed/",
"Bitcoin.com": "https://news.bitcoin.com/feed/",
"The Block": "https://www.theblock.co/rss.xml",
"Decrypt": "https://decrypt.co/feed",
"AMBCrypto": "https://ambcrypto.com/feed/",
"CryptoPotato": "https://cryptopotato.com/feed/",
"U.Today": "https://u.today/rss"
}
class NewsFetcher:
def __init__(self):
self.http_client = httpx.AsyncClient(
timeout=10.0, follow_redirects=True,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.9',
'Cache-Control': 'no-cache'
}
)
# GNews مفلترة مسبقاً لـ 3 ساعات
self.gnews = GNews(language='en', country='US', period='3h', max_results=8)
# 🔴 --- START OF CHANGE (V3.1 - Caching) --- 🔴
self.rss_cache = {}
self.cache_lock = asyncio.Lock()
self.cache_duration = timedelta(seconds=300) # (5 دقائق)
# 🔴 --- END OF CHANGE --- 🔴
async def _fetch_from_gnews(self, symbol: str) -> list:
try:
base_symbol = symbol.split("/")[0]
# نستخدم الكلمات المفتاحية السلبية لمحاولة عزل أخبار العملة نفسها
query = f'"{base_symbol}" cryptocurrency -bitcoin -ethereum -BTC -ETH'
# GNews تعمل بشكل متزامن، لذا نستخدم to_thread
news_items = await asyncio.to_thread(self.gnews.get_news, query)
#
# 🔴 تم التعديل: توحيد تنسيق الإخراج ليتضمن مفتاح 'published'
#
formatted_items = []
for item in news_items:
# GNews توفر تاريخ النشر كنص (وهو مفلتر مسبقاً لـ 3 ساعات)
published_text = item.get('published date', 'Recent')
formatted_items.append({
'title': item.get('title', 'No Title'),
'description': item.get('description', 'No Description'),
'source': item.get('source', {}).get('title', 'GNews'),
'published': published_text # تمرير التاريخ/الوقت
})
return formatted_items
except Exception as e:
print(f"Failed to fetch specific news from GNews for {symbol}: {e}")
return []
# 🔴 --- START OF CHANGE (V3.1 - Caching Logic) --- 🔴
async def _get_cached_rss_feed(self, feed_url: str, source_name: str):
"""
(جديد V3.1)
دالة مساعدة لجلب ملف RSS مع تخزين مؤقت (لمدة 5 دقائق).
هذا يمنع خطأ 429 عند طلب نفس المصدر لعدة عملات.
"""
async with self.cache_lock:
current_time = datetime.now(timezone.utc)
# 1. التحقق من الذاكرة المؤقتة
if feed_url in self.rss_cache:
cached_data, cache_time = self.rss_cache[feed_url]
if (current_time - cache_time) < self.cache_duration:
# print(f" [NewsCache] Using cached data for {source_name}")
return cached_data # (إرجاع البيانات المخزنة)
# 2. (إذا لم يكن في الذاكرة أو كان قديماً) الجلب من المصدر
# print(f" [NewsCache] Fetching fresh data for {source_name}...")
max_redirects = 2
current_url = feed_url
response_text = None
try:
for attempt in range(max_redirects):
try:
response = await self.http_client.get(current_url)
response.raise_for_status()
response_text = response.text
break
except httpx.HTTPStatusError as e:
if e.response.status_code in [301, 302, 307, 308] and 'Location' in e.response.headers:
current_url = e.response.headers['Location']
continue
else:
# (التعامل مع 429 هنا أيضاً، لا تخزن الخطأ)
if e.response.status_code == 429:
print(f" ⚠️ [NewsCache] Rate limited (429) by {source_name}. Skipping for this cycle.")
# (سنخزن "فارغ" لمنع إعادة المحاولة الفورية)
self.rss_cache[feed_url] = ([], current_time)
return []
raise
if response_text is None:
raise ValueError("Failed to fetch RSS data after redirects")
feed = feedparser.parse(response_text)
entries = feed.entries
# 3. تخزين النتيجة الجديدة
self.rss_cache[feed_url] = (entries, current_time)
return entries
except Exception as e:
print(f" ❌ [NewsCache] Failed to fetch/parse {source_name}: {e}")
# (لا تخزن الخطأ، للسماح بإعادة المحاولة في الدورة القادمة)
if feed_url in self.rss_cache:
del self.rss_cache[feed_url]
return []
# 🔴 --- END OF CHANGE --- 🔴
async def _fetch_from_rss_feed(self, feed_url: str, source_name: str, symbol: str) -> list:
try:
base_symbol = symbol.split('/')[0]
# 🔴 --- START OF CHANGE (V3.1 - Caching Logic) --- 🔴
# (الجلب من الذاكرة المؤقتة أو المصدر)
feed_entries = await self._get_cached_rss_feed(feed_url, source_name)
if not feed_entries:
return []
# 🔴 --- END OF CHANGE --- 🔴
news_items = []
search_term = base_symbol.lower()
# (حساب الوقت قبل 3 ساعات (باستخدام التوقيت العالمي UTC))
three_hours_ago = datetime.now(timezone.utc) - timedelta(hours=3)
# (إزالة قيد '[:15]' والفلترة حسب الوقت)
# (نبحث الآن في 'feed_entries' بدلاً من 'feed.entries')
for entry in feed_entries:
title = entry.title.lower() if hasattr(entry, 'title') else ''
summary = entry.summary.lower() if hasattr(entry, 'summary') else entry.description.lower() if hasattr(entry, 'description') else ''
# التحقق من تاريخ النشر
published_tuple = entry.get('published_parsed')
if not published_tuple:
continue # تخطي الخبر إذا لم يكن له تاريخ نشر
# تحويل تاريخ النشر إلى كائن datetime مع دعم التوقيت العالمي (UTC)
try:
published_time = datetime.fromtimestamp(time.mktime(published_tuple), timezone.utc)
except Exception:
# في حال فشل التحويل، نفترض أنه قديم
continue
# (تطبيق الفلتر الزمني (آخر 3 ساعات) وفلتر اسم العملة)
if (search_term in title or search_term in summary) and (published_time >= three_hours_ago):
news_items.append({
'title': entry.title,
'description': summary,
'source': source_name,
'published': published_time.isoformat() # إرسال التاريخ بتنسيق ISO
})
return news_items
except Exception as e:
# (تم نقل أخطاء 429، لذا هذا للأخطاء غير المتوقعة فقط)
print(f"Failed to fetch specific news from {source_name} RSS for {symbol}: {e}")
return []
async def get_news_for_symbol(self, symbol: str) -> str:
base_symbol = symbol.split("/")[0]
# إنشاء قائمة المهام (GNews + جميع مصادر RSS)
tasks = [self._fetch_from_gnews(symbol)]
for name, url in CRYPTO_RSS_FEEDS.items():
tasks.append(self._fetch_from_rss_feed(url, name, symbol))
results = await asyncio.gather(*tasks, return_exceptions=True)
all_news_text = []
for result in results:
if isinstance(result, Exception):
continue
for item in result:
# نستخدم الفلتر الثانوي للتأكد من أن الخبر يركز فعلاً على العملة
if self._is_directly_relevant_to_symbol(item, base_symbol):
title = item.get('title', 'No Title')
description = item.get('description', 'No Description')
source = item.get('source', 'Unknown Source')
# (هذا المنطق سيعمل الآن بفضل التعديلات السابقة)
published = item.get('published', '') # الحصول على التاريخ/الوقت
news_entry = f"[{source}] {title}. {description}"
# (إضافة وقت النشر إلى النص النهائي (تنفيذاً لطلبك))
if published:
news_entry += f" (Published: {published})"
all_news_text.append(news_entry)
if not all_news_text:
return f"No specific news found for {base_symbol} in the last 3 hours."
# أخذ أهم 5 أخبار (الأكثر حداثة أو صلة)
important_news = all_news_text[:5]
return " | ".join(important_news)
def _is_directly_relevant_to_symbol(self, news_item, base_symbol):
"""
فلتر ثانوي للتأكد من أن الخبر ليس مجرد ذكر عابر للعملة،
بل يتعلق فعلاً بجوانب التداول أو السوق.
"""
title = news_item.get('title', '').lower()
description = news_item.get('description', '').lower()
symbol_lower = base_symbol.lower()
# يجب أن يكون الرمز موجوداً في العنوان أو الوصف
if symbol_lower not in title and symbol_lower not in description:
return False
# يجب أن يحتوي الخبر على كلمات مفتاحية تدل على أنه خبر مالي/تداولي
crypto_keywords = [
'crypto', 'cryptocurrency', 'token', 'blockchain',
'price', 'market', 'trading', 'exchange', 'defi',
'coin', 'digital currency', 'altcoin', 'airdrop', 'listing',
'partnership', 'update', 'mainnet', 'protocol'
]
return any(keyword in title or keyword in description for keyword in crypto_keywords)
# --- (تم تنقيح هذا الكلاس ليعكس الواقع) ---
class SentimentAnalyzer:
def __init__(self, data_manager):
self.data_manager = data_manager
async def get_market_sentiment(self):
"""
جلب سياق السوق العام (يعتمد بالكامل على DataManager).
"""
try:
# هذه الدالة تجلب (BTC/ETH/Fear&Greed) فقط
market_context = await self.data_manager.get_market_context_async()
if not market_context:
return await self.get_fallback_market_context()
return market_context
except Exception as e:
print(f"Failed to get market sentiment: {e}")
return await self.get_fallback_market_context()
async def get_fallback_market_context(self):
"""
(محدث) سياق احتياطي مبسط يعكس الواقع (بدون بيانات حيتان عامة).
"""
return {
'timestamp': datetime.now().isoformat(),
'btc_sentiment': 'NEUTRAL',
'fear_and_greed_index': 50,
'sentiment_class': 'NEUTRAL',
'market_trend': 'UNKNOWN',
'data_quality': 'LOW'
}
print("✅ Sentiment News loaded - V3.1 (Intelligent Caching)")