Re-Chunking Data#
Re-organizing stored data such that it matches the analysis use-case.
Inspiration from: NCAR/rechunk_retro_nwm_v21
Note
The rechunker documentation contains several examples and a tutorial covering how to re-chunk data. Much of what is here replicates concepts covered in that material. This document uses data that looks like HyTest data (variable names, extent, etc.), so it may offer a smoother introduction to the concepts using familiar data. The zarr data standard also has a nice tutorial which covers details of optimizing chunking strategies.
Intro#
What is chunking and why should you care?#
The idea of data ‘chunks’ is closely aligned with the NetCDF and zarr standards for storing N-dimensional arrays of typed data.
Chunks become more important as the size of the array increases. For very large arrays, it is helpful to organize the memory the array occupies into sub-units. These sub-units are the chunks – note that this is not another dimension of the array, but merely a map of how the large array is partitioned into more palatably sized units for manipulation in memory. Array-handling libraries (numpy, xarray, pandas, and others) handle all of the record-keeping to know which chunk holds a given unit of the array.
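For example, a dask-backed array keeps this chunk bookkeeping in attributes you can inspect directly. A minimal sketch (using the same shape and chunking that appear later in this tutorial, not the tutorial dataset itself):
import dask.array as da

# A 744 x 400 array split into chunks of 256 rows by 16 columns
arr = da.zeros((744, 400), chunks=(256, 16))

print(arr.chunks)      # chunk sizes along each axis: ((256, 256, 232), (16, 16, ..., 16))
print(arr.numblocks)   # number of chunks along each axis: (3, 25)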
A quick side-bar to illustrate two chunking patterns for a simple 2D array. This is a simplified use-case: consider a square array of integer values. Just for exposition, let’s use a small 10x10 array.
[Illustration: the 10x10 example array]
That array can be organized in memory in a few ways… two common options are row-major order and column-major order:

Row-Major – A row of data occupies a contiguous block of memory. This implies that cells which are logically adjacent vertically are not physically near one another in memory. The ‘distance’ from r0c0 to r0c1 (a one-cell logical move within the row) is short, while the ‘distance’ to r1c0 (a one-cell logical move within the column) is long.

Column-Major – A column of the array occupies a contiguous block of memory. This implies that cells which are adjacent horizontally are not near one another physically in memory.
In either chunk mapping, r3c5 (for example) still fetches the same value – the array still indexes/addresses in the same way – but the chunking plan determines how near an ‘adjacent’ cell is in memory.
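To make the row-major/column-major distinction concrete, here is a small numpy sketch (illustrative only, not part of the tutorial workflow); the strides attribute reports how many bytes separate logically adjacent cells in each layout:
import numpy as np

# The same 10x10 array of 8-byte integers in row-major ('C') and column-major ('F') order
row_major = np.arange(100, dtype=np.int64).reshape(10, 10)   # C order by default
col_major = np.asfortranarray(row_major)                     # same values, Fortran (column-major) order

# strides = bytes to step to the next element along each axis
print(row_major.strides)   # (80, 8): stepping down a column jumps 80 bytes; along a row, only 8
print(col_major.strides)   # (8, 80): the opposite -- columns are contiguous

# Indexing is unchanged: both layouts return the same value for [3, 5]
assert row_major[3, 5] == col_major[3, 5]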
As the size of the array increases, the chunk pattern becomes more relevant. Suppose your data is chunked by rows, and you need to process a column of data – your process will need to read a lot of data, skipping most of it (possibly going to disk to read more data if the dataset is very large), to get the \(i^{th}\) column value for each row. For this analysis, it would be better if the array could be ‘re-chunked’ from row-major order to column-major order. This would favor column operations.
Array size is important to the chunking plan… it could be that an entire row of data in a large 2D array won’t fit into memory (or can’t fit into a contiguous block of memory). This would require chunking within each row, in addition to chunking rows over columns (for a row-major plan). As dimensions are added to the array, the chunk-mapping becomes more complex, and it becomes much more important to chunk the data to match the analysis.
Pros & Cons#
Data that is well-organized to optimize one kind of analysis may not suit another kind of analysis on the same data. Re-chunking is time-consuming, and it produces a separate copy of the dataset, increasing storage requirements. The initial time commitment is a one-time operation so that future analyses can run quickly. The space commitment can be substantial if a complex dataset needs to be organized for many different analyses.
Examining a Small Dataset#
Let’s read a sample dataset and examine how it is chunked.
As a test dataset, we’ve taken a random sampling of 400 stream gages for the month of July 2000 from the National Water Model Reanalysis Version 2.1. The full dataset is part of the AWS Open Data Program, available via the S3 bucket at
s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr
Our subset of that data for use in this tutorial is included in the HyTEST catalog:
import xarray as xr
import intake
import os

# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'osn-hytest-scratch'
# %run ../environment_set_up/Help_AWS_Credentials.ipynb

# Open the HyTEST intake catalog and load the tutorial subset as a dask-backed xarray Dataset
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
sampleData = cat['rechunking-tutorial-osn'].to_dask()
sampleData
<xarray.Dataset> Size: 5MB
Dimensions:     (feature_id: 400, time: 744)
Coordinates:
    elevation   (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 2kB 3109 189899 ... 947070134 1010003783
    gage_id     (feature_id) |S15 6kB dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 6kB 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 2MB dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 2MB dask.array<chunksize=(256, 16), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
The critical items to notice in this output are highlighted here:
<xarray.Dataset>
Dimensions:     (feature_id: 400, time: 744)    <-- NOTE: Two dimensions
                 +--- most coordinates are tied to feature_id dimension
                 |
Coordinates:     V
    elevation   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 3109 189899 239166 ... 947070134 1010003783
    gage_id     (feature_id) |S15 dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2000-07-01 ... 2000-07-31T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(256, 16), meta=np.ndarray>
                 ^^^^  ^^^^^^^^^^
    the data variables are addressed by both dimensions; this is 2D data.
This dataset is a ‘stack’ of two 2D arrays, named ‘streamflow’ and ‘velocity’. The indices into each of those 2D arrays are time on one axis and feature_id on the other. The feature_id is bound to a number of other coordinates, so you can relate/refer to a given feature by its elevation, gage_id, latitude, longitude, or stream order.

Note the chunksize reported for the data variables. It says that the data is stored in blocks mapping 256 adjacent time-steps to 16 adjacent features. (NOTE: The original data is not chunked this way; we’ve deliberately fiddled with the chunk configuration for this tutorial.)

A time-series analysis (i.e. sampling all time-step values for a single feature_id) would require multiple chunks to be fetched.
# Fetch all the time values for a specific feature_id
sampleData['streamflow'].sel(feature_id=1343034)
<xarray.DataArray 'streamflow' (time: 744)> Size: 6kB
dask.array<getitem, shape=(744,), dtype=float64, chunksize=(256,), chunktype=numpy.ndarray>
Coordinates:
    elevation   float32 4B dask.array<chunksize=(), meta=np.ndarray>
    feature_id  int32 4B 1343034
    gage_id     |S15 15B dask.array<chunksize=(), meta=np.ndarray>
    latitude    float32 4B dask.array<chunksize=(), meta=np.ndarray>
    longitude   float32 4B dask.array<chunksize=(), meta=np.ndarray>
    order       int32 4B dask.array<chunksize=(), meta=np.ndarray>
  * time        (time) datetime64[ns] 6kB 2000-07-01 ... 2000-07-31T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
This data has 744 time-steps available, stored in chunks of 256 values each. Three chunks are needed to hold the full time-series for one feature. Not too bad, but not good either.
On the other hand, an analysis which samples all locations for a single point in time would need to fetch multiple chunks also.
# Fetch all the gage values for a single day
sampleData['streamflow'].sel(time='07-01-2000')
<xarray.DataArray 'streamflow' (time: 24, feature_id: 400)> Size: 77kB
dask.array<getitem, shape=(24, 400), dtype=float64, chunksize=(24, 16), chunktype=numpy.ndarray>
Coordinates:
    elevation   (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
  * feature_id  (feature_id) int32 2kB 3109 189899 ... 947070134 1010003783
    gage_id     (feature_id) |S15 6kB dask.array<chunksize=(400,), meta=np.ndarray>
    latitude    (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
    longitude   (feature_id) float32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
    order       (feature_id) int32 2kB dask.array<chunksize=(400,), meta=np.ndarray>
  * time        (time) datetime64[ns] 192B 2000-07-01 ... 2000-07-01T23:00:00
Attributes:
    grid_mapping:  crs
    long_name:     River Flow
    units:         m3 s-1
This dataset has 400 features, broken into chunks of 16 data values each. Many more chunks to fetch. This is much worse: the I/O engine needs to find and retrieve 25 chunks vs 3 in the previous example.
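The chunk counts in these two examples can be worked out from the chunk sizes alone; a quick back-of-the-envelope sketch (plain arithmetic, independent of the dataset):
import math

# chunk shape: 256 time-steps x 16 features, over a 744 x 400 array
time_steps, features = 744, 400
chunk_time, chunk_feat = 256, 16

# One feature_id, all 744 time-steps: the selection crosses every chunk along the time axis
print(math.ceil(time_steps / chunk_time))   # 3 chunks

# One day (24 hourly steps) for all 400 features: 24 steps fit within a single time
# chunk, but the selection crosses every chunk along the feature axis
print(math.ceil(features / chunk_feat))     # 25 chunks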
If we were going to do either of those analyses on a very large dataset with this pattern, we’d want to re-chunk the data to optimize for our read pattern.
Re-Chunking the Sample Data#
This is a trivial example, due to the small size of the dataset – it all fits in memory easily. But it is worth doing, as the same concepts will apply when we take this to the full-sized data.
The first thing we need is a chunk plan describing the chunk layout we want. This can be generated in various ways; for this dataset, it’s easy enough to write it manually:
# Numbers are *size* of the chunk.
chunk_plan = {
'streamflow': {'time': 744, 'feature_id': 1}, # all time records in one chunk for each feature_id
'velocity': {'time': 744, 'feature_id': 1},
'elevation': (400,),
'gage_id': (400,),
'latitude': (400,),
'longitude': (400,),
'order': (400,),
'time': (744,),
'feature_id': (400,)
}
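If hand-writing the plan is impractical (for instance, with many variables), a plan with the same structure can be built programmatically from the dataset itself. A rough sketch, assuming the same target layout as the manual plan above and that all coordinate variables are one-dimensional:
# Sketch: derive the chunk plan from the dataset's own sizes.
# Data variables: the full time axis and one feature per chunk.
# 1-D coordinate variables: one whole-array chunk each.
chunk_plan = {}
for var in sampleData.data_vars:
    chunk_plan[var] = {'time': sampleData.sizes['time'], 'feature_id': 1}
for coord in sampleData.coords:
    chunk_plan[coord] = (sampleData.sizes[sampleData[coord].dims[0]],)
chunk_plan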
With this plan, we can ask rechunker to re-write the data using the prescribed chunking pattern.
import rechunker
outfile = r"/tmp/outfile.zarr"
result = rechunker.rechunk(
sampleData,
chunk_plan,
"2GB", #<--- Max Memory
outfile ,
temp_store="/tmp/scratch.zarr"
)
_ = result.execute() # Note that we must specifically direct rechunk to calculate.
# without the call to execute(), the zarr dataset will be empty, and result will hold only
# a 'task graph' outlining the calculation steps.
Note that rechunker.rechunk does not overwrite any data. If it sees that /tmp/outfile.zarr or /tmp/scratch.zarr already exist, it will balk and likely raise an exception. Be sure that these locations do not exist.
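One way to guard against that (a small sketch, assuming the same /tmp paths used above) is to clear out leftover stores before calling rechunk:
import os
import shutil

# Remove leftovers from any previous run so rechunker can create fresh stores
for store in ["/tmp/outfile.zarr", "/tmp/scratch.zarr"]:
    if os.path.exists(store):
        shutil.rmtree(store)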
The rechunker also writes a minimalist data group, meaning that the variable metadata is not consolidated. Consolidating it is not a required step, but it will really speed up future workflows when the data is read back in.
import zarr

# Consolidate the variable metadata into a single key so future opens are faster
_ = zarr.consolidate_metadata(outfile)
Results#
Let’s read in the resulting re-chunked dataset to see how it looks:
reChunkedData = xr.open_zarr(outfile)
reChunkedData
Note here that for both streamflow and velocity, the chunksize in the time dimension is 744 (the total number of time steps). Analyses which fetch all time-step values for a given feature_id will prefer this chunking strategy.
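A quick way to confirm the new layout (a sanity check, not part of the original notebook) is to look at the chunk sizes on the re-chunked variables:
# Each chunk should now hold the full time axis for a single feature
print(reChunkedData['streamflow'].data.chunksize)   # expected: (744, 1)
print(reChunkedData['velocity'].data.chunksize)     # expected: (744, 1)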
Comparison#
Before Re-Chunking:#
sampleData['streamflow'].sel(feature_id=1343034)
# Note: three chunks needed to service a single feature_id
After re-chunking:#
reChunkedData['streamflow'].sel(feature_id=1343034)
# All data for the specified feature_id is in a single chunk
Cleaning Up#
import shutil
if os.path.exists(outfile):
print(f"removing {outfile}")
shutil.rmtree(outfile)
if os.path.exists(r"/tmp/scratch.zarr"):
print("removing scratch space")
shutil.rmtree(r"/tmp/scratch.zarr")