# Source code for eta_nexus.connections.wetterdienst_connection

from __future__ import annotations

from abc import ABC, abstractmethod
from logging import getLogger
from typing import TYPE_CHECKING, Generic, TypeVar

import pandas as pd
from wetterdienst import Settings
from wetterdienst.provider.dwd.mosmix.api import DwdMosmixRequest
from wetterdienst.provider.dwd.observation.api import DwdObservationRequest

if TYPE_CHECKING:
    from datetime import datetime
    from typing import Any

    from wetterdienst.core.timeseries.result import StationsResult

    from eta_nexus.util.type_annotations import Nodes, TimeStep

from eta_nexus.connections.connection import Connection, SeriesReadable
from eta_nexus.nodes.wetterdienst_node import (
    WetterdienstNode,
    WetterdienstObservationNode,
    WetterdienstPredictionNode,
)

# Type variable for the node type a connection works with; bound to WetterdienstNode.
WN = TypeVar("WN", bound=WetterdienstNode)


class WetterdienstConnection(Generic[WN], Connection[WN], SeriesReadable[WN], ABC):
    """Abstract base connection to the Wetterdienst API for retrieving weather data.

    This class should not be used directly. Instead, use the subclasses
    :class:`WetterdienstObservationConnection` and :class:`WetterdienstPredictionConnection`.

    The connection URL is not configurable: a fixed placeholder URL is passed to the
    parent connection class.

    :param nodes: Nodes to select in connection
    :param settings: Wetterdienst settings object
    :param kwargs: Extra keyword arguments; accepted but not used by this constructor.
    """

    logger = getLogger(__name__)

    def __init__(
        self,
        *,
        nodes: Nodes[WN] | None = None,
        settings: Settings | None = None,
        **kwargs: Any,
    ) -> None:
        self.settings = Settings(settings=settings)
        # These three options are always overridden, whatever the caller supplied.
        self.settings.ts_skip_empty = True
        self.settings.ts_si_units = False
        self.settings.ts_humanize = True
        super().__init__("https://opendata.dwd.de/", nodes=nodes)  # dummy url

    @classmethod
    def _from_node(cls, node: WN, **kwargs: Any) -> WetterdienstConnection:
        """Initialize the connection object from a wetterdienst protocol node object.

        Only the ``settings`` entry of ``kwargs`` is forwarded; all other entries are ignored.

        :param node: Node to initialize from
        :param kwargs: Extra keyword arguments (only ``settings`` is used)
        :return: Connection object created by the parent class' ``_from_node``
        """
        settings = kwargs.get("settings")
        return super()._from_node(node, settings=settings)

    @abstractmethod
    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: WN | Nodes[WN] | None = None,
        interval: TimeStep = 60,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Abstract base method for read_series().

        Is fully implemented in
        :func:`~wetterdienst.WetterdienstObservationConnection.read_series` and
        :func:`~wetterdienst.WetterdienstPredictionConnection.read_series`.

        :param nodes: Single node or list/set of nodes to read values from.
        :param from_time: Starting time to begin reading (included in output).
        :param to_time: Time to stop reading at (not included in output).
        :param interval: interval between time steps. It is interpreted as seconds if given as integer.
        :param kwargs: additional argument list, to be defined by subclasses.
        :return: pandas.DataFrame containing the data read from the connection.
        """

    def retrieve_stations(
        self, node: WetterdienstNode, request: DwdObservationRequest | DwdMosmixRequest
    ) -> pd.DataFrame:
        """Retrieve stations from the Wetterdienst API and return the values as a pandas DataFrame.

        Stations are filtered by the node's station_id or, if no station_id is set,
        by latlon and number_of_stations.

        :param node: Node to retrieve stations for
        :param request: Wetterdienst request object, containing the station data. Both
            observation and MOSMIX (prediction) requests are passed in by the subclasses.
        :return: DataFrame indexed by date, with two column levels named
            ``Name`` and ``station_id``.
        """
        # Retrieve stations. If station_id is provided, use it, otherwise use latlon to get nearest stations
        stations: StationsResult
        if node.station_id is not None:
            stations = request.filter_by_station_id(node.station_id)
        else:
            stations = request.filter_by_rank(node.latlon, rank=node.number_of_stations)

        # Convert to pandas and pivot values so date is the index and
        # node names combined with the station_id are the columns
        result_df: pd.DataFrame = stations.values.all().df.to_pandas()  # noqa: PD011 (stations is not a dataframe)
        result_df = result_df.pivot_table(values="value", columns=("parameter", "station_id"), index="date")

        # Rename the columns to the node names
        result_df = result_df.rename({node.parameter.lower(): node.name}, axis="columns")
        return result_df.rename_axis(("Name", "station_id"), axis="columns")
class WetterdienstObservationConnection(
    WetterdienstConnection[WetterdienstObservationNode], protocol="wetterdienst_observation"
):
    """Connection to the Wetterdienst API for retrieving weather observation data.

    Data can only be read with
    :func:`~wetterdienst.WetterdienstObservationConnection.read_series`.
    """

    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: WetterdienstObservationNode | Nodes[WetterdienstObservationNode] | None = None,
        interval: TimeStep = 60,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Read weather observation data from the Wetterdienst API for the given nodes and time interval.

        Each node is queried separately; the per-node results are concatenated column-wise
        and then resampled to ``interval`` with forward filling.

        :param from_time: Start time for the data retrieval
        :param to_time: End time for the data retrieval
        :param nodes: Single node or list/set of nodes to read data from
        :param interval: Time interval between data points in seconds
        :param kwargs: Extra keyword arguments, forwarded to ``_preprocess_series_context``
        :return: Pandas DataFrame containing the data read from the connection
        """
        from_time, to_time, nodes, interval = super()._preprocess_series_context(
            from_time, to_time, nodes, interval, **kwargs
        )

        def _read_node(node: WetterdienstObservationNode) -> pd.DataFrame:
            # Get the resolution for the node from the interval
            resolution = WetterdienstObservationNode.convert_interval_to_resolution(node.interval)

            # Create a request object for the node
            request: DwdObservationRequest = DwdObservationRequest(
                parameter=node.parameter,
                resolution=resolution,
                start_date=from_time,
                end_date=to_time,
                settings=self.settings,
            )
            return self.retrieve_stations(node, request)

        # We can't use a ThreadPoolExecutor here, as the Wetterdienst library uses asyncio.
        # As a result, we have to call the _read_node method directly, which causes type errors.
        result = pd.concat([_read_node(node) for node in nodes], axis=1, sort=False)

        # Convert the data to the requested interval
        return result.asfreq(interval, method="ffill").ffill()
class WetterdienstPredictionConnection(
    WetterdienstConnection[WetterdienstPredictionNode], protocol="wetterdienst_prediction"
):
    """Connection to the Wetterdienst API for retrieving weather prediction data (MOSMIX).

    Data can only be read with
    :func:`~wetterdienst.WetterdienstPredictionConnection.read_series`.
    """

    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: WetterdienstPredictionNode | Nodes[WetterdienstPredictionNode] | None = None,
        interval: TimeStep = 0,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Read weather prediction data from the Wetterdienst API for the given nodes.

        The interval parameter is not used for prediction data, as predictions are always given hourly.
        Each node is queried separately and the per-node results are concatenated column-wise
        without any resampling.

        :param from_time: Start time for the data retrieval
        :param to_time: End time for the data retrieval
        :param nodes: Single node or list/set of nodes to read data from
        :param interval: - Not used for prediction data
        :param kwargs: Extra keyword arguments, forwarded to ``_preprocess_series_context``
        :return: Pandas DataFrame containing the data read from the connection
        """
        from_time, to_time, nodes, interval = super()._preprocess_series_context(
            from_time, to_time, nodes, interval, **kwargs
        )

        def _read_node(node: WetterdienstPredictionNode) -> pd.DataFrame:
            request = DwdMosmixRequest(
                parameter=node.parameter,
                mosmix_type=node.mosmix_type,
                start_date=from_time,
                end_date=to_time,
                settings=self.settings,
            )
            return self.retrieve_stations(node, request)

        # We can't use a ThreadPoolExecutor here, as the Wetterdienst library uses asyncio.
        # As a result, we have to call the _read_node method directly, which causes type errors.
        return pd.concat([_read_node(node) for node in nodes], axis=1, sort=False)