HDA Climate DT Parameter Plotter - Tutorial
This notebook provides an interactive workflow to select, query, download, and visualize Climate Digital Twin parameters from the DestinE Data Lake using the DEDL HDA API.
To search and access DEDL data a DestinE user account is needed
To search and access DT data an upgraded access is needed.
Earthkit and HDA Polytope used in this context are both packages provided by the European Centre for Medium-Range Weather Forecasts (ECMWF).
This notebook demonstrates how to access DT data for different types of simulations via HDA (Harmonized Data Access) API.
A correct combination of parameters is necessary to access DT data for different types of simulations. The aim of this notebook is to help you create the correct request to access the desired Climate DT data, through HDA, and visualize it.
Below the main steps covered by this tutorial.
Setup: Import the required libraries and define some function.
Simulation Selection: Print and select the desired scenario simulation for accessing Climate DT data.
Parameter Selection: How to select the desired Climate DT variable among the ones available through the selected scenario.
Levels Selection: How to Handle different Levels to be selected (if any).
Order and Download: How to order and download Climate DT data.
Plot: How to visualize hourly data on single levels data through Earthkit.
Setup¶
Import the Climate DT parameters & scenarios dictionary and all the required packages.
import destinelab as deauth
from destinelab import climate_dt_dictionary
import ipywidgets as widgets
import json
import datetime
import importlib
import importlib.metadata as metadata
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
from getpass import getpass
from tqdm import tqdm
import time
from urllib.parse import unquote
from time import sleep
from IPython.display import JSON
import sys
import subprocess
from packaging import version
from IPython.display import display, HTMLDefine some useful funtions.
#parameters
def filter_entries(search_string):
return [(name, index) for i, (name, index) in enumerate(model_filtered_options)
if search_string.lower() in name.lower()]
def on_search_change(change):
search_string = change.new
if search_string:
filtered_options = filter_entries(search_string)
entry_dropdown.options = filtered_options
else:
entry_dropdown.options =model_filtered_options
def get_selected_entry():
return entry_dropdown.value
def get_selected_hours():
selected_hour = hour_dropdown.value
return selected_hour
#simulations-scenarios
################################
# Function to generate hourly slots
def generate_hourly_slots():
hours = []
for hour in range(0, 24):
for minute in range(0, 60, 60): # Step by 60 minutes (1 hour)
hours.append(datetime.time(hour, minute))
return hours
def on_scenario_change(change):
selected_scenario_index, selected_resolution = change.new
selected_sc_entry = scenario_simulation_table[selected_scenario_index]
output.clear_output()
with output:
if(selected_sc_entry['site']=="MARENOSTRUM"):
styled_message = widgets.HTML(
value="<div style='color: red; font-weight: bold;'>The scenarios in Marenostrum are not available via HDA. Please select a scenario in LUMI</div>"
)
display(styled_message)
date_from = datetime.datetime.strptime(selected_sc_entry['dateFrom'], '%m/%d/%Y').date()
start_date_picker.max = None
start_date_picker.min = date_from
start_date_picker.max = datetime.datetime.strptime(selected_sc_entry['dateTo'], '%m/%d/%Y').date()
start_date_picker.value = date_from
def get_selected_values():
selected_scenario_index, selected_resolution = scenario_dropdown.value
selected_scenario = scenario_simulation_table[selected_scenario_index]
selected_start_date = start_date_picker.value
selected_end_date = "" # end_date_picker.value
selected_hour = "00:00:00"
return selected_scenario_index, selected_scenario, selected_resolution, selected_start_date, selected_end_date, selected_hour
# kevels
# Define a global variable
global global_widget
global_widget = None
# Create a function to generate the widget based on the selection mode
def generate_widget(selection_mode):
global global_widget
if selection_mode == 'Single':
global_widget = widgets.Dropdown(options=levelist, description='Select level:')
return global_widget
elif selection_mode == 'Multiple':
global_widget = widgets.SelectMultiple(options=levelist, description='Select levels:')
return global_widget
# Function to display the widget based on the selection mode
def display_widget(selection_mode):
output.clear_output()
with output:
display(generate_widget(selection_mode))
# Define a function to handle the change in selection mode
def on_dropdown_change(change):
display_widget(change.new)
# Function to convert tuple or single integer to string separated by "/"
def convert_to_string(input):
if isinstance(input, tuple):
return '/'.join(map(str, input))
elif isinstance(input, int):
return str(input)
else:
return None # Handle other types if needed
def ensure_package(package: str, min_version: str = None):
"""
Ensure that a given package is installed and optionally meets a minimum version.
Args:
package (str): The name of the package to check/install.
min_version (str, optional): Minimum version required. If None, just ensures package is installed.
"""
try:
# Get current version
current_version = metadata.version(package)
print(f"Current {package} version: {current_version}")
# Compare versions if needed
if min_version and version.parse(current_version) < version.parse(min_version):
print(f"Upgrading {package} to >= {min_version} ...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "--user", f"{package}>={min_version}"])
print(f"{package} updated successfully")
return importlib.reload(importlib.import_module(package))
else:
print(f"{package} is OK")
return importlib.import_module(package)
except metadata.PackageNotFoundError:
print(f"{package} not installed. Installing...")
install_target = f"{package}>={min_version}" if min_version else package
subprocess.check_call([sys.executable, "-m", "pip", "install", install_target])
print(f"{package} installed successfully")
return importlib.import_module(package)
Scenario Simulation Selection¶
Let’s select now the Scenario from which we want to obtain the Climate Parameter.
We need to ensure to have the right version of the climate DT dictionary.
pd = ensure_package("destinelab", "1.13")Below the available Climate DT simulations.
#scenario dictionary in a rich table with title per scenario.
scenario_simulation_table=climate_dt_dictionary.climateDT_scenario
for simulation in scenario_simulation_table:
if simulation["activity"]=="ScenarioMIP" and simulation.get("realization")==None:
simulation["type"] = "Future projection"
if simulation["model"] == "IFS-FESOM":
simulation["site"] = "MARENOSTRUM"
else:
simulation["site"] = "LUMI"
elif simulation["activity"]=="ScenarioMIP" and simulation.get("realization")=="2":
simulation["type"] = "Future projection (NextGEMS)"
simulation["site"] = "LUMI"
simulation["dateFrom"] = "01/20/2020"
if simulation["activity"]=="CMIP6" and simulation.get("class")=="d1":
simulation["type"] = "Historical simulation"
simulation["site"] = "LUMI"
elif simulation["activity"]=="CMIP6" and simulation.get("class")=="ng":
simulation["type"] = "Historical simulation (NextGEMS)"
simulation["site"] = "LUMI"
if simulation["activity"]=="story-nudging" and simulation.get("experiment")=="hist":
simulation["type"] = "Storyline simulation present climate"
simulation["site"] = "LUMI"
elif simulation["activity"]=="story-nudging" and simulation.get("experiment")=="cont":
simulation["type"] = "Storyline simulation past climate"
simulation["site"] = "LUMI"
elif simulation["activity"]=="story-nudging" and simulation.get("experiment")=="Tplus2.0K":
simulation["type"] = "Storyline simulation future climate"
simulation["site"] = "LUMI"
if simulation["activity"].lower()=="highresmip" and simulation.get("experiment")=="cont" and simulation["model"]=="IFS-FESOM":
simulation["type"] = "Control"
simulation["site"] = "MARENOSTRUM"
simulation["activity"] = "HighResMIP"
new_cont_sc = {"experiment": "cont", "activity": "HighResMIP", "model": "IFS-NEMO"}
if any(all(d.get(k) == v for k, v in new_cont_sc.items())
for d in scenario_simulation_table):
print("All control scenarios exist")
else:
scenario_simulation_table.append({
"type": "Control",
"activity": "HighResMIP",
"site": "LUMI",
"experiment": "cont",
"model": "IFS-NEMO",
"class": "d1",
"resolution": ["high","standard"],
"dateFrom": "01/01/1990",
"dateTo": "04/30/2020"
} )
import pandas as pd
# Convert list of dicts to DataFrame
df = pd.DataFrame(scenario_simulation_table)
# Convert resolution list into a comma-separated string for display
df['resolution'] = df['resolution'].apply(lambda x: ", ".join(x))
# Reorder columns
column_order = ["type", "activity", "model", "resolution", "dateFrom", "dateTo","experiment" , "site", "class"]
df = df[column_order]
# Bold specific fields
df['type'] = df['type'].apply(lambda x: f"<b>{x}</b>")
df['site'] = df['site'].apply(lambda x: f"<b>{x}</b>")
df['dateFrom'] = df['dateFrom'].apply(lambda x: f"<b>{x}</b>")
df['dateTo'] = df['dateTo'].apply(lambda x: f"<b>{x}</b>")
# Display as HTML without index
display(HTML(df.to_html(escape=False, index=False)))Please note that the two scenarios in Marenostrum site are still not available through HDA.
# Create dropdown to select scenario
scenario_dropdown = widgets.Dropdown(
options=[(f"{entry['type']} - {entry['model']} - {resolution}", (i, resolution))
for i, entry in enumerate(scenario_simulation_table) for resolution in entry['resolution']],
description='Scenario:', layout=widgets.Layout(width='500px')
)
# Create date picker widgets
start_date_picker = widgets.DatePicker(description='Start Date:', disabled=False, layout=widgets.Layout(width='500px'))
output = widgets.Output()
# handle dropdown event
scenario_dropdown.observe(on_scenario_change, names='value')
# Set initial values directly
selected_sc_entry = scenario_simulation_table[0]
# Convert dateFrom string to date object
date_from = datetime.datetime.strptime(selected_sc_entry['dateFrom'], '%m/%d/%Y').date()
# Set initial values directly
start_date_picker.min = date_from
start_date_picker.max = datetime.datetime.strptime(selected_sc_entry['dateTo'], '%m/%d/%Y').date()
start_date_picker.value = date_from
display(scenario_dropdown, start_date_picker, output)selected_scenario_index, selected_scenario, selected_resolution, selected_start_date, selected_end_date, selected_hour = get_selected_values()Parameter Selection¶
Climate DT variable selection (we limit the plotting to one variable). The variable selection is conditioned by the chosen simulation scenario.
text=widgets.Label(value='Start writing the name of the parameter of interest in the search box and/or open the dropdown menu below')
# Create search box
search_box = widgets.Text(placeholder='Search by parameter name', description='Search:', disabled=False)
#clean the parameters to fix some errors in the current dictionary definition
cleaned_climate_dt_dictionary=[
d for d in climate_dt_dictionary.climateDT_params
if not (d.get("param") == "130" and d.get("levtype") == "sfc")
and not ((d.get("param") == "129" or d.get("param") == "172") and d.get("levtype") == "sfc" and ("NextGEMS" in selected_scenario['type']))
]
model_filtered_options = [
(entry['paramName'], i)
for i, entry in enumerate(cleaned_climate_dt_dictionary)
if (selected_scenario['model'] in (entry['isFESOM'] ,entry['isIcon'],entry['isNemo']))
]
# Create dropdown to select entry
entry_dropdown = widgets.Dropdown(
options=model_filtered_options,
description='Select Entry:'
)
# handle search box event
search_box.observe(on_search_change, names='value')
# Display widgets
display(text,search_box, entry_dropdown)
Let’s see the details of the selected parameter (Polytope convention). It is possible to check the its characteristics, time resolution, levels...
selected_index = get_selected_entry()
selected_entry = cleaned_climate_dt_dictionary[selected_index]
print(json.dumps(selected_entry,indent=4))# Create dropdown to select hour
hourly_slots = generate_hourly_slots()
hour_dropdown = widgets.Dropdown(options=[(str(slot), slot) for slot in hourly_slots], description='Select Hour:', disabled=False)
# Display widgets
if selected_entry["time"] == "Hourly":
display( hour_dropdown)Levels Selection¶
Handle different Levels to be selected (if any)
if selected_entry["levelist"] != "":
# Convert levelist string to list of integers
levelist = list(map(int, selected_entry["levelist"].split('/')))
if(selected_scenario['model']=='IFS-NEMO'):
levelist = levelist + [73,74,75]
# Create a dropdown widget to choose selection mode
selection_mode_dropdown = widgets.Dropdown(options=['Single', 'Multiple'], description='Selection Mode:')
# Create an output widget to display the selected option(s)
output = widgets.Output()
# Register the function to handle dropdown changes
selection_mode_dropdown.observe(on_dropdown_change, names='value')
# Display the widgets
display(selection_mode_dropdown, output)
# Display the initial widget based on default selection mode
display_widget('Single')Order and Download¶
Obtain Authentication Token¶
To perform our request we need to be authenticated. Below to request of an authentication token.
DESP_USERNAME = input("Please input your DESP username: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")
auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
print("DEDL/DESP Access Token Obtained Successfully")
else:
print("Failed to Obtain DEDL/DESP Access Token")
auth_headers = {"Authorization": f"Bearer {access_token}"}Check if DT access is granted¶
If DT access is not granted, you will not be able to execute the rest of the notebook.
installed_version = importlib.metadata.version("destinelab")
version_number = installed_version.split('.')[1]
if((int(version_number) >= 8 and float(installed_version) < 1) or float(installed_version) >= 1):
auth.is_DTaccess_allowed(access_token)HDA Endpoint¶
HDA API is based on the Spatio Temporal Asset Catalog specification (STAC), it is convenient define a costant with its endpoint. And another one with the ID of the Cliamte DT collection.
HDA_STAC_ENDPOINT="https://hda.data.destination-earth.eu/stac/v2"
COLLECTION_ID="EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"Convert selected parameters, simulation and levels into HDA request¶
Convert levels (if any)
levlInput = ""
if global_widget != None:
# Test cases
levlInput = convert_to_string(global_widget.value)Convert the chosen parameters into an HDA request for Climate DT data and print the created filters.
hourchoice4 = '{shour}00'.format(shour = str(get_selected_hours()).split(":")[0])
filter_params = {
"ecmwf:class" : get_selected_values()[1] .get("class","d1"),
"ecmwf:dataset": "climate-dt", # fixed climate-dt access
"ecmwf:activity" : get_selected_values()[1]["activity"],
"ecmwf:experiment" : get_selected_values()[1]["experiment"].upper(),
"ecmwf:model": get_selected_values()[1]["model"],
"ecmwf:generation": "1", # fixed Specifies the generation of the dataset, which can be incremented as required (latest is 1)
"ecmwf:resolution": get_selected_values()[2], # standard/ high
"ecmwf:expver": "0001", # fixed experiment version
"ecmwf:stream": selected_entry["stream"],
"ecmwf:time": hourchoice4, # choose the hourly slot(s)
"ecmwf:type": "fc", # fixed forecasted fields
"ecmwf:levtype": selected_entry["levtype"],
"ecmwf:levelist": str(levlInput),
"ecmwf:param": str(selected_entry["param"]),
"ecmwf:realization":get_selected_values()[1] .get("realization", "1")
}
# Print the result in JSON format
datechoice = "{fname}T{shour}Z".format(fname = get_selected_values()[3], shour = get_selected_hours() )
# Check if levelist is empty and remove it
if filter_params.get("ecmwf:levelist") == "":
del filter_params["ecmwf:levelist"]
if selected_entry["time"] == "Daily":
del filter_params["ecmwf:time"]
hdaFilters = {
key: {"eq": value}
for key, value in filter_params.items()
}
print("Polytope API format for the selected values. The json below can be copied and used with the polytope API\n")
polytope_payload = {key.replace("ecmwf:", ""): value for key, value in filter_params.items()}
#add date
date4poly =get_selected_values()[3].strftime("%Y%m%d")
polytope_payload["date"]=date4poly+"/to/"+date4poly
print(json.dumps(polytope_payload, indent=4))Filtering¶
Search into asynchronous datasets, as the DTs are, always return a single item:
#Sometimes requests to polytope get timeouts, it is then convenient define a retry strategy
retry_strategy = Retry(
total=5, # Total number of retries
status_forcelist=[500, 502, 503, 504], # List of 5xx status codes to retry on
allowed_methods=["GET",'POST'], # Methods to retry
backoff_factor=1 # Wait time between retries (exponential backoff)
)
# Create an adapter with the retry strategy
adapter = HTTPAdapter(max_retries=retry_strategy)
# Create a session and mount the adapter
session = requests.Session()
session.mount("https://", adapter)
response = session.post(HDA_STAC_ENDPOINT+"/search", headers=auth_headers, json={
"collections": ["EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"],
"datetime": datechoice,
"query": hdaFilters
})
if(response.status_code!= 200):
(print(response.text))
response.raise_for_status()
product = response.json()["features"][0]
JSON(product)The single item returned (above) contains:
The product id: “DT_CLIMATE_ADAPTATION_ORDERABLE_...”, that is a placeholder, its name contains the term “ORDERABLE”.
The storage:tier that indicates that the product is “offline”
The order:status that indicates that the product is “orderable”
Request params used for the order extracted from the search result
ecmwf_properties = {
key: value for key, value in product.get('properties', {}).items()
if key.startswith('ecmwf:')
}
print("HDA API format for the selected values. The json below can be copied and used with the HDA order API\n")
print(json.dumps(ecmwf_properties, indent=4))Order data¶
We have now all the information to order the data.
From the search results we know that the product is orderable and offline, we then need to order the product we searched for.
response = requests.post(f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/order", json=ecmwf_properties, headers=auth_headers)
if response.status_code != 200:
print(response.content)
response.raise_for_status()
ordered_item = response.json()
product_id = ordered_item["id"]
storage_tier = ordered_item["properties"].get("storage:tier", "online")
order_status = ordered_item["properties"].get("order:status", "unknown")
federation_backend = ordered_item["properties"].get("federation:backends", [None])[0]
print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}") Poll the API until product is ready¶
We request the product itself to get an update of its status.
#timeout and step for polling (sec)
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"
self_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}
for i in range(0, TIMEOUT, STEP):
print(f"Polling {i + 1}/{TIMEOUT // STEP}")
response = requests.get(self_url, headers=auth_headers)
if response.status_code != 200:
print(response.content)
response.raise_for_status()
item = response.json()
storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)
if storage_tier == ONLINE_STATUS:
download_url = item["assets"]["downloadLink"]["href"]
print("Product is ready to be downloaded.")
print(f"Asset URL: {download_url}")
break
sleep(STEP)
else:
order_status = item["properties"].get("order:status", "unknown")
print(f"We could not download the product after {TIMEOUT // STEP} tries. Current order status is {order_status}")
Download¶
response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()
content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))
if content_disposition:
filename = content_disposition.split('filename=')[1].strip('"')
filename = unquote(filename)
else:
filename = os.path.basename(url)
# Open a local file in binary write mode and write the content
print(f"downloading {filename}")
with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
with open(filename, 'wb') as f:
for data in response.iter_content(1024):
progress_bar.update(len(data))
f.write(data)import earthkit.data
import earthkit.plots
import earthkit.regrid
data = earthkit.data.from_source("file", filename)
earthkit.plots.quickplot(data,domain="Europe")