
Climate DT Parameter - Data Access using DEDL HDA

This notebook authenticates with the DestinE API, queries ECMWF Climate Digital Twin adaptation data based on ScenarioMIP parameters, downloads the selected forecast data using a robust retry mechanism, and visualizes it using EarthKit.

Credit:
  • Earthkit and HDA Polytope used in this context are both packages provided by the European Centre for Medium-Range Weather Forecasts (ECMWF).

This notebook demonstrates how to use the HDA (Harmonized Data Access) API to query and access Climate DT data and plot a parameter series. The main steps covered by this tutorial are listed below.

  1. Setup: Import the required libraries.

  2. Order and Download: How to filter and download Climate DT data.

  3. Plot: How to visualize hourly single-level data with EarthKit.

Setup

Import all the required packages.

# Standard library
import importlib.metadata
import json
import os
import time
from datetime import datetime
from getpass import getpass
from time import sleep
from urllib.parse import unquote

# Third-party packages
import destinelab as deauth
import requests
from IPython.display import JSON
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3.util.retry import Retry

Order and Download

Obtain Authentication Token

To access data we need to be authenticated.

The cell below shows how to request an authentication token using the destinelab package.

DESP_USERNAME = input("Please input your DESP username: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")

auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
else:
    print("Failed to Obtain DEDL/DESP Access Token")

auth_headers = {"Authorization": f"Bearer {access_token}"}
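The introduction mentions a retry mechanism, and Setup imports HTTPAdapter and Retry, but the cells in this notebook call requests.get and requests.post directly. A minimal sketch of how those two classes could be wired into a session is shown here. The helper name make_retrying_session, the retry count, the backoff factor and the list of status codes are illustrative assumptions, not values taken from the HDA documentation, and the allowed_methods argument assumes urllib3 1.26 or later.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_retrying_session(headers=None, total=5, backoff_factor=1.0):
    """Return a requests.Session that retries transient GET failures.

    Hypothetical helper: the defaults are illustrative, not HDA requirements.
    """
    retry = Retry(
        total=total,                                   # maximum number of retries
        backoff_factor=backoff_factor,                 # growing pause between attempts
        status_forcelist=(429, 500, 502, 503, 504),    # statuses treated as transient
        allowed_methods=("GET",),                      # assumes urllib3 >= 1.26
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    if headers:
        session.headers.update(headers)
    return session


# "<token>" is a placeholder; in the notebook this would be access_token.
session = make_retrying_session({"Authorization": "Bearer <token>"})
```

Retries are limited to GET in this sketch so that a failed POST does not submit the same order twice. With such a session, session.get(url) could replace requests.get(url, headers=auth_headers) in the polling and download steps.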

Check if DT access is granted

If DT access is not granted, you will not be able to execute the rest of the notebook.

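import importlib.metadata

installed_version = importlib.metadata.version("destinelab")
# Compare (major, minor) as integers: float("0.8.1") would raise a ValueError.
version_parts = installed_version.split(".")
major = int(version_parts[0])
minor = int(version_parts[1]) if len(version_parts) > 1 else 0
# Run the access check only with destinelab 0.8 or later.
if (major, minor) >= (0, 8):
    auth.is_DTaccess_allowed(access_token)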

HDA Endpoint

The HDA API is based on the SpatioTemporal Asset Catalog (STAC) specification. It is convenient to define one constant for its endpoint and another for the ID of the Climate DT collection.

HDA_STAC_ENDPOINT="https://hda.data.destination-earth.eu/stac/v2"
COLLECTION_ID="EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"

Order and download data

The Climate Digital Twin collection, like all ECMWF datasets in the DestinE Data Lake, requires an ordering workflow. The workflow consists of the following steps:

  • submit an order with parameters directly to the collection,

  • monitor the returned STAC item status,

  • download the asset when ready.

Note: all ECMWF data follows this process. The resulting STAC item always contains a single asset with the requested data.

Submit an Order

To submit an order we need to select our parameters of interest.

datechoice = "20280610"
filter_params = {
        "ecmwf:class": "d1",             # fixed 
        "ecmwf:dataset": "climate-dt",   # fixed climate-dt access
        "ecmwf:activity": "ScenarioMIP", # activity + experiment + model (go together)
        "ecmwf:experiment": "SSP3-7.0",  # activity + experiment + model (go together)
        "ecmwf:model": "IFS-NEMO",       # activity + experiment + model (go together)
        "ecmwf:generation": "1",         # fixed Specifies the generation of the dataset, which can be incremented as required (latest is 1)
        "ecmwf:realization": "1",        # fixed Specifies the climate realization. Default 1. Based on perturbations of initial conditions
        "ecmwf:resolution": "high",      # standard/ high 
        "ecmwf:expver": "0001",          # fixed experiment version 
        "ecmwf:stream": "clte",          # fixed climate
        "ecmwf:time": "0000",            # choose the hourly slot(s)
        "ecmwf:type": "fc",              # fixed forecasted fields
        "ecmwf:levtype": "sfc",          # Surface fields (levtype=sfc), Height level fields (levtype=hl), Pressure level fields (levtype=pl), Model Level (Levtype=ml)
#        "levelist": "1/2/3/...",  # for ml/pl/sol type data
        "ecmwf:param": "134",             # Surface Pressure parameter
        "ecmwf:date":datechoice+"/to/"+datechoice
    }
#timeout and step for polling (sec)
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"
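The "ecmwf:date" value above is assembled by plain string concatenation, so a mistyped date would only surface once the order is rejected. As a sketch, a small helper could validate the dates locally before building the string. The name ecmwf_date_range is hypothetical, and using it for a range longer than one day assumes the endpoint accepts such ranges, which the "/to/" syntax suggests but this notebook does not demonstrate.

```python
from datetime import datetime


def ecmwf_date_range(start, end=None):
    """Build a 'YYYYMMDD/to/YYYYMMDD' string after validating both dates.

    Hypothetical helper; `end` defaults to `start` for a single-day request.
    """
    end = start if end is None else end
    # strptime raises ValueError for impossible dates such as 20280631.
    first = datetime.strptime(start, "%Y%m%d")
    last = datetime.strptime(end, "%Y%m%d")
    if last < first:
        raise ValueError(f"end date {end} is earlier than start date {start}")
    return f"{first:%Y%m%d}/to/{last:%Y%m%d}"


single_day = ecmwf_date_range("20280610")
three_days = ecmwf_date_range("20280610", "20280612")
print(single_day)
print(three_days)
```

The result could then be assigned to filter_params["ecmwf:date"] in place of the concatenated string.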

response = requests.post(f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/order", json=filter_params, headers=auth_headers)

if response.status_code != 200:
    print(response.content)
response.raise_for_status()

ordered_item = response.json()

product_id = ordered_item["id"]
storage_tier = ordered_item["properties"].get("storage:tier", "online")
order_status = ordered_item["properties"].get("order:status", "unknown")
federation_backend = ordered_item["properties"].get("federation:backends", [None])[0]

print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}")      

self_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}

Monitor the status

The status of an order can be:

  • shipping, that means that the order is in progress;

  • succeeded, that means that the order is completed;

  • failed, that means that the order is in error.
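The three statuses, together with the storage tier, determine what a polling loop should do on each iteration. The sketch below expresses that decision as a small pure function. The name next_action and its return values are hypothetical, the property keys are the ones used in this notebook, and the "offline" tier value in the example is only illustrative.

```python
def next_action(properties, online_status="online"):
    """Map STAC item properties to 'abort', 'download' or 'wait'.

    Hypothetical helper. As in the notebook, a missing "storage:tier"
    is treated as online.
    """
    if properties.get("order:status", "unknown") == "failed":
        return "abort"      # the order is in error, polling further is pointless
    if properties.get("storage:tier", online_status) == online_status:
        return "download"   # the asset can be fetched
    return "wait"           # e.g. the order is still shipping


# "offline" is an illustrative tier value, not one confirmed by the source.
print(next_action({"order:status": "shipping", "storage:tier": "offline"}))
print(next_action({"order:status": "succeeded", "storage:tier": "online"}))
print(next_action({"order:status": "failed"}))
```

Keeping the decision separate from the HTTP calls makes it possible to check the logic without contacting the service.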

for i in range(0, TIMEOUT, STEP):
    print(f"Polling {i // STEP + 1}/{TIMEOUT // STEP}")

    response = requests.get(self_url, headers=auth_headers)
    if response.status_code != 200:
        print(response.content)
    response.raise_for_status()
    item = response.json()

    storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)
    order_status = item["properties"].get("order:status", "unknown")

    if storage_tier == ONLINE_STATUS:
        download_url = item["assets"]["downloadLink"]["href"]
        print("Product is ready to be downloaded.")
        print(f"Asset URL: {download_url}")
        break
    if order_status == "failed":
        # Stop polling as soon as the order is reported to be in error.
        raise RuntimeError(f"Order {product_id} failed.")
    sleep(STEP)
else:
    # The loop ended without a break, so download_url was never set.
    order_status = item.get("properties", {}).get("order:status", "unknown")
    raise TimeoutError(
        f"The product was not ready after {TIMEOUT // STEP} tries. "
        f"Current order status is {order_status}"
    )

# Open a streaming request for the asset; the body is read in the Download step.
response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()

Download

content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))
if content_disposition:
    filename = content_disposition.split('filename=')[1].strip('"')
    filename = unquote(filename)
else:
    filename = os.path.basename(download_url)

# Open a local file in binary write mode and write the content
print(f"downloading {filename}")

with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
    with open(filename, 'wb') as f:
        for data in response.iter_content(1024):
            progress_bar.update(len(data))
            f.write(data)
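Splitting the Content-Disposition header on 'filename=' works for the simple case, but it keeps any parameters that follow the filename. As a sketch of a more tolerant alternative, the standard library's email.message.Message can parse the header, with the URL path as a fallback. The helper name filename_from_response and the example URL are hypothetical.

```python
import posixpath
from email.message import Message
from urllib.parse import unquote, urlparse


def filename_from_response(content_disposition, url):
    """Pick a local filename from a Content-Disposition value or the URL.

    Hypothetical helper; `content_disposition` may be None.
    """
    if content_disposition:
        message = Message()
        message["Content-Disposition"] = content_disposition
        # get_filename() handles quoting and trailing parameters.
        name = message.get_filename()
        if name:
            return unquote(name)
    # Fall back to the last path segment, ignoring any query string.
    return posixpath.basename(urlparse(url).path)


example_url = "https://example.org/data/file.grib?token=1"  # illustrative URL
print(filename_from_response('attachment; filename="a%20b.grib"; size=10', example_url))
print(filename_from_response(None, example_url))
```

In the notebook this could be called as filename_from_response(response.headers.get("Content-Disposition"), download_url).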

EarthKit

Let's plot the downloaded file with EarthKit.

import earthkit.data
import earthkit.plots
import earthkit.regrid

data = earthkit.data.from_source("file", filename)
earthkit.plots.quickplot(data)