Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 40 additions & 26 deletions climada/engine/impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import pandas as pd
import xlsxwriter
from deprecation import deprecated
from pandas.api.types import is_string_dtype
from pyproj import CRS as pyprojCRS
from rasterio.crs import CRS as rasterioCRS # pylint: disable=no-name-in-module
from scipy import sparse
Expand All @@ -50,7 +51,6 @@
import climada.util.dates_times as u_dt
import climada.util.interpolation as u_interp
import climada.util.plot as u_plot
from climada import CONFIG
from climada.entity import Exposures
from climada.util.constants import CMAP_IMPACT, DEF_CRS, DEF_FREQ_UNIT
from climada.util.select import get_attributes_with_matching_dimension
Expand Down Expand Up @@ -107,8 +107,8 @@ def __init__(
crs=DEF_CRS,
eai_exp=None,
at_event=None,
tot_value=0.,
aai_agg=0.,
tot_value=0.0,
aai_agg=0.0,
unit="",
imp_mat=None,
haz_type="",
Expand Down Expand Up @@ -216,8 +216,8 @@ def calc(
"The use of Impact().calc() is deprecated."
" Use ImpactCalc().impact() instead."
)
from climada.engine.impact_calc import (
ImpactCalc, # pylint: disable=import-outside-toplevel
from climada.engine.impact_calc import ( # pylint: disable=import-outside-toplevel
ImpactCalc,
)

impcalc = ImpactCalc(exposures, impact_funcs, hazard)
Expand Down Expand Up @@ -1191,6 +1191,8 @@ def write_csv(self, file_name):
file_name : str
absolute path of the file
"""
if not all((isinstance(val, str) for val in self.event_name)):
raise TypeError("'event_name' must be a list of strings")
LOGGER.info("Writing %s", file_name)
with open(file_name, "w", encoding="utf-8") as imp_file:
imp_wr = csv.writer(imp_file)
Expand Down Expand Up @@ -1239,6 +1241,8 @@ def write_excel(self, file_name):
file_name : str
absolute path of the file
"""
if not all((isinstance(val, str) for val in self.event_name)):
raise TypeError("'event_name' must be a list of strings")
LOGGER.info("Writing %s", file_name)

def write_col(i_col, imp_ws, xls_data):
Expand Down Expand Up @@ -1453,7 +1457,13 @@ def from_csv(cls, file_name):
imp.aai_agg = imp_df["aai_agg"][0]
imp.event_id = imp_df["event_id"][~np.isnan(imp_df["event_id"])].values
num_ev = imp.event_id.size
imp.event_name = imp_df["event_name"][:num_ev].values.tolist()
event_names = imp_df["event_name"][:num_ev]
if not is_string_dtype(event_names):
warnings.warn(
"Some event names are not str will be converted to str.", UserWarning
)
event_names = event_names.astype(str)
imp.event_name = event_names.values.tolist()
imp.date = imp_df["event_date"][:num_ev].values
imp.at_event = imp_df["at_event"][:num_ev].values
imp.frequency = imp_df["event_frequency"][:num_ev].values
Expand All @@ -1475,7 +1485,7 @@ def from_csv(cls, file_name):
def read_csv(self, *args, **kwargs):
"""This function is deprecated, use Impact.from_csv instead."""
LOGGER.warning(
"The use of Impact.read_csv is deprecated." "Use Impact.from_csv instead."
"The use of Impact.read_csv is deprecated. Use Impact.from_csv instead."
)
self.__dict__ = Impact.from_csv(*args, **kwargs).__dict__

Expand All @@ -1494,28 +1504,32 @@ def from_excel(cls, file_name):
Impact from excel file
"""
LOGGER.info("Reading %s", file_name)
dfr = pd.read_excel(file_name)
imp = cls(haz_type=str(dfr["haz_type"][0]))

imp.unit = dfr["unit"][0]
imp.tot_value = dfr["tot_value"][0]
imp.aai_agg = dfr["aai_agg"][0]
imp_df = pd.read_excel(file_name)
imp = cls(haz_type=str(imp_df["haz_type"][0]))

imp.event_id = dfr["event_id"][~np.isnan(dfr["event_id"].values)].values
imp.event_name = dfr["event_name"][: imp.event_id.size].values
imp.date = dfr["event_date"][: imp.event_id.size].values
imp.frequency = dfr["event_frequency"][: imp.event_id.size].values
imp.unit = imp_df["unit"][0]
imp.tot_value = imp_df["tot_value"][0]
imp.aai_agg = imp_df["aai_agg"][0]
imp.event_id = imp_df["event_id"][~np.isnan(imp_df["event_id"].values)].values
event_names = imp_df["event_name"][~np.isnan(imp_df["event_id"].values)]
if not is_string_dtype(event_names):
warnings.warn(
"Some event names are not str will be converted to str", UserWarning
)
event_names = event_names.astype(str)
imp.event_name = event_names.values
imp.date = imp_df["event_date"][: imp.event_id.size].values
imp.frequency = imp_df["event_frequency"][: imp.event_id.size].values
imp.frequency_unit = (
dfr["frequency_unit"][0] if "frequency_unit" in dfr else DEF_FREQ_UNIT
imp_df["frequency_unit"][0] if "frequency_unit" in imp_df else DEF_FREQ_UNIT
)
imp.at_event = dfr["at_event"][: imp.event_id.size].values

imp.eai_exp = dfr["eai_exp"][~np.isnan(dfr["eai_exp"].values)].values
imp.at_event = imp_df["at_event"][: imp.event_id.size].values
imp.eai_exp = imp_df["eai_exp"][~np.isnan(imp_df["eai_exp"].values)].values
imp.coord_exp = np.zeros((imp.eai_exp.size, 2))
imp.coord_exp[:, 0] = dfr["exp_lat"].values[: imp.eai_exp.size]
imp.coord_exp[:, 1] = dfr["exp_lon"].values[: imp.eai_exp.size]
imp.coord_exp[:, 0] = imp_df["exp_lat"].values[: imp.eai_exp.size]
imp.coord_exp[:, 1] = imp_df["exp_lon"].values[: imp.eai_exp.size]
try:
imp.crs = u_coord.to_csr_user_input(dfr["exp_crs"].values[0])
imp.crs = u_coord.to_csr_user_input(imp_df["exp_crs"].values[0])
except AttributeError:
imp.crs = DEF_CRS

Expand Down Expand Up @@ -1679,8 +1693,8 @@ def video_direct_impact(
-------
list of Impact
"""
from climada.engine.impact_calc import (
ImpactCalc, # pylint: disable=import-outside-toplevel
from climada.engine.impact_calc import ( # pylint: disable=import-outside-toplevel
ImpactCalc,
)

if args_exp is None:
Expand Down
2 changes: 1 addition & 1 deletion climada/engine/test/test_impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_from_eih_pass(self):
self.assertEqual(imp.frequency_unit, HAZ.frequency_unit)
self.assertEqual(imp.tot_value, tot_value)
np.testing.assert_array_almost_equal(imp.event_id, HAZ.event_id)
np.testing.assert_array_almost_equal(imp.event_name, HAZ.event_name)
np.testing.assert_array_equal(imp.event_name, HAZ.event_name)
np.testing.assert_array_almost_equal(imp.date, HAZ.date)
np.testing.assert_array_almost_equal(imp.frequency, HAZ.frequency)
np.testing.assert_array_almost_equal(imp.eai_exp, fake_eai_exp)
Expand Down
2 changes: 1 addition & 1 deletion climada/engine/test/test_impact_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def test_empty_impact(self):

def test_single_event_impact(self):
"""Check impact for single event"""
haz = HAZ.select([1])
haz = HAZ.select(event_id=[1])
icalc = ImpactCalc(ENT.exposures, ENT.impact_funcs, haz)
impact = icalc.impact()
aai_agg = 0.0
Expand Down
137 changes: 136 additions & 1 deletion climada/hazard/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import itertools
import logging
import pathlib
import warnings
from collections.abc import Collection
from typing import Any, Callable, Dict, Optional, Union

import h5py
Expand Down Expand Up @@ -177,6 +179,8 @@ def from_raster(
files_fraction = [files_fraction]
if not attrs:
attrs = {}
else:
attrs = cls._check_and_cast_attrs(attrs)
if not band:
band = [1]
if files_fraction is not None and len(files_intensity) != len(files_fraction):
Expand Down Expand Up @@ -889,10 +893,135 @@ def vshape(array):
**ident
)

hazard_kwargs = cls._check_and_cast_attrs(hazard_kwargs)

# Done!
LOGGER.debug("Hazard successfully loaded. Number of events: %i", num_events)
return cls(centroids=centroids, intensity=intensity_matrix, **hazard_kwargs)

@staticmethod
def _check_and_cast_attrs(attrs: Dict[str, Any]) -> Dict[str, Any]:
"""Check the validity of the hazard attributes given and cast to correct type if required and possible.

The current purpose is to check that event_name is a list of string
(and convert to string otherwise), although other checks and casting could be included here in the future.

Parameters
----------

attrs : dict
Attributes for a new Hazard object

Returns
-------

attrs : dict
Attributes checked for type validity and casted otherwise (only event_name at the moment).

Warns
-----

UserWarning
Warns the user if any value casting happens.
"""

def _check_and_cast_container(
    attr_value: Any, expected_container: type
) -> Any:
    """Check if the attribute is of the expected container type and cast if necessary.

    Parameters
    ----------
    attr_value : any
        The current value of the attribute.
    expected_container : type
        The expected type of the container. Only ``list`` and ``np.ndarray``
        are supported as cast targets.

    Returns
    -------
    attr_value : any
        The value itself if it already is an instance of
        ``expected_container``, otherwise the value cast to that type.

    Raises
    ------
    TypeError
        If a cast is required and ``expected_container`` is neither ``list``
        nor ``np.ndarray``.

    Warns
    -----
    UserWarning
        If the value is cast to the expected container type.
    """
    if isinstance(attr_value, expected_container):
        return attr_value

    # Reject unsupported targets before warning, so that no "Casting it."
    # message is emitted for a cast that never takes place.
    if expected_container is not list and expected_container is not np.ndarray:
        raise TypeError(f"Unsupported container type: {expected_container}")

    warnings.warn(
        f"Value should be of type {expected_container}. Casting it.",
        UserWarning,
    )
    if expected_container is list:
        # A lone string is one value, not a sequence of characters:
        # wrap it rather than letting list() split it apart.
        if isinstance(attr_value, str):
            return [attr_value]
        return list(attr_value)
    return np.array(attr_value)

def _check_and_cast_elements(
attr_value: Any, expected_dtype: Union[Any, None]
) -> Any:
"""Check if the elements of the container are of the expected dtype and cast if necessary,
while preserving the original container type.

Parameters
----------
attr_value : any
The current value of the attribute (a container).

expected_dtype : type or None
The expected type of the elements within the container. If None, no casting is done.

Returns
-------
attr_value : any
The value with elements cast to the expected type, preserving the original container type.
"""
if expected_dtype is None:
# No dtype enforcement required
return attr_value

container_type = type(attr_value) # Preserve the original container type

# Perform type checking and casting of elements
if isinstance(attr_value, (list, np.ndarray)):
if not all(isinstance(val, expected_dtype) for val in attr_value):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Good choice. In the case of attr_value being an np.ndarray, np.issubdtype(attr_value.dtype, expected_dtype) would be far faster, but it would make the code more complex, with nothing to be gained but a couple of seconds if the array is huge.

warnings.warn(
f"Not all values are of type {expected_dtype}. Casting values.",
UserWarning,
)
casted_values = [expected_dtype(val) for val in attr_value]
# Return the casted values in the same container type
if container_type is list:
return casted_values
elif container_type is np.ndarray:
return np.array(casted_values)
else:
raise TypeError(f"Unsupported container type: {container_type}")
else:
raise TypeError(
f"Expected a container (e.g., list or ndarray), got {type(attr_value)} instead."
)

return attr_value

## This should probably be defined as a CONSTANT?
attrs_to_check = {"event_name": (list, str), "event_id": (np.ndarray, None)}
Comment on lines +1007 to +1008
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agreed. CONSTANT dict seems sensible.


for attr_name, (expected_container, expected_dtype) in attrs_to_check.items():
attr_value = attrs.get(attr_name)

if attr_value is not None:
# Check and cast the container type
attr_value = _check_and_cast_container(attr_value, expected_container)

# Check and cast the element types (if applicable)
attr_value = _check_and_cast_elements(attr_value, expected_dtype)

# Update the attrs dictionary with the modified value
attrs[attr_name] = attr_value

return attrs

@staticmethod
def _attrs_to_kwargs(attrs: Dict[str, Any], num_events: int) -> Dict[str, Any]:
"""Transform attributes to init kwargs or use default values
Expand Down Expand Up @@ -986,7 +1115,9 @@ def from_excel(cls, file_name, var_names=None, haz_type=None):
centroids = Centroids._legacy_from_excel(
file_name, var_names=var_names["col_centroids"]
)
hazard_kwargs.update(cls._read_att_excel(file_name, var_names, centroids))
attrs = cls._read_att_excel(file_name, var_names, centroids)
attrs = cls._check_and_cast_attrs(attrs)
hazard_kwargs.update(attrs)
except KeyError as var_err:
raise KeyError("Variable not in Excel file: " + str(var_err)) from var_err

Expand Down Expand Up @@ -1071,6 +1202,9 @@ def write_hdf5(self, file_name, todense=False):
with h5py.File(file_name, "w") as hf_data:
str_dt = h5py.special_dtype(vlen=str)
for var_name, var_val in self.__dict__.items():
if var_name == "event_name":
if not all((isinstance(val, str) for val in var_val)):
raise TypeError("'event_name' must be a list of strings")
if var_name == "centroids":
# Centroids have their own write_hdf5 method,
# which is invoked at the end of this method (s.b.)
Expand Down Expand Up @@ -1172,6 +1306,7 @@ def from_hdf5(cls, file_name):
else:
hazard_kwargs[var_name] = hf_data.get(var_name)
hazard_kwargs["centroids"] = Centroids.from_hdf5(file_name)
hazard_kwargs = cls._check_and_cast_attrs(hazard_kwargs)
# Now create the actual object we want to return!
return cls(**hazard_kwargs)

Expand Down