import asyncio
from calendar import timegm
from datetime import datetime, timedelta, timezone

import feedparser
import httpx
from gnews import GNews

# RSS feeds polled for coin-specific, recent headlines.
CRYPTO_RSS_FEEDS = {
    "Cointelegraph": "https://cointelegraph.com/rss",
    "CoinDesk": "https://www.coindesk.com/arc/outboundfeeds/rss/",
    "CryptoSlate": "https://cryptoslate.com/feed/",
    "NewsBTC": "https://www.newsbtc.com/feed/",
    "Bitcoin.com": "https://news.bitcoin.com/feed/",
    "The Block": "https://www.theblock.co/rss.xml",
    "Decrypt": "https://decrypt.co/feed",
    "AMBCrypto": "https://ambcrypto.com/feed/",
    "CryptoPotato": "https://cryptopotato.com/feed/",
    "U.Today": "https://u.today/rss",
}


class NewsFetcher:
    """Aggregates recent, coin-specific news from GNews and crypto RSS feeds."""

    def __init__(self):
        self.http_client = httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'en-US,en;q=0.9',
                'Cache-Control': 'no-cache',
            },
        )
        # Restrict GNews to the last 3 hours to match the RSS freshness window.
        self.gnews = GNews(language='en', country='US', period='3h', max_results=8)
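
    async def aclose(self):
        # Illustrative helper (an addition, not in the original code): the
        # shared AsyncClient is otherwise never closed, so expose a shutdown
        # hook for callers that manage the fetcher's lifecycle.
        await self.http_client.aclose()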

    async def _fetch_from_gnews(self, symbol: str) -> list:
        """Query GNews for headlines about the base asset of `symbol`."""
        try:
            base_symbol = symbol.split("/")[0]
            # Quote the ticker and exclude BTC/ETH terms so results stay
            # specific to the target coin rather than general market news.
            query = f'"{base_symbol}" cryptocurrency -bitcoin -ethereum -BTC -ETH'

            # gnews is synchronous; run it in a worker thread so the event
            # loop is not blocked.
            news_items = await asyncio.to_thread(self.gnews.get_news, query)

            formatted_items = []
            for item in news_items:
                published_text = item.get('published date', 'Recent')
                formatted_items.append({
                    'title': item.get('title', 'No Title'),
                    'description': item.get('description', 'No Description'),
                    # gnews exposes the outlet under 'publisher', not 'source'.
                    'source': (item.get('publisher') or {}).get('title', 'GNews'),
                    'published': published_text,
                })
            return formatted_items
        except Exception as e:
            print(f"Failed to fetch specific news from GNews for {symbol}: {e}")
            return []

    async def _fetch_from_rss_feed(self, feed_url: str, source_name: str, symbol: str) -> list:
        """Fetch one RSS feed and keep entries that mention the base asset
        and were published within the last 3 hours."""
        try:
            base_symbol = symbol.split('/')[0]

            # The client is built with follow_redirects=True, so redirects are
            # handled transparently; raise_for_status() covers error statuses.
            response = await self.http_client.get(feed_url)
            response.raise_for_status()

            feed = feedparser.parse(response.text)
            news_items = []
            search_term = base_symbol.lower()
            three_hours_ago = datetime.now(timezone.utc) - timedelta(hours=3)

            for entry in feed.entries:
                title = entry.get('title', '').lower()
                summary = entry.get('summary', entry.get('description', '')).lower()

                # Skip undated entries; the freshness filter needs a timestamp.
                published_tuple = entry.get('published_parsed')
                if not published_tuple:
                    continue

                try:
                    # feedparser normalizes published_parsed to UTC, so convert
                    # with timegm (UTC) rather than time.mktime (local time).
                    published_time = datetime.fromtimestamp(timegm(published_tuple), timezone.utc)
                except Exception:
                    continue

                if (search_term in title or search_term in summary) and published_time >= three_hours_ago:
                    news_items.append({
                        'title': entry.get('title', 'No Title'),
                        'description': summary,
                        'source': source_name,
                        'published': published_time.isoformat(),
                    })

            return news_items
        except Exception as e:
            print(f"Failed to fetch specific news from {source_name} RSS for {symbol}: {e}")
            return []

    async def get_news_for_symbol(self, symbol: str) -> str:
        """Gather news for `symbol` from all sources concurrently and return
        a compact, pipe-separated digest of the most relevant items."""
        base_symbol = symbol.split("/")[0]

        # Fan out: one GNews task plus one task per RSS feed.
        tasks = [self._fetch_from_gnews(symbol)]
        for name, url in CRYPTO_RSS_FEEDS.items():
            tasks.append(self._fetch_from_rss_feed(url, name, symbol))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_news_text = []

        for result in results:
            if isinstance(result, Exception):
                continue

            for item in result:
                if self._is_directly_relevant_to_symbol(item, base_symbol):
                    title = item.get('title', 'No Title')
                    description = item.get('description', 'No Description')
                    source = item.get('source', 'Unknown Source')
                    published = item.get('published', '')

                    news_entry = f"[{source}] {title}. {description}"
                    if published:
                        news_entry += f" (Published: {published})"
                    all_news_text.append(news_entry)

        if not all_news_text:
            return f"No specific news found for {base_symbol} in the last 3 hours."

        # Cap the digest at five items to keep the output short.
        important_news = all_news_text[:5]
        return " | ".join(important_news)

    def _is_directly_relevant_to_symbol(self, news_item, base_symbol):
        """Secondary filter: ensure the item is not merely a passing mention
        of the coin but actually concerns trading or market activity."""
        title = news_item.get('title', '').lower()
        description = news_item.get('description', '').lower()
        symbol_lower = base_symbol.lower()

        if symbol_lower not in title and symbol_lower not in description:
            return False

        crypto_keywords = [
            'crypto', 'cryptocurrency', 'token', 'blockchain',
            'price', 'market', 'trading', 'exchange', 'defi',
            'coin', 'digital currency', 'altcoin', 'airdrop', 'listing',
            'partnership', 'update', 'mainnet', 'protocol',
        ]

        return any(keyword in title or keyword in description for keyword in crypto_keywords)


class SentimentAnalyzer:
    def __init__(self, data_manager):
        self.data_manager = data_manager

    async def get_market_sentiment(self):
        try:
            market_context = await self.data_manager.get_market_context_async()
            if not market_context:
                return await self.get_fallback_market_context()
            return market_context
        except Exception as e:
            print(f"Failed to get market sentiment: {e}")
            return await self.get_fallback_market_context()

    async def get_fallback_market_context(self):
        # Neutral defaults used while the data manager is unavailable.
        return {
            # UTC, for consistency with the fetcher's time handling.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'general_whale_activity': {
                'sentiment': 'NEUTRAL',
                'description': 'Fallback mode - system initializing',
                'critical_alert': False,
                'transaction_count': 0,
                'total_volume_usd': 0,
                'netflow_analysis': {
                    'net_flow': 0,
                    'flow_direction': 'BALANCED',
                    'market_impact': 'LOW',
                },
            },
            'btc_sentiment': 'NEUTRAL',
            'fear_and_greed_index': 50,
        }

    def format_whale_analysis(self, general_whale_activity, symbol_whale_data, symbol):
        """Render general and symbol-specific whale data as readable bullet lines."""
        analysis_parts = []

        if general_whale_activity.get('data_available', False):
            netflow_analysis = general_whale_activity.get('netflow_analysis', {})
            critical_flag = " CRITICAL ALERT" if general_whale_activity.get('critical_alert') else ''

            if netflow_analysis:
                inflow = netflow_analysis.get('inflow_to_exchanges', 0)
                outflow = netflow_analysis.get('outflow_from_exchanges', 0)
                net_flow = netflow_analysis.get('net_flow', 0)
                flow_direction = netflow_analysis.get('flow_direction', 'BALANCED')
                market_impact = netflow_analysis.get('market_impact', 'UNKNOWN')

                analysis_parts.append("General Market Netflow Analysis:")
                analysis_parts.append(f"  • Inflow to Exchanges: ${inflow:,.0f}")
                analysis_parts.append(f"  • Outflow from Exchanges: ${outflow:,.0f}")
                analysis_parts.append(f"  • Net Flow: ${net_flow:,.0f} ({flow_direction})")
                analysis_parts.append(f"  • Market Impact: {market_impact}{critical_flag}")

                trading_signals = general_whale_activity.get('trading_signals', [])
                if trading_signals:
                    analysis_parts.append(f"  • Trading Signals: {len(trading_signals)} active signals")
                    for signal in trading_signals[:3]:
                        analysis_parts.append(
                            f"    ◦ {signal.get('action')}: {signal.get('reason')} "
                            f"(Confidence: {signal.get('confidence', 0):.2f})"
                        )
            else:
                analysis_parts.append(f"General Market: {general_whale_activity.get('description', 'Activity detected')}{critical_flag}")
        else:
            analysis_parts.append("General Market: No significant general whale data available")

        if symbol_whale_data.get('data_available', False):
            activity_level = symbol_whale_data.get('activity_level', 'UNKNOWN')
            large_transfers = symbol_whale_data.get('large_transfers_count', 0)
            total_volume = symbol_whale_data.get('total_volume', 0)

            analysis_parts.append(f"{symbol} Specific Whale Activity:")
            analysis_parts.append(f"  • Activity Level: {activity_level}")
            analysis_parts.append(f"  • Large Transfers: {large_transfers}")
            analysis_parts.append(f"  • Total Volume: ${total_volume:,.0f}")

            recent_transfers = symbol_whale_data.get('recent_large_transfers', [])
            if recent_transfers:
                analysis_parts.append(f"  • Recent Large Transfers: {len(recent_transfers)}")
        else:
            analysis_parts.append(f"{symbol} Specific: No contract-based whale data available")

        return "\n".join(analysis_parts)
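

# Minimal usage sketch, illustrative only and not part of the original module.
# Assumes network access; "SOL/USDT" is an arbitrary example pair, and
# SentimentAnalyzer is exercised only through its fallback path because no
# real data_manager is wired up here.
if __name__ == "__main__":
    async def _demo():
        fetcher = NewsFetcher()
        try:
            print(await fetcher.get_news_for_symbol("SOL/USDT"))
        finally:
            await fetcher.aclose()

        analyzer = SentimentAnalyzer(data_manager=None)
        context = await analyzer.get_fallback_market_context()
        print(analyzer.format_whale_analysis(
            context['general_whale_activity'], {}, "SOL/USDT"))

    asyncio.run(_demo())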