Source code for eta_nexus.servers.loaders.opcua_server_loader

from eta_nexus.nodes import OpcuaNode
from eta_nexus.nodes.node_utils import _dtype_converter
from eta_nexus.servers import OpcuaServer
from eta_nexus.util.io_utils import load_config
from eta_nexus.util.utils import dict_get_any, url_parse


[docs] def load_opcua_servers_from_config(config_path: str, port_override: int | None = None) -> list[OpcuaServer]: """ Load and instantiate OPC UA servers from a configuration file. Supported config shape (normalized internally): { "system": [ { "name": "CHP", "servers": { "glt": {"url": "127.0.0.1:4840", "protocol": "opcua", "usr": "...", "pwd": "..."} }, "nodes": [ {"name": "power_elek", "server": "glt", "opc_id": "ns=6;s=...", "dtype": "float"}, ... ] } ] } Normalization rules: - Endpoint: prefer `url`/`endpoint` (with or without `opc.tcp://`); otherwise compose from `ip` + `port`. - If only `url` is provided, it is parsed into `ip` and `port`. - NodeId must be provided as `opc_id`. - DType aliases are mapped case-insensitively via `_dtype_converter`. Returns: List[OpcuaServer] with nodes created and attached. """ config = load_config(config_path) servers = [] raw_servers = _extract_servers(config) for s in raw_servers: namespace = s.get("namespace") or s.get("name") or "default" ip = s.get("ip") port = port_override if port_override is not None else s.get("port", 4840) server = OpcuaServer(namespace=namespace, ip=ip, port=port) nodes_conf = s.get("nodes", []) nodes = [] for node_conf in nodes_conf: node = OpcuaNode( name=node_conf["name"], url=server.url, protocol="opcua", opc_id=node_conf["opc_id"], dtype=_dtype_converter(str(node_conf.get("dtype", "float"))) or float, ) nodes.append(node) if nodes: server.create_nodes(nodes) server.nodes = nodes servers.append(server) return servers
def _resolve_ip_port_from_url(url: str) -> tuple[str | None, int | None]: """Resolve host and port from a URL using shared util; default port 4840.""" _url, _usr, _pwd = url_parse(url, scheme="opc.tcp") host = _url.hostname port = _url.port if _url.port is not None else 4840 return host, port def _extract_servers(config: dict) -> list[dict]: """Normalize supported config shapes into a list of standardized servers.""" systems_obj = config.get("system") if systems_obj is None: return [] return _extract_from_system(systems_obj) def _extract_from_system(systems_obj: dict | list) -> list[dict]: out: list[dict] = [] systems = systems_obj if isinstance(systems_obj, list) else [systems_obj] for sys_entry in systems: if not isinstance(sys_entry, dict): continue sys_name = sys_entry.get("name") or "default" servers_obj = sys_entry.get("servers", {}) or {} nodes_all = sys_entry.get("nodes", []) or [] mat_servers: dict[str, dict] = {} if isinstance(servers_obj, dict): for srv_key, srv_val in servers_obj.items(): if not isinstance(srv_val, dict): continue ip, port = None, srv_val.get("port", 4840) url = dict_get_any(srv_val, "url", "endpoint", fail=False, default=None) if url: ip, port = _resolve_ip_port_from_url(str(url)) else: ip = srv_val.get("ip") port = srv_val.get("port", 4840) mat_servers[srv_key] = { "name": srv_key, "namespace": sys_name, "ip": ip, "port": port, "nodes": [], } for n in nodes_all: if not isinstance(n, dict): continue srv_ref = n.get("server") if srv_ref in mat_servers: name_n = n.get("name") opc_id = n.get("opc_id") if not (name_n and opc_id): continue dtype = n.get("dtype", n.get("datatype", "float")) mat_servers[srv_ref]["nodes"].append({"name": name_n, "opc_id": opc_id, "dtype": dtype}) out.extend(mat_servers.values()) return out