# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. from ITOA.setup_logging import setup_logging from itsi.content_packs.journal import TransactionJournal from itsi.content_packs.readers import ContentPackContentReader from itsi.content_packs.retriever import retrieve_saved_search_consistency_status from itsi.content_packs.entitlements_processor import ContentPackEntitlementsProcessor from itsi.content_packs.itoa import get_itoa_object_class, get_itoa_object_title, get_itoa_object_id from itsi.content_packs.constants import ContentType, ContentPackFields, \ CONTENT_PACK_SOURCE_VERSION_FIELD, CONTENT_PACK_SOURCE_FIELD, CONTENT_TYPE_TO_ITOA_TYPE LOGGER = setup_logging( logfile_name='itsi_content_packs_preview.log', logger_name='itsi.content_packs.preview' ) def preview(content_pack_id, version, session_key=None): """ Preview content pack objects based on the given content pack id and version With installed field indicating whether objects have been installed :param content_pack_id: the content pack id :type content_pack_id: str :param version: the content pack version :type version: str :param session_key: the session key :type session_key: str :return: a list of content packs objects with installed attribute indicating whether content pack objects have been installed or not :rtype: list """ previewer = ContentPackPreviewer( logger=LOGGER, session_key=session_key, ) return previewer.preview(content_pack_id, version) class ContentPackPreviewer(object): """This class is responsible for previewing content pack objects.""" def __init__(self, logger, session_key): """ :param logger: a logger instance :type logger: Logger :param session_key: the session key :type session_key: str """ self.logger = logger self.session_key = session_key self.owner = 'nobody' def get_content_objects(self, content_pack_id, version): """ Retrieve a dict of objects where key is the content type, value is a list of objects of the content type :param content_pack_id: the content pack version id :type content_pack_id: str :param version: the content pack version :type version: str :return: content pack objects dict aggregated by content types :rtype: dict """ journal = TransactionJournal() entitlement_writer = ContentPackEntitlementsProcessor( content_pack_id=content_pack_id, logger=self.logger, session_key=self.session_key ) content_objects = ContentPackContentReader( logger=self.logger, session_key=self.session_key ).read(content_pack_id, version, journal=journal) entitlement_writer.apply_object_type_level_entitlement(content_objects) return content_objects def get_content_info_from_json_files(self, content_objects): """ Retrieve a tuple where the first item is a dict where key is content type, value is a list of content pack objects based on a content_pack_id from content pack json files and the second item is a list of correlation search keys of all content pack correlation searches in json files :param content_objects: content pack objects dict aggregated by content types :type content_objects: dict :return: list of content pack objects with fields id, title, description, list of content pack correlation search keys :rtype: tuple eg: ( { 'glass_tables': [ { 'description': u"Throws an event when storefront isn't able to launch a resource", 'id': u'itsi-citrix-storefront-no-resources', 'title': u'Storefront No Resources' } ] }, [ 'Windows Event Logs', 'Database Events' ] ) """ content_objects_info = {} correlation_search_keys = [] for content_type, objects in content_objects.items(): if content_type == ContentType.GLASS_TABLE_IMAGE: continue elif content_type == ContentType.CORRELATION_SEARCH: # The reason why we treat correlation searches as a special case is because correlation searches # don't have source_itsi_da_version and source_itsi_da fields while other content pack objects have # so that while we can pass source_itsi_da_version and source_itsi_da as filter to retrieve other # content pack objects from KV Store, it doesn't work for correlation searches. The only way is to # first get a list of names(the key of correlation searches) from json files and use that as a filter. correlation_search_keys += [get_itoa_object_id(content_type, item) for item in objects] content_pack_list = content_objects_info.get(content_type, []) content_pack_list += [ { ContentPackFields.ID: get_itoa_object_id(content_type, item), ContentPackFields.TITLE: get_itoa_object_title(content_type, item), ContentPackFields.DESCRIPTION: item.get(ContentPackFields.DESCRIPTION, ''), ContentPackFields.ENTITLEMENT_STATUS: item.get(ContentPackFields.ENTITLEMENT_STATUS, False) } for item in objects] content_objects_info[content_type] = content_pack_list return content_objects_info, correlation_search_keys def get_kvstore_content_objects_keys(self, content_objects_info, correlation_search_keys): """ Retrieve a list of IDs from the KV store if they match with content packs objects. :param content_objects_info: list of content pack retrieved from json files :type content_objects_info: list :param correlation_search_keys: list of content pack correlation search keys from json files :type correlation_search_keys: list :return: dict of content pack object keys separated by content types retrieved from KV Store :rtype: dict """ content_types = list(content_objects_info.keys()) itsi_object_keys = [] kvstore_content_objects_keys = {content_type: [] for content_type in content_types} for content_type, itsi_objects_list in content_objects_info.items(): if content_type == ContentType.CORRELATION_SEARCH: correlation_search_key_query_list = [ {'name': key} for key in correlation_search_keys ] filter_data = {'$or': correlation_search_key_query_list} if correlation_search_key_query_list else {} else: for itsi_object in itsi_objects_list: itsi_object_keys.append({'_key': itsi_object['id']}) filter_data = {'$or': itsi_object_keys} if itsi_object_keys else {} itoa_object_class = get_itoa_object_class(content_type) content_pack_objects_from_kvstore = itoa_object_class( self.session_key, self.owner ).get_bulk(self.owner, fields=['_key'], filter_data=filter_data) for item in content_pack_objects_from_kvstore: kvstore_content_objects_keys[content_type].append(get_itoa_object_id(content_type, item)) return kvstore_content_objects_keys def get_kvstore_content_objects_titles(self, content_objects_info): """ Retrieve a list of titles from the KV store if they match with content packs objects. :param content_objects_info: list of content pack retrieved from json files :type content_objects_info: list :return: dict of content pack object titles separated by content types retrieved from KV Store :rtype: dict """ content_types = list(content_objects_info.keys()) itsi_object_titles = [] kvstore_content_objects_titles = {content_type: [] for content_type in content_types} for content_type, itsi_objects_list in content_objects_info.items(): for itsi_object in itsi_objects_list: itsi_object_titles.append({'title': itsi_object['title']}) filter_data = {'$or': itsi_object_titles} if itsi_object_titles else {} itoa_object_class = get_itoa_object_class(content_type) content_pack_objects_from_kvstore = itoa_object_class( self.session_key, self.owner ).get_bulk(self.owner, fields=['title'], filter_data=filter_data) for item in content_pack_objects_from_kvstore: kvstore_content_objects_titles[content_type].append(get_itoa_object_title(content_type, item)) return kvstore_content_objects_titles @staticmethod def add_fields_to_content_pack_objects(content_objects, content_objects_info, kvstore_content_objects_keys, kvstore_content_objects_titles): """ Add installed, has_dependency, and duplicate_title fields to content pack objects retrieved from json files :param content_objects: content pack objects dict aggregated by content types :type content_objects: dict :param content_objects_info: list of content pack retrieved from json files :type content_objects_info: list :param kvstore_content_objects_keys: dict of content pack object keys separated by content types retrieved from KV Store :type kvstore_content_objects_keys: dict :param kvstore_content_objects_titles: dict of content pack object titles separated by content types retrieved from KV Store :type kvstore_content_objects_titles: dict :return: a dict where key is content type and value is a list of content pack objects with installed field and duplicate_title field :rtype: dict """ updated_content_pack_objects_dict_from_json = {} for content_type, content_pack_object_list in content_objects_info.items(): content_pack_objects_preview_list = [] for content_pack_object in content_pack_object_list: kvstore_data_keys = kvstore_content_objects_keys[content_type] if content_pack_object[ContentPackFields.ID] in kvstore_data_keys: content_pack_object[ContentPackFields.INSTALLED] = True else: content_pack_object[ContentPackFields.INSTALLED] = False kvstore_data_titles = kvstore_content_objects_titles[content_type] if content_pack_object[ContentPackFields.TITLE] in kvstore_data_titles: content_pack_object[ContentPackFields.DUPLICATE_TITLE] = True else: content_pack_object[ContentPackFields.DUPLICATE_TITLE] = False content_pack_object['has_dependency'] = ContentPackPreviewer.identify_dependencies( content_type, content_pack_object[ContentPackFields.ID], content_objects ) content_pack_objects_preview_list.append(content_pack_object) updated_content_pack_objects_dict_from_json[content_type] = content_pack_objects_preview_list return updated_content_pack_objects_dict_from_json @staticmethod def identify_dependencies(content_type, object_id, content_objects): """ Identify has_dependency field of a content pack object by reviewing associated attributes of other objects in this content pack :param content_type: content type of object with object_id :type content_type: str :param object_id: object id of a content pack object :type object_id: str :param content_objects: content pack objects of this content pack id :type content_objects: dict :return: a boolean whether the object of this object_id has dependencies :rtype: bool """ always_has_dependency_list = [ ContentType.CORRELATION_SEARCH, ContentType.NOTABLE_EVENT_AGGREGATION_POLICY ] services = content_objects.get(ContentType.SERVICE, []) if content_type in always_has_dependency_list: return True elif content_type == ContentType.KPI_BASE_SEARCH: for service in services: for kpi in service.get('kpis', []): if 'base_search_id' in kpi and kpi['base_search_id'] == object_id: return True elif content_type == ContentType.KPI_THRESHOLD_TEMPLATE: for service in services: for kpi in service.get('kpis', []): if 'kpi_threshold_template_id' in kpi and kpi['kpi_threshold_template_id'] == object_id: return True elif content_type == ContentType.SERVICE_TEMPLATE: for service in services: if service.get('base_service_template_id') == object_id: return True elif content_type == ContentType.SERVICE: for service in services: if service['_key'] == object_id: dependent_number = len(service['services_depends_on']) depended_number = len(service['services_depending_on_me']) if dependent_number > 0 or depended_number > 0 or service['base_service_template_id']: return True return False def preview(self, content_pack_id, version): """ Get a list of content pack objects with field installed indicating whether content pack objects have been installed or not :param content_pack_id: the content pack version id :type content_pack_id: str :param version: the content pack version :type version: str :return: a dict where key is content type and value is a list of content pack objects with installed field :rtype: dict eg: { 'glass_tables': [ { 'description': u"Throws an event when storefront isn't able to launch a resource", 'id': u'itsi-citrix-storefront-no-resources', 'installed': False, 'title': u'Storefront No Resources' } ] } """ content_objects = self.get_content_objects(content_pack_id, version) content_objects_info, correlation_search_keys = self.get_content_info_from_json_files(content_objects) kvstore_content_objects_keys = self.get_kvstore_content_objects_keys( content_objects_info, correlation_search_keys ) kvstore_content_objects_titles = self.get_kvstore_content_objects_titles( content_objects_info ) content_objects_preview = ContentPackPreviewer.add_fields_to_content_pack_objects( content_objects, content_objects_info, kvstore_content_objects_keys, kvstore_content_objects_titles ) content_objects_preview["saved_searches"] = retrieve_saved_search_consistency_status( session_key=self.session_key, content_pack_id=content_pack_id, ) return content_objects_preview def format_preview_objects(self, content_pack_id, version): """ Get a formatted list of Content Pack Object IDs from preview. :param content_pack_id: the content pack id :type content_pack_id: str :param version: the content pack version :type version: str :return: a dict where key is content type and value is a list of content pack object IDs. :rtype: dict eg: { 'glass_tables': ['gt-id-1', 'gt-id-2'], 'entity_type': ['et-id-1', 'et-id-2'] } """ preview_objects = self.preview(content_pack_id, version) content_filter_data = {} for content_type in preview_objects: try: if content_type != ContentPackFields.SAVED_SEARCHES: object_ids = [] for content_pack_object in preview_objects[content_type]: object_ids.append(content_pack_object['id']) content_filter_data[content_type] = object_ids except Exception as ex: LOGGER.error('Failed formating of object id="%s" content_pack_id="%s" version="%s"', content_pack_object['id'], content_pack_id, version) LOGGER.exception(ex) return content_filter_data