You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Splunk_Deploiement/apps/Splunk_ML_Toolkit/bin/chunked_controller.py

244 lines
8.8 KiB

#!/usr/bin/env python
# Copyright (C) 2015-2019 Splunk Inc. All Rights Reserved.
import csv
import importlib
from io import StringIO
import pandas as pd
import cexc
from util.base_util import match_field_globs
from util.param_util import missing_keys_in_dict
from util.telemetry_util import log_sourcetype_inference
logger = cexc.get_logger(__name__)
messages = cexc.get_messages_logger()
# We don't want the sourcetype inference data to be logged too many times
# per search in order to limit the amount of data sent to the telemetry server.
_MAX_NUM_SOURCETYPE_LOGGED = 10
class ChunkedController(object):
"""The controller connects CEXC commands to ML-SPL processors.
- abstracting our interaction with the CEXC protocol and calling command
- converting CSVs to pandas DataFrames
- initializing and calling the processor's process method
- notifying rest end point after a model is saved
"""
def __init__(self, getinfo, controller_options):
    """Validate search metadata, split options, and build the processor.

    Args:
        getinfo (dict): getinfo metadata from the CEXC protocol exchange
        controller_options (dict): controller options passed from caller

    Raises:
        RuntimeError: if required searchinfo keys are absent from getinfo
    """
    # Cumulative CSV parse/render timings for this search.
    self._csv_parse_time = 0.0
    self._csv_render_time = 0.0
    # How many times sourcetype inference info has been logged so far;
    # capped at _MAX_NUM_SOURCETYPE_LOGGED to bound telemetry volume.
    self._num_sourcetype_logged = 0

    # Fail fast if the protocol did not hand us the searchinfo we need.
    required_searchinfo_fields = ('sid', 'splunkd_uri', 'session_key', 'app', 'username')
    absent = missing_keys_in_dict(required_searchinfo_fields, getinfo['searchinfo'])
    if absent:
        logger.debug(
            'searchinfo in getinfo missing the following keys: %s', ', '.join(absent)
        )
        raise RuntimeError('Protocol error has occurred while instantiating the search')

    # Partition caller options into controller-level vs processor-level.
    self.getinfo, self.controller_options, process_options = self.split_options(
        controller_options, getinfo
    )

    # Narrow searchinfo down to just the fields downstream code relies on.
    searchinfo = {key: self.getinfo['searchinfo'][key] for key in required_searchinfo_fields}

    self.processor = self.initialize_processor(
        self.controller_options['processor'], process_options, searchinfo
    )
    self.resource_limits = self.get_resource_limits(self.processor)
    # Raw CSV chunk body; populated by load_data().
    self.body = None
@staticmethod
def split_options(options, getinfo):
"""Split apart controller and processor options.
Args:
options (dict): controller options
getinfo (dict): getinfo metadata
Returns:
getinfo (dict): the get info dictionary passed to controller
controller_options (dict): the controller options to be used
process_options (dict): the process options to pass to the processor
"""
# Default value
controller_options = {}
controller_options['use_processor_output'] = options.pop('apply', True)
controller_options['processor'] = options.pop('processor')
if 'model_name' in options:
controller_options['model_name'] = options.get('model_name')
if 'namespace' in options:
controller_options['namespace'] = options.get('namespace')
# Construct process option
process_options = options
process_options['tmp_dir'] = getinfo['searchinfo']['dispatch_dir']
if (
controller_options['processor'] == 'ApplyProcessor'
or controller_options['processor'] == 'ApplyOnnxProcessor'
):
process_options['dispatch_dir'] = getinfo['searchinfo']['dispatch_dir']
return getinfo, controller_options, process_options
@staticmethod
def initialize_processor(processor_name, process_options, searchinfo):
    """Import and initialize a processor.

    Processors are stored in ./bin/processors/ and all inherit from the
    BaseProcessor class. The module in the `processors` package and the
    class it contains share the processor name.

    Args:
        processor_name (str): processor name
        process_options (dict): process options
        searchinfo (dict): information required for search

    Returns:
        processor (object): initialized processor

    Raises:
        RuntimeError: if the processor module or class cannot be imported,
            or if the processor constructor fails
    """
    try:
        processor_module = importlib.import_module('processors.{}'.format(processor_name))
        processor_class = getattr(processor_module, processor_name)
    # Bug fix: import_module raises ImportError (not AttributeError) when the
    # module itself is missing; previously that escaped as a raw ImportError
    # instead of the friendly RuntimeError below.
    except (ImportError, AttributeError):
        logger.debug('Failed to import ML-SPL processor "%s"', processor_name)
        raise RuntimeError('Failed to import ML-SPL processor.')
    try:
        processor = processor_class(process_options, searchinfo)
    except Exception as e:
        cexc.log_traceback()
        logger.debug(
            'Error while initializing processor "%s": %s', processor_name, str(e)
        )
        raise RuntimeError(str(e))
    return processor
@staticmethod
def parse_relevant_fields(sio, relevant_fields):
    """Resolve the processor's relevant fields and parse only those columns.

    Args:
        sio (StringIO): buffer holding the CSV chunk body
        relevant_fields (list): list of field names (may contain globs)

    Returns:
        df (dataframe): dataframe loaded from the input data buffer
    """
    # Peek at the CSV header to learn which fields are actually present.
    header_fields = csv.DictReader(sio).fieldnames
    wanted = match_field_globs(header_fields, relevant_fields)
    # Rewind: DictReader consumed the header row.
    sio.seek(0)
    return pd.read_csv(sio, usecols=[str(name) for name in wanted])
def get_required_fields(self):
"""Fetch required fields from processor's relevant fields.
Required fields are sent back during the getinfo exchange. This is how
Splunk knows which fields will be needed by our command and is akin
to setting required fields with the SPL fields command.
Returns:
required_fields (list): list of required fields
"""
return self.processor.get_relevant_fields()
@staticmethod
def get_resource_limits(processor):
"""Fetch resource limits from the processor.
Args:
processor (object): initialized processor
Returns:
resource_limits (dict): resource limits with keys such as
max_memory_usage
"""
try:
resource_limits = processor.resource_limits
except AttributeError:
processor_str = processor.__class__.__name__
logger.debug(
'No resource limits were loaded for ML-SPL processor "%s"' % processor_str
)
resource_limits = None
return resource_limits
def load_data(self, body):
    """Load fields from search results into a DataFrame and hand it over.

    Besides converting the body into a DataFrame, this also delivers the
    frame to the processor via receive_input.

    Args:
        body (str): csv body chunk from CEXC process
    """
    self.body = body
    # Empty chunk: hand the processor an empty frame and stop early.
    if not body:
        self.processor.receive_input(pd.DataFrame())
        return

    buf = StringIO(body)
    with cexc.Timer() as parse_timer:
        if self.controller_options['use_processor_output']:
            frame = pd.read_csv(buf)
        else:
            # Only parse the columns the processor actually needs.
            frame = self.parse_relevant_fields(buf, self.get_required_fields())
    self._csv_parse_time += parse_timer.interval

    logger.debug(
        'chunk body=%d bytes, rows=%d, columns=%d, csv_read_time=%f',
        len(body),
        len(frame),
        len(frame.columns),
        parse_timer.interval,
    )
    # Cap telemetry: only log sourcetype inference a limited number of
    # times per search (short-circuit keeps behavior identical).
    if self._num_sourcetype_logged < _MAX_NUM_SOURCETYPE_LOGGED and log_sourcetype_inference(
        frame, row_idx=-1
    ):
        self._num_sourcetype_logged += 1

    self.processor.receive_input(frame)
def execute(self):
"""Call the processor's process method."""
self.processor.process()
def output_results(self):
    """Render the output payload, converting to CSV only when applying.

    When use_processor_output is false, the original chunk body is echoed
    back unchanged instead of rendering the processor's output.

    Returns:
        body (str): a CSV data payload for CEXC
    """
    with cexc.Timer() as render_timer:
        if not self.controller_options['use_processor_output']:
            body = self.body
        else:
            body = self.processor.get_output().to_csv(index=False)
    self._csv_render_time += render_timer.interval
    return body
def finalize(self):
"""Save model and notify REST endpoint about new lookup."""
if "model_name" in self.controller_options:
self.processor.save_model()