from collections import defaultdict

import numpy as np
from scipy.stats import iqr

from util.data_prepare import (
    COL_VALUE, COL_ANOMALY_LABEL, COL_ITSI_BND_LOW, COL_ITSI_BND_UP,
    DEFAULT_SENSITIVITY_MULTIPLIER, EPSILON, DEFAULT_Z,
    NotEnoughDataException)
from util.pattern_evaluate import THRESHOLD_SILHOUETTE_LOW
from util.threshold_utils import calc_anomaly_label
from util.itsi_at_threshold import itsi_thresholding
from util.itsi_at_no_pattern import itsi_thresholding_np
from util import setup_logging

logger = setup_logging.get_logger()


def break_cron_weekdays(cronitem_dict):
    """Expand cron entries covering multiple weekdays into one key per weekday.

    Each cronitem exposes a ``cron_expression`` whose last whitespace-separated
    field is a comma-separated weekday list (e.g. ``... 1,2,3``).  Keys are
    assumed to encode ``100 * weekday + hour`` (the code extracts the hour as
    ``key % 100``).  An entry listing several weekdays is split into one
    synthetic key per weekday, each mapping back to the original key.

    Args:
        cronitem_dict: mapping of int key -> cronitem.

    Returns:
        tuple ``(sorted_keys, key_mapping)``: sorted list of (possibly split)
        keys, and a dict from each split key back to its original key.
    """
    splitted_keys = []
    # Plain dict instead of defaultdict(int): a lookup for a key that was never
    # produced here should raise KeyError, not silently yield 0.
    key_mapping = {}
    for key, cronitem in cronitem_dict.items():
        cron = cronitem.cron_expression
        # The weekday field is everything after the last space in the expression.
        weekdays = cron[cron.rfind(' ') + 1:].split(',')
        if len(weekdays) == 1:
            # Single weekday: the key already identifies itself.
            splitted_keys.append(key)
            key_mapping[key] = key
        else:
            hour = key % 100  # key encodes 100 * weekday + hour
            for day in weekdays:
                splitted_key = 100 * int(day) + hour
                splitted_keys.append(splitted_key)
                key_mapping[splitted_key] = key
    return sorted(splitted_keys), key_mapping


def _find_key(timestamp, time_policy, sorted_keys, key_mapping):
    """Locate the cronitem key that governs *timestamp*.

    Uses ``searchsorted(..., side='right') - 1`` to pick the last key that is
    <= the encoded timestamp.  NOTE(review): when the encoded timestamp is
    smaller than every key the index becomes -1 and Python indexing wraps to
    the LAST key — presumably intentional (cyclic schedule); confirm.

    Args:
        timestamp: pandas Timestamp-like (must expose ``dayofweek``/``hour``).
        time_policy: policy object; ``has_weekend`` selects the key encoding.
        sorted_keys: ascending list of keys (from ``break_cron_weekdays`` or
            plain hours).
        key_mapping: split-key -> original-key dict when ``has_weekend``,
            otherwise ``None`` (unused).

    Returns:
        The original cronitem key for this timestamp.
    """
    if time_policy.has_weekend:
        encoded = timestamp.dayofweek * 100 + timestamp.hour
        idx = np.searchsorted(sorted_keys, encoded, side='right') - 1
        return key_mapping[sorted_keys[idx]]
    # the simple case of search only depends on hour
    idx = np.searchsorted(sorted_keys, timestamp.hour, side='right') - 1
    return sorted_keys[idx]


def outlier_exclusion(values, median_val, iqr_val, sensitivity):
    """Return the subset of *values* within ``median ± iqr * sensitivity``.

    Args:
        values: 1-D numpy array of samples.
        median_val: precomputed median of *values*.
        iqr_val: precomputed interquartile range of *values*.
        sensitivity: multiplier widening the acceptance band.

    Returns:
        numpy array of the values inside the closed band (may be empty).
    """
    half_width = iqr_val * sensitivity
    # Boolean-mask indexing directly; np.where() wrapper was redundant.
    mask = (values >= median_val - half_width) & (values <= median_val + half_width)
    return values[mask]


def calc_sensitivity(df, df_values, time_policy, history_normal_dict,
                     cronitem_dict, subs, label_method,
                     compare_thresholds=False):
    """Derive per-cronitem mean/std/z statistics and a global sensitivity.

    Pipeline:
      1. Threshold the series (pattern-aware when the silhouette score is high
         enough, otherwise the no-pattern variant) and label anomalies.
      2. Bucket hourly values per cronitem key, tracking the min/max of
         normally-labelled points per key.
      3. Compute, per key, how many IQRs the normal extremes sit from the
         median; the maximum across keys becomes the shared sensitivity
         multiplier.
      4. Filter outliers with that multiplier and refresh each cronitem's
         mean/std/z statistics.

    Args:
        df: time-indexed DataFrame to threshold (resampled at 1h here).
        df_values: raw values written back into ``df[COL_VALUE]`` after
            thresholding.
        time_policy: exposes ``score`` and ``has_weekend``.
        history_normal_dict, subs, label_method: passed through to
            ``itsi_thresholding``; semantics owned by that helper.
        cronitem_dict: mapping key -> cronitem to be updated (copied).
        compare_thresholds: when True, also compute ITSI-style bounds columns
            and return the intermediate collections for inspection.

    Returns:
        ``(cronitem_dict_cp, extras)`` where *extras* is ``None`` unless
        ``compare_thresholds`` is True, in which case it is
        ``(values_collection, normal_values_min, normal_values_max, df)``.
        On ``NotEnoughDataException`` the copy carries sentinel -1.0 stats and
        *extras* is ``None``.
    """
    try:
        if time_policy.score >= THRESHOLD_SILHOUETTE_LOW:
            df = itsi_thresholding(df, history_normal_dict, subs, label_method)
        else:
            df = itsi_thresholding_np(df)
    except NotEnoughDataException:
        # Not enough data: return a copy of the cronitem_dict with the
        # sentinel sensitivity -1.0 so callers can detect the failure.
        cronitem_dict_cp = {
            key: cronitem.update_mean_std_sensitivity(-1.0, -1.0, -1.0)
            for key, cronitem in cronitem_dict.items()
        }
        return cronitem_dict_cp, None

    df[COL_VALUE] = df_values
    df = calc_anomaly_label(df, apply_edge_mask=False)

    if time_policy.has_weekend:
        sorted_keys, key_mapping = break_cron_weekdays(cronitem_dict)
    else:
        # the simple case of key only depends on hour
        sorted_keys = sorted(cronitem_dict.keys())
        key_mapping = None

    values_collection = {}
    normal_values_max = defaultdict(lambda: -float('inf'))
    normal_values_min = defaultdict(lambda: float('inf'))

    def _update_normal_values_range(key, normal_values):
        # Track per-key extremes over points labelled normal (label == 0).
        val_max, val_min = normal_values.max(), normal_values.min()
        if val_max > normal_values_max[key]:
            normal_values_max[key] = val_max
        if val_min < normal_values_min[key]:
            normal_values_min[key] = val_min

    # Keep the resampler object: it is re-iterated below when
    # compare_thresholds is set.
    dfre = df.resample('1h')
    for timestamp, sub_df in dfre:
        key = _find_key(timestamp, time_policy, sorted_keys, key_mapping)
        hour_values = sub_df[COL_VALUE].to_numpy()
        if key in values_collection:
            values_collection[key] = np.concatenate(
                (values_collection[key], hour_values))
        else:
            values_collection[key] = hour_values
        _update_normal_values_range(
            key, sub_df[COL_VALUE][sub_df[COL_ANOMALY_LABEL] == 0])

    max_multiplier = -float('inf')
    median_dict = {}
    iqr_dict = {}
    for key, values in values_collection.items():
        mid = np.median(values)
        median_dict[key] = mid
        deviation_unit = iqr(values)
        iqr_dict[key] = deviation_unit
        if deviation_unit > 0:
            # How many IQRs the normal extremes sit from the median, on each side.
            multiplier_up = (normal_values_max[key] - mid) / deviation_unit
            multiplier_low = (mid - normal_values_min[key]) / deviation_unit
            # TODO: improve the logic for the case of severely skewed dataset
            multiplier = max(multiplier_up, multiplier_low)
        else:
            logger.warning(f'WARN Zero iqr in values ({key}). Use default multiplier {DEFAULT_SENSITIVITY_MULTIPLIER:.2f}')
            multiplier = DEFAULT_SENSITIVITY_MULTIPLIER
        max_multiplier = max(max_multiplier, multiplier)

    # Return a copy of the cronitem_dict with sensitivity updated.
    # NOTE(review): assumes every key in cronitem_dict was seen during the
    # resample loop; a key with no data would raise KeyError here — confirm.
    cronitem_dict_cp = {}
    for key in cronitem_dict:
        values_filtered = outlier_exclusion(
            values_collection[key], median_dict[key], iqr_dict[key],
            max_multiplier)
        mean_value = np.mean(values_filtered)
        std_value = np.std(values_filtered)
        if std_value <= EPSILON:
            logger.info(f'Zero Std in filtered values. Use default Z {DEFAULT_Z:.2f}')
            z_value = DEFAULT_Z
        else:
            # Largest positive standardized deviation among the kept values.
            z_value = max((values_filtered - mean_value) / std_value)
        cronitem_dict_cp[key] = cronitem_dict[key].update_numeric_stats(
            mean_value, std_value, z_value, max_multiplier)

    if compare_thresholds:
        if time_policy.score >= THRESHOLD_SILHOUETTE_LOW:
            # Add thresholds calculated by replicating the ITSI outlier
            # exclusion logic, using the mean/std/z values returned to ITSI.
            df_len = len(df)
            bnd_up = np.empty(df_len)
            bnd_low = np.empty(df_len)
            idx = 0
            for timestamp, sub_df in dfre:
                sub_len = len(sub_df)
                key = _find_key(timestamp, time_policy, sorted_keys, key_mapping)
                cronitem = cronitem_dict_cp[key]
                bnd_up[idx:idx + sub_len] = (
                    cronitem.mean + cronitem.z_value * cronitem.std)
                bnd_low[idx:idx + sub_len] = (
                    cronitem.mean - cronitem.z_value * cronitem.std)
                idx += sub_len
            df[COL_ITSI_BND_UP] = bnd_up
            df[COL_ITSI_BND_LOW] = bnd_low
    return cronitem_dict_cp, (
        (values_collection, normal_values_min, normal_values_max, df)
        if compare_thresholds else None)