Essential Benchmark Components¶
This benchmark notebook will present a workflow which follows a canonical model for Essential Benchmark Components:
A set of predictions and matching observation (the data);
The domain (e.g. space or time) over which to benchmark;
A set of statistical metrics with which to benchmark.
Let’s get started.
Step 0: Load libraries and configure Python computing environment¶
import pandas as pd
import logging
import os
# Needed when boto3 >= 1.36.0 or the rechunking process will fail
# This needs to be set before the boto3 library gets loaded
# See: https://github.com/aws/aws-cli/issues/9214#issuecomment-2606619168
os.environ['AWS_REQUEST_CHECKSUM_CALCULATION'] = 'when_required'
os.environ['AWS_RESPONSE_CHECKSUM_VALIDATION'] = 'when_required'Step 1: Load Data¶
Essential Benchmark Components:
A set of predictions and matching observations
The domain over which to benchmark
A set of statistical metrics with which to benchmark.
Finding and loading data is made easier for this particular workflow (the streamflow variable), in that most of it has been pre-processed and stored in a cloud-friendly format. That process is described in the data preparation notebook. We will proceed here using the already-prepared data for streamflow, which is included in the HyTEST intake catalog.
import intake
cat = intake.open_catalog(r'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml')
print("Available datasets: \n", "\n".join(cat.keys()))The above list represents the processed datasets available for benchmarking. If a dataset you want is not in that list, you can load the data manually via direct connection to ‘on-prem’ or S3 object storage. If you load data from a source other than this list, you can jump to Step 2: Restrict to a Domain
Note that the interesting datasets in the cataloged dataset above are duplicated: Some are -onprem
and some are -cloud. Same data, but the storage location and access protocol will be different. You
will definitely want to use the correct copy of the data for your computing environment.
onprem: Direct access via thecalderafilesystem from denali or tallgrasscloud: Network access via S3 bucket, suitable for consumption on cloud-hosted jupyter servers. You could also access using any network-attached computer, but the amount of data will likely saturate your connection. Use in the cloud (e.g. ESIP QHub)osn: Network access via OSN pod, which uses the S3 API, suitable for consumption on any jupyter server.
So... are you on-prem?
import platform
onprem = (platform.node() in ['denali', 'tallgrass']) # NOTE: these hostnames are not quite correct... this will always return not onprem.
if onprem:
print("Yes : -onprem")
obs_data_src='nwis-streamflow-usgs-gages-onprem'
mod_data_src='nwm21-streamflow-usgs-gages-onprem'
else:
print("Not onprem; use '-osn' data source")
obs_data_src='nwis-streamflow-usgs-gages-osn'
mod_data_src='nwm21-streamflow-usgs-gages-osn'
print("Observed : ", obs_data_src)
print("Modeled : ", mod_data_src)variable_of_interest = 'streamflow'
try:
obs = cat[obs_data_src].to_dask()
mod = cat[mod_data_src].to_dask()
except KeyError:
print("Something wrong with dataset names.")
raise
try:
obs_data = obs[variable_of_interest]
mod_data = mod[variable_of_interest].astype('float32')
except KeyError:
print(f"{variable_of_interest} was not found in these data.")
obs_data.name = 'observed'
mod_data.name = 'predicted' Step 2: Restrict to a Domain¶
Essential Benchmark Components:
A set of predictions and matching observations,
The domain over which to benchmark
A set of statistical metrics with which to benchmark.
Each benchmark domain is defined over specific bounds (typically space and/or time). Benchmark domain definitions can be defined within the notebook, or sourced from elsewhere. For this example, we use the Cobalt gages, avaliable for download on ScienceBase (Foks et al., 2022).
This is essentially a list of stream guages in which we are interested, along with some metadata about that gage (watershed, reach code, etc). We will use this as a spatial selector to restrict the original data to only those gages found within this benchmarking domain.
cobalt = pd.read_csv(
'https://www.sciencebase.gov/catalog/file/get/6181ac65d34e9f2789e44897?f=__disk__22%2F1a%2Fd2%2F221ad2fe9d95de17731ad35d0fc6b417a4b53ee1',
dtype={'site_no':str, 'huc_cd':str, 'reachcode':str, 'comid':str, 'gagesII_class':str, 'aggecoregion': str},
index_col='site_no'
)
# Re-format the gage_id/site_no string value. ex: "1000000" ==> "USGS-1000000"
cobalt.rename(index=lambda x: f'USGS-{x}', inplace=True)
print(f"{len(cobalt.index)} gages in this benchmark")
cobalt.head(2)So now we have a domain dataset representing the stream gages (unique site_no values) identifying the locations making up the spatial domain of this benchmark. While we have good meta-data for these gages (lat/lon location, HUC8 code, etc), we really will only use the list of site_no values to select locations out of the raw data.
Step 3: Define Metrics¶
Essential Benchmark Components:
A set of predictions and matching observations,
The domain over which to benchmark
A set of statistical metrics with which to benchmark.
The code to calculate the various NWM metrics has been standardized in Standard Suite (v1) Metrics with usage examples in Standard Suite (v1) Metrics -- Usage Examples. You can use these metrics or write your own. To import and use these standardized definitions, run this cell:
%run ../../Metrics_StdSuite_v1.ipynbWhether you use these functions or your own, we need to put all metric computation into a special all-encompasing
benchmarking function--a single call which can be assigned to each gage in our domain list. This sort of arrangement
is well-suited to parallelism with dask.
If this is done well, the process will benefit enormously from task parallelism -- each gage can be given its own CPU to run on. After all are done, the various results will be collected and assembled into a composite dataset.
To achieve this, we need a single ‘atomic’ function that can execute independently. It will take the gage identifier as input and return a list of metrics.
## Wrapper function -- this func will be called once per gage_id, each call on its own dask worker
def compute_benchmark(gage_id):
try:
## obs_data and mod_data should be globals...
obs = obs_data.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series()
mod = mod_data.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series().resample('1D', offset='5h').mean()
# make sure the indices match
obs.index = obs.index.to_period('D')
mod.index = mod.index.to_period('D')
# merge obs and predictions; drop NaNs.
gage_df = pd.merge(obs, mod, left_index=True, right_index=True).dropna(how='any')
scores = pd.Series(
data={
'NSE': NSE(gage_df.observed, gage_df.predicted),
'KGE': KGE(gage_df.observed, gage_df.predicted),
'logNSE': logNSE(gage_df.observed, gage_df.predicted),
'pbias': pbias(gage_df.observed, gage_df.predicted),
'rSD': rSD(gage_df.observed, gage_df.predicted),
'pearson': pearson_r(gage_df.observed, gage_df.predicted),
'spearman': spearman_r(gage_df.observed, gage_df.predicted),
'pBiasFMS': pBiasFMS(gage_df.observed, gage_df.predicted),
'pBiasFLV': pBiasFLV(gage_df.observed, gage_df.predicted),
'pBiasFHV': pBiasFHV(gage_df.observed, gage_df.predicted)
},
name=gage_id,
dtype='float64'
)
return scores
except Exception as e:#<-- this is an extremely broad way to catch exceptions. We only do it this way to ensure
# that a failure on one benchmark (for a single stream gage) will not halt the entire run.
logging.info("Benchmark failed for %s", gage_id)
return NoneLet’s test to be sure this compute_benchmark() function will return data for a single gage
compute_benchmark('USGS-01030350')Execute the Analysis¶
We will be doing a lot of work in parallel, using workers within a ‘cluster’.
The details of cluster configuration are handled for us by ‘helper’ notebooks, below.
You can override their function by doing your own cluster configuration if you like.
# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'default'
# %run ../../../environment_set_up/Help_AWS_Credentials.ipynb%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
### Executes external 'helper to spin up a cluster of workers. We verified above that the compute_benchmark works on the “hosted” server (where this
notebook is being executed. As a sanity check before we give the cluster of workers a lot
to do, let’s verify that we can have a remote worker process a gage by submitting work
to one in isolation:
client.submit(compute_benchmark, 'USGS-01030350').result()Now that we’ve got a benchmark function, and can prove that it works in remote workers
within the cluster, we can dispatch a fleet of workers to process our data in parallel.
We will make use of dask to do this using a dask ‘bag’.
# Set up a dask bag with the contents beging a list of the cobalt gages
import dask.bag as db
bag = db.from_sequence( cobalt.index.tolist() ).map(compute_benchmark)
results = bag.compute() #<< Dispatch the workersWith that big task done, we don’t need dask parallelism any more. Let’s shut down the cluster:
client.close(); del client
cluster.close(); del clusterAssemble the results¶
The bag now contains a collection of return values (one per call to compute_benchmark()). We can massage that into a table/dataframe for easier processing:
r = [i for i in results if i is not None] # Drop entries where compute_benchmark failed
results_df = pd.concat(r, axis=1).T
results_df.index.name = 'site_no'
results_dfThis dataframe/table can be saved to disk as a CSV. It will be used for visualizations in other notebooks.
results_df.to_csv('NWM_v2.1_streamflow_example.csv') ##<--- change this to a personalized filename- Foks, S. S., Towler, E., Hodson, T. O., Bock, A. R., Dickinson, J. E., Dugger, A. L., Dunne, K. A., Essaid, H. I., Miles, K. J., Over, T. M., Penn, C. A., Russell, A. M., Saxe, S. W., & Simeone, C. E. (2022). Streamflow benchmark locations for hydrologic model evaluation within the conterminous United States (cobalt gages). U.S. Geological Survey. 10.5066/P972P42Z