from datetime import timedelta
from functools import partial
import numpy as np
from sklearn import metrics
from scipy.special import expit
from util.data_prepare import detrend_daily, COL_VALUE, EPSILON, resample_with_offset
from util.dev_util import format_float_list
from util.sub_sequence import df_to_day_sequences, df_to_hour_sequences, df_to_half_day_sequences, pack_sub_values_with_filter
from util import setup_logging
from six.moves import range, zip
logger = setup_logging.get_logger()
HOURDELTA_LIST = [1, 2, 3, 4]
THRESHOLD_SILHOUETTE_LOW = 0.1
THRESHOLD_SILHOUETTE_MEDIUM = 0.4
THRESHOLD_SILHOUETTE_HIGH = 0.6
THRESHOLD_IRREGULAR_SEQUENCE_LENGTH = 0.7
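# NOTE: silhouette scores lie in [-1, 1]; -2.0 is used throughout this module as an
# out-of-band sentinel meaning the clustering quality could not be evaluated.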
# the general weekly pattern: seven days in a week can be labeled to all possible clustering schemes
LABEL_WEEKLY_PATTERN = lambda dow_label_dict, seq: dow_label_dict[seq.dayofweek()]
LABEL_WEEKLY_OFFDAYS = lambda offday_start, seq: 1 if seq.dayofweek() == offday_start or seq.dayofweek() == (offday_start + 1) % 7 else 0
LABEL_WEEKEND = lambda seq: 0 if seq.dayofweek() < 5 else 1 # Monday=0, Sunday=6
LABEL_HOUR = lambda seq: seq.hour()
LABEL_ALTER_HOUR_BLOCK = lambda hourdelta, offset, seq: 0 if ((seq.hour() - offset) // hourdelta) % 2 else 1
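# Example (with hypothetical arguments): LABEL_WEEKLY_OFFDAYS with offday_start=5 labels
# Saturday (5) and Sunday (6) as 1 and every other day as 0; LABEL_ALTER_HOUR_BLOCK with
# hourdelta=2, offset=0 labels hours 0-1 as 1, hours 2-3 as 0, hours 4-5 as 1, ...,
# i.e. the label alternates every `hourdelta` hours starting from `offset`.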
def evaluate_clustering_quality(in_sequences, get_seq_label, keep_all_sequences=False):
lengths = [s.length for s in in_sequences]
counts = np.bincount(lengths)
len_seq = np.argmax(counts) # use the most frequent one as the sequence length of this data set
sequences = [s for s in in_sequences if s.length == len_seq] # make sure sequences having same length
cnt_regular_seq = len(sequences)
if cnt_regular_seq < len(in_sequences) and (cnt_regular_seq < THRESHOLD_IRREGULAR_SEQUENCE_LENGTH * len(in_sequences)):
logger.warning(f'Only {cnt_regular_seq} out of {len(in_sequences)} subsequences have regular length.')
return -2, sequences
# quantify clustering quality with silhouette_score
X = np.empty((cnt_regular_seq, len_seq))
labels = np.empty((cnt_regular_seq,), dtype=int)
for idx, seq in enumerate(sequences):
X[idx] = seq.values
labels[idx] = get_seq_label(seq)
if len(np.unique(labels)) < 2:
logger.warning('Need at least 2 clusters to calculate silhouette score.')
return -2, sequences
scores = metrics.silhouette_samples(X, labels)
med_score = np.median(scores)
# save silhouette_samples of each sub-sequence
for score, seq in zip(scores, sequences):
seq.silhouette = score
    if keep_all_sequences:
idx = 0
for sub in in_sequences:
if sub.length == len_seq:
sub.silhouette = sequences[idx].silhouette
idx += 1
else:
sub.silhouette = -1.0
return med_score, in_sequences
else:
return med_score, sequences
def check_weekend_pattern(df):
sub_sequences = df_to_day_sequences(df)
score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
logger.debug(f'{score:.3f} for weekend pattern')
return score >= THRESHOLD_SILHOUETTE_LOW
def evaluate_weekly_pattern_with_offset(df):
scores = []
for offset in range(12):
sub_sequences = df_to_day_sequences(df, offset=offset)
score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
scores.append(score)
    score, offset = max(scores), int(np.argmax(scores))
logger.debug(f'offset={offset}, {format_float_list(scores)} for weekend pattern')
return score, offset
def compare_workday_offday_avg(df, offset, offday_start):
daily_avg = resample_with_offset(df[COL_VALUE], "24h", offset).mean()
is_offdays = lambda x : x.dayofweek == offday_start or x.dayofweek == (offday_start + 1) % 7
for g in daily_avg.groupby(is_offdays):
if g[0]:
offdays_avg_mean = g[1].mean()
offdays_avg_std = g[1].std()
logger.debug(f'compare_workday_offday_avg : offdays(mean={offdays_avg_mean:.3f}, std={offdays_avg_std:.3f})')
else:
workdays_avg_mean = g[1].mean()
workdays_avg_std = g[1].std()
logger.debug(f'compare_workday_offday_avg : workdays(mean={workdays_avg_mean:.3f}, std={workdays_avg_std:.3f})')
return abs(workdays_avg_mean - offdays_avg_mean) / (workdays_avg_std + offdays_avg_std + EPSILON)
def _reduce_factor(avg_diff_normalized):
    # the factor by which the score is shrunk, roughly in the range [0.54, 1.0) for inputs in [0, 1)
return expit(avg_diff_normalized - 1.0) * 2
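# Worked example: expit(0.0 - 1.0) * 2 ~= 0.54 and expit(1.0 - 1.0) * 2 = 1.0, so over
# [0, 1) -- the only range where check_weekly_pattern applies the factor -- the score
# is shrunk by a factor between roughly 0.54 and 1.0.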
# for a weekday-offday pattern NOT aligned with the calendar weekday/weekend split
def check_weekly_pattern(df):
if df.index[-1] - df.index[0] <= timedelta(days=13):
        logger.warning(f'There is only {df.index[-1] - df.index[0]} of data; skipping weekly-pattern evaluation for time policy recommendation.')
offday_start = -1
return -2.0, offday_start, 0, None
    best_score, offday_start, offset = evaluate_async_weekly_pattern_with_offset(df)
label_method = partial(LABEL_WEEKLY_OFFDAYS, offday_start)
avg_diff_normalized = compare_workday_offday_avg(df, offset, offday_start)
if avg_diff_normalized < 1.0:
        # the evaluated weekly pattern is likely spurious because the normalized
        # daily average difference is not substantial; reduce its score and return
logger.debug(f'workday_offday normalized diff {avg_diff_normalized:.3f} < 1.0, reduce score of weekly pattern')
return best_score * _reduce_factor(avg_diff_normalized), offday_start, offset, label_method
score, label_method_5th = evaluate_5th_workday_pattern(df, offday_start, offset)
    if abs(score - best_score) / (abs(best_score) + EPSILON) < 0.1:
# when the score difference is small, prefer the pattern with 5th workday:
# Since we typically have only 2 to 4 weeks of data, dividing them into too many clusters without
# a significant improvement in clustering quality can lower the silhouette score.
        logger.debug('the 5th workday pattern is accepted: the subsequences form three clusters, with the 5th workday as a separate third cluster.')
return score, offday_start, offset, label_method_5th
else:
return best_score, offday_start, offset, label_method
# calculate the day-of-week -> label dictionary
def get_dow_label_dict(offday_start, b_5th_workday=True):
dow_label_dict = {}
for dow in range(7):
if dow == offday_start or dow == ((offday_start+1) % 7): # the two offdays
dow_label_dict[dow] = 1
        elif b_5th_workday and dow == ((offday_start - 1) % 7): # the 5th workday
dow_label_dict[dow] = 2
else:
dow_label_dict[dow] = 0
return dow_label_dict
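# Example: get_dow_label_dict(5) (offdays starting on Saturday) yields
# {0: 0, 1: 0, 2: 0, 3: 0, 4: 2, 5: 1, 6: 1}: Monday-Thursday are workdays (0),
# Friday is the 5th workday (2), and Saturday/Sunday are offdays (1).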
def evaluate_5th_workday_pattern(df, offday_start, offset):
sub_sequences = df_to_day_sequences(df, offset=offset)
dow_label_dict = get_dow_label_dict(offday_start)
label_method = partial(LABEL_WEEKLY_PATTERN, dow_label_dict)
score, _ = evaluate_clustering_quality(sub_sequences, label_method)
return score, label_method
def evaluate_async_weekly_pattern_with_offset(df):
# search route.a: find the async offdays first
scores_a7 = []
sub_sequences = df_to_day_sequences(df)
for start_day in range(7):
label_weekly = partial(LABEL_WEEKLY_OFFDAYS, start_day)
        # equivalent to: lambda seq: 1 if seq.dayofweek() in (start_day, (start_day + 1) % 7) else 0
score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
scores_a7.append(score)
    offday1 = int(np.argmax(scores_a7))
    label_weekly = partial(LABEL_WEEKLY_OFFDAYS, offday1) # 1 on the two offdays starting at offday1, else 0
scores_a12 = []
for offset in range(12):
sub_sequences = df_to_day_sequences(df, offset=offset)
score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
scores_a12.append(score)
# search route.b: find daily hour offset first
scores_b12 = []
for offset in range(12):
sub_sequences = df_to_day_sequences(df, offset=offset)
score, _ = evaluate_clustering_quality(sub_sequences, LABEL_WEEKEND)
scores_b12.append(score)
offset = int(np.argmax(scores_b12))
scores_b7 = []
sub_sequences = df_to_day_sequences(df, offset=offset)
for start_day in range(7):
label_weekly = partial(LABEL_WEEKLY_OFFDAYS, start_day)
        # equivalent to: lambda seq: 1 if seq.dayofweek() in (start_day, (start_day + 1) % 7) else 0
score, _ = evaluate_clustering_quality(sub_sequences, label_weekly)
scores_b7.append(score)
logger.debug(f'scores_a7 = {format_float_list(scores_a7)}')
logger.debug(f'scores_a12 = {format_float_list(scores_a12)}')
logger.debug(f'scores_b7 = {format_float_list(scores_b7)}')
logger.debug(f'scores_b12 = {format_float_list(scores_b12)}')
    if np.max(scores_a12) > np.max(scores_b7):
        return max(scores_a12), offday1, int(np.argmax(scores_a12))
    else:
        return max(scores_b7), int(np.argmax(scores_b7)), offset
class HourBlockCandidate:
def __init__(self, hourdelta, offset=0, silhouette=-2, label=0) -> None:
self.hourdelta = hourdelta
self.offset = offset
self.silhouette = silhouette
self.label = label
def __str__(self) -> str:
return f'HourBlockCandidate(hourdelta={self.hourdelta}, offset={self.offset}, silhouette={self.silhouette:.3f})'
def get_preferred_hour_block(hour_block_candidates):
    '''A robust decision rule for hour-block candidates:
    find a high-quality split of the candidates into a low-score and a high-score cluster,
    then pick the candidate with the longest hour block from the high-score cluster.
    '''
assert len(hour_block_candidates) > 2, 'len(hour_block_candidates) <= 2'
hbc_sorted = sorted(hour_block_candidates, key=lambda x: x.silhouette)
X = np.array([hbc.silhouette for hbc in hbc_sorted])
X = np.reshape(X, (-1,1))
best_score = -2.0
best_cutoff = 0
for cutoff in range(1, len(hbc_sorted)):
for i, hbc in enumerate(hbc_sorted):
hbc.label = 0 if i<cutoff else 1
score = metrics.silhouette_score(X, [hbc.label for hbc in hbc_sorted])
if score > best_score:
best_score = score
best_cutoff = cutoff
return max(hbc_sorted[best_cutoff:], key=lambda hbc: hbc.hourdelta)
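# Sketch of the rule with made-up silhouettes: candidates sorted ascending by score as
# [1h: 0.05, 3h: 0.10, 4h: 0.55, 2h: 0.60] split best at cutoff=2 (low vs high cluster);
# among the high-score cluster {4h, 2h} the longest block, 4h, is returned.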
def evaluate_hour_block(df):
if df.index[-1] - df.index[0] <= timedelta(days=1):
        logger.warning('There is less than 1 day of data; at least 1 day is required for time policy recommendation.')
return None
hour_block_candidates = []
df_detrended = detrend_daily(df)
for hourdelta in HOURDELTA_LIST:
scores = np.empty(hourdelta)
for offset in range(hourdelta):
subs = df_to_hour_sequences(df_detrended, hourDelta=hourdelta, offset=offset)
score, _ = evaluate_clustering_quality(subs, partial(LABEL_ALTER_HOUR_BLOCK, hourdelta, offset))
scores[offset] = score
logger.debug(f'{hourdelta}h {format_float_list(scores)}')
        hour_block_candidates.append(HourBlockCandidate(hourdelta, int(np.argmax(scores)), max(scores)))
logger.debug(f'{format_float_list([hbc.silhouette for hbc in hour_block_candidates])} for hour block pattern')
return get_preferred_hour_block(hour_block_candidates)
def evaluate_half_day(df):
if df.index[-1] - df.index[0] <= timedelta(days=3):
        logger.warning(f'There is only {df.index[-1] - df.index[0]} of data; skipping half-day pattern evaluation for time policy recommendation.')
return -1.0, -1
scores_list = np.empty(12)
stds0_list = np.empty(12)
stds1_list = np.empty(12)
for sh in range(12):
subs = df_to_half_day_sequences(df, start_hour=sh)
score, subs = evaluate_clustering_quality(subs, LABEL_HOUR)
if score > 0:
sub_filtered_values, _ = pack_sub_values_with_filter(subs, 50)
sub_std0 = max(np.std(sub_filtered_values, axis=0))
sub_std1 = np.mean(np.std(sub_filtered_values, axis=1))
else:
sub_std0 = 0
sub_std1 = 0
        stds0_list[sh] = sub_std0
        stds1_list[sh] = sub_std1
        scores_list[sh] = score
max_score = np.max(scores_list)
if max_score < THRESHOLD_SILHOUETTE_LOW:
logger.debug(f'half-day pattern scores: {format_float_list(scores_list)}')
return max_score, np.argmax(scores_list)
    if not (max(stds0_list) > 0 and max(stds1_list) > 0):
        logger.debug('no positive std available; returning early from half-day pattern evaluation')
return np.max(scores_list), np.argmax(scores_list)
adjusted_scores_list0 = adjust_scores_by_std(scores_list, stds0_list)
adjusted_scores_list1 = adjust_scores_by_std(scores_list, stds1_list)
a0_power, a0_support = get_disambiguity_power(adjusted_scores_list0)
a1_power, a1_support = get_disambiguity_power(adjusted_scores_list1)
logger.debug(f'half-day pattern scores.a0: ({a0_power:.3f}, ({a0_support[0]}, {a0_support[1]})) {format_float_list(adjusted_scores_list0)}')
logger.debug(f'half-day pattern scores.a1: ({a1_power:.3f}, ({a1_support[0]}, {a1_support[1]})) {format_float_list(adjusted_scores_list1)}')
    # penalize by the size of the support
a0_power /= a0_support[1] - a0_support[0]
a1_power /= a1_support[1] - a1_support[0]
if a1_power > a0_power:
return np.max(adjusted_scores_list1), np.argmax(adjusted_scores_list1)
else:
return np.max(adjusted_scores_list0), np.argmax(adjusted_scores_list0)
def get_disambiguity_power(scores_list):
    len_list = len(scores_list)
    X = np.reshape(np.array(scores_list), (-1, 1))
    idx_max = int(np.argmax(scores_list))
    best_silhouette = -2.0
    support = (idx_max, idx_max + 1)
    # score every contiguous window [left, right) that contains the maximum score
    for left in range(idx_max, -1, -1):
        for right in range(idx_max + 1, len_list + 1):
            if right - left == len_list:
                continue # the window covers everything: only one cluster, skip
            labels_list = np.zeros(len_list, dtype=int)
            labels_list[left:right] = 1
            silhouette = metrics.silhouette_score(X, labels_list)
            if silhouette > best_silhouette:
                best_silhouette = silhouette
                support = (left, right)
    return best_silhouette, support
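# Worked example (hypothetical scores): for scores_list = [0.1, 0.8, 0.7, 0.1], idx_max
# is 1, so the windows [1,2), [1,3), [1,4), [0,2), [0,3) are scored; [1,3) isolates the
# two high scores and separates best, giving support = (1, 3).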
def adjust_scores_by_std(scores_list, stds_list):
    min_std = min([std for std in stds_list if std > 0])
    std_ratios = [std / min_std if std > 0 else std for std in stds_list]
    adjusted_scores_list = [score / std_ratio if std_ratio > 0 else score
                            for (score, std_ratio) in zip(scores_list, std_ratios)]
    return adjusted_scores_list
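# Example: stds_list = [2.0, 1.0, 4.0] gives min positive std 1.0 and ratios
# [2.0, 1.0, 4.0]; scores_list = [0.4, 0.3, 0.8] is then adjusted to [0.2, 0.3, 0.2],
# discounting scores whose subsequences show a proportionally larger spread.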