Source code for eta_nexus.subhandlers.multi_subhandler

from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from datetime import datetime
    from typing import Any

    from eta_nexus.nodes import Node


from eta_nexus.subhandlers.subhandler import SubscriptionHandler

# Module-level logger, named after this module via ``__name__``.
log = getLogger(__name__)


class MultiSubHandler(SubscriptionHandler):
    """Distribute subscribed values to multiple subscription handlers.

    Handlers are added with :meth:`register`. Every value received through
    :meth:`push` is forwarded to each registered handler in registration order,
    and :meth:`close` closes each of them.
    """

    def __init__(self) -> None:
        super().__init__()
        # Registered sub-handlers, kept in the order they were registered.
        self._handlers: list[SubscriptionHandler] = []

    def register(self, sub_handler: SubscriptionHandler) -> None:
        """Register a subscription handler.

        :param SubscriptionHandler sub_handler: SubscriptionHandler object to use for handling subscriptions.
        :raises TypeError: If ``sub_handler`` is not an instance of SubscriptionHandler.
        """
        if not isinstance(sub_handler, SubscriptionHandler):
            raise TypeError("Subscription Handler should be an instance of the SubscriptionHandler class.")
        self._handlers.append(sub_handler)

    def push(self, node: Node, value: Any, timestamp: datetime | None = None) -> None:
        """Receive data from a subscription and push it to all registered sub-handlers.

        This should contain the node that was requested, a value and a timestamp
        when data was received.

        An exception raised by a sub-handler is not caught here: it propagates to
        the caller and the handlers registered after it do not receive the value.

        :param node: Node object the data belongs to.
        :param value: Value of the data.
        :param timestamp: Timestamp of receiving the data.
        """
        for handler in self._handlers:
            handler.push(node, value, timestamp)

    def close(self) -> None:
        """Finalize and close all subscription handlers.

        A failure while closing one handler is logged (with traceback) and does
        not prevent the remaining handlers from being closed.
        """
        for handler in self._handlers:
            try:
                handler.close()
            except Exception:
                # Deliberately broad: closing is best-effort, every handler must
                # get its chance to close. Lazy %-style arguments leave message
                # formatting to the logging framework.
                log.exception("Failed to close subscription handler %s.", handler)