You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
296 lines
13 KiB
296 lines
13 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import sys
|
|
|
|
from itsi.content_packs.itoa import (
|
|
get_itoa_identifier_fields,
|
|
get_itoa_object_class,
|
|
get_itoa_object_id,
|
|
get_itoa_object_title
|
|
)
|
|
from itsi.content_packs.journal import EntryType
|
|
from itsi.content_packs.constants import ContentPackInstallOptions, ContentType, CONTENT_TYPE_SUPPORTING_DUPLICATES, CONTENT_TYPE_WRITE_PRIORITY as CONTENT_TYPE_PRIORITY_ORDER
|
|
from ITOA.setup_logging import InstrumentCall
|
|
|
|
|
|
class ConflictResolver(object):
|
|
"""
|
|
Employs strategies to resolve conflicts for content objects that may already exist in ITSI.
|
|
"""
|
|
|
|
def __init__(self, logger, session_key, transaction_id=None):
|
|
"""
|
|
:param logger: a logger instance
|
|
:type logger: Logger
|
|
|
|
:param session_key: the session key
|
|
:type session_key: str
|
|
|
|
:param transaction_id: transaction info for tracking for debugging
|
|
:type transaction_id: basestring
|
|
"""
|
|
self.logger = logger
|
|
self.session_key = session_key
|
|
self._instrumentation = InstrumentCall(logger)
|
|
self.transaction_id = transaction_id
|
|
|
|
def process_objects(self, content_objects, journal):
|
|
"""
|
|
Employs the conflict resolution strategy on the given content objects.
|
|
|
|
:param content_objects: the content objects data
|
|
:type content_objects: dict
|
|
|
|
:param journal: a journal instance
|
|
:type journal: TransactionJournal
|
|
|
|
:return: the content objects data
|
|
:rtype: dict
|
|
"""
|
|
with self._instrumentation.track("ConflictResolver.process_objects", transaction_id=self.transaction_id):
|
|
failed_parent_itsi_object_priority_order = sys.maxsize
|
|
failed_parent_itsi_object_type = ""
|
|
processed_objects = {}
|
|
|
|
content_types_with_priority = sorted(((content_type, CONTENT_TYPE_PRIORITY_ORDER[content_type]) for content_type in content_objects.keys()), key=lambda x: x[1])
|
|
|
|
for content_type, priority_order in content_types_with_priority:
|
|
objects = content_objects[content_type]
|
|
if not objects:
|
|
continue
|
|
if failed_parent_itsi_object_type and failed_parent_itsi_object_priority_order < priority_order:
|
|
error_message = f'tid={self.transaction_id} Skipping installation due to a failure while processing the parent ITSI Object Type: "{failed_parent_itsi_object_type}"'
|
|
self.logger.error(error_message)
|
|
journal.failure({
|
|
'error_message': error_message,
|
|
'type': EntryType.SKIPPING_INSTALLATION_PROCESSING + ' ' + failed_parent_itsi_object_type,
|
|
'content_type': content_type,
|
|
'ids': list({get_itoa_object_id(content_type, obj) for obj in objects}),
|
|
'titles': list({get_itoa_object_title(content_type, obj) for obj in objects})
|
|
})
|
|
else:
|
|
processed_objects[content_type], failed_parent_itsi_object_type, failed_parent_itsi_object_priority_order = self.process_content_type(content_type, objects, priority_order, journal=journal)
|
|
|
|
return processed_objects
|
|
|
|
def process_content_type(self, content_type, objects, priority_order, journal):
|
|
"""
|
|
Employs the conflict resolution strategy on the given content type objects.
|
|
|
|
:param content_type: the content type
|
|
:type content_type: str
|
|
|
|
:param objects: the list of content object data
|
|
:type objects: list
|
|
|
|
:param priority_order: priority order of ITSI Object
|
|
:type priority_order: int
|
|
|
|
:param journal: a journal instance
|
|
:type journal: TransactionJournal
|
|
|
|
:return: a tuple that will contain following items:
|
|
1. The filtered list of content objects data that are successfully processed
|
|
2. Name of ITSI Object Type that indicates that installation is skipped for dependent ITSI Objects Type
|
|
3. Priority Order of ITSI Object Type
|
|
:rtype: (list, string, int)
|
|
"""
|
|
existing_objects = self.fetch_existing_objects(content_type, objects)
|
|
|
|
if content_type == ContentType.GLASS_TABLE_IMAGE:
|
|
# Images are a special use case since save_batch does not have ability to update existing objects.
|
|
self.clean_up_images(content_type, objects)
|
|
|
|
return self.process_objects_by_type(content_type, objects, existing_objects, priority_order, journal=journal)
|
|
|
|
def fetch_existing_objects(self, content_type, objects):
|
|
"""
|
|
Returns the potential existing objects in ITSI for the given content type and list of objects.
|
|
|
|
:param content_type: the content type
|
|
:type content_type: str
|
|
|
|
:param objects: the content objects data
|
|
:type objects: list
|
|
|
|
:return: a list of object ids
|
|
:rtype: list
|
|
"""
|
|
id_fields = get_itoa_identifier_fields(content_type)
|
|
id_filter = self.construct_id_filter(objects, id_fields)
|
|
filter_data = {'$or': id_filter } if id_filter else {}
|
|
|
|
itoa_object_class = get_itoa_object_class(content_type)
|
|
|
|
handler = itoa_object_class(self.session_key, 'nobody')
|
|
|
|
return handler.get_bulk(
|
|
'nobody',
|
|
fields=['_key', 'title'],
|
|
filter_data=filter_data
|
|
)
|
|
|
|
def construct_id_filter(self, objects, id_fields):
|
|
"""
|
|
Returns the filter data containing the ids for the given list of objects.
|
|
|
|
:param objects: a list of objects data
|
|
:type objects: list
|
|
|
|
:param id_fields: a list of field names used to identify an object
|
|
:type id_fields: list
|
|
|
|
:return: a list of filter objects
|
|
:rtype: list
|
|
"""
|
|
id_filter = []
|
|
|
|
for obj in objects:
|
|
for field in id_fields:
|
|
value = obj.get(field)
|
|
if value:
|
|
id_filter.append({field: value})
|
|
else:
|
|
self.logger.info(f'tid={self.transaction_id} Unable to find {field} field for object={obj}')
|
|
|
|
return id_filter
|
|
|
|
def clean_up_images(self, content_type, objects):
|
|
"""
|
|
Images are a special case for the overwrite resolution strategy.
|
|
|
|
To overwrite, delete the existing object on the environment first then reinstall it.
|
|
|
|
:param content_type: the content type (going to be IMAGES)
|
|
:type content_type: str
|
|
|
|
:param objects: a list of content type objects
|
|
:type objects: list
|
|
|
|
:return: the filtered list of content objects data
|
|
:rtype: list
|
|
"""
|
|
itoa_object_class = get_itoa_object_class(content_type)
|
|
image_handler = itoa_object_class(self.session_key, 'nobody')
|
|
|
|
try:
|
|
for image in objects:
|
|
image_handler.delete(image.get('_key', ''))
|
|
except Exception as e:
|
|
self.logger.error(f'tid={self.transaction_id} Failed to delete content pack image objects from environment.')
|
|
self.logger.exception(f'tid={self.transaction_id} {e}')
|
|
|
|
def process_objects_by_type(self, content_type, objects, existing_objects, priority_order, journal):
|
|
"""
|
|
Processes all objects from the content pack. Don't check for existing objects and just return all content
|
|
pack objects.
|
|
|
|
:param content_type: the content type
|
|
:type content_type: str
|
|
|
|
:param objects: a list of content type objects
|
|
:type objects: list
|
|
|
|
:param existing_objects: a list of existing objects
|
|
:type existing_objects: list
|
|
|
|
:param priority_order: priority order of ITSI Object
|
|
:type priority_order: int
|
|
|
|
:param journal: a journal instance
|
|
:type journal: TransactionJournal
|
|
|
|
:return: a tuple that will contain following items:
|
|
1. The filtered list of content objects data that are successfully processed
|
|
2. Name of ITSI Object Type that indicates that installation is skipped for dependent ITSI Objects Type
|
|
3. Priority Order of ITSI Object Type
|
|
:rtype: (list, string, int)
|
|
"""
|
|
with self._instrumentation.track(f"ConflictResolver.process_objects_by_type.{content_type}", transaction_id=self.transaction_id):
|
|
failed_parent_itsi_object_priority_order = sys.maxsize
|
|
failed_parent_itsi_object_type = ""
|
|
id_fields = get_itoa_identifier_fields(content_type)
|
|
local_ids_and_title = set()
|
|
processed = []
|
|
for obj in objects:
|
|
if content_type not in CONTENT_TYPE_SUPPORTING_DUPLICATES:
|
|
if (
|
|
content_type != ContentType.CORRELATION_SEARCH
|
|
and content_type != ContentType.GLASS_TABLE_IMAGE
|
|
and self.object_name_exists(
|
|
obj, existing_objects, content_type, journal=journal
|
|
)
|
|
):
|
|
# We will not skip the installation of other ITSI Objects as Glass Table Icon is not available for user selection
|
|
if not failed_parent_itsi_object_type and content_type not in ContentType.GLASS_TABLE_ICON:
|
|
failed_parent_itsi_object_type = content_type
|
|
failed_parent_itsi_object_priority_order = priority_order
|
|
continue
|
|
|
|
"""
|
|
For the DUPLICATE_OBJECT ErrorCode, a local_ids_and_title list is maintained. The '_key' and 'title' field of the content pack objects is stored if it
|
|
is successfully processed, and the code will check that the same '_key' or 'title' field is not present in any other object of the content pack.
|
|
"""
|
|
# This flag is kept to identify that if duplicate object is founded then do not add the object to processed object list
|
|
is_duplicate_found = False
|
|
for field in id_fields:
|
|
object_field = obj.get(field)
|
|
if object_field in local_ids_and_title:
|
|
# We will not skip the installation of other ITSI Objects as Glass Table Icon and Images are not available for user selection
|
|
if not failed_parent_itsi_object_type and content_type not in (ContentType.GLASS_TABLE_IMAGE, ContentType.GLASS_TABLE_ICON):
|
|
failed_parent_itsi_object_type = content_type
|
|
failed_parent_itsi_object_priority_order = priority_order
|
|
error_message = f'tid={self.transaction_id} Excluding duplicate object content_type="{content_type}" with "{field}"="{object_field}"'
|
|
self.logger.error(error_message)
|
|
journal.failure({
|
|
'error_message': error_message,
|
|
'type': EntryType.DUPLICATE_OBJECT,
|
|
'content_type': content_type,
|
|
'id': get_itoa_object_id(content_type, obj),
|
|
'title': get_itoa_object_title(content_type, obj)
|
|
})
|
|
is_duplicate_found = True
|
|
break
|
|
else:
|
|
local_ids_and_title.add(object_field)
|
|
if is_duplicate_found:
|
|
continue
|
|
processed.append(obj)
|
|
return processed, failed_parent_itsi_object_type, failed_parent_itsi_object_priority_order
|
|
|
|
def object_name_exists(self, obj, existing_objects, content_type, journal):
|
|
"""
|
|
Determines whether the given object's name already exists for other objects.
|
|
|
|
:param obj: the object data
|
|
:type obj: dict
|
|
|
|
:param existing_objects: a list of existing objects
|
|
:type existing_objects: list
|
|
|
|
:param content_type: the content type
|
|
:type content_type: str
|
|
|
|
:param journal: a journal instance
|
|
:type journal: TransactionJournal
|
|
|
|
:return: True if the object exists, False otherwise
|
|
:rtype: bool
|
|
"""
|
|
object_name = obj.get('title')
|
|
object_id = obj.get('_key')
|
|
|
|
for existing_obj in existing_objects:
|
|
if object_name == existing_obj.get('title') and object_id != existing_obj.get('_key'):
|
|
error_message = f'tid={self.transaction_id} Excluding object content_type="{content_type}" with name="{object_name}" since it already exists'
|
|
self.logger.error(error_message)
|
|
journal.failure({
|
|
'error_message': error_message,
|
|
'type': EntryType.OBJECT_NAME_ALREADY_EXISTS,
|
|
'content_type': content_type,
|
|
'id': get_itoa_object_id(content_type, obj),
|
|
'title': get_itoa_object_title(content_type, obj)
|
|
})
|
|
return True
|
|
|
|
return False
|