Source code for eta_nexus.connections.eneffco_connection

from __future__ import annotations

import asyncio
import concurrent.futures
import os
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
import requests
from requests_cache import CachedSession

from eta_nexus.connections.connection import (
    Connection,
    Readable,
    SeriesReadable,
    SeriesSubscribable,
    Subscribable,
    Writable,
)
from eta_nexus.nodes import EneffcoNode
from eta_nexus.subhandlers import SubscriptionHandler

if TYPE_CHECKING:
    from collections.abc import Mapping, Sequence
    from typing import Any

    from eta_nexus.subhandlers import SubscriptionHandler
    from eta_nexus.util.type_annotations import Nodes, Primitive, TimeStep


log = getLogger(__name__)


[docs] class EneffcoConnection( Connection[EneffcoNode], Readable[EneffcoNode], Writable[EneffcoNode], Subscribable[EneffcoNode], SeriesReadable[EneffcoNode], SeriesSubscribable[EneffcoNode], protocol="eneffco", ): """EneffcoConnection is a class to download and upload multiple features from and to the Eneffco database as timeseries. :param url: URL of the server with scheme (https://). :param usr: Username in EnEffco for login. :param pwd: Password in EnEffco for login. :param nodes: Nodes to select in connection. """ API_PATH: str = "/API/v1.0" def __init__(self, url: str, usr: str | None, pwd: str | None, *, nodes: Nodes[EneffcoNode] | None = None) -> None: url = url + self.API_PATH _api_token: str | None = os.getenv("ENEFFCO_API_TOKEN") super().__init__(url, usr, pwd, nodes=nodes) if self.usr is None: raise ValueError("Username must be provided for the Eneffco connection.") if self.pwd is None: raise ValueError("Password must be provided for the Eneffco connection.") if _api_token is None: raise ValueError("ENEFFCO_API_TOKEN environment variable is not set.") self._api_token: str = _api_token self._node_ids: pd.DataFrame | None = None self._node_ids_raw: pd.DataFrame | None = None self._sub: asyncio.Task | None = None self._subscription_nodes: set[EneffcoNode] = set() self._subscription_open: bool = False self._session: CachedSession = CachedSession( cache_name="eta_nexus/connections/requests_cache/eneffco_cache", expire_after=timedelta(minutes=15), use_cache_dir=True, ) @classmethod def _from_node( cls, node: EneffcoNode, usr: str | None = None, pwd: str | None = None, **kwargs: Any ) -> EneffcoConnection: """Initialize the connection object from an Eneffco protocol node object. :param node: Node to initialize from. :param usr: Username to use. :param pwd: Password to use. :return: EneffcoConnection object. """ return super()._from_node(node, usr=usr, pwd=pwd)
[docs] @classmethod def from_ids(cls, ids: Sequence[str], url: str, usr: str, pwd: str) -> EneffcoConnection: """Initialize the connection object from an Eneffco protocol through the node IDs. :param ids: Identification of the Node. :param url: URL for EnEffco connection. :param usr: Username for Eneffco login. :param pwd: Password for Eneffco login. :return: EneffcoConnection object. """ nodes = [EneffcoNode(name=name, url=url, protocol="eneffco", eneffco_code=name) for name in ids] return cls(url=url, usr=usr, pwd=pwd, nodes=nodes)
[docs] def read(self, nodes: EneffcoNode | Nodes[EneffcoNode] | None = None) -> pd.DataFrame: """Download current value from the Eneffco Database. :param nodes: Single node or list/set of nodes to read values from. :return: pandas.DataFrame containing the data read from the connection. """ nodes = self._validate_nodes(nodes) base_time = 1 # seconds the_time = self._round_timestamp(datetime.now(), base_time).replace(tzinfo=None) return self.read_series(the_time - timedelta(seconds=base_time), the_time, nodes, base_time)
[docs] def write( self, values: Mapping[EneffcoNode, Primitive] | pd.Series[datetime, Primitive], time_interval: timedelta | None = None, ) -> None: """Writes some values to the Eneffco Database. :param values: Dictionary of nodes and data to write {node: value}. :param time_interval: Interval between datapoints (i.e. between "From" and "To" in Eneffco Upload) (default 1s). """ nodes = self._validate_nodes(list(values.keys())) if time_interval is None: time_interval = timedelta(seconds=1) for node in nodes: request_url = f"rawdatapoint/{self.id_from_code(node.eneffco_code, raw_datapoint=True)}/value" response = self._raw_request( "POST", request_url, data=self._prepare_raw_data(values[node], time_interval), headers={ "Content-Type": "application/json", "cache-control": "no-cache", "Postman-Token": self._api_token, }, params={"comment": ""}, ) log.info(response.text if response else "No response.")
def _prepare_raw_data( self, data: Mapping[datetime, Primitive] | pd.Series[datetime, Primitive], time_interval: timedelta ) -> str: """Change the input format into a compatible format with Eneffco and filter NaN values. :param data: Data to write to node {time: value}. Could be a dictionary or a pandas Series. :param time_interval: Interval between datapoints (i.e. between "From" and "To" in Eneffco Upload). :return upload_data: String from dictionary in the format for the upload to Eneffco. """ if isinstance(data, (dict, pd.Series)): upload_data: dict[str, list[Any]] = {"Values": []} for time, val in data.items(): # Only write values if they are not nan if not np.isnan(val): aware_time = self._assert_tz_awareness(time).astimezone(timezone.utc) upload_data["Values"].append( { "Value": float(val), "From": aware_time.strftime("%Y-%m-%d %H:%M:%SZ"), "To": (aware_time + time_interval).strftime("%Y-%m-%d %H:%M:%SZ"), } ) else: raise TypeError("Unrecognized data format for Eneffco upload. Provide dictionary or pandas series.") return str(upload_data)
[docs] def read_info(self, nodes: EneffcoNode | Nodes[EneffcoNode] | None = None) -> pd.DataFrame: """Read additional datapoint information from Database. :param nodes: Single node or list/set of nodes values from. :return: pandas.DataFrame containing the data read from the connection. """ nodes = self._validate_nodes(nodes) values = [] for node in nodes: request_url = f"datapoint/{self.id_from_code(node.eneffco_code)}" response = self._raw_request("GET", request_url) json_data = self._safe_json_dict(response) if json_data is None: log.warning(f"[Eneffco] Skipping node {node.eneffco_code} — upstream request failed or returned empty") continue values.append(pd.Series(json_data, name=node.name)) return pd.concat(values, axis=1)
def _safe_json_dict(self, response: requests.Response | None) -> dict | None: if response is None: log.warning("[Eneffco] No HTTP response received") return None try: return response.json() except ValueError: log.exception("[Eneffco] Failed to parse JSON as dict — invalid content or upstream error") return None
[docs] def subscribe( self, handler: SubscriptionHandler, nodes: EneffcoNode | Nodes[EneffcoNode] | None = None, interval: TimeStep = 1, ) -> None: """Subscribe to nodes and call handler when new data is available. This will return only the last available values. :param handler: SubscriptionHandler object with a push method that accepts node, value pairs. :param interval: Interval for receiving new data. It is interpreted as seconds when given as an integer. :param nodes: Single node or list/set of nodes to subscribe to. """ self.subscribe_series(handler=handler, req_interval=1, nodes=nodes, interval=interval, data_interval=interval)
[docs] def read_series( self, from_time: datetime, to_time: datetime, nodes: EneffcoNode | Nodes[EneffcoNode] | None = None, interval: TimeStep = 1, **kwargs: Any, ) -> pd.DataFrame: """Download timeseries data from the Eneffco Database. :param nodes: Single node or list/set of nodes to read values from. :param from_time: Starting time to begin reading (included in output). :param to_time: Time to stop reading at (not included in output). :param interval: Interval between time steps. It is interpreted as seconds if given as integer. :param kwargs: Other parameters (ignored by this connection). :return: Pandas DataFrame containing the data read from the connection. """ from_time, to_time, nodes, interval = super()._preprocess_series_context( from_time, to_time, nodes, interval, **kwargs ) def read_node(node: EneffcoNode) -> pd.DataFrame: request_url = ( f"datapoint/{self.id_from_code(node.eneffco_code)}/value?" f"from={self.timestr_from_datetime(from_time)}&" f"to={self.timestr_from_datetime(to_time)}&" f"timeInterval={int(interval.total_seconds())!s}&" "includeNanValues=True" ) response = self._raw_request("GET", request_url) if response is None: log.warning(f"[Eneffco] No response from {request_url} — possible connection or timeout issue") return pd.DataFrame(columns=[node.name]) # Empty DataFrame try: json_data = response.json() except ValueError: log.exception( f"[Eneffco] Failed to parse JSON from {request_url} — upstream HTTP or token error likely" ) return pd.DataFrame(columns=[node.name]) if not json_data: # Empty or None response log.warning( f"[Eneffco] Empty JSON returned from {request_url} — check API response or token access rights" ) return pd.DataFrame(columns=[node.name]) try: data = pd.DataFrame( data=(r["Value"] for r in json_data), index=( pd.to_datetime( [r["From"] for r in json_data], utc=True, format="%Y-%m-%dT%H:%M:%SZ", ).tz_convert(self._local_tz) ), columns=[node.name], dtype="float64", ) data.index.name = "Time (with timezone)" except (KeyError, ValueError, TypeError): log.exception( f"[Eneffco] Failed to construct DataFrame for {node.eneffco_code} — " f"invalid or incomplete response structure" ) return pd.DataFrame(columns=[node.name]) else: return data with concurrent.futures.ThreadPoolExecutor() as executor: results = executor.map(read_node, nodes) return pd.concat(results, axis=1, sort=False)
[docs] def subscribe_series( self, handler: SubscriptionHandler, req_interval: TimeStep, offset: TimeStep | None = None, nodes: EneffcoNode | Nodes[EneffcoNode] | None = None, interval: TimeStep = 1, data_interval: TimeStep = 1, **kwargs: Any, ) -> None: """Subscribe to nodes and call handler when new data is available. This will always return a series of values. If nodes with different intervals should be subscribed, multiple connection objects are needed. :param handler: SubscriptionHandler object with a push method that accepts node, value pairs. :param req_interval: Duration covered by requested data (time interval). Interpreted as seconds if given as int. :param offset: Offset from datetime.now from which to start requesting data (time interval). Interpreted as seconds if given as int. Use negative values to go to past timestamps. :param data_interval: Time interval between values in returned data. Interpreted as seconds if given as int. :param interval: interval (between requests) for receiving new data. It is interpreted as seconds when given as an integer. :param nodes: Single node or list/set of nodes to subscribe to. :param kwargs: Other, ignored parameters. """ nodes = self._validate_nodes(nodes) interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval) req_interval = req_interval if isinstance(req_interval, timedelta) else timedelta(seconds=req_interval) if offset is None: offset = -req_interval else: offset = offset if isinstance(offset, timedelta) else timedelta(seconds=offset) data_interval = data_interval if isinstance(data_interval, timedelta) else timedelta(seconds=data_interval) self._subscription_nodes.update(nodes) if self._subscription_open: # Adding nodes to subscription is enough to include them in the query. Do not start an additional loop # if one already exists return self._subscription_open = True loop = asyncio.get_event_loop() self._sub = loop.create_task( self._subscription_loop( handler, int(interval.total_seconds()), req_interval, offset, data_interval, ) )
[docs] def close_sub(self) -> None: """Close an open subscription.""" self._subscription_open = False if self.exc: raise self.exc try: self._sub.cancel() # type: ignore[union-attr] except Exception: log.exception("Error while closing EnEffCo subscription.")
async def _subscription_loop( self, handler: SubscriptionHandler, interval: TimeStep, req_interval: TimeStep, offset: TimeStep, data_interval: TimeStep, ) -> None: """The subscription loop handles requesting data from the server in the specified interval. :param handler: Handler object with a push function to receive data. :param interval: Interval for requesting data in seconds. :param req_interval: Duration covered by the requested data. :param offset: Offset from datetime.now from which to start requesting data (time interval). Use negative values to go to past timestamps. :param data_interval: Interval between data points. """ interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval) req_interval = req_interval if isinstance(req_interval, timedelta) else timedelta(seconds=req_interval) data_interval = data_interval if isinstance(data_interval, timedelta) else timedelta(seconds=data_interval) offset = offset if isinstance(offset, timedelta) else timedelta(seconds=offset) try: while self._subscription_open: from_time = datetime.now() + offset to_time = from_time + req_interval values = self.read_series(from_time, to_time, self._subscription_nodes, interval=data_interval) for node in self._subscription_nodes: handler.push(node, values[node.name]) await asyncio.sleep(interval.total_seconds()) except Exception as e: self.exc = e
[docs] def id_from_code(self, code: str, *, raw_datapoint: bool = False) -> str: """Function to get the raw Eneffco ID corresponding to a specific (raw) datapoint. :param code: Exact Eneffco code. :param raw_datapoint: Returns raw datapoint ID. """ # Only build lists of IDs if they are not available yet if self._node_ids is None: self._node_ids = self._safe_json_df(self._raw_request("GET", "/datapoint")) if self._node_ids is None: log.error("[Eneffco] Failed to load /datapoint — upstream request returned empty or malformed response") return "" if self._node_ids_raw is None: self._node_ids_raw = self._safe_json_df(self._raw_request("GET", "/rawdatapoint")) if self._node_ids_raw is None: log.error( "[Eneffco] Failed to load /rawdatapoint - upstream request returned empty or malformed response" ) return "" def find_id(node_ids: pd.DataFrame) -> str: if len(node_ids.loc[node_ids["Code"] == code, "Id"]) <= 0: raise ValueError(f"Code {code} does not exist on server {self.url}.") return node_ids.loc[node_ids["Code"] == code, "Id"].to_numpy().item() return find_id(self._node_ids_raw) if raw_datapoint else find_id(self._node_ids)
def _safe_json_df(self, response: requests.Response | None) -> pd.DataFrame | None: if response is None: return None try: return pd.DataFrame(data=response.json()) except ValueError: log.exception("[Eneffco] JSON parse failed — check HTTP status or token/connection issues") return None
[docs] def timestr_from_datetime(self, dt: datetime) -> str: """Create an Eneffco compatible time string. :param dt: Datetime object to convert to string. :return: Eneffco compatible time string. """ return dt.isoformat(sep="T", timespec="seconds").replace(":", "%3A").replace("+", "%2B")
def _raw_request(self, method: str, endpoint: str, **kwargs: Any) -> requests.Response | None: """Perform Eneffco request and handle possibly resulting errors. :param method: HTTP request method. :param endpoint: Endpoint for the request (server URI is added automatically). :param kwargs: Additional arguments for the request. """ if self.usr is None: raise AttributeError("Make sure to specify a username before performing Eneffco requests.") if self.pwd is None: raise AttributeError("Make sure to specify a password before performing Eneffco requests.") try: response = self._session.request( method, self.url + "/" + str(endpoint), auth=requests.auth.HTTPBasicAuth(self.usr, self.pwd), **kwargs ) response.raise_for_status() except requests.exceptions.HTTPError as e: log.warning(f"[Eneffco] {e}") return None except requests.exceptions.RequestException: log.exception(f"[Eneffco] Request failed at {self.url}") return None else: return response