API

Actor

exception lvmopstools.actor.CheckError(message: str = '', error_code: ErrorCodesBase | int = ErrorCodes.UNKNOWN)

Bases: Exception

An exception raised when the LVMActor check fails.

class lvmopstools.actor.ActorState(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Flag

Defines the possible states of the actor.

get_state_names()

Returns the state names that are set.
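As a rough illustration of how a Flag-based state can report the names of the flags that are set, the sketch below uses a hypothetical DemoState enumeration. The member names (STARTING, RUNNING, READY, TROUBLESHOOTING) and the method body are assumptions made for this example and are not taken from lvmopstools.

```python
from enum import Flag, auto


class DemoState(Flag):
    """Hypothetical stand-in for ActorState; the member names are assumed."""

    STARTING = auto()
    RUNNING = auto()
    READY = auto()
    TROUBLESHOOTING = auto()

    def get_state_names(self) -> list[str]:
        # Collect the name of every member whose bit is contained in this value.
        return [member.name for member in type(self) if member in self]


# Flags combine with the bitwise OR operator.
state = DemoState.RUNNING | DemoState.READY
names = state.get_state_names()
```

Because the states are flags, an actor can be in several states at once, and the names come back in definition order.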

class lvmopstools.actor.ErrorCodes(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: ErrorCodesBase

class lvmopstools.actor.ErrorCodesBase(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Enumeration of error codes.

classmethod get_error_code(error_code: int)

Returns the ErrorCodes that matches the error_code value.
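A lookup of this kind can be sketched with the standard library alone. The example below is a minimal stand-in, not the lvmopstools implementation: DemoErrorData mirrors the documented ErrorData fields, the member names and numeric codes are invented, and raising ValueError for an unmatched code is an assumption.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class DemoErrorData:
    """Mirrors the documented ErrorData fields: code, critical, description."""

    code: int
    critical: bool = False
    description: str = ""


class DemoErrorCodes(Enum):
    """Hypothetical error codes; names and numbers are invented for the example."""

    SENSOR_OFFLINE = DemoErrorData(1, True, "Sensor is offline")
    SLOW_RESPONSE = DemoErrorData(2, False, "Device responded slowly")
    UNKNOWN = DemoErrorData(9999, True, "Unknown error")

    @classmethod
    def get_error_code(cls, error_code: int) -> "DemoErrorCodes":
        # Linear scan over the members, comparing the integer code.
        for member in cls:
            if member.value.code == error_code:
                return member
        # Assumption: unmatched codes raise; the real method may behave differently.
        raise ValueError(f"Error code {error_code} not found.")


match = DemoErrorCodes.get_error_code(2)
```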

class lvmopstools.actor.ErrorData(code: int, critical: bool = False, description: str = '') -> None

Bases: object

class lvmopstools.actor.LVMActor(*args, check_interval: float = 30.0, restart_after: float | None = 300.0, restart_mode='reload', **kwargs)

Bases: AMQPActor

Base class for LVM actors.

is_ready()

Returns True if the actor is ready.

async restart(mode: str | None = None)

Restarts the actor, either by exiting the process or by reloading the actor in place, depending on mode.

Parameters:

mode (str | None) – How to restart the actor. Possible values are "exit", which ends the process and lets a supervisor (for example a Kubernetes scheduler) restart it, and "reload", which stops and restarts the actor without killing the process. If None, defaults to restart_mode.

async start()

Starts the actor.

async stop()

Stops the actor.

async troubleshoot(error_code: ErrorCodesBase = ErrorCodes.UNKNOWN, exception: Exception | None = None, traceback_frame: int = 0)

Handles troubleshooting.

update_state(state: ActorState, error_data: dict[str, Any] | None = None, command: Command | None = None, internal: bool = True)

Updates the state and broadcasts the change.

lvmopstools.actor.create_error_codes(error_codes: dict[str, tuple | list | ErrorData], name: str = 'ErrorCodes', include_unknown: bool = True) -> Any

Creates an enumeration of error codes.

Return type:

Any
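One way to build an enumeration from a dictionary like this is the functional Enum API. The sketch below is only an illustration under stated assumptions: make_error_codes and DemoErrorData are hypothetical names, upper-casing the keys is a choice made here, and 9999 is an arbitrary placeholder for the UNKNOWN code.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class DemoErrorData:
    """Mirrors the documented ErrorData fields: code, critical, description."""

    code: int
    critical: bool = False
    description: str = ""


def make_error_codes(error_codes, name="ErrorCodes", include_unknown=True):
    """Hypothetical helper that builds an Enum from a name -> data mapping."""
    members = {}
    for key, value in error_codes.items():
        # Accept ready-made DemoErrorData or a (code, critical, description) sequence.
        data = value if isinstance(value, DemoErrorData) else DemoErrorData(*value)
        members[key.upper()] = data  # Upper-casing is an assumption of this sketch.
    if include_unknown and "UNKNOWN" not in members:
        # 9999 is an arbitrary placeholder code chosen for the example.
        members["UNKNOWN"] = DemoErrorData(9999, True, "Unknown error")
    # The functional Enum API accepts a mapping of member names to values.
    return Enum(name, members)


codes = make_error_codes({"sensor_offline": (1, True, "Sensor is offline")})
```

The tuple and list forms are normalised to the same data class, so every member exposes code, critical, and description in the same way.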

Retrier

class lvmopstools.retrier.Retrier(max_attempts: int = 3, delay: float = 0.01, raise_on_max_attempts: bool = True, use_exponential_backoff: bool = True, exponential_backoff_base: float = 2.0, max_delay: float = 32.0) -> None

Bases: object

A class that implements a retry mechanism.

The object returned by this class can be used to wrap a function that will be retried max_attempts times if it fails:

def test_function():
    ...

retrier = Retrier(max_attempts=5)
retrier(test_function)()

The function passed to the retrier can also be a coroutine function, in which case the returned wrapper is a coroutine function as well.

Most frequently this class will be used as a decorator:

@Retrier(max_attempts=4, delay=0.1)
async def test_function(x, y):
    ...

await test_function(1, 2)
Parameters:
  • max_attempts (int) – The maximum number of attempts before giving up.

  • delay (float) – The delay between attempts, in seconds.

  • raise_on_max_attempts (bool) – Whether to raise an exception if the maximum number of attempts is reached. Otherwise the wrapped function will return None after the last attempt.

  • use_exponential_backoff (bool) – Whether to use exponential backoff for the delay between attempts. If True, the delay will be delay + exponential_backoff_base ** attempt + random_ms where random_ms is a random number between 0 and 100 ms used to avoid synchronisation issues.

  • exponential_backoff_base (float) – The base for the exponential backoff.

  • max_delay (float) – The maximum delay between attempts when using exponential backoff.
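To make the roles of max_attempts, delay, and raise_on_max_attempts concrete, here is a simplified stand-in written with the standard library only. MiniRetrier is a hypothetical class for illustration, not the Retrier implementation; it uses a fixed delay and omits exponential backoff entirely.

```python
import asyncio
import functools
import inspect
import time


class MiniRetrier:
    """Hypothetical, simplified stand-in; not lvmopstools.retrier.Retrier."""

    def __init__(self, max_attempts=3, delay=0.01, raise_on_max_attempts=True):
        self.max_attempts = max_attempts
        self.delay = delay
        self.raise_on_max_attempts = raise_on_max_attempts

    def __call__(self, func):
        if inspect.iscoroutinefunction(func):
            # Coroutine functions get a coroutine wrapper that sleeps without blocking.
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                for attempt in range(1, self.max_attempts + 1):
                    try:
                        return await func(*args, **kwargs)
                    except Exception:
                        if attempt == self.max_attempts:
                            if self.raise_on_max_attempts:
                                raise
                            return None
                        await asyncio.sleep(self.delay)

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, self.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == self.max_attempts:
                        if self.raise_on_max_attempts:
                            raise
                        return None
                    time.sleep(self.delay)

        return wrapper


calls = []


@MiniRetrier(max_attempts=4, delay=0.001)
def flaky():
    # Fails on the first two calls, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = flaky()
```

With raise_on_max_attempts=False, the same wrapper returns None after the last failed attempt instead of re-raising.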

calculate_delay(attempt: int) -> float

Calculates the delay for a given attempt.

Return type:

float
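The backoff formula given for use_exponential_backoff can be written out as a small function. This is a sketch of the documented expression only: backoff_delay is a hypothetical name, capping the result at max_delay is inferred from that parameter's description, and returning the plain delay when backoff is disabled is an assumption.

```python
import random


def backoff_delay(
    attempt,
    delay=0.01,
    use_exponential_backoff=True,
    exponential_backoff_base=2.0,
    max_delay=32.0,
):
    """Hypothetical helper following the documented formula."""
    if not use_exponential_backoff:
        # Assumption: without backoff the fixed delay is used as-is.
        return delay
    # Random jitter between 0 and 100 ms, expressed in seconds.
    random_ms = random.uniform(0.0, 0.1)
    value = delay + exponential_backoff_base**attempt + random_ms
    # Assumption: the result is capped at max_delay.
    return min(value, max_delay)


# Delays for the first few attempts; they grow quickly and then hit the cap.
delays = [backoff_delay(attempt) for attempt in range(1, 8)]
```

With the default base of 2.0, the exponential term alone reaches the 32-second cap at the fifth attempt.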

Socket

class lvmopstools.socket.AsyncSocketHandler(host: str, port: int, timeout: float = 5, retry: bool = True, retrier_params: dict[str, Any] = <factory>) -> None

Bases: object

Handles a socket connection and disconnection.

Handles secure connection and disconnection to a TCP server and executes a callback. By default Retrier is used to retry the connection if it fails either during the connection phase or during callback execution.

There are two ways to use this class. The first one is to create an instance and call it with a callback function which receives StreamReader and StreamWriter arguments:

async def callback(reader, writer):
    ...

handler = AsyncSocketHandler(host, port)
await handler(callback)

Alternatively, you can subclass AsyncSocketHandler and override the request method:

class MyHandler(AsyncSocketHandler):
    async def request(self, reader, writer):
        ...
Parameters:
  • host (str) – The host that is running the server.

  • port (int) – The port on which the server is listening.

  • timeout (float) – The timeout for connection and callback execution.

  • retry (bool) – Whether to retry the connection/callback if they fail.

  • retrier_params (dict[str, Any]) – Parameters to pass to the Retrier instance.

async request(reader: StreamReader, writer: StreamWriter)

Sends a request to the socket.

If the handler is not called with a callback function, this method must be overridden in a subclass. It receives the StreamReader and StreamWriter client instances after a connection has been established.
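The connect, callback, and disconnect cycle described for this class can be approximated with plain asyncio streams. The sketch below is not the AsyncSocketHandler implementation: run_with_connection, echo_server, ping, and demo are hypothetical names, retries are left out, and a local echo server stands in for a real device.

```python
import asyncio


async def run_with_connection(host, port, callback, timeout=5.0):
    """Hypothetical helper: connect, run callback(reader, writer), always close."""
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout
    )
    try:
        # The same timeout bounds the callback execution.
        return await asyncio.wait_for(callback(reader, writer), timeout)
    finally:
        writer.close()
        await writer.wait_closed()


async def echo_server(reader, writer):
    # Test server: echo one line back to the client and close.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()


async def ping(reader, writer):
    # Callback with the same (reader, writer) shape as the request method.
    writer.write(b"ping\n")
    await writer.drain()
    return await reader.readline()


async def demo():
    # Port 0 lets the operating system pick a free port.
    server = await asyncio.start_server(echo_server, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await run_with_connection("127.0.0.1", port, ping)


reply = asyncio.run(demo())
```

Closing the writer in a finally block is what keeps the disconnection reliable even when the callback raises or times out.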