Re-Chunking Larger Datasets#

This notebook extends ideas covered in the basic workflow. This notebook will perfrom the same operations, but will work on the much larger dataset, and involve some parallelization using the dask scheduler.

Warning

You should run this only on a cloud compute node – on ESIP Nebari, for example. We will be reading and writing enormous amounts of data to S3 buckets. To do that over a typical network connection will saturate your bandwidth and take days to complete.

System Setup#

import os
import xarray as xr
import dask
import intake

# Activate logging
import logging
logging.basicConfig(level=logging.INFO, force=True)

Plumb Data Source#

We’re going to look at a particular dataset from the National Water Model Reanalysis Version 2.1. The dataset is part of the AWS Open Data Program, and is included in the HyTEST data catalog.

url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
cat['nwm21-streamflow-cloud']
nwm21-streamflow-cloud:
  args:
    consolidated: true
    storage_options:
      anon: true
    urlpath: s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr
  description: National Water Model 2.1 CHRTOUT on AWS
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog

Load the zarr data#

We’ll take advantage of the intake mechanism and load the data directly. We’ll need to set up our AWS credentials first, since this data is stored on an S3 bucket.

os.environ['AWS_PROFILE'] = "osn-hytest-scratch"
%run ../../../environment_set_up/Help_AWS_Credentials.ipynb
ds = cat['nwm21-streamflow-cloud'].to_dask()
indexer = ds.gage_id != ''.rjust(15).encode()
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData.drop('crs') # Not needed/wanted for this analysis
smplData
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  'dims': dict(self._ds.dims),
/tmp/ipykernel_7263/974520311.py:4: DeprecationWarning: dropping variables using `drop` is deprecated; use drop_vars.
  smplData.drop('crs') # Not needed/wanted for this analysis
<xarray.Dataset> Size: 47GB
Dimensions:     (feature_id: 7994, time: 367439)
Coordinates:
    elevation   (feature_id) float32 32kB 134.5 69.77 1.508e+03 ... 814.0 -10.02
  * feature_id  (feature_id) int32 32kB 3109 3923 ... 1170023539 1180000535
    gage_id     (feature_id) |S15 120kB b'       01030350' ... b'       10254...
    latitude    (feature_id) float32 32kB 45.71 45.51 40.38 ... 48.99 49.0 32.67
    longitude   (feature_id) float32 32kB -68.16 -68.3 -105.1 ... -116.2 -115.5
    order       (feature_id) int32 32kB 3 5 5 3 1 1 2 1 1 ... 8 8 8 8 8 3 9 5 5
  * time        (time) datetime64[ns] 3MB 1979-02-01T01:00:00 ... 2020-12-31T...
Data variables:
    crs         (feature_id) object 64kB b'' b'' b'' b'' b'' ... b'' b'' b'' b''
    streamflow  (time, feature_id) float64 23GB dask.array<chunksize=(672, 7994), meta=np.ndarray>
    velocity    (time, feature_id) float64 23GB dask.array<chunksize=(672, 7994), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...

Restrict for Tutorial#

This is a demonstration workflow, which means we don’t really need to work on the entire dataset – which is very large. We are going to cut this input dataset down to be just the first 100 feature_id values, so that this tutorial will run in reasonable time.

For processing a full-sized dataset, you’d just skip this step where we slice off a representative example of the data. Expect run time to increase in proportion to the size of the data being processed. |

smplData = smplData.isel(feature_id=slice(0, 100))
smplData
<xarray.Dataset> Size: 591MB
Dimensions:     (feature_id: 100, time: 367439)
Coordinates:
    elevation   (feature_id) float32 400B 134.5 69.77 1.508e+03 ... 345.9 294.0
  * feature_id  (feature_id) int32 400B 3109 3923 12932 ... 414203 414275 417955
    gage_id     (feature_id) |S15 2kB b'       01030350' ... b'       07241800'
    latitude    (feature_id) float32 400B 45.71 45.51 40.38 ... 35.5 35.57 35.34
    longitude   (feature_id) float32 400B -68.16 -68.3 -105.1 ... -97.36 -96.87
    order       (feature_id) int32 400B 3 5 5 3 1 1 2 1 1 ... 3 5 4 4 6 6 6 6 6
  * time        (time) datetime64[ns] 3MB 1979-02-01T01:00:00 ... 2020-12-31T...
Data variables:
    crs         (feature_id) object 800B b'' b'' b'' b'' b'' ... b'' b'' b'' b''
    streamflow  (time, feature_id) float64 294MB dask.array<chunksize=(672, 100), meta=np.ndarray>
    velocity    (time, feature_id) float64 294MB dask.array<chunksize=(672, 100), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...

Note that this data has the full range of time values (367439 time steps) and all of the variables (streamflow, velocity, …). We just selected 100 feature_ids to work with so that the example data will execute more quickly.

Spin up Dask Cluster#

Our rechunking operation will be able to work in parallel. To do that, we will spin up a dask cluster on the cloud hardware to schedule the various workers. Note that this cluster must be configured with a specific user profile with permissions to write to our eventual output location.

%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' 
The link to view the client dashboard is:
>  https://hytestnebari.dev-wma.chs.usgs.gov/gateway/clusters/dev.f4cffc2741b8423b9bb0db239828f07c/status

Re-Chunk Plan#

We will configure a new chunking plan which will favor time-series analysis. Using the dimensions of the data:

  • 367439 time steps

  • 100 feature IDs

We can write the new plan as:

# The new chunking plan:
chunk_plan = {
    'streamflow': {'time': 367439, 'feature_id': 1}, # all time records in one chunk for each feature_id
    'velocity': {'time': 367439, 'feature_id': 1},
    'elevation': (100,),
    'gage_id': (100,),
    'latitude': (100,),
    'longitude': (100,),    
    'order': (100,),    
    'time': (367439,), # all time coordinates in one chunk
    'feature_id': (100,) # all feature_id coordinates in one chunk
}

This will generate chunks which are 1(feature_id) x 367439(time) arrays of float64.

#       time  * id * float64
bytes = 367439 * 1 * 8
kbytes = bytes / (2**10)
mbytes = kbytes / (2**10)
print(f"chunk size: {bytes=} ({kbytes=:.2f})({mbytes=:.4f})")
chunk size: bytes=2939512 (kbytes=2870.62)(mbytes=2.8033)
# Manually reset the chunking metadata in prep for re-chunking
smplData = smplData.chunk(chunks={'feature_id':1, 'time': 367439})
for x in smplData.variables:
    smplData[x].encoding['chunks'] = None

Set up output location#

With this plan, we can ask rechunker to re-write the data using the prescribed chunking pattern.

Unlike with the smaller dataset, we need to write this very large dataset to an object store in the datacenter: an S3 ‘bucket’. So we need to set that up so that rechunker will have a suitable place to write data. This new data will be a complete copy of the original, just re-organized a bit.

from getpass import getuser
import fsspec
uname=getuser()

fsw = fsspec.filesystem(
    's3', 
    anon=False, 
    default_fill_cache=False, 
    skip_instance_cache=True, 
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org', }
)

workspace = 's3://hytest-scratch/'
testDir = workspace + "testing/"
myDir = testDir + f'{uname}/'
fsw.mkdir(testDir)
INFO:aiobotocore.credentials:Found credentials in environment variables.
staging = fsw.get_mapper(myDir + 'tutorial_staging.zarr')
outfile = fsw.get_mapper(myDir + 'tutorial_rechunked.zarr')
for fname in [staging, outfile]:
    print(f"Ensuring {fname.root} is empty...", end='')
    try:
        fsw.rm(fname.root, recursive=True)
    except:
        FileNotFoundError
    print(" Done.")
Ensuring hytest-scratch/testing/asnyder/tutorial_staging.zarr is empty...
 Done.
Ensuring hytest-scratch/testing/asnyder/tutorial_rechunked.zarr is empty...
 Done.

Ready to rechunk#

import rechunker
## Recall that merely invoking rechunker does not do any work... just sorts out 
## the rechunking plan and writes metadata.
result = rechunker.rechunk(
    smplData,
    chunk_plan,
    "16GB",
    outfile, 
    temp_store=staging, 
)
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/rechunker/api.py:442: SerializationWarning: variable None has data in the form of a dask array with dtype=object, which means it is being loaded into memory to determine a data type that can be safely stored on disk. To avoid this, coerce this variable to a fixed-size dtype with astype() before saving it.
  variable = encode_zarr_variable(variable)
INFO:aiobotocore.credentials:Found credentials in environment variables.
from dask.distributed import progress, performance_report

with performance_report(filename="dask-report.html"):
    r = result.execute(retries=10)  
import zarr
_ = zarr.consolidate_metadata(outfile)

Results#

Let’s read in the resulting re-chunked dataset to see how it looks:

reChunkedData = xr.open_zarr(outfile)
reChunkedData

Comparison#

## Before:
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData['streamflow'].sel(feature_id=417955)
# Note: many chunks needed to service a single feature_id
## After:
reChunkedData['streamflow'].sel(feature_id=417955) 
# All data for the specified feature_id is in a single chunk
client.close()
cluster.close()