"""Trend and level drift detection for ITSI KPI time series.

Detects gradual trends by fitting continuous piecewise-linear-approximation
(PLA) models (degree-1 smoothing splines) per segment, where segments are
partitioned at sudden level-drift points found by the window method.
Segments whose percent change exceeds a user-configured threshold are
flagged for output.

NOTE(review): the file arrived with its original line breaks destroyed;
this version reconstructs conventional formatting without changing behavior.
"""
import time

import numpy as np
from scipy.interpolate import UnivariateSpline

from util.constants import (
    FEATURE_SCALED,
    SMOOTH_FACTOR,
    LOWESS_POINTS,
    THRESHOLD_SMALL_DRIFT,
    THRESHOLD_TEMPORARY_DRIFT_LENGTH,
    THRESHOLD_TEMPORARY_DRIFT,
    NO_DRIFT,
    DRIFTED,
    DRIFT_PART,
    DRIFT_DIRECTION_BOTH,
    DRIFT_DIRECTION_UP,
    DRIFT_DIRECTION_DOWN,
    DEFAULT_THRESHOLD,
    ZERO_LEVEL_TOLERANCE,
)
from algo.level_drift_detection import window_method
from util.data_prepare import snd, percent_change
from util.csc_output import SegmentInfo, TREND_DRIFT_TYPE, LEVEL_DRIFT_TYPE
from logger import get_logger
from six.moves import range, zip

logger = get_logger()


def _smooth_factor(y, weights=None, smooth_factor=SMOOTH_FACTOR):
    """Calculate the smooth factor of the smoothing spline fitting, as:

    The weighted square sum of the input time series' 1st order difference,
    multiplied by a constant factor.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points.
            Defaults to None.
        smooth_factor (float, optional): constant multiplier.
            Defaults to SMOOTH_FACTOR.

    Returns:
        float: smooth factor
    """
    if weights is not None:
        # weights[1:] aligns with np.diff(y), which is one element shorter than y
        return np.dot(np.square(np.diff(y)), weights[1:]) * smooth_factor
    return np.square(np.diff(y)).sum() * smooth_factor


def snd_deviation_and_drift(series, lowess_points=LOWESS_POINTS, ignore_level_drift=False):
    """Calculate the SND deviation and level drift, for trend detection with
    PLA model fitting.

    The SND will be used to derive the weights on data points while fitting
    PLA models, calculated as w = 1/(snd + 1). By definition, SND is
    non-negative and outliers have large snd values. Consequently, outliers
    will have small weights, and weights of other data points will be closer
    to 1.

    The SND is not effective when the underlying assumption of a stable
    baseline is not valid. To handle the case of level drifts, the SND needs
    to be calculated per each segment of the input time series partitioned at
    level drift points.

    Args:
        series (pandas Series): input time series
        lowess_points (int, optional): points of LOWESS smoothing.
            Defaults to LOWESS_POINTS.
        ignore_level_drift (bool, optional): whether or not to ignore level
            drifts. Defaults to False.

    Returns:
        (pandas Series, np array, np array): (
            snd,
            drift points as int indexes,
            test statistic for level drift detection)
    """
    if ignore_level_drift:
        # single segment covering the whole series; no drift test statistics
        return snd(series.values, lowess_points=lowess_points), np.array([len(series)]), None
    drifts, test_stats = window_method(series)
    return snd(series.values, lowess_points=lowess_points, drifts=drifts), drifts, test_stats


def _pla_info(y_est, knots, idx_init=0):
    """Generate summary of the PLA model fitting result on a time series, or
    part of a time series partitioned by level drift points.

    Args:
        y_est (np array): linear approximation of the input partition
        knots (np array of int): knots of piecewise linear approximation
        idx_init (int, optional): the index of the start point, in the case of
            multiple PLA model fittings partitioned by level drifts.
            Defaults to 0.

    Returns:
        list of SegmentInfo: SegmentInfo per each segment
    """
    seg_info = []
    left = 0
    for knot in knots[1:]:
        seg_info.append(SegmentInfo(
            idx_start=left + idx_init,
            idx_end=knot + idx_init,
            val_start=y_est[left],
            val_end=y_est[knot],
        ))
        left = knot
    return seg_info


def fit_pla(y, weights=None, drifts=None, feature_scaled=FEATURE_SCALED):
    """Fit continuous PLA model, as degree-1 smoothing spline fitting.

    Weights on input data points are used for the soft-thresholding method
    that avoids PLA model fitting to outliers in the input time series.

    Level drift info is integrated to fit PLA models separately on segments
    partitioned by level drift points. Otherwise, the PLA model will be over
    stretched by adding multiple knots around drift points, resulting in a
    less accurate description of the overall trend/drift pattern of the input
    time series.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points.
            Defaults to None.
        drifts (list of int, optional): list of level drift points.
            Defaults to None.
        feature_scaled (bool, optional): whether to scale the PLA
            approximation by the value at the beginning of the look-back
            period. Defaults to FEATURE_SCALED.

    Returns:
        (list of np array, list of int np array, list of SegmentInfo, float): (
            list of PLA approximation of segments partitioned by level drifts,
            list of knots of segments partitioned by level drifts,
            list of SegmentInfo of segments partitioned by level drifts or
            PLA knots,
            scaling baseline when feature_scaled else 0)
    """
    if weights is None:
        weights = np.ones_like(y)
    if drifts is None:
        drifts = [0, len(y)]
    y_est = []
    knots = []
    baseline = 0  # stays 0 when not feature_scaled
    for i in range(1, len(drifts)):
        # per each segment partitioned by level drift point, fit a PLA model separately
        left = drifts[i - 1]
        right = drifts[i]
        x = np.arange(right - left)
        y_part = y[left:right]
        s = _smooth_factor(y_part, weights[left:right])
        pla = UnivariateSpline(
            x,
            y_part,
            k=1,  # degree-1 spline for continuous PLA
            s=s,
            w=weights[left:right],
        )
        # apply the fitted PLA model to get the PLA approximation
        y_est_part = pla(x)
        # Use the feature value's PLA approximation at the beginning of the
        # look-back period to scale the feature's PLA approximation
        if feature_scaled and i == 1:
            baseline = y_est_part[0]
        if feature_scaled:
            y_est_part /= baseline
        y_est.append(y_est_part)
        knots.append(pla.get_knots().astype(int))

    seg_info = []
    seg_info += _pla_info(y_est[0], knots[0])
    if len(y_est) > 1:
        y_est_pre = y_est[0][-1]
        for i, (y_est_part, knots_part) in enumerate(zip(y_est[1:], knots[1:])):
            drift = drifts[i + 1]
            # add level drift point to output as zero-length segment
            seg_info.append(SegmentInfo(
                idx_start=drift,
                idx_end=drift,
                val_start=y_est_pre,
                val_end=y_est_part[0],
            ))
            seg_info += _pla_info(y_est_part, knots_part, idx_init=drift)
            y_est_pre = y_est_part[-1]
    return y_est, knots, seg_info, baseline


def is_threshold_exceeded(baseline_list, val, threshold):
    """Compare a value versus a list of baseline values, return the index of
    the last baseline for which the threshold is exceeded, or -1 if none of
    the comparisons exceeds the threshold.

    Args:
        baseline_list (list of float): a list of baseline values
        val (float): the value to compare vs baselines
        threshold (int): the percent-change threshold

    Returns:
        int: the index of the last baseline that the threshold is exceeded
    """
    # scan from the end: the last exceeding baseline is wanted
    for i in range(len(baseline_list) - 1, -1, -1):
        if abs(percent_change(baseline_list[i], val)) >= threshold:
            return i
    return -1


def calc_trend_threshold_time(seg, baseline, threshold):
    """Interpolate the index inside a trend segment at which the percent
    change versus `baseline` first reaches `threshold`.

    Args:
        seg (SegmentInfo): the segment whose change exceeds the threshold
        baseline (float): the baseline value the change is measured against
        threshold (int): the percent-change threshold

    Returns:
        int: interpolated index of the threshold crossing
    """
    if baseline == seg.val_start:
        # one segment drift: interpolate proportionally on the segment's own change
        seg_percent_change = percent_change(seg.val_start, seg.val_end)
        return seg.idx_start + int(seg.length() * threshold / abs(seg_percent_change))
    # accumulated drift
    if seg.val_end > seg.val_start:
        # upward trend
        if abs(baseline) < ZERO_LEVEL_TOLERANCE:
            # near-zero baseline: use an absolute threshold level instead of a
            # relative one — presumably mirrors percent_change()'s zero handling;
            # TODO confirm against util.data_prepare.percent_change
            threshold_val = threshold / 100.0
        else:
            threshold_val = baseline + baseline * threshold / 100
    else:
        # downward trend
        # NOTE(review): unlike the upward branch, there is no near-zero-baseline
        # guard here — confirm this asymmetry is intended
        threshold_val = baseline - baseline * threshold / 100
    return seg.idx_start + int(
        seg.length() * (threshold_val - seg.val_start) / (seg.val_end - seg.val_start))


def calc_and_update_drift_flag(seg_info, threshold):
    """Calculate the drift flag for a list of segments, update the drift flag
    in place.

    Handles both simple drifts and accumulated drifts.

    Args:
        seg_info (list of SegmentInfo): summary of the result of the PLA
            model fitting
        threshold (int): threshold for binary decision of whether KPI drifted
            or not
    """
    baselines = []
    for i, seg in enumerate(seg_info):
        baselines.append(seg.val_start)
        exceeded_idx = is_threshold_exceeded(baselines, seg.val_end, threshold)
        if exceeded_idx >= 0:
            # number of contributing DRIFT_PART segments;
            # zero when exceeded_idx refers to the last item of baselines
            cnt_drift_parts = len(baselines) - 1 - exceeded_idx
            # set the DRIFT_PART flag for contributing segments
            for j in range(i - cnt_drift_parts, i):
                seg_info[j].part_or_whole = DRIFT_PART
            # set the DRIFTED flag for the segment that change exceeds threshold
            seg.part_or_whole = DRIFTED
            if seg.length() == 0:
                # level drift
                seg.idx_threshold = seg.idx_end
            else:
                # calculate the exact threshold_time in the middle of a trend segment
                seg.idx_threshold = calc_trend_threshold_time(
                    seg, baselines[exceeded_idx], threshold)
            # reset the baseline to evaluate the next possible drift
            baselines = []


def filter_output_segment(output_items, threshold_direction, drop_no_drift_segment=True):
    """Filter output segments by threshold direction, and possibly drop
    NO_DRIFT segments.

    Args:
        output_items (list of SegmentInfo): un-filtered output items
        threshold_direction (str): the drift direction to keep
        drop_no_drift_segment (bool, optional): whether or not to drop
            NO_DRIFT segments from output. Defaults to True.

    Returns:
        list of SegmentInfo: filtered output items
    """
    if threshold_direction == DRIFT_DIRECTION_BOTH:
        # filter out NO_DRIFT segments
        if drop_no_drift_segment:
            return [r for r in output_items if r.part_or_whole != NO_DRIFT]
        return output_items

    def _direction_flag(direction):
        # map a direction constant to the sign used in direction_flag
        return 1 if direction == DRIFT_DIRECTION_UP else -1

    n_output_items = len(output_items)
    direction_flag = np.zeros(n_output_items, dtype=int)
    idx_left = -1  # start of the current run of DRIFT_PART segments, -1 if none
    for i in range(n_output_items):
        if output_items[i].part_or_whole == DRIFT_PART and idx_left < 0:
            idx_left = i
        if output_items[i].part_or_whole == DRIFTED:
            # direction is measured from the start of the contributing run
            if idx_left < 0:
                val_start = output_items[i].val_start
            else:
                val_start = output_items[idx_left].val_start
            run_start = idx_left if idx_left >= 0 else i
            if output_items[i].val_end > val_start:
                direction_flag[run_start:i + 1] = _direction_flag(DRIFT_DIRECTION_UP)
            else:
                direction_flag[run_start:i + 1] = _direction_flag(DRIFT_DIRECTION_DOWN)
            idx_left = -1
    wanted = _direction_flag(threshold_direction)
    if drop_no_drift_segment:
        return [item for (item, flag) in zip(output_items, direction_flag)
                if flag == wanted]
    return [item for (item, flag) in zip(output_items, direction_flag)
            if flag == wanted or flag == 0]


def calc_output_segment(seg_info, threshold_direction, series, drop_no_drift_segment=True):
    """Enhance and filter list of SegmentInfo for easy integration on ITSI side.

    Args:
        seg_info (list of SegmentInfo): summary of the PLA model fitting result
        threshold_direction (str): the direction of drifts to be detected
        series (pandas Series): input time series
        drop_no_drift_segment (bool, optional): whether or not to drop
            NO_DRIFT segments from output. Defaults to True.

    Returns:
        list of SegmentInfo: output for ITSI
    """
    for seg in seg_info:
        # translate int indexes back to the series' own time index
        seg.start_time = series.index[seg.idx_start]
        seg.end_time = series.index[seg.idx_end]
        # zero-length segments represent sudden level drifts
        seg.drift_type = LEVEL_DRIFT_TYPE if seg.length() == 0 else TREND_DRIFT_TYPE
        if seg.part_or_whole == DRIFTED:
            seg.threshold_time = series.index[seg.idx_threshold]
    return filter_output_segment(seg_info, threshold_direction, drop_no_drift_segment)


def deviation_to_weight(deviation):
    """Convert the non-negative SND deviation to a fitting weight in (0, 1]."""
    return 1 / (1 + deviation)


def detect_drifts(
        series,
        threshold=DEFAULT_THRESHOLD,
        threshold_direction=DRIFT_DIRECTION_BOTH,
        drop_no_drift_segment=True,
        lowess_points=LOWESS_POINTS,
        feature_scaled=FEATURE_SCALED,
        show_pla=False):
    """Detect drifts for ITSI KPI time series.

    Detects both gradual trend and sudden level drifts.

    Args:
        series (pandas Series): input KPI time series
        threshold (int, optional): threshold for binary decision of whether
            KPI drifted or not. Defaults to DEFAULT_THRESHOLD.
        threshold_direction (str, optional): the direction of drifts to be
            detected. Defaults to DRIFT_DIRECTION_BOTH.
        drop_no_drift_segment (bool, optional): whether or not to drop
            NO_DRIFT segments from output. Defaults to True.
        lowess_points (int, optional): number of points for local LOWESS
            smoothing needed for SND calculation. Defaults to LOWESS_POINTS.
        feature_scaled (bool, optional): whether or not to scale feature to
            1.0. With scaled feature, the following percent change
            calculation for values close to zero can be improved to avoid
            extraordinarily large values. Defaults to FEATURE_SCALED.
        show_pla (bool, optional): whether or not to show viz for results of
            the PLA model fitting. Defaults to False.

    Returns:
        (list of SegmentInfo, float, tuple): (
            output for ITSI integration,
            time spent in seconds,
            optional output tuple for showing pla results)
    """
    time_0 = time.time()
    # at the beginning, calculate the snd deviation and detect level drifts from scratch
    deviation, drifts, test_stats = snd_deviation_and_drift(series, lowess_points=lowess_points)
    weights = deviation_to_weight(deviation)
    drifts = np.concatenate(([0], drifts))
    y = series.values
    y_est, knots, seg_info, baseline = fit_pla(
        y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)

    # Post-processing to heuristically simplify results in order to report the
    # overall pattern: leverage percent-change at level drift points calculated
    # from PLA model fitting, to measure changes at level drift points, beyond
    # the test statistic from the window method
    if len(drifts) > 2:
        # remove drifts of small change
        drift_changes = np.array(
            [seg.percent_drift() for seg in seg_info if seg.length() == 0])
        has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT
        not_short_drift_pair = np.full(len(drifts) - 2, True)
        if len(drifts) > 3 and has_big_change.sum() > 0:
            # remove short temporary drift pairs that essentially go back to same level
            for i in range(1, len(drifts) - 2):
                if drifts[i + 1] - drifts[i] > THRESHOLD_TEMPORARY_DRIFT_LENGTH:
                    continue
                val_before_drift_pair = y_est[i - 1][-1]
                val_after_drift_pair = y_est[i + 1][0]
                if abs(percent_change(val_before_drift_pair,
                                      val_after_drift_pair)) <= THRESHOLD_TEMPORARY_DRIFT:
                    # remove both points in the pair
                    not_short_drift_pair[i - 1] = False
                    not_short_drift_pair[i] = False
                else:
                    # remove the one with smaller test_stat
                    # (test_stats is presumably indexed by series position — confirm
                    # against algo.level_drift_detection.window_method)
                    if test_stats[drifts[i]] > test_stats[drifts[i + 1]]:
                        not_short_drift_pair[i] = False
                    else:
                        not_short_drift_pair[i - 1] = False
        # only keep drift points that really have big level drifts
        drifts_ = np.concatenate((
            [0],
            drifts[1:-1][np.where(
                [a and b for a, b in zip(has_big_change, not_short_drift_pair)])],
            [drifts[-1]],
        ))
        if len(drifts_) < len(drifts):
            drifts = drifts_
            # update weights corresponding to updated level drift partitioning
            weights = deviation_to_weight(
                snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
            # fit the PLA model again with updated level drift points
            y_est, knots, seg_info, baseline = fit_pla(
                y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)
            if len(drifts) > 2:
                # after the refit, remove drifts of small change once more
                drift_changes = np.array(
                    [seg.percent_drift() for seg in seg_info if seg.length() == 0])
                has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT
                drifts_ = np.concatenate((
                    [0],
                    drifts[1:-1][np.where(has_big_change)],
                    [drifts[-1]],
                ))
                if len(drifts_) < len(drifts):
                    drifts = drifts_
                    # update weights corresponding to updated level drift partitioning
                    weights = deviation_to_weight(
                        snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
                    # fit the PLA model a 3rd time with updated level drift points
                    y_est, knots, seg_info, baseline = fit_pla(
                        y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)

    # set the drift flag, based on user configured threshold
    calc_and_update_drift_flag(seg_info, threshold)
    output_segments = calc_output_segment(
        seg_info, threshold_direction, series, drop_no_drift_segment)
    time_spent = time.time() - time_0
    return output_segments, time_spent, \
        (y_est, knots, drifts, seg_info, baseline) if show_pla else None