"""Recommend ITSI adaptive-threshold time policies.

Detects seasonality (weekly / half-day / hour-block) in a metric time series
and emits a dict of cron-scheduled threshold windows (``CronOutputItem``),
plus metadata, wrapped in a ``TimepolicyOutput``.
"""

from dataclasses import dataclass
from string import Template
import time

import pandas as pd
from scipy.stats import skew
from six.moves import zip

from util.dev_util import str_int_list_no_bracket
from util.data_prepare import (
    COL_VALUE,
    DAY_OF_WEEK_NAME,
    DEFAULT_Z,
    DEFAULT_SENSITIVITY_MULTIPLIER,
    down_sample,
    get_resolution,
    remove_super_spikes
)
from util.pattern_evaluate import (
    THRESHOLD_SILHOUETTE_LOW,
    THRESHOLD_SILHOUETTE_HIGH,
    check_weekly_pattern,
    evaluate_half_day,
    evaluate_hour_block
)
from util.threshold_utils import sub_sequences_groupby
from util.itsi_at_threshold import calc_history_normal_behavior, VARIATION_BUFFER
from util.outlier_exclusion_recommend import calc_sensitivity
from util.csc_output import THR_DIR_BOTH, THR_DIR_LO, THR_DIR_UP
from util import setup_logging

logger = setup_logging.get_logger()

DAYS_IN_WEEK = [0, 1, 2, 3, 4, 5, 6]
DAY_IN_MINUTES = 1440
WEEK_IN_MINUTES = 10080

# Outcome labels (some are consumed by callers outside this file).
NO_RECOMMENDATION = 'No Recommendation'
INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
NO_PATTERN = 'NO_PATTERN'
PATTERN_SWITCH = 'PATTERN_SWITCH'
SUCCESSFUL = 'SUCCESSFUL'

DEFAULT_RESOLUTION = pd.Timedelta('15min')
# Multiplier that biases pattern selection toward the weekly candidate.
COEF_FAVOR_WEEKLY = 1.1
# |skew| beyond this picks a one-sided threshold direction.
THRESHOLD_SKEW = 2.0

CRON_TEMPLATE_DAILY_PATTERN = Template('0 $start_hour * * *')
CRON_TEMPLATE_WEEKLY_PATTERN = Template('0 $start_hour * * $dow_list')


def get_output_no_pattern(mean, std):
    """Return the fallback cron dict: one whole-week window starting Sunday 00:00."""
    return {0: CronOutputItem('0 0 * * 0', WEEK_IN_MINUTES, DEFAULT_Z, mean, std)}


@dataclass(frozen=True)
class TimepolicyOutput:
    """Final recommendation bundle returned by ``generate_cron_output``."""
    cron_dict: dict                 # key -> CronOutputItem
    time_policy_desc: str           # human-readable policy (or NO_PATTERN)
    time_policy_score: float        # silhouette score, rounded to 3 decimals
    threshold_direction: str        # THR_DIR_* or None when not requested
    value_collections: tuple        # per-window value samples from calc_sensitivity


@dataclass(frozen=True)
class CronOutputItem:
    """One cron-scheduled threshold window and its statistics."""
    cron_expression: str
    time_length: int                # window length in minutes
    z_value: float
    mean: float = 0.0
    std: float = 0.0
    sensitivity: float = DEFAULT_SENSITIVITY_MULTIPLIER

    def __str__(self):
        return f'({self.cron_expression}, {self.time_length}, z={self.z_value:.3f}, m={self.mean:.3f}, s={self.std:.3f}, {self.sensitivity:.3f})'

    def update_cron_expression(self, cron_expression):
        """Return a copy with a new cron expression.

        NOTE(review): ``sensitivity`` is not carried over, so it resets to the
        default.  Callers invoke this before ``calc_sensitivity`` runs, which
        looks deliberate — confirm before changing.
        """
        return CronOutputItem(
            cron_expression=cron_expression,
            time_length=self.time_length,
            z_value=self.z_value,
            mean=self.mean,
            std=self.std,
        )

    def update_mean_std_sensitivity(self, mean, std, sensitivity):
        """Return a copy with new mean/std/sensitivity; cron, length and z kept."""
        return CronOutputItem(
            cron_expression=self.cron_expression,
            time_length=self.time_length,
            z_value=self.z_value,
            mean=mean,
            std=std,
            sensitivity=sensitivity
        )

    def update_numeric_stats(self, mean, std, z_value, sensitivity):
        """Return a copy with new stats; z is widened by VARIATION_BUFFER and
        never decreases below the current z."""
        return CronOutputItem(
            cron_expression=self.cron_expression,
            time_length=self.time_length,
            z_value=max(z_value, self.z_value) * (1.0 + VARIATION_BUFFER),
            mean=mean,
            std=std,
            sensitivity=sensitivity
        )


class TimePolicy:
    """Detected seasonality pattern: weekly, half-day, N-hour block, or none."""

    def __init__(self, hour_block_length, offset, has_weekend, offdays_start=5, score=-2.0, label_method=None) -> None:
        self.hour_block_length = hour_block_length   # hours per block; 0 = weekly/none
        self.offset = offset                         # start-hour offset (hours)
        self.has_weekend = has_weekend               # True = weekly seasonality
        self.offdays_start = offdays_start           # first off-day (5 = Saturday)
        self.score = score                           # silhouette score of the winner
        self.label_method = label_method             # subsequence labeling method

    def __str__(self) -> str:
        offset_str = '' if self.offset == 0 else f', offset={self.offset}h'
        if self.has_weekend:
            offdays_str = '' if self.offdays_start == 5 else f', {DAY_OF_WEEK_NAME[self.offdays_start]} {DAY_OF_WEEK_NAME[(self.offdays_start + 1) % 7]} off'
            return 'weekly seasonality' + offset_str + offdays_str
        if self.hour_block_length == 12:
            # half-day pattern
            return 'half-day pattern' + offset_str
        if self.hour_block_length > 0:
            return f'daily {self.hour_block_length}-hour block' + offset_str
        return 'no pattern'


def _dict_key_for_weekly_cron(dow, start_hour):
    # Encodes (day-of-week, hour) as a single sortable int key: dow*100 + hour.
    return dow * 100 + start_hour


def detect_threshold_direction(values):
    """Pick a threshold direction from distribution skewness.

    Strong right skew -> upper-only, strong left skew -> lower-only,
    otherwise two-sided.
    """
    skew_score = skew(values)
    if skew_score > THRESHOLD_SKEW:
        return THR_DIR_UP
    if skew_score < -THRESHOLD_SKEW:
        return THR_DIR_LO
    return THR_DIR_BOTH


def generate_cron_output(df, filter_config, choose_direction=True, compare_thresholds=False):
    """Detect seasonality in ``df`` and build the cron threshold windows.

    Returns a ``TimepolicyOutput``.  Falls back to a single whole-week window
    when the detected pattern's silhouette score is below
    ``THRESHOLD_SILHOUETTE_LOW``.
    """
    df_resolution = get_resolution(df)
    df = down_sample(df, resolution=df_resolution, df_resolution=df_resolution)  # don't change resolution but fill na
    # make a copy of the df values before calling recommend_time_policy, b/c
    # df values may get modified there, for example, in remove_super_spikes
    df_values = df[COL_VALUE].copy()
    if choose_direction:
        detected_threshold_direction = detect_threshold_direction(df_values)
    else:
        detected_threshold_direction = None

    # Time policy by seasonality pattern detection
    time_policy = recommend_time_policy(df)
    if time_policy.score < THRESHOLD_SILHOUETTE_LOW:
        # no pattern detected
        logger.warning(f'The Silhouette Score is {time_policy.score:.3f}, which is less than the minimum threshold of {THRESHOLD_SILHOUETTE_LOW}.')
        return TimepolicyOutput(
            cron_dict=get_output_no_pattern(df[COL_VALUE].mean(), df[COL_VALUE].std()),
            time_policy_desc=NO_PATTERN,
            time_policy_score=round(time_policy.score, 3),
            threshold_direction=detected_threshold_direction,
            value_collections=None
        )

    history_normal_dict, subs, label_method = calc_history_normal_behavior(df, time_policy, filter_config)
    if not time_policy.has_weekend:
        hour_block_length = time_policy.hour_block_length
    output_dict = {}
    day_offset = time_policy.offset
    weekly_with_offset = time_policy.has_weekend and time_policy.offset > 0
    if weekly_with_offset:
        # reset day_offset to re-use the code in the for-loop below
        # put the logic of day_offset adjustment for the case of weekly-pattern-w-day-offset in isolation after that
        day_offset = 0

    for label, subs_group in list(sub_sequences_groupby(subs, label_method).items()):
        history_normal = history_normal_dict[label]
        if time_policy.has_weekend:
            # in this case, subsequence is 1-day length
            dow_set = sorted(set([sub.dayofweek() for sub in subs_group]))
            dow_disp = str_int_list_no_bracket(dow_set)
            if history_normal.split_pnts is None:
                # one window covering the whole day for this day-of-week group
                key = _dict_key_for_weekly_cron(dow_set[0], day_offset)
                output_dict[key] = CronOutputItem(
                    cron_expression=CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=day_offset, dow_list=dow_disp),
                    time_length=DAY_IN_MINUTES,
                    z_value=history_normal.zs[0],
                )
            else:
                # the day is split into sub-windows at split_pnts (hours)
                left = 0
                split_pnts = history_normal.split_pnts + [24]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    start_hour = (day_offset + left) % 24
                    key = _dict_key_for_weekly_cron(dow_set[0], start_hour)
                    output_dict[key] = CronOutputItem(
                        cron_expression=CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=start_hour, dow_list=dow_disp),
                        time_length=(split_pnt - left) * 60,
                        z_value=z,
                    )
                    left = split_pnt
        else:
            # no weekly pattern
            hour_set = sorted(set([sub.hour() for sub in subs_group]))
            if history_normal.split_pnts is None:
                for start_hour in hour_set:
                    output_dict[start_hour] = CronOutputItem(
                        cron_expression=CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=start_hour),
                        time_length=hour_block_length * 60,
                        z_value=history_normal.zs[0],
                    )
            else:
                left = 0
                split_pnts = history_normal.split_pnts + [hour_block_length]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    for start_hour in hour_set:
                        key = (start_hour + left) % 24
                        output_dict[key] = CronOutputItem(
                            cron_expression=CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=key),
                            time_length=(split_pnt - left) * 60,
                            z_value=z,
                        )
                    left = split_pnt

    if weekly_with_offset:
        output_dict = compensate_weekly_with_offset(output_dict, time_policy.offset)

    # recommend sensitivity for IQR method of ITSI outlier exclusion
    output_dict, value_collections = calc_sensitivity(
        df, df_values, time_policy, history_normal_dict, output_dict, subs, label_method,
        compare_thresholds=compare_thresholds
    )
    return TimepolicyOutput(
        cron_dict=output_dict,
        time_policy_desc=str(time_policy),
        time_policy_score=round(time_policy.score, 3),
        threshold_direction=detected_threshold_direction,
        value_collections=value_collections
    )


def compensate_weekly_with_offset(output_dict, day_offset):
    """Shift every weekly cron window forward by ``day_offset`` hours.

    Windows that cross midnight move to the next day-of-week; a multi-day
    ``dow_list`` that crosses midnight is split so the last day gets its own
    cron entry.  Returns a new key -> CronOutputItem dict.
    """
    # --- small helpers that operate on a 'm h * * dows' cron string ---
    def _dow_plus_one(dow):
        return (dow + 1) % 7

    def _cron_idx_dow(cron):
        # index of the first char of the day-of-week field (after last space)
        return cron.rindex(' ') + 1

    def _cron_get_dow_head(cron):
        idx = _cron_idx_dow(cron)
        return cron[idx: idx + 1]

    def _cron_get_dows(cron):
        return cron[_cron_idx_dow(cron):]

    def _cron_update_dow(cron, dow):
        return cron[:_cron_idx_dow(cron)] + dow

    def _cron_hour_plus_offset(cron):
        # hour field starts at index 2 ('0 <hour> ...'); may exceed 23 here
        idx_right = cron.index(' ', 2)
        start_hour = int(cron[2:idx_right]) + day_offset
        return cron[:2] + str(start_hour) + cron[idx_right:], start_hour

    def _cron_hour_update(cron, hour):
        idx_right = cron.index(' ', 2)
        return cron[:2] + str(hour) + cron[idx_right:]

    def _unpack_dows_str(dows):
        return [int(d) for d in dows.split(',')]

    adjusted_dict = {}
    for cron_item in list(output_dict.values()):
        cron_a, start_hour = _cron_hour_plus_offset(cron_item.cron_expression)
        if start_hour < 24:
            # still the same day
            key = _dict_key_for_weekly_cron(int(_cron_get_dow_head(cron_a)), start_hour)
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue
        start_hour -= 24
        dows = _cron_get_dows(cron_a)
        if len(dows) == 1:
            # only one day, just move to the next day
            dows = str(_dow_plus_one(int(dows)))
            cron_a = _cron_update_dow(cron_a, dows)
            cron_a = _cron_hour_update(cron_a, start_hour)
            key = int(dows) * 100 + start_hour
            adjusted_dict[key] = cron_item.update_cron_expression(cron_a)
            continue
        # more than one days, besides +1, need to separate out the last day in the dow list
        dow_list = _unpack_dows_str(dows)
        dow_list = [_dow_plus_one(d) for d in dow_list]
        cron_1 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour, dow_list=str_int_list_no_bracket(dow_list[:-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[0], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_1)
        cron_2 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour, dow_list=str(dow_list[-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[-1], start_hour)
        adjusted_dict[key] = cron_item.update_cron_expression(cron_2)
    return adjusted_dict


def recommend_time_policy(df):
    """Detect the best-fitting seasonality pattern and return a ``TimePolicy``.

    Evaluates patterns on a despiked, smoothed copy of the values, then
    restores the (despiked) values for downstream use.
    """
    df = remove_super_spikes(df)
    df_values = df[COL_VALUE].copy()
    # 5-point smoothing; rolling(center=True) leaves NaNs at both edges,
    # so back/forward-fill the first and last two points from the nearest
    # valid smoothed value
    smoothed = df[COL_VALUE].rolling(5, center=True).mean()
    smoothed.iloc[0] = smoothed.iloc[2]
    smoothed.iloc[1] = smoothed.iloc[2]
    smoothed.iloc[-1] = smoothed.iloc[-3]
    smoothed.iloc[-2] = smoothed.iloc[-3]
    df[COL_VALUE] = smoothed
    hour_block_length, offset, has_weekend, offdays_start, score, label_method = _evaluate_seasonality_patterns(df)
    # keep the modifications of remove_super_spikes(), for downstream tasks, including:
    # calculating history normal, and calculating anomaly boundary
    df[COL_VALUE] = df_values
    return TimePolicy(hour_block_length, offset, has_weekend, offdays_start, score, label_method)


def _evaluate_seasonality_patterns(df):
    """Score weekly / hour-block / half-day candidates and pick the winner.

    Returns ``(hour_block_length, offset, has_weekend, offdays_start, score,
    label_method)``.  The weekly candidate is favored by COEF_FAVOR_WEEKLY.
    """
    df_resolution = get_resolution(df)
    if df_resolution < DEFAULT_RESOLUTION:
        # downsample when the initial resolution is finer than the default
        df = down_sample(df, resolution=DEFAULT_RESOLUTION, df_resolution=df_resolution)

    time_0 = time.time()
    best_score_week, offdays_start, offset, label_method = check_weekly_pattern(df)
    logger.debug(f'weekly pattern ({best_score_week:.3f}), time spent: {time.time() - time_0:.2f}s')

    time_0 = time.time()
    hour_block_candidate = evaluate_hour_block(df)
    logger.debug(f'hour-block pattern time spent: {time.time() - time_0:.2f}s')

    time_0 = time.time()
    score_half, workhour_start = evaluate_half_day(df)
    logger.debug(f'half-day pattern ({score_half:.3f}), time spent: {time.time() - time_0:.2f}s')

    if hour_block_candidate is None:
        hour_block_score = -2.0
    else:
        hour_block_score = hour_block_candidate.silhouette

    if best_score_week * COEF_FAVOR_WEEKLY > hour_block_score and best_score_week * COEF_FAVOR_WEEKLY > score_half:
        # weekly wins: hour_block_length of 0 marks the weekly policy
        return 0, offset, True, offdays_start, best_score_week, label_method
    else:
        if hour_block_score >= score_half:
            offset = hour_block_candidate.offset
            best_score = hour_block_candidate.silhouette
            if best_score >= THRESHOLD_SILHOUETTE_LOW:
                hour_block_length = hour_block_candidate.hourdelta
            else:
                # below threshold: fall back to 'no pattern'
                hour_block_length = 0
                offset = 0
        else:
            offset = workhour_start
            best_score = score_half
            if best_score >= THRESHOLD_SILHOUETTE_LOW:
                hour_block_length = 12
            else:
                hour_block_length = 0
                offset = 0
        return hour_block_length, offset, False, offdays_start, best_score, None