You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

314 lines
11 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
from ITOA.itoa_common import validate_json
class EntityDataDrilldownValidationException(Exception):
    """
    Raised when an entity data drilldown, or one of its filters, fails validation
    """
class EntityDataDrilldown(object):
    """
    EntityDataDrilldown - an entity data drilldown is used to associate an entity with raw data
    """

    # Raw data types a drilldown is allowed to point at
    valid_types = ['metrics', 'events']

    def __init__(self, title, type, static_filter, entity_field_filter):
        """
        Create an EntityDataDrilldown, deserializing both filters into filter objects

        @type title: str
        @param title: name of the drilldown

        @type type: str
        @param type: raw data type of the drilldown (see valid_types)

        @type static_filter: dict
        @param static_filter: json filter that narrows the data using static information

        @type entity_field_filter: dict
        @param entity_field_filter: json filter to be resolved against an entity's fields
        """
        self.title = title
        self.type = type
        self.static_filter = DataDrilldownFilter.from_json(static_filter)
        self.entity_field_filter = DataDrilldownFilter.from_json(entity_field_filter)

    @classmethod
    def validate(cls, log_prefix, json_data):
        """
        Check that the input json describes a valid entity data drilldown

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type json_data: dict
        @param json_data: data drilldown in json format

        @return: None
        """
        validate_json(log_prefix, json_data)

        drilldown_type = json_data.get('type')
        cls._validate_type(log_prefix, drilldown_type)

        static_filter = json_data.get('static_filter')
        entity_field_filter = json_data.get('entity_field_filter')
        cls._validate_data_filter(log_prefix, static_filter, entity_field_filter)

    @classmethod
    def _validate_type(cls, log_prefix, drilldown_type):
        """
        Check that the drilldown type is one of valid_types

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type drilldown_type: str
        @param drilldown_type: drilldown type

        @return: None
        """
        if drilldown_type in cls.valid_types:
            return

        allowed = ', '.join(cls.valid_types)
        message = 'Invalid data type for entity data drilldown - "%s". Must be one of %s' % (
            drilldown_type,
            allowed
        )
        raise EntityDataDrilldownValidationException(log_prefix + message)

    @classmethod
    def _validate_data_filter(cls, log_prefix, static_filter, entity_field_filter):
        """
        Check that both filters are present, then validate each of them

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type static_filter: dict
        @param static_filter: static filter in json format

        @type entity_field_filter: dict
        @param entity_field_filter: entity field filter in json format

        @return: None
        """
        if not (static_filter and entity_field_filter):
            raise EntityDataDrilldownValidationException(
                log_prefix + 'static_filter and entity_field_filter cannot be empty'
            )

        # Static filter first, then the entity field filter
        for data_filter in (static_filter, entity_field_filter):
            DataDrilldownFilter.validate_data_filter_json(log_prefix, data_filter)

    def build_data_drilldown_filter(self, entity):
        """
        Build the data filter for the given entity: the static filter AND-ed with the
        entity field filter, serialized with the entity as reference.

        @type entity: dict
        @param entity: entity information

        @rtype: dict
        @return: the filter json together with the drilldown title and type
        """
        combined = BooleanFilter(
            type='and',
            filters=[self.static_filter, self.entity_field_filter]
        )
        return {
            'data_drilldown_title': self.title,
            'type': self.type,
            'filter': combined.to_json(reference_entity=entity)
        }
class DataDrilldownFilter(object):
    """
    DataDrilldownFilter - represents a data drilldown filter which can be used to filter raw data.

    Base class of BasicFilter, BooleanFilter and EntityFieldFilter. It also hosts the helpers
    that dispatch validation and deserialization on a filter's "type" field.
    """

    def __init__(self, type):
        """
        @type type: str
        @param type: filter type, stored as-is on the instance
        """
        self.type = type

    @staticmethod
    def validate_data_filter_json(log_prefix, json_data):
        """
        Helper method to perform recursive validation on filters to check the necessary fields and structure

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type json_data: dict
        @param json_data: filter data in json format

        @raise EntityDataDrilldownValidationException: if the filter is not a dict or its type
            is not handled by any of the filter classes

        @return: None
        """
        # A non-dict filter (e.g. a string nested in a boolean filter's "filters" list) has no
        # .get(); report it as a validation failure instead of leaking an AttributeError.
        if not isinstance(json_data, dict):
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Invalid filter found, must be a dict. Found: %s' % type(json_data).__name__
            )

        f_type = json_data.get('type')
        if f_type in BasicFilter.ALLOWED_TYPES:
            BasicFilter.validate(log_prefix, json_data)
        elif f_type in BooleanFilter.ALLOWED_TYPES:
            BooleanFilter.validate(log_prefix, json_data)
        elif f_type in EntityFieldFilter.ALLOWED_TYPES:
            EntityFieldFilter.validate(log_prefix, json_data)
        else:
            raise EntityDataDrilldownValidationException(log_prefix + 'Invalid filter type found "%s"' % f_type)

    @staticmethod
    def from_json(json_data):
        """
        Deserialize input json data into an DataDrilldownFilter object

        @type json_data: dict or str
        @param json_data: filter as a dict, or as a JSON string which is parsed first

        @rtype: DataDrilldownFilter
        @return: A DataDrilldownFilter instance, or None when the filter type is not recognized
        """
        params = json.loads(json_data) if isinstance(json_data, str) else json_data
        filter_type = params.get('type')

        if filter_type in BasicFilter.ALLOWED_TYPES:
            return BasicFilter(**params)
        if filter_type in EntityFieldFilter.ALLOWED_TYPES:
            return EntityFieldFilter(**params)
        if filter_type in BooleanFilter.ALLOWED_TYPES:
            # Read the children from the parsed payload: json_data may still be the raw
            # JSON string here, which has no .get().
            return BooleanFilter(
                type=filter_type,
                filters=[DataDrilldownFilter.from_json(child) for child in params.get('filters', [])]
            )

        # Unrecognized type: no filter object can be built
        return None

    def to_json(self, **kwargs):
        """
        Serialize a DataDrilldownFilter object into its JSON format

        @type kwargs: dict
        @param kwargs: Any additional information required for serialization in key value format

        @raise NotImplementedError: always; subclasses provide the implementation

        @return: json
        """
        raise NotImplementedError()
class BasicFilter(DataDrilldownFilter):
    """
    BasicFilter - equivalent to a field=value filter OR a field!=value filter based on its type
    """

    ALLOWED_TYPES = ['include', 'exclude']
    REQUIRED_FIELDS = ['type', 'values', 'field']

    def __init__(self, type, field, values):
        """
        @type type: str
        @param type: filter type (see ALLOWED_TYPES)

        @type field: str
        @param field: name of the field the filter applies to

        @type values: list
        @param values: values for the field
        """
        super(BasicFilter, self).__init__(type)
        self.type = type
        self.field = field
        self.values = values

    @classmethod
    def validate(cls, log_prefix, json_data):
        """
        Check that the json has all required fields, an allowed type and a list of values

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type json_data: dict
        @param json_data: basic filter in json format

        @return: None
        """
        absent = [name for name in cls.REQUIRED_FIELDS if name not in json_data]
        if absent:
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Missing field in basic filter, must contain %s' % cls.REQUIRED_FIELDS
            )

        filter_type = json_data['type']
        if filter_type not in cls.ALLOWED_TYPES:
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Invalid type found for basic filter - "%s"' % filter_type
            )

        values = json_data['values']
        if isinstance(values, list):
            return
        raise EntityDataDrilldownValidationException(
            log_prefix + 'Invalid type for values field of basic filter, must be list. Found: %s' %
            type(values).__name__
        )

    def to_json(self, **kwargs):
        """
        Serialize the filter; the field name is wrapped in a {'name': ...} dict

        @type kwargs: dict
        @param kwargs: unused

        @rtype: dict
        @return: the BasicFilter instance in json format
        """
        field_json = {'name': self.field}
        return {
            'type': self.type,
            'field': field_json,
            'values': self.values
        }
class BooleanFilter(DataDrilldownFilter):
    """
    BooleanFilter - glues BasicFilter and EntityFieldFilter together to express complex boolean logic
    """

    ALLOWED_TYPES = ['or', 'and']
    REQUIRED_FIELDS = ['type', 'filters']

    def __init__(self, type, filters):
        """
        @type type: str
        @param type: filter type (see ALLOWED_TYPES)

        @type filters: list
        @param filters: child filter objects combined by this filter
        """
        super(BooleanFilter, self).__init__(type)
        self.type = type
        self.filters = filters

    @classmethod
    def validate(cls, log_prefix, json_data):
        """
        Check that the json has the required fields, a list of filters and an allowed type,
        then validate every child filter.

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type json_data: dict
        @param json_data: boolean filter in json format

        @raise EntityDataDrilldownValidationException: if the filter, or any child, is invalid

        @return: None
        """
        # Check "filters" first so its message is unchanged, then "type": a missing type must
        # surface as a validation error rather than a KeyError on the lookup below.
        for required in ('filters', 'type'):
            if required not in json_data:
                raise EntityDataDrilldownValidationException(
                    log_prefix + 'Missing field in boolean filter, must contain "%s"' % required
                )
        if not isinstance(json_data['filters'], list):
            raise EntityDataDrilldownValidationException(log_prefix + 'filters of a boolean filter must be a list')
        if json_data['type'] not in cls.ALLOWED_TYPES:
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Invalid type found for boolean filter - "%s"' % json_data['type']
            )
        for filter_json in json_data['filters']:
            DataDrilldownFilter.validate_data_filter_json(log_prefix, filter_json)

    def to_json(self, **kwargs):
        """
        Serialize the filter and all of its children

        @type kwargs: dict
        @param kwargs: extra information, forwarded unchanged to every child's to_json

        @rtype: dict
        @return: the BooleanFilter instance in json format
        """
        return {
            'type': self.type,
            'filters': [f.to_json(**kwargs) for f in self.filters]
        }
class EntityFieldFilter(DataDrilldownFilter):
    """
    EntityFieldFilter - A filter "template" that is resolved into a concrete filter using an entity's fields.
    """

    ALLOWED_TYPES = ['entity']
    REQUIRED_FIELDS = ['type', 'data_field', 'entity_field']

    def __init__(self, type, data_field, entity_field):
        """
        @type type: str
        @param type: filter type (see ALLOWED_TYPES)

        @type data_field: str
        @param data_field: field name used by the resulting basic filter

        @type entity_field: str
        @param entity_field: key looked up on the entity to obtain the filter values
        """
        super(EntityFieldFilter, self).__init__(type)
        self.type = type
        self.data_field = data_field
        self.entity_field = entity_field

    @classmethod
    def validate(cls, log_prefix, json_data):
        """
        Check that the json has all required fields and an allowed type

        @type log_prefix: str
        @param log_prefix: prefix for log message

        @type json_data: dict
        @param json_data: entity field filter in json format

        @return: None
        """
        absent = [name for name in cls.REQUIRED_FIELDS if name not in json_data]
        if absent:
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Missing field in entity field filter, must contain %s' % cls.REQUIRED_FIELDS
            )

        filter_type = json_data['type']
        if filter_type not in cls.ALLOWED_TYPES:
            raise EntityDataDrilldownValidationException(
                log_prefix + 'Invalid type found for entity field filter - "%s"' % filter_type
            )

    def convert_to_basic_filter(self, entity):
        """
        Resolve this template against an entity and return the resulting 'include' BasicFilter

        @type entity: dict
        @param entity: entity information

        @rtype: BasicFilter
        @return: A BasicFilter instance
        """
        looked_up = entity.get(self.entity_field)
        # A non-list value (including None when the key is absent) is wrapped in a list
        values = looked_up if isinstance(looked_up, list) else [looked_up]
        return BasicFilter(type='include', field=self.data_field, values=values)

    def to_json(self, **kwargs):
        """
        Serialize the filter. When a dict is passed as reference_entity, the json of the
        equivalent BasicFilter is returned instead of the template itself.

        @type kwargs: dict
        @param kwargs: Any extra information

        @keyword reference_entity: The entity to use to convert to a basic filter

        @rtype: dict
        @return: the EntityFieldFilter instance in json format
        """
        entity = kwargs.get('reference_entity')
        if not isinstance(entity, dict):
            return {
                'type': self.type,
                'data_field': self.data_field,
                'entity_field': self.entity_field
            }
        return self.convert_to_basic_filter(entity).to_json()