You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
366 lines
14 KiB
366 lines
14 KiB
from dataclasses import dataclass
|
|
import pandas as pd
|
|
from scipy.stats import skew
|
|
from string import Template
|
|
import time
|
|
|
|
from util.dev_util import str_int_list_no_bracket
|
|
from util.data_prepare import (
|
|
COL_VALUE, DAY_OF_WEEK_NAME, DEFAULT_Z, DEFAULT_SENSITIVITY_MULTIPLIER,
|
|
down_sample, get_resolution, remove_super_spikes
|
|
)
|
|
from util.pattern_evaluate import (
|
|
THRESHOLD_SILHOUETTE_LOW, THRESHOLD_SILHOUETTE_HIGH,
|
|
check_weekly_pattern, evaluate_half_day, evaluate_hour_block
|
|
)
|
|
from util.threshold_utils import sub_sequences_groupby
|
|
from util.itsi_at_threshold import calc_history_normal_behavior, VARIATION_BUFFER
|
|
from util.outlier_exclusion_recommend import calc_sensitivity
|
|
from util.csc_output import THR_DIR_BOTH, THR_DIR_LO, THR_DIR_UP
|
|
|
|
from util import setup_logging
|
|
from six.moves import zip
|
|
|
|
logger = setup_logging.get_logger()

# Day-of-week indices used throughout (presumably 0=Monday .. 6=Sunday,
# the pandas `dayofweek` convention — TODO confirm against util.data_prepare).
DAYS_IN_WEEK = [0, 1, 2, 3, 4, 5, 6]

# Time-span lengths expressed in minutes.
DAY_IN_MINUTES = 1440
WEEK_IN_MINUTES = 10080

# Status / description labels attached to recommendation results.
NO_RECOMMENDATION = 'No Recommendation'
INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
NO_PATTERN = 'NO_PATTERN'
PATTERN_SWITCH = 'PATTERN_SWITCH'
SUCCESSFUL = 'SUCCESSFUL'

# Pattern evaluation downsamples any finer series to this resolution.
DEFAULT_RESOLUTION = pd.Timedelta('15min')
# Multiplier that slightly biases pattern selection toward the weekly candidate.
COEF_FAVOR_WEEKLY = 1.1
# |skewness| above this threshold selects a one-sided threshold direction.
THRESHOLD_SKEW = 2.0

# Cron templates (minute fixed at 0): daily fills the start hour;
# weekly additionally fills a comma-separated day-of-week list.
CRON_TEMPLATE_DAILY_PATTERN = Template('0 $start_hour * * *')
CRON_TEMPLATE_WEEKLY_PATTERN = Template('0 $start_hour * * $dow_list')
|
|
|
|
def get_output_no_pattern(mean, std):
    """Build the fallback cron dict used when no seasonality pattern is found.

    A single entry (key 0) spans the whole week, thresholded around the
    given global mean/std with the default z value.
    """
    whole_week = CronOutputItem('0 0 * * 0', WEEK_IN_MINUTES, DEFAULT_Z, mean, std)
    return {0: whole_week}
|
|
|
|
@dataclass(frozen=True)
class TimepolicyOutput:
    """Immutable result bundle produced by generate_cron_output."""
    # Maps an int key (start hour, or dow*100 + hour for weekly patterns)
    # to a CronOutputItem describing that threshold window.
    cron_dict : dict
    # Human-readable pattern description (str(TimePolicy)) or a status
    # label such as NO_PATTERN.
    time_policy_desc : str
    # Silhouette score of the chosen pattern, rounded to 3 decimals.
    time_policy_score : float
    # One of THR_DIR_UP / THR_DIR_LO / THR_DIR_BOTH, or None when
    # direction detection was not requested.
    threshold_direction : str
    # Per-window value collections from calc_sensitivity; None when no
    # pattern was detected.
    value_collections : tuple
|
|
|
|
@dataclass(frozen=True)
class CronOutputItem:
    """A single cron-scheduled threshold window.

    Frozen dataclass: the ``update_*`` helpers return modified copies
    instead of mutating in place.
    """
    # Standard 5-field cron expression marking the window start.
    cron_expression : str
    # Window length in minutes.
    time_length : int
    # Z value used to derive the threshold band around `mean`.
    z_value : float
    mean : float = 0.0
    std : float = 0.0
    # IQR sensitivity multiplier for ITSI outlier exclusion.
    sensitivity : float = DEFAULT_SENSITIVITY_MULTIPLIER

    def __str__(self):
        return f'({self.cron_expression}, {self.time_length}, z={self.z_value:.3f}, m={self.mean:.3f}, s={self.std:.3f}, {self.sensitivity:.3f})'

    def update_cron_expression(self, cron_expression):
        """Return a copy with a new cron expression; all other fields kept.

        Fix: `sensitivity` was previously omitted here and silently reset
        to the default, inconsistent with the other update helpers.
        """
        return CronOutputItem(
            cron_expression = cron_expression,
            time_length = self.time_length,
            z_value = self.z_value,
            mean = self.mean,
            std = self.std,
            sensitivity = self.sensitivity,
        )

    def update_mean_std_sensitivity(self, mean, std, sensitivity):
        """Return a copy with new mean/std/sensitivity; cron and z unchanged."""
        return CronOutputItem(
            cron_expression = self.cron_expression,
            time_length = self.time_length,
            z_value = self.z_value,
            mean = mean,
            std = std,
            sensitivity = sensitivity
        )

    def update_numeric_stats(self, mean, std, z_value, sensitivity):
        """Return a copy with new stats.

        The new z is the larger of the supplied and current z values, widened
        by VARIATION_BUFFER — z never shrinks below the existing setting.
        """
        return CronOutputItem(
            cron_expression = self.cron_expression,
            time_length = self.time_length,
            z_value = max(z_value, self.z_value) * (1.0 + VARIATION_BUFFER),
            mean = mean,
            std = std,
            sensitivity = sensitivity
        )
|
|
|
|
class TimePolicy:
    """Describes the detected seasonality pattern of a time series.

    hour_block_length of 12 denotes the half-day pattern, 0 means no
    sub-daily block; has_weekend marks a weekly pattern.
    """

    def __init__(self, hour_block_length, offset, has_weekend, offdays_start=5, score=-2.0, label_method=None) -> None:
        self.hour_block_length = hour_block_length
        self.offset = offset
        self.has_weekend = has_weekend
        self.offdays_start = offdays_start
        self.score = score
        self.label_method = label_method

    def __str__(self) -> str:
        # Offset suffix is shown only when a non-zero hour offset applies.
        offset_str = f', offset={self.offset}h' if self.offset != 0 else ''

        if self.has_weekend:
            if self.offdays_start == 5:
                # Sat/Sun off-days are the default and not spelled out.
                offdays_str = ''
            else:
                second_offday = (self.offdays_start + 1) % 7
                offdays_str = f', {DAY_OF_WEEK_NAME[self.offdays_start]} {DAY_OF_WEEK_NAME[second_offday]} off'
            return 'weekly seasonality' + offset_str + offdays_str

        if self.hour_block_length == 12:  # half-day pattern
            return 'half-day pattern' + offset_str

        if self.hour_block_length > 0:
            return f'daily {self.hour_block_length}-hour block' + offset_str

        return 'no pattern'
|
|
|
|
def _dict_key_for_weekly_cron(dow, start_hour):
|
|
return dow * 100 + start_hour
|
|
|
|
def detect_threshold_direction(values):
    """Recommend a threshold direction from the skewness of `values`.

    Strong right skew -> upper threshold only; strong left skew -> lower
    threshold only; otherwise threshold in both directions.
    """
    skewness = skew(values)

    if abs(skewness) <= THRESHOLD_SKEW:
        return THR_DIR_BOTH
    return THR_DIR_UP if skewness > 0 else THR_DIR_LO
|
|
|
|
def generate_cron_output(df, filter_config, choose_direction=True, compare_thresholds=False):
    """Detect the seasonality pattern of `df` and expand it into per-window
    cron threshold items.

    Args:
        df: time-series DataFrame with a COL_VALUE column.
        filter_config: forwarded to calc_history_normal_behavior.
        choose_direction: when True, also recommend a threshold direction
            from the skew of the values.
        compare_thresholds: forwarded to calc_sensitivity.

    Returns:
        TimepolicyOutput bundling the cron dict, pattern description and
        score, threshold direction, and calc_sensitivity's value collections.
    """
    df_resolution = get_resolution(df)
    df = down_sample(df, resolution=df_resolution, df_resolution=df_resolution)  # don't change resolution but fill na

    # make a copy of the df values before calling recommend_time_policy, b/c
    # df values may get modified there, for example, in remove_super_spikes
    df_values = df[COL_VALUE].copy()

    if choose_direction:
        detected_threshold_direction = detect_threshold_direction(df_values)
    else:
        detected_threshold_direction = None

    # Time policy by seasonality pattern detection
    time_policy = recommend_time_policy(df)

    if time_policy.score < THRESHOLD_SILHOUETTE_LOW:  # no pattern detected
        logger.warning(f'The Silhouette Score is {time_policy.score:.3f}, which is less than the minimum threshold of {THRESHOLD_SILHOUETTE_LOW}.')
        # Fall back to a single whole-week window around the global mean/std.
        return TimepolicyOutput(
            cron_dict = get_output_no_pattern(df[COL_VALUE].mean(), df[COL_VALUE].std()),
            time_policy_desc = NO_PATTERN,
            time_policy_score = round(time_policy.score, 3),
            threshold_direction = detected_threshold_direction,
            value_collections = None
        )

    history_normal_dict, subs, label_method = calc_history_normal_behavior(df, time_policy, filter_config)

    # hour_block_length only applies to the non-weekly (daily block) case
    if not time_policy.has_weekend:
        hour_block_length = time_policy.hour_block_length
    output_dict = {}

    day_offset = time_policy.offset
    weekly_with_offset = time_policy.has_weekend and time_policy.offset > 0
    if weekly_with_offset:
        # reset day_offset to re-use the code in the for-loop below
        # put the logic of day_offset adjustment for the case of weekly-pattern-w-day-offset in isolation after that
        day_offset = 0
    for label, subs_group in list(sub_sequences_groupby(subs, label_method).items()):
        history_normal = history_normal_dict[label]

        if time_policy.has_weekend:  # in this case, subsequence is 1-day length
            dow_set = sorted(set([sub.dayofweek() for sub in subs_group]))
            dow_disp = str_int_list_no_bracket(dow_set)

            if history_normal.split_pnts is None:
                # one window covering the whole day for this day-of-week group
                key = _dict_key_for_weekly_cron(dow_set[0], day_offset)
                output_dict[key] = CronOutputItem(
                    cron_expression = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=day_offset, dow_list=dow_disp),
                    time_length = DAY_IN_MINUTES,
                    z_value = history_normal.zs[0],
                )
            else:
                # the day is split at split_pnts (hours); one window per segment,
                # each with its own z value
                left = 0
                split_pnts = history_normal.split_pnts + [24]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    start_hour=(day_offset + left) % 24
                    key = _dict_key_for_weekly_cron(dow_set[0], start_hour)
                    output_dict[key] = CronOutputItem(
                        cron_expression = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=start_hour, dow_list=dow_disp),
                        time_length = (split_pnt - left)*60,
                        z_value = z,
                    )
                    left = split_pnt

        else:  # no weekly pattern
            hour_set = sorted(set([sub.hour() for sub in subs_group]))
            if history_normal.split_pnts is None:
                # one hour_block_length window at each start hour of this label
                for start_hour in hour_set:
                    output_dict[start_hour] = CronOutputItem(
                        cron_expression = CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=start_hour),
                        time_length = hour_block_length * 60,
                        z_value = history_normal.zs[0],
                    )
            else:
                # each hour block is split at split_pnts; one window per segment
                left = 0
                split_pnts = history_normal.split_pnts + [hour_block_length]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    for start_hour in hour_set:
                        key = (start_hour + left) % 24
                        output_dict[key] = CronOutputItem(
                            cron_expression = CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=key),
                            time_length = (split_pnt - left) * 60,
                            z_value = z,
                        )
                    left = split_pnt

    if weekly_with_offset:
        # apply the deferred hour offset to every weekly cron entry
        output_dict = compensate_weekly_with_offset(output_dict, time_policy.offset)

    # recommend sensitivity for IQR method of ITSI outlier exclusion
    output_dict, value_collections = calc_sensitivity(
        df,
        df_values,
        time_policy,
        history_normal_dict,
        output_dict,
        subs,
        label_method,
        compare_thresholds=compare_thresholds
    )

    return TimepolicyOutput(
        cron_dict = output_dict,
        time_policy_desc = str(time_policy),
        time_policy_score = round(time_policy.score, 3),
        threshold_direction = detected_threshold_direction,
        value_collections = value_collections
    )
|
|
|
|
def compensate_weekly_with_offset(output_dict, day_offset):
    """Shift every weekly cron entry in `output_dict` forward by `day_offset`
    hours.

    Entries whose shifted start hour crosses midnight are moved to the next
    day-of-week; when such an entry spans several days, the last day of its
    dow list is split out into a separate entry so the wrap stays correct.

    Args:
        output_dict: {key: CronOutputItem} whose cron expressions have the
            weekly shape '0 <hour> * * <dow_list>'.
        day_offset: hour offset (int) added to each entry's start hour.

    Returns:
        A new dict keyed by _dict_key_for_weekly_cron(dow, start_hour).
    """
    def _dow_plus_one(dow):
        # next day-of-week, wrapping 6 -> 0
        return (dow + 1) % 7
    def _cron_idx_dow(cron):
        # index of the first char of the day-of-week field (the last field)
        return cron.rindex(' ') + 1
    def _cron_get_dow_head(cron):
        # first day-of-week digit (dow values are single digits 0-6)
        idx = _cron_idx_dow(cron)
        return cron[idx: idx+1]
    def _cron_get_dows(cron):
        # whole day-of-week field, e.g. '1,2,3'
        return cron[_cron_idx_dow(cron) :]
    def _cron_update_dow(cron, dow):
        # replace the day-of-week field with `dow`
        return cron[:_cron_idx_dow(cron)] + dow
    def _cron_hour_plus_offset(cron):
        # cron starts with '0 <hour> ...', so the hour field begins at index 2;
        # returns the shifted cron plus the (possibly >= 24) new start hour
        idx_right = cron.index(' ', 2)
        start_hour = int(cron[2:idx_right]) + day_offset
        return cron[:2] + str(start_hour) + cron[idx_right:], start_hour
    def _cron_hour_update(cron, hour):
        # overwrite the hour field with `hour`
        idx_right = cron.index(' ', 2)
        return cron[:2] + str(hour) + cron[idx_right:]
    def _unpack_dows_str(dows):
        return [int(d) for d in dows.split(',')]

    adjusted_dict = {}

    for cron_item in list(output_dict.values()):
        cron_a, start_hour = _cron_hour_plus_offset(cron_item.cron_expression)

        if start_hour < 24:  # still the same day
            key = _dict_key_for_weekly_cron(int(_cron_get_dow_head(cron_a)), start_hour)
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue

        # shifted past midnight: wrap the hour and advance the day(s)
        start_hour -= 24
        dows = _cron_get_dows(cron_a)

        if len(dows) == 1:  # only one day, just move to the next day
            dows = str(_dow_plus_one(int(dows)))
            cron_a = _cron_update_dow(cron_a, dows)
            cron_a = _cron_hour_update(cron_a, start_hour)
            key = int(dows) * 100 + start_hour
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue

        # more than one days, besides +1, need to separate out the last day in the dow list
        dow_list = _unpack_dows_str(dows)
        dow_list = [_dow_plus_one(d) for d in dow_list]
        cron_1 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour,
            dow_list=str_int_list_no_bracket(dow_list[:-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[0], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_1)
        cron_2 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour,
            dow_list=str(dow_list[-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[-1], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_2)

    return adjusted_dict
|
|
|
|
def recommend_time_policy(df):
    """Run seasonality-pattern detection on a smoothed copy of the series
    and return the resulting TimePolicy.

    The original (spike-removed but un-smoothed) values are restored on `df`
    before returning, since downstream tasks — history-normal calculation and
    anomaly-boundary calculation — need them.
    """
    df = remove_super_spikes(df)
    original_values = df[COL_VALUE].copy()

    # 5-point centered smoothing; the two samples at each edge, where the
    # centered window is incomplete (NaN), borrow the nearest smoothed value.
    smoothed = df[COL_VALUE].rolling(5, center=True).mean()
    smoothed.iloc[[0, 1]] = smoothed.iloc[2]
    smoothed.iloc[[-1, -2]] = smoothed.iloc[-3]
    df[COL_VALUE] = smoothed

    (hour_block_length, offset, has_weekend,
     offdays_start, score, label_method) = _evaluate_seasonality_patterns(df)

    # keep the modifications of remove_super_spikes() for downstream tasks,
    # including calculating history normal and calculating anomaly boundary
    df[COL_VALUE] = original_values
    return TimePolicy(hour_block_length, offset, has_weekend, offdays_start, score, label_method)
|
|
|
|
def _evaluate_seasonality_patterns(df):
    """Score the candidate seasonality patterns (weekly, hour-block,
    half-day) and return the winner.

    Returns:
        Tuple (hour_block_length, offset, has_weekend, offdays_start, score,
        label_method) — the positional arguments of TimePolicy.
        hour_block_length is 12 for the half-day pattern and 0 when no
        sub-daily block applies (weekly win, or best score below
        THRESHOLD_SILHOUETTE_LOW).
    """
    df_resolution = get_resolution(df)
    if df_resolution < DEFAULT_RESOLUTION:  # downsample when the initial resolution is finer than the default
        df = down_sample(df, resolution=DEFAULT_RESOLUTION, df_resolution=df_resolution)

    time_0 = time.time()
    best_score_week, offdays_start, offset, label_method = check_weekly_pattern(df)
    logger.debug(f'weekly pattern ({best_score_week:.3f}), time spent: {time.time() - time_0:.2f}s')

    time_0 = time.time()
    hour_block_candidate = evaluate_hour_block(df)
    logger.debug(f'hour-block pattern time spent: {time.time() - time_0:.2f}s')

    time_0 = time.time()
    score_half, workhour_start = evaluate_half_day(df)
    logger.debug(f'half-day pattern ({score_half:.3f}), time spent: {time.time() - time_0:.2f}s')

    # a missing hour-block candidate scores below any real silhouette value
    hour_block_score = -2.0 if hour_block_candidate is None else hour_block_candidate.silhouette

    # weekly wins when its (slightly favored) score beats both daily patterns
    if best_score_week * COEF_FAVOR_WEEKLY > hour_block_score and best_score_week * COEF_FAVOR_WEEKLY > score_half:
        return 0, offset, True, offdays_start, best_score_week, label_method

    # Fix: guard against hour_block_candidate being None — previously this
    # branch dereferenced it unconditionally, raising AttributeError whenever
    # no hour-block candidate existed and score_half <= -2.0.
    if hour_block_candidate is not None and hour_block_score >= score_half:
        best_score = hour_block_score
        if best_score >= THRESHOLD_SILHOUETTE_LOW:
            hour_block_length = hour_block_candidate.hourdelta
            offset = hour_block_candidate.offset
        else:
            hour_block_length = 0
            offset = 0
    else:
        best_score = score_half
        if best_score >= THRESHOLD_SILHOUETTE_LOW:
            hour_block_length = 12
            offset = workhour_start
        else:
            hour_block_length = 0
            offset = 0

    return hour_block_length, offset, False, offdays_start, best_score, None
|