Climate DT Parameter - Data Access using DEDL HDA
This notebook authenticates with the DestinE API, queries ECMWF Climate Digital Twin adaptation data based on ScenarioMIP parameters, downloads the selected forecast data using a robust retry mechanism, and visualizes it using EarthKit.
To search and access DEDL data, a DestinE user account is needed.
To search and access DT data, upgraded access is needed.
Earthkit and HDA Polytope, both used in this context, are packages provided by the European Centre for Medium-Range Weather Forecasts (ECMWF).
This notebook demonstrates how to use the HDA (Harmonized Data Access) API to query and access Climate DT data and plot a parameter series. The main steps covered by this tutorial are:
Setup: Import the required libraries.
Order and Download: How to filter and download Climate DT data.
Plot: How to visualize hourly single-level data with Earthkit.
Setup¶
Import all the required packages.
import destinelab as deauth
import json
import importlib.metadata
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
from getpass import getpass
from tqdm import tqdm
import time
from datetime import datetime
from urllib.parse import unquote
from time import sleep
from IPython.display import JSON
Order and Download¶
Obtain Authentication Token¶
To access data we need to be authenticated.
The cell below requests an authentication token using the destinelab package.
DESP_USERNAME = input("Please input your DESP username: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")
auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
else:
    print("Failed to Obtain DEDL/DESP Access Token")
auth_headers = {"Authorization": f"Bearer {access_token}"}
Check if DT access is granted¶
If DT access is not granted, you will not be able to execute the rest of the notebook.
import importlib.metadata
# is_DTaccess_allowed is called only for destinelab 0.8 or later
installed_version = importlib.metadata.version("destinelab")
# Compare (major, minor) as integers; float() cannot parse a version such as "0.8.1"
major, minor = (int(part) for part in installed_version.split(".")[:2])
if (major, minor) >= (0, 8):
    auth.is_DTaccess_allowed(access_token)
HDA Endpoint¶
The HDA API is based on the SpatioTemporal Asset Catalog (STAC) specification. It is convenient to define one constant for its endpoint and another for the ID of the Climate DT collection.
HDA_STAC_ENDPOINT = "https://hda.data.destination-earth.eu/stac/v2"
COLLECTION_ID = "EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"
Order and download data¶
The Climate Digital Twin collection, like all ECMWF datasets in the DestinE Data Lake, requires an ordering workflow. The ordering workflow consists of the following steps:
submit an order with parameters directly to the collection,
monitor the returned STAC item status,
download the asset when ready.
Note: ECMWF data follows this process; in this case the STAC item will always contain a single asset with the requested data.
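The monitoring step of this workflow can be sketched as a small polling helper. This is a minimal sketch, not the notebook's own implementation: `wait_until_online`, `fetch_item` and the injectable `sleep` argument are hypothetical names introduced here so that the logic can be exercised without network access, and the `"offline"` tier value in the example is an assumption used only to simulate a product that is not ready yet.

```python
import time
from typing import Callable, Optional


def wait_until_online(
    fetch_item: Callable[[], dict],
    timeout_s: float = 300,
    step_s: float = 1,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll fetch_item() until the STAC item is online.

    Returns the href of the "downloadLink" asset, or None if the item
    is still not online after timeout_s // step_s attempts.
    """
    attempts = int(timeout_s // step_s)
    for _ in range(attempts):
        item = fetch_item()
        # A missing "storage:tier" property is treated as "online"
        tier = item.get("properties", {}).get("storage:tier", "online")
        if tier == "online":
            return item["assets"]["downloadLink"]["href"]
        sleep(step_s)
    return None


# Simulated responses: not ready on the first poll, ready on the second.
# "offline" and the example URL are placeholders, not real HDA values.
_fake_items = iter([
    {"properties": {"storage:tier": "offline"}},
    {
        "properties": {"storage:tier": "online"},
        "assets": {"downloadLink": {"href": "https://example.invalid/data.grib"}},
    },
])
href = wait_until_online(lambda: next(_fake_items), timeout_s=5, step_s=1,
                         sleep=lambda seconds: None)
```

In the notebook, `fetch_item` would wrap the authenticated `requests.get` call on the item URL and return `response.json()`.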
Submit an Order¶
To submit an order we need to select our parameters of interest.
datechoice = "20280610"
filter_params = {
    "ecmwf:class": "d1",              # fixed
    "ecmwf:dataset": "climate-dt",    # fixed climate-dt access
    "ecmwf:activity": "ScenarioMIP",  # activity + experiment + model (go together)
    "ecmwf:experiment": "SSP3-7.0",   # activity + experiment + model (go together)
    "ecmwf:model": "IFS-NEMO",        # activity + experiment + model (go together)
    "ecmwf:generation": "1",          # fixed. Specifies the generation of the dataset, which can be incremented as required (latest is 1)
    "ecmwf:realization": "1",         # fixed. Specifies the climate realization. Default 1. Based on perturbations of initial conditions
    "ecmwf:resolution": "high",       # standard / high
    "ecmwf:expver": "0001",           # fixed experiment version
    "ecmwf:stream": "clte",           # fixed climate
    "ecmwf:time": "0000",             # choose the hourly slot(s)
    "ecmwf:type": "fc",               # fixed forecasted fields
    "ecmwf:levtype": "sfc",           # Surface fields (levtype=sfc), Height level fields (levtype=hl), Pressure level fields (levtype=pl), Model Level (levtype=ml)
    # "levelist": "1/2/3/...",        # for ml/pl/sol type data
    "ecmwf:param": "134",             # Surface Pressure parameter
    "ecmwf:date": datechoice + "/to/" + datechoice,
}
# Timeout and step for polling (seconds)
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"
response = requests.post(f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/order", json=filter_params, headers=auth_headers)
if response.status_code != 200:
    print(response.content)
response.raise_for_status()
ordered_item = response.json()
product_id = ordered_item["id"]
storage_tier = ordered_item["properties"].get("storage:tier", "online")
order_status = ordered_item["properties"].get("order:status", "unknown")
federation_backend = ordered_item["properties"].get("federation:backends", [None])[0]
print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}")
self_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}
Monitor the status¶
The status of an order can be:
shipping, meaning that the order is in progress;
succeeded, meaning that the order is completed;
failed, meaning that the order ended in error.
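Because a failed order will never become downloadable, it can be useful to stop polling as soon as that status appears. The helper below is a minimal sketch under that assumption; `order_state` is a hypothetical name, and the `"unknown"` fallback for a missing `order:status` property mirrors the default used elsewhere in this notebook.

```python
def order_state(item: dict) -> str:
    """Return the order status of a STAC item, raising if the order failed."""
    status = item.get("properties", {}).get("order:status", "unknown")
    if status == "failed":
        # Fail fast: there is no point in polling a failed order
        raise RuntimeError(f"Order for item {item.get('id')} failed")
    return status


# Example with hand-written items (placeholder IDs, not real orders)
in_progress = order_state({"id": "demo-1", "properties": {"order:status": "shipping"}})
done = order_state({"id": "demo-2", "properties": {"order:status": "succeeded"}})
```

Calling `order_state(item)` inside the polling loop, right after `item = response.json()`, would surface a failure immediately instead of waiting for the timeout.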
for i in range(0, TIMEOUT, STEP):
    print(f"Polling {i + 1}/{TIMEOUT // STEP}")
    response = requests.get(self_url, headers=auth_headers)
    if response.status_code != 200:
        print(response.content)
    response.raise_for_status()
    item = response.json()
    storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)
    if storage_tier == ONLINE_STATUS:
        download_url = item["assets"]["downloadLink"]["href"]
        print("Product is ready to be downloaded.")
        print(f"Asset URL: {download_url}")
        break
    sleep(STEP)
else:
    # The loop ended without a break: download_url was never set, so stop here
    order_status = item["properties"].get("order:status", "unknown")
    raise TimeoutError(f"We could not download the product after {TIMEOUT // STEP} tries. Current order status is {order_status}")
response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()
Download¶
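The Setup cell imports `HTTPAdapter` and `Retry`, the usual building blocks for retrying HTTP requests with `requests`. One possible way to wire them together is sketched below. The function name `make_retrying_session`, the retry count, the backoff factor and the list of status codes are illustrative assumptions, not values prescribed by the HDA service.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_retrying_session(total: int = 5, backoff_factor: float = 1.0) -> requests.Session:
    """Build a requests.Session that retries transient failures with backoff."""
    retry = Retry(
        total=total,                                  # maximum number of retries
        backoff_factor=backoff_factor,                # increasing pause between attempts
        status_forcelist=[429, 500, 502, 503, 504],   # assumed transient status codes
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


session = make_retrying_session(total=3)
```

With such a session, the download request could be issued as `session.get(download_url, stream=True, headers=auth_headers)` in place of `requests.get(...)`.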
content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))
if content_disposition:
    # Take the file name announced by the server
    filename = content_disposition.split('filename=')[1].strip('"')
    filename = unquote(filename)
else:
    # Fall back to the last path segment of the download URL
    filename = os.path.basename(download_url)
# Open a local file in binary write mode and write the content
print(f"downloading {filename}")
with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
    with open(filename, 'wb') as f:
        for data in response.iter_content(1024):
            progress_bar.update(len(data))
            f.write(data)
import earthkit.data
import earthkit.plots
import earthkit.regrid
data = earthkit.data.from_source("file", filename)
earthkit.plots.quickplot(data)