
HDA Tutorial - Queryables

This notebook demonstrates how to use the queryables API to filter C3S and DestinE digital twin collections by leveraging variable terms that dynamically adjust based on user selections.


HDA Tutorial - How to use the queryables API

Contents

  • Objective: This notebook shows how to use the Queryables API to build your data request.

    The examples focus on datasets from C3S and the DestinE Digital Twins. These collections are characterized by extensive metadata, high dimensionality, and complex filtering capabilities. Leveraging the Queryables API facilitates systematic exploration of these datasets and supports reproducible and well‑defined data retrieval workflows.

  • Data Sources: The examples focus on the ECMWF datasets provided through HDA:

    • https://atmosphere.copernicus.eu/data
    • https://emergency.copernicus.eu/data
    • https://cds.climate.copernicus.eu/datasets
    • https://destine.ecmwf.int/climate-change-adaptation-digital-twin-climate-dt/
    • https://destine.ecmwf.int/weather-induced-extremes-digital-twin/

  • Methods: The Queryables API provides a list of variable terms that can be used to filter a selected collection. It dynamically exposes only the filters that are valid for the chosen dataset, and each selection further narrows the available options. This ensures that users always build queries that are compatible with the dataset structure.

    This notebook illustrates how to use the Queryables API to explore and filter data within a specific collection. It guides you through retrieving the available queryable variables and applying them to construct valid data requests.

    Throughout this notebook, you will learn:

    1. Authenticate: How to authenticate in order to search and access DEDL collections.

    2. Queryables: How to exploit the STAC API filter extension features. The “queryables” API helps users to determine the property names and types available for filtering data.

    3. Search data: How to search DEDL data using filters obtained by the “queryables” API.

    4. Download data: How to download DEDL data through HDA.

    The detailed HDA API, with the definition of each endpoint and its parameters, is available in the HDA Swagger UI: STAC API - Get Queryables For Collection

  • Prerequisites:

  • Expected Output:

    • 1 file containing the requested data
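The four steps listed above map onto a small set of HDA endpoints. The sketch below only builds the URL strings used later in this notebook; nothing is sent over the network, and the collection id is simply the CAMS example used further down.

```python
# Offline sketch of the endpoint layout behind the four steps above.
# No HTTP requests are made here; the collection id is the CAMS example
# used later in this notebook.
HDA_API_URL = "https://hda.data.destination-earth.eu"
STAC_API_URL = f"{HDA_API_URL}/stac/v2"
COLLECTION_ID = "EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS"

endpoints = {
    "2. queryables": f"{STAC_API_URL}/collections/{COLLECTION_ID}/queryables",
    "3. search": f"{STAC_API_URL}/search",
    "4. order/poll": f"{STAC_API_URL}/collections/{COLLECTION_ID}/items/<product_id>",
}
for step, url in endpoints.items():
    print(step, "->", url)
```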

Prerequisites

To run this tutorial, the appropriate access to the DestinE platform is needed.

Import

We start off by importing the relevant modules for DestinE authentication, HTTP requests, JSON handling, widgets, and some utilities.

import destinelab as deauth
import requests
import json
from getpass import getpass

import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
from ipywidgets import Layout, Box
from datetime import datetime

from urllib.parse import unquote
from tqdm import tqdm
from time import sleep
from IPython.display import JSON

Authentication

The destinelab package is used to perform the authentication.

Obtain Authentication Token

To perform a query on HDA we need to be authenticated.

DESP_USERNAME = input("Please input your DESP username or email: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")

auth = deauth.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = auth.get_token()
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
else:
    print("Failed to Obtain DEDL/DESP Access Token")

auth_headers = {"Authorization": f"Bearer {access_token}"}
Please input your DESP username or email:  eum-dedl-user
Please input your DESP password:  ········
Response code: 200
DEDL/DESP Access Token Obtained Successfully

Define some constants for the API URLs and utility functions

In this section, we define the relevant constants, holding the URL strings for the different endpoints.

# Core API
HDA_API_URL = "https://hda.data.destination-earth.eu"

# STAC API
## Core
STAC_API_URL = f"{HDA_API_URL}/stac/v2"

## Item Search
SEARCH_URL = f"{STAC_API_URL}/search"

## Collections
COLLECTIONS_URL = f"{STAC_API_URL}/collections"

## Queryables
QUERYABLES_URL = f"{STAC_API_URL}/queryables"
HDA_FILTERS = ""

## HTTP Success
HTTP_SUCCESS_CODE = 200
# parse STAC temporal interval into date objects
def parse_time_extent(col):
    itv = (((col.get("extent") or {}).get("temporal") or {}).get("interval") or [[]])
    start_iso, end_iso = (itv[0] + [None, None])[:2]
    to_date = lambda s: (datetime.fromisoformat(s.replace("Z","")).date() if s else None)
    sd, ed = to_date(start_iso), to_date(end_iso)
    s_txt = sd.isoformat() if sd else "open"
    e_txt = ed.isoformat() if ed else "open"
    return sd, ed, f"{s_txt} → {e_txt}"

def show_collection_info(change):
    out.clear_output()
    col = col_map[change["new"]]

    desc = col.get("dedl:short_description", "").strip()

    # parse and display time extent; update state for next cell
    sd, ed, extent_txt = parse_time_extent(col)
    selected["id"] = col["id"]
    selected["start_date"] = sd
    selected["end_date"] = ed

    with out:
        print("Collection Description:\n--------------")
        print(desc if desc else "(no description)")
        print("\nTime extent:\n--------------")
        print(extent_txt)
def fetch_constraints(selection):
    clean_filters = dict(selection or {})
    blocked = {"ecmwf:date"}
    for key in blocked:
        clean_filters.pop(key, None)
    response = requests.get(QUERYABLES_BY_COLLECTION_ID, params=clean_filters)
    response.raise_for_status()
    properties = response.json().get("properties", {})

    constraints = {}

    for field, info in properties.items():
        if not isinstance(info, dict):
            continue

        # Determine shape from JSON Schema type
        shape = "array" if info.get("type") == "array" else "scalar"
        
        # 1) enum directly on the property
        if isinstance(info.get("enum"), list):
            constraints[field] = {
                "type": "enum",
                "shape": shape,
                "values": info["enum"]
            }
            continue

        # 2) enum under items
        items = info.get("items")
        if isinstance(items, dict) and isinstance(items.get("enum"), list):
            constraints[field] = {
                "type": "enum",
                "shape": shape,
                "values": items["enum"]
            }
            continue

        # 3) const
        if (isinstance(items, dict) and isinstance(items.get("const"), str)) or isinstance(info.get("const"), str):
            constraints[field] = {
                "type": "const",
                "shape": shape,
                "value": (items or {}).get("const") or (info or {}).get("const")
            }

    return constraints
def rebuild_ui():
    global effective_selection

    with output:
        clear_output()

        constraints = fetch_constraints(current_selection)
        #print(json.dumps(constraints, indent=4))
        widgets_list = []
        effective_selection = {}  # reset every rebuild

        
        for field, meta in constraints.items():
            if meta["type"] == "enum":
                dropdown = widgets.Dropdown(
                    options=[""] + meta["values"],
                    value=current_selection.get(field, ""),
                    description=field,
                    layout=widgets.Layout(width="600px")
                )

                def handler(change, field=field):
                    if change["new"] == "":
                        current_selection.pop(field, None)
                    else:
                        current_selection[field] = change["new"]
                    rebuild_ui()

                dropdown.observe(handler, names="value")
                widgets_list.append(dropdown)

                # shape-aware enum injection
                if field in current_selection:
                    if meta["shape"] == "array":
                        effective_selection[field] = [current_selection[field]]
                    else:
                        effective_selection[field] = current_selection[field]

            elif meta["type"] == "const":
                # consts are ALWAYS part of the effective selection
                if meta["shape"] == "array":
                    effective_selection[field] = [meta["value"]]
                else:
                    effective_selection[field] = meta["value"]

                widgets_list.append(
                    widgets.Text(
                        value=str(meta["value"]),
                        description=field,
                        disabled=True,
                        layout=widgets.Layout(width="600px")
                    )
                )

        display(widgets.HTML("<b>Queryables-driven parameters</b>"))
        for w in widgets_list:
            display(w)

        display(widgets.HTML("<hr><b>Effective selection (typed, complete)</b>"))
        display(effective_selection)

Queryables

The “queryables” API helps users to determine the property names and types available for filtering data inside a specific collection.

Below is a dropdown menu to choose the collection whose filters we want to inspect.

URL = COLLECTIONS_URL
params = {
    "limit": 100,
    "q": 'EO.ECMWF'
}

resp = requests.get(URL, params=params, timeout=60)
resp.raise_for_status()
discovery_json = resp.json()

collections = discovery_json.get("collections", [])

# mapping: id → collection object
col_map = {c["id"]: c for c in collections}

# ---- state container you can reuse in following cells ----
selected = {"id": None, "start_date": None, "end_date": None}

# Combo box with collection IDs
dd = widgets.Dropdown(options=sorted(col_map.keys()), description="Collection:")
out = widgets.Output()
dd.observe(show_collection_info, names="value")

display(dd, out)
show_collection_info({"new": dd.value})   # show initial selection
COLLECTION_ID = selected['id']
print(COLLECTION_ID)
EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS

Get Queryables per Collection

The queryables endpoint for the selected collection returns the applicable filters under the ‘properties’ section.


QUERYABLES_BY_COLLECTION_ID = f"{COLLECTIONS_URL}/{COLLECTION_ID}/queryables"

print("Queryables endpoint for the selected collection:")
print(QUERYABLES_BY_COLLECTION_ID)
Queryables endpoint for the selected collection:
https://hda.data.destination-earth.eu/stac/v2/collections/EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS/queryables

The ‘properties’ section contains all the possible filters (queryables) for that collection. The filters specific to the collection have the ecmwf: prefix.

Each single filter section contains:

  • type, the kind of filter (string, array...),

  • enum, the possible filter values (conditioned by the values selected for the other filters)

  • default, the chosen value (if applied)

  • const, a fixed value if only one possibility is foreseen for the current filter.
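To make these shapes concrete, here is a small offline sketch that reduces a few hand-written property definitions to their allowed values. The sample schemas are illustrative only and are not taken from a live queryables response.

```python
# Illustrative (hand-written) property definitions mimicking the shapes above.
sample_properties = {
    "ecmwf:variable": {                    # enum under "items" (array-typed filter)
        "type": "array",
        "items": {"enum": ["carbon_dioxide", "methane"]},
    },
    "ecmwf:data_format": {                 # enum directly on the property
        "type": "string",
        "enum": ["grib", "netcdf"],
        "default": "grib",
    },
    "ecmwf:product_type": {                # const: only one value is possible
        "type": "array",
        "items": {"const": "monthly_mean"},
    },
}

def summarise(props):
    """Map each property name to (kind, list of allowed values)."""
    out = {}
    for name, info in props.items():
        items = info.get("items", {})
        if "enum" in info or "enum" in items:
            out[name] = ("enum", info.get("enum") or items["enum"])
        elif "const" in info or "const" in items:
            out[name] = ("const", [info.get("const") or items["const"]])
    return out

print(summarise(sample_properties))
```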

We can print the ‘properties’ section for the selected collection in the table below. The table shows the filters and the values applied by default when we search the chosen dataset without specifying any filter.


import pandas as pd
url = QUERYABLES_BY_COLLECTION_ID

# Keys inside each property definition that we want to ignore
EXCLUDED_KEYS = {"format", "pattern", "prefixItems"}

response = requests.get(url)
response.raise_for_status()
data = response.json()

properties = data.get("properties", {})

rows = []

for field_name, field_info in properties.items():
    row = {"field": field_name}

    if isinstance(field_info, dict):
        for key, value in field_info.items():
            # Skip unwanted metadata entirely
            if key in EXCLUDED_KEYS:
                continue
                
            if key == "items" and isinstance(value, dict):
                enum_values = value.get("enum")
                if isinstance(enum_values, list):
                    row["enum"] = "\n".join(map(str, enum_values))
                continue  # do not add raw 'items'
            if isinstance(value, list):
                value = ", ".join(map(str, value))

            # Replace None with empty string
            row[key] = value if value is not None else ""

    rows.append(row)

# Create DataFrame directly from clean rows
df = pd.DataFrame(rows)

# Ensure no NaN survive (in case of uneven keys across rows)
df = df.fillna("")

# Put 'field' first
df = df[["field"] + [c for c in df.columns if c != "field"]]

pd.set_option("display.max_columns", None)
display(df.style.set_properties(
    subset=["enum"],
    **{"white-space": "pre-line"}
))

When the queryables API is called with filters, i.e. passing as parameters the values already chosen for the selected dataset, it replies with the applicable filters conditioned by those values. If the user selects a certain value for one parameter, the choices for the other variables are narrowed down accordingly.

In this way, the queryables API helps users build a correct search request for the given dataset.
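As an offline illustration of how such a narrowed call is encoded, the sketch below builds the query string with the standard library; no request is sent, and the filter values are simply the ones used later in this notebook.

```python
from urllib.parse import urlencode

# Queryables URL for the CAMS collection used in this notebook
queryables_url = (
    "https://hda.data.destination-earth.eu/stac/v2/collections/"
    "EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS/queryables"
)

# Values already chosen by the user; the server narrows the remaining enums
narrowing_filters = {"ecmwf:variable": "carbon_dioxide", "ecmwf:year": "2020"}

full_url = f"{queryables_url}?{urlencode(narrowing_filters)}"
print(full_url)
```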

Below is an interactive example: once you select a value for one property, the choices for the other variables are narrowed down.

current_selection = {}
output = widgets.Output()
effective_selection = {}
rebuild_ui()
display(output)

Filtering a collection with the list returned by the queryables API

This section shows how to use the list of variable terms returned by the queryables API to filter a specific dataset.

If you choose a Digital Twin collection, check whether access is granted

If DT access is not granted, you will not be able to search and access DT data.

auth.is_DTaccess_allowed(access_token)
DT Output access allowed
True

Build the query from the selected values

The parameters chosen in the previous steps can be used to build the corresponding HDA queries.

# Build the base query as a Python dict
dictQuery = {
    "collections": [COLLECTION_ID],
    "query": {
        key: {"eq": value}
        for key, value in effective_selection.items()
    }
}

# Convert to JSON
queryJson = json.dumps(dictQuery, indent=4)

print(queryJson)
{
    "collections": [
        "EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS"
    ],
    "query": {
        "ecmwf:data_format": {
            "eq": "grib"
        },
        "ecmwf:month": {
            "eq": [
                "11"
            ]
        },
        "ecmwf:pressure_level": {
            "eq": [
                "700"
            ]
        },
        "ecmwf:product_type": {
            "eq": [
                "monthly_mean"
            ]
        },
        "ecmwf:variable": {
            "eq": [
                "carbon_dioxide"
            ]
        },
        "ecmwf:year": {
            "eq": [
                "2020"
            ]
        }
    }
}

response = requests.post(SEARCH_URL, headers=auth_headers, json=dictQuery)
if response.status_code != 200:
    print(response.text)
response.raise_for_status()
data = response.json()
product = data["features"][0]
JSON(product, expanded=False)

Download

Once we have found the product we can download it:

The single item returned (above) contains:

  • The product id (e.g. “DT_CLIMATE_ADAPTATION_ORDERABLE_...”), which is a placeholder; its name contains the term “ORDERABLE”.

  • The storage:tier, which indicates that the product is “offline”.

  • The order:status, which indicates that the product is “orderable”.

  • The request parameters used for the order, extracted from the search result.
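A minimal offline sketch of inspecting such an item before ordering is shown below. The sample item only mimics the fields described above; it is illustrative, not a real HDA response, and its URLs are placeholders.

```python
# Illustrative item mimicking the fields above; NOT a real HDA response.
sample_item = {
    "id": "DT_CLIMATE_ADAPTATION_ORDERABLE_example",
    "properties": {"storage:tier": "offline", "order:status": "orderable"},
    "links": [
        {"rel": "self", "href": "https://example.invalid/items/x"},
        {"rel": "retrieve",
         "href": "https://example.invalid/collections/x/order",
         "body": {"data_format": "grib"}},
    ],
}

def needs_order(item):
    """True when the product is offline and must be ordered first."""
    props = item.get("properties", {})
    return (props.get("storage:tier", "online") != "online"
            or props.get("order:status") == "orderable")

# The 'retrieve' link carries the order endpoint and the order body
retrieve = next(l for l in sample_item["links"] if l["rel"] == "retrieve")
print(needs_order(sample_item), retrieve["href"])
```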

link = next((l for l in product.get('links', []) if l.get("rel") == "retrieve"), None)

if link:
    href = link.get("href")
    body = link.get("body")   # optional: depends on extension
    print("order endpoint:", href)
    print("order body, same as native format:")
    print(json.dumps(body, indent=4))
else:
    print("No link with rel='retrieve' found")
order endpoint: https://hda.data.destination-earth.eu/stac/v2/collections/EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS/order
order body, same as native format:
{
    "data_format": "grib",
    "month": [
        "11"
    ],
    "pressure_level": [
        "700"
    ],
    "product_type": [
        "monthly_mean"
    ],
    "variable": [
        "carbon_dioxide"
    ],
    "year": [
        "2020"
    ]
}

Order data

We have now all the information to order the data.

From the search results we know that the product is orderable and offline; we therefore need to order it.

response = requests.post(href, json=body, headers=auth_headers)

if response.status_code != 200:
    print(response.content)
response.raise_for_status()

ordered_item = response.json()

product_id = ordered_item["id"]
storage_tier = ordered_item["properties"].get("storage:tier", "online")
order_status = ordered_item["properties"].get("order:status", "unknown")
federation_backend = ordered_item["properties"].get("federation:backends", [None])[0]

print(f"Product ordered: {product_id}")
print(f"Provider: {federation_backend}")
print(f"Storage tier: {storage_tier} (product must have storage tier \"online\" to be downloadable)")
print(f"Order status: {order_status}")    
Product ordered: 1a6ae79d-0163-4c3c-a233-ca8c260d25a1
Provider: cop_ads
Storage tier: offline (product must have storage tier "online" to be downloadable)
Order status: ordered

Poll the API until product is ready

We request the product itself to get an update of its status.

#timeout and step for polling (sec)
TIMEOUT = 300
STEP = 1
ONLINE_STATUS = "online"

self_url = f"{STAC_API_URL}/collections/{COLLECTION_ID}/items/{product_id}"
item = {}

for i in range(0, TIMEOUT, STEP):
    print(f"Polling {i + 1}/{TIMEOUT // STEP}")

    response = requests.get(self_url, headers=auth_headers)
    if response.status_code != 200:
        print(response.content)
    response.raise_for_status()
    item = response.json()

    storage_tier = item["properties"].get("storage:tier", ONLINE_STATUS)

    if storage_tier == ONLINE_STATUS:
        download_url = item["assets"]["downloadLink"]["href"]
        print("Product is ready to be downloaded.")
        print(f"Asset URL: {download_url}")
        break

    sleep(STEP)
else:
    order_status = item["properties"].get("order:status", "unknown")
    print(f"We could not download the product after {TIMEOUT // STEP} tries. Current order status is {order_status}")
    
Polling 1/300
Polling 2/300
Polling 3/300
Polling 4/300
Polling 5/300
Polling 6/300
Polling 7/300
Polling 8/300
Polling 9/300
Polling 10/300
Polling 11/300
Product is ready to be downloaded.
Asset URL: https://hda-download.leonardo.data.destination-earth.eu/data/cop_ads/EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS/1a6ae79d-0163-4c3c-a233-ca8c260d25a1/downloadLink

Download

import os

response = requests.get(download_url, stream=True, headers=auth_headers)
response.raise_for_status()

content_disposition = response.headers.get('Content-Disposition')
total_size = int(response.headers.get("content-length", 0))
if content_disposition:
    filename = content_disposition.split('filename=')[1].strip('"')
    filename = unquote(filename)
else:
    # Fall back to the last path segment of the download URL
    filename = os.path.basename(download_url)

# Open a local file in binary write mode and write the content
print(f"Downloading {filename}")

with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
    with open(filename, 'wb') as f:
        for data in response.iter_content(1024):
            progress_bar.update(len(data))
            f.write(data)
---------------------------------------------------------------------------
HTTPError                                 Traceback (most recent call last)
Cell In[19], line 2
      1 response = requests.get(download_url, stream=True, headers=auth_headers)
----> 2 response.raise_for_status()
      4 content_disposition = response.headers.get('Content-Disposition')
      5 total_size = int(response.headers.get("content-length", 0))

File /opt/conda/envs/python_dedl/lib/python3.11/site-packages/requests/models.py:1026, in Response.raise_for_status(self)
   1021     http_error_msg = (
   1022         f"{self.status_code} Server Error: {reason} for url: {self.url}"
   1023     )
   1025 if http_error_msg:
-> 1026     raise HTTPError(http_error_msg, response=self)

HTTPError: 502 Server Error: Bad Gateway for url: https://hda-download.leonardo.data.destination-earth.eu/data/cop_ads/EO.ECMWF.DAT.CAMS_GLOBAL_GREENHOUSE_GAS_REANALYSIS_MONTHLY_AV_FIELDS/1a6ae79d-0163-4c3c-a233-ca8c260d25a1/downloadLink