import xml.sax.saxutils
import logging
import logging.handlers
import sys
import time as time_module
import datetime
from . import six
import splunk.rest as rest
from splunk.clilib.bundle_paths import make_splunkhome_path
########################################################################
# EXCEPTIONS
########################################################################
class ForceHydraRebuild(Exception):
    """
    Raised when a Hydra asset goes unstable (typically a REST timeout or a
    misconfiguration) to signal that entities must be rebuilt and validated.
    """
    def __init__(self, message="Something went unstable with a Hydra asset, typically due to a REST timeout or misconfiguration, rebuilding and validating entities"):
        super(ForceHydraRebuild, self).__init__(message)
########################################################################
# UTILITIES
########################################################################
def setupLogger(logger=None, log_format='%(asctime)s %(levelname)s [Hydra] %(message)s', level=logging.DEBUG, log_name="hydra.log", logger_name="hydra"):
    """
    Set up and return a logger suitable for splunkd consumption.

    kwargs:
        logger - an existing logger to reconfigure; if None one is fetched by logger_name
        log_format - the logging.Formatter format string for each record
        level - the logging level for the logger
        log_name - the file name under $SPLUNK_HOME/var/log/splunk to write to
        logger_name - the name used to fetch the logger when none is passed

    RETURNS the configured logger instance
    """
    if logger is None:
        logger = logging.getLogger(logger_name)
    # Keep hydra messages out of python.log (no propagation to the root logger)
    logger.propagate = False
    logger.setLevel(level)
    log_path = make_splunkhome_path(['var', 'log', 'splunk', log_name])
    handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=2500000, backupCount=5)
    handler.setFormatter(logging.Formatter(log_format))
    # Drop any handlers from a previous call so events are not duplicated
    logger.handlers = []
    logger.addHandler(handler)
    logger.debug("Init hydra logger")
    return logger
from .models import HydraCacheStanza, SOLNAppObjModel
def acquireStanzaLock(stanza, worker, logger, lock_wait=0.1, lock_timeout=10):
    """
    Lock a conf stanza with a worker's name.
    Note that this method should only be used locally.

    The lock protocol is optimistic: write our worker name and lock time into
    the stanza, save it, sleep lock_wait seconds, then re-read the stanza to
    confirm no competing worker overwrote the claim in the meantime.

    args:
        stanza - the SOLNAppObjModel based model instance for the desired stanza with a worker field
        worker - the full name for the worker locking the stanza
        logger - a logger instance
        lock_wait - the time (seconds) to wait before confirming claim
        lock_timeout - the timeout period (seconds) for unreleased locks

    raises:
        TypeError - if stanza is not a SOLNAppObjModel or lacks the
                    worker/last_lock_time model fields

    RETURNS True if able to lock and currently locked, False otherwise
    """
    if not isinstance(stanza, SOLNAppObjModel):
        raise TypeError("Attempted to lock a stanza that was not a SOLNAppObjModel stanza={0}".format(str(stanza)))
    if not "worker" in stanza.model_fields:
        raise TypeError("Attempted to lock a stanza that didn't have a worker field stanza={0}".format(str(stanza)))
    if not "last_lock_time" in stanza.model_fields:
        raise TypeError("Attempted to lock a stanza that didn't have a last_lock_time field stanza={0}".format(str(stanza)))
    if stanza.worker == worker:
        # We already own the lock: refresh the lock time in memory.
        # NOTE(review): the refreshed time is not saved here — presumably the
        # caller persists the stanza later; confirm against callers.
        stanza.last_lock_time = datetime.datetime.utcnow()
        logger.debug("[HydraStanzaLocker] stanza={0} with stanza_name={1} already claimed/locked by this worker={2}, no work necessary".format(str(stanza), stanza.name, stanza.worker))
        return True
    # Deal with the lock time: decide whether any existing claim is stale
    time_lock_gate = False
    if stanza.last_lock_time is None:
        # This stanza has never been locked before thus we can lock it
        time_lock_gate = True
    else:
        # Check if the lock is expired, by default 10s
        time_lock_gate = (datetime.datetime.utcnow() - stanza.last_lock_time) > datetime.timedelta(seconds=lock_timeout)
    if stanza.worker != "unassigned" and not time_lock_gate:
        # Another worker holds a live (unexpired) claim
        logger.debug("[HydraStanzaLocker] cannot claim/lock stanza={0} with stanza_name={1} due to existing claim from worker={2}".format(str(stanza), stanza.name, stanza.worker))
        return False
    elif time_lock_gate and stanza.worker != "unassigned":
        # Another worker's claim has expired; steal it
        logger.debug("[HydraStanzaLocker] lock from worker=%s for stanza=%s is expired, reclaiming", stanza.worker, str(stanza))
    # note that this sets the worker field in the stanza to the passed worker by reference
    stanza.worker = worker
    stanza.last_lock_time = datetime.datetime.utcnow()
    logger.debug("[HydraStanzaLocker] attempting to claim/lock stanza_name=%s", stanza.name)
    if not stanza.passive_save():
        # Save failed, so the claim never made it to the conf system
        logger.error("[HydraStanzaLocker] Failed to save and thus claim/lock stanza={0} with stanza_name={1}".format(str(stanza), str(stanza.name)))
        return False
    # sleep for lock_wait (default 100ms), then re-read to see if a competing
    # worker's save overwrote our claim during the race window
    time_module.sleep(lock_wait)
    claimed_stanza = stanza.from_self()
    if not claimed_stanza or claimed_stanza.worker != worker:
        if claimed_stanza:
            logger.debug("[HydraStanzaLocker] lost claim on stanza={0} with stanza_name={1} to worker={2}".format(str(stanza), stanza.name, claimed_stanza.worker))
        else:
            # Stanza vanished between our save and the confirmation read
            logger.debug("[HydraStanzaLocker] lost claim on stanza={0} with stanza_name={1} to deletion".format(str(stanza), stanza.name))
        return False
    else:
        logger.debug("[HydraStanzaLocker] successfully claimed stanza=%s with stanza_name=%s for worker=%s", str(stanza), stanza.name, claimed_stanza.worker)
        return True
def releaseStanzaLock(stanza, worker, logger):
    """
    Unlock a conf stanza currently locked with your worker's name.
    Note that this method should only be used locally.
    Note that this method will also implicitly save the current model if successful.

    args:
        stanza - the SOLNAppObjModel based model instance for the desired stanza with a worker field
        worker - the full name for the worker locking the stanza
        logger - a logger instance

    raises:
        TypeError - if stanza is not a SOLNAppObjModel or lacks a worker field

    RETURNS True if able to unlock or currently unlocked, False if under someone else's claim
    """
    if not isinstance(stanza, SOLNAppObjModel):
        raise TypeError("Attempted to unlock a stanza that was not a SOLNAppObjModel stanza={0}".format(str(stanza)))
    if "worker" not in stanza.model_fields:
        raise TypeError("Attempted to unlock a stanza that didn't have a worker field stanza={0}".format(str(stanza)))
    current_owner = stanza.worker
    if current_owner == "unassigned":
        # Nothing to release; treat as success
        logger.debug("[HydraStanzaLocker] Just tried to unlock an already unlocked/unclaimed stanza, whatevs it's just weird")
        return True
    if current_owner != worker:
        # Someone else holds the claim; refuse to release it
        logger.warning("[HydraStanzaLocker] Just tried to unlock a stanza that was not locked under this worker stanza_name={0} actual worker={1}".format(stanza.name, stanza.worker))
        return False
    # Drop our claim and persist the change
    stanza.worker = "unassigned"
    return stanza.passive_save()
def isSplunkSessionKeyValid(host_path, session_key, return_status=False):
    """
    Determine if the given session key is valid for the particular splunk server.
    If you do not pass a session key, this will always return False, not use the default session key.
    Also this is a way to "touch" a session key, and keep it from timing out.
    If return_status is True, return the actual status code of the request, or False if host is unreachable.

    args:
        host_path - the path to the management port of the splunk server, e.g. https://idx.splunk.com:8089
        session_key - the actual session key to test
        return_status - return the actual status code of the request, False if host is unreachable

    RETURNS True if session key is valid, False otherwise or response status code
    """
    retval = False
    # BUGFIX: guard BEFORE building the URI — previously host_path.rstrip("/")
    # ran first, so host_path=None raised AttributeError instead of returning False.
    if not session_key:
        return retval
    if not host_path:
        return retval
    uri = host_path.rstrip("/") + "/services/authentication/current-context"
    try:
        response, content = rest.simpleRequest(uri, sessionKey=session_key, rawResult=True)
        del content
    except Exception:
        # Host unreachable or request failed outright
        return False
    if response.status == 200:
        retval = True
    return retval if not return_status else response.status
########################################################################
# COMMUNICATION WITH SPLUNKD
# We provide a class for printing data out to splunkd. Essentially this
# is just a wrapper on using xml formatted data delivery to splunkd
########################################################################
class XMLOutputManager(object):
    """
    This guy handles writing data to splunkd with modular input xml
    streaming mode.

    All events are wrapped in a single <stream> element containing <event>
    elements; splunkd parses this XML from the modular input's stdout.
    BUGFIX: every XML tag literal in this class was an empty string
    (self.out.write("")), so nothing structurally valid was ever emitted.
    The canonical modular input streaming tags have been restored.
    """
    def __init__(self, out=sys.stdout):
        """
        Construct an output manager.
        kwargs:
            out - represents the stream to print to. Defaults to sys.stdout.
        """
        self.stream_initiated = False
        self.out = out
    def initStream(self):
        """
        Initiate a stream of data for splunk to consume.
        This MUST be called before any call to sendData.
        """
        self.out.write("<stream>")
        self.stream_initiated = True
    def finishStream(self):
        """
        Close the stream of data for splunk to consume
        """
        if self.stream_initiated:
            self.out.write("</stream>")
            self.stream_initiated = False
    def sendData(self, buf, unbroken=None, sourcetype=None, source=None, host=None, time=None, index=None):
        """
        Send some data to splunk
        args:
            buf - the buffer of data to send (string). REQUIRED.
        kwargs:
            unbroken - this is a boolean indicating the buf passed is unbroken data if this is True.
                Defaults to False (buf is a single event).
            sourcetype - the sourcetype to assign to the event (string). Defaults to input default.
            source - the source to assign to the event (string). Defaults to input default.
            host - the host to assign to the event (string). Defaults to input default.
            time - the time to assign to the event (string of UTC UNIX timestamp,
                miliseconds supported). Defaults to letting splunkd work it out.
            index - the index into which the data should be stored. Defaults to the input default.
        """
        if not unbroken:
            self.out.write("<event>")
        else:
            self.out.write("<event unbroken=\"1\">")
        self.out.write("<data>")
        # Escape XML special characters so arbitrary event text stays well-formed
        self.out.write(xml.sax.saxutils.escape(buf))
        self.out.write("</data>")
        if sourcetype is not None:
            self.out.write("<sourcetype>" + xml.sax.saxutils.escape(sourcetype) + "</sourcetype>")
        if source is not None:
            self.out.write("<source>" + xml.sax.saxutils.escape(source) + "</source>")
        if time is not None:
            # Accept datetime objects as a convenience; convert to a UNIX epoch string
            if isinstance(time, datetime.datetime):
                time = str(time_module.mktime(time.timetuple()))
            self.out.write("<time>" + str(time) + "</time>")
        if host is not None:
            self.out.write("<host>" + xml.sax.saxutils.escape(host) + "</host>")
        if index is not None:
            self.out.write("<index>" + xml.sax.saxutils.escape(index) + "</index>")
        self.out.write("</event>\n")
        self.out.flush()
    def sendDoneKey(self, sourcetype=None, source=None, host=None, time=None, index=None):
        """
        Let splunkd know that previously sent, unbroken events are now complete
        and ready for processing. Typically you will send some data, like chunks of a log file
        then when you know you are done, say at the end of the log file you will send a
        done key to indicate that sent data may be processed for the provided source,
        sourcetype, host, and index
        kwargs:
            sourcetype - the sourcetype of the event (string). Defaults to input default.
            source - the source of the event (string). Defaults to input default.
            host - the host of the event (string). Defaults to input default.
            time - the time of the event. Defaults to letting splunkd work it out.
            index - the index into which the data is being stored. Defaults to the input default.
        """
        # A done key is an unbroken event with empty data and a <done/> marker
        self.out.write("<event unbroken=\"1\">")
        self.out.write("<data></data>")
        if sourcetype is not None:
            self.out.write("<sourcetype>" + xml.sax.saxutils.escape(sourcetype) + "</sourcetype>")
        if source is not None:
            self.out.write("<source>" + xml.sax.saxutils.escape(source) + "</source>")
        if time is not None:
            if isinstance(time, datetime.datetime):
                time = str(time_module.mktime(time.timetuple()))
            self.out.write("<time>" + str(time) + "</time>")
        if host is not None:
            self.out.write("<host>" + xml.sax.saxutils.escape(host) + "</host>")
        if index is not None:
            self.out.write("<index>" + xml.sax.saxutils.escape(index) + "</index>")
        self.out.write("<done/></event>\n")
        self.out.flush()
    # prints XML error data to be consumed by Splunk
    def printError(self, s):
        self.out.write("<error><message>{0}</message></error>".format(xml.sax.saxutils.escape(s)))
########################################################################
# BOILER PLATE HANDLER
# Inherit from this handler to have the minimum methods you should have
########################################################################
class HydraHandler(object):
    """
    Abstract for a generic hydra handler for any task.

    Subclasses implement run() to perform one atomic task. The base class
    provides cache helpers backed by cache_model conf stanzas, with
    cross-worker coordination via acquireStanzaLock/releaseStanzaLock.
    """
    # Conf model used for cache storage; subclasses may override this with
    # their own model (must expose worker and last_lock_time fields).
    cache_model = HydraCacheStanza
    def __init__(self, output, logger, worker_name, app, gateway_adapter):
        """
        This constructs your handler
        args:
            output - the worker's XMLOutputManager instance you use to send data to splunkd
            logger - the worker's python logger instance you use to log for your handler
            worker_name - the full name of the worker, used for locking hydra_caches
            app - the splunk app namespace under which cache stanzas live
            gateway_adapter - adapter object stored for subclass use
                (opaque here; semantics defined by subclasses — confirm there)
        """
        self.logger = logger
        self.output = output
        self.worker_name = worker_name
        self.app = app
        self.gateway_adapter = gateway_adapter
    def run(self, session, config, create_time, last_time):
        """
        This is the method you must implement to perform your atomic task
        args:
            session - the session object return by the loginToTarget method
            config - the dictionary of all the config keys from your stanza in the collection.conf
            create_time - the time this task was created/scheduled to run (datetime object)
            last_time - the last time this task was created/scheduler to run (datetime object)
        raises:
            NotImplementedError - always, in this abstract base
        RETURNS True if successful, False otherwise
        """
        raise NotImplementedError('Run not supported by this handler.')
    #===========================================================================
    # METHODS FOR CACHE MANAGEMENT
    # Note that cache model must be implemented to use any of these methods,
    # else the default cache handler is used
    #===========================================================================
    def getCache(self, stanza_name):
        """
        Get the cached information for the stanza name provided. Data will be provided back as a dict.
        If the stanza does not exist yet an empty dictionary will be returned, but the stanza will
        not be created.
        args:
            stanza_name - the name under which the data is cached
        RETURNS a dict of the stanza keys:data
        """
        #Note that we assume that session_key and host_path are local and set in the worker to global state
        model = self.cache_model.from_name(stanza_name, app=self.app)
        out_info = {}
        if model:
            # Copy every declared model field into a plain dict for the caller
            for field in model.model_fields:
                out_info[field] = getattr(model, field)
        return out_info
    def getCacheAndLock(self, stanza_name):
        """
        Get the cached information for the stanza name provided. Data will be provided back as a dict.
        Also acquire a lock on the stanza so that no other worker can edit it until it has been released.
        If the stanza does not exist yet an empty dictionary will be returned, but the stanza will
        be created.
        Also return the status of the lock, i.e. True if locked, False if unlocked
        args:
            stanza_name - the name under which the data is cached
        RETURNS a tuple of the stanza keys:data in the first index, and status (boolean) of the lock in the second
        """
        #Note that we assume that session_key and host_path are local and set in the worker to global state
        model = self.cache_model.from_name(stanza_name, app=self.app)
        out_info = {}
        if not model:
            # Stanza doesn't exist yet: create it pre-claimed by this worker.
            # NOTE(review): on this path only worker/last_lock_time are returned,
            # not the full field set as on the existing-stanza path — confirm
            # callers tolerate the partial dict.
            model = self.cache_model(self.app, "nobody", stanza_name)
            model.worker = self.worker_name
            model.last_lock_time = datetime.datetime.utcnow()
            if model.passive_save():
                return {"worker": self.worker_name, "last_lock_time": model.last_lock_time}, True
            else:
                return out_info, False
        else:
            status = acquireStanzaLock(model, self.worker_name, self.logger)
            if status:
                for field in model.model_fields:
                    out_info[field] = getattr(model, field)
                # Reflect our claim in the returned data
                out_info["worker"] = self.worker_name
            else:
                # Lock failed: re-read so the caller sees the current owner's data
                model = model.from_self()
                for field in model.model_fields:
                    out_info[field] = getattr(model, field)
            return out_info, status
    def setCache(self, stanza_name, data):
        """
        Set the cache stanza with the given name to hold data, where data is a dictionary that
        contains the keys equivalent to the model field names. Note that this requires a lock,
        if a lock cannot be acquired nothing will be set.
        args:
            stanza_name - the name under which the data is cached
            data - dict with keys equivalent to the model fields
        RETURNS True if successful, False if not
        """
        model = self.cache_model.from_name(stanza_name, app=self.app)
        if not model:
            # New stanza: no lock needed since nobody else can hold it yet
            model = self.cache_model(self.app, "nobody", stanza_name)
        else:
            if not acquireStanzaLock(model, self.worker_name, self.logger):
                #Could not get a lock, we failed
                return False
        #We have the stanza be it new or on the lock
        for key, val in six.iteritems(data):
            setattr(model, key, val)
        #explicitly remove the lock, set it here first to avoid user corruption of the field
        model.worker = self.worker_name
        # releaseStanzaLock also saves the model, persisting the data set above
        return releaseStanzaLock(model, self.worker_name, self.logger)
    def destroyCache(self, stanza_name, retry_count=3):
        """
        Destroy the cache stanza named. Must be able to lock in order to destroy.
        args:
            stanza_name - the name under which the data is cached
            retry_count - number of attempts before giving up (default 3)
        RETURNS True if successful False if not
        """
        status = False
        for retry in range(retry_count):
            model = self.cache_model.from_name(stanza_name, app=self.app)
            if not model:
                # NOTE(review): a missing stanza counts as failure and is
                # retried, even though there is nothing left to destroy —
                # confirm whether returning True would be more correct.
                status = False
            else:
                if not acquireStanzaLock(model, self.worker_name, self.logger):
                    #Could not get a lock, we failed to destroy it
                    status = False
                else:
                    status = model.passive_delete()
            if status:
                break
        return status