You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

424 lines
12 KiB

# coding=utf-8
#
# Copyright © 2011-2024 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import csv
import os
import re
from io import open, StringIO
from os import getcwd
from json.encoder import encode_basestring_ascii as json_encode_string
from collections import namedtuple
class Validator:
    """ Base class for validators that check and format search command options.

    Derived classes are expected to override both :code:`Validator.__call__`
    and :code:`Validator.format`; the versions defined here do nothing except
    raise :code:`NotImplementedError`.

    :code:`Validator.__call__` should convert the value it is handed and return
    the converted value, or raise a :code:`ValueError` if the value will not
    convert. :code:`Validator.format` should produce a human readable version
    of the value it is handed, the same way :code:`str` does.

    """
    def __call__(self, value):
        """ Convert :code:`value`; must be overridden by a derived class.

        :raises NotImplementedError: always, in this base class.

        """
        raise NotImplementedError

    def format(self, value):
        """ Render :code:`value` as text; must be overridden by a derived class.

        :raises NotImplementedError: always, in this base class.

        """
        raise NotImplementedError
class Boolean(Validator):
    """ Validates Boolean option values.

    Text values are compared case-insensitively against the keys of
    :code:`Boolean.truth_values`; :const:`None` and actual :code:`bool` values
    are passed through untouched.

    """
    # Lower-case spellings accepted for each truth value.
    truth_values = {
        '1': True, '0': False,
        't': True, 'f': False,
        'true': True, 'false': False,
        'y': True, 'n': False,
        'yes': True, 'no': False,
    }

    def __call__(self, value):
        """ Convert :code:`value` to a :code:`bool`.

        :return: :code:`value` itself when it is :const:`None` or already a
            :code:`bool`; otherwise the truth value its lower-cased string form
            maps to.
        :raises ValueError: if the lower-cased string form is not a recognized
            truth value.

        """
        if value is None or isinstance(value, bool):
            return value
        text = str(value).lower()
        if text not in Boolean.truth_values:
            raise ValueError(f'Unrecognized truth value: {text}')
        return Boolean.truth_values[text]

    def format(self, value):
        """ Return :code:`'t'` for a truthy value, :code:`'f'` for a falsy one
        and :const:`None` for :const:`None`.

        """
        if value is None:
            return None
        if value:
            return 't'
        return 'f'
class Code(Validator):
    """ Validates code option values.

    The option value is compiled into a Python code object that can later be
    executed by :func:`exec` or evaluated by :func:`eval`. The result is a
    :func:`namedtuple` with two members: :code:`object`, the result of
    compilation, and :code:`source`, the original option value as a string.

    """
    def __init__(self, mode='eval'):
        """
        :param mode: Specifies what kind of code must be compiled; it can be :const:`'exec'`, if source consists of a
            sequence of statements, :const:`'eval'`, if it consists of a single expression, or :const:`'single'` if it
            consists of a single interactive statement. In the latter case, expression statements that evaluate to
            something other than :const:`None` will be printed.
        :type mode: unicode or bytes

        """
        self._mode = mode

    def __call__(self, value):
        """ Compile :code:`value` using the mode given at construction.

        :return: :const:`None` for :const:`None`; otherwise a
            :code:`Code.object` pairing the compiled code with its source text.
        :raises ValueError: if compilation fails with a :code:`SyntaxError` or
            :code:`TypeError`; the message is that of the underlying error.

        """
        if value is None:
            return None
        try:
            compiled = compile(value, 'string', self._mode)
            source = str(value)
        except (SyntaxError, TypeError) as error:
            raise ValueError(str(error)) from error
        return Code.object(compiled, source)

    def format(self, value):
        """ Return the source text of a compiled value, or :const:`None`. """
        if value is None:
            return None
        return value.source

    # Result type produced by __call__: (compiled code object, source text).
    object = namedtuple('Code', ('object', 'source'))
class Fieldname(Validator):
    """ Validates field name option values.

    A field name must start with a letter, underscore, period or dash and may
    continue with any number of letters, digits, underscores, periods or
    dashes.

    """
    pattern = re.compile(r'''[_.a-zA-Z-][_.a-zA-Z0-9-]*$''')

    def __call__(self, value):
        """ Check that :code:`value` is a legal field name.

        :return: :const:`None` for :const:`None`; otherwise :code:`str(value)`.
        :raises ValueError: if the string form of :code:`value` is not a legal
            field name.

        """
        if value is None:
            return None
        value = str(value)
        # `$` also matches just before a trailing newline, so `match` alone
        # would accept 'name\n'. `fullmatch` requires the whole string to be
        # consumed and therefore rejects it.
        if Fieldname.pattern.fullmatch(value) is None:
            raise ValueError(f'Illegal characters in fieldname: {value}')
        return value

    def format(self, value):
        """ Return :code:`value` unchanged. """
        return value
class File(Validator):
    """ Validates file option values.

    The option value is treated as a path and opened; the open file object is
    the validated value. Relative paths are resolved against
    :code:`directory`.

    """
    def __init__(self, mode='rt', buffering=None, directory=None):
        """
        :param mode: Mode string passed to :code:`open`.
        :param buffering: Buffering argument passed to :code:`open`; when
            :const:`None`, :code:`open` is called without it.
        :param directory: Directory that relative paths are joined to. Defaults
            to :code:`var/run/splunk` under :code:`$SPLUNK_HOME`, or under the
            working directory at import time if :code:`SPLUNK_HOME` is not set.

        """
        self.mode = mode
        self.buffering = buffering
        self.directory = File._var_run_splunk if directory is None else directory

    def __call__(self, value):
        """ Open the file named by :code:`value`.

        :return: :const:`None` for :const:`None`; otherwise the open file
            object. Closing it is left to the caller.
        :raises ValueError: if the file cannot be opened (:code:`IOError`).

        """
        if value is None:
            return value

        path = str(value)

        if not os.path.isabs(path):
            path = os.path.join(self.directory, path)

        try:
            if self.buffering is None:
                return open(path, self.mode)
            return open(path, self.mode, self.buffering)
        except IOError as error:
            # Chain the cause explicitly, as Code.__call__ does, so the
            # underlying OS error stays attached to the ValueError.
            raise ValueError(
                f'Cannot open {value} with mode={self.mode} and buffering={self.buffering}: {error}') from error

    def format(self, value):
        """ Return the :code:`name` of an open file, or :const:`None`. """
        return None if value is None else value.name

    # Default base directory for relative paths, computed once at import time.
    _var_run_splunk = os.path.join(
        os.environ['SPLUNK_HOME'] if 'SPLUNK_HOME' in os.environ else getcwd(), 'var', 'run', 'splunk')
class Integer(Validator):
    """ Validates integer option values.

    An optional inclusive :code:`minimum` and/or :code:`maximum` restricts the
    accepted range.

    """
    def __init__(self, minimum=None, maximum=None):
        """
        :param minimum: Smallest accepted value, or :const:`None` for no lower bound.
        :param maximum: Largest accepted value, or :const:`None` for no upper bound.

        """
        # Pick the range check once, at construction, so __call__ does not
        # have to re-examine which bounds are set on every validation.
        if minimum is not None and maximum is not None:
            def check_range(value):
                if not minimum <= value <= maximum:
                    raise ValueError(f'Expected integer in the range [{minimum},{maximum}], not {value}')
        elif minimum is not None:
            def check_range(value):
                if value < minimum:
                    raise ValueError(f'Expected integer in the range [{minimum},+∞], not {value}')
        elif maximum is not None:
            def check_range(value):
                if value > maximum:
                    raise ValueError(f'Expected integer in the range [-∞,{maximum}], not {value}')
        else:
            def check_range(value):
                return

        self.check_range = check_range

    def __call__(self, value):
        """ Convert :code:`value` to an :code:`int` and range-check it.

        :return: :const:`None` for :const:`None`; otherwise the converted integer.
        :raises ValueError: if :code:`value` cannot be converted by :code:`int`
            or the result lies outside the configured range.

        """
        if value is None:
            return None
        try:
            value = int(value)
        except (TypeError, ValueError, OverflowError) as error:
            # int() raises TypeError for unsupported types and OverflowError
            # for infinite floats; validators report both as ValueError.
            # json_encode_string only accepts str, so coerce before quoting,
            # otherwise a non-string input raises TypeError from this handler.
            raise ValueError(f'Expected integer value, not {json_encode_string(str(value))}') from error
        self.check_range(value)
        return value

    def format(self, value):
        """ Return the decimal string form of :code:`int(value)`, or :const:`None`. """
        return None if value is None else str(int(value))
class Float(Validator):
    """ Validates float option values.

    An optional inclusive :code:`minimum` and/or :code:`maximum` restricts the
    accepted range.

    """
    def __init__(self, minimum=None, maximum=None):
        """
        :param minimum: Smallest accepted value, or :const:`None` for no lower bound.
        :param maximum: Largest accepted value, or :const:`None` for no upper bound.

        """
        # The one-sided checks are written as `not value >= bound` rather than
        # `value < bound` so that NaN, for which every comparison is false, is
        # rejected whenever a bound is configured. The two-sided check already
        # rejects it.
        if minimum is not None and maximum is not None:
            def check_range(value):
                if not minimum <= value <= maximum:
                    raise ValueError(f'Expected float in the range [{minimum},{maximum}], not {value}')
        elif minimum is not None:
            def check_range(value):
                if not value >= minimum:
                    raise ValueError(f'Expected float in the range [{minimum},+∞], not {value}')
        elif maximum is not None:
            def check_range(value):
                if not value <= maximum:
                    raise ValueError(f'Expected float in the range [-∞,{maximum}], not {value}')
        else:
            def check_range(value):
                return

        self.check_range = check_range

    def __call__(self, value):
        """ Convert :code:`value` to a :code:`float` and range-check it.

        :return: :const:`None` for :const:`None`; otherwise the converted float.
        :raises ValueError: if :code:`value` cannot be converted by
            :code:`float` or the result lies outside the configured range.

        """
        if value is None:
            return None
        try:
            value = float(value)
        except (TypeError, ValueError, OverflowError) as error:
            # float() raises TypeError for unsupported types and OverflowError
            # for integers too large to represent; validators report both as
            # ValueError. json_encode_string only accepts str, so coerce
            # before quoting.
            raise ValueError(f'Expected float value, not {json_encode_string(str(value))}') from error
        self.check_range(value)
        return value

    def format(self, value):
        """ Return the string form of :code:`float(value)`, or :const:`None`. """
        return None if value is None else str(float(value))
class Duration(Validator):
    """ Validates duration option values.

    Accepted forms are :code:`S`, :code:`M:S` and :code:`H:M:S`. The leading
    component is any non-negative integer; each following component must be in
    the range 0 to 59. The validated value is the total number of seconds.

    """
    def __call__(self, value):
        """ Convert a duration to a number of seconds.

        :return: :const:`None` for :const:`None`; otherwise the duration in
            seconds as an :code:`int`.
        :raises ValueError: if any component is not an integer in its allowed
            range.

        """
        if value is None:
            return None

        # Coerce to text first, as the other validators do, so a non-string
        # value (an int, say) is validated instead of raising AttributeError
        # on `.split`.
        parts = str(value).split(':', 2)
        _60 = Duration._60
        _unsigned = Duration._unsigned

        try:
            # With maxsplit=2 there are always one, two or three parts.
            if len(parts) == 1:
                return _unsigned(parts[0])
            if len(parts) == 2:
                return 60 * _unsigned(parts[0]) + _60(parts[1])
            return 3600 * _unsigned(parts[0]) + 60 * _60(parts[1]) + _60(parts[2])
        except ValueError as error:
            raise ValueError(f'Invalid duration value: {value}') from error

    def format(self, value):
        """ Return a number of seconds formatted as :code:`HH:MM:SS`, or
        :const:`None` for :const:`None`.

        """
        if value is None:
            return None

        value = int(value)

        s = value % 60
        m = value // 60 % 60
        h = value // (60 * 60)

        return '{0:02d}:{1:02d}:{2:02d}'.format(h, m, s)

    # Component validators: minutes/seconds (0-59) and the unbounded leading part.
    _60 = Integer(0, 59)
    _unsigned = Integer(0)
class List(Validator):
    """ Validates a list of strings.

    The option value is parsed as a single row of comma-separated values. If an
    item validator is supplied, each parsed item is additionally converted by
    it.

    """
    class Dialect(csv.Dialect):
        """ Describes the properties of list option values. """
        strict = True
        delimiter = str(',')
        quotechar = str('"')
        doublequote = True
        lineterminator = str('\n')
        skipinitialspace = True
        quoting = csv.QUOTE_MINIMAL

    def __init__(self, validator=None):
        """
        :param validator: Optional :code:`Validator` instance applied to every item.
        :raises ValueError: if :code:`validator` is neither :const:`None` nor a
            :code:`Validator` instance.

        """
        if not (validator is None or isinstance(validator, Validator)):
            raise ValueError(f'Expected a Validator instance or None for validator, not {repr(validator)}')
        self._validator = validator

    def __call__(self, value):
        """ Parse :code:`value` into a list.

        :return: :code:`value` itself when it is :const:`None` or already a
            :code:`list`; otherwise the list of parsed (and, if an item
            validator was given, converted) items.
        :raises ValueError: if the text cannot be parsed as a CSV row or an
            item is rejected by the item validator.

        """
        if value is None or isinstance(value, list):
            return value

        try:
            items = next(csv.reader([value], self.Dialect))
        except csv.Error as error:
            raise ValueError(error) from error

        if self._validator is None:
            return items

        for index, item in enumerate(items):
            try:
                items[index] = self._validator(item)
            except ValueError as error:
                raise ValueError(f'Could not convert item {index}: {error}') from error

        return items

    def format(self, value):
        """ Return the items of :code:`value` as one CSV row without the line
        terminator, or :const:`None` for :const:`None`.

        """
        # __call__ accepts None, so format must too; csv.writer would raise on it.
        if value is None:
            return None

        output = StringIO()
        writer = csv.writer(output, List.Dialect)
        writer.writerow(value)
        text = output.getvalue()
        # Drop the trailing one-character line terminator ('\n') added by the writer.
        return text[:-1]
class Map(Validator):
    """ Validates map option values.

    The keyword arguments given at construction define the mapping: each
    keyword is an accepted option value and its argument is the value it is
    translated to.

    """
    def __init__(self, **kwargs):
        self.membership = kwargs

    def __call__(self, value):
        """ Translate :code:`value` through the mapping.

        :return: :const:`None` for :const:`None`; otherwise the mapped value
            for :code:`str(value)`.
        :raises ValueError: if :code:`str(value)` is not a key of the mapping.

        """
        if value is None:
            return None
        key = str(value)
        if key in self.membership:
            return self.membership[key]
        raise ValueError(f'Unrecognized value: {key}')

    def format(self, value):
        """ Return the first key whose mapped value equals :code:`value`, or
        :const:`None` for :const:`None`.

        """
        if value is None:
            return None
        names = list(self.membership.keys())
        targets = list(self.membership.values())
        # Reverse lookup: position of the value selects the matching key.
        return names[targets.index(value)]
class Match(Validator):
    """ Validates that a value matches a regular expression pattern.

    :code:`name` describes what is expected and is used in the error message;
    :code:`pattern` and :code:`flags` are compiled with :code:`re.compile`.

    """
    def __init__(self, name, pattern, flags=0):
        self.name = str(name)
        self.pattern = re.compile(pattern, flags)

    def __call__(self, value):
        """ Check :code:`value` against the pattern.

        :return: :const:`None` for :const:`None`; otherwise :code:`str(value)`.
        :raises ValueError: if the pattern does not match at the start of
            :code:`str(value)`.

        """
        if value is None:
            return None
        text = str(value)
        if self.pattern.match(text) is not None:
            return text
        raise ValueError(f'Expected {self.name}, not {json_encode_string(text)}')

    def format(self, value):
        """ Return :code:`str(value)`, or :const:`None` for :const:`None`. """
        if value is None:
            return None
        return str(value)
class OptionName(Validator):
    """ Validates option names.

    An option name consists of word characters only and must not start with a
    digit.

    """
    pattern = re.compile(r'''(?=\w)[^\d]\w*$''', re.UNICODE)

    def __call__(self, value):
        """ Check that :code:`value` is a legal option name.

        :return: :const:`None` for :const:`None`; otherwise :code:`str(value)`.
        :raises ValueError: if the string form of :code:`value` is not a legal
            option name.

        """
        if value is None:
            return None
        value = str(value)
        # `$` also matches just before a trailing newline, so `match` alone
        # would accept 'name\n'. `fullmatch` requires the whole string to be
        # consumed and therefore rejects it.
        if OptionName.pattern.fullmatch(value) is None:
            raise ValueError(f'Illegal characters in option name: {value}')
        return value

    def format(self, value):
        """ Return :code:`str(value)`, or :const:`None` for :const:`None`. """
        return None if value is None else str(value)
class RegularExpression(Validator):
    """ Validates regular expression option values.

    The validated value is the compiled pattern object.

    """
    def __call__(self, value):
        """ Compile :code:`str(value)` as a regular expression.

        :return: :const:`None` for :const:`None`; otherwise the compiled pattern.
        :raises ValueError: if the text is not a valid regular expression; the
            message is the capitalized :code:`re.error` text followed by the
            offending value.

        """
        if value is None:
            return None
        try:
            return re.compile(str(value))
        except re.error as error:
            # Chain the cause explicitly, as Code.__call__ does, so the
            # re.error (with its position details) stays attached.
            raise ValueError(f'{str(error).capitalize()}: {value}') from error

    def format(self, value):
        """ Return the source text of a compiled pattern, or :const:`None`. """
        return None if value is None else value.pattern
class Set(Validator):
    """ Validates set option values.

    The positional arguments given at construction are the accepted values.

    """
    def __init__(self, *args):
        self.membership = set(args)

    def __call__(self, value):
        """ Check that :code:`str(value)` is one of the accepted values.

        :return: :const:`None` for :const:`None`; otherwise :code:`str(value)`.
        :raises ValueError: if :code:`str(value)` is not an accepted value.

        """
        if value is None:
            return None
        member = str(value)
        if member in self.membership:
            return member
        raise ValueError(f'Unrecognized value: {member}')

    def format(self, value):
        """ Format by validating: returns exactly what :code:`__call__` returns. """
        return self.__call__(value)
# Names exported by `from <module> import *`.
# NOTE(review): Fieldname, Match, OptionName and the Validator base class are
# defined in this module but not listed here - confirm that is intentional.
__all__ = [
    'Boolean',
    'Code',
    'Duration',
    'File',
    'Integer',
    'Float',
    'List',
    'Map',
    'RegularExpression',
    'Set',
]