"""The OPC UA module provides utilities for the flexible creation of OPC UA connections."""
from __future__ import annotations
import asyncio
import concurrent.futures
import socket
from concurrent.futures import (
CancelledError as ConCancelledError,
TimeoutError as ConTimeoutError,
)
from contextlib import contextmanager
from datetime import datetime, timedelta
from logging import getLogger
from typing import TYPE_CHECKING
import asyncua.sync
import pandas as pd
# TODO: add async import: from asyncua import Client as asyncClient
# https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
from asyncua import ua
# TODO: add async import: from asyncua.common.subscription import Subscription as asyncSubscription
# https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
# Synchronous imports
from asyncua.sync import Client, Subscription, ThreadLoopNotRunning
from asyncua.ua import SecurityPolicy, uaerrors
from eta_nexus.connections.connection_utils import IntervalChecker, RetryWaiter
from eta_nexus.nodes import OpcuaNode
from eta_nexus.subscription_handlers import SubscriptionHandler
from eta_nexus.util import KeyCertPair, Suppressor, check_type_mismatch
if TYPE_CHECKING:
from collections.abc import Generator, Mapping, Sequence
from typing import Any
from eta_nexus.subscription_handlers import SubscriptionHandler
# Sync import
# Async import
# TODO: add async import: from asyncua import Node as asyncSyncOpcNode
# https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
from eta_nexus.util.type_annotations import Nodes, Primitive, TimeStep
from eta_nexus.connections.connection import Connection, StatusReadable, StatusSubscribable, StatusWritable
[docs]
class OpcuaConnection(
Connection[OpcuaNode],
StatusReadable[OpcuaNode],
StatusWritable[OpcuaNode],
StatusSubscribable[OpcuaNode],
protocol="opcua",
):
"""The OPC UA Connection class allows reading and writing from and to OPC UA servers. Additionally,
it implements a subscription method, which reads continuously in a specified interval.
:param url: URL of the OPC UA Server.
:param usr: Username in OPC UA for login.
:param pwd: Password in OPC UA for login.
:param nodes: List of nodes to use for all operations.
"""
logger = getLogger(__name__)
def __init__(
self,
url: str,
usr: str | None = None,
pwd: str | None = None,
*,
nodes: Nodes[OpcuaNode] | None = None,
key_cert: KeyCertPair | None = None,
**kwargs: Any,
) -> None:
super().__init__(url, usr, pwd, nodes=nodes)
if self.url_parsed.scheme != "opc.tcp":
raise ValueError("Given URL is not a valid OPC url (scheme: opc.tcp).")
self.connection: Client
self._connected = False
self._retry = RetryWaiter()
self._retry_interval_checker = RetryWaiter()
self._conn_check_interval = 1
self._sub: Subscription
self._subbed_nodes: list[int] = []
self._sub_task: asyncio.Task
self._subscription_open: bool = False
self._subscription_nodes: set[OpcuaNode] = set()
self.connection_interval_checker = IntervalChecker()
self._key_cert: KeyCertPair | None = key_cert
self._try_secure_connect = True
@classmethod
def _from_node(
cls, node: OpcuaNode, usr: str | None = None, pwd: str | None = None, **kwargs: Any
) -> OpcuaConnection:
"""Initialize the connection object from an Opcua protocol Node object.
:param node: Node to initialize from.
:param usr: Username to use.
:param pwd: Password to use.
:param kwargs: Other arguments are ignored.
:return: OpcuaConnection object.
"""
key_cert = kwargs.get("key_cert")
return super()._from_node(node, usr=usr, pwd=pwd, key_cert=key_cert)
[docs]
@classmethod
def from_ids(
cls,
ids: Sequence[str],
url: str,
usr: str | None = None,
pwd: str | None = None,
) -> OpcuaConnection:
"""Initialize the connection object from an OPC UA protocol through the node IDs.
:param ids: Identification of the Node.
:param url: URL for connection.
:param usr: Username in OPC UA for login.
:param pwd: Password in OPC UA for login.
:return: OpcuaConnection object.
"""
nodes = [OpcuaNode(name=opc_id, usr=usr, pwd=pwd, url=url, protocol="opcua", opc_id=opc_id) for opc_id in ids]
return cls(nodes[0].url, usr, pwd, nodes=nodes)
[docs]
def read(self, nodes: OpcuaNode | Nodes[OpcuaNode] | None = None) -> pd.DataFrame:
"""Read some manually selected values from OPC UA capable controller.
:param nodes: Single node or list/set of nodes to read from.
:return: pandas.DataFrame containing current values of the OPC UA-variables.
:raises ConnectionError: When an error occurs during reading.
"""
_nodes = self._validate_nodes(nodes)
def read_node(node: OpcuaNode) -> dict[str, list]:
try:
opcua_variable = self.connection.get_node(node.opc_id)
value = opcua_variable.read_value()
if node.dtype is not None:
# Check for type mismatch between configured dtype and server data type
try:
opcua_variant_type = opcua_variable.read_data_type_as_variant_type()
check_type_mismatch(node.dtype, opcua_variant_type, node.name, self.logger)
except Exception:
# If we can't determine the server type, log at debug level and continue
self.logger.debug(f"Could not determine OPC UA data type for node '{node.name}'")
try:
value = node.dtype(value)
except ValueError as e:
raise ConnectionError(
f"Failed to typecast value '{value}' at {node.name} to {node.dtype.__name__}."
) from e
except uaerrors.BadNodeIdUnknown:
raise ConnectionError(
f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space "
f"{self.url}. (BadNodeIdUnknown)"
) from None
except RuntimeError as e:
raise ConnectionError(str(e)) from e
else:
return {node.name: [value]}
values: dict[str, list] = {}
with self._connection(), concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(read_node, _nodes)
for result in results:
values.update(result)
return pd.DataFrame(values, index=[self._assert_tz_awareness(datetime.now())])
[docs]
def write(self, values: Mapping[OpcuaNode, Primitive]) -> None:
"""Writes some manually selected values on OPC UA capable controller.
:param values: Dictionary of nodes and data to write {node: value}.
:raises ConnectionError: When an error occurs during reading.
"""
nodes = self._validate_nodes(set(values.keys()))
with self._connection():
for node in nodes:
try:
opcua_variable = self.connection.get_node(node.opc_id)
opcua_variable_type = opcua_variable.read_data_type_as_variant_type()
value = node.dtype(values[node]) if node.dtype is not None else values[node]
opcua_variable.write_value(ua.DataValue(ua.Variant(value, opcua_variable_type)))
except uaerrors.BadNodeIdUnknown as e:
raise ConnectionError(
f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space "
f"{self.url}. (BadNodeIdUnknown)"
) from e
except RuntimeError as e:
raise ConnectionError(str(e)) from e
[docs]
def subscribe(
self, handler: SubscriptionHandler, nodes: OpcuaNode | Nodes[OpcuaNode] | None = None, interval: TimeStep = 1
) -> None:
"""Subscribe to nodes and call handler when new data is available. Basic architecture of the subscription is
the client- server communication via subscription notify. This function works asynchronously. Subscriptions
must always be closed using the close_sub function (use try, finally!).
:param nodes: Single node or list/set of nodes to subscribe to.
:param handler: SubscriptionHandler object with a push method that accepts node, value pairs.
:param interval: Interval for receiving new data. It is interpreted as seconds when given as an integer.
"""
_nodes = self._validate_nodes(nodes)
interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)
self._subscription_nodes.update(_nodes)
if self._subscription_open:
# Adding nodes to subscription is enough to include them in the query. Do not start an additional loop
# if one already exists
return
self._subscription_open = True
loop = asyncio.get_event_loop()
self._sub_task = loop.create_task(
self._subscription_loop(
_OPCSubHandler(handler=handler, interval_check_handler=self.connection_interval_checker),
float(interval.total_seconds()),
)
)
async def _subscription_loop(self, handler: _OPCSubHandler, interval: float) -> None:
"""The subscription loop makes sure that the subscription is reset in case the server generates an error.
:param handler: Handler object with a push function to receive data.
:param interval: Interval for requesting data in seconds.
"""
subscribed = False
while self._subscription_open:
try:
if not self._connected:
await self._retry.wait_async()
try:
self._connect()
except ConnectionError:
self.logger.error(f"Retrying to connect to {self.url}.") # noqa: TRY400
continue
elif self._connected and not subscribed:
try:
self._sub = self.connection.create_subscription(interval * 1000, handler)
subscribed = True
except RuntimeError:
subscribed = False
self.logger.error(f"Unable to subscribe to server {self.url} - Retrying.") # noqa: TRY400
self._disconnect()
continue
for node in self._subscription_nodes:
try:
handler.add_node(node.opc_id, node) # type: ignore[arg-type]
self._subbed_nodes.append(
self._sub.subscribe_data_change(self.connection.get_node(node.opc_id))
)
except RuntimeError as e:
self.logger.warning(
f"Could not subscribe to node '{node.name}' on server {self.url}, error: {e}"
)
except (ConnectionAbortedError, ConnectionResetError, TimeoutError, ConCancelledError, BaseException) as e:
if isinstance(e, asyncio.CancelledError):
self.logger.debug(f"Subscription loop for {self.url} was cancelled.")
break
msg = None
if isinstance(e, (ConnectionAbortedError, ConnectionResetError)):
msg = f"Subscription to the OPC UA server {self.url} is unexpectedly terminated."
if isinstance(e, TimeoutError):
msg = f"OPC UA client for server {self.url} doesn't receive a response from the server."
if isinstance(e, ConCancelledError):
msg = (
f"Connection to OPC UA-Server {self.url} was terminated "
"during connection establishment or maintenance."
)
self.logger.exception(f"Handling exception for server {self.url}.")
if msg:
msg += " Trying to reconnect."
self.logger.info(msg)
subscribed = False
self._connected = False
# Exit point in case the connection operates normally.
if not self._check_connection():
# Push Nan for every node
for node in self._subscription_nodes:
handler.handler.push(node=node, value=float("nan"), timestamp=datetime.now())
subscribed = False
self._connected = False
self._disconnect()
elif self._connected and subscribed:
_changed_within_interval = self.connection_interval_checker.check_interval_connection()
if not _changed_within_interval:
subscribed = False
self._connected = False
self.logger.warning(
f"The subscription connection for {self.url} doesn't change the values "
"anymore. Trying to reconnect."
)
self._disconnect()
self._retry_interval_checker.tried()
await self._retry_interval_checker.wait_async()
else:
self._retry_interval_checker.success()
await asyncio.sleep(self._conn_check_interval)
[docs]
def close_sub(self) -> None:
"""Close an open subscription."""
self._subscription_open = False
try:
self._sub.unsubscribe(self._subbed_nodes)
except AttributeError:
# Occurs only if subscription did not exist and can be ignored.
pass
except ThreadLoopNotRunning:
# Occurs only if subscription was already stopped (and therefore the ThreadLoop as well) and can be ignored.
pass
except Exception:
self.logger.exception("Canceling OpcUA subscription failed.")
finally:
self._subbed_nodes = []
try:
self._sub_task.cancel()
self._sub.delete()
except (OSError, RuntimeError) as e:
self.logger.debug(f"Deleting subscription for server {self.url} failed.")
self.logger.debug(f"Server {self.url} returned error: {e}.")
except (TimeoutError, ConTimeoutError):
self.logger.debug(f"Timeout occurred while trying to close the subscription to server {self.url}.")
except AttributeError:
# Occurs if the subscription did not exist and can be ignored.
pass
except asyncua.sync.ThreadLoopNotRunning:
# Occurs if the subscription (and therefore the thread loop) was already closed and can be ignored.
pass
self._disconnect()
def _connect(self) -> None:
"""Connect to server. This will try to securely connect using Basic256SHA256 method
before trying an insecure connection.
"""
if not hasattr(self, "connection"):
# Do not reninitialize connection if it already exists
self.connection = Client(self.url)
self._connected = False
if self.usr is not None:
self.connection.set_user(self.usr)
if self.pwd is not None:
self.connection.set_password(self.pwd)
self._retry.tried()
def _connect_insecure() -> None:
self.connection.aio_obj.security_policy = SecurityPolicy()
self.connection.aio_obj.uaclient.set_security(self.connection.aio_obj.security_policy)
self.connection.connect()
def _connect_secure(key_cert: KeyCertPair) -> None:
try:
self.connection.set_security(SecurityPolicyBasic256Sha256, key_cert.cert_path, key_cert.key_path)
with Suppressor():
self.connection.connect()
except ua.uaerrors.BadSecurityPolicyRejected:
self._try_secure_connect = False
_connect_insecure()
except ua.UaError as e:
if "No matching endpoints" in str(e):
self._try_secure_connect = False
_connect_insecure()
else:
raise
except (TimeoutError, ConTimeoutError, asyncio.exceptions.TimeoutError) as e:
self._try_secure_connect = False
raise ConnectionError("Host timeout during secure connect") from e
try:
if self._key_cert is not None and self._try_secure_connect:
_connect_secure(key_cert=self._key_cert)
else:
_connect_insecure()
except (socket.herror, socket.gaierror) as e:
raise ConnectionError(f"Host not found: {self.url}") from e
except (TimeoutError, ConTimeoutError, asyncio.exceptions.TimeoutError) as e:
raise ConnectionError(f"Host timeout: {self.url}") from e
except ConCancelledError as e:
raise ConnectionError(f"Connection cancelled by host: {self.url}") from e
except (RuntimeError, ConnectionError) as e:
raise ConnectionError(f"OPC Connection Error: {self.url}: {e!s}") from e
else:
self.logger.debug(f"Connected to OPC UA server: {self.url}")
self._connected = True
self._retry.success()
def _check_connection(self) -> bool:
if self._connected:
try:
self.connection.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State)).read_value()
except AttributeError:
self._connected = False
self.logger.debug(f"Connection to server {self.url} did not exist - connection check failed.")
except Exception:
self._connected = False
self.logger.error(f"Error while checking connection to server {self.url}.") # noqa: TRY400
else:
self._connected = True
if not self._connected:
self._disconnect()
return self._connected
def _disconnect(self) -> None:
"""Disconnect from server."""
self._connected = False
try:
self.connection.disconnect()
except (ConCancelledError, ConnectionAbortedError):
self.logger.debug(f"Connection to {self.url} already closed by server.")
except (OSError, RuntimeError) as e:
self.logger.debug(f"Closing connection to server {self.url} failed")
self.logger.debug(f"Connection to {self.url} returned an error while closing the connection: {e}")
except AttributeError:
self.logger.debug(f"Connection to server {self.url} already closed.")
@contextmanager
def _connection(self) -> Generator:
"""Connect to the server and return a context manager that automatically disconnects when finished."""
try:
self._connect()
yield None
finally:
self._disconnect()
class _OPCSubHandler:
"""Wrapper for the OPC UA subscription. Enables the subscription to use the standardized eta_nexus subscription
format.
:param handler: *eta_nexus* style subscription handler.
"""
logger = getLogger(__name__)
def __init__(self, handler: SubscriptionHandler, interval_check_handler: IntervalChecker) -> None:
self.handler = handler
self._sub_nodes: dict[str | int, OpcuaNode] = {}
self._node_interval_to_check = interval_check_handler
def add_node(self, opc_id: str | int, node: OpcuaNode) -> None:
"""Add a node to the subscription. This is necessary to translate between formats."""
self._sub_nodes[opc_id] = node
def datachange_notification(self, node: OpcuaNode, val: Primitive, data: Any) -> None:
"""datachange_notification is called whenever subscribed input data is received via OPC UA. This pushes data
to the actual eta_nexus subscription handler.
:param node: Node Object, which was subscribed to and which has sent an updated value.
:param val: New value of OPC UA node.
:param data: Raw data of OPC UA (not used).
"""
_time = self.handler._assert_tz_awareness(datetime.now())
subscribed_node = self._sub_nodes[str(node)]
# Check for type mismatch and convert value if dtype is configured
if subscribed_node.dtype is not None:
# Get the actual OPC UA variant type from the data notification
try:
if hasattr(data, "monitored_item") and hasattr(data.monitored_item, "Value"):
opcua_variant_type = data.monitored_item.Value.Value.VariantType
check_type_mismatch(subscribed_node.dtype, opcua_variant_type, subscribed_node.name, self.logger)
except Exception:
self.logger.debug(
f"Could not determine OPC UA data type for node '{subscribed_node.name}' in subscription"
)
try:
val = subscribed_node.dtype(val)
except (ValueError, TypeError) as e:
dtype_name = getattr(subscribed_node.dtype, "__name__", str(subscribed_node.dtype))
self.logger.warning(
f"Failed to convert value {val!r} to {dtype_name} for node '{subscribed_node.name}': {e}"
)
self.handler.push(subscribed_node, val, _time)
self._node_interval_to_check.push(node=subscribed_node, value=val, timestamp=_time)
def status_change_notification(self, status: ua.StatusChangeNotification) -> None:
pass
def event_notification(self, event: ua.EventNotificationList) -> None:
pass