You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
148 lines
5.5 KiB
148 lines
5.5 KiB
#!/usr/bin/env python
|
|
# Copyright (C) 2015-2019 Splunk Inc. All Rights Reserved.
|
|
from exec_anaconda import exec_anaconda_or_die
|
|
|
|
exec_anaconda_or_die()
|
|
|
|
import json
|
|
import re
|
|
import sys
|
|
import cexc
|
|
from cexc import BaseChunkHandler, CommandType
|
|
|
|
from util.rest_proxy import rest_proxy_from_searchinfo
|
|
from util import command_util
|
|
from util.searchinfo_util import searchinfo_from_cexc
|
|
from util.param_util import parse_args
|
|
from util.rest_url_util import make_splunk_url
|
|
from util.experiment_util import get_experiment_by_id, get_history_fields_from_experiment
|
|
from experiment.evaluation_metrics import compute_statistics, get_statistics_metadata
|
|
|
|
logger = cexc.get_logger('logexperiment')
|
|
messages = cexc.get_messages_logger()
|
|
|
|
|
|
class LogExperimentCommand(BaseChunkHandler):
    """Chunked search command that logs the results of an experiment.

    Per-chunk statistics metadata is accumulated while data chunks arrive.
    When the final chunk is seen, the statistics are computed, merged with
    the experiment's history fields, and POSTed to the experiment's
    ``history`` REST endpoint.
    """

    def __init__(
        self,
        handler_data=None,
        in_file=sys.stdin.buffer,
        out_file=sys.stdout.buffer,
        err_file=sys.stderr,
    ):
        """Initialize the base chunk handler and the per-search state.

        Args:
            handler_data: forwarded unchanged to ``BaseChunkHandler``.
            in_file: stream forwarded to ``BaseChunkHandler`` (defaults to binary stdin).
            out_file: stream forwarded to ``BaseChunkHandler`` (defaults to binary stdout).
            err_file: stream forwarded to ``BaseChunkHandler`` (defaults to stderr).
        """
        super().__init__(handler_data, in_file, out_file, err_file)
        self.exp_id = None  # experiment id taken from the `id=` argument in setup()
        self.app = None
        self.searchinfo = None  # populated from the getinfo chunk in handler()
        self.experiment = None  # fetched lazily on the first data chunk
        self.exp_metadata_list = []  # one statistics-metadata entry per data chunk

    @staticmethod
    def handle_arguments(getinfo):
        """Take the getinfo metadata and return controller_options.

        Args:
            getinfo (dict): getinfo metadata

        Returns:
            controller_options (dict): options to be sent to controller

        Raises:
            RuntimeError: if no ``id`` parameter was supplied.
        """
        options = parse_args(getinfo['searchinfo']['args'])

        if options.get('params') is None or options['params'].get('id') is None:
            raise RuntimeError('Experiment ID must be specified, e.g: logexperiment id=... ')

        return options

    def setup(self):
        """Parse search string and choose processor.

        Returns:
            (dict): get info response (command type) and required fields. This
                response will be sent back to the CEXC process on the getinfo
                exchange (first chunk) to establish our execution type and
                required fields.
        """
        options = self.handle_arguments(self.getinfo)
        self.exp_id = options['params']['id']

        # The 'app' argument value is needed to correctly locate the experiment
        # as it may be a different app than the current app context this is invoked from.
        # By default, it's the current app the command is executed from, but override it
        # if the user specified a value for the app arg.
        app = options['params'].get("app")
        if app:
            self.searchinfo["app"] = app

        return {'type': CommandType.REPORTING}

    @staticmethod
    def _from_schedule(sid):
        """Return True when the search id starts with ``scheduler__``."""
        # Assume no realtime data (realtime searches start with 'rt_scheduler__').
        # re.match anchors at the start of the string, so 'rt_scheduler__...' is rejected.
        return re.match(r'scheduler__', sid) is not None

    @staticmethod
    def _error_text(content):
        """Extract a readable error message from a failed REST reply body.

        The body is expected to be JSON shaped like
        ``{"messages": [{"text": ...}, ...]}``. If it is not valid JSON or
        does not have that shape, the raw content is returned instead so the
        original failure is never hidden behind a parsing error.
        """
        try:
            return json.loads(content)['messages'][0]['text']
        except (ValueError, TypeError, KeyError, IndexError):
            return str(content)

    def _build_history_entry(self, rest_proxy):
        """Assemble the JSON-serializable history record for this search.

        The record holds the search id, whether the search id looks
        scheduled, the history fields of the experiment (fetched again here,
        as the original implementation did), and the computed statistics.
        """
        experiment = get_experiment_by_id(rest_proxy, self.exp_id)
        experiment_history = get_history_fields_from_experiment(experiment)
        sid = self.searchinfo['sid']
        json_body = {'sid': sid, 'from_schedule': self._from_schedule(sid)}
        # Update json_body with experiment history
        json_body.update(experiment_history)

        # Update the json body with statistics. If statistics can't be computed, update with empty dictionary.
        statistics_dict = compute_statistics(self.exp_metadata_list)
        json_body.update(statistics_dict)
        return json_body

    def _post_history(self):
        """POST the history record to the experiment's history endpoint.

        Raises:
            RuntimeError: if the REST call reports failure; the message is
                the first message text of the reply when available,
                otherwise the raw reply content.
        """
        rest_proxy = rest_proxy_from_searchinfo(self.searchinfo)
        json_body = self._build_history_entry(rest_proxy)

        # Send json_body to the history store
        url = make_splunk_url(
            rest_proxy,
            'user',
            extra_url_parts=['mltk', 'experiments', self.exp_id, 'history'],
        )
        reply = rest_proxy.make_rest_call('POST', url, jsonargs=json.dumps(json_body))
        if not reply['success']:
            content = reply['content']
            logger.warning(content)
            raise RuntimeError(self._error_text(content))

    def handler(self, metadata, body):
        """Main handler we override from BaseChunkHandler.

        Handles the reading and writing of data to the CEXC process, and
        finishes negotiation of the termination of the process.

        Args:
            metadata (dict): metadata information
            body (str): data payload from CEXC

        Returns:
            (dict): metadata to be sent back to CEXC
            output_body (str): data payload to be sent back to CEXC

            On an early return or on the getinfo chunk only the metadata
            dict is returned.

        Raises:
            RuntimeError: if posting the history record fails on the final chunk.
        """
        if command_util.should_early_return(metadata):
            return {'type': CommandType.REPORTING}

        if command_util.is_getinfo_chunk(metadata):
            self.searchinfo = searchinfo_from_cexc(metadata['searchinfo'], extra_fields=['sid'])
            return self.setup()

        # Save info we need to calculate stats when we process the final chunk.
        if self.experiment is None:
            rest_proxy = rest_proxy_from_searchinfo(self.searchinfo)
            self.experiment = get_experiment_by_id(rest_proxy, self.exp_id)

        self.exp_metadata_list.append(get_statistics_metadata(self.experiment, body))

        finished_flag = metadata.get('finished', False)
        if finished_flag:
            self._post_history()

        # Our final farewell
        return {'finished': finished_flag}, body
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the command in raw-data mode, run it, and
    # bracket the run with debug log lines.
    logger.debug("Starting logexperiment.py.")
    command = LogExperimentCommand(handler_data=BaseChunkHandler.DATA_RAW)
    command.run()
    logger.debug("Exiting gracefully. Byee!!")