Source code for lvmopstools.influxdb

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-11-19
# @Filename: influxdb.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

#

from __future__ import annotations

import json
import os
import warnings

import polars


try:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
    from influxdb_client.client.warnings import MissingPivotFunction
except ImportError:
    InfluxDBClientAsync = None
    MissingPivotFunction = None


__all__ = ["query_influxdb"]


async def query_influxdb(
    url: str,
    query: str,
    token: str | None = None,
    org: str | None = None,
) -> polars.DataFrame:
    """Runs a query in InfluxDB and returns a Polars dataframe.

    Parameters
    ----------
    url
        The URL of the InfluxDB server, passed unchanged to the client.
    query
        The query to execute through the client's query API.
    token
        The authentication token. If not provided (or empty), the value of
        ``$INFLUXDB_V2_TOKEN`` is used.
    org
        The InfluxDB organisation. If not provided (or empty), the value of
        ``$INFLUXDB_V2_ORG`` is used.

    Returns
    -------
    df
        A dataframe built from the JSON-serialised query results. If the
        results contain rows and a ``_time`` column, that column is cast to
        ``Datetime("ms")``; otherwise the dataframe is returned as built.

    Raises
    ------
    ImportError
        If ``influxdb-client`` could not be imported.
    ValueError
        If the token or the organisation cannot be determined from the
        arguments or the environment.
    RuntimeError
        If the server does not respond to the client's ping.

    """

    if not InfluxDBClientAsync or not MissingPivotFunction:
        raise ImportError(
            "influxdb-client is not installed. Install the influxdb or all extras."
        )

    # NOTE: this installs a process-wide warnings filter; it is not scoped to
    # this call.
    warnings.simplefilter("ignore", MissingPivotFunction)  # noqa: F821

    # Use a truthiness check so that an empty environment variable is rejected
    # the same way an empty argument already is, instead of being handed to
    # the client as a blank credential.
    token = token or os.environ.get("INFLUXDB_V2_TOKEN")
    if not token:
        raise ValueError("$INFLUXDB_V2_TOKEN not defined.")

    org = org or os.environ.get("INFLUXDB_V2_ORG")
    if not org:
        raise ValueError("$INFLUXDB_V2_ORG not defined.")

    async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
        if not (await client.ping()):
            raise RuntimeError("InfluxDB client failed to connect.")

        api = client.query_api()
        query_results = await api.query(query)

    # The results are fully materialised at this point, so the dataframe can
    # be built after the client has been closed.
    df = polars.DataFrame(json.loads(query_results.to_json()))

    # Not every query returns a ``_time`` column; only cast it when present so
    # that such queries return their data instead of raising.
    if len(df) > 0 and "_time" in df.columns:
        df = df.with_columns(polars.col("_time").cast(polars.Datetime("ms")))

    return df