from datetime import timedelta import numpy as np from util.dev_util import format_float_list from util.data_prepare import COL_VALUE, COL_BND_LOW, COL_BND_UP, COL_EDGE_MASK , DEFAULT_Z, EPSILON, NotEnoughDataException from util.sub_sequence import pack_sub_values_with_filter from util.threshold_utils import subsequence_by_time_policy, sub_sequences_groupby, get_edge_length from util import setup_logging from six.moves import range, zip logger = setup_logging.get_logger() HISTORY_NORMAL_PERCENTILE_THRESHOLD = 50 SPLIT_DELTA = 12 # minimum split size wrt number of data points VARIATION_BUFFER = 0.05 # ============================================================= # split long subsequences to smaller segments for thresholding # ============================================================= CLIP_LEFT = 'L' CLIP_RIGHT = 'R' CLIP_BOTH = 'LR' DEFAULT_EDGE_CLIP = 1 def outlier_filter(values, filter_config): """ Filters outliers from a given array of values based on user-defined configuration, which sets the sensitivity of the threshold bounds. If a tighter range is specified (with a bigger lower-bound value and a smaller upper-bound value), more values are classified as outliers and removed. This reduction in outliers decreases the standard deviation, thereby increasing the sensitivity of final thresholding result. Parameters: - values: A numpy array or list of numerical values to filter. - filter_config: An object that contains the following attributes: - percentile_upper_threshold: Upper threshold percentile for outlier filtering. - percentile_lower_thres: Lower threshold percentile for outlier filtering. - variation_unit_multiplier: A multiplier to determine the variation range around the median. Returns: - values_filtered: A numpy array of values with outliers removed based on the configured thresholds. """ percentile_upper_threshold = filter_config.percentile_upper_threshold percentile_lower_threshold = filter_config.percentile_lower_threshold variation_unit_multiplier = filter_config.variation_unit_multiplier (percentile_lower_bound, percentile_upper_bound) = np.percentile(values, [percentile_lower_threshold, percentile_upper_threshold]) mid = np.median(values) # use IQR as variation_unit variation_unit = np.subtract.reduce(np.percentile(values, [75, 25])) if variation_unit == 0 : variation_unit = values.std() + 1e-6 # filter out extreme values lower_bound = min(mid - variation_unit_multiplier * variation_unit, percentile_lower_bound) values_filtered = values[values > lower_bound] upper_bound = max(mid + variation_unit_multiplier * variation_unit, percentile_upper_bound) values_filtered = values_filtered[values_filtered < upper_bound] return values_filtered def _calc_segment_statistics(sub_values, collection_coherence_score, filter_config, clip='', calc_multiplier=True, sub_values_all=None): """ Calculate statistics for a given segment of data. Parameters: - sub_values: A 2D array of numerical values representing the data segment to analyze, each row represents values group by time_policy - filter_config: A configuration object that defines how to filter outliers. - clip: Optional. Defines whether to clip the edges of the data. Can be '', 'CLIP_LEFT', 'CLIP_RIGHT', or 'CLIP_BOTH'. - calc_multiplier: Optional. A boolean flag to determine if a multiplier (Z-score) should be calculated. Returns: - Tuple containing: - The mean of the means of the filtered values. - The adjusted maximum standard deviation plus the standard deviation of means multiplied by the coherence score. - The maximum Z-score of the filtered values (if calc_multiplier is True). Raises: - NotEnoughDataException: Raises if the sub_values array has insufficient data for analysis. """ if len(sub_values.shape) < 2 or sub_values.shape[0] < 1 or sub_values.shape[1] <= 2 * DEFAULT_EDGE_CLIP: raise NotEnoughDataException("Unexpected sub_values in _calc_segment_statistics") if clip == CLIP_LEFT: sub_values = sub_values[:, DEFAULT_EDGE_CLIP:] elif clip == CLIP_RIGHT: sub_values = sub_values[:, :-DEFAULT_EDGE_CLIP] elif clip == CLIP_BOTH: sub_values = sub_values[:, DEFAULT_EDGE_CLIP:-DEFAULT_EDGE_CLIP] # sensitivity adjustment # filter out outliers for each row in sub_values, where each row in the sub_value is grouped by the time policy # After filtering, the length of row will be different so we use list instead of np.array filtered_values = [outlier_filter(values, filter_config) for values in sub_values] # compute means and stdev on filitered values means = np.array([np.mean(row) for row in filtered_values]) stds = np.array([np.std(row) for row in filtered_values]) if calc_multiplier: if stds.max() <= EPSILON: logger.info(f'Zero Std in historical normal behavior. Use default Z {DEFAULT_Z:.2f}') stds = np.absolute(means) * 0.01 z = DEFAULT_Z else: zs = np.empty(sub_values.shape[0]) for i in range(sub_values.shape[0]): if stds[i] == 0.0: zs[i] = 0.0 continue # the z-score need to use the filtered value correspondingly sub = filtered_values[i] sub_zs = np.absolute(sub - means[i]) / stds[i] zs[i] = sub_zs.max() z = zs.max() else: z = 0.0 # Calculate the standard deviation for segments as a combination of two components: # 1) Within-Segment Variation: Take the maximum standard deviation among all segments. # 2) Inter-Segment Variation: Compute the standard deviation of the segment means. # # To handle edge cases involving perfectly regular patterns (e.g., synthetic test time series) where inter-segment variation is nearly zero, # set a lower bound for the inter-segment variation (part 2) at 5% of the within-segment variation (part 1). This effectively introduces # a 5% buffer to the calculation, addressing potential false positives caused by perfectly repeating patterns in synthetic scenarios. std_rslt = stds.max() + max(VARIATION_BUFFER * stds.max(), means.std() * collection_coherence_score) return means.mean(), std_rslt, z def _calc_subsequence_statistics(sub_values, collection_coherence_score, split_pnts, one_hour_len, filter_config, sub_values_all=None): def bridge_calc_segment_statistics(left, right, clip): piece_data_all = sub_values_all[:, left:right] return _calc_segment_statistics(sub_values[:, left:right], collection_coherence_score, filter_config, clip=clip, sub_values_all=piece_data_all) segment_len = sub_values.shape[1] if split_pnts is not None: stats1, stats2, multipliers = [], [], [] left = 0 for i in split_pnts: right = i * one_hour_len (stat1, stat2, z) = bridge_calc_segment_statistics(left, right, CLIP_LEFT if left == 0 else '') stats1.append(stat1) stats2.append(stat2) multipliers.append(z) left = right if left < segment_len: (stat1, stat2, z) = bridge_calc_segment_statistics(left, segment_len, CLIP_RIGHT) stats1.append(stat1) stats2.append(stat2) multipliers.append(z) return stats1, stats2, multipliers (stat1, stat2, z) = bridge_calc_segment_statistics(0, segment_len, CLIP_BOTH) return [stat1], [stat2], [z] class HistoryNormal: def __init__(self, sub_values, collection_coherence_score, filter_config, split_pnts=None, one_hour_len=0, sub_values_all=None) -> None: assert len(sub_values.shape) == 2 and sub_values.shape[0] >= 1 and sub_values.shape[1] > 2 * DEFAULT_EDGE_CLIP, "Unexpected sub_values in HistoryNormal init" self.split_pnts = split_pnts self.one_hour_len = one_hour_len stats1, stats2, multipliers = _calc_subsequence_statistics( sub_values, collection_coherence_score, split_pnts, one_hour_len, filter_config, sub_values_all=sub_values_all ) self.means = np.array(stats1) self.stds = np.array(stats2) self.zs = np.array(multipliers) def __str__(self) -> str: if self.split_pnts is not None: return f'HistoryNormal({self.split_pnts}, means={format_float_list(self.means)}, stds={format_float_list(self.stds)}, zs={format_float_list(self.zs)})' else: return f'HistoryNormal(mean={self.means[0]:.3f}, std={self.stds[0]:.3f}, z={self.zs[0]:.3f})' def calc_history_normal_behavior_for_cluster(subs, collection_coherence_score, filter_config, percentile_threshold=HISTORY_NORMAL_PERCENTILE_THRESHOLD): def is_too_short(): #TODO: return subs[0].length <= 10 or subs[0].duration <= timedelta(hours=4) sub_values_all, _ = pack_sub_values_with_filter(subs, 0) # pack values of subsequence w/o filtering sub_filtered_values, percentiles = pack_sub_values_with_filter(subs, percentile_threshold) for (sub, percentile) in zip(subs, percentiles): sub.score_percentile = percentile # set this value here, to show in plot later if is_too_short() : # no need to split return HistoryNormal(sub_filtered_values, collection_coherence_score, filter_config, sub_values_all=sub_values_all) sub_len = subs[0].length sub_hour = int(subs[0].duration.total_seconds() // 3600) # segment duration in hour one_hour_len = sub_len // sub_hour # number of points for one hour duration if one_hour_len < SPLIT_DELTA: split_delta = SPLIT_DELTA // one_hour_len + 1 else: split_delta = 1 (means, stds) = ([], []) for i in range(0, sub_hour, split_delta): piece_data = sub_filtered_values[:, i * one_hour_len : min(sub_len, (i + split_delta) * one_hour_len)] clip = CLIP_LEFT if i==0 else CLIP_RIGHT if i + split_delta >= sub_len else '' mean, std, _ = _calc_segment_statistics(piece_data, collection_coherence_score, filter_config, calc_multiplier=False, clip=clip) means.append(np.mean(mean)) stds.append(std) split_pnts = [] for i in range(1, len(means)): mean_diff = np.absolute(means[i] - means[i-1]) std = min(stds[i], stds[i-1]) if mean_diff > std * 0.3: split_pnts.append(i) if len(split_pnts) > 0: return HistoryNormal(sub_filtered_values, collection_coherence_score, filter_config, split_pnts=[i * split_delta for i in split_pnts], one_hour_len=one_hour_len, sub_values_all=sub_values_all) else: # no splitting return HistoryNormal(sub_filtered_values, collection_coherence_score, filter_config, sub_values_all=sub_values_all) def calc_history_normal_behavior(df, tp, filter_config): subs, label_method = subsequence_by_time_policy(df, tp) history_normal_dict = {} for label, subs_group in list(sub_sequences_groupby(subs, label_method).items()): history_normal = calc_history_normal_behavior_for_cluster(subs_group, tp.score, filter_config) logger.debug(f'{str(history_normal)}') history_normal_dict[label] = history_normal return history_normal_dict, subs, label_method def itsi_thresholding(df, history_normal_dict, subs, label_method, clip_lower=False): # avoid possible partial subsequence at the beginning and the end, to reduce FP if subs[0].length < subs[1].length: subs = subs[1:] if subs[-1].length < subs[-2].length: subs = subs[:-1] subs_total_len = sum([s.length for s in subs]) # subs may have different length due to possible missing values bnd_up = np.empty(subs_total_len) bnd_low = np.empty(subs_total_len) edge_mask = np.empty(subs_total_len, dtype=int) # use edge_mask to reduce FP at segments' edges edge_length = get_edge_length() idx = 0 for sub in subs: sub_label = label_method(sub) history_normal = history_normal_dict[sub_label] threshold = get_thresholds_from_history(history_normal, sub.length) bnd_up[idx : idx + sub.length] = threshold[0] bnd_low[idx : idx + sub.length] = threshold[1] if sub.length > 2 * edge_length: edge_mask[idx : idx + sub.length] = np.concatenate(( np.zeros(edge_length, dtype=int), np.ones(sub.length - 2*edge_length, dtype=int), np.zeros(edge_length, dtype=int))) else: edge_mask[idx : idx + sub.length] = np.zeros(sub.length, dtype=int) idx += sub.length head_len = sum(df.index < subs[0].start_time) tail_len = df.shape[0] - head_len - bnd_up.shape[0] df_head = np.array(df[COL_VALUE][:head_len]) df_tail = np.array(df[COL_VALUE][-tail_len:]) if clip_lower: bnd_low = np.clip(bnd_low, df[COL_VALUE].min(), None) df[COL_BND_LOW] = np.concatenate(( np.full(head_len, df_head.min()), bnd_low, np.full(tail_len, df_tail.min()))) df[COL_BND_UP] = np.concatenate(( np.full(head_len, df_head.max()), bnd_up, np.full(tail_len, df_tail.max()))) edge_mask = np.concatenate(( np.zeros(head_len, dtype=int), edge_mask, np.zeros(tail_len, dtype=int))) df[COL_EDGE_MASK] = edge_mask return df def get_thresholds_from_history(history_normal, length): def _get_up_low_bnds(idx): variation = history_normal.stds[idx] * history_normal.zs[idx] mid = history_normal.means[idx] return mid + variation, mid - variation if history_normal.split_pnts is not None: bnd_up = np.empty(length) bnd_low = np.empty(length) left = 0 for i, split_pnt in enumerate(history_normal.split_pnts): right = split_pnt * history_normal.one_hour_len (up, low) = _get_up_low_bnds(i) bnd_up[left : right] = up bnd_low[left : right] = low left = right if left < length: (up, low) = _get_up_low_bnds(-1) bnd_up[left :] = up bnd_low[left :] = low else: (up, low) = _get_up_low_bnds(0) bnd_up = np.full(length, up) bnd_low = np.full(length, low) return bnd_up, bnd_low