You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

181 lines
6.2 KiB

#!/usr/bin/env python
# Copyright (C) 2015-2019 Splunk Inc. All Rights Reserved.
import errno
import gc
import pandas as pd
import cexc
import models.base
from .BaseProcessor import BaseProcessor
from util import search_util
from util.searchinfo_util import is_parsetmp
# Module-level logger for this file, obtained from cexc (used for debug output).
logger = cexc.get_logger(__name__)
# Messages logger obtained from cexc; ApplyProcessor.process uses it to warn
# when applying the model produced no results.
messages = cexc.get_messages_logger()
class ApplyProcessor(BaseProcessor):
    """The apply processor receives and returns pandas DataFrames.

    On construction the model named by ``process_options['model_name']`` is
    looked up through ``models.base.load_model`` and its stored options are
    merged with the options supplied for this invocation. ``process`` then
    runs the loaded algorithm's ``apply`` method on ``self.df``.
    """

    def __init__(self, process_options, searchinfo):
        """Initialize options for the processor.

        Args:
            process_options (dict): process options; consumed and replaced by
                the merged options returned from ``setup_model``
            searchinfo (dict): information required for search
        """
        self.searchinfo = searchinfo
        (
            self.algo_name,
            self.algo,
            self.process_options,
            self.namespace,
        ) = ApplyProcessor.setup_model(process_options, self.searchinfo)
        self.resource_limits = ApplyProcessor.load_resource_limits(
            self.algo_name, self.process_options
        )

    def get_relevant_fields(self):
        """Return the needed feature variables.

        The result is a new list: the feature variables followed by any
        ``split_by`` fields, then ``_*`` when a ``*`` wildcard is present,
        ``_time`` when not already listed, and finally every target variable
        that is not already listed.

        Returns:
            relevant_fields (list): relevant fields
        """
        # List concatenation builds a fresh list, so the appends below do not
        # modify the lists stored in self.process_options.
        relevant_fields = self.process_options['feature_variables'] + self.process_options.get(
            'split_by', []
        )
        # TODO MLA-1589: require explicit _* usage
        if '*' in relevant_fields:
            relevant_fields.append('_*')
        if '_time' not in relevant_fields:
            relevant_fields.append('_time')
        if 'target_variable' in self.process_options:
            for x in self.process_options['target_variable']:
                if x not in relevant_fields:
                    relevant_fields.append(x)
        return relevant_fields

    @classmethod
    def setup_model(cls, process_options, searchinfo):
        """Load temp model, try to load real model, update options.

        The ``tmp_dir``, ``namespace`` and ``mlspl_conf`` entries are popped
        from ``process_options`` (the caller's dict is modified in place).

        Args:
            process_options (dict): process_options
            searchinfo (dict): information required for search

        Returns:
            algo_name (str): algorithm name
            algo (object): algorithm object; None when only the tmp model's
                options were loaded (``process`` loads the real model later)
            process_options (dict): updated process options
            namespace (str): namespace of the model

            In a parsetmp search the return value is
            ``(None, None, process_options, None)`` with empty
            ``mlspl_limits`` and ``feature_variables`` set to ``['*']``.

        Raises:
            RuntimeError: if neither the tmp model nor the real model can be
                loaded.
        """
        tmp_dir = process_options.pop('tmp_dir')
        # NOTE(review): this rebinds only the local name; the caller's
        # searchinfo is affected only if the helper mutates its argument in
        # place - confirm against search_util.
        searchinfo = search_util.add_distributed_search_info(process_options, searchinfo)
        namespace = process_options.pop('namespace', None)
        mlspl_conf = process_options.pop('mlspl_conf')

        # For MLA-1989 we cannot properly load a model in parsetmp search
        if is_parsetmp(searchinfo):
            process_options['mlspl_limits'] = {}
            process_options['feature_variables'] = ['*']
            return None, None, process_options, None

        try:
            algo_name, _, model_options = models.base.load_model(
                process_options['model_name'],
                searchinfo,
                namespace=namespace,
                model_dir=tmp_dir,
                skip_model_obj=True,
                tmp=True,
            )
            algo = None
            logger.debug('Using tmp model to set required_fields.')
        except Exception:
            # Any failure to read the tmp model falls back to the real model.
            # `Exception` (not a bare except) so that KeyboardInterrupt and
            # SystemExit propagate instead of triggering a second load.
            try:
                algo_name, algo, model_options = models.base.load_model(
                    process_options['model_name'], searchinfo, namespace=namespace
                )
            except (OSError, IOError) as e:
                if e.errno == errno.ENOENT:
                    raise RuntimeError(
                        'model "%s" does not exist.' % process_options['model_name']
                    )
                raise RuntimeError(
                    'Failed to load model "%s": %s.' % (process_options['model_name'], str(e))
                )
            except Exception as e:
                cexc.log_traceback()
                raise RuntimeError(
                    'Failed to load model "%s": %s.' % (process_options['model_name'], str(e))
                )

        model_options.update(process_options)  # process options override loaded model options
        process_options = model_options
        process_options['mlspl_limits'] = mlspl_conf.get_stanza(algo_name)
        return algo_name, algo, process_options, namespace

    @staticmethod
    def load_resource_limits(algo_name, process_options):
        """Load algorithm-specific limits.

        Args:
            algo_name (str): algorithm name (not used by this implementation)
            process_options (dict): process options whose ``mlspl_limits``
                entry holds the mlspl limits from the conf files

        Returns:
            resource_limits (dict): dictionary of resource limits with
                ``max_memory_usage_mb`` (int, -1 when not configured) and
                ``streaming_apply`` (always False)
        """
        resource_limits = {}
        limits = process_options['mlspl_limits']
        resource_limits['max_memory_usage_mb'] = int(limits.get('max_memory_usage_mb', -1))
        resource_limits['streaming_apply'] = False
        return resource_limits

    @staticmethod
    def apply(df, algo, process_options):
        """Perform the literal predict from the estimator.

        Args:
            df (dataframe): input data
            algo (object): initialized algo object
            process_options (dict): process options

        Returns:
            prediction_df (dataframe): output dataframe

        Raises:
            RuntimeError: wraps any exception raised while applying the
                model, after logging the traceback and emitting a warning.
        """
        try:
            prediction_df = algo.apply(df, process_options)
            # Explicit collection pass right after the algorithm has run.
            gc.collect()
        except Exception as e:
            cexc.log_traceback()
            cexc.messages.warn(
                'Error while applying model "%s": %s' % (process_options['model_name'], str(e))
            )
            raise RuntimeError(e)
        return prediction_df

    def process(self):
        """If algo isn't loaded, load the model. Create the output dataframe.

        Reads and replaces ``self.df``, which is not assigned anywhere in
        this class and must be set before this method is called. The model
        is applied only when ``self.df`` has at least one row; a ``None``
        result is replaced by an empty DataFrame.
        """
        if self.algo is None:
            # setup_model leaves algo unset when only the tmp model's options
            # were read, so the real model object is loaded here.
            self.algo_name, self.algo, _ = models.base.load_model(
                self.process_options['model_name'], self.searchinfo, namespace=self.namespace
            )
        if len(self.df) > 0:
            self.df = self.apply(self.df, self.algo, self.process_options)
        if self.df is None:
            messages.warn('Apply method did not return any results.')
            self.df = pd.DataFrame()