Steamflow Eval :: DScore Analysis#

../../../_images/Eval_Analysis.svg

Note

This notebook adapted from originals by Timothy Hodson and Rich Signell. See that upstream work at:

  • https://github.com/thodson-usgs/dscore

  • https://github.com/USGS-python/hytest-evaluation-workflows/

Essential Benchmark Components#

This benchmark notebook will present a workflow which follows a canonical model for Essential Benchmark Components:

  1. A set of predictions and matching observations;

  2. The domain (e.g. space or time) over which to benchmark;

  3. A set of statistical metrics with which to benchmark.

Let’s get started.

Step 0: Load libraries and configure Python computing environment#

import pandas as pd
import logging
import os

Step 1: Load Data#

../../../_images/Eval_Analysis_Data.svg

Essential Benchmark Components:

  1. A set of predictions and matching observations, <<–You are here

  2. The domain over which to benchmark

  3. A set of statistical metrics with which to benchmark.

Finding and loading data is made easier for this particular workflow (the streamflow variable), in that most of it has been pre-processed and stored in a cloud-friendly format. That process is described in the data preparation notebook. We will proceed here using the already-prepared data for streamflow, which is included in the HyTEST intake catalog.

import intake
cat = intake.open_catalog(r'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml')
print("Available datasets: \n", "\n".join(cat.keys()))
Available datasets: 
 conus404-catalog
benchmarks-catalog
conus404-drb-eval-tutorial-catalog
nhm-v1.0-daymet-catalog
nhm-v1.1-c404-bc-catalog
nhm-v1.1-gridmet-catalog
trends-and-drivers-catalog
nhm-prms-v1.1-gridmet-format-testing-catalog
nwis-streamflow-usgs-gages-onprem
nwis-streamflow-usgs-gages-osn
nwm21-streamflow-usgs-gages-onprem
nwm21-streamflow-usgs-gages-osn
nwm21-streamflow-cloud
geofabric_v1_1-zip-osn
geofabric_v1_1_POIs_v1_1-osn
geofabric_v1_1_TBtoGFv1_POIs-osn
geofabric_v1_1_nhru_v1_1-osn
geofabric_v1_1_nhru_v1_1_simp-osn
geofabric_v1_1_nsegment_v1_1-osn
gages2_nndar-osn
wbd-zip-osn
huc12-geoparquet-osn
huc12-gpkg-osn
nwm21-scores
lcmap-cloud
rechunking-tutorial-osn
pointsample-tutorial-sites-osn
pointsample-tutorial-output-osn

The above list represents the processed datasets available for benchmarking. If a dataset you want is not in that list, you can load data directly from S3 or onprem. If you load data from a source other than this list, you can jump to Step 2: Restrict to a Domain

Note that the interesting datasets in the cataloged dataset above are duplicated: Some are -onprem , some are -cloud, and some are some are -osn. Same data, but the storage location and access protocol will be different. You will definitely want to use the correct copy of the data for your computing environment.

  • onprem : Direct access via the caldera filesystem from denali or tallgrass

  • cloud : Network access via S3 bucket, suitable for consumption on cloud-hosted jupyter servers. You could also access using any network-attached computer, but the amount of data will likely saturate your connection. Use in the cloud (e.g. ESIP QHub)

  • osn : Network access via OSN pod, which uses the S3 API, suitable for consumption on any jupyter server.

So… are you on-prem?

import platform
onprem = (platform.node() in ['denali', 'tallgrass'])
if onprem:
    print("Yes : -onprem")
    obs_data_src='nwis-streamflow-usgs-gages-onprem'
    mod_data_src='nwm21-streamflow-usgs-gages-onprem'
else:
    print("Not onprem; use '-osn' data source")
    obs_data_src='nwis-streamflow-usgs-gages-osn'
    mod_data_src='nwm21-streamflow-usgs-gages-osn'
print("Observed : ", obs_data_src)
print("Modeled  : ", mod_data_src)
Not onprem; use '-osn' data source
Observed :  nwis-streamflow-usgs-gages-osn
Modeled  :  nwm21-streamflow-usgs-gages-osn
variable_of_interest = 'streamflow'
try:
    obs = cat[obs_data_src].to_dask()
    mod = cat[mod_data_src].to_dask()
except KeyError:
    print("Something wrong with dataset names.")
    raise

try:
    obs_data = obs[variable_of_interest]
    mod_data = mod[variable_of_interest].astype('float32')
except KeyError:
    print(f"{variable_of_interest} was not found in these data.")

obs_data.name = 'observed'
mod_data.name = 'predicted'    
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  'dims': dict(self._ds.dims),
/home/conda/global/16102bfe-1731002172-4-pangeo/lib/python3.11/site-packages/intake_xarray/base.py:21: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.
  'dims': dict(self._ds.dims),

Step 2: Restrict to a Domain#

../../../_images/Eval_Analysis_Domain.svg

Essential Benchmark Components:

  1. A set of predictions and matching observations,

  2. The domain over which to benchmark <<–You are here

  3. A set of statistical metrics with which to benchmark.

Each benchmark domain is defined over specific bounds (typically space and/or time). Benchmark domain definitions can be defined within the notebook, or sourced from elsewhere. For this example, we use the Cobalt gages, avaliable for download on ScienceBase (Foks et al., 2022).

This is essentially a list of stream guages in which we are interested, along with some metadata about that gage (watershed, reach code, etc). We will use this as a spatial selector to restrict the original data to only those gages found within this benchmarking domain.

cobalt = pd.read_csv(
    'https://www.sciencebase.gov/catalog/file/get/6181ac65d34e9f2789e44897?f=__disk__22%2F1a%2Fd2%2F221ad2fe9d95de17731ad35d0fc6b417a4b53ee1',
    dtype={'site_no':str, 'huc_cd':str, 'reachcode':str, 'comid':str, 'gagesII_class':str, 'aggecoregion': str}, 
    index_col='site_no'
    )
# Re-format the gage_id/site_no string value.  ex:   "1000000"  ==> "USGS-1000000"
cobalt.rename(index=lambda x: f'USGS-{x}', inplace=True)
print(f"{len(cobalt.index)} gages in this benchmark")
cobalt.head(2)
5390 gages in this benchmark
dec_lat_va dec_long_va comid reachcode reach_meas drain_sqkm huc02 gagesII_class aggecoregion complete_yrs n_days nldi swim gfv1d1 camels
site_no
USGS-01011000 47.069722 -69.079444 721640 01010002000001 53.747189 3186.8 01 Non-ref NorthEast 33 12146 1 1 1 0
USGS-01013500 47.237500 -68.582778 724696 01010003000003 7.660419 2252.7 01 Ref NorthEast 33 12146 1 1 1 1

So now we have a domain dataset representing the stream gages (unique site_no values) identifying the locations making up the spatial domain of this benchmark. While we have good meta-data for these gages (lat/lon location, HUC8 code, etc), we really will only use the list of site_no values to select locations out of the raw data.

Step 3: Define Metrics#

../../../_images/Eval_Analysis_Metrics.svg

Essential Benchmark Components:

  1. A set of predictions and matching observations,

  2. The domain over which to benchmark

  3. A set of statistical metrics with which to benchmark. <<–You are here

The code to calculate the various metrics has been standardized in Standard Suite (v1) Metrics. You can use these or write your own. To import and use these standard definitions, run this cell:

%run ../../Metrics_DScore_Suite_v1.ipynb

Whether you use these functions or your own, we need to put all metric computation into a special all-encompasing benchmarking function–a single call which can be assigned to each gage in our domain list. This sort of arrangement is well-suited to parallelism with dask.

If this is done well, the process will benefit enormously from task parallelism – each gage can be given its own CPU to run on. After all are done, the various results will be collected and assembled into a composite dataset.

To achieve this, we need a single ‘atomic’ function that can execute independently. It will take the gage identifier as input and return a list of metrics.

## Wrapper function -- this func will be called once per gage_id, each call on its own dask worker
def compute_benchmark(gage_id):
    try:
        ## obs_data and mod_data should be globals...
        obs = obs_data.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series()
        mod = mod_data.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series().resample('1D', offset='5h').mean() 
        
        # make sure the indices match
        obs.index = obs.index.to_period('D')
        mod.index = mod.index.to_period('D')

        # merge obs and predictions; drop NaNs.
        gage_df = pd.merge(obs, mod, left_index=True, right_index=True).dropna(how='any')
        
        obs_log = np.log(gage_df['observed'].clip(lower=0.01)) # clip to remove zeros and negative values
        sim_log = np.log(gage_df['predicted'].clip(lower=0.01))
        
        scores = pd.concat([
                pd.Series([ mse(obs_log, sim_log) ], index=["mse"], dtype='float64'),
                bias_distribution_sequence(obs_log, sim_log), 
                seasonal_mse(obs_log, sim_log),
                quantile_mse(obs_log, sim_log)
                ]
            )
        scores.name=gage_id
        return scores
    except Exception as e:#<-- this is an extremely broad way to catch exceptions.  We only do it this way to ensure 
            #    that a failure on one benchmark (for a single stream gage) will not halt the entire run. 
        logging.info("Benchmark failed for %s", gage_id)
        return None

Let’s test to be sure this compute_benchmark() function will return data for a single gage

compute_benchmark('USGS-01030350')
mse          1.356542
e_bias       0.181100
e_dist       0.914028
e_seq        0.261676
winter       0.087221
spring       0.101688
summer       0.549519
fall         0.618114
low          1.110189
below_avg    0.076378
above_avg    0.055341
high         0.114634
Name: USGS-01030350, dtype: float64

Execute the Analysis#

We will be doing a lot of work in paralallel, using workers within a ‘cluster’.
The details of cluster configuration are handled for us by ‘helper’ notebooks, below. You can override their function by doing your own cluster configuration if you like.

# uncomment the lines below to read in your AWS credentials if you want to access data from a requester-pays bucket (-cloud)
# os.environ['AWS_PROFILE'] = 'default'
# %run ../environment_set_up/Help_AWS_Credentials.ipynb
%run ../../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
### Executes external 'helper to spin up a cluster of workers. 
The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' 
The link to view the client dashboard is:
>  https://hytestnebari.dev-wma.chs.usgs.gov/gateway/clusters/dev.3a18fc545cca4988b4f18c4a84e4dfda/status

We verified above that the compute_benchmark works on the “hosted” server (where this notebook is being executed. As a sanity check before we give the cluster of workers a lot to do, let’s verify that we can have a remote worker process a gage by submitting work to one in isolation:

client.submit(compute_benchmark, 'USGS-01030350').result()
mse          1.356542
e_bias       0.181100
e_dist       0.914028
e_seq        0.261676
winter       0.087221
spring       0.101688
summer       0.549519
fall         0.618114
low          1.110189
below_avg    0.076378
above_avg    0.055341
high         0.114634
Name: USGS-01030350, dtype: float64

Now that we’ve got a benchmark function, and can prove that it works in remote workers within the cluster, we can dispatch a fleet of workers to process our data in parallel. We will mak use of dask to do thisusing a dask ‘bag’.

# Set up a dask bag with the contents beging a list of the cobalt gages
import dask.bag as db
bag = db.from_sequence( cobalt.index.tolist() ).map(compute_benchmark)
results = bag.compute() 

With that big task done, we don’t need dask parallelism any more. Let’s shut down the cluster:

client.close(); del client
cluster.close(); del cluster

Assemble the results#

The bag now contains a collection of return values (one per call to compute_benchmark()). We can massage that into a table/dataframe for easier processing:

results = [i for i in results if i is not None] # Drop entries where compute_benchmark failed
results_df = pd.concat(results, axis=1).T
results_df.index.name = 'site_no'
results_df
mse e_bias e_dist e_seq winter spring summer fall low below_avg above_avg high
site_no
USGS-01011000 0.349280 0.062854 0.008747 0.277697 0.077164 0.108429 0.108192 0.055495 0.052829 0.092242 0.104895 0.099314
USGS-01013500 0.231121 0.002947 0.070471 0.157718 0.024475 0.088122 0.049798 0.068726 0.072867 0.038483 0.036120 0.083652
USGS-01015800 0.283531 0.000781 0.014034 0.268733 0.039518 0.110405 0.073111 0.060497 0.059348 0.060264 0.074423 0.089495
USGS-01017000 0.316433 0.001108 0.012394 0.302951 0.046091 0.109606 0.083467 0.077269 0.062643 0.064209 0.086774 0.102808
USGS-01017550 0.667979 0.020339 0.046881 0.600843 0.125872 0.201271 0.146279 0.194557 0.128817 0.154305 0.165163 0.219694
... ... ... ... ... ... ... ... ... ... ... ... ...
USGS-14369500 0.458883 0.008361 0.152934 0.297617 0.061887 0.084825 0.153114 0.159056 0.194793 0.084246 0.119609 0.060235
USGS-14372300 0.306280 0.010342 0.148491 0.147466 0.047533 0.045161 0.043905 0.169680 0.105135 0.101732 0.062807 0.036606
USGS-14375100 2.365592 0.529879 0.202781 1.633049 1.084582 0.929288 0.222655 0.129067 1.310581 0.822220 0.133049 0.099743
USGS-14377100 0.277418 0.033382 0.020332 0.223719 0.042932 0.022683 0.108892 0.102911 0.133749 0.067866 0.045944 0.029858
USGS-14400000 0.242510 0.000133 0.054383 0.188010 0.048414 0.048052 0.044409 0.101635 0.067314 0.056164 0.064461 0.054572

5380 rows × 12 columns

This dataframe/table can be saved to disk as a CSV. It will be used for visualizations in Streamflow Eval :: Visualization .

results_df.to_csv('DScore_streamflow_example.csv') ##<--- change this to a personalized filename