mcp / data_loader.py
Tracy AndrΓ©
updated
fd8d3c6
raw
history blame
10.6 kB
"""
Data loader for agricultural intervention data.
Loads data exclusively from Hugging Face datasets.
"""
import pandas as pd
import numpy as np
from typing import List, Optional
import os
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, hf_hub_download
class AgriculturalDataLoader:
"""Loads and preprocesses agricultural intervention data from Hugging Face datasets."""
def __init__(self, hf_token: str = None, dataset_id: str = None):
self.hf_token = hf_token or os.environ.get("HF_TOKEN")
self.dataset_id = dataset_id or "HackathonCRA/2024"
self.data_cache = {}
def load_all_files(self) -> pd.DataFrame:
"""Load data from Hugging Face dataset."""
if 'combined_data' in self.data_cache:
return self.data_cache['combined_data']
# Load from Hugging Face only
df = self.load_from_huggingface()
self.data_cache['combined_data'] = df
return df
def load_from_huggingface(self) -> pd.DataFrame:
"""Load data from Hugging Face dataset."""
print(f"πŸ€— Loading dataset from Hugging Face: {self.dataset_id}")
try:
# Try multiple loading strategies
df = None
# Strategy 1: Try direct dataset loading
try:
dataset = load_dataset(
self.dataset_id,
token=self.hf_token,
streaming=False
)
df = dataset["train"].to_pandas()
print(f"βœ… Loaded via load_dataset: {len(df)} records")
except Exception as e1:
print(f"⚠️ load_dataset failed: {e1}")
# Strategy 2: Load individual CSV files from HF Hub
try:
df = self._load_csv_files_from_hub()
print(f"βœ… Loaded via individual CSV files: {len(df)} records")
except Exception as e2:
print(f"⚠️ CSV loading failed: {e2}")
raise ValueError(f"All loading strategies failed. Dataset: {e1}, CSV: {e2}")
if df is None or len(df) == 0:
raise ValueError("No data loaded from any strategy")
# Apply preprocessing
df = self._preprocess_data(df)
print(f"βœ… Successfully processed {len(df)} records from Hugging Face")
return df
except Exception as e:
raise ValueError(f"Failed to load dataset from Hugging Face: {e}")
def _load_csv_files_from_hub(self) -> pd.DataFrame:
"""Load individual CSV files from Hugging Face Hub."""
from huggingface_hub import hf_hub_download
import tempfile
print("πŸ“‚ Loading individual CSV files from HF Hub...")
# Get list of CSV files
api = HfApi()
try:
repo_info = api.repo_info(repo_id=self.dataset_id, repo_type="dataset", token=self.hf_token)
csv_files = [f.rfilename for f in repo_info.siblings if f.rfilename.endswith('.csv')]
except Exception as e:
raise ValueError(f"Failed to get repo info: {e}")
if not csv_files:
raise ValueError("No CSV files found in the dataset repository")
print(f"πŸ“‹ Found {len(csv_files)} CSV files")
all_dataframes = []
for csv_file in csv_files:
try:
# Download CSV file to temporary location
local_path = hf_hub_download(
repo_id=self.dataset_id,
filename=csv_file,
repo_type="dataset",
token=self.hf_token
)
# Read CSV with appropriate settings
# First, let's check if we need to skip the first row
df = pd.read_csv(local_path)
# If the first row contains "Interventions (sortie sous excel)", skip it
if df.columns[0].startswith('Interventions'):
df = pd.read_csv(local_path)
all_dataframes.append(df)
print(f" βœ… {csv_file}: {len(df)} rows")
except Exception as e:
print(f" ⚠️ Failed to load {csv_file}: {e}")
continue
if not all_dataframes:
raise ValueError("No CSV files could be loaded successfully")
# Combine all dataframes
combined_df = pd.concat(all_dataframes, ignore_index=True)
return combined_df
def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Preprocess the agricultural data."""
print(f"πŸ”§ Preprocessing {len(df)} records...")
print(f"πŸ“‹ Available columns: {list(df.columns)}")
# Convert date columns
date_columns = ['datedebut', 'datefin']
for col in date_columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col], format='%d/%m/%y', errors='coerce')
# Convert numeric columns
numeric_columns = ['surfparc', 'quantitetot', 'neffqte', 'peffqte', 'kqte',
'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo']
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Add derived columns (with error checking)
if 'millesime' in df.columns:
df['year'] = df['millesime']
else:
print("⚠️ Column 'millesime' not found, trying to infer year from filename or date")
# Try to extract year from date if available
if 'datedebut' in df.columns:
df['year'] = pd.to_datetime(df['datedebut'], errors='coerce').dt.year
else:
# Set a default year or raise error
print("❌ Cannot determine year - setting to 2024 as default")
df['year'] = 2024
if 'libelleusag' in df.columns:
df['crop_type'] = df['libelleusag']
else:
df['crop_type'] = 'unknown'
if 'libevenem' in df.columns:
df['intervention_type'] = df['libevenem']
else:
df['intervention_type'] = 'unknown'
if 'familleprod' in df.columns:
df['product_family'] = df['familleprod']
# Calculate IFT (Treatment Frequency Index) for herbicides
df['is_herbicide'] = df['familleprod'].str.contains('Herbicides', na=False)
df['is_fungicide'] = df['familleprod'].str.contains('Fongicides', na=False)
df['is_insecticide'] = df['familleprod'].str.contains('Insecticides', na=False)
else:
df['product_family'] = 'unknown'
df['is_herbicide'] = False
df['is_fungicide'] = False
df['is_insecticide'] = False
if 'nomparc' in df.columns:
df['plot_name'] = df['nomparc']
else:
df['plot_name'] = 'unknown'
if 'numparcell' in df.columns:
df['plot_number'] = df['numparcell']
else:
df['plot_number'] = 0
if 'surfparc' in df.columns:
df['plot_surface'] = df['surfparc']
else:
df['plot_surface'] = 1.0
print(f"βœ… Preprocessing completed: {len(df)} records with {len(df.columns)} columns")
return df
def get_years_available(self) -> List[int]:
"""Get list of available years in the data."""
df = self.load_all_files()
return sorted(df['year'].dropna().unique().astype(int).tolist())
def get_plots_available(self) -> List[str]:
"""Get list of available plots."""
df = self.load_all_files()
return sorted(df['plot_name'].dropna().unique().tolist())
def get_crops_available(self) -> List[str]:
"""Get list of available crop types."""
df = self.load_all_files()
return sorted(df['crop_type'].dropna().unique().tolist())
def filter_data(self,
years: Optional[List[int]] = None,
plots: Optional[List[str]] = None,
crops: Optional[List[str]] = None,
intervention_types: Optional[List[str]] = None) -> pd.DataFrame:
"""Filter the data based on criteria."""
df = self.load_all_files()
if years:
df = df[df['year'].isin(years)]
if plots:
df = df[df['plot_name'].isin(plots)]
if crops:
df = df[df['crop_type'].isin(crops)]
if intervention_types:
df = df[df['intervention_type'].isin(intervention_types)]
return df
def get_herbicide_usage(self, years: Optional[List[int]] = None) -> pd.DataFrame:
"""Get herbicide usage data for weed pressure analysis."""
df = self.filter_data(years=years)
herbicide_data = df[df['is_herbicide'] == True].copy()
# Group by plot, year, and crop
usage_summary = herbicide_data.groupby(['plot_name', 'year', 'crop_type']).agg({
'quantitetot': 'sum',
'produit': 'count', # Number of herbicide applications
'surfparc': 'first'
}).reset_index()
usage_summary.columns = ['plot_name', 'year', 'crop_type', 'total_quantity', 'num_applications', 'plot_surface']
usage_summary['ift_herbicide'] = usage_summary['num_applications'] / usage_summary['plot_surface']
return usage_summary
def upload_to_huggingface(self) -> str:
"""Upload data to Hugging Face dataset."""
if not self.hf_token:
raise ValueError("HF_TOKEN not provided")
df = self.load_all_files()
dataset = Dataset.from_pandas(df)
# Upload to Hugging Face
dataset.push_to_hub(
repo_id=self.dataset_id,
token=self.hf_token,
private=False
)
return f"Data uploaded to {self.dataset_id}"
def clear_cache(self):
"""Clear cached data to force reload from Hugging Face."""
self.data_cache.clear()
print("πŸ“‹ Cache cleared - will reload from Hugging Face on next access")