You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.6 KiB

# Generic utility functions
import os
import re
import fnmatch
import platform
SUPPORTED_SYSTEMS = {
('Linux', 'x86_64'): 'linux_x86_64',
('Darwin', 'x86_64'): 'darwin_x86_64',
('Darwin', 'arm64'): 'darwin_arm64',
('Windows', 'AMD64'): 'windows_x86_64',
}
PSC_PATH_PREFIX = 'Splunk_SA_Scientific_Python_'
# originally moved from exec_anaconda.py
# Note: the following functions do NOT work with Search Head
# Pooling/shared storage.
def get_splunkhome_path():
return os.path.normpath(os.environ['SPLUNK_HOME'])
def make_splunkhome_path(p):
return os.path.join(get_splunkhome_path(), *p)
def get_etc_path():
return os.environ.get('SPLUNK_ETC', os.path.join(get_splunkhome_path(), 'etc'))
def get_apps_path(bundle_path=None, app_name=None):
"""
Get the full path to the 'apps' directory.
Args:
bundle_path: path of the search bundle that contains the 'apps' directory
Returns:
path to the apps directory
"""
full_path_to_apps_dir = bundle_path if bundle_path else get_etc_path()
return os.path.normpath(os.path.join(full_path_to_apps_dir, 'apps'))
def get_mltk_pycache_path(
app_name=None,
bundle_path=None,
):
"""
Get the full path to the 'pycache' directory inside MLTK
Args:
bundle_path: path of the search bundle that contains the 'apps' directory
app_name: name of MLTK app
Returns:
path to the pycache directory
"""
return os.path.normpath(
os.path.join(get_apps_path(bundle_path=bundle_path), app_name, 'local', '.tmp')
)
def get_staging_area_path():
staging_path = os.path.join('var', 'run', 'splunk', 'lookup_tmp')
return os.path.normpath(os.path.join(get_splunkhome_path(), staging_path))
def is_valid_identifier(name):
"""Check if name is a valid identifier.
Returns True if 'name' is a valid Python identifier. Such
identifiers don't allow '.' or '/', so may also be used to ensure
that name can be used as a filename without risk of directory
traversal.
"""
return re.match('^[a-zA-Z_][a-zA-Z0-9_]*$', name) is not None
def match_field_globs(input_fields, requested_fields):
"""Intersect input_fields with glob expansion of requested_fields.
Args:
input_fields (list): the fields that are present
requested_fields (list): the fields that are requested
Returns:
output_fields (list): matched field names
"""
output_fields = []
for f in requested_fields:
if '*' in f: # f contains a glob
pat = re.compile(fnmatch.translate(f))
matches = [
x for x in list(input_fields) if not x.startswith('__mv_') and pat.match(x)
]
if len(matches) == 0:
output_fields.append(f)
else:
output_fields.extend(matches)
else:
output_fields.append(f)
return output_fields
class MLSPLNotImplementedError(RuntimeError):
"""Custom ML-SPL exception to capture not implemented errors."""
pass
def get_system_paths():
if platform.system() == "Darwin" and "ARM64" in platform.version():
system = (platform.system(), "arm64")
else:
system = (platform.system(), platform.machine())
if system not in SUPPORTED_SYSTEMS:
raise Exception(f'Unsupported platform: {system}')
sa_scipy = f"{PSC_PATH_PREFIX}{SUPPORTED_SYSTEMS[system]}"
sa_path = os.path.join(get_apps_path(), sa_scipy)
if not os.path.isdir(sa_path):
raise Exception(f'Failed to find Python for Scientific Computing Add-on ({sa_scipy})')
return sa_path, system