# NOTE(review): removed repository-UI chrome ("You can not select more than 25 topics...",
# "662 lines / 23 KiB" counters) that was accidentally captured into this file by the
# scraper; it was never part of the module source.
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import functools
import time

from ITOA.itoa_common import is_valid_dict, is_valid_list, is_valid_str, validate_json
from ITOA.setup_logging import logger
from ITOA.storage import itoa_storage

from .notable_event_utils import Audit, MethodType, NotableEventConfiguration
|
|
|
|
|
|
class EventManagementException(Exception):
    """Raised when an event management operation fails (e.g. requested objects are missing from KV store)."""
    pass
def time_function_call(fx):
    """
    Decorator that logs a message measuring how long a function call took.

    @type fx: function
    @param fx: the function to measure

    @rtype: function
    @return: wrapper that logs start, success and duration, then returns fx's result
    """
    # BUG FIX: without functools.wraps the decorated methods lost their
    # __name__/__doc__ (they all reported as 'wrapper' to introspection tools).
    @functools.wraps(fx)
    def wrapper(*args, **kwargs):
        logger.debug("Started operation=%s", fx.__name__)
        start = time.time()
        result = fx(*args, **kwargs)
        logger.info('[Change Tracking] Successfully called operation="%s"', fx.__name__)
        duration_string = str(round(time.time() - start, 2)) + " seconds"
        logger.info('[Performance Tracking] Completed Notable Event operation="%s", duration="%s"',
                    fx.__name__, duration_string)
        return result

    return wrapper
class BaseEventManagement(object):
    """
    A generic base class providing CRUD and bulk CRUD operations for any
    event management object type stored in KV store.
    """

    # Name of the field that holds an object's unique identifier in KV store
    id_key = '_key'
def __init__(self, session_key, collection, object_type, user='nobody', current_user_name=None, action_dispatch_config=None):
    """
    Initialize the event management object.

    @type session_key: basestring
    @param session_key: splunkd session key

    @type collection: basestring
    @param collection: KV store collection name

    @type object_type: basestring
    @param object_type: object type

    @type user: basestring
    @param user: owner/namespace user name

    @type current_user_name: basestring
    @param current_user_name: name of the user performing the operation

    @type action_dispatch_config: ActionDispatchConfiguration
    @param action_dispatch_config: the setting for hybrid action dispatch

    @rtype: object
    @return: instance of the class

    @raise ValueError: when session_key or collection is not a valid string
    """
    # Guard clauses: both the session key and the collection name are required.
    if not is_valid_str(session_key):
        message = "Invalid session key."
        logger.error(message)
        raise ValueError(message)
    if not is_valid_str(collection):
        logger.error("Invalid collection name=%s", collection)
        raise ValueError('Invalid collection name')

    self.session_key = session_key
    self.collection = collection
    self.action_dispatch_config = action_dispatch_config
    self.audit = None
    self.notable_event_configuration = None
    self.host_base_uri = ''
    # Defaults to the local session; may be swapped for the remote master's
    # session by lazy_init_notable_event_configuration().
    self.master_session_key = self.session_key
    self.owner = user
    self.current_user_name = current_user_name
    self.object_type = object_type
    self.object_type_key = 'object_type'
    self.storage_interface = itoa_storage.ITOAStorage(collection=self.collection)
def pre_processing(self, data_list, method):
    """
    Hook for subclasses: validate schema or inject default values (e.g.
    timestamps) before a CRUD operation. The base implementation is a no-op.

    @type data_list: list
    @param data_list: data list about to be written/deleted

    @type method: basestring
    @param method: method name (a MethodType value, e.g. CREATE, DELETE_BULK)

    @return: None
    """
    pass
def get_filter_data(self, object_ids):
    """
    Build a KV store filter matching any of the given _key values.

    @type object_ids: list
    @param object_ids: object key list

    @rtype: dict
    @return: filter of the form {'$or': [{'_key': id}, ...]}

    @raise TypeError: when object_ids is not a list
    """
    if not is_valid_list(object_ids):
        raise TypeError('%s is not list' % object_ids)

    clauses = []
    for object_id in object_ids:
        clauses.append({self.id_key: object_id})
    return {'$or': clauses}
def merge_filter_data(self, filter_data, new_data):
    """
    Merge a filter passed in the request with a filter created later.

    filter_data is updated in place: values for keys present in both dicts
    are extended (both sides are expected to hold lists, e.g. '$or' clauses).

    @type filter_data: dict
    @param filter_data: existing filter data (updated in place)

    @type new_data: dict
    @param new_data: newly created filter

    @return: validated merged filter

    @raise TypeError: when either argument is not a dict
    """
    if not (is_valid_dict(new_data) and is_valid_dict(filter_data)):
        raise TypeError('Invalid filter data to merge')

    for key, value in new_data.items():
        if key not in filter_data:
            filter_data[key] = value
        else:
            # Both filters carry this key; concatenate the clause lists.
            filter_data[key].extend(value)

    return validate_json('[event_management_interface]', filter_data)
def get_user(self, **kwargs):
    """
    Resolve the user to operate as: the instance owner when set, otherwise
    the 'owner' keyword argument.

    @param kwargs: dict which may hold an 'owner' entry

    @rtype: basestring
    @return: resolved user name
    """
    return self.owner or kwargs.get('owner')
def fetch_filter_data(self, **kwargs):
    """
    Extract filter data from kwargs and return it in dict form.

    Filters may arrive under 'filter_data' and/or 'filter'; when both are
    supplied they are validated and merged.

    @type kwargs: dict
    @param kwargs: request arguments

    @rtype: dict
    @return: filter data (empty dict when no filter was supplied)
    """
    filter_data = {}
    f_data_1 = kwargs.get('filter_data')
    f_data_2 = kwargs.get('filter')
    # BUG FIX: the old condition was `kwargs.get('filter') in kwargs`, which
    # compared the filter *value* against the kwarg names — it raised
    # TypeError for unhashable (dict) filters and was otherwise ~always
    # False, so the merge branch effectively never ran.
    if f_data_1 and f_data_2:
        f_data_1 = validate_json('[event_management_interface]', f_data_1)
        f_data_2 = validate_json('[event_management_interface]', f_data_2)
        # BUG FIX: old code tested `filter_data is not None` (always true for
        # the local {}) instead of the second parsed filter.
        if f_data_1 is not None and f_data_2 is not None:
            filter_data = self.merge_data(f_data_1, f_data_2)
        else:
            filter_data = f_data_1 or f_data_2
    else:
        filter_data = kwargs.get('filter') or kwargs.get('filter_data')

    if filter_data:
        filter_data = validate_json('[event_management_interface]', filter_data)
    return filter_data or {}
def inject_object_type(self, data_list):
    """
    Ensure every record carries this manager's object type (in-place update).

    @type data_list: list
    @param data_list: list of event dicts to normalize

    @return: None (data_list is updated in place)
    """
    for record in data_list:
        # Missing or mismatched object_type is overwritten with ours.
        if 'object_type' not in record or record['object_type'] != self.object_type:
            record['object_type'] = self.object_type
@time_function_call
def create(self, data, **kwargs):
    """
    Create one notable event object.

    @type data: dict
    @param data: notable event schema to create

    @rtype: dict
    @return: created object _key

    @raise TypeError: when data is not a dict
    """
    if not is_valid_dict(data):
        message = "Data is not a valid dictionary, data type=%s." % type(data)
        logger.error(message)
        raise TypeError(message)

    self.inject_object_type([data])
    self.pre_processing([data], MethodType.CREATE)
    result = self.storage_interface.create(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        data,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
    logger.debug("Create %s object ID=%s.", self.object_type, result.get(self.id_key))
    return result
def create_for_group(self, data, **kwargs):
    """
    Create stuff for events in a Group.

    @type data: dict
    @param data: payload for the group-level create

    @raise NotImplementedError: always; derived classes must override
    """
    raise NotImplementedError('Derived class must implement this method')
@time_function_call
def create_bulk(self, data_list, **kwargs):
    """
    Create one or more notable event objects.

    @type data_list: list
    @param data_list: list of event dicts to create

    @rtype: list
    @return: list of created object keys

    @raise TypeError: when data_list is not a valid JSON list
    """
    try:
        validate_json('[Notable Event Curd]', data_list)
    except Exception as e:
        logger.exception(e)
        message = 'Invalid JSON list to do bulk create.'
        logger.error(message)
        raise TypeError(message)

    # make sure object type is set
    self.inject_object_type(data_list)
    self.pre_processing(data_list, MethodType.CREATE_BULK)

    return self.storage_interface.batch_save(
        self.master_session_key,
        self.get_user(**kwargs),
        data_list,
        objecttype=self.object_type,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
@time_function_call
def get(self, object_id, **kwargs):
    """
    Fetch a single notable event object.

    @type object_id: basestring
    @param object_id: notable event key

    @rtype: dict
    @return: the notable event record

    @raise TypeError: when object_id is not a valid string
    """
    if not is_valid_str(object_id):
        message = 'Invalid key to get object, value=%s.' % object_id
        logger.error(message)
        raise TypeError(message)

    return self.storage_interface.get(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        object_id,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
@time_function_call
def get_bulk(self, object_ids, **kwargs):
    """
    Fetch one or more notable event objects.

    @type object_ids: list
    @param object_ids: keys of objects to get; when empty or not a list,
        all objects matching the filter are returned

    @type kwargs: dict
    @param kwargs: extra arguments (filter/filter_data, fields or 'f',
        count/offset or limit/skip, sort_key, sort_dir)

    @rtype: list
    @return: list of notable events
    """
    filter_data = self.fetch_filter_data(**kwargs)
    if is_valid_list(object_ids) and object_ids:
        # Restrict the filter to the requested keys.
        self.merge_filter_data(filter_data, self.get_filter_data(object_ids))
    logger.debug('Updated filter data=%s', filter_data)

    # 'f' is accepted as a short alias for 'fields'; a comma-separated
    # string is split into a list for the storage layer.
    fields = kwargs.get('fields', [])
    if not fields and kwargs.get('f'):
        fields = kwargs.get('f')
    kwargs['fields'] = fields
    if isinstance(fields, str) and ',' in fields:
        kwargs['fields'] = fields.split(',')

    # Prefer count/offset paging arguments; fall back to limit/skip.
    limit, skip = kwargs.get('count'), kwargs.get('offset')
    if limit is None and skip is None:
        limit, skip = kwargs.get('limit'), kwargs.get('skip')

    results = self.storage_interface.get_all(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        sort_key=kwargs.get('sort_key'),
        filter_data=filter_data,
        sort_dir=kwargs.get('sort_dir'),
        fields=kwargs.get('fields'),
        skip=skip,
        limit=limit,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
    logger.debug("Return %s notable events", len(results))
    return results
def get_and_merge_data_list(self, object_ids, data_list, is_partial_update=True, **kwargs):
    """
    Useful function to do partial update. It merges data fetched from the
    backend with the request data (data_list is updated in place).

    @type object_ids: list
    @param object_ids: list of objects to fetch from backend

    @type data_list: list
    @param data_list: data list which is passed in the request

    @type is_partial_update: bool
    @param is_partial_update: set to true for a partial update

    @param **kwargs: keyword arguments to provide additional args to those who override this method

    @rtype: list
    @return: merged data (same list object as data_list)

    @raise EventManagementException: when no backend objects are found and
        this is not a group state change action
    """
    results = self.get_bulk(object_ids, **kwargs)
    is_group_state_action = False
    if isinstance(data_list, list) and len(data_list) > 0:
        # if there is 'group_state_change_action' key in the value then this is a group state change action
        # NOTE(review): only the first element is inspected — presumably all
        # entries of a group-state batch carry the marker; confirm with callers.
        is_group_state_action = 'group_state_change_action' in data_list[0]
    logger.info('is_group_state_action=%s objects_to_modify=%s', is_group_state_action, object_ids)
    # Group state changes may legitimately target groups that don't exist yet,
    # so an empty fetch is only fatal for ordinary updates.
    if (results is None or len(results) == 0) and not is_group_state_action:
        logger.error('Failed to get objects=%s from kv store', object_ids)
        raise EventManagementException('Failed to get objects=%s from kv store' % (str(object_ids)))
    # Index request payloads by key, then attach the fetched backend record
    # ('fdata') to each; duplicate keys keep the first payload only.
    mapped_objects = {}
    for data in data_list:
        if data.get(self.id_key) not in mapped_objects:
            mapped_objects[data.get(self.id_key)] = {'data': data, 'fdata': None}
    for result in results:
        if result.get(self.id_key) in mapped_objects:
            mapped_objects[result.get(self.id_key)]['fdata'] = result

    for value in mapped_objects.values():
        if is_group_state_action:
            if value['fdata']:  # updating an existing group
                fields_to_update = value['data'].get('fields_to_update', [])
                # remove the fields which don't have to be updated. Can't delete items of dict while iterating
                # hence getting a copy of the dict's keys by converting the keys to list
                for key in list(value['data']):
                    # don't delete the action_id. It's needed in data_list for logging purposes
                    if key == 'action_id':
                        continue
                    if key not in fields_to_update:
                        value['data'].pop(key, None)
                logger.info('group_state_change_action updating the group group_id=%s fields_to_update=%s',
                            value['data'].get('_key', None), value['data'])
            else:  # no existing group in KV Store, let's create a new one
                # remove fields_to_update and group_state_change_action because we don't want to insert these
                # fields in the collection
                value['data'].pop('fields_to_update', None)
                value['data'].pop('group_state_change_action', None)
                # no existing object in KV Store so let's create a new one by inserting the whole object passed
                value['fdata'] = value['data']
                logger.info('group_state_change_action creating new group group_id=%s new_group_object=%s',
                            value['data'].get('_key', None), value['data'])
            continue
        # Ordinary update: pull missing fields from the backend copy into the
        # request payload (in-place).
        self.merge_data(value['fdata'], value['data'], is_partial_update)
    logger.debug('final_merged_data data_list=%s', data_list)
    return data_list
def get_and_merge_data(self, object_id, data, is_partial_update=True, **kwargs):
    """
    Single-object variant of get_and_merge_data_list.
    Note: this performs an in-place update of the data dict.

    @type object_id: basestring
    @param object_id: object id

    @type data: dict
    @param data: data passed in the request

    @type is_partial_update: bool
    @param is_partial_update: set to true for a partial update

    @param **kwargs: keyword arguments for subclass overrides
        (NOTE: not forwarded to self.get here)

    @rtype: dict
    @return: merged data

    @raise EventManagementException: when the object cannot be fetched
    """
    fetched = self.get(object_id)
    if fetched is None:
        message = "Failed to get object ID=%s from KV store." % object_id
        logger.error(message)
        raise EventManagementException(message)
    return self.merge_data(fetched, data, is_partial_update)
def merge_data(self, fetched_data, data, is_partial_update=True):
    """
    Merge request data with backend data.
    Note: this performs an in-place update of the data dict.

    @type fetched_data: dict
    @param fetched_data: data fetched from the backend

    @type data: dict
    @param data: request data (takes precedence on conflicts)

    @type is_partial_update: bool
    @param is_partial_update: when True, copy backend fields missing from data

    @rtype: dict
    @return: the updated data dict

    @raise EventManagementException: when either input is None
    """
    if fetched_data is None or data is None:
        logger.error("data or fetched data is None")
        raise EventManagementException('data or fetched data is None')

    if is_partial_update:
        # Backend values only fill gaps; request values always win.
        for key, value in fetched_data.items():
            data.setdefault(key, value)

    return data
@time_function_call
def update(self, object_id, data, is_partial_update=False, **kwargs):
    """
    Update one notable event object.

    @type object_id: basestring
    @param object_id: object id

    @type data: dict
    @param data: new event payload (merged with the stored object first)

    @type is_partial_update: bool
    @param is_partial_update: flag to do a partial update

    @type kwargs: dict
    @param kwargs: extra parameters

    @rtype: dict
    @return: dict which holds the updated keys

    @raise TypeError: when object_id is not a valid string
    """
    if not is_valid_str(object_id):
        message = 'Object ID is not a valid string, value=%s.' % object_id
        logger.error(message)
        raise TypeError(message)

    self.get_and_merge_data(object_id, data, is_partial_update, **kwargs)
    self.inject_object_type([data])
    self.pre_processing([data], MethodType.UPDATE)

    return self.storage_interface.edit(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        object_id,
        data,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
@time_function_call
def update_bulk(self, object_ids, data_list, is_partial_update=False, skip_get_merge=False, **kwargs):
    """
    Update one or more notable event objects.

    @type object_ids: list
    @param object_ids: keys of the events to update

    @type data_list: list
    @param data_list: new event payloads

    @type is_partial_update: bool
    @param is_partial_update: flag for partial update

    @type skip_get_merge: bool
    @param skip_get_merge: when True, skip the fetch-and-merge step

    @type kwargs: dict
    @param kwargs: extra params

    @rtype: list
    @return: batch save results

    @raise TypeError: when object_ids is not a valid list
    """
    if not is_valid_list(object_ids):
        message = 'Object IDs is not a valid list, value=%s.' % object_ids
        logger.error(message)
        raise TypeError(message)

    if not skip_get_merge:
        self.get_and_merge_data_list(object_ids, data_list, is_partial_update, **kwargs)
    self.inject_object_type(data_list)
    self.pre_processing(data_list, MethodType.UPDATE_BULK)

    return self.storage_interface.batch_save(
        self.master_session_key,
        self.get_user(**kwargs),
        data_list,
        objecttype=self.object_type,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
@time_function_call
def delete(self, object_id, **kwargs):
    """
    Delete one notable event object from KV store.

    @type object_id: basestring
    @param object_id: object id

    @type kwargs: dict
    @param kwargs: extra params

    @return: result of the storage delete call

    @raise TypeError: when object_id is not a valid string
    """
    if not is_valid_str(object_id):
        message = 'Id cannot be empty or invalid id=%s.' % object_id
        logger.error(message)
        raise TypeError(message)

    self.pre_processing([{self.id_key: object_id}], MethodType.DELETE)
    logger.debug('Deleting %s:%s event', self.id_key, object_id)
    return self.storage_interface.delete(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        object_id,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
@time_function_call
def delete_bulk(self, object_ids, **kwargs):
    """
    Delete one or more notable event objects.

    @type object_ids: list
    @param object_ids: object keys to delete; when empty, deletion is driven
        purely by the filter in kwargs

    @type kwargs: dict
    @param kwargs: extra params (filter/filter_data)

    @return: result of the storage delete_all call
    """
    filter_data = self.fetch_filter_data(**kwargs)

    if is_valid_list(object_ids) and len(object_ids) != 0:
        self.merge_filter_data(filter_data, self.get_filter_data(object_ids))

    if filter_data:
        filter_data = validate_json('[event_management_interface]', filter_data)

    if isinstance(object_ids, list):
        # BUG FIX: this previously used a dict comprehension
        # ([{self.id_key: eid for eid in object_ids}]) which built a SINGLE
        # dict in which each id overwrote the previous one, so pre_processing
        # only ever saw the last id. It must receive one {_key: id} dict per
        # object, mirroring delete().
        self.pre_processing([{self.id_key: eid} for eid in object_ids], MethodType.DELETE_BULK)
        logger.debug('Deleting events ids=%s, other arguments=%s', object_ids, filter_data)

    return self.storage_interface.delete_all(
        self.master_session_key,
        self.get_user(**kwargs),
        self.object_type,
        filter_data,
        current_user_name=self.current_user_name,
        host_base_uri=self.host_base_uri
    )
def send_activity_to_audit(self, data, activity, type):
    """
    Send audit event(s) to the itsi_notable_audit index on episode updates;
    lazily creates the Audit helper on first use.

    @type data: list or dict
    @param data: event content (or list of contents) sent to the audit index

    @type activity: list or str
    @param activity: description(s) of the activity

    @type type: str
    @param type: activity type (name shadows the builtin; kept for
        interface compatibility)

    @return: None
    """
    if not self.audit:
        # Lazy initialization of the audit helper.
        self.audit = Audit(self.session_key,
                           audit_token_name='Auto Generated ITSI Notable Index Audit Token')

    is_bulk = isinstance(data, list) and isinstance(activity, list)
    is_single = isinstance(data, dict) and isinstance(activity, str)
    if is_bulk:
        self.audit.send_activity_to_audit_bulk(data, activity, type)
    elif is_single:
        self.audit.send_activity_to_audit(data, activity, type)
    else:
        logger.error('Unsupported data (value=%s) or activity (value=%s)', data, activity)
def lazy_init_notable_event_configuration(self):
    """
    Lazily initialize the notable event configuration object, pointing it at
    the remote EA management host when hybrid action dispatch is configured.

    @return: None
    """
    if self.notable_event_configuration is not None:
        return

    if self.action_dispatch_config:
        # Route subsequent calls through the remote master host.
        self.host_base_uri = self.action_dispatch_config.remote_ea_mgmt_uri
        self.master_session_key = self.action_dispatch_config.get_master_host_session_key()

    self.notable_event_configuration = NotableEventConfiguration(
        self.master_session_key,
        logger,
        host_base_uri=self.host_base_uri
    )