Trad / sentiment_news.py
Riy777's picture
Update sentiment_news.py
662698c
raw
history blame
13.6 kB
import os, asyncio
import httpx
from gnews import GNews
import feedparser
from datetime import datetime, timedelta, timezone
import time
#
# 🔴 تم التعديل: توسيع قائمة مصادر RSS لتشمل تغطية أوسع للعملات البديلة
#
CRYPTO_RSS_FEEDS = {
"Cointelegraph": "https://cointelegraph.com/rss",
"CoinDesk": "https://www.coindesk.com/arc/outboundfeeds/rss/",
"CryptoSlate": "https://cryptoslate.com/feed/",
"NewsBTC": "https://www.newsbtc.com/feed/",
"Bitcoin.com": "https://news.bitcoin.com/feed/",
"The Block": "https://www.theblock.co/rss.xml",
"Decrypt": "https://decrypt.co/feed",
"AMBCrypto": "https://ambcrypto.com/feed/",
"CryptoPotato": "https://cryptopotato.com/feed/",
"U.Today": "https://u.today/rss"
}
class NewsFetcher:
def __init__(self):
self.http_client = httpx.AsyncClient(
timeout=10.0, follow_redirects=True,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.9',
'Cache-Control': 'no-cache'
}
)
# GNews مفلترة مسبقاً لـ 3 ساعات
self.gnews = GNews(language='en', country='US', period='3h', max_results=8)
async def _fetch_from_gnews(self, symbol: str) -> list:
try:
base_symbol = symbol.split("/")[0]
# نستخدم الكلمات المفتاحية السلبية لمحاولة عزل أخبار العملة نفسها
query = f'"{base_symbol}" cryptocurrency -bitcoin -ethereum -BTC -ETH'
# GNews تعمل بشكل متزامن، لذا نستخدم to_thread
news_items = await asyncio.to_thread(self.gnews.get_news, query)
#
# 🔴 تم التعديل: توحيد تنسيق الإخراج ليتضمن مفتاح 'published'
#
formatted_items = []
for item in news_items:
# GNews توفر تاريخ النشر كنص (وهو مفلتر مسبقاً لـ 3 ساعات)
published_text = item.get('published date', 'Recent')
formatted_items.append({
'title': item.get('title', 'No Title'),
'description': item.get('description', 'No Description'),
'source': item.get('source', {}).get('title', 'GNews'),
'published': published_text # تمرير التاريخ/الوقت
})
return formatted_items
except Exception as e:
print(f"Failed to fetch specific news from GNews for {symbol}: {e}")
return []
async def _fetch_from_rss_feed(self, feed_url: str, source_name: str, symbol: str) -> list:
try:
base_symbol = symbol.split('/')[0]
max_redirects = 2
current_url = feed_url
# منطق معالجة إعادة التوجيه
for attempt in range(max_redirects):
try:
response = await self.http_client.get(current_url)
response.raise_for_status()
break
except httpx.HTTPStatusError as e:
if e.response.status_code in [301, 302, 307, 308] and 'Location' in e.response.headers:
current_url = e.response.headers['Location']
continue
else:
raise
feed = feedparser.parse(response.text)
news_items = []
search_term = base_symbol.lower()
#
# 🔴 تم التعديل: حساب الوقت قبل 3 ساعات (باستخدام التوقيت العالمي UTC)
#
three_hours_ago = datetime.now(timezone.utc) - timedelta(hours=3)
#
# 🔴 تم التعديل: إزالة قيد '[:15]' والفلترة حسب الوقت
#
for entry in feed.entries:
title = entry.title.lower() if hasattr(entry, 'title') else ''
summary = entry.summary.lower() if hasattr(entry, 'summary') else entry.description.lower() if hasattr(entry, 'description') else ''
# التحقق من تاريخ النشر
published_tuple = entry.get('published_parsed')
if not published_tuple:
continue # تخطي الخبر إذا لم يكن له تاريخ نشر
# تحويل تاريخ النشر إلى كائن datetime مع دعم التوقيت العالمي (UTC)
try:
published_time = datetime.fromtimestamp(time.mktime(published_tuple), timezone.utc)
except Exception:
# في حال فشل التحويل، نفترض أنه قديم
continue
#
# 🔴 تم التعديل: تطبيق الفلتر الزمني (آخر 3 ساعات) وفلتر اسم العملة
#
if (search_term in title or search_term in summary) and (published_time >= three_hours_ago):
news_items.append({
'title': entry.title,
'description': summary,
'source': source_name,
'published': published_time.isoformat() # إرسال التاريخ بتنسيق ISO
})
return news_items
except Exception as e:
print(f"Failed to fetch specific news from {source_name} RSS for {symbol}: {e}")
return []
async def get_news_for_symbol(self, symbol: str) -> str:
base_symbol = symbol.split("/")[0]
# إنشاء قائمة المهام (GNews + جميع مصادر RSS)
tasks = [self._fetch_from_gnews(symbol)]
for name, url in CRYPTO_RSS_FEEDS.items():
tasks.append(self._fetch_from_rss_feed(url, name, symbol))
results = await asyncio.gather(*tasks, return_exceptions=True)
all_news_text = []
for result in results:
if isinstance(result, Exception):
continue
for item in result:
# نستخدم الفلتر الثانوي للتأكد من أن الخبر يركز فعلاً على العملة
if self._is_directly_relevant_to_symbol(item, base_symbol):
title = item.get('title', 'No Title')
description = item.get('description', 'No Description')
source = item.get('source', 'Unknown Source')
#
# 🔴 تم التعديل: هذا المنطق سيعمل الآن بفضل التعديلات السابقة
#
published = item.get('published', '') # الحصول على التاريخ/الوقت
news_entry = f"[{source}] {title}. {description}"
#
# 🔴 تم التعديل: إضافة وقت النشر إلى النص النهائي (تنفيذاً لطلبك)
#
if published:
news_entry += f" (Published: {published})"
all_news_text.append(news_entry)
if not all_news_text:
return f"No specific news found for {base_symbol} in the last 3 hours."
# أخذ أهم 5 أخبار (الأكثر حداثة أو صلة)
important_news = all_news_text[:5]
return " | ".join(important_news)
def _is_directly_relevant_to_symbol(self, news_item, base_symbol):
"""
فلتر ثانوي للتأكد من أن الخبر ليس مجرد ذكر عابر للعملة،
بل يتعلق فعلاً بجوانب التداول أو السوق.
"""
title = news_item.get('title', '').lower()
description = news_item.get('description', '').lower()
symbol_lower = base_symbol.lower()
# يجب أن يكون الرمز موجوداً في العنوان أو الوصف
if symbol_lower not in title and symbol_lower not in description:
return False
# يجب أن يحتوي الخبر على كلمات مفتاحية تدل على أنه خبر مالي/تداولي
crypto_keywords = [
'crypto', 'cryptocurrency', 'token', 'blockchain',
'price', 'market', 'trading', 'exchange', 'defi',
'coin', 'digital currency', 'altcoin', 'airdrop', 'listing',
'partnership', 'update', 'mainnet', 'protocol'
]
return any(keyword in title or keyword in description for keyword in crypto_keywords)
# --- (باقي الملف: SentimentAnalyzer و format_whale_analysis) ---
# --- (لم يتم المساس بهذه الأجزاء لأنها خارج نطاق الطلب) ---
class SentimentAnalyzer:
def __init__(self, data_manager):
self.data_manager = data_manager
async def get_market_sentiment(self):
try:
market_context = await self.data_manager.get_market_context_async()
if not market_context:
return await self.get_fallback_market_context()
return market_context
except Exception as e:
print(f"Failed to get market sentiment: {e}")
return await self.get_fallback_market_context()
async def get_fallback_market_context(self):
return {
'timestamp': datetime.now().isoformat(),
'general_whale_activity': {
'sentiment': 'NEUTRAL',
'description': 'Fallback mode - system initializing',
'critical_alert': False,
'transaction_count': 0,
'total_volume_usd': 0,
'netflow_analysis': {
'net_flow': 0,
'flow_direction': 'BALANCED',
'market_impact': 'LOW'
}
},
'btc_sentiment': 'NEUTRAL',
'fear_and_greed_index': 50
}
def format_whale_analysis(self, general_whale_activity, symbol_whale_data, symbol):
analysis_parts = []
if general_whale_activity.get('data_available', False):
netflow_analysis = general_whale_activity.get('netflow_analysis', {})
critical_flag = " CRITICAL ALERT" if general_whale_activity.get('critical_alert') else ''
if netflow_analysis:
inflow = netflow_analysis.get('inflow_to_exchanges', 0)
outflow = netflow_analysis.get('outflow_from_exchanges', 0)
net_flow = netflow_analysis.get('net_flow', 0)
flow_direction = netflow_analysis.get('flow_direction', 'BALANCED')
market_impact = netflow_analysis.get('market_impact', 'UNKNOWN')
analysis_parts.append(f"General Market Netflow Analysis:")
analysis_parts.append(f" • Inflow to Exchanges: ${inflow:,.0f}")
analysis_parts.append(f" • Outflow from Exchanges: ${outflow:,.0f}")
analysis_parts.append(f" • Net Flow: ${net_flow:,.0f} ({flow_direction})")
analysis_parts.append(f" • Market Impact: {market_impact}{critical_flag}")
trading_signals = general_whale_activity.get('trading_signals', [])
if trading_signals:
analysis_parts.append(f" • Trading Signals: {len(trading_signals)} active signals")
for signal in trading_signals[:3]:
analysis_parts.append(f" ◦ {signal.get('action')}: {signal.get('reason')} (Confidence: {signal.get('confidence', 0):.2f})")
else:
analysis_parts.append(f"General Market: {general_whale_activity.get('description', 'Activity detected')}{critical_flag}")
else:
analysis_parts.append("General Market: No significant general whale data available")
if symbol_whale_data.get('data_available', False):
activity_level = symbol_whale_data.get('activity_level', 'UNKNOWN')
large_transfers = symbol_whale_data.get('large_transfers_count', 0)
total_volume = symbol_whale_data.get('total_volume', 0)
analysis_parts.append(f"{symbol} Specific Whale Activity:")
analysis_parts.append(f" • Activity Level: {activity_level}")
analysis_parts.append(f" • Large Transfers: {large_transfers}")
analysis_parts.append(f" • Total Volume: ${total_volume:,.0f}")
recent_transfers = symbol_whale_data.get('recent_large_transfers', [])
if recent_transfers:
analysis_parts.append(f" • Recent Large Transfers: {len(recent_transfers)}")
else:
analysis_parts.append(f"{symbol} Specific: No contract-based whale data available")
return "\n".join(analysis_parts)