# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import time from ITOA.itoa_common import is_valid_dict, is_valid_list, is_valid_str, validate_json from ITOA.storage import itoa_storage from ITOA.setup_logging import logger from .notable_event_utils import Audit, MethodType, NotableEventConfiguration class EventManagementException(Exception): pass def time_function_call(fx): """ This decorator will provide a log message measuring how long a function call took. Arguments: fx -- The function to measure """ def wrapper(*args, **kwargs): logger.debug("Started operation=%s", fx.__name__) t = time.time() r = fx(*args, **kwargs) logger.info('[Change Tracking] Successfully called operation="%s"', fx.__name__) diff = time.time() - t diff_string = str(round(diff, 2)) + " seconds" logger.info('[Performance Tracking] Completed Notable Event operation="%s", duration="%s"', fx.__name__, diff_string) return r return wrapper class BaseEventManagement(object): """ A generic class which has CURD operation and bulk curd operation to perform for any object level """ # Key which hold id id_key = '_key' def __init__(self, session_key, collection, object_type, user='nobody', current_user_name=None, action_dispatch_config=None): """ Initialize objects @type session_key: basestring @param session_key: session_key @type collection: basestring @param collection: collection name @type object_type: basestring @param object_type: object type @type user: basestring @param user: user name @type current_user_bame: basestring @param current_user_name: user name @type action_dispatch_config: ActionDispatchConfiguration @param action_dispatch_config: the setting for hybrid action dispatch @rtype: object @return: instance of the class """ if not is_valid_str(session_key): message = "Invalid session key." logger.error(message) raise ValueError(message) else: self.session_key = session_key self.action_dispatch_config = action_dispatch_config self.audit = None self.notable_event_configuration = None self.host_base_uri = '' self.master_session_key = self.session_key self.owner = user self.current_user_name = current_user_name if not is_valid_str(collection): logger.error("Invalid collection name=%s", collection) raise ValueError('Invalid collection name') else: self.collection = collection self.object_type = object_type self.object_type_key = 'object_type' self.storage_interface = itoa_storage.ITOAStorage(collection=self.collection) def pre_processing(self, data_list, method): """ This is being used by inherit class which can be used to validate schema or inject some default values like time etc @type data_list: list @param data_list: data list @type method: basestring @param method: method name @return: None """ pass def get_filter_data(self, object_ids): """ return filter base upon _key @type: object_ids: list @param object_ids: object list @rtype: basestring @return: return filter string """ if is_valid_list(object_ids): return {'$or': [{self.id_key: object_id} for object_id in object_ids]} else: raise TypeError('%s is not list' % object_ids) def merge_filter_data(self, filter_data, new_data): """ Merge filter passed in request and filter created later Update filter_data in place @type filter_data: dict @param filter_data: filter data @type new_data: dict @param new_data: newly create filter @return: """ if not is_valid_dict(new_data) or not is_valid_dict(filter_data): raise TypeError('Invalid filter data to merge') for key, value in new_data.items(): if key in filter_data: filter_data[key].extend(value) else: filter_data[key] = value return validate_json('[event_management_interface]', filter_data) def get_user(self, **kwargs): """ Return user @param kwargs: dict which hold some configuration @rtype: basestring @return: return user """ return self.owner if self.owner else kwargs.get('owner') def fetch_filter_data(self, **kwargs): """ Check filter data in kwargs and return dict form of it @type kwargs: dict @param kwargs: kwargs @rtype: dict @return: return filter data """ filter_data = {} if kwargs.get('filter_data') and kwargs.get('filter') in kwargs: f_data_1 = kwargs.get('filter_data') f_data_2 = kwargs.get('filter') if f_data_1: f_data_1 = validate_json('[event_management_interface]', f_data_1) if f_data_2: f_data_2 = validate_json('[event_management_interface]', f_data_2) if f_data_1 is not None and filter_data is not None: filter_data = self.merge_data(f_data_1, f_data_2) else: filter_data = f_data_1 or f_data_2 else: filter_data = kwargs.get('filter') or kwargs.get('filter_data') if filter_data: filter_data = validate_json('[event_management_interface]', filter_data) return filter_data or {} def inject_object_type(self, data_list): """ Insert object type if it is not set @type data_list: list @param data_list: data list @return: in place update """ # make sure object type is set for data in data_list: if 'object_type' not in data: data['object_type'] = self.object_type elif data.get('object_type') != self.object_type: data['object_type'] = self.object_type @time_function_call def create(self, data, **kwargs): """ Create notable event object @type data - dict @param data - notable event schema to create @rtype dict @return create object _key or raise an exception """ if is_valid_dict(data): self.inject_object_type([data]) self.pre_processing([data], MethodType.CREATE) result = self.storage_interface.create( self.master_session_key, self.get_user(**kwargs), self.object_type, data, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) logger.debug("Create %s object ID=%s.", self.object_type, result.get(self.id_key)) return result else: message = "Data is not a valid dictionary, data type=%s." % type(data) logger.error(message) raise TypeError(message) def create_for_group(self, data, **kwargs): """ Create stuff for events in a Group. """ raise NotImplementedError('Derived class must implement this method') @time_function_call def create_bulk(self, data_list, **kwargs): """ Create more than one notable event object @type data_list: list @param data_list: data list @rtype: list @return: list of created """ try: validate_json('[Notable Event Curd]', data_list) except Exception as e: logger.exception(e) message = 'Invalid JSON list to do bulk create.' logger.error(message) raise TypeError(message) # make sure object type is set self.inject_object_type(data_list) self.pre_processing(data_list, MethodType.CREATE_BULK) results = self.storage_interface.batch_save( self.master_session_key, self.get_user(**kwargs), data_list, objecttype=self.object_type, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) return results @time_function_call def get(self, object_id, **kwargs): """ Get notable event object @type object_id: basestring @param object_id: notable event key @rtype: dict @return: return notable event schema """ if is_valid_str(object_id): result = self.storage_interface.get( self.master_session_key, self.get_user(**kwargs), self.object_type, object_id, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) return result else: message = 'Invalid key to get object, value=%s.' % object_id logger.error(message) raise TypeError(message) @time_function_call def get_bulk(self, object_ids, **kwargs): """ Get one or more than one notable event objects @type object_ids: list @param object_ids: list of objects to get Note: if object list is empty or not defined then get all objects @type kwargs: dict @param kwargs: extra arguments to fetch notable events @rtype: list @return: list of notable events """ filter_data = self.fetch_filter_data(**kwargs) if is_valid_list(object_ids) and len(object_ids) != 0: self.merge_filter_data(filter_data, self.get_filter_data(object_ids)) logger.debug('Updated filter data=%s', filter_data) fields = kwargs.get('fields', []) if not fields and kwargs.get('f'): fields = kwargs.get('f') kwargs['fields'] = fields if isinstance(fields, str) and ',' in fields: kwargs['fields'] = fields.split(',') limit = kwargs.get('count') skip = kwargs.get('offset') if limit is None and skip is None: # If count and offset are undefined, try limit and skip limit = kwargs.get('limit') skip = kwargs.get('skip') results = self.storage_interface.get_all( self.master_session_key, self.get_user(**kwargs), self.object_type, sort_key=kwargs.get('sort_key'), filter_data=filter_data, sort_dir=kwargs.get('sort_dir'), fields=kwargs.get('fields'), skip=skip, limit=limit, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) logger.debug("Return %s notable events", len(results)) return results def get_and_merge_data_list(self, object_ids, data_list, is_partial_update=True, **kwargs): """ Useful function to do partial update. Its merge data from backend with request data @type object_ids: list @param object_ids: list of objects to fetch from backend @type data_list: list @param data_list: data list which is passed in the request @type is_partial_update: bool @param is_partial_update: set to true it is partial update @param **kwargs: Key word arguments to provide additional args to those who override this method @rtype: list @return: Merged data """ results = self.get_bulk(object_ids, **kwargs) is_group_state_action = False if isinstance(data_list, list) and len(data_list) > 0: # if there is 'group_state_change_action' key in the value then this is a group state change action is_group_state_action = 'group_state_change_action' in data_list[0] logger.info('is_group_state_action=%s objects_to_modify=%s', is_group_state_action, object_ids) if (results is None or len(results) == 0) and not is_group_state_action: logger.error('Failed to get objects=%s from kv store', object_ids) raise EventManagementException('Failed to get objects=%s from kv store' % (str(object_ids))) mapped_objects = {} for data in data_list: if data.get(self.id_key) not in mapped_objects: mapped_objects[data.get(self.id_key)] = {'data': data, 'fdata': None} for result in results: if result.get(self.id_key) in mapped_objects: mapped_objects[result.get(self.id_key)]['fdata'] = result for value in mapped_objects.values(): if is_group_state_action: if value['fdata']: # updating an existing group fields_to_update = value['data'].get('fields_to_update', []) # remove the fields which don't have to be updated. Can't delete items of dict while iterating # hence getting a copy of the keys the dict's keys by converting the keys to list for key in list(value['data']): # don't delete the action_id. It's needed in data_list for logging purposes if key == 'action_id': continue if key not in fields_to_update: value['data'].pop(key, None) logger.info('group_state_change_action updating the group group_id=%s fields_to_update=%s', value['data'].get('_key', None), value['data']) else: # no existing group in KV Store, let's create a new one # remove fields_to_update and group_state_change_action because we don't want to insert these # fields in the collection value['data'].pop('fields_to_update', None) value['data'].pop('group_state_change_action', None) # no existing object in KV Store so let's create a new one by inserting the whole object passed value['fdata'] = value['data'] logger.info('group_state_change_action creating new group group_id=%s new_group_object=%s', value['data'].get('_key', None), value['data']) continue self.merge_data(value['fdata'], value['data'], is_partial_update) logger.debug('final_merged_data data_list=%s', data_list) return data_list def get_and_merge_data(self, object_id, data, is_partial_update=True, **kwargs): """ Similar function but it deals with one object instead of list Note: this is inplace update to data dict @type object_id: basestring @param object_id: object id @type data: dict @param data: data is passed in the request @type is_partial_update: bool @param is_partial_update: set to true it is partial update @param **kwargs: Key word arguments to provide additional args to those who override this method @rtype: dict @return: Merge data """ result = self.get(object_id) if result is None: errorMsg = "Failed to get object ID=%s from KV store." % object_id logger.error(errorMsg) raise EventManagementException(errorMsg) return self.merge_data(result, data, is_partial_update) def merge_data(self, fetched_data, data, is_partial_update=True): """ Helper function to merge request data with backend data Note: this is inplace update to data dict @type fetched_data: dict @param fetched_data: Fetch data from backend @param data: dict @param data: request data @type is_partial_update: bool @param is_partial_update: set to true it is partial update @rtype: dict @return: return updated data (inplace update to data) """ if data is None or fetched_data is None: logger.error("data or fetched data is None") raise EventManagementException('data or fetched data is None') for key, value in fetched_data.items(): if is_partial_update and key not in data: data[key] = value return data @time_function_call def update(self, object_id, data, is_partial_update=False, **kwargs): """ Update one notable event object @type object_id: basestring @param object_id: object id @type data: dict @param data: data @type is_partial_update: bool @param is_partial_update: flag to do partial update @type kwargs: dict @param kwargs: Extra parameters @rtype: dict @return: return dict which holds updated keys """ if is_valid_str(object_id): self.get_and_merge_data(object_id, data, is_partial_update, **kwargs) self.inject_object_type([data]) self.pre_processing([data], MethodType.UPDATE) result = self.storage_interface.edit( self.master_session_key, self.get_user(**kwargs), self.object_type, object_id, data, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) return result else: message = 'Object ID is not a valid string, value=%s.' % object_id logger.error(message) raise TypeError(message) @time_function_call def update_bulk(self, object_ids, data_list, is_partial_update=False, skip_get_merge=False, **kwargs): """ Perform update for one or more notable event objects @type object_ids: list @param object_ids: notable events @type data_list: list @param data_list: notable events @type is_partial_update: bool @param is_partial_update: flag for partial update @type skip_get_merge: bool @param skip_get_merge: flag for skipping calling get_and_merge_data_list or not @type kwargs: dict @param kwargs: Extra params to perform @rtype: list @return: update notable event schema """ if is_valid_list(object_ids): if not skip_get_merge: self.get_and_merge_data_list(object_ids, data_list, is_partial_update, **kwargs) self.inject_object_type(data_list) self.pre_processing(data_list, MethodType.UPDATE_BULK) results = self.storage_interface.batch_save( self.master_session_key, self.get_user(**kwargs), data_list, objecttype=self.object_type, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) return results else: message = 'Object IDs is not a valid list, value=%s.' % object_ids logger.error(message) raise TypeError(message) @time_function_call def delete(self, object_id, **kwargs): """ Delete notable event object from KV store @type object_id: basestring @param object_id: object id @type kwargs: dict @param kwargs: extra params @return: None """ if is_valid_str(object_id): self.pre_processing([{self.id_key: object_id}], MethodType.DELETE) logger.debug('Deleting %s:%s event', self.id_key, object_id) return self.storage_interface.delete( self.master_session_key, self.get_user(**kwargs), self.object_type, object_id, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) else: message = 'Id cannot be empty or invalid id=%s.' % object_id logger.error(message) raise TypeError(message) @time_function_call def delete_bulk(self, object_ids, **kwargs): """ Delete bulk @type object_ids: list @param object_ids: object list to delete @type kwargs: dict @param kwargs: extra params to delete @return: """ filter_data = self.fetch_filter_data(**kwargs) if is_valid_list(object_ids) and len(object_ids) != 0: self.merge_filter_data(filter_data, self.get_filter_data(object_ids)) if filter_data: filter_data = validate_json('[event_management_interface]', filter_data) if isinstance(object_ids, list): self.pre_processing([{self.id_key: eid for eid in object_ids}], MethodType.DELETE_BULK) logger.debug('Deleting events ids=%s, other arguments=%s', object_ids, filter_data) return self.storage_interface.delete_all( self.master_session_key, self.get_user(**kwargs), self.object_type, filter_data, current_user_name=self.current_user_name, host_base_uri=self.host_base_uri ) def send_activity_to_audit(self, data, activity, type): """ Send audit event to itsi_notable_audit index on episode updates, it will perform lazy initialization for Audit object. @type data: list or dict @param data: Content of event or list of events that is sent to audit index @type activity: list or str @param activity: String to describe the activity @type type: str @param type: Activity Type @return: """ if not self.audit: self.audit = Audit(self.session_key, audit_token_name='Auto Generated ITSI Notable Index Audit Token') if isinstance(data, list) and isinstance(activity, list): self.audit.send_activity_to_audit_bulk(data, activity, type) elif isinstance(data, dict) and isinstance(activity, str): self.audit.send_activity_to_audit(data, activity, type) else: logger.error('Unsupported data (value=%s) or activity (value=%s)', data, activity) def lazy_init_notable_event_configuration(self): """ Lazy initialize notable event configuration object. :return: """ if self.notable_event_configuration is None: if self.action_dispatch_config: self.host_base_uri = self.action_dispatch_config.remote_ea_mgmt_uri self.master_session_key = self.action_dispatch_config.get_master_host_session_key() # Perform lazy initialization self.notable_event_configuration = NotableEventConfiguration( self.master_session_key, logger, host_base_uri=self.host_base_uri )