CONUS404 Site Data Selection#
Pull CONUS404 data at a set of lat/lon point locations.
import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import cartopy.crs as ccrs
import metpy
import warnings
import pandas as pd
import s3fs
import xoak
import dask
import hvplot.pandas # to add .hvplot to DataFrames
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')
/home/conda/hytest/738ff346-1752093942-203-pangeo/lib/python3.11/site-packages/xoak/__init__.py:1: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
from pkg_resources import DistributionNotFound, get_distribution
Open dataset from Intake Catalog#
Select
on-prem
dataset from /caldera if running on prem (Denali/Tallgrass)Select
cloud
/osn
object store data if running elsewhere
# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)
['conus404-catalog',
'benchmarks-catalog',
'conus404-drb-eval-tutorial-catalog',
'nhm-v1.0-daymet-catalog',
'nhm-v1.1-c404-bc-catalog',
'nhm-v1.1-gridmet-catalog',
'trends-and-drivers-catalog',
'nhm-prms-v1.1-gridmet-format-testing-catalog',
'nwis-streamflow-usgs-gages-onprem',
'nwis-streamflow-usgs-gages-osn',
'nwm21-streamflow-usgs-gages-onprem',
'nwm21-streamflow-usgs-gages-osn',
'nwm21-streamflow-s3',
'geofabric_v1_1-zip-osn',
'geofabric_v1_1_POIs_v1_1-osn',
'geofabric_v1_1_TBtoGFv1_POIs-osn',
'geofabric_v1_1_nhru_v1_1-osn',
'geofabric_v1_1_nhru_v1_1_simp-osn',
'geofabric_v1_1_nsegment_v1_1-osn',
'gages2_nndar-osn',
'wbd-zip-osn',
'huc12-geoparquet-osn',
'huc12-gpkg-osn',
'nwm21-scores',
'lcmap-s3',
'rechunking-tutorial-osn',
'pointsample-tutorial-sites-osn',
'pointsample-tutorial-output-osn']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem-hw',
'conus404-hourly-osn',
'conus404-daily-diagnostic-onprem-hw',
'conus404-daily-diagnostic-osn',
'conus404-daily-onprem-hw',
'conus404-daily-osn',
'conus404-monthly-onprem-hw',
'conus404-monthly-osn',
'conus404-hourly-ba-onprem-hw',
'conus404-hourly-ba-osn',
'conus404-daily-ba-onprem-hw',
'conus404-daily-ba-osn',
'conus404-pgw-hourly-onprem-hw',
'conus404-pgw-hourly-osn',
'conus404-pgw-daily-diagnostic-onprem-hw',
'conus404-pgw-daily-diagnostic-osn',
'conus404-pgw-daily-onprem-hw',
'conus404-pgw-daily-osn']
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-diagnostic-osn'
cat[dataset]
<intake_xarray.xzarr.ZarrSource at 0x7f105a1366d0>
Parallelize with Dask#
Some of the steps we will take are aware of parallel clustered compute environments
using dask
. We’re going to start a cluster now so that future steps can take advantage
of this ability.
This is an optional step, but speed ups data loading significantly, especially when accessing data from the cloud.
We have documentation on how to start a Dask Cluster in different computing environments here.
#%run ../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari, replace the above
## path name with a helper appropriate to your compute environment. Examples:
# %run ../../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
Explore the dataset#
ds = cat[dataset].to_dask()
ds
<xarray.Dataset> Size: 3TB Dimensions: (y: 1015, x: 1367, time: 15707) Coordinates: lat (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray> lon (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray> * time (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01 * x (x) float64 11kB -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06 * y (y) float64 8kB -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06 Data variables: (12/38) LANDMASK (y, x) float32 6MB dask.array<chunksize=(350, 350), meta=np.ndarray> Q2MAX (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> Q2MEAN (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> Q2MIN (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> Q2STD (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> RAINCVMAX (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> ... ... U10MEAN (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> U10STD (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> V10MAX (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> V10MEAN (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> V10STD (time, y, x) float32 87GB dask.array<chunksize=(24, 350, 350), meta=np.ndarray> crs int64 8B ... Attributes: (12/80) BL_PBL_PHYSICS: 1 BOTTOM-TOP_GRID_DIMENSION: 51 BOTTOM-TOP_PATCH_END_STAG: 51 BOTTOM-TOP_PATCH_END_UNSTAG: 50 BOTTOM-TOP_PATCH_START_STAG: 1 BOTTOM-TOP_PATCH_START_UNSTAG: 1 ... ... USE_THETA_M: 0 WEST-EAST_GRID_DIMENSION: 1368 WEST-EAST_PATCH_END_STAG: 1368 WEST-EAST_PATCH_END_UNSTAG: 1367 WEST-EAST_PATCH_START_STAG: 1 WEST-EAST_PATCH_START_UNSTAG: 1
# variables of interest
var = ['SKINTEMPMEAN', 'SKINTEMPMAX', 'SKINTEMPMIN', 'SKINTEMPSTD', 'TSKINTEMPMAX', 'TSKINTEMPMIN']
ds_var = ds[var]
Read in point data and clean#
points_df = hytest_cat['pointsample-tutorial-sites-osn'].read()
print(len(points_df))
points_df.head()
2080
site_id | sources | longitude | latitude | |
---|---|---|---|---|
0 | Arizona_149 | norwest | -112.757795 | 36.304562 |
1 | Arizona_150 | norwest | -112.632052 | 36.394490 |
2 | Arizona_167 | norwest | -111.799499 | 36.192755 |
3 | Arizona_174 | norwest | -112.093022 | 36.099446 |
4 | BSP-1 | ecosheds | -69.080000 | 46.130000 |
Drop rows will null lat, lon#
points_df[points_df.isnull().any(axis=1)]
site_id | sources | longitude | latitude |
---|
# drop rows will null lat, lon
points_df.dropna(subset = ['longitude', 'latitude'], inplace=True)
print(len(points_df))
2080
Set site_id as index#
#points_df = points_df.set_index(['site_id', 'longitude', 'latitude'])
points_df = points_df.set_index(['site_id'])
print(len(points_df))
2080
points_df.head()
sources | longitude | latitude | |
---|---|---|---|
site_id | |||
Arizona_149 | norwest | -112.757795 | 36.304562 |
Arizona_150 | norwest | -112.632052 | 36.394490 |
Arizona_167 | norwest | -111.799499 | 36.192755 |
Arizona_174 | norwest | -112.093022 | 36.099446 |
BSP-1 | ecosheds | -69.080000 | 46.130000 |
Make sure no site ids are duplicated#
points_df[points_df.index.duplicated()]
sources | longitude | latitude | |
---|---|---|---|
site_id |
Transform into xarray dataset#
points_ds = xr.Dataset.from_dataframe(points_df)
points_ds
<xarray.Dataset> Size: 67kB Dimensions: (site_id: 2080) Coordinates: * site_id (site_id) object 17kB 'Arizona_149' ... 'WashingtonCoast_98' Data variables: sources (site_id) object 17kB 'norwest' 'norwest' ... 'norwest' 'norwest' longitude (site_id) float64 17kB -112.8 -112.6 -111.8 ... -122.2 -122.1 latitude (site_id) float64 17kB 36.3 36.39 36.19 36.1 ... 47.8 47.31 47.6
Find data values at point locations#
First we will use xoak.set_index
(docs to set up an index tree that will enable efficient indexing for the lat and lon coordinates in the CONUS404 data subset. We will choose the sklearn_geo_balltree
method for indexing, which uses the Haversine distance metric and is a good choice for indexing latitude / longitude points.
ds_var.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')
ds_var.xoak.index
[<sklearn.neighbors._ball_tree.BallTree at 0x7f10400a87a0>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c0d14a0>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c0d1b50>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f1040513fa0>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c2cd970>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c0d0660>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c2cdea0>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c2cd380>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f104040bed0>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f103c0d0e20>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f104040c420>,
<sklearn.neighbors._ball_tree.BallTree at 0x7f10400bea30>]
#from dask.diagnostics import ProgressBar
#with ProgressBar(), dask.config.set(scheduler='processes'):
ds_selection = ds_var.xoak.sel(lat=points_ds.latitude, lon=points_ds.longitude)
ds_selection
<xarray.Dataset> Size: 784MB Dimensions: (time: 15707, site_id: 2080) Coordinates: lat (site_id) float32 8kB dask.array<chunksize=(2080,), meta=np.ndarray> lon (site_id) float32 8kB dask.array<chunksize=(2080,), meta=np.ndarray> * time (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01 x (site_id) float64 17kB -1.308e+06 -1.296e+06 ... -1.78e+06 y (site_id) float64 17kB -1.96e+05 -1.88e+05 ... 1.176e+06 Dimensions without coordinates: site_id Data variables: SKINTEMPMEAN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPMAX (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPMIN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPSTD (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> TSKINTEMPMAX (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> TSKINTEMPMIN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> Attributes: (12/80) BL_PBL_PHYSICS: 1 BOTTOM-TOP_GRID_DIMENSION: 51 BOTTOM-TOP_PATCH_END_STAG: 51 BOTTOM-TOP_PATCH_END_UNSTAG: 50 BOTTOM-TOP_PATCH_START_STAG: 1 BOTTOM-TOP_PATCH_START_UNSTAG: 1 ... ... USE_THETA_M: 0 WEST-EAST_GRID_DIMENSION: 1368 WEST-EAST_PATCH_END_STAG: 1368 WEST-EAST_PATCH_END_UNSTAG: 1367 WEST-EAST_PATCH_START_STAG: 1 WEST-EAST_PATCH_START_UNSTAG: 1
Join selected data back to gage data with site ID, source, lat, lon#
ds_selection = xr.merge([points_ds, ds_selection])
Visualize data to verify results#
idx = 300
da = ds_selection.isel(time=idx).load()
df = da.to_dataframe()
df.hvplot.scatter(x='lon', y='lat', c=var[0], colormap='viridis').opts(clim=(260, 300))
da_grid = ds_var[var[0]].isel(time=idx).load()
da_grid.hvplot.quadmesh(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM', cmap='viridis').opts('Image', clim=(260, 300))
Clean up data for saving#
# drop CONUS404 grid cell lat/lon, x/y values that data were pulled from, keeping only site's lat/lon to reduce confusion
ds_save = ds_selection.drop_vars(["lat", "lon", "x", "y"])
ds_save
<xarray.Dataset> Size: 784MB Dimensions: (site_id: 2080, time: 15707) Coordinates: * site_id (site_id) object 17kB 'Arizona_149' ... 'WashingtonCoast_98' * time (time) datetime64[ns] 126kB 1979-10-01 ... 2022-10-01 Data variables: sources (site_id) object 17kB 'norwest' 'norwest' ... 'norwest' longitude (site_id) float64 17kB -112.8 -112.6 -111.8 ... -122.2 -122.1 latitude (site_id) float64 17kB 36.3 36.39 36.19 ... 47.8 47.31 47.6 SKINTEMPMEAN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPMAX (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPMIN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> SKINTEMPSTD (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> TSKINTEMPMAX (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray> TSKINTEMPMIN (time, site_id) float32 131MB dask.array<chunksize=(24, 2080), meta=np.ndarray>
Save netcdf to OSN pod#
fs_write = fsspec.filesystem(
's3',
profile='osn-hytest', ## This is the name of the AWS profile with credentials to write to the output bucket
client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)
fs_write.ls('hytest')
# get intake catalog entry url (intake catalog entry has already been created) to use to write the file
outfile = hytest_cat['pointsample-tutorial-output-osn']._entry._yaml()['sources']['pointsample-tutorial-output-osn']['args']['urlpath']
local_file = outfile.split('/')[-1]
Uncomment next two cells to save
# %%time
# ds_save.to_netcdf(local_file)
# %%time
# fs_write.upload(local_file, outfile)
# check that file has been written
fs_write.ls(outfile.split(local_file)[0])
['hytest/tutorials/data_access/daily_skintemp_at_filtered_temperature_sites.nc',
'hytest/tutorials/data_access/filtered_temperature_sites.csv']
print(f'netcdf file size is {fs_write.size(outfile) / 1048576} MB')
netcdf file size is 122.14681243896484 MB
Read back in the nc data to verify it saved correctly#
with fs_write.open(outfile) as f:
ds_final = xr.open_dataset(f)
ds_final
<xarray.Dataset> Size: 767MB Dimensions: (site_id: 2080, time: 15342) Coordinates: * site_id (site_id) <U104 865kB 'Arizona_149' ... 'WashingtonCoast_98' * time (time) datetime64[ns] 123kB 1979-10-01 ... 2021-10-01 Data variables: sources (site_id) <U20 166kB ... longitude (site_id) float64 17kB ... latitude (site_id) float64 17kB ... SKINTEMPMEAN (time, site_id) float32 128MB ... SKINTEMPMAX (time, site_id) float32 128MB ... SKINTEMPMIN (time, site_id) float32 128MB ... SKINTEMPSTD (time, site_id) float32 128MB ... TSKINTEMPMAX (time, site_id) float32 128MB ... TSKINTEMPMIN (time, site_id) float32 128MB ...