Source code for lvmopstools.devices.thermistors

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-09-12
# @Filename: thermistors.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import re

from typing import Literal, overload

import asyncudp

from lvmopstools import config
from lvmopstools.retrier import Retrier


__all__ = ["read_thermistors", "channel_to_valve"]


@overload
def channel_to_valve(reverse: Literal[False] = False) -> dict[int, str]: ...


@overload
def channel_to_valve(reverse: Literal[True] = True) -> dict[str, int]: ...


[docs] def channel_to_valve(reverse: bool = False) -> dict[int, str] | dict[str, int]: """Returns a mapping of thermistor channels to valve names. With ``reverse`` returns the inverse mapping, valve to device channel. """ valve_to_channel = config["devices.thermistors.channels"] if reverse: return valve_to_channel channel_to_valve = {valve_to_channel[valve]: valve for valve in valve_to_channel} return channel_to_valve
[docs] @Retrier(max_attempts=3, delay=1) async def read_thermistors(): """Returns the thermistor values.""" th_config = config["devices.thermistors"] host = th_config["host"] port = th_config["port"] socket: asyncudp.Socket | None = None try: socket = await asyncio.wait_for( asyncudp.create_socket(remote_addr=(host, port)), timeout=5, ) socket.sendto(b"$016\r\n") data, _ = await asyncio.wait_for(socket.recvfrom(), timeout=10) finally: if socket: socket.close() match = re.match(rb"!01([0-9A-F]+)\r", data) if match is None: raise ValueError(f"Invalid response from thermistor server at {host!r}.") value = int(match.group(1), 16) thermistor_values: dict[str, bool] = {} for thermistor in th_config["channels"]: channel = th_config["channels"][thermistor] thermistor_values[thermistor] = bool(int((value & 1 << channel) > 0)) return thermistor_values