import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import io
from PIL import Image
import gradio as gr
from smolagents import tool, CodeAgent, InferenceClientModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, r2_score, mean_squared_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
import joblib
import tempfile
import os
# The agent is created lazily in set_hf_token() once the user provides an HF API key
agent = None
# ────────────────────────────────
# Heuristic Target Column Detection
# ────────────────────────────────
def detect_target_column(df: pd.DataFrame) -> str:
    """
    Heuristically detect the most likely target column based on naming, cardinality, and type.
    """
    if df.empty or len(df.columns) < 2:
        return None
    scores = {}
    for col in df.columns:
        score = 0.0
        name_lower = col.lower()
        # Rule 1: Name matches common target keywords
        keywords = ["target", "label", "class", "outcome", "result", "y", "output", "flag", "status", "churn", "survived", "price", "sale"]
        if any(kw in name_lower for kw in keywords):
            score += 3.0
        if name_lower in ["target", "label", "class", "y"]:
            score += 2.0
        # Rule 2: Binary or low-cardinality categorical → likely classification target
        nunique = df[col].nunique()
        total = len(df)
        unique_ratio = nunique / total
        if nunique == 2 and df[col].dtype in ["int64", "object", "category"]:
            score += 4.0  # Strong signal
        elif nunique <= 20 and df[col].dtype in ["int64", "object", "category"]:
            score += 3.0
        # Rule 3: High unique ratio + numeric → likely regression target
        if unique_ratio > 0.8 and df[col].dtype in ["int64", "float64"]:
            score += 2.5
        # Rule 4: Avoid ID-like or high-cardinality text columns
        id_keywords = ["id", "name", "email", "phone", "address", "username", "url", "link"]
        if any(kw in name_lower for kw in id_keywords):
            score -= 10.0
        if nunique == total and df[col].dtype == "object":
            score -= 10.0  # Likely a unique identifier
        scores[col] = score
    # Return the best candidate only if its score is positive
    best_col = max(scores, key=scores.get)
    return best_col if scores[best_col] > 0 else None
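
# Quick illustration (hypothetical data, not from the app): for a frame like
#   pd.DataFrame({"age": [25, 32, 40, 19], "churn": [0, 1, 0, 1]})
# "churn" scores highest (keyword match in Rule 1 plus the binary-column bonus in
# Rule 2), so detect_target_column() would return "churn".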
# ────────────────────────────────
# Tool 1: LoadData
# ────────────────────────────────
@tool
def LoadData(filepath: str) -> dict:
    """
    Loads data from a CSV file and returns it as a dictionary.
    Args:
        filepath (str): Path to the CSV file.
    Returns:
        dict: Data as dictionary (from DataFrame.to_dict()).
    """
    df = pd.read_csv(filepath)
    return df.to_dict()
# ────────────────────────────────
# Tool 2: CleanData (Enhanced)
# ────────────────────────────────
@tool
def CleanData(data: dict, handle_outliers: bool = True, impute_strategy: str = "median_mode") -> pd.DataFrame:
    """
    Cleans dataset with smart imputation, encoding, and optional outlier removal.
    Args:
        data (dict): Dataset in dictionary format.
        handle_outliers (bool): Whether to remove outliers using the IQR rule.
        impute_strategy (str): "median_mode" or "mean_mode".
    Returns:
        pd.DataFrame: Cleaned dataset.
    """
    df = pd.DataFrame.from_dict(data)
    # Drop duplicates
    df = df.drop_duplicates().reset_index(drop=True)
    # Handle missing values
    for col in df.columns:
        if df[col].dtype in ["int64", "float64"]:
            if impute_strategy == "median_mode" or df[col].skew() > 1:
                fill_val = df[col].median()
            else:
                fill_val = df[col].mean()
            df[col] = df[col].fillna(fill_val)
        else:
            mode = df[col].mode()
            fill_val = mode[0] if len(mode) > 0 else "Unknown"
            df[col] = df[col].fillna(fill_val)
    # Parse datetime-like columns
    for col in df.columns:
        if "date" in col.lower() or "time" in col.lower():
            try:
                df[col] = pd.to_datetime(df[col], errors="coerce")
            except Exception:
                pass
    # Encode categorical variables (only if not too many unique values)
    for col in df.select_dtypes(include="object").columns:
        if df[col].nunique() / len(df) < 0.5:
            df[col] = df[col].astype("category").cat.codes
        # else: leave as object (e.g., free text)
    # Outlier removal (optional)
    if handle_outliers:
        for col in df.select_dtypes(include=["float64", "int64"]).columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
            df = df[(df[col] >= lower) & (df[col] <= upper)]
            if len(df) == 0:
                # Avoid returning an empty frame: revert to the original (uncleaned) data
                df = pd.DataFrame.from_dict(data)
                break
    return df.reset_index(drop=True)
# ────────────────────────────────
# Tool 3: EDA (Enhanced)
# ────────────────────────────────
@tool
def EDA(data: dict, max_cat_plots: int = 3, max_num_plots: int = 3) -> dict:
    """
    Performs advanced EDA with smart visualizations and insights.
    Args:
        data (dict): Dataset in dictionary format.
        max_cat_plots (int): Max number of categorical distribution plots.
        max_num_plots (int): Max number of numeric-vs-target plots.
    Returns:
        dict: EDA results including text, plots, and recommendations.
    """
    df = pd.DataFrame.from_dict(data)
    results = {}
    # 1. Summary stats
    results["summary"] = df.describe(include="all").to_string()
    # 2. Missing values
    missing = df.isnull().sum()
    results["missing_values"] = missing[missing > 0].to_dict()
    # Missingness heatmap
    if missing.sum() > 0:
        plt.figure(figsize=(8, 4))
        sns.heatmap(df.isnull(), cbar=True, cmap="viridis", yticklabels=False)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["missingness_plot"] = Image.open(buf)
    # 3. Correlation heatmap
    corr = df.corr(numeric_only=True)
    if not corr.empty and len(corr.columns) > 1:
        plt.figure(figsize=(8, 6))
        sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", square=True)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["correlation_plot"] = Image.open(buf)
        # Top 5 absolute correlations (excluding self-correlations)
        unstacked = corr.abs().unstack()
        unstacked = unstacked[unstacked < 1.0]
        results["top_correlations"] = unstacked.sort_values(ascending=False).head(5).to_dict()
    # 4. Skewness & kurtosis
    numeric_cols = df.select_dtypes(include=["float64", "int64"]).columns
    skew_kurt = {}
    for col in numeric_cols:
        skew_kurt[col] = {"skew": df[col].skew(), "kurtosis": df[col].kurtosis()}
    results["skew_kurtosis"] = skew_kurt
    # 5. Numeric distributions
    if len(numeric_cols) > 0:
        df[numeric_cols].hist(bins=20, figsize=(12, 8), layout=(2, -1))
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["numeric_distributions"] = Image.open(buf)
    # 6. Categorical distributions
    cat_cols = df.select_dtypes(include=["object", "category"]).columns
    for col in cat_cols[:max_cat_plots]:
        plt.figure(figsize=(6, 4))
        top_vals = df[col].value_counts().head(10)
        sns.barplot(x=top_vals.index, y=top_vals.values)
        plt.xticks(rotation=45)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results[f"dist_{col}"] = Image.open(buf)
    # 7. Target relationships
    target_col = detect_target_column(df)
    if target_col:
        results["detected_target"] = target_col
        for col in numeric_cols[:max_num_plots]:
            plt.figure(figsize=(6, 4))
            if df[target_col].nunique() <= 20:
                sns.boxplot(data=df, x=target_col, y=col)
            else:
                sns.scatterplot(data=df, x=col, y=target_col)
            buf = io.BytesIO()
            plt.savefig(buf, format="png", bbox_inches="tight")
            plt.close()
            buf.seek(0)
            results[f"{col}_vs_{target_col}"] = Image.open(buf)
    # 8. Recommendations
    recs = []
    for col, sk in skew_kurt.items():
        if abs(sk["skew"]) > 1:
            recs.append(f"Feature '{col}' is skewed ({sk['skew']:.2f}) → consider a log transform.")
    if results["missing_values"]:
        recs.append("Missing data detected → consider KNN or iterative imputation.")
    if results.get("top_correlations"):
        recs.append("High correlations found → consider PCA or feature selection.")
    if target_col:
        recs.append(f"Target variable '{target_col}' detected automatically.")
    results["recommendations"] = recs
    return results
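
# The EDA results dict can contain: "summary", "missing_values", "skew_kurtosis",
# "recommendations", and (when applicable) "missingness_plot", "correlation_plot",
# "top_correlations", "numeric_distributions", "detected_target", plus per-column
# keys such as "dist_<col>" and "<col>_vs_<target>" holding PIL images; analyze_data()
# below picks up any PIL.Image values for the gallery.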
# ────────────────────────────────
# Tool 4: AutoML (Enhanced)
# ────────────────────────────────
@tool
def AutoML(data: dict, task_hint: str = None) -> dict:
    """
    Enhanced AutoML with multiple models and robust evaluation.
    Args:
        data (dict): Cleaned dataset.
        task_hint (str): "classification", "regression", or None.
    Returns:
        dict: Model results and metrics.
    """
    df = pd.DataFrame.from_dict(data)
    results = {}
    target_col = detect_target_column(df)
    if not target_col:
        results["note"] = "No target column detected. Check column names and data."
        return results
    X = df.drop(columns=[target_col])
    y = df[target_col]
    # One-hot encode the features
    X = pd.get_dummies(X, drop_first=True)
    if X.shape[1] == 0:
        results["error"] = "No valid features after encoding."
        return results
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Detect task type
    if task_hint:
        task = task_hint
    elif y.dtype in ["object", "category"] or y.nunique() <= 20:
        task = "classification"
    else:
        task = "regression"
    try:
        if task == "classification":
            models = {
                "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
                "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42)
            }
            results["task"] = "classification"
            best_acc = -1.0
            for name, model in models.items():
                model.fit(X_train, y_train)
                preds = model.predict(X_test)
                acc = accuracy_score(y_test, preds)
                if acc > best_acc:
                    best_acc = acc
                    best_model = model  # Keep best model for saving below
                    results["accuracy"] = acc
                    results["best_model"] = name
                    results["report"] = classification_report(y_test, preds, zero_division=0)
                    if hasattr(model, "feature_importances_"):
                        results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
        else:
            models = {
                "RandomForest": RandomForestRegressor(n_estimators=100, random_state=42),
                "LinearRegression": LinearRegression()
            }
            results["task"] = "regression"
            best_r2 = -float("inf")
            for name, model in models.items():
                model.fit(X_train, y_train)
                preds = model.predict(X_test)
                r2 = r2_score(y_test, preds)
                if r2 > best_r2:
                    best_r2 = r2
                    best_model = model  # Keep best model for saving below
                    results["r2_score"] = r2
                    results["mse"] = mean_squared_error(y_test, preds)
                    results["best_model"] = name
                    if hasattr(model, "feature_importances_"):
                        results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
        # Save the best model to a temporary file
        model_dir = tempfile.mkdtemp()
        model_path = os.path.join(model_dir, f"trained_model_{task}.pkl")
        joblib.dump({
            "model": best_model,
            "task": task,
            "target_column": target_col,
            "features": X.columns.tolist()
        }, model_path)
        results["model_download_path"] = model_path
        results["model_info"] = f"Best model: {results['best_model']} | Task: {task} | Target: {target_col}"
    except Exception as e:
        results["error"] = f"Model training failed: {str(e)}"
    return results
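
# For reference, a minimal sketch of how the saved bundle could be reused outside
# the app (the file name and new_df are illustrative, not defined here):
#   bundle = joblib.load("trained_model_classification.pkl")
#   model, features = bundle["model"], bundle["features"]
#   X_new = pd.get_dummies(new_df, drop_first=True).reindex(columns=features, fill_value=0)
#   preds = model.predict(X_new)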
# ────────────────────────────────
# Initialize the AI Agent
# ────────────────────────────────
# agent = CodeAgent(
#     tools=[LoadData, CleanData, EDA, AutoML],
#     model=InferenceClientModel(
#         model_id="Qwen/Qwen2.5-Coder-1.5B-Instruct",
#         token=os.environ["HF_TOKEN"],
#         provider="Featherless AI",
#         max_tokens=4048
#     ),
#     additional_authorized_imports=[
#         "pandas", "matplotlib.pyplot", "seaborn", "PIL", "sklearn", "io", "os", "joblib", "tempfile"
#     ],
#     max_steps=10,
# )
def set_hf_token(token):
    global agent
    os.environ["HF_TOKEN"] = token.strip()
    # Initialize the agent only now that a token is available
    agent = CodeAgent(
        tools=[LoadData, CleanData, EDA, AutoML],
        model=InferenceClientModel(
            model_id="Qwen/Qwen2.5-Coder-7B-Instruct",
            token=os.environ["HF_TOKEN"],
            provider="nscale",
            max_tokens=4048
        ),
        additional_authorized_imports=[
            "pandas", "matplotlib.pyplot", "seaborn", "PIL", "sklearn", "io", "os", "joblib", "tempfile"
        ],
        max_steps=10,
    )
    return "✅ Token saved and agent initialized successfully! You can now upload your CSV file."
# ────────────────────────────────
# Gradio Interface
# ────────────────────────────────
def analyze_data(file):
    if agent is None or not os.environ.get("HF_TOKEN"):
        return "❌ Please enter your HF token first!", [], None
    filepath = file.name
    prompt = f"""
    Load the data from '{filepath}', then clean it using CleanData with outlier handling.
    Run EDA to analyze data quality, distributions, and detect the target variable.
    If a target is found, run AutoML to train the best model.
    Return all insights, metrics, and visualizations.
    """
    try:
        results = agent.run(prompt)
    except Exception as e:
        results = {"error": f"Agent failed: {str(e)}"}
    if not isinstance(results, dict):
        # AgentText or plain string → wrap in a dict-like structure
        if hasattr(results, "content"):
            text_content = results.content
        else:
            text_content = str(results)
        results = {"summary": text_content}
    # === Text Report ===
    text_output = ""
    if "error" in results:
        text_output = f"❌ Error: {results['error']}"
    else:
        summary = results.get("summary", "No summary.")
        missing_vals = results.get("missing_values", {})
        top_corr = results.get("top_correlations", {})
        outliers = results.get("outliers", {})
        recs = results.get("recommendations", [])
        detected_target = results.get("detected_target", "Unknown")
        text_output += "### Dataset Overview\n"
        text_output += f"**Detected Target:** `{detected_target}`\n\n"
        text_output += f"### Summary Stats\n{summary}\n\n"
        text_output += f"### Missing Values\n{missing_vals}\n\n"
        text_output += f"### Top Correlations\n{top_corr}\n\n"
        text_output += f"### Outliers\n{outliers}\n\n"
        text_output += "### Recommendations\n" + "\n".join([f"- {r}" for r in recs]) + "\n\n"
        if "task" in results:
            task = results["task"]
            text_output += f"### AutoML Results ({task.title()})\n"
            text_output += f"**Best Model:** {results.get('best_model', 'Unknown')}\n"
            if task == "classification":
                text_output += f"**Accuracy:** {results['accuracy']:.3f}\n\n"
                text_output += f"```\n{results['report']}\n```\n"
            else:
                text_output += f"**R²:** {results['r2_score']:.3f}, **MSE:** {results['mse']:.3f}\n"
            feat_imp = sorted(results.get("feature_importance", {}).items(), key=lambda x: x[1], reverse=True)[:5]
            text_output += "### Top Features\n" + "\n".join([f"- `{f}`: {imp:.3f}" for f, imp in feat_imp])
    # === Collect Plots ===
    plots = [value for value in results.values() if isinstance(value, Image.Image)]
    model_file = results.get("model_download_path", None)
    if model_file and os.path.exists(model_file):
        model_download_output = model_file
    else:
        model_download_output = None  # No file to download
    return text_output, plots, model_download_output
# ────────────────────────────────
# Launch Gradio App
# ────────────────────────────────
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# AI Data Analyst Agent with AutoML & Smart Target Detection")
    gr.Markdown("Enter your Hugging Face token, then upload a CSV file.")
    token_box = gr.Textbox(label="Hugging Face Token", placeholder="Enter your HF token here...", type="password")
    token_status = gr.Markdown()
    token_box.submit(set_hf_token, inputs=token_box, outputs=token_status)
    with gr.Row():
        file_input = gr.File(label="Upload CSV")
    with gr.Row():
        text_output = gr.Textbox(label="Analysis Report", lines=24)
    with gr.Row():
        plots_output = gr.Gallery(label="EDA & Model Plots", scale=2)
    with gr.Row():
        model_download = gr.File(label="Download Trained Model (.pkl)")
    file_input.upload(analyze_data, inputs=file_input, outputs=[text_output, plots_output, model_download])
# Launch
if __name__ == "__main__":
    demo.launch(share=True)  # Use share=True for a public link
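
# Note: on Hugging Face Spaces the app is already served publicly, so share=True mainly
# matters when running this script locally and you want a temporary public URL.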