# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. from .itsi_atad_search_base import ItsiAtAdSearchBase from itsi.objects.itsi_kpi_entity_threshold import ItsiKpiEntityThreshold from itsi.objects.itsi_kpi_at_info import ItsiKpiAtInfo from itsi.objects.itsi_service import ItsiService from itsi.itsi_utils import ItsiAtUtils from ITOA.setup_logging import logger from string import Template from ITOA.itoa_common import is_feature_enabled AT_SEARCH_TEMPLATE = Template(" | mstats latest(alert_value) AS alert_value latest(alert_level) AS alert_level WHERE " " `get_itsi_summary_metrics_index` AND ($kpi_filter_string) AND is_filled_gap_event!=1 " " AND is_null_alert_value=0 `$metrics_macro_filter` by $filter_fields " " span=1m | where alert_level!=-2 and not isnull(alert_value) | table _time, alert_value, " " alert_level, $filter_fields | $command ") HIGH_SCALE_AT_SEARCH_TEMPLATE = Template("| inputlookup kpi_at_info_lookup " "| search adaptive_thresholding_training_window=${training_days} " "| sort limit=0 service_id " "| rename _key as kpi_id " "| fields kpi_id " "| itsibatchat training_window=${training_days}") HIGH_SCALE_ENTITY_AT_SEARCH_TEMPLATE = Template("| inputlookup entity_thresholds_at_lookup " "| search adaptive_thresholds_is_enabled=1 adaptive_thresholding_training_window=${training_days} " "| sort limit=0 kpi_id " "| fields kpi_id, entity_key, entity_title " "| itsibatchat entitylevelthreshold=True training_window=${training_days}") class ItsiAtSearch(ItsiAtAdSearchBase): """ Implements ITSI AT Search Contains CRUD operations and utility functions pertaining to AT searches """ log_prefix = '[ITSI AT Search] ' collection_name = 'itsi_service' def __init__(self, session_key): super(ItsiAtSearch, self).__init__(session_key, logger=logger) def make_search_name(self, training_window_id): """ Generate name for AT search stanza @param training_window_id: training window string @returns: name for AT search stanza """ return 'itsi_at_' + self._make_search_name_suffix(training_window_id) def make_high_scale_at_search_name(self, training_window_id, object_type="kpi"): """ Generate name for AT search stanza @param training_window_id: training window string @param object_type: object_type to make saved search name. Valid values: kpi, entity @returns: name for AT search stanza """ return 'itsi_batch_at_' + self._make_search_name_suffix(training_window_id, object_type=object_type) def get_all_searches(self): """ @returns: dict of saved searches keyed by stanza name """ return self._get_all_searches(lambda x: x.startswith('itsi_at_search')) def get_high_scale_at_searches(self, object_type="kpi"): """ @returns: dict of high scale AT saved searches keyed by stanza name """ return self._get_all_searches(lambda x: x.startswith('itsi_batch_at_search_{0}'.format(object_type))) def make_search(self, **params): """ Generate AT search string @param **params: keyword args needed for search generation; required args: `kpi_filter_string` @returns: AT search string """ if 'kpi_filter_string' not in params: logger.error("Missing parameters to search template, got %s", params) raise Exception("Missing parameters to search template") if 'command' not in params: params["command"] = "applyat" if is_feature_enabled('itsi-at-outlier-removal', self.session_key) else 'itsiat' if 'metrics_macro_filter' not in params: params["metrics_macro_filter"] = "metrics_service_level_kpi_only" if 'filter_fields' not in params: params["filter_fields"] = "itsi_kpi_id, itsi_service_id" return AT_SEARCH_TEMPLATE.substitute(**params) def make_high_scale_at_search(self, **params): """ Generate search string that calls the batching version of the AT search @param **params: keyword args needed for search generation; required args: `kpi_filter_string` @returns: AT search string """ if params.get("object_type", "kpi") == "entity": return HIGH_SCALE_ENTITY_AT_SEARCH_TEMPLATE.substitute(**params) return HIGH_SCALE_AT_SEARCH_TEMPLATE.substitute(**params) def compute_earliest_time(self, training_window): """ Compute earliest time (in relative minutes) for AT search @param training_window: AT training window, as a relative time spec in days, minutes, or hours ('-Xd' or '-Xm') @returns: relative time string for search earliest time in minutes, e.g. '-1940m' """ return '-%sm' % (self.to_minutes(training_window)) def make_saved_search_params(self, name, search, et, kpi_list, cron_schedule='0 0 * * *'): """ @param name: search stanza name @param search: search string @param et: earliest time (relative timespec) @param kpi_list: list of KPI IDs @returns: dict of key/value pairs for the saved search stanza """ return { 'name': name, 'disabled': False, 'enableSched': 1, 'cron_schedule': cron_schedule, 'dispatch.earliest_time': et, 'dispatch.latest_time': 'now', 'action.summary_index._kpi_id_list': ','.join(kpi_list), 'action.summary_index._et': et, # save earliest time to persist it in format user can't override 'search': search } def get_num_training_days(self, search, search_name): """ @param search: existing at search @param search_name: name of the existing at search @returns: int of number of training days """ try: _et = search['content'].get('action.summary_index._et') except KeyError: _et = search['content'].get('dispatch.earliest_time') if not _et: # ITSI-31527: For scenarios where _et is not set and assuming # saved searches are named as 'itsi_at_search_kpi_minusXY' # where X is number of, Y - hours(h), minutes(m), days(d) suffix = search_name.split("itsi_at_search_kpi_minus")[1] _et = "-" + suffix return self.to_days(_et) def rewrite_saved_searches(self, is_high_scale_at_enabled=True, is_entity_level_at_enabled=True): """ @returns: Returns a list of tuples of failed search name and exception if any """ existing_at_searches = self.get_all_searches() failed_rewrite_searches = [] kpi_at_info_interface = ItsiKpiAtInfo(self.session_key, 'nobody') entity_thresholds_interface = ItsiKpiEntityThreshold(self.session_key, 'nobody') # finds all kpis with AT enabled service_interface = ItsiService(self.session_key, 'nobody') all_at_kpis = [] entity_thresholding_enabled_services = [] at_enabled_kpi_services = service_interface.get_bulk( 'nobody', filter_data={"kpis.adaptive_thresholds_is_enabled": True}, fields=['kpis.service_id', 'kpis._key', 'kpis.adaptive_thresholding_training_window', 'kpis.adaptive_thresholds_is_enabled'] ) for service in at_enabled_kpi_services: for kpi in service["kpis"]: if kpi["adaptive_thresholds_is_enabled"]: all_at_kpis.append(kpi) # Only get objects if entity level AT or High Scale AT not enabled if not is_entity_level_at_enabled or not is_high_scale_at_enabled: entity_thresholding_enabled_services = service_interface.get_bulk('nobody', filter_data={"kpis.is_entity_level_thresholding": True}) for service in entity_thresholding_enabled_services: for kpi in service["kpis"]: kpi["is_entity_level_thresholding"] = False supported_kpi_training_windows = ["-7d", "-14d", "-30d", "-60d"] supported_entity_training_windows = ["-7d", "-14d", "-30d", "-60d"] if not is_high_scale_at_enabled: kpis_by_training_window = {} for kpi in all_at_kpis: attw = kpi['adaptive_thresholding_training_window'] # if key does not exist, then skip if attw is None or attw == "": continue if attw not in kpis_by_training_window.keys(): kpis_by_training_window[attw] = [kpi['_key']] else: kpis_by_training_window[attw].append(kpi['_key']) # remove itsibatchat searches high_scale_searches = self.get_high_scale_at_searches("") for high_scale_search_name, high_scale_search in high_scale_searches.items(): try: self.delete_saved_search(high_scale_search_name) except Exception as e: self.logger.exception("Exception while deleting saved search for itsi-high-scale-at: %s, %s" % (high_scale_search_name, str(e))) failed_rewrite_searches.append((high_scale_search_name, e)) # delete at_info records, entity thresholds and disable entity_thresholding on KPIs kpi_at_info_interface.delete_bulk('nobody') entity_thresholds_interface.delete_bulk('nobody') if entity_thresholding_enabled_services: service_interface.save_batch("nobody", entity_thresholding_enabled_services, False, req_source="rewrite_saved_searches") # create applyat searches if they do not exist for training_window in kpis_by_training_window.keys(): search_name = self.make_search_name(training_window) kpis = kpis_by_training_window[training_window] kpi_filter_string = " OR ".join("itsi_kpi_id=\"" + x + "\"" for x in kpis) search_string = self.make_search(kpi_filter_string=kpi_filter_string, training_days=training_window) et = self.compute_earliest_time(training_window) try: if search_name not in existing_at_searches.keys(): self.create_saved_search(search_name, search_string, et, kpis) else: self.update_saved_search(search_name, search_string, et, kpis) except Exception as e: self.logger.exception("Exception while rewriting saved search after disabling itsi-high-scale-at: %s, %s" % (search_name, str(e))) failed_rewrite_searches.append((search_name, e)) else: # removes old applyat searches for existing_search_name, existing_search in existing_at_searches.items(): try: self.delete_saved_search(existing_search_name) except Exception as e: self.logger.exception("Exception while deleting saved search for itsi-high-scale-at: %s, %s" % (existing_search_name, str(e))) failed_rewrite_searches.append((existing_search_name, e)) # gets existing kpi_entity_objects at_info_records = kpi_at_info_interface.get_bulk('nobody') at_info_records_by_kpi = {} for record in at_info_records: at_info_records_by_kpi[record['_key']] = record new_at_info_records = [] for kpi in all_at_kpis: # creates new at_info record if kpi['_key'] not in at_info_records_by_kpi: new_at_info_records.append({ 'service_id': kpi['service_id'], 'adaptive_thresholding_training_window': kpi['adaptive_thresholding_training_window'], 'object_type': 'kpi_at_info', '_key': kpi['_key'], }) # updates existing at_info record only if training window is different else: record = at_info_records_by_kpi.pop(kpi['_key']) if record['adaptive_thresholding_training_window'] != kpi['adaptive_thresholding_training_window']: record['adaptive_thresholding_training_window'] = kpi['adaptive_thresholding_training_window'] new_at_info_records.append(record) # upserts new and updated at_info records if len(new_at_info_records): kpi_at_info_interface.save_batch('nobody', new_at_info_records, False, req_source='rewrite_saved_searches') # creates itsibatchat searches for KPIs if they don't exist high_scale_kpi_searches = self.get_high_scale_at_searches() for ix, training_window in enumerate(supported_kpi_training_windows): high_scale_search_name = self.make_high_scale_at_search_name(training_window) search_string = self.make_high_scale_at_search(training_days=training_window) et = self.compute_earliest_time(training_window) cron_schedule = ItsiAtUtils.generate_cron_expression(ix) try: if high_scale_search_name not in high_scale_kpi_searches.keys(): self.create_saved_search(high_scale_search_name, search_string, et, cron_schedule=cron_schedule) except Exception as e: self.logger.exception("Exception while rewriting saved search: %s, %s" % (high_scale_search_name, str(e))) failed_rewrite_searches.append((high_scale_search_name, None)) high_scale_entity_searches = self.get_high_scale_at_searches("entity") if is_entity_level_at_enabled: # creates itsibatchat searches for Entities if they don't exist for ix, training_window in enumerate(supported_entity_training_windows): high_scale_search_name = self.make_high_scale_at_search_name(training_window, "entity") search_string = self.make_high_scale_at_search(training_days=training_window, object_type="entity") et = self.compute_earliest_time(training_window) cron_schedule = ItsiAtUtils.generate_cron_expression(ix) try: if high_scale_search_name not in high_scale_entity_searches.keys(): self.create_saved_search(high_scale_search_name, search_string, et, cron_schedule=cron_schedule) except Exception as e: self.logger.exception("Exception while rewriting saved search: %s, %s" % (high_scale_search_name, str(e))) failed_rewrite_searches.append((high_scale_search_name, None)) else: # Remove All Entity Thresholds on disabling the feature flag # Disable is_entity_level_thresholding value in all KPIs entity_thresholds_interface.delete_bulk('nobody') if entity_thresholding_enabled_services: service_interface.save_batch("nobody", entity_thresholding_enabled_services, False, req_source="rewrite_saved_searches") # Delete itsibatchat searches for Entities if feature flag is disabled for high_scale_search_name, high_scale_search in high_scale_entity_searches.items(): try: self.delete_saved_search(high_scale_search_name) except Exception as e: self.logger.exception("Exception while deleting saved search for itsi-entity-level-adaptive-thresholding: %s, %s" % (high_scale_search_name, str(e))) failed_rewrite_searches.append((high_scale_search_name, e)) return failed_rewrite_searches