# Copyright (C) 2005-2024 Splunk Inc. All Rights Reserved. import sys import os from collections import defaultdict from splunk.clilib.bundle_paths import make_splunkhome_path sys.path.append(make_splunkhome_path(["etc", "apps", "SA-ITOA", "lib"])) from migration_utility.migration_utility_manifest import ( precheck_failure_severity, precheck_documentation_details, dict_for_mapping_precheck_ids, dict_for_mapping_category, dict_for_mapping_precheck_class, dict_for_mapping_precheck_class_and_id, ) from itsi.objects.itsi_kpi_base_search import ItsiKPIBaseSearch from ITOA.itoa_common import ( get_conf_stanza_single_entry, get_object_batch_size, SplunkUser, ) from ITOA.setup_logging import getLogger from migration.migration import MigrationBaseMethod from migration_utility.constants import ( MODES, MODES_STR_MAP, UPGRADE_READINESS_JOB_TIMEOUT_LIMIT, ) from itsi.objects.model.itsi_model_validator import ItsiModelValidator from itsi.objects.itsi_service import ItsiService from itsi.objects.itsi_entity import ItsiEntity from itsi.objects.itsi_service_template import ItsiBaseServiceTemplate from itsi.objects.itsi_upgrade_readiness_prechecks import ItsiUpgradeReadinessPrechecks from itsi.objects.itsi_refresh_queue_job import ItsiRefreshQueueJob from itsi.service_template.service_template_utils import ServiceTemplateUtils from itsi.objects.itsi_backup_restore import ItsiBackupRestore from ITOA.itoa_common import get_current_utc_epoch logger = getLogger( logger_file="itsi_migration_utility.log", logger_name="itsi_migration_utility_prechecker", ) class ItsiMigrationUtilityHandler: def __init__(self, sessionkey, transaction_id) -> None: self.sessionkey = sessionkey self.transaction_id = transaction_id self.dict_for_mapping_precheck_ids = dict_for_mapping_precheck_ids self.dict_for_mapping_precheck_class = dict_for_mapping_precheck_class self.dict_for_mapping_precheck_class_and_id = ( dict_for_mapping_precheck_class_and_id ) self.precheck_failure_severity = precheck_failure_severity self.dict_for_mapping_category = dict_for_mapping_category self.precheck_documentation_details = precheck_documentation_details self.upgrade_readiness_precheck_obj = ItsiUpgradeReadinessPrechecks( self.sessionkey, "nobody" ) self.failed_precheck_count = 0 self.large_number_of_kpis_per_base_search_threshold = 600 self.dangling_service_reference_in_entities_disabled = int( get_conf_stanza_single_entry( self.sessionkey, "itsi_settings", "upgrade_readiness", "dangling_service_reference_in_entities_disabled", ).get("content", 0) ) self.itsi_default_authorize_conf_file = make_splunkhome_path( ["etc", "apps", "itsi", "default", "authorize.conf"] ) self.system_default_authorize_conf_file = make_splunkhome_path( ["etc", "system", "default", "authorize.conf"] ) self.system_local_authorize_conf_file = make_splunkhome_path( ["etc", "system", "local", "authorize.conf"] ) self.metric_ad_authorize_conf_file = make_splunkhome_path( ["etc", "apps", "SA-ITSI-MetricAD", "default", "authorize.conf"] ) self.current_user_context = SplunkUser.get_current_user_context( self.sessionkey, logger ) self.parent_role_to_get_user_capabilites = "itoa_admin" self.role_to_get_user_capabilites = "itoa_team_admin" self.native_user_capabilities = self.get_native_capabilities(self.itsi_default_authorize_conf_file, self.system_default_authorize_conf_file, self.system_local_authorize_conf_file, self.metric_ad_authorize_conf_file, self.role_to_get_user_capabilites) self.disabled_capabilities = self.get_disabled_capabilities(self.parent_role_to_get_user_capabilites) self.modified_native_capabilities = self.get_modified_capabilities(self.native_user_capabilities, self.disabled_capabilities) self.modified_capability_for_current_user = self.get_modified_capability_for_current_user(self.current_user_context, self.modified_native_capabilities, self.native_user_capabilities) def get_precheck_details(self, itsi_muh, precheck_id): precheck = [ name for name, pre_id in self.dict_for_mapping_precheck_ids.items() if pre_id == precheck_id ] precheck_class = self.dict_for_mapping_precheck_class[precheck[0]] description_details, resolution_details = precheck_class( itsi_muh ).get_precheck_details() documentation_details = self.precheck_documentation_details[precheck_id] return description_details, resolution_details, documentation_details def get_auto_remediation_details(self, itsi_muh, precheck_id): precheck = [ name for name, pre_id in self.dict_for_mapping_precheck_ids.items() if pre_id == precheck_id ] precheck_class = self.dict_for_mapping_precheck_class[precheck[0]] description_details, category_fixed = precheck_class( itsi_muh ).get_auto_remediation_details() return description_details, category_fixed def set_prechecks(self, precheck_ids): # list of prechecks on which operations to be performed self.prechecks = [] if precheck_ids is None: self.prechecks = list( self.dict_for_mapping_precheck_class.values()) else: for key, value in self.dict_for_mapping_precheck_ids.items(): if value in precheck_ids: self.prechecks.append( self.dict_for_mapping_precheck_class[key]) def execute(self, itsi_muh, operation_mode, precheck_ids=None): """ Takes prechecks classes' list and operation mode to be executed and after the execution proper output in console and itsi_migration_utility.log will be logged : param itsi_muh: object of ItsiMigrationUtilityHandler() class : param operation_mode: mode of operation -- MODE 1: Precheck or MODE 2: Auto-remediation : precheck_ids: list of ids of prechecks to be executed. If None, all prechecks will be executed """ result = True message = "" self.set_prechecks(precheck_ids) # Check for in progress jobs before starting job. other_in_progress_jobs = self.upgrade_readiness_precheck_obj.get_in_progress_upgrade_readiness_prechecks( lookback_time=UPGRADE_READINESS_JOB_TIMEOUT_LIMIT, exclude_transaction_id=self.transaction_id, ) other_in_progress_job_ids = [ other_in_progress_job["transaction_id"] for other_in_progress_job in other_in_progress_jobs ] other_in_progress_job_start_times = [ other_in_progress_job["start_time"] for other_in_progress_job in other_in_progress_jobs ] if len(other_in_progress_jobs) > 0: logger.error( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " f"Execution failed because other in progress job(s) were found, transaction_id(s): {other_in_progress_job_ids}." ) return False, f"OTHER_JOB_{other_in_progress_job_start_times}", self.failed_precheck_count if operation_mode == MODES["PRECHECK"]: message = "PRECHECK" self.perform_precheck(itsi_muh) elif operation_mode == MODES["AUTO_REMEDIATION"]: result, message = self.perform_remediation(itsi_muh) return result, message, self.failed_precheck_count def perform_precheck(self, itsi_muh): # list to store the detailed output for logging in # itsi_migration_utility.log file list_to_store_detailed_output = [] self.setup_data() time_now = get_current_utc_epoch() self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( self.transaction_id, precheck_started=True, precheck_start_time=time_now ) for precheck in self.prechecks: precheck_obj = precheck(itsi_muh) detailed_output = precheck_obj.precheck_result() if detailed_output: # Logging the precheck failure details list_to_store_detailed_output.append(detailed_output) # to print in log file for detailed_output in list_to_store_detailed_output: if "[precheck_passed" in detailed_output: logger.info(detailed_output) else: self.failed_precheck_count += 1 logger.error(detailed_output) logger.info( f"[transaction_id={self.transaction_id}] Precheck Execution Completed" ) def perform_remediation(self, itsi_muh): # list to store the detailed output for logging in # itsi_migration_utility.log file list_to_store_detailed_output = [] operation_mode = MODES["AUTO_REMEDIATION"] self.setup_data() time_now = get_current_utc_epoch() self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( self.transaction_id, remediation_started=True, remediation_start_time=time_now, ) # Fail job and return if refresh queue is not empty after a period of time refresh_queue_job_interface = ItsiRefreshQueueJob(self.sessionkey, "nobody") if not refresh_queue_job_interface.wait_for_unblocked_queue(): logger.error( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " "Execution timed out waiting for refresh queue to empty. Not proceeding with remediation." ) return False, "REFRESH_QUEUE" # Fail job and return if backup/restore jobs are in progress backup_restore_interface = ItsiBackupRestore(self.sessionkey, "nobody") if backup_restore_interface.is_any_backup_restore_job_in_progress("nobody"): logger.error( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " "Backup/restore jobs are in progress. Auto-remediation is not allowed while backup/restore jobs are in progress." ) return False, "BACKUP/RESTORE" # Fail job and return if service template sync is in progress status = ServiceTemplateUtils( self.sessionkey, "nobody" ).service_template_sync_job_in_progress_or_sync_now() if status.get("status", False): logger.error( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " "One or more service templates are currently syncing. Auto-remediation is not allowed while service template sync is in progress." ) return False, "TEMPLATE SYNC" for precheck in self.prechecks: precheck_obj = precheck(itsi_muh) precheck_id = self.dict_for_mapping_precheck_class_and_id[precheck] detailed_output_of_precheck = precheck_obj.precheck_result() if "[precheck_passed" not in detailed_output_of_precheck: detailed_output_of_remediation = precheck_obj.auto_remediation() if detailed_output_of_remediation: # Logging the details of updated objects list_to_store_detailed_output.append( detailed_output_of_remediation) list_to_store_detailed_output.append( f"[transaction_id={self.transaction_id}] " f"[precheck_id={precheck_id}] " "The error identified by the upgrade readiness check was fixed." ) else: list_to_store_detailed_output.append( f"[transaction_id={self.transaction_id}] " f"[precheck_id={precheck_id}] " "No automatic fix ran because no error exists." ) # converting dict into list list_of_updated_objs = list(itsi_muh.dict_of_original_objs.values()) # update into kvstore self.update_kvstore_objects(list_of_updated_objs) # to print in log file for detailed_output in list_to_store_detailed_output: logger.info(detailed_output) logger.info( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " "The errors identified by the upgrade readiness check were fixed." ) # perform another precheck after remediation is done logger.info( f"[transaction_id={self.transaction_id}] " f"[operation_mode={MODES_STR_MAP[operation_mode]}] " "Running another precheck after completing remediation." ) self.set_prechecks(None) self.perform_precheck(itsi_muh) return True, "" def setup_data(self): self.imv = ItsiModelValidator(self.sessionkey, logger) self.imv.check_service_templates() self.imv.check_services() self.imv.get_services_and_service_templates() self.imv.check_kpis_in_collections() self.imv.get_services_having_no_entity_filter_rule() self.imv.get_kpi_base_search_having_service_entity_filter_rule() self.imv.get_base_search_id_having_service_entity_filter_rule() self.imv.get_kpi_base_searches() self.imv.get_service_templates_with_correct_services() self.imv.get_entities_services_map() if not self.dangling_service_reference_in_entities_disabled else None self.mi_method = MigrationBaseMethod(self.sessionkey, logger=logger) self.owner = "nobody" # Set the threshold from the limits.conf file self.large_number_of_kpis_per_base_search_threshold = int(get_conf_stanza_single_entry( self.sessionkey, 'itsi_settings', 'upgrade_readiness', 'large_number_of_kpis_per_base_search_threshold').get('content', 600)) # List of KPIs having empty threshold field self.kpi_with_empty_threshold_field = [] # List of KPI Base Searches having no metrics configured self.kpi_base_search_empty_metrics = [] self.get_kpi_with_empty_threshold_field() # List of entities link to each services self.list_of_entities_link_to_service = dict( self.imv.list_of_entities_link_to_service ) self.service_objects_with_dangling_service_refs = ( self.imv.service_objects_with_dangling_service_refs ) self.kpi_base_search_having_service_entity_filter_rule = list( self.imv.kpi_base_search_id_having_entity_filter_rule_enabled ) # seperating the service_objects_with_dangling_service_refs into 2 different # objs with dangling services depending on me references and # objs with dangling services depends on references self.service_objects_with_dangling_services_depending_on_me_refs = defaultdict( list ) self.service_objects_with_dangling_services_depends_on_refs = defaultdict( list) for service_obj in self.service_objects_with_dangling_service_refs: service_id = service_obj["_key"] for services_depends_on_obj in service_obj["services_depends_on"]: if "serviceid" in services_depends_on_obj: self.service_objects_with_dangling_services_depends_on_refs[ service_id ].append(services_depends_on_obj["serviceid"]) for services_depending_on_me_obj in service_obj["services_depending_on_me"]: if "serviceid" in services_depending_on_me_obj: self.service_objects_with_dangling_services_depending_on_me_refs[ service_id ].append(services_depending_on_me_obj["serviceid"]) self.service_template_id_to_missing_linked_services_map = ( self.imv.service_template_id_to_missing_linked_services_map ) self.enabled_services_with_no_entity_filter_rule = ( self.imv.enabled_services_with_no_entity_filter_rule ) self.service_template_objects_missing_expected_linked_services_map = ( self.imv.service_template_objects_missing_expected_linked_services_map ) self.service_templates_with_linked_services = ( self.imv.service_templates_with_linked_services ) self.services_without_base_service_template_id = ( self.imv.services_without_base_service_template_id ) self.service_objects_with_missing_depends_service_refs_map = ( self.imv.service_objects_with_missing_depends_service_refs_map ) self.objs_with_corrupt_kpis = self.imv.objs_with_corrupt_kpis self.objs_with_dangling_shared_base_search_kpis = ( self.imv.objs_with_dangling_shared_base_search_kpis ) self.objs_with_dangling_kpi_threshold_templates = ( self.imv.objs_with_dangling_kpi_threshold_templates ) self.service_template_bad_sync_status = ( self.imv.service_template_bad_sync_status ) self.entity_interface = ItsiEntity(self.sessionkey, "nobody") self.service_interface = ItsiService(self.sessionkey, "nobody") self.service_template_interface = ItsiBaseServiceTemplate( self.sessionkey, "nobody" ) self.kpi_base_search_interface = ItsiKPIBaseSearch( self.sessionkey, "nobody" ) self.kpi_base_search_with_count_of_kpis = self.imv.kpi_base_search_with_count_of_kpis self.service_template_with_correct_linked_services = self.imv.service_template_with_correct_linked_services # fetch all the objects from kvstore self.original_entities = self.entity_interface.get_bulk( "nobody", transaction_id=None ) self.original_services = self.service_interface.get_bulk( "nobody", transaction_id=None ) self.original_base_service_templates = self.service_template_interface.get_bulk( "nobody", transaction_id=None ) self.original_kpi_templates = self.service_interface.get_bulk( "nobody", transaction_id=None ) self.original_kpi_base_search = self.kpi_base_search_interface.get_bulk( "nobody", transaction_id=None ) self.dict_of_original_entities = { entity["_key"]: entity for entity in self.original_entities } self.dict_of_original_services = { service["_key"]: service for service in self.original_services } self.dict_of_original_base_service_templates = { base_service_template["_key"]: base_service_template for base_service_template in self.original_base_service_templates } self.dict_of_original_kpi_templates = { kpi_template["_key"]: kpi_template for kpi_template in self.original_kpi_templates } self.dict_of_original_kpi_base_search = { kpi_base_search["_key"]: kpi_base_search for kpi_base_search in self.original_kpi_base_search } self.list_of_dicts = [ self.dict_of_original_base_service_templates, self.dict_of_original_kpi_templates, self.dict_of_original_services, self.dict_of_original_entities, self.dict_of_original_kpi_base_search, ] self.dict_of_original_objs = {} for self.item in self.list_of_dicts: self.dict_of_original_objs.update(self.item) self.all_updated_objs = [] def update_kvstore_objects(self, updated_objects): """ Takes list of updated objects after performing auto-remediation and then stores them in kvstore using rest call : param updated_objects: list of updated objects """ updated_base_service_templates = [] updated_services = [] updated_kpi_templates = [] updated_kpi_base_search = [] for updated_obj in updated_objects: if updated_obj["object_type"] == "base_service_template": updated_base_service_templates.append(updated_obj) elif updated_obj["object_type"] == "service" or updated_obj["object_type"] == "entity": updated_services.append(updated_obj) elif updated_obj["object_type"] == "kpi_template": updated_kpi_templates.append(updated_obj) elif updated_obj["object_type"] == "kpi_base_search": updated_kpi_base_search.append(updated_obj) # Saving the updated objects in the backend if updated_base_service_templates: self.service_template_interface.batch_save_backend( "nobody", updated_base_service_templates, transaction_id=None ) if updated_services: self.service_interface.batch_save_backend( "nobody", updated_services, transaction_id=None ) if updated_kpi_templates: self.service_interface.batch_save_backend( "nobody", updated_kpi_templates, transaction_id=None ) if updated_kpi_base_search: self.kpi_base_search_interface.batch_save_backend( "nobody", updated_kpi_base_search, transaction_id=None ) def make_logging_details( self, precheck_id, total_count, severity, category, block_upgrade ): logging_details = ( "[transaction_id={}] [precheck_id={}] [total_count={}] " '[severity={}] [category="{}"] [blocks_upgrade={}] '.format( self.transaction_id, precheck_id, total_count, severity, category, block_upgrade, ) ) return logging_details def make_result(self, passed, precheck, message, object_type, object_id, **kwargs): """ Takes required parameters of the object that failed the prechecks and then returns the dict of all these parameters so that proper formatted result will get logged in the log file : param passed: boolean for prechecks failed or passed : param precheck: precheck type : param message: required message to be added : param object_type: type of the object (service, base service template, kpi template) : param object_id: '_key' of the object : return: dict of these parameters received """ precheck_dict = { "passed": passed, "pre-check": precheck, "message": message, "Object_type": object_type, "object_id": object_id, } precheck_dict.update(kwargs) return precheck_dict def _check_empty_threshold_kpi(self, kpi): """ Check if the threshold fields in the KPIs of service is empty 1.If the KPI is shared base verify the metric field in the base search used by the KPI 2.If not then consider threshold field :param kpi: ITSI Object for KPI :return : None """ if not kpi.get("threshold_field", "") and ( kpi.get("search_type", "") == "shared_base" and kpi.get("base_search_id", "") in self.kpi_base_search_empty_metrics or kpi.get("search_type", "") == "adhoc" ): self.kpi_with_empty_threshold_field.append(kpi) def _check_threshold_in_kpi(self, service_obj): """ Check the threhold field in each KPI :param service_obj: ITSI object with services and KPIs :return : None """ kpi_list = service_obj.get("kpis", []) def check_kpis(kpi): if kpi["title"] != "ServiceHealthScore": return True else: return False filtered_kpi_list = filter(check_kpis, kpi_list) for kpi in filtered_kpi_list: self._check_empty_threshold_kpi(kpi) def _check_empty_metrics_for_kpi_base_search(self): """ Check for the empty metrics field in KPI Base search :param : None :return : None """ kpi_base_search_object = ItsiKPIBaseSearch(self.sessionkey, "unknown") all_objects = kpi_base_search_object.get_bulk(owner=self.owner) # Find all the objects that have empty metrics field def get_kpi_base_obj_keys(kpi_base_obj): if len(kpi_base_obj.get("metrics", [])) == 0: return True else: return False get_filtered_kpi_base_search_objs = filter( get_kpi_base_obj_keys, all_objects) for kpi_base_obj in get_filtered_kpi_base_search_objs: self.kpi_base_search_empty_metrics.append(kpi_base_obj.get("_key")) def get_kpi_with_empty_threshold_field(self): """ Get the KPIs that have empty threshold fields for each Service : param : None : return : None """ service_iter = self.mi_method.migration_get( "service", limit=get_object_batch_size(self.sessionkey), get_raw=True, fields=["_key", "identifying_name", "object_type", "kpis"], source_kvstore=True, get_raw_kwargs={"collection": "itsi_services"}, filter_data={"kpis.threshold_field": ""}, ) self._check_empty_metrics_for_kpi_base_search() for service in service_iter: self._check_threshold_in_kpi(service) def parse_conf_file(self, file_path, stanza): """ Get conf stanza from file : param file_path : file located at path : param stanza : stanza name to get values of : return : dict of key/values """ result = {} current_stanza = None if not os.path.isfile(file_path): return result with open(file_path, "r") as file: for line in file: line = line.strip() if line.startswith("[") and line.endswith("]"): current_stanza = line[1:-1] elif current_stanza == stanza and "=" in line: key, value = line.split("=", 1) result[key.strip()] = value.strip() return result def parse_authorize_conf(self, file_path): """ Parse the authorize.conf file and return a dictionary of roles and their imported roles. : param file_path : The file path of the authorize.conf file. : return : A dictionary containing roles and their imported roles. """ roles = {} if not os.path.isfile(file_path): return roles with open(file_path, "r") as file: lines = file.readlines() current_role = None for line in lines: line = line.strip() if line.startswith("[") and line.endswith("]"): current_role = line[1:-1] if current_role.startswith("role_"): roles[current_role] = [] elif line.startswith("importRoles"): if current_role is not None: import_roles = line.split("=")[1].strip().split(";") roles[current_role].extend(import_roles) return roles def get_all_import_roles(self, role, conf_files): """ Get all the roles imported by the given role from the specified configuration files. : param role : The role to get imported roles for. : param conf_files : A list of configuration files to parse. : return : A list of roles imported by the given role. """ roles_data = {} for conf_file in conf_files: parsed_role = self.parse_authorize_conf(conf_file) for key, value in parsed_role.items(): if value: roles_data[key] = value def filter_roles(role_dict): updated_dict = {} for key, value in role_dict.items(): updated_key = key.replace("role_", "") updated_dict[updated_key] = value return updated_dict filtered_roles = filter_roles(roles_data) def get_nested_roles(role_dict, target_role): if target_role not in role_dict: return [] nested_roles = [] stack = [target_role] while stack: current_role = stack.pop() nested_roles.append(current_role) if current_role in role_dict: stack.extend(role_dict[current_role]) return nested_roles all_roles = get_nested_roles(filtered_roles, role) return list(set(all_roles)) def get_capabilities(self, file_paths, roles): """ Get the capabilities for the given roles from the specified configuration files. : param file_paths : A list of file paths for the configuration files to parse. : param roles : A list of roles to get capabilities for. : return : A dictionary containing the capabilities for the given roles. """ all_capabilities = {} for file_path in file_paths: for role in roles: capabilities = self.parse_conf_file(file_path, role) all_capabilities.update(capabilities) return { key: value for key, value in all_capabilities.items() if value in ["enabled", "disabled"] } def get_native_capabilities( self, itsi_app_default_file_path, system_default_file_path, system_local_default_path, metric_ad_authorize_conf_file, role, ): """ Get the native capabilities for the given role. : param itsi_app_default_file_path : Default file path of authorize.conf : param role : Role to get native capabilities : return native_capabilities : Dict containing native capabilities of given role """ default_capabilities = self.parse_conf_file( itsi_app_default_file_path, "role_itoa_admin" ) imported_roles = self.get_all_import_roles( role, [ itsi_app_default_file_path, system_default_file_path, system_local_default_path, metric_ad_authorize_conf_file, ], ) all_imported_roles = ["role_" + value for value in imported_roles] inherited_capabilities = self.get_capabilities( [ itsi_app_default_file_path, system_default_file_path, system_local_default_path, metric_ad_authorize_conf_file, ], all_imported_roles, ) native_capabilities = { key: value for key, value in default_capabilities.items() if key not in inherited_capabilities } return native_capabilities def get_disabled_capabilities(self, role): """ Get capabilities that have been disabled by the user's for given role : param role : role for which capabilities have been disabled : return : Dict of disabled capabilities in context of itsi, and system """ system_local_file_path = make_splunkhome_path( ["etc", "system", "local", "authorize.conf"] ) disabled_capabilities = {} if os.path.isfile(system_local_file_path): system_modified_capabilities = self.parse_conf_file( system_local_file_path, "role_" + role ) if system_modified_capabilities: disabled_capabilities["system"] = { key: value for key, value in system_modified_capabilities.items() if value == "disabled" } return disabled_capabilities def get_modified_capabilities(self, native_capabilities, updated_capabilities): """ Get the modified capabilities based on the native capabilities and the updated capabilities. : param native_capabilities : A dictionary containing native capabilities. : param updated_capabilities : A dictionary containing updated capabilities. : return : A list of modified capabilities. """ native_capability_keys = set(native_capabilities.keys()) modified_capability_keys = set() for value in updated_capabilities.values(): if isinstance(value, dict): modified_capability_keys.update(value.keys()) common_capabilities = native_capability_keys.intersection( modified_capability_keys ) return list(common_capabilities) def get_modified_capability_for_current_user( self, current_user_context, modified_native_capabilities, native_user_capabilities, ): """ Determine the modified capabilities for the current user based on the native capabilities and the modified capabilities. : param current_user_context : A dictionary containing information about the current user, including their username and capabilities. : param modified_native_capabilities : A list of capabilities that have been modified from their native state. : param native_user_capabilities : A list of capabilities that are native to the user. : return : A list of capabilities that have been modified for the current user. """ removed_capability = [] filtered_native_user_capabilities = [ key for key, value in native_user_capabilities.items() if value.lower() in ["enabled", "disabled"] ] if current_user_context.get("username") in ["admin", "splunk-system-user"]: return modified_native_capabilities else: for capability in filtered_native_user_capabilities: if capability not in current_user_context.get("capabilities"): removed_capability.append(capability) return removed_capability