Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Writing Chunked Files

The goal of this notebook is to learn how to load a collection of NetCDF files, chunk the data, write the data in Zarr format, and confirm the proper creation of the Zarr store. We will be writing to our local storage for simplicity (as this is just a tutorial notebook), but you can easily change the output path to be anywhere including cloud storage.

import fsspec
import xarray as xr
import numpy as np
import hvplot.xarray

Example Dataset

In this notebook, we will use the daily gridMET precipitation dataset as an example for reading data and writing to Zarr. The data is currently hosted on the HyTEST OSN as a collection of NetCDF files. To get the files, we will use fsspec to open each year of precipitation data to a list. Then, we can read in the all the files at once using xarray.open_mfdataset.

fs = fsspec.filesystem(
    's3',
    anon=True,   # anonymous = does not require credentials
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
precip_files = [fs.open(file) for file in fs.glob('s3://mdmf/gdp/netcdf/gridmet/gridmet/pr_*.nc')]
ds = xr.open_mfdataset(
    precip_files,
    chunks={},
    engine='h5netcdf'
)
ds

Selecting Chunk Shape and Size

As we can see in the rich HTML output of our dataset, the NetCDF files were already chunked with pattern of {'day': 61, 'lat': 98, 'lon': 231}. However, the size of these chunks are relatively small and near the lower limit of an acceptable chunk size of 10 MiB. So, it would be better if we could increase our chunk sizes to say 70-110 MiB. To do this, we will simply use multiples of the current chunks so we don’t have to completely rechunk the dataset (grouping chunks is way faster than completely changing chunk shape). Our goal will be to try and evenly distribute the number of chunks between the time and space dimensions.

First, let’s check how many chunks are in the temporal and spatial dimensions with the current chunk shape.

ds.precipitation_amount.encoding
time_chunk_shape, lat_chunk_shape, lon_chunk_shape = ds.precipitation_amount.encoding['chunksizes']

print(f"Number of temporal chunks: {np.ceil(len(ds.day) / time_chunk_shape)}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / lat_chunk_shape) * np.ceil(len(ds.lon) / lon_chunk_shape)}")

Okay, so there are about 7 times as many time chunks as there are spatial chunks. Let’s try to balance this better so there are an approximately equal amount. We will do this by simply increasing the time chunk shape by a factor of 7. Then, we can double check our chunk size to see if it aligns with our chunk size goal.

chunk_plan = {
    "day": time_chunk_shape * 7, 
    "lat" : lat_chunk_shape,
    "lon" : lon_chunk_shape
}

bytes_per_value = ds.precipitation_amount.dtype.itemsize

print(f"Chunk Plan:{chunk_plan}")
print(f"Number of temporal chunks: {np.ceil(len(ds.day) / chunk_plan['day'])}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / chunk_plan['lat']) * np.ceil(len(ds.lon) / chunk_plan['lon'])}")
print(f"Chunk size in MiB: {(bytes_per_value * chunk_plan['day'] * chunk_plan['lat'] * chunk_plan['lon']) / 2 ** 10 / 2 ** 10:0.2f}")

Nice! That got us the exact same number of chunks and the chunk size we were looking for. As a final check, let’s make sure none of the ending chunks contain <50% data.

print(f"Number of day chunks: {(len(ds.day) / chunk_plan['day'])}")
print(f"Number of lat chunks: {(len(ds.lat) / chunk_plan['lat'])}")
print(f"Number of lon chunks: {(len(ds.lon) / chunk_plan['lon'])}")

Perfect! So the final day chunk is only 77% full, but that is good enough. Let’s now chunk the dataset to our chunking plan.

ds = ds.chunk(chunk_plan)
ds

Now, let’s save this data to a Zarr!

Writing Chunked Zarr

As discussed in the Xarray User-guide for reading and writing Zarr, chunks are specified to our Zarr store in one of three ways in the preferential order of:

  1. Manual chunk sizing through the use of the encoding argument

  2. Automatic chunking based on chunks of the dask arrays

  3. Default chunk behavior determined by the Zarr library

In our case, we updated the dask array chunks by calling ds.chunk(). Therefore, we have the correct chunks and should be good to go.

# Let's actually use only one time chunk as this is an
# example and we don't want to write 90 GiB
ds_write = ds.isel(day=slice(0, chunk_plan['day']))

outfile = "gridmet_precip.zarr"
_ = ds_write.to_zarr(outfile)

Now, let’s read in the just-written dataset to verify its integrity and chunks.

ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read

Great! Everything looks good!

Assessing Compression

Now that our Zarr store is made, let’s check how much the data was compressed. By default, Zarr uses the Blosc compressor when calling xarray.Dataset.to_zarr() if we don’t specify a compressor in the encoding. So, our data should be compressed by default, and we can examine each chunk on disk to confirm their compression factor.

To examine this, let’s create a new local filesystem to get the file info.

fs_local = fsspec.filesystem('local')

Now, estimate the file sizes.

# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})

print("Stored Chunk Size Summary Statistics [MiB]:\n"
      f"Total: {filesize_MiB.sum():.4f}\n"
      f"Minimum: {filesize_MiB.min():.4f}\n"
      f"Mean:  {filesize_MiB.mean():.4f}\n"
      f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')

As we can see, the total dataset (excluding coordinates) is only 85 MiB on disk, with chunk sizes varying from 75 KiB to 4.9 MiB. This size is drastically smaller than the quoted total size for the xarray output, which said 2.58 GiB. Same for the individual chunks, which were quoted at 73.75 MiB. Let’s get an exact comparison and compression ratio for the data we read in.

bytes_per_value = ds_read.precipitation_amount.dtype.itemsize
total_size = ds_read['precipitation_amount'].size * bytes_per_value
chunk_size = np.array(ds_read['precipitation_amount'].encoding['chunks']).prod() * bytes_per_value
print("Read-in Chunk Size Summary Statistics:\n"
      f"Total: {total_size / (2**10)**3:.4f} [GiB]\n"
      f"Chunks: {chunk_size / (2**10)**2:.4f} [MiB]")

print("\n"
      "Compression Ratio Summary Statistics:\n"
      f"Total: {total_size / filesize.sum():.3f}\n"
      f"Minimum: {(chunk_size / filesize).min():.4f}\n"
      f"Mean:  {(chunk_size / filesize).mean():.4f}\n"
      f"Maximum: {(chunk_size / filesize).max():.4f}")

Ooph! This tells us that we get an astonishing average compression ratio of 30:1 on this particular data. Pretty good results.

Appending New Chunk

Since this compression is so good, let’s go ahead and add another time chunk onto our existing Zarr store. This is simple in xarray, especially since we are just appending another time chunk. All we have to do is add append_dim to our .to_zarr() call to append to the time dimension.

ds_write = ds.isel(day=slice(chunk_plan['day'], chunk_plan['day']* 2))
_ = ds_write.to_zarr(outfile, append_dim='day')

Now, let’s read in the appended dataset to verify everything worked.

ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read

That looks like it worked as expected! Let’s now double check the compression on these files.

# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})

print("Stored Chunk Size Summary Statistics [MiB]:\n"
      f"Total: {filesize_MiB.sum():.4f}\n"
      f"Minimum: {filesize_MiB.min():.4f}\n"
      f"Mean:  {filesize_MiB.mean():.4f}\n"
      f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')

Looks like we stayed at the same levels of compression which is great.

Clean Up

So, hopefully now you know the basics of how to create a Zarr store from some NetCDF files and set its chunks’ shape. The same methods would apply when rechunking a dataset, which we will get into next.

As we don’t want to keep this Zarr on our local machine, let’s go ahead and delete it.

# Clean up by deleting the zarr store
fs_local.rm(outfile, recursive=True)