Rechunking Larger Datasets with Dask#
The goal of this notebook is to expand on the rechunking performed in the Introductory Rechunking tutorial. This notebook will perfrom the same operations, but will work on the much larger dataset and involve some parallelization using dask.
Warning
You should only run workflows like this tutorial on a cloud or HPC compute node. In application, this will require reading and writing enormous amounts of data. Using a typical network connection and simple compute environment, you would saturate your bandwidth and max out your processor, thereby taking days for the rechunking to complete.
import os
# Needed when boto3 >= 1.36.0 or the rechunking process will fail
# This needs to be set before the boto3 library gets loaded
# See: https://github.com/aws/aws-cli/issues/9214#issuecomment-2606619168
os.environ['AWS_REQUEST_CHECKSUM_CALCULATION'] = 'when_required'
os.environ['AWS_RESPONSE_CHECKSUM_VALIDATION'] = 'when_required'
import xarray as xr
import fsspec
from rechunker import rechunk
import zarr
import dask.distributed
import dask.diagnostics
import logging
import dask
from dask_gateway import Gateway
import configparser
dask.config.set({'temporary_directory':'../'})
<dask.config.set at 0x7f8ffe901e50>
Read in a Zarr Store#
Like the Introductory Rechunking tutorial, we will use the data from the National Water Model Retrospective Version 2.1.
The full dataset is part of the AWS Open Data Program, available via the S3 bucket at: s3://noaa-nwm-retro-v2-zarr-pds/
.
As this is a Zarr store, let’s read it in with xarray.open_dataset()
and engine='zarr'
.
file = fsspec.get_mapper('s3://noaa-nwm-retro-v2-zarr-pds', anon=True)
ds = xr.open_dataset(file, chunks={}, engine='zarr')
ds
<xarray.Dataset> Size: 35TB Dimensions: (time: 227904, feature_id: 2729077) Coordinates: * feature_id (feature_id) int32 11MB 101 179 ... 1180001803 1180001804 latitude (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray> longitude (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray> * time (time) datetime64[ns] 2MB 1993-01-01 ... 2018-12-31T23:00:00 Data variables: elevation (time, feature_id) float32 2TB dask.array<chunksize=(672, 30000), meta=np.ndarray> order (time, feature_id) int32 2TB dask.array<chunksize=(672, 30000), meta=np.ndarray> qBtmVertRunoff (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> qBucket (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> qSfcLatRunoff (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> q_lateral (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> streamflow (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> velocity (time, feature_id) float64 5TB dask.array<chunksize=(672, 30000), meta=np.ndarray> Attributes: (12/17) Conventions: CF-1.6 cdm_datatype: Station code_version: v5.1.0-alpha11 dev: dev_ prefix indicates development/internal me... dev_NOAH_TIMESTEP: 3600 dev_OVRTSWCRT: 1 ... ... model_output_type: channel_rt model_output_valid_time: 2018-12-28_00:00:00 model_total_valid_times: 2208 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
Restrict for Tutorial#
As we saw in the Introductory Rechunking tutorial, this dataset is massive, taking up almost 32 TiB uncompressed.
As this is a tutorial, we will still restrict the data to a subset - we don’t really need to work on the entire dataset to demonstrate how to use dask to scale up.
Following the Introductory Rechunking tutorial let’s only look at streamflow
and velocity
for the first 15,000 feature_id
s, |but we will look at the entire 2000s decade of water years (October 1999 through September 2009) instead of a single water year this time.
This will make our dataset larger-than-memory, but it should still run in a reasonable amount of time.
For processing the full-sized dataset, you’d just skip this step where we slice off a representative example of the data. Expect run time to increase in proportion to the size of the data being processed.
ds = ds[['streamflow', 'velocity']]
ds = ds.isel(feature_id=slice(0, 15000))
ds = ds.sel(time=slice('1999-10-01', '2009-09-30'))
ds
<xarray.Dataset> Size: 21GB Dimensions: (time: 87672, feature_id: 15000) Coordinates: * feature_id (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424 latitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> longitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> * time (time) datetime64[ns] 701kB 1999-10-01 ... 2009-09-30T23:00:00 Data variables: streamflow (time, feature_id) float64 11GB dask.array<chunksize=(672, 15000), meta=np.ndarray> velocity (time, feature_id) float64 11GB dask.array<chunksize=(672, 15000), meta=np.ndarray> Attributes: (12/17) Conventions: CF-1.6 cdm_datatype: Station code_version: v5.1.0-alpha11 dev: dev_ prefix indicates development/internal me... dev_NOAH_TIMESTEP: 3600 dev_OVRTSWCRT: 1 ... ... model_output_type: channel_rt model_output_valid_time: 2018-12-28_00:00:00 model_total_valid_times: 2208 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
Now, our subset of data is only about 10 GiB per data variable and has a chunk shape of {'time': 672, 'feature_id': 15000}
with size of 76.9 MiB.
However, the chunk shape is not an optimal choice for our analysis as it is chunked completely by feature_id
(i.e., all feature IDs for a given time can be read in a single chunk).
Following the Introductory Rechunking tutorial, let’s get chunk shapes that are time-series wise chunking (i.e., all time
for a given feature_id
in one chunk) for streamflow and balanced for velocity.
Rechunk Plan#
Using our general strategy of time-series wise chunking for streamflow and balanced for velocity,
let’s compute how large the chunk sizes will be if we have chunk shapes of {'time': 87672, 'feature_id': 1}
for streamflow and 3 chunks per dimension for velocity.
nfeature = len(ds.feature_id)
ntime = len(ds.time)
streamflow_chunk_plan = {'time': ntime, 'feature_id': 1}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
partial_chunks = {'time': ntime - streamflow_chunk_plan['time'] * (ntime / streamflow_chunk_plan['time']),
'feature_id': nfeature - streamflow_chunk_plan['feature_id'] * (nfeature / streamflow_chunk_plan['feature_id']),}
print("STREAMFLOW \n"
f"Chunk of shape {streamflow_chunk_plan} \n"
f"Partial 'time' chunk remainder: {partial_chunks['time']} ({partial_chunks['time']/streamflow_chunk_plan['time']:.3f}% of a chunk)\n"
f"Partial 'feature_id' chunk remainder: {partial_chunks['feature_id']} ({partial_chunks['feature_id']/streamflow_chunk_plan['feature_id']:.3f}% of a chunk)\n"
f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")
chunks_per_dim = 3
velocity_chunk_plan = {'time': ntime // chunks_per_dim, 'feature_id': nfeature // chunks_per_dim}
bytes_per_value = ds.velocity.dtype.itemsize
total_bytes = velocity_chunk_plan['time'] * velocity_chunk_plan['feature_id'] * bytes_per_value
velocity_MiB = total_bytes / (2 ** 10) ** 2
partial_chunks = {'time': ntime - velocity_chunk_plan['time'] * chunks_per_dim,
'feature_id': nfeature - velocity_chunk_plan['feature_id'] * chunks_per_dim,}
print("VELOCITY \n"
f"Chunk of shape {velocity_chunk_plan} \n"
f"Partial 'time' chunk remainder: {partial_chunks['time']} ({partial_chunks['time']/velocity_chunk_plan['time']:.3f}% of a chunk)\n"
f"Partial 'feature_id' chunk remainder: {partial_chunks['feature_id']} ({partial_chunks['feature_id']/velocity_chunk_plan['feature_id']:.3f}% of a chunk)\n"
f"Chunk size: {velocity_MiB:.2f} [MiB]")
STREAMFLOW
Chunk of shape {'time': 87672, 'feature_id': 1}
Partial 'time' chunk remainder: 0.0 (0.000% of a chunk)
Partial 'feature_id' chunk remainder: 0.0 (0.000% of a chunk)
Chunk size: 0.67 [MiB]
VELOCITY
Chunk of shape {'time': 29224, 'feature_id': 5000}
Partial 'time' chunk remainder: 0 (0.000% of a chunk)
Partial 'feature_id' chunk remainder: 0 (0.000% of a chunk)
Chunk size: 1114.81 [MiB]
Okay, we can see that the streamflow chunk size is way to small by a factor of ~100. So, let’s include 100 feature IDs per chunk. As for velocity, it is ~10x too big. As it is an even chunk split, that means we need to increase the number of chunks per dimension by ~\(\sqrt{10} \approx 3\). However knowing that the time dimension is hourly, we can get no partial chunks if our chunk per dimension is a divisor of 24. Luckily, this also applies to the feature ID dimension as 15000 is a multiple of 24. So, rather than increasing our chunks per dimension by a factor of 3 to 9, let’s increase them to 12 as this will give no partial chunks.
nfeature = len(ds.feature_id)
ntime = len(ds.time)
streamflow_chunk_plan = {'time': ntime, 'feature_id': 100}
bytes_per_value = ds.streamflow.dtype.itemsize
total_bytes = streamflow_chunk_plan['time'] * streamflow_chunk_plan['feature_id'] * bytes_per_value
streamflow_MiB = total_bytes / (2 ** 10) ** 2
partial_chunks = {'time': ntime - streamflow_chunk_plan['time'] * (ntime / streamflow_chunk_plan['time']),
'feature_id': nfeature - streamflow_chunk_plan['feature_id'] * (nfeature / streamflow_chunk_plan['feature_id']),}
print("STREAMFLOW \n"
f"Chunk of shape {streamflow_chunk_plan} \n"
f"Partial 'time' chunk remainder: {partial_chunks['time']} ({partial_chunks['time']/streamflow_chunk_plan['time']:.3f}% of a chunk)\n"
f"Partial 'feature_id' chunk remainder: {partial_chunks['feature_id']} ({partial_chunks['feature_id']/streamflow_chunk_plan['feature_id']:.3f}% of a chunk)\n"
f"Chunk size: {streamflow_MiB:.2f} [MiB] \n")
chunks_per_dim = 12
velocity_chunk_plan = {'time': ntime // chunks_per_dim, 'feature_id': nfeature // chunks_per_dim}
bytes_per_value = ds.velocity.dtype.itemsize
total_bytes = velocity_chunk_plan['time'] * velocity_chunk_plan['feature_id'] * bytes_per_value
velocity_MiB = total_bytes / (2 ** 10) ** 2
partial_chunks = {'time': ntime - velocity_chunk_plan['time'] * chunks_per_dim,
'feature_id': nfeature - velocity_chunk_plan['feature_id'] * chunks_per_dim,}
print("VELOCITY \n"
f"Chunk of shape {velocity_chunk_plan} \n"
f"Partial 'time' chunk remainder: {partial_chunks['time']} ({partial_chunks['time']/velocity_chunk_plan['time']:.3f}% of a chunk)\n"
f"Partial 'feature_id' chunk remainder: {partial_chunks['feature_id']} ({partial_chunks['feature_id']/velocity_chunk_plan['feature_id']:.3f}% of a chunk)\n"
f"Chunk size: {velocity_MiB:.2f} [MiB]")
STREAMFLOW
Chunk of shape {'time': 87672, 'feature_id': 100}
Partial 'time' chunk remainder: 0.0 (0.000% of a chunk)
Partial 'feature_id' chunk remainder: 0.0 (0.000% of a chunk)
Chunk size: 66.89 [MiB]
VELOCITY
Chunk of shape {'time': 7306, 'feature_id': 1250}
Partial 'time' chunk remainder: 0 (0.000% of a chunk)
Partial 'feature_id' chunk remainder: 0 (0.000% of a chunk)
Chunk size: 69.68 [MiB]
Nice! Now, our chunks are a reasonable size and have no remainders. So, let’s use these chunk plans for our rechunking.
chunk_plan = {
'streamflow': streamflow_chunk_plan,
'velocity': velocity_chunk_plan,
# We don't want any of the coordinates chunked
'latitude': (nfeature,),
'longitude': (nfeature,),
'time': (ntime,),
'feature_id': (nfeature,)
}
chunk_plan
{'streamflow': {'time': 87672, 'feature_id': 100},
'velocity': {'time': 7306, 'feature_id': 1250},
'latitude': (15000,),
'longitude': (15000,),
'time': (87672,),
'feature_id': (15000,)}
Rechunk with Rechunker#
With this plan, we can now ask Rechunker to re-write the data using the prescribed chunking pattern.
Set up output location#
Unlike with the smaller dataset in our previous rechunking tutorial, we will write this larger dataset to an object store (similar to an S3 bucket) on the USGS OSN Pod. So, we need to set that up so that Rechunker will have a suitable place to write data.
First, we need to set up our credentials that allow us to write to the OSN Pod and direct it to the endpoint where the OSN Pod is located (while it is available via the S3 API, it is not in AWS, so we have to tell it where to find the pod!). If you do not already have access to the credentials to write to the OSN Pod and you are a USGS staff member or collaborator, please reach out to asnyder@usgs.gov to request them. If you are not a USGS staff or collaborator, you will need to set up a different location that to you have write permissions to. This could be an AWS S3 bucket that you have access too or you could even set up an fsspec
LocalFileSystem
.
awsconfig = configparser.ConfigParser()
awsconfig.read(
os.path.expanduser('~/.aws/credentials')
# default location... if yours is elsewhere, change this.
)
_profile_nm = 'osn-hytest-scratch'
_endpoint = 'https://usgs.osn.mghpcc.org/'
# Set environment vars based on parsed awsconfig
try:
os.environ['AWS_ACCESS_KEY_ID'] = awsconfig[_profile_nm]['aws_access_key_id']
os.environ['AWS_SECRET_ACCESS_KEY'] = awsconfig[_profile_nm]['aws_secret_access_key']
os.environ['AWS_S3_ENDPOINT'] = _endpoint
os.environ['AWS_PROFILE'] = _profile_nm
os.environ['AWS_DEFAULT_PROFILE'] = _profile_nm
except KeyError as e:
logging.error("Problem parsing the AWS credentials file. ")
Next, we make our S3 fsspec.filesystem
with the required user info and get the mapper to this file to pass to Rechunker.
from getpass import getuser
uname=getuser()
fs = fsspec.filesystem(
's3',
anon=False,
default_fill_cache=False,
skip_instance_cache=True,
client_kwargs={'endpoint_url': os.environ['AWS_S3_ENDPOINT'], }
)
output_dir = f's3://hytest-scratch/rechunking_tutorial/{uname}/'
temp_store = fs.get_mapper(output_dir + 'temp_store.zarr')
target_store = fs.get_mapper(output_dir + 'tutorial_rechunked.zarr')
# Check if the objects exist and remove if they do
for filename in [temp_store, target_store]:
try:
fs.rm(filename.root, recursive=True)
except:
FileNotFoundError
Spin up Dask Cluster#
Our rechunking operation will be able to work in parallel. To do that, we will spin up a dask cluster to schedule the various workers.
This cluster will be configured differently depending on where you compute is performed. We have set up this notebook to run on Nebari. If you are working on USGS systems, you can likely use one of our example codes to spin up a dask cluster. Otherwise, please refer to the dask deployment docs for details.
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION'] = "1.0"
gateway = Gateway()
_options = gateway.cluster_options()
_options.conda_environment='hytest/hytest-rechunking' ##<< this is the conda environment we use on nebari.
_options.profile = 'Medium Worker'
_env_to_add={}
aws_env_vars=['AWS_ACCESS_KEY_ID',
'AWS_SECRET_ACCESS_KEY',
'AWS_SESSION_TOKEN',
'AWS_DEFAULT_REGION',
'AWS_S3_ENDPOINT',
'AWS_REQUEST_CHECKSUM_CALCULATION',
'AWS_RESPONSE_CHECKSUM_VALIDATION']
for _e in aws_env_vars:
if _e in os.environ:
_env_to_add[_e] = os.environ[_e]
_options.environment_vars = _env_to_add
cluster = gateway.new_cluster(_options) ##<< create cluster via the dask gateway
cluster.scale(30) ##<< Sets scaling parameters.
client = cluster.get_client()
client
Client
Client-8644e3ef-5685-11f0-81a6-ea8215427e9e
Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster |
Dashboard: https://nebari.chs.usgs.gov/gateway/clusters/nebari.bf7ddf2129b949658d5b2883b107d177/status |
Cluster Info
GatewayCluster
- Name: nebari.bf7ddf2129b949658d5b2883b107d177
- Dashboard: https://nebari.chs.usgs.gov/gateway/clusters/nebari.bf7ddf2129b949658d5b2883b107d177/status
Rechunk#
Now, we are ready to rechunk!
result = rechunk(
# Make sure the base chunks are correct
ds.chunk({'time': 672, 'feature_id': 15000}),
target_chunks=chunk_plan,
max_mem="2GB",
target_store=target_store,
temp_store=temp_store
)
result
Rechunked
Source
<xarray.Dataset> Size: 21GB Dimensions: (time: 87672, feature_id: 15000) Coordinates: * feature_id (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424 latitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> longitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> * time (time) datetime64[ns] 701kB 1999-10-01 ... 2009-09-30T23:00:00 Data variables: streamflow (time, feature_id) float64 11GB dask.array<chunksize=(672, 15000), meta=np.ndarray> velocity (time, feature_id) float64 11GB dask.array<chunksize=(672, 15000), meta=np.ndarray> Attributes: (12/17) Conventions: CF-1.6 cdm_datatype: Station code_version: v5.1.0-alpha11 dev: dev_ prefix indicates development/internal me... dev_NOAH_TIMESTEP: 3600 dev_OVRTSWCRT: 1 ... ... model_output_type: channel_rt model_output_valid_time: 2018-12-28_00:00:00 model_total_valid_times: 2208 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
Intermediate
NoneTarget
NoneRemember that merely invoking Rechunker does not do any work.
It just sorts out the rechunking plan and writes metadata.
We need to call .execute
on the result
object to actually run the rechunking.
with dask.diagnostics.ProgressBar():
r = result.execute(retries=10)
# Also consolidate the metadata for fast reading into xarray
_ = zarr.consolidate_metadata(target_store)
Confirm the Creation of the Zarr Store by Rechunker#
Let’s read in the resulting re-chunked dataset to confirm it turned out how we intended.
ds_rechunked = xr.open_zarr(target_store)
ds_rechunked
<xarray.Dataset> Size: 21GB Dimensions: (feature_id: 15000, time: 87672) Coordinates: * feature_id (feature_id) int32 60kB 101 179 181 183 ... 303420 303422 303424 latitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> longitude (feature_id) float32 60kB dask.array<chunksize=(15000,), meta=np.ndarray> * time (time) datetime64[ns] 701kB 1999-10-01 ... 2009-09-30T23:00:00 Data variables: streamflow (time, feature_id) float64 11GB dask.array<chunksize=(87672, 100), meta=np.ndarray> velocity (time, feature_id) float64 11GB dask.array<chunksize=(7306, 1250), meta=np.ndarray> Attributes: (12/17) Conventions: CF-1.6 cdm_datatype: Station code_version: v5.1.0-alpha11 dev: dev_ prefix indicates development/internal me... dev_NOAH_TIMESTEP: 3600 dev_OVRTSWCRT: 1 ... ... model_output_type: channel_rt model_output_valid_time: 2018-12-28_00:00:00 model_total_valid_times: 2208 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
Nice, looks good! You may have noticed that the only difference between the introductory tutorial on rechunking and this is the inclusion of creating the dask cluster and where we saved the files. Picking your compute environment and output location will typically be the only things that vary in other workflows requiring rechunking. Therefore, if you understand this rechunking process you should be able to apply it to your own data efficiently.
Clean Up#
As we don’t want to keep this rechunked Zarr, let’s go ahead and delete it. We will also conform with best practices and close our Dask client and cluster.
fs.rm(temp_store.root, recursive=True)
fs.rm(target_store.root, recursive=True)
client.close()
cluster.close()