# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import base64
import json
import os

from splunk.clilib.bundle_paths import make_splunkhome_path

from ITOA.itoa_common import get_conf, get_conf_stanza
from itsi.content_packs.constants import (
    CONF_ITSI_CONTENT_PACKS,
    CONTENT_PACKS_HOME_DIR,
    CONTENT_PACKS_SCREENSHOTS_DIR,
    CONTENT_TYPE_TO_ITOA_TYPE,
    IMAGE_EXT_TO_MIMETYPE,
    ContentPackFields,
    ContentPackFiles,
    ContentType
)
from itsi.content_packs.content_types import CONTENT_TYPE_TO_MODEL
from itsi.content_packs.journal import EntryType
from itsi.objects.itsi_content_pack_status import ItsiContentPackStatus


def get_objects_reader(content_type, session_key, logger):
    """
    Returns an instance that's capable of reading content object data.

    Glass table images are read by a ``ContentImagesReader``; every other
    content type is read by a ``ContentObjectsReader``.

    :param content_type: the content type
    :type content_type: str

    :param session_key: the session key
    :type session_key: str

    :param logger: a logger instance
    :type logger: Logger

    :return: a reader instance
    :rtype: object
    """
    if content_type == ContentType.GLASS_TABLE_IMAGE:
        return ContentImagesReader(
            logger=logger,
            session_key=session_key
        )

    return ContentObjectsReader(
        logger=logger,
        session_key=session_key
    )


def make_path_to_content_pack_file(content_pack_id, file_name=''):
    """
    Returns the file path to the content pack file or directory.

    With the default empty ``file_name`` the result is the content pack's
    home directory itself.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param file_name: the file name
    :type file_name: str

    :return: the file path to the content pack file or directory
    :rtype: str
    """
    return make_splunkhome_path([
        'etc',
        'apps',
        content_pack_id,
        CONTENT_PACKS_HOME_DIR,
        file_name
    ])


def make_static_path_to_content_pack_file(content_pack_id, file_name):
    """
    Returns the static path for the given file.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param file_name: the file name
    :type file_name: str

    :return: the static path to the file
    :rtype: str
    """
    # NOTE(review): os.path.join uses the platform separator; if this value is
    # consumed as a URL path, confirm the behavior on Windows hosts.
    return os.path.join('app', content_pack_id, file_name)


def make_static_path_to_screenshot(content_pack_id, screenshot):
    """
    Returns the static path for the given screenshot file.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param screenshot: the screenshot file name
    :type screenshot: str

    :return: the static path to the screenshot file, or None when no
        screenshot file name is given
    :rtype: str or None
    """
    if not screenshot:
        return None

    return os.path.join('app', content_pack_id, CONTENT_PACKS_SCREENSHOTS_DIR, screenshot)


def _rewrite_screenshot_paths(content_pack_id, screenshot):
    """
    Replaces the path and thumbnail file names of a screenshot entry with
    their static paths, in place. Missing or empty fields are left untouched.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param screenshot: a screenshot entry from the manifest
    :type screenshot: dict

    :return: the same screenshot entry
    :rtype: dict
    """
    for field in (ContentPackFields.SCREENSHOT_PATH, ContentPackFields.SCREENSHOT_THUMB):
        file_name = screenshot.get(field)
        if file_name:
            screenshot[field] = make_static_path_to_screenshot(content_pack_id, file_name)

    return screenshot


def get_content_packs_conf(getargs=None, session_key=None):
    """
    Returns the content packs conf data.

    :param getargs: the GET params
    :type getargs: dict

    :param session_key: the session key
    :type session_key: str

    :return: the list of content packs data
    :rtype: list
    """
    conf = get_conf(
        session_key=session_key,
        conf_name=CONF_ITSI_CONTENT_PACKS,
        getargs=getargs
    )
    content = json.loads(conf['content'])

    return content['entry']


def get_content_pack_conf(content_pack_id, version, session_key=None):
    """
    Returns the conf entry for the given content pack, after checking that
    the version in the conf matches the requested version.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param version: the expected content pack version
    :type version: str

    :param session_key: the session key
    :type session_key: str

    :return: the conf entry of the content pack
    :rtype: dict

    :raises EnvironmentError: if the version found in the conf differs from
        the requested version
    """
    conf = get_conf_stanza(
        session_key=session_key,
        conf_name=CONF_ITSI_CONTENT_PACKS,
        stanza_name=content_pack_id
    )
    content = json.loads(conf['content'])
    entry = content['entry'][0]
    found_version = entry['content']['version']

    if found_version != version:
        raise EnvironmentError(
            'Requested version="{}", but found content_pack_version="{}"'.format(
                version, found_version)
        )

    return entry


def read_content_pack_file(content_pack_id, content_pack_file):
    """
    Returns the text contents of the given content pack file.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :param content_pack_file: the content pack file name
    :type content_pack_file: str

    :return: the file contents
    :rtype: str
    """
    file_path = make_path_to_content_pack_file(content_pack_id, content_pack_file)

    # Decode explicitly as UTF-8 so the result does not depend on the host locale.
    with open(file_path, 'r', encoding='utf-8') as file_object:
        return file_object.read()


def read_manifest(content_pack_id):
    """
    Returns the manifest file for this content pack.

    This is v2 of the original read_manifest (in 4.8)
    In 4.9, we have restructured Splunk App for Content Packs to keep all the content packs
    as separate apps instead of merged together within one application.
    So the paths to individual content pack files have changed.

    :param content_pack_id: the content pack id
    :type content_pack_id: str

    :return: the manifest data
    :rtype: dict
    """
    contents = read_content_pack_file(content_pack_id, ContentPackFiles.MANIFEST)

    return json.loads(contents)


class ContentPackContentReader(object):
    """This class is responsible for fetching, reading, and parsing content pack data."""

    def __init__(self, logger, session_key):
        """
        :param logger: a logger instance
        :type logger: Logger

        :param session_key: the session key
        :type session_key: str
        """
        self.logger = logger
        self.session_key = session_key

    def read(self, content_pack_id, version, journal):
        """
        Reads and returns the content pack object data for the given content pack.

        :param content_pack_id: the content pack id
        :type content_pack_id: str

        :param version: the content pack version
        :type version: str

        :param journal: a journal instance
        :type journal: TransactionJournal

        :return: the content pack objects data, keyed by content type; content
            types with no objects listed in the manifest are omitted
        :rtype: dict

        :raises EnvironmentError: if the conf version does not match ``version``
            or the content directory does not exist
        """
        # Called for validation only: raises when the conf version differs.
        get_content_pack_conf(content_pack_id, version, self.session_key)

        content_dir = make_path_to_content_pack_file(content_pack_id)
        if not os.path.isdir(content_dir):
            raise EnvironmentError(
                'Invalid directory found for content_dir="{}"'.format(content_dir)
            )

        manifest = read_manifest(content_pack_id)
        content_objects = {}

        for content_type in CONTENT_TYPE_TO_ITOA_TYPE:
            object_ids = manifest.get(content_type, [])
            if not object_ids:
                continue

            reader = get_objects_reader(content_type, session_key=self.session_key, logger=self.logger)
            dir_path = os.path.join(content_dir, content_type)
            file_paths = [os.path.join(dir_path, file_name) for file_name in object_ids]

            # reader.read is lazy; to_itoa_data consumes it and builds the list.
            models_data = reader.read(file_paths, journal=journal)
            content_objects[content_type] = self.to_itoa_data(content_type, models_data)

        return content_objects

    def to_itoa_data(self, content_type, models_data):
        """
        Converts the given models data into a format that is compatible with ITOA object interfaces.

        Entries whose model cannot be built are logged and skipped.

        :param content_type: the content type
        :type content_type: str

        :param models_data: the models data
        :type models_data: iterable

        :return: the ITOA interface objects data
        :rtype: list
        """
        objects = []

        for model_data in models_data:
            try:
                model_class = CONTENT_TYPE_TO_MODEL[content_type]
                model = model_class(model_data)
            except Exception as exc:
                # Best effort: one bad object must not abort the whole read.
                self.logger.exception(exc)
                continue

            obj = model.to_dict(use_alias=True)
            obj.setdefault('_key', model.get_key())
            obj.pop('version', None)
            objects.append(obj)

        return objects


class ContentObjectsReader(object):
    """Reads and parses content objects data."""

    def __init__(self, logger, session_key):
        """
        :param logger: a logger instance
        :type logger: Logger

        :param session_key: the session key
        :type session_key: str
        """
        self.logger = logger
        self.session_key = session_key

    def read(self, file_paths, journal):
        """
        Reads and parses the content objects located at the given file paths.

        This is a generator: files are read, and failures are recorded in the
        journal, only as the result is iterated. Paths that are not files or
        do not end in ``.json`` are skipped; unreadable files and files whose
        JSON is not an object are recorded as journal failures and skipped.

        :param file_paths: a list of file paths
        :type file_paths: list

        :param journal: a journal instance
        :type journal: TransactionJournal

        :return: an iterator over the content objects read from the given paths
        :rtype: generator
        """
        for file_path in file_paths:
            if not os.path.isfile(file_path):
                self.logger.info('Skipping read of non-file at file_path="%s"', file_path)
                continue

            file_name = os.path.basename(file_path)
            object_id, ext = os.path.splitext(file_name)
            if ext != '.json':
                self.logger.info('Skipping read of non-JSON file at file_path="%s"', file_path)
                continue

            try:
                object_data = self.read_object(file_path)
            except Exception as ex:
                self.logger.error('Error while reading JSON data at file_path="%s"', file_path)
                self.logger.exception(ex)
                journal.failure({
                    'error_message': str(ex),
                    'type': EntryType.ERROR_OBJECT_READ,
                    'id': object_id
                })
                continue

            if not isinstance(object_data, dict):
                error_message = f'Unexpected non-dict object read from file_path="{file_path}" object_data="{object_data}"'
                self.logger.error(error_message)
                journal.failure({
                    'error_message': error_message,
                    'type': EntryType.INVALID_OBJECT_TYPE,
                    'id': object_id
                })
                continue

            yield object_data

    def read_object(self, filepath):
        """
        Reads and parses the JSON object data at the given file path.

        :param filepath: the path to the file
        :type filepath: str

        :return: the parsed JSON data
        :rtype: object
        """
        # Decode explicitly as UTF-8 so the result does not depend on the host locale.
        with open(filepath, 'r', encoding='utf-8') as file_object:
            return json.load(file_object)


class ContentImagesReader(object):
    """Reads and parses content image data."""

    def __init__(self, logger, session_key):
        """
        :param logger: a logger instance
        :type logger: Logger

        :param session_key: the session key
        :type session_key: str
        """
        self.logger = logger
        self.session_key = session_key

    def read(self, file_paths, journal):
        """
        Reads and parses the content images located at the given file paths.

        This is a generator: files are read, and failures are recorded in the
        journal, only as the result is iterated. Paths that are not files or
        whose extension has no entry in IMAGE_EXT_TO_MIMETYPE are skipped;
        read and encoding errors are recorded as journal failures and skipped.

        :param file_paths: a list of file paths
        :type file_paths: list

        :param journal: a journal instance
        :type journal: TransactionJournal

        :return: an iterator over image model data dicts with the keys
            ``key``, ``data`` (base64 text), ``name`` and ``type``
        :rtype: generator
        """
        for file_path in file_paths:
            if not os.path.isfile(file_path):
                self.logger.info('Skipping read of non-file at file_path="%s"', file_path)
                continue

            file_name = os.path.basename(file_path)
            object_id, ext = os.path.splitext(file_name)
            mimetype = IMAGE_EXT_TO_MIMETYPE.get(ext)
            if not mimetype:
                self.logger.info('Skipping read of non-image file at file_path="%s"', file_path)
                continue

            try:
                file_data = self.read_file(file_path)
            except Exception as ex:
                self.logger.error('Error while reading image data at file_path="%s"', file_path)
                self.logger.exception(ex)
                journal.failure({
                    'error_message': str(ex),
                    'type': EntryType.ERROR_IMAGE_READ,
                    'id': object_id
                })
                continue

            try:
                data = base64.b64encode(file_data).decode('utf-8')
            except Exception as ex:
                self.logger.error('Error while base64 encoding data at file_path="%s"', file_path)
                self.logger.exception(ex)
                journal.failure({
                    'error_message': str(ex),
                    'type': EntryType.ERROR_IMAGE_ENCODE,
                    'id': object_id
                })
                continue

            yield {
                'key': object_id,
                'data': data,
                'name': file_name,
                'type': mimetype
            }

    def read_file(self, filepath):
        """
        Reads the file data at the given path.

        :param filepath: the path to the file
        :type filepath: str

        :return: the file data
        :rtype: bytes
        """
        with open(filepath, 'rb') as file_object:
            return file_object.read()


class ContentPackMetadataReader(object):
    """This class is responsible for reading content pack metadata."""

    def __init__(self, logger, session_key):
        """
        :param logger: a logger instance
        :type logger: Logger

        :param session_key: the session key
        :type session_key: str
        """
        self.logger = logger
        self.session_key = session_key

    def read(self, conf_entry, extra_fields=None):
        """
        Returns the content pack data from a conf entry.

        :param conf_entry: the conf entry data
        :type conf_entry: dict

        :param extra_fields: a list of extra non-default fields to include with the data
        :type extra_fields: list

        :return: the content pack data
        :rtype: dict
        """
        if extra_fields is None:
            extra_fields = []

        content_pack_id = conf_entry['name']
        entry_content = conf_entry['content']
        manifest = read_manifest(content_pack_id)

        installed_versions = []
        installed_content_pack = ItsiContentPackStatus(self.session_key, 'nobody').get('nobody', content_pack_id)
        if installed_content_pack:
            # NOTE(review): list() assumes the stored value is a collection of
            # versions; a plain string would be split into characters - confirm.
            installed_versions = list(installed_content_pack[ContentPackFields.VERSION_INSTALLED])

        content_pack = {
            ContentPackFields.AUTHOR: entry_content.get(ContentPackFields.AUTHOR, ''),
            ContentPackFields.DESCRIPTION: entry_content.get(ContentPackFields.DESCRIPTION, ''),
            ContentPackFields.ICON: make_static_path_to_content_pack_file(
                content_pack_id, ContentPackFiles.ICON
            ),
            ContentPackFields.ID: content_pack_id,
            ContentPackFields.HELP_LINKS: manifest.get(ContentPackFields.HELP_LINKS, []),
            ContentPackFields.TITLE: entry_content[ContentPackFields.TITLE],
            ContentPackFields.VERSION: entry_content[ContentPackFields.VERSION],
            ContentPackFields.VERSION_INSTALLED: installed_versions
        }

        # "or" guards against a manifest that carries an explicit null value.
        screenshots = manifest.get(ContentPackFields.SCREENSHOTS, []) or []
        for screenshot in screenshots:
            _rewrite_screenshot_paths(content_pack_id, screenshot)
        content_pack[ContentPackFields.SCREENSHOTS] = screenshots

        screenshot_main = manifest.get(ContentPackFields.SCREENSHOT_MAIN, {}) or {}
        _rewrite_screenshot_paths(content_pack_id, screenshot_main)
        content_pack[ContentPackFields.SCREENSHOT_MAIN] = screenshot_main

        if ContentPackFields.OVERVIEW in extra_fields:
            content_pack[ContentPackFields.OVERVIEW] = read_content_pack_file(
                content_pack_id, ContentPackFiles.OVERVIEW
            )

        return content_pack