You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

662 lines
34 KiB

# Copyright (C) 2005-2024 Splunk Inc. All Rights Reserved.
#Core Python Imports
import sys
import os
import time
import logging
import datetime
try:
from urllib.request import urlopen, Request
from urllib.error import HTTPError
except ImportError:
from urllib2 import urlopen, Request, HTTPError
#Splunk Imports
import splunk
import splunk.version as ver
import hydra_inframon
from .hydra_common import HydraCommon, HydraGatewayAdapter
from .models import HydraMetadataStanza, HydraSessionStanza, SplunkStoredCredential, HydraGatewayStanza
#Modify Path to include SA-Hydra-inframon/lib
from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-Hydra-inframon', 'lib']))
from SolnCommon.modinput import ModularInput, Field, ListField, DurationField
# Base modular input for Hydra worker processes: run() repeatedly asks the local
# hydra gateway for a job and hands it to the handler registered for the job's task.
class HydraWorker(ModularInput):
# Title and description handed to the modular input scheme in __init__.
title = "Hydra Worker"
description = "Perform Distributed Work"
# Implementations must set this to a dict of task -> handler class;
# initializeHandlers raises NotImplementedError when it is not a dict.
handlers = None
# Implementations must set this to their app namespace; run() treats a None
# value as an implementation error.
app = None
def __init__(self):
    """
    Create the XML output manager, declare the input arguments of the worker
    and register the scheme with the ModularInput base class.
    """
    self.output = hydra_inframon.XMLOutputManager()

    # Input arguments, declared in the order they are handed to the scheme
    name_arg = Field(
        "name", "Worker Name",
        "A name for your worker input to attach to all events that originate with it.",
        required_on_create=False)
    capabilities_arg = ListField(
        "capabilities", "Worker Capabilities",
        "A comma delimited list of job types that this worker can perform.",
        required_on_create=False)
    log_level_arg = Field(
        "log_level", "Logging Level",
        "This is the level at which the worker will log data.",
        required_on_create=True)
    duration_arg = DurationField(
        "duration", "Duration",
        "This is the minimum time between runs of the input should it exit for some reason",
        required_on_create=True)

    scheme_args = dict(
        title=self.title,
        description=self.description,
        use_external_validation="true",
        streaming_mode="xml",
        use_single_instance="false",
    )
    ModularInput.__init__(self, scheme_args, [name_arg, capabilities_arg, log_level_arg, duration_arg])

    # Nothing has been loaded or instantiated yet
    self.metadata_conf_mtime = None
    self.active_handlers = {}
def establishMetadata(self):
    """
    Load the local "metadata" stanza and store its parsed metadata fields on
    self.metadata. While the stanza cannot be found the lookup is retried
    every 10 seconds.
    RETURNS nothing
    """
    stanza = HydraMetadataStanza.from_name("metadata", self.app, "nobody")
    while not stanza:
        self.logger.warning("could not find metadata stanza going to sleep for 10 seconds and retrying...")
        time.sleep(10)
        stanza = HydraMetadataStanza.from_name("metadata", self.app, "nobody")
    self.metadata = stanza.parsed_wildcard_fields["metadata"]
    self.logger.info("successfully loaded metadata stanza with metadata fields=%s", list(self.metadata.keys()))
def _getMetadataForJob(self, job):
"""
Lookup the metadata for the given job
args:
job - JobTuple of the job whose metadata is desired
RETURNS a metadata dict
"""
return self.metadata.get(job.metadata_id, None)
def isJobExpired(self, job_id, job_tuple, metadata):
    """
    Decide whether a job has outlived its expiration period
    args:
        job_id - the id of the job
        job_tuple - the JobTuple namedtuple for the job
        metadata - the metadata dictionary associated with this job
    RETURNS True if create_time + expiration_period lies before the current UTC time, False otherwise
    """
    lifetime_seconds = int(job_tuple.expiration_period)
    now = datetime.datetime.utcnow()
    deadline = job_tuple.create_time + datetime.timedelta(seconds=lifetime_seconds)
    pool_name = metadata.get("pool_name", None)
    expired = deadline < now
    if expired:
        self.logger.debug("[pool=%s][isJobExpired] expired job=%s create_time=%s expiration_period=%s cur_time=%s task=%s target=%s",
                          pool_name, job_id, job_tuple.create_time, job_tuple.expiration_period, now, job_tuple.task, job_tuple.target)
    return expired
def getJob(self):
    """
    Poll the hydra gateway until a job is found that has metadata and has not expired.
    Jobs without metadata are dropped, expired jobs are dropped (and reported as
    failures when their task is atomic), and an empty gateway causes a 2 second sleep.
    RETURNS: a tuple of the job_id, named JobTuple, and the associated metadata
    """
    while True:
        try:
            candidate = self.gateway_adapter.get_next_job()
        except HTTPError as err:
            if err.code != 401:
                self.logger.exception(
                    "[getjob] unhandled HTTP error of code=%s, sleeping before retry, exception=%s", err.code, str(err))
                time.sleep(2)
            else:
                self.logger.error("[getJob] hydra gateway adapter failed to authenticate, re-establishing...")
                self.establishGateway()
            continue

        if candidate is None:
            self.logger.info("[getJob] could not find a job to do, sleeping before retry")
            time.sleep(2)
            continue

        candidate_id = candidate.name
        metadata = self._getMetadataForJob(candidate)
        if metadata is None:
            self.logger.error(
                "[getJob] could not find metadata information for job=%s and it will not be run, likely due to a removal of target=%s from collection configuration",
                candidate_id, candidate.target)
            continue

        pool_name = metadata.get("pool_name", None)
        if not self.isJobExpired(candidate_id, candidate, metadata):
            # Looks like we found a job to do
            break

        self.logger.error("[pool=%s][getJob] job=%s of task=%s for target=%s has expired and will not be run", pool_name, candidate_id, candidate.task, candidate.target)
        # A task is atomic when either the metadata or the job's special config lists it
        atomic = candidate.task in metadata.get("atomic_tasks", [])
        if not atomic and (type(candidate.special) == dict and len(candidate.special) > 0):
            atomic = candidate.task in candidate.special.get("atomic_tasks", [])
        if atomic:
            self.logger.error("[pool=%s][getJob] reporting expired atomic job as a job failure for job=%s of task=%s for target=%s",
                              pool_name, candidate_id, candidate.task, candidate.target)
            self.report_failed_atomic_job(candidate, pool_name)

    self.logger.debug("[pool=%s][getJob] found job to do, active job=%s", pool_name, candidate_id)
    return (candidate_id, candidate, metadata)
def report_failed_atomic_job(self, job_tuple, pool_name):
    """
    Tell the gateway that an atomic job failed. A 401 from the gateway triggers
    a gateway re-establish; any other HTTP error is logged and the report is skipped.
    @type job_tuple: JobTuple
    @param job_tuple: the JobTuple object for the failed job
    @param pool_name: pool name used only to tag the log messages
    @rtype: None
    @return None
    """
    try:
        self.gateway_adapter.report_failed_atomic_job(job_tuple)
        self.logger.info(
            "[pool=%s][ReportFailedAtomicJob] Successfully reported failed job to gateway for target=%s, task=%s, metadata_id=%s",
            pool_name, job_tuple.target, job_tuple.task, job_tuple.metadata_id)
    except HTTPError as err:
        if err.code != 401:
            self.logger.exception(
                "[pool=%s][ReportFailedAtomicJob] unhandled HTTP error of code=%s, skipping the update, exception=%s", pool_name,
                err.code, str(err))
            return
        self.logger.error(
            "[pool=%s][ReportFailedAtomicJob] hydra gateway adapter failed to authenticate, re-establishing...", pool_name)
        self.establishGateway()
def update_job_execution_info(self, time_spent, job_tuple, is_atomic, pool_name):
    '''
    Commit the execution info of a finished job (time spent and whether it was
    atomic) to the gateway. A 401 from the gateway triggers a gateway
    re-establish; any other HTTP error is logged and the update is skipped.
    @type time_spent: int
    @param time_spent: total time taken by the job
    @type job_tuple: JobTuple
    @param job_tuple: the JobTuple object for the completed job
    @type is_atomic: bool
    @param is_atomic: True if the job's task was atomic, False otherwise
    @param pool_name: pool name used only to tag the log messages
    @rtype: None
    @return None
    '''
    try:
        self.gateway_adapter.commit_job_exec_info(time_spent, job_tuple, is_atomic)
        self.logger.info(
            "[pool=%s][UpdateJobTime] Successfully reported time to gateway for target=%s, task=%s, metadata_id=%s, time=%s",
            pool_name, job_tuple.target, job_tuple.task, job_tuple.metadata_id, time_spent)
    except HTTPError as err:
        if err.code != 401:
            self.logger.exception(
                "[pool=%s][UpdateJobTime] unhandled HTTP error of code=%s, skipping the update, exception=%s", pool_name, err.code,
                str(err))
            return
        self.logger.error("[pool=%s][UpdateJobTime] hydra gateway adapter failed to authenticate, re-establishing...", pool_name)
        self.establishGateway()
def initializeHandlers(self):
    """
    Instantiate every handler class in the handlers dict whose task is covered
    by this worker's capabilities and store the instances in active_handlers.
    A capability of "*" covers every task. Capabilities without a handler are
    reported with an error log but do not stop initialization.
    RAISES NotImplementedError if the handlers property is not a dict
    RETURNS nothing
    """
    if not isinstance(self.handlers, dict):
        self.logger.error("Problem with the handlers property, right now it is of type {0} and value {1}".format(
            str(type(self.handlers)), str(self.handlers)))
        raise NotImplementedError(
            "HydraWorker processes MUST have a handlers property consisting of a dict of task to handler classes")

    # run() falls back to the plain string "*" when no capabilities are configured.
    # A string has no remove() and "in" is a substring test on it, so normalize
    # to a list of capability names before using it.
    try:
        string_types = (basestring,)  # Python 2
    except NameError:
        string_types = (str,)
    capabilities = self.worker_capabilities
    if isinstance(capabilities, string_types):
        capabilities = [name.strip() for name in capabilities.split(",") if name.strip()]
    else:
        capabilities = list(capabilities)
    handles_everything = "*" in capabilities

    #Iterate on all handlers, instantiate them
    self.active_handlers = {}
    for handled_task, task_handler in self.handlers.items():
        if not handles_everything and handled_task not in capabilities:
            self.logger.info(
                "Worker has handler for task outside capabilities, will not instantiate handler for task={0}".format(
                    handled_task))
            continue
        self.active_handlers[handled_task] = task_handler(self.output, self.logger, self.worker_name, self.app,
                                                          self.gateway_adapter)
        self.logger.info("Worker instantiated handler for task={0}".format(handled_task))

    # Anything left over (other than the wildcard) was requested but has no handler
    unhandled = [name for name in capabilities if name != "*" and name not in self.active_handlers]
    if unhandled:
        self.logger.error(
            "worker failed to supply handler for the following tasks: {0}, any attempt to execute these tasks will fail.".format(
                str(unhandled)))
def loginToTarget(self, target, user, password, realm):
    """
    Abstract hook: every hydra worker implementation has to override this with
    its domain specific login. The returned session object is what gets passed
    to any handler performing a job against the target.
    args:
        target - the uri to the domain specific asset we need to log in to
        user - the user name stored in splunkd associated with that target
        password - the password stored in splunkd associated with that target
        realm - the realm if in realm mode, else None
    RETURNS the session object for this implementation
    """
    reason = "All HydraWorker implementations must overload loginToTarget with the valid domain specific login logic."
    raise NotImplementedError(reason)
def isSessionValid(self, session):
    """
    Abstract hook: every hydra worker implementation has to override this with
    its domain specific check of whether a session returned by loginToTarget
    can still be used by a handler.
    args:
        session - the python object returned by loginToTarget to be tested
    RETURNS True if session is valid, False if it must be refreshed
    """
    reason = "All HydraWorker implementations must overload isSessionValid with the domain specific logic for checking session state."
    raise NotImplementedError(reason)
def getPassword(self, realm, user):
    """
    Look up the clear password for a realm and user through
    SplunkStoredCredential, logging a warning when nothing is stored.
    args:
        realm - the realm associated with the stored credential
        user - the user name associated with the stored credential
    RETURNS the clear string of the password, None if not found
    """
    #note we are relying on splunk's internal automagical session_key storage
    clear_password = SplunkStoredCredential.get_password(realm, user, app=self.app)
    if clear_password is not None:
        return clear_password
    self.logger.warning(
        "Could not find a stored credential for realm={0} and user={1}, sending None to loginToTarget".format(
            realm, user))
    return None
def acquireStanzaLock(self, stanza, lock_wait=0.1, lock_timeout=10):
    """
    Try to lock a conf stanza under this worker's name by delegating to
    hydra_inframon.acquireStanzaLock.
    Note that this method should only be used locally.
    args:
        stanza - the SOLNAppObjModel based model instance for the desired stanza with a worker field
        lock_wait - forwarded unchanged to hydra_inframon.acquireStanzaLock
        lock_timeout - forwarded unchanged to hydra_inframon.acquireStanzaLock
    RETURNS True if able to lock and currently locked, False otherwise
    """
    locked = hydra_inframon.acquireStanzaLock(stanza, self.worker_name, self.logger, lock_wait, lock_timeout)
    return locked
def releaseStanzaLock(self, stanza):
    """
    Release a conf stanza lock held under this worker's name by delegating to
    hydra_inframon.releaseStanzaLock.
    Note that this method should only be used locally.
    args:
        stanza - the SOLNAppObjModel based model instance for the desired stanza with a worker field
    RETURNS True if able to unlock or currently unlocked, False if under someone else's claim
    """
    released = hydra_inframon.releaseStanzaLock(stanza, self.worker_name, self.logger)
    return released
def updateSessionStanza(self, session_stanza, target, username, realm):
    """
    Loop until the session stanza holds a session that isSessionValid accepts.
    Each pass re-reads the stanza; when its session is invalid the worker tries
    to lock the stanza, logs in to the target again and saves the new session.
    args:
        session_stanza - the session stanza model instance for the target
        target - the target the session belongs to
        username - the user name used to log in to the target
        realm - the credential realm, falsy to use the target as the realm
    RAISES hydra_inframon.ForceHydraRebuild if the stanza cannot be re-read more than RETRY_LIMIT times
    RETURNS the refreshed session stanza holding a valid session
    """
    RETRY_LIMIT = 5
    retry_count = 0
    cred_realm = realm if realm else target
    while True:
        # Keep the last good stanza object: overwriting it with a failed
        # (falsy) refresh would make the retry dereference None.
        refreshed_stanza = session_stanza.from_self()
        if not refreshed_stanza:
            self.logger.warning(
                "[getSessionForTarget] problem getting session stanza for target=%s and username=%s and realm=%s, retrying...",
                target, username, realm)
            retry_count += 1
            if retry_count > RETRY_LIMIT:
                raise hydra_inframon.ForceHydraRebuild(
                    "[getSessionForTarget] problem getting session stanzas in hydra_stanza.conf for target={0} and username={1} and realm={2}".format(
                        target, username, realm))
            time.sleep(15)
            continue
        session_stanza = refreshed_stanza

        if self.isSessionValid(session_stanza.session):
            self.logger.debug(
                "[getSessionForTarget] found valid session=%s for target=%s and username=%s and realm=%s",
                session_stanza.session, target, username, realm)
            return session_stanza

        self.logger.warning(
            "[getSessionForTarget] could neither claim the session stanza nor use the existing session, re-trying to lock for target={0} with username={1}".format(
                target, username))
        if not self.acquireStanzaLock(session_stanza, lock_wait=1, lock_timeout=45):
            #Someone else holds the lock; wait and then re-check whether they refreshed the session
            self.logger.warning(
                "[getSessionForTarget] could not claim invalid session stanza to update it, sleeping for 15sec")
            time.sleep(15)
            continue

        try:
            self.logger.debug(
                "[getSessionForTarget] about to rebuild session=%s with password for target=%s and username=%s and realm=%s",
                session_stanza.session, target, username, realm)
            password = self.getPassword(cred_realm, username)
            session = self.loginToTarget(target, username, password, realm)
            self.logger.debug(
                "[getSessionForTarget] rebuilt into session=%s with password for target=%s and username=%s and realm=%s",
                session, target, username, realm)
            session_stanza.session = session
            if not session_stanza.passive_save():
                self.logger.error(
                    "[getSessionForTarget] unable to save session for target=%s and username=%s and realm=%s",
                    target, username, realm)
            else:
                self.logger.debug(
                    "[getSessionForTarget] successfully saved session=%s for target=%s and username=%s and realm=%s",
                    session_stanza.session, target, username, realm)
        finally:
            # Always give the lock back, even when the login or save raised,
            # so other workers are not blocked until the lock expires.
            if not self.releaseStanzaLock(session_stanza):
                self.logger.error(
                    "[getSessionForTarget] unable to release claim on session for target=%s and username=%s and realm=%s unless it was claimed by another worker lock will remain until expiration",
                    target, username, realm)
        self.logger.debug(
            "[getSessionForTarget] returning session=%s with password for target=%s and username=%s and realm=%s",
            session_stanza.session, target, username, realm)
def getSessionForTarget(self, target, username, realm):
    """
    Return a session for the target that has been checked to be valid, creating
    the target's session stanza first when none exists yet.
    args:
        target - the target host for which to get the session
        username - the username to use for this particular target
        realm - the authentication realm if in realm mode, else None
    RAISES hydra_inframon.ForceHydraRebuild if the session stanza cannot be created in four attempts
    RETURNS a valid session object to the target, or None when target or username is None
    """
    if target is None:
        self.logger.debug(
            "target passed as None to getSessionForTarget, cannot get a session for target of None, will return None.")
        return None
    if username is None:
        self.logger.debug(
            "username passed as None to getSessionForTarget, cannot get a session without a username, will return None.")
        return None

    stanza = HydraSessionStanza.from_name(target, app=self.app)
    if not stanza:
        self.logger.debug("[getSessionForTarget] No active/inactive session found for target, creating new one...")
        for attempt_number in range(4):
            # Write an unassigned stanza, then read it back to confirm it exists
            stanza = HydraSessionStanza(self.app, "nobody", target)
            stanza.worker = "unassigned"
            stanza.last_lock_time = datetime.datetime.utcnow()
            stanza.passive_save()
            stanza = HydraSessionStanza.from_name(target, app=self.app)
            if stanza:
                break
            self.logger.error(
                "[getSessionForTarget] Could not confirm creation of session stanza in attempt={0} even though it was just created will try again".format(
                    str(attempt_number)))
            time.sleep(5)
        #If after four tries there's no session stanza, we failed just raise it and force a restart
        if not stanza:
            raise hydra_inframon.ForceHydraRebuild(
                "[getSessionForTarget] Problem creating session stanzas in hydra_stanza.conf for target={0} and username={1}".format(
                    target, username))

    #A session stanza exists at this point: claim it if needed and make sure its session is valid
    stanza = self.updateSessionStanza(stanza, target, username, realm)
    return stanza.session
def assignJobToHandler(self, job):
    """
    Run the given job on the active handler registered for its task and report
    the outcome to the gateway (execution time on success, failure report for
    failed atomic jobs).
    args:
        job - the active and to be performed tuple of job_id, job_tuple, metadata
    RAISES NotImplementedError if no active handler exists for the job's task
    RETURNS True if job successful, False if job failed
    """
    #Acquire Proper Handler
    job_id, job_tuple, metadata = job
    handler = self.active_handlers.get(job_tuple.task, None)
    if handler is None:
        raise NotImplementedError("No handler found for task={0}".format(job_tuple.task))

    #Establish a config that is the metadata updated with anything from the special if special is not empty
    special = job_tuple.special
    if isinstance(special, dict) and special:
        #metadata is copied to prevent corruption of the base metadata
        config = metadata.copy()
        config.update(special)
    else:
        #since there is no special we assume that the handler is not going to fuss with our metadata ref
        config = metadata

    #Check For Atomic Job
    is_atomic = job_tuple.task in config.get("atomic_tasks", [])

    #Get Session For Target
    session = self.getSessionForTarget(job_tuple.target, config["username"], config["realm"])

    #Execute Job
    self.logger.info("Assigning job=%s of task=%s with config=%s to handler=%s for target=%s", job_id, job_tuple.task, config,
                     handler, job_tuple.target)
    pool_name = config.get("pool_name", None)
    job_start_time = time.time()
    succeeded = handler.run(session, config, job_tuple.create_time, job_tuple.last_time)
    # Stop the clock before logging so only the handler's work is measured
    total_time = time.time() - job_start_time

    if succeeded:
        self.logger.info("[pool={0}]Successfully completed job={1} of task={2} for target={3}".format(pool_name, job_id, job_tuple.task, job_tuple.target))
        self.update_job_execution_info(total_time, job_tuple, is_atomic, pool_name)
        return True

    if is_atomic:
        self.report_failed_atomic_job(job_tuple, pool_name)
    self.logger.error('[pool=%s]Failed to complete job="%s" of task="%s" against target="%s"', pool_name, job_id, job_tuple.task,
                      job_tuple.target)
    self.logger.debug("[pool=%s]Failed job=%s was passed a config=%s, session=%s, create_time=%s, and last_time=%s",
                      pool_name, job_id, str(config), str(session), str(job_tuple.create_time), str(job_tuple.last_time))
    return False
def establishGateway(self):
    """
    Establish the adapter to the hydra gateway on this node and store it on
    self.gateway_adapter (and on every active handler).
    The gateway port is read from the "gateway" stanza, defaulting to 8008 when
    the stanza or an integer port is unavailable. Up to four connection
    attempts are made; each failure is logged and the next attempt follows.
    RAISES hydra_inframon.ForceHydraRebuild if no attempt produced an adapter
    RETURNS nothing
    """
    #Read configuration from splunkd:
    stanza = HydraGatewayStanza.from_name("gateway", "SA-Hydra-inframon", session_key=self.session_key)
    if not stanza or not isinstance(stanza.port, int):
        self.logger.warning(
            "[establishGateway] could not read gateway configuration from splunkd, defaulting to port 8008")
        gateway_port = 8008
    else:
        gateway_port = stanza.port
    self.gateway_uri = "https://" + splunk.getDefault("host") + ":" + str(gateway_port)
    #Authenticate with gateway
    hga = None
    for retry in range(4):
        self.logger.info("[establishGateway] attempting to connect to gateway=%s for node=%s ...", self.gateway_uri,
                         self.node_path)
        try:
            hga = HydraGatewayAdapter(self.node_path, self.session_key, self.gateway_uri)
            self.logger.info("[establishGateway] successfully connected to gateway=%s for node=%s",
                             self.gateway_uri, self.node_path)
            self.logger.debug("[establishGateway] resetting gateway adapter for active_handlers=%s",
                              self.active_handlers)
            #Handlers keep their own reference to the adapter, so point them at the new one
            for handler in self.active_handlers.values():
                handler.gateway_adapter = hga
        except splunk.SplunkdConnectionException:
            self.logger.error(
                "[establishGateway] could not connect to gateway=%s for node=%s due to a socket error, timeout, or other fundamental communication issue",
                self.gateway_uri, self.node_path)
        except splunk.AuthenticationFailed:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd authentication issue, this is fatal, forcing rebuild",
                self.gateway_uri, self.node_path)
        except splunk.LicenseRestriction:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd license issue, this is fatal, forcing rebuild",
                self.gateway_uri, self.node_path)
        except splunk.AuthorizationFailed:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd user permissions issue, this is fatal, forcing rebuild",
                self.gateway_uri, self.node_path)
        except splunk.ResourceNotFound:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to missing hydra gatekeeper EAI endpoint, this is fatal, forcing rebuild",
                self.gateway_uri, self.node_path)
        except splunk.InternalServerError as e:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to internal server error=%s",
                self.gateway_uri, self.node_path, str(e))
        except splunk.BadRequest as e:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to bad request error=%s",
                self.gateway_uri, self.node_path, str(e))
        except splunk.RESTException as e:
            self.logger.error(
                "[establishGateway] could not authenticate with gateway=%s for node=%s due to some crazy REST error=%s",
                self.gateway_uri, self.node_path, str(e))
        if hga is not None:
            break
        # Logger.warn is a deprecated alias; use warning like the rest of the class
        self.logger.warning("[establishGateway] failed to establish gateway on try=%s", str(retry + 1))
    if hga is None:
        raise hydra_inframon.ForceHydraRebuild("Could not authenticate with local Hydra Gateway")
    self.gateway_adapter = hga
def cry(self, message):
    """
    Append a timestamped stanza with this worker's head name and the given
    reason to the app's local hydra_health.conf. When read by the scheduler
    this should result in a total refresh of this process.
    Any failure while writing is logged as an error and otherwise ignored.
    :'(
    RETURNS: nothing
    """
    try:
        health_conf_path = make_splunkhome_path(['etc', 'apps', self.app, 'local', 'hydra_health.conf'])
        with open(health_conf_path, 'a') as health_conf:
            stanza_lines = [
                "[" + str(time.time()) + "]",
                "head=" + self.head_name,
                "reason=" + str(message),
            ]
            health_conf.write(str("\n".join(stanza_lines) + "\n"))
    except Exception as err:
        self.logger.error(str(err))
def run(self, stanza):
    """
    Entry point of the modular input: configure logging, connect to the hydra
    gateway, instantiate the handlers and then loop forever fetching jobs and
    assigning them to handlers.
    args:
        stanza - the input configuration, either a dict or a list whose first item is that dict
    RETURNS False after an unexpected error when the legacy exit behavior applies
            (no positive interval configured, or Splunk older than 6.0);
            every other error path ends the process with sys.exit(1)
    """
    #Handle configuration parsing and logging setup
    config = stanza[0] if isinstance(stanza, list) else stanza
    self.worker_name = config.get('name', None)
    worker_log_level = config.get("log_level", "WARN").upper()
    self.worker_capabilities = config.get("capabilities", "*")
    is_interval_field_defined = int(config.get("interval", -1)) > 0
    input_config = self._input_config
    self.head_name = self.worker_name.split("/", 1)[1].lstrip("/")
    logname = "hydra_inframon_worker_" + self.worker_name.replace("://", "_") + ".log"
    self.worker_name_full = self.worker_name + ":" + str(os.getpid())
    log_format = '%(asctime)s %(levelname)s [' + self.worker_name_full + '] %(message)s'
    if worker_log_level not in ["DEBUG", "INFO", "WARN", "WARNING", "ERROR"]:
        worker_log_level = logging.WARN
        self.logger = hydra_inframon.setupLogger(logger=None, log_format=log_format,
                                                 level=worker_log_level, log_name=logname)
        self.logger.warning("log_level was set to a non-recognizable level it has be reset to WARNING level")
    else:
        self.logger = hydra_inframon.setupLogger(logger=None, log_format=log_format,
                                                 level=worker_log_level, log_name=logname)
    self.logger.debug("logger reset with log level of {0}".format(worker_log_level))

    def use_legacy_exit_behavior():
        ## Old behavior is required for netapp, remove me when netapp support splunk 6.0.x version onwards
        if not is_interval_field_defined:
            return True
        version_string = str(ver.__version__)
        try:
            # Compare the major version numerically: as plain strings
            # "10.0.0" < "6.0" is True, which would misclassify Splunk 10+.
            return int(version_string.split(".", 1)[0]) < 6
        except ValueError:
            # Unparseable version string, fall back to the plain string comparison
            return version_string < '6.0'

    #Handle local authentication automagically
    splunk.setDefault('sessionKey', input_config.session_key)
    try:
        #Set up gateway
        self.node_path = splunk.mergeHostPath()
        self.session_key = input_config.session_key
        self.establishGateway()
        #Set up worker handlers and capabilities
        self.logger.info(
            "Initiating worker={0} with capabilities={1} and handlers={2}".format(self.worker_name_full,
                                                                                  str(self.worker_capabilities),
                                                                                  str(self.handlers)))
        #Confirm we have been implemented correctly
        if self.app is None:
            raise NotImplementedError(
                "All workers must implement a self.app property in order to establish namespace")
        self.app_home = os.path.join(make_splunkhome_path(['etc', 'apps']), self.app)
        self.initializeHandlers()
        self.output.initStream()
        # The loop only ends through an exception, so the stream is closed in the handlers below
        while True:
            #Refresh the metadata on the worker if necessary
            metadata_mtime = HydraCommon.getConfModTime(self.app_home, "metadata")
            if self.metadata_conf_mtime == None or self.metadata_conf_mtime < metadata_mtime:
                self.establishMetadata()
                self.metadata_conf_mtime = metadata_mtime
            #Ask for our new job
            cur_job = self.getJob()
            #handle it
            self.assignJobToHandler(cur_job)
            #oh yeah we handled that
            self.logger.debug("handled job=%s of task=%s", cur_job[0], cur_job[1].task)
            #Let's try to give splunk a chance to respond to things
            time.sleep(0.1)
    except splunk.AuthenticationFailed as e:
        self.output.finishStream()
        if use_legacy_exit_behavior():
            self.cry("Unrecoverable Local Authentication Failure")
            self.logger.error(
                "Crying due to unrecoverable problem with hydra worker {0}: {1}".format(self.worker_name_full, str(e)))
        self.logger.error(
            "Exiting current run of hydra worker, expecting restart by scheduler on next job assignment run")
        sys.exit(1)
    except hydra_inframon.ForceHydraRebuild as e:
        self.output.finishStream()
        if use_legacy_exit_behavior():
            self.cry("Unrecoverable Hydra Problem")
            self.logger.error(
                "Crying due to unrecoverable problem with hydra worker {0}: {1}".format(self.worker_name_full, str(e)))
        self.logger.error(
            "Exiting current run of hydra worker, expecting restart by scheduler on next job assignment run")
        sys.exit(1)
    except Exception as e:
        self.output.finishStream()
        self.logger.exception("Problem with hydra worker {0}: {1}".format(self.worker_name_full, str(e)))
        self.logger.warning(
            "Exiting current run of hydra worker, expecting to restart based on duration or interval")
        if use_legacy_exit_behavior():
            return False
        sys.exit(1)