# Copyright (C) 2005-2024 Splunk Inc. All Rights Reserved.
import json
import os

import splunklib.client as client
from feature_flagging.license_retriever import LicenseRetriever
from splunk.clilib.bundle_paths import make_splunkhome_path
from itsi.itsi_utils import ITOAInterfaceUtils
from itsi.content_packs.constants import ContentPackFields, CONTENT_TYPE_TO_ITOA_TYPE


class ContentPackEntitlementsProcessor(object):
    """Writes entitlement information onto content pack data.

    Entitlement rules are read from the content pack's ``rule.json`` (or the
    default one shipped with SA-ITOA) and evaluated against the features
    returned by ``LicenseRetriever``.
    """

    def __init__(self, content_pack_id, logger, session_key):
        """
        :param content_pack_id: the content pack id
        :type content_pack_id: str
        :param logger: a logger instance
        :type logger: Logger
        :param session_key: the session key
        :type session_key: str
        """
        self.content_pack_id = content_pack_id
        self.logger = logger
        self.session_key = session_key
        self.content_app_service = ContentPackEntitlementsProcessor._setup_splunk_service(
            session_key=session_key,
            app_name='DA-ITSI-ContentLibrary'
        )

    @staticmethod
    def _setup_splunk_service(session_key, app_name):
        """
        Connects to splunkd in the context of the given app.

        :param session_key: the session key
        :type session_key: str
        :param app_name: Splunk app name
        :type app_name: str
        :return: a Splunk Service object
        :rtype: object
        """
        (host, port) = ITOAInterfaceUtils.get_splunk_host_port()
        return client.connect(token=session_key, app=app_name, host=host, port=port)

    @staticmethod
    def _load_json_file(path):
        """
        Reads and parses a JSON file.

        :param path: path of the JSON file
        :type path: str
        :return: the parsed JSON content
        :rtype: dict
        """
        with open(path) as f:
            return json.load(f)

    def get_entitlement_status(self, entitlement_list):
        """
        Evaluates a list of entitlements against the licensed features.
        The status is True as soon as any listed entitlement is enabled.

        :param entitlement_list: entitlement list of an object type or a content pack
        :type entitlement_list: list
        :return: entitlement_status of this object type or content pack
        :rtype: boolean
        """
        features, _ = LicenseRetriever(self.session_key).get_features_and_suite()
        for entitlement in entitlement_list:
            # ``get`` does not raise for an unknown key, so a missing
            # entitlement is reported here and treated as "not entitled".
            feature_value = features.get(entitlement)
            if feature_value is None:
                self.logger.warning("entitlement %s not existing in feature_suite", entitlement)
            elif feature_value:
                return True
        return False

    def apply_object_type_level_entitlement(self, content_objects):
        """
        Sets the entitlement status on every object, in place.

        :param content_objects: content pack objects dict aggregated by content types
        :type content_objects: dict
        :return: None
        """
        entitlement = self.get_entitlement('object_type_level')
        for content_type, entitlement_list in entitlement.items():
            # This is to handle case where object type is defined in manifest.json of a content pack
            # but actually has no objects
            if content_type not in content_objects:
                continue
            entitlement_status = self.get_entitlement_status(entitlement_list)
            for item in content_objects[content_type]:
                item[ContentPackFields.ENTITLEMENT_STATUS] = entitlement_status

    def apply_content_pack_level_entitlement(self, content_pack):
        """
        Sets the entitlement status on the content pack itself, in place.

        :param content_pack: content pack data
        :type content_pack: dict
        :return: None
        """
        entitlement = self.get_entitlement('content_pack_level')
        entitlement_status = self.get_entitlement_status(entitlement['entitlement'])
        content_pack[ContentPackFields.ENTITLEMENT_STATUS] = entitlement_status

    def get_entitlement(self, entitlement_level):
        """
        :param entitlement_level: one of ['content_pack_level', 'object_type_level']
        :type entitlement_level: str
        :return: content pack level entitlement/object type level entitlement,
            or None when entitlement_level is not one of the supported values
        :rtype: dict
        """
        if entitlement_level == 'content_pack_level':
            return self.get_content_pack_level_entitlements()
        if entitlement_level == 'object_type_level':
            return self.get_object_level_entitlements()
        return None

    def read_content_pack_manifest(self):
        """
        reads the manifest.json in the content_pack

        :return: manifest.json
        :rtype: dict
        """
        object_manifest_path = make_splunkhome_path(
            ['etc', 'apps', self.content_pack_id, 'itsi', 'manifest.json']
        )
        return self._load_json_file(object_manifest_path)

    def read_content_pack_rule(self):
        """
        reads content pack's rule.json.
        If rule.json does not exist in the content pack's directory, default rule.json is read

        :return: rule.json
        :rtype: dict
        """
        rule_path = make_splunkhome_path(
            ['etc', 'apps', self.content_pack_id, 'itsi', 'rule.json']
        )
        if not os.path.exists(rule_path):
            rule_path = make_splunkhome_path(
                ['etc', 'apps', 'SA-ITOA', 'lib', 'itsi', 'content_packs', 'rule.json']
            )
        return self._load_json_file(rule_path)

    def _get_applicable_object_type_entitlements(self):
        """
        Returns the object type entitlement rules restricted to the known
        content types that are declared in the content pack's manifest.

        :return: entitlement lists keyed by content type
        :rtype: dict
        :raises KeyError: if rule.json has no 'object_type_entitlement' section
        """
        object_manifest = self.read_content_pack_manifest()
        rules = self.read_content_pack_rule()
        object_types = {key for key in object_manifest if key in CONTENT_TYPE_TO_ITOA_TYPE}
        return {
            content_type: entitlement_list
            for content_type, entitlement_list in rules['object_type_entitlement'].items()
            if content_type in object_types
        }

    def get_object_level_entitlements(self):
        """
        Retrieves object level entitlements of the content pack

        :return: object level entitlements
        :rtype: dict
        """
        return self._get_applicable_object_type_entitlements()

    def get_content_pack_level_entitlements(self):
        """
        Retrieves content pack level entitlements, i.e. the union of the
        entitlements of every object type the content pack declares.

        :return: content pack level entitlements
        :rtype: dict
        """
        entitlements = set()
        for entitlement_list in self._get_applicable_object_type_entitlements().values():
            entitlements.update(entitlement_list)
        return {'entitlement': list(entitlements)}

    def process_objects(self, content_objects, **kwargs):
        """
        Filter out objects that don't have entitlements.
        The given dict is modified in place and returned.

        :param content_objects: the content objects data
        :type content_objects: dict
        :return: the content objects data with entitlements
        :rtype: dict
        """
        entitlement = self.get_entitlement('object_type_level')
        for content_type, entitlement_list in entitlement.items():
            if not self.get_entitlement_status(entitlement_list):
                # An object type can be declared in manifest.json without
                # having any objects, so the key may legitimately be absent.
                content_objects.pop(content_type, None)
        return content_objects