mcp / analysis_tools.py
Tracy André
updated
7ca901a
raw
history blame
15 kB
"""
Analysis tools for agricultural data.
Provides statistical analysis and visualization capabilities.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from typing import List, Dict, Optional, Tuple, Any
import warnings
warnings.filterwarnings('ignore')
class AgriculturalAnalyzer:
"""Provides analysis tools for agricultural intervention data."""
def __init__(self, data_loader):
self.data_loader = data_loader
self.prediction_models = {}
def analyze_weed_pressure_trends(self,
years: Optional[List[int]] = None,
plots: Optional[List[str]] = None) -> Dict[str, Any]:
"""Analyze weed pressure trends based on herbicide usage."""
herbicide_data = self.data_loader.get_herbicide_usage(years=years)
if plots:
herbicide_data = herbicide_data[herbicide_data['plot_name'].isin(plots)]
# Calculate trends
trends = {}
# Overall IFT trend by year
yearly_ift = herbicide_data.groupby('year')['ift_herbicide'].mean().reset_index()
trends['yearly_ift'] = yearly_ift
# IFT trend by plot
plot_ift = herbicide_data.groupby(['plot_name', 'year'])['ift_herbicide'].mean().reset_index()
trends['plot_ift'] = plot_ift
# IFT trend by crop type
crop_ift = herbicide_data.groupby(['crop_type', 'year'])['ift_herbicide'].mean().reset_index()
trends['crop_ift'] = crop_ift
# Statistical summary
summary_stats = {
'mean_ift': herbicide_data['ift_herbicide'].mean(),
'std_ift': herbicide_data['ift_herbicide'].std(),
'min_ift': herbicide_data['ift_herbicide'].min(),
'max_ift': herbicide_data['ift_herbicide'].max(),
'total_applications': herbicide_data['num_applications'].sum(),
'unique_plots': herbicide_data['plot_name'].nunique(),
'unique_crops': herbicide_data['crop_type'].nunique()
}
trends['summary'] = summary_stats
return trends
def create_weed_pressure_visualization(self,
years: Optional[List[int]] = None,
plots: Optional[List[str]] = None) -> go.Figure:
"""Create interactive visualization of weed pressure trends."""
trends = self.analyze_weed_pressure_trends(years=years, plots=plots)
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('IFT Evolution par Année', 'IFT par Parcelle',
'IFT par Type de Culture', 'Distribution IFT'),
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}]]
)
# Plot 1: Yearly IFT trend
yearly_data = trends['yearly_ift']
fig.add_trace(
go.Scatter(x=yearly_data['year'], y=yearly_data['ift_herbicide'],
mode='lines+markers', name='IFT Moyen',
line=dict(color='blue')),
row=1, col=1
)
# Plot 2: IFT by plot
plot_data = trends['plot_ift']
for plot in plot_data['plot_name'].unique():
plot_subset = plot_data[plot_data['plot_name'] == plot]
fig.add_trace(
go.Scatter(x=plot_subset['year'], y=plot_subset['ift_herbicide'],
mode='lines+markers', name=f'Parcelle {plot}',
showlegend=False),
row=1, col=2
)
# Plot 3: IFT by crop
crop_data = trends['crop_ift']
for crop in crop_data['crop_type'].unique()[:5]: # Limit to top 5 crops
crop_subset = crop_data[crop_data['crop_type'] == crop]
fig.add_trace(
go.Scatter(x=crop_subset['year'], y=crop_subset['ift_herbicide'],
mode='lines+markers', name=crop,
showlegend=False),
row=2, col=1
)
# Plot 4: IFT distribution
herbicide_data = self.data_loader.get_herbicide_usage(years=years)
if plots:
herbicide_data = herbicide_data[herbicide_data['plot_name'].isin(plots)]
fig.add_trace(
go.Histogram(x=herbicide_data['ift_herbicide'],
name='Distribution IFT',
showlegend=False),
row=2, col=2
)
# Update layout
fig.update_layout(
title_text="Analyse de la Pression Adventices (IFT Herbicides)",
height=800,
showlegend=True
)
# Update axes labels
fig.update_xaxes(title_text="Année", row=1, col=1)
fig.update_yaxes(title_text="IFT Herbicide", row=1, col=1)
fig.update_xaxes(title_text="Année", row=1, col=2)
fig.update_yaxes(title_text="IFT Herbicide", row=1, col=2)
fig.update_xaxes(title_text="Année", row=2, col=1)
fig.update_yaxes(title_text="IFT Herbicide", row=2, col=1)
fig.update_xaxes(title_text="IFT Herbicide", row=2, col=2)
fig.update_yaxes(title_text="Fréquence", row=2, col=2)
return fig
def analyze_crop_rotation_impact(self) -> pd.DataFrame:
"""Analyze the impact of crop rotation on weed pressure."""
df = self.data_loader.load_all_files()
# Group by plot and year to get crop sequences
plot_years = df.groupby(['plot_name', 'year'])['crop_type'].first().reset_index()
plot_years = plot_years.sort_values(['plot_name', 'year'])
# Create rotation sequences
rotations = []
for plot in plot_years['plot_name'].unique():
plot_data = plot_years[plot_years['plot_name'] == plot].sort_values('year')
crops = plot_data['crop_type'].tolist()
years = plot_data['year'].tolist()
for i in range(len(crops)-1):
rotations.append({
'plot_name': plot,
'year_from': years[i],
'year_to': years[i+1],
'crop_from': crops[i],
'crop_to': crops[i+1],
'rotation_type': f"{crops[i]}{crops[i+1]}"
})
rotation_df = pd.DataFrame(rotations)
# Get herbicide usage for each rotation
herbicide_data = self.data_loader.get_herbicide_usage()
# Merge with rotation data
rotation_analysis = rotation_df.merge(
herbicide_data[['plot_name', 'year', 'ift_herbicide']],
left_on=['plot_name', 'year_to'],
right_on=['plot_name', 'year'],
how='left'
)
# Analyze rotation impact
rotation_impact = rotation_analysis.groupby('rotation_type').agg({
'ift_herbicide': ['mean', 'std', 'count']
}).round(3)
rotation_impact.columns = ['mean_ift', 'std_ift', 'count']
rotation_impact = rotation_impact.reset_index()
rotation_impact = rotation_impact[rotation_impact['count'] >= 2] # At least 2 observations
rotation_impact = rotation_impact.sort_values('mean_ift')
return rotation_impact
def predict_weed_pressure(self,
target_years: List[int] = [2025, 2026, 2027],
plots: Optional[List[str]] = None) -> Dict[str, Any]:
"""Predict weed pressure for the next 3 years."""
# Prepare training data
df = self.data_loader.load_all_files()
herbicide_data = self.data_loader.get_herbicide_usage()
# Create features for prediction
features_df = []
for plot in herbicide_data['plot_name'].unique():
if plots and plot not in plots:
continue
plot_data = herbicide_data[herbicide_data['plot_name'] == plot].sort_values('year')
for i in range(len(plot_data)):
row = plot_data.iloc[i].copy()
# Add historical features
if i > 0:
row['prev_ift'] = plot_data.iloc[i-1]['ift_herbicide']
row['prev_crop'] = plot_data.iloc[i-1]['crop_type']
else:
row['prev_ift'] = 0
row['prev_crop'] = 'unknown'
# Add trend features
if i >= 2:
recent_years = plot_data.iloc[i-2:i+1]
row['ift_trend'] = np.polyfit(range(3), recent_years['ift_herbicide'], 1)[0]
else:
row['ift_trend'] = 0
features_df.append(row)
features_df = pd.DataFrame(features_df)
# Prepare features for ML model
# Encode categorical variables
crop_dummies = pd.get_dummies(features_df['crop_type'], prefix='crop')
prev_crop_dummies = pd.get_dummies(features_df['prev_crop'], prefix='prev_crop')
plot_dummies = pd.get_dummies(features_df['plot_name'], prefix='plot')
X = pd.concat([
features_df[['year', 'plot_surface', 'prev_ift', 'ift_trend']],
crop_dummies,
prev_crop_dummies,
plot_dummies
], axis=1)
y = features_df['ift_herbicide']
# Remove rows with missing values
mask = ~(X.isnull().any(axis=1) | y.isnull())
X = X[mask]
y = y[mask]
# Train model
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate model
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# Make predictions for target years
predictions = {}
for year in target_years:
year_predictions = []
# Get last known data for each plot
plot_columns = [col for col in X.columns if col.startswith('plot_')]
unique_plots = [col.replace('plot_', '') for col in plot_columns]
for plot in unique_plots:
if plots and plot not in plots:
continue
# Find last known data for this plot
plot_mask = features_df['plot_name'] == plot
if not plot_mask.any():
continue
last_data = features_df[plot_mask].iloc[-1]
# Create prediction features
pred_row = pd.Series(index=X.columns, dtype=float)
pred_row['year'] = year
pred_row['plot_surface'] = last_data['plot_surface']
pred_row['prev_ift'] = last_data['ift_herbicide']
pred_row['ift_trend'] = last_data.get('ift_trend', 0)
# Set plot dummy
plot_col = f'plot_{plot}'
if plot_col in pred_row.index:
pred_row[plot_col] = 1
# Assume same crop as last year for now
crop_col = f'crop_{last_data["crop_type"]}'
if crop_col in pred_row.index:
pred_row[crop_col] = 1
prev_crop_col = f'prev_crop_{last_data["crop_type"]}'
if prev_crop_col in pred_row.index:
pred_row[prev_crop_col] = 1
# Fill missing values with 0
pred_row = pred_row.fillna(0)
# Make prediction
pred_ift = model.predict([pred_row])[0]
year_predictions.append({
'plot_name': plot,
'year': year,
'predicted_ift': pred_ift,
'risk_level': 'low' if pred_ift < 1.0 else 'medium' if pred_ift < 2.0 else 'high'
})
predictions[year] = pd.DataFrame(year_predictions)
# Feature importance
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
return {
'predictions': predictions,
'model_performance': {'mse': mse, 'r2': r2},
'feature_importance': feature_importance
}
def identify_suitable_plots_for_sensitive_crops(self,
target_years: List[int] = [2025, 2026, 2027],
max_ift_threshold: float = 1.0) -> Dict[str, List[str]]:
"""Identify plots suitable for sensitive crops (peas, beans) based on low weed pressure."""
predictions = self.predict_weed_pressure(target_years=target_years)
suitable_plots = {}
for year in target_years:
if year not in predictions['predictions']:
continue
year_data = predictions['predictions'][year]
suitable = year_data[year_data['predicted_ift'] <= max_ift_threshold]
suitable_plots[year] = suitable['plot_name'].tolist()
return suitable_plots
def analyze_herbicide_alternatives(self) -> pd.DataFrame:
"""Analyze herbicide usage patterns and suggest alternatives."""
df = self.data_loader.load_all_files()
herbicides = df[df['is_herbicide'] == True]
# Analyze herbicide usage by product
herbicide_usage = herbicides.groupby(['produit', 'crop_type']).agg({
'quantitetot': ['sum', 'mean', 'count'],
'codeamm': 'first'
}).round(3)
herbicide_usage.columns = ['total_quantity', 'avg_quantity', 'applications', 'amm_code']
herbicide_usage = herbicide_usage.reset_index()
herbicide_usage = herbicide_usage.sort_values('applications', ascending=False)
# Identify most used herbicides
top_herbicides = herbicide_usage.head(20)
return top_herbicides