You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
332 lines
10 KiB
332 lines
10 KiB
#!/usr/bin/env python
|
|
"""Utility library for "chunked" custom search commands."""
|
|
|
|
from collections import OrderedDict
import csv
import logging
import logging.handlers
import sys
import time
import traceback

if sys.version_info >= (3, 0):
    from io import StringIO
else:
    from cStringIO import StringIO

from cexc import setup_logging
from util.chunk_util import read_chunk, write_chunk
|
|
|
|
# Internal (non user-visible) logger for this library, provided by the
# project's setup_logging helper.
logger = setup_logging.get_logger()
# Child logger named 'messages' for user-visible messages. Each
# BaseChunkHandler attaches a buffering handler to it and forwards the
# buffered records to Splunk in the chunk's 'inspector' metadata.
messages = logger.getChild('messages')
|
|
|
|
|
|
class CommandType(object):
    """Command types of the Chunked Command Protocol.

    Each attribute holds the lowercase string value used for that type.
    """

    EVENTS, REPORTING, STREAMING, STATEFUL = (
        'events',
        'reporting',
        'streaming',
        'stateful',
    )
|
|
|
|
|
|
def get_logger(name=None):
    """Return the logger used for internal (non user-visible) messages.

    Parameters
    ----------
    name : str, optional
        When truthy, the child logger ``name`` of the internal logger is
        returned instead of the internal logger itself.
    """
    return logger.getChild(name) if name else logger
|
|
|
|
|
|
def get_messages_logger():
    """Returns a logger for user-visible messages.

    This is the module-level ``messages`` logger (the ``'messages'`` child
    of the internal logger). BaseChunkHandler attaches a buffering handler
    to it, and records logged here are sent to Splunk as inspector
    messages in the next chunk written.
    """
    return messages
|
|
|
|
|
|
def log_traceback():
    """Log the traceback of the exception currently being handled.

    The formatted traceback goes to the internal logger at ERROR level.
    It is only meaningful when called from inside an ``except`` block.
    """
    formatted_traceback = traceback.format_exc()
    logger.error(formatted_traceback)
|
|
|
|
|
|
def abort(msg):
    """Exit a custom search command gracefully with a user-visible message.

    Use this to bail out *before* a BaseChunkHandler has been created and
    run, e.g. when an exception is raised while importing dependencies in
    your ``__main__`` module.

    Do NOT call this from within a running BaseChunkHandler; raise an
    Exception or RuntimeError there instead.
    """
    # AbortHandler.abort() terminates the process with sys.exit(1), so the
    # return below is never actually reached with a value.
    return AbortHandler.abort(msg)
|
|
|
|
|
|
class BaseChunkHandler(object):
    """Base class for custom search commands using the "chunked" protocol.

    This is a low-level implementation. You are strongly encouraged to
    use the Splunk Python SDK instead.

    To write an external search command, extend this class, override
    the handler() method, and invoke its run() method, e.g.:

        class Handler(BaseChunkHandler):
            def handler(self, metadata, data):
                ...

        if __name__ == "__main__":
            Handler().run()

    run() will read a chunk from stdin, call handler() with the
    metadata and data payloads, and write a chunk containing
    handler()'s return value. It will continue doing this in a loop
    until EOF is read.

    Parameters
    ----------
    handler_data : DATA_DICT | DATA_CSVROW | DATA_RAW
        Specifies how/whether data payload should be parsed.
        Defaults to DATA_DICT.

    in_file, out_file, err_file : file
        Files to use for input, output, and errors, respectively.
        When omitted (None) they default to sys.stdin.buffer,
        sys.stdout.buffer (sys.stdout._buffer if stdout has no ``buffer``)
        and sys.stderr. The defaults are looked up when the handler is
        constructed, not when this module is imported.
        N.B.: in_file must be a byte stream, where in_file.read(num) reads
        num bytes (and not characters). This is because `body_len` in chunk
        protocol specifies body length in bytes.

    Attributes
    ----------
    getinfo : dict
        Per-instance copy of the metadata from the getinfo exchange. Set
        when action:getinfo is observed in _handle_chunk().
    """

    (
        DATA_DICT,  # parse data payload with csv.DictReader
        DATA_CSVROW,  # parse data payload with csv.reader
        DATA_RAW,  # don't parse data payload
    ) = list(range(3))

    def __init__(
        self,
        handler_data=None,
        in_file=None,
        out_file=None,
        err_file=None,
    ):
        # DATA_DICT is 0, so any falsy value (including None) selects it.
        self.handler_data = handler_data or self.DATA_DICT

        # Resolve stream defaults here rather than in the signature: default
        # argument values are evaluated once at import time, which breaks
        # the import when a stream is None and ignores later redirection.
        if in_file is None:
            in_file = sys.stdin.buffer
        if out_file is None:
            out_file = sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout._buffer
        if err_file is None:
            err_file = sys.stderr

        self.in_file = in_file
        self.out_file = out_file
        self.err_file = err_file
        self.getinfo = {}

        # Unmangle line-endings in Windows.
        #
        # N.B.: Windows converts \n to \r such that transport headers do not
        # get received correctly by the CEXC protocol. Switching to binary
        # mode is only needed for objects backed by a real file descriptor;
        # file-like objects without one should not implement fileno().
        if sys.platform == "win32":
            import os  # pylint: disable=import-error
            import msvcrt  # pylint: disable=import-error

            for file_like_object in [self.in_file, self.out_file, self.err_file]:
                fileno = getattr(file_like_object, 'fileno', None)
                if fileno is not None and callable(fileno):
                    try:
                        # pylint: disable=E1103 ; the Windows version of os has O_BINARY
                        msvcrt.setmode(fileno(), os.O_BINARY)
                    except ValueError:
                        # Safe to skip: raised e.g. by pytest's capture
                        # streams, which incorrectly implement fileno().
                        pass

        # Logger instance for user-visible messages. The buffering handler is
        # attached to a module-wide logger and nothing in this class removes
        # it. Note that the stdlib BufferingHandler discards its buffer once
        # it reaches capacity (100000 records) between drains.
        self.messages_logger = get_messages_logger()
        self.messages_handler = logging.handlers.BufferingHandler(100000)
        self.messages_logger.addHandler(self.messages_handler)

        # Variables to track (CPU) time spent in different chunk handling
        # states.
        self._read_time = 0.0
        self._handle_time = 0.0
        self._write_time = 0.0

        self.controller_options = None
        self.controller = None
        self.watchdog = None
        self.partial_fit = None

    def run(self):
        """Handle chunks in a loop until EOF is read.

        If an exception is raised during chunk handling, a chunk
        indicating the error will be written and the process will exit.
        RuntimeError messages are reported verbatim; other exceptions are
        prefixed with their type name.
        """
        try:
            while self._handle_chunk():
                pass
        except Exception as e:
            if isinstance(e, RuntimeError):
                error_message = str(e)
            else:
                error_message = '(%s) %s' % (type(e).__name__, e)
            self.die(error_message)

    def handler(self, metadata, body):
        """Default chunk handler, returns empty metadata and data payloads.

        Subclasses override this. It may return either a ``(metadata, body)``
        pair or a metadata dict alone, in which case the body written is
        empty. With DATA_DICT the body is a list of dicts, with DATA_CSVROW
        a list of rows, and with DATA_RAW a string.
        """
        return ({}, [])

    def die(self, message, log_traceback=True):
        """Logs a message, writes a user-visible error chunk, and exits(1).

        Parameters
        ----------
        message : str
            Logged, and sent as the chunk's ``error`` metadata.
        log_traceback : bool
            Also log the traceback of the exception being handled.
        """
        logger.error(message)
        if log_traceback:
            logger.error(traceback.format_exc())

        metadata = {'finished': True, 'error': message}

        # Insert inspector messages from messages_logger, converting every
        # non-DEBUG message to ERROR so the user can see them.
        messages = [['ERROR', text] for level, text in self._pop_messages() if level != 'DEBUG']
        if messages:
            metadata.setdefault('inspector', {}).setdefault('messages', []).extend(messages)

        # Sort the keys in reverse order! 'inspector' must come before 'error'.
        metadata = OrderedDict((key, metadata[key]) for key in sorted(metadata, reverse=True))

        write_chunk(self.out_file, metadata, '')
        sys.exit(1)

    def _handle_chunk(self):
        """Handle (read, process, write) a single chunk.

        Returns False when EOF is read, True after a chunk was written.
        """
        # Read and parse the incoming chunk.
        with Timer() as t:
            ret = read_chunk(self.in_file)
            if not ret:
                return False  # EOF

            metadata, body = ret

            if self.handler_data == self.DATA_DICT:
                assert isinstance(body, str)
                body = list(csv.DictReader(StringIO(body)))
            elif self.handler_data == self.DATA_CSVROW:
                assert isinstance(body, str)
                body = list(csv.reader(StringIO(body)))
            # DATA_RAW: the payload is passed to handler() untouched.

            # Cache a copy of the getinfo metadata.
            if metadata.get('action', None) == 'getinfo':
                self.getinfo = dict(metadata)

        self._read_time += t.interval

        # Invoke the (hopefully overridden) handler.
        with Timer() as t:
            ret = self.handler(metadata, body)

            if isinstance(ret, dict):
                metadata, body = ret, None
            else:
                try:
                    metadata, body = ret
                except (TypeError, ValueError):
                    # Not iterable, or not exactly two items.
                    raise TypeError(
                        "Handler must return (metadata, body), got: %.128s" % repr(ret)
                    )

            # Insert inspector messages from messages_logger.
            messages = self._pop_messages()
            if messages:
                metadata.setdefault('inspector', {}).setdefault('messages', []).extend(messages)

        self._handle_time += t.interval

        # Serialize and write the outgoing chunk.
        with Timer() as t:
            # len() rather than truthiness, so sized containers whose
            # __bool__ is ambiguous are still accepted.
            if body is not None and len(body) > 0:
                sio = StringIO()

                if self.handler_data == self.DATA_DICT:
                    assert hasattr(body, '__iter__')
                    # Union of keys across all rows, in first-seen order, so
                    # the CSV header is deterministic (set order is not).
                    fieldnames = list(
                        OrderedDict.fromkeys(key for row in body for key in row.keys())
                    )
                    writer = csv.DictWriter(sio, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(body)
                    body = sio.getvalue()
                elif self.handler_data == self.DATA_CSVROW:
                    writer = csv.writer(sio)
                    writer.writerows(body)
                    body = sio.getvalue()
                # DATA_RAW: handler() must already have returned a string.

                assert isinstance(body, str)
            else:
                body = ''

            write_chunk(self.out_file, metadata, body)

        self._write_time += t.interval

        return True

    @staticmethod
    def _splunk_level(levelno):
        """Map a numeric logging level onto the Splunk message level name."""
        if levelno >= logging.ERROR:  # ERROR and CRITICAL
            return 'ERROR'
        if levelno >= logging.WARNING:
            return 'WARN'
        if levelno >= logging.INFO:
            return 'INFO'
        return 'DEBUG'

    def _pop_messages(self):
        """Drain the BufferingHandler holding user-visible messages.

        Returns a list of ``[level, text]`` pairs in logging order, where
        level is one of 'DEBUG', 'INFO', 'WARN' or 'ERROR'. The buffer is
        emptied afterwards.
        """
        # getMessage() rather than record.message: the latter is only set
        # when some Formatter has formatted the record, which
        # BufferingHandler never does.
        messages = [
            [self._splunk_level(record.levelno), record.getMessage()]
            for record in self.messages_handler.buffer
        ]
        self.messages_handler.flush()
        return messages
|
|
|
|
|
class AbortHandler(BaseChunkHandler):
    """Chunk handler whose only job is to report ``msg`` as an error.

    Its handler() raises RuntimeError(msg) on the first chunk it sees. The
    base run() turns that into an error chunk via die(), which exits the
    process with status 1.
    """

    def __init__(self, msg):
        # Keep the message before the base initializer runs.
        self.msg = msg
        super(AbortHandler, self).__init__()

    def handler(self, metadata, body):
        raise RuntimeError(self.msg)

    @classmethod
    def abort(cls, msg):
        """Run an AbortHandler for ``msg`` and exit with status 1.

        The explicit exit also covers the case where EOF is read before
        any chunk arrives, so handler() never gets to raise.
        """
        aborting_handler = cls(msg)
        aborting_handler.run()
        sys.exit(1)
|
|
|
|
|
|
class Timer:
    """Context manager measuring the CPU time spent in its body.

    Uses ``time.process_time``. After the ``with`` block exits, ``start``,
    ``end`` and ``interval`` (``end - start``, in seconds) are available.
    Exceptions raised in the body are not suppressed.
    """

    def __enter__(self):
        self.start = time.process_time()
        return self

    def __exit__(self, *exc_info):
        finished_at = time.process_time()
        self.end = finished_at
        self.interval = finished_at - self.start
|