# CONUS404 Spatial Aggregation 
Using Xarray, GeoPandas and Sparse

**Goal:** Spatially aggregate a model data variable _conservatively_, i.e. by exactly partitioning each grid cell into the precise region boundaries.

**Approach:** 
- Represent both the original model grid and target grid as GeoPandas GeoSeries objects (with vector geometries)
- Compute their area overlay and turn it into a sparse matrix
- Perform matrix multiplication on the full Xarray dataset (with a time dimension)

It is quite fast and transparent.

In [None]:
%xmode minimal
import os
import xarray as xr
import geopandas as gp
import pandas as pd
import numpy as np
import sparse

import hvplot.pandas
import hvplot.xarray
import dask
import cf_xarray

from pynhd import NLDI, WaterData
import intake
import cartopy.crs as ccrs
from shapely.geometry import Polygon

## Open dataset from Intake Catalog
* Select `on-prem` dataset from /caldera if running on prem (Denali/Tallgrass)
* Select `cloud`/`osn` object store data if running elsewhere

In [None]:
# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)

In [None]:
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)

In [None]:
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-osn' 
cat[dataset]

## 2) Set Up AWS Credentials (Optional)

This notebook reads data from the OSN pod by default, which is object store data on a high speed internet connection that is free to access from any environment. If you change this notebook to use one of the CONUS404 datasets stored on S3 (options ending in `-cloud`), you will be pulling data from a `requester-pays` S3 bucket. This means you have to set up your AWS credentials, else we won't be able to load the data. Please note that reading the `-cloud` data from S3 may incur charges if you are reading data outside of the us-west-2 region or running the notebook outside of the cloud altogether. If you would like to access one of the `-cloud` options, uncomment and run the following code snippet to set up your AWS credentials. You can find more info about this AWS helper function [here](https://hytest-org.github.io/hytest/environment_set_up/Help_AWS_Credentials.html).

In [None]:
# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'default'
# %run ../environment_set_up/Help_AWS_Credentials.ipynb

## Parallelize with Dask 
Some of the steps we will take are aware of parallel clustered compute environments
using `dask`. We're going to start a cluster now so that future steps can take advantage
of this ability. 

This is an optional step, but speed ups data loading significantly, especially 
when accessing data from the cloud.

We have documentation on how to start a Dask Cluster in different computing environments [here](../environment_set_up/clusters.md).

In [None]:
%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb

In [None]:
ds = cat['conus404-daily-osn'].to_dask()

## Load the feature polygons (e.g. here catchment basins)

Load with geopandas:

In [None]:
%%time
# USGS gage 01482100 Delaware River at Del Mem Bridge at Wilmington De
gage_id = '01482100'
nldi = NLDI()
del_basins = nldi.get_basins(gage_id)
huc12_basins = WaterData('wbd12').bygeom(del_basins.geometry[0])

In [None]:
regions_df = huc12_basins
region_name = 'name'

Check it:

In [None]:
regions_df.plot(column=region_name, figsize=(10,4))

All geodataframes should have a coordinate reference system. This is important (and sometimes unfamiliar to users coming from the global climate world).


In [None]:
crs_orig = f'EPSG:{regions_df.crs.to_epsg()}'
crs_orig

## Aggregate CONUS404 to feature polygons

In [None]:
x = 'x'  # projected x coordinate name
y = 'y'  # projected y coordinate name

In [None]:
ds.crs

In [None]:
crs_info = ds.crs
xx = ds.x.values
yy = ds.y.values
globe = ccrs.Globe(ellipse='sphere', semimajor_axis=6370000, semiminor_axis=6370000)
lcc = ccrs.LambertConformal(globe=globe,
                            central_longitude=crs_info.longitude_of_central_meridian, 
                            central_latitude=crs_info.latitude_of_projection_origin,
                            standard_parallels=crs_info.standard_parallel)

In [None]:
lcc_wkt = lcc.to_wkt()

In [None]:
regions_df = regions_df.to_crs(lcc_wkt)

In [None]:
bbox = tuple(regions_df.total_bounds)
bbox

Subset gridded model results to bounds of spatial dataframe to save on memory and computation. More useful when the regions_df is much smaller than the footprint of the gridded model

In [None]:
ds = ds.sel(x=slice(bbox[0],bbox[2]), y=slice(bbox[1],bbox[3]))

Now we extract just the horizontal grid information.
The dataset has information about the lat and lon bounds of each cell, which we need to create the CONUS404 grid cell polygons.

In [None]:
var = 'T2'  # 2m Temp
var = 'PREC_ACC_NC' # precip

In [None]:
grid = ds[[var]].drop(['time', 'lon', 'lat', var]).reset_coords().load()
grid

Now we "stack" the data into a single 1D array. This is the first step towards transitioning to pandas.

In [None]:
grid = grid.cf.add_bounds([x, y])

In [None]:
points = grid.stack(point=(y,x))
points

This function creates geometries for a single pair of bounds.
It is not fast, but it is fast enough here.
Perhaps could be vectorized using pygeos...

In [None]:
def bounds_to_poly(x_bounds, y_bounds):
    return Polygon([
        (x_bounds[0], y_bounds[0]),
        (x_bounds[0], y_bounds[1]),
        (x_bounds[1], y_bounds[1]),
        (x_bounds[1], y_bounds[0])
    ])

We apply this function to each grid cell.

In [None]:
%%time
boxes = xr.apply_ufunc(
    bounds_to_poly,
    points.x_bounds,
    points.y_bounds,
    input_core_dims=[("bounds",),  ("bounds",)],
    output_dtypes=[np.dtype('O')],
    vectorize=True
)

Finally, we convert to a GeoDataframe, specifying the projected CRS

In [None]:
grid_df= gp.GeoDataFrame(
    data={"geometry": boxes.values, "y": boxes[y], "x": boxes[x]},
    index=boxes.indexes["point"],
    crs=lcc_wkt
)

We will now transform to an area preserving projection. This is imporant because we want to do area-weighted regridding. Here we use the [NSIDC EASE-Grid 2.0](https://nsidc.org/data/user-resources/help-center/guide-ease-grids) grid for the Northern Hemisphere. 

In [None]:
crs_area = "EPSG:6931"

regions_df = regions_df.to_crs(crs_area)
grid_df = grid_df.to_crs(crs_area)

grid_df.crs

### Key Step: Overlay the two geometries

This is the magic of geopandas; it can calculate the overlap between the original grid and the regions.
It is expensive because it has to compare 64800 grid boxes with 242 regions. 

In this dataframe, the `latitude` and `longitude` values are from the grid, while all the other columns are from the regions.

In [None]:
overlay = grid_df.overlay(regions_df, keep_geom_type=True)

This is essentially already a sparse matrix mapping one grid space to the other. How sparse?

In [None]:
sparsity = len(overlay) / (len(grid_df) * len(regions_df))
sparsity

Let's explore these overlays a little bit

We can verify that each basin's area is preserved in the overlay operation.

In [None]:
overlay.geometry.area.groupby(overlay[region_name]).sum().nlargest(10)/1e6  # km2

In [None]:
regions_df.geometry.area.groupby(regions_df[region_name]).sum().nlargest(10)

### Calculate the area fraction for each region

This is another key step. This transform tells us _how much of a country's total area comes from each of the grid cells._
This is accurate because we used an area-preserving CRS.

In [None]:
grid_cell_fraction = overlay.geometry.area.groupby(overlay[region_name]).transform(lambda x: x / x.sum())
grid_cell_fraction

We can verify that these all sum up to one.

In [None]:
grid_cell_fraction.groupby(overlay[region_name]).sum()

### Turn this into a sparse Xarray DataArray

The first step is making a MultiIndex

In [None]:
multi_index = overlay.set_index([y, x, region_name]).index
df_weights = pd.DataFrame({"weights": grid_cell_fraction.values}, index=multi_index)
df_weights

We can bring this directly into xarray as a 1D Dataset.

In [None]:
ds_weights = xr.Dataset(df_weights)

Now we unstack it into a sparse array.

In [None]:
weights_sparse = ds_weights.unstack(sparse=True, fill_value=0.).weights

Here we can clearly see that this is a sparse matrix, mapping the input space (lat, lon) to the output space (SOVEREIGNT).

### Perform Matrix Multiplication

To regrid the data, we just have to multiply the original precip dataset by this matrix.

In [None]:
#regridded = xr.dot(ds[var], weights_sparse)

In [None]:
#regridded = sparse.einsum('ij,jk->ik', ds[var].data, weights_sparse.data)

Unfortunately, that doesn't work out of the box, because sparse doesn't implement einsum (see https://github.com/pydata/sparse/issues/31).

In [None]:
# regridded[0].compute()  # fails

Sparse does implement matmul, so we can use that. But we have to do some reshaping to make it work with our data.

In [None]:
def apply_weights_matmul_sparse(weights, data):

    assert isinstance(weights, sparse.SparseArray)
    assert isinstance(data, np.ndarray)
    data = sparse.COO.from_numpy(data)
    data_shape = data.shape
    # k = nlat * nlon
    n, k = data_shape[0], data_shape[1] * data_shape[2]
    data = data.reshape((n, k))
    weights_shape = weights.shape
    k_, m = weights_shape[0] * weights_shape[1], weights_shape[2]
    assert k == k_
    weights_data = weights.reshape((k, m))

    regridded = sparse.matmul(data, weights_data)
    assert regridded.shape == (n, m)
    return regridded.todense()

### Variables at the same location on the grid can use the same weights

In [None]:
#var = 'T2'  # 2m Temp, grid cell centers
#var = 'PREC_ACC_NC' # precip, grid cell centers

In [None]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    var_regridded = xr.apply_ufunc(
        apply_weights_matmul_sparse,
        weights_sparse,
        ds[var],
        join="left",
        input_core_dims=[[y, x, region_name], [y, x]],
        output_core_dims=[[region_name]],
        dask="parallelized",
        dask_gufunc_kwargs=dict(meta=[np.ndarray((0,))])
    )
    

var_regridded.compute()

### Explore timeseries data by region:
Plot monthly-average time series for two selected HUC12s.

In [None]:
ds_var = var_regridded.sel(name=['Mallory Brook-West Branch Delaware River', 'Yaleville Brook-Susquehanna River']).resample(time="MS").mean().to_dataset(region_name)

In [None]:
ds_var.hvplot(x='time', grid=True, frame_width=1000)

### Explore the mean var by region
Calculate the average value over all time steps for every HU12

In [None]:
df_mean = var_regridded.to_pandas().mean()
df_mean.name = var
df_mean = pd.DataFrame(df_mean).reset_index()

In [None]:
merged = pd.merge(regions_df, df_mean)

Convert back to geographic coordinates for plotting

In [None]:
crs_geo = 'EPSG:4326'

In [None]:
merged_geo = merged.to_crs(crs_geo)

Holoviz:

In [None]:
merged_geo.hvplot(c=var, geo=True, cmap='viridis_r', frame_width=600, tiles='EsriTerrain', 
               title='CONUS404', alpha=0.7)

### Be a good citizen and shut down the cluster
... Even though it would shut down in 15 minutes with no activity

In [None]:
client.close(); cluster.shutdown()