Source code for eta_nexus.subscription_handlers.df_subscription_handler

from __future__ import annotations

import threading
from datetime import datetime
from logging import getLogger
from threading import Lock
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from collections.abc import Sequence
    from typing import Any

    from eta_nexus.nodes import Node
    from eta_nexus.util.type_annotations import TimeStep

from eta_nexus.subscription_handlers.subscription_handler import SubscriptionHandler

log = getLogger(__name__)


[docs] class DFSubscriptionHandler(SubscriptionHandler): """Subscription handler for returning pandas.DataFrames when requested. :param write_interval: Interval between index values in the data frame (value to which time is rounded). :param size_limit: Number of rows to keep in memory. :param auto_fillna: If True, missing values in self._data are filled with the pandas-method df.ffill() each time self.data is called. """ def __init__(self, write_interval: TimeStep = 1, size_limit: int = 100, *, auto_fillna: bool = True) -> None: super().__init__(write_interval=write_interval) self._data: pd.DataFrame = pd.DataFrame() self._data_lock: threading.Lock = Lock() self.keep_data_rows: int = size_limit self.auto_fillna: bool = auto_fillna
[docs] def push( self, node: Node, value: Any | pd.Series | Sequence[Any], timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None, ) -> None: """Append values to the dataframe. :param node: Node object the data belongs to. :param value: Value of the data or Series of values. There must be corresponding timestamps for each value. :param timestamp: Timestamp of receiving the data or DatetimeIndex if pushing multiple values. Alternatively an integer/timedelta can be provided to determine the interval between data points. Use negative numbers to describe past data. Integers are interpreted as seconds. If value is a pd.Series and has a pd.DatetimeIndex, timestamp is ignored. """ # Check if node.name is in _data.columns with self._data_lock: if node.name not in self._data.columns: self._data[node.name] = pd.Series(dtype="object") def set_value(val: Any, ts: datetime, column: str) -> None: with self._data_lock: # Replace NaN with -inf to distinguish between the 'real' NaN and the 'fill' NaN if pd.isna(val): val = -np.inf self._data.loc[ts, column] = val # Multiple values if not isinstance(value, (str, bytes)) and hasattr(value, "__len__"): series = self._convert_series(value, timestamp) # Push Series # Values are rounded to self.write_interval in _convert_series for _timestamp, _value in series.items(): _timestamp = self._assert_tz_awareness(_timestamp) set_value(val=_value, ts=_timestamp, column=node.name) # Single value else: if not isinstance(timestamp, datetime) and timestamp is not None: raise ValueError("Timestamp must be a datetime object or None.") timestamp = self._round_timestamp(timestamp if timestamp is not None else datetime.now()) set_value(val=value, ts=timestamp, column=node.name) # Housekeeping (Keep internal data short) self._housekeeping()
[docs] def get_latest(self) -> pd.DataFrame | None: """Return a copy of the dataframe, this ensures they can be worked on freely. Returns None if data is empty.""" with self._data_lock: if len(self._data.index) == 0: return None # If no data in self._data, return None return self.data.iloc[[-1]]
@property def data(self) -> pd.DataFrame: """This contains the interval dataframe and will return a copy of that.""" with self._data_lock, pd.option_context("future.no_silent_downcasting", True): # noqa: FBT003 if self.auto_fillna: self._data = self._data.ffill(inplace=False) _data = self._data.replace(-np.inf, np.nan, inplace=False) return _data.convert_dtypes()
[docs] def reset(self) -> None: """Reset the internal data and restart collection.""" with self._data_lock: self._data = pd.DataFrame() log.info(f"Subscribed DataFrame {hash(self._data)} was reset successfully.")
def _housekeeping(self) -> None: """Keep internal data short by only keeping last rows as specified in self.keep_data_rows.""" with self._data_lock: self._data = self._data.drop(index=self._data.index[: -self.keep_data_rows])
[docs] def close(self) -> None: """This is just here to satisfy the interface, not needed in this case."""