# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import time import sys from ITOA.setup_logging import getLogger from itsi.upgrade.itsi_migration_log import PrefixLogger from itsi.itsi_utils import ITOAInterfaceUtils from splunk.clilib.bundle_paths import make_splunkhome_path sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common'])) from SA_ITOA_app_common.solnlib.server_info import ServerInfo from ITOA.itoa_factory import instantiate_object class MigrationSupervisor(object): def __init__(self, session_key): self.logger = getLogger(logger_name='itsi.supervisor') self.ui_logger = PrefixLogger(prefix='UI', logger=self.logger) self.session_key = session_key self.info = ServerInfo(self.session_key) def _drop_key(self, dictionary, key): """ Helper method to remove properties from dictionary """ key in dictionary and dictionary.pop(key) def is_migration_running(self, should_return_verbose=False): """ Checks a semaphore from any migration supervisor in current deployment Returns: bool: True when migration is in progress, False otherwise. """ entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key) self._drop_key(entry, '_user') self._drop_key(entry, 'id_') is_running = entry.get('is_running') if not is_running: self.logger.info('Migration is not running.') if should_return_verbose: return entry return False self.logger.info('Migration is running.') self._drop_key(entry, 'has_succeeded') self._drop_key(entry, 'end_timestamp') return True if not should_return_verbose else entry def prepare_knowledge_objects_report(self): """ Checks general ITSI knowledge objects counts. This will give ability to estimate time to run data migrations """ counts = {} from itsi.objects.object_manifest import object_manifest for object_type in object_manifest: obj = instantiate_object(self.session_key, 'nobody', object_type, logger=self.logger) results = obj.get_bulk( 'nobody', fields=['_key'], req_source='migration_report' ) counts[object_type] = len(results) entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key) report = { 'object_counts': counts, 'timestamp': time.time(), } return ITOAInterfaceUtils.append_data_to_migration_status_kv(self.session_key, entry, knowledge_objects_report=report) def prepare_for_migration(self): """ Creates persistent record of the migration, blocks some app functionality to allow migration procedures to proceed Returns: bool: True when pre-migration procedures are completed and current process can perform migration, False otherwise. """ # check if running, disable mod inputs self.logger.info('Start migration preparation...') if self.is_migration_running(): return False current_timestamp = time.time() entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key) self.ui_logger.info('Successfully prepared migration.') self._drop_key(entry, 'has_succeeded') self._drop_key(entry, 'end_timestamp') return ITOAInterfaceUtils.append_data_to_migration_status_kv(self.session_key, entry, is_running=True, start_timestamp=current_timestamp) def done(self, has_succeeded): """ Performs clean up sequence upon migration process finish. This includes successful and failed migrations """ if not self.is_migration_running(): return False if has_succeeded: self.ui_logger.info('Migration has succeeded.') else: self.ui_logger.info('Migration has failed.') entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key) is_done = ITOAInterfaceUtils.append_data_to_migration_status_kv( self.session_key, entry, is_running=False, end_timestamp=time.time(), has_succeeded=has_succeeded, ) return is_done