You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

436 lines
19 KiB

from functools import partial
from util.data_prepare import DEFAULT_Z, EPSILON
import numpy as np
from util import setup_logging
from constants import CONSTANT_TIMESERIES_SENSITIVITY_DICT, SensitivityLevelConstants
logger = setup_logging.get_logger()
# Threshold direction identifiers compared against `threshold_direction`
# by the threshold helpers in this module.
THR_DIR_BOTH = "both"
THR_DIR_UP = "upper"
THR_DIR_LO = "lower"
# NOTE(review): "auto" is not compared explicitly anywhere in this part of the
# file; it falls into whichever else-branch a consumer has, and those branches
# differ between consumers -- presumably it is resolved upstream, confirm.
THR_DIR_AUTO = "auto"
# Factors applied to the medium threshold to derive the high and critical
# thresholds (see ThresholdGenerator._cascade_thresholds).
HIGH_THRESHOLD_ZSCORE_MULTIPLIER = 1.2
CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER = 1.4
# Severity level names, used as keys of the threshold result dictionaries.
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
NORMAL = "normal"
# Not referenced in this part of the file; presumably a status marker returned
# by callers when no recommendation can be made -- verify against usage.
NO_RECOMMEND_NOT_ENOUGH_DATA = "NO_RECOMMEND_NOT_ENOUGH_DATA"
class ThresholdGenerator:
    """
    Generate cascading alert thresholds (critical / high / medium / normal) from a base threshold.

    The base threshold is treated as the MEDIUM level; HIGH and CRITICAL are derived from it with
    fixed multipliers. Thresholds are emitted either as z-scores (``use_static=False``) or as
    actual boundary values computed from ``mean`` and ``stdev`` (``use_static=True``), rounded to
    ``threshold_rounding`` digits, optionally clamped to be non-negative, and de-duplicated.
    """

    def __init__(self, mean, stdev, threshold_rounding, threshold_direction, non_negative, use_static=False):
        """
        Parameters:
        - mean, stdev: Statistics used to convert between values and z-scores.
        - threshold_rounding (int): Number of decimal digits kept in the thresholds.
        - threshold_direction (str): One of the THR_DIR_* direction identifiers.
        - non_negative (bool): Whether thresholds must not fall below the value 0.
        - use_static (bool): Emit boundary values if True, z-scores otherwise.
        """
        self.mean = mean
        self.stdev = stdev
        self.use_static = use_static
        self.non_negative = non_negative
        self.threshold_rounding = threshold_rounding
        # Rounding function pre-bound to the configured digits and non-negative flag
        self.rounding_func = partial(self._compute_rounding, ndigits=threshold_rounding,
                                     non_negative=non_negative)
        # Threshold computation pre-bound to the static / z-score output mode
        self.compute_thres = partial(self._compute_threshold,
                                     use_static=use_static)
        self.threshold_direction = threshold_direction

    def _compute_rounding(self, value, ndigits, non_negative):
        """
        Round a value to ``ndigits`` decimal digits.

        In non-negative z-score mode (``non_negative`` and not ``use_static``) the value is rounded
        UP (ceil); otherwise it is rounded to nearest.

        NOTE: the ceil branch is not idempotent under floating point noise: a value already on the
        rounding grid can be pushed to the next step (e.g. 0.07 * 100 == 7.000000000000001).
        Callers must not apply it twice to the same value.

        Returns:
        - float: The rounded value.
        """
        if non_negative and not self.use_static:
            multiple = pow(10, ndigits)
            return np.ceil(value * multiple) / multiple
        else:
            return np.round(value, decimals=ndigits)

    def _transform_value_to_zscore(self, value):
        """
        Transform a value to its z-score, using DEFAULT_Z if the standard deviation is zero.

        Returns:
        - float: The z-score of the value, rounded using the rounding function.
        """
        if self.stdev == 0.0:
            logger.info(f'Zero Std in transforming value to z-score. Use default Z {DEFAULT_Z:.2f}')
            zscore = DEFAULT_Z
        else:
            zscore = (value - self.mean) / self.stdev
        return self.rounding_func(zscore)

    def _compute_threshold(self, zscore, use_static):
        """
        Compute one rounded threshold from a z-score.

        Parameters:
        - zscore (float): The z-score to be processed.
        - use_static (bool): Return the boundary value (zscore * stdev + mean) if True,
          otherwise return the z-score itself.

        Returns:
        - float: The computed, rounded threshold.
        """
        if use_static:
            value_of_zscore = transfer_zscore_to_boundary(zscore, self.mean, self.stdev)
            value = self.rounding_func(value_of_zscore)
        else:
            value = self.rounding_func(zscore)
        return value

    def _cascade_thresholds(self, threshold):
        """
        Derive (critical, high, medium) from the base threshold, which is used as MEDIUM.
        """
        medium = threshold
        high = medium * HIGH_THRESHOLD_ZSCORE_MULTIPLIER
        critical = medium * CRITICAL_THRESHOLD_ZSCORE_MULTIPLIER
        return (critical, high, medium)

    def is_close(self, zscore1, zscore2):
        """
        Tell whether two thresholds map to the same rounded boundary value.

        In static mode the inputs are used as boundaries directly; otherwise they are first
        converted from z-scores to boundaries with the object's mean and standard deviation.

        Returns:
        - bool: True if the rounded boundaries are equal.
        """
        boundary1 = zscore1 if self.use_static else transfer_zscore_to_boundary(zscore1, self.mean, self.stdev)
        boundary2 = zscore2 if self.use_static else transfer_zscore_to_boundary(zscore2, self.mean, self.stdev)
        rounded1 = self.rounding_func(boundary1)
        rounded2 = self.rounding_func(boundary2)
        return abs(rounded1 - rounded2) == 0

    def check_non_negative_overlapping(self, critical, high, medium):
        """
        Check whether critical, high, and medium all collapse onto the same rounded boundary.

        Parameters:
        - critical: Scalar threshold.
        - high, medium: Either both scalars (single direction) or both lists whose first
          element is compared (both directions).

        Returns:
        - bool: True if the levels are close; False otherwise, including for mixed scalar/list input.
        """
        if not isinstance(high, list) and not isinstance(medium, list):
            return self.is_close(critical, high) and self.is_close(high, medium)
        elif isinstance(high, list) and isinstance(medium, list):
            return self.is_close(critical, high[0]) and self.is_close(high[0], medium[0])
        return False

    def converting_negative_to_zero(self, res, bottom):
        """
        Clamp every threshold in ``res`` (scalar or list entries) to be at least ``bottom``.

        The dictionary is modified in place and also returned.
        """
        for key, values in res.items():
            if isinstance(values, list):
                # List case for both direction (THR_DIR_BOTH)
                new_values = [max(value, bottom) for value in values]
            else:
                # Scalar case for single direction (THR_DIR_UP, THR_DIR_LO)
                new_values = max(values, bottom)
            res[key] = new_values
        return res

    def filter_out_overlaping_non_negative(self, res, bottom):
        """
        Drop HIGH / MEDIUM entries that became redundant after clamping to ``bottom``.

        Scalar case: MEDIUM is removed when it is close to HIGH.
        List case: a list whose entries all equal ``bottom`` is removed entirely; if only its
        last entry equals ``bottom``, just that entry is removed.

        The dictionary is modified in place and also returned.
        """
        def remove_if_match_bottom(key):
            bounds = res[key]
            if all(bound == bottom for bound in bounds):
                res.pop(key)
            elif len(bounds) > 1 and bounds[-1] == bottom:
                bounds.pop()

        high, medium = res.get(HIGH, None), res.get(MEDIUM, None)
        # Explicit None checks: a threshold of 0.0 is a valid value and must not be
        # mistaken for a missing level.
        if high is not None and medium is not None and not isinstance(high, list):
            if self.is_close(high, medium):
                res.pop(MEDIUM)
            return res
        if isinstance(high, list) and high:
            remove_if_match_bottom(HIGH)
        if isinstance(medium, list) and medium:
            remove_if_match_bottom(MEDIUM)
        return res

    def filter_out_overlaping(self, res):
        """
        Remove duplicated threshold values from ``res``.

        CRITICAL is always kept, NORMAL is kept if present. HIGH and MEDIUM values (scalars or
        list items) already seen at a previous level are dropped; lists left empty are dropped.

        Returns:
        - dict: A new dictionary ordered CRITICAL, HIGH, MEDIUM, NORMAL (present keys only).
        """
        seen = {res[CRITICAL]}
        kept = {CRITICAL: res[CRITICAL]}
        if NORMAL in res:
            seen.add(res[NORMAL])
            kept[NORMAL] = res[NORMAL]
        for key in [HIGH, MEDIUM]:
            if key not in res:
                continue
            value = res[key]
            if isinstance(value, list):
                unique_items = []
                for item in value:
                    if item not in seen:
                        unique_items.append(item)
                        seen.add(item)
                if unique_items:
                    kept[key] = unique_items
            elif value not in seen:
                kept[key] = value
                seen.add(value)
        return {k: kept[k] for k in [CRITICAL, HIGH, MEDIUM, NORMAL] if k in kept}

    @staticmethod
    def _upper_bound(level):
        """Return the first element of a list-valued level, the scalar itself, or None if absent/empty."""
        # NOTE(review): after de-duplication a list may have lost its original first element,
        # in which case this is no longer the upper bound -- confirm whether that matters.
        if isinstance(level, list):
            return level[0] if level else None
        return level

    def threshold_error_checking(self, res):
        """
        Remove HIGH / MEDIUM levels whose ordering is inverted.

        Inversions can happen when the variance is small and the mean is slightly below 0.
        For the lower direction a level must not be below the more severe one; for the other
        directions it must not be above it. MEDIUM is only checked against HIGH when HIGH was
        present on entry.

        The dictionary is modified in place and also returned.
        """
        critical = res.get(CRITICAL, None)
        high = self._upper_bound(res.get(HIGH, None))
        medium = self._upper_bound(res.get(MEDIUM, None))
        has_high = high is not None
        has_medium = medium is not None
        if self.threshold_direction == THR_DIR_LO:
            # Lower direction: severity grows as the value decreases
            if has_high and high < critical:
                res.pop(HIGH)
            if has_medium and has_high and medium < high:
                res.pop(MEDIUM)
        else:
            # All other directions: severity grows as the value increases
            if has_high and high > critical:
                res.pop(HIGH)
            if has_medium and has_high and medium > high:
                res.pop(MEDIUM)
        return res

    def _non_negative_bottom(self):
        """Return the lowest allowed threshold: 0 in static mode, the z-score of 0 otherwise."""
        if self.use_static:
            return self.rounding_func(0.0)
        # _transform_value_to_zscore already rounds; rounding again could bump the value
        # one step up in ceil mode because of floating point noise.
        return self._transform_value_to_zscore(0.0)

    def non_negative_filter(self, res):
        """
        Enforce the non-negative lower bound on all thresholds and drop resulting overlaps.

        Parameters:
        - res (dict): Maps CRITICAL / HIGH / MEDIUM / NORMAL to a float or a list of floats.

        Returns:
        - dict: The adjusted threshold levels.
        """
        bottom = self._non_negative_bottom()
        res = self.converting_negative_to_zero(res, bottom)
        return self.filter_out_overlaping_non_negative(res, bottom)

    def final_threshold_filter(self, critical, high, medium, normal=None):
        """
        Assemble the threshold dictionary and apply all filters.

        Steps: collapse to CRITICAL (and NORMAL) when the three levels are close, apply the
        non-negative filter (skipped for the lower direction), remove duplicates, remove
        inverted levels, and finally separate CRITICAL from NORMAL by one rounding step if
        they are close.

        Parameters:
        - critical, high, medium, normal: Threshold values (scalars or lists); ``normal`` may be None.

        Returns:
        - dict: The filtered threshold levels.
        """
        if self.check_non_negative_overlapping(critical, high, medium):
            res = {CRITICAL: critical}
        else:
            res = {CRITICAL: critical, HIGH: high, MEDIUM: medium}
        # `is not None` so that a legitimate NORMAL threshold of 0.0 is not discarded
        if normal is not None:
            res[NORMAL] = normal
        if self.non_negative and self.threshold_direction != THR_DIR_LO:
            res = self.non_negative_filter(res)
        res = self.filter_out_overlaping(res)
        res = self.threshold_error_checking(res)
        if NORMAL in res and self.is_close(res[CRITICAL], res[NORMAL]):
            res = self.adding_gap(res)
        return res

    def adding_gap(self, res):
        """
        Move CRITICAL away by one rounding step (10 ** -threshold_rounding).

        The step is subtracted for the lower direction and added otherwise.
        """
        value = pow(10, -self.threshold_rounding)
        gap = -value if self.threshold_direction == THR_DIR_LO else value
        # Snap back onto the rounding grid with round-to-nearest: CRITICAL is already on the
        # grid, so ceil-rounding here could turn float noise into an extra step.
        res[CRITICAL] = np.round(res[CRITICAL] + gap, decimals=self.threshold_rounding)
        return res

    def process(self, value):
        """
        Generate the thresholds for the configured direction.

        Parameters:
        - value (float): The base (MEDIUM) threshold to cascade from.

        Returns:
        - str: Dictionary-like string of the computed thresholds, as built by format_thresholds.
        """
        critical, high, medium = self._cascade_thresholds(value)
        if self.threshold_direction == THR_DIR_BOTH:
            critical_final = self.compute_thres(critical)
            # Each list is [upper side, lower side]; the lower side is shifted by one level
            high_final = [self.compute_thres(i) for i in [high, -critical]]
            medium_final = [self.compute_thres(i) for i in [medium, -high]]
            normal_final = self.compute_thres(-medium)
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final, normal_final)
            return format_thresholds(filtered_res)
        elif self.threshold_direction == THR_DIR_UP:
            critical_final, high_final, medium_final = [
                self.compute_thres(thres) for thres in [critical, high, medium]
            ]
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)
            # THR_DIR_UP direction does not need normal
            filtered_res.pop(NORMAL, None)
            return format_thresholds(filtered_res)
        else:
            # Lower direction.
            # NOTE(review): any other direction value (e.g. "auto") also ends up here -- confirm
            # that such values are resolved before this method is called.
            critical_final = self.compute_thres(-critical)
            high_final = self.compute_thres(-high)
            medium_final = self.compute_thres(-medium)
            filtered_res = self.final_threshold_filter(critical_final, high_final, medium_final)
            # The lower direction has no CRITICAL level: every level is reported one name lower
            # (CRITICAL -> HIGH, HIGH -> MEDIUM, MEDIUM -> NORMAL). Levels removed by the
            # filters are skipped instead of raising KeyError.
            shifted = {}
            for source, target in ((CRITICAL, HIGH), (HIGH, MEDIUM), (MEDIUM, NORMAL)):
                if source in filtered_res:
                    shifted[target] = filtered_res[source]
            return format_thresholds(shifted)
def output_thresholds_dict(
    threshold, mean, stdev, threshold_rounding, threshold_direction, non_negative=True, use_static=False
):
    """
    Build a ThresholdGenerator from the given settings and return its thresholds for ``threshold``.

    Returns whatever ThresholdGenerator.process returns for the base threshold.
    """
    settings = {
        "mean": mean,
        "stdev": stdev,
        "threshold_rounding": threshold_rounding,
        "threshold_direction": threshold_direction,
        "non_negative": non_negative,
        "use_static": use_static,
    }
    generator = ThresholdGenerator(**settings)
    return generator.process(threshold)
def calc_constant_time_series_thresholds(max_value, min_value, median, threshold_rounding, threshold_direction,
                                         filter_config_choice, non_negative):
    """
    Calculate static threshold boundaries for a constant / near-constant time series.

    For such series the stdev is too small to provide a useful threshold, so the bounds are
    derived from the max/min values widened by ``median * sensitivity``. Only a CRITICAL level
    is produced.

    Parameters:
        max_value (float): The maximum value used for the upper bound calculation.
        min_value (float): The minimum value used for the lower bound calculation.
        median (float): The median value, scaled by the sensitivity factor to widen the bounds.
        threshold_rounding (int): Number of decimal places to round the threshold values.
        threshold_direction (str): THR_DIR_UP or THR_DIR_LO for a single bound; any other
            value (including "both" and "auto") yields both bounds.
        filter_config_choice (str): Key into CONSTANT_TIMESERIES_SENSITIVITY_DICT; unknown keys
            fall back to the LOW sensitivity factor.
        non_negative (bool): If True, the lower bound is clamped to be at least 0.

    Returns:
        str: A dictionary-formatted string holding the CRITICAL threshold: a single number for
        the upper / lower direction, or ``[upper_bound, lower_bound]`` otherwise.
    """
    # "low" sensitivity is the fallback for unknown configuration choices
    fallback_factor = CONSTANT_TIMESERIES_SENSITIVITY_DICT[SensitivityLevelConstants.LOW]
    sensitivity_level = CONSTANT_TIMESERIES_SENSITIVITY_DICT.get(filter_config_choice, fallback_factor)

    # NOTE(review): with a negative median the margin is negative and narrows the band
    # instead of widening it -- confirm inputs are expected to be non-negative.
    margin = median * sensitivity_level
    upper_bound = round(max_value + margin, threshold_rounding)
    lower_bound = round(min_value - margin, threshold_rounding)

    if non_negative:
        lower_bound = max(0, lower_bound)

    if threshold_direction == THR_DIR_UP:
        return f"{{'{CRITICAL}': {upper_bound}}}"
    if threshold_direction == THR_DIR_LO:
        return f"{{'{CRITICAL}': {lower_bound}}}"
    # Both bounds. Each bound is formatted on its own rather than through str(list), because
    # a list shows its items with repr(), which NumPy >= 2 renders as "np.float64(...)".
    return f"{{'{CRITICAL}': [{upper_bound}, {lower_bound}]}}"
def parse_res_str(res_str):
    """
    Parse a dictionary-formatted threshold string back into a dictionary.

    Parameters:
    - res_str (str): Comma separated ``'key': value`` pairs, optionally wrapped in curly braces.
      A value is either a number or a bracketed list of numbers.
      Example: "{'critical': 1.4, 'high': [1.2, -1.4], 'normal': -1.0}"

    Returns:
    - dict: Keys as strings, values as floats or lists of floats. Empty input yields {}.
      Example: {'critical': 1.4, 'high': [1.2, -1.4], 'normal': -1.0}

    Raises:
    - IndexError: If an entry has no ``:`` separator.
    - ValueError: If a value cannot be converted to float.
    """
    body = res_str.strip().strip("{}")

    # Split on commas that are not inside a bracketed list
    entries = []
    depth = 0
    start = 0
    for pos, char in enumerate(body):
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        elif char == "," and depth == 0:
            entries.append(body[start:pos])
            start = pos + 1
    entries.append(body[start:])

    parsed = {}
    for entry in entries:
        if not entry.strip():
            # Empty input or a stray comma: nothing to parse
            continue
        fields = entry.split(":")
        key = fields[0].strip(" '")
        raw_value = fields[1].strip()
        if raw_value.startswith("[") and raw_value.endswith("]"):
            parsed[key] = [float(item) for item in raw_value[1:-1].split(",") if item.strip()]
        else:
            parsed[key] = float(raw_value)
    return parsed
def confidence_description(score):
    """
    Map a numeric score to a label.

    Returns "High" above 0.6, "Medium" above 0.4, "Low" above 0.1 and "No Pattern" otherwise
    (all bounds exclusive).
    """
    labelled_bounds = ((0.6, "High"), (0.4, "Medium"), (0.1, "Low"))
    for lower_limit, label in labelled_bounds:
        if score > lower_limit:
            return label
    return "No Pattern"
def transfer_zscore_to_boundary(zscore, mean, stdev):
    """Return the actual boundary value for a z-score: ``zscore * stdev + mean``."""
    offset_from_mean = zscore * stdev
    return offset_from_mean + mean
def format_thresholds(result):
    """
    Format the result dictionary into a dictionary-like string, including only present keys.

    Parameters:
    - result (dict): Maps threshold level names to a value or a list of values.

    Returns:
    - str: ``{'level': value, ...}`` in the iteration order of ``result``.
    """
    def render_item(item):
        # A list shows its items with repr(); NumPy >= 2 renders scalars there as
        # "np.float64(1.2)". str() gives the plain number on every NumPy version.
        return str(item) if isinstance(item, np.generic) else repr(item)

    def render(value):
        if isinstance(value, list):
            return "[" + ", ".join(render_item(item) for item in value) + "]"
        return f"{value}"

    formatted_items = [f"'{key}': {render(value)}" for key, value in result.items()]
    # Join all formatted items with commas and enclose in braces
    return f"{{{', '.join(formatted_items)}}}"