# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
'''
Utility module for handling event onboarding actions, such as creating, updating, or deleting connections
'''
from __future__ import absolute_import
import json
# import re
import sys
import time
import itsi_py3
from builtins import object
from splunk.clilib.bundle_paths import make_splunkhome_path
from ITOA.storage.itoa_storage import ITOAStorage
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
from ITOA.event_management.constants import (EA_DATA_INTEGRATION_METHOD_TYPES,
EA_DATA_INTEGRATION_INPUT_TYPE,
EA_DATA_INTEGRATION_VALID_STATUS,
EA_DATA_INTEGRATION_CS_TITLE_PREFIX,
EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT,
EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT)
from SA_ITOA_app_common.solnlib.conf_manager import ConfManager
from SA_ITOA_app_common.splunklib import results
from ITOA.setup_logging import logger
from itsi.event_management.itsi_correlation_search import ItsiCorrelationSearch
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.itoa_common import is_feature_enabled
from splunk import ResourceNotFound
import splunk.rest as splunk_rest
from splunk.util import safeURLQuote
from SA_ITOA_app_common.solnlib.conf_manager import ConfStanzaNotExistException
class HttpEventListenerException(Exception):
pass
class EventOnboardingUtils(object):
"""
A class to support Event Onboarding operations
"""
def __init__(self, splunkd_session_key, current_user_name, user='nobody', app='itsi'):
self.dedup_search_for_raw_alert = EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT
self.dedup_search_for_notable_event = EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT
if splunkd_session_key is None or splunkd_session_key == "":
raise HttpEventListenerException("Invalid splunkd session key")
self.splunkd_session_key = splunkd_session_key
self.current_user_name = current_user_name
self.user = user
self.app = app
self.correlation_interface = ItsiCorrelationSearch(
splunkd_session_key,
current_user_name=current_user_name,
user='nobody',
app='itsi',
logger=logger,
)
self.connection_storage = ITOAStorage(collection='itsi_data_integration')
self.template_storage = ITOAStorage(collection='itsi_data_integration_template')
self.preview_results_limit = 300
self.preview_results_search_wait_time = 10
self.is_splunk_throttling_enabled = is_feature_enabled('itsi-apply-splunk-throttling', splunkd_session_key)
self.valid_severities = []
self.valid_status = []
@staticmethod
def is_valid_regex(regex_string):
"""
Check whether the given regex string is a valid regular expression.
TODO: Uncomment the code below once the Python version is upgraded to 3.11; the re module in versions below 3.11 does not support atomic grouping.
"""
return True
# try:
# re.compile(regex_string)
# return True
# except re.error:
# return False
@staticmethod
def validate_connection_payload(connection_payload, is_preview=False):
"""
Validates the connection payload, which is created from the JSON payload in the request to create or update the
connection. It checks that all required data and fields in the payload are present.
@type connection_payload: dict
@param connection_payload: request payload with connection details
@type is_preview: bool
@param is_preview: whether the request payload is for generating preview results
@rtype: None
@return: None
"""
if 'data_source' not in connection_payload:
message = "data_source not provided in the request payload"
logger.error(message)
raise Exception(message)
if 'ingestion_method' not in connection_payload:
message = "ingestion_method not provided in the request payload"
logger.error(message)
raise Exception(message)
if 'type' not in connection_payload['ingestion_method']:
message = "ingestion_method type not provided in the request payload"
logger.error(message)
raise Exception(message)
ingestion_method = connection_payload['ingestion_method']
method_type = ingestion_method['type']
if method_type not in EA_DATA_INTEGRATION_METHOD_TYPES:
message = "method_type is invalid. method_type should be one of the following: {}".format(
EA_DATA_INTEGRATION_METHOD_TYPES)
logger.error(message)
raise Exception(message)
if 'value' not in connection_payload['ingestion_method']:
message = "In the request payload, no value provided under ingestion_method"
logger.error(message)
raise Exception(message)
if not is_preview:
if 'status' not in connection_payload:
message = "status not provided in the request payload. status should be one of the following: {}".format(
EA_DATA_INTEGRATION_VALID_STATUS)
logger.error(message)
raise Exception(message)
status = connection_payload['status']
if status not in EA_DATA_INTEGRATION_VALID_STATUS:
message = "invalid status provided in the request payload. status should be one of the following: {}".format(
EA_DATA_INTEGRATION_VALID_STATUS)
logger.error(message)
raise Exception(message)
if 'title' not in connection_payload:
message = "title not provided in the request payload"
logger.error(message)
raise Exception(message)
if 'throttling' in connection_payload:
throttling_params = connection_payload['throttling']
if 'throttling_earliest_time' in throttling_params:
throttling_earliest_time = throttling_params['throttling_earliest_time']
if throttling_earliest_time[0] != '-':
message = "invalid throttling_earliest_time provided. It should have a minus sign in the front."
logger.error(message)
raise Exception(message)
if 'throttling_latest_time' in throttling_params:
throttling_latest_time = throttling_params['throttling_latest_time']
if throttling_latest_time != 'now' and throttling_latest_time[0] != '-':
message = ("invalid throttling_latest_time provided. It should either be 'now' or it should "
"have a minus sign in the front.")
logger.error(message)
raise Exception(message)
if 'dedup_grouping_fields' in throttling_params:
dedup_grouping_fields = throttling_params['dedup_grouping_fields']
if not isinstance(dedup_grouping_fields, list):
message = "invalid data type provided for dedup_grouping_fields. It should be a list."
logger.error(message)
raise Exception(message)
if 'dedup_notable_event' in throttling_params:
dedup_notable_event = throttling_params['dedup_notable_event']
if not isinstance(dedup_notable_event, bool):
message = ("invalid value provided for dedup_notable_event. It should be of type boolean: "
"true or false. If value is true or false, make sure it's not a string. "
"dedup_notable_event={}. Type of dedup_notable_event: {}".
format(dedup_notable_event, type(dedup_notable_event)))
logger.error(message)
raise Exception(message)
if 'mapped_fields' not in connection_payload:
message = "mapped_fields not provided in the request payload"
logger.error(message)
raise Exception(message)
mapped_fields = connection_payload['mapped_fields']
if len(mapped_fields) == 0:
message = "no mappings provided under mapped_fields in the request payload"
logger.error(message)
raise Exception(message)
for key in mapped_fields:
field_name = key
field = mapped_fields[key]
if 'input_type' not in field:
message = "input_type is not provided for the field {}".format(field_name)
logger.error(message)
raise Exception(message)
input_type = field['input_type']
if input_type not in EA_DATA_INTEGRATION_INPUT_TYPE:
message = "input_type is incorrect. It should be one of the following: {}".format(
EA_DATA_INTEGRATION_INPUT_TYPE)
logger.error(message)
raise Exception(message)
if 'values' not in field:
message = "no values provided for the field {}".format(field_name)
logger.error(message)
raise Exception(message)
values = field['values']
if len(values) <= 0:
message = "no values provided for the field {}".format(field_name)
logger.error(message)
raise Exception(message)
if input_type == 'regex':
if len(values) > 1:
# there should be only 1 value in regex
message = ("multiple values provided for the field {}. There should be only 1 value for regex "
"input_type").format(field_name)
logger.error(message)
raise Exception(message)
# regex_source is required for regex input_type; if there is no specific source field, callers should pass '_raw'
if 'regex_source' not in field:
message = ("no regex_source provided for the field {}. If there is no specific field for "
"regex_source then pass _raw as it's value").format(field_name)
logger.error(message)
raise Exception(message)
if not field['regex_source']:
message = ("no value provided for regex_source for field {}. If there is no specific field for "
"regex_source then pass _raw as it's value").format(field_name)
logger.error(message)
raise Exception(message)
regex_str = field['values'][0]
if not EventOnboardingUtils.is_valid_regex(regex_str):
message = "invalid regex provided for field {}. Regex: {}".format(field_name, regex_str)
logger.error(message)
raise Exception(message)
if '<itsi_field_name>' not in regex_str:
message = "<itsi_field_name> capture group not present in regular expression for field {}".format(
field_name)
logger.error(message)
raise Exception(message)
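# Illustrative sketch of a payload shape that passes this validation; every name and value
# below is hypothetical and not taken from a real connection:
# {
#     "title": "my_connection",
#     "status": "active",
#     "data_source": "generic",
#     "ingestion_method": {"type": "<one of EA_DATA_INTEGRATION_METHOD_TYPES>",
#                          "value": "index=main sourcetype=alerts"},
#     "mapped_fields": {
#         "title": {"input_type": "composition", "values": ["{vendor_title}"]}
#     }
# }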
@staticmethod
def get_modified_capture_group(field_name, regex):
"""
Build a modified capture group name by prepending the name of the field whose value we are trying to extract
via the regular expression. All the regular expressions will have the (?<itsi_field_name>) capture group, and
itsi_field_name will contain the value we are trying to extract. Since a connection can have multiple
field mappings, we need to give the capture groups more specific names so there are no conflicts.
Example: field_name="title", regex="browserType:(?<itsi_field_name>.*?)"
The modified capture group name will be "title_itsi_field_name".
If there are any spaces in the field name, they are replaced with underscores. This is
just to avoid any potential issues caused by spaces in the capture group name.
@type field_name: string
@param field_name: Name of the field whose value we are trying to extract
@type regex: string
@param regex: regular expression which extracts the value via the named capture group. The named
capture group must be <itsi_field_name>.
@rtype: string
@return: the modified capture group name
"""
if not field_name:
message = "field name is empty. regex for field: {}".format(regex)
logger.error(message)
raise Exception(message)
if not regex:
message = "regex is empty for field {}".format(field_name)
logger.error(message)
raise Exception(message)
if '<itsi_field_name>' not in regex:
message = ("Capture group called \"<itsi_field_name>\" should be provided in the regex. "
"field name: {}. regex: {}").format(field_name, regex)
logger.error(message)
raise Exception(message)
modified_field_name = field_name.replace(' ', '_')
modified_capture_group = modified_field_name + '_' + 'itsi_field_name'
return modified_capture_group
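# Example (illustrative): get_modified_capture_group("vendor id", "x:(?<itsi_field_name>.*)")
# returns "vendor_id_itsi_field_name"; note that the caller is responsible for substituting
# the new name into the regex itself.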
@staticmethod
def get_groupingid_eval(group_by_fields, field_name='internal_groupingid'):
"""
Takes a list of field names, concatenates those fields separated by colons, and assigns the result to
field_name in an eval expression.
Example: group_by_fields = ['title', 'status', 'severity']
Returns: '| eval internal_groupingid = "ALARM:" . title . ":" . status . ":" . severity'
"""
if group_by_fields:
group_spl = ''
joining_str = ' . ":" . '
for field in group_by_fields:
group_spl += '{}{}'.format(joining_str, field)
# strip the leading joining_str (the slice keeps a single space before the first field)
group_spl = group_spl[len(joining_str) - 1:]
group_spl = '"ALARM:" . {}'.format(group_spl)
grouping_eval = '| eval {} = {}'.format(field_name, group_spl)
logger.info('grouping_eval: %s', grouping_eval)
return grouping_eval
else:
return None
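# Example (illustrative): get_groupingid_eval(['title', 'status']) yields roughly
# '| eval internal_groupingid = "ALARM:" . title . ":" . status'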
def get_dedup_notable_event_search(self, earliest_time='-59m', latest_time='now'):
"""
Returns a copy of self.dedup_search_for_notable_event with the earliest and latest times replaced by the given
values
"""
# replace this string: "earliest=-59m latest=now"
old_time_str = "earliest=-59m latest=now"
new_time_str = 'earliest={} latest={}'.format(earliest_time, latest_time)
new_search = self.dedup_search_for_notable_event.replace(old_time_str, new_time_str)
logger.info('After modifying the time string: dedup_search_for_notable_event=%s', new_search)
return new_search
@staticmethod
def get_default_value(field_key, field_data):
"""
Get default value for the field_key from its field_data.
@type field_key: str
@param field_key: name of the field whose default value we are trying to determine
@type field_data: dict
@param field_data: dictionary (from json payload) which contains data pertaining to the given field_key
@rtype: string
@return: default value pertaining to the given field_key. If no default value is determined, returns None.
"""
if 'default_value' in field_data:
return field_data['default_value']
if field_key == 'owner':
return 'unassigned'
elif field_key == 'severity':
return '1'
elif field_key == 'status':
return '1'
return None
@staticmethod
def insert_additional_query(spl, additional_query):
"""
Takes the additional_query and inserts it in front of the first pipe in the spl
@type spl: str
@param spl: the SPL search string to modify
@type additional_query: str
@param additional_query: query fragment to insert in front of the first pipe in the spl
@rtype: string
@return: the spl with additional_query inserted in front of the first pipe
"""
idx = spl.find('|') # find the first pipe in the spl query
new_spl = ''
if idx == -1: # no pipe found
new_spl = '{} {}'.format(spl, additional_query)
else: # pipe found
# take everything before the first pipe; rstrip() avoids eating a character when there is no space before the pipe
first_part = spl[:idx].rstrip()
second_part = spl[idx:]
# if original spl is: index=itsi_main | eval my_owner="blahblah" | head 10
# and additional_query is: src="x" signature="abc"
# new spl will be: index=itsi_main src="x" signature="abc" | eval my_owner="blahblah" | head 10
new_spl = '{} {} {}'.format(first_part, additional_query, second_part)
return new_spl
def calculate_throttling_params(self, connection_payload):
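# Derive the effective throttling settings from the connection payload, falling back to
# defaults (-59m to now, no grouping fields, no notable-event dedup) when keys are absent.
# Throttling is considered enabled only when at least one dedup_grouping_field is provided.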
# Set default values
throttling_earliest_time = '-59m'
throttling_latest_time = 'now'
dedup_grouping_fields = []
dedup_notable_event = False
throttling_enabled = False
# Check for throttling in the payload
if 'throttling' in connection_payload:
throttling_params = connection_payload['throttling']
logger.info('throttling_params=%s', throttling_params)
# Update dedup_grouping_fields if present
if 'dedup_grouping_fields' in throttling_params:
dedup_grouping_fields = throttling_params['dedup_grouping_fields']
# Enable throttling if there are grouping fields
throttling_enabled = len(dedup_grouping_fields) > 0
# Update earliest and latest throttling times if present
throttling_earliest_time = throttling_params.get('throttling_earliest_time', throttling_earliest_time)
throttling_latest_time = throttling_params.get('throttling_latest_time', throttling_latest_time)
# Update dedup_notable_event if present
dedup_notable_event = throttling_params.get('dedup_notable_event', dedup_notable_event)
# Return the calculated fields as a dictionary
return {
'throttling_earliest_time': throttling_earliest_time,
'throttling_latest_time': throttling_latest_time,
'dedup_grouping_fields': dedup_grouping_fields,
'dedup_notable_event': dedup_notable_event,
'throttling_enabled': throttling_enabled
}
def generate_spl_for_indexed_connection(self, connection_payload, is_preview=False):
"""
Generates SPL search from the field mappings in the given connection payload.
@type connection_payload: dict
@param connection_payload: connection payload json for the connection being created or updated. Along with
other parameters, it will also contain field mappings.
@type is_preview: bool
@param is_preview: whether we are generating the spl search for preview results
@rtype: string
@return: SPL search created from the field mappings
"""
orig_search_query = connection_payload['ingestion_method']['value']
search_query = orig_search_query
if not search_query:
message = "Search query is empty in ingestion method. It should be provided as a value under ingestion_method."
logger.error(message)
raise Exception(message)
final_spl = search_query
throttling_dict = self.calculate_throttling_params(connection_payload)
if throttling_dict['throttling_enabled']:
time_str = 'earliest={} latest={}'.format(throttling_dict['throttling_earliest_time'], throttling_dict['throttling_latest_time'])
final_spl = EventOnboardingUtils.insert_additional_query(final_spl, time_str)
mapped_fields = connection_payload['mapped_fields']
for field_key in mapped_fields:
field = mapped_fields[field_key]
input_type = field['input_type']
default_val = EventOnboardingUtils.get_default_value(field_key, field)
if input_type == 'regex':
regex_val = field['values'][0]
capture_group = 'itsi_field_name'
# escape double quotes within the regex because in rex the regex is surrounded in double quotes
regex_val = regex_val.replace('"', '\\"')
regex_src_field = field['regex_source']
# if there are curly braces around regex_source field name, then remove them
if regex_src_field[0] == '{' and regex_src_field[-1] == '}':
regex_src_field = regex_src_field[1:-1]
rex_str = ' | rex field={} "{}"'.format(regex_src_field, regex_val)
if default_val:
# if there is a default value for the given field_key and if the eval expression didn't extract
# any value then assign it the default value
eval_str = '{} | eval {} = coalesce({}, "{}")'.format(rex_str, field_key, capture_group,
default_val)
else:
eval_str = '{} | eval {} = {}'.format(rex_str, field_key, capture_group)
final_spl += eval_str
elif input_type == 'composition':
values = field['values']
val_so_far = ''
for x in values:
val = str(x)
if len(val) > 2 and val[0] == '{' and val[-1] == '}':
# it's a field name if it's enclosed in {}, extract the field name. Example: "{vendor_severity}"
v = val[1:len(val) - 1]
# surround field names with single quotes to allow special characters such as dot.
# example: | eval title = 'Source.title'
val_so_far = "{} + \'{}\'".format(val_so_far, v)
else:
# it's a plain string value
val_so_far = '{} + \"{}\"'.format(val_so_far, val)
# Remove the ' + ' at the beginning of the string
val_so_far = val_so_far[2:]
if default_val:
# if there is a default value for the given field_key and if the eval expression didn't extract
# any value then assign it the default value
final_spl = '{} | eval {} = coalesce({}, "{}")'.format(final_spl, field_key, val_so_far,
default_val)
else:
final_spl = '{} | eval {} = {}'.format(final_spl, field_key, val_so_far)
elif input_type == 'mapping_rule':
rule_type = field['rule_type']
values = field['values']
if rule_type == 'case':
case_condition = self.generate_case_statement(values)
final_spl = '{} | eval {} = {}'.format(final_spl, field_key, case_condition)
elif rule_type == 'coalesce':
coalesce_condition = self.generate_coalesce_statement(field)
final_spl = '{} | eval {} = {}'.format(final_spl, field_key, coalesce_condition)
else:
# This should not happen because input_type is validated upfront in validate_connection_payload
message = "Invalid input_type '{}' provided for field {}. input_type should be one of the following: {}".format(
input_type, field_key, EA_DATA_INTEGRATION_INPUT_TYPE)
logger.error(message)
raise Exception(message)
# severity_id_mapping and status_id_mapping are expected to be SPL statements.
# Example: case(vendor_severity="CRITICAL", 6, vendor_severity="WARN", 2, 1=1, 1)
# Todo: Remove this once template is updated with default values for case & coalesce
'''
severity_id_mapping = connection_payload['severity_id_mapping']
if severity_id_mapping:
final_spl = '{} | eval severity_id = {}'.format(final_spl, severity_id_mapping)
if 'status_id_mapping' in connection_payload:
status_id_mapping = connection_payload['status_id_mapping']
if status_id_mapping:
if not isinstance(status_id_mapping, itsi_py3.string_type):
message = "Invalid value provided for status_id_mapping. It should be string type."
logger.error(message)
raise Exception(message)
else:
final_spl = '{} | eval status = {}'.format(final_spl, status_id_mapping)
final_spl = '{} | eval subcomponent=if(isnotnull(subcomponent), subcomponent, "-")'.format(final_spl)
'''
if throttling_dict['throttling_enabled']:
grouping_eval = EventOnboardingUtils.get_groupingid_eval(throttling_dict['dedup_grouping_fields'])
final_spl = '{} {}'.format(final_spl, grouping_eval)
logger.info('Appended groupingid field. spl so far: %s', final_spl)
final_spl = '{} {}'.format(final_spl, self.dedup_search_for_raw_alert)
if throttling_dict['dedup_notable_event']:
final_spl = '{} {}'.format(
final_spl,
self.get_dedup_notable_event_search(
throttling_dict['throttling_earliest_time'],
throttling_dict['throttling_latest_time']
)
)
'''
escaped_orig_query = orig_search_query.replace('"', '\\"')
# Todo: Remove below once template is updated with default values for case & coalesce
additional_query = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\"'
first_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query)
additional_query_2 = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\" subcomponent=\\"" + subcomponent + "\\"'
second_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query_2)
drill_search_eval = ('| eval itsiDrilldownSearch=case(isnotnull(itsiDrilldownSearch), '
'itsiDrilldownSearch, subcomponent="-", \"{}\",'
'1=1, \"{}\")').format(first_spl, second_spl)
final_spl = '{} {}'.format(final_spl, drill_search_eval)
drill_web_name_eval = (
'| eval itsiDrilldownWebName=case((isnotnull(itsiDrilldownWebURL) AND isnotnull(itsiDrilldownWebName)), itsiDrilldownWebName, '
'(isnotnull(itsiDrilldownWebURL) AND isnull(itsiDrilldownWebName)), "External Drilldown for " + title, '
'isnull(itsiDrilldownWebURL), "Sorry, no external drilldown available")')
final_spl = '{} {}'.format(final_spl, drill_web_name_eval)
drill_web_url_eval = '| eval itsiDrilldownWebURL=if(isnotnull(itsiDrilldownWebURL), itsiDrilldownWebURL, "https://splunk.com") '
final_spl = '{} {}'.format(final_spl, drill_web_url_eval)
drill_earliest_offset_eval = '| eval itsiDrilldownEarliestOffset=coalesce(itsiDrilldownEarliestOffset, "-900")'
final_spl = '{} {}'.format(final_spl, drill_earliest_offset_eval)
drill_latest_offset_eval = '| eval itsiDrilldownLatestOffset=coalesce(itsiDrilldownLatestOffset, "900")'
final_spl = '{} {}'.format(final_spl, drill_latest_offset_eval)
'''
#####
# IMPORTANT: this part of preview SPL has to happen before we add filter_maintenance_services(service_ids)
# otherwise we will not get _raw and _time fields
if is_preview:
# 'eval tmp_preview_raw = _raw' is needed to preserve the _raw for preview results. We need to do this
# because once we apply filter_maintenance_services() in SPL, we will lose _raw and _time fields.
# When search job returns _time field, it will be converted to human-readable time stamp, however we want
# the epoch time hence adding a separate field to preserve the epoch time
final_spl = '{} | eval tmp_preview_raw = _raw | eval epoch_time = _time'.format(final_spl)
entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field")
service_ids = connection_payload.get("association", {}).get("service_ids")
if entity_lookup_field is not None and entity_lookup_field != '':
final_spl = '{} | `apply_entity_lookup({})`'.format(final_spl, entity_lookup_field)
if service_ids is not None and service_ids != '':
final_spl = '{} | `filter_maintenance_services("{}")`'.format(final_spl, service_ids)
if is_preview:
# limit the max number of results to the preview_results_limit value defined in the conf
final_spl = '{} | head {}'.format(final_spl, self.preview_results_limit)
return final_spl
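# Rough sketch of the output for a hypothetical payload whose ingestion_method value is
# 'index=main' and whose only mapping is a composition title=["{f1}", "_x"] with a default:
#   index=main | eval title = coalesce('f1' + "_x", "my_default")
# Preview evals, the apply_entity_lookup / filter_maintenance_services macros, and the
# dedup searches are appended after this when the corresponding options are set.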
@staticmethod
def get_correlation_search_name(connection_title):
"""
Construct the correlation search name for the given connection title
@type connection_title: str
@rtype: str
"""
return EA_DATA_INTEGRATION_CS_TITLE_PREFIX + connection_title
def create_correlation_search_payload(self, connection_payload, spl_search):
if "cron_schedule" in connection_payload and "value" in connection_payload["cron_schedule"]:
cron_schedule = connection_payload["cron_schedule"]["value"]
else:
# If "cron_schedule" field is not present or "value" field is missing, assign default value
cron_schedule = "*/1 * * * *"
time_range = connection_payload.get("ingestion_method", {}).get("time_range", {})
dispatch_earliest_time = time_range.get("earliest", "-15m")
dispatch_latest_time = time_range.get("latest", "now")
connection_title = connection_payload['title']
correlation_search_title = self.get_correlation_search_name(connection_title)
status = connection_payload.get('status', "active")
disabled = 0 if status == "active" else 1
payload = {
"name": correlation_search_title,
"description": "Correction Search for data integration connection: {}".format(connection_title),
"actions": "itsi_event_generator",
"action.itsi_event_generator": 1,
"action.itsi_event_generator.param.meta_data": "{}",
"action.itsi_event_generator.param.search_type": "basic",
"action.itsi_event_generator.param.title": "%title%",
"action.itsi_event_generator.param.status": "%status%",
"action.itsi_event_generator.param.severity": "%severity_id%",
"action.itsi_event_generator.param.owner": "%owner%",
"action.itsi_event_generator.param.itsi_instruction": "%itsi_instruction%",
"action.itsi_event_generator.param.drilldown_search_title": "%itsiDrilldownSearchName%",
"action.itsi_event_generator.param.drilldown_search_search": "%itsiDrilldownSearch%",
"action.itsi_event_generator.param.drilldown_search_earliest_offset": "%itsiDrilldownEarliestOffset%",
"action.itsi_event_generator.param.drilldown_search_latest_offset": "%itsiDrilldownLatestOffset%",
"action.itsi_event_generator.param.drilldown_title": "%itsiDrilldownWebName%",
"action.itsi_event_generator.param.drilldown_uri": "%itsiDrilldownWebURL%",
"action.itsi_event_generator.param.event_identifier_fields": "groupingid",
"alert.track": "0",
"counttype": "number of events",
"cron_schedule": cron_schedule,
"dispatch.earliest_time": dispatch_earliest_time,
"dispatch.latest_time": dispatch_latest_time,
"enableSched": 1,
"quantity": "0",
"relation": "greater than",
"disabled": disabled,
"hide_on_ui": 1,
"search": spl_search
}
entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field")
service_ids = connection_payload.get("association", {}).get("service_ids")
if entity_lookup_field is not None and entity_lookup_field != '':
payload["action.itsi_event_generator.param.entity_lookup_field"] = entity_lookup_field
if service_ids is not None and service_ids != '':
payload["action.itsi_event_generator.param.service_ids"] = service_ids
if self.is_splunk_throttling_enabled:
throttling_dict = self.calculate_throttling_params(connection_payload)
payload["alert.suppress"] = 1
if 'dedup_grouping_fields' in throttling_dict and throttling_dict['dedup_grouping_fields']:
payload["alert.suppress.fields"] = ','.join(throttling_dict['dedup_grouping_fields'])
if 'throttling_earliest_time' in throttling_dict and throttling_dict['throttling_earliest_time']:
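# throttling_earliest_time looks like '-59m'; stripping the leading '-' leaves a Splunk
# time span (e.g. '59m') that can be used directly as the alert suppression period.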
duration_in_minutes = throttling_dict['throttling_earliest_time'][1:]
payload["alert.suppress.period"] = duration_in_minutes
return payload
def create_or_update_correlation_search(self, create, connection_payload, spl_search):
connection_title = connection_payload['title']
correlation_search_title = self.get_correlation_search_name(connection_title)
payload = self.create_correlation_search_payload(connection_payload, spl_search)
if create:
logger.info("About to create correlation search: {}".format(correlation_search_title))
self.correlation_interface.create(payload, raise_if_exist=True)
else:
logger.info("About to update correlation search: {}".format(correlation_search_title))
self.correlation_interface.update(object_id=correlation_search_title, data=payload)
def create_or_update_connection(self, connection_payload, create_connection=False):
"""
Create or update a correlation search for the given connection payload
@type connection_payload: dict
@param connection_payload: connection payload json for the connection being created or updated. Along with
other parameters, it will also contain field mappings.
@type create_connection: bool
@param create_connection: whether the connection is being created or being updated. If it's set to true, a
new saved search will be created for it. If it's false, the existing saved search for the connection will
be updated.
@rtype: None
@return: None
"""
connection_action = "create" if create_connection else "update"
try:
# Even for partial updates, for example activating or deactivating the connection,
# itsi_data_integration is sending the whole object. We can use the regular flow of updating
# the whole correlation search instead of having a separate flow for enabling or disabling CS
logger.info("Received request to {} connection: {}".format(connection_action, connection_payload))
EventOnboardingUtils.validate_connection_payload(connection_payload)
title = connection_payload['title']
self.init_dedup_searches()
spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=False)
logger.info("Created spl search for connection. title=%s spl_search=%s", title, spl_search)
self.create_or_update_correlation_search(create_connection, connection_payload, spl_search)
except Exception as e:
title = connection_payload['title'] if 'title' in connection_payload else ''
message = "Unable to {} connection {}. Error message: {}".format(connection_action, title, e)
logger.error(message)
raise Exception(message)
@staticmethod
def wait_for_job(search_job, maxtime=10):
"""
Wait up to maxtime seconds for search_job to finish. If maxtime is
negative, waits forever. Returns True if the job finished.
"""
pause = 0.2
lapsed = 0.0
while not search_job.is_done():
time.sleep(pause)
lapsed += pause
if maxtime >= 0 and lapsed > maxtime:
logger.info("Done waiting for job to be finished. maxtime=%s lapsed=%s", maxtime, lapsed)
break
return search_job.is_done()
def get_search_job(self, search_query, earliest_time='-24h', latest_time='now'):
"""
Creates search job.
"""
service = ITOAInterfaceUtils.service_connection(self.splunkd_session_key, app_name="SA-ITOA")
current_search_job = service.jobs.create(
search_query,
earliest_time=earliest_time,
latest_time=latest_time
)
return current_search_job
def wait_for_search(self, search_query, earliest_time='-24h@h', latest_time='now'):
"""
Create search job and wait for search job to complete. Return the list of results from the search job.
"""
try:
search_job = self.get_search_job(search_query, earliest_time, latest_time)
if not self.wait_for_job(search_job, self.preview_results_search_wait_time):
raise Exception("Search for data integration connection preview results timed out.")
params = {
'output_mode': 'json',
'count': 0
}
search_results = search_job.results(**params)
results_to_return = []
for entry in search_results:
if isinstance(entry, (bytes, bytearray)):
entry = entry.decode('utf-8')
entry_json = json.loads(entry)
if 'results' in entry_json:
for row in entry_json['results']:
results_to_return.append(row)
else:
logger.error('No results field found in response: {}'.format(entry_json))
logger.info('Total number of events retrieved from preview search: {}'.format(len(results_to_return)))
return results_to_return
except Exception as e:
message = "Error occurred while getting preview results: {}".format(e)
logger.error(message)
raise Exception(message)
@staticmethod
def get_mapped_field_names(connection_payload):
"""
From the given connection payload, get the list of field names to be mapped.
@type connection_payload: dict
@param connection_payload: connection payload json for the connection to be created, updated, or previewed.
The payload must contain a 'mapped_fields' key whose value holds the fields which are to be mapped.
@rtype: list
@return: list of field names which are going to be transformed
"""
mapped_fields = connection_payload['mapped_fields']
# dict keys are unique, so the keys themselves are the mapped field names
return list(mapped_fields)
def get_event_onboarding_conf(self):
"""
Get event onboarding configuration from the itsi_event_management.conf file. The configs are stored in
instance variables.
@rtype: None
@return: None
"""
try:
cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA')
conf = cfm.get_conf('itsi_event_management')
settings = conf.get('event_onboarding')
self.preview_results_limit = int(settings['preview_results_limit'])
self.preview_results_search_wait_time = float(settings['preview_results_search_wait_time'])
self.valid_severities = cfm.get_conf('itsi_notable_event_severity')
self.valid_status = cfm.get_conf('itsi_notable_event_status')
logger.info('event_onboarding settings: {} preview_results_limit={} '
'preview_results_search_wait_time={} '.format(settings, self.preview_results_limit,
self.preview_results_search_wait_time))
except Exception as e:
logger.error('Error occurred while getting configuration for event onboarding. Will use default '
'values for configs whose values were not fetched. Exception: {}'.format(e))
def get_conf_label(self, result, conf_type):
try:
if conf_type == 'severity_id':
return self.valid_severities.get(result, {}).get('label')
if conf_type == 'status':
return self.valid_status.get(result, {}).get('label')
except ConfStanzaNotExistException:
# Handle the specific exception and return a custom message
return f"{conf_type} not found"
except Exception as e:
logger.error('Error occurred while getting configuration for event onboarding. Exception: {}'.format(e))
def get_preview_results(self, connection_payload):
"""
Get preview results for the given connection payload
@type connection_payload: dict
@param connection_payload: connection payload json for the connection to be created or updated. Along with
other parameters, it will also contain field mappings. Field mappings are mandatory for getting preview results
@rtype: str
@return: list of events (in str format) for preview results. each event will have the following fields:
transformed_fields, _raw, and _time.
transformed_fields will contain the new fields which are created via the mapped_fields and their corresponding
values.
_raw is the original raw event.
_time is the time of the event.
Example of results returned:
[
{
"transformed_fields": {
"status": "1",
"title": "aaabbb_ccc",
"severity": "5",
"owner": "abc_my_owner"
},
"_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_1596\", \"event_severity\": \"blah:5-wow\", \"status_field\": 1, \"host\": \"abc\"}",
"_time": "1711392312.9779332"
},
{
"transformed_fields": {
"status": "3",
"title": "aaabbb_ccc",
"severity": "5",
"owner": "abc_my_owner"
},
"_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_4647\", \"event_severity\": \"blah:5-wow\", \"status_field\": 3, \"host\": \"abc\"}",
"_time": "1711391629.354221"
}
]
"""
logger.info("Received request to preview events. Connection payload: {}".format(connection_payload))
self.validate_connection_payload(connection_payload, is_preview=True)
self.get_event_onboarding_conf()
self.init_dedup_searches()
spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=True)
search_query = "| search {}".format(spl_search)
earliest_time = connection_payload['earliest_time'] if 'earliest_time' in connection_payload else '-24h@h'
latest_time = connection_payload['latest_time'] if 'latest_time' in connection_payload else 'now'
logger.info("Created search query to preview events for connection. search_query=%s", search_query)
search_results = self.wait_for_search(search_query, earliest_time, latest_time)
transformed_field_names = self.get_mapped_field_names(connection_payload)
decoded_search_results = []
for result in search_results:
item = {"transformed_fields": {}}
for name in transformed_field_names:
if name in result:
if name.lower() == 'severity_id' or name.lower() == 'status':
item["transformed_fields"][name] = self.get_conf_label(result[name], name.lower())
continue
item["transformed_fields"][name] = result[name]
else:
logger.error("Could not find the following field in search results: {}".format(name))
if "tmp_preview_raw" in result:
item["_raw"] = result["tmp_preview_raw"]
else:
logger.error('Could not find _raw field in the search results')
# Return the epoch_time as _time because we want to return _time in epoch format
if "epoch_time" in result:
item["_time"] = result["epoch_time"]
else:
logger.error('Could not find epoch_time field in the search results')
decoded_search_results.append(item)
logger.info('Number of events to be returned for preview: {}'.format(len(decoded_search_results)))
# interface handler is expecting a string response
json_str = json.dumps(decoded_search_results)
return json_str
def delete_connection(self, connection_id, itsi_data_integration_interface):
"""
Deletes the dependencies of the given connection_id
@type connection_id: str
@param connection_id: _key of the connection which will be deleted
@type itsi_data_integration_interface: ItsiDataIntegration
@param itsi_data_integration_interface: used to access the itsi_data_integration collection
@rtype: None
@return: None
"""
cs_search_name = ""
try:
logger.info("Received request to delete connection. connection_id={}".format(connection_id))
connection_object = itsi_data_integration_interface.get(connection_id)
cs_search_name = self.get_correlation_search_name(connection_object['title'])
self.correlation_interface.delete(cs_search_name)
except ResourceNotFound as re:
logger.info("Correlation Search \'{}\' not found. Exception: {}".format(cs_search_name, re))
except Exception as e:
message = "Unable to delete the correlation search for connection_id: {}. Error: {}".format(connection_id,
e)
logger.error(message)
raise Exception(message)
def get_search_from_macros_conf(self, macro_name):
try:
uri_string = ('/servicesNS/{}/{}/properties/macros/'
'{}/definition').format(self.user, self.app, macro_name)
uri = safeURLQuote(uri_string)
res, content = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json'},
sessionKey=self.splunkd_session_key)
logger.info('Result of getting %s from macros.conf: response=%s content=%s', macro_name, res, content)
if res.status not in [200, 201]:
logger.error('Error in getting %s from macros.conf. response=%s content=%s', macro_name, res, content)
return None
if not content:
logger.error('%s from macros.conf was not returned', macro_name)
return None
if isinstance(content, (bytes, bytearray)):
content = content.decode('utf-8')
return content
except Exception as e:
logger.error('Error in getting %s from macros.conf. Exception=%s', macro_name, e)
return None
def init_dedup_searches(self):
"""
Retrieve the dedup searches from macros.conf
"""
dedup_raw_alert_definition = self.get_search_from_macros_conf('dedup_search_for_raw_alert')
if dedup_raw_alert_definition:
self.dedup_search_for_raw_alert = dedup_raw_alert_definition
logger.info('Successfully fetched dedup_search_for_raw_alert from macros.conf. '
'dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert)
else:
logger.error('Error in fetching dedup_search_for_raw_alert from macros.conf. '
'Will use the default value. dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert)
dedup_notable_event_definition = self.get_search_from_macros_conf('dedup_search_for_notable_event')
if dedup_notable_event_definition:
self.dedup_search_for_notable_event = dedup_notable_event_definition
logger.info('Successfully fetched dedup_search_for_notable_event from macros.conf. '
'dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event)
else:
logger.error('Error in fetching dedup_search_for_notable_event from macros.conf. Will use the '
'default value. dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event)
@staticmethod
def quote_field_if_needed(field_name):
# Quote the field if it contains special characters like periods, parentheses, etc.
special_chars = ['.']
if any(char in field_name for char in special_chars):
return f"'{field_name}'" # Wrap the field name in single quotes for Splunk
return field_name
@staticmethod
def format_value(value):
if isinstance(value, str):
if value.startswith("{") and value.endswith("}"):
field_name = value[1:-1] # Remove curly braces for field references
return EventOnboardingUtils.quote_field_if_needed(field_name)
return f'"{value}"'
if isinstance(value, bool):
return str(value).lower() # Convert boolean to lowercase string
return str(value)
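# Examples (illustrative): format_value("{Source.title}") -> 'Source.title' (single-quoted
# because of the dot), format_value("high") -> "high" (double-quoted literal), and
# format_value(True) -> true.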
def construct_case_clause_string(self, clauses):
"""
Construct a logical condition string from a list of clauses
@param clauses: if/else conditions from the payload for the selected mapping field
@return: the clause conditions joined with " AND " into a single string
"""
clause_strings = []
for clause in clauses:
if 'field' not in clause or 'operator' not in clause:
raise KeyError('Each clause must contain "field" and "operator" keys')
field = clause["field"]
operator = clause["operator"] # Todo: decide on operator naming convention with UI team
value = clause.get("value", "")
case_sensitive = clause.get("case_sensitive", True)
if operator == "is not null":
clause_strings.append(f'isnotnull({field})')
elif operator == "is null":
clause_strings.append(f'isnull({field})')
elif operator in ["==", "!=", ">", "<", ">=", "<="]:
if not case_sensitive:
if isinstance(value, str):
clause_strings.append(f'lower({field}) {operator} "{value.lower()}"')
else:
clause_strings.append(f'lower({field}) {operator} {value}')
else:
clause_strings.append(f'{field} {operator} "{value}"')
return " AND ".join(clause_strings).strip()
def generate_case_statement(self, values):
"""
Process the JSON structure and generate an SPL case statement
@param values: values from the selected mapping field
@return: SPL case(...) expression string
"""
case_statements = []
for condition in values:
if 'condition' not in condition:
raise KeyError('Missing "condition" key in one of the values')
if 'outcomes' not in condition:
raise KeyError('Missing "outcomes" key in one of the values')
if condition['condition'] in ['IF', 'ELSE_IF'] and 'clauses' not in condition:
raise KeyError('Missing "clauses" key for IF or ELSE_IF condition')
condition_type = condition["condition"]
clauses = self.construct_case_clause_string(condition.get("clauses", []))
outcomes = condition["outcomes"]
if not isinstance(outcomes, list):
raise TypeError('Outcomes should be a list')
outcome_str = ' . '.join(self.format_value(outcome) for outcome in outcomes)
if condition_type == "IF" or condition_type == "ELSE_IF":
case_statements.append(f'{clauses}, {outcome_str}')
elif condition_type == "ELSE":
case_statements.append(f'1=1, {outcome_str}')
return f'case({", ".join(case_statements)})'
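# Example (illustrative, hypothetical values payload):
# generate_case_statement([
#     {"condition": "IF",
#      "clauses": [{"field": "vendor_severity", "operator": "==", "value": "CRITICAL"}],
#      "outcomes": ["6"]},
#     {"condition": "ELSE", "outcomes": ["1"]},
# ])
# returns: case(vendor_severity == "CRITICAL", "6", 1=1, "1")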
def generate_coalesce_statement(self, field):
"""
Function to process the JSON structure and generate a coalesce statement
@param field: mapping field details from payload
@return: coalesce statement
"""
if 'values' not in field:
raise KeyError('Missing "values" key in the input data')
values = field['values']
if not isinstance(values, list):
raise TypeError('The "values" key must be a list')
coalesce_values = []
for value in values:
if isinstance(value, list):
concatenated_value = " . ".join(self.format_value(item) for item in value)
coalesce_values.append(concatenated_value)
else:
coalesce_values.append(self.format_value(value))
if 'default_value' in field:
default_value = field['default_value']
coalesce_values.append(self.format_value(default_value))
coalesce_statement = f'coalesce({", ".join(coalesce_values)})'
return coalesce_statement
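# Example (illustrative, hypothetical field payload):
# generate_coalesce_statement({"values": ["{vendor_owner}", "{assignee}"],
#                              "default_value": "unassigned"})
# returns: coalesce(vendor_owner, assignee, "unassigned")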
def migrate_connection_objects(self, version_number):
"""
Migration router for connection objects
@param version_number: migration handler version number
@return: list of transformed connection objects
"""
if version_number == '4.20.0':
old_connection_objects = self.get_all_connection_objects()
transformed_connection_objects = self.transform_connection_objects(old_connection_objects)
self.update_all_connection_objects(transformed_connection_objects)
return transformed_connection_objects
return []
def get_all_connection_objects(self):
"""
Gets all data integration connection objects from KV store
@return: list of connection objects
"""
return self.connection_storage.get_all(self.splunkd_session_key,
self.user,
'data_integration')
def update_all_connection_objects(self, connection_objects):
"""
Updates KV store with connection objects
@param connection_objects: list of data integration connection objects
@return: None
"""
for connection in connection_objects:
connection_key = connection['_key']
self.connection_storage.delete(self.splunkd_session_key,
'nobody',
'data_integration',
connection_key)
self.connection_storage.create(self.splunkd_session_key,
'nobody',
'data_integration',
connection)
def get_template_mapping_fields_dict(self):
"""
Retrieves template mapping_fields from conf
@return: dictionary of template mapping_fields for each data type
"""
existing_templates = ['generic', 'nagios', 'solarwinds', 'scom', 'o11y', 'appdynamics', 'thousandeyes', 'cloudtrail']
template_mapping_fields_dict = {}
cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA')
conf = cfm.get_conf('itsi_data_integration_template')
for template_name in existing_templates:
template = conf.get(template_name)
mapping_fields = json.loads(template.get('mapping_fields'))
template_mapping_fields_dict[template_name] = mapping_fields
return template_mapping_fields_dict
def transform_connection_objects(self, connection_objects):
"""
Function to transform connection objects to support coalesce and case
@param connection_objects: list of connection objects
@return: list of updated connection objects
"""
fields_to_update = ['severity_id', 'subcomponent', 'itsiDrilldownSearchName', 'itsiDrilldownEarliestOffset',
'itsiDrilldownLatestOffset', 'itsiDrilldownWebName', 'itsiDrilldownWebURL']
template_mapping_fields_dict = self.get_template_mapping_fields_dict()
# update connection objects
updated_connection_objects = []
for connection in connection_objects:
if 'is_out_of_the_box' in connection and connection['is_out_of_the_box'] == 1:
continue
connection_id = connection['_key']
logger.info(f'Migrating connection {connection_id}')
connection_data_source = connection['data_source']
cur_template_mapping_fields = template_mapping_fields_dict[connection_data_source]
cur_connection_mapped_fields = connection['mapped_fields']
for mapping in cur_template_mapping_fields:
field = mapping['name']
if field not in fields_to_update and not mapping['required']:
continue
# Check if required field exists in connection
# If field does not exist in connection, apply default case/coalesce values to mapping
if field not in cur_connection_mapped_fields:
updated_mapped_field = mapping.copy()
if ('input_type' in updated_mapped_field and updated_mapped_field['input_type'] == 'mapping_rule'
and updated_mapped_field['rule_type'] == 'case'):
for value in updated_mapped_field['values']:
outcome = value['outcomes'][0]
if isinstance(outcome, dict):
value['outcomes'] = [outcome['value']]
updated_mapped_field.pop('required', None)
cur_connection_mapped_fields[field] = updated_mapped_field
connection['mapped_fields'] = cur_connection_mapped_fields
logger.info(f'Transformed connection {connection_id} payload: {connection}')
updated_connection_objects.append(connection)
return updated_connection_objects