# Source code for fluxnet_shuttle.plugins.tern

"""
TERN Data Hub Plugin
====================

TERN (Terrestrial Ecosystem Research Network) data hub implementation
for the FLUXNET Shuttle plugin system.

This plugin handles:
- BADM SGI metadata from BIF (BADM Interchange Format) files
- FLUXNET Data Product information from separate text files
- Parsing grouped metadata elements (e.g., team members)
"""

import asyncio
import csv
import logging
from collections import defaultdict
from io import StringIO
from typing import Any, AsyncGenerator, Dict, List, Optional

from fluxnet_shuttle.core.config import DEFAULT_FLUXNET_SHUTTLE_REFERER
from fluxnet_shuttle.core.exceptions import PluginError

from ..core.base import DataHubPlugin
from ..core.decorators import async_to_sync_generator
from ..models import (
    BadmSiteGeneralInfo,
    DataFluxnetProduct,
    FluxnetDatasetMetadata,
    TeamMember,
)
from ..shuttle import (
    _extract_filename_from_url,
    extract_fluxnet_filename_metadata,
)

logger = logging.getLogger(__name__)

# Default TERN data endpoint.
# Can be overridden in config.yaml under ``data_hubs.tern.base_url``.
TERN_BASE_URL = "https://dap.tern.org.au/thredds/fileServer/ecosystem_process/fluxnet/"
# BIF (BADM Interchange Format) file holding Site General Information for all sites.
TERN_BIF_METADATA_URL = f"{TERN_BASE_URL}BIF_all_sites.csv"
# Catalogue listing the FLUXNET data products hosted on the TERN THREDDS server.
TERN_PRODUCT_METADATA_URL = f"{TERN_BASE_URL}TERN_THREDDS_catalogue.csv"

# BIF file column names
BIF_COLUMNS = ["SITE_ID", "GROUP_ID", "VARIABLE_GROUP", "VARIABLE", "DATAVALUE"]


def _is_newer_product(
    current_version: tuple[int, ...],
    current_run: int,
    latest_version: Optional[tuple[int, ...]],
    latest_run: Optional[int],
) -> bool:
    """
    Determine if a product is newer than the current latest product.

    Comparison logic:
    1. Compare major version numbers first (e.g., v2 > v1)
    2. If major versions equal, compare minor versions (e.g., v1.3 > v1.2)
    3. Continue for all version parts (patch, etc.)
    4. If all version parts equal, compare run numbers (e.g., r2 > r1)

    Args:
        current_version: Version tuple of current product (e.g., (1, 3) for v1.3)
        current_run: Run number of current product (e.g., 2 for r2)
        latest_version: Version tuple of latest product found so far (None if first)
        latest_run: Run number of latest product found so far (None if first)

    Returns:
        True if current product is newer than latest, False otherwise

    Examples:
        >>> _is_newer_product((1, 3), 1, (1, 2), 1)  # v1.3 > v1.2
        True
        >>> _is_newer_product((1, 3), 2, (1, 3), 1)  # v1.3.r2 > v1.3.r1
        True
        >>> _is_newer_product((2, 0), 1, (1, 9), 1)  # v2.0 > v1.9
        True
    """
    # If no latest product exists yet, current is always newer
    if latest_version is None or latest_run is None:
        return True

    # FLUXNET versions currently follow a 2-part pattern (major.minor)
    # e.g., v1.3, v2.0, etc. Compare each part explicitly.

    # Compare major version (index 0)
    if len(current_version) > 0 and len(latest_version) > 0:
        current_major = current_version[0]
        latest_major = latest_version[0]

        if current_major > latest_major:
            return True
        elif current_major < latest_major:
            return False
        # If major versions equal, continue to minor

    # Compare minor version (index 1) if both versions have it
    if len(current_version) > 1 and len(latest_version) > 1:
        current_minor = current_version[1]
        latest_minor = latest_version[1]

        if current_minor > latest_minor:
            return True
        elif current_minor < latest_minor:
            return False
        # If minor versions equal, continue to run number

    # All version parts are equal, compare run numbers
    return current_run > latest_run


class BIFParser:
    """
    Parser for BADM Interchange Format (BIF) files.

    BIF files are CSVs with the columns SITE_ID, GROUP_ID, VARIABLE_GROUP,
    VARIABLE and DATAVALUE. Rows sharing a GROUP_ID describe one logical
    element (for example, all fields belonging to a single team member).
    """

    @staticmethod
    def parse_bif_content(content: str) -> Dict[str, Dict[str, Dict[str, List[Dict[str, str]]]]]:
        """
        Parse BIF file content into structured metadata.

        Args:
            content: BIF file content as string

        Returns:
            Nested mapping:
            site_id -> group_key -> variable_group -> list of {variable: datavalue}

        Raises:
            ValueError: If the CSV header does not match the expected BIF columns.
        """
        reader = csv.DictReader(StringIO(content))
        fieldnames = reader.fieldnames

        # Header must contain exactly the BIF columns (order-insensitive).
        if not fieldnames or set(fieldnames) != set(BIF_COLUMNS):
            raise ValueError(f"Invalid BIF file format. Expected columns: {BIF_COLUMNS}, got: {fieldnames}")

        # site_id -> group_key -> variable_group -> list of {variable: datavalue}
        parsed: Dict[str, Dict[str, Dict[str, List[Dict[str, str]]]]] = defaultdict(
            lambda: defaultdict(lambda: defaultdict(list))
        )

        for row in reader:
            site_id = row["SITE_ID"]
            # Prefix the group id with the site id: GROUP_IDs are only
            # guaranteed unique within one site, not globally.
            group_key = f"{site_id}_{row['GROUP_ID']}"
            parsed[site_id][group_key][row["VARIABLE_GROUP"]].append({row["VARIABLE"]: row["DATAVALUE"]})

        return parsed

    @staticmethod
    def extract_site_metadata(  # noqa: C901
        site_id: str, site_data: Dict[str, Dict[str, List[Dict[str, str]]]]
    ) -> Dict[str, Any]:
        """
        Extract structured metadata for a single site from parsed BIF data.

        Args:
            site_id: Site identifier
            site_data: Parsed BIF data for this site (group_id -> variable_group -> data)

        Returns:
            Dictionary with: site_id, site_name, location_lat, location_long,
            igbp, network (list), team_members (list of dicts) and utc_offset.
        """
        # Defaults for every field; overwritten as matching groups are found.
        result: Dict[str, Any] = {
            "site_id": site_id,
            "site_name": "",
            "location_lat": 0.0,
            "location_long": 0.0,
            "igbp": "UNK",
            "network": [],
            "team_members": [],
            "utc_offset": None,
        }

        # The group key itself is not needed for extraction, only the payload.
        for group_data in site_data.values():
            # HEADER: site display name.
            for entry in group_data.get("HEADER", []):
                if "SITE_NAME" in entry:
                    result["site_name"] = entry["SITE_NAME"]

            # LOCATION: latitude / longitude as floats.
            for entry in group_data.get("LOCATION", []):
                for col, key, label in (
                    ("LOCATION_LAT", "location_lat", "latitude"),
                    ("LOCATION_LONG", "location_long", "longitude"),
                ):
                    if col in entry:
                        try:
                            result[key] = float(entry[col])
                        except (ValueError, TypeError):
                            logger.warning(f"Invalid {label} for site {site_id}: {entry[col]}")

            # IGBP: land-cover classification code.
            for entry in group_data.get("IGBP", []):
                if "IGBP" in entry:
                    result["igbp"] = entry["IGBP"]

            # NETWORK: collect unique network names, preserving first-seen order.
            for entry in group_data.get("NETWORK", []):
                network = entry.get("NETWORK")
                if network and network not in result["network"]:
                    result["network"].append(network)

            # TEAM_MEMBER: rows are sequential; each TEAM_MEMBER_NAME row
            # starts a new person, and subsequent ROLE/EMAIL rows attach to
            # the most recently started person.
            if "TEAM_MEMBER" in group_data:
                member: Dict[str, str] = {}
                for entry in group_data["TEAM_MEMBER"]:
                    if "TEAM_MEMBER_NAME" in entry:
                        # Flush the previous person before starting a new one.
                        if member.get("name"):
                            result["team_members"].append(member)
                        member = {"name": entry["TEAM_MEMBER_NAME"], "role": "", "email": ""}
                    elif "TEAM_MEMBER_ROLE" in entry:
                        if member:
                            member["role"] = entry["TEAM_MEMBER_ROLE"]
                    elif "TEAM_MEMBER_EMAIL" in entry:
                        if member:
                            member["email"] = entry["TEAM_MEMBER_EMAIL"]
                # Flush the trailing person, if any.
                if member.get("name"):
                    result["team_members"].append(member)

            # UTC_OFFSET: hours offset from UTC as float.
            for entry in group_data.get("UTC_OFFSET", []):
                if "UTC_OFFSET" in entry:
                    try:
                        result["utc_offset"] = float(entry["UTC_OFFSET"])
                    except (ValueError, TypeError):
                        logger.warning(f"Invalid UTC offset for site {site_id}: {entry['UTC_OFFSET']}")

        return result


class TERNPlugin(DataHubPlugin):
    """TERN data hub plugin implementation."""

    @property
    def name(self) -> str:
        # Plugin name is derived from this module's filename ("tern").
        return __name__.split(".")[-1]

    @property
    def display_name(self) -> str:
        return "TERN"

    @async_to_sync_generator
    async def get_sites(self, **filters: Any) -> AsyncGenerator[FluxnetDatasetMetadata, None]:
        """
        Get TERN sites with FLUXNET data.

        This method:
        1. Fetches BIF file containing BADM Site General Information
        2. Fetches FLUXNET product metadata file
        3. Combines the data to yield FluxnetDatasetMetadata objects

        Args:
            **filters: Optional filters (not used in this implementation)

        Yields:
            FluxnetDatasetMetadata: Site metadata objects

        Raises:
            PluginError: If fetching or combining the metadata fails.
        """
        logger.info("Fetching TERN sites...")
        try:
            # Fetch BIF metadata
            bif_metadata = await self._fetch_bif_metadata()

            # Fetch product metadata
            product_metadata = await self._fetch_product_metadata()

            # Combine and yield results
            async for site_data in self._combine_metadata(bif_metadata, product_metadata):
                await asyncio.sleep(0.001)  # Yield control to event loop
                yield site_data
        except PluginError:
            # Already carries plugin context; propagate unchanged.
            raise
        except Exception as e:
            logger.exception("Failed to retrieve TERN data: %s", e)
            raise PluginError(self.name, f"Failed to retrieve data: {e}", original_error=e)

    async def _fetch_bif_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Fetch and parse BIF metadata file.

        Returns:
            Dictionary mapping site_id to parsed site metadata

        Raises:
            PluginError: If fetching or parsing fails
        """
        # base_url may be overridden per-hub in the plugin configuration.
        base_url = self.config.get("base_url", TERN_BASE_URL)
        bif_metadata_url = f"{base_url}BIF_all_sites.csv"
        logger.info(f"Fetching BIF metadata from {bif_metadata_url}")

        headers = {"Referer": self.config.get("fluxnet_shuttle_referer", DEFAULT_FLUXNET_SHUTTLE_REFERER)}

        try:
            async with self._session_request("GET", bif_metadata_url, headers=headers) as response:
                content = await response.text()

            # Parse BIF content
            parser = BIFParser()
            parsed_data = parser.parse_bif_content(content)

            # Extract structured metadata for each site
            site_metadata = {}
            for site_id, site_data in parsed_data.items():
                site_metadata[site_id] = parser.extract_site_metadata(site_id, site_data)

            logger.info(f"Successfully parsed BIF metadata for {len(site_metadata)} sites")
            return site_metadata
        except PluginError:
            raise
        except Exception as e:
            logger.error(f"Error fetching BIF metadata: {e}")
            raise PluginError(self.name, f"Failed to fetch BIF metadata: {e}", original_error=e)

    async def _fetch_product_metadata(self) -> Dict[str, List[Dict[str, Any]]]:
        """
        Fetch FLUXNET product metadata file.

        File format:
            SITE_ID,PRODUCT_URL,PRODUCT_ID,PRODUCT_CITATION
            AU-Lox,https://...,doi:...,Citation text...

        Returns:
            Dictionary mapping site_id to list of all available products for
            that site. Selection of the best product happens later in
            _combine_metadata.

        Raises:
            PluginError: If fetching or parsing fails
        """
        base_url = self.config.get("base_url", TERN_BASE_URL)
        product_metadata_url = f"{base_url}TERN_THREDDS_catalogue.csv"
        logger.info(f"Fetching product metadata from {product_metadata_url}")

        headers = {"Referer": self.config.get("fluxnet_shuttle_referer", DEFAULT_FLUXNET_SHUTTLE_REFERER)}

        try:
            async with self._session_request("GET", product_metadata_url, headers=headers) as response:
                content = await response.text()

            # Parse product metadata (returns all products per site, no selection yet)
            product_data = self._parse_products(content)

            logger.info(f"Successfully parsed product metadata for {len(product_data)} sites")
            return product_data
        except PluginError:
            raise
        except Exception as e:
            logger.error(f"Error fetching product metadata: {e}")
            raise PluginError(self.name, f"Failed to fetch product metadata: {e}", original_error=e)

    @staticmethod
    def _parse_products(content: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        Parse product metadata file and group by site.

        File format:
            SITE_ID,PRODUCT_URL,PRODUCT_ID,PRODUCT_CITATION

        Args:
            content: File content as string

        Returns:
            Dictionary mapping site_id to list of all products for that site
        """
        reader = csv.DictReader(StringIO(content))

        # Group products by site_id
        products_by_site: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

        for row in reader:
            site_id = row.get("SITE_ID", "").strip()
            product_url = row.get("PRODUCT_URL", "").strip()
            product_id = row.get("PRODUCT_ID", "").strip()
            product_citation = row.get("PRODUCT_CITATION", "").strip()

            # Rows without a site id or URL cannot be used downstream.
            if not site_id or not product_url:
                continue

            products_by_site[site_id].append(
                {
                    "product_url": product_url,
                    "product_id": product_id,
                    "product_citation": product_citation,
                }
            )

        return dict(products_by_site)

    @staticmethod
    def _select_latest_product_version(products: List[Dict[str, Any]], site_id: str) -> Optional[Dict[str, Any]]:
        """
        Select the latest run of the most recent version from a list of
        products and return the product with parsed filename components.

        Selection criteria (in order):
        1. Valid FLUXNET filename format
        2. Highest version number (e.g., v1.3 > v1.2)
        3. Highest run number within that version (e.g., r2 > r1)

        Args:
            products: List of product dictionaries with 'product_url' key
            site_id: Site identifier (for logging)

        Returns:
            Dictionary containing:
            - product: Original product dict
            - filename: Extracted filename
            - product_source_network: Network code from filename
            - oneflux_code_version: Version string
            - first_year: First year of data
            - last_year: Last year of data
            - version_tuple: Parsed version tuple (for comparison)
            - run_num: Run number (for comparison)
            Returns None if no valid products found
        """
        latest_product: Optional[Dict[str, Any]] = None

        for product in products:
            url = product["product_url"]
            filename = _extract_filename_from_url(url)

            # Extract all metadata from filename (validates format internally)
            product_source_network, version, first_year, last_year, run = extract_fluxnet_filename_metadata(filename)

            # Skip if validation failed
            if not version or not run:
                # NOTE(review): the trailing "(unknown)" looks like it was meant
                # to interpolate {filename} — confirm against upstream source.
                logger.debug(
                    f"Skipping product for {site_id} - filename does not follow standard format "
                    f"(<network_id>_<site_id>_FLUXNET_<year_range>_<version>_<run>.<extension>): (unknown)"
                )
                continue

            # Parse version number (e.g., "v1.3" -> (1, 3))
            version_parts = version.lower().replace("v", "").split(".")
            version_tuple = tuple(int(p) for p in version_parts)

            # Parse run number (e.g., "r2" -> 2)
            run_num = int(run.lower().replace("r", ""))

            # Keep track of the latest product found so far
            if _is_newer_product(
                current_version=version_tuple,
                current_run=run_num,
                latest_version=latest_product["version_tuple"] if latest_product else None,
                latest_run=latest_product["run_num"] if latest_product else None,
            ):
                latest_product = {
                    "product": product,
                    "version_tuple": version_tuple,
                    "run_num": run_num,
                    "filename": filename,
                    "product_source_network": product_source_network,
                    "oneflux_code_version": version,
                    "first_year": first_year,
                    "last_year": last_year,
                }

        if latest_product is None:
            logger.debug(
                f"No valid production-ready products found for {site_id} (currently only beta products available)"
            )
            return None

        logger.debug(
            f"Selected product for {site_id}: {latest_product['filename']} "
            f"(version: {latest_product['version_tuple']}, run: {latest_product['run_num']})"
        )
        return latest_product

    async def _combine_metadata(
        self, bif_metadata: Dict[str, Dict[str, Any]], product_metadata: Dict[str, List[Dict[str, Any]]]
    ) -> AsyncGenerator[FluxnetDatasetMetadata, None]:
        """
        Combine BIF metadata and product metadata to create
        FluxnetDatasetMetadata objects.

        This method selects the best product for each site and extracts
        filename metadata in a single pass.

        Args:
            bif_metadata: Dictionary of site metadata from BIF file
            product_metadata: Dictionary mapping site_id to list of available products

        Yields:
            FluxnetDatasetMetadata objects
        """
        # Find sites that have both BIF and product metadata
        common_sites = set(bif_metadata.keys()) & set(product_metadata.keys())

        if not common_sites:
            logger.warning("No sites found with both BIF and product metadata")
            return

        logger.info(f"Processing {len(common_sites)} sites with complete metadata")

        for site_id in common_sites:
            try:
                site_meta = bif_metadata[site_id]
                products_list = product_metadata[site_id]

                # Select best product and extract metadata (single extraction point!)
                latest_product = self._select_latest_product_version(products_list, site_id)
                if not latest_product:
                    logger.debug(f"No valid product found for site {site_id}, skipping")
                    continue

                # Extract metadata from the selection result
                selected_product = latest_product.get("product", {})
                filename = latest_product.get("filename", "")
                product_source_network = latest_product.get("product_source_network", "")
                oneflux_code_version = latest_product.get("oneflux_code_version", "")
                first_year = latest_product.get("first_year", 0)
                last_year = latest_product.get("last_year", 0)

                # Build BadmSiteGeneralInfo; malformed team members are skipped
                # individually rather than failing the whole site.
                team_members = []
                for tm_data in site_meta.get("team_members", []):
                    try:
                        team_member = TeamMember(
                            team_member_name=tm_data.get("name", ""),
                            team_member_role=tm_data.get("role", ""),
                            team_member_email=tm_data.get("email", ""),
                        )
                        team_members.append(team_member)
                    except Exception as e:
                        logger.warning(f"Error parsing team member for site {site_id}: {e}")
                        continue

                site_info = BadmSiteGeneralInfo(
                    site_id=site_id,
                    site_name=site_meta.get("site_name", ""),
                    data_hub="TERN",
                    location_lat=site_meta.get("location_lat", 0.0),
                    location_long=site_meta.get("location_long", 0.0),
                    igbp=site_meta.get("igbp", "UNK"),
                    network=site_meta.get("network", []),
                    group_team_member=team_members,
                )

                # Get citation
                product_citation = selected_product.get("product_citation", "")

                # Skip site if citation is not available
                if not product_citation:
                    logger.warning(f"Skipping site {site_id} - no citation available. Please contact TERN support.")
                    continue

                product_data = DataFluxnetProduct(
                    first_year=first_year,
                    last_year=last_year,
                    download_link=selected_product.get("product_url", ""),
                    product_citation=product_citation,
                    product_id=selected_product.get("product_id", ""),
                    oneflux_code_version=oneflux_code_version,
                    product_source_network=product_source_network,
                    fluxnet_product_name=filename,
                )

                metadata = FluxnetDatasetMetadata(site_info=site_info, product_data=product_data)
                yield metadata
            except Exception as e:
                # Per-site isolation: one bad site never aborts the whole run.
                logger.warning(f"Error processing site {site_id}: {e}. Skipping this site.")
                continue
# Auto-register the plugin from fluxnet_shuttle.core.registry import registry registry.register(TERNPlugin)