You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
421 lines
18 KiB
421 lines
18 KiB
import numpy as np
|
|
import time
|
|
|
|
from scipy.interpolate import UnivariateSpline
|
|
|
|
from util.constants import (
|
|
FEATURE_SCALED,
|
|
SMOOTH_FACTOR,
|
|
LOWESS_POINTS,
|
|
THRESHOLD_SMALL_DRIFT,
|
|
THRESHOLD_TEMPORARY_DRIFT_LENGTH,
|
|
THRESHOLD_TEMPORARY_DRIFT,
|
|
NO_DRIFT,
|
|
DRIFTED,
|
|
DRIFT_PART,
|
|
DRIFT_DIRECTION_BOTH,
|
|
DRIFT_DIRECTION_UP,
|
|
DRIFT_DIRECTION_DOWN,
|
|
DEFAULT_THRESHOLD,
|
|
ZERO_LEVEL_TOLERANCE
|
|
)
|
|
|
|
from algo.level_drift_detection import window_method
|
|
from util.data_prepare import snd, percent_change
|
|
from util.csc_output import SegmentInfo, TREND_DRIFT_TYPE, LEVEL_DRIFT_TYPE
|
|
|
|
from logger import get_logger
|
|
from six.moves import range, zip
|
|
logger = get_logger()
|
|
|
|
def _smooth_factor(y, weights=None, smooth_factor=SMOOTH_FACTOR):
    """Return the smoothing-spline smooth factor for the input series.

    Defined as the (optionally weighted) sum of squared first-order
    differences of the series, scaled by a constant multiplier.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points; the first
            weight is dropped so the rest align with the first-order diffs.
            Defaults to None.
        smooth_factor (float, optional): constant multiplier. Defaults to SMOOTH_FACTOR.

    Returns:
        float: smooth factor
    """
    squared_diffs = np.square(np.diff(y))
    if weights is None:
        return squared_diffs.sum() * smooth_factor
    return np.dot(squared_diffs, weights[1:]) * smooth_factor
|
|
|
|
def snd_deviation_and_drift(series, lowess_points=LOWESS_POINTS, ignore_level_drift=False):
    """Compute the SND deviation and level-drift info used for PLA model fitting.

    The SND feeds the per-point weights of the PLA fit, calculated as
    w = 1/(snd + 1). By definition SND is non-negative and large for outliers,
    so outliers receive small weights while the weights of other data points
    stay close to 1.

    SND assumes a stable baseline, which breaks down across level drifts.
    To handle level drifts, the SND is computed per segment of the input
    time series partitioned at the detected drift points.

    Args:
        series (pandas Series): input time series
        lowess_points (int, optional): points of LOWESS smoothing. Defaults to LOWESS_POINTS.
        ignore_level_drift (bool, optional): whether or not to ignore level drifts. Defaults to False.

    Returns:
        (pandas Series, np array, np array): ( snd,
                                               drift points as int indexes,
                                               test statistic for level drift detection)
    """
    values = series.values
    if ignore_level_drift:
        # single segment spanning the whole series; no drift test statistics
        return snd(values, lowess_points=lowess_points), np.array([len(series)]), None

    drifts, test_stats = window_method(series)
    deviation = snd(values, lowess_points=lowess_points, drifts=drifts)
    return deviation, drifts, test_stats
|
|
|
|
|
|
def _pla_info(y_est, knots, idx_init=0):
    """Summarize a fitted PLA model as a list of segments.

    Works on either a whole time series or one part of a series partitioned
    at level drift points.

    Args:
        y_est (np array): linear approximation of the input, indexed locally
        knots (np array of int): knots of the piecewise linear approximation
        idx_init (int, optional): index offset of this part's start point when
            multiple PLA models are fitted on drift-partitioned segments.
            Defaults to 0.

    Returns:
        list of SegmentInfo: one SegmentInfo per piecewise-linear segment
    """
    segments = []
    start = 0
    for end in knots[1:]:
        segments.append(SegmentInfo(
            idx_start=start + idx_init,
            idx_end=end + idx_init,
            val_start=y_est[start],
            val_end=y_est[end],
        ))
        start = end

    return segments
|
|
|
|
def fit_pla(y, weights=None, drifts=None, feature_scaled=FEATURE_SCALED):
    """Fit continuous PLA model, as degree-1 smoothing spline fitting

    Weights on input data points are used for the soft-thresholding method that avoids PLA model
    fitting to outliers in input time series.

    Level drift info is integrated to fit PLA models separately on segments partitioned by level drift points.
    Otherwise, the PLA model will be over-stretched by adding multiple knots around drift points, resulting
    in a less accurate description of the overall trend/drift pattern of the input time series.

    Args:
        y (np array): input time series
        weights (np array, optional): weights on input data points. Defaults to None (all ones).
        drifts (list of int, optional): level drift points, expected to include the
            sentinels 0 and len(y). Defaults to None, i.e. [0, len(y)].
        feature_scaled (bool, optional): whether to divide the PLA approximation by its value at
            the beginning of the look-back period. Defaults to FEATURE_SCALED.

    Returns:
        (list of np array, list of int np array, list of SegmentInfo, float): (
            list of PLA approximation of segments partitioned by level drifts,
            list of knots of segments partitioned by level drifts (local indexes per segment),
            list of SegmentInfo of segments partitioned by level drifts or PLA knots,
            the scaling baseline when feature_scaled, otherwise 0
            )
    """
    if weights is None:
        weights = np.ones_like(y)
    if drifts is None:
        drifts = [0, len(y)]

    y_est = []
    knots = []
    for i in range(1, len(drifts)):
        # per each segment partitioned by level drift point, fit a PLA model separately
        left = drifts[i-1]
        right = drifts[i]
        # local x axis for this segment: 0 .. (segment length - 1)
        x = np.array(list(range(right - left)))
        y_part = y[left:right]
        s = _smooth_factor(y_part, weights[left:right])

        pla = UnivariateSpline(
            x,
            y_part,
            k=1,  # degree-1 spline for continuous PLA
            s=s,
            w=weights[left:right]
        )

        # apply the fitted PLA model to get the PLA approximation
        y_est_part = pla(x)

        # Use the feature value's PLA approximation at the beginning of the look-back period
        # to scale the feature's PLA approximation; the baseline from the first segment is
        # reused to scale every later segment so all segments share one scale.
        if feature_scaled and i == 1:
            baseline = y_est_part[0]
        if feature_scaled:
            # NOTE(review): if baseline is ~0 this division blows up (inf/nan);
            # presumably feature-scaled inputs stay away from zero -- confirm upstream.
            y_est_part /= baseline

        y_est.append(y_est_part)
        # knots are in the segment-local x coordinates; _pla_info applies idx_init to globalize
        knots.append(pla.get_knots().astype(int))

    # summarize the first drift-partitioned part (offset 0)
    seg_info = []
    seg_info += _pla_info(y_est[0], knots[0])
    if len(y_est) > 1:
        # last approximated value of the previous part, used as the level before each drift
        y_est_pre = y_est[0][-1]
        for i, (y_est_part, knots_part) in enumerate(zip(y_est[1:], knots[1:])):
            drift = drifts[i + 1]
            # add level drift point to output as zero-length segment
            seg_info.append(SegmentInfo(
                idx_start = drift,
                idx_end = drift,
                val_start = y_est_pre,
                val_end = y_est_part[0]
            ))

            seg_info += _pla_info(y_est_part, knots_part, idx_init=drift)
            y_est_pre = y_est_part[-1]

    return y_est, knots, seg_info, 0 if not feature_scaled else baseline
|
|
|
|
def is_threshold_exceeded(baseline_list, val, threshold):
    """Compare a value against a list of baselines.

    Returns the index of the last baseline for which the absolute percent
    change to *val* meets the threshold, or -1 if no baseline qualifies.

    Args:
        baseline_list (list of float): a list of baseline values
        val (float): the value to compare vs baselines
        threshold (int): the percent-change threshold

    Returns:
        int: the index of the last baseline that the threshold is exceeded, or -1
    """
    # scan from the end so the first hit is the last qualifying baseline
    for idx in range(len(baseline_list) - 1, -1, -1):
        if abs(percent_change(baseline_list[idx], val)) >= threshold:
            return idx
    return -1
|
|
|
|
def calc_trend_threshold_time(seg, baseline, threshold):
    """Locate the index within a trend segment where the percent change
    versus *baseline* reaches *threshold*, by linear interpolation.

    Args:
        seg (SegmentInfo): the trend segment whose change exceeded the threshold
        baseline (float): the baseline value the threshold was exceeded against
        threshold (int): the percent-change threshold

    Returns:
        int: index of the threshold-crossing point (truncated to int)
    """
    if baseline == seg.val_start:  # one segment drift
        # fraction of the segment needed to accumulate `threshold` percent change
        seg_percent_change = percent_change(seg.val_start, seg.val_end)
        return seg.idx_start + int(seg.length() * threshold / abs(seg_percent_change))

    # accumulated drift
    if seg.val_end > seg.val_start:  # upward trend
        if abs(baseline) < ZERO_LEVEL_TOLERANCE:
            # NOTE(review): near-zero baseline -- the absolute target level is taken as
            # threshold/100; presumably this mirrors percent_change()'s zero handling. Confirm.
            threshold_val = threshold / 100.0
        else:
            threshold_val = baseline + baseline * threshold / 100
    else:  # downward trend
        # NOTE(review): no zero-baseline guard on the downward branch, unlike
        # the upward one -- confirm this asymmetry is intended.
        threshold_val = baseline - baseline * threshold / 100

    # interpolate where the segment's linear trend crosses threshold_val
    return seg.idx_start + int(seg.length() * (threshold_val - seg.val_start) / (seg.val_end - seg.val_start))
|
|
|
|
def calc_and_update_drift_flag(seg_info, threshold):
    """Set the drift flag for each segment, updating the segments in place.

    Handles both simple drifts (one segment crosses the threshold on its own)
    and accumulated drifts (a run of segments together crosses the threshold
    against an earlier baseline).

    Args:
        seg_info (list of SegmentInfo): summary of the result of the PLA model fitting
        threshold (int): threshold for binary decision of whether KPI drifted or not
    """
    baselines = []
    for idx, segment in enumerate(seg_info):
        baselines.append(segment.val_start)
        exceeded_idx = is_threshold_exceeded(baselines, segment.val_end, threshold)
        if exceeded_idx < 0:
            continue

        # Segments between the exceeded baseline and this one contributed to an
        # accumulated drift; there are zero of them when the exceeded baseline
        # is this segment's own start value.
        n_parts = len(baselines) - 1 - exceeded_idx
        for part_idx in range(idx - n_parts, idx):
            seg_info[part_idx].part_or_whole = DRIFT_PART

        # this segment carries the change that crossed the threshold
        segment.part_or_whole = DRIFTED

        if segment.length() == 0:
            # level drift: the threshold is crossed exactly at the drift point
            segment.idx_threshold = segment.idx_end
        else:
            # trend drift: interpolate the crossing time inside the segment
            segment.idx_threshold = calc_trend_threshold_time(
                segment, baselines[exceeded_idx], threshold)

        # start fresh to evaluate the next possible drift
        baselines = []
|
|
|
|
def filter_output_segment(output_items, threshold_direction, drop_no_drift_segment=True):
    """Filter output segments by threshold direction, and possibly drop NO_DRIFT segments

    Args:
        output_items (list of SegmentInfo): un-filtered output items
        threshold_direction (str): the drift direction to keep (up / down / both)
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Defaults to True.

    Returns:
        list of SegmentInfo: filtered output items
    """
    if threshold_direction == DRIFT_DIRECTION_BOTH:
        # both directions wanted: only NO_DRIFT segments can be filtered out
        return [r for r in output_items if r.part_or_whole != NO_DRIFT] if drop_no_drift_segment else output_items

    def _direction_flag(direction):
        # map a direction constant to the sign stored in direction_flag
        if direction == DRIFT_DIRECTION_UP:
            return 1
        else:
            return -1

    n_outputItems = len(output_items)
    # per-segment direction: +1 up, -1 down, 0 for segments in no drift run
    direction_flag = np.zeros(n_outputItems, dtype=int)
    # index of the first DRIFT_PART of the current drift run, -1 if none open
    idx_left = -1
    for i in range(n_outputItems):
        if output_items[i].part_or_whole == DRIFT_PART and idx_left < 0:
            idx_left = i
        if output_items[i].part_or_whole == DRIFTED:
            # direction is judged from the start of the contributing run
            # (if any) to the end of the DRIFTED segment
            if idx_left < 0:
                val_start = output_items[i].val_start
            else:
                val_start = output_items[idx_left].val_start
            # stamp the whole run [idx_left, i] (or just [i] when no run) with its direction
            if output_items[i].val_end > val_start:
                direction_flag[idx_left if idx_left >= 0 else i : i + 1] = _direction_flag(DRIFT_DIRECTION_UP)
            else:
                direction_flag[idx_left if idx_left >= 0 else i : i + 1] = _direction_flag(DRIFT_DIRECTION_DOWN)
            idx_left = -1

    if drop_no_drift_segment:
        return [item for (item, flag) in zip(output_items, direction_flag) if flag == _direction_flag(threshold_direction)]
    else:
        # flag == 0 keeps undirected (no-drift) segments in the output
        return [item for (item, flag) in zip(output_items, direction_flag) if flag == _direction_flag(threshold_direction) or flag == 0]
|
|
|
|
def calc_output_segment(seg_info, threshold_direction, series, drop_no_drift_segment=True):
    """Enhance and filter the list of SegmentInfo for easy integration on ITSI side.

    Maps integer indexes back to the input series' time index, tags each
    segment as a level or trend drift, attaches the threshold-crossing time
    for DRIFTED segments, then filters by the requested drift direction.

    Args:
        seg_info (list of SegmentInfo): summary of the PLA model fitting result
        threshold_direction (str): the direction of drifts to be detected
        series (pandas Series): input time series
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Default to True

    Returns:
        list of SegmentInfo: output for ITSI
    """
    time_index = series.index
    for segment in seg_info:
        segment.start_time = time_index[segment.idx_start]
        segment.end_time = time_index[segment.idx_end]
        # zero-length segments are level drifts; everything else is a trend
        segment.drift_type = LEVEL_DRIFT_TYPE if segment.length() == 0 else TREND_DRIFT_TYPE
        if segment.part_or_whole == DRIFTED:
            segment.threshold_time = time_index[segment.idx_threshold]

    return filter_output_segment(seg_info, threshold_direction, drop_no_drift_segment)
|
|
|
|
def deviation_to_weight(deviation):
    """Convert a non-negative SND deviation into a PLA fitting weight in (0, 1].

    w = 1 / (1 + deviation): outliers (large deviation) get small weights,
    ordinary points get weights close to 1.

    Args:
        deviation (float or np array): non-negative SND deviation(s)

    Returns:
        float or np array: weight(s), same shape as the input
    """
    # Float literals force true division for integer inputs under Python 2 as
    # well (the module imports six.moves, so py2 compatibility is assumed);
    # the plain 1/(1+d) form floor-divides to 0 there for any positive int d.
    return 1.0 / (1.0 + deviation)
|
|
|
|
def detect_drifts(
        series,
        threshold=DEFAULT_THRESHOLD,
        threshold_direction=DRIFT_DIRECTION_BOTH,
        drop_no_drift_segment=True,
        lowess_points=LOWESS_POINTS,
        feature_scaled=FEATURE_SCALED,
        show_pla=False):
    """Detect drifts for ITSI KPI time series
    detect both gradual trend and sudden level drifts

    Args:
        series (pandas Series): input KPI time series
        threshold (int, optional): threshold for binary decision of whether KPI drifted or not. Default to DEFAULT_THRESHOLD
        threshold_direction (str, optional): the direction of drifts to be detected. Default to DRIFT_DIRECTION_BOTH
        drop_no_drift_segment (bool, optional): whether or not drop NO_DRIFT segments from output. Default to True
        lowess_points (int, optional): number of points for local LOWESS smoothing needed for SND calculation.
            Defaults to LOWESS_POINTS.
        feature_scaled (bool, optional): whether or not to scale feature to 1.0. With scaled feature, the following percent change
            calculation for values close to zero can be improved to avoid extraordinarily large values.
            Default to FEATURE_SCALED
        show_pla (bool, optional): whether or not to show viz for results of the PLA model fitting. Defaults to False.

    Returns:
        (list of SegmentInfo, float, tuple): ( output for ITSI integration,
                                               time spent in seconds,
                                               optional (y_est, knots, drifts, seg_info, baseline)
                                               tuple for showing pla results, else None)
    """
    time_0 = time.time()

    # at the beginning, calculate the snd deviation and detect level drifts from scratch
    deviation, drifts, test_stats = snd_deviation_and_drift(series, lowess_points=lowess_points)
    weights = deviation_to_weight(deviation)
    # fit_pla expects drifts with a leading 0 sentinel; snd/window_method return them without
    drifts = np.concatenate(([0], drifts))

    y = series.values
    y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)

    # Post-processing to heuristically simplify results in order to report overall pattern
    # leverage percent-change at level drift points calculated from PLA model fitting, to
    # measure changes at level drift points, beyond the test statistic from the window method
    if len(drifts) > 2:
        # remove drifts of small change; zero-length segments are the level drift points,
        # in the same order as the interior entries drifts[1:-1]
        drift_changes = np.array([seg.percent_drift() for seg in seg_info if seg.length()==0])
        has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT

        # keep-mask over interior drift points drifts[1:-1]; entry j maps to drifts[j+1]
        not_short_drift_pair = np.full(len(drifts) - 2, True)
        if len(drifts) > 3 and has_big_change.sum() > 0:
            # remove short temporary drift pairs that essentially go back to same level
            for i in range(1, len(drifts)-2):
                if drifts[i+1] - drifts[i] > THRESHOLD_TEMPORARY_DRIFT_LENGTH:
                    continue
                # levels just before drifts[i] and just after drifts[i+1]
                val_before_drift_pair = y_est[i-1][-1]
                val_after_drift_pair = y_est[i+1][0]
                if abs(percent_change(val_before_drift_pair, val_after_drift_pair)) <= THRESHOLD_TEMPORARY_DRIFT:
                    # remove both points in the pair
                    not_short_drift_pair[i-1] = False
                    not_short_drift_pair[i] = False
                else:
                    # remove the one with smaller test_stat
                    # (mask index i-1 maps to drifts[i], index i to drifts[i+1])
                    if test_stats[drifts[i]] > test_stats[drifts[i + 1]]:
                        not_short_drift_pair[i] = False
                    else:
                        not_short_drift_pair[i - 1] = False

        # only keep drift points really have big level drifts
        drifts_ = np.concatenate((
            [0],
            drifts[1:-1][np.where([a and b for a, b in zip(has_big_change, not_short_drift_pair)])],
            [drifts[-1]]
        ))

        if len(drifts_) < len(drifts):
            drifts = drifts_
            # update weights corresponding to updated level drift partitioning
            # (snd expects drifts without the leading 0 sentinel, hence drifts[1:])
            weights = deviation_to_weight(snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
            # fit the PLA model again with updated level drift points
            y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)

    # second refinement pass: the refit above can itself produce new small-change
    # drift points; re-check and refit once more. This is a no-op when the first
    # pass removed nothing, since the filter below is a subset of the one above.
    if len(drifts) > 2:
        # remove drifts of small change
        drift_changes = np.array([seg.percent_drift() for seg in seg_info if seg.length()==0])
        has_big_change = abs(drift_changes) > THRESHOLD_SMALL_DRIFT

        drifts_ = np.concatenate((
            [0],
            drifts[1:-1][np.where(has_big_change)],
            [drifts[-1]]
        ))

        if len(drifts_) < len(drifts):
            drifts = drifts_
            # update weights corresponding to updated level drift partitioning
            weights = deviation_to_weight(snd(y, lowess_points=lowess_points, drifts=drifts[1:]))
            # fit the PLA model a 3rd time with updated level drift points
            y_est, knots, seg_info, baseline = fit_pla(y, drifts=drifts, weights=weights, feature_scaled=feature_scaled)

    calc_and_update_drift_flag(seg_info, threshold)  # set the drift flag, based on user configured threshold

    output_segments = calc_output_segment(seg_info, threshold_direction, series, drop_no_drift_segment)
    time_spent = time.time() - time_0

    return output_segments, time_spent, (y_est, knots, drifts, seg_info, baseline) if show_pla else None
|