CONUS404 Site Data Selection#

Pull CONUS404 data at a set of lat/lon point locations, extract a few variables of interest, and save the result as a netCDF file on HyTEST's OSN pod.

import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import cartopy.crs as ccrs
import metpy
import warnings
import pandas as pd
import s3fs
import xoak
import dask
import hvplot.pandas # to add .hvplot to DataFrames
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')

Open dataset from Intake Catalog#

  • Select on-prem dataset from /caldera if running on prem (Denali/Tallgrass)

  • Select cloud/osn object store data if running elsewhere
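
If you would rather have the notebook pick the right variant automatically, here is a minimal sketch (an addition, not part of the original workflow) that keys off whether /caldera is mounted. Note that the dataset-selection cell further below simply hard-codes the OSN entry.

# A minimal sketch: use the on-prem copy when /caldera is mounted (Denali/Tallgrass),
# otherwise fall back to the OSN copy. Assumes /caldera is only visible on USGS HPC.
on_prem = os.path.isdir('/caldera')
dataset = 'conus404-daily-diagnostic-onprem-hw' if on_prem else 'conus404-daily-diagnostic-osn'
print(dataset)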

# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)
['conus404-catalog',
 'benchmarks-catalog',
 'conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'trends-and-drivers-catalog',
 'nhm-prms-v1.1-gridmet-format-testing-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-osn',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-osn',
 'nwm21-streamflow-cloud',
 'geofabric_v1_1-zip-osn',
 'geofabric_v1_1_POIs_v1_1-osn',
 'geofabric_v1_1_TBtoGFv1_POIs-osn',
 'geofabric_v1_1_nhru_v1_1-osn',
 'geofabric_v1_1_nhru_v1_1_simp-osn',
 'geofabric_v1_1_nsegment_v1_1-osn',
 'gages2_nndar-osn',
 'wbd-zip-osn',
 'huc12-geoparquet-osn',
 'huc12-gpkg-osn',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-osn',
 'pointsample-tutorial-sites-osn',
 'pointsample-tutorial-output-osn']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem-hw',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem-hw',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-diagnostic-osn',
 'conus404-daily-onprem-hw',
 'conus404-daily-cloud',
 'conus404-daily-osn',
 'conus404-monthly-onprem-hw',
 'conus404-monthly-cloud',
 'conus404-monthly-osn',
 'conus404-hourly-ba-onprem-hw',
 'conus404-hourly-ba-osn',
 'conus404-daily-ba-onprem',
 'conus404-daily-ba-osn',
 'conus404-pgw-hourly-onprem-hw',
 'conus404-pgw-hourly-osn',
 'conus404-pgw-daily-diagnostic-onprem-hw',
 'conus404-pgw-daily-diagnostic-osn']
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-diagnostic-osn' 
cat[dataset]
conus404-daily-diagnostic-osn:
  args:
    consolidated: true
    storage_options:
      anon: true
      client_kwargs:
        endpoint_url: https://usgs.osn.mghpcc.org/
      requester_pays: false
    urlpath: s3://hytest/conus404/conus404_daily_xtrm.zarr
  description: "CONUS404 daily diagnostic output (maximum, minimum, mean, and standard\
    \ deviation) for water vapor (Q2), grid-scale precipitation (RAINNC), skin temperature\
    \ (SKINTEMP), wind speed at 10 meter height (SPDUV10), temperature at 2 meter\
    \ height (T2), and U- and V-component of wind at 10 meters with respect to model\
    \ grid (U10, V10). These files were created wrfxtrm model output files (see ScienceBase\
    \ data release for more details: https://doi.org/10.5066/P9PHPK4F). This data\
    \ is stored on HyTEST\u2019s Open Storage Network (OSN) pod. This data can be\
    \ read with the S3 API and is free to work with in any computing environment (there\
    \ are no egress fees)."
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

Parallelize with Dask#

Some of the steps we will take can run on a parallel, clustered compute environment using dask. We'll start a cluster now so that later steps can take advantage of it.

This is an optional step, but it speeds up data loading significantly, especially when accessing data from the cloud.

We have documentation on how to start a Dask Cluster in different computing environments here.

#%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb
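
If none of these helper notebooks matches your environment, you can also start a local cluster directly with the LocalCluster and Client classes imported above (a minimal sketch; the worker, thread, and memory settings are illustrative and should be tuned to your machine).

# A minimal local-cluster sketch (not one of the HyTEST helper notebooks).
# The settings below are illustrative; adjust them to your machine.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='4GB')
client = Client(cluster)
client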

Explore the dataset#

ds = cat[dataset].to_dask()
ds
<xarray.Dataset> Size: 3TB
Dimensions:       (y: 1015, x: 1367, time: 15707)
Coordinates:
    lat           (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
    lon           (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
  * time          (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01
  * x             (x) float64 11kB -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y             (y) float64 8kB -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Data variables: (12/38)
    LANDMASK      (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
    Q2MAX         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2MEAN        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2MIN         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2STD         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    RAINCVMAX     (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    ...            ...
    U10MEAN       (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    U10STD        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10MAX        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10MEAN       (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10STD        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    crs           int64 8B ...
Attributes: (12/80)
    BL_PBL_PHYSICS:                  1
    BOTTOM-TOP_GRID_DIMENSION:       51
    BOTTOM-TOP_PATCH_END_STAG:       51
    BOTTOM-TOP_PATCH_END_UNSTAG:     50
    BOTTOM-TOP_PATCH_START_STAG:     1
    BOTTOM-TOP_PATCH_START_UNSTAG:   1
    ...                              ...
    USE_THETA_M:                     0
    WEST-EAST_GRID_DIMENSION:        1368
    WEST-EAST_PATCH_END_STAG:        1368
    WEST-EAST_PATCH_END_UNSTAG:      1367
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
# variables of interest
var = ['SKINTEMPMEAN', 'SKINTEMPMAX', 'SKINTEMPMIN', 'SKINTEMPSTD', 'TSKINTEMPMAX', 'TSKINTEMPMIN']
ds_var = ds[var]
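
If you only need part of the record, you can also subset in time here so that the point extraction below touches less data. A sketch (the period shown is an arbitrary illustrative choice; the rest of this notebook uses the full record):

# optional: restrict to a single water year before extracting point values (illustrative)
# ds_var = ds_var.sel(time=slice('1989-10-01', '1990-09-30'))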

Read in point data and clean#

points_df = hytest_cat['pointsample-tutorial-sites-osn'].read()
print(len(points_df))
points_df.head()
2080
site_id sources longitude latitude
0 Arizona_149 norwest -112.757795 36.304562
1 Arizona_150 norwest -112.632052 36.394490
2 Arizona_167 norwest -111.799499 36.192755
3 Arizona_174 norwest -112.093022 36.099446
4 BSP-1 ecosheds -69.080000 46.130000

Drop rows with null lat, lon#

points_df[points_df.isnull().any(axis=1)]
site_id sources longitude latitude
# drop rows with null lat, lon
points_df.dropna(subset=['longitude', 'latitude'], inplace=True)
print(len(points_df))
2080

Set site_id as index#

#points_df = points_df.set_index(['site_id', 'longitude', 'latitude'])
points_df = points_df.set_index(['site_id'])
print(len(points_df))
2080
points_df.head()
sources longitude latitude
site_id
Arizona_149 norwest -112.757795 36.304562
Arizona_150 norwest -112.632052 36.394490
Arizona_167 norwest -111.799499 36.192755
Arizona_174 norwest -112.093022 36.099446
BSP-1 ecosheds -69.080000 46.130000

Make sure no site ids are duplicated#

points_df[points_df.index.duplicated()]
sources longitude latitude
site_id
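
No duplicates show up here. If any had, one way to keep only the first occurrence of each site_id (a sketch, not needed for this dataset) would be:

# keep only the first row for each duplicated site_id (no-op for this dataset)
# points_df = points_df[~points_df.index.duplicated(keep='first')]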

Transform into xarray dataset#

points_ds = xr.Dataset.from_dataframe(points_df)
points_ds
<xarray.Dataset> Size: 67kB
Dimensions:    (site_id: 2080)
Coordinates:
  * site_id    (site_id) object 17kB 'Arizona_149' ... 'WashingtonCoast_98'
Data variables:
    sources    (site_id) object 17kB 'norwest' 'norwest' ... 'norwest' 'norwest'
    longitude  (site_id) float64 17kB -112.8 -112.6 -111.8 ... -122.2 -122.1
    latitude   (site_id) float64 17kB 36.3 36.39 36.19 36.1 ... 47.8 47.31 47.6

Find data values at point locations#

ds_var.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')
ds_var.xoak.index
[<sklearn.neighbors._ball_tree.BallTree at 0x7f7dc0425250>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc0424db0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc80a9590>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc0353940>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc0354280>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc8092ad0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc0353de0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc8cb47d0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc8098050>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc80a87b0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc02639b0>,
 <sklearn.neighbors._ball_tree.BallTree at 0x7f7dc80a8c50>]
#from dask.diagnostics import ProgressBar
#with ProgressBar(), dask.config.set(scheduler='processes'):
ds_selection = ds_var.xoak.sel(lat=points_ds.latitude, lon=points_ds.longitude)
ds_selection
<xarray.Dataset> Size: 784MB
Dimensions:       (time: 15707, site_id: 2080)
Coordinates:
    lat           (site_id) float32 8kB dask.array<chunksize=(2080,), meta=np.ndarray>
    lon           (site_id) float32 8kB dask.array<chunksize=(2080,), meta=np.ndarray>
  * time          (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01
    x             (site_id) float64 17kB -1.308e+06 -1.296e+06 ... -1.78e+06
    y             (site_id) float64 17kB -1.96e+05 -1.88e+05 ... 1.176e+06
Dimensions without coordinates: site_id
Data variables:
    SKINTEMPMEAN  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPMAX   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPMIN   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPSTD   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    TSKINTEMPMAX  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    TSKINTEMPMIN  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
Attributes: (12/80)
    BL_PBL_PHYSICS:                  1
    BOTTOM-TOP_GRID_DIMENSION:       51
    BOTTOM-TOP_PATCH_END_STAG:       51
    BOTTOM-TOP_PATCH_END_UNSTAG:     50
    BOTTOM-TOP_PATCH_START_STAG:     1
    BOTTOM-TOP_PATCH_START_UNSTAG:   1
    ...                              ...
    USE_THETA_M:                     0
    WEST-EAST_GRID_DIMENSION:        1368
    WEST-EAST_PATCH_END_STAG:        1368
    WEST-EAST_PATCH_END_UNSTAG:      1367
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
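
As a quick sanity check on the nearest-neighbor selection (an addition, not part of the original notebook), we can compute the great-circle distance between each requested point and the grid cell xoak selected for it. On the ~4 km CONUS404 grid, this should be at most a few kilometers for points inside the domain.

import numpy as np

# haversine distance (km) between each site location and the selected CONUS404 grid cell center
lat1, lon1 = np.deg2rad(points_ds.latitude), np.deg2rad(points_ds.longitude)
lat2, lon2 = np.deg2rad(ds_selection.lat), np.deg2rad(ds_selection.lon)
a = np.sin((lat2 - lat1) / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2)**2
dist_km = 2 * 6371.0 * np.arcsin(np.sqrt(a))
print(f'maximum site-to-cell distance: {float(dist_km.max()):.2f} km')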

Join selected data back to the site data (site ID, source, lat, lon)#

ds_selection = xr.merge([points_ds, ds_selection])

Visualize data to verify results#

idx = 300
da = ds_selection.isel(time=idx).load()
df = da.to_dataframe()
df.hvplot.scatter(x='lon', y='lat', c=var[0], colormap='viridis').opts(clim=(260, 300))
da_grid = ds_var[var[0]].isel(time=idx).load()
da_grid.hvplot.quadmesh(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM', cmap='viridis').opts('Image', clim=(260, 300))
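
Another quick check (an addition, not in the original notebook) is to plot the time series at a single site for a short period; the site ID and water year below are illustrative, and the read may take a moment because it touches many chunks.

# illustrative spot check: one site, one water year
ds_selection['SKINTEMPMEAN'].sel(site_id='Arizona_149', time=slice('1979-10-01', '1980-09-30')).hvplot(x='time', title='SKINTEMPMEAN at Arizona_149, WY1980')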

Clean up data for saving#

# drop CONUS404 grid cell lat/lon, x/y values that data were pulled from, keeping only site's lat/lon to reduce confusion
ds_save = ds_selection.drop_vars(["lat", "lon", "x", "y"])
ds_save
<xarray.Dataset> Size: 784MB
Dimensions:       (site_id: 2080, time: 15707)
Coordinates:
  * site_id       (site_id) object 17kB 'Arizona_149' ... 'WashingtonCoast_98'
  * time          (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01
Data variables:
    sources       (site_id) object 17kB 'norwest' 'norwest' ... 'norwest'
    longitude     (site_id) float64 17kB -112.8 -112.6 -111.8 ... -122.2 -122.1
    latitude      (site_id) float64 17kB 36.3 36.39 36.19 ... 47.8 47.31 47.6
    SKINTEMPMEAN  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPMAX   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPMIN   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    SKINTEMPSTD   (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    TSKINTEMPMAX  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
    TSKINTEMPMIN  (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>

Save netcdf to OSN pod#

fs_write = fsspec.filesystem(
    's3',
    profile='osn-hytest',  ## This is the name of the AWS profile with credentials to write to the output bucket
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
fs_write.ls('hytest')
outfile = hytest_cat['pointsample-tutorial-output-osn'].urlpath
local_file = hytest_cat['pointsample-tutorial-output-osn'].urlpath.split('/')[-1]

Uncomment the next two cells to write the netCDF file locally and then upload it to the OSN pod.

# %%time
# ds_save.to_netcdf(local_file)
# %%time
# fs_write.upload(local_file, outfile)
# check that file has been written
fs_write.ls(hytest_cat['pointsample-tutorial-output-osn'].urlpath.split(local_file)[0])
['hytest/tutorials/data_access/daily_skintemp_at_filtered_temperature_sites.nc',
 'hytest/tutorials/data_access/filtered_temperature_sites.csv']
print(f'netcdf file size is {fs_write.size(outfile) / 1048576} MB')
netcdf file size is 122.14681243896484 MB

Read back in the nc data to verify it saved correctly#

with fs_write.open(outfile) as f:
    ds_final = xr.open_dataset(f)
ds_final
<xarray.Dataset> Size: 767MB
Dimensions:       (site_id: 2080, time: 15342)
Coordinates:
  * site_id       (site_id) <U104 865kB 'Arizona_149' ... 'WashingtonCoast_98'
  * time          (time) datetime64[ns] 123kB 1979-10-01 ... 2021-10-01
Data variables:
    sources       (site_id) <U20 166kB ...
    longitude     (site_id) float64 17kB ...
    latitude      (site_id) float64 17kB ...
    SKINTEMPMEAN  (time, site_id) float32 128MB ...
    SKINTEMPMAX   (time, site_id) float32 128MB ...
    SKINTEMPMIN   (time, site_id) float32 128MB ...
    SKINTEMPSTD   (time, site_id) float32 128MB ...
    TSKINTEMPMAX  (time, site_id) float32 128MB ...
    TSKINTEMPMIN  (time, site_id) float32 128MB ...