# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
'''
Utility module for handling event onboarding actions, such as creating, updating, or deleting connections.
'''
from __future__ import absolute_import

import json
# import re
import sys
import time

import itsi_py3
from builtins import object
from splunk.clilib.bundle_paths import make_splunkhome_path

from ITOA.storage.itoa_storage import ITOAStorage

sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
from ITOA.event_management.constants import (EA_DATA_INTEGRATION_METHOD_TYPES,
                                             EA_DATA_INTEGRATION_INPUT_TYPE,
                                             EA_DATA_INTEGRATION_VALID_STATUS,
                                             EA_DATA_INTEGRATION_CS_TITLE_PREFIX,
                                             EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT,
                                             EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT)
from SA_ITOA_app_common.solnlib.conf_manager import ConfManager, ConfStanzaNotExistException
from SA_ITOA_app_common.splunklib import results
from ITOA.setup_logging import logger
from itsi.event_management.itsi_correlation_search import ItsiCorrelationSearch
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.itoa_common import is_feature_enabled
from splunk import ResourceNotFound
import splunk.rest as splunk_rest
from splunk.util import safeURLQuote


class HttpEventListenerException(Exception):
    pass


class EventOnboardingUtils(object):
    """
    A class to support Event Onboarding operations
    """

    def __init__(self, splunkd_session_key, current_user_name, user='nobody', app='itsi'):
        self.dedup_search_for_raw_alert = EA_DATA_INT_DEDUP_SEARCH_FOR_RAW_ALERT
        self.dedup_search_for_notable_event = EA_DATA_INT_DEDUP_SEARCH_FOR_NOTABLE_EVENT
        if splunkd_session_key is None or splunkd_session_key == "":
            raise HttpEventListenerException("Invalid splunkd session key")
        self.splunkd_session_key = splunkd_session_key
        self.current_user_name = current_user_name
        self.user = user
        self.app = app
        self.correlation_interface = ItsiCorrelationSearch(
            splunkd_session_key,
            current_user_name=current_user_name,
            user='nobody',
            app='itsi',
            logger=logger,
        )
        self.connection_storage = ITOAStorage(collection='itsi_data_integration')
        self.template_storage = ITOAStorage(collection='itsi_data_integration_template')
        self.preview_results_limit = 300
        self.preview_results_search_wait_time = 10
        self.is_splunk_throttling_enabled = is_feature_enabled('itsi-apply-splunk-throttling', splunkd_session_key)
        self.valid_severities = []
        self.valid_status = []

    @staticmethod
    def is_valid_regex(regex_string):
        """
        Check whether the given regex string is a valid regular expression.

        TODO: Uncomment the block below once the Python version is upgraded to 3.11.
        The re module before 3.11 does not support atomic grouping.
        """
        return True

        # try:
        #     re.compile(regex_string)
        #     return True
        # except re.error:
        #     return False

    @staticmethod
    def validate_connection_payload(connection_payload, is_preview=False):
        """
        Validates the connection payload, which is created from the json payload in the request to create or
        update the connection. It checks that all required data or fields in the payload are present.

        @type connection_payload: dict
        @param connection_payload: request payload with connection details

        @type is_preview: bool
        @param is_preview: whether the request payload is for generating preview results

        @rtype: None
        @return: None
        """
        if 'data_source' not in connection_payload:
            message = "data_source not provided in the request payload"
            logger.error(message)
            raise Exception(message)
        if 'ingestion_method' not in connection_payload:
            message = "ingestion_method not provided in the request payload"
            logger.error(message)
            raise Exception(message)
        if 'type' not in connection_payload['ingestion_method']:
            message = "ingestion_method type not provided in the request payload"
            logger.error(message)
            raise Exception(message)
        ingestion_method = connection_payload['ingestion_method']
        method_type = ingestion_method['type']
        if method_type not in EA_DATA_INTEGRATION_METHOD_TYPES:
            message = "method_type is invalid. method_type should be one of the following: {}".format(
                EA_DATA_INTEGRATION_METHOD_TYPES)
            logger.error(message)
            raise Exception(message)
        if 'value' not in connection_payload['ingestion_method']:
            message = "In the request payload, no value provided under ingestion_method"
            logger.error(message)
            raise Exception(message)

        if not is_preview:
            if 'status' not in connection_payload:
                message = "status not provided in the request payload. status should be one of the following: {}".format(
                    EA_DATA_INTEGRATION_VALID_STATUS)
                logger.error(message)
                raise Exception(message)
            status = connection_payload['status']
            if status not in EA_DATA_INTEGRATION_VALID_STATUS:
                message = "invalid status provided in the request payload. status should be one of the following: {}".format(
                    EA_DATA_INTEGRATION_VALID_STATUS)
                logger.error(message)
                raise Exception(message)
            if 'title' not in connection_payload:
                message = "title not provided in the request payload"
                logger.error(message)
                raise Exception(message)
            if 'throttling' in connection_payload:
                throttling_params = connection_payload['throttling']
                if 'throttling_earliest_time' in throttling_params:
                    throttling_earliest_time = throttling_params['throttling_earliest_time']
                    if throttling_earliest_time[0] != '-':
                        message = "invalid throttling_earliest_time provided. It should have a minus sign at the front."
                        logger.error(message)
                        raise Exception(message)
                if 'throttling_latest_time' in throttling_params:
                    throttling_latest_time = throttling_params['throttling_latest_time']
                    if throttling_latest_time != 'now' and throttling_latest_time[0] != '-':
                        message = ("invalid throttling_latest_time provided. It should either be 'now' or it should "
                                   "have a minus sign at the front.")
                        logger.error(message)
                        raise Exception(message)
                if 'dedup_grouping_fields' in throttling_params:
                    dedup_grouping_fields = throttling_params['dedup_grouping_fields']
                    if not isinstance(dedup_grouping_fields, list):
                        message = "invalid data type provided for dedup_grouping_fields. It should be a list."
                        logger.error(message)
                        raise Exception(message)
                if 'dedup_notable_event' in throttling_params:
                    dedup_notable_event = throttling_params['dedup_notable_event']
                    if not isinstance(dedup_notable_event, bool):
                        message = ("invalid value provided for dedup_notable_event. It should be of type boolean: "
                                   "true or false. If the value is true or false, make sure it is not a string. "
                                   "dedup_notable_event={}. Type of dedup_notable_event: {}".
                                   format(dedup_notable_event, type(dedup_notable_event)))
                        logger.error(message)
                        raise Exception(message)

        if 'mapped_fields' not in connection_payload:
            message = "mapped_fields not provided in the request payload"
            logger.error(message)
            raise Exception(message)
        mapped_fields = connection_payload['mapped_fields']
        if len(mapped_fields) == 0:
            message = "no mappings provided under mapped_fields in the request payload"
            logger.error(message)
            raise Exception(message)
        for key in mapped_fields:
            field_name = key
            field = mapped_fields[key]
            if 'input_type' not in field:
                message = "input_type is not provided for the field {}".format(field_name)
                logger.error(message)
                raise Exception(message)
            input_type = field['input_type']
            if input_type not in EA_DATA_INTEGRATION_INPUT_TYPE:
                message = "input_type is incorrect. It should be one of the following: {}".format(
                    EA_DATA_INTEGRATION_INPUT_TYPE)
                logger.error(message)
                raise Exception(message)
            if 'values' not in field:
                message = "no values provided for the field {}".format(field_name)
                logger.error(message)
                raise Exception(message)
            values = field['values']
            if len(values) <= 0:
                message = "no values provided for the field {}".format(field_name)
                logger.error(message)
                raise Exception(message)
            if input_type == 'regex':
                if len(values) > 1:
                    # there should be only 1 value for regex input_type
                    message = ("multiple values provided for the field {}. There should be only 1 value for regex "
                               "input_type").format(field_name)
                    logger.error(message)
                    raise Exception(message)
                # if no regex_source is provided for regex input_type then use the '_raw' field as the regex_source
                if 'regex_source' not in field:
                    message = ("no regex_source provided for the field {}. If there is no specific field for "
                               "regex_source then pass _raw as its value").format(field_name)
                    logger.error(message)
                    raise Exception(message)
                if not field['regex_source']:
                    message = ("no value provided for regex_source for field {}. If there is no specific field for "
                               "regex_source then pass _raw as its value").format(field_name)
                    logger.error(message)
                    raise Exception(message)
                regex_str = field['values'][0]
                if not EventOnboardingUtils.is_valid_regex(regex_str):
                    message = "invalid regex provided for field {}. Regex: {}".format(field_name, regex_str)
                    logger.error(message)
                    raise Exception(message)
                if '<itsi_field_name>' not in regex_str:
                    message = "<itsi_field_name> capture group not present in regular expression for field {}".format(
                        field_name)
                    logger.error(message)
                    raise Exception(message)
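
    # Illustrative sketch (not part of the original module) of a payload shape that
    # passes validate_connection_payload. The 'indexed' method type and the concrete
    # field names are assumptions for illustration only; 'active' matches the status
    # checked elsewhere in this module.
    #
    # {
    #     "data_source": "generic",
    #     "title": "my connection",
    #     "status": "active",
    #     "ingestion_method": {"type": "indexed", "value": "index=itsi_main"},
    #     "mapped_fields": {
    #         "title": {
    #             "input_type": "regex",
    #             "values": ["name:(?<itsi_field_name>\\w+)"],
    #             "regex_source": "_raw"
    #         }
    #     }
    # }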

    @staticmethod
    def get_modified_capture_group(field_name, regex):
        """
        Build a more specific capture group name by prepending the name of the field whose value we are trying to
        extract via the regular expression. All the regular expressions will have the (?<itsi_field_name>) capture
        group, and itsi_field_name will contain the value we are trying to extract. Since a connection can have
        multiple field mappings, we need more specific names for the capture groups so there are no conflicts.

        Example: field_name="title", regex="browserType:(?<itsi_field_name>.*?)"
        The modified capture group will be "title_itsi_field_name", so the regex would become:
        "browserType:(?<title_itsi_field_name>.*?)"

        If there are any spaces in the field name, they are replaced with underscores in the capture group name.
        This is just to avoid any potential issues caused by spaces in the capture group name.

        @type field_name: string
        @param field_name: name of the field whose value we are trying to extract

        @type regex: string
        @param regex: regular expression which extracts a value via the named capture group <itsi_field_name>

        @rtype: string
        @return: the modified capture group name
        """
        if not field_name:
            message = "field name is empty. regex for field: {}".format(regex)
            logger.error(message)
            raise Exception(message)
        if not regex:
            message = "regex is empty for field {}".format(field_name)
            logger.error(message)
            raise Exception(message)
        if '<itsi_field_name>' not in regex:
            message = ("Capture group called \"<itsi_field_name>\" should be provided in the regex. "
                       "field name: {}. regex: {}").format(field_name, regex)
            logger.error(message)
            raise Exception(message)
        modified_field_name = field_name.replace(' ', '_')
        modified_capture_group = modified_field_name + '_' + 'itsi_field_name'
        return modified_capture_group
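
    # Illustrative sketch (not in the original source): expected behavior of
    # get_modified_capture_group, derived from the implementation above.
    #
    # >>> EventOnboardingUtils.get_modified_capture_group(
    # ...     'vendor severity', r'sev:(?<itsi_field_name>\d+)')
    # 'vendor_severity_itsi_field_name'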

    @staticmethod
    def get_groupingid_eval(group_by_fields, field_name='internal_groupingid'):
        """
        Takes a list of field names, concatenates those fields separated by colons, and assigns the result to
        field_name in an eval expression. The concatenation is prefixed with the literal "ALARM:".

        Example: group_by_fields = ['title', 'status', 'severity']
        Returns: '| eval internal_groupingid = "ALARM:" .  title . ":" . status . ":" . severity'
        """
        if group_by_fields:
            group_spl = ''
            joining_str = ' . ":" . '
            for field in group_by_fields:
                group_spl += '{}{}'.format(joining_str, field)
            # remove the joining_str from the beginning of the string (keeping one leading space)
            group_spl = group_spl[len(joining_str) - 1:]
            group_spl = '"ALARM:" . {}'.format(group_spl)
            grouping_eval = '| eval {} = {}'.format(field_name, group_spl)
            logger.info('grouping_eval: %s', grouping_eval)
            return grouping_eval
        else:
            return None

    def get_dedup_notable_event_search(self, earliest_time='-59m', latest_time='now'):
        """
        Returns a copy of self.dedup_search_for_notable_event with the default earliest and latest times replaced
        by the given values.
        """
        # replace this string: "earliest=-59m latest=now"
        old_time_str = "earliest=-59m latest=now"
        new_time_str = 'earliest={} latest={}'.format(earliest_time, latest_time)
        new_search = self.dedup_search_for_notable_event.replace(old_time_str, new_time_str)
        logger.info('After modifying the time string: dedup_search_for_notable_event=%s', new_search)
        return new_search
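
    # Illustrative sketch (not in the original source): given a dedup search containing
    # the default window "earliest=-59m latest=now", calling
    # get_dedup_notable_event_search('-120m', 'now') rewrites that window to
    # "earliest=-120m latest=now" and leaves the rest of the search untouched.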

    @staticmethod
    def get_default_value(field_key, field_data):
        """
        Get the default value for field_key from its field_data.

        @type field_key: str
        @param field_key: name of the field whose default value we are trying to determine

        @type field_data: dict
        @param field_data: dictionary (from json payload) which contains data pertaining to the given field_key

        @rtype: string
        @return: default value pertaining to the given field_key. If no default value is determined, return None.
        """
        if 'default_value' in field_data:
            return field_data['default_value']
        if field_key == 'owner':
            return 'unassigned'
        elif field_key == 'severity':
            return '1'
        elif field_key == 'status':
            return '1'
        return None
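
    # Illustrative sketch (not in the original source): expected behavior of
    # get_default_value, derived from the implementation above.
    #
    # >>> EventOnboardingUtils.get_default_value('owner', {})
    # 'unassigned'
    # >>> EventOnboardingUtils.get_default_value('title', {'default_value': 'untitled'})
    # 'untitled'
    # >>> EventOnboardingUtils.get_default_value('title', {}) is None
    # True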

    @staticmethod
    def insert_additional_query(spl, additional_query):
        """
        Takes the additional_query and inserts it in front of the first pipe in the spl.

        @type spl: str
        @param spl: the SPL query to insert into

        @type additional_query: str
        @param additional_query: query fragment to insert in front of the first pipe

        @rtype: string
        @return: spl with additional_query inserted in front of the first pipe
        """
        idx = spl.find('|')  # find the first pipe in the spl query
        new_spl = ''
        if idx == -1:  # no pipe found
            new_spl = '{} {}'.format(spl, additional_query)
        else:  # pipe found
            # strip trailing whitespace instead of blindly dropping the character before the
            # pipe, which would eat part of the query when there is no space before the pipe
            first_part = spl[:idx].rstrip()
            second_part = spl[idx:]
            # if the original spl is: index=itsi_main | eval my_owner="blahblah" | head 10
            # and additional_query is: src="x" signature="abc"
            # the new spl will be: index=itsi_main src="x" signature="abc" | eval my_owner="blahblah" | head 10
            new_spl = '{} {} {}'.format(first_part, additional_query, second_part)
        return new_spl
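
    # Illustrative sketch (not in the original source): expected behavior of
    # insert_additional_query, derived from the implementation above.
    #
    # >>> EventOnboardingUtils.insert_additional_query(
    # ...     'index=itsi_main | head 10', 'src="x"')
    # 'index=itsi_main src="x" | head 10'
    # >>> EventOnboardingUtils.insert_additional_query('index=itsi_main', 'src="x"')
    # 'index=itsi_main src="x"'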

    def calculate_throttling_params(self, connection_payload):
        """
        Extract the throttling parameters from the connection payload, falling back to defaults when absent.
        """
        # Set default values
        throttling_earliest_time = '-59m'
        throttling_latest_time = 'now'
        dedup_grouping_fields = []
        dedup_notable_event = False
        throttling_enabled = False

        # Check for throttling in the payload
        if 'throttling' in connection_payload:
            throttling_params = connection_payload['throttling']
            logger.info('throttling_params=%s', throttling_params)

            # Update dedup_grouping_fields if present
            if 'dedup_grouping_fields' in throttling_params:
                dedup_grouping_fields = throttling_params['dedup_grouping_fields']
                # Enable throttling if there are grouping fields
                throttling_enabled = len(dedup_grouping_fields) > 0

            # Update earliest and latest throttling times if present
            throttling_earliest_time = throttling_params.get('throttling_earliest_time', throttling_earliest_time)
            throttling_latest_time = throttling_params.get('throttling_latest_time', throttling_latest_time)

            # Update dedup_notable_event if present
            dedup_notable_event = throttling_params.get('dedup_notable_event', dedup_notable_event)

        # Return the calculated fields as a dictionary
        return {
            'throttling_earliest_time': throttling_earliest_time,
            'throttling_latest_time': throttling_latest_time,
            'dedup_grouping_fields': dedup_grouping_fields,
            'dedup_notable_event': dedup_notable_event,
            'throttling_enabled': throttling_enabled
        }
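
    # Illustrative sketch (not in the original source): for a payload such as
    # {'throttling': {'dedup_grouping_fields': ['title', 'src'],
    #                 'throttling_earliest_time': '-30m'}}
    # calculate_throttling_params returns:
    # {'throttling_earliest_time': '-30m', 'throttling_latest_time': 'now',
    #  'dedup_grouping_fields': ['title', 'src'], 'dedup_notable_event': False,
    #  'throttling_enabled': True}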

    def generate_spl_for_indexed_connection(self, connection_payload, is_preview=False):
        """
        Generates the SPL search from the field mappings in the given connection payload.

        @type connection_payload: dict
        @param connection_payload: connection payload json for the connection being created or updated. Along with
            other parameters, it will also contain field mappings.

        @type is_preview: bool
        @param is_preview: whether we are generating the spl search for preview results

        @rtype: string
        @return: SPL search created from the field mappings
        """
        orig_search_query = connection_payload['ingestion_method']['value']
        search_query = orig_search_query
        if not search_query:
            message = "Search query is empty in ingestion method. It should be provided as a value under ingestion_method."
            logger.error(message)
            raise Exception(message)

        final_spl = search_query

        throttling_dict = self.calculate_throttling_params(connection_payload)
        if throttling_dict['throttling_enabled']:
            time_str = 'earliest={} latest={}'.format(throttling_dict['throttling_earliest_time'],
                                                      throttling_dict['throttling_latest_time'])
            final_spl = EventOnboardingUtils.insert_additional_query(final_spl, time_str)

        mapped_fields = connection_payload['mapped_fields']
        for field_key in mapped_fields:
            field = mapped_fields[field_key]
            input_type = field['input_type']
            default_val = EventOnboardingUtils.get_default_value(field_key, field)
            if input_type == 'regex':
                regex_val = field['values'][0]
                capture_group = 'itsi_field_name'
                # escape double quotes within the regex because in rex the regex is surrounded by double quotes
                regex_val = regex_val.replace('"', '\\"')
                regex_src_field = field['regex_source']
                # if there are curly braces around the regex_source field name, remove them
                if regex_src_field[0] == '{' and regex_src_field[-1] == '}':
                    regex_src_field = regex_src_field[1:-1]
                rex_str = ' | rex field={} "{}"'.format(regex_src_field, regex_val)
                if default_val:
                    # if there is a default value for the given field_key and the rex expression didn't extract
                    # any value, then assign it the default value
                    eval_str = '{} | eval {} = coalesce({}, "{}")'.format(rex_str, field_key, capture_group,
                                                                          default_val)
                else:
                    eval_str = '{} | eval {} = {}'.format(rex_str, field_key, capture_group)
                final_spl += eval_str
            elif input_type == 'composition':
                values = field['values']
                val_so_far = ''
                for x in values:
                    val = str(x)
                    if len(val) > 2 and val[0] == '{' and val[-1] == '}':
                        # it's a field name if it's enclosed in {}; extract the field name. Example: "{vendor_severity}"
                        v = val[1:len(val) - 1]
                        # surround field names with single quotes to allow special characters such as dots.
                        # example: | eval title = 'Source.title'
                        val_so_far = "{} + \'{}\'".format(val_so_far, v)
                    else:
                        # it's a plain string value
                        val_so_far = '{} + \"{}\"'.format(val_so_far, val)
                # Remove the ' + ' at the beginning of the string
                val_so_far = val_so_far[2:]
                if default_val:
                    # if there is a default value for the given field_key and the eval expression didn't extract
                    # any value, then assign it the default value
                    final_spl = '{} | eval {} = coalesce({}, "{}")'.format(final_spl, field_key, val_so_far,
                                                                           default_val)
                else:
                    final_spl = '{} | eval {} = {}'.format(final_spl, field_key, val_so_far)
            elif input_type == 'mapping_rule':
                rule_type = field['rule_type']
                values = field['values']
                if rule_type == 'case':
                    case_condition = self.generate_case_statement(values)
                    final_spl = '{} | eval {} = {}'.format(final_spl, field_key, case_condition)
                elif rule_type == 'coalesce':
                    coalesce_condition = self.generate_coalesce_statement(field)
                    final_spl = '{} | eval {} = {}'.format(final_spl, field_key, coalesce_condition)
            else:
                # This should not happen because we should always check this in our initial validation
                message = "Invalid input_type provided for field {}. input_type should be one of the following: {}".format(
                    field_key, EA_DATA_INTEGRATION_INPUT_TYPE)
                logger.error(message)
                raise Exception(message)

        # severity_id_mapping and status_id_mapping are expected to be SPL statements.
        # Example: case(vendor_severity="CRITICAL", 6, vendor_severity="WARN", 2, 1=1, 1)
        # Todo: Remove this once the template is updated with default values for case & coalesce
        '''
        severity_id_mapping = connection_payload['severity_id_mapping']
        if severity_id_mapping:
            final_spl = '{} | eval severity_id = {}'.format(final_spl, severity_id_mapping)
        if 'status_id_mapping' in connection_payload:
            status_id_mapping = connection_payload['status_id_mapping']
            if status_id_mapping:
                if not isinstance(status_id_mapping, itsi_py3.string_type):
                    message = "Invalid value provided for status_id_mapping. It should be string type."
                    logger.error(message)
                    raise Exception(message)
                else:
                    final_spl = '{} | eval status = {}'.format(final_spl, status_id_mapping)

        final_spl = '{} | eval subcomponent=if(isnotnull(subcomponent), subcomponent, "-")'.format(final_spl)
        '''
        if throttling_dict['throttling_enabled']:
            grouping_eval = EventOnboardingUtils.get_groupingid_eval(throttling_dict['dedup_grouping_fields'])
            final_spl = '{} {}'.format(final_spl, grouping_eval)
            logger.info('Appended groupingid field. spl so far: %s', final_spl)

        final_spl = '{} {}'.format(final_spl, self.dedup_search_for_raw_alert)
        if throttling_dict['dedup_notable_event']:
            final_spl = '{} {}'.format(
                final_spl,
                self.get_dedup_notable_event_search(
                    throttling_dict['throttling_earliest_time'],
                    throttling_dict['throttling_latest_time']
                )
            )
        # Todo: Remove below once the template is updated with default values for case & coalesce
        '''
        escaped_orig_query = orig_search_query.replace('"', '\\"')

        additional_query = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\"'
        first_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query)
        additional_query_2 = 'src=\\"" + src + "\\" signature=\\"" + signature + "\\" subcomponent=\\"" + subcomponent + "\\"'
        second_spl = EventOnboardingUtils.insert_additional_query(escaped_orig_query, additional_query_2)
        drill_search_eval = ('| eval itsiDrilldownSearch=case(isnotnull(itsiDrilldownSearch), '
                             'itsiDrilldownSearch, subcomponent="-", \"{}\",'
                             '1=1, \"{}\")').format(first_spl, second_spl)
        final_spl = '{} {}'.format(final_spl, drill_search_eval)
        drill_web_name_eval = (
            '| eval itsiDrilldownWebName=case((isnotnull(itsiDrilldownWebURL) AND isnotnull(itsiDrilldownWebName)), itsiDrilldownWebName, '
            '(isnotnull(itsiDrilldownWebURL) AND isnull(itsiDrilldownWebName)), "External Drilldown for " + title, '
            'isnull(itsiDrilldownWebURL), "Sorry, no external drilldown available")')
        final_spl = '{} {}'.format(final_spl, drill_web_name_eval)
        drill_web_url_eval = '| eval itsiDrilldownWebURL=if(isnotnull(itsiDrilldownWebURL), itsiDrilldownWebURL, "https://splunk.com") '
        final_spl = '{} {}'.format(final_spl, drill_web_url_eval)
        drill_earliest_offset_eval = '| eval itsiDrilldownEarliestOffset=coalesce(itsiDrilldownEarliestOffset, "-900")'
        final_spl = '{} {}'.format(final_spl, drill_earliest_offset_eval)
        drill_latest_offset_eval = '| eval itsiDrilldownLatestOffset=coalesce(itsiDrilldownLatestOffset, "900")'
        final_spl = '{} {}'.format(final_spl, drill_latest_offset_eval)
        '''

        # IMPORTANT: this part of the preview SPL has to happen before we add filter_maintenance_services(service_ids),
        # otherwise we will not get the _raw and _time fields
        if is_preview:
            # 'eval tmp_preview_raw = _raw' is needed to preserve _raw for preview results. We need to do this
            # because once we apply filter_maintenance_services() in SPL, we will lose the _raw and _time fields.
            # When the search job returns the _time field, it will be converted to a human-readable timestamp;
            # however, we want the epoch time, hence adding a separate field to preserve it
            final_spl = '{} | eval tmp_preview_raw = _raw | eval epoch_time = _time'.format(final_spl)

        entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field")
        service_ids = connection_payload.get("association", {}).get("service_ids")
        if entity_lookup_field is not None and entity_lookup_field != '':
            final_spl = '{} | `apply_entity_lookup({})`'.format(final_spl, entity_lookup_field)
        if service_ids is not None and service_ids != '':
            final_spl = '{} | `filter_maintenance_services("{}")`'.format(final_spl, service_ids)
        if is_preview:
            # limit the max number of results to the preview_results_limit value defined in the conf
            final_spl = '{} | head {}'.format(final_spl, self.preview_results_limit)
        return final_spl
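
    # Illustrative sketch (not in the original source): for a connection whose
    # ingestion_method value is 'index=itsi_main' and whose only mapping is the
    # 'composition' field title = ["{host}", "_alert"], the generated SPL starts as
    # (modulo spacing):
    #
    #   index=itsi_main | eval title = 'host' + "_alert"
    #
    # followed by the dedup search fetched from macros.conf and, when is_preview is
    # True, the tmp_preview_raw/epoch_time evals and a trailing "| head <limit>".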

    @staticmethod
    def get_correlation_search_name(connection_title):
        """
        Construct the correlation search name for the given connection title.

        @type connection_title: str

        @rtype: str
        """
        return EA_DATA_INTEGRATION_CS_TITLE_PREFIX + connection_title

    def create_correlation_search_payload(self, connection_payload, spl_search):
        if "cron_schedule" in connection_payload and "value" in connection_payload["cron_schedule"]:
            cron_schedule = connection_payload["cron_schedule"]["value"]
        else:
            # If the "cron_schedule" field is not present or its "value" field is missing, assign the default value
            cron_schedule = "*/1 * * * *"

        time_range = connection_payload.get("ingestion_method", {}).get("time_range", {})
        dispatch_earliest_time = time_range.get("earliest", "-15m")
        dispatch_latest_time = time_range.get("latest", "now")
        connection_title = connection_payload['title']
        correlation_search_title = self.get_correlation_search_name(connection_title)
        status = connection_payload.get('status', "active")
        disabled = 0 if status == "active" else 1

        payload = {
            "name": correlation_search_title,
            "description": "Correlation Search for data integration connection: {}".format(connection_title),
            "actions": "itsi_event_generator",
            "action.itsi_event_generator": 1,
            "action.itsi_event_generator.param.meta_data": "{}",
            "action.itsi_event_generator.param.search_type": "basic",
            "action.itsi_event_generator.param.title": "%title%",
            "action.itsi_event_generator.param.status": "%status%",
            "action.itsi_event_generator.param.severity": "%severity_id%",
            "action.itsi_event_generator.param.owner": "%owner%",
            "action.itsi_event_generator.param.itsi_instruction": "%itsi_instruction%",
            "action.itsi_event_generator.param.drilldown_search_title": "%itsiDrilldownSearchName%",
            "action.itsi_event_generator.param.drilldown_search_search": "%itsiDrilldownSearch%",
            "action.itsi_event_generator.param.drilldown_search_earliest_offset": "%itsiDrilldownEarliestOffset%",
            "action.itsi_event_generator.param.drilldown_search_latest_offset": "%itsiDrilldownLatestOffset%",
            "action.itsi_event_generator.param.drilldown_title": "%itsiDrilldownWebName%",
            "action.itsi_event_generator.param.drilldown_uri": "%itsiDrilldownWebURL%",
            "action.itsi_event_generator.param.event_identifier_fields": "groupingid",
            "alert.track": "0",
            "counttype": "number of events",
            "cron_schedule": cron_schedule,
            "dispatch.earliest_time": dispatch_earliest_time,
            "dispatch.latest_time": dispatch_latest_time,
            "enableSched": 1,
            "quantity": "0",
            "relation": "greater than",
            "disabled": disabled,
            "hide_on_ui": 1,
            "search": spl_search
        }
        entity_lookup_field = connection_payload.get("association", {}).get("entity_lookup_field")
        service_ids = connection_payload.get("association", {}).get("service_ids")
        if entity_lookup_field is not None and entity_lookup_field != '':
            payload["action.itsi_event_generator.param.entity_lookup_field"] = entity_lookup_field
        if service_ids is not None and service_ids != '':
            payload["action.itsi_event_generator.param.service_ids"] = service_ids

        if self.is_splunk_throttling_enabled:
            throttling_dict = self.calculate_throttling_params(connection_payload)
            payload["alert.suppress"] = 1
            if 'dedup_grouping_fields' in throttling_dict and throttling_dict['dedup_grouping_fields']:
                payload["alert.suppress.fields"] = ','.join(throttling_dict['dedup_grouping_fields'])
            if 'throttling_earliest_time' in throttling_dict and throttling_dict['throttling_earliest_time']:
                # strip the leading minus sign; e.g. '-59m' becomes the suppression period '59m'
                suppress_period = throttling_dict['throttling_earliest_time'][1:]
                payload["alert.suppress.period"] = suppress_period

        return payload

    def create_or_update_correlation_search(self, create, connection_payload, spl_search):
        connection_title = connection_payload['title']
        correlation_search_title = self.get_correlation_search_name(connection_title)
        payload = self.create_correlation_search_payload(connection_payload, spl_search)
        if create:
            logger.info("About to create correlation search: {}".format(correlation_search_title))
            self.correlation_interface.create(payload, raise_if_exist=True)
        else:
            logger.info("About to update correlation search: {}".format(correlation_search_title))
            self.correlation_interface.update(object_id=correlation_search_title, data=payload)

    def create_or_update_connection(self, connection_payload, create_connection=False):
        """
        Create or update a correlation search for the given connection payload.

        @type connection_payload: dict
        @param connection_payload: connection payload json for the connection being created or updated. Along with
            other parameters, it will also contain field mappings.

        @type create_connection: bool
        @param create_connection: whether the connection is being created or updated. If set to True, a new
            correlation search will be created for it. If False, the existing correlation search for the connection
            will be updated.

        @rtype: None
        @return: None
        """
        connection_action = "create" if create_connection else "update"
        try:
            # Even for partial updates, for example activating or deactivating the connection,
            # itsi_data_integration sends the whole object. We can use the regular flow of updating
            # the whole correlation search instead of having a separate flow for enabling or disabling the CS
            logger.info("Received request to {} connection: {}".format(connection_action, connection_payload))
            EventOnboardingUtils.validate_connection_payload(connection_payload)
            title = connection_payload['title']
            self.init_dedup_searches()
            spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=False)
            logger.info("Created spl search for connection. title=%s spl_search=%s", title, spl_search)
            self.create_or_update_correlation_search(create_connection, connection_payload, spl_search)
        except Exception as e:
            title = connection_payload['title'] if 'title' in connection_payload else ''
            message = "Unable to {} connection {}. Error message: {}".format(connection_action, title, e)
            logger.error(message)
            raise Exception(message)

    @staticmethod
    def wait_for_job(search_job, maxtime=10):
        """
        Wait up to maxtime seconds for search_job to finish. If maxtime is negative, wait forever.
        Returns True if the job finished.
        """
        pause = 0.2
        lapsed = 0.0
        while not search_job.is_done():
            time.sleep(pause)
            lapsed += pause
            if maxtime >= 0 and lapsed > maxtime:
                logger.info("Done waiting for job to be finished. maxtime=%s lapsed=%s", maxtime, lapsed)
                break
        return search_job.is_done()

    def get_search_job(self, search_query, earliest_time='-24h', latest_time='now'):
        """
        Creates a search job.
        """
        service = ITOAInterfaceUtils.service_connection(self.splunkd_session_key, app_name="SA-ITOA")
        current_search_job = service.jobs.create(
            search_query,
            earliest_time=earliest_time,
            latest_time=latest_time
        )
        return current_search_job

    def wait_for_search(self, search_query, earliest_time='-24h@h', latest_time='now'):
        """
        Create a search job and wait for it to complete. Return the list of results from the search job.
        """
        try:
            search_job = self.get_search_job(search_query, earliest_time, latest_time)
            if not self.wait_for_job(search_job, self.preview_results_search_wait_time):
                raise Exception("Search for data integration connection preview results timed out.")
            params = {
                'output_mode': 'json',
                'count': 0
            }
            search_results = search_job.results(**params)
            results_to_return = []
            for entry in search_results:
                if isinstance(entry, (bytes, bytearray)):
                    entry = entry.decode('utf-8')
                entry_json = json.loads(entry)
                if 'results' in entry_json:
                    for row in entry_json['results']:
                        results_to_return.append(row)
                else:
                    logger.error('No results field found in response: {}'.format(entry_json))
            logger.info('Total number of events retrieved from preview search: {}'.format(len(results_to_return)))
            return results_to_return
        except Exception as e:
            message = "Error occurred while getting preview results: {}".format(e)
            logger.error(message)
            raise Exception(message)

    @staticmethod
    def get_mapped_field_names(connection_payload):
        """
        From the given connection payload, get the list of field names to be mapped.

        @type connection_payload: dict
        @param connection_payload: connection payload json for the connection to be created, updated, or previewed.
            The payload must contain a 'mapped_fields' key whose value holds the fields to be mapped.

        @rtype: list
        @return: list of unique field names which are going to be transformed
        """
        mapped_fields = connection_payload['mapped_fields']
        field_names = []
        for key in mapped_fields:
            if key not in field_names:
                field_names.append(key)
        return field_names

    def get_event_onboarding_conf(self):
        """
        Get the event onboarding configuration from the itsi_event_management.conf file. The configs will be stored
        in the class variables.

        @rtype: None
        @return: None
        """
        try:
            cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA')
            conf = cfm.get_conf('itsi_event_management')
            settings = conf.get('event_onboarding')
            self.preview_results_limit = int(settings['preview_results_limit'])
            self.preview_results_search_wait_time = float(settings['preview_results_search_wait_time'])
            self.valid_severities = cfm.get_conf('itsi_notable_event_severity')
            self.valid_status = cfm.get_conf('itsi_notable_event_status')
            logger.info('event_onboarding settings: {} preview_results_limit={} '
                        'preview_results_search_wait_time={} '.format(settings, self.preview_results_limit,
                                                                      self.preview_results_search_wait_time))
        except Exception as e:
            logger.error('Error occurred while getting configuration for event onboarding. Will use default '
                         'values for configs whose values were not fetched. Exception: {}'.format(e))

    def get_conf_label(self, result, conf_type):
        try:
            if conf_type == 'severity_id':
                return self.valid_severities.get(result, {}).get('label')
            if conf_type == 'status':
                return self.valid_status.get(result, {}).get('label')
        except ConfStanzaNotExistException:
            # Handle the specific exception and return a custom message
            return f"{conf_type} not found"
        except Exception as e:
            logger.error('Error occurred while getting configuration for event onboarding. Exception: {}'.format(e))

    def get_preview_results(self, connection_payload):
        """
        Get preview results for the given connection payload.

        @type connection_payload: dict
        @param connection_payload: connection payload json for the connection to be created or updated. Along with
            other parameters, it will also contain field mappings. Field mappings are mandatory for getting preview
            results.

        @rtype: str
        @return: list of events (in str format) for preview results. Each event will have the following fields:
            transformed_fields, _raw, and _time.
            transformed_fields contains the new fields which are created via the mapped_fields and their
            corresponding values.
            _raw is the original raw event.
            _time is the time of the event.

        Example of results returned:
        [
            {
                "transformed_fields": {
                    "status": "1",
                    "title": "aaabbb_ccc",
                    "severity": "5",
                    "owner": "abc_my_owner"
                },
                "_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_1596\", \"event_severity\": \"blah:5-wow\", \"status_field\": 1, \"host\": \"abc\"}",
                "_time": "1711392312.9779332"
            },
            {
                "transformed_fields": {
                    "status": "3",
                    "title": "aaabbb_ccc",
                    "severity": "5",
                    "owner": "abc_my_owner"
                },
                "_raw": "{\"field1\": \"aaa\", \"field2\": \"bbb\", \"field3\": \"ccc\", \"description\": \"blah_description_4647\", \"event_severity\": \"blah:5-wow\", \"status_field\": 3, \"host\": \"abc\"}",
                "_time": "1711391629.354221"
            }
        ]
        """
        logger.info("Received request to preview events. Connection payload: {}".format(connection_payload))
        self.validate_connection_payload(connection_payload, is_preview=True)
        self.get_event_onboarding_conf()
        self.init_dedup_searches()
        spl_search = self.generate_spl_for_indexed_connection(connection_payload, is_preview=True)
        search_query = "| search {}".format(spl_search)
        earliest_time = connection_payload['earliest_time'] if 'earliest_time' in connection_payload else '-24h@h'
        latest_time = connection_payload['latest_time'] if 'latest_time' in connection_payload else 'now'
        logger.info("Created search query to preview events for connection. search_query=%s", search_query)
        search_results = self.wait_for_search(search_query, earliest_time, latest_time)
        transformed_field_names = self.get_mapped_field_names(connection_payload)
        decoded_search_results = []
        for result in search_results:
            item = {"transformed_fields": {}}
            for name in transformed_field_names:
                if name in result:
                    if name.lower() == 'severity_id' or name.lower() == 'status':
                        item["transformed_fields"][name] = self.get_conf_label(result[name], name.lower())
                        continue
                    item["transformed_fields"][name] = result[name]
                else:
                    logger.error("Could not find the following field in search results: {}".format(name))
            if "tmp_preview_raw" in result:
                item["_raw"] = result["tmp_preview_raw"]
            else:
                logger.error('Could not find _raw field in the search results')
            # Return epoch_time as _time because we want to return _time in epoch format
            if "epoch_time" in result:
                item["_time"] = result["epoch_time"]
            else:
                logger.error('Could not find epoch_time field in the search results')
            decoded_search_results.append(item)
        logger.info('Number of events to be returned for preview: {}'.format(len(decoded_search_results)))
        # the interface handler is expecting a string response
        json_str = json.dumps(decoded_search_results)
        return json_str

    def delete_connection(self, connection_id, itsi_data_integration_interface):
        """
        Deletes the dependencies of the given connection_id.

        @type connection_id: str
        @param connection_id: _key of the connection which will be deleted

        @type itsi_data_integration_interface: ItsiDataIntegration
        @param itsi_data_integration_interface: used to access the itsi_data_integration collection

        @rtype: None
        @return: None
        """
        cs_search_name = ""
        try:
            logger.info("Received request to delete connection. connection_id={}".format(connection_id))
            connection_object = itsi_data_integration_interface.get(connection_id)
            cs_search_name = self.get_correlation_search_name(connection_object['title'])
            self.correlation_interface.delete(cs_search_name)
        except ResourceNotFound as re:
            logger.info("Correlation Search '{}' not found. Exception: {}".format(cs_search_name, re))
        except Exception as e:
            message = "Unable to delete the correlation search for connection_id: {}. Error: {}".format(
                connection_id, e)
            logger.error(message)
            raise Exception(message)

    def get_search_from_macros_conf(self, macro_name):
        try:
            uri_string = ('/servicesNS/{}/{}/properties/macros/'
                          '{}/definition').format(self.user, self.app, macro_name)
            uri = safeURLQuote(uri_string)
            res, content = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json'},
                                                     sessionKey=self.splunkd_session_key)

            logger.info('Result of getting %s from macros.conf: response=%s content=%s', macro_name, res, content)

            if res.status not in [200, 201]:
                logger.error('Error in getting %s from macros.conf. response=%s content=%s', macro_name, res, content)
                return None
            if not content:
                logger.error('%s from macros.conf was not returned', macro_name)
                return None
            if isinstance(content, (bytes, bytearray)):
                content = content.decode('utf-8')
            return content
        except Exception as e:
            logger.error('Error in getting %s from macros.conf. Exception=%s', macro_name, e)
            return None

    def init_dedup_searches(self):
        """
        Retrieve the dedup searches from macros.conf.
        """
        dedup_raw_alert_definition = self.get_search_from_macros_conf('dedup_search_for_raw_alert')
        if dedup_raw_alert_definition:
            self.dedup_search_for_raw_alert = dedup_raw_alert_definition
            logger.info('Successfully fetched dedup_search_for_raw_alert from macros.conf. '
                        'dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert)
        else:
            logger.error('Error in fetching dedup_search_for_raw_alert from macros.conf. '
                         'Will use the default value. dedup_search_for_raw_alert=%s', self.dedup_search_for_raw_alert)

        dedup_notable_event_definition = self.get_search_from_macros_conf('dedup_search_for_notable_event')
        if dedup_notable_event_definition:
            self.dedup_search_for_notable_event = dedup_notable_event_definition
            logger.info('Successfully fetched dedup_search_for_notable_event from macros.conf. '
                        'dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event)
        else:
            logger.error('Error in fetching dedup_search_for_notable_event from macros.conf. Will use the '
                         'default value. dedup_search_for_notable_event=%s', self.dedup_search_for_notable_event)

    @staticmethod
    def quote_field_if_needed(field_name):
        # Quote the field if it contains special characters (currently just periods;
        # extend special_chars to cover others such as parentheses if needed)
        special_chars = ['.']
        if any(char in field_name for char in special_chars):
            return f"'{field_name}'"  # Wrap the field name in single quotes for Splunk
        return field_name

    @staticmethod
    def format_value(value):
        if isinstance(value, str):
            if value.startswith("{") and value.endswith("}"):
                field_name = value[1:-1]  # Remove curly braces for field references
                return EventOnboardingUtils.quote_field_if_needed(field_name)
            return f'"{value}"'
        if isinstance(value, bool):
            return str(value).lower()  # Convert boolean to lowercase string
        return str(value)
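
    # Illustrative sketch (not in the original source): expected behavior of
    # format_value / quote_field_if_needed, derived from the implementations above.
    #
    # >>> EventOnboardingUtils.format_value('{Source.title}')
    # "'Source.title'"
    # >>> EventOnboardingUtils.format_value('{host}')
    # 'host'
    # >>> EventOnboardingUtils.format_value('warning')
    # '"warning"'
    # >>> EventOnboardingUtils.format_value(True)
    # 'true'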

    def construct_case_clause_string(self, clauses):
        """
        Function to construct a logical condition string.

        @param clauses: if/else conditions from the payload for the selected mapping field
        @return: the clause strings joined with " AND "
        """
        clause_strings = []
        for clause in clauses:
            if 'field' not in clause or 'operator' not in clause:
                raise KeyError('Each clause must contain "field" and "operator" keys')
            field = clause["field"]
            operator = clause["operator"]  # Todo: decide on operator naming convention with UI team
            value = clause.get("value", "")
            case_sensitive = clause.get("case_sensitive", True)

            if operator == "is not null":
                clause_strings.append(f'isnotnull({field})')
            elif operator == "is null":
                clause_strings.append(f'isnull({field})')
            elif operator in ["==", "!=", ">", "<", ">=", "<="]:
                if not case_sensitive:
                    if isinstance(value, str):
                        clause_strings.append(f'lower({field}) {operator} "{value.lower()}"')
                    else:
                        clause_strings.append(f'lower({field}) {operator} {value}')
                else:
                    clause_strings.append(f'{field} {operator} "{value}"')

        return " AND ".join(clause_strings).strip()

    def generate_case_statement(self, values):
        """
        Function to process the JSON structure and generate a case statement.

        @param values: values from the selected mapping field
        @return: SPL case(...) statement string
        """
        case_statements = []

        for condition in values:
            if 'condition' not in condition:
                raise KeyError('Missing "condition" key in one of the values')
            if 'outcomes' not in condition:
                raise KeyError('Missing "outcomes" key in one of the values')
            if condition['condition'] in ['IF', 'ELSE_IF'] and 'clauses' not in condition:
                raise KeyError('Missing "clauses" key for IF or ELSE_IF condition')

            condition_type = condition["condition"]
            clauses = self.construct_case_clause_string(condition.get("clauses", []))
            outcomes = condition["outcomes"]

            if not isinstance(outcomes, list):
                raise TypeError('Outcomes should be a list')

            outcome_str = ' . '.join(self.format_value(outcome) for outcome in outcomes)

            if condition_type == "IF" or condition_type == "ELSE_IF":
                case_statements.append(f'{clauses}, {outcome_str}')
            elif condition_type == "ELSE":
                case_statements.append(f'1=1, {outcome_str}')

        return f'case({", ".join(case_statements)})'
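
    # Illustrative sketch (not in the original source): for a mapping such as
    # [{'condition': 'IF',
    #   'clauses': [{'field': 'vendor_severity', 'operator': '==', 'value': 'CRITICAL'}],
    #   'outcomes': [6]},
    #  {'condition': 'ELSE', 'outcomes': [1]}]
    # generate_case_statement produces:
    #   case(vendor_severity == "CRITICAL", 6, 1=1, 1)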

    def generate_coalesce_statement(self, field):
        """
        Function to process the JSON structure and generate a coalesce statement.

        @param field: mapping field details from the payload
        @return: coalesce statement
        """
        if 'values' not in field:
            raise KeyError('Missing "values" key in the input data')
        values = field['values']
        if not isinstance(values, list):
            raise TypeError('The "values" key must be a list')
        coalesce_values = []
        for value in values:
            if isinstance(value, list):
                concatenated_value = " . ".join(self.format_value(item) for item in value)
                coalesce_values.append(concatenated_value)
            else:
                coalesce_values.append(self.format_value(value))

        if 'default_value' in field:
            default_value = field['default_value']
            coalesce_values.append(self.format_value(default_value))

        coalesce_statement = f'coalesce({", ".join(coalesce_values)})'
        return coalesce_statement
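
    # Illustrative sketch (not in the original source): for a mapping field such as
    # {'values': [['{vendor_title}', ':', '{host}'], '{title}'], 'default_value': 'unknown'}
    # generate_coalesce_statement produces:
    #   coalesce(vendor_title . ":" . host, title, "unknown")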

    def migrate_connection_objects(self, version_number):
        """
        Migration router for connection objects.

        @param version_number: migration handler version number
        @return: list of transformed connection objects
        """
        if version_number == '4.20.0':
            old_connection_objects = self.get_all_connection_objects()

            transformed_connection_objects = self.transform_connection_objects(old_connection_objects)
            self.update_all_connection_objects(transformed_connection_objects)
            return transformed_connection_objects
        return []

    def get_all_connection_objects(self):
        """
        Gets all data integration connection objects from the KV store.

        @return: list of connection objects
        """
        return self.connection_storage.get_all(self.splunkd_session_key,
                                               self.user,
                                               'data_integration')

    def update_all_connection_objects(self, connection_objects):
        """
        Updates the KV store with connection objects.

        @param connection_objects: list of data integration connection objects
        @return: None
        """
        for connection in connection_objects:
            connection_key = connection['_key']
            self.connection_storage.delete(self.splunkd_session_key,
                                           'nobody',
                                           'data_integration',
                                           connection_key)
            self.connection_storage.create(self.splunkd_session_key,
                                           'nobody',
                                           'data_integration',
                                           connection)

    def get_template_mapping_fields_dict(self):
        """
        Retrieves template mapping_fields from conf.

        @return: dictionary of template mapping_fields for each data type
        """
        existing_templates = ['generic', 'nagios', 'solarwinds', 'scom', 'o11y', 'appdynamics', 'thousandeyes',
                              'cloudtrail']
        template_mapping_fields_dict = {}
        cfm = ConfManager(self.splunkd_session_key, 'SA-ITOA')
        conf = cfm.get_conf('itsi_data_integration_template')
        for template_name in existing_templates:
            template = conf.get(template_name)
            mapping_fields = json.loads(template.get('mapping_fields'))
            template_mapping_fields_dict[template_name] = mapping_fields
        return template_mapping_fields_dict

    def transform_connection_objects(self, connection_objects):
        """
        Function to transform connection objects to support coalesce and case.

        @param connection_objects: list of connection objects
        @return: list of updated connection objects
        """
        fields_to_update = ['severity_id', 'subcomponent', 'itsiDrilldownSearchName', 'itsiDrilldownEarliestOffset',
                            'itsiDrilldownLatestOffset', 'itsiDrilldownWebName', 'itsiDrilldownWebURL']

        template_mapping_fields_dict = self.get_template_mapping_fields_dict()
        # update connection objects
        updated_connection_objects = []
        for connection in connection_objects:
            if 'is_out_of_the_box' in connection and connection['is_out_of_the_box'] == 1:
                continue
            connection_id = connection['_key']
            logger.info(f'Migrating connection {connection_id}')
            connection_data_source = connection['data_source']
            cur_template_mapping_fields = template_mapping_fields_dict[connection_data_source]
            cur_connection_mapped_fields = connection['mapped_fields']
            for mapping in cur_template_mapping_fields:
                field = mapping['name']
                if field not in fields_to_update and not mapping['required']:
                    continue
                # Check if the required field exists in the connection.
                # If the field does not exist in the connection, apply default case/coalesce values to the mapping
                if field not in cur_connection_mapped_fields:
                    updated_mapped_field = mapping.copy()
                    if ('input_type' in updated_mapped_field and updated_mapped_field['input_type'] == 'mapping_rule'
                            and updated_mapped_field['rule_type'] == 'case'):
                        for value in updated_mapped_field['values']:
                            outcome = value['outcomes'][0]
                            if isinstance(outcome, dict):
                                value['outcomes'] = [outcome['value']]
                    updated_mapped_field.pop('required', None)
                    cur_connection_mapped_fields[field] = updated_mapped_field
            connection['mapped_fields'] = cur_connection_mapped_fields
            logger.info(f'Transformed connection {connection_id} payload: {connection}')
            updated_connection_objects.append(connection)

        return updated_connection_objects