The goal of this notebook is to learn how to load a collection of NetCDF files, chunk the data, write the data in Zarr format, and confirm the proper creation of the Zarr store. We will be writing to local storage for simplicity (as this is just a tutorial notebook), but you can easily change the output path to be anywhere, including cloud storage.
import fsspec
import xarray as xr
import numpy as np
import hvplot.xarray
Example Dataset
In this notebook, we will use the daily gridMET precipitation dataset as an example for reading data and writing to Zarr.
The data is currently hosted on the HyTEST OSN as a collection of NetCDF files.
To get the files, we will use fsspec to open each year of precipitation data into a list of file-like objects.
Then, we can read in all of the files at once using xarray.open_mfdataset.
fs = fsspec.filesystem(
's3',
anon=True, # anonymous = does not require credentials
client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
precip_files = [fs.open(file) for file in fs.glob('s3://mdmf/gdp/netcdf/gridmet/gridmet/pr_*.nc')]
ds = xr.open_mfdataset(
precip_files,
chunks={},  # use the engine's preferred (on-disk) chunks as the dask chunks
engine='h5netcdf'
)
ds
Selecting Chunk Shape and Size
As we can see in the rich HTML output of our dataset, the NetCDF files were already chunked with a pattern of {'day': 61, 'lat': 98, 'lon': 231}.
However, these chunks are relatively small, sitting near the lower limit of an acceptable chunk size of about 10 MiB.
So, it would be better to increase our chunk sizes to, say, 70-110 MiB.
To do this, we will simply use integer multiples of the current chunks so we don’t have to completely rechunk the dataset (grouping existing chunks is much faster than completely changing the chunk shape).
Our goal will be to distribute the number of chunks roughly evenly between the time and space dimensions.
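For reference, here is a quick estimate of the current chunk size (a rough sketch, assuming 8 bytes per value, which is what the size estimates later in this notebook imply):
# Rough size of one current chunk: shape (61, 98, 231), assuming 8 bytes per value
current_chunk_MiB = 61 * 98 * 231 * 8 / 2**20
print(f"Current chunk size: {current_chunk_MiB:0.2f} MiB")  # roughly 10.5 MiB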
First, let’s check how many chunks are in the temporal and spatial dimensions with the current chunk shape.
ds.precipitation_amount.encoding
time_chunk_shape, lat_chunk_shape, lon_chunk_shape = ds.precipitation_amount.encoding['chunksizes']
print(f"Number of temporal chunks: {np.ceil(len(ds.day) / time_chunk_shape)}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / lat_chunk_shape) * np.ceil(len(ds.lon) / lon_chunk_shape)}")Okay, so there are about 7 times as many time chunks as there are spatial chunks. Let’s try to balance this better so there are an approximately equal amount. We will do this by simply increasing the time chunk shape by a factor of 7. Then, we can double check our chunk size to see if it aligns with our chunk size goal.
Also, by increasing our temporal chunk shape by a factor of seven, each time chunk (61 × 7 = 427 days) now spans more than a whole year. This means that if we only wanted a single year of data, we would need to read at most two chunks.
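For example, here is a minimal sketch of selecting one year from the dataset (assuming the day coordinate holds datetimes; 1990 is just an illustrative year):
# With the planned 427-day chunks, one calendar year spans at most two chunks
pr_1990 = ds.precipitation_amount.sel(day=slice('1990-01-01', '1990-12-31'))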
chunk_plan = {
"day": time_chunk_shape * 7,
"lat" : lat_chunk_shape,
"lon" : lon_chunk_shape
}
bytes_per_value = ds.precipitation_amount.dtype.itemsize
print(f"Chunk Plan:{chunk_plan}")
print(f"Number of temporal chunks: {np.ceil(len(ds.day) / chunk_plan['day'])}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / chunk_plan['lat']) * np.ceil(len(ds.lon) / chunk_plan['lon'])}")
print(f"Chunk size in MiB: {(bytes_per_value * chunk_plan['day'] * chunk_plan['lat'] * chunk_plan['lon']) / 2 ** 10 / 2 ** 10:0.2f}")Nice! That got us the exact same number of chunks and the chunk size we were looking for. As a final check, let’s make sure none of the ending chunks contain <50% data.
print(f"Number of day chunks: {(len(ds.day) / chunk_plan['day'])}")
print(f"Number of lat chunks: {(len(ds.lat) / chunk_plan['lat'])}")
print(f"Number of lon chunks: {(len(ds.lon) / chunk_plan['lon'])}")Perfect! So the final day chunk is only 77% full, but that is good enough. Let’s now chunk the dataset to our chunking plan.
ds = ds.chunk(chunk_plan)
ds
Now, let’s save this data to a Zarr!
Writing Chunked Zarr
As discussed in the Xarray User-guide for reading and writing Zarr, chunks are specified for our Zarr store in one of three ways, in the preferential order of:
1. Manual chunk sizing through the use of the encoding argument
2. Automatic chunking based on the chunks of the dask arrays
3. Default chunk behavior determined by the Zarr library
In our case, we updated the dask array chunks by calling ds.chunk().
Therefore, we have the correct chunks and should be good to go.
This is our preferred method over using the encoding argument, as the positional ordering of the chunk shape in the encoding argument must match the positional ordering of the dimensions in each array.
If they do not match, you can get incorrect chunk shapes in the Zarr store.
If you have multiple variables, using encoding would allow you to specify an individual chunk shape for each variable.
However, even in that case, we recommend updating each variable individually (using, for example, ds.precipitation_amount.chunk()) rather than relying on encoding.
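For comparison, here is a rough sketch of what the encoding route could look like (not run here; note that the chunk shape tuple must follow the variable's dimension order, which is (day, lat, lon) for this variable):
# Alternative (not used in this notebook): set the chunk shape per variable via encoding
encoding = {'precipitation_amount': {'chunks': (chunk_plan['day'], chunk_plan['lat'], chunk_plan['lon'])}}
# ds_write.to_zarr(outfile, encoding=encoding)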
# Let's actually use only one time chunk as this is an
# example and we don't want to write 90 GiB
ds_write = ds.isel(day=slice(0, chunk_plan['day']))
outfile = "gridmet_precip.zarr"
_ = ds_write.to_zarr(outfile)
Now, let’s read in the just-written dataset to verify its integrity and chunks.
ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read
Great! Everything looks good!
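If you also want to confirm the chunk shape actually stored on disk (as opposed to the dask chunks xarray reports), here is a minimal sketch using the zarr library directly (assuming zarr is importable, which it must be for the zarr engine to work):
import zarr

# Open the store read-only and inspect the stored chunk shape of the variable
zarr_store = zarr.open(outfile, mode='r')
print(zarr_store['precipitation_amount'].chunks)  # should match chunk_plan: (427, 98, 231)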
Assessing Compression
Now that our Zarr store is made, let’s check how much the data was compressed.
By default, Zarr uses the Blosc compressor when calling xarray.Dataset.to_zarr() if we don’t specify a compressor in the encoding.
So, our data should be compressed by default, and we can examine each chunk file on disk to estimate its compression factor.
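If you wanted to choose the codec yourself, here is a hedged sketch of passing one explicitly through the encoding (assuming a Zarr v2 store and the numcodecs package, which ships with Zarr):
import numcodecs

# Not run here: choose a specific Blosc configuration per variable when writing
compressor = numcodecs.Blosc(cname='zstd', clevel=5, shuffle=numcodecs.Blosc.SHUFFLE)
# ds_write.to_zarr(outfile, mode='w', encoding={'precipitation_amount': {'compressor': compressor}})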
To examine the chunk files on disk, let’s create a new local filesystem to get the file info.
fs_local = fsspec.filesystem('local')
Now, estimate the file sizes.
# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})
print("Stored Chunk Size Summary Statistics [MiB]:\n"
f"Total: {filesize_MiB.sum():.4f}\n"
f"Minimum: {filesize_MiB.min():.4f}\n"
f"Mean: {filesize_MiB.mean():.4f}\n"
f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')
As we can see, the total dataset (excluding coordinates) is only 85 MiB on disk, with stored chunk sizes varying from 75 KiB to 4.9 MiB. This is drastically smaller than the 2.58 GiB in-memory total that xarray reported for the dataset, and the same goes for the individual chunks, which xarray quoted at 73.75 MiB each. Let’s get an exact comparison and compression ratio for the data we read in.
bytes_per_value = ds_read.precipitation_amount.dtype.itemsize
total_size = ds_read['precipitation_amount'].size * bytes_per_value
chunk_size = np.array(ds_read['precipitation_amount'].encoding['chunks']).prod() * bytes_per_value
print("Read-in Chunk Size Summary Statistics:\n"
f"Total: {total_size / (2**10)**3:.4f} [GiB]\n"
f"Chunks: {chunk_size / (2**10)**2:.4f} [MiB]")
print("\n"
"Compression Ratio Summary Statistics:\n"
f"Total: {total_size / filesize.sum():.3f}\n"
f"Minimum: {(chunk_size / filesize).min():.4f}\n"
f"Mean: {(chunk_size / filesize).mean():.4f}\n"
f"Maximum: {(chunk_size / filesize).max():.4f}")Ooph! This tells us that we get an astonishing average compression ratio of 30:1 on this particular data. Pretty good results.
Appending New Chunk
Since this compression is so good, let’s go ahead and add another time chunk onto our existing Zarr store.
This is simple in xarray, especially since we are just appending another time chunk.
All we have to do is pass append_dim='day' to our .to_zarr() call to append along the time dimension.
ds_write = ds.isel(day=slice(chunk_plan['day'], chunk_plan['day'] * 2))
_ = ds_write.to_zarr(outfile, append_dim='day')
Now, let’s read in the appended dataset to verify everything worked.
ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read
That looks like it worked as expected! Let’s now double-check the compression on these files.
# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})
print("Stored Chunk Size Summary Statistics [MiB]:\n"
f"Total: {filesize_MiB.sum():.4f}\n"
f"Minimum: {filesize_MiB.min():.4f}\n"
f"Mean: {filesize_MiB.mean():.4f}\n"
f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')
Looks like we stayed at the same level of compression, which is great.
Clean Up
So, hopefully you now know the basics of how to create a Zarr store from a collection of NetCDF files and set its chunk shape. The same methods apply when rechunking a dataset, which we will get into next.
As we don’t want to keep this Zarr on our local machine, let’s go ahead and delete it.
# Clean up by deleting the zarr store
fs_local.rm(outfile, recursive=True)