3.3 SNOTEL#
Having explored the details of SNOTEL and its significance, let’s dive into the exciting journey of collecting and processing the data.
Characteristics of SNOTEL Data#
Product/Data Type: SNOTEL Station Daily Data
Spatial Resolution: Point data at each SNOTEL station location in the western United States, within the bounding box defined by southwest_lon = -125.0, southwest_lat = 25.0, northeast_lon = -100.0, northeast_lat = 49.0
Temporal Resolution: Daily
Data Format: CSV
This script automates the collection of Snow Water Equivalent (SWE) data from SNOTEL stations, filters it based on geographic criteria, and saves it into CSV files. By the end of this process, we’ll have a valuable dataset, ready to provide insights into SWE, snow depth, and temperature trends in the Western United States.
import math
import json
import requests
import pandas as pd
import csv
import io
import os
import dask
import dask.dataframe as dd
Common Python libraries for mathematical operations, data handling, HTTP requests, CSV operations, file handling, and parallel computing with Dask.
# homedir = os.path.expanduser('~')
working_dir = f"../data/snotel_training_data"
southwest_lon = -125.0
southwest_lat = 25.0
northeast_lon = -100.0
northeast_lat = 49.0
# for demonstration purposes, we will use the year 2022 as the time period
train_start_date = "2022-01-03"
train_end_date = "2022-12-31"
We’ve defined our geographic criteria using latitude and longitude bounds, set the start and end dates for the demonstration period, and specified the directory where all our work will be saved.
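To see how these bounds will be used for filtering later on, here is a minimal sketch: in_bbox is a hypothetical helper (not part of the workflow code) that simply tests whether a station coordinate falls inside the bounding box defined above.
# illustrative helper (not used by the workflow): test a point against the bounding box
def in_bbox(lat, lon):
    return (southwest_lat <= lat <= northeast_lat) and (southwest_lon <= lon <= northeast_lon)

print(in_bbox(39.955, -120.538))   # a station in California -> True
print(in_bbox(34.783, -86.550))    # a station in Alabama -> False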
3.3.1 Download Snotel Station Data#
The code below downloads station metadata from the USDA AWDB REST API if it doesn’t already exist locally, and saves the response as a JSON file.
output_json_file = f'{working_dir}/all_snotel_cdec_stations.json'
if not os.path.exists(output_json_file):
# Fetch data from the URL
response = requests.get("https://wcc.sc.egov.usda.gov/awdbRestApi/services/v1/stations?activeOnly=true&returnForecastPointMetadata=false&returnReservoirMetadata=false&returnStationElements=false")
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Decode the JSON content
json_content = response.json()
# Save the JSON content to a file
with open(output_json_file, 'w') as json_file:
json.dump(json_content, json_file, indent=2)
print(f"Data downloaded and saved to {output_json_file}")
else:
print(f"Failed to download data. Status code: {response.status_code}")
else:
print(f"The file {output_json_file} already exists.")
Data downloaded and saved to ../data/snotel_training_data/all_snotel_cdec_stations.json
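If you want to verify what was downloaded, an optional check like the following (a sketch, assuming the request above succeeded and returned a list of station records) prints how many stations came back and which metadata fields each record carries.
# optional sanity check (illustrative): peek at the downloaded station metadata
with open(output_json_file, 'r') as f:
    stations = json.load(f)
print(len(stations))         # number of station records returned by the API
print(stations[0].keys())    # metadata fields available for each station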
3.3.2 Convert JSON Data to CSV#
# read the json file and convert it to csv
csv_file_path = f'{working_dir}/all_snotel_cdec_stations.csv'
if not os.path.exists(csv_file_path):
# Read the JSON file
with open(output_json_file, 'r') as json_file:
json_content = json.load(json_file)
# Check the content (print or analyze as needed)
#print("JSON Content:")
#print(json.dumps(json_content, indent=2))
# Convert JSON data to a list of dictionaries (assuming JSON is a list of objects)
data_list = json_content if isinstance(json_content, list) else [json_content]
# Get the header from the keys of the first dictionary (assuming consistent structure)
header = data_list[0].keys()
# Write to CSV file
with open(csv_file_path, 'w', newline='') as csv_file:
csv_writer = csv.DictWriter(csv_file, fieldnames=header)
csv_writer.writeheader()
csv_writer.writerows(data_list)
print(f"Data converted and saved to {csv_file_path}")
else:
print(f"The csv all snotel/cdec stations exists.")
Data converted and saved to ../data/snotel_training_data/all_snotel_cdec_stations.csv
The above code converts the downloaded SNOTEL station JSON data to CSV and saves the file.
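For reference, pandas can perform the same flattening in a couple of lines; the snippet below is an equivalent sketch (not the workflow code above), assuming the JSON is a flat list of station dictionaries.
# illustrative alternative: flatten the station JSON directly with pandas
with open(output_json_file, 'r') as f:
    records = json.load(f)
stations_df = pd.json_normalize(records)   # one row per station record
print(stations_df.shape)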
3.3.3 Filter Active Snotel Stations in the Western U.S. and Save as CSV#
active_csv_file_path = f'{working_dir}/all_snotel_cdec_stations_active_in_westus.csv'
if not os.path.exists(active_csv_file_path):
all_df = pd.read_csv(csv_file_path)
print(all_df.head())
all_df['endDate'] = pd.to_datetime(all_df['endDate'])
print(all_df.shape)
end_date = pd.to_datetime('2050-01-01')
filtered_df = all_df[all_df['endDate'] > end_date]
# Filter rows within the latitude and longitude ranges
filtered_df = filtered_df[
(filtered_df['latitude'] >= southwest_lat) & (filtered_df['latitude'] <= northeast_lat) &
(filtered_df['longitude'] >= southwest_lon) & (filtered_df['longitude'] <= northeast_lon)
]
# Print the original and filtered DataFrames
print("Filtered DataFrame:")
print(filtered_df.shape)
filtered_df.to_csv(active_csv_file_path, index=False)
else:
print(f"The active csv already exists: {active_csv_file_path}")
stationTriplet stationId stateCode networkCode name \
0 2057:AL:SCAN 2057 AL SCAN AAMU-JTG
1 ABY:CA:SNOW ABY CA SNOW Abbey
2 0010:ID:COOP 0010 ID COOP Aberdeen Experimnt Stn
3 1F01A:BC:SNOW 1F01A BC SNOW Aberdeen Lake
4 0041:NM:COOP 0041 NM COOP Abiquiu Dam
dcoCode countyName huc elevation latitude longitude \
0 GC Madison 6.030002e+10 860.0 34.78333 -86.55000
1 UN Plumas 1.802012e+11 5650.0 39.95500 -120.53800
2 ID Bingham 1.704021e+11 4410.0 42.95000 -112.83333
3 OR UNKNOWN NaN 4298.0 50.14733 -119.05340
4 UN Rio Arriba 1.302010e+11 6380.0 36.23333 -106.43333
dataTimeZone pedonCode shefId beginDate endDate
0 -6.0 27979 AAMA1 2002-02-23 00:00 2100-01-01 00:00
1 NaN NaN NaN 1963-02-01 00:00 2100-01-01 00:00
2 NaN NaN ABDI1 1914-01-01 00:00 2100-01-01 00:00
3 NaN NaN ABLQ2 1939-04-01 00:00 2100-01-01 00:00
4 NaN NaN ABIN5 1957-01-01 00:00 2100-01-01 00:00
(4414, 16)
Filtered DataFrame:
(3674, 16)
The above code filters the SNOTEL stations to those within the western U.S. boundaries specified earlier, keeps only the active stations based on the endDate field, and saves the result to all_snotel_cdec_stations_active_in_westus.csv for later use.
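To get a feel for the filtered station list, an optional summary like the one below (illustrative, using the column names shown in the printed DataFrame above) counts how many active western-U.S. stations each network and state contributes.
# illustrative summary of the filtered station list
active_df = pd.read_csv(active_csv_file_path)
print(active_df['networkCode'].value_counts())        # stations per network (SNOW, COOP, BOR, ...)
print(active_df['stateCode'].value_counts().head())   # stations per state, top five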
3.3.4 Retrieve Snow Water Equivalent (SWE) from Snotel Stations#
def remove_commented_lines(text):
    # split on any newline style so the API response text parses consistently
    lines = text.splitlines()
    cleaned_lines = []
    for line in lines:
        if not line.startswith('#'):
            cleaned_lines.append(line)
    cleaned_text = "\n".join(cleaned_lines)
    return cleaned_text
Removes lines starting with # from a block of text, such as the commented header of a SNOTEL report.
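A quick usage example (the sample text is made up, mimicking the commented header of a station report):
sample = "# produced by reportGenerator\nDate,Snow Depth (in) Start of Day Values\n2022-01-03,21"
print(remove_commented_lines(sample))
# Date,Snow Depth (in) Start of Day Values
# 2022-01-03,21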
new_base_station_list_file = f"{working_dir}/all_snotel_cdec_stations_active_in_westus.csv"
new_base_df = pd.read_csv(new_base_station_list_file)
print(new_base_df.head())
csv_file = f'{new_base_station_list_file}_swe_restored_dask_all_vars.csv'
start_date = train_start_date
end_date = train_end_date
# Create an empty Pandas DataFrame with the desired columns
result_df = pd.DataFrame(columns=[
'station_name',
'date',
'lat',
'lon',
'swe_value',
'change_in_swe_inch',
'snow_depth',
    'change_in_snow_depth_inch',
'air_temperature_observed_f'
])
stationTriplet stationId stateCode networkCode name \
0 ABY:CA:SNOW ABY CA SNOW Abbey
1 0010:ID:COOP 0010 ID COOP Aberdeen Experimnt Stn
2 0041:NM:COOP 0041 NM COOP Abiquiu Dam
3 08108010:NM:BOR 08108010 NM BOR Abiquiu Reservoir
4 13E19:ID:SNOW 13E19 ID SNOW Above Gilmore
dcoCode countyName huc elevation latitude longitude \
0 UN Plumas 1.802012e+11 5650.0 39.95500 -120.53800
1 ID Bingham 1.704021e+11 4410.0 42.95000 -112.83333
2 UN Rio Arriba 1.302010e+11 6380.0 36.23333 -106.43333
3 CO Rio Arriba 1.302010e+11 6180.0 36.23700 -106.42912
4 ID Lemhi 1.706020e+11 8289.0 44.45615 -113.30097
dataTimeZone pedonCode shefId beginDate endDate
0 NaN NaN NaN 1963-02-01 00:00 2100-01-01
1 NaN NaN ABDI1 1914-01-01 00:00 2100-01-01
2 NaN NaN ABIN5 1957-01-01 00:00 2100-01-01
3 NaN NaN NaN 1964-09-01 00:00 2100-01-01
4 NaN NaN ABGI1 1961-01-01 00:00 2100-01-01
Reads a CSV file containing active station data for the Western U.S. into a DataFrame and initializes an empty DataFrame with specified columns to store results related to snow water equivalent (SWE) and other variables.
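As an optional check (illustrative only), you can confirm how many active stations were read in and which schema the per-station results will follow:
print(new_base_df.shape)          # number of active western-U.S. stations and metadata columns
print(list(result_df.columns))    # column schema for the per-station results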
3.3.5 Process Data using Dask#
Dask is a flexible parallel computing library for analytic computing, designed to scale from a single machine to a cluster of machines. The @dask.delayed decorator marks the process_station function for lazy execution: calling it does not run the function immediately but instead builds a task graph that can be executed in parallel later.
By parallelizing the processing of each station, Dask can utilize multiple CPU cores or even multiple machines, leading to faster execution times.
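Before applying this to the stations, here is a minimal, self-contained sketch of the lazy-execution pattern with a toy function (square is purely illustrative): calling a delayed function only records a task, and dask.compute runs all recorded tasks, potentially in parallel.
# minimal sketch of dask.delayed (toy example, not part of the workflow)
@dask.delayed
def square(x):
    return x * x

tasks = [square(i) for i in range(4)]   # builds a task graph; nothing runs yet
print(dask.compute(*tasks))             # executes the tasks -> (0, 1, 4, 9)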
# Function to process each station
@dask.delayed
def process_station(station):
location_name = station['name']
location_triplet = station['stationTriplet']
location_elevation = station['elevation']
location_station_lat = station['latitude']
location_station_long = station['longitude']
url = f"https://wcc.sc.egov.usda.gov/reportGenerator/view_csv/customSingleStationReport/daily/{location_triplet}%7Cid%3D%22%22%7Cname/{start_date},{end_date}%2C0/WTEQ%3A%3Avalue%2CWTEQ%3A%3Adelta%2CSNWD%3A%3Avalue%2CSNWD%3A%3Adelta%2CTOBS%3A%3Avalue"
r = requests.get(url)
text = remove_commented_lines(r.text)
reader = csv.DictReader(io.StringIO(text))
    json_data = list(reader)   # each CSV row becomes a dictionary keyed by column name
entries = []
for entry in json_data:
try:
# {'Date': '2021-06-18', 'Snow Water Equivalent (in) Start of Day Values': '', 'Change In Snow Water Equivalent (in)': '', 'Snow Depth (in) Start of Day Values': '', 'Change In Snow Depth (in)': '', 'Air Temperature Observed (degF) Start of Day Values': '70.5'}
required_data = {
'station_name': location_name,
'date': entry.get('Date', None),
'lat': location_station_lat,
'lon': location_station_long,
'swe_value': entry.get('Snow Water Equivalent (in) Start of Day Values',None),
'change_in_swe_inch': entry.get('Change In Snow Water Equivalent (in)', None),
'snow_depth': entry.get('Snow Depth (in) Start of Day Values',None),
                'change_in_snow_depth_inch': entry.get('Change In Snow Depth (in)', None),
'air_temperature_observed_f': entry.get('Air Temperature Observed (degF) Start of Day Values',None)
}
entries.append(required_data)
except Exception as e:
print("entry = ", entry)
raise e
return pd.DataFrame(entries)
location_name = station['name']
location_triplet = station['stationTriplet']
location_elevation = station['elevation']
location_station_lat = station['latitude']
location_station_long = station['longitude']
Extracts various pieces of information about the station from the station dictionary.
url = f"https://wcc.sc.egov.usda.gov/reportGenerator/view_csv/customSingleStationReport/daily/{location_triplet}%7Cid%3D%22%22%7Cname/{start_date},{end_date}%2C0/WTEQ%3A%3Avalue%2CWTEQ%3A%3Adelta%2CSNWD%3A%3Avalue%2CSNWD%3A%3Adelta%2CTOBS%3A%3Avalue"
Constructs a URL to request data for the station using its stationTriplet and the specified date range (start_date to end_date).
Sends a GET request to the constructed URL.
Removes commented lines from the response text.
Parses the CSV data into a list of dictionaries (json_data); see the small parsing sketch after this list.
Initializes an empty list (entries) to store the processed data entries.
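The parsing step works like this sketch (the sample text is invented, mimicking a two-column station report):
# illustrative parsing sketch: CSV text -> list of dictionaries
sample_text = "Date,Snow Depth (in) Start of Day Values\n2022-01-03,21\n2022-01-04,20"
rows = list(csv.DictReader(io.StringIO(sample_text)))
print(rows[0]['Snow Depth (in) Start of Day Values'])   # '21'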
for entry in json_data:
try:
required_data = {
'station_name': location_name,
'date': entry.get('Date', None),
'lat': location_station_lat,
'lon': location_station_long,
'swe_value': entry.get('Snow Water Equivalent (in) Start of Day Values', None),
'change_in_swe_inch': entry.get('Change In Snow Water Equivalent (in)', None),
'snow_depth': entry.get('Snow Depth (in) Start of Day Values', None),
            'change_in_snow_depth_inch': entry.get('Change In Snow Depth (in)', None),
'air_temperature_observed_f': entry.get('Air Temperature Observed (degF) Start of Day Values', None)
}
entries.append(required_data)
except Exception as e:
print("entry = ", entry)
raise e
For each entry in the parsed JSON data, the loop extracts the required fields, constructs a dictionary (required_data), and appends it to the entries list.
At the end, the function converts the list of dictionaries (entries) into a Pandas DataFrame and returns it.
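This final conversion is the standard list-of-dictionaries-to-DataFrame pattern; a tiny sketch with made-up values:
# illustrative only: a list of per-day dictionaries becomes one DataFrame per station
sample_entries = [
    {'station_name': 'Example', 'date': '2022-01-03', 'swe_value': '5.2'},
    {'station_name': 'Example', 'date': '2022-01-04', 'swe_value': '5.4'},
]
print(pd.DataFrame(sample_entries))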
# List of delayed computations for each station
delayed_results = [process_station(row) for _, row in new_base_df.iterrows()]
# Compute the delayed results
result_lists = dask.compute(*delayed_results)
# Concatenate the lists into a Pandas DataFrame
result_df = pd.concat(result_lists, ignore_index=True)
# Print the final Pandas DataFrame
print(result_df.head())
# Save the DataFrame to a CSV file
result_df.to_csv(csv_file, index=False)
station_name date lat lon swe_value change_in_swe_inch \
0 Adams Ranch #1 2022-01-03 34.25 -105.41667
1 Adams Ranch #1 2022-01-04 34.25 -105.41667
2 Adams Ranch #1 2022-01-05 34.25 -105.41667
3 Adams Ranch #1 2022-01-06 34.25 -105.41667
4 Adams Ranch #1 2022-01-07 34.25 -105.41667
snow_depth air_temperature_observed_f
0 21.7
1 36.9
2 42.4
3 45.3
4 45.7
Iterates over each station in the active SNOTEL station list and calls the process_station function on it, collecting the delayed computations in a list called delayed_results.
Uses dask.compute to execute all delayed computations in parallel. The *delayed_results syntax unpacks the list into individual arguments, and the results are stored in result_lists, a tuple of DataFrames.
result_df = pd.concat(result_lists, ignore_index=True)
Concatenates all DataFrames in result_lists into a single DataFrame result_df; the ignore_index=True parameter ensures that the index is reset.
Saves the final DataFrame result_df to a CSV file specified by csv_file. The index=False parameter ensures that the index is not included in the CSV file.
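Although the combined result fits in memory here, dask.dataframe (imported at the top) can read the saved CSV lazily if the station list or date range grows; a sketch, assuming the file written above:
# optional follow-up (illustrative): load the saved results lazily with Dask
ddf = dd.read_csv(csv_file, dtype=str)   # dtype=str because missing SNOTEL values are empty fields
print(ddf.npartitions)                   # number of partitions Dask can process in parallel
print(ddf.head())                        # first rows, computed from the first partition only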