You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
7.6 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
import os
import splunklib.client as client
from feature_flagging.license_retriever import LicenseRetriever
from splunk.clilib.bundle_paths import make_splunkhome_path
from itsi.itsi_utils import ITOAInterfaceUtils
from itsi.content_packs.constants import ContentPackFields, CONTENT_TYPE_TO_ITOA_TYPE
from ITOA.setup_logging import InstrumentCall
class ContentPackEntitlementsProcessor(object):
    """Compute license entitlements for a content pack and write them onto its data.

    Entitlement rules are read from the content pack's ``rule.json`` (falling
    back to the default one shipped with SA-ITOA), restricted to the object
    types listed in the pack's ``manifest.json``, and evaluated against the
    features returned by ``LicenseRetriever``.
    """

    def __init__(self, content_pack_id, logger, session_key, transaction_id=None):
        """
        :param content_pack_id: the content pack id
        :type content_pack_id: str
        :param logger: a logger instance
        :type logger: Logger
        :param session_key: the session key
        :type session_key: str
        :param transaction_id: transaction info for tracking for debugging
        :type transaction_id: basestring
        """
        self.content_pack_id = content_pack_id
        self.logger = logger
        self.session_key = session_key
        self._instrumentation = InstrumentCall(logger)
        self.transaction_id = transaction_id
        self.content_app_service = ContentPackEntitlementsProcessor._setup_splunk_service(
            session_key=session_key,
            app_name='DA-ITSI-ContentLibrary'
        )

    @staticmethod
    def _setup_splunk_service(session_key, app_name):
        """
        Connect to splunkd in the context of the given app.

        :param session_key: the session key
        :type session_key: str
        :param app_name: Splunk app name
        :type app_name: str
        :return: a Splunk Service object
        :rtype: object
        """
        (host, port) = ITOAInterfaceUtils.get_splunk_host_port()
        return client.connect(token=session_key, app=app_name, host=host, port=port)

    @staticmethod
    def _load_json_file(path):
        """
        Read and parse a JSON file.

        :param path: path of the file to read
        :type path: str
        :return: the parsed JSON document
        """
        # Explicit encoding so parsing does not depend on the platform's locale.
        with open(path, encoding='utf-8') as f:
            return json.load(f)

    def _get_license_features(self):
        """
        Fetch the license features for the current session.

        :return: the features part of ``LicenseRetriever.get_features_and_suite()``
        """
        features, _ = LicenseRetriever(self.session_key).get_features_and_suite()
        return features

    def _get_applicable_object_type_entitlements(self):
        """
        Return the ``object_type_entitlement`` rules that apply to this content pack.

        A rule applies when its object type is both listed in the pack's
        manifest.json and present in CONTENT_TYPE_TO_ITOA_TYPE.

        :return: mapping of object type to its entitlement list
        :rtype: dict
        :raises KeyError: if the rule file has no ``object_type_entitlement`` section
        """
        object_manifest = self.read_content_pack_manifest()
        rules = self.read_content_pack_rule()
        return {
            content_type: entitlement_list
            for content_type, entitlement_list in rules['object_type_entitlement'].items()
            if content_type in object_manifest and content_type in CONTENT_TYPE_TO_ITOA_TYPE
        }

    def get_entitlement_status(self, entitlement_list):
        """
        Tell whether at least one of the given entitlements is enabled.

        An entitlement that is absent from the license features is logged and
        treated as not granted; it does not raise.

        :param entitlement_list: entitlement list of an object type or a content pack
        :type entitlement_list: list
        :return: entitlement_status of this object type or content pack
        :rtype: boolean
        """
        features = self._get_license_features()
        for entitlement in entitlement_list:
            feature_value = features.get(entitlement)
            if feature_value is None:
                self.logger.warning(
                    f"tid={self.transaction_id} entitlement {entitlement} not existing in feature_suite"
                )
                continue
            if feature_value:
                # One enabled entitlement is enough; the remaining ones are not inspected.
                return True
        return False

    def apply_object_type_level_entitlement(self, content_objects):
        """
        Set the entitlement status on every object, based on its object type.

        Object types without an applicable entitlement rule are left untouched.

        :param content_objects: content pack objects dict aggregated by content types
        :type content_objects: dict
        :return: None
        """
        entitlement = self.get_entitlement('object_type_level')
        for content_type, entitlement_list in entitlement.items():
            # This is to handle case where object type is defined in manifest.json of a content pack
            # but actually has no objects
            if content_type not in content_objects:
                continue
            entitlement_status = self.get_entitlement_status(entitlement_list)
            for item in content_objects[content_type]:
                item[ContentPackFields.ENTITLEMENT_STATUS] = entitlement_status

    def apply_content_pack_level_entitlement(self, content_pack):
        """
        Set the entitlement status on the content pack itself.

        :param content_pack: content pack data; it is updated in place by key assignment
        :type content_pack: dict
        :return: None
        """
        entitlement = self.get_entitlement('content_pack_level')
        entitlement_status = self.get_entitlement_status(entitlement['entitlement'])
        content_pack[ContentPackFields.ENTITLEMENT_STATUS] = entitlement_status

    def get_entitlement(self, entitlement_level):
        """
        Return the entitlements for the requested level.

        :param entitlement_level: one of ['content_pack_level', 'object_type_level']
        :type entitlement_level: str
        :return: content pack level entitlement/object type level entitlement
        :rtype: dict
        :raises ValueError: if entitlement_level is not one of the supported levels
        """
        if entitlement_level == 'content_pack_level':
            return self.get_content_pack_level_entitlements()
        if entitlement_level == 'object_type_level':
            return self.get_object_level_entitlements()
        # Fail here rather than returning None and breaking later in the caller.
        raise ValueError(
            f"Unsupported entitlement level {entitlement_level!r}; "
            "expected 'content_pack_level' or 'object_type_level'"
        )

    def read_content_pack_manifest(self):
        """
        reads the manifest.json in the content_pack

        :return: manifest.json
        :rtype: dict
        """
        object_manifest_path = make_splunkhome_path(
            ['etc', 'apps', self.content_pack_id, 'itsi', 'manifest.json']
        )
        return self._load_json_file(object_manifest_path)

    def read_content_pack_rule(self):
        """
        reads content pack's rule.json.
        If rule.json does not exist in the content pack's directory, default rule.json is read

        :return: rule.json
        :rtype: dict
        """
        rule_path = make_splunkhome_path([
            'etc',
            'apps',
            self.content_pack_id,
            'itsi',
            'rule.json'])
        if not os.path.exists(rule_path):
            rule_path = make_splunkhome_path([
                'etc',
                'apps',
                'SA-ITOA',
                'lib',
                'itsi',
                'content_packs',
                'rule.json'
            ])
        return self._load_json_file(rule_path)

    def get_object_level_entitlements(self):
        """
        Retrieves object level entitlements of the content pack

        :return: object level entitlements, keyed by object type
        :rtype: dict
        """
        return self._get_applicable_object_type_entitlements()

    def get_content_pack_level_entitlements(self):
        """
        Retrieves content pack level entitlements

        The result is the union of the entitlements of every applicable object
        type, returned in sorted order so the output is deterministic.

        :return: content pack level entitlements, under the 'entitlement' key
        :rtype: dict
        """
        entitlements = set()
        for entitlement_list in self._get_applicable_object_type_entitlements().values():
            entitlements.update(entitlement_list)
        return {'entitlement': sorted(entitlements)}

    def process_objects(self, content_objects, **kwargs):
        """
        Filter out objects that don't have entitlements

        Object types whose entitlement status is false are deleted from
        content_objects in place; the same dict is returned.

        :param content_objects: the content objects data
        :type content_objects: dict
        :return: the content objects data with entitlements
        :rtype: dict
        """
        with self._instrumentation.track(
            "ContentPackEntitlementsProcessor.process_objects",
            transaction_id=self.transaction_id
        ):
            entitlement = self.get_entitlement('object_type_level')
            for content_type, entitlement_list in entitlement.items():
                # Nothing to remove for this type, so skip the license lookup.
                if content_type not in content_objects:
                    continue
                if not self.get_entitlement_status(entitlement_list):
                    del content_objects[content_type]
            return content_objects