Author: EUMETSAT
Copyright: 2024 EUMETSAT
Licence: MIT

DEDL - Hook Tutorial - Data Harvest (data-harvest)#

This notebook demonstrates how to use the Hook service.

Author: EUMETSAT

The detailed API and definition of each endpoint and parameters is available in the OnDemand Processing API OData v1 OpenAPI documentation found at: https://odp.data.destination-earth.eu/odata/docs

Further documentation is available at: https://destine-data-lake-docs.data.destination-earth.eu/en/latest/dedl-big-data-processing-services/Hook-service/Hook-service.html

Install python package requirements and import environment variables#

# Note: The destinelab python package (which helps with authentication) is available already if you are using Python DEDL kernel
# Otherwise, the destinelab python package can be installed by uncommenting the following line

# For importing environment variables using the load_dotenv(...) command
%pip install python-dotenv
# for example code navigating private S3 compatible storage (PRIVATE bucket storage)
%pip install boto3
import os
import json
import requests
from dotenv import load_dotenv
from getpass import getpass
import destinelab as destinelab

# Load (optional) notebook specific environment variables from .env_tutorial
load_dotenv("./.env_tutorial", override=True)
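The variable names below are the ones this notebook reads later on; a hypothetical `.env_tutorial` might look like this (all values are placeholders to adapt):

```shell
# .env_tutorial - optional overrides read by this notebook (placeholder values)
HOOK_WORKFLOW=data-harvest
HOOK_ORDER_NAME=my-first-order
HOOK_IS_PRIVATE_STORAGE=False
HOOK_COLLECTION_ID=EO.ESA.DAT.SENTINEL-2.MSI.L1C
HOOK_DATA_ID=S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE
```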

Authentication - Get token#

# By default users should use their DESP credentials to get an Access_token
# This token is added as an Authorisation Header when interacting with the Hook Service API

# Enter DESP credentials.
DESP_USERNAME = input("Please input your DESP username or email: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")
token = destinelab.AuthHandler(DESP_USERNAME, DESP_PASSWORD)

access_token = token.get_token()

# Check the status of the request
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
    # Save API headers
    api_headers = {"Authorization": "Bearer " + access_token}
else:
    print("Failed to Obtain DEDL/DESP Access Token")

Setup static variables#

# Hook service url (ending with odata/v1/ - e.g. https://odp.data.destination-earth.eu/odata/v1/)
hook_service_root_url = "https://odp.data.destination-earth.eu/odata/v1/"

List available workflows#

Next we can check which workflows are available to us by calling the endpoint
https://odp.data.destination-earth.eu/odata/v1/Workflows

# Send request and return a JSON object listing all provided workflows, ordered by Id
result = requests.get(
    f"{hook_service_root_url}Workflows?$orderby=Id asc", headers=api_headers   
).json()

print("List of available DEDL provided Hooks")
for workflow_entry in result["value"]:
    print(
        f"Name:{str(workflow_entry['Name']).ljust(20, ' ')}DisplayName:{workflow_entry['DisplayName']}"
    )  # print one formatted line per workflow
# Print result JSON object: containing provided workflow list
workflow_details = json.dumps(result, indent=4)
print(workflow_details)

Select a workflow and see parameters#

If we want to see the details of a specific workflow, showing us the parameters that can be set for that workflow, we can add a filter to the query as follows:

https://odp.data.destination-earth.eu/odata/v1/Workflows?$expand=WorkflowOptions&$filter=(Name eq data-harvest)

**$expand=WorkflowOptions** shows all parameters accepted by the workflow; **$filter=(Name eq 'data-harvest')** narrows the result to the workflow called “data-harvest”.
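Rather than hand-building the query string, the OData options can be passed to `requests.get` via its `params` argument, which URL-encodes them. A minimal sketch of the encoding involved (using the same option names as in this notebook):

```python
from urllib.parse import urlencode

# OData query options for the Workflows endpoint
query = {
    "$expand": "WorkflowOptions",
    "$filter": "(Name eq 'data-harvest')",
}

# requests.get(url, params=query) sends an equivalently encoded query string
encoded = urlencode(query)
print(encoded)
```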

# Select workflow : defaults to data-harvest
workflow = os.getenv("HOOK_WORKFLOW", "data-harvest")
print(f"workflow: {workflow}")

# Send request
result = requests.get(
    f"{hook_service_root_url}Workflows?$expand=WorkflowOptions&$filter=(Name eq '{workflow}')",
    headers=api_headers,
).json()
workflow_details = json.dumps(result, indent=4)
print(workflow_details)  # print formatted workflow_details, a JSON string

Order selected workflow#

The order selected above will now be configured and executed.

e.g. workflow = “data-harvest”.

  • Make an order to ‘harvest data’ using the Harmonised Data Access API.

  • i.e. data from an input source is transferred to a private bucket or to a temporary storage bucket.

Name your order#

# Here we set the variable order_name, which allows us to easily identify
# the running process (e.g. when checking the status).
# order_name is added as a suffix to the order 'Name'.
order_name = os.getenv("HOOK_ORDER_NAME") or input("Name your order: ")
print(f"order_name:{order_name}")
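Since order_name is later concatenated into an S3 prefix, it can help to normalise it first. A minimal sketch (the `sanitise_order_name` helper is ours, not part of the Hook API):

```python
import re

def sanitise_order_name(name: str) -> str:
    """Lowercase and replace anything outside [a-z0-9-] with '-'."""
    name = name.strip().lower()
    name = re.sub(r"[^a-z0-9-]+", "-", name)
    return name.strip("-")

print(sanitise_order_name("My First Order!"))  # my-first-order
```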

Define output storage#

In the workflow parameters, among other values, the storage where the result will be delivered has to be provided.
There are two possibilities:

  1. Use your user storage

  2. Use a temporary storage

1. - Your user storage (provided by DEDL ISLET service)#

Example using an S3 bucket created with the ISLET Storage service - the result will be available in this bucket

workflow parameter: {"Name": "output_storage", "Value": "PRIVATE"}

# Output storage - Islet service

# Note: If you want the output to go to your own PRIVATE bucket rather than TEMPORARY storage (expires after 2 weeks),
#       i) This Configuration will need to be updated with your output_bucket, output_storage_access_key, output_secret_key, output_prefix
#       ii) You will need to change the output_storage in the order to PRIVATE and add the necessary source_ parameters (see workflow options and commented example)

# URL of the S3 endpoint in the Central Site (or lumi etc.)
output_storage_url = "https://s3.central.data.destination-earth.eu"
# output_storage_url = "https://s3.lumi.data.destination-earth.eu"

# Name of the object storage bucket where the results will be stored.
output_bucket = os.getenv("HOOK_OUTPUT_BUCKET", "your-bucket-name")
print(f"output_bucket            : {output_bucket}")

# Islet object storage credentials (openstack ec2 credentials)
output_storage_access_key = os.getenv("HOOK_OUTPUT_STORAGE_ACCESS_KEY", "your-access-key")
output_storage_secret_key = os.getenv("HOOK_OUTPUT_STORAGE_SECRET_KEY", "your-secret-key")
print(f"output_storage_access_key: {output_storage_access_key}")
print(f"output_storage_secret_key: {output_storage_secret_key}")


# This is the name of the folder in your output_bucket where the output of the hook will be stored.
# Here we concatenate 'dedl' with the 'workflow' and 'order_name'
output_prefix = f"dedl-{workflow}-{order_name}"
print(f"output_prefix            : {output_prefix}")

2 - Use temporary storage#

The result of processing will be stored in shared storage and a download link provided in the output product details

workflow parameter: {"Name": "output_storage", "Value": "TEMPORARY"}
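Workflow options like the one above are {"Name", "Value"} pairs. A small helper (ours, hypothetical, not part of the Hook API) can build the list from a plain dict, which keeps the order body construction below compact:

```python
def to_workflow_options(options: dict) -> list:
    """Convert {name: value} into the [{"Name": ..., "Value": ...}] shape the API expects."""
    return [{"Name": name, "Value": value} for name, value in options.items()]

opts = to_workflow_options({"output_storage": "TEMPORARY"})
print(opts)  # [{'Name': 'output_storage', 'Value': 'TEMPORARY'}]
```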

Define parameters and send order#

# URL of the STAC server where your collection/item can be downloaded
stac_hda_api_url = "https://hda.data.destination-earth.eu/stac"

# Note: The data (collection_id and data_id) will have been previously discovered and searched for

# Set collection where the item can be found : defaults to example for data-harvest
collection_id = os.getenv("HOOK_COLLECTION_ID", "EO.ESA.DAT.SENTINEL-2.MSI.L1C")
print(f"STAC collection url: {stac_hda_api_url}/collections/{collection_id}")

# Set the Item to Retrieve : defaults to example for data-harvest. If Multiple Values, provide comma separated list
data_id = os.getenv("HOOK_DATA_ID", "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE")
print(f"data_id: {data_id}")
identifier_list = [data_id_element.strip() for data_id_element in data_id.split(',')]

# Get boolean value from String, default (False)
is_private_storage = os.getenv("HOOK_IS_PRIVATE_STORAGE", "False") == "True"
print(f"is_private_storage: {is_private_storage}")

# we use source_type to add DESP or EXTERNAL specific configuration
source_type = os.getenv("HOOK_SOURCE_TYPE", "DESP")
print(f"source_type: {source_type}")

if source_type == "EXTERNAL":
    EXTERNAL_USERNAME = os.getenv("HOOK_EXTERNAL_USERNAME", "EXTERNAL_USERNAME")
    EXTERNAL_PASSWORD = os.getenv("HOOK_EXTERNAL_PASSWORD", "EXTERNAL_PASSWORD")
    EXTERNAL_TOKEN_URL = os.getenv("HOOK_EXTERNAL_TOKEN_URL", "EXTERNAL_TOKEN_URL")
    EXTERNAL_CLIENT_ID = os.getenv("HOOK_EXTERNAL_CLIENT_ID", "EXTERNAL_CLIENT_ID")
########## BUILD ORDER BODY : CHOOSE PRIVATE or TEMPORARY output_storage ##########

# Initialise the order_body
order_body_custom_bucket = {
    "Name": "Tutorial " + workflow + " - " + order_name,
    "WorkflowName": workflow,
    "IdentifierList": identifier_list,
    "WorkflowOptions": [],
}


##### Configure PRIVATE OR TEMPORARY STORAGE #####
if is_private_storage:

    print("##### Preparing Order Body for PRIVATE STORAGE #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "output_storage", "Value": "PRIVATE"},
            {"Name": "output_s3_access_key", "Value": output_storage_access_key},
            {"Name": "output_s3_secret_key", "Value": output_storage_secret_key},
            {"Name": "output_s3_path", "Value": f"s3://{output_bucket}/{output_prefix}"},
            {"Name": "output_s3_endpoint_url", "Value": output_storage_url}
        ]
    )

else:

    print("##### Preparing Order Body for TEMPORARY STORAGE #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "output_storage", "Value": "TEMPORARY"},
        ]
    )

##### Configure SOURCE_TYPE and associated parameters #####
if source_type == "DESP":

    # Using DESP credentials is standard way of executing Hooks.
    print("##### Preparing Order Body for access to DEDL HDA using DESP Credentials #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "source_type", "Value": "DESP"},
            {"Name": "desp_source_username", "Value": DESP_USERNAME},
            {"Name": "desp_source_password", "Value": DESP_PASSWORD},
            {"Name": "desp_source_collection", "Value": collection_id}
        ]
    )

elif source_type == "EXTERNAL":

    # Build your order body : Example using EXTERNAL source type and source_catalogue_api_type STAC.
    # This would allow you to access products directly from a configured STAC server
    # Here we show an example configuration of a STAC server with OIDC security, that could be adapted to your needs (change urls, etc)
    # This is shown for example purposes only. The standard way of configuring is with DESP source_type seen above.
    print("##### Preparing Order Body for access to EXTERNAL STAC Server using EXTERNAL Credentials #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "source_type", "Value": "EXTERNAL"},
            {"Name": "source_catalogue_api_url", "Value": stac_hda_api_url},
            {"Name": "source_catalogue_api_type", "Value": "STAC"},
            {"Name": "source_token_url", "Value": EXTERNAL_TOKEN_URL},
            {"Name": "source_grant_type", "Value": "PASSWORD"},
            {"Name": "source_auth_header_name", "Value": "Authorization"},
            {"Name": "source_username", "Value": EXTERNAL_USERNAME},
            {"Name": "source_password", "Value": EXTERNAL_PASSWORD},
            {"Name": "source_client_id", "Value": EXTERNAL_CLIENT_ID},
            {"Name": "source_client_secret", "Value": ""},
            {"Name": "source_catalogue_collection", "Value": collection_id}
        ]
    )

else:

    print("source_type not equal to DESP or EXTERNAL")



########## ADDITIONAL OPTIONS ##########

additional_options = []

# Checks environment variables of the form HOOK_ADDITIONAL1="NAME=foo;VALUE=bar;TYPE=str"
for env_key, env_value in os.environ.items():
    if env_key.startswith('HOOK_ADDITIONAL'):
        parts = env_value.split(';')
        # Extract the name, value and type (TYPE is 'str' or 'int')
        name = parts[0].split('=', 1)[1]
        value = parts[1].split('=', 1)[1]
        value_type = parts[2].split('=', 1)[1]
        additional_options.append({"Name": name, "Value": value if value_type == 'str' else int(value)})

print(f"additional_options: {additional_options}")

if additional_options:
    print("Adding additional_options")
    order_body_custom_bucket["WorkflowOptions"].extend(additional_options)

########## BUILD ORDER BODY : END ##########
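The split-based parsing of HOOK_ADDITIONAL variables assumes a well-formed `NAME=...;VALUE=...;TYPE=...` string. A more defensive variant (a sketch, using the same format; the helper name is ours):

```python
def parse_additional_option(raw: str) -> dict:
    """Parse 'NAME=foo;VALUE=bar;TYPE=str' into a WorkflowOptions entry."""
    fields = dict(part.split("=", 1) for part in raw.split(";") if "=" in part)
    value = fields["VALUE"]
    if fields.get("TYPE", "str") == "int":
        value = int(value)
    return {"Name": fields["NAME"], "Value": value}

print(parse_additional_option("NAME=chunk_size;VALUE=100;TYPE=int"))
# {'Name': 'chunk_size', 'Value': 100}
```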

# Send order (the json= keyword serialises the body and sets the Content-Type header)
order_request = requests.post(
    hook_service_root_url + "BatchOrder/OData.CSC.Order",
    json=order_body_custom_bucket,
    headers=api_headers,
).json()

# If the response status code is 201, the order has been successfully sent

# Print order_request JSON object: containing order_request details
order_request_details = json.dumps(order_request, indent=4)
print(order_request_details)

order_id = order_request['value']['Id']
print(f"order 'Id' from order_request: {order_id}")

It is possible to order multiple products using the endpoint: https://odp.data.destination-earth.eu/odata/v1/BatchOrder/OData.CSC.Order

Check The status of the order#

Possible status values

  • queued (i.e. queued for processing but not started)

  • in_progress (i.e. order being processed)

  • completed (i.e. order is complete and the data is ready)
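A simple way to wait for completion is to poll the status until it leaves the queued/in_progress states. The sketch below takes the status-fetching function as a parameter so it can be exercised without the live API; the stub fetcher is hypothetical, and in the notebook it would call the BatchOrder endpoint:

```python
import time

def wait_for_order(fetch_status, poll_seconds=0, max_polls=10):
    """Poll fetch_status() until it returns a terminal status (or polls are exhausted)."""
    for _ in range(max_polls):
        status = fetch_status()
        if status not in ("queued", "in_progress"):
            return status
        time.sleep(poll_seconds)
    return status

# Stub fetcher that completes on the third call
calls = iter(["queued", "in_progress", "completed"])
print(wait_for_order(lambda: next(calls)))  # completed
```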

# The ProductionOrders endpoint gives the status of orders that have only one item attached
# Otherwise, use the BatchOrder(XXXX)/Products endpoint to get the status of the individual items associated with an order
if len(identifier_list) == 1:
    order_status_url = f"{hook_service_root_url}ProductionOrders"
    params = {"$filter": f"id eq {order_id}"}
    order_status_response = requests.get(order_status_url, params=params, headers=api_headers).json()
    print(json.dumps(order_status_response, indent=4))

# Get Status of all items of an order in this way
order_status_response = requests.get(
    f"{hook_service_root_url}BatchOrder({order_id})/Products",
    headers=api_headers,
).json()
print(json.dumps(order_status_response, indent=4))

Access workflow output#

Private storage#

Let us now check our private storage using this boto3 script. You can also inspect the bucket in the Islet service using the Horizon user interface.

# PRIVATE STORAGE: Prints contents of Private Bucket
import boto3

if is_private_storage:

    s3 = boto3.client(
        "s3",
        aws_access_key_id=output_storage_access_key,
        aws_secret_access_key=output_storage_secret_key,
        endpoint_url=output_storage_url,
    )

    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + "/")

    # 'Contents' is absent from a page when the prefix matches no objects
    found_objects = False
    for page in pages:
        for obj in page.get("Contents", []):
            found_objects = True
            print(obj["Key"])
    if not found_objects:
        print("No files exist")
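The `Contents` key is absent from an empty page, which is what the code above has to guard against. The listing logic can be isolated as a pure function over page dictionaries, testable without a live bucket (a sketch; the helper name and sample keys are ours):

```python
def keys_from_pages(pages):
    """Collect object keys from list_objects_v2 pages; pages without 'Contents' are skipped."""
    return [obj["Key"] for page in pages for obj in page.get("Contents", [])]

sample_pages = [
    {"Contents": [{"Key": "dedl-data-harvest-demo/item1.zip"}]},
    {},  # an empty page has no 'Contents' key
]
print(keys_from_pages(sample_pages))  # ['dedl-data-harvest-demo/item1.zip']
```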

Temporary storage#

# List order items within a production order
# When the output_storage is of type TEMPORARY, we can get a DownloadLink from the following code (optionally, the items can also be downloaded in code by setting the flag is_download_products)

# If TEMPORARY storage
if not is_private_storage:

    # Set to True to download products at the same level as the notebook file. File name will be in format "output-{workflow}-{order_id}-{product_id}.zip"
    is_download_products = False

    # Get Status of all items of an order in this way
    product_status_response = requests.get(
        f"{hook_service_root_url}BatchOrder({order_id})/Products",
        headers=api_headers,
    ).json()
    print(json.dumps(product_status_response, indent=4))

    if is_download_products:

        is_all_products_completed = True
        # We only attempt to download products when each of the items is in complete status.
        for i in range(len(product_status_response["value"])):

            product_id = product_status_response["value"][i]["Id"]
            product_status = product_status_response["value"][i]["Status"]

            if product_status != "completed":
                is_all_products_completed = False

        # Can download if all products completed
        if is_all_products_completed:

            for i in range(len(product_status_response["value"])):

                product_id = product_status_response["value"][i]["Id"]
                product_status = product_status_response["value"][i]["Status"]

                # Infer the url of the product
                url_product = f"{hook_service_root_url}BatchOrder({order_id})/Product({product_id})/$value"
                print(f"url_product: {url_product}")
                # Download the product
                r = requests.get(
                    url_product, headers=api_headers, allow_redirects=True
                )
                product_file_name = f"output-{workflow}-{order_id}-{product_id}.zip"
                with open(product_file_name, "wb") as f:
                    f.write(r.content)
                print(f"Download Complete: product_file_name: {product_file_name}")

        else:
            print(f"Status for order:{order_id} - At least one of the products does not have the status of 'completed'.")