Source code for fluxnet_shuttle.core.decorators

"""
Decorators module
=================

:module:: fluxnet_shuttle.core.decorators
:synopsis: Decorators for FLUXNET Shuttle Library
:moduleauthor: Valerie Hendrix <vchendrix@lbl.gov>
:moduleauthor: Sy-Toan Ngo <sytoanngo@lbl.gov>
:platform: Unix, Windows
:created: 2025-10-09
:updated: 2025-12-09

.. currentmodule:: fluxnet_shuttle.core.decorators

This module provides decorators for converting between sync and async operations.
"""

import asyncio
import functools
from typing import Any, Callable, Generator, TypeVar

T = TypeVar("T")


[docs] def async_to_sync(func: Callable[..., Any]) -> Callable[..., Any]: """ Decorator: If there is no event loop, run synchronously """ # check if the function is a coroutine function if not asyncio.iscoroutinefunction(func): raise TypeError( f"The async_to_sync decorator can only be applied to async functions - '{func.__name__}' is not async." ) @functools.wraps(func) def function_wrapper(*args: Any, **kwargs: Any) -> Any: loop = asyncio.get_event_loop() if loop.is_running(): # If there is already a running event loop, just call the async function directly # This cannot be converted to sync in this case return func(*args, **kwargs) else: # No running event loop, so we can create one and run the async function synchronously # Create a new event loop for this sync call # Get the async function f = func(*args, **kwargs) loop = asyncio.get_event_loop() task = loop.create_task(f) loop.run_until_complete(task) return task.result() return function_wrapper
[docs] def async_to_sync_generator(func: Callable[..., Any]) -> Callable[..., Any]: """ Decorator that enables both async and sync usage of async generator methods. This decorator modifies the method so that: - When called with `async for`, it works as an async generator - When called with regular `for` or `list()`, it works as a sync iterable The detection is based on whether the caller tries to use the result as an async iterator (__aiter__) or sync iterator (__iter__). """ @functools.wraps(func) def wrapper(self: Any, *args: Any, **kwargs: Any) -> "_HybridAsyncSyncIterator": """Wrapper that returns a hybrid iterator object.""" return _HybridAsyncSyncIterator(func, self, *args, **kwargs) # Store reference to original function for direct access setattr(wrapper, "_original_async_func", func) return wrapper
class _HybridAsyncSyncIterator: """ A hybrid iterator that can work both as async and sync iterator. """ def __init__(self, async_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None: self._async_func = async_func self._args = args self._kwargs = kwargs self._async_gen: Any = None self._sync_gen: Any = None def __aiter__(self) -> "_HybridAsyncSyncIterator": """Return self for async iteration.""" return self async def __anext__(self) -> Any: """Async next method.""" if self._async_gen is None: # Call the original async function directly to get the actual async generator self._async_gen = self._async_func(*self._args, **self._kwargs) return await self._async_gen.__anext__() def __iter__(self) -> "_HybridAsyncSyncIterator": """Return iterator for sync iteration.""" if self._sync_gen is None: self._sync_gen = _sync_generator_wrapper(self._async_func, *self._args, **self._kwargs) return self def __next__(self) -> Any: """Sync next method.""" # Ensure the sync generator is initialized if self._sync_gen is None: # pragma: no cover self._sync_gen = _sync_generator_wrapper(self._async_func, *self._args, **self._kwargs) # Delegate to the sync generator's next method return next(self._sync_gen) # pragma: no cover def _sync_generator_wrapper(async_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Generator[Any, None, None]: """Convert async generator to sync generator.""" # Create a new event loop for this sync call loop = asyncio.get_event_loop() try: # Ensure no event loop is running assert not loop.is_running(), "Event loop is already running, cannot convert to sync generator" # Get the async generator loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async_gen = async_func(*args, **kwargs) # Convert to sync generator while True: try: # Get next item from async generator item = loop.run_until_complete(async_gen.__anext__()) yield item except StopAsyncIteration: break finally: # Clean up if "async_gen" in locals(): try: loop.run_until_complete(async_gen.aclose()) finally: loop.close()