You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1027 lines
40 KiB
1027 lines
40 KiB
'''
|
|
# Copyright (C) 2005-2024 Splunk Inc. All Rights Reserved.
|
|
'''
|
|
import getpass
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import xml.dom
|
|
from xml.dom.minidom import Document
|
|
import xml.sax.saxutils
|
|
|
|
import splunk
|
|
import splunk.clilib
|
|
import splunk.version
|
|
from splunk.models.app import App
|
|
from splunk.clilib.bundle_paths import get_slaveapps_base_path
|
|
try:
|
|
from splunk.clilib.bundle_paths import make_splunkhome_path
|
|
except ImportError:
|
|
from splunk.appserver.mrsparkle.lib.util import make_splunkhome_path
|
|
|
|
if 'slave' in splunk.clilib.cli_common.getMergedConf('server').get('clustering', {}).get('mode'):
|
|
sys.path.append(os.path.join(get_slaveapps_base_path(), "SA-Hydra-inframon", "lib"))
|
|
else:
|
|
sys.path.append(make_splunkhome_path(["etc", "apps", "SA-Hydra-inframon", "lib"]))
|
|
|
|
from SolnCommon import log
|
|
from .fields import BooleanField
|
|
from .fields import DurationField
|
|
from .fields import Field
|
|
from .fields import FieldValidationException
|
|
from .fields import FloatField
|
|
from .fields import IntegerField
|
|
from .fields import IntervalField
|
|
from .fields import ListField
|
|
from .fields import RangeField
|
|
from .fields import RegexField
|
|
|
|
# Define logger using the name of the script here, versus in the modular_input class.
|
|
logger = log.setup_logger(name='python_modular_input', level=logging.INFO)
|
|
|
|
|
|
class ModularInputConfig(object):
|
|
|
|
def __init__(self, server_host, server_uri, session_key, checkpoint_dir, configuration):
|
|
self.server_host = server_host
|
|
self.server_uri = server_uri
|
|
self.session_key = session_key
|
|
self.checkpoint_dir = checkpoint_dir
|
|
self.configuration = configuration
|
|
|
|
def __str__(self):
|
|
attrs = ['server_host', 'server_uri', 'session_key', 'checkpoint_dir', 'configuration']
|
|
return str({attr: str(getattr(self, attr)) for attr in attrs})
|
|
|
|
@staticmethod
|
|
def get_text(node, default=None):
|
|
"""
|
|
Get the value of the text in the first node under the given node.
|
|
|
|
Arguments:
|
|
node -- The node that should have a text node under it.
|
|
default -- The default text that ought to be returned if no text node could be found (defaults to none).
|
|
"""
|
|
|
|
if node and node.firstChild and node.firstChild.nodeType == node.firstChild.TEXT_NODE:
|
|
return node.firstChild.data
|
|
else:
|
|
return default
|
|
|
|
@staticmethod
|
|
def get_config_from_xml(config_str_xml):
|
|
"""
|
|
Get the config from the given XML and return a ModularInputConfig instance.
|
|
|
|
Arguments:
|
|
config_str_xml -- A string of XML that represents the configuration provided by Splunk.
|
|
"""
|
|
configuration = {}
|
|
|
|
# Parse the document
|
|
doc = xml.dom.minidom.parseString(config_str_xml)
|
|
root = doc.documentElement
|
|
|
|
# Get the server_host
|
|
server_host_node = root.getElementsByTagName("server_host")[0]
|
|
server_host = ModularInputConfig.get_text(server_host_node)
|
|
|
|
# Get the server_uri
|
|
server_uri_node = root.getElementsByTagName("server_uri")[0]
|
|
server_uri = ModularInputConfig.get_text(server_uri_node)
|
|
|
|
# Get the session_key
|
|
session_key_node = root.getElementsByTagName("session_key")[0]
|
|
session_key = ModularInputConfig.get_text(session_key_node)
|
|
|
|
# Get the checkpoint directory
|
|
checkpoint_node = root.getElementsByTagName("checkpoint_dir")[0]
|
|
checkpoint_dir = ModularInputConfig.get_text(checkpoint_node)
|
|
|
|
# Parse the config
|
|
conf_node = root.getElementsByTagName("configuration")[0]
|
|
|
|
if conf_node:
|
|
|
|
for stanza in conf_node.getElementsByTagName("stanza"):
|
|
config = {}
|
|
|
|
if stanza:
|
|
stanza_name = stanza.getAttribute("name")
|
|
|
|
if stanza_name:
|
|
config["name"] = stanza_name
|
|
|
|
params = stanza.getElementsByTagName("param")
|
|
|
|
for param in params:
|
|
param_name = param.getAttribute("name")
|
|
|
|
config[param_name] = ModularInputConfig.get_text(param)
|
|
|
|
configuration[stanza_name] = config
|
|
|
|
return ModularInputConfig(server_host, server_uri, session_key, checkpoint_dir, configuration)
|
|
|
|
|
|
class ModularInput(object):
|
|
|
|
# These arguments cover the standard fields that are always supplied
|
|
standard_args = [
|
|
BooleanField("disabled", "Disabled", "Whether the modular input is disabled or not"),
|
|
Field("host", "Host", "The host that is running the input"),
|
|
Field("index", "Index", "The index that data should be sent to"),
|
|
IntervalField("interval", "Interval", "The interval the script will be run on"),
|
|
Field("name", "Stanza name", "The name of the stanza for this modular input"),
|
|
Field("source", "Source", "The source for events created by this modular input"),
|
|
Field("sourcetype", "Stanza name", "The name of the stanza for this modular input"),
|
|
Field("python.version", "Python version", "Python version on which the ModularInput script would run")
|
|
]
|
|
|
|
checkpoint_dir = None
|
|
|
|
def _is_valid_param(self, name, val):
|
|
'''Raise an error if the parameter is None or empty.'''
|
|
if val is None:
|
|
raise ValueError("The {0} parameter cannot be none".format(name))
|
|
|
|
if len(val.strip()) == 0:
|
|
raise ValueError("The {0} parameter cannot be empty".format(name))
|
|
|
|
return val
|
|
|
|
def _create_formatter_textnode(self, xmldoc, nodename, value):
|
|
'''Shortcut for creating a formatter textnode.
|
|
|
|
Arguments:
|
|
xmldoc - A Document object.
|
|
nodename - A string name for the node.
|
|
'''
|
|
node = xmldoc.createElement(nodename)
|
|
text = xmldoc.createTextNode(str(value))
|
|
node.appendChild(text)
|
|
|
|
return node
|
|
|
|
def _create_document(self):
|
|
'''Create the document for sending XML streaming events.'''
|
|
|
|
doc = Document()
|
|
|
|
# Create the <stream> base element
|
|
stream = doc.createElement('stream')
|
|
doc.appendChild(stream)
|
|
|
|
return doc
|
|
|
|
def _create_event(self, doc, params, stanza, unbroken=False, close=True):
|
|
'''Create an event for XML streaming output.
|
|
|
|
Arguments:
|
|
doc - a Document object.
|
|
params - a dictionary of attributes for the event.
|
|
stanza_name - the stanza
|
|
'''
|
|
|
|
# Create the <event> base element
|
|
event = doc.createElement('event')
|
|
|
|
# Indicate if this event is to be unbroken (meaning a </done> tag will
|
|
# need to be added by a future event.
|
|
if unbroken:
|
|
event.setAttribute('unbroken', '1')
|
|
|
|
# Indicate if this script is single-instance mode or not.
|
|
if self.streaming_mode == 'true':
|
|
event.setAttribute('stanza', stanza)
|
|
|
|
# Define the possible elements
|
|
valid_elements = ['host', 'index', 'source', 'sourcetype', 'time', 'data']
|
|
|
|
# Append the valid child elements. Invalid elements will be dropped.
|
|
for element in filter(lambda x: x in valid_elements, params.keys()):
|
|
event.appendChild(self._create_formatter_textnode(doc, element, params[element]))
|
|
|
|
if close:
|
|
event.appendChild(doc.createElement('done'))
|
|
|
|
return event
|
|
|
|
def _print_event(self, doc, event):
|
|
'''Adds an event to XML streaming output.'''
|
|
|
|
# Get the stream from the document.
|
|
stream = doc.firstChild
|
|
|
|
# Append the event.
|
|
stream.appendChild(event)
|
|
|
|
# Return the content as a string WITHOUT the XML header; remove the
|
|
# child object so the next event can be returned and reuse the same
|
|
# Document object.
|
|
output = doc.documentElement.toxml()
|
|
|
|
stream.removeChild(event)
|
|
|
|
return output
|
|
|
|
def _add_events(self, doc, events):
|
|
'''Adds a set of events to XML streaming output.'''
|
|
|
|
# Get the stream from the document.
|
|
stream = doc.firstChild
|
|
|
|
# Add the <event> node.
|
|
for event in events:
|
|
stream.appendChild(event)
|
|
|
|
# Return the content as a string WITHOUT the XML header.
|
|
return doc.documentElement.toxml()
|
|
|
|
def __init__(self, scheme_args, args=None):
|
|
"""
|
|
Set up the modular input.
|
|
|
|
Arguments:
|
|
title -- The title of the modular input (e.g. "Database Connector")
|
|
description -- A description of the input (e.g. "Get data from a database")
|
|
args -- A list of Field instances for validating the arguments
|
|
"""
|
|
|
|
# Set the scheme arguments.
|
|
for arg in ['title', 'description', 'use_external_validation', 'streaming_mode', 'use_single_instance']:
|
|
setattr(self, arg, self._is_valid_param(arg, scheme_args.get(arg)))
|
|
|
|
if args is None:
|
|
self.args = []
|
|
else:
|
|
self.args = args[:]
|
|
|
|
def addArg(self, arg):
|
|
"""
|
|
Add a given argument to the list of arguments.
|
|
|
|
Arguments:
|
|
arg -- An instance of Field that represents an argument.
|
|
"""
|
|
|
|
if self.args is None:
|
|
self.args = []
|
|
|
|
self.args.append(arg)
|
|
|
|
def usage(self, out=sys.stdout):
|
|
"""
|
|
Print a usage statement.
|
|
|
|
Arguments:
|
|
out -- The stream to write the message to (defaults to standard output)
|
|
"""
|
|
|
|
out.write("usage: %s [--scheme|--validate-arguments]\n")
|
|
|
|
def do_scheme(self, out=sys.stdout):
|
|
"""
|
|
Get the scheme and write it out to standard output.
|
|
|
|
Arguments:
|
|
out -- The stream to write the message to (defaults to standard output)
|
|
"""
|
|
|
|
logger.info("Modular input: scheme requested")
|
|
out.write(self.get_scheme())
|
|
|
|
return True
|
|
|
|
def get_scheme(self):
|
|
"""
|
|
Get the scheme of the inputs parameters and return as a string.
|
|
"""
|
|
|
|
# Create the XML document
|
|
doc = Document()
|
|
|
|
# Create the <scheme> base element
|
|
element_scheme = doc.createElement("scheme")
|
|
doc.appendChild(element_scheme)
|
|
|
|
# Create the title element
|
|
element_title = doc.createElement("title")
|
|
element_scheme.appendChild(element_title)
|
|
|
|
element_title_text = doc.createTextNode(self.title)
|
|
element_title.appendChild(element_title_text)
|
|
|
|
# Create the description element
|
|
element_desc = doc.createElement("description")
|
|
element_scheme.appendChild(element_desc)
|
|
|
|
element_desc_text = doc.createTextNode(self.description)
|
|
element_desc.appendChild(element_desc_text)
|
|
|
|
# Create the use_external_validation element
|
|
element_external_validation = doc.createElement("use_external_validation")
|
|
element_scheme.appendChild(element_external_validation)
|
|
|
|
element_external_validation_text = doc.createTextNode(self.use_external_validation)
|
|
element_external_validation.appendChild(element_external_validation_text)
|
|
|
|
# Create the streaming_mode element
|
|
element_streaming_mode = doc.createElement("streaming_mode")
|
|
element_scheme.appendChild(element_streaming_mode)
|
|
|
|
element_streaming_mode_text = doc.createTextNode(self.streaming_mode)
|
|
element_streaming_mode.appendChild(element_streaming_mode_text)
|
|
|
|
# Create the use_single_instance element
|
|
element_use_single_instance = doc.createElement("use_single_instance")
|
|
element_scheme.appendChild(element_use_single_instance)
|
|
|
|
element_use_single_instance_text = doc.createTextNode(self.use_single_instance)
|
|
element_use_single_instance.appendChild(element_use_single_instance_text)
|
|
|
|
# Create the elements to stored args element
|
|
element_endpoint = doc.createElement("endpoint")
|
|
element_scheme.appendChild(element_endpoint)
|
|
|
|
element_args = doc.createElement("args")
|
|
element_endpoint.appendChild(element_args)
|
|
|
|
# Create the argument elements
|
|
self.add_xml_args(doc, element_args)
|
|
|
|
# Return the content as a string
|
|
return doc.toxml()
|
|
|
|
def add_xml_args(self, doc, element_args):
|
|
"""
|
|
Add the arguments to the XML scheme.
|
|
|
|
Arguments:
|
|
doc -- The XML document
|
|
element_args -- The element that should be the parent of the arg elements that will be added.
|
|
"""
|
|
|
|
for arg in self.args:
|
|
element_arg = doc.createElement("arg")
|
|
element_arg.setAttribute("name", arg.name)
|
|
|
|
element_args.appendChild(element_arg)
|
|
|
|
# Create the title element
|
|
element_title = doc.createElement("title")
|
|
element_arg.appendChild(element_title)
|
|
|
|
element_title_text = doc.createTextNode(arg.title)
|
|
element_title.appendChild(element_title_text)
|
|
|
|
# Create the description element
|
|
element_desc = doc.createElement("description")
|
|
element_arg.appendChild(element_desc)
|
|
|
|
element_desc_text = doc.createTextNode(arg.description)
|
|
element_desc.appendChild(element_desc_text)
|
|
|
|
# Create the data_type element
|
|
element_data_type = doc.createElement("data_type")
|
|
element_arg.appendChild(element_data_type)
|
|
|
|
element_data_type_text = doc.createTextNode(arg.get_data_type())
|
|
element_data_type.appendChild(element_data_type_text)
|
|
|
|
# Create the required_on_create element
|
|
element_required_on_create = doc.createElement("required_on_create")
|
|
element_arg.appendChild(element_required_on_create)
|
|
|
|
element_required_on_create_text = doc.createTextNode("true" if arg.required_on_create else "false")
|
|
element_required_on_create.appendChild(element_required_on_create_text)
|
|
|
|
# Create the required_on_save element
|
|
element_required_on_edit = doc.createElement("required_on_edit")
|
|
element_arg.appendChild(element_required_on_edit)
|
|
|
|
element_required_on_edit_text = doc.createTextNode("true" if arg.required_on_edit else "false")
|
|
element_required_on_edit.appendChild(element_required_on_edit_text)
|
|
|
|
def do_validation(self, in_stream=sys.stdin):
|
|
"""
|
|
Get the validation data from standard input and attempt to validate it. Returns true if the arguments validated, false otherwise.
|
|
|
|
Arguments:
|
|
in_stream -- The stream to get the input from (defaults to standard input)
|
|
"""
|
|
|
|
data = self.get_validation_data(in_stream)
|
|
|
|
try:
|
|
self.validate(data)
|
|
return True
|
|
except FieldValidationException as e:
|
|
self.print_error(str(e))
|
|
return False
|
|
|
|
def validate(self, arguments):
|
|
"""
|
|
Validate the argument dictionary where each key is a stanza.
|
|
|
|
Arguments:
|
|
arguments -- The arguments as an dictionary where the key is the stanza and the value is a dictionary of the values.
|
|
"""
|
|
|
|
# Check each stanza
|
|
self.validate_parameters(arguments)
|
|
return True
|
|
|
|
def validate_parameters(self, parameters):
|
|
"""
|
|
Validate the parameter set for a stanza and returns a dictionary of cleaner parameters.
|
|
|
|
Arguments:
|
|
parameters -- The list of parameters
|
|
"""
|
|
|
|
cleaned_params = {}
|
|
|
|
# Append the arguments list such that the standard fields that Splunk provides are included
|
|
all_args = {}
|
|
|
|
for a in self.standard_args:
|
|
all_args[a.name] = a
|
|
|
|
for a in self.args:
|
|
all_args[a.name] = a
|
|
|
|
# Convert and check the parameters
|
|
for name, value in parameters.items():
|
|
|
|
# If the argument was found, then validate and convert it
|
|
if name in all_args:
|
|
cleaned_params[name] = all_args[name].to_python(value)
|
|
|
|
# Throw an exception if the argument could not be found
|
|
else:
|
|
raise FieldValidationException("The parameter '%s' is not a valid argument" % (name))
|
|
|
|
return cleaned_params
|
|
|
|
def print_error(self, error, out=sys.stdout):
|
|
"""
|
|
Prints the given error message to standard output.
|
|
|
|
Arguments:
|
|
error -- The message to be printed
|
|
out -- The stream to write the message to (defaults to standard output)
|
|
"""
|
|
|
|
out.write("<error><message>%s</message></error>" % xml.sax.saxutils.escape(error))
|
|
|
|
def read_config(self, in_stream=sys.stdin):
|
|
"""
|
|
Read the config from standard input and return the configuration.
|
|
|
|
in_stream -- The stream to get the input from (defaults to standard input)
|
|
"""
|
|
|
|
config_str_xml = in_stream.read()
|
|
|
|
return ModularInputConfig.get_config_from_xml(config_str_xml)
|
|
|
|
def run(self, stanza, cleaned_params):
|
|
"""
|
|
Run the input using the arguments provided.
|
|
|
|
Arguments:
|
|
stanza -- The name of the stanza
|
|
cleaned_params -- The arguments following validation and conversion to Python objects.
|
|
"""
|
|
|
|
raise Exception("Run function was not implemented")
|
|
|
|
@staticmethod
|
|
def get_file_path(checkpoint_dir, stanza):
|
|
"""
|
|
Get the path to the checkpoint file.
|
|
|
|
Arguments:
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved
|
|
stanza -- The stanza of the input being used
|
|
"""
|
|
|
|
return os.path.join(checkpoint_dir, hashlib.sha1(stanza).hexdigest() + ".json")
|
|
|
|
@classmethod
|
|
def last_ran(cls, checkpoint_dir, stanza):
|
|
"""
|
|
Determines the date that the input was last run (the input denoted by the stanza name).
|
|
|
|
Arguments:
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved
|
|
stanza -- The stanza of the input being used
|
|
"""
|
|
|
|
fp = None
|
|
|
|
try:
|
|
fp = open(cls.get_file_path(checkpoint_dir, stanza))
|
|
checkpoint_dict = json.load(fp)
|
|
|
|
return checkpoint_dict['last_run']
|
|
|
|
finally:
|
|
if fp is not None:
|
|
fp.close()
|
|
|
|
@classmethod
|
|
def needs_another_run(cls, checkpoint_dir, stanza, interval, cur_time=None):
|
|
"""
|
|
Determines if the given input (denoted by the stanza name) ought to be executed.
|
|
|
|
Arguments:
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved
|
|
stanza -- The stanza of the input being used
|
|
interval -- The frequency that the analysis ought to be performed
|
|
cur_time -- The current time (will be automatically determined if not provided)
|
|
"""
|
|
|
|
try:
|
|
last_ran = cls.last_ran(checkpoint_dir, stanza)
|
|
|
|
return cls.is_expired(last_ran, interval, cur_time)
|
|
|
|
except IOError as e:
|
|
# The file likely doesn't exist
|
|
logger.exception("The checkpoint file likely doesn't exist")
|
|
return True
|
|
except ValueError as e:
|
|
# The file could not be loaded
|
|
logger.exception("The checkpoint file could not be loaded")
|
|
return True
|
|
except Exception as e:
|
|
#Catch all that enforces an extra run
|
|
logger.exception("Unexpected exception caught, enforcing extra run, exception info: " + str(e))
|
|
return True
|
|
# Default return value
|
|
return True
|
|
|
|
@classmethod
|
|
def time_to_next_run(cls, checkpoint_dir, stanza, duration):
|
|
"""
|
|
Returns the number of seconds as int until the next run of the input is expected.
|
|
Note that a minimum of 1 second is enforced to avoid a python loop of death
|
|
constricting the system in rare checkpoint dir failure scenarios.
|
|
Snake pun entirely intentional (pythons constrict prey to death, like your cpu).
|
|
|
|
Arguments:
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved
|
|
stanza -- The stanza of the input being used
|
|
duration -- The frequency that the analysis ought to be performed
|
|
"""
|
|
|
|
try:
|
|
last_ran = cls.last_ran(checkpoint_dir, stanza)
|
|
last_target_time = last_ran + duration
|
|
time_to_next = last_target_time - time.time()
|
|
if time_to_next < 1:
|
|
time_to_next = 1
|
|
return time_to_next
|
|
except IOError:
|
|
# The file likely doesn't exist
|
|
logger.warning("Could not read checkpoint file for last time run, likely does not exist, if this persists debug input immediately")
|
|
return 1
|
|
except ValueError:
|
|
# The file could not be loaded
|
|
logger.error("Could not read checkpoint file for last time run, if this persists debug input immediately")
|
|
return 1
|
|
except Exception as e:
|
|
logger.exception("Unexpected exception caught, enforcing extra run, exception info: " + str(e))
|
|
return 1
|
|
# Default return value
|
|
logger.info("This really should be impossible, but whatevs if your input is breaking check the duration calculations")
|
|
return 1
|
|
|
|
@classmethod
|
|
def save_checkpoint(cls, checkpoint_dir, stanza, last_run):
|
|
"""
|
|
Save the checkpoint state.
|
|
|
|
Arguments:
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved
|
|
stanza -- The stanza of the input being used
|
|
last_run -- The time when the analysis was last performed
|
|
"""
|
|
|
|
fp = None
|
|
|
|
try:
|
|
fp = open(cls.get_file_path(checkpoint_dir, stanza), 'w')
|
|
|
|
d = {'last_run': last_run}
|
|
|
|
json.dump(d, fp)
|
|
|
|
except Exception:
|
|
logger.exception("Failed to save checkpoint directory")
|
|
finally:
|
|
if fp is not None:
|
|
fp.close()
|
|
|
|
@staticmethod
|
|
def is_expired(last_run, interval, cur_time=None):
|
|
"""
|
|
Indicates if the last run time is expired based .
|
|
|
|
Arguments:
|
|
last_run -- The time that the analysis was last done
|
|
interval -- The interval that the analysis ought to be done (as an integer)
|
|
cur_time -- The current time (will be automatically determined if not provided)
|
|
"""
|
|
|
|
if cur_time is None:
|
|
cur_time = time.time()
|
|
|
|
if (last_run + interval) < cur_time:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def checkpoint_data_exists(self, filename, checkpoint_dir=None):
|
|
'''Returns True if a checkpoint file exists with the given filename.'''
|
|
checkpoint_dir = checkpoint_dir or self._input_config.checkpoint_dir
|
|
return os.path.isfile(os.path.join(checkpoint_dir, filename))
|
|
|
|
def delete_checkpoint_data(self, filename, checkpoint_dir=None):
|
|
"""
|
|
Delete arbitrary checkpoint data.
|
|
|
|
Arguments:
|
|
filename -- The name of the file to create in the checkpoint directory.
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved. Should
|
|
be set only if the intent is to read data from the checkpoint directory
|
|
of a different modular input.
|
|
|
|
Returns:
|
|
True if the data is successfully saved, False otherwise.
|
|
"""
|
|
checkpoint_dir = checkpoint_dir or self._input_config.checkpoint_dir
|
|
try:
|
|
os.unlink(os.path.join(checkpoint_dir, filename))
|
|
return True
|
|
except IOError:
|
|
logger.exception('msg="IOError exception when deleting checkpoint data" checkpoint_dir="{}" filename="{}"'.format(checkpoint_dir, filename))
|
|
return False
|
|
|
|
def set_checkpoint_data(self, filename, data, checkpoint_dir=None):
|
|
"""
|
|
Save arbitrary checkpoint data as JSON.
|
|
|
|
Arguments:
|
|
filename -- The name of the file to create in the checkpoint directory.
|
|
data -- A Python data structure that can be converted to JSON.
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved. Should
|
|
be set only if the intent is to read data from the checkpoint directory
|
|
of a different modular input.
|
|
|
|
Returns:
|
|
True if the data is successfully saved, False otherwise.
|
|
Throws:
|
|
IOError if the checkpoint cannot be saved.
|
|
|
|
Note: The caller is repsonsible for ensuring that the filename does not
|
|
clash with other uses.
|
|
"""
|
|
checkpoint_dir = checkpoint_dir or self._input_config.checkpoint_dir
|
|
|
|
success = False
|
|
try:
|
|
with open(os.path.join(checkpoint_dir, filename), 'w') as fp:
|
|
json.dump(data, fp)
|
|
success = True
|
|
except IOError:
|
|
logger.exception('msg="IOError exception when saving checkpoint data" checkpoint_dir="{}" filename="{}"'.format(checkpoint_dir, filename))
|
|
except ValueError:
|
|
logger.exception('msg="ValueError when saving checkpoint data (perhaps invalid JSON)" checkpoint_dir="{}" filename="{}"'.format(checkpoint_dir, filename))
|
|
except Exception:
|
|
logger.exception('msg="Unknown exception when saving checkpoint data" checkpoint_dir="{}" filename="{}"'.format(checkpoint_dir, filename))
|
|
return success
|
|
|
|
def get_checkpoint_data(self, filename, checkpoint_dir=None, raise_known_exceptions=False):
|
|
"""
|
|
Get arbitrary checkpoint data from JSON.
|
|
|
|
Arguments:
|
|
filename -- The name of the file to retrieve in the checkpoint directory.
|
|
checkpoint_dir -- The directory where checkpoints ought to be saved. Should
|
|
be set only if the intent is to read data from the checkpoint directory
|
|
of a different modular input.
|
|
|
|
Returns:
|
|
data -- A Python data structure converted from JSON.
|
|
|
|
Throws:
|
|
IOError or Exception if the checkpoint cannot be read; ValueError for
|
|
malformed data. The caller should check if the file exists if it is
|
|
necessary to distinguish between invalid data versus missing data.
|
|
"""
|
|
checkpoint_dir = checkpoint_dir or self._input_config.checkpoint_dir
|
|
checkpoint_path = os.path.join(checkpoint_dir, filename)
|
|
data = None
|
|
|
|
try:
|
|
if os.path.isfile(checkpoint_path):
|
|
with open(checkpoint_path, 'r') as fp:
|
|
data = json.load(fp)
|
|
except (IOError, ValueError) as e:
|
|
logger.exception('msg="Exception when reading checkpoint data" checkpoint_dir="{}" filename="{}" exception="%s"'.format(checkpoint_dir, filename, e))
|
|
if raise_known_exceptions:
|
|
raise
|
|
except Exception:
|
|
logger.exception('msg="Unknown exception when reading checkpoint data" checkpoint_dir="{}" filename="{}"'.format(checkpoint_dir, filename))
|
|
raise
|
|
|
|
return data
|
|
|
|
def do_run(self, in_stream=sys.stdin, log_exception_and_continue=False):
|
|
"""
|
|
Read the config from standard input and return the configuration.
|
|
|
|
in_stream -- The stream to get the input from (defaults to standard input)
|
|
log_exception_and_continue -- If true, exceptions will not be thrown for invalid configurations and instead the stanza will be skipped.
|
|
"""
|
|
|
|
# Run the modular import
|
|
input_config = self.read_config(in_stream)
|
|
|
|
# Save input config for future use (contains the session key).
|
|
self._input_config = input_config
|
|
# TODO: Remove now-redundant refs to self.checkpoint_dir
|
|
self.checkpoint_dir = input_config.checkpoint_dir
|
|
|
|
# Is this input single instance mode?
|
|
single_instance = str(getattr(self, "use_single_instance", '')).strip().lower() in ["true", "t", "1"]
|
|
|
|
# Validate all stanza parameters.
|
|
stanzas = []
|
|
for stanza_name, unclean_stanza in input_config.configuration.items():
|
|
try:
|
|
stanzas.append(self.validate_parameters(unclean_stanza))
|
|
except FieldValidationException as e:
|
|
if log_exception_and_continue:
|
|
# Discard the invalid stanza.
|
|
logger.error("Discarding invalid input stanza '%s': %s" % (stanza_name, str(e)))
|
|
else:
|
|
raise e
|
|
|
|
# Call the modular input's "run" method on valid stanzas, handling two cases:
|
|
# a. If this script is in single_instance mode=true, we have received
|
|
# all stanzas in input_config.configuration. In this case,
|
|
# a duration parameter is not supported. The run() method may
|
|
# loop indefinitely, but we do not re-execute.
|
|
# b. If this script is in single_instance_mode=false, each stanza
|
|
# can have a duration parameter (or not) specifying the time
|
|
# at which the individual stanza will be re-executed.
|
|
#
|
|
# Note: The "duration" parameter emulates the behavior of the "interval"
|
|
# parameter available on Splunk 6.x and higher, and is mainly used by the
|
|
# VMWare application.
|
|
|
|
# TODO: A run() method may pass results back for optional processing
|
|
results = None
|
|
|
|
if stanzas:
|
|
if single_instance:
|
|
# Run the input across all defined stanzas and exit.
|
|
results = self.run(stanzas)
|
|
else:
|
|
# Retrieve the single input stanza.
|
|
stanza = stanzas[0]
|
|
try:
|
|
duration = int(stanza.get('duration', -1))
|
|
except ValueError as e:
|
|
# This should never happen unless the author of the modular input
|
|
# fails to specify "duration" as an IntegerField.
|
|
logger.error("Input stanza '%s' specified an invalid duration: %s" % (stanza.get('name', 'unknown'), str(e)))
|
|
# Exit with non-zero exit code so services/admin/inputstatus correctly reflects script status.
|
|
sys.exit(1)
|
|
|
|
# Save the checkpoint(s).
|
|
stanza_name = stanza.get('name', None)
|
|
|
|
# If there splunk 6.0 and interval field is defined, then ignore duration fields completely
|
|
if int(stanza.get("interval", -1)) >= 0 and splunk.version.__version__ >= '6.0':
|
|
# Run the single stanza and exit.
|
|
results = self.run(stanza)
|
|
else:
|
|
# Run duration field
|
|
if duration > 0 and self.checkpoint_dir:
|
|
# Make sure lastrun and duration is less than current time,
|
|
# otherwise if modular input is stopped and started (immediately)
|
|
# less than defined duration time then needs_another_run returns false always, which leads to
|
|
# modular input to run as single stanza and exist.
|
|
while True:
|
|
# TODO: Checkpoints will build up in the config directory when
|
|
# the input stanza changes. This should probably be modified to
|
|
# use the name of the input itself, unhashed. Name collisions would
|
|
# be a configuration error.
|
|
self.save_checkpoint(self.checkpoint_dir, stanza_name, int(time.time()))
|
|
results = self.run(stanza)
|
|
# Results processing, if any, could occur here.
|
|
time.sleep(ModularInput.time_to_next_run(self.checkpoint_dir, stanza_name, duration))
|
|
else:
|
|
# Duration is not defined
|
|
# Run the single stanza and exit for Splunk 5.x
|
|
results = self.run(stanza)
|
|
|
|
else:
|
|
logger.info("No input stanzas defined")
|
|
|
|
# Results processing, if any, could occur here.
|
|
|
|
def get_validation_data(self, in_stream=sys.stdin):
|
|
"""
|
|
Get the validation data from standard input
|
|
|
|
Arguments:
|
|
in_stream -- The stream to get the input from (defaults to standard input)
|
|
"""
|
|
|
|
val_data = {}
|
|
|
|
# Read everything from stdin
|
|
val_str = in_stream.read()
|
|
|
|
# Parse the validation XML
|
|
doc = xml.dom.minidom.parseString(val_str)
|
|
root = doc.documentElement
|
|
|
|
item_node = root.getElementsByTagName("item")[0]
|
|
if item_node:
|
|
|
|
name = item_node.getAttribute("name")
|
|
val_data["name"] = name
|
|
|
|
params_node = item_node.getElementsByTagName("param")
|
|
|
|
for param in params_node:
|
|
name = param.getAttribute("name")
|
|
|
|
if name and param.firstChild and param.firstChild.nodeType == param.firstChild.TEXT_NODE:
|
|
val_data[name] = param.firstChild.data
|
|
|
|
return val_data
|
|
|
|
def validate_parameters_from_cli(self, argument_array=None):
|
|
"""
|
|
Load the arguments from the given array (or from the command-line) and validate them.
|
|
|
|
Arguments:
|
|
argument_array -- An array of arguments (will get them from the command-line arguments if none)
|
|
"""
|
|
|
|
# Get the arguments from the sys.argv if not provided
|
|
if argument_array is None:
|
|
argument_array = sys.argv[1:]
|
|
|
|
# This is the list of parameters we will generate
|
|
parameters = {}
|
|
|
|
for i in range(0, len(self.args)):
|
|
arg = self.args[i]
|
|
|
|
if i < len(argument_array):
|
|
parameters[arg.name] = argument_array[i]
|
|
else:
|
|
parameters[arg.name] = None
|
|
|
|
# Now that we have simulated the parameters, go ahead and test them
|
|
self.validate_parameters("unnamed", parameters)
|
|
|
|
def execute(self, in_stream=sys.stdin, out_stream=sys.stdout):
|
|
"""
|
|
Get the arguments that were provided from the command-line and execute the script.
|
|
|
|
Arguments:
|
|
in_stream -- The stream to get the input from (defaults to standard input)
|
|
out_stream -- The stream to write the output to (defaults to standard output)
|
|
"""
|
|
|
|
try:
|
|
logger.info("Execute called")
|
|
|
|
if len(sys.argv) == 1:
|
|
# Run the modular input.
|
|
self.do_run(in_stream, log_exception_and_continue=True)
|
|
|
|
elif len(sys.argv) == 2:
|
|
|
|
if sys.argv[1].startswith('--username='):
|
|
# If a username command-line argument was provided, prompt for
|
|
# a password, try to obtain a session key, and run the modular
|
|
# input. This is only used to override session keys for testing purposes.
|
|
try:
|
|
password = getpass.getpass("Splunk password: ")
|
|
self._alt_session_key = splunk.auth.getSessionKey(sys.argv[1].split('=')[1], password)
|
|
except Exception:
|
|
logger.exception("Modular input: session key override failed.")
|
|
|
|
self.do_run(in_stream, log_exception_and_continue=True)
|
|
|
|
elif sys.argv[1] == "--scheme":
|
|
self.do_scheme(out_stream)
|
|
|
|
elif sys.argv[1] == "--validate-arguments":
|
|
logger.info("Modular input: validate arguments called")
|
|
|
|
# Exit with a code of -1 if validation failed
|
|
if self.do_validation() == False:
|
|
sys.exit(-1)
|
|
|
|
else:
|
|
self.usage(out_stream)
|
|
|
|
elif len(sys.argv) == 3 and sys.argv[1].startswith('--username=') and sys.argv[2].startswith('--password='):
|
|
# If username and password command-line arguments were provided,
|
|
# try to obtain a session key and run the modular input. This is
|
|
# only used to override session keys for testing purposes.
|
|
try:
|
|
self._alt_session_key = splunk.auth.getSessionKey(sys.argv[1].split('=')[1], sys.argv[2].split('=')[1])
|
|
except Exception:
|
|
logger.exception("Modular input: session key override failed.")
|
|
|
|
self.do_run(in_stream, log_exception_and_continue=True)
|
|
|
|
else:
|
|
self.usage(out_stream)
|
|
|
|
logger.info("Execution completed successfully")
|
|
|
|
except Exception as e:
|
|
|
|
logger.exception("Execution failed: %s" % (str(e)))
|
|
|
|
# Make sure to grab any exceptions so that we can print a valid error message
|
|
self.print_error(str(e), out_stream)
|
|
|
|
def gen_checkpoint_filename(self, stanza_name, modinput_name=None):
|
|
'''Generate a checkpoint filename for this stanza. Collision detection
|
|
is not performed explicitly, since we don't expect duplicate stanzas.
|
|
|
|
Parameters:
|
|
stanza_name - A string representing the stanza name, which is typically
|
|
in the form <modinput_name>://<stanza_name>
|
|
modinput_name - An alternate modular input name. This can be used to
|
|
construct a safe path to the checkpoint directory of a different
|
|
modular input, which is useful in situations where two modular inputs
|
|
are acting in a producer/consumer relationship.
|
|
|
|
Returns: The path to the checkpoint file corresponding to the stanza
|
|
and modular input name. The caller is repsonsible for ensuring that
|
|
the path can read/written.
|
|
'''
|
|
checkpoint_filename = stanza_name.split('://')[1] if '://' in stanza_name else stanza_name
|
|
if modinput_name:
|
|
return os.path.join(os.path.dirname(self._input_config.checkpoint_dir), modinput_name, checkpoint_filename)
|
|
return os.path.join(self._input_config.checkpoint_dir, checkpoint_filename)
|
|
|
|
def get_checkpoint_update_times(self, stanza_names, modinput_name=None):
|
|
'''Get the update timestamps for checkpointed files by stanza name.
|
|
|
|
Parameters:
|
|
|
|
stanza_names - A list of strings representing stanza names.
|
|
modinput_name - A string representing the name of another modular
|
|
input to derive checkpoint file update timstamps for, if this modular
|
|
input is acting as a consumer of the output of another modular input.
|
|
|
|
Returns: A list of tuples:
|
|
[(stanza_name, path_to_checkpoint_file, last_updated_timestamp),
|
|
...
|
|
]
|
|
|
|
'''
|
|
|
|
output = []
|
|
for stanza_name in stanza_names:
|
|
path = self.gen_checkpoint_filename(stanza_name, modinput_name)
|
|
if os.path.isfile(path):
|
|
try:
|
|
fstat = os.stat(path)
|
|
output.append((stanza_name, path, fstat.st_size, int(fstat.st_mtime)))
|
|
except IOError:
|
|
output.append((stanza_name, path, None, None))
|
|
else:
|
|
output.append((stanza_name, None, None, None))
|
|
return output
|
|
|
|
def is_configured(self, app=None, assume_true_on_error=False):
|
|
if app:
|
|
try:
|
|
app = App.get(App.build_id(app, '', 'nobody'), self._input_config.session_key)
|
|
return app.is_configured
|
|
except splunk.RESTException:
|
|
return assume_true_on_error
|
|
else:
|
|
return assume_true_on_error
|