Skip to article frontmatterSkip to article content

HDA Climate DT Parameter Plotter - Tutorial

This notebook provides an interactive workflow to select, query, download, and visualize Climate Digital Twin parameters from the DestinE Data Lake using the DEDL HDA API.

🚀 Launch in JupyterHub
Prerequisites:References:Credit:
  • Earthkit and HDA Polytope used in this context are both packages provided by the European Centre for Medium-Range Weather Forecasts (ECMWF).

This notebook demonstrates how to access DT data for different types of simulations via HDA (Harmonized Data Access) API.

A correct combination of parameters is necessary to access DT data for different types of simulations. The aim of this notebook is to help you create the correct request to access the desired Climate DT data, through HDA, and visualize it.

Below the main steps covered by this tutorial.

  1. Setup: Import the required libraries and define some function.

  2. Simulation Selection: Print and select the desired scenario simulation for accessing Climate DT data.

  3. Parameter Selection: How to select the desired Climate DT variable among the ones available through the selected scenario.

  4. Levels Selection: How to Handle different Levels to be selected (if any).

  5. Order and Download: How to order and download Climate DT data.

  6. Plot: How to visualize hourly data on single levels data through Earthkit.

Setup

Import the Climate DT parameters & scenarios dictionary and all the required packages.

import destinelab as deauth
from destinelab import climate_dt_dictionary
import ipywidgets as widgets
import json
import datetime
import importlib
import importlib.metadata as metadata

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
from getpass import getpass
from tqdm import tqdm
import time
from urllib.parse import unquote
from time import sleep
from IPython.display import JSON

import sys
import subprocess
from packaging import version

from IPython.display import display, HTML

Define some useful funtions.

#parameters

def filter_entries(search_string):
    return [(name, index) for i, (name, index) in enumerate(model_filtered_options)
            if search_string.lower() in name.lower()]


def on_search_change(change):
    search_string = change.new
    if search_string:
        filtered_options = filter_entries(search_string)
        entry_dropdown.options = filtered_options
    else:
        entry_dropdown.options =model_filtered_options
        
def get_selected_entry():
    return entry_dropdown.value

def get_selected_hours():
    selected_hour = hour_dropdown.value
    return selected_hour

#simulations-scenarios
################################

# Function to generate hourly slots
def generate_hourly_slots():
    hours = []
    for hour in range(0, 24):
        for minute in range(0, 60, 60):  # Step by 60 minutes (1 hour)
            hours.append(datetime.time(hour, minute))
    return hours

def on_scenario_change(change):
    selected_scenario_index, selected_resolution = change.new
    selected_sc_entry = scenario_simulation_table[selected_scenario_index]
    output.clear_output()
    with output:
        if(selected_sc_entry['site']=="MARENOSTRUM"):
            styled_message = widgets.HTML(
                    value="<div style='color: red; font-weight: bold;'>The scenarios in Marenostrum are not available via HDA. Please select a scenario in LUMI</div>"
                )
            display(styled_message)
    date_from = datetime.datetime.strptime(selected_sc_entry['dateFrom'], '%m/%d/%Y').date()
    start_date_picker.max = None
    start_date_picker.min = date_from
    start_date_picker.max = datetime.datetime.strptime(selected_sc_entry['dateTo'], '%m/%d/%Y').date()
    start_date_picker.value = date_from
    
def get_selected_values():
    selected_scenario_index, selected_resolution = scenario_dropdown.value
    selected_scenario = scenario_simulation_table[selected_scenario_index]
    selected_start_date = start_date_picker.value
    selected_end_date = "" # end_date_picker.value
    selected_hour = "00:00:00"
        
    return selected_scenario_index, selected_scenario, selected_resolution, selected_start_date, selected_end_date, selected_hour

# kevels

# Define a global variable
global global_widget
global_widget = None

# Create a function to generate the widget based on the selection mode
def generate_widget(selection_mode):
    global global_widget
    if selection_mode == 'Single':
        global_widget = widgets.Dropdown(options=levelist, description='Select level:')
        return global_widget
    elif selection_mode == 'Multiple':
        global_widget = widgets.SelectMultiple(options=levelist, description='Select levels:')
        return global_widget
    
    
# Function to display the widget based on the selection mode
def display_widget(selection_mode):
    output.clear_output()
    with output:
        display(generate_widget(selection_mode))

# Define a function to handle the change in selection mode
def on_dropdown_change(change):
    display_widget(change.new)
    
# Function to convert tuple or single integer to string separated by "/"
def convert_to_string(input):
    if isinstance(input, tuple):
        return '/'.join(map(str, input))
    elif isinstance(input, int):
        return str(input)
    else:
        return None  # Handle other types if needed

    
def ensure_package(package: str, min_version: str = None):
    """
    Ensure that a given package is installed and optionally meets a minimum version.
    
    Args:
        package (str): The name of the package to check/install.
        min_version (str, optional): Minimum version required. If None, just ensures package is installed.
    """
    try:
        # Get current version
        current_version = metadata.version(package)
        print(f"Current {package} version: {current_version}")
        
        # Compare versions if needed
        if min_version and version.parse(current_version) < version.parse(min_version):
            print(f"Upgrading {package} to >= {min_version} ...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", "--user", f"{package}>={min_version}"])
            print(f"{package} updated successfully")
            return importlib.reload(importlib.import_module(package))
        else:
            print(f"{package} is OK")
            return importlib.import_module(package)
    
    except metadata.PackageNotFoundError:
        print(f"{package} not installed. Installing...")
        install_target = f"{package}>={min_version}" if min_version else package
        subprocess.check_call([sys.executable, "-m", "pip", "install", install_target])
        print(f"{package} installed successfully")
        return importlib.import_module(package)
    

Scenario Simulation Selection

Let’s select now the Scenario from which we want to obtain the Climate Parameter.

We need to ensure to have the right version of the climate DT dictionary.

pd = ensure_package("destinelab", "1.13")

Below the available Climate DT simulations.


#scenario dictionary in a rich table with title per scenario.

scenario_simulation_table=climate_dt_dictionary.climateDT_scenario

for simulation in scenario_simulation_table:
    if simulation["activity"]=="ScenarioMIP" and  simulation.get("realization")==None:
        simulation["type"] = "Future projection"
        if simulation["model"] == "IFS-FESOM":
            simulation["site"] = "MARENOSTRUM"
        else:
            simulation["site"] = "LUMI" 
    elif simulation["activity"]=="ScenarioMIP" and  simulation.get("realization")=="2":
        simulation["type"] = "Future projection (NextGEMS)"
        simulation["site"] = "LUMI"
        simulation["dateFrom"] = "01/20/2020"
    
    if simulation["activity"]=="CMIP6" and  simulation.get("class")=="d1":
        simulation["type"] = "Historical simulation"
        simulation["site"] = "LUMI"
    elif simulation["activity"]=="CMIP6" and  simulation.get("class")=="ng":
        simulation["type"] = "Historical simulation (NextGEMS)"
        simulation["site"] = "LUMI"
    
    if simulation["activity"]=="story-nudging" and  simulation.get("experiment")=="hist":
        simulation["type"] = "Storyline simulation present climate"
        simulation["site"] = "LUMI"
    elif simulation["activity"]=="story-nudging" and  simulation.get("experiment")=="cont":
        simulation["type"] = "Storyline simulation past climate"
        simulation["site"] = "LUMI"
    elif simulation["activity"]=="story-nudging" and  simulation.get("experiment")=="Tplus2.0K":
        simulation["type"] = "Storyline simulation future climate"
        simulation["site"] = "LUMI"
    if simulation["activity"].lower()=="highresmip" and  simulation.get("experiment")=="cont" and  simulation["model"]=="IFS-FESOM":
        simulation["type"] = "Control"
        simulation["site"] = "MARENOSTRUM"
        simulation["activity"] = "HighResMIP"
        
        
new_cont_sc = {"experiment": "cont", "activity": "HighResMIP", "model": "IFS-NEMO"}

if any(all(d.get(k) == v for k, v in new_cont_sc.items())
       for d in scenario_simulation_table):
    print("All control scenarios exist")                
else:
    scenario_simulation_table.append({
    "type": "Control",
    "activity": "HighResMIP",
    "site": "LUMI",
    "experiment": "cont",
    "model": "IFS-NEMO",
    "class": "d1",
    "resolution":  ["high","standard"],
    "dateFrom": "01/01/1990",
    "dateTo": "04/30/2020"
    } )      
        
import pandas as pd
# Convert list of dicts to DataFrame
df = pd.DataFrame(scenario_simulation_table)

# Convert resolution list into a comma-separated string for display
df['resolution'] = df['resolution'].apply(lambda x: ", ".join(x))

# Reorder columns
column_order = ["type", "activity", "model", "resolution", "dateFrom", "dateTo","experiment" , "site", "class"]
df = df[column_order]

# Bold specific fields 
df['type'] = df['type'].apply(lambda x: f"<b>{x}</b>")
df['site'] = df['site'].apply(lambda x: f"<b>{x}</b>")
df['dateFrom'] = df['dateFrom'].apply(lambda x: f"<b>{x}</b>")
df['dateTo'] = df['dateTo'].apply(lambda x: f"<b>{x}</b>")

# Display as HTML without index
display(HTML(df.to_html(escape=False, index=False)))
Note:

Please note that the two scenarios in Marenostrum site are still not available through HDA.

# Create dropdown to select scenario
scenario_dropdown = widgets.Dropdown(
    options=[(f"{entry['type']} - {entry['model']} - {resolution}", (i, resolution)) 
             for i, entry in enumerate(scenario_simulation_table) for resolution in entry['resolution']],
    description='Scenario:', layout=widgets.Layout(width='500px')
)

# Create date picker widgets
start_date_picker = widgets.DatePicker(description='Start Date:', disabled=False, layout=widgets.Layout(width='500px'))

output = widgets.Output()


# handle dropdown event
scenario_dropdown.observe(on_scenario_change, names='value')

# Set initial values directly
selected_sc_entry = scenario_simulation_table[0]

# Convert dateFrom string to date object
date_from = datetime.datetime.strptime(selected_sc_entry['dateFrom'], '%m/%d/%Y').date()

# Set initial values directly
start_date_picker.min = date_from
start_date_picker.max = datetime.datetime.strptime(selected_sc_entry['dateTo'], '%m/%d/%Y').date()
start_date_picker.value = date_from

display(scenario_dropdown, start_date_picker, output)
selected_scenario_index, selected_scenario, selected_resolution, selected_start_date, selected_end_date, selected_hour = get_selected_values()

Parameter Selection

Climate DT variable selection (we limit the plotting to one variable). The variable selection is conditioned by the chosen simulation scenario.

text=widgets.Label(value='Start writing the name of the parameter of interest in the search box and/or open the dropdown menu below')

# Create search box
search_box = widgets.Text(placeholder='Search by parameter name', description='Search:', disabled=False)

#clean the parameters to fix some errors in the current dictionary definition
cleaned_climate_dt_dictionary=[
    d for d in climate_dt_dictionary.climateDT_params
    if not (d.get("param") == "130" and d.get("levtype") == "sfc") 
    and not ((d.get("param") == "129" or d.get("param") == "172") and d.get("levtype") == "sfc" and ("NextGEMS" in selected_scenario['type'])) 
]

model_filtered_options = [
    (entry['paramName'], i)
    for i, entry in enumerate(cleaned_climate_dt_dictionary)
    if (selected_scenario['model'] in (entry['isFESOM'] ,entry['isIcon'],entry['isNemo']))
]


# Create dropdown to select entry
entry_dropdown = widgets.Dropdown(
    options=model_filtered_options,
    description='Select Entry:'
)

# handle search box event
search_box.observe(on_search_change, names='value')

# Display widgets
display(text,search_box, entry_dropdown)

Let’s see the details of the selected parameter (Polytope convention). It is possible to check the its characteristics, time resolution, levels...

selected_index = get_selected_entry()
selected_entry = cleaned_climate_dt_dictionary[selected_index]
print(json.dumps(selected_entry,indent=4))
# Create dropdown to select hour
hourly_slots = generate_hourly_slots()
hour_dropdown = widgets.Dropdown(options=[(str(slot), slot) for slot in hourly_slots], description='Select Hour:', disabled=False)


# Display widgets
if selected_entry["time"] == "Hourly":
    display( hour_dropdown)

Levels Selection

Handle different Levels to be selected (if any)

if selected_entry["levelist"] != "":
    # Convert levelist string to list of integers
    levelist = list(map(int, selected_entry["levelist"].split('/')))
    if(selected_scenario['model']=='IFS-NEMO'):
        levelist = levelist + [73,74,75]

    # Create a dropdown widget to choose selection mode
    selection_mode_dropdown = widgets.Dropdown(options=['Single', 'Multiple'], description='Selection Mode:')

    # Create an output widget to display the selected option(s)
    output = widgets.Output()

    # Register the function to handle dropdown changes
    selection_mode_dropdown.observe(on_dropdown_change, names='value')

    # Display the widgets
    display(selection_mode_dropdown, output)

    # Display the initial widget based on default selection mode
    display_widget('Single')

Order and Download

Obtain Authentication Token

To perform our request we need to be authenticated. Below to request of an authentication token.

DESP_USERNAME = input("Please input your DESP username: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")

auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
else:
    print("Failed to Obtain DEDL/DESP Access Token")

auth_headers = {"Authorization": f"Bearer {access_token}"}

Check if DT access is granted

If DT access is not granted, you will not be able to execute the rest of the notebook.

installed_version = importlib.metadata.version("destinelab")
version_number = installed_version.split('.')[1]
if((int(version_number) >= 8 and float(installed_version) < 1) or float(installed_version) >= 1):
    auth.is_DTaccess_allowed(access_token)

HDA Endpoint

HDA API is based on the Spatio Temporal Asset Catalog specification (STAC), it is convenient define a costant with its endpoint. And another one with the ID of the Cliamte DT collection.

HDA_STAC_ENDPOINT="https://hda.data.destination-earth.eu/stac/v2"
COLLECTION_ID="EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"

Convert selected parameters, simulation and levels into HDA request

Convert levels (if any)

levlInput = ""
if global_widget != None:
    # Test cases
    levlInput = convert_to_string(global_widget.value)

Convert the chosen parameters into an HDA request for Climate DT data and print the created filters.

hourchoice4 = '{shour}00'.format(shour = str(get_selected_hours()).split(":")[0])

filter_params = {
  "ecmwf:class" : get_selected_values()[1] .get("class","d1"),
  "ecmwf:dataset": "climate-dt",   # fixed climate-dt access
  "ecmwf:activity" : get_selected_values()[1]["activity"],
  "ecmwf:experiment" : get_selected_values()[1]["experiment"].upper(),
  "ecmwf:model": get_selected_values()[1]["model"],
  "ecmwf:generation": "1",         # fixed Specifies the generation of the dataset, which can be incremented as required (latest is 1)
  "ecmwf:resolution": get_selected_values()[2],      # standard/ high 
  "ecmwf:expver": "0001",          # fixed experiment version 
  "ecmwf:stream": selected_entry["stream"],
  "ecmwf:time": hourchoice4,            # choose the hourly slot(s)
  "ecmwf:type": "fc",              # fixed forecasted fields
  "ecmwf:levtype": selected_entry["levtype"],  
  "ecmwf:levelist": str(levlInput),  
  "ecmwf:param": str(selected_entry["param"]),
  "ecmwf:realization":get_selected_values()[1] .get("realization", "1")
}

# Print the result in JSON format
datechoice = "{fname}T{shour}Z".format(fname = get_selected_values()[3], shour = get_selected_hours() )

# Check if levelist is empty and remove it
if filter_params.get("ecmwf:levelist") == "":
    del filter_params["ecmwf:levelist"]

if selected_entry["time"] == "Daily":
    del filter_params["ecmwf:time"]

hdaFilters = {
    key: {"eq": value}
    for key, value in filter_params.items()
}


print("Polytope API format for the selected values. The json below can be copied and used with the polytope API\n")
polytope_payload = {key.replace("ecmwf:", ""): value for key, value in filter_params.items()}
#add date
date4poly =get_selected_values()[3].strftime("%Y%m%d")
polytope_payload["date"]=date4poly+"/to/"+date4poly
print(json.dumps(polytope_payload, indent=4))

Filtering

Search into asynchronous datasets, as the DTs are, always return a single item:

#Sometimes requests to polytope get timeouts, it is then convenient define a retry strategy
retry_strategy = Retry(
    total=5,  # Total number of retries
    status_forcelist=[500, 502, 503, 504],  # List of 5xx status codes to retry on
    allowed_methods=["GET",'POST'],  # Methods to retry
    backoff_factor=1  # Wait time between retries (exponential backoff)
)

# Create an adapter with the retry strategy
adapter = HTTPAdapter(max_retries=retry_strategy)

# Create a session and mount the adapter
session = requests.Session()
session.mount("https://", adapter)

response = session.post(HDA_STAC_ENDPOINT+"/search", headers=auth_headers, json={
 "collections": ["EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"],
    "datetime": datechoice,
    "query": hdaFilters
})

if(response.status_code!= 200):
    (print(response.text))
response.raise_for_status()

product = response.json()["features"][0]
JSON(product)

The single item returned (above) contains:

  • The product id: “DT_CLIMATE_ADAPTATION_ORDERABLE_...”, that is a placeholder, its name contains the term “ORDERABLE”.

  • The storage:tier that indicates that the product is “offline”

  • The order:status that indicates that the product is “orderable”

  • Request params used for the order extracted from the search result

ecmwf_properties = {
    key: value for key, value in product.get('properties', {}).items()
    if key.startswith('ecmwf:')
}

print("HDA API format for the selected values. The json below can be copied and used with the HDA order API\n")
print(json.dumps(ecmwf_properties, indent=4))

Order data

We have now all the information to order the data.

From the search results we know that the product is orderable and offline, we then need to order the product we searched for.

response = requests.post(f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/order", json=ecmwf_properties, headers=auth_headers)

if response.status_code != 200:
    print(response.content)
response.raise_for_status()

ordered_item = response.json()

product_id = ordered_item["id"]
storage_tier = ordered_item["properties"].get("storage:tier", "online")
order_status = ordered_item["properties"].get("order:status", "unknown")
federation_backend = ordered_item["properties"].get("federation:backends", [None])[0]

print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}")    

Poll the API until product is ready

We request the product itself to get an update of its status.

#timeout and step for polling (sec)
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"

self_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}

for i in range(0, TIMEOUT, STEP):
    print(f"Polling {i + 1}/{TIMEOUT // STEP}")

    response = requests.get(self_url, headers=auth_headers)
    if response.status_code != 200:
        print(response.content)
    response.raise_for_status()
    item = response.json()

    storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)

    if storage_tier == ONLINE_STATUS:
        download_url = item["assets"]["downloadLink"]["href"]
        print("Product is ready to be downloaded.")
        print(f"Asset URL: {download_url}")
        break

    sleep(STEP)
else:
    order_status = item["properties"].get("order:status", "unknown")
    print(f"We could not download the product after {TIMEOUT // STEP} tries. Current order status is {order_status}")
    

Download

response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()

content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))
if content_disposition:
    filename = content_disposition.split('filename=')[1].strip('"')
    filename = unquote(filename)
else:
    filename = os.path.basename(url)

# Open a local file in binary write mode and write the content
print(f"downloading {filename}")

with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
    with open(filename, 'wb') as f:
        for data in response.iter_content(1024):
            progress_bar.update(len(data))
            f.write(data)

EarthKit

Lets plot the result file with EarthKit selecting only the Europe area in the plot.

import earthkit.data
import earthkit.plots
import earthkit.regrid

data = earthkit.data.from_source("file", filename)
earthkit.plots.quickplot(data,domain="Europe")