# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. ''' Utility module for handling event onboarding actions, such as creating, updating, or deleting connections ''' from __future__ import absolute_import import json # import re import sys import time import itsi_py3 from builtins import object from splunk.clilib.bundle_paths import make_splunkhome_path from ITOA.storage.itoa_storage import ITOAStorage sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib'])) from ITOA.event_management.constants import (EA_DATA_INTEGRATION_METHOD_TYPES, EA_DATA_INTEGRATION_INPUT_TYPE, EA_DATA_INTEGRATION_VALID_STATUS, EA_DATA_INTEGRATION_CS_TITLE_PREFIX, EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT, EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT) from SA_ITOA_app_common.solnlib.conf_manager import ConfManager from SA_ITOA_app_common.splunklib import results from ITOA.setup_logging import logger from itsi.event_management.itsi_correlation_search import ItsiCorrelationSearch from itsi.itsi_utils import ITOAInterfaceUtils from ITOA.itoa_common import is_feature_enabled from splunk import ResourceNotFound import splunk.rest as splunk_rest from splunk.util import safeURLQuote from SA_ITOA_app_common.solnlib.conf_manager import ConfStanzaNotExistException class HttpEventListenerException(Exception): pass class EventOnboardingUtils(object): """ A class to support Event Onboarding operations """ def __init__(self, splunkd_session_key, current_user_name, user='nobody', app='itsi'): self.dedup_search_for_raw_alert = EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT self.dedup_search_for_notable_event = EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT if splunkd_session_key is None or splunkd_session_key == "": raise HttpEventListenerException("Invalid splunkd session key") self.splunkd_session_key = splunkd_session_key self.current_user_name = current_user_name self.user = user self.app = app self.correlation_interface = ItsiCorrelationSearch( splunkd_session_key, current_user_name=current_user_name, user='nobody', app='itsi', logger=logger, ) self.connection_storage = ITOAStorage(collection='itsi_data_integration') self.template_storage = ITOAStorage(collection='itsi_data_integration_template') self.preview_results_limit = 300 self.preview_results_search_wait_time = 10 self.is_splunk_throttling_enabled = is_feature_enabled('itsi-apply-splunk-throttling', splunkd_session_key) self.valid_severities = [] self.valid_status = [] @staticmethod def is_valid_regex(regex_string): """ Check whether the given regex string is a valid regular expression or not TODO: Uncomment below once python version is upgraded to 3.11. Re under 3.11 do not support atomic grouping """ return True # try: # re.compile(regex_string) # return True # except re.error: # return False @staticmethod def validate_connection_payload(connection_payload, is_preview=False): """ Validates the connection payload which is created from the json payload in the request to create or update the connection. It checks that all required data or fields in the payload is present. @type connection_payload: dict @param connection_payload: request payload with connection details @type is_preview: bool @param is_preview: whether the request payload is for generating preview results @rtype: None @return: None """ if 'data_source' not in connection_payload: message = "data_source not provided in the request payload" logger.error(message) raise Exception(message) if 'ingestion_method' not in connection_payload: message = "ingestion_method not provided in the request payload" logger.error(message) raise Exception(message) if 'type' not in connection_payload['ingestion_method']: message = "ingestion_method type not provided in the request payload" logger.error(message) raise Exception(message) ingestion_method = connection_payload['ingestion_method'] method_type = ingestion_method['type'] if method_type not in EA_DATA_INTEGRATION_METHOD_TYPES: message = "method_type is invalid. method_type should be one of the following: {}".format( EA_DATA_INTEGRATION_METHOD_TYPES) logger.error(message) raise Exception(message) if 'value' not in connection_payload['ingestion_method']: message = "In the request payload, no value provided under ingestion_method" logger.error(message) raise Exception(message) if not is_preview: if 'status' not in connection_payload: message = "status not provided in the request payload. status should be one of the following: {}".format( EA_DATA_INTEGRATION_VALID_STATUS) logger.error(message) raise Exception(message) status = connection_payload['status'] if status not in EA_DATA_INTEGRATION_VALID_STATUS: message = "invalid status provided in the request payload. status should be one of the following: {}".format( EA_DATA_INTEGRATION_VALID_STATUS) logger.error(message) raise Exception(message) if 'title' not in connection_payload: message = "title not provided in the request payload" logger.error(message) raise Exception(message) if 'throttling' in connection_payload: throttling_params = connection_payload['throttling'] if 'throttling_earliest_time' in throttling_params: throttling_earliest_time = throttling_params['throttling_earliest_time'] if throttling_earliest_time[0] != '-': message = "invalid throttling_earliest_time provided. It should have a minus sign in the front." logger.error(message) raise Exception(message) if 'throttling_latest_time' in throttling_params: throttling_latest_time = throttling_params['throttling_latest_time'] if throttling_latest_time != 'now' and throttling_latest_time[0] != '-': message = ("invalid throttling_latest_time provided. It should either be 'now' or it should " "have a minus sign in the front.") logger.error(message) raise Exception(message) if 'dedup_grouping_fields' in throttling_params: dedup_grouping_fields = throttling_params['dedup_grouping_fields'] if not isinstance(dedup_grouping_fields, list): message = "invalid data type provided for dedup_grouping_fields. It should be a list." logger.error(message) raise Exception(message) if 'dedup_notable_event' in throttling_params: dedup_notable_event = throttling_params['dedup_notable_event'] if not isinstance(dedup_notable_event, bool): message = ("invalid value provided for dedup_notable_event. It should be of type boolean: " "true or false. If value is true or false, make sure it's not a string. " "dedup_notable_event={}. Type of dedup_notable_event: {}". format(dedup_notable_event, type(dedup_notable_event))) logger.error(message) raise Exception(message) if 'mapped_fields' not in connection_payload: message = "mapped_fields not provided in the request payload" logger.error(message) raise Exception(message) mapped_fields = connection_payload['mapped_fields'] if len(mapped_fields) == 0: message = "no mappings provided under mapped_fields in the request payload" logger.error(message) raise Exception(message) for key in mapped_fields: field_name = key field = mapped_fields[key] if 'input_type' not in field: message = "input_type is not provided for the field {}".format(field_name) logger.error(message) raise Exception(message) input_type = field['input_type'] if input_type not in EA_DATA_INTEGRATION_INPUT_TYPE: message = "input_type is incorrect. It should be one of the following: {}".format( EA_DATA_INTEGRATION_INPUT_TYPE) logger.error(message) raise Exception(message) if 'values' not in field: message = "no values provided for the field {}".format(field_name) logger.error(message) raise Exception(message) values = field['values'] if len(values) <= 0: message = "no values provided for the field {}".format(field_name) logger.error(message) raise Exception(message) if input_type == 'regex': if len(values) > 1: # there should be only 1 value in regex message = ("multiple values provided for the field {}. There should be only 1 value for regex " "input_type").format(field_name) logger.error(message) raise Exception(message) # if no regex_source is provided for regex input_type then use '_raw' field as the regex_source if 'regex_source' not in field: message = ("no regex_source provided for the field {}. If there is no specific field for " "regex_source then pass _raw as it's value").format(field_name) logger.error(message) raise Exception(message) if not field['regex_source']: message = ("no value provided for regex_source for field {}. If there is no specific field for " "regex_source then pass _raw as it's value").format(field_name) logger.error(message) raise Exception(message) regex_str = field['values'][0] if not EventOnboardingUtils.is_valid_regex(regex_str): message = "invalid regex provided for field {}. Regex: {}".format(field_name, regex_str) logger.error(message) raise Exception(message) if '' not in regex_str: message = " capture group not present in regular expression for field {}".format( field_name) logger.error(message) raise Exception(message) @staticmethod def get_modified_capture_group(field_name, regex): """ Modify the capture group in the regex by prepending field name whose value we are trying to extract via the regular expression. All the regular expressions will have the (?) capture group. itsi_field_name will contain the value we are trying to extract. Since in the connection, we can have multiple field mappings, we need to give more specific names for the capture groups so there are no conflicts. Example: field_name="title", regex="browserType:(?.*?)" Modified regex will look like this: "browserType:(?.*?)" If there are any spaces in the field name, the captured group will replace those with underscores. This is just to avoid any potential issues by having spaces in the capture group name. @type field_name: string @param field_name: Name of the field whose value we are trying to extract @type regex: string @param regex: regular expression which is trying to extract value via the named capture group. The named capture group will be . @rtype: string @return: regex with the modified capture group """ if not field_name: message = "field name is empty. regex for field: {}".format(regex) logger.error(message) raise Exception(message) if not regex: message = "regex is empty for field {}".format(field_name) logger.error(message) raise Exception(message) if '' not in regex: message = ("Capture group called \"\" should be provided in the regex. " "field name: {}. regex: {}").format(field_name, regex) logger.error(message) raise Exception(message) modified_field_name = field_name.replace(' ', '_') modified_capture_group = modified_field_name + '_' + 'itsi_field_name' return modified_capture_group @staticmethod def get_groupingid_eval(group_by_fields, field_name='internal_groupingid'): """ Takes a list of field names, concatenes those fields separated by colon, and assign it to the field_name in an eval expression. Example: group_by_fields = ['title, 'status', 'severity'] Return "| eval groupingid= title . ":" . status . ":" . severity" """ if group_by_fields: group_spl = '' joining_str = ' . ":" . ' for field in group_by_fields: group_spl += '{}{}'.format(joining_str, field) # remove the joining_str from the beginning of the string group_spl = group_spl[len(joining_str) - 1:] group_spl = '"ALARM:" . {}'.format(group_spl) grouping_eval = '| eval {} = {}'.format(field_name, group_spl) logger.info('grouping_eval: %s', grouping_eval) return grouping_eval else: return None def get_dedup_notable_event_search(self, earliest_time='-59m', latest_time='now'): """ Modifies the self.dedup_search_for_notable_event to replace the earliest and latest time with the given values """ # replace this string: "earliest=-59m latest=now" old_time_str = "earliest=-59m latest=now" new_time_str = 'earliest={} latest={}'.format(earliest_time, latest_time) new_search = self.dedup_search_for_notable_event.replace(old_time_str, new_time_str) logger.info('After modifying the time string: dedup_search_for_notable_event=%s', new_search) return new_search @staticmethod def get_default_value(field_key, field_data): """ Get default value for the field_key from its field_data. @type field_key: str @param field_key: name of the field whose default value we are trying to determine @type field_data: dict @param field_data: dictionary (from json payload) which contains data pertaining to the given field_key @rtype: string @return: default value pertaining to the given field_key. If no default value determined, return None. """ if 'default_value' in field_data: return field_data['default_value'] if field_key == 'owner': return 'unassigned' elif field_key == 'severity': return '1' elif field_key == 'status': return '1' return None @staticmethod def insert_additional_query(spl, additional_query): """ Takes the additional_query and insert in front of the first pipe in the spl @type spl: str @param spl: name of the field whose default value we are trying to determine @type additional_query: dict @param additional_query: dictionary (from json payload) which contains data pertaining to the given field_key @rtype: string @return: additional_query inserted in front of the first pipe in the spl """ idx = spl.find('|') # find the first pipe in the spl query new_spl = '' if idx == -1: # no pipe found new_spl = '{} {}'.format(spl, additional_query) else: # pipe found first_part = spl[:idx - 1] second_part = spl[idx:] # if original spl is: index=itsi_main | eval my_owner="blahblah" | head 10 # and additional_query is: src="x" signature="abc" # new spl will be: index=itsi_main src="x" signature="abc" | eval my_owner="blahblah" | head 10 new_spl = '{} {} {}'.format(first_part, additional_query, second_part) return new_spl def calculate_throttling_params(self, connection_payload): # Set default values throttling_earliest_time = '-59m' throttling_latest_time = 'now' dedup_grouping_fields = [] dedup_notable_event = False throttling_enabled = False # Check for throttling in the payload if 'throttling' in connection_payload: throttling_params = connection_payload['throttling'] logger.info('throttling_params=%s', throttling_params) # Update dedup_grouping_fields if present if 'dedup_grouping_fields' in throttling_params: dedup_grouping_fields = throttling_params['dedup_grouping_fields'] # Enable throttling if there are grouping fields throttling_enabled = len(dedup_grouping_fields) > 0 # Update earliest and latest throttling times if present throttling_earliest_time = throttling_params.get('throttling_earliest_time', throttling_earliest_time) throttling_latest_time = throttling_params.get('throttling_latest_time', throttling_latest_time) # Update dedup_notable_event if present dedup_notable_event = throttling_params.get('dedup_notable_event', dedup_notable_event) # Return the calculated fields as a dictionary return { 'throttling_earliest_time': throttling_earliest_time, 'throttling_latest_time': throttling_latest_time, 'dedup_grouping_fields': dedup_grouping_fields, 'dedup_notable_event': dedup_notable_event, 'throttling_enabled': throttling_enabled } def generate_spl_for_indexed_connection(self, connection_payload, is_preview=False): """ Generates SPL search from the field mappings in the given connection payload. @type connection_payload: dict @param connection_payload: connection payload json for the connection being created or updated. Along with other parameters, it will also contain field mappings. @type session_key: basestring @param session_key: session_key @type search_name: string @param search_name: name for the saved search. It will be the same as the title of the connection. @type create_connection: bool @param create_connection: whether the connection is being created or being updated. If it's set to true, a new saved search will be created for it. If it's false, the existing saved search for the connection will be updated. @type is_preview: bool @param is_preview: whether we are generating the spl search for preview results @rtype: string @return: SPL search created from the field mappings """ orig_search_query = connection_payload['ingestion_method']['value'] search_query = orig_search_query if not search_query: message = "Search query is empty in ingestion method. It should be provided as a value under ingestion_method." logger.error(message) raise Exception(message) final_spl = search_query throttling_dict = self.calculate_throttling_params(connection_payload) if throttling_dict['throttling_enabled']: time_str = 'earliest={} latest={}'.format(throttling_dict['throttling_earliest_time'], throttling_dict['throttling_latest_time']) final_spl = EventOnboardingUtils.insert_additional_query(final_spl, time_str) mapped_fields = connection_payload['mapped_fields'] for field_key in mapped_fields: field = mapped_fields[field_key] input_type = field['input_type'] default_val = EventOnboardingUtils.get_default_value(field_key, field) if input_type == 'regex': regex_val = field['values'][0] capture_group = 'itsi_field_name' # escape double quotes within the regex because in rex the regex is surrounded in double quotes regex_val = regex_val.replace('"', '\\"') regex_src_field = field['regex_source'] # if there are curly braces around regex_source field name, then remove them if regex_src_field[0] == '{' and regex_src_field[-1] == '}': regex_src_field = regex_src_field[1:-1] rex_str = ' | rex field={} \"{}"'.format(regex_src_field, regex_val) if default_val: # if there is a default value for the given field_key and if the eval expression didn't extract # any value then assign it the default value eval_str = '{} | eval {} = coalesce({}, "{}")'.format(rex_str, field_key, capture_group, default_val) else: eval_str = '{} | eval {} = {}'.format(rex_str, field_key, capture_group) final_spl += eval_str elif input_type == 'composition': values = field['values'] val_so_far = '' for x in values: val = str(x) if len(val) > 2 and val[0] == '{' and val[-1] == '}': # it's a field name if it's enclosed in {}, extract the field name. Example: "{vendor_severity}" v = val[1:len(val) - 1] # surround field names with single quotes to allow special characters such as dot. # example: | eval title = 'Source.title' val_so_far = "{} + \'{}\'".format(val_so_far, v) else: # it's a plain string value val_so_far = '{} + \"{}\"'.format(val_so_far, val) # Remove the ' + ' at the beginning of the string val_so_far = val_so_far[2:] if default_val: # if there is a default value for the given field_key and if the eval expression didn't extract # any value then assign it the default value final_spl = '{} | eval {} = coalesce({}, "{}")'.format(final_spl, field_key, val_so_far, default_val) else: final_spl = '{} | eval {} = {}'.format(final_spl, field_key, val_so_far) elif input_type == 'mapping_rule': rule_type = field['rule_type'] values = field['values'] if rule_type == 'case': case_condition = self.generate_case_statement(values) final_spl = '{} | eval {} = {}'.format(final_spl, field_key, case_condition) elif rule_type == 'coalesce': coalesce_condition = self.generate_coalesce_statement(field) final_spl = '{} | eval {} = {}'.format(final_spl, field_key, coalesce_condition) else: # This should not be the case because we should always check this in our initial validation message = "Invalid input_type provided for field {}. input_type should be one of the following {}".format( field_key, input_type) logger.error(message) raise Exception(message) # severity_id_mapping and status_id_mapping are expected to be SPL statements. # Example: case(vendor_severity="CRITICAL", 6, vendor_severity="WARN", 2, 1=1, 1) # Todo: Remove this once template is updated with default values for case & coalesce ''' severity_id_mapping = connection_payload['severity_id_mapping'] if severity_id_mapping: final_spl = '{} | eval severity_id = {}'.format(final_spl, severity_id_mapping) if 'status_id_mapping' in connection_payload: status_id_mapping = connection_payload['status_id_mapping'] if status_id_mapping: if not isinstance(status_id_mapping, itsi_py3.string_type): message = "Invalid value provided for status_id_mapping. It should be string type." logger.error(message) raise Exception(message) else: final_spl = '{} | eval status = {}'.format(final_spl, status_id_mapping) final_spl = '{} | eval subcomponent=if(isnotnull(subcomponent), subcomponent, "-")'.format(final_spl) ''' if throttling_dict['throttling_enabled']: grouping_eval = EventOnboardingUtils.get_groupingid_eval(throttling_dict['dedup_grouping_fields']) final_spl = '{} {}'.format(final_spl, grouping_eval) logger.info('Appended groupingid field. spl so far: %s', final_spl) final_spl = '{} {}'.format(final_spl, self.dedup_search_for_raw_alert) if throttling_dict['dedup_notable_event']: final_spl = '{} {}'.format( final_spl, self.get_dedup_notable_event_search( throttling_dict['throttling_earliest_time'], throttling_dict['throttling_latest_time'] ) ) ''' escaped_orig_query = orig_search_query.replace('"', '\\"') # Todo: Remove below once template is updated with default values for case & coalesce additional_query = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\"' first_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query) additional_query_2 = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\" subcomponent=\\"" + subcomponent + "\\"' second_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query_2) drill_search_eval = ('| eval itsiDrilldownSearch=case(isnotnull(itsiDrilldownSearch), ' 'itsiDrilldownSearch, subcomponent="-", \"{}\",' '1=1, \"{}\")').format(first_spl, second_spl) final_spl = '{} {}'.format(final_spl, drill_search_eval) drill_web_name_eval = ( '| eval itsiDrilldownWebName=case((isnotnull(itsiDrilldownWebURL) AND isnotnull(itsiDrilldownWebName)), itsiDrilldownWebName, ' '(isnotnull(itsiDrilldownWebURL) AND isnull(itsiDrilldownWebName)), "External Drilldown for " + title, ' 'isnull(itsiDrilldownWebURL), "Sorry, no external drilldown available")') final_spl = '{} {}'.format(final_spl, drill_web_name_eval) drill_web_url_eval = '| eval itsiDrilldownWebURL=if(isnotnull(itsiDrilldownWebURL), itsiDrilldownWebURL, "https://splunk.com") ' final_spl = '{} {}'.format(final_spl, drill_web_url_eval) drill_earliest_offset_eval = '| eval itsiDrilldownEarliestOffset=coalesce(itsiDrilldownEarliestOffset, "-900")' final_spl = '{} {}'.format(final_spl, drill_earliest_offset_eval) drill_latest_offset_eval = '| eval itsiDrilldownLatestOffset=coalesce(itsiDrilldownLatestOffset, "900")' final_spl = '{} {}'.format(final_spl, drill_latest_offset_eval) ''' ##### # IMPORTANT: this part of preview SPL has to happen before we add filter_maintenance_services(service_ids) # otherwise we will not get _raw and _time fields if is_preview: # 'eval tmp_preview_raw = _raw' is needed to preserve the _raw for preview results. We need to do this # because once we apply filter_maintenance_services() in SPL, we will lose _raw and _time fields. # When search job returns _time field, it will be converted to human-readable time stamp, however we want # the epoch time hence adding a separate field to preserve the epoch time final_spl = '{} | eval tmp_preview_raw = _raw | eval epoch_time = _time'.format(final_spl) entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field") service_ids = connection_payload.get("association", {}).get("service_ids") if entity_lookup_field is not None and entity_lookup_field != '': final_spl = '{} | `apply_entity_lookup({})`'.format(final_spl, entity_lookup_field) if service_ids is not None and service_ids != '': final_spl = '{} | `filter_maintenance_services("{}")`'.format(final_spl, service_ids) if is_preview: # limit the max number of results to the preview_results_limit value defined in the conf final_spl = '{} | head {}'.format(final_spl, self.preview_results_limit) return final_spl @staticmethod def get_correlation_search_name(connection_title): """ Construct the correlation search name for the given connection title @type connection_title: str @rtype: str """ return EA_DATA_INTEGRATION_CS_TITLE_PREFIX + connection_title def create_correlation_search_payload(self, connection_payload, spl_search): if "cron_schedule" in connection_payload and "value" in connection_payload["cron_schedule"]: cron_schedule = connection_payload["cron_schedule"]["value"] else: # If "cron_schedule" field is not present or "value" field is missing, assign default value cron_schedule = "*/1 * * * *" time_range = connection_payload.get("ingestion_method", {}).get("time_range", {}) dispatch_earliest_time = time_range.get("earliest", "-15m") dispatch_latest_time = time_range.get("latest", "now") connection_title = connection_payload['title'] correlation_search_title = self.get_correlation_search_name(connection_title) status = connection_payload.get('status', "active") disabled = 0 if status == "active" else 1 payload = { "name": correlation_search_title, "description": "Correction Search for data integration connection: {}".format(connection_title), "actions": "itsi_event_generator", "action.itsi_event_generator": 1, "action.itsi_event_generator.param.meta_data": "{}", "action.itsi_event_generator.param.search_type": "basic", "action.itsi_event_generator.param.title": "%title%", "action.itsi_event_generator.param.status": "%status%", "action.itsi_event_generator.param.severity": "%severity_id%", "action.itsi_event_generator.param.owner": "%owner%", "action.itsi_event_generator.param.itsi_instruction": "%itsi_instruction%", "action.itsi_event_generator.param.drilldown_search_title": "%itsiDrilldownSearchName%", "action.itsi_event_generator.param.drilldown_search_search": "%itsiDrilldownSearch%", "action.itsi_event_generator.param.drilldown_search_earliest_offset": "%itsiDrilldownEarliestOffset%", "action.itsi_event_generator.param.drilldown_search_latest_offset": "%itsiDrilldownLatestOffset%", "action.itsi_event_generator.param.drilldown_title": "%itsiDrilldownWebName%", "action.itsi_event_generator.param.drilldown_uri": "%itsiDrilldownWebURL%", "action.itsi_event_generator.param.event_identifier_fields": "groupingid", "alert.track": "0", "counttype": "number of events", "cron_schedule": cron_schedule, "dispatch.earliest_time": dispatch_earliest_time, "dispatch.latest_time": dispatch_latest_time, "enableSched": 1, "quantity": "0", "relation": "greater than", "disabled": disabled, "hide_on_ui": 1, "search": spl_search } entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field") service_ids = connection_payload.get("association", {}).get("service_ids") if entity_lookup_field is not None and entity_lookup_field != '': payload["action.itsi_event_generator.param.entity_lookup_field"] = entity_lookup_field if service_ids is not None and service_ids != '': payload["action.itsi_event_generator.param.service_ids"] = service_ids if self.is_splunk_throttling_enabled: throttling_dict = self.calculate_throttling_params(connection_payload) payload["alert.suppress"] = 1 if 'dedup_grouping_fields' in throttling_dict and throttling_dict['dedup_grouping_fields']: payload["alert.suppress.fields"] = ','.join(throttling_dict['dedup_grouping_fields']) if 'throttling_earliest_time' in throttling_dict and throttling_dict['throttling_earliest_time']: duration_in_minutes = throttling_dict['throttling_earliest_time'][1:] payload["alert.suppress.period"] = duration_in_minutes return payload def create_or_update_correlation_search(self, create, connection_payload, spl_search): connection_title = connection_payload['title'] correlation_search_title = self.get_correlation_search_name(connection_title) payload = self.create_correlation_search_payload(connection_payload, spl_search) if create: logger.info("About to create correlation search: {}".format(correlation_search_title)) self.correlation_interface.create(payload, raise_if_exist=True) else: logger.info("About to update correlation search: {}".format(correlation_search_title)) self.correlation_interface.update(object_id=correlation_search_title, data=payload) def create_or_update_connection(self, connection_payload, create_connection=False): """ Create or update a correlation search for the given connection payload @type connection_payload: dict @param connection_payload: connection payload json for the connection being created or updated. Along with other parameters, it will also contain field mappings. @type create_connection: bool @param create_connection: whether the connection is being created or being updated. If it's set to true, a new saved search will be created for it. If it's false, the existing saved search for the connection will be updated. @rtype: None @return: None """ connection_action = "create" if create_connection else "update" try: # Even for partial updates, for example activating or deactivating the connection, # itsi_data_integration is sending the whole object. We can use the regular flow of updating # the whole correlation search instead of having a separate flow for enabling or disabling CS logger.info("Received request to {} connection: {}".format(connection_action, connection_payload)) EventOnboardingUtils.validate_connection_payload(connection_payload) title = connection_payload['title'] self.init_dedup_searches() spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=False) logger.info("Created spl search for connection. title=%s spl_search=%s", title, spl_search) self.create_or_update_correlation_search(create_connection, connection_payload, spl_search) except Exception as e: title = connection_payload['title'] if 'title' in connection_payload else '' message = "Unable to {} connection {}. Error message: {}".format(connection_action, title, e) logger.error(message) raise Exception(message) @staticmethod def wait_for_job(search_job, maxtime=10): """ Wait up to maxtime seconds for search_job to finish. If maxtime is negative, waits forever. Returns true, if job finished. """ pause = 0.2 lapsed = 0.0 while not search_job.is_done(): time.sleep(pause) lapsed += pause if maxtime >= 0 and lapsed > maxtime: logger.info("Done waiting for job to be finished. maxtime=%s lapsed=%s", maxtime, lapsed) break return search_job.is_done() def get_search_job(self, search_query, earliest_time='-24h', latest_time='now'): """ Creates search job. """ service = ITOAInterfaceUtils.service_connection(self.splunkd_session_key, app_name="SA-ITOA") current_search_job = service.jobs.create( search_query, earliest_time=earliest_time, latest_time=latest_time ) return current_search_job def wait_for_search(self, search_query, earliest_time='-24h@h', latest_time='now'): """ Create search job and wait for search job to complete. Return the list of results from the search job. """ try: search_job = self.get_search_job(search_query, earliest_time, latest_time) if not self.wait_for_job(search_job, self.preview_results_search_wait_time): raise Exception("Search for data integration connection preview results timed out.") params = { 'output_mode': 'json', 'count': 0 } search_results = search_job.results(**params) results_to_return = [] for entry in search_results: if isinstance(entry, (bytes, bytearray)): entry = entry.decode('utf-8') entry_json = json.loads(entry) if 'results' in entry_json: for row in entry_json['results']: results_to_return.append(row) else: logger.error('No results field found in response: {}'.format(entry_json)) logger.info('Total number of events retrieved from preview search: {}'.format(len(results_to_return))) return results_to_return except Exception as e: message = "Error occurred while getting preview results: {}".format(e) logger.error(message) raise Exception(message) @staticmethod def get_mapped_field_names(connection_payload): """ From the given connection payload, get the set of field names to be mapped. @type connection_payload: dict @param connection_payload: connection payload json for the connection to be created, updated, or previewed. The payload must contain 'mapped_fields' key whose value have fields which are to be mapped. @rtype: set @return: set of field names which are going to be transformed """ mapped_fields = connection_payload['mapped_fields'] field_names = [] for key in mapped_fields: if key not in field_names: field_names.append(key) return field_names def get_event_onboarding_conf(self): """ Get event onboarding configuration from itsi_event_management.conf file. The configs will be stored in the class variables. @rtype: None @return: None """ try: cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA') conf = cfm.get_conf('itsi_event_management') settings = conf.get('event_onboarding') self.preview_results_limit = int(settings['preview_results_limit']) self.preview_results_search_wait_time = float(settings['preview_results_search_wait_time']) self.valid_severities = cfm.get_conf('itsi_notable_event_severity') self.valid_status = cfm.get_conf('itsi_notable_event_status') logger.info('event_onboarding settings: {} preview_results_limit={} ' 'preview_results_search_wait_time={} '.format(settings, self.preview_results_limit, self.preview_results_search_wait_time)) except Exception as e: logger.error('Error occurred while getting configuration for event onboarding. Will use default' 'values for configs whose values were not fetched. Exception: {}'.format(e)) def get_conf_label(self, result, conf_type): try: if conf_type == 'severity_id': return self.valid_severities.get(result, {}).get('label') if conf_type == 'status': return self.valid_status.get(result, {}).get('label') except ConfStanzaNotExistException: # Handle the specific exception and return a custom message return f"{conf_type} not found" except Exception as e: logger.error('Error occurred while getting configuration for event onboarding. Exception: {}'.format(e)) def get_preview_results(self, connection_payload): """ Get preview results for the given connection payload @type connection_payload: dict @param connection_payload: connection payload json for the connection to be created or updated. Along with other parameters, it will also contain field mappings. Field mappings are mandatory for getting preview results @rtype: str @return: list of events (in str format) for preview results. each event will have the following fields: transformed_fields, _raw, and _time. transformed_fields will contain the new fields which are create via the mapped_fields and their corresponding value. _raw is the original raw event. _time is the time of the event. Example of results returned: [ { "transformed_fields": { "status": "1", "title": "aaabbb_ccc", "severity": "5", "owner": "abc_my_owner" }, "_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_1596\", \"event_severity\": \"blah:5-wow\", \"status_field\": 1, \"host\": \"abc\"}", "_time": "1711392312.9779332" }, { "transformed_fields": { "status": "3", "title": "aaabbb_ccc", "severity": "5", "owner": "abc_my_owner" }, "_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_4647\", \"event_severity\": \"blah:5-wow\", \"status_field\": 3, \"host\": \"abc\"}", "_time": "1711391629.354221" } ] """ logger.info("Received request to preview events. Connection payload: {}".format(connection_payload)) self.validate_connection_payload(connection_payload, is_preview=True) self.get_event_onboarding_conf() self.init_dedup_searches() spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=True) search_query = "| search {}".format(spl_search) earliest_time = connection_payload['earliest_time'] if 'earliest_time' in connection_payload else '-24h@h' latest_time = connection_payload['latest_time'] if 'latest_time' in connection_payload else 'now' logger.info("Created search query to preview events for connection. search_query=%s", search_query) search_results = self.wait_for_search(search_query, earliest_time, latest_time) transformed_field_names = self.get_mapped_field_names(connection_payload) decoded_search_results = [] for result in search_results: item = {"transformed_fields": {}} for name in transformed_field_names: if name in result: if name.lower() == 'severity_id' or name.lower() == 'status': item["transformed_fields"][name] = self.get_conf_label(result[name], name.lower()) continue item["transformed_fields"][name] = result[name] else: logger.error("Could not find the following field in search results: {}".format(name)) if "tmp_preview_raw" in result: item["_raw"] = result["tmp_preview_raw"] else: logger.error('Could not find _raw field in the search results') # Return the epoch_time as _time because we want to return _time in epoch format if "epoch_time" in result: item["_time"] = result["epoch_time"] else: logger.error('Could not find epoch_time field in the search results') decoded_search_results.append(item) logger.info('Number of events to be returned for preview: {}'.format(len(decoded_search_results))) # interface handler is expecting a string response json_str = json.dumps(decoded_search_results) return json_str def delete_connection(self, connection_id, itsi_data_integration_interface): """ Deletes the dependencies of the given connection_id @type connection_id: str @param connection_id: _key of the connection which will be deleted @type itsi_data_integration_interface: ItsiDataIntegration @param itsi_data_integration_interface: used to access the itsi_data_integration collection @rtype: None @return: None """ cs_search_name = "" try: logger.info("Received request to delete connection. connection_id={}".format(connection_id)) connection_object = itsi_data_integration_interface.get(connection_id) cs_search_name = self.get_correlation_search_name(connection_object['title']) self.correlation_interface.delete(cs_search_name) except ResourceNotFound as re: logger.info("Correlation Search \'{}\' not found. Exception: {}".format(cs_search_name, re)) except Exception as e: message = "Unable to delete the correlation search for connection_id: {}. Error: {}".format(connection_id, e) logger.error(message) raise Exception(message) def get_search_from_macros_conf(self, macro_name): try: uri_string = ('/servicesNS/{}/{}/properties/macros/' '{}/definition').format(self.user, self.app, macro_name) uri = safeURLQuote(uri_string) res, content = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json'}, sessionKey=self.splunkd_session_key) logger.info('Result of getting %s from macros.conf: response=%s content=%s', macro_name, res, content) if res.status not in [200, 201]: logger.error('Error in getting %s from macros.conf. response=%s content=%s', macro_name, res, content) return None if not content: logger.error('%s from macros.conf was not returned', macro_name) return None if isinstance(content, (bytes, bytearray)): content = content.decode('utf-8') return content except Exception as e: logger.error('Error in getting %s from macros.conf. Exception=%s', macro_name, e) return None def init_dedup_searches(self): """ Retrieve the dedup searches from macros.conf """ dedup_raw_alert_definition = self.get_search_from_macros_conf('dedup_search_for_raw_alert') if dedup_raw_alert_definition: self.dedup_search_for_raw_alert = dedup_raw_alert_definition logger.info('Successfully fetched dedup_search_for_raw_alert from macros.conf. ' 'dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert) else: logger.error('Error in fetching dedup_search_for_raw_alert from macros.conf. ' 'Will use the default value. dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert) dedup_notable_event_definition = self.get_search_from_macros_conf('dedup_search_for_notable_event') if dedup_notable_event_definition: self.dedup_search_for_notable_event = dedup_notable_event_definition logger.info('Successfully fetched dedup_search_for_notable_event from macros.conf. ' 'dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event) else: logger.error('Error in fetching dedup_search_for_notable_event from macros.conf. Will use the ' 'default value. dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event) @staticmethod def quote_field_if_needed(field_name): # Quote the field if it contains special characters like periods, parentheses, etc. special_chars = ['.'] if any(char in field_name for char in special_chars): return f"'{field_name}'" # Wrap the field name in single quotes for Splunk return field_name @staticmethod def format_value(value): if isinstance(value, str): if value.startswith("{") and value.endswith("}"): field_name = value[1:-1] # Remove curly braces for field references return EventOnboardingUtils.quote_field_if_needed(field_name) return f'"{value}"' if isinstance(value, bool): return str(value).lower() # Convert boolean to lowercase string return str(value) def construct_case_clause_string(self, clauses): """ Function to construct a logical condition string @param clauses: if else conditions from payload for the selected mapping field @return: return the """ clause_strings = [] for clause in clauses: if 'field' not in clause or 'operator' not in clause: raise KeyError('Each clause must contain "field" and "operator" keys') field = clause["field"] operator = clause["operator"] # Todo: decide on operator naming convention with UI team value = clause.get("value", "") case_sensitive = clause.get("case_sensitive", True) if operator == "is not null": clause_strings.append(f'isnotnull({field})') elif operator == "is null": clause_strings.append(f'isnull({field})') elif operator in ["==", "!=", ">", "<", ">=", "<="]: if not case_sensitive: if isinstance(value, str): clause_strings.append(f'lower({field}) {operator} "{value.lower()}"') else: clause_strings.append(f'lower({field}) {operator} {value}') else: clause_strings.append(f'{field} {operator} "{value}"') return " AND ".join(clause_strings).strip() def generate_case_statement(self, values): """ Function to process the JSON structure and generate a case statement @param values: values from selected mapping field @return: """ case_statements = [] for condition in values: if 'condition' not in condition: raise KeyError('Missing "condition" key in one of the values') if 'outcomes' not in condition: raise KeyError('Missing "outcomes" key in one of the values') if condition['condition'] in ['IF', 'ELSE_IF'] and 'clauses' not in condition: raise KeyError('Missing "clauses" key for IF or ELSE_IF condition') condition_type = condition["condition"] clauses = self.construct_case_clause_string(condition.get("clauses", [])) outcomes = condition["outcomes"] if not isinstance(outcomes, list): raise TypeError('Outcomes should be a list') outcome_str = ' . '.join(self.format_value(outcome) for outcome in outcomes) if condition_type == "IF" or condition_type == "ELSE_IF": case_statements.append(f'{clauses}, {outcome_str}') elif condition_type == "ELSE": case_statements.append(f'1=1, {outcome_str}') return f'case({", ".join(case_statements)})' def generate_coalesce_statement(self, field): """ Function to process the JSON structure and generate a coalesce statement @param field: mapping field details from payload @return: coalesce statement """ if 'values' not in field: raise KeyError('Missing "values" key in the input data') values = field['values'] if not isinstance(values, list): raise TypeError('The "values" key must be a list') coalesce_values = [] for value in values: if isinstance(value, list): concatenated_value = " . ".join(self.format_value(item) for item in value) coalesce_values.append(concatenated_value) else: coalesce_values.append(self.format_value(value)) if 'default_value' in field: default_value = field['default_value'] coalesce_values.append(self.format_value(default_value)) coalesce_statement = f'coalesce({", ".join(coalesce_values)})' return coalesce_statement def migrate_connection_objects(self, version_number): """ Migration router for connection objects @param: version_number: migration handler version number @return: list of transformed connection objects """ if version_number == '4.20.0': old_connection_objects = self.get_all_connection_objects() transformed_connection_objects = self.transform_connection_objects(old_connection_objects) self.update_all_connection_objects(transformed_connection_objects) return transformed_connection_objects return [] def get_all_connection_objects(self): """ Gets all data integration connection objects from KV store @return: list of connection objects """ return self.connection_storage.get_all(self.splunkd_session_key, self.user, 'data_integration') def update_all_connection_objects(self, connection_objects): """ Updates KV store with connection objects @param connection_objects: list of data integration connection objects @return: None """ for connection in connection_objects: connection_key = connection['_key'] self.connection_storage.delete(self.splunkd_session_key, 'nobody', 'data_integration', connection_key) self.connection_storage.create(self.splunkd_session_key, 'nobody', 'data_integration', connection) def get_template_mapping_fields_dict(self): """ Retrieves template mapping_fields from conf @return: dictionary of template mapping_fields for each data type """ existing_templates = ['generic', 'nagios', 'solarwinds', 'scom', 'o11y', 'appdynamics', 'thousandeyes', 'cloudtrail'] template_mapping_fields_dict = {} cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA') conf = cfm.get_conf('itsi_data_integration_template') for template_name in existing_templates: template = conf.get(template_name) mapping_fields = json.loads(template.get('mapping_fields')) template_mapping_fields_dict[template_name] = mapping_fields return template_mapping_fields_dict def transform_connection_objects(self, connection_objects): """ Function to transform connection objects to support coalesce and case @param connection_objects: list of connection objects @return: list of updated connection objects """ fields_to_update = ['severity_id', 'subcomponent', 'itsiDrilldownSearchName', 'itsiDrilldownEarliestOffset', 'itsiDrilldownLatestOffset', 'itsiDrilldownWebName', 'itsiDrilldownWebURL'] template_mapping_fields_dict = self.get_template_mapping_fields_dict() # update connection objects updated_connection_objects = [] for connection in connection_objects: if 'is_out_of_the_box' in connection and connection['is_out_of_the_box'] == 1: continue connection_id = connection['_key'] logger.info(f'Migrating connection {connection_id}') connection_data_source = connection['data_source'] cur_template_mapping_fields = template_mapping_fields_dict[connection_data_source] cur_connection_mapped_fields = connection['mapped_fields'] for mapping in cur_template_mapping_fields: field = mapping['name'] if field not in fields_to_update and not mapping['required']: continue # Check if required field exists in connection # If field does not exist in connection, apply default case/coalesce values to mapping if field not in cur_connection_mapped_fields: updated_mapped_field = mapping.copy() if ('input_type' in updated_mapped_field and updated_mapped_field['input_type'] == 'mapping_rule' and updated_mapped_field['rule_type'] == 'case'): for value in updated_mapped_field['values']: outcome = value['outcomes'][0] if isinstance(outcome, dict): value['outcomes'] = [outcome['value']] updated_mapped_field.pop('required', None) cur_connection_mapped_fields[field] = updated_mapped_field connection['mapped_fields'] = cur_connection_mapped_fields logger.info(f'Transformed connection {connection_id} payload: {connection}') updated_connection_objects.append(connection) return updated_connection_objects