# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. from ITOA.itoa_object import ItoaObject, CRUDMethodTypes from ITOA.saved_search_utility import SavedSearch from ITOA import itoa_common as utils from ITOA.setup_logging import logger, InstrumentCall from itsi.objects.itsi_entity import ItsiEntity from itsi.objects.itsi_entity_type import ItsiEntityType class ItsiEntityManagementPolicies(ItoaObject): log_prefix = '[ITSI Entity Management Policies]' collection_name = 'itsi_entity_management_policies' unmark_retirable_search = 'Entity Lifecycle Management - Unmark Retirable Search' template_savedsearch_name = 'EntityManagementPolicy-%s' owner = 'nobody' def __init__(self, session_key, current_user_name): super(ItsiEntityManagementPolicies, self).__init__(session_key, current_user_name, 'entity_management_policies', collection_name=self.collection_name, title_validation_required=True, is_securable_object=True) def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None, transaction_id=None, skip_local_failure=False): super(ItsiEntityManagementPolicies, self).do_object_validation(owner, objects, validate_name, dupname_tag, transaction_id, skip_local_failure) self._validate_entity_types_in_policy(objects, dupname_tag) def _validate_entity_types_in_policy(self, objects, dupname_tag): for json_data in objects[:]: is_restore_job = False if dupname_tag == '_dup_from_Backup_Restore_Jobs_Processor': is_restore_job = True entity_types_json = json_data.get('entity_type_ids') has_not_sent_rule_key = json_data.get('entity_management_rule', {}).get('_key', '') if entity_types_json is None \ or not utils.is_valid_list(entity_types_json): self.raise_error_bad_validation( logger, 'invalid entity type id or rule') if not utils.is_valid_str(has_not_sent_rule_key): self.raise_error_bad_validation( logger, 'invalid policy rule') # Check if we are creating a generic policy for this rule if not entity_types_json: # Note: this request should be expanded to multiple rules when policies have multiple rules in future filter_data_generic = {'$and': [{'_key': {"$ne": json_data.get('_key')}}, {'entity_management_rule._key': has_not_sent_rule_key}, {"entity_type_ids": []}]} other_generic_policies = self.storage_interface.get_all( self.session_key, self.owner, objecttype=self.object_type, filter_data=filter_data_generic, current_user_name=self.current_user_name ) if other_generic_policies: if is_restore_job: logger.warning('Restore is skipped for policy [%s] because a similar policy with all entity ' 'types exists in the system.' % (json_data.get('title'))) objects.remove(json_data) else: self.raise_error_bad_validation( logger, ('Policy : %s can\'t be created because a similar policy with all entity ' 'types exists in the system. To fix this issue add one or more ' 'entity types relevant to the policy before retrying the operation.') % (json_data.get('title'))) else: # Note: this request should be expanded to multiple rules when policies have multiple rules in future filter_data = {'$and': [{'_key': {'$ne': json_data.get('_key')}}, {'entity_management_rule._key': has_not_sent_rule_key}]} all_other_policies = self.storage_interface.get_all( self.session_key, self.owner, objecttype=self.object_type, filter_data=filter_data, current_user_name=self.current_user_name ) rule_to_entity_type_mapping = {} for policy in all_other_policies: rule_key = policy.get('entity_management_rule', {}).get('_key', '') if utils.is_valid_str(rule_key): if rule_key not in rule_to_entity_type_mapping: rule_to_entity_type_mapping[rule_key] = {} for entity_type_id in policy.get('entity_type_ids', []): rule_to_entity_type_mapping[rule_key][entity_type_id] = { 'title': policy['title'], '_key': policy['_key'], } # Note: Check against multiple rules when policies have multiple rules in future if rule_to_entity_type_mapping.get(has_not_sent_rule_key, None): common_entity_types = set(entity_types_json) & set( rule_to_entity_type_mapping.get(has_not_sent_rule_key, {}).keys()) if (common_entity_types): filter_data_entity_types = {'$or': [{"_key": entity_type_id} for entity_type_id in common_entity_types]} entity_types = ItsiEntityType(self.session_key, self.owner) common_entity_type_objects = entity_types.storage_interface.get_all( self.session_key, self.owner, objecttype='entity_type', filter_data=filter_data_entity_types, current_user_name=self.current_user_name, fields=['_key', 'title'] ) common_entity_type_titles = [] policy_info = [] for et in common_entity_type_objects: title = et.get('title', '') if (utils.is_valid_str(title)): common_entity_type_titles.append(title) entity_type_key = et.get('_key') policy_info.append('%s' % (rule_to_entity_type_mapping[rule_key][entity_type_key]['title'])) if is_restore_job: logger.warning('The following entity types: %s are part of one or more existing ' 'policies: %s. Restore is skipped for policy [%s].' % (', '.join(common_entity_type_titles), ', '.join(policy_info), json_data.get('title'))) objects.remove(json_data) else: self.raise_error_bad_validation( logger, 'The following entity types: %s are part of one or more existing ' 'policies: %s. To ensure that your new policy works, change the entity ' 'types on the new policy before retrying the operation.' % (', '.join(common_entity_type_titles), ', '.join(policy_info))) # pylint: disable = unused-argument def post_save_setup(self, owner, ids, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None, skip_local_failure=False): """ Optional method to be implemented in derived classes of specific object types to do additional setup after a write operation (create or update) is invoked on this object @type owner: string @param owner: user who is performing this operation @type ids: List of dict identifiers in format {"_key":} returned by kvstore, parity with objects passed @param ids: list of dict @param method: method name for CRUD @param transaction_id: transaction id to which this call should be part of @param skip_local_failure: if local failures needs to be ignored @type objects: list of dictionary @param objects: list of objects being written @type req_source: string @param req_source: string identifying source of this request @return: none, throws exceptions on errors """ for i, policy in enumerate(objects): # changes made to support loading the policy during a restore operation if '_key' not in ids[i]: policy_id = ids[i] else: policy_id = ids[i].get('_key') # Saved search generation savedsearch_name = self.template_savedsearch_name % policy_id kwargs_to_send = { 'title': savedsearch_name, 'name': savedsearch_name, # A result is required to hit the custom search command 'search': '| setretiredentities auto_retire=%s policy_id=%s' % (True if policy.get('auto_retire') == 1 else False, policy_id), # earliest_time is just to reflect policy and has no effect on search 'dispatch.earliest_time': '-%s%s' % (policy.get('entity_retire_period'), policy.get('entity_retire_schedule')), 'dispatch.latest_time': 'now', 'disabled': 1 if policy.get('disabled') == 1 else 0, 'enableSched': 1, 'cron_schedule': policy.get('cron_schedule'), } SavedSearch.update_search(self.session_key, savedsearch_name, namespace='itsi', owner=self.owner, raise_if_exist=False, **kwargs_to_send) # Setting this every time is just a way to reduce impact of ITSI-19262 SavedSearch.update_acl(self.session_key, savedsearch_name, 'nobody') SavedSearch.update_search(self.session_key, self.unmark_retirable_search, namespace='itsi', owner=self.owner, raise_if_exist=False, disabled=0) if not policy.get('_key'): policy['_key'] = policy_id policy['next_scheduled_time'] = utils.calculate_next_cron_time(policy)[0] self.storage_interface.edit( self.session_key, self.owner, self.object_type, policy_id, policy, current_user_name=self.current_user_name ) return def post_delete(self, owner, ids, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None): for object_id in ids: savedsearch_name = self.template_savedsearch_name % object_id SavedSearch.delete_search(self.session_key, savedsearch_name) # TODO: (ITSI-17855) Test out the following clause if not self.get_bulk(owner=self.owner): SavedSearch.update_search(self.session_key, self.unmark_retirable_search, namespace='itsi', owner=self.owner, raise_if_exist=False, disabled=1) return def identify_dependencies(self, owner, objects, method, req_source='unknown', transaction_id=None, skip_local_failure=False): required_refresh_jobs = [] # if policy is created first time skip the dependency check if method == CRUDMethodTypes.METHOD_CREATE: return False, [] obj = ItsiEntity(self.session_key, self.owner) entity_policies = objects filter_data = {"$and": [{"retired": {"$ne": 1}}, {"retirable": 1}, {"$or": [{"retiring_policy": entity_policy['_key']} for entity_policy in entity_policies]}]} fields = ["retirable", "retiring_policy"] entity_list = obj.get_bulk(self.owner, filter_data=filter_data, fields=fields, limit=1) object_ids = [entity_policy['_key'] for entity_policy in entity_policies] if entity_list and len(entity_list) > 0: change_detail = {} if CRUDMethodTypes.METHOD_DELETE == method: change_detail = {'action': 'delete'} elif method == CRUDMethodTypes.METHOD_UPDATE or method == CRUDMethodTypes.METHOD_UPSERT: is_policy_disabled = utils.normalize_bool_flag(objects[0].get('disabled', 0)) if not is_policy_disabled: change_detail = {'action': 'update'} else: change_detail = {'action': 'disable'} required_refresh_jobs.append( self.get_refresh_job_meta_data( 'entity_lifecycle_management', list(object_ids), self.object_type, change_detail, transaction_id=transaction_id) ) return len(required_refresh_jobs) > 0, required_refresh_jobs