
Rechunking

The goal of this notebook is to learn how to “rechunk” data. This will be a culmination of all the previous introductory material where we will:

  1. Read in a Zarr store

  2. Check the current chunking

  3. Choose a new chunk shape

  4. Rechunk using Rechunker

  5. Confirm the proper creation of the Zarr store by Rechunker

%xmode Minimal
import xarray as xr
import fsspec
from rechunker import rechunk
import zarr
import shutil
import numpy as np

Read in a Zarr Store

For the dataset in this tutorial, we will use the data from the National Water Model Reanalysis Version 2.1. The full dataset is part of the AWS Open Data Program, available via the S3 bucket at: s3://noaa-nwm-retro-v2-zarr-pds/.

As this is a Zarr store, we can easily read it in directly with xarray.open_dataset(), including the keyword chunks={} to make sure dask loads the data using the stored chunks’ shape and size.

file = fsspec.get_mapper('s3://noaa-nwm-retro-v2-zarr-pds', anon=True)
ds = xr.open_dataset(file, chunks={}, engine='zarr')
ds

Check the Current Chunk Shape and Size

From the output, we can see there are two dimensions: time of length 227,904 and feature_id of length 2,729,077. Both time and feature_id are coordinates, along with two extra coordinates, latitude and longitude, which are tied to feature_id. Inspecting these extra coordinates in detail, we notice they are “chunked”, but only into a single chunk of size 10.41 MiB. This is good, as we will want them in a single chunk for writing, such that they are fully read in with each data variable chunk. Finally, there are eight data variables, all having both of the dimensions, meaning they are 2D data. Examining the variables in detail, they all have chunks of shape {'time': 672, 'feature_id': 30000} and a size of 153.81 MiB or 76.9 MiB (depending on whether the variable is 64-bit or 32-bit, respectively). Additionally, we can see that each variable is a whopping 4.53 or 2.26 TiB in memory, which means the whole dataset is almost 32 TiB!
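If you prefer to pull this information out programmatically rather than reading it off the dataset repr, a loop like the following works (a minimal sketch, assuming the variables are dask-backed as above; the exact numbers depend on the store at the time you read it):

for name, da in ds.data_vars.items():
    # Chunk shape and per-chunk size reported by the underlying dask array
    chunk_shape = dict(zip(da.dims, da.data.chunksize))
    chunk_mib = np.prod(da.data.chunksize) * da.dtype.itemsize / 2 ** 20
    print(f"{name}: chunk shape {chunk_shape}, chunk size {chunk_mib:.2f} MiB, "
          f"total {da.nbytes / 2 ** 40:.2f} TiB")
print(f"Whole dataset: {ds.nbytes / 2 ** 40:.2f} TiB")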

This is a bit more than we want to work with in our example. So, let’s go ahead and select a subset of the data that is about 1 GiB in memory. While this is no longer a larger-than-memory dataset, it is still reasonably large and will work well for this example without taking forever to fetch and rechunk the data. For the subset, let’s only look at water year 2018 (i.e., October 2017 to September 2018) for the variables streamflow and velocity for the first 15,000 feature_ids.

ds = ds[['streamflow', 'velocity']]
ds = ds.isel(feature_id=slice(0, 15000))
ds = ds.sel(time=slice('2017-10-01', '2018-09-30'))
ds

Now, our subset of data is only about 1 GiB per data variable and has a chunk shape of {'time': 672, 'feature_id': 15000} with a size of 76.9 MiB. This is a good chunk size, falling within the optimal range of 10 to 200 MiB. However, the chunk shape may not be an optimal choice for our analysis, as it is chunked completely by feature_id (i.e., all feature IDs for a given time are read in a single chunk).
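As a quick sanity check (a sketch using the same approach as before), we can confirm the subset’s chunk shape and per-chunk size directly from the dask array backing streamflow:

# Should report a chunk shape of {'time': 672, 'feature_id': 15000} and roughly 77 MiB per chunk
chunk_shape = dict(zip(ds.streamflow.dims, ds.streamflow.data.chunksize))
chunk_mib = np.prod(ds.streamflow.data.chunksize) * ds.streamflow.dtype.itemsize / 2 ** 20
print(f"streamflow: chunk shape {chunk_shape}, chunk size {chunk_mib:.2f} MiB, "
      f"total {ds.streamflow.nbytes / 2 ** 30:.2f} GiB")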

Choose a New Chunk Shape and Size

To decide on a new chunk shape and size, we need to determine how we will use the data. As we just discussed, an analysis that samples all locations for a single point in time needs to fetch only a single chunk, which is perfect for that pattern. However, a time-series analysis (i.e., sampling all time-step values for a single feature_id) would require reading about 13 chunks for one feature. This means many more chunks must be fetched for that read pattern. Thinking of data usage, the preferred chunking for the streamflow data variable is likely along the time dimension (full time series per chunk), as this variable is most often used as a full time series at a single location. The same goes for velocity. However, for the purpose of this example, let’s assume that we don’t know how velocity will be used and give it a different chunking pattern, one where we balance the number of chunks between the two dimensions.
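To make that concrete, here is a rough count of how much each access pattern has to read with the current chunking (a back-of-the-envelope sketch; partial chunks at the edges of the subset can add an extra read):

# Current chunk shape of the subset: {'time': 672, 'feature_id': 15000}
time_chunk, feature_chunk = 672, 15000
print(f"One time step, all feature_ids: ~{len(ds.feature_id) / feature_chunk:.0f} chunk(s)")
print(f"One feature_id, all time steps: ~{len(ds.time) / time_chunk:.0f} chunk(s)")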

nfeature = len(ds.feature_id)
ntime = len(ds.time)

# Time-series chunking: all time steps, one feature_id per chunk
streamflow_chunk_plan = {'time': ntime, 'feature_id': 1}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
print("STREAMFLOW \n"
      f"Shape of chunk: {streamflow_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % streamflow_chunk_plan['time']) / streamflow_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % streamflow_chunk_plan['feature_id']) / streamflow_chunk_plan['feature_id']} \n"
      f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")

# Balanced chunking: split both dimensions into the same number of chunks
chunks_per_dim = 3
velocity_chunk_plan = {'time': ntime // chunks_per_dim, 'feature_id': nfeature // chunks_per_dim}
bytes_per_value = ds.velocity.dtype.itemsize
total_bytes = velocity_chunk_plan['time'] * velocity_chunk_plan['feature_id'] * bytes_per_value
velocity_MiB = total_bytes / (2 ** 10) ** 2
print("VELOCITY \n"
      f"Shape of chunk: {velocity_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % velocity_chunk_plan['time']) / velocity_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % velocity_chunk_plan['feature_id']) / velocity_chunk_plan['feature_id']} \n"
      f"Chunk size: {velocity_MiB:.2f} [MiB]")

The time-series streamflow chunking for a single feature ID has way too small of a chunk size. If we increase the number of feature IDs per chunk by a factor of 1000, we should have the right size for that chunk. As for velocity, the split of the data into three chunks along both dimensions resulted in no partial chunks and a chunk size of 111 MiB, which is within our optimal chunk size range. So, let’s stick with the velocity chunks and recheck the streamflow chunking with 1000 features per time-series chunk.

# Time-series chunking: all time steps, 1000 feature_ids per chunk
streamflow_chunk_plan = {'time': ntime, 'feature_id': 1000}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
print("STREAMFLOW \n"
      f"Shape of chunk: {streamflow_chunk_plan} \n"
      f"Partial 'time' chunk remainder: {(ntime % streamflow_chunk_plan['time']) / streamflow_chunk_plan['time']} \n"
      f"Partial 'feature_id' chunk remainder: {(nfeature % streamflow_chunk_plan['feature_id']) / streamflow_chunk_plan['feature_id']} \n"
      f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")

Alright, no remainders and a reasonable chunk size. Time to get to rechunking!

Rechunk with Rechunker

This is a relatively trivial example, due to the smaller size of our subset of the dataset. As the whole subset can easily fit into memory, chunking is largely unnecessary here in terms of optimizing I/O (however, parallelism is still a consideration). But it is worth doing, as the concepts will apply if we take this to the full dataset.

The first thing we need to determine is a chunk plan that describes the chunk layout we want. We already created basic versions of this above that describe the chunk shapes of streamflow and velocity. We just need to place them into a single dictionary along with chunk shapes for the coordinates.

chunk_plan = {
    'streamflow': streamflow_chunk_plan,
    'velocity': velocity_chunk_plan,
    # We don't want any of the coordinates chunked
    'latitude': (nfeature,),
    'longitude': (nfeature,),
    'time': (ntime,),
    'feature_id': (nfeature,)
}
chunk_plan

With this plan, we can ask Rechunker to re-write the data using the prescribed chunking pattern. Rechunker will read the data and rechunk it using an intermediate Zarr store for efficiency. This will produce a new Zarr store with the chunk shapes we specified in chunk_plan.

outfile = "rechunked_nwm.zarr"
temp_store = "/tmp/scratch.zarr"
try:
    result = rechunk(
        ds,
        target_chunks=chunk_plan,
        max_mem="2GB",
        target_store=outfile,
        temp_store=temp_store
    )
    display(result)
except Exception as e:
    print(e)

Oh, that is not what we wanted! We got an error indicating overlap in chunks between the read and write. Looking at the error, it is saying that the first time chunk we are reading is a partial chunk, not a full chunk. So, when Rechunker tries to read the data and write the first rechunked chunk, it has to read two source chunks to write that one target chunk. This is a one-to-many write, which can corrupt our file when done in parallel with dask. Thank goodness Rechunker caught this for us! Reading the recommended fix, it seems the way to go about this is to call chunk() and reset the chunking on the original data. In other words, after we select the subset from the dataset, we need to realign the chunks such that the first chunk is not a partial chunk. This is simple enough to do; so much so that we can just do it when passing the dataset subset to Rechunker.

# We must delete the started rechunked zarr stores
shutil.rmtree(outfile)
shutil.rmtree(temp_store)

result = rechunk(
    ds.chunk({'time': 672, 'feature_id': 15000}),
    target_chunks=chunk_plan,
    max_mem="2GB",
    target_store=outfile,
    temp_store=temp_store
)
result

Alright, that worked with no problems! Now, we must explicitly direct Rechunker to perform the computation. To do this, we call execute() on our result Rechunked object. Without the call to execute(), the Zarr store will be empty, and result will only hold a ‘task graph’ outlining the calculation steps.

_ = result.execute()
# Consolidate the metadata of the new Zarr store so it can be opened efficiently
_ = zarr.consolidate_metadata(outfile)

Confirm the Creation of the Zarr Store by Rechunker

Let’s read in the resulting re-chunked dataset to confirm it turned out how we intended.

ds_rechunked = xr.open_zarr(outfile)
ds_rechunked

Great, our chunk shapes are exactly what we specified! Let’s do one final check comparing our original subset of the dataset with our new rechunked dataset, so we can confirm nothing unexpected happened during rechunking.

np.abs(ds - ds_rechunked).compute().max()

Perfect! The maximum absolute difference between the original and rechunked data is 0 for both the streamflow and velocity variables. In other words, they are exactly the same, and Rechunker worked as expected.
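As an alternative check (a sketch; xarray’s testing helpers are usually seen in test suites, but they work just as well here), we could let xarray do the comparison for us. The assertion below raises an error if the values, dimensions, or coordinates differ, and passes silently otherwise:

# Raises an AssertionError if the two datasets differ; silent if they match
xr.testing.assert_allclose(ds, ds_rechunked)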

Now that you know how to rechunk a Zarr store using Rechunker, you should know all of the basics of chunking. You are now ready to explore more advanced topics in chunking if you are interested!

Clean Up

As we don’t want to keep this rechunked Zarr on our local machine, let’s go ahead and delete it.

shutil.rmtree(outfile)
shutil.rmtree(temp_store)