"""Time-policy recommendation: detect daily/weekly seasonality in a time series
and emit cron schedules with per-block thresholds.

NOTE(review): this module depends on `util.*` project helpers (down_sample,
get_resolution, subsequence_by_time_policy, ...) whose contracts are inferred
from call sites here; confirm against their definitions.
"""
import datetime as dt
import sys
import time
from string import Template

import numpy as np
import pandas as pd

from util.dev_util import str_int_list_no_bracket, enable_multi_resolution
from util.data_prepare import COL_VALUE, LOG_TRANSFORM_THRESHOLD, DAY_OF_WEEK_NAME
from util.data_prepare import down_sample, get_resolution, log_transform_statistics, remove_super_spikes
from util.pattern_evaluate import THRESHOLD_SILHOUETTE_LOW, THRESHOLD_SILHOUETTE_MEDIUM, THRESHOLD_SILHOUETTE_HIGH
from util.pattern_evaluate import check_weekly_pattern, evaluate_half_day, evaluate_hour_block
from util.threshold_utils import subsequence_by_time_policy, sub_sequences_groupby
from util.itsi_at_threshold import get_history_normal_behavior
from util import setup_logging

logger = setup_logging.get_logger()

DAYS_IN_WEEK = [0, 1, 2, 3, 4, 5, 6]
# NOTE(review): despite the names, these values are MINUTES (1440 min/day,
# 10080 min/week); durations elsewhere in this module are built as hours * 60.
# Renaming would break external users, so the names are kept as-is.
DAY_IN_SECOND = 1440
WEEK_IN_SECOND = 10080
NO_RECOMMENDATION = 'No Recommendation'
INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
NO_PATTERN = 'NO_PATTERN'
PATTERN_SWITCH = 'PATTERN_SWITCH'
# Cron format: "minute hour day-of-month month day-of-week"
CRON_TEMPLATE_DAILY_PATTERN = Template('0 $start_hour * * *')
CRON_TEMPLATE_WEEKLY_PATTERN = Template('0 $start_hour * * $dow_list')


class TimePolicy:
    """Container describing a detected seasonality pattern.

    Attributes:
        hour_block_length: length (hours) of the intra-day block pattern; 0 when
            no hour-block pattern was detected.
        offset: starting-hour offset of the block pattern (hours).
        has_weekend: True when a weekly seasonality was detected.
        offdays_start: day-of-week index where the 2-day off period starts
            (5 = the conventional Sat/Sun weekend).
        score: silhouette score of the winning pattern; -2.0 is the "no score"
            sentinel (real silhouettes lie in [-1, 1]).
        label_method: grouping method forwarded to sub_sequences_groupby().
        log_trans: True when the series was log-transformed during detection.
    """

    def __init__(self, hour_block_length, offset, has_weekend, offdays_start=5, score=-2.0, label_method=None, log_trans=False) -> None:
        self.hour_block_length = hour_block_length
        self.offset = offset
        self.has_weekend = has_weekend
        self.offdays_start = offdays_start
        self.score = score
        self.label_method = label_method
        self.log_trans = log_trans

    def __str__(self) -> str:
        """Human-readable summary of the policy, e.g. 'daily 6-hour blocks'."""
        if self.has_weekend:
            week_clause = 'weekly seasonality'
        else:
            week_clause = ''
        if self.hour_block_length == 0:
            # hour pattern score below threshold
            rslt = week_clause + ('' if self.offset == 0 else f', offset={self.offset}h') + ('' if self.offdays_start == 5 else f', {DAY_OF_WEEK_NAME[self.offdays_start]} {DAY_OF_WEEK_NAME[(self.offdays_start + 1) % 7]} off')
        else:
            rslt = f'daily {int(self.hour_block_length)}-hour blocks' + ('' if self.offset == 0 else f', offset={self.offset}-hour') + week_clause
        return (rslt if rslt else 'no pattern')


def _dict_key_for_weekly_cron(dow, start_hour):
    """Sortable key combining day-of-week and start hour (dow major)."""
    return dow * 100 + start_hour


def get_cron_output(df, time_policy=None):
    """Build (cron, duration_minutes, threshold) entries for a time policy.

    Args:
        df: time-indexed DataFrame of the series (index must be datetime-like).
        time_policy: optional TimePolicy; recommended from `df` when None.

    Returns:
        Tuple of (sorted list of (cron_string, duration, threshold) tuples,
        pattern description string, rounded score). On failure the list is
        empty and the description is INSUFFICIENT_DATA or NO_PATTERN.
    """
    # Checking for possible 'No Recommendation' reasons
    df_delta = df.index[-1] - df.index[0]
    if df_delta <= dt.timedelta(days=1):
        logger.warning(f'The difference between your ending time and starting time is {df_delta}. At least 1 day (24h) of data is required for time policy recommendation.')
        return [], INSUFFICIENT_DATA, -1.0
    if time_policy is None:
        time_policy = recommend_time_policy(df)
    if time_policy.score < THRESHOLD_SILHOUETTE_LOW:
        # no pattern detected
        logger.warning(f'The Silhouette Score is {time_policy.score}, which is less than the minimum threshold of {THRESHOLD_SILHOUETTE_LOW}.')
        return [], NO_PATTERN, round(time_policy.score, 3)
    df_resolution = get_resolution(df)
    df = down_sample(df, resolution=df_resolution, df_resolution=df_resolution)  # don't change resolution but fill na
    subs, label_method = subsequence_by_time_policy(df, time_policy)
    pattern_desc = str(time_policy)
    if not time_policy.has_weekend:
        hour_block_length = time_policy.hour_block_length
    cron_dura_thres_dict = {}
    day_offset = time_policy.offset
    weekly_with_offset = time_policy.has_weekend and time_policy.offset > 0
    if weekly_with_offset:
        # reset day_offset to re-use the code in the for-loop below;
        # the day_offset adjustment for the weekly-pattern-with-day-offset
        # case is handled in isolation afterwards (compensate_weekly_with_offset)
        day_offset = 0
    for _, subs_group in sub_sequences_groupby(subs, label_method).items():
        history_normal = get_history_normal_behavior(subs_group)
        logger.debug(f'{str(history_normal)}')
        if time_policy.has_weekend:
            # in this case, subsequence is 1-day length
            dow_set = sorted(set([sub.dayofweek() for sub in subs_group]))
            dow_disp = str_int_list_no_bracket(dow_set)
            if history_normal.split_pnts is None:
                # single threshold covering the whole day
                key = _dict_key_for_weekly_cron(dow_set[0], day_offset)
                cron_dura_thres_dict[key] = (
                    CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=day_offset, dow_list=dow_disp),
                    DAY_IN_SECOND,
                    history_normal.zs[0]
                )
            else:
                # one cron entry per intra-day segment between split points
                left = 0
                split_pnts = history_normal.split_pnts + [24]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    start_hour = (day_offset + left) % 24
                    key = _dict_key_for_weekly_cron(dow_set[0], start_hour)
                    cron_dura_thres_dict[key] = (
                        CRON_TEMPLATE_WEEKLY_PATTERN.substitute(start_hour=start_hour, dow_list=dow_disp),
                        (split_pnt - left) * 60,
                        z
                    )
                    left = split_pnt
        else:
            # no weekly pattern
            hour_set = sorted(set([sub.hour() for sub in subs_group]))
            if history_normal.split_pnts is None:
                for start_hour in hour_set:
                    cron_dura_thres_dict[start_hour] = (
                        CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=start_hour),
                        hour_block_length * 60,
                        history_normal.zs[0]
                    )
            else:
                left = 0
                split_pnts = history_normal.split_pnts + [hour_block_length]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    for start_hour in hour_set:
                        key = (start_hour + left) % 24
                        cron_dura_thres_dict[key] = (
                            CRON_TEMPLATE_DAILY_PATTERN.substitute(start_hour=key),
                            (split_pnt - left) * 60,
                            z
                        )
                    left = split_pnt
    if weekly_with_offset:
        cron_dura_thres_dict = compensate_weekly_with_offset(cron_dura_thres_dict, time_policy.offset)
    return [cron_dura_thres_dict[k] for k in sorted(cron_dura_thres_dict.keys())], pattern_desc, round(time_policy.score, 3)


def compensate_weekly_with_offset(cron_dura_thres_dict, day_offset):
    """Shift weekly cron entries forward by `day_offset` hours.

    Entries whose shifted start hour rolls past midnight are moved to the next
    day-of-week; a multi-day dow list is split so the last day gets its own
    entry. The helpers below rely on the cron strings starting with "0 <hour>"
    as produced by the templates above.

    Args:
        cron_dura_thres_dict: dict of key -> (cron, duration, threshold).
        day_offset: hour offset to apply (0 < day_offset < 24 expected).

    Returns:
        New dict keyed by _dict_key_for_weekly_cron-style keys.
    """
    def _dow_plus_one(dow):
        # advance day-of-week with week wrap-around
        return (dow + 1) % 7

    def _cron_idx_dow(cron):
        # index of the day-of-week field (last space-separated field)
        return cron.rindex(' ') + 1

    def _cron_get_dow_head(cron):
        return cron[_cron_idx_dow(cron): _cron_idx_dow(cron) + 1]

    def _cron_get_dows(cron):
        return cron[_cron_idx_dow(cron):]

    def _cron_update_dow(cron, dow):
        return cron[:_cron_idx_dow(cron)] + dow

    def _cron_hour_plus_offset(cron):
        # hour field starts at index 2 ("0 <hour> ...")
        idx_right = cron.index(' ', 2)
        start_hour = int(cron[2:idx_right]) + day_offset
        return cron[:2] + str(start_hour) + cron[idx_right:], start_hour

    def _cron_hour_update(cron, hour):
        idx_right = cron.index(' ', 2)
        return cron[:2] + str(hour) + cron[idx_right:]

    def _unpack_dows_str(dows):
        return [int(d) for d in dows.split(',')]

    adjusted_dict = {}
    for (cron, dura, thres) in cron_dura_thres_dict.values():
        cron_a, start_hour = _cron_hour_plus_offset(cron)
        if start_hour < 24:
            # still the same day
            key = _dict_key_for_weekly_cron(int(_cron_get_dow_head(cron_a)), start_hour)
            adjusted_dict[key] = (cron_a, dura, thres)
            continue
        start_hour -= 24
        dows = _cron_get_dows(cron_a)
        if len(dows) == 1:
            # only one day, just move to the next day
            dows = str(_dow_plus_one(int(dows)))
            cron_a = _cron_update_dow(cron_a, dows)
            cron_a = _cron_hour_update(cron_a, start_hour)
            key = int(dows) * 100 + start_hour
            adjusted_dict[key] = (cron_a, dura, thres)
            continue
        # more than one days, besides +1, need to separate out the last day in the dow list
        dow_list = _unpack_dows_str(dows)
        dow_list = [_dow_plus_one(d) for d in dow_list]
        cron_1 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour, dow_list=str_int_list_no_bracket(dow_list[:-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[0], start_hour)
        adjusted_dict[key] = (cron_1, dura, thres)
        cron_2 = CRON_TEMPLATE_WEEKLY_PATTERN.substitute(
            start_hour=start_hour, dow_list=str(dow_list[-1])
        )
        key = _dict_key_for_weekly_cron(dow_list[-1], start_hour)
        adjusted_dict[key] = (cron_2, dura, thres)
    return adjusted_dict


def unpack_history_normal(df, time_policy):
    """Expand per-group thresholds into a flat per-hour threshold list.

    For a weekly policy the result covers 7 days x 24 hours; otherwise it
    covers 24 hours built from hour-block groups. The list is rotated by
    `time_policy.offset` at the end so index 0 corresponds to hour 0.

    Returns:
        List of thresholds (one per hour), or [] when no pattern was detected.
    """
    if time_policy.score < THRESHOLD_SILHOUETTE_LOW:
        # no pattern detected
        return []
    df_resolution = get_resolution(df)
    df = down_sample(df, df_resolution)  # don't change resolution but fill na
    subs, label_method = subsequence_by_time_policy(df, time_policy)
    if time_policy.has_weekend:
        dow_thresholds_dict = {}
    else:
        hour_thresholds_dict = {}
        hour_block_length = time_policy.hour_block_length
    for _, subs_group in sub_sequences_groupby(subs, label_method).items():
        history_normal = get_history_normal_behavior(subs_group)
        if time_policy.has_weekend:
            # in this case, subsequence is 1-day length
            dow_set = set([sub.dayofweek() for sub in subs_group])
            if history_normal.split_pnts is None:
                day_thresholds = list(history_normal.zs) * 24
            else:
                day_thresholds = []
                left = 0
                split_pnts = history_normal.split_pnts + [24]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    day_thresholds += [z] * (split_pnt - left)
                    left = split_pnt
            for dow in dow_set:
                dow_thresholds_dict[dow] = day_thresholds
        else:
            # no weekly pattern
            hour_set = set([sub.hour() for sub in subs_group])
            if history_normal.split_pnts is None:
                hours_thresholds = list(history_normal.zs) * hour_block_length
            else:
                hours_thresholds = []
                left = 0
                split_pnts = history_normal.split_pnts + [hour_block_length]
                for split_pnt, z in zip(split_pnts, history_normal.zs):
                    hours_thresholds += [z] * (split_pnt - left)
                    left = split_pnt
            for hour in hour_set:
                hour_thresholds_dict[hour] = hours_thresholds
    thresholds = []
    if time_policy.has_weekend:
        for dow in range(7):
            thresholds += dow_thresholds_dict[dow]
    else:
        for hour in range(0, 24, hour_block_length):
            thresholds += hour_thresholds_dict[hour + time_policy.offset]
    if time_policy.offset == 0:
        return thresholds
    else:
        # rotate so that index 0 corresponds to hour 0
        return thresholds[-time_policy.offset:] + thresholds[:len(thresholds) - time_policy.offset]


def recommend_time_policy(df):
    """Recommend a TimePolicy for the series in `df`.

    Pipeline: spike removal, 5-point centered smoothing (edge values padded
    from the nearest fully-smoothed point), optional log transform when the
    value range is large, then multi-resolution pattern evaluation.

    Args:
        df: time-indexed DataFrame with a COL_VALUE column (mutated in place).

    Returns:
        TimePolicy describing the best detected pattern.
    """
    df = remove_super_spikes(df)
    # 5-point smoothing
    df[COL_VALUE] = df[COL_VALUE].rolling(5, center=True).mean()
    # BUG FIX: the original used chained assignment (df[COL_VALUE].iloc[0] = ...),
    # which is a no-op under pandas copy-on-write; assign positionally instead.
    # The centered rolling mean leaves the first/last two values NaN, so pad
    # them from the nearest fully-smoothed neighbor.
    col_idx = df.columns.get_loc(COL_VALUE)
    df.iloc[0, col_idx] = df.iloc[2, col_idx]
    df.iloc[1, col_idx] = df.iloc[2, col_idx]
    df.iloc[-1, col_idx] = df.iloc[-3, col_idx]
    df.iloc[-2, col_idx] = df.iloc[-3, col_idx]
    log_trans = False
    range_ratio, log_values, c = log_transform_statistics(df[COL_VALUE])
    logger.debug(f'range_ratio={range_ratio:.3f}')
    if range_ratio > LOG_TRANSFORM_THRESHOLD:
        logger.info('Apply Log transformation in recommend_time_policy()')
        df[COL_VALUE] = log_values
        log_trans = True
    hour_block_length, offset, has_weekend, offdays_start, score, label_method = _evaluate_seasonality_patterns_at_multi_resolution(df)
    if log_trans:
        # invert log-trans on COL_VALUE (log_values presumably log(x + c) — see log_transform_statistics)
        df[COL_VALUE] = np.exp(log_values) - c
    return TimePolicy(hour_block_length, offset, has_weekend, offdays_start, score, label_method, log_trans)


def _evaluate_seasonality_patterns_at_multi_resolution(df):
    """Evaluate seasonality at one or several down-sampled resolutions.

    Returns the 6-tuple (hour_block_length, offset, has_weekend, offdays_start,
    score, label_method) of the best-scoring resolution; early-exits once the
    score clears THRESHOLD_SILHOUETTE_HIGH.
    """
    df_resolution = get_resolution(df)
    if enable_multi_resolution():
        resolution_list = ['15min', '30min', '60min']
        if df_resolution > pd.Timedelta(resolution_list[2]):
            # series is coarser than every candidate; evaluate as-is
            resolution_list = [df_resolution]
    else:
        # disable multi-resolution by default to reduce latency
        resolution = pd.Timedelta('15min')
        if df_resolution > resolution:
            resolution_list = [df_resolution]
        else:
            resolution_list = [resolution]
    best_score = 0.0
    result = (0, 0, False, 0, -2.0, None)  # "no pattern" default
    for resolution in resolution_list:
        if df_resolution > pd.Timedelta(resolution):
            # cannot down-sample below the native resolution
            continue
        df = down_sample(df, resolution=resolution)
        hour_block_length, offset, has_weekend, offdays_start, score, label_method = _evaluate_seasonality_patterns(df)
        if score > best_score:
            best_score = score
            result = (hour_block_length, offset, has_weekend, offdays_start, score, label_method)
            if best_score > THRESHOLD_SILHOUETTE_HIGH:
                break  # good enough — skip remaining resolutions
    return result


def _evaluate_seasonality_patterns(df):
    """Score weekly, hour-block and half-day patterns; pick the best.

    Returns the same 6-tuple shape as
    _evaluate_seasonality_patterns_at_multi_resolution().
    """
    time_0 = time.time()
    best_score_week, offdays_start, offset, label_method = check_weekly_pattern(df)
    logger.debug(f'weekly pattern time spent: {time.time() - time_0:.2f}s')
    time_0 = time.time()
    hour_block_candidate = evaluate_hour_block(df)
    logger.debug(f'hour-block pattern time spent: {time.time() - time_0:.2f}s')
    time_0 = time.time()
    score_half, workhour_start = evaluate_half_day(df)
    logger.debug(f'half-day pattern time spent: {time.time() - time_0:.2f}s')
    if hour_block_candidate is None:
        hour_block_score = -2.0  # sentinel below any real silhouette (>= -1)
    else:
        hour_block_score = hour_block_candidate.silhouette
    has_weekend = best_score_week > THRESHOLD_SILHOUETTE_LOW
    # ONLY add hour-block or workhour pattern when it improves the score.
    # BUG FIX: compare against hour_block_score (safe sentinel) instead of
    # hour_block_candidate.silhouette, which raised AttributeError whenever
    # evaluate_hour_block() returned None.
    if has_weekend and best_score_week > hour_block_score and best_score_week > score_half:
        return 0, offset, has_weekend, offdays_start, best_score_week, label_method
    else:
        # BUG FIX: guard the candidate dereference — if hour_block_candidate is
        # None its sentinel score can never win a real half-day score, and the
        # explicit check avoids a crash if both scores are sentinels.
        if hour_block_candidate is not None and hour_block_score >= score_half:
            offset = hour_block_candidate.offset
            best_score = hour_block_candidate.silhouette
            if best_score >= THRESHOLD_SILHOUETTE_LOW:
                hour_block_length = hour_block_candidate.hourdelta
            else:
                hour_block_length = 0
                offset = 0
        else:
            offset = workhour_start
            best_score = score_half
            if best_score >= THRESHOLD_SILHOUETTE_LOW:
                hour_block_length = 12  # half-day pattern
            else:
                hour_block_length = 0
                offset = 0
        return hour_block_length, offset, False, offdays_start, best_score, None


if __name__ == '__main__':
    # sample usage of time policy recommend
    import os
    from data_prepare import NAB_TIMESTAMP_FORMAT
    from data_prepare import load_time_series_from_file, down_sample

    curr_dir = os.path.dirname(os.path.realpath(__file__))
    timeseries_name = 'nyc_taxi'
    path_data = os.path.join(curr_dir, '../..', f'data/{timeseries_name}.csv')
    print('time series:', timeseries_name, file=sys.stderr)
    start_time = time.time()
    os.environ['TIMEPOLICY_VERBOSE'] = 'False'
    df = load_time_series_from_file(path_data, NAB_TIMESTAMP_FORMAT)
    df = down_sample(df)
    # BUG FIX: recommend_time_policy() returns a single TimePolicy, not a
    # tuple — the original 2-target unpacking raised TypeError at runtime.
    time_policy = recommend_time_policy(df)
    print(str(time_policy) + f' (time spent: {time.time() - start_time :.2f})', file=sys.stderr)