# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

from ITOA.setup_logging import InstrumentCall
from multiprocessing.pool import ThreadPool


def _assert_required_change_attrs(change, required_attrs):
    """
    Validation shared by the queued and synchronous change-object checks.

    @type change: dict
    @param change: The object describing the change that occurred.

    @type required_attrs: list
    @param required_attrs: names of the attributes that must be present in
        change with a value other than None

    :raises AttributeError if any required attribute is missing (or None), or
        if changed_object_key is not a list
    """
    # A key that is absent and a key explicitly set to None are both "missing".
    missing_attrs = [attr for attr in required_attrs if change.get(attr, None) is None]
    if missing_attrs:
        raise AttributeError('Missing the following required attributes: %s' % ','.join(missing_attrs))

    if not isinstance(change.get('changed_object_key'), list):
        raise AttributeError('Expecting changed_object_key to be of type list')


class ItoaChangeHandler(object):
    """
    ItoaChangeHandler - Abstract class defining the contract for a change handler

    Subclasses are expected to implement deferred(); the base implementation
    raises NotImplementedError.
    """

    # Number of worker threads in the lazily created internal pool.
    DEFAULT_THREAD_POOL_SIZE = 8

    # Internal pool, created on first use by _get_thread_pool() and then reused.
    _internal_thread_pool = None

    def __init__(self, logger, session_key, thread_pool=None):
        """
        @param logger: logger instance to use for all log messages
        @param session_key: session key to use to access splunk resources
        @param thread_pool: An optional existing thread pool for performing
            multi-threaded operations. If not provided, the internal pool held
            on ItoaChangeHandler is used (and created on first use).
        """
        super(ItoaChangeHandler, self).__init__()
        self.logger = logger
        self.session_key = session_key
        self._instrumentation = InstrumentCall(self.logger)
        # Called on ItoaChangeHandler explicitly (not on type(self)), so every
        # handler that falls back to the internal pool gets the same pool.
        self.thread_pool = thread_pool if thread_pool else ItoaChangeHandler._get_thread_pool()

    @classmethod
    def _get_thread_pool(cls):
        """
        Return the internal thread pool stored on cls, creating it with
        DEFAULT_THREAD_POOL_SIZE worker threads the first time it is requested.

        @return: the internal ThreadPool instance
        """
        if cls._internal_thread_pool is None:
            cls._internal_thread_pool = ThreadPool(processes=cls.DEFAULT_THREAD_POOL_SIZE)
        return cls._internal_thread_pool

    def deferred(self, change, transaction_id=None):
        """
        Determine the list of impacted objects from a specific change event
        And then perform any transformations that need to be applied so that
        we can put the system into a consistent state

        Note: If you make any changes to multiple objects, please use the bulk methods

        @type change: dict
        @param change: The object describing the change that occurred
            {
                _key: system generated key
                create_time: epoch time of the CUD event that occurred
                changed_object_key: [key(s) of the changed object(s)]
                changed_object_type: String identifier of the object type in the changed_object
                change_type: The type of change that occurred
                object_type: 'refresh_queue_job' (the value that
                    assert_valid_change_object enforces)
            }

        @type transaction_id: basestring
        @param transaction_id: id to correlate different function calls to a single transaction

        @return: A boolean indicating success or failure

        :raises NotImplementedError always in this base class; subclasses must override
        """
        raise NotImplementedError()

    def assert_valid_change_object(self, change):
        """
        Validate a change object that carries the refresh queue job fields
        (_key and object_type) in addition to the change description.

        @param change: The object describing the change that occurred.

        :raises AttributeError if any required fields are missing or values are of incorrect type
        """
        _assert_required_change_attrs(
            change,
            ['_key', 'create_time', 'changed_object_key', 'changed_object_type', 'change_type', 'object_type']
        )

        if change.get('object_type') != 'refresh_queue_job':
            raise AttributeError('Expecting object_type to equal "refresh_queue_job"')

    def assert_valid_change_object_synchronous(self, change):
        """
        Note: This method to be used if the call is made synchronously and not via the
        refresh queue job entry in the storage.

        @param change: The object describing the change that occurred.

        :raises AttributeError if any required fields are missing or values are of incorrect type
        """
        # refresh queue job key and object type will not be created for synchronous operation
        _assert_required_change_attrs(
            change,
            ['create_time', 'changed_object_key', 'changed_object_type', 'change_type']
        )

    def should_remove_duplicates(self, change):
        """
        Determine if duplicates should be removed for this type of change
        Can be overridden by subclasses, default to false

        @param change: The object describing the change that occurred

        @return: Boolean
        """
        return False