import gradio as gr
import json
import os
import logging
from datetime import datetime
from email.utils import parsedate_to_datetime
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
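# Note: basicConfig without handler arguments logs to the console (stderr),
# which container platforms such as Docker collect by default.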

try:
    from scraper import fetch_hazard_tweets, fetch_custom_tweets, get_available_hazards, get_available_locations
    from classifier import classify_tweets
    from pg_db import init_db, upsert_hazardous_tweet

    # Initialize database (optional - the app will work without it)
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.warning(f"Database initialization failed: {e}. App will work without database persistence.")
except ImportError as e:
    logger.error(f"Failed to import required modules: {e}")
    raise

def run_pipeline(limit=20, hazard_type=None, location=None, days_back=1):
    """Run the hazard detection pipeline: fetch -> translate -> classify -> analyze."""
    try:
        logger.info(f"Starting pipeline with limit: {limit}, hazard: {hazard_type}, location: {location}")

        # Choose the search method based on the filters provided
        if hazard_type or location:
            tweets = fetch_custom_tweets(
                hazard_type=hazard_type,
                location=location,
                limit=limit,
                days_back=days_back,
            )
        else:
            tweets = fetch_hazard_tweets(limit=limit)
        logger.info(f"Fetched {len(tweets)} tweets")

        # Process tweets: translate -> classify -> analyze
        logger.info("🔄 Processing tweets (this may take 1-2 minutes for the first request)...")
        results = classify_tweets(tweets)
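        # Each result is a dict carrying the fields used below: 'text',
        # 'translated_text', 'hazardous' (1 = hazardous), 'sentiment',
        # 'ner' (with 'hazards' and 'locations'), 'location', 'created_at',
        # and 'tweet_url'.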
        logger.info(f"✅ Processed {len(results)} tweets (translated, classified, and analyzed)")

        # Store hazardous tweets in the database (optional)
        try:
            hazardous_count = 0
            for r in results:
                if r.get('hazardous') == 1:
                    hazardous_count += 1
                    # Use distinct names here so the loop does not shadow the
                    # hazard_type/location parameters of run_pipeline
                    hazards = (r.get('ner') or {}).get('hazards') or []
                    row_hazard_type = ", ".join(hazards) if hazards else "unknown"
                    locs = (r.get('ner') or {}).get('locations') or []
                    if not locs and r.get('location'):
                        locs = [r['location']]
                    row_location = ", ".join(locs) if locs else "unknown"
                    sentiment = r.get('sentiment') or {"label": "unknown", "score": 0.0}

                    # created_at may be RFC 2822 (classic Twitter format) or
                    # ISO 8601; try both and fall back to empty strings.
                    created_at = r.get('created_at') or ""
                    tweet_date = ""
                    tweet_time = ""
                    if created_at:
                        dt = None
                        try:
                            dt = parsedate_to_datetime(created_at)
                        except Exception:
                            dt = None
                        if dt is None and 'T' in created_at:
                            try:
                                iso = created_at.replace('Z', '+00:00')
                                dt = datetime.fromisoformat(iso)
                            except Exception:
                                dt = None
                        if dt is not None:
                            tweet_date = dt.date().isoformat()
                            tweet_time = dt.time().strftime('%H:%M:%S')
                    upsert_hazardous_tweet(
                        tweet_url=r.get('tweet_url') or "",
                        hazard_type=row_hazard_type,
                        location=row_location,
                        sentiment_label=sentiment.get('label', 'unknown'),
                        sentiment_score=float(sentiment.get('score', 0.0)),
                        tweet_date=tweet_date,
                        tweet_time=tweet_time,
                    )
            logger.info(f"Stored {hazardous_count} hazardous tweets in database")
        except Exception as db_error:
            logger.warning(f"Database storage failed: {db_error}. Results will not be persisted.")

        return results
    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        return f"Error: {str(e)}"

def analyze_tweets(limit, hazard_type, location, days_back):
    """Gradio interface function: run the pipeline and format its output."""
    try:
        limit = int(limit) if limit else 20
        days_back = int(days_back) if days_back else 1

        # Clean up inputs: empty strings become None (no filter)
        hazard_type = hazard_type.strip() if hazard_type else None
        location = location.strip() if location else None

        results = run_pipeline(
            limit=limit,
            hazard_type=hazard_type,
            location=location,
            days_back=days_back,
        )
        if isinstance(results, str):  # run_pipeline returns an error string on failure
            return results, ""

        # Count hazardous tweets
        hazardous_count = sum(1 for r in results if r.get('hazardous') == 1)
        total_count = len(results)

        # Format results for display
        display_text = f"Analyzed {total_count} tweets, found {hazardous_count} hazardous tweets.\n\n"
        for i, result in enumerate(results, 1):
            status = "🚨 HAZARDOUS" if result.get('hazardous') == 1 else "✅ Safe"
            display_text += f"{i}. {status}\n"
            display_text += f"   Text: {result.get('text', 'N/A')[:100]}...\n"
            if result.get('translated_text'):
                display_text += f"   Translated: {result.get('translated_text', 'N/A')[:100]}...\n"
            if result.get('hazardous') == 1:
                sentiment = result.get('sentiment', {})
                display_text += f"   Sentiment: {sentiment.get('label', 'unknown')} ({sentiment.get('score', 0):.2f})\n"
                ner = result.get('ner', {})
                if ner.get('hazards'):
                    display_text += f"   Hazards: {', '.join(ner.get('hazards', []))}\n"
                if ner.get('locations'):
                    display_text += f"   Locations: {', '.join(ner.get('locations', []))}\n"
            display_text += f"   URL: {result.get('tweet_url', 'N/A')}\n\n"

        # Create JSON output
        json_output = json.dumps(results, indent=2, ensure_ascii=False)
        return display_text, json_output
    except Exception as e:
        return f"Error: {str(e)}", ""

# Health check endpoint
def health_check():
    """Simple health check for Docker."""
    return {"status": "healthy", "message": "Ocean Hazard Detection System is running"}

# Create Gradio interface
with gr.Blocks(title="Ocean Hazard Detection", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🌊 Ocean Hazard Detection System

    This system analyzes tweets to detect ocean-related hazards using AI. It:
    - Scrapes tweets about ocean hazards from Indian coastal regions
    - Classifies tweets as hazardous or safe using multilingual AI
    - Translates non-English tweets to English
    - Analyzes sentiment and extracts hazard types and locations
    - Stores hazardous tweets in a database for tracking

    **Note**: This demo uses a limited dataset. In production, it would analyze real-time tweets.
    """)

    with gr.Row():
        with gr.Column():
            limit_input = gr.Number(
                label="Number of tweets to analyze",
                value=10,
                minimum=1,
                maximum=50,
                step=1
            )
            days_back_input = gr.Number(
                label="Days back to search",
                value=1,
                minimum=1,
                maximum=7,
                step=1
            )
            analyze_btn = gr.Button("🔍 Analyze Tweets", variant="primary")
        with gr.Column():
            hazard_type_input = gr.Dropdown(
                label="Hazard Type (Optional)",
                choices=get_available_hazards() if 'get_available_hazards' in globals() else [],
                value=None,
                allow_custom_value=True,
                info="Select a specific hazard type or leave empty for all hazards"
            )
            location_input = gr.Dropdown(
                label="Location (Optional)",
                choices=get_available_locations() if 'get_available_locations' in globals() else [],
                value=None,
                allow_custom_value=True,
                info="Select a specific location or leave empty for all locations"
            )
        with gr.Column():
            gr.Markdown("### 📊 Analysis Results")
            results_text = gr.Textbox(
                label="Analysis Summary",
                lines=15,
                max_lines=20,
                interactive=False
            )

    with gr.Row():
        gr.Markdown("### 📄 Raw JSON Output")
        json_output = gr.Textbox(
            label="Complete Analysis Data (JSON)",
            lines=10,
            max_lines=15,
            interactive=False
        )

    # Event handlers
    analyze_btn.click(
        fn=analyze_tweets,
        inputs=[limit_input, hazard_type_input, location_input, days_back_input],
        outputs=[results_text, json_output]
    )

    # Describe what the pipeline searches for
    gr.Markdown("""
    ### 🔍 What this system looks for:
    - **Hazard Keywords**: flood, tsunami, cyclone, storm surge, high tide, high waves, swell, coastal flooding, rip current, coastal erosion, water discoloration, algal bloom, marine debris, pollution
    - **Locations**: Mumbai, Chennai, Kolkata, Odisha, Kerala, Gujarat, Goa, Andhra Pradesh, West Bengal, Vizag, Puri, Bay of Bengal, Arabian Sea
    - **Languages**: Supports 20+ Indian languages including Hindi, Bengali, Tamil, Telugu, Marathi, Gujarati, and English
    """)

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # Bind to all interfaces (important for Docker)
        server_port=7860,       # Gradio default port
        show_error=True,        # Show errors in the interface
        share=False,            # Don't create a public link
        debug=True              # Enable debug mode
    )