"""
Analysis tools for agricultural data.

Provides statistical analysis and visualization capabilities.
"""

import warnings
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
from plotly.subplots import make_subplots
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

# NOTE: this silences every warning for the whole process as soon as the
# module is imported, not only warnings raised by this module.
warnings.filterwarnings('ignore')


class AgriculturalAnalyzer:
    """Provides analysis tools for agricultural intervention data.

    All data comes from ``data_loader``, which must expose
    ``get_herbicide_usage(years=...)`` and ``load_all_files()``; both are
    expected to return pandas DataFrames (this class indexes them by column
    name and calls ``groupby`` on them).
    """

    # Forecast horizon used when a caller does not pass ``target_years``.
    # Kept as an immutable tuple so the default can never be mutated in place.
    DEFAULT_TARGET_YEARS: Tuple[int, ...] = (2025, 2026, 2027)

    # Predicted IFT strictly below the first limit is 'low' risk, strictly
    # below the second is 'medium', anything else is 'high'.
    LOW_RISK_IFT_LIMIT = 1.0
    MEDIUM_RISK_IFT_LIMIT = 2.0

    # Number of crop types drawn on the "IFT by crop" panel.
    MAX_CROPS_PLOTTED = 5

    def __init__(self, data_loader):
        """Store the data loader and create an empty prediction-model registry."""
        self.data_loader = data_loader
        self.prediction_models = {}

    def _load_herbicide_usage(self, years: Optional[List[int]] = None,
                              plots: Optional[List[str]] = None) -> pd.DataFrame:
        """Fetch herbicide usage for ``years``, restricted to ``plots`` when given."""
        herbicide_data = self.data_loader.get_herbicide_usage(years=years)
        if plots:
            herbicide_data = herbicide_data[herbicide_data['plot_name'].isin(plots)]
        return herbicide_data

    def analyze_weed_pressure_trends(self, years: Optional[List[int]] = None,
                                     plots: Optional[List[str]] = None) -> Dict[str, Any]:
        """Analyze weed pressure trends based on herbicide usage.

        Args:
            years: Forwarded to the loader's ``get_herbicide_usage``.
            plots: Plot names to keep; a falsy value keeps every plot.

        Returns:
            A dict with four entries:
            ``yearly_ift`` (mean ``ift_herbicide`` per year),
            ``plot_ift`` (mean per plot and year),
            ``crop_ift`` (mean per crop type and year) and
            ``summary`` (a dict of scalar statistics).
        """
        herbicide_data = self._load_herbicide_usage(years=years, plots=plots)
        ift = herbicide_data['ift_herbicide']

        def mean_ift_by(keys: List[str]) -> pd.DataFrame:
            return herbicide_data.groupby(keys)['ift_herbicide'].mean().reset_index()

        return {
            'yearly_ift': mean_ift_by(['year']),
            'plot_ift': mean_ift_by(['plot_name', 'year']),
            'crop_ift': mean_ift_by(['crop_type', 'year']),
            'summary': {
                'mean_ift': ift.mean(),
                'std_ift': ift.std(),
                'min_ift': ift.min(),
                'max_ift': ift.max(),
                'total_applications': herbicide_data['num_applications'].sum(),
                'unique_plots': herbicide_data['plot_name'].nunique(),
                'unique_crops': herbicide_data['crop_type'].nunique(),
            },
        }

    def create_weed_pressure_visualization(self, years: Optional[List[int]] = None,
                                           plots: Optional[List[str]] = None) -> go.Figure:
        """Create interactive visualization of weed pressure trends.

        Builds a 2x2 figure: yearly mean IFT, IFT per plot, IFT per crop type
        and a histogram of the raw ``ift_herbicide`` values.

        Args:
            years: Forwarded to the loader's ``get_herbicide_usage``.
            plots: Plot names to keep; a falsy value keeps every plot.

        Returns:
            The assembled plotly figure.
        """
        trends = self.analyze_weed_pressure_trends(years=years, plots=plots)

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('IFT Evolution par Année', 'IFT par Parcelle',
                            'IFT par Type de Culture', 'Distribution IFT'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]]
        )

        # Panel (1, 1): yearly mean IFT.
        yearly_data = trends['yearly_ift']
        fig.add_trace(
            go.Scatter(x=yearly_data['year'], y=yearly_data['ift_herbicide'],
                       mode='lines+markers', name='IFT Moyen',
                       line=dict(color='blue')),
            row=1, col=1
        )

        # Panel (1, 2): one line per plot.
        plot_data = trends['plot_ift']
        for plot in plot_data['plot_name'].unique():
            plot_subset = plot_data[plot_data['plot_name'] == plot]
            fig.add_trace(
                go.Scatter(x=plot_subset['year'], y=plot_subset['ift_herbicide'],
                           mode='lines+markers', name=f'Parcelle {plot}',
                           showlegend=False),
                row=1, col=2
            )

        # Panel (2, 1): one line per crop type. Only the first few crop types
        # (in the order of the grouped data, not ranked by usage) are drawn.
        crop_data = trends['crop_ift']
        for crop in crop_data['crop_type'].unique()[:self.MAX_CROPS_PLOTTED]:
            crop_subset = crop_data[crop_data['crop_type'] == crop]
            fig.add_trace(
                go.Scatter(x=crop_subset['year'], y=crop_subset['ift_herbicide'],
                           mode='lines+markers', name=crop,
                           showlegend=False),
                row=2, col=1
            )

        # Panel (2, 2): distribution of the un-aggregated IFT values.
        herbicide_data = self._load_herbicide_usage(years=years, plots=plots)
        fig.add_trace(
            go.Histogram(x=herbicide_data['ift_herbicide'], name='Distribution IFT',
                         showlegend=False),
            row=2, col=2
        )

        fig.update_layout(
            title_text="Analyse de la Pression Adventices (IFT Herbicides)",
            height=800,
            showlegend=True
        )

        # The three time-series panels share the same axis titles.
        for row, col in ((1, 1), (1, 2), (2, 1)):
            fig.update_xaxes(title_text="Année", row=row, col=col)
            fig.update_yaxes(title_text="IFT Herbicide", row=row, col=col)
        fig.update_xaxes(title_text="IFT Herbicide", row=2, col=2)
        fig.update_yaxes(title_text="Fréquence", row=2, col=2)

        return fig

    def analyze_crop_rotation_impact(self) -> pd.DataFrame:
        """Analyze the impact of crop rotation on weed pressure.

        For every plot, consecutive recorded seasons are paired into a
        rotation ("crop A → crop B") and joined with the herbicide IFT of the
        second season.

        Returns:
            One row per rotation type observed at least twice, with columns
            ``rotation_type``, ``mean_ift``, ``std_ift`` and ``count``, sorted
            by ascending ``mean_ift``. The frame is empty (with those columns)
            when no plot has two recorded seasons.
        """
        result_columns = ['rotation_type', 'mean_ift', 'std_ift', 'count']

        df = self.data_loader.load_all_files()

        # One crop per (plot, year): the first crop recorded for that season.
        plot_years = (
            df.groupby(['plot_name', 'year'])['crop_type']
            .first()
            .reset_index()
            .sort_values(['plot_name', 'year'])
        )

        rotations = []
        for plot, plot_data in plot_years.groupby('plot_name', sort=False):
            seasons = list(zip(plot_data['year'].tolist(),
                               plot_data['crop_type'].tolist()))
            # Pair each season with the next one recorded for the same plot.
            for (year_from, crop_from), (year_to, crop_to) in zip(seasons, seasons[1:]):
                rotations.append({
                    'plot_name': plot,
                    'year_from': year_from,
                    'year_to': year_to,
                    'crop_from': crop_from,
                    'crop_to': crop_to,
                    'rotation_type': f"{crop_from} → {crop_to}"
                })

        if not rotations:
            # Without this guard the column-less frame below would make the
            # merge fail with a KeyError.
            return pd.DataFrame(columns=result_columns)

        rotation_df = pd.DataFrame(rotations)
        herbicide_data = self.data_loader.get_herbicide_usage()

        # Attach the IFT observed in the season the rotation leads into.
        rotation_analysis = rotation_df.merge(
            herbicide_data[['plot_name', 'year', 'ift_herbicide']],
            left_on=['plot_name', 'year_to'],
            right_on=['plot_name', 'year'],
            how='left'
        )

        rotation_impact = (
            rotation_analysis.groupby('rotation_type')['ift_herbicide']
            .agg(['mean', 'std', 'count'])
            .round(3)
            .rename(columns={'mean': 'mean_ift', 'std': 'std_ift'})
            .reset_index()
        )

        # Keep only rotations backed by at least 2 IFT observations.
        rotation_impact = rotation_impact[rotation_impact['count'] >= 2]
        return rotation_impact.sort_values('mean_ift')

    @classmethod
    def _risk_level(cls, predicted_ift: float) -> str:
        """Map a predicted IFT value to 'low', 'medium' or 'high'."""
        if predicted_ift < cls.LOW_RISK_IFT_LIMIT:
            return 'low'
        if predicted_ift < cls.MEDIUM_RISK_IFT_LIMIT:
            return 'medium'
        return 'high'

    @staticmethod
    def _build_prediction_features(herbicide_data: pd.DataFrame,
                                   plots: Optional[List[str]] = None) -> pd.DataFrame:
        """Add lagged features (``prev_ift``, ``prev_crop``, ``ift_trend``) per plot row."""
        rows = []
        for plot in herbicide_data['plot_name'].unique():
            if plots and plot not in plots:
                continue

            plot_data = herbicide_data[herbicide_data['plot_name'] == plot].sort_values('year')

            for i, (_, record) in enumerate(plot_data.iterrows()):
                row = record.copy()

                # Previous row of the same plot; the first row has no history.
                if i > 0:
                    previous = plot_data.iloc[i - 1]
                    row['prev_ift'] = previous['ift_herbicide']
                    row['prev_crop'] = previous['crop_type']
                else:
                    row['prev_ift'] = 0
                    row['prev_crop'] = 'unknown'

                # Slope of a straight line fitted through the last three IFT
                # values (current row included).
                if i >= 2:
                    recent = plot_data.iloc[i - 2:i + 1]
                    row['ift_trend'] = np.polyfit(range(3), recent['ift_herbicide'], 1)[0]
                else:
                    row['ift_trend'] = 0

                rows.append(row)

        return pd.DataFrame(rows)

    def predict_weed_pressure(self, target_years: Optional[List[int]] = None,
                              plots: Optional[List[str]] = None) -> Dict[str, Any]:
        """Predict herbicide IFT per plot for each of ``target_years``.

        A random forest is trained on the historical herbicide usage (year,
        plot surface, previous IFT, recent IFT trend and one-hot encoded crop,
        previous crop and plot). Each plot is then predicted from its last
        known row, assuming the same crop is grown again. Every target year is
        predicted from that same last known row; predictions are not chained
        from one target year to the next.

        No explicit handling of empty or too-small training data is performed
        here; errors from pandas / scikit-learn propagate to the caller.

        Args:
            target_years: Years to predict. Defaults to
                ``DEFAULT_TARGET_YEARS`` (2025, 2026, 2027).
            plots: Plot names to train on and predict for; a falsy value
                keeps every plot.

        Returns:
            A dict with:
            ``predictions`` - ``{year: DataFrame}`` with columns
            ``plot_name``, ``year``, ``predicted_ift`` and ``risk_level``;
            ``model_performance`` - ``{'mse': ..., 'r2': ...}`` on a 20 %
            hold-out split;
            ``feature_importance`` - DataFrame of ``feature`` / ``importance``
            sorted by descending importance.
        """
        if target_years is None:
            target_years = list(self.DEFAULT_TARGET_YEARS)

        herbicide_data = self.data_loader.get_herbicide_usage()
        features_df = self._build_prediction_features(herbicide_data, plots)

        # One-hot encode the categorical inputs.
        crop_dummies = pd.get_dummies(features_df['crop_type'], prefix='crop')
        prev_crop_dummies = pd.get_dummies(features_df['prev_crop'], prefix='prev_crop')
        plot_dummies = pd.get_dummies(features_df['plot_name'], prefix='plot')

        X = pd.concat([
            features_df[['year', 'plot_surface', 'prev_ift', 'ift_trend']],
            crop_dummies,
            prev_crop_dummies,
            plot_dummies
        ], axis=1)
        y = features_df['ift_herbicide']

        # Train only on rows where every feature and the target are present.
        complete = X.notnull().all(axis=1) & y.notnull()
        X = X[complete]
        y = y[complete]

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        # Map each plot dummy column back to the real plot name. Deriving the
        # name by stripping the 'plot_' prefix from column names would also
        # pick up the 'plot_surface' feature, mangle names that themselves
        # contain 'plot_', and turn non-string names into strings that no
        # longer match ``plot_name``.
        plot_by_column = {
            f'plot_{name}': name
            for name in features_df['plot_name'].dropna().unique()
        }

        # The inputs of a prediction row do not depend on the target year
        # (apart from the 'year' column itself), so build them once.
        plot_names = []
        base_rows = []
        for plot_col in plot_dummies.columns:
            if plot_col not in plot_by_column:
                continue
            plot = plot_by_column[plot_col]
            last_data = features_df[features_df['plot_name'] == plot].iloc[-1]

            pred_row = pd.Series(index=X.columns, dtype=float)
            pred_row['plot_surface'] = last_data['plot_surface']
            pred_row['prev_ift'] = last_data['ift_herbicide']
            pred_row['ift_trend'] = last_data.get('ift_trend', 0)
            pred_row[plot_col] = 1

            # Assume the same crop as the last recorded season, which also
            # makes it the "previous crop" of the predicted season.
            crop_col = f'crop_{last_data["crop_type"]}'
            if crop_col in pred_row.index:
                pred_row[crop_col] = 1
            prev_crop_col = f'prev_crop_{last_data["crop_type"]}'
            if prev_crop_col in pred_row.index:
                pred_row[prev_crop_col] = 1

            plot_names.append(plot)
            # Unset dummies and missing inputs count as 0.
            base_rows.append(pred_row.fillna(0))

        prediction_columns = ['plot_name', 'year', 'predicted_ift', 'risk_level']
        predictions = {}
        for year in target_years:
            if not base_rows:
                predictions[year] = pd.DataFrame(columns=prediction_columns)
                continue

            # A DataFrame carrying the training column names keeps the
            # features aligned with what the model was fitted on.
            year_features = pd.DataFrame(base_rows, columns=X.columns)
            year_features['year'] = year
            predicted = model.predict(year_features)

            predictions[year] = pd.DataFrame({
                'plot_name': plot_names,
                'year': year,
                'predicted_ift': predicted,
                'risk_level': [self._risk_level(value) for value in predicted],
            })

        feature_importance = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)

        return {
            'predictions': predictions,
            'model_performance': {'mse': mse, 'r2': r2},
            'feature_importance': feature_importance
        }

    def identify_suitable_plots_for_sensitive_crops(
            self,
            target_years: Optional[List[int]] = None,
            max_ift_threshold: float = 1.0) -> Dict[int, List[str]]:
        """Identify plots suitable for sensitive crops (peas, beans) based on low weed pressure.

        Args:
            target_years: Years to evaluate. Defaults to
                ``DEFAULT_TARGET_YEARS`` (2025, 2026, 2027).
            max_ift_threshold: A plot qualifies when its predicted IFT is
                less than or equal to this value.

        Returns:
            ``{year: [plot_name, ...]}`` for every target year that has
            predictions.
        """
        if target_years is None:
            target_years = list(self.DEFAULT_TARGET_YEARS)

        predictions = self.predict_weed_pressure(target_years=target_years)['predictions']

        suitable_plots = {}
        for year in target_years:
            if year not in predictions:
                continue
            year_data = predictions[year]
            suitable = year_data[year_data['predicted_ift'] <= max_ift_threshold]
            suitable_plots[year] = suitable['plot_name'].tolist()

        return suitable_plots

    def analyze_herbicide_alternatives(self, top_n: int = 20) -> pd.DataFrame:
        """Summarize herbicide usage per product and crop type.

        Args:
            top_n: Number of most frequently applied product/crop
                combinations to return.

        Returns:
            The ``top_n`` rows by number of applications, with columns
            ``produit``, ``crop_type``, ``total_quantity``, ``avg_quantity``,
            ``applications`` and ``amm_code``.
        """
        df = self.data_loader.load_all_files()
        # ``eq(True)`` keeps the semantics of ``== True``: rows whose flag is
        # missing or not exactly True are excluded instead of raising.
        herbicides = df[df['is_herbicide'].eq(True)]

        herbicide_usage = herbicides.groupby(['produit', 'crop_type']).agg({
            'quantitetot': ['sum', 'mean', 'count'],
            'codeamm': 'first'
        }).round(3)

        herbicide_usage.columns = ['total_quantity', 'avg_quantity', 'applications', 'amm_code']
        herbicide_usage = herbicide_usage.reset_index()
        herbicide_usage = herbicide_usage.sort_values('applications', ascending=False)

        return herbicide_usage.head(top_n)