import gradio as gr
import json
import os
import logging
from datetime import datetime
from email.utils import parsedate_to_datetime

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from scraper import fetch_hazard_tweets, fetch_custom_tweets, get_available_hazards, get_available_locations
    from classifier import classify_tweets
    from pg_db import init_db, upsert_hazardous_tweet

    # Initialize database (optional - will work without it)
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.warning(f"Database initialization failed: {e}. App will work without database persistence.")
except ImportError as e:
    logger.error(f"Failed to import required modules: {e}")
    raise
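
# The local pg_db module is not shown in this file; judging from the call
# sites below, its interface is assumed (not confirmed) to be roughly:
#   init_db() -> None    # create tables if they do not already exist
#   upsert_hazardous_tweet(*, tweet_url, hazard_type, location,
#                          sentiment_label, sentiment_score,
#                          tweet_date, tweet_time) -> None
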
def run_pipeline(limit=20, hazard_type=None, location=None, days_back=1):
    """Run the hazard detection pipeline."""
    try:
        logger.info(f"Starting pipeline with limit: {limit}, hazard: {hazard_type}, location: {location}")

        # Choose search method based on parameters
        if hazard_type or location:
            tweets = fetch_custom_tweets(
                hazard_type=hazard_type,
                location=location,
                limit=limit,
                days_back=days_back,
            )
        else:
            tweets = fetch_hazard_tweets(limit=limit)
        logger.info(f"Fetched {len(tweets)} tweets")
        # Process tweets: translate -> classify -> analyze
        logger.info("🔄 Processing tweets (this may take 1-2 minutes for the first request)...")
        results = classify_tweets(tweets)
        logger.info(f"✅ Processed {len(results)} tweets (translated, classified, and analyzed)")
        # Store hazardous tweets in database (optional)
        try:
            hazardous_count = 0
            for r in results:
                if r.get('hazardous') == 1:
                    hazardous_count += 1
                    hazards = (r.get('ner') or {}).get('hazards') or []
                    # Use distinct names so the function's hazard_type/location
                    # parameters are not shadowed inside the loop
                    tweet_hazard_type = ", ".join(hazards) if hazards else "unknown"
                    locs = (r.get('ner') or {}).get('locations') or []
                    if not locs and r.get('location'):
                        locs = [r['location']]
                    tweet_location = ", ".join(locs) if locs else "unknown"
                    sentiment = r.get('sentiment') or {"label": "unknown", "score": 0.0}
                    created_at = r.get('created_at') or ""
                    tweet_date = ""
                    tweet_time = ""
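                    # created_at can arrive in two shapes, both handled below
                    # (the example timestamps here are illustrative):
                    #   "Wed, 01 Jan 2025 06:30:00 +0000"  -> RFC 2822, parsedate_to_datetime
                    #   "2025-01-01T06:30:00Z"             -> ISO 8601, datetime.fromisoformat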
                    if created_at:
                        dt = None
                        try:
                            dt = parsedate_to_datetime(created_at)
                        except Exception:
                            dt = None
                        if dt is None and 'T' in created_at:
                            try:
                                iso = created_at.replace('Z', '+00:00')
                                dt = datetime.fromisoformat(iso)
                            except Exception:
                                dt = None
                        if dt is not None:
                            tweet_date = dt.date().isoformat()
                            tweet_time = dt.time().strftime('%H:%M:%S')
                    upsert_hazardous_tweet(
                        tweet_url=r.get('tweet_url') or "",
                        hazard_type=tweet_hazard_type,
                        location=tweet_location,
                        sentiment_label=sentiment.get('label', 'unknown'),
                        sentiment_score=float(sentiment.get('score', 0.0)),
                        tweet_date=tweet_date,
                        tweet_time=tweet_time,
                    )
| logger.info(f"Stored {hazardous_count} hazardous tweets in database") | |
| except Exception as db_error: | |
| logger.warning(f"Database storage failed: {db_error}. Results will not be persisted.") | |
| return results | |
| except Exception as e: | |
| logger.error(f"Pipeline failed: {str(e)}") | |
| return f"Error: {str(e)}" | |
def analyze_tweets(limit, hazard_type, location, days_back):
    """Gradio interface function to analyze tweets."""
    try:
        limit = int(limit) if limit else 20
        days_back = int(days_back) if days_back else 1

        # Clean up inputs
        hazard_type = hazard_type.strip() if hazard_type else None
        location = location.strip() if location else None

        results = run_pipeline(
            limit=limit,
            hazard_type=hazard_type,
            location=location,
            days_back=days_back,
        )

        if isinstance(results, str):  # Error case
            return results, ""

        # Count hazardous tweets
        hazardous_count = sum(1 for r in results if r.get('hazardous') == 1)
        total_count = len(results)

        # Format results for display
        display_text = f"Analyzed {total_count} tweets, found {hazardous_count} hazardous tweets.\n\n"

        for i, result in enumerate(results, 1):
            status = "🚨 HAZARDOUS" if result.get('hazardous') == 1 else "✅ Safe"
            display_text += f"{i}. {status}\n"
            display_text += f" Text: {result.get('text', 'N/A')[:100]}...\n"
            if result.get('translated_text'):
                display_text += f" Translated: {result.get('translated_text', 'N/A')[:100]}...\n"
            if result.get('hazardous') == 1:
                sentiment = result.get('sentiment', {})
                display_text += f" Sentiment: {sentiment.get('label', 'unknown')} ({sentiment.get('score', 0):.2f})\n"
                ner = result.get('ner', {})
                if ner.get('hazards'):
                    display_text += f" Hazards: {', '.join(ner.get('hazards', []))}\n"
                if ner.get('locations'):
                    display_text += f" Locations: {', '.join(ner.get('locations', []))}\n"
            display_text += f" URL: {result.get('tweet_url', 'N/A')}\n\n"

        # Create JSON output
        json_output = json.dumps(results, indent=2, ensure_ascii=False)

        return display_text, json_output
    except Exception as e:
        return f"Error: {str(e)}", ""
# Health check helper (note: not automatically exposed as an HTTP route)
def health_check():
    """Simple health check for Docker."""
    return {"status": "healthy", "message": "Ocean Hazard Detection System is running"}
# Create Gradio interface
with gr.Blocks(title="Ocean Hazard Detection", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🌊 Ocean Hazard Detection System

    This system analyzes tweets to detect ocean-related hazards using AI. It:

    - Scrapes tweets about ocean hazards from Indian coastal regions
    - Classifies tweets as hazardous or safe using multilingual AI
    - Translates non-English tweets to English
    - Analyzes sentiment and extracts hazard types and locations
    - Stores hazardous tweets in a database for tracking

    **Note**: This demo uses a limited dataset. In production, it would analyze real-time tweets.
    """)
    with gr.Row():
        with gr.Column():
            limit_input = gr.Number(
                label="Number of tweets to analyze",
                value=10,
                minimum=1,
                maximum=50,
                step=1
            )
            days_back_input = gr.Number(
                label="Days back to search",
                value=1,
                minimum=1,
                maximum=7,
                step=1
            )
            analyze_btn = gr.Button("🔍 Analyze Tweets", variant="primary")

        with gr.Column():
            hazard_type_input = gr.Dropdown(
                label="Hazard Type (Optional)",
                choices=get_available_hazards() if 'get_available_hazards' in globals() else [],
                value=None,
                allow_custom_value=True,
                info="Select a specific hazard type or leave empty for all hazards"
            )
            location_input = gr.Dropdown(
                label="Location (Optional)",
                choices=get_available_locations() if 'get_available_locations' in globals() else [],
                value=None,
                allow_custom_value=True,
                info="Select a specific location or leave empty for all locations"
            )

        with gr.Column():
            gr.Markdown("### 📊 Analysis Results")
            results_text = gr.Textbox(
                label="Analysis Summary",
                lines=15,
                max_lines=20,
                interactive=False
            )

    with gr.Row():
        gr.Markdown("### 📄 Raw JSON Output")
        json_output = gr.Textbox(
            label="Complete Analysis Data (JSON)",
            lines=10,
            max_lines=15,
            interactive=False
        )
    # Event handlers
    analyze_btn.click(
        fn=analyze_tweets,
        inputs=[limit_input, hazard_type_input, location_input, days_back_input],
        outputs=[results_text, json_output]
    )
    # Add some example queries
    gr.Markdown("""
    ### 🔍 What this system looks for:

    - **Hazard Keywords**: flood, tsunami, cyclone, storm surge, high tide, high waves, swell, coastal flooding, rip current, coastal erosion, water discoloration, algal bloom, marine debris, pollution
    - **Locations**: Mumbai, Chennai, Kolkata, Odisha, Kerala, Gujarat, Goa, Andhra Pradesh, West Bengal, Vizag, Puri, Bay of Bengal, Arabian Sea
    - **Languages**: Supports 20+ Indian languages including Hindi, Bengali, Tamil, Telugu, Marathi, Gujarati, and English
    """)
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # Bind to all interfaces (important for Docker)
        server_port=7860,       # Gradio's default port
        show_error=True,        # Show errors in the interface
        share=False,            # Don't create a public share link
        debug=True              # Enable debug mode
    )
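
# Illustrative container usage (image name assumed): the published port must
# match server_port above, e.g.
#   docker run -p 7860:7860 ocean-hazard-app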