The first step is to import the dependencies that allow the script to run
import json
from io import BytesIO
from urllib.parse import urlencode
import getpass
import pycurl
import requests
from IPython.display import JSON
The following classes implement the methods that retrieve the token required to run the workflow
import requests
from lxml import html
from urllib.parse import parse_qs, urlparse
# Base URL of the identity provider; the realm's OpenID Connect endpoints are built from it.
IAM_URL = "https://auth.destine.eu/"
# Client identifier sent as "client_id" in the authorization and token requests.
CLIENT_ID = "dedl-hook"
# Realm name inserted into the OpenID Connect endpoint paths.
REALM = "desp"
# Base URL of the OData service; also passed as the "redirect_uri" of the login flow.
SERVICE_URL = "https://odp.data.destination-earth.eu/odata/v1/"
# Suffix appended to the order name to tell test runs apart.
TEST_RUN_ID = "004"
class DESPAuth:
    """Obtain a DESP access token through the authorization-code login flow.

    The flow loads the login page, posts the credentials to the login form,
    reads the authorization code from the resulting redirect and finally
    trades that code for an access token.
    """

    def __init__(self, username, password):
        """Store the DESP credentials used by ``get_token``."""
        self.username = username
        self.password = password

    @staticmethod
    def _extract_auth_code(location):
        """Return the ``code`` query parameter of the redirect URL *location*.

        Raises:
            Exception: if the URL carries no ``code`` parameter (for example
                when the redirect reports an error instead).
        """
        codes = parse_qs(urlparse(location).query).get("code")
        if not codes:
            raise Exception("Authorization code not found in login redirect")
        return codes[0]

    def get_token(self):
        """Log in with the stored credentials and return the access token.

        Returns:
            The ``access_token`` value of the token endpoint response.

        Raises:
            Exception: if the login form is missing, the login is rejected,
                the redirect carries no authorization code, or the token
                request does not answer with HTTP 200.
        """
        # IAM_URL may end with "/"; strip it so the endpoint URLs do not contain "//".
        oidc_base = IAM_URL.rstrip("/") + "/realms/" + REALM + "/protocol/openid-connect"

        with requests.Session() as s:
            # Get the auth url: it is the action target of the first form on the login page
            login_page = s.get(
                url=oidc_base + "/auth",
                params={
                    "client_id": CLIENT_ID,
                    "redirect_uri": SERVICE_URL,
                    "scope": "openid",
                    "response_type": "code",
                },
            )
            forms = html.fromstring(login_page.content.decode()).forms
            if not forms:
                raise Exception("Login form not found on the authentication page")
            auth_url = forms[0].action

            # Login and get auth code
            login = s.post(
                auth_url,
                data={
                    "username": self.username,
                    "password": self.password,
                },
                allow_redirects=False,
            )

            # We expect a 302, a 200 means we got sent back to the login page and there's probably an error message
            if login.status_code == 200:
                tree = html.fromstring(login.content)
                error_message_element = tree.xpath('//span[@id="input-error"]/text()')
                error_message = error_message_element[0].strip() if error_message_element else 'Error message not found'
                raise Exception(error_message)

            if login.status_code != 302:
                raise Exception("Login failed")

            # A missing Location header is reported the same way as a missing code.
            auth_code = self._extract_auth_code(login.headers.get("Location", ""))

            # Use the auth code to get the token
            response = requests.post(
                oidc_base + "/token",
                data={
                    "client_id": CLIENT_ID,
                    "redirect_uri": SERVICE_URL,
                    "code": auth_code,
                    "grant_type": "authorization_code",
                    "scope": "",
                },
            )

            if response.status_code != 200:
                raise Exception("Failed to get token")

            return response.json()['access_token']
class DEDLAuth:
    """Exchange a DESP access token for a DEDL access token."""

    def __init__(self, desp_access_token):
        """Store the DESP access token that will be exchanged."""
        self.desp_access_token = desp_access_token

    def _token_exchange_payload(self):
        """Build the form fields of the token-exchange request."""
        DEDL_CLIENT_ID = 'hda-public'
        AUDIENCE = 'hda-public'
        return {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "subject_token": self.desp_access_token,
            "subject_issuer": "desp-oidc",
            "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "client_id": DEDL_CLIENT_ID,
            "audience": AUDIENCE,
        }

    def get_token(self):
        """Request the DEDL access token.

        Prints the HTTP status code of the exchange. On failure the error
        details are printed as well.

        Returns:
            The ``access_token`` of the response on HTTP 200, otherwise ``None``.
        """
        DEDL_TOKEN_URL = 'https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token'

        response = requests.post(DEDL_TOKEN_URL, data=self._token_exchange_payload())

        print("Response code:", response.status_code)

        if response.status_code == 200:
            return response.json()["access_token"]

        # The error body is not guaranteed to be JSON (e.g. an HTML error
        # page); fall back to the raw text instead of failing while reporting.
        try:
            error_details = response.json()
        except ValueError:
            error_details = response.text
        print(error_details)
        print("Error obtaining DEDL access token")
        return None
class AuthHandler:
    """Run the DESP login and the DEDL token exchange behind a single call."""

    def __init__(self, username, password):
        """Keep the credentials; both tokens stay ``None`` until ``get_token`` runs."""
        self.username, self.password = username, password
        self.desp_access_token = None
        self.dedl_access_token = None

    def get_token(self):
        """Fetch the DESP token, exchange it for a DEDL token and return the latter.

        Both tokens are also stored on the instance.
        """
        # Step 1: DESP auth token from the user credentials
        self.desp_access_token = DESPAuth(self.username, self.password).get_token()

        # Step 2: DEDL auth token from the DESP token
        self.dedl_access_token = DEDLAuth(self.desp_access_token).get_token()

        return self.dedl_access_token
Users must provide their credentials to the DESP platform to retrieve an access token
# Ask for the DESP credentials; the password is read with getpass rather than input().
print('Provide credentials for the DESP')
desp_username = input('DESP Username: ')
desp_password = getpass.getpass(prompt='DESP Password: ', stream=None)

# Log in and retrieve the access token used for the API calls.
token = AuthHandler(desp_username, desp_password)
access_token = token.get_token()

# get_token() yields None when the DEDL token exchange fails; stop with a
# clear message instead of a TypeError from the string concatenation below.
if access_token is None:
    raise RuntimeError(
        "Could not obtain a DEDL access token; see the messages printed above"
    )

api_headers = {'Authorization': 'Bearer ' + access_token}
We can review the accepted parameters for the workflow
# Query the service for the "data-harvest" workflow, expanded with its options.
workflow_options_url = "".join([
    SERVICE_URL,
    "Workflows?$filter=Name+eq+'data-harvest'&$expand=WorkflowOptions",
])
result = requests.get(
    workflow_options_url,
    headers=api_headers,
).json()

# Pretty-printed text form of the same answer.
available_workflows = json.dumps(result, indent=2)

JSON(result)
Prepare the workflow options for the data-harvest workflow
# Workflow to run and the product identifiers it should process.
workflow = "data-harvest"
identifier_list = [
    "S2A_MSIL2A_20240501T095031_N0510_R079_T33UXT_20240501T135852.SAFE",
]

# Request body of the order: each workflow option is a {"Name", "Value"} pair.
order_body_custom_bucket = {
    "Name": " - ".join(["DEDL - Hook introduction support " + workflow, TEST_RUN_ID]),
    "WorkflowName": workflow,
    "IdentifierList": identifier_list,
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in (
            ("output_storage", "TEMPORARY"),
            ("source_type", "DESP"),
            ("desp_source_collection", "EO.ESA.DAT.SENTINEL-2.MSI.L2A"),
            ("desp_source_username", desp_username),
            ("desp_source_password", desp_password),
        )
    ],
}
Make a request to run the workflow with the parameters just set
# Submit the order; the body is sent as a JSON-encoded string.
request = requests.post(
    SERVICE_URL + "BatchOrder/OData.CSC.Order",
    json.dumps(order_body_custom_bucket),
    headers=api_headers
)

# Print the status before parsing so it is visible even if the body is not JSON.
print(request.status_code)
resp = request.json()

# A rejected order has no value/Id entry; report the server answer instead of
# failing with a bare KeyError.
try:
    order_id = resp['value']['Id']
except (KeyError, TypeError) as err:
    raise RuntimeError(
        "Order was not created (HTTP " + str(request.status_code) + "): " + request.text
    ) from err

JSON(resp, indent=2)
Review information about the products processed in the order
# Fetch the products that belong to the order created above.
batch_order_items = requests.get(
    ''.join([SERVICE_URL, 'BatchOrder(', str(order_id), ')/Products']),
    headers=api_headers,
).json()

JSON(batch_order_items, indent=2)
Review processed product status
# Status of the first product in the order's product list.
# Bare expression: presumably shown as notebook cell output; it has no effect in a plain script.
batch_order_items['value'][0]['Status']
Review processed product DownloadLink
# Download link of the first product in the order's product list.
# Bare expression: presumably shown as notebook cell output; it has no effect in a plain script.
batch_order_items['value'][0]['DownloadLink']