# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. """ Contains the main methods and base class used for generating a shared base search (a single saved search that works with multiple kpis) Currently we only have support for Adhoc searches TO THE FUTURE: if reasonable, make shared datamodel searches a separate class """ import itsi_py3 from splunk import ResourceNotFound from splunk.util import normalizeBoolean import ITOA.itoa_common as utils from ITOA.storage import itoa_storage from itsi.searches.itsi_filter import ItsiFilter from itsi.objects.itsi_entity_filter import ItsiEntityFilterRule from ITOA.setup_logging import logger from ITOA.saved_search_utility import SavedSearch class ItsiSharedAdhocSearch(utils.ItoaBase): """ Abstraction for anything related to ITSI Shared KPI base searches Key component here is search generation for multiple KPIs using a shared base search """ # Defined out here because of annoying kpi delete construction search_prefix = 'Indicator - Shared - ' entity_magic = "%ENTITY_FILTER%" def __init__(self, session_key, bs_id, base_search=None, services=None): """ Initializes the adhoc shared saved search @param session_key: The splunkd session key @param bs_id: The base search id @param base_search: An optional parameter of the base search associated with the key @param services: A list of service objects to be used instead of going to the kvstore @type services: list """ super(ItsiSharedAdhocSearch, self).__init__(session_key) self.backend = itoa_storage.ITOAStorage().get_backend(session_key) self.shared_base_search_id = bs_id if not isinstance(bs_id, itsi_py3.string_type): raise Exception('KPI base search ID must be a valid string.') if base_search is not None: # If it doesnt exist, dont throw the exception if base_search.get('_key', bs_id) != bs_id: raise Exception('KPI base search passed in does not match key passed in.') else: # We need to look up the base search ourselves base_search = self.backend.get(session_key, 'nobody', 'kpi_base_search', bs_id) if base_search is None: raise Exception('Cannot find KPI base search with ID: %s.' % bs_id) self.base_search = base_search # determine if it's a metric base search self.is_metric = normalizeBoolean(self.base_search.get("is_metric", False), False) # At this point we have the shared base search # Lookup all the services and kpis that use the shared base search if services is None: services = self.backend.get_all(session_key, 'nobody', 'service', filter_data={"$and": [{"kpis.base_search_id": bs_id}, {"kpis.search_type": "shared_base"}, {"kpis.enabled": 1}]}) if len(services) == 0 or services is None: # No services use this base search # This MIGHT be an error or a warning, but I'm going # To log it as warning for now, since this might be # a normal mode of operation logger.info("No services matched base search id %s" % bs_id) services = [] # Disambiguation reassignment # Create a structure that has all kpis from the services that we found # And that makes service lookups a bit easier self.kpis = {} self.services = {} for svc in services: kpis = svc.get("kpis") # This structure is a little easier to work with service_key = svc.get("_key") # Flag is kpis in service has shared search is_found = False for kpi in kpis: # Front end sometimes does not update base_search_id so check for # search type too if kpi.get("base_search_id") == bs_id and kpi.get('search_type') == 'shared_base': is_found = True if service_key not in self.kpis: self.kpis[service_key] = [kpi] else: self.kpis[service_key].append(kpi) if is_found: self.services[service_key] = svc # Per ITOA-4442 Make a dictionary of "aliases". We'll need to carry this around so that when the situation # arises. We can map to the appropriate metric self.metric_aliases = {} # PBL-5786: allow splitting of base search KPI by a different field than entity filter field. To support this, # we need to perform pre stats calculation using 'sistats' command, since, entity aggregate values depend on # multiple fields (breakdown and filter) self.diff_entity_filter_breakdown_fields = False if self.base_search.get("is_entity_breakdown", False) and \ self.base_search.get("is_service_entity_filter", False) and \ self.base_search.get("entity_id_fields", '') != self.base_search.get("entity_breakdown_id_fields", ''): self.diff_entity_filter_breakdown_fields = True self.unique_metric_tokens = {} self.at_least_one_fill_gaps_metric = False def upsert_service_for_preview(self, service): """ Used for previewing, upsert a service to the existing service list NOTE to be VERY careful with this, it can make invalid entries """ if service.get('_key') is None: # CAN BE NONE!! Unline other things self.services["NO-KEY"] = service else: self.services[service['_key']] = service @staticmethod def can_optimize_entities(search_string): """ Examines the base search and determines if we can insert the entity filter clause This is only a method because we may want to expand or fine tune the criteria Cause, yeah, its basic. @param """ # NOTE: It is also valid syntax if you have a subsearch followed by an entity filter optimization # Since we don't know if macros can be automatically optimized, we dont automatically optimize on those either if search_string.find("|") != -1 or search_string.find("`") != -1: # If its a compounded search string, return false return False return True @staticmethod def append_entity_filter(search_string, entity_filter, metric=False): """ Determines if and how we can optimize a search string with a given entity filter Looks for the magic number """ if ItsiSharedAdhocSearch.entity_magic in search_string: # NOTE: Replaces ALL instances of the magic string with the entity filter return search_string.replace(ItsiSharedAdhocSearch.entity_magic, entity_filter) optimization = ItsiSharedAdhocSearch.can_optimize_entities(search_string) if len(entity_filter) > 0: if optimization or metric: search_string += " " + entity_filter else: search_string += " | search " + entity_filter return search_string def get_saved_search_name(self): """ Return the name of the saved search TODO: Determine if this should be converted into a static method """ return 'Indicator - Shared - ' + self.shared_base_search_id + ' - ITSI Search' @staticmethod def generate_search_dispatch_times(alert_lag, search_alert_earliest): """ Generate the dispatch settings used to calculate the earliest and latest times to schedule the search. @param alert_lag: The number of seconds to skew the search range @type alert_lag: Int @param search_alert_earliest: The number of seconds back to search @type search_alert_earliest: Int @return: A dict of settings representing the dispatch times """ settings = {} if alert_lag == 0: # Real Time case we need to set latest time to now settings['dispatch.earliest_time'] = '-' + str(search_alert_earliest) + 's' settings['dispatch.latest_time'] = 'now' elif alert_lag <= 1800: # Normal Case, adjust search timing to account for the lag settings['dispatch.earliest_time'] = '-' + str(search_alert_earliest + alert_lag) + 's' settings['dispatch.latest_time'] = '-' + str(alert_lag) + 's' else: raise ValueError('Invalid alert_lag passed to saved search management, must be below 30 minutes') return settings ################################################################## # Splunk Search Methods ################################################################## def build_splunk_search(self, description=None): """ Create the shared base search parameters. These are the generic defaults passed in, and used by different searches @param description: An alternate string description supplied @type description: string @return: The parameters of the shared base search """ saved_search_settings = {} # Start off with the generic settings saved_search_settings['name'] = self.get_saved_search_name() if not isinstance(description, itsi_py3.string_type): saved_search_settings['description'] = 'Auto generated shared base search' else: saved_search_settings['description'] = description saved_search_settings['search'] = self.generate_shared_base_search_string() # Calculate the dispatch timing alert_lag = int(self.base_search.get('alert_lag', 30)) alert_earliest = int(self.base_search.get('search_alert_earliest', 5)) * 60 dispatch = ItsiSharedAdhocSearch.generate_search_dispatch_times(alert_lag, alert_earliest) saved_search_settings.update(dispatch) saved_search_settings['enableSched'] = '1' # Perform REST call to fetch kpi saved search scheduling settings sync_disabled = utils.get_saved_search_kpi_setting(self.session_key) # Regenerate a random cron every time in order to take into account a change in the alert period # Technically this means on save there is a potential for a kpi to execute slightly off rhythm at # the point of save if the start point of the cron changes for a 5 or 15 period kpi crontab = SavedSearch.generate_cron_schedule(self.base_search.get('alert_period', 5), sync_disabled) saved_search_settings['cron_schedule'] = crontab saved_search_settings['alert.suppress'] = '0' saved_search_settings['alert.track'] = '0' saved_search_settings['alert.digest_mode'] = '1' saved_search_settings['actions'] = 'indicator' saved_search_settings['action.indicator._itsi_base_search_id'] = self.shared_base_search_id return saved_search_settings # Create is also update def create_splunk_search(self, ignore_service_check=False, acl_update=True): """ Create the shared base search that is associated with the shared_base_search_id of this search @param ignore_service_check: Determine whether or not we should ignore the service check. If there are no associated services don't create the base search @type ignore_service_check: Boolean @return: The parameters of the shared base search """ if not ignore_service_check and len(self.services) == 0: logger.debug("Not issuing search for kpi_base_search=%s - No associated services" % self.shared_base_search_id) return True # First a very simple check, if we don't have settings = self.build_splunk_search() ret = SavedSearch.update_search(self.session_key, settings.get('name'), 'itsi', 'nobody', **settings) if ret: # Successfully updated the saved search logger.info("Successfully created/update saved search=%s", settings.get('name')) if acl_update: ret = SavedSearch.update_acl( self.session_key, settings.get('name'), 'nobody') if not ret: msg = 'ACL update failed for saved search %s. Manual update required.' % settings.get('name') logger.error(msg) else: # Search creation failed message = 'Failed to create saved search %s.' % settings.get('name') logger.error(message) raise Exception(message) return ret def delete_splunk_search(self): """ Remove the associated splunk shared base search associated with the base search id """ saved_search_name = self.get_saved_search_name() # Delete saved search for kpi ret = True try: ret = SavedSearch.delete_search(self.session_key, saved_search_name) except ResourceNotFound: logger.exception( 'Saved search "%s" was not found, ignoring delete', saved_search_name ) except Exception: logger.exception( 'Caught exception trying to delete saved search "%s"', saved_search_name ) ret = False if not ret: logger.error('Failed to delete saved search "%s".', saved_search_name) else: logger.info('Successfully deleted saved search "%s".', saved_search_name) return ret def get_splunk_search(self): """ Retrieves the splunk search associated with the base_search_id """ try: saved_search_name = self.get_saved_search_name() return SavedSearch.get_search(self.session_key, saved_search_name) except ResourceNotFound: logger.exception("Unable to splunk search: %s", saved_search_name) return None ################################################################## # Search String Generation ################################################################## def generate_shared_base_search_string(self): """ From the information provided on init, construct the shared base search string """ if self.is_metric: # Metric searches are not updated (as of 4.10.x) to support compound aliases return self.generate_shared_metric_search_string() base_search = self.base_search.get("base_search") is_service_entity_filter = normalizeBoolean(self.base_search.get("is_service_entity_filter", False), False) search_string = base_search if is_service_entity_filter: generated_entity_filter_rule = self.update_entity_filter(save_filter=True) entity_filter_string = '[ `lookup_shared_base_search_entity_filter_rule({base_search_id})` ' \ '| return $value ]'.format(base_search_id=self.shared_base_search_id) search_string = ItsiSharedAdhocSearch.append_entity_filter(search_string, entity_filter_string) logger.debug('Entity filter rule {} was created for base search id {}.' .format(generated_entity_filter_rule['_key'], self.shared_base_search_id)) else: efr = ItsiEntityFilterRule(self.session_key, 'nobody') efr_key = "EFR-" + self.shared_base_search_id entity_filter_rule_object = efr.get('nobody', efr_key) if entity_filter_rule_object is not None: efr.delete('nobody', efr_key) logger.debug("Successfully deleted entity filter rule object : %s", efr_key) entity_id_fields = self.base_search.get("entity_id_fields", "") self._identify_metrics() if self.base_search.get("is_entity_breakdown"): entity_breakdown_id_fields = self.base_search.get("entity_breakdown_id_fields", "") if isinstance(entity_breakdown_id_fields, str) and ',' in entity_breakdown_id_fields: is_compound_alias = True compound_alias_search_strings = self.generate_compound_alias_search_strings(entity_breakdown_id_fields) else: is_compound_alias = False compound_alias_search_strings = [] # ITOA-4442: first, separate metrics with duplicate threshold field and entity statop. # so that, we generate correct aggregate entity search. search_string += " | " + self.aggregate_raw_into_entity( pre_stats_operation=self.diff_entity_filter_breakdown_fields ) # Make sure we don't have any residual data coming through for the results search_string += " | eval serviceid=null() " search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' # we would have to perform lookup by entity filter field as well, # if entity filter and breakdown fields are different in a base search if self.diff_entity_filter_breakdown_fields: search_string += ' | `match_filter_entites(' + entity_id_fields + ', sec_grp)`' else: if is_compound_alias: search_string += ' | eval compound_pseudo_entity={compound_entity_title} ' \ '| `match_compound_entities("{entity_breakdown_ids}", sec_grp, ' \ '"{identifier_lookups}")`'.format( compound_entity_title=compound_alias_search_strings[0], entity_breakdown_ids=entity_breakdown_id_fields, identifier_lookups=compound_alias_search_strings[2]) else: search_string += ' | `match_entities(' + entity_breakdown_id_fields + ', sec_grp)`' if len(self.services) == 1 and not is_service_entity_filter: search_string += ' | eval serviceid="' + '","'.join(list(self.services.keys())) + '"' elif len(self.services) > 1 and not is_service_entity_filter: # Redundant, but clarifying search_string += ' | eval serviceid=mvappend("' + '","'.join(list(self.services.keys())) + '")' search_string += " | mvexpand serviceid" if self.diff_entity_filter_breakdown_fields: # generate stats command to aggregate entity, after pre stats calculation # and filter entity lookup is done above search_string += ' | ' + self.aggregate_raw_into_entity() search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' if is_compound_alias: search_string += ' | eval compound_pseudo_entity={compound_entity_title}'.format( compound_entity_title=compound_alias_search_strings[0]) search_string += ' | `match_breakdown_compound_alias_entities("{entity_breakdown_id_fields}", ' \ 'sec_grp, "{entity_identifiers}")`'\ .format(entity_breakdown_id_fields=entity_breakdown_id_fields, entity_identifiers=compound_alias_search_strings[2]) else: # This is the default from pre-Robin (4.10.x) search_string += ' | `match_breakdown_entities(' + entity_breakdown_id_fields + ', sec_grp)`' search_string += " | " + self.aggregate_entity_into_service() else: if is_service_entity_filter: search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' search_string += ' | `match_entities(' + entity_id_fields + ', sec_grp)`' search_string += ' | mvexpand serviceid' search_string += ' | ' + self.aggregate_raw_into_service() else: # Sacrificing condensed code for readability (I hope) search_string += ' | ' + self.aggregate_raw_into_service() if len(self.services) == 1: search_string += ' | eval serviceid="' + '","'.join(list(self.services.keys())) + '"' elif len(self.services) > 1: search_string += ' | eval serviceid=mvappend("' + '","'.join(list(self.services.keys())) + '")' search_string += ' | mvexpand serviceid' # Now for the common pieces alert_period = self.base_search.get('alert_period', '5') search_string += ' | `assess_severity(' + self.shared_base_search_id + ')`' # Remove any unnecessary fields we dont need search_string += ' | ' + self.remove_extraneous_fields() # Finish up with a round of evals search_string += ' | eval alert_period=' + alert_period + ', itsi_kpi_id=kpiid, itsi_service_id=serviceid' return search_string def generate_shared_metric_search_string(self): """ From the information provided on init, construct the shared metric search string """ metric = self.base_search.get('metric') base_search = 'WHERE index={metric_index} metric_name="{metric_name}"'\ .format(metric_index=metric.get('metric_index'), metric_name=metric.get('metric_name')) is_service_entity_filter = normalizeBoolean(self.base_search.get("is_service_entity_filter", False), False) search_string = base_search sub_search_string = '' if is_service_entity_filter: generated_entity_filter_rule = self.update_entity_filter(save_filter=True) entity_filter_string = '[ `lookup_shared_base_search_entity_filter_rule({base_search_id})` ' \ '| return $value ]'.format(base_search_id=self.shared_base_search_id) logger.debug('Entity filter rule {} was created for base search id {}.' .format(generated_entity_filter_rule['_key'], self.shared_base_search_id)) sub_search_string = ItsiSharedAdhocSearch.append_entity_filter('', entity_filter_string, metric=True) entity_breakdown_id_fields = self.base_search.get("entity_breakdown_id_fields") entity_id_fields = self.base_search.get("entity_id_fields") self._identify_metrics() if self.base_search.get("is_entity_breakdown"): # ITOA-4442: first, separate metrics with duplicate threshold field and entity statop. # so that, we generate correct aggregate entity search. search_string = self.aggregate_raw_into_entity_metric( search_string, sub_search_string, pre_stats_operation=self.diff_entity_filter_breakdown_fields ) # Make sure we don't have any residual data coming through for the results search_string += " | eval serviceid=null() " search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' # we would have to perform lookup by entity filter field as well, # if entity filter and breakdown fields are different in a base search if self.diff_entity_filter_breakdown_fields: # different filter and breakdown fields search_string += ' | `match_filter_entites(' + entity_id_fields + ', sec_grp)`' else: search_string += ' | `match_entities(' + entity_breakdown_id_fields + ', sec_grp)`' if len(self.services) == 1 and not is_service_entity_filter: search_string += ' | eval serviceid="' + '","'.join(list(self.services.keys())) + '"' elif len(self.services) > 1 and not is_service_entity_filter: # Redundant, but clarifying search_string += ' | eval serviceid=mvappend("' + '","'.join(list(self.services.keys())) + '")' search_string += " | mvexpand serviceid" if self.diff_entity_filter_breakdown_fields: # different filter and breakdown fields # generate stats command to aggregate entity, after pre stats calculation # and filter entity lookup is done above search_string += ' | ' + self.aggregate_raw_into_entity() search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' search_string += ' | `match_breakdown_entities(' + entity_breakdown_id_fields + ', sec_grp)`' search_string += ' | ' + self.aggregate_entity_into_service() else: if is_service_entity_filter: search_string += ' by ' + entity_id_fields search_string += ' | search' + sub_search_string search_string += ' | eval sec_grp="' + self.base_search.get('sec_grp') + '"' search_string += ' | `match_entities(' + entity_id_fields + ', sec_grp)`' search_string += ' | mvexpand serviceid' search_string = self.aggregate_raw_into_service_metric(search_string) else: # Sacrificing condensed code for readability (I hope) # aggregate raw results into service without using serviceid as # entity filtering and breakdown is not enabled. search_string = self.aggregate_raw_into_service_metric(search_string) # append serviceid after generating aggregate service results if len(self.services) == 1: search_string += ' | eval serviceid="' + '","'.join(list(self.services.keys())) + '"' elif len(self.services) > 1: search_string += ' | eval serviceid=mvappend("' + '","'.join(list(self.services.keys())) + '")' search_string += ' | mvexpand serviceid' # Now for the common pieces alert_period = self.base_search.get("alert_period", '5') search_string += ' | `assess_severity(' + self.shared_base_search_id + ')`' # Remove any unnecessary fields we dont need search_string += " | " + self.remove_extraneous_fields() # Finish up with a round of evals search_string += ' | eval alert_period=' + alert_period + ', itsi_kpi_id=kpiid, itsi_service_id=serviceid' return search_string def expand_combined_fields(self): """ PER ITOA-4442 we combine the aggregate statistics, however, so that we don't have missing entries, we need to re-expand these entities @return search string with alias fields in it """ if len(self.metric_aliases) == 0: return '' search_strings = [] for duplicate_metric in self.metric_aliases: string = 'alert_value_' + duplicate_metric + '=\'' + 'alert_value_' +\ self.metric_aliases[duplicate_metric] + '\'' search_strings.append(string) return ' | eval ' + ','.join(search_strings) def remove_extraneous_fields(self): """ Remove the extraneous fields from the shared base search These are the fields based on the metric id fields that weren't copied over into alert_value """ search_string = 'fields - alert_error' if self.diff_entity_filter_breakdown_fields and not self.is_metric: search_string += ' entity_filter_fields' metrics = self.base_search.get('metrics') for metric in metrics: metric_id = metric.get('_key') search_string += ' alert_value_%s' % metric_id return search_string def update_entity_filter(self, save_filter=False): """ @param save_filter: bool - whether or not we should save the generated filter rule back to kvstore @rtype: dict @return: The entity filter rule for the shared base search. Will return nothing in case of misconfiguration Goes through the service information defined by the shared base search initialization and gets entities matched. Creates an entity filter rule based on all entities matched by the shared_base_search If the param save_filter is TRUE, then we also save this filter rule to the kvstore """ # First, simple check, make sure that this isn't null and that this is filtering for entities if not normalizeBoolean(self.base_search.get("is_service_entity_filter", False), False): logger.info("Entity filtering invoked with untrue is_service_entity_filter " "- do not generate entity_filter_rule objects.") # We are working with a straight up service filter - This is a "normal" working case return # Entity Alias Filtering fields are no longer configurable, # and instead the Entity Filter Field (AKA the Entity Id fields) # is used instead to determine the entity aliases used in search generation fieldname = self.base_search.get('entity_id_fields') fieldnames = [] if isinstance(fieldname, itsi_py3.string_type) and fieldname and fieldname.strip(): fieldnames = fieldname.split(',') alias_field = self.base_search.get("entity_id_fields") entities_in_services = [] # First, get all of the matching entity filter clauses for svc in self.services.values(): entity_rules = svc.get('entity_rules') entities_in_service = ItsiFilter(entity_rules).get_filtered_objects( self.session_key, 'nobody', fields=['services:0']) entities_in_services.extend(entities_in_service) # Next, extract the appropriate fields and dedup entity_alias_info = [] processed_entities = set() for entity in entities_in_services: entity_key = entity.get('_key') identifiers = entity.get('identifier') if not identifiers: continue fields = identifiers.get('fields') if not isinstance(fields, list): continue if len(fieldnames) > 0 and entity_key not in processed_entities: processed_entities.add(entity_key) for field in fieldnames: # Only look at identifying fields to construct the entity filter string if field not in fields: logger.warning('Entity "{}" does not have identifying field "{}"' .format(entity.get('_key'), field)) continue values = entity.get(field) if values is None or len(values) == 0: logger.warning("Entity %s had fieldnames specified with no values - skipping" % entity.get("_key")) continue # Using is_valid_name because of the regex protection it affords against un-splunkable aliases # If the entity value contains "\", replace it with "\\\". "\" is considered special character # Per entity filtering changes in ITSI-2179, we only need the list of of the alias values for value in values: entity_filter_object = {} if utils.is_valid_field_value(value): entity_filter_object['alias_value'] = value.replace('\\', '\\\\\\') entity_filter_object['entity_key'] = entity_key entity_alias_info.append(entity_filter_object) # Once we have found all of the matching aliases, create a filter rule and save to the collection efr = ItsiEntityFilterRule(self.session_key, 'nobody') new_filter_rule = efr.create_shared_base_search_filter(self.shared_base_search_id, alias_field, entity_alias_info ) # TODO - post ITSI-2179, move the save to kvstore out of function if save_filter: # If function is called with save_filter, save to entity_filter_rule collection # e.g. scenario where we need are creating a new shared base search in savedsearches.conf file efr.save_batch('nobody', [new_filter_rule], validate_names=False) return new_filter_rule def aggregate_raw_into_service(self, is_aggregate_entity_into_service=False): """ Generate the aggregate string for all metrics found in the base search The macro for this for a single kpi is stats $aggregate_statop$($threshold_field$) AS alert_value | eval is_service_aggregate="1", is_entity_defined="0", entity_key="service_aggregate", entity_title="service_aggregate" | `gettime` The stats part is what we'll need to change, as the macro definition does not allow for a variable number of arguments. Fortunately, just the first piece will need to change @param is_aggregate_entity_into_service: A parameter to determine whether or not we will need to create a by clause for services @type is_aggregate_entity_into_service: boolean @return: a string representing the search term """ ret_string = 'stats ' metrics = self.base_search.get('metrics') if not is_aggregate_entity_into_service: ret_string += self.add_metrics() else: for metric in metrics: # There is some duplication here between this and add_metrics # Which I'm willing to sacrifice for the sake of clarity aggregate_statop = metric.get("aggregate_statop") threshold_field = metric.get("threshold_field") metric_id = metric.get("_key") if metric_id is None: logger.warning("base_search %s missing a metrics key - skipping" % self.shared_base_search_id) continue if aggregate_statop is None or threshold_field is None: logger.warning("metric %s missing threshold_field or aggregate_statop - skipping" % metric.get("_key")) continue ret_string += "%s(alert_value_%s) AS alert_value_%s " % (aggregate_statop, metric_id, metric_id) # The metric qualifier will change how we choose to aggregate the instances metric_qualifier = self.base_search.get('metric_qualifier') if isinstance(metric_qualifier, itsi_py3.string_type) and len(metric_qualifier) > 0: # We have a metric qualifier, create a slightly different search than before ret_string += 'by %s' % metric_qualifier # only group by serviceid for service aggregate results, when entities are involved in calculation. if is_aggregate_entity_into_service: ret_string += ', serviceid, is_entity_in_maintenance' elif self.base_search.get('is_service_entity_filter', False): ret_string += ', serviceid' else: # only group by serviceid for service aggregate results, when entities are involved in calculation. if is_aggregate_entity_into_service: ret_string += 'by serviceid, is_entity_in_maintenance' elif self.base_search.get('is_service_entity_filter', False): ret_string += 'by serviceid' if not is_aggregate_entity_into_service: ret_string += self.expand_combined_fields() ret_string += self._fill_data_gaps() ret_string += ' | eval is_entity_in_maintenance="0", is_service_aggregate="1", ' \ 'is_entity_defined="0"' # if fill data gaps is enabled for at least one metric, # then entity_title would already be set to "service_aggregate". if not self.at_least_one_fill_gaps_metric: ret_string += ', entity_title="service_aggregate"' else: # is_entity_in_maintenance field in each of the entity result row is used to determine if all entities in a # service is in maintenance or not. In the scenario where entity breakdown and entity filtering are not # configured, there will be no entity level results, in that case, we need to eval # is_entity_in_maintenance to 0 for the service aggregation row. ret_string += ' | sort 0 serviceid is_entity_in_maintenance | dedup consecutive=t serviceid' \ ' | eval is_service_aggregate="1", is_entity_defined="0", entity_title="service_aggregate"' ret_string += ', entity_key="service_aggregate" | `gettime`' return ret_string def aggregate_raw_into_service_metric(self, search_string): """ Generate the aggregate string for all metrics found in the shared metric search @return: a string representing the search term """ # using prestats here since serviceid is not defined when calling mstats ret_string = '|mstats ' metrics = self.base_search.get("metrics") all_aggregate_statop = set() metric_aggregate_dict = {} for metric in metrics: # There is some duplication here between this and aggregate_raw_into_entity # Which I'm willing to sacrifice for the sake of clarity aggregate_statop = metric.get("aggregate_statop") all_aggregate_statop.add(aggregate_statop) metric_id = metric.get("_key") if metric_id is None: logger.warning("base_search %s missing a metrics key - skipping" % self.shared_base_search_id) continue if aggregate_statop is None: logger.warning("metric %s missing aggregate_statop - skipping" % metric.get("_key")) continue if aggregate_statop in metric_aggregate_dict: metric_aggregate_dict[aggregate_statop].append(metric_id) else: metric_aggregate_dict[aggregate_statop] = [metric_id] if self.base_search.get('is_service_entity_filter'): # when entity filtering is enabled, we need to perform mstats prestats # and then stats grouped by serviceid, to calculate service aggregate results ret_string += 'prestats=t ' for aggregate_statop in list(all_aggregate_statop): ret_string += '%s(_value) ' % (aggregate_statop) ret_string += search_string ret_string += ' | stats ' eval_string = '' for aggregate_statop, metric_ids in list(metric_aggregate_dict.items()): ret_string += '%s(_value) AS alert_value_%s ' % (aggregate_statop, metric_ids[0]) for metric_id in metric_ids[1:]: eval_string += ' | eval alert_value_%s=\'alert_value_%s\'' % (metric_id, metric_ids[0]) # aggregation by serviceid is only needed, when entity filtering is enabled, # to correctly aggregate associated entities for a service ret_string += 'by serviceid' else: # when entity filtering is not enabled, we directly calculate service aggregate # using mstats command as we don't need to group by serviceid based on entity # service association. eval_string = '' for aggregate_statop, metric_ids in list(metric_aggregate_dict.items()): ret_string += '%s(_value) AS alert_value_%s ' % (aggregate_statop, metric_ids[0]) for metric_id in metric_ids[1:]: eval_string += ' | eval alert_value_%s=\'alert_value_%s\'' % (metric_id, metric_ids[0]) ret_string += search_string ret_string += eval_string ret_string += self._fill_data_gaps() ret_string += ' | eval is_entity_in_maintenance="0", is_service_aggregate="1", is_entity_defined="0",' \ ' entity_key="service_aggregate"' # if fill data gaps is enabled for at least one metric, # then entity_title would already be set to "service_aggregate". if not self.at_least_one_fill_gaps_metric: ret_string += ', entity_title="service_aggregate"' ret_string += ' | `gettime`' return ret_string def add_metrics(self, is_aggregate_into_service=True, pre_stats_operation=False): """ Add subsearch from metrics @type is_aggregate_into_service: bool @param is_aggregate_into_service: True, if search string is being generated for service aggregation. else, False @param pre_stats_operation: Boolean @return: string """ ret_string = '' for metric_id, metric in self.unique_metric_tokens.items(): aggregate_statop = metric.get('aggregate_statop') entity_statop = metric.get('entity_statop') threshold_field = metric.get('threshold_field', '_value') if self.is_metric: threshold_field = '_value' if is_aggregate_into_service: # search string for aggregate raw into service, when no entities are involved ret_string += "%s(%s) AS alert_value_%s " % (aggregate_statop, threshold_field, metric_id) else: # search string for aggregate raw into entity if pre_stats_operation: ret_string += '%s(%s) ' % (entity_statop, threshold_field) else: ret_string += "%s(%s) AS alert_value_%s " % (entity_statop, threshold_field, metric_id) return ret_string def aggregate_raw_into_entity(self, pre_stats_operation=False): """ Generate the aggregate entity for all metrics found in the base search The macro this is based on for the single kpi is example: STATS = stats $entity_statop$($threshold_field$) AS alert_value by $entity_id_fields$ | `gettime` PRE-STATS = sistats count(log_level) values(component) by sourcetype, component NOTE for sistats command: PBL-5786: allow splitting of base search KPI by a different field than entity filter field. To support this, we need to perform pre stats calculation using 'sistats' command, since, entity aggregate values depend on multiple fields (breakdown and filter) @param pre_stats_operation: True is want to generate 'sistats` command for prestats calculation of entity aggregates @type pre_stats_operation: bool """ ret_string = 'stats ' if pre_stats_operation: ret_string = 'sistats ' # TODO: Get clarification on whether or not this should be a string, a list # And handle appropriately. From my understanding, it needs to be a single field # And should PROBABLY be renamed to the singular entity_breakdown_id_fields = self.base_search.get("entity_breakdown_id_fields") entity_id_fields = self.base_search.get("entity_id_fields") if not utils.is_valid_str(entity_breakdown_id_fields): raise Exception('For an entity search, valid entity breakdown id field is required.') if not pre_stats_operation and self.diff_entity_filter_breakdown_fields\ and not utils.is_valid_str(entity_id_fields): raise Exception('For an entity search, valid entity id field is required.') ret_string += self.add_metrics(is_aggregate_into_service=False, pre_stats_operation=pre_stats_operation) # Now add the suffix and be on our way. The metric qualifier will # change how we choose to aggregate the instances metric_qualifier = self.base_search.get("metric_qualifier") has_metric_qualifier = utils.is_valid_str(metric_qualifier) # We have a metric qualifier, create a slightly different search than before if has_metric_qualifier: if pre_stats_operation: ret_string += 'values(%s) by %s, %s, %s' % \ (entity_id_fields, entity_breakdown_id_fields, entity_id_fields, metric_qualifier) else: if self.diff_entity_filter_breakdown_fields: if self.is_metric: ret_string += 'by %s, serviceid, %s' % (entity_breakdown_id_fields, metric_qualifier) else: ret_string += 'values(%s) as entity_filter_fields by %s, serviceid, %s' % \ (entity_id_fields, entity_breakdown_id_fields, metric_qualifier) else: ret_string += 'by %s, %s' % (entity_breakdown_id_fields, metric_qualifier) else: if pre_stats_operation: if ',' in entity_breakdown_id_fields: entity_aggregation_fields = entity_breakdown_id_fields.split(',') if entity_id_fields not in entity_aggregation_fields: entity_aggregation_fields.append(entity_id_fields) else: entity_aggregation_fields = [entity_breakdown_id_fields, entity_id_fields] split_by_fields = ','.join(entity_aggregation_fields) ret_string += 'values(%s) by %s' % \ (entity_id_fields, split_by_fields) else: if self.diff_entity_filter_breakdown_fields: if self.is_metric: ret_string += 'by %s, serviceid' % entity_breakdown_id_fields else: ret_string += 'values(%s) as entity_filter_fields by %s, serviceid' % \ (entity_id_fields, entity_breakdown_id_fields) else: ret_string += 'by %s' % entity_breakdown_id_fields if not pre_stats_operation: ret_string += self.expand_combined_fields() ret_string += self._fill_data_gaps() ret_string += ' | `gettime`' return ret_string def aggregate_raw_into_entity_metric(self, search_string, sub_search_string=None, pre_stats_operation=False): """ Generate the aggregate entity for all metrics found in the base search The macro this is based on for the single kpi is @param pre_stats_operation: True is want to generate 'prestats` command for prestats calculation of entity aggregates @type pre_stats_operation: bool """ # a new method for metric based KPI ret_string = '| mstats ' if pre_stats_operation: ret_string = '| mstats prestats=t ' # And handle appropriately. From my understanding, it needs to be a single field # And should PROBABLY be renamed to the singular entity_breakdown_id_fields = self.base_search.get("entity_breakdown_id_fields") entity_id_fields = self.base_search.get("entity_id_fields") if not utils.is_valid_str(entity_breakdown_id_fields): raise Exception('For an entity search, valid entity breakdown id field is required.') if not pre_stats_operation and self.diff_entity_filter_breakdown_fields\ and not utils.is_valid_str(entity_id_fields): raise Exception('For an entity search, valid entity id field is required.') ret_string += self.add_metrics(is_aggregate_into_service=False, pre_stats_operation=pre_stats_operation) ret_string += search_string if sub_search_string: # apply entity filter if pre_stats_operation: ret_string += ' by %s, %s | search%s'\ % (entity_breakdown_id_fields, entity_id_fields, sub_search_string) else: ret_string += ' by %s | search%s' % (entity_breakdown_id_fields, sub_search_string) else: if pre_stats_operation: # different filter and breakdown fields ret_string += ' by %s, %s' % (entity_breakdown_id_fields, entity_id_fields) else: ret_string += ' by %s' % entity_breakdown_id_fields if not pre_stats_operation: ret_string += self.expand_combined_fields() ret_string += self._fill_data_gaps() ret_string += ' | `gettime`' return ret_string def aggregate_entity_into_service(self): """ Generate the service aggregate based on the existing single value macro definition: appendpipe [stats $service_statop$(alert_value) AS alert_value | eval is_service_aggregate="1", is_entity_defined="0", entity_key="service_aggregate", entity_title="service_aggregate"] | `gettime` You'll notice that this is really really similar to the aggregate_raw_into_entity_command but using an appendpipe. """ return "appendpipe [" + self.aggregate_raw_into_service(is_aggregate_entity_into_service=True) + "]" def gen_preview_alert_search(self, kpi): """ Generate the preview alert search It basically means generating a normal search string, but with the kpi adjusted @param kpi: A kpi dictionary representing the new thing to be edited and previewed @type kpi: dict """ service_id = kpi.get("service_id", "NEW_SERVICE_ID") if service_id not in self.services: # Add the service to the list service = self.backend.get(self.session_key, 'nobody', 'service', service_id) if service is not None: self.services[service_id] = service else: logger.error("No matching services found for service_id=%s", service_id) return None found = False for idx, old_kpi in enumerate(self.services[service_id]['kpis']): if kpi.get("_key") == old_kpi.get("_key"): # Replace the kpi self.services[service_id]['kpis'][idx] = kpi found = True break if not found: # We're dealing with a new kpi self.services[service_id]['kpis'].append(kpi) result = self.generate_shared_base_search_string() return result def _identify_metrics(self): """ Goes through the list of metrics, stores unique metrics in a global variable and stores duplicate metrics in another global variable with appropriate aliases for metric alert_value. @return: None """ duplicate_metric_tokens = {} metric_token_keys = {} metrics = self.base_search.get('metrics', []) for metric in metrics: entity_statop = metric.get('entity_statop') aggregate_statop = metric.get('aggregate_statop') threshold_field = metric.get('threshold_field') metric_id = metric.get('_key') if metric_id is None: logger.warning("kpi_base_search=%s missing a metrics key - skipping" % self.shared_base_search_id) continue if threshold_field is None or (not self.base_search.get('is_entity_breakdown') and aggregate_statop is None) or entity_statop is None: logger.warning("metric %s missing threshold_field or entity_statop - skipping" % metric.get("_key")) continue if not self.at_least_one_fill_gaps_metric and metric.get('fill_gaps') == 'last_available_value': self.at_least_one_fill_gaps_metric = True # For ITOA-4442, we should check and see if the "token" tuple of entity_statop, threshold_field are unique if self.base_search.get('is_entity_breakdown'): identifying_tuple = (entity_statop, threshold_field) else: identifying_tuple = (aggregate_statop, threshold_field) if identifying_tuple not in metric_token_keys: metric_token_keys[identifying_tuple] = metric_id self.unique_metric_tokens[metric_id] = { 'entity_statop': entity_statop, 'aggregate_statop': aggregate_statop, 'threshold_field': threshold_field } # ITOA-4442 - update the duplicate statop fields, they won't be dicts like the key field else: if identifying_tuple not in duplicate_metric_tokens: duplicate_metric_tokens[identifying_tuple] = [] duplicate_metric_tokens[identifying_tuple].append(metric_id) # ITOA-4442: Now we iterate through the metric ids, checking for duplicates for tuple_key, metric_key in metric_token_keys.items(): if tuple_key not in duplicate_metric_tokens: continue duplicate_metric_ids = duplicate_metric_tokens[tuple_key] for duplicate_metric in duplicate_metric_ids: self.metric_aliases[duplicate_metric] = metric_key def _transpose_aggregated_metric_values(self, metrics): """ Transpose the metric results such that each row of results contains metric id and corresponding alert value. This is needed to perform lookup of cached metric alert values, based on metric id and base search id. @type metrics: list of dict @param metrics: list of metrics objects defined in shared base search. @rtype: basestring @return: search string """ ret_string = '' metric_id_strings = [] metric_alert_value_strings = [] for metric in metrics: metric_id = metric.get('_key') metric_id_strings.append('%s' % metric_id) metric_alert_value_strings.append('alert_value_%s' % metric_id) if len(metrics) == 1: ret_string += ' | eval metric_id="' + metric_id_strings[0] + '"' else: ret_string += ' | eval metric_id=mvappend("' + '", "'.join(metric_id_strings) + '")' ret_string += ' | mvexpand metric_id' case_cmd_args = [] for metric in metrics: metric_id = metric.get('_key') case_arg = 'metric_id=="%s", \'alert_value_%s\'' % (metric_id, metric_id) case_cmd_args.append(case_arg) ret_string += ' | eval alert_value=case(' + ', '.join(case_cmd_args) + ')' ret_string += ' | fields - ' + ', '.join(metric_alert_value_strings) if self.diff_entity_filter_breakdown_fields: ret_string += ', entity_filter_fields' return ret_string def _lookup_cached_metric_results(self, metrics, entity_split_field, is_compound_alias, compound_alias_search_strings): """ Generate search to lookup cached metric aggregate/entity alert value(s) for the metrics with "fill_gaps" attribute set to "last_available_value". @type metrics: list of dict @param metrics: list of metrics objects defined in shared base search. @type entity_split_field: string @param entity_split_field: a string representing the alias(es) (can be more than one in case of compound entity aliases) for splitting KPI calculation @type is_compound_alias: boolean @param is_compound_alias: whether the base search is split-by a compound entity alias @type compound_alias_search_strings: list of basestrings @param compound_alias_search_strings: a list of various search strings that are used in compound alias searches. The one used in this function is a string representing the alias(es) (can be more than one in case of compound entity aliases) concatenated together by a colon @rtype: basestring @return: search string """ ret_string = '' ret_string += ' | inputlookup append=true itsi_kpi_alert_value_cache where (base_search_id="%s"' \ % self.shared_base_search_id if self.diff_entity_filter_breakdown_fields: ret_string += ' AND entity_split_field="%s" AND entity_title!="service_aggregate" AND serviceid=*' \ % entity_split_field elif self.base_search.get('is_entity_breakdown'): ret_string += ' AND entity_split_field="%s" AND entity_title!="service_aggregate"' \ % entity_split_field elif self.base_search.get('is_service_entity_filter'): ret_string += ' AND serviceid=* AND entity_title="service_aggregate"' else: ret_string += ' AND entity_title="service_aggregate"' or_clause = [] for metric in metrics: # only lookup for the cached results of metrics for which "fill gaps" # option is set to "last available value" if metric.get('fill_gaps') == 'last_available_value': metric_id = metric.get('_key') or_clause.append('metric_id="%s"' % metric_id) ret_string += ' AND (' + ' OR '.join(or_clause) + '))' return ret_string def _fill_metric_data_gaps_and_update_cache( self, entity_split_field, is_compound_alias, compound_alias_search_strings): """ Generate search to fill entity/aggregate data gaps for metrics, with cached metric alert values. Also, update the cache (itsi_kpi_alert_value_cache lookup) with new alert values along with mod_time, if new alert value for metric is encountered. @type entity_split_field: string @param entity_split_field: a string representing the alias(es) (can be more than one in case of compound entity aliases) for splitting KPI calculation @type is_compound_alias: boolean @param is_compound_alias: whether the base search is split-by a compound entity alias @type compound_alias_search_strings: list of basestrings @param compound_alias_search_strings: a list of various search strings that are used in compound alias searches. The one used in this function is a search string splitting the entity_title then coaslescing the fields to the entity alias fields @rtype: basestring @return: search string """ ret_string = ' | eval mod_time=if(isnotnull(alert_value), now(), mod_time)' if self.diff_entity_filter_breakdown_fields or self.base_search.get('is_entity_breakdown'): if is_compound_alias: ret_string += (compound_alias_search_strings[1]) else: ret_string += ' | eval entity_title=coalesce(%s, entity_title)' % entity_split_field if self.base_search.get('is_entity_breakdown'): if is_compound_alias: # When it's a compound alias, we aggregate BY the split field NOT the entity title # since the entity title will be using a different format (field1:field2:field3...) ret_string += ' | stats first(*) as * max(mod_time) as mod_time by metric_id, %s' \ % entity_split_field else: ret_string += ' | stats first(*) as * max(mod_time) as mod_time by metric_id, entity_title' # If we are also filtering but it differs, then we also need to add serviceid to BY clause if self.diff_entity_filter_breakdown_fields: ret_string += ', serviceid' ret_string += ' | eval cached_alert_value=coalesce(alert_value, cached_alert_value), base_search_id="%s"' \ % self.shared_base_search_id if is_compound_alias: # For compound aliases, need to work created compound_alias field ret_string += ' | eval entity_title=coalesce(entity_title, %s)' % compound_alias_search_strings[0] ret_string += ' | fields - alert_value, %s' \ ' | eval entity_split_field="%s", metric_lookup_key=metric_id."_".entity_title' \ % (entity_split_field, entity_split_field) if self.diff_entity_filter_breakdown_fields: ret_string += '."_".serviceid' elif self.base_search.get('is_service_entity_filter'): ret_string += ' | stats first(*) as * max(mod_time) as mod_time by metric_id, serviceid' \ ' | eval cached_alert_value=coalesce(alert_value, cached_alert_value),' \ ' base_search_id="%s", entity_title="service_aggregate"' \ ' | fields - alert_value' \ ' | eval metric_lookup_key=metric_id."_".serviceid' \ % self.shared_base_search_id else: ret_string += ' | stats first(*) as * max(mod_time) as mod_time by metric_id' \ ' | eval cached_alert_value=coalesce(alert_value, cached_alert_value),' \ ' base_search_id="%s", entity_title="service_aggregate"' \ ' | fields - alert_value' \ ' | eval metric_lookup_key=metric_id."_".entity_title' \ % self.shared_base_search_id ret_string += ', mod_time=if(isnull(mod_time), now(), mod_time)' \ ' | outputlookup itsi_kpi_alert_value_cache append=true key_field=metric_lookup_key' return ret_string def _convert_transposed_results_to_original_form(self, metrics, entity_split_field, is_compound_alias, compound_alias_search_strings): """ Convert transposed results back to original form, after filling data gaps for metrics defined in shared base search. @type metrics: list of dict @param metrics: list of metrics objects defined in shared base search. @type entity_split_field: string @param entity_split_field: a string representing the alias(es) (can be more than one in case of compound entity aliases) for splitting KPI calculation @type is_compound_alias: boolean @param is_compound_alias: whether the base search is split-by a compound entity alias @type compound_alias_search_strings: list of basestrings @param compound_alias_search_strings: a list of various search strings that are used in compound alias searches. The one used in this function is a string representing the alias(es) (can be more than one in case of compound entity aliases) concatenated together by a colon @rtype: basestring @return: search string """ ret_string = '' eval_metric_alert_value = [] for metric in metrics: metric_id = metric.get('_key') eval_metric = 'alert_value_%s=if(metric_id=="%s", cached_alert_value, null())' % (metric_id, metric_id) eval_metric_alert_value.append(eval_metric) ret_string += ' | eval ' + ', '.join(eval_metric_alert_value) ret_string += ' | fields - metric_id, cached_alert_value, base_search_id, metric_lookup_key' if self.base_search.get('is_entity_breakdown'): if is_compound_alias: ret_string += ', entity_split_field ' + compound_alias_search_strings[1] else: ret_string += ', entity_split_field | rename entity_title as %s' % (entity_split_field) ret_string += ' | stats first(*) as * by %s' % (entity_split_field) if self.diff_entity_filter_breakdown_fields: ret_string += ', serviceid' elif self.base_search.get('is_service_entity_filter'): ret_string += ' | stats first(*) as * by serviceid' else: ret_string += ' | stats first(*) as *' return ret_string def _fill_data_gaps(self): """ Generate the search part to fill data gaps with last available value for metrics, if "fill_gaps" attribute is set to "last_available_value" for metric. @rtype: basestring @return: search string which perform fill data gaps operation """ ret_string = '' metrics = self.base_search.get('metrics', []) if not self.at_least_one_fill_gaps_metric or len(metrics) == 0: return ret_string entity_split_field = self.base_search.get('entity_breakdown_id_fields') is_compound_alias = False compound_alias_search_strings = [] if ',' in entity_split_field: is_compound_alias = True compound_alias_search_strings = self.generate_compound_alias_search_strings(entity_split_field) ret_string += self._transpose_aggregated_metric_values(metrics) ret_string += self._lookup_cached_metric_results(metrics, entity_split_field, is_compound_alias, compound_alias_search_strings) ret_string += self._fill_metric_data_gaps_and_update_cache(entity_split_field, is_compound_alias, compound_alias_search_strings) ret_string += self._convert_transposed_results_to_original_form(metrics, entity_split_field, is_compound_alias, compound_alias_search_strings) return ret_string @staticmethod def generate_compound_alias_search_strings(entity_breakdown_id_fields): """ Helper function that returns an array of search strings that are used across the various fill data gaps functions. @type entity_breakdown_id_fields: basestring @param entity_breakdown_id_fields: the entity_breakdown_id_fields, which should be comma separated @rtype: list of strings @return: a list of strings -> [concatenated_breakdown_id_strings, breakdown_fields_from_entity_title, #compound_entity_lookups] """ entity_aliases = [alias.strip() for alias in entity_breakdown_id_fields.split(',')] itsi_identifier_lookup_list = [] for alias in entity_aliases: itsi_identifier_lookup_list.append('_itsi_identifier_lookups AS itsi_identifier_' + alias) compound_itsi_identifier_lookups = ', '.join(itsi_identifier_lookup_list) # We are opting to go with :: as the delimiter of choice when displaying the values of a pseudoentity concatenated_breakdown_id_fields = '."::".'.join(entity_aliases) coalesced_breakdown_fields = [] # When we fill data gaps, we need to coalesce and fill in values from the cached results # which will come from the saved entity_title for ind in range(len(entity_aliases)): coalesced_breakdown_fields.append('{entity_alias}=coalesce({entity_alias}, mvindex(entity_title, ' '{index}))'.format(entity_alias=entity_aliases[ind], index=ind)) breakdown_fields_from_entity_title = ' | eval entity_title=split(entity_title, "::")' \ ' | eval {coalesced_breakdown_fields}'\ ' | fields - entity_title'.format( coalesced_breakdown_fields=', '.join(coalesced_breakdown_fields)) return [concatenated_breakdown_id_fields, breakdown_fields_from_entity_title, compound_itsi_identifier_lookups]