# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import abc import csv import gzip import json import sys import os import random import time as time_import import copy import asyncio from splunk.clilib.bundle_paths import make_splunkhome_path from splunk.util import normalizeBoolean sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib'])) sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common'])) from SA_ITOA_app_common.splunklib import results from itsi.itsi_utils import ITOAInterfaceUtils from ITOA.setup_logging import logger from ITOA.event_management.push_event_manager import PushEventManager from ITOA.itoa_common import get_itsi_event_management_conf_field_value from .notable_event_utils import filter_index_fields_and_get_event_id_for_notable_event, NotableEventException from .itsi_nats_publish import NatsEventPublisher # set the maximum allowable CSV field size # # The default of the csv module is 128KB; upping to 10MB. See SPL-12117 for # the background on issues surrounding field sizes. 
# set the maximum allowable CSV field size
#
# The default of the csv module is 128KB; upping to 10MB. See SPL-12117 for
# the background on issues surrounding field sizes.
# (this method is new in python 2.5)
csv.field_size_limit(10485760)


# This is a common modular action which is being copied from CIM.
# Once we move to the model of pulling the library at run time, please delete
# the ModularAction class code and pull this class from the common lib.
class InvalidResultID(Exception):
    """Raised when a search result is missing the required 'rid' field."""
    pass


class ModularAction(object):
    """
    Base class for Splunk modular alert actions: parses the sendalert
    settings payload, looks up the originating search job, and provides a
    common logging/message format plus per-result bookkeeping.
    """

    DEFAULT_MESSAGE = 'sendmodalert - signature="%s" action_name="%s" search_name="%s" sid="%s" orig_sid="%s" rid="%s" orig_rid="%s" app="%s" owner="%s" action_mode="%s" action_status="%s"'

    # we require a logging instance
    def __init__(self, settings, logger, action_name=None):
        """
        @type settings: basestring
        @param settings: JSON payload read from sys.stdin by sendalert
        @type logger: logging.Logger
        @param logger: logging instance used for all messages
        @type action_name: basestring
        @param action_name: fallback action name when the configuration
            does not carry param.action_name
        """
        self.settings = json.loads(settings)
        self.logger = logger
        self.session_key = self.settings.get('session_key')
        self.sid = self.settings.get('sid')
        self.orig_sid = ''
        self.rid = ''
        self.orig_rid = ''
        self.results_file = self.settings.get('results_file')
        self.search_name = self.settings.get('search_name')
        self.app = self.settings.get('app')
        self.owner = self.settings.get('owner')
        self.configuration = self.settings.get('configuration', {})
        # use | sendalert param.action_name=$action_name$
        self.action_name = self.configuration.get('action_name') or action_name or 'unknown'
        self.action_mode = 'undetermined'
        self.action_status = ''
        # Since we don't use the result object we get from settings it will be purged.
        # FIX: pop() instead of del so a payload without 'result' does not raise KeyError.
        self.settings.pop('result', None)

        # get job info for the originating search so we can tell scheduled
        # from ad-hoc invocations
        self.job = {}
        try:
            service = ITOAInterfaceUtils.service_connection(self.session_key, app_name="SA-ITOA")
            search_job = service.job(self.sid)
            error_message = search_job.messages.get('error')
            if error_message:
                self.logger.warning(self.message(f'Could not retrieve search job info, Error: {error_message}'))
            else:
                result = results.JSONResultsReader(search_job.results(output_mode='json'))
                # keep the last dict entry emitted by the reader
                for event in result:
                    if isinstance(event, dict):
                        self.job = event
                self.logger.info(self.message('Successfully retrieved search job info.'))
                self.logger.debug(self.job)
        except Exception:
            # FIX: use self.logger consistently (previously called the bare
            # `logger` name here)
            self.logger.exception("Exception retrieving search job info.")
            self.logger.warning(self.message('Could not retrieve search job info.'))

        # set action_mode: scheduler-delegated jobs are saved searches,
        # anything else is an ad-hoc run
        if self.job.get('delegate', 'scheduler') == 'scheduler':
            self.action_mode = 'saved'
        else:
            self.action_mode = 'adhoc'

    # The purpose of this method is to provide a common messaging interface
    def message(self, signature, status=None):
        """
        Format a standard sendmodalert log line.

        @type signature: basestring
        @param signature: free-text signature describing the event
        @type status: basestring
        @param status: overrides self.action_status when provided
        @rtype: basestring
        @return: formatted log message
        """
        status = status or self.action_status
        return ModularAction.DEFAULT_MESSAGE % (
            signature, self.action_name, self.search_name, self.sid,
            self.orig_sid, self.rid, self.orig_rid, self.app, self.owner,
            self.action_mode, status
        )

    # The purpose of this method is to update per-result ModAction attributes
    def update(self, result):
        """
        Record per-result identifiers on this action instance.

        @type result: dict
        @param result: one search result; must contain 'rid'
        @raise InvalidResultID: when the result has no 'rid' field
        """
        # These are for events/results that were created as the result of a
        # previous action
        self.orig_sid = result.get('orig_sid', '')
        self.orig_rid = result.get('orig_rid', '')
        if 'rid' in result:
            self.rid = result['rid']
        else:
            raise InvalidResultID('Result must have an ID')

    # The purpose of this method is to generate per-result invocation messages
    def invoke(self):
        """Log a per-result invocation message at debug level."""
        self.logger.debug(self.message('Invoking modular alert action'))

    def dowork(self):
        """No-op hook; subclasses may override to perform the action work."""
        return


class SendAlert(ModularAction, metaclass=abc.ABCMeta):
    """ Send alert to index file """

    # FIELD NAME
    HTTP_AUTH_TOKEN = 'http_auth_token'
    HTTP_TOKEN_NAME = 'http_token_name'
    INDEX_NAME = 'index'
    SOURCETYPE = 'sourcetype'
    TITLE = 'title'
    DESCRIPTION = 'description'
    OWNER = 'owner'
    STATUS = 'status'
    SEVERITY = 'severity'
    DRILLDOWN_SEARCH_TITLE = 'drilldown_search_title'
    DRILLDOWN_SEARCH_SEARCH = 'drilldown_search_search'
    DRILLDOWN_SEARCH_LATEST_OFFSET = 'drilldown_search_latest_offset'
    DRILLDOWN_SEARCH_EARLIEST_OFFSET = 'drilldown_search_earliest_offset'
    DRILLDOWN_TITLE = 'drilldown_title'
    DRILLDOWN_URI = 'drilldown_uri'
    UNIQUE_IDENTIFIER_FIELDS = 'event_identifier_fields'
    METADATA = 'meta_data'
    EDITOR = 'editor'
    EVENT_FIELD_MAX_LENGTH = 'event_field_max_length'
    BATCH_SIZE = 'batch_size'

    # Reserved fields used by itsi_tracked_alerts index
    RESERVED_FIELDS = {'event_id'}

    def __init__(self, settings, is_validate=True):
        """
        Initialized send alert class instance

        @type settings: basestring
        @param settings: sys.stdin.read() contains
        @type is_validate: bool
        @param is_validate: flag to validate required params or not
        @return:
        """
        self.required_params = [self.HTTP_TOKEN_NAME, self.INDEX_NAME]
        # FIX: removed duplicate self.TITLE entry from this list
        self.optional_params = [self.HTTP_AUTH_TOKEN, self.SOURCETYPE, self.TITLE,
                                self.DESCRIPTION, self.SEVERITY, self.OWNER,
                                self.STATUS, self.DRILLDOWN_SEARCH_SEARCH,
                                self.DRILLDOWN_SEARCH_TITLE,
                                self.DRILLDOWN_SEARCH_EARLIEST_OFFSET,
                                self.DRILLDOWN_SEARCH_LATEST_OFFSET,
                                self.DRILLDOWN_TITLE, self.DRILLDOWN_URI,
                                self.UNIQUE_IDENTIFIER_FIELDS,
                                self.EVENT_FIELD_MAX_LENGTH]
        action_name = 'event_generator'
        super(SendAlert, self).__init__(settings, logger, action_name)
        self.splunkd_uri = self.settings.get('server_uri')
        # Construct UI to update alert_actions settings
        self.app = 'SA-ITOA'
        self.owner = 'nobody'
        self.params = {}
        # define it after validation
        self.push_manager = None
        if is_validate and not self.validate_params():
            raise ValueError('Failed to validate arguments. Please make sure arguments are correct')
        else:
            self.initialize_params()
        # Get collection fields
        self.event_id_key = 'event_id'
        # check whether sort_notable_events is true
        self.sort_notable_events = get_itsi_event_management_conf_field_value(
            self.session_key, 'tracked_alert', 'sort_notable_events')
        if not self.sort_notable_events:
            self.sort_notable_events = 0
        # NOTE(review): the conf value is compared against the int 1; if the
        # conf layer returns the string '1' sorting would stay disabled —
        # behavior kept as-is, confirm against conf reader.
        if self.sort_notable_events == 1:
            try:
                self.is_use_event_time = normalizeBoolean(self._get_field('is_use_event_time'))
            except Exception as e:
                self.logger.exception('Failed to get is_use_event_time value : %s' % e.args[0])
                self.is_use_event_time = False
            if not self.is_use_event_time:
                self.logger.warning('The is_use_event_time field value is 0. Please change it to 1 in order to sort '
                                    'the notable events.')
                self.sort_notable_events = 0

    @abc.abstractmethod
    def pre_processing(self, event):
        """
        Abstract function which has to be implement by inherit class
        Normally it is being used to perform some validation or may be send
        events to some where else

        @param event: dict
        @param event: event which is going to be push to index or had pushed
        @rtype: bool
        @return: True - if event pushed to kv store successfully otherwise false
        """
        raise NotImplementedError('Function is not implemented')

    @abc.abstractmethod
    def undo_pre_processing(self):
        """
        Undo operation of pre_processsing

        @return:
        """
        raise NotImplementedError('Function is not implemented')

    def validate_params(self):
        """
        Validate parameters

        @rtype: bool
        @return: True/False
        @raise ValueError: when a required configuration field is missing
        """
        for key in self.required_params:
            if key not in self.configuration:
                raise ValueError('Required field={0} does not exist'.format(key))
        return self.additional_validation()

    def initialize_params(self):
        """
        Initialize parameters

        @rtype: dict
        @return: the populated parameter dict
        """
        for key in self.required_params:
            self.params[key] = self.configuration.get(key)
        for optional_key in self.optional_params:
            self.params[optional_key] = self.configuration.get(optional_key)
        # Now fetch remaining configuration
        for key, value in self.configuration.items():
            if key not in self.params and value:
                self.params[key] = value
        self.logger.debug('Parameters=%s', self.params)
        return self.params

    def _get_field(self, name, default_value=None):
        """
        Get field

        @type name: basestring
        @param name: field name
        @type default_value: basestring
        @param default_value: default value if field value does not exist
        @return: field value
        """
        return self.configuration.get(name, default_value)

    def _get_sourcetype(self):
        """
        Get sourcetype, default value is stash

        @rtype: basestring
        @return: sourcetype
        """
        value = self._get_field(self.SOURCETYPE)
        if value is None:
            # Return default
            return 'itsi_notable:event'
        else:
            return value

    def additional_validation(self):
        """
        Perform additional validation like checking for auth token key.
        If key does not exist then create http token key

        @rtype: bool
        @return: True when a PushEventManager could be created, False otherwise
        """
        # Check if auth token is provided otherwise, get auth token and set it
        token_name = self._get_field(self.HTTP_TOKEN_NAME)
        index = self._get_field(self.INDEX_NAME)  # noqa F841
        sourcetype = self._get_sourcetype()  # noqa F841
        try:
            self.push_manager = PushEventManager(self.session_key, token_name=token_name)
        except Exception as e:
            self.logger.exception(e)
            return False
        return True

    def proceess_high_scale_ea_backfill_event(self, result):
        """
        Build an ingestible event from a high-scale EA backfill result row.

        @type result: dict
        @param result: raw result row carrying a JSON '_raw' payload
        @rtype: dict
        @return: parsed event decorated with event_host/source/sourcetype,
            _time and the is_backfill_event flag
        """
        is_backfill_event = result.get('is_backfill')
        processed_event = json.loads(result.get('_raw'))
        if result.get('host') is not None:
            processed_event['event_host'] = result.get('host')
        else:
            processed_event['event_host'] = self.settings.get('server_host')
        # Search name will become source otherwise result source is being set
        # as source type
        if result.get('source') is not None:
            processed_event['event_source'] = result.get('source')
        else:
            processed_event['event_source'] = '' if processed_event.get('search_name') is None \
                else processed_event['search_name']
        if result.get('sourcetype') is not None:
            processed_event['event_sourcetype'] = result.get('sourcetype')
        else:
            processed_event['event_sourcetype'] = self._get_sourcetype()
        if result.get('_time') is not None:
            processed_event['_time'] = float(result.get('_time'))
        processed_event['is_backfill_event'] = is_backfill_event
        return processed_event

    def proceess_notable_event(self, result, num):
        """
        Turn one raw search result into a notable-event field dict.

        @type result: dict
        @param result: result
        @type num: int
        @param num: result count
        @rtype: dict
        @return: fields to send to the index
        @raise NotableEventException: when pre_processing() rejects the event
        """
        # alert parameters that must never be copied onto the event itself
        block_list_param_fields = [self.HTTP_AUTH_TOKEN, self.INDEX_NAME,
                                   self.HTTP_TOKEN_NAME, self.SOURCETYPE,
                                   self.METADATA, self.EDITOR, self.BATCH_SIZE]
        result.setdefault('rid', str(num))
        result['orig_sid'] = result.get('orig_sid', self.sid)
        result['orig_rid'] = result.get('orig_rid', result['rid'])
        self.update(result)
        self.invoke()

        fields_to_send = {}
        # Add parameter fields
        for field in self.params:
            if field not in block_list_param_fields:
                fields_to_send[field] = self._get_field(field)

        max_field_len = int(self._get_field(self.EVENT_FIELD_MAX_LENGTH, "10000"))
        # Make sure event does not contain fields which have the same name
        # as alert parameters; iterate over a snapshot because we delete keys.
        for field in list(result.keys()):
            if len(result[field]) > max_field_len:
                # Truncate field by keeping the first max_field_len chars.
                self.logger.warning('The length of field %s is %s which exceeds the max limit of %s chars.'
                                    'The field will be truncated.', field, len(result[field]), max_field_len)
                result[field] = result[field][0:max_field_len]
            # if field similar to parameter of alert, stash it under orig_*
            if field in self.configuration or field in self.RESERVED_FIELDS:
                if 'orig_' + field not in result:
                    result['orig_' + field] = result[field]
                else:
                    self.logger.warning('Field=orig_%s already exist in the result hence adding random integer in the'
                                        ' field ', field)
                    result['orig_' + field + str(int(random.random() * 100000))] = result[field]
                del result[field]

        if result.get('host') is not None:
            host = result.get('host')
        else:
            host = self.settings.get('server_host')
        # Search name will become source otherwise result source is being set
        # as source type
        if self.search_name:
            source = self.search_name
            # Add search_name field
            fields_to_send['search_name'] = self.search_name
        else:
            source = fields_to_send.get('source')
        fields_to_send['source'] = source
        fields_to_send['host'] = host

        # Filter index time fields and also create event_id, _time, mod_time
        # and event_identifier_hash
        identifier_fields = self._get_field(self.UNIQUE_IDENTIFIER_FIELDS)
        self.logger.debug('Identifier fields="%s" for search=%s', identifier_fields, self.search_name)
        event_time = result.get('_time') if normalizeBoolean(self.configuration.get('is_use_event_time', 0)) else None
        fields_to_send.update(filter_index_fields_and_get_event_id_for_notable_event(
            result, self.logger, identifier_fields, fields_to_send=fields_to_send,
            event_time=event_time, is_token_replacement=True))

        # Join serviceid field with service_ids
        if fields_to_send.get('serviceid'):
            serviceid = fields_to_send.get('serviceid').split('\n')
            if fields_to_send.get('service_ids'):
                service_ids = fields_to_send.get('service_ids').split(',')
                fields_to_send['service_ids'] = ','.join(set(service_ids + serviceid))
            else:
                fields_to_send['service_ids'] = ','.join(serviceid)

        # renamed from `result` to avoid shadowing the parameter
        is_processed = self.pre_processing(fields_to_send)
        if not is_processed:
            self.logger.error('Failed to create notable due to pre-processing step event=%s', fields_to_send)
            raise NotableEventException('Pre-processing step failed while creating notable event')
        else:
            self.logger.debug("Returning event=%s source=%s host=%s to list", fields_to_send, source, host)
            fields_to_send['event_source'] = source
            fields_to_send['event_host'] = host
            return fields_to_send

    def update_and_push_events(self, events):
        """
        Push a batch of events through the push manager, rolling back
        pre-processing on failure.

        @type events: list
        @param events: batch of notable-event dicts
        @return: None
        """
        try:
            self.push_manager.push_events(
                events,
                sourcetype=self._get_sourcetype()
            )
        except Exception:
            self.undo_pre_processing()
            # FIX: bare raise preserves the original traceback
            raise

    def process_result_file(self, mode):
        """
        Process result file with file open mode

        @type mode: basestring
        @param mode: file open mode
        @return: None
        """
        events = []
        events_to_ingest = []
        with gzip.open(self.results_file, mode) as fh:
            for num, result in enumerate(csv.DictReader(fh)):
                processed_event = self.proceess_notable_event(result, num)
                events.append(processed_event)
                if self.push_manager.is_queue_mode_enabled:
                    self.update_ingest_events(processed_event, events_to_ingest)
        # if sort_notable_events is enabled, sort events & then push
        if self.sort_notable_events == 1 and len(events) > 0:
            self.logger.debug('Sort notable events based on _time value')
            try:
                # sort json
                events = sorted(events, key=lambda k: k['_time'], reverse=False)
            except Exception as e:
                self.logger.exception('Failed to sort notable events: %s' % e.args[0])
        batch_size = int(self._get_field(self.BATCH_SIZE, "5000"))
        total_events = len(events)
        for start in range(0, total_events, batch_size):
            # slicing clamps automatically at the list end
            batch_events = events[start:start + batch_size]
            self.update_and_push_events(batch_events)
        if len(events_to_ingest) > 0:
            if self.push_manager.is_queue_mode_enabled:
                nats_publisher = NatsEventPublisher(self.session_key, self.logger)
                asyncio.run(nats_publisher.push_events_to_nats(events_to_ingest))

    def update_ingest_events(self, processed_event, events_to_ingest):
        """
        Append a deep copy of the processed event, decorated with
        host/source/sourcetype/_time, to the queue-mode ingest list.

        @type processed_event: dict
        @param processed_event: event returned by proceess_notable_event
        @type events_to_ingest: list
        @param events_to_ingest: accumulator mutated in place
        @return: None
        """
        host = processed_event['event_host']
        source = processed_event['event_source']
        event_time = float(time_import.time()) if not processed_event.get('_time') else (
            float(processed_event.get('_time')))
        event_to_ingest = copy.deepcopy(processed_event)
        event_to_ingest['sourcetype'] = self._get_sourcetype() if not processed_event.get('event_sourcetype') else (
            processed_event.get('event_sourcetype'))
        event_to_ingest['host'] = host
        event_to_ingest['source'] = source
        event_to_ingest['_time'] = float(time_import.time()) if not event_time else event_time
        events_to_ingest.append(event_to_ingest)

    def run(self):
        """
        Main function which is invoked by base class function

        @rtype: bool
        @return: True/False
        """
        if not os.path.exists(self.results_file):
            self.logger.info('Result file=%s does not exist. This could happen when search has no results',
                             self.results_file)
            sys.stdout.write('No results found.')
        try:
            try:
                self.process_result_file('rt')
            except ValueError as e:
                # On Windows a ValueError is thrown because it interprets *nix
                # csv files as binary files.
                # FIX: Exception.message does not exist on Python 3; log the
                # exception object itself.
                self.logger.info('Got ValueError: %s , trying to open the file in binary mode.', e)
                self.process_result_file('rb')
            return True
        except IOError as e:
            if e.errno == 2:
                # errno 2 == ENOENT: no results file was produced
                self.logger.info('No results founds from search, %s', e.errno)
            else:
                self.logger.exception(e)
                # FIX: IOError.args[0] is an int errno; stderr.write needs str
                sys.stderr.write(str(e))
                raise
        except Exception as e:
            self.logger.exception(e)
            sys.stderr.write(str(e))
            raise