You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
13 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import copy
import json
from ITOA.setup_logging import setup_logging
from ITOA.event_management.notable_event_aggregation_policy import NotableEventAggregationPolicy
from ITOA.event_management.notable_event_utils import OBJECT_COLLECTION_MATRIX
from ITOA.saved_search_utility import SavedSearch
from SA_ITOA_app_common.apifilesave.filesave import ApifilesaveService
from apiiconcollection.iconcollection import IconService
from itsi.content_packs.constants import CONTENT_TYPE_TO_ITOA_TYPE
from itsi.content_packs.constants import ContentType
from itsi.event_management.itsi_correlation_search import ItsiCorrelationSearch
from itsi.objects.object_manifest import object_manifest
# Module-wide logger shared by every adapter in this file.
LOGGER = setup_logging(
    logger_name='itsi.content_packs.itoa',
    logfile_name='itsi_content_packs_itoa.log',
)
def get_itoa_identifier_fields(content_type):
    """
    Returns the names of the fields that identify an object of a content type.

    Correlation searches are identified by ``name`` only, glass table images
    by ``_key`` only, and every other content type by ``_key`` and ``title``.

    :param content_type: the content type
    :type content_type: str
    :return: a list of field names
    :rtype: list
    """
    if content_type == ContentType.CORRELATION_SEARCH:
        identifier_fields = ['name']
    elif content_type == ContentType.GLASS_TABLE_IMAGE:
        identifier_fields = ['_key']
    else:
        identifier_fields = ['_key', 'title']
    return identifier_fields
def get_itoa_object_class(content_type):
    """
    Returns the class used to manage objects of the given content type.

    Content types with a dedicated adapter in this module resolve to that
    adapter; any other content type is translated through
    ``CONTENT_TYPE_TO_ITOA_TYPE`` and looked up in ``object_manifest``.

    :param content_type: the content type
    :type content_type: str
    :return: the ItoaObject class
    :rtype: ItoaObject
    """
    # Checked in order with ``==``, first match wins.
    dedicated_adapters = (
        (ContentType.GLASS_TABLE_ICON, ItoaIcon),
        (ContentType.CORRELATION_SEARCH, ItoaCorrelationSearch),
        (ContentType.NOTABLE_EVENT_AGGREGATION_POLICY, ItoaNotableEventAggregationPolicy),
        (ContentType.GLASS_TABLE_IMAGE, ItoaImage),
    )
    for handled_type, adapter_class in dedicated_adapters:
        if content_type == handled_type:
            return adapter_class

    return object_manifest.get(CONTENT_TYPE_TO_ITOA_TYPE.get(content_type))
def get_itoa_object_id(content_type, itoa_object):
    """
    Returns the id of the given object.

    The id is read from the first identifier field of the content type; an
    empty string is returned when the object does not carry that field.

    :param content_type: the content type
    :type content_type: str
    :param itoa_object: the itoa object data
    :type itoa_object: dict
    :return: the object id
    :rtype: str
    """
    identifier_fields = get_itoa_identifier_fields(content_type)
    primary_field = identifier_fields[0]
    return itoa_object.get(primary_field, '')
def get_itoa_object_title(content_type, itoa_object):
    """
    Returns the title of the given object.

    The title is read from the title field of the content type; an empty
    string is returned when the object does not carry that field.

    :param content_type: the content type
    :type content_type: str
    :param itoa_object: the itoa object data
    :type itoa_object: dict
    :return: the object title
    :rtype: str
    """
    return itoa_object.get(get_itoa_object_title_field(content_type), '')
def get_itoa_object_title_field(content_type):
    """
    Returns the name of the field holding the title for a content type.

    Correlation searches and glass table images keep their title in ``name``;
    all other content types keep it in ``title``.

    :param content_type: the content type
    :type content_type: str
    :return: the title field name
    :rtype: str
    """
    titled_by_name = (ContentType.CORRELATION_SEARCH, ContentType.GLASS_TABLE_IMAGE)
    return 'name' if content_type in titled_by_name else 'title'
class ItoaCorrelationSearch(ItsiCorrelationSearch):
    """
    Adapter exposing correlation searches through the bulk
    delete / get / save interface used for content pack objects.
    """

    # Collection registered for the correlation search ITOA type.
    collection_name = OBJECT_COLLECTION_MATRIX[
        CONTENT_TYPE_TO_ITOA_TYPE[ContentType.CORRELATION_SEARCH]
    ]

    def delete_bulk(self, object_ids, *args, **kwargs):
        """
        Deletes correlation searches in bulk.

        When ``object_ids`` is a non-empty list, only those searches are
        deleted. Otherwise every saved search matching
        ``action.itsi_event_generator=1`` is looked up and deleted by name.

        :param object_ids: a list of object ids
        :type object_ids: list
        """
        parent = super(ItoaCorrelationSearch, self)

        explicit_ids = bool(object_ids) and isinstance(object_ids, list)
        if not explicit_ids:
            # No usable id list was supplied: target all correlation searches.
            all_searches = SavedSearch.get_all_searches(
                self.session_key,
                search='action.itsi_event_generator=1'
            )
            object_ids = [saved_search.name for saved_search in all_searches]

        parent.delete_bulk(object_ids, *args, **kwargs)

    def get_bulk(self, owner, filter_data, *args, **kwargs):
        """
        Returns the correlation searches matching the given filter criteria.

        ``owner`` and extra positional arguments are accepted for interface
        compatibility but are not forwarded to the parent class.

        :param owner: the owner
        :type owner: str
        :param filter_data: the filter data
        :type filter_data: dict
        :return: a list of correlation search objects
        :rtype: list
        """
        parent = super(ItoaCorrelationSearch, self)
        return parent.get_bulk([], filter=filter_data, **kwargs)

    def save_batch(self, owner, data_list, **kwargs):
        """
        Saves the given list of correlation search objects.

        :param owner: the owner
        :type owner: str
        :param data_list: a list of objects data
        :type data_list: list
        :return: the successfully saved object ids
        :rtype: list
        """
        # Work on a deep copy so the bulk call cannot modify the caller's `data_list`.
        working_copy = copy.deepcopy(data_list)

        sids = []
        for search_data in working_copy:
            sids.append(search_data['sid'])

        # update_bulk is used rather than create_bulk because create_bulk returns a
        # 409 error when asked to overwrite objects that already exist (by _key)
        # in the kvstore.
        return self.update_bulk(sids, working_copy)
class ItoaNotableEventAggregationPolicy(NotableEventAggregationPolicy):
    """
    Adapter exposing notable event aggregation policies through the bulk
    delete / get / save interface used for content pack objects.
    """

    # Collection registered for the aggregation policy ITOA type.
    collection_name = OBJECT_COLLECTION_MATRIX[
        CONTENT_TYPE_TO_ITOA_TYPE[ContentType.NOTABLE_EVENT_AGGREGATION_POLICY]
    ]

    def delete_bulk(self, *args, **kwargs):
        """
        Deletes all notable event aggregation policies.

        Any arguments are ignored; the parent is always called with an empty
        id list.
        """
        parent = super(ItoaNotableEventAggregationPolicy, self)
        parent.delete_bulk([])

    def get_bulk(self, owner, filter_data, *args, **kwargs):
        """
        Returns the notable event aggregation policies matching the given
        filter criteria.

        ``owner`` and extra positional arguments are accepted for interface
        compatibility but are not forwarded to the parent class.

        :param owner: the owner
        :type owner: str
        :param filter_data: the filter data
        :type filter_data: dict
        :return: a list of notable event aggregation policy objects
        :rtype: list
        """
        parent = super(ItoaNotableEventAggregationPolicy, self)
        return parent.get_bulk([], filter=filter_data, **kwargs)

    def save_batch(self, owner, data_list, **kwargs):
        """
        Saves the given list of notable event aggregation policy objects
        by handing it to ``create_bulk``.

        :param owner: the owner
        :type owner: str
        :param data_list: a list of objects data
        :type data_list: list
        :return: the successfully saved object ids
        :rtype: list
        """
        created_ids = self.create_bulk(data_list)
        return created_ids
class ItoaImage(object):
    """
    Adapter for managing images.

    Wraps ``ApifilesaveService`` so images can be handled through the same
    delete / get_bulk / save_batch interface as other content pack objects.
    """

    # KV store collection holding the image records.
    collection_name = 'SA-ITOA_files'

    def __init__(self, session_key, *args, **kwargs):
        """
        :param session_key: the session key
        :type session_key: str
        """
        self.session_key = session_key

    def delete(self, image_id, *args, **kwargs):
        """
        Deletes specific image that matches the image_id.

        :param image_id: id of the image to be deleted
        :type image_id: str
        """
        service = self._get_filesave_service()
        service.delete(image_id)

    def delete_bulk(self, *args, **kwargs):
        """
        Deletes all images. Any arguments are ignored.
        """
        service = self._get_filesave_service()
        service.delete_all()

    def get_bulk(self, *args, **kwargs):
        """
        Returns a list of images.

        Recognised keyword arguments:

        * ``fields`` - field names to return, as a list or a comma-separated
          string
        * ``filter_data`` - a query, serialised to JSON before being sent

        :return: a list of images
        :rtype: list
        """
        options = {}
        fields = kwargs.get('fields')
        if fields:
            # Same normalisation as ItoaIcon.get_bulk, which issues the same
            # kind of query: a list of fields is reported there to yield only
            # a single field, so it is sent as one comma-separated string.
            if isinstance(fields, list):
                fields = ','.join(fields)
            options['fields'] = fields
        query = kwargs.get('filter_data')
        if query:
            options['query'] = json.dumps(query)
        service = self._get_filesave_service()
        return service.kv_client._collection_data.query(**options)

    def save_batch(self, owner, data_list, **kwargs):
        """
        Saves the given list of images.

        Each image is created individually. An image whose service response
        cannot be parsed as a JSON object is logged and skipped so that the
        rest of the batch is still processed.

        :param owner: the owner
        :type owner: str
        :param data_list: a list of objects data
        :type data_list: list
        :return: the successfully saved object ids
        :rtype: list
        """
        service = self._get_filesave_service()
        saved = []
        for data in data_list:
            ret = service.create(data)
            try:
                result = json.loads(ret)
            except Exception as e:
                LOGGER.error('Encountered exception while saving data="%s", exception="%s"',
                             data, e)
                continue
            if not isinstance(result, dict):
                # Valid JSON but not an object: there is no `_key` to read, and
                # calling `.get` on it would abort the whole batch.
                LOGGER.error('Unexpected response while saving data="%s", response="%s"',
                             data, result)
                continue
            key = result.get('_key')
            if key:
                saved.append(key)
        return saved

    def _get_filesave_service(self):
        """
        Returns an object that interfaces with the image storage backend.

        :return: an instance to the image storage backend
        :rtype: ApifilesaveService
        """
        return ApifilesaveService(app_name='SA-ITOA',
                                  session_id=self.session_key,
                                  user_name='nobody',
                                  collection_name=self.collection_name)
class ItoaIcon(object):
    """
    Adapter for managing icons.

    Wraps ``IconService`` so icons can be handled through the same
    delete_bulk / get_bulk / save_batch interface as other content pack
    objects.
    """

    # KV store collection holding the icon records.
    collection_name = 'SA-ITOA_icon_collection'

    def __init__(self, session_key, *args, **kwargs):
        """
        :param session_key: the session key
        :type session_key: str
        """
        self.session_key = session_key

    def delete_bulk(self, *args, **kwargs):
        """
        Deletes multiple icons based on a filter.

        The filter is read from the ``filter_data`` keyword argument and is
        handed to the icon service as-is (``None`` when it was not supplied).
        Positional arguments are ignored.
        """
        service = self._get_icon_service()
        filter_data = kwargs.get('filter_data')
        service.bulk_delete(filter_data)

    def get_bulk(self, *args, **kwargs):
        """
        Returns the icons matching the given criteria.

        Recognised keyword arguments:

        * ``fields`` - field names to return, as a list or a comma-separated
          string
        * ``filter_data`` - a query, serialised to JSON before being sent

        :return: icon objects with 'fields' filled in
        :rtype: list
        """
        options = {}
        fields = kwargs.get('fields')
        if fields:
            # For whatever reason 'fields' returns multiple field values only
            # when the fields list is passed in as a string; otherwise it
            # returns only 1 field value.
            if isinstance(fields, list):
                fields = ','.join(fields)
            options['fields'] = fields
        query = kwargs.get('filter_data')
        if query:
            options['query'] = json.dumps(query)
        service = self._get_icon_service()
        return service.kv_client._collection_data.query(**options)

    def save_batch(self, owner, data_list, **kwargs):
        """
        Saves a list of icons.

        An icon that carries a ``_key`` is updated first. If the update fails
        with a 404 status, or with an exception that carries no status, the
        icon is created instead; any other failure is logged and the icon is
        skipped. Icons without a ``_key`` are always created.

        Note that an empty ``dataURI`` entry is removed from the passed-in
        icon dict itself.

        :param owner: the owner
        :type owner: str
        :param data_list: a list of icons to save
        :type data_list: list
        :return: the keys of the icons that were saved
        :rtype: list
        """
        service = self._get_icon_service()
        saved = []
        for data in data_list:
            if 'dataURI' in data and not data.get('dataURI'):
                # an empty dataURI causes icon not to show up
                # properly in the glass table
                data.pop('dataURI')
            key = data.get('_key')
            if key:
                try:
                    service.update(key, data)
                except Exception as e:
                    # Not every exception carries `status`; reading it directly
                    # would raise AttributeError here and abort the batch.
                    status = getattr(e, 'status', None)
                    if status and status != 404:
                        LOGGER.error(e)
                        continue
                    # else, icon doesn't exist, will create
                else:
                    saved.append(key)
                    continue
            try:
                ret = service.create([data])
            except Exception as e:
                LOGGER.error(e)
                continue
            if ret and len(ret) == 1:
                saved.append(ret[0])
        return saved

    def _get_icon_service(self):
        """
        Returns an object that interfaces with the icon storage backend.

        :return: an instance to the icon storage backend
        :rtype: IconService
        """
        return IconService(app_name='SA-ITOA',
                           session_id=self.session_key,
                           user_name='nobody',
                           collection_name=self.collection_name)