# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

# 662 lines
# 23 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import time
from ITOA.itoa_common import is_valid_dict, is_valid_list, is_valid_str, validate_json
from ITOA.storage import itoa_storage
from ITOA.setup_logging import logger
from .notable_event_utils import Audit, MethodType, NotableEventConfiguration
class EventManagementException(Exception):
    """
    Error raised by the event management layer, for example when an object
    that has to be merged or updated cannot be fetched from the backing store.
    """
def time_function_call(fx):
    """
    Decorator that logs how long a call to the wrapped function took.

    A debug message is logged before the call. After the call returns, a
    "Change Tracking" and a "Performance Tracking" info message are logged.
    If the wrapped function raises, the exception propagates unchanged and
    neither of the two info messages is written.

    Arguments:
    fx -- The function to measure

    Returns:
    A wrapper with the same call signature and return value as ``fx``. The
    wrapper keeps ``fx``'s name and docstring.
    """
    # Local import: keeps this block self-contained without touching the
    # module-level import section.
    from functools import wraps

    @wraps(fx)
    def wrapper(*args, **kwargs):
        logger.debug("Started operation=%s", fx.__name__)
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()
        result = fx(*args, **kwargs)
        # Take the measurement right after the call so that the logging
        # below is not counted as part of the operation.
        elapsed = time.perf_counter() - started
        logger.info('[Change Tracking] Successfully called operation="%s"', fx.__name__)
        diff_string = str(round(elapsed, 2)) + " seconds"
        logger.info('[Performance Tracking] Completed Notable Event operation="%s", duration="%s"', fx.__name__,
                    diff_string)
        return result

    return wrapper
class BaseEventManagement(object):
    """
    Generic base class offering CRUD and bulk CRUD operations for one object type.

    Every operation is delegated to an ``ITOAStorage`` interface bound to the collection given
    at construction time. Derived classes hook into the operations through ``pre_processing``
    and must provide ``create_for_group`` themselves if they need it.
    """

    # Name of the field that holds an object's id
    id_key = '_key'

    def __init__(self, session_key, collection, object_type, user='nobody', current_user_name=None,
                 action_dispatch_config=None):
        """
        Initialize objects

        @type session_key: basestring
        @param session_key: session_key
        @type collection: basestring
        @param collection: collection name
        @type object_type: basestring
        @param object_type: object type
        @type user: basestring
        @param user: user name
        @type current_user_name: basestring
        @param current_user_name: user name
        @type action_dispatch_config: ActionDispatchConfiguration
        @param action_dispatch_config: the setting for hybrid action dispatch
        @rtype: object
        @return: instance of the class
        @raise ValueError: if session_key or collection is not a valid string
        """
        if not is_valid_str(session_key):
            message = "Invalid session key."
            logger.error(message)
            raise ValueError(message)
        self.session_key = session_key

        self.action_dispatch_config = action_dispatch_config
        # Both are created on first use, see send_activity_to_audit and
        # lazy_init_notable_event_configuration
        self.audit = None
        self.notable_event_configuration = None
        # These two are overwritten by lazy_init_notable_event_configuration when an
        # action dispatch configuration is present
        self.host_base_uri = ''
        self.master_session_key = self.session_key
        self.owner = user
        self.current_user_name = current_user_name

        if not is_valid_str(collection):
            logger.error("Invalid collection name=%s", collection)
            raise ValueError('Invalid collection name')
        self.collection = collection

        self.object_type = object_type
        self.object_type_key = 'object_type'
        self.storage_interface = itoa_storage.ITOAStorage(collection=self.collection)

    def pre_processing(self, data_list, method):
        """
        Hook for derived classes, called before data is handed to the storage interface.
        It can be used to validate the schema or to inject default values such as time.
        The base implementation does nothing.

        @type data_list: list
        @param data_list: data list
        @type method: basestring
        @param method: method name
        @return: None
        """
        pass

    def get_filter_data(self, object_ids):
        """
        Build a filter that matches any of the given object ids

        @type object_ids: list
        @param object_ids: object list
        @rtype: dict
        @return: dict of the form {'$or': [{<id_key>: <object id>}, ...]}
        @raise TypeError: if object_ids is not a valid list
        """
        if not is_valid_list(object_ids):
            # Wrap in a tuple so that a tuple value is formatted as a whole instead of being
            # unpacked as the format arguments
            raise TypeError('%s is not list' % (object_ids,))
        return {'$or': [{self.id_key: object_id} for object_id in object_ids]}

    def merge_filter_data(self, filter_data, new_data):
        """
        Merge filter passed in request and filter created later.
        filter_data is updated in place: a key that only exists in new_data is copied over, a key
        that exists in both has the new value appended to the existing one with ``extend``, so
        values under shared keys are expected to be lists.

        @type filter_data: dict
        @param filter_data: filter data
        @type new_data: dict
        @param new_data: newly create filter
        @return: result of validate_json applied to the merged filter_data
        @raise TypeError: if either argument is not a valid dict
        """
        if not is_valid_dict(new_data) or not is_valid_dict(filter_data):
            raise TypeError('Invalid filter data to merge')
        for key, value in new_data.items():
            if key in filter_data:
                filter_data[key].extend(value)
            else:
                filter_data[key] = value
        return validate_json('[event_management_interface]', filter_data)

    def get_user(self, **kwargs):
        """
        Return the user on whose behalf storage operations run: the owner set at construction
        time, or the 'owner' entry of kwargs when no owner is set

        @param kwargs: dict which hold some configuration
        @rtype: basestring
        @return: return user
        """
        return self.owner or kwargs.get('owner')

    def fetch_filter_data(self, **kwargs):
        """
        Read the filter from kwargs and return it in validated form.
        The filter may be passed as 'filter' and/or as 'filter_data'. When both are supplied they
        are combined with merge_data, which means that for a key present in both the value from
        'filter' is kept. When only one is supplied it is used on its own.

        @type kwargs: dict
        @param kwargs: kwargs
        @rtype: dict
        @return: return filter data, an empty dict when no filter was passed
        """
        raw_filter_data = kwargs.get('filter_data')
        raw_filter = kwargs.get('filter')
        if raw_filter_data and raw_filter:
            f_data_1 = validate_json('[event_management_interface]', raw_filter_data)
            f_data_2 = validate_json('[event_management_interface]', raw_filter)
            if f_data_1 is not None and f_data_2 is not None:
                # Keys missing from 'filter' are filled in from 'filter_data'
                filter_data = self.merge_data(f_data_1, f_data_2)
            else:
                filter_data = f_data_1 or f_data_2
        else:
            filter_data = raw_filter or raw_filter_data
            if filter_data:
                filter_data = validate_json('[event_management_interface]', filter_data)
        return filter_data or {}

    def inject_object_type(self, data_list):
        """
        Force the 'object_type' field of every entry to this instance's object type,
        whether the field is missing or holds a different value

        @type data_list: list
        @param data_list: data list
        @return: in place update
        """
        for data in data_list:
            if 'object_type' not in data or data['object_type'] != self.object_type:
                data['object_type'] = self.object_type

    @time_function_call
    def create(self, data, **kwargs):
        """
        Create notable event object

        @type data - dict
        @param data - notable event schema to create
        @rtype dict
        @return create object _key or raise an exception
        @raise TypeError: if data is not a valid dict
        """
        if not is_valid_dict(data):
            message = "Data is not a valid dictionary, data type=%s." % type(data)
            logger.error(message)
            raise TypeError(message)

        self.inject_object_type([data])
        self.pre_processing([data], MethodType.CREATE)
        result = self.storage_interface.create(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            data,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        logger.debug("Create %s object ID=%s.", self.object_type, result.get(self.id_key))
        return result

    def create_for_group(self, data, **kwargs):
        """
        Create stuff for events in a Group.
        There is no base implementation, derived classes must override this method.

        @raise NotImplementedError: always
        """
        raise NotImplementedError('Derived class must implement this method')

    @time_function_call
    def create_bulk(self, data_list, **kwargs):
        """
        Create more than one notable event object

        @type data_list: list
        @param data_list: data list
        @rtype: list
        @return: list of created
        @raise TypeError: if data_list does not pass validate_json
        """
        try:
            validate_json('[Notable Event Curd]', data_list)
        except Exception as e:
            logger.exception(e)
            message = 'Invalid JSON list to do bulk create.'
            logger.error(message)
            raise TypeError(message)

        # make sure object type is set
        self.inject_object_type(data_list)
        self.pre_processing(data_list, MethodType.CREATE_BULK)
        results = self.storage_interface.batch_save(
            self.master_session_key,
            self.get_user(**kwargs),
            data_list,
            objecttype=self.object_type,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        return results

    @time_function_call
    def get(self, object_id, **kwargs):
        """
        Get notable event object

        @type object_id: basestring
        @param object_id: notable event key
        @rtype: dict
        @return: return notable event schema
        @raise TypeError: if object_id is not a valid string
        """
        if not is_valid_str(object_id):
            message = 'Invalid key to get object, value=%s.' % (object_id,)
            logger.error(message)
            raise TypeError(message)

        result = self.storage_interface.get(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            object_id,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        return result

    @time_function_call
    def get_bulk(self, object_ids, **kwargs):
        """
        Get one or more than one notable event objects

        @type object_ids: list
        @param object_ids: list of objects to get
            Note: if object list is empty or not defined then get all objects
        @type kwargs: dict
        @param kwargs: extra arguments to fetch notable events. Keys read here:
            filter / filter_data, fields (or f), count and offset (or limit and skip),
            sort_key, sort_dir
        @rtype: list
        @return: list of notable events
        """
        filter_data = self.fetch_filter_data(**kwargs)
        if is_valid_list(object_ids) and len(object_ids) != 0:
            self.merge_filter_data(filter_data, self.get_filter_data(object_ids))
            logger.debug('Updated filter data=%s', filter_data)

        # 'f' is accepted as a fallback for 'fields'
        fields = kwargs.get('fields', [])
        if not fields and kwargs.get('f'):
            fields = kwargs.get('f')
            kwargs['fields'] = fields
        # A comma separated string is turned into a list of field names
        if isinstance(fields, str) and ',' in fields:
            kwargs['fields'] = fields.split(',')

        limit = kwargs.get('count')
        skip = kwargs.get('offset')
        if limit is None and skip is None:
            # If count and offset are undefined, try limit and skip
            limit = kwargs.get('limit')
            skip = kwargs.get('skip')

        results = self.storage_interface.get_all(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            sort_key=kwargs.get('sort_key'),
            filter_data=filter_data,
            sort_dir=kwargs.get('sort_dir'),
            fields=kwargs.get('fields'),
            skip=skip,
            limit=limit,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        logger.debug("Return %s notable events", len(results))
        return results

    def get_and_merge_data_list(self, object_ids, data_list, is_partial_update=True, **kwargs):
        """
        Useful function to do partial update. Its merge data from backend with request data.
        The entries of data_list are updated in place.

        When the first entry of data_list carries a 'group_state_change_action' key the call is
        treated as a group state change: an entry whose object already exists is reduced to the
        keys named in its 'fields_to_update' (plus 'action_id') before being merged, an entry
        whose object does not exist is kept whole, minus the two bookkeeping keys, so that it can
        be inserted as a new object.

        @type object_ids: list
        @param object_ids: list of objects to fetch from backend
        @type data_list: list
        @param data_list: data list which is passed in the request
        @type is_partial_update: bool
        @param is_partial_update: set to true it is partial update
        @param **kwargs: Key word arguments to provide additional args to those who override this method
        @rtype: list
        @return: Merged data
        @raise EventManagementException: if nothing could be fetched and this is not a group state
            change, or if an entry has no fetched counterpart to merge with
        """
        results = self.get_bulk(object_ids, **kwargs)

        is_group_state_action = False
        if isinstance(data_list, list) and len(data_list) > 0:
            # if there is 'group_state_change_action' key in the value then this is a group state change action
            is_group_state_action = 'group_state_change_action' in data_list[0]
        logger.info('is_group_state_action=%s objects_to_modify=%s', is_group_state_action, object_ids)

        if not results and not is_group_state_action:
            logger.error('Failed to get objects=%s from kv store', object_ids)
            raise EventManagementException('Failed to get objects=%s from kv store' % (str(object_ids)))
        if results is None:
            # A group state change is allowed to proceed without stored objects, make sure the
            # loop below still has something to iterate over
            results = []

        # Pair every request entry with the stored object of the same id. For duplicate ids in
        # data_list only the first entry is considered.
        mapped_objects = {}
        for data in data_list:
            mapped_objects.setdefault(data.get(self.id_key), {'data': data, 'fdata': None})
        for result in results:
            if result.get(self.id_key) in mapped_objects:
                mapped_objects[result.get(self.id_key)]['fdata'] = result

        for value in mapped_objects.values():
            if is_group_state_action:
                if value['fdata']:  # updating an existing group
                    fields_to_update = value['data'].get('fields_to_update', [])
                    # remove the fields which don't have to be updated. Can't delete items of dict while iterating
                    # hence getting a copy of the keys the dict's keys by converting the keys to list
                    for key in list(value['data']):
                        # don't delete the action_id. It's needed in data_list for logging purposes
                        if key == 'action_id':
                            continue
                        if key not in fields_to_update:
                            value['data'].pop(key, None)
                    logger.info('group_state_change_action updating the group group_id=%s fields_to_update=%s',
                                value['data'].get('_key', None), value['data'])
                else:  # no existing group in KV Store, let's create a new one
                    # remove fields_to_update and group_state_change_action because we don't want to insert these
                    # fields in the collection
                    value['data'].pop('fields_to_update', None)
                    value['data'].pop('group_state_change_action', None)
                    # no existing object in KV Store so let's create a new one by inserting the whole object passed
                    value['fdata'] = value['data']
                    logger.info('group_state_change_action creating new group group_id=%s new_group_object=%s',
                                value['data'].get('_key', None), value['data'])
                    continue
            self.merge_data(value['fdata'], value['data'], is_partial_update)

        logger.debug('final_merged_data data_list=%s', data_list)
        return data_list

    def get_and_merge_data(self, object_id, data, is_partial_update=True, **kwargs):
        """
        Similar function but it deals with one object instead of list
        Note: this is inplace update to data dict

        @type object_id: basestring
        @param object_id: object id
        @type data: dict
        @param data: data is passed in the request
        @type is_partial_update: bool
        @param is_partial_update: set to true it is partial update
        @param **kwargs: Key word arguments to provide additional args to those who override this method
        @rtype: dict
        @return: Merge data
        @raise EventManagementException: if the object cannot be fetched
        """
        result = self.get(object_id)
        if result is None:
            error_msg = "Failed to get object ID=%s from KV store." % object_id
            logger.error(error_msg)
            raise EventManagementException(error_msg)
        return self.merge_data(result, data, is_partial_update)

    def merge_data(self, fetched_data, data, is_partial_update=True):
        """
        Helper function to merge request data with backend data: for a partial update every key
        of fetched_data that is missing from data is copied into data. Keys already present in
        data are never overwritten. Without the partial update flag data is left as it is.
        Note: this is inplace update to data dict

        @type fetched_data: dict
        @param fetched_data: Fetch data from backend
        @type data: dict
        @param data: request data
        @type is_partial_update: bool
        @param is_partial_update: set to true it is partial update
        @rtype: dict
        @return: return updated data (inplace update to data)
        @raise EventManagementException: if data or fetched_data is None
        """
        if data is None or fetched_data is None:
            logger.error("data or fetched data is None")
            raise EventManagementException('data or fetched data is None')
        if is_partial_update:
            for key, value in fetched_data.items():
                if key not in data:
                    data[key] = value
        return data

    @time_function_call
    def update(self, object_id, data, is_partial_update=False, **kwargs):
        """
        Update one notable event object.
        The stored object is fetched first, so the update fails when it does not exist. Its
        fields are only copied into data when is_partial_update is set.

        @type object_id: basestring
        @param object_id: object id
        @type data: dict
        @param data: data
        @type is_partial_update: bool
        @param is_partial_update: flag to do partial update
        @type kwargs: dict
        @param kwargs: Extra parameters
        @rtype: dict
        @return: return dict which holds updated keys
        @raise TypeError: if object_id is not a valid string
        """
        if not is_valid_str(object_id):
            message = 'Object ID is not a valid string, value=%s.' % (object_id,)
            logger.error(message)
            raise TypeError(message)

        self.get_and_merge_data(object_id, data, is_partial_update, **kwargs)
        self.inject_object_type([data])
        self.pre_processing([data], MethodType.UPDATE)
        result = self.storage_interface.edit(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            object_id,
            data,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        return result

    @time_function_call
    def update_bulk(self, object_ids, data_list, is_partial_update=False, skip_get_merge=False, **kwargs):
        """
        Perform update for one or more notable event objects

        @type object_ids: list
        @param object_ids: notable events
        @type data_list: list
        @param data_list: notable events
        @type is_partial_update: bool
        @param is_partial_update: flag for partial update
        @type skip_get_merge: bool
        @param skip_get_merge: flag for skipping calling get_and_merge_data_list or not
        @type kwargs: dict
        @param kwargs: Extra params to perform
        @rtype: list
        @return: update notable event schema
        @raise TypeError: if object_ids is not a valid list
        """
        if not is_valid_list(object_ids):
            message = 'Object IDs is not a valid list, value=%s.' % (object_ids,)
            logger.error(message)
            raise TypeError(message)

        if not skip_get_merge:
            self.get_and_merge_data_list(object_ids, data_list, is_partial_update, **kwargs)
        self.inject_object_type(data_list)
        self.pre_processing(data_list, MethodType.UPDATE_BULK)
        results = self.storage_interface.batch_save(
            self.master_session_key,
            self.get_user(**kwargs),
            data_list,
            objecttype=self.object_type,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )
        return results

    @time_function_call
    def delete(self, object_id, **kwargs):
        """
        Delete notable event object from KV store

        @type object_id: basestring
        @param object_id: object id
        @type kwargs: dict
        @param kwargs: extra params
        @return: whatever the storage interface returns for the delete
        @raise TypeError: if object_id is not a valid string
        """
        if not is_valid_str(object_id):
            message = 'Id cannot be empty or invalid id=%s.' % (object_id,)
            logger.error(message)
            raise TypeError(message)

        self.pre_processing([{self.id_key: object_id}], MethodType.DELETE)
        logger.debug('Deleting %s:%s event', self.id_key, object_id)
        return self.storage_interface.delete(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            object_id,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )

    @time_function_call
    def delete_bulk(self, object_ids, **kwargs):
        """
        Delete bulk. The objects to delete are selected by the filter passed in kwargs
        ('filter' / 'filter_data') combined with the given object ids.
        NOTE(review): when neither ids nor a filter are given an empty filter is handed to
        delete_all - confirm what the storage interface does in that case.

        @type object_ids: list
        @param object_ids: object list to delete
        @type kwargs: dict
        @param kwargs: extra params to delete
        @return: whatever the storage interface returns for the delete
        """
        filter_data = self.fetch_filter_data(**kwargs)
        if is_valid_list(object_ids) and len(object_ids) != 0:
            self.merge_filter_data(filter_data, self.get_filter_data(object_ids))
        if filter_data:
            filter_data = validate_json('[event_management_interface]', filter_data)
        if isinstance(object_ids, list):
            # One dict per id, the same shape delete() hands to pre_processing
            self.pre_processing([{self.id_key: eid} for eid in object_ids], MethodType.DELETE_BULK)
        logger.debug('Deleting events ids=%s, other arguments=%s', object_ids, filter_data)
        return self.storage_interface.delete_all(
            self.master_session_key,
            self.get_user(**kwargs),
            self.object_type,
            filter_data,
            current_user_name=self.current_user_name,
            host_base_uri=self.host_base_uri
        )

    def send_activity_to_audit(self, data, activity, type):
        """
        Send audit event to itsi_notable_audit index on episode updates, it will perform
        lazy initialization for Audit object.
        A list of events needs a list of activities and is sent in bulk, a single event (dict)
        needs a single activity string. Any other combination is logged as an error and
        nothing is sent.

        @type data: list or dict
        @param data: Content of event or list of events that is sent to audit index
        @type activity: list or str
        @param activity: String to describe the activity
        @type type: str
        @param type: Activity Type (the name shadows the builtin, it is kept because callers
            may pass it by keyword)
        @return: None
        """
        if not self.audit:
            self.audit = Audit(self.session_key,
                               audit_token_name='Auto Generated ITSI Notable Index Audit Token')
        if isinstance(data, list) and isinstance(activity, list):
            self.audit.send_activity_to_audit_bulk(data, activity, type)
        elif isinstance(data, dict) and isinstance(activity, str):
            self.audit.send_activity_to_audit(data, activity, type)
        else:
            logger.error('Unsupported data (value=%s) or activity (value=%s)', data, activity)

    def lazy_init_notable_event_configuration(self):
        """
        Lazy initialize notable event configuration object.
        Does nothing when the configuration object already exists. Otherwise, if an action
        dispatch configuration was given, host_base_uri and master_session_key are first taken
        from it, then the configuration object is created with those values.

        :return: None
        """
        if self.notable_event_configuration is None:
            if self.action_dispatch_config:
                self.host_base_uri = self.action_dispatch_config.remote_ea_mgmt_uri
                self.master_session_key = self.action_dispatch_config.get_master_host_session_key()
            # Perform lazy initialization
            self.notable_event_configuration = NotableEventConfiguration(
                self.master_session_key,
                logger,
                host_base_uri=self.host_base_uri
            )