# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
This module implements the logic for the set_severity_fields custom search command
This command looks up service id and kpi id fields in search pipeline, looks up the
appropriate KPI record in the KV store, computes severity levels for the event in question
and injects alert_* fields in the search results (see the docstring for set_threshold_info).
"""
import datetime
import copy
# Core Splunk Imports
import splunk.rest
import splunk.util
from itsi.objects.itsi_kpi_base_search import ItsiKPIBaseSearch
from itsi.objects.itsi_service import ItsiService
from itsi.objects.itsi_kpi_entity_threshold import ItsiKpiEntityThreshold
from itsi.objects.itsi_kpi_state_cache import ItsiKPIStateCache
from itsi.objects.itsi_custom_threshold_windows import ItsiCustomThresholdWindows
from itsi.itsi_time_block_utils import ItsiTimeBlockUtils
import custom_threshold_windows.constants as CustomThresholdWindowConstants
from ITOA.setup_logging import logger
from ITOA.itoa_common import is_valid_dict, is_string_numeric, is_valid_str
from maintenance_services.objects.operative_maintenance_record import OperativeMaintenanceRecord
class SetSeverityFieldsCommandError(Exception):
    """Raised when the set_severity_fields custom search command hits a fatal error."""
class CollectKpiInfo(object):
    """
    Collect and cache KPI metadata from the KV store.

    Caches fetched KPIs, services, entity thresholds and maintenance state so
    that repeated lookups while streaming search results do not hit the KV
    store once per event.
    """
    def __init__(self, session_key):
        """
        Initialize KV store accessors and per-run caches.
        @type session_key: basestring
        @param session_key: splunkd session key used for KV store access
        """
        self.service_object = ItsiService(session_key, 'nobody')
        self.kpi_base_search_object = ItsiKPIBaseSearch(session_key, 'nobody')
        self.operative_maintenance_record_object = OperativeMaintenanceRecord(session_key, 'nobody')
        self.kpi_state_cache_object = ItsiKPIStateCache(session_key, 'nobody')
        self.custom_threshold_object = ItsiCustomThresholdWindows(session_key, 'nobody')
        self.kpi_entity_threshold_object = ItsiKpiEntityThreshold(session_key, 'nobody')
        # Store fetched kpi data, keyed by kpi _key
        self.kpis_data = {}
        # Entity threshold configurations, keyed by kpi _key
        self.entity_threshold_data = {}
        # Info about the most recently fetched service (see get_kpi)
        self.service_data = {}
        self.shared_base_kpis = {}
        # Keys of services currently inside a maintenance window
        self.maintenance_service_cache = self._get_maintenance_services()

    def _get_maintenance_services(self):
        """
        Fetch the keys of all services that are currently under maintenance.
        @rtype: set
        @return: set of service _key values, for O(1) membership checks
        """
        service_dict = self.operative_maintenance_record_object.get_bulk(
            'nobody',
            filter_data={'maintenance_object_type': 'service'},
            fields=['maintenance_object_key']
        )
        # A set rather than a list: callers only ever do membership tests
        return {service.get('maintenance_object_key') for service in service_dict}

    def get_kpi(self, service_id, kpi_id):
        """
        Get kpi meta data. If kpi_id is already cached just return it, otherwise retrieve it
        from the KV store; that way we do not retrieve data for each event.
        @type service_id: basestring
        @param service_id: service id
        @type kpi_id: basestring
        @param kpi_id: kpi id
        @rtype: tuple
        @return: (kpi dict, service info dict), or (None, ...) when not found
        """
        if kpi_id in self.kpis_data:
            # NOTE(review): self.service_data holds info for the service fetched last;
            # assumes a single invocation only processes one service - confirm
            return self.kpis_data[kpi_id], self.service_data
        service = self.service_object.get('nobody', service_id)
        if service is None:
            logger.error('Service (serviceid=%s) does not exist in kv store' % service_id)
            return None, service
        is_service_healthscore_calculate_by_entity_enabled = splunk.util.normalizeBoolean(
            service.get("is_healthscore_calculate_by_entity_enabled", 1))
        for kpi in service.get('kpis', []):
            if kpi['_key'] == kpi_id:
                kpi_needs_update = False
                if kpi.get('recalculate_custom_thresholds', False):
                    kpi, kpi_needs_update = self.calculate_custom_thresholds(kpi)
                if kpi_needs_update:
                    logger.info('KPI {} has been updated due to active custom threshold window {}.'
                                .format(kpi_id, kpi.get('active_custom_threshold_window')))
                    # Persist the recalculated thresholds back onto the service
                    self.service_object.update('nobody', service_id, service, is_partial_data=True)
                self.kpis_data[kpi.get('_key')] = kpi
                in_maintenance = service.get('_key') in self.maintenance_service_cache
                self.service_data = {
                    '_key': service.get('_key'),
                    'in_maintenance': in_maintenance,
                    'sec_grp': service.get('sec_grp'),
                    'is_service_healthscore_calculate_by_entity_enabled':
                        is_service_healthscore_calculate_by_entity_enabled
                }
                return kpi, self.service_data
        return None, None

    def get_kpis_from_shared_base(self, kpi_shared_base_search):
        """
        Get the kpi meta data using the kpi base search.
        @param kpi_shared_base_search: The shared base search identifier
        @type kpi_shared_base_search: string
        @rtype: tuple
        @return: (None, None) on error, or a tuple of
            dict of the kpis keyed by service_id influenced by this shared base search and
            dict of KPI Base Search containing metrics in key to metric map format, instead of metrics list.
        """
        shared_base = self.kpi_base_search_object.get('nobody', kpi_shared_base_search)
        if shared_base is None:
            logger.error('Shared base search %s does not exist in kv store' % kpi_shared_base_search)
            return None, None
        # Only fetch services if all the conditions are true:
        # 1. service must be "enabled"
        # 2. service must contain KPIs of type "shared_base"
        # 3. with KPIs which have their base search id set to given base search. NOTE: since kpi base searches
        #    can only belong to the Global group which is shared by all private groups, lookup by base search id is fine
        services = self.service_object.get_bulk(
            'nobody',
            filter_data={'$and': [
                {'enabled': 1},
                {'kpis.search_type': 'shared_base'},
                {'kpis.base_search_id': kpi_shared_base_search}
            ]}
        )
        # generate metric key to metric map
        shared_base_metric_map = {}
        for metric in shared_base.get('metrics', []):
            shared_base_metric_map[metric['_key']] = metric
        shared_base['metrics'] = shared_base_metric_map
        if not services:
            logger.error('Shared base search %s has no matching kpis' % shared_base)
            return None, shared_base
        kpis_found = {}
        services_to_update = []
        custom_threshold_windows = self.custom_threshold_object.get_bulk(
            'nobody',
            fields=['_key', 'window_config_percentage', 'window_type', 'window_config_static'],
            filter_data={'$and': [
                {'active': 1},
            ]}
        )
        existing_custom_threshold_windows = {ctw.get('_key'): ctw for ctw in custom_threshold_windows}
        for svc in services:
            kpis = svc.get("kpis")
            service_key = svc.get("_key")
            # Pertains to custom threshold window workflow
            service_object_needs_update = False
            service_in_maintenance = service_key in self.maintenance_service_cache
            is_service_healthscore_calculate_by_entity_enabled = splunk.util.normalizeBoolean(
                svc.get("is_healthscore_calculate_by_entity_enabled", 1))
            if kpis is None:
                logger.error('Somehow, matching service=%s has no kpis' % kpi_shared_base_search)
                continue
            kpi_found = kpis_found.get(service_key)
            if not kpi_found:
                kpi_found = {"kpis": [], 'entity_rules': svc.get('entity_rules'),
                             'in_maintenance': service_in_maintenance, 'sec_grp': svc.get('sec_grp'),
                             'is_service_healthscore_calculate_by_entity_enabled':
                                 is_service_healthscore_calculate_by_entity_enabled}
            is_any_kpi_shared_base_search_found = False
            for kpi in kpis:
                if kpi.get('search_type', '') == 'shared_base' and kpi.get(
                        'base_search_id') == kpi_shared_base_search:
                    kpi_needs_update = False
                    if kpi.get('recalculate_custom_thresholds', False):
                        kpi, kpi_needs_update = self.calculate_custom_thresholds(
                            kpi, existing_custom_threshold_windows)
                    if kpi_needs_update:
                        service_object_needs_update = True
                    self.kpis_data[kpi.get('_key')] = kpi
                    kpi_found['kpis'].append(kpi)
                    is_any_kpi_shared_base_search_found = True
            if is_any_kpi_shared_base_search_found:
                kpis_found[service_key] = kpi_found
            if service_object_needs_update:
                logger.info('KPIs with shared base search {} in service {} have been updated due to active custom'
                            ' threshold window.'.format(kpi_shared_base_search, service_key))
                services_to_update.append(svc)
        if services_to_update:
            logger.debug('{} services are being updated by custom threshold window updates.'
                         .format([s['_key'] for s in services_to_update]))
            self.service_object.save_batch('nobody', services_to_update, validate_names=False, is_partial_data=True)
        return kpis_found, shared_base

    def get_kpi_state_cache(self, kpiid):
        """
        Get the latest KPI severity info for a given KPI.
        @param kpiid: KPI id
        @return: dict of the record in the collection
        """
        return self.kpi_state_cache_object.get('nobody', kpiid)

    def create_kpi_state_cache(self, kpiid, severity_value):
        """
        Create a new record for KPI storing its severity info.
        @param kpiid: KPI id
        @param severity_value: severity value eg "normal"
        """
        data = {"_key": kpiid, "cache_severity": severity_value}
        self.kpi_state_cache_object.create('nobody', data)

    def update_kpi_state_cache(self, kpiid, severity_value):
        """
        Update the severity value of a KPI.
        @param kpiid: KPI id
        @param severity_value: new severity value
        """
        data = {"cache_severity": severity_value}
        self.kpi_state_cache_object.update('nobody', kpiid, data)

    def check_kpi_for_count_override(self, kpi_dict):
        """
        In cases of an entity level count/dc operator and a service level avg/max/min/sum operator we need to override
        the no data null with a service level 0. Any other combination will be handled normally.
        @param kpi_dict: the kpi definition
        @type kpi_dict: dict
        @return: True if we should perform the count value override, False otherwise
        @rtype: bool
        """
        # See https://confluence.splunk.com/display/PROD/ITSI+Search+Test+Matrix for a list of all
        # possible search results - all places where we should have 0 instead of NA
        if kpi_dict.get('aggregate_statop') == 'dc':
            return True
        if not kpi_dict.get('is_entity_breakdown', False):
            # Both dc and count should return true for the aggregate statop
            return kpi_dict.get('aggregate_statop') == 'count'
        # Handle the generic case of our matrix - we do the count override
        valid_entity_ops = ('count', 'dc')
        valid_service_ops = ('avg', 'dc', 'sum', 'max', 'min')
        if kpi_dict.get('entity_statop') in valid_entity_ops and kpi_dict.get('aggregate_statop') in valid_service_ops:
            return True
        return False

    @staticmethod
    def handle_filling_of_data_gaps(alert_value, object):
        """
        Override data gaps (N/A values) for KPIs, when "fill_gaps" attribute
        is set to "custom_value" for KPI or metric in a shared base search.
        @type alert_value: basestring
        @param alert_value: alert_value in event result
        @type object: dict
        @param object: kpi object or a metric object in KPI Base Search
            (parameter name kept for backward compatibility, despite shadowing the builtin)
        @rtype: tuple of bool and basestring
        @return: (True, value) if data gap value has to be overwritten, else (False, 'N/A')
        """
        if object.get('fill_gaps') == 'custom_value':
            if alert_value is None or not is_string_numeric(alert_value):
                return True, object.get('gap_custom_alert_value', 'N/A')
        return False, 'N/A'

    def update_custom_aggregate_thresholds(self, threshold_object, percentage_modifier):
        """
        Modifies an aggregate_threshold policy by the percentage adjustment specified
        (total change to be passed in).
        @param threshold_object: the aggregate_threshold policy
        @param percentage_modifier: float that should already be calculated
        @rtype: bool
        @return: True if threshold levels were present and modified, False otherwise
        """
        if 'thresholdLevels' in threshold_object:
            for level in threshold_object.get('thresholdLevels'):
                level['thresholdValue'] = round(level['thresholdValue'] * percentage_modifier, 2)
            return True
        return False

    def calculate_custom_thresholds(self, kpi, existing_custom_threshold_windows=None):
        """
        If the given KPI needs to recalculate the custom thresholds for the active window, perform calculations and
        update existing entries on the KPI.
        @type kpi: dict
        @param kpi: the kpi object itself
        @type existing_custom_threshold_windows: dict
        @param existing_custom_threshold_windows: mapping of _key of CTW to partial CTW definition
        @rtype: tuple of dict and bool
        @return: kpi dict (modified or not) and True / False indicating if a save to service object is necessary
        """
        if not kpi.get('recalculate_custom_thresholds', False):
            logger.debug('KPI {} does not need to have custom thresholds recalculated.'.format(kpi.get('_key')))
            return kpi, False
        # Turn off "recalculate_custom_thresholds" so we don't enter this again until the next custom threshold window
        # Also helps out scenarios where the CTW is misconfigured, so we don't want to keep trying to recalculate
        kpi['recalculate_custom_thresholds'] = False
        if not kpi.get('active_custom_threshold_window', ''):
            logger.warning(
                'KPI {} could not recalculate custom thresholds because no custom threshold window was marked active.'
                .format(kpi.get('_key', ''))
            )
            return kpi, True
        kpi_window = kpi.get('active_custom_threshold_window', '')
        if not is_valid_str(kpi_window):
            logger.error('Somehow multiple custom threshold windows were stored on KPI {} as value {} - fixing KPI and '
                         'skipping.'.format(kpi.get('_key'), kpi_window))
            kpi['active_custom_threshold_window'] = ''
            return kpi, True
        active_window = None
        if existing_custom_threshold_windows is not None:
            active_window = existing_custom_threshold_windows.get(kpi_window)
        if active_window is None:
            active_window = self.custom_threshold_object.get('nobody', kpi_window)
        if active_window is None:
            # Referenced window no longer exists in the KV store: clear the stale
            # reference instead of crashing on attribute access below.
            logger.error('Custom threshold window {} referenced by KPI {} does not exist - clearing reference and '
                         'skipping.'.format(kpi_window, kpi.get('_key')))
            kpi['active_custom_threshold_window'] = ''
            return kpi, True
        if active_window.get('window_type') == CustomThresholdWindowConstants.TYPE_PERCENTAGE:
            percentage_config = active_window.get('window_config_percentage', 0)
            if percentage_config == 0:
                # If value is for some reason not set, do not modify
                logger.warning(
                    'Custom threshold window {} does not have a percentage modification set.'
                    .format(active_window.get('_key'))
                )
                return kpi, True
            elif abs(percentage_config) > 200:
                # Clamp to the documented maximum absolute adjustment of 200%
                logger.warning('Custom threshold window {} has a percentage set as greater than maximum'
                               ' absolute value of 200. Calculating custom thresholds based on 200.'
                               .format(active_window.get('_key')))
                percentage_config = 200 if percentage_config > 0 else -200
            percentage_modifier = 1.00 + (percentage_config / 100)
            if kpi.get('time_variate_thresholds', False):
                # Time-variate KPI: adjust every policy's aggregate thresholds on a copy
                kpi['time_variate_thresholds_specification_custom'] = copy.deepcopy(
                    kpi.get('time_variate_thresholds_specification'))
                for policy in kpi['time_variate_thresholds_specification_custom']['policies']:
                    successful_update = self.update_custom_aggregate_thresholds(
                        kpi['time_variate_thresholds_specification_custom']['policies'][policy]['aggregate_thresholds'],
                        percentage_modifier
                    )
                    if not successful_update:
                        logger.error('KPI {} had no thresholdLevels to update for an aggregate threshold '
                                     'configuration. No modifications were applied to the base threshold levels.'
                                     .format(kpi.get('_key')))
            else:
                # Static thresholds: adjust the single aggregate threshold set on a copy
                kpi['aggregate_thresholds_custom'] = copy.deepcopy(kpi.get('aggregate_thresholds'))
                successful_update = self.update_custom_aggregate_thresholds(kpi['aggregate_thresholds_custom'],
                                                                            percentage_modifier)
                if not successful_update:
                    logger.error('KPI {} had no thresholdLevels to update for its aggregate thresholds. No '
                                 'modification were applied to the base threshold levels.'
                                 .format(kpi.get('_key')))
        else:
            logger.error('Customer set static custom threshold window thresholds, which is not available in the beta.'
                         'Please fix and use the percentage adjustment for thresholds.')
        return kpi, True

    def get_kpi_entity_thresholds(self, kpi_id):
        """
        Fetches the entity threshold configurations for a given KPI id.
        @type kpi_id: string
        @param kpi_id: the kpi id of the entity threshold configurations
        @rtype: dict
        @return: the full entity threshold cache (kpi_id -> list of configs)
        """
        if kpi_id not in self.entity_threshold_data:
            entity_threshold_configs = self.kpi_entity_threshold_object.get_bulk(
                'nobody', filter_data={'kpi_id': kpi_id})
            if entity_threshold_configs:
                self.entity_threshold_data[kpi_id] = entity_threshold_configs
        return self.entity_threshold_data

    def get_bulk_kpi_entity_thresholds(self, kpi_ids):
        """
        Fetches the entity threshold configurations for the given KPI ids.
        @type kpi_ids: list
        @param kpi_ids: the list of kpi ids of the entity threshold configurations
        @rtype: dict
        @return: the full entity threshold cache (kpi_id -> list of configs)
        """
        # Only fetch KPIs we have not already cached
        kpis_to_fetch = [kpi_id for kpi_id in kpi_ids if kpi_id not in self.entity_threshold_data]
        if kpis_to_fetch:
            entity_threshold_configs = self.kpi_entity_threshold_object.get_bulk(
                'nobody', filter_data={'$or': [{'kpi_id': k} for k in kpis_to_fetch]})
            for config in entity_threshold_configs:
                kpi_id = config.get('kpi_id')
                self.entity_threshold_data.setdefault(kpi_id, []).append(config)
        return self.entity_threshold_data
class SetSeverityFields(object):
    """
    Applies KPI threshold configuration to streamed search results: computes
    the alert_* severity fields for each event and optionally tracks the
    maximum-severity event per KPI (used to emit one extra roll-up event).
    """
    def __init__(self, is_handle_no_data=False, is_generate_max_value_alert=False, default_time=None):
        """
        Initialize.
        @type is_handle_no_data: boolean
        @param is_handle_no_data: boolean to handle no data scenario
        @type is_generate_max_value_alert: boolean
        @param is_generate_max_value_alert: handle to generate max alert_value event
        @param default_time: fallback timestamp used when a result row has no _time
        @return:
        """
        # Flag to generate extra alert and handle no data scenario
        self.is_handle_no_data = is_handle_no_data
        self.is_generate_max_value_alert = is_generate_max_value_alert
        # Max result set - to handle multiple kpis its a dict with kpiid as the key
        self.max_alert_result = {}
        # default time
        self.default_time = default_time
        self.last_timestamp = None  # used to generate max severity event for time-series events

    def _get_alert_level(self, value, kpi, threshold_settings, is_kpi_in_maintenance=False):
        '''
        Given a metric value and threshold_settings object
        (which contains a thresholdLevels array) generate alert fields.
        @param value: alert value to lookup thresholding for
        @type value: basestring
        @param kpi: KPI that is being thresholded
        @type: object
        @param threshold_settings: thresholding settings to apply
        @type: dict
        @param is_kpi_in_maintenance: indicates if the KPI is in maintenance
        @type is_kpi_in_maintenance: boolean
        @return: alert fields identified from applying thresholds on alert value
        '''
        threshold_levels = []
        if is_valid_dict(threshold_settings):
            # Defensive .get: a threshold config without levels falls through to
            # the base severity instead of raising KeyError
            threshold_levels = threshold_settings.get('thresholdLevels', [])
        if is_kpi_in_maintenance:
            return {
                'alert_severity': 'maintenance',
                'alert_color': '#5C6773',
                'alert_level': -2  # -2 is for maintenance
            }
        if value is None or not is_string_numeric(value):  # assume this means a data gap
            logger.debug("No data scenario, value is=%s", value)
            return {
                'alert_severity': kpi.get('gap_severity', 'unknown'),
                'alert_color': kpi.get('gap_severity_color', '#CCCCCC'),
                'alert_level': int(kpi.get('gap_severity_value', '-1'))
            }
        value = float(value)
        # Skip malformed levels (missing thresholdValue) instead of raising
        # KeyError/TypeError; sort a filtered copy so the KPI's cached list is
        # not mutated in place. Descending order: pick the highest threshold
        # that is consistent with `value`.
        usable_levels = [lvl for lvl in threshold_levels if lvl.get('thresholdValue') is not None]
        usable_levels.sort(key=lambda x: float(x['thresholdValue']), reverse=True)
        for level in usable_levels:
            threshold_value = float(level['thresholdValue'])
            if value >= threshold_value:
                logger.debug("threshold value found, for value=%s, threshold value=%s", value, threshold_value)
                return {
                    'alert_severity': level.get('severityLabel', 'unknown'),
                    'alert_color': level.get('severityColor', '#CCCCCC'),
                    'alert_level': int(level.get('severityValue', '-1'))
                }
        # if we got here, value is below every threshold, so return the base severity
        logger.debug("value=%s in range of base severity", value)
        if not isinstance(threshold_settings, dict):
            return {
                'alert_severity': 'unknown',
                'alert_color': '#CCCCCC',
                'alert_level': -1
            }
        return {
            'alert_severity': threshold_settings.get('baseSeverityLabel', 'unknown'),
            'alert_color': threshold_settings.get('baseSeverityColor', '#CCCCCC'),
            'alert_level': int(threshold_settings.get('baseSeverityValue', '-1'))
        }

    def _compare_fixed_thresholds(self, result, kpi, service_info, entity_level_thresholds=None):
        """
        Return severity fields based on fixed/constant thresholds.
        @param result: the search results row dictionary to use for comparison
        @type result: dict
        @param kpi: the kpi model dictionary
        @type kpi: dict
        @param service_info: relevant service information
        @type service_info: dict
        @param entity_level_thresholds: per-entity threshold override, if configured
        @type entity_level_thresholds: dict
        @return: the severity fields to be set into the result row
        @rtype: dict
        """
        entity_thresholds = kpi.get('entity_thresholds')
        if entity_level_thresholds:
            entity_thresholds = entity_level_thresholds.get('entity_thresholds')
        # TODO - Not doing custom threshold windows for entities yet
        aggregate_thresholds = kpi.get('aggregate_thresholds')
        if kpi.get('active_custom_threshold_window', '') and kpi.get('aggregate_thresholds_custom', {}):
            logger.debug('KPI {} has an active custom threshold window {} active. KPI will use custom alert levels.'
                         .format(kpi.get('_key'), kpi.get('active_custom_threshold_window')))
            aggregate_thresholds = kpi.get('aggregate_thresholds_custom')
        is_service_in_maintenance = service_info.get('in_maintenance', False)
        is_service_healthscore_calculate_by_entity_enabled = \
            service_info.get("is_service_healthscore_calculate_by_entity_enabled", True)
        return self._make_alert_fields(result, kpi, aggregate_thresholds, entity_thresholds, is_service_in_maintenance,
                                       is_service_healthscore_calculate_by_entity_enabled)

    def _make_alert_fields(
            self,
            result,
            kpi,
            aggregate_thresholds,
            entity_thresholds,
            is_service_in_maintenance=False,
            is_service_healthscore_calculate_by_entity_enabled=True
    ):
        """
        Compute the alert fields for one result row, using aggregate thresholds
        for service-aggregate rows and entity thresholds otherwise; also tracks
        the max-severity result per KPI when enabled.
        """
        is_service_aggregate = splunk.util.normalizeBoolean(result.get('is_service_aggregate', True))
        value = result.get('alert_value')
        if is_service_aggregate:
            is_all_entities_in_maintenance = splunk.util.normalizeBoolean(
                result.get("is_all_entities_in_maintenance", False)
            )
            is_kpi_in_maintenance = is_service_in_maintenance or is_all_entities_in_maintenance
            alerts = self._get_alert_level(value, kpi, aggregate_thresholds, is_kpi_in_maintenance)
            alerts['is_entity_in_maintenance'] = 1 if is_kpi_in_maintenance else 0  # entity is service aggregate
        else:
            is_entity_in_maintenance = splunk.util.normalizeBoolean(result.get("is_entity_in_maintenance", False))
            is_kpi_in_maintenance = is_service_in_maintenance or is_entity_in_maintenance
            alerts = self._get_alert_level(value, kpi, entity_thresholds, is_kpi_in_maintenance)
        alerts['is_service_in_maintenance'] = 1 if is_service_in_maintenance else 0  # Save away for tracking
        # Compared with max stored value and save it to generate separate event
        if self.is_generate_max_value_alert and is_service_healthscore_calculate_by_entity_enabled:
            if self.max_alert_result.get(kpi.get('_key')) is None:
                # Get first value
                self.max_alert_result[kpi.get('_key')] = self._copy_and_update_alert_values(result, alerts)
            max_alert_level = self.max_alert_result[kpi.get('_key')].get('alert_level')
            current_alert_level = alerts.get('alert_level')
            if not is_string_numeric(max_alert_level) and is_string_numeric(current_alert_level):
                # max contains an empty or non numeric value so assign numeric value
                self.max_alert_result[kpi.get('_key')] = self._copy_and_update_alert_values(result, alerts)
            if is_string_numeric(max_alert_level) and is_string_numeric(current_alert_level):
                # compare max value
                if float(current_alert_level) >= float(max_alert_level):
                    self.max_alert_result[kpi.get('_key')] = self._copy_and_update_alert_values(result, alerts)
        elif self.is_generate_max_value_alert and not \
                is_service_healthscore_calculate_by_entity_enabled and is_service_aggregate:
            self.max_alert_result[kpi.get('_key')] = self._copy_and_update_alert_values(result, alerts)
        return alerts

    def _copy_and_update_alert_values(self, result, alert_values):
        """
        Supporting function to do deep copy of result and add alert_values in it.
        @type result: dict
        @param result: result or event
        @type alert_values: dict
        @param alert_values: finalized alert values for given result
        @rtype dict
        @return: new instance of dict by combining both
        """
        combine_result = copy.deepcopy(result)
        combine_result.update(alert_values)
        return combine_result

    def _get_policy(self, time, threshold_spec, tzoffset):
        """
        Resolve the time-variate threshold policy active at the given timestamp.
        @param time: UTC epoch timestamp
        @type time: string, int, or float
        @param threshold_spec: dict containing policies dict and time_blocks list
        @type threshold_spec: dict
        @param tzoffset: ISO timezone offset, e.g. '-07:00' or empty string for UTC
        @type tzoffset: string
        @raise TypeError: when threshold_spec or its policies is not a dict
        @raise ValueError: when the policies dict is empty
        @rtype: dict
        @return: the matching policy, or {} if no policy key resolves
        """
        if not is_valid_dict(threshold_spec):
            error_msg = 'Invalid KPI threshold_spec: {0}. Expected dict.'.format(threshold_spec)
            logger.debug(error_msg)
            raise TypeError(error_msg)
        policies = threshold_spec.get('policies')
        if not is_valid_dict(policies):
            error_msg = 'Invalid KPI policies: {0}. Expected dict.'.format(policies)
            logger.debug(error_msg)
            raise TypeError(error_msg)
        if len(policies) == 0:
            error_msg = 'Invalid KPI policies: {0}. Expected dict to not be empty.'.format(policies)
            logger.debug(error_msg)
            raise ValueError(error_msg)
        # first, get current time information
        if is_valid_str(tzoffset):
            tz = splunk.util.TZInfo(offset=splunk.util.parseISOOffset(tzoffset))
        else:
            tz = splunk.util.utc
        date = datetime.datetime.fromtimestamp(float(time), tz)
        day, hour, minute = str(date.weekday()), str(date.hour), str(date.minute)
        # use time information to create a time block
        # note: time block has a 1 minute duration to pass validation
        time_blocks = [[' '.join([minute, hour, '*', '*', day]), 1]]
        # find policy associated with time block
        found_policy_key = 'default_policy'
        for policy_key, policy in policies.items():
            policy_time_blocks = policy.get('time_blocks', [])
            # if we find conflicting time blocks in policy_time_blocks, it means we've found our policy
            if ItsiTimeBlockUtils.check_time_block_conflict_between(time_blocks, policy_time_blocks):
                found_policy_key = policy_key
                break
        return policies.get(found_policy_key, {})

    def _compare_variable_thresholds(self, result, kpi, service_info, entity_level_thresholds=None):
        """
        Return severity fields based on time-variate thresholds given the timestamp and threshold policy set.
        @param result: the search result row dictionary to use for comparison
        @type result: dict
        @param kpi: the kpi model dictionary
        @type kpi: dict
        @param service_info: relevant service information
        @type service_info: dict
        @param entity_level_thresholds: per-entity threshold override, if configured
        @type entity_level_thresholds: dict
        @return: the severity fields to be set into the result row
        @rtype: dict
        """
        threshold_spec = kpi.get('time_variate_thresholds_specification')
        if kpi.get('active_custom_threshold_window', '') and \
                kpi.get('time_variate_thresholds_specification_custom', {}):
            logger.debug('KPI {} has an active custom threshold window {} active. KPI will use custom alert levels.'
                         .format(kpi.get('_key'), kpi.get('active_custom_threshold_window')))
            threshold_spec = kpi.get('time_variate_thresholds_specification_custom')
        # Note that _time on summary index is UTC epoch
        policy = self._get_policy(result.get('_time', self.default_time), threshold_spec, kpi.get('tz_offset', ''))
        entity_thresholds = policy.get('entity_thresholds', {})
        if entity_level_thresholds:
            entity_threshold_spec = entity_level_thresholds.get('time_variate_thresholds_specification')
            entity_policy = self._get_policy(result.get('_time', self.default_time), entity_threshold_spec,
                                             kpi.get('tz_offset', ''))
            entity_thresholds = entity_policy.get('entity_thresholds')
        aggregate_thresholds = policy.get('aggregate_thresholds')
        is_service_in_maintenance = service_info.get('in_maintenance', False)
        is_service_healthscore_calculate_by_entity_enabled = \
            service_info.get("is_service_healthscore_calculate_by_entity_enabled", True)
        return self._make_alert_fields(result, kpi, aggregate_thresholds, entity_thresholds, is_service_in_maintenance,
                                       is_service_healthscore_calculate_by_entity_enabled)

    def get_severity_info(self, result, kpi=None, service_info=None, kpi_entity_thresholds=None):
        """
        Compute and return the alert-related fields for a single results row from a search.
        The following fields are inserted:
        - alert_severity (severity label e.g. "normal")
        - alert_color (e.g. "#99D18B")
        - alert_level (numeric severity level e.g. 2)
        - alert_value (the value of the metric field)
        - alert_entity ('aggregate' for aggregate thresholds else entity_key)
        The code inspects the `time_variate_thresholds` flag in the KPI record. If it is
        absent or not set, threshold settings are retrieved from the entity-level
        and/or aggregate-level threshold setting records in the KPI, otherwise they
        are looked up based on the result _time field using time blocks collection to identify the
        relevant policy record, and policy record to get the threshold settings.
        @param result: the search result row dictionary to use for comparison
        @type result: dict
        @param kpi: kpi record as fetched from the KV store
        @type kpi: dict
        @param service_info: relevant service information collected from KV store
        @type service_info: dict
        @param kpi_entity_thresholds: the threshold configurations for individual entities
        @type kpi_entity_thresholds: list
        @return: the severity fields to be set into the result row
        @rtype: dict
        """
        # When kpi or service is not saved and this command is called, often used for preview charts
        if (kpi is None) or (service_info is None):
            return {
                'alert_severity': 'unknown',
                'alert_color': '#CCCCCC',
                'alert_level': -1
            }
        is_service_aggregate = splunk.util.normalizeBoolean(result.get('is_service_aggregate', True))
        is_entity_level_thresholding = kpi.get('is_entity_level_thresholding', False)
        entity_level_thresholds = None
        if not is_service_aggregate and is_entity_level_thresholding and kpi_entity_thresholds:
            entity_key = result.get('entity_key')
            entity_title = result.get('entity_title')
            kpi_id = kpi.get('_key')
            if kpi_id in kpi_entity_thresholds:
                # Match this row's entity against its per-entity threshold config, if any
                entity_thresholds = [x for x in kpi_entity_thresholds[kpi_id] if x.get('entity_key') == entity_key
                                     and x.get('entity_title') == entity_title]
                entity_level_thresholds = entity_thresholds[0] if entity_thresholds else None
        # Entity-level config, when present, decides time-variance; else the KPI does
        is_time_variant = False
        if is_entity_level_thresholding and entity_level_thresholds:
            if entity_level_thresholds.get('time_variate_thresholds', False):
                is_time_variant = True
        elif kpi.get('time_variate_thresholds', False):
            is_time_variant = True
        if is_time_variant:
            return self._compare_variable_thresholds(result, kpi, service_info, entity_level_thresholds)
        return self._compare_fixed_thresholds(result, kpi, service_info, entity_level_thresholds)

    def get_max_value_event(self, kpi):
        """
        @param kpi: KPI key whose tracked max-severity event should be returned
        @rtype: dict|None
        @return: Max result or None
        """
        return self.max_alert_result.get(kpi)

    def get_max_value_event_per_timestamp(self, curr_timestamp, kpi_id):
        """
        While iterating over events, if we have reached next timestamp,
        then return max severity event for previous timestamp and cleanup
        self.max_alert_result cache.
        Else, set self.last_timestamp to current timestamp and return None.
        NOTE: this method relies on class variable last_timestamp, which is
        initialized to None during instantiation of class.
        @type curr_timestamp: basestring
        @param curr_timestamp: current event timestamp
        @type kpi_id: basestring
        @param kpi_id: KPI key for which events are being processed
        @rtype: tuple of dict and basestring
        @return: tuple of max severity event and previous timestamp for which max severity event is generated
        """
        max_alert_result = None
        last_timestamp = self.last_timestamp
        if self.last_timestamp is not None and self.last_timestamp != curr_timestamp:
            max_alert_result = self.get_max_value_event(kpi_id)
            # cleanup max alert result, so that it could store max alert event for next timestamp
            self.max_alert_result.pop(kpi_id, None)
        self.last_timestamp = curr_timestamp
        return max_alert_result, last_timestamp