# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import time
import json
from operator import itemgetter

from SA_ITOA_app_common.solnlib.splunk_rest_client import SplunkRestClient
import itsi_py3
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.setup_logging import logger
from ITOA.itoa_common import get_current_utc_epoch
from splunk.util import normalizeBoolean

# Prefix identifying service health score (SHS) KPIs
SHKPI_STARTS_WITH = 'SHKPI-'
# Default urgencies: 11 for a service health score dependency, 5 for a regular KPI
SHS_DEFAULT_URGENCY = 11
KPI_DEFAULT_URGENCY = 5
# Alert levels as recorded in the ITSI summary metrics index
CRITICAL_SEVERITY = 6
HIGH_SEVERITY = 5
INFO_SEVERITY = 1

output_params = {
    'output_mode': 'json',
    'count': 0
}
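# `output_params` is passed as keyword arguments to `search_job.results()` below;
# count=0 asks the Splunk REST API for all available results rather than a single
# page, in JSON format.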

def get_epoch_times(session_key, earliest, latest):
    """
    Convenience method to convert a pair of earliest and latest time strings to epoch seconds

    @type: string
    @param session_key: the session key used to call the timeparser endpoint

    @type: string
    @param earliest: the earliest time, in Splunk time notation (e.g. '-60m')

    @type: string
    @param latest: the latest time, in Splunk time notation (e.g. 'now')

    @rtype: tuple(earliest_time_in_seconds, latest_time_in_seconds)
    @return: a tuple with the earliest and latest times in seconds
    """
    try:
        uri_string = '/services/search/timeparser'
        _rest_client = SplunkRestClient(session_key, "SA-ITOA")

        response = _rest_client.get(
            uri_string, output_mode="json", time=[earliest, latest], output_time_format="%s"
        ).body.read()
        resp = json.loads(response)
        seconds_earliest = seconds_earliest_adjusted = float(resp[earliest])
        seconds_latest = float(resp[latest])
        logger.info(
            'Result of getting earliest and latest times from timeparser: '
            'response=%s', response)
        # Cap the window at 15 minutes. This will most likely occur each time,
        # since the minimum time range for SA is 45 minutes.
        if seconds_earliest + 15 * 60 < seconds_latest:
            logger.info('The time range is too wide, adjusting earliest to 15 minutes before latest.')
            seconds_earliest_adjusted = seconds_latest - 15 * 60
        return seconds_earliest_adjusted, seconds_latest
    except Exception:
        logger.error('Error getting earliest and latest times from timeparser, defaulting to the last 15 minutes.')
        latest_epoch_time = get_current_utc_epoch()
        return latest_epoch_time - 15 * 60, latest_epoch_time
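# Illustrative usage (assumes a valid `session_key`); the returned window is
# always capped at 15 minutes:
#
#     earliest_epoch, latest_epoch = get_epoch_times(session_key, '-60m', 'now')
#     assert latest_epoch - earliest_epoch <= 15 * 60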

def generate_dependencies_json(session_key, data, service_id='', earliest_time='-60m', latest_time='now'):
    """
    Entry point for generating the list of KPI dependencies

    @type: string
    @param session_key: the session key for the search

    @type: list
    @param data: the list of service dicts (each carrying a '_key') used to build the service lookup

    @type: string
    @param service_id: the current service to develop the list from

    @type: string
    @param earliest_time: the earliest time in the interval to search over

    @type: string
    @param latest_time: the latest time in the interval to search over

    @rtype: list
    @return: a list of impacting KPIs sorted in order by rank
    """
    if not service_id:
        return []
    service_data = {}
    for service in data:
        key = service['_key']
        if not key:
            continue
        service_data[key] = service

    service_connection = ITOAInterfaceUtils.service_connection(session_key, app_name="itsi")

    visited = set()

    # Only run the degraded entities search when the service exists and has
    # per-entity health score calculation enabled.
    should_run_degraded_entities_search = True
    if not service_data.get(service_id) or not normalizeBoolean(
            service_data.get(service_id).get('is_healthscore_calculate_by_entity_enabled')):
        should_run_degraded_entities_search = False

    degraded_entities_data = {}
    if should_run_degraded_entities_search:
        earliest_epoch_time, latest_epoch_time = get_epoch_times(session_key, earliest_time, latest_time)
        # Find entities whose latest per-minute alert level in the window is at
        # least HIGH_SEVERITY (alert_level > 4), grouped by the KPIs they degrade.
        degraded_entities_search = ('| mstats max(alert_level) AS alert_level WHERE `get_itsi_summary_metrics_index` '
                                    'AND is_service_aggregate=0 '
                                    'earliest=' + str(earliest_epoch_time) + ' latest=' + str(latest_epoch_time)
                                    + ' by itsi_service_id, itsi_kpi_id, entity_key, entity_title span=1m '
                                    '| stats latest(alert_level) AS alert_level'
                                    ' by itsi_service_id, itsi_kpi_id, entity_key, entity_title '
                                    '| where alert_level>4 | stats list(itsi_kpi_id) as itsi_kpi_id'
                                    ' by itsi_service_id, entity_key, entity_title, alert_level')
        params = {
            'earliest_time': earliest_time,
            'latest_time': latest_time
        }
        search_job = service_connection.jobs.create(degraded_entities_search, **params)

        if not wait_for_job(search_job):
            search_time_out_msg = "Search for degraded entities timed out"
            logger.error(search_time_out_msg)
            raise Exception(search_time_out_msg)
        search_results = json.load(search_job.results(**output_params))
        results_data = search_results.get('results', [])
        logger.debug('Results for degraded entities search: ' + str(results_data))
        for result in results_data:
            if result['entity_key'] != 'service_aggregate':
                entity_data = {
                    'alert_level': int(float(result['alert_level'])),
                    'key': result['entity_key'],
                    'title': result['entity_title']
                }
                # stats list(itsi_kpi_id) yields a plain string for a single KPI
                # and a list for several, so normalize to a list.
                if isinstance(result['itsi_kpi_id'], itsi_py3.string_type):
                    kpis = [result['itsi_kpi_id']]
                else:
                    kpis = result['itsi_kpi_id']
                for kpi_id in kpis:
                    if kpi_id not in degraded_entities_data:
                        degraded_entities_data[kpi_id] = []
                    degraded_entities_data[kpi_id].append(entity_data)
        logger.debug('Degraded entities data: ' + str(degraded_entities_data))
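        # `degraded_entities_data` now maps each degraded KPI id to its entity
        # records, e.g. (illustrative shape):
        #     {'<kpi_id>': [{'alert_level': 6, 'key': '<entity_key>', 'title': '<entity_title>'}]}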

    kpis = generate_impacting_dependencies_list_recursive(service_data, service_connection, visited, service_id,
                                                          degraded_entities_data, earliest_time, latest_time)
    if not kpis:
        return []

    # Sort by impact ascending, breaking ties by urgency descending (the second
    # sort is stable, so the urgency ordering survives within equal impacts).
    output = sorted(kpis, key=itemgetter('urgency'), reverse=True)
    output = sorted(output, key=itemgetter('impact'))
    # Limit the number of results to 10
    results = output[:10]
    if should_run_degraded_entities_search:
        for result in results:
            kpi_id = result['_key']
            if kpi_id in degraded_entities_data:
                result['degraded_entities'] = sorted(degraded_entities_data[kpi_id],
                                                     key=itemgetter('alert_level'), reverse=True)
    logger.debug('Generate KPIs dependencies results returned: ' + str(results))
    return results
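# Illustrative shape of one returned result (values are made up; the
# 'degraded_entities' field is only attached when the per-entity search ran):
#
#     {'_key': '<kpi_id>', 'service_id': '<service_id>', 'title': '<kpi title>',
#      'urgency': 5, 'severity': 6, 'alert_value': '95', 'impact': 30,
#      'is_healthscore_calculate_by_entity_enabled': 1,
#      'degraded_entities': [{'alert_level': 6, 'key': '<entity_key>', 'title': '<entity_title>'}]}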

def generate_impacting_dependencies_list_recursive(data, service_connection, visited, service_id,
                                                   degraded_entities_data, earliest_time='-60m', latest_time='now'):
    """
    For a given service, determine the KPIs that have set its current health score and their relative impacts

    @type: dict
    @param data: the lookup dict from service id to service data

    @type: object
    @param service_connection: connection to the Splunk REST API

    @type: set
    @param visited: the set of tuples (from, to) denoting dependencies we have already visited

    @type: string
    @param service_id: the current service to develop the list from

    @type: dict
    @param degraded_entities_data: the dictionary containing degraded entities data

    @type: string
    @param earliest_time: the earliest time in the interval to search over

    @type: string
    @param latest_time: the latest time in the interval to search over

    @rtype: list
    @return: list of impacting KPIs
    """
    service = data.get(service_id)
    if not service:
        return []

    contributing_kpis = {}
    net_urgency = 0

    # First, iterate over the KPIs and dependencies of the service
    # and store them in a dict with their urgencies and associated service id.
    kpis = service.get('kpis', [])
    for kpi in kpis:
        if not kpi['_key'].startswith(SHKPI_STARTS_WITH):
            urgency = int(kpi['urgency'])
            # Skip KPIs whose importance is set to 0 because they won't
            # contribute to the service health score.
            if urgency == 0:
                continue
            formatted_kpi = {
                '_key': kpi['_key'],
                'is_healthscore_calculate_by_entity_enabled': service.get('is_healthscore_calculate_by_entity_enabled'),
                'service_id': service_id,
                'title': kpi['title'],
                'urgency': urgency
            }
            contributing_kpis[kpi['_key']] = formatted_kpi
            net_urgency += urgency

    dependencies = service.get('services_depends_on', [])
    for service_dependency in dependencies:
        kpi_dependencies = service_dependency.get('kpis_depending_on', [])
        overloaded_urgencies = service_dependency.get('overloaded_urgencies', {})
        dependent_service_id = service_dependency.get('serviceid', '')
        dependent_service = data.get(dependent_service_id, '')
        if dependent_service:
            # If the dependent service has not been deleted, loop over its dependencies.
            for kpi in kpi_dependencies:
                # An SHS KPI either has an overloaded urgency or defaults to 11;
                # a regular KPI either has an overloaded urgency or defaults to 5.
                default_urgency = SHS_DEFAULT_URGENCY if kpi.startswith(SHKPI_STARTS_WITH) else KPI_DEFAULT_URGENCY
                urgency = int(overloaded_urgencies.get(kpi, default_urgency))
                # Skip KPIs whose importance is set to 0 because they won't
                # contribute to the service health score.
                if urgency == 0:
                    continue
                net_urgency += urgency

                dependent_service_kpis = dependent_service.get('kpis')
                kpi_data = next((item for item in dependent_service_kpis if item['_key'] == kpi), {})
                title = kpi_data.get('title', '')
                # If the dependent KPI has not been deleted, store its id, name, and urgency.
                if title:
                    svc_data = data.get(dependent_service_id)
                    contributing_kpis[kpi] = {
                        '_key': kpi,
                        'is_healthscore_calculate_by_entity_enabled': svc_data.get('is_healthscore_calculate_by_entity_enabled'),
                        'service_id': dependent_service_id,
                        'title': title,
                        'urgency': urgency
                    }
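    # `contributing_kpis` now maps each contributing KPI's _key to its metadata,
    # and `net_urgency` holds the sum of the contributing urgencies, used below
    # as the denominator of the weighted severity score.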

    # Build the search filter from the keys of the parsed KPIs.
    kpi_search_string = 'itsi_kpi_id IN (' + ','.join(contributing_kpis.keys()) + ') '

    # Fetch the latest alert level and value for each contributing KPI, and map
    # each alert level to its weighted contribution (severity) via lookup.
    spl_search = ('| mstats latest(alert_level) AS alert_level, latest(alert_value) AS alert_value'
                  ' WHERE `get_itsi_summary_metrics_index` ' + kpi_search_string
                  + ' `metrics_service_level_kpi_only` by itsi_kpi_id, itsi_service_id | where alert_level > -1 '
                  '| lookup kpi_alert_info_lookup alert_level OUTPUT weighted_contribution AS severity')
    params = {
        'earliest_time': earliest_time,
        'latest_time': latest_time
    }

    search_job = service_connection.jobs.create(spl_search, **params)

    if not wait_for_job(search_job):
        search_time_out_msg = "Search for dependent kpi severity timed out"
        logger.error(search_time_out_msg)
        raise Exception(search_time_out_msg)

    search_results = json.load(search_job.results(**output_params))

    kpi_severity_score_sum = 0
    non_boom_keys = []
    min_boom_kpi = {'impact': float('inf')}
    list_min_boom_kpi = []

    results_data = search_results.get('results', [])
    logger.debug('Results for KPI dependencies search: ' + str(results_data))

    if len(results_data) == 0:
        logger.debug('generate_impacting_dependencies_list_recursive search did not return any results')
        return []

    # After the search is complete, we compute individual severities for each KPI.
    # We also track which boom KPI has the minimum severity, and sum the non-boom KPI impacts.
    for row in results_data:
        kpi_result = row.get('itsi_kpi_id')
        ref_kpi = contributing_kpis.get(kpi_result, '')
        if not ref_kpi:
            continue
        ref_kpi['alert_value'] = row['alert_value'] if 'alert_value' in row else 'N/A'
        ref_kpi['severity'] = int(row['alert_level'])
        # If the KPI sits at info severity and has no degraded entities, skip it:
        # it does not factor into the health score calculation.
        if ref_kpi.get('severity') == INFO_SEVERITY and kpi_result not in degraded_entities_data:
            net_urgency -= ref_kpi.get('urgency')
            continue
        if ref_kpi.get('urgency') == SHS_DEFAULT_URGENCY:
            if kpi_result in degraded_entities_data:
                most_degraded_entity_severity = int(degraded_entities_data[kpi_result][0]['alert_level'])
                if most_degraded_entity_severity == CRITICAL_SEVERITY or ref_kpi['severity'] == CRITICAL_SEVERITY:
                    entity_severity = 0
                else:
                    entity_severity = 30
                ref_kpi['impact'] = entity_severity
            else:
                # The KPI is a boom KPI; store it if it has the minimum impact seen so far.
                ref_kpi['impact'] = int(row['severity'])
                if ref_kpi['impact'] < min_boom_kpi['impact']:
                    # New strict minimum: restart the list of minimum boom KPIs.
                    min_boom_kpi = ref_kpi
                    list_min_boom_kpi = [ref_kpi]
                elif ref_kpi['impact'] == min_boom_kpi['impact']:
                    # Tied with the current minimum: track this KPI as well.
                    list_min_boom_kpi.append(ref_kpi)
                kpi_severity_score_sum += int(ref_kpi['impact']) * ref_kpi['urgency']
                non_boom_keys.append(row['itsi_kpi_id'])
        else:
            if kpi_result in degraded_entities_data:
                most_degraded_entity_severity = int(degraded_entities_data[kpi_result][0]['alert_level'])
                if most_degraded_entity_severity == CRITICAL_SEVERITY or ref_kpi['severity'] == CRITICAL_SEVERITY:
                    entity_severity = 0
                else:
                    entity_severity = 30
                ref_kpi['impact'] = entity_severity * ref_kpi['urgency']
            else:
                # Calculate the relative impact for each non-boom KPI, add it to the sum, and store the key.
                ref_kpi['impact'] = int(row['severity']) * ref_kpi['urgency']
                kpi_severity_score_sum += ref_kpi['impact']
                non_boom_keys.append(row['itsi_kpi_id'])

    kpi_severity_score = 0 if net_urgency == 0 else kpi_severity_score_sum / net_urgency
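    # Weighted severity score: sum(severity_i * urgency_i) / sum(urgency_i).
    # Illustrative arithmetic: KPIs with (severity=30, urgency=5) and
    # (severity=0, urgency=5) give (30 * 5 + 0 * 5) / 10 = 15.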
    if min_boom_kpi['impact'] > kpi_severity_score and len(non_boom_keys) > 0:
        if net_urgency == 0:
            return []
        # Non-boom KPIs have determined the current SHS.
        non_boom_kpi_output = {}
        for kpi_key in non_boom_keys:
            kpi = contributing_kpis[kpi_key]
            if kpi_key.startswith(SHKPI_STARTS_WITH):
                # Skip the recursive step for cyclic dependencies.
                path = (service_id, kpi_key)
                if path in visited:
                    continue
                visited.add(path)

                dependent_kpis = generate_impacting_dependencies_list_recursive(data, service_connection,
                                                                                visited, kpi.get('service_id'),
                                                                                degraded_entities_data, earliest_time,
                                                                                latest_time)
                relative_urgency = kpi['urgency'] / net_urgency
                for dependent_kpi in dependent_kpis:
                    # Scale the impact of a nested KPI by its SHS's relative urgency.
                    dependent_kpi['impact'] *= relative_urgency
                    if dependent_kpi['_key'] not in non_boom_kpi_output:
                        non_boom_kpi_output[dependent_kpi['_key']] = dependent_kpi
                    else:
                        # If the KPI is already present, add its impacts together.
                        previous_impact = non_boom_kpi_output[dependent_kpi['_key']]['impact']
                        dependent_kpi['impact'] += previous_impact
                        non_boom_kpi_output[dependent_kpi['_key']] = dependent_kpi
            else:
                if kpi_key not in non_boom_kpi_output:
                    non_boom_kpi_output[kpi_key] = kpi
                else:
                    # If the KPI is already present, add its impacts together.
                    previous_impact = non_boom_kpi_output[kpi_key]['impact']
                    kpi['impact'] += previous_impact
                    non_boom_kpi_output[kpi_key] = kpi
        logger.debug('generate_impacting_dependencies_list_recursive results returned: ' + str(non_boom_kpi_output))
        # Convert the output to a list and return.
        return list(non_boom_kpi_output.values())
    else:
        # A boom KPI has determined the current SHS. Use a default for '_key' so
        # the initial sentinel (which has no '_key') cannot raise here.
        if min_boom_kpi.get('_key', '').startswith(SHKPI_STARTS_WITH):
            # Skip the recursive step for cyclic dependencies.
            path = (service_id, min_boom_kpi.get('_key'))
            if path in visited:
                return []
            visited.add(path)
            # If the boom KPI is a service health score, we only want to return the KPIs that comprise it.
            return generate_impacting_dependencies_list_recursive(data, service_connection, visited,
                                                                  min_boom_kpi.get('service_id'),
                                                                  degraded_entities_data, earliest_time, latest_time)

        if len(list_min_boom_kpi) > 0:
            logger.debug('generate_impacting_dependencies_list_recursive results as list: ' + str(list_min_boom_kpi))
            return list_min_boom_kpi
        logger.debug('generate_impacting_dependencies_list_recursive results as single value: ' + str(min_boom_kpi))
        return [min_boom_kpi]
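# Branching sketch (illustrative): when the lowest boom-KPI impact exceeds the
# weighted score sum(severity_i * urgency_i) / net_urgency, the non-boom KPIs
# set the health score, and each SHKPI- entry among them is expanded
# recursively with its impact scaled by urgency / net_urgency; otherwise the
# minimum boom KPI set the score, and if that KPI is itself a service health
# score the function recurses into the service that produced it.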

def wait_for_job(searchjob, maxtime=-1):
    """
    Wait up to maxtime seconds for searchjob to finish. Returns True if the job
    finished; a negative maxtime (the default) waits indefinitely.
    """
    pause = 0.2
    lapsed = 0.0
    while not searchjob.is_done():
        time.sleep(pause)
        lapsed += pause
        if maxtime >= 0 and lapsed > maxtime:
            break
    return searchjob.is_done()
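# Illustrative usage with a 30-second budget (the polling interval is fixed at
# 0.2 seconds):
#
#     if not wait_for_job(search_job, maxtime=30):
#         logger.error('Search did not finish in time')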