Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Pangeo CONUS404 Spatial Aggregation over DRB-extent HUC12s

In this notebook, we will be showing how to aggregate gridded data to polygons. The method aggregates gridded data conservatively, i.e. by exactly partitioning each grid cell into the precise region boundaries. The method makes use of two key packages xarray and geopandas. Our implementation is based off of this Pangeo Discourse, which we have updated using more streamlined methods.

The overall approach consists of:

  • Represent both the original gridded data and target polygons as geopandas.GeoSeries objects (with vector geometries).

  • Compute their area overlay and turn it into a sparse matrix of cell weights.

  • Perform weighted aggregation using xarray.Dataset.weighted along the spatial dimensions.

It is quite fast and transparent.

The spatial polygons used in this notebook come from the NHDPlusV2 snapshot of the Watershed Boundary Dataset HUC12 boundaries provided through the PyGeoHydro python package.

We use the HyTest intake catalog to access CONUS404 from the OSN pod. This notebook provides a relatively simple and efficient workflow that can be easily run on a local computer.

%xmode minimal
import os
# Needed when boto3 >= 1.36.0 or the rechunking process will fail
# This needs to be set before the boto3 library gets loaded
# See: https://github.com/aws/aws-cli/issues/9214#issuecomment-2606619168
os.environ['AWS_REQUEST_CHECKSUM_CALCULATION'] = 'when_required'
os.environ['AWS_RESPONSE_CHECKSUM_VALIDATION'] = 'when_required'
import xarray as xr
import geopandas as gp
import pandas as pd
import numpy as np
import sparse

import hvplot.pandas
import hvplot.xarray
import dask
import cf_xarray

from pynhd import NLDI, WaterData
from pygeohydro import watershed
import pystac
from packaging.version import Version
import zarr
import cartopy.crs as ccrs
from shapely.geometry import Polygon

Open Dataset from Intake Catalog

First, let’s begin by loading the CONUS404 daily data.

def get_children(catalog, collection_id=None):
    """
    This function retrieves a specified collection from a STAC catalog/collection and prints key metadata 
    for exploring/accessing the datasets contained within it.
    If there is no collection ID provided, the collections in the top level of the catalog will be printed.
    If a collection ID is provided, it will retrieve the collection with that ID from the input catalog/collection.
    If the collection ID points to a dataset, it will print the assets available for the dataset.
    If the collection ID points to another collection, it will list the child collections in the IDed collection.

    Args:
        catalog (pystac.Catalog | pystac.Collection): The STAC catalog/collection object.
        collection_id (str): The ID of the collection or dataset to retrieve from catalog.
    
    Returns:
        collection (pystac.Catalog | pystac.Collection): The collection object corresponding to the provided ID
                                                         or the top-level catalog if no ID is provided.
    """
    dataset = False
    if collection_id:
        collection = catalog.get_child(collection_id)
        if collection.assets:
            dataset = True
            print(f"{collection_id} is a dataset. Please review the assets below and select one to open.")

        else:
            print(f"{collection_id} is a collection. Please review the child items and select one to open in the next cell.")
    else:
        collection = catalog
    if dataset==True:
        # List the assets
        for asset in collection.assets:
            print(f"Asset ID: {asset}")
            print(f"    Title: {collection.assets[asset].title}")
            print(f"    Description: {collection.assets[asset].description}")
    else:
        collections = list(collection.get_collections())
        print(f"Number of collections: {len(collections)}")
        print("Collections IDs:")
        for child_collection in collections:
            id = child_collection.id
            cite_as = "Not available"
            for link in child_collection.links:
                if link.rel == "cite-as":
                    cite_as = link.target
            print(f"- {id}, Source: {cite_as}")
    return collection
# url for the WMA STAC Catalog
catalog_url = "https://api.water.usgs.gov/gdp/pygeoapi/stac/stac-collection/"

# use pystac to read the catalog
catalog = pystac.Catalog.from_file(catalog_url)

# list the collections in the catalog
catalog = get_children(catalog)
# select a collection from the catalog, replace the collection ID with the one you want to use:
collection = get_children(catalog, collection_id="conus404")
# select a collection from the catalog, replace the collection ID with the one you want to use:
collection = get_children(collection, collection_id="conus404_daily")
# replace with the asset ID you want to use:
selected_asset_id = "zarr-s3-osn"

# read the asset metadata
asset = collection.assets[selected_asset_id]

As we can see there are two different locations for the conus404_daily data set. The locations are (1) -hovenweep meaning it is stored on the USGS Hovenweep HPC and (2) -osn meaning the data is on the USGS open storage network (OSN). As the OSN is free to access from any environment, we will use that for this example, but the location can easily be changed depending on your needs.

# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'default'
# %run ../../../environment_set_up/Help_AWS_Credentials.ipynb

Finally, read in the daily CONUS404 data set.

if Version(zarr.__version__) < Version("3.0.0"):
    conus404 = xr.open_dataset(
        asset.href,
        storage_options=asset.extra_fields['xarray:storage_options'],
        **asset.extra_fields['xarray:open_kwargs']
    )
else:
    conus404 = xr.open_dataset(
    asset.href,
    storage_options=asset.extra_fields['xarray:storage_options'],
    **asset.extra_fields['xarray:open_kwargs'],
    zarr_format=2
    )

conus404

Parallelize with Dask (optional)

Some of the steps we will take are aware of parallel clustered compute environments using dask. We can start a cluster now so that future steps take advantage of this ability. This is an optional step, but speed ups data loading significantly, especially when accessing data from the cloud.

We have documentation on how to start a Dask Cluster in different computing environments here. Uncomment the cluster start up that works for your compute environment.

%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../../../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../../../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../../../environment_set_up/Start_Dask_Cluster_Desktop.ipynb

Load the Feature Polygons

Now that we have read in the CONUS404 data, we need to read in some polygons to aggregate the data. For this example, we will use the HUC12 basins within the Delaware River Basin. To get these HUC12 polygons, we can use pygeohydro.watershed to query the Hydro Network Linked Data Index (NLDI). All we need to get the basins is the general IDs of the HUC12 basins. For the Delaware Basin those are ones that start with 020401 or 020402.

%%time
wbd = watershed.WBD("huc4")
delaware_basin = wbd.byids(field="huc4", fids="0204")
huc12_basins = WaterData('wbd12').bygeom(delaware_basin.iloc[0].geometry)
huc12_basins = huc12_basins[huc12_basins['huc12'].str.startswith(('020401', '020402'))]
huc12_basins

Let’s plot the HUC12 basins to see how they look.

huc12_basins.hvplot(
    c='huc12', title="Delaware River HUC12 basins",
    coastline='50m', geo=True,
    aspect='equal', legend=False, frame_width=300
)

An important thing to note is that all geodataframes should have a coordinate reference system (CRS). Let’s check to make sure our geodataframe has a CRS.

huc12_basins.crs

Limit CONUS404 Spatial Range

With the HUC12 basins read in, we only need the CONUS404 data that spans these polygons as they are the regions we will be aggregating. So, let’s limit the CONUS404 spatial range to that of the basins. This will save on memory and computation. Note doing this is mainly useful when the region’s footprint is much smaller than the footprint of the gridded model.

To limit the spatial range, we first need to convert the CRS of the basins to that of CONUS404. Then extract the bounding box of the basins.

huc12_basins_conus404_crs = huc12_basins.to_crs(conus404.crs.crs_wkt)
bbox = huc12_basins_conus404_crs.total_bounds
bbox

Then select the CONUS404 data within the bounding box. However, when we do this, we will extend the bounds out by 5% of their range to ensure all of our basins are within the spatially limited data. We do this as the reprojections of the CRS can cause slight distortions that make polygons on the bounds not fall fully within the data.

bbox_x_range = bbox[2] - bbox[0]
bbox_y_range = bbox[3] - bbox[1]
x_range = slice(bbox[0] - bbox_x_range * 0.05,
                bbox[2] + bbox_x_range * 0.05)
y_range = slice(bbox[1] - bbox_y_range * 0.05,
                bbox[3] + bbox_y_range * 0.05)

conus404 = conus404.sel(x=x_range, y=y_range)
conus404

To make sure this worked as intended, let’s plot the full basin over the extracted CONUS404 data footprint.

# Get the footprint of the grid
cutout = xr.ones_like(conus404.isel(time=0).drop_vars(['lat', 'lon'])['ACDEWC'])
# We need to write the CRS to the CONUS404 dataset and
# reproject to the crs of the HUC12 basins dataframe for clean plotting with hvplot
cutout = cutout.rio.write_crs(conus404.crs.crs_wkt).rio.reproject('EPSG:4326')

cutout_plt = cutout.hvplot(
    coastline='50m', geo=True,
    aspect='equal', frame_width=300, colorbar=False
)
huc12_plt = huc12_basins.hvplot(
    geo=True, c='r'
)
cutout_plt * huc12_plt

Looks good!

Aggregate CONUS404 to HUC12 Polygons

Now that we have our dataset and basin polygons prepared, we are ready to aggregate.

Create Grid Polygons

The first step here is to extract the CONUS404 grid information, which consists of getting the grid center points and the grid bounds.

grid = conus404[['x', 'y']].drop_vars(['lat', 'lon']).reset_coords()
grid = grid.cf.add_bounds(['x', 'y'])
grid

Then, we “stack” the data into a single 1D array. This creates an index of (x, y) pairs of the center points that links to the bounds. This will make generating polygons of the grid cells from the bounds much simpler than any manual looping.

points = grid.stack(point=('y', 'x'))
points

Next, we can use the point pairs we just created to make polygons from the bounds. To do this, we we will make a simple function that takes the x and y bound to generate a polygon for the given grid cell. We can then apply it in parallel using xarray.apply_ufunc. Note that this step will get slower as we increase the grid size from our limited range. Perhaps could be vectorized using pygeos...

%%time
def bounds_to_poly(x_bounds, y_bounds):
    return Polygon([
        (x_bounds[0], y_bounds[0]),
        (x_bounds[0], y_bounds[1]),
        (x_bounds[1], y_bounds[1]),
        (x_bounds[1], y_bounds[0])
    ])
    
boxes = xr.apply_ufunc(
    bounds_to_poly,
    points.x_bounds,
    points.y_bounds,
    input_core_dims=[("bounds",),  ("bounds",)],
    output_dtypes=[np.dtype('O')],
    vectorize=True
)
boxes

Finally, we convert this xarray.DataArray to a geopandas.GeoDataframe, specifying the projected CRS to be the same as the CONUS404 dataset.

grid_polygons= gp.GeoDataFrame(
    data={"geometry": boxes.values, "y": boxes['y'], "x": boxes['x']},
    index=boxes.indexes["point"],
    crs=conus404.crs.crs_wkt
)
grid_polygons

Key Step: Overlay the Two Geometries

We are finally ready for the magic of this method, the weight generation using geopandas. To calculate the weights, we will use the overlay method in geopandas. It calculates the area overlap between polygons in two different GeoDataFrames, i.e. the original grid polygons and the HUC12 polygons. An important thing to note is that when generating the weights we need to use an equal area projection (i.e., equal area CRS). So, before we overlay, we will need to convert the CRS to an area preserving projection. Here we use the NSIDC EASE-Grid 2.0 grid for the Northern Hemisphere.

As long as the feature polygons only cover a few 100s of grid polygons, this is an extremely fast operation. However, if 1000s of grid cells are covered this can be a wasteful calculation, as we really only need it for the grid polygons that are partially covered. Otherwise, we could use a faster computational method for fully covered cells, but we will leave that complex topic for another notebook.

%%time
crs_area = "EPSG:6931" # Good for northern hemisphere
# crs_area = "EPSG:5070" # Good for CONUS

huc12_basins_area = huc12_basins.to_crs(crs_area)
grid_polygons = grid_polygons.to_crs(crs_area)

# overlay the polygons
overlay = grid_polygons.overlay(huc12_basins_area, keep_geom_type=True)

This is essentially already a sparse matrix, mapping one grid space to the other. How sparse? Let’s check.

sparsity = len(overlay) / (len(grid_polygons) * len(huc12_basins_area))
sparsity

Let’s explore these overlays a little bit more. Mainly, we can verify that each basin’s area is preserved in the overlay operation.

# calculate areas of HUC12s from overlay and original polygons
overlay_area = overlay.geometry.area.groupby(overlay['huc12']).sum()
huc12_area = huc12_basins_area.geometry.area.groupby(huc12_basins_area['huc12']).sum()
# find the max fractional difference
(np.abs(overlay_area - huc12_area) / huc12_area).max()

Nice! So, it worked and only have differences within machine precision.

Calculate the Weights (i.e., Area Fraction for each Region)

Now that we have the overlay of the grid polygons with the HUC12 polygons, we only need to transform this to weights for each grid cell to aggregate. This transform tells us how much of a HUC12 polygon’s total area is within each of the grid cells. This is accurate because we used an area-preserving CRS. Calculating this fractional area is again simple with geopandas.

grid_cell_fraction = overlay.geometry.area.groupby(overlay['huc12']).transform(lambda x: x / x.sum())
grid_cell_fraction

We can verify that these all sum up to one.

grid_cell_fraction.groupby(overlay['huc12']).sum().unique()

However, in their current Series form, the weights aren’t very useful. What we need is to convert them to a sparse array within xarray. Thankfully, xarray can easily do this if we add a MultiIndex for the grid cells’ center points and HUC12 IDs to the cell fraction DataFrame.

multi_index = overlay.set_index(['y', 'x', 'huc12']).index
df_weights = pd.DataFrame({"weights": grid_cell_fraction.values}, index=multi_index)
df_weights

We can bring this directly into xarray as a 1D Dataset and then unstack it into a sparse array.

ds_weights = xr.Dataset(df_weights)
weights_sparse = ds_weights.unstack(sparse=True, fill_value=0.).weights
weights_sparse

Again, we can clearly see that this is a sparse matrix from the density. This is now all we need to do our aggregation.

Perform the Aggregation

Unlike deriving the weights, actually performing the aggregation is a simple one line of code. This is because we utilize xarray.Dataset.weighted to do our weighted calculations. It will happily take a sparse array as weights and compute the aggregation.

However, rather than aggregating all variables, let’s only aggregate two in order to reduce the computational time.

%%time
# Note the .compute() at the end to actually do the computation here vs lazy computing
huc12_aggregation = conus404[['T2', 'PREC_ACC_NC']].weighted(weights_sparse).sum(dim=['x', 'y']).compute()
huc12_aggregation

Note that our aggregations are still sparse arrays, which is the cost of using a sparse array as our weights. However, the density of these sparse arrays is large, meaning we want to convert them out of sparse arrays and back to dense arrays. To do this, we can use apply_ufunc with a lambda function to convert.

huc12_aggregation = xr.apply_ufunc(lambda data: data.todense(), huc12_aggregation)

It important to note that we used a sum aggregation above rather than a mean. In theory, the two methods should be the same as a weighted sum is a weighted mean if the sum of weights is one (which they are in our case):

weighted mean: i=1nwixii=1nwi\textrm{weighted mean:\ }\frac{\sum_{i=1}^n w_i x_i}{\sum_{i=1}^n w_i}
weighted sum: i=1nwixi\textrm{weighted sum:\ } \sum_{i=1}^n w_i x_i
i=1nwixii=1nwi=i=1nwixi ifi=1nwi=1\frac{\sum_{i=1}^n w_i x_i}{\sum_{i=1}^n w_i} = \sum_{i=1}^n w_i x_i \ \textrm{if} \sum_{i=1}^n w_i = 1

However, in practice, this will matter if your data contains NaN values. When using a sum with NaNs, the NaNs will effectively be treated as zeros, meaning any all NaN aggregation will result in a 0 value. For the mean, NaNs are ignored in the calculation, but all NaN aggregations will result in a NaN value being returned. Therefore, the two methods are the same when you have data mixed with NaNs, but if you want all NaN aggregations to return NaN use a mean. Otherwise, if you want it to return 0, use a sum.

Explore the Aggregation

Now that we have the aggregated data and converted it to dense form, let’s make some plots!

Mean of Variable by HUC12

First, we will calculate and plot mean value over all time steps for every HU12.

df_mean = huc12_aggregation.mean(dim='time').to_dataframe()
df_mean

We need to merge this with the HUC12 basin GeoDataFrame to get the geometry info.

df_mean = pd.merge(huc12_basins, df_mean, left_on='huc12', right_on='huc12')
df_mean.head(3)

Time to plot our two example variables!

temp_plt = df_mean.hvplot(
    c='T2', geo=True, coastline='50m', cmap='viridis',
    title='Mean Temperature at 2m [K]', frame_width=300, aspect='equal'
)
precip_plt = df_mean.hvplot(
    c='PREC_ACC_NC', geo=True, coastline='50m', cmap='viridis',
    title='Mean 24hr Accumulated Precipitation [mm]', frame_width=300,
    aspect='equal'
)

temp_plt + precip_plt

Mean Monthly Time Series

Finally, let’s plot the mean monthly time series for each HUC12.

monthly_timeseries = huc12_aggregation.resample(time="MS").mean()
monthly_timeseries
monthly_timeseries.hvplot(x='time', grid=True)

Shut Down the Dask Client

If utilized, we should shut down the dask client.

client.close()