Re-Chunking Larger Datasets#
This notebook extends ideas covered in the basic workflow. It performs the same operations, but on a much larger dataset, with parallelization handled by the dask scheduler.
Warning
You should run this only on a cloud compute node – on ESIP Nebari, for example. We will be reading and writing enormous amounts of data to S3 buckets; doing that over a typical network connection would saturate your bandwidth and take days to complete.
System Setup#
import os
import xarray as xr
import dask
import intake
# Activate logging
import logging
logging.basicConfig(level=logging.INFO, force=True)
Plumb Data Source#
We’re going to look at a particular dataset from the National Water Model Reanalysis Version 2.1. The dataset is part of the AWS Open Data Program, and is included in the HyTEST data catalog.
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
cat['nwm21-streamflow-cloud']
nwm21-streamflow-cloud:
  args:
    consolidated: true
    storage_options:
      anon: true
    urlpath: s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr
  description: National Water Model 2.1 CHRTOUT on AWS
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog
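The catalog entry above simply records where the Zarr store lives on S3 and how to open it. If you ever need to bypass intake, a minimal sketch of opening the same store directly with fsspec and xarray (an illustrative alternative, using the anonymous-access settings shown in the catalog entry) would look something like this:

import fsspec
import xarray as xr

# Open the same Zarr store directly, using the settings from the catalog entry above.
# This is an illustrative alternative to cat['nwm21-streamflow-cloud'].to_dask().
mapper = fsspec.get_mapper(
    's3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr',
    anon=True,   # public bucket; no credentials needed for reading
)
ds_direct = xr.open_zarr(mapper, consolidated=True)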
Load the zarr data#
We’ll take advantage of the intake
mechanism and load the data
directly. We’ll need to set up our AWS credentials first, since
this data is stored on an S3 bucket.
os.environ['AWS_PROFILE'] = "osn-hytest-scratch"
%run ../../../environment_set_up/Help_AWS_Credentials.ipynb
ds = cat['nwm21-streamflow-cloud'].to_dask()
indexer = ds.gage_id != ''.rjust(15).encode()   # an 'empty' gage_id is stored as 15 blank bytes
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData = smplData.drop_vars('crs') # Not needed/wanted for this analysis
smplData
<xarray.Dataset> Size: 47GB
Dimensions:     (feature_id: 7994, time: 367439)
Coordinates:
    elevation   (feature_id) float32 32kB 134.5 69.77 1.508e+03 ... 814.0 -10.02
  * feature_id  (feature_id) int32 32kB 3109 3923 ... 1170023539 1180000535
    gage_id     (feature_id) |S15 120kB b' 01030350' ... b' 10254...
    latitude    (feature_id) float32 32kB 45.71 45.51 40.38 ... 48.99 49.0 32.67
    longitude   (feature_id) float32 32kB -68.16 -68.3 -105.1 ... -116.2 -115.5
    order       (feature_id) int32 32kB 3 5 5 3 1 1 2 1 1 ... 8 8 8 8 8 3 9 5 5
  * time        (time) datetime64[ns] 3MB 1979-02-01T01:00:00 ... 2020-12-31T...
Data variables:
    crs         (feature_id) object 64kB b'' b'' b'' b'' b'' ... b'' b'' b'' b''
    streamflow  (time, feature_id) float64 23GB dask.array<chunksize=(672, 7994), meta=np.ndarray>
    velocity    (time, feature_id) float64 23GB dask.array<chunksize=(672, 7994), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
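As a quick sanity check, the number of features carrying a real gage_id can be computed directly from the indexer; it should match the feature_id dimension shown above (an illustrative check using the indexer defined earlier):

# Count features that carry a real gage_id; should equal the
# feature_id dimension of smplData (7994 in the run shown above).
n_gaged = int(indexer.sum().compute())
print(f"{n_gaged} features have a valid gage_id")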
Restrict for Tutorial#
This is a demonstration workflow, which means we don’t really need to work on the entire dataset – which is very large. We are going to cut this input dataset down to just the first 100 feature_id values, so that this tutorial will run in a reasonable amount of time.
For processing a full-sized dataset, you’d just skip the step where we slice off a representative example of the data. Expect run time to increase in proportion to the size of the data being processed.
smplData = smplData.isel(feature_id=slice(0, 100))
smplData
<xarray.Dataset> Size: 591MB
Dimensions:     (feature_id: 100, time: 367439)
Coordinates:
    elevation   (feature_id) float32 400B 134.5 69.77 1.508e+03 ... 345.9 294.0
  * feature_id  (feature_id) int32 400B 3109 3923 12932 ... 414203 414275 417955
    gage_id     (feature_id) |S15 2kB b' 01030350' ... b' 07241800'
    latitude    (feature_id) float32 400B 45.71 45.51 40.38 ... 35.5 35.57 35.34
    longitude   (feature_id) float32 400B -68.16 -68.3 -105.1 ... -97.36 -96.87
    order       (feature_id) int32 400B 3 5 5 3 1 1 2 1 1 ... 3 5 4 4 6 6 6 6 6
  * time        (time) datetime64[ns] 3MB 1979-02-01T01:00:00 ... 2020-12-31T...
Data variables:
    crs         (feature_id) object 800B b'' b'' b'' b'' b'' ... b'' b'' b'' b''
    streamflow  (time, feature_id) float64 294MB dask.array<chunksize=(672, 100), meta=np.ndarray>
    velocity    (time, feature_id) float64 294MB dask.array<chunksize=(672, 100), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
Note that this data has the full range of time values (367439 time steps) and all of the variables (streamflow, velocity, …). We just selected 100 feature_id values to work with so that the example data will execute more quickly.
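To confirm what we are starting from before re-chunking, here is a quick, illustrative look at the subset’s dimensions and its current dask chunking:

# Confirm the subset dimensions and the chunking inherited from the source store:
# each chunk currently spans 672 time steps and all selected feature_ids.
print(smplData.sizes)
print(smplData['streamflow'].data.chunksize)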
Spin up Dask Cluster#
Our rechunking operation will be able to work in parallel. To do that, we will
spin up a dask
cluster on the cloud hardware to schedule the various workers.
Note that this cluster must be configured with a specific user profile with
permissions to write to our eventual output location.
%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
The 'cluster' object can be used to adjust cluster behavior. i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster. i.e. 'client.submit(func)'
The link to view the client dashboard is:
> https://hytestnebari.dev-wma.chs.usgs.gov/gateway/clusters/dev.f4cffc2741b8423b9bb0db239828f07c/status
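The helper notebook above is specific to the Nebari deployment. If you are running somewhere else, a minimal sketch of standing up a local cluster instead (a hypothetical substitute, not what the helper notebook does) might look like:

from dask.distributed import Client, LocalCluster

# A small local cluster for testing outside of Nebari; adjust workers/memory to your machine.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='4GiB')
client = Client(cluster)
print(client.dashboard_link)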
Re-Chunk Plan#
We will configure a new chunking plan which will favor time-series analysis. Using the dimensions of the data:
367439 time steps
100 feature IDs
We can write the new plan as:
# The new chunking plan:
chunk_plan = {
    'streamflow': {'time': 367439, 'feature_id': 1}, # all time records in one chunk for each feature_id
    'velocity': {'time': 367439, 'feature_id': 1},
    'elevation': (100,),
    'gage_id': (100,),
    'latitude': (100,),
    'longitude': (100,),
    'order': (100,),
    'time': (367439,), # all time coordinates in one chunk
    'feature_id': (100,) # all feature_id coordinates in one chunk
}
This will generate chunks which are 1 (feature_id) x 367439 (time) arrays of float64.
# time * id * float64
bytes = 367439 * 1 * 8
kbytes = bytes / (2**10)
mbytes = kbytes / (2**10)
print(f"chunk size: {bytes=} ({kbytes=:.2f})({mbytes=:.4f})")
chunk size: bytes=2939512 (kbytes=2870.62)(mbytes=2.8033)
# Manually reset the chunking metadata in prep for re-chunking; clearing the stored
# 'chunks' encoding keeps the original on-disk chunk shape from conflicting with
# the new plan when the data is written back out.
smplData = smplData.chunk(chunks={'feature_id':1, 'time': 367439})
for x in smplData.variables:
    smplData[x].encoding['chunks'] = None
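After this, the in-memory dask chunking should already reflect the target layout; a quick illustrative check:

# The dask graph now holds one feature_id per chunk, spanning the full time range.
print(smplData['streamflow'].data.chunksize)   # expected: (367439, 1)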
Set up output location#
With this plan, we can ask rechunker to re-write the data using the prescribed chunking pattern.
Unlike with the smaller dataset, we need to write this very large dataset to an object store in the datacenter: an S3 ‘bucket’. We need to set that up so that rechunker will have a suitable place to write data. This new data will be a complete copy of the original, just re-organized a bit.
from getpass import getuser
import fsspec
uname=getuser()
fsw = fsspec.filesystem(
    's3',
    anon=False,
    default_fill_cache=False,
    skip_instance_cache=True,
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org', }
)
workspace = 's3://hytest-scratch/'
testDir = workspace + "testing/"
myDir = testDir + f'{uname}/'
fsw.mkdir(testDir)
INFO:aiobotocore.credentials:Found credentials in environment variables.
staging = fsw.get_mapper(myDir + 'tutorial_staging.zarr')
outfile = fsw.get_mapper(myDir + 'tutorial_rechunked.zarr')
for fname in [staging, outfile]:
    print(f"Ensuring {fname.root} is empty...", end='')
    try:
        fsw.rm(fname.root, recursive=True)
    except FileNotFoundError:
        pass
    print(" Done.")
Ensuring hytest-scratch/testing/asnyder/tutorial_staging.zarr is empty...
Done.
Ensuring hytest-scratch/testing/asnyder/tutorial_rechunked.zarr is empty...
Done.
Ready to rechunk#
import rechunker
## Recall that merely invoking rechunker does not do any work... just sorts out
## the rechunking plan and writes metadata.
result = rechunker.rechunk(
    smplData,
    chunk_plan,
    "16GB",            # max memory each worker may use during the rechunk
    outfile,
    temp_store=staging,
)
INFO:aiobotocore.credentials:Found credentials in environment variables.
from dask.distributed import progress, performance_report
with performance_report(filename="dask-report.html"):
    r = result.execute(retries=10)
import zarr
_ = zarr.consolidate_metadata(outfile)   # write consolidated metadata so future opens of the store are fast
Results#
Let’s read in the resulting re-chunked dataset to see how it looks:
reChunkedData = xr.open_zarr(outfile)
reChunkedData
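One way to confirm that the rechunking actually took effect on disk is to look at the chunk shape recorded in the Zarr store itself (an illustrative check, using the outfile mapper defined above):

import zarr

# Inspect the Zarr array directly: the chunk shape should now be
# (all time steps, 1 feature_id), i.e. (367439, 1).
z = zarr.open(outfile, mode='r')
print(z['streamflow'].chunks)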
Comparison#
## Before:
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData['streamflow'].sel(feature_id=417955)
# Note: many chunks needed to service a single feature_id
## After:
reChunkedData['streamflow'].sel(feature_id=417955)
# All data for the specified feature_id is in a single chunk
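To make the before/after difference concrete, you can count how many dask chunks back a single feature’s time series in each layout (illustrative, using the objects defined above):

# Number of chunks needed to read one feature_id's full time series:
before = smplData['streamflow'].sel(feature_id=417955)
after = reChunkedData['streamflow'].sel(feature_id=417955)
print("chunks before:", before.data.numblocks)   # many chunks along time
print("chunks after: ", after.data.numblocks)    # a single chunk along time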
client.close()
cluster.close()