You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

404 lines
18 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
from ITOA.itoa_common import validate_json
from ITOA.constants import OOTB_ENTITY_TYPE_DASHBOARD_IDS
import ITOA.itoa_common as utils
import http.client
import re
from ITOA.itoa_exceptions import ItoaValidationError
class EntityDashboardDrilldownValidationException(ItoaValidationError):
    """Validation error raised for an invalid entity dashboard drilldown definition."""
class EntityDashboardDrilldown(object):
    """
    EntityDashboardDrilldown - an entity dashboard drilldown for looking up dashboards associated
    with the entity and of its entity_type
    """

    # captures all tokens of format ${token} . Example: https://google.com/${org_id}/${realm}
    # (raw string so that the regex escapes are not treated as string escapes)
    url_template_regex = re.compile(r"(\${([\-\w]+)})")

    UDF_DASHBOARD = 'udf_dashboard'
    XML_DASHBOARD = 'xml_dashboard'
    NAVIGATION_LINK = 'navigation_link'
    ALLOWED_DASHBOARD_TYPES = [UDF_DASHBOARD, XML_DASHBOARD, NAVIGATION_LINK]

    # prefix that may be put in front of an alias name (see _strip_ds_sources_prefix)
    DS_SOURCES_PREFIX = 'ds_sources-'

    # cached result of the feature flag lookup; None until the first lookup
    is_normalization_ff_on = None

    def __init__(self, session_key, title, base_url, params, id, dashboard_type):
        """
        Initialize an EntityDashboardDrilldown object

        @type session_key: str
        @param session_key: session key used for the feature flag lookup
        @type title: str
        @param title: name of the dashboard drilldown
        @type base_url: str
        @param base_url: base URL the links to the dashboard
        @type params: dict
        @param params: contains two fields: alias_param_map and static_params
            - alias_param_map: mapping of a URL param and it's alias
            - static_params: params with a defined value.
        @param id: identifier of the drilldown; compared against OOTB_ENTITY_TYPE_DASHBOARD_IDS
        @type dashboard_type: str
        @param dashboard_type: type of dashboard: udf_dashboard, xml_dashboard, navigation_link
        """
        self.title = title
        self.base_url = base_url
        self.params = params
        self.dashboard_type = dashboard_type
        self.id = id
        self.session_key = session_key
        EntityDashboardDrilldown.normalization_feature_flagged(session_key)

    @classmethod
    def normalization_feature_flagged(cls, session_key):
        """
        Initialize is_normalization_ff_on once at class level and return it.

        The flag 'itsi-duplicate-entity-normalization' is looked up through
        utils.is_feature_enabled only while the cached value is still None.

        @param session_key: session key handed to utils.is_feature_enabled
        @return: the cached value of the feature flag
        """
        if cls.is_normalization_ff_on is None:
            cls.is_normalization_ff_on = utils.is_feature_enabled(
                'itsi-duplicate-entity-normalization', session_key, True)
        return cls.is_normalization_ff_on

    @classmethod
    def validate(cls, log_prefix, json_data, logger=None, is_normalization_ff_on=False):
        """
        Validate if input json data represents a valid entity dashboard drilldown

        @type log_prefix: str
        @param log_prefix: prefix for log message
        @type json_data: dict
        @param json_data: json data representing an entity dashboard drilldown
        @type logger: object
        @param logger: for logging
        @type is_normalization_ff_on: boolean
        @param is_normalization_ff_on: indicating whether the feature flag is on/off
        @return: None
        """
        validate_json(log_prefix, json_data)
        cls._validate_params(log_prefix, json_data, logger, is_normalization_ff_on)

    @staticmethod
    def _validation_error(log_prefix, logger, message, uid, context=None):
        """
        Build the BAD_REQUEST validation exception used by _validate_params.

        @param log_prefix: prefix for log message; also prepended to the message
        @param logger: for logging
        @param message: error text without the log prefix
        @param uid: unique id of the error
        @param context: optional dict of values used in the message; the
            keyword is only passed on when a context is given
        @return: EntityDashboardDrilldownValidationException ready to be raised
        """
        kwargs = {'status_code': http.client.BAD_REQUEST, 'uid': uid}
        if context is not None:
            kwargs['context'] = context
        return EntityDashboardDrilldownValidationException(
            log_prefix + message, logger, log_prefix, **kwargs)

    @classmethod
    def _validate_params(cls, log_prefix, json_data, logger=None, is_normalization_ff_on=False):
        """
        Validate the fields of an entity dashboard drilldown.

        When the feature flag is off, every entry of alias_param_map is
        normalized in place: 'ds_source' is dropped and the 'ds_sources-'
        prefix is stripped from 'alias'.

        @type log_prefix: str
        @param log_prefix: prefix for log message
        @type json_data: dict
        @param json_data: json data representing an entity dashboard drilldown
        @type logger: object
        @param logger: for logging
        @type is_normalization_ff_on: boolean
        @param is_normalization_ff_on: indicating whether the feature flag is on/off
        @return: None
        @raise EntityDashboardDrilldownValidationException: on the first invalid field
        """
        chk_fields = ['title', 'base_url', 'dashboard_type']
        req_fields = ['static_params', 'alias_param_map']
        if is_normalization_ff_on:
            alias_map_fields = ['alias', 'param', 'ds_source']
        else:
            alias_map_fields = ['alias', 'param']

        for entry in chk_fields:
            if json_data.get(entry) is None:
                raise cls._validation_error(
                    log_prefix, logger, f'{entry} must be specified.',
                    'SI-ENT-TP_0009', {'entry': entry})

        dboard_type = json_data.get('dashboard_type')
        if dboard_type not in EntityDashboardDrilldown.ALLOWED_DASHBOARD_TYPES:
            allowed_dashboard_types = ', '.join(EntityDashboardDrilldown.ALLOWED_DASHBOARD_TYPES)
            raise cls._validation_error(
                log_prefix, logger,
                f"dashboard type must be one of [{allowed_dashboard_types}]",
                'SI-ENT-TP_0010', {'allowed_dashboard_types': allowed_dashboard_types})

        if dboard_type == EntityDashboardDrilldown.NAVIGATION_LINK:
            # navigation_link cannot have empty base_url
            if not json_data.get('base_url'):
                raise cls._validation_error(
                    log_prefix, logger,
                    f'base_url for dashboard type {dboard_type} cannot be empty',
                    'SI-ENT-TP_0012', {'dboard_type': dboard_type})
        elif json_data.get('base_url'):
            # Only navigation_link can modify base_url
            raise cls._validation_error(
                log_prefix, logger,
                f'base_url for dashboard type {dboard_type} cannot be modified',
                'SI-ENT-TP_0011', {'dboard_type': dboard_type})

        params = json_data.get('params', {})
        if not params:
            raise cls._validation_error(
                log_prefix, logger, 'params cannot be empty', 'SI-ENT-TP_0013')

        # a params value that is not a dict cannot hold the required fields;
        # report it as a validation error instead of failing on .keys()
        all_keys = set(params.keys()) if isinstance(params, dict) else set()
        all_req_fields = set(req_fields)
        if all_req_fields.difference(all_keys):
            req_fields_str = ','.join(req_fields)
            raise cls._validation_error(
                log_prefix, logger,
                f'params must contain the following fields: {req_fields_str}',
                'SI-ENT-TP_0014', {'req_fields': req_fields_str})

        if not isinstance(params.get('static_params'), dict):
            raise cls._validation_error(
                log_prefix, logger, 'static_params must be an instance of dict',
                'SI-ENT-TP_0015')

        if not isinstance(params.get('alias_param_map'), list):
            raise cls._validation_error(
                log_prefix, logger, 'alias_param_map must be an instance of list',
                'SI-ENT-TP_0016')

        unknown_keys = all_keys.difference(all_req_fields)
        if unknown_keys:
            # sorted so that the message does not depend on set ordering
            diff_str = ','.join(sorted(unknown_keys))
            raise cls._validation_error(
                log_prefix, logger,
                f'found additional fields not recognized by the system: {diff_str}',
                'SI-ENT-TP_0017', {'diff': diff_str})

        # url params already in use; static params first, alias params get added below
        param_keys = set(params['static_params'])
        dashboard_id = json_data.get('id', '')
        prefix = EntityDashboardDrilldown.DS_SOURCES_PREFIX

        for each_map in params['alias_param_map']:
            if not isinstance(each_map, dict):
                raise cls._validation_error(
                    log_prefix, logger,
                    'each object in alias_param_map must be an instance of dict',
                    'SI-ENT-TP_0018')

            if not is_normalization_ff_on:
                each_map.pop('ds_source', None)
                alias = each_map.get('alias')
                # only strings can carry the prefix; other values are left
                # for the checks below
                if isinstance(alias, str) and alias.startswith(prefix):
                    each_map['alias'] = alias[len(prefix):]

            extra_fields = [key for key in each_map if key not in alias_map_fields]
            if extra_fields:
                alias_map_fields_str = ", ".join(alias_map_fields)
                extra_fields_str = ", ".join(extra_fields)
                raise cls._validation_error(
                    log_prefix, logger,
                    f'each object in alias_param_map must contain the following fields: {alias_map_fields_str} check: {extra_fields_str}',
                    'SI-ENT-TP_0019',
                    {
                        'alias_map_fields': alias_map_fields_str,
                        'extra_fields': extra_fields_str
                    })

            # OOTB dashboards will not have 'param' filled in
            # custom dashboards may or may not have 'ds_source' filled in
            for key in alias_map_fields:
                if each_map.get(key):
                    continue
                if key == 'param' and dashboard_id in OOTB_ENTITY_TYPE_DASHBOARD_IDS:
                    continue
                if key == 'ds_source' and dashboard_id not in OOTB_ENTITY_TYPE_DASHBOARD_IDS:
                    continue
                raise cls._validation_error(
                    log_prefix, logger, f'{key} cannot be empty',
                    'SI-ENT-TP_0020', {'key': key})

            # an empty/missing param only gets this far for OOTB dashboards;
            # it is not a real url param, so it takes no part in the
            # duplicate check
            param = each_map.get('param')
            if not param:
                continue
            if param in param_keys:
                raise cls._validation_error(
                    log_prefix, logger, f"one param {param} cannot be added twice",
                    'SI-ENT-TP_0021', {'param': param})
            param_keys.add(param)

    def _is_splunk_created_dashboard(self):
        """
        Checks whether the current dashboard is a Splunk-created dashboard.

        This function evaluates whether the dashboard is a Splunk-created dashboard by inspecting its type
        and id. A dashboard is considered Splunk-created if it meets either of the following conditions:
        1. The dashboard type is equal to EntityDashboardDrilldown.XML_DASHBOARD.
        2. The dashboard type is equal to EntityDashboardDrilldown.UDF_DASHBOARD and the id is not present in
           the list of OOTB dashboard id defined in OOTB_ENTITY_TYPE_DASHBOARD_IDS.

        @rtype: bool
        @return: True if the dashboard is a Splunk-created dashboard, False otherwise.
        """
        return (self.dashboard_type == EntityDashboardDrilldown.XML_DASHBOARD
                or (self.dashboard_type == EntityDashboardDrilldown.UDF_DASHBOARD
                    and self.id not in OOTB_ENTITY_TYPE_DASHBOARD_IDS))

    def _is_dashboard(self):
        """
        Checks whether the current drilldown is a dashboard. (could be navlink instead)

        This function evaluates whether it is a dashboard:
        1. The dashboard type is equal to EntityDashboardDrilldown.XML_DASHBOARD.
        2. The dashboard type is equal to EntityDashboardDrilldown.UDF_DASHBOARD

        @rtype: bool
        @return: True if the dashboard is a dashboard, False otherwise.
        """
        return self.dashboard_type in (EntityDashboardDrilldown.XML_DASHBOARD,
                                       EntityDashboardDrilldown.UDF_DASHBOARD)

    def _strip_ds_sources_prefix(self, text):
        """Strips the prefix 'ds_sources-' from the given text if it exists.

        We are doing this because when we introduced the ds_source option for alias values in
        entity_type, UI depends on the ds_sources- prefix for knowing when to show the source discovery
        searches drop down.

        @param text: The string to process.
        @return: The string with the prefix removed if it was present, otherwise the original string.
        """
        if not text:
            # None and '' are handed back unchanged
            return text
        prefix = EntityDashboardDrilldown.DS_SOURCES_PREFIX
        return text[len(prefix):] if text.startswith(prefix) else text

    def _get_ds_source_alias_value(self, entity, alias_name, ds_source):
        """Retrieves the value associated with the given alias and data source.

        @param entity: The entity dictionary containing the data.
        @param alias_name: The name of the alias to retrieve.
        @param ds_source: The name of the data source to use. usually it is the discovery search name
        @return: The value associated with the alias and data source, or the alias value w/o the source, or [].
        """
        alias_value_without_source = entity.get(alias_name, [])
        if not ds_source:
            return alias_value_without_source
        # missing/empty 'ds_sources' or an empty entry for ds_source both
        # fall back to the value stored directly on the entity
        ds_sources_obj = (entity.get('ds_sources') or {}).get(ds_source)
        if not ds_sources_obj:
            return alias_value_without_source
        return ds_sources_obj.get(alias_name, alias_value_without_source)

    def build_url_params(self, entity):
        """
        construct dashboard url and params based on the entity information from input.

        The params of the result are built in a fresh dict, self.params is
        left untouched so that the drilldown can be used for several entities.

        @type entity: dict
        @param entity: entity information
        @rtype: dict
        @return: dashboard drilldown with base url and params
        """
        # start from a copy of the static params; values that are not a list
        # are converted into one
        entity_params = {
            key: list(val) if isinstance(val, list) else [val]
            for key, val in self.params['static_params'].items()
        }
        alias_param_map = self.params['alias_param_map']
        ff_on = EntityDashboardDrilldown.is_normalization_ff_on

        if ff_on:
            is_dashboard = self._is_dashboard()  # OOTB param can be edited
        else:
            is_dashboard = self._is_splunk_created_dashboard()

        if is_dashboard:
            # regardless of whether FF is on or off, we will continue
            # to keep "<alias>": "<param>" pair directly inside "param":{} for those
            # alias-param pairs that don't use ds_sources at all. this will
            # ensure it is backward compatible
            for alias_param in alias_param_map:
                if not alias_param.get('ds_source'):
                    entity_params[alias_param['alias']] = alias_param['param']
            # when FF is on, we will copy the entire alias_param_map[] over
            # so we can accomodate the alias-param that utilizes ds_sources
            # as well the ones that don't
            if ff_on:
                entity_params['alias_param_map'] = alias_param_map
        else:  # navlinks
            # merge 'static_params' and 'alias_param_map' in drilldown['params'] into one dict
            # Get entity_params dict with key as parameter name and value as a list
            for alias_param in alias_param_map:
                # on the UI side, the construction of the alias param mapping
                # may prefix the actual alias name with 'ds_sources-' so it
                # knows to show the proper UI selection box.
                alias_name = self._strip_ds_sources_prefix(alias_param['alias'])
                if ff_on:
                    entity_alias_value = self._get_ds_source_alias_value(
                        entity, alias_name, alias_param.get('ds_source', ''))
                else:
                    entity_alias_value = entity.get(alias_name, [])
                if not isinstance(entity_alias_value, list):
                    entity_alias_value = [entity_alias_value]
                entity_params[alias_param['param']] = entity_alias_value

        processed_url = self.base_url
        # findall gives: [('${token_1}', 'token_1'), ('${token_2}', 'token_2')]
        for token, field_name in EntityDashboardDrilldown.url_template_regex.findall(self.base_url):
            field_value = entity.get(field_name, [''])
            if isinstance(field_value, list):
                # first value wins; an empty list is treated like a missing field
                field_value = field_value[0] if field_value else ''
            # values that are not strings leave the token in place
            if isinstance(field_value, str):
                processed_url = processed_url.replace(token, field_value)

        # create dict with base_url, params and metadata
        return {
            'dashboard_drilldown_title': self.title,
            'dashboard_type': self.dashboard_type,
            'base_url': processed_url,
            'params': entity_params,
            'id': self.id,
        }