API

Actor

exception lvmopstools.actor.CheckError(message: str = '', error_code: ErrorCodesBase | int = ErrorCodes.UNKNOWN)[source]

Bases: Exception

An exception raised when the LVMActor check fails.

class lvmopstools.actor.ActorState(*values)[source]

Bases: Flag

Defines the possible states of the actor.

get_state_names()[source]

Returns the state names that are set.

class lvmopstools.actor.ErrorCodes(*values)

Bases: ErrorCodesBase

class lvmopstools.actor.ErrorCodesBase(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration of error codes

classmethod get_error_code(error_code: int)[source]

Returns the ErrorCodes that matches the error_code value.

class lvmopstools.actor.ErrorData(code: 'int', critical: 'bool' = False, description: 'str' = '')[source]

Bases: object

class lvmopstools.actor.LVMActor(*args, check_interval: float = 30.0, restart_after: float | None = 300.0, restart_mode='reload', **kwargs)[source]

Bases: AMQPActor

Base class for LVM actors.

is_ready()[source]

Returns True if the actor is ready.

async restart(mode: str | None = None)[source]

Restarts the actor by killing the process.

Parameters:

mode – How to restart the actor. Possible values are "exit", which will finish the process and let the supervisor restart it (for example a Kubernetes scheduler), and "reload" which will stop and restart the actor without killing the process. If None, defaults to restart_mode.

async start()[source]

Starts the actor.

async stop()[source]

Stops the actor.

async troubleshoot(error_code: ErrorCodesBase = ErrorCodes.UNKNOWN, exception: Exception | None = None, traceback_frame: int = 0)[source]

Handles troubleshooting.

update_state(state: ActorState, error_data: dict[str, Any] | None = None, command: Command | None = None, internal: bool = True)[source]

Updates the state and broadcasts the change.

lvmopstools.actor.create_error_codes(error_codes: dict[str, tuple | list | ErrorData], name: str = 'ErrorCodes', include_unknown: bool = True) Any[source]

Creates an enumeration of error codes.

Ephemeris

lvmopstools.ephemeris.sjd_ephemeris(sjd: int, twilight_horizon: float = -15) DataFrame[source]

Returns the ephemeris for a given SJD.

lvmopstools.ephemeris.is_sun_up(include_twilight: bool = False)[source]

Determines whether the Sun is up at the current time.

lvmopstools.ephemeris.create_schedule(start_sjd: int, end_sjd: int, twilight_horizon: float = -15) DataFrame[source]

Creates a schedule for the given time range.

Parameters:
  • start_sjd – The initial SJD.

  • end_sjd – The final SJD of the schedule.

Returns:

The schedule as a Polars dataframe.

Return type:

schedule

InfluxDB

async lvmopstools.influxdb.query_influxdb(url: str, query: str, token: str | None = None, org: str | None = None) DataFrame[source]

Runs a query in InfluxDB and returns a Polars dataframe.

Notifications

class lvmopstools.notifications.NotificationLevel(*values)[source]

Bases: Enum

Allowed notification levels.

lvmopstools.notifications.send_critical_error_email(message: str, subject: str = 'LVM Critical Alert', recipients: Sequence[str] | None = None, from_address: str | None = None, email_reply_to: str | None = None, host: str | None = None, port: int | None = None, tls: bool | None = None, username: str | None = None, password: str | None = None, show_critical_error_preface: bool = True)[source]

Sends a critical error email.

Parameters:
  • message – The message to send.

  • subject – The subject of the email.

  • recipients – The recipients of the email. A list of email addresses or None to use the default recipients.

  • from_address – The email address from which the email is sent.

  • host – The SMTP server host.

  • port – The SMTP server port.

  • tls – Whether to use TLS for authentication.

  • username – The SMTP server username.

  • password – The SMTP server password.

  • show_critical_error_preface – Whether to show a preface in the email indicating that this is a critical error notification.

lvmopstools.notifications.send_email(message: str, subject: str, recipients: Sequence[str], from_address: str, *, html_message: str | None = None, email_reply_to: str | None = None, host: str | None = None, port: int | None = None, tls: bool | None = None, username: str | None = None, password: str | None = None)[source]

Sends an email.

Parameters:
  • message – The plain text message to send.

  • subject – The subject of the email.

  • recipients – The recipients of the email.

  • from_address – The email address from which the email is sent.

  • html_message – The HTML message to send.

  • email_reply_to – The email address to which to reply. Defaults to the sender.

  • host – The SMTP server host.

  • port – The SMTP server port.

  • tls – Whether to use TLS for authentication.

  • username – The SMTP server username.

  • password – The SMTP server password.

async lvmopstools.notifications.send_notification(message: str, level: NotificationLevel | str = NotificationLevel.INFO, slack: bool = True, slack_channels: str | Sequence[str] | None = None, email_on_critical: bool = True, slack_extra_params: dict[str, Any] = {}, email_params: dict[str, Any] = {})[source]

Creates a new notification.

Parameters:
  • message – The message of the notification. Can be formatted in Markdown.

  • level – The level of the notification.

  • slack – Whether to send the notification to Slack.

  • slack_channels – The Slack channel where to send the notification. If not provided, the default channel is used. Can be set to false to disable sending the Slack notification.

  • email_on_critical – Whether to send an email if the notification level is CRITICAL.

  • slack_extra_params – A dictionary of extra parameters to pass to post_message.

  • email_params – A dictionary of extra parameters to pass to send_critical_error_email.

Returns:

The message that was sent.

Return type:

message

PubSub

class lvmopstools.pubsub.Publisher(*args, **kwargs)[source]

Bases: BasePubSub

A class to publish messages to a RabbitMQ exchange. A singleton.

async publish(message: dict, routing_key: str | None = None)[source]

Publishes a message to the exchange.

Parameters:
  • message – The message to publish. Must be a dictionary that will be encoded as a JSON string.

  • routing_key – The routing key to use. If not provided, uses the default routing key defined in the configuration.

class lvmopstools.pubsub.Subscriber(connection_string: str | None = None, exchange_name: str | None = None, callback: Callable[[Message], Awaitable[None]] | None = None, queue_name: str | None = None)[source]

Bases: BasePubSub

A class to subscribe to messages from a RabbitMQ exchange.

async connect(queue_name: str | None = None) Self[source]

Connects to the exchange, declares a queue, and binds the callback.

Parameters:

queue_name – The name of the queue to declare. If not provided, a random name will be generated (recommended).

async disconnect()[source]

Disconnects from the RabbitMQ server.

async get(decode: Literal[True] = True) Message[source]
async get(decode: Literal[False]) AbstractIncomingMessage

Gets the next message from the queue.

async iterator(decode: Literal[True] = True) AsyncGenerator[Message, None][source]
async iterator(decode: Literal[False] = False) AsyncGenerator[AbstractIncomingMessage, None]

Iterates over a queue and yields messages.

async lvmopstools.pubsub.send_event(event: Event | str, payload: dict[str, Any] = {})[source]

Convenience function to publish an event to the exchange.

class lvmopstools.pubsub.Event(*values)[source]

Bases: UppercaseStrEnum

Enumeration with the event types.

class lvmopstools.pubsub.Message(message: AbstractIncomingMessage)[source]

Bases: object

A model for messages to be published to the exchange.

Retrier

class lvmopstools.retrier.Retrier(max_attempts: int = 3, delay: float = 1, use_exponential_backoff: bool = True, exponential_backoff_base: float = 2, max_delay: float = 32.0, on_retry: ~typing.Callable[[Exception], None] | None = None, raise_on_exception_class: list[type[Exception]] = <factory>, timeout: float | None = None)[source]

Bases: object

A class that implements a retry mechanism.

The object returned by this class can be used to wrap a function that will be retried max_attempts times if it fails:

def test_function():
    ...

retrier = Retrier(max_attempts=5)
retrier(test_function)()

where the wrapped function can be a coroutine, in which case the wrapped function will also be a coroutine.

Most frequently this class will be used as a decorator:

@Retrier(max_attempts=4, delay=0.1)
async def test_function(x, y):
    ...

await test_function(1, 2)
Parameters:
  • max_attempts (int) – The maximum number of attempts before giving up.

  • delay (float) – The delay between attempts, in seconds.

  • use_exponential_backoff (bool) – Whether to use exponential backoff for the delay between attempts. If True, the delay will be delay * exponential_backoff_base ** (attempt - 1) + random_ms where random_ms is a random number between 0 and 100 ms used to avoid synchronisation issues.

  • exponential_backoff_base (float) – The base for the exponential backoff.

  • max_delay (float) – The maximum delay between attempts when using exponential backoff.

  • on_retry (Callable[[Exception], None] | None) – A function that will be called when a retry is attempted. The function should accept an exception as its only argument.

  • raise_on_exception_class (list[type[Exception]]) – A list of exception classes that will cause an exception to be raised without retrying.

  • timeout (float | None) – If defined, each attempt can take at most this amount of time. If the attempt times out, an asyncio.TimeoutError will be raised. This only works if the wrapped function is a coroutine.

calculate_delay(attempt: int) float[source]

Calculates the delay for a given attempt.

Slack

async lvmopstools.slack.get_user_id(name: str)[source]

Gets the userID of the user display name matches name.

async lvmopstools.slack.get_user_list()[source]

Returns the list of users in the workspace.

This function is cached because Slack limits the requests for this route.

async lvmopstools.slack.post_message(text: str | None = None, blocks: Sequence[dict] | None = None, channel: str | None = None, mentions: list[str] = [], username: str | None = None, icon_url: str | None = None, **kwargs)[source]

Posts a message to Slack.

Parameters:
  • text – Plain text to send to the Slack channel.

  • blocks – A list of blocks to send to the Slack channel. These follow the Slack API format for blocks. Incompatible with text.

  • channel – The channel in the SSDS-V workspace where to send the message.

  • mentions – A list of users to mention in the message.

Socket

class lvmopstools.socket.AsyncSocketHandler(host: str, port: int, timeout: float = 5, retry: bool = True, retrier_params: dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Handles a socket connection and disconnection.

Handles secure connection and disconnection to a TCP server and executed a callback. By default Retrier is used to retry the connection if it fails either during the connection phase or during callback execution.

There are two ways to use this class. The first one is to create an instance and call it with a callback function which receives StreamReader and StreamWriter arguments

async def callback(reader, writer):
    ...

handler = AsyncSocketHandler(host, port)
await handler(callback)

Alternatively, you can subclass AsyncSocketHandler and override the request method

class MyHandler(AsyncSocketHandler):
    async def request(self, reader, writer):
        ...
Parameters:
  • host (str) – The host that is running the server.

  • port (int) – The port on which the server is listening.

  • timeout (float) – The timeout for connection and callback execution.

  • retry (bool) – Whether to retry the connection/callback if they fails.

  • retrier_params (dict[str, Any]) – Parameters to pass to the Retrier instance.

async request(reader: StreamReader, writer: StreamWriter)[source]

Sends a request to the socket.

If the handler is not called with a callback function, this method must be overridden in a subclass. It receives the StreamReader and StreamWriter client instances after a connection has been established.

Utils

async lvmopstools.utils.get_amqp_client(**kwargs) AMQPClient[source]

Returns a CLU AMQP client.

lvmopstools.utils.get_exception_data(exception: Exception | None, traceback_frame: int = 0)[source]

Returns a dictionary with information about an exception.

async lvmopstools.utils.stop_event_loop(timeout: float | None = 5)[source]

Cancels all running tasks and stops the event loop.

async lvmopstools.utils.with_timeout(coro: Future[T] | Coroutine[Any, Any, T], timeout: float | None, raise_on_timeout: bool = True) T | None[source]

Runs a coroutine with a timeout.

Parameters:
  • coro – The coroutine to run.

  • timeout – The timeout in seconds.

  • raise_on_timeout – If True, raises a asyncio.TimeoutError if the coroutine times out, otherwise returns None.

Returns:

The result of the coroutine.

Return type:

result

Raises:

asyncio.TimeoutError – If the coroutine times out.

lvmopstools.utils.is_notebook() bool[source]

Returns True if the code is run inside a Jupyter Notebook.

https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook

class lvmopstools.utils.Trigger(n: int = 1, delay: float = 0)[source]

Bases: object

A trigger that can be set and reset and accepts setting thresholds.

This class is essentially just a flag that can take true/false values, but triggering the true value can be delayed by time or number or triggers.

Parameters:
  • n – The number of times the instance needs to be set before it is triggered.

  • delay – The delay in seconds before the instance is triggered. This is counted from the first time the instance is set, and is reset if the instance is reset. If n_triggers is greater than 1, both conditions must be met for the instance to be triggered.

is_set()[source]

Returns True if the trigger is set.

reset()[source]

Resets the trigger.

set()[source]

Sets the trigger.

Weather

async lvmopstools.weather.get_weather_data(start_time: str | float, end_time: str | float | None = None, station: Literal['dupont', 'swope', 'magellan'] = 'dupont')[source]

Returns a data frame with weather data from the du Pont station.

Warning

This function can only be run from inside the LVM network since it uses the LVM API which is not publicly accessible.

Parameters:
  • start_time – The start time of the query. Can be a UNIX timestamp or an ISO datetime string.

  • end_time – The end time of the query. Can be a UNIX timestamp or an ISO datetime string. Defaults to the current time.

  • station – The station to query. Must be one of ‘DuPont’, ‘C40’, or ‘Magellan’.

Returns:

A data frame with the weather data.

Return type:

weather_data

lvmopstools.weather.is_weather_data_safe(data: DataFrame, measurement: str, threshold: float, now: float | datetime | None = None, window: int = 30, rolling_average_window: int = 10, reopen_value: float | None = None)[source]

Determines whether an alert should be raised for a given weather measurement.

An alert will be issued if the rolling average value of the measurement (a column in data) over the last window minutes is above the threshold. Once the alert has been raised the value of the measurement must fall below the reopen_value to close the alert (defaults to the same threshold value) in a rolling.

If now is not provie the current time is used as the reference point.

window and rolling_average_window are in minutes.

Examples

>>> is_weather_data_safe(data, "wind_speed_avg", 35)
True
Returns:

A boolean indicating whether the measurement is safe. True means the measurement is in a valid, safe range.

Return type:

result

Devices

Spectrographs

async lvmopstools.devices.specs.exposure_etr() tuple[float | None, float | None][source]

Returns the ETR for the exposure, including readout.

async lvmopstools.devices.specs.spectrograph_mechanics(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, str | None][source]

Returns a dictionary of spectrograph shutter and hartmann door status.

Parameters:
  • spec – The spectrograph to retrieve the mechanics status for. If None, retrieves the status for all spectrographs.

  • ignore_errors – If True, ignores errors when retrieving the status and replaces the missing values with None. If False, raises an error if any of the status cannot be retrieved

async lvmopstools.devices.specs.spectrograph_pressures(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, float | None][source]

Returns a dictionary of spectrograph pressures.

Parameters:
  • spec – The spectrograph to retrieve the pressures for. If None, retrieves the pressures for all spectrographs.

  • ignore_errors – If True, ignores errors when retrieving the pressures and replaces the missing values with None. If False, raises an error if any of the pressures cannot be retrieved.

async lvmopstools.devices.specs.spectrograph_status() SpectrographStatusResponse[source]

Returns the status of the spectrographs.

lvmopstools.devices.specs.spectrograph_temperature_label(camera: str, sensor: str = 'ccd') str[source]

Returns the archon label associated with a temperature sensor.

async lvmopstools.devices.specs.spectrograph_temperatures(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, float | None][source]

Returns a dictionary of spectrograph temperatures.

Parameters:
  • spec – The spectrograph to retrieve the temperatures for. If None, retrieves the temperatures for all spectrographs.

  • ignore_errors – If True, ignores errors when retrieving the temperatures and replaces the missing values with None. If False, raises an error if any of the temperatures cannot be retrieved.

Returns:

A dictionary with the temperatures for each camera and sensor, e.g., {'r1_ln2': -184.1, 'r1_ccd': -120.3, ...}.

Return type:

dict

Ion Pumps

lvmopstools.devices.ion.ALL Flag to toggle all ion pumps.
async lvmopstools.devices.ion.read_ion_pumps(cameras: list[str] | None = None) dict[str, IonPumpDict][source]

Reads the signal and on/off status from an ion pump.

Parameters:

cameras – A list of cameras to read. If None, reads all cameras.

async lvmopstools.devices.ion.toggle_ion_pump(camera: str, on: bool)[source]

Turns the ion pump on or off.

Parameters:
  • camera – The camera for which to toggle the ion pump. Can also be ALL to toggle all ion pumps.

  • on – If True, turns the pump on. If False, turns the pump off. If None, toggles the pump current status.

NPS

async lvmopstools.devices.nps.read_nps(actors: str | Sequence[str] | None = None) dict[str, NPSStatus][source]

Returns the status of all NPS.

async lvmopstools.devices.nps.read_outlet(actor: str, outlet: str | int) NPSStatus[source]

Returns the status of a single NPS outlet.

AGs

async lvmopstools.devices.ags.power_cycle_ag_camera(camera: str, verbose: bool = False)[source]

Power cycles an AG camera either by resetting PoE or toggling the NPS.

Parameters:
  • camera – The name of the camera to power cycle.

  • verbose – If True, prints the commands that are being run.

Switch

lvmopstools.devices.switch.power_cycle_interface(interface: str, verbose: bool = False)[source]

Power cycles the PoE for a switch interface.

The environment variales LVM_SWITCH_HOST, LVM_SWITCH_USERNAME, LVM_SWITCH_PASSWORD, and LVM_SWITCH_SECRET must be set when calling this function.

Parameters:
  • interface – The switch interface to power cycle, e.g., "2/0/6".

  • verbose – If True, prints the commands that are being run.

lvmopstools.devices.switch.get_ag_poe_port_info(camera: str | None = None) dict[str, str][source]

Returns the PoE port information for a given camera.

Parameters:

camera – The name of the camera. If not provided, returns the information for all cameras.

Returns:

A dictionary with the PoE port information for the camera.

Return type:

dict

Thermistors

async lvmopstools.devices.thermistors.read_thermistors()[source]

Returns the thermistor values.

lvmopstools.devices.thermistors.channel_to_valve(reverse: Literal[False] = False) dict[int, str][source]
lvmopstools.devices.thermistors.channel_to_valve(reverse: Literal[True] = True) dict[str, int]

Returns a mapping of thermistor channels to valve names.

With reverse returns the inverse mapping, valve to device channel.