API
Actor
- exception lvmopstools.actor.CheckError(message: str = '', error_code: ErrorCodesBase | int = ErrorCodes.UNKNOWN)
Bases: Exception
An exception raised when the LVMActor check fails.
- class lvmopstools.actor.ActorState(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Flag
Defines the possible states of the actor.
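Because ActorState derives from Flag, several states can be held at once and combined with bitwise operators. The sketch below uses a stand-in class, DemoActorState, whose member names are hypothetical and are not taken from lvmopstools; it only illustrates how Flag values behave.

```python
from enum import Flag, auto


class DemoActorState(Flag):
    """Hypothetical stand-in; member names are illustrative only."""

    READY = auto()
    CHECKING = auto()
    TROUBLESHOOTING = auto()


# Flags combine with bitwise OR and are tested with `in`.
state = DemoActorState.READY | DemoActorState.CHECKING
is_checking = DemoActorState.CHECKING in state

# Clear one flag while leaving the others untouched.
state_after_check = state & ~DemoActorState.CHECKING
```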
- class lvmopstools.actor.ErrorCodes(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: ErrorCodesBase
- class lvmopstools.actor.ErrorCodesBase(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Enum
Enumeration of error codes.
- classmethod get_error_code(error_code: int)
Returns the ErrorCodes member that matches the error_code value.
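A lookup of this kind can be sketched with a plain Enum. The example assumes that each member's value is an ErrorData-like record and that unmatched codes fall back to UNKNOWN; both are assumptions, and DemoErrorData, DemoErrorCodes, and the code values are hypothetical stand-ins rather than the library's definitions.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class DemoErrorData:
    """Stand-in record mirroring the ErrorData signature."""

    code: int
    critical: bool = False
    description: str = ""


class DemoErrorCodes(Enum):
    """Hypothetical error codes; the values are made up for illustration."""

    UNKNOWN = DemoErrorData(9999, critical=True, description="Unknown error")
    SENSOR_TIMEOUT = DemoErrorData(1, description="Sensor did not respond")

    @classmethod
    def get_error_code(cls, error_code: int) -> "DemoErrorCodes":
        """Return the member whose record carries ``error_code``."""
        for member in cls:
            if member.value.code == error_code:
                return member
        # Assumption: codes with no match map to UNKNOWN.
        return cls.UNKNOWN


matched = DemoErrorCodes.get_error_code(1)
fallback = DemoErrorCodes.get_error_code(42)
```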
- class lvmopstools.actor.ErrorData(code: int, critical: bool = False, description: str = '') -> None
Bases: object
- class lvmopstools.actor.LVMActor(*args, check_interval: float = 30.0, restart_after: float | None = 300.0, restart_mode='reload', **kwargs)
Bases: AMQPActor
Base class for LVM actors.
- async restart(mode: str | None = None)
Restarts the actor, either by exiting the process or by reloading it in place, depending on mode.
- Parameters:
mode (str | None) – How to restart the actor. Possible values are "exit", which finishes the process and lets a supervisor (for example a Kubernetes scheduler) restart it, and "reload", which stops and restarts the actor without killing the process. If None, defaults to restart_mode.
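The fallback from mode to restart_mode can be sketched as a small helper. resolve_restart_mode is a hypothetical function written for illustration, not part of LVMActor, and rejecting unknown values with ValueError is an assumption about how such input might be handled.

```python
from typing import Optional

VALID_MODES = ("exit", "reload")


def resolve_restart_mode(mode: Optional[str], restart_mode: str = "reload") -> str:
    """Pick the effective restart mode (hypothetical helper)."""
    # None means "use the default configured on the actor".
    chosen = restart_mode if mode is None else mode
    if chosen not in VALID_MODES:
        # Assumption: unknown modes are treated as an error.
        raise ValueError(f"Invalid restart mode: {chosen!r}")
    return chosen


default_choice = resolve_restart_mode(None)
explicit_choice = resolve_restart_mode("exit")
```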
Retrier
- class lvmopstools.retrier.Retrier(max_attempts: int = 3, delay: float = 0.01, raise_on_max_attempts: bool = True, use_exponential_backoff: bool = True, exponential_backoff_base: float = 2.0, max_delay: float = 32.0) -> None
Bases: object
A class that implements a retry mechanism.
The object returned by this class can be used to wrap a function that will be retried up to max_attempts times if it fails:
    from lvmopstools.retrier import Retrier

    def test_function():
        ...

    retrier = Retrier(max_attempts=5)
    retrier(test_function)()
The wrapped function can be a coroutine, in which case the wrapper is also a coroutine.
Most frequently this class will be used as a decorator:
    @Retrier(max_attempts=4, delay=0.1)
    async def test_function(x, y):
        ...

    await test_function(1, 2)
- Parameters:
max_attempts (int) – The maximum number of attempts before giving up.
delay (float) – The delay between attempts, in seconds.
raise_on_max_attempts (bool) – Whether to raise an exception if the maximum number of attempts is reached. Otherwise the wrapped function will return None after the last attempt.
use_exponential_backoff (bool) – Whether to use exponential backoff for the delay between attempts. If True, the delay will be delay + exponential_backoff_base ** attempt + random_ms, where random_ms is a random number between 0 and 100 ms used to avoid synchronisation issues.
exponential_backoff_base (float) – The base for the exponential backoff.
max_delay (float) – The maximum delay between attempts when using exponential backoff.
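The delay formula above can be worked through with a short function. This is a sketch under stated assumptions, not the library's code: backoff_delay is a hypothetical name, attempts are assumed to be counted from 1, and max_delay is assumed to act as a simple cap on the computed value.

```python
import random


def backoff_delay(
    attempt: int,
    delay: float = 0.01,
    exponential_backoff_base: float = 2.0,
    max_delay: float = 32.0,
    use_exponential_backoff: bool = True,
) -> float:
    """Seconds to wait before the next attempt (hypothetical helper)."""
    if not use_exponential_backoff:
        return delay
    # Jitter of 0 to 100 ms, expressed in seconds.
    random_ms = random.uniform(0.0, 0.1)
    raw = delay + exponential_backoff_base**attempt + random_ms
    # Assumption: max_delay caps the computed delay.
    return min(raw, max_delay)


# With the defaults the wait roughly doubles per attempt until capped.
schedule = [backoff_delay(attempt) for attempt in range(1, 8)]
```

With the default parameters the first wait is a little over 2 s, and from the fifth attempt onwards the value is pinned at max_delay.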
Socket
- class lvmopstools.socket.AsyncSocketHandler(host: str, port: int, timeout: float = 5, retry: bool = True, retrier_params: dict[str, typing.Any] = <factory>) -> None
Bases: object
Handles a socket connection and disconnection.
Handles secure connection to and disconnection from a TCP server and executes a callback. By default, Retrier is used to retry the connection if it fails either during the connection phase or during callback execution.
There are two ways to use this class. The first is to create an instance and call it with a callback function that receives StreamReader and StreamWriter arguments:
    async def callback(reader, writer):
        ...

    handler = AsyncSocketHandler(host, port)
    await handler(callback)
Alternatively, you can subclass AsyncSocketHandler and override the request method:
    class MyHandler(AsyncSocketHandler):
        async def request(self, reader, writer):
            ...
- Parameters:
host (str) – The host that is running the server.
port (int) – The port on which the server is listening.
timeout (float) – The timeout for connection and callback execution.
retry (bool) – Whether to retry the connection or callback if they fail.
retrier_params (dict[str, Any]) – Parameters to pass to the Retrier instance.
- async request(reader: StreamReader, writer: StreamWriter)
Sends a request to the socket.
If the handler is not called with a callback function, this method must be overridden in a subclass. It receives the StreamReader and StreamWriter client instances after a connection has been established.
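The callback pattern (connect, pass the reader and writer to a coroutine, then close) can be illustrated with the standard library alone. The sketch below does not use AsyncSocketHandler; run_once, echo_server, and the "status" message are hypothetical names chosen for the example, and the toy server exists only so the snippet is self-contained.

```python
import asyncio


async def echo_server(reader, writer):
    """Toy server: echo one line back to the client, then close."""
    line = await reader.readline()
    writer.write(line)
    await writer.drain()
    writer.close()


async def callback(reader, writer):
    """Callback taking the (reader, writer) pair of an open connection."""
    writer.write(b"status\n")
    await writer.drain()
    return (await reader.readline()).decode().strip()


async def run_once(host, port, timeout=5.0):
    """Connect, run the callback under a timeout, and always close."""
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout
    )
    try:
        return await asyncio.wait_for(callback(reader, writer), timeout)
    finally:
        writer.close()
        await writer.wait_closed()


async def main():
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(echo_server, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await run_once("127.0.0.1", port)


reply = asyncio.run(main())
```

Closing the writer in a finally block mirrors the connection and disconnection handling described above: the socket is released whether the callback succeeds, fails, or times out.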