eta_nexus.connections.influx_connection module

InfluxDB v3 SQL-backed connection for ETA Nexus.

This module provides InfluxConnection, a concrete Connection that reads the latest values, reads historic time series, and writes single or bulk time series to InfluxDB v3 through the official influxdb-client-3 pandas API.

Authentication uses an API token. Pass it explicitly via token=... or set the environment variable INFLUXDB3_AUTH_TOKEN. If neither is present, the base-connection password (pwd) is used as a final fallback.
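
The lookup order described above can be sketched as a small standalone helper. This is only an illustration: resolve_token is a hypothetical name, not part of eta_nexus, and raising ValueError when nothing is found is an assumption about the failure mode.

```python
import os
from typing import Optional


def resolve_token(token: Optional[str] = None, pwd: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the documented token lookup order."""
    # 1. explicit argument, 2. environment variable, 3. base-connection password
    candidates = (token, os.environ.get("INFLUXDB3_AUTH_TOKEN"), pwd)
    for candidate in candidates:
        if candidate:
            return candidate
    # Assumption: the documentation does not say what happens when all three are missing.
    raise ValueError("No InfluxDB token found (token, INFLUXDB3_AUTH_TOKEN, pwd).")
```

With this ordering, an explicit token always wins, and pwd is only consulted when the environment variable is unset or empty.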

class eta_nexus.connections.influx_connection.InfluxConnection(*args: Any, **kwargs: Any)

Bases: Connection[InfluxNode], StatusReadable[InfluxNode], SeriesReadable[InfluxNode], StatusWritable[InfluxNode], SeriesWritable[InfluxNode]

InfluxDB v3 connection using SQL+Pandas.

Parameters (in addition to Connection):
database (str): Database (a.k.a. bucket) to connect to. If omitted, we try to infer from the first provided node or from INFLUXDB_DB.

token (str, optional): Auth token for InfluxDB v3. If omitted, we try INFLUXDB3_AUTH_TOKEN and finally pwd from the base connection.

logger: Logger = <Logger eta_nexus.connections.influx_connection (WARNING)>
read(nodes: InfluxNode | Nodes[InfluxNode] | None = None) → pd.DataFrame

Read the latest value for each requested node.

Returns:

pd.DataFrame: Single-row DataFrame indexed by timestamp with one column per node field.
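
To make the return shape concrete, the following sketch builds a frame of the described form by hand, using plain pandas. The column names temperature and pressure, the index name time, and the values are invented for illustration; nothing here calls InfluxConnection.

```python
import pandas as pd

# One timestamp, one row, one column per (hypothetical) node field.
timestamp = pd.Timestamp("2024-01-01T12:00:00Z")
latest = pd.DataFrame(
    {"temperature": [21.5], "pressure": [1.013]},
    index=pd.DatetimeIndex([timestamp], name="time"),
)

# A single field value is read by column name and row position.
value = latest["temperature"].iloc[0]
```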

read_series(from_time: datetime, to_time: datetime, nodes: InfluxNode | Nodes[InfluxNode] | None = None, interval: TimeStep = 1, **kwargs: Any) → pd.DataFrame

Read the historic series for each requested node over the half-open interval [from_time, to_time): from_time is included, to_time is excluded. The interval parameter is currently accepted for API compatibility and may be used by backends that support server-side resampling.
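
The interval semantics can be illustrated with plain pandas. This sketch filters an in-memory frame rather than querying InfluxDB; the data and the column name temperature are invented.

```python
import pandas as pd

# Five hourly samples starting at midnight UTC (made-up data).
index = pd.date_range(
    "2024-01-01 00:00", periods=5, freq=pd.Timedelta(hours=1), tz="UTC"
)
series = pd.DataFrame({"temperature": [20.0, 20.5, 21.0, 21.5, 22.0]}, index=index)

from_time = pd.Timestamp("2024-01-01 01:00", tz="UTC")
to_time = pd.Timestamp("2024-01-01 03:00", tz="UTC")

# Lower bound inclusive, upper bound exclusive: [from_time, to_time).
mask = (series.index >= from_time) & (series.index < to_time)
window = series.loc[mask]
```

Here window holds the samples at 01:00 and 02:00 but not the one at 03:00, so consecutive windows that share a boundary never return the same row twice.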

Returns:

pd.DataFrame: Time-indexed frame with one column per node field.

write(values: Mapping[InfluxNode, Any]) → None

Write current values for the provided nodes.

Values are grouped by table (measurement), and one row per table is written, timestamped with the rounded current time.
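
A minimal sketch of this grouping step, under stated assumptions: DemoNode and group_rows are hypothetical stand-ins (the real InfluxNode attributes are not shown in this page), and rounding to whole seconds is assumed because the documentation does not state the resolution.

```python
from collections import defaultdict
from typing import Any, Dict, Mapping, NamedTuple

import pandas as pd


class DemoNode(NamedTuple):
    """Hypothetical stand-in for InfluxNode: a table name plus a field name."""
    table: str
    field: str


def group_rows(values: Mapping[DemoNode, Any], now: pd.Timestamp) -> Dict[str, pd.DataFrame]:
    """Build one single-row frame per table, stamped with the rounded time."""
    stamp = now.round("s")  # assumption: rounding to whole seconds
    rows: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for node, value in values.items():
        rows[node.table][node.field] = value
    return {
        table: pd.DataFrame([fields], index=pd.DatetimeIndex([stamp], name="time"))
        for table, fields in rows.items()
    }
```

Two nodes that share a table end up as two columns of the same row, so each table receives exactly one write per call.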

write_series(values: Mapping[InfluxNode, pd.Series] | pd.DataFrame, *, allow_overwrite: bool = True, **kwargs: Any) → None

Write historic time series.

Accepts either:
  • Mapping[InfluxNode, pd.Series]: each Series must have a datetime-like index.

  • pd.DataFrame: datetime-like index; columns must match node fields of selected_nodes.
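
The two accepted input shapes can be brought to a common form as sketched below. to_frame is a hypothetical helper, not the library's implementation; plain strings stand in for nodes, and the check only accepts a pandas DatetimeIndex, which is narrower than "datetime-like".

```python
from typing import Callable, Hashable, Mapping, Union

import pandas as pd


def to_frame(
    values: Union[Mapping[Hashable, pd.Series], pd.DataFrame],
    field_of: Callable[[Hashable], str] = str,
) -> pd.DataFrame:
    """Hypothetical helper: normalise both input shapes to one DataFrame."""
    if isinstance(values, pd.DataFrame):
        frame = values
    else:
        # One column per node; pandas aligns the Series on the union of their indexes.
        frame = pd.DataFrame({field_of(node): s for node, s in values.items()})
    if not isinstance(frame.index, pd.DatetimeIndex):
        raise TypeError("Time series must be indexed by a DatetimeIndex.")
    return frame
```

Validating the index once on the combined frame covers both input shapes, because a mapping of Series without datetime indexes does not produce a DatetimeIndex either.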

Args:

allow_overwrite: Currently forwarded to the underlying client if supported.