# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
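"""
Modular alert action code that turns search results into ITSI notable events.
Events are pushed to the index via a push event manager and, when queue mode is
enabled, also published to NATS.
"""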
import abc
import csv
import gzip
import json
import sys
import os
import random
import time as time_import
import copy
import asyncio
from splunk.clilib.bundle_paths import make_splunkhome_path
from splunk.util import normalizeBoolean
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
from SA_ITOA_app_common.splunklib import results
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.setup_logging import logger
from ITOA.event_management.push_event_manager import PushEventManager
from ITOA.itoa_common import get_itsi_event_management_conf_field_value
from .notable_event_utils import filter_index_fields_and_get_event_id_for_notable_event, NotableEventException
from .itsi_nats_publish import NatsEventPublisher

# set the maximum allowable CSV field size
#
# The default of the csv module is 128KB; upping to 10MB. See SPL-12117 for
# the background on issues surrounding field sizes.
csv.field_size_limit(10485760)


# This is a common modular action which is copied from CIM.
# Once we move to a model of pulling the library at run time, please delete the
# ModularAction class code and pull this class from the common lib.
class InvalidResultID(Exception):
    pass


class ModularAction(object):
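    """
    Base class for modular alert actions: parses the sendalert settings payload
    and retrieves search job info to determine the action mode (saved vs. adhoc).
    """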
    DEFAULT_MESSAGE = 'sendmodalert - signature="%s" action_name="%s" search_name="%s" sid="%s" orig_sid="%s" rid="%s" orig_rid="%s" app="%s" owner="%s" action_mode="%s" action_status="%s"'

    # we require a logging instance
    def __init__(self, settings, logger, action_name=None):
        self.settings = json.loads(settings)
        self.logger = logger
        self.session_key = self.settings.get('session_key')
        self.sid = self.settings.get('sid')
        self.orig_sid = ''
        self.rid = ''
        self.orig_rid = ''
        self.results_file = self.settings.get('results_file')
        self.search_name = self.settings.get('search_name')
        self.app = self.settings.get('app')
        self.owner = self.settings.get('owner')
        self.configuration = self.settings.get('configuration', {})
        # use | sendalert param.action_name=$action_name$
        self.action_name = self.configuration.get('action_name') or action_name or 'unknown'
        self.action_mode = 'undetermined'
        self.action_status = ''
        # Since we don't use the result object we get from settings, purge it
        del self.settings['result']
        # get job
        self.job = {}
        try:
            service = ITOAInterfaceUtils.service_connection(self.session_key, app_name="SA-ITOA")
            search_job = service.job(self.sid)
            error_message = search_job.messages.get('error')
            if error_message:
                self.logger.warning(self.message(f'Could not retrieve search job info, Error: {error_message}'))
            else:
                result = results.JSONResultsReader(search_job.results(output_mode='json'))
                for event in result:
                    if isinstance(event, dict):
                        self.job = event
                self.logger.info(self.message('Successfully retrieved search job info.'))
                self.logger.debug(self.job)
        except Exception:
            self.logger.exception('Exception retrieving search job info.')
            self.logger.warning(self.message('Could not retrieve search job info.'))
        # set action_mode
        if self.job.get('delegate', 'scheduler') == 'scheduler':
            self.action_mode = 'saved'
        else:
            self.action_mode = 'adhoc'

    # This method provides a common messaging interface
    def message(self, signature, status=None):
        status = status or self.action_status
        return ModularAction.DEFAULT_MESSAGE % (
            signature, self.action_name, self.search_name, self.sid, self.orig_sid, self.rid, self.orig_rid,
            self.app, self.owner, self.action_mode, status
        )

    # This method updates per-result ModularAction attributes
    def update(self, result):
        # orig_sid is for events/results that were created as the result of a previous action
        self.orig_sid = result.get('orig_sid', '')
        # orig_rid is for events/results that were created as the result of a previous action
        self.orig_rid = result.get('orig_rid', '')
        if 'rid' in result:
            self.rid = result['rid']
        else:
            raise InvalidResultID('Result must have an ID')

    # This method generates per-result invocation messages
    def invoke(self):
        self.logger.debug(self.message('Invoking modular alert action'))

    def dowork(self):
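        """No-op hook; subclasses may override this to perform the action's work."""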
        return


class SendAlert(ModularAction, metaclass=abc.ABCMeta):
"""
Send alert to index file
"""
# FIELD NAME
HTTP_AUTH_TOKEN = 'http_auth_token'
HTTP_TOKEN_NAME = 'http_token_name'
INDEX_NAME = 'index'
SOURCETYPE = 'sourcetype'
TITLE = 'title'
DESCRIPTION = 'description'
OWNER = 'owner'
STATUS = 'status'
SEVERITY = 'severity'
DRILLDOWN_SEARCH_TITLE = 'drilldown_search_title'
DRILLDOWN_SEARCH_SEARCH = 'drilldown_search_search'
DRILLDOWN_SEARCH_LATEST_OFFSET = 'drilldown_search_latest_offset'
DRILLDOWN_SEARCH_EARLIEST_OFFSET = 'drilldown_search_earliest_offset'
DRILLDOWN_TITLE = 'drilldown_title'
DRILLDOWN_URI = 'drilldown_uri'
UNIQUE_IDENTIFIER_FIELDS = 'event_identifier_fields'
METADATA = 'meta_data'
EDITOR = 'editor'
EVENT_FIELD_MAX_LENGTH = 'event_field_max_length'
BATCH_SIZE = 'batch_size'
# Reserved fields used by itsi_tracked_alerts index
RESERVED_FIELDS = {'event_id'}

    def __init__(self, settings, is_validate=True):
        """
        Initialize a send alert class instance
        @type settings: basestring
        @param settings: the contents of sys.stdin.read()
        @type is_validate: bool
        @param is_validate: flag to validate required params or not
        @return:
        """
        self.required_params = [self.HTTP_TOKEN_NAME, self.INDEX_NAME]
        self.optional_params = [self.HTTP_AUTH_TOKEN,
                                self.SOURCETYPE, self.TITLE,
                                self.DESCRIPTION, self.SEVERITY,
                                self.OWNER, self.STATUS, self.DRILLDOWN_SEARCH_SEARCH, self.DRILLDOWN_SEARCH_TITLE,
                                self.DRILLDOWN_SEARCH_EARLIEST_OFFSET, self.DRILLDOWN_SEARCH_LATEST_OFFSET,
                                self.DRILLDOWN_TITLE, self.DRILLDOWN_URI, self.UNIQUE_IDENTIFIER_FIELDS,
                                self.EVENT_FIELD_MAX_LENGTH]
        action_name = 'event_generator'
        super(SendAlert, self).__init__(settings, logger, action_name)
        self.splunkd_uri = self.settings.get('server_uri')
        # Context used to update alert_actions settings
        self.app = 'SA-ITOA'
        self.owner = 'nobody'
        self.params = {}
        # defined after validation
        self.push_manager = None
        if is_validate and not self.validate_params():
            raise ValueError('Failed to validate arguments. Please make sure arguments are correct')
        else:
            self.initialize_params()
        # Get collection fields
        self.event_id_key = 'event_id'
        # check whether sort_notable_events is true
        self.sort_notable_events = get_itsi_event_management_conf_field_value(self.session_key, 'tracked_alert',
                                                                              'sort_notable_events')
        if not self.sort_notable_events:
            self.sort_notable_events = 0
        if self.sort_notable_events == 1:
            try:
                self.is_use_event_time = normalizeBoolean(self._get_field('is_use_event_time'))
            except Exception as e:
                self.logger.exception('Failed to get is_use_event_time value: %s' % e.args[0])
                self.is_use_event_time = False
            if not self.is_use_event_time:
                self.logger.warning('The is_use_event_time field value is 0. Please change it to 1 in order to sort '
                                    'the notable events.')
                self.sort_notable_events = 0

    @abc.abstractmethod
    def pre_processing(self, event):
        """
        Abstract method which has to be implemented by the inheriting class.
        Normally it is used to perform some validation, or to send events
        somewhere else.
        @type event: dict
        @param event: event which is going to be pushed to the index (or has been pushed)
        @rtype: bool
        @return: True if the event was pushed to the KV store successfully, otherwise False
        """
        raise NotImplementedError('Function is not implemented')

    @abc.abstractmethod
    def undo_pre_processing(self):
        """
        Undo the operation of pre_processing
        @return:
        """
        raise NotImplementedError('Function is not implemented')

    def validate_params(self):
        """
        Validate parameters
        @rtype: bool
        @return: True/False
        """
        is_found = True
        field_not_available = None
        for key in self.required_params:
            if key not in self.configuration:
                is_found = False
                field_not_available = key
                break
        if not is_found:
            raise ValueError('Required field={0} does not exist'.format(field_not_available))
        return self.additional_validation()

    def initialize_params(self):
        """
        Initialize parameters
        @return:
        """
        for key in self.required_params:
            self.params[key] = self.configuration.get(key)
        for optional_key in self.optional_params:
            self.params[optional_key] = self.configuration.get(optional_key)
        # Now fetch the remaining configuration
        for key, value in self.configuration.items():
            if key not in self.params and value:
                self.params[key] = value
        self.logger.debug('Parameters=%s', self.params)
        return self.params

    def _get_field(self, name, default_value=None):
        """
        Get field
        @type name: basestring
        @param name: field name
        @type default_value: basestring
        @param default_value: default value if field value does not exist
        @return: field value
        """
        return self.configuration.get(name, default_value)

    def _get_sourcetype(self):
        """
        Get sourcetype; the default value is itsi_notable:event
        @rtype: basestring
        @return: sourcetype
        """
        value = self._get_field(self.SOURCETYPE)
        if value is None:
            # Return default
            return 'itsi_notable:event'
        else:
            return value

    def additional_validation(self):
        """
        Perform additional validation like checking for the auth token key. If the key
        does not exist then create an http token key
        @rtype: bool
        @return: True if a push manager could be created with the token, otherwise False
        """
        # Check if an auth token is provided; otherwise, get an auth token and set it
        token_name = self._get_field(self.HTTP_TOKEN_NAME)
        # Acquire token
        index = self._get_field(self.INDEX_NAME)  # noqa: F841
        sourcetype = self._get_sourcetype()  # noqa: F841
        try:
            self.push_manager = PushEventManager(self.session_key, token_name=token_name)
        except Exception as e:
            self.logger.exception(e)
            return False
        return True

    def proceess_high_scale_ea_backfill_event(self, result):
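        """
        Build a processed backfill event for high-scale event analytics: parse the
        raw JSON result and fill in event_host/event_source/event_sourcetype defaults.
        @type result: dict
        @param result: raw result row from the results file
        @rtype: dict
        @return: processed event, tagged with is_backfill_event
        """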
        is_backfill_event = result.get('is_backfill')
        processed_event = json.loads(result.get('_raw'))
        if result.get('host') is not None:
            processed_event['event_host'] = result.get('host')
        else:
            processed_event['event_host'] = self.settings.get('server_host')
        # Use the result's source if present; otherwise fall back to the event's search_name
        if result.get('source') is not None:
            processed_event['event_source'] = result.get('source')
        else:
            processed_event['event_source'] = '' if processed_event.get('search_name') is None else processed_event['search_name']
        if result.get('sourcetype') is not None:
            processed_event['event_sourcetype'] = result.get('sourcetype')
        else:
            processed_event['event_sourcetype'] = self._get_sourcetype()
        if result.get('_time') is not None:
            processed_event['_time'] = float(result.get('_time'))
        processed_event['is_backfill_event'] = is_backfill_event
        return processed_event

    def proceess_notable_event(self, result, num):
        """
        Process one result row into a notable event.
        @type result: dict
        @param result: result
        @type num: int
        @param num: result count
        @rtype: dict
        @return: the processed event fields to send
        """
        block_list_param_fields = [self.HTTP_AUTH_TOKEN, self.INDEX_NAME, self.HTTP_TOKEN_NAME, self.SOURCETYPE,
                                   self.METADATA, self.EDITOR, self.BATCH_SIZE]
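        # Default rid to the result's row number; orig_sid/orig_rid fall back to the current sid/rid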
        result.setdefault('rid', str(num))
        result['orig_sid'] = result.get('orig_sid', self.sid)
        result['orig_rid'] = result.get('orig_rid', result['rid'])
        self.update(result)
        self.invoke()
        fields_to_send = {}
        # Add parameter fields
        for field in self.params:
            if field not in block_list_param_fields:
                fields_to_send[field] = self._get_field(field)
        max_field_len = int(self._get_field(self.EVENT_FIELD_MAX_LENGTH, "10000"))
        # Make sure the event does not contain fields whose names collide with alert parameters
        for field in list(result.keys()):
            if len(result[field]) > max_field_len:
                # Truncate the field, keeping the first max_field_len chars
                self.logger.warning('The length of field %s is %s which exceeds the max limit of %s chars. '
                                    'The field will be truncated.', field, len(result[field]), max_field_len)
                result[field] = result[field][0:max_field_len]
            # if the field collides with a parameter of the alert
            if field in self.configuration or field in self.RESERVED_FIELDS:
                if 'orig_' + field not in result:
                    result['orig_' + field] = result[field]
                else:
                    self.logger.warning('Field=orig_%s already exists in the result, hence appending a random '
                                        'integer to the field name', field)
                    result['orig_' + field + str(int(random.random() * 100000))] = result[field]
                del result[field]
        if result.get('host') is not None:
            host = result.get('host')
        else:
            host = self.settings.get('server_host')
        # The search name becomes the source; otherwise the result's source is used
        if self.search_name:
            source = self.search_name
            # Add search_name field
            fields_to_send['search_name'] = self.search_name
        else:
            source = fields_to_send.get('source')
        fields_to_send['source'] = source
        fields_to_send['host'] = host
        # Filter index time fields and also create event_id, _time, mod_time and event_identifier_hash
        identifier_fields = self._get_field(self.UNIQUE_IDENTIFIER_FIELDS)
        self.logger.debug('Identifier fields="%s" for search=%s', identifier_fields, self.search_name)
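        # Use the result's own _time as the event time only when is_use_event_time is enabled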
        event_time = result.get('_time') if normalizeBoolean(self.configuration.get('is_use_event_time', 0)) else None
        fields_to_send.update(filter_index_fields_and_get_event_id_for_notable_event(result, self.logger,
                                                                                     identifier_fields,
                                                                                     fields_to_send=fields_to_send,
                                                                                     event_time=event_time,
                                                                                     is_token_replacement=True))
        # Join the serviceid field with service_ids
        if fields_to_send.get('serviceid'):
            serviceid = fields_to_send.get('serviceid').split('\n')
            if fields_to_send.get('service_ids'):
                service_ids = fields_to_send.get('service_ids').split(',')
                fields_to_send['service_ids'] = ','.join(set(service_ids + serviceid))
            else:
                fields_to_send['service_ids'] = ','.join(serviceid)
        result = self.pre_processing(fields_to_send)
        if not result:
            self.logger.error('Failed to create notable event due to pre-processing step, event=%s', fields_to_send)
            raise NotableEventException('Pre-processing step failed while creating notable event')
        else:
            self.logger.debug('Returning event=%s source=%s host=%s to list', fields_to_send, source, host)
            fields_to_send['event_source'] = source
            fields_to_send['event_host'] = host
            return fields_to_send

    def update_and_push_events(self, events):
        """
        Push events via the push manager; on failure, undo pre-processing and re-raise
        @return: None
        """
        try:
            self.push_manager.push_events(
                events,
                sourcetype=self._get_sourcetype()
            )
        except Exception as e:
            self.undo_pre_processing()
            raise e

    def process_result_file(self, mode):
        """
        Process the result file with the given file open mode
        @type mode: basestring
        @param mode: file open mode
        @return: None
        """
        events = []
        events_to_ingest = []
        with gzip.open(self.results_file, mode) as fh:
            for num, result in enumerate(csv.DictReader(fh)):
                processed_event = self.proceess_notable_event(result, num)
                events.append(processed_event)
                if self.push_manager.is_queue_mode_enabled:
                    self.update_ingest_events(processed_event, events_to_ingest)
        # if sort_notable_events is enabled, sort events and then push
        if self.sort_notable_events == 1 and len(events) > 0:
            self.logger.debug('Sort notable events based on _time value')
            try:
                # sort events by their _time value
                events = sorted(events, key=lambda k: k['_time'], reverse=False)
            except Exception as e:
                self.logger.exception('Failed to sort notable events: %s' % e.args[0])
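        # Push events in batches so each payload stays bounded (batch size defaults to 5000)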
        batch_size = int(self._get_field(self.BATCH_SIZE, "5000"))
        total_events = len(events)
        for start in range(0, total_events, batch_size):
            batch_events = events[start:min(start + batch_size, total_events)]
            self.update_and_push_events(batch_events)
        if len(events_to_ingest) > 0:
            if self.push_manager.is_queue_mode_enabled:
                nats_publisher = NatsEventPublisher(self.session_key, self.logger)
                asyncio.run(nats_publisher.push_events_to_nats(events_to_ingest))

    def update_ingest_events(self, processed_event, events_to_ingest):
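        """
        Build a copy of the processed event with host/source/sourcetype/_time set
        and append it to the list of events queued for NATS ingestion.
        @type processed_event: dict
        @param processed_event: event produced by the processing step
        @type events_to_ingest: list
        @param events_to_ingest: accumulator the copied event is appended to
        @return: None
        """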
        host = processed_event['event_host']
        source = processed_event['event_source']
        event_time = float(time_import.time()) if not processed_event.get('_time') else (
            float(processed_event.get('_time')))
        event_to_ingest = copy.deepcopy(processed_event)
        event_to_ingest['sourcetype'] = self._get_sourcetype() if not processed_event.get('event_sourcetype') else (
            processed_event.get('event_sourcetype'))
        event_to_ingest['host'] = host
        event_to_ingest['source'] = source
        event_to_ingest['_time'] = float(time_import.time()) if not event_time else event_time
        events_to_ingest.append(event_to_ingest)

    def run(self):
        """
        Main function which is invoked to process the alert results
        @rtype: bool
        @return: True/False
        """
        if not os.path.exists(self.results_file):
            self.logger.info('Result file=%s does not exist. This can happen when the search has no results',
                             self.results_file)
            sys.stdout.write('No results found.')
        try:
            try:
                self.process_result_file('rt')
            except ValueError as e:
                # On Windows a ValueError is thrown because it interprets *nix csv files as binary files
                self.logger.info('Got ValueError: %s, trying to open the file in binary mode.', e)
                self.process_result_file('rb')
            return True
        except IOError as e:
            if e.errno == 2:  # No such file
                self.logger.info('No results found from search, %s', e.errno)
            else:
                self.logger.exception(e)
                sys.stderr.write(e.args[0])
                raise e
        except Exception as e:
            self.logger.exception(e)
            sys.stderr.write(e.args[0])
            raise e