You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
222 lines
9.8 KiB
222 lines
9.8 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import json
|
|
import time
|
|
from splunk.rest import simpleRequest
|
|
from ITOA import itoa_common as utils
|
|
from ITOA.event_management.notable_event_storage import NotableEventStorage
|
|
from .base_migration_interface import BaseMigrationInterface
|
|
|
|
|
|
class NotableMigrationInterface(BaseMigrationInterface):
|
|
"""
|
|
Interface to access Notable Event Objects
|
|
"""
|
|
|
|
def _iterator_from_kvstore(self, object_type, limit, filter_data=None):
|
|
"""
|
|
Helper method to obtain content from kvstore.
|
|
@type object_type: basestring
|
|
@param object_type: type of the object
|
|
@type limit: int
|
|
@type filter_data: dictionary
|
|
@param filter_data: json filter constructed to filter data. Follows mongodb syntax
|
|
@param limit: batch limit to pull from kvstore
|
|
"""
|
|
results = None
|
|
skip = 0
|
|
collection = self._get_notable_collection(object_type)
|
|
mi_obj = NotableEventStorage(self.session_key,
|
|
'nobody',
|
|
'SA-ITOA',
|
|
object_type,
|
|
collection)
|
|
while True:
|
|
results = mi_obj.get_bulk('nobody',
|
|
sort_key="_key",
|
|
sort_dir=1,
|
|
filter_data=filter_data,
|
|
skip=skip,
|
|
limit=limit,
|
|
req_source='notable_event_migration_handler')
|
|
if not results or len(results) == 0:
|
|
break
|
|
skip += limit
|
|
for result in results:
|
|
yield result
|
|
|
|
def migration_get(self, object_type, limit=1000, **kwargs):
|
|
"""
|
|
Method to retrieve object content either from local storage or kvstore
|
|
If this is the first version migration, there is no content from the local
|
|
storage, an attempt will be made to retrieve content from kvstore.
|
|
Any subsequent GET will be from the local storage.
|
|
@type object_type: basestring
|
|
@param object_type: object_type
|
|
@type limit: int
|
|
@param limit: get bulk batch size, default to 100
|
|
@type kwargs: dict
|
|
@param kwargs:
|
|
filter_data: json filter constructed to filter data. Follows mongodb syntax
|
|
@return: iterator, an iterator contains retrieved json objects
|
|
"""
|
|
target_file_list = []
|
|
self.logger.info("Migration helper directory: %s, processing object_type: %s." %
|
|
(self.migration_helper_directory, object_type))
|
|
|
|
if utils.FileManager.is_exists(self.migration_helper_directory):
|
|
target_file_list = self._get_object_file_list(object_type)
|
|
self.logger.info("Retrieving content from local storage: %s." % target_file_list)
|
|
|
|
if target_file_list:
|
|
self.logger.info("Trying to obtain data from the local file system...")
|
|
data = self._iterator_from_filesystem(target_file_list)
|
|
else:
|
|
self.logger.info("Trying to obtain data from KV store...")
|
|
data = self._iterator_from_kvstore(object_type, limit, filter_data=kwargs.get('filter_data', None))
|
|
|
|
return data
|
|
|
|
def _get_notable_collection_data_uri(self, object_type):
|
|
"""Get the uri to the collection data when object type is Notable Type
|
|
@type object_type: basestring
|
|
@param object_type: object type
|
|
|
|
@rtype: basestring/NoneType
|
|
@return: uri if object type is Notable Type; None if unsupported object
|
|
type.
|
|
"""
|
|
if object_type == "notable_event_comment":
|
|
collection = "itsi_notable_event_comment"
|
|
elif object_type == "notable_event_tag":
|
|
collection = "itsi_notable_event_tag"
|
|
elif object_type == "external_ticket":
|
|
collection = "itsi_notable_event_ticketing"
|
|
elif object_type == "notable_event_ref_url":
|
|
collection = "itsi_notable_event_ref_url"
|
|
elif object_type == "notable_aggregation_policy":
|
|
collection = "itsi_notable_event_aggregation_policy"
|
|
elif object_type == "notable_event_state":
|
|
collection = "itsi_notable_event_state"
|
|
elif object_type == "notable_event_seed_group":
|
|
collection = "itsi_correlation_engine_group_template"
|
|
elif object_type == "notable_event_group":
|
|
collection = "itsi_notable_group_user"
|
|
elif object_type == "notable_group_system":
|
|
collection = "itsi_notable_group_system"
|
|
elif object_type == "notable_event_email_template":
|
|
collection = "itsi_notable_event_email_template"
|
|
else:
|
|
raise Exception("Failed to get notable event collection from object type: %s." % object_type)
|
|
|
|
uri = "/servicesNS/nobody/SA-ITOA/storage/collections/data/%s" % collection
|
|
return uri
|
|
|
|
def _get_notable_collection(self, object_type):
|
|
"""Get the uri to the collection data when object type is Notable Type
|
|
@type object_type: basestring
|
|
@param object_type: object type
|
|
|
|
@rtype: basestring/NoneType
|
|
@return: uri if object type is Notable Type; None if unsupported object
|
|
type.
|
|
"""
|
|
if object_type == "notable_event_comment":
|
|
collection = "itsi_notable_event_comment"
|
|
elif object_type == "notable_event_tag":
|
|
collection = "itsi_notable_event_tag"
|
|
elif object_type == "external_ticket":
|
|
collection = "itsi_notable_event_ticketing"
|
|
elif object_type == "notable_event_ref_url":
|
|
collection = "itsi_notable_event_ref_url"
|
|
elif object_type == "notable_event_group":
|
|
collection = "itsi_notable_group_user"
|
|
elif object_type == "notable_group_system":
|
|
collection = "itsi_notable_group_system"
|
|
elif object_type == "notable_aggregation_policy":
|
|
collection = "itsi_notable_event_aggregation_policy"
|
|
elif object_type == "notable_event_state":
|
|
collection = "itsi_notable_event_state"
|
|
elif object_type == "notable_event_seed_group":
|
|
collection = "itsi_correlation_engine_group_template"
|
|
else:
|
|
raise Exception("Failed to get notable event collection from object type: %s." % object_type)
|
|
|
|
return collection
|
|
|
|
def migration_update_mod_time(self, object_type, data_list):
|
|
"""
|
|
Utility method to update the mod_time during restore.
|
|
For aggregation policy, need to keep the mod time up-to-date
|
|
after the restore.
|
|
@type object_type: basestring
|
|
@param object_type: ITSI object types
|
|
@type data_list: list of dict.
|
|
@param data_list: list of json dict. from the backup cache
|
|
@return: None
|
|
"""
|
|
if object_type != 'notable_aggregation_policy':
|
|
return
|
|
|
|
# in-memory update the mod_time to the current time
|
|
for data in data_list:
|
|
if isinstance(data, dict):
|
|
data['mod_time'] = time.time()
|
|
|
|
def migration_save_single_object_to_kvstore(self, object_type, validation=True, dupname_tag=None,
|
|
skip_local_failure=False, transaction_id=None):
|
|
"""
|
|
Actual method to save content to the kvstore for a single object.
|
|
The coming data are coming from the local storage.
|
|
@type object_type: basestring
|
|
@param object_type: ITSI object types
|
|
@type validation: boolean
|
|
@param validation: require validation when saving to kvstore
|
|
@type dupname_tag: basestring
|
|
@param dupname_tag: a special tag to the duplicated titles.
|
|
@return: boolean
|
|
"""
|
|
self.logger.info("Saving single object: {} with transaction_id:{}".format(object_type, transaction_id))
|
|
target_file_list = self._get_object_file_list(object_type)
|
|
for target_file in target_file_list:
|
|
self.logger.info("Retrieving info from: {} with transaction_id:{}".format(target_file, transaction_id))
|
|
data = utils.FileManager.read_data(target_file)
|
|
if not data:
|
|
continue
|
|
location = self._get_notable_collection_data_uri(object_type)
|
|
if not location.endswith('/'):
|
|
location += '/'
|
|
location += 'batch_save'
|
|
self.migration_update_mod_time(object_type, data)
|
|
response, content = simpleRequest(
|
|
location,
|
|
method='POST',
|
|
jsonargs=json.dumps(data),
|
|
sessionKey=self.session_key,
|
|
raiseAllErrors=False
|
|
)
|
|
if response.status not in (200, 201):
|
|
self.logger.error('Failed to bulk update notable type with transaction_id:{}'.format(transaction_id))
|
|
else:
|
|
self.logger.info('Notable type updated successfully with transaction_id:{}'.format(transaction_id))
|
|
|
|
def migration_delete_kvstore(self, object_type):
|
|
"""
|
|
Actual method to delete content from the kvstore for the object.
|
|
@type object_type: basestring
|
|
@param object_type: ITSI object types
|
|
@return: boolean
|
|
"""
|
|
location = self._get_notable_collection_data_uri(object_type)
|
|
self.logger.info('Deleting existing object_type: %s URI: %s.', object_type, location)
|
|
filter_data = {}
|
|
filter_data['object_type'] = object_type
|
|
response, content = simpleRequest(
|
|
location,
|
|
method="DELETE",
|
|
sessionKey=self.session_key,
|
|
getargs=filter_data,
|
|
raiseAllErrors=False
|
|
)
|
|
return
|