eta_nexus.connections.connection module

Connection base class and protocols for the ETA Nexus framework.

class eta_nexus.connections.connection.Readable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to read data.

abstract read(nodes: N_contra | Nodes[N_contra] | None = None) pd.DataFrame[source]

Reads current value from each Node in nodes. Uses selected_nodes if no nodes are passed.

Parameters:

nodes – Single Node or Sequence/Set of Nodes to read from.

Returns:

Pandas DataFrame with read values.

class eta_nexus.connections.connection.Writable(*args, **kwargs)[source]

Bases: Protocol, Generic[N]

Non-data Protocol for Connections with the ability to write data.

abstract write(values: Mapping[N, Any]) None[source]

Writes given values to nodes :param values: Mapping(e.g. Dict) of Nodes and respective values to write {node: value}.

class eta_nexus.connections.connection.Subscribable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to subscribe to data.

abstract subscribe(handler: SubscriptionHandler, nodes: N_contra | Nodes[N_contra] | None = None, request_frequency: TimeStep = 1) None[source]
Subscribes to nodes and calls handler when new data is available. If the connection protocol doesn’t

implement subscriptions natively, this method polls the nodes with the given frequency. Uses subscription_nodes if no nodes are passed.

Parameters:
  • nodes – Single Node or Sequence/Set of nodes to subscribe to.

  • handler – A SubscriptionHandler object

  • request_frequency – Time period between two requests. Interpreted as seconds if Numeric is given. Technically no frequency!

abstract close_sub() None[source]

Closes an open subscription. This should gracefully handle non-existent subscriptions.

class eta_nexus.connections.connection.SeriesReadable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to read historic data.

abstract read_series(from_time: datetime, to_time: datetime, nodes: N_contra | Nodes[N_contra] | None = None, interval: TimeStep = 1, **kwargs: Any) pd.DataFrame[source]
Reads time series data for each Node in nodes. Retrieves values for the partly open time interval

[from_time, to_time), adhering to the specified value-to-value distance given as resolution. Uses selected_nodes if no nodes are passed. Will apply the same resolution to all nodes.

Parameters:
  • interval – Start and end of timeseries, treated as partly open interval[from_time, to_time).

  • nodes – Single Node or Sequence/Set of nodes to read values from.

  • resolution – Time between timeseries’ values. Interpreted as seconds if Numeric is given.

  • kwargs – Additional Subclass arguments.

Returns:

pandas.DataFrame containing the timeseries read from the connection.

class eta_nexus.connections.connection.SeriesSubscribable(*args, **kwargs)[source]

Bases: Protocol, Generic[N_contra]

Non-data Protocol for Connections with the ability to subscribe to historic data.

abstract subscribe_series(handler: SubscriptionHandler, req_interval: TimeStep, offset: TimeStep | None = None, nodes: N_contra | Nodes[N_contra] | None = None, interval: TimeStep = 1, data_interval: TimeStep = 1, **kwargs: Any) None[source]
Subscribes to nodes and calls handler when new data is available. Retrieves values for the partly open time

interval [now + offset, now + offset + data_duration), adhering to the specified value-to-value distance given as resolution. If the connection protocol doesn’t implement subscriptions natively, this method polls the nodes with the given requesty_frequency. Uses subscription_nodes if no nodes are passed. Will apply the same resolution to all nodes.

Parameters:
  • handler – A SubscriptionHandler object

  • data_duration – Duration of returned timeseries interval.

  • offset – Offset between time of request and start of returned timeseries. Can be negative.

  • nodes – Single Node or Sequence/Set of nodes to subscribe to.

  • request_frequency – Time period between two requests. Interpreted as seconds if Numeric is given. Technically no frequency!

  • resolution – Time between timeseries’ values. Interpreted as seconds if Numeric is given.

  • **kwargs

    Subclass arguments

abstract close_sub() None[source]

Closes an open subscription. This should gracefully handle non-existent subscriptions.

class eta_nexus.connections.connection.Connection(url: str, usr: str | None = None, pwd: str | None = None, *, nodes: Nodes[N] | None = None)[source]

Bases: Generic[N], ABC

Common connection interface class.

The URL (netloc) may contain the username and password. (schema://username:password@hostname:port/path) In this case, the parameters usr and pwd are not required. BUT the keyword parameters of the function will take precedence over username and password configured in the url.

Parameters:
  • url – Netloc of the server to connect to.

  • usr – Username for login to server.

  • pwd – Password for login to server.

  • nodes – List of nodes to select as a standard case.

usr: str | None

Username for login to server

pwd: str | None

Password for login to server

selected_nodes

Preselected nodes which will be used for reading and writing, if no other nodes are specified

classmethod from_node(node: Nodes[N] | N, usr: str | None = None, pwd: str | None = None, **kwargs: Any) Self[source]

Will return a single connection for an enumerable of nodes with the same url netloc.

Initialize the connection object from a node object. When a list of Node objects is provided, from_node checks if all nodes match the same connection; it throws an error if they don’t. A node matches a connection if it has the same url netloc.

Parameters:
  • node – Node to initialize from.

  • kwargs – Other arguments are ignored.

Raises:

ValueError: if not all nodes match the same connection.

Returns:

Connection object

classmethod from_nodes(nodes: Nodes[N], **kwargs: Any) dict[str, Connection[N]][source]

Returns a dictionary of connections for nodes with the same url netloc.

This method handles different Connections, unlike from_node(). The keys of the dictionary are the netlocs of the nodes and each connection contains the nodes with the same netloc. (Uses from_node to initialize connections from nodes.).

Parameters:
  • nodes – List of nodes to initialize from.

  • kwargs – Other arguments are ignored.

Returns:

Dictionary of Connection objects with the netloc as key.

property url: str
class eta_nexus.connections.connection.SeriesConnection(**kwargs)[source]

Bases: Readable[N], Writable[N], Subscribable[N], SeriesReadable[N], SeriesSubscribable[N], Connection[N], ABC

Connection object for protocols with the ability to provide access to timeseries data.

Parameters:

url – URL of the server to connect to.