# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import json

from splunk.util import normalizeBoolean

from ITOA.itoa_factory import instantiate_object
from ITOA.setup_logging import getLogger

logger = getLogger()

OUTLIER_DETECTION_ALGO_DEFAULT = 'iqr'

from itsi.itsi_utils import ThresholdUtils, ThresholdRecommendationUtils

class KpiThresholdUtils(ThresholdUtils):
    """
    Applies recommended KPI threshold configurations (time-variant policies or
    static levels) to the KPIs of services stored in the service collection.
    """

    def __init__(self, session_key, owner):
        """
        Constructor

        @type: string
        @param session_key: splunkd session key used for collection access

        @type: string
        @param owner: "current_user_name" user invoking this call

        @rtype: None
        @return: None
        """
        self._session_key = session_key
        self.owner = owner
        # Handle used for bulk get/save of service objects in the collection
        self._service_object = instantiate_object(session_key, owner, "service")

    def check_and_fetch_service_configs(self, owner, data):
        """
        Fetch the service configurations for every service referenced in `data`.

        @type: str
        @param owner: owner making the request

        @type: list
        @param data: A list of dictionaries containing KPI data; each entry
            must carry an 'itsi_service_id' key.

        @rtype: list
        @return: service configurations fetched from the collection
        """
        logger.debug("Checking presence of entities in the collection...")

        # Build an $or filter matching each referenced service by its _key
        get_bulk_filter = {'$or': []}
        for entry in data:
            service_id = entry['itsi_service_id']
            get_bulk_filter['$or'].append({'_key': service_id})

        # Fetch service configurations from the collection in one bulk call
        service_configs = self._service_object.get_bulk(owner, filter_data=get_bulk_filter)

        logger.debug("Service configs fetched successfully.")
        return service_configs

    def update_collection_with_result(self, owner, data):
        """
        Update the service collection with the provided service data.

        @type: str
        @param owner: owner making the request

        @type: list
        @param data: A list containing service data.

        @rtype: bool
        @return: True when the batch save succeeded, False otherwise
        """
        success = True
        logger.debug("Updating the service collection with the provided service data...")

        # Save the batch of data to the collection; a failed save is logged and
        # reported to the caller via the return value rather than raised.
        try:
            self._service_object.save_batch(owner, data, False)
            logger.debug("service collection updated successfully.")
        except Exception as e:
            logger.exception('Error while updating kpi threshold to collection. Exception="%s"', e)
            success = False

        return success

    def calculate_kpi_thresholds(self, owner, **kwargs):
        """
        Calculate KPI threshold recommendations and add/update it in service collection.

        @type: string
        @param owner: owner making the request

        @type: dict
        @param **kwargs: key word arguments extracted from request; 'data' is a
            list of per-KPI recommendation records

        @rtype: tuple (bool, bool)
        @return: (success, data) True for success if saved successfully and
            True for data if there are objects to store
        """
        recommended_data = kwargs.get('data', [])
        temp_key_set = set()

        kpi_data_updated = dict()

        service_configs = self.check_and_fetch_service_configs(owner, recommended_data)

        # Index the fetched KPI configurations by KPI _key for O(1) lookup
        kpi_data = {}
        for service_config in service_configs:
            for kpi in service_config['kpis']:
                kpi_data.update({kpi['_key']: kpi})

        # Flatten each recommendation into one record per AT configuration,
        # carrying the owning KPI id on every record
        recommended_data_ = []
        for kpi in recommended_data:
            for kpi_at_configuration in kpi['kpi_at_configurations']:
                recommended_data_.append({'itsi_kpi_id': kpi['itsi_kpi_id'], **kpi_at_configuration})
        recommended_data = recommended_data_

        # Prepare threshold_recommendation_summary with the first result in recommended_data
        threshold_recommendation_summary = ThresholdRecommendationUtils.prepare_threshold_recommendation_summary(
            recommended_data)

        # Iterate the recommendation data and add/update KPI threshold accordingly
        for data in recommended_data:
            kpi_id = data['itsi_kpi_id']
            training_window = data.get('Analysis Window', '-7d')

            # Generate time variant policies for SUCCESSFUL flag
            # Generate static thresholds for NO_PATTERN flag
            if data["Recommendation Flag"] == "SUCCESSFUL":
                recommended_policies = self.generate_time_variant_policies(data, aggregate=True)

                # If recommended object is already checked for temp_key, just add new policy for it.
                if kpi_id in temp_key_set:
                    kpi_data_updated[kpi_id]['time_variate_thresholds_specification']['policies'].update(recommended_policies)
                    continue

                kpi_data_updated[kpi_id] = kpi_data[kpi_id].copy()
                kpi_data_updated[kpi_id]['adaptive_thresholding_training_window'] = training_window
                kpi_data_updated[kpi_id]['time_variate_thresholds'] = True
                kpi_data_updated[kpi_id]['time_variate_thresholds_specification']['policies'] = recommended_policies
                kpi_data_updated[kpi_id]['adaptive_thresholds_is_enabled'] = not normalizeBoolean(data["Use Static"])
                kpi_data_updated[kpi_id]['is_recommended_time_policies'] = True
                kpi_data_updated[kpi_id]['was_recommendation_modified'] = False
                kpi_data_updated[kpi_id]['did_load_recommendation'] = True

                if "Non_negative" in data:
                    kpi_data_updated[kpi_id]['recommendation_allow_negative_value'] = normalizeBoolean(data["Non_negative"])

                if "Sensitivity Level" in data:
                    kpi_data_updated[kpi_id]['recommendation_threshold_sensitivity'] = data["Sensitivity Level"]

                # BUGFIX: the original wrapped the left-hand side in a list
                # ([data.get(...)] != '...'), so a list was compared against a
                # string and the condition was always True — outlier detection
                # was enabled even when sensitivity data was missing. Compare
                # the value itself against the sentinel.
                if data.get('Sensitivity', 'NO_RECOMMEND_NOT_ENOUGH_DATA') != 'NO_RECOMMEND_NOT_ENOUGH_DATA':
                    kpi_data_updated[kpi_id]['outlier_detection_algo'] = OUTLIER_DETECTION_ALGO_DEFAULT
                    kpi_data_updated[kpi_id]['outlier_detection_sensitivity'] = data.get('Sensitivity')
                    kpi_data_updated[kpi_id]['aggregate_outlier_detection_enabled'] = True
                # Recommended thresholds replace any threshold template
                kpi_data_updated[kpi_id]['kpi_threshold_template_id'] = ''

                if threshold_recommendation_summary is not None:
                    kpi_data_updated[kpi_id]['threshold_recommendation_summary'] = threshold_recommendation_summary

            elif data["Recommendation Flag"] == "NO_PATTERN":
                threshold_response_str = data["Thresholds"]
                # NOTE(review): the payload appears to arrive single-quoted;
                # this naive replace assumes no quote characters inside values
                # — confirm against the producer before hardening.
                threshold_response_json_str = threshold_response_str.replace("'", '"')
                threshold_response_obj = json.loads(threshold_response_json_str)
                threshold_levels = self.generate_threshold_levels(threshold_response_obj, data["Threshold Direction"], False)

                if kpi_id not in temp_key_set:
                    kpi_data_updated[kpi_id] = kpi_data[kpi_id].copy()
                    kpi_data_updated[kpi_id]['adaptive_thresholding_training_window'] = '-7d'
                    kpi_data_updated[kpi_id]['aggregate_thresholds'] = threshold_levels
                    kpi_data_updated[kpi_id]['adaptive_thresholds_is_enabled'] = False
                    kpi_data_updated[kpi_id]['time_variate_thresholds'] = False
                    kpi_data_updated[kpi_id]['is_recommended_time_policies'] = True
                    kpi_data_updated[kpi_id]['was_recommendation_modified'] = False
                    kpi_data_updated[kpi_id]['did_load_recommendation'] = True

                    if "Non_negative" in data:
                        kpi_data_updated[kpi_id]['recommendation_allow_negative_value'] = normalizeBoolean(data["Non_negative"])

                    if "Sensitivity Level" in data:
                        kpi_data_updated[kpi_id]['recommendation_threshold_sensitivity'] = data["Sensitivity Level"]

                    if threshold_recommendation_summary is not None:
                        kpi_data_updated[kpi_id]['threshold_recommendation_summary'] = threshold_recommendation_summary

            # Mark this KPI as processed so later records only append policies
            temp_key_set.add(kpi_id)

            # Will add condition for CONSTANT_KPI and INSUFFICIENT_DATA flag

        # Nothing to store: report no success and no data
        if not kpi_data_updated:
            return False, False

        # Splice the updated KPI dicts back into their owning service configs
        for kpi_id in kpi_data_updated:
            for service_config in service_configs:
                for kpi in service_config['kpis']:
                    if kpi['_key'] == kpi_id:
                        kpi.update(kpi_data_updated[kpi_id])

        # POST new/updated service object to service collection
        result = self.update_collection_with_result(owner, service_configs)

        return result, True