InfluxDB Connection

The InfluxDB 3 integration in eta_nexus consists of two parts: an InfluxNode, which describes where a signal lives in InfluxDB, and an InfluxConnection, which reads and writes those signals using the official influxdb-client-3. The connector talks to InfluxDB v3 via SQL and exchanges data as pandas DataFrame and Series objects.

What this module does

  • Read latest values per node using a generated SELECT that returns a single timestamped row. See read().

  • Read historic series over a half-open interval [from_time, to_time) into a time-indexed frame (one column per node/field). See read_series().

  • Write current values for one or many nodes (grouped by measurement/table). See write().

  • Write historic series from either a mapping {node: pd.Series} or a DataFrame whose columns match node fields. See write_series().

How nodes map to InfluxDB

  • InfluxNode carries:

      - database: the Influx bucket/database.
      - table: the measurement/table.
      - name: used as the Influx field/column name (also available as node.field).

  • Nodes are grouped by table for efficient reads/writes; each group yields one SQL query or write call.
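The grouping described above can be sketched in plain Python. This is an illustration only: NodeSpec, group_by_table and latest_value_query are hypothetical names, and the SQL text is an assumption about what a "latest value" query could look like, not the statement the connector actually generates.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeSpec:
    """Hypothetical stand-in for the InfluxNode attributes used here."""

    name: str      # field/column name
    table: str     # measurement/table
    database: str  # bucket/database


def group_by_table(nodes):
    """Group nodes by table, keeping the input order within each group."""
    groups = defaultdict(list)
    for node in nodes:
        groups[node.table].append(node)
    return dict(groups)


def latest_value_query(table, nodes):
    """Build one SELECT per table that returns the newest row (illustrative SQL)."""
    columns = ", ".join(f'"{n.name}"' for n in nodes)
    return f'SELECT time, {columns} FROM "{table}" ORDER BY time DESC LIMIT 1'


nodes = [
    NodeSpec("hum", "home", "foo"),
    NodeSpec("temp", "home", "foo"),
    NodeSpec("power", "machine", "foo"),
]

# Three nodes in two tables yield two queries.
queries = {
    table: latest_value_query(table, group)
    for table, group in group_by_table(nodes).items()
}
```

Grouping first means the number of round trips scales with the number of tables rather than the number of nodes.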

Authentication & configuration

  • Pass token=... to the connection or set environment variable INFLUXDB3_AUTH_TOKEN.

  • The target database can be given as database=...; if omitted it may be inferred from the first node or from INFLUXDB_DB (if set).

  • The connection inherits base settings from Connection (e.g., URL, user/password).
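The fallback order for credentials and database can be pictured as follows. This is a minimal sketch, not the connector's code: resolve_token and resolve_database are hypothetical helpers, the precedence of the first node over INFLUXDB_DB is an assumption, and so is raising ValueError when nothing is configured.

```python
import os


def resolve_token(token=None, pwd=None, env=os.environ):
    """Return the first available credential: explicit token, env variable, then pwd."""
    for candidate in (token, env.get("INFLUXDB3_AUTH_TOKEN"), pwd):
        if candidate:
            return candidate
    # Assumption: the real connector may report a missing token differently.
    raise ValueError("No InfluxDB token: pass token=... or set INFLUXDB3_AUTH_TOKEN.")


def resolve_database(database=None, first_node_database=None, env=os.environ):
    """Return the explicit database, else the first node's, else INFLUXDB_DB."""
    # Assumption: the node takes precedence over the environment variable.
    for candidate in (database, first_node_database, env.get("INFLUXDB_DB")):
        if candidate:
            return candidate
    raise ValueError("No database: pass database=... or set INFLUXDB_DB.")


# An explicit argument always wins over the environment.
explicit = resolve_token(token="a", pwd="b", env={"INFLUXDB3_AUTH_TOKEN": "c"})
```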

Data/Time handling

  • Time series are indexed by a datetime index named time (UTC recommended). The connector raises an error if the index is not datetime-like.

  • Reads return frames with requested columns ordered as the input node order.

  • For historic reads the interval is half-open [from_time, to_time); writes preserve exact timestamps.
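A half-open interval includes from_time and excludes to_time, so consecutive windows can be read without counting a boundary sample twice. The sketch below shows this with plain datetimes; in_window is a hypothetical helper, and the equivalent SQL predicate would be of the form time >= from_time AND time < to_time.

```python
from datetime import datetime, timedelta, timezone


def in_window(ts, from_time, to_time):
    """True if ts lies in the half-open interval [from_time, to_time)."""
    return from_time <= ts < to_time


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
hour = timedelta(hours=1)

# Eight samples at 15-minute spacing: 00:00, 00:15, ..., 01:45.
samples = [start + i * timedelta(minutes=15) for i in range(8)]

first_hour = [t for t in samples if in_window(t, start, start + hour)]
second_hour = [t for t in samples if in_window(t, start + hour, start + 2 * hour)]
```

The sample at 01:00 belongs only to the second window, so concatenating both windows reproduces the original series exactly.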

Notes & limitations

  • This connector targets InfluxDB v3 + SQL; it does not use Flux.

  • When writing a DataFrame, all columns must correspond to known node fields; unknown columns raise a ValueError.

  • Some server-side resampling parameters (e.g., interval) are accepted for API compatibility and may be ignored by the backend.
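The column check for DataFrame writes noted above amounts to a set-membership test. The following sketch uses plain lists instead of a DataFrame; check_columns is a hypothetical helper and the error message is an assumption, not the connector's wording.

```python
def check_columns(columns, known_fields):
    """Raise ValueError if any column is not a known node field."""
    unknown = [c for c in columns if c not in known_fields]
    if unknown:
        raise ValueError(f"Columns without a matching node field: {unknown}")


known_fields = {"hum", "temp"}

# All columns are known: no exception.
check_columns(["hum", "temp"], known_fields)

# One unknown column: the whole write is rejected.
try:
    check_columns(["hum", "co2"], known_fields)
except ValueError as err:
    message = str(err)
```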

See the API sections below for the full class/method reference and the example for a minimal end-to-end read.

Influx Node

class eta_nexus.nodes.InfluxNode(name: str, url: str, protocol: str, *args: Any, **kwargs: Any)

Node for the InfluxDB module.

database: str
table: str
property field: str

Alias for the underlying field/column name.

as_dict(*, filter_none: bool = False, **kwargs: Any) → dict[str, Any]

Return the attrs attribute values of the node instance as a dict.

Parameters:

filter_none – If True, omit attributes whose value is None. Defaults to False.

Returns:

dict of attribute values

as_tuple(*, filter_none: bool = False, **kwargs: Any) → tuple[Any, ...]

Return the attrs attribute values of the node instance as a tuple.

Parameters:

filter_none – If True, omit attributes whose value is None. Defaults to False.

Returns:

tuple of attribute values

connection_identifier() → str

Unique identifier for the connection associated with the node (i.e. the connection that Connection.from_node() would create).

evolve(**kwargs: Any) → Self

Returns a new node instance by copying the current node and changing only specified keyword arguments.

This allows for seamless node instantiation with only a few changes.

Parameters:

kwargs – Keyword arguments to change.

Returns:

New instance of the node.
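The copy-and-change pattern behind evolve() can be illustrated with the standard library. This is an analogy under stated assumptions: NodeSpec is a hypothetical frozen dataclass standing in for InfluxNode, and dataclasses.replace stands in for whatever mechanism the node class uses internally.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class NodeSpec:
    """Hypothetical stand-in for a node with three attributes."""

    name: str
    table: str
    database: str

    def evolve(self, **kwargs):
        """Return a copy with only the given attributes changed."""
        return replace(self, **kwargs)


hum = NodeSpec(name="hum", table="home", database="foo")

# Same table and database, different field name; the original is untouched.
temp = hum.evolve(name="temp")
```

With a real node, the documented signature suggests the equivalent call is node.evolve(name="temp").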

classmethod from_dict(dikt: Sequence[Mapping] | Mapping[str, Any], *, fail: bool = True) → list[Self]

Create nodes from a dictionary of node configurations. The configuration must specify the following fields for each node:

  • Code (or name), URL, Protocol (e.g. modbus, opcua or eneffco). The URL should be a complete network location identifier. Alternatively, the location can be specified in two fields, IP and Port, each containing only its respective part (only an IP address and only the port number). The IP address should always be given without a scheme (such as https://).

For local nodes no additional fields are required.

For Modbus nodes the following additional fields are required:

  • ModbusRegisterType (or mb_register), ModbusSlave (or mb_slave), ModbusChannel (or mb_channel).

For OPC UA nodes the following additional fields are required:

  • Identifier.

For Eneffco nodes the code field must be present.

For EntsoE nodes the endpoint field must be present.

Parameters:
  • dikt – Configuration dictionary.

  • fail – Set this to False to log errors instead of raising them.

Returns:

List of Node objects.

classmethod from_excel(path: Path, sheet_name: str, *, fail: bool = True) → list[Self]

Read nodes from an Excel document. The document must specify the following fields:

  • Code, IP, Port, Protocol (modbus, opcua or eneffco).

For Modbus nodes the following additional fields are required:

  • ModbusRegisterType, ModbusByte, ModbusChannel.

For OPC UA nodes the following additional fields are required:

  • Identifier.

For Eneffco nodes the Code field must be present.

The IP address should always be given without a scheme (such as https://).

Parameters:
  • path – Path to the Excel document.

  • sheet_name – Name of the Excel sheet to read.

  • fail – Set this to False to log errors instead of raising them.

Returns:

List of Node objects.

name: str

Name for the node.

url: str

URL of the connection.

url_parsed: ParseResult

Parse result object of the URL (in case more post-processing is required).

protocol: str

Protocol of the connection.

usr: str | None

Username for login to the connection (default: None).

pwd: str | None

Password for login to the connection (default: None).

interval: str | None

Interval

dtype: Callable | None

Data type of the node (for value conversion). Note that strings will be interpreted as utf-8 encoded. If you do not want this behaviour, use ‘bytes’.

Influx Connection

class eta_nexus.connections.InfluxConnection(*args: Any, **kwargs: Any)

InfluxDB v3 connection using SQL+Pandas.

Parameters (in addition to Connection):
  • database (str) – Database (a.k.a. bucket) to connect to. If omitted, it is inferred from the first provided node or from INFLUXDB_DB.

  • token (str, optional) – Auth token for InfluxDB v3. If omitted, INFLUXDB3_AUTH_TOKEN is tried first and finally pwd from the base connection.

logger: Logger = <Logger eta_nexus.connections.influx_connection (WARNING)>

read(nodes: InfluxNode | Nodes[InfluxNode] | None = None) → pd.DataFrame

Read the latest value for each requested node.

Returns:

pd.DataFrame: Single-row DataFrame indexed by timestamp with one column per node field.

read_series(from_time: datetime, to_time: datetime, nodes: InfluxNode | Nodes[InfluxNode] | None = None, interval: TimeStep = 1, **kwargs: Any) → pd.DataFrame

Read historic series for each requested node over the half-open interval [from_time, to_time). The interval parameter is currently accepted for API compatibility; it may be used by backends that support server-side resampling and ignored by others.

Returns:

pd.DataFrame: Time-indexed frame with one column per node field.

write(values: Mapping[InfluxNode, Any]) → None

Write current values for the provided nodes.

Groups by table/measurement and writes one row per table at the rounded current time.
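The "one row per table" behaviour can be sketched as below. Several details are assumptions made for illustration: rows_per_table is a hypothetical helper, nodes are represented by (table, field) tuples instead of InfluxNode objects, and the timestamp is truncated to whole seconds because the source does not state the rounding granularity.

```python
from collections import defaultdict
from datetime import datetime, timezone


def rows_per_table(values, now=None):
    """Collapse {(table, field): value} into one timestamped row per table."""
    now = now or datetime.now(timezone.utc)
    # Assumption: whole-second precision stands in for the connector's rounding.
    stamp = now.replace(microsecond=0)
    rows = defaultdict(dict)
    for (table, field), value in values.items():
        rows[table][field] = value
    return {table: {"time": stamp, **fields} for table, fields in rows.items()}


# Fixed clock so the result is reproducible.
now = datetime(2024, 1, 1, 12, 0, 0, 123456, tzinfo=timezone.utc)
rows = rows_per_table(
    {("home", "hum"): 40.0, ("home", "temp"): 21.5, ("machine", "power"): 3.2},
    now=now,
)
```

All fields of a table share one timestamp, so values written together also line up in the same row when read back.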

write_series(values: Mapping[InfluxNode, pd.Series] | pd.DataFrame, *, allow_overwrite: bool = True, **kwargs: Any) → None

Write historic time series.

Accepts either:
  • Mapping[InfluxNode, pd.Series]: each Series must have a datetime-like index.

  • pd.DataFrame: datetime-like index; columns must match node fields of selected_nodes.

Args:

allow_overwrite: Currently forwarded to the underlying client if supported.

classmethod from_node(node: Nodes[N] | N, usr: str | None = None, pwd: str | None = None, **kwargs: Any) → Self

Will return a single connection for an enumerable of nodes with the same url netloc.

Initialize the connection object from a node object. When a list of Node objects is provided, from_node checks if all nodes match the same connection; it throws an error if they don’t. A node matches a connection if it has the same url netloc.

Parameters:
  • node – Node to initialize from.

  • kwargs – Other arguments are ignored.

Raises:

ValueError: if not all nodes match the same connection.

Returns:

Connection object

classmethod from_nodes(nodes: Nodes[N], **kwargs: Any) → dict[str, Connection[N]]

Returns a dictionary of connections for nodes with the same url netloc.

Unlike from_node(), this method handles nodes that belong to different connections. The keys of the dictionary are the netlocs of the nodes, and each connection contains the nodes that share that netloc. (Uses from_node to initialize connections from nodes.)

Parameters:
  • nodes – List of nodes to initialize from.

  • kwargs – Other arguments are ignored.

Returns:

Dictionary of Connection objects with the netloc as key.
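The netloc-based grouping can be demonstrated with urllib.parse from the standard library. This sketch groups plain URL strings rather than node objects; group_by_netloc and the example hostnames are hypothetical.

```python
from collections import defaultdict
from urllib.parse import urlparse


def group_by_netloc(urls):
    """Group URLs by their network location (host plus optional port)."""
    groups = defaultdict(list)
    for url in urls:
        groups[urlparse(url).netloc].append(url)
    return dict(groups)


groups = group_by_netloc(
    [
        "https://influx-a.example.com:8181/x",
        "https://influx-a.example.com:8181/y",
        "https://influx-b.example.com:8181",
    ]
)
```

Each key of the resulting dictionary corresponds to one connection; paths are ignored, so both influx-a URLs end up in the same group.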

property url: str
url_parsed: ParseResult

URL of the server to connect to

usr: str | None

Username for login to server

pwd: str | None

Password for login to server

exc: BaseException | None
selected_nodes

Preselected nodes which will be used for reading and writing, if no other nodes are specified

Example Usage

import os
from datetime import datetime, timedelta, timezone

from eta_nexus.connections import InfluxConnection
from eta_nexus.nodes.influx_node import InfluxNode
from eta_nexus.util.io_utils import autoload_env

# Load environment variables (e.g. INFLUX_HOST and the auth token).
autoload_env()
url = os.getenv("INFLUX_HOST")
if not url:
    raise ValueError("Set INFLUX_HOST env variable.")

# Describe the signal: field "hum" in table "home" of database "foo".
node = InfluxNode(name="hum", url=url, protocol="influx", database="foo", table="home")
conn = InfluxConnection.from_node(node)

# Read the last hour of data as a time-indexed DataFrame.
now = datetime.now(timezone.utc)
from_time = now - timedelta(hours=1)
res = conn.read_series(from_time=from_time, to_time=now)