
CONUS404 Site Data Selection#

Pull CONUS404 data at a set of lat/lon point locations.

import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import cartopy.crs as ccrs
import metpy
import warnings
import pandas as pd
import s3fs
import xoak
import dask
import hvplot.pandas # to add .hvplot to DataFrames
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')

Open dataset from Intake Catalog#

  • Select on-prem dataset from /caldera if running on prem (Denali/Tallgrass)

  • Select cloud/osn object store data if running elsewhere

# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)
['conus404-catalog',
 'conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'trends-and-drivers-catalog',
 'nhm-prms-v1.1-gridmet-format-testing-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-osn',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-osn',
 'nwm21-streamflow-cloud',
 'geofabric_v1_1-zip-osn',
 'geofabric_v1_1_POIs_v1_1-osn',
 'geofabric_v1_1_TBtoGFv1_POIs-osn',
 'geofabric_v1_1_nhru_v1_1-osn',
 'geofabric_v1_1_nhru_v1_1_simp-osn',
 'geofabric_v1_1_nsegment_v1_1-osn',
 'gages2_nndar-osn',
 'wbd-zip-osn',
 'huc12-geoparquet-osn',
 'huc12-gpkg-osn',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-osn',
 'pointsample-tutorial-sites-osn',
 'pointsample-tutorial-output-osn']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-diagnostic-osn',
 'conus404-daily-onprem',
 'conus404-daily-cloud',
 'conus404-daily-osn',
 'conus404-monthly-onprem',
 'conus404-monthly-cloud',
 'conus404-monthly-osn',
 'conus404-hourly-ba-osn',
 'conus404-daily-ba-osn']
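Following the guidance above about on-prem vs. cloud/OSN storage, one hedged way to decide which variant to use programmatically is sketched below (the /caldera check is a hypothetical heuristic; adjust it to your own environment).

# hedged sketch: pick the storage variant based on where the notebook is running
# ('/caldera' is only available on the USGS on-prem HPC systems)
on_prem = os.path.exists('/caldera')
suffix = 'onprem' if on_prem else 'osn'
print(f"use the 'conus404-daily-diagnostic-{suffix}' catalog entry")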
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-diagnostic-osn' 
cat[dataset]
conus404-daily-diagnostic-osn:
  args:
    consolidated: true
    storage_options:
      anon: true
      client_kwargs:
        endpoint_url: https://usgs.osn.mghpcc.org/
      requester_pays: false
    urlpath: s3://hytest/conus404/conus404_daily_xtrm.zarr
  description: 'CONUS404 40 years of daily diagnostic output (maximum, minimum, mean,
    and standard deviation) for water vapor (Q2), grid-scale precipitation (RAINNC),
    skin temperature (SKINTEMP), wind speed at 10 meter height (SPDUV10), temperature
    at 2 meter height (T2), and U- and V-component of wind at 10 meters with respect
    to model grid (U10, V10). These files were created wrfxtrm model output files
    (see ScienceBase data release for more details: https://www.sciencebase.gov/catalog/item/6372cd09d34ed907bf6c6ab1).
    You can work with this data for free in any environment (there are no egress fees).'
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

Parallelize with Dask#

Some of the steps we will take can run in parallel on a clustered compute environment using Dask. We are going to start a cluster now so that later steps can take advantage of it.

This is an optional step, but it speeds up data loading significantly, especially when accessing data from the cloud.

We have documentation on how to start a Dask cluster in different computing environments here. If none of those helpers fit your compute environment, see the minimal local-cluster sketch after the cell below.

#%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb
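If none of these helper notebooks match your environment, a minimal local cluster can be started directly with the dask.distributed objects imported above (a sketch; tune the worker and thread counts to your machine):

# minimal local Dask cluster (sketch) -- adjust n_workers/threads_per_worker to your machine
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
client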

Explore the dataset#

ds = cat[dataset].to_dask()
ds
<xarray.Dataset> Size: 3TB
Dimensions:       (y: 1015, x: 1367, time: 15707)
Coordinates:
    lat           (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
    lon           (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
  * time          (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01
  * x             (x) float64 11kB -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y             (y) float64 8kB -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Data variables: (12/38)
    LANDMASK      (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray>
    Q2MAX         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2MEAN        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2MIN         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    Q2STD         (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    RAINCVMAX     (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    ...            ...
    U10MEAN       (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    U10STD        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10MAX        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10MEAN       (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    V10STD        (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray>
    crs           int64 8B ...
Attributes: (12/80)
    BL_PBL_PHYSICS:                  1
    BOTTOM-TOP_GRID_DIMENSION:       51
    BOTTOM-TOP_PATCH_END_STAG:       51
    BOTTOM-TOP_PATCH_END_UNSTAG:     50
    BOTTOM-TOP_PATCH_START_STAG:     1
    BOTTOM-TOP_PATCH_START_UNSTAG:   1
    ...                              ...
    USE_THETA_M:                     0
    WEST-EAST_GRID_DIMENSION:        1368
    WEST-EAST_PATCH_END_STAG:        1368
    WEST-EAST_PATCH_END_UNSTAG:      1367
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
# variables of interest
var = ['SKINTEMPMEAN', 'SKINTEMPMAX', 'SKINTEMPMIN', 'SKINTEMPSTD', 'TSKINTEMPMAX', 'TSKINTEMPMIN']
ds_var = ds[var]
# get projection info
ds = ds.metpy.parse_cf()
crs = ds[var[0]].metpy.cartopy_crs
#crs
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[10], line 3
      1 # get projection info
      2 ds = ds.metpy.parse_cf()
----> 3 crs = ds[var[0]].metpy.cartopy_crs
      4 #crs

File /home/conda/global/774f2bb1-1715626348-8-pangeo/lib/python3.11/site-packages/metpy/xarray.py:266, in MetPyDataArrayAccessor.cartopy_crs(self)
    263 @property
    264 def cartopy_crs(self):
    265     """Return the coordinate reference system (CRS) as a cartopy object."""
--> 266     return self.crs.to_cartopy()

File /home/conda/global/774f2bb1-1715626348-8-pangeo/lib/python3.11/site-packages/metpy/xarray.py:259, in MetPyDataArrayAccessor.crs(self)
    257 if 'metpy_crs' in self._data_array.coords:
    258     return self._data_array.coords['metpy_crs'].item()
--> 259 raise AttributeError('crs attribute is not available. You may need to use the'
    260                      ' `parse_cf` or `assign_crs` methods. Consult the "xarray'
    261                      ' with MetPy Tutorial" for more details.')

AttributeError: crs attribute is not available. You may need to use the `parse_cf` or `assign_crs` methods. Consult the "xarray with MetPy Tutorial" for more details.
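The parse_cf() call did not attach CRS metadata to these variables, which is why cartopy_crs raises the AttributeError above. The projection is not actually needed for the nearest-neighbor matching below (xoak works directly on the lat/lon coordinates), but if you do need it, one hedged workaround is to build the Lambert Conformal projection from the WRF-style global attributes, assuming TRUELAT1, TRUELAT2, STAND_LON, and MOAD_CEN_LAT are present in ds.attrs and that the grid uses WRF's spherical earth of radius 6,370 km:

# hedged workaround: construct the CONUS404 Lambert Conformal projection from
# WRF-style global attributes (assumes these attributes exist in ds.attrs)
crs = ccrs.LambertConformal(
    central_longitude=ds.attrs['STAND_LON'],
    central_latitude=ds.attrs['MOAD_CEN_LAT'],
    standard_parallels=(ds.attrs['TRUELAT1'], ds.attrs['TRUELAT2']),
    globe=ccrs.Globe(ellipse='sphere', semimajor_axis=6370000, semiminor_axis=6370000),
)
crs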

Read in point data and clean#

points_df = hytest_cat['pointsample-tutorial-sites-osn'].read()
print(len(points_df))
points_df.head()

Drop rows with null lat, lon#

points_df[points_df.isnull().any(axis=1)]
# drop rows with null lat, lon
points_df.dropna(subset = ['longitude', 'latitude'], inplace=True)
print(len(points_df))

Set site_id as index#

#points_df = points_df.set_index(['site_id', 'longitude', 'latitude'])
points_df = points_df.set_index(['site_id'])
print(len(points_df))
points_df.head()

Make sure no site ids are duplicated#

points_df[points_df.index.duplicated()]
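If the check above returns any rows, one hedged cleanup is to keep only the first record for each duplicated site_id:

# keep only the first occurrence of any duplicated site_id
# (only needed if the check above found duplicates)
points_df = points_df[~points_df.index.duplicated(keep='first')]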

Transform into xarray dataset#

points_ds = xr.Dataset.from_dataframe(points_df)
points_ds

Find data values at point locations#

ds_var.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')
ds_var.xoak.index
#from dask.diagnostics import ProgressBar
#with ProgressBar(), dask.config.set(scheduler='processes'):
ds_selection = ds_var.xoak.sel(lat=points_ds.latitude, lon=points_ds.longitude)
ds_selection
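The selection above is still lazy (the matched time series are Dask arrays). If you plan to reuse them in several later steps, one option, assuming the Dask cluster started earlier is running, is to persist them in distributed memory:

# optional: keep the matched point time series in cluster memory so later
# plotting/saving steps do not re-read the Zarr chunks (assumes a running cluster)
ds_selection = ds_selection.persist()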

Join selected data back to gage data with site ID, source, lat, lon#

ds_selection = xr.merge([points_ds, ds_selection])

Visualize data to verify results#

idx = 300
da = ds_selection.isel(time=idx).load()
df = da.to_dataframe()
df.hvplot.scatter(x='lon', y='lat', c=var[0], colormap='viridis').opts(clim=(260, 300))
da_grid = ds_var[var[0]].isel(time=idx).load()
da_grid.hvplot.quadmesh(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM', cmap='viridis').opts('Image', clim=(260, 300))

Clean up data for saving#

# drop the CONUS404 grid-cell lat/lon and x/y values that the data were pulled from, keeping only each site's lat/lon to reduce confusion
ds_save = ds_selection.drop_vars(["lat", "lon", "x", "y"])
ds_save

Save netcdf to OSN pod#

fs_write = fsspec.filesystem(
    's3',
    profile='osn-hytest',  ## This is the name of the AWS profile with credentials to write to the output bucket
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
fs_write.ls('hytest')
outfile = hytest_cat['pointsample-tutorial-output-osn'].urlpath
local_file = hytest_cat['pointsample-tutorial-output-osn'].urlpath.split('/')[-1]

Uncomment the next two cells to write the output to a local netCDF file and upload it to the OSN pod

# %%time
# ds_save.to_netcdf(local_file)
# %%time
# fs_write.upload(local_file, outfile)
# check that file has been written
fs_write.ls(hytest_cat['pointsample-tutorial-output-osn'].urlpath.split(local_file)[0])
print(f'netcdf file size is {fs_write.size(outfile) / 1048576} MB')

Read back in the nc data to verify it saved correctly#

# load the data into memory while the file object is still open
with fs_write.open(outfile) as f:
    ds_final = xr.open_dataset(f).load()
ds_final
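As a hedged spot-check that the round trip preserved the values, you can compare one variable at a single time step between what was saved and what was read back (this triggers a small computation):

# hedged spot-check: compare one variable/time step before and after the round trip
xr.testing.assert_allclose(ds_save[var[0]].isel(time=0), ds_final[var[0]].isel(time=0))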