You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
373 lines
16 KiB
373 lines
16 KiB
#!/usr/bin/env python
|
|
# Copyright (C) 2015-2019 Splunk Inc. All Rights Reserved.
|
|
import gc
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
import cexc
|
|
import models.base
|
|
|
|
from .BaseProcessor import BaseProcessor
|
|
from util import search_util
|
|
from util.searchinfo_util import is_parsetmp
|
|
from util.df_util import merge_predictions
|
|
from util.constants import ONNX_MODEL_EXTENSION
|
|
import util.onnx_util as onnx_util
|
|
from util.telemetry_onnx_util import (
|
|
log_onnx_model_input_shape,
|
|
log_onnx_app_algo_details,
|
|
)
|
|
|
|
# Module-wide loggers obtained from cexc: `logger` for diagnostic logging and
# `messages` for warnings such as 'Apply method did not return any results.'
# emitted by ApplyOnnxProcessor.process().
logger = cexc.get_logger(__name__)
messages = cexc.get_messages_logger()
|
|
|
|
|
|
class ApplyOnnxProcessor(BaseProcessor):
    """The apply processor receives and returns pandas DataFrames.

    This variant applies ONNX models: it resolves the saved model's metadata,
    loads an inference session for it, converts the incoming search results
    into the session's input feed and merges the predictions back into them.
    """

    # ONNX input element types mapped to the numpy dtype fed to the session.
    # onnxruntime requires each input array's dtype to match the element type
    # the model declares, so the declared type takes precedence over the
    # DataFrame column dtype.
    _ONNX_TO_NUMPY_DTYPE = {
        "tensor(int64)": np.int64,
        "tensor(float)": np.float32,
        "tensor(double)": np.float64,
    }

    # Fallback for model types not listed above: keep the column's own
    # numeric dtype.
    _PANDAS_TO_NUMPY_DTYPE = {
        "int64": np.int64,
        "float32": np.float32,
        "float64": np.float64,
        "double": np.float64,
    }

    def __init__(self, process_options, searchinfo):
        """Initialize options for the processor.

        Args:
            process_options (dict): process options
            searchinfo (dict): information required for search
        """
        self.upload = self._is_upload(process_options)
        self.searchinfo = searchinfo
        # The ONNX inference session is created lazily in process().
        self.session = None
        self.algo = None

        # There is no option for upload through spl currently, therefore this
        # if condition will never be triggered.
        if self.upload:
            # Upload phase: set the model up with validation and verification steps.
            (
                self.algo_name,
                self.process_options,
                self.namespace,
            ) = ApplyOnnxProcessor.setup_model(process_options, self.searchinfo)
        else:
            # Apply phase: populate entries from the saved model's metadata.
            (
                self.algo_name,
                self.process_options,
                self.namespace,
            ) = self.get_model_attributes(process_options, self.searchinfo)
            log_onnx_app_algo_details(
                self.searchinfo.get("app"), self.algo_name, self.process_options
            )

        self.resource_limits = ApplyOnnxProcessor.load_resource_limits(
            self.algo_name, self.process_options
        )

    def _is_upload(self, process_options):
        """Return True when ``process_options['params']['upload']`` is truthy."""
        params = process_options.get("params", None)
        if params:
            upload = params.get('upload')
            if upload:
                return True
        return False

    @classmethod
    def get_model_attributes(cls, process_options, searchinfo):
        """Load a saved model's metadata from disk and merge process_options into it.

        Options given explicitly in ``process_options`` override the values
        read from the model's metadata.

        Args:
            process_options (dict): process options; must contain
                ``model_name``; ``namespace`` is popped from it if present
            searchinfo (dict): information required for search

        Returns:
            tuple: (algo_name, process_options, namespace)

        Raises:
            RuntimeError: if ``onnx_util.check_model_for_size_limitation``
                returns a falsy value for the loaded model.
        """
        searchinfo = search_util.add_distributed_search_info(process_options, searchinfo)
        namespace = process_options.pop('namespace', None)

        # Fetch model name from process options parameter
        model_name = process_options["model_name"]
        if model_name.endswith(ONNX_MODEL_EXTENSION):
            # Strip only the trailing extension so dotted model names keep
            # their other parts (split('.')[0] would truncate them).
            model_name = model_name[: len(model_name) - len(ONNX_MODEL_EXTENSION)]
        algo_name, model_data, model_options = models.base.get_model_options_from_disk(
            model_name, searchinfo, namespace
        )

        # Updating process_options with the metadata information from model file.
        model_options.update(process_options)
        process_options = model_options

        if not onnx_util.check_model_for_size_limitation(model_data, process_options):
            # Previously this fell through and returned None, which made the
            # caller's tuple unpacking fail with an unrelated TypeError.
            raise RuntimeError(
                f'Model "{model_name}" did not pass the model size limitation check'
            )
        return algo_name, process_options, namespace

    def get_relevant_fields(self):
        """Return the search fields this processor needs.

        These are the model's feature variables plus any ``split_by`` fields,
        ``_*`` when the ``*`` wildcard is requested, ``_time``, and the target
        variable when one is configured.

        Returns:
            relevant_fields (list): relevant fields
        """
        # `+` builds a new list, so the appends below do not mutate
        # self.process_options['feature_variables'].
        relevant_fields = self.process_options['feature_variables'] + self.process_options.get(
            'split_by', []
        )

        # TODO MLA-1589: require explicit _* usage
        if '*' in relevant_fields:
            relevant_fields.append('_*')

        if '_time' not in relevant_fields:
            relevant_fields.append('_time')

        # TODO : Modify for multi target support
        target = self.process_options.get('target_variable')
        if target is not None and target not in relevant_fields:
            relevant_fields.append(target)
        return relevant_fields

    @classmethod
    def setup_model(cls, process_options, searchinfo):
        """Validate and register an uploaded ONNX model.

        Validates the user's upload capabilities, resolves the mlspl limits for
        the ONNX algorithm and creates the model's lookup table entry. In a
        parsetmp search no model is set up and placeholder options are returned.

        Args:
            process_options (dict): process_options; ``namespace`` and
                ``mlspl_conf`` are popped from it
            searchinfo (dict): information required for search

        Returns:
            algo_name (str): algorithm name (None in a parsetmp search)
            process_options (dict): updated process options
            namespace (str): namespace of the model (None in a parsetmp search)

        Raises:
            AssertionError: if ``onnx_util.validate_user_capabilities_for_upload``
                returns a falsy value.
        """
        searchinfo = search_util.add_distributed_search_info(process_options, searchinfo)
        namespace = process_options.pop('namespace', None)
        mlspl_conf = process_options.pop('mlspl_conf')

        # Explicit check rather than `assert`: `python -O` strips the whole
        # assert statement, which would skip the capability validation call.
        if not onnx_util.validate_user_capabilities_for_upload(searchinfo):
            raise AssertionError("User capability validation for model upload failed")

        # For MLA-1989 we cannot properly load a model in parsetmp search
        if is_parsetmp(searchinfo):
            process_options['mlspl_limits'] = {}
            process_options['feature_variables'] = ['*']
            # Same 3-tuple shape as the normal path so __init__ can unpack it.
            return None, process_options, None

        # The extension without its first character (presumably the dot).
        algo_name = ONNX_MODEL_EXTENSION[1:]
        process_options['mlspl_limits'] = mlspl_conf.get_stanza(algo_name)

        # Once validated and verified, create lookup table entry
        onnx_util.create_onnx_model_lookup_entry(
            process_options['model_name'],
            algo_name=algo_name,
            options=process_options,
            max_size=None,
            tmp=False,
            searchinfo=searchinfo,
            namespace=namespace,
            local=False,
        )
        return algo_name, process_options, namespace

    @staticmethod
    def load_resource_limits(algo_name, process_options):
        """Load algorithm-specific limits.

        Args:
            algo_name (str): algorithm name (not used by this implementation)
            process_options (dict): process options; its ``mlspl_limits``
                entry holds the limits from the conf files

        Returns:
            resource_limits (dict): dictionary of resource limits; numeric
                limits default to -1 when not configured
        """
        resource_limits = {}
        limits = process_options['mlspl_limits']
        resource_limits['max_memory_usage_mb'] = int(limits.get('max_memory_usage_mb', -1))
        # ONNX apply never runs in streaming mode.
        resource_limits['streaming_apply'] = False
        resource_limits['max_model_size_mb'] = int(limits.get('max_model_size_mb', -1))
        return resource_limits

    @staticmethod
    def _is_dynamic_dim(dim):
        """Return True for a symbolic (str) or unknown (None) ONNX dimension."""
        return dim is None or isinstance(dim, str)

    @staticmethod
    def _numpy_dtype_for(expected_field_type, df_field_type):
        """Pick the numpy dtype for one input, preferring the model's declared type.

        Returns None when neither the ONNX type nor the column dtype is supported.
        """
        dtype = ApplyOnnxProcessor._ONNX_TO_NUMPY_DTYPE.get(expected_field_type)
        if dtype is None:
            dtype = ApplyOnnxProcessor._PANDAS_TO_NUMPY_DTYPE.get(df_field_type)
        return dtype

    @staticmethod
    def _multi_input_feed(df, input_cols):
        """Feed each of several model inputs from the DataFrame column of the same name."""
        len_input_cols = len(input_cols)
        df_shape = df.shape
        log_onnx_model_input_shape(
            "Multiple columns single-dim input", str(len_input_cols), str(df_shape)
        )
        if len_input_cols != len(df.columns):
            raise RuntimeError(
                f"Expected number of inputs in the dataset is {len_input_cols} but found {len(df.columns)}"
            )

        # multiple input sources, each source represents 1 column
        inputs = {c: df[c].values for c in df.columns}
        for items in input_cols:
            field_name = items.name
            if field_name not in df.columns:
                raise RuntimeError(
                    f"Required field {field_name} of type {items.type} does not exist"
                )
            df_field_type = str(df[field_name].dtypes)
            expected_field_type = items.type
            # Diagnostics go to the module logger rather than print(), keeping
            # them out of stdout.
            logger.debug(
                "df_field_type: %s expected_field_type: %s",
                df_field_type,
                expected_field_type,
            )
            target_dtype = ApplyOnnxProcessor._numpy_dtype_for(
                expected_field_type, df_field_type
            )
            if target_dtype is None:
                raise RuntimeError(
                    f"Only Integer and Float feature variables are "
                    f"supported. Found unknown values in {field_name} variable"
                )
            inputs[field_name] = inputs[field_name].astype(target_dtype)

        # Each input is a single column, fed as a (rows, 1) array.
        return {k: v.reshape((v.shape[0], 1)) for k, v in inputs.items()}

    @staticmethod
    def _multivalue_frame_to_array(df, input_shape, dtype):
        """Split multivalue cells into one numeric array for a high-rank input.

        Splunk multi-value fields have the values separated by new line, so
        every column must hold strings (list/tuple cells are used as-is). The
        values of each cell are parsed as ``dtype`` and stacked to
        ``(rows, columns, values)``; when every non-batch dimension of
        ``input_shape`` is concrete, the result is reshaped to
        ``(rows, *input_shape[1:])``.

        Raises:
            RuntimeError: if a column is not string-typed, cells are ragged or
                non-numeric, or the values cannot be reshaped to ``input_shape``.
        """
        if not isinstance(df, pd.DataFrame):
            df = pd.DataFrame(df)
        shape_error = (
            f"data has shape of {df.shape}, but ONNX model requires shape {input_shape}"
        )
        for column in df.columns:
            series = df[column]
            if not (
                pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series)
            ):
                # non string value, we can not pretend that we can split the value into an array
                raise RuntimeError(shape_error)

        def split_cell(cell):
            if isinstance(cell, (list, tuple)):
                return cell
            return str(cell).split("\n")

        try:
            per_column = [
                np.array(
                    [np.asarray(split_cell(cell), dtype=dtype) for cell in df[column]],
                    dtype=dtype,
                )
                for column in df.columns
            ]
            stacked = np.stack(per_column, axis=1)
            trailing = tuple(input_shape)[1:]
            if not any(ApplyOnnxProcessor._is_dynamic_dim(d) for d in trailing):
                stacked = stacked.reshape((stacked.shape[0],) + trailing)
        except (TypeError, ValueError) as err:
            raise RuntimeError(shape_error) from err
        return stacked

    @staticmethod
    def _single_input_feed(df, input_col):
        """Feed the whole DataFrame to a model that declares a single input."""
        input_cols_shape = input_col.shape
        df_shape = df.shape
        input_cols_shape_size = len(input_cols_shape)
        log_onnx_model_input_shape(
            "Single column multi-dim input", str(input_cols_shape), str(df_shape)
        )
        # Feed the dtype the model declares; unknown or absent types keep the
        # historical float32 default.
        dtype = ApplyOnnxProcessor._ONNX_TO_NUMPY_DTYPE.get(
            getattr(input_col, "type", None), np.float32
        )

        if input_cols_shape_size <= 3:
            # Scenarios with single input, 2d tensors. Ex. [None, 4]
            # it's not possible to have a 3D df straight from splunk's search result
            shape_error = (
                f"Data has shape of {df_shape}, but ONNX model requires shape {input_cols_shape}"
            )
            if input_cols_shape_size < 2:
                raise RuntimeError(shape_error)
            expected_width = input_cols_shape[1]
            # A dynamic (None/symbolic) width accepts any number of columns.
            if (
                not ApplyOnnxProcessor._is_dynamic_dim(expected_width)
                and expected_width != df_shape[1]
            ):
                raise RuntimeError(shape_error)
            data = df.to_numpy() if isinstance(df, pd.DataFrame) else np.asarray(df)
            return {input_col.name: data.astype(dtype)}

        if input_cols_shape_size > df_shape[1]:
            # we have a 2d df, but the model needs a higher-rank tensor
            return {
                input_col.name: ApplyOnnxProcessor._multivalue_frame_to_array(
                    df, input_cols_shape, dtype
                )
            }

        raise RuntimeError(
            f"data has shape of {df_shape}, but ONNX model requires shape {input_cols_shape}"
        )

    @staticmethod
    def get_input_fields(df, input_cols):
        """Build the ``input_feed`` dict for ``session.run`` from ``df``.

        The layout depends on how many inputs the model declares:

        1. More than one input: every input is fed from the DataFrame column of
           the same name, as a ``(rows, 1)`` array; the number of columns must
           equal the number of inputs.
        2. Exactly one input: the whole DataFrame is fed to it. For inputs of
           rank <= 3 the DataFrame values are used as-is (rows x columns); for
           higher ranks each cell must hold newline-separated multivalue data,
           which is split and reshaped.

        Args:
            df (DataFrame): input data, typically the feature columns left
                after ``find_fields_to_drop``
            input_cols (list): the model's input descriptors, read for
                ``.name``, ``.shape`` and ``.type``

        Returns:
            dict: input name -> numpy array

        Raises:
            RuntimeError: if the data does not match what the model expects,
                or the model declares no inputs.
        """
        len_input_cols = len(input_cols)
        if len_input_cols > 1:
            return ApplyOnnxProcessor._multi_input_feed(df, input_cols)
        if len_input_cols == 1:
            return ApplyOnnxProcessor._single_input_feed(df, input_cols[0])
        # not input, error case
        raise RuntimeError("ONNX model does not have input")

    @staticmethod
    def find_fields_to_drop(df, process_options):
        """Return the columns of ``df`` that must not be fed to the model.

        These are the target variable (when configured and present in ``df``)
        and every column whose name starts with an underscore (e.g. ``_time``).
        Each column is listed at most once, so the result is safe to pass to
        ``DataFrame.drop``.
        """
        target = process_options.get("target_variable")
        to_drop = [target] if target is not None and target in df.columns else []
        # TODO: ONNX- replace with preprocessing column functions
        to_drop.extend(
            col for col in df.columns if col.startswith('_') and col != target
        )
        return to_drop

    @staticmethod
    def fetch_output_col(process_options, target_var):
        """Return the output column name(s) for the predictions.

        Uses the name given with the 'as' keyword in the SPL (``output_name``)
        when present, otherwise ``predicted(<target_var>)``.
        """
        # FIXME: Make sure to update this logic when adding multi-target support for apply command.
        output_var = process_options.get("output_name")
        if output_var:
            return [f"{output_var}"]
        else:
            return [f"predicted({target_var})"]

    @staticmethod
    def apply(df, algo, process_options, session_obj=None):
        """Run ONNX inference on ``df`` and merge the predictions into it.

        Args:
            df (dataframe): input data
            algo (object): not used here; inference goes through ``session_obj``
            process_options (dict): process options
            session_obj (object): ONNX inference session used for prediction

        Returns:
            output (dataframe): result of ``merge_predictions`` on ``df`` and
                the first model output

        Raises:
            RuntimeError: on any validation, input-conversion or inference
                error; the error is also logged and reported through
                ``cexc.messages.warn``.
        """
        try:
            # Explicit check rather than `assert`: `python -O` strips the
            # whole assert statement, including the validation call.
            if not onnx_util.validate_feature_and_target_variables(
                df.head(), process_options
            ):
                raise RuntimeError("Feature and target variable validation failed")

            # TODO - ONNX can store the results for validation "df['output_column']", like scoring options.
            to_drop = ApplyOnnxProcessor.find_fields_to_drop(df, process_options)
            df_new = df.drop(to_drop, axis=1, inplace=False)

            if session_obj is None:
                raise RuntimeError("ONNX inference session is not loaded")
            input_names = session_obj.get_inputs()
            label_name = session_obj.get_outputs()[0].name
            inputs = ApplyOnnxProcessor.get_input_fields(df_new, input_names)

            try:
                # run() returns one result per requested output; only the
                # first output is requested, so prediction[0] holds it.
                prediction = session_obj.run(output_names=[label_name], input_feed=inputs)
                target_var = process_options.get('target_variable')
                output = merge_predictions(
                    df,
                    pd.DataFrame(
                        prediction[0],
                        columns=ApplyOnnxProcessor.fetch_output_col(
                            process_options, target_var
                        ),
                    ),
                )
            except Exception as e:
                raise RuntimeError(f"Error found during model inferencing : {e}") from e

            gc.collect()
        except Exception as e:
            cexc.log_traceback()
            cexc.messages.warn(
                'Error while applying model "%s": %s'
                % (process_options.get('model_name'), str(e))
            )
            raise RuntimeError(e) from e
        return output

    def process(self):
        """Produce the output DataFrame in ``self.df``.

        In the upload phase the process options are returned, flattened by
        ``pd.json_normalize``. Otherwise the ONNX session is loaded on first
        use and the model is applied to ``self.df`` when it has rows. A None
        result is replaced by an empty DataFrame with a warning.
        """
        if self.upload:
            # Upload phase: report the (updated) process options.
            self.df = pd.json_normalize(self.process_options)
        else:
            # Load the session once and reuse it on later calls; self.algo is
            # only ever set to None in this class, so it cannot serve as the
            # "already loaded" flag.
            if self.session is None:
                self.session = models.base.load_onnx_model(
                    model_name=self.process_options['model_name'],
                    searchinfo=self.searchinfo,
                    namespace=self.namespace,
                )
            if self.df is not None and len(self.df) > 0:
                self.df = self.apply(
                    self.df, self.algo, self.process_options, session_obj=self.session
                )

        if self.df is None:
            messages.warn('Apply method did not return any results.')
            self.df = pd.DataFrame()