Streamflow Eval :: Data Preparation#
As a part of the generalized evaluation workflow:
The pre-processing step is needed in order to align the two datasets for analysis. The specific steps needed to prepare a given dataset may differ, depending on the source and the variable of interest.
Some steps might include:
Organizing the time-series index such that the time steps for both simulated and observed are congruent
This may involve interpolation to estimate a more granular time-step than is found in the source data
More often, an agregating function is used to ‘down-sample’ the dataset to a coarser time step (days vs hours).
Coordinate aggregation units between simulated and observed
Gridded data may be sampled per HUC-12, HUC-6, etc. to match modeled data indexed by these units.
Index formats may be adjusted (e.g. a ‘gage_id’ may be ‘USGS-01104200’ in one data set, vs ‘01104200’ in another)
Re-Chunking the data to make time-series analysis more efficient (see here for a primer on re-chunking).
Streamflow Data Prep#
This document shows one approach to preparing the streamflow data for subsequent analysis (That analysis is outlined here).
Streamflow analysis will compare time-series of two aligned datasets:
These data soruces are accessed using different methods. We will pull data from their respective sources, reshape and optimize the data structures, then write that data to storage to make later analysis easier.
An overview of the steps we will take in this notebook:
Read Modeled Data
Establish AWS Credentials
Source NWIS data via API
Create a plan to Re-Structure that data for storage as ZARR file on S3.
Rename variables
Establish chunking layout
Establish encoding
Create a template to formalize this configuration
In Parallel (one worker per gage_id):
Fetch data from NWIS
Write data to ZARR file
Verify the data is correctly written to ZARR storage.
1) Reading the ‘modeled’ Data#
Modeled data for this demonstration tutorial will be sourced from the S3 bucket nhgf-development
.
import os
os.environ['USE_PYGEOS'] = '0'
import fsspec
import dask
import numpy as np
import xarray as xr
import intake
from getpass import getuser
username = getuser()
Modeled data is within the HyTEST intake
catalog, with name “nwm21-streamflow-usgs-gages-osn”:
cat = intake.open_catalog(r'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml')
modeled = cat['nwm21-streamflow-usgs-gages-osn'].to_dask()
modeled
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
'dims': dict(self._ds.dims),
<xarray.Dataset> Size: 47GB Dimensions: (gage_id: 7994, time: 367439) Coordinates: elevation (gage_id) float32 32kB dask.array<chunksize=(7994,), meta=np.ndarray> * gage_id (gage_id) <U20 640kB 'USGS-01030350' ... 'USGS-10254970' latitude (gage_id) float32 32kB dask.array<chunksize=(7994,), meta=np.ndarray> longitude (gage_id) float32 32kB dask.array<chunksize=(7994,), meta=np.ndarray> order (gage_id) int32 32kB dask.array<chunksize=(7994,), meta=np.ndarray> * time (time) datetime64[ns] 3MB 1979-02-01T01:00:00 ... 2020-12-31T... Data variables: streamflow (time, gage_id) float64 23GB dask.array<chunksize=(367439, 1), meta=np.ndarray> velocity (time, gage_id) float64 23GB dask.array<chunksize=(367439, 1), meta=np.ndarray> Attributes: TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 featureType: timeSeries model_configuration: retrospective proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
Source Data as Template
This source data set will establish the indices and boundaries for the data we will eventually pull from the NWIS stream gage network. The two dimensions of this data are the Gage ID and Time. We’ll use these dimensions to fetch the ‘observed’ data later. The endpoints and range of these dimensions will establish that future query.
## Gage IDs
import re
_gages = [gage_id.lstrip() for gage_id in modeled['gage_id'].values.astype('str')]
GAGES = [g for g in _gages if re.search('USGS-\d+$', g) ][0:100]
# ^^^^^^^ See NOTES:
## >> NOTE 1: The regex search pattern ensures we get legal gage names only
## >> NOTE 2: We are limiting the GAGES array to the first 100 gages for this
## demo/tutorial. To run this workflow for the entire set of GAGES,
## remove the slice notation [0:100]
# Time boundaries for future queries:
start_time = modeled.time.values.min()
stop_time = modeled.time.values.max()
DATE_RANGE=(start_time, stop_time)
Some important notes for each of those bounds:
GAGES / Gage IDs :: The list of 7994 gage IDs in the model dataset include some values which the NWIS does not recognize and will not accept. We need to remove them.
Gage IDs of the form
USGS-\d+
(A string starting ‘USGS-’ and ending in an arbitrary number of digits) are processed by NWIS data requests.There are roughly 350 gage IDs in the modeled dataset with letters embedded in the string of digits after the ‘USGS-’. These will be rejected by the API when we try to call NWIS data service to obtain streamflow history for that location.
This is the reason behind the regular expression search (
re.search
) to select only gage_id of the correct format.After selecting the NWIS-compliant gage IDs, the
GAGES
list contains 7647 gages. This tutorial will demonstrate the workflow using only the first 100 gages on that list. If you want to process the whole list, remove the slice notation as described in the comments above.
DATE_RANGE / Dates :: This defines the temporal range for the historical data will will fetch from NWIS.
The NWM modeled data includes time values stepped hourly.
The historical streamflow data is stepped daily.
We will resample later to make sure the indices match.
2) Establish AWS Credentials#
Now that we’ve got a handle on the ‘modeled’ data, we can begin to think about the matching ‘observed’ data. But before we do that, let’s establish credentials for working with our compute environment. Doing this now will streamline future I/O and cluster tasks.
# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
os.environ['AWS_PROFILE'] = 'osn-hytest-scratch'
%run ../../../environment_set_up/Help_AWS_Credentials.ipynb
# With credentials established, instantiate a handle for the writable file system
fs_write = fsspec.filesystem('s3',
anon=False, # force a consult to environment vars set above.
skip_instance_cache=True,
client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org'}
)
fname=f's3://hytest-scratch/testing/{username}/nwis_out_{username}.zarr' #<<<< THIS will be our output location.
3) Sourcing the ‘observed’ data#
Now that we have information about the list of gages and the date range covered by the model, we can use that to query NWIS for matching data points for this same range of dates and station IDs. Because NWIS data is structured a little differently than the modeled streamflow, we’ll need to re-arrange the data a little after fetching.
In addition, a call to NWIS for historical data can be time consuming – and we will do it roughly 7500 times. We will eventually set up a mechanism to do these requests in parallel, once we’ve established how the data restructuring should happen.
The first step in that process is to make a NWIS request for just a couple of gages to see how the return data is structured. We’ll use that information to create the plan by which the full dataset is to be fetched and reorganized.
from pygeohydro import NWIS
nwis = NWIS()
## Fetch data for a couple of gages to see how NWIS formats a response
observed = nwis.get_streamflow(GAGES[0:2], DATE_RANGE, to_xarray=True)
## get_streamflow() is an API call to a data server via internet.
observed
<xarray.Dataset> Size: 368kB Dimensions: (time: 15310, station_id: 2) Coordinates: * time (time) datetime64[ns] 122kB 1979-02-01T05:00:00 ... 2020-12... * station_id (station_id) object 16B 'USGS-01030350' 'USGS-01030500' Data variables: discharge (time, station_id) float64 245kB nan 116.1 nan ... 2.093 158.3 station_nm (station_id) <U43 344B 'Wytopitlock Stream Near Wytopitlock... dec_lat_va (station_id) float64 16B 45.71 45.5 dec_long_va (station_id) float64 16B -68.16 -68.31 alt_va (station_id) float64 16B 381.4 210.6 alt_acy_va (station_id) float64 16B 0.1 0.1 alt_datum_cd (station_id) <U6 48B 'NAVD88' 'NAVD88' huc_cd (station_id) <U8 64B '01020003' '01020003' begin_date (station_id) <U10 80B '2008-10-01' '1934-10-01' end_date (station_id) <U10 80B '2024-12-17' '2024-12-17' Attributes: tz: UTC
4) Examine the Response Data – Make a plan#
Our goal is to use the NWIS service to fetch data for a large number of gages, then assemble that into a dataset that is structured similarly to our modeled data.
We requested two stream gages from NWIS rather than just one, to ensure that the dataset is multi-dimensional (as the final dataset will be). Using what we learned from the above NWIS call, we can make a plan for how to match it to the existing modeled data. A few considerations:
We’ll need to rename some variables (i.e. ‘discharge’ –> ‘streamflow’, etc).
We also need to make note of which data variables are strings, but perhaps stored with different encodings.
Because the composite dataset will be quite large, some care should be taken to chunk it such that it performs well for time-series analysis.
Lastly, we note the time range returned by NWIS. It covers the time range we requested in DATE_RAGE, but is stepped daily. We will use this information to interpolate future results.
# Step 4a : rename variables
observed = (observed
.rename_dims({'station_id':'gage_id'})
.rename({'discharge':'streamflow', 'station_id':'gage_id'})
)
observed
/tmp/ipykernel_5224/2079904522.py:4: UserWarning: rename 'station_id' to 'gage_id' does not create an index anymore. Try using swap_dims instead or use set_index after rename to create an indexed coordinate.
.rename({'discharge':'streamflow', 'station_id':'gage_id'})
<xarray.Dataset> Size: 368kB Dimensions: (time: 15310, gage_id: 2) Coordinates: * time (time) datetime64[ns] 122kB 1979-02-01T05:00:00 ... 2020-12... * gage_id (gage_id) object 16B 'USGS-01030350' 'USGS-01030500' Data variables: streamflow (time, gage_id) float64 245kB nan 116.1 nan ... 2.093 158.3 station_nm (gage_id) <U43 344B 'Wytopitlock Stream Near Wytopitlock, M... dec_lat_va (gage_id) float64 16B 45.71 45.5 dec_long_va (gage_id) float64 16B -68.16 -68.31 alt_va (gage_id) float64 16B 381.4 210.6 alt_acy_va (gage_id) float64 16B 0.1 0.1 alt_datum_cd (gage_id) <U6 48B 'NAVD88' 'NAVD88' huc_cd (gage_id) <U8 64B '01020003' '01020003' begin_date (gage_id) <U10 80B '2008-10-01' '1934-10-01' end_date (gage_id) <U10 80B '2024-12-17' '2024-12-17' Attributes: tz: UTC
# Step 4b : define chunking in a 'template' dataset
source_dataset = observed
template = (xr.zeros_like(source_dataset) # DataSet just like 'observed'
.chunk()
.isel(gage_id=0, drop=True) # temporarily remove gage_id as a dimension and coordinate
.expand_dims(gage_id=len(GAGES), axis=-1) # add it back, reserving space for the full size of GAGES
.assign_coords({'gage_id': GAGES}) # add coordinate to match dimension
.chunk({ # define chunk sizes
'time': len(observed.time), # all time vals in one chunk
'gage_id': 1} # one gage_id per chunk
)
)
template
<xarray.Dataset> Size: 12MB Dimensions: (time: 15310, gage_id: 100) Coordinates: * time (time) datetime64[ns] 122kB 1979-02-01T05:00:00 ... 2020-12... * gage_id (gage_id) <U20 8kB 'USGS-01030350' ... 'USGS-07241800' Data variables: streamflow (time, gage_id) float64 12MB dask.array<chunksize=(15310, 1), meta=np.ndarray> station_nm (gage_id) <U43 17kB dask.array<chunksize=(1,), meta=np.ndarray> dec_lat_va (gage_id) float64 800B dask.array<chunksize=(1,), meta=np.ndarray> dec_long_va (gage_id) float64 800B dask.array<chunksize=(1,), meta=np.ndarray> alt_va (gage_id) float64 800B dask.array<chunksize=(1,), meta=np.ndarray> alt_acy_va (gage_id) float64 800B dask.array<chunksize=(1,), meta=np.ndarray> alt_datum_cd (gage_id) <U6 2kB dask.array<chunksize=(1,), meta=np.ndarray> huc_cd (gage_id) <U8 3kB dask.array<chunksize=(1,), meta=np.ndarray> begin_date (gage_id) <U10 4kB dask.array<chunksize=(1,), meta=np.ndarray> end_date (gage_id) <U10 4kB dask.array<chunksize=(1,), meta=np.ndarray> Attributes: tz: UTC
## Step 4c : Commit template to permanent storage:
if fs_write.exists(fname):
print("Removing old copy of tutorial/demo output...", end="")
fs_write.rm(fname, recursive=True)
print("Done")
# Step 4c (continued): write the template with specific encodings
outfile = fs_write.get_mapper(fname)
template.to_zarr(
outfile,
compute=False,
encoding = { # encodings sets data types for the disk store
'station_nm': dict( _FillValue=None, dtype='<U64'),
'alt_datum_cd':dict( _FillValue=None, dtype='<U6'),
'alt_acy_va': dict( _FillValue=-2147483647, dtype=np.int32),
'alt_va': dict( _FillValue=9.96921e+36, dtype=np.float32),
'dec_lat_va': dict( _FillValue=None, dtype=np.float32),
'dec_long_va': dict( _FillValue=None, dtype=np.float32),
'streamflow': dict( _FillValue=9.96921e+36, dtype=np.float32)
},
consolidated=True, # Consolidate metadata
mode='w'
)
Delayed('_finalize_store-83def943-f49f-4544-a27b-7cb5f511e999')
5) Parallel Processing#
The above steps were necessary to establish a permanent disk storage space for the output
dataset. We’ve established its structure (variables, chunking plan, encodings) and also
given a hint as to its size (by asserting the length of the gage_id
index to be len(GAGES)
items).
With that all established, we can now execute a job where each gage’s data is fetched from NWIS and inserted into the permanent store. This demo is limited (by default) to only 100 gages, so could in theory be executed serially. We want to do it in parallel so as to model the process for an arbitrary number of gages.
## Step 5a) : write the 'worker' function -- this will be called once per gage
# Globals:
n_timesteps = len(observed.time)
time_steps = observed.time.values
def write_one_gage(n):
"""
Writes one gage's data to the existing zarr file. Uses the NWIS API call to fetch data.
Arguments:
n : integer
the index into the GAGES array identifying which gage to fetch and write.
"""
site_id = GAGES[n]
try:
_obs = nwis.get_streamflow(site_id, DATE_RANGE, to_xarray=True).interp(time=time_steps)
_obs = _obs.rename_dims({'station_id':'gage_id'}).rename({'station_id':'gage_id','discharge':'streamflow'})
## We must force the returned data into the datatype that we stored to disk.
_obs['station_nm'] = xr.DataArray(data=_obs['station_nm'].values.astype('<U64'), dims='gage_id')
_obs['alt_datum_cd'] = xr.DataArray(data=_obs['alt_datum_cd'].values.astype('<U6'), dims='gage_id')
_obs.to_zarr(
outfile,
region={ #<<< Specifying a region lets us 'insert' data to a specific place in the dataset.
'time': slice(0, n_timesteps),
'gage_id': slice(n,n+1)
}
)
return n # If success, returns the index into GAGES.
except Exception as e:
pass
#return e # if failure, return the exception thrown.
# This is an extremely broad way to catch exceptions... and in general is to be avoided.
# We do it this way in this case to protect the parallel run. it allows a single write_one_gage()
# to fail silently without affecting the rest of the run.
# Step 5b) Start up a distributed cluster of workers
# NOTE: This cluster configuration is VERY specific to the JupyterHub cloud environment on ESIP/QHUB
%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
The 'cluster' object can be used to adjust cluster behavior. i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster. i.e. 'client.submit(func)'
The link to view the client dashboard is:
> https://hytestnebari.dev-wma.chs.usgs.gov/gateway/clusters/dev.9ff11f903d9e473ebd890b099ac64584/status
%%time
## Run the list of tasks:
results = dask.compute(*[dask.delayed(write_one_gage)(i) for i in range(len(GAGES))], retries=10)
CPU times: user 106 ms, sys: 1.59 ms, total: 108 ms
Wall time: 50.6 s
## Consolidate metadata, to make future reads easier/faster
from zarr.convenience import consolidate_metadata
_ = consolidate_metadata(outfile)
## Shut down the cluster
client.close()
cluster.close()
6) Verify#
We can now read the dataset that we just wrote to disk. Does it have the dimensions, chunking, and encoding that we want?
## fname is already set from above...
outfile=fs_write.get_mapper(fname)
dst = xr.open_dataset(outfile, engine='zarr', chunks={}, backend_kwargs=dict(consolidated=True))
## NOTE: xarray will employ a 'lazy' loader; only metadata will be loaded initially. Will only
## read real data when it is actually needed for computation.
dst
<xarray.Dataset> Size: 6MB Dimensions: (gage_id: 100, time: 15310) Coordinates: * gage_id (gage_id) <U20 8kB 'USGS-01030350' ... 'USGS-07241800' * time (time) datetime64[ns] 122kB 1979-02-01T05:00:00 ... 2020-12... Data variables: alt_acy_va (gage_id) float64 800B dask.array<chunksize=(1,), meta=np.ndarray> alt_datum_cd (gage_id) <U6 2kB dask.array<chunksize=(1,), meta=np.ndarray> alt_va (gage_id) float32 400B dask.array<chunksize=(1,), meta=np.ndarray> begin_date (gage_id) <U10 4kB dask.array<chunksize=(1,), meta=np.ndarray> dec_lat_va (gage_id) float32 400B dask.array<chunksize=(1,), meta=np.ndarray> dec_long_va (gage_id) float32 400B dask.array<chunksize=(1,), meta=np.ndarray> end_date (gage_id) <U10 4kB dask.array<chunksize=(1,), meta=np.ndarray> huc_cd (gage_id) <U8 3kB dask.array<chunksize=(1,), meta=np.ndarray> station_nm (gage_id) <U64 26kB dask.array<chunksize=(1,), meta=np.ndarray> streamflow (time, gage_id) float32 6MB dask.array<chunksize=(15310, 1), meta=np.ndarray> Attributes: tz: UTC
import hvplot.xarray
dst.sel(gage_id='USGS-07241800').hvplot(x='time',y='streamflow', grid=True)
## This select operator is a specific call to read data -- so may take a
## moment to fetch the full time series for the specified gage.
DONE#
That dataset is now available for future analysis in which we need a consolidated NWIS dataset, chunked to optimize time-series analysis.