# LLM.py (V19.3 - NVIDIA Engine + Vader Fix)
import os, traceback, json, time, re
from datetime import datetime
from typing import List, Dict, Any, Optional
# (Use the official OpenAI library instead of httpx)
from openai import AsyncOpenAI, RateLimitError, APIError
try:
    from r2 import R2Service
    from learning_hub.hub_manager import LearningHubManager  # (import the learning "brain")
except ImportError:
    print("❌ [LLMService] Failed to import R2Service or LearningHubManager")
    R2Service = None
    LearningHubManager = None
# (V8.1) Import NewsFetcher
try:
    from sentiment_news import NewsFetcher
except ImportError:
    NewsFetcher = None
# 🔴 --- START OF CHANGE (V19.3) --- 🔴
# (Import VADER here as well, for type hinting)
try:
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
except ImportError:
    SentimentIntensityAnalyzer = None
# 🔴 --- END OF CHANGE --- 🔴
# (Default settings updated to match NVIDIA)
LLM_API_URL = os.getenv("LLM_API_URL", "https://integrate.api.nvidia.com/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY")  # (this is the key that will be used)
LLM_MODEL = os.getenv("LLM_MODEL", "nvidia/llama-3.1-nemotron-ultra-253b-v1")
# (Sampling parameters as specified)
LLM_TEMPERATURE = 0.2
LLM_TOP_P = 0.7
LLM_MAX_TOKENS = 16384
LLM_FREQUENCY_PENALTY = 0.8
LLM_PRESENCE_PENALTY = 0.5
# Client settings
CLIENT_TIMEOUT = 300.0
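# Illustrative environment setup (placeholder values, not real credentials):
#   export LLM_API_URL="https://integrate.api.nvidia.com/v1"
#   export LLM_API_KEY="nvapi-..."
#   export LLM_MODEL="nvidia/llama-3.1-nemotron-ultra-253b-v1"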
class LLMService:
    def __init__(self):
        if not LLM_API_KEY:
            raise ValueError("❌ [LLMService] LLM_API_KEY environment variable is missing!")
        try:
            self.client = AsyncOpenAI(
                base_url=LLM_API_URL,
                api_key=LLM_API_KEY,
                timeout=CLIENT_TIMEOUT
            )
            print(f"✅ [LLMService V19.3] Initialized. Model: {LLM_MODEL}")
            print(f"   -> Endpoint: {LLM_API_URL}")
        except Exception as e:
            print(f"❌ [LLMService V19.3] Failed to initialize AsyncOpenAI: {e}")
            traceback.print_exc()
            raise
        # --- (Links to the other services) ---
        self.r2_service: Optional[R2Service] = None
        self.learning_hub: Optional[LearningHubManager] = None
        self.news_fetcher: Optional[NewsFetcher] = None
        # 🔴 --- START OF CHANGE (V19.3) --- 🔴
        # (Add the VADER analyzer attribute)
        self.vader_analyzer: Optional[SentimentIntensityAnalyzer] = None
        # 🔴 --- END OF CHANGE --- 🔴
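        # NOTE: the attributes above are expected to be injected by the
        # orchestrator after construction. Illustrative wiring only (the
        # constructor arguments are assumptions, not this module's API):
        #   llm = LLMService()
        #   llm.r2_service = R2Service(...)
        #   llm.learning_hub = LearningHubManager(...)
        #   llm.news_fetcher = NewsFetcher(...)
        #   llm.vader_analyzer = SentimentIntensityAnalyzer()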
    async def _call_llm(self, prompt: str) -> Optional[str]:
        """
        (Updated V19.2)
        Make the API call to the LLM (now uses "detailed thinking on" as the system prompt).
        """
        system_prompt = "detailed thinking on"
        payload = {
            "model": LLM_MODEL,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}  # (the prompt now carries the JSON instructions)
            ],
            "temperature": LLM_TEMPERATURE,
            "top_p": LLM_TOP_P,
            "max_tokens": LLM_MAX_TOKENS,
            "frequency_penalty": LLM_FREQUENCY_PENALTY,
            "presence_penalty": LLM_PRESENCE_PENALTY,
            "stream": False,  # (must be False to receive a complete JSON body)
            "response_format": {"type": "json_object"}
        }
        try:
            response = await self.client.chat.completions.create(**payload)
            if response.choices and len(response.choices) > 0:
                content = response.choices[0].message.content
                if content:
                    return content.strip()
            print(f"❌ [LLMService] Unexpected API response: {response.model_dump_json()}")
            return None
        except RateLimitError as e:
            print(f"❌ [LLMService] Rate-limit error from the NVIDIA API: {e}")
        except APIError as e:
            print(f"❌ [LLMService] API error from the NVIDIA API: {e}")
        except Exception as e:
            print(f"❌ [LLMService] Unexpected error in _call_llm: {e}")
            traceback.print_exc()
        return None
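    # Hypothetical helper (a minimal sketch, not called anywhere in this module):
    # one way to add exponential backoff around _call_llm, since _call_llm
    # swallows RateLimitError and returns None. Retry count and delays are
    # assumptions, not values from the original code.
    async def _call_llm_with_retry(self, prompt: str, retries: int = 3) -> Optional[str]:
        import asyncio  # local import to keep the sketch self-contained
        for attempt in range(retries):
            result = await self._call_llm(prompt)
            if result is not None:
                return result
            # _call_llm already logged the failure; back off before retrying
            await asyncio.sleep(2 ** attempt)
        return None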
    def _parse_llm_response_enhanced(self,
                                     response_text: str,
                                     fallback_strategy: str = "decision",
                                     symbol: str = "N/A") -> Optional[Dict[str, Any]]:
        """
        (Updated V8) Smart, error-tolerant JSON parser.
        """
        if not response_text:
            print(f"   ⚠️ [LLMParser] Empty response for {symbol}.")
            return self._get_fallback_response(fallback_strategy, "Empty response")
        # 1. Try direct JSON parsing (we requested response_format=json_object)
        try:
            return json.loads(response_text)
        except json.JSONDecodeError:
            print(f"   ⚠️ [LLMParser] Direct JSON parse failed for {symbol}. Trying extraction...")
            # (fall through to attempt 2)
        # 2. Try to extract JSON embedded in surrounding text (fallback 1)
        try:
            # (look for the first '{' and the last '}')
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                json_string = json_match.group(0)
                return json.loads(json_string)
            else:
                print(f"   ⚠️ [LLMParser] No matching JSON found for {symbol}.")
                raise json.JSONDecodeError("No JSON object found in text", response_text, 0)
        except json.JSONDecodeError as e:
            print(f"   ❌ [LLMParser] Final extraction failed for {symbol}. Response text: {response_text[:200]}...")
            return self._get_fallback_response(fallback_strategy, f"Final JSON parse fail: {e}")
        except Exception as e:
            print(f"   ❌ [LLMParser] Unexpected parser error for {symbol}: {e}")
            return self._get_fallback_response(fallback_strategy, f"Unexpected parser error: {e}")
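    # Illustrative behaviour of the parser above (example inputs, not executed):
    #   '{"action": "WATCH"}'                      -> parsed directly (attempt 1)
    #   'Sure! {"action": "WATCH"} Hope it helps.' -> extracted via regex (attempt 2)
    #   'I cannot answer that.'                    -> safe fallback response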
    def _get_fallback_response(self, strategy: str, reason: str) -> Optional[Dict[str, Any]]:
        """
        (Updated V8) Return a safe response when the LLM fails.
        """
        print(f"   🚨 [LLMService] Fallback response activated for strategy '{strategy}' (reason: {reason})")
        if strategy == "decision":
            # (Safe decision: do not trade)
            return {
                "action": "NO_DECISION",
                "strategy_to_watch": "GENERIC",
                "confidence_level": 0,
                "reasoning": f"LLM analysis failed: {reason}",
                "exit_profile": "Standard"
            }
        elif strategy == "reanalysis":
            # (Safe decision: stay in the current trade)
            return {
                "action": "HOLD",
                "strategy": "MAINTAIN_CURRENT",
                "reasoning": f"LLM re-analysis failed: {reason}. Maintaining current trade strategy."
            }
        elif strategy == "reflection":
            # (Safe decision: do not create a learning rule)
            return None  # (prevents the Reflector from creating a delta)
        elif strategy == "distillation":
            # (Safe decision: do not create distilled rules)
            return None  # (prevents the Curator from proceeding)
        return None  # (generic fallback)
    async def get_trading_decision(self, candidate_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        (Updated V8.1)
        Calls the LLM to make a strategic "WATCH" decision (Explorer Brain).
        """
        symbol = candidate_data.get('symbol', 'UNKNOWN')
        try:
            # 1. (The brain) Fetch rules (deltas) from the learning hub
            learning_context_prompt = "Playbook: No learning context available."
            if self.learning_hub:
                learning_context_prompt = await self.learning_hub.get_active_context_for_llm(
                    domain="general",
                    query=f"{symbol} strategy decision"
                )
            # 2. Build the prompt (in English)
            prompt = self._create_trading_prompt(candidate_data, learning_context_prompt)
            if self.r2_service:
                await self.r2_service.save_llm_prompts_async(symbol, "trading_decision", prompt, candidate_data)
            # 3. Call the LLM
            response_text = await self._call_llm(prompt)
            # 4. Parse the response (using the tolerant parser)
            decision_json = self._parse_llm_response_enhanced(
                response_text,
                fallback_strategy="decision",
                symbol=symbol
            )
            return decision_json
        except Exception as e:
            print(f"❌ [LLMService] Fatal failure in get_trading_decision for {symbol}: {e}")
            traceback.print_exc()
            return self._get_fallback_response("decision", str(e))  # (return a safe decision)
    async def re_analyze_trade_async(self, trade_data: Dict[str, Any], current_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        (Updated V19.3)
        Calls the LLM to re-analyze an open trade (Reflector Brain).
        """
        symbol = trade_data.get('symbol', 'UNKNOWN')
        try:
            # 1. (The brain) Fetch rules (deltas) from the learning hub
            learning_context_prompt = "Playbook: No learning context available."
            if self.learning_hub:
                learning_context_prompt = await self.learning_hub.get_active_context_for_llm(
                    domain="strategy",
                    query=f"{symbol} re-analysis {trade_data.get('strategy', 'GENERIC')}"
                )
            # 2. (V8.1) Fetch the latest news (via the dedicated NewsFetcher)
            latest_news_text = "News data unavailable for re-analysis."
            latest_news_score = 0.0
            # 🔴 --- START OF CHANGE (V19.3) --- 🔴
            # (Use the injected self.vader_analyzer)
            if self.news_fetcher:
                latest_news_text = await self.news_fetcher.get_news_for_symbol(symbol)
                if self.vader_analyzer and latest_news_text:  # (check the injected analyzer)
                    vader_scores = self.vader_analyzer.polarity_scores(latest_news_text)
                    latest_news_score = vader_scores.get('compound', 0.0)
            # 🔴 --- END OF CHANGE --- 🔴
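            # (VADER's 'compound' score is normalized to [-1.0, +1.0];
            #  0.0 here means neutral news or that nothing was scored.)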
            current_data['latest_news_text'] = latest_news_text
            current_data['latest_news_score'] = latest_news_score
            # 3. Build the prompt (in English)
            prompt = await self._create_reanalysis_prompt(trade_data, current_data, learning_context_prompt)
            if self.r2_service:
                await self.r2_service.save_llm_prompts_async(symbol, "trade_reanalysis", prompt, current_data)
            # 4. Call the LLM
            response_text = await self._call_llm(prompt)
            # 5. Parse the response (using the tolerant parser)
            decision_json = self._parse_llm_response_enhanced(
                response_text,
                fallback_strategy="reanalysis",
                symbol=symbol
            )
            return decision_json
        except Exception as e:
            print(f"❌ [LLMService] Fatal failure in re_analyze_trade_async for {symbol}: {e}")
            traceback.print_exc()
            return self._get_fallback_response("reanalysis", str(e))  # (return a safe decision)
    # --- (Prompt-building helpers) ---
    # (Note: these prompts must always be in English)
    def _create_trading_prompt(self,
                               candidate_data: Dict[str, Any],
                               learning_context: str) -> str:
        """
        (Modified V19.2)
        Build the prompt (in English) for the initial trading decision (Explorer).
        (All instructions were moved here to fit the new system prompt.)
        """
        symbol = candidate_data.get('symbol', 'N/A')
        # --- 1. Extract ML data (Layer 1) ---
        l1_score = candidate_data.get('enhanced_final_score', 0)
        l1_reasons = candidate_data.get('reasons_for_candidacy', [])
        pattern_data = candidate_data.get('pattern_analysis', {})
        mc_data = candidate_data.get('monte_carlo_distribution', {})
        # --- 2. Extract sentiment and news data (Layer 1) ---
        news_text = candidate_data.get('news_text', 'No news text provided.')
        news_score_raw = candidate_data.get('news_score_raw', 0.0)
        statistical_news_pnl = candidate_data.get('statistical_news_pnl', 0.0)
        # --- 3. Extract whale data (Layer 1) ---
        whale_data = candidate_data.get('whale_data', {})
        whale_summary = whale_data.get('llm_friendly_summary', {})
        exchange_flows = whale_data.get('exchange_flows', {})
        whale_signal = whale_summary.get('recommended_action', 'HOLD')
        whale_confidence = whale_summary.get('confidence', 0.3)
        whale_reason = whale_summary.get('whale_activity_summary', 'No whale data.')
        net_flow_usd = exchange_flows.get('net_flow_usd', 0.0)
        # (Long-horizon data, from the new 24h analysis)
        accumulation_data_24h = whale_data.get('accumulation_analysis_24h', {})
        net_flow_24h_usd = accumulation_data_24h.get('net_flow_usd', 0.0)
        total_inflow_24h_usd = accumulation_data_24h.get('to_exchanges_usd', 0.0)
        total_outflow_24h_usd = accumulation_data_24h.get('from_exchanges_usd', 0.0)
        relative_net_flow_24h_percent = accumulation_data_24h.get('relative_net_flow_percent', 0.0)
        # --- 4. Extract market data (Layer 0) ---
        market_context = candidate_data.get('sentiment_data', {})
        market_trend = market_context.get('market_trend', 'UNKNOWN')
        btc_sentiment = market_context.get('btc_sentiment', 'UNKNOWN')
        # --- 5. Build the prompt sections (English) ---
        playbook_prompt = f"""
--- START OF LEARNING PLAYBOOK ---
{learning_context}
--- END OF PLAYBOOK ---
"""
        ml_summary_prompt = f"""
1. **ML Analysis (Score: {l1_score:.3f}):**
   * Reasons: {', '.join(l1_reasons)}
   * Chart Pattern: {pattern_data.get('pattern_detected', 'None')} (Conf: {pattern_data.get('pattern_confidence', 0):.2f})
   * Monte Carlo (1h): {mc_data.get('probability_of_gain', 0):.1f}% chance of profit (Expected: {mc_data.get('expected_return_pct', 0):.2f}%)
"""
        news_prompt = f"""
2. **News & Sentiment Analysis:**
   * Market Trend: {market_trend} (BTC: {btc_sentiment})
   * VADER (Raw): {news_score_raw:.3f}
   * Statistical PnL (Learned): {statistical_news_pnl:+.2f}%
   * News Text: {news_text[:300]}...
"""
        whale_activity_prompt = f"""
3. **Whale Activity (Real-time Flow - Optimized Window):**
   * Signal: {whale_signal} (Confidence: {whale_confidence:.2f})
   * Reason: {whale_reason}
   * Net Flow (to/from Exchanges): ${net_flow_usd:,.2f}
4. **Whale Activity (24h Accumulation):**
   * 24h Net Flow (Accumulation): ${net_flow_24h_usd:,.2f}
   * 24h Total Deposits: ${total_inflow_24h_usd:,.2f}
   * 24h Total Withdrawals: ${total_outflow_24h_usd:,.2f}
   * Relative 24h Net Flow (vs Daily Volume): {relative_net_flow_24h_percent:+.2f}%
"""
        # (All instructions are merged into the user message)
        task_prompt = f"""
CONTEXT:
You are an expert AI trading analyst (Explorer Brain).
Analyze the provided data for {symbol} and decide if it's a high-potential candidate to 'WATCH'.
{playbook_prompt}
--- START OF CANDIDATE DATA ---
{ml_summary_prompt}
{news_prompt}
{whale_activity_prompt}
--- END OF CANDIDATE DATA ---
TASK:
1. **Internal Thinking (Private):** Perform a step-by-step analysis (as triggered by the system prompt).
   * Synthesize all data points (ML, News, Whale Flow, 24h Accumulation).
   * Are the signals aligned? (e.g., ML breakout + whale accumulation = strong).
   * Are there conflicts? (e.g., good ML score but high 24h deposits = risky).
   * Consult the "Playbook" for learned rules.
2. **Final Decision:** Based on your internal thinking, decide the final action.
3. **Output Constraint:** Provide your final answer ONLY in the requested JSON object format, with no introductory text, markdown formatting, or explanations.
OUTPUT (JSON Object ONLY):
{{
  "action": "WATCH" or "NO_DECISION",
  "strategy_to_watch": "STRATEGY_NAME",
  "confidence_level": 0.0_to_1.0,
  "reasoning": "Brief justification (max 40 words) synthesizing all data points.",
  "exit_profile": "Aggressive" or "Standard" or "Patient"
}}
"""
        # (We send only task_prompt, since it now contains everything)
        return task_prompt
    async def _create_reanalysis_prompt(self,
                                        trade_data: Dict[str, Any],
                                        current_data: Dict[str, Any],
                                        learning_context: str) -> str:
        """
        (Modified V19.2)
        Build the prompt (in English) for re-analyzing an open trade (Reflector Brain).
        """
        symbol = trade_data.get('symbol', 'N/A')
        # --- 1. Original (old) trade data ---
        original_strategy = trade_data.get('strategy', 'N/A')
        original_reasoning = trade_data.get('decision_data', {}).get('reasoning', 'N/A')
        entry_price = trade_data.get('entry_price', 0)
        current_pnl = trade_data.get('pnl_percent', 0)
        current_sl = trade_data.get('stop_loss', 0)
        current_tp = trade_data.get('take_profit', 0)
        # --- 2. Updated (current) technical data ---
        current_price = current_data.get('current_price', 0)
        mc_data = current_data.get('monte_carlo_distribution', {})
        mc_prob = mc_data.get('probability_of_gain', 0)
        mc_expected_return = mc_data.get('expected_return_pct', 0)
        # --- 3. (V8.1) Updated (current) news data ---
        latest_news_text = current_data.get('latest_news_text', 'No news.')
        latest_news_score = current_data.get('latest_news_score', 0.0)
        # --- 4. (The brain) Statistical learning feedback ---
        statistical_feedback = ""
        if self.learning_hub:
            statistical_feedback = await self.learning_hub.get_statistical_feedback_for_llm(original_strategy)
        # --- 5. Build the prompt sections (English) ---
        playbook_prompt = f"""
--- START OF LEARNING PLAYBOOK ---
{learning_context}
{statistical_feedback}
--- END OF PLAYBOOK ---
"""
        trade_status_prompt = f"""
1. **Open Trade Status ({symbol}):**
   * Current PnL: {current_pnl:+.2f}%
   * Original Strategy: {original_strategy}
   * Original Reasoning: {original_reasoning}
   * Entry Price: {entry_price}
   * Current Price: {current_price}
   * Current StopLoss: {current_sl}
   * Current TakeProfit: {current_tp}
"""
        current_analysis_prompt = f"""
2. **Current Real-time Analysis:**
   * Monte Carlo (1h): {mc_prob:.1f}% chance of profit (Expected: {mc_expected_return:.2f}%)
   * Latest News (VADER: {latest_news_score:.3f}): {latest_news_text[:300]}...
"""
        # (All instructions are merged into the user message)
        task_prompt = f"""
CONTEXT:
You are an expert AI trading analyst (Reflector Brain).
An open trade for {symbol} has triggered a mandatory re-analysis. Analyze the new data and decide the next action.
{playbook_prompt}
--- START OF TRADE DATA ---
{trade_status_prompt}
{current_analysis_prompt}
--- END OF TRADE DATA ---
TASK:
1. **Internal Thinking (Private):** Perform a step-by-step analysis (as triggered by the system prompt).
   * Compare the "Open Trade Status" with the "Current Real-time Analysis".
   * Has the situation improved or deteriorated? (e.g., PnL is good, but the new Monte Carlo is negative).
   * Is there new critical news?
   * Consult the "Playbook" for learned rules and statistical feedback.
2. **Final Decision:** Based on your internal thinking, decide the best course of action (HOLD, UPDATE_TRADE, CLOSE_TRADE).
3. **Output Constraint:** Provide your final answer ONLY in the requested JSON object format, with no introductory text, markdown formatting, or explanations.
OUTPUT (JSON Object ONLY):
{{
  "action": "HOLD" or "UPDATE_TRADE" or "CLOSE_TRADE",
  "strategy": "MAINTAIN_CURRENT" or "ADAPTIVE_EXIT" or "IMMEDIATE_EXIT",
  "reasoning": "Brief justification (max 40 words) for the decision.",
  "new_stop_loss": (float or null, required if action is 'UPDATE_TRADE'),
  "new_take_profit": (float or null, required if action is 'UPDATE_TRADE')
}}
"""
        return task_prompt
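

# Minimal smoke test (illustrative only; assumes LLM_API_KEY is exported and
# that the candidate payload roughly matches what the Layer-1 pipeline sends —
# the field values below are made-up examples, not real signals):
if __name__ == "__main__":
    import asyncio

    async def _demo():
        service = LLMService()
        decision = await service.get_trading_decision({
            "symbol": "BTCUSDT",
            "enhanced_final_score": 0.82,
            "reasons_for_candidacy": ["breakout", "volume spike"],
        })
        print(decision)

    asyncio.run(_demo())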