from collections import defaultdict
from functools import partial
from itertools import groupby

import numpy as np

from util.data_prepare import (
    COL_VALUE,
    COL_BND_LOW,
    COL_BND_UP,
    COL_EDGE_MASK,
    COL_ANOMALY_LABEL,
    COL_ANOMALY_SCORE,
    sigmoid,
)
from util.pattern_evaluate import (
    LABEL_WEEKLY_OFFDAYS,
    LABEL_ALTER_HOUR_BLOCK,
    evaluate_clustering_quality,
)
from util.sub_sequence import (
    df_to_day_sequences,
    df_to_half_day_sequences,
    df_to_hour_sequences,
)

# A point is labelled anomalous when its score is strictly above this value.
# NOTE(review): if `sigmoid` is the standard logistic function, every value
# that is not exactly at the band midpoint scores above 0.5 -- confirm that
# this threshold is the intended one.
THRESH_ANOMALY_SCORE = 0.5


def value_to_anomaly_score(val, bnd_up, bnd_low):
    """Return how far ``val`` lies outside the band ``[bnd_low, bnd_up]``.

    Note the argument order (upper bound first); it differs from
    ``relative_dist_to_score``.

    Returns:
        ``val - bnd_up`` when ``val`` is above the upper bound,
        ``bnd_low - val`` when it is below the lower bound, and ``-1.0``
        when it is inside the band (bounds included).
    """
    # TODO: try: anomaly score as z value
    if val > bnd_up:
        return val - bnd_up
    if val < bnd_low:
        return bnd_low - val
    return -1.0


def relative_dist_to_score(val, bnd_low, bnd_up):
    """Score ``val`` by its distance from the band centre, relative to the
    band half-width, squashed through ``sigmoid``.

    Before the sigmoid the relative distance is:
        * 0 if ``val`` equals the midpoint,
        * 1 if ``val`` equals ``bnd_up`` or ``bnd_low``,
        * > 1 if ``val`` lies beyond a bound.

    A zero-width band (``bnd_up == bnd_low``) has no meaningful relative
    distance; in that case ``sigmoid(-1.0)`` is returned regardless of
    ``val``.
    """
    mid = (bnd_up + bnd_low) / 2.0
    wide = (bnd_up - bnd_low) / 2.0
    if wide == 0.0:
        return sigmoid(-1.0)
    # transfer to [0, 1] using sigmoid function
    return sigmoid(abs(val - mid) / wide)


def calc_anomaly_label(df, apply_edge_mask=True):
    """Generate anomaly scores and labels for ``df`` from its anomaly boundary.

    Reads the ``COL_VALUE``, ``COL_BND_LOW`` and ``COL_BND_UP`` columns (and
    ``COL_EDGE_MASK`` when ``apply_edge_mask`` is true), then writes
    ``COL_ANOMALY_SCORE`` and ``COL_ANOMALY_LABEL`` into ``df`` in place.

    Args:
        df: frame holding the value, boundary and (optionally) edge-mask
            columns. It is modified in place.
        apply_edge_mask: when true, each score is multiplied by the integer
            edge mask before thresholding, so rows whose mask is 0 are never
            labelled anomalous. The stored score column is NOT masked.

    Returns:
        The same ``df`` object, with the two columns added.
    """
    # `otypes` is given explicitly: without it np.vectorize has to call the
    # function on the first element to infer the output type, and raises
    # ValueError on an empty frame.
    score_rows = np.vectorize(relative_dist_to_score, otypes=[float])
    anomaly_scores = score_rows(
        df[COL_VALUE].to_numpy(),
        df[COL_BND_LOW].to_numpy(),
        df[COL_BND_UP].to_numpy(),
    )

    if apply_edge_mask:
        edge_mask = np.array(df[COL_EDGE_MASK], dtype=int)
        thresholded = anomaly_scores * edge_mask
    else:
        thresholded = anomaly_scores
    anomaly_labels = (thresholded > THRESH_ANOMALY_SCORE).astype(int)

    df[COL_ANOMALY_LABEL] = anomaly_labels
    df[COL_ANOMALY_SCORE] = anomaly_scores
    return df


def sub_sequences_groupby(subs, get_seq_label):
    """Group sub-sequences by the label ``get_seq_label`` assigns to each.

    Args:
        subs: iterable of sub-sequences.
        get_seq_label: callable mapping one sub-sequence to a hashable label.

    Returns:
        A ``defaultdict(list)`` mapping each label to the sub-sequences that
        carry it, in their original order.
    """
    groups = defaultdict(list)
    # itertools.groupby only merges *consecutive* items with equal keys, so
    # runs that share a label but are not adjacent are merged here through
    # the dict.
    for label, run in groupby(subs, get_seq_label):
        groups[label].extend(run)
    return groups


def subsequence_by_time_policy(df, time_policy):
    """Split ``df`` into sub-sequences and pick the matching label method.

    The first matching rule wins:
        1. ``time_policy.label_method`` is set: use it, with day sequences.
        2. ``1 <= hour_block_length < 12``: hour-block pattern.
        3. ``hour_block_length == 12``: half-day pattern.
        4. ``time_policy.has_weekend``: weekly pattern, with day sequences.
        5. otherwise: default 1-hour blocks.

    Returns:
        ``(subs, label_method)`` where ``subs`` is the second element
        returned by ``evaluate_clustering_quality`` and ``label_method`` is
        the callable used to label the sub-sequences.
    """
    if time_policy.label_method is not None:
        label_method = time_policy.label_method
        subs = df_to_day_sequences(df, offset=time_policy.offset)
    elif 1 <= time_policy.hour_block_length < 12:
        # hour-block pattern
        label_method = partial(
            LABEL_ALTER_HOUR_BLOCK,
            time_policy.hour_block_length,
            time_policy.offset,
        )
        subs = df_to_hour_sequences(
            df, time_policy.hour_block_length, time_policy.offset
        )
    elif time_policy.hour_block_length == 12:
        # half-day pattern
        label_method = partial(
            LABEL_ALTER_HOUR_BLOCK,
            time_policy.hour_block_length,
            time_policy.offset,
        )
        subs = df_to_half_day_sequences(df, start_hour=time_policy.offset)
    elif time_policy.has_weekend:
        # weekly pattern
        label_method = partial(LABEL_WEEKLY_OFFDAYS, time_policy.offdays_start)
        subs = df_to_day_sequences(df, offset=time_policy.offset)
    else:
        # default : 1-hour block
        label_method = partial(LABEL_ALTER_HOUR_BLOCK, 1, 0)
        subs = df_to_hour_sequences(df)

    # NOTE(review): the keyword is spelled `kee_all_sequences`; it is left
    # untouched because it must match the helper's signature, which is not
    # visible here -- confirm it is not a typo for `keep_all_sequences`.
    _, subs = evaluate_clustering_quality(
        subs, label_method, kee_all_sequences=True
    )
    return subs, label_method


def get_edge_length():
    """Return the edge length (currently the constant 2)."""
    # TODO: derive the value from the data resolution, along the lines of:
    #   sub_length = sub_duration // df_resolution
    #   half_hour_edge = max(timedelta(minutes=30) // df_resolution + 1, 2)
    #   return half_hour_edge if half_hour_edge * 2 < sub_length else 2
    return 2