eta_nexus.connections.connection module

Connection base class and protocols for the ETA Nexus framework.

class eta_nexus.connections.connection.StatusReadable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to read data.

abstract read(nodes: N_contra | Nodes[N_contra] | None = None) pd.DataFrame[source]

Reads current value from each Node in nodes. Uses selected_nodes if no nodes are passed.

Parameters:

nodes – Single Node or Sequence/Set of Nodes to read from.

Returns:

Pandas DataFrame with read values.

class eta_nexus.connections.connection.StatusWritable(*args, **kwargs)[source]

Bases: Protocol, Generic[N]

Non-data Protocol for Connections with the ability to write data.

abstract write(values: Mapping[N, Any]) None[source]

Writes given values to nodes :param values: Mapping(e.g. Dict) of Nodes and respective values to write {node: value}.

class eta_nexus.connections.connection.StatusSubscribable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to subscribe to data.

abstract subscribe(handler: SubscriptionHandler, nodes: N_contra | Nodes[N_contra] | None = None, request_frequency: TimeStep = 1) None[source]
Subscribes to nodes and calls handler when new data is available. If the connection protocol doesn’t

implement subscriptions natively, this method polls the nodes with the given frequency. Uses subscription_nodes if no nodes are passed.

Parameters:
  • nodes – Single Node or Sequence/Set of nodes to subscribe to.

  • handler – A SubscriptionHandler object

  • request_frequency – Time period between two requests. Interpreted as seconds if Numeric is given. Technically no frequency!

abstract close_sub() None[source]

Closes an open subscription. This should gracefully handle non-existent subscriptions.

class eta_nexus.connections.connection.SeriesReadable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to read historic data.

abstract read_series(from_time: datetime, to_time: datetime, nodes: N_contra | Nodes[N_contra] | None = None, interval: TimeStep = 1, **kwargs: Any) pd.DataFrame[source]
Reads time series data for each Node in nodes. Retrieves values for the partly open time interval

[from_time, to_time), adhering to the specified value-to-value distance given as resolution. Uses selected_nodes if no nodes are passed. Will apply the same resolution to all nodes.

Parameters:
  • interval – Start and end of timeseries, treated as partly open interval[from_time, to_time).

  • nodes – Single Node or Sequence/Set of nodes to read values from.

  • resolution – Time between timeseries’ values. Interpreted as seconds if Numeric is given.

  • kwargs – Additional Subclass arguments.

Returns:

pandas.DataFrame containing the timeseries read from the connection.

class eta_nexus.connections.connection.SeriesWritable(*args, **kwargs)[source]

Bases: Protocol, Generic[N]

Non-data Protocol for Connections with the ability to write historic (time series) data.

write_series(values: Mapping[N, pandas.Series], *, allow_overwrite: bool = True, **kwargs: Any) None[source]
write_series(values: pandas.DataFrame, *, allow_overwrite: bool = True, **kwargs: Any) None

Writes time series data for the given nodes.

Accepts either
  • a mapping from Node -> pandas.Series (index must be datetime-like; series values are samples), or

  • a pandas.DataFrame with a datetime-like index and one column per Node (column names must match node.name).

Implementations may round/align timestamps to node-specific intervals and should ensure timezone awareness consistent with the Connection utilities.

Parameters:
  • values – Mapping of nodes to Series, or a DataFrame with datetime-like index.

  • allow_overwrite – If True, upsert points at identical timestamps; if False, avoid overwriting.

  • kwargs – Additional subclass arguments.

class eta_nexus.connections.connection.SeriesSubscribable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to subscribe to historic data.

abstract subscribe_series(handler: SubscriptionHandler, req_interval: TimeStep, offset: TimeStep | None = None, nodes: N_contra | Nodes[N_contra] | None = None, interval: TimeStep = 1, data_interval: TimeStep = 1, **kwargs: Any) None[source]
Subscribes to nodes and calls handler when new data is available. Retrieves values for the partly open time

interval [now + offset, now + offset + data_duration), adhering to the specified value-to-value distance given as resolution. If the connection protocol doesn’t implement subscriptions natively, this method polls the nodes with the given requesty_frequency. Uses subscription_nodes if no nodes are passed. Will apply the same resolution to all nodes.

Parameters:
  • handler – A SubscriptionHandler object

  • data_duration – Duration of returned timeseries interval.

  • offset – Offset between time of request and start of returned timeseries. Can be negative.

  • nodes – Single Node or Sequence/Set of nodes to subscribe to.

  • request_frequency – Time period between two requests. Interpreted as seconds if Numeric is given. Technically no frequency!

  • resolution – Time between timeseries’ values. Interpreted as seconds if Numeric is given.

  • **kwargs

    Subclass arguments

abstract close_sub() None[source]

Closes an open subscription. This should gracefully handle non-existent subscriptions.

class eta_nexus.connections.connection.Connection(url: str, usr: str | None = None, pwd: str | None = None, *, nodes: Nodes[N] | None = None)[source]

Bases: Generic[N], ABC

Common connection interface class.

The URL (netloc) may contain the username and password. (schema://username:password@hostname:port/path) In this case, the parameters usr and pwd are not required. BUT the keyword parameters of the function will take precedence over username and password configured in the url.

Parameters:
  • url – Netloc of the server to connect to.

  • usr – Username for login to server.

  • pwd – Password for login to server.

  • nodes – List of nodes to select as a standard case.

logger: Logger
url_parsed: ParseResult

URL of the server to connect to

usr: str | None

Username for login to server

pwd: str | None

Password for login to server

selected_nodes

Preselected nodes which will be used for reading and writing, if no other nodes are specified

classmethod from_node(node: Nodes[N] | N, usr: str | None = None, pwd: str | None = None, **kwargs: Any) Self[source]

Will return a single connection for an enumerable of nodes with the same url netloc.

Initialize the connection object from a node object. When a list of Node objects is provided, from_node checks if all nodes match the same connection; it throws an error if they don’t. A node matches a connection if it has the same url netloc.

Parameters:
  • node – Node to initialize from.

  • kwargs – Other arguments are ignored.

Raises:

ValueError: if not all nodes match the same connection.

Returns:

Connection object

classmethod from_nodes(nodes: Nodes[N], **kwargs: Any) dict[str, Connection[N]][source]

Returns a dictionary of connections for nodes with the same url netloc.

This method handles different Connections, unlike from_node(). The keys of the dictionary are the netlocs of the nodes and each connection contains the nodes with the same netloc. (Uses from_node to initialize connections from nodes.).

Parameters:
  • nodes – List of nodes to initialize from.

  • kwargs – Other arguments are ignored.

Returns:

Dictionary of Connection objects with the netloc as key.

property url: str
class eta_nexus.connections.connection.SeriesConnection(**kwargs)[source]

Bases: StatusReadable[N], StatusWritable[N], StatusSubscribable[N], SeriesReadable[N], SeriesWritable[N], SeriesSubscribable[N], Connection[N], ABC

Connection object for protocols with the ability to provide access to timeseries data.

Parameters:

url – URL of the server to connect to.

class eta_nexus.connections.connection.RESTConnection(url: str, usr: str | None = None, pwd: str | None = None, *, nodes: Nodes[N] | None = None, retry_total: int = 3, retry_backoff_factor: float = 1.0)[source]

Bases: Connection[N], ABC

RESTConnection is an abstract base class for managing RESTful API connections in the ETA Nexus framework. It extends the Connection class and provides standardized functionality for handling HTTP requests, managing API tokens, and session management. This class is designed to reduce boilerplate code and streamline the integration of new REST-based connections.

Key Features: - Centralized HTTP request handling with consistent error management and logging. - Lazy-loaded session management using a cached session. - API token retrieval from environment variables based on the connection protocol name. - Abstract methods for session initialization and node-specific data reading. - Authentication abstraction for subclasses to define custom authentication mechanisms.

Subclasses should implement the _initialize_session method to define session initialization logic and the _read_node method to handle node-specific data reading.

Parameters:
  • url – URL of the REST API endpoint.

  • usr – Username for authentication (optional).

  • pwd – Password for authentication (optional).

  • nodes – List of nodes to connect to (optional).

  • retry_total – Total number of retries for failed HTTP requests (default: 3).

  • retry_backoff_factor – Backoff factor for retries (default: 1s-> e.g. 1s, 2s, 4s for 3 retries).

property session: CachedSession

Return the cached session.

property authentication: None | AuthBase

Return the authentication method for the API.

abstract read_node(node: N, from_time: datetime, to_time: datetime, interval: timedelta, **kwargs: Any) pandas.DataFrame[source]

Read data from a REST API endpoint.

Parameters:
  • node – Node to read data from.

  • from_time – Start of the time series (timezone-aware).

  • to_time – End of the time series (timezone-aware).

  • interval – Time interval between data points.

  • kwargs – Additional subclass-specific arguments.

Returns:

DataFrame containing the data read from the API.