Source code for grafs_e.prospective

import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pyomo.contrib.appsi.solvers as _appsi_solvers  # active les solveurs APPSI
import seaborn as sns
from matplotlib.colors import LogNorm
from pyomo.environ import minimize
from scipy.optimize import curve_fit, minimize
from scipy.stats import linregress
from sklearn.metrics import r2_score
from tqdm import tqdm

import grafs_e.graphes_objet as gr
from grafs_e.donnees import *
from grafs_e.N_class import DataLoader, FluxGenerator, NitrogenFlowModel

_ = _appsi_solvers


[docs] class scenario: """ Loads and prepares scenario input sheets for prospective nitrogen flow modeling. This class is used to initialize and manage scenario-specific data, which can be integrated into the `NitrogenFlowModel` to simulate future or alternative configurations of agro-food systems. It either takes a pre-initialized `DataLoader` instance or initializes one internally if not provided. :param scenario_path: Path to the scenario configuration file or directory. :type scenario_path: str :param dataloader: Optional pre-loaded DataLoader instance. If None, a default one is initialized. :type dataloader: DataLoader, optional :ivar scenario_path: Absolute path to the scenario data file. :vartype scenario_path: str :ivar dataloader: Instance of the DataLoader used to retrieve or modify base data. :vartype dataloader: DataLoader :ivar data_path: Path to the general data folder used for model input files. :vartype data_path: str """ def __init__(self, scenario_path, dataloader=None): self.data_path = os.path.join(os.getcwd(), "src", "grafs_e", "data") if dataloader is None: self.dataloader = DataLoader() else: self.dataloader = dataloader self.scenario_path = scenario_path
[docs] def historic_trend(self, region, excel_line): """ Extracts the historical trend of a value for a given region and Excel row in GRAFS_full.xslx. :param region: The region of interest. :type region: str :param excel_line: The row identifier (Excel line) to track. :type excel_line: int :return: List of historical values over available years. :rtype: list[float] """ L = [] for i in annees_disponibles: df = self.dataloader.pre_process_df(i, region) L.append(df.loc[df["index_excel"] == excel_line, region].item()) return L
[docs] @staticmethod def LU_excretion(dataloader, region, t=None): """ Computes average nitrogen excretion per livestock unit (LU) by species over time. :param dataloader: The DataLoader object to use. :type dataloader: DataLoader :param region: Region name. :type region: str :param t: Optional key to extract values at a specific year. :type t: str or int, optional :return: Dictionary (or list) of excretion values by livestock type. :rtype: dict or list """ L = {} livestock = { "bovines": [(1150, 1164), (1196, 1210)], "caprines": [(1166, 1168), (1212, 1214)], "ovines": [(1170, 1172), (1216, 1218)], "porcines": [(1174, 1178), (1220, 1224)], "poultry": [(1180, 1190), (1226, 1236)], "equines": [(1192, 1193), (1238, 1239)], } for i in annees_disponibles: df = dataloader.pre_process_df(i, region) L[i] = {} for type in livestock.keys(): heads = df.loc[df["index_excel"].between(livestock[type][0][0], livestock[type][0][1]), region] heads_type = df.loc[df["index_excel"].between(livestock[type][0][0], livestock[type][0][1]), "nom"] # Récupérer les coefficients LU correspondant à chaque type Lu = heads_type.map(lu_coefficients) # Calcul total LU LU = np.dot(heads, Lu) excr_cap = df.loc[df["index_excel"].between(livestock[type][1][0], livestock[type][1][1]), region] if heads.sum() == 0: L[i][type] = 0 else: L[i][type] = np.dot(heads, excr_cap) / LU if t != None: return [entry[t] for entry in L.values()] return L
[docs] @staticmethod def livestock_LU(dataloader, region, t=None): """ Computes the number of livestock units (LU) per species over time. :param dataloader: The DataLoader object to use. :type dataloader: DataLoader :param region: Region name. :type region: str :param t: Optional key to extract values at a specific year. :type t: str or int, optional :return: Dictionary (or list) of LU values by species. :rtype: dict or list """ LU = {} livestock = { "bovines": [(1150, 1164), (0, 15)], "caprines": [(1166, 1168), (15, 18)], "ovines": [(1170, 1172), (18, 21)], "porcines": [(1174, 1178), (21, 26)], "poultry": [(1180, 1190), (26, 37)], "equines": [(1192, 1193), (37, 39)], } lu_list = list(lu_coefficients.values()) for i in annees_disponibles: df = dataloader.pre_process_df(i, region) LU[i] = {} for type in livestock.keys(): heads = df.loc[df["index_excel"].between(livestock[type][0][0], livestock[type][0][1]), region] lu_coef = lu_list[livestock[type][1][0] : livestock[type][1][1]] LU[i][type] = np.dot(heads, lu_coef) if t != None: return [entry[t] for entry in LU.values()] return LU
[docs] @staticmethod def LU_prod(dataloader, region, t=None): """ Computes productivity (and dairy productivity) per LU for each livestock species. :param dataloader: The DataLoader object to use. :type dataloader: DataLoader :param region: Region name. :type region: str :param t: Optional key to extract values at a specific year. :type t: str or int, optional :return: Dictionary (or list) of productivity values. :rtype: dict or list """ LU_prod = {} index = { "bovines": [1017, 1024], "ovines": [1018, 1025], "caprines": [1019], "equines": [1022], "porcines": [1020], "poultry": [1021, 1023], } LU = scenario.livestock_LU(dataloader, region) for i in annees_disponibles: LU_prod[i] = {} df = dataloader.pre_process_df(i, region) for type in index.keys(): if LU[i][type] == 0: LU_prod[i][f"{type} productivity"] = 0 if type in ["bovines", "ovines", "caprines", "poultry"]: LU_prod[i][f"{type} dairy productivity"] = 0 else: LU_prod[i][f"{type} productivity"] = ( df.loc[df["index_excel"] == index[type][0], region].item() / LU[i][type] * 1e6 ) if type in ["bovines", "ovines", "caprines", "poultry"]: if type in ["ovines", "caprines"]: LU_prod[i][f"{type} dairy productivity"] = ( df.loc[df["index_excel"] == index["ovines"][1], region].item() * LU[i][type] / (LU[i]["ovines"] + LU[i]["caprines"]) ** 2 * 1e6 ) else: LU_prod[i][f"{type} dairy productivity"] = ( df.loc[df["index_excel"] == index[type][1], region].item() / LU[i][type] * 1e6 if LU[i][type] > 0 else 0 ) if t != None: return [entry[t] for entry in LU_prod.values()] return LU_prod
[docs] def extrapolate_recent_trend(self, data, future_year, alpha=7.0, seuil_bas=0, seuil_haut=None): """ Extrapolates a trend into the future using a weighted slope method. Recent years are given higher weight. No predefined functional form is assumed. :param data: List or array of historical values. :type data: list or np.array :param future_year: Year until which to extrapolate. :type future_year: int :param alpha: Slope weighting coefficient for recent years. :type alpha: float :param seuil_bas: Lower bound of extrapolated values. :type seuil_bas: float :param seuil_haut: Upper bound of extrapolated values. :type seuil_haut: float or None :return: (extended_years, extended_values, slope) :rtype: tuple[np.array, np.array, float] """ x = np.array([int(i) for i in annees_disponibles]) y = data # Calcul du poids exponentiel basé sur la distance à x[-1] # distances normalisées [0..1], 0 = point le plus récent, 1 = point le plus ancien dist = x[-1] - x if dist.max() > 0: dist = dist / dist.max() weights = np.exp(-alpha * dist) # Calcul des deltas et des pentes associées # delta_i = (y[i+1]-y[i]) / (x[i+1]-x[i]) deltas = [] delta_weights = [] for i in range(len(x) - 1): dx = x[i + 1] - x[i] slope_i = (y[i + 1] - y[i]) / dx # Pondérer par le poids du 2e point (ou la moyenne w[i], w[i+1] au choix) w_slope = weights[i + 1] deltas.append(slope_i) delta_weights.append(w_slope) deltas = np.array(deltas) delta_weights = np.array(delta_weights) # Pente moyenne pondérée par w[i+1] slope = np.average(deltas, weights=delta_weights) # Extrapolation "an par an" depuis x[-1] jusqu'à future_year year_start = int(x[-1]) + 1 year_end = int(future_year) extended_years = np.arange(year_start, year_end + 1, 1, dtype=float) # On démarre à la dernière valeur historique current_val = y[-1] extended_values = [] for yr in extended_years: # On avance d'une année => + slope * 1 an # (Si besoin, gérer un delta en jours ou fraction, mais ici 1 an) new_val = current_val + slope * (yr - (yr - 1)) # Application des seuils if seuil_bas is not None: new_val = max(new_val, seuil_bas) if seuil_haut is not None: new_val = min(new_val, seuil_haut) extended_values.append(new_val) current_val = new_val return extended_years, extended_values
[docs] def get_import_net(self, region): """ Extracts net nitrogen import value for a given region over all years. :param region: Region name. :type region: str :return: List of net import values. :rtype: list[float] """ L = [] for yr in annees_disponibles: df = self.dataloader.sheets_dict["GRAFS" + yr].copy() df.columns = df.iloc[0] correct_region = { "Pyrénées occid": "Pyrénées occidentales", "Pyrénées Orient": "Pyrénées Orientales", } if region in correct_region.keys(): region = correct_region[region] L.append(df[region].iloc[78]) return L
[docs] def logistic_urb_pop(self, region, year): """ Estimates the proportion of urban population using a logistic regression on historical data. :param region: Region name. :type region: str :param year: Target year for projection. :type year: int :return: Estimated urban population share (%). :rtype: float """ int_year = [int(i) for i in annees_disponibles] prop_urb = np.array(self.historic_trend(region, 6)) logit = np.log(prop_urb / (100 - prop_urb)) slope, intercept, r_value, p_value, std_err = linregress(int_year, logit) return 100.0 / (1.0 + np.exp(-(intercept + slope * int(year))))
[docs] def generate_crop_tab(self, region, function_name="Linear"): """ Generates a crop area table and fits a yield projection model for each crop. Available models: 'linear', 'linear2', 'exp', 'ratio'. :param region: Region to generate data for. :type region: str :param function_name: The model type used for yield extrapolation. :type function_name: str :return: DataFrame with crop areas and model fit parameters. :rtype: pandas.DataFrame """ df = self.dataloader.pre_process_df(annees_disponibles[-1], region) cultures_df = df.loc[df["index_excel"].isin(range(259, 294)), ("nom", region)] cultures_df[region] = cultures_df[region] * 100 / cultures_df[region].sum() df_insert = pd.DataFrame(cultures_df.values, columns=["Crops", "Area proportion (%)"]) df_insert.loc[len(df_insert)] = ["Natural meadow ", None] df_insert.loc[df_insert["Crops"] == "Forage cabbages & roots", "Crops"] = "Forage cabbages" df_insert.loc[df_insert["Crops"] == "rice", "Crops"] = "Rice" df_insert["Enforce Area"] = False df_insert.index = df_insert["Crops"] Y_pros = Y(self.dataloader) def fit_and_store(culture: str, region: str) -> dict: """ Calibre le modèle désigné par function_name pour <culture>. Ne renvoie que les paramètres utiles à ce modèle, plus le R² et le nom de la culture. """ F, Yr = Y_pros.get_Y(culture, region) # --- Cas sans données --- empty_data = len(F) == 0 or len(Yr) == 0 if function_name == "linear": a, b, _ = Y_pros.fit_Y_lin(culture, region) r2 = 0 if empty_data else r2_score(Yr, Y_pros.Y_th_lin(F, a, b)) return { "culture": culture, "a": round(a, 2), "b": round(b, 2), "r2": round(r2, 2), } elif function_name == "linear2": a, xb, c, _ = Y_pros.fit_Y_lin_2_scipy(culture, region) r2 = 0 if empty_data else r2_score(Yr, Y_pros.Y_th_lin_2(F, a, xb, c)) return { "culture": culture, "a": round(a, 2), "xb": int(xb), "c": round(c, 2), "r2": round(r2, 2), } elif function_name == "exp": ym, k, _ = Y_pros.fit_Y_exp(culture, region) r2 = 0 if empty_data else r2_score(Yr, Y_pros.Y_th_exp_cap(F, ym, k)) return { "culture": culture, "Ymax (kgN/ha)": int(ym), "k (kgN/ha)": int(k), "r2": round(r2, 2), } elif function_name == "ratio": ym, _ = Y_pros.fit_Y(culture, region) r2 = 0 if empty_data else r2_score(Yr, Y_pros.Y_th(F, ym)) return { "culture": culture, "Ymax (kgN/ha)": ym, "r2": round(r2, 2), } else: raise ValueError( f"Unknown model : {function_name}. Available models are 'linear', 'linear2', 'exp', 'ratio'." ) all_cultures = cultures + legumineuses + prairies results = [] run_fit = partial(fit_and_store, region=region) with ThreadPoolExecutor(max_workers=os.cpu_count() - 1) as executor: futures = {executor.submit(run_fit, culture): culture for culture in all_cultures} with tqdm( total=len(all_cultures), desc=f"Fitting models ({region})", position=1, leave=False, dynamic_ncols=True ) as pbar: for future in as_completed(futures): result = future.result() results.append(result) pbar.update(1) pbar.refresh() df_temp = pd.DataFrame(results).set_index("culture") # Ajouter (ou mettre à jour) les colonnes dans df_insert for col in df_temp.columns: df_insert[col] = df_temp[col] return df_insert
[docs] def pre_generate_scenario_excel(self, function_name="Linear"): """ Generates pre-filled scenario Excel files for each region based on fitted crop data. :param function_name: The model type used for yield extrapolation ('Linear', 'exp', etc.). :type function_name: str :return: None """ model_sheets = pd.read_excel(os.path.join(self.data_path, "scenario.xlsx"), sheet_name=None) for region in tqdm( regions, total=len(regions), desc="Régions", position=0 ): # tqdm(regions[23:], total=len(regions[23:]), desc="Regions", position=0): sheets = {} sheet_corres = { "doc": "doc", "main scenario": "main", "Surface changes": "area", "technical scenario": "technical", } for sheet_name, df in model_sheets.items(): sheets[sheet_corres[sheet_name]] = df sheets["area"] = self.generate_crop_tab(region, function_name) target_dir = os.path.join(self.scenario_path, "pre_gen", function_name) os.makedirs(target_dir, exist_ok=True) # Crée les dossiers si besoin file_path = os.path.join(target_dir, f"{region}.xlsx") with pd.ExcelWriter(file_path, engine="openpyxl") as writer: for sheet_name, df in sheets.items(): df.to_excel(writer, sheet_name=sheet_name, index=False)
[docs] def generate_scenario_excel(self, year, region, name, function_name="Linear"): """ Generates a full scenario Excel file for a given region and year. This method populates all scenario sheets (documentation, main variables, area distribution, technical parameters) using historical data, regional projections (e.g., population), livestock structure, and crop productivity fits. The output file includes default values for "Business as usual" configurations, which serve as a baseline for future simulations in prospective mode. :param year: The future year for which to generate the scenario. :type year: int :param region: The region to generate the scenario for. :type region: str :param name: File name (without extension) of the output Excel scenario. :type name: str :param function_name: Name of the yield model used for projection (e.g., 'Linear', 'Exponential'). :type function_name: str :return: None. The Excel file is saved to the configured scenario path. :rtype: None """ self.region = region self.year = year self.last_data_year = annees_disponibles[-1] try: self.data = self.dataloader.pre_process_df(self.last_data_year, region) except: self.data = self.dataloader.pre_process_df(self.last_data_year, "France") print(f"No region named {region} in the data") model_sheets = pd.read_excel( os.path.join(self.data_path, "scenario_region", function_name, self.region + ".xlsx"), sheet_name=None ) sheets = {} # sheet_corres = { # "doc": "doc", # "main scenario": "main", # "Surface changes": "area", # "technical scenario": "technical", # } sheet_corres = { "doc": "doc", "main": "main", "area": "area", "technical": "technical", } for sheet_name, df in model_sheets.items(): sheets[sheet_corres[sheet_name]] = df sheets["doc"][None] = None sheets["doc"].iloc[14, 1] = name sheets["doc"].iloc[15, 1] = region sheets["doc"].iloc[16, 1] = year # Du à une mauvaise construction des fiches scenar... if function_name == "Exponential": sheets["doc"].loc[len(sheets["doc"])] = ["Production function", function_name] else: sheets["doc"].iloc[17, 1] = function_name def format_dep(dep_series): return " + ".join(dep_series.astype(str).unique()) regions_dict = { "Ile de France": [ "Paris", "Seine-et-Marne", "Yvelines", "Essonne", "Hauts-de-Seine", "Seine-Saint-Denis", "Val-de-Marne", "Val-d'Oise", ], "Eure": ["Eure"], "Eure-et-Loir": ["Eure-et-Loir"], "Picardie": ["Aisne", "Oise", "Somme"], "Calvados-Orne": ["Calvados", "Orne"], "Seine Maritime": ["Seine-Maritime"], "Manche": ["Manche"], "Nord Pas de Calais": ["Nord", "Pas-de-Calais"], "Champ-Ard-Yonne": ["Ardennes", "Aube", "Marne", "Yonne"], "Bourgogne": ["Côte-d'Or", "Haute-Marne"], "Grande Lorraine": ["Meurthe-et-Moselle", "Meuse", "Moselle", "Vosges", "Haute-Saône"], "Alsace": ["Bas-Rhin", "Haut-Rhin"], "Bretagne": ["Côtes-d'Armor", "Finistère", "Ille-et-Vilaine", "Morbihan"], "Vendée-Charente": ["Vendée", "Charente", "Charente-Maritime"], "Loire aval": ["Loire-Atlantique", "Maine-et-Loire", "Deux-Sèvres", "Mayenne", "Sarthe"], "Loire Centrale": ["Loir-et-Cher", "Indre-et-Loire", "Cher", "Loiret", "Indre", "Vienne"], "Loire Amont": [ "Saône-et-Loire", "Nièvre", "Loire", "Haute-Loire", "Allier", "Puy-de-Dôme", "Creuse", "Haute-Vienne", ], "Grand Jura": ["Doubs", "Jura", "Territoire-de-Belfort"], "Savoie": ["Savoie", "Haute-Savoie"], "Ain-Rhône": ["Ain", "Rhône"], "Alpes": ["Alpes-de-Haute-Provence", "Hautes-Alpes"], "Isère-Drôme Ardèche": ["Isère", "Drôme", "Ardèche"], "Aveyron-Lozère": ["Aveyron", "Lozère"], "Garonne": ["Haute-Garonne", "Lot-et-Garonne", "Tarn-et-Garonne", "Gers", "Aude", "Tarn", "Ariège"], "Gironde": ["Gironde"], "Pyrénées occid": ["Pyrénées-Atlantiques", "Hautes-Pyrénées"], "Landes": ["Landes"], "Dor-Lot": ["Dordogne", "Lot"], "Cantal-Corrèze": ["Cantal", "Corrèze"], "Grand Marseille": ["Bouches-du-Rhône", "Vaucluse"], "Côte d'Azur": ["Alpes-Maritimes", "Var"], "Gard-Hérault": ["Gard", "Hérault"], "Pyrénées Orient": ["Pyrénées-Orientales"], } # Create a mapping dictionary {Department: Region} department_to_region = { department: region for region, departments in regions_dict.items() for department in departments } proj_pop = pd.read_excel( os.path.join(self.data_path, "projections_pop.xlsx"), sheet_name="Population_DEP", skiprows=5 ) proj_pop["Region"] = proj_pop["LIBDEP"].map(department_to_region) proj_pop = proj_pop.dropna(subset=["Region"]) grouped_proj_pop = ( proj_pop.groupby("Region") .agg( { "DEP": format_dep, # Format DEP column as "DEP1 + DEP2 + ..." **{ col: "sum" for col in proj_pop.select_dtypes(include="number").columns }, # Sum all numerical columns } ) .reset_index() ) if region == "France": proj = 0 for r in regions: proj += grouped_proj_pop.loc[grouped_proj_pop["Region"] == r, f"POP_{year}"].item() else: proj = grouped_proj_pop.loc[grouped_proj_pop["Region"] == region, f"POP_{year}"].item() sheets["main"].loc[sheets["main"]["Variable"] == "Population", "Business as usual"] = ( proj / 1000 ) # to be in thousands, not in millions ! if self.data is not None: sheets["main"].loc[ sheets["main"]["Variable"] == "Total per capita protein ingestion", "Business as usual" ] = self.historic_trend(region, 8)[-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Vegetal per capita protein ingestion", "Business as usual" ] = self.historic_trend(region, 9)[-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Edible animal per capita protein ingestion (excl fish)", "Business as usual", ] = self.historic_trend(region, 10)[-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Edible animal per capita protein ingestion (excl fish)", "Business as usual", ] = self.historic_trend(region, 10)[-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Synth N fertilizer application to cropland", "Business as usual", ] = self.historic_trend(region, 27)[-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Synth N fertilizer application to grassland", "Business as usual", ] = self.historic_trend(region, 29)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == "N recycling rate of human excretion in urban area", "Business as usual", ] = self.historic_trend(region, 49)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == "N recycling rate of human excretion in rural area", "Business as usual", ] = self.historic_trend(region, 50)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == "NH3 volatilization coefficient for synthetic nitrogen", "Business as usual", ] = self.historic_trend(region, 31)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == "Indirect N2O volatilization coefficient for synthetic nitrogen", "Business as usual", ] = self.historic_trend(region, 32)[-1] excel_dict = { "bovines": 1250, "ovines": 1264, "caprines": 1278, "porcines": 1292, "poultry": 1306, "equines": 1320, } # Excretion managment for t in betail: if t == "equine": t = "equines" sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{t.capitalize()} % excretion on grassland", "Business as usual", ] = self.historic_trend(region, excel_dict[t])[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{t.capitalize()} % excretion in the barn as litter manure", "Business as usual", ] = self.historic_trend(region, excel_dict[t] + 2)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{t.capitalize()} % excretion in the barn as other manure", "Business as usual", ] = self.historic_trend(region, excel_dict[t] + 3)[-1] sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{t.capitalize()} % excretion in the barn as slurry", "Business as usual", ] = self.historic_trend(region, excel_dict[t] + 4)[-1] # LU prop LU_prop = self.livestock_LU(self.dataloader, self.region)[annees_disponibles[-1]] LU_prop_tot = sum(LU_prop.values()) LU_prop = {key: value / LU_prop_tot for key, value in LU_prop.items()} for t in betail: if t == "equine": t = "equines" sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{t.capitalize()} LU", "Business as usual", ] = LU_prop[t] # Historical trend sheets["main"].loc[ sheets["main"]["Variable"] == "Feed nitrogen net import", "Business as usual", ] = self.extrapolate_recent_trend(self.historic_trend(self.region, 1009), self.year, seuil_bas=None)[1][-1] for type in betail: if type == "equine": type = "equines" sheets["technical"].loc[ sheets["technical"]["Variable"] == f"kgN excreted by {type} LU", "Business as usual", ] = self.extrapolate_recent_trend(self.LU_excretion(self.dataloader, self.region, type), self.year)[1][ -1 ] for type in betail: if type == "equine": type = "equines" sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{type.capitalize()} productivity", "Business as usual", ] = self.LU_prod(self.dataloader, self.region, f"{type} productivity")[-1] if type in ["bovines", "ovines", "caprines", "poultry"]: sheets["technical"].loc[ sheets["technical"]["Variable"] == f"{type.capitalize()} dairy productivity", "Business as usual", ] = self.LU_prod(self.dataloader, self.region, f"{type} dairy productivity")[-1] tot_LU = [] LU_prop_hist = self.livestock_LU(self.dataloader, self.region) for yr in annees_disponibles: tot_LU.append(sum(LU_prop_hist[yr].values())) sheets["main"].loc[ sheets["main"]["Variable"] == "Total LU", "Business as usual", ] = self.extrapolate_recent_trend(tot_LU, self.year, seuil_bas=0)[1][-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Arable area", "Business as usual", ] = self.extrapolate_recent_trend(self.historic_trend(self.region, 23), self.year, seuil_bas=0)[1][-1] sheets["main"].loc[ sheets["main"]["Variable"] == "Permanent grassland area", "Business as usual", ] = self.extrapolate_recent_trend(self.historic_trend(self.region, 22), self.year, seuil_bas=0)[1][-1] # Import (need GRAFS) sheets["main"].loc[ sheets["main"]["Variable"] == "Net import of vegetal pdcts", "Business as usual", ] = self.extrapolate_recent_trend(self.get_import_net(self.region), self.year, seuil_bas=None)[1][-1] # Proportion urbaine sheets["main"].loc[ sheets["main"]["Variable"] == "Urban population", "Business as usual", ] = self.logistic_urb_pop(self.region, self.year) # surface prop tab # sheets["area"] = self.generate_crop_tab(self.region) with pd.ExcelWriter(os.path.join(self.scenario_path, name + ".xlsx"), engine="openpyxl") as writer: for sheet_name, df in sheets.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) # 👉 Récupérer le workbook ouvert via le writer pour ajouter une case pour vérifier la somme des proportions. wb = writer.book # ✅ Insérer une formule après l'écriture dans la feuille "area" sheet = wb["area"] # ✅ Trouver la dernière ligne (max_row donne la dernière ligne non vide) last_row = sheet.max_row + 2 # ✅ Insérer le texte et la formule Excel directement sheet[f"A{last_row}"] = "Proportion area sum correct ?" sheet[f"B{last_row}"] = f'=IF(SUM(B2:B{last_row - 3})=100, "✅ OK", "❌ Erreur")'
[docs] def generate_base_scenar(self, prod_func): """ Generates baseline scenario Excel files for all regions, containing only the crop area distribution sheet. This method is used to create an initial batch of region-specific scenario templates, filled with yield model projections for each crop, but without demand-side or technical assumptions. :param prod_func: Name of the crop production function to apply (e.g., 'Linear', 'Exponential'). :type prod_func: str :return: None. Excel files are saved to the default scenario directory. :rtype: None """ # Create scenar for all regions with only area filled for region in tqdm(regions, desc="Calcul des Ymax et k", unit="region"): try: self.data = self.dataloader.pre_process_df(self.last_data_year, region) except: self.data = self.dataloader.pre_process_df(self.last_data_year, "France") print(f"No region named {region} in the data") model_sheets = pd.read_excel(os.path.join(self.data_path, "scenario.xlsx"), sheet_name=None) sheets = {} sheet_corres = { "doc": "doc", "main scenario": "main", "Surface changes": "area", "technical scenario": "technical", } for sheet_name, df in model_sheets.items(): sheets[sheet_corres[sheet_name]] = df sheets["area"] = self.generate_crop_tab(region, prod_func) with pd.ExcelWriter( os.path.join(self.data_path, "scenario_region", region + ".xlsx"), engine="openpyxl" ) as writer: for sheet_name, df in sheets.items(): df.to_excel(writer, sheet_name=sheet_name, index=False)
[docs] class Y: """ Class for modeling and fitting nitrogen yield response curves based on historical data. This class extracts yield and fertilization data from NitrogenFlowModel instances, and provides a suite of theoretical models (e.g., Mitscherlich, exponential, linear) to fit production curves. :param dataloader: Optional instance of DataLoader to use. If None, a default one is initialized. :type dataloader: DataLoader, optional :ivar dataloader: DataLoader instance used for retrieving regional crop data. :vartype dataloader: DataLoader :ivar int_yr: List of integer years available in the dataset. :vartype int_yr: list[int] """ def __init__(self, dataloader=None): if dataloader == None: self.dataloader = DataLoader() else: self.dataloader = dataloader self.int_yr = [int(i) for i in annees_disponibles]
[docs] def get_Y(self, culture, region, plot=False): """ Extracts historical fertilization and yield data for a given crop and region. Optionally displays a scatter plot of the data. :param culture: The name of the crop (index in the NitrogenFlowModel). :type culture: str :param region: The name of the region. :type region: str :param plot: Whether to plot the data (default is False). :type plot: bool :return: Tuple of arrays (F, Y), fertilization and yield values. :rtype: tuple[np.ndarray, np.ndarray] """ F = [] Y = [] YR = [] if region == "Savoie": years = annees_disponibles[1:] else: years = annees_disponibles for yr in years: model = NitrogenFlowModel(data=self.dataloader, year=yr, region=region) f = model.Ftot(culture) y = model.Y(culture) if ( isinstance(f, (float, np.float64)) and isinstance(y, (float, np.float64)) and not np.isnan(f) and not np.isnan(y) and f != 0 and y != 0 and f < 350 # On supprime les valeur délirantes (pb données) de ferti et Y and y < 350 # and y < f*0.9 # Condition NUE<90% ): F.append(f) Y.append(y) YR.append(int(yr)) if plot: plt.figure(figsize=(8, 6)) plt.plot(F, Y, "o-", color="tab:blue", markersize=6, label="Historic Data") # Points et ligne plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title("Relation entre Fertilisation et Rendement", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) # Grille discrète plt.legend() plt.show() return np.array(F), np.array(Y), np.array(YR)
[docs] @staticmethod def Y_th(f, y_max): """ Mitscherlich-type yield response: Y = f * y_max / (f + y_max) :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param y_max: Maximum attainable yield (kgN/ha). :type y_max: float :return: Estimated yield. :rtype: float or np.ndarray """ if np.any(f + y_max == 0): return 0 return f * y_max / (f + y_max)
[docs] @staticmethod def Y_th_exp(f, y_max, k): """ Exponential growth model without cap. Computes yield as: Y(f) = y_max * (1 - exp(-f / k)) :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param y_max: Maximum yield asymptote (kgN/ha). :type y_max: float :param k: Growth curvature parameter (kgN/ha). :type k: float :return: Yield response Y(f). :rtype: float or np.ndarray """ return y_max * (1 - np.exp(-f / k))
[docs] @staticmethod def Y_th_exp_cap(f, y_max, k): """ Exponential response with physical cap at 99% of fertilization. Formula: Y(f) = min(y_max * (1 - exp(-f / k)), 0.99 * f) :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param y_max: Maximum yield asymptote (kgN/ha). :type y_max: float :param k: Growth curvature parameter (kgN/ha). :type k: float :return: Capped yield response Y(f). :rtype: float or np.ndarray """ return np.minimum(y_max * (1 - np.exp(-f / k)), 0.99 * f)
[docs] @staticmethod def Y_th_lin(f, a, b): """ Simple linear yield response with a saturation cap. Formula: Y(f) = min(a * f, b) :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param a: Linear coefficient. :type a: float :param b: Maximum yield threshold. :type b: float :return: Capped linear yield. :rtype: float or np.ndarray """ return np.minimum(a * f, b)
[docs] @staticmethod def Y_th_lin_2(f, a, xb, c): """ Piecewise linear yield function with break at `xb`. Formula: - If f ≤ xb: Y = a * f - If f > xb: Y = c * (f - xb) + xb * a :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param a: Initial slope before breakpoint. :type a: float :param xb: Breakpoint fertilization value. :type xb: float :param c: Slope after breakpoint. :type c: float :return: Yield response Y(f). :rtype: float or np.ndarray """ return np.minimum(a * f, c * (f - xb) + xb * a)
[docs] @staticmethod def Y_th_poly(f, a, b): """ Second-order polynomial yield response. Formula: Y(f) = a * f² + b * f :param f: Fertilization input. :type f: float or np.ndarray :param a: Quadratic coefficient. :type a: float :param b: Linear coefficient. :type b: float :return: Polynomial yield response. :rtype: float or np.ndarray """ return a * f**2 + b * f
[docs] @staticmethod def Y_th_poly_cap(f, a, b): """ Polynomial yield response capped at 99% of fertilization. Formula: Y(f) = min(a * f² + b * f, 0.99 * f) :param f: Fertilization input. :type f: float or np.ndarray :param a: Quadratic coefficient. :type a: float :param b: Linear coefficient. :type b: float :return: Capped polynomial yield response. :rtype: float or np.ndarray """ return np.minimum(0.99 * f, a * f**2 + b * f)
[docs] @staticmethod def Y_th_sigmoid(f, Ymax, k, x0): """ Sigmoidal yield response function. Formula: Y(f) = Ymax / (1 + exp(-k * (f - x0))) :param f: Fertilization input. :type f: float or np.ndarray :param Ymax: Maximum yield. :type Ymax: float :param k: Growth steepness. :type k: float :param x0: Midpoint of the sigmoid. :type x0: float :return: Sigmoidal yield response. :rtype: float or np.ndarray """ return Ymax / (1 + np.exp(-k * (f - x0)))
[docs] @staticmethod def Y_th_sigmoid_cap(f, Ymax, k, x0): """ Sigmoidal yield function with upper cap at 99% of fertilization input. Formula: Y(f) = min(Ymax / (1 + exp(-k * (f - x0))), 0.99 * f) :param f: Fertilization input. :type f: float or np.ndarray :param Ymax: Maximum yield. :type Ymax: float :param k: Growth steepness. :type k: float :param x0: Inflection point. :type x0: float :return: Capped sigmoidal yield response. :rtype: float or np.ndarray """ return np.minimum(0.99 * f, Ymax / (1 + np.exp(-k * (f - x0))))
[docs] @staticmethod def Y_th_lin_2_smooth(f, a, xb, c, s): """ Smooth piecewise linear function using logistic blending near the breakpoint. Formula: transition = 1 / (1 + exp(-s * (f - xb))) Y(f) = (1 - transition) * a * f + transition * (c * (f - xb) + xb * a) :param f: Fertilization input (kgN/ha). :type f: float or np.ndarray :param a: Initial slope. :type a: float :param xb: Breakpoint. :type xb: float :param c: Post-breakpoint slope. :type c: float :param s: Sharpness of transition (higher = sharper). :type s: float :return: Smoothed yield response Y(f). :rtype: float or np.ndarray """ transition = 1 / (1 + np.exp(-s * (f - xb))) return a * f * (1 - transition) + (c * (f - xb) + xb * a) * transition
[docs] def fit_Y(self, culture, region): """ Fits a Mitscherlich-type yield curve (Y_th) to historical fertilization/yield data for a given crop and region. Returns the optimal `y_max` value and the fitted curve. :param culture: The name of the crop. :type culture: str :param region: The name of the region. :type region: str :return: Tuple of (y_max, fitted_yield_values). :rtype: tuple[float or None, np.ndarray or None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, None Y_max_init = max(Y) try: popt, _ = curve_fit(self.Y_th, F, Y, p0=[Y_max_init], bounds=(0, np.inf)) Y_max_opt = popt[0] # 📌 Paramètre ajusté Y_max except RuntimeError: print("⚠️ Ajustement impossible pour", culture, region) Y_max_opt = None # Retourne None en cas d'échec Y_th_fitted = self.Y_th(F, Y_max_opt) if Y_max_opt is not None else None return Y_max_opt, Y_th_fitted
[docs] def fit_Y_exp(self, culture, region): """ Fit an exponential yield curve (Y = Y_max(1-exp(F/k))) to historical data for a given crop (culture) and region using non-linear least squares. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Tuple containing optimal parameters Y_max and k, and the fitted curve. :rtype: Tuple[float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, None Y_max_init = max(Y) try: popt, _ = curve_fit( self.Y_th_exp, F, Y, p0=[Y_max_init, min(F)], bounds=([min(Y), min(F) * 0.5], [max(Y) * 2, max(F) * 2]) ) Y_max_opt = popt[0] # 📌 Paramètre ajusté Y_max k = popt[1] except RuntimeError: print("⚠️ Ajustement impossible pour", culture, region) Y_max_opt = 0 # Retourne None en cas d'échec k = 0 Y_th_fitted = self.Y_th_exp(F, Y_max_opt, k) if Y_max_opt is not None else None return Y_max_opt, k, Y_th_fitted
[docs] def fit_Y_exp_2(self, culture, region): """ Fit an exponential yield curve using a constrained optimization method (SLSQP). Ensures that k > Y_max for interpretability. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Tuple with Y_max, k, and the fitted yield curve. :rtype: Tuple[float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, None Y_max_init = max(Y) k_init = min(F) def objective(params): y_max, k = params return np.sum((self.Y_th_exp(F, y_max, k) - Y) ** 2) # Contraintes : k > y_max constraint = {"type": "ineq", "fun": lambda params: params[1] - params[0]} # k - y_max > 0 # Bornes : y_max > 0, k > 0 bounds = [(0, max(Y) * 2), (0, max(F) * 2)] result = minimize( objective, x0=[Y_max_init, k_init], bounds=bounds, constraints=[constraint], ) if result.success: y_max_opt, k_opt = result.x y_th_fit = self.Y_th_exp(F, y_max_opt, k_opt) return y_max_opt, k_opt, y_th_fit else: print("⚠️ Ajustement impossible pour", culture, region) return 0, 0, None
[docs] def fit_Y_lin(self, culture, region): """ Fit a simple linear model (Y = a * F + b) to yield data. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Fitted slope and intercept with predicted yield. :rtype: Tuple[float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, None try: popt, _ = curve_fit(self.Y_th_lin, F, Y, p0=[0.75, max(Y)], bounds=([0, 0], [1, max(Y) * 2])) a = popt[0] # 📌 Paramètre ajusté Y_max b = popt[1] except RuntimeError: print("⚠️ Ajustement impossible pour", culture, region) a = 0 # Retourne None en cas d'échec b = 0 Y_th_fitted = self.Y_th_lin(F, a, b) if a is not None else None return a, b, Y_th_fitted
[docs] def fit_Y_lin_2(self, culture, region): """ Fit a piecewise linear model with two slopes to capture yield response. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Parameters a, xb (threshold), c (second slope), and the fitted curve. :rtype: Tuple[float, float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, None try: midpoint = (min(F) + max(F)) / 2 p01 = [max(Y) / max(F), midpoint, 0.1] popt, _ = curve_fit(self.Y_th_lin_2, F, Y, p0=p01, bounds=([0.1, 0, 0.01], [1, 1000, 1])) a = popt[0] # 📌 Paramètre ajusté Y_max xb = popt[1] c = popt[2] except RuntimeError: print("⚠️ Ajustement impossible pour", culture, region) a = 0 # Retourne None en cas d'échec xb = 0 c = 0 Y_th_fitted = self.Y_th_lin_2(F, a, xb, c) if a is not None else None return a, xb, c, Y_th_fitted
[docs] def fit_Y_lin_2_scipy(self, culture, region): """ Fit the same piecewise linear model as fit_Y_lin_2 but using SLSQP optimizer. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Optimized parameters a, xb, c and the yield prediction. :rtype: Tuple[float, float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, 0, None def objective(params): a, xb, c = params Y_pred = self.Y_th_lin_2(F, a, xb, c) return np.sum((Y - Y_pred) ** 2) # ➕ Bornes pour a, xb, c bounds = [(0.1, 1.0), (0, max(F)), (0.0, 1.0)] # 🔹 Point initial (à ajuster selon le domaine) midpoint = (min(F) + max(F)) / 2 p0 = [max(Y) / max(F), midpoint, 0.1] result = minimize(objective, p0, bounds=bounds, method="SLSQP") if result.success: a_opt, xb_opt, c_opt = result.x Y_fit = self.Y_th_lin_2(F, a_opt, xb_opt, c_opt) return a_opt, xb_opt, c_opt, Y_fit else: print(f"⚠️ Ajustement impossible pour {culture}, {region}") return 0, 0, 0, None
[docs] def fit_Y_poly(self, culture, region): """ Fit a 2nd-degree polynomial (Y = aF^2 + bF + c) to the data without constraints. :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: Parameters a, b, c and the fitted values. :rtype: Tuple[float, float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, None try: popt, _ = curve_fit(self.Y_th_poly, F, Y, p0=[0.75, 0.9, 0], bounds=([-20, -20, -20], [20, 20, 20])) a = popt[0] # 📌 Paramètre ajusté Y_max b = popt[1] c = popt[2] except RuntimeError: print("⚠️ Ajustement impossible pour", culture, region) a = 0 # Retourne None en cas d'échec b = 0 c = 0 Y_th_fitted = self.Y_th_poly(F, a, b, c) if a is not None else None return a, b, c, Y_th_fitted
[docs] def fit_Y_poly_constrained(self, culture, region): """ Fit a constrained 2nd-degree polynomial to ensure concavity and meaningful shape (P(F)<F). :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: Fitted parameters a, b and predicted yield values. :rtype: Tuple[float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, 0, None # Initialisation a_init = -0.002 b_init = 0.9 # c_init = 0 max_F = max(F) def objective(params): a, b = params return np.sum((self.Y_th_poly(F, a, b) - Y) ** 2) # ➕ 1ère contrainte : sommet après max(F) → -b / (2a) > max(F) # ⇔ -b - 2a * max(F) > 0 # constraint1 = {"type": "ineq", "fun": lambda p: -p[1] - 2 * p[0] * max_F} # ➕ 2ème contrainte : pente à max(F) > 0 → 2a * max(F) + b > 0 constraint2 = {"type": "ineq", "fun": lambda p: 2 * p[0] * max_F + p[1]} bounds = [(-10, 0), (-1000, 2)] # a < 0 pour avoir une parabole concave result = minimize( objective, x0=[a_init, b_init], bounds=bounds, method="trust-constr", constraints=[constraint2], ) if result.success: a_opt, b_opt = result.x y_fit = self.Y_th_poly(F, a_opt, b_opt) return a_opt, b_opt, y_fit else: print(f"⚠️ Ajustement impossible pour {culture}, {region}") return 0, 0, None
[docs] def fit_Y_sigmoid(self, culture, region): """ Fit a logistic sigmoid function (Y = Ymax / (1 + exp(-k(x - x0)))) to yield data. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: Parameters Ymax, k, x0 and fitted yield curve. :rtype: Tuple[float, float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, 0, None # Fonction à minimiser (erreur quadratique) def objective(params): Ymax, k, x0 = params Y_pred = self.Y_th_sigmoid(F, Ymax, k, x0) return np.sum((Y - Y_pred) ** 2) # Bornes réalistes pour les paramètres bounds = [(0.1, 3 * max(Y)), (1e-10, 1), (0, 2 * max(F))] # Ymax, k, x0 # Initialisation p0 = [max(Y), 0.01, np.median(F)] result = minimize(objective, p0, bounds=bounds) if result.success: Ymax, k_opt, x0_opt = result.x Y_fit = self.Y_th_sigmoid(F, Ymax, k_opt, x0_opt) return Ymax, k_opt, x0_opt, Y_fit else: print(f"⚠️ Ajustement impossible pour {culture}, {region}") return 0, 0, 0, None
[docs] def fit_Y_lin_2_smooth(self, culture, region): """ Fit a smoothed two-slope model (sigmoid transition) for yield response. :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: Parameters a, xb, c, s (smoothness), and predicted yield. :rtype: Tuple[float, float, float, float, np.ndarray | None] """ F, Y, _ = self.get_Y(culture, region) if len(Y) == 0: return 0, 0, 0, 0, None def objective(params): a, xb, c, s = params Y_pred = self.Y_th_lin_2_smooth(F, a, xb, c, s) return np.sum((Y - Y_pred) ** 2) # ➕ Bornes pour a, xb, c, s bounds = [(0.1, 1.0), (0, 2 * max(F)), (0.0, 1.0), (0, 1e6)] # 🔹 Point initial (à ajuster selon le domaine) # midpoint = (min(F) + max(F)) / 2 # p0 = [max(Y) / max(F), midpoint, 0.1, 1] a, xb, c, _ = self.fit_Y_lin_2_scipy(culture, region) p0 = [a, xb, c, 0.01] print(p0) result = minimize(objective, p0, bounds=bounds, method="Nelder-Mead") if result.success: a_opt, xb_opt, c_opt, s_opt = result.x Y_fit = self.Y_th_lin_2_smooth(F, a_opt, xb_opt, c_opt, s_opt) return a_opt, xb_opt, c_opt, s_opt, Y_fit else: print(f"⚠️ Ajustement impossible pour {culture}, {region}") return 0, 0, 0, 0, None
[docs] def plot_Y(self, culture, region): """ Plot the fitted Mitscherlich-type yield curve and historical data points for a specific crop and region. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: None. The plot is displayed with matplotlib. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None Y_max, _ = self.fit_Y(culture, region) F_th = np.linspace(0, 1.05 * max(F), 100) Y_th = self.Y_th(F_th, Y_max) r2 = np.round(r2_score(Y, self.Y_th(F, Y_max)), 2) plt.figure(figsize=(10, 6)) # 1. Utiliser plt.scatter pour permettre la coloration par point # c=int_yr mappe les couleurs aux valeurs de la liste d'années # cmap='viridis' est une palette de couleurs, vous pouvez en choisir d'autres ('plasma', 'coolwarm', etc.) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, Y_max = {int(Y_max)}, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_lin(self, culture, region): """ Plot the fitted linear yield curve (Y = aF + b) and historical data for a crop. :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: None. Displays the matplotlib plot. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None a, b, _ = self.fit_Y_lin(culture, region) # a, b = 0.75, 160 F_th = np.linspace(0, 1.05 * max(F), 100) Y_th = self.Y_th_lin(F_th, a, b) r2 = np.round(r2_score(Y, self.Y_th_lin(F, a, b)), 2) plt.figure(figsize=(10, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, Y_max = {int(b)}, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_lin_2(self, culture, region): """ Plot the fitted piecewise linear yield curve with two slopes and the historical data. :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: None. The plot is rendered using matplotlib. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None a, xb, c, _ = self.fit_Y_lin_2_scipy(culture, region) # a, b = 0.75, 160 F_th = np.linspace(0, 1.05 * max(F), 100) Y_th = self.Y_th_lin_2(F_th, a, xb, c) r2 = np.round(r2_score(Y, self.Y_th_lin_2(F, a, xb, c)), 2) plt.figure(figsize=(10, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_poly(self, culture, region): """ Plot the fitted constrained polynomial yield curve and the actual data. :param culture: Crop name. :type culture: str :param region: Region identifier. :type region: str :return: None. Displays the curve and points. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None a, b, _ = self.fit_Y_poly_constrained(culture, region) # a, b, c = -0.002, 0.90226, 0 F_th = np.linspace(0, 1.05 * max(F), 100) Y_th = self.Y_th_poly_cap(F_th, a, b) r2 = np.round(r2_score(Y, self.Y_th_poly_cap(F, a, b)), 2) plt.figure(figsize=(10, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, Y_max = {np.round(-b / (2 * a))}, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_exp(self, culture, region): """ Plot the fitted exponential yield curve (capped) with the data for the specified crop. :param culture: Name of the crop. :type culture: str :param region: Target region. :type region: str :return: None. Plot appears inline. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None Y_max, k, _ = self.fit_Y_exp_2(culture, region) F_th = np.linspace(0, 1.05 * max(F), 100) Y_th = self.Y_th_exp_cap(F_th, Y_max, k) r2 = np.round(r2_score(Y, self.Y_th_exp_cap(F, Y_max, k)), 2) plt.figure(figsize=(10, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, Y_max = {np.round(Y_max)}, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_sigmoid(self, culture, region): """ Plot the fitted sigmoid yield curve and compare with observed fertilization-yield data. :param culture: Name of the crop. :type culture: str :param region: Region identifier. :type region: str :return: None. Shows the matplotlib plot. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None # Fit sigmoïde Ymax, k_opt, x0_opt, Y_th_fit = self.fit_Y_sigmoid(culture, region) F_th = np.linspace(0, 1.05 * max(F), 200) Y_th = self.Y_th_sigmoid_cap(F_th, Ymax, k_opt, x0_opt) r2 = np.round(r2_score(Y, self.Y_th_sigmoid_cap(F, Ymax, k_opt, x0_opt)), 2) plt.figure(figsize=(10, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, Y_max = {np.round(Ymax)}, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def plot_Y_lin_2_smooth(self, culture, region): """ Plot the smooth transition linear-2 slope yield model and observed data points. :param culture: Name of the crop. :type culture: str :param region: Region code. :type region: str :return: None. Generates a yield vs fertilization curve. :rtype: None """ F, Y, int_yr = self.get_Y(culture, region) if len(Y) == 0: print(f"no {culture} found in {region}") return None # Fit sigmoïde a, xb, c, s, Y_th_fit = self.fit_Y_lin_2_smooth(culture, region) F_th = np.linspace(0, 1.05 * max(F), 200) Y_th = self.Y_th_lin_2_smooth(F_th, a, xb, c, s) r2 = np.round(r2_score(Y, self.Y_th_lin_2_smooth(F, a, xb, c, s)), 2) plt.figure(figsize=(8, 6)) scatter = plt.scatter(F, Y, c=int_yr, cmap="viridis", s=64, zorder=3, label="Historical data") # 2. Ajouter une barre de couleur pour servir de légende pour les années cbar = plt.colorbar(scatter) cbar.set_label("Year", fontsize=12) # --- Fin des modifications --- plt.plot(Y, Y, "--", color="gray", label="Ligne 1:1") plt.plot( F_th, Y_th, label=f"Theoric curve, $r^2$ = {np.round(r2, 2)}", color="orange", linewidth=4, zorder=2, ) plt.xlim(0, max(F_th) * 1.1) plt.ylim(0, max(Y) * 1.1) # plt.gca().set_aspect("equal") # Peut déformer le graphique, à utiliser avec précaution plt.xlabel("Fertilization (kgN/ha/yr)", fontsize=12) plt.ylabel("Yield (kgN/ha/yr)", fontsize=12) # plt.title(f"Relation Fertilisation-Rendement\n($r^2$ = {r2})", fontsize=14, fontweight='bold') plt.grid(True, linestyle="--", alpha=0.4) plt.legend() plt.show()
[docs] def compare_r2(self, region): """ Compare fitted yield model results across crop types for a given region. Evaluates and visualizes the yield ceiling (Ymax) or R² for multiple curve-fitting models: - Ratio-based (Mitscherlich) - Exponential - Linear (1 slope) - Linear (2 slopes) - Polynomial - Sigmoid Results are displayed in a heatmap per culture for the selected region. :param region: Region name or code. :type region: str :return: Tuple of NumPy array with values and dictionary of model outputs. :rtype: Tuple[np.ndarray, dict] """ r2_values = { "Ratio": {}, "Exponential": {}, "Linear 1 slope": {}, "Linear 2 slopes": {}, "Polynomial": {}, "Sigmoid": {}, } # Barre de progression avec tqdm for culture in tqdm(cultures + prairies, desc="Calcul des R²", unit="culture"): F, Y = self.get_Y(culture, region) if len(Y) == 0 or len(F) == 0: pass # r2_values["Ratio"][culture] = np.nan # r2_values["Exponential"][culture] = np.nan # r2_values["Linear 1 slope"][culture] = np.nan # r2_values["Linear 2 slopes"][culture] = np.nan # r2_values["Polynomial"][culture] = np.nan else: # Ajustement avec la première fonction (Y_th) Y_max_th, _ = self.fit_Y(culture, region) if Y_max_th is not None: r2_values["Ratio"][culture] = Y_max_th # r2_values["Ratio"][culture] = np.round(r2_score(Y, self.Y_th(F, Y_max_th)), 2) else: r2_values["Ratio"][culture] = 0 # Ajustement avec la seconde fonction (Y_th_exp) Y_max_exp, k, _ = self.fit_Y_exp(culture, region) if Y_max_exp is not None: r2_values["Exponential"][culture] = np.round(Y_max_exp) # r2_values["Exponential"][culture] = np.round(r2_score(Y, self.Y_th_exp(F, Y_max_exp, k)), 2) else: r2_values["Exponential"][culture] = 0 # Ajustement avec la troisieme fonction (Y_th_lin) a, b, _ = self.fit_Y_lin(culture, region) if a is not None: r2_values["Linear 1 slope"][culture] = np.round(b) # r2_values["Linear 1 slope"][culture] = np.round(r2_score(Y, self.Y_th_lin(F, a, b)), 2) else: r2_values["Linear 1 slope"][culture] = 0 # Ajustement avec la quatrième fonction (Y_th_lin_2) # a, xb, c, _ = self.fit_Y_lin_2(culture, region) # if a is not None: # r2_values["Linear 2 slopes"][culture] = np.round(r2_score(Y, self.Y_th_lin_2(F, a, xb, c)), 2) # else: # r2_values["Linear 2 slopes"][culture] = 0 r2_values["Linear 2 slopes"][culture] = np.nan # Ajustement avec la cinquième fonction (Y_th_poly) a, b, _ = self.fit_Y_poly_constrained(culture, region) if a is not None: r2_values["Polynomial"][culture] = np.round(-b / (2 * a)) # r2_values["Polynomial"][culture] = np.round(r2_score(Y, self.Y_th_poly(F, a, b)), 2) else: r2_values["Polynomial"][culture] = 0 # Ajustement avec la cinquième fonction (Y_th_poly) Ymax, k, x0, _ = self.fit_Y_sigmoid(culture, region) if Ymax is not None: r2_values["Sigmoid"][culture] = np.round(Ymax) # r2_values["Sigmoid"][culture] = np.round(r2_score(Y, self.Y_th_sigmoid_cap(F, Ymax, k, x0)), 2) else: r2_values["Sigmoid"][culture] = 0 # Création d'un DataFrame pour la heatmap r2_df = np.array( [ list(r2_values["Ratio"].values()), list(r2_values["Exponential"].values()), list(r2_values["Linear 1 slope"].values()), list(r2_values["Linear 2 slopes"].values()), list(r2_values["Polynomial"].values()), list(r2_values["Sigmoid"].values()), ] ) # Création de la heatmap plt.figure(figsize=(10, len(cultures) // 3)) ax = sns.heatmap( r2_df.T, annot=True, cmap="plasma", # "coolwarm", fmt=".2f", linewidths=0.5, cbar_kws={"label": "Ymax"}, # cbar_kws={"label": "R² Score"}, xticklabels=["Ratio", "Exponential", "Linear 1 slope", "Linear 2 slopes", "Polynomial", "Sigmoid"], yticklabels=list(r2_values["Ratio"].keys()), vmax=600, vmin=0, ) plt.title(f"Comparaison des Ymax (kgN/ha) pour {region}") # R² Ymax plt.xlabel("Modèle") plt.ylabel("Culture") # Affichage de la heatmap plt.show() return r2_df, r2_values
## % Prospective classes
[docs] class CultureData_prospect: """ Prepare and structure crop data required for prospective scenarios in the NitrogenFlowModel. This class constructs a DataFrame (`df_cultures`) that includes area, fertilization spread ratio, nitrogen content, and yield function parameters (depending on the selected model). """ def __init__(self, main, area, data_path, categories_mapping, func_prod): """ Initialize the CultureData_prospect instance. :param main: DataFrame containing aggregate area information (e.g. "Arable area", "Permanent grassland area"). :type main: pd.DataFrame :param area: DataFrame with crop-specific area and yield function parameters. :type area: pd.DataFrame :param data_path: Path to the Excel file containing additional crop data. :type data_path: str :param categories_mapping: Mapping dictionary for crop categorization (not directly used here). :type categories_mapping: dict :param func_prod: Production function name ("Linear", "Ratio", or "Exponential"). :type func_prod: str """ self.main = main self.area = area self.data_path = data_path self.categories_mapping = categories_mapping self.func_prod = func_prod self.df_cultures = self.create_culture_dataframe()
[docs] def create_culture_dataframe(self): """ Construct the `df_cultures` DataFrame combining area, fertilization spreading ratio, nitrogen content, and yield function parameters for each crop. The structure and included parameters depend on the selected yield function model. :return: A pandas DataFrame with all necessary parameters for crop modeling. :rtype: pd.DataFrame """ crops_index = self.area["Crops"][:-2] # Extraire les données de surface arable_area = self.main.loc[self.main["Variable"] == "Arable area", "Business as usual"].item() grassland_area = self.main.loc[self.main["Variable"] == "Permanent grassland area", "Business as usual"].item() area = self.area["Area proportion (%)"][:-3] * arable_area / 100 area[35] = grassland_area area.index = crops_index # Extraire les taux de surface avec épendage et la teneur en azote des cultures epend = pd.read_excel( os.path.join(self.data_path, "GRAFS_data.xlsx"), sheet_name="crops", ) epend = epend.drop("Note", axis=1) epend = epend.set_index("Culture") epend["Area (ha)"] = area # Ajouter les paramètres des fonctions de production if self.func_prod == "Linear": a = self.area["a"][:-2] a.index = crops_index b = self.area["b"][:-2] b.index = crops_index epend["a"] = a epend["b"] = b if self.func_prod == "Ratio": Ymax = self.area["Ymax (kgN/ha)"][:-2] Ymax.index = crops_index epend["Ymax (kgN/ha)"] = Ymax if self.func_prod == "Exponential": Ymax = self.area["Ymax (kgN/ha)"][:-2] Ymax.index = crops_index epend["Ymax (kgN/ha)"] = Ymax k = self.area["k (kgN/ha)"][:-2] k.index = crops_index epend["k (kgN/ha)"] = k return epend
[docs] class ElevageData_prospect: """ Prepare and structure livestock data required for prospective scenarios in the NitrogenFlowModel. This class aggregates technical coefficients and main livestock statistics to compute nitrogen flows, production outputs, and excretion routes by species. """ def __init__(self, main, technical, data_path): """ Initialize the ElevageData_prospect instance. :param main: DataFrame with general livestock variables (e.g., total LU). :type main: pd.DataFrame :param technical: DataFrame with technical coefficients by animal type. :type technical: pd.DataFrame :param data_path: Path to the Excel file with volatilisation and nitrogen content. :type data_path: str """ self.main = main self.technical = technical self.df_elevage = self.create_elevage_dataframe(main, technical, data_path)
[docs] def create_elevage_dataframe(self, main, technical, data_path): """ Construct the `df_elevage` DataFrame containing: - livestock units (LU) per species - productivity indicators (carcass and dairy) - excretion distributions - volatilisation rates - ingestion and nitrogen partitioning (edible, non-edible, excreted) :param main: General livestock statistics. :type main: pd.DataFrame :param technical: Technical coefficients for livestock management. :type technical: pd.DataFrame :param data_path: Path to nitrogen content Excel file. :type data_path: str :return: A pandas DataFrame with livestock nitrogen and productivity balances. :rtype: pd.DataFrame """ types = ["Bovines", "Ovines", "Caprines", "Equines", "Poultry", "Porcines"] LU_tot = main.loc[main["Variable"] == "Total LU", "Business as usual"].item() LU = {} prod = {} dairy = {} excr = {} manure = {} o_liter = {} slurry = {} grass = {} for t in types: LU[t] = technical.loc[technical["Variable"] == f"{t} LU", "Business as usual"].item() * LU_tot prod[t] = technical.loc[technical["Variable"] == f"{t} productivity", "Business as usual"].item() excr[t] = technical.loc[ technical["Variable"] == f"kgN excreted by {t.lower()} LU", "Business as usual" ].item() manure[t] = technical.loc[ technical["Variable"] == f"{t} % excretion in the barn as litter manure", "Business as usual" ].item() o_liter[t] = technical.loc[ technical["Variable"] == f"{t} % excretion in the barn as other manure", "Business as usual" ].item() slurry[t] = technical.loc[ technical["Variable"] == f"{t} % excretion in the barn as slurry", "Business as usual" ].item() grass[t] = technical.loc[ technical["Variable"] == f"{t} % excretion on grassland", "Business as usual" ].item() if t in ["Bovines", "Ovines", "Poultry", "Caprines"]: dairy[t] = technical.loc[technical["Variable"] == f"{t} dairy productivity", "Business as usual"].item() else: dairy[t] = 0 gas_em = pd.read_excel(os.path.join(data_path, "GRAFS_data.xlsx"), sheet_name="Volatilisation").set_index( "Elevage" ) combined_data = { "LU": LU, "Productivity (kgcarcass/LU)": prod, "Dairy Productivity (kg/LU)": dairy, "Excretion per LU (kgN/LU)": excr, "% excreted on grassland": grass, "% excreted indoors as manure": manure, "% excreted indoors as other manure": o_liter, "% excreted indoors as slurry": slurry, } combined_df = pd.DataFrame(combined_data) combined_df.index = combined_df.index.str.lower() combined_df.rename(index={"equines": "equine"}, inplace=True) combined_df = combined_df.join(gas_em, how="left") combined_df = combined_df.fillna(0) combined_df["% excreted indoors"] = 100 - combined_df["% excreted on grassland"] combined_df["Production (ktcarcass)"] = combined_df["LU"] * combined_df["Productivity (kgcarcass/LU)"] * 1e-6 combined_df["Dairy Production (kt)"] = combined_df["LU"] * combined_df["Dairy Productivity (kg/LU)"] * 1e-6 combined_df["Edible Nitrogen (ktN)"] = ( combined_df["Production (ktcarcass)"] * combined_df["% edible"] + combined_df["Dairy Production (kt)"] * combined_df["%N dairy"] ) combined_df["Non Edible Nitrogen (ktN)"] = combined_df["Production (ktcarcass)"] * combined_df["% non edible"] combined_df["Excreted nitrogen (ktN)"] = combined_df["Excretion per LU (kgN/LU)"] * combined_df["LU"] * 1e-6 combined_df["Ingestion (ktN)"] = ( combined_df["Excreted nitrogen (ktN)"] + combined_df["Edible Nitrogen (ktN)"] + combined_df["Non Edible Nitrogen (ktN)"] ) return combined_df
[docs] class NitrogenFlowModel_prospect: """ Prospective extension of the NitrogenFlowModel for forward-looking nitrogen flow simulations. This class initializes and manages all components required for scenario-based nitrogen budgeting. It uses scenario-specific sheets for inputs (land use, livestock, technical parameters), and dynamically computes nitrogen fluxes between labeled system compartments using an adjacency matrix. Attributes: scenar_path (str): Path to the Excel file containing scenario sheets. labels (list): List of nitrogen system compartments used to label the adjacency matrix. df_cultures (pd.DataFrame): Crop-related nitrogen flows and parameters. df_elevage (pd.DataFrame): Livestock-related nitrogen flows and parameters. adjacency_matrix (np.ndarray): Matrix of nitrogen fluxes between compartments (in ktN/year). label_to_index (dict): Mapping from compartment labels to matrix indices. year (int): Scenario year (extracted from metadata). region (str): Scenario region (extracted from metadata). """ def __init__(self, scenar_path): self.scenar_path = scenar_path self.categories_mapping = categories_mapping self.labels = labels self.cultures = cultures self.legumineuses = legumineuses self.prairies = prairies self.betail = betail self.Pop = Pop self.ext = ext file_path = os.path.dirname(__file__) self.data_path = os.path.join(file_path, "data") self.scenar_sheets = pd.read_excel(os.path.join(self.scenar_path), sheet_name=None) self.doc = pd.DataFrame(self.scenar_sheets["doc"]) self.main = pd.DataFrame(self.scenar_sheets["main"]) self.area = pd.DataFrame(self.scenar_sheets["area"]) self.technical = pd.DataFrame(self.scenar_sheets["technical"]) self.prod_func = self.doc.loc[ self.doc["excel sheet for scenario writing"] == "Production function", self.doc.columns.tolist()[1] ].item() self.culture_data = CultureData_prospect( self.main, self.area, self.data_path, categories_mapping, self.prod_func ) self.elevage_data = ElevageData_prospect(self.main, self.technical, self.data_path) self.flux_generator = FluxGenerator(labels) self.df_cultures = self.culture_data.df_cultures self.df_elevage = self.elevage_data.df_elevage self.adjacency_matrix = self.flux_generator.adjacency_matrix self.label_to_index = self.flux_generator.label_to_index self.compute_fluxes()
[docs] def plot_heatmap(self): """ Plot a static heatmap of the nitrogen flux adjacency matrix. The heatmap displays nitrogen transfers (in ktN/year) between system compartments, on a log scale. Axes are labeled with numerical indices, and a legend maps these indices to compartment names. Features: - Log-normalized color scale ("plasma_r"). - Axis ticks on top (X) and left (Y). - Custom colorbar and visual grid. - Aspect ratio constrained to "equal" for readability. - Compartment label legend placed on the right. :return: None """ plt.figure(figsize=(10, 12), dpi=500) ax = plt.gca() # Créer la heatmap sans grille pour le moment norm = LogNorm(vmin=10**-4, vmax=self.adjacency_matrix.max()) sns.heatmap( self.adjacency_matrix, xticklabels=range(1, len(self.labels) + 1), yticklabels=range(1, len(self.labels) + 1), cmap="plasma_r", annot=False, norm=norm, ax=ax, cbar_kws={"label": "ktN/year", "orientation": "horizontal", "pad": 0.02}, ) # Ajouter la grille en gris clair ax.grid(True, color="lightgray", linestyle="-", linewidth=0.5) # Déplacer les labels de l'axe x en haut ax.xaxis.set_ticks_position("top") # Placer les ticks en haut ax.xaxis.set_label_position("top") # Placer le label en haut # Rotation des labels de l'axe x plt.xticks(rotation=90, fontsize=8) plt.yticks(rotation=0, fontsize=8) # Assurer que les axes sont égaux ax.set_aspect("equal", adjustable="box") # Ajouter des labels et un titre plt.xlabel("Target", fontsize=14, fontweight="bold") plt.ylabel("Source", fontsize=14, fontweight="bold") # plt.title(f'Heatmap of adjacency matrix for {region} in {year}') legend_labels = [f"{i + 1}: {label}" for i, label in enumerate(self.labels)] for i, label in enumerate(legend_labels): ax.text( 1.05, 1 - 1.1 * (i + 0.5) / len(legend_labels), label, transform=ax.transAxes, fontsize=10, va="center", ha="left", color="black", verticalalignment="center", horizontalalignment="left", linespacing=20, ) # Augmenter l'espacement entre les lignes # plt.subplots_adjust(bottom=0.2, right=0.85) # Réduire l'espace vertical entre la heatmap et la colorbar # Afficher la heatmap plt.show()
[docs] def plot_heatmap_interactive(self): """ Generate an interactive Plotly heatmap of the nitrogen flux adjacency matrix. This interactive version allows detailed inspection of each flux with tooltip information, uses a log10 transformation for the color scale, and supports hover interactivity and zooming. Features: - Simulated log-scale using log10. - Horizontal colorbar with real-world units (ktN/year). - Tooltip shows source, target, and value. - Index-to-label legend annotated on the right. - Matrix layout with reversed Y axis and axis ticks from 1 to N. :return: Plotly Figure object (can be shown with `fig.show()`). :rtype: plotly.graph_objs.Figure """ # 1) Préparation des labels numériques x_labels = list(range(1, len(self.labels) + 1)) y_labels = list(range(1, len(self.labels) + 1)) # Si vous ignorez la dernière ligne/colonne comme dans votre code : adjacency_subset = self.adjacency_matrix[: len(self.labels), : len(self.labels)] # 2) Gestion min/max et transformation log10 cmin = max(1e-4, np.min(adjacency_subset[adjacency_subset > 0])) cmax = 100 # np.max(adjacency_subset) log_matrix = np.where(adjacency_subset > 0, np.log10(adjacency_subset), np.nan) # 3) Construire un tableau 2D de chaînes pour le survol # Même dimension que log_matrix strings_matrix = [] for row_i, y_val in enumerate(y_labels): row_texts = [] for col_i, x_val in enumerate(x_labels): # Valeur réelle (non log) => adjacency_subset[row_i, col_i] real_val = adjacency_subset[row_i, col_i] if np.isnan(real_val): real_val_str = "0" else: real_val_str = f"{real_val:.2e}" # format décimal / exposant # Construire la chaîne pour la tooltip # y_val et x_val sont les indices 1..N # self.labels[y_val] = nom de la source, self.labels[x_val] = nom de la cible tooltip_str = f"Source : {self.labels[y_val - 1]}<br>Target : {self.labels[x_val - 1]}<br>Value : {real_val_str} ktN/yr" row_texts.append(tooltip_str) strings_matrix.append(row_texts) # 3) Tracé Heatmap avec go.Figure + go.Heatmap # On règle "zmin" et "zmax" en valeurs log10 # pour contrôler la gamme de couleurs trace = go.Heatmap( z=log_matrix, x=x_labels, y=y_labels, colorscale="Plasma_r", zmin=np.log10(cmin), zmax=np.log10(cmax), text=strings_matrix, # tableau 2D de chaînes hoverinfo="text", # on n'affiche plus x, y, z bruts # Colorbar horizontale colorbar=dict( title="ktN/year", orientation="h", x=0.5, # centré horizontalement xanchor="center", y=-0.15, # en dessous de la figure thickness=15, # épaisseur len=1, # longueur en fraction de la largeur ), # Valeurs de survol -> vous verrez log10(...) par défaut # Pour afficher la valeur réelle, on peut plus tard utiliser "customdata" ) # Créer la figure et y ajouter le trace fig = go.Figure(data=[trace]) # 4) Discrétisation manuelle des ticks sur la colorbar # On veut afficher l'échelle réelle (et pas log10) # => calcul de tickvals en log10, et ticktext en 10^(tickvals) tickvals = np.linspace(np.floor(np.log10(cmin)), np.ceil(np.log10(cmax)), num=7) ticktext = [10**x for x in range(-4, 3, 1)] # [f"{10**v:.2e}" for v in tickvals] # Mettre à jour le trace pour forcer l'affichage fig.data[0].update( colorbar=dict( title="ktN/year", orientation="h", x=0.5, xanchor="center", y=-0.15, thickness=25, len=1, tickmode="array", tickvals=tickvals, ticktext=ticktext, ) ) # 5) Configuration de la mise en page fig.update_layout( width=1980, height=800, margin=dict(t=0, b=0, l=0, r=150), # espace à droite pour la légende ) # Axe X en haut fig.update_xaxes( title="Target", side="top", # place les ticks en haut tickangle=90, # rotation tickmode="array", tickfont=dict(size=10), tickvals=x_labels, # forcer l'affichage 1..N ticktext=[str(x) for x in x_labels], ) # Axe Y : inverser l'ordre pour un style "matriciel" standard fig.update_yaxes( title="Source", autorange="reversed", tickmode="array", tickfont=dict(size=10), tickvals=y_labels, ticktext=[str(y) for y in y_labels], ) # 6) Ajouter la légende à droite # Format : "1: label[0]" ... vertical legend_text = "<br>".join(f"{i + 1} : {lbl}" for i, lbl in enumerate(self.labels)) fig.add_annotation( x=1.3, # un peu à droite y=0.5, # centré en hauteur xref="paper", yref="paper", showarrow=False, text=legend_text, align="left", valign="middle", font=dict(size=9), bordercolor="rgba(0,0,0,0)", borderwidth=1, borderpad=4, bgcolor="rgba(0,0,0,0)", ) return fig
[docs] def compute_fluxes(self): """ Compute nitrogen flows across all compartments for the given scenario. This method: - Retrieves key scenario inputs (year, region, crop and livestock data). - Initializes the flux generator using crop and livestock data. - Updates the adjacency matrix accordingly. Must be called during initialization to populate model outputs. :return: None """ # Extraire les variables nécessaires df_cultures = self.df_cultures df_elevage = self.df_elevage adjacency_matrix = self.adjacency_matrix label_to_index = self.label_to_index main = self.main technical = self.technical area = self.area doc = self.doc year = doc.loc[doc["excel sheet for scenario writing"] == "Year", doc.columns.tolist()[1]].item() region = doc.loc[doc["excel sheet for scenario writing"] == "Region name", doc.columns.tolist()[1]].item() self.year = year self.region = region flux_generator = self.flux_generator if self.prod_func == "Linear": ym = "a" if self.prod_func in ["Ratio", "Exponential"]: ym = "Ymax (kgN/ha)" # Gestion du cas particulier pour 'Straw' cereales = ["Wheat", "Rye", "Barley", "Oat", "Grain maize", "Other cereals"] somme_surface_cereales = df_cultures["Area (ha)"][cereales].sum() df_cultures.loc["Straw", "Area (ha)"] = ( somme_surface_cereales * 0.1 # On attribue 10% des surfaces de céréales à la paille. A changer avec les coproduits TODO ) for cereal in cereales: df_cultures.loc[cereal, "Area (ha)"] -= ( df_cultures.loc["Straw", "Area (ha)"] * df_cultures.loc[cereal, "Area (ha)"] / somme_surface_cereales ) # Flux depuis 'other sectors' vers les cibles sélectionnées if self.prod_func in ["Ratio", "Exponential"]: # Pour éviter des valeur délirante, on interprète ymax comme un niveau de fertilisation caractéristique (Y(Ymax) = Ymax/2) target = ( df_cultures["Seed input (kt seeds/kt Ymax)"] * df_cultures[ym] / 2 * df_cultures["Area (ha)"] * 1e-6 ).to_dict() else: target = ( df_cultures["Seed input (kt seeds/kt Ymax)"] * df_cultures[ym] * df_cultures["Area (ha)"] * 1e-6 ).to_dict() source = {"other sectors": 1} flux_generator.generate_flux(source, target) # Fixation symbiotique target_fixation = ( df_cultures["N fixation coef (kgN/kgN)"] * df_cultures[ym] * df_cultures["Area (ha)"] * 1e-6 ).to_dict() source_fixation = {"atmospheric N2": 1} flux_generator.generate_flux(source_fixation, target_fixation) df_cultures["Symbiotic fixation (ktN)"] = df_cultures.index.map(target_fixation).fillna(0) ## Épandage de boue sur les champs FE_N_N02_em = 0.002 FE_N_NH3_em = 0.118 FE_N_N2_em = 0.425 pop = main.loc[main["Variable"] == "Population", "Business as usual"].item() prop_urb = main.loc[main["Variable"] == "Urban population", "Business as usual"].item() / 100 N_cons_cap = main.loc[main["Variable"] == "Total per capita protein ingestion", "Business as usual"].item() N_cap_vege = main.loc[main["Variable"] == "Vegetal per capita protein ingestion", "Business as usual"].item() N_cap_viande = main.loc[ main["Variable"] == "Edible animal per capita protein ingestion (excl fish)", "Business as usual" ].item() N_boue = pop * N_cons_cap N_vege = pop * N_cap_vege N_viande = pop * N_cap_viande # Et calcul rapide sur les ingestions de produits de la pêche N_fish = N_boue - N_vege - N_viande source = {"fishery products": N_fish} target = {"urban": prop_urb, "rural": 1 - prop_urb} flux_generator.generate_flux(source, target) # Revenons aux boues prop_recy_urb = ( technical.loc[ technical["Variable"] == "N recycling rate of human excretion in urban area", "Business as usual" ].item() / 100 ) prop_recy_rur = ( technical.loc[ technical["Variable"] == "N recycling rate of human excretion in rural area", "Business as usual" ].item() / 100 ) Norm = sum(df_cultures["Area (ha)"] * df_cultures["Spreading Rate (%)"]) # Création du dictionnaire target target_ependage = { culture: row["Area (ha)"] * row["Spreading Rate (%)"] / Norm for culture, row in df_cultures.iterrows() } source_boue = { "urban": prop_urb * N_boue * prop_recy_urb, "rural": (1 - prop_urb) * prop_recy_rur * N_boue, } flux_generator.generate_flux(source_boue, target_ependage) # Le reste est perdu dans l'environnement # Ajouter les fuites de N2O source = { "urban": N_boue * prop_urb * FE_N_N02_em, "rural": N_boue * (1 - prop_urb) * FE_N_N02_em, } target = {"N2O emission": 1} flux_generator.generate_flux(source, target) # Ajouter les fuites de NH3 source = { "urban": N_boue * prop_urb * FE_N_NH3_em, "rural": N_boue * (1 - prop_urb) * FE_N_NH3_em, } target = {"NH3 volatilization": 1} flux_generator.generate_flux(source, target) # Ajouter les fuites de N2 source = { "urban": N_boue * prop_urb * FE_N_N2_em, "rural": N_boue * (1 - prop_urb) * FE_N_N2_em, } target = {"atmospheric N2": 1} flux_generator.generate_flux(source, target) # Le reste est perdu dans l'hydroshere target = {"hydro-system": 1} source = { "urban": N_boue * prop_urb * (1 - prop_recy_urb - FE_N_N02_em - FE_N_NH3_em - FE_N_N2_em), "rural": N_boue * (1 - prop_urb) * (1 - prop_recy_rur - FE_N_NH3_em - FE_N_N02_em - FE_N_N2_em), } # Remplir la matrice d'adjacence flux_generator.generate_flux(source, target) # Azote excrété sur les prairies # Calculer les poids pour chaque cible # Calcul de la surface totale pour les prairies total_surface = ( df_cultures.loc["Alfalfa and clover", "Area (ha)"] + df_cultures.loc["Non-legume temporary meadow", "Area (ha)"] + df_cultures.loc["Natural meadow ", "Area (ha)"] ) # Création du dictionnaire target target = { "Alfalfa and clover": df_cultures.loc["Alfalfa and clover", "Area (ha)"] / total_surface, "Non-legume temporary meadow": df_cultures.loc["Non-legume temporary meadow", "Area (ha)"] / total_surface, "Natural meadow ": df_cultures.loc["Natural meadow ", "Area (ha)"] / total_surface, } source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted on grassland"] / 100 * (1 - df_elevage["N-NH3 EM. outdoor"] - df_elevage["N-N2O EM. outdoor"] - df_elevage["N-N2 EM. outdoor"]) ).to_dict() flux_generator.generate_flux(source, target) # Le reste est émit dans l'atmosphere # N2 target = {"atmospheric N2": 1} source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted on grassland"] / 100 * df_elevage["N-N2 EM. outdoor"] ).to_dict() flux_generator.generate_flux(source, target) # NH3 # 1 % est volatilisée de manière indirecte sous forme de N2O target = {"NH3 volatilization": 0.99, "N2O emission": 0.01} source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted on grassland"] / 100 * df_elevage["N-NH3 EM. outdoor"] ).to_dict() flux_generator.generate_flux(source, target) # volat_N2O = ( # 0.01 # * df_elevage["Excreted nitrogen (ktN)"] # * df_elevage["% excreted on grassland"] # / 100 # * df_elevage["N-NH3 EM. outdoor"] # ) # # N2O # target = {"N2O emission": 1} # source = ( # volat_N2O # + df_elevage["Excreted nitrogen (ktN)"] # * df_elevage["% excreted on grassland"] # / 100 # * df_elevage["N-N2O EM. outdoor"] # ).to_dict() # flux_generator.generate_flux(source, target) ## Epandage sur champs source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted indoors"] / 100 * ( df_elevage["% excreted indoors as manure"] / 100 * ( 1 - df_elevage["N-NH3 EM. manure indoor"] - df_elevage["N-N2O EM. manure indoor"] - df_elevage["N-N2 EM. manure indoor"] ) + df_elevage["% excreted indoors as slurry"] / 100 * ( 1 - df_elevage["N-NH3 EM. slurry indoor"] - df_elevage["N-N2O EM. slurry indoor"] - df_elevage["N-N2 EM. slurry indoor"] ) ) ).to_dict() flux_generator.generate_flux(source, target_ependage) # Le reste part dans l'atmosphere # N2 target = {"atmospheric N2": 1} source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted indoors"] / 100 * ( df_elevage["% excreted indoors as slurry"] / 100 * df_elevage["N-N2 EM. slurry indoor"] + df_elevage["% excreted indoors as manure"] / 100 * df_elevage["N-N2 EM. manure indoor"] # + other manure emission ?? TODO ) ).to_dict() flux_generator.generate_flux(source, target) # NH3 # 1 % est volatilisée de manière indirecte sous forme de N2O target = {"NH3 volatilization": 0.99, "N2O emission": 0.01} source = ( df_elevage["Excreted nitrogen (ktN)"] * df_elevage["% excreted indoors"] / 100 * ( df_elevage["% excreted indoors as slurry"] / 100 * df_elevage["N-NH3 EM. slurry indoor"] + df_elevage["% excreted indoors as manure"] / 100 * df_elevage["N-NH3 EM. manure indoor"] ) ).to_dict() flux_generator.generate_flux(source, target) # volat_N2O = ( # 0.01 # * df_elevage["Excreted nitrogen (ktN)"] # * df_elevage["% excreted indoors"] # / 100 # * ( # df_elevage["% excreted indoors as slurry"] / 100 * df_elevage["N-NH3 EM. slurry indoor"] # + df_elevage["% excreted indoors as manure"] / 100 * df_elevage["N-NH3 EM. manure indoor"] # ) # ) # # N2O # target = {"N2O emission": 1} # source = ( # volat_N2O # + df_elevage["Excreted nitrogen (ktN)"] # * df_elevage["% excreted indoors"] # / 100 # * ( # df_elevage["% excreted indoors as slurry"] / 100 * df_elevage["N-N2O EM. slurry indoor"] # + df_elevage["% excreted indoors as manure"] / 100 * df_elevage["N-N2O EM. manure indoor"] # ) # ).to_dict() # flux_generator.generate_flux(source, target) # Dépôt atmosphérique : proportionel aux emmission de gaz azoté. A voir après l'élevage ! target = ( df_cultures["Area (ha)"] / df_cultures["Area (ha)"].sum() ).to_dict() # Dépôt proportionnel aux surface source = { "N2O emission": 0.01 * adjacency_matrix[:, label_to_index["N2O emission"]].sum(), "NH3 volatilization": 0.1 * adjacency_matrix[:, label_to_index["NH3 volatilization"]].sum(), } # TODO réfléchir à comment intégrer les retombées de la volatilisation des engrais synthétiques adjacency_matrix[:, label_to_index["N2O emission"]] *= 0.99 adjacency_matrix[:, label_to_index["NH3 volatilization"]] *= 0.9 flux_generator.generate_flux(source, target) df_cultures = df_cultures.fillna(0) # Calcul de l'azote épendu par hectare def calculer_azote_ependu(culture): sources = self.betail + self.Pop + ["atmospheric N2", "other sectors", "NH3 volatilization", "N2O emission"] adj_matrix_df = pd.DataFrame(adjacency_matrix, index=self.labels, columns=self.labels) return adj_matrix_df.loc[sources, culture].sum() df_cultures["Total Non Synthetic Fertilizer Use (ktN)"] = df_cultures.index.map(calculer_azote_ependu) df_cultures["Surface Non Synthetic Fertilizer Use (kgN/ha)"] = df_cultures.apply( lambda row: row["Total Non Synthetic Fertilizer Use (ktN)"] / row["Area (ha)"] * 10**6 if row["Area (ha)"] > 0 and row["Total Non Synthetic Fertilizer Use (ktN)"] > 0 else 0, axis=1, ) df_cultures["Yield (kgN/ha)"] = 0.0 df_cultures["Nitrogen Production (ktN)"] = 0.0 # Les légumineuses ne reçoivent pas d'azote synthétique. On peut déjà calculer leur rendement for leg in df_cultures.loc[df_cultures["Category"] == "leguminous"].index.tolist() + ["Alfalfa and clover"]: if self.prod_func == "Linear": Yield = Y.Y_th_lin( df_cultures.loc[df_cultures.index == leg, "Surface Non Synthetic Fertilizer Use (kgN/ha)"].item(), df_cultures.loc[df_cultures.index == leg, "a"].item(), df_cultures.loc[df_cultures.index == leg, "b"].item(), ) if self.prod_func == "Ratio": Yield = Y.Y_th( df_cultures.loc[df_cultures.index == leg, "Surface Non Synthetic Fertilizer Use (kgN/ha)"].item(), df_cultures.loc[df_cultures.index == leg, "Ymax (kgN/ha)"].item(), ) if self.prod_func == "Exponential": Yield = Y.Y_th_exp_cap( df_cultures.loc[df_cultures.index == leg, "Surface Non Synthetic Fertilizer Use (kgN/ha)"].item(), df_cultures.loc[df_cultures.index == leg, "Ymax (kgN/ha)"].item(), df_cultures.loc[df_cultures.index == leg, "k (kgN/ha)"].item(), ) df_cultures.loc[df_cultures.index == leg, "Yield (kgN/ha)"] = Yield df_cultures["Nitrogen Production (ktN)"] = df_cultures["Yield (kgN/ha)"] * df_cultures["Area (ha)"] / 1e6 # Mécanisme d'héritage de l'azote en surplus des légumineuses df_cultures["Leguminous Nitrogen Surplus (ktN)"] = 0.0 leg_with_alfalfa = self.legumineuses + ["Alfalfa and clover"] surplus = ( df_cultures.loc[leg_with_alfalfa, "Total Non Synthetic Fertilizer Use (ktN)"].astype(float).values - df_cultures.loc[leg_with_alfalfa, "Nitrogen Production (ktN)"].astype(float).values ) df_cultures.loc[leg_with_alfalfa, "Leguminous Nitrogen Surplus (ktN)"] = surplus # Distribution du surplus aux céréales total_surplus_azote = df_cultures["Leguminous Nitrogen Surplus (ktN)"].sum() total_surface_cereales = df_cultures.loc[ ( (df_cultures["Category"] == "cereals (excluding rice)") | (df_cultures.index.isin(["Straw", "Forage maize"])) ), "Area (ha)", ].sum() df_cultures["Leguminous heritage (ktN)"] = 0.0 mask_cereales = (df_cultures["Category"] == "cereals (excluding rice)") | ( df_cultures.index.isin(["Straw", "Forage maize"]) ) # Extraire les surfaces concernées surface_vals = df_cultures.loc[mask_cereales, "Area (ha)"].astype(float).values # Calcul du surplus hérité à répartir heritage_vals = surface_vals / total_surface_cereales * total_surplus_azote # Affecter proprement sans mismatch df_cultures.loc[mask_cereales, "Leguminous heritage (ktN)"] = heritage_vals # Mise à jour de la fertilisation non synthétique mask = df_cultures["Area (ha)"] != 0 # Calcul en numpy pour éviter le mismatch d'index heritage = df_cultures.loc[mask, "Leguminous heritage (ktN)"].astype(float).values area = df_cultures.loc[mask, "Area (ha)"].astype(float).values delta = heritage / area * 1e6 # Mise à jour avec des valeurs bien typées et bien alignées df_cultures.loc[mask, "Surface Non Synthetic Fertilizer Use (kgN/ha)"] += delta df_cultures["Total Non Synthetic Fertilizer Use (ktN)"] += df_cultures["Leguminous heritage (ktN)"] # Génération des flux pour l'héritage des légumineuses source_leg = ( df_cultures.loc[df_cultures["Leguminous Nitrogen Surplus (ktN)"] > 0]["Leguminous Nitrogen Surplus (ktN)"] / df_cultures["Leguminous Nitrogen Surplus (ktN)"].sum() ).to_dict() target_leg = df_cultures["Leguminous heritage (ktN)"].to_dict() flux_generator.generate_flux(source_leg, target_leg) net_import = main.loc[main["Variable"] == "Net import of vegetal pdcts", "Business as usual"].item() Import = max(0, net_import) N_synth_crop = ( main.loc[main["Variable"] == "Synth N fertilizer application to cropland", "Business as usual"].item() * main.loc[main["Variable"] == "Arable area", "Business as usual"].item() / 1e6 ) N_synth_grass = ( main.loc[main["Variable"] == "Synth N fertilizer application to grassland", "Business as usual"].item() * main.loc[main["Variable"] == "Permanent grassland area", "Business as usual"].item() / 1e6 ) w_diet = technical.loc[technical["Variable"] == "Weight diet", "Business as usual"].item() w_Nsyn = technical.loc[technical["Variable"] == "Weight synthetic fertilizer use", "Business as usual"].item() w_imp = technical.loc[technical["Variable"] == "Weight import/export balance", "Business as usual"].item() df_elevage_comp = df_elevage.copy() df_cons_vege = df_elevage.loc[df_elevage["Ingestion (ktN)"] > 10**-8, "Ingestion (ktN)"] # On ajoute l'ingestion humaine # Une ligne urban, une ligne rural. Cela simplifiera la distinction de regime si un jour c'est pertinent df_cons_vege.loc["urban"] = N_vege * prop_urb df_cons_vege.loc["rural"] = N_vege * (1 - prop_urb) if len(df_cons_vege) > 0: CROPS = list(df_cultures.index) # par exemple CONSUMERS = list(df_cons_vege.index) # -------------------------------------------------------------------------- # 1) Collect data from your DataFrames (example placeholders) # -------------------------------------------------------------------------- area = {c: df_cultures.at[c, "Area (ha)"] for c in CROPS} if self.prod_func == "Ratio": Ymax = {c: df_cultures.at[c, "Ymax (kgN/ha)"] for c in CROPS} # kparam = {c: df_cultures.at[c, "k (kgN/ha)"] for c in CROPS} if self.prod_func == "Linear": a = {c: df_cultures.at[c, "a"] for c in CROPS} b = {c: df_cultures.at[c, "b"] for c in CROPS} if self.prod_func == "Exponential": Ymax = {c: df_cultures.at[c, "Ymax (kgN/ha)"] for c in CROPS} k_param = {c: df_cultures.at[c, "k (kgN/ha)"] for c in CROPS} nonSynthFert = {c: df_cultures.at[c, "Surface Non Synthetic Fertilizer Use (kgN/ha)"] for c in CROPS} ingestion = {k: df_cons_vege.at[k] for k in CONSUMERS} # Filter out any (c,k) pairs not in the diet. We'll build an "allowed" pair list: allowed_ck = [] for k in CONSUMERS: # Gather all crops that appear in any sub-list of regimes[k] authorized_crops = set() for p_ideal, c_list in regimes[k].items(): authorized_crops.update(c_list) # We'll keep only these (c, k) pairs for c in CROPS: if c in authorized_crops: allowed_ck.append((c, k)) # -------------------------------------------------------------------------- # 2) Create indexing for decision variables # -------------------------------------------------------------------------- idx_synth = {} offset = 0 for c in CROPS: idx_synth[c] = offset offset += 1 idx_alloc = {} for c, k in allowed_ck: idx_alloc[(c, k)] = offset offset += 1 idx_import = {} for c, k in allowed_ck: idx_import[(c, k)] = offset offset += 1 n_vars = offset # total number of decision variables w_spread_fixed = 1 fixed_availability_proportion = {} # Dict: {(k, tuple(group_crops), c): proportion} epsilon = 1e-9 for k_ in CONSUMERS: for group_crops in regimes[k_].values(): if not isinstance(group_crops, (list, tuple, set)): continue # Use Area (ha) as the basis for fixed availability weight group_total_area = sum(area.get(c2, 0) for c2 in group_crops) if group_total_area < epsilon: continue # Skip if group has no area group_key = tuple(sorted(group_crops)) # Use a consistent key for the group for c in group_crops: crop_area = area.get(c, 0) proportion = crop_area / group_total_area fixed_availability_proportion[(k_, group_key, c)] = proportion # -------------------------------------------------------------------------- # 3) Define the objective function # -------------------------------------------------------------------------- def Y_th_lin(f, a, b): return np.minimum(a * f, b) def Y_th_ratio(f, ymax): if f + ymax == 0: return 0 return f * ymax / (f + ymax) def objective(x): """ Return the scalar value of the objective: w_diet * diet_deviation + w_Nsyn * fertilizer_deviation + w_imp * import_export_deviation """ # 3.a) diet_deviation total_dev = 0.0 for k_ in CONSUMERS: # 1) Somme totale allouée (local + imports) pour le consommateur k_ denom_k = 0.0 for c_ in CROPS: # Allocation locale if (c_, k_) in idx_alloc: denom_k += x[idx_alloc[(c_, k_)]] # Allocation via import if (c_, k_) in idx_import: denom_k += x[idx_import[(c_, k_)]] # 2) Pour chaque proportion idéale, calculez la somme des cultures du groupe for p_ideal, c_list in regimes[k_].items(): group_alloc = 0.0 for c_ in c_list: # Ajout part locale if (c_, k_) in idx_alloc: group_alloc += x[idx_alloc[(c_, k_)]] # Ajout part importée if (c_, k_) in idx_import: group_alloc += x[idx_import[(c_, k_)]] if denom_k < 1e-6: proportion_real = 0.0 else: proportion_real = group_alloc / denom_k # Ecart diététique total_dev += (proportion_real - p_ideal) ** 2 # 3.b) fertilizer_deviation # sum of synthetic fertilizers in ktN total_synth = 0.0 for c in CROPS: # x[idx_synth[c]] is in kgN/ha fert_per_ha = x[idx_synth[c]] + nonSynthFert[c] total_synth += x[idx_synth[c]] * area[c] # only the synthetic part # Convert from kgN to ktN total_synth_kt = total_synth / 1e6 # desired total = (N_synth_crop + N_synth_grass) if N_synth_crop + N_synth_grass < 1: scale = 1 else: scale = N_synth_crop + N_synth_grass fert_dev = np.maximum(0, (total_synth_kt - (N_synth_crop + N_synth_grass)) / scale) ** 2 # fert_dev = ((total_synth_kt - (N_synth_crop + N_synth_grass)) / scale) ** 2 # 3.c) import_export_deviation # sum import sum_imp = 0.0 for c, k in allowed_ck: sum_imp += x[idx_import[(c, k)]] # Calcul de la production "non allouée" => export export_total = 0.0 for c in CROPS: # Production locale (ktN) ; on suppose qu'elle est déjà correcte/à jour : if self.prod_func == "Ratio": production_c = ( Y_th_ratio(x[idx_synth[c]] + nonSynthFert[c], Ymax[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) if self.prod_func == "Linear": production_c = ( Y_th_lin(x[idx_synth[c]] + nonSynthFert[c], a[c], b[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) if self.prod_func == "Exponential": production_c = ( Y.Y_th_exp_cap(x[idx_synth[c]] + nonSynthFert[c], Ymax[c], k_param[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) # Somme des allocations locales sur c allocated_c = 0.0 # idx_alloc[(c,k)] = indice pour allocate(c,k) for k in CONSUMERS: if (c, k) in idx_alloc: allocated_c += x[idx_alloc[(c, k)]] # leftover = production - allocated # S'il est > 0 => export, s'il est < 0 => on a sur-alloué (besoin d'import net) leftover_c = production_c - allocated_c export_total += leftover_c # Net import du modèle net_import_model = sum_imp - export_total if abs(net_import) < 1: imp_dev = (net_import_model - net_import) ** 2 else: imp_dev = ((net_import_model - net_import) / (net_import + 1e-6)) ** 2 # --- NEW: Allocation Spread Penalty based on Fixed Proportions --- spread_penalty_fixed = 0.0 epsilon = 1e-9 for k_ in CONSUMERS: for group_crops in regimes[k_].values(): # Iterate over the crop lists defining groups if not isinstance(group_crops, (list, tuple, set)): continue group_key = tuple(sorted(group_crops)) # Match key used in pre-calculation # --- Calculate group total allocation for consumer k_ --- group_alloc_kG = 0.0 for c2 in group_crops: if (c2, k_) in idx_alloc: group_alloc_kG += x[idx_alloc[(c2, k_)]] if (c2, k_) in idx_import: group_alloc_kG += x[idx_import[(c2, k_)]] # If total allocation to group is negligible, skip penalty if group_alloc_kG < epsilon: continue # --- Calculate penalty for each crop in the group --- for c in group_crops: # Get the pre-calculated fixed proportion target_proportion = fixed_availability_proportion.get((k_, group_key, c), 0) if target_proportion < epsilon: # Skip if this crop had no area in pre-calc continue # Calculate target allocation based on fixed proportion target_alloc_c = target_proportion * group_alloc_kG # Calculate this specific crop's actual allocation for consumer k_ alloc_ck = 0.0 if (c, k_) in idx_alloc: alloc_ck += x[idx_alloc[(c, k_)]] if (c, k_) in idx_import: alloc_ck += x[idx_import[(c, k_)]] # Penalize squared deviation from the target allocation deviation = alloc_ck - target_alloc_c spread_penalty_fixed += deviation**2 # Note: This penalizes both over- and under-allocation relative to the fixed proportion. # If you ONLY want to penalize under-allocation (concentration), use: # if deviation < 0: # i.e., alloc_ck < target_alloc_c # spread_penalty_fixed += deviation**2 return w_diet * total_dev + w_Nsyn * fert_dev + w_imp * imp_dev + w_spread_fixed * spread_penalty_fixed def objective_gradient(x): """ Computes the gradient of the objective function. """ area = df_cultures["Area (ha)"].to_dict() # Faster access than .loc inside loops grad = np.zeros(n_vars, dtype=float) epsilon = 1e-12 # For safe division # --- Part 1: Import Deviation Gradient --- # dDi/dx[j] = 1 if x[j] is an import variable, 0 otherwise for c, k in allowed_ck: # Assuming allowed_ck contains all keys for idx_import if (c, k) in idx_import: # Check if the import variable exists for this combo import_idx = idx_import[(c, k)] grad[import_idx] += w_imp * 1.0 # --- Part 2: Fertilizer Deviation Gradient --- total_synth_kg = 0.0 synth_indices = [] # Store indices of synth vars for later gradient update synth_crop_map = {} # Map index back to crop for c in CROPS: if c in idx_synth: # Make sure the crop has a synth variable synth_idx = idx_synth[c] total_synth_kg += x[synth_idx] * area.get(c, 0) # Use .get for safety synth_indices.append(synth_idx) synth_crop_map[synth_idx] = c total_synth_kt = total_synth_kg / 1e6 Target = N_synth_crop + N_synth_grass Scale = max(1.0, Target) # Ensure scale is at least 1 and float Z = (total_synth_kt - Target) / Scale if Z > 0: # Base derivative factor d(Df)/d(total_synth_kt) base_fert_deriv = (2.0 * Z) / Scale for synth_idx in synth_indices: c_syn = synth_crop_map[synth_idx] # d(total_synth_kt)/dx[j] = area[c_syn] / 1e6 deriv_contrib = base_fert_deriv * (area.get(c_syn, 0) / 1e6) grad[synth_idx] += w_Nsyn * deriv_contrib # --- Part 3: Diet Deviation Gradient --- for k_ in CONSUMERS: # Calculate denom_k (total allocation for consumer k) *once* denom_k = 0.0 # Store indices and type relevant to this consumer for quick lookup alloc_indices_k = {} # Map index to crop import_indices_k = {} # Map index to crop for c_ in CROPS: alloc_key = (c_, k_) if alloc_key in idx_alloc: idx = idx_alloc[alloc_key] denom_k += x[idx] alloc_indices_k[idx] = c_ if alloc_key in idx_import: idx = idx_import[alloc_key] denom_k += x[idx] import_indices_k[idx] = c_ if denom_k < epsilon: # If total allocation is near zero, derivative is zero continue # Skip this consumer # Calculate contributions for each group in the regime for p_ideal, c_list in regimes[k_].items(): # Calculate group_alloc *once* for this group group_alloc = 0.0 group_alloc_indices = set() # Indices contributing to this group_alloc group_import_indices = set() for c_in_list in c_list: alloc_key = (c_in_list, k_) if alloc_key in idx_alloc: idx = idx_alloc[alloc_key] group_alloc += x[idx] group_alloc_indices.add(idx) if alloc_key in idx_import: idx = idx_import[alloc_key] group_alloc += x[idx] group_import_indices.add(idx) # Calculate proportion_real and the base deviation factor prop_real = group_alloc / denom_k base_diet_deriv = 2.0 * (prop_real - p_ideal) # Calculate d(prop_real)/d(x_j) contributions using the quotient rule components # Derivative Factor = base_diet_deriv * (1 / denom_k**2) # This avoids repeated division inside the loops below quotient_deriv_factor = base_diet_deriv / (denom_k * denom_k) # denom_k**2 can be large # Term 1: + denom_k * d(group_alloc)/dx[j] # Term 2: - group_alloc * d(denom_k)/dx[j] # Apply derivative contributions to relevant grad entries # d(group_alloc)/dx[j] is 1 ONLY for alloc/import vars in this c_list # Contribution: quotient_deriv_factor * denom_k * (+1) for group_idx in group_alloc_indices.union(group_import_indices): grad[group_idx] += w_diet * quotient_deriv_factor * denom_k # * (+1 is implicit) # d(denom_k)/dx[j] is 1 for ALL alloc/import vars for this consumer k # Contribution: quotient_deriv_factor * (-group_alloc) * (+1) neg_group_alloc_term = -group_alloc # Precompute for k_alloc_idx in alloc_indices_k: # Indices of alloc vars for consumer k grad[k_alloc_idx] += ( w_diet * quotient_deriv_factor * neg_group_alloc_term ) # * (+1 implicit) for k_import_idx in import_indices_k: # Indices of import vars for consumer k grad[k_import_idx] += ( w_diet * quotient_deriv_factor * neg_group_alloc_term ) # * (+1 implicit) return grad def compute_objective_terms(x): """ Return the scalar value of the objective: w_diet * diet_deviation + w_Nsyn * fertilizer_deviation + w_imp * import_export_deviation """ # 3.a) diet_deviation total_dev = 0.0 for k_ in CONSUMERS: # 1) Somme totale allouée (local + imports) pour le consommateur k_ denom_k = 0.0 for c_ in CROPS: # Allocation locale if (c_, k_) in idx_alloc: denom_k += x[idx_alloc[(c_, k_)]] # Allocation via import if (c_, k_) in idx_import: denom_k += x[idx_import[(c_, k_)]] # 2) Pour chaque proportion idéale, calculez la somme des cultures du groupe for p_ideal, c_list in regimes[k_].items(): group_alloc = 0.0 for c_ in c_list: # Ajout part locale if (c_, k_) in idx_alloc: group_alloc += x[idx_alloc[(c_, k_)]] # Ajout part importée if (c_, k_) in idx_import: group_alloc += x[idx_import[(c_, k_)]] if denom_k < 1e-6: proportion_real = 0.0 else: proportion_real = group_alloc / denom_k # Ecart diététique total_dev += (proportion_real - p_ideal) ** 2 # 3.b) fertilizer_deviation # sum of synthetic fertilizers in ktN total_synth = 0.0 for c in CROPS: # x[idx_synth[c]] is in kgN/ha fert_per_ha = x[idx_synth[c]] + nonSynthFert[c] total_synth += x[idx_synth[c]] * area[c] # only the synthetic part # Convert from kgN to ktN total_synth_kt = total_synth / 1e6 # desired total = (N_synth_crop + N_synth_grass) if N_synth_crop + N_synth_grass < 1: scale = 1 else: scale = N_synth_crop + N_synth_grass fert_dev = np.maximum(0, (total_synth_kt - (N_synth_crop + N_synth_grass)) / scale) ** 2 # fert_dev = ((total_synth_kt - (N_synth_crop + N_synth_grass)) / scale) ** 2 # 3.c) import_export_deviation # sum import sum_imp = 0.0 for c, k in allowed_ck: sum_imp += x[idx_import[(c, k)]] # Calcul de la production "non allouée" => export export_total = 0.0 for c in CROPS: # Production locale (ktN) ; on suppose qu'elle est déjà correcte/à jour : if self.prod_func == "Ratio": production_c = ( Y_th_ratio(x[idx_synth[c]] + nonSynthFert[c], Ymax[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) if self.prod_func == "Linear": production_c = ( Y_th_lin(x[idx_synth[c]] + nonSynthFert[c], a[c], b[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) if self.prod_func == "Exponential": production_c = ( Y.Y_th_exp_cap(x[idx_synth[c]] + nonSynthFert[c], Ymax[c], k_param[c]) * df_cultures.at[c, "Area (ha)"] / 1e6 ) # Somme des allocations locales sur c allocated_c = 0.0 # idx_alloc[(c,k)] = indice pour allocate(c,k) for k in CONSUMERS: if (c, k) in idx_alloc: allocated_c += x[idx_alloc[(c, k)]] # leftover = production - allocated # S'il est > 0 => export, s'il est < 0 => on a sur-alloué (besoin d'import net) leftover_c = production_c - allocated_c export_total += leftover_c # Net import du modèle net_import_model = sum_imp - export_total if abs(net_import) < 1: imp_dev = (net_import_model - net_import) ** 2 else: imp_dev = ((net_import_model - net_import) / (net_import + 1e-6)) ** 2 return total_dev, fert_dev, imp_dev, net_import, net_import_model, sum_imp, export_total def my_callback(xk): # xk is the current solution vector at iteration k # Evaluate any terms you want here, e.g.: f_val = objective(xk) # Possibly store separate sub-terms: diet_dev_term, fertilizer_term, imp_term, net_imp, net_import_model, sum_imp, export_total = ( compute_objective_terms(xk) ) # Append them to some global or nonlocal list iteration_log.append( { # "x": xk.copy(), "objective": f_val, "diet_dev": diet_dev_term, "fert_dev": fertilizer_term, "import term": imp_term, "net import target": net_imp, "net import model": net_import_model, "Import model": sum_imp, "Export model": export_total, } ) # -------------------------------------------------------------------------- # 4) Define constraints as a list of dicts (SLSQP form) # -------------------------------------------------------------------------- # We'll have: # (i) production_balance(c) = sum_{k} allocate(c,k) + import_export(c) - prod_c == 0 # (ii) consumption_rule(k) = sum_{c} allocate(c,k) - ingestion[k] == 0 # # The “(iii) if not authorized => allocate=0” we already excluded from x. # -------------------------------------------------------------------------- def build_min_fraction_constraints_reformulated(BETA=0.05): """ Returns constraint dict for SciPy using a reformulated inequality to avoid direct division within the constraint function. Constraint Logic: We want the allocation of a crop 'c' within a group 'G' for consumer 'k' (alloc_ck) to be at least a certain fraction (BETA) of what its proportional production (prod_c / sum_prod_g) would suggest applied to the total allocation for that group (group_alloc_kG). Original Form: alloc_ck - BETA * (prod_c / sum_prod_g) * group_alloc_kG >= 0 Reformulated : alloc_ck * sum_prod_g - BETA * prod_c * group_alloc_kG >= 0 (for sum_prod_g > 0) """ # --- Helper function to calculate production --- # (Avoid recalculating this repeatedly inside the constraint function if possible, # but here we need it dependent on x_synth which changes) def calculate_production(x_synth_val, c): # Make sure idx_synth[c] exists if you use it here, or adjust logic fert_tot = x_synth_val + nonSynthFert[c] # Assuming x_ maps directly for synth if self.prod_func == "Ratio": y_ha = Y_th_ratio(fert_tot, df_cultures.loc[c, "Ymax (kgN/ha)"]) if self.prod_func == "Linear": y_ha = Y_th_lin(fert_tot, df_cultures.loc[c, "a"], df_cultures.loc[c, "b"]) if self.prod_func == "Exponential": y_ha = Y.Y_th_exp_cap( fert_tot, df_cultures.loc[c, "Ymax (kgN/ha)"], df_cultures.loc[c, "k (kgN/ha)"] ) # Production in ktN (consistent units assumed) return (y_ha * df_cultures.loc[c, "Area (ha)"]) / 1e6 # We still need to determine the list of constraints to evaluate consistently constraint_tuples = [] for k_ in CONSUMERS: # regimes[k_] has format {proportion: [crop_list]} # We need the groups (crop_lists) themselves. Using values() is fine. for group_crops in regimes[k_].values(): # Ensure group_crops is actually a list/iterable of crop names if not isinstance(group_crops, (list, tuple, set)): # Handle potential errors in regimes structure if needed # print(f"Warning: Expected list of crops for consumer {k_}, got {group_crops}") continue for c in group_crops: # Only add constraints for crops that *can* be allocated (locally or imported) # This avoids creating constraints for crops that can never meet the condition. can_be_allocated = (c, k_) in idx_alloc or (c, k_) in idx_import if can_be_allocated: # Store (consumer, list_of_crops_in_group, specific_crop) constraint_tuples.append( (k_, tuple(group_crops), c) ) # Use tuple for hashability if needed num_constraints = len(constraint_tuples) # print(f"Building {num_constraints} min_fraction constraints.") # --- The actual constraint function passed to SciPy --- def min_fraction_fn(x_): constraints_array = np.zeros(num_constraints, dtype=float) calculated_productions = {} # Cache production calc within one function call for i, (k_, group_crops, c) in enumerate(constraint_tuples): # 1) Calculate production values for the group sum_prod_g = 0.0 prod_values = {} for c2 in group_crops: if c2 not in calculated_productions: # Ensure c2 is a valid crop index/name if c2 in idx_synth: prod_c2_val = calculate_production(x_[idx_synth[c2]], c2) calculated_productions[c2] = prod_c2_val else: # Handle case where crop might be in regimes but not synthesizable (maybe fixed production?) # For now, assume 0 production if no synth variable prod_c2_val = 0 calculated_productions[c2] = prod_c2_val prod_c2_val = calculated_productions[c2] prod_values[c2] = prod_c2_val sum_prod_g += prod_c2_val # The specific production for the crop 'c' we're constraining # Use the value already computed and stored in prod_values prod_c_val = prod_values.get( c, 0.0 ) # Default to 0 if c wasn't calculable (shouldn't happen based on loop structure) # 2) Calculate total allocation to the group for this consumer group_alloc_kG = 0.0 for c2 in group_crops: if (c2, k_) in idx_alloc: group_alloc_kG += x_[idx_alloc[(c2, k_)]] if (c2, k_) in idx_import: group_alloc_kG += x_[idx_import[(c2, k_)]] # 3) Calculate this specific crop's allocation alloc_ck = 0.0 if (c, k_) in idx_alloc: alloc_ck += x_[idx_alloc[(c, k_)]] if (c, k_) in idx_import: alloc_ck += x_[idx_import[(c, k_)]] # 4) Apply the reformulated constraint # If total production in the group is negligible, the concept # of proportional allocation breaks down. Make constraint trivially true (>=0). # Also handle the case where there's no allocation to the group. if sum_prod_g < 1e-9 or group_alloc_kG < 1e-9: # LHS must be >= 0. Setting to 0 or a small positive value is safe. constraints_array[i] = 1.0 # Or 0.0, ensures >= 0 is met else: # Reformulated: alloc_ck * sum_prod_g - BETA * prod_c * group_alloc_kG >= 0 lhs = alloc_ck * sum_prod_g - BETA * prod_c_val * group_alloc_kG constraints_array[i] = lhs return constraints_array return {"type": "ineq", "fun": min_fraction_fn} # We will define them in a wrapper that references a global or closure “x_current” # so that SLSQP can evaluate. # One standard pattern is to define a single function that returns an array of # constraints. However, SLSQP (via scipy) also supports a list of constraint dicts. def build_constraints(): cons = [] # Production >= allocations (ineq) for c in CROPS: cons.append({"type": "ineq", "fun": lambda x_, c=c: production_balance_expr(x_, c)}) # Ingestion = local + import (eq) for k_ in CONSUMERS: cons.append({"type": "eq", "fun": lambda x_, k_=k_: consumption_rule_expr(x_, k_)}) return cons def production_balance_expr(x_, c): sum_local = 0.0 for k_ in CONSUMERS: if (c, k_) in idx_alloc: sum_local += x_[idx_alloc[(c, k_)]] # production c fert_tot = x_[idx_synth[c]] + nonSynthFert[c] if self.prod_func == "Ratio": y = Y_th_ratio(fert_tot, Ymax[c]) if self.prod_func == "Linear": y = Y_th_lin(fert_tot, a[c], b[c]) if self.prod_func == "Exponential": y = Y.Y_th_exp_cap(fert_tot, Ymax[c], k_param[c]) prod_c = (y * area[c]) / 1e6 return prod_c - sum_local # doit être >= 0 def consumption_rule_expr(x_, k_): sum_local = 0.0 sum_import = 0.0 for c_ in CROPS: if (c_, k_) in idx_alloc: sum_local += x_[idx_alloc[(c_, k_)]] if (c_, k_) in idx_import: # si vous stockez les imports sous (culture, cons) sum_import += x_[idx_import[(c_, k_)]] return sum_local + sum_import - ingestion[k_] # -------------------------------------------------------------------------- # 5) Build bounds # -------------------------------------------------------------------------- # - synth_fert[c] >= 0 # - allocate[c,k] >= 0 # - import_export[c] unbounded # We must create a bounds array for each variable in x # -------------------------------------------------------------------------- bounds = [None] * n_vars for c in CROPS: # For synth_fert[c] i = idx_synth[c] if df_cultures.loc[c, "Category"] == "leguminous": bounds[i] = (0.0, 0.0) # null else: bounds[i] = (0.0, None) # nonnegative for c, k_ in allowed_ck: i = idx_alloc[(c, k_)] bounds[i] = (0.0, None) # nonnegative for c, k_ in allowed_ck: i = idx_import[(c, k_)] bounds[i] = (0.0, None) # nonnegative # -------------------------------------------------------------------------- # 6) Initial guess # -------------------------------------------------------------------------- # You can choose a naive guess, e.g. 0 for everything # or some heuristic. # -------------------------------------------------------------------------- x0 = np.array([0.01 for i in range(n_vars)]) # np.zeros(n_vars, dtype=float) if self.prod_func in ["Ratio", "Exponential"]: x0[: len(df_cultures)] = df_cultures["Ymax (kgN/ha)"].values if self.prod_func == "Linear": x0[: len(df_cultures)] = df_cultures["b"].values # -------------------------------------------------------------------------- # 7) Call the optimizer # -------------------------------------------------------------------------- cons = build_constraints() # min_frac_cons = build_min_fraction_constraints_reformulated() # cons.append(min_frac_cons) iteration_log = [] # Stage 1: quick approximate solve # res1 = minimize( # fun=objective, # x0=x0, # method="SLSQP", # bounds=bounds, # constraints=cons, # callback=my_callback, # options={"maxiter": 1000, "ftol": 1e-5, "disp": True}, # ) # from IPython import embed # embed() # Stage 2: refine from the stage 1 solution with a tighter tolerance res = minimize( fun=objective, x0=x0, method="SLSQP", bounds=bounds, constraints=cons, callback=my_callback, options={"maxiter": 1000, "ftol": 1e-5, "disp": True, "eps": 1e-6}, ) self.log = iteration_log x_opt = res.x # On crée les nouvelles colonnes à zéro par défaut df_cultures["Surface Synthetic Fertilizer Use (kgN/ha)"] = 0.0 df_cultures["Synthetic Fertilizer Use (ktN)"] = 0.0 # Parcours de toutes les cultures for c in df_cultures.index: if df_cultures.at[c, "Category"] != "leguminous": # 1) Récupérer la fertilisation synthétique du solveur, en kgN/ha # -> synth_fert_sol[c] = variable du solveur if c in idx_synth and x_opt[idx_synth[c]] > 1e-1: fert_synth = x_opt[idx_synth[c]] else: fert_synth = 0.0 # On met à jour la colonne "Surface Synthetic Fertilizer Use (kgN/ha)" fert_synth_with_volat = ( fert_synth * 1 / (0.99 - 0.01 * 0.01) ) # Pour prendre en compte l'azote synthétique volatilizé et non consommé par la plante df_cultures.at[c, "Surface Synthetic Fertilizer Use (kgN/ha)"] = fert_synth_with_volat # 2) Calculer la fertilisation synthétique totale en ktN area_ha = df_cultures.at[c, "Area (ha)"] total_synth_ktN = (fert_synth_with_volat * area_ha) / 1e6 # (kgN/ha * ha) / 1e6 = ktN df_cultures.at[c, "Synthetic Fertilizer Use (ktN)"] = total_synth_ktN # 3) Mettre à jour rendement et production # On calcule la fertilisation totale (synthétique + éventuel non-synthétique) non_synth_fert = df_cultures.at[c, "Surface Non Synthetic Fertilizer Use (kgN/ha)"] fert_tot = fert_synth + non_synth_fert if self.prod_func == "Ratio": y_max = df_cultures.at[c, "Ymax (kgN/ha)"] new_yield = Y_th_ratio(fert_tot, y_max) if self.prod_func == "Linear": a = df_cultures.at[c, "a"] b = df_cultures.at[c, "b"] new_yield = Y_th_lin(fert_tot, a, b) if self.prod_func == "Exponential": y_max = df_cultures.at[c, "Ymax (kgN/ha)"] k_param = df_cultures.at[c, "k (kgN/ha)"] new_yield = Y.Y_th_exp_cap(fert_tot, y_max, k_param) df_cultures.at[c, "Yield (kgN/ha)"] = new_yield # Production en ktN nitro_prod_ktN = (new_yield * area_ha) / 1e6 df_cultures.at[c, "Nitrogen Production (ktN)"] = nitro_prod_ktN else: # Pour les légumineuses, on ne touche pas Yield ni Nitrogen Production pass ## Azote synthétique volatilisé par les terres # Est ce qu'il n'y a que l'azote synthétique qui est volatilisé ? coef_volat_NH3 = ( technical.loc[ technical["Variable"] == "NH3 volatilization coefficient for synthetic nitrogen", "Business as usual", ].item() / 100 ) coef_volat_N2O = 0.01 # 1 % des emissions de NH3 du aux fert. synth sont volatilisées sous forme de N2O df_cultures["Volatilized Nitrogen N-NH3 (ktN)"] = ( df_cultures["Synthetic Fertilizer Use (ktN)"] * 0.99 * coef_volat_NH3 ) df_cultures["Volatilized Nitrogen N-N2O (ktN)"] = df_cultures["Synthetic Fertilizer Use (ktN)"] * ( coef_volat_N2O + 0.01 * coef_volat_NH3 ) df_cultures["Synthetic Fertilizer Use (ktN)"] = df_cultures["Synthetic Fertilizer Use (ktN)"] * ( 1 - coef_volat_NH3 - coef_volat_N2O ) source = {"Haber-Bosch": 1} target = df_cultures["Synthetic Fertilizer Use (ktN)"].to_dict() flux_generator.generate_flux(source, target) source = df_cultures["Volatilized Nitrogen N-NH3 (ktN)"].to_dict() target = {"NH3 volatilization": 1} flux_generator.generate_flux(source, target) source = df_cultures["Volatilized Nitrogen N-N2O (ktN)"].to_dict() target = {"N2O emission": 1} flux_generator.generate_flux(source, target) # A cela on ajoute les emissions indirectes de N2O lors de la fabrication des engrais # epend_tot_synt = ( # df_cultures["Volatilized Nitrogen N-NH3 (ktN)"] # + df_cultures["Volatilized Nitrogen N-N2O (ktN)"] # + df_cultures["Adjusted Total Synthetic Fertilizer Use (ktN)"] # ).sum() epend_tot_synt = df_cultures["Synthetic Fertilizer Use (ktN)"].sum() coef_emis_N_N2O = ( technical.loc[ technical["Variable"] == "Indirect N2O volatilization coefficient for synthetic nitrogen", "Business as usual", ].item() / 100 ) target = {"N2O emission": 1} source = {"Haber-Bosch": epend_tot_synt * coef_emis_N_N2O} flux_generator.generate_flux(source, target) # Azote issu de la partie non comestible des carcasses source_non_comestible = df_elevage["Non Edible Nitrogen (ktN)"].to_dict() target_other_sectors = {"other sectors": 1} flux_generator.generate_flux(source_non_comestible, target_other_sectors) allocations = [] for (c, k), idx in idx_alloc.items(): val = x_opt[idx] if val > 1e-6: # Décider du "Type" (local feed vs local food) # Par exemple, si 'k' fait partie d'un set df_elevage.index => feed if k in df_elevage.index: type_ = "Local culture feed" else: type_ = "Local culture food" allocations.append( { "Culture": c, "Consumer": k, "Allocated Nitrogen": val, "Type": type_, } ) # Idem pour les imports (I, E). # Import feed: for (c, k), idx in idx_import.items(): val = x_opt[idx] if k in df_elevage.index: label = "Feed" else: label = "Food" if val > 1e-6: allocations.append( { "Culture": c, "Consumer": k, "Allocated Nitrogen": val, "Type": f"Imported {label}", } ) allocations_df = pd.DataFrame(allocations) self.allocations_df = allocations_df self.allocation_vege = allocations_df df_cultures["Yield (qtl/ha)"] = ( df_cultures["Yield (kgN/ha)"] / (df_cultures["Nitrogen Content (%)"] / 100) / 100 # Conversion de kg en qtl (100kg) ) df_cultures["Nitrogen for Feed (ktN)"] = 0.0 df_cultures["Nitrogen for Food (ktN)"] = 0.0 group_sums = allocations_df.groupby(["Culture", "Type"])["Allocated Nitrogen"].sum() table_sums = group_sums.unstack(fill_value=0.0) for c in table_sums.index: if "Local culture feed" in table_sums.columns: df_cultures.at[c, "Nitrogen for Feed (ktN)"] = table_sums.at[c, "Local culture feed"] if "Local culture food" in table_sums.columns: df_cultures.at[c, "Nitrogen for Food (ktN)"] = table_sums.at[c, "Local culture food"] df_cultures["Available Nitrogen After Feed and Food (ktN)"] = ( df_cultures["Nitrogen Production (ktN)"] - df_cultures["Nitrogen for Feed (ktN)"] - df_cultures["Nitrogen for Food (ktN)"] ) # Mise à jour de df_elevage df_elevage["Consummed Nitrogen from local feed (ktN)"] = 0.0 df_elevage["Consummed Nitrogen from imported feed (ktN)"] = 0.0 group_sums = allocations_df.groupby(["Consumer", "Type"])["Allocated Nitrogen"].sum() table_sums = group_sums.unstack(fill_value=0.0) for k in df_elevage.index: if "Local culture feed" in table_sums.columns: df_elevage.at[k, "Consummed Nitrogen from local feed (ktN)"] = table_sums.at[ k, "Local culture feed" ] if "Imported Feed" in table_sums.columns: df_elevage.at[k, "Consummed Nitrogen from imported feed (ktN)"] = table_sums.at[k, "Imported Feed"] deviations_list = [] for k_ in CONSUMERS: # Somme totale (local + import) pour le conso k_ denom_k = 0.0 for c_ in CROPS: if (c_, k_) in idx_alloc: denom_k += x_opt[idx_alloc[(c_, k_)]] if (c_, k_) in idx_import: denom_k += x_opt[idx_import[(c_, k_)]] for p_ideal, c_list in regimes[k_].items(): group_alloc = 0.0 for c_ in c_list: if (c_, k_) in idx_alloc: group_alloc += x_opt[idx_alloc[(c_, k_)]] if (c_, k_) in idx_import: group_alloc += x_opt[idx_import[(c_, k_)]] if denom_k < 1e-6: proportion_real = 0.0 else: proportion_real = group_alloc / denom_k deviation = proportion_real - p_ideal deviations_list.append( { "Consumer": k_, "Cultures": ", ".join(c_list), "Expected Proportion (%)": p_ideal * 100, "Allocated Proportion (%)": proportion_real * 100, "Deviation (%)": deviation * 100, } ) df_deviation = pd.DataFrame(deviations_list) self.deviations_df = df_deviation # Génération des flux pour les cultures locales allocations_locales = allocations_df[ allocations_df["Type"].isin(["Local culture food", "Local culture feed"]) ] for cons in df_cons_vege.index: target = {cons: 1} source = ( allocations_locales[allocations_locales["Consumer"] == cons] .set_index("Culture")["Allocated Nitrogen"] .to_dict() ) if source: flux_generator.generate_flux(source, target) # Génération des flux pour les importations allocations_imports = allocations_df[ allocations_df["Type"].isin(["Imported Feed", "Imported Food", "Excess feed imports"]) ] for cons in df_cons_vege.index: target = {cons: 1} cons_vege_imports = allocations_imports[allocations_imports["Consumer"] == cons] # Initialisation d'un dictionnaire pour collecter les flux par catégorie flux = {} for _, row in cons_vege_imports.iterrows(): culture = row["Culture"] azote_alloue = row["Allocated Nitrogen"] # Récupération de la catégorie de la culture categorie = df_cultures.loc[culture, "Category"] # Construction du label source pour l'importation if cons in ["urban", "rural"]: label_source = f"{categorie} food trade" else: label_source = f"{categorie} feed trade" # Accumuler les flux par catégorie if label_source in flux: flux[label_source] += azote_alloue else: flux[label_source] = azote_alloue # Génération des flux pour l'élevage if sum(flux.values()) > 0: flux_generator.generate_flux(flux, target) # On redonne à df_elevage sa forme d'origine et à import_feed_net sa vraie valeur # Utiliser `infer_objects(copy=False)` pour éviter l'avertissement sur le downcasting df_elevage = df_elevage.combine_first(df_elevage_comp) # Remplir les valeurs manquantes avec 0 df_elevage = df_elevage.fillna(0) # Inférer les types pour éviter le warning sur les colonnes object df_elevage = df_elevage.infer_objects(copy=False) # feed_export = import_feed - import_feed_net # flux_exported = {} # if feed_export > 10**-6: # On a importé plus que les imports net, la diff est l'export de feed # feed_export = min( # feed_export, # df_cultures["Available Nitrogen After Feed and Food (ktN)"].sum(), # ) # Patch pour gérer les cas où on a une surexportation (cf Bretagne 2010) # # On distingue les exports de feed prioritaires (prairies et fourrages) au reste # # On distingue le cas où il y a assez dans les exports prioritaires pour couvrir # # les export de feed au cas où il faut en plus exporter les autres cultures (mais d'abord les exports prio) # if ( # feed_export # > df_cultures.loc[ # df_cultures["Category"].isin(["forages", "temporary meadows"]), # "Available Nitrogen After Feed and Food (ktN)", # ].sum() # ): # feed_export_prio = df_cultures.loc[ # df_cultures["Category"].isin(["forages", "temporary meadows"]), # "Available Nitrogen After Feed and Food (ktN)", # ].sum() # feed_export_other = feed_export - feed_export_prio # else: # feed_export_prio = feed_export # feed_export_other = 0 # # Répartition de l'azote exporté inutilisé par catégorie # # On fait un premier tour sur les cultures prioritaires # for culture in df_cultures.loc[df_cultures["Category"].isin(["forages", "temporary meadows"])].index: # categorie = df_cultures.loc[df_cultures.index == culture, "Category"].item() # # On exporte pas en feed des catégories dédiées aux humains # if categorie not in ["rice", "fruits and vegetables", "roots"]: # # Calculer la quantité exportée par catégorie proportionnellement aux catégories présentes dans df_cultures # culture_nitrogen_available = df_cultures.loc[df_cultures.index == culture][ # "Available Nitrogen After Feed and Food (ktN)" # ].item() # if culture_nitrogen_available > 0: # flux_exported[culture] = feed_export_prio * ( # culture_nitrogen_available # / df_cultures["Available Nitrogen After Feed and Food (ktN)"].sum() # ) # # On écoule le reste des export de feed (si il y en a) sur les autres cultures # if feed_export_other > 10**-6: # for culture in df_cultures.loc[ # ~df_cultures["Category"].isin(["forages", "temporary meadows"]) # ].index: # categorie = df_cultures.loc[df_cultures.index == culture, "Category"].item() # # On exporte pas en feed des catégories dédiées aux humains # if categorie not in ["rice", "fruits and vegetables", "roots"]: # # Calculer la quantité exportée par catégorie proportionnellement aux catégories présentes dans df_cultures # culture_nitrogen_available = df_cultures.loc[df_cultures.index == culture][ # "Available Nitrogen After Feed and Food (ktN)" # ].item() # if culture_nitrogen_available > 0: # flux_exported[culture] = feed_export_prio * ( # culture_nitrogen_available # / df_cultures["Available Nitrogen After Feed and Food (ktN)"].sum() # ) # # Générer des flux les exportations vers leur catégorie d'origine # for label_source, azote_exported in flux_exported.items(): # if azote_exported > 0: # categorie = df_cultures.loc[df_cultures.index == label_source, "Category"].item() # label_target = f"{categorie} feed trade" # target = {label_target: 1} # source = {label_source: azote_exported} # flux_generator.generate_flux(source, target) # # Mise à jour du DataFrame avec les quantités exportées # df_cultures["Nitrogen Exported For Feed (ktN)"] = df_cultures.index.map(flux_exported).fillna( # 0 # ) # df_cultures.index.map(source).fillna(0) # df_cultures["Available Nitrogen After Feed, Export Feed and Food (ktN)"] = ( # df_cultures["Available Nitrogen After Feed and Food (ktN)"] # - df_cultures["Nitrogen Exported For Feed (ktN)"] # ).apply(lambda x: 0 if abs(x) < 1e-6 else x) # import/export food # Le surplus est food exporté (ou stocké mais cela ne nous regarde pas) for idx, row in df_cultures.iterrows(): culture = row.name categorie = df_cultures.loc[df_cultures.index == culture, "Category"].item() if categorie not in ["temporary meadows", "forages", "natural meadows "]: source = { culture: df_cultures.loc[ df_cultures.index == culture, "Available Nitrogen After Feed and Food (ktN)", ].item() } target = {f"{categorie} food trade": 1} elif ( culture != "Natural meadow " ): # TODO Que faire des production de feed qui ne sont ni consommées ni exportées ? Pour l'instant on les exporte.... # NOOOON Il faut les laisser retourner en terre si c'est une prairie naturelle (recommandation de JLN) source = { culture: df_cultures.loc[ df_cultures.index == culture, "Available Nitrogen After Feed and Food (ktN)", ].item() } target = {f"{categorie} feed trade": 1} print(culture) print(categorie) else: source = { culture: df_cultures.loc[ df_cultures.index == culture, "Available Nitrogen After Feed and Food (ktN)", ].item() } target = {"soil stock": 1} flux_generator.generate_flux(source, target) # Que faire d'eventuel surplus de prairies ou forage ? # Sur recommandation de JLN c'est retourné vers le sol pour les prairies permanentes et exporté pour tout le reste ## Usage de l'azote animal pour nourir la population, on pourrait améliorer en distinguant viande, oeufs et lait viande_cap = main.loc[ main["Variable"] == "Edible animal per capita protein ingestion (excl fish)", "Business as usual" ].item() cons_viande = viande_cap * pop # Reflechir a considerer un regime alimentaire carne (national) apres 1960 if cons_viande < df_elevage["Edible Nitrogen (ktN)"].sum(): # Il y a assez de viande locale target = { "urban": prop_urb * cons_viande, "rural": (1 - prop_urb) * cons_viande, } source = (df_elevage["Edible Nitrogen (ktN)"] / df_elevage["Edible Nitrogen (ktN)"].sum()).to_dict() df_elevage["Net animal nitrogen exports (ktN)"] = df_elevage[ "Edible Nitrogen (ktN)" ] - df_elevage.index.map(source) * sum(target.values()) flux_generator.generate_flux(source, target) else: # On commence par consommer tout l'azote disponible target = {"urban": prop_urb, "rural": (1 - prop_urb)} source = df_elevage["Edible Nitrogen (ktN)"].to_dict() flux_generator.generate_flux(source, target) cons_viande_import = cons_viande - df_elevage["Edible Nitrogen (ktN)"].sum() commerce_path = "FAOSTAT_data_fr_viande_import.csv" commerce = pd.read_csv(os.path.join(self.data_path, commerce_path)) if ( int(year) < 1965 ): # Si on est avant 65, on se base sur les rations de 65. De toute façon ça concerne des import minoritaires... year = "1965" commerce = commerce.loc[commerce["Année"] == int(year), ["Produit", "Valeur"]] corresp_dict = { "Viande, bovine, fraîche ou réfrigérée": "bovines", "Viande ovine, fraîche ou réfrigérée": "ovines", "Viande, caprin, fraîche ou réfrigérée": "caprines", "Viande, cheval, fraîche ou réfrigérée": "equine", "Viande, porc, fraîche ou réfrigérée": "porcines", "Viande, poulet, fraîche ou réfrigérée": "poultry", } commerce["Produit"] = commerce["Produit"].map(corresp_dict).fillna(commerce["Produit"]) commerce["Ratio"] = commerce["Valeur"] / commerce["Valeur"].sum() commerce.index = commerce["Produit"] target = { "urban": prop_urb * cons_viande_import, "rural": (1 - prop_urb) * cons_viande_import, } source = { "animal trade": 1 } # commerce["Ratio"].to_dict() On peut distinguer les différents types d'azote importé flux_generator.generate_flux(source, target) # Et on reporte ce qu'il manque dans la colonne "Azote animal exporté net" df_elevage["Net animal nitrogen exports (ktN)"] = -commerce["Ratio"] * (cons_viande_import) if cons_viande < df_elevage["Edible Nitrogen (ktN)"].sum(): source = df_elevage["Net animal nitrogen exports (ktN)"].to_dict() target = {"animal trade": 1} flux_generator.generate_flux(source, target) # Calcul des déséquilibres négatifs for label in cultures + legumineuses + prairies: node_index = label_to_index[label] row_sum = adjacency_matrix[node_index, :].sum() col_sum = adjacency_matrix[:, node_index].sum() imbalance = row_sum - col_sum # Déséquilibre entre sorties et entrées if abs(imbalance) < 10**-4: imbalance = 0 if ( imbalance > 0 ): # Que conclure si il y a plus de sortie que d'entrée ? Que l'on détériore les réserves du sol ? # print(f"pb de balance avec {label}") # Plus de sorties que d'entrées, on augmente les entrées # new_adjacency_matrix[n, node_index] = imbalance # Flux du nœud de balance vers la culture target = {label: imbalance} source = {"soil stock": 1} flux_generator.generate_flux(source, target) elif imbalance < 0: # Plus d'entrées que de sorties, on augmente les sorties # adjacency_matrix[node_index, n] = -imbalance # Flux de la culture vers le nœud de balance if label != "Natural meadow ": # 70% de l'excès fini dans les ecosystèmes aquatiques source = {label: -imbalance} # Ajouter soil stock parmis les surplus de fertilisation. target = { "other losses": 0.2925, "hydro-system": 0.7, "N2O emission": 0.0075, } else: if ( imbalance * 10**6 / df_cultures.loc[df_cultures.index == "Natural meadow ", "Area (ha)"].item() > 100 ): # Si c'est une prairie, l'azote est lessivé seulement au dela de 100 kgN/ha source = { label: -imbalance - 100 * df_cultures.loc[df_cultures.index == "Natural meadow ", "Area (ha)"].item() / 10**6 } target = { "other losses": 0.2925, "hydro-system": 0.7, "N2O emission": 0.0075, } flux_generator.generate_flux(source, target) source = { label: 100 * df_cultures.loc[df_cultures.index == "Natural meadow ", "Area (ha)"].item() / 10**6 } target = {label: 1} else: # Autrement, l'azote reste dans le sol (cas particulier, est ce que cela a du sens, quid des autres cultures ?) source = {label: -imbalance} target = {"soil stock": 1} flux_generator.generate_flux(source, target) # Si imbalance == 0, aucun ajustement nécessaire # Calcul de imbalance dans df_cultures df_cultures["Balance (ktN)"] = ( df_cultures["Synthetic Fertilizer Use (ktN)"] + df_cultures["Total Non Synthetic Fertilizer Use (ktN)"] - df_cultures["Nitrogen Production (ktN)"] - df_cultures["Volatilized Nitrogen N-NH3 (ktN)"] - df_cultures["Volatilized Nitrogen N-N2O (ktN)"] # Pas de volat sous forme de N2 ? ) # On équilibre Haber-Bosch avec atmospheric N2 pour le faire entrer dans le système target = {"Haber-Bosch": self.adjacency_matrix[label_to_index["Haber-Bosch"], :].sum()} source = {"atmospheric N2": 1} flux_generator.generate_flux(source, target) df_elevage["Conversion factor (%)"] = ( df_elevage["Edible Nitrogen (ktN)"] + df_elevage["Non Edible Nitrogen (ktN)"] ) / df_elevage["Ingestion (ktN)"] from grafs_e.prospective import scenario LU = scenario.livestock_LU(self.data_loader, self.region)[self.year] LU["equine"] = LU.pop("equines") df_elevage["LU"] = LU # On ajoute une ligne total à df_cultures et df_elevage colonnes_a_exclure = [ "Spreading Rate (%)", "Nitrogen Content (%)", "Seed input (kt seeds/kt Ymax)", "Category", "N fixation coef (kgN/kgN)", "Fertilization Need (kgN/qtl)", "Surface Fertilization Need (kgN/ha)", "Yield (qtl/ha)", "Yield (kgN/ha)", "Surface Non Synthetic Fertilizer Use (kgN/ha)", "Raw Surface Synthetic Fertilizer Use (ktN/ha)", ] colonnes_a_sommer = df_cultures.columns.difference(colonnes_a_exclure) total = df_cultures[colonnes_a_sommer].sum() total.name = "Total" self.df_cultures_display = pd.concat([df_cultures, total.to_frame().T]) colonnes_a_exclure = [ "% edible", "% excreted indoors", "% excreted indoors as manure", "% excreted indoors as slurry", "% excreted on grassland", "% non edible", "%N dairy", "N-N2 EM. manure indoor", "N-N2 EM. outdoor", "N-N2 EM. slurry indoor", "N-N2O EM. manure indoor", "N-N2O EM. outdoor", "N-N2O EM. slurry indoor", "N-NH3 EM. manure indoor", "N-NH3 EM. outdoor", "N-NH3 EM. slurry indoor", "Conversion factor (%)", ] colonnes_a_sommer = df_elevage.columns.difference(colonnes_a_exclure) total = df_elevage[colonnes_a_sommer].sum() total.name = "Total" self.df_elevage_display = pd.concat([df_elevage, total.to_frame().T]) self.df_cultures = df_cultures self.df_elevage = df_elevage
# self.adjacency_matrix = adjacency_matrix
[docs] def get_df_culture(self): """ Returns the DataFrame containing crop-related data. :return: A pandas DataFrame with crop data used in the nitrogen model. :rtype: pandas.DataFrame """ return self.df_cultures
[docs] def get_df_elevage(self): """ Returns the DataFrame containing livestock-related data. :return: A pandas DataFrame with livestock data used in the nitrogen model. :rtype: pandas.DataFrame """ return self.df_elevage
[docs] def get_transition_matrix(self): """ Returns the full nitrogen transition matrix. This matrix represents all nitrogen fluxes between sectors, including core and external processes. :return: A 2D NumPy array representing nitrogen fluxes between all sectors. :rtype: numpy.ndarray """ return self.adjacency_matrix
[docs] def get_core_matrix(self): """ Extracts and returns the core matrix of nitrogen fluxes between active sectors. This method filters out rows and columns with no flows and excludes external sectors. The result isolates the central dynamics of the system. :return: A 2D NumPy array of the filtered core matrix. :rtype: numpy.ndarray """ # Calcul de la taille du noyau core_size = len(self.adjacency_matrix) - len(self.ext) # Extraire la matrice principale (noyau) core_matrix = self.adjacency_matrix[:core_size, :core_size] # Calculer la somme des éléments sur chaque ligne row_sums = core_matrix.sum(axis=1) # Identifier les indices des lignes où la somme est non nulle non_zero_rows = row_sums != 0 # Identifier les indices des colonnes à garder (les mêmes indices que les lignes non nulles) non_zero_columns = non_zero_rows # Filtrer les lignes et les colonnes avec une somme non nulle core_matrix_filtered = core_matrix[non_zero_rows, :][:, non_zero_columns] # Retourner la matrice filtrée self.core_matrix = core_matrix_filtered self.non_zero_rows = non_zero_rows return core_matrix_filtered
[docs] def get_adjacency_matrix(self): """ Returns the binary adjacency matrix of nitrogen fluxes. This matrix has the same dimensions as the core matrix and indicates the presence (1) or absence (0) of nitrogen fluxes between sectors. :return: A binary adjacency matrix. :rtype: numpy.ndarray """ _ = self.get_core_matrix() return (self.core_matrix != 0).astype(int)
[docs] def extract_input_output_matrixs(self, clean=True): """ Extracts input and output matrices (C and B blocks) from the full transition matrix. These matrices represent the nitrogen flows between core and external sectors: - B: Outputs from core to external sectors. - C: Inputs from external to core sectors. If `clean` is True, the method removes rows and columns corresponding to inactive sectors. :param clean: Whether to remove zero-flow sectors from the matrices. :type clean: bool :return: A tuple (B, C) of NumPy arrays. :rtype: tuple[numpy.ndarray, numpy.ndarray] """ # Fonction pour extraire la matrice entrée (C) et la matrice sortie (B) de la matrice complète. # Taille de la matrice coeur core_size = len(self.adjacency_matrix) - len(self.ext) n = len(self.adjacency_matrix) # Extraire la sous-matrice B (bloc haut-droit) B = self.adjacency_matrix[:core_size, core_size:n] # Extraire la sous-matrice C (bloc bas-gauche) C = self.adjacency_matrix[core_size:n, :core_size] if clean: C = C[:][:, self.non_zero_rows] B = B[self.non_zero_rows, :][:] return B, C
[docs] def imported_nitrogen(self): """ Calculates the total amount of nitrogen imported into the system. Includes nitrogen in imported food, feed, and excess feed. :return: Total imported nitrogen (in ktN). :rtype: float """ return self.allocation_vege.loc[ self.allocation_vege["Type"].isin(["Imported Food", "Imported Feed", "Excess feed imports"]), "Allocated Nitrogen", ].sum()
[docs] def net_imported_plant(self): """ Computes the net nitrogen imports for plant sectors. Calculated as the difference between total nitrogen imports and plant sector availability after local uses (feed and food). :return: Net nitrogen import for plant-based products (in ktN). :rtype: float """ return ( self.importations_df["Imported Nitrogen (ktN)"].sum() - self.df_cultures["Available Nitrogen After Feed and Food (ktN)"].sum() )
[docs] def net_imported_animal(self): """ Returns the net nitrogen export for animal sectors. :return: Total nitrogen exported via animal products (in ktN). :rtype: float """ return self.df_elevage["Net animal nitrogen exports (ktN)"].sum()
[docs] def total_plant_production(self): """ Computes the total nitrogen production from all crop categories. :return: Total nitrogen produced by crops (in ktN). :rtype: float """ return self.df_cultures["Nitrogen Production (ktN)"].sum()
[docs] def stacked_plant_production(self): """ Returns the vector of nitrogen production by crop category. :return: A pandas Series of nitrogen production per crop. :rtype: pandas.Series """ return self.df_cultures["Nitrogen Production (ktN)"]
[docs] def cereals_production(self): """ Returns the nitrogen production from cereal crops. :return: Total nitrogen from cereals (in ktN). :rtype: float """ return self.df_cultures.loc[ self.df_cultures["Category"].isin(["cereals (excluding rice)", "rice"]), "Nitrogen Production (ktN)" ].sum()
[docs] def leguminous_production(self): """ Returns the nitrogen production from leguminous crops. :return: Total nitrogen from leguminous (in ktN). :rtype: float """ return self.df_cultures.loc[ self.df_cultures["Category"].isin(["leguminous"]), "Nitrogen Production (ktN)" ].sum()
[docs] def oleaginous_production(self): """ Returns the nitrogen production from oleaginous crops. :return: Total nitrogen from oleaginous (in ktN). :rtype: float """ return self.df_cultures.loc[ self.df_cultures["Category"].isin(["oleaginous"]), "Nitrogen Production (ktN)" ].sum()
[docs] def grassland_and_forages_production(self): """ Returns the nitrogen production from grassland and forages crops. :return: Total nitrogen from grassland and forages (in ktN). :rtype: float """ return self.df_cultures.loc[ self.df_cultures["Category"].isin(["temporary meadows", "natural meadows ", "forages"]), "Nitrogen Production (ktN)", ].sum()
[docs] def roots_production(self): """ Returns the nitrogen production from roots crops. :return: Total nitrogen from roots (in ktN). :rtype: float """ return self.df_cultures.loc[self.df_cultures["Category"].isin(["roots"]), "Nitrogen Production (ktN)"].sum()
[docs] def fruits_and_vegetable_production(self): """ Returns the nitrogen production from fruits and vegetables crops. :return: Total nitrogen from fruits and vegetables (in ktN). :rtype: float """ return self.df_cultures.loc[ self.df_cultures["Category"].isin(["fruits and vegetables"]), "Nitrogen Production (ktN)" ].sum()
[docs] def cereals_production_r(self): """ Returns the share of nitrogen production from cereals relative to total plant production. :return: Percentage of total plant nitrogen production from cereals. :rtype: float """ return ( self.df_cultures.loc[ self.df_cultures["Category"].isin(["cereals (excluding rice)", "rice"]), "Nitrogen Production (ktN)" ].sum() * 100 / self.total_plant_production() )
[docs] def leguminous_production_r(self): """ Returns the share of nitrogen production from leguminous relative to total plant production. :return: Percentage of total plant nitrogen production from leguminous. :rtype: float """ return ( self.df_cultures.loc[self.df_cultures["Category"].isin(["leguminous"]), "Nitrogen Production (ktN)"].sum() * 100 / self.total_plant_production() )
[docs] def oleaginous_production_r(self): """ Returns the share of nitrogen production from oleaginous relative to total plant production. :return: Percentage of total plant nitrogen production from oleaginous. :rtype: float """ return ( self.df_cultures.loc[self.df_cultures["Category"].isin(["oleaginous"]), "Nitrogen Production (ktN)"].sum() * 100 / self.total_plant_production() )
[docs] def grassland_and_forages_production_r(self): """ Returns the share of nitrogen production from forages relative to total plant production. :return: Percentage of total plant nitrogen production from forages. :rtype: float """ return ( self.df_cultures.loc[ self.df_cultures["Category"].isin(["temporary meadows", "natural meadows", "forages"]), "Nitrogen Production (ktN)", ].sum() * 100 / self.total_plant_production() )
[docs] def roots_production_r(self): """ Returns the share of nitrogen production from roots relative to total plant production. :return: Percentage of total plant nitrogen production from roots. :rtype: float """ return ( self.df_cultures.loc[self.df_cultures["Category"].isin(["roots"]), "Nitrogen Production (ktN)"].sum() * 100 / self.total_plant_production() )
[docs] def fruits_and_vegetable_production_r(self): """ Returns the share of nitrogen production from fruits and vegetables relative to total plant production. :return: Percentage of total plant nitrogen production from fruits and vegetables. :rtype: float """ return ( self.df_cultures.loc[ self.df_cultures["Category"].isin(["fruits and vegetables"]), "Nitrogen Production (ktN)" ].sum() * 100 / self.total_plant_production() )
[docs] def animal_production(self): """ Returns the total edible nitrogen produced by livestock sectors. :return: Total nitrogen in edible animal products (in ktN). :rtype: float """ return self.df_elevage["Edible Nitrogen (ktN)"].sum()
[docs] def emissions(self): """ Computes the total nitrogen emissions from the system. Includes N₂O emissions, atmospheric N₂ release, and NH₃ volatilization, with unit conversions. :return: A pandas Series with nitrogen emission quantities. :rtype: pandas.Series """ return pd.Series( { "N2O emission": np.round( self.adjacency_matrix[:, label_to_index["N2O emission"]].sum() * (14 * 2 + 16) / (14 * 2), 2 ), "atmospheric N2": np.round(self.adjacency_matrix[:, label_to_index["atmospheric N2"]].sum(), 2), "NH3 volatilization": np.round( self.adjacency_matrix[:, label_to_index["NH3 volatilization"]].sum() * 17 / 14, 2 ), }, name="Emission", ).to_frame()["Emission"]
[docs] def surfaces(self): """ Returns the cultivated area per crop. :return: A pandas Series with area per crop (in hectares). :rtype: pandas.Series """ return self.df_cultures["Area (ha)"]
[docs] def surfaces_tot(self): """ Returns the total cultivated area in the model. :return: Total area (in hectares). :rtype: float """ return self.df_cultures["Area (ha)"].sum()
[docs] def N_eff(self): return gr.GraphAnalyzer.calculate_Neff(self.adjacency_matrix)
[docs] def C_eff(self): return gr.GraphAnalyzer.calculate_Ceff(self.adjacency_matrix)
[docs] def F_eff(self): return gr.GraphAnalyzer.calculate_Feff(self.adjacency_matrix)
[docs] def R_eff(self): return gr.GraphAnalyzer.calculate_Reff(self.adjacency_matrix)
[docs] def Ftot(self, culture): area = self.df_cultures.loc[self.df_cultures.index == culture, "Area (ha)"].item() if area == 0: # Vérification pour éviter la division par zéro return 0 return self.adjacency_matrix[:, label_to_index[culture]].sum() * 1e6 / area
[docs] def Y(self, culture): """ Computes the nitrogen yield of a given crop. Yield is calculated as nitrogen production (kgN) per hectare for the specified crop. :param culture: The name of the crop (index of `df_cultures`). :type culture: str :return: Nitrogen yield in kgN/ha. :rtype: float """ area = self.df_cultures.loc[self.df_cultures.index == culture, "Area (ha)"].item() if area == 0: # Vérification pour éviter la division par zéro return 0 return self.df_cultures.loc[self.df_cultures.index == culture, "Nitrogen Production (ktN)"].item() * 1e6 / area
[docs] def tot_fert(self): """ Computes total nitrogen inputs to the system, broken down by origin. Categories include animal and human excretion, atmospheric deposition, Haber-Bosch inputs, leguminous enrichment, etc. :return: A pandas Series of nitrogen inputs by source (in ktN). :rtype: pandas.Series """ return pd.Series( { "Mining": self.adjacency_matrix[label_to_index["soil stock"], :].sum(), "Seeds": self.adjacency_matrix[label_to_index["other sectors"], :].sum(), "Human excretion": self.adjacency_matrix[ label_to_index["urban"] : label_to_index["rural"] + 1, label_to_index["Wheat"] : label_to_index["Natural meadow "] + 1, ].sum(), "Leguminous soil enrichment": self.adjacency_matrix[ label_to_index["Horse beans and faba beans"] : label_to_index["Alfalfa and clover"] + 1, label_to_index["Wheat"] : label_to_index["Natural meadow "] + 1, ].sum(), "Haber-Bosch": self.adjacency_matrix[label_to_index["Haber-Bosch"], :].sum(), "Atmospheric deposition": self.adjacency_matrix[ label_to_index["N2O emission"], : label_to_index["Natural meadow "] + 1 ].sum() + self.adjacency_matrix[ label_to_index["NH3 volatilization"], : label_to_index["Natural meadow "] + 1 ].sum(), "atmospheric N2": self.adjacency_matrix[ label_to_index["atmospheric N2"], label_to_index["Wheat"] : label_to_index["Natural meadow "] + 1 ].sum(), "Animal excretion": self.adjacency_matrix[ label_to_index["bovines"] : label_to_index["equine"] + 1, label_to_index["Wheat"] : label_to_index["Natural meadow "] + 1, ].sum(), } )
[docs] def rel_fert(self): """ Computes the relative share (%) of each nitrogen input source. :return: A pandas Series with nitrogen input sources as percentage of the total. :rtype: pandas.Series """ df = self.tot_fert() return df * 100 / df.sum()
[docs] def primXsec(self): """ Calculates the percentage of nitrogen from secondary sources (biological or recycled), compared to the total nitrogen inputs. Secondary sources include: human excretion, animal excretion, atmospheric inputs, seeds, and leguminous fixation. :return: Share of secondary sources in total nitrogen inputs (%). :rtype: float """ df = self.tot_fert() return ( ( df["Human excretion"].sum() + df["Animal excretion"].sum() + df["atmospheric N2"].sum() + df["Atmospheric deposition"].sum() + df["Seeds"].sum() + df["Leguminous soil enrichment"].sum() ) * 100 / df.sum() )
[docs] def NUE(self): """ Calculates the crop-level nitrogen use efficiency (NUE). Defined as the ratio of nitrogen produced by crops over total nitrogen inputs. :return: NUE of crop systems (%). :rtype: float """ df = self.tot_fert() return self.df_cultures["Nitrogen Production (ktN)"].sum() * 100 / df.sum()
[docs] def NUE_system(self): """ Calculates system-wide nitrogen use efficiency, including crop and livestock production. Accounts for feed losses and nitrogen consumed via imported feed. :return: System-wide NUE (%). :rtype: float """ N_NP = ( self.df_cultures["Nitrogen Production (ktN)"].sum() - self.df_cultures["Nitrogen For Feed (ktN)"].sum() + self.df_elevage["Edible Nitrogen (ktN)"].sum() + self.df_elevage["Non Edible Nitrogen (ktN)"].sum() ) df_fert = self.tot_fert() N_tot = ( df_fert["Haber-Bosch"] + df_fert["atmospheric N2"] + df_fert["Atmospheric deposition"] + self.df_elevage["Consummed Nitrogen from imported feed (ktN)"].sum() ) return N_NP / N_tot * 100
[docs] def NUE_system_2(self): """ Alternative NUE computation considering livestock conversion factors and feed inputs. Includes non-edible nitrogen outputs and imported feed consumption in the calculation. :return: Adjusted system-wide NUE (%). :rtype: float """ N_NP = ( self.df_cultures["Nitrogen Production (ktN)"].sum() + ( (self.df_elevage["Edible Nitrogen (ktN)"] + self.df_elevage["Non Edible Nitrogen (ktN)"]) * (1 - 1 / self.df_elevage["Conversion factor (%)"]) ).sum() + self.df_elevage["Consummed Nitrogen from imported feed (ktN)"].sum() ) df_fert = self.tot_fert() N_tot = ( df_fert["Haber-Bosch"] + df_fert["atmospheric N2"] + df_fert["Atmospheric deposition"] + self.df_elevage["Consummed Nitrogen from imported feed (ktN)"].sum() ) return N_NP / N_tot * 100
[docs] def N_self_sufficient(self): """ Estimates nitrogen self-sufficiency of the system. Defined as the share of atmospheric (biological) nitrogen inputs relative to all external nitrogen sources. :return: Self-sufficiency ratio (%). :rtype: float """ df_fert = self.tot_fert() return ( (df_fert["atmospheric N2"] + df_fert["Atmospheric deposition"]) * 100 / ( df_fert["atmospheric N2"] + df_fert["Atmospheric deposition"] + df_fert["Haber-Bosch"] + self.df_elevage["Consummed Nitrogen from imported feed (ktN)"].sum() ) )
[docs] def env_footprint(self): """ Calculates the land footprint (in ha) of nitrogen flows linked to: - local food and feed production, - imported food and feed, - imported and exported livestock nitrogen, - crop exports for feed and food. This is expressed as theoretical land areas mobilized by the nitrogen content. :return: A pandas Series of land footprint values (in ha), positive for imports and local use, negative for exports. :rtype: pandas.Series """ local_surface_food = ( self.df_cultures["Nitrogen For Food (ktN)"] / self.df_cultures["Nitrogen Production (ktN)"] * self.df_cultures["Area (ha)"] ).sum() local_surface_feed = ( self.df_cultures["Nitrogen For Feed (ktN)"] / self.df_cultures["Nitrogen Production (ktN)"] * self.df_cultures["Area (ha)"] ).sum() # Food import # 1. Filtrer les allocations "Imported food" alloc = self.allocation_vege.loc[ self.allocation_vege["Type"] == "Imported Food", ["Culture", "Allocated Nitrogen"] ] # 2. Grouper par culture au cas où il y aurait plusieurs lignes par culture alloc_grouped = alloc.groupby("Culture")["Allocated Nitrogen"].sum() # 3. Créer un DataFrame aligné avec les index de df_cultures alloc_df = pd.DataFrame(index=self.df_cultures.index) alloc_df["Allocated Nitrogen"] = alloc_grouped alloc_df["Allocated Nitrogen"] = alloc_df["Allocated Nitrogen"].fillna(0) # 4. Calcul final : ratio * surface # Récupère les valeurs de 'Nitrogen Production (ktN)' et de 'Allocated Nitrogen' nitrogen_production = self.df_cultures["Nitrogen Production (ktN)"] allocated_nitrogen = alloc_df["Allocated Nitrogen"] # Si 'Nitrogen Production (ktN)' est nul, se rabattre sur les valeurs du blé # En supposant que "Wheat" soit dans l'index de df_cultures, sinon adapte-le wheat_nitrogen_production = self.df_cultures.loc["Wheat", "Nitrogen Production (ktN)"] wheat_area = self.df_cultures.loc["Wheat", "Area (ha)"] # Remplacer les valeurs de 'Nitrogen Production (ktN)' par celles du blé si elles sont nulles adjusted_nitrogen_production = nitrogen_production.replace(0, wheat_nitrogen_production) # Utiliser la superficie du blé lorsque la production d'azote est nulle, sans modifier df_cultures adjusted_area = self.df_cultures["Area (ha)"].where(nitrogen_production != 0, wheat_area) # Calcul du total des importations alimentaires total_food_import = (allocated_nitrogen / adjusted_nitrogen_production * adjusted_area).sum() # Feed import # 1. Filtrer les allocations "Imported food" alloc = self.allocation_vege.loc[ self.allocation_vege["Type"] == "Imported Feed", ["Culture", "Allocated Nitrogen"] ] # 2. Grouper par culture au cas où il y aurait plusieurs lignes par culture alloc_grouped = alloc.groupby("Culture")["Allocated Nitrogen"].sum() # 3. Créer un DataFrame aligné avec les index de df_cultures alloc_df = pd.DataFrame(index=self.df_cultures.index) alloc_df["Allocated Nitrogen"] = alloc_grouped alloc_df["Allocated Nitrogen"] = alloc_df["Allocated Nitrogen"].fillna(0) # 4. Calcul final : ratio * surface # Récupère les valeurs de 'Nitrogen Production (ktN)' et de 'Allocated Nitrogen' nitrogen_production = self.df_cultures["Nitrogen Production (ktN)"] allocated_nitrogen = alloc_df["Allocated Nitrogen"] # Si 'Nitrogen Production (ktN)' est nul, se rabattre sur les valeurs du blé # En supposant que "Wheat" soit dans l'index de df_cultures, sinon adapte-le wheat_nitrogen_production = self.df_cultures.loc["Wheat", "Nitrogen Production (ktN)"] wheat_area = self.df_cultures.loc["Wheat", "Area (ha)"] # Remplacer les valeurs de 'Nitrogen Production (ktN)' par celles du blé si elles sont nulles adjusted_nitrogen_production = nitrogen_production.replace(0, wheat_nitrogen_production) # Utiliser la superficie du blé lorsque la production d'azote est nulle, sans modifier df_cultures adjusted_area = self.df_cultures["Area (ha)"].where(nitrogen_production != 0, wheat_area) # Calcul du total des importations alimentaires total_feed_import = (allocated_nitrogen / adjusted_nitrogen_production * adjusted_area).sum() ## Importation de viande # 1. Sélectionner les animaux importés elevage_importe = self.df_elevage[self.df_elevage["Net animal nitrogen exports (ktN)"] < 0] # 2. Pourcentage importé = |export net négatif| / production totale elevage_importe["fraction_importée"] = ( -elevage_importe["Net animal nitrogen exports (ktN)"] / elevage_importe["Edible Nitrogen (ktN)"] ) # 3. Créer un DataFrame vide pour accumuler les contributions par culture surface_par_culture = pd.Series(0.0, index=self.df_cultures.index) # 4. Parcourir chaque type d'élevage importé for animal in elevage_importe.index: if animal not in self.allocation_vege["Consumer"].values: continue # Si l'animal n'est pas dans les allocations, on passe # a. Calculer la part importée de l'azote pour cet élevage part_importee = elevage_importe.loc[animal, "fraction_importée"] if elevage_importe.loc[elevage_importe.index == animal, "fraction_importée"].item() == np.inf: # Calculer l'azote végétal nécessaire (N_feed=Azote importé/0.12) N_animal_imported = -elevage_importe.loc[animal, "Net animal nitrogen exports (ktN)"] N_feed_total_required = N_animal_imported / 0.12 # Aller chercher dans régimes[animal] les cultures consommées et en quelles proportion # Dans chaque catégories, prendre la première culture avec un rendement non nul ou nan de la liste (df_cultures["Nitrogen Production (ktN)"]/df_culture["Area (ha)"]) # En déduire la surface théoriquement utilisée pour le nourrir (surface_par_culture[culture] = N_feed/rendement) # Si aucune culture de la catégorie n'a de rendement, passer cette catégorie # Parcourir les catégories du régime de l'animal for proportion, culture_list in regimes[animal].items(): N_feed_for_category = N_feed_total_required * proportion # Chercher la *première* culture valide dans la liste de la catégorie for culture_name in culture_list: # Vérifier si la culture existe dans df_cultures if culture_name in self.df_cultures.index: prod = self.df_cultures.loc[culture_name, "Nitrogen Production (ktN)"] surface = self.df_cultures.loc[culture_name, "Area (ha)"] # Calculer le rendement si possible (surface > 0 et prod > 0) if surface > 0 and prod > 0: rendement = prod / surface # Calculer la surface nécessaire pour cette catégorie via cette culture surface_needed = N_feed_for_category / rendement # Ajouter à la surface totale pour cette culture surface_par_culture[culture_name] = ( surface_par_culture.get(culture_name, 0) + surface_needed ) break # On a trouvé la première culture valide, on passe à la catégorie suivante du régime # else: rendement invalide (0 ou NaN), on essaie la culture suivante dans la liste # else: la culture n'est pas dans df_cultures, on essaie la suivante dans la liste else: # b. Extraire les allocations d'azote de chaque culture pour cet élevage aliments = self.allocation_vege[self.allocation_vege["Consumer"] == animal] for _, row in aliments.iterrows(): culture = row["Culture"] azote = row["Allocated Nitrogen"] * part_importee # Quantité d'azote importée pour cette culture if culture not in self.df_cultures.index: culture = "Wheat" # Si la culture n'est pas dans df_cultures, on remplace par du blé # c. Récupérer la production d'azote et la surface de la culture prod = self.df_cultures.loc[culture, "Nitrogen Production (ktN)"] surface = self.df_cultures.loc[culture, "Area (ha)"] # if prod == 0: # culture = "Wheat" # prod = self.df_cultures.loc[culture, "Nitrogen Production (ktN)"] # surface = self.df_cultures.loc[culture, "Area (ha)"] # d. Calculer la surface nécessaire pour produire l'azote consommé if prod > 0: surface_equivalente = azote / prod * surface surface_par_culture[culture] += surface_equivalente import_animal = surface_par_culture.sum() ## Export animaux # 1. Sélectionner les animaux exportés (ceux qui ont un exportation nette positive) elevage_exporte = self.df_elevage[self.df_elevage["Net animal nitrogen exports (ktN)"] > 0] # 2. Calculer la fraction exportée pour chaque animal (en proportion de la production totale) elevage_exporte["fraction_exportée"] = ( elevage_exporte["Net animal nitrogen exports (ktN)"] / elevage_exporte["Edible Nitrogen (ktN)"] ) # 3. Créer un dictionnaire pour accumuler les résultats par culture surface_par_culture_exporte = pd.Series({culture: 0.0 for culture in self.df_cultures.index}) # 4. Parcourir chaque type d'élevage exporté for animal in elevage_exporte.index: if animal not in self.allocation_vege["Consumer"].values: continue # Si l'animal n'est pas dans les allocations, on passe # a. Calculer la part exportée de l'azote pour cet élevage part_exportee = elevage_exporte.loc[animal, "fraction_exportée"] # b. Extraire les allocations d'azote de chaque culture pour cet élevage aliments = self.allocation_vege[self.allocation_vege["Consumer"] == animal] for _, row in aliments.iterrows(): culture = row["Culture"] azote = row["Allocated Nitrogen"] * part_exportee # Quantité d'azote exportée pour cette culture if culture not in self.df_cultures.index: culture = "Wheat" # Si la culture n'est pas dans df_cultures, on remplace par du blé # c. Récupérer la production d'azote et la surface de la culture prod = self.df_cultures.loc[culture, "Nitrogen Production (ktN)"] surface = self.df_cultures.loc[culture, "Area (ha)"] # d. Calculer la surface nécessaire pour produire l'azote consommé if prod > 0: surface_equivalente = azote / prod * surface surface_par_culture_exporte[culture] += surface_equivalente export_animal = surface_par_culture_exporte.sum() # Export culture # Feed export_surface_feed = ( self.df_cultures["Nitrogen Exported For Feed (ktN)"] / self.df_cultures["Nitrogen Production (ktN)"] * self.df_cultures["Area (ha)"] ).sum() # Food export_surface_food = ( self.df_cultures["Available Nitrogen After Feed, Export Feed and Food (ktN)"] / self.df_cultures["Nitrogen Production (ktN)"] * self.df_cultures["Area (ha)"] ).sum() return pd.Series( { "Local Food": int(local_surface_food), "Local Feed": int(local_surface_feed), "Import Food": int(total_food_import), "Import Feed": int(total_feed_import), "Import Livestock": int(import_animal), "Export Livestock": -int(export_animal), "Export Feed": -int(export_surface_feed), "Export Food": -int(export_surface_food), } )
[docs] def net_footprint(self): """ Computes the net nitrogen land footprint of the region (in Mha). Aggregates all imports and exports to yield a net balance of nitrogen-dependent land use. :return: Net land footprint (in million hectares). :rtype: float """ df = self.env_footprint() df_total_import = df.loc[["Import Food", "Import Feed", "Import Livestock"]].sum(axis=0) df_total_export = df.loc[["Export Food", "Export Feed", "Export Livestock"]].sum(axis=0) net_import_export = df_total_import + df_total_export return np.round(net_import_export / 1e6, 2)
[docs] def LU_density(self): """ Calculates the livestock unit density over the agricultural area. :return: Livestock unit per hectare (LU/ha). :rtype: float """ return np.round(self.df_elevage["LU"].sum() / self.df_cultures["Area (ha)"].sum(), 2)
[docs] def NH3_vol(self): """ Returns the total NH₃ volatilization in the system. :return: NH₃ emissions in kt. :rtype: float """ return self.emissions()["NH3 volatilization"]
[docs] def N2O_em(self): """ Returns the total N₂O emissions in the system. :return: N₂O emissions in kt. :rtype: float """ return self.emissions()["N2O emission"]