# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. from ITOA.itoa_common import validate_json from ITOA.constants import OOTB_ENTITY_TYPE_DASHBOARD_IDS import ITOA.itoa_common as utils import http.client import re from ITOA.itoa_exceptions import ItoaValidationError class EntityDashboardDrilldownValidationException(ItoaValidationError): pass class EntityDashboardDrilldown(object): # captures all tokens of format ${token} . Example: https://google.com/${org_id}/${realm} url_template_regex = re.compile("(\${([\-\w]+)})") # noqa """ EntityDashboardDrilldown - an entity dashboard drilldown for looking up dashboards associated with the entity and of its entity_type """ UDF_DASHBOARD = 'udf_dashboard' XML_DASHBOARD = 'xml_dashboard' NAVIGATION_LINK = 'navigation_link' ALLOWED_DASHBOARD_TYPES = [UDF_DASHBOARD, XML_DASHBOARD, NAVIGATION_LINK] is_normalization_ff_on = None def __init__(self, session_key, title, base_url, params, id, dashboard_type): """ Initialize an EntityDashboardDrilldown object @type title: str @param title: name of the dashboard drilldown @type base_url: str @param base_url: base URL the links to the dashboard @type params: dict @param params: contains two fields: alias_param_map and static_params - alias_param_map: mapping of a URL param and it's alias - static_params: params with a defined value. @type dashboard_type: str @param base_url: type of dashboard: udf_dashboard, xml_dashboard, navigation_link """ self.title = title self.base_url = base_url self.params = params self.dashboard_type = dashboard_type self.id = id self.session_key = session_key EntityDashboardDrilldown.normalization_feature_flagged(session_key) @classmethod def normalization_feature_flagged(cls, session_key): """ initializing is_normalization_ff_on once at class level @param session_key: @return: """ if cls.is_normalization_ff_on is None: cls.is_normalization_ff_on = ( utils.is_feature_enabled('itsi-duplicate-entity-normalization', session_key, True)) return cls.is_normalization_ff_on @classmethod def validate(cls, log_prefix, json_data, logger=None, is_normalization_ff_on=False): """ Validate if input json data represents a valid entity dashboard drilldown @type log_prefix: str @param log_prefix: prefix for log message @type json_data: dict @param json_data: json data representing an entity dashboard drilldown @type logger: object @param logger: for logging @type is_normalization_ff_on: boolean @param is_normalization_ff_on: indicating whether the feature flag is on/off @return: None """ validate_json(log_prefix, json_data) cls._validate_params(log_prefix, json_data, logger, is_normalization_ff_on) @classmethod def _validate_params(cls, log_prefix, json_data, logger=None, is_normalization_ff_on=False): chk_fields = ['title', 'base_url', 'dashboard_type'] req_fields = ['static_params', 'alias_param_map'] if is_normalization_ff_on: alias_map_fields = ['alias', 'param', 'ds_source'] else: alias_map_fields = ['alias', 'param'] for entry in chk_fields: if json_data.get(entry) is None: raise EntityDashboardDrilldownValidationException(log_prefix + f'{entry} must be specified.', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0009', context={ 'entry': entry }) dboard_type = json_data.get('dashboard_type') if dboard_type not in EntityDashboardDrilldown.ALLOWED_DASHBOARD_TYPES: allowed_dashboard_types = ', '.join(EntityDashboardDrilldown.ALLOWED_DASHBOARD_TYPES) raise EntityDashboardDrilldownValidationException( log_prefix + f"dashboard type must be one of [{allowed_dashboard_types}]", logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0010', context={ 'allowed_dashboard_types': allowed_dashboard_types }) # Only navigation_link can modify base_url if dboard_type in [EntityDashboardDrilldown.UDF_DASHBOARD, EntityDashboardDrilldown.XML_DASHBOARD]: if json_data.get('base_url'): raise EntityDashboardDrilldownValidationException( log_prefix + f'base_url for dashboard type {dboard_type} cannot be modified', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0011', context={ 'dboard_type': dboard_type }) # navigation_link cannot have empty base_url: if dboard_type == EntityDashboardDrilldown.NAVIGATION_LINK: if not json_data.get('base_url'): raise EntityDashboardDrilldownValidationException( log_prefix + f'base_url for dashboard type {dboard_type} cannot be empty', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0012', context={ 'dboard_type': dboard_type }) params = json_data.get('params', {}) if not params: raise EntityDashboardDrilldownValidationException(log_prefix + 'params cannot be empty', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0013' ) all_keys = set(params.keys()) all_req_fields = set(req_fields) if all_req_fields.difference(all_keys): req_fields_str = ','.join(req_fields) raise EntityDashboardDrilldownValidationException( log_prefix + 'params must contain the following fields: {req_fields_str}', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0014', context={ 'req_fields': req_fields_str }) if not isinstance(params.get('static_params'), dict): raise EntityDashboardDrilldownValidationException( log_prefix + 'static_params must be an instance of dict', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0015') if not isinstance(params.get('alias_param_map'), list): raise EntityDashboardDrilldownValidationException( log_prefix + 'alias_param_map must be an instance of list', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0016' ) if all_keys != all_req_fields: diff = all_keys.difference(all_req_fields) if len(diff) != 0: diff_str = ','.join(diff) raise EntityDashboardDrilldownValidationException( log_prefix + f'found additional fields not recognized by the system: {diff_str}', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0017', context={ 'diff': diff_str } ) param_keys = set(params['static_params']) dashboard_id = json_data.get('id', '') for each_map in params.get('alias_param_map'): if not isinstance(each_map, dict): raise EntityDashboardDrilldownValidationException( log_prefix + 'each object in alias_param_map must be an instance of dict', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0018') if not is_normalization_ff_on: if 'ds_source' in each_map: each_map.pop('ds_source') if 'alias' in each_map and each_map['alias'].startswith('ds_sources-'): each_map['alias'] = each_map['alias'][11:] if any(key not in alias_map_fields for key in set(each_map.keys())): alias_map_fields_str = ", ".join(alias_map_fields) extra_fields = [key for key in each_map.keys() if key not in alias_map_fields] extra_fields_str = ", ".join(extra_fields) raise EntityDashboardDrilldownValidationException( log_prefix + f'each object in alias_param_map must contain the following fields: {alias_map_fields_str} check: {extra_fields_str}', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0019', context={ 'alias_map_fields': alias_map_fields_str, 'extra_fields': extra_fields_str } ) # OOTB dashboards will not have 'param' filled in # custom dashboards may or may not have 'ds_source' filled in for key in alias_map_fields: if not each_map.get(key): if key == 'param' and dashboard_id in OOTB_ENTITY_TYPE_DASHBOARD_IDS: continue if key == 'ds_source' and dashboard_id not in OOTB_ENTITY_TYPE_DASHBOARD_IDS: continue raise EntityDashboardDrilldownValidationException(log_prefix + f'{key} cannot be empty', logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0020', context={ 'key': key }) if each_map['param'] in param_keys: raise EntityDashboardDrilldownValidationException( log_prefix + f"one param {each_map['param']} cannot be added twice", logger, log_prefix, status_code=http.client.BAD_REQUEST, uid='SI-ENT-TP_0021', context={ 'param': each_map['param'] }) param_keys.add(each_map['param']) def _is_splunk_created_dashboard(self): """ Checks whether the current dashboard is a Splunk-created dashboard. This function evaluates whether the dashboard is a Splunk-created dashboard by inspecting its type and title. A dashboard is considered Splunk-created if it meets either of the following conditions: 1. The dashboard type is equal to EntityDashboardDrilldown.XML_DASHBOARD. 2. The dashboard type is equal to EntityDashboardDrilldown.UDF_DASHBOARD and the id is not present in the list of OOTB dashboard id defined in OOTB_ENTITY_TYPE_DASHBOARD_IDS. @rtype: bool @return: True if the dashboard is a Splunk-created dashboard, False otherwise. """ return (self.dashboard_type == EntityDashboardDrilldown.XML_DASHBOARD or (self.dashboard_type == EntityDashboardDrilldown.UDF_DASHBOARD and self.id not in OOTB_ENTITY_TYPE_DASHBOARD_IDS)) def _is_dashboard(self): """ Checks whether the current drilldown is a dashboard. (could be navlink instead) This function evaluates whether it is a dashboard: 1. The dashboard type is equal to EntityDashboardDrilldown.XML_DASHBOARD. 2. The dashboard type is equal to EntityDashboardDrilldown.UDF_DASHBOARD @rtype: bool @return: True if the dashboard is a dashboard, False otherwise. """ return (self.dashboard_type == EntityDashboardDrilldown.XML_DASHBOARD or self.dashboard_type == EntityDashboardDrilldown.UDF_DASHBOARD) def _strip_ds_sources_prefix(self, text): """Strips the prefix 'ds_sources-' from the given text if it exists. We are doing this because when we introduced the ds_source option for alias values in entity_type, UI depends on the ds_sources- prefix for knowing when to show the source discovery searches drop down. @param text: The string to process. @return: The string with the prefix removed if it was present, otherwise the original string. """ if not text or text == '': return text return text[11:] if text.startswith("ds_sources-") else text def _get_ds_source_alias_value(self, entity, alias_name, ds_source): """Retrieves the value associated with the given alias and data source. @param entity: The entity dictionary containing the data. @param alias_name: The name of the alias to retrieve. @param ds_source: The name of the data source to use. usually it is the discovery search name @return: The value associated with the alias and data source, or the alias value w/o the source, or []. """ # Check if necessary conditions are met before further lookups alias_value_without_source = entity.get(alias_name, []) if not ds_source or not entity.get('ds_sources') or not entity.get('ds_sources', {}).get(ds_source): return alias_value_without_source ds_sources_obj = entity['ds_sources'][ds_source] # Return the value from the data source object, handling potential absence return ds_sources_obj.get(alias_name, alias_value_without_source) def build_url_params(self, entity): """ construct dashboard url and params based on the entity information from input. @type entity: dict @param entity: entity information @rtype: dict @return: dashboard drilldown with base url and params """ entity_params = self.params['static_params'] # if values in drilldown's static params are not list, convert into one for key, val in entity_params.items(): if not isinstance(val, list): entity_params[key] = [val] is_dashboard = self._is_splunk_created_dashboard() if EntityDashboardDrilldown.is_normalization_ff_on: is_dashboard = self._is_dashboard() # OOTB param can be edited if is_dashboard: # regardless of whether FF is on or off, we will continue # to keep "": "" pair directly inside "param":{} for those # alias-param pairs that don't use ds_sources at all. this will # ensure it is backward compatible alias_param_without_ds_sources = [ item for item in self.params['alias_param_map'] if 'ds_source' not in item or not item['ds_source'] ] for alias_param in alias_param_without_ds_sources: entity_params[alias_param['alias']] = alias_param['param'] # when FF is on, we will copy the entire alias_param_map[] over # so we can accomodate the alias-param that utilizes ds_sources # as well the ones that don't if EntityDashboardDrilldown.is_normalization_ff_on: entity_params['alias_param_map'] = self.params['alias_param_map'] else: # navlinks # merge 'static_params' and 'alias_param_map' in drilldown['params'] into one dict # Get entity_params dict with key as parameter name and value as a list for alias_param in self.params['alias_param_map']: # on the UI side, the construction of the alias param mapping # may prefix the actual alias name with 'ds_sources-' so it # knows to show the proper UI selection box. alias_name = self._strip_ds_sources_prefix(alias_param['alias']) if EntityDashboardDrilldown.is_normalization_ff_on: entity_alias_value = self._get_ds_source_alias_value(entity, alias_name, alias_param.get('ds_source', '')) else: entity_alias_value = entity.get(alias_name, []) if not isinstance(entity_alias_value, list): entity_alias_value = [entity_alias_value] entity_params[alias_param['param']] = entity_alias_value token_results = EntityDashboardDrilldown.url_template_regex.findall(self.base_url) processed_url = self.base_url # token_results is of the format: [('${token_1}', 'token_1'), ('${token_2}', 'token_2')] for result in token_results: entity_field_value = entity.get(result[1], ['']) if isinstance(entity_field_value, list): processed_url = processed_url.replace(result[0], entity_field_value[0]) if isinstance(entity_field_value, str): processed_url = processed_url.replace(result[0], entity_field_value) # create dict with base_url, params and metadata url_params_with_metadata_json = { 'dashboard_drilldown_title': self.title, 'dashboard_type': self.dashboard_type, 'base_url': processed_url, 'params': entity_params, 'id': self.id, } return url_params_with_metadata_json