# Re-Chunking Larger Datasets 

This notebook extends ideas covered in the [basic workflow](./ReChunkingData.ipynb).  This 
notebook will perfrom the same operations, but will work on the **much** larger dataset, and 
involve some parallelization using the dask scheduler. 

:::{Warning}

You should run this **only** on a cloud compute node -- on ESIP Nebari, for example. We 
will be reading and writing **enormous** amounts of data to S3 buckets. To do that over a 
typical network connection will saturate your bandwidth and take days to complete.

:::

## System Setup 

In [None]:
import os
import xarray as xr
import dask
import intake

# Activate logging
import logging
logging.basicConfig(level=logging.INFO, force=True)

## Plumb Data Source
We're going to look at a particular dataset from the National Water Model Reanalysis Version 2.1. 
The dataset is part of the AWS Open Data Program, and is included in the HyTEST data catalog.


In [None]:
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
cat['nwm21-streamflow-cloud']

## Load the zarr data
We'll take advantage of the `intake` mechanism and load the data 
directly.  We'll need to set up our AWS credentials first, since
this data is stored on an S3 bucket. 

In [None]:
os.environ['AWS_PROFILE'] = "osn-hytest-scratch"
%run ../../../environment_set_up/Help_AWS_Credentials.ipynb

In [None]:
ds = cat['nwm21-streamflow-cloud'].to_dask()
indexer = ds.gage_id != ''.rjust(15).encode()
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData.drop('crs') # Not needed/wanted for this analysis
smplData

## Restrict for Tutorial
This is a demonstration workflow, which means we don't really need to work on the entire 
dataset -- which is very large. We are going to cut this input dataset down to be just the
first 100 `feature_id` values, so that this tutorial will run in reasonable time. 

For processing a full-sized dataset, you'd just skip this step where we slice off a representative
example of the data. Expect run time to increase in proportion to the size of the data being
processed. |

In [None]:
smplData = smplData.isel(feature_id=slice(0, 100))
smplData

Note that this data has the full range of time values (367439 time steps) and all of the 
variables (`streamflow`, `velocity`, ...). We just selected 100 feature_ids to work with so 
that the example data will execute more quickly. 

## Spin up Dask Cluster
Our rechunking operation will be able to work in parallel. To do that, we will
spin up a `dask` cluster on the cloud hardware to schedule the various workers.
Note that this cluster must be configured with a specific user **profile** with 
permissions to write to our eventual output location. 

In [None]:
%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb


## Re-Chunk Plan
We will configure a new chunking plan which will favor time-series analysis. 
Using the dimensions of the data: 
* 367439 time steps
* 100 feature IDs

We can write the new plan as: 

In [None]:
# The new chunking plan:
chunk_plan = {
    'streamflow': {'time': 367439, 'feature_id': 1}, # all time records in one chunk for each feature_id
    'velocity': {'time': 367439, 'feature_id': 1},
    'elevation': (100,),
    'gage_id': (100,),
    'latitude': (100,),
    'longitude': (100,),    
    'order': (100,),    
    'time': (367439,), # all time coordinates in one chunk
    'feature_id': (100,) # all feature_id coordinates in one chunk
}


This will generate chunks which are 1(feature_id) x 367439(time) arrays of `float64`. 

In [None]:
#       time  * id * float64
bytes = 367439 * 1 * 8
kbytes = bytes / (2**10)
mbytes = kbytes / (2**10)
print(f"chunk size: {bytes=} ({kbytes=:.2f})({mbytes=:.4f})")

In [None]:
# Manually reset the chunking metadata in prep for re-chunking
smplData = smplData.chunk(chunks={'feature_id':1, 'time': 367439})
for x in smplData.variables:
    smplData[x].encoding['chunks'] = None

## Set up output location

With this plan, we can ask `rechunker` to re-write the data using the prescribed chunking pattern.

Unlike with the smaller dataset, we need to write this very large dataset to an object store in the datacenter: an S3 'bucket'.  So we need to set that up so that `rechunker` will have a suitable place to write data. This new data will be a complete copy of the original, just re-organized a bit. 

In [None]:
from getpass import getuser
import fsspec
uname=getuser()

fsw = fsspec.filesystem(
    's3', 
    anon=False, 
    default_fill_cache=False, 
    skip_instance_cache=True, 
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org', }
)

workspace = 's3://hytest-scratch/'
testDir = workspace + "testing/"
myDir = testDir + f'{uname}/'
fsw.mkdir(testDir)

In [None]:
staging = fsw.get_mapper(myDir + 'tutorial_staging.zarr')
outfile = fsw.get_mapper(myDir + 'tutorial_rechunked.zarr')
for fname in [staging, outfile]:
    print(f"Ensuring {fname.root} is empty...", end='')
    try:
        fsw.rm(fname.root, recursive=True)
    except:
        FileNotFoundError
    print(" Done.")

## Ready to rechunk

In [None]:
import rechunker
## Recall that merely invoking rechunker does not do any work... just sorts out 
## the rechunking plan and writes metadata.
result = rechunker.rechunk(
    smplData,
    chunk_plan,
    "16GB",
    outfile, 
    temp_store=staging, 
)

In [None]:
from dask.distributed import progress, performance_report

with performance_report(filename="dask-report.html"):
    r = result.execute(retries=10)  

In [None]:
import zarr
_ = zarr.consolidate_metadata(outfile)

## Results
Let's read in the resulting re-chunked dataset to see how it looks:

In [None]:
reChunkedData = xr.open_zarr(outfile)
reChunkedData

### Comparison


In [None]:
## Before:
smplData = ds.where(indexer.compute(), drop=True) # subset to only those features with a valid gage_id
smplData['streamflow'].sel(feature_id=417955)
# Note: many chunks needed to service a single feature_id


In [None]:
## After:
reChunkedData['streamflow'].sel(feature_id=417955) 
# All data for the specified feature_id is in a single chunk


In [None]:
client.close()
cluster.close()