You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
436 lines
19 KiB
436 lines
19 KiB
from functools import partial
|
|
from util.data_prepare import DEFAULT_Z, EPSILON
|
|
import numpy as np
|
|
from util import setup_logging
|
|
from constants import CONSTANT_TIMESERIES_SENSITIVITY_DICT, SensitivityLevelConstants
|
|
# Module-level logger obtained from the project's setup_logging helper; ThresholdGenerator
# uses it to report the zero-standard-deviation fallback when computing z-scores.
logger = setup_logging.get_logger()
|
|
|
|
# Threshold directions understood by the threshold generators in this module.
THR_DIR_BOTH = "both"
THR_DIR_UP = "upper"
THR_DIR_LO = "lower"
THR_DIR_AUTO = "auto"

# Multipliers applied to the medium threshold to derive the high and critical thresholds.
HIGH_THRESHOLD_ZSCORE_MULTIPLIER = 1.2
CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER = 1.4

# Severity level names, used as the keys of every threshold dictionary.
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
NORMAL = "normal"

# Marker string; not referenced by the code visible in this file.
# NOTE(review): presumably reported by callers when there is too little data to recommend thresholds.
NO_RECOMMEND_NOT_ENOUGH_DATA = "NO_RECOMMEND_NOT_ENOUGH_DATA"
|
|
|
|
class ThresholdGenerator:
    """
    Generate cascading alert thresholds (critical / high / medium / normal) from one base threshold.

    The base threshold given to `process` is used as the medium level; the high and critical
    levels are derived from it with HIGH_THRESHOLD_ZSCORE_MULTIPLIER and
    CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER. With `use_static=False` the emitted thresholds are
    rounded z-scores; with `use_static=True` they are boundary values (zscore * stdev + mean).

    Parameters:
    - mean, stdev: Statistics used to convert between values and z-scores.
    - threshold_rounding (int): Number of decimal digits kept in the emitted thresholds.
    - threshold_direction (str): THR_DIR_BOTH, THR_DIR_UP or THR_DIR_LO. `process` sends every
      value other than THR_DIR_BOTH / THR_DIR_UP through its lower-direction branch.
    - non_negative (bool): When True, z-score thresholds are rounded up instead of to nearest,
      and for directions other than THR_DIR_LO thresholds are clamped at the threshold that
      corresponds to the value zero.
    - use_static (bool): Emit boundary values instead of z-scores.
    """

    def __init__(self, mean, stdev, threshold_rounding, threshold_direction, non_negative, use_static=False):
        self.mean = mean
        self.stdev = stdev
        self.use_static = use_static
        self.non_negative = non_negative
        self.threshold_rounding = threshold_rounding
        # Rounding function pre-bound with the rounding digits and the non_negative flag.
        self.rounding_func = partial(self._compute_rounding, ndigits=threshold_rounding,
                                     non_negative=non_negative)
        # Threshold computation pre-bound with the use_static flag.
        self.compute_thres = partial(self._compute_threshold,
                                     use_static=use_static)
        self.threshold_direction = threshold_direction

    def _compute_rounding(self, value, ndigits, non_negative):
        """
        Compute the rounding of a given value based on the number of digits and a non-negative flag.

        In z-score mode with `non_negative` set, the value is rounded up (ceiling) at `ndigits`
        decimals; in every other case it is rounded to the nearest value with `ndigits` decimals.

        Returns:
        - float: The rounded value.
        """
        if non_negative and not self.use_static:
            multiple = pow(10, ndigits)
            return np.ceil(value * multiple) / multiple
        else:
            return np.round(value, decimals=ndigits)

    def _transform_value_to_zscore(self, value):
        """
        Transform a value to its z-score, using a default value if the standard deviation is zero.

        Returns:
        - float: The z-score of the value, rounded using the rounding function.
        """
        if self.stdev == 0.0:
            logger.info(f'Zero Std in transforming value to z-score. Use default Z {DEFAULT_Z:.2f}')
            zscore = DEFAULT_Z
        else:
            zscore = (value - self.mean) / self.stdev

        return self.rounding_func(zscore)

    def _compute_threshold(self, zscore, use_static):
        """
        Key function that computes the threshold based on zscore and the use_static requirement.

        Parameters:
        - zscore (float): The z-score to be processed.
        - use_static (bool): Return the rounded boundary value (zscore * stdev + mean) if True,
          otherwise return the rounded z-score.

        Returns:
        - float: The computed threshold.
        """
        if use_static:
            value_of_zscore = transfer_zscore_to_boundary(zscore, self.mean, self.stdev)
            value = self.rounding_func(value_of_zscore)
        else:
            value = self.rounding_func(zscore)
        return value

    def _cascade_thresholds(self, threshold):
        """
        Simple cascading thresholds for the initial release.

        Returns:
        - tuple: (critical, high, medium), where medium is `threshold` itself and high / critical
          are medium scaled by the module-level multipliers.
        """
        medium = threshold
        high = medium * HIGH_THRESHOLD_ZSCORE_MULTIPLIER
        critical = medium * CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER

        return (critical, high, medium)

    def is_close(self, zscore1, zscore2):
        """
        Determines if two thresholds are close to each other in significant digits according
        to their boundaries.

        In static mode the inputs are used as they are; in z-score mode they are first
        converted to boundary values with the object's mean and standard deviation.

        Returns:
        - bool: True if the rounded boundaries of zscore1 and zscore2 are close (equal).
        """
        boundary1 = zscore1 if self.use_static else transfer_zscore_to_boundary(zscore1, self.mean, self.stdev)
        boundary2 = zscore2 if self.use_static else transfer_zscore_to_boundary(zscore2, self.mean, self.stdev)

        # Round the boundary values using the specified rounding function
        rounded1 = self.rounding_func(boundary1)
        rounded2 = self.rounding_func(boundary2)

        # Check if the rounded boundaries are exactly the same
        return abs(rounded1 - rounded2) == 0

    def check_non_negative_overlapping(self, critical, high, medium):
        """
        Checks if critical, high, and medium levels are close to each other.

        Parameters:
        - critical, high, medium: Threshold values; high and medium are either both scalars or
          both lists (any other combination is reported as not close).

        Returns:
        - bool: True if the levels are close, False otherwise.
        """
        if not isinstance(high, list) and not isinstance(medium, list):
            # Scalar case for single direction: Check closeness for critical, high, and medium
            return self.is_close(critical, high) and self.is_close(high, medium)
        elif isinstance(high, list) and isinstance(medium, list):
            # List case for both direction: only the first elements (upper bounds) are compared
            return self.is_close(critical, high[0]) and self.is_close(high[0], medium[0])
        return False

    def converting_negative_to_zero(self, res, bottom):
        """
        Raise every threshold in `res` that is below `bottom` up to `bottom`.

        `res` is updated in place (only existing keys are reassigned) and also returned.
        """
        for key, values in res.items():
            if isinstance(values, list):
                # List case for both direction(THR_DIR_BOTH)
                new_values = [max(value, bottom) for value in values]
            else:
                # Scalar case for single direction(THR_DIR_UP, THR_DIR_LO)
                new_values = max(values, bottom)
            res[key] = new_values
        return res

    def filter_out_overlaping_non_negative(self, res, bottom):
        """
        Drop HIGH / MEDIUM entries that collapsed onto each other or onto `bottom` after clamping.

        Scalar HIGH and MEDIUM: MEDIUM is removed when it is close to HIGH.
        List HIGH / MEDIUM ([upper_bound, lower_bound]): bounds equal to `bottom` are removed.
        `res` is modified in place and returned.
        """
        def remove_if_match_bottom(key):
            """Helper function to remove values overlapping bottom.
            There are only two values in HIGH and MEDIUM list [upper_bound, lower_bound]
            If both of them match bottom, we remove the field from res
            If only lower bound match bottom, we only remove lower bound
            otherwise do nothing
            """
            if res[key][0] == bottom and res[key][1] == bottom:
                res.pop(key)
            elif res[key][1] == bottom:
                res[key].pop()

        # Get HIGH and MEDIUM field, if they exists in res
        high, medium = res.get(HIGH), res.get(MEDIUM)
        # Compare against None explicitly: a threshold of exactly 0.0 is a valid value and must
        # not be mistaken for a missing one.
        if high is not None and medium is not None and not isinstance(high, list):
            # Handle cases where HIGH and MEDIUM are floats
            if self.is_close(high, medium):
                res.pop(MEDIUM)
            return res
        # Handle cases where HIGH and MEDIUM are lists
        if isinstance(high, list) and high:
            remove_if_match_bottom(HIGH)
        if isinstance(medium, list) and medium:
            remove_if_match_bottom(MEDIUM)
        return res

    def filter_out_overlaping(self, res):
        """
        Removes duplicated values from the dictionary `res`, ensuring "critical" is
        always preserved. If "normal" is present, it is also preserved.
        Keys with empty lists are dropped.

        Returns:
            dict: A new dictionary with duplicates removed, ordered critical, high, medium,
            normal, keeping "critical" and "normal" unchanged if present.
        """
        # Always preserve "critical" and "normal"
        unique_values = {res[CRITICAL]}
        filtered_result = {CRITICAL: res[CRITICAL]}
        if NORMAL in res:
            unique_values.add(res[NORMAL])
            filtered_result[NORMAL] = res[NORMAL]
        for key in [HIGH, MEDIUM]:
            if key not in res:
                continue
            value = res[key]
            if isinstance(value, list):
                # Remove duplicates from lists
                filtered_list = []
                for v in value:
                    if v not in unique_values:  # Only include if it's unique
                        filtered_list.append(v)
                        unique_values.add(v)
                if filtered_list:  # Only keep non-empty lists
                    filtered_result[key] = filtered_list
            elif value not in unique_values:
                # Add scalar values if not duplicated
                filtered_result[key] = value
                unique_values.add(value)
        # keep the original order
        return {k: filtered_result[k] for k in [CRITICAL, HIGH, MEDIUM, NORMAL] if k in filtered_result}

    def threshold_error_checking(self, res):
        """
        Remove HIGH / MEDIUM entries whose ordering relative to CRITICAL / HIGH is inverted.

        Such inversions can happen when the variance is small and the mean is slightly smaller
        than 0, which creates numerical error. `res` is modified in place and returned.
        """
        def upper_bound_of(value):
            # high and medium can be list, float, or None; for a list only the first element
            # (the upper bound) is checked, and an empty list counts as missing.
            if isinstance(value, list):
                return value[0] if value else None
            return value

        critical = res.get(CRITICAL)
        high = upper_bound_of(res.get(HIGH))
        medium = upper_bound_of(res.get(MEDIUM))
        # Presence is tested with "is not None" so that a threshold of exactly 0.0 is still checked.
        if self.threshold_direction == THR_DIR_LO:
            # case of THR_DIR_LO: critical is expected to be the lowest value
            if high is not None and high < critical:
                res.pop(HIGH)
            if medium is not None and high is not None and medium < high:
                res.pop(MEDIUM)
        else:  # case of THR_DIR_BOTH and THR_DIR_UP: critical is expected to be the highest value
            if high is not None and high > critical:
                res.pop(HIGH)
            if medium is not None and high is not None and medium > high:
                res.pop(MEDIUM)
        return res

    def non_negative_filter(self, res):
        """
        Applies non-negative enforcement and adjusts threshold values accordingly.

        Parameters:
        - res dict: contains following field:
            CRITICAL: [str, float],
            HIGH: [str, float or list],
            MEDIUM: [str, float or list],
            NORMAL: [str, float or None]
        Returns:
        - dict: A dictionary containing the non-negative enforcement adjusted threshold levels.
        """
        # Determine the non-negative lower bound: 0.0 itself in static mode, otherwise the
        # z-score that corresponds to the value 0.0
        bottom = self.rounding_func(0.0 if self.use_static else self._transform_value_to_zscore(0.0))
        # converting all thresholds below lower bound (bottom) to the lower bound
        res = self.converting_negative_to_zero(res, bottom)

        return self.filter_out_overlaping_non_negative(res, bottom)

    def final_threshold_filter(self, critical, high, medium, normal=None):
        """
        Filters threshold levels based on closeness and non-negative threshold requirements.

        Parameters:
        - critical, high, medium, normal: Threshold values, which can be either lists or scalar values.
          `normal` is omitted from the result only when it is None.

        Returns:
        - dict: A dictionary containing the filtered threshold levels.
        """
        # We check whether critical, high, medium are close or not first
        if self.check_non_negative_overlapping(critical, high, medium):
            res = {CRITICAL: critical}
        else:
            res = {CRITICAL: critical, HIGH: high, MEDIUM: medium}
        # A normal threshold of exactly 0.0 is valid, so test for None rather than truthiness.
        if normal is not None:
            res[NORMAL] = normal
        # Apply non-negative filter
        if self.non_negative and self.threshold_direction != THR_DIR_LO:
            res = self.non_negative_filter(res)
        # filter out overlapping values
        res = self.filter_out_overlaping(res)
        # Apply error checking
        res = self.threshold_error_checking(res)
        # Ensure critical is larger than normal with small range
        if NORMAL in res and self.is_close(res[CRITICAL], res[NORMAL]):
            res = self.adding_gap(res)
        return res

    def adding_gap(self, res):
        """
        Adds a small gap (according to the rounding digits) to the CRITICAL threshold value.
        """
        value = pow(10, -self.threshold_rounding)
        # In case of THR_DIR_LO, we need to minus gap, otherwise add gap to CRITICAL
        gap = -value if self.threshold_direction == THR_DIR_LO else value
        # this rounding is necessary for preventing numerical error
        res[CRITICAL] = self.rounding_func(res[CRITICAL] + gap)
        return res

    def process(self, value):
        """
        Main process of generate threshold according to the threshold direction

        Parameters:
        - value (float): The input value to generate thresholds from.

        Returns:
        - str: A formatted string representing the computed thresholds for different levels (critical, high, medium, normal).
        """
        # Compute the initial thresholds for critical, high, and medium levels using the cascade method
        critical, high, medium = self._cascade_thresholds(value)

        if self.threshold_direction == THR_DIR_BOTH:
            # HIGH and MEDIUM carry [upper_bound, lower_bound]; NORMAL is the innermost lower bound
            critical_final = self.compute_thres(critical)
            high_final = [self.compute_thres(i) for i in [high, -critical]]
            medium_final = [self.compute_thres(i) for i in [medium, -high]]
            normal_final = self.compute_thres(-medium)
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final, normal_final)
            return format_thresholds(filtered_res)
        elif self.threshold_direction == THR_DIR_UP:
            res = [self.compute_thres(thres) for thres in [critical, high, medium]]
            critical_final, high_final, medium_final = res
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)
            # THR_DIR_UP direction does not need normal
            filtered_res.pop(NORMAL, None)
            return format_thresholds(filtered_res)
        else:
            # This is the case of the threshold direction is THR_DIR_LO
            critical_final = self.compute_thres(-critical)
            high_final = self.compute_thres(-high)
            medium_final = self.compute_thres(-medium)
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)
            # THR_DIR_LO direction does not have critical and its name also need shift, the format/field should be
            # example: f"{{'{HIGH}': {critical_final}, '{MEDIUM}': {high_final}, '{NORMAL}': {medium_final}}}"
            # HIGH and/or MEDIUM may have been removed by the filters above, so only the levels
            # that survived are shifted; a missing level is simply left out of the output.
            shifted_res = {HIGH: filtered_res[CRITICAL]}
            if HIGH in filtered_res:
                shifted_res[MEDIUM] = filtered_res[HIGH]
            if MEDIUM in filtered_res:
                shifted_res[NORMAL] = filtered_res[MEDIUM]
            return format_thresholds(shifted_res)
|
|
|
|
def output_thresholds_dict(
    threshold, mean, stdev, threshold_rounding, threshold_direction, non_negative=True, use_static=False
):
    """
    Build a ThresholdGenerator from the given settings and run it on `threshold`.

    Parameters:
    - threshold: Base threshold handed to ThresholdGenerator.process.
    - mean, stdev, threshold_rounding, threshold_direction, non_negative, use_static:
      Forwarded unchanged to the ThresholdGenerator constructor.

    Returns:
    - str: The formatted thresholds string produced by ThresholdGenerator.process.
    """
    generator_settings = {
        "mean": mean,
        "stdev": stdev,
        "threshold_rounding": threshold_rounding,
        "threshold_direction": threshold_direction,
        "non_negative": non_negative,
        "use_static": use_static,
    }
    return ThresholdGenerator(**generator_settings).process(threshold)
|
|
|
|
def calc_constant_time_series_thresholds(max_value, min_value, median, threshold_rounding, threshold_direction,
                                         filter_config_choice, non_negative):
    """
    Compute static threshold boundaries for a constant / near-constant time series.

    For such data the standard deviation is too small to give a useful z-score threshold, so the
    boundaries are derived from the observed max/min widened by `median * sensitivity`.

    Parameters:
        max_value (float): Maximum value; base of the upper bound.
        min_value (float): Minimum value; base of the lower bound.
        median (float): Median value, scaled by the sensitivity factor to widen both bounds.
        threshold_rounding (int): Number of decimal places kept in the bounds.
        threshold_direction (str): THR_DIR_UP, THR_DIR_LO, or anything else (treated as both directions).
        filter_config_choice (str): Key into CONSTANT_TIMESERIES_SENSITIVITY_DICT; unknown keys fall
            back to the entry for SensitivityLevelConstants.LOW.
        non_negative (bool): If True, the lower bound is clamped at 0.

    Returns:
        str: A string formatted dictionary with a single CRITICAL entry: the upper bound, the lower
        bound, or the list [upper_bound, lower_bound], depending on the direction.
    """
    # The "low" sensitivity entry is the fallback for unrecognised configuration choices.
    fallback_factor = CONSTANT_TIMESERIES_SENSITIVITY_DICT[SensitivityLevelConstants.LOW]
    factor = CONSTANT_TIMESERIES_SENSITIVITY_DICT.get(filter_config_choice, fallback_factor)

    upper_bound = round(max_value + median * factor, threshold_rounding)
    lower_bound = round(min_value - median * factor, threshold_rounding)
    if non_negative:
        lower_bound = max(0, lower_bound)

    # Pick what is reported under CRITICAL; every direction other than upper/lower
    # (i.e. "both" and "auto") reports both bounds.
    if threshold_direction == THR_DIR_UP:
        reported = upper_bound
    elif threshold_direction == THR_DIR_LO:
        reported = lower_bound
    else:
        reported = [upper_bound, lower_bound]
    return f"{{'{CRITICAL}': {reported}}}"
|
|
|
|
def parse_res_str(res_str):
    """
    Parse a string representing a thresholds dictionary back to an actual dictionary.

    Parameters:
    - res_str (str): A string representation of a dictionary with key-value pairs, with or
                     without the surrounding curly braces. Values are numbers or
                     bracketed lists of numbers.
                     Example: "'critical': 1.0, 'high': 2.0, 'medium': 3.0"
                     Example: "{'critical': 3.0, 'high': [2.5, -3.0]}"

    Returns:
    - dict: The parsed dictionary with keys as strings and values as floats (or lists of floats
            for bracketed values). An empty dictionary string yields {}.
            Example: {'critical': 1.0, 'high': 2.0, 'medium': 3.0}

    Raises:
    - IndexError: If a non-empty entry has no ':' separator.
    - ValueError: If a value cannot be converted to float.
    """
    # Drop surrounding whitespace and the outer curly braces
    body = res_str.strip().strip("{}")

    # Split into key-value entries on commas, ignoring commas nested inside a [...] list value
    entries, current, depth = [], [], 0
    for char in body:
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        if char == "," and depth == 0:
            entries.append("".join(current))
            current = []
        else:
            current.append(char)
    entries.append("".join(current))

    parsed = {}
    for entry in entries:
        if not entry.strip():
            # Empty dictionary ("{}") or a stray comma: nothing to parse
            continue
        pieces = entry.split(":")
        # Strip whitespace and single quotes from the key
        key, raw_value = pieces[0].strip(" '"), pieces[1].strip()
        if raw_value.startswith("[") and raw_value.endswith("]"):
            parsed[key] = [float(item) for item in raw_value[1:-1].split(",") if item.strip()]
        else:
            parsed[key] = float(raw_value)
    return parsed
|
|
|
|
def confidence_description(score):
    """
    Map a numeric confidence score to a textual label.

    The score is compared against strict lower bounds, highest first:
    above 0.6 -> "High", above 0.4 -> "Medium", above 0.1 -> "Low", otherwise "No Pattern".
    """
    labelled_bounds = ((0.6, "High"), (0.4, "Medium"), (0.1, "Low"))
    for lower_bound, label in labelled_bounds:
        if score > lower_bound:
            return label
    return "No Pattern"
|
|
|
|
def transfer_zscore_to_boundary(zscore, mean, stdev):
    """
    Convert a z-score back to the actual boundary value it represents.

    Returns:
    - The value `zscore * stdev + mean`.
    """
    scaled_offset = zscore * stdev
    return scaled_offset + mean
|
|
|
|
def format_thresholds(result):
    """
    Formats the result dictionary into the specified string format, including only present keys.

    Float-like values (including float subclasses such as numpy.float64) are converted to the
    built-in float before formatting. List values are rendered with the repr of their elements,
    so without this conversion a NumPy >= 2.0 scalar inside a list would appear as
    "np.float64(2.5)" instead of "2.5" in the output string.

    Parameters:
    - result (dict): A dictionary containing threshold levels; values are scalars or lists.

    Returns:
    - str: A formatted string with the specified threshold levels,
           e.g. "{'critical': 3.0, 'high': [2.5, -3.0]}".
    """
    def to_builtin(value):
        # Leave anything that is not float-like (ints, strings, None, ...) untouched
        return float(value) if isinstance(value, float) else value

    formatted_items = []
    for k, v in result.items():
        if isinstance(v, list):
            v = [to_builtin(item) for item in v]
        else:
            v = to_builtin(v)
        formatted_items.append(f"'{k}': {v}")

    # Join all formatted items with commas and enclose in braces
    return f"{{{', '.join(formatted_items)}}}"
|