# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import ITOA.itoa_common as utils
from ITOA.itoa_factory import instantiate_object
from itsi.constants import (
DEFAULT_POLICY_KEY,
KT_DEFAULT_POLICY,
ET_DEFAULT_POLICY as DEFAULT_POLICY,
THRESHOLD_CONFIGURATION_OPTIONS,
DEFAULT_SEVERITY_LEVELS,
DEFAULT_THRESHOLDS
)
from itsi.itsi_time_block_utils import ItsiTimeBlockUtils
from abc import ABC
import itsi_py3
import os
import json
import uuid
from copy import deepcopy
from urllib.parse import quote_plus
import time
import http.client
import urllib.parse
from ITOA.setup_logging import logger
import splunk
import splunk.rest as rest
from splunk.entity import setEntity, getEntity, controlEntity, buildEndpoint, deleteEntity, Entity
from splunk.clilib import cli_common as cli
from splunk.clilib.bundle_paths import make_splunkhome_path
from splunk.util import normalizeBoolean
from ITOA.storage.itoa_storage import ITOAStorage
from ITOA.itoa_common import (
is_valid_dict, post_splunk_user_message, normalize_num_field, is_valid_list,
get_current_utc_epoch, ItoaBase, is_feature_enabled, update_conf_stanza
)
from ITOA.version_check import VersionCheck
from .constants import current_itsi_app_version
import SA_ITOA_app_common.splunklib.client as client
from ITOA.saved_search_utility import SavedSearch
# Global Variable Definitions
# The capability values defined here must match the ones exposed in authorize.conf.
# Maps each ITSI object type to the Splunk capability required for each CRUD
# operation (plus 'interact' for UI-interactive objects). A value of None means
# the object type has no capability enforcement.
CAPABILITY_MATRIX = {
    'backup_restore': {
        'read': 'read_itsi_backup_restore',
        'write': 'write_itsi_backup_restore',
        'delete': 'delete_itsi_backup_restore'
    },
    'base_service_template': {
        'read': 'read_itsi_base_service_template',
        'write': 'write_itsi_base_service_template',
        'delete': 'delete_itsi_base_service_template'
    },
    'content_pack': {
        'read': 'read_itsi_content_pack_authorship',
        'write': 'write_itsi_content_pack_authorship',
        'delete': 'delete_itsi_content_pack_authorship'
    },
    'content_pack_file_download': {
        'read': 'read_itsi_content_pack_authorship',
        'write': 'write_itsi_content_pack_authorship',
        'delete': 'delete_itsi_content_pack_authorship'
    },
    'drift_detection_template': {
        # NOTE(review): no 'write' capability listed here, unlike the other
        # object types — confirm against authorize.conf whether that is intended.
        'read': 'read_itsi_drift_detection_template',
        'delete': 'delete_itsi_drift_detection_template',
    },
    'custom_threshold_windows': {
        'read': 'read_itsi_custom_threshold_windows',
        'write': 'write_itsi_custom_threshold_windows',
        'delete': 'delete_itsi_custom_threshold_windows'
    },
    'deep_dive': {
        'read': 'read_itsi_deep_dive',
        'write': 'write_itsi_deep_dive',
        'delete': 'delete_itsi_deep_dive',
        'interact': 'interact_with_itsi_deep_dive'
    },
    'deep_dive_context': {
        'read': 'read_itsi_deep_dive_context',
        'write': 'write_itsi_deep_dive_context',
        'delete': 'delete_itsi_deep_dive_context',
        'interact': 'interact_with_itsi_deep_dive_context'
    },
    'duplicate_entities_job_queue': {
        'read': 'read_itsi_duplicate_entities_management',
        'write': 'write_itsi_duplicate_entities_management',
        'delete': 'delete_itsi_duplicate_entities_management',
    },
    'duplicate_aliases_cache': {
        'read': 'read_itsi_duplicate_entities_management',
        'write': 'write_itsi_duplicate_entities_management',
        'delete': 'delete_itsi_duplicate_entities_management',
    },
    'entity': {  # subsumed by service capabilities
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'entity_type': {  # subsumed by service capabilities
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'entity_filter_rule': {
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'entity_relationship': {
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'entity_relationship_rule': {
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'entity_management_policies': {
        'read': 'read_itsi_entity_management_policies',
        'write': 'write_itsi_entity_management_policies',
        'delete': 'delete_itsi_entity_management_policies'
    },
    'entity_management_rules': {
        'read': 'read_itsi_entity_management_policies',
        'write': 'write_itsi_entity_management_policies',
        'delete': 'delete_itsi_entity_management_policies'
    },
    'entity_discovery_searches': {
        # Read-only object type.
        'read': 'read_itsi_entity_discovery_searches'
    },
    'event_management_state': {
        'read': 'read_itsi_event_management_state',
        'write': 'write_itsi_event_management_state',
        'delete': 'delete_itsi_event_management_state',
        'interact': 'interact_with_itsi_event_management_state'
    },
    'files': {
        'read': 'read_itsi_backup_restore',
        'write': 'write_itsi_backup_restore',
        'delete': 'delete_itsi_backup_restore'
    },
    'glass_table': {
        'read': 'read_itsi_glass_table',
        'write': 'write_itsi_glass_table',
        'delete': 'delete_itsi_glass_table',
        'interact': 'interact_with_itsi_glass_table'
    },
    'home_view': {
        'read': 'read_itsi_homeview',
        'write': 'write_itsi_homeview',
        'delete': 'delete_itsi_homeview',
        'interact': 'interact_with_itsi_homeview'
    },
    'info': {
        'read': 'read_itsi_backup_restore',
        'write': 'write_itsi_backup_restore',
        'delete': 'delete_itsi_backup_restore'
    },
    'kpi': {  # subsumed by service capabilities
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'kpi_at_info': {
        'read': 'read_itsi_kpi_at_info',
        'write': 'write_itsi_kpi_at_info',
        'delete': 'delete_itsi_kpi_at_info'
    },
    'at_incremental_values': {
        'read': 'read_itsi_at_incremental_values',
        'write': 'write_itsi_at_incremental_values',
        'delete': 'delete_itsi_at_incremental_values'
    },
    'kpi_base_search': {  # subsumed by service capabilities
        'read': 'read_itsi_kpi_base_search',
        'write': 'write_itsi_kpi_base_search',
        'delete': 'delete_itsi_kpi_base_search'
    },
    'kpi_entity_threshold': {
        'read': 'read_itsi_kpi_entity_threshold',
        'write': 'write_itsi_kpi_entity_threshold',
        'delete': 'delete_itsi_kpi_entity_threshold'
    },
    'kpi_template': {  # subsumed by service capabilities
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'kpi_threshold_template': {
        'read': 'read_itsi_kpi_threshold_template',
        'write': 'write_itsi_kpi_threshold_template',
        'delete': 'delete_itsi_kpi_threshold_template'
    },
    'migration': {
        'read': 'read_itsi_backup_restore',
        'write': 'write_itsi_backup_restore',
        'delete': 'delete_itsi_backup_restore'
    },
    'rbac': {
        # Permission management itself requires the core Splunk
        # configure_perms capability for all operations.
        'read': 'configure_perms',
        'write': 'configure_perms',
        'delete': 'configure_perms'
    },
    'refresh_queue_job': {
        'read': 'read_itsi_refresh_queue_job',
        'write': 'write_itsi_refresh_queue_job',
        'delete': 'delete_itsi_refresh_queue_job',
    },
    # No capability enforcement for saved_page.
    'saved_page': None,
    'service': {
        'read': 'read_itsi_service',
        'write': 'write_itsi_service',
        'delete': 'delete_itsi_service'
    },
    'team': {
        'read': 'read_itsi_team',
        'write': 'write_itsi_team',
        'delete': 'delete_itsi_team'
    },
    'temporary_kpi': {
        'read': 'read_itsi_temporary_kpi',
        'write': 'write_itsi_temporary_kpi',
        'delete': 'delete_itsi_temporary_kpi'
    },
    'upgrade_readiness_prechecks': {
        'read': 'read_itsi_upgrade_readiness_prechecks',
        'write': 'write_itsi_upgrade_readiness_prechecks',
        'delete': 'delete_itsi_upgrade_readiness_prechecks'
    },
    'sandbox': {
        'read': 'read_itsi_sandbox',
        'write': 'write_itsi_sandbox',
        'delete': 'delete_itsi_sandbox'
    },
    'sandbox_service': {
        'read': 'read_itsi_sandbox_service',
        'write': 'write_itsi_sandbox_service',
        'delete': 'delete_itsi_sandbox_service'
    },
    'sandbox_sync_log': {
        'read': 'read_itsi_sandbox_sync_log',
        'write': 'write_itsi_sandbox_sync_log',
        'delete': 'delete_itsi_sandbox_sync_log'
    },
    'admin_console': {
        # No 'delete' operation exposed for the admin console.
        'read': 'read_itsi_admin_console',
        'write': 'write_itsi_admin_console'
    }
}
# Maps each backup/restore-eligible object type to the KV store collection
# that holds its records. Several object types share a collection (e.g.
# service/entity/kpi all live in 'itsi_services').
BACKUP_COLLECTION_MATRIX = {
    'backup_restore': 'itsi_backup_restore_queue',
    'service': 'itsi_services',
    'base_service_template': 'itsi_base_service_template',
    'content_pack': 'itsi_content_pack_authorship',
    'correlation_search': 'itsi_correlation_search',
    'custom_threshold_windows': 'itsi_custom_threshold_windows',
    'deep_dive': 'itsi_pages',
    'entity': 'itsi_services',
    'entity_discovery_searches': 'itsi_entity_discovery_search',
    'entity_management_policies': 'itsi_entity_management_policies',
    'entity_management_rules': 'itsi_entity_management_policies',
    'entity_relationship': 'itsi_entity_relationships',
    'entity_relationship_rule': 'itsi_entity_relationship_rules',
    'event_management_state': 'itsi_event_management',
    'external_ticket': 'itsi_notable_event_ticketing',
    'glass_table': 'itsi_pages',
    'glass_table_icons': 'SA-ITOA_icon_collection',
    'glass_table_images': 'SA-ITOA_files',
    'home_view': 'itsi_service_analyzer',
    'kpi': 'itsi_services',
    'kpi_base_search': 'itsi_services',
    'kpi_template': 'itsi_services',
    'kpi_threshold_template': 'itsi_services',
    'migration': 'itsi_migration',
    'notable_aggregation_policy': 'itsi_notable_event_aggregation_policy',
    'notable_event_email_template': 'itsi_notable_event_email_template',
    'notable_event_group': 'itsi_notable_group_user',
    'notable_event_ref_url': 'itsi_notable_event_ref_url',
    'notable_group_system': 'itsi_notable_group_system',
    'sandbox': 'itsi_sandbox',
    'sandbox_service': 'itsi_sandbox_service',
    'sandbox_sync_log': 'itsi_sandbox_sync_log',
    'saved_page': 'itsi_services',
    'team': 'itsi_team',
    'upgrade_readiness_prechecks': 'itsi_upgrade_readiness_prechecks'
}
# Maps each ITSI object type to its primary KV store collection for normal
# (non-backup) CRUD operations. A subset of BACKUP_COLLECTION_MATRIX.
OBJECT_COLLECTION_MATRIX = {
    'backup_restore': 'itsi_backup_restore_queue',
    'base_service_template': 'itsi_base_service_template',
    'content_pack': 'itsi_content_pack_authorship',
    'custom_threshold_windows': 'itsi_custom_threshold_windows',
    'deep_dive': 'itsi_pages',  # itsi_pages
    'entity': 'itsi_services',
    'entity_relationship': 'itsi_entity_relationships',
    'entity_relationship_rule': 'itsi_entity_relationship_rules',
    'entity_management_policies': 'itsi_entity_management_policies',
    'entity_management_rules': 'itsi_entity_management_policies',
    'event_management_state': 'itsi_event_management',
    'glass_table': 'itsi_pages',
    'home_view': 'itsi_service_analyzer',  # itsi_service_analyzer
    'kpi': 'itsi_services',
    'kpi_at_info': 'itsi_kpi_at_info',
    'kpi_base_search': 'itsi_services',
    'kpi_entity_threshold': 'itsi_entity_thresholds',
    'kpi_template': 'itsi_services',
    'kpi_threshold_template': 'itsi_services',
    'migration': 'itsi_migration',  # itsi_migration
    'saved_page': 'itsi_services',
    'service': 'itsi_services',  # itsi_service collection
    'sandbox': 'itsi_sandbox',
    'sandbox_service': 'itsi_sandbox_service',
    'team': 'itsi_team',
    'upgrade_readiness_prechecks': 'itsi_upgrade_readiness_prechecks'
}
# Object types that participate in team-based (security group) access control.
SECURABLE_OBJECT_LIST = [
    'base_service_template',
    'custom_threshold_windows',
    'entity',
    'entity_management_policies',
    'entity_management_rules',
    'kpi_base_search',
    'kpi_template',
    'kpi_threshold_template',
    'service'
]
# List of securable object_types that can be contained only inside of the Global Security Group
GLOBAL_ONLY_SECURABLE_OBJECT_LIST = [
    'base_service_template',
    'custom_threshold_windows',
    'entity',
    'entity_management_policies',
    'entity_management_rules',
    'kpi_base_search',
    'kpi_template',
    'kpi_threshold_template'
]
# Identity of the built-in "Global" team/security group.
GLOBAL_SECURITY_GROUP_CONFIG = {
    'key': 'default_itsi_security_group',
    'title': 'Global'
}
# For securable object types, the field on the object that references
# associated services (used when resolving service-level permissions).
SECURABLE_OBJECT_SERVICE_CONTENT_KEY = {
    'base_service_template': 'linked_services'
}
# _key of the out-of-the-box scheduled backup job.
DEFAULT_SCHEDULED_BACKUP_KEY = 'ItsiDefaultScheduledBackup'
# default team settings import is handled by import_team_settings method
BLOCK_LIST = ['notable_event_review_security_group', 'default_itsi_security_group']
# objects which will be mutable after import
MUTABLE_OBJECT_LIST = ['entity_type', 'duplicate_entities_cache']
# Ignore update from configuration if the record already present
OBJECT_TO_IGNORE_FOR_UPDATE_FROM_CONF = ['sandbox']
# unsupported characters
ILLEGAL_CHARACTERS = ['=', '$', '^']
# mod_source default value
ITSI_DEFAULT_IMPORT = 'ITSI default import'
# offset (in minutes) applied to stagger the batch AT searches
OFFSET_MINUTES = 30
class ITOAInterfaceUtils(object):
    '''
    Utility methods for appserver/controllers/itoa_interface.py
    '''
    # KV store REST endpoints used by the version/migration helpers below.
    KV_STORE_COLLECTION_URI = '/servicesNS/nobody/SA-ITOA/storage/collections/data/itsi_services'
    KV_STORE_NEW_MIGRATION_COLLECTION_URI = '/servicesNS/nobody/SA-ITOA/storage/collections/data/itsi_migration'
    KV_STORE_ITSI_FEATURE_COLLECTION_COLLECTION_URI = '/servicesNS/nobody/SA-ITOA/storage/collections/data/itsi_features'
    KV_STORE_NEW_MIGRATION_STATUS_COLLECTION_URI = '/servicesNS/nobody/SA-ITOA/storage/collections/data/itsi_migration_status'  # noqa
    # Endpoint that triggers a polite splunkweb restart.
    RELOAD_WEBUI_URI = '/services/server/control/restart_webui_polite'
def check_if_duplicates(list_of_elements):
"""
Check if given list contains any duplicates
"""
return len(list_of_elements) != len(set(list_of_elements))
@staticmethod
def fetch_shkpi_id(service_obj):
"""
for a given service_obj, fetch the shkpi _key
service_obj = {
'title': '',
...
'kpis': [
{
<kpi fields>
'_key': 'SHKPI....'
}
]
}
"""
if (not isinstance(service_obj, dict)) or (service_obj is None) or (len(service_obj) == 0):
return None
kpis = service_obj.get('kpis', [])
# no kpis exist, lets add a SHKPI to this service
if len(kpis) == 0:
shkpi_dict = ITOAInterfaceUtils.generate_shkpi_dict(service_obj.get('_key'))
if shkpi_dict:
service_obj['kpis'] = [shkpi_dict]
for kpi in kpis:
kpi_key = kpi.get('_key', '')
if kpi_key.startswith('SHKPI'):
return kpi_key
return None
@staticmethod
def generate_backend_key():
"""
Generate a random UUID for a service health KPI or transaction ID (both are of the same format)
@return: Key
@rtype: string
"""
return str(uuid.uuid4())
@staticmethod
def validate_thresholds(thresholds_container, thresholds_key):
"""
Validate that the thresholds are in the correct format
(container fields should be containers, number fields should be numbers)
NOTE: It does not check that max is above min, or any other value checking
@param thresholds_container: The container for the thresholds
@type thresholds_container: dict
@param thresholds_key: The key indicating the threshold to examine
@type thresholds_key: string
"""
if not is_valid_dict(thresholds_container):
# Ignore validation for incorrect containers
return
if ((thresholds_key not in thresholds_container)
or (not is_valid_dict(thresholds_container[thresholds_key]))):
thresholds_container[thresholds_key] = {}
thresholds = thresholds_container[thresholds_key]
for field_name in ['isMaxStatic', 'isMinStatic']:
if ((field_name not in thresholds)
or (not isinstance(thresholds[field_name], bool))):
thresholds[field_name] = False
def validate_num_field(field_name, container):
normalize_num_field(container, field_name, numclass=float)
validate_num_field('baseSeverityValue', thresholds)
if ('thresholdLevels' not in thresholds) or (not is_valid_list(thresholds['thresholdLevels'])):
thresholds['thresholdLevels'] = []
for threshold_level in thresholds['thresholdLevels']:
validate_num_field('dynamicParam', threshold_level)
validate_num_field('thresholdValue', threshold_level)
validate_num_field('severityValue', threshold_level)
@staticmethod
def validate_aggregate_thresholds(aggregate_thresholds_container):
# Assume aggregate_thresholds_container has already been validated for valid dict
ITOAInterfaceUtils.validate_thresholds(aggregate_thresholds_container, 'aggregate_thresholds')
@staticmethod
def validate_entity_thresholds(entity_thresholds_container):
# Assume entity_thresholds_container has already been validated for valid dict
ITOAInterfaceUtils.validate_thresholds(entity_thresholds_container, 'entity_thresholds')
@staticmethod
def update_shkpi_dict(service_backend_key, backfill_enabled=False):
"""
Do not regenerated complete Service Health KPI. Backfill option
for service health KPI is now configurable by user, so for example
'backfill_enabled' attribute will not always be False. Keep such
attributes untouched during regeneration of SHKPI.
In future, extend this method when new configurable attributes
are added for SHKPI.
@param service_backend_key: service key
@type service_backend_key: basestring
@param backfill_enabled: backfill is enabled or not
@type backfill_enabled: bool
@return: SHKPI object
"""
shkpi = ITOAInterfaceUtils.generate_shkpi_dict(service_backend_key)
if backfill_enabled:
shkpi.pop('backfill_enabled', None)
shkpi.pop('backfill_earliest_time', None)
return shkpi
    @staticmethod
    def generate_shkpi_dict(service_backend_key):
        '''
        Every service that is created needs a service health KPI (SHKPI) by
        default; this is nothing but a static dict keyed off the service key.

        @param service_backend_key: the service's backend _key
        @return: SHKPI dict whose '_key' is 'SHKPI-<service key>', or None
            when service_backend_key is not a non-empty string
        '''
        # Reject non-strings and blank/whitespace-only keys.
        if any([
            not isinstance(service_backend_key, itsi_py3.string_type),
            isinstance(service_backend_key, itsi_py3.string_type) and not service_backend_key.strip()
        ]):
            return None
        service_id = str(service_backend_key)
        shkpi_key = 'SHKPI-' + str(service_backend_key)
        return {
            "title": "ServiceHealthScore",
            "threshold_eval": "",
            "alert_on": "both",
            "datamodel": {
                "datamodel": "",
                "object": "",
                "owner_field": "",
                "field": ""
            },
            "unit": "",
            "gap_severity_value": "-1",
            # Aggregate search: latest health score from the summary index.
            "search_aggregate": (
                "`get_full_itsi_summary_service_health_events({0})`"
                " | stats latest(health_score) AS aggregate"
            ).format(service_id),
            "fill_gaps": "null_value",
            "search_alert_earliest": "15",
            "kpi_template_kpi_id": "",
            "type": "service_health",
            "_owner": "nobody",
            "adaptive_thresholds_is_enabled": False,
            "source": "",
            "urgency": "11",
            "anomaly_detection_is_enabled": False,
            "cohesive_anomaly_detection_is_enabled": False,
            "target": "",
            # Time-variate policy skeleton: a single static default policy
            # with empty threshold levels.
            "time_variate_thresholds_specification": {
                "policies": {
                    "default_policy": {
                        "policy_type": "static",
                        "title": "Default",
                        "time_blocks": [],
                        "aggregate_thresholds": {
                            "thresholdLevels": [],
                            "gaugeMax": 100,
                            "gaugeMin": 0,
                            "baseSeverityLabel": "info",
                            "metricField": "count",
                            "search": "",
                            "renderBoundaryMin": 0,
                            "baseSeverityValue": 1,
                            "baseSeverityColor": "#AED3E5",
                            "isMaxStatic": False,
                            "isMinStatic": True,
                            "baseSeverityColorLight": "#E3F0F6",
                            "renderBoundaryMax": 100
                        },
                        "entity_thresholds": {
                            "thresholdLevels": [],
                            "gaugeMax": 100,
                            "gaugeMin": 0,
                            "baseSeverityLabel": "info",
                            "metricField": "count",
                            "search": "",
                            "renderBoundaryMin": 0,
                            "baseSeverityValue": 1,
                            "baseSeverityColor": "#AED3E5",
                            "isMaxStatic": False,
                            "isMinStatic": True,
                            "baseSeverityColorLight": "#E3F0F6",
                            "renderBoundaryMax": 100
                        }
                    }
                }
            },
            "threshold_field": "aggregate",
            "aggregate_eval": "",
            "description": "",
            "search_buckets": "",
            "is_service_entity_filter": False,
            "aggregate_statop": "avg",
            "backfill_enabled": False,
            "alert_eval": "",
            "entity_statop": "avg",
            # Default aggregate severity bands: 0-20 critical, 20-40 high,
            # 40-60 medium, 60-80 low, 80+ normal.
            "aggregate_thresholds": {
                "thresholdLevels": [
                    {
                        "thresholdValue": 0,
                        "severityLabel": "critical",
                        "severityValue": 6,
                        "severityColor": "#B50101",
                        "severityColorLight": "#E5A6A6"
                    },
                    {
                        "thresholdValue": 20,
                        "severityLabel": "high",
                        "severityValue": 5,
                        "severityColor": "#F26A35",
                        "severityColorLight": "#FBCBB9"
                    },
                    {
                        "thresholdValue": 40,
                        "severityLabel": "medium",
                        "severityValue": 4,
                        "severityColor": "#FCB64E",
                        "severityColorLight": "#FEE6C1"
                    },
                    {
                        "thresholdValue": 60,
                        "severityLabel": "low",
                        "severityValue": 3,
                        "severityColor": "#FFE98C",
                        "severityColorLight": "#FFF4C5"
                    },
                    {
                        "thresholdValue": 80,
                        "severityLabel": "normal",
                        "severityValue": 2,
                        "severityColor": "#99D18B",
                        "severityColorLight": "#DCEFD7"
                    }
                ],
                "gaugeMax": 100,
                "isMaxStatic": False,
                "baseSeverityLabel": "normal",
                "metricField": "count",
                "search": "",
                "renderBoundaryMin": 0,
                "baseSeverityValue": 2,
                "baseSeverityColor": "#99D18B",
                "gaugeMin": 0,
                "isMinStatic": True,
                "baseSeverityColorLight": "#DCEFD7",
                "renderBoundaryMax": 100
            },
            "anomaly_detection_training_window": "-7d",
            # Same severity bands at the entity level.
            "entity_thresholds": {
                "thresholdLevels": [
                    {
                        "thresholdValue": 0,
                        "severityLabel": "critical",
                        "severityValue": 6,
                        "severityColor": "#B50101",
                        "severityColorLight": "#E5A6A6"
                    },
                    {
                        "thresholdValue": 20,
                        "severityLabel": "high",
                        "severityValue": 5,
                        "severityColor": "#F26A35",
                        "severityColorLight": "#FBCBB9"
                    },
                    {
                        "thresholdValue": 40,
                        "severityLabel": "medium",
                        "severityValue": 4,
                        "severityColor": "#FCB64E",
                        "severityColorLight": "#FEE6C1"
                    },
                    {
                        "thresholdValue": 60,
                        "severityLabel": "low",
                        "severityValue": 3,
                        "severityColor": "#FFE98C",
                        "severityColorLight": "#FFF4C5"
                    },
                    {
                        "thresholdValue": 80,
                        "severityLabel": "normal",
                        "severityValue": 2,
                        "severityColor": "#99D18B",
                        "severityColorLight": "#DCEFD7"
                    }
                ],
                "gaugeMax": 100,
                "isMaxStatic": False,
                "baseSeverityLabel": "normal",
                "metricField": "count",
                "search": "",
                "renderBoundaryMin": 0,
                "baseSeverityValue": 2,
                "baseSeverityColor": "#99D18B",
                "gaugeMin": 0,
                "isMinStatic": True,
                "baseSeverityColorLight": "#DCEFD7",
                "renderBoundaryMax": 100
            },
            "datamodel_filter": [],
            "alert_lag": "30",
            "kpi_base_search": "",
            "base_search": (
                "`get_full_itsi_summary_service_health_events({0})`"
            ).format(service_id),
            "anomaly_detection_sensitivity": 0.999,
            "search_time_series_aggregate": (
                "`get_full_itsi_summary_service_health_events({0})`"
                " | timechart avg(health_score) AS aggregate"
            ).format(service_id),
            "tz_offset": None,
            "is_entity_breakdown": False,
            "search_time_series": (
                "`get_full_itsi_summary_service_health_events({0})`"
                " | timechart avg(health_score) AS aggregate"
            ).format(service_id),
            "search_alert": "",
            "search": (
                "`get_full_itsi_summary_service_health_events({0})`"
                " | stats latest(health_score) AS aggregate"
            ).format(service_id),
            "time_variate_thresholds": False,
            "search_alert_entities": "",
            "anomaly_detection_alerting_enabled": False,
            "adaptive_thresholding_training_window": "-7d",
            "gap_severity_color": "#CCCCCC",
            "entity_id_fields": "",
            "entity_breakdown_id_fields": "",
            "alert_period": "1",
            "gap_severity": "unknown",
            "gap_severity_color_light": "#EEEEEE",
            "search_time_series_entities": "",
            # Window-over-window comparison: splits the search window in two
            # and reports whether the average health score moved up or down.
            "search_time_compare": (
                '`get_full_itsi_summary_service_health_events({0})`'
                ' [| stats count | addinfo | eval search= "earliest=" +'
                ' tostring(info_min_time-(info_max_time-info_min_time))+'
                ' " latest=" + tostring(info_max_time)'
                ' |fields search] | addinfo | eval'
                ' bucket=if(_time<info_max_time-((info_max_time-info_min_time)/2),'
                ' "last_window", "current_window") | stats'
                ' avg(health_score) AS aggregate BY bucket | reverse | delta'
                ' aggregate AS window_delta | search bucket=current_window |'
                ' eval window_direction=if(window_delta >0, "increase",'
                ' if(window_delta < 0, "decrease", "none"))'
            ).format(service_id),
            "_key": shkpi_key,
            "search_occurrences": 1,
            "backfill_earliest_time": "-7d",
            "search_type": "adhoc"
        }
@staticmethod
def generate_kpi_base_search():
return {
"title": "kpi_base_search_template",
"description": "",
"acl": {
"can_change_perms": True,
"sharing": "app",
"can_write": True,
"modifiable": True,
"can_share_app": True,
"owner": "admin",
"perms": {
"read": ["*"],
"write": ["*"]
},
"can_share_global": True,
"can_share_user": True
},
"_owner": "nobody",
"source_itsi_da": "itsi",
"base_search": "*",
"search_alert_earliest": "5",
"alert_period": "5",
"is_entity_breakdown": False,
"entity_id_fields": "host",
"entity_breakdown_id_fields": "",
"is_service_entity_filter": False,
"metrics": [],
"metric_qualifier": "",
"alert_lag": "30",
"_user": "nobody",
"object_type": "kpi_base_search",
"permissions": {
"read": True,
"user": "admin",
"group": {"read": True, "delete": True, "write": True},
"delete": True,
"write": True
},
"actions": "",
"isFirstTimeSaveDone": False}
@staticmethod
def generate_kpi_entity_threshold():
return {
"entity_key": "entity_key_template",
"entity_title": "entity_title_template",
"service_id": "service_id_template",
"kpi_id": "kpi_id_template",
"entity_thresholds": {
"baseSeverityLabel": "normal",
"baseSeverityValue": 2,
"baseSeverityColor": "#99D18B",
"baseSeverityColorLight": "#DCEFD7",
"metricField": "count",
"renderBoundaryMin": 0,
"renderBoundaryMax": 100,
"isMaxStatic": False,
"isMinStatic": True,
"gaugeMin": 0,
"gaugeMax": 100,
"thresholdLevels": []
},
"adaptive_thresholds_is_enabled": False,
"adaptive_thresholding_training_window": '-7d',
"time_variate_thresholds": False,
"time_variate_thresholds_specification": {
"policies": {
"default_policy": {
"title": "Default",
"entity_thresholds": {
"baseSeverityLabel": "normal",
"baseSeverityValue": 2,
"baseSeverityColor": "#99D18B",
"baseSeverityColorLight": "#DCEFD7",
"metricField": "count",
"renderBoundaryMin": 0,
"renderBoundaryMax": 100,
"isMaxStatic": False,
"isMinStatic": True,
"gaugeMin": 0,
"gaugeMax": 100,
"thresholdLevels": []
},
"policy_type": "static",
"time_blocks": []
}
}
},
"object_type": "kpi_entity_threshold"
}
@staticmethod
def make_array_of_strings(arr_val):
'''
Make sure that this is an array of strings
'''
if arr_val is None:
return None
if type(arr_val) is not list:
arr_val = arr_val.split(',')
# remove whitespace in them strings
arr_val = [i.strip() for i in arr_val]
return arr_val
@staticmethod
def make_dict_from_kv_string(kv_string):
'''
From a comma separated list of kv pairs, construct a hash
e.g. a=b,c=d,e=f --> {"a":"b","c":"d","e":"f"}
'''
if kv_string is None or len(kv_string) == 0:
return None
kv_array = kv_string.split(',')
kv_dict = {}
for i in kv_array:
# TODO: Now that I think about it, pair could actually
# be more than a pair. Would require some changes to
# the mapping structure
pair = i.split("=")
# Remove the leading and trailing whitespaces
if len(pair) == 1:
continue # key is equal to nothing :( sad panda
if len(pair[1]) == 0:
continue # key is equal to nothing :( sad panda
pair = [x.strip() for x in pair]
kv_dict[pair[0]] = pair[1] # For now we'll ignore anything beyond the first k=v
return kv_dict
@staticmethod
def make_dict_from_string(dict_string):
"""
@type dict_string: basestring
@param dict_string: a string
@rtype: dict[list]|None
@return: a valid dictionary from dict_string or None
"""
if not isinstance(dict_string, itsi_py3.string_type) or len(dict_string) == 0:
return None
try:
final_dict = json.loads(dict_string)
if isinstance(final_dict, dict):
return {k: ITOAInterfaceUtils.make_array_of_strings(v) for k, v in final_dict.items()}
except ValueError:
pass
return None
@staticmethod
def _validate_keys_in_json(keys_as_list, json_object):
'''
Validates if keys are present in given json_object
@param keys_as_list: List of keys to check in json_object
@param json_object: json object to verify against
@return True if valid; False if invalid
@return missing key as string
'''
for key in keys_as_list:
if key not in json_object:
return False, key
return True, ""
@staticmethod
def trim_dict(obj_as_dict, remove_fields):
'''
From a given dictionary, remove fields we dont want...
@param json_obj - dictionary to work on
@param remove_fields - list of fields to remove
@return set of fields that were removed...
'''
set_of_removed = set()
if any([
not isinstance(obj_as_dict, dict),
isinstance(obj_as_dict, dict) and len(obj_as_dict) == 0,
len(remove_fields) == 0
]):
return set_of_removed
for field in remove_fields:
removed_field = obj_as_dict.pop(field, None)
if removed_field is not None:
set_of_removed.add(removed_field)
return set_of_removed
@staticmethod
def get_splunk_host_port():
try:
# Old format below. Update fetch to parse ipv6 formats better.
# return (splunk.getDefault("host"), splunk.getDefault("port"))
#
# Returns splunkd uri, which can be in a IPv6 compatible format
# Becomes ['https://[::1]', '8089'] or ['http://127.0.0.1', '8089']
local_uri = splunk.getLocalServerInfo()
host_and_uri = local_uri.split('://')[1]
host, port = host_and_uri.rsplit(':', 1)
return (host, port)
except Exception:
return ("localhost", 8089)
@staticmethod
def service_connection(session_key, app_name):
'''
Based on the api doc, host and port has to be provided,
otherwise it will use the default port
https://docs.splunk.com/DocumentationStatic/PythonSDK/1.1/client.html
:param session_key:
:param app_name:
:return: service object
'''
(host, port) = ITOAInterfaceUtils.get_splunk_host_port()
service = client.connect(token=session_key, app=app_name, host=host, port=port)
return service
@staticmethod
def replace_append_info(json_obj, replace_fields={}, replace_fields_types={}, add_fields={}):
'''
In json_obj, replace some fields, and add some new fields....
@param json_obj: dict to work on...
@param replace_fields: represents existing/old field,
value represents new field to replace with
{'old_field':'new_field'} replace 'old_field' by 'new_field',
if 'old_field' doesnt exist, add 'new_field' to json_obj with initial value set
based on 'new_field' specification in replace_fields_types
@param replace_fields_types: types of these new fields from above....
{'new_field':str/list/dict/...}
@param add_fields = fields to add, key represents new field; value represents type of new field
{'add_this_new_field': str/list/dict/...}
@return return True if successful, False if otherwise
'''
if len(replace_fields) > 0:
if len(replace_fields_types) == 0:
return False, ('replace_fields={} needs replace_fields_types={}'
'to be valid/non-empty').format(
json.dumps(replace_fields),
json.dumps(replace_fields_types))
# replace some fields...
for field in replace_fields:
if json_obj.get(field) is not None and json_obj.get(replace_fields[field]) is None:
existing_type = type(json_obj[field]) # fetch existing type
json_obj[replace_fields[field]] = existing_type(
json_obj[field]) # create new field with same type & value
del json_obj[field] # delete existing
else:
# add this new field even if it's old nemesis doesn't exist
if json_obj.get(replace_fields[field]) is None:
json_obj[replace_fields[field]] = replace_fields_types[replace_fields[field]]()
# now add some fields if needed...
for field in add_fields:
if json_obj.get(field) is None:
json_obj[field] = add_fields[field]()
return True, ''
@staticmethod
def get_version_from_kv(session_key, hostpath=None):
'''
Collect version information from kv
@param {string} session_key: session key
@param {string} hostPath: splunkd uri
@rtype tuple
@return tuple: tuple of
{string} old version
{string} KV stanza key which old version information
'''
uri = ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_COLLECTION_URI
if hostpath:
uri = hostpath + uri
# There is issue, if we call this function in modular input too soon,
# we get 503 error which Service Unavailable
# this means that KV store has not initialized yet
# In the case of KVService, we get a 404 error
# Also, we wait in incremental delay of 5 seconds
# in case of SHC rolling restart
retry = 1
while retry <= 10:
try:
rsp, content = rest.simpleRequest(uri, sessionKey=session_key,
raiseAllErrors=False)
if rsp.status != 503 and rsp.status != 404:
break
logger.info('KV store service is unavailable. Retry %d of 10.', retry)
except splunk.ResourceNotFound:
# Catching this error case is a workaround until SPL-218406 is fixed
logger.info('KV Service resource not found. Retry %d of 10.', retry)
# Incremental delay to reduce the number of calls.
delay = retry * 5
time.sleep(delay)
retry += 1
if rsp.status != 200 and rsp.status != 201:
logger.error('Got bad status code %s - Aborting. Response %s', rsp.status, rsp)
raise Exception('Got bad status code %s - Aborting.' % rsp.status)
# Update existing schema
logger.debug('URI: %s return content: %s', ITOAInterfaceUtils.KV_STORE_COLLECTION_URI, content)
json_data = json.loads(content)
if len(json_data) == 0:
logger.info('Could not find the migration stanza. It seems to be a fresh installation.')
return None, None
else:
entry = json_data[0]
old_version = entry.get('itsi_latest_version')
key = entry.get('_key')
logger.debug('Collected version: %s from KV store, schema _key: %s.', old_version, key)
return old_version, key
    @staticmethod
    def update_version_to_kv(session_key, id, new_version, old_version, is_migration_done):
        '''
        Update version information to KV.

        @param {string} session_key: Splunk session key
        @param {string} id: KV store schema id; if id is None then create a new stanza
        @param {string} new_version: new version
        @param {string} old_version: old version
        @param {boolean} is_migration_done: flag for whether migration is done or not
        @rtype: boolean (True/False)
        @return: flag indicating whether data was updated successfully
        '''
        uri = ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_COLLECTION_URI
        if id:
            # Existing stanza: POSTing to .../<id> updates it in place;
            # without an id, POSTing to the collection creates a new stanza.
            uri = uri + '/' + id
        # Timestamped title makes each update record unique.
        migration_title = 'version_info_update_record_{0}'.format(int(get_current_utc_epoch()))
        data = {'title': migration_title,
                'itsi_latest_version': new_version,
                'itsi_old_version': old_version,
                'object_type': 'migration',
                'is_migration_done': is_migration_done
                }
        rsp, content = rest.simpleRequest(
            uri,
            sessionKey=session_key,
            raiseAllErrors=False,
            jsonargs=json.dumps(data),
            method='POST'
        )
        if rsp.status != 200 and rsp.status != 201:
            # Non-2xx: log and report failure instead of raising.
            logger.error('Got bad status code %s - Aborting. Response %s', rsp.status, rsp)
            return False
        logger.info('Successfully updated KV store with latest_version: %s, old_version: %s, is_migration_done: %s.',
                    new_version, old_version, is_migration_done)
        return True
@staticmethod
def get_migration_status_from_kv(session_key, hostpath=None):
    '''
    Collect migration status information from the KV store.

    @param {string} session_key: session key
    @param {string} hostpath: optional splunkd uri prefix to target a specific host
    @rtype dict
    @return dict with keys:
        {bool} is_running: migration status
        {string} start_timestamp: timestamp that migration starts
        {bool} skip_local_failure: whether to skip local failures
        {list} precheck_results: EA precheck results
        {string} id_: key of the kvstore item (None on fresh installation)
    '''
    uri = ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_STATUS_COLLECTION_URI
    if hostpath:
        uri = hostpath + uri
    getargs = {'output_mode': 'json'}
    # If this runs too soon (e.g. from a modular input) the KV store may not be
    # initialized yet: KV store answers 503 (Service Unavailable) and KVService
    # answers 404. Retry with an incremental delay to ride out SHC rolling restarts.
    rsp = None
    content = None
    retry = 1
    while retry <= 10:
        try:
            rsp, content = rest.simpleRequest(uri, sessionKey=session_key,
                                              raiseAllErrors=False, getargs=getargs)
            if rsp.status != 503 and rsp.status != 404:
                break
            logger.info('KV store service is unavailable. Retry %d of 10.', retry)
        except splunk.ResourceNotFound:
            # Catching this error case is a workaround until SPL-218406 is fixed
            logger.info('KV Service resource not found. Retry %d of 10.', retry)
        # Incremental delay to reduce the number of calls.
        delay = retry * 5
        time.sleep(delay)
        retry += 1
    # Fix: rsp stayed unbound (NameError) when every attempt raised
    # ResourceNotFound; treat that case as a failed request instead.
    if rsp is None or (rsp.status != 200 and rsp.status != 201):
        status = rsp.status if rsp is not None else 'unknown'
        logger.error('Got bad status code %s - Aborting. Response %s', status, rsp)
        raise Exception('Got bad status code %s - Aborting.' % status)
    logger.debug(
        'URI: %s return content: %s', ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_STATUS_COLLECTION_URI, content
    )
    json_data = json.loads(content)
    if len(json_data) == 0:
        logger.debug('Could not find the migration stanza. It seems to be a fresh installation.')
        return {
            'is_running': False,
            'start_timestamp': None,
            'id_': None,
            'skip_local_failure': None,
            'precheck_results': None
        }
    entry = json_data[0]
    # Fix: this message used to be a (string, string) tuple because of a stray
    # comma, which broke %-formatting inside the logging call.
    logger.debug(
        'Migration status: is_running %s,'
        ' start_timestamp %s,'
        ' skip_local_failure %s,'
        ' schema _key: %s,'
        ' precheck_results: %s from KV store.',
        entry.get('is_running'), entry.get('start_timestamp'),
        entry.get('skip_local_failure'), entry.get('_key'), entry.get('precheck_results')
    )
    res = entry
    res['id_'] = entry.pop('_key')
    return res
@staticmethod
def append_data_to_migration_status_kv(session_key, current_status, **kwargs):
    """
    Append keyword arguments to the current migration status record in KV store.
    The record dict is updated in place (its 'id_' entry is consumed).

    @param {string} session_key: Splunk session key
    @param {dict} current_status: record as returned by get_migration_status_from_kv()
    @return True when the record was updated, False otherwise
    """
    record = current_status
    record_id = record.pop('id_')
    record.update(kwargs)
    endpoint = ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_STATUS_COLLECTION_URI
    if record_id:
        endpoint = '{0}/{1}'.format(endpoint, record_id)
    response, _ = rest.simpleRequest(endpoint, sessionKey=session_key,
                                     raiseAllErrors=False, jsonargs=json.dumps(record),
                                     method='POST')
    if response.status not in (200, 201):
        logger.error('Got bad status code %s - Aborting. Response %s', response.status, response)
        return False
    return True
@staticmethod
def update_migration_status_to_kv(session_key,
                                  id_,
                                  is_running,
                                  skip_local_failure,
                                  start_timestamp,
                                  end_timestamp=None,
                                  has_succeeded=None,
                                  precheck_results=None):
    '''
    Write the migration status record to the KV store.

    @param {string} session_key: Splunk session key
    @param {string} id_: KV store schema id; when None a new stanza is created
    @param {boolean} is_running: migration status
    @param {boolean} skip_local_failure: whether to continue migration on local failures
    @param {float} start_timestamp: timestamp when migration starts
    @param {float} end_timestamp: timestamp when migration ended
    @param {boolean} has_succeeded: whether or not migration ran successfully without major errors
    @param {list} precheck_results: EA precheck results with default value = None
    @rtype boolean (True/False)
    @return flag if data is updated successfully
    '''
    endpoint = ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_STATUS_COLLECTION_URI
    if id_:
        endpoint = '{0}/{1}'.format(endpoint, id_)
    payload = {
        'is_running': is_running,
        'start_timestamp': start_timestamp,
        'skip_local_failure': skip_local_failure,
        'end_timestamp': end_timestamp,
        'has_succeeded': has_succeeded,
        'precheck_results': precheck_results,
    }
    response, content = rest.simpleRequest(endpoint, sessionKey=session_key,
                                           raiseAllErrors=False, jsonargs=json.dumps(payload),
                                           method='POST')
    if response.status not in (200, 201):
        logger.error('Got bad status code %s - Aborting. Response %s', response.status, response)
        return False
    saved_key = json.loads(content).get('_key')
    logger.debug(
        'Successfully updated KV store with migration status:'
        ' is_running: %s,'
        ' start_timestamp: %s,'
        ' skip_local_failure: %s,'
        ' end_timestamp: %s,'
        ' has_succeeded: %s,'
        ' precheck_results: %s,'
        ' schema _key: %s',
        is_running, start_timestamp, skip_local_failure, end_timestamp,
        has_succeeded, precheck_results, saved_key
    )
    return True
@staticmethod
def _get_apps_uri(app, owner):
    '''
    Build the splunkd local-apps endpoint URI used to look up an app's version.

    @param {string} app: app name
    @param {string} owner: owner name (accepted for interface compatibility; not used)
    '''
    return '{0}services/apps/local/{1}'.format(rest.makeSplunkdUri(), app)
@staticmethod
def get_app_version(session_key, app="itsi", owner="nobody", fetch_conf_only=False):
    '''
    Get app version from app.conf file.

    @type: string
    @param session_key - session key
    @type: string
    @param app - app name
    @type: string
    @param owner - owner name
    @type: boolean
    @param fetch_conf_only - when True, bypass the packaged/cached ITSI version
        constant and always read the version from the apps/local REST endpoint
    @return version number or None
    @rtype string/None
    '''
    # Fast path: for ITSI itself, trust the packaged version constant unless the
    # caller explicitly asked for a fresh read from conf.
    if (
        not fetch_conf_only
        and app.lower() == 'itsi'
        and VersionCheck.validate_version(current_itsi_app_version, is_accept_empty=False)
    ):
        return current_itsi_app_version
    retry = 1
    getargs = {'output_mode': 'json'}
    response = None
    # Retry loop: early in startup the endpoint can answer 503 (service
    # unavailable) or 404 (KVService), e.g. during an SHC rolling restart.
    while retry <= 10:
        try:
            response, content = rest.simpleRequest(ITOAInterfaceUtils._get_apps_uri(app, owner),
                                                   sessionKey=session_key, getargs=getargs)
            if response.status != 503 and response.status != 404:
                break
            logger.info('KV store service is unavailable. Retry %d of 10.', retry)
        except splunk.ResourceNotFound:
            # Catching this error case is a workaround until SPL-218406 is fixed
            logger.info('KV Service resource not found. Retry %d of 10.', retry)
        except Exception as e:
            logger.exception(e)
        # Incremental delay to reduce the number of calls.
        delay = retry * 5
        time.sleep(delay)
        retry += 1
    if not response or (response.status != 200 and response.status != 201):
        # REST read failed entirely; fall back to the packaged version constant.
        logger.error('Failed to get app: %s version, error: %s. Getting cached version: %s', app, response, current_itsi_app_version)
        return current_itsi_app_version
    else:
        json_data = json.loads(content)
        # First (and only) entry of apps/local/<app> carries the app's content.
        entry = json_data.get('entry')[0]
        content = entry.get('content')
        logger.debug('App version content: %s of URI: %s.', content, ITOAInterfaceUtils._get_apps_uri(
            app, owner
        ))
        return content.get('version')
@staticmethod
def get_itsi_conf_setting(session_key, stanza, setting, logger):
    """
    Read a single setting from a stanza of the itsi_settings conf file.

    @param session_key: the session key
    @type session_key: str
    @param stanza: the conf stanza
    @type stanza: str
    @param setting: the conf setting name
    @type setting: str
    @param logger: logger
    @type logger: structure
    @returns: the conf setting value, or None when unavailable
    @rtype: any
    """
    try:
        response, content = rest.simpleRequest(
            '/servicesNS/nobody/SA-ITOA/configs/conf-itsi_settings/' + stanza,
            sessionKey=session_key,
            getargs={'output_mode': 'json'}
        )
        if response.status == 200:
            for entry in json.loads(content).get('entry'):
                # Only the entry whose name matches the requested stanza counts.
                if entry.get('name') == stanza:
                    return entry.get('content', {}).get(setting)
    except Exception as e:
        logger.exception(e)
    return None
@staticmethod
def update_itsi_conf_setting(session_key, stanza, setting, logger):
    """
    Update settings on a stanza of the itsi_settings conf file.

    @param session_key: the session key
    @type session_key: str
    @param stanza: the conf stanza to update
    @type stanza: str
    @param setting: setting key/value pairs POSTed to the stanza
    @type setting: dict
    @param logger: logger
    @type logger: structure
    @returns: True when the update succeeded, False otherwise
    @rtype: bool
    """
    status = True
    try:
        response, content = rest.simpleRequest(
            '/servicesNS/nobody/SA-ITOA/configs/conf-itsi_settings/' + stanza,
            sessionKey=session_key,
            raiseAllErrors=False,
            postargs=setting,
            method='POST')
        # Fix: HTTP-level errors (e.g. 404/500) were previously ignored, so the
        # method reported success even when the conf update did not happen.
        if response.status not in (200, 201):
            logger.error('Failed to update itsi_settings stanza %s, response: %s', stanza, response)
            status = False
    except Exception as e:
        logger.exception(e)
        status = False
    return status
@staticmethod
def create_message(session_key, description, name=None, severity='info', app='itsi', owner='nobody', **query):
    """
    Post a Splunk system (user-visible) message.

    @param {string} session_key - splunk session key
    @param {string} description - message text
    @param {string} name - optional message name
    @param {string} severity - message severity level
    @param {string} app - namespace to post under
    @param {string} owner - owner
    @param query - extra arguments forwarded to post_splunk_user_message
    @return result of post_splunk_user_message
    """
    logger.info('Creating system message: %s', description)
    return post_splunk_user_message(
        description, session_key=session_key, name=name,
        severity=severity, namespace=app, owner=owner, **query)
@staticmethod
def get_modular_input(session_key, app, owner, mod_input_name, mod_instance_name):
    """
    Fetch a modular input instance entity.

    @type session_key: basestring
    @param session_key: splunkd session key
    @type app: basestring
    @param app: app name under which modular input is needed
    @type owner: basestring
    @param owner: user name
    @type mod_input_name: basestring
    @param mod_input_name: Modular input name
    @type mod_instance_name: basestring
    @param mod_instance_name: modular input instance name
    @rtype: object
    @return: Entity object
    """
    return getEntity(
        '/data/inputs/' + mod_input_name,
        mod_instance_name,
        sessionKey=session_key,
        namespace=app,
        owner=owner
    )
@staticmethod
def create_modular_input(session_key, app, owner, mod_input_name, post_args):
    """
    Create a modular input instance.

    @type session_key: basestring
    @param session_key: splunkd session key
    @type app: basestring
    @param app: app name under which modular input is needed
    @type owner: basestring
    @param owner: user name
    @type mod_input_name: basestring
    @param mod_input_name: Modular input name
    @type post_args: dict
    @param post_args: Optional and required post_args of modular input to create it
        Note: Must contain instance name in 'name' attribute of dict
    @rtype: bool
    @return: True - if operation is successful otherwise False
    """
    entity_path = "/data/inputs/" + mod_input_name
    # '_new' fetches a template entity that carries the input's
    # required/optional field definitions.
    entity = getEntity(entity_path, '_new', sessionKey=session_key, namespace=app, owner=owner)
    # Content must contain required parameters.
    # Fix: the result was previously discarded, so this method always returned
    # None even though it is documented to return a bool.
    return ITOAInterfaceUtils.update_modular_input(session_key, entity, post_args)
@staticmethod
def control_modular_input(session_key, app, owner, mod_input_name, mod_instance_name, action):
    """
    Perform remove/enable/disable on a modular input instance.

    @type session_key: basestring
    @param session_key: splunkd session key
    @type app: basestring
    @param app: app name under which modular input is needed
    @type owner: basestring
    @param owner: user name
    @type mod_input_name: basestring
    @param mod_input_name: modular input name
    @type mod_instance_name: basestring
    @param mod_instance_name: modular input instance name
    @type action: basestring
    @param action: action name ('remove', 'enable', 'disable')
    @rtype: bool
    @return: True - if operation is successful otherwise False
    """
    endpoint = buildEndpoint(
        "/data/inputs/" + mod_input_name,
        entityName=mod_instance_name,
        namespace=app,
        owner=owner
    )
    # enable/disable are sub-resources of the instance endpoint; 'remove'
    # operates on the instance endpoint itself.
    if action in ('enable', 'disable'):
        endpoint = '{0}/{1}'.format(endpoint, action)
    return controlEntity(action, endpoint, session_key)
@staticmethod
def delete_modular_input(session_key, app, owner, mod_input_name, mod_instance_name):
    """
    Delete a modular input instance.

    @type session_key: basestring
    @param session_key: splunkd session key
    @type app: basestring
    @param app: app name under which modular input is needed
    @type owner: basestring
    @param owner: user name
    @type mod_input_name: basestring
    @param mod_input_name: Modular input name
    @type mod_instance_name: basestring
    @param mod_instance_name: modular input instance name
    @rtype: bool
    @return: True - if operation is successful otherwise False
    """
    return deleteEntity(
        "/data/inputs/" + mod_input_name,
        mod_instance_name,
        app,
        owner,
        sessionKey=session_key
    )
@staticmethod
def update_modular_input(session_key, entity, post_arguments):
    """
    Update a modular input entity with the given properties and persist it.

    @type session_key: basestring
    @param session_key: session_key
    @type entity: object
    @param entity: Entity object which holds information for a modular input
    @type post_arguments: dict
    @param post_arguments: properties to set
    @rtype: bool
    @return: True - if operation is successful otherwise False
    """
    if not entity or not isinstance(entity, Entity):
        logger.error('Invalid entity, failed to update.')
        return False
    # Every required field must be supplied; bail out on the first missing one.
    for field in entity.requiredFields:
        if field not in post_arguments:
            logger.debug("Required field %s does not exist, hence cannot create new entity.", field)
            return False
        entity[field] = post_arguments.get(field)
    # Optional fields are copied over only when present.
    for field in entity.optionalFields:
        if field in post_arguments:
            entity[field] = post_arguments.get(field)
    return setEntity(entity, sessionKey=session_key)
@staticmethod
def merge_with_sec_filter(filter_data, sec_filter_data):
    """
    Combine a user-supplied filter with the system-generated security group filter.

    @type filter_data: dict
    @param filter_data: custom filter
    @type sec_filter_data: dict
    @param sec_filter_data: security group filter (generated by system)
    @rtype: dict
    @return: a merged filter
    """
    if not sec_filter_data:
        # No security filter to apply; pass the custom filter through untouched.
        return filter_data
    if not filter_data:
        return sec_filter_data
    if '$or' in filter_data or '$and' in filter_data:
        # Drop empty/None $or and $and clauses before wrapping.
        for op in ('$or', '$and'):
            if not filter_data.get(op):
                filter_data.pop(op, None)
        return {'$and': [filter_data, sec_filter_data]}
    merged = dict(filter_data)
    merged.update(sec_filter_data)
    return merged
@staticmethod
def remove_illegal_character_from_entity_rules(entity_rules):
    """
    Strip illegal characters from the 'field' and 'value' members of every
    rule item, modifying the rules in place.

    @type entity_rules: list
    @param entity_rules: entity rules whose rule items are scrubbed
    """
    for rule in entity_rules:
        for item in rule.get('rule_items', []):
            for field_name in ('field', 'value'):
                if field_name not in item:
                    continue
                cleaned = item[field_name]
                for bad_char in ILLEGAL_CHARACTERS:
                    cleaned = cleaned.replace(bad_char, '')
                item[field_name] = cleaned
@staticmethod
def configure_team(session_key):
    """
    Import the team setting from conf files. The team setting must be
    configured before importing any other settings.

    @rtype: boolean
    @return: status - if team configuration is successful or fail
    """
    importer = ItsiSettingsImporter(session_key=session_key)
    return importer.import_team_setting(owner='nobody')
@staticmethod
def configure_itsi(session_key, logger):
    """
    Import all ITSI settings from conf files into the KV store.
    Combines the itsi_configurator and itsi_upgrade modular input behavior;
    since the itsi_upgrade modular input runs on every restart, this runs as well.

    @rtype: None
    @return: None
    """
    logger.info("Check and import data from conf to KV store.")
    itsi_settings_importer = ItsiSettingsImporter(session_key=session_key)
    try:
        is_all_import_success = itsi_settings_importer.import_itsi_settings(owner='nobody')
        if not is_all_import_success:
            post_splunk_user_message(
                ('Failures occurred while attempting to import some IT Service Intelligence settings from '
                 'configuration files for apps and modules. '
                 'Check the logs to get information about which settings failed to be imported.'),
                session_key=session_key
            )
        else:
            # Fix: the success message used to be logged unconditionally, even
            # when the import reported failures or raised an exception.
            logger.info("Successfully imported IT Service Intelligence settings from conf files for apps and modules.")
    except Exception as e:
        message = ("Importing IT Service Intelligence settings from conf files "
                   "for apps and modules failed with: {}").format(str(e))
        logger.exception(message)
        post_splunk_user_message(message, session_key=session_key)
@staticmethod
def get_local_conf_stanza(stanza, conf_file, app_name, logger):
    """
    Retrieve values of a stanza from an app's local conf file.

    @return: dict of stanza values, or False when the local conf file or
        the stanza does not exist.
    """
    app_home = make_splunkhome_path(["etc", "apps", app_name])
    local_conf_path = os.path.sep.join([app_home, "local", conf_file])
    if not os.path.exists(local_conf_path):
        return False
    localconf = cli.readConfFile(local_conf_path)
    try:
        return localconf[stanza]
    except Exception as err:
        logger.info("Provided stanza is not present in local conf %s", err)
        return False
@staticmethod
def enable_itsi_event_grouping(session_key, logger):
    """
    Enable the ITSI event grouping real time saved search, unless a
    rules-engine feature flag is active or a local savedsearches.conf stanza
    already pins its 'disabled' state.

    @return: none
    """
    logger.info("Check and enable itsi_event_grouping saved search")
    try:
        # Only the legacy saved-search grouping path needs this search; skip
        # entirely when either rules-engine feature flag is enabled.
        if not (is_feature_enabled('itsi-rulesengine-adhoc', session_key, reload=True) or is_feature_enabled('itsi-rulesengine-queue', session_key, reload=True)):
            saved_search_status = ITOAInterfaceUtils.get_local_conf_stanza(
                "itsi_event_grouping", "savedsearches.conf", "SA-ITOA", logger
            )
            if saved_search_status is False:
                # No local stanza exists; inspect the effective saved search
                # state via the service API and enable it if it is disabled.
                service = ITOAInterfaceUtils.service_connection(
                    session_key, app_name="SA-ITOA"
                )
                itsi_event_grouping_search = service.saved_searches["itsi_event_grouping"]
                if itsi_event_grouping_search["disabled"] == "1":
                    rest.simpleRequest(
                        "/servicesNS/nobody/SA-ITOA/saved/searches/itsi_event_grouping?disabled=0",
                        sessionKey=session_key,
                        method="POST",
                        raiseAllErrors=True,
                    )
            else:
                # A local stanza exists: only force-enable when it does not set
                # 'disabled' itself (respect an explicit local override).
                if "disabled" not in saved_search_status.keys():
                    rest.simpleRequest(
                        "/servicesNS/nobody/SA-ITOA/saved/searches/itsi_event_grouping?disabled=0",
                        sessionKey=session_key,
                        method="POST",
                        raiseAllErrors=True,
                    )
    except Exception as err:
        logger.error(
            "Error occurred while enabling the itsi_event_grouping search: %s", err
        )
@staticmethod
def configure_version(session_key):
    '''
    Record the current ITSI version information in the KV store when Splunk
    starts.

    @param {string} session_key: Splunk session key
    @return: status - if version configuration is successful or fail
    '''
    new_version = ITOAInterfaceUtils.get_app_version(session_key, fetch_conf_only=True)
    old_version, id_ = ITOAInterfaceUtils.get_version_from_kv(session_key)
    # A missing pre-migration version is treated the same as the new version.
    return ITOAInterfaceUtils.update_version_to_kv(
        session_key, id_, new_version, old_version or new_version, False
    )
@staticmethod
def update_itsi_cp_saved_searches_collection(session_key, logger):
    """
    Run the custom command itsi_content_packs_status_update to refresh the
    KVStore collection `itsi_content_pack_saved_search_status` with the latest
    content pack saved-search status.

    @param {string} session_key: Splunk session key
    @param {logger} logger: logger
    @return search_job dispatched for the query '| itsicontentpackstatus'
    """
    return ITOAInterfaceUtils.run_search(
        session_key, logger, '| itsicontentpackstatus', raise_exception=False)
@staticmethod
def run_search(session_key, logger, search, raise_exception=True):
    """
    Dispatch a Splunk search job for the given query.

    @param {string} session_key: Splunk session key
    @param {logger} logger: logger
    @param {search} splunk search query
    @param {bool} raise_exception:
        True - raise an exception when dispatching fails
        False - only log the error when dispatching fails
    @return search_job for the given query
    """
    try:
        service = ITOAInterfaceUtils.service_connection(session_key, app_name="SA-ITOA")
        return service.jobs.create(search)
    except Exception as e:
        error_message = 'Error when running search "{search}". Error: {e}'.format(search=search, e=e)
        if raise_exception:
            raise Exception(error_message)
        logger.error(error_message)
@staticmethod
def check_for_in_operator_support(session_key):
    """
    Check whether the current storage service (KVStore/KVService) supports the
    $in operator: return the cached flag from the itsi_features collection, or
    probe and persist it on a cache miss.

    @param {string} session_key: Splunk session key
    @return Boolean
    """
    kvstore = ITOAStorage()
    if not kvstore.wait_for_storage_init(session_key):
        logger.error("KVStore not yet initialized")
        return False
    try:
        res, content = rest.simpleRequest(
            ITOAInterfaceUtils.KV_STORE_ITSI_FEATURE_COLLECTION_COLLECTION_URI,
            sessionKey=session_key,
            raiseAllErrors=False,
            method="GET",
        )
        if res.status != 200:
            logger.error("An error occurred while configuring is_in_op_supported flag, itsi_features collection not accessible")
            return False
        # Parse once (previously parsed twice); the redundant res.status == 200
        # re-check was dropped because it is guaranteed past the guard above.
        records = json.loads(content)
        if records:
            cached_flag = records[0].get("is_in_op_supported")
            if cached_flag is not None:
                return cached_flag
        # Flag not yet recorded: probe support and persist the result.
        return ITOAInterfaceUtils.configure_in_operator_support(session_key)
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return False
@staticmethod
def configure_in_operator_support(session_key):
    """
    Probe whether the storage backend accepts the $in query operator and
    persist the result as the is_in_op_supported flag in the itsi_features
    collection.

    @param {string} session_key: Splunk session key
    @return Boolean
    """
    kvstore = ITOAStorage()
    is_in_op_supported = False
    if kvstore.wait_for_storage_init(session_key):
        try:
            # Read the current itsi_features record so an existing flag can be
            # updated in place rather than duplicated.
            res, content = rest.simpleRequest(
                ITOAInterfaceUtils.KV_STORE_ITSI_FEATURE_COLLECTION_COLLECTION_URI,
                sessionKey=session_key,
                raiseAllErrors=False,
                method="GET",
            )
            if res.status != 200:
                logger.error("An error occurred while configuring is_in_op_supported flag, itsi_features collection not accessible")
                return False
            # Probe: issue a harmless $in query against the migration
            # collection; a 200 response means the backend accepts $in.
            rsp, con = rest.simpleRequest(
                ITOAInterfaceUtils.KV_STORE_NEW_MIGRATION_COLLECTION_URI,
                sessionKey=session_key,
                raiseAllErrors=False,
                getargs={
                    "output_mode": "json",
                    "query": json.dumps({"identifying_name": {"$in": [""]}}),
                },
            )
            if rsp.status == 200:
                is_in_op_supported = True
            url = ITOAInterfaceUtils.KV_STORE_ITSI_FEATURE_COLLECTION_COLLECTION_URI
            if not json.loads(content):
                # No record exists yet: POST to the collection to create one.
                content_data = {"is_in_op_supported" : is_in_op_supported}
            else:
                # Record exists: POST to its _key to update the flag in place.
                content_data = json.loads(content)[0]
                url = url + "/" + str(content_data["_key"])
                content_data["is_in_op_supported"] = is_in_op_supported
            rest.simpleRequest(
                url,
                sessionKey=session_key,
                jsonargs=json.dumps(content_data),
                method="POST",
            )
            return is_in_op_supported
        except Exception as e:
            logger.error(f"An error occurred while configuring is_in_op_supported flag: {e}")
            return False
    else:
        logger.error("KVStore not yet initialized")
        return False
class FileSystemUtils(object):
    """File-system helper utilities."""

    @staticmethod
    def write_binary_to_disk(path, content):
        """
        Write binary content to the given file path, replacing any existing
        file at that location.

        @type: string
        @param path: write path
        @type: bytes
        @param content: decoded bytes
        @returns: None
        @rtype: None
        """
        with open(path, 'wb') as out_file:
            out_file.write(content)
class ItsiSettingsImporter(ItoaBase):
    """
    Imports ITSI object settings from conf files across apps into the ITOA
    storage backend (KV store). Setting types are grouped so that dependencies
    (e.g. teams) are imported before the objects that reference them.
    """
    # Prefix used for log messages emitted by this importer.
    log_prefix = 'ItsiSettingsImporter'
    # All supported settings live in conf files named itsi_<setting>.conf.
    conf_prefix = 'itsi_'
    # App namespace whose /properties REST endpoints are scanned for conf data.
    app = 'SA-ITOA'
    # Ordered groups of supported setting types; groups are imported in list
    # order for dependency management (see import_setting()).
    supported_settings = [
        # Import these before others below
        [
            'team'
        ],
        # Having imported dependencies above, now import these
        [
            'service',
            'sandbox',
            'data_integration_template',
            'deep_dive',
            'drift_detection_template',
            'glass_table',
            'kpi_base_search',
            'kpi_template',
            'kpi_threshold_template',
            'base_service_template',
            'entity_type',
            'entity_management_policies',
            'entity_management_rules',
            'custom_threshold_windows',
            'upgrade_readiness_precheck',
            'duplicate_entities_job_queue',
            'duplicate_aliases_cache',
            'duplicate_entities_cache',
            'duplicate_alias_entity_relationship_cache',
        ]
    ]

    def __init__(self, session_key):
        """
        @type session_key: string
        @param session_key: splunkd session key
        """
        super(ItsiSettingsImporter, self).__init__(session_key)
def import_itsi_settings(self, owner):
    '''
    Imports ITSI settings from conf files across apps.
    Note that this imports KPI template settings for ITSI modules.

    @rtype: boolean
    @return: indicates if import succeeded (True) or had one or more failures (False)
    '''
    return self.import_setting(owner=owner, itsi_settings_urls=self.find_settings_urls())
@staticmethod
def get_supported_settings_conf_names():
    """
    Return the supported setting groups with the conf-file prefix applied to
    every setting name, preserving group order.
    """
    prefix = ItsiSettingsImporter.conf_prefix
    return [
        [prefix + setting for setting in group]
        for group in ItsiSettingsImporter.supported_settings
    ]
def find_settings_urls(self):
    '''
    Using splunkd rest, we'll grab the stanzas from the ITSI confs for all apps
    settings on the local host and spit them out

    @rtype: list of dict
    @return: a list of dict of urls corresponding to stanza names for ITSI settings found
        The dicts in the list MUST be imported in that order for dependency management
    '''
    settings_urls = []
    # First, check to see that the endpoints exist in the properties
    properties_location = '/servicesNS/nobody/' + quote_plus(self.app) + '/properties'
    rsp, content = rest.simpleRequest(
        properties_location,
        sessionKey=self.session_key,
        raiseAllErrors=False,
        getargs={'output_mode': 'json'}
    )
    if rsp.status != 200 and rsp.status != 201:
        logger.error('Error getting properties from location: %s.', properties_location)
        return settings_urls
    properties_dict = json.loads(content)
    # Names of all conf properties actually present on this host.
    settings_names_found = [prop['name'] for prop in properties_dict['entry']]
    # Filter out the ones not returned by end point, preserving group order.
    supported_settings_found_in_conf = []
    for settings_list in self.get_supported_settings_conf_names():
        supported_settings_found_in_conf.append([
            setting for setting in settings_list if setting in settings_names_found
        ])
    # Resolve each present setting conf to the list of stanza urls it contains.
    for settings_list in supported_settings_found_in_conf:
        settings_urls_dict = {}
        for setting_name in settings_list:
            path = properties_location + '/' + quote_plus(setting_name)
            rsp, content = rest.simpleRequest(
                path,
                sessionKey=self.session_key,
                raiseAllErrors=False,
                getargs={'output_mode': 'json'}
            )
            if rsp.status != 200 and rsp.status != 201:
                logger.error('Error getting data from REST endpoint %s.', path)
                continue
            try:
                config = json.loads(content)
                # Strip the conf prefix from the stanza
                prefix_stripped_setting = setting_name[len(self.conf_prefix):]
                settings_urls_dict[prefix_stripped_setting] = []
                for entry in config['entry']:
                    url = entry.get('id')
                    if url is not None:
                        settings_urls_dict[prefix_stripped_setting].append(url)
            except Exception:
                logger.exception('Error parsing JSON content.')
        settings_urls.append(settings_urls_dict)
    return settings_urls
def get_itsi_setting(self, setting_stanza_path):
    '''
    The itsi_stuff_dict commonly contains urls of records stored in conf files
    that need to be stored elsewhere, so we'll grab them from their locations online
    and make sure that these are in a format acceptable for input.

    @type setting_stanza_path: string
    @param setting_stanza_path: path (url) for the stanza to read settings from
    @rtype: dict
    @return: stanza content for the given path, or None on error
    '''
    rsp, content = rest.simpleRequest(
        setting_stanza_path,
        sessionKey=self.session_key,
        raiseAllErrors=False,
        getargs={'output_mode': 'json'}
    )
    if rsp.status != 200:
        logger.error('Record %s not found, ignoring.', setting_stanza_path)
        return None
    try:
        # The stanza name (last path segment) becomes the object's _key.
        _key = os.path.split(setting_stanza_path)[1]
        normalized_setting = {}
        config = json.loads(content)
        # each entry is a field for the setting object
        # entry 'name' being the field name
        # entry 'content' being the field content
        for entry in config['entry']:
            title = entry['name']
            content = entry['content']
            # Normalize conf values to Python
            # Talk to owner of SVG viz in Glass Table before changing the next line
            if title == 'svg_content' or title == 'svg_coordinates':
                # There are some interesting things with glass table here
                logger.debug('Special case string for SVG viz in glass table.')
            elif (content.startswith('[') and content.endswith(']')) or (
                    content.startswith('{') and content.endswith('}')):
                logger.debug('Entry %s key %s is JSON', _key, title)
                content = json.loads(content)
            # Map conf-style true/false/null literals (str or bytes) to the
            # corresponding Python values.
            if isinstance(content, itsi_py3.ext_string_type):
                lower = content.lower()
                if lower in ['true', b'true']:
                    content = True
                elif lower in ['false', b'false']:
                    content = False
                elif lower in ['null', b'null']:
                    content = None
            normalized_setting[title] = content
        normalized_setting['_key'] = _key
        return normalized_setting
    except Exception:
        logger.exception('Error parsing JSON content from %s, possibly malformed, ignoring.', setting_stanza_path)
        return None
def import_setting(self, owner, itsi_settings_urls):
    '''
    Imports the information into the statestore backend, or whatever backend you're using (skipping if
    conf files are being used, because duh, thats where we're getting the information from originally)
    It will retain all of the original information, including the default service ids, entity ids, kpi ids, etc)

    @type owner: string
    @param owner: owner to create/update the objects under
    @type itsi_settings_urls: list of dict
    @param itsi_settings_urls: list of dicts mapping setting to its url to be imported in the order in the list
        for dependency management
    @rtype: boolean
    @return: indicates if import succeeded (True) or had one or more failures (False)
    '''
    itoa = ITOAStorage()
    if itoa.backend == 'conf':
        return False
    # Check if kv store is ready to perform operation
    # Wait for max 5 minutes then gave up so we can take of it
    if not itoa.wait_for_storage_init(self.session_key):
        is_all_import_success = False
        raise Exception(
            'KV store is not initialized. We have tried for 5 minutes but the KV store still not available.')
    is_all_import_success = True
    # Other methods we'll go through the official apis to transfer things
    for itsi_settings_urls_dict in itsi_settings_urls:
        for setting_name in list(itsi_settings_urls_dict.keys()):
            logger.info('Importing settings of type %s', setting_name)
            for path in itsi_settings_urls_dict[setting_name]:
                normalized_setting = self.get_itsi_setting(setting_stanza_path=path)
                if normalized_setting is None:
                    logger.error('Unable to process setting at path %s, ignoring.', path)
                    is_all_import_success = False
                    continue
                # Skip objects explicitly block-listed from import.
                if normalized_setting.get('_key') in BLOCK_LIST:
                    continue
                try:
                    # Since importing of settings is only expected once on DA installation,
                    # only add settings that dont already exist
                    object_of_type = instantiate_object(
                        self.session_key,
                        'nobody',
                        setting_name,
                        logger=logger
                    )
                    normalized_setting['mod_source'] = ITSI_DEFAULT_IMPORT
                    if setting_name not in MUTABLE_OBJECT_LIST and normalized_setting.get('_immutable') is None:
                        normalized_setting['_immutable'] = 1
                    # add _is_from_conf flag - this flag differentiates the objects created from the config file
                    # versus user created objects
                    if normalized_setting.get('_is_from_conf') is None:
                        normalized_setting['_is_from_conf'] = 1
                    setting_key = normalized_setting.get('_key', '')
                    if setting_name == "duplicate_aliases_cache":
                        # splunk doesn't allow '=' in stanza name. but
                        # our key needs '=' in there. '=' by default
                        # gets converted to '%3D'. so, we are converting
                        # it back to '=' when creating the record in KVitsi_duplicate_aliases_cache.conf
                        setting_key = setting_key.replace('%3D', '=')
                        normalized_setting['_key'] = setting_key
                    if object_of_type.get(owner, setting_key) is None:
                        logger.info(
                            'Settings for %s with key %s does not exist, creating new one.',
                            setting_name,
                            setting_key
                        )
                        object_of_type.create(owner, normalized_setting)
                    else:
                        if setting_name in MUTABLE_OBJECT_LIST:
                            existing_object = object_of_type.get(owner, setting_key)
                            # reset the _immutable flag
                            existing_object['_immutable'] = 0
                            existing_object['_is_from_conf'] = 1
                            # checking mod source to determine if the default object has been modified or not
                            # if not modified, update with the latest setting.
                            if existing_object['mod_source'] == ITSI_DEFAULT_IMPORT:
                                logger.info(
                                    'Setting for %s with key %s already exists and has not been modified'
                                    + ' manually, updating it to match conf content.',
                                    setting_name,
                                    setting_key
                                )
                                object_of_type.update(owner, existing_object.get('_key'), normalized_setting)
                        else:
                            logger.info(
                                'Setting for %s with key %s already exists, updating it.',
                                setting_name,
                                normalized_setting['_key']
                            )
                            # Ensure that the conf file contents are updated into the exisitng record if any
                            object_of_type.update(owner, normalized_setting.get('_key'), normalized_setting, is_partial_data=True)
                except Exception:
                    logger.exception(
                        'Unable to import setting: %s of type %s, ignoring.',
                        normalized_setting.get('_key', 'Unknown'), setting_name
                    )
                    is_all_import_success = False
    return is_all_import_success
def import_team_setting(self, owner, from_conf=True):
    """
    Imports just the team setting (the 'default_itsi_security_group' object),
    either from the itsi_team conf file or from a hardcoded default definition.

    An existing team setting is never overwritten: the object is only created
    when no record with the same key exists yet.

    @type owner: string
    @param owner: owner of the object
    @type from_conf: boolean
    @param from_conf: if user wants to import the setting from conf file
    @rtype: boolean
    @return: indicates if import succeeded (True) or had one or more failures (False)
    """
    import_status = True
    setting_name = 'team'
    if from_conf:
        # Read the default team stanza from itsi_team.conf via the splunkd
        # properties REST endpoint on this instance.
        (host, port) = ITOAInterfaceUtils.get_splunk_host_port()
        team_setting_url = \
            'https://' + \
            host + \
            ':' + \
            str(port) + \
            '/servicesNS/nobody/SA-ITOA/properties/itsi_team/default_itsi_security_group'
        normalized_setting = self.get_itsi_setting(setting_stanza_path=team_setting_url)
    else:
        # Fallback: hardcoded definition of the default Global team, including
        # its default ACL.
        normalized_setting = {
            'description': 'By default, all ITSI objects are contained within the default Global team. \
If you don\'t need to restrict service visibility to specific teams in your organization, \
create all services in the Global team.',
            'title': 'global',
            '_immutable': 0,
            '_key': 'default_itsi_security_group',
            'acl': {
                'read': ['*', 'itoa_admin', 'itoa_team_admin', 'itoa_analyst', 'itoa_user'],
                'delete': ['itoa_admin'],
                'write': ['itoa_admin'],
                'owner': 'nobody'
            }
        }
    try:
        object_of_type = instantiate_object(
            self.session_key,
            'nobody',
            setting_name,
            logger=logger
        )
        # Settings imported without an explicit _immutable flag default to immutable.
        if normalized_setting.get('_immutable') is None:
            normalized_setting['_immutable'] = 1
        # Only create the team when it does not exist yet; never override an
        # existing record.
        if object_of_type.get(owner, normalized_setting.get('_key', '')) is None:
            object_of_type.create(owner, normalized_setting)
        else:
            logger.info('Team setting already exists. No need to override team setting.')
    except Exception as e:
        logger.exception('Unable to import team setting: {}'.format(str(e)))
        import_status = False
    return import_status
class ItsiMacroReader(object):
"""
Utility for looking up ITSI macros
"""
owner = 'nobody'
namespace = 'SA-ITOA'
path = 'configs/conf-macros'
session_key = None
def __init__(self, session_key, macro, host_base_uri=''):
"""
ItsiMacroReader constructor
@param session_key: Splunkd session key
@type: str
@param macro: Name of the macro to lookup
@type: str
"""
self.session_key = session_key
self.macro = getEntity(
self.path,
macro,
owner=self.owner,
namespace=self.namespace,
sessionKey=session_key,
hostPath=host_base_uri
)
self.index = self._parse_index()
@property
def definition(self):
"""
Property to access the macro's definition
@return: The macro definition
@type: str
"""
return self.macro.get('definition', None)
def _parse_index(self):
"""
Parse the name of the index from the macro definition
"""
definition = self.definition
index = None
if "index=" in definition:
parsed_definition = definition.split('=', 1)
if parsed_definition[0].strip() == 'index' and len(parsed_definition) == 2:
index_plus_more = parsed_definition[1].strip("\"'\n ").split(' ', 1)
index = index_plus_more[0].strip("\"'\n ")
# Scenario where the index is stored in a macro in the macro being 'read'
else:
parsed_definition = definition.split(' ', 1)
if "`" in parsed_definition[0].strip():
# Assumption that the index is going to be the first term
stripped_macro = parsed_definition[0].strip('`')
index = ItsiMacroReader(self.session_key, stripped_macro).index
if index is not None and len(index) > 0:
return index
else:
raise ValueError('Index value not extracted properly for this macro: %s.' % self.macro.name)
class SplunkMessageHandler(object):
    """
    Handler for posting messages into the Splunk UI.
    Used primarily for notifying the end user about important ITSI events.
    """
    MESSAGE_ENDPOINT = '/services/messages'
    INFO = 'info'
    WARNING = 'warn'
    ERROR = 'error'

    def __init__(self, session_key):
        self.session_key = session_key

    def post_or_update_message(self, id, severity, message, role=None):
        """
        Create a new message or update an existing message by ID

        @type: string
        @param id: index of the job
        @type: string
        @param severity: Severity of message ('info', 'warn', 'error')
        @type: string
        @param message: Message to write
        @type: string
        @param role: Comma-separated list of roles required to view message
        """
        allowed_sev = [self.ERROR, self.WARNING, self.INFO]
        assert severity in allowed_sev, 'Incorrect severity specified. Severity should be one of {}'.format(allowed_sev)
        payload = {
            'name': id,
            'value': message,
            'severity': severity,
        }
        # Restrict visibility to specific roles only when requested.
        if role:
            payload['role'] = role
        try:
            response, contents = rest.simpleRequest(
                path=self.MESSAGE_ENDPOINT,
                postargs=payload,
                sessionKey=self.session_key)
            # Both 200 (updated) and 201 (created) count as success.
            if response.status not in (http.client.OK, http.client.CREATED):
                raise Exception('Failed to post Splunk message id={}. Response={} Contents={}'
                                .format(id, response, contents))
        except Exception:
            logger.exception('Exception while posting splunk message.')
            raise

    def delete_message(self, id):
        """Delete the Splunk UI message with the given ID; raises on failure."""
        try:
            response, contents = rest.simpleRequest(
                path=self.MESSAGE_ENDPOINT + '/' + id,
                method='DELETE',
                sessionKey=self.session_key)
            if response.status != http.client.OK:
                raise Exception('Failed to delete Splunk message id={}. Response={} Contents={}'.
                                format(id, response, contents))
        except Exception:
            logger.exception('Exception while deleting splunk message.')
            raise
class ItsiAtUtils(object):
    @staticmethod
    def generate_cron_expression(job_ix, offset_minutes=OFFSET_MINUTES):
        """
        Build a daily cron expression for a job, staggering its start time so
        that jobs do not all fire at the same moment (e.g., 00:00 UTC), which
        previously caused race conditions (ITSI-35084). Job N is offset by
        N * offset_minutes from midnight.

        @type: int
        @param job_ix: index of the job
        @type: int
        @param offset_minutes: interval to offset the search by
        @rtype: string
        @returns: str of the cron expression to schedule the job
        """
        total_minutes = job_ix * offset_minutes
        hour = total_minutes // 60
        minute = total_minutes % 60
        # The combined offset must still land within a single day.
        if hour < 0 or hour >= 24:
            raise ValueError("Misconfigured offsets value")
        return f"{minute} {hour} * * *"
class ThresholdUtils(ABC, object):
    """
    Helpers for building KPI threshold-level structures and time-variant
    threshold policies from recommended threshold configurations.
    """

    def generate_threshold_levels(self, thresholds, threshold_direction, is_adaptive, mean=0, std=1):
        """
        Generate a threshold-levels structure from a severity -> value(s) mapping.

        @type thresholds: dict
        @param thresholds: mapping of severity label (e.g. 'critical') to a single
            threshold value or a list of threshold values
        @type threshold_direction: str
        @param threshold_direction: the direction of the thresholds (one of
            THRESHOLD_CONFIGURATION_OPTIONS)
        @type is_adaptive: bool
        @param is_adaptive: whether the thresholds are adaptive (values interpreted
            as std-deviation offsets from the mean) or static
        @type mean: float
        @param mean: mean used to convert adaptive values to absolute thresholds
        @type std: float
        @param std: standard deviation used to convert adaptive values
        @rtype: dict
        @return: the generated threshold levels structure
        """
        threshold_levels = deepcopy(DEFAULT_THRESHOLDS)
        severity_objects = []
        for key, value in thresholds.items():
            threshold_values = value if isinstance(value, list) else [value]
            # Look up the severity metadata (value/colors) for this label;
            # labels not present in DEFAULT_SEVERITY_LEVELS are skipped.
            level = next((lvl for lvl in DEFAULT_SEVERITY_LEVELS if lvl['severityLabel'] == key), None)
            if not level:
                continue
            for threshold_value in threshold_values:
                severity_object = {
                    'severityLabel': level['severityLabel'],
                    'severityValue': level['severityValue'],
                    'severityColor': level['severityColor'],
                    'severityColorLight': level['severityColorLight'],
                }
                if is_adaptive:
                    # Adaptive: raw value is an offset in std deviations from the mean.
                    severity_object['thresholdValue'] = threshold_value * std + mean
                    severity_object['dynamicParam'] = threshold_value
                else:
                    severity_object['thresholdValue'] = threshold_value
                    severity_object['dynamicParam'] = 0
                severity_objects.append(severity_object)
        if is_adaptive:
            severity_objects.sort(key=lambda x: x['dynamicParam'])
            boundary_min = threshold_levels['renderBoundaryMin']
            if severity_objects:
                # Pad the gauge 10% below the lowest threshold value.
                lowest = severity_objects[0]['thresholdValue']
                threshold_levels['gaugeMin'] = lowest - abs(lowest * 0.1)
                if threshold_levels['gaugeMin'] < boundary_min:
                    threshold_levels['renderBoundaryMin'] = threshold_levels['gaugeMin']
            boundary_max = threshold_levels['renderBoundaryMax']
            for severity_object in severity_objects:
                threshold_value = severity_object['thresholdValue']
                threshold_levels['thresholdLevels'].append(dict(severity_object))
                # gaugeMax ends up padded 10% above the last (highest-dynamicParam)
                # threshold; renderBoundaryMax tracks the overall maximum.
                threshold_levels['gaugeMax'] = threshold_value + abs(threshold_value * 0.1)
                if threshold_levels['gaugeMax'] > boundary_max:
                    boundary_max = threshold_levels['gaugeMax']
            threshold_levels['renderBoundaryMax'] = boundary_max
        else:
            severity_objects.sort(key=lambda x: x['thresholdValue'])
            threshold_levels['thresholdLevels'] = severity_objects
            # Guard against empty/unmatched thresholds: indexing [-1]
            # unconditionally raised IndexError here before.
            if severity_objects:
                highest_threshold_value = severity_objects[-1]['thresholdValue']
                updated_gauge_max = highest_threshold_value + 0.1 * highest_threshold_value
                threshold_levels['gaugeMax'] = updated_gauge_max
                threshold_levels['renderBoundaryMax'] = updated_gauge_max
        # Override the base severity when the direction is not the 'upper' option.
        if threshold_direction != THRESHOLD_CONFIGURATION_OPTIONS[2]:
            default_level = DEFAULT_SEVERITY_LEVELS[5]
            threshold_levels['baseSeverityLabel'] = default_level['severityLabel']
            threshold_levels['baseSeverityValue'] = default_level['severityValue']
            threshold_levels['baseSeverityColor'] = default_level['severityColor']
            threshold_levels['baseSeverityColorLight'] = default_level['severityColorLight']
        return threshold_levels

    def compute_threshold_value(self, range_min, range_max):
        """
        Compute the midpoint threshold value of a range, rounded to a precision
        appropriate for the range width (wider ranges get fewer decimals).

        @type range_min: float
        @param range_min: The minimum value of the range.
        @type range_max: float
        @param range_max: The maximum value of the range.
        @rtype: float
        @return: The computed threshold value.
        """
        range_val = range_max - range_min
        threshold_value = range_min + range_val / 2
        if isinstance(threshold_value, (int, float)):
            if range_val > 10:
                threshold_value = round(threshold_value, 0)
            elif range_val > 5:
                threshold_value = round(threshold_value, 1)
            elif range_val > 1:
                threshold_value = round(threshold_value, 2)
            else:
                threshold_value = round(threshold_value, 3)
        return threshold_value

    def generate_time_variant_policies(self, config, aggregate=False):
        """
        Generates time variant policies based on the given recommended configuration.

        @type config: dict
        @param config: recommended data for generating policies; expects the keys
            'Cron Expression', 'Duration', 'Mean', 'Std', 'Use Static',
            'Thresholds', 'Threshold Direction' and 'Algorithm'
        @type aggregate: bool
        @param aggregate: when True, attach the thresholds as aggregate (KPI-level)
            thresholds; otherwise as entity-level thresholds
        @rtype: dict
        @return: dictionary containing the default policy plus the generated policy
        """
        # Generate a unique key for the new policy
        key = ItsiTimeBlockUtils.generate_policy_uuid()
        # Extract configuration data
        cron = config["Cron Expression"]
        duration = int(config["Duration"])
        time_blocks = [[cron, duration]]
        mean = float(config["Mean"])
        std = float(config["Std"])
        use_static = config["Use Static"]
        # Calculate local time zone offset in minutes
        local_tz_offset = utils.get_local_tz_offset_to_utc_sec()
        local_tz_offset_in_min = int(local_tz_offset / 60)
        # Generate user specific time blocks and policy title
        get_user_time_blocks = ItsiTimeBlockUtils.get_new_time_blocks([cron, duration], local_tz_offset_in_min)
        get_policy_title = ItsiTimeBlockUtils.generate_policy_title(get_user_time_blocks, local_tz_offset_in_min)
        # Parse the threshold response. NOTE(review): the quote substitution
        # assumes neither keys nor values contain apostrophes; a value with "'"
        # would break json.loads here.
        threshold_response_str = config["Thresholds"]
        threshold_response_json_str = threshold_response_str.replace("'", '"')
        threshold_response_obj = json.loads(threshold_response_json_str)
        # Generate threshold levels for the policy (always adaptive here)
        threshold_levels = self.generate_threshold_levels(
            threshold_response_obj, config["Threshold Direction"], True, mean, std)
        policy_type = "static" if normalizeBoolean(use_static) else config["Algorithm"]
        # Create the new policy
        if aggregate:
            recommended_policies = {DEFAULT_POLICY_KEY: KT_DEFAULT_POLICY}
            new_policy = {
                "title": get_policy_title,
                "aggregate_thresholds": threshold_levels,
                # Deep-copy so later mutation of this policy cannot corrupt the
                # shared module-level default structure.
                "entity_thresholds": deepcopy(DEFAULT_THRESHOLDS),
                "policy_type": policy_type,
                "time_blocks": time_blocks
            }
        else:
            recommended_policies = {DEFAULT_POLICY_KEY: DEFAULT_POLICY}
            new_policy = {
                "title": get_policy_title,
                "entity_thresholds": threshold_levels,
                "policy_type": policy_type,
                "time_blocks": time_blocks
            }
        # Add the new policy to the recommended policies dictionary
        recommended_policies[key] = new_policy
        return recommended_policies
class ThresholdRecommendationUtils(object):
    @staticmethod
    def prepare_threshold_recommendation_summary(recommended_data):
        """
        Build a summary dict from the first recommendation row.
        Returns None when no recommendation rows are available.
        """
        if len(recommended_data) == 0:
            return None
        first_row = recommended_data[0]
        return {
            'confidenceLevel': first_row.get('Confidence'),
            'confidenceScore': first_row.get('Score'),
            'timePolicyDesc': first_row.get('Time Policy Description'),
            'timePolicyType': first_row.get('Algorithm'),
            # Record when this summary was produced.
            'lastRun': time.time(),
        }

    @staticmethod
    def get_entity_default_time_variant_policies():
        """Return the default entity-level time-variant policy mapping."""
        return {DEFAULT_POLICY_KEY: DEFAULT_POLICY}