Skip to article frontmatterSkip to article content

Hook - Perform data harvesting

This Notebook demonstrates how to perform data harvesting with Hook.

🚀 Launch in JupyterHub

The first step is to import the dependencies that allow the script to run

import json
from io import BytesIO
from urllib.parse import urlencode
import getpass
import pycurl
import requests
from IPython.display import JSON

The following classes implement the methods that retrieve the access token required to run the workflow

import requests
from lxml import html
from urllib.parse import parse_qs, urlparse

# Base URL of the IAM service; the realm's auth and token endpoints are built from it.
IAM_URL = "https://auth.destine.eu/"
# Client identifier sent as "client_id" in the DESP auth and token requests.
CLIENT_ID = "dedl-hook"
# IAM realm name inserted into the endpoint paths.
REALM = "desp"
# Root URL of the OData service; also sent as the "redirect_uri" of the DESP login.
SERVICE_URL = "https://odp.data.destination-earth.eu/odata/v1/"
# Appended to the name of the submitted order (presumably to tell runs apart).
TEST_RUN_ID = "004"


class DESPAuth:
    """Retrieve a DESP access token from a username/password login.

    The login follows the authorization-code flow of the IAM service: fetch
    the login form, submit the credentials, read the authorization code from
    the redirect, then exchange that code for an access token.
    """

    def __init__(self, username, password):
        """Store the DESP credentials used by ``get_token``."""
        self.username = username
        self.password = password

    def get_token(self):
        """Log in to the IAM service and return the access token string.

        Raises:
            Exception: if the login page contains no form, the credentials
                are rejected (the message is taken from the login page when
                one is present), the login redirect carries no
                authorization code, or the token request does not answer
                with HTTP 200.
        """
        # Strip the trailing slash of IAM_URL so the joined URLs do not
        # contain an empty path segment ("//realms/...").
        oidc_url = (
            IAM_URL.rstrip("/") + "/realms/" + REALM + "/protocol/openid-connect"
        )

        with requests.Session() as s:

            # Get the auth url: it is the action of the first form on the
            # login page.
            login_page = s.get(
                url=oidc_url + "/auth",
                params={
                    "client_id": CLIENT_ID,
                    "redirect_uri": SERVICE_URL,
                    "scope": "openid",
                    "response_type": "code",
                },
            )
            forms = html.fromstring(login_page.content.decode()).forms
            if not forms:
                raise Exception("Login form not found")
            auth_url = forms[0].action

            # Login and get auth code. Redirects are not followed so that the
            # Location header of the 302 answer can be inspected.
            login = s.post(
                auth_url,
                data={
                    "username": self.username,
                    "password": self.password,
                },
                allow_redirects=False,
            )

            # We expect a 302, a 200 means we got sent back to the login page
            # and there's probably an error message
            if login.status_code == 200:
                tree = html.fromstring(login.content)
                error_message_element = tree.xpath('//span[@id="input-error"]/text()')
                error_message = (
                    error_message_element[0].strip()
                    if error_message_element
                    else 'Error message not found'
                )
                raise Exception(error_message)

            if login.status_code != 302:
                raise Exception("Login failed")

            # The authorization code is a query parameter of the redirect
            # target; fail explicitly when it is missing.
            redirect_query = parse_qs(
                urlparse(login.headers.get("Location", "")).query
            )
            if "code" not in redirect_query:
                raise Exception("Authorization code not found in login redirect")
            auth_code = redirect_query["code"][0]

            # Use the auth code to get the token
            response = requests.post(
                oidc_url + "/token",
                data={
                    "client_id": CLIENT_ID,
                    "redirect_uri": SERVICE_URL,
                    "code": auth_code,
                    "grant_type": "authorization_code",
                    "scope": "",
                },
            )

            if response.status_code != 200:
                raise Exception("Failed to get token")

            return response.json()['access_token']

class DEDLAuth:
    """Exchange a DESP access token for a DEDL access token."""

    def __init__(self, desp_access_token):
        """Store the DESP access token that ``get_token`` will exchange."""
        self.desp_access_token = desp_access_token

    def get_token(self):
        """Request a DEDL access token using the token-exchange grant.

        The HTTP status code of the answer is always printed.

        Returns:
            The DEDL access token string on HTTP 200. On any other status
            the error body is printed and ``None`` is returned.
        """
        DEDL_TOKEN_URL='https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token'
        DEDL_CLIENT_ID='hda-public'
        AUDIENCE='hda-public'

        data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "subject_token": self.desp_access_token,
            "subject_issuer": "desp-oidc",
            "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "client_id": DEDL_CLIENT_ID,
            "audience": AUDIENCE
        }

        response = requests.post(DEDL_TOKEN_URL, data=data)

        print("Response code:", response.status_code)

        if response.status_code == 200:
            return response.json()["access_token"]

        # An error answer is not guaranteed to be JSON; fall back to the raw
        # text so that a decoding error does not hide the actual failure.
        try:
            error_body = response.json()
        except ValueError:
            error_body = response.text
        print(error_body)
        print("Error obtaining DEDL access token")
        return None
            
class AuthHandler:
    """Run the two authentication steps: DESP login, then DEDL token exchange."""

    def __init__(self, username, password):
        """Keep the DESP credentials; both tokens stay ``None`` until fetched."""
        self.username = username
        self.password = password
        self.desp_access_token = None
        self.dedl_access_token = None

    def get_token(self):
        """Fetch both tokens, store them on the instance, return the DEDL one."""
        # Step 1: log in to DESP with the stored credentials.
        self.desp_access_token = DESPAuth(self.username, self.password).get_token()

        # Step 2: exchange the DESP token for a DEDL token.
        self.dedl_access_token = DEDLAuth(self.desp_access_token).get_token()

        return self.dedl_access_token

Users must provide their credentials to the DESP platform to retrieve an access token

# Ask for the DESP credentials interactively; getpass keeps the password off the screen.
print('Provide credentials for the DESP')
desp_username = input('DESP Username: ')
desp_password = getpass.getpass('DESP Password: ')

# Run the authentication steps and build the header sent with every API request below.
token = AuthHandler(desp_username, desp_password)
access_token = token.get_token()
api_headers = {'Authorization': 'Bearer ' + access_token}

We can review the accepted parameters for the workflow

# Query the service for the workflow named 'data-harvest', expanded with its WorkflowOptions.
workflow_options_url = (
    SERVICE_URL
    + "Workflows?$filter=Name+eq+'data-harvest'&$expand=WorkflowOptions"
)
result = requests.get(workflow_options_url, headers=api_headers).json()

# Pretty-printed copy of the answer, kept as a string.
available_workflows = json.dumps(result, indent=2)

# Display the parsed answer as the cell output.
JSON(result)

Prepare Workflow Options for the data-harvest workflow

# Workflow to run and the product identifier(s) it is run on.
workflow = "data-harvest"
identifier_list = ["S2A_MSIL2A_20240501T095031_N0510_R079_T33UXT_20240501T135852.SAFE"]

# Order body: each workflow option is sent as a {"Name": ..., "Value": ...} record,
# built here from (name, value) pairs in the order they are listed.
order_body_custom_bucket = {
    "Name": "DEDL - Hook introduction support " + workflow + " - " + TEST_RUN_ID,
    "WorkflowName": workflow,
    "IdentifierList": identifier_list,
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in (
            ("output_storage", "TEMPORARY"),
            ("source_type", "DESP"),
            ("desp_source_collection", "EO.ESA.DAT.SENTINEL-2.MSI.L2A"),
            ("desp_source_username", desp_username),
            ("desp_source_password", desp_password),
        )
    ],
}

Make a request to run the workflow with the parameters just set

# Submit the order: POST the JSON-encoded order body to the BatchOrder endpoint.
request = requests.post(
    SERVICE_URL + "BatchOrder/OData.CSC.Order",
    json.dumps(order_body_custom_bucket),
    headers=api_headers
)
print(request.status_code)

# Stop with the server's own answer when the order was rejected, instead of
# failing later with a KeyError or a JSON decoding error.
if not request.ok:
    raise RuntimeError(
        "Order submission failed with status "
        + str(request.status_code) + ": " + request.text
    )

resp = request.json()
# Identifier of the created order, used by the follow-up requests.
order_id = resp['value']['Id']
JSON(resp, indent=2)

Review information about the products processed in the order

# Fetch the product entries that belong to the submitted order.
batch_order_items = requests.get(
    SERVICE_URL + 'BatchOrder(' + str(order_id) + ')/Products',
    headers=api_headers,
).json()

# Display the parsed answer as the cell output.
JSON(batch_order_items, indent=2)

Review processed product status

# 'Status' field of the first product entry returned for the order.
batch_order_items['value'][0]['Status']

Review processed product DownloadLink

# 'DownloadLink' field of the first product entry returned for the order.
batch_order_items['value'][0]['DownloadLink']