"""
Plugin Registry and Error Collection
====================================
:module:: fluxnet_shuttle.core.registry
:synopsis: Plugin registry and error collection for FLUXNET Shuttle Library
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-10-09
:updated: 2025-12-09
.. currentmodule:: fluxnet_shuttle.core.registry
This module provides the plugin registry for managing data hub plugins
and error collection capabilities.
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List, Type
from ..models import ErrorSummary, FluxnetDatasetMetadata, PluginErrorDetail
from .base import DataHubPlugin
logger = logging.getLogger(__name__)
[docs]
@dataclass
class PluginErrorInfo:
    """
    Record of a single failure raised while running a plugin operation.

    Instances pair the exception object with the name of the plugin that
    produced it, the operation that was being executed, and the moment the
    record was created.
    """

    # Name of the plugin that produced the error.
    plugin_name: str
    # The exception object itself (not its string form).
    error: Exception
    # Creation time of this record; filled in with ``datetime.now()`` when omitted.
    timestamp: datetime = field(default_factory=datetime.now)
    # Name of the operation that was running; empty string when not supplied.
    operation: str = field(default="")
[docs]
class ErrorCollectingIterator:
    """
    Async iterator that collects errors while yielding results.

    This class implements the async iterator protocol and collects results
    from multiple plugins while isolating and collecting any errors that occur.
    A plugin that fails (missing operation, non-callable operation, operation
    that does not return an async iterable, or an exception raised while
    iterating) is recorded in ``errors`` and skipped; the remaining plugins
    keep producing results.

    Plugins are drained one after another in the order in which their
    iterators were started: each call to ``__anext__`` returns the next item
    of the first plugin that still has results.
    """

    def __init__(
        self,
        plugins: Dict[str, DataHubPlugin],
        operation: str,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the error collecting iterator.

        Args:
            plugins: Dictionary of plugin instances to iterate over
            operation: The operation being performed (e.g., 'get_sites')
            **kwargs: Arguments to pass to the plugin operation
        """
        self.plugins = plugins
        self.operation = operation
        self.kwargs = kwargs
        self.errors: List[PluginErrorInfo] = []
        self._results_count = 0
        # Active async iterators, keyed by plugin name.
        self._plugin_iterators: Dict[str, AsyncGenerator[FluxnetDatasetMetadata, None]] = {}
        # Plugins that are exhausted or failed; they are never started again.
        self._completed_plugins: set[str] = set()

    def __aiter__(self) -> "ErrorCollectingIterator":
        """Return self as the async iterator."""
        return self

    async def __anext__(self) -> FluxnetDatasetMetadata:
        """
        Get next result from any available plugin.

        Returns:
            FluxnetDatasetMetadata: Next available site metadata

        Raises:
            StopAsyncIteration: When no more results are available
        """
        # Start iterators for plugins that have not been started yet.
        self._start_pending_plugins()

        # Try to get the next result from any plugin that is still active.
        while self._plugin_iterators:
            # Iterate over a snapshot of the keys: entries are removed below.
            for plugin_name in list(self._plugin_iterators):
                try:
                    result = await self._plugin_iterators[plugin_name].__anext__()
                except StopAsyncIteration:
                    # This plugin is done.
                    del self._plugin_iterators[plugin_name]
                    self._completed_plugins.add(plugin_name)
                except Exception as e:
                    # Isolate the failure: record it and move on to the next plugin.
                    self._fail_plugin(plugin_name, e)
                else:
                    self._results_count += 1
                    return result

        # No more results from any plugin.
        raise StopAsyncIteration

    def _start_pending_plugins(self) -> None:
        """Create the async iterator of every plugin that is neither active nor completed."""
        for plugin_name, plugin in self.plugins.items():
            if plugin_name in self._plugin_iterators or plugin_name in self._completed_plugins:
                continue
            try:
                # Check that the plugin has the requested operation.
                if not hasattr(plugin, self.operation):
                    logger.warning(f"Plugin '{plugin_name}' does not have operation '{self.operation}'")
                    self._fail_plugin(plugin_name, AttributeError(f"Operation '{self.operation}' not found"))
                    continue

                # Check that the operation is callable.
                operation = getattr(plugin, self.operation)
                if not callable(operation):
                    logger.warning(f"Plugin '{plugin_name}' operation '{self.operation}' is not callable")
                    self._fail_plugin(plugin_name, TypeError(f"Operation '{self.operation}' is not callable"))
                    continue

                # Call the operation exactly once and reuse the returned object,
                # so no orphaned second generator is created and any side
                # effects of the call happen a single time.
                iterator = operation(**self.kwargs)
                if not hasattr(iterator, "__aiter__"):
                    logger.warning(f"Plugin '{plugin_name}' operation '{self.operation}' is not an async generator")
                    self._fail_plugin(
                        plugin_name, TypeError(f"Operation '{self.operation}' is not an async generator")
                    )
                    continue

                self._plugin_iterators[plugin_name] = iterator.__aiter__()
            except Exception as e:  # pragma: no cover
                # Reached when attribute access or the call itself raises,
                # e.g. the operation rejects the supplied keyword arguments.
                logger.warning(f"Error initializing plugin '{plugin_name}': {e}")
                self._fail_plugin(plugin_name, e)

    def _fail_plugin(self, plugin_name: str, error: Exception) -> None:
        """Record ``error`` for ``plugin_name`` and retire the plugin from further iteration."""
        self.add_error(plugin_name, error, self.operation)
        self._completed_plugins.add(plugin_name)
        self._plugin_iterators.pop(plugin_name, None)

    def add_error(self, plugin_name: str, error: Exception, operation: str = "") -> None:
        """
        Add an error to the collection.

        The error is appended to ``errors`` and a warning is logged.

        Args:
            plugin_name: Name of the plugin that encountered the error
            error: The exception that occurred
            operation: The operation being performed when the error occurred
        """
        self.errors.append(PluginErrorInfo(plugin_name=plugin_name, error=error, operation=operation))
        logger.warning(f"Plugin '{plugin_name}' error in '{operation}': {error}")

    def has_errors(self) -> bool:
        """
        Check if any errors were collected.

        Returns:
            bool: True if any errors occurred
        """
        return bool(self.errors)

    def get_error_summary(self) -> ErrorSummary:
        """
        Get summary of all errors.

        Returns:
            ErrorSummary: Pydantic model containing error summary information
            (error count, number of results yielded so far, and one detail
            entry per collected error)
        """
        error_details = [
            PluginErrorDetail(
                data_hub=error.plugin_name,
                operation=error.operation,
                error=str(error.error),
                error_type=type(error.error).__name__,
                timestamp=error.timestamp.isoformat(),
            )
            for error in self.errors
        ]
        return ErrorSummary(
            total_errors=len(self.errors),
            total_results=self._results_count,
            errors=error_details,
        )
[docs]
class PluginRegistry:
    """
    Registry for managing data hub plugins.

    The registry maps lower-cased plugin names to plugin classes and can
    create configured plugin instances on request. Names are taken from the
    ``name`` attribute of a default-constructed plugin instance.
    """

    def __init__(self) -> None:
        """Initialize the plugin registry."""
        # Registered plugin classes, keyed by lower-cased plugin name.
        self._plugins: Dict[str, Type[DataHubPlugin]] = {}
        self._instances: Dict[str, DataHubPlugin] = {}

    def register(self, plugin_class: Type[DataHubPlugin]) -> None:
        """
        Register a data hub plugin.

        Args:
            plugin_class: The plugin class to register

        Raises:
            TypeError: If ``plugin_class`` is not a subclass of DataHubPlugin
            ValueError: If a plugin with the same (case-insensitive) name is
                already registered
        """
        # isinstance guard: issubclass() itself raises a less helpful TypeError
        # when handed something that is not a class.
        if not (isinstance(plugin_class, type) and issubclass(plugin_class, DataHubPlugin)):
            raise TypeError("Plugin class must inherit from DataHubPlugin")

        # The name lives on the instance, so build one temporary instance
        # (only one) to read it.
        plugin_name = plugin_class().name.lower()

        # Reject duplicate names.
        if plugin_name in self._plugins:
            raise ValueError(f"Plugin with name '{plugin_name}' is already registered.")

        self._plugins[plugin_name] = plugin_class
        logger.debug(f"Registered plugin: {plugin_name}")

    def get_plugin(self, name: str) -> Type[DataHubPlugin]:
        """
        Get a plugin class by name.

        The lookup is case-insensitive.

        Args:
            name: Plugin name

        Returns:
            The registered plugin class

        Raises:
            ValueError: If plugin is not found
        """
        plugin = self._plugins.get(name.lower())
        if plugin is None:
            raise ValueError(f"Plugin with name '{name}' not found. Available plugins: {self.list_plugins()}")
        return plugin

    def list_plugins(self) -> List[str]:
        """
        List all registered plugin names.

        Returns:
            List of plugin names (lower-cased, in registration order)
        """
        return list(self._plugins)

    def create_instance(
        self,
        name: str,
        **config: Any,
    ) -> DataHubPlugin:
        """
        Create an instance of a plugin.

        The keyword arguments are passed to the plugin class as a single
        ``config`` dictionary.

        Args:
            name: Plugin name
            **config: Configuration parameters

        Returns:
            Plugin instance

        Raises:
            ValueError: If plugin is not found
            Exception: Any error raised by the plugin constructor is logged
                and re-raised unchanged
        """
        try:
            plugin_class = self.get_plugin(name)
            return plugin_class(config=config)
        except Exception as e:
            logger.error(f"Error creating plugin instance '{name}': {e}")
            # Bare raise re-raises the active exception with its original traceback.
            raise
# Module-level PluginRegistry instance, created once when this module is imported.
registry = PluginRegistry()