You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
998 lines
38 KiB
998 lines
38 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import itsi_py3
|
|
import time
|
|
import asyncio
|
|
import time as time_import
|
|
import copy
|
|
from abc import ABCMeta, abstractmethod
|
|
from ITOA.event_management.notable_event_tag import NotableEventTag
|
|
from ITOA.event_management.notable_event_utils import filter_index_fields_and_get_event_id_for_notable_event, \
|
|
SearchUtils, Audit, NotableEventException, NotableEventConfiguration, MethodType
|
|
from ITOA.itoa_common import is_valid_dict, is_valid_list, is_valid_str, validate_json, get_current_utc_epoch
|
|
from ITOA.storage import itoa_storage
|
|
from ITOA.setup_logging import logger as itsi_logger
|
|
from .base_event_management import time_function_call, BaseEventManagement, EventManagementException
|
|
from .push_event_manager import PushEventManager
|
|
from .itsi_nats_publish import NatsEventPublisher
|
|
import socket
|
|
|
|
|
|
def lazy_property(fn):
    '''Decorator that turns a zero-argument method into a cached property.

    The wrapped function runs at most once per instance: its result is stored
    on the instance under a ``_lazy_``-prefixed attribute and returned from
    that cache on every later access.
    '''
    cache_attr = '_lazy_' + fn.__name__

    @property
    def _lazy_property(self):
        if hasattr(self, cache_attr):
            return getattr(self, cache_attr)
        setattr(self, cache_attr, fn(self))
        return getattr(self, cache_attr)
    return _lazy_property
|
|
|
|
|
|
class NotableEvent(BaseEventManagement, metaclass=ABCMeta):
    """Abstract base class for notable-event CRUD against the event index
    and the notable-event state KV store collection."""

    def __init__(self, session_key,
                 index_name,
                 current_user_name=None,
                 audit_token_name='Notable Index Audit Token',
                 tag_collection='itsi_notable_event_tag',
                 state_collection='itsi_notable_event_state',
                 logger=None,
                 **kwargs):
        """
        Initialize the notable event handler.

        @type session_key: basestring
        @param session_key: splunkd session key

        @type index_name: basestring
        @param index_name: name of the index holding notable events

        @type current_user_name: basestring
        @param current_user_name: name of the acting user

        ## AUDITING FOR NOTABLE EVENTS IS DEPRECATED AS OF 4.0.0. MODIFICATIONS TO INDIVIDUAL EVENTS CANNOT BE MADE.
        @type audit_token_name: basestring
        @param audit_token_name: Http listener token name to audit any create, update, delete

        ## TAG AND STATE COLLECTIONS ARE DEPRECATED AS OF 4.0.0. MODIFICATIONS TO INDIVIDUAL EVENTS CANNOT BE MADE.
        @type tag_collection: basestring
        @param tag_collection: Tag collection name

        @type state_collection: basestring
        @param state_collection: notable event state collection name

        @type logger: logging.Logger
        @param logger: logger to use; falls back to the module-level ITSI logger

        @type kwargs: dict
        @param kwargs: Additional settings like token name, user, namespace etc

        @rtype: object
        @return: instance of the class
        """
        self.host_base_uri = ''
        self.master_session_key = session_key
        self.logger = logger or itsi_logger

        # Guard clause: fail fast before initializing any further state.
        if not (is_valid_str(session_key) and is_valid_str(index_name)):
            errorMsg = 'Invalid session key or index name.'
            self.logger.error(errorMsg)
            raise NotableEventException(errorMsg)
        self.session_key = session_key

        self.index_name = index_name
        self.audit_token_name = audit_token_name
        self.mod_time_key = 'mod_time'
        self.create_time_key = 'create_time'
        default_token_name = 'Auto Generated Event Management Token'
        self.token_name = kwargs.get('token_name', default_token_name)

        # Extra arguments kept for lazy helpers (user/namespace lookups)
        self.kwargs = kwargs
        self.id_key = 'event_id'

        # The four assignments below are DEPRECATED as of 4.0.0. Notable Events
        # no longer have tags, comments, or states.
        self.tag_object = NotableEventTag(session_key, collection=tag_collection)
        self.storage_interface = itoa_storage.ITOAStorage(collection=state_collection)
        self.object_type = 'notable_event_state'
        self.owner = kwargs.get('user', 'nobody')
        self.current_user_name = current_user_name
        self.fields_to_track = ['status', 'severity', 'owner']
|
|
|
|
    @lazy_property
    def push_manager(self):
        # HTTP event collector push manager used to write events into the
        # index; constructed once per instance on first access.
        return PushEventManager(self.session_key, self.token_name)
|
|
|
|
    @lazy_property
    def search_utils(self):
        # Search helper bound to this instance's index; 'user' and 'namespace'
        # may be overridden through the constructor kwargs.
        return SearchUtils(self.session_key, self.logger, self.index_name,
                           user=self.kwargs.get('user', 'nobody'),
                           namespace=self.kwargs.get('namespace', 'itsi'))
|
|
|
|
    @lazy_property
    def audit(self):
        # Audit helper (auditing of individual notable events is deprecated
        # as of 4.0.0; see class docstring).
        return Audit(self.session_key, audit_token_name=self.audit_token_name)
|
|
|
|
    @lazy_property
    def notable_event_configuration(self):
        # Cached configuration; _get_activity reads its status_contents and
        # severity_contents to render human-readable labels.
        return NotableEventConfiguration(self.session_key, self.logger)
|
|
|
|
def pre_processing(self, data_list, method):
|
|
"""
|
|
Add mod_time and create_time to the notable event states
|
|
|
|
@type data_list: list
|
|
@param data_list: list of data to validate and add time, user info etc
|
|
|
|
@type method: basestring
|
|
@param method: method type
|
|
|
|
@rtype: list
|
|
@return: It updates list in place and also return it back as well
|
|
"""
|
|
if not isinstance(data_list, list):
|
|
error_msg = 'Data_list: {0} is not a valid list, data_list type is {1}.'.format(data_list, type(data_list))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
|
|
for data in data_list:
|
|
# Make sure data is valid dict
|
|
if not isinstance(data, dict):
|
|
error_msg = 'Data: {0} is not a valid dictionary, data type is {1}.'.format(data, type(data))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
|
|
# Ensure '_key' is set to id_key for kv store request
|
|
if self.id_key in data:
|
|
data['_key'] = data.get(self.id_key)
|
|
|
|
time_value = time.time()
|
|
if method in (MethodType.CREATE, MethodType.CREATE_BULK):
|
|
# Add create time
|
|
data[self.create_time_key] = time_value
|
|
if method not in (MethodType.DELETE, MethodType.DELETE_BULK, MethodType.GET, MethodType.GET_BULK):
|
|
# Need to set mod time for create and update
|
|
data[self.mod_time_key] = time_value
|
|
|
|
return data_list
|
|
|
|
    @abstractmethod
    def validate_schema(self, data):
        """
        Validate schema prior to performing an operation.

        Abstract: concrete subclasses must implement this; the base body
        always raises.

        @type data: dict
        @param data: schema to validate

        @rtype: bool
        @return: True - if validation pass otherwise False
        """
        raise NotImplementedError('Not implemented')
|
|
|
|
def validate_schema_list(self, data_list):
|
|
"""
|
|
Validate schema list
|
|
|
|
@type data_list: list
|
|
@param data_list: list of document/schema
|
|
|
|
@rtype: bool
|
|
@return: True/False
|
|
"""
|
|
ret = True
|
|
if is_valid_list(data_list):
|
|
for data in data_list:
|
|
ret = ret and self.validate_schema(data)
|
|
else:
|
|
self.logger.error('Cannot validate schema because of invalid list.')
|
|
return ret
|
|
|
|
def add_time(self, data):
|
|
"""
|
|
Add create time and mod time to data
|
|
|
|
@type data: dict
|
|
@param data: data to add create time and mod time
|
|
|
|
@rtype: dict
|
|
@return: updated data or raise exception
|
|
"""
|
|
if not is_valid_dict(data):
|
|
self.logger.error('Failed to add create time because of invalid format of data')
|
|
raise NotableEventException('Failed to add create time because of invalid format of data')
|
|
|
|
# add mod time
|
|
data = self.upsert_mod_time(data)
|
|
|
|
return data
|
|
|
|
def upsert_mod_time(self, data):
|
|
"""
|
|
Add or update mod time
|
|
|
|
@type data: dict
|
|
@param data: data to add create time and mod time
|
|
|
|
@rtype: dict
|
|
@return: updated data or raise exception
|
|
"""
|
|
if not is_valid_dict(data):
|
|
self.logger.error('Failed to add %s time', self.mod_time_key)
|
|
raise NotableEventException('Failed to add required %s key' % self.mod_time_key)
|
|
|
|
data[self.mod_time_key] = str(get_current_utc_epoch())
|
|
return data
|
|
|
|
def _upsert_old_values(self, fetched_data, data):
|
|
"""
|
|
Add old values to data to set up values for activity tracking
|
|
|
|
@type fetched_data: dict
|
|
@param fetched_data: old data values to graft onto data
|
|
|
|
@type data: dict
|
|
@param data: data to add old value tracking to
|
|
|
|
@rtype: dict
|
|
@return: data, updated in place with old value tracking
|
|
"""
|
|
# graft fetched data onto object to update so that activity tracking can use it
|
|
for key in list(fetched_data.keys()):
|
|
if key != self.id_key and key != '_key':
|
|
data['__old__' + key] = fetched_data.get(key)
|
|
|
|
def _upsert_id_key(self, data, object_id):
|
|
"""
|
|
Add object_id to data
|
|
|
|
@type data: dict
|
|
@param data: data to add id key to
|
|
|
|
@type object_id: basestring
|
|
@param object_id: id key to add
|
|
|
|
@rtype: dict
|
|
@return: updated data or raise exception
|
|
"""
|
|
if not is_valid_dict(data):
|
|
error_msg = 'Failed to add key {0} to data: {1}.'.format(object_id, data)
|
|
self.logger.error(error_msg)
|
|
raise NotableEventException(error_msg)
|
|
|
|
# ensure object_id is included in data so that event_id matches _key in kv store
|
|
data[self.id_key] = object_id
|
|
|
|
return data
|
|
|
|
@time_function_call
|
|
def create(self, data, **kwargs):
|
|
"""
|
|
Create notable event
|
|
|
|
@type data - dict
|
|
@param data - notable event schema to create
|
|
|
|
@rtype dict
|
|
@return create object _key or raise an exception
|
|
"""
|
|
if not is_valid_dict(data):
|
|
message = "Data is not a valid dictionary, data type=%s." % type(data)
|
|
self.logger.error(message)
|
|
raise NotableEventException(message)
|
|
|
|
self.add_time(data)
|
|
updated = filter_index_fields_and_get_event_id_for_notable_event(data, self.logger,
|
|
is_none_allowed=True,
|
|
event_identifier_fields_string=data.get('event_identifier_fields'),
|
|
is_token_replacement=True)
|
|
|
|
try:
|
|
self.validate_schema(updated)
|
|
except Exception:
|
|
self.logger.error('Validation failed and invalid data to create notable event, data="%s"', data)
|
|
raise
|
|
|
|
# create event id and filter some index fields
|
|
host = updated.get('host', None)
|
|
if host is None:
|
|
host = socket.gethostname()
|
|
time = updated.get('_time')
|
|
if time is None:
|
|
time = float(time_import.time())
|
|
sourcetype = updated.get('sourcetype')
|
|
if sourcetype is None:
|
|
sourcetype = 'itsi_notable:event'
|
|
source = updated.get('source')
|
|
|
|
self.push_manager.push_event(updated, host=host, time=time,
|
|
source=source, sourcetype=sourcetype)
|
|
# if queue mode is enabled, send event to NATS subject
|
|
if self.push_manager.is_queue_mode_enabled:
|
|
nats_publisher = NatsEventPublisher(self.session_key, self.logger)
|
|
events_to_ingest = []
|
|
self.update_ingest_events(updated, events_to_ingest, host, time, sourcetype, source)
|
|
asyncio.run(nats_publisher.push_events_to_nats(events_to_ingest))
|
|
self.logger.debug("Created event id=%s", updated.get(self.id_key))
|
|
return updated.get(self.id_key)
|
|
|
|
def _get_sourcetype(self):
|
|
"""
|
|
Get sourcetype, dafault value is stash
|
|
|
|
@rtype: basestring
|
|
@return: sourcetype
|
|
"""
|
|
value = self._get_field(self.SOURCETYPE)
|
|
if value is None:
|
|
# Return default
|
|
return 'itsi_notable:event'
|
|
else:
|
|
return value
|
|
|
|
def update_ingest_events(self, processed_event, events_to_ingest, host, time, sourcetype, source):
|
|
event_to_ingest = copy.deepcopy(processed_event)
|
|
event_to_ingest['host'] = host
|
|
event_to_ingest['sourcetype'] = sourcetype
|
|
event_to_ingest['_time'] = time
|
|
event_to_ingest['source'] = source
|
|
events_to_ingest.append(event_to_ingest)
|
|
|
|
@time_function_call
|
|
def create_bulk(self, data_list, **kwargs):
|
|
"""
|
|
Create more than one notable events
|
|
|
|
@type data_list: list
|
|
@param data_list: data list
|
|
|
|
@rtype: list
|
|
@return: list of created
|
|
"""
|
|
try:
|
|
validate_json('[Notable Event CRUD]', data_list)
|
|
except Exception:
|
|
self.logger.exception('Invalid JSON list to do bulk create.')
|
|
raise
|
|
|
|
results = []
|
|
events = []
|
|
|
|
# prepare bulk data
|
|
for data in data_list:
|
|
self.add_time(data)
|
|
# Create event id, mod time and filter some index fields
|
|
updated = filter_index_fields_and_get_event_id_for_notable_event(
|
|
data,
|
|
self.logger,
|
|
is_none_allowed=True,
|
|
event_identifier_fields_string=data.get('event_identifier_fields'),
|
|
is_token_replacement=True
|
|
)
|
|
try:
|
|
self.validate_schema(updated)
|
|
except Exception:
|
|
self.logger.error('Invalid data={0}'.format(updated))
|
|
raise
|
|
results.append(updated.get(self.id_key))
|
|
events.append(updated)
|
|
|
|
if events:
|
|
host = events[0].get('host')
|
|
if host is None:
|
|
host = socket.gethostname()
|
|
time = events[0].get('_time')
|
|
if time is None:
|
|
time = float(time_import.time())
|
|
sourcetype = events[0].get('sourcetype')
|
|
if sourcetype is None:
|
|
sourcetype = 'itsi_notable:event'
|
|
source = events[0].get('source')
|
|
self.push_manager.push_events(events, host=host, source=source,
|
|
sourcetype=sourcetype)
|
|
# if queue mode is enabled, send events to NATS subject
|
|
if self.push_manager.is_queue_mode_enabled:
|
|
nats_publisher = NatsEventPublisher(self.session_key, self.logger)
|
|
events_to_ingest = []
|
|
for processed_event in events:
|
|
self.update_ingest_events(processed_event, events_to_ingest, host, time, sourcetype, source)
|
|
asyncio.run(nats_publisher.push_events_to_nats(events_to_ingest))
|
|
# audit is not applicable for events that do not exist yet.
|
|
return results
|
|
|
|
@time_function_call
|
|
def get(self, object_id, **kwargs):
|
|
"""
|
|
Get notable event object
|
|
|
|
@type object_id: basestring
|
|
@param object_id: notable event key
|
|
|
|
@type kwargs: optional key value params `earliest_time` and
|
|
`latest_time`
|
|
@param kwargs: additional parameters which can optimize search time
|
|
if you know what the bucket is, this is awesome.
|
|
|
|
@rtype: dict
|
|
@return: return notable event schema
|
|
"""
|
|
if not is_valid_str(object_id):
|
|
message = 'Invalid key to get object, value=%s.' % object_id
|
|
self.logger.error(message)
|
|
raise TypeError(message)
|
|
|
|
result = self.search_utils.get_events([object_id],
|
|
earliest_time=kwargs.get('earliest_time'),
|
|
latest_time=kwargs.get('latest_time'))
|
|
result = result[0]
|
|
|
|
# get and merge results from kv store
|
|
kv_result = self._check_state_exists(object_id)
|
|
if kv_result is not None:
|
|
self._force_merge_data(kv_result, result)
|
|
|
|
return result
|
|
|
|
    @time_function_call
    def get_bulk(self, object_ids, **kwargs):
        """
        Get one or more than one notable events.

        Raw events come from the index; matching state records from the KV
        store are then merged on top (KV values win).

        @type object_ids: list
        @param object_ids: list of objects to get

        @type kwargs: dict
        @param kwargs: extra arguments to fetch notable events
            (optional `earliest_time`/`latest_time` narrow the index search)

        @rtype: list
        @return: list of notable events
        """
        if not is_valid_list(object_ids):
            message = 'Object IDs is not a valid list, value=%s.' % object_ids
            self.logger.error(message)
            raise TypeError(message)

        results = self.search_utils.get_events(object_ids, earliest_time=kwargs.get('earliest_time'),
                                               latest_time=kwargs.get('latest_time'))
        # sort results to speed up iteration
        results = sorted(results, key=lambda result: result.get(self.id_key))

        # get and merge results from kv store
        # TODO: pass sort by event_id as part of get request
        kv_results = self._get_state_bulk(object_ids)
        if len(kv_results) > 0:
            # sort kv_results to speed up iteration
            kv_results = sorted(kv_results, key=lambda result: result.get(self.id_key))
            # Both lists are sorted by event id, so each KV record's match is
            # expected at or after results_iter — a two-pointer merge that
            # avoids restarting the scan from the beginning for every record.
            results_iter = 0
            for kv_result in kv_results:
                # find raw index result in sorted results, starting at results_iter
                if results_iter < len(results):
                    for i in range(results_iter, len(results)):
                        result = results[i]
                        if result.get(self.id_key) == kv_result.get(self.id_key):
                            # merge kv values onto index result
                            self._force_merge_data(kv_result, result)
                            # advance past the matched position (note: only by
                            # one — if a KV record had no index match, later
                            # scans simply re-cover the skipped range)
                            results_iter += 1
                            break

        self.logger.debug("Return %s notable events", len(results))
        return results
|
|
|
|
def _force_merge_data(self, fetched_data, data):
|
|
"""
|
|
Helper function to merge request data with backend data
|
|
Note: this is inplace update to data dict
|
|
|
|
@type fetched_data: dict
|
|
@param fetched_data: Fetch data from backend
|
|
|
|
@param data: dict
|
|
@param data: request data
|
|
|
|
@rtype: dict
|
|
@return: return updated data (inplace update to data)
|
|
"""
|
|
if data is None or fetched_data is None:
|
|
error_msg = 'Data or fetched data is None.'
|
|
self.logger.error(error_msg)
|
|
raise EventManagementException(error_msg)
|
|
|
|
for key, value in fetched_data.items():
|
|
data[key] = value
|
|
|
|
return data
|
|
|
|
    def _get_activity(self, updated_data, activity_type=None):
        """
        Build the human-readable activity string describing this update.

        Side effect: every '__old__'-prefixed key is REMOVED from updated_data
        before it is sent to the store.

        @type updated_data: dict
        @param updated_data: data to derive the activity from; may contain
            '__old__<field>' entries grafted on by _upsert_old_values

        @type activity_type: basestring
        @param activity_type: type of activity ('acknowledge' short-circuits)

        @rtype: basestring
        @return: activity log statement
        """
        activity_tracking = ''
        keys_to_delete = []
        fields_to_update = []  # keep track of fields that already exist in entry

        if activity_type == 'acknowledge':
            return '{0} acknowledged notable event'.format(updated_data.get('owner'))

        # handle fields that already exist in entry - show update from old value to new value
        for key in list(updated_data.keys()):
            if key.startswith('__old__'):
                keys_to_delete.append(key)
                actual_key = key[len('__old__'):]
                # Only tracked fields that also carry a new value are reported.
                if actual_key not in updated_data or actual_key not in self.fields_to_track:
                    continue
                fields_to_update.append(actual_key)
                old_value = updated_data.get(key)
                new_value = updated_data.get(actual_key)
                # look up label for available fields, e.g. 'New (1)'
                if actual_key == 'status':
                    old_value = '{0} ({1})'.format(self.notable_event_configuration.status_contents.get(old_value, {}).get('label'), old_value)
                    new_value = '{0} ({1})'.format(self.notable_event_configuration.status_contents.get(new_value, {}).get('label'), new_value)
                elif actual_key == 'severity':
                    old_value = '{0} ({1})'.format(self.notable_event_configuration.severity_contents.get(old_value, {}).get('label'), old_value)
                    new_value = '{0} ({1})'.format(self.notable_event_configuration.severity_contents.get(new_value, {}).get('label'), new_value)
                # renders e.g.: status changed from status="Old" to status="New".
                activity_tracking += '{0} changed from {0}="{1}" to {0}="{2}". '.format(actual_key, old_value, new_value)

        # delete old entry in the dict (removed after iteration to avoid
        # mutating the dict while looping over it)
        for key in keys_to_delete:
            del updated_data[key]

        # handle fields that don't exist yet - show update to new value
        for field in list(updated_data.keys()):
            if field not in fields_to_update and field in self.fields_to_track:
                value = updated_data.get(field)
                if field == 'status':
                    value = '{0} ({1})'.format(self.notable_event_configuration.status_contents.get(value, {}).get('label'), value)
                elif field == 'severity':
                    value = '{0} ({1})'.format(self.notable_event_configuration.severity_contents.get(value, {}).get('label'), value)
                activity_tracking += 'updated {0}="{1}". '.format(field, value)

        return activity_tracking
|
|
|
|
def _create_state(self, data, **kwargs):
|
|
"""
|
|
Create state for one notable event
|
|
|
|
@type data - dict
|
|
@param data - notable event schema to create
|
|
|
|
@rtype dict
|
|
@return created object _key or raise an exception
|
|
"""
|
|
if not isinstance(data, dict):
|
|
error_msg = 'Data: {0} is not a valid dictionary, data type is {1}.'.format(data, type(data))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
|
|
activity = self._get_activity(data, kwargs.pop('action_type', None))
|
|
ret = super(NotableEvent, self).create(data, **kwargs)
|
|
# Create is kind of update here because event had already created with some initial state
|
|
# now we are tracking its state by creating record in KV
|
|
self.audit.send_activity_to_audit({self.id_key: data.get('_key')}, activity, 'Notable Event Update')
|
|
return ret
|
|
|
|
def _get_state(self, object_id, **kwargs):
|
|
"""
|
|
Get state for one notable event
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@rtype: dict
|
|
@return: notable event state
|
|
"""
|
|
return super(NotableEvent, self).get(object_id, **kwargs)
|
|
|
|
def _update_state(self, object_id, data, is_partial_update=False, **kwargs):
|
|
"""
|
|
Update state for one notable event
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@type data: dict
|
|
@param data: data
|
|
|
|
@type is_partial_update: bool
|
|
@param is_partial_update: flag to do partial update
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: Extra parameters
|
|
|
|
@rtype: dict
|
|
@return: return dict which holds updated keys
|
|
"""
|
|
activity = self._get_activity(data, kwargs.pop('action_type', None))
|
|
ret = super(NotableEvent, self).update(object_id, data, is_partial_update, **kwargs)
|
|
self.audit.send_activity_to_audit({self.id_key: data.get('_key')}, activity, 'Notable Event Update')
|
|
|
|
return ret
|
|
|
|
def _delete_state(self, object_id, **kwargs):
|
|
"""
|
|
Delete state for one notable event
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: extra params
|
|
|
|
@return:
|
|
"""
|
|
return super(NotableEvent, self).delete(object_id, **kwargs)
|
|
|
|
def _create_state_bulk(self, data_list, **kwargs):
|
|
"""
|
|
Create state for one or more notable events
|
|
|
|
@type data_list: list
|
|
@param data_list: data list
|
|
|
|
@rtype: list
|
|
@return: list of created state object _keys
|
|
"""
|
|
if not isinstance(data_list, list):
|
|
error_msg = 'Data_list: {0} is not a valid list, data_list type is {1}.'.format(data_list, type(data_list))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
|
|
action_type = kwargs.pop('action_type', None)
|
|
activities_data = []
|
|
activities = []
|
|
for data in data_list:
|
|
activities_data.append({self.id_key: data.get(self.id_key)})
|
|
activities.append(self._get_activity(data, action_type))
|
|
ret = super(NotableEvent, self).create_bulk(data_list, **kwargs)
|
|
# Create is kind of update here because events had already created with some initial state
|
|
# now we are tracking their state by creating record in KV
|
|
self.audit.send_activity_to_audit_bulk(activities_data, activities, 'Notable Event Bulk Update')
|
|
|
|
return ret
|
|
|
|
def _get_state_bulk(self, object_ids, **kwargs):
|
|
"""
|
|
Get state for one or more notable events
|
|
|
|
@type object_ids: list
|
|
@param object_ids: list of objects to get
|
|
Note: if object list is empty or not defined then get all objects
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: extra arguments to fetch notable events
|
|
|
|
@rtype: list
|
|
@return: list of states of notable events
|
|
"""
|
|
return super(NotableEvent, self).get_bulk(object_ids, **kwargs)
|
|
|
|
def _update_state_bulk(self, data_list, is_partial_update=True, **kwargs):
|
|
"""
|
|
Perform update for one or more notable events
|
|
|
|
@type object_ids: list
|
|
@param object_ids: notable event IDs
|
|
|
|
@type data_list: list
|
|
@param data_list: notable events
|
|
|
|
@type is_partial_update: bool
|
|
@param is_partial_update: flag for partial update
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: Extra params to perform
|
|
|
|
@rtype: list
|
|
@return: updated notable event IDs
|
|
"""
|
|
if not isinstance(data_list, list):
|
|
error_msg = 'Data_list: {0} is not a valid list, data_list type is {1}.'.format(data_list, type(data_list))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
if len(data_list) == 0:
|
|
return []
|
|
|
|
action_type = kwargs.pop('action_type', None)
|
|
|
|
ids = []
|
|
ids_data = []
|
|
activities = []
|
|
for data in data_list:
|
|
ids.append(data.get(self.id_key))
|
|
ids_data.append({self.id_key: data.get(self.id_key)})
|
|
activities.append(self._get_activity(data, action_type))
|
|
ret = super(NotableEvent, self).update_bulk(ids, data_list, is_partial_update, **kwargs)
|
|
self.audit.send_activity_to_audit_bulk(ids_data, activities, 'Notable Event Bulk Update')
|
|
|
|
return ret
|
|
|
|
def _delete_state_bulk(self, object_ids, **kwargs):
|
|
"""
|
|
Delete bulk
|
|
|
|
@type object_ids: list
|
|
@param object_ids: object ID list to delete
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: extra params to delete
|
|
|
|
@return:
|
|
"""
|
|
return super(NotableEvent, self).delete_bulk(object_ids, **kwargs)
|
|
|
|
def _check_state_exists(self, object_id):
|
|
"""
|
|
Check KV store to see if entry exists for given event ID
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@rtype: dict|None
|
|
@return: entry if it exists in KV store, None otherwise
|
|
"""
|
|
try:
|
|
# see if entry for event exists
|
|
return self._get_state(object_id)
|
|
except Exception:
|
|
return
|
|
|
|
def get_and_merge_data_list(self, object_ids, data_list, is_partial_update=True, **kwargs):
|
|
"""
|
|
Similar function to do partial update. It merges data from backend with request data
|
|
|
|
@type object_ids: list
|
|
@param object_ids: list of objects to fetch from backend
|
|
|
|
@type data_list: list
|
|
@param data_list: data list with his passed in the request
|
|
|
|
@type is_partial_update: bool
|
|
@param is_partial_update: set to true if it is partial update
|
|
|
|
@param **kwargs: Key word arguments to provide additional args to those who override this method
|
|
Generally expected kwargs are:
|
|
fetched_data: already fetched kv store entry for given object_id
|
|
|
|
@rtype: list
|
|
@return: Merged data
|
|
"""
|
|
# if data hasn't been fetched yet, fetch it from KV store
|
|
fetched_data = kwargs.get('fetched_data', self._get_state_bulk(object_ids))
|
|
if fetched_data is None or len(fetched_data) == 0:
|
|
error_msg = 'Failed to get state for events: {0} from KV store.'.format(str(object_ids))
|
|
self.logger.error(error_msg)
|
|
raise EventManagementException(error_msg)
|
|
|
|
mapped_objects = {}
|
|
for data in data_list:
|
|
mapped_objects[data.get(self.id_key)] = {'data': data, 'fdata': None}
|
|
# fill in fetched data values
|
|
for state_object in fetched_data:
|
|
if state_object.get(self.id_key) in mapped_objects:
|
|
mapped_objects[state_object.get(self.id_key)]['fdata'] = state_object
|
|
# merge fetched data with data to udpate
|
|
for value in mapped_objects.values():
|
|
self.merge_data(value.get('fdata'), value.get('data'), is_partial_update)
|
|
|
|
return data_list
|
|
|
|
def get_and_merge_data(self, object_id, data, is_partial_update=True, **kwargs):
|
|
"""
|
|
Similar function but it deals with one object instead of list
|
|
Note: this is inplace update to data dict
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@type data: dict
|
|
@param data: data is passed in the request
|
|
|
|
@type is_partial_update: bool
|
|
@param is_partial_update: set to true if it is partial update
|
|
|
|
@param **kwargs: Key word arguments to provide additional args to those who override this method
|
|
Generally expected kwargs are:
|
|
fetched_data: already fetched kv store entry for given object_id
|
|
|
|
@rtype: dict
|
|
@return: Merge data
|
|
"""
|
|
# if data hasn't been fetched yet, fetch it from KV store
|
|
fetched_data = kwargs.get('fetched_data', self._check_state_exists(object_id))
|
|
if fetched_data is None:
|
|
error_msg = 'Failed to get state for event: {0} from KV store.'.format(object_id)
|
|
self.logger.error(error_msg)
|
|
raise EventManagementException(error_msg)
|
|
|
|
return self.merge_data(fetched_data, data, is_partial_update)
|
|
|
|
@time_function_call
|
|
def update(self, object_id, data, is_partial_update=True, **kwargs):
|
|
"""
|
|
Update one notable event's state
|
|
|
|
@type object_id: basestring
|
|
@param object_id: object id
|
|
|
|
@type data: dict
|
|
@param data: data
|
|
|
|
@type kwargs: dict
|
|
@param kwargs: Extra parameters
|
|
|
|
@rtype: dict
|
|
@return: return dict which holds updated keys
|
|
"""
|
|
if not is_valid_str(object_id):
|
|
error_msg = 'Object_id: {0} is not a valid string, object_id type is {1}.'.format(object_id, type(object_id))
|
|
self.logger.error(error_msg)
|
|
raise TypeError(error_msg)
|
|
|
|
data_to_update = {}
|
|
self._upsert_id_key(data_to_update, object_id)
|
|
# grab valid fields from data
|
|
for field in self.fields_to_track:
|
|
if field in data:
|
|
data_to_update[field] = data.get(field)
|
|
|
|
# fetch event state, if it exists
|
|
fetched_event_state = self._check_state_exists(object_id)
|
|
# create entry for event if it doesn't exist yet and return
|
|
if fetched_event_state is None:
|
|
return self._create_state(data_to_update, **kwargs)
|
|
|
|
# update existing entry for event
|
|
self._upsert_old_values(fetched_event_state, data_to_update)
|
|
|
|
return self._update_state(object_id, data_to_update, is_partial_update, fetched_data=fetched_event_state, **kwargs)
|
|
|
|
    @time_function_call
    def update_bulk(self, object_ids, data_list, is_partial_update=True, **kwargs):
        """
        Perform update for one or more notable events.

        State records are created for events with no KV entry yet and updated
        for the rest; both halves run best-effort and errors are combined into
        a single NotableEventException at the end.

        Note: is_partial_update is not being used here because it is always do partial update

        @type object_ids: list
        @param object_ids: notable event IDs

        @type data_list: list
        @param data_list: notable events

        @type is_partial_update: bool
        @param is_partial_update: flag for partial update

        @type kwargs: dict
        @param kwargs: Extra params to perform

        @rtype: list
        @return: updated notable event IDs
        """
        if not is_valid_list(object_ids):
            errorMsg = 'Object_ids: {0} is not a valid list, object_ids type is {1}.'.format(object_ids, type(object_ids))
            self.logger.error(errorMsg)
            raise TypeError(errorMsg)
        if not is_valid_list(data_list):
            errorMsg = 'Data_list: {0} is not a valid list, data_list type is {1}.'.format(data_list, type(data_list))
            self.logger.error(errorMsg)
            raise TypeError(errorMsg)
        if len(data_list) != len(object_ids):
            errorMsg = 'Object_ids: {0} to update don\'t match up with data_list: {1}.'.format(object_ids, data_list)
            self.logger.error(errorMsg)
            raise TypeError(errorMsg)
        if len(data_list) == 0 or len(object_ids) == 0:
            return []

        # if action_type is not in kwargs, it is acknowledge action for "All events in the group"
        # no other elegant way of passing action_type flag to _create_state_bulk
        # action_type in data is removed before _create_state_bulk is called because it is not one of the fields to track
        if 'action_type' not in kwargs:
            kwargs['action_type'] = data_list[0].get('action_type', None)

        # create map of ID to associated data (tracked fields only)
        mapped_objects = {}
        for event_id in object_ids:
            # linear scan pairs each id with its payload by event_id
            data = [data for data in data_list if data.get(self.id_key) == event_id]
            if len(data) > 0:
                data = data[0]
                data_to_update = {}
                self._upsert_id_key(data_to_update, event_id)
                # grab valid fields from data
                for field in self.fields_to_track:
                    if field in data:
                        data_to_update[field] = data.get(field)
                mapped_objects[event_id] = data_to_update

        # fetch existing state object entries
        existing_state_objects = self._get_state_bulk(object_ids)

        # determine if state objects need to be updated or created based on existence in KV store
        objects_to_create = []
        objects_to_update = []
        for object_id, mapped_object in mapped_objects.items():
            existing_state_object = [existing_state_object for existing_state_object in existing_state_objects if existing_state_object.get(self.id_key) == object_id]
            if len(existing_state_object) == 0:
                # object doesn't exist so it needs to be created
                objects_to_create.append(mapped_object)
            else:
                # object exists so it can be updated
                existing_state_object = existing_state_object[0]
                self._upsert_old_values(existing_state_object, mapped_object)
                objects_to_update.append(mapped_object)

        # bulk create non-existent entries
        create_ret = []
        create_err = None
        # create entries, best-effort (errors are deferred, not raised here)
        if len(objects_to_create) > 0:
            try:
                create_ret = self._create_state_bulk(objects_to_create, **kwargs)
            except Exception as e:
                create_err = e

        # bulk update existing entries
        update_ret = []
        update_err = None
        # update entries, best-effort (errors are deferred, not raised here)
        if len(objects_to_update) > 0:
            try:
                update_ret = self._update_state_bulk(objects_to_update, is_partial_update, fetched_data=existing_state_objects, **kwargs)
            except Exception as e:
                update_err = e

        # handle errors: combine whatever failed into a single exception
        if create_err is not None or update_err is not None:
            msg = 'Notable event bulk update failed.'
            if create_err is not None:
                msg += ' {0}'.format(str(create_err))
            if update_err is not None:
                msg += ' {0}'.format(str(update_err))
            self.logger.error(msg)
            raise NotableEventException(msg)

        # join returned values (IDs) from create and update
        return create_ret + update_ret
|
|
|
|
def update_group_events(self, group_id, fields_to_update, event_filter, **kwargs):
|
|
"""
|
|
This function is being used when user want to perform action on group events with specific state to new state
|
|
Primary uses for this is rules engine
|
|
|
|
@type group_id: basestring
|
|
@param group_id: group id
|
|
|
|
@type fields_to_update: dict
|
|
@param fields_to_update: key, value of fields to update
|
|
|
|
@type event_filter: basestring
|
|
@param event_filter: event filter which needs to be apply
|
|
|
|
@return: list of ids of updated notable events
|
|
"""
|
|
if (not isinstance(group_id, itsi_py3.string_type)
|
|
or not isinstance(event_filter, itsi_py3.string_type)
|
|
or not isinstance(fields_to_update, dict)):
|
|
msg = 'Invalid group_id="{}" or fields_to_update="{}" or filter="{}".'.format(
|
|
group_id, str(fields_to_update), event_filter)
|
|
self.logger.error(msg)
|
|
raise TypeError(msg)
|
|
|
|
events = self.search_utils.update_group_events(group_id, fields_to_update, event_filter,
|
|
latest_time=kwargs.get('latest_time'),
|
|
earliest_time=kwargs.get('earliest_time'))
|
|
|
|
activity = 'changed events which match filter=`%s` to `%s`' % (event_filter, str(fields_to_update))
|
|
|
|
mapped_data_to_update = {}
|
|
for event_id in events:
|
|
mapped_data_to_update[event_id] = {self.id_key: event_id}
|
|
for field_key, field_value in fields_to_update.items():
|
|
mapped_data_to_update[event_id][field_key] = field_value
|
|
|
|
ret = self.update_bulk(list(mapped_data_to_update.keys()), list(mapped_data_to_update.values()))
|
|
self.audit.send_activity_to_audit({'event_id': group_id, 'is_group': True}, activity, 'Notable Event Bulk Update for Group')
|
|
|
|
return ret
|