You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
159 lines
7.9 KiB
159 lines
7.9 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
from ITOA.itoa_factory import instantiate_object
|
|
from ITOA import itoa_common as utils
|
|
from ITOA.storage import itoa_storage
|
|
from .base_migration_interface import BaseMigrationInterface
|
|
|
|
|
|
class ITOAMigrationInterface(BaseMigrationInterface):
|
|
"""
|
|
Migration class to handle ITOA objects
|
|
"""
|
|
|
|
def _iterator_from_kvstore(self, object_type, limit, get_raw=False, **kwargs):
|
|
"""
|
|
Helper method to obtain content from kvstore.
|
|
This method is specific to ITOA objects.
|
|
@type object_type: basestring
|
|
@param object_type: type of the object
|
|
@type limit: int
|
|
@param limit: batch limit to pull from kvstore
|
|
@type get_raw: boolean
|
|
@param get_raw: get raw contents instead of processed contents
|
|
"""
|
|
try:
|
|
skip = 0
|
|
mi_obj = None
|
|
if get_raw:
|
|
mi_obj = itoa_storage.ITOAStorage(**kwargs.get('get_raw_kwargs', {}))
|
|
mi_obj.wait_for_storage_init(self.session_key)
|
|
else:
|
|
mi_obj = instantiate_object(self.session_key,
|
|
"nobody",
|
|
object_type,
|
|
logger=self.logger)
|
|
|
|
while True:
|
|
results = None
|
|
if get_raw:
|
|
results = mi_obj.get_all(self.session_key, 'nobody', object_type, sort_key='identifying_name',
|
|
sort_dir=1, skip=skip, limit=limit,
|
|
filter_data=kwargs.get('filter_data', None),
|
|
fields=kwargs.get('fields', None))
|
|
else:
|
|
results = mi_obj.get_bulk("nobody",
|
|
sort_key="_key",
|
|
sort_dir=1,
|
|
skip=skip,
|
|
limit=limit,
|
|
filter_data=kwargs.get('filter_data', None),
|
|
fields=kwargs.get('fields', None),
|
|
req_source="MigrationBaseMethod")
|
|
|
|
self.logger.info("Operation get_bulk from KV store done, size of the results: %s." % len(results))
|
|
if not results or len(results) == 0:
|
|
break
|
|
skip += limit
|
|
for result in results:
|
|
yield result
|
|
except Exception as e:
|
|
self.logger.error("Failed to get object content for object type: %s.", object_type)
|
|
raise Exception(e)
|
|
|
|
def migration_get(self, object_type, limit=100, **kwargs):
|
|
"""
|
|
Method to retrieve object content either from local storage or kvstore
|
|
If this is the first version migration, there is no content from the local
|
|
storage, an attempt will be made to retrieve content from kvstore.
|
|
Any subsequent GET will be from the local storage.
|
|
@type object_type: basestring
|
|
@param object_type: object_type
|
|
@type limit: int
|
|
@param limit: get bulk batch size, default to 100
|
|
@type kwargs: dict
|
|
@param kwargs:
|
|
get_raw: get raw contents from kv store instead of processed contents
|
|
@return: iterator, an iterator contains retrieved json objects
|
|
"""
|
|
target_file_list = []
|
|
self.logger.info("Migration helper directory: %s, processing object_type: %s." %
|
|
(self.migration_helper_directory, object_type))
|
|
|
|
if not kwargs.get('source_kvstore', False) and utils.FileManager.is_exists(self.migration_helper_directory):
|
|
target_file_list = self._get_object_file_list(object_type)
|
|
self.logger.info("Retrieving content from local storage: %s." % target_file_list)
|
|
|
|
if target_file_list:
|
|
self.logger.info("Trying to obtain data from the local file system...")
|
|
data = self._iterator_from_filesystem(target_file_list)
|
|
else:
|
|
self.logger.info("Trying to obtain data from KV store...")
|
|
get_raw = kwargs.get('get_raw', False)
|
|
kwargs.pop('get_raw', None)
|
|
data = self._iterator_from_kvstore(object_type, limit, get_raw, **kwargs)
|
|
|
|
return data
|
|
|
|
def migration_save_single_object_to_kvstore(self, object_type, validation=True, dupname_tag=None,
|
|
skip_local_failure=False, transaction_id=None):
|
|
"""
|
|
Actual method to save content to the kvstore for a single object.
|
|
The coming data are coming from the local storage.
|
|
@type object_type: basestring
|
|
@param object_type: ITSI object types
|
|
@type validation: boolean
|
|
@param validation: require validation when saving to kvstore
|
|
@type dupname_tag: basestring
|
|
@param dupname_tag: a special tag to the duplicated titles.
|
|
@return: boolean
|
|
"""
|
|
self.logger.info("Saving single object: {} with transaction_id: {}".format(object_type, transaction_id))
|
|
target_file_list = self._get_object_file_list(object_type)
|
|
self.logger.info('Files retrieved for object type: {} files: {} transaction_id:{}'.format(object_type, target_file_list, transaction_id))
|
|
saved_object_count = 0
|
|
for target_file in target_file_list:
|
|
self.logger.info('File {} getting restored with transaction_id:{}'.format(target_file, transaction_id))
|
|
data = utils.FileManager.read_data(target_file)
|
|
if len(data) > 0:
|
|
mi_obj = instantiate_object(self.session_key,
|
|
"nobody",
|
|
object_type,
|
|
logger=self.logger)
|
|
if validation:
|
|
# for base_service_template, need to skip the st update
|
|
if object_type in ['base_service_template']:
|
|
mi_obj.skip_service_template_update = True
|
|
|
|
utils.save_batch(mi_obj,
|
|
"nobody",
|
|
data,
|
|
no_batch=False,
|
|
dupname_tag=dupname_tag,
|
|
skip_local_failure=skip_local_failure,
|
|
transaction_id=transaction_id)
|
|
else:
|
|
mi_obj.batch_save_backend("nobody", data)
|
|
saved_object_count += 1
|
|
self.logger.info('Saving {} objects to KV store... {} out of {} objects have been saved with transaction_id:{}.'.format(
|
|
object_type, saved_object_count, len(target_file_list), transaction_id
|
|
))
|
|
self.logger.info("{} {} successfully created in KV store with transaction_id:{}".format(len(data), object_type, transaction_id))
|
|
else:
|
|
self.logger.info("No objects of type {} to create. transaction_id:{}".format(object_type, transaction_id))
|
|
|
|
def migration_delete_kvstore(self, object_type):
|
|
"""
|
|
Actual method to delete content from the kvstore for the object.
|
|
This method applies to all ITOA objects
|
|
@type object_type: basestring
|
|
@param object_type: ITSI object types
|
|
@return: boolean
|
|
"""
|
|
self.logger.info('Deleting existing object_type: %s.', object_type)
|
|
mi_obj = instantiate_object(self.session_key,
|
|
"nobody",
|
|
object_type,
|
|
logger=self.logger)
|
|
mi_obj.delete_bulk("nobody")
|