from __future__ import print_function

import numpy as np

from util.constants import LEVEL_DRIFT_THRESHOLD, WINDOW_LENGTH
from util.data_prepare import snd


def _merge_close_drifts(drifts, test_stats, close):
    """Merge close drift points into one

    Consecutive drift points whose gap is at most `close` form a block.
    For each block, keep the single point with the largest value of
    test_stats (the earliest one wins on ties).

    Args:
        drifts (list or array of int): drift points, expected in ascending
            order (as produced by np.where)
        test_stats (np array): test statistic, indexed by drift point
        close (int): maximum distance between two consecutive drift points
            for them to be treated as belonging to the same block

    Returns:
        np array of int: drift points with close drifts merged into one
    """
    merged = []
    best_idx = None  # strongest drift of the current block; None = no block yet
    prev_drift = None
    for drift in drifts:
        if prev_drift is None or drift - prev_drift > close:
            # Gap is too large (or this is the first point): close the
            # current block and start a new one at this drift.
            if best_idx is not None:
                merged.append(best_idx)
            best_idx = drift
        elif test_stats[drift] > test_stats[best_idx]:
            best_idx = drift
        prev_drift = drift
    if best_idx is not None:
        merged.append(best_idx)
    # Explicit integer dtype: an empty list would otherwise become a float64
    # array and turn the caller's concatenated drift indices into floats.
    return np.array(merged, dtype=int)


def window_method(signal, threshold=LEVEL_DRIFT_THRESHOLD, win_len=WINDOW_LENGTH):
    """Level drift detection on KPI time series using window method

    The test statistic is SND normalized and smoothed version of
    the effect size of mean difference between left window and right window
    at a time point

    Args:
        signal (pandas Series): input KPI time series
        threshold (float, optional): threshold on the test statistic.
            Defaults to LEVEL_DRIFT_THRESHOLD.
        win_len (int, optional): the length of the window for level drift
            detection. Defaults to WINDOW_LENGTH.

    Returns:
        (np array, np array): (level drift points, test statistic).
        The drift points always end with len(signal). When the input is
        not longer than 2 * win_len, detection is skipped and the result is
        the plain list [len(signal)] together with None.
    """
    in_len = len(signal)
    if in_len <= 2 * win_len:
        print(f'input too short ({in_len}), skip level drift detection.')
        return [in_len], None

    rolling = signal.rolling(win_len)
    means = rolling.mean()
    stds = rolling.std()
    # Rolling values at t describe the window ending at t (left window).
    # Shifting by -win_len aligns t with the window that starts right after
    # it (right window).
    means_shifted = means.shift(-win_len)
    stds_shifted = stds.shift(-win_len)

    test_stats = abs(means - means_shifted) / (stds + stds_shifted)
    # Positions without two complete windows are NaN; fill them with the
    # minimum so they do not stand out after normalization.
    # NOTE(review): the meaning of snd's second argument (3) is not visible
    # here -- confirm against util.data_prepare.
    test_stats = snd(test_stats.fillna(test_stats.min()).to_numpy(), 3)

    rslt = np.where(test_stats > threshold)[0]
    if len(rslt) > 1:
        # consolidate continuous drifts into one
        rslt = _merge_close_drifts(rslt, test_stats, 1)

    if len(rslt) > 0 and rslt[-1] == in_len - 1:
        # Remove the last level drift point if it equals in_len - 1,
        # b/c it will cause numerical error downstream in the SciPy spline
        # API by passing an array of length 1.
        rslt = rslt[:-1]

    return np.concatenate((rslt, [in_len])), test_stats