You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

295 lines
12 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import itsi_py3
from uuid import uuid1
import json
import sys
from splunk.util import safeURLQuote, normalizeBoolean
from splunk.clilib.bundle_paths import make_splunkhome_path
import splunk.rest as splunk_rest
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'bin']))
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
from SA_ITOA_app_common.splunklib import results
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.itoa_common import get_current_utc_epoch
from ITOA.setup_logging import logger
from .notable_event_utils import Audit
from .push_event_manager import PushEventManager
class NotableEventComment(object):
"""
Class to create comments in itsi_grouped_alert index with sourcetype=itsi_notable:comment
{
comment_id: <comment id>,
event_id: <event id>,
_is_group: <whether or not the event id is a group id>,
user: <who initialized the operation>,
owner: <who owns the comment, usually who created the comment>,
create_time: <when the comment is created>,
mod_time: <epoch time of creating comments>,
comment: <comment string>
}
Notable Event Comment Storage is append-only.
"""
# ID Key name of this object
id_key = 'comment_id'
# Tokens for pushing comments to index
itsi_group_comments_token = 'itsi_group_comments_token'
def __init__(self, session_key, current_user_name=None,
audit_token_name='Auto Generated ITSI Notable Index Audit Token', **kwargs):
"""
Initialize
@param session_key: session key
@param current_user_name: the user name to be set for the comments, in case we want to assign custom user name for comments.
@param comment_token_name: comment token to used to send to comment index
@param audit_token_name: audit token to used to send to audit logging
@return:
"""
# Initialized base event object
self.session_key = session_key
self.audit_token_name = audit_token_name
self.audit = None
self.event_id_key = 'event_id'
self.is_group_key = '_is_group'
self.comment_id_key = self.id_key
self.mod_time_key = 'mod_time'
self.comment_key = 'comment'
self.user_key = 'user'
self.owner_key = 'owner'
self.create_time_key = 'create_time'
self.policy_id_key = 'itsi_policy_id'
self.audit = Audit(self.session_key, self.audit_token_name)
self.object_type = 'notable_event_comment'
self.user = current_user_name
self.comment = None
def clean_comment(self, comment):
"""
Get the clean comment data for audit message.
:param comment: The dict of comment.
:return: the clean version of comment
"""
return {self.event_id_key: comment.get(self.event_id_key, ''),
self.comment_id_key: comment.get(self.comment_id_key, ''),
self.comment_key: comment.get(self.comment_key, ''),
self.policy_id_key: comment.get(self.policy_id_key, '')}
def get_event_pusher(self):
"""
Get event pusher for lazy init.
:param is_sync: Whether the pusher is sync or not.
:return: the object of event pusher
"""
if self.comment is None:
self.comment = PushEventManager(self.session_key, self.itsi_group_comments_token)
return self.comment
def _prep(self, comment):
"""
Set the following field values for a comments: comment_id, user, mod_time, owner
:param comment: the dict for a comment
:return: the comment dict with the following format:
{
comment_id: <comment id>,
event_id: <event id>,
_is_group: <whether or not the event id is a group id>,
user: who <created the comments>,
owner: <who owns the comment, usually who created the comment>,
create_time: <when the comment is created>,
mod_time: <epoch time of creating comments>,
comment: <comment string>
}
"""
# Verify and set the fields
comment[self.comment_id_key] = str(uuid1())
if self.user_key not in comment:
comment[self.user_key] = self.user if self.user else self.audit.get_current_user()
if self.owner_key not in comment:
comment[self.owner_key] = comment['_owner'] if '_owner' in comment else comment[self.user_key]
if self.mod_time_key not in comment:
comment[self.mod_time_key] = str(get_current_utc_epoch())
# Remove unused keys in dict
if 'filter_search' in comment:
del comment['filter_search']
if 'earliest_time' in comment:
del comment['earliest_time']
if 'latest_time' in comment:
del comment['latest_time']
if '_owner' in comment:
del comment['_owner']
if '_user' in comment:
del comment['_user']
return comment
def get(self, object_id, **kwargs):
"""
Get operation can be supported for both comment_id and group_id. User either get only one comment by passing
object_id as comment_id or object_id can be passed as group_id to get all comments of given group_id - only
caveat is that user needs to 'is_group_id' flag to true. Default this flag is false.
This function will run a search to get comments from index. And it is only used for SDK and REST API.
We recommend UI code to use SPL to get comments for better performance.
@param object_id:
@param kwargs:
@return:
"""
if not isinstance(object_id, itsi_py3.string_type):
raise TypeError('object_id="%s" is not a valid string.' % object_id)
is_group_id = normalizeBoolean(kwargs.get('is_group_id', False))
earliest_time = 0
if is_group_id:
uri_string = '/servicesNS/nobody/SA-ITOA/storage/collections/data/itsi_notable_group_system/' + \
object_id
uri = safeURLQuote(uri_string)
logger.info('Fetching group system info for episode id: %s', object_id)
try:
res, contents = splunk_rest.simpleRequest(
uri,
method='GET',
getargs={'output_mode': 'json'},
sessionKey=self.session_key)
payload = json.loads(contents)
if payload['start_time']:
earliest_time = payload['start_time']
except Exception:
logger.warn('Failed to get group system info for episode id: %s', object_id)
search = 'search `itsi_event_management_comment_index` %s_id=%s' % ('event' if is_group_id else 'comment', object_id)
kwargs = {
'earliestTime': earliest_time,
'latestTime': get_current_utc_epoch(),
'output_mode': 'json',
'count': 50000 # TODO: We need to control the count of results. Keep the same limit(50K) as KVStore here.
}
logger.info('Running search %s to fetch comment(s)', search)
try:
service = ITOAInterfaceUtils.service_connection(self.session_key, app_name="SA-ITOA")
search_job = service.jobs.oneshot(search, **kwargs)
reader = results.JSONResultsReader(search_job)
search_results = [result for result in reader]
except Exception:
error_message = search_job.messages.get('error', [])
logger.error(f'failed to retrieve search results, Search Error {error_message}, Search {search}')
if not is_group_id and len(search_results) == 1:
# Keep the same behavior as pre-4.4.0 versions
return search_results[0]
return search_results
def create(self, data, **kwargs):
"""
Create a new comment (comments are always associated with group since 4.0.0).
@type data: dict
@param data: data which hold comment from caller
@type kwargs: dict
@param kwargs: kv args which holds extra settings
@rtype: dict
@return: created comment document:
{
comment_id: <comment id>,
comment: <comment string>,
user: who <created the comments>,
owner: <who owns the comment, usually who created the comment>,
mod_time: <epoch time of creating comments>,
create_time: <when the comment is created>
}
"""
data = self._prep(data)
if self.create_time_key not in data:
# Set create_time for creation
data[self.create_time_key] = data[self.mod_time_key]
self.get_event_pusher().push_event(data)
self.audit.send_activity_to_audit(self.clean_comment(data),
'The comment="%s" was created successfully.' % data.get(self.comment_key),
'Comment created')
return {self.comment_id_key: data.get(self.comment_id_key, ''),
self.comment_key: data.get(self.comment_key, ''),
self.user_key: data.get(self.user_key, ''),
self.owner_key: data.get(self.owner_key, ''),
self.mod_time_key: data.get(self.mod_time_key, ''),
self.create_time_key: data.get(self.create_time_key, ''),
self.policy_id_key: data.get(self.policy_id_key, '')}
def create_for_group(self, data, **kwargs):
"""
Create a new comment for group (comments are always associated with group since 4.0.0,
so this function is just a placeholder just in case it is called by other functions).
@type data: dict
@param data: data which hold comment from caller
@type kwargs: dict
@param kwargs: kv args which holds extra settings
@rtype: dict {'comment_id': <comment_id>}
@return: id of generated comment document
"""
return self.create(data, **kwargs)
def create_bulk(self, data_list, **kwargs):
"""
To bulk create comments
@type data_list: list
@param data: data which hold array of comment and event id in format of
[<comment_id1>, <comment_id2>, ...]
@type kwargs: dict
@param kwargs: kv args which holds extra settings
@rtype: list [<comment_id>]
@return: comment ids of generated comment documents
"""
if not isinstance(data_list, list):
raise TypeError('Array of comments expected')
activities = []
results = []
for data in data_list:
comment_id = self._prep(data)[self.comment_id_key]
activities.append('The comment="%s" was created successfully.' % data.get(self.comment_key))
results.append(comment_id)
self.get_event_pusher().push_events(data_list)
self.audit.send_activity_to_audit_bulk(data_list, activities, 'Comment created')
return results
def update(self, object_id, data, is_partial_update=False, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('update',
self.object_type))
def delete(self, object_id, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('delete',
self.object_type))
def get_bulk(self, object_ids, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('get_bulk',
self.object_type))
def update_bulk(self, object_ids, data_list, is_partial_update=False, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('update_bulk',
self.object_type))
def delete_bulk(self, object_ids, **kwargs):
raise NotImplementedError('%s operation is not supported for this %s object type' % ('delete_bulk',
self.object_type))