Spaces:
Sleeping
Sleeping
| """ | |
| Analysis tools for agricultural data. | |
| Provides statistical analysis and visualization capabilities. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import mean_squared_error, r2_score | |
| from typing import List, Dict, Optional, Tuple, Any | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| class AgriculturalAnalyzer: | |
| """Provides analysis tools for agricultural intervention data.""" | |
def __init__(self, data_loader):
    """Bind the analyzer to the object that supplies its data.

    Args:
        data_loader: Data source queried by the analysis methods; the
            methods of this class call ``get_herbicide_usage(...)`` and
            ``load_all_files()`` on it.
    """
    # Starts out empty; none of the methods shown in this class writes to it.
    self.prediction_models = dict()
    self.data_loader = data_loader
def analyze_weed_pressure_trends(self,
                                 years: Optional[List[int]] = None,
                                 plots: Optional[List[str]] = None) -> Dict[str, Any]:
    """Summarise herbicide IFT values, used here as a proxy for weed pressure.

    Args:
        years: Passed through unchanged to
            ``data_loader.get_herbicide_usage``.
        plots: Plot names to keep. ``None`` or an empty list keeps all plots.

    Returns:
        A dict with four entries, in this order:
            ``yearly_ift``: mean ``ift_herbicide`` per ``year``;
            ``plot_ift``: mean ``ift_herbicide`` per ``plot_name`` and ``year``;
            ``crop_ift``: mean ``ift_herbicide`` per ``crop_type`` and ``year``;
            ``summary``: scalar statistics over the (filtered) rows.
    """
    usage = self.data_loader.get_herbicide_usage(years=years)
    if plots:
        usage = usage[usage['plot_name'].isin(plots)]

    def mean_ift_by(keys):
        """Average IFT per group, flattened back into ordinary columns."""
        return usage.groupby(keys)['ift_herbicide'].mean().reset_index()

    ift = usage['ift_herbicide']
    return {
        'yearly_ift': mean_ift_by('year'),
        'plot_ift': mean_ift_by(['plot_name', 'year']),
        'crop_ift': mean_ift_by(['crop_type', 'year']),
        'summary': {
            'mean_ift': ift.mean(),
            'std_ift': ift.std(),
            'min_ift': ift.min(),
            'max_ift': ift.max(),
            'total_applications': usage['num_applications'].sum(),
            'unique_plots': usage['plot_name'].nunique(),
            'unique_crops': usage['crop_type'].nunique(),
        },
    }
def create_weed_pressure_visualization(self,
                                       years: Optional[List[int]] = None,
                                       plots: Optional[List[str]] = None) -> go.Figure:
    """Build a 2x2 Plotly dashboard of herbicide IFT.

    Panels: mean IFT per year (top left), one line per plot (top right),
    one line per crop type for at most five crop types (bottom left) and
    a histogram of the raw IFT values (bottom right).

    Args:
        years: Forwarded to ``analyze_weed_pressure_trends`` and to
            ``data_loader.get_herbicide_usage``.
        plots: Plot names to keep. ``None`` or an empty list keeps all plots.

    Returns:
        The assembled ``plotly.graph_objects.Figure``.
    """
    trends = self.analyze_weed_pressure_trends(years=years, plots=plots)

    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('IFT Evolution par Année', 'IFT par Parcelle',
                        'IFT par Type de Culture', 'Distribution IFT'),
        specs=[[{"secondary_y": False} for _ in range(2)] for _ in range(2)]
    )

    def ift_line(frame, label, **extra):
        """Line-and-marker trace of IFT against year for one data subset."""
        return go.Scatter(x=frame['year'], y=frame['ift_herbicide'],
                          mode='lines+markers', name=label, **extra)

    # Top left: overall yearly mean.
    fig.add_trace(
        ift_line(trends['yearly_ift'], 'IFT Moyen', line=dict(color='blue')),
        row=1, col=1
    )

    # Top right: one line per plot.
    per_plot = trends['plot_ift']
    for plot_name in per_plot['plot_name'].unique():
        subset = per_plot[per_plot['plot_name'] == plot_name]
        fig.add_trace(
            ift_line(subset, f'Parcelle {plot_name}', showlegend=False),
            row=1, col=2
        )

    # Bottom left: one line per crop type. Only the first five crop types
    # (in the order they appear in the grouped data) are drawn; this is
    # not a ranking by IFT.
    per_crop = trends['crop_ift']
    for crop_name in per_crop['crop_type'].unique()[:5]:
        subset = per_crop[per_crop['crop_type'] == crop_name]
        fig.add_trace(
            ift_line(subset, crop_name, showlegend=False),
            row=2, col=1
        )

    # Bottom right: the histogram needs the raw rows, so they are fetched
    # again and filtered the same way as in the trend analysis.
    raw_usage = self.data_loader.get_herbicide_usage(years=years)
    if plots:
        raw_usage = raw_usage[raw_usage['plot_name'].isin(plots)]
    fig.add_trace(
        go.Histogram(x=raw_usage['ift_herbicide'],
                     name='Distribution IFT',
                     showlegend=False),
        row=2, col=2
    )

    fig.update_layout(
        title_text="Analyse de la Pression Adventices (IFT Herbicides)",
        height=800,
        showlegend=True
    )

    # (row, col) -> (x-axis title, y-axis title)
    axis_titles = {
        (1, 1): ("Année", "IFT Herbicide"),
        (1, 2): ("Année", "IFT Herbicide"),
        (2, 1): ("Année", "IFT Herbicide"),
        (2, 2): ("IFT Herbicide", "Fréquence"),
    }
    for (row, col), (x_title, y_title) in axis_titles.items():
        fig.update_xaxes(title_text=x_title, row=row, col=col)
        fig.update_yaxes(title_text=y_title, row=row, col=col)

    return fig
def analyze_crop_rotation_impact(self) -> pd.DataFrame:
    """Compare herbicide IFT across crop-to-crop transitions on each plot.

    For every plot, each pair of adjacent recorded years is turned into a
    "previous crop → next crop" transition. Each transition is then
    matched with the herbicide IFT recorded for that plot in the later
    year, and the IFT values are aggregated per transition type.

    Returns:
        One row per transition type that has at least two IFT
        observations, with columns ``rotation_type``, ``mean_ift``,
        ``std_ift`` and ``count`` (statistics rounded to 3 decimals),
        sorted by ascending ``mean_ift``. When no plot has two recorded
        years, an empty frame with those same columns is returned.
    """
    result_columns = ['rotation_type', 'mean_ift', 'std_ift', 'count']

    df = self.data_loader.load_all_files()

    # One crop per (plot, year), ordered so that rows of the same plot are
    # adjacent and chronological.
    plot_years = (
        df.groupby(['plot_name', 'year'])['crop_type']
        .first()
        .reset_index()
        .sort_values(['plot_name', 'year'])
    )

    rotations = []
    for plot, plot_data in plot_years.groupby('plot_name', sort=False):
        years = plot_data['year'].tolist()
        crops = plot_data['crop_type'].tolist()
        # Pair each recorded year with the next recorded one. The two
        # years are adjacent in the data, not necessarily consecutive
        # calendar years.
        for year_from, year_to, crop_from, crop_to in zip(
                years, years[1:], crops, crops[1:]):
            rotations.append({
                'plot_name': plot,
                'year_from': year_from,
                'year_to': year_to,
                'crop_from': crop_from,
                'crop_to': crop_to,
                'rotation_type': f"{crop_from} → {crop_to}"
            })

    # Without any transition the frame would have no columns and the merge
    # below would fail with a KeyError, so return an empty result instead.
    if not rotations:
        return pd.DataFrame(columns=result_columns)
    rotation_df = pd.DataFrame(rotations)

    herbicide_data = self.data_loader.get_herbicide_usage()

    # Attach the IFT observed in the year the new crop was grown.
    rotation_analysis = rotation_df.merge(
        herbicide_data[['plot_name', 'year', 'ift_herbicide']],
        left_on=['plot_name', 'year_to'],
        right_on=['plot_name', 'year'],
        how='left'
    )

    rotation_impact = (
        rotation_analysis.groupby('rotation_type')
        .agg(mean_ift=('ift_herbicide', 'mean'),
             std_ift=('ift_herbicide', 'std'),
             count=('ift_herbicide', 'count'))
        .round(3)
        .reset_index()
    )
    # A single observation says little about a rotation; require two.
    rotation_impact = rotation_impact[rotation_impact['count'] >= 2]
    return rotation_impact.sort_values('mean_ift')
def predict_weed_pressure(self,
                          target_years: Optional[List[int]] = None,
                          plots: Optional[List[str]] = None) -> Dict[str, Any]:
    """Fit a random forest on past herbicide IFT and predict future years.

    Training features per herbicide-usage row are: ``year``,
    ``plot_surface``, the plot's previous IFT (``prev_ift``, 0 for the
    first record), the slope of the plot's last three IFT values
    (``ift_trend``, 0 when fewer than three records exist) and one-hot
    encodings of the crop, the previous crop and the plot.

    Predictions reuse each plot's most recent record for every target
    year: the crop is assumed unchanged and only the ``year`` feature
    varies between target years.

    Args:
        target_years: Years to predict. Defaults to 2025, 2026 and 2027.
        plots: Plot names to restrict training and prediction to.
            ``None`` or an empty list keeps all plots.

    Returns:
        A dict with:
            ``predictions``: ``{year: DataFrame}`` with columns
                ``plot_name``, ``year``, ``predicted_ift``, ``risk_level``
                (``'low'`` below 1.0, ``'medium'`` below 2.0, else
                ``'high'``);
            ``model_performance``: ``{'mse': ..., 'r2': ...}`` measured on
                a 20 % hold-out split;
            ``feature_importance``: DataFrame of features sorted by
                descending importance.

    Raises:
        ValueError: If fewer than two complete training rows are
            available, e.g. empty data or a ``plots`` filter that matches
            nothing.
    """
    if target_years is None:
        target_years = [2025, 2026, 2027]

    herbicide_data = self.data_loader.get_herbicide_usage()

    # --- Feature engineering, one plot at a time --------------------------
    per_plot_frames = []
    for plot, plot_data in herbicide_data.groupby('plot_name', sort=False):
        if plots and plot not in plots:
            continue
        plot_data = plot_data.sort_values('year').copy()
        ift = plot_data['ift_herbicide']

        # History of the plot: previous record's IFT and crop.
        plot_data['prev_ift'] = ift.shift(1, fill_value=0)
        plot_data['prev_crop'] = (
            plot_data['crop_type'].astype(object).shift(1, fill_value='unknown')
        )

        # Least-squares slope through the last three IFT values at
        # x = 0, 1, 2, which reduces to (y2 - y0) / 2.
        trend = (ift - ift.shift(2)) / 2
        trend.iloc[:2] = 0  # not enough history for a slope
        plot_data['ift_trend'] = trend

        per_plot_frames.append(plot_data)

    if not per_plot_frames:
        raise ValueError(
            "No herbicide usage data available to train the weed pressure model"
        )
    features_df = pd.concat(per_plot_frames)

    # --- Design matrix ----------------------------------------------------
    crop_dummies = pd.get_dummies(features_df['crop_type'], prefix='crop')
    prev_crop_dummies = pd.get_dummies(features_df['prev_crop'], prefix='prev_crop')
    plot_dummies = pd.get_dummies(features_df['plot_name'], prefix='plot')

    X = pd.concat([
        features_df[['year', 'plot_surface', 'prev_ift', 'ift_trend']],
        crop_dummies,
        prev_crop_dummies,
        plot_dummies
    ], axis=1)
    y = features_df['ift_herbicide']

    # Drop rows with any missing feature or target.
    mask = ~(X.isnull().any(axis=1) | y.isnull())
    X = X[mask]
    y = y[mask]

    if len(X) < 2:
        raise ValueError(
            "At least two complete herbicide usage records are required "
            f"to train the weed pressure model, got {len(X)}"
        )

    # --- Training and evaluation ------------------------------------------
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # --- Prediction inputs ------------------------------------------------
    # Map each one-hot plot column back to the plot value it encodes.
    # Recovering the name by parsing the column label is unreliable: the
    # 'plot_surface' feature shares the prefix, names that contain
    # 'plot_' get mangled, and non-string names never compare equal to
    # the parsed text.
    plot_for_column = {
        f'plot_{name}': name for name in features_df['plot_name'].unique()
    }

    base_rows = []
    predicted_plots = []
    for plot_col in plot_dummies.columns:
        if plot_col not in plot_for_column:
            continue
        plot = plot_for_column[plot_col]
        last_data = features_df[features_df['plot_name'] == plot].iloc[-1]

        row = dict.fromkeys(X.columns, 0.0)
        row['plot_surface'] = last_data['plot_surface']
        row['prev_ift'] = last_data['ift_herbicide']
        row['ift_trend'] = last_data['ift_trend']
        row[plot_col] = 1.0

        # Assume the plot keeps its most recent crop, so that crop is both
        # the current and the previous crop of the predicted year. Crops
        # never seen in the matching role have no column and stay at 0.
        crop = last_data['crop_type']
        for crop_col in (f'crop_{crop}', f'prev_crop_{crop}'):
            if crop_col in row:
                row[crop_col] = 1.0

        base_rows.append(row)
        predicted_plots.append(plot)

    # A DataFrame with the training column names keeps the feature names
    # consistent between fit and predict.
    base_features = (
        pd.DataFrame(base_rows, columns=X.columns).astype(float).fillna(0.0)
    )

    def risk_level(ift_value):
        """Bucket a predicted IFT into low / medium / high."""
        if ift_value < 1.0:
            return 'low'
        if ift_value < 2.0:
            return 'medium'
        return 'high'

    # --- Predictions, one batch per target year ---------------------------
    predictions = {}
    for year in target_years:
        predicted = model.predict(base_features.assign(year=float(year)))
        predictions[year] = pd.DataFrame({
            'plot_name': predicted_plots,
            'year': year,
            'predicted_ift': predicted,
            'risk_level': [risk_level(value) for value in predicted],
        })

    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)

    return {
        'predictions': predictions,
        'model_performance': {'mse': mse, 'r2': r2},
        'feature_importance': feature_importance
    }
def identify_suitable_plots_for_sensitive_crops(self,
                                                target_years: Optional[List[int]] = None,
                                                max_ift_threshold: float = 1.0) -> Dict[int, List[str]]:
    """List, per target year, the plots whose predicted IFT is low enough.

    Intended for picking plots for sensitive crops (peas, beans), using
    predicted herbicide IFT as the weed-pressure indicator.

    Args:
        target_years: Years to evaluate. Defaults to 2025, 2026 and 2027.
        max_ift_threshold: Highest predicted IFT (inclusive) a plot may
            have to be considered suitable.

    Returns:
        ``{year: [plot_name, ...]}``. Years missing from the prediction
        result are omitted; years with no prediction rows map to an empty
        list.
    """
    if target_years is None:
        target_years = [2025, 2026, 2027]

    yearly_predictions = self.predict_weed_pressure(
        target_years=target_years
    )['predictions']

    suitable_plots = {}
    for year in target_years:
        if year not in yearly_predictions:
            continue
        year_data = yearly_predictions[year]
        # A frame built from zero predictions has no columns at all, so
        # the column lookup below would raise a KeyError.
        if year_data.empty:
            suitable_plots[year] = []
            continue
        suitable = year_data[year_data['predicted_ift'] <= max_ift_threshold]
        suitable_plots[year] = suitable['plot_name'].tolist()
    return suitable_plots
def analyze_herbicide_alternatives(self) -> pd.DataFrame:
    """Rank herbicide products by how often they were applied per crop.

    Only rows whose ``is_herbicide`` value equals ``True`` are considered.
    Note that this method reports usage only; it does not compute any
    alternative products.

    Returns:
        Up to 20 rows, most-applied first, with columns ``produit``,
        ``crop_type``, ``total_quantity``, ``avg_quantity``,
        ``applications`` and ``amm_code`` (numeric values rounded to
        3 decimals).
    """
    all_rows = self.data_loader.load_all_files()
    herbicide_rows = all_rows[all_rows['is_herbicide'].eq(True)]

    # Quantity statistics and the first recorded AMM code per
    # (product, crop) pair.
    usage = (
        herbicide_rows.groupby(['produit', 'crop_type'])
        .agg(total_quantity=('quantitetot', 'sum'),
             avg_quantity=('quantitetot', 'mean'),
             applications=('quantitetot', 'count'),
             amm_code=('codeamm', 'first'))
        .round(3)
        .reset_index()
    )

    # Keep the 20 most frequently applied product/crop combinations.
    return usage.sort_values('applications', ascending=False).head(20)