Source code for rainfall_gridder.prepare_data.DataPreparer
from pathlib import Path
import polars as pl
import xarray as xr
from rainfall_gridder.prepare_data import data_combiner, data_formatting, metadata_preparer
from rainfall_gridder.utils import spatial_utils, xarray_utils
class DataPreparer:
"""
    Main data-preparation algorithm for the gridding workflow.
"""
def __init__(
self,
rainfall_data: pl.DataFrame,
rainfall_metadata: pl.DataFrame,
station_id_col: str,
station_name_col: str,
precipitation_col: str,
date_time_col: str,
start_date_col: str,
end_date_col: str,
easting_col: str,
northing_col: str,
gridded_rainfall_data: xr.Dataset,
gridded_rainfall_col: str,
rainfall_offset_hours: int,
output_dir: str | Path,
min_n_timesteps: int,
verbose: bool = False,
):
"""
        Main data preparer for the gridding workflow.

        Parameters
        ----------
        rainfall_data:
            Rain gauge time-series data
        rainfall_metadata:
            Details of the rain gauges (locations, names and record dates)
        station_id_col:
            Name of the station ID column in rainfall_data and rainfall_metadata
        station_name_col:
            Name of the station name column in rainfall_metadata
        precipitation_col:
            Name of the precipitation column in rainfall_data
        date_time_col:
            Name of the datetime column in rainfall_data
        start_date_col:
            Name of the record start-date column in rainfall_metadata
        end_date_col:
            Name of the record end-date column in rainfall_metadata
        easting_col:
            Name of the easting column in rainfall_metadata
        northing_col:
            Name of the northing column in rainfall_metadata
        gridded_rainfall_data:
            Gridded daily rainfall dataset; must contain "x", "y", "time" and the
            variable named by gridded_rainfall_col
        gridded_rainfall_col:
            Name of the rainfall variable in gridded_rainfall_data
        rainfall_offset_hours:
            First hour of the rainfall day (e.g. 9 if running from 9am to 8.59am)
        output_dir:
            Output directory for data files
        min_n_timesteps:
            Minimum number of non-null timesteps needed in rainfall_data for a
            station group to be kept
        verbose:
            Whether to print progress as the algorithm runs (default: False)
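
        Examples
        --------
        A minimal construction sketch; the dataframes, dataset and column names
        below are illustrative placeholders rather than required values::

            preparer = DataPreparer(
                rainfall_data=gauge_df,
                rainfall_metadata=gauge_metadata_df,
                station_id_col="station_id",
                station_name_col="station_name",
                precipitation_col="precip_mm",
                date_time_col="date_time",
                start_date_col="start_date",
                end_date_col="end_date",
                easting_col="easting",
                northing_col="northing",
                gridded_rainfall_data=gridded_ds,
                gridded_rainfall_col="rainfall",
                rainfall_offset_hours=9,
                output_dir="outputs",
                min_n_timesteps=365,
            )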
"""
self.station_id_col = station_id_col
self.station_name_col = station_name_col
self.precipitation_col = precipitation_col
self.date_time_col = date_time_col
self.start_date_col = start_date_col
self.end_date_col = end_date_col
self.easting_col = easting_col
self.northing_col = northing_col
self.rainfall_offset_hours = rainfall_offset_hours
        self.output_dir = Path(output_dir)  # accept str or Path; later "/" joins require a Path
self.min_n_timesteps = min_n_timesteps
self.gridded_rainfall_col = gridded_rainfall_col
self.verbose = verbose
# Prepare data inputs
self.rainfall_data = self._prepare_data(rainfall_data)
self.rainfall_metadata = self._prepare_metadata(rainfall_metadata)
self.gridded_rainfall_data = self._prepare_gridded_rainfall_data(gridded_rainfall_data)
        # Final outputs, populated by prepare_data_and_metadata_for_gridding()
self.prepared_data = None
self.prepared_metadata = None
    def _remove_duplicates_in_metadata(self, metadata: pl.DataFrame) -> pl.DataFrame:
        # TODO: unique() keeps an arbitrary row per station, so this can retain the wrong coordinates
        return metadata.unique(subset=[self.station_id_col])
    def _prepare_metadata(self, rainfall_metadata: pl.DataFrame) -> pl.DataFrame:
        rainfall_metadata = self._remove_duplicates_in_metadata(rainfall_metadata)
        rainfall_metadata = metadata_preparer.add_completeness_to_metadata(
            self.rainfall_data,
            rainfall_metadata,
            station_id_col=self.station_id_col,
            date_time_col=self.date_time_col,
        )
rainfall_metadata = data_formatting.group_metadata_by_station_locations(
rainfall_metadata, easting_col=self.easting_col, northing_col=self.northing_col
)
return data_formatting.add_blank_file_path_to_metadata(rainfall_metadata)
def _prepare_gridded_rainfall_data(self, gridded_rainfall_data: xr.Dataset) -> xr.Dataset:
for data_var in ["x", "y", "time", self.gridded_rainfall_col]:
assert data_var in gridded_rainfall_data, (
f"Expecting data variable: '{data_var}' in gridded rainfall data. "
"Please add this variable, or rename its equivalent."
)
gridded_rainfall_data = xarray_utils.replace_daily_time_step_hour_with_zero(
gridded_rainfall_data, time_col="time"
)
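        # Trim the grid to the spatial bounds of the gauge metadata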
gridded_rainfall_data = xarray_utils.subset_gridded_data_to_metadata_bounds(
gridded_rainfall_data, self.rainfall_metadata, self.easting_col, self.northing_col
)
return gridded_rainfall_data
def _prepare_data(self, data: pl.DataFrame) -> pl.DataFrame:
return data_formatting.set_negative_precip_values_to_none(data, precip_col=self.precipitation_col)
@classmethod
def run(
        cls, save_data: bool, return_data: bool, partition_by_columns: list[str] | None = None, **kwargs
) -> None | tuple[pl.DataFrame, pl.DataFrame]:
"""
Run the data preparer and return and/or save the prepared data.
Parameters
----------
save_data:
Whether to save data to output directory
return_data:
Whether to return dataframes
partition_by_columns:
List of columns to partition the parquet files by if saving outputs
Returns
-------
prepared_data:
Data run through algorithm
prepared_metadata:
Metadata of data run through algorithm
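
        Examples
        --------
        A hypothetical invocation; every keyword argument after
        partition_by_columns is forwarded to ``DataPreparer.__init__`` and the
        values shown are illustrative only::

            prepared_data, prepared_metadata = DataPreparer.run(
                save_data=True,
                return_data=True,
                rainfall_data=gauge_df,
                rainfall_metadata=gauge_metadata_df,
                # ... plus the remaining DataPreparer keyword arguments
            )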
"""
data_preparer = cls(**kwargs)
if data_preparer.verbose:
print("Preparing data for gridder")
data_preparer.prepare_data_and_metadata_for_gridding()
if save_data:
if data_preparer.verbose:
print(f"Saving data to {data_preparer.output_dir}")
data_preparer.save_prepared_data(partition_by_columns)
data_preparer.save_prepared_metadata()
else:
if data_preparer.verbose:
print("Data not saved")
if return_data:
return data_preparer.prepared_data, data_preparer.prepared_metadata
    def prepare_data_and_metadata_for_gridding(self) -> None:
        """
        Combine, filter and reshape the gauge data and metadata, storing the
        results on self.prepared_data and self.prepared_metadata.
        """
prepared_data_list = []
prepared_metadata_list = []
# Loop through each station id group (group may be multiple gauges if a site has a backup)
for station_group_id in self.rainfall_metadata["station_group_id"].unique():
metadata_one_group = self.rainfall_metadata.filter(pl.col("station_group_id") == station_group_id)
data_one_group = self.rainfall_data.filter(
pl.col(self.station_id_col).is_in(metadata_one_group[self.station_id_col].unique().to_list())
)
# Sort and drop duplicates
data_one_group = data_one_group.sort(self.date_time_col).unique()
if len(data_one_group[self.station_id_col].unique()) > 1:
print(f"Group ID: {station_group_id} has {len(data_one_group[self.station_id_col].unique())} members")
# create pivot of data
data_one_group_pivot = data_one_group.pivot(
values=self.precipitation_col, index=self.date_time_col, on=self.station_id_col
).sort(by=self.date_time_col)
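                # The pivot yields a wide table: the datetime column plus one precipitation column per gauge in the group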
                # if duplicates exist, merge the gauge segments
gauge_combiner = data_combiner.RainGaugeSegmentCombiner(
pivoted_gauge_data=data_one_group_pivot,
metadata=metadata_one_group,
station_id_col=self.station_id_col,
)
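                # Find the gridded cell co-located with this station group; it feeds the segment merge below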
nearest_daily_gridded_cell = spatial_utils.get_nearest_grid_cell(
self.gridded_rainfall_data,
easting=metadata_one_group[self.easting_col][0],
northing=metadata_one_group[self.northing_col][0],
)
combined_data = gauge_combiner.loop_through_and_merge_data(
nearest_daily_gridded_cell,
date_time_col=self.date_time_col,
rain_col=self.gridded_rainfall_col,
rainfall_offset_hours=self.rainfall_offset_hours,
)
station_name = gauge_combiner.combined_station_col_name
# unpivot data
data_one_group = combined_data.unpivot(
index=[self.date_time_col],
on=[station_name],
variable_name=self.station_id_col,
value_name=self.precipitation_col,
)
else:
station_name = metadata_one_group[self.station_id_col][0]
            # Keep this group's data only if it has at least min_n_timesteps non-null records
if len(data_one_group.drop_nulls()) >= self.min_n_timesteps:
if self.verbose:
print(f"Adding group ID: {station_group_id}")
output_file_name = str(
data_combiner.build_output_path(
base_dir=self.output_dir / "data", id_col_name=self.station_id_col, station_id=station_name
)
)
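                # Record where this group's data will be written so the metadata points at the right file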
metadata_one_group = metadata_one_group.with_columns(pl.lit(output_file_name).alias("file_path"))
data_one_group = data_one_group.select(
[self.date_time_col, self.precipitation_col, self.station_id_col]
)
prepared_data_list.append(data_one_group)
else:
if self.verbose:
print(f"{station_name} being ignored as not more than {self.min_n_timesteps} time steps.")
if len(metadata_one_group) > 1:
if self.verbose:
print(f"merging metadata of {station_name}")
metadata_merger = metadata_preparer.MetadataMerger(
metadata=metadata_one_group,
cols_to_check_identical=[self.easting_col, self.northing_col, "station_group_id", "file_path"],
cols_to_combine=[col for col in [self.station_id_col, self.station_name_col] if col],
start_date_col=self.start_date_col,
end_date_col=self.end_date_col,
)
merged_metadata = metadata_merger.merge_group_metadata(
group_name=station_name,
group_name_col=self.station_id_col,
min_datetime=data_one_group[self.date_time_col].min(),
max_datetime=data_one_group[self.date_time_col].max(),
)
prepared_metadata_list.append(merged_metadata.select(self.rainfall_metadata.columns))
else:
prepared_metadata_list.append(metadata_one_group.select(self.rainfall_metadata.columns))
self.prepared_data = pl.concat(prepared_data_list)
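        # diagonal_relaxed unions the column sets across groups and relaxes mismatched dtypes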
self.prepared_metadata = pl.concat(prepared_metadata_list, how="diagonal_relaxed")
    def save_prepared_data(self, partition_by_columns: list[str] | None = None) -> None:
"""
Save data that has been prepared for gridding.
Parameters
----------
partition_by_columns:
Columns that decide the partitioning of the output parquet file structure (default is station_id_col)
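
        Examples
        --------
        A hypothetical call after preparation; the partition column name is
        illustrative::

            preparer.prepare_data_and_metadata_for_gridding()
            preparer.save_prepared_data(partition_by_columns=["station_id"])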
"""
if partition_by_columns is None:
partition_by_columns = [self.station_id_col]
if self.prepared_data is None:
raise RuntimeError("You must call prepare_data_and_metadata_for_gridding() before save_prepared_data()")
        assert len(self.prepared_metadata.filter(pl.col("file_path").is_duplicated())) == 0, (
            "Problem with metadata: duplicate file paths found"
        )
# Save partitioned parquet file
        self.prepared_data.sort(self.date_time_col).write_parquet(
            self.output_dir / "data",
            partition_by=partition_by_columns,
        )
if self.verbose:
print(f"prepared gauge data available at: {self.output_dir / 'data/'}")
    def save_prepared_metadata(self) -> None:
        """
        Save metadata that has been prepared for gridding.
        """
if self.prepared_metadata is None:
raise RuntimeError("You must call prepare_data_and_metadata_for_gridding() before save_prepared_metadata()")
        assert len(self.prepared_metadata.filter(pl.col("file_path").is_duplicated())) == 0, (
            "Problem with metadata: duplicate file paths found"
        )
self.prepared_metadata.write_parquet(self.output_dir / "prepared_metadata.parquet")
if self.verbose:
print(f"prepared gauge metadata available at: {self.output_dir / 'prepared_metadata.parquet'}")