# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import hashlib import json from ITOA.itoa_object import ItoaObject, CRUDMethodTypes from ITOA.setup_logging import logger from ITOA import itoa_common as utils from itsi.objects.itsi_kpi import ItsiKpi from itsi.objects.itsi_service import ItsiService from itsi.objects.itsi_security_group import ItsiSecGrp from ITOA.controller_utils import ITOAError from ITOA.itoa_exceptions import ItoaAccessDeniedError class ItsiKpiEntityThreshold(ItoaObject): ''' Implements ITSI KPI Entity Threshold for Entity level Adaptive Thresholding ''' log_prefix = '[ITSI KPI Entity Threshold] ' collection_name = 'itsi_entity_thresholds' def __init__(self, session_key, current_user_name): self.sec_grp = ItsiSecGrp(session_key, current_user_name) super(ItsiKpiEntityThreshold, self).__init__( session_key, current_user_name, 'kpi_entity_threshold', collection_name=self.collection_name, is_securable_object=False, title_validation_required=False) @staticmethod def generate_fixed_key(kpi_id, entity_title): """ Generate fixed key for referring to KPI entity thresholds :type kpi_id: string :param kpi_id: KPI ID :type entity_title: string :param entity_title: Entity title :rtype: string :return: MD5 hash representing a key for a KPI entity threshold """ # Note: If updating this make to update front-end hash generation for entity configs return hashlib.md5((json.dumps([entity_title, kpi_id], separators=(',', ':'))).encode('utf-8')).hexdigest() def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None, transaction_id=None, skip_local_failure=False, ignore_same_key=False): super(ItsiKpiEntityThreshold, self).do_object_validation(owner, objects, validate_name, dupname_tag, transaction_id=transaction_id) for json_data in objects: if not (json_data.get('kpi_id', None) and json_data.get('entity_title', None)): self.raise_error_bad_validation(logger, 'KPI Id and Entity Title are required for the object_type: {}.'.format(self.object_type)) def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None, skip_local_failure=False, **kwargs): """ Additional setup includes: * Setting a default entity_title * Deriving key from entity_key and kpi_id for easy retrieval See parent class for function signature """ self.attach_security_groups(owner, objects, transaction_id) for object in objects: if '_key' in object: logger.warning('KPI entity thresholds with custom key (%s) may be unfetchable' % (object['_key'])) else: object['_key'] = self.generate_fixed_key(object['kpi_id'], object['entity_title']) def attach_security_groups(self, owner, objects, transaction_id): """ Attach inferred security groups and acl details to this object (from services). :param owner: Splunk user to own object :type owner: basestring :param objects: kpi_entity_threshold objects to update :type objects: list :param transaction_id: Transaction ID for logging :type transaction_id: basestring """ if not objects or len(objects) == 0: return try: kpi_interface = ItsiKpi(self.session_key, owner) service_interface = ItsiService(self.session_key, owner) # Fetch/cache Details of security group secgrp_all = self.sec_grp.get_bulk(owner, transaction_id=transaction_id) secgrp_acl_map = {} for secgrp in secgrp_all: secgrp_acl_map[secgrp['_key']] = secgrp['acl'] for kpi_entity_threshold in objects: if self.current_user_name == 'nobody': kpi_entity_threshold['sec_grp'] = 'default_itsi_security_group' kpi_entity_threshold['acl'] = { 'read': ['*', 'itoa_admin', 'itoa_team_admin', 'itoa_analyst', 'itoa_user'], 'delete': ['itoa_admin'], 'write': ['itoa_admin'], 'owner': 'nobody' } continue # get service_id, if not available then fetch using kpi_id if not kpi_entity_threshold.get('service_id', None): kpi_id = kpi_entity_threshold.get('kpi_id', None) kpi = kpi_interface.get( owner, kpi_id, req_source='kpi_entity_threshold', transaction_id=transaction_id, ) if len(kpi) < 1: logger.warning(f'kpi {kpi_id} not found skipping threshold object for {kpi_entity_threshold["entity_title"]}') continue kpi_entity_threshold['service_id'] = kpi[0]['_key'] # Get serviceId from KPI object # Get details of service that contains the kpi service = service_interface.get( owner, kpi_entity_threshold['service_id'], req_source='kpi_entity_threshold', transaction_id=transaction_id, ) if service is None: raise ITOAError(status='404', message='Service not found.') elif not service.get('sec_grp', None): raise ITOAError(status='403', message='Service permission denied.') else: kpi_entity_threshold['sec_grp'] = service['sec_grp'] kpi_entity_threshold['acl'] = secgrp_acl_map[service['sec_grp']] # update this object with parent service acl except ItoaAccessDeniedError: raise ITOAError(status='403', message='Service permission denied.') except Exception as e: message = str(e) logger.error(f'attach security failed with: {message}') logger.exception(e) raise ITOAError(status='500', message=message) def get_bulk(self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None, skip=None, limit=None, req_source='unknown', transaction_id=None): """ Overriding the itoa_object get_bulk function. KPI entity threshold object has securable=False, hence requires explicit read permission checks. """ results = super().get_bulk(owner, sort_key, sort_dir, filter_data, fields, skip, limit, req_source, transaction_id) self.attach_security_groups(owner, results, transaction_id) results = self.sec_grp.enforce_security_self(results) # enforces read permission return results def batch_save_backend(self, owner, data_list, transaction_id=None): """ Overriding the itoa_object batch_save_backend function. KPI entity threshold object has securable=False, hence requires explicit write permission checks. """ self.attach_security_groups(owner, data_list, transaction_id) results = self.sec_grp.enforce_security_self(data_list) write_objects = self._filter_by_permission(results, 'write') return super().batch_save_backend(owner, write_objects, transaction_id) def _filter_by_permission(self, data_list, perm): """ Filter items based on permission """ accessible = [] non_accessible = [] if not isinstance(data_list, list): return [] for data in data_list: if data['permissions'][perm]: data.pop('permissions', None) accessible.append(data) else: non_accessible.append(data['_key']) if len(non_accessible) > 1: logger.warning(f'cannot {perm} following keys: {non_accessible}') return accessible def delete_bulk( self, owner, filter_data=None, req_source='unknown', transaction_id=None ): """ Overriding the itoa_object delete_bulk function. Fetch rows(in batches) matching filter, select rows that user is allowed to delete and then delete. """ transaction_id = self._instrumentation.push('kpi_entity_threshold.delete_bulk', transaction_id=transaction_id, owner=owner) deleted_count = 0 requested_skip = 0 # get in batch sizes configured for an object type batch_size = utils.get_object_batch_size(self.session_key, self.object_type) while True: results = super().get_bulk( owner, sort_key=None, sort_dir=None, filter_data=filter_data, limit=batch_size, skip=requested_skip, transaction_id=transaction_id, ) # we reached end of paginated reads if not results or len(results) == 0: break self.attach_security_groups(owner, results, transaction_id) results = self.sec_grp.enforce_security_self(results) delete_objects = self._filter_by_permission(results, 'delete') requested_skip += batch_size # continue to next subset of records if none objects were deletable if not delete_objects or len(delete_objects) == 0: continue deleted_count += len(delete_objects) self.delete_batch(owner, delete_objects, req_source, transaction_id) self._instrumentation.pop('kpi_entity_threshold.delete_bulk', transaction_id, metric_info={'numberOfObjects': deleted_count})