from collections import defaultdict

import numpy as np
from scipy.stats import iqr

from util.data_prepare import (
    COL_VALUE, COL_ANOMALY_LABEL, COL_ITSI_BND_LOW, COL_ITSI_BND_UP,
    DEFAULT_SENSITIVITY_MULTIPLIER,
    EPSILON, DEFAULT_Z,
    NotEnoughDataException)
from util.pattern_evaluate import THRESHOLD_SILHOUETTE_LOW
from util.threshold_utils import calc_anomaly_label
from util.itsi_at_threshold import itsi_thresholding
from util.itsi_at_no_pattern import itsi_thresholding_np

from util import setup_logging

logger = setup_logging.get_logger()


def break_cron_weekdays(cronitem_dict):
    """Split cron items spanning multiple weekdays into one key per weekday.

    Keys are assumed to encode day-of-week and hour as ``day * 100 + hour``.
    Returns the sorted split keys and a mapping from each split key back to
    its original key.
    """
    split_keys = []
    key_mapping = defaultdict(int)
    for key, cronitem in cronitem_dict.items():
        cron = cronitem.cron_expression
        # the day-of-week field is the last space-separated field of the expression
        weekdays = cron[cron.rfind(' ') + 1:].split(',')
        if len(weekdays) == 1:
            # single weekday: keep the key as-is and map it to itself
            split_keys.append(key)
            key_mapping[key] = key
        else:
            hour = key % 100
            for day in weekdays:
                split_key = 100 * int(day) + hour
                split_keys.append(split_key)
                key_mapping[split_key] = key

    return sorted(split_keys), key_mapping
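
# A minimal worked example of the key encoding above (hypothetical stub object;
# the real cron items are created elsewhere in this package):
#
#   >>> from types import SimpleNamespace
#   >>> items = {13: SimpleNamespace(cron_expression='0 13 * * 1,2')}
#   >>> break_cron_weekdays(items)
#   ([113, 213], defaultdict(<class 'int'>, {113: 13, 213: 13}))
#
# Weekdays 1 and 2 at hour 13 become split keys 113 and 213, both mapping
# back to the original key 13.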


def _find_key(timestamp, time_policy, sorted_keys, key_mapping):
    """Look up the cron-item key whose time bucket covers ``timestamp``."""
    if time_policy.has_weekend:
        split_key = sorted_keys[
            np.searchsorted(sorted_keys, timestamp.dayofweek * 100 + timestamp.hour, side='right') - 1
        ]
        return key_mapping[split_key]
    # the simple case: the lookup depends only on the hour
    return sorted_keys[
        np.searchsorted(sorted_keys, timestamp.hour, side='right') - 1
    ]
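
# np.searchsorted(..., side='right') - 1 is a floor lookup: it selects the
# largest key less than or equal to the encoded timestamp. For example, with
# sorted_keys = [0, 9, 17], an hour of 13 gives searchsorted(...) == 2, so
# index 1 is used and key 9 is returned -- the bucket starting at 09:00.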


def outlier_exclusion(values, median_val, iqr_val, sensitivity):
    """Keep only the values within ``sensitivity * iqr_val`` of the median."""
    mask = (values >= median_val - iqr_val * sensitivity) & (values <= median_val + iqr_val * sensitivity)
    return values[mask]
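
# For example, with median 10.0, IQR 4.0 and sensitivity 1.5, only values in
# the closed interval [4.0, 16.0] survive:
#
#   >>> outlier_exclusion(np.array([2.0, 5.0, 10.0, 16.0, 30.0]), 10.0, 4.0, 1.5)
#   array([ 5., 10., 16.])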


def calc_sensitivity(df, df_values, time_policy, history_normal_dict, cronitem_dict,
                     subs, label_method, compare_thresholds=False):
    """Derive per-cron-item mean, std, z value and sensitivity multiplier."""
    try:
        if time_policy.score >= THRESHOLD_SILHOUETTE_LOW:
            df = itsi_thresholding(df, history_normal_dict, subs, label_method)
        else:
            df = itsi_thresholding_np(df)
    except NotEnoughDataException:
        # return a copy of the cronitem_dict with mean, std and sensitivity set to -1.0
        cronitem_dict_cp = {}
        for key, cronitem in cronitem_dict.items():
            cronitem_dict_cp[key] = cronitem.update_mean_std_sensitivity(-1.0, -1.0, -1.0)
        return cronitem_dict_cp, None

    df[COL_VALUE] = df_values
    df = calc_anomaly_label(df, apply_edge_mask=False)

    if time_policy.has_weekend:
        sorted_keys, key_mapping = break_cron_weekdays(cronitem_dict)
    else:  # the simple case: the key depends only on the hour
        sorted_keys = sorted(cronitem_dict.keys())
        key_mapping = None

    values_collection = {}
    normal_values_max = defaultdict(lambda: -float('inf'))
    normal_values_min = defaultdict(lambda: float('inf'))

    def _update_normal_values_range(key, normal_values):
        # widen the per-key range of non-anomalous values seen so far
        val_max, val_min = normal_values.max(), normal_values.min()
        if val_max > normal_values_max[key]:
            normal_values_max[key] = val_max
        if val_min < normal_values_min[key]:
            normal_values_min[key] = val_min

    dfre = df.resample('1h')
    for timestamp, sub_df in dfre:
        key = _find_key(timestamp, time_policy, sorted_keys, key_mapping)
        if key in values_collection:
            values_collection[key] = np.concatenate((values_collection[key], sub_df[COL_VALUE].to_numpy()))
        else:
            values_collection[key] = sub_df[COL_VALUE].to_numpy()

        _update_normal_values_range(key, sub_df[COL_VALUE][sub_df[COL_ANOMALY_LABEL] == 0])
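
    # At this point values_collection holds, per key, every raw value whose
    # hourly bucket maps to that key, while normal_values_min/max bound the
    # values that calc_anomaly_label left unflagged.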

    max_multiplier = -float('inf')
    median_dict = {}
    iqr_dict = {}
    for key, values in values_collection.items():
        mid = np.median(values)
        median_dict[key] = mid
        deviation_unit = iqr(values)
        iqr_dict[key] = deviation_unit

        if deviation_unit > 0:
            # smallest multiplier that keeps the observed normal range inside
            # median +/- multiplier * IQR for this key
            multiplier_up = (normal_values_max[key] - mid) / deviation_unit
            multiplier_low = (mid - normal_values_min[key]) / deviation_unit
            # TODO: improve the logic for the case of a severely skewed dataset
            multiplier = max(multiplier_up, multiplier_low)
        else:
            logger.warning(f'Zero IQR in values ({key}). Using default multiplier {DEFAULT_SENSITIVITY_MULTIPLIER:.2f}')
            multiplier = DEFAULT_SENSITIVITY_MULTIPLIER

        if multiplier > max_multiplier:
            max_multiplier = multiplier

    # return a copy of the cronitem_dict with its sensitivity statistics updated
    cronitem_dict_cp = {}
    for key in cronitem_dict:
        values_filtered = outlier_exclusion(values_collection[key], median_dict[key], iqr_dict[key], max_multiplier)

        mean_value = np.mean(values_filtered)
        std_value = np.std(values_filtered)
        if std_value <= EPSILON:
            logger.info(f'Zero std in filtered values. Using default Z {DEFAULT_Z:.2f}')
            z_value = DEFAULT_Z
        else:
            z_value = np.max((values_filtered - mean_value) / std_value)

        cronitem_dict_cp[key] = cronitem_dict[key].update_numeric_stats(
            mean_value,
            std_value,
            z_value,
            max_multiplier)
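
    # Because z_value is the maximum standardized deviation of the filtered
    # values, mean + z * std equals the largest value that survived outlier
    # exclusion; the threshold bands reconstructed below use the same
    # mean +/- z * std form.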

    if compare_thresholds:
        if time_policy.score >= THRESHOLD_SILHOUETTE_LOW:
            # add thresholds calculated by replicating the ITSI outlier exclusion logic
            df_len = len(df)
            bnd_up = np.empty(df_len)
            bnd_low = np.empty(df_len)
            idx = 0
            for timestamp, sub_df in dfre:
                sub_len = len(sub_df)
                key = _find_key(timestamp, time_policy, sorted_keys, key_mapping)
                cronitem = cronitem_dict_cp[key]  # use the mean/std/z values that will be returned to ITSI
                bnd_up[idx : idx + sub_len] = cronitem.mean + cronitem.z_value * cronitem.std
                bnd_low[idx : idx + sub_len] = cronitem.mean - cronitem.z_value * cronitem.std
                idx += sub_len

            df[COL_ITSI_BND_UP] = bnd_up
            df[COL_ITSI_BND_LOW] = bnd_low

    return cronitem_dict_cp, ((values_collection, normal_values_min, normal_values_max, df) if compare_thresholds else None)
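
# A note on the reconstructed bands in compare_thresholds mode: for a cron
# item with mean m, std s and z value z, the band is [m - z * s, m + z * s].
# With m = 10.0, s = 2.0 and z = 1.5, for instance, values outside
# [7.0, 13.0] would fall outside the ITSI threshold band.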