API¶
Actor¶
- exception lvmopstools.actor.CheckError(message: str = '', error_code: ErrorCodesBase | int = ErrorCodes.UNKNOWN)[source]¶
Bases: Exception
An exception raised when the LVMActor check fails.
- class lvmopstools.actor.ActorState(*values)[source]¶
Bases: Flag
Defines the possible states of the actor.
- class lvmopstools.actor.ErrorCodes(*values)¶
Bases: ErrorCodesBase
- class lvmopstools.actor.ErrorCodesBase(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases: Enum
Enumeration of error codes.
- classmethod get_error_code(error_code: int)[source]¶
Returns the ErrorCodes that matches the error_code value.
- class lvmopstools.actor.ErrorData(code: 'int', critical: 'bool' = False, description: 'str' = '')[source]¶
Bases: object
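As an illustration of how an enumeration of error codes can be looked up by its integer value, here is a minimal sketch that uses only the standard library. The names SketchErrorData and SketchErrorCodes, the CONNECTION_FAILED member, every numeric code, and the fallback to UNKNOWN are assumptions made for the example; they are not the definitions shipped with lvmopstools.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class SketchErrorData:
    """Hypothetical stand-in that mirrors the ErrorData signature."""

    code: int
    critical: bool = False
    description: str = ""


class SketchErrorCodes(Enum):
    """Hypothetical error codes; names and numbers are illustrative only."""

    UNKNOWN = SketchErrorData(9999, description="Unknown error")
    CONNECTION_FAILED = SketchErrorData(
        1, critical=True, description="Connection failed"
    )

    @classmethod
    def get_error_code(cls, error_code: int) -> "SketchErrorCodes":
        """Return the member whose data carries ``error_code``."""
        for member in cls:
            if member.value.code == error_code:
                return member
        # Assumption: codes without a match fall back to UNKNOWN.
        return cls.UNKNOWN
```

Keeping the code, the criticality flag, and the description together in one value means a caller that receives a bare integer can recover all three with a single lookup.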
- class lvmopstools.actor.LVMActor(*args, check_interval: float = 30.0, restart_after: float | None = 300.0, restart_mode='reload', **kwargs)[source]¶
Bases: AMQPActor
Base class for LVM actors.
- async restart(mode: str | None = None)[source]¶
Restarts the actor, either by exiting the process or by reloading the actor in place.
- Parameters:
mode – How to restart the actor. Possible values are "exit", which will finish the process and let the supervisor restart it (for example a Kubernetes scheduler), and "reload", which will stop and restart the actor without killing the process. If None, defaults to restart_mode.
Ephemeris¶
- lvmopstools.ephemeris.sjd_ephemeris(sjd: int, twilight_horizon: float = -15) DataFrame[source]¶
Returns the ephemeris for a given SJD.
- lvmopstools.ephemeris.is_sun_up(include_twilight: bool = False)[source]¶
Determines whether the Sun is up at the current time.
- lvmopstools.ephemeris.create_schedule(start_sjd: int, end_sjd: int, twilight_horizon: float = -15) DataFrame[source]¶
Creates a schedule for the given time range.
- Parameters:
start_sjd – The initial SJD.
end_sjd – The final SJD of the schedule.
- Returns:
The schedule as a Polars dataframe.
- Return type:
schedule
InfluxDB¶
Notifications¶
- class lvmopstools.notifications.NotificationLevel(*values)[source]¶
Bases: Enum
Allowed notification levels.
- lvmopstools.notifications.send_critical_error_email(message: str, subject: str = 'LVM Critical Alert', recipients: Sequence[str] | None = None, from_address: str | None = None, email_reply_to: str | None = None, host: str | None = None, port: int | None = None, tls: bool | None = None, username: str | None = None, password: str | None = None, show_critical_error_preface: bool = True)[source]¶
Sends a critical error email.
- Parameters:
message – The message to send.
subject – The subject of the email.
recipients – The recipients of the email. A list of email addresses or None to use the default recipients.
from_address – The email address from which the email is sent.
host – The SMTP server host.
port – The SMTP server port.
tls – Whether to use TLS for authentication.
username – The SMTP server username.
password – The SMTP server password.
show_critical_error_preface – Whether to show a preface in the email indicating that this is a critical error notification.
- lvmopstools.notifications.send_email(message: str, subject: str, recipients: Sequence[str], from_address: str, *, html_message: str | None = None, email_reply_to: str | None = None, host: str | None = None, port: int | None = None, tls: bool | None = None, username: str | None = None, password: str | None = None)[source]¶
Sends an email.
- Parameters:
message – The plain text message to send.
subject – The subject of the email.
recipients – The recipients of the email.
from_address – The email address from which the email is sent.
html_message – The HTML message to send.
email_reply_to – The email address to which to reply. Defaults to the sender.
host – The SMTP server host.
port – The SMTP server port.
tls – Whether to use TLS for authentication.
username – The SMTP server username.
password – The SMTP server password.
- async lvmopstools.notifications.send_notification(message: str, level: NotificationLevel | str = NotificationLevel.INFO, slack: bool = True, slack_channels: str | Sequence[str] | None = None, email_on_critical: bool = True, slack_extra_params: dict[str, Any] = {}, email_params: dict[str, Any] = {})[source]¶
Creates a new notification.
- Parameters:
message – The message of the notification. Can be formatted in Markdown.
level – The level of the notification.
slack – Whether to send the notification to Slack.
slack_channels – The Slack channel where to send the notification. If not provided, the default channel is used. Can be set to false to disable sending the Slack notification.
email_on_critical – Whether to send an email if the notification level is CRITICAL.
slack_extra_params – A dictionary of extra parameters to pass to post_message.
email_params – A dictionary of extra parameters to pass to send_critical_error_email.
- Returns:
The message that was sent.
- Return type:
message
PubSub¶
- class lvmopstools.pubsub.Publisher(*args, **kwargs)[source]¶
Bases: BasePubSub
A class to publish messages to a RabbitMQ exchange. A singleton.
- async publish(message: dict, routing_key: str | None = None)[source]¶
Publishes a message to the exchange.
- Parameters:
message – The message to publish. Must be a dictionary that will be encoded as a JSON string.
routing_key – The routing key to use. If not provided, uses the default routing key defined in the configuration.
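Since publish expects a dictionary that is encoded as a JSON string, the round trip can be sketched with the standard library alone. The function names and the choice of UTF-8 are assumptions for illustration; the source does not state how lvmopstools encodes the message body.

```python
import json


def encode_message(message: dict) -> bytes:
    """Serialise a dictionary to JSON bytes (UTF-8 is an assumption)."""
    if not isinstance(message, dict):
        raise TypeError("message must be a dictionary")
    return json.dumps(message).encode("utf-8")


def decode_message(body: bytes) -> dict:
    """Inverse of encode_message, as a subscriber might do on receipt."""
    return json.loads(body.decode("utf-8"))


# Hypothetical payload, for illustration only.
payload = {"event": "example", "value": 1.5}
round_trip = decode_message(encode_message(payload))
```

Anything that json.dumps cannot serialise (for example a datetime) has to be converted to a JSON-compatible type before it goes into the dictionary.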
- class lvmopstools.pubsub.Subscriber(connection_string: str | None = None, exchange_name: str | None = None, callback: Callable[[Message], Awaitable[None]] | None = None, queue_name: str | None = None)[source]¶
Bases: BasePubSub
A class to subscribe to messages from a RabbitMQ exchange.
- async connect(queue_name: str | None = None) Self[source]¶
Connects to the exchange, declares a queue, and binds the callback.
- Parameters:
queue_name – The name of the queue to declare. If not provided, a random name will be generated (recommended).
- async get(decode: Literal[True] = True) Message[source]¶
- async get(decode: Literal[False]) AbstractIncomingMessage
Gets the next message from the queue.
- async iterator(decode: Literal[True] = True) AsyncGenerator[Message, None][source]¶
- async iterator(decode: Literal[False] = False) AsyncGenerator[AbstractIncomingMessage, None]
Iterates over a queue and yields messages.
- async lvmopstools.pubsub.send_event(event: Event | str, payload: dict[str, Any] = {})[source]¶
Convenience function to publish an event to the exchange.
Retrier¶
- class lvmopstools.retrier.Retrier(max_attempts: int = 3, delay: float = 1, use_exponential_backoff: bool = True, exponential_backoff_base: float = 2, max_delay: float = 32.0, on_retry: ~typing.Callable[[Exception], None] | None = None, raise_on_exception_class: list[type[Exception]] = <factory>, timeout: float | None = None)[source]¶
Bases: object
A class that implements a retry mechanism.
The object returned by this class can be used to wrap a function that will be retried max_attempts times if it fails:

    def test_function():
        ...

    retrier = Retrier(max_attempts=5)
    retrier(test_function)()

where the wrapped function can be a coroutine, in which case the wrapped function will also be a coroutine.
Most frequently this class will be used as a decorator:

    @Retrier(max_attempts=4, delay=0.1)
    async def test_function(x, y):
        ...

    await test_function(1, 2)

- Parameters:
max_attempts (int) – The maximum number of attempts before giving up.
delay (float) – The delay between attempts, in seconds.
use_exponential_backoff (bool) – Whether to use exponential backoff for the delay between attempts. If True, the delay will be delay * exponential_backoff_base ** (attempt - 1) + random_ms where random_ms is a random number between 0 and 100 ms used to avoid synchronisation issues.
exponential_backoff_base (float) – The base for the exponential backoff.
max_delay (float) – The maximum delay between attempts when using exponential backoff.
on_retry (Callable[[Exception], None] | None) – A function that will be called when a retry is attempted. The function should accept an exception as its only argument.
raise_on_exception_class (list[type[Exception]]) – A list of exception classes that will cause an exception to be raised without retrying.
timeout (float | None) – If defined, each attempt can take at most this amount of time. If the attempt times out, an asyncio.TimeoutError will be raised. This only works if the wrapped function is a coroutine.
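The backoff formula quoted for use_exponential_backoff can be worked through numerically. The sketch below is a standalone function, not the Retrier implementation; the function name, the jitter switch, and the point at which max_delay is applied (after adding the jitter) are assumptions.

```python
import random


def backoff_delay(
    attempt: int,
    delay: float = 1.0,
    exponential_backoff_base: float = 2.0,
    max_delay: float = 32.0,
    jitter: bool = True,
) -> float:
    """Delay in seconds before retrying after the given 1-based attempt."""
    value = delay * exponential_backoff_base ** (attempt - 1)
    if jitter:
        # Random offset between 0 and 100 ms, as described for random_ms.
        value += random.uniform(0, 0.1)
    # Assumption: the cap is applied after the jitter has been added.
    return min(value, max_delay)


# Deterministic part of the schedule for the default parameters.
schedule = [backoff_delay(n, jitter=False) for n in range(1, 8)]
print(schedule)
```

With the default parameters and the jitter disabled this prints [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 32.0]: the delay doubles on each attempt until it reaches max_delay.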
Slack¶
- async lvmopstools.slack.get_user_id(name: str)[source]¶
Gets the userID of the user whose display name matches name.
- async lvmopstools.slack.get_user_list()[source]¶
Returns the list of users in the workspace.
This function is cached because Slack limits the requests for this route.
- async lvmopstools.slack.post_message(text: str | None = None, blocks: Sequence[dict] | None = None, channel: str | None = None, mentions: list[str] = [], username: str | None = None, icon_url: str | None = None, **kwargs)[source]¶
Posts a message to Slack.
- Parameters:
text – Plain text to send to the Slack channel.
blocks – A list of blocks to send to the Slack channel. These follow the Slack API format for blocks. Incompatible with text.
channel – The channel in the SDSS-V workspace to which the message is sent.
mentions – A list of users to mention in the message.
Socket¶
- class lvmopstools.socket.AsyncSocketHandler(host: str, port: int, timeout: float = 5, retry: bool = True, retrier_params: dict[str, ~typing.Any] = <factory>)[source]¶
Bases: object
Handles a socket connection and disconnection.
Handles secure connection and disconnection to a TCP server and executes a callback. By default Retrier is used to retry the connection if it fails either during the connection phase or during callback execution.
There are two ways to use this class. The first one is to create an instance and call it with a callback function which receives StreamReader and StreamWriter arguments:

    async def callback(reader, writer):
        ...

    handler = AsyncSocketHandler(host, port)
    await handler(callback)

Alternatively, you can subclass AsyncSocketHandler and override the request method:

    class MyHandler(AsyncSocketHandler):
        async def request(self, reader, writer):
            ...

- Parameters:
host (str) – The host that is running the server.
port (int) – The port on which the server is listening.
timeout (float) – The timeout for connection and callback execution.
retry (bool) – Whether to retry the connection/callback if they fail.
retrier_params (dict[str, Any]) – Parameters to pass to the Retrier instance.
- async request(reader: StreamReader, writer: StreamWriter)[source]¶
Sends a request to the socket.
If the handler is not called with a callback function, this method must be overridden in a subclass. It receives the StreamReader and StreamWriter client instances after a connection has been established.
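The connect, run the callback, then disconnect pattern described for AsyncSocketHandler can be sketched with asyncio streams alone. This is a stand-in written for illustration, not the class itself: run_callback, echo, and ping are hypothetical names, there is no retry logic, and the local echo server exists only so that the example can run on its own.

```python
import asyncio


async def run_callback(host: str, port: int, callback, timeout: float = 5.0):
    """Open a connection, run callback(reader, writer), and always close."""
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout
    )
    try:
        return await asyncio.wait_for(callback(reader, writer), timeout)
    finally:
        writer.close()
        await writer.wait_closed()


async def echo(reader, writer):
    """Tiny local echo server used only to exercise the sketch."""
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()


async def ping(reader, writer):
    """Example callback: send one line and return the reply."""
    writer.write(b"ping\n")
    await writer.drain()
    return await reader.readline()


async def main():
    # Port 0 asks the operating system for any free port.
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await run_callback("127.0.0.1", port, ping)


reply = asyncio.run(main())
```

Putting the close in a finally block is the point of the pattern: the connection is released whether the callback returns, raises, or times out.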
Utils¶
- async lvmopstools.utils.get_amqp_client(**kwargs) AMQPClient[source]¶
Returns a CLU AMQP client.
- lvmopstools.utils.get_exception_data(exception: Exception | None, traceback_frame: int = 0)[source]¶
Returns a dictionary with information about an exception.
- async lvmopstools.utils.stop_event_loop(timeout: float | None = 5)[source]¶
Cancels all running tasks and stops the event loop.
- async lvmopstools.utils.with_timeout(coro: Future[T] | Coroutine[Any, Any, T], timeout: float | None, raise_on_timeout: bool = True) T | None[source]¶
Runs a coroutine with a timeout.
- Parameters:
coro – The coroutine to run.
timeout – The timeout in seconds.
raise_on_timeout – If True, raises an asyncio.TimeoutError if the coroutine times out, otherwise returns None.
- Returns:
The result of the coroutine.
- Return type:
result
- Raises:
asyncio.TimeoutError – If the coroutine times out.
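The behaviour described for raise_on_timeout can be reproduced with asyncio.wait_for. The following is a minimal sketch under that assumption, with hypothetical names (with_timeout_sketch, demo); it is not the lvmopstools implementation.

```python
import asyncio


async def with_timeout_sketch(coro, timeout, raise_on_timeout=True):
    """Await coro for at most timeout seconds (None means no limit)."""
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        if raise_on_timeout:
            raise
        # The timeout is swallowed and reported as a None result.
        return None


async def demo():
    # Finishes well within the limit, so its result is returned.
    fast = await with_timeout_sketch(asyncio.sleep(0, result="done"), 1.0)
    # Exceeds the limit; with raise_on_timeout=False this yields None.
    slow = await with_timeout_sketch(
        asyncio.sleep(1), 0.01, raise_on_timeout=False
    )
    return fast, slow


results = asyncio.run(demo())
```

Note that a None result is ambiguous when the coroutine itself may legitimately return None, which is one reason to keep raise_on_timeout=True in that case.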
- lvmopstools.utils.is_notebook() bool[source]¶
Returns True if the code is run inside a Jupyter Notebook.
- class lvmopstools.utils.Trigger(n: int = 1, delay: float = 0)[source]¶
Bases: object
A trigger that can be set and reset and accepts setting thresholds.
This class is essentially just a flag that can take true/false values, but triggering the true value can be delayed by time or number of triggers.
- Parameters:
n – The number of times the instance needs to be set before it is triggered.
delay – The delay in seconds before the instance is triggered. This is counted from the first time the instance is set, and is reset if the instance is reset. If n is greater than 1, both conditions must be met for the instance to be triggered.
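The interplay between n and delay can be made concrete with a small stand-in class. Everything here is an assumption for illustration: the class name SketchTrigger, the method names set, reset, and is_set, and the injectable clock are hypothetical, since the source lists only the constructor parameters.

```python
import time


class SketchTrigger:
    """Hypothetical flag that fires after n sets and delay seconds."""

    def __init__(self, n: int = 1, delay: float = 0, clock=time.monotonic):
        self.n = n
        self.delay = delay
        self._clock = clock
        self._count = 0
        self._first_set = None

    def set(self) -> None:
        """Register one trigger; the first one starts the delay timer."""
        if self._first_set is None:
            self._first_set = self._clock()
        self._count += 1

    def reset(self) -> None:
        """Clear both the counter and the delay timer."""
        self._count = 0
        self._first_set = None

    def is_set(self) -> bool:
        """True only when both the count and the delay conditions hold."""
        if self._first_set is None or self._count < self.n:
            return False
        return self._clock() - self._first_set >= self.delay
```

Passing the clock as an argument is a design choice made for the sketch so that the delay condition can be exercised without sleeping.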
Weather¶
- async lvmopstools.weather.get_weather_data(start_time: str | float, end_time: str | float | None = None, station: Literal['dupont', 'swope', 'magellan'] = 'dupont')[source]¶
Returns a data frame with weather data from the selected station (du Pont by default).
Warning
This function can only be run from inside the LVM network since it uses the LVM API which is not publicly accessible.
- Parameters:
start_time – The start time of the query. Can be a UNIX timestamp or an ISO datetime string.
end_time – The end time of the query. Can be a UNIX timestamp or an ISO datetime string. Defaults to the current time.
station – The station to query. Must be one of 'dupont', 'swope', or 'magellan'.
- Returns:
A data frame with the weather data.
- Return type:
weather_data
- lvmopstools.weather.is_weather_data_safe(data: DataFrame, measurement: str, threshold: float, now: float | datetime | None = None, window: int = 30, rolling_average_window: int = 10, reopen_value: float | None = None)[source]¶
Determines whether an alert should be raised for a given weather measurement.
An alert will be issued if the rolling average value of the measurement (a column in data) over the last window minutes is above the threshold. Once the alert has been raised the value of the measurement must fall below the reopen_value to close the alert (defaults to the same threshold value) in a rolling.
If now is not provided, the current time is used as the reference point. window and rolling_average_window are in minutes.
Examples
>>> is_weather_data_safe(data, "wind_speed_avg", 35)
True
- Returns:
A boolean indicating whether the measurement is safe. True means the measurement is in a valid, safe range.
- Return type:
result
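The threshold and reopen_value logic amounts to a hysteresis check on a rolling average. The sketch below is a deliberately simplified version in plain Python and is not the library's algorithm: it assumes one sample per minute in a list ordered oldest to newest, ignores the window and now arguments, and uses hypothetical function names.

```python
def rolling_mean(values, size):
    """Trailing mean over up to `size` most recent samples at each index."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - size + 1) : i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


def is_safe_sketch(values, threshold, rolling_average_window=10, reopen_value=None):
    """Return True if the hysteresis state ends up safe."""
    reopen = threshold if reopen_value is None else reopen_value
    safe = True
    for average in rolling_mean(values, rolling_average_window):
        if safe and average > threshold:
            safe = False  # the alert is raised
        elif not safe and average < reopen:
            safe = True  # the alert is closed
    return safe
```

Setting reopen_value below threshold keeps the alert from flapping when the average hovers around the threshold: in this sketch, a series that drops from 60 to 32 with threshold=35 is safe again by default, but remains unsafe with reopen_value=30.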
Devices¶
Spectrographs¶
- async lvmopstools.devices.specs.exposure_etr() tuple[float | None, float | None][source]¶
Returns the ETR for the exposure, including readout.
- async lvmopstools.devices.specs.spectrograph_mechanics(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, str | None][source]¶
Returns a dictionary of spectrograph shutter and hartmann door status.
- Parameters:
- async lvmopstools.devices.specs.spectrograph_pressures(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, float | None][source]¶
Returns a dictionary of spectrograph pressures.
- Parameters:
- async lvmopstools.devices.specs.spectrograph_status() SpectrographStatusResponse[source]¶
Returns the status of the spectrographs.
- lvmopstools.devices.specs.spectrograph_temperature_label(camera: str, sensor: str = 'ccd') str[source]¶
Returns the archon label associated with a temperature sensor.
- async lvmopstools.devices.specs.spectrograph_temperatures(spec: Literal['sp1', 'sp2', 'sp3'] | None = None, ignore_errors: bool = True) dict[str, float | None][source]¶
Returns a dictionary of spectrograph temperatures.
- Parameters:
spec – The spectrograph to retrieve the temperatures for. If None, retrieves the temperatures for all spectrographs.
ignore_errors – If True, ignores errors when retrieving the temperatures and replaces the missing values with None. If False, raises an error if any of the temperatures cannot be retrieved.
- Returns:
A dictionary with the temperatures for each camera and sensor, e.g., {'r1_ln2': -184.1, 'r1_ccd': -120.3, ...}.
- Return type:
Ion Pumps¶
- lvmopstools.devices.ion.ALL¶
Flag to toggle all ion pumps.
- async lvmopstools.devices.ion.read_ion_pumps(cameras: list[str] | None = None) dict[str, IonPumpDict][source]¶
Reads the signal and on/off status from an ion pump.
- Parameters:
cameras – A list of cameras to read. If None, reads all cameras.
NPS¶
AGs¶
Switch¶
- lvmopstools.devices.switch.power_cycle_interface(interface: str, verbose: bool = False)[source]¶
Power cycles the PoE for a switch interface.
The environment variables LVM_SWITCH_HOST, LVM_SWITCH_USERNAME, LVM_SWITCH_PASSWORD, and LVM_SWITCH_SECRET must be set when calling this function.
- Parameters:
interface – The switch interface to power cycle, e.g., "2/0/6".
verbose – If True, prints the commands that are being run.
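Because power_cycle_interface depends on four environment variables, a caller may want to verify them up front. The helper below is a hypothetical pre-flight check written for illustration, not part of lvmopstools; only the variable names come from the description above.

```python
import os

REQUIRED_SWITCH_VARS = (
    "LVM_SWITCH_HOST",
    "LVM_SWITCH_USERNAME",
    "LVM_SWITCH_PASSWORD",
    "LVM_SWITCH_SECRET",
)


def missing_switch_variables(environ=None):
    """Return the required variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_SWITCH_VARS if not environ.get(name)]
```

An empty list means that all four variables are present, so the check can be used as: if missing_switch_variables(): raise an error naming the missing entries.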
- lvmopstools.devices.switch.get_ag_poe_port_info(camera: str | None = None) dict[str, str][source]¶
Returns the PoE port information for a given camera.
- Parameters:
camera – The name of the camera. If not provided, returns the information for all cameras.
- Returns:
A dictionary with the PoE port information for the camera.
- Return type:
Thermistors¶
- lvmopstools.devices.thermistors.channel_to_valve(reverse: Literal[False] = False) dict[int, str][source]¶
- lvmopstools.devices.thermistors.channel_to_valve(reverse: Literal[True] = True) dict[str, int]
Returns a mapping of thermistor channels to valve names.
With reverse=True, returns the inverse mapping, from valve name to device channel.
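The relationship between the two return types, dict[int, str] and dict[str, int], is a plain dictionary inversion. A minimal sketch follows, in which the function name, the channel numbers, and the valve names are all hypothetical.

```python
def invert_mapping(mapping: dict[int, str]) -> dict[str, int]:
    """Swap keys and values, refusing to silently drop duplicated values."""
    inverse = {value: key for key, value in mapping.items()}
    if len(inverse) != len(mapping):
        raise ValueError("mapping values are not unique")
    return inverse


# Hypothetical channels and valve names, for illustration only.
example = {0: "valve-a", 1: "valve-b"}
inverted = invert_mapping(example)
```

The uniqueness check matters because two channels mapped to the same valve name would otherwise collapse into a single entry in the inverse.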