from functools import partial

import numpy as np

from util.data_prepare import DEFAULT_Z, EPSILON
from util import setup_logging
from constants import CONSTANT_TIMESERIES_SENSITIVITY_DICT, SensitivityLevelConstants

logger = setup_logging.get_logger()

# Supported threshold directions.
THR_DIR_BOTH = "both"
THR_DIR_UP = "upper"
THR_DIR_LO = "lower"
THR_DIR_AUTO = "auto"

# Multipliers applied to the medium threshold to derive the stricter levels.
HIGH_THRESHOLD_ZSCORE_MULTIPLIER = 1.2
CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER = 1.4

# Keys used in the threshold result dictionaries.
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
NORMAL = "normal"

NO_RECOMMEND_NOT_ENOUGH_DATA = "NO_RECOMMEND_NOT_ENOUGH_DATA"


class ThresholdGenerator:
    """
    Generate cascading (critical / high / medium / normal) thresholds from a base z-score.

    Thresholds are expressed either as z-scores (``use_static=False``) or as actual
    boundary values computed from ``mean`` and ``stdev`` (``use_static=True``).
    """

    def __init__(self, mean, stdev, threshold_rounding, threshold_direction, non_negative, use_static=False):
        """
        Parameters:
        - mean, stdev: Statistics used to convert between values and z-scores.
        - threshold_rounding (int): Number of decimal digits kept in thresholds.
        - threshold_direction (str): One of the THR_DIR_* constants.
        - non_negative (bool): Enforce a non-negative lower bound on thresholds.
        - use_static (bool): Emit actual boundary values instead of z-scores.
        """
        self.mean = mean
        self.stdev = stdev
        self.use_static = use_static
        self.non_negative = non_negative
        self.threshold_rounding = threshold_rounding
        # Rounding function pre-bound to the rounding digits and the non-negative flag.
        self.rounding_func = partial(self._compute_rounding, ndigits=threshold_rounding, non_negative=non_negative)
        # Threshold computation pre-bound to the use_static flag.
        self.compute_thres = partial(self._compute_threshold, use_static=use_static)
        self.threshold_direction = threshold_direction

    def _compute_rounding(self, value, ndigits, non_negative):
        """
        Compute the rounding of a given value based on the number of digits and a non-negative flag.

        When ``non_negative`` is set and z-scores are being produced (``use_static`` is False)
        the value is rounded up (ceil) at ``ndigits`` decimals; otherwise ``np.round`` is used.

        Returns:
        - float: The rounded value.
        """
        if non_negative and not self.use_static:
            multiple = pow(10, ndigits)
            return np.ceil(value * multiple) / multiple
        return np.round(value, decimals=ndigits)

    def _transform_value_to_zscore(self, value):
        """
        Transform a value to its z-score, using a default value if the standard deviation is zero.

        Returns:
        - float: The z-score of the value, rounded using the rounding function.
        """
        if self.stdev == 0.0:
            logger.info(f'Zero Std in transforming value to z-score. Use default Z {DEFAULT_Z:.2f}')
            zscore = DEFAULT_Z
        else:
            zscore = (value - self.mean) / self.stdev
        return self.rounding_func(zscore)

    def _compute_threshold(self, zscore, use_static):
        """
        Key function that computes the threshold from a z-score.

        Parameters:
        - zscore (float): The z-score to be processed.
        - use_static (bool): Return the actual boundary value if True, otherwise the z-score.

        Returns:
        - float: The rounded threshold.
        """
        if use_static:
            value_of_zscore = transfer_zscore_to_boundary(zscore, self.mean, self.stdev)
            return self.rounding_func(value_of_zscore)
        return self.rounding_func(zscore)

    def _cascade_thresholds(self, threshold):
        """
        Simple cascading thresholds for the initial release.

        Returns:
        - tuple: ``(critical, high, medium)`` where medium is ``threshold`` itself and
          the other two are scaled by the module-level multipliers.
        """
        medium = threshold
        high = medium * HIGH_THRESHOLD_ZSCORE_MULTIPLIER
        critical = medium * CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER
        return (critical, high, medium)

    def is_close(self, zscore1, zscore2):
        """
        Determine whether two thresholds map to the same rounded boundary.

        In static mode the inputs are used as they are; otherwise they are first
        converted from z-scores to boundaries using the object's mean and stdev.

        Returns:
        - bool: True if the rounded boundaries are equal.
        """
        if self.use_static:
            boundary1, boundary2 = zscore1, zscore2
        else:
            boundary1 = transfer_zscore_to_boundary(zscore1, self.mean, self.stdev)
            boundary2 = transfer_zscore_to_boundary(zscore2, self.mean, self.stdev)

        rounded1 = self.rounding_func(boundary1)
        rounded2 = self.rounding_func(boundary2)

        # Check if the rounded boundaries are exactly the same
        return abs(rounded1 - rounded2) == 0

    def check_non_negative_overlapping(self, critical, high, medium):
        """
        Check if critical, high, and medium levels are close to each other.

        Parameters:
        - critical, high, medium: Threshold values; high/medium are either scalars or lists.

        Returns:
        - bool: True if the levels are close, False otherwise (including the mixed
          scalar/list case).
        """
        high_is_list = isinstance(high, list)
        medium_is_list = isinstance(medium, list)

        if not high_is_list and not medium_is_list:
            # Scalar case for single direction
            return self.is_close(critical, high) and self.is_close(high, medium)
        if high_is_list and medium_is_list:
            # List case for both direction: only the first (upper) elements are compared
            return self.is_close(critical, high[0]) and self.is_close(high[0], medium[0])
        return False

    def converting_negative_to_zero(self, res, bottom):
        """
        Clamp every threshold in ``res`` to be at least ``bottom`` (in place).

        Returns:
        - dict: The same ``res`` dictionary with clamped values.
        """
        for key, values in res.items():
            if isinstance(values, list):
                # List case for both direction (THR_DIR_BOTH)
                res[key] = [max(value, bottom) for value in values]
            else:
                # Scalar case for single direction (THR_DIR_UP, THR_DIR_LO)
                res[key] = max(values, bottom)
        return res

    def filter_out_overlaping_non_negative(self, res, bottom):
        """
        Remove HIGH/MEDIUM entries that collapsed onto each other or onto ``bottom``.

        Returns:
        - dict: The same ``res`` dictionary after filtering (modified in place).
        """

        def remove_if_match_bottom(key):
            """
            Helper function to remove values overlapping bottom.

            There are only two values in HIGH and MEDIUM list [upper_bound, lower_bound].
            If both of them match bottom, we remove the field from res.
            If only lower bound matches bottom, we only remove lower bound;
            otherwise do nothing.
            """
            if res[key][0] == bottom and res[key][1] == bottom:
                res.pop(key)
            elif res[key][1] == bottom:
                res[key].pop()

        high, medium = res.get(HIGH), res.get(MEDIUM)

        # Use explicit None checks: a threshold of exactly 0.0 is a valid value and
        # must not be treated as "missing".
        if high is not None and medium is not None and not isinstance(high, list):
            # Handle cases where HIGH and MEDIUM are floats
            if self.is_close(high, medium):
                res.pop(MEDIUM)
            return res

        # Handle cases where HIGH and MEDIUM are lists
        if high and isinstance(high, list):
            remove_if_match_bottom(HIGH)
        if medium and isinstance(medium, list):
            remove_if_match_bottom(MEDIUM)
        return res

    def filter_out_overlaping(self, res):
        """
        Remove duplicated values from ``res``, ensuring "critical" is always preserved.

        If "normal" is present, it is also preserved. Keys whose list becomes empty
        are dropped.

        Returns:
        - dict: A new dictionary with duplicates removed, ordered
          critical, high, medium, normal.
        """
        # Always preserve "critical" and "normal"
        unique_values = {res[CRITICAL]}
        filtered_result = {CRITICAL: res[CRITICAL]}
        if NORMAL in res:
            unique_values.add(res[NORMAL])
            filtered_result[NORMAL] = res[NORMAL]

        for key in [HIGH, MEDIUM]:
            if key not in res:
                continue
            value = res[key]
            if isinstance(value, list):
                # Remove duplicates from lists
                filtered_list = []
                for v in value:
                    if v not in unique_values:
                        filtered_list.append(v)
                        unique_values.add(v)
                if filtered_list:  # Only keep non-empty lists
                    filtered_result[key] = filtered_list
            elif value not in unique_values:
                # Add scalar values if not duplicated
                filtered_result[key] = value
                unique_values.add(value)

        # keep the original order
        return {k: filtered_result[k] for k in [CRITICAL, HIGH, MEDIUM, NORMAL] if k in filtered_result}

    def threshold_error_checking(self, res):
        """
        Drop HIGH/MEDIUM entries whose ordering relative to the stricter level is inverted.

        high > critical and medium > high (or the mirrored relation for the lower
        direction) can happen when the variance is small and the mean is slightly
        smaller than 0, which creates numerical error; such entries are filtered out.

        Returns:
        - dict: The same ``res`` dictionary after filtering (modified in place).
        """

        def upper_bound(value):
            # high and medium can be list, float, or None; for a list only the first
            # (upper) element is checked, and an empty list is treated as missing.
            if isinstance(value, list):
                return value[0] if value else None
            return value

        critical = res.get(CRITICAL)
        high = upper_bound(res.get(HIGH))
        medium = upper_bound(res.get(MEDIUM))

        # Explicit None checks so that a threshold of exactly 0.0 is still validated.
        if self.threshold_direction == THR_DIR_LO:
            # case of THR_DIR_LO: levels are expected to increase from critical to medium
            if high is not None and high < critical:
                res.pop(HIGH)
            if medium is not None and high is not None and medium < high:
                res.pop(MEDIUM)
        else:
            # case of THR_DIR_BOTH and THR_DIR_UP
            if high is not None and high > critical:
                res.pop(HIGH)
            if medium is not None and high is not None and medium > high:
                res.pop(MEDIUM)
        return res

    def non_negative_filter(self, res):
        """
        Apply non-negative enforcement and adjust threshold values accordingly.

        Parameters:
        - res (dict): May contain CRITICAL, HIGH, MEDIUM and NORMAL entries; HIGH and
          MEDIUM can be scalars or lists.

        Returns:
        - dict: The threshold levels after non-negative enforcement.
        """
        # Determine the non-negative lower bound: 0 in static mode, otherwise the z-score of 0
        bottom = self.rounding_func(0.0 if self.use_static else self._transform_value_to_zscore(0.0))
        # converting all thresholds below lower bound (bottom) to the lower bound
        res = self.converting_negative_to_zero(res, bottom)
        return self.filter_out_overlaping_non_negative(res, bottom)

    def final_threshold_filter(self, critical, high, medium, normal=None):
        """
        Filter threshold levels based on closeness and non-negative threshold requirements.

        Parameters:
        - critical, high, medium, normal: Threshold values, which can be either lists
          or scalar values. ``normal`` is omitted from the result only when it is None.

        Returns:
        - dict: A dictionary containing the filtered threshold levels.
        """
        # We check whether critical, high, medium are close or not first
        if self.check_non_negative_overlapping(critical, high, medium):
            res = {CRITICAL: critical}
        else:
            res = {CRITICAL: critical, HIGH: high, MEDIUM: medium}
        # A normal threshold of 0.0 is valid, so test against None rather than truthiness.
        if normal is not None:
            res[NORMAL] = normal

        # Apply non-negative filter
        if self.non_negative and self.threshold_direction != THR_DIR_LO:
            res = self.non_negative_filter(res)

        # filter out overlapping values
        res = self.filter_out_overlaping(res)

        # Apply error checking
        res = self.threshold_error_checking(res)

        # Ensure critical is larger than normal with small range
        if NORMAL in res and self.is_close(res[CRITICAL], res[NORMAL]):
            res = self.adding_gap(res)
        return res

    def adding_gap(self, res):
        """
        Add a small gap (according to the rounding digits) to the CRITICAL threshold value.

        Returns:
        - dict: The same ``res`` dictionary with CRITICAL adjusted.
        """
        value = pow(10, -self.threshold_rounding)
        # In case of THR_DIR_LO, we need to minus gap, otherwise add gap to CRITICAL
        gap = -value if self.threshold_direction == THR_DIR_LO else value
        # this rounding is necessary for preventing numerical error
        res[CRITICAL] = self.rounding_func(res[CRITICAL] + gap)
        return res

    def process(self, value):
        """
        Main process of generating thresholds according to the threshold direction.

        Parameters:
        - value (float): The input value to generate thresholds from.

        Returns:
        - str: A formatted string representing the computed thresholds for the
          levels that survived filtering.
        """
        # Compute the initial thresholds for critical, high, and medium levels using the cascade method
        critical, high, medium = self._cascade_thresholds(value)

        if self.threshold_direction == THR_DIR_BOTH:
            critical_final = self.compute_thres(critical)
            high_final = [self.compute_thres(i) for i in [high, -critical]]
            medium_final = [self.compute_thres(i) for i in [medium, -high]]
            normal_final = self.compute_thres(-medium)
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final, normal_final)
            return format_thresholds(filtered_res)

        if self.threshold_direction == THR_DIR_UP:
            critical_final, high_final, medium_final = [
                self.compute_thres(thres) for thres in [critical, high, medium]
            ]
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)
            # THR_DIR_UP direction does not need normal
            filtered_res.pop(NORMAL, None)
            return format_thresholds(filtered_res)

        # This is the case of the threshold direction is THR_DIR_LO.
        # NOTE(review): any direction other than BOTH/UP (e.g. THR_DIR_AUTO) also lands
        # here - presumably "auto" is resolved by the caller; confirm.
        critical_final = self.compute_thres(-critical)
        high_final = self.compute_thres(-high)
        medium_final = self.compute_thres(-medium)
        filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)

        # THR_DIR_LO direction does not have critical and its names also need a shift:
        # critical -> high, high -> medium, medium -> normal, e.g.
        # {'high': critical_final, 'medium': high_final, 'normal': medium_final}.
        # HIGH and/or MEDIUM may have been removed by the filter, so only the levels
        # that are still present are shifted (indexing them directly raised KeyError).
        shifted_res = {
            new_key: filtered_res[old_key]
            for old_key, new_key in ((CRITICAL, HIGH), (HIGH, MEDIUM), (MEDIUM, NORMAL))
            if old_key in filtered_res
        }
        return format_thresholds(shifted_res)


def output_thresholds_dict(
    threshold, mean, stdev, threshold_rounding, threshold_direction, non_negative=True, use_static=False
):
    """
    Build a ThresholdGenerator from the given settings and return its formatted thresholds.

    Returns:
    - str: The string produced by ``ThresholdGenerator.process(threshold)``.
    """
    threshold_generator = ThresholdGenerator(
        mean=mean,
        stdev=stdev,
        threshold_rounding=threshold_rounding,
        threshold_direction=threshold_direction,
        non_negative=non_negative,
        use_static=use_static,
    )
    return threshold_generator.process(threshold)


def calc_constant_time_series_thresholds(max_value, min_value, median, threshold_rounding, threshold_direction,
                                         filter_config_choice, non_negative):
    """
    Calculate threshold boundaries for "constant time series".

    When the input data is constant/near constant, the stdev is too small to provide a useful
    threshold. This method's return value is calculated based on a static threshold using
    max/min values, median, sensitivity level, and direction.
    The use_static flag should be marked as "True" when using this function.

    Parameters:
    max_value (float): The maximum value used for the upper bound calculation.
    min_value (float): The minimum value used for the lower bound calculation.
    median (float): The median value used to adjust the bounds based on sensitivity.
    threshold_rounding (int): Number of decimal places to round the threshold values.
    threshold_direction (str): Direction of the threshold. Can be THR_DIR_BOTH, THR_DIR_UP, or THR_DIR_LO.
    filter_config_choice (str): Sensitivity level configuration. Chooses the adjustment factor from a constants map.
    non_negative (bool): If True, ensures the lower bound is not negative.

    Returns:
    str: A string formatted dictionary containing the calculated threshold(s) based on the direction specified.
    """
    # We are using the "low" sensitivity as the default value.
    default_sensitivity_adjustment = CONSTANT_TIMESERIES_SENSITIVITY_DICT[SensitivityLevelConstants.LOW]
    sensitivity_level = CONSTANT_TIMESERIES_SENSITIVITY_DICT.get(filter_config_choice, default_sensitivity_adjustment)

    # Calculate the upper and lower threshold bounds
    upper_bound = round(max_value + median * sensitivity_level, threshold_rounding)
    lower_bound = round(min_value - median * sensitivity_level, threshold_rounding)

    # Apply non-negative filter if needed
    if non_negative:
        lower_bound = max(0, lower_bound)

    # Return thresholds based on the threshold direction; only the CRITICAL level is provided.
    if threshold_direction == THR_DIR_UP:
        return f"{{'{CRITICAL}': {upper_bound}}}"
    elif threshold_direction == THR_DIR_LO:
        return f"{{'{CRITICAL}': {lower_bound}}}"
    else:
        # if user does not choose any direction, the default is both direction,
        # this case covers "both" and "auto" directions
        return f"{{'{CRITICAL}': {[upper_bound, lower_bound]}}}"


def parse_res_str(res_str):
    """
    Parse a string representing a dictionary back to an actual dictionary.

    Only scalar values are supported: the string is split on commas, so list values
    (as produced for the "both" direction) cannot be parsed by this function.

    Parameters:
    - res_str (str): A string representation of a dictionary with key-value pairs.
                     Example: "'critical': 1.0, 'high': 2.0, 'medium': 3.0"

    Returns:
    - dict: The parsed dictionary with keys as strings and values as floats.
            Example: {'critical': 1.0, 'high': 2.0, 'medium': 3.0}
            An empty dictionary string ("{}" or "") yields {}.
    """
    # Strip the outer curly braces
    body = res_str.strip("{}")
    if not body.strip():
        # Nothing between the braces: avoid indexing into an empty key/value split
        return {}
    # Split by commas into key-value pairs, then by the colon into key and value
    pairs = [item.strip(",").split(":") for item in body.split(",")]
    # Strip whitespace and quotes from keys, and convert values to floats
    return {pair[0].strip(" '"): float(pair[1]) for pair in pairs}


def confidence_description(score):
    """Map a numeric confidence score to "High", "Medium", "Low" or "No Pattern"."""
    if score > 0.6:
        return "High"
    elif score > 0.4:
        return "Medium"
    elif score > 0.1:
        return "Low"
    else:
        return "No Pattern"


def transfer_zscore_to_boundary(zscore, mean, stdev):
    """Compute the actual boundary (value) according to zscore, mean and stdev."""
    return zscore * stdev + mean


def _to_builtin(value):
    """Convert NumPy scalars (also inside lists) to builtin Python numbers for stable formatting."""
    if isinstance(value, list):
        return [_to_builtin(item) for item in value]
    if isinstance(value, np.generic):
        return value.item()
    return value


def format_thresholds(result):
    """
    Format the result dictionary into the specified string format, including only present keys.

    NumPy scalars are converted to builtin numbers first: list elements are rendered
    with ``repr``, which for NumPy >= 2 scalars would produce ``np.float64(...)``
    instead of the plain number.

    Parameters:
    - result (dict): A dictionary containing threshold levels.

    Returns:
    - str: A formatted string with the specified threshold levels.
    """
    formatted_items = [f"'{k}': {_to_builtin(v)}" for k, v in result.items()]
    # Join all formatted items with commas and enclose in braces
    return f"{{{', '.join(formatted_items)}}}"