# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. """ This module deals with Notable Event Actions. Examples of a Notable Event Action are: `ping host` or `send email` """ import itsi_py3 import json import time import copy import uuid import random import re import splunk.rest as splunk_rest import splunk.search as splunk_search from splunk import ResourceNotFound from splunk.util import safeURLQuote, normalizeBoolean from ITOA.event_management.notable_event_utils import ActionDispatchConfiguration from ITOA.event_management.notable_event_utils import NotableEventActionException from ITOA.itoa_common import get_conf from ITOA.setup_logging import logger as itsi_logger from ITOA.storage.itoa_storage import ITOAStorage from ITOA.version_check import VersionCheck from .base_event_management import time_function_call from .notable_event_ticketing import ExternalTicket from .notable_event_utils import Audit from .notable_event_actions_scheduler import Job, NotableEventActionsScheduler ACTION_TYPE_MANIFEST = { 'external_ticket': ExternalTicket } class NotableEventAction(object): """ Represents a Notable Event Action """ BATCH_SIZE = 250 # update 250 events at a time def __init__(self, session_key, app='SA-ITOA', owner='nobody', logger=None, audit_token_name='Auto Generated ITSI Notable Index Audit Token', action_dispatch_config=None, is_from_action_queue=False, **kwargs): """ Notable event actions to be perform @type session_key: basestring @param session_key: session key @type app: basestring or str @param app: app name @type owner: basestring or str @param owner: owner name @type logger: object @param logger: logger @type audit_token_name: basestring @param audit_token_name: audit token name @type action_dispatch_config: ActionDispatchConfiguration @param action_dispatch_config: the setting for hybrid action dispatch @type is_from_action_queue: boolean @param is_from_action_queue: whether action is dispatched from action queue or rest. @type kwargs: dict @param kwargs: extra params @rtype: instance of class @return: object """ self.session_key = session_key self.master_session_key = session_key self.action_dispatch_config = action_dispatch_config self.host_base_uri = '' if self.action_dispatch_config: self.master_session_key = action_dispatch_config.get_master_host_session_key() self.host_base_uri = action_dispatch_config.remote_ea_mgmt_uri self.owner = owner self.app = app self.conf_file_name = 'notable_event_actions' self.logger = logger if logger else itsi_logger self.action_queue_storage = ITOAStorage(collection='itsi_notable_event_actions_queue') self.is_from_action_queue = is_from_action_queue # Dict which hold configuration self.configuration = self.get_configuration() self.action_app_configuration = {} # Dict which hold all accessible actions self.all_actions = self.get_all_actions() self.audit = Audit(self.session_key, audit_token_name=audit_token_name) self.scheduler = NotableEventActionsScheduler(self.configuration) # The field name for saving policy_id in event, for internal use to generate audit logs. self.policy_id_field = '_itsi_policy_id_' self.show_activity = True def get_configuration(self, conf_file_name=None, app=None): """ Get configured notable event actions @rtype: dict @return: notable event actions, key'ed by action name, value is a blob """ conf_file_name = conf_file_name if conf_file_name else self.conf_file_name app = app if app else self.app rval = get_conf(self.session_key, conf_file_name, search='disabled=0', count=-1, app=app) response = rval.get('response') if response.status != 200: self.logger.error( 'Failed to fetch configuration file=`%s`, rval=`%s`', self.conf_file_name, rval ) raise NotableEventActionException('Failed to fetch data using url="%s".' % self.conf_file_name) content = rval.get('content') content = json.loads(content) configuration = {} for entry in content.get('entry', []): configuration[entry.get('name')] = entry.get('content', {}) return configuration def get_actions(self): """ Get actions @rtype: dict @return: list of action which user can perform on notable """ return self._get_available_action() def get_action(self, action_name): """ Get any given action @type action_name - action name @param action_name - action name @return: dict - Action information """ if action_name is None: message = "Invalid action name" self.logger.error(message) raise ValueError(message) if action_name not in self.all_actions: self.logger.error("Action does not exist for this user") raise ResourceNotFound("Action {0} does not exist for this user {1}.".format(action_name, self.owner)) return self.all_actions.get(action_name) def _get_available_action(self): """ Compare action list against accessible actions for the app @rtype: list @return: notable actions to perform """ action_list = list(self.configuration.keys()) # Check actions are valid in alert_actions.conf valid_actions = self._is_valid_actions(action_list) # Decorate the configuration from notable_event_actions.conf into the valid actions for action in valid_actions: action['is_group_compatible'] = self.configuration[action['action_name']].get('is_group_compatible', 0) action['is_bulk_compatible'] = self.configuration[action['action_name']].get('is_bulk_compatible', 0) action['execute_once_per_group'] = self.configuration[action['action_name']].get( 'execute_once_per_group', 0 ) action['execute_in_sync'] = self.configuration[action['action_name']].get('execute_in_sync', 0) action['type'] = self.configuration[action['action_name']].get('type', '') action['refresh_impact_tab'] = self.configuration[action['action_name']].get('refresh_impact_tab', 0) return valid_actions def _is_valid_actions(self, actions_list): """ Check if give action is accessible by this app and is a valid action return valid action only @type actions_list: list @param actions_list: actions list to compare against accessible actions @rtype: list of dict @return: return valid action list where each item contain action_name, label """ valid_actions = [] for action in actions_list: if action in self.all_actions and not self.all_actions[action].get('disabled', False): valid_actions.append({'action_name': action, 'label': self.all_actions.get(action, {}).get('label', action)}) else: self.logger.warning('Provided action=%s is not a valid action ' 'or is disabled in alert_actions.conf', action) return valid_actions def _is_sim_add_on_installed(self): try: uri_string = '/servicesNS/' + self.owner + '/' + self.app + '/apps/local/splunk_ta_sim' uri = safeURLQuote(uri_string) res, contents = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json', 'count': '0'}, sessionKey=self.session_key) except Exception: return False else: return True if res.status == 200 else False def get_all_actions(self, is_get_full_email=False): """ Get all accessible custom modular alerts @type is_get_full_email: basestring @param is_get_full_email - flag to pull full email content @rtype: dict @return: return valid accessible objects """ uri_string = '/servicesNS/' + self.owner + '/' + self.app + '/alerts/alert_actions' if is_get_full_email: uri_string += '/email' uri = safeURLQuote(uri_string) res, contents = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json', 'count': '-1'}, sessionKey=self.session_key) # itsi_event_action_clear_sim_incidents is built as a native integration. # It should be available as a notable action only if the sim add-on is installed sim_add_on_installed = self._is_sim_add_on_installed() actions = {} if res.status == 200: json_contents = json.loads(contents) for entry in json_contents.get('entry', []): if entry.get('name') not in actions: if entry.get('name') == 'itsi_event_action_clear_sim_incidents' and \ sim_add_on_installed: actions[entry.get('name')] = entry.get('content') elif entry.get('name') != 'itsi_event_action_clear_sim_incidents': actions[entry.get('name')] = entry.get('content') else: self.logger.error('Failed to get all accessible notable events, uri=%s, response="%s", content="%s"', uri, res, contents) # Overload the email action with things we can actually pass to the sendemail search command if not is_get_full_email and 'email' in actions: actions['email'] = { 'label': 'Send email', 'content_type': '', 'to': '', 'cc': '', 'bcc': '', 'priority': '', 'subject': '', 'message': '' } self.logger.debug('All actions: %s', list(actions.keys())) return actions def get_command_params(self, action_name, data, is_group=False): """ Applicable only to sendalert/sendemail Given input data, extract command parameters for given action_name. @type action_name: str @param action_name: action name you wish to execute @type data: basestring @param data: given input data """ # TODO ITOA-5104: modularize this method. It's really big. action_content = self.all_actions.get(action_name) self.logger.debug('action_content=`%s`\ndata=`%s`', action_content, data) params = '' field_prefix = 'action' if isinstance(data, itsi_py3.string_type): data = json.loads(data) if not isinstance(data, dict): raise TypeError('Expecting a valid dict for data. Received="%s". Type="%s".' % ( data, type(data).__name__)) # first work on action_content. this is the set of key/values we are # working with from the alert_actions endpoint. long_prefix = field_prefix + '.' + action_name for key in action_content.keys(): short_field_name = field_prefix + '.' + key full_field_name = long_prefix + '.' + key # data either contain full name or only field name,we support for both if full_field_name in data: params += ' %s="%s"' % (key, str(data[full_field_name]).replace('"', '\\"')) elif short_field_name in data: params += ' %s="%s"' % (key, str(data[short_field_name]).replace('"', '\\"')) elif key in data: params += ' %s="%s"' % (key, str(data[key]).replace('"', '\\"')) else: self.logger.debug('Either %s is not a valid field or does not exist in the request.', key) # finally we could also have alert params sitting inside the data # this is entirely dependent on how the alert action is defined. # i.e parameters are mentioned in .conf.spec file or the # conf file i.e. .conf for key in data.keys(): if key is None: continue short_key = key if key.startswith(long_prefix): short_key = key[len(long_prefix) + 1:] # remove . too elif key.startswith(field_prefix): short_key = key[len(field_prefix) + 1:] # remove . too # Make sure we did not add key earlier if short_key in action_content or key in action_content: continue params += ' %s="%s"' % (short_key, str(data[key]).replace('"', '\\"')) # certain actions viz, those corresponding to the creation/updating of an external # ticket mandate us to work with a correlation id being passed in along # with other parameters. Enforce the same here. If none is given or if # correlation id value is an empty string, we will append # $result.event_id$ as correlation id action_config = self.configuration.get(action_name, {}) action_type = action_config.get('type', '') if action_type == 'external_ticket' and action_type in ACTION_TYPE_MANIFEST: obj = ACTION_TYPE_MANIFEST[action_type] params = obj.curate_params(params, action_name, action_config, self.logger, is_group=is_group) self.logger.debug('action params for command=%s', params) return params def get_group_system_info(self, group_id): """ Gets the group info for the group id @type group_id: basestring @param group_id - the group id @rtype: dict @return: the system info for a group """ uri_string = '/servicesNS/nobody/' + self.app + \ '/storage/collections/data/itsi_notable_group_system/' + \ group_id uri = self.host_base_uri + safeURLQuote(uri_string) self.logger.info('Fetching group system info for episode id: {}'.format(group_id)) try: res, contents = splunk_rest.simpleRequest( uri, method='GET', getargs={'output_mode': 'json'}, sessionKey=self.master_session_key) except Exception: self.logger.warn('Failed to get group system info for episode id: %s', group_id) return None return json.loads(contents) def _get_action_queue_consumers(self): """ Returns list of registered action queue consumers. :return: """ return self.action_queue_storage.get_all( self.session_key, 'nobody', 'consumer_registration', host_base_uri=self.host_base_uri ) def move_action_to_action_queue(self, data, action_queue_consumers): """ Moves rest action to action queue. :param data: list of actions :param action_queue_consumers: list of action queue consumers :return: """ action = data[0] create_time = time.time() action_queue_job = { 'content': json.dumps(action), 'uri': '/servicesNS/nobody/SA-ITOA/event_management_interface/notable_event_actions/' + action['name'], 'create_time': create_time, 'object_type': 'action_queue_job', 'sub_object_type': 'notable_event_action', 'id': random.choice(action_queue_consumers)['_key'], 'timeout': create_time + 7200 } self.action_queue_storage.batch_save( self.session_key, 'nobody', [action_queue_job], objecttype='action_queue_job', host_base_uri=self.host_base_uri ) def execute_actions(self, data): """ Execute one or more action or actions @type data - dict or list (when data is list then action is executed in bulk) @param data: data data - when it is list then more than one event action is being perform data - is dict then only one action is being performed data structure would looks like this ids : [] -> list of events or group ids name: -> action name params: key:value pair for action parameters _is_sync: bool to check if action is sync or async _is_group: bool to check if action is being perform on group or not _group_data: list if event ids where action is perform if list is empty then action is being done on all events of the group earliest_time - earliest time latest_time - latest time @return: list of dict [{ sid: search id ids: [] list of events or group id where action is being perform action_name: name of action which is being performed }...] """ if not (isinstance(data, dict) or isinstance(data, list)): self.logger.error("Invalid data %s", data) raise NotableEventActionException("Invalid data so cannot perform actions.") if isinstance(data, dict): data = [data] if not self.action_dispatch_config: action_dispatch_config = ActionDispatchConfiguration(self.session_key, self.logger) else: action_dispatch_config = self.action_dispatch_config # keyword 'master' should be deprecated going forward -> ITSI-10666 if action_dispatch_config.ea_role == 'manager' or action_dispatch_config.ea_role == 'master' \ and not self.is_from_action_queue: action_queue_consumers = self._get_action_queue_consumers() if len(action_queue_consumers) > 0: self.move_action_to_action_queue(data, action_queue_consumers) return { 'ids': data[0]['ids'], 'action_name': data[0]['name'], 'msg': 'moved {} action to action queue'.format(data[0]['name']) } jobs_not_scheduled = [] for action_data in data: jobs_not_scheduled.extend(self.schedule_jobs(action_data)) result = [] try: for job in jobs_not_scheduled: res = self.handle_job_by_status(job) if res is not None: result.append(res) for job in self.scheduler.start_scheduler(self.check_action, self.execute_action): res = self.handle_job_by_status(job) if res is not None: result.append(res) except Exception as e: self.logger.exception(e) raise NotableEventActionException("Unexpected error when running action, please check log for details.") finally: self.scheduler.reset() return result def _get_policy_id_from_group_data(self, group_data): if not isinstance(group_data, dict) or 'event' not in group_data: return None event_data = group_data['event'] if not isinstance(event_data, dict) or self.policy_id_field not in event_data: return None return event_data[self.policy_id_field] def audit_job(self, job): """ Log the job's auditing information. @type job: Job @param job: Job to be audited """ bulk_data = [] activities = [] additional_ids = job.additional_ids if job.additional_ids is not None else [] activity_type = 'Action executed for episode' for eid in job.group_ids: bulk_data.append({ 'action_name': job.action_name, 'search_command': job.search_command, 'is_group': True, 'event_id': eid, 'itsi_policy_id': self._get_policy_id_from_group_data(job.group_data) }) activities.append('The action="%s" was executed successfully.' % job.action_name) # add additional event IDs to bulk_data to track action execution on other events in bulk selection for additional_eid in additional_ids: bulk_data.append({ 'action_name': job.action_name, 'search_command': job.search_command, 'is_group': True, 'event_id': additional_eid, 'itsi_policy_id': self._get_policy_id_from_group_data(job.group_data) }) activities.append('The action="%s" was executed successfully.' % job.action_name) self.audit.send_activity_to_audit_bulk(bulk_data, activities, activity_type) def handle_job_by_status(self, job): """ Handle job by its status @type job: Job @param job: Job to be handled @return: dict of failure result of the job, if the job is not failed, return None """ if job.status == Job.INIT: self.execute_action(job) self.audit_job(job) self.refresh_notable(job) self.logger.info( 'actionId="%s", actionName="%s", Status="Completed", FunctionName="actionExecution", ' 'actionInternalName="%s"', job.action_id, "notable_event_action", job.action_name) elif job.status == Job.DONE: self.audit_job(job) self.refresh_notable(job) self.logger.info( 'actionId="%s", actionName="%s", Status="Completed", FunctionName="actionExecution", ' 'actionInternalName="%s"', job.action_id, "notable_event_action", job.action_name) if job.status == Job.FAILED: self.logger.error( 'Actions (%s) failed to execute on: %s. Error: %s', job.action_name, job.group_ids, job.message ) self.logger.info( 'actionId="%s", actionName="%s", Status="Failed", FunctionName="actionExecution", ' 'actionInternalName="%s"', job.action_id, "notable_event_action", job.action_name) activity_type = 'Action failed for episode' bulk_data = [] activities = [] self.show_activity = not (job.error_message and isinstance(job.error_message, list) and any('error code 126' in error for error in job.error_message)) if self.show_activity: for i in job.group_ids: bulk_data.append({'action_name': job.action_name, 'is_group': True, 'event_id': i}) activities.append('The action="{}" failed with the following error: {}'.format( job.action_name, job.message) ) self.audit.send_activity_to_audit_bulk(bulk_data, activities, activity_type) return {'sid': None, 'ids': job.group_ids, 'action_name': job.action_name, 'error': job.message} else: return {'sid': None, 'ids': job.group_ids, 'action_name': job.action_name, 'error': job.error_message} return { 'sid': job.splunk_job.sid, 'ids': job.group_ids, 'action_name': job.action_name } def should_execute_once_per_group(self, action_config): """ For action on bulk events, given an action config should we execute only once per group? Or should be execute once each for an event in a group? @type action_config: dict @param action_config: values under a stanza in notable_event_actions.conf. @rtype: boolean @return: return True or False depending on what the config has. @raises TypeError when invalid type for input parameter. """ if not isinstance(action_config, dict): raise TypeError('"action_config" invalid type. Received="%s"' % type(action_config).__name__) self.logger.debug('Received config=%s', action_config) if 'execute_once_per_group' not in action_config: self.logger.debug( '`execute_once_per_group` not found.' ' Defaulting to True. Implies execute once for a group' ) return True return normalizeBoolean(action_config['execute_once_per_group']) def get_event_ids(self, data): """ Return episode ids to work on. @type data: dict @param data: incoming data for this action @rtype: (list, list) @return: (list of ids to operate on, list of additional ids to track but not operate on) @raises NotableEventActionException on bad request. """ action_name = data.get('name') action_config = self.configuration.get(action_name) if action_config is None: message = '"%s" must have some configuration. Received None' % action_name self.logger.error(message) raise NotableEventActionException(message) received_ids = data.get('ids') self.logger.debug('Received event_ids=%s Type=%s', received_ids, type(received_ids).__name__) self.logger.info('is_group=True. action_name=`%s` received=`%s`', action_name, received_ids) return received_ids, [] def should_execute_in_sync(self, data): """ Should action be executed in sync? We will always use the value in the action's conf file if `execute_in_sync` is present we will abide, else we will abide by the request issued to us by the UI. i.e. data.get('_is_sync', False) If no '_is_sync' is found in the request, we will default to `False` @type data: dict @param data: incoming data for action. @rtype: bool @return True if certain criterion are met. False otherwise. @raises TypeError if invalid incoming param """ if not isinstance(data, dict): self.logger.error('Invalid data %s', data) raise TypeError("Invalid data.") action_name = data.get('name') if any([not isinstance(action_name, itsi_py3.string_type), isinstance(action_name, itsi_py3.string_type) and not action_name.strip()]): message = 'Invalid action_name "%s"' % action_name self.logger.error(message) raise TypeError(message) action_config = self.configuration.get(action_name) if not action_config: message = '%s does not have any config' % action_name self.logger.error(message) raise NotableEventActionException(message) self.logger.debug('action name=%s config=%s', action_name, action_config) in_sync = normalizeBoolean(action_config.get('execute_in_sync', data.get('_is_sync', False))) self.logger.debug('should execute in sync=%s', in_sync) return in_sync def is_alt_search_command_available(self, action_name, action_config): """ Checks if configured alert action has an alternative search command configured @type data: action_name @param data: the executing action @type data: action_config @param data: the action configuration for the executing action @rtype: bool @return True if an alternative search command is configured, else False. """ app_name = action_config.get('app_name', None) supported_app_version = action_config.get('alt_command_supported_version', None) if action_config.get('alt_command', None) and supported_app_version and app_name: if action_name in self.action_app_configuration: app_conf = self.action_app_configuration[action_name] else: app_conf = self.action_app_configuration[action_name] = self.get_configuration( conf_file_name='app', app=app_name ) app_version = app_conf.get('launcher', None).get('version') if app_conf.get('launcher', None) else '0.0.0' if not VersionCheck.validate_version(app_version): app_version = self.extract_numeric_version(app_version) if VersionCheck.compare(app_version, supported_app_version) >= 0: return True return False def refresh_notable(self, job): """ Refresh given notables if applicable. @type job: Job @param job: the action job to be refreshed @returns nothing """ action_name = job.action_name action_config = self.configuration.get(action_name) policy_id = self._get_policy_id_from_group_data(job.group_data) # Refresh is applicable only if there is a key `type` in your stanza # and whose value value exists in `ACTION_TYPE_MANIFEST` # Ex: In notable_event_actions.conf: # [snow_incident] # ... # type = external_ticket # ... # and here `external_ticket` exists in our ACTION_TYPE_MANIFEST if ('type' not in action_config or action_config.get('type') not in ACTION_TYPE_MANIFEST or self.is_alt_search_command_available(action_name, action_config)): self.logger.info('Refresh is not applicable for `%s`.', action_name) return self.logger.info('Refreshing notable events=`%s`.', job.group_ids) obj_type = ACTION_TYPE_MANIFEST[action_config['type']] obj_type.do_refresh( self.session_key, self.logger, action_name, job.group_ids, action_config, action_data=job.params, itsi_policy_id=policy_id, action_dispatch_config=self.action_dispatch_config ) self.logger.info('Refresh completed. Action executed on=`%s`', job.group_ids) def schedule_jobs(self, data): """ Schedule jobs for running actions. The jobs that is not being scheduled are returned back. Note: the async actions are not scheduled because we can just run without waiting for the completion. @param data: data which hold action information for schema please refer execute_actions data structure would looks like this ids : list of events or group ids name: string action name params: dict key/value pair for action parameters _is_sync: bool to check if action is sync or async _is_group: bool to check if action is being perform on group or not _group_data: list if event ids where action is perform if list is empty then action is being done on all events of the group earliest_time - earliest time latest_time - latest time @return: list of action jobs not scheduled. """ if not isinstance(data, dict): self.logger.error("Invalid data %s", data) raise NotableEventActionException("Invalid data so cannot perform actions.") result = [] action_name = data.get('name') action_id = data.get('action_id', None) self.logger.info( 'actionId="%s", actionName="%s", Status="Started", FunctionName="actionExecution", actionInternalName="%s"', action_id, "notable_event_action", action_name ) if self.configuration.get(action_name) is None and \ self.action_dispatch_config and \ self.action_dispatch_config.ea_role == 'executor': error_msg = 'ITSI episode action {} ( actionId={} ) was dispatched to the ' \ 'executor, but the action has not been ' \ 'configured on the executor host.'.format(action_name, action_id) self.logger.error(error_msg) result.append(Job(action_name, action_id, status=Job.FAILED, message=error_msg)) return result params = data.get('params', {}) if isinstance(params, itsi_py3.string_type): params = json.loads(params) ids, additional_ids = self.get_event_ids(data) is_sync = self.should_execute_in_sync(data) # If user want to perform operation on limit events from group then # he can specify those ids in _group_action. Here # _group_data = { # event_ids: [event1, event2] so on .. / # } # Note - do not set group data if you want to run action on all events of group group_data = data.get('_group_data', {}) id_policy_map = data.get('id_policy_map', {}) earliest = data.get('earliest') latest = data.get('latest') num_of_groups = len(data['ids']) action_config = self.configuration.get(action_name, {}) is_bulk_action_iterative = normalizeBoolean(action_config.get('run_bulk_action_iteratively', False)) action_retries = int(action_config.get('max_retries', 0)) if is_bulk_action_iterative and num_of_groups > 1: # The snow_incident alert action currently only generates one incident per call # to bulk generate incidents the sendalert must be called iteratively bulk_max = int(action_config.get('bulk_max', 25)) if num_of_groups > bulk_max: error_msg = "Too many episodes selected to perform action {} ( actionId={} ). " \ "Select {} or fewer episodes and try again.".format(action_name, action_id, bulk_max) self.logger.error(error_msg) result.append(Job(action_name, action_id, status=Job.FAILED, message=error_msg)) return result for i in data['ids']: if 'event' not in group_data and i in id_policy_map: # Put itsi_policy_id to the Job object group_data['event'] = {self.policy_id_field: id_policy_map.get(i)} cur_job = Job(action_name, action_id, Job.INIT, None, [i], params, group_data, is_sync, earliest, latest, additional_ids, action_retries) if is_sync: self.scheduler.waiting_queue_offer(cur_job) else: result.append(cur_job) elif len(ids) > 0: if 'event' not in group_data and ids[0] in id_policy_map: # Put itsi_policy_id to the Job object, # since these actions are only for one group, they will map to one policy. group_data['event'] = {self.policy_id_field: id_policy_map.get(ids[0])} # perform action in batch if ids are more than a given limit index = 0 while index < len(ids): batch_size = self.BATCH_SIZE if len(ids) > index + self.BATCH_SIZE else len(ids) - index batch = ids[index: index + batch_size] cur_job = Job(action_name, action_id, Job.INIT, None, batch, params, group_data, is_sync, earliest, latest, additional_ids, action_retries) if is_sync: self.scheduler.waiting_queue_offer(cur_job) else: result.append(cur_job) self.logger.debug('Successfully initialized job for action=%s, actionId=%s, ids=%s', action_name, action_id, batch) index += batch_size return result def curate_default_prepend_command(self, prepend_command, ids, is_group, action_name): """ For certain action types, we need to curate the prepend_command. See specific implementations for more details. @type prepend_command: basestring @param prepend_command: prepend_command so far @type ids: list @param ids: event ids we are working on @type is_group: boolean @param is_group: indicates if we are working on a Notable Event group @type action_name: basestring @param action_name: name of the action being executed @rtype: basestring @return: curated default prepand command for given action """ action_config = self.configuration.get(action_name, {}) action_type = action_config.get('type', '') if action_type not in ACTION_TYPE_MANIFEST: self.logger.debug( 'No curation required. Command=`%s` action_type=`%s` action_name=`%s`', prepend_command, action_type, action_name ) return prepend_command self.logger.debug( 'pre-curate action_type=`%s` action_name=`%s` default_prepend_command=`%s`', action_type, action_name, prepend_command ) obj = ACTION_TYPE_MANIFEST.get(action_type) try: prepend_command = obj.curate_search_prepend_command( prepend_command, ids, is_group, action_name, action_config, self.logger ) except AttributeError: self.logger.warning('Attribute error: `curate_search_prepend_command` for `%s`. Will pass.', str(obj)) self.logger.debug('post-curate default_prepend_command=`%s`', prepend_command) return prepend_command def get_default_prepend_command(self, ids, action_name, is_group=False, event=None): """ get the default prepend command for given ids and action name @type ids: list @param ids: event or group id of notable events @type action_name: basestring @param action_name: name of the action that is being executed @type is_group: bool @param is_group: set to true when action is being done on a group. @type event: dict @param event: the event to run the action on @rtype: basestring @return: the default prepend command """ if not isinstance(ids, list) or not ids: message = 'Invalid ids. Expecting valid list. Received="%s".' % ids self.logger.error(message) raise TypeError(message) group_filter_command = '' for eid in ids: group_filter_command += ' itsi_group_id="%s" OR' % eid # Remove the trailing `OR` that we blindly appended earlier. group_filter_command = group_filter_command.rstrip('OR') if event and self.policy_id_field not in event: event_string = json.dumps(json.dumps(event)) # escape double quotes prepend_command = '| stats count | eval makeresultsevent={} | ' \ 'spath input=makeresultsevent | fields - makeresultsevent, count '.format(event_string) else: prepend_command = 'search `itsi_event_management_group_index` {0} ' \ '| dedup itsi_group_id | fields * '.format(group_filter_command) return self.curate_default_prepend_command(prepend_command, ids, is_group, action_name) def get_alert_command(self, action_name, action_data): """ Get the appropriate alert command to execute an action. i.e. sendalert et al. @type action_name: basestring @param action_name: the name of the action to execute @type action_data: dict @param action_data: data sent with the action, presumably the parameters @rtype: basestring @return: the alert command """ action_content = self.all_actions.get(action_name) if action_name == 'email': alert_command = 'sendemail' elif action_name == 'script': alert_command = action_content.get('command') # file name token replacement if 'filename' not in action_data and 'action.script.filename' not in action_data: self.logger.error('file name is required to run a script') raise ValueError('Filename is missing in the request.') else: field_prefix = 'action.' + action_name token_name = '$' + field_prefix + '.filename$' filename = action_data.get('filename') or action_data.get('action.script.filename') alert_command = alert_command.replace(token_name, filename) elif action_name == 'rss': alert_command = action_content.get('command') else: alert_command = 'sendalert' return alert_command def synchronize_search(self, search_job, search_command): """ Synchronizing an existing search implies, waiting for search to complete and verifying that it ran successfully. @type search_job: SearchJob @param search_job: Executed search job @type search_command: basestring @param search_command: the executed command @rtype: None @returns Nothing @raise NotableEventActionException """ splunk_search.waitForJob(search_job) if search_job.isFailed: raise NotableEventActionException('%s search failed. Refer to the search.log at "%s".' % (search_command, search_job.links.get('search.log'))) # Check messages as well is_error = False error_msg = '' for msg in search_job.messages: if not msg: pass if isinstance(msg, itsi_py3.string_type) and (msg.upper() == 'ERROR' or msg.upper() == 'FATAL'): error_msg = str(msg) is_error = True break if is_error: message = 'Search failed with message="%s"' % error_msg raise NotableEventActionException(message) def run_search(self, search_command, is_sync, earliest_time=None, latest_time=None): """ Run a splunk search. @type search_command: basestring @param search_command: the command to execute @type is_sync: boolean @param is_sync: indicates if search should be run synchronously @type earliest_time: basestring @param earliest_time: earliest time @type latest_time: basestring @param latest_time: latest time @rtype: basestring @returns: job's search id. """ job = splunk_search.dispatch( search=search_command, sessionKey=self.session_key, owner=self.owner, namespace=self.app, earliestTime=earliest_time, latestTime=latest_time ) if is_sync: t = time.time() self.synchronize_search(job, search_command) diff = time.time() - t diff_string = str(round(diff, 2)) self.logger.info( 'source_component="notable_event_actions", search_command=\'%s\', duration_in_sec="%s", job_sid="%s"', search_command, diff_string, str(job.sid)) return job.sid def get_search_command(self, action_name, action_data, ids, is_group, event=None): """ Get search command for an action @type action_name: basestring @param action_name: the action name @type action_data: dict @param action_data: action data in the request @type ids: list @param ids: list of IDs @type is_group: bool @param is_group: True if the IDs are group IDs, otherwise False @type event: dict @:param event: the event to run action on @rtype: basestring @returns: search command string """ alert_command = self.get_alert_command(action_name, action_data) default_prepend_command = self.get_default_prepend_command(ids, action_name, is_group, event) group_state_lookup = ' | `itsi_notable_event_actions_temp_state_values` | ' \ '`itsi_notable_group_lookup` | `itsi_notable_event_actions_coalesce_state_values`' if self.host_base_uri: if event and 'event_id' in event: search_command = ( '{} | appendcols [earemotesearch remote_spl="| stats count | fields - count | eval' ' itsi_group_id=\\\"{}\\\" | `itsi_notable_group_lookup`"]'.format(default_prepend_command, ids[0]) ) else: search_command = '| earemotesearch remote_spl="{0}"'.format( default_prepend_command.replace('"', r'\"') + group_state_lookup ) else: search_command = action_data.get('prepand_search_command', default_prepend_command) + group_state_lookup if action_name == 'jira_cloud_issue': search_command += (' | eval jira_ticket_system="Jira Cloud" | lookup itsi_notable_event_external_ticket ' + 'event_id as itsi_group_id tickets.ticket_system as jira_ticket_system OUTPUTNEW ' + 'tickets.ticket_system as ticketing_system tickets.ticket_id as ticket_id ' + '| eval jira_ticket_id= if(ticketing_system="Jira Cloud", ticket_id, " ")') if not search_command.rstrip(' ').endswith('|'): search_command += ' | ' if action_name in ['script', 'rss']: search_command += ' %s ' % alert_command else: command = action_name # check if send alert command has a custom wrapper action_config = self.configuration.get(action_name) if self.is_alt_search_command_available(action_name, action_config): command = action_config.get('alt_command') search_command += ' %s "%s" ' % (alert_command, command) # get params and add it to search command search_command += self.get_command_params(action_name, action_data, is_group) # Workaround for issue SPL-128836 if action_name in ['email']: email_data = self.get_all_actions(True).get('email', {}) if email_data.get('mailserver'): self.logger.info('Adding mail server=%s to email command', email_data.get('mailserver')) search_command += ' server="%s" ' % email_data.get('mailserver') if action_name == 'remedy_incident_rest': account_name = action_data.get('action.remedy_incident_rest.param.account', None) if account_name: self.logger.info('Adding account to the remedy_incident_rest command. account=%s', account_name) search_command += ' account="%s" ' % account_name else: self.logger.error('Account must be provided if running remedy_incident_rest action') return search_command def execute_action(self, job): """ Invoke command by passing given Job object @type job: Job @param job: The action job to be executed. @rtype: bool @returns True if the action stared successfully, otherwise False. """ action_name = job.action_name data = job.params ids = job.group_ids event = job.group_data.get('event') earliest_time = job.earliest latest_time = job.latest if not isinstance(action_name, itsi_py3.string_type): message = 'Action name is not specified="%s".' % action_name self.logger.error(message) raise ValueError(message) valid_actions = [action.get('action_name') for action in self.get_actions()] if action_name not in valid_actions: self.logger.error('Invalid action provided=%s', action_name) raise NotableEventActionException( 'Invalid action. This action: "%s"' ' is not allowed for notable event.' % action_name ) # All actions are executed as a Splunk search. Surprise!!! search_command = self.get_search_command(action_name, data, ids, True, event) self.logger.info( 'Generated search command=`%s` for action=%s, actionId=%s with earliest_time=%s, latest_time=%s', search_command, action_name, job.action_id, earliest_time, latest_time ) # Just kick start the action, don't need to sync with the results try: job.search_command = search_command job.splunk_job = splunk_search.dispatch( search=search_command, sessionKey=self.session_key, owner=self.owner, namespace=self.app, earliestTime=earliest_time, latestTime=latest_time ) except Exception as e: self.logger.exception(e) self.logger.error( 'actionId="%s", actionName="%s", Status="Failed", FunctionName="actionExecution", ' 'actionInternalName="%s", exception="%s"', job.action_id, "notable_event_action", action_name, e) if hasattr(e, 'message'): job.message = e.message job.status = Job.FAILED return False job.status = Job.STARTED return True def check_action(self, job): """ Verify if the action job is completed or not @type job: Job @param job: The action job to be verified. @rtype: bool @returns True if the action is completed (succeeded or failed), False if the job is not completed. """ if job.splunk_job is None: self.logger.warn('Splunk Job object is none for actionName=%s, actionId=%s, marking it as done.' % (job.action_name, job.action_id)) job.status = Job.FAILED job.message = 'Splunk job object not found.' return True if job.splunk_job.isDone: if job.splunk_job.isFailed: job.error_message = job.splunk_job.messages['error'] total_retries = int(self.configuration[job.action_name].get('max_retries', 0)) sleep_time = int(self.configuration[job.action_name].get('retry_interval', 0)) if job.action_retries > 0: time.sleep(sleep_time) job.action_retries -= 1 self.logger.info( 'The notable event action failed for actionId=%s, search_command=\'%s\', ' 'job_sid="%s", job_action retries="%s"', job.action_id, job.search_command, str(job.splunk_job.sid), (total_retries - job.action_retries)) return False error_msg = '%s search failed for actionId=%s. Refer to the search.log at "%s".' %\ (job.action_id, job.search_command, job.splunk_job.links.get('search.log')) job.status = Job.FAILED job.message = error_msg self.logger.error(error_msg) else: job.status = Job.DONE self.logger.info( 'Notable event action command runs successfully for actionId=%s, search_command=\'%s\', ' 'job_sid="%s"', job.action_id, job.search_command, str(job.splunk_job.sid)) return True return False def extract_numeric_version(self, version): """ Extract the numeric dotted version from the given version There may be cases where a [non-ITSI] app (such as ServiceNow Add-on) may have version with alphanumeric characters at the end but when we are doing version number comparison, we only need the beginning number part. Example: "5.5b" will return "5.5", "6.3.0Rf44f5cb" will return "6.3.0" @type version: string @param version: version number """ numeric_version_regex = r'\d+(\.\d+)*' return re.search(numeric_version_regex, version).group(0)