| """Utility functions for news extraction, sentiment analysis, and text-to-speech.""" | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification | |
| from gtts import gTTS | |
| import os | |
| from typing import List, Dict, Any | |
| import pandas as pd | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from config import * | |
| import re | |
| from datetime import datetime, timedelta | |
| import time | |
| import json | |
| from googletrans import Translator, LANGUAGES | |
| import statistics | |
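
# NOTE: `from config import *` is expected to supply the constants referenced
# throughout this module: HEADERS, NEWS_SOURCES, MIN_ARTICLES, MAX_ARTICLES,
# MAX_ARTICLES_PER_SOURCE, SENTIMENT_MODEL, SENTIMENT_FINE_GRAINED_MODEL,
# FINE_GRAINED_MODELS, SUMMARIZATION_MODEL and AUDIO_OUTPUT_DIR. The
# commented-out values below are only an illustrative sketch of what config.py
# might contain; they are assumptions, not the project's actual configuration,
# and are left commented out so the real config always takes precedence.
#
# HEADERS = {"User-Agent": "Mozilla/5.0"}
# NEWS_SOURCES = {"google": "https://www.google.com/search?q={}&tbm=nws"}
# MIN_ARTICLES = 20
# MAX_ARTICLES = 40
# MAX_ARTICLES_PER_SOURCE = 10
# SENTIMENT_MODEL = "ProsusAI/finbert"             # assumed FinBERT-style model
# SENTIMENT_FINE_GRAINED_MODEL = "<model id>"      # project-specific, unknown here
# FINE_GRAINED_MODELS = {}                         # e.g. {"emotion": "<model id>", "esg": "<model id>"}
# SUMMARIZATION_MODEL = "facebook/bart-large-cnn"  # assumed summarizer
# AUDIO_OUTPUT_DIR = "audio_output"
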
def analyze_company_data(company_name: str) -> Dict[str, Any]:
    """Analyze company news and generate insights."""
    try:
        # Initialize components
        news_extractor = NewsExtractor()
        sentiment_analyzer = SentimentAnalyzer()
        text_summarizer = TextSummarizer()
        comparative_analyzer = ComparativeAnalyzer()

        # Get news articles
        articles = news_extractor.search_news(company_name)
        if not articles:
            return {
                "articles": [],
                "comparative_sentiment_score": {},
                "final_sentiment_analysis": "No articles found for analysis.",
                "audio_path": None
            }

        # Process each article
        processed_articles = []
        sentiment_scores = {}

        for article in articles:
            # Generate summary
            summary = text_summarizer.summarize(article['content'])
            article['summary'] = summary

            # Analyze overall sentiment
            sentiment = sentiment_analyzer.analyze(article['content'])
            article['sentiment'] = sentiment

            # Analyze fine-grained sentiment
            try:
                fine_grained_results = sentiment_analyzer._get_fine_grained_sentiment(article['content'])
                article['fine_grained_sentiment'] = fine_grained_results

                # Add sentiment indices
                sentiment_indices = sentiment_analyzer._calculate_sentiment_indices(fine_grained_results)
                article['sentiment_indices'] = sentiment_indices

                # Add entities and sentiment targets
                entities = sentiment_analyzer._extract_entities(article['content'])
                article['entities'] = entities
                sentiment_targets = sentiment_analyzer._extract_sentiment_targets(article['content'], entities)
                article['sentiment_targets'] = sentiment_targets
            except Exception as e:
                print(f"Error in fine-grained sentiment analysis: {str(e)}")

            # Track sentiment by source
            source = article['source']
            if source not in sentiment_scores:
                sentiment_scores[source] = []
            sentiment_scores[source].append(sentiment)

            processed_articles.append(article)

        # Calculate overall sentiment
        overall_sentiment = sentiment_analyzer.get_overall_sentiment(processed_articles)

        # Ensure consistent array lengths in sentiment_scores
        max_length = max(len(scores) for scores in sentiment_scores.values())
        for source in sentiment_scores:
            # Pad shorter arrays with 'neutral' to match the longest array
            sentiment_scores[source].extend(['neutral'] * (max_length - len(sentiment_scores[source])))

        # Get comparative analysis
        comparative_analysis = comparative_analyzer.analyze_coverage(processed_articles, company_name)

        # Combine all results (ComparativeAnalyzer exposes the topic list under "topics")
        result = {
            "articles": processed_articles,
            "comparative_sentiment_score": {
                "sentiment_distribution": comparative_analysis.get("sentiment_distribution", {}),
                "sentiment_indices": comparative_analysis.get("sentiment_indices", {}),
                "source_distribution": comparative_analysis.get("source_distribution", {}),
                "common_topics": comparative_analysis.get("topics", []),
                "coverage_differences": comparative_analysis.get("coverage_differences", []),
                "total_articles": len(processed_articles)
            },
            "final_sentiment_analysis": overall_sentiment,
            "ensemble_info": sentiment_analyzer._get_ensemble_sentiment("\n".join([a['content'] for a in processed_articles])),
            "audio_path": None
        }

        return result
    except Exception as e:
        print(f"Error analyzing company data: {str(e)}")
        return {
            "articles": [],
            "comparative_sentiment_score": {},
            "final_sentiment_analysis": f"Error during analysis: {str(e)}",
            "audio_path": None
        }
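
# For reference, a successful call to analyze_company_data returns a dictionary
# shaped roughly as follows (values are illustrative; the keys mirror the
# `result` dict assembled above):
#
#   {
#       "articles": [{"title": ..., "content": ..., "summary": ..., "sentiment": "positive", ...}, ...],
#       "comparative_sentiment_score": {
#           "sentiment_distribution": {...},
#           "sentiment_indices": {...},
#           "source_distribution": {...},
#           "common_topics": [...],
#           "coverage_differences": [...],
#           "total_articles": 12,
#       },
#       "final_sentiment_analysis": "positive",
#       "ensemble_info": {"ensemble_sentiment": "positive", "ensemble_score": 0.71, "models": {...}},
#       "audio_path": None,
#   }
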
# Initialize translator with retry mechanism
def get_translator():
    max_retries = 3
    for attempt in range(max_retries):
        try:
            translator = Translator()
            # Test the translator
            translator.translate('test', dest='en')
            return translator
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed to initialize translator after {max_retries} attempts: {str(e)}")
                return None
            time.sleep(1)  # Wait before retrying
    return None

class NewsExtractor:
    def __init__(self):
        self.headers = HEADERS
        self.start_time = None
        self.timeout = 30  # 30 seconds timeout
    def search_news(self, company_name: str) -> List[Dict[str, str]]:
        """Extract news articles about the company ensuring minimum count."""
        self.start_time = time.time()
        all_articles = []
        retries = 2  # Number of retries if we don't get enough articles
        min_articles = MIN_ARTICLES  # Start with default minimum

        while retries > 0 and len(all_articles) < min_articles:
            # Check for timeout
            if time.time() - self.start_time > self.timeout:
                print(f"\nTimeout reached after {self.timeout} seconds. Proceeding with available articles.")
                break

            for source, url_template in NEWS_SOURCES.items():
                try:
                    url = url_template.format(company_name.replace(" ", "+"))
                    print(f"\nSearching {source} for news about {company_name}...")

                    # Try different page numbers for more articles
                    for page in range(2):  # Try first two pages
                        # Check for timeout again
                        if time.time() - self.start_time > self.timeout:
                            break

                        page_url = url
                        if page > 0:
                            if source == "google":
                                page_url += f"&start={page * 10}"
                            elif source == "bing":
                                page_url += f"&first={page * 10 + 1}"
                            elif source == "yahoo":
                                page_url += f"&b={page * 10 + 1}"
                            elif source == "reuters":
                                page_url += f"&page={page + 1}"
                            elif source == "marketwatch":
                                page_url += f"&page={page + 1}"
                            elif source == "investing":
                                page_url += f"&page={page + 1}"
                            elif source == "techcrunch":
                                page_url += f"/page/{page + 1}"
                            elif source == "zdnet":
                                page_url += f"&page={page + 1}"

                        response = requests.get(page_url, headers=self.headers, timeout=15)
                        if response.status_code != 200:
                            print(f"Error: {source} page {page+1} returned status code {response.status_code}")
                            continue

                        soup = BeautifulSoup(response.content, 'html.parser')
                        source_articles = []

                        if source == "google":
                            source_articles = self._parse_google_news(soup)
                        elif source == "bing":
                            source_articles = self._parse_bing_news(soup)
                        elif source == "yahoo":
                            source_articles = self._parse_yahoo_news(soup)
                        elif source == "reuters":
                            source_articles = self._parse_reuters_news(soup)
                        elif source == "marketwatch":
                            source_articles = self._parse_marketwatch_news(soup)
                        elif source == "investing":
                            source_articles = self._parse_investing_news(soup)
                        elif source == "techcrunch":
                            source_articles = self._parse_techcrunch_news(soup)
                        elif source == "zdnet":
                            source_articles = self._parse_zdnet_news(soup)

                        # Limit articles per source
                        if source_articles:
                            source_articles = source_articles[:MAX_ARTICLES_PER_SOURCE]
                            all_articles.extend(source_articles)
                            print(f"Found {len(source_articles)} articles from {source} page {page+1}")

                        # If we have enough articles, break the page loop
                        if len(all_articles) >= min_articles:
                            break
                except Exception as e:
                    print(f"Error fetching from {source}: {str(e)}")
                    continue

                # If we have enough articles, break the source loop
                if len(all_articles) >= min_articles:
                    break

            retries -= 1
            if len(all_articles) < min_articles and retries > 0:
                print(f"\nFound only {len(all_articles)} articles, retrying...")
                # Lower the minimum requirement if we're close
                if len(all_articles) >= 15:  # If we have at least 15 articles
                    min_articles = len(all_articles)
                    print(f"Adjusting minimum requirement to {min_articles} articles")

        # Remove duplicates
        unique_articles = self._remove_duplicates(all_articles)
        print(f"\nFound {len(unique_articles)} unique articles")

        if len(unique_articles) < MIN_ARTICLES:
            print(f"Warning: Could only find {len(unique_articles)} unique articles, fewer than minimum {MIN_ARTICLES}")
            print("Proceeding with available articles...")

        # Balance articles across sources
        balanced_articles = self._balance_sources(unique_articles)
        # Cap the result at MAX_ARTICLES
        return balanced_articles[:MAX_ARTICLES]
    def _balance_sources(self, articles: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Balance articles across sources while maintaining minimum count."""
        source_articles = {}

        # Group articles by source
        for article in articles:
            source = article['source']
            if source not in source_articles:
                source_articles[source] = []
            source_articles[source].append(article)

        # Nothing to balance if no articles were found
        if not source_articles:
            return []

        # Calculate target articles per source
        total_sources = len(source_articles)
        target_per_source = max(MIN_ARTICLES // total_sources,
                                MAX_ARTICLES_PER_SOURCE)

        # Get articles from each source
        balanced = []
        for source, articles_list in source_articles.items():
            balanced.extend(articles_list[:target_per_source])

        # If we still need more articles to meet minimum, add more from sources
        # that have additional articles
        if len(balanced) < MIN_ARTICLES:
            remaining = []
            for articles_list in source_articles.values():
                remaining.extend(articles_list[target_per_source:])

            # Sort remaining by source to maintain balance
            remaining.sort(key=lambda x: len([a for a in balanced if a['source'] == x['source']]))

            while len(balanced) < MIN_ARTICLES and remaining:
                balanced.append(remaining.pop(0))

        return balanced
    def _parse_google_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Google News search results."""
        articles = []
        for div in soup.find_all(['div', 'article'], class_=['g', 'xuvV6b', 'WlydOe']):
            try:
                title_elem = div.find(['h3', 'h4'])
                snippet_elem = div.find('div', class_=['VwiC3b', 'yy6M1d'])
                link_elem = div.find('a')
                source_elem = div.find(['div', 'span'], class_='UPmit')

                if title_elem and snippet_elem and link_elem:
                    source = source_elem.get_text(strip=True) if source_elem else 'Google News'
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True),
                        'url': link_elem['href'],
                        'source': source
                    })
            except Exception as e:
                print(f"Error parsing Google article: {str(e)}")
                continue
        return articles
    def _parse_bing_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Bing News search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['news-card', 'newsitem', 'item-info']):
            try:
                title_elem = article.find(['a', 'h3'], class_=['title', 'news-card-title'])
                snippet_elem = article.find(['div', 'p'], class_=['snippet', 'description'])
                source_elem = article.find(['div', 'span'], class_=['source', 'provider'])

                if title_elem and snippet_elem:
                    source = source_elem.get_text(strip=True) if source_elem else 'Bing News'
                    url = title_elem['href'] if 'href' in title_elem.attrs else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True),
                        'url': url,
                        'source': source
                    })
            except Exception as e:
                print(f"Error parsing Bing article: {str(e)}")
        return articles
    def _parse_yahoo_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Yahoo News search results."""
        articles = []
        for article in soup.find_all('div', class_='NewsArticle'):
            try:
                title_elem = article.find(['h4', 'h3', 'a'])
                snippet_elem = article.find('p')
                source_elem = article.find(['span', 'div'], class_=['provider', 'source'])

                if title_elem and snippet_elem:
                    source = source_elem.get_text(strip=True) if source_elem else 'Yahoo News'
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True),
                        'url': url,
                        'source': source
                    })
            except Exception as e:
                print(f"Error parsing Yahoo article: {str(e)}")
        return articles
    def _parse_reuters_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Reuters search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['search-result-content', 'story']):
            try:
                title_elem = article.find(['h3', 'a'], class_='story-title')
                snippet_elem = article.find(['p', 'div'], class_=['story-description', 'description'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    if url and not url.startswith('http'):
                        url = 'https://www.reuters.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'Reuters'
                    })
            except Exception as e:
                print(f"Error parsing Reuters article: {str(e)}")
        return articles
    def _parse_marketwatch_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse MarketWatch search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['element--article', 'article__content']):
            try:
                title_elem = article.find(['h3', 'h2'], class_=['article__headline', 'title'])
                snippet_elem = article.find('p', class_=['article__summary', 'description'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'MarketWatch'
                    })
            except Exception as e:
                print(f"Error parsing MarketWatch article: {str(e)}")
        return articles
    def _parse_investing_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Investing.com search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['articleItem', 'news-item']):
            try:
                title_elem = article.find(['a', 'h3'], class_=['title', 'articleTitle'])
                snippet_elem = article.find(['p', 'div'], class_=['description', 'articleContent'])

                if title_elem:
                    if 'href' in title_elem.attrs:
                        url = title_elem['href']
                    else:
                        link_elem = title_elem.find('a')
                        url = link_elem['href'] if link_elem else ''
                    if url and not url.startswith('http'):
                        url = 'https://www.investing.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'Investing.com'
                    })
            except Exception as e:
                print(f"Error parsing Investing.com article: {str(e)}")
        return articles
    def _parse_techcrunch_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse TechCrunch search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['post-block', 'article-block']):
            try:
                title_elem = article.find(['h2', 'h3', 'a'], class_=['post-block__title', 'article-title'])
                snippet_elem = article.find(['div', 'p'], class_=['post-block__content', 'article-content'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'TechCrunch'
                    })
            except Exception as e:
                print(f"Error parsing TechCrunch article: {str(e)}")
        return articles
    def _parse_zdnet_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse ZDNet search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['item', 'article']):
            try:
                title_elem = article.find(['h3', 'a'], class_=['title', 'headline'])
                snippet_elem = article.find(['p', 'div'], class_=['summary', 'content'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    if url and not url.startswith('http'):
                        url = 'https://www.zdnet.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'ZDNet'
                    })
            except Exception as e:
                print(f"Error parsing ZDNet article: {str(e)}")
        return articles
    def _remove_duplicates(self, articles: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Remove duplicate articles based on title similarity."""
        unique_articles = []
        seen_titles = set()

        for article in articles:
            title = article['title'].lower()
            if not any(title in seen_title or seen_title in title for seen_title in seen_titles):
                unique_articles.append(article)
                seen_titles.add(title)

        return unique_articles

class SentimentAnalyzer:
    def __init__(self):
        try:
            # Primary financial sentiment model
            self.sentiment_pipeline = pipeline("sentiment-analysis",
                                               model=SENTIMENT_MODEL)

            # Initialize fine-grained sentiment models
            self.fine_grained_models = {}
            try:
                # Initialize the default fine-grained model for backward compatibility
                self.fine_grained_sentiment = pipeline("sentiment-analysis",
                                                       model=SENTIMENT_FINE_GRAINED_MODEL)

                # Initialize additional fine-grained models
                for model_name, model_path in FINE_GRAINED_MODELS.items():
                    try:
                        print(f"Loading fine-grained model: {model_name}")
                        self.fine_grained_models[model_name] = pipeline("sentiment-analysis",
                                                                        model=model_path)
                    except Exception as e:
                        print(f"Error loading fine-grained model {model_name}: {str(e)}")
            except Exception as e:
                print(f"Error initializing fine-grained models: {str(e)}")
                self.fine_grained_sentiment = None

            # Initialize additional sentiment analyzers if available
            self.has_textblob = False
            self.has_vader = False

            try:
                from textblob import TextBlob
                self.TextBlob = TextBlob
                self.has_textblob = True
            except ImportError:
                print("TextBlob not available. Install with: pip install textblob")

            try:
                from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
                self.vader = SentimentIntensityAnalyzer()
                self.has_vader = True
            except ImportError:
                print("VADER not available. Install with: pip install vaderSentiment")

            self.summarizer = pipeline("summarization",
                                       model=SUMMARIZATION_MODEL)
            self.vectorizer = TfidfVectorizer(stop_words='english',
                                              max_features=10)

            # Initialize NER pipeline if spaCy is available
            try:
                import spacy
                self.nlp = spacy.load("en_core_web_sm")
                self.has_ner = True
            except Exception:
                self.has_ner = False
                print("spaCy not available for NER. Install with: pip install spacy && python -m spacy download en_core_web_sm")
        except Exception as e:
            print(f"Error initializing sentiment models: {str(e)}")
            # Fall back to default models if specific models fail
            self.sentiment_pipeline = pipeline("sentiment-analysis")
            self.fine_grained_sentiment = None
            self.fine_grained_models = {}
            self.summarizer = pipeline("summarization")
            self.vectorizer = TfidfVectorizer(stop_words='english', max_features=10)
            self.has_ner = False
            self.has_textblob = False
            self.has_vader = False
    def analyze(self, text: str) -> str:
        """Analyze sentiment of text and return sentiment label."""
        try:
            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(text)
            return sentiment_analysis['ensemble_sentiment']
        except Exception as e:
            print(f"Error in sentiment analysis: {str(e)}")
            return 'neutral'  # Default to neutral on error
    def get_overall_sentiment(self, articles: List[Dict[str, Any]]) -> str:
        """Get overall sentiment from a list of articles."""
        try:
            # Combine all article texts
            combined_text = ' '.join([
                f"{article.get('title', '')} {article.get('content', '')}"
                for article in articles
            ])

            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(combined_text)
            return sentiment_analysis['ensemble_sentiment']
        except Exception as e:
            print(f"Error getting overall sentiment: {str(e)}")
            return 'neutral'  # Default to neutral on error
    def analyze_article(self, article: Dict[str, str]) -> Dict[str, Any]:
        """Analyze sentiment and generate summary for an article."""
        try:
            # Get the full text by combining title and content
            full_text = f"{article['title']} {article['content']}"

            # Generate summary
            summary = self.summarize_text(full_text)

            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(full_text)
            sentiment_label = sentiment_analysis['ensemble_sentiment']
            sentiment_score = sentiment_analysis['ensemble_score']

            # Add fine-grained sentiment analysis
            fine_grained_sentiment = self._get_fine_grained_sentiment(full_text)

            # Extract key topics
            topics = self.extract_topics(full_text)

            # Extract named entities
            entities = self._extract_entities(full_text)

            # Extract sentiment targets (entities associated with sentiment)
            sentiment_targets = self._extract_sentiment_targets(full_text, entities)

            # Add analysis to article
            analyzed_article = article.copy()
            analyzed_article.update({
                'summary': summary,
                'sentiment': sentiment_label,
                'sentiment_score': sentiment_score,
                'sentiment_details': sentiment_analysis,
                'fine_grained_sentiment': fine_grained_sentiment,
                'topics': topics,
                'entities': entities,
                'sentiment_targets': sentiment_targets,
                'sentiment_indices': fine_grained_sentiment.get('indices', {}),
                'analysis_timestamp': datetime.now().isoformat()
            })

            return analyzed_article
        except Exception as e:
            print(f"Error analyzing article: {str(e)}")
            # Return original article with default values if analysis fails
            article.update({
                'summary': article.get('content', '')[:200] + '...',
                'sentiment': 'neutral',
                'sentiment_score': 0.0,
                'sentiment_details': {},
                'fine_grained_sentiment': {},
                'topics': [],
                'entities': {},
                'sentiment_targets': [],
                'sentiment_indices': {
                    'positivity_index': 0.5,
                    'negativity_index': 0.5,
                    'emotional_intensity': 0.0,
                    'controversy_score': 0.0,
                    'confidence_score': 0.0,
                    'esg_relevance': 0.0
                },
                'analysis_timestamp': datetime.now().isoformat()
            })
            return article
    def _get_ensemble_sentiment(self, text: str) -> Dict[str, Any]:
        """Get ensemble sentiment by combining multiple sentiment models."""
        # Initialize with default values
        ensemble_result = {
            'ensemble_sentiment': 'neutral',
            'ensemble_score': 0.5,
            'models': {}
        }

        try:
            # 1. Primary transformer model (finbert)
            try:
                primary_result = self.sentiment_pipeline(text[:512])[0]  # Limit text length
                primary_label = primary_result['label'].lower()
                primary_score = primary_result['score']

                # Map to standard format
                if primary_label == 'positive':
                    primary_normalized = primary_score
                elif primary_label == 'negative':
                    primary_normalized = 1 - primary_score
                else:  # neutral
                    primary_normalized = 0.5

                ensemble_result['models']['transformer'] = {
                    'sentiment': primary_label,
                    'score': round(primary_score, 3),
                    'normalized_score': round(primary_normalized, 3)
                }
            except Exception:
                ensemble_result['models']['transformer'] = {
                    'sentiment': 'error',
                    'score': 0,
                    'normalized_score': 0.5
                }

            # 2. TextBlob sentiment
            if self.has_textblob:
                try:
                    blob = self.TextBlob(text)
                    polarity = blob.sentiment.polarity

                    # Convert to standard format
                    if polarity > 0.1:
                        textblob_sentiment = 'positive'
                        textblob_score = polarity
                    elif polarity < -0.1:
                        textblob_sentiment = 'negative'
                        textblob_score = abs(polarity)
                    else:
                        textblob_sentiment = 'neutral'
                        textblob_score = 0.5

                    # Normalize to 0-1 scale
                    textblob_normalized = (polarity + 1) / 2

                    ensemble_result['models']['textblob'] = {
                        'sentiment': textblob_sentiment,
                        'score': round(textblob_score, 3),
                        'normalized_score': round(textblob_normalized, 3)
                    }
                except Exception:
                    ensemble_result['models']['textblob'] = {
                        'sentiment': 'error',
                        'score': 0,
                        'normalized_score': 0.5
                    }

            # 3. VADER sentiment
            if self.has_vader:
                try:
                    vader_scores = self.vader.polarity_scores(text)
                    compound = vader_scores['compound']

                    # Convert to standard format
                    if compound > 0.05:
                        vader_sentiment = 'positive'
                        vader_score = compound
                    elif compound < -0.05:
                        vader_sentiment = 'negative'
                        vader_score = abs(compound)
                    else:
                        vader_sentiment = 'neutral'
                        vader_score = 0.5

                    # Normalize to 0-1 scale
                    vader_normalized = (compound + 1) / 2

                    ensemble_result['models']['vader'] = {
                        'sentiment': vader_sentiment,
                        'score': round(vader_score, 3),
                        'normalized_score': round(vader_normalized, 3)
                    }
                except Exception:
                    ensemble_result['models']['vader'] = {
                        'sentiment': 'error',
                        'score': 0,
                        'normalized_score': 0.5
                    }

            # Calculate ensemble result from all normalized scores
            normalized_scores = []
            for model_name, model_result in ensemble_result['models'].items():
                if model_result['sentiment'] != 'error':
                    normalized_scores.append(model_result['normalized_score'])

            # Calculate average if we have scores
            if normalized_scores:
                avg_score = sum(normalized_scores) / len(normalized_scores)

                # Convert to sentiment label
                if avg_score > 0.6:
                    ensemble_sentiment = 'positive'
                elif avg_score < 0.4:
                    ensemble_sentiment = 'negative'
                else:
                    ensemble_sentiment = 'neutral'

                ensemble_result['ensemble_sentiment'] = ensemble_sentiment
                ensemble_result['ensemble_score'] = round(avg_score, 3)

                # Add confidence level
                if len(normalized_scores) > 1:
                    # Calculate standard deviation to measure agreement
                    std_dev = statistics.stdev(normalized_scores)
                    agreement = 1 - (std_dev * 2)  # Lower std_dev means higher agreement
                    agreement = max(0, min(1, agreement))  # Clamp to 0-1
                    ensemble_result['model_agreement'] = round(agreement, 3)

            return ensemble_result
        except Exception as e:
            print(f"Error in ensemble sentiment analysis: {str(e)}")
            return {
                'ensemble_sentiment': 'neutral',
                'ensemble_score': 0.5,
                'models': {}
            }
    def _get_fine_grained_sentiment(self, text: str) -> Dict[str, Any]:
        """Get fine-grained sentiment analysis with more detailed categories."""
        # Initialize result structure
        result = {
            "primary": {"category": "unknown", "confidence": 0.0},
            "models": {}
        }

        # Check if we have any fine-grained models
        if not self.fine_grained_sentiment and not self.fine_grained_models:
            return result

        try:
            # Split text into manageable chunks if too long
            chunks = self._split_text(text)

            # Process with default fine-grained model for backward compatibility
            if self.fine_grained_sentiment:
                primary_results = []
                for chunk in chunks:
                    if not chunk.strip():
                        continue
                    chunk_result = self.fine_grained_sentiment(chunk)[0]
                    primary_results.append(chunk_result)

                if primary_results:
                    # Aggregate results from all chunks
                    categories = {}
                    for res in primary_results:
                        label = res['label'].lower()
                        score = res['score']
                        if label in categories:
                            categories[label] += score
                        else:
                            categories[label] = score

                    # Normalize scores
                    total = sum(categories.values())
                    if total > 0:
                        categories = {k: round(v / total, 3) for k, v in categories.items()}

                    # Get dominant category
                    dominant_category = max(categories.items(), key=lambda x: x[1])

                    result["primary"] = {
                        "category": dominant_category[0],
                        "confidence": dominant_category[1],
                        "distribution": categories
                    }

            # Process with additional fine-grained models
            for model_name, model in self.fine_grained_models.items():
                model_results = []
                for chunk in chunks:
                    if not chunk.strip():
                        continue
                    try:
                        chunk_result = model(chunk)[0]
                        model_results.append(chunk_result)
                    except Exception as e:
                        print(f"Error analyzing chunk with model {model_name}: {str(e)}")

                if model_results:
                    # Aggregate results from all chunks
                    categories = {}
                    for res in model_results:
                        # Ensure the label is lowercase for consistency
                        label = res['label'].lower() if isinstance(res.get('label'), str) else "unknown"
                        score = res['score']
                        if label in categories:
                            categories[label] += score
                        else:
                            categories[label] = score

                    # Normalize scores
                    total = sum(categories.values())
                    if total > 0:
                        categories = {k: round(v / total, 3) for k, v in categories.items()}

                    # Get dominant category
                    dominant_category = max(categories.items(), key=lambda x: x[1])

                    # Store results for this model
                    result["models"][model_name] = {
                        "category": dominant_category[0],
                        "confidence": dominant_category[1],
                        "distribution": categories
                    }

            # Calculate sentiment indices based on the fine-grained results
            result["indices"] = self._calculate_sentiment_indices(result)

            return result
        except Exception as e:
            print(f"Error in fine-grained sentiment analysis: {str(e)}")
            return result
    def _calculate_sentiment_indices(self, fine_grained_results: Dict[str, Any]) -> Dict[str, float]:
        """Calculate various sentiment indices based on fine-grained sentiment analysis."""
        indices = {
            "positivity_index": 0.5,  # Default neutral value
            "negativity_index": 0.5,
            "emotional_intensity": 0.0,
            "controversy_score": 0.0,
            "confidence_score": 0.0,
            "esg_relevance": 0.0
        }

        try:
            # Extract distributions from all models
            distributions = {}
            confidence_scores = {}

            # Add primary model if available
            if "category" in fine_grained_results.get("primary", {}):
                if "distribution" in fine_grained_results["primary"]:
                    distributions["primary"] = fine_grained_results["primary"]["distribution"]
                confidence_scores["primary"] = fine_grained_results["primary"].get("confidence", 0.0)

            # Add other models
            for model_name, model_result in fine_grained_results.get("models", {}).items():
                if "distribution" in model_result:
                    distributions[model_name] = model_result["distribution"]
                    confidence_scores[model_name] = model_result.get("confidence", 0.0)

            # Calculate positivity index
            positive_scores = []
            for model_name, dist in distributions.items():
                if model_name in ("financial", "primary", "news_tone", "aspect"):
                    pos_score = dist.get("positive", 0.0)
                    positive_scores.append(pos_score)
                elif model_name == "emotion":
                    # For emotion model, consider joy (and half-weighted surprise) as positive
                    pos_score = dist.get("joy", 0.0) + dist.get("surprise", 0.0) * 0.5
                    positive_scores.append(pos_score)

            if positive_scores:
                indices["positivity_index"] = round(sum(positive_scores) / len(positive_scores), 3)

            # Calculate negativity index
            negative_scores = []
            for model_name, dist in distributions.items():
                if model_name in ("financial", "primary", "news_tone", "aspect"):
                    neg_score = dist.get("negative", 0.0)
                    negative_scores.append(neg_score)
                elif model_name == "emotion":
                    # For emotion model, consider sadness, anger, fear, disgust as negative
                    neg_score = dist.get("sadness", 0.0) + dist.get("anger", 0.0) + \
                                dist.get("fear", 0.0) + dist.get("disgust", 0.0)
                    negative_scores.append(neg_score / 4)  # Average of 4 negative emotions

            if negative_scores:
                indices["negativity_index"] = round(sum(negative_scores) / len(negative_scores), 3)

            # Calculate emotional intensity
            emotion_dist = distributions.get("emotion", {})
            if emotion_dist:
                # Sum all emotional intensities except neutral
                emotional_sum = sum(v for k, v in emotion_dist.items() if k != "neutral")
                indices["emotional_intensity"] = round(emotional_sum, 3)

            # Calculate controversy score (high when both positive and negative are high)
            indices["controversy_score"] = round(indices["positivity_index"] * indices["negativity_index"] * 4, 3)

            # Calculate confidence score (average of all model confidences)
            if confidence_scores:
                indices["confidence_score"] = round(sum(confidence_scores.values()) / len(confidence_scores), 3)

            # Calculate ESG relevance if available
            esg_dist = distributions.get("esg", {})
            if esg_dist:
                # Sum of all ESG categories
                esg_sum = sum(v for k, v in esg_dist.items() if k in ["environmental", "social", "governance"])
                indices["esg_relevance"] = round(esg_sum, 3)

            return indices
        except Exception as e:
            print(f"Error calculating sentiment indices: {str(e)}")
            return indices
    def summarize_text(self, text: str) -> str:
        """Generate a concise summary of the text."""
        try:
            # Clean and prepare text
            text = text.replace('\n', ' ').strip()

            # For very short texts, return as is
            if len(text.split()) < 30:
                return text

            # Split text into chunks if it's too long
            chunks = self._split_text(text)
            summaries = []

            for chunk in chunks:
                # Calculate appropriate max_length based on input length
                input_words = len(chunk.split())
                max_length = min(130, max(30, input_words // 2))
                min_length = min(30, max(10, input_words // 4))

                # Generate summary for each chunk
                summary = self.summarizer(chunk,
                                          max_length=max_length,
                                          min_length=min_length,
                                          do_sample=False)[0]['summary_text']
                summaries.append(summary)

            # Combine summaries if there were multiple chunks
            final_summary = ' '.join(summaries)
            return final_summary
        except Exception as e:
            print(f"Error generating summary: {str(e)}")
            return text[:200] + '...'  # Return truncated text as fallback
    def extract_topics(self, text: str) -> List[str]:
        """Extract key topics from the text using TF-IDF."""
        try:
            # Prepare text
            text = text.lower()

            # Fit and transform the text
            tfidf_matrix = self.vectorizer.fit_transform([text])

            # Get feature names and scores
            feature_names = self.vectorizer.get_feature_names_out()
            scores = tfidf_matrix.toarray()[0]

            # Get top topics
            top_indices = scores.argsort()[-5:][::-1]  # Get top 5 topics
            topics = [feature_names[i] for i in top_indices]

            return topics
        except Exception as e:
            print(f"Error extracting topics: {str(e)}")
            return []
    def _split_text(self, text: str, max_length: int = 1024) -> List[str]:
        """Split text into chunks that fit within the model's maximum token limit."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for space
            if current_length + word_length > max_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks
    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract named entities from text."""
        entities = {
            'PERSON': [],
            'ORG': [],
            'GPE': [],  # Countries, cities, states
            'MONEY': [],
            'PERCENT': [],
            'DATE': []
        }

        if not self.has_ner:
            return entities

        try:
            # Process text with spaCy
            doc = self.nlp(text[:10000])  # Limit text length for performance

            # Extract entities
            for ent in doc.ents:
                if ent.label_ in entities:
                    # Clean entity text and deduplicate
                    clean_text = ent.text.strip()
                    if clean_text and clean_text not in entities[ent.label_]:
                        entities[ent.label_].append(clean_text)

            return entities
        except Exception as e:
            print(f"Error extracting entities: {str(e)}")
            return entities
    def _extract_sentiment_targets(self, text: str, entities: Dict[str, List[str]]) -> List[Dict[str, Any]]:
        """Extract entities that are targets of sentiment expressions."""
        if not self.has_ner:
            return []

        try:
            # Get all entities as a flat list
            all_entities = []
            for entity_type, entity_list in entities.items():
                for entity in entity_list:
                    all_entities.append({
                        'text': entity,
                        'type': entity_type
                    })

            # Find sentiment targets
            targets = []

            # Split text into sentences
            doc = self.nlp(text[:10000])  # Limit text length

            for sentence in doc.sents:
                # Skip short sentences
                if len(sentence.text.split()) < 3:
                    continue

                # Check for sentiment in this sentence
                try:
                    sentiment = self.sentiment_pipeline(sentence.text)[0]

                    # Only process if sentiment is strong
                    if sentiment['score'] > 0.7:
                        # Find entities in this sentence
                        for entity in all_entities:
                            if entity['text'] in sentence.text:
                                targets.append({
                                    'entity': entity['text'],
                                    'type': entity['type'],
                                    'sentiment': sentiment['label'].lower(),
                                    'confidence': round(sentiment['score'], 3),
                                    'context': sentence.text
                                })
                except Exception:
                    continue

            # Return unique targets
            unique_targets = []
            seen = set()
            for target in targets:
                key = f"{target['entity']}_{target['sentiment']}"
                if key not in seen:
                    seen.add(key)
                    unique_targets.append(target)

            return unique_targets
        except Exception as e:
            print(f"Error extracting sentiment targets: {str(e)}")
            return []

class TextSummarizer:
    def __init__(self):
        try:
            # Initialize the summarization pipeline
            self.summarizer = pipeline("summarization", model=SUMMARIZATION_MODEL)
        except Exception as e:
            print(f"Error initializing TextSummarizer: {str(e)}")
            # Fall back to the default model if the specific model fails
            self.summarizer = pipeline("summarization")

    def summarize(self, text: str) -> str:
        """Generate a concise summary of the text."""
        try:
            # Clean and prepare text
            text = text.replace('\n', ' ').strip()

            # For very short texts, return as is
            if len(text.split()) < 30:
                return text

            # Split text into chunks if it's too long
            chunks = self._split_text(text)
            summaries = []

            for chunk in chunks:
                # Calculate appropriate max_length based on input length
                input_words = len(chunk.split())
                max_length = min(130, max(30, input_words // 2))
                min_length = min(30, max(10, input_words // 4))

                # Generate summary for each chunk
                summary = self.summarizer(chunk,
                                          max_length=max_length,
                                          min_length=min_length,
                                          do_sample=False)[0]['summary_text']
                summaries.append(summary)

            # Combine summaries if there were multiple chunks
            final_summary = ' '.join(summaries)
            return final_summary
        except Exception as e:
            print(f"Error generating summary: {str(e)}")
            return text[:200] + '...'  # Return truncated text as fallback

    def _split_text(self, text: str, max_length: int = 1024) -> List[str]:
        """Split text into chunks that fit within the model's maximum token limit."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for space
            if current_length + word_length > max_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

class TextToSpeechConverter:
    def __init__(self):
        self.output_dir = AUDIO_OUTPUT_DIR
        self.translator = get_translator()
        os.makedirs(self.output_dir, exist_ok=True)

    def generate_audio(self, text: str, filename: str) -> str:
        """Convert text to Hindi speech and save as an audio file."""
        try:
            print(f"Translating text to Hindi: {text[:100]}...")

            # First translate the text to Hindi.
            # Use chunking for long text to avoid translation limits.
            chunks = []
            for i in range(0, len(text), 1000):
                chunk = text[i:i + 1000]
                try:
                    translated_chunk = self.translator.translate(chunk, dest='hi').text
                    chunks.append(translated_chunk)
                    print(f"Translated chunk {i // 1000 + 1}")
                except Exception as e:
                    print(f"Error translating chunk {i // 1000 + 1}: {str(e)}")
                    # If translation fails, use original text
                    chunks.append(chunk)

            hindi_text = ' '.join(chunks)
            print(f"Translation complete. Hindi text length: {len(hindi_text)}")

            # Generate Hindi speech
            print("Generating Hindi speech...")
            tts = gTTS(text=hindi_text, lang='hi', slow=False)
            output_path = os.path.join(self.output_dir, f"{filename}.mp3")
            tts.save(output_path)
            print(f"Audio saved to {output_path}")

            return output_path
        except Exception as e:
            print(f"Error in TTS conversion: {str(e)}")
            # Fall back to the original English text if translation fails
            print("Using fallback English TTS")
            tts = gTTS(text=text, lang='en')
            output_path = os.path.join(self.output_dir, f"{filename}.mp3")
            tts.save(output_path)
            return output_path

class ComparativeAnalyzer:
    def __init__(self):
        pass

    def analyze_coverage(self, articles: List[Dict[str, Any]], company_name: str = None) -> Dict[str, Any]:
        """Perform comparative analysis across articles."""
        if not articles:
            return {
                "topics": [],
                "sentiment_distribution": {},
                "coverage_differences": ["No articles found for analysis."],
                "final_sentiment": "No articles found for analysis.",
                "total_articles": 0,
                "sentiment_indices": {}
            }

        # Debug: Print articles for analysis
        print(f"Analyzing {len(articles)} articles for company: {company_name}")

        # Add company name to each article if provided
        if company_name:
            for article in articles:
                article['company'] = company_name

        # Calculate sentiment distribution
        print("Calculating sentiment distribution...")
        sentiment_dist = self._get_sentiment_distribution(articles)
        print("Sentiment distribution result:")
        print(sentiment_dist)

        # Analyze common topics
        topics = self._analyze_topics(articles)

        # Analyze coverage differences
        differences = self._analyze_coverage_differences(articles)

        # Get final sentiment analysis
        final_sentiment = self._get_final_sentiment(sentiment_dist, articles)

        result = {
            "topics": topics,
            "sentiment_distribution": sentiment_dist,
            "coverage_differences": differences,
            "final_sentiment": final_sentiment,
            "total_articles": len(articles),
            "sentiment_indices": sentiment_dist.get("sentiment_indices", {})
        }

        # Debug: Print final result
        print("Final comparative analysis result:")
        print(result)

        return result
    def _get_sentiment_distribution(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate distribution of sentiments across articles."""
        # Basic sentiment distribution
        basic_distribution = {'positive': 0, 'negative': 0, 'neutral': 0}

        # Fine-grained sentiment distribution
        fine_grained_distribution = {}

        # Sentiment scores
        sentiment_scores = []

        # Sentiment indices aggregation
        sentiment_indices = {
            "positivity_index": [],
            "negativity_index": [],
            "emotional_intensity": [],
            "controversy_score": [],
            "confidence_score": [],
            "esg_relevance": []
        }

        # Debug: Print articles for sentiment distribution
        print(f"Processing {len(articles)} articles for sentiment distribution")

        # Process each article
        for i, article in enumerate(articles):
            try:
                # Debug: Print article sentiment data
                print(f"Article {i+1} sentiment data:")
                print(f"  Basic sentiment: {article.get('sentiment', 'N/A')}")
                print(f"  Fine-grained: {article.get('fine_grained_sentiment', {})}")
                print(f"  Sentiment indices: {article.get('sentiment_indices', {})}")

                # Basic sentiment
                sentiment = article.get('sentiment', 'neutral')
                if isinstance(sentiment, str):
                    sentiment = sentiment.lower()
                    # Ensure we have a valid sentiment category
                    if sentiment not in basic_distribution:
                        sentiment = 'neutral'
                    basic_distribution[sentiment] = basic_distribution.get(sentiment, 0) + 1
                else:
                    # Handle non-string sentiment values
                    basic_distribution['neutral'] = basic_distribution.get('neutral', 0) + 1

                # Sentiment score
                score = article.get('sentiment_score', 0.0)
                if isinstance(score, (int, float)):
                    sentiment_scores.append(score)

                # Fine-grained sentiment (the analyzer nests the dominant label under 'primary')
                fine_grained = article.get('fine_grained_sentiment', {})
                if isinstance(fine_grained, dict):
                    category = fine_grained.get('category')
                    if not category and isinstance(fine_grained.get('primary'), dict):
                        category = fine_grained['primary'].get('category')
                    if isinstance(category, str):
                        category = category.lower()
                        fine_grained_distribution[category] = fine_grained_distribution.get(category, 0) + 1

                # Collect sentiment indices
                indices = article.get('sentiment_indices', {})
                if isinstance(indices, dict):
                    for index_name, index_values in sentiment_indices.items():
                        if index_name in indices and isinstance(indices[index_name], (int, float)):
                            index_values.append(indices[index_name])
            except Exception as e:
                print(f"Error processing article {i+1} for sentiment distribution: {str(e)}")
                # Continue with next article
                continue

        # Debug: Print collected data
        print("Collected sentiment data:")
        print(f"  Basic distribution: {basic_distribution}")
        print(f"  Fine-grained distribution: {fine_grained_distribution}")
        print(f"  Sentiment scores: {sentiment_scores}")
        print(f"  Sentiment indices collected: {sentiment_indices}")

        # Calculate average sentiment score with fallback
        avg_sentiment_score = 0.5  # Default neutral value
        if sentiment_scores:
            avg_sentiment_score = sum(sentiment_scores) / len(sentiment_scores)

        # Calculate sentiment volatility (standard deviation) with fallback
        sentiment_volatility = 0
        if len(sentiment_scores) > 1:
            try:
                sentiment_volatility = statistics.stdev(sentiment_scores)
            except Exception as e:
                print(f"Error calculating sentiment volatility: {str(e)}")

        # Calculate average sentiment indices with fallbacks
        avg_indices = {}
        for index_name, values in sentiment_indices.items():
            if values:
                avg_indices[index_name] = round(sum(values) / len(values), 3)
            else:
                # Provide default values for empty indices
                if index_name in ["positivity_index", "confidence_score"]:
                    avg_indices[index_name] = 0.5  # Neutral default
                else:
                    avg_indices[index_name] = 0.0  # Zero default for other indices

        # Ensure all expected indices exist
        for index_name in ["positivity_index", "negativity_index", "emotional_intensity",
                           "controversy_score", "confidence_score", "esg_relevance"]:
            if index_name not in avg_indices:
                avg_indices[index_name] = 0.5 if index_name in ["positivity_index", "confidence_score"] else 0.0

        # Ensure we have at least one item in each distribution
        if not any(basic_distribution.values()):
            basic_distribution['neutral'] = 1

        # Ensure fine_grained_distribution has at least one entry if empty
        if not fine_grained_distribution:
            fine_grained_distribution['neutral'] = 1

        result = {
            "basic": basic_distribution,
            "fine_grained": fine_grained_distribution,
            "avg_score": round(avg_sentiment_score, 3),
            "volatility": round(sentiment_volatility, 3),
            "sentiment_indices": avg_indices
        }

        # Debug: Print final sentiment distribution result
        print("Final sentiment distribution result:")
        print(result)

        return result
    def _analyze_topics(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Analyze common topics across articles using TF-IDF."""
        try:
            # Combine title and content for better topic extraction
            texts = [f"{article.get('title', '')} {article.get('content', '')}" for article in articles]

            # Create and fit TF-IDF
            vectorizer = TfidfVectorizer(
                max_features=10,
                stop_words='english',
                ngram_range=(1, 2),
                token_pattern=r'(?u)\b[A-Za-z][A-Za-z+\'-]*[A-Za-z]+\b'  # Improved pattern
            )

            # Clean and normalize texts
            cleaned_texts = []
            for text in texts:
                # Remove numbers and special characters
                cleaned = re.sub(r'\d+', '', text)
                cleaned = re.sub(r'[^\w\s]', ' ', cleaned)
                cleaned_texts.append(cleaned.lower())

            tfidf_matrix = vectorizer.fit_transform(cleaned_texts)
            feature_names = vectorizer.get_feature_names_out()

            # Get average TF-IDF scores for each term
            avg_scores = tfidf_matrix.mean(axis=0).A1

            # Sort terms by score and return top meaningful terms
            sorted_indices = avg_scores.argsort()[-5:][::-1]
            meaningful_topics = []

            for idx in sorted_indices:
                topic = feature_names[idx]
                # Filter out single characters and common words
                if len(topic) > 1 and topic not in {'000', 'com', 'said', 'says', 'year', 'new', 'one'}:
                    meaningful_topics.append(topic)
                    if len(meaningful_topics) >= 5:
                        break

            return meaningful_topics
        except Exception as e:
            print(f"Error analyzing topics: {str(e)}")
            return []
    def _analyze_coverage_differences(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Analyze how coverage differs across articles."""
        if not articles:
            return ["No articles available for comparison"]

        differences = []

        # Compare sentiment differences
        sentiments = [article.get('sentiment', 'neutral').lower() for article in articles]
        unique_sentiments = set(sentiments)

        if len(unique_sentiments) > 1:
            pos_count = sentiments.count('positive')
            neg_count = sentiments.count('negative')
            neu_count = sentiments.count('neutral')

            if pos_count > 0 and neg_count > 0:
                differences.append(f"Coverage sentiment varies significantly: {pos_count} positive, {neg_count} negative, and {neu_count} neutral articles.")

        # Compare fine-grained sentiment differences
        # (the analyzer nests the dominant label under 'primary')
        fine_grained_categories = []
        for article in articles:
            fine_grained = article.get('fine_grained_sentiment', {})
            if isinstance(fine_grained, dict):
                category = fine_grained.get('category')
                if not category and isinstance(fine_grained.get('primary'), dict):
                    category = fine_grained['primary'].get('category')
                if isinstance(category, str):
                    fine_grained_categories.append(category.lower())

        unique_categories = set(fine_grained_categories)
        if len(unique_categories) > 2:  # More than 2 different categories
            category_counts = {}
            for category in fine_grained_categories:
                category_counts[category] = category_counts.get(category, 0) + 1

            top_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:3]
            categories_str = ", ".join([f"{cat} ({count})" for cat, count in top_categories])
            differences.append(f"Articles show diverse sentiment categories: {categories_str}")

        # Compare sentiment indices
        indices_differences = []
        positivity_values = []
        negativity_values = []
        controversy_values = []

        for article in articles:
            indices = article.get('sentiment_indices', {})
            if indices:
                if 'positivity_index' in indices:
                    positivity_values.append(indices['positivity_index'])
                if 'negativity_index' in indices:
                    negativity_values.append(indices['negativity_index'])
                if 'controversy_score' in indices:
                    controversy_values.append(indices['controversy_score'])

        # Check for high variance in positivity
        if positivity_values and len(positivity_values) > 1:
            if max(positivity_values) - min(positivity_values) > 0.4:
                indices_differences.append("Articles show significant variation in positivity levels")

        # Check for high variance in negativity
        if negativity_values and len(negativity_values) > 1:
            if max(negativity_values) - min(negativity_values) > 0.4:
                indices_differences.append("Articles show significant variation in negativity levels")

        # Check for high controversy scores
        if controversy_values:
            high_controversy = [v for v in controversy_values if v > 0.5]
            if high_controversy:
                indices_differences.append(f"{len(high_controversy)} articles show high controversy scores")

        if indices_differences:
            differences.append("Sentiment index analysis: " + "; ".join(indices_differences))

        # Compare source differences
        sources = [article.get('source', '').lower() for article in articles]
        source_counts = {}
        for source in sources:
            if source:
                source_counts[source] = source_counts.get(source, 0) + 1

        if len(source_counts) > 1:
            top_sources = sorted(source_counts.items(), key=lambda x: x[1], reverse=True)[:3]
            sources_str = ", ".join([f"{source} ({count})" for source, count in top_sources])
            differences.append(f"Coverage spans multiple sources: {sources_str}")

        # If no significant differences found
        if not differences:
            differences.append("Coverage is relatively consistent across articles")

        return differences
    def _get_final_sentiment(self, distribution: Dict[str, Any], articles: List[Dict[str, Any]]) -> str:
        """Generate final sentiment analysis based on distribution and article content."""
        try:
            # Get basic sentiment counts
            basic_dist = distribution.get('basic', {})
            positive_count = basic_dist.get('positive', 0)
            negative_count = basic_dist.get('negative', 0)
            neutral_count = basic_dist.get('neutral', 0)

            total_articles = positive_count + negative_count + neutral_count
            if total_articles == 0:
                return "No sentiment data available"

            # Calculate percentages
            positive_pct = (positive_count / total_articles) * 100
            negative_pct = (negative_count / total_articles) * 100
            neutral_pct = (neutral_count / total_articles) * 100

            # Get average sentiment score
            avg_score = distribution.get('avg_score', 0.5)

            # Get volatility
            volatility = distribution.get('volatility', 0)

            # Get sentiment indices
            indices = distribution.get('sentiment_indices', {})
            positivity_index = indices.get('positivity_index', 0.5)
            negativity_index = indices.get('negativity_index', 0.5)
            emotional_intensity = indices.get('emotional_intensity', 0)
            controversy_score = indices.get('controversy_score', 0)
            esg_relevance = indices.get('esg_relevance', 0)

            # Generate analysis text
            analysis = []

            # Overall sentiment
            if positive_pct > 60:
                analysis.append(f"Overall sentiment is predominantly positive ({positive_pct:.1f}%).")
            elif negative_pct > 60:
                analysis.append(f"Overall sentiment is predominantly negative ({negative_pct:.1f}%).")
            elif neutral_pct > 60:
                analysis.append(f"Overall sentiment is predominantly neutral ({neutral_pct:.1f}%).")
            elif positive_pct > negative_pct and positive_pct > neutral_pct:
                analysis.append(f"Overall sentiment leans positive ({positive_pct:.1f}%), with some mixed coverage.")
            elif negative_pct > positive_pct and negative_pct > neutral_pct:
                analysis.append(f"Overall sentiment leans negative ({negative_pct:.1f}%), with some mixed coverage.")
            else:
                analysis.append(f"Sentiment is mixed across sources (Positive: {positive_pct:.1f}%, Negative: {negative_pct:.1f}%, Neutral: {neutral_pct:.1f}%).")

            # Sentiment indices insights
            if positivity_index > 0.7:
                analysis.append(f"High positivity index ({positivity_index:.2f}) indicates strong positive sentiment.")
            elif positivity_index < 0.3 and negativity_index > 0.7:
                analysis.append(f"High negativity index ({negativity_index:.2f}) with low positivity suggests strongly negative coverage.")

            if emotional_intensity > 0.6:
                analysis.append(f"Coverage shows high emotional intensity ({emotional_intensity:.2f}).")

            if controversy_score > 0.5:
                analysis.append(f"Coverage shows significant controversy ({controversy_score:.2f}), with polarized opinions.")

            if esg_relevance > 0.4:
                analysis.append(f"Coverage includes significant ESG-related content ({esg_relevance:.2f}).")

            # Volatility
            if volatility > 0.2:
                analysis.append(f"Sentiment varies considerably across articles (volatility: {volatility:.2f}).")
            else:
                analysis.append(f"Sentiment is relatively consistent across articles (volatility: {volatility:.2f}).")

            return " ".join(analysis)
        except Exception as e:
            print(f"Error generating final sentiment: {str(e)}")
            return "Unable to generate final sentiment analysis due to an error."