# NOTE(review): the four lines below were repository-page scrape artifacts
# ("You can not select more than 25 topics…", "254 lines", "10 KiB") that are
# not part of the source file; they have been commented out so the module
# remains valid Python.

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import itsi_py3
import json
import datetime
import time
import re
import splunk.rest as splunk_rest
import splunk.search as splunk_search
from splunk.util import safeURLQuote
from ITOA.setup_logging import logger as itsi_logger
from itsi.event_management.itsi_notable_event import ItsiNotableEvent
from itsi.objects.itsi_service import ItsiService
from itsi.objects.itsi_entity import ItsiEntity
class NotableEventDriftException(Exception):
    """Raised when parsing a raw drift alert or creating its notable event fails."""
    pass
class NotableEventDrift(object):
    """
    Transforms raw KPI drift-detection alerts into ITSI notable events.

    Typical flow: transform_raw_drift_events() parses the raw payload,
    transform_event_management_data() maps it onto the event-management
    schema (looking up the impacted service/KPI), and create_notable_event()
    persists the result through ItsiNotableEvent.
    """

    def __init__(self, session_key, app='SA-ITOA', owner='nobody', logger=None,
                 audit_token_name='Auto Generated ITSI Notable Index Audit Token', **kwargs):
        """
        Notable event Drift special process
        @type session_key: basestring
        @param session_key: session key
        @type app: basestring or str
        @param app: app name
        @type owner: basestring or str
        @param owner: owner name
        @type logger: object
        @param logger: logger
        @type audit_token_name: basestring
        @param audit_token_name: audit token name (accepted for interface
            compatibility; not currently stored)
        @type kwargs: dict
        @param kwargs: extra params (accepted for interface compatibility)
        @rtype: instance of class
        @return: object
        """
        self.session_key = session_key
        self.owner = owner
        self.app = app
        # Defaults applied to every generated notable event.
        self.default_status = '1'
        self.default_owner = 'unassigned'
        self.default_severity = '6'
        self.logger = logger if logger else itsi_logger

    def transform_raw_drift_events(self, data):
        """
        Parse a raw drift alert (JSON string or dict), transform it into
        event-management data and create a notable event from it.

        @type data: basestring or dict
        @param data: raw drift alert payload
        @raise NotableEventDriftException: on JSON parse failure or notable
            event creation failure
        @raise TypeError: if the parsed payload is not a dict
        @return: None
        """
        if isinstance(data, itsi_py3.string_type):
            try:
                json_content = json.loads(data)
            except Exception:
                message = 'Failure parsing string data into json'
                self.logger.exception(message)
                raise NotableEventDriftException(message)
        else:
            json_content = data
        if not isinstance(json_content, dict):
            # BUGFIX: the type was previously passed as a second TypeError
            # argument, so the %s placeholder was never interpolated.
            raise TypeError('Data is not a valid dictionary, data type is %s.' % type(json_content))
        self.logger.debug('Received raw drift event: %s', json_content)
        event_data = self.transform_event_management_data(json_content)
        if event_data:
            try:
                self.create_notable_event(event_data)
            except Exception:
                message = 'Notable event creation failed.'
                self.logger.exception(message)
                raise NotableEventDriftException(message)
        else:
            self.logger.debug('Notable event not generated.')

    def parse_lookup_days(self, days):
        """
        Parses the number of days in the lookback period. e.g. '-182d'
        @type days: str
        @param days: the lookback_period
        @raise ValueError: if no (optionally signed) integer is found
        @return: an int for the number of days (sign preserved)
        """
        match = re.search(r'-?\d+', days)
        if match:
            return int(match.group())
        else:
            raise ValueError('Not a valid lookup period string')

    def transform_event_management_data(self, data):
        """
        Map the incoming drift alert event into event management data structure.
        Expected drift alert format
        {
            "service_id": "hDsEUODaP1OPDXCxd7jDDEXj",
            "part_or_whole": "DRIFTED",  # is this required?
            "drift_type": "TREND",
            "percent_drift": -81,
            "start_time": 1695250800,
            "end_time": 1695276000,
            "threshold_time": 1695276000,  # is this required?
            "kpi_id": "s3nwuTVKm1rfHC9JcuvvKoUK"
            "alert_type": "new"
        }
        @type data: dict
        @param data: incoming drift alert event
        @return: Transformed event management data dict, or None if the
            service/KPI cannot be resolved or drift detection is disabled
        """
        kpi_id = data.get('kpi_id', '')
        time_stamp = time.time()
        source = 'DriftDetection'
        event_identifier_fields = 'source, title, description, itsi_kpi_id'
        kpi_title = None
        event_data = None
        service_id = data.get('service_id', '')
        service_object = ItsiService(self.session_key, self.owner)
        impacted_service = service_object.get(self.owner, service_id)
        lookback_period = ''
        aggregation_function = ''
        aggregation_span = ''
        threshold_direction = ''
        tolerance_in_percent = 0
        if not impacted_service:
            # logger.warning: `warn` is a deprecated alias, same behavior.
            self.logger.warning('No corresponding services were found, no drift alert message will be pushed')
            return event_data
        requested_kpis = impacted_service.get('kpis', [])
        for kpi in requested_kpis:
            if kpi_id == kpi.get('_key', ''):
                # Will leave this condition for now in case.
                if not kpi.get('has_drift_detection_enabled', False):
                    self.logger.info('Received alert from drift detection, but drift detection is not enabled, suppressing the drift detction alert')
                    return event_data
                kpi_title = kpi.get('title', '')
                drift_detection_config = kpi.get('drift_detection_configuration', {})
                lookback_period = drift_detection_config.get('lookback_period', '-182d')
                aggregation_function = drift_detection_config.get('aggregation_function', 'avg')
                aggregation_span = drift_detection_config.get('aggregation_span', '1d')
                threshold_direction = drift_detection_config.get('threshold_direction', 'both')
                tolerance_in_percent = drift_detection_config.get('tolerance_in_percent', 85)
                break
        service_title = impacted_service.get('title', '')
        # If KPI cannot be found, return None
        if not kpi_title:
            self.logger.info('The KPI %s in the service %s was not found, the KPI may have been deleted', kpi_id, service_id)
            return event_data
        drilldown_search_title = 'Drift detection results'
        # NOTE(review): `earliest({lookback_period})` renders e.g.
        # `earliest(-182d)` inside mstats — verify this is the intended SPL
        # (a time modifier is usually written `earliest=-182d`).
        drilldown_search_search = (
            f'| mstats earliest({lookback_period}) latest(alert_value) AS alert_value latest(alert_level) AS alert_level '
            f'WHERE `get_itsi_summary_metrics_index` AND itsi_kpi_id={kpi_id} AND is_filled_gap_event!=1 AND is_null_alert_value=0 '
            f'`metrics_service_level_kpi_only` by itsi_kpi_id, itsi_service_id span=1m | where alert_level!=-2'
            f'| bin _time span={aggregation_span} '
            f'| stats {aggregation_function}(alert_value) as alert_value by _time, itsi_kpi_id, itsi_service_id '
            f'| table _time alert_value itsi_kpi_id itsi_service_id '
            f'| detectdrift threshold={tolerance_in_percent} threshold_direction="{threshold_direction}"'
        )
        drilldown_search_latest_offset = '300'
        # Fallback of -132480 minutes (92 days) if the lookback cannot be parsed.
        drilldown_search_earliest_offset = '-132480'
        try:
            days = self.parse_lookup_days(lookback_period)
            # Offsets are expressed in minutes (1440 minutes per day).
            drilldown_search_earliest_offset = str(days * 1440)
        except ValueError:
            self.logger.warning('Unable to parse lookup_period for drilldown search offset')
        status = self.default_status
        severity = self.default_severity
        owner = self.default_owner
        # drift alert settings
        # BUGFIX: these previously used '' (or no default) as the .get()
        # fallback, so a missing key always raised from int('') / int(None).
        # Use 0 so absent fields degrade gracefully instead of crashing.
        drift_type = data.get('drift_type', 'LEVEL')
        part_or_whole = data.get('part_or_whole', '')
        percent_drift = int(data.get('percent_drift', 0))
        start_time = int(data.get('start_time', 0))
        end_time = int(data.get('end_time', 0))
        threshold_time = int(data.get('threshold_time', 0))
        alert_type = data.get('alert_type', 'new')
        try:
            mod_time = datetime.datetime.fromtimestamp(time_stamp).strftime('%Y-%m-%d %H:%M:%S.%f')
            drift_time = datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')
        except Exception as exc:
            # Fall back to the raw epoch timestamp if formatting fails.
            self.logger.exception(exc)
            mod_time = time_stamp
            drift_time = mod_time
        direction_string = 'increased' if percent_drift > 0 else 'decreased'
        percentage = abs(percent_drift)
        if drift_type == 'LEVEL':
            title = f'Level drift detected for "{kpi_title}", KPI suddenly {direction_string} by {percentage}%'
            description = f'Drift occurred for "{service_title}" at {drift_time}'
        else:
            title = f'Trending drift detected for "{kpi_title}", KPI gradually {direction_string} by {percentage}%'
            description = f'Drift started for "{service_title}" at {drift_time}'
        event_data = {
            'status': status,
            'severity': severity,
            'owner': owner,
            'title': title,
            'description': description,
            '_time': time_stamp,
            'mod_time': mod_time,
            'drilldown_search_search': drilldown_search_search,
            'drilldown_search_title': drilldown_search_title,
            'drilldown_search_latest_offset': drilldown_search_latest_offset,
            'drilldown_search_earliest_offset': drilldown_search_earliest_offset,
            'event_identifier_fields': event_identifier_fields,
            'service_ids': service_id,
            'kpiid': kpi_id,
            'source': source,
            'drift_type': drift_type,
            'part_or_whole': part_or_whole,
            'percent_drift': percent_drift,
            'start_time': start_time,
            'end_time': end_time,
            'threshold_time': threshold_time,
            'alert_type': alert_type
        }
        # Lazy %-args so the dict is only rendered when debug is enabled.
        self.logger.debug('transformed event data: %s', event_data)
        return event_data

    def create_notable_event(self, data):
        """
        Create notable event based on the transformed data
        @type data: dict
        @param data: transformed data
        @raise NotableEventDriftException: if the notable event cannot be created
        @return: None
        """
        try:
            notable_event = ItsiNotableEvent(self.session_key)
            event_id = notable_event.create(data)
            self.logger.debug('notable event created, event id: %s', event_id)
        except Exception as e:
            self.logger.exception('Unable to create notable event, check log for errors.')
            raise NotableEventDriftException('Unable to create notable event, %s' % e)