Re-Chunking Data#

Re-organizing stored data such that it matches the analysis use-case.

Inspiration from: NCAR/rechunk_retro_nwm_v21

Note

  • The rechunker documentation contains several examples and a tutorial covering how to re-chunk data. Much of what is here replicates concepts covered in that material. This document uses data that looks like HyTest data (variable names, extent, etc), so may offer a smoother intro to the concepts using familiar data.

  • The zarr data standard has a nice tutorial also which covers details of optimizing chunking strategies.

Intro#

What is chunking and why should you care?#

The idea of data ‘chunks’ is closely aligned with the NetCDF and zarr standards for storing N-dimensional arrays of typed data.

Chunks become more important as the size of the array increases. For very large arrays, it is helpful to organize the memory it occupies into sub-units. These sub-units are the chunks – note that this is not another dimension to the array, but merely a map to how the large array is partitioned into more palatable sized units for manipulation in memory. Array-handling libraries (numpy, xarray, pandas, and others) will handle all of the record-keeping to know which chunk holds a given unit of the array.

A quick side-bar to illustrate two chunking patterns for a simple 2D array. This is a simplified use-case. Consider a square array of integer values. Just for exposition, let’s use a small array 10x10.

illustration

That array can be organized in memory in a few ways… two common options are row-major order, and column-major order:

  • Row-Major – A row of data occupies a contiguous block of memory. This implies that cells which are logicall adjacent vertically are not physicall near one another in memory. The ‘distance’ from r0c0 to r0c1 (a one-cell logical move within the row) is short, while the ‘distance’ to r1c0 (a one-cell logical move within the column) is long.

  • Column-Major – A column of the array occupies a contiguious block of memory. This implies that cells which are adjacent horizontally are not near one another physically in memory.

In either chunk mapping, r3c5 (for example) still fetches the same value – the array still indexes/addresses in the same way – but the chunking plan determines how nearby an ‘adjacent’ index is.

As the size of the array increases, the chunk pattern becomes more relevant. Suppose your data is chunked by rows, and you need to process a column of data – your process will need to read a lot of data, skipping most of it (possibly going to disk to read more data if the dataset is very large), to get the \(i^{th}\) column value for each row. For this analysis, it would be better if the array could be ‘re-chunked’ from row-major order to column-major order. This would favor column operations.

Array size is important to the chunking plan… it could be that an entire row of data in a large 2D array won’t fit into memory (or it can’t fit into a contiguous block of memory). This would require chunking each row of data, in addition to chunking rows over columns (for a row-major plan) As dimensions are added to the array, this chunk-mapping becomes more complex and it becomes much more relevant to chunk the data to match the analysis.

Pros & Cons#

Data that is well-organized to optimize one kind of analysis may not suit another kind of analysis on the same data. Re-chunking is time-consuming, and it produces a separate copy of the dataset, increasing storage requirements. The initial time commitment is a one-time operation so that future analyses can run quickly. The space commitment can be substantial if a complex dataset needs to be organized for many different analyses.

Examining a Small Dataset#

Let’s read a sample dataset and examine how it is chunked.

As a test datasaet, we’ve taken a random sampling of 400 stream gages for the month of July, 2000 from the National Water Model Reanalysis Version 2.1. The full dataset is part of the AWS Open Data Program, available via the S3 bucket at

s3://noaa-nwm-retrospective-2-1-zarr-pds/noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr

Our subset of that data for use in this tutorial is included in the HyTEST catalog:

%run ../../../environment_set_up/Help_AWS_Credentials.ipynb
## Establish AWS credentials
import xarray as xr 
import intake
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
sampleData = cat['rechunking-tutorial-osn'].to_dask()
sampleData
/home/conda/global/9764ad0fe205ee3ccb00b4474c3e6708e73fb3a628843d562645d7d1e21ccb8d-20231221-205006-476455-52-pangeo/lib/python3.11/site-packages/xarray/core/utils.py:494: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  warnings.warn(
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...

The critical items to notice in this output are highlighted here:

<xarray.Dataset>

Dimensions:     (feature_id: 400, time: 744)  <-- NOTE: Two dimensions

                   +--- most coordinates are tied to feature_id dimension 
                   | 
Coordinates:       V
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00

Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
                 ^^^^  ^^^^^^^^^^
                 the data variables are addressed by both dimensions; this is 2D data.

This dataset is a ‘stack’ of two 2D arrays. They are named ‘streamflow’ and ‘velocity’. The indices into each of those 2D arrays are time on one axis, and feature_id on the other. The feature id is bound to a number of other coordinates, so you can relate/refer to a given feature by its elevation, gage_id, latitude, longitude, or stream order.

Note the chunksize highlighted in green. This says that the data is stored in blocks mapping to 256 adjacent time-steps for 16 adjacent features. (NOTE: The original data is not chunked this way; we’ve deliberately fiddled with the chunk configuration for this tutorial)

A time-series analysis (i.e. sampling all time-step values for a single feature_id) would require multiple chunks to be fetched.

# Fetch all the time values for a specific feature_id
sampleData['streamflow'].sel(feature_id=1343034)
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(256,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1

This data has 744 time-steps available, chunked into chunks of 256 values each. Three chunks are needed to hold this time-series for one feature. Not too bad, but not good either.

On the other hand, an analysis which samples all locations for a single point in time would need to fetch multiple chunks also.

# Fetch all the gage values for a single day
sampleData['streamflow'].sel(time='07-01-2000')
<xarray.DataArray 'streamflow' (time: 24, feature_id: 400)>
dask.array<getitem, shape=(24, 400), dtype=float64, chunksize=(24, 16), chunktype=numpy.ndarray>
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-01T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1

This dataset has 400 features, broken into chunks of 16 data values each. Many more chunks to fetch. This is much worse: the I/O engine needs to find and retrieve 25 chunks vs 3 in the previous example.

If we were going to do either of those analyses on a very large dataset with this pattern, we’d want to re-chunk the data to optimize for our read pattern.

Re-Chunking the Sample Data#

This is a trivial example, due to the small size of the dataset – It all fits in memory easily. But it is worth doing, as concepts will apply when we take this to the full-sized data.

First thing we need is a chunk plan to describe the chunk layout we want. This can be generated using various methods. For this dataset, it’s easy enough to write it manually:

# Numbers are *size* of the chunk. 
chunk_plan = {
    'streamflow': {'time': 744, 'feature_id': 1}, # all time records in one chunk for each feature_id
    'velocity': {'time': 744, 'feature_id': 1},
    'elevation': (400,),
    'gage_id': (400,),
    'latitude': (400,),
    'longitude': (400,),    
    'order': (400,),    
    'time': (744,),
    'feature_id': (400,)
}

With this plan, we can ask rechunker to re-write the data using the prescribed chunking pattern.

import rechunker
outfile = r"/tmp/outfile.zarr"
result = rechunker.rechunk(
    sampleData,
    chunk_plan,
    "2GB",                #<--- Max Memory
    outfile ,
    temp_store="/tmp/scratch.zarr" 
)
_ = result.execute() # Note that we must specifically direct rechunk to calculate.
# without the call to execute(), the zarr dataset will be empty, and result will hold only
# a 'task graph' outlining the calculation steps.

Note that rechunker.rechunk does not overwrite any data. If it sees that /tmp/outfile.zarr or /tmp/scratch.zarr already exist, it will balk and likely raise an exception. Be sure that these locations do not exist.

The rechunker also writes a minimalist data group. Meaning that variable metadata is not consolidated. This is not a required step, but it will really spead up future workflows when the data is read back in.

import zarr
_ = zarr.consolidate_metadata(outfile)

Results#

Let’s read in the resulting re-chunked dataset to see how it looks:

reChunkedData = xr.open_zarr(outfile)
reChunkedData
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(744, 1), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(744, 1), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...

Note here that for both streamflow and velocity, the chunksize in the time dimension is 744 (the total number of time steps). Analyses which favor fetching all time-step values for a given facility_id will prefer this chunking strategy.

Comparison#

Before Re-Chunking:#

sampleData['streamflow'].sel(feature_id=1343034)
# Note: three chunks needed to service a single feature_id
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(256,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1

After re-chunking:#

reChunkedData['streamflow'].sel(feature_id=1343034) 
# All data for the specified feature_id is in a single chunk
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(744,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1

Cleaning Up#

import shutil
if os.path.exists(outfile):
    print(f"removing {outfile}")
    shutil.rmtree(outfile)
if os.path.exists(r"/tmp/scratch.zarr"):
    print("removing scratch space")
    shutil.rmtree(r"/tmp/scratch.zarr")
removing /tmp/outfile.zarr
removing scratch space