"""
Main Shuttle Orchestrator
=========================
:module:: fluxnet_shuttle.core.shuttle
:synopsis: Main orchestrator for FLUXNET Shuttle operations
:moduleauthor: Gilberto Pastorello <gzpastorello@lbl.gov>
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-10-09
:updated: 2025-12-09
.. currentmodule:: fluxnet_shuttle.core.shuttle
This module provides the main FluxnetShuttle class that orchestrates
operations across multiple data hub plugins.
"""
import logging
from typing import Any, AsyncGenerator, Dict, List, Optional
from fluxnet_shuttle.core.decorators import async_to_sync_generator
from fluxnet_shuttle.core.registry import ErrorCollectingIterator, registry
from ..models import ErrorSummary, FluxnetDatasetMetadata
from .base import DataHubPlugin
from .config import ShuttleConfig
logger = logging.getLogger(__name__)
class FluxnetShuttle:
    """
    Main orchestrator for FLUXNET operations with error collection.

    This class provides the main interface for interacting with multiple
    FLUXNET data hubs through their respective plugins. It handles error
    collection and provides both sync and async interfaces.
    """

    def __init__(self, data_hubs: Optional[List[str]] = None, config: Optional[ShuttleConfig] = None):
        """
        Initialize the FLUXNET Shuttle.

        Args:
            data_hubs: List of data hub names to enable. If None, all
                configured data hubs are used.
            config: Optional configuration object. If None, default config
                is loaded.
        """
        self.registry = registry
        self.config = config or ShuttleConfig.load_default()
        if data_hubs is None:
            # Use every configured data hub when none are specified.
            # NOTE: enablement is not checked here; a disabled hub raises
            # later in _get_plugin_instance().
            self.data_hubs = list(self.config.data_hubs)
        else:
            # Preserve config ordering while restricting to the requested hubs.
            self.data_hubs = [name for name in self.config.data_hubs if name in data_hubs]
        # Error collector from the most recent operation, reported by get_errors().
        self._last_error_collector: Optional[ErrorCollectingIterator] = None
        logger.info("Initialized FluxnetShuttle with data hubs: %s", self.data_hubs)

    @async_to_sync_generator
    async def get_all_sites(self, **filters: Any) -> AsyncGenerator[FluxnetDatasetMetadata, None]:
        """
        Get sites from all enabled data hubs.

        Args:
            **filters: Optional filters to apply to site selection

        Yields:
            FluxnetDatasetMetadata: Site metadata objects from all data hubs

        Examples:
            Synchronous usage (simple):

            >>> from fluxnet_shuttle.core.shuttle import FluxnetShuttle
            >>> shuttle = FluxnetShuttle()
            >>> for site in shuttle.get_all_sites():
            ...     print(f"{site.site_info.site_id} from {site.site_info.data_hub}")

            Asynchronous usage (when in async context):

            >>> from fluxnet_shuttle.core.shuttle import FluxnetShuttle
            >>> import asyncio
            >>> async def list_sites():
            ...     shuttle = FluxnetShuttle()
            ...     async for site in shuttle.get_all_sites():
            ...         print(f"{site.site_info.site_id} from {site.site_info.data_hub}")
            >>> asyncio.run(list_sites())
        """
        plugins = self._get_enabled_plugins()
        if not plugins:
            logger.warning("No enabled plugins found")
            # For async generators, just return without yielding anything.
            return
        # Fan the "get_sites" call out across all plugins; failures are
        # collected rather than aborting the whole iteration.
        error_collector = ErrorCollectingIterator(plugins, "get_sites", **filters)
        self._last_error_collector = error_collector
        try:
            async for site in error_collector:
                yield site
        finally:
            # Log summary after iteration completes (also runs when the
            # caller abandons the generator early).
            summary = error_collector.get_error_summary()
            logger.info(
                "Completed get_all_sites: %s results, %s errors",
                summary.total_results,
                summary.total_errors,
            )

    @async_to_sync_generator
    async def download_dataset(
        self,
        site_id: str,
        data_hub: str,
        download_link: str,
        **kwargs: Any,
    ) -> AsyncGenerator[bytes, None]:
        """
        Download dataset as a stream of byte chunks from specified data hub.

        This method delegates to the appropriate plugin's download_file method
        through the orchestrator pattern, providing consistent error handling
        and plugin management.

        Args:
            site_id: Site identifier
            data_hub: Data hub name (e.g., "ameriflux", "icos")
            download_link: URL to download from
            **kwargs: Additional parameters passed to plugin's download_file method

        Yields:
            bytes: Byte chunks for the dataset

        Raises:
            ValueError: If the data hub is not configured or is disabled.
            AttributeError: If the plugin has no ``download_file`` method.

        Examples:
            Synchronous usage (simple):

            >>> from fluxnet_shuttle.core.shuttle import FluxnetShuttle
            >>> shuttle = FluxnetShuttle()
            >>> with open("download.zip", "wb") as file:
            ...     for chunk in shuttle.download_dataset(
            ...         "ZA-Uby",
            ...         "icos",
            ...         "https://...",
            ...     ):
            ...         file.write(chunk)

            Asynchronous usage (when in async context):

            >>> from fluxnet_shuttle.core.shuttle import FluxnetShuttle
            >>> import asyncio
            >>> async def download():
            ...     shuttle = FluxnetShuttle()
            ...     with open("download.zip", "wb") as file:
            ...         async for chunk in shuttle.download_dataset(
            ...             "ZA-Uby",
            ...             "icos",
            ...             "https://...",
            ...         ):
            ...             file.write(chunk)
            >>> asyncio.run(download())
        """
        # Coerce URL-like objects (e.g. pydantic HttpUrl) to a plain string.
        download_link = str(download_link)
        try:
            plugin = self._get_plugin_instance(data_hub.lower())
        except ValueError as e:
            logger.error("Failed to get plugin for data hub '%s': %s", data_hub, e)
            # Re-raise to maintain the expected error behavior
            raise
        # The collector is used here for error bookkeeping only; chunks are
        # streamed directly from the plugin below, not via the collector.
        error_collector = ErrorCollectingIterator(
            {plugin.name: plugin}, "download_file", site_id=site_id, download_link=download_link, **kwargs
        )
        self._last_error_collector = error_collector
        try:
            # Check if plugin has download_file method
            if not hasattr(plugin, "download_file"):
                raise AttributeError(f"Plugin '{plugin.name}' does not have 'download_file' method")
            # Call the plugin's download_file method and yield byte chunks
            async for chunk in plugin.download_file(site_id=site_id, download_link=download_link, **kwargs):
                yield chunk
        except Exception as e:
            logger.error("Failed to download dataset for site %s from %s: %s", site_id, data_hub, e)
            # Record the failure so get_errors() can report it afterwards.
            error_collector.add_error(plugin.name, e, "download_file")
            # Re-raise to let caller handle the error
            raise
        finally:
            summary = error_collector.get_error_summary()
            logger.info(
                "Completed download_dataset: %s results, %s errors",
                summary.total_results,
                summary.total_errors,
            )

    def get_errors(self) -> ErrorSummary:
        """
        Get collected errors from last operation.

        Returns:
            ErrorSummary: Pydantic model containing error summary information.
                An empty summary is returned if no operation has run yet.
        """
        if self._last_error_collector is not None:
            summary: ErrorSummary = self._last_error_collector.get_error_summary()
            return summary
        return ErrorSummary(total_errors=0, total_results=0, errors=[])

    def list_available_data_hubs(self) -> List[str]:
        """
        List all available data hub plugins.

        Returns:
            List of available data hub names
        """
        plugins: List[str] = self.registry.list_plugins()
        return plugins

    def _get_enabled_plugins(self) -> Dict[str, Any]:
        """
        Get instances of enabled plugins.

        Returns:
            Dict mapping data hub names to plugin instances

        Raises:
            ValueError: If any selected data hub is not configured or disabled
                (propagated from _get_plugin_instance).
        """
        return {name: self._get_plugin_instance(name) for name in self.data_hubs}

    def _get_plugin_instance(self, data_hub_name: str) -> DataHubPlugin:
        """
        Get a plugin instance for the specified data hub.

        Args:
            data_hub_name: Name of the data hub

        Returns:
            DataHubPlugin instance

        Raises:
            ValueError: If data hub is not configured or plugin not found
        """
        if data_hub_name not in self.config.data_hubs:
            raise ValueError(f"Data hub '{data_hub_name}' not configured")
        data_hub_config = self.config.data_hubs[data_hub_name]
        if not data_hub_config.enabled:
            raise ValueError(f"Data hub '{data_hub_name}' is disabled.")
        # Shared settings first so per-hub settings can override them.
        settings = {
            "fluxnet_shuttle_referer": self.config.fluxnet_shuttle_referer,
            "http_timeouts": self.config.http_timeouts,
            **data_hub_config.settings,
        }
        plugin: DataHubPlugin = self.registry.create_instance(data_hub_name, **settings)
        return plugin