You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

82 lines
3.3 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import re
from itsi.content_packs.constants import ContentType
from itsi.content_packs.itoa import get_itoa_object_title, get_itoa_object_title_field
from ITOA.setup_logging import InstrumentCall
class ContentObjectsPrefixer(object):
    """Prefixer that prefixes all objects that are selected to be installed."""

    def __init__(self, logger, session_key, prefix='', length_limit=50, transaction_id=None):
        """
        :param logger: a logger instance
        :type logger: Logger
        :param session_key: the session key
        :type session_key: str
        :param prefix: the object title prefix
        :type prefix: str
        :param length_limit: the maximum number of characters allowed in the prefix
        :type length_limit: int
        :param transaction_id: transaction info for tracking for debugging
        :type transaction_id: basestring
        """
        self.logger = logger
        self.session_key = session_key
        self.prefix = prefix
        self.length_limit = length_limit
        self._instrumentation = InstrumentCall(logger)
        self.transaction_id = transaction_id

    def process_objects(self, content_objects, **kwargs):
        """
        Prefixes title of the given content objects.

        The objects are updated in place. When the prefix is not valid (see
        ``is_prefix_valid``) the input is returned untouched. Correlation
        searches, glass table images, glass table icons and entity types are
        passed through without being prefixed. For service analyzers the
        entries of ``serviceFilterString`` are prefixed as well.

        :param content_objects: the content objects data
        :type content_objects: dict
        :param kwargs: extra keyword arguments; not used by this method
        :return: the content objects data with title being prefixed
        :rtype: dict
        """
        with self._instrumentation.track("ContentObjectsPrefixer.process_objects",
                                         transaction_id=self.transaction_id):
            if not self.is_prefix_valid():
                return content_objects

            unprefixed_types = (ContentType.CORRELATION_SEARCH, ContentType.GLASS_TABLE_IMAGE,
                                ContentType.GLASS_TABLE_ICON, ContentType.ENTITY_TYPE)
            processed_objects = {}
            for content_type, objects in content_objects.items():
                if content_type not in unprefixed_types:
                    for obj in objects:
                        obj[get_itoa_object_title_field(content_type)] = \
                            self.prefix + get_itoa_object_title(content_type, obj)
                        if content_type == ContentType.SERVICE_ANALYZER:
                            filter_string = obj.get('serviceFilterString')
                            # A missing/empty filter must stay as it is: rewriting it
                            # would produce a filter made of the bare prefix.
                            if filter_string:
                                obj['serviceFilterString'] = \
                                    self._prefix_service_filter(filter_string)
                processed_objects[content_type] = objects
            return processed_objects

    def _prefix_service_filter(self, filter_string):
        """
        Prefixes every entry of a comma separated service filter string.

        Entries wrapped in double quotes get the prefix inserted inside the
        quotes. Whitespace around an entry is ignored, so a string previously
        joined with ", " is handled the same way as one joined with ",".
        Blank entries are dropped.

        :param filter_string: the comma separated filter entries
        :type filter_string: str
        :return: the prefixed entries joined with ", "
        :rtype: str
        """
        prefixed_entries = []
        for raw_entry in filter_string.split(","):
            entry = raw_entry.strip()
            if not entry:
                continue
            if entry.startswith('"') and entry.endswith('"'):
                prefixed_entries.append(f'"{self.prefix + entry[1:-1]}"')
            else:
                prefixed_entries.append(self.prefix + entry)
        return ", ".join(prefixed_entries)

    def is_prefix_valid(self):
        """
        Determines whether user input prefix is valid or not.

        A prefix is valid when it is non-empty, is not longer than
        ``length_limit`` and contains none of the characters ' " =.
        An empty prefix is rejected silently; the other failures are logged
        as errors.

        :return: whether prefix is valid or not
        :rtype: boolean
        """
        if not self.prefix:
            return False
        if len(self.prefix) > self.length_limit:
            self.logger.error(f'tid={self.transaction_id} Failed to prefix objects because it exceeds length {self.length_limit}.')
            return False
        if re.search('[="\']+', self.prefix):
            self.logger.error(f'tid={self.transaction_id} Failed to prefix objects because characters \' " = are not allowed in prefix.')
            return False
        return True