Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import streamlit as st | |
| import plotly.express as px | |
| import plotly.figure_factory as ff | |
| from dotenv import load_dotenv | |
| from huggingface_hub import InferenceClient, login | |
| from io import StringIO | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| # ====================================================== | |
| # βοΈ APP CONFIGURATION | |
| # ====================================================== | |
| st.set_page_config(page_title="π Smart Data Analyst Pro", layout="wide") | |
| st.title("π Smart Data Analyst Pro") | |
| st.caption("AI that cleans, analyzes, and visualizes your data β powered by Hugging Face Inference API and local open-source models.") | |
| # ====================================================== | |
| # π Load Environment Variables | |
| # ====================================================== | |
| load_dotenv() | |
| HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY") | |
| if not HF_TOKEN: | |
| st.error("β Missing HF_TOKEN. Please set it in your .env file.") | |
| else: | |
| login(token=HF_TOKEN) | |
| # ====================================================== | |
| # π§ MODEL SETTINGS | |
| # ====================================================== | |
| with st.sidebar: | |
| st.header("βοΈ Model Settings") | |
| CLEANER_MODEL = st.selectbox( | |
| "Select Cleaner Model:", | |
| [ | |
| "Qwen/Qwen2.5-Coder-7B-Instruct", | |
| "meta-llama/Meta-Llama-3-8B-Instruct", | |
| "microsoft/Phi-3-mini-4k-instruct" | |
| ], | |
| index=0 | |
| ) | |
| ANALYST_MODEL = st.selectbox( | |
| "Select Analysis Model (Local Open-Source Recommended):", | |
| [ | |
| "meta-llama/Meta-Llama-3-8B-Instruct", | |
| "Qwen/Qwen2.5-Coder-7B-Instruct", | |
| "HuggingFaceH4/zephyr-7b-beta", | |
| "mistralai/Mistral-7B-Instruct-v0.3" | |
| ], | |
| index=0 | |
| ) | |
| temperature = st.slider("Temperature", 0.0, 1.0, 0.3) | |
| max_tokens = st.slider("Max Tokens", 128, 2048, 512) | |
| # Initialize cleaner client (HF API) | |
| cleaner_client = InferenceClient(model=CLEANER_MODEL, token=HF_TOKEN) | |
| # Initialize local analyst if open-source | |
| local_analyst = None | |
| if ANALYST_MODEL in ["meta-llama/Meta-Llama-3-8B-Instruct"]: | |
| try: | |
| tokenizer = AutoTokenizer.from_pretrained(ANALYST_MODEL) | |
| model = AutoModelForCausalLM.from_pretrained(ANALYST_MODEL) | |
| local_analyst = pipeline("text-generation", model=model, tokenizer=tokenizer) | |
| except Exception as e: | |
| st.warning(f"β οΈ Failed to load local analyst: {e}") | |
| # ====================================================== | |
| # π§© DATA CLEANING FUNCTIONS | |
| # ====================================================== | |
| def fallback_clean(df: pd.DataFrame) -> pd.DataFrame: | |
| df = df.copy() | |
| df.dropna(axis=1, how="all", inplace=True) | |
| df.columns = [c.strip().replace(" ", "_").lower() for c in df.columns] | |
| for col in df.columns: | |
| if df[col].dtype == "O": | |
| df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else "Unknown", inplace=True) | |
| else: | |
| df[col].fillna(df[col].median(), inplace=True) | |
| df.drop_duplicates(inplace=True) | |
| return df | |
| def ai_clean_dataset(df: pd.DataFrame) -> pd.DataFrame: | |
| raw_preview = df.head(5).to_csv(index=False) | |
| prompt = f""" | |
| You are a Python data cleaning expert. | |
| Clean and standardize the dataset dynamically: | |
| - Handle missing values logically | |
| - Correct and normalize column names | |
| - Detect and fix datatype inconsistencies | |
| - Remove duplicates or invalid rows | |
| Return ONLY valid CSV text (no Markdown). | |
| --- RAW SAMPLE --- | |
| {raw_preview} | |
| """ | |
| try: | |
| response = cleaner_client.text_generation(prompt, max_new_tokens=1024, temperature=0.1, return_full_text=False) | |
| cleaned_str = response.strip() | |
| except Exception as e: | |
| st.warning(f"β οΈ AI cleaning failed: {e}") | |
| return fallback_clean(df) | |
| cleaned_str = cleaned_str.replace("```csv","").replace("```","").replace("###","").replace(";",",").strip() | |
| lines = [l for l in cleaned_str.splitlines() if "," in l] | |
| cleaned_str = "\n".join(lines) | |
| try: | |
| cleaned_df = pd.read_csv(StringIO(cleaned_str), on_bad_lines="skip") | |
| cleaned_df.dropna(axis=1, how="all", inplace=True) | |
| cleaned_df.columns = [c.strip().replace(" ", "_").lower() for c in cleaned_df.columns] | |
| return cleaned_df | |
| except Exception as e: | |
| st.warning(f"β οΈ CSV parse failed: {e}") | |
| return fallback_clean(df) | |
| def summarize_dataframe(df: pd.DataFrame) -> str: | |
| lines = [f"Rows: {len(df)} | Columns: {len(df.columns)}", "Column summaries:"] | |
| for col in df.columns[:10]: | |
| non_null = int(df[col].notnull().sum()) | |
| if pd.api.types.is_numeric_dtype(df[col]): | |
| mean = df[col].mean() | |
| median = df[col].median() if non_null > 0 else None | |
| lines.append(f"- {col}: mean={mean:.3f}, median={median}, non_null={non_null}") | |
| else: | |
| top = df[col].value_counts().head(3).to_dict() | |
| lines.append(f"- {col}: top_values={top}, non_null={non_null}") | |
| return "\n".join(lines) | |
| # ====================================================== | |
| # π§ ANALYSIS FUNCTION | |
| # ====================================================== | |
| def query_analysis_model(df: pd.DataFrame, user_query: str, dataset_name: str) -> str: | |
| df_summary = summarize_dataframe(df) | |
| sample = df.head(6).to_csv(index=False) | |
| prompt = f""" | |
| You are a data analyst. | |
| Analyze '{dataset_name}' and answer the question below. | |
| Base your insights only on the provided data. | |
| --- SUMMARY --- | |
| {df_summary} | |
| --- SAMPLE DATA --- | |
| {sample} | |
| --- QUESTION --- | |
| {user_query} | |
| Respond concisely with key insights, numbers, patterns, and recommended steps. | |
| """ | |
| if local_analyst: | |
| try: | |
| response = local_analyst(prompt, max_new_tokens=max_tokens, temperature=temperature) | |
| return response[0]['generated_text'] | |
| except Exception as e: | |
| return f"β οΈ Local analysis failed: {e}" | |
| else: | |
| st.warning("β οΈ Analyst model is not local. Using HF API may require payment.") | |
| return "Analysis not available for free model." | |
| # ====================================================== | |
| # π MAIN APP | |
| # ====================================================== | |
| uploaded = st.file_uploader("π Upload CSV or Excel file", type=["csv", "xlsx"]) | |
| if uploaded: | |
| try: | |
| df = pd.read_csv(uploaded) if uploaded.name.endswith(".csv") else pd.read_excel(uploaded) | |
| except Exception as e: | |
| st.error(f"β File load failed: {e}") | |
| st.stop() | |
| with st.spinner("π§Ό AI Cleaning your dataset..."): | |
| cleaned_df = ai_clean_dataset(df) | |
| st.subheader("β Cleaned Dataset Preview") | |
| st.dataframe(cleaned_df.head(), use_container_width=True) | |
| with st.expander("π Cleaning Summary"): | |
| st.text(summarize_dataframe(cleaned_df)) | |
| with st.expander("π Quick Visualizations", expanded=True): | |
| numeric_cols = cleaned_df.select_dtypes(include="number").columns.tolist() | |
| categorical_cols = cleaned_df.select_dtypes(exclude="number").columns.tolist() | |
| viz_type = st.selectbox("Visualization Type", ["Scatter Plot", "Histogram", "Box Plot", "Correlation Heatmap", "Categorical Count"]) | |
| if viz_type == "Scatter Plot" and len(numeric_cols) >= 2: | |
| x = st.selectbox("X-axis", numeric_cols) | |
| y = st.selectbox("Y-axis", numeric_cols, index=min(1,len(numeric_cols)-1)) | |
| color = st.selectbox("Color", ["None"] + categorical_cols) | |
| fig = px.scatter(cleaned_df, x=x, y=y, color=None if color=="None" else color) | |
| st.plotly_chart(fig, use_container_width=True) | |
| elif viz_type == "Histogram" and numeric_cols: | |
| col = st.selectbox("Column", numeric_cols) | |
| fig = px.histogram(cleaned_df, x=col, nbins=30) | |
| st.plotly_chart(fig, use_container_width=True) | |
| elif viz_type == "Box Plot" and numeric_cols: | |
| col = st.selectbox("Column", numeric_cols) | |
| fig = px.box(cleaned_df, y=col) | |
| st.plotly_chart(fig, use_container_width=True) | |
| elif viz_type == "Correlation Heatmap" and len(numeric_cols) > 1: | |
| corr = cleaned_df[numeric_cols].corr() | |
| fig = ff.create_annotated_heatmap(z=corr.values, x=list(corr.columns), y=list(corr.index), | |
| annotation_text=corr.round(2).values, showscale=True) | |
| st.plotly_chart(fig, use_container_width=True) | |
| elif viz_type == "Categorical Count" and categorical_cols: | |
| cat = st.selectbox("Category", categorical_cols) | |
| fig = px.bar(cleaned_df[cat].value_counts().reset_index(), x="index", y=cat) | |
| st.plotly_chart(fig, use_container_width=True) | |
| else: | |
| st.warning("β οΈ Not enough columns for this visualization type.") | |
| st.subheader("π¬ Ask AI About Your Data") | |
| user_query = st.text_area("Enter your question:", placeholder="e.g. What factors influence sales?") | |
| if st.button("Analyze with AI", use_container_width=True) and user_query: | |
| with st.spinner("π€ Interpreting data..."): | |
| result = query_analysis_model(cleaned_df, user_query, uploaded.name) | |
| st.markdown("### π‘ Insights") | |
| st.markdown(result) | |
| else: | |
| st.info("π₯ Upload a dataset to begin smart analysis.") | |