"""
Configuration System
====================
:module:: fluxnet_shuttle.core.config
:synopsis: Configuration system for FLUXNET Shuttle library
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-10-09
:updated: 2025-12-09

.. currentmodule:: fluxnet_shuttle.core.config

This module provides the configuration system for the FLUXNET Shuttle
library, including loading default and custom configurations.
"""
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
logger = logging.getLogger(__name__)

# Fallback values for the top-level shuttle settings; values found in
# config.yaml take precedence once that file is loaded.
DEFAULT_PARALLEL_REQUESTS: int = 3
DEFAULT_DOWNLOAD_BATCH_SIZE: int = 10
DEFAULT_FLUXNET_SHUTTLE_REFERER: str = "local_shuttle"

# Fallback HTTP timeouts, used when config.yaml has no http_timeouts entry.
# None: no global deadline is imposed.
DEFAULT_HTTP_TIMEOUT_TOTAL: Optional[float] = None
# Generous connect window so slow TLS handshakes on a busy network can finish.
DEFAULT_HTTP_SOCK_CONNECT: float = 60.0
# Five-minute read timeout, chosen to avoid TLS issues.
DEFAULT_HTTP_SOCK_READ: float = 300.0
@dataclass
class HttpTimeoutConfig:
    """HTTP timeout settings applied to all plugin requests.

    Each field falls back to the matching module-level ``DEFAULT_HTTP_*``
    constant when it is not supplied.
    """

    # Overall deadline; the default is None, i.e. no global deadline.
    total: Optional[float] = DEFAULT_HTTP_TIMEOUT_TOTAL
    # Socket connect timeout.
    sock_connect: float = DEFAULT_HTTP_SOCK_CONNECT
    # Socket read timeout.
    sock_read: float = DEFAULT_HTTP_SOCK_READ
@dataclass
class DataHubConfig:
    """Configuration for a specific data hub.

    Stores the ``enabled`` flag plus any additional plugin-specific
    settings (e.g. ``base_url``, ``api_url``). The dataclass itself only
    accepts ``enabled`` and ``settings``; the configuration loader is
    responsible for collecting every extra key of a hub entry into the
    ``settings`` dict, which is intended to be forwarded to the plugin
    instance via ``self.config``.
    """

    # Whether this data hub is active; hubs are enabled unless stated otherwise.
    enabled: bool = True
    # Plugin-specific options. default_factory gives each instance its own dict
    # so settings are never shared between hubs.
    settings: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ShuttleConfig:
    """Main shuttle configuration.

    Values come from, in increasing priority: the dataclass defaults, the
    packaged ``config.yaml`` (see :meth:`load_default`) and an optional
    external YAML file (see :meth:`load_from_file`).
    """

    # Per-hub configuration keyed by data hub name.
    data_hubs: Dict[str, DataHubConfig] = field(default_factory=dict)
    parallel_requests: int = DEFAULT_PARALLEL_REQUESTS
    download_batch_size: int = DEFAULT_DOWNLOAD_BATCH_SIZE
    fluxnet_shuttle_referer: str = DEFAULT_FLUXNET_SHUTTLE_REFERER
    http_timeouts: HttpTimeoutConfig = field(default_factory=HttpTimeoutConfig)

    # Keys of a data hub entry that map to DataHubConfig attributes; every
    # other key is stored in DataHubConfig.settings. Deliberately left
    # un-annotated so the dataclass machinery does not treat it as a field.
    _DATA_HUB_KNOWN_FIELDS = frozenset({"enabled"})

    @classmethod
    def load_default(cls) -> "ShuttleConfig":
        """
        Load default configuration from packaged config.yaml.

        The packaged resource is tried first; if it cannot be read, the
        ``plugins/config.yaml`` file next to the package is used, and if
        that is missing too the hardcoded defaults are used. Any other
        failure is logged and also results in the hardcoded defaults.

        Returns:
            ShuttleConfig: Configuration object with default settings
        """
        try:
            # Try to load from package data first
            try:
                # Don't use deprecated pkg_resources if possible
                import importlib.resources

                config_data = importlib.resources.read_text("fluxnet_shuttle.plugins", "config.yaml")
                config_dict = yaml.safe_load(config_data)
                logger.info("Loaded default configuration from package")
            except (ImportError, FileNotFoundError):  # pragma: no cover
                # Fallback to file path if the package resource is unavailable
                config_path = Path(__file__).parent.parent / "plugins" / "config.yaml"
                if config_path.exists():
                    # Explicit UTF-8 so the result does not depend on the
                    # platform's locale encoding.
                    with open(config_path, encoding="utf-8") as f:
                        config_dict = yaml.safe_load(f)
                    logger.info(f"Loaded default configuration from {config_path}")
                else:
                    logger.warning("Default config file not found, using hardcoded defaults")
                    config_dict = cls._get_hardcoded_defaults()

            # Parse configuration
            config = cls()
            config._apply_overrides(config_dict)
            return config
        except Exception as e:  # pragma: no cover
            logger.warning(f"Failed to load default config: {e}, using hardcoded defaults")
            return cls._create_default_config()

    @classmethod
    def load_from_file(cls, config_path: Path) -> "ShuttleConfig":
        """
        Load configuration from external YAML file.

        The default configuration is loaded first and the file's content is
        merged on top of it, so the file only needs to list the values it
        wants to change. A missing or unreadable file is logged and the
        default configuration is returned instead.

        Args:
            config_path: Path to the configuration file

        Returns:
            ShuttleConfig: Configuration object
        """
        # Accept plain strings as well as Path objects.
        config_path = Path(config_path)
        if not config_path.exists():
            logger.warning(f"Config file {config_path} not found, using defaults")
            return cls.load_default()

        try:
            with open(config_path, encoding="utf-8") as f:
                config_dict = yaml.safe_load(f)

            # Start with default config and override with file config
            config = cls.load_default()
            config._apply_overrides(config_dict)
            logger.info(f"Loaded configuration from {config_path}")
            return config
        except Exception as e:
            logger.warning(f"Failed to load config from {config_path}: {e}, using defaults")
            return cls.load_default()

    def _apply_overrides(self, config_dict: Optional[Dict[str, Any]]) -> None:
        """Merge a parsed YAML mapping into this configuration in place.

        Only keys present in ``config_dict`` are changed. Data hubs and
        ``http_timeouts`` are merged key by key, so values that are not
        mentioned keep whatever this object already holds.

        Args:
            config_dict: Parsed configuration mapping, or ``None`` for an
                empty YAML document (treated as "no overrides").

        Raises:
            TypeError: If ``config_dict`` is neither ``None`` nor a mapping.
        """
        # Function-scope import: the module only imports ``dataclass``/``field``.
        from dataclasses import fields

        if config_dict is None:
            # yaml.safe_load yields None for an empty document.
            return
        if not isinstance(config_dict, dict):
            raise TypeError(f"Configuration root must be a mapping, got {type(config_dict).__name__}")

        for hub_name, hub_data in (config_dict.get("data_hubs") or {}).items():
            existing = self.data_hubs.get(hub_name)
            if existing is None:
                self.data_hubs[hub_name] = self._parse_data_hub_config(hub_data)
                continue
            # Merge: override only the keys specified in the mapping
            hub_data = hub_data or {}
            if "enabled" in hub_data:
                existing.enabled = hub_data["enabled"]
            existing.settings.update(
                {k: v for k, v in hub_data.items() if k not in self._DATA_HUB_KNOWN_FIELDS}
            )

        # Restrict plain assignments to declared dataclass fields so that a
        # stray key cannot shadow a method or other attribute on the instance.
        field_names = {f.name for f in fields(self)}
        for key, value in config_dict.items():
            if key == "data_hubs":
                continue
            if key == "http_timeouts":
                if isinstance(value, dict):
                    # Unspecified timeouts keep their current value rather
                    # than being reset to the hard-coded constants.
                    current = self.http_timeouts
                    self.http_timeouts = HttpTimeoutConfig(
                        total=value.get("total", current.total),
                        sock_connect=value.get("sock_connect", current.sock_connect),
                        sock_read=value.get("sock_read", current.sock_read),
                    )
                elif value is not None:
                    logger.warning(f"Ignoring http_timeouts: expected a mapping, got {type(value).__name__}")
                continue
            if key in field_names:
                setattr(self, key, value)

    @classmethod
    def _get_hardcoded_defaults(cls) -> Dict[str, Any]:
        """Get hardcoded default configuration.

        Returns:
            A fresh mapping with the same layout as the YAML configuration.
        """
        return {
            "parallel_requests": DEFAULT_PARALLEL_REQUESTS,
            "download_batch_size": DEFAULT_DOWNLOAD_BATCH_SIZE,
            "fluxnet_shuttle_referer": DEFAULT_FLUXNET_SHUTTLE_REFERER,
            "data_hubs": {
                "ameriflux": {"enabled": True},
                "icos": {"enabled": True},
                "tern": {"enabled": True},
            },
        }

    @classmethod
    def _create_default_config(cls) -> "ShuttleConfig":
        """Create default configuration object from the hardcoded defaults."""
        config = cls()
        config._apply_overrides(cls._get_hardcoded_defaults())
        return config

    @staticmethod
    def _parse_data_hub_config(data: Optional[Dict[str, Any]]) -> "DataHubConfig":
        """Parse a data hub config dict into a DataHubConfig.

        Known fields (``enabled``) are set as dataclass attributes.
        All other keys are stored in the ``settings`` dict so they
        can be forwarded to the plugin instance.

        Args:
            data: Mapping for one data hub; ``None`` (a hub listed without
                any keys) is treated as an empty mapping.

        Returns:
            DataHubConfig: Parsed hub configuration; ``enabled`` defaults
            to ``True`` when the key is absent.
        """
        data = data or {}
        known_fields = ShuttleConfig._DATA_HUB_KNOWN_FIELDS
        enabled = data.get("enabled", True)
        settings = {k: v for k, v in data.items() if k not in known_fields}
        return DataHubConfig(enabled=enabled, settings=settings)