
Anomaly handlers

cesnet_tszoo.utils.anomaly_handler

AnomalyHandler

Bases: ABC

Base class for anomaly handlers, used for handling anomalies in the data.

This class is the foundation for custom anomaly handlers. To implement one, subclass it and implement the fit and transform_anomalies methods.

Example:

import numpy as np

from cesnet_tszoo.utils.anomaly_handler import AnomalyHandler

class InterquartileRange(AnomalyHandler):

    def __init__(self):
        self.lower_bound = None
        self.upper_bound = None
        self.iqr = None

    def fit(self, data: np.ndarray) -> None:
        q25, q75 = np.percentile(data, [25, 75], axis=0)
        self.iqr = q75 - q25

        self.lower_bound = q25 - 1.5 * self.iqr
        self.upper_bound = q75 + 1.5 * self.iqr

    def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
        mask_lower_outliers = data < self.lower_bound
        mask_upper_outliers = data > self.upper_bound

        data[mask_lower_outliers] = np.take(self.lower_bound, np.where(mask_lower_outliers)[1])
        data[mask_upper_outliers] = np.take(self.upper_bound, np.where(mask_upper_outliers)[1])
Source code in cesnet_tszoo\utils\anomaly_handler.py
class AnomalyHandler(ABC):
    """
    Base class for anomaly handlers, used for handling anomalies in the data.

    This class is the foundation for custom anomaly handlers. To implement one, subclass it and implement the `fit` and `transform_anomalies` methods.

    Example:

        import numpy as np

        class InterquartileRange(AnomalyHandler):

            def __init__(self):
                self.lower_bound = None
                self.upper_bound = None
                self.iqr = None

            def fit(self, data: np.ndarray) -> None:
                q25, q75 = np.percentile(data, [25, 75], axis=0)
                self.iqr = q75 - q25

                self.lower_bound = q25 - 1.5 * self.iqr
                self.upper_bound = q75 + 1.5 * self.iqr

            def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
                mask_lower_outliers = data < self.lower_bound
                mask_upper_outliers = data > self.upper_bound

                data[mask_lower_outliers] = np.take(self.lower_bound, np.where(mask_lower_outliers)[1])
                data[mask_upper_outliers] = np.take(self.upper_bound, np.where(mask_upper_outliers)[1])

    """

    @abstractmethod
    def fit(self, data: np.ndarray) -> None:
        """
        Sets the anomaly handler values for a given time series part.

        This method must be implemented.

        Parameters:
            data: A numpy array representing data for a single time series with shape `(times, features)` excluding any identifiers.  
        """
        ...

    @abstractmethod
    def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
        """
        Transforms anomalies in the input data for a given time series part.

        This method must be implemented.
        Anomaly transformation is done in-place.

        Parameters:
            data: A numpy array representing data for a single time series with shape `(times, features)` excluding any identifiers.            
        """
        ...

fit abstractmethod

fit(data: ndarray) -> None

Computes and stores the anomaly handler's values from the given part of a time series.

This method must be implemented.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ndarray | A numpy array representing data for a single time series with shape (times, features) excluding any identifiers. | required |

Source code in cesnet_tszoo\utils\anomaly_handler.py
@abstractmethod
def fit(self, data: np.ndarray) -> None:
    """
    Sets the anomaly handler values for a given time series part.

    This method must be implemented.

    Parameters:
        data: A numpy array representing data for a single time series with shape `(times, features)` excluding any identifiers.  
    """
    ...

transform_anomalies abstractmethod

transform_anomalies(data: ndarray) -> np.ndarray

Transforms anomalies in the input data for a given time series part.

This method must be implemented. Anomaly transformation is done in-place.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ndarray | A numpy array representing data for a single time series with shape (times, features) excluding any identifiers. | required |

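
The in-place contract described above can be illustrated with a small sketch. It assumes a local stand-in for `AnomalyHandler` (so the snippet runs without `cesnet_tszoo` installed) and a hypothetical `FittedRangeClip` handler that is not part of the library. Returning the same array from `transform_anomalies` is also an assumption made here to match the `-> np.ndarray` annotation.

```python
from abc import ABC, abstractmethod

import numpy as np


class AnomalyHandler(ABC):
    """Local stand-in mirroring the documented interface (assumption for this sketch)."""

    @abstractmethod
    def fit(self, data: np.ndarray) -> None: ...

    @abstractmethod
    def transform_anomalies(self, data: np.ndarray) -> np.ndarray: ...


class FittedRangeClip(AnomalyHandler):
    """Hypothetical handler: clips values to the per-feature range seen in fit()."""

    def __init__(self):
        self.low = None
        self.high = None

    def fit(self, data: np.ndarray) -> None:
        # Per-feature minimum and maximum over the time axis, ignoring NaNs.
        self.low = np.nanmin(data, axis=0)
        self.high = np.nanmax(data, axis=0)

    def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
        # out=data writes the clipped values back into the caller's array.
        np.clip(data, self.low, self.high, out=data)
        return data


handler = FittedRangeClip()
handler.fit(np.array([[0.0, 10.0], [1.0, 20.0]]))  # shape (times, features)

new_part = np.array([[5.0, 15.0], [-3.0, 25.0]])
original = new_part.copy()  # keep a copy if the untouched values are still needed
returned = handler.transform_anomalies(new_part)
```

Because the array passed in is modified, callers that still need the original values should copy it before calling `transform_anomalies`, as done with `original` here.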
Source code in cesnet_tszoo\utils\anomaly_handler.py
@abstractmethod
def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
    """
    Transforms anomalies in the input data for a given time series part.

    This method must be implemented.
    Anomaly transformation is done in-place.

    Parameters:
        data: A numpy array representing data for a single time series with shape `(times, features)` excluding any identifiers.            
    """
    ...

ZScore

Bases: AnomalyHandler

Fitting computes the per-feature mean and standard deviation of the values used for fitting. These are then used to compute a z-score for every value; values whose absolute z-score exceeds the threshold (3) are clipped to the value at the threshold, that is, mean ± 3 × standard deviation.

Corresponds to enum AnomalyHandlerType.Z_SCORE or literal z-score.
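
As a worked illustration of the clipping rule, here is a minimal NumPy-only sketch on made-up toy data. The function name `clip_by_z_score` and the data are hypothetical; the arithmetic follows the description above rather than calling the library.

```python
import numpy as np

# Hypothetical toy series with shape (times, features):
# feature 0 has a single spike, feature 1 is constant (std == 0).
train = np.array([[1.0, 5.0]] * 19 + [[100.0, 5.0]])

# "Fit": per-feature statistics over the time axis, ignoring NaNs.
mean = np.nanmean(train, axis=0)
std = np.nanstd(train, axis=0)
threshold = 3


def clip_by_z_score(data: np.ndarray) -> np.ndarray:
    """Clip values with |z| > threshold to mean +/- threshold * std, in place."""
    centered = data - mean
    # Features with zero standard deviation get a z-score of 0 (never outliers).
    z = np.divide(centered, std, out=np.zeros_like(centered, dtype=float), where=std != 0)
    outliers = np.abs(z) > threshold
    clipped = mean + np.sign(z) * threshold * std
    data[outliers] = clipped[outliers]
    return data


result = clip_by_z_score(train.copy())
```

In this data the spike has a z-score of about 4.36, so it is pulled down to `mean + 3 * std` for feature 0, while all other values are left unchanged.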

Source code in cesnet_tszoo\utils\anomaly_handler.py
class ZScore(AnomalyHandler):
    """
    Fitting computes the per-feature mean and standard deviation of the values used for fitting.
    These are then used to compute a z-score for every value; values whose absolute z-score exceeds the threshold (3) are clipped to the value at the threshold, that is, mean ± 3 × standard deviation.

    Corresponds to enum [`AnomalyHandlerType.Z_SCORE`][cesnet_tszoo.utils.enums.AnomalyHandlerType] or literal `z-score`.
    """

    def __init__(self):
        self.mean = None
        self.std = None
        self.threshold = 3

    def fit(self, data: np.ndarray) -> None:
        warnings.filterwarnings("ignore")
        self.mean = np.nanmean(data, axis=0)
        self.std = np.nanstd(data, axis=0)
        warnings.filterwarnings("always")

    def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
        temp = data - self.mean
        z_score = np.divide(temp, self.std, out=np.zeros_like(temp, dtype=float), where=self.std != 0)
        mask_outliers = np.abs(z_score) > self.threshold

        clipped_values = self.mean + np.sign(z_score) * self.threshold * self.std

        data[mask_outliers] = clipped_values[mask_outliers]

InterquartileRange

Bases: AnomalyHandler

Fitting computes the 25th and 75th percentiles (Q25, Q75) of the values used for fitting. From these, the interquartile range (IQR = Q75 − Q25), the lower bound (Q25 − 1.5 × IQR) and the upper bound (Q75 + 1.5 × IQR) are derived. Values below the lower bound or above the upper bound are treated as anomalies and clipped to the nearest bound.

Corresponds to enum AnomalyHandlerType.INTERQUARTILE_RANGE or literal interquartile_range.
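
The bounds and the clipping step can be checked by hand on a small example. The sketch below uses made-up data and swaps in `np.clip` with `out=` for the mask-based assignment; for data without NaNs this should give the same result, but it is an illustration, not the library's code.

```python
import numpy as np

# Hypothetical toy series with shape (times, features); the last row holds outliers.
train = np.array([
    [1.0, 10.0],
    [2.0, 11.0],
    [3.0, 12.0],
    [4.0, 13.0],
    [50.0, -40.0],
])

# "Fit": per-feature quartiles, IQR and bounds.
q25, q75 = np.percentile(train, [25, 75], axis=0)
iqr = q75 - q25
lower = q25 - 1.5 * iqr
upper = q75 + 1.5 * iqr

# "Transform": clip each feature to its own bounds, writing into the same array.
data = train.copy()
np.clip(data, lower, upper, out=data)
```

With NumPy's default linear interpolation, feature 0 has Q25 = 2 and Q75 = 4, giving bounds of -1 and 7, so the value 50 is clipped to 7. Feature 1 has Q25 = 10 and Q75 = 12, giving bounds of 7 and 15, so -40 is clipped to 7.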

Source code in cesnet_tszoo\utils\anomaly_handler.py
class InterquartileRange(AnomalyHandler):
    """
    Fitting calculates 25th percentile, 75th percentile from the values used for fitting. From those percentiles the interquartile range, lower and upper bound will be calculated.
    Lower and upper bounds will then be used for detecting anomalies (values below lower bound or above upper bound). Anomalies will then be clipped to closest bound.

    Corresponds to enum [`AnomalyHandlerType.INTERQUARTILE_RANGE`][cesnet_tszoo.utils.enums.AnomalyHandlerType] or literal `interquartile_range`.
    """

    def __init__(self):
        self.lower_bound = None
        self.upper_bound = None
        self.iqr = None

    def fit(self, data: np.ndarray) -> None:
        q25, q75 = np.percentile(data, [25, 75], axis=0)
        self.iqr = q75 - q25

        self.lower_bound = q25 - 1.5 * self.iqr
        self.upper_bound = q75 + 1.5 * self.iqr

    def transform_anomalies(self, data: np.ndarray) -> np.ndarray:
        mask_lower_outliers = data < self.lower_bound
        mask_upper_outliers = data > self.upper_bound

        data[mask_lower_outliers] = np.take(self.lower_bound, np.where(mask_lower_outliers)[1])
        data[mask_upper_outliers] = np.take(self.upper_bound, np.where(mask_upper_outliers)[1])