Skip to article frontmatterSkip to article content

HDA Extract Location Values - Tutorial

This notebook provides a hands-on introduction to working with data from the Destination Earth Climate Digital Twin (DT).

🚀 Launch in JupyterHub

Is it going to rain over the next 3 weekends?

This notebook provides a hands-on introduction to working with data from the Destination Earth Climate Digital Twin (DT). The goal is to give you a quick taste of how to access, explore, and handle DT data. It’s not meant to be a scientific study, but rather an accessible walkthrough that highlights some of the possibilities the data offers.

“A simple question with local impact: will it rain here soon?”

In this exercise we will:

  1. Select the right variable/scenario from the Climate DT in order to have precipitation forecasts.

  2. Order the Climate DT data

  3. Extract values for your place location.

  4. Aggregate rainfall totals over each weekend and report the answer in text and in a plot

Setup

Let’s import useful packages

import destinelab as deauth
import json
import importlib
import importlib.metadata as metadata
import requests
from requests.adapters import HTTPAdapter
import os
from getpass import getpass
from tqdm import tqdm
import time
from urllib.parse import unquote
from time import sleep
from IPython.display import JSON
import sys
from packaging import version
from IPython.display import display, HTML
from datetime import datetime, timedelta
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt

1 - Select the right variable/scenario from the Climate DT in order to have precipitation forecasts.

Use the Simulation Selection to select the desired scenario simulation for Climate DT data and Parameter Selection to select the total precipitation variable for the selected scenario. We then need to select our time range of interest starting from the next Friday.

The selection will give you the json payload to place the order via HDA.

# HDA order payload — Climate DT, high-resolution IFS-NEMO forecast ("fc")
# of total precipitation (GRIB param 228) at the surface, ScenarioMIP
# SSP3-7.0 experiment, dates 2025-10-02 through 2025-10-20, time 2300.
order_body = {
    "ecmwf:resolution": "high",
    "ecmwf:model": "IFS-NEMO",
    "ecmwf:type": "fc",
    "ecmwf:dataset": "climate-dt",
    "ecmwf:param": "228",
    "ecmwf:levtype": "sfc",
    "ecmwf:expver": "0001",
    "ecmwf:date": "20251002/to/20251020",
    "ecmwf:realization": "1",
    "ecmwf:activity": "ScenarioMIP",
    "ecmwf:class": "d1",
    "ecmwf:stream": "clte",
    "ecmwf:time": "2300",
    "ecmwf:experiment": "SSP3-7.0",
    "ecmwf:generation": "1",
}

To place the order we need to authenticate using our DESP account

# Prompt for DESP credentials (getpass hides the password while typing).
DESP_USERNAME = input("Please input your DESP username: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")

auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
else:
    print("Failed to Obtain DEDL/DESP Access Token")
    # Fail fast: continuing with a None token would send the literal header
    # "Authorization: Bearer None" and every later request would fail with
    # a confusing 401 instead of a clear error here.
    raise RuntimeError("DESP authentication failed: no access token obtained")

auth_headers = {"Authorization": f"Bearer {access_token}"}

Check if we can access DT

# Check whether the installed destinelab release exposes the DT-access check.
installed_version = importlib.metadata.version("destinelab")

# BUG FIX: the original called float() on the full version string, which
# raises ValueError for any two-dot release such as "0.8.1". Use the
# `packaging.version` module (already imported at the top of the file) for a
# robust comparison. The original conditions — (minor >= 8 and major < 1)
# or (major >= 1) — collapse to: installed version >= 0.8.
if version.parse(installed_version) >= version.parse("0.8"):
    auth.is_DTaccess_allowed(access_token)

2 - Order the Climate DT data

# HDA STAC endpoint and the Climate DT adaptation collection to order from.
HDA_STAC_ENDPOINT = "https://hda.data.destination-earth.eu/stac/v2"
COLLECTION_ID = "EO.ECMWF.DAT.DT_CLIMATE_ADAPTATION"

# Place the order; show the server's error payload before raising on non-2xx.
order_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/order"
response = requests.post(order_url, json=order_body, headers=auth_headers)
if response.status_code != 200:
    print(response.content)
response.raise_for_status()

ordered_item = response.json()

# Pull the fields we report on; defaults cover properties the backend omits.
product_id = ordered_item["id"]
item_props = ordered_item["properties"]
storage_tier = item_props.get("storage:tier", "online")
order_status = item_props.get("order:status", "unknown")
federation_backend = item_props.get("federation:backends", [None])[0]

print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}")
# Timeout and polling interval (seconds).
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"

self_url = f"{HDA_STAC_ENDPOINT}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}

# Poll the ordered item until its storage tier reports "online" (i.e. the
# product is staged and downloadable), or give up after TIMEOUT seconds.
for i in range(0, TIMEOUT, STEP):
    print(f"Polling {i + 1}/{TIMEOUT // STEP}")

    response = requests.get(self_url, headers=auth_headers)
    if response.status_code != 200:
        print(response.content)
    response.raise_for_status()
    item = response.json()

    storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)

    if storage_tier == ONLINE_STATUS:
        download_url = item["assets"]["downloadLink"]["href"]
        print("Product is ready to be downloaded.")
        print(f"Asset URL: {download_url}")
        break

    # BUG FIX: the original busy-waited on time.perf_counter() for ~1 s,
    # spinning a full CPU core between polls. time.sleep yields the CPU.
    time.sleep(STEP)
else:
    # for/else: only reached when the loop exhausted without break.
    order_status = item["properties"].get("order:status", "unknown")
    print(f"We could not download the product after {TIMEOUT // STEP} tries. Current order status is {order_status}")
# Stream the asset to a local file with a progress bar.
response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()

content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))  # 0 => size unknown
if content_disposition:
    # Prefer the server-supplied filename; it may be percent-encoded.
    filename = content_disposition.split('filename=')[1].strip('"')
    filename = unquote(filename)
else:
    # Fall back to the last path segment of the item URL.
    filename = os.path.basename(self_url)

# BUG FIX: the original printed the literal placeholder "(unknown)" in an
# f-string with no substitution — report the actual target filename.
print(f"downloading {filename}")

with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
    with open(filename, 'wb') as f:
        for data in response.iter_content(1024):
            progress_bar.update(len(data))
            f.write(data)

3 - Extract values for your place location

# Candidate locations as (lon, lat) pairs — switch the key below to change
# the place of interest. (The original assigned each pair in turn, so only
# the last one, Milan, actually took effect.)
PLACES = {
    "Riga": (24.11, 56.95),
    "London": (-0.128, 51.508),
    "Darmstadt": (8.65, 49.88),
    "Milan": (9.25, 45.46),
}

your_place_lon, your_place_lat = PLACES["Milan"]
# Open the downloaded GRIB file with earthkit and locate the grid point
# nearest to the chosen location.
# CLEANUP: the original imported earthkit.data twice (as ekd) and imported
# nearest_point_kdtree without using it.
from earthkit.data import from_source
from earthkit.geo import nearest_point_haversine

ds = from_source("file", filename)
ds.ls()  # notebook preview of the GRIB fields

# Per-grid-point coordinate arrays (Climate DT fields are on a HEALPix grid,
# so the data is a flat "values" dimension rather than a lat/lon matrix).
latlon = ds.to_latlon()
lat = latlon["lat"]
lon = latlon["lon"]

# Index of the grid point nearest to the reference location, plus its
# great-circle distance (units per earthkit-geo — presumably metres; verify).
p_ref = (your_place_lat, your_place_lon)
idx, dist = nearest_point_haversine(p_ref, (lat, lon))
idx, dist

# Convert to xarray and keep only that grid point's time series.
ds_xr = ds.to_xarray()
subset = ds_xr.isel(values=idx)
subset
# Start dates of the three weekends of interest, spaced one week apart.
first_weekend = pd.Timestamp("2025-10-02")
weekends = pd.DatetimeIndex(
    [first_weekend + pd.Timedelta(days=7 * k) for k in range(3)]
)

weekends
# Classify each weekend as "Rain"/"No Rain" from accumulated precipitation.
results = {}

for weekend_start in weekends:
    # A weekend window is the start day plus the following day.
    weekend_end = weekend_start + pd.Timedelta(days=1)

    window = subset.sel(forecast_reference_time=slice(weekend_start, weekend_end))
    accumulated = window['tp'].sum().item()

    # Threshold of 0.001 separates measurable rain from numerical noise.
    label = "Rain" if accumulated > 0.001 else "No Rain"
    results[str(weekend_start.date())] = label

display(results)
import matplotlib.pyplot as plt

# Precipitation time series at the selected point, scaled to millimetres.
tp = subset.tp.astype("float32") * 1000
your_place_precip = tp.to_pandas()

# Restrict the series to the window spanning the three weekends.
plot_start = weekends[0]
plot_end = weekends[-1] + pd.Timedelta(days=1)
your_place_precip = your_place_precip.loc[plot_start:plot_end]

fig, ax = plt.subplots(figsize=(10, 4))
your_place_precip.plot(ax=ax, lw=2, color="blue", marker="o", label="tot")

# Shade each weekend; label only the first span so the legend shows a single
# "Weekend" entry instead of three.
labelled = False
for span_start in weekends:
    span_end = span_start + pd.Timedelta(days=1)
    ax.axvspan(
        span_start,
        span_end,
        color="lightgray",
        alpha=0.3,
        label="Weekend" if not labelled else "",
    )
    labelled = True

ax.set_title("Precipitation Forecast for your_place (Next 3 Weekends)")
ax.set_ylabel("Precipitation (mm)")
ax.set_xlabel("Date")
ax.legend()
plt.show()
'''
#Climate data are GRIB data defined on a HEALPix nested grid.
#We can interpolate it in a regular latitude-longitude grid

from earthkit.regrid import interpolate
from earthkit.data import from_source

ds = from_source("file",filename)

# the target grid is a global 5x5 degree regular latitude-longitude grid
out_grid = {"grid": [5,5]}

# perform interpolation for each field and add results to a new fieldlist stored in memory
r = interpolate(ds, out_grid=out_grid, method="linear")

d=r.to_xarray()
'''