You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

222 lines
9.8 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
import time
from splunk.rest import simpleRequest
from ITOA import itoa_common as utils
from ITOA.event_management.notable_event_storage import NotableEventStorage
from .base_migration_interface import BaseMigrationInterface
class NotableMigrationInterface(BaseMigrationInterface):
    """
    Interface to access Notable Event Objects

    Reads notable-event objects either from files in the migration helper
    directory or from the KV store, and writes/deletes them through the
    ``storage/collections/data`` REST endpoints under the SA-ITOA app.
    """

    # Single source of truth mapping an object type to its KV store
    # collection. Both the collection lookup and the REST URI lookup use it,
    # so the two can no longer drift apart.
    _NOTABLE_COLLECTIONS = {
        "notable_event_comment": "itsi_notable_event_comment",
        "notable_event_tag": "itsi_notable_event_tag",
        "external_ticket": "itsi_notable_event_ticketing",
        "notable_event_ref_url": "itsi_notable_event_ref_url",
        "notable_aggregation_policy": "itsi_notable_event_aggregation_policy",
        "notable_event_state": "itsi_notable_event_state",
        "notable_event_seed_group": "itsi_correlation_engine_group_template",
        "notable_event_group": "itsi_notable_group_user",
        "notable_group_system": "itsi_notable_group_system",
        "notable_event_email_template": "itsi_notable_event_email_template",
    }

    def _iterator_from_kvstore(self, object_type, limit, filter_data=None):
        """
        Helper method to obtain content from kvstore, one batch at a time.

        @type object_type: basestring
        @param object_type: type of the object
        @type limit: int
        @param limit: batch limit to pull from kvstore
        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax
        @rtype: generator
        @return: generator yielding the retrieved objects one at a time
        @raise Exception: if object_type has no known collection. Because this
            is a generator, the error surfaces on first iteration, not at call
            time.
        """
        collection = self._get_notable_collection(object_type)
        storage = NotableEventStorage(self.session_key,
                                      'nobody',
                                      'SA-ITOA',
                                      object_type,
                                      collection)
        skip = 0
        while True:
            results = storage.get_bulk('nobody',
                                       sort_key="_key",
                                       sort_dir=1,
                                       filter_data=filter_data,
                                       skip=skip,
                                       limit=limit,
                                       req_source='notable_event_migration_handler')
            if not results:
                break
            # Advance by the number of rows actually returned rather than by
            # the requested limit: a limit of 0 would otherwise re-read the
            # same page forever, and a page shorter than `limit` would cause
            # the following rows to be skipped.
            skip += len(results)
            for result in results:
                yield result

    def migration_get(self, object_type, limit=1000, **kwargs):
        """
        Method to retrieve object content either from local storage or kvstore

        If the migration helper directory exists and holds files for this
        object type, content is read from those files. Otherwise (e.g. the
        first version migration, when nothing has been written locally yet)
        content is read from the kvstore.

        @type object_type: basestring
        @param object_type: object_type
        @type limit: int
        @param limit: get bulk batch size used for kvstore reads, default to 1000
        @type kwargs: dict
        @param kwargs:
            filter_data: json filter constructed to filter data. Follows mongodb syntax
            (only applied to kvstore reads)
        @return: iterator, an iterator contains retrieved json objects
        """
        target_file_list = []
        self.logger.info("Migration helper directory: %s, processing object_type: %s.",
                         self.migration_helper_directory, object_type)
        if utils.FileManager.is_exists(self.migration_helper_directory):
            target_file_list = self._get_object_file_list(object_type)
            self.logger.info("Retrieving content from local storage: %s.", target_file_list)

        if target_file_list:
            self.logger.info("Trying to obtain data from the local file system...")
            return self._iterator_from_filesystem(target_file_list)

        self.logger.info("Trying to obtain data from KV store...")
        return self._iterator_from_kvstore(object_type, limit,
                                           filter_data=kwargs.get('filter_data'))

    def _get_notable_collection_data_uri(self, object_type):
        """Get the REST uri of the collection data for a Notable object type

        @type object_type: basestring
        @param object_type: object type
        @rtype: basestring
        @return: uri of the collection data endpoint for the object type
        @raise Exception: if the object type is not a supported Notable type
        """
        collection = self._get_notable_collection(object_type)
        return "/servicesNS/nobody/SA-ITOA/storage/collections/data/%s" % collection

    def _get_notable_collection(self, object_type):
        """Get the KV store collection name for a Notable object type

        Covers every type the uri lookup supports, including
        ``notable_event_email_template``.

        @type object_type: basestring
        @param object_type: object type
        @rtype: basestring
        @return: name of the collection backing the object type
        @raise Exception: if the object type is not a supported Notable type
        """
        try:
            return self._NOTABLE_COLLECTIONS[object_type]
        except (KeyError, TypeError):
            # TypeError covers unhashable input; the original if/elif chain
            # reported any unmatched value with this same generic Exception.
            raise Exception("Failed to get notable event collection from object type: %s." % object_type)

    def migration_update_mod_time(self, object_type, data_list):
        """
        Utility method to update the mod_time during restore.

        For aggregation policy, need to keep the mod time up-to-date
        after the restore. Every other object type is left untouched.
        The update is done in place on the passed-in dicts.

        @type object_type: basestring
        @param object_type: ITSI object types
        @type data_list: list of dict.
        @param data_list: list of json dict. from the backup cache;
            entries that are not dicts are ignored
        @return: None
        """
        if object_type != 'notable_aggregation_policy':
            return
        # in-memory update the mod_time to the current time
        for data in data_list:
            if isinstance(data, dict):
                data['mod_time'] = time.time()

    def migration_save_single_object_to_kvstore(self, object_type, validation=True, dupname_tag=None,
                                                skip_local_failure=False, transaction_id=None):
        """
        Actual method to save content to the kvstore for a single object type.

        Each local file for the object type is read and its content is posted
        to the collection's ``batch_save`` endpoint. Empty files are skipped.
        A response status other than 200/201 is logged as an error and
        processing continues with the next file.

        @type object_type: basestring
        @param object_type: ITSI object types
        @type validation: boolean
        @param validation: not used by this implementation
        @type dupname_tag: basestring
        @param dupname_tag: not used by this implementation
        @type skip_local_failure: boolean
        @param skip_local_failure: not used by this implementation
        @type transaction_id: basestring
        @param transaction_id: identifier included in the log messages
        @return: None
        @raise Exception: if the object type is not a supported Notable type
            (only raised once a non-empty file is found)
        """
        self.logger.info("Saving single object: %s with transaction_id:%s", object_type, transaction_id)
        for target_file in self._get_object_file_list(object_type):
            self.logger.info("Retrieving info from: %s with transaction_id:%s", target_file, transaction_id)
            data = utils.FileManager.read_data(target_file)
            if not data:
                continue

            location = self._get_notable_collection_data_uri(object_type)
            if not location.endswith('/'):
                location += '/'
            location += 'batch_save'

            self.migration_update_mod_time(object_type, data)
            response, _content = simpleRequest(
                location,
                method='POST',
                jsonargs=json.dumps(data),
                sessionKey=self.session_key,
                raiseAllErrors=False
            )
            if response.status not in (200, 201):
                self.logger.error('Failed to bulk update notable type with transaction_id:%s', transaction_id)
            else:
                self.logger.info('Notable type updated successfully with transaction_id:%s', transaction_id)

    def migration_delete_kvstore(self, object_type):
        """
        Actual method to delete content from the kvstore for the object.

        Issues a DELETE against the collection data endpoint. A non-2xx
        response is logged as an error; nothing is raised for it here.

        @type object_type: basestring
        @param object_type: ITSI object types
        @return: None
        @raise Exception: if the object type is not a supported Notable type
        """
        location = self._get_notable_collection_data_uri(object_type)
        self.logger.info('Deleting existing object_type: %s URI: %s.', object_type, location)
        # NOTE(review): the KV store DELETE endpoint documents a `query`
        # parameter for filtering; confirm that passing `object_type` as a
        # plain GET argument has the intended scope.
        filter_data = {'object_type': object_type}
        response, _content = simpleRequest(
            location,
            method="DELETE",
            sessionKey=self.session_key,
            getargs=filter_data,
            raiseAllErrors=False
        )
        if not 200 <= response.status < 300:
            # Surface the failure instead of silently dropping the response.
            self.logger.error('Failed to delete object_type: %s URI: %s, status: %s.',
                              object_type, location, response.status)
        return