Copernicus data exploration#


In this notebook, we search for the different caracteristics of the required data from copernicus marine sevices in order to add them to a yaml file that will store all the uris for relevant products and help us improve how we access data from cmems.#

Here is a little list of definitions of the different keywords use in this notebook :#


  • Product : A product is a set of dataset avaible from the Copenicus Marine Data Store. It is distinguishable thanks to is product ID, example : GLOBAL_ANALYSISFORECAST_PHY_001_024.

  • Dataset : A dataset is a collection of data points, each described by multiple variables. For a given dataset, the coordinates are fixed and indicate where data is present for each variable for a given coordinate. In cmems a dataset has a unique identifier such as : cmems_mod_glo_phy-cur_anfc_0.083deg_P1D-m and contains some info inside, such as the spatial and temporal resolution. Here, 0.083deg means that the model has a 0.083*0.083 spatial resoltion and the 1D means that it’s a daily temporal resolution. A dataset is accessible via a URI.

  • Variable : A variable is a part of a dataset, it depends of one or more coordinates. For GFTS, we are interested in 3 particular variables :

  • thetao : sea_water_potential_temperature. Allows us to calculate the difference between the actual temperature and the measured temperature from biologging. Depends on Latitude, Longitude, Depth and time.

  • zos : sea_surface_height_above_geoid. Sea surface height relativly to the geoid. Depends on Latitude, Longitude and time.

  • deptho : sea_floor_depth_below_geoid. Distance between the geoid and the sea floor. Depends on Latitude and Longitude

  • Coordinate : Describe a Variable, where and when the data exists.


First you need to get to Copernicus Marine Data Store, choose a product that can match your usage and then copy the product ID in the classification section on the product page.

import copernicusmarine
import yaml
import intake
import xarray as xr
import os
from tqdm import tqdm
import numpy as np
import s3fs

s3 = s3fs.S3FileSystem(
    anon=False,
    client_kwargs={
        "endpoint_url": "https://s3.gra.perf.cloud.ovh.net",
    },
)

1. Defining the functions#

def generate_copernicus_dict(product_id="GLOBAL_ANALYSISFORECAST_PHY_001_024"):
    """
    Generates a dictionary containing metadata information about the specified
    product from the Copernicus Marine service. This dictionary can be used for
    saving to a YAML file.

    Args:
        product_id (str): The ID of the product to retrieve information about.
                          Default is "GLOBAL_ANALYSISFORECAST_PHY_001_024".

    Returns:
        dict: A dictionary containing metadata and variable information for the product.
    """
    products_info = {}

    # Open the CMEMS catalog and retrieve product details
    catalogue = copernicusmarine.describe(
        include_datasets=True,
        contains=[product_id],
    )

    # Iterate through each dataset in the product
    for dataset in tqdm(catalogue["products"][0]["datasets"], desc="Gathering data"):
        dataset_id = dataset["dataset_id"]

        for service in dataset["versions"][0]["parts"][0]["services"]:
            # Check if the service type is not 'original-files' and contains '.zarr'
            if (service["service_type"]["service_name"] != "original-files") & (
                ".zarr" in service["uri"]
            ):
                # Collect the URI of the service
                uri = service["uri"]
                variable_info = {}
                variable_metadata = {}
                data = xr.open_zarr(uri)
                data_vars = data.data_vars

                # Filters for reaching only the variables we need for GFTS
                if "thetao" in data_vars or "zos" in data_vars or "deptho" in data_vars:
                    # Collect data for each variable in the dataset
                    for var in data_vars:
                        standard_name = data[var].attrs["standard_name"]
                        units = data[var].attrs["units"]

                        coords_info = {}
                        # Collect coordinates information for the variable
                        for coord in data[var].coords:
                            da = data[var][coord]
                            if da.attrs != {} and "units":
                                coords_info[coord] = {
                                    "units": data[var][coord].attrs["units"],
                                }

                            if "step" in da.attrs:
                                variable_metadata[
                                    "spatial_resolution (degrees)"
                                ] = data[var][coord].attrs["step"]

                            coords_info[coord] = {
                                "min_val": float(da[coord].min().data),
                                "max_val": float(da[coord].max().data),
                            }

                        if "latitude" in coords_info and "longitude" in coords_info:
                            bbox = {
                                "latitude": [
                                    coords_info["latitude"]["min_val"],
                                    coords_info["latitude"]["max_val"],
                                ],
                                "longitude": [
                                    coords_info["longitude"]["max_val"],
                                    coords_info["longitude"]["min_val"],
                                ],
                            }

                        if "time" in data[var].coords:
                            step = np.unique(data[var]["time"].diff(dim="time")).astype(
                                "timedelta64[h]"
                            )
                            units = str(data[var]["time"].dtype)
                            coords_info["time"] = {
                                "min_val": str(data[var]["time"].min().data),
                                "max_val": str(data[var]["time"].max().data),
                                "step": int(step.astype(int)[0]),
                                "step_unit": str(step.dtype),
                                "units": units,
                            }

                        try:
                            bbox
                        except NameError:
                            bbox = None
                        # Create metadata for a given variable
                        variable_metadata[var] = {
                            "standard_name": standard_name,
                            "coordinates": coords_info,
                            "dataset_id": dataset_id,
                            "bbox": bbox,
                            "units": units,
                        }

                    # Add metadata to the variable information
                    variable_info["metadata"] = {
                        "description": "Variables available in this product",
                        "variable": variable_metadata,
                    }

                    # Specify the driver information
                    variable_info["driver"] = "zarr"

                    # Modify the URI with the parameter aproach for Intake catlaog
                    if "timeChunked" in uri:
                        variable_info["default"] = "time"
                        variable_info["allowed"] = ["time", "geo"]
                        uri = uri.replace("time", "{{ chunk }}")

                    elif "geoChunked" in uri:
                        variable_info["default"] = "time"
                        variable_info["allowed"] = ["time", "geo"]
                        uri = uri.replace("geo", "{{ chunk }}")

                    variable_info["args"] = {"urlpath": uri, "consolidated": True}

                    # Add the variable information to the product info dictionary
                    products_info[dataset_id] = variable_info

    return products_info
def read_yaml(file):
    """
    Reads a YAML file and returns its content.

    Parameters:
    file (str): Path to the YAML file.

    Returns:
    dict: Parsed content of the YAML file.
    None: If there's an error during parsing.
    """
    with open(file, "r") as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            return None


def update_copernicus_sources(file, d):
    """
    Updates the 'sources' section of a YAML file with the provided dictionary.

    Parameters:
    file (str): Path to the YAML file to be updated.
    d (dict): Dictionary containing new source data to be added.

    Returns:
    dict: Updated content of the YAML file.
    """
    copernicus_file = read_yaml(file)

    # Check if 'sources' is None and initialize if needed
    if copernicus_file["sources"] is {}:
        copernicus_file["sources"] = d
    else:
        # Update existing 'sources' with new data
        for key in d.keys():
            copernicus_file["sources"][key] = d[key]

    return copernicus_file


def generate_template(filename):
    """
    Generates a template YAML file with predefined structure and saves it.

    Parameters:
    filename (str): Path where the template YAML file will be saved.
    """
    # Check if the file exists
    if not os.path.exists(filename):
        # Create the file if it does not exist
        open(filename, "a").close()

    template = {
        "description": "Uris and data about the relevant products for GFTS project",
        "metadata": {"version": 1},
        "sources": {},
    }
    write_yaml(filename, template)


def generate_master_template(filename):
    """
    Generates a template YAML file with predefined structure and saves it.

    Parameters:
    filename (str): Path where the template YAML file will be saved.
    """
    # Check if the file exists
    if not os.path.exists(filename):
        # Create the file if it does not exist
        open(filename, "a").close()

    template = {
        "description": "Master catalog to read data from copernicus marine data store",
        "metadata": {"version": 1},
        "sources": {},
    }
    write_yaml(filename, template)


def write_yaml(file, content):
    """
    Writes content to a YAML file.

    Parameters:
    file (str): Path to the YAML file.
    content (dict): Content to be written to the YAML file.
    """
    with open(file, "w") as stream:
        try:
            yaml.dump(content, stream, default_flow_style=False, allow_unicode=True)
        except yaml.YAMLError as exc:
            print(exc)
def create_product_yml(product_id, storage_location=""):
    """
    Generates a product YAML file for a specified product ID and stores it in the given location.

    This function creates a YAML file for a product by generating a template, updating it with
    product-specific data, and then writing the updated catalog to the file. The generated YAML
    file will be referenced in the master catalog.

    Parameters:
    product_id (str): The unique identifier for the product.
    storage_location (str): The directory path where the YAML file will be stored. Default is an empty string.

    Returns:
    None
    """

    # Construct the full path for the product YAML file
    full_path = f"{storage_location}{product_id}.yml"

    # Generate a template for the YAML file
    generate_template(full_path)

    # Generate a dictionary with product-specific data
    data = generate_copernicus_dict(product_id)

    # Update the product catalog with the generated data
    update_cat = update_copernicus_sources(full_path, data)

    # Write the updated catalog to the YAML file
    write_yaml(full_path, update_cat)

    # Print a message indicating successful creation of the YAML file
    print(f"{product_id} yml file created at {full_path}")
def update_master_cat(master_cat_path, copernicus_cat_path):
    """
    Updates the master catalog with references to new product catalogs found in the specified directory.

    This function checks if the master catalog file exists. If not, it generates a master template.
    Then, it reads the master catalog, updates it with references to any new product catalogs
    found in the specified directory, and writes the updated master catalog back to the file.

    Warning : The master catalog created with this function will only create a catalog with a local access path.
    Thus the catalog created is not meant to be pushed to the s3 bucket.
    The catalog that will be pushed to the bucket should be created with the function update_master_cat_remote()

    Parameters:
    master_cat_path (str): The file path to the master catalog YAML file.
    copernicus_cat_path (str): The directory path containing the Copernicus product catalog YAML files.

    Returns:
    None
    """

    # Check if the master catalog file exists
    if not os.path.exists(master_cat_path):
        generate_master_template(
            master_cat_path
        )  # Generate master template if it does not exist

    # Read the master catalog YAML file
    master = read_yaml(master_cat_path)

    # Iterate through all files in the Copernicus catalog directory
    for file in os.listdir(copernicus_cat_path):
        if ".yml" in file:  # Process only YAML files
            product_dict = {
                "args": {
                    "path": f"{copernicus_cat_path}{file}"  # Path to the product catalog file
                },
                "description": "Uris and data about a relevant product for GFTS project",
                "driver": "intake.catalog.local.YAMLFileCatalog",
                "metadata": {"version": 1},
            }
            product_id = file.replace(".yml", "")
            master["sources"][product_id] = product_dict

    # Write the updated master catalog back to the file
    write_yaml(master_cat_path, master)
def update_master_cat_remote(master_cat_path, s3):
    """
    Updates the master catalog for the s3 bucket with references to new product catalogs found in the specified directory.

    This function checks if the master catalog file exists. If not, it generates a master template.
    Then, it reads the master catalog, updates it with references to any new product catalogs
    found in the specified directory, and writes the updated master catalog back to the file.

    Parameters:
    master_cat_path (str): The file path to the master catalog YAML file.

    Returns:
    None
    """

    # Check if the master catalog file exists
    if not os.path.exists(master_cat_path):
        generate_master_template(
            master_cat_path
        )  # Generate master template if it does not exist

    # Read the master catalog YAML file
    master = read_yaml(master_cat_path)

    # Iterate through all files in the Copernicus catalog directory
    s3_path = "gfts-ifremer/copernicus_catalogs/product_catalogs/"
    for file in s3.ls(s3_path):
        if ".yml" in file:  # Process only YAML files
            product_dict = {
                "args": {
                    "path": f"s3://{file}"  # Path to the product catalog file
                },
                "description": "Uris and data about a relevant product for GFTS project",
                "driver": "intake.catalog.local.YAMLFileCatalog",
                "metadata": {"version": 1},
            }
            product_id = file.replace(".yml", "").replace(s3_path, "")
            master["sources"][product_id] = product_dict

    # Write the updated master catalog back to the file
    write_yaml(master_cat_path, master)

2. Showcasing how the functions works#


2.1 Creating a catalog for a product#

create_product_yml(
    product_id="GLOBAL_ANALYSISFORECAST_PHY_001_024",
    storage_location="copernicus_catalogs/product_catalogs/",
)

The function create_product_yml creates a catalaog with the product ID that you can find on the product page, in the classification section. It stores it in the specified path.

update_master_cat(
    master_cat_path="copernicus_catalogs/master.yml",
    copernicus_cat_path="copernicus_catalogs/product_catalogs/",
)

The function update_master_cat creates masetr catalog if no catalog already exists. Then it loops over the catalogs in the given folder to access them and create an entry in the sources.

2.2 How to open data#

# Opens the master catalog
cat = intake.open_catalog("copernicus_catalogs/master.yml")
# You can acces the product we just created the following way
cat.GLOBAL_ANALYSISFORECAST_PHY_001_024
# Load a dataset
product = cat.GLOBAL_ANALYSISFORECAST_PHY_001_024
product["cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m"](chunk="time").to_dask()

# Don't forget to specify the chunking type or it won't work and this error won't be shown explicitly

Let’s add another source to our master catalog

# Creating a new source and store it in the same folder
create_product_yml(
    product_id="IBI_MULTIYEAR_PHY_005_002",
    storage_location="copernicus_catalogs/product_catalogs/",
)
# Updataing the catalog
update_master_cat(
    master_cat_path="copernicus_catalogs/master.yml",
    copernicus_cat_path="copernicus_catalogs/product_catalogs/",
)

Let’s have a look at our new product :

# Opens the master catalog
cat = intake.open_catalog("copernicus_catalogs/master.yml")
# Verify that we have two products now
list(cat)
product = cat.IBI_MULTIYEAR_PHY_005_002
product["cmems_mod_ibi_phy_my_0.083deg-2D_PT1H-m"](chunk="time").to_dask()

3. Exploring and selecting a variable#

# Generating catalogs to explore
create_product_yml("GLOBAL_ANALYSISFORECAST_PHY_001_024")
create_product_yml("IBI_MULTIYEAR_PHY_005_002")
def variable_selector(master_cat, target_variable, product_id=None):
    """
    Select datasets containing a specific target variable from a given catalog.

    Parameters:
    master_cat (str): Path to the master catalog file.
    target_variable (str): The variable to search for within the datasets.
                           Valid values are "thetao", "deptho", and "zos".
    product_id (str, optional): The specific product ID to filter by. Defaults to None.

    Returns:
    list or dict:
        - If product_id is provided, returns a list of dataset IDs within the specified product
          that contain the target variable.
        - If product_id is not provided, returns a dictionary where keys are product IDs and
          values are lists of dataset IDs within each product that contain the target variable.

    Raises:
    ValueError: If the target_variable is not one of the authorized values.
    """
    if target_variable not in ["thetao", "deptho", "zos"]:
        raise ValueError(
            f"Wrong target variable: {target_variable}. Authorized values are: ['thetao', 'deptho', 'zos']"
        )

    if product_id is not None:
        target_products = []
        cat = intake.open_catalog(master_cat)[product_id]
        possible_datasets = list(cat)
        for dataset_id in possible_datasets:
            if target_variable in cat[dataset_id].metadata["variable"].keys():
                target_products.append(dataset_id)

    else:
        target_products = {}
        cat = intake.open_catalog("copernicus_catalogs/master.yml")
        for product in list(cat):
            target_list = []
            sub_cat = intake.open_catalog(master_cat)[product]
            possible_datasets = list(sub_cat)
            for dataset_id in possible_datasets:
                if target_variable in sub_cat[dataset_id].metadata["variable"].keys():
                    target_list.append(dataset_id)
            target_products[product] = target_list
    return target_products
# Showcase the datasets were the variable is available
variable_selector(master_cat="copernicus_catalogs/master.yml", target_variable="zos")

4. Saving to the bucket#

# Stores the product catalogs in at this path in the bucket : gfts-ifremer/copernicus_catalogs/product_catalogs
s3.put(
    "copernicus_catalogs/product_catalogs/",
    "gfts-ifremer/copernicus_catalogs/product_catalogs",
    recursive=True,
)
# List the files to see waht's inside
s3.ls("gfts-ifremer/copernicus_catalogs", refresh=True)
# The function shapes the files for the remote storage of data for s3 bucket storage
update_master_cat_remote("copernicus_catalogs/master.yml", s3=s3)
# Stores the master catalog with the adapted remote paths and not local files
s3.put(
    "copernicus_catalogs/master.yml",
    "gfts-ifremer/copernicus_catalogs/master.yml",
    recursive=True,
)