
STACK service - Python Client Dask

This notebook demonstrates how to use the DEDL Stack Python client to authenticate against DESP, manage Dask clusters, and execute parallel, multi-cloud Dask computations on distributed datasets stored at Central Site and the LUMI bridge.


Multi-cloud processing with Dask

Overview

Content
  • DestinE Data Lake (DEDL) Stack Client

  • Making use of the client's context manager

  • Use Case: Pakistan Flood 2022

  • Duration: 15 min.

    Please make sure the Python DEDL kernel is used.

    DestinE Data Lake utilises a deployment of Dask Gateway at each location (bridge) in the data lake. Dask Gateway provides a secure, multi-tenant server for managing Dask clusters. It allows users to launch and use Dask clusters in a shared, centrally managed cluster environment, without requiring direct access to the underlying cluster backend (e.g. Kubernetes, Hadoop/YARN, HPC job queues).

    Dask Gateway exposes a REST API to spawn clusters on demand. The overall architecture of Dask Gateway is depicted hereafter.

    [Figure: DEDL Dask Gateway architecture, with a Dask Gateway deployment at Central Site and at the LUMI Bridge]

    Only authenticated access is granted to the DEDL STACK service Dask; therefore, a helper class is implemented to authenticate a user against the DESP identity management system. The user's password is handed over directly to the request object and is not permanently stored.

    In the following, please enter your DESP username and password. Again, the password will only be kept for the duration of this user session and will be removed as soon as the notebook/kernel is closed.

    from dask_gateway.auth import GatewayAuth
    from getpass import getpass
    from destinelab import AuthHandler as DESP_AuthHandler
    
    class DESPAuth(GatewayAuth):
        def __init__(self, username: str):
            self.auth_handler = DESP_AuthHandler(username, getpass("Please input your DESP password: "))
            self.access_token = self.auth_handler.get_token()
        
        def pre_request(self, _):
            # attach the DESP access token as a bearer token to every Gateway request
            headers = {"Authorization": "Bearer " + self.access_token}
            return headers, None
    from rich.prompt import Prompt
    myAuth = DESPAuth(username=Prompt.ask(prompt="Username"))

    DestinE Data Lake (DEDL) Stack Client

    The DEDL Stack Client is a Python library that facilitates the use of the Stack Service Dask. Its main objective is to provide an abstraction layer for interacting with the Dask clusters on each DEDL bridge. Computations can be directed to the different Dask clusters by means of a context manager, as shown in the following.

    from dedl_stack_client.dask import DaskMultiCluster
    
    myDEDLClusters = DaskMultiCluster(auth=myAuth)
    myDEDLClusters.new_cluster()
    Create new cluster for Central Site
    Create new cluster for LUMI Bridge
    
    myDEDLClusters.get_cluster_url()
    http://dask.central.data.destination-earth.eu/clusters/dask-gateway.203d0155443945c986a35b0488db7ebb/status
    http://dask.lumi.data.destination-earth.eu/clusters/dask-gateway.9010455c0c8a44df9a52e15003252889/status
    
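    For reference, the same gateways can also be reached with the plain dask_gateway API that DaskMultiCluster wraps. The following is a sketch only; the Central Site gateway address is an assumption derived from the dashboard URLs shown above.

    from dask_gateway import Gateway
    
    # Sketch: connect to the Central Site gateway directly (address assumed from the
    # dashboard URLs above); DaskMultiCluster does the equivalent for every bridge.
    gateway = Gateway(
        address="http://dask.central.data.destination-earth.eu",
        auth=myAuth,
    )
    gateway.list_clusters()          # clusters currently running for this user
    cluster = gateway.new_cluster()  # spawn a new cluster via the Gateway REST API
    client = cluster.get_client()    # dask.distributed client bound to that cluster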

    We can again showcase the execution of standard Python functions on the remote clusters. In the following we make use of Dask futures, i.e. non-blocking distributed computations, utilising the map() method for task distribution. Detailed information about futures can be found in the Dask documentation.

    This approach allows for embarrassingly parallel task scheduling, which is very similar to Function as a Service capabilities.

    from time import sleep as wait
    
    def apply_myfunc(x):
        wait(1)
        return x+1

    We want to run apply_myfunc() on Central Site and wait for all results to be ready. my_filelist_central represents a list of files to be processed by apply_myfunc().

    my_filelist_central = range(20)
    with myDEDLClusters.as_current(location="central") as myclient:
        central_future = myclient.map(apply_myfunc, my_filelist_central)
        results_central = myclient.gather(central_future)
    results_central
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

    Run the computation at the LUMI bridge.

    my_filelist_lumi = range(32)
    with myDEDLClusters.as_current(location="lumi") as myclient:
        lumi_future = myclient.map(apply_myfunc, my_filelist_lumi)
        results_lumi = myclient.gather(lumi_future)
    results_lumi
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]

    Limitations

    • Python libraries used in the local environment need to match, in the same version, those available on the Dask cluster. If this is not the case, you will get a warning; the code might still work, but this is not guaranteed. A minimal version check is sketched right after this list.

    • No direct data exchange between Dask workers across cloud locations is possible. Each location acts as an atomic unit; however, data can easily be exchanged via storage services such as S3.
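    Regarding the first point, the version match between the local environment and the remote workers can be checked with the standard dask.distributed client API. A minimal sketch, reusing the DaskMultiCluster context manager introduced above:

    # Sketch: compare package versions between this notebook environment and the
    # remote scheduler/workers of a given bridge.
    with myDEDLClusters.as_current(location="central") as myclient:
        versions = myclient.get_versions(check=False)  # check=True raises on a mismatch
    versions["client"]["packages"]  # packages and versions seen by the local client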

    Use Case example: Pakistan Flood 2022

    The complete use case is available on GitHub via https://github.com/destination-earth/DestinE_EUMETSAT_PakistanFlood_2022.

    The use case demonstrates the multi-cloud capabilities of DEDL following the paradigm of data-proximate computing. Data from the Global Flood Monitoring (GFM) service as well as Climate DT outputs, simulated utilising ERA5 data, have been used for flood risk assessment.

    Data is stored as datacubes (Zarr format) in object storage at Central Site and at the LUMI bridge.

    import s3fs
    import xarray as xr
    
    xr.set_options(keep_attrs=True)
    
    s3fs_central = s3fs.S3FileSystem(
        anon=True,
        use_ssl=True,
        client_kwargs={"endpoint_url": "https://s3.central.data.destination-earth.eu"})
    
    s3fs_lumi = s3fs.S3FileSystem(
        anon=True,
        use_ssl=True,
        client_kwargs={"endpoint_url": "https://s3.lumi.data.destination-earth.eu"})

    We can list the data available at Central Site.

    s3fs_central.ls("increment1-testdata")
    ['increment1-testdata/2022-08-30.zarr', 'increment1-testdata/built_data.zarr']

    Read data stored in the S3 bucket at Central Site. The data we want to read is a single Zarr store representing the GFM flood data over Pakistan for 2022-08-30.

    flood_map = xr.open_zarr(store=s3fs.S3Map(root="increment1-testdata/2022-08-30.zarr", s3=s3fs_central, check=False),
                             decode_coords="all",)["flood"].assign_attrs(location="central", resolution=20)
    #flood_map

    We now want to run a simple computation and compute the flooded area for this day in August 2022.

    # assuming a binary flood mask at 20 m resolution: pixel count * (20 m)^2, converted to km^2
    flooded_area_ = flood_map.sum()*20*20/1e6
    #flooded_area_

    So far we haven't computed anything, so let's do the computation now on the Dask cluster.

    from rich.console import Console
    from rich.prompt import Prompt
    console = Console()
    
    flooded_area = myDEDLClusters.compute(flooded_area_, sync=True)
    console.print(f"Flooded area: {flooded_area.data} km2")

    How was that processing routed to Dask Gateway at Central Site?

    myDEDLClusters.compute(flooded_area_, sync=True) checks the annotations (attributes) of the array and maps them to the available Dask clusters; here, the location="central" attribute assigned when opening the flood map determines that the computation runs at Central Site.
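    As an illustration only (this is not the actual DaskMultiCluster implementation), such attribute-based routing could look as follows:

    # Illustration: dispatch a lazy xarray object to a cluster based on its
    # location attribute, mirroring what DaskMultiCluster.compute() does for us.
    def route_compute(multicluster, data_array, sync=True):
        location = data_array.attrs.get("location", "central")  # e.g. "central" or "lumi"
        with multicluster.as_current(location=location) as client:
            return client.compute(data_array, sync=sync)

    Calling route_compute(myDEDLClusters, flooded_area_) would then behave like the compute() call above, running at Central Site because the flood map was opened with location="central".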

    Preprocess the GFM data at Central Site for visualisation.

    def preprocess_dataset(data_array: xr.DataArray, method: str):
        data_array = data_array.squeeze()
        steps = 500 // data_array.attrs["resolution"]
        coarsened = data_array.coarsen({'y': steps, 'x': steps}, boundary='trim')
        if method == 'median':
            data_array = (coarsened.median() > 0).astype('float32')
        elif method == 'mean':
            data_array = coarsened.mean()
        elif method == 'max':
            data_array = coarsened.max()
        else:
            raise NotImplementedError(method)
        return data_array
    flood_prep_ = preprocess_dataset(flood_map, 'median')
    import numpy as np
    import rioxarray  # registers the .rio accessor used below
    flood_prep = myDEDLClusters.compute(flood_prep_, sync=True)
    flood_prep.rio.write_crs("epsg:4326", inplace=True)
    flood_prep = flood_prep.rio.reproject("EPSG:3857", nodata=np.nan)

    Visualise the flood data on a map.

    import leafmap
    from dataclasses import dataclass
    
    @dataclass
    class Extent:
        min_x: float
        min_y: float
        max_x: float
        max_y: float
        crs: str
        def get_center(self):
            return (np.mean([self.min_y, self.max_y]),
                    np.mean([self.min_x,self.max_x]))
    
    
    roi_extent = Extent(65, 21, 71, 31, crs='EPSG:4326')
    
    m = leafmap.Map(center=roi_extent.get_center(),
                    zoom=8, height=600)
    m.add_raster(flood_prep, colormap="Blues", layer_name="Flood", nodata=0.)
    
    m

    Read data stored in the S3 bucket at the LUMI bridge (Finland). The data we want to read is a datacube generated from ERA5, representing predicted rainfall.

    rainfall = xr.open_zarr(store=s3fs.S3Map(root="increment1-testdata/predicted_rainfall.zarr",
                                             s3=s3fs_lumi,
                                             check=False),
                            decode_coords="all",)["tp"].assign_attrs(location="lumi", resolution=20)

    And again we run the computation close to the data, in this case at the LUMI bridge.

    First, we compute the accumulated rainfall over Pakistan. Second, we compute the average rainfall for August 2022 (monthly mean) at global scale; a sketch of this second step follows the accumulated-rainfall cells below.

    from datetime import datetime
    
    def accum_rain_predictions(rain_data, startdate, enddate, extent):
        rain_ = rain_data.sel(time=slice(startdate, enddate),
                              latitude=slice(extent.max_y, extent.min_y),
                              longitude=slice(extent.min_x, extent.max_x))
        return rain_.cumsum(dim="time", keep_attrs=True)*1000
    
    # compute accumulated rainfall over Pakistan
    acc_rain_ = accum_rain_predictions(rainfall, startdate=datetime(2022, 8, 18),
                                                      enddate=datetime(2022, 8, 30),
                                                      extent=roi_extent)
    acc_rain_ = acc_rain_.rename({"longitude":"x", "latitude":"y"})
    acc_rain = myDEDLClusters.compute(acc_rain_, sync=True)
    def acc_rain_reproject(rain):
        from rasterio.enums import Resampling
        rain.rio.write_nodata(0, inplace=True)
        rain.rio.write_crs('EPSG:4326', inplace=True)
        return rain.rio.reproject('EPSG:3857', resolution=500, resampling=Resampling.bilinear)
    
    acc_rain = acc_rain_reproject(acc_rain)
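    The second step mentioned above, the global monthly mean for August 2022, could look like the following sketch (the name monthly_mean_ is illustrative; the factor of 1000 mirrors the unit conversion in accum_rain_predictions()):

    # Sketch: global monthly mean of the predicted rainfall for August 2022,
    # again computed at the LUMI bridge thanks to the location="lumi" attribute.
    monthly_mean_ = (rainfall.sel(time=slice(datetime(2022, 8, 1), datetime(2022, 8, 31)))
                             .mean(dim="time", keep_attrs=True) * 1000)
    monthly_mean = myDEDLClusters.compute(monthly_mean_, sync=True)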

    Visualise the forecast data provided by the Digital Twin, which could have been used for flood risk assessment or even alerting.

    time_dim_len = acc_rain.shape[0]
    for day in range(0, time_dim_len):
        fpath_str = f"./{day}.tif"
        acc_rain[day,:].rio.to_raster(fpath_str,
                                      driver="COG",
                                      overview_count=10)
    import leafmap
    from localtileserver import get_leaflet_tile_layer
    
    m = leafmap.Map(center=roi_extent.get_center(),
                    zoom=6, height=600)
    
    layer_dict = {}
    date_vals = np.datetime_as_string(acc_rain["time"].values, unit='D')
    for day in range(0, time_dim_len):
        layer_dict[date_vals[day]]= get_leaflet_tile_layer(f"./{day}.tif",
                                                           colormap="Blues",
                                                           indexes=[1],
                                                           nodata=0.,
                                                           vmin=acc_rain.min().values,
                                                           vmax=acc_rain.max().values,
                                                           opacity=0.85)
    
    m.add_local_tile(flood_prep,
                     colormap="Blues",
                     nodata=0.)
    m.add_time_slider(layer_dict,
                      layer="Accumulated Rainfall",
                      time_interval=1.)
    m
    Finally, shut down all Dask clusters to release the remote resources.

    myDEDLClusters.shutdown()