Writing Chunked Files#
The goal of this notebook is to learn how to load a collection of NetCDF files, chunk the data, write the data in Zarr format, and confirm the proper creation of the Zarr store. We will be writing to our local storage for simplicity (as this is just a tutorial notebook), but you can easily change the output path to be anywhere including cloud storage.
import fsspec
import xarray as xr
import numpy as np
import hvplot.xarray
Example Dataset#
In this notebook, we will use the daily gridMET precipitation dataset as an example for reading data and writing to Zarr.
The data is currently hosted on the HyTEST OSN as a collection of NetCDF files.
To get the files, we will use fsspec to open each year of precipitation data to a list.
Then, we can read in the all the files at once using xarray.open_mfdataset
.
fs = fsspec.filesystem(
's3',
anon=True, # anonymous = does not require credentials
client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
precip_files = [fs.open(file) for file in fs.glob('s3://mdmf/gdp/netcdf/gridmet/gridmet/pr_*.nc')]
ds = xr.open_mfdataset(
precip_files,
chunks={},
engine='h5netcdf'
)
ds
<xarray.Dataset> Size: 99GB Dimensions: (day: 15275, lat: 585, lon: 1386, crs: 1) Coordinates: * lon (lon) float64 11kB -124.8 -124.7 ... -67.1 -67.06 * lat (lat) float64 5kB 49.4 49.36 49.32 ... 25.11 25.07 * day (day) datetime64[ns] 122kB 1979-01-01 ... 2020-10-26 * crs (crs) uint16 2B 3 Data variables: precipitation_amount (day, lat, lon) float64 99GB dask.array<chunksize=(61, 98, 231), meta=np.ndarray> Attributes: (12/19) geospatial_bounds_crs: EPSG:4326 Conventions: CF-1.6 geospatial_bounds: POLYGON((-124.7666666333333 49.40000000000000... geospatial_lat_min: 25.066666666666666 geospatial_lat_max: 49.40000000000000 geospatial_lon_min: -124.7666666333333 ... ... date: 02 July 2019 note1: The projection information for this file is: ... note2: Citation: Abatzoglou, J.T., 2013, Development... note3: Data in slices after last_permanent_slice (1-... note4: Data in slices after last_provisional_slice (... note5: Days correspond approximately to calendar day...
Selecting Chunk Shape and Size#
As we can see in the rich HTML output of our dataset, the NetCDF files were already chunked with pattern of {'day': 61, 'lat': 98, 'lon': 231}
.
However, the size of these chunks are relatively small and near the lower limit of an acceptable chunk size of 10 MiB.
So, it would be better if we could increase our chunk sizes to say 70-110 MiB.
To do this, we will simply use multiples of the current chunks so we don’t have to completely rechunk the dataset (grouping chunks is way faster than completely changing chunk shape).
Our goal will be to try and evenly distribute the number of chunks between the time and space dimensions.
First, let’s check how many chunks are in the temporal and spatial dimensions with the current chunk shape.
ds.precipitation_amount.encoding
{'chunksizes': (61, 98, 231),
'fletcher32': False,
'shuffle': False,
'preferred_chunks': {'day': 61, 'lat': 98, 'lon': 231},
'zlib': True,
'complevel': 9,
'source': '<File-like object S3FileSystem, mdmf/gdp/netcdf/gridmet/gridmet/pr_1979.nc>',
'original_shape': (365, 585, 1386),
'dtype': dtype('uint16'),
'missing_value': np.int16(32767),
'_FillValue': np.uint16(32767),
'_Unsigned': 'true',
'scale_factor': np.float64(0.1),
'add_offset': np.float64(0.0),
'coordinates': 'lon lat'}
time_chunk_shape, lat_chunk_shape, lon_chunk_shape = ds.precipitation_amount.encoding['chunksizes']
print(f"Number of temporal chunks: {np.ceil(len(ds.day) / time_chunk_shape)}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / lat_chunk_shape) * np.ceil(len(ds.lon) / lon_chunk_shape)}")
Number of temporal chunks: 251.0
Number of spatial chunks: 36.0
Okay, so there are about 7 times as many time chunks as there are spatial chunks. Let’s try to balance this better so there are an approximately equal amount. We will do this by simply increasing the time chunk shape by a factor of 7. Then, we can double check our chunk size to see if it aligns with our chunk size goal.
Note
Also by increasing our temporal chunk shape by a factor of seven, it is now larger than a whole year. This means if we only wanted a single year, we would at most need to read two chunks.
chunk_plan = {
"day": time_chunk_shape * 7,
"lat" : lat_chunk_shape,
"lon" : lon_chunk_shape
}
bytes_per_value = ds.precipitation_amount.dtype.itemsize
print(f"Chunk Plan:{chunk_plan}")
print(f"Number of temporal chunks: {np.ceil(len(ds.day) / chunk_plan['day'])}")
print(f"Number of spatial chunks: {np.ceil(len(ds.lat) / chunk_plan['lat']) * np.ceil(len(ds.lon) / chunk_plan['lon'])}")
print(f"Chunk size in MiB: {(bytes_per_value * chunk_plan['day'] * chunk_plan['lat'] * chunk_plan['lon']) / 2 ** 10 / 2 ** 10:0.2f}")
Chunk Plan:{'day': 427, 'lat': 98, 'lon': 231}
Number of temporal chunks: 36.0
Number of spatial chunks: 36.0
Chunk size in MiB: 73.75
Nice! That got us the exact same number of chunks and the chunk size we were looking for. As a final check, let’s make sure none of the ending chunks contain <50% data.
print(f"Number of day chunks: {(len(ds.day) / chunk_plan['day'])}")
print(f"Number of lat chunks: {(len(ds.lat) / chunk_plan['lat'])}")
print(f"Number of lon chunks: {(len(ds.lon) / chunk_plan['lon'])}")
Number of day chunks: 35.7728337236534
Number of lat chunks: 5.969387755102041
Number of lon chunks: 6.0
Perfect! So the final day chunk is only 77% full, but that is good enough. Let’s now chunk the dataset to our chunking plan.
ds = ds.chunk(chunk_plan)
ds
<xarray.Dataset> Size: 99GB Dimensions: (day: 15275, lat: 585, lon: 1386, crs: 1) Coordinates: * lon (lon) float64 11kB -124.8 -124.7 ... -67.1 -67.06 * lat (lat) float64 5kB 49.4 49.36 49.32 ... 25.11 25.07 * day (day) datetime64[ns] 122kB 1979-01-01 ... 2020-10-26 * crs (crs) uint16 2B 3 Data variables: precipitation_amount (day, lat, lon) float64 99GB dask.array<chunksize=(427, 98, 231), meta=np.ndarray> Attributes: (12/19) geospatial_bounds_crs: EPSG:4326 Conventions: CF-1.6 geospatial_bounds: POLYGON((-124.7666666333333 49.40000000000000... geospatial_lat_min: 25.066666666666666 geospatial_lat_max: 49.40000000000000 geospatial_lon_min: -124.7666666333333 ... ... date: 02 July 2019 note1: The projection information for this file is: ... note2: Citation: Abatzoglou, J.T., 2013, Development... note3: Data in slices after last_permanent_slice (1-... note4: Data in slices after last_provisional_slice (... note5: Days correspond approximately to calendar day...
Now, let’s save this data to a Zarr!
Writing Chunked Zarr#
As discussed in the Xarray User-guide for reading and writing Zarr, chunks are specified to our Zarr store in one of three ways in the preferential order of:
Manual chunk sizing through the use of the
encoding
argumentAutomatic chunking based on chunks of the dask arrays
Default chunk behavior determined by the Zarr library
In our case, we updated the dask array chunks by calling ds.chunk()
.
Therefore, we have the correct chunks and should be good to go.
Tip
This is our preferred method over using the encoding
argument, as the positional ordering of the chunk shape in the encoding
argument must match the positional ordering of the dimensions in each array.
If they do not match you can get incorrect chunk shapes in the Zarr store.
If you have multiple variables, using encoding
could allow for specifying individual chunking shapes for each variable.
However, if this is the case, we recommend updating each variable individually using, for example, ds.precipitation_amount.chunk()
to change the individual variable chunk shape.
# Let's actually use only one time chunk as this is an
# example and we don't want to write 90 GiB
ds_write = ds.isel(day=slice(0, chunk_plan['day']))
outfile = "gridmet_precip.zarr"
_ = ds_write.to_zarr(outfile)
Now, let’s read in the just-written dataset to verify its integrity and chunks.
ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read
<xarray.Dataset> Size: 3GB Dimensions: (crs: 1, day: 427, lat: 585, lon: 1386) Coordinates: * crs (crs) uint16 2B 3 * day (day) datetime64[ns] 3kB 1979-01-01 ... 1980-03-02 * lat (lat) float64 5kB 49.4 49.36 49.32 ... 25.11 25.07 * lon (lon) float64 11kB -124.8 -124.7 ... -67.1 -67.06 Data variables: precipitation_amount (day, lat, lon) float64 3GB dask.array<chunksize=(427, 98, 231), meta=np.ndarray> Attributes: (12/19) Conventions: CF-1.6 author: John Abatzoglou - University of Idaho, jabatz... coordinate_system: EPSG:4326 date: 02 July 2019 geospatial_bounds: POLYGON((-124.7666666333333 49.40000000000000... geospatial_bounds_crs: EPSG:4326 ... ... geospatial_lon_units: decimal_degrees east note1: The projection information for this file is: ... note2: Citation: Abatzoglou, J.T., 2013, Development... note3: Data in slices after last_permanent_slice (1-... note4: Data in slices after last_provisional_slice (... note5: Days correspond approximately to calendar day...
Great! Everything looks good!
Assessing Compression#
Now that our Zarr store is made, let’s check how much the data was compressed.
By default, Zarr uses the Blosc compressor when calling xarray.Dataset.to_zarr()
if we don’t specify a compressor in the encoding
.
So, our data should be compressed by default, and we can examine each chunk on disk to confirm their compression factor.
To examine this, let’s create a new local filesystem
to get the file info.
fs_local = fsspec.filesystem('local')
Now, estimate the file sizes.
# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})
print("Stored Chunk Size Summary Statistics [MiB]:\n"
f"Total: {filesize_MiB.sum():.4f}\n"
f"Minimum: {filesize_MiB.min():.4f}\n"
f"Mean: {filesize_MiB.mean():.4f}\n"
f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')
Stored Chunk Size Summary Statistics [MiB]:
Total: 85.7155
Minimum: 0.0746
Mean: 2.3810
Maximum: 4.8642
As we can see, the total dataset (excluding coordinates) is only 85 MiB on disk, with chunk sizes varying from 75 KiB to 4.9 MiB. This size is drastically smaller than the quoted total size for the xarray output, which said 2.58 GiB. Same for the individual chunks, which were quoted at 73.75 MiB. Let’s get an exact comparison and compression ratio for the data we read in.
bytes_per_value = ds_read.precipitation_amount.dtype.itemsize
total_size = ds_read['precipitation_amount'].size * bytes_per_value
chunk_size = np.array(ds_read['precipitation_amount'].encoding['chunks']).prod() * bytes_per_value
print("Read-in Chunk Size Summary Statistics:\n"
f"Total: {total_size / (2**10)**3:.4f} [GiB]\n"
f"Chunks: {chunk_size / (2**10)**2:.4f} [MiB]")
print("\n"
"Compression Ratio Summary Statistics:\n"
f"Total: {total_size / filesize.sum():.3f}\n"
f"Minimum: {(chunk_size / filesize).min():.4f}\n"
f"Mean: {(chunk_size / filesize).mean():.4f}\n"
f"Maximum: {(chunk_size / filesize).max():.4f}")
Read-in Chunk Size Summary Statistics:
Total: 2.5795 [GiB]
Chunks: 73.7490 [MiB]
Compression Ratio Summary Statistics:
Total: 30.816
Minimum: 15.1616
Mean: 162.8470
Maximum: 989.0445
Ooph! This tells us that we get an astonishing average compression ratio of 30:1 on this particular data. Pretty good results.
Appending New Chunk#
Since this compression is so good, let’s go ahead and add another time chunk onto our existing Zarr store.
This is simple in xarray, especially since we are just appending another time chunk.
All we have to do is add append_dim
to our .to_zarr()
call to append to the time dimension.
ds_write = ds.isel(day=slice(chunk_plan['day'], chunk_plan['day']* 2))
_ = ds_write.to_zarr(outfile, append_dim='day')
Now, let’s read in the appended dataset to verify everything worked.
ds_read = xr.open_dataset(outfile, engine='zarr', chunks={})
ds_read
<xarray.Dataset> Size: 6GB Dimensions: (crs: 1, day: 854, lat: 585, lon: 1386) Coordinates: * crs (crs) uint16 2B 3 * day (day) datetime64[ns] 7kB 1979-01-01 ... 1981-05-03 * lat (lat) float64 5kB 49.4 49.36 49.32 ... 25.11 25.07 * lon (lon) float64 11kB -124.8 -124.7 ... -67.1 -67.06 Data variables: precipitation_amount (day, lat, lon) float64 6GB dask.array<chunksize=(427, 98, 231), meta=np.ndarray> Attributes: (12/19) Conventions: CF-1.6 author: John Abatzoglou - University of Idaho, jabatz... coordinate_system: EPSG:4326 date: 02 July 2019 geospatial_bounds: POLYGON((-124.7666666333333 49.40000000000000... geospatial_bounds_crs: EPSG:4326 ... ... geospatial_lon_units: decimal_degrees east note1: The projection information for this file is: ... note2: Citation: Abatzoglou, J.T., 2013, Development... note3: Data in slices after last_permanent_slice (1-... note4: Data in slices after last_provisional_slice (... note5: Days correspond approximately to calendar day...
That looks like it worked as expected! Let’s now double check the compression on these files.
# Exclude any hidden files (i.e., starts with .)
chunkfiles = fs_local.glob(outfile + "/precipitation_amount/[!.]*")
filesize = np.array([fs_local.du(file) for file in chunkfiles])
filesize_MiB = xr.DataArray(data=filesize / 2**10 / 2**10, name='Size', attrs={'units': 'MiB'})
print("Stored Chunk Size Summary Statistics [MiB]:\n"
f"Total: {filesize_MiB.sum():.4f}\n"
f"Minimum: {filesize_MiB.min():.4f}\n"
f"Mean: {filesize_MiB.mean():.4f}\n"
f"Maximum: {filesize_MiB.max():.4f}")
filesize_MiB.hvplot.hist(ylabel='Counts')
Stored Chunk Size Summary Statistics [MiB]:
Total: 164.0833
Minimum: 0.0746
Mean: 2.2789
Maximum: 4.8642
Looks like we stayed at the same levels of compression which is great.
Clean Up#
So, hopefully now you know the basics of how to create a Zarr store from some NetCDF files and set its chunks’ shape. The same methods would apply when rechunking a dataset, which we will get into next.
As we don’t want to keep this Zarr on our local machine, let’s go ahead and delete it.
# Clean up by deleting the zarr store
fs_local.rm(outfile, recursive=True)