3.3 SNOTEL#

Having explored the details of SNOTEL and its significance, let’s dive into the exciting journey of collecting and processing the data.

Characteristics of SNOTEL Data#

Product/Data Type: SNOTEL Station Daily Data

Spatial Resolution: Point data for each SNOTEL station location in the western USA, within the bounding box southwest_lon = -125.0, southwest_lat = 25.0, northeast_lon = -100.0, northeast_lat = 49.0

Temporal Resolution: Daily

Data Format: CSV

This script automates the collection of Snow Water Equivalent (SWE) data from SNOTEL stations, filters it based on geographic criteria, and saves it into CSV files. By the end of this process, we’ll have a valuable dataset, ready to provide insights into SWE, snow depth, and temperature trends in the Western United States.

import math
import json
import requests
import pandas as pd
import csv
import io
import os
import dask
import dask.dataframe as dd

Common Python libraries for mathematical operations, data handling, HTTP requests, CSV operations, file handling, and parallel computing with Dask.

# homedir = os.path.expanduser('~')
working_dir = "../data/snotel_training_data"
southwest_lon = -125.0
southwest_lat = 25.0
northeast_lon = -100.0
northeast_lat = 49.0
# For demonstration purposes, we will use the year 2022 as the time period
train_start_date = "2022-01-03"
train_end_date = "2022-12-31"

We’ve defined our geographic criteria using latitude and longitude bounds, set the start and end dates for this demonstration, and specified the directory where all outputs will be saved.
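The working directory is assumed to exist before any files are written into it. If you are starting from a clean checkout, a one-line sketch (using the os module imported above) creates it:

# Create the working directory if it does not exist yet (no-op if it already does)
os.makedirs(working_dir, exist_ok=True)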

3.3.1 Download Snotel Station Data#

The code below downloads station metadata from the USDA AWDB web API if it doesn’t already exist locally, and saves the response as a JSON file.

output_json_file = f'{working_dir}/all_snotel_cdec_stations.json'
if not os.path.exists(output_json_file):
    # Fetch data from the URL
    response = requests.get("https://wcc.sc.egov.usda.gov/awdbRestApi/services/v1/stations?activeOnly=true&returnForecastPointMetadata=false&returnReservoirMetadata=false&returnStationElements=false")
    

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Decode the JSON content
        json_content = response.json()

        # Save the JSON content to a file
        with open(output_json_file, 'w') as json_file:
            json.dump(json_content, json_file, indent=2)

        print(f"Data downloaded and saved to {output_json_file}")
    else:
        print(f"Failed to download data. Status code: {response.status_code}")
else:
    print(f"The file {output_json_file} already exists.")
Data downloaded and saved to ../data/snotel_training_data/all_snotel_cdec_stations.json

3.3.2 Convert JSON Data to CSV#

# read the json file and convert it to csv
csv_file_path = f'{working_dir}/all_snotel_cdec_stations.csv'
if not os.path.exists(csv_file_path):
    # Read the JSON file
    with open(output_json_file, 'r') as json_file:
        json_content = json.load(json_file)

    # Check the content (print or analyze as needed)
    #print("JSON Content:")
    #print(json.dumps(json_content, indent=2))

    # Convert JSON data to a list of dictionaries (assuming JSON is a list of objects)
    data_list = json_content if isinstance(json_content, list) else [json_content]

    # Get the header from the keys of the first dictionary (assuming consistent structure)
    header = data_list[0].keys()
    # Write to CSV file
    with open(csv_file_path, 'w', newline='') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=header)
        csv_writer.writeheader()
        csv_writer.writerows(data_list)

    print(f"Data converted and saved to {csv_file_path}")

else:
    print(f"The csv all snotel/cdec stations exists.")
Data converted and saved to ../data/snotel_training_data/all_snotel_cdec_stations.csv

The above code converts the downloaded SNOTEL station JSON data to CSV and saves the file.
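As an aside, the same conversion could be sketched in two lines with pandas, assuming the JSON payload is a flat list of station records; this is an alternative, not what the notebook uses:

# Alternative sketch: let pandas load the list of station records and write the CSV
stations_df = pd.read_json(output_json_file)
stations_df.to_csv(csv_file_path, index=False)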

3.3.3 Filter Active Snotel Stations in the Western U.S. and Save as CSV#

active_csv_file_path = f'{working_dir}/all_snotel_cdec_stations_active_in_westus.csv'
if not os.path.exists(active_csv_file_path):
    all_df = pd.read_csv(csv_file_path)
    print(all_df.head())
    all_df['endDate'] = pd.to_datetime(all_df['endDate'])
    print(all_df.shape)
    end_date = pd.to_datetime('2050-01-01')
    filtered_df = all_df[all_df['endDate'] > end_date]
    
    # Filter rows within the latitude and longitude ranges
    filtered_df = filtered_df[
        (filtered_df['latitude'] >= southwest_lat) & (filtered_df['latitude'] <= northeast_lat) &
        (filtered_df['longitude'] >= southwest_lon) & (filtered_df['longitude'] <= northeast_lon)
    ]

    # Print the original and filtered DataFrames
    print("Filtered DataFrame:")
    print(filtered_df.shape)
    filtered_df.to_csv(active_csv_file_path, index=False)
else:
    print(f"The active csv already exists: {active_csv_file_path}")
  stationTriplet stationId stateCode networkCode                    name  \
0   2057:AL:SCAN      2057        AL        SCAN                AAMU-JTG   
1    ABY:CA:SNOW       ABY        CA        SNOW                   Abbey   
2   0010:ID:COOP      0010        ID        COOP  Aberdeen Experimnt Stn   
3  1F01A:BC:SNOW     1F01A        BC        SNOW           Aberdeen Lake   
4   0041:NM:COOP      0041        NM        COOP             Abiquiu Dam   

  dcoCode  countyName           huc  elevation  latitude  longitude  \
0      GC     Madison  6.030002e+10      860.0  34.78333  -86.55000   
1      UN      Plumas  1.802012e+11     5650.0  39.95500 -120.53800   
2      ID     Bingham  1.704021e+11     4410.0  42.95000 -112.83333   
3      OR     UNKNOWN           NaN     4298.0  50.14733 -119.05340   
4      UN  Rio Arriba  1.302010e+11     6380.0  36.23333 -106.43333   

   dataTimeZone pedonCode shefId         beginDate           endDate  
0          -6.0     27979  AAMA1  2002-02-23 00:00  2100-01-01 00:00  
1           NaN       NaN    NaN  1963-02-01 00:00  2100-01-01 00:00  
2           NaN       NaN  ABDI1  1914-01-01 00:00  2100-01-01 00:00  
3           NaN       NaN  ABLQ2  1939-04-01 00:00  2100-01-01 00:00  
4           NaN       NaN  ABIN5  1957-01-01 00:00  2100-01-01 00:00  
(4414, 16)
Filtered DataFrame:
(3674, 16)

The above code keeps only the SNOTEL stations that fall inside the specified western-U.S. bounding box and that are still active (endDate later than 2050-01-01). The result is saved to all_snotel_cdec_stations_active_in_westus.csv, which will be used later.
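A quick sanity check, not part of the original workflow, can confirm that every remaining station lies inside the bounding box and summarize how the stations are distributed by state:

# Optional sanity check on the filtered station list
check_df = pd.read_csv(active_csv_file_path)
assert check_df['latitude'].between(southwest_lat, northeast_lat).all()
assert check_df['longitude'].between(southwest_lon, northeast_lon).all()
print(check_df['stateCode'].value_counts().head())  # stations per state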

3.3.4 Retrieve Snow Water Equivalent (SWE) from Snotel Stations#

def remove_commented_lines(text):
    # Split on any line ending so this also works when the response uses '\n'
    # while os.linesep is '\r\n' (e.g. on Windows)
    lines = text.splitlines()
    cleaned_lines = []
    for line in lines:
        # Drop the metadata/header lines that the report generator prefixes with '#'
        if not line.startswith('#'):
            cleaned_lines.append(line)
    cleaned_text = '\n'.join(cleaned_lines)
    return cleaned_text

Removes lines starting with # (the report’s metadata header) from the response text.
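For example, applied to a small made-up report snippet (the real SNOTEL reports prefix their metadata with #), only the data lines survive:

sample = "# Report generated ...\n# Station: Example\nDate,Snow Water Equivalent (in) Start of Day Values\n2022-01-03,4.2"
print(remove_commented_lines(sample))
# Date,Snow Water Equivalent (in) Start of Day Values
# 2022-01-03,4.2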

new_base_station_list_file = f"{working_dir}/all_snotel_cdec_stations_active_in_westus.csv"
new_base_df = pd.read_csv(new_base_station_list_file)
print(new_base_df.head())

csv_file = f'{new_base_station_list_file}_swe_restored_dask_all_vars.csv'
start_date = train_start_date
end_date = train_end_date

# Create an empty Pandas DataFrame with the desired columns
result_df = pd.DataFrame(columns=[
    'station_name',
    'date',
    'lat',
    'lon',
    'swe_value',
    'change_in_swe_inch',
    'snow_depth',
    'change_in_snow_depth_inch',
    'air_temperature_observed_f'
])
    stationTriplet stationId stateCode networkCode                    name  \
0      ABY:CA:SNOW       ABY        CA        SNOW                   Abbey   
1     0010:ID:COOP      0010        ID        COOP  Aberdeen Experimnt Stn   
2     0041:NM:COOP      0041        NM        COOP             Abiquiu Dam   
3  08108010:NM:BOR  08108010        NM         BOR       Abiquiu Reservoir   
4    13E19:ID:SNOW     13E19        ID        SNOW           Above Gilmore   

  dcoCode  countyName           huc  elevation  latitude  longitude  \
0      UN      Plumas  1.802012e+11     5650.0  39.95500 -120.53800   
1      ID     Bingham  1.704021e+11     4410.0  42.95000 -112.83333   
2      UN  Rio Arriba  1.302010e+11     6380.0  36.23333 -106.43333   
3      CO  Rio Arriba  1.302010e+11     6180.0  36.23700 -106.42912   
4      ID       Lemhi  1.706020e+11     8289.0  44.45615 -113.30097   

   dataTimeZone  pedonCode shefId         beginDate     endDate  
0           NaN        NaN    NaN  1963-02-01 00:00  2100-01-01  
1           NaN        NaN  ABDI1  1914-01-01 00:00  2100-01-01  
2           NaN        NaN  ABIN5  1957-01-01 00:00  2100-01-01  
3           NaN        NaN    NaN  1964-09-01 00:00  2100-01-01  
4           NaN        NaN  ABGI1  1961-01-01 00:00  2100-01-01  

Reads a CSV file containing active station data for the Western U.S. into a DataFrame and initializes an empty DataFrame with specified columns to store results related to snow water equivalent (SWE) and other variables.

3.3.5 Process Data using Dask#

Dask is a flexible library for parallel and distributed computing in Python, designed to scale from a single machine to a cluster. The @dask.delayed decorator marks the process_station function for lazy execution: calling it does not run the function immediately but instead records a task in a graph that can later be executed in parallel.

By parallelizing the processing of each station, Dask can utilize multiple CPU cores or even multiple machines, leading to faster execution times.
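To make the execution model concrete, here is a tiny self-contained sketch (unrelated to SNOTEL) of how @dask.delayed defers work and dask.compute then runs the recorded tasks, potentially in parallel:

import dask

@dask.delayed
def square(x):
    return x * x

# Nothing runs yet; each call only records a task in the graph
tasks = [square(i) for i in range(4)]

# compute() walks the graph and executes the tasks
results = dask.compute(*tasks)
print(results)  # (0, 1, 4, 9)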

# Function to process each station
@dask.delayed
def process_station(station):
    location_name = station['name']
    location_triplet = station['stationTriplet']
    location_elevation = station['elevation']
    location_station_lat = station['latitude']
    location_station_long = station['longitude']

    url = f"https://wcc.sc.egov.usda.gov/reportGenerator/view_csv/customSingleStationReport/daily/{location_triplet}%7Cid%3D%22%22%7Cname/{start_date},{end_date}%2C0/WTEQ%3A%3Avalue%2CWTEQ%3A%3Adelta%2CSNWD%3A%3Avalue%2CSNWD%3A%3Adelta%2CTOBS%3A%3Avalue"

    r = requests.get(url)
    text = remove_commented_lines(r.text)
    reader = csv.DictReader(io.StringIO(text))
    json_data = json.loads(json.dumps(list(reader)))

    entries = []
    
    for entry in json_data:
        try:
            # {'Date': '2021-06-18', 'Snow Water Equivalent (in) Start of Day Values': '', 'Change In Snow Water Equivalent (in)': '', 'Snow Depth (in) Start of Day Values': '', 'Change In Snow Depth (in)': '', 'Air Temperature Observed (degF) Start of Day Values': '70.5'}
            required_data = {
                'station_name': location_name,
                'date': entry.get('Date', None),
                'lat': location_station_lat,
                'lon': location_station_long,
                'swe_value': entry.get('Snow Water Equivalent (in) Start of Day Values', None),
                'change_in_swe_inch': entry.get('Change In Snow Water Equivalent (in)', None),
                'snow_depth': entry.get('Snow Depth (in) Start of Day Values', None),
                'change_in_snow_depth_inch': entry.get('Change In Snow Depth (in)', None),
                'air_temperature_observed_f': entry.get('Air Temperature Observed (degF) Start of Day Values', None)
            }
            entries.append(required_data)
        except Exception as e:
            print("entry = ", entry)
            raise e
    return pd.DataFrame(entries)
location_name = station['name']
location_triplet = station['stationTriplet']
location_elevation = station['elevation']
location_station_lat = station['latitude']
location_station_long = station['longitude']

Extracts various pieces of information about the station from the station dictionary.

url = f"https://wcc.sc.egov.usda.gov/reportGenerator/view_csv/customSingleStationReport/daily/{location_triplet}%7Cid%3D%22%22%7Cname/{start_date},{end_date}%2C0/WTEQ%3A%3Avalue%2CWTEQ%3A%3Adelta%2CSNWD%3A%3Avalue%2CSNWD%3A%3Adelta%2CTOBS%3A%3Avalue"

Constructs the request URL for the station’s daily report using its stationTriplet and the specified date range (start_date and end_date).
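The percent-encoded pieces are just URL-escaped characters (%7C is |, %3A is :, %2C is a comma, %22 is a double quote). Decoded, the path asks the report generator for the daily elements WTEQ::value, WTEQ::delta, SNWD::value, SNWD::delta, and TOBS::value, i.e. snow water equivalent, its daily change, snow depth, its daily change, and observed air temperature. A small sketch with the standard library makes the element list readable:

from urllib.parse import unquote

encoded_elements = "WTEQ%3A%3Avalue%2CWTEQ%3A%3Adelta%2CSNWD%3A%3Avalue%2CSNWD%3A%3Adelta%2CTOBS%3A%3Avalue"
print(unquote(encoded_elements))
# WTEQ::value,WTEQ::delta,SNWD::value,SNWD::delta,TOBS::value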

  • Sends a GET request to the constructed URL.

  • Removes commented lines from the response text.

  • Parses the CSV data into a list of dictionaries (json_data)

  • Initializes an empty list to store the processed data entries.
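As a side note, the comment stripping plus CSV parsing could also be sketched with pandas, which can skip '#'-prefixed lines on its own; this is an alternative, not what the function above does:

# Alternative sketch inside process_station: parse the report directly with pandas,
# letting it ignore the '#'-prefixed metadata lines
report_df = pd.read_csv(io.StringIO(r.text), comment='#')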

for entry in json_data:
    try:
        required_data = {
            'station_name': location_name,
            'date': entry.get('Date', None),
            'lat': location_station_lat, 
            'lon': location_station_long,
            'swe_value': entry.get('Snow Water Equivalent (in) Start of Day Values', None),
            'change_in_swe_inch': entry.get('Change In Snow Water Equivalent (in)', None),
            'snow_depth': entry.get('Snow Depth (in) Start of Day Values', None),
            'change_in_snow_depth_inch': entry.get('Change In Snow Depth (in)', None),
            'air_temperature_observed_f': entry.get('Air Temperature Observed (degF) Start of Day Values', None)
        }
        entries.append(required_data)
    except Exception as e:
        print("entry = ", entry)
        raise e

For each entry in the JSON data:

  • Extracts the required fields and constructs a dictionary (required_data).

  • Appends the dictionary to the entries list.

Finally, the list of dictionaries (entries) is converted into a Pandas DataFrame and returned.
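Before launching every station, it can be helpful to test the delayed function on a single one; a minimal sketch, assuming new_base_df has at least one row:

# Build one delayed task for the first station and compute it on its own
single_task = process_station(new_base_df.iloc[0])
single_df = dask.compute(single_task)[0]
print(single_df.head())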

# List of delayed computations for each station
delayed_results = [process_station(row) for _, row in new_base_df.iterrows()]

# Compute the delayed results
result_lists = dask.compute(*delayed_results)

# Concatenate the lists into a Pandas DataFrame
result_df = pd.concat(result_lists, ignore_index=True)

# Print the final Pandas DataFrame
print(result_df.head())

# Save the DataFrame to a CSV file
result_df.to_csv(csv_file, index=False)
     station_name        date    lat        lon swe_value change_in_swe_inch  \
0  Adams Ranch #1  2022-01-03  34.25 -105.41667                                
1  Adams Ranch #1  2022-01-04  34.25 -105.41667                                
2  Adams Ranch #1  2022-01-05  34.25 -105.41667                                
3  Adams Ranch #1  2022-01-06  34.25 -105.41667                                
4  Adams Ranch #1  2022-01-07  34.25 -105.41667                                

  snow_depth air_temperature_observed_f  
0                                  21.7  
1                                  36.9  
2                                  42.4  
3                                  45.3  
4                                  45.7  

Iterates over each station in the active SNOTEL station list and calls the process_station function for it, collecting the delayed computations in a list called delayed_results.

Uses dask.compute to execute all delayed computations in parallel. The *delayed_results syntax unpacks the list into individual arguments.

The results are stored in result_lists, a sequence of DataFrames (one per station).

result_df = pd.concat(result_lists, ignore_index=True)

Concatenates all DataFrames in result_lists into a single DataFrame result_df.

The ignore_index=True parameter ensures that the index is reset.

Saves the final DataFrame result_df to a CSV file specified by csv_file. The index=False parameter ensures that the index is not included in the CSV file.
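Once written, the combined dataset can be loaded back for analysis; a minimal sketch, assuming the file produced above:

# Reload the saved SWE dataset; empty fields become NaN and dates are parsed
swe_df = pd.read_csv(csv_file, parse_dates=['date'])
print(swe_df.shape)
print(swe_df[['swe_value', 'snow_depth']].describe())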