You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

3439 lines
187 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import itsi_py3
import json
import re
import copy
import splunk
from splunk.util import normalizeBoolean
import ITOA.itoa_common as utils
from custom_threshold_windows.utils import calculate_custom_thresholds
from ITOA.itoa_exceptions import ItoaDatamodelContextError, ItoaError
from ITOA.itoa_factory import instantiate_object
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from ITOA.saved_search_utility import SavedSearch
from ITOA.setup_logging import logger
from itsi.itsi_utils import GLOBAL_SECURITY_GROUP_CONFIG, ITOAInterfaceUtils
from itsi.objects.itsi_entity import ItsiEntity
from itsi.objects.itsi_entity_filter import ItsiEntityFilterRule
from itsi.objects.itsi_kpi import (
EXTERNAL_FIELDS, SEARCH_AND_CALCULATE_ATTRIBUTES, THRESHOLDS_ATTRIBUTES, ItsiKpi, convert_filter_to_kpi_filter,
convert_kpi_to_modified_kpi,
)
from itsi.objects.itsi_kpi_state_cache import ItsiKPIStateCache
from itsi.objects.itsi_kpi_template import ItsiKpiTemplate
from itsi.objects.itsi_security_group import ItsiSecGrp
from itsi.searches.itsi_filter import ItsiFilter
from itsi.searches.itsi_searches import ItsiKpiSearches
from itsi.searches.itsi_shared_base_search import ItsiSharedAdhocSearch
from itsi.service_template.service_template_utils import ServiceTemplateUtils
# Module-level limit of 500. NOTE(review): not referenced in the visible part of this file; presumably a cap on
# how many services are saved in one batch — confirm against its usages before relying on that meaning.
MAX_SAVE_LIMIT = 500
class ItsiService(ItoaObject):
"""
Implements ITSI service
"""
collection_name = 'itsi_services'
SHKPI_STARTS_WITH = 'SHKPI-'
def __init__(self, session_key, current_user_name, object_type='service', collection_name=None,
             title_validation_required=True, is_securable_object=True):
    """
    Set up a service object handler bound to the given session and user.

    @type session_key: basestring
    @param session_key: session key used for backend calls
    @type current_user_name: basestring
    @param current_user_name: user performing operations through this handler
    @type object_type: basestring
    @param object_type: ITOA object type, 'service' by default
    @type collection_name: basestring
    @param collection_name: KV store collection to use; falls back to the class's `collection_name`
        attribute when None
    @type title_validation_required: bool
    @param title_validation_required: NOTE(review): accepted but neither stored nor forwarded to
        ItoaObject in this method — confirm whether that is intended
    @type is_securable_object: bool
    @param is_securable_object: forwarded to ItoaObject
    """
    resolved_collection = self.collection_name if collection_name is None else collection_name
    super(ItsiService, self).__init__(
        session_key,
        current_user_name,
        object_type,
        collection_name=resolved_collection,
        is_securable_object=is_securable_object,
    )

    # ---- processing flags -------------------------------------------------------------------
    self.synchronous = False
    # Skip service template processing for services when True.
    self.skip_service_template_update = False
    # Force saved search regeneration, bypassing every saved search update optimization.
    self.force_update_savedsearch = False
    # Only switched on by the sandbox publish flow (save_batch_publish).
    self.is_publish_flow = False
    # Marks the current service update as a KPI deletion.
    self.is_kpi_delete = False

    # ---- caches -----------------------------------------------------------------------------
    # Base service templates fetched from the KV store by update_services_with_base_service_templates();
    # post_save_setup() reuses them instead of fetching again when synchronously updating templates.
    self.base_service_templates = None
    # Opt-in, simple caching layer for persisted services (performance enhancement for sandbox services).
    self.use_persisted_services_caching = False
    # Persisted services keyed by a comma-joined list of the MINIMAL fields the cached services carry
    # (ITSI processing may leave extra fields on them). Example:
    # {
    #     '_key,kpis._key': [{'_key': '6d97fb9c-fee4-449a-9f42-142655dbdcac',
    #                          'kpis': [{'_key': 'SHKPI-6d97fb9c-fee4-449a-9f42-142655dbdcac'}],  # gitleaks:allow
    #                          'object_type': 'service',
    #                          'permissions': ...}],
    # }
    # The '_ALL' key is a best-effort, all-fields special case.
    self.persisted_services_dict = {}
    self.persisted_services_by_id_dict = {}
def allow_additional_patching(self):
    """
    Tell the ItoaObject base class that services accept additional patching.

    Overrides the ItoaObject hook.

    @rtype: bool
    @return: always True
    """
    patching_allowed = True
    return patching_allowed
def templatize(self, owner, object_id, req_source='unknown', for_base_service_template=False):
    """
    Templatize given object id.

    @type owner: basestring
    @param owner: context of the request `nobody` vs an actual user
    @type object_id: basestring
    @param object_id: unique identifier of an object to templatize
    @type req_source: basestring
    @param req_source: identified source initiating the operation.
    @type for_base_service_template: bool
    @param for_base_service_template: True, if templatizing service for Base Service Template.
        Else, False.
    @rtype: dict/None
    @return: requested template, or None when the parent templatize() returned None.
    """

    def _blank_threshold_search(container, threshold_key):
        """Set container[threshold_key]['search'] to '' when that entry is a dict; otherwise do nothing."""
        thresholds = container.get(threshold_key)
        if isinstance(thresholds, dict):
            thresholds['search'] = ''

    service_template = super(ItsiService, self).templatize(owner, object_id, req_source)
    if service_template is None:
        # Both branches below index into the template. Previously only the base-service-template
        # branch checked for None; the normal branch raised TypeError instead.
        return None

    if not for_base_service_template:
        # Normal templatizing of a service.
        # Null'ify service dependencies.
        service_template['services_depends_on'] = []
        service_template['services_depending_on_me'] = []
        for key in ('source_itsi_da', 'source_itsi_da_id', 'source_itsi_da_version'):
            service_template.pop(key, None)

        logger.debug('Templatizing KPIs for service ID=`%s`', object_id)
        kpi_keys_to_remove = ('_key', 'search', 'search_alert', 'search_entities', 'search_aggregate',
                              'search_time_series', 'search_time_compare',
                              'search_time_series_entities', 'search_time_series_aggregate')
        for kpi in service_template.get('kpis', []):
            for key in kpi_keys_to_remove:
                kpi.pop(key, None)
            # Keep the threshold definitions but empty their 'search' field. Non-dict values
            # (e.g. null) are skipped instead of raising.
            _blank_threshold_search(kpi, 'aggregate_thresholds')
            _blank_threshold_search(kpi, 'entity_thresholds')
            time_variate_spec = kpi.get('time_variate_thresholds_specification')
            policies = time_variate_spec.get('policies') if isinstance(time_variate_spec, dict) else None
            default_policy = policies.get('default_policy') if isinstance(policies, dict) else None
            if isinstance(default_policy, dict):
                _blank_threshold_search(default_policy, 'aggregate_thresholds')
                _blank_threshold_search(default_policy, 'entity_thresholds')
    else:
        # Specific templatizing of a service to create a Base Service Template object
        # from it. Used by the ItsiBaseServiceTemplate object.
        if not service_template.get('service_id'):
            service_template['service_id'] = object_id
        service_fields_to_remove = ('services_depends_on', 'services_depending_on_me',
                                    'service_template_id', 'enabled', 'isFirstTimeSaveDone',
                                    'base_service_template_id', 'algorithms',
                                    'source_itsi_da', 'source_itsi_da_id', 'source_itsi_da_version')
        for key in service_fields_to_remove:
            service_template.pop(key, None)

        logger.debug('Templatizing KPIs for service ID=`%s`', object_id)
        kpis = service_template.get('kpis', [])
        # Remove the first service health KPI (its key starts with SHKPI_STARTS_WITH). Deleting while
        # enumerating is safe because the loop breaks immediately afterwards.
        for index, kpi in enumerate(kpis):
            if (kpi.get('_key') or '').startswith(self.SHKPI_STARTS_WITH):
                del kpis[index]
                break
        kpi_keys_to_remove = ('search', 'kpi_base_search', 'search_alert', 'search_entities',
                              'search_aggregate', 'search_time_series', 'search_time_compare',
                              'search_time_series_entities', 'search_time_series_aggregate',
                              'backfill_enabled', 'backfill_earliest_time', 'service_id',
                              'enabled', 'sec_grp', 'type', 'base_service_template_id',
                              'recalculate_custom_thresholds', 'active_custom_threshold_window',
                              'linked_custom_threshold_windows', 'aggregate_thresholds_custom',
                              'time_variate_thresholds_specification_custom',
                              'is_recommended_time_policies', 'was_recommendation_modified',
                              'did_load_recommendation', 'recommendation_training_window',
                              'threshold_direction', 'recommendation_start_date', 'threshold_recommendation_summary')
        for kpi in kpis:
            # Every templated KPI gets a freshly generated key.
            kpi['_key'] = ITOAInterfaceUtils.generate_backend_key()
            for key in kpi_keys_to_remove:
                kpi.pop(key, None)

    logger.debug('object_id=`%s` template=`%s`', object_id, service_template)
    return service_template
def _determine_disabled_entity_level_thresholding(self, service, persisted_service):
    """
    List KPIs whose entity-level thresholding was turned off (or removed) by this change.

    Only active when both the 'itsi-high-scale-at' and 'itsi-entity-level-adaptive-thresholding'
    features are enabled; otherwise an empty list is returned.

    @type service: dict or None
    @param service: service being saved, or None when the service is being deleted
    @type persisted_service: dict or None
    @param persisted_service: previously stored service, or None for a new service
    @rtype: list
    @return: keys of KPIs flagged 'is_entity_level_thresholding' in the persisted service but not in the
        new one. When the service is deleted, every such persisted KPI is returned; for a new service the
        list is empty.
    """
    features_enabled = (utils.is_feature_enabled('itsi-high-scale-at', self.session_key)
                        and utils.is_feature_enabled('itsi-entity-level-adaptive-thresholding', self.session_key))
    if not features_enabled:
        return []

    if service is None:
        return [kpi.get('_key') for kpi in persisted_service.get('kpis', [])
                if kpi.get("is_entity_level_thresholding", False)]
    if persisted_service is None:
        return []

    previously_on = {kpi.get('_key') for kpi in persisted_service.get('kpis', [])
                     if kpi.get("is_entity_level_thresholding", False)}
    still_on = {kpi.get('_key') for kpi in service.get('kpis', [])
                if kpi.get("is_entity_level_thresholding", False)}
    return list(previously_on - still_on)
def _determine_changed_kpis_at_ad(self, service, persisted_service, state_field, track_field, settings_field=None):
    """
    Helper method to see if anomaly detection settings (the per-KPI feature whose ON/OFF flag is
    `state_field`) have changed in any KPIs for a service.

    @param service: service object (current state) or `None` if service is being deleted
    @param persisted_service: previously saved state of service object or None if service is new
    @param state_field: KPI field that contains the ON/OFF state information
    @param track_field: KPI field that contains training window information (`None` if not applicable)
    @param settings_field: KPI field containing additional settings in a dict (None if not applicable)
    @returns: changed_kpis: dict keyed by kpiid with change given by `on`, `off`, `changed`
        - service deleted: every persisted KPI (valid dict) with `state_field` set -> 'off'
        - new service: every KPI (valid dict) with `state_field` set and enabled == 1 -> 'on'
        - update: see steps (1)-(4) below; 'changed' is applied last and therefore overrides
          'on'/'off' for a KPI that falls in both sets
    """
    changed_kpis = {}
    if service is None:
        # Service deletion: turn off every KPI that had the feature on.
        for kpi in persisted_service.get('kpis', []):
            if not utils.is_valid_dict(kpi):
                # Be resilient
                continue
            if kpi.get(state_field):
                changed_kpis[kpi['_key']] = 'off'
    elif persisted_service is None:
        # Service creation: turn on the feature only for enabled KPIs that request it.
        for kpi in service.get('kpis', []):
            if not utils.is_valid_dict(kpi):
                # Be resilient
                continue
            # Mark the kpis state to on only if the kpi is enabled.
            if kpi.get(state_field) and kpi.get('enabled') == 1:
                changed_kpis[kpi['_key']] = 'on'
    else:
        # Check for anomaly detection being turned on or off for kpi
        # use dicts here to provide both easy conversion to sets and easy attribute access
        # NOTE(review): unlike the two branches above, these comprehensions do not skip KPIs
        # that fail utils.is_valid_dict — confirm whether that resilience is needed here too.
        persisted_kpi_dict_atad_on = dict((kpi.get('_key'), kpi)
                                          for kpi in persisted_service.get('kpis', [])
                                          if kpi.get(state_field))
        kpi_dict_atad_on = dict((kpi.get('_key'), kpi)
                                for kpi in service.get('kpis', [])
                                if kpi.get(state_field))
        persisted_kpi_dict_atad_on_kpi_enabled = dict((kpi.get('_key'), kpi)
                                                      for kpi in persisted_service.get('kpis', [])
                                                      if (kpi.get(state_field) and kpi.get('enabled') == 1))
        kpi_dict_atad_on_kpi_enabled = dict((kpi.get('_key'), kpi)
                                            for kpi in service.get('kpis', [])
                                            if (kpi.get(state_field) and kpi.get('enabled') == 1))
        persisted_kpi_dict_atad_on_kpi_disabled = dict((kpi.get('_key'), kpi)
                                                       for kpi in persisted_service.get('kpis', [])
                                                       if (kpi.get(state_field) and kpi.get('enabled') == 0))
        kpi_dict_atad_on_kpi_disabled = dict((kpi.get('_key'), kpi)
                                             for kpi in service.get('kpis', [])
                                             if (kpi.get(state_field) and kpi.get('enabled') == 0))
        # (1) Check for AD being toggled on by comparing service's KPIs to persisted service's KPIs
        # NOTE: only enabled kpis are being considered here. For disabled kpis, its a NO-OP.
        _toggled_on = set(kpi_dict_atad_on_kpi_enabled.keys()) - set(persisted_kpi_dict_atad_on_kpi_enabled.keys())
        # For the case when at was already on and kpi is being toggled from disabled to enabled.
        _kpis_enabled_with_at_on = set(kpi_dict_atad_on_kpi_enabled) & set(persisted_kpi_dict_atad_on_kpi_disabled)
        _toggled_on = _toggled_on | _kpis_enabled_with_at_on
        logger.info("List of KPIs toggled on: %s", _toggled_on)
        # (2) Check for AD being toggled off by comparing service's KPIs to persisted service's KPIs
        _toggled_off = set(persisted_kpi_dict_atad_on.keys()) - set(kpi_dict_atad_on.keys())
        # For the case when at is on but the kpi is toggled from enabled to disabled.
        _kpis_disabled_with_at_on = set(kpi_dict_atad_on_kpi_disabled) & set(persisted_kpi_dict_atad_on_kpi_enabled)
        _toggled_off = _toggled_off | _kpis_disabled_with_at_on
        logger.info("List of KPIs toggled off: %s", _toggled_off)
        # (3) Check if any KPIs with AD turned on have had their training period changed
        # Affect the change only if the kpi is enabled
        # NOTE(review): the loop below does not check 'enabled', contrary to the comment above; a KPI
        # that is disabled in the same update can therefore be reported 'changed' instead of 'off'.
        _changed = set()
        for k in kpi_dict_atad_on:
            if k in persisted_kpi_dict_atad_on:
                old_training_window = persisted_kpi_dict_atad_on[k].get(track_field)
                new_training_window = kpi_dict_atad_on[k].get(track_field)
                if new_training_window != old_training_window:
                    _changed.add(k)
        # (4) Check if any KPIs with AD turned on have had their settings changed
        # Affect the change only if the kpi is enabled (same NOTE(review) as step 3 applies)
        if settings_field is not None:
            for k in kpi_dict_atad_on:
                if k in persisted_kpi_dict_atad_on:
                    old_settings = persisted_kpi_dict_atad_on[k].get(settings_field, {})
                    new_settings = kpi_dict_atad_on[k].get(settings_field, {})
                    # Compare over the union of keys so added and removed settings both count as changes.
                    settings_keys = set()
                    settings_keys |= set(old_settings.keys())
                    settings_keys |= set(new_settings.keys())
                    for setting_key in settings_keys:
                        if old_settings.get(setting_key) != new_settings.get(setting_key):
                            _changed.add(k)
        # Order matters: later updates overwrite earlier ones for the same KPI key.
        changed_kpis.update(dict((k, 'on') for k in _toggled_on))
        changed_kpis.update(dict((k, 'off') for k in _toggled_off))
        changed_kpis.update(dict((k, 'changed') for k in _changed))
    return changed_kpis
def _determine_generate_changed_search_param(self, service, persisted_service):
    """
    Helper method to see if any search related parameters are updated in any KPI for a service,
    and which entity filter rules (EFRs) must be generated or removed as a result.

    Side effect: every KPI dict in service['kpis'] receives temporary 'service_id' and 'service_title'
    keys, and so does each persisted KPI dict that reaches the settings comparison step.

    @type service: dict
    @param service: service object
    @type persisted_service: dict
    @param persisted_service: service object from kvstore, or None for a new service
    @rtype: tuple
    @return: (changed_kpis, new_filter_rule_objects, need_remove_entity_filter)
        changed_kpis: dict of kpi id -> saved search settings for KPIs whose saved search must be updated
        new_filter_rule_objects: list of dicts representing the newly created entity filter rules
        need_remove_entity_filter: list of KPI ids that no longer need entity filter rules
        An invalid/None `service` yields ({}, [], []). It previously returned a bare dict, which broke
        callers that unpack the 3-tuple.
    """
    changed_kpis = {}
    if service is None or not utils.is_valid_dict(service):
        return changed_kpis, [], []
    # Decide once whether the persisted service is usable. A non-dict value used to crash on
    # `.get` when comparing entity rules below.
    persisted_is_valid = persisted_service is not None and utils.is_valid_dict(persisted_service)
    if persisted_is_valid:
        persisted_kpi_list = dict((kpi.get('_key'), kpi)
                                  for kpi in persisted_service.get('kpis', []) if kpi.get('_key'))
    else:
        persisted_kpi_list = {}
    new_kpi_list = dict((kpi.get('_key'), kpi)
                        for kpi in service.get('kpis', []) if kpi.get('_key'))
    need_generate_entity_filter = set()
    need_remove_entity_filter = set()
    sync_disabled = utils.get_saved_search_kpi_setting(self.session_key)
    # If the service entity rules changed, the entity filter rules of existing KPIs must be updated.
    kpi_need_filter_update = persisted_is_valid and \
        persisted_service.get('entity_rules') != service.get('entity_rules')

    def _generate_settings(kpi, **extra_kwargs):
        """Generate saved search settings for `kpi` against this service's entity rules and sec_grp."""
        return ItsiKpi(self.session_key, 'nobody').generate_saved_search_settings(
            kpi, service.get('entity_rules'), service.get('sec_grp'),
            sync_schedule_disabled=sync_disabled, **extra_kwargs)

    for kpi_id, kpi_content in new_kpi_list.items():
        # Add temporary keys
        kpi_content['service_id'] = service.get('_key')
        kpi_content['service_title'] = service.get('title', '')
        # If entity filtering is disabled, we do not need to update/create the itsi_entity_filter objects
        is_service_entity_filter = kpi_content.get('is_service_entity_filter', False)
        if kpi_content['_key'].startswith(self.SHKPI_STARTS_WITH) or \
                kpi_content.get('search_type') == 'shared_base':
            # No saved searches need to be updated for ServiceHealth.
            # Saved searches for Shared Base Searches are updated async by the itsi_refresher Modular Input.
            continue

        if kpi_id not in persisted_kpi_list:
            # KPI creation: generate the saved search settings; the EntityFilterRule does not exist yet.
            changed_kpis[kpi_id] = _generate_settings(kpi_content)
            if is_service_entity_filter:
                need_generate_entity_filter.add(kpi_id)
            continue

        if kpi_need_filter_update and is_service_entity_filter:
            need_generate_entity_filter.add(kpi_id)

        if self.force_update_savedsearch:
            logger.debug("Migration likely in effect. Performing a forced update"
                         " to saved searches on KPI: {}.".format(kpi_id))
            # Regardless of the search string content, the search needs to be regenerated.
            changed_kpis[kpi_id] = _generate_settings(kpi_content, acl_update=False)
            if is_service_entity_filter:
                # Migration in effect - need to generate entity filter
                need_generate_entity_filter.add(kpi_id)
            continue

        # KPI/search already exist: compare the new settings with those generated from the persisted KPI.
        saved_search_settings = _generate_settings(kpi_content, acl_update=False)
        persisted_kpi_content = persisted_kpi_list[kpi_id]
        # If the entity filtering field was updated to a different value, regenerate the entity filter rule.
        if is_service_entity_filter and \
                persisted_kpi_content.get('entity_id_fields') != kpi_content.get('entity_id_fields'):
            need_generate_entity_filter.add(kpi_id)

        if persisted_kpi_content.get('search_type') == 'shared_base' and \
                kpi_content.get('search_type') != 'shared_base':
            # It was a shared base search before and is not now: the search string must still be generated.
            saved_search_settings['acl_update'] = True
            changed_kpis[kpi_id] = saved_search_settings
            if is_service_entity_filter:
                # Shared base search -> adhoc search KPI needs a single search entity filter rule
                need_generate_entity_filter.add(kpi_id)
            continue

        persisted_kpi_content['service_id'] = service.get('_key')
        persisted_kpi_content['service_title'] = service.get('title', '')
        # With PBL-5603, a KPI can be split by an entity field different from the entity filtering
        # field, via the 'entity_breakdown_id_fields' field. KPIs persisted before that change lack it
        # (e.g. during migration), so fall back to 'entity_id_fields'.
        if persisted_kpi_content.get('is_entity_breakdown', False):
            if not persisted_kpi_content.get('entity_breakdown_id_fields', None):
                persisted_kpi_content['entity_breakdown_id_fields'] = persisted_kpi_content.get(
                    'entity_id_fields', '')
                logger.debug('entity_breakdown_id_fields missing from kpi object = {}. '
                             'Setting it to entity_id_fields.'
                             .format(persisted_kpi_content.get('_key')))
        persisted_saved_search_settings = _generate_settings(persisted_kpi_content, acl_update=False)
        # The crontab is not compared; it is restored only when the settings are reported as changed.
        cron_schedule = saved_search_settings.pop('cron_schedule', None)
        persisted_saved_search_settings.pop('cron_schedule', None)
        if saved_search_settings != persisted_saved_search_settings:
            saved_search_settings['cron_schedule'] = cron_schedule
            changed_kpis[kpi_id] = saved_search_settings

        # NOTE(review): the entity-filter toggle checks below run whether or not the saved search
        # settings differed above — confirm this is the intended nesting.
        was_service_entity_filter = persisted_kpi_content.get('is_service_entity_filter', False)
        if is_service_entity_filter and not was_service_entity_filter:
            # Entity filtering was previously disabled but is now enabled: generate the EFR.
            need_generate_entity_filter.add(kpi_id)
        elif was_service_entity_filter and not is_service_entity_filter:
            need_remove_entity_filter.add(kpi_id)
        # Old 'enabled' equal to new 'disabled' means the KPI's enabled state flipped.
        if is_service_entity_filter and normalizeBoolean(persisted_kpi_content.get('enabled')) == \
                normalizeBoolean(saved_search_settings.get('disabled')):
            if normalizeBoolean(persisted_kpi_content.get('enabled')):
                # The KPI/service WAS enabled (i.e. is now disabled): remove entity filter rules.
                need_remove_entity_filter.add(kpi_id)
            else:
                # The KPI/service became enabled: regenerate entity filter rules.
                need_generate_entity_filter.add(kpi_id)

    new_filter_rule_objects = []
    if need_generate_entity_filter:
        entity_rules = service.get('entity_rules', [])
        # Get the associated entities ONCE, pass down into ItsiSearches to break down the matched alias values
        entities = ItsiFilter(entity_rules).get_filtered_objects(self.session_key, 'nobody', fields=['services:0'])
        service_has_entity_rules = isinstance(entity_rules, list) and len(entity_rules) > 0
        for kpi_id in need_generate_entity_filter:
            entity_filtering_info = ItsiKpiSearches.gen_entity_filter_info(
                self.session_key, new_kpi_list[kpi_id], entities, service_has_entity_rules)
            new_filter_rule_objects.append(
                ItsiEntityFilterRule(self.session_key, 'nobody').generate_filter_rule_dictionary(
                    kpi_id, service.get('_key'), entity_filtering_info, service.get('sec_grp')))
    return changed_kpis, new_filter_rule_objects, list(need_remove_entity_filter)
def _determine_changed_alert_period(self, service, persisted_service):
    """
    Find KPIs whose alert period differs between the stored and the incoming service.

    @type service: dict or None
    @param service: service object (current state) or None if service is being deleted
    @type persisted_service: dict or None
    @param persisted_service: previously saved state of service object or None if service is new
    @rtype: dict
    @return: {kpi key: new alert_period} for KPIs with a truthy alert_period in both versions whose
        value changed; empty when either argument is None or not a valid dict
    """
    if service is None or persisted_service is None:
        return {}
    if not (utils.is_valid_dict(service) and utils.is_valid_dict(persisted_service)):
        return {}

    def _alert_periods(svc):
        """Map KPI key -> alert_period for KPIs that define a truthy alert_period."""
        return dict((kpi.get('_key'), kpi.get('alert_period'))
                    for kpi in svc.get('kpis', []) if kpi.get('alert_period'))

    previous_periods = _alert_periods(persisted_service)
    current_periods = _alert_periods(service)
    return {
        kpi_key: period
        for kpi_key, period in current_periods.items()
        if kpi_key in previous_periods and previous_periods[kpi_key] != period
    }
def get_shared_search_type(self, service):
    """
    Collect the KPIs of a service that use a shared base search.

    @type service: dict
    @param service: service object
    @rtype: dict
    @return: {kpi key: kpi} for every KPI whose search_type is 'shared_base' and that references a
        base_search_id; an empty dict when `service` is not a dict
    """
    if isinstance(service, dict):
        return {
            kpi.get('_key'): kpi
            for kpi in service.get('kpis', [])
            if kpi.get('search_type') == 'shared_base' and kpi.get("base_search_id") is not None
        }
    return {}
def get_un_shared_search_type(self, service):
    """
    Collect the KPIs of a service that do NOT use a shared base search (ad-hoc or datamodel KPIs).

    @type service: dict
    @param service: service object
    @rtype: dict
    @return: {kpi key: kpi} for every KPI whose search_type is not 'shared_base';
        an empty dict when `service` is not a dict
    """
    unshared_kpis = {}
    if isinstance(service, dict):
        for kpi in service.get('kpis', []):
            if kpi.get('search_type') != 'shared_base':
                unshared_kpis[kpi.get('_key')] = kpi
    return unshared_kpis
def _determine_changed_search_type(self, service, persisted_service):
"""
Determine if search type changed from adhoc/data model to shared_base search
@type service: dict
@param service: service
@type persisted_service: dict
@param persisted_service: persisted service
@return: list of kpis which is changed from ad-hoc/datamodel to shared base search
"""
if service is None or persisted_service is None:
return []
if not isinstance(service, dict) or not isinstance(persisted_service, dict):
return []
old_un_shared_kpi = self.get_un_shared_search_type(persisted_service)
new_kpi_shared_search = self.get_shared_search_type(service)
# get search which has changed from ad-hoc/datamodel to shared
kpi_changed_to_shared = list(set(old_un_shared_kpi.keys()).intersection(set(new_kpi_shared_search.keys())))
# Lets pass id which is changed
logger.info("length=%s, ids=%s changed from ad-hoc/datamodel to shared_based", len(kpi_changed_to_shared),
kpi_changed_to_shared)
return kpi_changed_to_shared
def _determine_changed_service_dependencies(self, service, persisted_service):
"""
Will check if any dependencies were added or removed to the service
@param service: service object (current state) or `None` if service is being deleted
@param persisted_service: previously saved state of service object or None if service is new
@return: dict of service key to added and removed dependencies
"""
update_set = {}
service_depends_on = service.get('services_depends_on')
added_dependencies = []
removed_dependencies = []
# This is a new service, so we can assume all dependencies are newly added as well
if not persisted_service:
if service_depends_on is None or len(service_depends_on) == 0:
return update_set # nothing to do
# all dependencies are new
for service_dependency in service_depends_on:
added_dependencies.append({
'target_service': service_dependency.get('serviceid'),
'depending_kpis': service_dependency.get('kpis_depending_on')
})
# This is an update to an existing service, so we must go through the structure
else:
existing_depends_on = persisted_service.get('services_depends_on')
# no dependency info on either existing or new, so nothing required
if (service_depends_on is None or len(service_depends_on)) == 0 and (existing_depends_on is None
or len(existing_depends_on)) == 0:
return update_set # nothing to do
existing_service_ids = set([])
new_service_ids = set([])
if existing_depends_on is not None:
existing_service_ids = set({record.get('serviceid') for record in existing_depends_on})
if service_depends_on is not None:
new_service_ids = set({record.get('serviceid') for record in service_depends_on})
# Find which services were completely removed as dependencies
removed_services = existing_service_ids - new_service_ids
# Find which services are new dependencies
added_services = new_service_ids - existing_service_ids
logger.debug('removed services detected: %s', str(removed_services))
# For all removed services, we can remove all depending on kpis from the target service
for removed_service in removed_services:
existing_dependency = next(
(x for x in existing_depends_on if x.get('serviceid') == removed_service), None)
removed_dependencies.append({
'target_service': removed_service,
'depending_kpis': existing_dependency.get('kpis_depending_on')
})
# For all added services we can add all kpis as dependencies
for added_service in added_services:
new_dependency = next((x for x in service_depends_on if x.get('serviceid') == added_service), None)
added_dependencies.append({
'target_service': added_service,
'depending_kpis': new_dependency.get('kpis_depending_on')
})
# Now the complicated part, if service was already a dependency
# we need to see if list of KPI dependencies changed
for service_id in existing_service_ids.intersection(new_service_ids):
new_dependency = next((x for x in service_depends_on if x.get('serviceid') == service_id), None)
existing_dependency = next((x for x in existing_depends_on if x.get('serviceid') == service_id), None)
# New KPI dependencies
new_kpis = list(set(new_dependency.get('kpis_depending_on'))
- set(existing_dependency.get('kpis_depending_on')))
# Removed KPI dependencies
removed_kpis = list(set(existing_dependency.get('kpis_depending_on'))
- set(new_dependency.get('kpis_depending_on')))
if len(new_kpis) > 0:
added_dependencies.append({
'target_service': service_id,
'depending_kpis': new_kpis
})
if len(removed_kpis) > 0:
removed_dependencies.append({
'target_service': service_id,
'depending_kpis': removed_kpis
})
logger.debug('added_dependencies: %s, removed_dependencies: %s',
str(added_dependencies), str(removed_dependencies))
# if any dependencies were added or removed, then we have a service we need to update
if len(added_dependencies) > 0 or len(removed_dependencies) > 0:
update_set[service.get('_key')] = {
'added_dependencies': added_dependencies,
'removed_dependencies': removed_dependencies
}
return update_set
def get_shared_base_search_update_jobs(self, services, old_services, transaction_id, method='GET'):
    """
    Iterate through the services to determine what kpi base searches we need to update on a service change.

    @type services: list
    @param services: new services not yet saved to the database. Each item is either a service dict or a
        service key string; a bare key is treated as a service with no remaining content
        ({'_key': key}).
    @type old_services: dict
    @param old_services: persisted services keyed by service key; should cover the keys in `services`
    @type transaction_id: basestring
    @param transaction_id: transaction id used in log messages and attached to each refresh job
    @type method: basestring
    @param method: CRUD method of the change, compared against CRUDMethodTypes values
    @rtype: list
    @return: one 'update_shared_base_search' refresh job per base search that needs an update
    @raise Exception: if an item of `services` is neither a dict nor a string
    """
    # When do we want to update a base search? From the work on ticket ITOA-6362,
    # there are 4 rules that we want to abide by; anything outside of these rules
    # should not have an update issued for it:
    # 1 Issue an update to all associated base searches if the entity filter rules change
    # 2 Issue an update to the base search when an entirely new service is being created that uses base searches
    # 3 Issue an update when a service has deleted a kpi and there are no more kpis in that service which
    #   reference a particular base search
    # 4 Issue an update when a service has added a kpi that references a previously unreferenced base search

    def _referenced_base_searches(kpi_list, enabled_only=False):
        """Return base_search_ids referenced by shared-base KPIs (only enabled KPIs if enabled_only)."""
        return set(
            kpi['base_search_id'] for kpi in kpi_list
            if kpi.get('search_type') == 'shared_base' and kpi.get('base_search_id') is not None
            and (not enabled_only or kpi.get('enabled') == 1)
        )

    base_searches_to_update = set()
    existing_ids_set = set()
    logger.debug('Updating shared base searches tid=%s', transaction_id)
    for service in services:
        # Resolve the item before touching it. A bare service key used to crash on the `.get` calls
        # below before the type check was ever reached.
        if isinstance(service, dict):
            service_key = service.get('_key')
        elif isinstance(service, itsi_py3.string_type):
            service_key = service
            service = {'_key': service}
        else:
            raise Exception('Invalid service in service list %s.' % service)

        if method == CRUDMethodTypes.METHOD_CREATE and not service.get('enabled', 0):
            continue

        kpis = service.get('kpis') or []
        current_base_searches = _referenced_base_searches(kpis)
        # Enabled/disabled status adds another dimension, so track enabled references separately.
        current_enabled_base_searches = _referenced_base_searches(kpis, enabled_only=True)

        # Keep in mind that old_service can be None
        old_service = old_services.get(service_key)
        if not old_service:
            # Rule 2: entirely new service, update every base search it references.
            base_searches_to_update.update(current_base_searches)
            continue

        old_kpis = old_service.get('kpis') or []
        if method in (CRUDMethodTypes.METHOD_UPDATE, CRUDMethodTypes.METHOD_UPSERT):
            # A disabled service needs no refresh if its enabled state did not change or its base
            # service template id changed.
            if not service.get('enabled', 0) and (
                    service.get('enabled', 0) == old_service.get('enabled', 0)
                    or service.get('base_service_template_id', '') != old_service.get('base_service_template_id', '')):
                continue
        elif method == CRUDMethodTypes.METHOD_DELETE:
            # Deleting a service only matters if it had enabled shared-base KPIs.
            if not _referenced_base_searches(old_kpis, enabled_only=True):
                continue

        prior_base_searches = _referenced_base_searches(old_kpis)
        if old_service.get('entity_rules') != service.get('entity_rules'):
            # Rule 1: entity rules changed, so every base search referenced before or after is updated.
            base_searches_to_update.update(current_base_searches | prior_base_searches)
            continue

        prior_enabled_base_searches = _referenced_base_searches(old_kpis, enabled_only=True)
        existing_ids_set.update(prior_base_searches)
        # Rules 3 and 4: base searches referenced before or now, but not both.
        base_searches_to_update.update(prior_base_searches.symmetric_difference(current_base_searches))
        # An enabled -> disabled (or reverse) transition shows up in only one of the enabled sets.
        base_searches_to_update.update(
            prior_enabled_base_searches.symmetric_difference(current_enabled_base_searches))

    # We now should have a set of all base searches that need an update
    existing_ids = list(existing_ids_set)
    logger.debug('Shared base update issued tid=%s searches=%s', transaction_id, base_searches_to_update)
    return [
        self.get_refresh_job_meta_data(
            'update_shared_base_search',
            [base_search],
            'kpi_base_search',
            change_detail={'existing_ids': existing_ids},
            transaction_id=transaction_id
        )
        for base_search in base_searches_to_update
    ]
def kpi_threshold_sort(self, aggregate_threshold):
    """
    Helper method that gathers the threshold levels of an aggregate threshold (or of a policy in time variate)
    into a normalized form that can be compared between two versions of a KPI.

    Threshold values are converted with float() rather than int(): int() would truncate fractional
    thresholds (e.g. 0.2 and 0.8 both became 0), hiding real threshold changes, and would raise on
    string values such as '1.5'.

    @type aggregate_threshold: dict of thresholds
    @param aggregate_threshold: the policy or aggregate threshold for the kpi; each entry of its
        'thresholdLevels' list is read for 'thresholdValue' and 'severityValue'
    @rtype: list of tuple
    @return: a list of (threshold value, severity value) tuples sorted by threshold value; an empty list
        when no aggregate threshold or no threshold levels are provided
    """
    if not aggregate_threshold:
        logger.error('No aggregate thresholds provided for KPI being updated.')
        return []
    # A missing or None 'thresholdLevels' simply means there are no levels to compare.
    threshold_levels = aggregate_threshold.get('thresholdLevels') or []
    # Pair threshold value with severity value so a change to either one shows up in a comparison
    thresholds = [
        (float(level.get('thresholdValue')), int(level.get('severityValue')))
        for level in threshold_levels
    ]
    thresholds.sort(key=lambda pair: pair[0])  # Sort by the threshold value (stable for equal values)
    return thresholds
def get_bulk_skip_enforce_security(
        self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None, skip=None, limit=None,
        req_source='unknown', transaction_id=None,
):
    """
    Fetch objects of this object type through the paged bulk getter with security enforcement skipped.
    Without any filter, every object of this type is returned.

    @type owner: string
    @param owner: user who is performing this operation
    @type sort_key: string
    @param sort_key: string defining keys to sort by
    @type sort_dir: string
    @param sort_dir: string defining direction for sorting - asc or desc
    @type filter_data: dictionary
    @param filter_data: json filter constructed to filter data. Follows mongodb syntax
    @type fields: list
    @param fields: list of fields to retrieve, fetches all fields if not specified
    @type skip: number
    @param skip: number of items to skip from the start
    @type limit: number
    @param limit: maximum number of items to return
    @type req_source: string
    @param req_source: identified source initiating the operation
    @type transaction_id: string
    @param transaction_id: unique identifier for transaction tracing
    @rtype: list of dictionary
    @return: objects retrieved on success, throws exceptions on errors
    """
    results = self.do_paged_get_bulk(
        owner,
        sort_key=sort_key,
        sort_dir=sort_dir,
        filter_data=filter_data,
        fields=fields,
        limit=limit,
        skip=skip,
        skip_enforce_security=True,
        transaction_id=transaction_id,
    )
    # Count for logging only: a list counts its items, a single dict counts as one object
    if utils.is_valid_list(results):
        retrieved_count = len(results)
    elif utils.is_valid_dict(results):
        retrieved_count = 1
    else:
        retrieved_count = 0
    logger.debug('%s objects of type %s retrieved, request source: %s',
                 retrieved_count, self.object_type, req_source)
    return results
def _determine_custom_thresholds_need_update(self, owner, service, persisted_service):
    """
    Helper method that compares a service with its persisted copy from the kvstore and recalculates the
    custom thresholds of KPIs whose threshold settings changed, or which are explicitly flagged with
    'recalculate_custom_thresholds' (the flag is reset to False).

    Only KPIs that have a key, are not service health KPIs, already exist in the persisted service, and have
    both an 'active_custom_threshold_window' and 'linked_custom_threshold_windows' are considered.

    @type owner: string
    @param owner: owner of service being updated
    @type service: dict
    @param service: service object; its 'kpis' list is updated in place
    @type persisted_service: dict
    @param persisted_service: service object from kvstore
    @rtype: dict
    @return: the service with its potentially updated KPIs
    """
    if service is None or not utils.is_valid_dict(service):
        return service
    if persisted_service is None or not utils.is_valid_dict(persisted_service):
        persisted_kpis_by_key = {}
    else:
        persisted_kpis_by_key = {
            kpi.get('_key'): kpi for kpi in persisted_service.get('kpis', []) if kpi.get('_key')
        }
    if not persisted_kpis_by_key:
        return service

    service_kpis = service.get('kpis', [])
    kpi_indexes_to_recalculate = set()
    for kpi_index, kpi in enumerate(service_kpis):
        kpi_id = kpi.get('_key')
        # Skip KPIs without a key (nothing to match against), SH-KPIs (not affected by custom threshold
        # windows) and newly added KPIs (no persisted version to compare with)
        if not kpi_id or self.SHKPI_STARTS_WITH in kpi_id or kpi_id not in persisted_kpis_by_key:
            continue
        if not kpi.get('active_custom_threshold_window', '') or not kpi.get('linked_custom_threshold_windows', []):
            # Do not need to do anything if the current KPI does not have any active custom threshold windows
            logger.debug('KPI with key %s does not have an active or linked custom threshold window; '
                         'skipping custom threshold recalculation.', kpi_id)
            continue
        if kpi.get('recalculate_custom_thresholds', False):
            # Explicit request: clear the flag and recalculate regardless of threshold differences
            kpi['recalculate_custom_thresholds'] = False
            kpi_indexes_to_recalculate.add(kpi_index)
            continue
        if self._custom_threshold_settings_changed(kpi, persisted_kpis_by_key[kpi_id]):
            kpi_indexes_to_recalculate.add(kpi_index)

    if kpi_indexes_to_recalculate:
        # NOTE: Import here to remove circular dependency between itsi_service and itsi_CTW objects.
        # Only done when a recalculation is actually needed.
        from itsi.objects.itsi_custom_threshold_windows import ItsiCustomThresholdWindows
        ctw_obj_interface = ItsiCustomThresholdWindows(self.session_key, self.current_user_name)
        for index in kpi_indexes_to_recalculate:
            impacting_ctw = ctw_obj_interface.get(owner, service_kpis[index].get('active_custom_threshold_window'))
            service_kpis[index] = calculate_custom_thresholds(service_kpis[index], impacting_ctw)
    service['kpis'] = service_kpis
    return service


def _custom_threshold_settings_changed(self, kpi, persisted_kpi):
    """
    Tell whether the threshold configuration of a KPI differs from its persisted version.

    Compares the time variate flag, then either every time variate policy's aggregate thresholds or the
    KPI-level aggregate thresholds. Missing or None sections are treated as empty.

    @type kpi: dict
    @param kpi: the incoming KPI
    @type persisted_kpi: dict
    @param persisted_kpi: the same KPI as stored in the kvstore
    @rtype: bool
    @return: True when custom thresholds need to be recalculated
    """
    if persisted_kpi.get('time_variate_thresholds', False) != kpi.get('time_variate_thresholds', False):
        return True

    if kpi.get('time_variate_thresholds', False):
        # Time variate thresholds specification
        persisted_policies = (persisted_kpi.get('time_variate_thresholds_specification') or {}).get('policies') or {}
        current_policies = (kpi.get('time_variate_thresholds_specification') or {}).get('policies') or {}
        if len(persisted_policies) != len(current_policies):
            return True
        for policy_key, policy_definition in current_policies.items():
            if policy_key not in persisted_policies:
                return True
            current_thresholds = self.kpi_threshold_sort(policy_definition.get('aggregate_thresholds', {}))
            persisted_thresholds = self.kpi_threshold_sort(
                persisted_policies[policy_key].get('aggregate_thresholds', {})
            )
            if persisted_thresholds != current_thresholds:
                return True
        return False

    # Aggregate Thresholds
    persisted_aggregate_thresholds = persisted_kpi.get('aggregate_thresholds') or {}
    current_aggregate_thresholds = kpi.get('aggregate_thresholds') or {}
    if (len(persisted_aggregate_thresholds.get('thresholdLevels') or [])
            != len(current_aggregate_thresholds.get('thresholdLevels') or [])):
        return True
    persisted_thresholds = self.kpi_threshold_sort(persisted_aggregate_thresholds)
    current_thresholds = self.kpi_threshold_sort(current_aggregate_thresholds)
    return persisted_thresholds != current_thresholds
def set_persisted_services_caching(self, val):
    """
    Turn persisted services caching on or off for this service interface.

    Whichever way the flag is set, both persisted services caches start out empty afterwards.

    @type val: Boolean
    @param val: True to enable persisted services caching, False to disable it
    """
    enable = bool(val)
    if enable:
        message = "Enabling persisted services caching..."
    else:
        message = "Disabling persisted services caching..."
    logger.info(message)
    self.use_persisted_services_caching = enable
    # Drop anything cached so far
    self.persisted_services_dict = {}
    self.persisted_services_by_id_dict = {}
@staticmethod
def convert_fields_to_fields_key(fields=None):
"""
Convert a list of fields into a key that uniquely identifies a list of fields for caching
@param fields: list of fields to retrieve, fetches all fields if not specified
* Please pass in fields in the same order each time, to avoid wasting cycles creating and comparing sets of
fields.
* Fields cannot include "_ALL"
@rtype: basestring
@return: Key that uniquely identifies a list of fields for caching
"""
if fields is None:
return "_ALL"
if "_key" not in fields:
fields = ['_key'] + fields
fields.sort()
return ",".join(fields)
def fetch_persisted_services(self, owner, fields=None, req_source='unknown', transaction_id=None):
    """
    Return persisted services, served from the per-field-set cache when persisted services caching is on,
    otherwise read straight from the KVStore. With caching on, a cache miss populates the cache.

    @type owner: basestring
    @param owner: user who is performing this operation
    @type fields: list
    @param fields: list of fields to retrieve, fetches all fields if not specified
        * Please pass in fields in the same order each time, to avoid wasting cycles creating and comparing sets of
        fields.
        * Fields cannot include "_ALL"
    @type req_source: basestring
    @param req_source: identified source initiating the operation
    @type transaction_id: basestring
    @param transaction_id: unique identifier for transaction tracing
    @rtype: list of dictionary
    @return: objects retrieved on success
    """
    if not self.use_persisted_services_caching:
        return self.get_bulk(owner, fields=fields, req_source=req_source, transaction_id=transaction_id)

    cache_key = self.convert_fields_to_fields_key(fields)
    cached_services = self.persisted_services_dict.get(cache_key)
    if cached_services is None:
        cached_services = self.get_bulk(owner, fields=fields, req_source=req_source, transaction_id=transaction_id)
        self.persisted_services_dict[cache_key] = cached_services
    return cached_services
def fetch_persisted_services_by_id(self, owner, object_ids, fields=None, req_source='unknown', transaction_id=None):
    """
    Fetch persisted services by ID, either from cache or from KVStore, depending on whether persisted services
    caching is enabled. This may update the cache based on the same flag.

    @type owner: basestring
    @param owner: user who is performing this operation
    @type object_ids: list of basestring
    @param object_ids: list of IDs to fetch
        * An empty list will return all services when caching is enabled
    @type fields: list of basestring
    @param fields: list of fields to retrieve, fetches all fields if not specified
        * Please pass in fields in the same order each time, to avoid wasting cycles creating and comparing sets of
        fields.
        * Fields cannot include "_ALL"
    @type req_source: basestring
    @param req_source: identified source initiating the operation
    @type transaction_id: basestring
    @param transaction_id: unique identifier for transaction tracing
    @rtype: list of dictionary
    @return: objects retrieved on success, in persisted (cache) order when served from cache
    """
    if not self.use_persisted_services_caching:
        return self.get_persisted_objects_by_id(
            owner,
            object_ids=object_ids,
            req_source=req_source,
            transaction_id=transaction_id,
            fields=fields,
        )

    if not object_ids:
        return self.fetch_persisted_services(owner, fields=fields, req_source=req_source,
                                             transaction_id=transaction_id)

    # Set lookup keeps filtering linear in the number of cached services instead of O(services * ids)
    wanted_ids = set(object_ids)
    fields_key = self.convert_fields_to_fields_key(fields)
    services_by_id = self.persisted_services_by_id_dict.get(fields_key)
    if services_by_id is None:
        services = self.fetch_persisted_services(owner, fields=fields, req_source=req_source,
                                                 transaction_id=transaction_id)
        services_by_id = {service['_key']: service for service in services}
        self.persisted_services_by_id_dict[fields_key] = services_by_id
    return [service for key, service in services_by_id.items() if key in wanted_ids]
def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None, transaction_id=None,
                         skip_local_failure=False, ignore_same_key=False, **kwargs):
    """
    Validate service objects by delegating unchanged to the parent class implementation.

    @param owner: user performing the operation
    @param objects: objects to validate
    @param validate_name: forwarded to the parent implementation
    @param dupname_tag: forwarded to the parent implementation
    @param transaction_id: transaction ID, forwarded to the parent implementation
    @param skip_local_failure: forwarded to the parent implementation
    @param ignore_same_key: forwarded to the parent implementation
    @param kwargs: any further keyword arguments, forwarded as-is
    @return: whatever the parent class' do_object_validation returns
    """
    parent = super(ItsiService, self)
    return parent.do_object_validation(
        owner,
        objects,
        validate_name=validate_name,
        dupname_tag=dupname_tag,
        transaction_id=transaction_id,
        skip_local_failure=skip_local_failure,
        ignore_same_key=ignore_same_key,
        **kwargs
    )
def identify_dependencies(self, owner, objects, method, req_source='unknown', transaction_id=None,
                          skip_local_failure=False, dry_run=False):
    """
    Assess refresh job data based upon changes

    Compares the incoming service objects with their persisted copies and builds the refresh jobs needed to
    bring dependent objects in line (entity membership, KPI deletion, backfill, adaptive thresholding /
    anomaly detection, service dependencies, shared base searches, alert period, ...). For create/update/upsert
    it also assigns missing '_key's, may recalculate custom thresholds on the incoming KPIs (mutating `objects`),
    and, unless dry_run is set, updates KPI saved searches and entity filter rules. For delete it, unless
    dry_run is set, removes the services' entity filter rules and KPI state cache entries.

    @param {string} owner: user which is performing this operation
    @param {list} objects: list of object
    @param {string} method: method name (a CRUDMethodTypes value)
    @param {string} req_source: request source
    @param {string} transaction_id: transaction ID
    @param {bool} skip_local_failure: Should we log errors instead of raising them? (saved search updates)
    @param {bool} dry_run: Should this function avoid making changes to the environment?
        This is an alternative usage of this function for validation purposes. Refresh jobs are still
        computed and returned.
    @return: a tuple
        {boolean} set to true/false if dependency update is required
        {list} list - list of refresh job, each element has the following
            change_type: <identifier of the change used to pick change handler>,
            changed_object_key: <Array of changed objects' keys>,
            changed_object_type: <string of the type of object>
    """
    refresh_jobs = []
    is_refresh_required = False
    if not utils.is_valid_list(objects):
        logger.error("%s resource did not passed valid object list:%s", req_source, objects)
        return is_refresh_required, refresh_jobs

    fields_filter = None
    # If delete, then fetch only what's needed to determine change which is a much smaller set than for other ops
    if method == CRUDMethodTypes.METHOD_DELETE:
        fields_filter = ['_key', 'object_type', 'kpis._key', 'kpis.adaptive_thresholds_is_enabled',
                         'kpis.adaptive_thresholding_training_window', 'kpis.search_type', 'kpis.base_search_id',
                         'kpis.enabled', 'kpis.is_entity_level_thresholding']

    object_ids = [service.get('_key') for service in objects]
    if method == CRUDMethodTypes.METHOD_CREATE:
        persisted_services = self.fetch_persisted_services_by_id(
            owner, object_ids, fields=fields_filter, req_source=req_source, transaction_id=transaction_id,
        )
    else:
        persisted_services = self.get_persisted_objects_by_id(
            owner, object_ids=object_ids, req_source=req_source, fields=fields_filter,
            transaction_id=transaction_id,
        )
    persisted_services_dict = dict((x['_key'], x) for x in persisted_services)
    refresh_jobs.extend(
        self.get_shared_base_search_update_jobs(objects, persisted_services_dict, transaction_id, method))

    updated_entities_service_keys = []
    entity_updates_needed_detail = {'method': method, 'service_info': {}}
    if method == CRUDMethodTypes.METHOD_DELETE:
        deleted_kpis = {'adhoc': [], 'shared_base': []}
        disabled_entity_level_thresholding_kpis = []
        atad_changed_kpis = {}
        kpi_dict = {}
        kpi_svc_dict = {}  # kpi to service mapping
        for service in persisted_services:
            for kpi in service.get("kpis", []):
                if not utils.is_valid_dict(kpi):
                    # Be resilient
                    continue
                kpi_dict[kpi.get("_key")] = kpi
                kpi_svc_dict[kpi.get("_key")] = service.get("_key")
                kpi_bucket = 'shared_base' if kpi.get('search_type', '') == 'shared_base' else 'adhoc'
                deleted_kpis[kpi_bucket].append(kpi.get('_key', ''))
            atad_changed_kpis.update(self._determine_changed_kpis_at_ad(None, service,
                                                                        "adaptive_thresholds_is_enabled",
                                                                        "adaptive_thresholding_training_window"))
            # Collected per deleted service so every service's entity level thresholds get cleaned up
            disabled_entity_level_thresholding_kpis.extend(
                self._determine_disabled_entity_level_thresholding(None, service)
            )
            # Service delete implies, any entity associated with the service via inclusion rules must be updated
            entity_updates_needed_detail['service_info'][service["_key"]] = {
                'title': service.get('title')
            }
            updated_entities_service_keys.append(service["_key"])

        if not dry_run:
            # Service delete also implies that any entity filter rule created needs to be deleted
            entity_filter_rule_delete_filter = {
                '$or': [{'service_id': deleted_service.get('_key')} for deleted_service in objects]
            }
            ItsiEntityFilterRule(self.session_key, 'nobody').delete_bulk(
                owner, filter_data=entity_filter_rule_delete_filter)
            # When the service is being deleted, we need to clear the cache entry
            kpi_state_cache = ItsiKPIStateCache(self.session_key, owner)
            for kpi_keys in deleted_kpis.values():
                for kpi_key in kpi_keys:
                    cache_entry = kpi_state_cache.delete(owner, kpi_key)
                    if cache_entry:
                        logger.debug('Successfully deleted KPI state cache entry for KPI with id (%s)', kpi_key)

        self._enqueue_atad_refresh_jobs(refresh_jobs, kpi_dict, kpi_svc_dict,
                                        'service_kpi_at', atad_changed_kpis, transaction_id)
        refresh_jobs.append(
            self.get_refresh_job_meta_data(
                "delete_service",
                [deleted_service.get('_key') for deleted_service in objects],
                self.object_type,
                change_detail={"deleted_kpis": deleted_kpis},
                transaction_id=transaction_id
            )
        )
        if len(entity_updates_needed_detail['service_info']) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data(
                    "service_entities_update",
                    updated_entities_service_keys,
                    self.object_type,
                    entity_updates_needed_detail,
                    transaction_id=transaction_id
                )
            )
        if len(disabled_entity_level_thresholding_kpis) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data("kpi_entity_thresholds_deletion",
                                               disabled_entity_level_thresholding_kpis,
                                               'kpi',
                                               transaction_id=transaction_id)
            )
    else:
        # add key if does not exist
        for service in objects:
            if not service.get("_key"):
                service["_key"] = ITOAInterfaceUtils.generate_backend_key()

        saved_search_changed_kpis = {}
        updated_entity_filters = []  # a list to contain dicts representing new filter_rule objects
        # filter matching the entity filter rules of KPIs that no longer filter to entities
        delete_entity_rule_filter = {'$or': []}
        deleted_kpi_ids = []
        deleted_kpi_detail = {}
        backfill_enabled_service_keys = []  # list of array of services _key which has KPIs with backfill enabled
        backfill_enabled_service_detail = {}  # dict of service key to list of KPIs with backfill enabled
        kpi_dict = {}  # kpiid -> KPI
        kpi_svc_dict = {}  # kpiid -> serviceid
        at_changed_kpis = {}  # kpiid -> change summary (e.g. 'on', 'off', 'changed')
        ad_changed_kpis = {}  # kpiid -> change summary (e.g. 'on', 'off', 'changed')
        cad_changed_kpis = {}  # kpiid -> change summary (e.g. 'on', 'off', 'changed')
        alert_period_changed_kpis = {}
        updated_service_dependencies = {}
        service_kpis_changed_to_shared_search = {}  # kpis which are changed from adhoc/datamodel to shared based
        disabled_entity_level_thresholding_kpis = []
        last_service_key = None

        objects, remove_depending_on_me = self._validate_service_dependencies(objects, method)
        filter_rule_object = ItsiEntityFilterRule(self.session_key, 'nobody')
        # add refresh queue job for updating 'services_depends_on' based on 'services_depending_on_me'
        if remove_depending_on_me:
            updated_service_dependencies['services_depending_on_me'] = remove_depending_on_me

        for service in objects:
            last_service_key = service.get('_key')
            persisted_service = persisted_services_dict.get(service['_key'])
            # we are doing asynchronous handling of the linking the service to the base service template here.
            # This will resolve issue mentioned here ITSI-25182
            # if new service is created from the base_service_template then we have to add it in the
            # base_service_template.
            if method == CRUDMethodTypes.METHOD_CREATE and not self.is_publish_flow:
                service_key = service.get('_key', '')
                base_service_template_key = service.get('base_service_template_id', '')
                if service_key and base_service_template_key:
                    refresh_jobs.append(
                        self.get_refresh_job_meta_data(
                            change_type='link_service_with_base_service_template',
                            changed_object_key=base_service_template_key,
                            changed_object_type=self.object_type,
                            change_detail={'service_key': service_key, 'current_user': self.current_user_name,
                                           'owner': owner},
                            transaction_id=transaction_id
                        )
                    )

            # Get list of kpis which is changed from ad-hoc/datamodel to shared search, can possible only for
            # upgrade
            if method == CRUDMethodTypes.METHOD_UPDATE or method == CRUDMethodTypes.METHOD_UPSERT:
                list_kpis_changed_to_shared_search = self._determine_changed_search_type(service, persisted_service)
                if len(list_kpis_changed_to_shared_search) > 0:
                    service_kpis_changed_to_shared_search.setdefault(service.get('_key'), []).extend(
                        list_kpis_changed_to_shared_search)

            if persisted_service is not None:
                for kpi in persisted_service.get("kpis", []):  # kpi_dict must include info from persisted services
                    if not utils.is_valid_dict(kpi):
                        # Be resilient
                        continue
                    kpi_dict[kpi.get("_key")] = kpi  # to handle KPI deletions gracefully
                    kpi_svc_dict[kpi.get("_key")] = persisted_service.get("_key")
            for kpi in service.get("kpis", []):  # append/overwrite kpi_dict entries with info for current service
                if not utils.is_valid_dict(kpi):
                    # Be resilient
                    continue
                kpi_dict[kpi.get("_key")] = kpi
                kpi_svc_dict[kpi.get("_key")] = service.get("_key")

            ad_changed_kpis.update(self._determine_changed_kpis_at_ad(service, persisted_service,
                                                                      "anomaly_detection_is_enabled", None,
                                                                      settings_field='trending_ad'))
            at_changed_kpis.update(self._determine_changed_kpis_at_ad(service, persisted_service,
                                                                      "adaptive_thresholds_is_enabled",
                                                                      "adaptive_thresholding_training_window"))
            cad_changed_kpis.update(self._determine_changed_kpis_at_ad(service, persisted_service,
                                                                       "cohesive_anomaly_detection_is_enabled",
                                                                       None,
                                                                       settings_field='cohesive_ad'))
            disabled_entity_level_thresholding_kpis.extend(
                self._determine_disabled_entity_level_thresholding(service, persisted_service))
            alert_period_changed_kpis.update(self._determine_changed_alert_period(service, persisted_service))
            if not updated_service_dependencies.get('services_depends_on'):
                updated_service_dependencies['services_depends_on'] = {}
            updated_service_dependencies['services_depends_on'].update(
                self._determine_changed_service_dependencies(service, persisted_service))

            if persisted_service is not None:
                # Save away entity rules to handle entity membership in refresh job
                entity_updates_needed_detail['service_info'][service["_key"]] = {
                    'entity_rules': service.get('entity_rules', []),
                    'title': service.get('title')
                }
                updated_entities_service_keys.append(service["_key"])
                # Check of KPI deletion
                persisted_kpi_ids = [persisted_kpi.get("_key")
                                     for persisted_kpi in persisted_service.get("kpis", [])]
                kpi_ids = [kpi.get("_key") for kpi in service.get("kpis", [])]
                service_deleted_kpis = list(set(persisted_kpi_ids) - set(kpi_ids))
                if len(service_deleted_kpis) > 0:
                    deleted_kpi_detail[service.get('_key')] = service_deleted_kpis
                    deleted_kpi_ids.extend(service_deleted_kpis)
                # If the service became disabled, drop the KPI state cache entries of its KPIs
                if not service.get('enabled') and persisted_service.get('enabled') and not dry_run:
                    kpi_state_cache = ItsiKPIStateCache(self.session_key, owner)
                    for kpi_key in kpi_ids:
                        cache_entry = kpi_state_cache.delete(owner, kpi_key)
                        if cache_entry:
                            logger.debug('Successfully deleted KPI state cache entry for KPI with id (%s)',
                                         kpi_key)
                # Check for backfill being enabled
                persisted_kpi_backfill_on = dict((kpi.get("_key"), kpi)
                                                 for kpi in persisted_service.get("kpis", [])
                                                 if kpi.get("backfill_enabled"))
                kpi_backfill_on = dict((kpi.get("_key"), kpi)
                                       for kpi in service.get("kpis", [])
                                       if kpi.get("backfill_enabled"))
                enabled_backfill_kpis = list(set(kpi_backfill_on) - set(persisted_kpi_backfill_on))
                if len(enabled_backfill_kpis) > 0:
                    logger.debug('Found backfill KPIs on update service.')
                    backfill_enabled_service_detail[service.get("_key")] = {"kpis": enabled_backfill_kpis}
                    backfill_enabled_service_keys.append(service.get("_key"))
                # check to see if thresholds were changed for existing kpi compared to persisted
                updated_service = self._determine_custom_thresholds_need_update(owner, service, persisted_service)
                # A service without a 'kpis' field may come back without one; nothing to copy then
                if 'kpis' in updated_service:
                    service['kpis'] = updated_service['kpis']
            else:
                # Service create implies, any entity associated with the service via inclusion rules must be updated
                # Save away entity rules to handle entity membership in refresh job
                entity_updates_needed_detail['service_info'][service["_key"]] = {
                    'entity_rules': service.get('entity_rules', []),
                    'title': service.get('title')
                }
                updated_entities_service_keys.append(service["_key"])
                # it would be new service is does not exist in KV store
                if len(service.get("entities", [])) > 0:
                    logger.debug('Found entities on new service.')
                    # NOTE(review): this key was already appended just above, so it is listed twice in the
                    # 'service_entities_update' job; confirm the handler tolerates duplicates.
                    updated_entities_service_keys.append(service.get("_key"))
                # get the set of KPIs with backfill enabled
                kpi_backfill_on = dict((kpi.get("_key"), kpi)
                                       for kpi in service.get("kpis", []) if kpi.get("backfill_enabled"))
                if len(kpi_backfill_on) > 0:
                    backfill_enabled_service_detail[service.get("_key")] = {"kpis": list(kpi_backfill_on.keys())}
                    backfill_enabled_service_keys.append(service.get("_key"))

            # Handling all the savedsearch generations
            changed_kpis, new_filter_rules, filters_to_remove = self._determine_generate_changed_search_param(
                service, persisted_service)
            saved_search_changed_kpis.update(changed_kpis)
            updated_entity_filters.extend(new_filter_rules)
            for delete_kpi_filter_id in filters_to_remove:
                delete_entity_rule_filter['$or'].append({'kpi_id': delete_kpi_filter_id})

        if not dry_run:
            try:
                # The KPI changes accumulated over all services are applied in one call, keyed with the
                # last processed service.
                self.update_savedsearches(last_service_key, saved_search_changed_kpis, transaction_id)
            except Exception as e:
                if skip_local_failure:
                    logger.error('Local failure skipped: %s', e)
                else:
                    raise
            if delete_entity_rule_filter['$or']:
                filter_rule_object.delete_bulk('nobody', filter_data=delete_entity_rule_filter)
            # Save new entity filter rules in pages of at most MAX_SAVE_LIMIT
            for start in range(0, len(updated_entity_filters), MAX_SAVE_LIMIT):
                subset_entity_filter_rules = updated_entity_filters[start:start + MAX_SAVE_LIMIT]
                try:
                    filter_rule_object.save_batch(owner, subset_entity_filter_rules, validate_names=False)
                except Exception:
                    logger.exception('Error while bulk saving entity filter rules - logging and continuing '
                                     'entity filter list = %s, tid = %s',
                                     subset_entity_filter_rules, transaction_id)

        # update information to job queue
        if len(entity_updates_needed_detail['service_info']) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data(
                    "service_entities_update",
                    updated_entities_service_keys,
                    self.object_type,
                    change_detail=entity_updates_needed_detail,
                    transaction_id=transaction_id
                )
            )
        if len(deleted_kpi_ids) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data("service_kpi_deletion", deleted_kpi_ids, "kpi",
                                               change_detail={'service_kpi_mapping': deleted_kpi_detail},
                                               transaction_id=transaction_id))
        if len(backfill_enabled_service_keys) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data("service_kpi_backfill_enabled", backfill_enabled_service_keys,
                                               self.object_type,
                                               change_detail=backfill_enabled_service_detail,
                                               transaction_id=transaction_id)
            )
        if len(disabled_entity_level_thresholding_kpis) > 0:
            refresh_jobs.append(
                self.get_refresh_job_meta_data("kpi_entity_thresholds_deletion",
                                               disabled_entity_level_thresholding_kpis,
                                               'kpi',
                                               transaction_id=transaction_id)
            )
        if len(updated_service_dependencies) > 0:
            updated_service_dependencies_keys = []
            if updated_service_dependencies.get('services_depends_on'):
                updated_service_dependencies_keys.extend(
                    list(updated_service_dependencies['services_depends_on'].keys()))
            if updated_service_dependencies.get('services_depending_on_me'):
                updated_service_dependencies_keys.extend(
                    list(updated_service_dependencies['services_depending_on_me'].keys()))
            if updated_service_dependencies_keys:
                refresh_jobs.append(
                    self.get_refresh_job_meta_data("service_dependency_changed", updated_service_dependencies_keys,
                                                   self.object_type,
                                                   change_detail=updated_service_dependencies,
                                                   transaction_id=transaction_id)
                )
        # Create job to delete KPI saved search because it is changed from Ad/hoc to shared
        if len(service_kpis_changed_to_shared_search) > 0:
            refresh_jobs.append(self.get_refresh_job_meta_data('modify_kpi_search_type',
                                                               list(service_kpis_changed_to_shared_search.keys()),
                                                               'service_kpi',
                                                               change_detail=service_kpis_changed_to_shared_search,
                                                               transaction_id=transaction_id))
        # Ignore the kpis in the deleted list
        for deleted_kpi_id in deleted_kpi_ids:
            ad_changed_kpis.pop(deleted_kpi_id, None)
            cad_changed_kpis.pop(deleted_kpi_id, None)
        logger.debug("Changed KPIs for AD %s", ad_changed_kpis)
        self._enqueue_atad_refresh_jobs(refresh_jobs, kpi_dict, kpi_svc_dict,
                                        'service_kpi_ad', ad_changed_kpis, transaction_id)
        logger.debug("Changed KPIs for Cohesive AD %s", cad_changed_kpis)
        self._enqueue_atad_refresh_jobs(refresh_jobs, kpi_dict, kpi_svc_dict,
                                        'service_kpi_cad', cad_changed_kpis, transaction_id)
        logger.debug("Changed KPIs for AT %s", at_changed_kpis)
        self._enqueue_atad_refresh_jobs(refresh_jobs, kpi_dict, kpi_svc_dict,
                                        'service_kpi_at', at_changed_kpis, transaction_id)
        logger.debug("Changed KPIs for alert period %s", alert_period_changed_kpis)
        if len(alert_period_changed_kpis) > 0:
            kpis_keys = list(alert_period_changed_kpis.keys())
            refresh_jobs.append(
                self.get_refresh_job_meta_data('service_kpi_update_alert_period', kpis_keys, 'kpi',
                                               change_detail=alert_period_changed_kpis,
                                               transaction_id=transaction_id))

    is_refresh_required = len(refresh_jobs) > 0
    return is_refresh_required, refresh_jobs
def _enqueue_atad_refresh_jobs(self, refresh_jobs, kpi_dict, kpi_svc_dict,
change_type, changed_kpis, transaction_id):
"""
Change jobs for anomaly detection or adaptive thresholding get created
whenever we detect that the on/off flag has changed state, or when key
parameters (e.g. training window, or thresholding method) have changed.
A job will be created per training window track; the job will contain a list of
KPIs with AD or AT turned on. The change handler will create an appropriate search.
If an empty list is passed for the changed_object_key's, the handler will
delete the saved search for that track
@param refresh_jobs: refresh jobs list
@param kpi_dict: full dict of all KPI objects for all services (keyed by KPI ID)
@param kpi_svc_dict: kpiid -> serviceid mapping
@param changed_kpis: full dict of changed KPI objects for all services;
keyed by KPI ID, values are one of ('on', 'off', 'changed'),
depending on whether anomaly detection / adaptive thresholding
- was newly switched on
- was newly switched off
- had parameters changed
"""
if change_type == 'service_kpi_ad':
STATE_FIELD = 'anomaly_detection_is_enabled'
TRACK_FIELD = ''
KPI_DATA_FIELDS = ['trending_ad']
elif change_type == 'service_kpi_at':
STATE_FIELD = 'adaptive_thresholds_is_enabled'
TRACK_FIELD = 'adaptive_thresholding_training_window'
KPI_DATA_FIELDS = []
elif change_type == 'service_kpi_cad':
STATE_FIELD = 'cohesive_anomaly_detection_is_enabled'
TRACK_FIELD = ''
KPI_DATA_FIELDS = ['cohesive_ad']
else:
raise ValueError('Invalid `change_type` argument: %s.' % change_type)
if len(changed_kpis) == 0:
logger.debug("No change detected in AT/AD settings")
return # no-op if the set of affected KPIs is empty
def _get_kpi_data_dict(kpi):
data_dict = dict((x, kpi.get(x)) for x in KPI_DATA_FIELDS)
data_dict['anomaly_detection_is_enabled'] = kpi.get(STATE_FIELD)
data_dict['training_window'] = kpi.get(TRACK_FIELD)
data_dict['service_id'] = kpi_svc_dict.get(kpi['_key'])
data_dict['change_summary'] = changed_kpis.get(kpi['_key']) or 'unchanged'
data_dict['alert_period'] = str(kpi.get('alert_period')) + 'm'
return data_dict
_kpi_data = {} # abridged KPI dict
_kpi_ids = []
for kpi_id in changed_kpis:
_kpi_data[kpi_id] = _get_kpi_data_dict(kpi_dict[kpi_id])
_kpi_ids.append(kpi_id)
refresh_jobs.append(
self.get_refresh_job_meta_data(change_type, _kpi_ids, 'kpi', change_detail={'kpi_data': _kpi_data},
transaction_id=transaction_id))
def get_entities(self, owner, service_ids, req_source="static services link", transaction_id=None):
    """
    Get the entities whose 'services' field contains at least one of the given service ids.

    @param {string} owner: owner
    @param {list} service_ids: list of service id
    @param {string} req_source: identified source initiating the operation
    @param {string} transaction_id: unique identifier for transaction tracing
    @return: {list} - list of entity objects; empty when no service ids are given
    """
    if not service_ids:
        # No service can only match no entity; also MongoDB-style queries reject an empty '$or' array.
        return []
    filter_data = {
        '$or': [{'services': service_id} for service_id in service_ids]
    }
    entity_object = ItsiEntity(self.session_key, self.current_user_name)
    return entity_object.get_bulk(owner, filter_data=filter_data,
                                  req_source=req_source, transaction_id=transaction_id)
def delete_kpi_saved_searches(self, kpi_saved_search_names):
    """
    Delete kpi alert saved searches
    Note this is a primitive and performs no clean up of service objects.

    Names containing the shared search prefix are skipped. A search that no longer exists is treated as
    already deleted. Errors raised by the delete call are logged and reported through the return value
    rather than propagated.

    @param {list} kpi_saved_search_names: list of search names to delete (None is treated as empty)
    @return: True if every search was deleted (or was already absent), False if any deletion failed
    """
    status_ok = True
    # Delete saved search for kpi
    for saved_search_name in kpi_saved_search_names or []:
        # TODO: delete_kpi_saved_searches is DEFINITELY broken and needs an update
        # This is not the kind of heuristic I should be going with to determine if we can delete the search
        # We're checking to make sure that the indicator search does not contain the search prefix we expect
        if ItsiSharedAdhocSearch.search_prefix in saved_search_name:
            continue
        try:
            ret = SavedSearch.delete_search(self.session_key, saved_search_name)
        except splunk.ResourceNotFound:
            # Already gone: not a failure, so no stack trace and no "successfully deleted" message
            logger.warning('Saved search "%s" was not found, ignoring delete', saved_search_name)
            continue
        except Exception as e:
            logger.exception('Caught exception trying to delete saved search "%s". Error: %s', saved_search_name, e)
            ret = False
        if ret:
            logger.info('Successfully deleted saved search "%s".', saved_search_name)
        else:
            logger.error('Failed to delete saved search "%s".', saved_search_name)
            status_ok = False
    return status_ok
def _update_savedsearches(self, saved_searches):
    """
    Create or update the given KPI saved searches synchronously.

    @type saved_searches: list
    @param saved_searches: list of saved search settings dicts. Each dict carries the search
        'name'. The optional control keys 'kpi_title' (used only for logging) and 'acl_update'
        (default True; when truthy the search ACL is updated under the 'nobody' context) are not
        forwarded to SavedSearch.update_search. All remaining keys are passed through as search
        settings. The caller's dicts are left unmodified.
    @rtype: None
    @returns nothing.
    @raise Exception: if any saved search could not be created/updated. A failed ACL update is
        only logged.
    """
    for settings in saved_searches:
        # Shallow copy so that popping the control keys does not strip them from the caller's
        # dict (e.g. a retried batch would otherwise silently lose 'acl_update': False).
        ss = dict(settings)
        title = ss.pop('kpi_title', None)
        acl_update = ss.pop('acl_update', True)
        try:
            ret = SavedSearch.update_search(self.session_key, ss.get('name'), 'itsi', 'nobody', **ss)
            if not ret:
                raise Exception('Failed to update saved searches.')
            logger.debug("Successfully created/update saved search=%s for KPI=%s", ss.get('name'), title)
            if acl_update:
                ret = SavedSearch.update_acl(self.session_key, ss.get('name'),
                                             'nobody'  # All searches get saved under the nobody context
                                             )
                if not ret:
                    msg = ('Failed to update ACL settings for savedsearch: "{}";'
                           ' KPI: "{}". Do this manually.').format(ss.get('name'), title)
                    logger.error(msg)
                    # At this stage we could either:
                    # i) raise an exception, delete KPI and bail out or
                    # ii) continue with KPI creation and let Admin manually change
                    # ACL settings for this KPI Saved Search via GUI.
                    # We will execute ii)
        except Exception as e:
            logger.error(e)
            failed_object_detail = "Failed to create saved search={0} for KPI={1}".format(ss.get('name'), title)
            message = '{}. Error Response: {}.'.format(failed_object_detail, e)
            raise Exception(message)
    return
def post_save_setup(self, owner, ids, services, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                    transaction_id=None, skip_local_failure=False, dry_run=False, **kwargs):
    """
    Performs additional synchronous operations on other objects after writing services to kvstore.

    1. When the persisted-services caching layer is enabled, the cached service lists/dicts are
       brought in line with the written services. On create they are appended or added; on update
       the cached copy with the same '_key' is replaced. Any other method raises.
    2. Unless dry_run is set, the base service templates collected for this save
       (self.base_service_templates) are saved in bulk. Afterwards self.base_service_templates is
       reset to None, also when the save fails, so a failed publish cannot leak stale templates
       into the next one.

    @type owner: string
    @param owner: user who is performing this operation
    @type ids: List of dict identifiers in format {"_key": <key>} returned by kvstore, parity with objects passed in
    @param ids: list of dict
    @type services: list of dictionary
    @param services: list of services being written
    @type req_source: string
    @param req_source: string identifying source of this request
    @type method: basestring
    @param method: operation type. Defaults to upsert.
    @type transaction_id: basestring
    @param transaction_id: transaction id for end-end tracing.
    @type skip_local_failure: bool
    @param skip_local_failure: not used by this method
    @type dry_run: Boolean
    @param dry_run: Should this function avoid making changes to the environment?
        This is an alternative usage of this function for validation purposes.
    @return: none, throws exceptions on errors
    """
    if self.use_persisted_services_caching:
        if method == CRUDMethodTypes.METHOD_CREATE:
            # Used in service sandbox
            for cache_list in self.persisted_services_dict.values():
                cache_list.extend(services)
            for cache_dict in self.persisted_services_by_id_dict.values():
                for new_service in services:
                    cache_dict[new_service['_key']] = new_service
        elif method == CRUDMethodTypes.METHOD_UPDATE:
            # Used in KPI endpoints
            for service in services:
                for cache_list in self.persisted_services_dict.values():
                    # Replace the first cached copy of this service, if present.
                    for index, cached_service in enumerate(cache_list):
                        if cached_service["_key"] == service["_key"]:
                            cache_list[index] = service
                            break
                for cache_dict in self.persisted_services_by_id_dict.values():
                    cache_dict[service["_key"]] = service
        else:
            raise Exception("Unexpected usage of service caching-layer")
    if dry_run:
        return
    # NOTE: Here, we update base service templates synchronously after service creation/update is done.
    # Considering, number of service template objects would not be as high as other itoa objects like
    # services and entities, we're performing synchronous update of service template in bulk here. If,
    # we hit performance issues with synchronous update of service templates, then we can move it to
    # async refresh queue job, using change handler.
    try:
        if self.base_service_templates:
            service_template_interface = instantiate_object(self.session_key, 'nobody',
                                                            'base_service_template', logger=logger)
            service_template_interface.skip_service_template_update = True
            try:
                service_template_interface.save_batch(owner, self.base_service_templates,
                                                      validate_names=True,
                                                      req_source=req_source,
                                                      ignore_refresh_impacted_objects=True,
                                                      method=CRUDMethodTypes.METHOD_UPDATE,
                                                      transaction_id=transaction_id)
            finally:
                service_template_interface.skip_service_template_update = False
            logger.info('Total `{0}` base service templates updated after update/creation '
                        'of `{1}` linked services. transaction_id = {2}'.format(len(self.base_service_templates),
                                                                                ids, transaction_id))
    finally:
        # Reset base service templates attribute for next publish, even if saving them failed.
        # This is a quick and dirty bugfix for an imminent release. Ideally, we should restructure the code so each
        # publish can't disrupt a following publish.
        self.base_service_templates = None
def post_delete(self, owner, deleted_service_ids, req_source='unknown',
                method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None):
    """
    Perform post delete operation after deleting services from kvstore.

    For update/upsert methods, the service templates linked to the deleted services are fetched,
    the deleted ids are removed from each template's 'linked_services', and
    'total_linked_services' is recomputed. The templates are then saved. Failures while updating
    or saving are logged and swallowed (best effort).

    @type owner: basestring
    @param owner: owner
    @type deleted_service_ids: list
    @param deleted_service_ids: keys of the services that were deleted
    @type req_source: basestring
    @param req_source: string identifying source of this request
    @type method: basestring
    @param method: operation type; only update/upsert trigger template cleanup
    @type transaction_id: basestring
    @param transaction_id: transaction id for end-end tracing
    @rtype: None
    """
    if not deleted_service_ids:
        return
    if method not in (CRUDMethodTypes.METHOD_UPDATE, CRUDMethodTypes.METHOD_UPSERT):
        return
    service_template_utils = ServiceTemplateUtils(self.session_key, "nobody")
    service_templates = service_template_utils.get_service_template_of_linked_services(
        owner, deleted_service_ids, req_source=req_source, transaction_id=transaction_id)
    logger.debug("Number of impacted service templates:%s", len(service_templates))
    if not service_templates:
        return
    info = [{'title': template.get("title"), "_key": template.get("_key")} for template in service_templates]
    logger.debug("Impacted service templates:%s", info)
    try:
        # Set for O(1) membership tests while filtering each template's linked services.
        deleted_ids = set(deleted_service_ids)
        for template in service_templates:
            # Tolerate a null 'linked_services' instead of aborting the whole save.
            linked_services = template.get('linked_services') or []
            if not linked_services:
                continue
            template['linked_services'] = [
                service_id for service_id in linked_services if service_id not in deleted_ids
            ]
            template['total_linked_services'] = len(template['linked_services'])
            logger.info("Updated service linkage of service template:%s, _key: %s, update linked "
                        "services:%s", template.get('title'), template.get('_key'),
                        template.get('linked_services'))
        service_template_utils.save_service_templates(owner, service_templates, transaction_id=transaction_id)
    except Exception as e:
        # Best effort: a failure to update template linkage must not fail the service delete.
        logger.exception(e)
def update_savedsearches(self, service_key, saved_search_changed_kpis, transaction_id=None):
    """
    Create or update the saved searches of the given changed KPIs.

    Fewer than 10 saved searches are written synchronously via self._update_savedsearches.
    Larger batches are handed off as a single 'create_or_update_kpi_saved_search' refresh job.
    That job's change detail maps each KPI id to its JSON-serialized saved search settings.

    @type service_key: basestring
    @param service_key: service _key
    @type saved_search_changed_kpis: dict
    @param saved_search_changed_kpis: saved search settings dicts keyed by KPI id
    @type transaction_id: basestring
    @param transaction_id: transaction id for debugging purpose
    @rtype: None
    @returns nothing
    """
    # Batches at or above this size are deferred to a refresh job instead of being written inline.
    max_synchronous_updates = 10
    if not saved_search_changed_kpis:
        return
    if len(saved_search_changed_kpis) < max_synchronous_updates:
        self._update_savedsearches(list(saved_search_changed_kpis.values()))
        return
    # Serialize the settings only when they are actually handed off to a refresh job.
    # NOTE(review): 'kpi_title' is taken from the settings' 'title' key, while _update_savedsearches
    # reads the KPI title from 'kpi_title'; confirm which key these settings dicts carry.
    refresh_jobs_change_details = {
        kpi_id: {'search_data': json.dumps(savedsearch), 'kpi_title': savedsearch.get('title')}
        for kpi_id, savedsearch in saved_search_changed_kpis.items()
    }
    refresh_jobs = [self.get_refresh_job_meta_data(
        'create_or_update_kpi_saved_search',
        service_key, self.object_type,
        change_detail=refresh_jobs_change_details,
        transaction_id=transaction_id
    )]
    logger.info('Creating create_or_update_kpi_saved_search refresh jobs for service=%s', service_key)
    self.create_refresh_jobs(refresh_jobs, synchronous=self.synchronous)
@staticmethod
def add_required_fields_to_new_kpi_from_service_template(new_kpi, service_template_id):
    """
    Stamp a KPI that is being added to a service from a service template.

    The KPI dict is modified in place. It receives a freshly generated backend '_key', is marked
    as 'kpis_primary', and records the template it came from in 'base_service_template_id'.
    Existing values for these fields are overwritten.

    @type new_kpi: dict
    @param new_kpi: new kpi to be added in service.
    @type service_template_id: basestring
    @param service_template_id: key of service template to which service is linked
    @return: None
    """
    template_fields = {
        '_key': ITOAInterfaceUtils.generate_backend_key(),
        'type': 'kpis_primary',
        'base_service_template_id': service_template_id,
    }
    new_kpi.update(template_fields)
def update_services_with_base_service_templates(self, services, persisted_services, owner, req_source='unknown',
method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None):
"""
Reconcile the given service objects with their base service templates before they are saved.

Mutates `services` in place. When any service template has to be fetched, the fetched template
objects are stored in self.base_service_templates and may be modified here (linked_services);
they are expected to be saved later by post_save_setup().
Handled cases:
1. Create from template: a service carrying 'base_service_template_id' that has no '_key', or
whose '_key' is still in service_to_service_template_map after the persisted-service pass.
A missing '_key' is generated. Template tags are copied. When the payload has no KPIs, a
service health KPI is generated and the template's kpis, entity_rules, serviceTemplateId and
is_healthscore_calculate_by_entity_enabled are copied in. A clone payload already carries its
KPIs, so this copy is skipped for it. Unless method is create, the service key is added to
the template's linked_services.
2. Unlink: a persisted service that had a template id while the request payload has none. It is
removed from the template's linked_services, and its template_tags are merged into its tags.
3. Update of a linked service: the template-linked KPIs are validated. Single KPIs cannot be
unlinked, the template id cannot change, search attributes cannot change, and no new linked
KPIs may be added. 'linked_kpi_thresholds_updated' is set when a threshold attribute differs.
Re-linking a service to a different template through this path raises a bad validation error.
If no service is linked to a base service template, then, does nothing.
@type services: list
@param services: list of service objects
@type persisted_services: list
@param persisted_services: list of persisted services in kvstore
@type owner: string
@param owner: user who is performing this operation
@type req_source: string
@param req_source: string identifying source of this request
@type method: basestring
@param method: operation type. Defaults to upsert.
@type transaction_id: basestring
@param transaction_id: transaction id for end-end tracing.
@return: none, throws exceptions on errors
"""
# used to get service template id from service id
service_to_service_template_map = {}
service_template_get_filter = {
'$or': []
}
for service in services:
if not isinstance(service, dict):
self.raise_error_bad_validation(
logger, 'Invalid type for service. Expected JSON, received {}.'.format(type(service).__name__)
)
if service.get('_key'):
service_to_service_template_map[service.get('_key')] = service.get('base_service_template_id', '')
# if service doesn't have a key, then assume it is a new service to be created
elif service.get('base_service_template_id'):
service_template_get_filter['$or'].append({'_key': service.get('base_service_template_id')})
# persisted service key -> key of the template it is being unlinked from
services_unlink_map = {}
# persisted service key -> {kpi key -> persisted template-linked kpi}
persisted_services_kpis_map = {}
process_services = False
# validation of update request for services linked to service templates. always validate
# base_service_template_id in service by comparing it with persisted service in kvstore. We do not allow
# re-linking of service to another service template, through update/bulk-update endpoint for service.
if method != CRUDMethodTypes.METHOD_CREATE:
for persisted_service in persisted_services:
# update request for service
if persisted_service.get('_key') in service_to_service_template_map:
persisted_service_template = persisted_service.get('base_service_template_id', '')
updated_service_template = service_to_service_template_map.get(persisted_service.get('_key'), '')
# Unlink operation
if persisted_service_template and not updated_service_template:
# Get related service template
service_template_get_filter['$or'].append({
'_key': persisted_service.get('base_service_template_id')
})
services_unlink_map[persisted_service['_key']] = \
persisted_service.get('base_service_template_id')
# pop the service ids from map which are not create service request from service template
service_to_service_template_map.pop(persisted_service.get('_key'), None)
# if base service template id for a service in request is not same as for the
# service in kvstore, then raise bad validation error
elif persisted_service_template != updated_service_template:
self.raise_error_bad_validation(logger,
('Invalid service template id provided while updating '
'service `{0}`. Cannot re-link service to another service'
' template through this endpoint, check ITSI API docs to'
' find API for re-linking of service to another service'
' template. Expected service template id `{1}`, found `{2}`')
.format(persisted_service.get('_key', None),
persisted_service.get('base_service_template_id', None),
service_to_service_template_map.get(
persisted_service.get('_key', ''), None))
)
# check for update in KPI thresholds and validate update requested, for
# services linked to service template
elif persisted_service_template and updated_service_template:
for kpi in persisted_service.get('kpis', []):
# service health KPIs and KPIs not linked to a template are not validated
if kpi['_key'].startswith(self.SHKPI_STARTS_WITH) \
or not kpi.get('base_service_template_id', ''):
continue
if persisted_service['_key'] not in persisted_services_kpis_map:
persisted_services_kpis_map[persisted_service['_key']] = {
kpi['_key']: kpi
}
else:
persisted_services_kpis_map[persisted_service['_key']][kpi['_key']] = kpi
if persisted_services_kpis_map:
process_services = True
# pop the service ids from map which are not create service request from service template
service_to_service_template_map.pop(persisted_service.get('_key'), None)
# add service templates to filter to be fetched
for service_id in service_to_service_template_map:
# only fetch service templates with valid key
if service_to_service_template_map.get(service_id):
service_template_get_filter['$or'].append({'_key': service_to_service_template_map[service_id]})
service_templates_map = {}
# fetch service templates for creating services or for unlinking services
if len(service_template_get_filter.get('$or')) > 0:
logger.info('Some of the service objects in create/update request are requested to be'
' created or unlinked from service template. transaction_id="%s"' % transaction_id)
service_template_interface = instantiate_object(self.session_key, 'nobody',
'base_service_template', logger=logger)
# fetch service template objects from kvstore in bulk
self.base_service_templates = service_template_interface.get_bulk(owner,
req_source=req_source,
filter_data=service_template_get_filter,
transaction_id=transaction_id)
# NOTE(review): only a completely empty result is rejected; if just some of the requested
# templates are missing, the lookups below fall back to an empty dict {} for them.
if len(self.base_service_templates) == 0:
self.raise_error_bad_validation(logger,
('Could not find service template object(s) with id(s) `{}`. '
'Cannot create service(s) from non-existent service template(s).')
.format(service_template_get_filter.get('$or'))
)
# construct service template key to object map. used by service object to
# quickly get service template content by id
for service_template in self.base_service_templates:
service_templates_map[service_template.get('_key')] = service_template
process_services = True
if process_services:
for service in services:
# handle new service to be created from base service template. service without
# a key and no kpis means a new service to be created
if service.get('base_service_template_id') and \
(not service.get('_key') or service.get('_key') in service_to_service_template_map):
service_template = service_templates_map.get(service.get('base_service_template_id'), {})
if not utils.is_valid_str(service.get('_key')):
service['_key'] = ITOAInterfaceUtils.generate_backend_key()
# Special handling for service tags
if 'service_tags' not in service:
service['service_tags'] = dict()
if 'tags' not in service['service_tags']:
service['service_tags']['tags'] = []
service['service_tags']['template_tags'] = service_template.get('template_tags', [])
# the clone service payload is also sent without a key, but with all kpis defined
# copying from service template content could be skipped
if not service.get('kpis', []):
# generate service health kpi
service['kpis'] = [ITOAInterfaceUtils.generate_shkpi_dict(service['_key'])]
# tuple of tuples containing field name and it's default value, in case field is missing
# from service template
fields_to_copy_from_template = (('kpis', []), ('entity_rules', []),
('serviceTemplateId', ''),
('is_healthscore_calculate_by_entity_enabled', 1))
for field, default_value in fields_to_copy_from_template:
# template KPIs are appended after the generated service health KPI
if field == 'kpis':
service[field].extend(copy.deepcopy(service_template.get(field, default_value)))
continue
service[field] = copy.deepcopy(service_template.get(field, default_value))
# tuple of tuples containing field name to be added to kpis as first element and
# field's value as second element of tuple
fields_to_be_added_to_kpi = (
# considering backfill fields are sent in service payload, while creating
# service from service template
('backfill_enabled', service.get('backfill_enabled', False)),
('backfill_earliest_time', service.get('backfill_earliest_time', '-7d'))
)
# add fields to service kpis not present in service template kpis
for kpi in service.get('kpis', []):
if kpi['_key'].startswith(self.SHKPI_STARTS_WITH):
continue
self.add_required_fields_to_new_kpi_from_service_template(kpi, service.
get('base_service_template_id'))
for field, value in fields_to_be_added_to_kpi:
kpi[field] = value
# pop below field, if it is passed while creation of service from service
# template. as it can cause repercussions while updating unchanged linked
# kpis with service template in BaseServiceTemplateUpdateHandler
kpi.pop('linked_kpi_thresholds_updated', None)
# pop redundant fields.
service.pop('backfill_enabled', None)
service.pop('backfill_earliest_time', None)
logger.debug('Updated service content of `{0}` with base service template `{1}`.'
' transaction_id = {2}'.format(service.get('_key'),
service.get('base_service_template_id'),
transaction_id))
#####
# Make updates to service template object to reflect link with new service. Since, the newly linked
# service is a new service, therefore, we don't need to validate, if service key already exists in
# linked services list or not. Actual update of service templates in kvstore occurs in
# post_save_setup(), after service objects get saved in kvstore.
# we are handling 'linked_services' asynchronously for newly created services in identify_dependencies() method
#####
if method != CRUDMethodTypes.METHOD_CREATE:
if service_template.get('linked_services') is not None:
if service.get('_key') not in service_template['linked_services']:
service_template['linked_services'].append(service.get('_key'))
else:
service_template['linked_services'] = [service.get('_key')]
elif services_unlink_map and service.get('_key') in services_unlink_map:
# Unlink service from service template
# Please note: since the payload contains all the necessary changes on service
# Only service template unlink is needed
# NOTE(review): service_template falls back to {} when the template was not fetched, and then
# service_template['linked_services'] raises KeyError; confirm the template always exists here.
service_template = service_templates_map.get(services_unlink_map[service.get('_key')], {})
if service.get('_key') in service_template['linked_services']:
service_template['linked_services'].remove(service.get('_key'))
# Handle service tags - move the template tags if any to service's tags
if 'service_tags' not in service:
service['service_tags'] = dict()
if 'tags' not in service['service_tags']:
service['service_tags']['tags'] = []
if 'template_tags' not in service['service_tags']:
service['service_tags']['template_tags'] = []
all_tags = service['service_tags']['tags'] + service['service_tags'][
'template_tags']
service['service_tags']['tags'] = all_tags
service['service_tags']['template_tags'] = []
# Validate KPI content and check for KPI thresholds update, for services linked to template
elif service.get('_key') in persisted_services_kpis_map:
for kpi in service.get('kpis', []):
if kpi['_key'].startswith(self.SHKPI_STARTS_WITH):
continue
if kpi['_key'] in persisted_services_kpis_map[service['_key']]:
persisted_service_kpi = persisted_services_kpis_map[service['_key']][kpi['_key']]
if not kpi.get('base_service_template_id', ''):
self.raise_error_bad_validation(
logger, ('The base_service_template_id must be provided. A KPI cannot be unlinked '
'from a service template, only a service can be unlinked from a service '
'template. service_id="%s", kpi_id="%s"') %
(service.get('_key'), kpi.get('_key'))
)
elif kpi.get('base_service_template_id', '') \
!= persisted_service_kpi.get('base_service_template_id', ''):
self.raise_error_bad_validation(
logger, ('Invalid base_service_template_id provided for KPI. A service can be'
' linked to only one service template. service_id="%s", kpi_id="%s", '
'service_base_template_id="%s", kpi_base_template_id="%s"') %
(
service.get('_key', None),
kpi.get('_key', None),
service.get('base_service_template_id', None),
kpi.get('base_service_template_id', None)
)
)
# make sure that none of the service template linked KPI search attribute
# is changed, in update request
for attr in SEARCH_AND_CALCULATE_ATTRIBUTES:
if kpi.get(attr, '') != persisted_service_kpi.get(attr, ''):
self.raise_error_bad_validation(
logger, ('Invalid update request. Cannot update search attributes for KPI '
'linked to service template. To update KPI search attributes, '
'update the corresponding KPI in service template and then '
'push out changes to linked services. service_id="%s", kpi_id="%s", '
'service_template_id="%s", attribute_update_requested="%s"') % (
service.get('_key'), kpi.get('_key'),
kpi.get('base_service_template_id'), attr
)
)
# if 'linked_kpi_thresholds_updated' is already true in persisted kpi, don't reset it
if not persisted_service_kpi.get('linked_kpi_thresholds_updated', False):
for field in THRESHOLDS_ATTRIBUTES:
if kpi.get(field, '') != persisted_service_kpi.get(field, ''):
kpi['linked_kpi_thresholds_updated'] = True
break
else:
kpi['linked_kpi_thresholds_updated'] = persisted_service_kpi.get(
'linked_kpi_thresholds_updated'
)
# raise error, if new kpi is found in service, which is linked to service template.
# addition of new kpi to service which is linked to service template, could only be
# done by adding that kpi to service template and then pushing out changes to linked
# services.
elif kpi.get('base_service_template_id', ''):
self.raise_error_bad_validation(
logger, ('Invalid update request. Cannot add a new KPI with a link to service '
'template, through service update request. To add a new KPI with a link to '
'service template, add the KPI to the service template, then push out the '
'changes to linked services. service_id="%s", kpi_id="%s"') %
(service.get('_key', None), kpi.get('_key', None))
)
def do_additional_setup(
self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None,
skip_local_failure=False, **kwargs
):
"""
Validate, normalize and enrich service objects in place before they are written to kvstore.

Unless the kwarg skip_kpi_consistancy_check is truthy, the following steps run first:
- persisted services are fetched (skipped when self.is_kpi_delete is set);
- services are reconciled with their base service templates (unless
self.skip_service_template_update is set);
- KPIs are validated and cleaned up.
Then, for each service:
- a missing '_key' is generated;
- 'enabled' and 'is_healthscore_calculate_by_entity_enabled' are normalized to 0/1;
- the service health KPI is refreshed, or generated when missing;
- all other KPIs get their searches populated via ItsiKpi.populate;
- the entity rules are cleaned up.
Finally, the payload size of all objects is checked against the KV store limit.
@type owner: basestring
@param owner: request owner. "nobody" or some username.
@type objects: list
@param objects: List of service type objects
@type req_source: basestring
@param req_source: Source requesting this operation.
@type method: basestring
@param method: operation type. Defaults to upsert.
@type transaction_id: basestring
@param transaction_id: transaction id for end-end tracing.
@type skip_local_failure: bool
@param skip_local_failure: not used in this method body
@type skip_kpi_consistancy_check: boolean
@param skip_kpi_consistancy_check: (kwarg) Ignore KPI Consistency validation and dictionary key validation checks
"""
# Map 0, '0' and 'false' to 0, and everything else (including missing/None) to 1.
# NOTE(review): since False == 0, the boolean False also maps to 0, but strings such as 'False' or
# 'no' map to 1 (enabled); normalizeBoolean (imported at file top) may be the intended helper.
def _normalize_enabled_flag_to_binary(input_):
false_things = [0, '0', 'false']
if input_ in false_things:
return 0
else:
return 1
if not utils.is_valid_list(objects):
self.raise_error_bad_validation(
logger, 'Invalid type for service(s). Expected list, received {}.'.format(type(objects).__name__))
# Ignore KPI validation and cleanup in service objects when saving service objects with internal scripts
if not kwargs.get("skip_kpi_consistancy_check", False):
# fetch all persisted services to be used for service template processing and validation of kpis
fields_to_fetch = ['_key', 'kpis._key']
# template-related KPI fields are only needed for non-create requests
if method != CRUDMethodTypes.METHOD_CREATE:
fields_to_fetch.extend(
['base_service_template_id', 'kpis.base_service_template_id']
+ [('kpis.' + attr) for attr in SEARCH_AND_CALCULATE_ATTRIBUTES + THRESHOLDS_ATTRIBUTES]
)
if not self.is_kpi_delete:
persisted_services = self.fetch_persisted_services(
owner, req_source=req_source, fields=fields_to_fetch, transaction_id=transaction_id
)
else:
# Performance optimization to skip service update step as the KPI is being removed
persisted_services = []
# update service with service template content, if service has to be created from service template.
# if the request comes from link service template endpoint, skip this step
# NOTE(review): `method` is not forwarded here, so the callee always sees its default (upsert). Its
# create-specific branches (skipping persisted-service validation, and deferring linked_services to
# identify_dependencies) therefore never run; confirm whether method=method was intended.
if not self.skip_service_template_update:
self.update_services_with_base_service_templates(objects, persisted_services, owner, req_source=req_source,
transaction_id=transaction_id)
self.validate_kpis(owner, objects, persisted_services, method=method, transaction_id=transaction_id)
self.cleanup_kpis(objects, method)
for svc in objects:
if not utils.is_valid_str(svc.get('_key')):
svc['_key'] = ITOAInterfaceUtils.generate_backend_key()
# If the flag is not passed in, enable the service
svc['enabled'] = _normalize_enabled_flag_to_binary(svc.get('enabled', 1))
# If the flag is not passed in, enable the
# is_healthscore_calculate_by_entity on service object
svc['is_healthscore_calculate_by_entity_enabled'] = _normalize_enabled_flag_to_binary(
svc.get('is_healthscore_calculate_by_entity_enabled', 1))
kpis = svc.get('kpis', [])
# Assume no shkpi exists for given service. We'll generate one down below
shkpi_found = False
# Generate KPI searches afresh
for kpi in kpis:
if kpi['_key'].startswith(self.SHKPI_STARTS_WITH):
shkpi_found = True
# don't update backfill attributes while regenerating SHKPI.
kpi.update(
ITOAInterfaceUtils.update_shkpi_dict(
svc['_key'], backfill_enabled=kpi.get('backfill_enabled', False))
) # Always update shkpi
# strip whitespace from the relative time string, e.g. '- 7d' -> '-7d'
kpi['backfill_earliest_time'] = kpi.get('backfill_earliest_time', '-7d').strip().replace(' ', '')
continue # Do not populate searches for shkpi
# validate time policies prior to filling searches
ItsiKpiTemplate(self.session_key, self.current_user_name).validate_kpi_time_policies(kpi)
# Populate regular kpi object with search strings.
ItsiKpi(self.session_key, 'nobody').populate(
kpi, svc.get('entity_rules', []),
svc.get('_key'), svc.get('title'),
svc.get('enabled'), svc.get('sec_grp')
)
# End for loop...
# Create a SHKPI if none is present.
if not shkpi_found:
kpi = ITOAInterfaceUtils.generate_shkpi_dict(svc['_key'])
# if SHKPI is not found (create service case), then user can enable health score
# backfill, by passing below backfill attributes in service object.
if 'health_score_backfill_enabled' in svc:
kpi['backfill_enabled'] = svc['health_score_backfill_enabled']
kpi['backfill_earliest_time'] = svc.get(
'health_score_backfill_earliest_time', '-7d').strip().replace(' ', '')
# the health score backfill attributes are request-only and are not stored on the service
svc.pop('health_score_backfill_enabled', None)
svc.pop('health_score_backfill_earliest_time', None)
kpis.append(copy.deepcopy(kpi))
del kpi
svc['kpis'] = kpis # ... in case, svc object had no kpis initially.
# Clean up entity rules
self.cleanup_entity_rules(svc)
# Service payloads can bloat up significantly after generating searches.
# If the resultant payload size exceeds allowable document size in KV store,
# save operation on the service will fail causing stray saved searches
# and saved search refresh jobs if those operations occur before the save.
# As a precaution, perform a size limit
# check after size bloat and before saved searches and refresh jobs are created
self.storage_interface.check_payload_size(self.session_key, objects)
def cleanup_entity_rules(self, service):
    """
    Remove client-side 'id' attributes from a service's entity rules, in place.

    If present, an 'id' on an entity rule term causes the backbone destroy call to fail while
    deleting an entity rule from the UI, because EntityRuleModel then assumes it has a persistent
    store associated with it. For more info, see ITOA-11342.

    A missing or non-list 'entity_rules' is replaced with an empty list. A missing or null
    'rule_items' on an OR term is treated as having no AND terms.

    @type service: dict
    @param service: service object; modified in place
    @rtype: None
    @return: nothing. Non-dict OR/AND rule terms are reported via self.raise_error_bad_validation.
    """
    if not isinstance(service.get('entity_rules'), list):
        service['entity_rules'] = []
    for or_rule_term in service['entity_rules']:
        if not isinstance(or_rule_term, dict):
            self.raise_error_bad_validation(
                logger,
                'Invalid type of OR entity rule term specified for service %s. Expected JSON, received %s.'
                % (service.get('title'), type(or_rule_term).__name__)
            )
        or_rule_term.pop('id', None)
        # A null 'rule_items' (e.g. JSON null in the payload) would otherwise raise TypeError here.
        for and_rule_term in or_rule_term.get('rule_items') or []:
            if not isinstance(and_rule_term, dict):
                self.raise_error_bad_validation(
                    logger,
                    ('Invalid type of AND entity rule term specified for service %s. '
                     'Expected JSON, received %s.') % (service.get('title'), type(and_rule_term).__name__)
                )
            and_rule_term.pop('id', None)
def cleanup_kpis(self, services, method=CRUDMethodTypes.METHOD_UPSERT):
    """
    Given a list of services, strip from their KPIs the data that should not be persisted, in place.

    For every KPI of every service:
      - KPIs whose search_type is not 'shared_base' lose the base search specific keys
        'base_search_metric' and 'base_search_id';
      - the 'search' string in 'aggregate_thresholds' / 'entity_thresholds' is blanked, both on the
        KPI itself and inside each time variate threshold policy;
      - service template specific keys are dropped when the service is not linked to a template,
        and 'linked_kpi_thresholds_updated' is dropped when only the KPI is unlinked.
    Null threshold containers, policy maps and policy entries are skipped instead of crashing.
    Only upsert/create/update methods are processed; any other method is a no-op.

    @type services: list
    @param services: List of service type objects
    @type method: basestring
    @param method: operation type. Defaults to upsert.
    @rtype: None
    @return: Nothing
    """
    if method not in (CRUDMethodTypes.METHOD_UPSERT, CRUDMethodTypes.METHOD_CREATE, CRUDMethodTypes.METHOD_UPDATE):
        return
    base_search_keys = ('base_search_metric', 'base_search_id')
    threshold_keys = ('aggregate_thresholds', 'entity_thresholds')
    service_template_keys = ('base_service_template_id', 'linked_kpi_thresholds_updated')

    def _blank_threshold_searches(container):
        # Blank the generated search strings; a null/missing threshold blob is skipped.
        for threshold_key in threshold_keys:
            blob = container.get(threshold_key) or {}
            if 'search' in blob:
                blob['search'] = ''

    # Assume that services is a valid list and that each kpi is a valid dictionary.
    # We will also trust that a developer has the sanity to validate data prior to calling us.
    for service in services:
        is_linked_service = bool(service.get('base_service_template_id'))
        for kpi in service.get('kpis') or []:
            if kpi.get('search_type', '') != 'shared_base':
                # cleanup KPIs if they have base search specific keys for adhoc/datamodel type
                for key in base_search_keys:
                    kpi.pop(key, None)
            _blank_threshold_searches(kpi)
            # clear search strings from policies defined as part of time variate thresholds
            time_variate_spec = kpi.get('time_variate_thresholds_specification') or {}
            policies = time_variate_spec.get('policies') or {}
            for policy in policies.values():
                if isinstance(policy, dict):
                    _blank_threshold_searches(policy)
            # cleanup service template specific attributes, if service or kpi is not linked to template
            if not is_linked_service:
                for key in service_template_keys:
                    kpi.pop(key, None)
            elif not kpi.get('base_service_template_id'):
                kpi.pop('linked_kpi_thresholds_updated', None)
def validate_kpis(self, owner, services, persisted_services, method=CRUDMethodTypes.METHOD_UPSERT,
                  transaction_id=None, for_base_service_template=False):
    """
    Validates KPIs for passed in services. Attempts to perform basic validations to prevent KPI
    from breaking the searches. Extensive checks for each field could be added as the need is found
    in favor of saving on performance of service saves.

    KPIs are also normalized in place:
      - a missing/unknown 'search_type' is derived from 'isadhoc' ('adhoc' unless isadhoc is falsy)
      - enabled anomaly detection settings get a default sensitivity of 8 when none is set
      - the book keeping field '_kpi_method' is removed from every KPI
    ItsiKpi.validate_kpi_basic_structure may set further values.

    @type owner: string
    @param owner: owner context for KV store operations (not used by these checks)
    @type services: list of dict
    @param services: Services JSON list containing KPIs to validate
    @type persisted_services: list of dict
    @param persisted_services: list of persisted services in kvstore
    @type method: basestring
    @param method: CRUD method type. Not consulted here: the key checks use each KPI's own
                   '_kpi_method' field, which defaults to upsert.
    @type transaction_id: basestring
    @param transaction_id: transaction_id for end-end tracing
    @type for_base_service_template: bool
    @param for_base_service_template: True, if validating kpis for Base Service Template.
                                      Else, False.
    @return: None, raises exceptions on invalid KPIs
    """
    object_type = 'base service template' if for_base_service_template else 'service'

    # Guard against usage of problematic characters in KPI key primarily for usage in search:
    # dots, commas, whitespaces, pipes, parenthesis, square brackets, single and double quotes,
    # equal sign and backslash. Also reject the special term service_aggregate.
    # NOTE(review): the N\A alternative matches a literal backslash sequence (already covered by the
    # backslash in the character class), so "N/A" named in the error message is not rejected. Confirm intent.
    # Compiled once per call rather than once per KPI.
    regex_invalid_key_characters = re.compile('service_aggregate|N\\\\A|[=.,"\'()\\[\\]\\s\\|\\\\]+')

    def _validate_ad_settings(kpi, mad_algorithm_type):
        """
        Validates, and defaults in place, the anomaly detection settings of a single KPI.

        @type kpi: dict
        @param kpi: the KPI being validated
        @type mad_algorithm_type: string
        @param mad_algorithm_type: 'trending' or 'cohesive'
        """
        if mad_algorithm_type == 'trending':
            enable_attribute = 'anomaly_detection_is_enabled'
            settings_attribute = 'trending_ad'
        else:
            enable_attribute = 'cohesive_anomaly_detection_is_enabled'
            settings_attribute = 'cohesive_ad'
        if not kpi.get(enable_attribute):
            return
        ad_settings = kpi.get(settings_attribute, {})
        if not isinstance(ad_settings, dict):
            self.raise_error_bad_validation(
                logger,
                'Invalid %s anomaly detection settings specified.' % mad_algorithm_type
            )
        kpi[settings_attribute] = ad_settings
        sensitivity = ad_settings.get('sensitivity')
        if sensitivity is None:
            # Default in MAD for unspecified sensitivity. Only fill in the missing value so any other
            # anomaly detection settings already on the KPI are kept.
            ad_settings['sensitivity'] = 8
        elif not isinstance(sensitivity, int):
            self.raise_error_bad_validation(
                logger,
                'Invalid %s anomaly detection sensitivity specified, must be integer.'
                % mad_algorithm_type
            )

    # service key -> KPI keys of that service; used to validate KPI key uniqueness against
    # persisted services below
    kpi_keys_map = {}
    # KPI key -> CRUD method; used to quickly detect KPI key duplication within passed in services
    kpi_keys_dict = {}
    itsi_kpi = ItsiKpi(self.session_key, self.current_user_name)
    for service in services:
        if not isinstance(service, dict):
            self.raise_error_bad_validation(
                logger, 'Invalid type for {0}. Expected JSON, received {1}.'
                .format(object_type, type(service).__name__))
        kpis = service.get('kpis', [])
        if not isinstance(kpis, list):
            self.raise_error_bad_validation(
                logger, 'Invalid type for KPIs. Expected list, received %s.' % type(kpis).__name__)
        # Normalized titles already seen within this service, to reject duplicated KPI titles
        seen_kpi_titles = set()
        count_shkpi = 0
        for kpi in kpis:
            if not isinstance(kpi, dict):
                self.raise_error_bad_validation(
                    logger, 'Invalid type for KPI. Expected JSON, received %s.' % type(kpi).__name__)
            kpi_key = kpi.get('_key')
            if not utils.is_valid_str(kpi_key):
                self.raise_error_bad_validation(logger, 'Missing key. KPI must have a "_key" populated.')
            if kpi_key.startswith(self.SHKPI_STARTS_WITH):
                count_shkpi += 1
                continue  # Skip remaining validations for SH-KPI

            # Accumulate KPIs in new service with key
            service_key = service.get('_key', '')
            if utils.is_valid_str(service_key):
                service_kpi_keys = kpi_keys_map.setdefault(service_key, [])
                if for_base_service_template and kpi_key in service_kpi_keys:
                    self.raise_error_bad_validation(logger, ('KPI keys are not unique within the same'
                                                             ' {0}. KPIs must have unique keys.'
                                                             ' Duplicate _key = {1}').format(object_type, kpi_key))
                service_kpi_keys.append(kpi_key)

            if kpi.get('search_type') not in ['adhoc', 'datamodel', 'shared_base', 'metric']:
                # Assume an adhoc search unless the KPI explicitly says it is not one
                kpi['search_type'] = 'adhoc' if kpi.get('isadhoc', True) else 'datamodel'

            # For Base Service Template object, do not need to validate kpi key uniqueness across templates and
            # validate characters in key, as base service templates kpis don't generate any searches.
            if not for_base_service_template:
                if kpi_key in kpi_keys_dict:
                    self.raise_error_bad_validation(
                        logger, 'KPI keys are not unique. KPIs must have unique keys. Eg. _key: ' + kpi_key)
                else:
                    kpi_keys_dict[kpi_key] = kpi.get('_kpi_method', CRUDMethodTypes.METHOD_UPSERT)
                if regex_invalid_key_characters.search(kpi_key):
                    self.raise_error_bad_validation(
                        logger,
                        ('Invalid key specified for KPI, Eg. {}. '
                         'Key cannot contain special characters not supported by SPL. '
                         'Key could also not be reserved words like service_aggregate or N/A.').format(kpi_key)
                    )

            # Perform basic validation and value setting for all the KPIs
            itsi_kpi.validate_kpi_basic_structure(kpi, for_base_service_template)

            # Title has already been validated.
            # Check for duplicated kpi titles within the same service
            kpi_title = kpi.get('title').strip().lower()
            if kpi_title in seen_kpi_titles:
                self.raise_error_bad_validation(
                    logger, 'Duplicated KPI title within the same {}.'.format(object_type))
            seen_kpi_titles.add(kpi_title)

            is_service_entity_filter = kpi.get('is_service_entity_filter', False)
            if kpi.get('search_type') == 'datamodel':
                datamodel_spec = kpi.get('datamodel', {})
                try:
                    ItsiKpiSearches.get_datamodel_context(self.session_key,
                                                          'nobody',
                                                          datamodel_spec.get('field'),
                                                          datamodel_spec.get('datamodel'),
                                                          datamodel_object_name=datamodel_spec.get('object'))
                except ItoaDatamodelContextError as err:
                    self.raise_error_bad_validation(logger,
                                                    ('Invalid data model specification in {0} "{1}" for KPI "{'
                                                     '2}". Data model info: {3}. Error: {4}').format(
                                                        object_type,
                                                        service.get('title', ''),
                                                        kpi_title,
                                                        json.dumps(datamodel_spec), err)
                                                    )

            service_entity_rules = service.get('entity_rules', [])
            if not service_entity_rules and is_service_entity_filter:
                self.raise_error_bad_validation(logger,
                                                ('Cannot filter on entities in {0} "{1}" for KPI "{2}" if there '
                                                 'are no entities in the {3}.').format(object_type,
                                                                                       service.get('title', ''),
                                                                                       kpi_title, object_type),
                                                400,
                                                'SI-SRVC-CFG_0001',
                                                {
                                                    'object_type': object_type,
                                                    'service_title': service.get('title', ''),
                                                    'kpi_title': kpi_title
                                                }
                                                )
            if is_service_entity_filter and not utils.is_valid_str(kpi.get('entity_id_fields')):
                self.raise_error_bad_validation(
                    logger,
                    ('Requires a valid entity alias mapping to be set to generate searches when filtering on '
                     'entities in {}.').format(object_type)
                )

            if kpi.get('is_entity_breakdown', False):
                entity_breakdown_id_fields = kpi.get('entity_breakdown_id_fields')
                if entity_breakdown_id_fields is not None and len(entity_breakdown_id_fields) > 0 \
                        and not utils.is_valid_str(entity_breakdown_id_fields):
                    self.raise_error_bad_validation(
                        logger,
                        'Requires a valid entity breakdown id field in order to split KPI by entities.'
                    )

            # Validate anomaly detection settings
            _validate_ad_settings(kpi, 'trending')
            _validate_ad_settings(kpi, 'cohesive')

        # Validate that there is only one service health KPI
        if count_shkpi > 1:
            self.raise_error_bad_validation(
                logger, 'Invalid Service Health KPI count. Expecting 1. Found {0}.'.format(count_shkpi))

    # validation in below section is not needed for base service templates, since service templates don't have
    # health KPIs associated with them.
    if not for_base_service_template and kpi_keys_map:
        # Validate if kpi keys are unique across services.
        # Without a guarantee for KPI key uniqueness, health calculations in the ITSI KPI searches
        # would be incorrect. Duplication within the passed in services is already checked above.
        # KV store does not support lookup on fields starting with _ in a reliable way in nested objects,
        # so persisted KPI keys are indexed here once (KPI key -> owning persisted services, in order)
        # instead of rescanning every persisted service for every KPI key.
        persisted_kpi_owners = {}
        for persisted_service in persisted_services or []:
            for persisted_kpi in persisted_service.get('kpis', []):
                persisted_kpi_owners.setdefault(persisted_kpi.get('_key'), []).append(persisted_service)

        for service_key, service_kpi_keys in kpi_keys_map.items():
            for kpi_key in service_kpi_keys:
                kpi_method = kpi_keys_dict[kpi_key]
                kpi_found = False
                for persisted_service in persisted_kpi_owners.get(kpi_key, []):
                    # On create, any existing KPI with same key is a duplicate.
                    # On update/upsert, KPIs with same key in other services are duplicates.
                    if kpi_method == CRUDMethodTypes.METHOD_CREATE or persisted_service['_key'] != service_key:
                        self.raise_error_bad_validation(logger,
                                                        ('KPI keys are not unique. KPIs must have unique'
                                                         ' keys. Eg: existing service duplicating key for'
                                                         ' KPIs that you are trying to save are: ')
                                                        + persisted_service.get('_key', '')
                                                        )
                    else:
                        kpi_found = True
                # On update, if specified KPI does not exist in current service,
                # that is also an error condition
                if kpi_method == CRUDMethodTypes.METHOD_UPDATE and not kpi_found:
                    self.raise_error(
                        logger, 'KPI keys specified for update do not exist. Example: ' + kpi_key, 404)

    # Remove the book keeping fields added to KPIs. List of such keys:
    #   _kpi_method
    for service in services:
        for kpi in service.get('kpis', []):
            kpi.pop('_kpi_method', None)
def _validate_service_dependencies(self, services, method):
    """
    Method to validate service dependencies for a bunch of services
    1. They should exist. Missing dependency services are only logged as a warning.
    2. a) They must exist in the same security group of the service, or
       b) They must belong to a security group thats a parent of the security group of the service if this service
          depends on the other

    A dependency that violates the team rules raises an error. The exception is an update that
    changes the service's team: there the violating dependency is dropped from the service and a
    warning is logged.

    @type services: JSON list of dict
    @param services: the service objects for which dependency config is being checked; their
                     'services_depends_on' / 'services_depending_on_me' lists may be rewritten in place
    @type method: basestring
    @param method: operation type
    @rtype: tuple
    @return: (services, remove_depending_on_me) where remove_depending_on_me maps a service key to the
             keys of services that were dropped from its 'services_depending_on_me'
    """
    def _extract_service_ids(dependency_list):
        """Returns the non-None 'serviceid' values of the dict entries of dependency_list ([] if not a list)."""
        if not isinstance(dependency_list, list):
            return []
        return [
            service_dependency.get('serviceid') for service_dependency in dependency_list
            if isinstance(service_dependency, dict) and service_dependency.get('serviceid') is not None
        ]

    all_service_ids = []
    all_sec_grp_ids = []
    service_dependencies_map = {}
    remove_depending_on_me = {}

    # Assume services json is valid
    # First collect all service and security groups info needed across all input services
    # to prevent multiple kv store lookups
    for service in services:
        services_depending_on_ids = _extract_service_ids(service.get('services_depends_on'))
        services_depending_on_me_ids = _extract_service_ids(service.get('services_depending_on_me'))
        all_service_ids.extend(services_depending_on_ids + services_depending_on_me_ids)
        all_sec_grp_ids.append(service.get('sec_grp'))
        service_dependencies_map[service.get('_key')] = {
            'services_depending_on_ids': services_depending_on_ids,
            'services_depending_on_me_ids': services_depending_on_me_ids
        }

    if not all_service_ids:
        # No dependencies, done here!
        return services, {}

    # Lookup all services once across the list of services for dependent service info
    # Does not support defining dependencies within active set to prevent potential complex
    # issues from misconfiguration
    service_ids_filter = self.get_filter_data_for_keys(object_ids=all_service_ids)
    dependency_services = self.storage_interface.get_all(self.session_key, 'nobody', self.object_type,
                                                         filter_data=service_ids_filter,
                                                         fields=['_key', 'sec_grp', 'title'])
    if not isinstance(dependency_services, list):
        if isinstance(dependency_services, dict):
            dependency_services = [dependency_services]
        else:
            # Cant fail here since stale info is ok in services_depending_on_me
            # Validate this below
            dependency_services = []

    # Lookup all security groups across list of services for sec grp info
    sec_grp_instance = ItsiSecGrp(self.session_key, self.current_user_name)
    service_sec_grps_map = sec_grp_instance.get_inheritance_info(all_sec_grp_ids)
    if not (isinstance(service_sec_grps_map, dict) and len(service_sec_grps_map) > 0):
        self.raise_error(logger, 'Some or all services are being configured with invalid teams.')

    # map of the services being saved, by key
    all_updated_services = {service.get('_key'): service for service in services}

    # Persisted versions of the services being saved. Fetched lazily (one KV store call per service at
    # most) and only when a team violation needs to know whether this update changes the team.
    original_services = {}

    def _is_sec_grp_changed_by_update(service):
        """True if method is an update and the service's sec_grp differs from the persisted one."""
        if method != CRUDMethodTypes.METHOD_UPDATE:
            return False
        service_key = service.get('_key')
        if service_key not in original_services:
            original_services[service_key] = self.storage_interface.get(self.session_key,
                                                                        'nobody',
                                                                        self.object_type,
                                                                        service_key)
        return service.get('sec_grp') != original_services[service_key].get('sec_grp')

    for i, service in enumerate(services):
        # Go through the list of updated services and check if dependency violates security group membership rules
        # Note: If the dependency service itself is in the update list, the new sec_grp field is used
        # instead of the original one in kvstore
        services_depending_on_map = {}
        services_depending_on_me_map = {}
        dependency_ids = service_dependencies_map[service.get('_key')]
        services_depending_on_ids = dependency_ids['services_depending_on_ids']
        services_depending_on_me_ids = dependency_ids['services_depending_on_me_ids']

        for dependency_service in dependency_services:
            dependency_key = dependency_service.get('_key')
            in_depends_on = dependency_key in services_depending_on_ids
            in_depending_on_me = dependency_key in services_depending_on_me_ids
            if not (in_depends_on or in_depending_on_me):
                continue
            # replace the security group from kvstore with security group from update
            updated_dependency = all_updated_services.get(dependency_key)
            if updated_dependency is not None and 'sec_grp' in updated_dependency:
                dependency_service['sec_grp'] = updated_dependency['sec_grp']
            if in_depends_on:
                services_depending_on_map[dependency_key] = dependency_service
            if in_depending_on_me:
                services_depending_on_me_map[dependency_key] = dependency_service

        # All needed info is collected at this point, start validations
        if len(services_depending_on_map) != len(services_depending_on_ids):
            logger.warning('Some or all dependencies being configured could not be found.')

        service_sec_grp = service_sec_grps_map.get(service.get('sec_grp'))
        if not isinstance(service_sec_grp, dict):
            self.raise_error(
                logger,
                'Team is incorrectly configured for the service.')

        # First process services that this service depends on
        for dependency_service_id, dependency_service in services_depending_on_map.items():
            if service.get('sec_grp') == dependency_service.get('sec_grp', GLOBAL_SECURITY_GROUP_CONFIG.get('key')) \
                    or dependency_service.get('sec_grp') == GLOBAL_SECURITY_GROUP_CONFIG.get('key'):
                continue
            sec_grp_violation_error_msg = ('Dependency being configured violates team membership rules. '
                                           'Services could only depend on services from other teams if '
                                           'the depending service is in the parent hierarchy of the service\'s '
                                           'team. Violating dependent service is %s, for the service %s.') % \
                (dependency_service.get('title', dependency_service_id), service.get('title'))
            break_dependency_msg = ('Service dependency was broken by this action. Service %s '
                                    'no longer depends on service %s.') % \
                (service.get('title'), dependency_service.get('title', dependency_service_id))
            if not service_sec_grp['has_parents'] or \
                    not any(str(parent['_key']) == str(dependency_service.get('sec_grp'))
                            for parent in service_sec_grp['parents']):
                # handle the case that sec_grp gets updated
                if _is_sec_grp_changed_by_update(service):
                    logger.warning(break_dependency_msg)
                    services[i]['services_depends_on'] = [
                        service_dependency for service_dependency in service['services_depends_on']
                        if not isinstance(service_dependency, dict)
                        or service_dependency.get('serviceid') != dependency_service_id
                    ]
                else:
                    self.raise_error(logger, sec_grp_violation_error_msg)

        # Now process services that depend on this service, for the sake of REST API
        for dependency_service_id, dependency_service in services_depending_on_me_map.items():
            if service.get('sec_grp') == dependency_service.get('sec_grp', GLOBAL_SECURITY_GROUP_CONFIG.get('key')) \
                    or service.get('sec_grp') == GLOBAL_SECURITY_GROUP_CONFIG.get('key'):
                continue
            sec_grp_violation_error_msg = ('Dependency being configured violates team membership rules. '
                                           'Services could only depend on services from other teams if '
                                           'the depending service is in the parent hierarchy of the'
                                           ' service\'s team. Violating '
                                           'dependent service is %s, for the service %s.') % \
                (service.get('title'), dependency_service.get('title', dependency_service_id))
            break_dependency_msg = ('Service dependency was broken by this action. Service %s '
                                    'no longer depends on service %s.') % \
                (dependency_service.get('title', dependency_service_id), service.get('title'))
            if not service_sec_grp['has_children'] \
                    or not any(str(child['_key']) == str(dependency_service.get('sec_grp'))
                               for child in service_sec_grp['children']):
                # handle the case that sec_grp gets updated
                if _is_sec_grp_changed_by_update(service):
                    logger.warning(break_dependency_msg)
                    services[i]['services_depending_on_me'] = [
                        service_dependency for service_dependency in service['services_depending_on_me']
                        if not isinstance(service_dependency, dict)
                        or service_dependency.get('serviceid') != dependency_service_id
                    ]
                    remove_depending_on_me.setdefault(service.get('_key'), []).append(dependency_service_id)
                else:
                    self.raise_error(logger, sec_grp_violation_error_msg)

    return services, remove_depending_on_me
def fetch_kpis_via_script(self, owner, service_kpis_raw, transaction_id=None):
    """
    Used by kvstore_to_json.py to read KPIs of existing services.

    With no input (None) every service is returned with all of its KPIs. A service entry with no
    KPI keys returns all KPIs of that service. Service health KPIs are never returned.

    @type owner: string
    @param owner: owner context for KV store collection
    @type service_kpis_raw: list of dicts
    @param service_kpis_raw: list of dictionary specifying which KPIs to get from which services
        Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>}]]
    @type transaction_id: string
    @param transaction_id: Transaction ID
    @rtype: list of dictionaries
    @return: list of dictionary items with service key, title and the KPIs requested for that service
    """
    usage = ('To get KPIs, pass in a list of service keys with their KPI keys. '
             'Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>}]]. ')

    service_kpis = self.extract_json_data(service_kpis_raw)
    if service_kpis is not None and not utils.is_valid_list(service_kpis):
        self.raise_error_bad_validation(logger, usage + 'Invalid input received.')

    requested_kpi_keys = {}
    services_filter = {}  # By default get all services and their KPIs
    if service_kpis:
        per_service_filters = []
        for requested in service_kpis:
            service_key = requested.get('_key')
            kpi_keys = [
                kpi['_key'] for kpi in requested.get('kpis', []) if utils.is_valid_str(kpi.get('_key'))
            ]
            if service_key in requested_kpi_keys:
                self.raise_error_bad_validation(logger, usage + 'Duplicate service key entries received.')
            elif not utils.is_valid_str(service_key):
                self.raise_error_bad_validation(logger, usage + 'Invalid input received.')
            requested_kpi_keys[service_key] = kpi_keys

            key_filter = {'_key': service_key}
            if kpi_keys:
                # Narrow to the service only when it holds at least one of the requested KPIs;
                # without KPI keys every KPI of the service is wanted
                key_filter = {
                    '$and': [
                        key_filter,
                        {"$or": [{"kpis._key": kpi_key} for kpi_key in kpi_keys]}
                    ]
                }
            per_service_filters.append(key_filter)
        services_filter = {'$or': per_service_filters}

    services = self.get_bulk(owner, filter_data=services_filter, fields=['_key', 'title', 'kpis'],
                             transaction_id=transaction_id)

    def _should_return(kpi, wanted_keys):
        # Service health KPIs are hidden: this API is meant for CRUD on KPIs directly and users
        # should not modify them. KV store may also return KPIs that were not requested (_key is an
        # internal field), so drop those too.
        if kpi['_key'].startswith(self.SHKPI_STARTS_WITH):
            return False
        return not wanted_keys or kpi['_key'] in wanted_keys

    for service in services:
        wanted_keys = requested_kpi_keys.get(service['_key'], [])
        service['kpis'] = [kpi for kpi in service.get('kpis', []) if _should_return(kpi, wanted_keys)]
    return services
def change_kpis_via_script(self, owner, service_kpis_raw, transaction_id=None):
    """
    Used by kvstore_to_json.py to directly create new/update existing KPIs on services.

    A KPI passed in replaces the service's existing KPI with the same key. A KPI without a valid
    key gets a generated key and is added as new. Existing KPIs that are not mentioned are kept.
    Service health KPIs cannot be changed.

    @type owner: string
    @param owner: owner context for KV store collection
    @type service_kpis_raw: list of dicts
    @param service_kpis_raw: list of dictionary specifying which KPIs to update for what services
        Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>, <rest of KPI structure>}]]
    @type transaction_id: string
    @param transaction_id: Transaction ID
    @rtype: list of strings
    @return: list of service keys for services that got updated (as returned by save_batch)
    """
    usage = ('To change KPIs, pass in a non-empty list of service keys with their KPIs list. '
             'Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>, <rest of KPI structure>}]]. ')

    service_kpis = self.extract_json_data(service_kpis_raw)
    if (not utils.is_valid_list(service_kpis)) or (len(service_kpis) < 1):
        self.raise_error_bad_validation(
            logger,
            usage + 'Invalid input received: ' + str(service_kpis)
        )

    # First get the existing service configuration
    service_keys = []
    service_kpis_map = {}
    for service in service_kpis:
        # Non-dict entries carry no usable service key
        service_key = service.get('_key') if isinstance(service, dict) else None
        if not utils.is_valid_str(service_key):
            self.raise_error_bad_validation(logger, usage + 'Invalid service keys received.')
        if service_key in service_kpis_map:
            self.raise_error_bad_validation(
                logger,
                usage + 'Duplicate service entries (keys) received. Check services specified.'
            )
        service_keys.append(service_key)
        service_kpis_map[service_key] = service.get('kpis', [])
    services_filter = {'$or': [{'_key': service_key} for service_key in service_keys]}

    services = self.get_bulk(owner, filter_data=services_filter, transaction_id=transaction_id)
    if not (utils.is_valid_list(services) and (len(services) == len(service_keys))):
        self.raise_error(
            logger,
            'A service key specified does not exist. Cannot change KPIs for non-existing services.',
            404
        )

    # Merge the KPIs to update with the existing KPIs in the services
    # Add as new if not found
    for service in services:
        kpis_to_update = service_kpis_map[service['_key']]
        if not (utils.is_valid_list(kpis_to_update) and (len(kpis_to_update) > 0)):
            self.raise_error_bad_validation(
                logger,
                usage + 'No KPIs received for some services. Check the KPIs.'
            )

        merged_kpis = []
        changed_kpi_keys = set()
        # First add all KPIs passed in for this service: new ones and updated ones
        for kpi_to_update in kpis_to_update:
            kpi_key = kpi_to_update.get('_key', '')
            if not utils.is_valid_str(kpi_key):
                kpi_key = ITOAInterfaceUtils.generate_backend_key()
                kpi_to_update['_key'] = kpi_key
            if kpi_key.startswith(self.SHKPI_STARTS_WITH):
                self.raise_error_bad_validation(
                    logger,
                    'Cannot change service health type KPIs. Remove service health KPIs and retry.'
                )
            merged_kpis.append(kpi_to_update)
            changed_kpi_keys.add(kpi_key)

        # Then keep every existing KPI of the service that was not replaced above
        merged_kpis.extend(
            existing_kpi for existing_kpi in service.get('kpis', [])
            if existing_kpi['_key'] not in changed_kpi_keys
        )
        service['kpis'] = merged_kpis

    # Now save the updated services
    # Note that the KPIs get validated in save_batch, so skip validations here
    return self.save_batch(
        owner,
        services,
        method=CRUDMethodTypes.METHOD_UPDATE,  # Services are always updated for KPI changes
        validate_names=False,  # Skip validating service titles here
        transaction_id=transaction_id
    )
def delete_kpis_via_script(self, owner, service_kpis_raw, transaction_id=None):
    """
    Used by kvstore_to_json.py to delete KPIs from existing services.

    Every service entry must name at least one KPI key, so all KPIs of a service are never deleted
    by accident. Service health KPIs cannot be deleted. Service keys that do not exist are ignored.

    @type owner: string
    @param owner: owner context for KV store collection
    @type service_kpis_raw: list of dicts
    @param service_kpis_raw: list of dictionary specifying which KPIs to delete from which services
        Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>}]]
    @type transaction_id: string
    @param transaction_id: Transaction ID
    @rtype: list of strings
    @return: result of save_batch for the services from which KPIs were deleted
    """
    usage = ('To delete KPIs, pass in a list of service keys with the keys for the KPIs to delete. '
             'Expected format: [{_key: <service key>, kpis: [{_key: <KPI key>}]]. ')

    service_kpis = self.extract_json_data(service_kpis_raw)
    if not (utils.is_valid_list(service_kpis) and (len(service_kpis) > 0)):
        self.raise_error_bad_validation(logger, usage + 'Invalid input received.')

    services_filter = {'$or': []}
    service_delete_kpis_map = {}
    for service in service_kpis:
        if not isinstance(service, dict):
            self.raise_error_bad_validation(logger, usage + 'Invalid input received.')
        service_key = service.get('_key')
        kpi_keys = []
        for kpi in service.get('kpis') or []:
            # Entries without a string _key are rejected rather than crashing on them
            kpi_key = kpi.get('_key') if isinstance(kpi, dict) else None
            if not utils.is_valid_str(kpi_key):
                self.raise_error_bad_validation(logger, usage + 'Invalid input received.')
            if kpi_key.startswith(self.SHKPI_STARTS_WITH):
                self.raise_error_bad_validation(
                    logger,
                    'Cannot change service health type KPIs. Remove service health KPIs and retry.'
                )
            else:
                kpi_keys.append(kpi_key)

        # We dont want users to accidentally delete all KPIs from a service
        # Hence limit deletes to only specified KPI keys
        if not (utils.is_valid_str(service_key) and kpi_keys):
            self.raise_error_bad_validation(logger, usage + 'Specify at least one KPI per service.')
        if service_key in service_delete_kpis_map:
            self.raise_error_bad_validation(
                logger,
                usage + 'Duplicate service key received. Must specify all KPIs for a service in one entry.'
            )
        service_delete_kpis_map[service_key] = set(kpi_keys)
        services_filter['$or'].append({'_key': service_key})

    services = self.get_bulk(owner, filter_data=services_filter, transaction_id=transaction_id)
    for service in services:
        # Remove any KPIs requested to be deleted from the service if found.
        # Service health KPIs are rejected above, so they are always retained.
        kpi_keys_to_delete = service_delete_kpis_map[service['_key']]
        service['kpis'] = [
            kpi for kpi in service.get('kpis', [])
            if kpi['_key'] not in kpi_keys_to_delete
        ]

    # Skip validating service titles here
    return self.save_batch(owner, services, validate_names=False, transaction_id=transaction_id)
def create_kpi(self, owner, object_id, kpi_object):
    """
    Creates a KPI by appending it to the service referenced by kpi_object["service_id"].

    See ItoaObject.create() for parameters and return values

    kpi_object is updated in place with "_key" = object_id and is stored as-is in the service.
    Raises (through raise_error) a 400 when service_id is missing and a 404 when that service
    does not exist.
    """
    target_service = kpi_object.get("service_id")
    if not target_service:
        self.raise_error(logger, "service_id is missing from KPI object.", 400)
    existing_service = self.get(owner, target_service)
    if not existing_service:
        self.raise_error(logger, "Service (%s) not found" % target_service, 404)
    kpi_object["_key"] = object_id
    # Build a new list rather than appending to the fetched service's list, and tolerate
    # services that have no (or a null) "kpis" field yet.
    new_service_kpis = {
        "kpis": list(existing_service.get("kpis") or []) + [kpi_object],
    }
    self.update(owner, target_service, new_service_kpis, is_partial_data=True)
    return {"_key": object_id}
def get_kpi(self, owner, object_id, req_source='unknown', transaction_id=None):
    """
    Fetches a single KPI by key, converted with convert_kpi_to_modified_kpi using its owning service.

    See ItoaObject.get() for parameters and return values

    req_source is accepted for interface compatibility and is not used. transaction_id is
    forwarded to the service lookup. Raises (through raise_error) a 404 when no service holds a KPI
    with this key.
    """
    services = self.get_bulk(owner, filter_data={"kpis._key": object_id}, transaction_id=transaction_id)
    if not services:
        self.raise_error(logger, "KPI (%s) not found" % object_id, 404)
    owning_service = services[0]
    matching_kpi = next(
        (kpi for kpi in owning_service.get("kpis", []) if kpi["_key"] == object_id),
        None
    )
    if matching_kpi is None:
        # KV store matched the service, but the KPI was not in its list
        self.raise_error(logger, "KPI (%s) not found (anomalous case)" % object_id, 404)
    return convert_kpi_to_modified_kpi(owning_service, matching_kpi)
def get_bulk_kpi(self, owner, filter_data=None, fields=None, transaction_id=None, keys=None):
    """
    This function fetches ALL KPIs matching a filter.

    WARNING: filter_data is unreliable and should not be used unless you know what you're doing. We have to
    hard-code handling as we can't rely on MongoDB to do the correct filtering. KPIs are not a real object, and
    they're many-to-one with the KVstore object we *can* interact with. A filtered-fetch may return 1 service and
    all the KPIs underneath, but MongoDB can't identify *which* KPIs match.

    Fields will be converted to KPI-equivalents (as the field names don't literally exist in the KVstore). Also,
    "services_depending_on_me" is a valid field, despite not existing in the KPI object itself.

    As a heads-up, the KVStore considers this to be the proper sort order for keys:
        008a2024-170a-4dbd-b4d4-1263ab31f676
        SHKPI-2d7f4da2-4ed3-47fd-99f0-2a77c4f57983
        ff8a2024-170a-4dbd-b4d4-1263ab31f676

    :param owner: Owner of objects
    :type owner: string
    :param filter_data: JSON filter constructed to filter data (MongoDB syntax). See WARNING!
    :type filter_data: dict
    :param fields: Comma-separated fields to retrieve, fetches all fields if not specified
    :type fields: string
    :param transaction_id: Transaction ID to associate call with
    :type transaction_id: string
    :param keys: List of KPI keys (used in lieu of filter_data); also restricts the returned KPIs
    :type keys: list or None
    :return: KPIs matching filter
    :rtype: list of dictionaries
    """
    if fields:
        actual_fields = ["kpis._key", "services_depending_on_me"]  # Required fields
        for field in fields.split(","):
            actual_fields.append(field if field in EXTERNAL_FIELDS else "kpis.%s" % field)
    else:
        actual_fields = None

    if filter_data:
        kvstore_filter = convert_filter_to_kpi_filter(filter_data)
    elif keys:
        kvstore_filter = {"$or": [{"kpis._key": key} for key in keys]}
    else:
        kvstore_filter = {}

    # Note that sorting isn't passed here. Sorting doesn't matter because it's always going to be wrong. We just
    # need to avoid the MongoDB sorting error ("Sort operation used more than the maximum 33554432 bytes of RAM.")
    # which is dodged by not using `sort_key` and `sort_dir`. The accelerated fields in `collections.conf` don't
    # seem to work for nested values.
    services = self.get_bulk(owner, filter_data=kvstore_filter, fields=actual_fields, skip=0, limit=0,
                             transaction_id=transaction_id)

    # Set for O(1) membership; a service can hold KPIs that were not asked for
    requested_keys = set(keys) if keys else None
    results = [
        convert_kpi_to_modified_kpi(service, kpi)
        for service in services
        for kpi in service.get("kpis", [])
        if requested_keys is None or kpi["_key"] in requested_keys
    ]

    if filter_data:
        # This is slower than the native DB filtering, but we can't do much about that
        results = utils.apply_filter_to_results(results, logger, filter_data=filter_data)
    return results
def get_bulk_service_tags(
    self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None, skip=None, limit=None,
    req_source='unknown', transaction_id=None, keys=None,
):
    """
    This function provides ItoaObject.get_bulk()-like functionality for service tags.

    WARNING: filter_data is unreliable and should not be used unless you know what you're doing. We have to
    hard-code handling as we can't rely on MongoDB to do the correct filtering. KPIs are not a real object, and
    they're many-to-one with the KVstore object we *can* interact with. A filtered-fetch may return 1 service and
    all the KPIs underneath, but MongoDB can't identify *which* KPIs match.

    See ItoaObject.get_bulk() for most parameters and return values. sort_key, sort_dir, skip, limit,
    req_source and keys are accepted for signature compatibility but are not used: the unique tags
    are always returned sorted ascending, unpaged.

    :param filter_data: See WARNING
    :type filter_data: dict
    :param keys: List of keys (used in lieu of filter_data)
    :type keys: list
    :return: {"entry": <sorted unique tags>, "paging": {"total": n, "perPage": n, "offset": 0}}
    :rtype: dict
    """
    def _rename_filter_key(node, old_key, new_key):
        """
        Returns a copy of a filter in which every dict key equal to old_key is renamed to new_key,
        at any depth. Values of renamed keys are carried over unchanged; non-container values
        (strings, numbers, None) are returned as-is.
        """
        if isinstance(node, dict):
            renamed = {}
            for key, value in node.items():
                if key == old_key:
                    renamed[new_key] = value
                else:
                    renamed[key] = _rename_filter_key(value, old_key, new_key)
            return renamed
        if isinstance(node, list):
            return [_rename_filter_key(item, old_key, new_key) for item in node]
        return node

    actual_fields = ["service_tags"] if fields is None else fields.split(",")

    # Note that sorting isn't passed here. Sorting doesn't matter because it's always going to be wrong. We just
    # need to avoid the MongoDB sorting error ("Sort operation used more than the maximum 33554432 bytes of RAM.")
    # which is dodged by not using `sort_key` and `sort_dir`. The accelerated fields in `collections.conf` don't
    # seem to work for nested values.
    # First filter the data based on the KVStore records. There could still be irrelevant data in the results as
    # one service can contain multiple service_tags entries and only one of them may match.
    services = self.get_bulk(owner, filter_data=filter_data, fields=actual_fields, skip=0, limit=0,
                             transaction_id=transaction_id)

    # Get only the unique service_tags from the list of services
    unique_service_tags = set()
    for service in services:
        # "service_tags" may be null (e.g. services coming from the vmware content pack), and so may
        # an individual tag list inside it
        service_tags = service.get("service_tags") or {}
        for tag_values in service_tags.values():
            unique_service_tags.update(tag_values or [])

    tag_rows = [{"service_tags": tag} for tag in unique_service_tags]
    if filter_data:
        # apply_filter_to_results does not support "." in field names, so point the tag sub-fields
        # at the flat "service_tags" field of each row
        updated_filter = _rename_filter_key(filter_data, "service_tags.template_tags", "service_tags")
        updated_filter = _rename_filter_key(updated_filter, "service_tags.tags", "service_tags")
        tag_rows = utils.apply_filter_to_results(tag_rows, logger, filter_data=updated_filter)

    list_service_tags = sorted(tag_row['service_tags'] for tag_row in tag_rows)
    count = len(list_service_tags)
    return {"entry": list_service_tags, "paging": {"total": count, "perPage": count, "offset": 0}}
def update_kpi(self, owner, object_id, data, is_partial_data=True, dupname_tag=None, transaction_id=None):
    """
    Update a single KPI by patching the service that owns it.

    This is equivalent to itsi_kpi.py:update() in all but name and location. This function cannot exist in
    itsi_kpi.py due to pre-existing import structure.
    The caller's ``data`` dict is NOT modified; a shallow copy is patched and sent instead.
    See ItoaObject.update() for parameters and return values.

    :param owner: user performing the operation
    :type owner: str
    :param object_id: _key of the KPI to update
    :type object_id: str
    :param data: KPI fields to apply
    :type data: dict
    :param is_partial_data: accepted for interface compatibility only; the owning service is always
        updated with is_partial_data=True because only this one KPI entry is sent
    :type is_partial_data: bool
    :param dupname_tag: accepted for interface compatibility only; it is not forwarded
    :param transaction_id: forwarded to the KPI lookup and to the service update
    :return: {"_key": object_id}
    :rtype: dict
    """
    # Resolve the owning service; the lookup result must carry the parent "service_id".
    existing_kpi = self.get_kpi(owner, object_id, transaction_id=transaction_id)

    # Work on a copy so the caller's payload is not mutated by the _key injection / search_alert removal.
    kpi_data = dict(data)
    kpi_data["_key"] = object_id
    # Workaround for bug in service update
    kpi_data.pop("search_alert", None)

    patch_data = {
        "kpis": [kpi_data],
    }
    self.update(owner, existing_kpi["service_id"], patch_data, is_partial_data=True, transaction_id=transaction_id)
    return {"_key": object_id}
def delete_kpi(self, owner, object_id, req_source='unknown', transaction_id=None):
    """
    Delete a single KPI by patching the service that owns it.

    This is equivalent to itsi_kpi.py:delete() in all but name and location. This function cannot exist in
    itsi_kpi.py due to pre-existing import structure.
        * Deleting a Service Health KPI is not allowed.
        * Deleting a KPI is conceptually equivalent to updating a Service.
        * Deleting a KPI cannot delete the last KPI or a Service
    See ItoaObject.delete() for parameters and return values.

    :param owner: user performing the operation
    :param object_id: _key of the KPI to delete
    :type object_id: str
    :param req_source: identifies the source of the request; forwarded to the service lookup
    :param transaction_id: forwarded to the service lookup and to the service update
    :return: None. Deleting a KPI that no service references is a silent no-op.
    """
    if object_id.startswith(self.SHKPI_STARTS_WITH):
        self.raise_error(logger, "Deleting Service Health KPIs is forbidden.", 403)

    services = self.get_bulk(owner, filter_data={"kpis._key": object_id},
                             req_source=req_source, transaction_id=transaction_id)
    # Deleting non-existent objects doesn't fail
    if not services:
        return
    if len(services) > 1:
        self.raise_error(logger, "Ambiguous KPI reference (%s)" % object_id, 500)

    target_service = services[0]
    # The KPI is removed by a partial service update that marks it for deletion.
    patch_data = {
        "_marked_for_delete": {
            "kpis": [{
                "_key": object_id,
            }],
        },
    }
    # is_kpi_delete is only True for the duration of the update (presumably consulted by the update path);
    # the finally guarantees it is cleared even when the update raises.
    try:
        self.is_kpi_delete = True
        self.update(owner, target_service["_key"], patch_data, is_partial_data=True, transaction_id=transaction_id)
    finally:
        self.is_kpi_delete = False
    return
def delete_bulk_kpi(self, owner, filter_data=None, req_source='unknown', transaction_id=None, keys=None):
    """
    Delete several KPIs, one delete_kpi() call per matching KPI.

    WARNING: filter_data is unreliable and should not be used unless you know what you're doing. We have to
    hard-code handling as we can't rely on MongoDB to do the correct filtering. KPIs are not a real object, and
    they're many-to-one with the KVstore object we *can* interact with. A filtered-fetch may return 1 service and
    all the KPIs underneath, but MongoDB can't identify *which* KPIs match.
    This is equivalent to itsi_kpi.py:delete_bulk() in all but name and location. This function cannot exist in
    itsi_kpi.py due to pre-existing import structure.
        * Deleting a Service Health KPI is not allowed.
        * Deleting a KPI is conceptually equivalent to updating a Service.
        * Deleting a KPI cannot delete the last KPI or a Service
        * This function can partially succeed, while returning an error to the user.
    See ItoaObject.delete_bulk() for most parameters and return values

    :param filter_data: See WARNING. NOTE(review): this implementation does not use filter_data at all. When
        no keys are given, every non-Service-Health KPI of every service returned by an unfiltered get_bulk()
        is deleted, even if filter_data was supplied. Confirm this is intended before relying on it.
    :type filter_data: dict
    :param req_source: identifies the source of the request; forwarded to get_bulk() and delete_kpi()
    :param transaction_id: forwarded to get_bulk() and delete_kpi()
    :param keys: List of KPI keys to delete (used in lieu of filter_data). None or empty targets all KPIs.
    :type keys: list
    :return: None
    """
    keys = keys or []
    key_set = set(keys)

    # Only fetch services that contain at least one requested KPI; no keys means every service.
    if keys:
        service_filter = {"$or": [{"kpis._key": key} for key in keys]}
    else:
        service_filter = {}

    services = self.get_bulk(owner, filter_data=service_filter, req_source=req_source,
                             transaction_id=transaction_id)
    for service in services or []:
        # Area for future performance improvement: each delete_kpi() re-fetches and re-saves the service.
        for kpi in service.get("kpis") or []:
            kpi_key = kpi["_key"]
            # Service Health KPIs can never be deleted.
            if kpi_key.startswith(self.SHKPI_STARTS_WITH):
                continue
            if not key_set or kpi_key in key_set:
                self.delete_kpi(owner, kpi_key, req_source=req_source, transaction_id=transaction_id)
    return
def update(self, owner, service_id, data, is_partial_data=False, dupname_tag=None, transaction_id=None):
    """
    Wrapper to itoa_object interface's update method that always clears ``skip_service_template_update``.

    Callers may set ``self.skip_service_template_update`` before calling. The flag is reset to False once the
    parent update finishes, whether it succeeds or raises, so a failed call cannot leak the flag into a later one.

    @type owner: string
    @param owner: user who is performing this operation

    @type service_id: string
    @param service_id: id of object to update

    @type data: dict
    @param data: object to update

    @type is_partial_data: bool
    @param is_partial_data: indicates if payload passed into data is a subset of object structure
        when True, payload passed into data is a subset of object structure
        when False, payload passed into data is the entire object structure
        Note that KV store API does not support partial updates

    @rtype: string
    @return: whatever ItoaObject.update() returns (documented as the id of the updated object);
        exceptions from the parent update propagate unchanged
    """
    try:
        return super(ItsiService, self).update(owner, service_id, data, is_partial_data=is_partial_data,
                                               dupname_tag=dupname_tag, transaction_id=transaction_id)
    finally:
        self.skip_service_template_update = False
def save_batch(self, owner, data_list, validate_names, dupname_tag=None, req_source='unknown',
               ignore_refresh_impacted_objects=False, method=CRUDMethodTypes.METHOD_UPSERT,
               is_partial_data=False, transaction_id=None, skip_local_failure=False,
               **kwargs):
    """
    Wrapper to itoa_object interface's save_batch method that always clears ``skip_service_template_update``.

    Callers may set ``self.skip_service_template_update`` before calling. The flag is reset to False once the
    parent save_batch finishes, whether it succeeds or raises, so a failed call cannot leak the flag into a
    later one.

    @type owner: string
    @param owner: user who is performing this operation

    @type data_list: list
    @param data_list: list of objects to upsert

    @type validate_names: bool
    @param validate_names: validate_names is a means for search commands and csv load to by pass
        perf hit from name validation in scenarios they can safely skip

    @type req_source: string
    @param req_source: string identifying source of this request

    @type is_partial_data: bool
    @param is_partial_data: indicates if payload passed into each entry in data_list is a subset of object structure
        when True, payload passed into data is a subset of object structure
        when False, payload passed into data is the entire object structure
        Note that KV store API does not support partial updates
        This argument only applies to update entries since on create, entire payload is a MUST

    @param kwargs: passed through unchanged to ItoaObject.save_batch()

    @rtype: list of strings
    @return: ids of objects upserted on success, throws exceptions on errors
    """
    try:
        return super(ItsiService, self).save_batch(
            owner, data_list, validate_names, dupname_tag=dupname_tag,
            req_source=req_source, ignore_refresh_impacted_objects=ignore_refresh_impacted_objects,
            method=method, is_partial_data=is_partial_data, transaction_id=transaction_id,
            skip_local_failure=skip_local_failure, **kwargs)
    finally:
        self.skip_service_template_update = False
def save_batch_via_publish(self, owner, data_list, validate_names, dupname_tag=None, req_source='unknown',
                           ignore_refresh_impacted_objects=False, method=CRUDMethodTypes.METHOD_CREATE,
                           is_partial_data=False, transaction_id=None, skip_local_failure=False):
    """
    Save a batch of services on behalf of the Sandbox service publish operation.

    Note: This method to be used only for Sandbox service publish operation.

    Behavior:
        * ``self.is_publish_flow`` is True for the duration of the save.
        * The save calls ItoaObject.save_batch() directly, bypassing ItsiService.save_batch().
        * Afterwards ``self.is_publish_flow`` and ``self.skip_service_template_update`` are both reset to False,
          whether the save succeeds or raises.
        * Unlike save_batch(), the default ``method`` is METHOD_CREATE.

    @type owner: string
    @param owner: user who is performing this operation

    @type data_list: list
    @param data_list: list of objects to save

    @type validate_names: bool
    @param validate_names: validate_names is a means for search commands and csv load to by pass
        perf hit from name validation in scenarios they can safely skip

    @type req_source: string
    @param req_source: string identifying source of this request

    @type is_partial_data: bool
    @param is_partial_data: indicates if payload passed into each entry in data_list is a subset of object structure
        when True, payload passed into data is a subset of object structure
        when False, payload passed into data is the entire object structure
        Note that KV store API does not support partial updates
        This argument only applies to update entries since on create, entire payload is a MUST

    @rtype: list of strings
    @return: ids of objects saved on success; exceptions from the parent save_batch propagate unchanged
    """
    # Set the marker to say that the publish flow is in progress
    self.is_publish_flow = True
    try:
        return super(ItsiService, self).save_batch(
            owner, data_list, validate_names, dupname_tag=dupname_tag,
            req_source=req_source, ignore_refresh_impacted_objects=ignore_refresh_impacted_objects,
            method=method, is_partial_data=is_partial_data, transaction_id=transaction_id,
            skip_local_failure=skip_local_failure)
    finally:
        self.skip_service_template_update = False
        self.is_publish_flow = False