import os
import time
from io import StringIO

import pandas as pd
import numpy as np
import streamlit as st
from dotenv import load_dotenv
from huggingface_hub import InferenceClient, login
import google.generativeai as genai
# ======================================================
# ⚙️ APP CONFIGURATION
# ======================================================
st.set_page_config(page_title="📊 Smart Data Analyst Pro", layout="wide")
st.title("📊 Smart Data Analyst Pro (Chat Mode)")
st.caption("Chat with your dataset: AI cleans, analyzes, and visualizes data. Hugging Face + Gemini compatible.")
# ======================================================
# 🔐 Load Environment Variables
# ======================================================
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

if not HF_TOKEN:
    st.error("❌ Missing HF_TOKEN. Please set it in your .env file.")
else:
    login(token=HF_TOKEN)

if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
else:
    st.warning("⚠️ Gemini API key missing. Gemini 2.5 Flash will not work.")
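
# Expected .env entries (names taken from the lookups above):
#   HF_TOKEN=hf_...          # or HUGGINGFACE_API_KEY as a fallback
#   GEMINI_API_KEY=...       # optional; enables the Gemini analyst and fallback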
# ======================================================
# 🧠 MODEL SETUP
# ======================================================
with st.sidebar:
    st.header("⚙️ Model Settings")
    CLEANER_MODEL = st.selectbox(
        "Select Cleaner Model:",
        [
            "Qwen/Qwen2.5-Coder-14B",
            "mistralai/Mistral-7B-Instruct-v0.3",
        ],
        index=0,
    )
    ANALYST_MODEL = st.selectbox(
        "Select Analysis Model:",
        [
            "Gemini 2.5 Flash (Google)",
            "Qwen/Qwen2.5-14B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "HuggingFaceH4/zephyr-7b-beta",
        ],
        index=0,
    )
    temperature = st.slider("Temperature", 0.0, 1.0, 0.3)
    max_tokens = st.slider("Max Tokens", 128, 4096, 1024)

hf_cleaner_client = InferenceClient(model=CLEANER_MODEL, token=HF_TOKEN)
hf_analyst_client = None
if ANALYST_MODEL != "Gemini 2.5 Flash (Google)":
    hf_analyst_client = InferenceClient(model=ANALYST_MODEL, token=HF_TOKEN)
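
# Note: Streamlit re-executes this script on every interaction, so the clients
# above are rebuilt on each rerun. InferenceClient is believed to be a thin,
# stateless wrapper, so reconstructing it should be cheap; no connection is
# held open between calls.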
# ======================================================
# 🧩 SAFE GENERATION FUNCTION
# ======================================================
def safe_hf_generate(client, prompt, temperature=0.3, max_tokens=512, retries=2):
    """Try text generation, with retry + fallback on service errors."""
    for attempt in range(retries + 1):
        try:
            resp = client.text_generation(
                prompt,
                temperature=temperature,
                max_new_tokens=max_tokens,
                return_full_text=False,
            )
            return resp.strip()
        except Exception as e:
            err = str(e)
            # 🩹 FIX: handle common server overloads gracefully
            if "503" in err or "Service Temporarily Unavailable" in err:
                time.sleep(2)
                if attempt < retries:
                    continue  # retry
                return "⚠️ The Hugging Face model is temporarily unavailable. Please try again or switch to Gemini."
            elif "Supported task: conversational" in err:
                # Some endpoints only serve the conversational task, so retry
                # the same prompt through the chat-completion API instead.
                chat_resp = client.chat_completion(
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens,
                    temperature=temperature,
                )
                return chat_resp.choices[0].message.content.strip()
            else:
                raise
    return "⚠️ Failed after retries."
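
# Illustrative call (not executed here):
#   safe_hf_generate(hf_cleaner_client, "Return CSV: a,b\n1,2", max_tokens=64)
# returns the generated text, a chat-completion fallback, or a "⚠️ ..." string;
# callers below check for "temporarily unavailable" instead of catching exceptions.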
# ======================================================
# 🧩 DATA CLEANING
# ======================================================
def fallback_clean(df: pd.DataFrame) -> pd.DataFrame:
    """Deterministic cleanup: drop empty columns, normalize names, impute, dedupe."""
    df = df.copy()
    df.dropna(axis=1, how="all", inplace=True)
    df.columns = [c.strip().replace(" ", "_").lower() for c in df.columns]
    for col in df.columns:
        if df[col].dtype == "O":
            if not df[col].mode().empty:
                df[col] = df[col].fillna(df[col].mode()[0])
            else:
                df[col] = df[col].fillna("Unknown")
        else:
            df[col] = df[col].fillna(df[col].median())
    df.drop_duplicates(inplace=True)
    return df
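
# Quick sanity check (illustrative; "Col A"/"Notes" are made-up names):
#   raw = pd.DataFrame({"Col A": [1.0, None], "Notes": [None, None]})
#   fallback_clean(raw) -> one 'col_a' column; the all-null 'Notes' column is
#   dropped, the NaN is imputed with the median (1.0), and the now-identical
#   rows are deduplicated.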
def ai_clean_dataset(df: pd.DataFrame) -> tuple[pd.DataFrame, str]:
    # Round-tripping the whole CSV through the LLM is only practical for tiny
    # datasets, so larger ones get the deterministic fallback instead.
    if len(df) > 50:
        return fallback_clean(df), "⚠️ AI cleaning skipped (more than 50 rows); applied fallback cleaning instead."
    csv_text = df.to_csv(index=False)
    prompt = f"""
You are a professional data cleaning assistant.
Clean and standardize the dataset below dynamically:
1. Handle missing values
2. Fix column name inconsistencies
3. Convert data types (dates, numbers, categories)
4. Remove irrelevant or duplicate rows
Return ONLY a valid CSV text (no markdown, no explanations).
Dataset:
{csv_text}
"""
    try:
        cleaned_str = safe_hf_generate(hf_cleaner_client, prompt, temperature=0.1, max_tokens=4096)
        # Strip any markdown fences the model returns despite the instructions.
        cleaned_str = cleaned_str.replace("```csv", "").replace("```", "").replace("###", "").strip()
        cleaned_df = pd.read_csv(StringIO(cleaned_str), on_bad_lines="skip")
        cleaned_df.columns = [c.strip().replace(" ", "_").lower() for c in cleaned_df.columns]
        return cleaned_df, "✅ AI cleaning completed successfully."
    except Exception as e:
        return fallback_clean(df), f"⚠️ AI cleaning failed ({e}); applied fallback cleaning instead."
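
# Caveat: pd.read_csv(..., on_bad_lines="skip") silently drops malformed lines,
# so the cleaned frame can end up with a different row count than the upload;
# the model's output is not validated beyond parsing as CSV.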
# ======================================================
# 🧩 DATA SUMMARY (Token-efficient)
# ======================================================
def summarize_for_analysis(df: pd.DataFrame, sample_rows=10) -> str:
    summary = [f"Rows: {len(df)}, Columns: {len(df.columns)}"]
    for col in df.columns:
        non_null = int(df[col].notnull().sum())
        if pd.api.types.is_numeric_dtype(df[col]):
            desc = df[col].describe().to_dict()
            summary.append(f"- {col}: mean={desc.get('mean', np.nan):.2f}, median={df[col].median():.2f}, non_null={non_null}")
        else:
            top = df[col].value_counts().head(3).to_dict()
            summary.append(f"- {col}: top_values={top}, non_null={non_null}")
    sample = df.head(sample_rows).to_csv(index=False)
    summary.append("--- Sample Data ---")
    summary.append(sample)
    return "\n".join(summary)
# ======================================================
# 🧠 ANALYSIS FUNCTION
# ======================================================
def query_analysis_model(df: pd.DataFrame, user_query: str, dataset_name: str) -> str:
    prompt_summary = summarize_for_analysis(df)
    prompt = f"""
You are a professional data analyst.
Analyze the dataset '{dataset_name}' and answer the user's question.
--- DATA SUMMARY ---
{prompt_summary}
--- USER QUESTION ---
{user_query}
Respond with:
1. Key insights and patterns
2. Quantitative findings
3. Notable relationships or anomalies
4. Data-driven recommendations
"""
    try:
        if ANALYST_MODEL == "Gemini 2.5 Flash (Google)":
            response = genai.GenerativeModel("gemini-2.5-flash").generate_content(
                prompt,
                generation_config={
                    "temperature": temperature,
                    "max_output_tokens": max_tokens,
                },
            )
            return response.text if hasattr(response, "text") else "No valid text response."
        else:
            # 🩹 FIX: wrap in the retry-aware generator
            result = safe_hf_generate(hf_analyst_client, prompt, temperature=temperature, max_tokens=max_tokens)
            # Fall back to Gemini if Hugging Face failed entirely
            if "temporarily unavailable" in result.lower() and GEMINI_API_KEY:
                alt = genai.GenerativeModel("gemini-2.5-flash").generate_content(prompt)
                return f"🔁 Fallback to Gemini:\n\n{alt.text}"
            return result
    except Exception as e:
        # 🩹 FIX: fall back to Gemini if the server rejects the request with a 5xx
        if "503" in str(e) and GEMINI_API_KEY:
            response = genai.GenerativeModel("gemini-2.5-flash").generate_content(prompt)
            return f"🔁 Fallback to Gemini due to 503 error:\n\n{response.text}"
        return f"⚠️ Analysis failed: {e}"
# ======================================================
# 🚀 MAIN CHATBOT LOGIC
# ======================================================
uploaded = st.file_uploader("📁 Upload CSV or Excel file", type=["csv", "xlsx"])

if "messages" not in st.session_state:
    st.session_state.messages = []

if uploaded:
    df = pd.read_csv(uploaded) if uploaded.name.endswith(".csv") else pd.read_excel(uploaded)
    with st.spinner("🧼 Cleaning your dataset..."):
        cleaned_df, cleaning_status = ai_clean_dataset(df)
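    # Possible optimization: Streamlit reruns this script on every chat message,
    # so the file is re-read and re-cleaned each time. Caching the cleaned frame
    # in st.session_state keyed by uploaded.name would avoid repeated API calls.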
    st.subheader("✅ Cleaning Status")
    st.info(cleaning_status)

    st.subheader("📋 Dataset Preview")
    st.dataframe(cleaned_df.head(), use_container_width=True)

    st.subheader("💬 Chat with Your Dataset")
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    if user_query := st.chat_input("Ask something about your dataset..."):
        st.session_state.messages.append({"role": "user", "content": user_query})
        with st.chat_message("user"):
            st.markdown(user_query)
        with st.chat_message("assistant"):
            with st.spinner("🤖 Analyzing..."):
                result = query_analysis_model(cleaned_df, user_query, uploaded.name)
                st.markdown(result)
        st.session_state.messages.append({"role": "assistant", "content": result})
else:
    st.info("📥 Upload a dataset to begin chatting with your AI analyst.")