# CONUS404 Site Data Selection
Pull CONUS404 data at a set of lat/lon point locations.


In [None]:
import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import cartopy.crs as ccrs
import metpy
import warnings
import pandas as pd
import s3fs
import xoak
import dask
import hvplot.pandas # to add .hvplot to DataFrames
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')

## Open dataset from Intake Catalog
* Select `on-prem` dataset from /caldera if running on prem (Denali/Tallgrass)
* Select `cloud`/`osn` object store data if running elsewhere

In [None]:
# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)

In [None]:
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)

In [None]:
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-daily-diagnostic-osn' 
cat[dataset]

## Parallelize with Dask 
Some of the steps we will take are aware of parallel clustered compute environments
using `dask`. We're going to start a cluster now so that future steps can take advantage
of this ability. 

This is an optional step, but speed ups data loading significantly, especially 
when accessing data from the cloud.

We have documentation on how to start a Dask Cluster in different computing environments [here](../environment_set_up/clusters.md).

In [None]:
#%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb

## Explore the dataset

In [None]:
ds = cat[dataset].to_dask()

In [None]:
ds

In [None]:
# variables of interest
var = ['SKINTEMPMEAN', 'SKINTEMPMAX', 'SKINTEMPMIN', 'SKINTEMPSTD', 'TSKINTEMPMAX', 'TSKINTEMPMIN']

In [None]:
ds_var = ds[var]

In [None]:
# get projection info
ds = ds.metpy.parse_cf()
crs = ds[var[0]].metpy.cartopy_crs
#crs

## Read in point data and clean

In [None]:
points_df = hytest_cat['pointsample-tutorial-sites-osn'].read()
print(len(points_df))
points_df.head()

### Drop rows will null lat, lon

In [None]:
points_df[points_df.isnull().any(axis=1)]

In [None]:
# drop rows will null lat, lon
points_df.dropna(subset = ['longitude', 'latitude'], inplace=True)
print(len(points_df))

### Set site_id as index

In [None]:
#points_df = points_df.set_index(['site_id', 'longitude', 'latitude'])
points_df = points_df.set_index(['site_id'])
print(len(points_df))

In [None]:
points_df.head()

### Make sure no site ids are duplicated

In [None]:
points_df[points_df.index.duplicated()]

### Transform into xarray dataset

In [None]:
points_ds = xr.Dataset.from_dataframe(points_df)

In [None]:
points_ds

## Find data values at point locations

In [None]:
ds_var.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')

In [None]:
ds_var.xoak.index

In [None]:
#from dask.diagnostics import ProgressBar
#with ProgressBar(), dask.config.set(scheduler='processes'):
ds_selection = ds_var.xoak.sel(lat=points_ds.latitude, lon=points_ds.longitude)

In [None]:
ds_selection

## Join selected data back to gage data with site ID, source, lat, lon

In [None]:
ds_selection = xr.merge([points_ds, ds_selection])

## Visualize data to verify results

In [None]:
idx = 300

In [None]:
da = ds_selection.isel(time=idx).load()
df = da.to_dataframe()

In [None]:
df.hvplot.scatter(x='lon', y='lat', c=var[0], colormap='viridis').opts(clim=(260, 300))

In [None]:
da_grid = ds_var[var[0]].isel(time=idx).load()
da_grid.hvplot.quadmesh(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM', cmap='viridis').opts('Image', clim=(260, 300))

## Clean up data for saving

In [None]:
# drop CONUS404 grid cell lat/lon, x/y values that data were pulled from, keeping only site's lat/lon to reduce confusion
ds_save = ds_selection.drop_vars(["lat", "lon", "x", "y"])

In [None]:
ds_save

## Save netcdf to OSN pod

In [None]:
fs_write = fsspec.filesystem(
    's3',
    profile='osn-hytest',  ## This is the name of the AWS profile with credentials to write to the output bucket
    client_kwargs={'endpoint_url': 'https://usgs.osn.mghpcc.org/'}
)

In [None]:
fs_write.ls('hytest')
outfile = hytest_cat['pointsample-tutorial-output-osn'].urlpath
local_file = hytest_cat['pointsample-tutorial-output-osn'].urlpath.split('/')[-1]

Uncomment next two cells to save

In [None]:
# %%time
# ds_save.to_netcdf(local_file)

In [None]:
%%time
fs_write.upload(local_file, outfile)

In [None]:
# check that file has been written
fs_write.ls(hytest_cat['pointsample-tutorial-output-osn'].urlpath.split(local_file)[0])

In [None]:
print(f'netcdf file size is {fs_write.size(outfile) / 1048576} MB')

### Read back in the nc data to verify it saved correctly

In [None]:
with fs_write.open(outfile) as f:
    ds_final = xr.open_dataset(f)

In [None]:
ds_final