"""
Core functionality for FLUXNET Shuttle operations
:module: fluxnet_shuttle.shuttle
:moduleauthor: Gilberto Z. Pastorello <gzpastorello@lbl.gov>
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2024-10-31
:updated: 2025-12-09
.. currentmodule:: fluxnet_shuttle.shuttle
This module provides the core functionality for FLUXNET Shuttle operations,
including data discovery, download, and source management across multiple
FLUXNET data hubs.
The shuttle module serves as the main interface for interacting with different
FLUXNET data sources through a unified API.
Metadata Requirements
---------------------
Metadata fields:
* SITE_ID
* SITE_NAME
* TEAM_MEMBER_NAME
* TEAM_MEMBER_EMAIL
* TEAM_MEMBER_ROLE
* LOCATION_LAT
* LOCATION_LONG
* IGBP
* NETWORK (network affiliations)
* FLUXNET_PRODUCT_NAME
* PRODUCT_ID
* PRODUCT_CITATION
* PRODUCT_SOURCE_NETWORK
* FIRST_YEAR
* LAST_YEAR
* DOWNLOAD_LINK
* ONEFLUX_CODE_VERSION
* DATA_HUB
"""
import asyncio
import csv
import logging
import os
import re
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import unquote, urlparse
import aiofiles
from fluxnet_shuttle import FLUXNETShuttleError
from fluxnet_shuttle.core.decorators import async_to_sync
from fluxnet_shuttle.core.shuttle import FluxnetShuttle
_log = logging.getLogger(__name__)
# FLUXNET filename pattern: <network_id>_<site_id>_FLUXNET_<year_range>_<version>_<run>.zip
# Capture groups: 1=network_id, 2=site_id, 3=first_year, 4=last_year, 5=version, 6=run
_FLUXNET_ZIP_PATTERN = r"^([A-Z]{2,10})_([A-Z]{2}-[A-Za-z0-9]{3})_FLUXNET_(\d{4})-(\d{4})_(v\d+(?:\.\d+)?)_(r\d+)\.zip$"
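# Illustrative match (hypothetical filename): "AMF_US-xyz_FLUXNET_2001-2020_v3_r1.zip"
# yields the capture groups ("AMF", "US-xyz", "2001", "2020", "v3", "r1").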
# Delimiter for concatenating multiple values in CSV (e.g., team members)
CSV_MULTI_VALUE_DELIMITER = ";"
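# e.g., two team members would be stored as "Jane Doe;John Smith" (illustrative names)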
def _extract_filename_from_url(url: str) -> str:
"""
Extract a clean filename from a URL by removing query parameters.
:param url: URL to extract filename from
:type url: str
:return: Extracted filename without query parameters
:rtype: str
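
Example (the URL below is hypothetical; the query string is discarded)::

    >>> url = "https://data.example.org/AMF_US-xyz_FLUXNET_2001-2020_v3_r1.zip?token=abc"
    >>> _extract_filename_from_url(url)
    'AMF_US-xyz_FLUXNET_2001-2020_v3_r1.zip'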
"""
parsed_url = urlparse(url)
# Get the path component (without query parameters)
path = parsed_url.path
# Extract the last part of the path as filename
filename = unquote(path.split("/")[-1])
return filename
@async_to_sync
async def _download_dataset(
site_id: str,
data_hub: str,
filename: str,
download_link: str,
output_dir: str = ".",
**kwargs: Any,
) -> str:
"""
Download a FLUXNET dataset for a specific site using the shuttle orchestrator.
This function delegates to the shuttle orchestrator's download_dataset method,
which handles plugin selection and error collection. The function receives
byte chunks from the orchestrator and handles file I/O using the
filename from the snapshot metadata.
:param site_id: Site identifier
:type site_id: str
:param data_hub: Data hub name (e.g., "AmeriFlux", "ICOS")
:type data_hub: str
:param filename: Filename from snapshot metadata
:type filename: str
:param download_link: Ready-to-use URL to download data from
:type download_link: str
:param output_dir: Directory to save downloaded files (default: current directory)
:type output_dir: str
:param kwargs: Additional keyword arguments.
- user_info: Dictionary with plugin-specific user tracking info (e.g., {"ameriflux": {...}})
Other kwargs are passed through to the plugin's download_file method.
:return: The filepath where the file was saved
:rtype: str
:raises FLUXNETShuttleError: If download fails
"""
_log.info(f"{data_hub}: downloading site {site_id} data file: {filename}")
try:
# Create shuttle instance to handle plugin orchestration
shuttle = FluxnetShuttle()
# Add filename to kwargs and pass everything to the orchestrator
kwargs["filename"] = filename
# Get byte chunks from orchestrator, which handles plugin selection and error collection
filepath = os.path.join(output_dir, filename)
# Warn if file already exists and will be overwritten
if os.path.exists(filepath):
_log.warning(f"{data_hub}: file already exists and will be overwritten: {filepath}")
# Write the stream to file
with open(filepath, "wb") as file:
async for chunk in shuttle.download_dataset(
site_id=site_id, data_hub=data_hub.lower(), download_link=download_link, **kwargs
):
file.write(chunk)
_log.info(f"{data_hub}: file downloaded successfully to {filepath}")
return filepath
except Exception as e:
msg = f"Failed to download {data_hub} file for site {site_id}: {e}"
_log.error(msg)
raise FLUXNETShuttleError(msg) from e
return "" # Should not reach here #pragma: no cover
@async_to_sync
async def download(
site_ids: Optional[List[str]] = None,
snapshot_file: str = "",
output_dir: str = ".",
**kwargs: Any,
) -> List[str]:
"""
Download FLUXNET data for specified sites using configuration from a snapshot file.
Downloads are performed concurrently.
:param site_ids: List of site IDs to download data for. If None or empty, downloads all sites from snapshot file.
:type site_ids: Optional[List[str]]
:param snapshot_file: Path to CSV snapshot file containing site configuration
:type snapshot_file: str
:param output_dir: Directory to save downloaded files (default: current directory)
:type output_dir: str
:param kwargs: Additional keyword arguments passed to _download_dataset.
- user_info: Dictionary with plugin-specific user info (e.g., {"ameriflux": {...}})
:return: List of file paths of the successfully downloaded files
:rtype: List[str]
:raises FLUXNETShuttleError: If snapshot_file is invalid or sites not found
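
Example (the site ID and snapshot file name are illustrative; the snapshot is
assumed to have been produced by :func:`listall`)::

    downloaded_files = download(
        site_ids=["US-xyz"],
        snapshot_file="fluxnet_shuttle_snapshot_20250101T000000.csv",
        output_dir="./downloads",
    )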
"""
if not snapshot_file:
msg = "No snapshot file provided."
_log.error(msg)
raise FLUXNETShuttleError(msg)
# Load CSV snapshot file
if not os.path.exists(snapshot_file):
msg = f"Snapshot file {snapshot_file} does not exist."
_log.error(msg)
raise FLUXNETShuttleError(msg)
with open(snapshot_file, "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f)
fields = next(reader)
sites = {}
for line in reader:
site = {field: line[i] for i, field in enumerate(fields)}
sites[site["site_id"]] = site
_log.debug(f"Loaded {len(sites)} sites from snapshot file")
# If no site IDs specified, download all sites from snapshot
if not site_ids:
site_ids = list(sites.keys())
_log.info(f"No site IDs specified. Will download all {len(site_ids)} sites from snapshot file.")
_log.info(f"Starting download with {len(site_ids)} site IDs: {site_ids} and snapshot file: {snapshot_file}")
# Check if site IDs are in the snapshot file
for site_id in site_ids:
if site_id not in sites:
msg = f"Site ID {site_id} not found in snapshot file."
_log.error(msg)
raise FLUXNETShuttleError(msg)
_log.debug("All site IDs found in snapshot file")
# Build download jobs (skip sites without filenames)
download_jobs = []
for site_id in site_ids:
site = sites[site_id]
data_hub = site["data_hub"]
download_link = site["download_link"]
filename = site.get("fluxnet_product_name")
if not filename:
_log.error(f"No filename found for site {site_id} from data hub {data_hub}. Skipping download.")
continue
_log.info(f"Downloading data for site {site_id} from data hub {data_hub}")
download_jobs.append(
{
"site_id": site_id,
"data_hub": data_hub,
"filename": filename,
"download_link": download_link,
}
)
if not download_jobs:
_log.info("No valid downloads found after processing snapshot file.")
return []
tasks = [
_download_dataset(
site_id=job["site_id"],
data_hub=job["data_hub"],
filename=job["filename"],
download_link=job["download_link"],
output_dir=output_dir,
**kwargs,
)
for job in download_jobs
]
results = await asyncio.gather(*tasks, return_exceptions=True)
downloaded_filenames: list[str] = []
for job, result in zip(download_jobs, results):
if isinstance(result, BaseException):
_log.error(f"Failed to download {job['filename']} for site {job['site_id']}: {result}")
else:
downloaded_filenames.append(result)
_log.info(f"Downloaded data for {len(downloaded_filenames)}/{len(download_jobs)} sites")
return downloaded_filenames
@async_to_sync
async def listall(data_hubs: Optional[List[str]] = None, output_dir: str = ".") -> str:
"""
List all available FLUXNET data from specified data hubs.
:param data_hubs: List of data hub plugin names to include (e.g., ["ameriflux", "icos"]).
If None or empty, all available data hub plugins are included.
:type data_hubs: Optional[List[str]]
:param output_dir: Directory to save the snapshot file (default: current directory)
:type output_dir: str
:return: Path to the CSV snapshot file containing data availability information
:rtype: str
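
Example (hub names depend on the installed data hub plugins)::

    snapshot_path = listall(data_hubs=["ameriflux", "icos"], output_dir=".")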
"""
# If data_hubs is None or empty list, pass None to FluxnetShuttle to use all available plugins
if data_hubs is not None and len(data_hubs) == 0:
data_hubs = None
_log.debug(f"Data hubs to include: {data_hubs if data_hubs else 'all available'}")
shuttle = FluxnetShuttle(data_hubs=data_hubs)
# Combine data from all data hubs
fields = [
# Site information fields
"data_hub",
"site_id",
"site_name",
"location_lat",
"location_long",
"igbp",
"network",
# Team member fields (concatenated with delimiter)
"team_member_name",
"team_member_role",
"team_member_email",
# Product data fields
"first_year",
"last_year",
"download_link",
"fluxnet_product_name",
"product_citation",
"product_id",
"oneflux_code_version",
"product_source_network",
]
csv_filename = f"fluxnet_shuttle_snapshot_{datetime.now().strftime('%Y%m%dT%H%M%S')}.csv"
csv_filepath = os.path.join(output_dir, csv_filename)
counts = await _write_snapshot_file(shuttle, fields, csv_filepath)
_log.info(f"Wrote FLUXNET dataset snapshot to {csv_filepath}")
_log.info(f"Data hub counts: {counts}")
return csv_filepath
@async_to_sync
async def _write_snapshot_file(shuttle: FluxnetShuttle, fields: List[str], csv_filename: str) -> Dict[str, int]:
"""
Write FLUXNET metadata snapshot to a CSV file.
Creates a snapshot file containing complete metadata for all available
FLUXNET data from configured data hubs, including site information,
team members, and product details.
:param shuttle: FluxnetShuttle instance
:param fields: List of fields to include in the CSV
:param csv_filename: Output CSV filename path
:return: Dictionary with counts of sites per data hub
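
An illustrative (hypothetical) return value would be ``{"ameriflux": 120, "icos": 45}``.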
"""
counts: Dict[str, int] = {}
# Tally the number of sites per data hub as rows are written
# Write rows to the CSV file using async file I/O (aiofiles)
async with aiofiles.open(csv_filename, "w", encoding="utf-8") as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=fields)
await csv_writer.writeheader()
async for site in shuttle.get_all_sites():
counts.setdefault(site.site_info.data_hub, 0)
counts[site.site_info.data_hub] += 1
# Get site info fields (excluding team members and network which need special handling)
site_dict = site.site_info.model_dump(exclude={"group_team_member", "network"})
# Concatenate network values with delimiter
network_list = site.site_info.network
site_dict["network"] = CSV_MULTI_VALUE_DELIMITER.join(network_list) if network_list else ""
# Concatenate team member fields with delimiter
team_members = site.site_info.group_team_member
site_dict["team_member_name"] = (
CSV_MULTI_VALUE_DELIMITER.join([tm.team_member_name for tm in team_members]) if team_members else ""
)
site_dict["team_member_role"] = (
CSV_MULTI_VALUE_DELIMITER.join([tm.team_member_role for tm in team_members]) if team_members else ""
)
site_dict["team_member_email"] = (
CSV_MULTI_VALUE_DELIMITER.join([tm.team_member_email for tm in team_members]) if team_members else ""
)
# Add product data fields
product_dict = site.product_data.model_dump()
# Convert HttpUrl to string for CSV
product_dict["download_link"] = str(product_dict["download_link"])
site_dict.update(product_dict)
await csv_writer.writerow(site_dict)
return counts
# This module is not meant to be executed directly
if __name__ == "__main__":
sys.exit("ERROR: cannot run independently")