
Hook Tutorial - Data Harvest

This notebook demonstrates how to use the Hook service.


The detailed API definition of each endpoint and its parameters is available in the OnDemand Processing API OData v1 OpenAPI documentation at: https://odp.data.destination-earth.eu/odata/docs

Further documentation is available at: https://destine-data-lake-docs.data.destination-earth.eu/en/latest/dedl-big-data-processing-services/Hook-service/Hook-service.html

Install Python package requirements and import environment variables

# Note: The destinelab python package (which helps with authentication) is already available if you are using the Python DEDL kernel
# Otherwise, install the destinelab python package before running this cell

# For importing environment variables using the load_dotenv(...) command
%pip install python-dotenv
# For example code navigating private S3-compatible storage (PRIVATE bucket storage)
%pip install boto3
import os
import json
import requests
from dotenv import load_dotenv
from getpass import getpass
import destinelab as destinelab

# Load (optional) notebook specific environment variables from .env_tutorial
load_dotenv("./.env_tutorial", override=True)

Authentication - Get token

# By default, users should use their DESP credentials to get an access token
# This token is added as an Authorization header when interacting with the Hook Service API

# Enter DESP credentials.
DESP_USERNAME = input("Please input your DESP username or email: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")
token = destinelab.AuthHandler(DESP_USERNAME, DESP_PASSWORD)

access_token = token.get_token()

# Check the status of the request
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
    # Save API headers
    api_headers = {"Authorization": "Bearer " + access_token}
else:
    print("Failed to Obtain DEDL/DESP Access Token")

Setup static variables

# Hook service url (ending with odata/v1/ - e.g. https://odp.data.destination-earth.eu/odata/v1/)
hook_service_root_url = "https://odp.data.destination-earth.eu/odata/v1/"

List available workflows

Next, we can check which workflows are available to us by calling the Workflows endpoint:

https://odp.data.destination-earth.eu/odata/v1/Workflows

# Send request and return a JSON object listing all provided workflows, ordered by Id
result = requests.get(
    f"{hook_service_root_url}Workflows?$orderby=Id asc", headers=api_headers
).json()

print("List of available DEDL provided Hooks")
for provided_workflow in result["value"]:
    print(
        f"Name:{str(provided_workflow['Name']).ljust(20, ' ')}DisplayName:{provided_workflow['DisplayName']}"
    )
# Print result JSON object: containing the provided workflow list
workflow_details = json.dumps(result, indent=4)
print(workflow_details)

Select a workflow and see parameters

If we want to see the details of a specific workflow, showing us the parameters that can be set for that workflow, we can add a filter to the query as follows:

https://odp.data.destination-earth.eu/odata/v1/Workflows?$expand=WorkflowOptions&$filter=(Name eq data-harvest)

  • $expand=WorkflowOptions shows all parameters accepted by the workflow

  • $filter=(Name eq data-harvest) narrows the result to the workflow called “data-harvest”

# Select workflow : defaults to data-harvest
workflow = os.getenv("HOOK_WORKFLOW", "data-harvest")
print(f"workflow: {workflow}")

# Send request
result = requests.get(
    f"{hook_service_root_url}Workflows?$expand=WorkflowOptions&$filter=(Name eq '{workflow}')",
    headers=api_headers,
).json()
workflow_details = json.dumps(result, indent=4)
print(workflow_details)  # print formatted workflow_details, a JSON string

Order selected workflow

The order selected above will now be configured and executed.

e.g. workflow = “data-harvest”.

  • Make an order to ‘harvest data’ using Harmonised Data Access API.

  • i.e. data from an input source can be transferred to a Private bucket or a Temporary storage bucket.

Name your order

# Here we set the variable order_name, which will allow us to:
# - easily identify the running process (e.g. when checking the status)
# order_name is added as a suffix to the order 'Name'
order_name = os.getenv("HOOK_ORDER_NAME") or input("Name your order: ")
print(f"order_name:{order_name}")

Define output storage

In the workflow parameters, among other values, the storage where the result will be delivered has to be provided.
There are two possibilities:

  1. Use your user storage

  2. Use a temporary storage

1 - Use your user storage (provided by the DEDL ISLET service)

Example using an S3 bucket created with the ISLET Storage service - the result will be available in this bucket

workflow parameter: {“Name”: “output_storage”, “Value”: “PRIVATE”}

# Output storage - Islet service

# Note: If you want the output to go to your own PRIVATE bucket rather than TEMPORARY storage (expires after 2 weeks),
#       i) This Configuration will need to be updated with your output_bucket, output_storage_access_key, output_secret_key, output_prefix
#       ii) You will need to change the output_storage in the order to PRIVATE and add the necessary source_ parameters (see workflow options and commented example)

# URL of the S3 endpoint in the Central Site (or lumi etc.)
output_storage_url = "https://s3.central.data.destination-earth.eu"
# output_storage_url = "https://s3.lumi.data.destination-earth.eu"

# Name of the object storage bucket where the results will be stored.
output_bucket = os.getenv("HOOK_OUTPUT_BUCKET", "your-bucket-name")
print(f"output_bucket            : {output_bucket}")

# Islet object storage credentials (openstack ec2 credentials)
output_storage_access_key = os.getenv("HOOK_OUTPUT_STORAGE_ACCESS_KEY", "your-access-key")
output_storage_secret_key = os.getenv("HOOK_OUTPUT_STORAGE_SECRET_KEY", "your-secret-key")
print(f"output_storage_access_key: {output_storage_access_key}")
print(f"output_storage_secret_key: {output_storage_secret_key}")


# This is the name of the folder in your output_bucket where the output of the hook will be stored.
# Here we concatenate 'dedl' with the 'workflow' and 'order_name'
output_prefix = f"dedl-{workflow}-{order_name}"
print(f"output_prefix            : {output_prefix}")

2 - Use temporary storage

The result of the processing will be stored in shared storage and a download link will be provided in the output product details

workflow parameter: {“Name”: “output_storage”, “Value”: “TEMPORARY”}

Define parameters and send order

# URL of the STAC server where your collection/item can be downloaded
stac_hda_api_url = "https://hda.data.destination-earth.eu/stac"

# Note: The data (collection_id and data_id) will have been previously discovered and searched for

# Set collection where the item can be found : defaults to example for data-harvest
collection_id = os.getenv("HOOK_COLLECTION_ID", "EO.ESA.DAT.SENTINEL-2.MSI.L1C")
print(f"STAC collection url: {stac_hda_api_url}/collections/{collection_id}")

# Set the Item to Retrieve : defaults to example for data-harvest. If multiple values, provide a comma-separated list
data_id = os.getenv("HOOK_DATA_ID", "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE")
print(f"data_id: {data_id}")
identifier_list = [data_id_element.strip() for data_id_element in data_id.split(',')]

# Get boolean value from String, default (False)
is_private_storage = os.getenv("HOOK_IS_PRIVATE_STORAGE", "False") == "True"
print(f"is_private_storage: {is_private_storage}")

# we use source_type to add DESP or EXTERNAL specific configuration
source_type = os.getenv("HOOK_SOURCE_TYPE", "DESP")
print(f"source_type: {source_type}")

if source_type == "EXTERNAL":
    EXTERNAL_USERNAME = os.getenv("HOOK_EXTERNAL_USERNAME", "EXTERNAL_USERNAME")
    EXTERNAL_PASSWORD = os.getenv("HOOK_EXTERNAL_PASSWORD", "EXTERNAL_PASSWORD")
    EXTERNAL_TOKEN_URL = os.getenv("HOOK_EXTERNAL_TOKEN_URL", "EXTERNAL_TOKEN_URL")
    EXTERNAL_CLIENT_ID = os.getenv("HOOK_EXTERNAL_CLIENT_ID", "EXTERNAL_CLIENT_ID")
########## BUILD ORDER BODY : CHOOSE PRIVATE or TEMPORARY output_storage ##########

# Initialise the order_body
order_body_custom_bucket = {
    "Name": "Tutorial " + workflow + " - " + order_name,
    "WorkflowName": workflow,
    "IdentifierList": identifier_list,
    "WorkflowOptions": [],
}


##### Configure PRIVATE OR TEMPORARY STORAGE #####
if is_private_storage:

    print("##### Preparing Order Body for PRIVATE STORAGE #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "output_storage", "Value": "PRIVATE"},
            {"Name": "output_s3_access_key", "Value": output_storage_access_key},
            {"Name": "output_s3_secret_key", "Value": output_storage_secret_key},
            {"Name": "output_s3_path", "Value": f"s3://{output_bucket}/{output_prefix}"},
            {"Name": "output_s3_endpoint_url", "Value": output_storage_url}
        ]
    )

else:

    print("##### Preparing Order Body for TEMPORARY STORAGE #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "output_storage", "Value": "TEMPORARY"},
        ]
    )

##### Configure SOURCE_TYPE and associated parameters #####
if source_type == "DESP":

    # Using DESP credentials is standard way of executing Hooks.
    print("##### Preparing Order Body for access to DEDL HDA using DESP Credentials #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "source_type", "Value": "DESP"},
            {"Name": "desp_source_username", "Value": DESP_USERNAME},
            {"Name": "desp_source_password", "Value": DESP_PASSWORD},
            {"Name": "desp_source_collection", "Value": collection_id}
        ]
    )

elif source_type == "EXTERNAL":

    # Build your order body : Example using EXTERNAL source type and source_catalogue_api_type STAC.
    # This would allow you to access products directly from a configured STAC server
    # Here we show an example configuration of a STAC server with OIDC security, that could be adapted to your needs (change urls, etc)
    # This is shown for example purposes only. The standard way of configuring is with DESP source_type seen above.
    print("##### Preparing Order Body for access to EXTERNAL STAC Server using EXTERNAL Credentials #####")
    order_body_custom_bucket["WorkflowOptions"].extend(
        [
            {"Name": "source_type", "Value": "EXTERNAL"},
            {"Name": "source_catalogue_api_url", "Value": stac_hda_api_url},
            {"Name": "source_catalogue_api_type", "Value": "STAC"},
            {"Name": "source_token_url", "Value": EXTERNAL_TOKEN_URL},
            {"Name": "source_grant_type", "Value": "PASSWORD"},
            {"Name": "source_auth_header_name", "Value": "Authorization"},
            {"Name": "source_username", "Value": EXTERNAL_USERNAME},
            {"Name": "source_password", "Value": EXTERNAL_PASSWORD},
            {"Name": "source_client_id", "Value": EXTERNAL_CLIENT_ID},
            {"Name": "source_client_secret", "Value": ""},
            {"Name": "source_catalogue_collection", "Value": collection_id}
        ]
    )

else:

    print("source_type not equal to DESP or EXTERNAL")



########## ADDITIONAL OPTIONS ##########

additional_options = []

# Checks environment variables of the form HOOK_ADDITIONAL1="NAME=myoption;VALUE=12345;VALUE_TYPE=int"
for env_key, env_value in os.environ.items():
    if env_key.startswith('HOOK_ADDITIONAL'):
        #print(f"{env_key}: {env_value}")        
        parts = env_value.split(';')
        # Extract the name and value
        name = parts[0].split('=')[1]
        value = parts[1].split('=')[1]
        value_type = parts[2].split('=')[1]
        additional_options.append({"Name": name, "Value": value if value_type == 'str' else int(value)})

print(f"additional_options:{additional_options}")

if additional_options:
    print("Adding additional_options")
    order_body_custom_bucket["WorkflowOptions"].extend(additional_options)

########## BUILD ORDER BODY : END ##########

# Uncomment this to see the final order body
# print(json.dumps(order_body_custom_bucket, indent=4))


# Send order
order_request = requests.post(
    hook_service_root_url + "BatchOrder/OData.CSC.Order",
    json.dumps(order_body_custom_bucket),
    headers=api_headers,
).json()

# If the response status code is 201, the order has been successfully sent

# Print order_request JSON object: containing order_request details
order_request_details = json.dumps(order_request, indent=4)
print(order_request_details)

order_id = order_request['value']['Id']
print(f"order 'Id' from order_request: {order_id}")

It is possible to order multiple products using the endpoint: https://odp.data.destination-earth.eu/odata/v1/BatchOrder/OData.CSC.Order
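For illustration, a hypothetical order body for a two-item batch might look like the sketch below. The order name and the identifiers are placeholders, not real products; the WorkflowOptions would be built exactly as in the order cell above.

```python
# Sketch: order body submitting two products in a single batch order
# (the identifiers below are placeholders; use real item IDs from the HDA catalogue)
multi_order_body = {
    "Name": "Tutorial data-harvest - my-batch-order",
    "WorkflowName": "data-harvest",
    "IdentifierList": [
        "PRODUCT_IDENTIFIER_1",  # placeholder
        "PRODUCT_IDENTIFIER_2",  # placeholder
    ],
    "WorkflowOptions": [
        {"Name": "output_storage", "Value": "TEMPORARY"},
    ],
}

# Submit it with the same POST request used above:
# requests.post(hook_service_root_url + "BatchOrder/OData.CSC.Order",
#               json.dumps(multi_order_body), headers=api_headers)
```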

Check the status of the order

Possible status values

  • queued (i.e. queued for processing but not yet started)

  • in_progress (i.e. the order is being processed)

  • completed (i.e. the order is complete and the data is ready)


# ProductionOrders endpoint gives status of orders (only with one item attached)
# Otherwise use BatchOrder(XXXX)/Products endpoint to get status of individual items associated with order
if len(identifier_list) == 1:
    order_status_url = f"{hook_service_root_url}ProductionOrders"
    params = {"$filter": f"id eq {order_id}"}
    order_status_response = requests.get(order_status_url, params=params, headers=api_headers).json()
    print(json.dumps(order_status_response, indent=4))

# Get Status of all items of an order in this way
order_status_response = requests.get(
    f"{hook_service_root_url}BatchOrder({order_id})/Products",
    headers=api_headers,
).json()
print(json.dumps(order_status_response, indent=4))
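Since an order can sit in queued or in_progress for some time, it can be convenient to poll this endpoint until every item is completed. The sketch below is an illustrative pattern, not part of the Hook API: the helper names are our own, and the fetch function is injected as a callable so the status check stays easy to test.

```python
import time

def all_products_completed(products_response):
    """True when a BatchOrder(...)/Products response has items and all are 'completed'."""
    products = products_response.get("value", [])
    return bool(products) and all(p.get("Status") == "completed" for p in products)

def wait_for_order(fetch_products, poll_seconds=60, max_polls=30):
    """Poll fetch_products() until all items are completed, or raise TimeoutError."""
    for _ in range(max_polls):
        response = fetch_products()
        if all_products_completed(response):
            return response
        time.sleep(poll_seconds)
    raise TimeoutError("Order did not reach 'completed' status in time")

# Usage with the variables defined earlier in this notebook (uncomment to run):
# final_status = wait_for_order(lambda: requests.get(
#     f"{hook_service_root_url}BatchOrder({order_id})/Products",
#     headers=api_headers).json())
```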

Access workflow output

Private storage

Let us now check our private storage using the following boto3 script. You can also inspect the bucket in the Islet service using the Horizon user interface.

# PRIVATE STORAGE: Prints contents of Private Bucket
import boto3

if is_private_storage:

    s3 = boto3.client(
        "s3",
        aws_access_key_id=output_storage_access_key,
        aws_secret_access_key=output_storage_secret_key,
        endpoint_url=output_storage_url,
    )

    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + "/")

    found_files = False
    for page in pages:
        for obj in page.get("Contents", []):
            found_files = True
            print(obj["Key"])
    if not found_files:
        print("No files exist")
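Beyond listing keys, you may want to pull the results down locally. The following is a minimal sketch (the helper name and the "downloads" folder are our own choices) that maps each key under the order's prefix to a local path and uses boto3's standard download_file call; it assumes the s3 client, output_bucket and output_prefix defined above.

```python
import os

def key_to_local_path(key, prefix, dest_dir="downloads"):
    """Map an S3 key under `prefix` to a path inside `dest_dir`."""
    relative = key[len(prefix):].lstrip("/")
    return os.path.join(dest_dir, relative)

# Usage against your private bucket (uncomment to run):
# pages = s3.get_paginator("list_objects_v2").paginate(Bucket=output_bucket, Prefix=output_prefix + "/")
# for page in pages:
#     for obj in page.get("Contents", []):
#         local_path = key_to_local_path(obj["Key"], output_prefix)
#         os.makedirs(os.path.dirname(local_path), exist_ok=True)
#         s3.download_file(output_bucket, obj["Key"], local_path)
```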

Temporary storage

# List order items within a production order
# When the output_storage is of type TEMPORARY, we can get a DownloadLink from the following code
# (optionally, set the is_download_products flag below to also download the items in this code)

# If TEMPORARY storage
if not is_private_storage:

    # Set to True to download products at the same level as the notebook file. File name will be in format "output-{workflow}-{order_id}-{product_id}.zip"
    is_download_products = False

    # Get Status of all items of an order in this way
    product_status_response = requests.get(
        f"{hook_service_root_url}BatchOrder({order_id})/Products",
        headers=api_headers,
    ).json()
    print(json.dumps(product_status_response, indent=4))

    if is_download_products:

        is_all_products_completed = True
        # We only attempt to download products when every item has the status 'completed'.
        for i in range(len(product_status_response["value"])):

            product_id = product_status_response["value"][i]["Id"]
            product_status = product_status_response["value"][i]["Status"]

            if product_status != "completed":
                is_all_products_completed = False

        # Can download if all products completed
        if is_all_products_completed:

            for i in range(len(product_status_response["value"])):

                product_id = product_status_response["value"][i]["Id"]
                product_status = product_status_response["value"][i]["Status"]

                # Infer the url of the product
                url_product = f"{hook_service_root_url}BatchOrder({order_id})/Product({product_id})/$value"
                print(f"url_product: {url_product}")
                # Download the product
                r = requests.get(
                    url_product, headers=api_headers, allow_redirects=True
                )
                product_file_name = f"output-{workflow}-{order_id}-{product_id}.zip"
                with open(product_file_name, "wb") as product_file:
                    product_file.write(r.content)
                print(f"Download Complete: product_file_name: {product_file_name}")

        else:
            print(f"Status for order:{order_id} - At least one of the products does not have the status of 'completed'.")