#!/usr/bin/env python
# -*- coding: utf8 -*-
from requests.auth import HTTPBasicAuth
import validators
import requests
from felicien.felits import FeliTS
REQUESTS_OPTIONS = {
"auth": (tuple, HTTPBasicAuth),
"verify": (bool, str),
"cert": (tuple, str),
"headers": (dict),
}
TSDB_TYPES = ["prometheus", "victoriametrics"]
API_TSDB_HEALTH = {
"prometheus": "api/v1/status/tsdb",
"victoriametrics": "api/v1/status/tsdb",
}
API_TSDB_QUERY = {
"prometheus": "api/v1/query",
"victoriametrics": "prometheus/api/v1/query",
}
API_TSDB_DELETE = {
"prometheus": "api/v1/admin/tsdb/delete_series",
"victoriametrics": "api/v1/admin/tsdb/delete_series",
}
API_TSDB_IMPORT = {
"prometheus": "api/v1/write",
"victoriametrics": "api/v1/import",
}
[docs]
class FeliConnector:
"""A connector to a TSDB such as Prometheus or VictoriaMetrics
This is an abstraction class to communicate with a Prometheus API'
compatible TSDB, via HTTP.
see official documentation:
https://prometheus.io/docs/prometheus/latest/querying/api/
https://docs.victoriametrics.com/url-examples/
Attributes:
- base_url: the URL of the TSDB
- tsdb: the type of TSDB: prometheus, victoriametrics
- _options: the requests options, such as auth or (m)TLS
"""
[docs]
def __init__(
self, url: str = None, tsdb: str = "prometheus", options: dict = {}
) -> None:
"""Initializes the instance based the access to a TSDB
Args:
url (str, optional): Base URL of the TSDB.
Defaults to None.
tsdb (str, optional): Type of TSDB. Can be "prometheus",
"victoriametrics".
Defaults to "prometheus"
options (dict, optional): Options to use with requests, such as
auth, TLS verification, client certificate, headers...
Raises:
ValueError if the url is not valid
ValueError if tsdb is not a valid type
ConnectionError if TSDB API is not reachable
KeyError if a option passed to requests is invalid
"""
if url is None or not validators.url(url):
raise ValueError("'url' is not a valid URL")
self.base_url = url
if tsdb not in TSDB_TYPES:
raise ValueError(
f"invalid tsdb type. Options: {', '.join(TSDB_TYPES)}"
)
self.tsdb = tsdb
self._options = dict()
if options:
for k, v in options.items():
if k in REQUESTS_OPTIONS.keys() and isinstance(
v, REQUESTS_OPTIONS[k] # type: ignore
):
self._options[k] = v
else:
raise KeyError(f"{k} is an invalid option for requests")
# test connection to the TSDB
r = requests.get(
f"{self.base_url}/{API_TSDB_HEALTH[self.tsdb]}",
**self._options, # type: ignore
timeout=60,
)
if not (r.status_code == 200 and r.json().get("status") == "success"):
raise ConnectionError(f"unable to reach TSDB API: {r.status_code}")
def __repr__(self) -> str:
return f"FeliConnector([{self.tsdb}]{{{self.base_url}}})"
[docs]
def get_timeserie(self, metric: str = None) -> FeliTS:
"""Retrieve a timeserie from the TSDB
Args:
metric (str, optional): metric of the timeserie, expressed in
PromQL. Defaults to None.
Returns:
FeliTS: Timeserie of the metric
Raises:
ConnectionError if query status code is not HTTP/200
OverflowError if the result is more than one timeserie
ValueError if the result is empty
TypeError if the result is not a vector (range or instant)
"""
payload = {"query": metric}
r = requests.get(
f"{self.base_url}/{API_TSDB_QUERY[self.tsdb]}",
**self._options, # type: ignore
params=payload,
timeout=60,
)
if not (r.status_code == 200 and r.json().get("status") == "success"):
raise ConnectionError(f"unable to get timeserie: {r.status_code}")
# connection is successful
if len(r.json().get("data", {}).get("result")) > 1:
raise OverflowError("query returns more than one timeserie")
elif len(r.json().get("data", {}).get("result")) == 0:
raise ValueError("query returned no timeserie")
# result length is correct
if r.json().get("data", {}).get("resultType") in [
"matrix",
"vector",
]:
return FeliTS(
from_prom=r.json().get("data", {}).get("result", [])[0]
)
else:
raise TypeError(
"query result can be only range vectors or instant vectors"
)
[docs]
def delete_timeserie(self, metric: str) -> bool:
"""Delete a timeserie in the TSDB
Args:
metric (str): metric of the timeserie, expressed in PromQL.
Returns:
bool: wether the deletion is completed
Raises:
ConnectionError if query status code is not HTTP/204
"""
payload = {"match[]": metric}
method = "GET" if self.tsdb == "victoriametrics" else "POST"
r = requests.request(
method=method,
url=f"{self.base_url}/{API_TSDB_DELETE[self.tsdb]}",
params=payload,
**self._options, # type: ignore
timeout=60,
)
if not r.status_code == 204:
raise ConnectionError(
f"unable to delete timeserie: {r.status_code}"
)
return True
[docs]
def import_timeserie(self, ts: FeliTS) -> bool:
"""Import a timeserie into the TSDB
Args:
ts (FeliTS): the timeserie.
Returns:
bool: wether the import is completed
Raises:
ConnectionError if query status code is not HTTP/200
"""
ts_format = ""
if self.tsdb == "prometheus":
ts_format = "s"
elif self.tsdb == "victoriametrics":
ts_format = "ms"
payload = ts.as_prometheus(timestamp_format=ts_format)
r = requests.post(
url=f"{self.base_url}/{API_TSDB_IMPORT[self.tsdb]}",
data=payload,
**self._options, # type: ignore
timeout=60,
)
if r.status_code not in [200, 204]:
raise ConnectionError(
f"unable to import timeserie: {r.status_code}"
)
return True