Source code for lvmopstools.devices.nps

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-09-13
# @Filename: nps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

from typing import Sequence

from typing_extensions import TypedDict

from sdsstools import GatheringTaskGroup

from lvmopstools import config
from lvmopstools.clu import send_clu_command
from lvmopstools.retrier import Retrier


__all__ = ["read_nps", "read_outlet"]


class NPSStatus(TypedDict):
    """A class to represent the status of an NPS.

    Each instance describes a single outlet: ``read_nps`` builds one entry
    per outlet reported by an actor, and ``read_outlet`` returns one for the
    requested outlet.

    """

    # Name of the NPS actor the outlet belongs to (``read_nps`` stores the
    # ``consumer_id`` of the command; ``read_outlet`` stores its ``actor``
    # argument unchanged).
    actor: str
    # Value of the ``normalised_name`` field in the actor's outlet reply.
    name: str
    # Value of the ``id`` field in the actor's outlet reply.
    id: int
    # Value of the ``state`` field in the actor's outlet reply
    # (presumably ``True`` when the outlet is powered on -- TODO confirm).
    state: bool


@Retrier(max_attempts=3, delay=1)
async def read_nps(
    actors: Sequence[str] | str | None = None,
) -> dict[str, NPSStatus]:
    """Returns the status of all NPS.

    Parameters
    ----------
    actors
        The NPS actor(s) to query. A single string is treated as a
        one-element list. Names that do not start with ``lvmnps.`` get that
        prefix prepended. If `None`, the actors listed in the
        ``devices.nps.default_actors`` configuration entry are used.

    Returns
    -------
    nps_data
        A dictionary of `.NPSStatus` keyed by
        ``"<actor suffix>.<outlet normalised name>"``, where the actor suffix
        is the second dot-separated component of the replying actor name.

    Raises
    ------
    ValueError
        If any (prefixed) actor name is not listed in the
        ``devices.nps.valid_actors`` configuration entry.

    """

    default_actors: list[str] = config["devices.nps.default_actors"]
    valid_actors: list[str] = config["devices.nps.valid_actors"]

    if isinstance(actors, str):
        actors = [actors]

    if actors is None:
        actors = default_actors

    # Normalise and validate every actor name BEFORE scheduling any command.
    # Doing this inside the task group would let status commands for earlier
    # actors be dispatched before a later invalid name is detected.
    # NOTE(review): if GatheringTaskGroup follows asyncio.TaskGroup semantics,
    # raising inside the ``async with`` body would also surface the error
    # wrapped in an exception group rather than as a plain ValueError --
    # confirm against sdsstools.
    full_names: list[str] = []
    for actor in actors:
        if not actor.startswith("lvmnps."):
            actor = "lvmnps." + actor

        if actor not in valid_actors:
            raise ValueError(f"Invalid NPS actor: {actor}.")

        full_names.append(actor)

    # Query all the actors concurrently.
    async with GatheringTaskGroup() as group:
        for actor in full_names:
            group.create_task(
                send_clu_command(
                    f"{actor} status",
                    raw=True,
                    internal=True,
                )
            )

    nps_data: dict[str, NPSStatus] = {}

    for cmd in group.results():
        # The replying actor is taken from the command itself rather than
        # from the request list.
        actor = cmd.consumer_id
        outlets = cmd.replies.get("outlets")

        for outlet in outlets:
            key = f"{actor.split('.')[1]}.{outlet['normalised_name']}"
            nps_data[key] = NPSStatus(
                actor=actor,
                name=outlet["normalised_name"],
                id=outlet["id"],
                state=outlet["state"],
            )

    return nps_data
@Retrier(max_attempts=3, delay=1)
async def read_outlet(actor: str, outlet: str | int) -> NPSStatus:
    """Returns the status of a single NPS outlet.

    Parameters
    ----------
    actor
        Name of the NPS actor to query. It is interpolated verbatim into the
        command string and copied unchanged into the result.
    outlet
        Identifier of the outlet, interpolated verbatim into the command
        string.

    Returns
    -------
    status
        An `.NPSStatus` dictionary built from the ``outlet_info`` entry of
        the first reply to the ``status`` command.

    """

    replies = await send_clu_command(f"{actor} status {outlet}", internal=True)

    # Only the first reply is consulted; it is expected to carry the
    # ``outlet_info`` payload for the requested outlet.
    outlet_info = replies[0]["outlet_info"]

    status: NPSStatus = {
        "actor": actor,
        "name": outlet_info["normalised_name"],
        "id": outlet_info["id"],
        "state": outlet_info["state"],
    }
    return status