
"""
ICOS Data Hub Plugin
====================

ICOS Carbon Portal data hub implementation for the FLUXNET Shuttle plugin system.

:moduleauthor: Gilberto Pastorello <gzpastorello@lbl.gov>
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-01-09
:updated: 2025-12-09
"""

import asyncio
import logging
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Tuple

from ..core.base import DataHubPlugin
from ..core.config import DEFAULT_FLUXNET_SHUTTLE_REFERER
from ..core.decorators import async_to_sync_generator
from ..models import (
    BadmSiteGeneralInfo,
    DataFluxnetProduct,
    FluxnetDatasetMetadata,
    TeamMember,
)
from ..shuttle import (
    extract_fluxnet_filename_metadata,
    validate_fluxnet_filename_format,
)

logger = logging.getLogger(__name__)

# Default ICOS SPARQL endpoint.
# Can be overridden in config.yaml under ``data_hubs.icos.api_url``.
ICOS_API_URL = "https://meta.icos-cp.eu/sparql"
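# For illustration only (the exact layout is assumed, not defined in this module),
# an override in config.yaml could look like:
#
#   data_hubs:
#     icos:
#       api_url: "https://meta.icos-cp.eu/sparql"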

# Mapping from ICOS SPARQL team member roles to BADM controlled vocabulary
ICOS_ROLE_TO_BADM = {
    "principal investigator": "PI",
    "administrator": "PI",
    "researcher": "FluxContact",
    "data manager": "DataManager",
    "engineer": "Technician",
}
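# For example, a role label such as "Principal Investigator" is normalized to
# "principal investigator" by _map_icos_role_to_badm below and mapped to "PI";
# roles not listed here fall back to "Affiliate".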


# Mapping from ICOS network codes to BADM controlled vocabulary codes.
# Source: https://meta.icos-cp.eu/ontologies/cpmeta/EtcNetwork
ICOS_NETWORK_TO_BADM = {
    "AMF": "AmeriFlux",
    "ANAEE": "ANAEE",
    "ASF": "AsiaFlux",
    "CAFRICA": "CarboAfrica",
    "CEURO": "CarboEuroFlux",
    "CEUROIP": "CarboEuropeIP",
    "CEXTREME": "CarboExtreme",
    "CITALY": "CarboItaly",
    "CMONT": "Carbomont",
    "CNF": "ChinaFLUX",
    "CZN": "CZNet",
    "DANUBIUS": "DANUBIUS-RI",
    "ELTER": "eLTER",
    "EUDB": "European Fluxes Database",
    "EUROFLUX": "EuroFlux",
    "FCANADA": "Fluxnet-Canada",
    "FLX": "Unaffiliated",
    "GHGEU": "GHG-Europe",
    "GREENG": "GreenGrass",
    "ICOS": "ICOS",
    "IMECC": "IMECC",
    "INFLUX": "INFLUX",
    "INGOS": "InGOS",
    "JPF": "JapanFlux",
    "KOF": "KoFlux",
    "LBA": "LBA",
    "LTAR": "LTAR",
    "LTER": "LTER",
    "MEDFLU": "Medeflu",
    "MEXFL": "MexFlux",
    "NEON": "NEON",
    "NYSM": "New York State Mesonet",
    "OZF": "OzFlux",
    "PAGE21": "PAGE21",
    "PHEN": "Phenocam",
    "SAEON": "EFTEON SAEON",
    "SWISSF": "Swiss FluxNet",
    "TAIFLU": "TaiwanFlux",
    "TCOS": "TCOS-Siberia",
    "TERENO": "TERENO",
    "TERN": "TERN",
    "THAIFLU": "ThaiFlux",
    "TROPI": "TROPI-DRY",
    "UFLUX": "UFLUX",
    "URBFLU": "Urban Flux Network",
    "USCCC": "USCCC",
}
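# Example of how this table is used: per _map_network_to_badm below, a network
# URI of the form http://meta.icos-cp.eu/resources/networks/ETC/SWISSF yields
# the code "SWISSF", which maps to the BADM value "Swiss FluxNet".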

ICOS_SPARQL_QUERY = """
prefix cpmeta: <http://meta.icos-cp.eu/ontologies/cpmeta/>
prefix prov: <http://www.w3.org/ns/prov#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
prefix geo: <http://www.opengis.net/ont/geosparql#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
select ?dobj ?hasNextVersion ?spec ?station ?stationName ?fileName ?size ?submTime
       ?timeStart ?timeEnd ?lat ?lon ?ecosystemType ?citationString
       ?firstName ?lastName ?email ?roleName ?orgName ?networks
where {
    VALUES ?spec {<http://meta.icos-cp.eu/resources/cpmeta/miscFluxnetArchiveProduct>}
    ?dobj cpmeta:hasObjectSpec ?spec .
    BIND(EXISTS{[] cpmeta:isNextVersionOf ?dobj} AS ?hasNextVersion)
    ?dobj cpmeta:wasAcquiredBy/prov:wasAssociatedWith ?station .
    ?dobj cpmeta:hasSizeInBytes ?size .
    ?dobj cpmeta:hasName ?fileName .
    ?dobj cpmeta:wasSubmittedBy/prov:endedAtTime ?submTime .
    ?dobj cpmeta:hasStartTime | (cpmeta:wasAcquiredBy / prov:startedAtTime) ?timeStart .
    ?dobj cpmeta:hasEndTime | (cpmeta:wasAcquiredBy / prov:endedAtTime) ?timeEnd .

    # Get station name
    OPTIONAL {
        ?station cpmeta:hasName ?stationName .
    }

    # Get station location
    OPTIONAL {
        ?station cpmeta:hasLatitude ?lat .
        ?station cpmeta:hasLongitude ?lon .
    }

    # Get ecosystem/vegetation type if available
    OPTIONAL {
        ?station cpmeta:hasEcosystemType ?ecosystemType .
    }

    # Get citation string
    OPTIONAL {
        ?dobj cpmeta:hasCitationString ?citationString .
    }

    # Get network affiliations
    OPTIONAL {
        ?station cpmeta:hasAssociatedNetwork ?networks .
    }

    # Get team member information
    OPTIONAL {
        ?membership cpmeta:atOrganization ?station .
        ?person cpmeta:hasMembership ?membership .
        OPTIONAL { ?person cpmeta:hasFirstName ?firstName . }
        OPTIONAL { ?person cpmeta:hasLastName ?lastName . }
        OPTIONAL { ?person cpmeta:hasEmail ?email . }
        OPTIONAL {
            ?membership cpmeta:hasRole ?role .
            ?role rdfs:label ?roleName .
        }
        OPTIONAL {
            ?membership cpmeta:hasAttributingOrganization ?org .
            ?org cpmeta:hasName ?orgName .
        }
    }

    FILTER NOT EXISTS {[] cpmeta:isNextVersionOf ?dobj}
}
order by desc(?fileName)
"""


class ICOSPlugin(DataHubPlugin):
    """ICOS Carbon Portal data hub plugin implementation."""

    @property
    def name(self) -> str:
        return "icos"

    @property
    def display_name(self) -> str:
        return "ICOS"

    @async_to_sync_generator
    async def get_sites(self) -> AsyncGenerator[FluxnetDatasetMetadata, None]:
        """
        Fetch ICOS sites with FLUXNET data from the ICOS Carbon Portal SPARQL endpoint.

        Yields site metadata objects using the FluxnetDatasetMetadata model.
        All available sites are returned; filtering is not currently supported.

        This method is an async generator. The :func:`async_to_sync_generator`
        decorator allows usage in both asynchronous and synchronous contexts.

        Configuration:
            api_url (str): Optional. Override the default ICOS API URL.
            timeout (int): Optional. Request timeout in seconds.

        Yields:
            FluxnetDatasetMetadata: Site metadata objects.
        """
        logger.info("Fetching ICOS sites...")

        # Get configuration parameters
        api_url = self.config.get("api_url", ICOS_API_URL)
        headers = {
            "Accept": "application/json",
            "Referer": self.config.get(
                "fluxnet_shuttle_referer", DEFAULT_FLUXNET_SHUTTLE_REFERER
            ),
        }

        async with self._session_request(
            "POST", api_url, data={"query": ICOS_SPARQL_QUERY}, headers=headers
        ) as response:
            data = await response.json()

        # Parse and yield site metadata
        for site_data in self._parse_sparql_response(data):
            await asyncio.sleep(0.001)  # Yield control to the event loop
            yield site_data

    def _group_sparql_bindings(
        self, bindings: List[Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """Group SPARQL bindings by data object URI and collect team members."""
        sites_data: Dict[str, Dict[str, Any]] = {}

        for binding in bindings:
            try:
                dobj_uri = binding["dobj"]["value"]

                # Initialize site data if first time seeing this dobj
                if dobj_uri not in sites_data:
                    # The FLUXNET site ID is the last 6 characters of the station URI
                    station_id = binding["station"]["value"][-6:].split("/")[-1]
                    sites_data[dobj_uri] = {
                        "station_id": station_id,
                        "station_name": binding.get("stationName", {}).get("value", station_id),
                        "time_start": binding.get("timeStart", {}).get("value", ""),
                        "time_end": binding.get("timeEnd", {}).get("value", ""),
                        "location_lat": binding.get("lat", {}).get("value"),
                        "location_long": binding.get("lon", {}).get("value"),
                        "ecosystem_type": binding.get("ecosystemType", {}).get("value", ""),
                        "citation": binding.get("citationString", {}).get("value", ""),
                        "filename": binding.get("fileName", {}).get("value", ""),
                        "dobj_uri": dobj_uri,
                        "team_members": [],
                        "network_uris": set(),
                    }

                # Collect network URI if present
                network_uri = binding.get("networks", {}).get("value", "")
                if network_uri:
                    sites_data[dobj_uri]["network_uris"].add(network_uri)

                # Extract and add team member if present
                team_member = self._extract_team_member(binding)
                if team_member:
                    sites_data[dobj_uri]["team_members"].append(team_member)
            except Exception as e:
                logger.warning(f"Error grouping ICOS site data: {e}")
                continue

        return sites_data

    def _map_network_to_badm(self, network_uri: str) -> Optional[str]:
        """
        Map an ICOS network URI to a BADM network controlled vocabulary code.

        ICOS provides network affiliations as URIs in the format:
        http://meta.icos-cp.eu/resources/networks/ETC/<CODE>

        The BADM code is extracted from the last path segment of the URI and
        looked up in ICOS_NETWORK_TO_BADM.

        Args:
            network_uri: ICOS network URI string

        Returns:
            BADM network code, or None if the code is not recognized
        """
        if not network_uri:
            return None

        # Extract the network code from the URI (last path segment)
        code = network_uri.split("/")[-1]
        if not code:
            return None

        badm_code = ICOS_NETWORK_TO_BADM.get(code)
        if badm_code is None:
            logger.debug(f"Unknown ICOS network code for BADM mapping: {code}")
            return None

        return badm_code

    def _map_icos_role_to_badm(self, icos_role: str) -> str:
        """
        Map ICOS SPARQL team member roles to BADM controlled vocabulary.

        Args:
            icos_role: Role name from ICOS SPARQL query

        Returns:
            BADM controlled vocabulary role (PI, FluxContact, DataManager,
            Technician, or Affiliate)
        """
        # Normalize role string by stripping whitespace and lower-casing for comparison
        normalized_role = icos_role.strip().lower()

        # Return mapped role or "Affiliate" for blank/other values
        return ICOS_ROLE_TO_BADM.get(normalized_role, "Affiliate")

    def _extract_team_member(self, binding: Dict[str, Any]) -> Optional[TeamMember]:
        """Extract team member information from SPARQL binding."""
        first_name = binding.get("firstName", {}).get("value", "")
        last_name = binding.get("lastName", {}).get("value", "")
        if not (first_name or last_name):
            return None

        full_name = f"{first_name} {last_name}".strip()
        if not full_name:
            return None

        # Get ICOS role and map to BADM controlled vocabulary
        icos_role = binding.get("roleName", {}).get("value", "")
        badm_role = self._map_icos_role_to_badm(icos_role)

        return TeamMember(
            team_member_name=full_name,
            team_member_role=badm_role,
            team_member_email=binding.get("email", {}).get("value", ""),
        )

    def _parse_coordinates(
        self, station_id: str, lat_value: Any, lon_value: Any
    ) -> Tuple[float, float]:
        """Parse and validate latitude and longitude coordinates."""
        location_lat = 0.0
        location_long = 0.0

        try:
            if lat_value is not None:
                location_lat = float(lat_value)
        except (ValueError, TypeError):
            logger.warning(f"Invalid latitude for station {station_id}")

        try:
            if lon_value is not None:
                location_long = float(lon_value)
        except (ValueError, TypeError):
            logger.warning(f"Invalid longitude for station {station_id}")

        return location_lat, location_long

    def _parse_sparql_response(
        self, data: Dict[str, Any]
    ) -> Generator[FluxnetDatasetMetadata, None, None]:
        """
        Parse ICOS SPARQL response to extract site information.

        Args:
            data: SPARQL response data

        Yields:
            FluxnetDatasetMetadata objects with citation information and
            team members from SPARQL query
        """
        bindings = data.get("results", {}).get("bindings", [])
        sites_data = self._group_sparql_bindings(bindings)

        # Yield one FluxnetDatasetMetadata per site
        for dobj_uri, site_data in sites_data.items():
            try:
                station_id = site_data["station_id"]
                filename = site_data["filename"]

                # Validate filename format
                if not validate_fluxnet_filename_format(filename):
                    logger.debug(
                        f"Skipping site {station_id} - filename does not follow standard format "
                        f"(<network_id>_<site_id>_FLUXNET_<year_range>_<version>_<run>.<extension>): "
                        f"{filename}"
                    )
                    continue

                location_lat, location_long = self._parse_coordinates(
                    station_id, site_data["location_lat"], site_data["location_long"]
                )

                igbp = self._map_ecosystem_to_igbp(site_data["ecosystem_type"])

                download_id = dobj_uri.split("/")[-1]
                download_link = (
                    f"https://data.icos-cp.eu/licence_accept?ids=%5B%22{download_id}%22%5D"
                )

                # Extract product source network, code version, and year range from filename
                product_source_network, oneflux_code_version, first_year, last_year, _ = (
                    extract_fluxnet_filename_metadata(filename)
                )

                citation = site_data["citation"]

                # Skip site if citation is not available
                if not citation:
                    logger.warning(
                        f"Skipping site {station_id} - no citation available. "
                        f"Please contact FLUXNET support at support@fluxnet.org."
                    )
                    continue

                # Translate network URIs to BADM controlled vocabulary codes
                networks = sorted(
                    code
                    for uri in site_data["network_uris"]
                    if (code := self._map_network_to_badm(uri)) is not None
                )

                site_info = BadmSiteGeneralInfo(
                    site_id=station_id,
                    site_name=site_data["station_name"],
                    data_hub="ICOS",
                    location_lat=location_lat,
                    location_long=location_long,
                    igbp=igbp,
                    network=networks,
                    group_team_member=site_data["team_members"],
                )

                product_data = DataFluxnetProduct(
                    first_year=first_year,
                    last_year=last_year,
                    download_link=download_link,  # type: ignore[arg-type]
                    product_citation=citation,
                    product_id=download_id,
                    oneflux_code_version=oneflux_code_version,
                    product_source_network=product_source_network,
                    fluxnet_product_name=filename,
                )

                yield FluxnetDatasetMetadata(site_info=site_info, product_data=product_data)
            except Exception as e:
                logger.warning(f"Error parsing ICOS site data: {e}")
                continue

    def _map_ecosystem_to_igbp(self, ecosystem_type: str) -> str:
        """
        Map ICOS ecosystem type to IGBP land cover classification.

        ICOS provides ecosystem types as URIs in the format:
        http://meta.icos-cp.eu/ontologies/cpmeta/igbp_XXX
        where XXX is the IGBP code (ENF, GRA, CRO, etc.)

        Args:
            ecosystem_type: ICOS ecosystem type string (URI)

        Returns:
            IGBP classification code (e.g., "ENF", "GRA", "CRO", "UNK")
        """
        if not ecosystem_type:
            return "UNK"

        # Extract the last part of the URI if it's a URI
        if "/" in ecosystem_type:
            ecosystem_type = ecosystem_type.split("/")[-1]

        # Check if it's in IGBP format (igbp_XXX)
        if ecosystem_type.startswith("igbp_"):
            igbp_code = ecosystem_type[5:].upper()  # Extract XXX from igbp_XXX
            return igbp_code

        # If not recognized, return unknown
        logger.debug(f"Unknown ecosystem type for IGBP mapping: {ecosystem_type}")
        return "UNK"


# Auto-register the plugin
from ..core.registry import registry

registry.register(ICOSPlugin)
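
# Illustrative usage sketch (constructor signature assumed, not defined here):
# because get_sites is wrapped with async_to_sync_generator, it can be iterated
# from synchronous code, e.g.
#
#   plugin = ICOSPlugin(config={})  # hypothetical construction
#   for dataset in plugin.get_sites():
#       print(dataset.site_info.site_id, dataset.product_data.download_link)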