Source code for lvmopstools.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-01-18
# @Filename: utils.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import os
import subprocess
import time
import warnings
from functools import wraps

from typing import Any, Coroutine, TypeVar

import nmap3

from clu import AMQPClient
from sdsstools.utils import run_in_executor


try:
    import netmiko
except ImportError:
    netmiko = None


__all__ = [
    "get_amqp_client",
    "get_exception_data",
    "stop_event_loop",
    "with_timeout",
    "timeout",
    "is_notebook",
    "Trigger",
    "is_host_up",
    "is_root",
]


[docs] async def get_amqp_client(**kwargs) -> AMQPClient: # pragma: no cover """Returns a CLU AMQP client.""" amqp_client = AMQPClient(**kwargs) await amqp_client.start() return amqp_client
[docs] def get_exception_data(exception: Exception | None, traceback_frame: int = 0): """Returns a dictionary with information about an exception.""" if exception is None: return None if not isinstance(exception, Exception): return None exception_data: dict[str, Any] = {} if exception is not None: filename: str | None = None lineno: int | None = None if exception.__traceback__ is not None: tb = exception.__traceback__ for _ in range(traceback_frame): t_next = tb.tb_next if t_next is None: break tb = t_next filename = tb.tb_frame.f_code.co_filename if tb else None lineno = tb.tb_lineno if tb else None exception_data = { "module": exception.__class__.__module__, "type": exception.__class__.__name__, "message": str(exception), "filename": filename, "lineno": lineno, } return exception_data
[docs] async def stop_event_loop(timeout: float | None = 5): # pragma: no cover """Cancels all running tasks and stops the event loop.""" for task in asyncio.all_tasks(): task.cancel() try: await asyncio.wait_for(asyncio.gather(*asyncio.all_tasks()), timeout=timeout) except asyncio.TimeoutError: pass finally: asyncio.get_running_loop().stop()
[docs] def is_notebook() -> bool: """Returns :obj:`True` if the code is run inside a Jupyter Notebook. https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook """ try: shell = get_ipython().__class__.__name__ # type: ignore if shell == "ZMQInteractiveShell": return True # Jupyter notebook or qtconsole elif shell == "TerminalInteractiveShell": return False # Terminal running IPython else: return False # Other type (?) except NameError: return False # Probably standard Python interpreter
T = TypeVar("T", bound=Any)
[docs] async def with_timeout( coro: asyncio.Future[T] | Coroutine[Any, Any, T], timeout: float | None, raise_on_timeout: bool = True, ) -> T | None: """Runs a coroutine with a timeout. Parameters ---------- coro The coroutine to run. timeout The timeout in seconds. raise_on_timeout If :obj:`True`, raises a :class:`asyncio.TimeoutError` if the coroutine times out, otherwise returns :obj:`None`. Returns ------- result The result of the coroutine. Raises ------ asyncio.TimeoutError If the coroutine times out. """ try: return await asyncio.wait_for(coro, timeout) except asyncio.TimeoutError: if raise_on_timeout: raise asyncio.TimeoutError(f"Timed out after {timeout} seconds.")
def timeout(timeout: float, raise_on_timeout: bool = True): """Decorator that wraps an async function with :obj:`asyncio.wait_for`. Parameters ---------- timeout The timeout in seconds. raise_on_timeout If :obj:`True`, raises a :class:`asyncio.TimeoutError` if the coroutine times out, otherwise returns :obj:`None`. Example ------- @timeout(5) async def work(): ... @timeout(5, raise_on_timeout=False) async def maybe(): ... """ def decorator(func): if not asyncio.iscoroutinefunction(func): raise TypeError("timeout decorator can only be applied to async functions.") @wraps(func) async def wrapper(*args, **kwargs): try: return await asyncio.wait_for(func(*args, **kwargs), timeout) except asyncio.TimeoutError: if raise_on_timeout: raise asyncio.TimeoutError( f"{func.__name__} timed out after {timeout} seconds." ) return None return wrapper return decorator
[docs] class Trigger: """A trigger that can be set and reset and accepts setting thresholds. This class is essentially just a flag that can take true/false values, but triggering the true value can be delayed by time or number or triggers. Parameters ---------- n The number of times the instance needs to be set before it is triggered. delay The delay in seconds before the instance is triggered. This is counted from the first time the instance is set, and is reset if the instance is reset. If ``n_triggers`` is greater than 1, both conditions must be met for the instance to be triggered. """ def __init__(self, n: int = 1, delay: float = 0): self.n = n self.delay = delay self._triggered = False self._first_trigger: float | None = None self._n_sets: int = 0 def _check(self): """Check the trigger conditions and update the internal state.""" now = time.time() if ( self._n_sets >= self.n and self._first_trigger is not None and now - self._first_trigger >= self.delay ): self._triggered = True
[docs] def set(self): """Sets the trigger.""" if self._triggered: return self._n_sets += 1 self._first_trigger = self._first_trigger or time.time() self._check()
[docs] def reset(self): """Resets the trigger.""" self._first_trigger = None self._n_sets = 0 self._triggered = False
[docs] def is_set(self): """Returns :obj:`True` if the trigger is set.""" self._check() return self._triggered
def is_root(): """Returns whether the user is root. This may not work in all systems.""" return os.geteuid() == 0 async def ping_host(host: str): """Pings a host.""" cmd = await asyncio.create_subprocess_exec( *["ping", "-c", "1", "-W", "5", host], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) return_code = await cmd.wait() return return_code == 0 async def is_host_up(host: str, fallback_to_ping: bool = True) -> bool: """Returns whether a host is up. Parameters ---------- host The host to check. fallback_to_ping If ``True``, a system ping will be used if the user is not root or nmap is not available. This is less reliable than using nmap, but nmap requires running as root for reliable results. Returns ------- is_up ``True`` if the host is up, ``False`` otherwise. """ if not is_root(): if fallback_to_ping: warnings.warn('Running as non-root; using "ping" instead of "nmap".') return await ping_host(host) raise PermissionError("root privileges are required to run nmap.") if not nmap3.get_nmap_path(): if fallback_to_ping: warnings.warn('nmap not available. Using "ping instead.') return await ping_host(host) raise RuntimeError("nmap is not available.") nmap = nmap3.NmapHostDiscovery() result = await run_in_executor( nmap.nmap_no_portscan, host, args="--host-timeout=1 --max-retries=2", ) if ( host not in result or not isinstance(result[host], dict) or "state" not in result[host] or "state" not in result[host]["state"] ): return False return result[host]["state"]["state"] == "up"