# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. from ITOA.itoa_common import validate_json import re import json class EntityVitalMetricValidationException(Exception): pass class EntityVitalMetric(object): """ EntityVitalMetric - an entity vital metric for looking up metrics associated with the entity and of its entity_type """ def __init__(self): pass @classmethod def validate(cls, log_prefix, json_data): """ Validate if input json data represents a valid vital metric @type log_prefix: str @param log_prefix: prefix for log message @type json_data: dict @param json_data: json data representing an entity vital metric @return: None """ validate_json(log_prefix, json_data) cls._validate_params(log_prefix, json_data) @classmethod def _validate_params(cls, log_prefix, json_data): """ Validate both split_by_fields and matching_entity_fields @type log_prefix: str @param log_prefix: prefix for log messages @type split_by_fields: list @param split_by_fields: @type matching_entity_fields: list @param matching_entity_fields: @return: None """ req_fields = ['metric_name', 'search', 'split_by_fields', 'matching_entity_fields'] req_alert_fields = ['suppress_time', 'cron_schedule', "is_enabled", "critical_threshold", "info_threshold"] req_entity_filter_fields = ['field', 'value', 'field_type'] if not all([f in json_data for f in req_fields]): raise EntityVitalMetricValidationException( log_prefix + 'vital_metrics must contain the following fields: %s' % req_fields) split_by_fields = json_data.get('split_by_fields') matching_entity_fields = json_data.get('matching_entity_fields') is_key = json_data.get('is_key', False) alert_rule = json_data.get('alert_rule', None) if json_data.get('metric_name') in ['', None]: raise EntityVitalMetricValidationException( log_prefix + 'metric name cannot be empty') if not isinstance(split_by_fields, list) or not isinstance(matching_entity_fields, list): raise EntityVitalMetricValidationException( log_prefix + 'split_by_fields and matching_entity_fields must be string arrays') if not split_by_fields or not matching_entity_fields: if split_by_fields + matching_entity_fields: empty_field = 'split_by_fields' if matching_entity_fields else 'matching_entity_fields' raise EntityVitalMetricValidationException( log_prefix + '{} cannot be an empty array'.format(empty_field)) elif is_key: raise EntityVitalMetricValidationException( log_prefix + 'key metric must have entity matching fields') if any(field in split_by_fields + matching_entity_fields for field in ['', None]): raise EntityVitalMetricValidationException( log_prefix + 'split_by_fields and matching_entity_fields cannot be empty strings or None') if json_data.get('search') in ['', None]: raise EntityVitalMetricValidationException( log_prefix + 'search field cannot be empty string or None') if len(split_by_fields) != len(matching_entity_fields): raise EntityVitalMetricValidationException( log_prefix + 'split_by_fields and matching_entity_fields must have same number of fields') if alert_rule: if not all([f in alert_rule for f in req_alert_fields]): raise EntityVitalMetricValidationException(log_prefix + ('vital_metrics alert config must contain \ the following fields: %s') % req_alert_fields) critical_threshold = alert_rule.get('critical_threshold', None) info_threshold = alert_rule.get('info_threshold', None) entity_filter = alert_rule.get('entity_filter', []) if critical_threshold is None or info_threshold is None \ or len(critical_threshold) != 2 or len(info_threshold) != 2: raise EntityVitalMetricValidationException( log_prefix + 'vital_metrics alert threshold cannot be empty') suppress_time = alert_rule.get('suppress_time', None) cron_schedule = alert_rule.get('cron_schedule', None) try: int(suppress_time) except ValueError: raise EntityVitalMetricValidationException( log_prefix + 'vital_metrics alert suppress_time must be integer') try: int(re.split('/| ', cron_schedule)[1]) except ValueError: raise EntityVitalMetricValidationException( log_prefix + 'vital_metrics alert cron_schedule must be in the format of */1 * * * *') if not isinstance(entity_filter, list): raise EntityVitalMetricValidationException(log_prefix + 'entity_filter must be arrays') for filter_dict in entity_filter: if not all([f in filter_dict for f in req_entity_filter_fields]): raise EntityVitalMetricValidationException(log_prefix + ('vital_metrics alert entity filter config \ must contain the following fields: %s') % req_entity_filter_fields) if any(val in filter_dict.values() for val in ['', None]): raise EntityVitalMetricValidationException( log_prefix + 'entity_filter field values cannot be empty strings or None') if filter_dict.get('field_type') not in ['alias', 'info']: raise EntityVitalMetricValidationException(log_prefix + 'field_type must be either alias or info') @staticmethod def convert_entity_filter_to_spl(json_data): """ convert entity filter json to spl Example: json_data = [ {"field":"os","value":"Ubuntu","field_type":"info"}, {"field":"os","value":"centos","field_type":"info"}, {"field":"host","value":"akron","field_type":"alias"}] convert to: (info_lookup="os=Ubuntu" OR info_lookup="os=centos" ) AND (alias_lookup="host=akron" ) """ spls = [] filter_dict = {} if json_data and isinstance(json_data, list): for filter_info in json_data: field_type = filter_info.get("field_type", "") field = filter_info.get("field", "") val = filter_info.get("value", "") if field not in filter_dict: filter_dict[field] = [] filter_dict[field].append('%s_lookup="%s=%s"' % (field_type, field, val)) for filter in filter_dict: spls.append("(%s )" % " OR ".join(filter_dict[filter])) return json.dumps(" AND ".join(spls)) @staticmethod def convert_threshold_to_spl(threshold): """ convert threshold to spl Example: "critical_threshold":["10","+inf"] convert to: "val>10" "info_threshold":["-inf","10"] convert to "val <= 10" """ spl = '' if threshold and len(threshold) == 2: threshold.sort() if threshold[0] == '-inf': spl = 'val <= {}'.format(threshold[1]) elif threshold[0] == '+inf': spl = 'val > {}'.format(threshold[1]) elif float(threshold[0]) > float(threshold[1]): spl = 'val <= {} and val > {}'.format( threshold[1], threshold[0]) else: spl = 'val <= {} and val > {}'.format( threshold[1], threshold[0]) return spl