Re-Chunking Data#
Re-organizing stored data such that it matches the analysis use-case.
Inspiration from: NCAR/rechunk_retro_nwm_v21
Note
The rechunker documentation contains several examples and a tutorial covering how to re-chunk data. Much of what is here replicates concepts covered in that material. This document uses data that looks like HyTest data (variable names, extent, etc.), so it may offer a smoother intro to the concepts using familiar data. The zarr data standard also has a nice tutorial which covers details of optimizing chunking strategies.
Intro#
What is chunking and why should you care?#
The idea of data ‘chunks’ is closely aligned with the NetCDF and zarr standards for storing N-dimensional arrays of typed data.
Chunks become more important as the size of the array increases. For very large arrays, it is helpful to organize the memory they occupy into sub-units. These sub-units are the chunks – note that this is not another dimension of the array, but merely a map of how the large array is partitioned into more manageably sized units for manipulation in memory. Array-handling libraries (numpy, xarray, pandas, and others) will handle all of the record-keeping needed to know which chunk holds a given unit of the array.
A quick side-bar to illustrate two chunking patterns for a simple 2D array. This is a simplified use-case: consider a square array of integer values. Just for exposition, let's use a small 10x10 array.
That array can be organized in memory in a few ways; two common options are row-major order and column-major order:
Row-Major – A row of data occupies a contiguous block of memory. This implies that cells which are logically adjacent vertically are not physically near one another in memory. The 'distance' from r0c0 to r0c1 (a one-cell logical move within the row) is short, while the 'distance' from r0c0 to r1c0 (a one-cell logical move within the column) is long.
Column-Major – A column of the array occupies a contiguous block of memory. This implies that cells which are adjacent horizontally are not physically near one another in memory.
In either chunk mapping, r3c5 (for example) still fetches the same value – the array still indexes/addresses in the same way – but the chunking plan determines how nearby an 'adjacent' index is.
As the size of the array increases, the chunk pattern becomes more relevant. Suppose your data is chunked by rows, and you need to process a column of data – your process will need to read a lot of data, skipping most of it (possibly going to disk to read more data if the dataset is very large), to get the \(i^{th}\) column value for each row. For this analysis, it would be better if the array could be ‘re-chunked’ from row-major order to column-major order. This would favor column operations.
Array size is important to the chunking plan: it could be that an entire row of data in a large 2D array won't fit into memory (or can't fit into a contiguous block of memory). This would require chunking each row of data, in addition to chunking rows over columns (for a row-major plan). As dimensions are added to the array, this chunk-mapping becomes more complex, and it becomes much more relevant to chunk the data to match the analysis.
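To make this concrete, here is a minimal sketch (using dask and a hypothetical 10x10 array, not the HyTest data) showing how the chunking plan changes the number of chunks a column read must touch:
import numpy as np
import dask.array as da

arr = np.arange(100).reshape(10, 10)               # the small 10x10 example array

row_chunked = da.from_array(arr, chunks=(1, 10))   # each chunk holds one full row
col_chunked = da.from_array(arr, chunks=(10, 1))   # each chunk holds one full column

# Reading a single column touches every chunk of the row-chunked array...
print(row_chunked[:, 5].npartitions)   # 10
# ...but only one chunk of the column-chunked array.
print(col_chunked[:, 5].npartitions)   # 1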
Pros & Cons#
Data that is well-organized to optimize one kind of analysis may not suit another kind of analysis on the same data. Re-chunking is time-consuming, and it produces a separate copy of the dataset, increasing storage requirements. The time commitment is a one-time, up-front cost paid so that future analyses can run quickly. The space commitment can be substantial if a complex dataset needs to be organized for many different analyses.
Examining a Small Dataset#
Let’s read a sample dataset and examine how it is chunked.
As a test dataset, we've taken a random sampling of 400 stream gages for the month of July 2000 from the National Water Model Reanalysis Version 2.1. The full dataset is part of the AWS Open Data Program, available via the S3 bucket at
s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr
Our subset of that data for use in this tutorial is included in the HyTEST catalog:
%run ../../../environment_set_up/Help_AWS_Credentials.ipynb
## Establish AWS credentials
import xarray as xr
import intake
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
sampleData = cat['rechunking-tutorial-osn'].to_dask()
sampleData
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
The critical items to notice in this output are highlighted here:
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)    <-- NOTE: Two dimensions
                +--- most coordinates are tied to feature_id dimension
                |
Coordinates:    V
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
                 ^^^^  ^^^^^^^^^^
                 the data variables are addressed by both dimensions; this is 2D data.
This dataset is a 'stack' of two 2D arrays, named 'streamflow' and 'velocity'. The indices into each of those 2D arrays are time on one axis and feature_id on the other. The feature_id is bound to a number of other coordinates, so you can relate/refer to a given feature by its elevation, gage_id, latitude, longitude, or stream order.
Note the chunksize shown for the data variables. It says that the data is stored in blocks mapping to 256 adjacent time-steps for 16 adjacent features. (NOTE: The original data is not chunked this way; we've deliberately fiddled with the chunk configuration for this tutorial.)
A time-series analysis (i.e. sampling all time-step values for a single feature_id) would require multiple chunks to be fetched.
# Fetch all the time values for a specific feature_id
sampleData['streamflow'].sel(feature_id=1343034)
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(256,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
This data has 744 time-steps available, stored in chunks of 256 values each. Three chunks are needed to hold this time-series for one feature. Not too bad, but not good either.
On the other hand, an analysis which samples all locations for a single point in time would need to fetch multiple chunks also.
# Fetch all the gage values for a single day
sampleData['streamflow'].sel(time='07-01-2000')
<xarray.DataArray 'streamflow' (time: 24, feature_id: 400)>
dask.array<getitem, shape=(24, 400), dtype=float64, chunksize=(24, 16), chunktype=numpy.ndarray>
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-01T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
This dataset has 400 features, broken into chunks of 16 data values each. Many more chunks to fetch. This is much worse: the I/O engine needs to find and retrieve 25 chunks vs 3 in the previous example.
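To see those chunk counts without reading any data, one lightweight check (a sketch relying on the dask arrays backing this dataset) is to inspect .npartitions on each selection:
# Count the dask chunks touched by each access pattern (no data is loaded).
one_gage = sampleData['streamflow'].sel(feature_id=1343034)   # full time series, one gage
one_day  = sampleData['streamflow'].sel(time='07-01-2000')    # all gages, one day
print(one_gage.data.npartitions)   # 3  chunks (744 steps in 256-step chunks)
print(one_day.data.npartitions)    # 25 chunks (400 features in 16-feature chunks)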
If we were going to do either of those analyses on a very large dataset with this pattern, we’d want to re-chunk the data to optimize for our read pattern.
Re-Chunking the Sample Data#
This is a trivial example, due to the small size of the dataset – it all fits in memory easily. But it is worth doing, as the concepts will apply when we take this to the full-sized data.
First thing we need is a chunk plan to describe the chunk layout we want. This can be generated using various methods. For this dataset, it’s easy enough to write it manually:
# Numbers are *size* of the chunk.
chunk_plan = {
    'streamflow': {'time': 744, 'feature_id': 1},  # all time records in one chunk for each feature_id
    'velocity':   {'time': 744, 'feature_id': 1},
    'elevation': (400,),
    'gage_id': (400,),
    'latitude': (400,),
    'longitude': (400,),
    'order': (400,),
    'time': (744,),
    'feature_id': (400,)
}
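The single-chunk sizes for the 1-D coordinate variables simply mirror the dataset's dimension lengths. If you'd rather not hard-code 400 and 744, a small sketch like the following (using the already-loaded sampleData) can build that part of the plan programmatically:
# A sketch: derive the one-chunk-per-coordinate entries from the dataset itself
# (each coordinate in this dataset is 1-D, so a single chunk spans its dimension).
coord_chunks = {name: (sampleData.sizes[coord.dims[0]],)
                for name, coord in sampleData.coords.items()}
print(coord_chunks)   # e.g. {'elevation': (400,), ..., 'time': (744,)}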
With this plan, we can ask rechunker to re-write the data using the prescribed chunking pattern.
import rechunker
outfile = r"/tmp/outfile.zarr"
result = rechunker.rechunk(
    sampleData,
    chunk_plan,
    "2GB",                          # <--- Max Memory
    outfile,
    temp_store="/tmp/scratch.zarr"
)
_ = result.execute()  # Note that we must specifically direct rechunk to calculate.
# Without the call to execute(), the zarr dataset will be empty, and result will hold only
# a 'task graph' outlining the calculation steps.
Note that rechunker.rechunk does not overwrite any data. If it sees that /tmp/outfile.zarr or /tmp/scratch.zarr already exist, it will balk and likely raise an exception. Be sure that these locations do not exist before running the re-chunk operation.
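One way to guarantee a clean slate (a hedged sketch; adjust the paths to your own target and scratch locations) is to remove any leftovers before calling rechunk():
import os
import shutil

# Remove leftover target / scratch stores from a previous run so rechunker
# does not refuse to write (paths match the example above).
for store in (r"/tmp/outfile.zarr", r"/tmp/scratch.zarr"):
    if os.path.exists(store):
        shutil.rmtree(store)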
The rechunker also writes a minimalist data group, meaning that variable metadata is not consolidated. Consolidating it is not a required step, but it will really speed up future workflows when the data is read back in.
import zarr
_ = zarr.consolidate_metadata(outfile)
Results#
Let’s read in the resulting re-chunked dataset to see how it looks:
reChunkedData = xr.open_zarr(outfile)
reChunkedData
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(744, 1), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(744, 1), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
Note here that for both streamflow and velocity, the chunksize in the time dimension is 744 (the total number of time steps). Analyses which favor fetching all time-step values for a given feature_id will prefer this chunking strategy.
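As a quick sanity check (a sketch using the chunks attribute of the dask-backed variable), you can confirm the new layout directly:
# Each tuple lists the chunk sizes along one dimension: expected to be a single
# 744-step chunk in time, and 400 single-feature chunks along feature_id.
print(reChunkedData['streamflow'].chunks)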
Comparison#
Before Re-Chunking:#
sampleData['streamflow'].sel(feature_id=1343034)
# Note: three chunks needed to service a single feature_id
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(256,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
After Re-Chunking:#
reChunkedData['streamflow'].sel(feature_id=1343034)
# All data for the specified feature_id is in a single chunk
<xarray.DataArray 'streamflow' (time: 744)>
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(744,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 1343034
    gage_id     |S15 dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 dask.array<chunksize=(), meta=np.ndarray>
    order       int32 dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
Cleaning Up#
import os
import shutil

# Remove the re-chunked output and the rechunker scratch space.
if os.path.exists(outfile):
    print(f"removing {outfile}")
    shutil.rmtree(outfile)
if os.path.exists(r"/tmp/scratch.zarr"):
    print("removing scratch space")
    shutil.rmtree(r"/tmp/scratch.zarr")
removing /tmp/outfile.zarr
removing scratch space