You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 lines
18 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import time
import json
from operator import itemgetter
from SA_ITOA_app_common.solnlib.splunk_rest_client import SplunkRestClient
import itsi_py3
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.setup_logging import logger
from ITOA.itoa_common import get_current_utc_epoch
from splunk.util import normalizeBoolean
# Key prefix identifying service health score (SHS) KPIs.
SHKPI_STARTS_WITH = 'SHKPI-'
# Default urgency assumed for a service health score dependency.
SHS_DEFAULT_URGENCY = 11
# Default urgency assumed for a regular KPI dependency.
KPI_DEFAULT_URGENCY = 5
# Alert level constants used when weighing KPI / entity severities.
CRITICAL_SEVERITY = 6
HIGH_SEVERITY = 5
INFO_SEVERITY = 1
# Common query parameters for fetching complete JSON search results over REST.
output_params = {
    'output_mode': 'json',
    'count': 0
}
def get_epoch_times(session_key, earliest, latest):
    """
    Convenience method to convert a pair of Splunk time specifiers to epoch
    seconds via the /services/search/timeparser REST endpoint.

    The returned window is capped at 15 minutes: if the parsed range is wider,
    earliest is moved up to 15 minutes before latest. On any failure the last
    15 minutes ending now are returned instead.

    @type: string
    @param session_key: splunkd session key used to authenticate the REST call
    @type: string
    @param earliest: the earliest time specifier (e.g. '-60m') to parse
    @type: string
    @param latest: the latest time specifier (e.g. 'now') to parse
    @rtype: tuple(earliest_time_in_seconds, latest_time_in_seconds)
    @return: a tuple with the earliest and latest times in seconds
    """
    max_window_seconds = 15 * 60  # cap the effective search window at 15 minutes
    try:
        uri_string = '/services/search/timeparser'
        _rest_client = SplunkRestClient(session_key, "SA-ITOA")
        response = _rest_client.get(
            uri_string, output_mode="json", time=[earliest, latest], output_time_format="%s"
        ).body.read()
        resp = json.loads(response)
        seconds_earliest = seconds_earliest_adjusted = float(resp[earliest])
        seconds_latest = float(resp[latest])
        logger.info(
            'Result of getting earliest and latest times from timeparser: '
            'response=%s', response)
        # Expected to trigger for most callers, since the range passed in
        # (e.g. the default -60m..now) is wider than the 15 minute cap.
        if seconds_earliest + max_window_seconds < seconds_latest:
            logger.info('The time range is too wide, adjusting earliest to 15 minutes before latest.')
            seconds_earliest_adjusted = seconds_latest - max_window_seconds
        return seconds_earliest_adjusted, seconds_latest
    except Exception:
        # logger.exception keeps the traceback so timeparser failures are diagnosable
        logger.exception('Error in getting earliest and latest times from timeparser, defaults to 15 minutes.')
        latest_epoch_time = get_current_utc_epoch()
        return latest_epoch_time - max_window_seconds, latest_epoch_time
def _collect_degraded_entities(session_key, service_connection, earliest_time, latest_time):
    """
    Search per-entity KPI alert levels and group the degraded entities by KPI id.

    Runs an mstats search over the ITSI summary metrics index for non-aggregate
    (entity level) results with alert_level above 4 and builds a lookup of
    kpi id -> list of {'key', 'title', 'alert_level'} entity records.

    @type: string
    @param session_key: the session key for the search
    @type: object
    @param service_connection: connection to the Splunk REST API
    @type: string
    @param earliest_time: the earliest time in the interval to search over
    @type: string
    @param latest_time: the latest time in the interval to search over
    @rtype: dict
    @return: mapping of kpi id to its list of degraded entity records
    """
    earliest_epoch_time, latest_epoch_time = get_epoch_times(session_key, earliest_time, latest_time)
    degraded_entities_search = ('| mstats max(alert_level) AS alert_level WHERE `get_itsi_summary_metrics_index` '
                                'AND is_service_aggregate=0 '
                                'earliest=' + str(earliest_epoch_time) + ', latest=' + str(latest_epoch_time)
                                + ' by itsi_service_id, itsi_kpi_id, entity_key, entity_title span=1m '
                                '| stats latest(alert_level) AS alert_level'
                                ' by itsi_service_id, itsi_kpi_id, entity_key, entity_title '
                                '| where alert_level>4 | stats list(itsi_kpi_id) as itsi_kpi_id'
                                ' by itsi_service_id, entity_key, entity_title, alert_level')
    params = {
        'earliest_time': earliest_time,
        'latest_time': latest_time
    }
    search_job = service_connection.jobs.create(degraded_entities_search, **params)
    if not wait_for_job(search_job):
        search_time_out_msg = "Search for degraded entities timed out"
        logger.error(search_time_out_msg)
        raise Exception(search_time_out_msg)
    search_results = json.load(search_job.results(**output_params))
    results_data = search_results.get('results', [])
    logger.debug('Results for degraded entities search: %s', results_data)
    degraded_entities_data = {}
    for result in results_data:
        if result['entity_key'] == 'service_aggregate':
            continue
        entity_data = {
            'alert_level': int(float(result['alert_level'])),
            'key': result['entity_key'],
            'title': result['entity_title']
        }
        kpi_ids = result['itsi_kpi_id']
        # `stats list(...)` returns a bare string when there is a single value
        if isinstance(kpi_ids, itsi_py3.string_type):
            kpi_ids = [kpi_ids]
        for kpi_id in kpi_ids:
            degraded_entities_data.setdefault(kpi_id, []).append(entity_data)
    logger.debug('Degraded entities data: %s', degraded_entities_data)
    return degraded_entities_data


def generate_dependencies_json(session_key, data, service_id='', earliest_time='-60m', latest_time='now'):
    """
    Entry point for generating the list of kpi dependencies impacting a service.

    @type: string
    @param session_key: the session key for the search
    @type: dict
    @param data: the lookup dict from service id to service data
    @type: string
    @param service_id: the current service to develop the list from
    @type: string
    @param earliest_time: the earliest time in the interval to search over
    @type: string
    @param latest_time: the latest time in the interval to search over
    @rtype: list
    @return: a list of up to 10 impacting kpis sorted by impact ascending,
        ties broken by urgency descending
    """
    if not service_id:
        return []
    # Index the services by key for O(1) lookups during the recursive walk.
    service_data = {}
    for service in data:
        key = service['_key']
        if key:
            service_data[key] = service
    service_connection = ITOAInterfaceUtils.service_connection(session_key, app_name="itsi")
    visited = set()
    # Only run the per-entity search when the target service computes its
    # health score from entities.
    current_service = service_data.get(service_id)
    should_run_degraded_entities_search = bool(
        current_service and
        normalizeBoolean(current_service.get('is_healthscore_calculate_by_entity_enabled')))
    degraded_entities_data = {}
    if should_run_degraded_entities_search:
        degraded_entities_data = _collect_degraded_entities(
            session_key, service_connection, earliest_time, latest_time)
    kpis = generate_impacting_dependencies_list_recursive(service_data, service_connection, visited, service_id,
                                                          degraded_entities_data, earliest_time, latest_time)
    if not kpis:
        return []
    # Stable two-pass sort: primary key is impact (ascending), ties broken by
    # urgency (descending).
    output = sorted(kpis, key=itemgetter('urgency'), reverse=True)
    output = sorted(output, key=itemgetter('impact'))
    # Limit number of results to 10
    results = output[:10]
    if should_run_degraded_entities_search:
        for result in results:
            kpi_id = result['_key']
            if kpi_id in degraded_entities_data:
                result['degraded_entities'] = sorted(degraded_entities_data[kpi_id],
                                                     key=itemgetter('alert_level'), reverse=True)
    logger.debug('Generate KPIs dependencies results returned: %s', results)
    return results
def _degraded_entity_severity(kpi_severity, degraded_entities):
    """
    Compute the entity based impact severity for a KPI that has degraded entities.

    Preserved behavior: the contribution is 0 when either the KPI itself or its
    most degraded entity is at critical severity, and 30 otherwise.
    NOTE(review): the 0-on-critical rule looks inverted but matches the original
    logic -- confirm against the health score spec before changing.

    @type: int
    @param kpi_severity: the KPI's own alert level
    @type: list
    @param degraded_entities: degraded entity records ({'alert_level': int, ...})
    @rtype: int
    @return: the entity based severity contribution (0 or 30)
    """
    # The entity lists are not sorted at this point (the caller sorts them for
    # display only after the recursion), so take the true maximum alert level
    # instead of assuming element [0] is the most degraded.
    most_degraded_entity_severity = max(int(entity['alert_level']) for entity in degraded_entities)
    if most_degraded_entity_severity == CRITICAL_SEVERITY or kpi_severity == CRITICAL_SEVERITY:
        return 0
    return 30


def generate_impacting_dependencies_list_recursive(data, service_connection, visited, service_id,
                                                   degraded_entities_data, earliest_time='-60m', latest_time='now'):
    """
    For a given service, determine the kpis that have set its current health score and their relative impacts
    @type: dict
    @param data: the lookup dict from service id to service data
    @type: object
    @param service_connection: connection to the Splunk REST API
    @type: set
    @param visited: the set of tuples (from, to) denoting dependencies we have already visited
    @type: string
    @param service_id: the current service to develop the list from
    @type: dict
    @param degraded_entities_data: the dictionary containing degraded entities data
    @type: string
    @param earliest_time: the earliest time in the interval to search over
    @type: string
    @param latest_time: the latest time in the interval to search over
    @rtype: list
    @return: list of impacting kpis
    """
    service = data.get(service_id)
    if not service:
        return []
    contributing_kpis = {}
    net_urgency = 0
    # First, iterate the kpis of the service itself and store them in a dict
    # with their urgencies and associated service id.
    for kpi in service.get('kpis', []):
        if kpi['_key'].startswith(SHKPI_STARTS_WITH):
            continue
        urgency = int(kpi['urgency'])
        # Skip kpis whose importance is 0: they do not contribute to the health score.
        if urgency == 0:
            continue
        contributing_kpis[kpi['_key']] = {
            '_key': kpi['_key'],
            'is_healthscore_calculate_by_entity_enabled': service.get('is_healthscore_calculate_by_entity_enabled'),
            'service_id': service_id,
            'title': kpi['title'],
            'urgency': urgency
        }
        net_urgency += urgency
    # Next, fold in the kpi dependencies on other services.
    for service_dependency in service.get('services_depends_on', []):
        kpi_dependencies = service_dependency.get('kpis_depending_on', [])
        overloaded_urgencies = service_dependency.get('overloaded_urgencies', {})
        dependent_service_id = service_dependency.get('serviceid', '')
        dependent_service = data.get(dependent_service_id, '')
        if not dependent_service:
            # the dependent service has been deleted
            continue
        for kpi in kpi_dependencies:
            # A service health score dependency defaults to urgency 11, a regular
            # kpi to 5, unless an overloaded urgency was configured.
            default_urgency = SHS_DEFAULT_URGENCY if kpi.startswith(SHKPI_STARTS_WITH) else KPI_DEFAULT_URGENCY
            urgency = int(overloaded_urgencies.get(kpi, default_urgency))
            # Skip kpis whose importance is 0: they do not contribute to the health score.
            if urgency == 0:
                continue
            net_urgency += urgency
            dependent_service_kpis = dependent_service.get('kpis')
            kpi_data = next((item for item in dependent_service_kpis if item['_key'] == kpi), {})
            title = kpi_data.get('title', '')
            # if the dependent kpi has not been deleted, store its id, name, and urgency
            if title:
                contributing_kpis[kpi] = {
                    '_key': kpi,
                    'is_healthscore_calculate_by_entity_enabled':
                        dependent_service.get('is_healthscore_calculate_by_entity_enabled'),
                    'service_id': dependent_service_id,
                    'title': title,
                    'urgency': urgency
                }
    # build the search string using the keys of the parsed kpis
    kpi_search_string = 'itsi_kpi_id IN (' + ','.join(contributing_kpis.keys()) + ') '
    spl_search = ('| mstats latest(alert_level) AS alert_level, latest(alert_value) AS alert_value'
                  ' WHERE `get_itsi_summary_metrics_index` ' + kpi_search_string
                  + ' `metrics_service_level_kpi_only` by itsi_kpi_id, itsi_service_id | where alert_level > -1 '
                  '| lookup kpi_alert_info_lookup alert_level OUTPUT weighted_contribution AS severity')
    params = {
        'earliest_time': earliest_time,
        'latest_time': latest_time
    }
    search_job = service_connection.jobs.create(spl_search, **params)
    if not wait_for_job(search_job):
        search_time_out_msg = "Search for dependent kpi severity timed out"
        logger.error(search_time_out_msg)
        raise Exception(search_time_out_msg)
    search_results = json.load(search_job.results(**output_params))
    kpi_severity_score_sum = 0
    non_boom_keys = []
    min_boom_kpi = {'impact': float('inf')}
    list_min_boom_kpi = []
    results_data = search_results.get('results', [])
    logger.debug('Results for KPI dependencies search: %s', results_data)
    if not results_data:
        logger.debug('generate_impacting_dependencies_list_recursive search did not return any results')
        return []
    # After the search completes, compute the individual severity for each KPI.
    # Track the boom (urgency 11) kpi(s) with the minimum impact, and the running
    # weighted sum of the non boom kpi impacts.
    for row in results_data:
        kpi_result = row.get('itsi_kpi_id')
        ref_kpi = contributing_kpis.get(kpi_result, '')
        if not ref_kpi:
            continue
        ref_kpi['alert_value'] = row.get('alert_value', 'N/A')
        ref_kpi['severity'] = int(row['alert_level'])
        # A kpi sitting at info severity is excluded from the health score
        # calculation unless it has degraded entities.
        if ref_kpi['severity'] == INFO_SEVERITY and kpi_result not in degraded_entities_data:
            net_urgency -= ref_kpi['urgency']
            continue
        if ref_kpi['urgency'] == SHS_DEFAULT_URGENCY:
            # "boom" kpi: carries the service health score default urgency (11)
            if kpi_result in degraded_entities_data:
                ref_kpi['impact'] = _degraded_entity_severity(
                    ref_kpi['severity'], degraded_entities_data[kpi_result])
                kpi_severity_score_sum += int(ref_kpi['impact']) * ref_kpi['urgency']
                non_boom_keys.append(row['itsi_kpi_id'])
            else:
                # Track the boom kpi(s) with the minimum impact; ties are
                # collected so all equally-impacting boom kpis can be returned.
                ref_kpi['impact'] = int(row['severity'])
                if ref_kpi['impact'] < min_boom_kpi['impact']:
                    # a strictly smaller impact starts a fresh tie list
                    min_boom_kpi = ref_kpi
                    list_min_boom_kpi = [ref_kpi]
                elif ref_kpi['impact'] == min_boom_kpi['impact']:
                    list_min_boom_kpi.append(ref_kpi)
        else:
            if kpi_result in degraded_entities_data:
                ref_kpi['impact'] = _degraded_entity_severity(
                    ref_kpi['severity'], degraded_entities_data[kpi_result]) * ref_kpi['urgency']
            else:
                # calculate the relative impact for each non boom kpi
                ref_kpi['impact'] = int(row['severity']) * ref_kpi['urgency']
            # add the impact to the sum and remember the key
            kpi_severity_score_sum += ref_kpi['impact']
            non_boom_keys.append(row['itsi_kpi_id'])
    kpi_severity_score = 0 if net_urgency == 0 else kpi_severity_score_sum / net_urgency
    if min_boom_kpi['impact'] > kpi_severity_score and len(non_boom_keys) > 0:
        # non boom kpis have determined the current SHS
        if net_urgency == 0:
            return []
        non_boom_kpi_output = {}
        for kpi_key in non_boom_keys:
            kpi = contributing_kpis[kpi_key]
            if kpi_key.startswith(SHKPI_STARTS_WITH):
                # skip the recursive step for cyclic dependencies
                path = (service_id, kpi_key)
                if path in visited:
                    continue
                visited.add(path)
                dependent_kpis = generate_impacting_dependencies_list_recursive(data, service_connection,
                                                                                visited, kpi.get('service_id'),
                                                                                degraded_entities_data, earliest_time,
                                                                                latest_time)
                relative_urgency = kpi['urgency'] / net_urgency
                for dependent_kpi in dependent_kpis:
                    # scale the impact of a nested kpi by its SHS's relative urgency
                    dependent_kpi['impact'] *= relative_urgency
                    if dependent_kpi['_key'] not in non_boom_kpi_output:
                        non_boom_kpi_output[dependent_kpi['_key']] = dependent_kpi
                    else:
                        # if the kpi is already present, we add its impacts together
                        previous_impact = non_boom_kpi_output[dependent_kpi['_key']]['impact']
                        dependent_kpi['impact'] += previous_impact
                        non_boom_kpi_output[dependent_kpi['_key']] = dependent_kpi
            else:
                if kpi.get('_key') not in non_boom_kpi_output:
                    non_boom_kpi_output[kpi_key] = kpi
                else:
                    # if the kpi is already present, we add its impacts together
                    previous_impact = non_boom_kpi_output[kpi_key]['impact']
                    kpi['impact'] += previous_impact
                    non_boom_kpi_output[kpi_key] = kpi
        logger.debug('generate_impacting_dependencies_list_recursive results returned: %s', non_boom_kpi_output)
        # convert the output to a list and return
        return list(non_boom_kpi_output.values())
    else:
        # a boom kpi has determined the current SHS
        min_boom_key = min_boom_kpi.get('_key')
        if min_boom_key is None:
            # no boom kpi was seen at all (every row was skipped) -- nothing to report
            return []
        if min_boom_key.startswith(SHKPI_STARTS_WITH):
            # skip the recursive step for cyclic dependencies
            path = (service_id, min_boom_key)
            if path in visited:
                return []
            visited.add(path)
            # if the boom kpi is a service health score, we only want to return the kpis that comprise it
            return generate_impacting_dependencies_list_recursive(data, service_connection, visited,
                                                                  min_boom_kpi.get('service_id'),
                                                                  degraded_entities_data, earliest_time, latest_time)
        if list_min_boom_kpi:
            logger.debug('generate_impacting_dependencies_list_recursive results as list: %s', list_min_boom_kpi)
            return list_min_boom_kpi
        logger.debug('generate_impacting_dependencies_list_recursive results as single value: %s', min_boom_kpi)
        return [min_boom_kpi]
def wait_for_job(searchjob, maxtime=-1):
    """
    Poll a search job until it completes, sleeping 0.2 seconds between checks.

    A negative maxtime (the default) means wait indefinitely; otherwise give up
    once more than maxtime seconds have been spent sleeping. Returns True if
    the job finished, False on timeout.
    """
    poll_interval = 0.2
    waited = 0.0
    while not searchjob.is_done():
        time.sleep(poll_interval)
        waited += poll_interval
        if 0 <= maxtime < waited:
            break
    return searchjob.is_done()