| """ | |
| Data loader for agricultural intervention data. | |
| Loads data exclusively from Hugging Face datasets. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import List, Optional | |
| import os | |
| from datasets import Dataset, load_dataset | |
| from huggingface_hub import HfApi, hf_hub_download | |


class AgriculturalDataLoader:
    """Loads and preprocesses agricultural intervention data from Hugging Face datasets."""

    def __init__(self, hf_token: Optional[str] = None, dataset_id: Optional[str] = None):
        # When no token is passed explicitly, fall back to the HF_TOKEN
        # environment variable; a public dataset may not need one at all
        self.hf_token = hf_token or os.environ.get("HF_TOKEN")
        self.dataset_id = dataset_id or "HackathonCRA/2024"
        self.data_cache = {}

    def load_all_files(self) -> pd.DataFrame:
        """Load data from Hugging Face, returning the cached copy when available."""
        if 'combined_data' in self.data_cache:
            return self.data_cache['combined_data']
        # Load from Hugging Face only
        df = self.load_from_huggingface()
        self.data_cache['combined_data'] = df
        return df

    def load_from_huggingface(self) -> pd.DataFrame:
        """Load data from Hugging Face dataset."""
        print(f"🤗 Loading dataset from Hugging Face: {self.dataset_id}")
        try:
            df = None
            # Strategy 1: direct loading via datasets.load_dataset
            try:
                dataset = load_dataset(
                    self.dataset_id,
                    token=self.hf_token,
                    streaming=False
                )
                df = dataset["train"].to_pandas()
                print(f"✅ Loaded via load_dataset: {len(df)} records")
            except Exception as e1:
                print(f"⚠️ load_dataset failed: {e1}")
                # Strategy 2: load individual CSV files from the HF Hub
                try:
                    df = self._load_csv_files_from_hub()
                    print(f"✅ Loaded via individual CSV files: {len(df)} records")
                except Exception as e2:
                    print(f"⚠️ CSV loading failed: {e2}")
                    raise ValueError(f"All loading strategies failed. Dataset: {e1}, CSV: {e2}") from e2
            if df is None or len(df) == 0:
                raise ValueError("No data loaded from any strategy")
            # Apply preprocessing
            df = self._preprocess_data(df)
            print(f"✅ Successfully processed {len(df)} records from Hugging Face")
            return df
        except Exception as e:
            raise ValueError(f"Failed to load dataset from Hugging Face: {e}") from e

    def _load_csv_files_from_hub(self) -> pd.DataFrame:
        """Load individual CSV files from Hugging Face Hub."""
        print("📂 Loading individual CSV files from HF Hub...")
        # Get the list of CSV files in the dataset repository
        api = HfApi()
        try:
            repo_info = api.repo_info(repo_id=self.dataset_id, repo_type="dataset", token=self.hf_token)
            csv_files = [f.rfilename for f in repo_info.siblings if f.rfilename.endswith('.csv')]
        except Exception as e:
            raise ValueError(f"Failed to get repo info: {e}") from e
        if not csv_files:
            raise ValueError("No CSV files found in the dataset repository")
        print(f"📂 Found {len(csv_files)} CSV files")
        all_dataframes = []
        for csv_file in csv_files:
            try:
                # Download the CSV file (hf_hub_download stores it in the local HF cache)
                local_path = hf_hub_download(
                    repo_id=self.dataset_id,
                    filename=csv_file,
                    repo_type="dataset",
                    token=self.hf_token
                )
                # Some exports prepend a title row such as "Interventions (sortie sous excel)";
                # if it leaked into the header, re-read the file skipping that first row
                df = pd.read_csv(local_path)
                if df.columns[0].startswith('Interventions'):
                    df = pd.read_csv(local_path, skiprows=1)
                all_dataframes.append(df)
                print(f"  ✅ {csv_file}: {len(df)} rows")
            except Exception as e:
                print(f"  ⚠️ Failed to load {csv_file}: {e}")
                continue
        if not all_dataframes:
            raise ValueError("No CSV files could be loaded successfully")
        # Combine all dataframes
        combined_df = pd.concat(all_dataframes, ignore_index=True)
        return combined_df

    def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Preprocess the agricultural data."""
        print(f"🔧 Preprocessing {len(df)} records...")
        print(f"📋 Available columns: {list(df.columns)}")
        # Convert date columns
        date_columns = ['datedebut', 'datefin']
        for col in date_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], format='%d/%m/%y', errors='coerce')
        # Convert numeric columns
        numeric_columns = ['surfparc', 'quantitetot', 'neffqte', 'peffqte', 'kqte',
                           'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo']
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        # Add derived columns (with error checking)
        if 'millesime' in df.columns:
            df['year'] = df['millesime']
        else:
            print("⚠️ Column 'millesime' not found, inferring year from the start date")
            if 'datedebut' in df.columns:
                df['year'] = pd.to_datetime(df['datedebut'], errors='coerce').dt.year
            else:
                # No date column either: fall back to a default year
                print("❌ Cannot determine year - setting to 2024 as default")
                df['year'] = 2024
        df['crop_type'] = df['libelleusag'] if 'libelleusag' in df.columns else 'unknown'
        df['intervention_type'] = df['libevenem'] if 'libevenem' in df.columns else 'unknown'
        if 'familleprod' in df.columns:
            df['product_family'] = df['familleprod']
            # Flag product families, used later for IFT (Treatment Frequency Index) estimates
            df['is_herbicide'] = df['familleprod'].str.contains('Herbicides', na=False)
            df['is_fungicide'] = df['familleprod'].str.contains('Fongicides', na=False)
            df['is_insecticide'] = df['familleprod'].str.contains('Insecticides', na=False)
        else:
            df['product_family'] = 'unknown'
            df['is_herbicide'] = False
            df['is_fungicide'] = False
            df['is_insecticide'] = False
        df['plot_name'] = df['nomparc'] if 'nomparc' in df.columns else 'unknown'
        df['plot_number'] = df['numparcell'] if 'numparcell' in df.columns else 0
        df['plot_surface'] = df['surfparc'] if 'surfparc' in df.columns else 1.0
        print(f"✅ Preprocessing completed: {len(df)} records with {len(df.columns)} columns")
        return df
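
    # Summary of the derived columns produced above (source column -> derived name):
    #   millesime -> year, libelleusag -> crop_type, libevenem -> intervention_type,
    #   familleprod -> product_family (plus is_herbicide / is_fungicide / is_insecticide flags),
    #   nomparc -> plot_name, numparcell -> plot_number, surfparc -> plot_surface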

    def get_years_available(self) -> List[int]:
        """Get list of available years in the data."""
        df = self.load_all_files()
        return sorted(df['year'].dropna().unique().astype(int).tolist())

    def get_plots_available(self) -> List[str]:
        """Get list of available plots."""
        df = self.load_all_files()
        return sorted(df['plot_name'].dropna().unique().tolist())

    def get_crops_available(self) -> List[str]:
        """Get list of available crop types."""
        df = self.load_all_files()
        return sorted(df['crop_type'].dropna().unique().tolist())

    def filter_data(self,
                    years: Optional[List[int]] = None,
                    plots: Optional[List[str]] = None,
                    crops: Optional[List[str]] = None,
                    intervention_types: Optional[List[str]] = None) -> pd.DataFrame:
        """Filter the data based on criteria."""
        df = self.load_all_files()
        if years:
            df = df[df['year'].isin(years)]
        if plots:
            df = df[df['plot_name'].isin(plots)]
        if crops:
            df = df[df['crop_type'].isin(crops)]
        if intervention_types:
            df = df[df['intervention_type'].isin(intervention_types)]
        return df

    def get_herbicide_usage(self, years: Optional[List[int]] = None) -> pd.DataFrame:
        """Get herbicide usage data for weed pressure analysis."""
        df = self.filter_data(years=years)
        herbicide_data = df[df['is_herbicide']].copy()
        # Group by plot, year, and crop
        usage_summary = herbicide_data.groupby(['plot_name', 'year', 'crop_type']).agg({
            'quantitetot': 'sum',
            'produit': 'count',  # Number of herbicide applications
            'surfparc': 'first'
        }).reset_index()
        usage_summary.columns = ['plot_name', 'year', 'crop_type', 'total_quantity', 'num_applications', 'plot_surface']
        usage_summary['ift_herbicide'] = usage_summary['num_applications'] / usage_summary['plot_surface']
        return usage_summary
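
    # Note: the 'ift_herbicide' ratio above (applications per hectare) is a
    # simplified proxy. The standard IFT (Indice de Fréquence de Traitement)
    # weights each treatment by the applied dose relative to the registered
    # reference dose, which this dataset does not appear to provide.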

    def upload_to_huggingface(self) -> str:
        """Upload data to Hugging Face dataset."""
        if not self.hf_token:
            raise ValueError("HF_TOKEN not provided")
        df = self.load_all_files()
        dataset = Dataset.from_pandas(df)
        # Upload to Hugging Face
        dataset.push_to_hub(
            repo_id=self.dataset_id,
            token=self.hf_token,
            private=False
        )
        return f"Data uploaded to {self.dataset_id}"

    def clear_cache(self):
        """Clear cached data to force reload from Hugging Face."""
        self.data_cache.clear()
        print("🔄 Cache cleared - will reload from Hugging Face on next access")