You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
529 lines
21 KiB
529 lines
21 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import abc
|
|
import csv
|
|
import gzip
|
|
import json
|
|
import sys
|
|
import os
|
|
import random
|
|
import time as time_import
|
|
import copy
|
|
import asyncio
|
|
|
|
from splunk.clilib.bundle_paths import make_splunkhome_path
|
|
from splunk.util import normalizeBoolean
|
|
|
|
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
|
|
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
|
|
|
|
from SA_ITOA_app_common.splunklib import results
|
|
|
|
from itsi.itsi_utils import ITOAInterfaceUtils
|
|
|
|
from ITOA.setup_logging import logger
|
|
from ITOA.event_management.push_event_manager import PushEventManager
|
|
from ITOA.itoa_common import get_itsi_event_management_conf_field_value
|
|
|
|
from .notable_event_utils import filter_index_fields_and_get_event_id_for_notable_event, NotableEventException
|
|
from .itsi_nats_publish import NatsEventPublisher
|
|
# set the maximum allowable CSV field size
|
|
#
|
|
# The default of the csv module is 128KB; upping to 10MB. See SPL-12117 for
|
|
# the background on issues surrounding field sizes.
|
|
# (this method is new in python 2.5)
|
|
csv.field_size_limit(10485760)
|
|
|
|
|
|
# This is common modular action which is being copied from CIM
|
|
# Once we moved to model of pull library at run time, Please delete ModularAction class code and
|
|
# pull this class from common lib
|
|
|
|
|
|
class InvalidResultID(Exception):
|
|
pass
|
|
|
|
|
|
class ModularAction(object):
|
|
DEFAULT_MESSAGE = 'sendmodalert - signature="%s" action_name="%s" search_name="%s" sid="%s" orig_sid="%s" rid="%s" orig_rid="%s" app="%s" owner="%s" action_mode="%s" action_status="%s"'
|
|
|
|
# we require a logging instance
|
|
def __init__(self, settings, logger, action_name=None):
|
|
self.settings = json.loads(settings)
|
|
self.logger = logger
|
|
self.session_key = self.settings.get('session_key')
|
|
self.sid = self.settings.get('sid')
|
|
self.orig_sid = ''
|
|
self.rid = ''
|
|
self.orig_rid = ''
|
|
self.results_file = self.settings.get('results_file')
|
|
self.search_name = self.settings.get('search_name')
|
|
self.app = self.settings.get('app')
|
|
self.owner = self.settings.get('owner')
|
|
self.configuration = self.settings.get('configuration', {})
|
|
# use | sendalert param.action_name=$action_name$
|
|
self.action_name = self.configuration.get('action_name') or action_name or 'unknown'
|
|
self.action_mode = 'undetermined'
|
|
self.action_status = ''
|
|
# Since we don't use the result object we get from settings it will be purged
|
|
del self.settings['result']
|
|
|
|
# get job
|
|
self.job = {}
|
|
try:
|
|
service = ITOAInterfaceUtils.service_connection(self.session_key, app_name="SA-ITOA")
|
|
search_job = service.job(self.sid)
|
|
error_message = search_job.messages.get('error')
|
|
if error_message:
|
|
self.logger.warning(self.message(f'Could not retrieve search job info, Error: {error_message}'))
|
|
else:
|
|
result = results.JSONResultsReader(search_job.results(output_mode='json'))
|
|
for event in result:
|
|
if isinstance(event, dict):
|
|
self.job = event
|
|
self.logger.info(self.message('Successfully retrieved search job info.'))
|
|
self.logger.debug(self.job)
|
|
except Exception:
|
|
logger.exception("Exception retrieving search job info.")
|
|
self.logger.warning(self.message('Could not retrieve search job info.'))
|
|
|
|
# set action_mode
|
|
if self.job.get('delegate', 'scheduler') == 'scheduler':
|
|
self.action_mode = 'saved'
|
|
else:
|
|
self.action_mode = 'adhoc'
|
|
|
|
# The purpose of this method is to provide a common messaging interface
|
|
def message(self, signature, status=None):
|
|
status = status or self.action_status
|
|
return ModularAction.DEFAULT_MESSAGE % (
|
|
signature, self.action_name, self.search_name, self.sid, self.orig_sid, self.rid, self.orig_rid, self.app,
|
|
self.owner, self.action_mode, status
|
|
)
|
|
|
|
# The purpose of this method is to update per-result ModAction attributes
|
|
def update(self, result):
|
|
# This is for events/results that were created as the result of a previous action
|
|
self.orig_sid = result.get('orig_sid', '')
|
|
# This is for events/results that were created as the result of a previous action
|
|
self.orig_rid = result.get('orig_rid', '')
|
|
if 'rid' in result:
|
|
self.rid = result['rid']
|
|
else:
|
|
raise InvalidResultID('Result must have an ID')
|
|
|
|
# The purpose of this method is to generate per-result invocation messages
|
|
def invoke(self):
|
|
self.logger.debug(self.message('Invoking modular alert action'))
|
|
|
|
def dowork(self):
|
|
return
|
|
|
|
|
|
class SendAlert(ModularAction, metaclass=abc.ABCMeta):
|
|
|
|
"""
|
|
Send alert to index file
|
|
"""
|
|
# FIELD NAME
|
|
HTTP_AUTH_TOKEN = 'http_auth_token'
|
|
HTTP_TOKEN_NAME = 'http_token_name'
|
|
INDEX_NAME = 'index'
|
|
SOURCETYPE = 'sourcetype'
|
|
TITLE = 'title'
|
|
DESCRIPTION = 'description'
|
|
OWNER = 'owner'
|
|
STATUS = 'status'
|
|
SEVERITY = 'severity'
|
|
DRILLDOWN_SEARCH_TITLE = 'drilldown_search_title'
|
|
DRILLDOWN_SEARCH_SEARCH = 'drilldown_search_search'
|
|
DRILLDOWN_SEARCH_LATEST_OFFSET = 'drilldown_search_latest_offset'
|
|
DRILLDOWN_SEARCH_EARLIEST_OFFSET = 'drilldown_search_earliest_offset'
|
|
DRILLDOWN_TITLE = 'drilldown_title'
|
|
DRILLDOWN_URI = 'drilldown_uri'
|
|
UNIQUE_IDENTIFIER_FIELDS = 'event_identifier_fields'
|
|
METADATA = 'meta_data'
|
|
EDITOR = 'editor'
|
|
EVENT_FIELD_MAX_LENGTH = 'event_field_max_length'
|
|
BATCH_SIZE = 'batch_size'
|
|
|
|
# Reserved fields used by itsi_tracked_alerts index
|
|
RESERVED_FIELDS = {'event_id'}
|
|
|
|
def __init__(self, settings, is_validate=True):
|
|
"""
|
|
Initialized send alert class instance
|
|
|
|
@type settings: basestring
|
|
@param settings: sys.stdin.read() contains
|
|
|
|
@type is_validate: bool
|
|
@param is_validate: flag to validate required params or not
|
|
|
|
@return:
|
|
"""
|
|
self.required_params = [self.HTTP_TOKEN_NAME, self.INDEX_NAME]
|
|
self.optional_params = [self.HTTP_AUTH_TOKEN,
|
|
self.SOURCETYPE, self.TITLE,
|
|
self.DESCRIPTION, self.TITLE, self.SEVERITY,
|
|
self.OWNER, self.STATUS, self.DRILLDOWN_SEARCH_SEARCH, self.DRILLDOWN_SEARCH_TITLE,
|
|
self.DRILLDOWN_SEARCH_EARLIEST_OFFSET, self.DRILLDOWN_SEARCH_LATEST_OFFSET,
|
|
self.DRILLDOWN_TITLE, self.DRILLDOWN_URI, self.UNIQUE_IDENTIFIER_FIELDS,
|
|
self.EVENT_FIELD_MAX_LENGTH]
|
|
|
|
action_name = 'event_generator'
|
|
super(SendAlert, self).__init__(settings, logger, action_name)
|
|
self.splunkd_uri = self.settings.get('server_uri')
|
|
|
|
# Construct UI to update alert_actions settings
|
|
self.app = 'SA-ITOA'
|
|
self.owner = 'nobody'
|
|
|
|
self.params = {}
|
|
|
|
# define it after validation
|
|
self.push_manager = None
|
|
if is_validate and not self.validate_params():
|
|
raise ValueError('Failed to validate arguments. Please make sure arguments are correct')
|
|
else:
|
|
self.initialize_params()
|
|
|
|
# Get collection fields
|
|
self.event_id_key = 'event_id'
|
|
|
|
# check whether sort_notable_events is true
|
|
self.sort_notable_events = get_itsi_event_management_conf_field_value(self.session_key, 'tracked_alert',
|
|
'sort_notable_events')
|
|
if not self.sort_notable_events:
|
|
self.sort_notable_events = 0
|
|
|
|
if self.sort_notable_events == 1:
|
|
try:
|
|
self.is_use_event_time = normalizeBoolean(self._get_field('is_use_event_time'))
|
|
except Exception as e:
|
|
self.logger.exception('Failed to get is_use_event_time value : %s' % e.args[0])
|
|
self.is_use_event_time = False
|
|
|
|
if not self.is_use_event_time:
|
|
self.logger.warning('The is_use_event_time field value is 0. Please change it to 1 in order to sort '
|
|
'the notable events.')
|
|
self.sort_notable_events = 0
|
|
|
|
@abc.abstractmethod
|
|
def pre_processing(self, event):
|
|
"""
|
|
Abstract function which has to be implement by inherit class
|
|
Normally it is being used to perform some validation or may be
|
|
send events to some where else
|
|
|
|
@param event: dict
|
|
@param event: event which is going to be push to index or had pushed
|
|
|
|
@rtype: bool
|
|
@return: True - if event pushed to kv store successfully otherwise false
|
|
"""
|
|
raise NotImplementedError('Function is not implemented')
|
|
|
|
@abc.abstractmethod
|
|
def undo_pre_processing(self):
|
|
"""
|
|
Undo operation of pre_processsing
|
|
|
|
@return:
|
|
"""
|
|
raise NotImplementedError('Function is not implemented')
|
|
|
|
def validate_params(self):
|
|
"""
|
|
Validate parameters
|
|
|
|
@rtype: bool
|
|
@return: True/False
|
|
"""
|
|
is_found = True
|
|
field_not_available = None
|
|
for key in self.required_params:
|
|
if key not in self.configuration:
|
|
is_found = False
|
|
field_not_available = key
|
|
break
|
|
if not is_found:
|
|
raise ValueError('Required field={0} does not exist'.format(field_not_available))
|
|
return self.additional_validation()
|
|
|
|
def initialize_params(self):
|
|
"""
|
|
Initialize parameters
|
|
@return:
|
|
"""
|
|
for key in self.required_params:
|
|
self.params[key] = self.configuration.get(key)
|
|
for optional_key in self.optional_params:
|
|
self.params[optional_key] = self.configuration.get(optional_key)
|
|
# Now fetch remaining configuration
|
|
for key, value in self.configuration.items():
|
|
if key not in self.params and value:
|
|
self.params[key] = value
|
|
self.logger.debug('Parameters=%s', self.params)
|
|
return self.params
|
|
|
|
def _get_field(self, name, default_value=None):
|
|
"""
|
|
Get field
|
|
@type name: basestring
|
|
@param name: field name
|
|
|
|
@type default_value: basestring
|
|
@param default_value: default value if field value does not exist
|
|
|
|
@return: field value
|
|
"""
|
|
return self.configuration.get(name, default_value)
|
|
|
|
def _get_sourcetype(self):
|
|
"""
|
|
Get sourcetype, dafault value is stash
|
|
|
|
@rtype: basestring
|
|
@return: sourcetype
|
|
"""
|
|
value = self._get_field(self.SOURCETYPE)
|
|
if value is None:
|
|
# Return default
|
|
return 'itsi_notable:event'
|
|
else:
|
|
return value
|
|
|
|
def additional_validation(self):
|
|
"""
|
|
Perform additional validation like checking for auth token key. If key does not exist
|
|
then create http token key
|
|
|
|
@return: None
|
|
"""
|
|
# Check if auth token is provided otherwise, get auth token and set it
|
|
# Get new
|
|
token_name = self._get_field(self.HTTP_TOKEN_NAME)
|
|
# Acquire token
|
|
index = self._get_field(self.INDEX_NAME) # noqa F841
|
|
sourcetype = self._get_sourcetype() # noqa F841
|
|
try:
|
|
self.push_manager = PushEventManager(self.session_key, token_name=token_name)
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
return False
|
|
return True
|
|
|
|
def proceess_high_scale_ea_backfill_event(self, result):
|
|
is_backfill_event = result.get('is_backfill')
|
|
processed_event = json.loads(result.get('_raw'))
|
|
if result.get('host') is not None:
|
|
processed_event['event_host'] = result.get('host')
|
|
else:
|
|
processed_event['event_host'] = self.settings.get('server_host')
|
|
# Search name will be come source otherwise result source is being set as source type
|
|
if result.get('source') is not None:
|
|
processed_event['event_source'] = result.get('source')
|
|
else:
|
|
processed_event['event_source'] = '' if processed_event.get('search_name') is None else processed_event['search_name']
|
|
|
|
if result.get('sourcetype') is not None:
|
|
processed_event['event_sourcetype'] = result.get('sourcetype')
|
|
else:
|
|
processed_event['event_sourcetype'] = self._get_sourcetype()
|
|
|
|
if result.get('_time') is not None:
|
|
processed_event['_time'] = float(result.get('_time'))
|
|
|
|
processed_event['is_backfill_event'] = is_backfill_event
|
|
return processed_event
|
|
|
|
def proceess_notable_event(self, result, num):
|
|
"""
|
|
@type result: dict
|
|
@param result: result
|
|
|
|
@type num: int
|
|
@param num: result count
|
|
|
|
@return: None
|
|
"""
|
|
|
|
block_list_param_fields = [self.HTTP_AUTH_TOKEN, self.INDEX_NAME, self.HTTP_TOKEN_NAME, self.SOURCETYPE,
|
|
self.METADATA, self.EDITOR, self.BATCH_SIZE]
|
|
|
|
result.setdefault('rid', str(num))
|
|
result['orig_sid'] = result.get('orig_sid', self.sid)
|
|
result['orig_rid'] = result.get('orig_rid', result['rid'])
|
|
|
|
self.update(result)
|
|
self.invoke()
|
|
|
|
fields_to_send = {}
|
|
# Add parameter fields
|
|
for field in self.params:
|
|
if field not in block_list_param_fields:
|
|
fields_to_send[field] = self._get_field(field)
|
|
|
|
max_field_len = int(self._get_field(self.EVENT_FIELD_MAX_LENGTH, "10000"))
|
|
|
|
# Make sure event does not contains fields which are similar name as parameters
|
|
for field in list(result.keys()):
|
|
if len(result[field]) > max_field_len:
|
|
# Truncate field by keep first max_field_len chars.
|
|
self.logger.warning('The length of field %s is %s which exceeds the max limit of %s chars.'
|
|
'The field will be truncated.', field, len(result[field]), max_field_len)
|
|
result[field] = result[field][0:max_field_len]
|
|
|
|
# if field similar to parameter of alert
|
|
if field in self.configuration or field in self.RESERVED_FIELDS:
|
|
if 'orig_' + field not in result:
|
|
result['orig_' + field] = result[field]
|
|
else:
|
|
self.logger.warning('Field=orig_%s already exist in the result hence adding random integer in the'
|
|
' field ', field)
|
|
result['orig_' + field + str(int(random.random() * 100000))] = result[field]
|
|
del result[field]
|
|
|
|
if result.get('host') is not None:
|
|
host = result.get('host')
|
|
else:
|
|
host = self.settings.get('server_host')
|
|
|
|
# Search name will be come source otherwise result source is being set as source type
|
|
if self.search_name:
|
|
source = self.search_name
|
|
# Add search_name field
|
|
fields_to_send['search_name'] = self.search_name
|
|
else:
|
|
source = fields_to_send.get('source')
|
|
|
|
fields_to_send['source'] = source
|
|
fields_to_send['host'] = host
|
|
|
|
# Filter index time fields and also create event_id, _time, mod_time and event_identifier_hash
|
|
identifier_fields = self._get_field(self.UNIQUE_IDENTIFIER_FIELDS)
|
|
self.logger.debug('Identifier fields="%s" for search=%s', identifier_fields, self.search_name)
|
|
event_time = result.get('_time') if normalizeBoolean(self.configuration.get('is_use_event_time', 0)) else None
|
|
fields_to_send.update(filter_index_fields_and_get_event_id_for_notable_event(result, self.logger,
|
|
identifier_fields,
|
|
fields_to_send=fields_to_send,
|
|
event_time=event_time,
|
|
is_token_replacement=True))
|
|
# Join serviceid field with service_ids
|
|
if fields_to_send.get('serviceid'):
|
|
serviceid = fields_to_send.get('serviceid').split('\n')
|
|
if fields_to_send.get('service_ids'):
|
|
service_ids = fields_to_send.get('service_ids').split(',')
|
|
fields_to_send['service_ids'] = ','.join(set(service_ids + serviceid))
|
|
else:
|
|
fields_to_send['service_ids'] = ','.join(serviceid)
|
|
|
|
result = self.pre_processing(fields_to_send)
|
|
|
|
if not result:
|
|
self.logger.error('Failed to create notable due to pre-processing step event=%s', fields_to_send)
|
|
raise NotableEventException('Pre-processing step failed while creating notable event')
|
|
else:
|
|
self.logger.debug("Returning event=%s source=%s host=%s to list", fields_to_send, source, host)
|
|
fields_to_send['event_source'] = source
|
|
fields_to_send['event_host'] = host
|
|
return fields_to_send
|
|
|
|
def update_and_push_events(self, events):
|
|
"""
|
|
Update push event
|
|
@return: None
|
|
"""
|
|
try:
|
|
self.push_manager.push_events(
|
|
events,
|
|
sourcetype=self._get_sourcetype()
|
|
)
|
|
except Exception as e:
|
|
self.undo_pre_processing()
|
|
raise e
|
|
|
|
def process_result_file(self, mode):
|
|
"""
|
|
Process result file with file open mode
|
|
|
|
@type mode: basestring
|
|
@param mode: file open mode
|
|
|
|
@return: None
|
|
"""
|
|
events = []
|
|
events_to_ingest = []
|
|
with gzip.open(self.results_file, mode) as fh:
|
|
for num, result in enumerate(csv.DictReader(fh)):
|
|
processed_event = self.proceess_notable_event(result, num)
|
|
events.append(processed_event)
|
|
|
|
if self.push_manager.is_queue_mode_enabled:
|
|
self.update_ingest_events(processed_event, events_to_ingest)
|
|
|
|
# if sort_notable_events is enabled, sort events & then push
|
|
if self.sort_notable_events == 1 and len(events) > 0:
|
|
self.logger.debug('Sort notable events based on _time value')
|
|
try:
|
|
# sort json
|
|
events = sorted(events, key=lambda k: k['_time'], reverse=False)
|
|
except Exception as e:
|
|
self.logger.exception('Failed to sort notable events: %s' % e.args[0])
|
|
|
|
batch_size = int(self._get_field(self.BATCH_SIZE, "5000"))
|
|
total_events = len(events)
|
|
for start in range(0, total_events, batch_size):
|
|
batch_events = events[start:min(start + batch_size, total_events)]
|
|
self.update_and_push_events(batch_events)
|
|
|
|
if len(events_to_ingest) > 0:
|
|
if self.push_manager.is_queue_mode_enabled:
|
|
nats_publisher = NatsEventPublisher(self.session_key, self.logger)
|
|
asyncio.run(nats_publisher.push_events_to_nats(events_to_ingest))
|
|
|
|
def update_ingest_events(self, processed_event, events_to_ingest):
|
|
host = processed_event['event_host']
|
|
source = processed_event['event_source']
|
|
event_time = float(time_import.time()) if not processed_event.get('_time') else (
|
|
float(processed_event.get('_time')))
|
|
event_to_ingest = copy.deepcopy(processed_event)
|
|
event_to_ingest['sourcetype'] = self._get_sourcetype() if not processed_event.get('event_sourcetype') else (
|
|
processed_event.get('event_sourcetype'))
|
|
event_to_ingest['host'] = host
|
|
event_to_ingest['source'] = source
|
|
event_to_ingest['_time'] = float(time_import.time()) if not event_time else event_time
|
|
events_to_ingest.append(event_to_ingest)
|
|
|
|
def run(self):
|
|
"""
|
|
Main function which is invoked by base class function
|
|
|
|
@rtype: bool
|
|
@return: True/False
|
|
"""
|
|
if not os.path.exists(self.results_file):
|
|
self.logger.info('Result file=%s does not exist. This could happen when search has no results',
|
|
self.results_file)
|
|
sys.stdout.write('No results found.')
|
|
try:
|
|
try:
|
|
self.process_result_file('rt')
|
|
except ValueError as e:
|
|
# On Windows Value Error is thrown because it interprets *nix csv files as binary files.
|
|
self.logger.info('Got ValueError: %s , trying to open the file in binary mode.', e.message)
|
|
self.process_result_file('rb')
|
|
return True
|
|
except IOError as e:
|
|
if e.errno == 2: # No file exist
|
|
self.logger.info('No results founds from search, %s', e.errno)
|
|
else:
|
|
self.logger.exception(e)
|
|
sys.stderr.write(e.args[0])
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
sys.stderr.write(e.args[0])
|
|
raise e
|