You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1240 lines
52 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
This module deals with Notable Event Actions.
Examples of a Notable Event Action are: `ping host` or `send email`
"""
import itsi_py3
import json
import time
import copy
import uuid
import random
import re
import splunk.rest as splunk_rest
import splunk.search as splunk_search
from splunk import ResourceNotFound
from splunk.util import safeURLQuote, normalizeBoolean
from ITOA.event_management.notable_event_utils import ActionDispatchConfiguration
from ITOA.event_management.notable_event_utils import NotableEventActionException
from ITOA.itoa_common import get_conf
from ITOA.setup_logging import logger as itsi_logger
from ITOA.storage.itoa_storage import ITOAStorage
from ITOA.version_check import VersionCheck
from .base_event_management import time_function_call
from .notable_event_ticketing import ExternalTicket
from .notable_event_utils import Audit
from .notable_event_actions_scheduler import Job, NotableEventActionsScheduler
# Maps a `type` value from a notable_event_actions.conf stanza to the class
# that implements that action type's extra behavior (param curation, search
# command curation, and post-action refresh). Currently only external
# ticketing is special-cased.
ACTION_TYPE_MANIFEST = {
    'external_ticket': ExternalTicket
}
class NotableEventAction(object):
"""
Represents a Notable Event Action
"""
BATCH_SIZE = 250 # update 250 events at a time
def __init__(self, session_key, app='SA-ITOA', owner='nobody', logger=None,
             audit_token_name='Auto Generated ITSI Notable Index Audit Token',
             action_dispatch_config=None, is_from_action_queue=False, **kwargs):
    """
    Initialize the notable event action executor.

    @type session_key: basestring
    @param session_key: Splunk session key used for all REST/KV-store calls
    @type app: basestring
    @param app: app namespace for conf/REST lookups
    @type owner: basestring
    @param owner: owner namespace for conf/REST lookups
    @type logger: object
    @param logger: logger to use; falls back to the module-level ITSI logger
    @type audit_token_name: basestring
    @param audit_token_name: name of the HEC token used for audit logging
    @type action_dispatch_config: ActionDispatchConfiguration
    @param action_dispatch_config: settings for hybrid (manager/executor)
        action dispatch; when present, overrides the session key and base URI
        used to talk to the manager host
    @type is_from_action_queue: bool
    @param is_from_action_queue: True when this action was dequeued from the
        action queue (prevents re-queueing on the manager)
    @type kwargs: dict
    @param kwargs: extra params (currently unused)
    @rtype: instance of class
    @return: object
    """
    self.session_key = session_key
    # By default the "master" (manager) session key is the local one; it is
    # replaced below when a hybrid dispatch configuration is provided.
    self.master_session_key = session_key
    self.action_dispatch_config = action_dispatch_config
    self.host_base_uri = ''
    if self.action_dispatch_config:
        self.master_session_key = action_dispatch_config.get_master_host_session_key()
        self.host_base_uri = action_dispatch_config.remote_ea_mgmt_uri
    self.owner = owner
    self.app = app
    self.conf_file_name = 'notable_event_actions'
    self.logger = logger if logger else itsi_logger
    # KV-store collection backing the hybrid action queue
    self.action_queue_storage = ITOAStorage(collection='itsi_notable_event_actions_queue')
    self.is_from_action_queue = is_from_action_queue
    # Dict which holds configuration from notable_event_actions.conf (enabled stanzas only)
    self.configuration = self.get_configuration()
    # Per-action cache of app.conf contents, populated lazily by
    # is_alt_search_command_available()
    self.action_app_configuration = {}
    # Dict which holds all accessible alert actions (from alert_actions endpoint)
    self.all_actions = self.get_all_actions()
    self.audit = Audit(self.session_key, audit_token_name=audit_token_name)
    self.scheduler = NotableEventActionsScheduler(self.configuration)
    # The field name for saving policy_id in event, for internal use to generate audit logs.
    self.policy_id_field = '_itsi_policy_id_'
    # Controls whether failure activity is written to the audit log; may be
    # flipped off in handle_job_by_status() for specific error codes.
    self.show_activity = True
def get_configuration(self, conf_file_name=None, app=None):
    """
    Fetch enabled stanzas from a conf file via the Splunk REST configs endpoint.

    @type conf_file_name: basestring
    @param conf_file_name: conf file to read; defaults to notable_event_actions
    @type app: basestring
    @param app: app namespace to read from; defaults to self.app
    @rtype: dict
    @return: stanza contents, keyed by stanza name
    @raise NotableEventActionException: when the REST call does not return 200
    """
    conf_file_name = conf_file_name if conf_file_name else self.conf_file_name
    app = app if app else self.app
    # search='disabled=0' filters out disabled stanzas server-side
    rval = get_conf(self.session_key, conf_file_name, search='disabled=0', count=-1, app=app)
    response = rval.get('response')
    if response.status != 200:
        self.logger.error(
            'Failed to fetch configuration file=`%s`, rval=`%s`', self.conf_file_name, rval
        )
        raise NotableEventActionException('Failed to fetch data using url="%s".' % self.conf_file_name)
    content = rval.get('content')
    content = json.loads(content)
    configuration = {}
    for entry in content.get('entry', []):
        configuration[entry.get('name')] = entry.get('content', {})
    return configuration
def get_actions(self):
    """
    Return the actions a user may perform on a notable event.

    @rtype: list
    @return: available actions, decorated with their conf-file attributes
    """
    available = self._get_available_action()
    return available
def get_action(self, action_name):
    """
    Look up a single action by name.

    @type action_name: basestring
    @param action_name: name of the action to fetch
    @rtype: dict
    @return: action information from alert_actions
    @raise ValueError: when action_name is None
    @raise ResourceNotFound: when the action is not accessible to this user
    """
    if action_name is None:
        message = "Invalid action name"
        self.logger.error(message)
        raise ValueError(message)
    if action_name in self.all_actions:
        return self.all_actions.get(action_name)
    self.logger.error("Action does not exist for this user")
    raise ResourceNotFound("Action {0} does not exist for this user {1}.".format(action_name, self.owner))
def _get_available_action(self):
"""
Compare action list against accessible actions for the app
@rtype: list
@return: notable actions to perform
"""
action_list = list(self.configuration.keys())
# Check actions are valid in alert_actions.conf
valid_actions = self._is_valid_actions(action_list)
# Decorate the configuration from notable_event_actions.conf into the valid actions
for action in valid_actions:
action['is_group_compatible'] = self.configuration[action['action_name']].get('is_group_compatible', 0)
action['is_bulk_compatible'] = self.configuration[action['action_name']].get('is_bulk_compatible', 0)
action['execute_once_per_group'] = self.configuration[action['action_name']].get(
'execute_once_per_group', 0
)
action['execute_in_sync'] = self.configuration[action['action_name']].get('execute_in_sync', 0)
action['type'] = self.configuration[action['action_name']].get('type', '')
action['refresh_impact_tab'] = self.configuration[action['action_name']].get('refresh_impact_tab', 0)
return valid_actions
def _is_valid_actions(self, actions_list):
"""
Check if give action is accessible by this app and is a valid action
return valid action only
@type actions_list: list
@param actions_list: actions list to compare against accessible actions
@rtype: list of dict
@return: return valid action list where each item contain action_name, label
"""
valid_actions = []
for action in actions_list:
if action in self.all_actions and not self.all_actions[action].get('disabled', False):
valid_actions.append({'action_name': action,
'label': self.all_actions.get(action, {}).get('label', action)})
else:
self.logger.warning('Provided action=%s is not a valid action '
'or is disabled in alert_actions.conf', action)
return valid_actions
def _is_sim_add_on_installed(self):
    """
    Check whether the splunk_ta_sim add-on is installed by probing the
    apps/local REST endpoint.

    @rtype: bool
    @return: True when the endpoint answers 200; False on any error or
        non-200 status (best-effort probe, errors are deliberately swallowed)
    """
    try:
        uri_string = '/servicesNS/' + self.owner + '/' + self.app + '/apps/local/splunk_ta_sim'
        uri = safeURLQuote(uri_string)
        res, contents = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json', 'count': '0'},
                                                  sessionKey=self.session_key)
    except Exception:
        # Any failure (404, network, auth) is treated as "not installed"
        return False
    else:
        return True if res.status == 200 else False
def get_all_actions(self, is_get_full_email=False):
    """
    Fetch all accessible alert actions from the alert_actions REST endpoint.

    @type is_get_full_email: bool
    @param is_get_full_email: when True, fetch only the full `email` action
        stanza instead of the whole list
    @rtype: dict
    @return: accessible actions, keyed by action name
    """
    uri_string = '/servicesNS/' + self.owner + '/' + self.app + '/alerts/alert_actions'
    if is_get_full_email:
        uri_string += '/email'
    uri = safeURLQuote(uri_string)
    res, contents = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json', 'count': '-1'},
                                              sessionKey=self.session_key)
    # itsi_event_action_clear_sim_incidents is built as a native integration.
    # It should be available as a notable action only if the sim add-on is installed
    sim_add_on_installed = self._is_sim_add_on_installed()
    actions = {}
    if res.status == 200:
        json_contents = json.loads(contents)
        for entry in json_contents.get('entry', []):
            if entry.get('name') not in actions:
                # Include the SIM clear-incidents action only when the add-on
                # is present; include everything else unconditionally.
                if entry.get('name') == 'itsi_event_action_clear_sim_incidents' and \
                        sim_add_on_installed:
                    actions[entry.get('name')] = entry.get('content')
                elif entry.get('name') != 'itsi_event_action_clear_sim_incidents':
                    actions[entry.get('name')] = entry.get('content')
    else:
        self.logger.error('Failed to get all accessible notable events, uri=%s, response="%s", content="%s"',
                          uri, res, contents)
    # Overload the email action with things we can actually pass to the sendemail search command
    if not is_get_full_email and 'email' in actions:
        actions['email'] = {
            'label': 'Send email',
            'content_type': '',
            'to': '',
            'cc': '',
            'bcc': '',
            'priority': '',
            'subject': '',
            'message': ''
        }
    self.logger.debug('All actions: %s', list(actions.keys()))
    return actions
def get_command_params(self, action_name, data, is_group=False):
    """
    Build the parameter string for a sendalert/sendemail search command from
    incoming request data.

    Parameters are gathered from two places: keys declared by the action in
    alert_actions (matched with or without the `action.<name>.` prefix), and
    any extra prefixed keys present in the request itself. External-ticket
    actions additionally get their params curated (e.g. correlation id).

    @type action_name: str
    @param action_name: action name you wish to execute
    @type data: basestring or dict
    @param data: request data; a JSON string is parsed into a dict
    @type is_group: bool
    @param is_group: True when the action targets an episode/group
    @rtype: basestring
    @return: space-separated `key="value"` parameter string
    @raise TypeError: when data is not (parseable into) a dict
    """
    # TODO ITOA-5104: modularize this method. It's really big.
    action_content = self.all_actions.get(action_name)
    self.logger.debug('action_content=`%s`\ndata=`%s`', action_content, data)
    params = ''
    field_prefix = 'action'
    if isinstance(data, itsi_py3.string_type):
        data = json.loads(data)
    if not isinstance(data, dict):
        raise TypeError('Expecting a valid dict for data. Received="%s". Type="%s".' % (
            data, type(data).__name__))
    # first work on action_content. this is the set of key/values we are
    # working with from the alert_actions endpoint.
    long_prefix = field_prefix + '.' + action_name
    for key in action_content.keys():
        short_field_name = field_prefix + '.' + key
        full_field_name = long_prefix + '.' + key
        # data either contain full name or only field name,we support for both
        if full_field_name in data:
            params += ' %s="%s"' % (key, str(data[full_field_name]).replace('"', '\\"'))
        elif short_field_name in data:
            params += ' %s="%s"' % (key, str(data[short_field_name]).replace('"', '\\"'))
        elif key in data:
            params += ' %s="%s"' % (key, str(data[key]).replace('"', '\\"'))
        else:
            self.logger.debug('Either %s is not a valid field or does not exist in the request.', key)
    # finally we could also have alert params sitting inside the data
    # this is entirely dependent on how the alert action is defined.
    # i.e parameters are mentioned in <alert_action>.conf.spec file or the
    # conf file i.e. <alert_action>.conf
    for key in data.keys():
        if key is None:
            continue
        short_key = key
        if key.startswith(long_prefix):
            short_key = key[len(long_prefix) + 1:]  # remove . too
        elif key.startswith(field_prefix):
            short_key = key[len(field_prefix) + 1:]  # remove . too
        # Make sure we did not add key earlier
        if short_key in action_content or key in action_content:
            continue
        params += ' %s="%s"' % (short_key, str(data[key]).replace('"', '\\"'))
    # certain actions viz, those corresponding to the creation/updating of an external
    # ticket mandate us to work with a correlation id being passed in along
    # with other parameters. Enforce the same here. If none is given or if
    # correlation id value is an empty string, we will append
    # $result.event_id$ as correlation id
    action_config = self.configuration.get(action_name, {})
    action_type = action_config.get('type', '')
    if action_type == 'external_ticket' and action_type in ACTION_TYPE_MANIFEST:
        obj = ACTION_TYPE_MANIFEST[action_type]
        params = obj.curate_params(params, action_name, action_config, self.logger, is_group=is_group)
    self.logger.debug('action params for command=%s', params)
    return params
def get_group_system_info(self, group_id):
    """
    Fetch the system info record for an episode from the
    itsi_notable_group_system KV-store collection (on the manager host when
    running in hybrid dispatch mode).

    @type group_id: basestring
    @param group_id: the group (episode) id
    @rtype: dict or None
    @return: the system info for a group, or None when the request fails
    """
    uri_string = '/servicesNS/nobody/' + self.app + \
                 '/storage/collections/data/itsi_notable_group_system/' + \
                 group_id
    uri = self.host_base_uri + safeURLQuote(uri_string)
    self.logger.info('Fetching group system info for episode id: {}'.format(group_id))
    try:
        res, contents = splunk_rest.simpleRequest(
            uri,
            method='GET',
            getargs={'output_mode': 'json'},
            sessionKey=self.master_session_key)
    except Exception:
        self.logger.warn('Failed to get group system info for episode id: %s', group_id)
        return None
    # NOTE(review): the HTTP status is not checked here, so a non-200 body is
    # still passed to json.loads and may raise — confirm callers tolerate this.
    return json.loads(contents)
def _get_action_queue_consumers(self):
    """
    Return the list of registered action-queue consumers from the
    itsi_notable_event_actions_queue collection.

    @rtype: list
    @return: consumer_registration records (executors that can run queued actions)
    """
    return self.action_queue_storage.get_all(
        self.session_key,
        'nobody',
        'consumer_registration',
        host_base_uri=self.host_base_uri
    )
def move_action_to_action_queue(self, data, action_queue_consumers):
    """
    Persist a REST-dispatched action into the action queue so a registered
    consumer (executor) can pick it up and run it.

    Only the first action in `data` is queued — callers in execute_actions()
    pass the whole list, so this assumes a single-action payload.

    @type data: list
    @param data: list of actions; only data[0] is used
    @type action_queue_consumers: list
    @param action_queue_consumers: registered consumers; one is chosen at random
    @return: None
    """
    action = data[0]
    create_time = time.time()
    action_queue_job = {
        'content': json.dumps(action),
        'uri': '/servicesNS/nobody/SA-ITOA/event_management_interface/notable_event_actions/' + action['name'],
        'create_time': create_time,
        'object_type': 'action_queue_job',
        'sub_object_type': 'notable_event_action',
        # assign the job to a randomly chosen consumer for crude load spreading
        'id': random.choice(action_queue_consumers)['_key'],
        # jobs expire 2 hours after creation
        'timeout': create_time + 7200
    }
    self.action_queue_storage.batch_save(
        self.session_key, 'nobody', [action_queue_job], objecttype='action_queue_job',
        host_base_uri=self.host_base_uri
    )
def execute_actions(self, data):
    """
    Execute one or more actions.

    @type data: dict or list (when data is list then actions are executed in bulk)
    @param data: data
        data - when it is list then more than one event action is being performed
        data - is dict then only one action is being performed
        data structure would look like this
            ids : [] -> list of events or group ids
            name: -> action name
            params: key:value pair for action parameters
            _is_sync: bool to check if action is sync or async
            _is_group: bool to check if action is being performed on group or not
            _group_data: list of event ids where action is performed; if list is
                empty then action is being done on all events of the group
            earliest_time - earliest time
            latest_time - latest time
    @return: list of dict
        [{
            sid: search id
            ids: [] list of events or group id where action is being performed
            action_name: name of action which is being performed
        }...]
        or, when the action is handed off to the action queue, a single dict
        describing the queued action.
    @raise NotableEventActionException: on invalid input or unexpected failure
    """
    if not isinstance(data, (dict, list)):
        self.logger.error("Invalid data %s", data)
        raise NotableEventActionException("Invalid data so cannot perform actions.")
    if isinstance(data, dict):
        data = [data]
    if not self.action_dispatch_config:
        action_dispatch_config = ActionDispatchConfiguration(self.session_key, self.logger)
    else:
        action_dispatch_config = self.action_dispatch_config
    # keyword 'master' should be deprecated going forward -> ITSI-10666
    # BUGFIX: the original condition `a == 'manager' or a == 'master' and not q`
    # parsed as `a == 'manager' or (a == 'master' and not q)` due to operator
    # precedence, so a manager-role host would re-queue actions even when they
    # were already dispatched FROM the action queue. Both roles must honor
    # is_from_action_queue.
    if (action_dispatch_config.ea_role in ('manager', 'master')
            and not self.is_from_action_queue):
        action_queue_consumers = self._get_action_queue_consumers()
        if len(action_queue_consumers) > 0:
            self.move_action_to_action_queue(data, action_queue_consumers)
            return {
                'ids': data[0]['ids'],
                'action_name': data[0]['name'],
                'msg': 'moved {} action to action queue'.format(data[0]['name'])
            }
    # Sync jobs go through the scheduler; the rest are returned for direct execution.
    jobs_not_scheduled = []
    for action_data in data:
        jobs_not_scheduled.extend(self.schedule_jobs(action_data))
    result = []
    try:
        for job in jobs_not_scheduled:
            res = self.handle_job_by_status(job)
            if res is not None:
                result.append(res)
        for job in self.scheduler.start_scheduler(self.check_action, self.execute_action):
            res = self.handle_job_by_status(job)
            if res is not None:
                result.append(res)
    except Exception as e:
        self.logger.exception(e)
        raise NotableEventActionException("Unexpected error when running action, please check log for details.")
    finally:
        self.scheduler.reset()
    return result
def _get_policy_id_from_group_data(self, group_data):
if not isinstance(group_data, dict) or 'event' not in group_data:
return None
event_data = group_data['event']
if not isinstance(event_data, dict) or self.policy_id_field not in event_data:
return None
return event_data[self.policy_id_field]
def audit_job(self, job):
    """
    Log the job's auditing information.

    One audit record is emitted per episode id the job acted on, plus one per
    additional (bulk-selection) id tracked on the job.

    @type job: Job
    @param job: Job to be audited
    @return: None
    """
    bulk_data = []
    activities = []
    additional_ids = job.additional_ids if job.additional_ids is not None else []
    activity_type = 'Action executed for episode'
    # The policy id depends only on the job's group data; compute it once
    # instead of once per event id (the original also duplicated the loop
    # body for group_ids and additional_ids — merged here).
    policy_id = self._get_policy_id_from_group_data(job.group_data)
    for eid in list(job.group_ids) + list(additional_ids):
        bulk_data.append({
            'action_name': job.action_name,
            'search_command': job.search_command,
            'is_group': True,
            'event_id': eid,
            'itsi_policy_id': policy_id
        })
        activities.append('The action="%s" was executed successfully.' % job.action_name)
    self.audit.send_activity_to_audit_bulk(bulk_data, activities, activity_type)
def handle_job_by_status(self, job):
    """
    Drive a job through its remaining lifecycle based on its status.

    INIT jobs are executed, audited, and refreshed; DONE jobs are audited and
    refreshed; FAILED jobs (including jobs that fail during execution above)
    are audited as failures and an error result is returned.

    @type job: Job
    @param job: Job to be handled
    @return: dict describing the outcome; contains an 'error' key when the
        job failed, otherwise the search sid and target ids
    """
    if job.status == Job.INIT:
        # Async/unscheduled job: run it now. execute_action may flip the
        # job status to FAILED, which the check below picks up.
        self.execute_action(job)
        self.audit_job(job)
        self.refresh_notable(job)
        self.logger.info(
            'actionId="%s", actionName="%s", Status="Completed", FunctionName="actionExecution", '
            'actionInternalName="%s"',
            job.action_id, "notable_event_action", job.action_name)
    elif job.status == Job.DONE:
        # Job already executed by the scheduler; just audit and refresh.
        self.audit_job(job)
        self.refresh_notable(job)
        self.logger.info(
            'actionId="%s", actionName="%s", Status="Completed", FunctionName="actionExecution", '
            'actionInternalName="%s"',
            job.action_id, "notable_event_action", job.action_name)
    if job.status == Job.FAILED:
        self.logger.error(
            'Actions (%s) failed to execute on: %s. Error: %s', job.action_name, job.group_ids, job.message
        )
        self.logger.info(
            'actionId="%s", actionName="%s", Status="Failed", FunctionName="actionExecution", '
            'actionInternalName="%s"',
            job.action_id, "notable_event_action", job.action_name)
        activity_type = 'Action failed for episode'
        bulk_data = []
        activities = []
        # Suppress audit activity for "error code 126" failures (permission/
        # exec errors) — presumably to avoid noisy duplicate activity; confirm.
        self.show_activity = not (job.error_message and isinstance(job.error_message, list) and any('error code 126' in error for error in job.error_message))
        if self.show_activity:
            for i in job.group_ids:
                bulk_data.append({'action_name': job.action_name, 'is_group': True, 'event_id': i})
                activities.append('The action="{}" failed with the following error: {}'.format(
                    job.action_name, job.message)
                )
            self.audit.send_activity_to_audit_bulk(bulk_data, activities, activity_type)
            return {'sid': None,
                    'ids': job.group_ids,
                    'action_name': job.action_name,
                    'error': job.message}
        else:
            return {'sid': None,
                    'ids': job.group_ids,
                    'action_name': job.action_name,
                    'error': job.error_message}
    # Success path: report the dispatched search job's sid.
    return {
        'sid': job.splunk_job.sid,
        'ids': job.group_ids,
        'action_name': job.action_name
    }
def should_execute_once_per_group(self, action_config):
    """
    Decide whether a bulk action should run once per group (episode) rather
    than once per event, based on the action's conf stanza.

    @type action_config: dict
    @param action_config: values under a stanza in notable_event_actions.conf
    @rtype: bool
    @return: True when the action runs once per group; defaults to True when
        the setting is absent
    @raise TypeError: when action_config is not a dict
    """
    if not isinstance(action_config, dict):
        raise TypeError('"action_config" invalid type. Received="%s"' % type(action_config).__name__)
    self.logger.debug('Received config=%s', action_config)
    if 'execute_once_per_group' in action_config:
        return normalizeBoolean(action_config['execute_once_per_group'])
    self.logger.debug(
        '`execute_once_per_group` not found.'
        ' Defaulting to True. Implies execute once for a group'
    )
    return True
def get_event_ids(self, data):
    """
    Return the episode ids to work on for an action request.

    @type data: dict
    @param data: incoming data for this action
    @rtype: (list, list)
    @return: (list of ids to operate on, list of additional ids to track but
        not operate on)
    @raise NotableEventActionException: when the action has no configuration
    """
    action_name = data.get('name')
    if self.configuration.get(action_name) is None:
        message = '"%s" must have some configuration. Received None' % action_name
        self.logger.error(message)
        raise NotableEventActionException(message)
    received_ids = data.get('ids')
    self.logger.debug('Received event_ids=%s Type=%s', received_ids, type(received_ids).__name__)
    self.logger.info('is_group=True. action_name=`%s` received=`%s`', action_name, received_ids)
    # No additional ids are tracked for plain requests.
    return received_ids, []
def should_execute_in_sync(self, data):
    """
    Decide whether the action should be executed synchronously.

    The conf file wins: when `execute_in_sync` is present in the action's
    stanza we use it; otherwise we fall back to the request's `_is_sync`
    flag, defaulting to False when neither is present.

    @type data: dict
    @param data: incoming data for action
    @rtype: bool
    @return: True when the action must run in sync, False otherwise
    @raise TypeError: on invalid data or action name
    @raise NotableEventActionException: when the action has no configuration
    """
    if not isinstance(data, dict):
        self.logger.error('Invalid data %s', data)
        raise TypeError("Invalid data.")
    action_name = data.get('name')
    # Must be a non-empty, non-whitespace string.
    if not isinstance(action_name, itsi_py3.string_type) or not action_name.strip():
        message = 'Invalid action_name "%s"' % action_name
        self.logger.error(message)
        raise TypeError(message)
    action_config = self.configuration.get(action_name)
    if not action_config:
        message = '%s does not have any config' % action_name
        self.logger.error(message)
        raise NotableEventActionException(message)
    self.logger.debug('action name=%s config=%s', action_name, action_config)
    in_sync = normalizeBoolean(action_config.get('execute_in_sync', data.get('_is_sync', False)))
    self.logger.debug('should execute in sync=%s', in_sync)
    return in_sync
def is_alt_search_command_available(self, action_name, action_config):
    """
    Check whether the configured alert action has an alternative search
    command available, i.e. the owning app is installed at a version that
    supports `alt_command`.

    @type action_name: basestring
    @param action_name: the executing action
    @type action_config: dict
    @param action_config: the action configuration for the executing action
    @rtype: bool
    @return: True if an alternative search command is configured and the
        app's version is >= the supported version, else False
    """
    app_name = action_config.get('app_name', None)
    supported_app_version = action_config.get('alt_command_supported_version', None)
    # All three settings must be present for an alt command to be considered.
    if action_config.get('alt_command', None) and supported_app_version and app_name:
        # app.conf lookups are cached per action on the instance.
        if action_name in self.action_app_configuration:
            app_conf = self.action_app_configuration[action_name]
        else:
            app_conf = self.action_app_configuration[action_name] = self.get_configuration(
                conf_file_name='app', app=app_name
            )
        app_version = app_conf.get('launcher', None).get('version') if app_conf.get('launcher', None) else '0.0.0'
        # Some apps ship non-numeric version strings; normalize before comparing.
        if not VersionCheck.validate_version(app_version):
            app_version = self.extract_numeric_version(app_version)
        if VersionCheck.compare(app_version, supported_app_version) >= 0:
            return True
    return False
def refresh_notable(self, job):
    """
    Refresh given notables if applicable.

    @type job: Job
    @param job: the action job to be refreshed
    @returns nothing
    """
    action_name = job.action_name
    action_config = self.configuration.get(action_name)
    policy_id = self._get_policy_id_from_group_data(job.group_data)
    # Refresh is applicable only if there is a key `type` in your stanza
    # and whose value value exists in `ACTION_TYPE_MANIFEST`
    # Ex: In notable_event_actions.conf:
    # [snow_incident]
    # ...
    # type = external_ticket
    # ...
    # and here `external_ticket` exists in our ACTION_TYPE_MANIFEST
    # Actions with an alternative search command also skip the refresh.
    if ('type' not in action_config or action_config.get('type') not in ACTION_TYPE_MANIFEST
            or self.is_alt_search_command_available(action_name, action_config)):
        self.logger.info('Refresh is not applicable for `%s`.', action_name)
        return
    self.logger.info('Refreshing notable events=`%s`.', job.group_ids)
    # Delegate the refresh to the type-specific handler (e.g. ExternalTicket).
    obj_type = ACTION_TYPE_MANIFEST[action_config['type']]
    obj_type.do_refresh(
        self.session_key,
        self.logger,
        action_name,
        job.group_ids,
        action_config,
        action_data=job.params,
        itsi_policy_id=policy_id,
        action_dispatch_config=self.action_dispatch_config
    )
    self.logger.info('Refresh completed. Action executed on=`%s`', job.group_ids)
def schedule_jobs(self, data):
    """
    Schedule jobs for running actions. Jobs that are NOT scheduled (async
    jobs, and failed pre-validation jobs) are returned for direct handling;
    sync jobs are offered to the scheduler's waiting queue instead.

    @type data: dict
    @param data: data which holds action information; for schema please refer
        to execute_actions:
            ids : list of events or group ids
            name: string action name
            params: dict key/value pair for action parameters
            _is_sync: bool to check if action is sync or async
            _is_group: bool to check if action is being performed on group or not
            _group_data: list of event ids where action is performed; if list
                is empty then action is being done on all events of the group
            earliest_time - earliest time
            latest_time - latest time
    @return: list of action jobs not scheduled
    @raise NotableEventActionException: on invalid data
    """
    if not isinstance(data, dict):
        self.logger.error("Invalid data %s", data)
        raise NotableEventActionException("Invalid data so cannot perform actions.")
    result = []
    action_name = data.get('name')
    action_id = data.get('action_id', None)
    self.logger.info(
        'actionId="%s", actionName="%s", Status="Started", FunctionName="actionExecution", actionInternalName="%s"',
        action_id, "notable_event_action", action_name
    )
    # A hybrid executor that received an action it has no stanza for cannot
    # run it — surface a FAILED job rather than raising.
    if self.configuration.get(action_name) is None and \
            self.action_dispatch_config and \
            self.action_dispatch_config.ea_role == 'executor':
        error_msg = 'ITSI episode action {} ( actionId={} ) was dispatched to the ' \
                    'executor, but the action has not been ' \
                    'configured on the executor host.'.format(action_name, action_id)
        self.logger.error(error_msg)
        result.append(Job(action_name, action_id, status=Job.FAILED, message=error_msg))
        return result
    params = data.get('params', {})
    if isinstance(params, itsi_py3.string_type):
        params = json.loads(params)
    ids, additional_ids = self.get_event_ids(data)
    is_sync = self.should_execute_in_sync(data)
    # If user want to perform operation on limit events from group then
    # he can specify those ids in _group_action. Here
    # _group_data = {
    #     event_ids: [event1, event2] so on .. /
    # }
    # Note - do not set group data if you want to run action on all events of group
    group_data = data.get('_group_data', {})
    id_policy_map = data.get('id_policy_map', {})
    earliest = data.get('earliest')
    latest = data.get('latest')
    num_of_groups = len(data['ids'])
    action_config = self.configuration.get(action_name, {})
    is_bulk_action_iterative = normalizeBoolean(action_config.get('run_bulk_action_iteratively', False))
    action_retries = int(action_config.get('max_retries', 0))
    if is_bulk_action_iterative and num_of_groups > 1:
        # The snow_incident alert action currently only generates one incident per call
        # to bulk generate incidents the sendalert must be called iteratively
        bulk_max = int(action_config.get('bulk_max', 25))
        if num_of_groups > bulk_max:
            error_msg = "Too many episodes selected to perform action {} ( actionId={} ). " \
                        "Select {} or fewer episodes and try again.".format(action_name, action_id, bulk_max)
            self.logger.error(error_msg)
            result.append(Job(action_name, action_id, status=Job.FAILED, message=error_msg))
            return result
        # One job per episode id.
        for i in data['ids']:
            if 'event' not in group_data and i in id_policy_map:
                # Put itsi_policy_id to the Job object
                group_data['event'] = {self.policy_id_field: id_policy_map.get(i)}
            cur_job = Job(action_name, action_id, Job.INIT, None, [i], params, group_data, is_sync,
                          earliest, latest, additional_ids, action_retries)
            if is_sync:
                self.scheduler.waiting_queue_offer(cur_job)
            else:
                result.append(cur_job)
    elif len(ids) > 0:
        if 'event' not in group_data and ids[0] in id_policy_map:
            # Put itsi_policy_id to the Job object,
            # since these actions are only for one group, they will map to one policy.
            group_data['event'] = {self.policy_id_field: id_policy_map.get(ids[0])}
        # perform action in batch if ids are more than a given limit
        index = 0
        while index < len(ids):
            batch_size = self.BATCH_SIZE if len(ids) > index + self.BATCH_SIZE else len(ids) - index
            batch = ids[index: index + batch_size]
            cur_job = Job(action_name, action_id, Job.INIT, None, batch, params, group_data, is_sync,
                          earliest, latest, additional_ids, action_retries)
            if is_sync:
                self.scheduler.waiting_queue_offer(cur_job)
            else:
                result.append(cur_job)
            self.logger.debug('Successfully initialized job for action=%s, actionId=%s, ids=%s',
                              action_name, action_id, batch)
            index += batch_size
    return result
def curate_default_prepend_command(self, prepend_command, ids, is_group, action_name):
    """
    Curate the prepend command for action types that require it (see the
    type-specific implementations in ACTION_TYPE_MANIFEST for details).

    @type prepend_command: basestring
    @param prepend_command: prepend command built so far
    @type ids: list
    @param ids: event ids we are working on
    @type is_group: bool
    @param is_group: True when working on a Notable Event group
    @type action_name: basestring
    @param action_name: name of the action being executed
    @rtype: basestring
    @return: curated default prepend command for the given action
    """
    action_config = self.configuration.get(action_name, {})
    action_type = action_config.get('type', '')
    # Guard clause: only manifest-listed types need curation.
    if action_type not in ACTION_TYPE_MANIFEST:
        self.logger.debug(
            'No curation required. Command=`%s` action_type=`%s` action_name=`%s`', prepend_command, action_type,
            action_name
        )
        return prepend_command
    self.logger.debug(
        'pre-curate action_type=`%s` action_name=`%s` default_prepend_command=`%s`', action_type, action_name,
        prepend_command
    )
    handler = ACTION_TYPE_MANIFEST.get(action_type)
    try:
        prepend_command = handler.curate_search_prepend_command(
            prepend_command,
            ids,
            is_group,
            action_name,
            action_config,
            self.logger
        )
    except AttributeError:
        # Handler without a curate hook — leave the command untouched.
        self.logger.warning('Attribute error: `curate_search_prepend_command` for `%s`. Will pass.', str(handler))
    self.logger.debug('post-curate default_prepend_command=`%s`', prepend_command)
    return prepend_command
def get_default_prepend_command(self, ids, action_name, is_group=False, event=None):
    """
    Get the default prepend command for given ids and action name.

    @type ids: list
    @param ids: event or group ids of notable events
    @type action_name: basestring
    @param action_name: name of the action that is being executed
    @type is_group: bool
    @param is_group: set to true when action is being done on a group
    @type event: dict
    @param event: the event to run the action on
    @rtype: basestring
    @return: the default prepend command
    @raise TypeError: when ids is not a non-empty list
    """
    if not isinstance(ids, list) or not ids:
        message = 'Invalid ids. Expecting valid list. Received="%s".' % ids
        self.logger.error(message)
        raise TypeError(message)
    # Build ` itsi_group_id="a" OR itsi_group_id="b" ` (leading/trailing
    # space preserved for the .format() below). The original appended ` OR`
    # to every clause and then used rstrip('OR') to drop it — but rstrip
    # strips a character SET, not a suffix, which only worked by accident
    # because the clause always ends with `" OR`. A join is the correct,
    # non-fragile way to interleave the OR separator.
    group_filter_command = ' %s ' % ' OR '.join(
        'itsi_group_id="%s"' % eid for eid in ids
    )
    if event and self.policy_id_field not in event:
        # Materialize the single event inline via makeresults-style eval so
        # the action runs on exactly this event payload.
        event_string = json.dumps(json.dumps(event))  # escape double quotes
        prepend_command = '| stats count | eval makeresultsevent={} | ' \
                          'spath input=makeresultsevent | fields - makeresultsevent, count '.format(event_string)
    else:
        prepend_command = 'search `itsi_event_management_group_index` {0} ' \
                          '| dedup itsi_group_id | fields * '.format(group_filter_command)
    return self.curate_default_prepend_command(prepend_command, ids, is_group, action_name)
def get_alert_command(self, action_name, action_data):
    """
    Pick the search command used to execute an action (sendemail, a script
    command with its filename token substituted, the rss command, or the
    generic sendalert).

    @type action_name: basestring
    @param action_name: the name of the action to execute
    @type action_data: dict
    @param action_data: data sent with the action, presumably the parameters
    @rtype: basestring
    @return: the alert command
    @raise ValueError: when a script action is missing its filename
    """
    action_content = self.all_actions.get(action_name)
    if action_name == 'email':
        return 'sendemail'
    if action_name == 'script':
        command = action_content.get('command')
        # file name token replacement
        has_filename = ('filename' in action_data
                        or 'action.script.filename' in action_data)
        if not has_filename:
            self.logger.error('file name is required to run a script')
            raise ValueError('Filename is missing in the request.')
        token_name = '$' + 'action.' + action_name + '.filename$'
        filename = action_data.get('filename') or action_data.get('action.script.filename')
        return command.replace(token_name, filename)
    if action_name == 'rss':
        return action_content.get('command')
    return 'sendalert'
def synchronize_search(self, search_job, search_command):
"""
Synchronizing an existing search implies, waiting for search to complete
and verifying that it ran successfully.
@type search_job: SearchJob
@param search_job: Executed search job
@type search_command: basestring
@param search_command: the executed command
@rtype: None
@returns Nothing
@raise NotableEventActionException
"""
splunk_search.waitForJob(search_job)
if search_job.isFailed:
raise NotableEventActionException('%s search failed. Refer to the search.log at "%s".' %
(search_command, search_job.links.get('search.log')))
# Check messages as well
is_error = False
error_msg = ''
for msg in search_job.messages:
if not msg:
pass
if isinstance(msg, itsi_py3.string_type) and (msg.upper() == 'ERROR' or msg.upper() == 'FATAL'):
error_msg = str(msg)
is_error = True
break
if is_error:
message = 'Search failed with message="%s"' % error_msg
raise NotableEventActionException(message)
def run_search(self, search_command, is_sync, earliest_time=None, latest_time=None):
"""
Run a splunk search.
@type search_command: basestring
@param search_command: the command to execute
@type is_sync: boolean
@param is_sync: indicates if search should be run synchronously
@type earliest_time: basestring
@param earliest_time: earliest time
@type latest_time: basestring
@param latest_time: latest time
@rtype: basestring
@returns: job's search id.
"""
job = splunk_search.dispatch(
search=search_command,
sessionKey=self.session_key,
owner=self.owner,
namespace=self.app,
earliestTime=earliest_time,
latestTime=latest_time
)
if is_sync:
t = time.time()
self.synchronize_search(job, search_command)
diff = time.time() - t
diff_string = str(round(diff, 2))
self.logger.info(
'source_component="notable_event_actions", search_command=\'%s\', duration_in_sec="%s", job_sid="%s"',
search_command, diff_string, str(job.sid))
return job.sid
def get_search_command(self, action_name, action_data, ids, is_group, event=None):
"""
Get search command for an action
@type action_name: basestring
@param action_name: the action name
@type action_data: dict
@param action_data: action data in the request
@type ids: list
@param ids: list of IDs
@type is_group: bool
@param is_group: True if the IDs are group IDs, otherwise False
@type event: dict
@:param event: the event to run action on
@rtype: basestring
@returns: search command string
"""
alert_command = self.get_alert_command(action_name, action_data)
default_prepend_command = self.get_default_prepend_command(ids, action_name, is_group, event)
group_state_lookup = ' | `itsi_notable_event_actions_temp_state_values` | ' \
'`itsi_notable_group_lookup` | `itsi_notable_event_actions_coalesce_state_values`'
if self.host_base_uri:
if event and 'event_id' in event:
search_command = (
'{} | appendcols [earemotesearch remote_spl="| stats count | fields - count | eval'
' itsi_group_id=\\\"{}\\\" | `itsi_notable_group_lookup`"]'.format(default_prepend_command, ids[0])
)
else:
search_command = '| earemotesearch remote_spl="{0}"'.format(
default_prepend_command.replace('"', r'\"') + group_state_lookup
)
else:
search_command = action_data.get('prepand_search_command', default_prepend_command) + group_state_lookup
if action_name == 'jira_cloud_issue':
search_command += (' | eval jira_ticket_system="Jira Cloud" | lookup itsi_notable_event_external_ticket '
+ 'event_id as itsi_group_id tickets.ticket_system as jira_ticket_system OUTPUTNEW '
+ 'tickets.ticket_system as ticketing_system tickets.ticket_id as ticket_id '
+ '| eval jira_ticket_id= if(ticketing_system="Jira Cloud", ticket_id, " ")')
if not search_command.rstrip(' ').endswith('|'):
search_command += ' | '
if action_name in ['script', 'rss']:
search_command += ' %s ' % alert_command
else:
command = action_name
# check if send alert command has a custom wrapper
action_config = self.configuration.get(action_name)
if self.is_alt_search_command_available(action_name, action_config):
command = action_config.get('alt_command')
search_command += ' %s "%s" ' % (alert_command, command)
# get params and add it to search command
search_command += self.get_command_params(action_name, action_data, is_group)
# Workaround for issue SPL-128836
if action_name in ['email']:
email_data = self.get_all_actions(True).get('email', {})
if email_data.get('mailserver'):
self.logger.info('Adding mail server=%s to email command', email_data.get('mailserver'))
search_command += ' server="%s" ' % email_data.get('mailserver')
if action_name == 'remedy_incident_rest':
account_name = action_data.get('action.remedy_incident_rest.param.account', None)
if account_name:
self.logger.info('Adding account to the remedy_incident_rest command. account=%s', account_name)
search_command += ' account="%s" ' % account_name
else:
self.logger.error('Account must be provided if running remedy_incident_rest action')
return search_command
def execute_action(self, job):
"""
Invoke command by passing given Job object
@type job: Job
@param job: The action job to be executed.
@rtype: bool
@returns True if the action stared successfully, otherwise False.
"""
action_name = job.action_name
data = job.params
ids = job.group_ids
event = job.group_data.get('event')
earliest_time = job.earliest
latest_time = job.latest
if not isinstance(action_name, itsi_py3.string_type):
message = 'Action name is not specified="%s".' % action_name
self.logger.error(message)
raise ValueError(message)
valid_actions = [action.get('action_name') for action in self.get_actions()]
if action_name not in valid_actions:
self.logger.error('Invalid action provided=%s', action_name)
raise NotableEventActionException(
'Invalid action. This action: "%s"'
' is not allowed for notable event.' % action_name
)
# All actions are executed as a Splunk search. Surprise!!!
search_command = self.get_search_command(action_name, data, ids, True, event)
self.logger.info(
'Generated search command=`%s` for action=%s, actionId=%s with earliest_time=%s, latest_time=%s',
search_command, action_name, job.action_id, earliest_time, latest_time
)
# Just kick start the action, don't need to sync with the results
try:
job.search_command = search_command
job.splunk_job = splunk_search.dispatch(
search=search_command,
sessionKey=self.session_key,
owner=self.owner,
namespace=self.app,
earliestTime=earliest_time,
latestTime=latest_time
)
except Exception as e:
self.logger.exception(e)
self.logger.error(
'actionId="%s", actionName="%s", Status="Failed", FunctionName="actionExecution", '
'actionInternalName="%s", exception="%s"',
job.action_id, "notable_event_action", action_name, e)
if hasattr(e, 'message'):
job.message = e.message
job.status = Job.FAILED
return False
job.status = Job.STARTED
return True
def check_action(self, job):
"""
Verify if the action job is completed or not
@type job: Job
@param job: The action job to be verified.
@rtype: bool
@returns True if the action is completed (succeeded or failed), False if the job is not completed.
"""
if job.splunk_job is None:
self.logger.warn('Splunk Job object is none for actionName=%s, actionId=%s, marking it as done.' %
(job.action_name, job.action_id))
job.status = Job.FAILED
job.message = 'Splunk job object not found.'
return True
if job.splunk_job.isDone:
if job.splunk_job.isFailed:
job.error_message = job.splunk_job.messages['error']
total_retries = int(self.configuration[job.action_name].get('max_retries', 0))
sleep_time = int(self.configuration[job.action_name].get('retry_interval', 0))
if job.action_retries > 0:
time.sleep(sleep_time)
job.action_retries -= 1
self.logger.info(
'The notable event action failed for actionId=%s, search_command=\'%s\', '
'job_sid="%s", job_action retries="%s"',
job.action_id, job.search_command, str(job.splunk_job.sid), (total_retries - job.action_retries))
return False
error_msg = '%s search failed for actionId=%s. Refer to the search.log at "%s".' %\
(job.action_id, job.search_command, job.splunk_job.links.get('search.log'))
job.status = Job.FAILED
job.message = error_msg
self.logger.error(error_msg)
else:
job.status = Job.DONE
self.logger.info(
'Notable event action command runs successfully for actionId=%s, search_command=\'%s\', '
'job_sid="%s"',
job.action_id, job.search_command, str(job.splunk_job.sid))
return True
return False
def extract_numeric_version(self, version):
"""
Extract the numeric dotted version from the given version
There may be cases where a [non-ITSI] app (such as ServiceNow Add-on) may have version with alphanumeric
characters at the end but when we are doing version number comparison, we only need the beginning number part.
Example: "5.5b" will return "5.5", "6.3.0Rf44f5cb" will return "6.3.0"
@type version: string
@param version: version number
"""
numeric_version_regex = r'\d+(\.\d+)*'
return re.search(numeric_version_regex, version).group(0)