from farmgym.v2.entity_api import Entity_API, Range, fillarray, checkissubclass, expglm
import numpy as np
from PIL import Image
[docs]class Soil(Entity_API):
def __init__(self, field, parameters):
Entity_API.__init__(self, field, parameters)
X = self.field.X
Y = self.field.Y
self.variables = {}
self.variables["available_N#g"] = fillarray(X, Y, (0, 10000), 0.0) # np.full((X,Y),fill_value=Range((0,10000),0.))
self.variables["available_P#g"] = fillarray(X, Y, (0, 10000), 0.0) # np.full((X,Y),fill_value=Range((0,10000),0.))
self.variables["available_K#g"] = fillarray(X, Y, (0, 10000), 0.0) # np.full((X,Y),fill_value=Range((0,10000),0.))
self.variables["available_C#g"] = fillarray(X, Y, (0, 10000), 0.0) # np.full((X,Y),fill_value=Range((0,10000),0.))
self.variables["available_Water#L"] = fillarray(
X, Y, (0, 10000), 0.0
) # np.full((X,Y),fill_value=Range((0,10000),0.))
self.variables["wet_surface#m2.day-1"] = fillarray(
X, Y, (0, 1000), 0.0
) # np.full((X,Y),fill_value=Range((0,1000),0.))
self.variables["microlife_health_index#%"] = fillarray(
X, Y, (0, 100), 0.0
) # np.full((X,Y),fill_value=Range((0,100),0.))
self.variables["amount_cide#g"] = {
"pollinators": fillarray(X, Y, (0, 10000), 0.0), # np.full((X,Y),fill_value=Range((0,10000),0.)),
"pests": fillarray(X, Y, (0, 10000), 0.0), # np.full((X,Y),fill_value=Range((0,10000),0.)),
"soil": fillarray(X, Y, (0, 10000), 0.0), # np.full((X,Y),fill_value=Range((0,10000),0.)),
"weeds": fillarray(X, Y, (0, 10000), 0.0), # np.full((X,Y),fill_value=Range((0,10000),0.))
}
#
self.variables["total_cumulated_added_water#L"] = Range((0, 100000), 0.0)
self.variables["total_cumulated_added_cide#g"] = {
"pollinators": Range((0, 100000), 0.0),
"pests": Range((0, 100000), 0.0),
"soil": Range((0, 100000), 0.0),
"weeds": Range((0, 100000), 0.0),
}
# Actions
self.actions = {
"water_discrete": {
"plot": field.plots,
"amount#L": list(np.linspace(1, 15, 15)),
"duration#min": [30, 60],
},
"water_continuous": {
"plot": field.plots,
"amount#L": (0.0, 20.0),
"duration#min": [30, 60],
},
}
# Dependencies
self.dependencies = {"Plant", "Weather"}
def get_parameter_keys(self):
return [
"max_water_capacity#L.m-3",
"depth#m",
"wilting_point#L.m-3",
"water_surface_absorption_speed#m2.day-1",
"bedrocks_release_N#mg.day-1",
"bedrocks_release_K#mg.day-1",
"bedrocks_release_P#mg.day-1",
"bedrocks_release_C#mg.day-1",
]
def reset(self):
# Generate a random soil
for x in range(self.field.X):
for y in range(self.field.Y):
self.variables["available_N#g"][x, y].set_value(self.np_random.random() * 100 * self.field.plotsurface)
self.variables["available_P#g"][x, y].set_value(self.np_random.random() * 100 * self.field.plotsurface)
self.variables["available_K#g"][x, y].set_value(self.np_random.random() * 100 * self.field.plotsurface)
self.variables["available_C#g"][x, y].set_value(self.np_random.random() * 100 * self.field.plotsurface)
self.variables["available_Water#L"][x, y].set_value(
(0.5 + 0.5 * self.np_random.random())
* self.parameters["max_water_capacity#L.m-3"]
* self.field.plotsurface
* self.parameters["depth#m"]
)
self.variables["wet_surface#m2.day-1"][x, y].set_value(0) # self.np_random.rand()*self.field.plotsurface
self.variables["microlife_health_index#%"][x, y].set_value(100)
self.variables["amount_cide#g"]["pollinators"][x, y].set_value(0)
self.variables["amount_cide#g"]["pests"][x, y].set_value(0)
self.variables["amount_cide#g"]["soil"][x, y].set_value(0)
self.variables["amount_cide#g"]["weeds"][x, y].set_value(0)
self.variables["total_cumulated_added_water#L"].set_value(0.0)
# self.variables["total_cumulated_added_pesticide#g"] = 0.
# self.variables["total_cumulated_added_herbicide#g"] = 0.
self.variables["total_cumulated_added_cide#g"]["pollinators"].set_value(0.0)
self.variables["total_cumulated_added_cide#g"]["pests"].set_value(0.0)
self.variables["total_cumulated_added_cide#g"]["soil"].set_value(0.0)
self.variables["total_cumulated_added_cide#g"]["weeds"].set_value(0.0)
self.initialize_variables(self.initial_conditions)
def update_variables(self, field, entities):
max_water_plot_capacity = (
self.parameters["max_water_capacity#L.m-3"] * self.field.plotsurface * self.parameters["depth#m"]
)
# plants = [entities[e] for e in entities if issubclass(entities[e].__class__,Plant)]
plants = [entities[e] for e in entities if checkissubclass(entities[e].__class__, "Plant")]
weather = [entities[e] for e in entities if checkissubclass(entities[e].__class__, "Weather")][0]
fertilizers = [entities[e] for e in entities if checkissubclass(entities[e].__class__, "Fertilizer")]
weeds = [entities[e] for e in entities if checkissubclass(entities[e].__class__, "Weeds")]
cides = [entities[e] for e in entities if checkissubclass(entities[e].__class__, "Cide")]
for x in range(self.field.X):
for y in range(self.field.Y):
water_surplus = 0
# Natural water input (rain)
if weather.variables["rain_amount"].value == "Light":
water_after_input = (
self.variables["available_Water#L"][x, y].value
+ 0.5 * self.np_random.random() * max_water_plot_capacity
)
self.variables["available_Water#L"][x, y].set_value(min(max_water_plot_capacity, water_after_input))
water_surplus = water_after_input - self.variables["available_Water#L"][x, y].value
self.variables["wet_surface#m2.day-1"][x, y].set_value(self.field.plotsurface)
elif weather.variables["rain_amount"].value == "Heavy":
water_after_input = self.variables["available_Water#L"][x, y].value + max_water_plot_capacity
self.variables["available_Water#L"][x, y].set_value(max_water_plot_capacity)
water_surplus = water_after_input - self.variables["available_Water#L"][x, y].value
self.variables["wet_surface#m2.day-1"][x, y].set_value(self.field.plotsurface)
# Natural nutrients input (earth)
for n in ["N", "K", "P", "C"]:
self.variables["available_" + n + "#g"][x, y].set_value(
self.variables["available_" + n + "#g"][x, y].value
+ self.variables["microlife_health_index#%"][x, y].value
/ 100.0
* self.parameters["bedrocks_release_" + n + "#mg.day-1"]
/ 1000.0
)
# Other nutrients input (fertilizers)
# print("FERTILIZERS",fertilizers)
for f in fertilizers:
# Q: Here, should we trigger update of f entity or simply compute amount? [f is updated later or earlier]
# Answer: Receiver always triggers action, Emitter never triggers it.
release = f.release_nutrients((x, y), self) # in kg
# print("Nutrients before",self.variables['available_N#g'][x,y].value, self.variables['microlife_health_index#%'][x,y].value)
for n in ["N", "K", "P", "C"]:
self.variables["available_" + n + "#g"][x, y].set_value(
self.variables["available_" + n + "#g"][x, y].value + release[n] * 1000
)
# print("Nutrients after",self.variables['available_N#g'][x,y].value)
for c in cides:
release = c.release((x, y)) # in kg
for n in ["pollinators", "pests", "soil", "weeds"]:
self.variables["amount_cide#g"][n][x, y].set_value(
self.variables["amount_cide#g"][n][x, y].value + release * 1000 * c.parameters[n]
)
# Weed nutrients and water consumption:
for w in weeds:
requirements = w.requirement((x, y))
release = w.release_nutrients((x, y), self) # in g
for n in ["N", "K", "P", "C"]:
self.variables["available_" + n + "#g"][x, y].set_value(
max(
0.0,
self.variables["available_" + n + "#g"][x, y].value
- requirements[n + "#g"]
+ release[n + "#g"],
)
)
self.variables["available_Water#L"][x, y].set_value(
max(
0.0,
self.variables["available_Water#L"][x, y].value - requirements["Water#L"],
)
)
# Plant nutrients consumption or release:
milife = self.variables["microlife_health_index#%"][x, y].value / 100.0
for p in plants:
requirements = p.requirement_nutrients((x, y))
release = p.release_nutrients((x, y), self) # in g
v = {}
stress = {}
for n in ["N", "K", "P", "C"]:
v[n + "#g"] = min(
self.variables["available_" + n + "#g"][x, y].value,
milife * requirements[n],
)
stress[n + "#g"] = requirements[n] - v[n + "#g"]
self.variables["available_" + n + "#g"][x, y].set_value(
self.variables["available_" + n + "#g"][x, y].value - v[n + "#g"] + release[n + "#g"]
)
p.receive_nutrients((x, y), v, stress)
# Plant water requirement
requirement_water = p.requirement_water((x, y), weather, field)
# print("PLANT WATER REQUIRES", requirement_water)
wp = self.parameters["wilting_point#L.m-3"] * self.parameters["depth#m"] * self.field.plotsurface
w = min(
requirement_water,
max(self.variables["available_Water#L"][x, y].value - wp, 0),
)
stress_water = requirement_water - w
self.variables["available_Water#L"][x, y].set_value(self.variables["available_Water#L"][x, y].value - w)
# print("SOIL ",requirement_water, w, stress_water)
p.receive_water((x, y), w, stress_water)
# Soil water evaporation (depend on shadows and what is not wet...)
soil_evaporated_water = self.ground_evaporation((x, y), weather, plants, weeds, field) / 1000
water_evaporation_threshold = (
self.parameters["max_water_capacity#L.m-3"]
* self.field.plotsurface
* (self.parameters["depth#m"] - self.parameters["total_evaporable_water#mm"] / 1000)
)
soil_evaporated_water = min(
soil_evaporated_water,
max(
self.variables["available_Water#L"][x, y].value - water_evaporation_threshold,
0,
),
)
# if (self.variables['available_Water#L'][x,y].value - soil_evaporated_water<water_evaporation_threshold):
# print("WATER EVAPORATED:", soil_evaporated_water, "available", self.variables['available_Water#L'][x, y].value)
self.variables["available_Water#L"][x, y].set_value(
max(
0,
self.variables["available_Water#L"][x, y].value - soil_evaporated_water,
)
)
self.variables["wet_surface#m2.day-1"][x, y].set_value(
max(
self.variables["wet_surface#m2.day-1"][x, y].value
- self.parameters["water_surface_absorption_speed#m2.day-1"],
0,
)
)
# Microlife health index:
# self.variables['microlife_health_index#%'][x,y].set_value( 100*np.exp(-(self.variables['amount_cide#g']['soil'][x,y].value ) ))
q = []
q.append(
(
1.0,
self.variables["amount_cide#g"]["soil"][x, y].value / 100,
0,
0,
)
)
q.append((0.5, water_surplus / max_water_plot_capacity, 0, 0))
p_stayalive = expglm(0.0, q)
# print("STAYALIVE",p_stayalive,self.variables['amount_cide#g']['soil'][x,y].value,water_surplus/max_water_plot_capacity)
# is_dead = (self.np_random.binomial(1, p_stayalive, 1)[0] == 0)
# if (is_dead):
# self.variables['microlife_health_index#%'][x, y].set_value(
# self.variables['microlife_health_index#%'][x, y].value * p_stayalive)
# else:
# self.variables['microlife_health_index#%'][x, y].set_value(min(100,
# self.variables['microlife_health_index#%'][x, y].value * (1+0.1*p_stayalive)))
# print("MICROLIFE before",self.variables['microlife_health_index#%'][x, y].value)
self.variables["microlife_health_index#%"][x, y].set_value(
(
(p_stayalive * (1 + 0.1 * p_stayalive) + (1 - p_stayalive) * (p_stayalive))
* self.variables["microlife_health_index#%"][x, y].value
)
)
# print("MICROLIFE after",self.variables['microlife_health_index#%'][x, y].value)
# Soil nutrients/water/pesticide/herbicide leakage due to rain.
rain_intensity = weather.variables["rain_intensity"].value
if rain_intensity > 0 or water_surplus > 0:
milife = self.variables["microlife_health_index#%"][x, y].value / 100.0
surf = self.field.plotsurface
for n in ["N", "K", "P", "C"]:
self.variables["available_" + n + "#g"][x, y].set_value(
max(
0,
self.variables["available_" + n + "#g"][x, y].value
- (rain_intensity * surf + water_surplus / max_water_plot_capacity) * (1 - milife),
)
)
for n in ["pollinators", "pests", "soil", "weeds"]:
self.variables["amount_cide#g"][n][x, y].set_value(
max(
0,
self.variables["amount_cide#g"][n][x, y].value
* np.exp(-(rain_intensity * surf + water_surplus / max_water_plot_capacity)),
)
)
# self.variables['microlife_health_index#%'][x, y].set_value(max(0,self.variables['microlife_health_index#%'][x,y].value - water_surplus/max_water_plot_capacity))
# print("PESTICIDE after leak", self.variables['amount_cide#g']['soil'][x, y].value)
# print("Nutrients after leaching",self.variables['available_N#g'][x,y].value)
def ground_evaporation(self, position, weather, plants, weeds, field):
ET_0 = weather.evapo_coefficient(field) # ml/m2
# Compute % of ground covered by shadow:
# print("Shadow-surfaces:")
# [print(p.compute_shadowsurface(position)) for p in plants]
plantshadow = np.sum([p.compute_shadowsurface(position) * p.parameters["shadow_coeff#%"] for p in plants])
weedshadow = np.sum([w.compute_shadowsurface(position) for w in weeds])
shadow_proportion = min((plantshadow + weedshadow) / self.field.plotsurface, 1.0)
wet_proportion = self.variables["wet_surface#m2.day-1"][position].value / self.field.plotsurface
evapo_prop = min(1.0 - shadow_proportion, wet_proportion)
# TODO: Multiply by coeff to stop/slow-down evaporation for clay/sand/loam.
drop_proportion = (
(1.1 - self.variables["microlife_health_index#%"][position].value / 100)
* self.field.plotsurface
* self.parameters["depth#m"]
* self.parameters["water_leakage_max#L.m-3.day-1"]
* 1000
)
return ET_0 * evapo_prop * self.field.plotsurface + drop_proportion
def act_on_variables(self, action_name, action_params):
self.assert_action(action_name, action_params)
if action_name == "water_discrete" or "water_continuous":
x, y = action_params["plot"]
max_water_plot_capacity = (
self.parameters["max_water_capacity#L.m-3"] * self.field.plotsurface * self.parameters["depth#m"]
)
water_after_input = self.variables["available_Water#L"][x, y].value + action_params["amount#L"]
new_value = min(max_water_plot_capacity, water_after_input)
water_surplus = water_after_input - new_value
self.variables["wet_surface#m2.day-1"][x, y].set_value(
self.variables["wet_surface#m2.day-1"][x, y].value
+ self.field.plotsurface * (action_params["duration#min"] / 60.0) / 24.0
) # watering for 30minutes over 24h.
self.variables["total_cumulated_added_water#L"].set_value(
self.variables["total_cumulated_added_water#L"].value
+ (new_value - self.variables["available_Water#L"][x, y].value)
)
self.variables["available_Water#L"][x, y].set_value(new_value)
if water_surplus > 0:
milife = self.variables["microlife_health_index#%"][x, y].value / 100.0
for n in ["N", "K", "P", "C"]:
self.variables["available_" + n + "#g"][x, y].set_value(
max(
0,
self.variables["available_" + n + "#g"][x, y].value - water_surplus * (1 - milife),
)
)
for n in ["pollinators", "pests", "soil", "weeds"]:
self.variables["amount_cide#g"][n][x, y].set_value(
max(
0,
self.variables["amount_cide#g"][n][x, y].value
* np.exp(-water_surplus / max_water_plot_capacity),
)
)
q = []
q.append((2.0, self.variables["amount_cide#g"]["soil"][x, y].value, 0, 0))
q.append((0.5, water_surplus / max_water_plot_capacity, 0, 0))
p_stayalive = expglm(0.0, q)
self.variables["microlife_health_index#%"][x, y].set_value(
(
(p_stayalive * (1 + 0.1 * p_stayalive) + (1 - p_stayalive) * (p_stayalive))
* self.variables["microlife_health_index#%"][x, y].value
)
)
def to_fieldimage(self):
im_width, im_height = 1216, 1216
image = Image.new(
"RGBA",
(im_width * self.field.X, im_height * self.field.Y),
(255, 255, 255, 0),
)
for x in range(self.field.X):
for y in range(self.field.Y):
# print("XY",x,y,self.variables['wet_surface#m2.day-1'][x,y].value)
if self.variables["wet_surface#m2.day-1"][x, y].value > 0.5 * self.field.plotsurface:
image.paste(self.images["wet"], (im_width * x, im_height * y))
else:
image.paste(self.images["dry"], (im_width * x, im_height * y))
return image