Source code for eta_nexus.nodes.modbus_node

"""This module implements the node class, which is used to parametrize connections."""

from __future__ import annotations

import struct
from logging import getLogger
from typing import TYPE_CHECKING

from attrs import (
    field,
    validators as vld,
)

from eta_nexus.nodes.node import Node
from eta_nexus.nodes.node_utils import _lower_str
from eta_nexus.util import dict_get_any

if TYPE_CHECKING:
    from collections.abc import Sequence
    from typing import Any

    from eta_nexus.util.type_annotations import Primitive, Self


log = getLogger(__name__)

_FLOAT_TYPES = {2: "e", 4: "f", 8: "d"}
_INT_TYPES = {1: "b", 2: "h", 4: "i", 8: "q"}


def _mb_endianness_converter(value: str) -> str:
    """Convert some values for mb_byteorder.

    :param value: Value to be converted to mb_byteorder
    :return: mb_byteorder corresponding to correct scheme.
    """
    value = _lower_str(value)
    if value in {"little", "littleendian"}:
        return "little"

    if value in {"big", "bigendian"}:
        return "big"

    return ""


[docs] def bitarray_to_registers(bits: list[int | bool]) -> list[int]: """Convert a list of bits into a list of 16 bit 'bytes'.""" # Make sure that _bits is a list of integers, not bools. _bits = [int(x) for x in bits] if isinstance(bits[0], bool) else bits b_size = (len(_bits) + 15) // 16 register_list = [0] * b_size for i in range(b_size): start = i * 16 register_list[i] = int("".join([str(v) for v in _bits[start : start + 16]]), 2) return register_list
[docs] class ModbusNode(Node, protocol="modbus"): """Node for the Modbus protocol.""" #: Modbus Slave ID mb_slave: int | None = field(kw_only=True, default=32, converter=int) #: Modbus Register name. One of input, discrete_input, coils and holding. Note that only coils and #: holding can be written to. mb_register: str = field( kw_only=True, converter=_lower_str, validator=vld.in_(("input", "discrete_input", "coils", "holding")) ) #: Modbus Channel (Address of the value) mb_channel: int = field(kw_only=True, converter=int) #: Length of the value in bits (default 32). This determines, how much data is read from the server. The #: value must be a multiple of 16. mb_bit_length: int = field(kw_only=True, default=32, converter=int, validator=vld.ge(1)) #: Byteorder of values returned by modbus mb_byteorder: str = field(kw_only=True, converter=_mb_endianness_converter, validator=vld.in_(("little", "big"))) #: Wordorder of values returned by modbus mb_wordorder: str = field( default="big", kw_only=True, converter=_mb_endianness_converter, validator=vld.in_(("little", "big")) ) def __attrs_post_init__(self) -> None: """Add default port to the URL and convert mb_byteorder values.""" super().__attrs_post_init__() # Set port to default 502 if it was not explicitly specified if not isinstance(self.url_parsed.port, int): url = self.url_parsed._replace(netloc=f"{self.url_parsed.hostname}:502") object.__setattr__(self, "url", url.geturl()) object.__setattr__(self, "url_parsed", url) # Set default type of float if self.dtype is None: object.__setattr__(self, "dtype", float) if self.dtype not in (int, float, str, bytes, bool): raise ValueError( f"The given modbus data type was not recognized: {self.dtype}. Please use int, float, str, bytes, bool." ) @classmethod def _from_dict(cls, dikt: dict[str, Any]) -> Self: """Create a modbus node from a dictionary of node information. :param dikt: dictionary with node information. :return: ModbusNode object. """ name, pwd, url, usr, interval = cls._read_dict_info(dikt) # Initialize node if protocol is 'modbus' try: mb_register = cls._try_dict_get_any(dikt, "mb_register", "modbusregistertype") mb_channel = cls._try_dict_get_any(dikt, "mb_channel", "modbuschannel") mb_byteorder = cls._try_dict_get_any(dikt, "mb_byteorder", "modbusbyteorder") mb_wordorder = dict_get_any(dikt, "mb_wordorder", "modbuswordorder", fail=False, default="big") mb_slave = dict_get_any(dikt, "mb_slave", "modbusslave", fail=False, default=32) mb_bit_length = dict_get_any(dikt, "mb_bit_length", "mb_bitlength", fail=False, default=32) dtype = dict_get_any(dikt, "dtype", "datentyp", fail=False) except KeyError as e: raise KeyError( f"The required parameter for the node configuration was not found (see log). The node {name} could " f"not load." ) from e try: return cls( name, url, "modbus", usr=usr, pwd=pwd, mb_register=mb_register, mb_slave=mb_slave, mb_channel=mb_channel, mb_bit_length=mb_bit_length, mb_byteorder=mb_byteorder, mb_wordorder=mb_wordorder, dtype=dtype, interval=interval, ) except (TypeError, AttributeError) as e: raise TypeError(f"Could not convert all types for node {name}.") from e
[docs] def decode_modbus_value(self, value: Sequence[int]) -> Any: """Decode incoming modbus values. Strings are always decoded as utf-8 values. If you do not want this behaviour, specify 'bytes' as the data type for the Node. :param value: Current value to be decoded :return: Decoded value as the Node's data type. """ # Boolean values don't need decoding if self.dtype is bool: if len(value) > 1: raise ValueError(f"Length of boolean values mustn't exceed one, got {len(value)}") return bool(value[0]) bo = "<" if self.mb_byteorder == "little" else ">" # Swap words if word order is little endian if self.dtype in (int, float) and self.mb_wordorder == "little": value = value[::-1] dtype, _len = self._get_decode_params(value) # Determine the format strings for packing and unpacking the received byte sequences. These format strings # depend on the endianness (determined by bo), the length of the value in bytes and the data type. pack = f">{len(value):1d}H" unpack = f"{bo}{_len}{dtype}" # Convert the value into the appropriate format val: Any = struct.unpack(unpack, struct.pack(pack, *value))[0] if self.dtype is str: try: val = str(val, encoding="utf-8") except UnicodeDecodeError: log.exception(f"Could not convert value {val} to string") val = "" else: val = val.rstrip("\x00") elif self.dtype is not None: val = self.dtype(val) else: val = float(val) return val
[docs] def encode_bits(self, value: Primitive) -> list[int]: """Encode python data type to modbus value. This means an array of bytes to send to a modbus server. :param value: Current value to be decoded into float. :return: Decoded value as a python type. """ # Make sure that value is of the type specified by the node. if self.dtype is not None: value = self.dtype(value) if self.dtype is str: value = bytes(str(value), encoding="utf-8") _type, _len = self._get_encode_params(value) bo = "<" if self.mb_byteorder == "little" else ">" try: byte = struct.pack(f"{bo}{_len}{_type}", value) except struct.error as e: raise ValueError(f"Could not convert value {value!r} to bits.") from e bitstrings = [f"{bin(x)[2:]:0>8}" for x in byte] return [int(z) for z in "".join(bitstrings)]
def _get_encode_params(self, value: Primitive) -> tuple[str, int]: byte_length: int = self.mb_bit_length // 8 try: if isinstance(value, int): type_format = _INT_TYPES[byte_length] if value >= 0: type_format = type_format.upper() # Use unsigned integer return type_format, 1 if isinstance(value, (str, bytes)): return "s", byte_length if isinstance(value, float) or (value := float(value)): return _FLOAT_TYPES[byte_length], 1 except KeyError as e: raise ValueError( f"The length of the value ({byte_length}) does not match the data type: {type(value)}" ) from e # Fallback for unsupported types raise TypeError(f"Unsupported value type: {type(value)}") def _get_decode_params(self, value: Sequence[int]) -> tuple[str, int]: """Provide parameters for decoding incoming modbus values :param value: Incoming data :return: Struct format and length """ byte_length = len(value) * 2 # Conversion from 16 to 8 Bit length (Register to Byte) try: if self.dtype is int: return _INT_TYPES[byte_length], 1 if self.dtype is float: return _FLOAT_TYPES[byte_length], 1 except KeyError: raise ValueError( f"The length of the received value ({len(value)}) does not match the data type: {self.dtype}" ) from None else: # str and bytes return "s", byte_length