You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

425 lines
19 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import glob
import os
from abc import ABCMeta, abstractmethod
import time
import splunk
from itsi.upgrade.itsi_migration_log import logger, getMigrationLogger, PrefixLogger
from ITOA.version_check import VersionCheck
from ITOA.itoa_common import FileManager
from ITOA.saved_search_utility import SavedSearch
from .object_interface.migration_config import get_registered_migration_handler
from . import utils
from splunk.clilib.bundle_paths import make_splunkhome_path
from itsi.itsi_utils import ITOAInterfaceUtils
# Default number of records requested per batch by
# MigrationFunctionAbstract.get_object_iterator().
CHUNK_SIZE = 250


class MigrationFunctionAbstract(metaclass=ABCMeta):
    '''
    Base class for app migration handling.

    Subclasses must implement execute(). prepare() and rollback() are optional
    hooks which report success unless overridden.
    '''

    def __init__(self, session_key, logger=None):
        '''
        Initialize
        :param session_key: splunk session key
        :param logger: logger to use; when omitted (or None) the logger is obtained from
            getMigrationLogger() at call time instead of once at import time
        :return: nothing
        '''
        if logger is None:
            logger = getMigrationLogger()
        self.mi_method = MigrationBaseMethod(session_key, logger=logger)
        self.session_key = session_key
        self.logger = logger
        self.ui_logger = PrefixLogger(prefix='UI', logger=self.logger)

    def rollback(self):
        '''
        Rollback function, which is called if execute operation fails
        This function must return status of operation
            True - if operation is successful
            False - if operation is failed
        :return boolean: status if operation is a success or failure
        '''
        return True

    def prepare(self):
        '''
        Prepare function which is called before execute operation, if it fails then execute
        function won't get called
        This function must return status of operation
            True - if operation is successful
            False - if operation is failed
        :return boolean: status if operation is successful or fail
        '''
        return True

    @abstractmethod
    def execute(self):
        '''
        This function must return status of operation
            True - if operation is successful
            False - if operation is failed
        :return boolean: status if operation is successful or fail
        '''
        pass

    def get_object_iterator(self, object_type, limit=CHUNK_SIZE, **kwargs):
        """
        Migration Base Class method to get records.
        Delegates to MigrationBaseMethod.migration_get; extra keyword arguments are
        passed through unchanged.
        @type object_type: basestring
        @param object_type: ITSI object types
        @type limit: int
        @param limit: get bulk batch size, defaults to CHUNK_SIZE
        @return: whatever the handler's migration_get returns (an object iterator)
        """
        return self.mi_method.migration_get(object_type, limit, **kwargs)

    def save_object(self, object_type, data_list, rolling_number=0):
        """
        Migration Base Class method to save records.
        Delegates to MigrationBaseMethod.migration_save.
        @type object_type: basestring
        @param object_type: ITSI object types
        @type data_list: list
        @param data_list: list of json objects to be saved
        @type rolling_number: int
        @param rolling_number: where the rolling number starts from
        @return: boolean
        """
        return self.mi_method.migration_save(object_type, data_list, rolling_number)

    def save_single_object(self, object_type, data):
        """
        Migration Base Class method to save single record.
        @type object_type: basestring
        @param object_type: ITSI object types
        @type data: object
        @param data: object to be saved
        @return: boolean
        """
        # NOTE(review): the delegate's second positional parameter is `validation`, so
        # `data` is received there as the validation flag rather than as a record to
        # store. Behaviour is kept as-is; confirm the intended contract with callers.
        return self.mi_method.migration_save_single_object_to_kvstore(object_type, data)
class MigrationBaseMethod(object):
    """
    Base class which contains general migration methods.

    Every public method looks up the handler registered for the given object type
    and delegates to the method of the same name on that handler.
    """

    def __init__(self, session_key, dupname_tag=None, logger=None, remove_migration_folder=False):
        """
        @type session_key: basestring
        @param session_key: splunk session key
        @type dupname_tag: basestring
        @param dupname_tag: a special tag to the duplicated titles; stored on the instance
        @type logger: logger
        @param logger: logger to use; when omitted (or None) it is obtained from
            getMigrationLogger() at call time instead of once at import time
        @type remove_migration_folder: boolean
        @param remove_migration_folder: when True, an existing migration_helper folder is
            deleted before the folder is (re)created
        """
        if logger is None:
            logger = getMigrationLogger()
        self.session_key = session_key
        self.dupname_tag = dupname_tag
        self.logger = logger
        self.handler_manifest = get_registered_migration_handler()
        self.migration_helper_directory = make_splunkhome_path(['var', 'itsi', 'migration_helper'])
        if remove_migration_folder:
            self.cleanup_local_storage()
        if not FileManager.is_exists(self.migration_helper_directory):
            FileManager.create_directory(self.migration_helper_directory)
            self.logger.info('Created the migration_helper folder at path %s', self.migration_helper_directory)

    def _get_handler_for_object_type(self, object_type):
        """
        A method to obtain the class handler based on the object type.
        All object classes are defined in the migration_manifest file.
        Falls back to the "noop" manifest entry when no "base" handler is registered.
        @type object_type: basestring
        @param object_type: ITSI object types
        @return: a handler instance built with this object's session key, helper
            directory and logger.
        """
        manifest_handler = self.handler_manifest.get(object_type, None)
        handler = manifest_handler.get("base", None) if manifest_handler else None
        if not handler:
            message = 'No valid handler found for object_type: {0}, grab a noop handler.'.format(object_type)
            self.logger.debug(message)
            handler = self.handler_manifest.get("noop").get("base")
        # Hand over the instance logger so a logger supplied to __init__ is honoured by
        # the handlers as well (the module-level logger was passed here before).
        return handler(self.session_key, self.migration_helper_directory, self.logger)

    def migration_get(self, object_type, limit=100, **kwargs):
        """
        A wrap method to get an iterator based on the object type
        @type object_type: basestring
        @param object_type: ITSI object types
        @type limit: int
        @param limit: max count of objects being fetched
        @return: an object iterator.
        """
        handler = self._get_handler_for_object_type(object_type)
        return handler.migration_get(object_type, limit, **kwargs)

    def migration_save(self, object_type, data_list, rolling_number=0):
        """
        A wrap method to save incoming object data to local storage
        @type object_type: basestring
        @param object_type: ITSI object types
        @type data_list: list
        @param data_list: Actual data objects
        @type rolling_number: int
        @param rolling_number: where the rolling number starts from
        @return: boolean
        """
        handler = self._get_handler_for_object_type(object_type)
        return handler.migration_save(object_type, data_list, rolling_number)

    def migration_delete_kvstore(self, object_type):
        """
        A wrap method to delete content from the kvstore for the object type
        @type object_type: basestring
        @param object_type: ITSI object types
        @return: boolean
        """
        handler = self._get_handler_for_object_type(object_type)
        return handler.migration_delete_kvstore(object_type)

    def migration_delete_kvstore_object(self, object_type, object_key):
        """
        A wrap method to delete a single object from the kvstore
        @type object_type: basestring
        @param object_type: ITSI object types
        @type object_key: basestring
        @param object_key: ITSI object Key
        @return: boolean
        """
        handler = self._get_handler_for_object_type(object_type)
        return handler.migration_delete_kvstore_object(object_type, object_key)

    def migration_save_single_object_to_kvstore(self, object_type, validation=True, dupname_tag=None,
                                                skip_local_failure=False):
        """
        A wrap method to save content to the kvstore for a single object type.
        The data comes from the local storage.
        @type object_type: basestring
        @param object_type: ITSI object types
        @type validation: boolean
        @param validation: require validation when saving to kvstore
        @type dupname_tag: basestring
        @param dupname_tag: a special tag to the duplicated titles.
        @type skip_local_failure: boolean
        @param skip_local_failure: forwarded to the handler by keyword, exactly as
            migration_bulk_save_to_kvstore does
        @return: boolean
        """
        handler = self._get_handler_for_object_type(object_type)
        # skip_local_failure used to be accepted and then dropped; forward it so the
        # caller's choice reaches the handler.
        return handler.migration_save_single_object_to_kvstore(object_type, validation, dupname_tag,
                                                               skip_local_failure=skip_local_failure)

    def migration_bulk_save_to_kvstore(self, validation=True, dupname_tag=None, skip_local_failure=False,
                                       transaction_id=None):
        """
        A public method to save content to the kvstore for all objects.
        The incoming data comes from the local storage.
        @type validation: boolean
        @param validation: require validation when saving to kvstore
        @type dupname_tag: basestring
        @param dupname_tag: a special tag to the duplicated titles.
        @type skip_local_failure: boolean
        @param skip_local_failure: forwarded to every handler
        @type transaction_id: basestring
        @param transaction_id: forwarded to every handler
        @return: tuple (status, message); status is False and message carries the error
            text when any handler raised, otherwise (True, '')
        """
        status = True
        bulk_save_msg = ''
        try:
            target_directory = os.path.join(os.path.sep, self.migration_helper_directory, "*")
            # The object type is the part of each file name before the first '___';
            # a set keeps one entry per type.
            object_types = {os.path.split(file_name)[-1].split('___')[0]
                            for file_name in glob.glob(target_directory)}

            # Most objects after services have same priority, so order by weight first and
            # alphabetically within the same weight to ensure that migrations/restores are
            # performed in a consistent order (see utils._get_object_order())
            sorted_object_type_list = sorted(
                object_types,
                key=lambda object_type: (utils._get_object_order(object_type), object_type))
            self.logger.info('Restoring order as the following: %s', sorted_object_type_list)

            for object_type in sorted_object_type_list:
                self.logger.info('Saving content of object_type: %s into KV store.', object_type)
                handler = self._get_handler_for_object_type(object_type)
                handler.migration_save_single_object_to_kvstore(object_type=object_type,
                                                                validation=validation,
                                                                dupname_tag=dupname_tag,
                                                                skip_local_failure=skip_local_failure,
                                                                transaction_id=transaction_id)
        except splunk.BadRequest as brq:
            self.logger.exception(brq)
            bulk_save_msg = brq.get_extended_message_text()
            status = False
        except Exception as e:
            self.logger.exception(e)
            status = False
            bulk_save_msg = str(e)

        # The local storage is removed whether or not the save succeeded.
        self.cleanup_local_storage()
        return status, bulk_save_msg

    def cleanup_local_storage(self):
        """
        A utility function to remove the local storage.
        Best effort: a failure is logged (with traceback) and not re-raised.
        """
        try:
            if FileManager.is_exists(self.migration_helper_directory):
                FileManager.delete_working_directory(self.migration_helper_directory)
                self.logger.info('Deleted the migration_helper folder at path %s', self.migration_helper_directory)
        except Exception:
            self.logger.exception('Error in deleting %s !', self.migration_helper_directory)
class MigrationBase(object):
    '''
    Base class which has some basic required function
    '''

    def __init__(self, from_version, to_version, ignore_build_number=True, logger=None):
        '''
        Initialize
        :param from_version: from version we need to migration
        :param to_version: to version number perform migration
        :param ignore_build_number: if we need to ignore build number and version suffix after
            (major, minor, update) version; stored only, not acted upon yet (see TODO below)
        :param logger: logger to use; when omitted (or None) it is obtained from
            getMigrationLogger() at call time instead of once at import time
        :raises ValueError: when from_version or to_version fails validation
        :return: nothing
        '''
        # from_version is checked first so the error reported for two bad versions
        # names the same one as before.
        for version in (from_version, to_version):
            if not self._validate_version(version):
                raise ValueError('Invalid version:{0}.'.format(version))
        if logger is None:
            logger = getMigrationLogger()

        self.to_version = to_version
        self.from_version = from_version
        self.ignore_build_number = ignore_build_number
        self.logger = logger
        # TODO implement functionality ignore_build_number is set to true

    def _validate_version(self, version):
        '''
        :param version: version string to check
        :return: result of VersionCheck.validate_version for the given version
        '''
        return VersionCheck.validate_version(version)

    def _is_migration_required(self):
        '''
        Decide from VersionCheck.compare(to_version, from_version) whether to migrate.
        :return boolean: True only when the comparison returns 1; False when it returns
            0 or -1, and False (with an error logged) for any other value
        '''
        comp = VersionCheck.compare(self.to_version, self.from_version)
        if comp in (0, -1):
            return False
        if comp == 1:
            self.logger.info('Version compare between %s and %s indicates that migration is required',
                             self.from_version, self.to_version)
            return True
        self.logger.error('Version compare between %s and %s did not return expected value',
                          self.from_version, self.to_version)
        return False
class Migration(MigrationBase):
    '''
    Migration call to perform any migration operation
    '''

    def __init__(self, from_version, to_version, session_key, ignore_build_number=True, logger=None):
        '''
        Initialize the call
        :param from_version: from version we need to migration
        :param to_version: to version number perform migration
        :param session_key: splunk session key
        :param ignore_build_number: if we need to ignore build number and version suffix after
            (major, minor, update) version
        :param logger: logger to use; when omitted (or None) it is obtained from
            getMigrationLogger() at call time instead of once at import time
        :return: nothing
        '''
        if logger is None:
            logger = getMigrationLogger()
        super(Migration, self).__init__(from_version, to_version, ignore_build_number, logger)
        self.session_key = session_key
        self.is_execution_successful = True
        self.migration_functions = []
        self.logger = logger
        self.ui_logger = PrefixLogger(prefix='UI', logger=self.logger)

    def add(self, migration_function):
        '''
        Add migration function into the list.
        :param migration_function: migration function
        :raises ValueError: if migration_function is not a MigrationFunctionAbstract instance
        :return: nothing
        '''
        if not isinstance(migration_function, MigrationFunctionAbstract):
            self.logger.error('Migration function:%s is not instance of MigrationFunctionAbstract', migration_function)
            raise ValueError('Migration function: {} is not instance of MigrationFunctionAbstract.'.format(
                migration_function))
        self.migration_functions.append(migration_function)

    def run(self):
        '''
        Run the added migration functions in order and record their status.
        The outcome is left in self.is_execution_successful; nothing is returned.
        Execution stops at the first function whose execute() returns a falsy value
        or raises.
        '''
        if not self._is_migration_required():
            # NOTE(review): this only logs; the functions below still run. Kept as-is
            # because callers may rely on it - confirm before adding an early return.
            self.logger.info('Migration is not required from:%s to:%s', self.from_version, self.to_version)

        try:
            self._initialize_migration_functions_statuses_to_kv()
            for command in self.migration_functions:
                prepare_step = command.prepare()
                # Call other steps only if prepare step is successful
                if prepare_step:
                    self._update_status_of_command_to_kv(command, 'In Progress')
                    self.ui_logger.info('Starting {} operation...'.format(command))
                    try:
                        if not command.execute():
                            self.ui_logger.error('Failed to execute operation: {}.'.format(command))
                            self.is_execution_successful = False
                            self._update_status_of_command_to_kv(command, 'Failed')
                            break
                        else:
                            self.ui_logger.info('Successfully executed operation: {}.'.format(command))
                            self._update_status_of_command_to_kv(command, 'Completed')
                    except Exception as e:
                        self.logger.exception(e)
                        self.ui_logger.error('Failed to execute operation: {}.'.format(command))
                        self.is_execution_successful = False
                        self._update_status_of_command_to_kv(command, 'Failed')
                        break
                else:
                    # NOTE(review): a failed prepare marks the function 'Failed' but neither
                    # stops the loop nor clears is_execution_successful - kept as-is.
                    self._update_status_of_command_to_kv(command, 'Failed')
                    self.ui_logger.error('Skipping operation {} since preparation has failed.'.format(command))
        except Exception as e:
            self.logger.exception(e)
            self.is_execution_successful = False

    @staticmethod
    def _migration_function_key(migration_function):
        '''
        Build the status key of a migration function: its str() with every '.' removed.
        (Presumably because '.' is not usable in the stored field name - TODO confirm.)
        '''
        return str(migration_function).replace('.', '')

    def _update_status_of_command_to_kv(self, command, status):
        '''
        Write the status of one migration function into the migration status entry.
        Functions whose class is named 'BackupRestore' are not tracked.
        :param command: migration function whose status changed
        :param status: status text to store
        :return: nothing
        '''
        if command.__class__.__name__ == 'BackupRestore':
            return

        migration_function_key = self._migration_function_key(command)
        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        if 'migration_functions' not in entry:
            entry['migration_functions'] = {}
        # The entry may already hold other functions but not this one; create its slot
        # instead of failing with KeyError on the assignment below.
        if migration_function_key not in entry['migration_functions']:
            entry['migration_functions'][migration_function_key] = {}
        entry['migration_functions'][migration_function_key]['status'] = status

        ITOAInterfaceUtils.append_data_to_migration_status_kv(
            self.session_key,
            entry,
        )

    def _initialize_migration_functions_statuses_to_kv(self):
        '''
        Store every added migration function (except those whose class is named
        'BackupRestore') with status 'Enqueued', together with the current time as
        transform_start_timestamp.
        :return: nothing
        '''
        migration_functions = {}
        for migration_function in self.migration_functions:
            if migration_function.__class__.__name__ != 'BackupRestore':
                migration_function_key = self._migration_function_key(migration_function)
                migration_functions[migration_function_key] = {'title': migration_function_key, 'status': 'Enqueued'}

        entry = ITOAInterfaceUtils.get_migration_status_from_kv(self.session_key)
        ITOAInterfaceUtils.append_data_to_migration_status_kv(
            self.session_key,
            entry,
            migration_functions=migration_functions,
            transform_start_timestamp=time.time()
        )
class EventAnalyticsMigrationAbstract(MigrationFunctionAbstract):
    """
    Base class for EA migration
    """

    def __init__(self, session_key, logger=None):
        """
        Load the 'itsi_event_grouping' saved search so it can be disabled and restored.
        :param session_key: splunk session key
        :param logger: logger to use; when omitted (or None) it is obtained from
            getMigrationLogger() at call time instead of once at import time
        :return: Nothing
        """
        if logger is None:
            logger = getMigrationLogger()
        super(EventAnalyticsMigrationAbstract, self).__init__(session_key, logger=logger)
        self.session_key = session_key
        self.rules_engine_saved_search = SavedSearch.get_search(self.session_key, 'itsi_event_grouping')
        # True only after disable_rules_engine() has actually switched the search off.
        self.rules_engine_changed = False

    def disable_rules_engine(self):
        """
        Disable Rules Engine.
        Does nothing when the saved search is already disabled.
        :return: Nothing
        """
        if int(self.rules_engine_saved_search['disabled']) == 0:
            self.rules_engine_saved_search['disabled'] = 1
            SavedSearch.save_entity(self.session_key, self.rules_engine_saved_search)
            self.rules_engine_changed = True

    def recover_rules_engine(self):
        """
        Recover rules engine to its previous state.
        The search is re-enabled only if disable_rules_engine() disabled it; a search
        that was already disabled beforehand is left disabled.
        :return: Nothing
        """
        if self.rules_engine_changed:
            self.rules_engine_saved_search['disabled'] = 0
            SavedSearch.save_entity(self.session_key, self.rules_engine_saved_search)