You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

205 lines
8.7 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
from splunk.util import normalizeBoolean
from ITOA.itoa_factory import instantiate_object
from ITOA.setup_logging import getLogger
# Module-level logger obtained from ITOA.setup_logging; the class below uses it for debug and error output.
logger = getLogger()
# Value written to a KPI's 'outlier_detection_algo' field when a time-variant recommendation is applied.
# NOTE(review): 'iqr' presumably stands for interquartile range - confirm.
OUTLIER_DETECTION_ALGO_DEFAULT = 'iqr'
from itsi.itsi_utils import ThresholdUtils, ThresholdRecommendationUtils
class KpiThresholdUtils(ThresholdUtils):
    """
    Applies KPI threshold recommendations to the KPIs embedded in service objects
    and saves the modified services back through the "service" object interface.
    """

    # Value of the 'Sensitivity' field for which no outlier detection settings are written.
    _SENSITIVITY_NOT_AVAILABLE = 'NO_RECOMMEND_NOT_ENOUGH_DATA'

    def __init__(self, session_key, owner):
        """
        Constructor
        @type: string
        @param: session_key
        @type: string
        @param owner: "current_user_name" user invoking this call
        @rtype: None
        @return: None
        """
        self._session_key = session_key
        self.owner = owner
        self._service_object = instantiate_object(session_key, owner, "service")

    def check_and_fetch_service_configs(self, owner, data):
        """
        Fetch the services referenced by the given KPI entries.
        @type: str
        @param owner: owner making the request
        @type: list
        @param data: A list of dictionaries, each carrying an 'itsi_service_id' key.
        @rtype: list
        @return: Whatever get_bulk returns for the requested service keys; callers in this class
            iterate it as a list of service dictionaries that each hold a 'kpis' list.
        """
        logger.debug("Checking presence of entities in the collection...")
        # Entries for KPIs of the same service repeat the service id; filter on each id once,
        # keeping first-seen order.
        service_ids = list(dict.fromkeys(entry['itsi_service_id'] for entry in data))
        get_bulk_filter = {'$or': [{'_key': service_id} for service_id in service_ids]}
        # Fetch service configurations from the collection
        service_configs = self._service_object.get_bulk(owner, filter_data=get_bulk_filter)
        logger.debug("Service configs fetched successfully.")
        return service_configs

    def update_collection_with_result(self, owner, data):
        """
        Save the provided service objects to the collection in one batch.
        @type: str
        @param owner: owner making the request
        @type: list
        @param data: A list containing service data.
        @rtype: bool
        @return: True if the batch was saved, False if saving raised (the error is logged, not re-raised).
        """
        logger.debug("Updating the service collection with the provided service data...")
        try:
            # NOTE(review): the meaning of the third positional argument (False) is defined by
            # save_batch, which is not visible here - confirm before changing it.
            self._service_object.save_batch(owner, data, False)
        except Exception as e:
            logger.exception('Error while updating kpi threshold to collection. Exception="%s"', e)
            return False
        logger.debug("service collection updated successfully.")
        return True

    def calculate_kpi_thresholds(self, owner, **kwargs):
        """
        Calculate KPI threshold recommendations and add/update it in service collection.
        @type: string
        @param owner: owner making the request
        @type: dict
        @param **kwargs: key word arguments extracted from request; 'data' holds a list of dictionaries
            with 'itsi_service_id', 'itsi_kpi_id' and a 'kpi_at_configurations' list of recommendations.
        @rtype: tuple (bool, bool)
        @return: (success, data) True for success if saved successfully and True for data if there are
            objects to store. (False, False) when no recommendation resulted in a KPI change.
        """
        recommended_data = kwargs.get('data') or []
        if not recommended_data:
            # Nothing to apply: skip the lookup instead of querying with an empty '$or' filter.
            return False, False

        service_configs = self.check_and_fetch_service_configs(owner, recommended_data)

        # Index the fetched KPIs by key. The values are the very dictionaries held inside
        # service_configs, not copies.
        kpi_data = {}
        for service_config in service_configs:
            for kpi in service_config['kpis']:
                kpi_data[kpi['_key']] = kpi

        # Flatten to one record per (KPI, configuration); keys of the configuration take
        # precedence over the injected 'itsi_kpi_id' if both are present.
        recommendations = []
        for kpi in recommended_data:
            for kpi_at_configuration in kpi['kpi_at_configurations']:
                recommendations.append({'itsi_kpi_id': kpi['itsi_kpi_id'], **kpi_at_configuration})
        if not recommendations:
            return False, False

        threshold_recommendation_summary = ThresholdRecommendationUtils.prepare_threshold_recommendation_summary(
            recommendations)

        # KPI id -> updated KPI dictionary. Membership here is also the "already handled" marker,
        # so an entry with an unhandled flag can never leave a KPI marked as seen without an entry.
        kpi_data_updated = {}
        for data in recommendations:
            flag = data["Recommendation Flag"]
            # Generate time variant policies for SUCCESSFUL flag
            # Generate static thresholds for NO_PATTERN flag
            if flag == "SUCCESSFUL":
                self._apply_time_variant_recommendation(
                    kpi_data, kpi_data_updated, data, threshold_recommendation_summary)
            elif flag == "NO_PATTERN":
                self._apply_static_recommendation(
                    kpi_data, kpi_data_updated, data, threshold_recommendation_summary)
            # Will add condition for CONSTANT_KPI and INSUFFICIENT_DATA flag

        if not kpi_data_updated:
            return False, False

        # Merge the updated fields into the KPIs of the fetched services (single pass).
        for service_config in service_configs:
            for kpi in service_config['kpis']:
                updated_kpi = kpi_data_updated.get(kpi['_key'])
                if updated_kpi is not None:
                    kpi.update(updated_kpi)

        # POST new/updated service object to service collection
        result = self.update_collection_with_result(owner, service_configs)
        return result, True

    def _apply_time_variant_recommendation(self, kpi_data, kpi_data_updated, data,
                                           threshold_recommendation_summary):
        """
        Apply one SUCCESSFUL recommendation: time-variant policies for the KPI.
        @type: dict
        @param kpi_data: fetched KPIs indexed by key
        @type: dict
        @param kpi_data_updated: updated KPIs indexed by key; modified in place
        @type: dict
        @param data: one flattened recommendation record
        @type: object
        @param threshold_recommendation_summary: summary to attach to the KPI, or None
        @rtype: None
        @return: None
        """
        kpi_id = data['itsi_kpi_id']
        # generate_time_variant_policies is not defined in this class (presumably inherited).
        recommended_policies = self.generate_time_variant_policies(data, aggregate=True)

        # If the KPI was already prepared by an earlier record, just add the new policies to it.
        if kpi_id in kpi_data_updated:
            kpi_data_updated[kpi_id]['time_variate_thresholds_specification']['policies'].update(
                recommended_policies)
            return

        updated_kpi = kpi_data[kpi_id].copy()
        updated_kpi['adaptive_thresholding_training_window'] = data.get('Analysis Window', '-7d')
        updated_kpi['time_variate_thresholds'] = True
        # The copy is shallow, so this nested write also lands on the fetched KPI; that is harmless
        # because the fetched KPI is overwritten with these same values before saving.
        updated_kpi['time_variate_thresholds_specification']['policies'] = recommended_policies
        updated_kpi['adaptive_thresholds_is_enabled'] = not normalizeBoolean(data["Use Static"])
        self._set_recommendation_bookkeeping(updated_kpi, data, threshold_recommendation_summary)

        # Configure outlier detection only when a usable sensitivity was recommended.
        # (Previously a one-element list was compared with the sentinel string, which is always
        # unequal, so these fields were written even without a sensitivity.)
        sensitivity = data.get('Sensitivity', self._SENSITIVITY_NOT_AVAILABLE)
        if sensitivity != self._SENSITIVITY_NOT_AVAILABLE:
            updated_kpi['outlier_detection_algo'] = OUTLIER_DETECTION_ALGO_DEFAULT
            updated_kpi['outlier_detection_sensitivity'] = sensitivity
            updated_kpi['aggregate_outlier_detection_enabled'] = True

        updated_kpi['kpi_threshold_template_id'] = ''
        kpi_data_updated[kpi_id] = updated_kpi

    def _apply_static_recommendation(self, kpi_data, kpi_data_updated, data,
                                     threshold_recommendation_summary):
        """
        Apply one NO_PATTERN recommendation: static aggregate thresholds for the KPI.
        @type: dict
        @param kpi_data: fetched KPIs indexed by key
        @type: dict
        @param kpi_data_updated: updated KPIs indexed by key; modified in place
        @type: dict
        @param data: one flattened recommendation record
        @type: object
        @param threshold_recommendation_summary: summary to attach to the KPI, or None
        @rtype: None
        @return: None
        """
        kpi_id = data['itsi_kpi_id']
        # Single quotes are swapped for double quotes before JSON parsing; presumably 'Thresholds'
        # arrives as a single-quoted, JSON-like string - confirm against the producer.
        threshold_response_obj = json.loads(data["Thresholds"].replace("'", '"'))
        # generate_threshold_levels is not defined in this class (presumably inherited).
        threshold_levels = self.generate_threshold_levels(
            threshold_response_obj, data["Threshold Direction"], False)

        # NOTE(review): only the copy is guarded, so a later static recommendation for the same KPI
        # overwrites the fields set by an earlier record - confirm this is intended.
        updated_kpi = kpi_data_updated.get(kpi_id)
        if updated_kpi is None:
            updated_kpi = kpi_data[kpi_id].copy()
            kpi_data_updated[kpi_id] = updated_kpi

        updated_kpi['adaptive_thresholding_training_window'] = '-7d'
        updated_kpi['aggregate_thresholds'] = threshold_levels
        updated_kpi['adaptive_thresholds_is_enabled'] = False
        updated_kpi['time_variate_thresholds'] = False
        self._set_recommendation_bookkeeping(updated_kpi, data, threshold_recommendation_summary)

    @staticmethod
    def _set_recommendation_bookkeeping(updated_kpi, data, threshold_recommendation_summary):
        """
        Write the fields both recommendation kinds share onto the updated KPI.
        @type: dict
        @param updated_kpi: KPI dictionary being prepared; modified in place
        @type: dict
        @param data: one flattened recommendation record
        @type: object
        @param threshold_recommendation_summary: summary to attach to the KPI, or None
        @rtype: None
        @return: None
        """
        updated_kpi['is_recommended_time_policies'] = True
        updated_kpi['was_recommendation_modified'] = False
        updated_kpi['did_load_recommendation'] = True
        if "Non_negative" in data:
            updated_kpi['recommendation_allow_negative_value'] = normalizeBoolean(data["Non_negative"])
        if "Sensitivity Level" in data:
            updated_kpi['recommendation_threshold_sensitivity'] = data["Sensitivity Level"]
        if threshold_recommendation_summary is not None:
            updated_kpi['threshold_recommendation_summary'] = threshold_recommendation_summary