
Author: EUMETSAT
Copyright: 2024 EUMETSAT
Licence: MIT
DEDL - Hook Tutorial - Data Harvest (data-harvest)#
This notebook demonstrates how to use the Hook service.
The detailed API, with the definition of each endpoint and its parameters, is available in the OnDemand Processing API OData v1 OpenAPI documentation at: https://odp.data.destination-earth.eu/odata/docs
Further documentation is available at: https://destine-data-lake-docs.data.destination-earth.eu/en/latest/dedl-big-data-processing-services/Hook-service/Hook-service.html
Install python package requirements and import environment variables#
# Note: the destinelab python package (which helps with authentication) is already available if you are using the Python DEDL kernel
# Otherwise, the destinelab python package can be installed by uncommenting the following line
# %pip install destinelab
# python-dotenv is used for importing environment variables via the load_dotenv(...) command
%pip install python-dotenv
# boto3 is used in the example code navigating private S3-compatible storage (PRIVATE bucket storage)
%pip install boto3
import os
import json
import requests
from dotenv import load_dotenv
from getpass import getpass
import destinelab as destinelab
# Load (optional) notebook specific environment variables from .env_tutorial
load_dotenv("./.env_tutorial", override=True)
Authentication - Get token#
# By default, users should use their DESP credentials to get an access token
# This token is added as an Authorization header when interacting with the Hook Service API
# Enter DESP credentials.
DESP_USERNAME = input("Please input your DESP username or email: ")
DESP_PASSWORD = getpass("Please input your DESP password: ")
token = destinelab.AuthHandler(DESP_USERNAME, DESP_PASSWORD)
access_token = token.get_token()
# Check the status of the request
if access_token is not None:
print("DEDL/DESP Access Token Obtained Successfully")
# Save API headers
api_headers = {"Authorization": "Bearer " + access_token}
else:
print("Failed to Obtain DEDL/DESP Access Token")
Setup static variables#
# Hook service url (ending with odata/v1/ - e.g. https://odp.data.destination-earth.eu/odata/v1/)
hook_service_root_url = "https://odp.data.destination-earth.eu/odata/v1/"
List available workflows#
Next, we can check which workflows are available to us by querying the Workflows endpoint:
https://odp.data.destination-earth.eu/odata/v1/Workflows
# Send request and return a JSON object listing all provided workflows, ordered by Id
result = requests.get(
f"{hook_service_root_url}Workflows?$orderby=Id asc", headers=api_headers
).json()
print("List of available DEDL provided Hooks")
for i in range(len(result["value"])):
print(
f"Name:{str(result['value'][i]['Name']).ljust(20, ' ')}DisplayName:{str(result['value'][i]['DisplayName'])}"
) # print JSON string
# Print result JSON object: containing provided workflow list
workflow_details = json.dumps(result, indent=4)
print(workflow_details)
Select a workflow and see parameters#
If we want to see the details of a specific workflow, showing us the parameters that can be set for that workflow, we can add a filter to the query as follows:
https://odp.data.destination-earth.eu/odata/v1/Workflows?$expand=WorkflowOptions&$filter=(Name eq data-harvest)
$expand=WorkflowOptions shows all parameters accepted by the workflow; $filter=(Name eq 'data-harvest') narrows the result to the workflow called "data-harvest".
# Select workflow : defaults to data-harvest
workflow = os.getenv("HOOK_WORKFLOW", "data-harvest")
print(f"workflow: {workflow}")
# Send request
result = requests.get(
f"{hook_service_root_url}Workflows?$expand=WorkflowOptions&$filter=(Name eq '{workflow}')",
headers=api_headers,
).json()
workflow_details = json.dumps(result, indent=4)
print(workflow_details) # print formatted workflow_details, a JSON string
Order selected workflow#
The workflow selected above (e.g. workflow = "data-harvest") will now be configured and executed.
We make an order to harvest data using the Harmonised Data Access API: data from an input source is transferred to either a private bucket or a temporary storage bucket.
Name your order#
# Here we set the variable order_name, which will allow us to easily
# identify the running process (e.g. when checking the status)
# order_name is added as a suffix to the order 'Name'
order_name = os.getenv("HOOK_ORDER_NAME") or input("Name your order: ")
print(f"order_name:{order_name}")
Define output storage#
Among the workflow parameters, the storage where the result will be delivered has to be provided.
There are two possibilities:
Use your user storage
Use a temporary storage
1 - Use your user storage (provided by the DEDL ISLET service)#
Example using an S3 bucket created with the ISLET Storage service - the result will be available in this bucket
workflow parameter: {“Name”: “output_storage”, “Value”: “PRIVATE”}
# Output storage - Islet service
# Note: if you want the output to go to your own PRIVATE bucket rather than TEMPORARY storage (which expires after 2 weeks):
# i) this configuration must be updated with your output_bucket, output_storage_access_key, output_storage_secret_key and output_prefix
# ii) you must change the output_storage in the order to PRIVATE and add the necessary source_ parameters (see workflow options and the commented example)
# URL of the S3 endpoint in the Central Site (or lumi etc.)
output_storage_url = "https://s3.central.data.destination-earth.eu"
# output_storage_url = "https://s3.lumi.data.destination-earth.eu"
# Name of the object storage bucket where the results will be stored.
output_bucket = os.getenv("HOOK_OUTPUT_BUCKET", "your-bucket-name")
print(f"output_bucket : {output_bucket}")
# Islet object storage credentials (openstack ec2 credentials)
output_storage_access_key = os.getenv("HOOK_OUTPUT_STORAGE_ACCESS_KEY", "your-access-key")
output_storage_secret_key = os.getenv("HOOK_OUTPUT_STORAGE_SECRET_KEY", "your-secret-key")
print(f"output_storage_access_key: {output_storage_access_key}")
print(f"output_storage_secret_key: {output_storage_secret_key}")
# This is the name of the folder in your output_bucket where the output of the hook will be stored.
# Here we concatenate 'dedl' with the 'workflow' and 'order_name'
output_prefix = f"dedl-{workflow}-{order_name}"
print(f"output_prefix : {output_prefix}")
2 - Use temporary storage#
The result of the processing will be stored in shared storage and a download link will be provided in the output product details.
workflow parameter: {“Name”: “output_storage”, “Value”: “TEMPORARY”}
Define parameters and send order#
# URL of the STAC server where your collection/item can be downloaded
stac_hda_api_url = "https://hda.data.destination-earth.eu/stac"
# Note: The data (collection_id and data_id) will have been previously discovered and searched for
# Set collection where the item can be found : defaults to example for data-harvest
collection_id = os.getenv("HOOK_COLLECTION_ID", "EO.ESA.DAT.SENTINEL-2.MSI.L1C")
print(f"STAC collection url: {stac_hda_api_url}/collections/{collection_id}")
# Set the item to retrieve : defaults to the example for data-harvest. For multiple values, provide a comma-separated list
data_id = os.getenv("HOOK_DATA_ID", "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE")
print(f"data_id: {data_id}")
identifier_list = [data_id_element.strip() for data_id_element in data_id.split(',')]
# Get boolean value from String, default (False)
is_private_storage = os.getenv("HOOK_IS_PRIVATE_STORAGE", "False") == "True"
print(f"is_private_storage: {is_private_storage}")
# we use source_type to add DESP or EXTERNAL specific configuration
source_type = os.getenv("HOOK_SOURCE_TYPE", "DESP")
print(f"source_type: {source_type}")
if source_type == "EXTERNAL":
EXTERNAL_USERNAME = os.getenv("HOOK_EXTERNAL_USERNAME", "EXTERNAL_USERNAME")
EXTERNAL_PASSWORD = os.getenv("HOOK_EXTERNAL_PASSWORD", "EXTERNAL_PASSWORD")
EXTERNAL_TOKEN_URL = os.getenv("HOOK_EXTERNAL_TOKEN_URL", "EXTERNAL_TOKEN_URL")
EXTERNAL_CLIENT_ID = os.getenv("HOOK_EXTERNAL_CLIENT_ID", "EXTERNAL_CLIENT_ID")
########## BUILD ORDER BODY : CHOOSE PRIVATE or TEMPORARY output_storage ##########
# Initialise the order_body
order_body_custom_bucket = {
"Name": "Tutorial " + workflow + " - " + order_name,
"WorkflowName": workflow,
"IdentifierList": identifier_list,
"WorkflowOptions": [],
}
##### Configure PRIVATE OR TEMPORARY STORAGE #####
if is_private_storage:
print("##### Preparing Order Body for PRIVATE STORAGE #####")
order_body_custom_bucket["WorkflowOptions"].extend(
[
{"Name": "output_storage", "Value": "PRIVATE"},
{"Name": "output_s3_access_key", "Value": output_storage_access_key},
{"Name": "output_s3_secret_key", "Value": output_storage_secret_key},
{"Name": "output_s3_path", "Value": f"s3://{output_bucket}/{output_prefix}"},
{"Name": "output_s3_endpoint_url", "Value": output_storage_url}
]
)
else:
print("##### Preparing Order Body for TEMPORARY STORAGE #####")
order_body_custom_bucket["WorkflowOptions"].extend(
[
{"Name": "output_storage", "Value": "TEMPORARY"},
]
)
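The storage branch above can be captured as a small pure helper, which also makes the option names easy to test. This is a sketch for illustration; `build_storage_options` is a hypothetical function name, and the option names simply mirror those used in this notebook:

```python
def build_storage_options(private, access_key=None, secret_key=None,
                          bucket=None, prefix=None, endpoint_url=None):
    """Return the WorkflowOptions entries for the chosen output storage."""
    if not private:
        # TEMPORARY storage needs no credentials or bucket details
        return [{"Name": "output_storage", "Value": "TEMPORARY"}]
    return [
        {"Name": "output_storage", "Value": "PRIVATE"},
        {"Name": "output_s3_access_key", "Value": access_key},
        {"Name": "output_s3_secret_key", "Value": secret_key},
        {"Name": "output_s3_path", "Value": f"s3://{bucket}/{prefix}"},
        {"Name": "output_s3_endpoint_url", "Value": endpoint_url},
    ]

# Example: temporary storage is the simplest configuration
print(build_storage_options(private=False))
```

With such a helper, the `if is_private_storage:` branch above reduces to a single `extend` call.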
##### Configure SOURCE_TYPE and associated parameters #####
if source_type == "DESP":
# Using DESP credentials is the standard way of executing Hooks.
print("##### Preparing Order Body for access to DEDL HDA using DESP Credentials #####")
order_body_custom_bucket["WorkflowOptions"].extend(
[
{"Name": "source_type", "Value": "DESP"},
{"Name": "desp_source_username", "Value": DESP_USERNAME},
{"Name": "desp_source_password", "Value": DESP_PASSWORD},
{"Name": "desp_source_collection", "Value": collection_id}
]
)
elif source_type == "EXTERNAL":
# Build your order body : Example using EXTERNAL source type and source_catalogue_api_type STAC.
# This would allow you to access products directly from a configured STAC server
# Here we show an example configuration of a STAC server with OIDC security, that could be adapted to your needs (change urls, etc)
# This is shown for example purposes only. The standard way of configuring is with DESP source_type seen above.
print("##### Preparing Order Body for access to EXTERNAL STAC Server using EXTERNAL Credentials #####")
order_body_custom_bucket["WorkflowOptions"].extend(
[
{"Name": "source_type", "Value": "EXTERNAL"},
{"Name": "source_catalogue_api_url", "Value": stac_hda_api_url},
{"Name": "source_catalogue_api_type", "Value": "STAC"},
{"Name": "source_token_url", "Value": EXTERNAL_TOKEN_URL},
{"Name": "source_grant_type", "Value": "PASSWORD"},
{"Name": "source_auth_header_name", "Value": "Authorization"},
{"Name": "source_username", "Value": EXTERNAL_USERNAME},
{"Name": "source_password", "Value": EXTERNAL_PASSWORD},
{"Name": "source_client_id", "Value": EXTERNAL_CLIENT_ID},
{"Name": "source_client_secret", "Value": ""},
{"Name": "source_catalogue_collection", "Value": collection_id}
]
)
else:
print("source_type not equal to DESP or EXTERNAL")
########## ADDITIONAL OPTIONS ##########
additional_options = []
# Check environment variables of the form HOOK_ADDITIONAL1="NAME=xxx;VALUE=yyy;TYPE=str" (TYPE is 'str' or 'int')
for env_key, env_value in os.environ.items():
if env_key.startswith('HOOK_ADDITIONAL'):
#print(f"{env_key}: {env_value}")
parts = env_value.split(';')
# Extract the name and value
name = parts[0].split('=')[1]
value = parts[1].split('=')[1]
value_type = parts[2].split('=')[1]
additional_options.append({"Name": name, "Value": value if value_type == 'str' else int(value)})
print(f"additional_options: {additional_options}")
if additional_options:
print("Adding additional_options")
order_body_custom_bucket["WorkflowOptions"].extend(additional_options)
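The string parsing above can be isolated into a single testable function. A sketch, assuming the same `NAME=...;VALUE=...;TYPE=...` convention used by the loop (the function name `parse_additional_option` is hypothetical):

```python
def parse_additional_option(raw):
    """Parse a 'NAME=...;VALUE=...;TYPE=...' string into a WorkflowOptions entry.

    TYPE is expected to be 'str' or 'int', matching the loop above.
    """
    # Split "KEY=value" pairs separated by ';' into a dict
    parts = dict(p.split("=", 1) for p in raw.split(";"))
    value = parts["VALUE"] if parts["TYPE"] == "str" else int(parts["VALUE"])
    return {"Name": parts["NAME"], "Value": value}

print(parse_additional_option("NAME=priority;VALUE=5;TYPE=int"))
```

Splitting on the first `=` only (`split("=", 1)`) keeps values containing `=` intact.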
########## BUILD ORDER BODY : END ##########
# Send order (POST the order body as JSON; the json= parameter also sets the Content-Type header)
order_request = requests.post(
hook_service_root_url + "BatchOrder/OData.CSC.Order",
json=order_body_custom_bucket,
headers=api_headers,
).json()
# If code = 201, the order has been successfully sent
# Print order_request JSON object: containing order_request details
order_request_details = json.dumps(order_request, indent=4)
print(order_request_details)
order_id = order_request['value']['Id']
print(f"order 'Id' from order_request: {order_id}")
It is possible to order multiple products using the endpoint:
https://odp.data.destination-earth.eu/odata/v1/BatchOrder/OData.CSC.Order
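Ordering several products only requires listing several identifiers in IdentifierList. A sketch of such an order body (the second identifier is hypothetical, shown for illustration only):

```python
multi_order_body = {
    "Name": "Tutorial data-harvest - multi-item",
    "WorkflowName": "data-harvest",
    "IdentifierList": [
        "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE",
        "S2A_MSIL1C_EXAMPLE_SECOND_ITEM.SAFE",  # hypothetical second identifier
    ],
    "WorkflowOptions": [{"Name": "output_storage", "Value": "TEMPORARY"}],
}
print(len(multi_order_body["IdentifierList"]))
```

The body is otherwise identical to the single-item order built above; the same WorkflowOptions apply to every item in the batch.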
Check the status of the order#
Possible status values:
queued (i.e. queued for treatment but not started)
in_progress (i.e. order being treated)
completed (i.e. order is complete and data ready)
# ProductionOrders endpoint gives status of orders (only with one item attached)
# Otherwise use BatchOrder(XXXX)/Products endpoint to get status of individual items associated with order
if len(identifier_list) == 1:
order_status_url = f"{hook_service_root_url}ProductionOrders"
params = {"$filter": f"id eq {order_id}"}
order_status_response = requests.get(order_status_url, params=params, headers=api_headers).json()
print(json.dumps(order_status_response, indent=4))
# Get Status of all items of an order in this way
order_status_response = requests.get(
f"{hook_service_root_url}BatchOrder({order_id})/Products",
headers=api_headers,
).json()
print(json.dumps(order_status_response, indent=4))
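Rather than re-running the status cell by hand, the completion check can be expressed as a small predicate and polled in a loop. A sketch; `all_items_completed` is a hypothetical helper, and the commented loop assumes the `requests`, `api_headers`, `hook_service_root_url` and `order_id` variables defined above:

```python
import time

def all_items_completed(products_response):
    """True when every product in a BatchOrder(...)/Products response is 'completed'."""
    items = products_response.get("value", [])
    return bool(items) and all(item.get("Status") == "completed" for item in items)

# Hypothetical polling loop (uncomment to use in the notebook):
# while not all_items_completed(
#     requests.get(f"{hook_service_root_url}BatchOrder({order_id})/Products",
#                  headers=api_headers).json()
# ):
#     time.sleep(30)  # wait before checking the order again
```

An empty product list is treated as "not completed" so the loop keeps waiting until items appear.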
Access workflow output#
Private storage#
Let us now check our private storage using the boto3 script below. You can also inspect the bucket in the Islet service using the Horizon user interface.
# PRIVATE STORAGE: Prints contents of Private Bucket
import boto3
if is_private_storage:
s3 = boto3.client(
"s3",
aws_access_key_id=output_storage_access_key,
aws_secret_access_key=output_storage_secret_key,
endpoint_url=output_storage_url,
)
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + "/")
for page in pages:
try:
for obj in page["Contents"]:
print(obj["Key"])
except KeyError:
print("No files exist")
break
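Beyond listing, the same boto3 client can download the results. A sketch, assuming the `s3`, `output_bucket` and `output_prefix` names configured above; `local_path_for_key` is a hypothetical helper that strips the output prefix so files land in a clean relative path:

```python
import os

def local_path_for_key(key, prefix):
    """Map an object key to a local relative path by stripping the output prefix."""
    if key.startswith(prefix):
        return key[len(prefix):].lstrip("/")
    return key

# Hypothetical download of the listed objects (uncomment to use in the notebook):
# for page in paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + "/"):
#     for obj in page.get("Contents", []):
#         target = local_path_for_key(obj["Key"], output_prefix)
#         os.makedirs(os.path.dirname(target) or ".", exist_ok=True)
#         s3.download_file(output_bucket, obj["Key"], target)
```

Using `page.get("Contents", [])` avoids the KeyError handling needed in the listing cell above when a page is empty.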
Temporary storage#
# List order items within a production order
# When the output_storage is TEMPORARY we can get a DownloadLink from the following code
# (optionally download the items here in code by setting the flag is_download_products)
# If TEMPORARY storage
if not is_private_storage:
# Set to True to download products at the same level as the notebook file. File name will be in format "output-{workflow}-{order_id}-{product_id}.zip"
is_download_products = False
# Get Status of all items of an order in this way
product_status_response = requests.get(
f"{hook_service_root_url}BatchOrder({order_id})/Products",
headers=api_headers,
).json()
print(json.dumps(product_status_response, indent=4))
if is_download_products:
is_all_products_completed = True
# We only attempt to download products when every item has the status 'completed'.
for i in range(len(product_status_response["value"])):
product_id = product_status_response["value"][i]["Id"]
product_status = product_status_response["value"][i]["Status"]
if product_status != "completed":
is_all_products_completed = False
# Can download if all products completed
if is_all_products_completed:
for i in range(len(product_status_response["value"])):
product_id = product_status_response["value"][i]["Id"]
product_status = product_status_response["value"][i]["Status"]
# Infer the url of the product
url_product = f"{hook_service_root_url}BatchOrder({order_id})/Product({product_id})/$value"
print(f"url_product: {url_product}")
# Download the product
r = requests.get(
url_product, headers=api_headers, allow_redirects=True
)
product_file_name = f"output-{workflow}-{order_id}-{product_id}.zip"
open(product_file_name, "wb").write(r.content)
print(f"Download Complete: product_file_name: {product_file_name}")
else:
print(f"Status for order:{order_id} - At least one of the products does not have the status of 'completed'.")