# Source code for eta_nexus.nodes.wetterdienst_node

from __future__ import annotations

import enum
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING

from attrs import (
    converters,
    field,
    validators as vld,
)
from wetterdienst.metadata.parameter import Parameter
from wetterdienst.provider.dwd.mosmix.api import DwdMosmixParameter
from wetterdienst.provider.dwd.observation import (
    DwdObservationParameter,
    DwdObservationResolution,
)

from eta_nexus.nodes.node import Node

if TYPE_CHECKING:
    from typing import Any


log = getLogger(__name__)


class WetterdienstNode(Node):
    """Abstract Base Node for the Wetterdienst API.

    This class is not meant to be used directly, but to be subclassed by
    WetterdienstObservationNode and WetterdienstPredictionNode.

    The location of a node is given either by ``station_id`` alone, or by
    ``latlon`` together with ``number_of_stations``.
    """

    #: Parameter to read from wetterdienst (e.g HUMIDITY or TEMPERATURE_AIR_200); stored upper-cased
    parameter: str = field(kw_only=True, converter=str.upper)
    #: The id of the weather station
    station_id: str | None = field(default=None, kw_only=True)
    #: latitude and longitude (not necessarily a weather station)
    latlon: str | None = field(default=None, kw_only=True)
    #: Number of stations to be used for the query
    number_of_stations: int | None = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Ensure that all required parameters are present.

        :raises ValueError: If neither ``station_id`` nor the pair ``latlon``/``number_of_stations``
            is set, or if ``parameter`` is not the name of a wetterdienst ``Parameter`` member.
        """
        # Set same default URL for all Wetterdienst nodes. object.__setattr__ bypasses the
        # class's own attribute setter (presumably because Node instances are frozen -- TODO confirm).
        object.__setattr__(self, "url", "https://opendata.dwd.de")

        super().__attrs_post_init__()

        if self.station_id is None and (self.latlon is None or self.number_of_stations is None):
            raise ValueError(
                "The required parameter 'station_id' or 'latlon' and 'number_of_stations' for the node configuration "
                "was not found. The node could not load."
            )

        valid_parameters = [item.name for item in Parameter]
        if self.parameter not in valid_parameters:
            raise ValueError(
                f"Parameter {self.parameter} is not valid. Valid parameters can be found here: "
                "https://wetterdienst.readthedocs.io/en/latest/data/parameters.html"
            )

    @classmethod
    def _get_params(cls, dikt: dict[str, Any]) -> dict[str, Any]:
        """Get the common parameters for a Wetterdienst node.

        Keys missing from ``dikt`` are returned with the value ``None``.

        :param dikt: dictionary with node information.
        :return: dict with: parameter, station_id, latlon, number_of_stations
        """
        return {
            "parameter": dikt.get("parameter"),
            "station_id": dikt.get("station_id"),
            "latlon": dikt.get("latlon"),
            "number_of_stations": dikt.get("number_of_stations"),
        }
class WetterdienstObservationNode(WetterdienstNode, protocol="wetterdienst_observation"):
    """Node for the Wetterdienst API to get weather observations.

    For more information see: https://wetterdienst.readthedocs.io/en/latest/data/provider/dwd/observation/.
    """

    #: Redeclare interval attribute as a required keyword argument (no default). The converter
    #: casts the value to float and passes None through unchanged; a None interval then fails
    #: in :meth:`convert_interval_to_resolution` during post-init.
    interval: str = field(converter=converters.optional(float), kw_only=True, repr=False, eq=False, order=False)

    def __attrs_post_init__(self) -> None:
        """Check that the parameter is available for the resolution implied by the interval.

        :raises ValueError: If the interval does not map to a resolution, if the parameter is not an
            observation parameter at all, or if it is not offered at the selected resolution (the
            message then lists the resolutions that do offer it).
        """
        super().__attrs_post_init__()
        resolution = self.convert_interval_to_resolution(self.interval)

        # Sort out the parameters by resolution
        if self.parameter in self._parameter_names(resolution):
            return

        # The parameter is not available for the resolution: generate a list of the
        # resolutions which do offer it, so that the error message is actionable.
        available_resolutions = []
        for candidate in DwdObservationResolution:
            candidate_name = candidate.name  # type: ignore[attr-defined]
            if self.parameter in self._parameter_names(candidate_name):
                available_resolutions.append(candidate_name)

        if not available_resolutions:
            raise ValueError(f"Parameter {self.parameter} is not a valid observation parameter.")
        raise ValueError(
            f"Parameter {self.parameter} is not valid for the given resolution. "
            f"Valid resolutions for parameter {self.parameter} are: "
            f"{available_resolutions}"
        )

    @staticmethod
    def _parameter_names(resolution_name: str) -> list[str]:
        """Return the names of the observation parameters listed under a resolution.

        Members whose type is ``enum.EnumMeta`` are skipped, so nested enum classes
        are not reported as parameters.

        :param resolution_name: Name of the resolution (e.g. "HOURLY").
        :return: List of parameter names.
        """
        return [param.name for param in DwdObservationParameter[resolution_name] if type(param) is not enum.EnumMeta]

    @classmethod
    def _from_dict(cls, dikt: dict[str, Any]) -> WetterdienstObservationNode:
        """Create a WetterdienstObservationNode from a dictionary of node information.

        :param dikt: dictionary with node information.
        :return: WetterdienstObservationNode object.
        :raises TypeError: If creating the node raises a TypeError or AttributeError.
        """
        name, _, _, _, interval = cls._read_dict_info(dikt)
        params = cls._get_params(dikt)
        try:
            return cls(name, "", "wetterdienst_observation", interval=interval, **params)
        except (TypeError, AttributeError) as e:
            raise TypeError(f"Could not convert all types for node {name}.") from e

    @staticmethod
    def convert_interval_to_resolution(interval: int | str | timedelta) -> str:
        """Translate an interval in seconds to the name of a DWD observation resolution.

        A timedelta is converted to its total number of seconds. Any other value is passed
        to ``int()``, so fractional seconds are truncated.

        :param interval: Interval in seconds or as a timedelta.
        :return: Name of the resolution (e.g. "MINUTE_10" for 600 seconds).
        :raises ValueError: If the interval does not match one of the supported resolutions.
        """
        resolutions = {
            60: "MINUTE_1",
            300: "MINUTE_5",
            600: "MINUTE_10",
            3600: "HOURLY",
            28800: "SUBDAILY",  # not 8h intervals, measured at 7am, 2pm, 9pm
            86400: "DAILY",
            2592000: "MONTHLY",
            31536000: "ANNUAL",
        }
        interval = int(interval.total_seconds()) if isinstance(interval, timedelta) else int(interval)
        if interval not in resolutions:
            raise ValueError(f"Interval {interval} not supported. Must be one of {list(resolutions)}")
        return resolutions[interval]
class WetterdienstPredictionNode(WetterdienstNode, protocol="wetterdienst_prediction"):
    """Node for the Wetterdienst API to get weather predictions.

    For more information see: https://wetterdienst.readthedocs.io/en/latest/data/provider/dwd/mosmix/.
    """

    #: Type of the MOSMIX prediction. Either 'SMALL' or 'LARGE' (upper-cased before validation)
    mosmix_type: str = field(kw_only=True, converter=str.upper, validator=vld.in_(("SMALL", "LARGE")))

    def __attrs_post_init__(self) -> None:
        """Check that the parameter is available for the configured MOSMIX type.

        :raises ValueError: If the parameter is not listed for the MOSMIX type.
        """
        super().__attrs_post_init__()

        # Sort out the parameters by resolution
        params = DwdMosmixParameter[self.mosmix_type]
        # Create list of available parameters, enums are excluded because they are datasets
        available_params = [param.name for param in params if type(param) is not enum.EnumMeta]

        if self.parameter not in available_params:
            raise ValueError(
                f"Parameter {self.parameter} is not valid for the given resolution. "
                f"Valid parameters for resolution {self.mosmix_type} can be found here: "
                "https://wetterdienst.readthedocs.io/en/latest/data/provider/dwd/mosmix/hourly/"
            )

    @classmethod
    def _from_dict(cls, dikt: dict[str, Any]) -> WetterdienstPredictionNode:
        """Create a WetterdienstPredictionNode from a dictionary of node information.

        :param dikt: dictionary with node information.
        :return: WetterdienstPredictionNode object.
        :raises TypeError: If creating the node raises a TypeError or AttributeError.
        """
        name, _, _, _, _ = cls._read_dict_info(dikt)
        params = cls._get_params(dikt)
        mosmix_type = dikt.get("mosmix_type")
        try:
            return cls(name, "", "wetterdienst_prediction", mosmix_type=mosmix_type, **params)
        except (TypeError, AttributeError) as e:
            raise TypeError(f"Could not convert all types for node {name}.") from e