Rechunking#

The goal of this notebook is to learn how to “rechunk” data. This will be a culmination of all the previous introductory material where we will:

  1. Read in a Zarr store

  2. Check the current chunking

  3. Choose a new chunk shape

  4. Rechunk using Rechunker

  5. Confirm the proper creation of the Zarr store by Rechunker

%xmode Minimal
import xarray as xr
import fsspec
from rechunker import rechunk
import zarr
import shutil
import numpy as np
Exception reporting mode: Minimal

Read in a Zarr Store#

For the dataset in this tutorial, we will use the data from the National Water Model Reanalysis Version 2.1. The full dataset is part of the AWS Open Data Program, available via the S3 bucket at: s3://noaa-nwm-retro-v2-zarr-pds/.

As this is a Zarr store, we can easily read it in directly with xarray.open_dataset(), including the keyword chunks={} to make sure dask loads the data using the stored chunks’ shape and size.

file = fsspec.get_mapper('s3://noaa-nwm-retro-v2-zarr-pds', anon=True)
ds = xr.open_dataset(file, chunks={}, engine='zarr')
ds
<xarray.Dataset> Size: 35TB
Dimensions:         (time: 227904, feature_id: 2729077)
Coordinates:
  * feature_id      (feature_id) int32 11MB 101 179 ... 1180001803 1180001804
    latitude        (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray>
    longitude       (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray>
  * time            (time) datetime64[ns] 2MB 1993-01-01 ... 2018-12-31T23:00:00
Data variables:
    elevation       (time, feature_id) float32 2TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    order           (time, feature_id) int32 2TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    qBtmVertRunoff  (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    qBucket         (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    qSfcLatRunoff   (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    q_lateral       (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    streamflow      (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
    velocity        (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray>
Attributes: (12/17)
    Conventions:                CF-1.6
    cdm_datatype:               Station
    code_version:               v5.1.0-alpha11
    dev:                        dev_ prefix indicates development/internal me...
    dev_NOAH_TIMESTEP:          3600
    dev_OVRTSWCRT:              1
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    2018-12-28_00:00:00
    model_total_valid_times:    2208
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1

Check the Current Chunk Shape and Size#

From the output, we can see there are two dimensions: time of length 227,904 and feature_id of length 2,729,077. Both time and feature_id are coordinates, along with two extra coordinates, latitude and longitude, which are tied to feature_id. Inspecting these extra coordinates in detail, we notice they are “chunked”, but only into a single chunk of size 10.41 MiB. This is good, as we want them in a single chunk for writing, such that they are fully read in with each data variable chunk.

Finally, there are eight data variables, each having both dimensions, meaning they are 2D data. Examining the variables in detail, they all have chunks of shape {'time': 672, 'feature_id': 30000} and a size of 153.81 MiB or 76.9 MiB (depending on whether the variable is 64- or 32-bit, respectively). Additionally, each variable is a whopping 4.53 or 2.26 TiB in memory, which means the whole dataset is almost 32 TiB!
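
If you prefer to check this programmatically rather than reading the dataset repr, a minimal sketch along these lines (not part of the original workflow, and touching only metadata, not the data itself) reports each variable's chunk shape and approximate chunk size:

# Sketch: inspect chunking programmatically; each variable is backed by a dask array,
# so we can read its block sizes and estimate the size of one chunk in MiB.
for name, da in ds.data_vars.items():
    chunk_shape = [c[0] for c in da.chunks]  # size of the first (regular) chunk along each dim
    chunk_mib = np.prod(chunk_shape) * da.dtype.itemsize / 2**20
    print(f"{name}: chunk shape {chunk_shape}, ~{chunk_mib:.2f} MiB per chunk")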

This is a bit more than we want to work with in our example. So, let’s go ahead and select a subset of the data that is about 1 GiB in memory. While this is no longer a larger-than-memory dataset, it is still reasonably large, and it will work well for this example without taking forever to fetch and rechunk the data. For the subset, let’s only look at water year 2018 (i.e., October 2017 to September 2018) for the variables streamflow and velocity at the first 15,000 feature_ids.
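
As a quick sanity check on that “about 1 GiB” figure, here is a back-of-the-envelope sketch (not part of the original notebook) of the size of one float64 variable in the planned subset:

# Rough size of one float64 variable for one water year of hourly data at 15,000 features
hours_per_water_year = 365 * 24                      # 8,760 hourly time steps
print(hours_per_water_year * 15000 * 8 / 2**30)      # ≈ 0.98 GiB per variable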

Note

As we are not working on AWS, we have selected this subset to minimize the number of chunks that need to be downloaded from the S3 bucket.

ds = ds[['streamflow', 'velocity']]
ds = ds.isel(feature_id=slice(0, 15000))
ds = ds.sel(time=slice('2017-10-01', '2018-09-30'))
ds
<xarray.Dataset> Size: 2GB
Dimensions:     (time: 8760, feature_id: 15000)
Coordinates:
  * feature_id  (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424
    latitude    (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
    longitude   (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
  * time        (time) datetime64[ns] 70kB 2017-10-01 ... 2018-09-30T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 1GB dask.array<chunksize=(120, 15000), meta=np.ndarray>
    velocity    (time, feature_id) float64 1GB dask.array<chunksize=(120, 15000), meta=np.ndarray>
Attributes: (12/17)
    Conventions:                CF-1.6
    cdm_datatype:               Station
    code_version:               v5.1.0-alpha11
    dev:                        dev_ prefix indicates development/internal me...
    dev_NOAH_TIMESTEP:          3600
    dev_OVRTSWCRT:              1
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    2018-12-28_00:00:00
    model_total_valid_times:    2208
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1

Now, our subset of data is only about 1 GiB per data variable and has a chunk shape of {'time': 672, 'feature_id': 15000} with a size of 76.9 MiB. This is a good chunk size, falling within the optimal range of 10 to 200 MiB. However, the chunk shape may not be an optimal choice for our analysis, as each chunk spans the full feature_id dimension (i.e., all feature IDs for a block of times are read in a single chunk).
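
We can confirm that 76.9 MiB figure programmatically; a quick sketch (not in the original notebook) using the dask array's largest chunk:

# Sketch: the regular (largest) chunk of the subset and its size in MiB
chunk_shape = ds.streamflow.data.chunksize   # largest chunk along each dimension
print(chunk_shape, f"{np.prod(chunk_shape) * ds.streamflow.dtype.itemsize / 2**20:.1f} MiB")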

Choose a New Chunk Shape and Size#

To decide on a new chunk shape and size, we need to determine how we will use the data. As we just discussed, an analysis that samples all locations for a single point in time needs to fetch only a single chunk, which is perfect for that use case. However, a time-series analysis (i.e., sampling all time-step values for a single feature_id) would require reading 14 chunks for one feature, so many more chunks must be fetched for this read pattern. Thinking of how the data is typically used, the preferred layout for the streamflow variable is likely time-series-wise chunking, as this variable is most often analyzed as a full time series at a single location. The same goes for velocity. However, for the purpose of this example, let’s assume we don’t know how velocity will be used and give it a different chunking pattern, one that balances the number of chunks between the two dimensions.
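
To make the cost of each access pattern concrete, here is a rough sketch (assumed arithmetic, not from the original notebook) of how many chunks each pattern touches under the current chunking, ignoring the exact alignment of chunk boundaries:

# Sketch: number of chunks touched by each access pattern under the current chunking
import math

current_chunks = {'time': 672, 'feature_id': 15000}   # the subset's current chunk shape
n_time_chunks = math.ceil(len(ds.time) / current_chunks['time'])
n_feature_chunks = math.ceil(len(ds.feature_id) / current_chunks['feature_id'])

print(f"Chunks read for one full time series (single feature_id): {n_time_chunks}")    # 14
print(f"Chunks read for one full spatial snapshot (single time): {n_feature_chunks}")  # 1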

nfeature = len(ds.feature_id)
ntime = len(ds.time)

# Chunk streamflow as a full time series with a single feature_id per chunk
streamflow_chunk_plan = {'time': ntime, 'feature_id': 1}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
print("STREAMFLOW \n"
      f"Shape of chunk: {streamflow_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % streamflow_chunk_plan['time']) / streamflow_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % streamflow_chunk_plan['feature_id']) / streamflow_chunk_plan['feature_id']} \n"
      f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")

# Chunk velocity in a balanced pattern: split each dimension into the same number of chunks
chunks_per_dim = 3
velocity_chunk_plan = {'time': ntime // chunks_per_dim, 'feature_id': nfeature // chunks_per_dim}
bytes_per_value = ds.velocity.dtype.itemsize
total_bytes = velocity_chunk_plan['time'] * velocity_chunk_plan['feature_id'] * bytes_per_value
velocity_MiB = total_bytes / (2 ** 10) ** 2
print("VELOCITY \n"
      f"Shape of chunk: {velocity_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % velocity_chunk_plan['time']) / velocity_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % velocity_chunk_plan['feature_id']) / velocity_chunk_plan['feature_id']} \n"
      f"Chunk size: {velocity_MiB:.2f} [MiB]")
STREAMFLOW 
Shape of chunk: {'time': 8760, 'feature_id': 1} 
Partial 'time' chunk remainder: 0.0 
Partial 'feature_id' chunk remainder: 0.0 
Chunk size: 0.07 [MiB] 

VELOCITY 
Shape of chunk: {'time': 2920, 'feature_id': 5000} 
Partial 'time' chunk remainder: 0.0 
Partial 'feature_id' chunk remainder: 0.0 
Chunk size: 111.39 [MiB]

The time-series streamflow chunking with a single feature ID per chunk is way too small. If we increase the number of feature IDs per chunk by a factor of 1000, we should get a chunk of about the right size. As for velocity, splitting the data into three chunks along each dimension resulted in no partial chunks and a chunk size of 111 MiB, which is within our optimal chunk size range. So, let’s stick with the velocity chunks and recheck the streamflow chunking with 1000 features per time-series chunk.
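
Where does the factor of 1000 come from? A quick back-of-the-envelope sketch (the 64 MiB target below is an assumption, not from the original notebook) shows roughly how many features per chunk are needed when each chunk holds the full time series:

# Sketch: features per chunk needed to hit a ~64 MiB chunk holding the full time series
target_mib = 64
features_per_chunk = int(target_mib * 2**20 / (ntime * ds.streamflow.dtype.itemsize))
print(features_per_chunk)   # ~957, so 1000 is a convenient round number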

streamflow_chunk_plan = {'time': ntime, 'feature_id': 1000}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
print("STREAMFLOW \n"
      f"Chunk of shape {streamflow_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % streamflow_chunk_plan['time']) / streamflow_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % streamflow_chunk_plan['feature_id']) / streamflow_chunk_plan['feature_id']} \n"
      f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")
STREAMFLOW 
Chunk of shape {'time': 8760, 'feature_id': 1000} 
Partial 'time' chunk remainder: 0.0 
Partial 'feature_id' chunk remainder: 0.0 
Chunk size: 66.83 [MiB] 

Alright, no remainders and a reasonable chunk size. Time to get to rechunking!

Rechunk with Rechunker#

This is a relatively trivial example due to the small size of our dataset subset. As the whole subset can easily fit into memory, rechunking is largely unnecessary in terms of optimizing I/O (although parallelism is still a consideration). But it is worth doing, as the same concepts apply when working with the full dataset.

The first thing we need to determine is a chunk plan describing the chunk layout we want. We already created a basic version of this above for the chunk shapes of streamflow and velocity. We just need to place them into a single dictionary along with the chunk shapes for the coordinates.

chunk_plan = {
    'streamflow': streamflow_chunk_plan,
    'velocity': velocity_chunk_plan,
    # We don't want any of the coordinates chunked
    'latitude': (nfeature,),
    'longitude': (nfeature,),
    'time': (ntime,),
    'feature_id': (nfeature,)
}
chunk_plan
{'streamflow': {'time': 8760, 'feature_id': 1000},
 'velocity': {'time': 2920, 'feature_id': 5000},
 'latitude': (15000,),
 'longitude': (15000,),
 'time': (8760,),
 'feature_id': (15000,)}

With this plan, we can ask Rechunker to re-write the data using the prescribed chunking pattern. Rechunker will read the data and rechunk it using an intermediate Zarr store for efficiency. This will produce a new Zarr store with the chunk shapes we specified in chunk_plan.

outfile = "rechunked_nwm.zarr"
temp_store = "/tmp/scratch.zarr"
try:
    result = rechunk(
        ds,
        target_chunks=chunk_plan,
        max_mem="2GB",
        target_store=outfile,
        temp_store=temp_store
    )
    display(result)
except Exception as e:
    print(e)
Zarr requires uniform chunk sizes except for final chunk. Variable named 'streamflow' has incompatible dask chunks: ((120, 672, 672, 672, 672, 672, 672, 672, 672, 672, 672, 672, 672, 576), (15000,)). Consider rechunking using `chunk()`.

Oh, that is not what we wanted! We got an error indicating a mismatch between the chunks we are reading and those we want to write. Looking at the error, it is saying that the first time chunk we are reading is a partial chunk rather than a full chunk. So, when Rechunker tries to write the first target chunk, it has to read more than one source chunk. Having multiple chunks map onto a single written chunk can corrupt the output when done in parallel with dask. Thank goodness Rechunker caught this for us! Reading the recommended fix, the way around this is to call chunk() and reset the chunking on the original data. In other words, after we select the subset from the dataset, we need to realign the chunks such that the first chunk is not a partial chunk. This is simple enough to do; so much so that we can do it when passing the dataset subset to Rechunker.
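
We can see the irregular chunking directly; a quick check (a sketch, not in the original notebook) of the dask chunk sizes along time shows the partial edge chunks that triggered the error:

# Sketch: the distinct dask chunk sizes along the time dimension of the subset
print(set(ds['streamflow'].chunks[0]))   # e.g. {120, 672, 576}; uniform chunking would show only {672}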

Note

rechunker.rechunk does not overwrite any data. If it sees that rechunked_nwm.zarr or /tmp/scratch.zarr already exist, it will raise an exception. Be sure that these locations do not exist before calling Rechunker.

# We must delete the partially written Zarr stores from the failed attempt
shutil.rmtree(outfile)
shutil.rmtree(temp_store)

result = rechunk(
    ds.chunk({'time': 672, 'feature_id': 15000}),
    target_chunks=chunk_plan,
    max_mem="2GB",
    target_store=outfile,
    temp_store=temp_store
)
result

Rechunked

Source
<xarray.Dataset> Size: 2GB
Dimensions:     (time: 8760, feature_id: 15000)
Coordinates:
  * feature_id  (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424
    latitude    (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
    longitude   (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
  * time        (time) datetime64[ns] 70kB 2017-10-01 ... 2018-09-30T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 1GB dask.array<chunksize=(672, 15000), meta=np.ndarray>
    velocity    (time, feature_id) float64 1GB dask.array<chunksize=(672, 15000), meta=np.ndarray>
Attributes: (12/17)
    Conventions:                CF-1.6
    cdm_datatype:               Station
    code_version:               v5.1.0-alpha11
    dev:                        dev_ prefix indicates development/internal me...
    dev_NOAH_TIMESTEP:          3600
    dev_OVRTSWCRT:              1
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    2018-12-28_00:00:00
    model_total_valid_times:    2208
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1
Intermediate None
Target None

Alright, that worked with no problems! Now, we must explicitly direct Rechunker to perform the computation. To do this, we call execute() on our result Rechunked object. Without the call to execute(), the Zarr store will be empty: result only holds a ‘task graph’ outlining the calculation steps.

Tip

Rechunker also writes a minimalist data group, meaning that the variable metadata is not consolidated. Consolidating it is not a required step, but it will really speed up future workflows when the data is read back in with xarray.

_ = result.execute()
_ = zarr.consolidate_metadata(outfile)

Confirm the Creation of the Zarr Store by Rechunker#

Let’s read in the resulting re-chunked dataset to confirm it turned out how we intended.

ds_rechunked = xr.open_zarr(outfile)
ds_rechunked
<xarray.Dataset> Size: 2GB
Dimensions:     (feature_id: 15000, time: 8760)
Coordinates:
  * feature_id  (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424
    latitude    (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
    longitude   (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray>
  * time        (time) datetime64[ns] 70kB 2017-10-01 ... 2018-09-30T23:00:00
Data variables:
    streamflow  (time, feature_id) float64 1GB dask.array<chunksize=(8760, 1000), meta=np.ndarray>
    velocity    (time, feature_id) float64 1GB dask.array<chunksize=(2920, 5000), meta=np.ndarray>
Attributes: (12/17)
    Conventions:                CF-1.6
    cdm_datatype:               Station
    code_version:               v5.1.0-alpha11
    dev:                        dev_ prefix indicates development/internal me...
    dev_NOAH_TIMESTEP:          3600
    dev_OVRTSWCRT:              1
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    2018-12-28_00:00:00
    model_total_valid_times:    2208
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1

Great, our chunk shapes are exactly what we specified! Let’s do one final check comparing our original subset of the dataset with our new rechunked dataset, so we can confirm nothing unexpected happened during rechunking.

Note

For our small example dataset this is easy, as the data fits into memory. More memory-efficient approaches should be used for larger datasets.

np.abs(ds - ds_rechunked).compute().max()
/home/conda/hytest/28b61f30-1750444858-150-rechunking/lib/python3.13/site-packages/dask/array/core.py:4998: PerformanceWarning: Increasing number of chunks by factor of 14
  result = blockwise(
<xarray.Dataset> Size: 16B
Dimensions:     ()
Data variables:
    streamflow  float64 8B 0.0
    velocity    float64 8B 0.0

Perfect! The maximum absolute difference for both the streamflow and velocity variables is 0. In other words, the two datasets are identical, and Rechunker worked as expected.
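
For a dataset too large to difference in memory (as the Note above cautions), a lazier comparison along these lines could be used instead. This is only a sketch; note that NaNs compare unequal, so it assumes no missing values:

# Sketch: reduce the comparison to one boolean per variable without materializing
# the full difference in memory (assumes no NaNs, which compare unequal).
matches = (ds == ds_rechunked).all().compute()
print(matches)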

Now that you know how to rechunk a Zarr store using Rechunker, you know all of the basics of chunking. You are now ready to explore more advanced chunking topics if you are interested!

Clean Up#

As we don’t want to keep this rechunked Zarr on our local machine, let’s go ahead and delete it.

shutil.rmtree(outfile)
shutil.rmtree(temp_store)