You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
5.7 KiB
173 lines
5.7 KiB
import copy
|
|
import json
|
|
import urllib.request, urllib.parse, urllib.error
|
|
from util.rest_bouncer import make_rest_call
|
|
|
|
|
|
def make_splunk_url(splunk_proxy, namespace, extra_url_parts=None, url_params=None):
|
|
"""
|
|
Compose a generic Splunk REST URL.
|
|
|
|
Args:
|
|
splunk_proxy (SplunkRestProxy): splunk rest bouncer wrapper
|
|
namespace (str): namespace, either 'user', 'app' or 'global'
|
|
extra_url_parts (array): an array of strings for extra url path parts, ie. ['users', '1'] becomes 'users/1'
|
|
url_params (array): an array of tuples representing URL params, ie. [(count, -1)]
|
|
|
|
Returns:
|
|
base_url (str): The base Splunk REST, namespaced to the user/app
|
|
"""
|
|
# note that we also accept global in this function.
|
|
# This is because in listmodels, the returned models need to be loaded
|
|
# using load_model function with namespace specified. Since we are getting
|
|
# namespace info from Splunk for each model, it can be global (set from UI).
|
|
# Therefore, we need to support global in make_lookup_url so that we won't
|
|
# throw at load_model time when Splunk tags a certain model as global.
|
|
|
|
if extra_url_parts is None:
|
|
extra_url_parts = []
|
|
|
|
if url_params is None:
|
|
url_params = []
|
|
|
|
if namespace in ['app', 'global']:
|
|
user = 'nobody'
|
|
elif namespace == 'user':
|
|
user = splunk_proxy.splunk_user
|
|
elif namespace == '-':
|
|
user = namespace
|
|
else:
|
|
raise RuntimeError('You may only specify namespace "app"')
|
|
|
|
base_url_parts = [
|
|
splunk_proxy.splunkd_uri,
|
|
splunk_proxy.name_space_str,
|
|
user,
|
|
splunk_proxy.splunk_app,
|
|
]
|
|
|
|
base_url = '/'.join(base_url_parts + extra_url_parts)
|
|
|
|
url_param_string = urllib.parse.urlencode(url_params)
|
|
|
|
if url_param_string:
|
|
base_url += '?' + url_param_string
|
|
|
|
return base_url
|
|
|
|
|
|
def make_lookup_url(splunk_proxy, namespace, lookup_file=None, url_params=None):
|
|
"""
|
|
Compose url for data/lookup_table_files endpoint.
|
|
|
|
Args:
|
|
splunk_proxy (SplunkRestProxy): splunk rest bouncer wrapper
|
|
namespace (str): namespace, either 'user', 'app' or 'global'
|
|
lookup_file (str): Optional. Target lookup_file for the endpoint.
|
|
url_params (array): an array of tuples representing URL params, ie. [(count, -1)]
|
|
|
|
|
|
Returns:
|
|
base_url (str): the base Splunk /lookup-table-files URL for the current namespace
|
|
"""
|
|
|
|
url_parts = ['data', 'lookup-table-files']
|
|
if lookup_file is not None:
|
|
url_parts.append(lookup_file)
|
|
|
|
url = make_splunk_url(
|
|
splunk_proxy, namespace, extra_url_parts=url_parts, url_params=url_params
|
|
)
|
|
|
|
return url
|
|
|
|
|
|
def make_get_lookup_url(splunk_proxy, namespace, lookup_file=None, url_params=None):
|
|
"""
|
|
Compose url for GET operation to data/lookup_table_files endpoint.
|
|
|
|
Args:
|
|
splunk_proxy (SplunkRestProxy): splunk rest bouncer wrapper
|
|
namespace (str): namespace, either 'user', 'app' or 'global'
|
|
lookup_file (str): Optional. Target lookup_file for the endpoint.
|
|
url_params (array): an array of tuples representing URL params, ie. [(count, -1)]
|
|
|
|
Returns:
|
|
url (str): the Splunk /lookup-table-files URL for GET commands specifically
|
|
"""
|
|
|
|
if url_params is None:
|
|
url_params = []
|
|
|
|
# copy url_params because we need to modify it and we want to leave the passed-in argument untouched
|
|
url_params_copy = copy.deepcopy(url_params)
|
|
url_params_copy.append(('output_mode', 'json'))
|
|
|
|
url = make_lookup_url(splunk_proxy, namespace, lookup_file, url_params=url_params_copy)
|
|
|
|
return url
|
|
|
|
|
|
def make_kvstore_url(splunk_proxy, namespace, collection, extra_url_params=None):
|
|
"""
|
|
Compose a URL for querying the KVStore collection
|
|
|
|
Args:
|
|
splunk_proxy (SplunkRestProxy): splunk rest bouncer wrapper
|
|
namespace (str): namespace, either 'user' or 'app'
|
|
collection (str): the name of the KVStore collection to query
|
|
extra_url_params (list of tuples): extra parameters to be added in the url
|
|
after ?output_mode=json
|
|
|
|
Returns:
|
|
kvstore_url (str): The URL of the KVStore collection
|
|
"""
|
|
|
|
if extra_url_params is None:
|
|
extra_url_params = []
|
|
url_parts = ['storage', 'collections', 'data', collection]
|
|
|
|
return make_splunk_url(
|
|
splunk_proxy, namespace, extra_url_parts=url_parts, url_params=extra_url_params
|
|
)
|
|
|
|
|
|
def parse_capabilities_response(reply=None):
|
|
if reply:
|
|
try:
|
|
content = reply.get('content')
|
|
if not content:
|
|
raise RuntimeError("No content in reply")
|
|
json_content = json.loads(content)
|
|
if reply['success']:
|
|
content = (json_content.get('entry')[0]).get('content')
|
|
return content
|
|
else:
|
|
error_type = reply.get('error_type')
|
|
error_text = json_content.get('messages')[0].get('text')
|
|
if error_type is None:
|
|
raise RuntimeError(error_text)
|
|
elif error_type == 'AuthorizationFailed':
|
|
raise RuntimeError(f"UserNotAuthorizedError")
|
|
except Exception as e:
|
|
raise e
|
|
else:
|
|
raise RuntimeError("No reply received")
|
|
|
|
|
|
def get_user_capabilities(splunkd_uri, token, username):
|
|
# TODO: Make functions for creating url
|
|
url = f'{splunkd_uri}/services/authentication/users/{username}?output_mode=json'
|
|
reply = make_rest_call(
|
|
session_key=token,
|
|
method='GET',
|
|
url=url,
|
|
postargs=None,
|
|
jsonargs=None,
|
|
getargs=None,
|
|
rawResult=False,
|
|
)
|
|
reply_content = parse_capabilities_response(reply)
|
|
capabilities = reply_content.get('capabilities')
|
|
return capabilities
|