Source code for eta_nexus.connections.connection_utils

from __future__ import annotations

from asyncio import sleep as async_sleep
from datetime import datetime
from time import sleep
from typing import TYPE_CHECKING

from eta_nexus.util import ensure_timezone

if TYPE_CHECKING:
    from collections.abc import Sequence
    from typing import Any, Final

    import pandas as pd

    from eta_nexus.nodes import Node
    from eta_nexus.util.type_annotations import TimeStep


[docs] class RetryWaiter: """Helper class which keeps track of waiting time before retrying a connection.""" VALUES: Final[list[int]] = [0, 1, 3, 5, 5, 10, 20, 30, 40, 60] def __init__(self) -> None: self.counter = 0
[docs] def tried(self) -> None: """Register a retry with the RetryWaiter.""" self.counter += 1
[docs] def success(self) -> None: """Register a successful connection with the RetryWaiter.""" self.counter = 0
@property def wait_time(self) -> int: """Return the time to wait for.""" if self.counter >= len(self.VALUES) - 1: return self.VALUES[-1] return self.VALUES[self.counter]
[docs] def wait(self) -> None: """Wait/sleep synchronously.""" sleep(self.wait_time)
[docs] async def wait_async(self) -> None: """Wait/sleep asynchronously - must be awaited.""" await async_sleep(self.wait_time)
[docs] class IntervalChecker: """Class for the subscription interval checking.""" def __init__(self) -> None: #: Dictionary that stores the value and the time for checking changes and the time interval self.node_latest_values: dict[Node, list] = {} #: :py:func:`eta_nexus.util.ensure_timezone` self._assert_tz_awareness = ensure_timezone
[docs] def push( self, node: Node, value: Any | pd.Series | Sequence[Any], timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None, ) -> None: """Push value and time in dictionary for a node. If the value doesn't change compared to the previous timestamp, the push is skipped. :param node: Node to check. :param value: Value from the subscription. :param timestamp: Time of the incoming value of the node. """ if node in self.node_latest_values: if value != self.node_latest_values[node][0]: self.node_latest_values[node] = [value, timestamp] elif node.interval is not None: self.node_latest_values[node] = [value, timestamp]
[docs] def check_interval_connection(self) -> bool | None: """Check the interval between old and new value. If no interval has been defined, the check interval is skipped. :return: Boolean for the interval check. """ # Get the current time to compare the interval time = self._assert_tz_awareness(datetime.now()) if len(self.node_latest_values) > 0: for node in self.node_latest_values: _time_since_last_check = ( (time - self.node_latest_values[node][1]).total_seconds() if node in self.node_latest_values else None ) if node in self.node_latest_values and _time_since_last_check is not None: if _time_since_last_check <= float(node.interval): # type: ignore[arg-type] _changed_within_interval = True else: _changed_within_interval = False break else: _changed_within_interval = True else: _changed_within_interval = True return _changed_within_interval