Source code for felicien.felits

#!/usr/bin/env python
# -*- coding: utf8 -*-

import pandas as pd
import itertools
import datetime as dt
import matplotlib.pyplot as plt
import json


# A time delta must be observed on more than 10% of the points of a series to
# be eligible as the frequency of that series
MIN_FREQUENCY_REPRESENTATION = 0.1


class FeliTS:
    """A timeseries of a Prometheus metric.

    This is a metric representation as returned by the Prometheus API. It
    includes the metric definition, and the data as a pandas Series.

    see official documentation:
    https://prometheus.io/docs/prometheus/latest/querying/api/#expression-query-result-formats

    Attributes:
        - name: A string with the name of the metric
        - labels: A dict of labels of the metric
        - data: A pandas.Series with the timeseries
    """

    def __init__(
        self,
        from_prom: dict = None,
        name: str = None,
        labels: dict = None,
        values: pd.Series = None,
    ) -> None:
        """Initializes the instance from Prometheus API data or raw data.

        Args:
            from_prom (dict, optional): Query result data from Prometheus
                API. When given, the other arguments are ignored.
            name (str, optional): Name of the metric
            labels (dict, optional): Labels of the metric. Defaults to a
                new empty dict for each instance.
            values (pandas Series, optional): Values of the timeseries, with
                their timestamps as the index

        Raises:
            AttributeError if the metric has no __name__
            AttributeError if the metric has no value (or values)
            ValueError if the value list is empty
            ValueError if an item of the value list hasn't the right
                format: [timestamp, metric_value]
            AttributeError if neither an output from Prometheus API nor raw
                data are passed to the constructor
        """
        if from_prom is not None:
            # Construct from Prometheus API output
            metric = from_prom.get("metric", {})
            self.name = metric.get("__name__", "")
            if self.name == "":
                raise AttributeError("missing metric __name__")
            # every entry of "metric" except the name is a label
            self.labels = {
                label: value
                for label, value in metric.items()
                if label != "__name__"
            }

            _data = list()
            _index = list()
            if from_prom.get("value") is not None:
                # instant vector: a single [timestamp, value] pair
                sample = from_prom.get("value")
                if not isinstance(sample, list) or len(sample) != 2:
                    raise ValueError(
                        f"metric value is not right {from_prom.get('value')}. "
                        f"It should be an array with a timestamp and a value."
                    )
                _index.append(sample[0])
                _data.append(float(sample[1]))
            elif from_prom.get("values") is not None:
                # range vector: a list of [timestamp, value] pairs
                for value in from_prom.get("values", []):
                    if not isinstance(value, list) or len(value) != 2:
                        raise ValueError(
                            f"metric value is not as expected {value}"
                        )
                    _index.append(value[0])
                    _data.append(float(value[1]))
            else:
                raise AttributeError("missing metric value(s)")

            if len(_data) == 0 or len(_index) == 0:
                raise ValueError("metric value can't be empty")
            # timestamps are interpreted as seconds since the Unix epoch
            self.data = pd.Series(
                data=_data, index=pd.to_datetime(_index, unit="s")
            )
        elif name is not None:
            # Construct from raw data
            self.name = name
            if self.name == "":
                raise AttributeError("missing metric __name__")
            # a fresh dict per instance: a shared mutable default would leak
            # labels from one timeseries to another
            self.labels = {} if labels is None else labels
            if values is None:
                raise AttributeError("missing metric value(s)")
            if values.size == 0:
                raise ValueError("metric value can't be empty")
            self.data = values
        else:
            # Construct from nothing
            raise AttributeError("missing data to construct FeliTS")

    def __repr__(self) -> str:
        return (
            f"FeliTS({self.name}{{{self.labels_string}}}, "
            f"{self.size} datapoints)"
        )

    @property
    def labels_string(self) -> str:
        """The labels as a string, as Prometheus would represent it

        Returns:
            str: all the labels as a key-value list, separated with commas
        """
        if not hasattr(self, "labels") or self.labels is None:
            return ""
        return ", ".join(f'{k}="{v}"' for k, v in self.labels.items())

    @property
    def frequency(self) -> dt.timedelta:
        """Expose the main frequency in the timeseries.

        Timestamps are rounded down to the second before the time deltas
        between consecutive points are counted. When several deltas exist,
        the smallest deltas seen on too few points (not more than
        MIN_FREQUENCY_REPRESENTATION of the series size) are skipped, and
        the most frequent of the remaining deltas is returned.

        Returns:
            dt.timedelta: the duration between 2 data points, or an empty
                dt.timedelta() when no frequency can be determined (such as
                a single value series)
        """
        if self.data.size <= 1:
            return dt.timedelta()

        # count every distinct delta between two consecutive points; the
        # leading NaT produced by diff() is dropped by value_counts()
        frequencies = (
            self.data.index.floor("s").to_series().diff().value_counts()
        )

        if frequencies.size == 1:
            # only one frequency: return it
            return frequencies.idxmax()

        # multiple frequencies: walk the deltas from the smallest one and
        # stop at the first one occurring on more than 10% of the points
        by_delta = frequencies.sort_index()
        for i in range(by_delta.size):
            share = by_delta.iloc[i] / self.data.size
            if share > MIN_FREQUENCY_REPRESENTATION:
                return by_delta.iloc[i:].idxmax()

        # all other cases seem wrong
        return dt.timedelta()

    @property
    def size(self) -> int:
        """Expose the size of the timeseries

        Returns:
            int: Size of the timeseries
        """
        return self.data.size

    def as_prometheus(self, timestamp_format: str = "s") -> str:
        """Object representation as a JSON string

        Args:
            timestamp_format (str, optional): Format of the timestamps.
                Could be 's' for seconds or 'ms' for milliseconds.
                Defaults to 's'.

        Returns:
            str: JSON dump of the dictionary built by `as_dict`

        Raises:
            ValueError if the timestamp_format argument is not "s" or "ms"
        """
        return json.dumps(self.as_dict(timestamp_format=timestamp_format))

    def as_dict(self, timestamp_format: str = "s") -> dict:
        """Object representation as a Dictionary

        Args:
            timestamp_format (str, optional): Format of the timestamps.
                Could be 's' for seconds or 'ms' for milliseconds.
                Defaults to 's'.

        Returns:
            dict: representation of the object, with the keys "metric"
                (name and labels), "values" and "timestamps"

        Raises:
            ValueError if the timestamp_format argument is not "s" or "ms"
        """
        if timestamp_format == "s":
            timestamp_factor = 1
        elif timestamp_format == "ms":
            timestamp_factor = 1000
        else:
            raise ValueError("timestamp_format should be 's' or 'ms'")

        metric = {"__name__": self.name}
        for k, v in self.labels.items():
            metric[k] = v

        # elapsed time since the Unix epoch, scaled to the requested unit
        since_epoch = pd.Series(data=self.data.index) - dt.datetime(
            1970, 1, 1
        )
        timestamps = (
            (since_epoch.dt.total_seconds() * timestamp_factor)
            .astype(int)
            .to_list()
        )

        return {
            "metric": metric,
            "values": self.data.to_list(),
            "timestamps": timestamps,
        }

    def as_dataframe(self, name: str = "") -> pd.DataFrame:
        """self.data representation as a pandas.DataFrame

        Args:
            name (str, optional): Name of the column for the series in the
                resulting DataFrame. Defaults to self.name.

        Returns:
            pandas.DataFrame: The self.data, as a pandas.DataFrame
        """
        colname = self.name if name == "" else name
        return self.data.to_frame(name=colname)

    def trim_by_date(
        self,
        boundary: dt.datetime = None,
        keep: str = "right",
        inplace: bool = False,
    ) -> pd.Series:
        """Trim the timeseries by date

        Args:
            boundary (dt.datetime, optional): Limit on which to trim the
                timeseries. The point located exactly on the boundary is
                kept. If None, the whole timeseries is kept.
                Defaults to None.
            keep (str, optional): Which part of the timeseries to keep.
                Defaults to right.
            inplace (bool, optional): Control if the trim should be applied
                to the current object, or just get the trimmed timeseries.
                Defaults to False.

        Returns:
            pd.Series: The trimmed timeseries

        Raises:
            ValueError if the keep argument is not "left" or "right"
        """
        if keep not in ("left", "right"):
            raise ValueError("keep should be 'left' or 'right'")
        if boundary is None:
            # nothing to compare against: keep everything
            return self.data

        # mark the points to drop, the rest is kept
        if keep == "right":
            dropped = self.data.index < boundary
        else:
            dropped = self.data.index > boundary

        trimmed = self.data[~(dropped)]
        if inplace:
            self.data = trimmed
        return trimmed

    def trim_by_size(
        self,
        boundary: int = 0,
        keep: str = "right",
        inplace: bool = False,
    ) -> pd.Series:
        """Trim the timeseries by size

        Args:
            boundary (int, optional): Size of the trimmed timeseries.
                If the boundary is 0 or larger than the timeseries, keep
                the whole timeseries. Defaults to 0.
            keep (str, optional): Which part of the timeseries to keep.
                Defaults to right.
            inplace (bool, optional): Control if the trim should be applied
                to the current object, or just get the trimmed timeseries.
                Defaults to False.

        Returns:
            pd.Series: The trimmed timeseries

        Raises:
            ValueError if the keep argument is not "left" or "right" while
                a trim is actually needed
        """
        if boundary > self.data.size or boundary == 0:
            return self.data

        if keep == "right":
            trimmed = self.data.iloc[-boundary:]
        elif keep == "left":
            trimmed = self.data.iloc[:boundary]
        else:
            raise ValueError("keep should be 'left' or 'right'")

        if inplace:
            self.data = trimmed
        return trimmed

    def plot(self) -> None:
        """Plot a timeseries"""
        plt.plot(
            self.data.index.to_list(),
            self.data.to_list(),
            label=self.name,
            linestyle="solid",
        )
        plt.xticks(rotation=60, fontsize=10)
        plt.title(f"{self.name}{{{self.labels_string}}}")
        plt.show()
        plt.close()

    def normalize(self, inplace: bool = False) -> pd.Series:
        """Normalize the timeseries, filling missing points with NaN values
        but making sure the index respects the frequency.

        Important: points not aligned on the frequency will be dropped,
        while missing points on frequency will be added as NaN

        Args:
            inplace (bool, optional): Control if the normalization should be
                applied to the current object, or just get the normalized
                timeseries. Defaults to False.

        Returns:
            pd.Series: The normalized timeseries

        Raises:
            ValueError if the timeseries has no frequency (such as single
                point timeseries)
        """
        # the property is not cached: compute it only once
        frequency = self.frequency
        if frequency == dt.timedelta():
            raise ValueError("Can't determine frequency")

        normalized = self.data.asfreq(freq=frequency)
        if inplace:
            self.data = normalized
        return normalized

    def longest_continuous_segment(self, position: str = "last") -> pd.Series:
        """Extract the longest continuous segment, which is the longest
        segment of the timeseries respecting the frequency, without any
        missing point

        DEPRECATED: use `continuous_segment` function instead

        Args:
            position (str, optional): Which longest segment to return, in
                case multiple segments exist. Defaults to last.

        Returns:
            pd.Series: the extract of the timeseries

        Raises:
            ValueError if the position argument is not "first" or "last"
        """
        # the position argument is validated by continuous_segment
        return self.continuous_segment(position=position, longest=True)

    def continuous_segment(
        self, position: str = "last", longest: bool = False
    ) -> pd.Series:
        """Extract continuous segment from the timeseries respecting the
        frequency, without any missing point.

        This segment can be the longest, or any length, but it has to be
        either the first segment or the last from the timeseries.

        Args:
            position (str, optional): Which segment to return, in case
                multiple segments exist. Defaults to last.
            longest (bool, optional): Should the segment be the longest
                Defaults to False.

        Returns:
            pd.Series: the extract of the timeseries. It is empty when no
                pair of consecutive points is separated by the frequency.

        Raises:
            ValueError if the position argument is not "first" or "last"
        """
        # validate arguments
        if position not in ["first", "last"]:
            raise ValueError(
                f"position argument must be 'first' or 'last', not {position}"
            )

        # case of series with maximum 2 points
        if self.size <= 2:
            return self.data

        # for every point, tell if it is separated from the previous point
        # by exactly 1 frequency (always False for the very first point)
        frequency = self.frequency
        deltas = self.data.index.to_series().diff()
        segments = (deltas == frequency).to_numpy().tolist()

        # collapse the flags into runs of (matches the frequency, length)
        runs = [
            (is_freq, len(list(group)))
            for is_freq, group in itertools.groupby(segments)
        ]

        # size of the longest run matching the frequency
        longest_length = max(
            length if is_freq else 0 for is_freq, length in runs
        )

        # finding the position of the wanted segment (first or last)
        start_position = 0
        end_position = 0
        cursor = 0
        for is_freq, length in runs:
            if is_freq and (not longest or length == longest_length):
                start_position = cursor
                end_position = cursor + length
                # in case we only want the first matching segment
                if position == "first":
                    break
            # move current cursor along the series
            cursor += length

        if end_position == 0:
            # no run matches the frequency: explicit empty extract
            return self.data.iloc[0:0]

        # a run of N matching deltas covers N + 1 points: include the point
        # right before the first matching delta
        return self.data.iloc[start_position - 1 : end_position]  # noqa E203