Source code for lvmopstools.devices.nps
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-09-13
# @Filename: nps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
from typing import Sequence
from typing_extensions import TypedDict
from sdsstools import GatheringTaskGroup
from lvmopstools import config
from lvmopstools.clu import send_clu_command
from lvmopstools.retrier import Retrier
# Names exported by ``from lvmopstools.devices.nps import *``.
__all__ = ["read_nps", "read_outlet"]
class NPSStatus(TypedDict):
    """A dictionary describing the status of a single NPS outlet."""

    # Name of the NPS actor the outlet belongs to (the replying actor in
    # ``read_nps``, the queried actor in ``read_outlet``).
    actor: str
    # Outlet name, taken from the ``normalised_name`` field of the reply.
    name: str
    # Outlet identifier, taken from the ``id`` field of the reply.
    id: int
    # Outlet state, taken from the ``state`` field of the reply; presumably
    # ``True`` means the outlet is powered on -- TODO confirm with the actor.
    state: bool
[docs]
@Retrier(max_attempts=3, delay=1)
async def read_nps(actors: Sequence[str] | str | None = None) -> dict[str, NPSStatus]:
    """Returns the status of all NPS.

    Parameters
    ----------
    actors
        The NPS actor or actors to query. A single string is treated as a
        one-element list. Names that do not start with ``lvmnps.`` get that
        prefix added. If `None`, the actors listed in the
        ``devices.nps.default_actors`` configuration entry are used.

    Returns
    -------
    nps_data
        A mapping with one `.NPSStatus` entry per reported outlet. Keys have
        the form ``<name>.<outlet>`` where ``<name>`` is the second
        dot-separated component of the replying actor name and ``<outlet>``
        is the outlet ``normalised_name``.

    Raises
    ------
    ValueError
        If any requested actor is not listed in the
        ``devices.nps.valid_actors`` configuration entry.

    """

    default_actors: list[str] = config["devices.nps.default_actors"]
    valid_actors: list[str] = config["devices.nps.valid_actors"]

    if isinstance(actors, str):
        actors = [actors]
    elif actors is None:
        actors = default_actors

    # Normalise and validate every actor BEFORE opening the task group. Doing
    # this inside the group would schedule commands for the earlier actors
    # and only then abort, and the error would be raised from within the
    # group's context manager instead of directly from this function.
    actor_names: list[str] = []
    for actor in actors:
        if not actor.startswith("lvmnps."):
            actor = "lvmnps." + actor

        if actor not in valid_actors:
            raise ValueError(f"Invalid NPS actor: {actor}.")

        actor_names.append(actor)

    # Query all the actors concurrently.
    async with GatheringTaskGroup() as group:
        for actor_name in actor_names:
            group.create_task(
                send_clu_command(
                    f"{actor_name} status",
                    raw=True,
                    internal=True,
                )
            )

    nps_data: dict[str, NPSStatus] = {}

    for cmd in group.results():
        # Use the name of the actor that actually replied.
        actor = cmd.consumer_id
        short_name = actor.split(".")[1]

        outlets = cmd.replies.get("outlets")

        for outlet in outlets:
            outlet_name = outlet["normalised_name"]
            nps_data[f"{short_name}.{outlet_name}"] = NPSStatus(
                actor=actor,
                name=outlet_name,
                id=outlet["id"],
                state=outlet["state"],
            )

    return nps_data
[docs]
@Retrier(max_attempts=3, delay=1)
async def read_outlet(actor: str, outlet: str | int) -> NPSStatus:
    """Returns the status of a single NPS outlet.

    Parameters
    ----------
    actor
        The name of the NPS actor to command. It is used verbatim in the
        command string.
    outlet
        The outlet to query, passed as the argument of the actor ``status``
        command.

    Returns
    -------
    status
        The `.NPSStatus` built from the ``outlet_info`` keyword of the first
        reply to the command.

    """

    replies = await send_clu_command(f"{actor} status {outlet}", internal=True)

    # Only the first reply is used; it carries the ``outlet_info`` keyword.
    outlet_info = replies[0]["outlet_info"]

    return NPSStatus(
        actor=actor,
        name=outlet_info["normalised_name"],
        id=outlet_info["id"],
        state=outlet_info["state"],
    )