Pull CONUS404 data at a set of lat/lon point locations.
import fsspec
import xarray as xr
import hvplot.xarray
import intake
import pystac
from packaging.version import Version
import zarr
import os
import cartopy.crs as ccrs
import metpy
import warnings
import pandas as pd
import s3fs
import xoak
import dask
import hvplot.pandas # to add .hvplot to DataFrames
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')Open dataset from WMA STAC Catalog¶
def get_children(catalog, collection_id=None):
"""
This function retrieves a specified collection from a STAC catalog/collection and prints key metadata
for exploring/accessing the datasets contained within it.
If there is no collection ID provided, the collections in the top level of the catalog will be printed.
If a collection ID is provided, it will retrieve the collection with that ID from the input catalog/collection.
If the collection ID points to a dataset, it will print the assets available for the dataset.
If the collection ID points to another collection, it will list the child collections in the IDed collection.
Args:
catalog (pystac.Catalog | pystac.Collection): The STAC catalog/collection object.
collection_id (str): The ID of the collection or dataset to retrieve from catalog.
Returns:
collection (pystac.Catalog | pystac.Collection): The collection object corresponding to the provided ID
or the top-level catalog if no ID is provided.
"""
dataset = False
if collection_id:
collection = catalog.get_child(collection_id)
if collection.assets:
dataset = True
print(f"{collection_id} is a dataset. Please review the assets below and select one to open.")
else:
print(f"{collection_id} is a collection. Please review the child items and select one to open in the next cell.")
else:
collection = catalog
if dataset==True:
# List the assets
for asset in collection.assets:
print(f"Asset ID: {asset}")
print(f" Title: {collection.assets[asset].title}")
print(f" Description: {collection.assets[asset].description}")
else:
collections = list(collection.get_collections())
print(f"Number of collections: {len(collections)}")
print("Collections IDs:")
for child_collection in collections:
id = child_collection.id
cite_as = "Not available"
for link in child_collection.links:
if link.rel == "cite-as":
cite_as = link.target
print(f"- {id}, Source: {cite_as}")
return collection# url for the WMA STAC Catalog
catalog_url = "https://api.water.usgs.gov/gdp/pygeoapi/stac/stac-collection/"
# use pystac to read the catalog
catalog = pystac.Catalog.from_file(catalog_url)
# list the collections in the catalog
catalog = get_children(catalog)# select a collection from the catalog, replace the collection ID with the one you want to use:
collection = get_children(catalog, collection_id="conus404")# select a collection from the catalog, replace the collection ID with the one you want to use:
collection = get_children(collection, collection_id="conus404_xtrm_daily")# replace with the asset ID you want to use:
selected_asset_id = "zarr-s3-osn"
# read the asset metadata
asset = collection.assets[selected_asset_id]Parallelize with Dask¶
Some of the steps we will take are aware of parallel clustered compute environments
using dask. We’re going to start a cluster now so that future steps can take advantage
of this ability.
This is an optional step, but speed ups data loading significantly, especially when accessing data from the cloud.
We have documentation on how to start a Dask Cluster in different computing environments here.
#%run ../../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari, replace the above
## path name with a helper appropriate to your compute environment. Examples:
# %run ../../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../../environment_set_up/Start_Dask_Cluster_Desktop.ipynbExplore the dataset¶
if Version(zarr.__version__) < Version("3.0.0"):
ds = xr.open_dataset(
asset.href,
storage_options=asset.extra_fields['xarray:storage_options'],
**asset.extra_fields['xarray:open_kwargs']
)
else:
ds = xr.open_dataset(
asset.href,
storage_options=asset.extra_fields['xarray:storage_options'],
**asset.extra_fields['xarray:open_kwargs'],
zarr_format=2
)ds# variables of interest
var = ['SKINTEMPMEAN', 'SKINTEMPMAX', 'SKINTEMPMIN', 'SKINTEMPSTD', 'TSKINTEMPMAX', 'TSKINTEMPMIN']ds_var = ds[var]Read in point data and clean¶
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
points_df = hytest_cat['pointsample-tutorial-sites-osn'].read()
print(len(points_df))
points_df.head()Drop rows will null lat, lon¶
points_df[points_df.isnull().any(axis=1)]# drop rows will null lat, lon
points_df.dropna(subset = ['longitude', 'latitude'], inplace=True)
print(len(points_df))Set site_id as index¶
#points_df = points_df.set_index(['site_id', 'longitude', 'latitude'])
points_df = points_df.set_index(['site_id'])
print(len(points_df))points_df.head()Make sure no site ids are duplicated¶
points_df[points_df.index.duplicated()]Transform into xarray dataset¶
points_ds = xr.Dataset.from_dataframe(points_df)points_dsFind data values at point locations¶
First we will use xoak.set_index (docs to set up an index tree that will enable efficient indexing for the lat and lon coordinates in the CONUS404 data subset. We will choose the sklearn_geo_balltree method for indexing, which uses the Haversine distance metric and is a good choice for indexing latitude / longitude points.
ds_var.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')ds_var.xoak.index#from dask.diagnostics import ProgressBar
#with ProgressBar(), dask.config.set(scheduler='processes'):
ds_selection = ds_var.xoak.sel(lat=points_ds.latitude, lon=points_ds.longitude)ds_selectionJoin selected data back to gage data with site ID, source, lat, lon¶
ds_selection = xr.merge([points_ds, ds_selection])Visualize data to verify results¶
idx = 300da = ds_selection.isel(time=idx).load()
df = da.to_dataframe()df.hvplot.scatter(x='lon', y='lat', c=var[0], colormap='viridis').opts(clim=(260, 300))da_grid = ds_var[var[0]].isel(time=idx).load()
da_grid.hvplot.quadmesh(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM', cmap='viridis').opts('Image', clim=(260, 300))Clean up data for saving¶
# drop CONUS404 grid cell lat/lon, x/y values that data were pulled from, keeping only site's lat/lon to reduce confusion
ds_save = ds_selection.drop_vars(["lat", "lon", "x", "y"])ds_saveSave netcdf to OSN pod¶
fs_write = fsspec.filesystem(
's3',
profile='osn-hytest', ## This is the name of the AWS profile with credentials to write to the output bucket
client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)fs_write.ls('hytest')
# get intake catalog entry url (intake catalog entry has already been created) to use to write the file
outfile = hytest_cat['pointsample-tutorial-output-osn']._entry._yaml()['sources']['pointsample-tutorial-output-osn']['args']['urlpath']
local_file = outfile.split('/')[-1]Uncomment next two cells to save
# %%time
# ds_save.to_netcdf(local_file)# %%time
# fs_write.upload(local_file, outfile)# check that file has been written
fs_write.ls(outfile.split(local_file)[0])print(f'netcdf file size is {fs_write.size(outfile) / 1048576} MB')Read back in the nc data to verify it saved correctly¶
with fs_write.open(outfile) as f:
ds_final = xr.open_dataset(f)ds_final