Source code for fluxnet_shuttle.core.base

"""
Base Plugin Interface
=====================

:module: fluxnet_shuttle.core.base
:synopsis: Base class and interfaces for data hub plugins in the FLUXNET Shuttle library.
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-10-09
:updated: 2025-12-09

.. currentmodule:: fluxnet_shuttle.core.base

This module defines the base class and interfaces for data hub plugins
in the FLUXNET Shuttle library.

Data Hub Plugin Implementation
++++++++++++++++++++++++++++++

The :class:`DataHubPlugin` abstract base class provides the interface
that all data hub plugins must implement. It includes methods for
retrieving site metadata and handling HTTP requests. When implementing
a new data hub plugin, developers should subclass :class:`DataHubPlugin`
and provide concrete implementations for the abstract methods.

The :property name must return a unique identifier for the plugin, while the
:property display_name should return a human-readable name. The primary method
for retrieving site metadata is :func:`get_sites`, which is an async
generator method. A synchronous version of this method is automatically
available through the :func:`async_to_sync_generator` decorator.

Implementation Basics
---------------------
An example implementation of a data hub plugin might look like this::

    from fluxnet_shuttle.core.base import DataHubPlugin
    from fluxnet_shuttle.models import FluxnetDatasetMetadata

    class MyDataHubPlugin(DataHubPlugin):
        @property
        def name(self) -> str:
            return "mydatahub"

        @property
        def display_name(self) -> str:
            return "My Data Hub"

    @async_to_sync_generator
    async def get_sites(self, **filters) -> AsyncGenerator[FluxnetDatasetMetadata, None]:
        # Implementation to fetch and yield site metadata
        async with self._session_request("GET", "https://api.mydatahub.org/sites") as response:
            data = await response.json()
            for site in data["sites"]:
                yield FluxnetDatasetMetadata(...)  # Populate with actual data


Each plugin must implement the abstract methods defined in the
:class:`DataHubPlugin` base class.

The :class:`DataHubPlugin` class provides both asynchronous and synchronous
versions of the primary method for retrieving site metadata. The asynchronous
version is defined as :func:`get_sites`, which is an async generator method.

The synchronous version is made available through the
:func:`async_to_sync_generator` decorator, allowing users to choose
between async and sync usage based on their application's needs.

HTTP Request Handling
---------------------
Plugins should use the :func:`_session_request` helper method to make
HTTP requests. This method manages the aiohttp ClientSession and includes
error handling to ensure robust network communication.

Error Handling
--------------
Plugins should raise :class:`PluginError` exceptions when encountering
issues during execution. This allows for consistent error handling across
different plugin implementations.

"""

import asyncio
import logging
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional

import aiohttp

from fluxnet_shuttle.core import exceptions
from fluxnet_shuttle.core.decorators import async_to_sync_generator
from fluxnet_shuttle.core.http_utils import session_request

from ..models import FluxnetDatasetMetadata

_logger = logging.getLogger(__name__)


[docs] class DataHubPlugin(ABC): """ Base class for all data hub plugins. This abstract base class defines the interface that all data hub plugins must implement. It provides both async and sync versions of the get_sites method through the sync_from_async decorator. Attributes: config (Dict[str, Any]): Configuration dictionary for the plugin """ def __init__( self, config: Optional[Dict[str, Any]] = None, ): """ Initialize the data hub plugin. Args: config: Optional configuration dictionary """ self.config = config or {} @property def name(self) -> str: # pragma: no cover """ Data hub name identifier. Returns: str: Lowercase data hub identifier (e.g., 'ameriflux', 'icos') """ raise NotImplementedError("Subclasses must implement the 'name' property") @property @abstractmethod def display_name(self) -> str: """ Data hub name. Returns: str: Display name for the data hub (e.g., 'AmeriFlux', 'ICOS') """ pass
[docs] @async_to_sync_generator @abstractmethod async def get_sites(self, **filters: Any) -> AsyncGenerator[FluxnetDatasetMetadata, None]: """ Get available sites from the data hub. This is the primary method that plugins must implement. A synchronous version is automatically available as get_sites_sync(). Args: **filters: Optional filters to apply to site selection Yields: FluxnetDatasetMetadata: Site metadata objects Raises: PluginError: If a shuttle plugin error occurs during site retrieval Example: >>> plugin = SomeDataHubPlugin() >>> async for site in plugin.get_sites(): ... print(site.site_info.site_id) >>> # Or use the sync version >>> for site in plugin.get_sites_sync(): ... print(site.site_info.site_id) """ pass
[docs] async def download_file( self, site_id: str, download_link: str, **kwargs: Any, ) -> AsyncGenerator[bytes, None]: """ Download a file from the data hub and yield byte chunks. This method provides a default implementation that performs a simple GET request. Plugins can override this method to implement custom download logic, such as user tracking (AmeriFlux) or header validation (ICOS). Args: site_id: Site identifier download_link: URL to download the data from **kwargs: Additional plugin-specific parameters (e.g., filename, user_name, user_email for tracking) Yields: bytes: Byte chunks from the downloaded content Raises: PluginError: If download fails """ async with self._session_request("GET", download_link) as response: async for chunk in response.content.iter_any(): yield chunk
@asynccontextmanager async def _session_request( self, method: str, url: str, **kwargs: Any ) -> AsyncGenerator[aiohttp.ClientResponse, None]: """ Make an HTTP request using an aiohttp ClientSession. This helper method should be used by subclasses to make HTTP requests. Args: method: HTTP method (e.g., 'GET', 'POST') url: URL to request **kwargs: Additional arguments for the request (See aiohttp.ClientSession.request) Returns: The response object from the request Raises: PluginError: If a shuttle plugin error occurs during the request Example: >>> try: ... with await self._session_request('GET', 'https://amfcdn-dev.lbl.gov/api/data') as response: ... data = await response.json() ... except PluginError as e: ... print(f"Error occurred: {e}") Note: Error handling is built-in to log and re-raise as PluginError. """ try: kwargs.setdefault("http_timeouts", self.config.get("http_timeouts")) async with session_request(method, url, **kwargs) as response: response.raise_for_status() # Raise an error for bad responses (4xx and 5xx) yield response except (TimeoutError, asyncio.TimeoutError) as e: _logger.error(f"HTTP request timed out: {e}") raise exceptions.HttpTimeoutError( plugin_name=self.name, message="HTTP request timed out", original_error=e ) from e except aiohttp.ClientError as e: _logger.error(f"HTTP request failed: {e}") raise exceptions.NetworkError( plugin_name=self.name, message="Failed to make HTTP request", original_error=e ) from e except Exception as e: _logger.error(f"Unexpected error during HTTP request: {e}") raise exceptions.PluginError( plugin_name=self.name, message="Unexpected error during HTTP request", original_error=e ) from e