You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

120 lines
4.6 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import time
import sys
from ITOA.setup_logging import getLogger
from itsi.upgrade.itsi_migration_log import PrefixLogger
from itsi.itsi_utils import ITOAInterfaceUtils
from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
from SA_ITOA_app_common.solnlib.server_info import ServerInfo
from ITOA.itoa_factory import instantiate_object
class MigrationSupervisor(object):
    """
    Tracks whether a migration is in progress through the migration status
    entry read with ITOAInterfaceUtils.get_migration_status_from_kv and
    written with ITOAInterfaceUtils.append_data_to_migration_status_kv.
    """

    def __init__(self, session_key):
        """
        Args:
            session_key: Splunk session key passed to every status read/write
                and to the ServerInfo helper.
        """
        self.logger = getLogger(logger_name='itsi.supervisor')
        # Messages sent through ui_logger carry the 'UI' prefix.
        self.ui_logger = PrefixLogger(prefix='UI', logger=self.logger)
        self.session_key = session_key
        self.info = ServerInfo(self.session_key)

    def _drop_key(self, dictionary, key):
        """
        Helper method to remove a property from a dictionary in place.
        A missing key is ignored. Always returns None.
        """
        dictionary.pop(key, None)

    def is_migration_running(self, should_return_verbose=False):
        """
        Checks a semaphore from any migration supervisor
        in current deployment

        Args:
            should_return_verbose: when True, return the status entry itself
                instead of a plain boolean.

        Returns:
            bool: True when migration is in progress, False otherwise
                (only when should_return_verbose is False).
            dict: when should_return_verbose is True, the status entry with
                '_user' and 'id_' removed; if a migration is running,
                'has_succeeded' and 'end_timestamp' are removed as well.
        """
        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        self._drop_key(entry, '_user')
        self._drop_key(entry, 'id_')

        if not entry.get('is_running'):
            self.logger.info('Migration is not running.')
            return entry if should_return_verbose else False

        self.logger.info('Migration is running.')
        # Outcome fields are dropped while a run is still in progress.
        self._drop_key(entry, 'has_succeeded')
        self._drop_key(entry, 'end_timestamp')
        return entry if should_return_verbose else True

    def prepare_knowledge_objects_report(self):
        """
        Checks general ITSI knowledge objects counts.
        This will give ability to estimate time to run data migrations

        Returns:
            Whatever ITOAInterfaceUtils.append_data_to_migration_status_kv
            returns after the report is attached to the status entry as
            'knowledge_objects_report'.
        """
        # Kept as a function-level import, as in the original code.
        # NOTE(review): presumably deferred to avoid an import cycle - confirm.
        from itsi.objects.object_manifest import object_manifest

        counts = {}
        for object_type in object_manifest:
            obj = instantiate_object(self.session_key, 'nobody', object_type, logger=self.logger)
            # Only '_key' is requested because just the number of objects matters.
            results = obj.get_bulk(
                'nobody',
                fields=['_key'],
                req_source='migration_report'
            )
            counts[object_type] = len(results)

        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        report = {
            'object_counts': counts,
            'timestamp': time.time(),
        }
        return ITOAInterfaceUtils.append_data_to_migration_status_kv(
            self.session_key,
            entry,
            knowledge_objects_report=report
        )

    def prepare_for_migration(self):
        """
        Creates persistent record of the migration by marking the status
        entry as running with a start timestamp.

        NOTE(review): the original comment here mentioned disabling modular
        inputs; no code in this method does that - confirm where it happens.

        Returns:
            False when a migration is already running. Otherwise the result
            of the status write, which is truthy when the current process
            can perform migration (presumably a bool - confirm against
            ITOAInterfaceUtils).
        """
        self.logger.info('Start migration preparation...')
        if self.is_migration_running():
            return False

        current_timestamp = time.time()
        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        # Clear the outcome of any previous run before starting a new one.
        self._drop_key(entry, 'has_succeeded')
        self._drop_key(entry, 'end_timestamp')

        is_prepared = ITOAInterfaceUtils.append_data_to_migration_status_kv(
            self.session_key,
            entry,
            is_running=True,
            start_timestamp=current_timestamp
        )
        # Report success only after the status write has actually happened.
        if is_prepared:
            self.ui_logger.info('Successfully prepared migration.')
        else:
            self.logger.error('Failed to persist migration status; migration is not prepared.')
        return is_prepared

    def done(self, has_succeeded):
        """
        Performs clean up sequence upon migration process finish.
        This includes successful and failed migrations

        Args:
            has_succeeded: outcome of the migration, stored in the status entry.

        Returns:
            False when no migration is running. Otherwise the result of the
            status write that clears 'is_running' and records
            'end_timestamp' and 'has_succeeded'.
        """
        if not self.is_migration_running():
            return False

        if has_succeeded:
            self.ui_logger.info('Migration has succeeded.')
        else:
            self.ui_logger.info('Migration has failed.')

        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        is_done = ITOAInterfaceUtils.append_data_to_migration_status_kv(
            self.session_key,
            entry,
            is_running=False,
            end_timestamp=time.time(),
            has_succeeded=has_succeeded,
        )
        return is_done