# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import uuid
import asyncio
import sys

from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))

from ITOA.setup_logging import logger
from ITOA.itoa_common import get_current_utc_epoch, is_feature_enabled
from ITOA.event_management.notable_event_error import NotableEventBadRequest
from .base_event_management import BaseEventManagement
from .notable_event_utils import MethodType
from .push_event_manager import PushEventManager
from .itsi_nats_publish import NatsEventPublisher


class NotableEventGroup(BaseEventManagement):
    """
    Class to create, update, get and delete group state records.

    Each record has the following shape:
    {
        _key: Random key
        object_type: notable_event_group,
        owner: <owner>,
        severity: <severity>,
        status: <status>,
        mod_time: <time>
    }
    """

    def __init__(self, session_key, current_user_name=None, collection='itsi_notable_group_user',
                 object_type='notable_event_group', user='nobody', action_dispatch_config=None, **kwargs):
        """
        Initialize

        @param session_key: session key
        @param current_user_name: name of the current user, forwarded to the base class
        @param collection: collection name
        @param object_type: object type
        @param user: user context to save
        @type action_dispatch_config: ActionDispatchConfiguration
        @param action_dispatch_config: the setting for hybrid action dispatch
        @param kwargs: extra args
        @return:
        """
        # Initialized base event object
        super(NotableEventGroup, self).__init__(
            session_key, collection, object_type, user, current_user_name,
            action_dispatch_config=action_dispatch_config
        )
        self.session_key = session_key
        self.mod_time_key = 'mod_time'
        self.create_time_key = 'create_time'
        # NOTE(review): `user` is forwarded to the base class above, but self.user is
        # pinned to 'nobody' here regardless of the argument -- confirm this is intended.
        self.user = 'nobody'
        self.logger = logger
        self.kwargs = kwargs
        # host_base_uri / master_session_key are only assigned here when a dispatch
        # configuration is supplied.
        if action_dispatch_config:
            self.host_base_uri = action_dispatch_config.remote_ea_mgmt_uri
            self.master_session_key = action_dispatch_config.get_master_host_session_key()
        self.is_send_episode_event = is_feature_enabled('itsi-send-episode-event', self.session_key)

    def pre_processing(self, data_list, method):
        """
        Add mod_time and create_time to the group

        @type data_list: list
        @param data_list: list of data to validate and add time, user info etc
        @type method: basestring
        @param method: method type

        @rtype: list
        @return: It updates list in place and also return it back as well
        @raise TypeError: if data_list is not a list or one of its items is not a dict
        """
        if not isinstance(data_list, list):
            # Format the message explicitly; passing the type as a second exception
            # argument leaves the '%s' placeholder unfilled.
            raise TypeError('Data is not a valid list, data_list type is %s.' % type(data_list))

        read_or_delete_methods = (
            MethodType.DELETE, MethodType.DELETE_BULK, MethodType.GET, MethodType.GET_BULK
        )
        for data in data_list:
            # Make sure data is valid dict
            if not isinstance(data, dict):
                raise TypeError('Data is not a valid dictionary.')
            time_value = get_current_utc_epoch()
            if method == MethodType.CREATE:
                # Add create time
                data[self.create_time_key] = time_value
            if method not in read_or_delete_methods:
                # Need to set mod time for create and update
                data[self.mod_time_key] = time_value
        return data_list

    @staticmethod
    def _label_with_id(contents, value):
        """
        Render a status/severity id as '<label> (<id>)'

        @type contents: dict
        @param contents: mapping of id to a dict that may hold a 'label' entry
        @param value: the id to render

        @rtype: basestring
        @return: label followed by the id in parentheses; the label part is empty
            when the id is unknown or has no label
        """
        label = contents.get(value, {}).get('label') or ''
        return label + " ({0})".format(value)

    def _get_activity(self, updated_data, activity_type=None):
        """
        Return activity which is happening during update

        Keys prefixed with '__old__' are consumed to build "changed from ... to ..."
        statements and are removed from updated_data in place.

        @type updated_data: dict
        @param updated_data: data to get activity
        @type activity_type: basestring
        @param activity_type: type of activity

        @rtype: basestring
        @return: activity log statement
        """
        if activity_type == 'acknowledge':
            return '{0} successfully acknowledged episode.'.format(updated_data.get('owner'))

        self.lazy_init_notable_event_configuration()
        old_prefix = '__old__'
        activity_tracking = ''
        keys_to_del = []

        for key, value in updated_data.items():
            if not key.startswith(old_prefix):
                continue
            actual_key = key[len(old_prefix):]
            old_value = value
            # Tolerate an '__old__<field>' entry whose current value is absent
            new_value = updated_data.get(actual_key)
            # Put label along with id to show them pretty
            if actual_key == 'status':
                contents = self.notable_event_configuration.status_contents
                old_value = self._label_with_id(contents, old_value)
                new_value = self._label_with_id(contents, new_value)
            elif actual_key == 'severity':
                contents = self.notable_event_configuration.severity_contents
                old_value = self._label_with_id(contents, old_value)
                new_value = self._label_with_id(contents, new_value)
            activity_tracking += '{0} changed from {0}="{1}" to {0}="{2}". '.format(
                actual_key, old_value, new_value)
            keys_to_del.append(key)

        # delete old entry in the dict
        for key in keys_to_del:
            del updated_data[key]

        if not activity_tracking and updated_data:
            activity_tracking = 'Updated '
            # Fixed field order keeps the audit message stable between runs
            for field in ('status', 'severity', 'owner'):
                if field not in updated_data:
                    continue
                value = updated_data[field]
                if field == 'severity':
                    value = self._label_with_id(
                        self.notable_event_configuration.severity_contents, value)
                elif field == 'status':
                    value = self._label_with_id(
                        self.notable_event_configuration.status_contents, value)
                activity_tracking += ' {0}={1} '.format(field, value)
        return activity_tracking

    def create(self, data, **kwargs):
        """
        Create notable event group

        @type data - dict
        @param data - notable event group schema to create

        @rtype dict
        @return create object _key or raise an exception
        """
        # We need to set _key because _key should be same as group_id
        # if we set this value in payload then generic facade understand
        # that as update instead of create, hence we are passing as different
        # then _key
        # group_id comes from the UI and should not be confused with itsi_group_id
        # To-Fix:
        # - Update UI to use itsi_group_id
        if isinstance(data, dict) and 'group_id' in data:
            data['_key'] = data.pop('group_id')
        activity = self._get_activity(data, data.pop('action_type', None))
        ret = super(NotableEventGroup, self).create(data, **kwargs)
        # Create is kind of update here because group had already create with some initial state
        # now we are tracking it's state by creating record in KV
        self.send_activity_to_audit({
            'event_id': data.get('_key'),
            'itsi_policy_id': data.get('itsi_policy_id')
        }, activity, 'Episode update')
        # NOTE(review): raises KeyError when the payload carried neither 'group_id'
        # nor '_key' -- confirm callers always supply one of them.
        self.check_to_send_break_group_event(data['_key'], **kwargs)
        return ret

    def create_bulk(self, data_list, **kwargs):
        """
        Create more than one notable event group

        Items without a 'group_id' are skipped for audit purposes but are still
        passed to the base class.

        @type data_list: list
        @param data_list: data list
        @type kwargs: dict
        @param kwargs: Extra params; 'action_type' is consumed here

        @rtype: list
        @return: list of created
        """
        activities = []
        activities_data = []
        action_type = kwargs.pop('action_type', None)
        if isinstance(data_list, list):
            for data in data_list:
                if 'group_id' not in data:
                    continue
                data['_key'] = data.pop('group_id', None)
                activities.append(self._get_activity(data, action_type))
                activities_data.append({
                    'event_id': data.get('_key'),
                    'itsi_policy_id': data.get('itsi_policy_id')
                })
        ret = super(NotableEventGroup, self).create_bulk(data_list, **kwargs)
        # Create is kind of update here because group had already create with some initial state
        # now we are tracking it's state by creating record in KV
        self.send_activity_to_audit(activities_data, activities, 'Episode bulk update')
        self.check_to_send_multiple_break_group_events(data_list, **kwargs)
        return ret

    def update(self, object_id, data, is_partial_update=False, **kwargs):
        """
        Update one notable event

        @type object_id: basestring
        @param object_id: object id
        @type data: dict
        @param data: data
        @type is_partial_update: bool
        @param is_partial_update: flag to do partial update
        @type kwargs: dict
        @param kwargs: Extra parameters

        @rtype: dict
        @return: return dict which holds updated keys
        @raise NotableEventBadRequest: if an episode update is requested while the
            itsi-send-episode-event feature flag is disabled, or the payload fails
            the break group / episode event field checks
        """
        activity = self._get_activity(data, data.pop('action_type', None))
        fields_to_update = {}
        break_policy_id = kwargs.get('break_group_policy_id', False)
        episode_policy_id = kwargs.get('episode_update_policy_id', False)

        if break_policy_id:
            self.check_break_group_event_fields(data)
        if not self.is_send_episode_event and episode_policy_id:
            raise NotableEventBadRequest('Unable to send event as the feature flag itsi-send-episode-event is disabled')
        if episode_policy_id and self.is_send_episode_event:
            self.check_itsi_episode_event(data)

        if 'group_state_change_action' in data:
            # if there is 'group_state_change_action' field in the data then this means that this is a group state
            # change action triggered from the Rules Engine side. In this case, let's do an update_bulk instead
            # because there is special logic for handling group state change actions in update_bulk. That special
            # logic is for handling group updates for groups which are not yet inserted in KV Store.
            ret = super(NotableEventGroup, self).update_bulk([object_id], [data], is_partial_update, **kwargs)
        else:
            if episode_policy_id and self.is_send_episode_event and not break_policy_id:
                # check if there is any update in status, severity or owner of the episode;
                # captured before the base update call, as in the original flow
                fields_to_update = self.get_episode_fields_to_update(data)
            ret = super(NotableEventGroup, self).update(object_id, data, is_partial_update, **kwargs)

        self.send_activity_to_audit({
            'event_id': data.get('_key'),
            'itsi_policy_id': data.get('itsi_policy_id')
        }, activity, 'Episode update')
        if self.is_send_episode_event:
            self.check_to_send_itsi_episode_event(data.get('_key'), fields_to_update, **kwargs)
        self.check_to_send_break_group_event(data.get('_key'), **kwargs)
        return ret

    def update_bulk(self, object_ids, data_list, is_partial_update=False, **kwargs):
        """
        Perform update for one or more notable event groups

        @type object_ids: list
        @param object_ids: notable events
        @type data_list: list
        @param data_list: notable events
        @type is_partial_update: bool
        @param is_partial_update: flag for partial update
        @type kwargs: dict
        @param kwargs: Extra params to perform; 'action_type' is consumed here

        @rtype: list
        @return: update notable event schema
        @raise NotableEventBadRequest: if multiple episode updates are requested while
            the itsi-send-episode-event feature flag is disabled, or a payload fails
            the break group / episode event field checks
        """
        activities = []
        activities_data = []
        # Maps group _key -> dict of changed episode fields
        fields_to_update_list = {}
        action_type = kwargs.pop('action_type', None)
        send_multiple_episode_updates = kwargs.get('send_multiple_episode_updates', False)

        if kwargs.get('break_multiple_groups', False):
            for data in data_list:
                self.check_break_group_event_fields(data)
        if not self.is_send_episode_event and send_multiple_episode_updates:
            raise NotableEventBadRequest('Unable to send events as the feature flag itsi-send-episode-event is disabled')
        if send_multiple_episode_updates and self.is_send_episode_event:
            for data in data_list:
                self.check_itsi_episode_event(data)

        for data in data_list:
            if 'group_id' in data:
                data['_key'] = data.pop('group_id', None)
            activities.append(self._get_activity(data, action_type))
            activities_data.append({
                'event_id': data.get('_key'),
                'itsi_policy_id': data.get('itsi_policy_id')
            })
            # if there is any update in status, severity or owner of the episodes,
            # then we check and create an itsi_episode_event and push it in the index itsi_tracked_alerts.
            fields_to_update = self.get_episode_fields_to_update(data)
            if fields_to_update and not data.get('break_group_policy_id', False) \
                    and data.get('episode_update_policy_id', False):
                fields_to_update_list[data.get('_key')] = fields_to_update

        ret = super(NotableEventGroup, self).update_bulk(object_ids, data_list, is_partial_update, **kwargs)
        self.send_activity_to_audit(activities_data, activities, 'Episode bulk update')
        if self.is_send_episode_event:
            self.check_to_send_multiple_episode_update_events(data_list, fields_to_update_list, **kwargs)
        self.check_to_send_multiple_break_group_events(data_list, **kwargs)
        return ret

    def add_drilldown(self, object_id, drilldown, is_partial_update=True, **kwargs):
        """
        Add drilldown link to notable event group

        @type object_id: basestring
        @param object_id: object id
        @type drilldown: dict
        @param drilldown: drilldown to be added
        @type is_partial_update: bool
        @param is_partial_update: flag to do partial update
        @type kwargs: dict
        @param kwargs: Extra parameters

        @rtype: dict
        @return: return dict which holds updated keys
        @raise ValueError: if the drilldown fails validation
        @raise TypeError: if the group is not a dict or its drilldown field is not a list
        """
        if not self.is_valid_drilldown(drilldown):
            raise ValueError('Drilldown data must have link and name')

        group = self.get(object_id)
        clean_drilldown = self._clean_drilldown(drilldown)

        try:
            drilldown_list = group.get('drilldown', [])
        except AttributeError:
            raise TypeError('Group is not of type dict')

        try:
            drilldown_list.append(clean_drilldown)
        except AttributeError:
            raise TypeError('Drilldown field is not of type list')

        ret = super(NotableEventGroup, self).update(
            object_id, {'drilldown': drilldown_list}, is_partial_update, **kwargs)
        return ret

    def update_drilldown(self, object_id, drilldown, is_partial_update=True, **kwargs):
        """
        Update drilldown for a NotableEventGroup

        The drilldown is matched by name; when no match exists it is added instead.

        @type object_id: basestring
        @param object_id: object id
        @type drilldown: dict
        @param drilldown: drilldown to be updated
        @type is_partial_update: bool
        @param is_partial_update: flag to do partial update
        @type kwargs: dict
        @param kwargs: extra parameters

        @rtype: dict
        @return: return dict which holds updated keys
        @raise ValueError: if the drilldown fails validation
        @raise TypeError: if the group is not a dict
        """
        if not self.is_valid_drilldown(drilldown):
            raise ValueError('Drilldown data must have link and name')

        group = self.get(object_id)
        clean_drilldown = self._clean_drilldown(drilldown)

        try:
            drilldown_list = group.get('drilldown', [])
        except AttributeError:
            raise TypeError('Group is not of type dict')

        drilldown_index = self._find_drilldown(drilldown_list, clean_drilldown)

        if not drilldown_list or drilldown_index is None:
            # Nothing to update yet, so fall back to adding the drilldown
            ret = self.add_drilldown(object_id, clean_drilldown, is_partial_update, **kwargs)
            return ret

        try:
            drilldown_list[drilldown_index].update(clean_drilldown)
        except IndexError:
            raise IndexError('Drilldown index of: {0} out of bounds for drilldown list.'.format(drilldown_index))
        except ValueError:
            raise ValueError('Non dictionary type given for drilldown.')
        except TypeError:
            raise TypeError('Drilldown index given is not an integer.')
        except AttributeError:
            raise AttributeError('Drilldown list item at index: {0} is not of type dict'.format(drilldown_index))

        ret = super(NotableEventGroup, self).update(
            object_id, {'drilldown': drilldown_list}, is_partial_update, **kwargs)
        return ret

    def delete_drilldown(self, object_id, drilldown, is_partial_update=True, **kwargs):
        """
        Delete drilldown for a NotableEventGroup

        @type object_id: basestring
        @param object_id: object id
        @type drilldown: dict
        @param drilldown: drilldown to be deleted (matched by name)
        @type is_partial_update: bool
        @param is_partial_update: flag to do partial update
        @type kwargs: dict
        @param kwargs: extra parameters

        @rtype: dict
        @return: return dict which holds updated keys
        @raise ValueError: if the drilldown fails validation
        @raise TypeError: if the group is not a dict
        @raise KeyError: if no drilldown with the given name exists
        """
        if not self.is_valid_drilldown(drilldown):
            raise ValueError('Drilldown data must have link and name')

        group = self.get(object_id)
        clean_drilldown = self._clean_drilldown(drilldown)

        try:
            drilldown_list = group.get('drilldown', [])
        except AttributeError:
            raise TypeError('Group is not of type dict')

        drilldown_index = self._find_drilldown(drilldown_list, clean_drilldown)

        if drilldown_index is None:
            raise KeyError('Drilldown with name: {0} not found.'.format(drilldown['name']))

        try:
            drilldown_list.pop(drilldown_index)
        except AttributeError:
            raise AttributeError('Drilldown list is not of type list.')
        except TypeError:
            raise TypeError('Drilldown index given is not an integer.')
        except IndexError:
            raise IndexError('Drilldown index of: {0} out of bounds for drilldown list.'.format(drilldown_index))

        ret = super(NotableEventGroup, self).update(
            object_id, {'drilldown': drilldown_list}, is_partial_update, **kwargs)
        return ret

    def check_to_send_break_group_event(self, group_id, **kwargs):
        """
        Check to see if you need to send an event to break the group by looking through kwargs
        for a break group flag

        @type group_id: basestring
        @param group_id: the id of the group
        @type kwargs: dict
        @param kwargs: Extra params to perform
        """
        # If we detect a policy id for breaking the group, then sent an event to the rules engine to break the group
        break_group_policy_id = kwargs.get('break_group_policy_id', False)
        if break_group_policy_id:
            self.send_break_group_event(group_id=group_id, policy_id=break_group_policy_id, **kwargs)

    def check_to_send_itsi_episode_event(self, group_id, fields_to_update, **kwargs):
        """
        Check to see if you need to send an episode update event by looking through kwargs
        for an episode update policy id. Nothing is sent when there are no fields to
        update or when a break group policy id is also present.

        @type group_id: basestring
        @param group_id: the id of the group
        @type fields_to_update: dict
        @param fields_to_update: dict of values to update and push
        @type kwargs: dict
        @param kwargs: Extra params to perform
        """
        # If we detect a policy id for episode update, then sent an event to the rules engine to update the group info
        episode_update_policy_id = kwargs.get('episode_update_policy_id', False)
        if episode_update_policy_id and fields_to_update and not kwargs.get('break_group_policy_id', False):
            self.send_itsi_episode_event(fields_to_update, group_id=group_id, policy_id=episode_update_policy_id)

    def check_to_send_multiple_episode_update_events(self, data_list, fields_to_update_list, **kwargs):
        """
        Check to see if you need to send an event to update the episodes by looking through kwargs
        for episode update flag

        @type data_list: list
        @param data_list: list of group informations
        @type fields_to_update_list: dict
        @param fields_to_update_list: dict of values to update and push, keyed by group _key
        @type kwargs: dict
        @param kwargs: Extra params to perform
        """
        # If the bulk episode update flag is set, then sent events to the rules engine to update the groups
        send_multiple_episode_updates = kwargs.get('send_multiple_episode_updates', False)
        if send_multiple_episode_updates:
            self.send_multiple_episode_update_events(
                group_list=data_list, fields_to_update_list=fields_to_update_list)

    def check_to_send_multiple_break_group_events(self, data_list, **kwargs):
        """
        Check to see if you need to send an event to break the group by looking through kwargs
        for a break group flag

        @type data_list: list
        @param data_list: notable events
        @type kwargs: dict
        @param kwargs: Extra params to perform
        """
        # If the bulk break flag is set, then sent events to the rules engine to break the groups
        break_multiple_groups = kwargs.get('break_multiple_groups', False)
        if break_multiple_groups:
            self.send_multiple_break_group_events(group_list=data_list)

    def send_break_group_event(self, group_id, policy_id, **kwargs):
        """
        Sends an event to the itsi_tracked_alerts index to break a specified group

        @type group_id: basestring
        @param group_id: the id of the group to be broken
        @type policy_id: basestring
        @param policy_id: the id of the policy, sent as itsi_policy_id
        @type kwargs: dict
        @param kwargs: Extra params to perform; forwarded to get()
        @return:
        """
        push_event_manager = PushEventManager(
            self.session_key,
            'Auto Generated ITSI Event Management Token'
        )
        event = {
            'event_id': str(uuid.uuid1()),
            'itsi_policy_id': policy_id,
            'itsi_group_id': group_id,
            'break_group_flag': True
        }

        event_info = self.get(group_id, **kwargs)
        # Only enrich the event when the stored group looks populated
        if isinstance(event_info, dict) and 'title' in event_info:
            macro_fields = ('title', 'description', 'severity', 'owner', 'status')
            # array for ACE fields
            ace_fields = []
            if 'is_ace_enabled' in event_info:
                if 'itsi_group_ace_category_values' in event_info:
                    ace_fields.append('itsi_group_ace_category_values')
                if 'itsi_group_ace_text_values' in event_info:
                    ace_fields.append('itsi_group_ace_text_values')
            if ace_fields:
                macro_fields = macro_fields + tuple(ace_fields)
            # A dict (not a set of tuples) so that unhashable values such as lists
            # can be copied over without raising TypeError.
            event.update({key: event_info[key] for key in macro_fields})

        push_event_manager.push_event(
            event, source='itsi@internal@group_closing_event', time=str(get_current_utc_epoch()))
        if push_event_manager.is_queue_mode_enabled:
            # In queue mode the same event is additionally published through NATS
            event['source'] = 'itsi@internal@group_closing_event'
            event['_time'] = str(get_current_utc_epoch())
            nats_publisher = NatsEventPublisher(self.session_key, self.logger)
            asyncio.run(nats_publisher.push_events_to_nats([event]))

    def send_itsi_episode_event(self, update_info, group_id, policy_id):
        """
        Sends an event to the itsi_tracked_alerts index to change the state of a specified group

        @type update_info: dict
        @param update_info: contains the fields that are changed in the group
        @type group_id: basestring
        @param group_id: the id of the group whose status is changed
        @type policy_id: basestring
        @param policy_id: the id of the policy by which group is created
        """
        push_event_manager = PushEventManager(
            self.session_key,
            'Auto Generated ITSI Event Management Token'
        )
        event = {
            'event_id': str(uuid.uuid1()),
            'itsi_policy_id': policy_id,
            'itsi_group_id': group_id,
            'is_itsi_episode_event': True
        }
        event.update(update_info)

        push_event_manager.push_event(
            event, source='itsi@internal@itsi_episode_event', time=str(get_current_utc_epoch()))
        if push_event_manager.is_queue_mode_enabled:
            # In queue mode the same event is additionally published through NATS
            event['source'] = 'itsi@internal@itsi_episode_event'
            event['_time'] = str(get_current_utc_epoch())
            nats_publisher = NatsEventPublisher(self.session_key, self.logger)
            asyncio.run(nats_publisher.push_events_to_nats([event]))

    def send_multiple_episode_update_events(self, group_list, fields_to_update_list):
        """
        Sends an event to the itsi_tracked_alerts index to update the episode info

        @type group_list: list
        @param group_list: notable events
        @type fields_to_update_list: dict
        @param fields_to_update_list: dict of values to update and push, keyed by group _key
        @return:
        """
        for group in group_list:
            group_key = group.get('_key')
            if 'episode_update_policy_id' in group and group_key in fields_to_update_list:
                self.send_itsi_episode_event(
                    fields_to_update_list[group_key], group_key, group.get('episode_update_policy_id'))

    def send_multiple_break_group_events(self, group_list):
        """
        Sends an event to the itsi_tracked_alerts index to break each group that carries
        a break_group_policy_id

        Note: '_key' and 'break_group_policy_id' are popped from each such group dict,
        and the remaining entries are forwarded as keyword arguments.

        @type group_list: list
        @param group_list: notable events
        @return:
        """
        for group in group_list:
            if 'break_group_policy_id' not in group:
                continue
            group_id = group.pop('_key', None)
            policy_id = group.pop('break_group_policy_id', None)
            self.send_break_group_event(group_id, policy_id, **group)

    def check_break_group_event_fields(self, data):
        """
        Check to see if the break group event has all the required fields present

        @type data: dict
        @param data: data for break group event
        @return:
        @raise NotableEventBadRequest: if any of title, description, severity, owner
            or status is missing from data
        """
        macro_fields = ('title', 'description', 'severity', 'owner', 'status')
        for field in macro_fields:
            if field not in data:
                raise NotableEventBadRequest('Unable to send the event due to missing field in data %s' % data)

    def check_itsi_episode_event(self, data):
        """
        Check to see if the episode update event has the required fields to update

        @type data: dict
        @param data: data for episode event
        @return:
        @raise NotableEventBadRequest: if data has none of status, severity or owner
        """
        if not any(key in data for key in ('status', 'severity', 'owner')):
            raise NotableEventBadRequest('Unable to send event as the payload does not have status, severity or owner')

    def get_episode_fields_to_update(self, data):
        """
        Get the episode updated fields to be send to the event

        @type data: dict
        @param data: data for episode event

        @rtype: dict
        @return: dict of the key value pair for the episode updates
        """
        update_info = {}
        keys_to_check = ['status', 'severity', 'owner']
        for key in keys_to_check:
            if key in data:
                update_info[key] = data[key]
        # ACE values are only carried over when the payload has an is_ace_enabled entry
        if 'is_ace_enabled' in data:
            if 'itsi_group_ace_category_values' in data:
                update_info['itsi_group_ace_category_values'] = data.get('itsi_group_ace_category_values')
            if 'itsi_group_ace_text_values' in data:
                update_info['itsi_group_ace_text_values'] = data.get('itsi_group_ace_text_values')
        return update_info

    def is_valid_drilldown(self, drilldown):
        """
        Validation for drilldown link
        Must have name and the link
        And all values must be a non-empty string

        @type drilldown: dict
        @param drilldown: drilldown to be added

        @rtype: bool
        @return: True or false according to validation.
        """
        if not isinstance(drilldown, dict):
            return False

        required_fields = ('name', 'link')
        for field in required_fields:
            value = drilldown.get(field)
            # Rejects missing, empty and non-string values alike
            if not value or not isinstance(value, str):
                return False
        return True

    def _clean_drilldown(self, drilldown):
        """
        Remove all non-whitelisted fields from drilldown dict

        The dict is modified in place and also returned.

        @type drilldown: dict
        @param drilldown: drilldown to clean

        @rtype: dict
        @return: cleaned drilldown
        """
        whitelisted_fields = [
            'name',
            'link'
        ]
        # Iterate over a snapshot of the keys because entries are deleted in the loop
        for key in list(drilldown.keys()):
            if key not in whitelisted_fields:
                del drilldown[key]
        return drilldown

    def _find_drilldown(self, drilldown_list, drilldown):
        """
        Find drilldown in drilldown list by name

        @type drilldown_list: list
        @param drilldown_list: list of drilldowns
        @type drilldown: dict
        @param drilldown: drilldown to find

        @rtype: int
        @return: index of found drilldown in drilldown list, or None when not found
        """
        for index, dd in enumerate(drilldown_list):
            if dd['name'] == drilldown['name']:
                return index
        return None