#!/usr/bin/env python # Copyright (C) 2015-2019 Splunk Inc. All Rights Reserved. import cexc from sampler import ReservoirSampler from util.param_util import is_truthy, convert_params from .constants import HOWTO_CONFIGURE_MLSPL_LIMITS logger = cexc.get_logger(__name__) messages = cexc.get_messages_logger() def split_options(options, mlspl_conf, stanza_name): """Split options into class and processor options. In general, "class" may refer to algorithms or scoring methods. Pop tmp_dir from the options. Parse sample count and sample seed from original params and add them to process options. Parse the kfold_cv parameter and add it to the class options. Args: options (dict): process options mlspl_conf (obj): the conf utility for mlspl conf settings stanza_name (str): class stanza name in mlspl.conf Returns: process_options (dict): the process options we use here class_options (dict): the class options to be passed to the scorer """ converted_params = {} if 'params' in options: try: converted_params = convert_params( options['params'], ignore_extra=True, ints=['sample_count', 'sample_seed', 'kfold_cv'], ) if 'sample_count' in converted_params: del options['params']['sample_count'] if 'sample_seed' in converted_params: del options['params']['sample_seed'] if 'kfold_cv' in converted_params: del options['params']['kfold_cv'] except ValueError as e: raise RuntimeError(str(e)) # copy everything from leftover options to class options class_options = options.copy() class_options['mlspl_limits'] = mlspl_conf.get_stanza(stanza_name) class_options['kfold_cv'] = converted_params.get('kfold_cv', None) # brand new process options process_options = { # sample options are added to the process options 'sample_seed': converted_params.get('sample_seed', None), 'sample_count': converted_params.get('sample_count', None), # needed by processor, not class 'tmp_dir': class_options.pop('tmp_dir'), } return process_options, class_options def load_sampler_limits(process_options, stanza_name, mlspl_conf): """Read sampling limits from conf file and decide sample count. Args: process_options (dict): process options stanza_name (str): algo/scorer stanza name in mlspl.conf mlspl_conf (obj): the conf utility for mlspl conf settings Returns: sampler_limits (dict): sampler limits """ max_inputs = int(mlspl_conf.get_mlspl_prop('max_inputs', stanza_name, -1)) sampler_limits = { 'use_sampling': is_truthy( str(mlspl_conf.get_mlspl_prop('use_sampling', stanza_name, 'yes')) ), 'sample_seed': process_options['sample_seed'], # simply set sample seed } # setting up the logic to choose the sample count if process_options['sample_count']: sampler_limits['sample_count'] = min(process_options['sample_count'], max_inputs) else: sampler_limits['sample_count'] = max_inputs return sampler_limits def load_resource_limits(stanza_name, mlspl_conf): """Load class-specific resource limits. Load resources limits for scoring and algo methods. Args: stanza_name (str): name opf algo/scorer stanza in mlspl.conf mlspl_conf (obj): the conf utility for mlspl conf settings Returns: resource_limits (dict): dictionary of resource limits including max_fit_time (or max_score_time), max_memory_usage_mb, and max_model_size_mb """ # Can return scoring or algorithm properties runtime_key = 'max_score_time' if 'score:' in stanza_name else 'max_fit_time' resource_limits = { 'max_memory_usage_mb': int( mlspl_conf.get_mlspl_prop('max_memory_usage_mb', stanza_name, -1) ), runtime_key: int(mlspl_conf.get_mlspl_prop(runtime_key, stanza_name, -1)), 'max_model_size_mb': int( mlspl_conf.get_mlspl_prop('max_model_size_mb', stanza_name, -1) ), } return resource_limits def get_sampler(sampler_limits): """Initialize the sampler and use resource limits from processor. Args: sampler_limits (dict): sampler limits Returns: (object): sampler object """ return ReservoirSampler( sampler_limits['sample_count'], random_state=sampler_limits['sample_seed'] ) def check_sampler(sampler_limits, class_name): """Inform user if sampling is on. Raise error if sampling is off and events exceed limit. Args: sampler_limits (dict): sampler limits class_name (str): name of algo/scorer class """ if is_truthy(sampler_limits['use_sampling']): messages.warn( 'Input event count exceeds max_inputs for {} ({}), model will be fit on a sample of events. {}'.format( class_name, sampler_limits['sample_count'], HOWTO_CONFIGURE_MLSPL_LIMITS ) ) else: raise RuntimeError( 'Input event count exceeds max_inputs for {} ({}) and sampling is disabled. {}'.format( class_name, sampler_limits['sample_count'], HOWTO_CONFIGURE_MLSPL_LIMITS ) )