You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
7.4 KiB

# Copyright 2016 Splunk, Inc.
# SPDX-FileCopyrightText: 2020 2020
#
# SPDX-License-Identifier: Apache-2.0
"""
Splunk platform related utilities.
"""
import os
import os.path as op
import subprocess
import socket
try:
from ConfigParser import ConfigParser
CONF_PARSER_KWARGS = {}
except ImportError:
from configparser import ConfigParser
CONF_PARSER_KWARGS = {"strict": False}
from io import StringIO
from . import utils
__all__ = [
"make_splunkhome_path",
"get_splunk_host_info",
"get_splunk_bin",
"get_splunkd_access_info",
"get_splunkd_uri",
"get_conf_key_value",
"get_conf_stanza",
"get_conf_stanzas",
]
ETC_LEAF = "etc"
# See validateSearchHeadPooling() in src/libbundle/ConfSettings.cpp
on_shared_storage = [
os.path.join(ETC_LEAF, "apps"),
os.path.join(ETC_LEAF, "users"),
os.path.join("var", "run", "splunk", "dispatch"),
os.path.join("var", "run", "splunk", "srtemp"),
os.path.join("var", "run", "splunk", "rss"),
os.path.join("var", "run", "splunk", "scheduler"),
os.path.join("var", "run", "splunk", "lookup_tmp"),
]
def _splunk_home():
return os.path.normpath(os.environ["SPLUNK_HOME"])
def _splunk_etc():
try:
result = os.environ["SPLUNK_ETC"]
except KeyError:
result = op.join(_splunk_home(), ETC_LEAF)
return os.path.normpath(result)
def _get_shared_storage():
"""Get splunk shared storage name.
:returns: Splunk shared storage name.
:rtype: ``string``
"""
try:
state = get_conf_key_value("server", "pooling", "state")
storage = get_conf_key_value("server", "pooling", "storage")
except KeyError:
state = "disabled"
storage = None
if state == "enabled" and storage:
return storage
return None
# Verify path prefix and return true if both paths have drives
def _verify_path_prefix(path, start):
path_drive = os.path.splitdrive(path)[0]
start_drive = os.path.splitdrive(start)[0]
return len(path_drive) == len(start_drive)
def make_splunkhome_path(parts):
"""Construct absolute path by $SPLUNK_HOME and `parts`.
Concatenate $SPLUNK_HOME and `parts` to an absolute path.
For example, `parts` is ['etc', 'apps', 'Splunk_TA_test'],
the return path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test.
Note: this function assumed SPLUNK_HOME is in environment varialbes.
:param parts: Path parts.
:type parts: ``list, tuple``
:returns: Absolute path.
:rtype: ``string``
:raises ValueError: Escape from intended parent directories.
"""
relpath = os.path.normpath(os.path.join(*parts))
basepath = None
shared_storage = _get_shared_storage()
if shared_storage:
for candidate in on_shared_storage:
# SPL-100508 On windows if the path is missing the drive letter,
# construct fullpath manually and call relpath
if os.name == "nt" and not _verify_path_prefix(relpath, candidate):
break
if os.path.relpath(relpath, candidate)[0:2] != "..":
basepath = shared_storage
break
if basepath is None:
etc_with_trailing_sep = os.path.join(ETC_LEAF, "")
if relpath == ETC_LEAF or relpath.startswith(etc_with_trailing_sep):
# Redirect $SPLUNK_HOME/etc to $SPLUNK_ETC.
basepath = _splunk_etc()
# Remove leading etc (and path separator, if present). Note: when
# emitting $SPLUNK_ETC exactly, with no additional path parts, we
# set <relpath> to the empty string.
relpath = relpath[4:]
else:
basepath = _splunk_home()
fullpath = os.path.normpath(os.path.join(basepath, relpath))
# Check that we haven't escaped from intended parent directories.
if os.path.relpath(fullpath, basepath)[0:2] == "..":
raise ValueError(
'Illegal escape from parent directory "{}": {}'.format(basepath, fullpath)
)
return fullpath
def get_splunk_host_info():
"""Get splunk host info.
:returns: Tuple of (server_name, host_name).
:rtype: ``tuple``
"""
server_name = get_conf_key_value("server", "general", "serverName")
host_name = socket.gethostname()
return (server_name, host_name)
def get_splunk_bin():
"""Get absolute path of splunk CLI.
:returns: absolute path of splunk CLI
:rtype: ``string``
"""
if os.name == "nt":
splunk_bin = "splunk.exe"
else:
splunk_bin = "splunk"
return make_splunkhome_path(("bin", splunk_bin))
def get_splunkd_access_info():
"""Get splunkd server access info.
:returns: Tuple of (scheme, host, port).
:rtype: ``tuple``
"""
if utils.is_true(get_conf_key_value("server", "sslConfig", "enableSplunkdSSL")):
scheme = "https"
else:
scheme = "http"
host_port = get_conf_key_value("web", "settings", "mgmtHostPort")
host_port = host_port.strip()
host = host_port.split(":")[0]
port = int(host_port.split(":")[1])
if "SPLUNK_BINDIP" in os.environ:
bindip = os.environ["SPLUNK_BINDIP"]
port_idx = bindip.rfind(":")
host = bindip[:port_idx] if port_idx > 0 else bindip
return (scheme, host, port)
def get_splunkd_uri():
"""Get splunkd uri.
:returns: Splunkd uri.
:rtype: ``string``
"""
if os.environ.get("SPLUNKD_URI"):
return os.environ["SPLUNKD_URI"]
scheme, host, port = get_splunkd_access_info()
return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
def get_conf_key_value(conf_name, stanza, key):
"""Get value of `key` of `stanza` in `conf_name`.
:param conf_name: Config file.
:type conf_name: ``string``
:param stanza: Stanza name.
:type stanza: ``string``
:param key: Key name.
:type key: ``string``
:returns: Config value.
:rtype: ``(string, list, dict)``
:raises KeyError: If `stanza` or `key` doesn't exist.
"""
stanzas = get_conf_stanzas(conf_name)
return stanzas[stanza][key]
def get_conf_stanza(conf_name, stanza):
"""Get `stanza` in `conf_name`.
:param conf_name: Config file.
:type conf_name: ``string``
:param stanza: Stanza name.
:type stanza: ``string``
:returns: Config stanza.
:rtype: ``dict``
:raises KeyError: If stanza doesn't exist.
"""
stanzas = get_conf_stanzas(conf_name)
return stanzas[stanza]
def get_conf_stanzas(conf_name):
"""Get stanzas of `conf_name`
:param conf_name: Config file.
:type conf_name: ``string``
:returns: Config stanzas.
:rtype: ``dict``
Usage::
>>> stanzas = get_conf_stanzas('server')
>>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
"""
if conf_name.endswith(".conf"):
conf_name = conf_name[:-5]
# TODO: dynamically caculate SPLUNK_HOME
btool_cli = [
op.join(os.environ["SPLUNK_HOME"], "bin", "splunk"),
"cmd",
"btool",
conf_name,
"list",
]
p = subprocess.Popen(btool_cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, _ = p.communicate()
if isinstance(out, bytes):
out = out.decode()
parser = ConfigParser(**CONF_PARSER_KWARGS)
parser.optionxform = str
parser.readfp(StringIO(out))
out = {}
for section in parser.sections():
out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)}
return out