"""Time-policy recommendation: detect seasonality patterns in a metric series
and emit cron-keyed adaptive-threshold policies."""
import time
from dataclasses import dataclass, replace
from string import Template

import pandas as pd
from scipy.stats import skew
from six.moves import zip

from util import setup_logging
from util.csc_output import THR_DIR_BOTH, THR_DIR_LO, THR_DIR_UP
from util.data_prepare import (
    COL_VALUE, DAY_OF_WEEK_NAME, DEFAULT_Z, DEFAULT_SENSITIVITY_MULTIPLIER,
    down_sample, get_resolution, remove_super_spikes
)
from util.dev_util import str_int_list_no_bracket
from util.itsi_at_threshold import calc_history_normal_behavior, VARIATION_BUFFER
from util.outlier_exclusion_recommend import calc_sensitivity
from util.pattern_evaluate import (
    THRESHOLD_SILHOUETTE_LOW, THRESHOLD_SILHOUETTE_HIGH,
    check_weekly_pattern, evaluate_half_day, evaluate_hour_block
)
from util.threshold_utils import sub_sequences_groupby
# Module-level logger shared by every function in this file.
logger = setup_logging.get_logger()

DAYS_IN_WEEK = [0, 1, 2, 3, 4, 5, 6]  # day-of-week codes 0-6
DAY_IN_MINUTES = 1440
WEEK_IN_MINUTES = 10080  # 7 * 1440

# Status / outcome labels for the recommendation result.
NO_RECOMMENDATION = 'No Recommendation'
INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
NO_PATTERN = 'NO_PATTERN'
PATTERN_SWITCH = 'PATTERN_SWITCH'
SUCCESSFUL = 'SUCCESSFUL'

# Pattern evaluation downsamples any series finer than this resolution.
DEFAULT_RESOLUTION = pd.Timedelta('15min')
# The weekly score is multiplied by this factor when compared against the
# other candidates, so weekly seasonality wins near-ties.
COEF_FAVOR_WEEKLY = 1.1
# |skew| above this selects a one-sided threshold direction.
THRESHOLD_SKEW = 2.0
# Cron templates: minute field fixed at 0; $start_hour (and $dow_list for the
# weekly variant) are substituted later.
CRON_TEMPLATE_DAILY_PATTERN = Template('0 $start_hour * * *')
CRON_TEMPLATE_WEEKLY_PATTERN = Template('0 $start_hour * * $dow_list')
def get_output_no_pattern(mean, std):
    """Fallback policy: a single week-long cron window using the global mean/std."""
    whole_week = CronOutputItem(
        cron_expression='0 0 * * 0',
        time_length=WEEK_IN_MINUTES,
        z_value=DEFAULT_Z,
        mean=mean,
        std=std,
    )
    return {0: whole_week}
@dataclass(frozen=True)
class TimepolicyOutput:
    """Immutable result of generate_cron_output()."""
    # int key -> CronOutputItem; the key encodes dow*100 + start_hour for
    # weekly patterns, or just start_hour for daily ones.
    cron_dict : dict
    # Human-readable policy description, or NO_PATTERN when none was found.
    time_policy_desc : str
    # Silhouette score of the chosen pattern, rounded to 3 decimals.
    time_policy_score : float
    # One of the THR_DIR_* constants; None when choose_direction=False.
    threshold_direction : str
    # Extra data returned by calc_sensitivity(); None when no pattern.
    value_collections : tuple
@dataclass(frozen=True)
class CronOutputItem:
    """One threshold-policy entry tied to a single cron time window.

    The dataclass is frozen, so the update_* helpers return modified copies
    instead of mutating in place.
    """
    cron_expression : str  # 5-field cron expression, minute field fixed at 0
    time_length : int      # window length in minutes
    z_value : float        # z multiplier defining the threshold band
    mean : float = 0.0
    std : float = 0.0
    # sensitivity for the IQR method of ITSI outlier exclusion
    sensitivity : float = DEFAULT_SENSITIVITY_MULTIPLIER

    def __str__(self):
        return f'({self.cron_expression}, {self.time_length}, z={self.z_value:.3f}, m={self.mean:.3f}, s={self.std:.3f}, {self.sensitivity:.3f})'

    def update_cron_expression(self, cron_expression):
        """Return a copy with a new cron expression; all other fields kept.

        Bug fix: the previous implementation rebuilt the dataclass
        field-by-field and omitted ``sensitivity``, silently resetting it to
        the default value. dataclasses.replace preserves every other field.
        """
        return replace(self, cron_expression=cron_expression)

    def update_mean_std_sensitivity(self, mean, std, sensitivity):
        """Return a copy with refreshed mean/std/sensitivity."""
        return replace(self, mean=mean, std=std, sensitivity=sensitivity)

    def update_numeric_stats(self, mean, std, z_value, sensitivity):
        """Return a copy with refreshed stats and a buffered z value.

        The new z is the larger of the candidate and the current value,
        inflated by VARIATION_BUFFER to leave headroom for normal variation.
        """
        return replace(
            self,
            mean=mean,
            std=std,
            sensitivity=sensitivity,
            z_value=max(z_value, self.z_value) * (1.0 + VARIATION_BUFFER),
        )
class TimePolicy:
    """Describes the seasonality pattern chosen for a metric series.

    has_weekend=True means weekly seasonality; otherwise hour_block_length
    identifies a daily pattern (12 = half-day, 0 = no pattern). offset is in
    hours; score is the pattern's silhouette score.
    """

    def __init__(self, hour_block_length, offset, has_weekend, offdays_start=5, score=-2.0, label_method=None) -> None:
        self.hour_block_length = hour_block_length
        self.offset = offset
        self.has_weekend = has_weekend
        self.offdays_start = offdays_start
        self.score = score
        self.label_method = label_method

    def __str__(self) -> str:
        offset_part = f', offset={self.offset}h' if self.offset != 0 else ''
        if self.has_weekend:
            if self.offdays_start == 5:
                offdays_part = ''
            else:
                second_offday = (self.offdays_start + 1) % 7
                offdays_part = f', {DAY_OF_WEEK_NAME[self.offdays_start]} {DAY_OF_WEEK_NAME[second_offday]} off'
            return f'weekly seasonality{offset_part}{offdays_part}'
        if self.hour_block_length == 12:  # half-day pattern
            return f'half-day pattern{offset_part}'
        if self.hour_block_length > 0:
            return f'daily {self.hour_block_length}-hour block{offset_part}'
        return 'no pattern'
def _dict_key_for_weekly_cron(dow, start_hour):
return dow * 100 + start_hour
def detect_threshold_direction(values):
    """Choose a threshold direction from the skewness of *values*.

    Strongly right-skewed data gets an upper-only threshold, strongly
    left-skewed data a lower-only threshold; otherwise both sides are used.
    """
    skewness = skew(values)
    if skewness > THRESHOLD_SKEW:
        return THR_DIR_UP
    if -skewness > THRESHOLD_SKEW:
        return THR_DIR_LO
    return THR_DIR_BOTH
def generate_cron_output(df, filter_config, choose_direction=True, compare_thresholds=False):
    """Detect the seasonality of *df* and build cron-keyed threshold policies.

    Returns a TimepolicyOutput whose cron_dict maps an int key (dow*100 +
    start_hour for weekly patterns, start_hour for daily ones) to a
    CronOutputItem. When the detected pattern scores below
    THRESHOLD_SILHOUETTE_LOW, a single week-long fallback policy is returned
    with time_policy_desc set to NO_PATTERN.
    """
    df_resolution = get_resolution(df)
    df = down_sample(df, resolution=df_resolution, df_resolution=df_resolution) # don't change resolution but fill na
    # make a copy of the df values before calling recommend_time_policy, b/c
    # df values may get modified there, for example, in remove_super_spikes
    df_values = df[COL_VALUE].copy()
    if choose_direction:
        detected_threshold_direction = detect_threshold_direction(df_values)
    else:
        detected_threshold_direction = None
    # Time policy by seasonality pattern detection
    time_policy = recommend_time_policy(df)
    if time_policy.score < THRESHOLD_SILHOUETTE_LOW: # no pattern detected
        logger.warning(f'The Silhouette Score is {time_policy.score:.3f}, which is less than the minimum threshold of {THRESHOLD_SILHOUETTE_LOW}.')
        # Fall back to one week-long window built from the global mean/std.
        return TimepolicyOutput(
            cron_dict = get_output_no_pattern(df[COL_VALUE].mean(), df[COL_VALUE].std()),
            time_policy_desc = NO_PATTERN,
            time_policy_score = round(time_policy.score, 3),
            threshold_direction = detected_threshold_direction,
            value_collections = None
        )
    history_normal_dict, subs, label_method = calc_history_normal_behavior(df, time_policy, filter_config)
    # hour_block_length is only defined for daily patterns; the weekly branch
    # below never reads it.
    if not time_policy.has_weekend:
        hour_block_length = time_policy.hour_block_length
    output_dict = {}
    day_offset = time_policy.offset
    weekly_with_offset = time_policy.has_weekend and time_policy.offset > 0
    if weekly_with_offset:
        # reset day_offset to re-use the code in the for-loop below
        # put the logic of day_offset adjustment for the case of weekly-pattern-w-day-offset in isolation after that
        day_offset = 0
    for label, subs_group in list(sub_sequences_groupby(subs, label_method).items()):
        history_normal = history_normal_dict[label]
        if time_policy.has_weekend: # in this case, subsequence is 1-day length
            dow_set = sorted(set([sub.dayofweek() for sub in subs_group]))
            dow_disp = str_int_list_no_bracket(dow_set)
            if history_normal.split_pnts is None:
                # One full-day policy shared by every dow carrying this label.
                key = _dict_key_for_weekly_cron(dow_set[0], day_offset)
                output_dict[key] = CronOutputItem(
                    cron_expression = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=day_offset, dow_list=dow_disp),
                    time_length = DAY_IN_MINUTES,
                    z_value = history_normal.zs[0],
                )
            else:
                # The day is divided at split_pnts (hours); emit one policy per
                # segment, each with its own z value.
                left = 0
                split_pnts = history_normal.split_pnts + [24]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    start_hour=(day_offset + left) % 24
                    key = _dict_key_for_weekly_cron(dow_set[0], start_hour)
                    output_dict[key] = CronOutputItem(
                        cron_expression = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=start_hour, dow_list=dow_disp),
                        time_length = (split_pnt - left)*60,
                        z_value = z,
                    )
                    left = split_pnt
        else: # no weekly pattern
            hour_set = sorted(set([sub.hour() for sub in subs_group]))
            if history_normal.split_pnts is None:
                # One policy per block start hour, each spanning the whole block.
                for start_hour in hour_set:
                    output_dict[start_hour] = CronOutputItem(
                        cron_expression = CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=start_hour),
                        time_length = hour_block_length * 60,
                        z_value = history_normal.zs[0],
                    )
            else:
                # Block subdivided at split_pnts; one policy per
                # (start hour, segment) pair, hour wrapped modulo 24.
                left = 0
                split_pnts = history_normal.split_pnts + [hour_block_length]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    for start_hour in hour_set:
                        key = (start_hour + left) % 24
                        output_dict[key] = CronOutputItem(
                            cron_expression = CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=key),
                            time_length = (split_pnt - left) * 60,
                            z_value = z,
                        )
                    left = split_pnt
    if weekly_with_offset:
        # Apply the deferred hour offset, spilling past-midnight windows onto
        # the following day-of-week.
        output_dict = compensate_weekly_with_offset(output_dict, time_policy.offset)
    # recommend sensitivity for IQR method of ITSI outlier exclusion
    output_dict, value_collections = calc_sensitivity(
        df,
        df_values,
        time_policy,
        history_normal_dict,
        output_dict,
        subs,
        label_method,
        compare_thresholds=compare_thresholds
    )
    return TimepolicyOutput(
        cron_dict = output_dict,
        time_policy_desc = str(time_policy),
        time_policy_score = round(time_policy.score, 3),
        threshold_direction = detected_threshold_direction,
        value_collections = value_collections
    )
def compensate_weekly_with_offset(output_dict, day_offset):
    """Shift every weekly cron entry in *output_dict* forward by *day_offset* hours.

    generate_cron_output() builds weekly crons with a zero hour offset; this
    pass applies the real offset afterwards. An entry whose shifted start hour
    crosses midnight is moved to the following day(s)-of-week. When such an
    entry lists several days, it is re-emitted as two crons: all days but the
    last under one key, and the last day alone under another.
    NOTE(review): all string helpers below assume crons produced by
    CRON_TEMPLATE_WEEKLY_PATTERN, i.e. '0 <hour> * * <dows>' with minute '0'.
    """
    def _dow_plus_one(dow):
        # Next day-of-week, wrapping 6 -> 0.
        return (dow + 1) % 7
    def _cron_idx_dow(cron):
        # Index where the dow field (text after the last space) starts.
        return cron.rindex(' ') + 1
    def _cron_get_dow_head(cron):
        # First character of the dow field (dows are single digits 0-6).
        idx = _cron_idx_dow(cron)
        return cron[idx: idx+1]
    def _cron_get_dows(cron):
        # Entire dow field, e.g. '5' or '1,2,3'.
        return cron[_cron_idx_dow(cron) :]
    def _cron_update_dow(cron, dow):
        # Replace the dow field with *dow* (already a string).
        return cron[:_cron_idx_dow(cron)] + dow
    def _cron_hour_plus_offset(cron):
        # Add day_offset to the hour field; the hour always starts at index 2
        # because the minute field is the single character '0'.
        idx_right = cron.index(' ', 2)
        start_hour = int(cron[2:idx_right]) + day_offset
        return cron[:2] + str(start_hour) + cron[idx_right:], start_hour
    def _cron_hour_update(cron, hour):
        # Overwrite the hour field with *hour*.
        idx_right = cron.index(' ', 2)
        return cron[:2] + str(hour) + cron[idx_right:]
    def _unpack_dows_str(dows):
        # '1,2,3' -> [1, 2, 3]
        return [int(d) for d in dows.split(',')]
    adjusted_dict = {}
    for cron_item in list(output_dict.values()):
        cron_a, start_hour = _cron_hour_plus_offset(cron_item.cron_expression)
        if start_hour < 24: # still the same day
            key = _dict_key_for_weekly_cron(int(_cron_get_dow_head(cron_a)), start_hour)
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue
        # The shifted hour crossed midnight: wrap it and advance the dow(s).
        start_hour -= 24
        dows = _cron_get_dows(cron_a)
        if len(dows) == 1: # only one day, just move to the next day
            dows = str(_dow_plus_one(int(dows)))
            cron_a = _cron_update_dow(cron_a, dows)
            cron_a = _cron_hour_update(cron_a, start_hour)
            key = int(dows) * 100 + start_hour
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue
        # more than one days, besides +1, need to separate out the last day in the dow list
        dow_list = _unpack_dows_str(dows)
        dow_list = [_dow_plus_one(d) for d in dow_list]
        cron_1 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour,
            dow_list=str_int_list_no_bracket(dow_list[:-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[0], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_1)
        cron_2 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour,
            dow_list=str(dow_list[-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[-1], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_2)
    return adjusted_dict
def recommend_time_policy(df):
    """Evaluate seasonality candidates for *df* and return the best TimePolicy.

    Pattern evaluation runs on a de-spiked, smoothed copy of the values; the
    de-spiked (but unsmoothed) values are written back into df[COL_VALUE]
    before returning, so downstream tasks see the spike removal only.
    """
    df = remove_super_spikes(df)
    # Snapshot the de-spiked values so they can be restored after smoothing.
    df_values = df[COL_VALUE].copy()
    # 5-point smoothing
    smoothed = df[COL_VALUE].rolling(5, center=True).mean()
    # The centered 5-point window leaves NaNs at both edges; fill each edge
    # with the nearest fully-computed value.
    smoothed.iloc[0] = smoothed.iloc[2]
    smoothed.iloc[1] = smoothed.iloc[2]
    smoothed.iloc[-1] = smoothed.iloc[-3]
    smoothed.iloc[-2] = smoothed.iloc[-3]
    df[COL_VALUE] = smoothed
    hour_block_length, offset, has_weekend, offdays_start, score, label_method = _evaluate_seasonality_patterns(df)
    # keep the modifications of remove_super_spikes(), for downstream tasks, including:
    # calculating history normal, and calculating anomaly boundary
    df[COL_VALUE] = df_values
    return TimePolicy(hour_block_length, offset, has_weekend, offdays_start, score, label_method)
def _evaluate_seasonality_patterns(df):
    """Score the weekly, hour-block and half-day candidates and pick a winner.

    Returns the tuple
    (hour_block_length, offset, has_weekend, offdays_start, score, label_method).
    The weekly score is boosted by COEF_FAVOR_WEEKLY before comparison; a
    winning daily score below THRESHOLD_SILHOUETTE_LOW degrades to no pattern.
    """
    df_resolution = get_resolution(df)
    # Downsample when the series is finer than the default evaluation resolution.
    if df_resolution < DEFAULT_RESOLUTION:
        df = down_sample(df, resolution=DEFAULT_RESOLUTION, df_resolution=df_resolution)

    started = time.time()
    best_score_week, offdays_start, offset, label_method = check_weekly_pattern(df)
    logger.debug(f'weekly pattern ({best_score_week:.3f}), time spent: {time.time() - started:.2f}s')

    started = time.time()
    hour_block_candidate = evaluate_hour_block(df)
    logger.debug(f'hour-block pattern time spent: {time.time() - started:.2f}s')

    started = time.time()
    score_half, workhour_start = evaluate_half_day(df)
    logger.debug(f'half-day pattern ({score_half:.3f}), time spent: {time.time() - started:.2f}s')

    hour_block_score = -2.0 if hour_block_candidate is None else hour_block_candidate.silhouette

    # Weekly seasonality wins whenever its boosted score beats both rivals.
    favored_week = best_score_week * COEF_FAVOR_WEEKLY
    if favored_week > hour_block_score and favored_week > score_half:
        return 0, offset, True, offdays_start, best_score_week, label_method

    # Otherwise keep the better of hour-block and half-day (ties go to hour-block).
    if hour_block_score >= score_half:
        offset = hour_block_candidate.offset
        best_score = hour_block_candidate.silhouette
        hour_block_length = hour_block_candidate.hourdelta
    else:
        offset = workhour_start
        best_score = score_half
        hour_block_length = 12
    # A winner below the minimum silhouette threshold degrades to no pattern.
    if best_score < THRESHOLD_SILHOUETTE_LOW:
        hour_block_length = 0
        offset = 0
    return hour_block_length, offset, False, offdays_start, best_score, None