"""
Data loader for agricultural intervention data.
Loads data exclusively from Hugging Face datasets.
"""

import os
from typing import List, Optional

import numpy as np
import pandas as pd
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, hf_hub_download


class AgriculturalDataLoader:
    """Loads and preprocesses agricultural intervention data from Hugging Face datasets.

    The combined, preprocessed DataFrame is cached in ``self.data_cache`` under
    the key ``'combined_data'`` so the dataset is only fetched once per loader
    instance (until :meth:`clear_cache` is called).
    """

    def __init__(self, hf_token: Optional[str] = None, dataset_id: Optional[str] = None):
        """Create a loader.

        Args:
            hf_token: Hugging Face access token. Falls back to the ``HF_TOKEN``
                environment variable when not given.
            dataset_id: Hugging Face dataset repository id. Defaults to
                ``"HackathonCRA/2024"``.
        """
        self.hf_token = hf_token or os.environ.get("HF_TOKEN")
        self.dataset_id = dataset_id or "HackathonCRA/2024"
        self.data_cache = {}

    def load_all_files(self) -> pd.DataFrame:
        """Return the preprocessed dataset, loading it on first access.

        Returns:
            The cached DataFrame. The same object is returned on every call,
            so callers that want to modify it should copy it first.

        Raises:
            ValueError: If the dataset cannot be loaded from Hugging Face.
        """
        if 'combined_data' in self.data_cache:
            return self.data_cache['combined_data']

        # Load from Hugging Face only
        df = self.load_from_huggingface()
        self.data_cache['combined_data'] = df
        return df

    def load_from_huggingface(self) -> pd.DataFrame:
        """Load and preprocess the dataset from Hugging Face.

        Two strategies are tried in order: ``load_dataset`` (using the
        ``"train"`` split), then downloading the individual CSV files of the
        dataset repository.

        Returns:
            The preprocessed DataFrame.

        Raises:
            ValueError: If both strategies fail, no rows are loaded, or
                preprocessing fails. The original exception is chained.
        """
        print(f"🤗 Loading dataset from Hugging Face: {self.dataset_id}")

        try:
            df = None

            # Strategy 1: Try direct dataset loading
            try:
                dataset = load_dataset(
                    self.dataset_id,
                    token=self.hf_token,
                    streaming=False
                )
                df = dataset["train"].to_pandas()
                print(f"✅ Loaded via load_dataset: {len(df)} records")
            except Exception as e1:
                print(f"⚠️ load_dataset failed: {e1}")

                # Strategy 2: Load individual CSV files from HF Hub
                try:
                    df = self._load_csv_files_from_hub()
                    print(f"✅ Loaded via individual CSV files: {len(df)} records")
                except Exception as e2:
                    print(f"⚠️ CSV loading failed: {e2}")
                    raise ValueError(
                        f"All loading strategies failed. Dataset: {e1}, CSV: {e2}"
                    ) from e2

            if df is None or len(df) == 0:
                raise ValueError("No data loaded from any strategy")

            # Apply preprocessing
            df = self._preprocess_data(df)
            print(f"✅ Successfully processed {len(df)} records from Hugging Face")
            return df

        except Exception as e:
            # Single exception type for callers; keep the cause for debugging.
            raise ValueError(f"Failed to load dataset from Hugging Face: {e}") from e

    def _load_csv_files_from_hub(self) -> pd.DataFrame:
        """Download every ``.csv`` file of the dataset repository and concatenate them.

        Files that fail to download or parse are reported and skipped.

        Returns:
            A single DataFrame with the rows of all successfully read files.

        Raises:
            ValueError: If the repository cannot be inspected, contains no CSV
                file, or none of the CSV files could be read.
        """
        print("📂 Loading individual CSV files from HF Hub...")

        # Get list of CSV files
        api = HfApi()
        try:
            repo_info = api.repo_info(
                repo_id=self.dataset_id,
                repo_type="dataset",
                token=self.hf_token
            )
            # `siblings` may be None when the hub returns no file listing.
            csv_files = [
                f.rfilename
                for f in (repo_info.siblings or [])
                if f.rfilename.endswith('.csv')
            ]
        except Exception as e:
            raise ValueError(f"Failed to get repo info: {e}") from e

        if not csv_files:
            raise ValueError("No CSV files found in the dataset repository")

        print(f"📋 Found {len(csv_files)} CSV files")

        all_dataframes = []
        for csv_file in csv_files:
            try:
                # Download CSV file (cached locally by huggingface_hub)
                local_path = hf_hub_download(
                    repo_id=self.dataset_id,
                    filename=csv_file,
                    repo_type="dataset",
                    token=self.hf_token
                )

                df = pd.read_csv(local_path)

                # Some exports start with a title row
                # ("Interventions (sortie sous excel)") above the real header.
                # In that case the title was parsed as the header, so re-read
                # the file skipping that first row.
                if len(df.columns) > 0 and str(df.columns[0]).startswith('Interventions'):
                    df = pd.read_csv(local_path, skiprows=1)

                all_dataframes.append(df)
                print(f" ✅ {csv_file}: {len(df)} rows")

            except Exception as e:
                # Best effort: one bad file must not abort the whole load.
                print(f" ⚠️ Failed to load {csv_file}: {e}")
                continue

        if not all_dataframes:
            raise ValueError("No CSV files could be loaded successfully")

        # Combine all dataframes
        return pd.concat(all_dataframes, ignore_index=True)

    @staticmethod
    def _alias_column(df: pd.DataFrame, source: str, target: str, default) -> None:
        """Set ``df[target]`` to ``df[source]`` if present, otherwise to ``default``."""
        df[target] = df[source] if source in df.columns else default

    def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalise types and add derived columns.

        Steps:
            * parse ``datedebut`` / ``datefin`` as ``%d/%m/%y`` dates
              (unparseable values become ``NaT``);
            * coerce the known numeric columns (invalid values become ``NaN``);
            * add ``year``, ``crop_type``, ``intervention_type``,
              ``product_family``, ``is_herbicide``, ``is_fungicide``,
              ``is_insecticide``, ``plot_name``, ``plot_number`` and
              ``plot_surface``, using defaults when the source column is absent.

        Args:
            df: Raw data as loaded from Hugging Face. It is not modified.

        Returns:
            A new, preprocessed DataFrame.
        """
        print(f"🔧 Preprocessing {len(df)} records...")
        print(f"📋 Available columns: {list(df.columns)}")

        # Work on a copy so the caller's frame is left untouched.
        df = df.copy()

        # Convert date columns
        for col in ('datedebut', 'datefin'):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], format='%d/%m/%y', errors='coerce')

        # Convert numeric columns
        numeric_columns = ['surfparc', 'quantitetot', 'neffqte', 'peffqte', 'kqte',
                           'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo']
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Add derived columns (with error checking)
        if 'millesime' in df.columns:
            df['year'] = df['millesime']
        else:
            print("⚠️ Column 'millesime' not found, trying to infer year from filename or date")
            # Try to extract year from date if available
            if 'datedebut' in df.columns:
                df['year'] = pd.to_datetime(df['datedebut'], errors='coerce').dt.year
            else:
                # No way to infer the year: fall back to a fixed default
                print("❌ Cannot determine year - setting to 2024 as default")
                df['year'] = 2024

        self._alias_column(df, 'libelleusag', 'crop_type', 'unknown')
        self._alias_column(df, 'libevenem', 'intervention_type', 'unknown')

        if 'familleprod' in df.columns:
            df['product_family'] = df['familleprod']
            # Product-family flags; missing family values count as False.
            df['is_herbicide'] = df['familleprod'].str.contains('Herbicides', na=False)
            df['is_fungicide'] = df['familleprod'].str.contains('Fongicides', na=False)
            df['is_insecticide'] = df['familleprod'].str.contains('Insecticides', na=False)
        else:
            df['product_family'] = 'unknown'
            df['is_herbicide'] = False
            df['is_fungicide'] = False
            df['is_insecticide'] = False

        self._alias_column(df, 'nomparc', 'plot_name', 'unknown')
        self._alias_column(df, 'numparcell', 'plot_number', 0)
        self._alias_column(df, 'surfparc', 'plot_surface', 1.0)

        print(f"✅ Preprocessing completed: {len(df)} records with {len(df.columns)} columns")
        return df

    def get_years_available(self) -> List[int]:
        """Get the sorted list of years present in the data."""
        df = self.load_all_files()
        return sorted(df['year'].dropna().unique().astype(int).tolist())

    def get_plots_available(self) -> List[str]:
        """Get the sorted list of plot names present in the data."""
        df = self.load_all_files()
        return sorted(df['plot_name'].dropna().unique().tolist())

    def get_crops_available(self) -> List[str]:
        """Get the sorted list of crop types present in the data."""
        df = self.load_all_files()
        return sorted(df['crop_type'].dropna().unique().tolist())

    def filter_data(self,
                    years: Optional[List[int]] = None,
                    plots: Optional[List[str]] = None,
                    crops: Optional[List[str]] = None,
                    intervention_types: Optional[List[str]] = None) -> pd.DataFrame:
        """Filter the data based on criteria.

        Each criterion that is ``None`` or empty is ignored; the remaining
        ones are combined with a logical AND.

        Args:
            years: Values of ``year`` to keep.
            plots: Values of ``plot_name`` to keep.
            crops: Values of ``crop_type`` to keep.
            intervention_types: Values of ``intervention_type`` to keep.

        Returns:
            The matching rows. When no criterion is given this is the cached
            DataFrame itself, not a copy.
        """
        df = self.load_all_files()

        criteria = (
            ('year', years),
            ('plot_name', plots),
            ('crop_type', crops),
            ('intervention_type', intervention_types),
        )
        for column, allowed in criteria:
            if allowed:
                df = df[df[column].isin(allowed)]

        return df

    def get_herbicide_usage(self, years: Optional[List[int]] = None) -> pd.DataFrame:
        """Get herbicide usage data for weed pressure analysis.

        Args:
            years: Optional list of years to restrict the analysis to.

        Returns:
            One row per (``plot_name``, ``year``, ``crop_type``) with columns
            ``total_quantity`` (sum of ``quantitetot``), ``num_applications``
            (count of non-null ``produit``), ``plot_surface`` (first value in
            the group) and ``ift_herbicide`` (``num_applications`` divided by
            ``plot_surface``; ``NaN`` when the surface is zero or missing).
        """
        df = self.filter_data(years=years)
        herbicide_data = df[df['is_herbicide'] == True].copy()  # noqa: E712

        # Group by plot, year, and crop. `plot_surface` is used instead of the
        # raw `surfparc` because it always exists after preprocessing.
        usage_summary = (
            herbicide_data
            .groupby(['plot_name', 'year', 'crop_type'])
            .agg(
                total_quantity=('quantitetot', 'sum'),
                num_applications=('produit', 'count'),  # Number of herbicide applications
                plot_surface=('plot_surface', 'first'),
            )
            .reset_index()
        )

        # Avoid infinite values when a plot has a recorded surface of zero.
        surface = usage_summary['plot_surface'].replace(0, np.nan)
        usage_summary['ift_herbicide'] = usage_summary['num_applications'] / surface

        return usage_summary

    def upload_to_huggingface(self) -> str:
        """Upload the (preprocessed) data to the Hugging Face dataset repository.

        Returns:
            A confirmation message naming the target dataset.

        Raises:
            ValueError: If no Hugging Face token is available.
        """
        if not self.hf_token:
            raise ValueError("HF_TOKEN not provided")

        df = self.load_all_files()
        dataset = Dataset.from_pandas(df)

        # Upload to Hugging Face (the repository is pushed as public)
        dataset.push_to_hub(
            repo_id=self.dataset_id,
            token=self.hf_token,
            private=False
        )

        return f"Data uploaded to {self.dataset_id}"

    def clear_cache(self):
        """Clear cached data to force reload from Hugging Face."""
        self.data_cache.clear()
        print("📋 Cache cleared - will reload from Hugging Face on next access")