# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import json

from splunk.util import normalizeBoolean
from ITOA.itoa_factory import instantiate_object
from ITOA.setup_logging import getLogger

logger = getLogger()

OUTLIER_DETECTION_ALGO_DEFAULT = 'iqr'

# Sentinel value of the 'Sensitivity' field. When a recommendation carries this
# value (or no 'Sensitivity' field at all) the KPI's outlier detection settings
# are left untouched.
_NO_SENSITIVITY_RECOMMENDATION = 'NO_RECOMMEND_NOT_ENOUGH_DATA'

from itsi.itsi_utils import ThresholdUtils, ThresholdRecommendationUtils


class KpiThresholdUtils(ThresholdUtils):
    """
    Applies KPI threshold recommendations to the KPIs of the services stored
    in the service collection.
    """

    def __init__(self, session_key, owner):
        """
        Constructor

        NOTE(review): the parent constructor is not invoked here; confirm that
        ThresholdUtils needs no initialisation of its own.

        @type session_key: string
        @param session_key: session key handed to instantiate_object

        @type owner: string
        @param owner: "current_user_name" user invoking this call

        @rtype: None
        @return: None
        """
        self._session_key = session_key
        self.owner = owner
        self._service_object = instantiate_object(session_key, owner, "service")

    def check_and_fetch_service_configs(self, owner, data):
        """
        Fetch the service configurations referenced by the given KPI data.

        @type owner: str
        @param owner: owner making the request

        @type data: list
        @param data: A list of dictionaries containing KPI data. Each entry
            must provide an 'itsi_service_id' key.

        @rtype: list
        @return: whatever get_bulk returns for the filter; the caller in this
            class iterates it as a sequence of service dictionaries.
        """
        logger.debug("Checking presence of entities in the collection...")

        # Several KPI entries may belong to the same service: keep one filter
        # clause per service id, preserving first-seen order.
        service_ids = dict.fromkeys(entry['itsi_service_id'] for entry in data)
        get_bulk_filter = {'$or': [{'_key': service_id} for service_id in service_ids]}

        # Fetch service configurations from the collection
        service_configs = self._service_object.get_bulk(owner, filter_data=get_bulk_filter)

        logger.debug("Service configs fetched successfully.")
        return service_configs

    def update_collection_with_result(self, owner, data):
        """
        Update the collection with the provided service data.

        @type owner: str
        @param owner: owner making the request

        @type data: list
        @param data: A list containing service data.

        @rtype: bool
        @return: True if the batch was saved, False if saving raised an
            exception (the exception is logged, not propagated).
        """
        logger.debug("Updating the service collection with the provided service data...")

        try:
            # NOTE(review): the third positional argument is passed as False;
            # its meaning is defined by save_batch, which is not visible here.
            self._service_object.save_batch(owner, data, False)
        except Exception as e:
            # Deliberate boundary: callers receive a success flag instead of
            # the exception.
            logger.exception('Error while updating kpi threshold to collection. Exception="%s"', e)
            return False

        logger.debug("service collection updated successfully.")
        return True

    @staticmethod
    def _mark_recommendation_loaded(updated_kpi, data):
        """
        Set the bookkeeping fields shared by every applied recommendation.

        @type updated_kpi: dict
        @param updated_kpi: KPI dictionary being updated in place

        @type data: dict
        @param data: a single flattened recommendation entry

        @rtype: None
        @return: None
        """
        updated_kpi['is_recommended_time_policies'] = True
        updated_kpi['was_recommendation_modified'] = False
        updated_kpi['did_load_recommendation'] = True
        if "Non_negative" in data:
            updated_kpi['recommendation_allow_negative_value'] = normalizeBoolean(data["Non_negative"])
        if "Sensitivity Level" in data:
            updated_kpi['recommendation_threshold_sensitivity'] = data["Sensitivity Level"]

    def _apply_time_variant_recommendation(self, updated_kpi, data, recommended_policies, summary):
        """
        Apply a "SUCCESSFUL" recommendation to a KPI seen for the first time.

        @type updated_kpi: dict
        @param updated_kpi: shallow copy of the stored KPI, updated in place

        @type data: dict
        @param data: a single flattened recommendation entry

        @type recommended_policies: dict
        @param recommended_policies: policies generated for this entry

        @type summary: object
        @param summary: threshold recommendation summary, or None to skip it

        @rtype: None
        @return: None
        """
        updated_kpi['adaptive_thresholding_training_window'] = data.get('Analysis Window', '-7d')
        updated_kpi['time_variate_thresholds'] = True
        # The KPI copy is shallow, so this nested dict is shared with the stored
        # KPI. That is harmless: the result is merged back into that same KPI.
        updated_kpi['time_variate_thresholds_specification']['policies'] = recommended_policies
        updated_kpi['adaptive_thresholds_is_enabled'] = not normalizeBoolean(data["Use Static"])
        self._mark_recommendation_loaded(updated_kpi, data)

        # Compare the value itself. Comparing a one-element list against the
        # sentinel string is always unequal and would enable outlier detection
        # even when no sensitivity was recommended.
        sensitivity = data.get('Sensitivity', _NO_SENSITIVITY_RECOMMENDATION)
        if sensitivity != _NO_SENSITIVITY_RECOMMENDATION:
            updated_kpi['outlier_detection_algo'] = OUTLIER_DETECTION_ALGO_DEFAULT
            updated_kpi['outlier_detection_sensitivity'] = sensitivity
            updated_kpi['aggregate_outlier_detection_enabled'] = True

        updated_kpi['kpi_threshold_template_id'] = ''
        if summary is not None:
            updated_kpi['threshold_recommendation_summary'] = summary

    def _apply_static_recommendation(self, updated_kpi, data, threshold_levels, summary):
        """
        Apply a "NO_PATTERN" recommendation (static aggregate thresholds).

        @type updated_kpi: dict
        @param updated_kpi: KPI dictionary being updated in place

        @type data: dict
        @param data: a single flattened recommendation entry

        @type threshold_levels: object
        @param threshold_levels: result of generate_threshold_levels

        @type summary: object
        @param summary: threshold recommendation summary, or None to skip it

        @rtype: None
        @return: None
        """
        # NOTE(review): this branch always stores '-7d' and ignores the entry's
        # 'Analysis Window'; confirm that is intended.
        updated_kpi['adaptive_thresholding_training_window'] = '-7d'
        updated_kpi['aggregate_thresholds'] = threshold_levels
        updated_kpi['adaptive_thresholds_is_enabled'] = False
        updated_kpi['time_variate_thresholds'] = False
        self._mark_recommendation_loaded(updated_kpi, data)
        if summary is not None:
            updated_kpi['threshold_recommendation_summary'] = summary

    def calculate_kpi_thresholds(self, owner, **kwargs):
        """
        Calculate KPI threshold recommendations and add/update it in service collection.

        @type owner: string
        @param owner: owner making the request

        @type kwargs: dict
        @param kwargs: key word arguments extracted from request. 'data' is the
            list of per-KPI recommendations; each entry provides
            'itsi_service_id', 'itsi_kpi_id' and 'kpi_at_configurations'.

        @rtype: tuple (bool, bool)
        @return: (success, data). (False, False) when there is nothing to
            store; otherwise (result of saving the services, True).
        """
        recommended_data = kwargs.get('data') or []
        if not recommended_data:
            # Nothing to process: skip the lookup entirely rather than querying
            # the collection with an empty '$or' filter.
            return False, False

        service_configs = self.check_and_fetch_service_configs(owner, recommended_data)

        # Index the fetched KPI configurations by KPI key
        kpi_data = {}
        for service_config in service_configs:
            for kpi in service_config['kpis']:
                kpi_data[kpi['_key']] = kpi

        # Flatten to one entry per (KPI, configuration) pair
        flattened_data = []
        for kpi in recommended_data:
            flattened_data.extend(
                {'itsi_kpi_id': kpi['itsi_kpi_id'], **kpi_at_configuration}
                for kpi_at_configuration in kpi['kpi_at_configurations']
            )

        threshold_recommendation_summary = ThresholdRecommendationUtils.prepare_threshold_recommendation_summary(
            flattened_data)

        # KPI key -> updated KPI. Membership in this dict doubles as the
        # "already processed" marker, so entries with an unhandled flag never
        # leave a KPI marked as seen without an update record.
        kpi_data_updated = {}

        for data in flattened_data:
            kpi_id = data['itsi_kpi_id']
            recommendation_flag = data["Recommendation Flag"]

            # Generate time variant policies for SUCCESSFUL flag
            # Generate static thresholds for NO_PATTERN flag
            if recommendation_flag == "SUCCESSFUL":
                recommended_policies = self.generate_time_variant_policies(data, aggregate=True)

                # KPI already processed: just add the new policies to it.
                if kpi_id in kpi_data_updated:
                    existing_spec = kpi_data_updated[kpi_id]['time_variate_thresholds_specification']
                    existing_spec['policies'].update(recommended_policies)
                    continue

                updated_kpi = kpi_data[kpi_id].copy()
                kpi_data_updated[kpi_id] = updated_kpi
                self._apply_time_variant_recommendation(
                    updated_kpi, data, recommended_policies, threshold_recommendation_summary)

            elif recommendation_flag == "NO_PATTERN":
                # 'Thresholds' arrives as a single-quoted string; swap the
                # quotes so it parses as JSON.
                threshold_response_obj = json.loads(data["Thresholds"].replace("'", '"'))
                threshold_levels = self.generate_threshold_levels(
                    threshold_response_obj, data["Threshold Direction"], False)

                if kpi_id not in kpi_data_updated:
                    kpi_data_updated[kpi_id] = kpi_data[kpi_id].copy()
                self._apply_static_recommendation(
                    kpi_data_updated[kpi_id], data, threshold_levels, threshold_recommendation_summary)

            # Will add condition for CONSTANT_KPI and INSUFFICIENT_DATA flag

        if not kpi_data_updated:
            return False, False

        # Merge the updates back into the fetched service objects (single pass)
        for service_config in service_configs:
            for kpi in service_config['kpis']:
                kpi_update = kpi_data_updated.get(kpi['_key'])
                if kpi_update is not None:
                    kpi.update(kpi_update)

        # POST new/updated service object to service collection
        result = self.update_collection_with_result(owner, service_configs)
        return result, True