CONUS404 Spatial Aggregation#

Using Xarray, GeoPandas and Sparse

Goal: Spatially aggregate a model data variable conservatively, i.e., by exactly partitioning each grid cell among the region polygons it overlaps.

Approach:

  • Represent both the original model grid and target grid as GeoPandas GeoSeries objects (with vector geometries)

  • Compute their area overlay and turn it into a sparse matrix

  • Perform matrix multiplication on the full Xarray dataset (with a time dimension)

This approach is quite fast and transparent; the core idea is sketched below.
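Conceptually, the aggregation is just a weighted sum: each region's value is the area-fraction-weighted combination of the grid cells that overlap it. A schematic sketch of the idea (illustrative names only, not objects defined in this notebook):

# For a single time step:
#   aggregated[region] = sum over cells of area_fraction[cell, region] * value[cell]
# For all time steps at once, this is a matrix product:
#   aggregated = values @ weights    # (ntime, ncell) @ (ncell, nregion) -> (ntime, nregion)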

%xmode minimal
import os
import xarray as xr
import geopandas as gp
import pandas as pd
import numpy as np
import sparse

import hvplot.pandas
import hvplot.xarray
import dask
import cf_xarray

from pynhd import NLDI, WaterData
import intake
import cartopy.crs as ccrs
from shapely.geometry import Polygon
Exception reporting mode: Minimal

Open dataset from Intake Catalog#

  • Select on-prem dataset from /caldera if running on prem (Denali/Tallgrass)

  • Select cloud/osn object store data if running elsewhere

# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)
['conus404-catalog',
 'benchmarks-catalog',
 'conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'trends-and-drivers-catalog',
 'nhm-prms-v1.1-gridmet-format-testing-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-osn',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-osn',
 'nwm21-streamflow-cloud',
 'geofabric_v1_1-zip-osn',
 'geofabric_v1_1_POIs_v1_1-osn',
 'geofabric_v1_1_TBtoGFv1_POIs-osn',
 'geofabric_v1_1_nhru_v1_1-osn',
 'geofabric_v1_1_nhru_v1_1_simp-osn',
 'geofabric_v1_1_nsegment_v1_1-osn',
 'gages2_nndar-osn',
 'wbd-zip-osn',
 'huc12-geoparquet-osn',
 'huc12-gpkg-osn',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-osn',
 'pointsample-tutorial-sites-osn',
 'pointsample-tutorial-output-osn']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem-hw',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem-hw',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-diagnostic-osn',
 'conus404-daily-onprem-hw',
 'conus404-daily-cloud',
 'conus404-daily-osn',
 'conus404-monthly-onprem-hw',
 'conus404-monthly-cloud',
 'conus404-monthly-osn',
 'conus404-hourly-ba-onprem-hw',
 'conus404-hourly-ba-osn',
 'conus404-daily-ba-onprem',
 'conus404-daily-ba-osn',
 'conus404-pgw-hourly-onprem-hw',
 'conus404-pgw-hourly-osn',
 'conus404-pgw-daily-diagnostic-onprem-hw',
 'conus404-pgw-daily-diagnostic-osn']
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-osn' 
cat[dataset]
conus404-daily-osn:
  args:
    consolidated: true
    storage_options:
      anon: true
      client_kwargs:
        endpoint_url: https://usgs.osn.mghpcc.org/
      requester_pays: false
    urlpath: s3://hytest/conus404/conus404_daily.zarr
  description: "CONUS404 daily values for subset of model output variables derived\
    \ from hourly values. This data is stored on HyTEST\u2019s Open Storage Network\
    \ (OSN) pod. This data can be read with the S3 API and is free to work with in\
    \ any computing environment (there are no egress fees)."
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

Set Up AWS Credentials (Optional)#

This notebook reads data from the OSN pod by default, which is object-store data served over a high-speed internet connection and free to access from any environment. If you change this notebook to use one of the CONUS404 datasets stored on S3 (options ending in -cloud), you will be pulling data from a requester-pays S3 bucket. This means you must set up your AWS credentials, or the data will not load. Please note that reading the -cloud data from S3 may incur charges if you are reading data outside of the us-west-2 region or running the notebook outside of the cloud altogether. If you would like to access one of the -cloud options, uncomment and run the following code snippet to set up your AWS credentials. You can find more info about this AWS helper function here.

# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'default'
# %run ../environment_set_up/Help_AWS_Credentials.ipynb

Parallelize with Dask#

Some of the steps we will take can make use of a parallel Dask cluster. We will start a cluster now so that future steps can take advantage of it.

This is an optional step, but it speeds up data loading significantly, especially when accessing data from the cloud.

We have documentation on how to start a Dask Cluster in different computing environments here.

%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb
The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' 
The link to view the client dashboard is:
>  https://hytestnebari.dev-wma.chs.usgs.gov/gateway/clusters/dev.7ab65ed9c5f240b69f922d167603f16c/status
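If none of the helper notebooks above match your environment, a minimal local cluster can be started directly with dask.distributed (a sketch, assuming dask.distributed is installed; not part of the original workflow):

# from dask.distributed import Client, LocalCluster
# cluster = LocalCluster()   # start a cluster on the local machine
# client = Client(cluster)   # connect a client to it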
ds = cat[dataset].to_dask()
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  'dims': dict(self._ds.dims),

Load the feature polygons (here, HUC12 catchment basins)#

Load with geopandas:

%%time
# USGS gage 01482100 Delaware River at Del Mem Bridge at Wilmington De
gage_id = '01482100'
nldi = NLDI()
del_basins = nldi.get_basins(gage_id)
huc12_basins = WaterData('wbd12').bygeom(del_basins.geometry.iloc[0])
CPU times: user 1.67 s, sys: 82.3 ms, total: 1.76 s
Wall time: 3.25 s
regions_df = huc12_basins
region_name = 'name'

Check it:

regions_df.plot(column=region_name, figsize=(10,4))
<Axes: >
../_images/0cb6ed2382606189d3fb6866d341cbf2456004e5fc18066dc6022570aad747ca.png

All GeoDataFrames should have a coordinate reference system (CRS). This is important (and sometimes unfamiliar to users coming from the global climate world).

crs_orig = f'EPSG:{regions_df.crs.to_epsg()}'
crs_orig
'EPSG:4326'
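If a GeoDataFrame ever arrives without a CRS (regions_df.crs is None), it should be assigned explicitly before any reprojection. A sketch, assuming the source data are geographic WGS 84:

# regions_df = regions_df.set_crs('EPSG:4326')  # declares the CRS; does not reproject the geometries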

Aggregate CONUS404 to feature polygons#

x = 'x'  # projected x coordinate name
y = 'y'  # projected y coordinate name
ds.crs
<xarray.DataArray 'crs' ()> Size: 8B
[1 values with dtype=int64]
Attributes: (12/16)
    crs_wkt:                        PROJCRS["unknown",BASEGEOGCRS["unknown",D...
    false_easting:                  0.0
    false_northing:                 0.0
    geographic_crs_name:            unknown
    grid_mapping_name:              lambert_conformal_conic
    horizontal_datum_name:          unknown
    ...                             ...
    prime_meridian_name:            Greenwich
    projected_crs_name:             unknown
    reference_ellipsoid_name:       unknown
    semi_major_axis:                6370000.0
    semi_minor_axis:                6370000.0
    standard_parallel:              [30.0, 50.0]
crs_info = ds.crs
xx = ds.x.values
yy = ds.y.values
globe = ccrs.Globe(ellipse='sphere', semimajor_axis=6370000, semiminor_axis=6370000)
lcc = ccrs.LambertConformal(globe=globe,
                            central_longitude=crs_info.longitude_of_central_meridian, 
                            central_latitude=crs_info.latitude_of_projection_origin,
                            standard_parallels=crs_info.standard_parallel)
lcc_wkt = lcc.to_wkt()
regions_df = regions_df.to_crs(lcc_wkt)
bbox = tuple(regions_df.total_bounds)
bbox
(1751263.2763795916, 214981.84499807426, 1964829.8118029477, 619548.96593117)

Subset the gridded model results to the bounds of the spatial dataframe to save memory and computation. This is most useful when regions_df covers a much smaller area than the footprint of the gridded model.

ds = ds.sel(x=slice(bbox[0],bbox[2]), y=slice(bbox[1],bbox[3]))

Now we extract just the horizontal grid information (the projected x and y coordinates), from which we will compute the cell bounds needed to create the CONUS404 grid-cell polygons.

# var = 'T2'          # 2 m temperature (alternative variable)
var = 'PREC_ACC_NC'   # precipitation
grid = ds[[var]].drop_vars(['time', 'lon', 'lat', var]).reset_coords().load()
grid
<xarray.Dataset> Size: 1kB
Dimensions:  (x: 54, y: 101)
Coordinates:
  * x        (x) float64 432B 1.752e+06 1.756e+06 ... 1.96e+06 1.964e+06
  * y        (y) float64 808B 2.16e+05 2.2e+05 2.24e+05 ... 6.12e+05 6.16e+05
Data variables:
    *empty*
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

Now we add cell bounds with cf_xarray and “stack” the grid into a single 1D array of points. This is the first step towards transitioning to pandas.

grid = grid.cf.add_bounds([x, y])
points = grid.stack(point=(y,x))
points
<xarray.Dataset> Size: 305kB
Dimensions:   (bounds: 2, point: 5454)
Coordinates:
    x_bounds  (bounds, point) float64 87kB 1.75e+06 1.754e+06 ... 1.966e+06
    y_bounds  (bounds, point) float64 87kB 2.14e+05 2.14e+05 ... 6.18e+05
  * point     (point) object 44kB MultiIndex
  * y         (point) float64 44kB 2.16e+05 2.16e+05 ... 6.16e+05 6.16e+05
  * x         (point) float64 44kB 1.752e+06 1.756e+06 ... 1.96e+06 1.964e+06
Dimensions without coordinates: bounds
Data variables:
    *empty*
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

This function creates the polygon for a single pair of cell bounds. It is not fast, but it is fast enough here. Perhaps it could be vectorized using pygeos (now part of Shapely 2.0); one possible approach is sketched below.

def bounds_to_poly(x_bounds, y_bounds):
    return Polygon([
        (x_bounds[0], y_bounds[0]),
        (x_bounds[0], y_bounds[1]),
        (x_bounds[1], y_bounds[1]),
        (x_bounds[1], y_bounds[0])
    ])

We apply this function to each grid cell.

%%time
boxes = xr.apply_ufunc(
    bounds_to_poly,
    points.x_bounds,
    points.y_bounds,
    input_core_dims=[("bounds",),  ("bounds",)],
    output_dtypes=[np.dtype('O')],
    vectorize=True
)
CPU times: user 140 ms, sys: 0 ns, total: 140 ms
Wall time: 139 ms
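As an aside, Shapely 2.0 (which absorbed pygeos) can build all of the cell polygons at once without a Python-level loop. A sketch, assuming Shapely >= 2.0 is installed (not used in the rest of this notebook):

import shapely  # requires Shapely >= 2.0 for vectorized geometry creation

xb = points.x_bounds.values  # shape (2, npoints): the two x edges of each cell
yb = points.y_bounds.values  # shape (2, npoints): the two y edges of each cell
# shapely.box accepts arrays and returns an array of Polygon objects
boxes_vec = shapely.box(xb.min(axis=0), yb.min(axis=0), xb.max(axis=0), yb.max(axis=0))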

Finally, we convert to a GeoDataFrame, specifying the projected CRS.

grid_df = gp.GeoDataFrame(
    data={"geometry": boxes.values, "y": boxes[y], "x": boxes[x]},
    index=boxes.indexes["point"],
    crs=lcc_wkt
)

We will now transform to an area-preserving projection. This is important because we want to do area-weighted regridding. Here we use the NSIDC EASE-Grid 2.0 projection for the Northern Hemisphere (EPSG:6931).

crs_area = "EPSG:6931"

regions_df = regions_df.to_crs(crs_area)
grid_df = grid_df.to_crs(crs_area)

grid_df.crs
<Projected CRS: EPSG:6931>
Name: WGS 84 / NSIDC EASE-Grid 2.0 North
Axis Info [cartesian]:
- X[south]: Easting (metre)
- Y[south]: Northing (metre)
Area of Use:
- name: Northern hemisphere.
- bounds: (-180.0, 0.0, 180.0, 90.0)
Coordinate Operation:
- name: US NSIDC EASE-Grid 2.0 North
- method: Lambert Azimuthal Equal Area
Datum: World Geodetic System 1984 ensemble
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich

Key Step: Overlay the two geometries#

This is the magic of GeoPandas; it can calculate the overlap between the original grid and the regions. It is expensive because it has to compare each of the 5,454 grid-cell polygons with each of the 460 HUC12 basin polygons.

In this dataframe, the x and y values are from the grid, while all the other columns are from the regions.

overlay = grid_df.overlay(regions_df, keep_geom_type=True)

This is essentially already a sparse matrix mapping one grid space to the other. How sparse?

sparsity = len(overlay) / (len(grid_df) * len(regions_df))
sparsity
0.002482422155259004

Let’s explore these overlays a little bit

We can verify that each basin’s area is preserved in the overlay operation.

overlay.geometry.area.groupby(overlay[region_name]).sum().nlargest(10)/1e6  # km2
name
Delaware River-Delaware Bay               1073.499824
Ashokan Reservoir-Esopus Creek             205.804534
Black Creek                                204.864521
Trout Creek                                186.163950
Lake Wallenpaupack-Wallenpaupack Creek     179.034924
Bear Creek                                 176.716685
Upper Mahanoy Creek                        170.467855
Headwaters Paulins Kill                    166.996307
Snitz Creek-Quittapahilla Creek            159.771219
Stony Creek-Lehigh River                   159.503079
dtype: float64
regions_df.geometry.area.groupby(regions_df[region_name]).sum().nlargest(10)
name
Delaware River-Delaware Bay               1.073500e+09
Ashokan Reservoir-Esopus Creek            2.058045e+08
Black Creek                               2.048645e+08
Trout Creek                               1.861640e+08
Lake Wallenpaupack-Wallenpaupack Creek    1.790349e+08
Bear Creek                                1.767167e+08
Upper Mahanoy Creek                       1.704679e+08
Headwaters Paulins Kill                   1.669963e+08
Snitz Creek-Quittapahilla Creek           1.597712e+08
Stony Creek-Lehigh River                  1.595031e+08
dtype: float64
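The same comparison can be made programmatically by taking the ratio of the overlaid area to the original basin area for each name (a sketch; values near 1 indicate the overlay preserved each basin's area):

overlay_area = overlay.geometry.area.groupby(overlay[region_name]).sum()
region_area = regions_df.geometry.area.groupby(regions_df[region_name]).sum()
(overlay_area / region_area).describe()  # basins with no grid overlap would appear as NaN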

Calculate the area fraction for each region#

This is another key step. This transform tells us what fraction of each basin’s total area comes from each of the grid cells. This is accurate because we used an area-preserving CRS.

grid_cell_fraction = overlay.geometry.area.groupby(overlay[region_name]).transform(lambda x: x / x.sum())
grid_cell_fraction
0       0.006247
1       0.006877
2       0.000807
3       0.004617
4       0.014839
          ...   
6223    0.003519
6224    0.012164
6225    0.186799
6226    0.183510
6227    0.091903
Length: 6228, dtype: float64

We can verify that these all sum up to one.

grid_cell_fraction.groupby(overlay[region_name]).sum()
name
Alexauken Creek                      1.0
Allegheny Creek-Delaware River       1.0
Angelica Creek-Schuylkill River      1.0
Antietam Creek                       1.0
Appenzell Creek                      1.0
                                    ... 
Woodland Creek-Esopus Creek          1.0
Wrangle Brook                        1.0
Wright Creek-Lehigh River            1.0
Wyomissing Creek                     1.0
Yaleville Brook-Susquehanna River    1.0
Length: 451, dtype: float64
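An equivalent programmatic check (a sketch; it should hold by construction, since each group was normalized by its own sum):

# every per-basin sum of fractions should be numerically 1
assert np.allclose(grid_cell_fraction.groupby(overlay[region_name]).sum(), 1.0)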

Turn this into a sparse Xarray DataArray#

The first step is making a MultiIndex.

multi_index = overlay.set_index([y, x, region_name]).index
df_weights = pd.DataFrame({"weights": grid_cell_fraction.values}, index=multi_index)
df_weights
weights
y x name
216000.0 1920000.0 Delaware River-Delaware Bay 0.006247
1924000.0 Delaware River-Delaware Bay 0.006877
220000.0 1912000.0 Delaware River-Delaware Bay 0.000807
1916000.0 Delaware River-Delaware Bay 0.004617
1920000.0 Delaware River-Delaware Bay 0.014839
... ... ... ...
616000.0 1860000.0 Mine Kill 0.003519
1864000.0 Middle Brook 0.012164
Mine Kill 0.186799
1868000.0 Mine Kill 0.183510
1872000.0 Mine Kill 0.091903

6228 rows × 1 columns

We can bring this directly into xarray as a 1D Dataset.

ds_weights = xr.Dataset(df_weights)

Now we unstack it into a sparse array.

weights_sparse = ds_weights.unstack(sparse=True, fill_value=0.).weights

Here we can clearly see that this is a sparse matrix, mapping the input space (y, x) to the output space (name).
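One quick way to confirm this (a sketch, assuming the backend is a sparse.COO array, as produced by unstack(sparse=True)) is to inspect the underlying array:

weights_sparse.data.density  # fraction of stored non-zero entries
weights_sparse.data.nnz      # number of non-zero entries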

Perform Matrix Multiplication#

To regrid the data, we just have to multiply the original precip dataset by this matrix.

#regridded = xr.dot(ds[var], weights_sparse)
#regridded = sparse.einsum('ij,jk->ik', ds[var].data, weights_sparse.data)

Unfortunately, that doesn’t work out of the box, because sparse doesn’t implement einsum (see https://github.com/pydata/sparse/issues/31).

# regridded[0].compute()  # fails

Sparse does implement matmul, so we can use that. But we have to do some reshaping to make it work with our data.

def apply_weights_matmul_sparse(weights, data):

    assert isinstance(weights, sparse.SparseArray)
    assert isinstance(data, np.ndarray)
    data = sparse.COO.from_numpy(data)
    data_shape = data.shape
    # k = nlat * nlon
    n, k = data_shape[0], data_shape[1] * data_shape[2]
    data = data.reshape((n, k))
    weights_shape = weights.shape
    k_, m = weights_shape[0] * weights_shape[1], weights_shape[2]
    assert k == k_
    weights_data = weights.reshape((k, m))

    regridded = sparse.matmul(data, weights_data)
    assert regridded.shape == (n, m)
    return regridded.todense()

Variables at the same location on the grid can use the same weights#

#var = 'T2'  # 2m Temp, grid cell centers
#var = 'PREC_ACC_NC' # precip, grid cell centers
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    var_regridded = xr.apply_ufunc(
        apply_weights_matmul_sparse,
        weights_sparse,
        ds[var],
        join="left",
        input_core_dims=[[y, x, region_name], [y, x]],
        output_core_dims=[[region_name]],
        dask="parallelized",
        dask_gufunc_kwargs=dict(meta=[np.ndarray((0,))])
    )
    

var_regridded.compute()
CPU times: user 2.8 s, sys: 221 ms, total: 3.02 s
Wall time: 1min 37s
<xarray.DataArray (time: 15707, name: 451)> Size: 57MB
array([[4.23796069e+01, 2.88045196e+01, 1.00145840e+01, ...,
        1.92430777e+00, 6.47324551e+00, 2.08002196e+00],
       [1.90177290e-02, 1.21817353e-01, 7.74821945e-01, ...,
        1.82437763e+00, 2.55103870e+00, 1.39471052e-01],
       [3.62181539e+01, 3.68524476e+01, 1.23457911e+01, ...,
        4.18607148e+01, 4.03074628e+00, 1.61067397e+01],
       ...,
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
        3.73734659e-06, 0.00000000e+00, 5.24199928e-06],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
        0.00000000e+00, 0.00000000e+00, 1.77499443e-03],
       [           nan,            nan,            nan, ...,
                   nan,            nan,            nan]])
Coordinates:
  * name     (name) object 4kB 'Alexauken Creek' ... 'Yaleville Brook-Susqueh...
  * time     (time) datetime64[ns] 126kB 1979-10-01 1979-10-02 ... 2022-10-01
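Because the weights depend only on the grid geometry, they can be reused for any other variable located at the grid-cell centers, e.g. T2. A sketch (not executed here):

# t2_regridded = xr.apply_ufunc(
#     apply_weights_matmul_sparse, weights_sparse, ds['T2'],
#     join="left",
#     input_core_dims=[[y, x, region_name], [y, x]],
#     output_core_dims=[[region_name]],
#     dask="parallelized",
#     dask_gufunc_kwargs=dict(meta=[np.ndarray((0,))]),
# )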

Explore time series data by region#

Plot monthly-average time series for two selected HUC12s.

ds_var = var_regridded.sel(name=['Mallory Brook-West Branch Delaware River', 'Yaleville Brook-Susquehanna River']).resample(time="MS").mean().to_dataset(region_name)
ds_var.hvplot(x='time', grid=True, frame_width=1000)

Explore the time-averaged variable by region#

Calculate the average value over all time steps for every HUC12.

df_mean = var_regridded.to_pandas().mean()
df_mean.name = var
df_mean = pd.DataFrame(df_mean).reset_index()
merged = pd.merge(regions_df, df_mean)

Convert back to geographic coordinates for plotting:

crs_geo = 'EPSG:4326'
merged_geo = merged.to_crs(crs_geo)

Holoviz:

merged_geo.hvplot(c=var, geo=True, cmap='viridis_r', frame_width=600, tiles='EsriTerrain', 
               title='CONUS404', alpha=0.7)

Be a good citizen and shut down the cluster#

… even though it would shut down automatically after 15 minutes of inactivity.

client.close(); cluster.shutdown()