# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import sys from splunk.clilib.bundle_paths import make_splunkhome_path sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-UserAccess', 'lib'])) from user_access_utils import UserAccess from itsi.content_packs import constants from itsi.content_packs.itoa import get_itoa_identifier_fields, get_itoa_object_class, get_itoa_object_title from itsi.content_packs.journal import EntryType from ITOA.setup_logging import InstrumentCall USERACCESS_ACL = { 'read': ['*'], 'write': ['*'], 'delete': ['*'] } ITSI_ACL = { 'can_change_perms': True, 'can_share_app': True, 'can_share_global': True, 'can_share_user': True, 'can_write': True, 'modifiable': True, 'owner': 'nobody', 'perms': { 'read': ['*'], 'write': ['*'] }, 'sharing': 'app' } class AccessConfigurator(object): """ Configures ACLs for content objects. """ def __init__(self, logger, session_key, transaction_id=None): """ :param logger: a logger instance :type logger: Logger :param session_key: the session key :type session_key: str :param transaction_id: transaction info for tracking for debugging :type transaction_id: basestring """ self.logger = logger self.session_key = session_key self._instrumentation = InstrumentCall(logger) self.transaction_id = transaction_id def process_objects(self, content_objects, journal): """ Configures ACLs for supported content type objects. :param content_objects: the content objects data :type content_objects: dict :param journal: a journal instance :type journal: TransactionJournal :return: the content objects data :rtype: dict """ with self._instrumentation.track("AccessConfigurator.process_objects", transaction_id=self.transaction_id): for content_type, objects in content_objects.items(): if content_type not in constants.CONTENT_TYPES_WITH_ACL: continue self.configure_access(content_type, objects, journal=journal) return content_objects def configure_access(self, content_type, objects, journal): """ Configures ACLs for the given content objects. :param content_type: the content type :type content_type: str :param objects: the list of objects data :type objects: list :param journal: :type journal: TransactionJournal """ object_type = constants.CONTENT_TYPE_TO_ITOA_TYPE.get(content_type) if not object_type: self.logger.warning( f'tid={self.transaction_id} Unsupported content_type="{content_type}" found while configuring ACLs' ) return itoa_object_class = get_itoa_object_class(content_type) if not itoa_object_class: self.logger.warning( f'tid={self.transaction_id} No object class available for content_type="{content_type}"' ) return object_storename = getattr(itoa_object_class, 'collection_name', None) if not object_storename: self.logger.warning( f'tid={self.transaction_id} No collection name associated for content_type="{content_type}"' ) return object_ids = self.configure_itsi_acl(content_type, objects) if not object_ids: return try: UserAccess.bulk_update_perms( object_ids=object_ids, acl=USERACCESS_ACL, object_app='itsi', object_type=object_type, object_storename=object_storename, session_key=self.session_key, logger=self.logger ) except Exception as ex: self.logger.error( f'tid={self.transaction_id} Failed to bulk update permissions for content_type="{content_type}" object_type="{object_type}" object_storename="{object_storename}" object_ids="{object_ids}"' ) self.logger.exception(f'tid={self.transaction_id} {ex}') journal.failure({ 'error_message': str(ex), 'type': EntryType.ERROR_BULK_UPDATE_PERMS, 'content_type': content_type, 'object_type': object_type, 'object_storename': object_storename, 'ids': object_ids, 'titles': list({get_itoa_object_title(content_type, obj) for obj in objects}) }) def configure_itsi_acl(self, content_type, objects): """ Sets ACLs for valid objects and returns these object ids . :param content_type: the content type :type content_type: str :param objects: the list of objects data :type objects: list :return: the list of object ids :rtype: list """ object_ids = [] id_field = get_itoa_identifier_fields(content_type)[0] for obj in objects: object_id = obj.get(id_field) if object_id: obj['acl'] = ITSI_ACL obj['_owner'] = 'nobody' object_ids.append(object_id) else: self.logger.warning(f'tid={self.transaction_id} No _key field found for object={obj}') return object_ids