from datetime import timedelta
from functools import partial

import numpy as np
from scipy.special import expit
from sklearn import metrics
from six.moves import range, zip

from util import setup_logging
from util.data_prepare import detrend_daily, COL_VALUE, EPSILON, resample_with_offset
from util.dev_util import format_float_list
from util.sub_sequence import (
    df_to_day_sequences,
    df_to_hour_sequences,
    df_to_half_day_sequences,
    pack_sub_values_with_filter,
)

logger = setup_logging.get_logger()

# Candidate hour-block sizes (in hours) tried by evaluate_hour_block.
HOURDELTA_LIST = [1, 2, 3, 4]

# Silhouette-score thresholds used to grade clustering quality.
THRESHOLD_SILHOUETTE_LOW = 0.1
THRESHOLD_SILHOUETTE_MEDIUM = 0.4
THRESHOLD_SILHOUETTE_HIGH = 0.6
# Minimum fraction of subsequences that must share the modal length.
THRESHOLD_IRREGULAR_SEQUENCE_LENGTH = 0.7

# Labeling functions mapping a subsequence to a cluster label.
# NOTE(review): subsequences are assumed to expose .length, .values,
# .dayofweek() and .hour() — inferred from usage below; confirm in util.sub_sequence.
# the general weekly pattern: seven days in a week can be labeled to all possible clustering schemes
LABEL_WEEKLY_PATTERN = lambda dow_label_dict, seq: dow_label_dict[seq.dayofweek()]
LABEL_WEEKLY_OFFDAYS = lambda offday_start, seq: 1 if seq.dayofweek() == offday_start or seq.dayofweek() == (offday_start + 1) % 7 else 0
LABEL_WEEKEND = lambda seq: 0 if seq.dayofweek() < 5 else 1  # Monday=0, Sunday=6
LABEL_HOUR = lambda seq: seq.hour()
LABEL_ALTER_HOUR_BLOCK = lambda hourdelta, offset, seq: 0 if ((seq.hour() - offset) // hourdelta) % 2 else 1


def evaluate_clustering_quality(in_sequences, get_seq_label, kee_all_sequences=False):
    """Score how well `get_seq_label` clusters the subsequences.

    Keeps only subsequences of the modal length, labels each with
    `get_seq_label`, and returns the median silhouette score plus the
    sequences (each annotated with its own `.silhouette`).

    Returns -2 (sentinel, below the silhouette range [-1, 1]) when too many
    subsequences have irregular length or fewer than 2 clusters are formed.

    Note: the parameter name `kee_all_sequences` (sic) is kept as-is for
    backward compatibility with keyword callers; when True, ALL input
    sequences are returned, with irregular-length ones given silhouette -1.0.
    """
    lengths = [s.length for s in in_sequences]
    counts = np.bincount(lengths)
    len_seq = np.argmax(counts)  # use the most frequent one as the sequence length of this data set
    sequences = [s for s in in_sequences if s.length == len_seq]  # make sure sequences having same length
    cnt_regular_seq = len(sequences)
    if cnt_regular_seq < len(in_sequences) and (cnt_regular_seq < THRESHOLD_IRREGULAR_SEQUENCE_LENGTH * len(in_sequences)):
        logger.warning(f'Only {cnt_regular_seq} out of {len(in_sequences)} subsequences have regular length.')
        return -2, sequences
    # quantify clustering quality with silhouette_score
    X = np.empty((cnt_regular_seq, len_seq))
    labels = np.empty((cnt_regular_seq,), dtype=int)
    for idx, seq in enumerate(sequences):
        X[idx] = seq.values
        labels[idx] = get_seq_label(seq)
    if len(np.unique(labels)) < 2:
        logger.warning('Need at least 2 clusters to calculate silhouette score.')
        return -2, sequences
    scores = metrics.silhouette_samples(X, labels)
    med_score = np.median(scores)
    # save silhouette_samples of each sub-sequence
    for score, seq in zip(scores, sequences):
        seq.silhouette = score
    if kee_all_sequences:
        idx = 0
        for sub in in_sequences:
            if sub.length == len_seq:
                sub.silhouette = sequences[idx].silhouette
                idx += 1
            else:
                sub.silhouette = -1.0  # mark irregular-length subsequences
        return med_score, in_sequences
    else:
        return med_score, sequences


def check_weekend_pattern(df):
    """Return True if daily subsequences cluster by calendar weekend vs weekday."""
    sub_sequences = df_to_day_sequences(df)
    score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
    logger.debug(f'{score:.3f} for weekend pattern')
    return score >= THRESHOLD_SILHOUETTE_LOW


def evaluate_weeky_pattern_with_offset(df):
    """Search daily-start-hour offsets 0..11 for the best weekend clustering.

    Returns (best_score, best_offset). Function name typo ("weeky") kept for
    backward compatibility with external callers.
    """
    scores = []
    for offset in range(12):
        sub_sequences = df_to_day_sequences(df, offset=offset)
        score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
        scores.append(score)
    score, offset = max(scores), np.argmax(scores)
    logger.debug(f'offset={offset}, {format_float_list(scores)} for weekend pattern')
    return score, offset


def compare_workday_offday_avg(df, offset, offday_start):
    """Normalized separation between workday and offday daily averages.

    Resamples the value column into daily means (shifted by `offset` hours),
    splits days into offdays (offday_start and the following day) vs workdays,
    and returns |mean_work - mean_off| / (std_work + std_off + EPSILON).
    """
    daily_avg = resample_with_offset(df[COL_VALUE], "24h", offset).mean()
    is_offdays = lambda x: x.dayofweek == offday_start or x.dayofweek == (offday_start + 1) % 7
    # Initialize defensively: if groupby yields only one group (all days fall
    # on one side), the original code raised NameError; fall back to 0.0 so a
    # best-effort separation is still returned.
    workdays_avg_mean = offdays_avg_mean = 0.0
    workdays_avg_std = offdays_avg_std = 0.0
    for g in daily_avg.groupby(is_offdays):
        if g[0]:
            offdays_avg_mean = g[1].mean()
            offdays_avg_std = g[1].std()
            logger.debug(f'compare_workday_offday_avg : offdays(mean={offdays_avg_mean:.3f}, std={offdays_avg_std:.3f})')
        else:
            workdays_avg_mean = g[1].mean()
            workdays_avg_std = g[1].std()
            logger.debug(f'compare_workday_offday_avg : workdays(mean={workdays_avg_mean:.3f}, std={workdays_avg_std:.3f})')
    return abs(workdays_avg_mean - offdays_avg_mean) / (workdays_avg_std + offdays_avg_std + EPSILON)


def _reduce_factor(avg_diff_normalized):
    # the factor to reduce the score, in the range of [0.5, 1.0]
    # (expit is the logistic sigmoid; for inputs in [0, 1) the factor
    # falls in roughly [0.54, 1.0))
    return expit(avg_diff_normalized - 1.0) * 2


# for weekday-offday pattern NOT aligned with calendar weekday-weekend
def check_weekly_pattern(df):
    """Evaluate a (possibly asynchronous) weekly workday/offday pattern.

    Returns (score, offday_start, offset, label_method); score is -2.0 and
    label_method is None when less than two weeks of data are available.
    """
    if df.index[-1] - df.index[0] <= timedelta(days=13):
        logger.warning(f'There are only {df.index[-1] - df.index[0]} of data, we skip evaluting weekly patterns for time policy recommendation.')
        offday_start = -1
        return -2.0, offday_start, 0, None
    best_score, offday_start, offset = evaluate_async_weeky_pattern_with_offset(df)
    label_method = partial(LABEL_WEEKLY_OFFDAYS, offday_start)
    avg_diff_normalized = compare_workday_offday_avg(df, offset, offday_start)
    if avg_diff_normalized < 1.0:
        # the evaluated weekly pattern is likely spurious, b/c
        # the normalized daily average difference is not substantial,
        # reduce its score, and return
        logger.debug(f'workday_offday normalized diff {avg_diff_normalized:.3f} < 1.0, reduce score of weekly pattern')
        return best_score * _reduce_factor(avg_diff_normalized), offday_start, offset, label_method
    score, label_method_5th = evaluate_5th_workday_pattern(df, offday_start, offset)
    # NOTE(review): a non-positive best_score makes this ratio meaningless
    # (negative or divide-by-zero); confirm best_score > 0 is guaranteed here.
    if abs(score - best_score) / best_score < 0.1:
        # when the score difference is small, prefer the pattern with 5th workday:
        # Since we typically have only 2 to 4 weeks of data, dividing them into too many clusters without
        # a significant improvement in clustering quality can lower the silhouette score.
        logger.debug(f'the 5th workday pattern is accepted, the subsequences have three clusters, with the 5th workday forming a separate third cluster.')
        return score, offday_start, offset, label_method_5th
    else:
        return best_score, offday_start, offset, label_method


# calculate the day-of-week to label dictionary
def get_dow_label_dict(offday_start, b_5th_workday=True):
    """Map each day-of-week (0-6) to a cluster label.

    Labels: 1 for the two consecutive offdays starting at `offday_start`,
    2 for the day preceding the offdays (the "5th workday") when
    `b_5th_workday` is True, 0 for all other workdays.

    Fix: `b_5th_workday` was previously accepted but ignored; it is now
    honored (the default True preserves the old behavior).
    """
    dow_label_dict = {}
    for dow in range(7):
        if dow == offday_start or dow == ((offday_start + 1) % 7):
            # the two offdays
            dow_label_dict[dow] = 1
        elif b_5th_workday and dow == ((offday_start - 1) % 7):
            # the 5th workday
            dow_label_dict[dow] = 2
        else:
            dow_label_dict[dow] = 0
    return dow_label_dict


def evaluate_5th_workday_pattern(df, offday_start, offset):
    """Score the three-cluster weekly pattern (workdays / offdays / 5th workday)."""
    sub_sequences = df_to_day_sequences(df, offset=offset)
    dow_label_dict = get_dow_label_dict(offday_start)
    label_method = partial(LABEL_WEEKLY_PATTERN, dow_label_dict)
    score, _ = evaluate_clustering_quality(sub_sequences, label_method)
    return score, label_method


def evaluate_async_weeky_pattern_with_offset(df):
    """Jointly search offday start (0-6) and daily hour offset (0-11).

    Two greedy search routes are tried and the better result wins:
      a) fix offdays at offset 0, then search offsets;
      b) fix offset via the calendar-weekend labels, then search offdays.
    Returns (best_score, offday_start, offset).
    """
    # search route.a: find the async offdays first
    scores_a7 = []
    sub_sequences = df_to_day_sequences(df)
    for start_day in range(7):
        # 0 if the day is one of the two offdays starting at start_day else 1
        label_weekly = partial(LABEL_WEEKLY_OFFDAYS, start_day)
        score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
        scores_a7.append(score)
    offday1 = np.argmax(scores_a7)
    label_weekly = partial(LABEL_WEEKLY_OFFDAYS, offday1)
    scores_a12 = []
    for offset in range(12):
        sub_sequences = df_to_day_sequences(df, offset=offset)
        score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
        scores_a12.append(score)
    # search route.b: find daily hour offset first
    scores_b12 = []
    for offset in range(12):
        sub_sequences = df_to_day_sequences(df, offset=offset)
        score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
        scores_b12.append(score)
    offset = int(np.argmax(scores_b12))
    scores_b7 = []
    sub_sequences = df_to_day_sequences(df, offset=offset)
    for start_day in range(7):
        label_weekly = partial(LABEL_WEEKLY_OFFDAYS, start_day)
        score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
        scores_b7.append(score)
    logger.debug(f'scores_a7 = {format_float_list(scores_a7)}')
    logger.debug(f'scores_a12 = {format_float_list(scores_a12)}')
    logger.debug(f'scores_b7 = {format_float_list(scores_b7)}')
    logger.debug(f'scores_b12 = {format_float_list(scores_b12)}')
    if np.max(scores_a12) > np.max(scores_b7):
        return max(scores_a12), offday1, np.argmax(scores_a12)
    else:
        return max(scores_b7), np.argmax(scores_b7), offset


class HourBlockCandidate:
    """A candidate alternating hour-block pattern and its clustering quality."""

    def __init__(self, hourdelta, offset=0, silhouette=-2, label=0) -> None:
        self.hourdelta = hourdelta      # block size in hours
        self.offset = offset            # starting-hour offset of the blocks
        self.silhouette = silhouette    # clustering quality; -2 = not evaluated
        self.label = label              # scratch cluster label used by get_preferred_hour_block

    def __str__(self) -> str:
        return f'HourBlockCandidate(hourdelta={self.hourdelta}, offset={self.offset}, silhouette={self.silhouette:.3f})'


def get_preferred_hour_block(hour_block_candidates):
    '''A robust decision rule for hour-block-candidates, by:
    find a high quality clustering of the candidates wrt high score vs low score,
    then pick from the high score cluster the candidate of longest hour block
    '''
    assert len(hour_block_candidates) > 2, 'len(hour_block_candidates) <= 2'
    hbc_sorted = sorted(hour_block_candidates, key=lambda x: x.silhouette)
    X = np.array([hbc.silhouette for hbc in hbc_sorted])
    X = np.reshape(X, (-1, 1))
    best_score = -2.0
    best_cutoff = 0
    # Try every cutoff that splits the sorted candidates into a low-score
    # cluster (label 0) and a high-score cluster (label 1), and keep the
    # cutoff whose 1-D silhouette score is highest.
    # NOTE(review): the source text was corrupted here; the label assignment
    # and silhouette_score call were reconstructed from the docstring — verify
    # against version control.
    for cutoff in range(1, len(hbc_sorted)):
        for i, hbc in enumerate(hbc_sorted):
            hbc.label = 0 if i < cutoff else 1
        score = metrics.silhouette_score(X, [hbc.label for hbc in hbc_sorted])
        if score > best_score:
            best_score = score
            best_cutoff = cutoff
    # from the high-score cluster, prefer the longest hour block
    return max(hbc_sorted[best_cutoff:], key=lambda hbc: hbc.hourdelta)


def evaluate_hour_block(df):
    """Find the best alternating hour-block pattern on daily-detrended data.

    Returns the preferred HourBlockCandidate, or None when less than one day
    of data is available.
    """
    if df.index[-1] - df.index[0] <= timedelta(days=1):
        logger.warning(f'There are less than 1 day of data. At least 1 day of data is required for time policy recommendation.')
        return None
    hour_block_candidates = []
    df_detrended = detrend_daily(df)
    for hourdelta in HOURDELTA_LIST:
        scores = np.empty(hourdelta)
        for offset in range(hourdelta):
            subs = df_to_hour_sequences(df_detrended, hourDelta=hourdelta, offset=offset)
            score, _ = evaluate_clustering_quality(subs, partial(LABEL_ALTER_HOUR_BLOCK, hourdelta, offset))
            scores[offset] = score
        logger.debug(f'{hourdelta}h {format_float_list(scores)}')
        # best offset for this block size becomes one candidate
        hour_block_candidates.append(HourBlockCandidate(hourdelta, np.argmax(scores), max(scores)))
    logger.debug(f'{format_float_list([hbc.silhouette for hbc in hour_block_candidates])} for hour block pattern')
    return get_preferred_hour_block(hour_block_candidates)


def evaluate_half_day(df):
    """Search start hours 0..11 for the best half-day (12h) hourly pattern.

    Returns (score, start_hour); (-1.0, -1) when less than 3 days of data.
    Scores may be adjusted downward by within/between-sequence std ratios to
    disambiguate near-tied start hours.
    """
    if df.index[-1] - df.index[0] <= timedelta(days=3):
        logger.warning(f'There are only {df.index[-1] - df.index[0]} of data, we skip evaluting half-day patterns for time policy recommendation.')
        return -1.0, -1
    scores_list = np.empty(12)
    stds0_list = np.empty(12)
    stds1_list = np.empty(12)
    for sh in range(12):
        subs = df_to_half_day_sequences(df, start_hour=sh)
        score, subs = evaluate_clustering_quality(subs, LABEL_HOUR)
        if score > 0:
            sub_filtered_values, _ = pack_sub_values_with_filter(subs, 50)
            # std across sequences (per position) and within sequences
            sub_std0 = max(np.std(sub_filtered_values, axis=0))
            sub_std1 = np.mean(np.std(sub_filtered_values, axis=1))
        else:
            sub_std0 = 0
            sub_std1 = 0
        stds0_list[sh] = sub_std0
        stds1_list[sh] = sub_std1
        scores_list[sh] = score
    max_score = np.max(scores_list)
    if max_score < THRESHOLD_SILHOUETTE_LOW:
        logger.debug(f'half-day pattern scores: {format_float_list(scores_list)}')
        return max_score, np.argmax(scores_list)
    if not (max(stds0_list) > 0 and max(stds1_list) > 0):
        # no usable std information; fall back to raw scores
        logger.debug(f'return early in half-day pattern')
        return np.max(scores_list), np.argmax(scores_list)
    adjusted_scores_list0 = adjust_scores_by_std(scores_list, stds0_list)
    adjusted_scores_list1 = adjust_scores_by_std(scores_list, stds1_list)
    a0_power, a0_support = get_disambiguity_power(adjusted_scores_list0)
    a1_power, a1_support = get_disambiguity_power(adjusted_scores_list1)
    logger.debug(f'half-day pattern scores.a0: ({a0_power:.3f}, ({a0_support[0]}, {a0_support[1]})) {format_float_list(adjusted_scores_list0)}')
    logger.debug(f'half-day pattern scores.a1: ({a1_power:.3f}, ({a1_support[0]}, {a1_support[1]})) {format_float_list(adjusted_scores_list1)}')
    # penalized by the size of the support
    a0_power /= a0_support[1] - a0_support[0]
    a1_power /= a1_support[1] - a1_support[0]
    if a1_power > a0_power:
        return np.max(adjusted_scores_list1), np.argmax(adjusted_scores_list1)
    else:
        return np.max(adjusted_scores_list0), np.argmax(adjusted_scores_list0)


def get_disambiguity_power(scores_list):
    """How cleanly the maximum stands out from the rest of the scores.

    Labels a contiguous window containing the argmax as one cluster and the
    remaining scores as the other, and returns the best 1-D silhouette score
    found together with its window as (best_silhouette, (left, right)).
    """
    len_list = len(scores_list)
    X = np.array(scores_list)
    X = np.reshape(X, (-1, 1))
    idx_max = np.argmax(scores_list)
    best_silhouette = -2.0
    left = idx_max
    right = idx_max + 1
    # Fix: initialize support so a degenerate input (single score) cannot
    # cause a reference-before-assignment on return.
    support = (left, right)
    # NOTE(review): `right` is never reset inside the outer loop, so only
    # windows with left == idx_max are actually scored; the outer loop over
    # `left` is effectively dead after the first pass. Preserved as-is to keep
    # behavior — confirm intent before "fixing".
    while left >= 0:
        while right <= len_list:
            if right - left == len_list:
                break  # all-ones labeling: silhouette needs 2 clusters
            labels_list = np.zeros(len_list, dtype=int)
            labels_list[left:right] = 1
            silhouette = metrics.silhouette_score(X, labels_list)
            if silhouette > best_silhouette:
                best_silhouette = silhouette
                support = (left, right)
            right += 1
        left -= 1
    return best_silhouette, support


def adjust_scores_by_std(scores_list, stds_list):
    """Divide each score by its std's ratio to the smallest positive std.

    Entries with non-positive std keep their original score.
    """
    min_std = min([std for std in stds_list if std > 0])
    std_ratios = [std / min_std if std > 0 else std for std in stds_list]
    adjusted_scores_list = [score / std_ratio if std_ratio > 0 else score
                            for (score, std_ratio) in zip(scores_list, std_ratios)]
    return adjusted_scores_list