InfluxDB Connection
The InfluxDB3 integration in eta_nexus consists of an InfluxNode
(that describes where a signal lives in InfluxDB) and an
InfluxConnection
(that knows how to read/write those signals using the official influxdb-client-3).
The connector talks to InfluxDB v3 via SQL and exchanges data as pandas
DataFrame/Series.
What this module does
Read latest values per node using a generated
SELECTthat returns a single timestamped row. Seeread().Read historic series over a half-open interval
[from_time, to_time)into a time-indexed frame (one column per node/field). Seeread_series().Write current values for one or many nodes (grouped by measurement/table). See
write().Write historic series from either a mapping
{node: pd.Series}or aDataFramewhose columns match node fields. Seewrite_series().
How nodes map to InfluxDB
InfluxNodecarries: -database: the Influx bucket/database. -table: the measurement/table. -name: used as the Influx field/column name (also available asnode.field).Nodes are grouped by table for efficient reads/writes; each group yields one SQL query or write call.
Authentication & configuration
Pass
token=...to the connection or set environment variableINFLUXDB3_AUTH_TOKEN.The target database can be given as
database=...; if omitted it may be inferred from the first node or fromINFLUXDB_DB(if set).The connection inherits base settings from
Connection(e.g., URL, user/password).
Data/Time handling
Time series are indexed by a datetime index named
time(UTC recommended). The connector will error if the index is not datetime-like.Reads return frames with requested columns ordered as the input node order.
For historic reads the interval is half-open
[from_time, to_time); writes preserve exact timestamps.
Notes & limitations
This connector targets InfluxDB v3 + SQL; it does not use Flux.
When writing a
DataFrame, all columns must correspond to known node fields; unknown columns raise aValueError.Some server-side resampling parameters (e.g.,
interval) are accepted for API compatibility and may be ignored by the backend.
See the API sections below for the full class/method reference and the example for a minimal end-to-end read.
Influx Node
- class eta_nexus.nodes.InfluxNode(name: str, url: str, protocol: str, *args: Any, **kwargs: Any)[source]
Node for the InfluxDB module.
- database: str
- table: str
- property field: str
Alias for the underlying field/column name.
- as_dict(*, filter_none: bool = False, **kwargs: Any) dict[str, Any]
Return the attrs attribute values of node instance as a dict.
- Parameters:
filter_none – Filter none values, defaults to False
- Returns:
dict of attribute values
- as_tuple(*, filter_none: bool = False, **kwargs: Any) tuple[Any, ...]
Return the attrs attribute values of inst as a tuple.
- Parameters:
filter_none – Filter none values, defaults to False
- Returns:
tuple of attribute values
- connection_identifier() str
Unique identifier for the connection that is associated with the node (i.e. would be created by Connection.from_node())
- evolve(**kwargs: Any) Self
Returns a new node instance by copying the current node and changing only specified keyword arguments.
This allows for seamless node instantiation with only a few changes.
- Parameters:
kwargs – Keyword arguments to change.
- Returns:
New instance of the node.
- classmethod from_dict(dikt: Sequence[Mapping] | Mapping[str, Any], *, fail: bool = True) list[Self]
Create nodes from a dictionary of node configurations. The configuration must specify the following fields for each node:
Code (or name), URL, Protocol (i.e. modbus or opcua or eneffco). The URL should be a complete network location identifier. Alternatively it is possible to specify the location in two fields: IP and Port. These should only contain the respective parts (as in only an IP address and only the port number). The IP-Address should always be given without scheme (https://).
For local nodes no additional fields are required.
For Modbus nodes the following additional fields are required:
ModbusRegisterType (or mb_register), ModbusSlave (or mb_slave), ModbusChannel (or mb_channel).
For OPC UA nodes the following additional fields are required:
Identifier.
For Eneffco nodes the code field must be present.
For EntsoE nodes the endpoint field must be present.
- Parameters:
dikt – Configuration dictionary.
fail – Set this to false, if you would like to log errors instead of raising them.
- Returns:
List of Node objects.
- classmethod from_excel(path: Path, sheet_name: str, *, fail: bool = True) list[Self]
Method to read out nodes from an Excel document. The document must specify the following fields:
Code, IP, Port, Protocol (modbus or opcua or eneffco).
For Modbus nodes the following additional fields are required:
ModbusRegisterType, ModbusByte, ModbusChannel.
For OPC UA nodes the following additional fields are required:
Identifier.
For Eneffco nodes the Code field must be present.
The IP-Address should always be given without scheme (https://).
- Parameters:
path – Path to Excel document.
sheet_name – name of Excel sheet, which will be read out.
fail – Set this to false, if you would like to log errors instead of raising them.
- Returns:
List of Node objects.
- name: str
Name for the node.
- url: str
URL of the connection.
- url_parsed: ParseResult
Parse result object of the URL (in case more post-processing is required).
- protocol: str
Protocol of the connection.
- dtype: Callable | None
Data type of the node (for value conversion). Note that strings will be interpreted as utf-8 encoded. If you do not want this behaviour, use ‘bytes’.
Influx Connection
- class eta_nexus.connections.InfluxConnection(*args: Any, **kwargs: Any)[source]
InfluxDB v3 connection using SQL+Pandas.
- Parameters (in addition to
Connection): - database (str): Database (a.k.a. bucket) to connect to. If omitted, we try
to infer from the first provided node or from
INFLUXDB_DB.- token (str, optional): Auth token for InfluxDB v3. If omitted, we try
INFLUXDB3_AUTH_TOKENand finallypwdfrom the base connection.
- logger: Logger = <Logger eta_nexus.connections.influx_connection (WARNING)>
- read(nodes: InfluxNode | Nodes[InfluxNode] | None = None) pd.DataFrame[source]
Read the latest value for each requested node.
- Returns:
pd.DataFrame: Single-row DataFrame indexed by timestamp with one column per node field.
- read_series(from_time: datetime, to_time: datetime, nodes: InfluxNode | Nodes[InfluxNode] | None = None, interval: TimeStep = 1, **kwargs: Any) pd.DataFrame[source]
Read historic series for each requested node over the partly-open interval
[from_time, to_time). The interval parameter is currently accepted for API compatibility and may be used by backends that support server-side resampling.- Returns:
pd.DataFrame: Time-indexed frame with one column per node field.
- write(values: Mapping[InfluxNode, Any]) None[source]
Write current values for the provided nodes.
Groups by table/measurement and writes one row per table at the rounded current time.
- write_series(values: Mapping[InfluxNode, pd.Series] | pd.DataFrame, *, allow_overwrite: bool = True, **kwargs: Any) None[source]
Write historic time series.
- Accepts either:
Mapping[InfluxNode, pd.Series]: each Series must have a datetime-like index.pd.DataFrame: datetime-like index; columns must match node fields ofselected_nodes.
- Args:
allow_overwrite: Currently forwarded to the underlying client if supported.
- classmethod from_node(node: Nodes[N] | N, usr: str | None = None, pwd: str | None = None, **kwargs: Any) Self
Will return a single connection for an enumerable of nodes with the same url netloc.
Initialize the connection object from a node object. When a list of Node objects is provided, from_node checks if all nodes match the same connection; it throws an error if they don’t. A node matches a connection if it has the same url netloc.
- Parameters:
node – Node to initialize from.
kwargs – Other arguments are ignored.
- Raises:
ValueError: if not all nodes match the same connection.
- Returns:
Connection object
- classmethod from_nodes(nodes: Nodes[N], **kwargs: Any) dict[str, Connection[N]]
Returns a dictionary of connections for nodes with the same url netloc.
This method handles different Connections, unlike from_node(). The keys of the dictionary are the netlocs of the nodes and each connection contains the nodes with the same netloc. (Uses from_node to initialize connections from nodes.).
- Parameters:
nodes – List of nodes to initialize from.
kwargs – Other arguments are ignored.
- Returns:
Dictionary of Connection objects with the netloc as key.
- property url: str
- url_parsed: ParseResult
URL of the server to connect to
- exc: BaseException | None
- selected_nodes
Preselected nodes which will be used for reading and writing, if no other nodes are specified
- Parameters (in addition to
Example Usage
1import os
2from datetime import datetime, timedelta, timezone
3
4from eta_nexus.connections import InfluxConnection
5from eta_nexus.nodes.influx_node import InfluxNode
6from eta_nexus.util.io_utils import autoload_env
7
8autoload_env()
9url = os.getenv("INFLUX_HOST")
10if not url:
11 raise ValueError("Set INFLUX_HOST env variable.")
12node = InfluxNode(name="hum", url=url, protocol="influx", database="foo", table="home")
13conn = InfluxConnection.from_node(node)
14now = datetime.now(timezone.utc)
15from_time = now - timedelta(hours=1)
16res = conn.read_series(from_time=from_time, to_time=now)