# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import glob
import os

from ITOA.itoa_factory import instantiate_object
from ITOA import itoa_common as utils
from .itoa_migration_interface import ITOAMigrationInterface
from ITOA.datamodel_interface import DatamodelInterface
from ITOA.storage import itoa_storage
from itsi.upgrade.itsi_migration_log import PrefixLogger
from itsi.upgrade.migration_utils import render_ui_message_required


class ServiceMigrationInterface(ITOAMigrationInterface):
    """
    Migration class to handle ITOA service objects
    """

    def migration_save_single_object_to_kvstore(self, object_type, validation=True, dupname_tag=None,
                                                skip_local_failure=False, transaction_id=None):
        """
        Method to save service content to the kvstore for a single object type.
        The data is read from the local storage files returned by
        get_object_file_list(); each non-empty file is saved as one batch.

        @type object_type: basestring
        @param object_type: ITSI object types
        @type validation: boolean
        @param validation: when True, save through utils.save_batch; when False,
            write straight to the backend with batch_save_backend
        @type dupname_tag: basestring
        @param dupname_tag: a special tag to the duplicated titles
            (forwarded to utils.save_batch)
        @type skip_local_failure: boolean
        @param skip_local_failure: forwarded unchanged to utils.save_batch
        @type transaction_id: basestring
        @param transaction_id: identifier included in log messages and forwarded
            to utils.save_batch
        @return: None (no value is explicitly returned)
        """
        ui_logger = PrefixLogger(prefix='UI', logger=self.logger)
        self.logger.info("Saving single object: {} with transaction_id: {}".format(object_type, transaction_id))
        target_file_list = self.get_object_file_list(object_type)

        # Fetch all datamodels for datamodel conversions
        self.cached_datamodel_dict = DatamodelInterface.get_all_datamodels(
            self.session_key, '', itoa_storage.ITOAStorage().get_app_name())

        saved_object_count = 0
        for target_file in target_file_list:
            data = utils.FileManager.read_data(target_file)

            # Truthiness (rather than len()) also covers a reader that yields None.
            if not data:
                self.logger.info("No objects of type {} to create with transaction_id:{}".format(
                    object_type, transaction_id))
                continue

            self.convert_invalid_datamodel_kpis_to_adhoc(data)
            mi_obj = instantiate_object(self.session_key, "nobody", object_type, logger=self.logger)
            if validation:
                mi_obj.skip_service_template_update = True
                mi_obj.force_update_savedsearch = True
                utils.save_batch(mi_obj, "nobody", data, no_batch=False, dupname_tag=dupname_tag,
                                 skip_local_failure=skip_local_failure, transaction_id=transaction_id,
                                 skip_service_template_update=True)
            else:
                mi_obj.batch_save_backend("nobody", data)

            # Counts processed files, which is what the progress message below
            # compares against len(target_file_list).
            saved_object_count += 1
            if render_ui_message_required(session_key=self.session_key, app_name='SA-ITOA'):
                ui_logger.info(
                    'Saving {} objects to KV store... '
                    '{} out of {} objects have been saved with transaction_id:{}'.format(
                        object_type, saved_object_count, len(target_file_list), transaction_id
                    )
                )
            self.logger.info("{} {} successfully created in KV store with transaction_id:{}".format(
                len(data), object_type, transaction_id))

    def convert_invalid_datamodel_kpis_to_adhoc(self, data):
        """
        Converts any possible invalid datamodel KPIs to adhoc

        Every KPI whose search_type is 'datamodel' is handed to the KPI object's
        convert_invalid_datamodel_kpi_to_adhoc together with
        self.cached_datamodel_dict, which must be populated before this call.

        @type data: array
        @param data: the service objects from local storage
        @return: None
        """
        kpi_obj = instantiate_object(self.session_key, "nobody", "kpi", logger=self.logger)
        for service in data:
            # 'kpis' may be present but null; treat that like an empty list.
            for kpi in service.get('kpis') or []:
                if kpi.get('search_type', '') != 'datamodel':
                    continue
                # NOTE(review): the messages are labelled "Id" but are filled with
                # the KPI title - confirm which one is intended.
                if kpi_obj.convert_invalid_datamodel_kpi_to_adhoc(kpi, self.cached_datamodel_dict):
                    self.logger.warning('Found KPI (Id: %s) in service "%s" with stale datamodel specification. '
                                        'Auto converting this KPI to adhoc search type to prevent service '
                                        'failures.', kpi.get('title', ''), service.get('title'))
                else:
                    self.logger.info('KPI (Id: %s) in service "%s" was not converted.',
                                     kpi.get('title', ''), service.get('title'))

    def get_object_file_list(self, object_type):
        """
        Return the local storage files that hold objects of the given type.

        Files are matched with the glob pattern "<object_type>___*" inside
        self.migration_helper_directory.

        @type object_type: basestring
        @param object_type: ITSI object types
        @return: list of matching file paths (empty when nothing matches)
        """
        object_type_modifier = object_type + "___*"
        target_file = os.path.join(os.path.sep, self.migration_helper_directory, object_type_modifier)
        target_file_list = glob.glob(target_file)
        self.logger.info("Fetching the local storage target file list: %s", target_file_list)
        return target_file_list