# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
The itsi base search object (and association) definition.
Used in what may be called "kpi templates"
It allows the users to abstract base searches away and make kpi creation easier
See: https://confluence.splunk.com/pages/viewpage.action?title=ITSI+KPI+Search+Performance+Enhancements&spaceKey=PROD
"""
from splunk.util import normalizeBoolean
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from .itsi_service import ItsiService
from .itsi_entity_filter import ItsiEntityFilterRule
from ITOA.itoa_factory import instantiate_object
from .itsi_kpi import BASE_SEARCH_KPI_ATTRIBUTES, BASE_SEARCH_METRIC_KPI_ATTRIBUTES, \
    DEFAULT_VALUE_KPI_ATTRIBUTES_DICT, validate_data_gaps_filling_options
from itsi.searches.itsi_shared_base_search import ItsiSharedAdhocSearch
from itsi.service_template.service_template_utils import ServiceTemplateUtils
from ITOA.setup_logging import logger


class ItsiKPIBaseSearch(ItoaObject):
    """
    Implements ITSI base search ItoaObject methods

    Updates on the base search
    - If base search and other fields have been modified, then update the associated services
    - If the base search title has been modified, then update the base search association table
    - If the base search has been deleted, then delete the entries in the base search
      association table and update the associated services from "shared_base_search" to "adhoc"
    """

    log_prefix = '[ITSI Base Search] '

    def __init__(self, session_key, current_user_name):
        """
        @param session_key: The active splunkd session key
        @param current_user_name: The user initializing the objects
        """
        # Base searches live in the shared 'itsi_services' KV store collection
        # under the 'kpi_base_search' object type.
        super(ItsiKPIBaseSearch, self).__init__(session_key, current_user_name, 'kpi_base_search',
                                                collection_name='itsi_services', is_securable_object=True)

    def templatize(self, owner, object_id, req_source='unknown'):
        """
        Templatize given object id

        @type owner: basestring
        @param owner: context of the request `nobody` vs an actual user

        @type object_id: basestring
        @param object_id: unique identifier of an object to templatize

        @type req_source: basestring
        @param req_source: identified source initiating the operation.
        """
        logger.debug('Templatize request received for `%s`', object_id)
        kpi_template = super(ItsiKPIBaseSearch, self).templatize(owner, object_id, req_source)
        logger.debug('Base class templatize returned=`%s`.', kpi_template)
        logger.debug('Templatizing metrics for `%s`', object_id)
        # Strip the per-metric '_key' identifiers so that new keys get minted
        # when the template is instantiated.
        metrics = kpi_template.get('metrics', [])
        for metric in metrics:
            metric.pop('_key', None)
        kpi_template['metrics'] = metrics
        return kpi_template

    @staticmethod
    def kpi_attributes_compare(old_kpi, new_kbs):
        """
        Determine if the base search attributes are identical to KPIs
        If they are - return true, else return false

        @param: old_kpi - The old kpi base search referencing KPI from a service
        @param: new_kbs - The new kpi base search
        """
        # NOTE: Is it worth it to do this checking? Should I compress and loop through this part
        for attribute in BASE_SEARCH_KPI_ATTRIBUTES:
            if new_kbs.get(attribute) != old_kpi.get(attribute):
                return False
        return True

    @staticmethod
    def metric_attribute_compare(new_metric, old_metric):
        """
        Determine if two metrics are identical to KPIs
        If they are - return true, else return False
        """
        for attribute in BASE_SEARCH_METRIC_KPI_ATTRIBUTES:
            if new_metric.get(attribute) != old_metric.get(attribute):
                return False
        return True

    def _get_impacted_objects(self, owner, fetch_filter, impacted_object_type):
        """
        Impacted objects (Services or Base Service Templates) with kpi entries that contain
        the base search ids associated with the objects that are being passed in.

        NOTE: Users of this method should keep in mind that the whole service and service
        template will be passed in. So one or more KPIS may match and the consumer has the
        responsibility of figuring out which one is which

        @type owner: basestring
        @param owner: The owner of the objects (typically 'nobody')

        @type fetch_filter: list of dict
        @param fetch_filter: list of base search id maps that will be used as a filter to fetch
            impacted objects from kvstore.
            example: [{'kpis.base_search_id': <id1>}, {'kpis.base_search_id': <id2>}]

        @type impacted_object_type: basestring
        @param impacted_object_type: impact object type as defined in objects manifest

        @rtype: list of dict
        @return: list of impacted objects (services or service templates)
        """
        # All the items here are fresh - keyless. No matching services possible
        if len(fetch_filter) == 0:
            return []
        object_interface = instantiate_object(
            self.session_key,
            self.current_user_name,
            impacted_object_type,
            logger=logger
        )
        # get services containing KPIs of type shared_base & who have base
        # search ids that match us
        existing_objects = object_interface.get_bulk(
            owner,
            filter_data={
                "$and": [
                    {'kpis.search_type': 'shared_base'},
                    {'$or': fetch_filter}
                ]
            }
        )
        return existing_objects

    def _update_impacted_objects(self, updated_base_search, affected_objects, affected_object_dict,
                                 updated_objects_ids):
        """
        Update impacted objects with kpi base search updates. Impacted objects could be
        KPIs in services and service templates, using base search.

        @type updated_base_search: dict
        @param updated_base_search: updated kpi base search object

        @type affected_objects: dict
        @param affected_objects: map of impacted object id to list of affected kpi keys
            (one entry of the map produced by _get_base_search_to_impacted_objects_map)

        @type affected_object_dict: dict
        @param affected_object_dict: map of impacted object id to object

        @param updated_objects_ids: list of impacted services / service templates keys,
            which have already been updated. NOTE: mutated in place by this method so that
            repeated calls for different base searches do not emit the same object twice.
        @type updated_objects_ids: list of basestring

        @rtype: list of dict
        @return: list of updated impacted objects
        """
        if affected_objects is None:
            return []
        new_metrics = updated_base_search.get('metrics', [])
        updated_objects = []
        for object_id, affected_kpis in affected_objects.items():
            updated = False
            object = affected_object_dict[object_id]
            # used for logging purposes
            object_type_log_string = 'Service'
            if object.get('object_type') == 'base_service_template':
                object_type_log_string = 'Base Service Template'
            kpis = object.get('kpis')
            for kpi in kpis:
                if kpi.get('_key') not in affected_kpis:
                    continue
                base_search_id = kpi.get('base_search_id')
                if base_search_id is None:
                    continue  # Just skip
                if kpi.get('search_type') != 'shared_base':
                    logger.warning('%s %s has kpi %s referencing a shared base search, but has the wrong search type' %
                                   (object_type_log_string, object_id, kpi.get('_key')))
                    continue
                # Propagate metric index/name changes for metric-type base searches.
                # NOTE(review): direct ['metric_index']/['metric_name'] access assumes a
                # metric-type KPI always carries both keys — confirm against schema.
                is_metric_kbs = updated_base_search.get('is_metric', False)
                updated_kbs_metric = updated_base_search.get('metric', {})
                kpi_metric = kpi.get('metric', {})
                if is_metric_kbs and (updated_kbs_metric['metric_index'] != kpi_metric['metric_index'] or
                                      updated_kbs_metric['metric_name'] != kpi_metric['metric_name']):
                    updated = True
                    kpi_metric['metric_index'] = updated_kbs_metric['metric_index']
                    kpi_metric['metric_name'] = updated_kbs_metric['metric_name']
                # Apply the changes that dont have to do with base search metrics
                non_metric_changes = not self.kpi_attributes_compare(kpi, updated_base_search)
                if non_metric_changes:
                    updated = True
                    for attribute in BASE_SEARCH_KPI_ATTRIBUTES:
                        # Some fields in old shared base search are missing, populate defaults
                        if attribute in updated_base_search:
                            kpi[attribute] = updated_base_search.get(attribute)
                        else:
                            kpi[attribute] = DEFAULT_VALUE_KPI_ATTRIBUTES_DICT.get(attribute)
                # Now update metric fields
                metric_id = kpi.get("base_search_metric")
                if metric_id is None:
                    logger.error('%s %s has kpi %s has no base search metric - Aborting update' %
                                 (object_type_log_string, object_id, kpi.get('_key')))
                    continue
                metric_found = False
                for new_metric in new_metrics:
                    if new_metric['_key'].lower() != metric_id.lower():
                        # normalize metric key to lower case
                        continue
                    metric_found = True
                    if not self.metric_attribute_compare(new_metric, kpi):
                        updated = True
                        for metric_attribute in BASE_SEARCH_METRIC_KPI_ATTRIBUTES:
                            if metric_attribute not in new_metric:
                                logger.info('KPI metric in base_search_id: %s, metric_key: %s does not have the'
                                            ' following attribute: %s',
                                            kpi.get('base_search_id'),
                                            new_metric['_key'],
                                            metric_attribute
                                            )
                            kpi[metric_attribute] = new_metric.get(metric_attribute, '')
                    break
                # we should not hit this scenario for a service template. If a service template KPI is using a base
                # search metric, then we should not allow deletion of that metric.
                if not metric_found and object.get('object_type') != 'base_service_template':
                    # Metric is deleted from shared base search, convert KPI to adhoc
                    updated = True
                    if kpi.get('search_type') == 'shared_base':
                        if normalizeBoolean(kpi.get('is_metric', False)):
                            kpi['search_type'] = 'metric'
                            kpi['base_search'] = ''
                        else:
                            kpi['search_type'] = 'adhoc'
                        del kpi['base_search_id']
                        del kpi['base_search_metric']
                        # NOTE(review): 'metric_qualifier' is assumed to be present whenever
                        # 'is_metric' is — confirm, otherwise this raises KeyError.
                        if 'is_metric' in kpi:
                            del kpi['is_metric']
                            del kpi['metric_qualifier']
            # No updates to the base search, everything that was changed here
            # can be passed directly to the statestore
            if updated and object_id not in updated_objects_ids:
                updated_objects_ids.append(object_id)
                updated_objects.append(object)
        return updated_objects

    @staticmethod
    def _get_base_search_to_impacted_objects_map(impacted_objects):
        """
        Generates a base search id to impacted object id to kpis list map.

        @type impacted_objects: list of dict
        @param impacted_objects: list of impacted services or service templates

        @rtype: tuple
        @return: tuple of base search id to impacted object id to kpis list map
            and impacted object id to object map
        """
        # A dictionary of base search ids to to a dict of kpi id lists, see sample
        # {'bsid1':{'svc1':['kpi1','kpi2']}}
        bs_to_object_kpi_dict = {}
        object_dict = {}
        # I tried to do this all in one line, but it was a wee bit unreadable
        for obj in impacted_objects:
            object_dict[obj.get('_key')] = obj
            kpis = obj.get('kpis')
            for kpi in kpis:
                base_search = kpi.get('base_search_id')
                if base_search is not None:
                    # Add to the dict
                    if base_search not in bs_to_object_kpi_dict:
                        bs_to_object_kpi_dict[base_search] = {obj.get('_key'): [kpi.get("_key")]}
                    else:
                        object_map = bs_to_object_kpi_dict[base_search]
                        if obj.get('_key') not in object_map:
                            bs_to_object_kpi_dict[base_search][obj.get("_key")] = [kpi.get("_key")]
                        else:
                            bs_to_object_kpi_dict[base_search][obj.get("_key")].append(kpi.get("_key"))
        return bs_to_object_kpi_dict, object_dict

    def update_kpi_base_search(self, owner, objects, transaction_id=None):
        """
        The base searches have been updated; upsert OR update
        Perform the appropriate action within this context

        @type owner: basestring
        @param owner: A string representing the user making the change

        @type objects: list of dict
        @param objects: The base searches that we're changing

        @type transaction_id: basestring
        @param transaction_id: for transaction tracing
        """
        # Unfortunately, we need to determine the nature of the change for the objects
        id_array = []
        impacted_objects_fetch_filter = []
        base_searches = {}
        for obj in objects:
            if '_key' in obj:
                id_array.append({'_key': obj.get('_key')})
                impacted_objects_fetch_filter.append({'kpis.base_search_id': obj.get('_key')})
                base_searches[obj.get('_key')] = obj
        if len(id_array) == 0:
            return  # No existing associations, its all new stuff
        services = self._get_impacted_objects(owner, impacted_objects_fetch_filter, 'service')
        service_templates = self._get_impacted_objects(owner, impacted_objects_fetch_filter, 'base_service_template')
        if len(services) == 0 and len(service_templates) == 0:
            return  # No services / service templates / kpis are affected
        bs_to_service_kpi_dict, service_dict = self._get_base_search_to_impacted_objects_map(services)
        bs_to_service_template_kpi_dict, service_templates_dict = self._get_base_search_to_impacted_objects_map(
            service_templates
        )
        updated_services = []
        updated_service_ids = []
        updated_service_templates = []
        updated_service_template_ids = []
        # The meat of the method
        for nbs_key, nbs in base_searches.items():
            affected_services = bs_to_service_kpi_dict.get(nbs['_key'], {})
            affected_service_templates = bs_to_service_template_kpi_dict.get(nbs['_key'], {})
            updated_services.extend(
                self._update_impacted_objects(nbs, affected_services, service_dict, updated_service_ids)
            )
            updated_service_templates.extend(
                self._update_impacted_objects(nbs, affected_service_templates, service_templates_dict,
                                              updated_service_template_ids)
            )
        if updated_services:
            # OKAY! After all of that if we have a non-empty list of the services
            # That we need to update
            # These service updates need to go through the official channels, so that all the necessary
            # Cruft gets applied
            service_object = ItsiService(self.session_key, self.current_user_name)
            service_object.skip_service_template_update = True
            service_object.save_batch(owner, updated_services, False)
        if updated_service_templates:
            service_template_interface = instantiate_object(
                self.session_key,
                self.current_user_name,
                'base_service_template',
                logger=logger
            )
            # we do not really need to perform any dependencies update for service templates.
            # therefore, directly call backend batch save to update service templates
            service_template_interface.batch_save_backend(
                owner,
                updated_service_templates,
                transaction_id=transaction_id
            )

    def delete_kpi_base_search(self, owner, objects, req_source='unknown'):
        """
        The base searches listed have been deleted. Remove any service associations and save

        @param owner: The service/kpi owner
        @param objects: The base search objects that we'll be matching on
        """
        # For convenience, store the ids of the array
        id_array = []
        fetch_filter = []
        for obj in objects:
            if '_key' in obj:
                id_array.append(obj.get('_key'))
                fetch_filter.append({'kpis.base_search_id': obj.get('_key')})
        services = self._get_impacted_objects(owner, fetch_filter, 'service')
        if len(services) == 0:
            logger.debug('Zero services affected from deletion of base searches "%s".', objects)
            return
        for svc in services:
            for kpi in svc.get('kpis', []):
                if kpi.get('search_type') == 'shared_base' and kpi.get('base_search_id') in id_array:
                    # We have a match for the kpi that we need to edit. Change the method to adhoc
                    # And remove the base search id reference
                    if normalizeBoolean(kpi.get('is_metric', False)):
                        kpi['search_type'] = 'metric'
                        kpi['base_search'] = ''
                    else:
                        kpi['search_type'] = 'adhoc'
                    del kpi['base_search_id']
                    del kpi['base_search_metric']
                    # NOTE(review): same assumption as in _update_impacted_objects —
                    # 'metric_qualifier' presumed present alongside 'is_metric'; confirm.
                    if 'is_metric' in kpi:
                        del kpi['is_metric']
                        del kpi['metric_qualifier']
        # Update affected services & KPIs. They will need savedsearch entries
        # and such. Plus we should not have any need for title validations or
        # creating refresh jobs for these services.
        op = ItsiService(self.session_key, self.current_user_name)
        op.skip_service_template_update = True
        results = op.save_batch(owner, services, False, req_source=req_source,
                                ignore_refresh_impacted_objects=False)
        if len(results) != len(services):
            service_ids = [s['_key'] for s in services]
            logger.error('There was an error saving the services to the backend. Length of results=%s.'
                         'Length of services=%s', len(results), len(services))
            logger.debug('Incomplete Results=%s', results)
            logger.debug('Service_IDs=%s', service_ids)
            # Probably better here to keep the shared searches rather than delete them
            return
        entity_filter_rule_filter = []
        for bs_id in id_array:
            # Kind of a hack here, but I dont need or want to get the services or base search
            # So should delete_splunk_search should work it out
            ItsiSharedAdhocSearch(self.session_key, bs_id, base_search={}, services=[]).delete_splunk_search()
            # Also need to take care of the related/previously used entity filter rules
            entity_filter_rule_filter.append({'base_search_id': bs_id})
        if entity_filter_rule_filter:
            try:
                efr = ItsiEntityFilterRule(self.session_key, 'nobody')
                efr.delete_bulk('nobody', filter_data={'$or': entity_filter_rule_filter})
                # NOTE(review): bs_id here is the last id from the loop above even though
                # the bulk delete covered all of them — the log message is misleading.
                logger.info('Successfully deleted entity filter rules for base search with ID: "%s".', bs_id)
            except Exception as e:
                logger.exception(
                    'Caught exception when trying to delete entity filter rule relating to base search with id "{}": {}'
                    .format(bs_id, e)
                )
        return

    def _validate_metrics(self, metrics, is_metric_search):
        """
        Validate metrics objects:
        _ validate data gaps filling attribute (fill_gaps) for metrics in KPI Base Search.
        _ validate threshold attribute (threshold_field) for metrics in KPI Base Search.

        @type metrics: list of dict
        @param metrics: list of metric objects in kpi base search

        @param is_metric_search: indicates whether the kpi base search uses a metrics search
        """
        for metric in metrics:
            is_valid, msg = validate_data_gaps_filling_options(metric, object_type='kpi_base_search')
            if not is_valid:
                msg += ', metric_name="%s"' % metric.get('title')
                self.raise_error_bad_validation(logger, msg)
            # this most likely can happen when the metric is created while the base search is of type metric
            # then the base search is updated to type adhoc but the metric is missing a threshold field.
            if not is_metric_search and not metric.get('threshold_field'):
                self.raise_error_bad_validation(logger, 'Metric %s requires a threshold field.' % metric.get('title'))

    ##################################################################
    # ItoaObject specific methods
    #################################################################
    def identify_dependencies(self, owner, objects, method=CRUDMethodTypes.METHOD_UPSERT, req_source='unknown',
                              transaction_id=None, skip_local_failure=False):
        """
        Extended as a part of the contract for ItoaObject subclasses, this one makes sure that
        services depending on the KPIBaseSearch have updates issued for them
        See the ERD for information on what needs to be done here
        https://confluence.splunk.com/pages/viewpage.action?title=ITSI+KPI+Search+Performance+Enhancements&spaceKey=PROD
        """
        if method == CRUDMethodTypes.METHOD_DELETE:
            self.delete_kpi_base_search(owner, objects, req_source)
        # Dont really need change handlers here since post save will take care of most updates
        return False, []

    def post_save_setup(self, owner, ids, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                        transaction_id=None, skip_local_failure=False):
        """
        Extended as a part of the contract for ItoaObject subclasses, this one performs the optional
        step of creating/updating the saved searches that apply to them

        @param owner: The owner
        @type owner: string

        @param ids: An array of dicts in the form of {"_key":"<key>"}
        @type ids: list

        @param objects: The objects used in the create call
        @type objects: List of dicts

        @param req_source: Required source
        @type req_source: string

        @param method: The method string used to generate the data
        @type method: CRUDMethodType constant
        """
        if len(ids) != len(objects):
            raise Exception('Error getting the appropriate id array for object array')
        if method == CRUDMethodTypes.METHOD_UPDATE or method == CRUDMethodTypes.METHOD_UPSERT:
            # We're doing an upsert or update, for both of these we have more complicated adjustments to make
            self.update_kpi_base_search(owner, objects, transaction_id=transaction_id)
        for idx, base_search in enumerate(objects):
            bs_id = ids[idx]
            # ids entries may be {'_key': ...} dicts or plain key strings
            if isinstance(bs_id, dict):
                bs_id = bs_id['_key']
            search = ItsiSharedAdhocSearch(self.session_key, bs_id, base_search)
            try:
                search.create_splunk_search()
            except Exception as e:
                if skip_local_failure:
                    logger.warning('Local failure skipped: %s', e)
                else:
                    raise e

    def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                            transaction_id=None, skip_local_failure=False):
        """
        Any additional setup required before saving base search should be done here.

        @type owner: basestring
        @param owner: request owner. "nobody" or some username.

        @type objects: list
        @param objects: List of base search type objects

        @type req_source: basestring
        @param req_source: Source requesting this operation.

        @type method: basestring
        @param method: operation type. Defaults to upsert.

        @type transaction_id: basestring
        @param transaction_id: transaction id for end-end tracing.

        @rtype: None
        @return: Nothing
        """
        # Do not allow delete of kpi base search metric if it's used by service templates
        if method == CRUDMethodTypes.METHOD_UPDATE or method == CRUDMethodTypes.METHOD_UPSERT:
            # check if the metric is used by service template
            results = ServiceTemplateUtils(self.session_key, self.current_user_name)\
                .get_base_search_used_metric_not_deleted(objects)
            if not results:
                self.raise_error_bad_validation(logger, ('KPI base search metric cannot be deleted because it'
                                                         ' is being used by one or more service templates.'))
        # PBL-5603: changes made in this story, allow user to split KPI by a different entity field from
        # entity filtering field. As a part of this change, new field 'entity_breakdown_id_fields'
        # was added to KPI object. To guard against migration issues and cases where
        # 'entity_breakdown_id_fields' would be missing in kpi object, added following check. We fall back
        # to 'entity_id_fields', in cases when 'entity_breakdown_id_fields' is missing.
        for base_search in objects:
            # By the definition of macro "match_filter_entites", entity_id_fields of the kpi base search can take
            # only one value. Fail to create/update kpi base search if more than one value is provided.
            if base_search.get('is_service_entity_filter', False):
                base_id_fields = base_search.get('entity_id_fields', None)
                if base_id_fields:
                    if len(base_id_fields.split(",")) > 1:
                        logger.debug('User has specified a compound alias consisting of: {}.'.format(base_id_fields))
            if base_search.get('is_entity_breakdown', False):
                entity_breakdown_id_fields = base_search.get('entity_breakdown_id_fields', None)
                if entity_breakdown_id_fields is None or len(entity_breakdown_id_fields) == 0:
                    # Fall back to entity_id_fields when the breakdown field is absent (migration guard)
                    base_search['entity_breakdown_id_fields'] = base_search.get('entity_id_fields', '')
                    logger.debug('entity_breakdown_id_fields missing from base search object = {}. '
                                 'Setting it to entity_id_fields.'.format(base_search.get('_key')))
                else:
                    logger.debug('User has specified a compound alias consisting of: {}.'
                                 .format(entity_breakdown_id_fields))
            self._validate_metrics(base_search.get('metrics', []), normalizeBoolean(base_search.get('is_metric')))

    def can_be_deleted(self, owner, objects, raise_error=False, transaction_id=None):
        # Do not allow delete of kpi base search if it's used by service templates
        results = ServiceTemplateUtils(self.session_key, self.current_user_name)\
            .get_objects_not_used_by_service_templates(self.object_type, objects)
        if raise_error and not results:
            self.raise_error_bad_validation(logger, ('KPI base search cannot be deleted because it is'
                                                     ' being used by one or more service templates.'))
        return results