import numpy as np
import time
from scipy.interpolate import UnivariateSpline
from util.constants import (
FEATURE_SCALED,
SMOOTH_FACTOR,
LOWESS_POINTS,
THRESHOLD_SMALL_DRIFT,
THRESHOLD_TEMPORARY_DRIFT_LENGTH,
THRESHOLD_TEMPORARY_DRIFT,
NO_DRIFT,
DRIFTED,
DRIFT_PART,
DRIFT_DIRECTION_BOTH,
DRIFT_DIRECTION_UP,
DRIFT_DIRECTION_DOWN,
DEFAULT_THRESHOLD,
ZERO_LEVEL_TOLERANCE
)
from algo.level_drift_detection import window_method
from util.data_prepare import snd, percent_change
from util.csc_output import SegmentInfo, TREND_DRIFT_TYPE, LEVEL_DRIFT_TYPE
from logger import get_logger
from six.moves import range, zip
logger = get_logger()
def _smooth_factor(y, weights=None, smooth_factor=SMOOTH_FACTOR):
    """Derive the smoothing factor for the smoothing-spline fit.

    Computed as the (optionally weighted) sum of squared first-order
    differences of the input series, scaled by a constant multiplier.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points. Defaults to None.
        smooth_factor (float, optional): constant multiplier. Defaults to SMOOTH_FACTOR.

    Returns:
        float: smooth factor
    """
    diff_squared = np.square(np.diff(y))
    if weights is None:
        return diff_squared.sum() * smooth_factor
    # diff has one fewer element than y, so align weights by dropping the first
    return np.dot(diff_squared, weights[1:]) * smooth_factor
def snd_deviation_and_drift(series, lowess_points=LOWESS_POINTS, ignore_level_drift=False):
    """Calculate the SND deviation and level drifts, for trend detection with PLA model fitting.

    The SND is used to derive per-point weights for PLA fitting, as w = 1/(snd + 1).
    By definition SND is non-negative and outliers have large SND values, so outliers
    receive small weights while the remaining points' weights stay close to 1.

    SND is not effective when its underlying assumption of a stable baseline is
    violated. To handle level drifts, the SND is computed per segment of the input
    series, partitioned at the detected level drift points.

    Args:
        series (pandas Series): input time series
        lowess_points (int, optional): points of LOWESS smoothing. Defaults to LOWESS_POINTS.
        ignore_level_drift (bool, optional): whether or not to ignore level drifts. Defaults to False.

    Returns:
        (pandas Series, np array, np array): ( snd,
                                               drift points as int indexes,
                                               test statistic for level drift detection)
    """
    if not ignore_level_drift:
        drifts, test_stats = window_method(series)
        deviation = snd(series.values, lowess_points=lowess_points, drifts=drifts)
        return deviation, drifts, test_stats
    # no drift detection requested: treat the whole series as a single segment
    deviation = snd(series.values, lowess_points=lowess_points)
    return deviation, np.array([len(series)]), None
def _pla_info(y_est, knots, idx_init=0):
    """Summarize the PLA fitting result of one time series segment.

    Builds one SegmentInfo per linear piece, where pieces are delimited by
    consecutive knots (the first piece always starts at local index 0).

    Args:
        y_est (list of np array): linear approximation of the input, partitioned by knots
        knots (np array of int): knots of the piecewise linear approximation
        idx_init (int, optional): global index of the segment's start point, for the
            case of multiple PLA fits partitioned by level drifts. Defaults to 0.

    Returns:
        list of SegmentInfo: SegmentInfo per each piece
    """
    # piece boundaries: (0, knots[1]), (knots[1], knots[2]), ...
    bounds = [0] + list(knots[1:])
    return [
        SegmentInfo(
            idx_start=bounds[k] + idx_init,
            idx_end=bounds[k + 1] + idx_init,
            val_start=y_est[bounds[k]],
            val_end=y_est[bounds[k + 1]],
        )
        for k in range(len(bounds) - 1)
    ]
def fit_pla(y, weights=None, drifts=None, feature_scaled=FEATURE_SCALED):
    """Fit a continuous PLA model, as degree-1 smoothing spline fitting.

    Weights on input data points are used for the soft-thresholding method that
    avoids fitting the PLA model to outliers in the input time series.
    Level drift info is integrated to fit PLA models separately on segments
    partitioned by level drift points. Otherwise, the PLA model would be over
    stretched by adding multiple knots around drift points, resulting in a less
    accurate description of the overall trend/drift pattern of the input series.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points. Defaults to None.
        drifts (list of int, optional): list of level drift points. Defaults to None.
        feature_scaled (bool, optional): whether to scale the approximation by its
            value at the start of the look-back period. Defaults to FEATURE_SCALED.

    Returns:
        (list of np array, list of int np array, list of SegmentInfo, float): (
            list of PLA approximation of segments partitioned by level drifts,
            list of knots of segments partitioned by level drifts,
            list of SegmentInfo of segments partitioned by level drifts or PLA knots,
            scaling baseline, 0 when feature_scaled is False
        )
    """
    if weights is None:
        weights = np.ones_like(y)
    if drifts is None:
        drifts = [0, len(y)]
    y_est = []
    knots = []
    baseline = 0  # stays 0 when feature_scaled is False; also guards against NameError
    for i in range(1, len(drifts)):
        # per each segment partitioned by level drift point, fit a PLA model separately
        left = drifts[i - 1]
        right = drifts[i]
        x = np.arange(right - left)
        y_part = y[left:right]
        s = _smooth_factor(y_part, weights[left:right])
        pla = UnivariateSpline(
            x,
            y_part,
            k=1,  # degree-1 spline for continuous PLA
            s=s,
            w=weights[left:right]
        )
        # apply the fitted PLA model to get the PLA approximation
        y_est_part = pla(x)
        # Use the feature value's PLA approximation at the beginning of the look-back
        # period to scale the feature's PLA approximation.
        # NOTE(review): a zero baseline would divide by zero below — presumably ruled
        # out upstream via ZERO_LEVEL_TOLERANCE; confirm against callers.
        if feature_scaled and i == 1:
            baseline = y_est_part[0]
        if feature_scaled:
            y_est_part /= baseline
        y_est.append(y_est_part)
        knots.append(pla.get_knots().astype(int))
    # summarize the first fitted segment, then stitch in the remaining segments,
    # inserting a zero-length SegmentInfo at every level drift point
    seg_info = []
    seg_info += _pla_info(y_est[0], knots[0])
    if len(y_est) > 1:
        y_est_pre = y_est[0][-1]
        for i, (y_est_part, knots_part) in enumerate(zip(y_est[1:], knots[1:])):
            drift = drifts[i + 1]
            # add level drift point to output as zero-length segment
            seg_info.append(SegmentInfo(
                idx_start=drift,
                idx_end=drift,
                val_start=y_est_pre,
                val_end=y_est_part[0]
            ))
            seg_info += _pla_info(y_est_part, knots_part, idx_init=drift)
            y_est_pre = y_est_part[-1]
    return y_est, knots, seg_info, baseline
def is_threshold_exceeded(baseline_list, val, threshold):
    """Compare a value against a list of baseline values.

    Scans the baselines from last to first and returns the index of the last
    baseline for which the absolute percent change to `val` meets the threshold.

    Args:
        baseline_list (list of float): a list of baseline values
        val (float): the value to compare vs baselines
        threshold (int): the percent-change threshold

    Returns:
        int: index of the last baseline whose threshold is exceeded, or -1 if none
    """
    for idx in range(len(baseline_list) - 1, -1, -1):
        if abs(percent_change(baseline_list[idx], val)) >= threshold:
            return idx
    return -1
def calc_trend_threshold_time(seg, baseline, threshold):
    """Interpolate the index within a trend segment where the percent change
    from `baseline` first reaches `threshold`.

    Args:
        seg (SegmentInfo): the segment flagged as DRIFTED
        baseline (float): baseline value the percent change is measured against
        threshold (int): percent-change threshold

    Returns:
        int: interpolated index of the threshold crossing
    """
    if baseline == seg.val_start: # one segment drift
        # the whole change happens inside this segment: position the crossing
        # proportionally to threshold / total percent change of the segment
        # NOTE(review): assumes seg_percent_change is non-zero here — a DRIFTED
        # segment should have exceeded the threshold already; confirm upstream.
        seg_percent_change = percent_change(seg.val_start, seg.val_end)
        return seg.idx_start + int(seg.length() * threshold / abs(seg_percent_change))
    # accumulated drift
    if seg.val_end > seg.val_start : # upward trend
        if abs(baseline) < ZERO_LEVEL_TOLERANCE:
            # near-zero baseline: use the threshold itself as the absolute target level
            threshold_val = threshold / 100.0
        else:
            threshold_val = baseline + baseline * threshold / 100
    else: # downward trend
        # NOTE(review): the downward branch has no ZERO_LEVEL_TOLERANCE guard,
        # unlike the upward branch — confirm whether that asymmetry is intended.
        threshold_val = baseline - baseline * threshold / 100
    # linear interpolation of the crossing point between segment endpoints
    return seg.idx_start + int(seg.length() * (threshold_val - seg.val_start) / (seg.val_end - seg.val_start))
def calc_and_update_drift_flag(seg_info, threshold):
    """Calculate the drift flag for list of segments, update the drift flag in place.

    Handles both simple drifts (one segment exceeds the threshold on its own) and
    accumulated drifts (several consecutive segments jointly exceed it). Baselines
    of segments seen since the last detected drift are accumulated, and reset once
    a drift is flagged so the next drift is evaluated independently.

    Args:
        seg_info (list of SegmentInfo): summary of the result of the PLA model fitting
        threshold (int): threshold for binary decision of whether KPI drifted or not
    """
    baselines = []
    for i, seg in enumerate(seg_info):
        # candidate baselines: the start values of all segments since the last drift
        baselines.append(seg.val_start)
        exceeded_idx = is_threshold_exceeded(baselines, seg.val_end, threshold)
        if exceeded_idx >= 0:
            # number of contributing DRIFT_PART segments
            # should be zero if exceeded_idx refer to the last item of baselines
            cnt_drift_parts = len(baselines) - 1 - exceeded_idx
            # set the DRIFT_PART flag for contributing segments
            for j in range(i - cnt_drift_parts, i):
                seg_info[j].part_or_whole = DRIFT_PART
            # set the DRIFTED flag for the segment that change exceeds threshold
            seg.part_or_whole = DRIFTED
            if seg.length() == 0: # level drift
                # instantaneous change: threshold is crossed at the drift point itself
                seg.idx_threshold = seg.idx_end
            else: # calculate the exact threshold_time in the middle of a trend segment
                seg.idx_threshold = calc_trend_threshold_time(seg, baselines[exceeded_idx], threshold)
            # reset the baseline to evaluate the next possible drift
            baselines = []
def filter_output_segment(output_items, threshold_direction, drop_no_drift_segment=True):
    """Filter output segments by threshold direction, and possibly drop NO_DRIFT segments.

    For a directional threshold (up or down), each DRIFTED segment and its run of
    preceding DRIFT_PART segments are assigned a direction flag based on the value
    change across the whole run; only runs matching the requested direction are kept.

    Args:
        output_items (list of SegmentInfo): un-filtered output items
        threshold_direction (str): the requested drift direction
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Defaults to True.

    Returns:
        list of SegmentInfo: filtered output items
    """
    if threshold_direction == DRIFT_DIRECTION_BOTH:
        # filter out NO_DRIFT segments
        return [r for r in output_items if r.part_or_whole != NO_DRIFT] if drop_no_drift_segment else output_items
    def _direction_flag(direction):
        # encode up as +1 and down as -1; 0 (unset) marks NO_DRIFT segments
        if direction == DRIFT_DIRECTION_UP:
            return 1
        else:
            return -1
    n_outputItems = len(output_items)
    direction_flag = np.zeros(n_outputItems, dtype=int)
    idx_left = -1
    for i in range(n_outputItems):
        # remember where the current run of DRIFT_PART segments starts
        if output_items[i].part_or_whole == DRIFT_PART and idx_left < 0:
            idx_left = i
        if output_items[i].part_or_whole == DRIFTED:
            # direction is judged from the start of the whole run (or of this
            # segment alone, for a simple drift) to the end of the DRIFTED segment
            if idx_left < 0:
                val_start = output_items[i].val_start
            else:
                val_start = output_items[idx_left].val_start
            if output_items[i].val_end > val_start:
                direction_flag[idx_left if idx_left >= 0 else i : i + 1] = _direction_flag(DRIFT_DIRECTION_UP)
            else:
                direction_flag[idx_left if idx_left >= 0 else i : i + 1] = _direction_flag(DRIFT_DIRECTION_DOWN)
            idx_left = -1
    if drop_no_drift_segment:
        return [item for (item, flag) in zip(output_items, direction_flag) if flag == _direction_flag(threshold_direction)]
    else:
        # also keep unflagged (NO_DRIFT) segments
        return [item for (item, flag) in zip(output_items, direction_flag) if flag == _direction_flag(threshold_direction) or flag == 0]
def calc_output_segment(seg_info, threshold_direction, series, drop_no_drift_segment=True):
    """Enhance and filter list of SegmentInfo for easy integration on ITSI side.

    Annotates each segment with real timestamps from the input series index and a
    drift type, then filters by the requested drift direction.

    Args:
        seg_info (list of SegmentInfo): summary of the PLA model fitting result
        threshold_direction (str): the direction of drifts to be detected
        series (pandas Series): input time series
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Default to True

    Returns:
        list of SegmentInfo: output for ITSI
    """
    timestamps = series.index
    for segment in seg_info:
        segment.start_time = timestamps[segment.idx_start]
        segment.end_time = timestamps[segment.idx_end]
        # zero-length segments represent sudden level drifts; all others are trends
        if segment.length() == 0:
            segment.drift_type = LEVEL_DRIFT_TYPE
        else:
            segment.drift_type = TREND_DRIFT_TYPE
        if segment.part_or_whole == DRIFTED:
            segment.threshold_time = timestamps[segment.idx_threshold]
    return filter_output_segment(seg_info, threshold_direction, drop_no_drift_segment)
def deviation_to_weight(deviation):
    """Convert non-negative SND deviations into PLA fitting weights.

    Outliers (large deviation) get weights near 0; normal points get weights near 1.
    """
    return 1 / (deviation + 1)
def detect_drifts(
        series,
        threshold=DEFAULT_THRESHOLD,
        threshold_direction=DRIFT_DIRECTION_BOTH,
        drop_no_drift_segment=True,
        lowess_points=LOWESS_POINTS,
        feature_scaled=FEATURE_SCALED,
        show_pla=False):
    """Detect drifts for ITSI KPI time series
    detect both gradual trend and sudden level drifts

    Pipeline: (1) compute SND deviation and candidate level drift points, (2) fit
    the PLA model per drift-partitioned segment, (3) prune spurious drift points
    (small changes and short temporary drift pairs) and refit up to two more times,
    (4) flag drifted segments against the threshold and format the output.

    Args:
        series (pandas Series): input KPI time series
        threshold (int, optional): threshold for binary decision of whether KPI drifted or not. Default to DEFAULT_THRESHOLD
        threshold_direction (str, optional): the direction of drifts to be detected. Default to DRIFT_DIRECTION_BOTH
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Default to True
        lowess_points (int, optional): number of points for local LOWESS smoothing needed for SND calculation.
            Defaults to LOWESS_POINTS.
        feature_scaled (bool, optional): whether or not to scale feature to 1.0. With scaled feature, the following percent change
            calculation for values close to zero can be improved to avoid extraordinarily large values.
            Default to FEATURE_SCALED
        show_pla (bool, optional): whether or not to show viz for results of the PLA model fitting. Defaults to False.

    Returns:
        (list of SegmentInfo, float, tuple): ( output for ITSI integration,
                                               time spent in seconds,
                                               optional output tuple for showing pla results)
    """
    time_0 = time.time()
    # at the beginning, calculate the snd deviation and detect level drifts from scratch
    deviation, drifts, test_stats = snd_deviation_and_drift(series, lowess_points=lowess_points)
    weights = deviation_to_weight(deviation)
    # prepend 0 so drifts holds full segment boundaries: [0, d1, ..., len(series)]
    drifts = np.concatenate(([0], drifts))
    y = series.values
    y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)
    # Post-processing to heuristically simplify results in order to report overall pattern
    # leverage percent-change at level drift points calculated from PLA model fitting, to
    # measure changes at level drift points, beyond the test statistic from the window method
    if len(drifts) > 2:
        # remove drifts of small change
        # zero-length segments in seg_info correspond 1:1 to interior drift points drifts[1:-1]
        drift_changes = np.array([seg.percent_drift() for seg in seg_info if seg.length()==0])
        has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT
        not_short_drift_pair = np.full(len(drifts) - 2, True)
        if len(drifts) > 3 and has_big_change.sum() > 0:
            # remove short temporary drift pairs that essentially go back to same level
            for i in range(1, len(drifts)-2):
                # only consider adjacent drift pairs that are close together in time
                if drifts[i+1] - drifts[i] > THRESHOLD_TEMPORARY_DRIFT_LENGTH:
                    continue
                # level just before the pair (end of segment i-1) vs just after (start of segment i+1)
                val_before_drift_pair = y_est[i-1][-1]
                val_after_drift_pair = y_est[i+1][0]
                if abs(percent_change(val_before_drift_pair, val_after_drift_pair)) <= THRESHOLD_TEMPORARY_DRIFT:
                    # remove both points in the pair
                    not_short_drift_pair[i-1] = False
                    not_short_drift_pair[i] = False
                else:
                    # remove the one with smaller test_stat
                    # (not_short_drift_pair[i-1] maps to drift point drifts[i])
                    if test_stats[drifts[i]] > test_stats[drifts[i + 1]]:
                        not_short_drift_pair[i] = False
                    else:
                        not_short_drift_pair[i - 1] = False
        # only keep drift points really have big level drifts
        drifts_ = np.concatenate((
            [0],
            drifts[1:-1][np.where([a and b for a, b in zip(has_big_change, not_short_drift_pair)])],
            [drifts[-1]]
        ))
        if len(drifts_) < len(drifts):
            drifts = drifts_
            # update weights corresponding to updated level drift partitioning
            weights = deviation_to_weight(snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
            # fit the PLA model again with updated level drift points
            y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)
            if len(drifts) > 2:
                # remove drifts of small change
                # (second pruning pass: the refit can shrink the change at remaining drift points)
                drift_changes = np.array([seg.percent_drift() for seg in seg_info if seg.length()==0])
                has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT
                drifts_ = np.concatenate((
                    [0],
                    drifts[1:-1][np.where(has_big_change)],
                    [drifts[-1]]
                ))
                if len(drifts_) < len(drifts):
                    drifts = drifts_
                    # update weights corresponding to updated level drift partitioning
                    weights = deviation_to_weight(snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
                    # fit the PLA model a 3rd time with updated level drift points
                    y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)
    calc_and_update_drift_flag(seg_info, threshold) # set the drift flag, based on user configured threshold
    output_segments = calc_output_segment(seg_info, threshold_direction, series, drop_no_drift_segment)
    time_spent = time.time() - time_0
    return output_segments, time_spent, (y_est, knots, drifts, seg_info, baseline) if show_pla else None