You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

371 lines
16 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
from __future__ import absolute_import
import json
import itsi_py3
from ITOA import itoa_common as utils
from ITOA.event_management.notable_event_utils import MethodType
from ITOA.setup_logging import logger
from ITOA.event_management.event_onboarding_utils import EventOnboardingUtils
from splunk import ResourceNotFound
from .base_event_management import BaseEventManagement
from .notable_event_error import NotableEventBadRequest
class ItsiDataIntegration(BaseEventManagement):
"""
Implements ITSI data integration object
"""
def __init__(self,
session_key,
current_user_name=None,
collection="itsi_data_integration",
object_type="data_integration",
user='nobody', title_validation_required=True):
"""
Initialize a Data Integration
"""
# in addition to the schema, other meta data are persisted. these are:
self.mod_time_key = 'modified_time'
self.create_time_key = 'created_time'
self.created_by_key = 'created_by'
self.modified_by_key = 'modified_by'
self.status_by_key = 'status'
self.logger = logger
self.identifying_name_key = 'identifying_name'
self.title_validation_required = title_validation_required
self.title_key = 'title'
self.key = '_key'
super(ItsiDataIntegration, self).__init__(
session_key, collection, object_type, user, current_user_name
)
self.event_onboarding_utils = EventOnboardingUtils(session_key, current_user_name, user)
def pre_processing(self, data_list, method):
"""
Adds create time / modified time information in the data
based on method type.
@type data_list: list
@param data_list: list of data to validate and add time, user info etc
@type method: basestring
@param method: method type
@rtype: list
@return: returns updated data (does in-place replacement of input)
"""
if not isinstance(data_list, list):
raise TypeError('Data is not a valid list, data_list type is %s.', type(data_list).__name__)
for data in data_list:
self.logger.info('pre_processing method=%s data= %s', method, data)
if not isinstance(data, dict):
raise TypeError('Data is not a valid dictionary, data type=%s.', type(data).__name__)
session_user = self.current_user_name
user = session_user if session_user else self.owner
time_value = utils.get_current_utc_epoch()
if method != MethodType.DELETE:
if self.status_by_key in data and data[self.status_by_key]:
data[self.status_by_key] = data[self.status_by_key].lower()
# We require an identifying name field for objects we sort using the saved pages system
data[self.identifying_name_key] = str(data.get('title', '')).strip().lower()
if method == MethodType.CREATE:
logger.info("Received request to create connection: {}".format(data))
data[self.create_time_key] = time_value
data[self.created_by_key] = user
# create correlation search for the connection
self.event_onboarding_utils.create_or_update_connection(data, create_connection=True)
if method == MethodType.UPDATE:
logger.info("Received request to update connection: {}".format(data))
data[self.mod_time_key] = time_value
data[self.modified_by_key] = user
# update the correlation search for the connection
self.event_onboarding_utils.create_or_update_connection(data, create_connection=False)
if method == MethodType.DELETE:
logger.info("Received request to delete connection: {}".format(data))
if self.id_key not in data:
message = "Data does not contain '{}' field".format(self.id_key)
logger.error(message)
raise Exception(message)
self.event_onboarding_utils.delete_connection(data[self.id_key], self)
return data_list
def do_title_validation(self, connections, validate_name=True):
"""
Generic object validation routine.
Currently, it only consists of title related validation.
All new object level validation should be invoked from here...
@type connections: list[dict]
@param connections: list of dict
@return: None
"""
if not self.title_validation_required:
# Skip the below code as it is only used for title validation
return
for json_data in connections:
if not utils.is_valid_name(json_data.get('title', None)):
self.raise_error_bad_validation('Invalid title specified for the object_type: %s. '
'Cannot be empty and cannot contain = " or \'.' % self.object_type)
# for create and update case, validate_name is always set to true.
if validate_name:
self.validate_title(connections)
def validate_title(self, connections):
"""
Check for valid and unique names for the objects, stored in the identifying_name
@param connections:
@return:
"""
# First guard against duplicates within passed in objects
unique_names = set()
duplicate_names = set()
invalid_names = set()
name_filter = []
for json_data in connections:
title = str(json_data.get(self.title_key)).strip()
if utils.is_valid_name(title):
title = title if isinstance(title, str) \
else title.decode('unicode-escape')
title = title.lower()
if title not in unique_names:
unique_names.add(title)
else:
duplicate_names.add(title)
# Append to filter to identify existing objects later that have the same identifying name as this object
name_based_filter = {'$and': [
{'identifying_name': title},
]}
name_filter.append(name_based_filter)
else:
invalid_names.add(str(title))
if len(invalid_names) > 0:
self.raise_error_bad_validation(
'Names cannot contain equal and quote characters. List of invalid names: %s.' % ', '.join(
list(invalid_names))
)
del invalid_names
if len(duplicate_names) > 0:
self.raise_error_bad_validation(
'Object names must be unique for object type: %s. List of duplicate names: %s.' % (
self.object_type, ', '.join(list(duplicate_names)))
)
del duplicate_names
del unique_names
# Now guard against duplicates against saved objects
filter_data = self.add_filtering(name_filter, connections)
kwargs = {}
kwargs['fields'] = ['_key', 'identifying_name', 'title']
kwargs['filter'] = filter_data
persisted_objects = self.get_bulk([], **kwargs)
if isinstance(persisted_objects, list) and len(persisted_objects) > 0:
duplicate_titles = set(
[persisted_object.get('title', '') for persisted_object in persisted_objects])
self.raise_error_bad_validation(
'Duplicate object name(s) found: {}. Please rename the object(s) before proceeding.'.format(
', '.join(duplicate_titles))
)
del duplicate_titles
def do_update_title_validation(self, connections, method, object_ids):
"""
This method is to verify that the title remains unchanged during an update operation
@param connections: request payload
@param method: Operation type - update or update_bulk
@param object_ids: keys of the objects to update
@return:
"""
key_filter = []
for json_data in connections:
if json_data.get(self.title_key) is None:
continue
if method == MethodType.UPDATE:
json_data[self.key] = object_ids[0]
key = str(json_data.get(self.key)).strip()
kay_based_filter = {'$and': [
{'_key': key},
]}
key_filter.append(kay_based_filter)
if len(key_filter) == 0:
return
filter_data = self.add_filtering(key_filter, connections)
kwargs = {}
kwargs['fields'] = ['_key', 'identifying_name', 'title']
kwargs['filter'] = filter_data
persisted_objects = self.get_bulk([], **kwargs)
mismatched_titles = []
if isinstance(persisted_objects, list) and len(persisted_objects) > 0:
for obj in connections:
key = obj.get('_key')
if obj.get(self.title_key) is None:
continue
title = obj.get('title')
matching_persisted_object = next(
(persisted_obj for persisted_obj in persisted_objects if persisted_obj.get('_key') == key), None)
if matching_persisted_object:
persisted_title = matching_persisted_object.get('title')
if persisted_title != title:
mismatched_titles.append(title)
if len(mismatched_titles) > 0:
self.raise_error_bad_validation(
'Title can not be changed. The following objects have different titles: {}.'.format(
', '.join(mismatched_titles))
)
def raise_error_bad_validation(self, message):
"""
@param message:
@return:
"""
raise NotableEventBadRequest(message)
def add_filtering(self, filter_data, objects=None):
"""
add filtering. Can be extended to add additional filtering
@type filter_data: string
@param filter_data: filter value
@type objects: list of dictionary
@param filter_data: list of itoa_objects
@return: dictionary of filter data
"""
return {'$or': filter_data}
def create(self, connection, **kwargs):
"""
create a Data Integration Connection
@type connection: dict/json type string
@param connection: data integration connection
@type kwargs: dict
@param kwargs: other k-v arguments which will never be used.
Mentioned here because of the way we wire things up.
"""
self.do_title_validation([connection])
result = super(ItsiDataIntegration, self).create(connection, **kwargs)
return result
def update(self, object_id, data, is_partial_update=False, **kwargs):
"""
update a Data Integration connection
@param object_id:
@param data:
@param is_partial_update:
@param kwargs:
@return:
"""
self.do_update_title_validation([data], MethodType.UPDATE, [object_id])
result = super(ItsiDataIntegration, self).update(object_id, data, is_partial_update, **kwargs)
return result
def update_bulk(self, object_ids, data_list, is_partial_update=False, skip_get_merge=False, **kwargs):
"""
update a Data Integration connections
@param object_ids:
@param data_list:
@param is_partial_update:
@param skip_get_merge:
@param kwargs:
@return:
"""
self.do_update_title_validation(data_list, MethodType.UPDATE_BULK, object_ids)
result = super(ItsiDataIntegration, self).update_bulk(object_ids, data_list, is_partial_update, skip_get_merge, **kwargs)
return result
def get(self, connection_id, **kwargs):
"""
@type policy_id: basestring
@param policy_id: a unique id for this given policy
@type kwargs: dict
@param kwargs: other k-v arguments which will never be used.
Mentioned here because of the way we wire things up.
"""
# no validations needed here
ret = super(ItsiDataIntegration, self).get(connection_id, **kwargs)
if not ret:
raise ResourceNotFound("Object %s does not exist." % connection_id)
return ret
def get_bulk(self, connection_ids, **kwargs):
"""
@type connection_ids: list
@param connection_ids: list of connection ids to fetch
@type kwargs: dict
@param kwargs: might contain filter criteria keyed by `filter_data`
"""
# no validations needed here
fields = kwargs.get('fields', None)
if type(fields) in itsi_py3.ext_string_type:
fields = fields.split(',')
if "object_type" not in fields:
fields.append("object_type")
kwargs['fields'] = fields
return super(ItsiDataIntegration, self).get_bulk(connection_ids, **kwargs)
def get_summary(self, owner, **kwargs):
"""
Get a summary of all integrations across all data sources
@param owner: the owner of the object set
"""
fields = kwargs.get('fields', None)
filter_data = kwargs.get('filter')
if type(fields) in itsi_py3.ext_string_type:
fields = fields.split(',')
if "data_source" not in fields:
fields.append("data_source")
try:
if isinstance(filter_data, itsi_py3.string_type) and len(filter_data) > 0:
filter_data = json.loads(filter_data)
except (ValueError, TypeError) as exc:
logger.exception(exc)
raise NotableEventBadRequest(
"Could not parse filter as provided '%s'. Must be a valid JSON format." % filter_data)
# in finality, always ensure filter_data is a dict type
filter_data = None if not isinstance(filter_data, dict) else filter_data
all_integrations = self.get_bulk(owner,
filter_data=filter_data,
sort_key=kwargs.get('sort_key', None),
sort_dir=kwargs.get('sort_dir', None),
fields=fields)
# metadata is used to pass parameters to instrumentation
formatted_data = {}
for entry in all_integrations:
data_source = entry["data_source"]
if data_source not in formatted_data:
formatted_data[data_source] = {"data_source": data_source, "connections": []}
connection_data = {key: entry[key] for key in entry if key != "data_source"}
formatted_data[data_source]["connections"].append(connection_data)
final_output = list(formatted_data.values())
return final_output
def delete(self, object_id, **kwargs):
self.logger.info("Received request to delete connection: {}", object_id)
connection = self.get(object_id, **kwargs)
if 'is_out_of_the_box' in connection and connection['is_out_of_the_box'] == 1:
msg = 'User is not allowed to delete out of the box data integration connection: {}'.format(object_id)
self.logger.error(msg)
raise Exception(msg)
return super(ItsiDataIntegration, self).delete(object_id, **kwargs)
def delete_bulk(self, object_ids, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('delete_bulk',
self.object_type))