Source code for eta_nexus.nodes.emonio_node

from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING

import pandas as pd
from attrs import (
    field,
    validators as vld,
)

from eta_nexus.nodes.node import Node
from eta_nexus.nodes.node_utils import _lower_str

if TYPE_CHECKING:
    from typing import Any, Final


log = getLogger(__name__)


[docs] class EmonioConstants: """Dict constants for the Emonio API.""" #: Mapping of parameters to addresses PARAMETER_MAP: Final[dict[int, list[str]]] = { 0: ["VRMS", "V_RMS", "Voltage", "V", "Spannung"], 2: ["IRMS", "I_RMS", "Current", "I", "Strom"], 4: ["WATT", "Power", "W", "Leistung", "Wirkleistung"], 6: ["VAR", "Reactive Power", "VAR", "Blindleistung"], 8: ["VA", "Apparent Power", "VA", "Scheinleistung"], 10: ["FREQ", "Frequency", "Hz", "Frequenz"], 12: ["KWH", "Energy", "kWh", "Energie"], 14: ["PF", "Power Factor", "PF", "Leistungsfaktor"], 20: ["VRMS MIN", "VRMS_MIN", "Voltage Min", "V Min", "Spannung Min"], 22: ["VRMS MAX", "VRMS_MAX", "Voltage Max", "V Max", "Spannung Max"], 24: ["IRMS MIN", "IRMS_MIN", "Current Min", "I Min", "Strom Min"], 26: ["IRMS MAX", "IRMS_MAX", "Current Max", "I Max", "Strom Max"], 28: ["WATT MIN", "WATT_MIN", "Power Min", "W Min", "Leistung Min"], 30: ["WATT MAX", "WATT_MAX", "Power Max", "W Max", "Leistung Max"], 500: ["Temp", "degree", "Temperature", "°C", "Temperatur"], 800: ["Impulse", "Impuls"], } #: Create dictionary with all upper cased parameters UPPER_CASED: Final[dict[int, list[str]]] = { adr: [par.upper() for par in par_list] for (adr, par_list) in PARAMETER_MAP.items() } #: Mapping of phases to address offsets PHASE_MAP: Final[dict[str, int]] = { "a": 0, "b": 100, "c": 200, "abc": 300, }
[docs] class EmonioNode(Node, protocol="emonio"): """Node for the emonio. The parameter to read is specified by the name of the node. Available parameters are defined in the parameter_map class attribute. Additionally, the phase of the parameter can be specified, with 'a', 'b', 'c' or 'abc'. https://wiki.emonio.de/de/Emonio_P3 """ #: Modbus address of the parameter to read address: int = field(default=-1, kw_only=True, converter=int) #: Phase of the parameter (a, b, c). If not set, all phases are read phase: str = field(default="abc", kw_only=True, converter=_lower_str, validator=vld.in_(("a", "b", "c", "abc"))) def __attrs_post_init__(self) -> None: """Ensure that all required parameters are present and valid.""" super().__attrs_post_init__() if self.address == -1: address = self._translate_name() object.__setattr__(self, "address", address) _parameter = self.address % 100 _phase = self.address // 100 * 100 # Validate address if self.address in {500, 800}: pass elif _parameter not in EmonioConstants.PARAMETER_MAP or _phase not in EmonioConstants.PHASE_MAP.values(): raise ValueError(f"Address {self.address} for node {self.name} is not valid.") elif _parameter >= 20 and _parameter <= 30 and _phase == 300: raise ValueError("Phase must be set for MIN/MAX values") def _translate_name(self) -> int: """Translate the name of the node to the correct parameter name. :return: Modbus address of the parameter. """ parameter: int | None = None phase: int | None = None # Try to find matching parameter for the name for address in EmonioConstants.UPPER_CASED: # e.g. Server1.Voltage -> VOLTAGE parameter_str = self.name.split(".")[-1].upper() if parameter_str in EmonioConstants.UPPER_CASED[address]: parameter = address log.debug(f"Parameter {parameter_str} found at address {address}") break # If no parameter was found, raise an error if parameter is None: raise ValueError(f"Parameter for node {self.name} not found, name is not valid.") # Temperature and Impulse values do not have a phase if parameter in (500, 800): return parameter # Phase is set to 0, 100, 200 or 300. (300 is default) phase = EmonioConstants.PHASE_MAP[self.phase] # Return correct address (by adding the phase offset to the parameter) return parameter + phase @classmethod def _from_dict(cls, dikt: dict[str, Any]) -> EmonioNode: """Create an Emonio node from a dictionary of node information. :param dikt: dictionary with node information. :return: EmonioNode object. """ name, _, url, _, interval = cls._read_dict_info(dikt) phase = dikt.get("phase", "abc") phase = "abc" if pd.isna(phase) else phase address = dikt.get("address") address = -1 if pd.isna(address) else address try: return cls(name, url, "emonio", interval=interval, phase=phase, address=address) except (TypeError, AttributeError) as e: raise TypeError(f"Could not convert all types for node {name}.") from e