Source code for lvmopstools.devices.ion

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-09-11
# @Filename: ion.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import warnings

from typing import cast

from typing_extensions import TypedDict

from drift import Drift
from drift.convert import data_to_float32

from lvmopstools import config
from lvmopstools.clu import send_clu_command
from lvmopstools.devices.nps import read_outlet
from lvmopstools.retrier import Retrier


__all__ = ["read_ion_pumps", "toggle_ion_pump", "convert_pressure", "ALL"]


#: Flag to toggle all ion pumps.
ALL = "all"


def convert_pressure(volts: float):
    """Converts differential voltage to pressure in Torr."""

    # The calibration is a linear fit of the form y = mx + b
    m = 2.04545
    b = -6.86373

    log10_pp0 = m * volts + b  # log10(PPa), pressure in Pascal

    torr = 10**log10_pp0 * 0.00750062

    return torr


class IonPumpDict(TypedDict):
    """Ion pump dictionary."""

    pressure: float | None
    on: bool | None
    diff_voltage: float | None


@Retrier(max_attempts=3, delay=1)
async def _read_one_ion_controller(ion_config: dict) -> dict[str, IonPumpDict]:
    """Reads the signal and on/off status from an ion controller."""

    results: dict[str, IonPumpDict] = {}

    drift = Drift(ion_config["host"], ion_config.get("port", 502), timeout=1)

    async with drift:
        for camera, camera_config in ion_config["cameras"].items():
            signal_address = camera_config["signal_address"]
            # on_off_address = camera_config["on_off_address"]

            signal = await drift.client.read_input_registers(signal_address, count=2)
            # onoff = await drift.client.read_input_registers(on_off_address, count=1)

            registers = cast(tuple[int, int], tuple(signal.registers))

            diff_volt = data_to_float32(registers)
            pressure = convert_pressure(diff_volt)

            # onoff_status = bool(onoff.registers[0])
            onoff_status = pressure > 1e-8

            # No point in reporting a bogus pressure.
            if pressure < 1e-8:
                pressure = None

            results[camera] = {
                "pressure": float(f"{pressure:.3g}") if pressure is not None else None,
                "on": onoff_status,
                "diff_voltage": round(diff_volt, 3),
            }

    return results


[docs] async def read_ion_pumps(cameras: list[str] | None = None) -> dict[str, IonPumpDict]: """Reads the signal and on/off status from an ion pump. Parameters ---------- cameras A list of cameras to read. If `None`, reads all cameras. """ ion_config: list[dict] = config["devices.ion"] results: dict[str, IonPumpDict] = {} tasks: list[asyncio.Task] = [] for ion_controller in ion_config: type_ = ion_controller.get("type", "ion_controller") controller_cameras = ion_controller["cameras"] if cameras is not None: # Skip reading this controller if none of the cameras are in the list. if len(set(cameras) & set(controller_cameras)) == 0: continue for camera in controller_cameras: results[camera] = {"pressure": None, "on": None, "diff_voltage": None} if type_ == "nps": if (cameras is None) or (cameras is not None and camera in cameras): nps_data = await read_outlet( ion_controller["actor"], ion_controller["outlet"], ) results[camera] = { "pressure": None, "on": nps_data["state"], "diff_voltage": None, } if type_ == "ion_controller": tasks.append(asyncio.create_task(_read_one_ion_controller(ion_controller))) await asyncio.gather(*tasks, return_exceptions=True) for task in tasks: if task.exception(): warnings.warn(f"Error reading ion pump: {task.exception()}") continue for camera, item in task.result().items(): results[camera] = item if cameras is not None: results = {camera: results[camera] for camera in cameras if camera in results} if cameras is not None and set(results.keys()) != set(cameras): warnings.warn("Not all cameras were found in the configuration.") return results
[docs] @Retrier(max_attempts=3, delay=1) async def toggle_ion_pump(camera: str, on: bool): """Turns the ion pump on or off. Parameters ---------- camera The camera for which to toggle the ion pump. Can also be :obj:`.ALL` to toggle all ion pumps. on If `True`, turns the pump on. If `False`, turns the pump off. If `None`, toggles the pump current status. """ ion_config: list[dict] = config["devices.ion"] if camera == ALL: cameras = [camera for ic in ion_config for camera in ic["cameras"]] for camera in cameras: await toggle_ion_pump(camera, on) return # ion_controller or nps type_: str | None = None # Ion pump connected to an NPS actor: str | None = None outlet: str | int | None = None # Ion controller box host: str | None = None port: int | None = None on_off_address: int | None = None for ic in ion_config: if camera in ic["cameras"]: type_ = ic.get("type", "ion_controller") if type_ == "ion_controller": host = ic["host"] port = ic.get("port", 502) on_off_address = ic["cameras"][camera]["on_off_address"] elif type_ == "nps": actor = ic["actor"] outlet = ic["outlet"] else: raise ValueError(f"Unknown type {type_!r} for ion controller {ic!r}.") break if type_ is None: raise ValueError(f"Camera {camera!r} not found in the configuration.") if type_ == "nps": cmd = await send_clu_command( f"{actor} {'on' if on else 'off'} {outlet}", raw=True, ) if cmd.status.did_fail: raise ValueError(f"Error toggling ion pump for camera {camera!r} via NPS.") return # Ion controller box if host is None or port is None or on_off_address is None: raise ValueError(f"Camera {camera!r} configuration is incomplete.") drift = Drift(host, port) async with drift: value = 2**16 - 1 if on else 0 await drift.client.write_register(on_off_address, value)