You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
4.4 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import configparser
import json
import os
import splunk.clilib.bundle_paths
import splunk.rest as rest
from itsi.content_pack_authorship.constants import CONTENT_PACK_PREFIX
from itsi.content_packs.constants import ERROR_REFRESH_INVALID_CONTENT_PACK
# splunkd URI for the itsi_content_packs conf stanzas in the itsi app namespace;
# used both to list the registered stanzas and (with '/_reload') to reload the conf.
itsi_content_packs_conf_path = (
    rest.makeSplunkdUri()
    + 'servicesNS/nobody/itsi/configs/conf-itsi_content_packs'
)
def refresh_using_configparser(session_key, logger):
    """
    Current workaround for itoa_admins not being able to refresh Content Library. This process removes the need for
    making POST calls to write to itsi_content_packs.conf and directly writes it to the appropriate file. This
    workaround requires a reload of the itsi_content_packs.conf file afterwards, as we are manually
    creating/updating the file on disk.

    A content pack whose config.json is missing, unreadable, malformed or lacks an id or any of the expected
    settings is reported under 'failed' instead of aborting the whole refresh.

    @param session_key: session key forwarded to the splunkd REST requests
    @param logger: logger object; only its debug() method is used
    @rtype: dictionary
    @return: payload with 'success' and 'failed' sections, each holding 'apps_added' and 'apps_removed' lists
    """
    logger.debug("Starting content library refresh")
    local_apps = set(get_apps_local())
    registered_apps = set(get_apps_registered(session_key))
    payload = {
        'success': {
            'apps_added': [],
            'apps_removed': []
        },
        'failed': {
            'apps_added': [],
            'apps_removed': []
        }
    }

    # Interpolation is disabled on purpose: with the default BasicInterpolation, storing a value that
    # contains a bare '%' (e.g. "alerts at 90% usage") raises ValueError.
    itsi_content_packs_conf = configparser.ConfigParser(interpolation=None)
    # Preserve the case of option names such as 'isCustom'; the default transform lower-cases them.
    itsi_content_packs_conf.optionxform = str

    # Iterate in sorted order so the generated file and the payload are stable between runs.
    for app in sorted(local_apps):
        try:
            app_id, app_conf_settings = get_content_pack_settings(app)
        except (OSError, ValueError) as err:
            # OSError: config.json missing/unreadable. ValueError: invalid JSON or undecodable bytes.
            # The failure is surfaced to the caller through payload['failed'] below.
            logger.debug("Could not load content pack settings for app %s: %s", app, err)
            app_id, app_conf_settings = None, None

        # ConfigParser rejects None option values with TypeError (after already creating the section),
        # so incomplete settings are filtered out before touching the parser.
        is_valid = bool(app_id) and bool(app_conf_settings) and all(
            value is not None for value in app_conf_settings.values()
        )
        if is_valid:
            itsi_content_packs_conf[app_id] = app_conf_settings
            payload['success']['apps_added'].append(app_id)
        else:
            payload['failed']['apps_added'].append(
                {
                    # Fall back to the app directory name when no id could be read.
                    'id': app_id or app,
                    'status': ERROR_REFRESH_INVALID_CONTENT_PACK
                }
            )

    payload['success']['apps_removed'] = sorted(registered_apps - local_apps)

    conf_file_path = splunk.clilib.bundle_paths.make_splunkhome_path([
        'etc',
        'apps',
        'itsi',
        'local',
        'itsi_content_packs.conf'
    ])
    # Explicit encoding so non-ASCII titles/descriptions do not depend on the platform locale.
    with open(conf_file_path, 'w', encoding='utf-8') as conf_file:
        itsi_content_packs_conf.write(conf_file)

    # The file was changed outside of the REST API, so request a reload of the conf.
    rest.simpleRequest(
        itsi_content_packs_conf_path + '/_reload',
        method='GET',
        sessionKey=session_key,
        raiseAllErrors=False
    )
    logger.debug("Finished content library refresh")
    return payload
def get_content_pack_settings(name):
    """
    Reads etc/apps/<name>/itsi/config.json and returns the content pack id together with the conf settings
    to write to itsi_content_packs.conf

    @type name: str
    @param name: name of the content pack app directory under etc/apps
    @rtype: str, dictionary
    @return: content pack id (None when config.json has no 'id') and dictionary containing content pack conf
        settings; values are None for fields absent from config.json. Returns (None, {}) when config.json
        does not contain a JSON object.
    @raise OSError: config.json does not exist or cannot be read
    @raise ValueError: config.json is not valid UTF-8 encoded JSON
    """
    config_path = splunk.clilib.bundle_paths.make_splunkhome_path([
        'etc',
        'apps',
        name,
        'itsi',
        'config.json'
    ])
    # Decode explicitly as UTF-8 instead of relying on the platform's locale encoding.
    with open(config_path, encoding='utf-8') as config_file:
        config_data = json.load(config_file)

    # A top-level list/string/number has no settings to offer; report it as "nothing found"
    # rather than failing on .get().
    if not isinstance(config_data, dict):
        return None, {}

    content_pack_configs = {
        'description': config_data.get('description'),
        'title': config_data.get('title'),
        'version': config_data.get('version'),
        'isCustom': 1,
    }
    return config_data.get('id'), content_pack_configs
def get_apps_local():
    """
    Gets all local content packs in etc/apps, i.e. the directories whose name starts with
    CONTENT_PACK_PREFIX

    @rtype: list
    @return: list of directory names of locally installed content packs
    """
    apps_path = splunk.clilib.bundle_paths.make_splunkhome_path(['etc', 'apps'])
    # The with-block closes the directory iterator even if inspecting an entry raises.
    with os.scandir(apps_path) as entries:
        return [
            entry.name
            for entry in entries
            if entry.is_dir() and entry.name.startswith(CONTENT_PACK_PREFIX)
        ]
def get_apps_registered(session_key):
    """
    Gets all registered content packs in itsi_content_packs.conf, i.e. the stanzas whose name starts with
    CONTENT_PACK_PREFIX

    @param session_key: session key forwarded to the splunkd REST request
    @rtype: list
    @return: list of registered content_packs (stanza names); empty when the response carries no entries
    """
    # count=0 asks for every entry; without it the listing is capped at the default page size.
    args = {'output_mode': 'json', 'count': 0}
    _response, content = rest.simpleRequest(
        itsi_content_packs_conf_path,
        method='GET',
        getargs=args,
        sessionKey=session_key,
        raiseAllErrors=False
    )
    # Errors are not raised by the request (raiseAllErrors=False), so the body may lack 'entry'.
    entries = json.loads(content.decode("utf-8")).get('entry') or []
    stanza_names = (entry.get('name') for entry in entries)
    return [
        stanza_name
        for stanza_name in stanza_names
        if stanza_name and stanza_name.startswith(CONTENT_PACK_PREFIX)
    ]