You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2239 lines
110 KiB

# Copyright (C) 2005-2021 Splunk Inc. All Rights Reserved.
#CORE PYTHON IMPORTS
import sys
import os
import time
import datetime
import math
import logging
from httplib2 import ServerNotFoundError
#Splunk Python does not bundle UUID, so we've included it in the hydra bin, but it is a core python module
import uuid
#CORE SPLUNK IMPORTS
import splunk
from splunk.util import normalizeBoolean
from splunk.rest import simpleRequest
import splunk.auth as auth
import splunk.entity as en
import splunk.version as ver
#SA-HYDRA-inframon IMPORTS
from . import six
from hydra_inframon import XMLOutputManager, setupLogger, isSplunkSessionKeyValid, ForceHydraRebuild
from .models import HydraHealthStanza, HydraMetadataStanza, HydraNodeStanza, SplunkStoredCredential, HydraGatewayStanza
from .hydra_common import HydraCommon, HydraGatewayAdapter, JobTuple
#Modify Path to include SA-Hydra-inframon/lib
from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-Hydra-inframon', 'lib']))
from SolnCommon.modinput import ModularInput, Field, DurationField
#Utility Functions
def makeFieldID():
    """
    Build an identifier that is safe to embed in a splunk wildcard field name.

    RETURNS the 32 character hexadecimal form of a freshly generated uuid1,
    i.e. the uuid string without its dashes
    """
    field_uuid = uuid.uuid1()
    return field_uuid.hex
#Classes for data, might want to put these in a module by themselves at some point
class HydraConfigToken(object):
    """
    A config token that can be sent to any worker.

    A token couples a task with a target and carries the scheduling state
    (interval, next schedule time, atomic lock) used to turn it into jobs.
    """

    def __init__(self, target, username, task, metadata_id, logger, initial_schedule_offset=0, metadata=None,
                 special=None, initial_aggregate_execution_time=5):
        """
        Task and Target are the job type to perform and the target host to perform
        it on.
        ARGS:
        @type target: str
        @param target: The external system/resource upon which to perform the task
        @type username: str
        @param username: The username to use when logging into the target
        @type task: str
        @param task: The task or type of job to perform on the target
        @type metadata_id: str
        @param metadata_id: the name of the metadata to use when performing the task
        @type logger: logging.logger reference
        @param logger: the logger to use when writing out messages to the scheduler log
        @type initial_schedule_offset: int
        @param initial_schedule_offset: the delta from the current time at which to begin scheduling jobs from this
            config token, used only when metadata has no "<task>_offset" entry
        @type metadata: dict
        @param metadata: the collection configuration information for this particular config token to determine the
            scheduling properties. Defaults to a new empty dict per token.
        @type special: dict
        @param special: the metadata specific to this config token only that will be stored in the job tuple.
            Defaults to a new empty dict per token.
        @type initial_aggregate_execution_time: int
        @param initial_aggregate_execution_time: the initial weight of jobs created by this config token, used only
            when metadata has no "<task>_exectime" entry
        """
        # A literal {} default is created once and shared by every call, and both
        # dicts are stored on the instance, so build a fresh one per token instead.
        # Dicts supplied by the caller are still stored by reference.
        if metadata is None:
            metadata = {}
        if special is None:
            special = {}
        #Basic stuff
        self.target = target
        self.username = username
        self.task = task
        self.logger = logger
        self.metadata_id = metadata_id
        self.special = special
        self.metadata = metadata
        # read default offset from config if it exists
        self.offset = metadata.get(task + "_offset", initial_schedule_offset)
        #Handle all work around scheduling this thing
        interval_param_name = task + "_interval"
        if interval_param_name in metadata:
            self.interval = metadata[interval_param_name]
        else:
            self.logger.error(
                "Could not establish configured interval for job type %s, setting to default of 60 seconds", self.task)
            self.interval = 60
        #Expiration period
        expiration_param_name = task + "_expiration"
        if expiration_param_name in metadata:
            self._expiration_period = metadata[expiration_param_name]
        else:
            self.logger.error(
                "Could not establish configured expiration period for job type %s, setting to default of same as interval",
                self.task)
            self._expiration_period = self.interval
        #Priority Modification
        priority_param_name = task + "_priority"
        if priority_param_name in metadata:
            self._priority_adjustment = metadata[priority_param_name]
            self.logger.debug("Established configured priority adjustment for job type %s, setting to %s", self.task,
                              self._priority_adjustment)
        else:
            self.logger.debug(
                "Could not establish configured priority adjustment for job type %s, setting to default of 0",
                self.task)
            self._priority_adjustment = 0
        #Task execution initial estimated time
        task_exec_param_name = task + "_exectime"
        if task_exec_param_name in metadata:
            self._execution_time = metadata[task_exec_param_name]
            self.logger.debug("Established configured execution time for job task=%s, setting to %s", self.task,
                              self._execution_time)
        else:
            self.logger.debug("Could not establish configured execution time for job task=%s, setting to default %s",
                              self.task, initial_aggregate_execution_time)
            self._execution_time = initial_aggregate_execution_time
        #Atomic Task Handling
        self._assigned = False
        confirmation_expiration_param_name = task + "_confirmation_expiration"
        if confirmation_expiration_param_name in metadata:
            self._confirmation_expiration = metadata[confirmation_expiration_param_name]
            self.logger.debug(
                "Established configured atomic confirmation expiration period for job task=%s, setting to %s",
                self.task, self._confirmation_expiration)
        else:
            self._confirmation_expiration = 2 * self.interval
            self.logger.debug(
                "Could not establish configured atomic confirmation expiration period for job task=%s, setting to default of 2*interval, %s",
                self.task, self._confirmation_expiration)
        # assignment_info is only used for atomic completion or failure tracking
        self.assignment_info = None
        self.atomic = task in metadata.get("atomic_tasks", [])
        # All schedule times are naive datetimes expressed in UTC
        self._parse_time = datetime.datetime.utcnow()
        self._schedule_time = self._parse_time + datetime.timedelta(seconds=self.offset)
        self._last_time = self._parse_time
        #This is used to know who to preferably assign this particular job to
        self.worker_affinity = []

    def __str__(self):
        """
        Return the job type, username, and target as representation for this config token
        RETURNS string representation of this config token
        """
        return "HydraConfigToken(task={0}, target={1}, username={2})".format(self.task, self.target, self.username)

    def __repr__(self):
        """
        Return the job type, username, and target as representation for this config token
        RETURNS string representation of this config token
        """
        return "HydraConfigToken(task={0}, target={1}, username={2})".format(self.task, self.target, self.username)

    def getExecTime(self):
        '''
        @return: return execution time for token
        '''
        return self._execution_time

    def __eq__(self, other):
        """
        Two tokens are equal when target, task and special all match; username
        and metadata_id are not part of the comparison.
        @param other: Object of HydraConfigToken for comparison
        Return the boolean result if the object represents same token or not
        """
        if not isinstance(other, self.__class__):
            return False
        return self.target == other.target and self.task == other.task and self.special == other.special

    def setExecTime(self, time_in_sec):
        '''
        @param time_in_sec: execution time in sec
        @return: nothing
        '''
        self._execution_time = time_in_sec

    def setOffset(self, initial_offset=0):
        '''
        Set initial token offset and recompute the schedule time from the
        time this token was created.
        @param initial_offset: offset value in sec
        @return: nothing
        '''
        self.offset = initial_offset
        self.logger.info("Successfully set offset for task=%s, target=%s, offset=%s", self.task, self.target,
                         self.offset)
        #updating schedule time
        self._schedule_time = self._parse_time + datetime.timedelta(seconds=self.offset)
        self.logger.info("Schedule time is updated successfully for task=%s, target=%s, offset=%s", self.task,
                         self.target, self.offset)

    def is_locked(self):
        """
        Essentially an accessor for the _assigned prop currently, this method
        returns a boolean indicating if this config token is currently locked,
        i.e. blocked from creating jobs.
        @rtype: bool
        @return: True if locked, False if unlocked
        """
        return self._assigned

    def unlock(self):
        """
        This method is used to reset the _assigned prop to False after the
        confirmation of an atomic job's completion or expiration of the
        confirmation period. If the job is past its current scheduled time,
        resets the clock to schedule as of now.
        @rtype: None
        @return: None
        """
        self._assigned = False
        utc_now = datetime.datetime.utcnow()
        if self._schedule_time < utc_now:
            self._schedule_time = utc_now

    def _assignment_labels(self):
        """
        Return (node_path, job_name) of the last registered assignment for use
        in log messages, or ("UNKNOWN", "UNKNOWN") when nothing was registered.
        """
        if self.assignment_info is not None:
            return self.assignment_info
        return ("UNKNOWN", "UNKNOWN")

    def isReady(self):
        """
        Determine if this task is ready to be scheduled.
        Non atomic tokens are ready as soon as their schedule time has passed.
        Atomic tokens additionally must not be locked; a lock that outlived the
        confirmation expiration is released here as a side effect.
        @rtype: bool
        @return True if ready to be assigned, False if not time yet
        """
        should_be_ready = self._schedule_time <= datetime.datetime.utcnow()
        if not self.atomic or not self._assigned:
            return should_be_ready
        #Determine if we should auto unlock due to lack of notification in time
        confirmation_deadline = self._schedule_time + datetime.timedelta(seconds=self._confirmation_expiration)
        node_path, job_name = self._assignment_labels()
        if confirmation_deadline <= datetime.datetime.utcnow():
            self.logger.warning(
                "[HydraConfigToken] [isReady] atomic config_token=%s failed to confirm execution of last assigned job=%s on node=%s within confirmation_expiration, unlocking job and allowing assignment...",
                str(self), job_name, node_path)
            self.unlock()
            # readiness was evaluated before unlock() possibly moved the schedule time
            return should_be_ready
        #we are locked and should be locked
        self.logger.debug(
            "[HydraConfigToken] [isReady] atomic config_token=%s not yet to confirm execution of last assigned job=%s on node=%s, blocking job creation",
            str(self), job_name, node_path)
        return False

    def scheduleNext(self):
        """
        Schedule the next iteration of this particular config token. This follows several rules.
        At the most basic it adds its interval to the current schedule time and sets it. It will
        however only do so if that time is in the future, if not it will continue to add the
        interval until it is in the future flagging a warning each time.
        Tokens with an interval below 1 are never rescheduled.
        @rtype: None
        """
        if self.interval < 1:
            return
        schedule_time = self._schedule_time
        self._last_time = schedule_time
        cur_time = datetime.datetime.utcnow()
        step = datetime.timedelta(seconds=self.interval)
        schedule_time += step
        while schedule_time <= cur_time:
            self.logger.warning(
                "config token type {0} has missed one iteration due at {1}, scheduling for next iteration with interval {2}".format(
                    self.task, str(schedule_time), str(self.interval)))
            schedule_time += step
        self._schedule_time = schedule_time
        # atomic tokens stay locked until unlock() is called
        if not self.atomic:
            self._assigned = False

    def _updateAffinity(self, worker_path):
        """
        Update self.worker_affinity to reflect the assignment of this job to this node.
        The worker is moved (or inserted) to the front of the affinity list.
        args:
            worker_path - the management uri of the worker to update the affinity for
        RETURNS nothing
        """
        try:
            self.worker_affinity.remove(worker_path)
        except ValueError:
            self.logger.debug("[HydraConfigToken] first time instance of config_token=%s assigned to worker=%s",
                              str(self), worker_path)
        self.worker_affinity.insert(0, worker_path)

    def register_assignment(self, job_name, node_path):
        """
        Take note of the job name and the worker the job was assigned to so that
        we can unlock the job once it completes.
        @type job_name: str
        @param job_name: the name/id of the assigned job
        @type node_path: str
        @param node_path: the data collection node's node_path to which the job was assigned
        @rtype: None
        @return: None
        """
        #Assignment info is the simple tuple of the node_path and the job_name as assigned to that worker
        self.assignment_info = (node_path, job_name)

    def assignToWorker(self, worker):
        """
        Give this a worker's HydraWorkerNode object and it will assign its current job
        with the correct information to the specified worker.
        If successfully assigned this will also mark this config token as assigned,
        move the worker to the front of the affinity list and schedule the next run.
        returns True if successful, False if not
        """
        job_name = "job_" + makeFieldID()
        job = JobTuple(job_name, self.target, self.task, self.metadata_id, self._schedule_time, self._last_time,
                       self._expiration_period, self.special)
        # priority is the job creation epoch plus the expiration period and the configured adjustment
        priority_num = str(
            int(time.mktime(job.create_time.timetuple())) + int(job.expiration_period + self._priority_adjustment))
        if not worker.addJob((priority_num, job), self, self.atomic):
            self.logger.error("[HydraConfigToken] job=%s of task=%s failed to be assigned to node=%s", job_name,
                              self.task, worker.node_path)
            return False
        self._updateAffinity(worker.node_path)
        self._assigned = True
        self.scheduleNext()
        self.logger.debug("[HydraConfigToken] job=%s of task=%s queued for assignment to node=%s", job_name,
                          self.task, worker.node_path)
        return True
class HydraCollectionManifest(object):
    """
    An administration layer on top of collections of config tokens aimed at data
    collection.
    An instance of this class provides scheduling methods for its constituent tasks
    """

    def __init__(self, logger, metadata_dict=None, config_token_list=None, app="SA-Hydra-inframon", collect_list=None):
        """
        Create a new instance of a collection manifest, optionally with a config
        token list specified.
        args:
            logger - the logger used for all scheduler messages
            metadata_dict - collection metadata, defaults to a new empty dict
            config_token_list - list of HydraConfigToken objects to manage, defaults to a new empty list
            app - the app namespace stored on the manifest
            collect_list - stored on the manifest as is, defaults to a new empty list
        """
        # Literal {} / [] defaults are created once and would be shared by every
        # manifest built without arguments, so create fresh containers here.
        self._metadata_dict = {} if metadata_dict is None else metadata_dict
        self._config_token_list = [] if config_token_list is None else config_token_list
        self.collect_list = [] if collect_list is None else collect_list
        self.logger = logger
        self.app = app
        # Dict which holds aggregate execution time taken at task level
        # Key is task, value is tuple of (average execution time, total execution cycles so far)
        self.task_aggre_exec_time = {}
        # Dict to hold aggregate execution time taken by task, target, meta_data_id
        # Key is combination of target|task|meta_id, value is tuple of (average execution time, total execution cycles so far)
        self.task_target_metaid_aggre_exec_time = {}
        self._calculateTokenListProperties()
        self._calculateExecutionTime()

    def _calculateTokenListProperties(self):
        """
        Recalculates all internal properties based on the token list including:
            task_set - a set of all distinct tasks in the manifest
            atomic_config_tokens - a list of all config tokens that are marked atomic
        returns nothing
        """
        self.task_set = set()
        self.atomic_config_tokens = []
        for token in self._config_token_list:
            self.task_set.add(token.task)
            if token.atomic:
                self.atomic_config_tokens.append(token)
        self.logger.debug("[HydraCollectionManifest] calculated aggregated collection task_set=%s", self.task_set)

    def _calculateExecutionTime(self):
        '''
        Calculate initial execution time at task and target|task|metadata_id level
        by seeding both aggregates with one cycle of each token's execution time.
        @return: nothing
        '''
        for token in self._config_token_list:
            value = [float(token.getExecTime()), 1]
            self._update_execution_dict(self.task_aggre_exec_time, token.task, value)
            # update at target, task and metadata id level
            key = token.target + "|" + token.task + "|" + token.metadata_id
            self._update_execution_dict(self.task_target_metaid_aggre_exec_time, key, value)

    def _calculateTaskWeights(self, node_list):
        """
        Given a particular node list calculate the weights of all tasks
        and return them as a dict. The weight of a task is the total number of
        heads on the nodes capable of performing it.
        """
        weights = {}
        for task in self.task_set:
            weight = 0
            for node in node_list:
                if node.hasCapability(task):
                    weight += node.model.heads
            self.logger.debug("[HydraCollectionManifest] calculated weight=%s for task=%s", weight, task)
            if weight == 0:
                self.logger.error(
                    "[HydraCollectionManifest] calculated weight=%s for task=%s implies no node will be able to perform jobs of this task, please alter capabilities of your nodes to accommodate this task",
                    weight, task)
            weights[task] = weight
        return weights

    def getReadyJobs(self):
        """
        gets a list of jobs ready to assign
        returns list of config tokens whose isReady() is true, in manifest order
        """
        return [token for token in self._config_token_list if token.isReady()]

    def getTimeToNextJob(self):
        """
        Get the time in seconds until the next job is ready to be scheduled
        RETURNS time in whole seconds, 0 if some token is already due
        RAISES ForceHydraRebuild if the manifest has no config tokens
        """
        time_to_next_job = None
        utc_now = datetime.datetime.utcnow()
        for token in self._config_token_list:
            if utc_now > token._schedule_time:
                token_delta = 0
            else:
                # timedelta.seconds only holds the sub-day remainder, so a token due in
                # more than a day would look due far too early; use the full duration.
                token_delta = int((token._schedule_time - utc_now).total_seconds())
            if time_to_next_job is None or token_delta < time_to_next_job:
                time_to_next_job = token_delta
        if time_to_next_job is None:
            raise ForceHydraRebuild(
                "[HydraCollectionManifest] could not establish the time to next job run, forcing a rebuild...")
        return time_to_next_job

    def _update_execution_dict(self, dict_var, key, value):
        '''
        Support function to update task_aggre_exec_time or task_target_metaid_aggre_exec_time
        with a cycle-weighted running average.
        @param dict_var : dict reference of one of them (task_aggre_exec_time or task_target_metaid_aggre_exec_time)
        @key : key name which needs to be updated (task or target|task|metadata_id)
        @value : sequence of 2 items,
            0 - Average execution time over the cycles counted in index 1
            1 - Number of cycles execution time reported for given key
        @return nothing
        '''
        if key in dict_var:
            prev_avg, prev_cycles = dict_var[key][0], dict_var[key][1]
            total_cycle = prev_cycles + value[1]
            # Avoid divide by zero exception
            if total_cycle > 0:
                dict_var[key] = (float(prev_avg * prev_cycles + value[0] * value[1]) / total_cycle, total_cycle)
        else:
            dict_var[key] = (value[0], value[1])
        # check if dict value reached float max threshold
        cur_avg, cur_cycles = dict_var[key][0], dict_var[key][1]
        if cur_avg / sys.float_info.max > 0.85:
            # reset it to 20% of the current value
            self.logger.info(
                "[HydraCollectionManifest] Float value has reached to 85% of max float so reseting it to 20% of the current value")
            # entries are tuples, so the entry has to be rebuilt rather than assigned by index
            dict_var[key] = (0.2 * cur_avg, cur_cycles)

    def _update_execution_time(self, info_dict):
        '''
        Update aggregated execution in task_aggre_exec_time, task_target_metaid_aggre_exec_time
        @param info_dict: dict which has avg execution time reported by gateway, keyed by
            target|task|metadata_id; the first two items of each value are (average, cycles)
        '''
        for key, value in info_dict.items():
            # Update for target|task|metadata_id
            self._update_execution_dict(self.task_target_metaid_aggre_exec_time, key, value[:2])
            # Update for task
            self._update_execution_dict(self.task_aggre_exec_time, key.split("|")[1], value[:2])

    def _get_job_avg_exectime(self, key, task):
        '''
        Get avg execution from task_target_metaid_aggre_exec_time, falling back to
        task_aggre_exec_time when the more specific key is unknown.
        @return: avg execution time in float if exists otherwise 0.0
        '''
        if key in self.task_target_metaid_aggre_exec_time:
            # Get target, task and metadata_id level if this is defined
            return float(self.task_target_metaid_aggre_exec_time[key][0])
        # Get at task level (Assuming that at task level this value is always defined)
        if task in self.task_aggre_exec_time:
            return float(self.task_aggre_exec_time[task][0])
        # This code is not expected to execute, log the information if it ever does
        self.logger.error(
            "[HydraCollectionManifest] Could not find execution time so skipping job to calculate the average execution time (key=%s)",
            key)
        return 0.0

    def _calculateLoadDistribution(self, node_list, ready_jobs, node_job_infos):
        """
        This method takes in a node_list and ready jobs and
        calculates the load balance information. This load balance information
        is represented by available_work_load which is a mapping of host path
        to the amount of load that node has room for.
        @param node_list: a list of active HydraWorkerNode objects
        @param ready_jobs: jobs that need to be scheduled
        @param node_job_infos: dict of aggregate job execution info which is reported by gateway from all
            worker nodes (see getActiveJobInfo for details); must hold an entry for every node in node_list
        @return: tuple of available_work_load, balanced_load
            available_work_load is a dict which holds the work load which can be handled by each worker,
                the key of this dict is a worker path
            balanced_load : target load per worker head
        """
        #Must use floats due to division
        head_count = 0.0
        queue_job_execution_time = {}
        available_work_load = {}
        for worker in node_list:
            node_job_info = node_job_infos[worker.node_path]
            self.logger.debug("[HydraCollectionManifest] Average time reported by gateway of node=%s, value=%s",
                              worker.node_path, node_job_info)
            aggregate_info = node_job_info["job_aggregate_execution_info"]
            # Update execution time at target|task|metadata_id and task level
            self._update_execution_time(aggregate_info)
            # calculate unclaimed job execution time
            node_unclaimed_execution_time = 0.0
            for key, value in aggregate_info.items():
                # active jobs * avg time
                node_unclaimed_execution_time += value[2] * self._get_job_avg_exectime(key, key.split("|")[1])
            queue_job_execution_time[worker.node_path] = node_unclaimed_execution_time
            self.logger.debug(
                "[HydraCollectionManifest] node=%s current unclaimed queue length=%s, left execution time=%s",
                worker.node_path, node_job_info["count"], node_unclaimed_execution_time)
            head_count += worker.model.heads
        # calculate ready queue execution time
        total_readyqueue_execution_time = 0.0
        for ready_job in ready_jobs:
            total_readyqueue_execution_time += self._get_job_avg_exectime(
                key=ready_job.target + "|" + ready_job.task + "|" + ready_job.metadata_id, task=ready_job.task)
        # The balanced load has to account for the backlog of every node; using only the
        # last node's backlog made the result depend on the order of node_list.
        total_unclaimed_queue_execution_time = sum(queue_job_execution_time.values())
        balanced_load = math.ceil((total_readyqueue_execution_time + total_unclaimed_queue_execution_time) / head_count)
        self.logger.debug(
            "[HydraCollectionManifest] spraying jobs to %s heads on %s nodes with balanced unclaimed queue load per head of %s",
            head_count, len(node_list), balanced_load)
        for worker in node_list:
            available_work_load[worker.node_path] = balanced_load * worker.model.heads - queue_job_execution_time[
                worker.node_path]
            self.logger.debug("[HydraCollectionManifest] node=%s load balanced available work load=%s",
                              worker.node_path, available_work_load[worker.node_path])
        return available_work_load, balanced_load

    def _queueJobsToWorkers(self, job_list, node_list, available_work_load, node_manifest, balanced_load, node_infos):
        """
        Given a list of sorted jobs and a list of active HydraWorkerNode objects
        this will queue all jobs to the workers. This uses the load balance info
        in available_work_load and the node manifest's nodes_by_path dict.
        A worker is chosen per token by, in order: token affinity, target affinity,
        first capable node with room (smallest available load first), and finally
        the capable node with the largest available load regardless of room.
        ARGS:
            job_list - list of HydraConfigToken objects to be queued
            node_list - list of HydraWorkerNode objects onto which to queue jobs, sorted in place
            available_work_load - mapping of available load per node (see
                _calculateLoadDistribution for details), updated in place
            node_manifest - the node manifest to which all these nodes belong
            balanced_load - Load balancer value (see _calculateLoadDistribution for details), currently unused
            node_infos - is a dict which holds the info reported by gateway from each worker
        RETURNS nothing
        """
        worker_queue_load_sort_key = lambda worker: available_work_load[worker.node_path]
        workers_by_path = node_manifest.nodes_by_path
        # Define minimum jobs count as per head count and considering existing job count on that node
        minimum_job = {}
        for node in node_list:
            minimum_job[node.node_path] = node.model.heads - node_infos[node.node_path]["count"]

        def has_room(path, job_time):
            # a node has room if the job fits its available load or it is below its minimum job count
            return (available_work_load.get(path, 0) - job_time) > 0 or minimum_job.get(path, 0) > 0

        for token in job_list:
            worker_path = None
            worker = None
            avg_job_exec_time = self._get_job_avg_exectime(
                key=token.target + "|" + token.task + "|" + token.metadata_id, task=token.task)
            #First check the by token affinity
            for tmp_path in token.worker_affinity:
                if has_room(tmp_path, avg_job_exec_time):
                    tmp_worker = workers_by_path[tmp_path]
                    if tmp_worker.hasCapability(token.task):
                        worker_path = tmp_path
                        worker = tmp_worker
                        self.logger.debug("Assigned job based upon job token affinity for node=%s node_path=%s",
                                          worker, worker_path)
                        break
            #If that fails, check the by target affinity
            if worker_path is None:
                for tmp_path in node_manifest.getPreferredNodesForTarget(token.target):
                    if has_room(tmp_path, avg_job_exec_time):
                        tmp_worker = workers_by_path[tmp_path]
                        if tmp_worker.hasCapability(token.task):
                            worker_path = tmp_path
                            worker = tmp_worker
                            self.logger.debug("Assigned job based upon job target affinity for node=%s node_path=%s",
                                              worker, worker_path)
                            break
            #If that fails, just assign to a worker that has the best room (start filling from smallest available load)
            if worker_path is None:
                node_list.sort(key=worker_queue_load_sort_key)
                for node in node_list:
                    if node.hasCapability(token.task):
                        # First best fit node
                        if available_work_load[node.node_path] - avg_job_exec_time > 0 or minimum_job[
                                node.node_path] > 0:
                            worker = node
                            worker_path = node.node_path
                            self.logger.debug(
                                "Assigned job based upon job first best fit algorithm for node=%s node_path=%s",
                                worker, worker_path)
                            break
            #If that fails (may be a corner case where the sum of all available weight is greater than the job execution time, but no individual node has enough load factor left)
            if worker_path is None:
                node_list.sort(key=worker_queue_load_sort_key, reverse=True)
                for node in node_list:
                    if node.hasCapability(token.task):
                        worker = node
                        worker_path = node.node_path
                        self.logger.debug("Assigned job based upon job weight for node=%s node_path=%s", worker,
                                          worker_path)
                        break
                else:
                    # no active node can run this task at all, report once and skip the token
                    self.logger.error(
                        "[HydraCollectionManifest] unable to find an active node capable of executing config_token=%s of task=%s if no node configured for this task becomes active this config_token will never generate another job",
                        token, token.task)
                    continue
            #Queue up the job to the node for assignment to its gateway queue
            if worker is not None and token.assignToWorker(worker):
                node_manifest.updateTargetNodeAffinity(token.target, worker_path)
                self.logger.debug(
                    "Before job assignment values are, node_weight=%s, job_weight=%s, minimum_job(can be negative value)=%s",
                    available_work_load[worker_path], avg_job_exec_time, minimum_job[worker_path])
                available_work_load[worker_path] = available_work_load[worker_path] - avg_job_exec_time
                minimum_job[worker_path] = minimum_job[worker_path] - 1
                self.logger.debug(
                    "After job assignment value, node_weight=%s, job_weight=%s, minimum_job(can be negative value)=%s",
                    available_work_load[worker_path], avg_job_exec_time, minimum_job[worker_path])

    def _unlock_atomic_jobs(self, node_job_infos):
        """
        Given the job infos from the active nodes attempt to unlock all the
        atomic config tokens that are currently locked. This is done by
        matching the completed and failed jobs against the atomic config token's
        stored current job. Tokens whose node is not present in node_job_infos
        are left locked; their lock is only released once it expires (see
        HydraConfigToken.isReady).
        @type node_job_infos: dict
        @param node_job_infos: the dict of node -> job info
        @rtype None
        @return None
        """
        empty_atomic_info = {"completed_atomic_jobs": [], "failed_atomic_jobs": []}
        for config_token in self.atomic_config_tokens:
            if not config_token.is_locked():
                continue
            # A locked token without a registered assignment cannot be matched to a job,
            # leave it to the confirmation expiration handled by the token itself.
            if config_token.assignment_info is None:
                continue
            node_path, job_name = config_token.assignment_info
            if node_path not in node_job_infos:
                continue
            node_atomic_job_info = node_job_infos[node_path].get("atomic_job_info", empty_atomic_info)
            if job_name in node_atomic_job_info.get("completed_atomic_jobs", []):
                config_token.unlock()
                self.logger.debug(
                    "[HydraCollectionManifest] [UnlockAtomicJobs] confirmed completion of job=%s on node=%s originating from token=%s",
                    job_name, node_path, config_token)
            elif job_name in node_atomic_job_info.get("failed_atomic_jobs", []):
                config_token.unlock()
                self.logger.error(
                    "[HydraCollectionManifest] [UnlockAtomicJobs] confirmed the failure of job=%s on node=%s originating from token=%s",
                    job_name, node_path, config_token)
            else:
                self.logger.debug(
                    "[HydraCollectionManifest] [UnlockAtomicJobs] unable to confirm completion/failure of job=%s on node=%s originating from token=%s with current execution information",
                    job_name, node_path, config_token)

    def _getActiveWorkerInfo(self, node_manifest, confirm_status=True):
        """
        Refresh Nodes and return the node manifest's active nodes for this scheduler run
        @type confirm_status: bool
        @param confirm_status: passed through to node_manifest.refreshNodes
        @return the node manifest's active_nodes collection
        RAISES ForceHydraRebuild when there is no active worker
        """
        node_manifest.refreshNodes(confirm_status)
        active_workers = node_manifest.active_nodes
        if len(active_workers) < 1:
            self.logger.error(
                "[HydraCollectionManifest] Attempted to assign jobs but we have no active workers to assign to. Restarting Scheduler...")
            raise ForceHydraRebuild
        return active_workers

    def sprayReadyJobs(self, node_manifest):
        """
        Take all config tokens that are ready to be assigned out as jobs and spray them over
        active workers in efforts to even out job queues.
        This method will also be in charge of managing affinities for workers to particular
        target assets.
        args:
            node_manifest - the node manifest to get the worker nodes to assign the jobs to from
        """
        #Get workers
        active_workers = self._getActiveWorkerInfo(node_manifest)
        #Get aggregate job execution information, nodes reporting nothing are left out
        node_job_infos = {}
        for worker in active_workers:
            activeJobInfo = worker.getActiveJobInfo()
            if len(activeJobInfo) > 0:
                node_job_infos[worker.node_path] = activeJobInfo
        #Need refreshNodes again in case node statuses are changed while getting active job
        active_workers = self._getActiveWorkerInfo(node_manifest, False)
        #Unlock atomic jobs here based on the node_job_infos
        self._unlock_atomic_jobs(node_job_infos)
        #Sort jobs by task weight primarily so that the hardest to schedule get scheduled first, secondarily by target
        task_weights = self._calculateTaskWeights(active_workers)
        ready_jobs = self.getReadyJobs()
        ready_jobs.sort(key=lambda token: (task_weights[token.task], token.target))
        self.logger.debug("Sorted list of ready_jobs=%s", ready_jobs)
        #Calculate load balancing
        available_work_load, balanced_load = self._calculateLoadDistribution(active_workers, ready_jobs, node_job_infos)
        #Spray the jobs onto workers
        self._queueJobsToWorkers(ready_jobs, active_workers, available_work_load, node_manifest, balanced_load,
                                 node_job_infos)
        #Now that we have established the assignments, we actually commit the queues
        reassign_jobs = []
        atomic_reassign_jobs = {}
        for worker in active_workers:
            try:
                failed_to_assign, atomic_failed_to_assign = worker.commitJobs()
                reassign_jobs += failed_to_assign
                atomic_reassign_jobs.update(atomic_failed_to_assign)
            except Exception:
                self.logger.exception(
                    "[HydraCollectionManifest] failed to assign batch of jobs for node=%s, may be dead and reassigning jobs to others, may cause job duplication",
                    worker.node_path)
                reassign_jobs += (worker.add_jobs)
                worker.add_jobs = []
        if reassign_jobs:
            #call primitive sprayJobSet that sprays already parsed jobs.
            node_manifest.sprayJobSet(reassign_jobs, atomic_reassign_jobs)
class HydraWorkerNode(object):
'''
An object representation of a splunk forwarder running at least 1 HydraWorker process
and added to this scheduler's management in inframon_hydra_node.conf
'''
#Class variables for use in status
OFFLINE = False
ONLINE = True
def __init__(self, logger, path, password, model, gateway_uri, pool_name, session_key=None, metadata_dict=None,
             worker_input_name="ta_vmware_collection_worker_inframon"):
    """
    Initialize the HydraWorkerNode object.
    Besides storing its arguments this establishes the session key, then calls
    updateStatus, configureGateway and establishGateway in that order, optionally
    pushes metadata, and finally establishes the heads.
    args:
        logger - a ref to a logger instance
        path - this is the management uri (host_path) to this splunk server, e.g. https://forwarder.splunk.com:8089
        password - the splunkd management service password for the node
        model - this is the HydraNodeStanza model object corresponding to this node; its namespace,
            capabilities, log_level, gateway_port and heads attributes are read by this class
        gateway_uri - this is the uri for the Hydra WSGI Gateway on the node, e.g. https://forwarder.splunk.com:8008
        pool_name - stored on the node as pool_name
        session_key - this is the valid session_key for this splunk forwarder, when None
            refreshSessionKey is called instead
        metadata_dict - when not None it is passed to setMetadata with bounce_heads=False
        worker_input_name - the name of the modular input to control worker processes
    """
    self.logger = logger
    self.node_path = path
    self.model = model
    self.pool_name = pool_name
    self.app = self.model.namespace
    self.password = password
    self.worker_input_name = worker_input_name
    self.gateway_uri = gateway_uri
    # a model without capabilities is treated as capable of every task ("*")
    self.capabilities = self.model.capabilities if self.model.capabilities is not None else ["*"]
    # a model without a log level falls back to INFO
    self.worker_log_level = self.model.log_level if self.model.log_level is not None else "INFO"
    #Establish the session key and the status
    if session_key is None:
        # presumably sets self.session_key - refreshSessionKey is defined outside this view, TODO confirm
        self.refreshSessionKey()
    else:
        self.session_key = session_key
    self.updateStatus()
    self.configureGateway()
    self.establishGateway()
    if metadata_dict is not None:
        self.setMetadata(metadata_dict, bounce_heads=False)
    self.heads_list = self.establishHeads()
    # jobs queued on this node, atomic ones are additionally tracked in atomic_add_jobs
    # NOTE(review): both are presumably filled by addJob and flushed by commitJobs - confirm
    self.add_jobs = []
    self.atomic_add_jobs = {}
def hasCapability(self, task):
    """
    Tell whether this node is allowed to run the given task.
    ARGS:
        task - the task to check
    RETURNS True when the capabilities contain the wildcard "*" or the task itself, False otherwise
    """
    granted = self.capabilities
    return "*" in granted or task in granted
def configureGateway(self):
    """
    Push the gateway port from this node's inframon_hydra_node model into the
    "gateway" stanza on the remote node, creating the stanza when it is missing.
    The save is attempted up to four times; when every attempt fails, or when
    anything raises, the node is marked OFFLINE.
    RETURNS nothing
    """
    self.logger.info(
        "[HydraWorkerNode] [configureGateway] setting gateway configuration on node=%s to bind to port=%s...",
        self.node_path, self.model.gateway_port)
    try:
        gateway_stanza = HydraGatewayStanza.from_name("gateway", "SA-Hydra-inframon", host_path=self.node_path,
                                                      session_key=self.session_key)
        if not gateway_stanza:
            #Nothing stored remotely yet, start from a fresh stanza
            gateway_stanza = HydraGatewayStanza("SA-Hydra-inframon", "nobody", "gateway",
                                                sessionKey=self.session_key, host_path=self.node_path)
        gateway_stanza.port = self.model.gateway_port
        for attempt in range(4):
            if stanza_saved(gateway_stanza):
                self.logger.info("[HydraWorkerNode] [configureGateway] successfully configured gateway on node=%s",
                                 self.node_path)
                break
        else:
            #Only reached when no attempt succeeded
            self.logger.error(
                "[HydraWorkerNode] [configureGateway] failed to configure gateway on node=%s after %s retries",
                self.node_path, attempt)
            #mark this guy offline since we can't configure it remotely
            self.status = HydraWorkerNode.OFFLINE
    except Exception as e:
        self.logger.exception(
            "[HydraWorkerNode] [configureGateway] problem configuring gateway, marking node dead: %s", str(e))
        self.status = HydraWorkerNode.OFFLINE


def stanza_saved(stanza):
    """Run one passive_save attempt on the stanza and hand back its result."""
    return stanza.passive_save()
def establishGateway(self):
    """
    Safely establish the adapter to the hydra gateway on the node. If it
    cannot be established mark the node dead and set the adapter to None.

    Every failure branch below does the same two things: log an error that
    names the cause and set self.status to OFFLINE. The branches differ only
    in the message; the ones that say "permanently" get no special handling
    in this method.
    RETURNS nothing, the result is stored in self.gateway_adapter
    """
    #Stays None unless the adapter is constructed without raising
    hga = None
    self.logger.info("[HydraWorkerNode] [establishGateway] attempting to connect to gateway=%s for node=%s ...",
                     self.gateway_uri, self.node_path)
    try:
        hga = HydraGatewayAdapter(self.node_path, self.session_key, self.gateway_uri)
        self.logger.info("[HydraWorkerNode] [establishGateway] successfully connected to gateway=%s for node=%s",
                         self.gateway_uri, self.node_path)
    #NOTE(review): the clause order presumably runs from specific splunk errors to the generic
    #RESTException, confirm against the splunk exception hierarchy before reordering
    except splunk.SplunkdConnectionException:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not connect to gateway=%s for node=%s due to a socket error, timeout, or other fundamental communication issue, marking node as dead",
            self.gateway_uri, self.node_path)
        self.status = HydraWorkerNode.OFFLINE
    except splunk.AuthenticationFailed:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd authentication issue, marking node as dead",
            self.gateway_uri, self.node_path)
        self.status = HydraWorkerNode.OFFLINE
    except splunk.LicenseRestriction:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd license issue, this is fatal, marking node as dead permanently",
            self.gateway_uri, self.node_path)
        self.status = HydraWorkerNode.OFFLINE
    except splunk.AuthorizationFailed:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd user permissions issue, this is fatal, marking node as dead permanently",
            self.gateway_uri, self.node_path)
        self.status = HydraWorkerNode.OFFLINE
    except splunk.ResourceNotFound:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to missing hydra gatekeeper EAI endpoint, this is fatal, marking node as dead permanently",
            self.gateway_uri, self.node_path)
        self.status = HydraWorkerNode.OFFLINE
    except splunk.InternalServerError as e:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to internal server error=\"%s\", marking node as dead",
            self.gateway_uri, self.node_path, str(e))
        self.status = HydraWorkerNode.OFFLINE
    except splunk.BadRequest as e:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to bad request error=\"%s\", marking node as dead",
            self.gateway_uri, self.node_path, str(e))
        self.status = HydraWorkerNode.OFFLINE
    except splunk.RESTException as e:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to some crazy REST error=\"%s\", marking node as dead",
            self.gateway_uri, self.node_path, str(e))
        self.status = HydraWorkerNode.OFFLINE
    except ServerNotFoundError as e:
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not find gateway=%s for node=%s error=\"%s\", node will be dead permanently",
            self.gateway_uri, self.node_path, str(e))
        self.status = HydraWorkerNode.OFFLINE
    except Exception as e:
        #Catch-all so that adapter construction never raises out of this method
        self.logger.error(
            "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to error=\"%s\", marking node as dead",
            self.gateway_uri, self.node_path, str(e))
        self.status = HydraWorkerNode.OFFLINE
    #Always overwritten, so a failed attempt also discards any previous adapter
    self.gateway_adapter = hga
def getActiveJobInfo(self):
    """
    Ask the node's hydra gateway for its current job information.
    If the gateway call raises, the failure is logged, the node status is
    re-checked (with session key refresh) and an empty dict is returned.
    @return: a dict
        key : target|task|metadata_id
        value : is array of three items
            0 : aggregate execution time
            1 : number of times execution time is reported for this category
            2 : unclaimed job count for this category
    """
    try:
        return self.gateway_adapter.get_job_info()
    except Exception as e:
        self.logger.exception(
            "[HydraWorkerNode] node=%s is likely dead, could not get info on current job count, msg : %s",
            self.node_path, str(e))
        self.updateStatus(refresh_session_key=True)
    return {}
def __str__(self):
"""
Print the object and the node path as a string representation for this node.
"""
return "HydraWorkerNode(" + self.node_path + ")"
def __repr__(self):
"""
Print the object and the node path as a string representation for this node.
"""
return "HydraWorkerNode(" + self.node_path + ")"
def refreshSessionKey(self):
    """
    Request a new session key for this node with the model's user and the
    stored password, and update session_key and status accordingly.
    When the failure is a license or authorization error and the model still
    has credential_validation set, the flag is cleared and saved so the conf
    records the node as unrecoverable.
    RETURNS True if a session key was obtained, False if not
    """
    new_key = None
    is_fatal = False
    try:
        new_key = auth.getSessionKey(self.model.user, self.password, self.node_path)
    except splunk.SplunkdConnectionException:
        self.logger.error(
            "[HydraWorkerNode] node=%s is dead, could not connect to splunkd check path and if splunkd is up on remote node",
            self.node_path)
    except splunk.LicenseRestriction:
        is_fatal = True
        self.logger.error("[HydraWorkerNode] node=%s is dead due to a license issue", self.node_path)
    #NOTE(review): the message below talks about a bad username/password while the class caught is
    #AuthorizationFailed, confirm whether AuthenticationFailed was meant to be handled here too
    except splunk.AuthorizationFailed:
        is_fatal = True
        self.logger.error(
            "[HydraWorkerNode] node=%s is dead, could connect to splunkd but failed to auth check username and password",
            self.node_path)
    except Exception as e:
        self.logger.exception("[HydraWorkerNode] node=%s is dead, because some weird stuff happened: %s",
                              self.node_path, str(e))

    if new_key is not None:
        self.logger.debug(
            "[HydraWorkerNode] {0} is alive, successfully authenticated user {1}".format(self, self.model.user))
        self.session_key = new_key
        self.status = HydraWorkerNode.ONLINE
        return True

    #No key came back, whether or not an exception was raised
    self.logger.error(
        "[HydraWorkerNode] {0} is dead, failed to authenticate user {1}".format(self, self.model.user))
    self.session_key = None
    if self.model.credential_validation and is_fatal:
        self.logger.info("[HydraWorkerNode] node=%s is unrecoverably dead, marking so in inframon_hydra_node.conf",
                         self.node_path)
        self.model.credential_validation = False
        if self.model.passive_save():
            #The conf changed, so bump its modification time
            HydraScheduler().setConfModificationTime("node")
            self.logger.info("[HydraWorkerNode] Updating the conf modification time property for node=%s",
                             self.node_path)
        else:
            self.logger.error("[HydraWorkerNode] failed to save credential validation as false for node=%s",
                              self.node_path)
    self.status = HydraWorkerNode.OFFLINE
    return False
#TODO: we need to work out the shared key auth stuff for a future release, not in chablis
def updateStatus(self, refresh_session_key=False):
    """
    Validate this node's session key and set self.status to ONLINE or OFFLINE.
    With refresh_session_key set, any response other than 200 triggers a
    session key refresh, and the gateway adapter is re-established whenever
    the node ends up ONLINE.
    args:
        refresh_session_key - indicates that a failing key should be refreshed instead of just reported
    RETURNS self.status
    """
    if not refresh_session_key:
        #Cheap check only: the key either works or it does not
        if isSplunkSessionKeyValid(self.node_path, self.session_key):
            self.status = HydraWorkerNode.ONLINE
        else:
            self.status = HydraWorkerNode.OFFLINE
    else:
        rsp_code = isSplunkSessionKeyValid(self.node_path, self.session_key, return_status=True)
        if rsp_code == 200:
            self.status = HydraWorkerNode.ONLINE
            #Also need to establish Hydra Gateway because of VMW-4355, to make Hydra Node truly ONLINE
            self.establishGateway()
        else:
            #Every other outcome is handled the same way, only the debug message differs
            if rsp_code == 401:
                self.logger.debug(
                    "[HydraWorkerNode] [updateStatus] detected unauthorized session key, refreshing...")
            elif rsp_code == 404:
                self.logger.debug(
                    "[HydraWorkerNode] [updateStatus] detected splunkd restart or explicit session kill, refreshing...")
            else:
                #This means something went funky so try to refresh session key
                self.logger.debug("[HydraWorkerNode] [updateStatus] could not communicate with node, refreshing...")
            if self.refreshSessionKey():
                self.status = HydraWorkerNode.ONLINE
                #If we refresh session key, we should refresh our gateway adapter as well
                self.establishGateway()
            else:
                self.status = HydraWorkerNode.OFFLINE
    if self.status == HydraWorkerNode.OFFLINE:
        self.logger.warning("[HydraWorkerNode] node=%s is offline/unresponsive/unauthenticated", self.node_path)
    return self.status
def _toggleHead(self, head, action):
"""
Use this method to toggle the disabled property of a particular head
args:
head - the name of the head
action - either enable or disable
returns True if successful, else False
"""
try:
if action not in ["enable", "disable"]:
raise ValueError("[HydraWorkerNode] toggleHead action must be one of [enable, disable]")
path = self.node_path.rstrip(
"/") + "/servicesNS/nobody/" + self.app + "/data/inputs/" + self.worker_input_name + "/" + head + "/" + action
rsp, content = simpleRequest(path, method='POST', sessionKey=self.session_key)
if rsp.status == 200:
return True
else:
self.logger.error(
"[HydraWorkerNode] some weird bad stuff happened trying to toggle a hydra head=%s on node=%s see content=%s",
head, self.node_path, str(content))
return False
except ValueError as e:
raise e
except Exception:
self.logger.exception("[HydraWorkerNode] Problem enabling/disabling remote hydra head=%s on node=%s", head,
self.node_path)
return False
def disableHead(self, head):
    """
    Disable a single head, a thin wrapper around _toggleHead.
    RETURNS whatever _toggleHead returns for the "disable" action
    """
    outcome = self._toggleHead(head, "disable")
    return outcome
def enableHead(self, head):
    """
    Configure and enable a particular head on this node.
    The head's capabilities and log level are posted first, then the head is
    enabled. A non-200 answer to the configuration post is logged but does
    not stop the enable attempt.
    args:
        head - the name of the head (worker input) to configure and enable
    RETURNS the result of _toggleHead for the "enable" action
    """
    uri = self.node_path.rstrip(
        "/") + "/servicesNS/nobody/" + self.app + "/data/inputs/" + self.worker_input_name + "/" + head
    #An unset log level must not be posted as-is, a None value would be form-encoded as the text "None".
    #Fall back to INFO, the same default __init__ applies for worker_log_level.
    log_level = self.model.log_level
    if log_level is None:
        log_level = "INFO"
    rsp, content = simpleRequest(uri, sessionKey=self.session_key, method='POST',
                                 postargs={'capabilities': ",".join(self.capabilities),
                                           'log_level': log_level})
    if rsp.status != 200:
        self.logger.error(
            "[HydraWorkerNode] problem saving configuration for head=%s on node=%s, got status=%s with an error_response=%s",
            head, self.node_path, rsp.status, content)
    return self._toggleHead(head, "enable")
def disableHeads(self, head_list):
    """
    Disable a list of heads.
    Every head in the list is attempted, a failure on one head does not stop
    the remaining heads from being disabled.
    args:
        head_list - iterable of head names
    RETURNS True only if every head was disabled (True for an empty list)
    """
    all_disabled = True
    for head in head_list:
        #Call first, then fold into the flag, so a previous failure cannot short-circuit this head away
        if not self.disableHead(head):
            all_disabled = False
    return all_disabled
def enableHeads(self, head_list):
    """
    Enable a list of heads.
    Every head in the list is attempted, a failure on one head does not stop
    the remaining heads from being enabled (otherwise a bounce could leave
    the rest of the heads disabled).
    args:
        head_list - iterable of head names
    RETURNS True only if every head was enabled (True for an empty list)
    """
    all_enabled = True
    for head in head_list:
        #Call first, then fold into the flag, so a previous failure cannot short-circuit this head away
        if not self.enableHead(head):
            all_enabled = False
    return all_enabled
def _safeSaveModels(self, *args):
"""
Call the save on all arg'ed in items. On fails log and update status
RETURNS nothing
"""
if args is not None:
for model in args:
if not model.passive_save():
self.logger.error(
"[HydraWorkerNode] could not save %s queue and node=%s, node may be down oh monkey turds...",
model.name, self.node_path)
#verify death
self.updateStatus()
def establishHeads(self):
    """
    Reconcile the worker inputs (heads) enabled on the node with the number
    of heads configured in the model.

    When the node is ONLINE the configured inputs are fetched and split into
    enabled and disabled ones. Heads are then enabled or disabled until the
    enabled count matches self.model.heads, or as close as the configured
    inputs allow. Afterwards every enabled head is bounced (disabled, then
    enabled) and, if the final count differs from the model, the model is
    updated and saved.
    Note that only enabled heads make it into the returned list.
    RETURNS - a list of the names of enabled heads; [] when the node is not
        ONLINE or is found OFFLINE after a failed enable/disable
    """
    #First thing is to check the status, if we are OFFLINE we return []
    enabled_heads = []
    disabled_heads = []
    if self.status != HydraWorkerNode.ONLINE:
        self.logger.info("[HydraWorkerNode] cannot establish current heads for node=%s since it is down",
                         self.node_path)
    else:
        #Now that we know we are online we need to pull all the configured heads and sort them into enabled and disabled
        configured_heads = en.getEntities("/data/inputs/" + self.worker_input_name, self.app, "nobody",
                                          sessionKey=self.session_key, hostPath=self.node_path)
        for head_name, config in six.iteritems(configured_heads):
            #An input without a "disabled" key is treated as disabled
            if not normalizeBoolean(config.get("disabled", True)):
                self.logger.debug(
                    "[HydraWorkerNode] found enabled input process on node=%s with name=%s and config=%s",
                    self.node_path, head_name, config)
                enabled_heads.append(head_name)
            else:
                self.logger.debug(
                    "[HydraWorkerNode] found disabled input process on node=%s with name=%s and config=%s",
                    self.node_path, head_name, config)
                disabled_heads.append(head_name)
        #Target head count from the model, 0 when the model has no heads attribute
        heads = getattr(self.model, "heads", 0)
        num_enabled_heads = len(enabled_heads)
        if num_enabled_heads == heads:
            self.logger.info("[HydraWorkerNode] Correct number of heads=%s on node=%s", heads, self.node_path)
        elif num_enabled_heads < heads:
            self.logger.error(
                "[HydraWorkerNode] Incorrect number of heads=%s on node=%s, actual_heads=%s, trying to bring number up to correct value",
                heads, self.node_path, num_enabled_heads)
            needed_heads = heads - num_enabled_heads
            if len(disabled_heads) >= needed_heads:
                #Enough disabled inputs exist to reach the target
                try:
                    new_heads_enabled = []
                    for ii in range(needed_heads):
                        head = disabled_heads[ii]
                        if self.enableHead(head):
                            new_heads_enabled.append(head)
                        else:
                            raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                    #The bookkeeping lists are only updated when every enable succeeded
                    for head in new_heads_enabled:
                        enabled_heads.append(head)
                        disabled_heads.remove(head)
                except splunk.RESTException:
                    self.logger.exception(
                        "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                        self.node_path)
                    #Maybe we lost connectivity to the node, we should update status
                    if self.updateStatus() == HydraWorkerNode.OFFLINE:
                        return []
            else:
                #Not enough inputs configured on the node, enable every disabled one we have
                num_disabled_heads = len(disabled_heads)
                self.logger.error(
                    "[HydraWorkerNode] node=%s does not have enough configured input processes to match configured heads, can only enable extra %s inputs, required %s",
                    self.node_path, num_disabled_heads, needed_heads)
                try:
                    new_heads_enabled = []
                    for ii in range(num_disabled_heads):
                        head = disabled_heads[ii]
                        if self.enableHead(head):
                            new_heads_enabled.append(head)
                        else:
                            raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                    for head in new_heads_enabled:
                        disabled_heads.remove(head)
                        enabled_heads.append(head)
                except splunk.RESTException:
                    self.logger.exception(
                        "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                        self.node_path)
                    #Maybe we lost connectivity to the node, we should update status
                    if self.updateStatus() == HydraWorkerNode.OFFLINE:
                        return []
        else:
            #We have too many heads, need to disable some
            kill_count = num_enabled_heads - heads
            self.logger.error(
                "[HydraWorkerNode] Incorrect number of heads=%s on node=%s, actual_heads=%s, trying to bring number down to correct value",
                heads, self.node_path, num_enabled_heads)
            try:
                new_heads_disabled = []
                #The first kill_count enabled heads are the ones taken down
                for ii in range(kill_count):
                    head = enabled_heads[ii]
                    if self.disableHead(head):
                        new_heads_disabled.append(head)
                    else:
                        raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                for head in new_heads_disabled:
                    enabled_heads.remove(head)
                    disabled_heads.append(head)
            except splunk.RESTException:
                self.logger.exception(
                    "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                    self.node_path)
                #Maybe we lost connectivity to the node, we should update status
                if self.updateStatus() == HydraWorkerNode.OFFLINE:
                    return []
        #Okay now we have the correct, or frozen enabled and disabled lists
        #We bounce all the heads to make sure they are actually up
        #FIXME: when we have a real interval setting for modular inputs this is not necessary (SOLNVMW-3106)
        self.disableHeads(enabled_heads)
        if not self.enableHeads(enabled_heads):
            self.logger.error(
                "[HydraWorkerNode] Could not manage inputs on remote node=%s while bouncing heads, node is being marked offline",
                self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        #Persist the head count we actually ended up with when it differs from the model
        if self.model.heads != len(enabled_heads):
            self.model.heads = len(enabled_heads)
            if not self.model.passive_save():
                self.logger.error("[HydraWorkerNode] could not save inframon_hydra_node conf stanza for node=%s",
                                  self.node_path)
            else:
                hydra_scheduler = HydraScheduler()
                hydra_scheduler.setConfModificationTime("node")
                self.logger.info("[HydraWorkerNode] Updating conf modified property for node=%s", self.node_path)
    return enabled_heads
def resurrect(self):
"""
Attempt to bring this node back to life, refreshing all internal properties
RETURNS status boolean
"""
self.model = self.model.from_self()
if self.updateStatus(refresh_session_key=True) == HydraWorkerNode.ONLINE:
self.heads_list = self.establishHeads()
self.configureGateway()
return True
else:
return False
def checkHeadHealth(self):
    """
    Bounce every head that has a stanza in the node's hydra_health conf.
    The conf is reloaded first, then each stanza naming a head gets up to
    three disable/enable attempts. After a successful bounce the stanza is
    deleted; a failed bounce is logged and left for a later run.
    RETURNS: nothing
    """
    #First we refresh the endpoint
    reload_path = self.node_path.rstrip("/") + "/servicesNS/nobody/" + self.app + "/configs/conf-hydra_health/_reload"
    try:
        simpleRequest(reload_path, sessionKey=self.session_key, raiseAllErrors=True)
    except splunk.AuthenticationFailed:
        self.logger.warning(
            "[HydraWorkerNode] could not refresh the hydra health conf for node=%s, due to auth failure, refreshing session_key...",
            self.node_path)
        self.updateStatus(refresh_session_key=True)
    except Exception as e:
        self.logger.exception("[HydraWorkerNode] could not refresh the hydra health conf for node=%s, message: %s",
                              self.node_path, str(e))

    #Now we can iterate across logging and bouncing heads
    bounced_heads = 0
    try:
        health_stanzas = HydraHealthStanza.all(host_path=self.node_path, sessionKey=self.session_key)
        health_stanzas = health_stanzas.filter_by_app(self.app)
        health_stanzas._owner = "nobody"
        for stanza in health_stanzas:
            sick_head = stanza.head
            if sick_head is None:
                self.logger.warning("[HydraWorkerNode] got a bad stanza in hydra_health stanza=%s", stanza.name)
                continue
            self.logger.error(
                "[HydraWorkerNode] regrowing head due to sad face health problem reported for head=%s on node=%s : msg='%s'",
                sick_head, self.node_path, stanza.reason)
            regrown = False
            disabled_once = False
            for _attempt in range(3):
                #The disable only has to succeed once, later attempts go straight to the enable
                if not disabled_once:
                    if not self.disableHead(stanza.head):
                        continue
                    disabled_once = True
                if self.enableHead(stanza.head):
                    regrown = True
                    break
            if not regrown:
                self.logger.error(
                    "[HydraWorkerNode] failed to regrow head=%s on node=%s after health cry sad face, will try again later",
                    stanza.head, self.node_path)
                continue
            #Where one falls two shall grow in their place! ... err i mean one
            self.logger.info(
                "[HydraWorkerNode] successfully regrew head=%s on node=%s after health cry sad face",
                stanza.head, self.node_path)
            bounced_heads += 1
            if not stanza.passive_delete():
                self.logger.error(
                    "[HydraWorkerNode] could not delete health stanza for head=%s on node=%s after restart, head will likely be double restarted",
                    stanza.head, self.node_path)
    except splunk.AuthenticationFailed:
        self.logger.exception(
            "[HydraWorkerNode] could not act on the hydra health conf for node=%s, due to auth failure, refreshing session_key...",
            self.node_path)
        self.updateStatus(refresh_session_key=True)
    except Exception as e:
        self.logger.exception("[HydraWorkerNode] could not act on the hydra health conf for node=%s, message: %s",
                              self.node_path, str(e))

    #Finally log what we did
    if bounced_heads != 0:
        self.logger.info("[HydraWorkerNode] head_count=%s regrown after crying for help on node=%s", bounced_heads,
                         self.node_path)
    else:
        self.logger.debug("[HydraWorkerNode] no heads regrown after they cried for help on node=%s", self.node_path)
def addJob(self, priority_job_tuple, config_token=None, is_atomic=False):
    """
    Queue a job on this node; nothing is sent until commitJobs is called.
    Atomic jobs are additionally remembered by job name together with the
    config token that produced them.
    @type priority_job_tuple: tuple
    @param priority_job_tuple: tuple of (priority num, JobTuple)
    @type config_token: HydraConfigToken
    @param config_token: the HydraConfigToken that created the job
    @type is_atomic: bool
    @param is_atomic: True if the job to be assigned is atomic, False otherwise
    @rtype: bool
    @return: always True
    """
    self.add_jobs.append(priority_job_tuple)
    if not is_atomic:
        return True
    #Only the job half of the pair is needed to key the atomic map
    _, queued_job = priority_job_tuple
    self.atomic_add_jobs[queued_job.name] = config_token
    return True
def commitJobs(self):
    """
    Send every job queued by addJob to the gateway on this data collection
    node in one batch. When the gateway does not answer with status 200 the
    node status is re-checked and the whole batch is handed back for
    reassignment. The local queues are emptied in either case.
    @rtype: tuple
    @return: (any jobs that need to be reassigned as a list, atomic job names to config tokens as dict)
    """
    to_reassign = []
    atomic_to_reassign = {}
    #Send to the worker
    #TODO: we still want to look at handling partial job parse completes both in the gateway service and here (implement a status code 205)
    if len(self.add_jobs) > 0:
        status_code = self.gateway_adapter.commit_job_batch(self.add_jobs)
        if status_code != 200:
            self.logger.error(
                "[HydraWorkerNode] could not save job batch on node=%s got a status_code=%s may be a sad face situation for that node",
                self.node_path, status_code)
            self.updateStatus(refresh_session_key=True)
            to_reassign = self.add_jobs
            atomic_to_reassign = self.atomic_add_jobs
        else:
            self.logger.debug("[HydraWorkerNode] successfully saved job batch on node=%s with number_new_jobs=%s",
                              self.node_path, len(self.add_jobs))
            #Tell each atomic job's config token where the job ended up
            for job_name, config_token in six.iteritems(self.atomic_add_jobs):
                config_token.register_assignment(job_name, self.node_path)
    #Finally we added all our jobs so we set the add jobs to empty
    self.add_jobs = []
    self.atomic_add_jobs = {}
    return to_reassign, atomic_to_reassign
def setMetadata(self, metadata_dict, bounce_heads=True):
    """
    Replace the "metadata" stanza on this node with one built from the given dict.
    The old stanza, if any, is deleted before the new one is saved. After a
    successful save the node's heads are bounced when bounce_heads is set
    and the heads are already known. Any failure to save, and any
    exception, leaves the node marked OFFLINE.
    args:
        metadata_dict - dict of metadata_id -> metadata, each becomes an attribute of the stanza
        bounce_heads - whether to disable/enable the heads after a successful save
    RETURNS nothing
    """
    #build new metadata stanza
    distributed = False
    try:
        fresh_stanza = HydraMetadataStanza(self.app, "nobody", "metadata", host_path=self.node_path,
                                           sessionKey=self.session_key)
        for metadata_id, metadata in six.iteritems(metadata_dict):
            setattr(fresh_stanza, metadata_id, metadata)
        stale_stanza = HydraMetadataStanza.from_name("metadata", self.app, "nobody", self.node_path,
                                                     self.session_key)
        if stale_stanza and not stale_stanza.passive_delete():
            self.logger.error(
                "[HydraWorkerNode] node=%s failed to delete old metadata, some unnecessary data may linger",
                self.node_path)
        if fresh_stanza.passive_save():
            if bounce_heads and hasattr(self, "heads_list"):
                #Since metadata was updated we bounce the heads on the remote node to get them to pick up the new metadata
                current_heads = self.heads_list
                self.disableHeads(current_heads)
                self.enableHeads(current_heads)
            distributed = True
            self.logger.info("[HydraWorkerNode] New meta data is distributed: %s.", fresh_stanza)
        else:
            self.logger.error(
                "[HydraWorkerNode] node=%s failed to save new metadata, node is effectively dead, will attempt to resurrect...",
                self.node_path)
            self.status = HydraWorkerNode.OFFLINE
    except splunk.SplunkdConnectionException:
        self.logger.error(
            "[HydraWorkerNode] node=%s is dead, could not connect to splunkd check path and if splunkd is up on remote node",
            self.node_path)
    except splunk.LicenseRestriction:
        self.logger.error("[HydraWorkerNode] node=%s is dead due to a license issue", self.node_path)
    except splunk.AuthorizationFailed:
        self.logger.error(
            "[HydraWorkerNode] node=%s is dead, could connect to splunkd but failed to auth check username and password",
            self.node_path)
    except Exception as e:
        self.logger.exception("[HydraWorkerNode] node=%s is dead, because some weird stuff happened: %s",
                              self.node_path, str(e))
    #Covers the exception paths above, which only log
    if not distributed:
        self.status = HydraWorkerNode.OFFLINE
class HydraWorkerNodeManifest(object):
    """
    This is a container for many HydraWorkerNode objects.
    It provides convenience methods for dealing with groups of nodes:
    tracking which are active or dead, resurrecting dead ones, recording
    per-target node affinity and spraying jobs across the active nodes.
    """
def __init__(self, logger, node_list, app, worker_input_name):
"""
Construct a manifest with the given nodes.
args:
logger - a logger instance
node_list - a list of HydraWorkerNode Objects
app - the app namespace to apply to the nodes
"""
self.logger = logger
self.worker_input_name = worker_input_name
self.nodes = node_list
node_path_list = []
nodes_by_path = {}
for node in node_list:
node_path_list.append(node.node_path)
nodes_by_path[node.node_path] = node
self.all_node_paths = node_path_list
self.nodes_by_path = nodes_by_path
self.app = app
self.active_nodes, self.dead_nodes = self.getNodes()
self.target_node_affinity = {}
def updateTargetNodeAffinity(self, target, node_path):
    """
    Record that a job for the target was just assigned to the given node by
    moving that node to the front of the target's affinity list.
    args:
        target - the target whose affinities need to be updated
        node_path - the path of the node that just did business with the target
    RETURNS nothing
    """
    ranking = self.target_node_affinity.get(target, False)
    if not ranking:
        #Nothing usable stored for this target yet, start a new list
        ranking = []
        self.target_node_affinity[target] = ranking
    if node_path in ranking:
        ranking.remove(node_path)
    else:
        self.logger.info(
            "[HydraWorkerNodeManifest] first time job for target={0} assigned to worker={1}".format(target,
                                                                                                      node_path))
    #Update by reference
    ranking.insert(0, node_path)
def getPreferredNodesForTarget(self, target):
    """
    Look up the node affinity recorded for a particular target.
    args:
        target - the target for which you want the node affinity
    RETURNS an array of node_paths ordered from preferred to least preferred,
        empty when nothing is recorded for the target
    """
    preferred = self.target_node_affinity.get(target)
    if preferred:
        return preferred
    return []
def refreshNodes(self, confirm_status=True):
    """
    Recompute the active and dead node lists via getNodes.
    args:
        confirm_status - passed through to getNodes
    """
    live, dead = self.getNodes(confirm_status=confirm_status)
    self.active_nodes = live
    self.dead_nodes = dead
def getNodes(self, confirm_status=False):
    """
    Split the managed nodes into online and offline ones.
    By default the split uses each node's stored status as is. With
    confirm_status set, each node is asked to re-check its status first, which
    also lets it refresh a failing session_key.
    args:
        confirm_status - flag indicating whether or not to test the
            status of the nodes
    RETURNS a list of online HydraWorkerNode objects and a list of offline
    """
    active_nodes, dead_nodes = [], []
    self.logger.debug("[HydraWorkerNodeManifest] checking the status of all nodes")
    for node in self.nodes:
        if confirm_status:
            is_up = node.updateStatus(refresh_session_key=True)
        else:
            is_up = node.status == HydraWorkerNode.ONLINE
        if is_up:
            active_nodes.append(node)
        else:
            dead_nodes.append(node)
    return active_nodes, dead_nodes
def checkHealth(self, active_nodes=None):
    """
    Run the head health check on each node.
    args:
        active_nodes - the nodes to check, defaults to this manifest's active nodes
    RETURNS: nothing
    """
    nodes_to_check = self.active_nodes if active_nodes is None else active_nodes
    for node in nodes_to_check:
        self.logger.debug("[HydraWorkerNodeManifest] checking health of node=%s", node.node_path)
        node.checkHeadHealth()
def resurrectDeadNodes(self):
    """
    Work through the dead nodes trying to bring them back to life.
    Every dead node is attempted exactly once per call. Nodes that come back
    are moved from dead_nodes to active_nodes; both lists are updated in place.
    RETURNS a list of nodes that were brought back to life
    """
    resurrected_nodes = []
    still_dead = []
    #Walk a snapshot and rebuild the dead list afterwards. Removing entries from
    #dead_nodes while indexing over it would skip nodes and run off the end.
    for node in list(self.dead_nodes):
        self.logger.debug("[HydraWorkerNodeManifest] attempting to resurrect node=%s", str(node))
        if node.resurrect():
            self.logger.debug("[HydraWorkerNodeManifest] successfully resurrected node=%s", str(node))
            resurrected_nodes.append(node)
        else:
            self.logger.info("[HydraWorkerNodeManifest] failed to resurrect node=%s", str(node))
            still_dead.append(node)
    #Slice assignment keeps the same list object for anyone holding a reference to it
    self.dead_nodes[:] = still_dead
    if resurrected_nodes:
        self.active_nodes += resurrected_nodes
    return resurrected_nodes
def sprayJobSet(self, job_set, atomic_job_set):
    """
    This is a safety method in case something happens during ready job assignment.
    It is non ideal and just dumps jobs out to workers as quick as possible:
    jobs are handed round-robin to the active nodes capable of their task,
    then every node commits its queue. Jobs that could not be committed are
    sprayed again over the remaining active nodes.
    @type job_set: list
    @param job_set: this is an iterable of JobTuple or of (priority, JobTuple) tuples
    @type atomic_job_set: dict
    @param atomic_job_set: dict of job_name -> config_token that created it
    @rtype: None
    @return None
    @raise ForceHydraRebuild: when there is no active node to assign to
    @raise TypeError: when an item of job_set has neither accepted form
    """
    self.logger.debug("[HydraWorkerNodeManifest] failover to generic job spray initiated")
    self.refreshNodes(confirm_status=False)
    worker_count = len(self.active_nodes)
    if worker_count < 1:
        self.logger.error(
            "[HydraWorkerNodeManifest] Attempted to assign jobs but we have no active workers to assign to. Restarting Scheduler...")
        raise ForceHydraRebuild
    worker_index = 0
    for job_tuple in job_set:
        if isinstance(job_tuple, JobTuple):
            job_name = job_tuple.name
            task = job_tuple.task
        elif isinstance(job_tuple, tuple) and len(job_tuple) == 2:
            job_name = job_tuple[1].name
            task = job_tuple[1].task
        else:
            raise TypeError(
                "Unexpected type=%s and size inside job_batch, expected either JobTuple or tuple of form (priority, JobTuple)" % type(
                    job_tuple))
        #Try each active node at most once, continuing the round-robin where the last job stopped
        iter_count = 0
        while iter_count < worker_count:
            iter_count += 1
            if worker_index == worker_count:
                worker_index = 0
            worker = self.active_nodes[worker_index]
            worker_index += 1
            if worker.hasCapability(task):
                if job_name in atomic_job_set:
                    worker.addJob(job_tuple, atomic_job_set[job_name], True)
                else:
                    worker.addJob(job_tuple)
                break
        else:
            #The loop ran out without a capable node, the job is dropped
            self.logger.error(
                "[HydraCollectionManifest] unable to find an active node capable of executing job=%s of task=%s if no node configured for this task becomes active jobs of this task will never be assigned",
                job_tuple, task)
    #Now that we have established the assignments, we actually commit the queues
    reassign_jobs = []
    atomic_reassign_jobs = {}
    for worker in self.active_nodes:
        try:
            failed_to_assign, atomic_failed_to_assign = worker.commitJobs()
            reassign_jobs += failed_to_assign
            atomic_reassign_jobs.update(atomic_failed_to_assign)
        except Exception:
            self.logger.exception(
                "[HydraCollectionManifest] failed to assign batch of jobs for node=%s, marking dead and reassigning jobs to others, may cause job duplication",
                worker.node_path)
            #Actually mark the node dead, otherwise the respray below would keep
            #choosing the same failing node and never terminate
            worker.status = HydraWorkerNode.OFFLINE
            #Carry the atomic bookkeeping along with the jobs and leave the worker's queues empty
            reassign_jobs += worker.add_jobs
            atomic_reassign_jobs.update(worker.atomic_add_jobs)
            worker.add_jobs = []
            worker.atomic_add_jobs = {}
    if reassign_jobs:
        #call primitive sprayJobSet that sprays already parsed jobs.
        self.sprayJobSet(reassign_jobs, atomic_reassign_jobs)
def commitMetadata(self, metadata_dict):
    """
    Push the given metadata dict to every active node, then refresh the
    active and dead node lists.
    Note that this deletes all past metadata which may result in the failure of
    old jobs if metadata has changed
    """
    for node in self.active_nodes:
        node.setMetadata(metadata_dict)
    self.refreshNodes()
class HydraScheduler(ModularInput):
title = "Hydra Scheduler"
description = "Schedule Distributed Work. DO NOT have more than one scheduler working simultaneously. It will result in task duplication"
collection_model = None
app = None
collection_conf_name = None
worker_input_name = None
generate_auto_offset = True
def __init__(self):
    """
    Declare the scheduler's modular input scheme and checkpoint settings.

    Creates the XML output manager, describes the three input arguments
    (name, log_level, duration), passes the scheme to ModularInput.__init__
    and initialises the checkpoint file name and update-cycle counter.
    """
    self.output = XMLOutputManager()
    name_field = Field(
        "name", "Scheduler Name",
        "A name for your scheduler input to attach to all events that originate from it directly.",
        required_on_create=False)
    log_level_field = Field(
        "log_level", "Logging Level",
        "This is the level at which the scheduler will log data.",
        required_on_create=True)
    duration_field = DurationField(
        "duration", "Duration",
        "This is the minimum time between runs of the input should it exit for some reason",
        required_on_create=True)
    scheme_args = dict(
        title=self.title,
        description=self.description,
        use_external_validation="true",
        streaming_mode="xml",
        use_single_instance="false")
    ModularInput.__init__(self, scheme_args, [name_field, log_level_field, duration_field])
    # Name of the checkpoint file that stores the aggregate execution time data
    self.checkpoint_file = "hydra_avg_execution_time.json"
    # Number of scheduler cycles to wait before the checkpoint is written again
    # TODO: this value should be relative to interval or duration value
    self.checkpoint_update_cycle = 200
def checkDeadNodes(self, collect_list):
    """
    Ask the node manifest to resurrect its dead nodes and, when any come
    back, send them credentials and metadata.

    ARGS:
        collect_list - forwarded unchanged to distributeCredentials

    RETURNS: nothing
    """
    revived = self.node_manifest.resurrectDeadNodes()
    if len(revived) == 0:
        # Nothing came back, so there is nothing to distribute
        return
    self.logger.info("distributing credentials and metadata to newly resurrected nodes=%s", revived)
    self.distributeCredentials(revived, collect_list)
    self.distributeMetadata(revived)
def augmentMetadataByStanza(self, config, stanza_name):
    """
    Hook for subclasses: adjust the metadata shared by every job of a stanza.

    The base implementation does nothing. config is handed over by
    reference, so an override that mutates it changes the configuration
    for all jobs in the stanza.

    RETURNS nothing (all edits must be done by reference)
    """
    return None
def augmentMetadataByTarget(self, special, config, stanza_name, target):
    """
    Hook for subclasses: adjust job metadata at the target level.

    The base implementation does nothing. Mutating config edits the
    configuration for all jobs in the stanza; mutating special is meant
    for edits that apply only to particular targets within the stanza.

    RETURNS nothing (all edits must be done by reference)
    """
    return None
def augmentMetadataByTask(self, special, config, stanza_name, target, task):
    """
    Hook for subclasses: adjust job metadata at the target/task level.

    The base implementation does nothing. Mutating config edits the
    configuration for all jobs in the stanza; mutating special is meant
    for edits that apply only to particular target task pairs within
    the stanza.

    RETURNS nothing (all edits must be done by reference)
    """
    return None
def augmentTaskExecutionTime(self, collection_manifest):
    '''
    Overload this method to augment execution time at the task level or at
    the target|task|metadata_id level.

    The base implementation seeds the manifest from the checkpoint file:
    when checkpoint data exists and loads as a non-empty dict, every entry
    under "task_level_data" and "target_task_metadata_level_data" has its
    second element (the count) reset to 1 and, if its key is already known
    to the manifest, is stored there as a tuple. Keys the manifest does not
    know (e.g. deleted targets) are ignored.

    @param collection_manifest: HydraCollectionManifest object which is created earlier
    @return the same collection_manifest object, possibly modified
    '''
    if not self.checkpoint_data_exists(self.checkpoint_file, self.checkpoint_dir):
        return collection_manifest
    saved = self.get_checkpoint_data(self.checkpoint_file, self.checkpoint_dir)
    if not (saved and isinstance(saved, dict)):
        return collection_manifest
    # (section in the checkpoint data, manifest attribute that receives it)
    sections = (
        ("task_level_data", "task_aggre_exec_time"),
        ("target_task_metadata_level_data", "task_target_metaid_aggre_exec_time"),
    )
    for section, attr_name in sections:
        for name, record in six.iteritems(saved.get(section, {})):
            # Reset the count
            record[1] = 1
            known = getattr(collection_manifest, attr_name)
            if name in known:
                known[name] = tuple(record)
    return collection_manifest
def getConfigTokenOffsets(self, token_list, total_worker_heads, schedular_execution_time=15,
                          head_dist_bucketsize=2):
    """
    Spread the initial offsets of jobs that share an interval over scheduler cycles.

    Nothing is done when there are fewer tokens than
    head_dist_bucketsize * total_worker_heads, or when
    schedular_execution_time is 0. Otherwise tokens are grouped by their
    interval and, for every group with more than one token, each token gets
    an offset through token.setOffset unless its metadata already carries an
    external "<task>_offset" entry.

    @param:
        token_list - HydraConfigToken list
        total_worker_heads - total active heads
        schedular_execution_time - schedular execution time
        head_dist_bucketsize - Min bucket counts for each head job buckets
    @return: None, offsets are applied in place on the tokens
    """
    self.logger.info("Start process to get initial offset for number of jobs %s", len(token_list))
    # Auto offset only pays off once there are at least head_dist_bucketsize jobs per head
    if head_dist_bucketsize * total_worker_heads > len(token_list):
        self.logger.info(
            "Total jobs are less than threashold value of worker(s) load, hence no need to set auto job offset")
        return
    # The execution time is used as a divisor below
    if schedular_execution_time == 0:
        self.logger.info("Schedular execution time can't be zero, returning default offset values")
        return
    # Group tokens that share the same interval
    group_dict = {}
    for token in token_list:
        same_interval = group_dict.get(token.interval)
        if same_interval:
            self.logger.debug("Appending token %s in interval %s", token, token.interval)
            same_interval.append(token)
        else:
            self.logger.info("Adding first time data %s interval %s", token, token.interval)
            group_dict[token.interval] = [token]
    self.logger.debug("Calculated group based upon interval %s", str(group_dict))
    for interval, tokens in group_dict.items():
        if len(tokens) <= 1:
            # Only one job, no need to distribute
            self.logger.debug(
                "Auto offset algorithm work only if there is more than one job has same interval. We found only one job with interval value=%s, hence skipping auto offset calcualtion for it",
                interval)
            continue
        if interval is None or interval <= schedular_execution_time:
            # Make sure there is at least one full cycle to distribute over
            temp_interval = schedular_execution_time
            self.logger.debug(
                "To calculate offset, job interval should be more than hydra schedular execution time, hence distribute offset with in interval (hydra schedular time)=%s",
                temp_interval)
        else:
            temp_interval = interval
        # Number of scheduler cycles to distribute the jobs over. This must use
        # the clamped temp_interval: the raw interval may be None (float() fails)
        # or 0 (zero cycles, which breaks the modulo below).
        no_of_cycles = math.ceil(float(temp_interval) / schedular_execution_time)
        self.logger.debug(
            "Number of hydra schedular cycles=%s, in which job is distributed by adding offset value.",
            no_of_cycles)
        # Load balance factor: e.g. 10 cycles and 3 tokens give an offset step of 45
        increasing_offset_factor = math.floor(no_of_cycles / len(tokens)) * schedular_execution_time
        # More jobs (tokens) than cycles: fall back to one cycle per step
        if increasing_offset_factor == 0:
            increasing_offset_factor = schedular_execution_time
        self.logger.debug("Job offset incremental value=%s", increasing_offset_factor)
        for index, token in enumerate(tokens):
            if token.task + "_offset" in token.metadata:
                self.logger.info("External offset is set hence ignoring the auto offset")
            else:
                offset = (index % no_of_cycles) * increasing_offset_factor
                token.setOffset(offset)
                self.logger.info("New initial offset %s for token %s", offset, token)
    self.logger.info("Successfully set offsets for all tokens")
def updateCheckPointDir(self, collection_manifest):
    '''
    Write the manifest's aggregate execution time data to the checkpoint file.

    A falsy collection_manifest is not written; an error is logged instead.

    @param collection_manifest: object providing task_aggre_exec_time and
        task_target_metaid_aggre_exec_time
    @return nothing
    '''
    if not collection_manifest:
        self.logger.error("Failed to update check point file=%s, because collection manifest is not defined.",
                          self.checkpoint_file)
        return
    payload = {
        "task_level_data": collection_manifest.task_aggre_exec_time,
        "target_task_metadata_level_data": collection_manifest.task_target_metaid_aggre_exec_time,
    }
    self.set_checkpoint_data(self.checkpoint_file, payload, self.checkpoint_dir)
    self.logger.info("Updated check point file=%s successfully", self.checkpoint_file)
def updateTargetDictStatus(self, target_info_dict={}):
    """
    Hook for subclasses; the base implementation does nothing.

    run() calls this at the start of every scheduling loop iteration with
    its target_info_dict and reads target_info_dict["is_timediff_lt_4hr"]
    right afterwards, so overrides are expected to update the dict in place.

    RETURNS nothing
    """
    # NOTE(review): the {} default is a single dict shared between calls;
    # harmless here because the base implementation never touches it.
    return None
def checkvCenterConnectivity(self, rewrite=False):
    """
    Hook for subclasses; the base implementation does nothing.

    run() calls this with rewrite=True during start-up when the collection
    conf modification time is at least 30 minutes old.

    RETURNS nothing
    """
    return None
def distributeHierarchy(self):
    """
    Hook for subclasses; the base implementation does nothing.

    run() calls this after re-establishing the collection manifest when the
    collection conf modification time has advanced.

    RETURNS nothing
    """
    return None
def establishCollectionManifest(self, calculate_auto_offset=False, total_heads=0, is_timediff_lt_4hr=True, old_token_list=[]):
    """
    Get the information from the collection conf then break it up into
    atomic tasks and place them in the collection manifest.

    One HydraConfigToken is created per (target, task) pair of every
    collection stanza. All tokens of a stanza share the same config dict
    and the same special dict. The assembled metadata is also stored on
    self.metadata_dict.

    @param:
        calculate_auto_offset: Required if initial offset needs to be calculated
        total_heads: total active worker heads, required if calculate_auto_offset is set to True
        is_timediff_lt_4hr: accepted for callers/overrides, not used by this implementation
        old_token_list: accepted for callers/overrides, not used by this implementation
    @return HydraCollectionManifest with entire contents of collect conf file
    @raise ForceHydraRebuild: when no collection stanza could be read after all attempts
    """
    #Get collection conf information, giving up after max_attempts tries
    max_attempts = 4
    for retry in range(max_attempts):
        collects = self.collection_model.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
        collects._owner = "nobody"
        collects = collects.filter_by_app(self.app)
        if collects is not None and len(collects) > 0:
            break
        self.logger.error(
            "[establishCollectionManifest] Could not get collection or no collection defined from scheduler host=%s, after number of retry=%s",
            self.local_server_uri, retry)
        if retry == max_attempts - 1:
            raise ForceHydraRebuild(
                "[establishCollectionManifest] Failed to get collection, hence rebuilding hydra...")
    metadata_dict = {}
    token_list = []
    for collect in collects:
        self.logger.info("Processing collection stanza={0}".format(collect.name))
        config = {}
        username = collect.username
        for field in collect.model_fields:
            #models don't implement a get item function so the fields are read with getattr
            config[field] = getattr(collect, field)
        self.logger.info("parsed collection stanza={0} into a config={1}".format(collect.name, str(config)))
        metadata_id = "metadata_" + collect.name
        metadata_dict[metadata_id] = config
        self.augmentMetadataByStanza(config, collect.name)
        special = {}
        for target in collect.target:
            # The hooks document target/task as the particular item being
            # processed, so pass the current loop values rather than the
            # stanza's whole target/task collections.
            self.augmentMetadataByTarget(special, config, collect.name, target)
            for task in collect.task:
                self.augmentMetadataByTask(special, config, collect.name, target, task)
                token_list.append(
                    HydraConfigToken(target, username, task, metadata_id, self.logger, metadata=config,
                                     special=special))
    self.logger.debug("Establishing collection manifest with token list: {0}".format(str(token_list)))
    # calculate auto offset
    if calculate_auto_offset:
        self.getConfigTokenOffsets(token_list, total_heads, schedular_execution_time=self.scheduling_resolution,
                                   head_dist_bucketsize=2)
    #Keep the metadata so it can be distributed to the nodes
    self.metadata_dict = metadata_dict
    collection_manifest = HydraCollectionManifest(self.logger, metadata_dict, token_list, self.app)
    collection_manifest = self.augmentTaskExecutionTime(collection_manifest)
    return collection_manifest
def distributeCredentials(self, nodes, collect_list):
    """
    Copy locally stored credentials to the given nodes.

    Every stored credential whose realm appears in collect_list is saved
    onto each node under this scheduler's app with owner "nobody"; all
    other credentials are skipped. The outcome of each save is logged.

    args:
        nodes - list of HydraWorkerNode objects to distribute credentials to
        collect_list - realms for which credentials are distributed

    RETURNS nothing
    """
    #gather up our credentials
    stored_creds = SplunkStoredCredential.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
    # The credentials are deliberately not filtered by add-on namespace (app),
    # so that creds can be accessed globally when pushed to the DCN
    stored_creds._owner = "nobody"  #FIXME: nobody no no
    self.logger.debug("attempting to distribute local credentials to nodes={0}".format(str(nodes)))
    for stored in stored_creds:
        self.logger.debug("processing credential for realm={0} user={1}".format(stored.realm, stored.username))
        # Only distribute credentials for vCenters!
        if stored.realm not in collect_list:
            continue
        for node in nodes:
            node_cred = SplunkStoredCredential(self.app, "nobody", stored.username, sessionKey=node.session_key,
                                               host_path=node.node_path)
            node_cred.realm = stored.realm
            node_cred.password = stored.clear_password
            node_cred.username = stored.username
            saved = node_cred.passive_save()
            if saved:
                self.logger.debug(
                    "Successfully distributed credential: realm={0} username={1} to node={2}".format(
                        stored.realm, stored.username, node.node_path))
            else:
                self.logger.error(
                    "Failed to distribute credential: realm={0} username={1} to node={2}".format(
                        stored.realm, stored.username, node.node_path))
    self.logger.debug("finished distributing local credentials to nodes={0}".format(str(nodes)))
def distributeMetadata(self, node_list=None):
    """
    Send self.metadata_dict to worker nodes; does nothing while it is None.

    With a node_list, the metadata is set on each listed node and the node
    manifest (if there is one) is refreshed afterwards. Without a node_list,
    the metadata is committed through the node manifest (if there is one).

    args:
        node_list - if not None will only distribute metadata to given list of HydraWorkerNode objects

    RETURNS nothing
    """
    if self.metadata_dict is None:
        return
    if node_list is None:
        if self.node_manifest is not None:
            self.node_manifest.commitMetadata(self.metadata_dict)
        return
    for worker in node_list:
        worker.setMetadata(self.metadata_dict)
    if self.node_manifest is not None:
        self.node_manifest.refreshNodes()
def getConfModificationTime(self):
    """
    Hook for subclasses; the base implementation does nothing and returns None.

    run() unpacks the result into two values (node conf time, collection
    conf time), compares them with earlier results using "<" and subtracts
    the collection value from datetime.datetime.utcnow(), so a scheduler
    that is actually run has to override this method.
    """
    return None
def setConfModificationTime(self, entity_type):
    """
    Hook for subclasses; the base implementation does nothing.

    ARGS:
        entity_type - ignored by the base implementation

    RETURNS nothing
    """
    return None
def establishNodeManifest(self):
    """
    Build a node manifest from the configured worker node stanzas.

    Only stanzas of this app whose pool_name equals self.pool_name are
    used. For each one the stored password is looked up, a gateway URI is
    derived from the stanza name and gateway port (8008 when the stanza's
    gateway_port is not an int), and a HydraWorkerNode is created.

    RETURNS: HydraWorkerNodeManifest instance holding the created nodes
    """
    #Establish node list
    stanzas = HydraNodeStanza.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
    stanzas._owner = "nobody"  #self.asset_owner
    stanzas = stanzas.filter_by_app(self.app)
    #Create a worker node for every stanza that belongs to this scheduler's pool
    workers = []
    for stanza in stanzas:
        if not (self.pool_name == stanza.pool_name):
            continue
        password = SplunkStoredCredential.get_password(stanza.name, stanza.user, self.app,
                                                       session_key=self.local_session_key,
                                                       host_path=self.local_server_uri)
        gateway_port = stanza.gateway_port if isinstance(stanza.gateway_port, int) else 8008
        # Strip trailing digits and slashes from the node name, then append the gateway port
        gateway_uri = stanza.name.rstrip("/0123456789") + str(gateway_port)
        workers.append(
            HydraWorkerNode(self.logger, stanza.name, password, stanza, gateway_uri, stanza.pool_name,
                            metadata_dict=self.metadata_dict, worker_input_name=self.worker_input_name))
    return HydraWorkerNodeManifest(self.logger, workers, self.app, self.worker_input_name)
def run(self, stanza):
"""
Main entry point of the scheduler input: configure, then schedule jobs until an error occurs.
Reads name, log_level and interval from the stanza, sets up the logger,
validates that the implementation defined collection_model, app,
collection_conf_name and worker_input_name, establishes the node and
collection manifests, distributes credentials and metadata, and then
loops forever spraying ready jobs onto the active nodes.
ARGS:
stanza - a dict of input settings, or a list whose first element is such a dict
RAISES: NotImplementedError if one of the required class attributes is None
RETURNS: False from the error handling path when no positive interval is
defined or the splunk version string compares below '6.0'; otherwise the
process exits through sys.exit(1). The scheduling loop contains no break,
so the method is only left after an exception.
"""
#Get config info
if isinstance(stanza, list):
self.name = stanza[0].get('name', None)
log_level = stanza[0].get("log_level", "WARN").upper()
is_interval_field_defined = True if int(stanza[0].get("interval", -1)) > 0 else False
else:
self.name = stanza.get('name', None)
log_level = stanza.get("log_level", "WARN").upper()
is_interval_field_defined = True if int(stanza.get("interval", -1)) > 0 else False
# Log file name and pool name are both derived from the input name
logname = "hydra_inframon_scheduler_" + self.name.replace("://", "_") + ".log"
self.pool_name = self.name.replace("://", "_").replace("ta_vmware_collection_scheduler_inframon_","")
input_config = self._input_config
#Handle local authentication
self.local_session_key = input_config.session_key
self.local_server_uri = input_config.server_uri
#splunk.setDefault('sessionKey', local_session_key) oh but wait this will get overridden all the f'n time so we have to explicitly pass session keys
splunk.setDefault('user', "nobody")
#this may be made a configuration option
# Upper bound in seconds for the sleep between scheduling passes; also passed as schedular_execution_time for auto offsets
self.scheduling_resolution = 5
#Set up logger
if log_level not in ["DEBUG", "INFO", "WARN", "WARNING", "ERROR"]:
# Unrecognised level: fall back to WARN and say so in the log
log_level = logging.WARN
self.logger = setupLogger(logger=None,
log_format='%(asctime)s %(levelname)s [' + self.name + '] %(message)s',
level=log_level, log_name=logname)
self.logger.warn("log_level was set to a non-recognizable level it has be reset to WARNING level")
else:
self.logger = setupLogger(logger=None,
log_format='%(asctime)s %(levelname)s [' + self.name + '] %(message)s',
level=log_level, log_name=logname)
self.logger.debug("logger reset with log level of {0}".format(log_level))
#Self Validation
# Each required class attribute that is still None is logged and reported with NotImplementedError
if self.collection_model is None:
self.logger.error(
"HydraScheduler implementation {0} did not have a collection model specified, you must specify a collection model".format(
self.name))
raise NotImplementedError("All HydraScheduler implementations must specify a collection model.")
if self.app is None:
self.logger.error(
"HydraScheduler implementation {0} did not have an app specified, you must specify an app".format(
self.name))
raise NotImplementedError("All HydraScheduler implementations must specify an app.")
if self.collection_conf_name is None:
self.logger.error(
"HydraScheduler implementation {0} did not have a collection conf name specified, you must specify a collection conf or changes cannot be identified".format(
self.name))
raise NotImplementedError("All HydraScheduler implementations must specify a collection conf.")
if self.worker_input_name is None:
self.logger.error(
"HydraScheduler implementation {0} did not have a worker input name specified, you must specify the associated hydra worker's modular input name".format(
self.name))
raise NotImplementedError(
"All HydraScheduler implementations must specify the associated hydra worker's modular input name.")
#Debug logging
self.logger.debug("Initialized with local server uri of {0}".format(self.local_server_uri))
try:
self.metadata_dict = None
self.node_manifest = None
self.app_home = os.path.join(make_splunkhome_path(['etc', 'apps']), self.app)
is_timediff_lt_4hr = True
old_token_list = []
#Initialize node manifest
self.node_manifest = self.establishNodeManifest()
self.logger.debug("Initialized node manifest with nodes={0}".format(str(self.node_manifest.all_node_paths)))
# Calculating number of heads to get auto initial offset.
head_count = 0
for worker in self.node_manifest.active_nodes:
head_count += worker.model.heads
#Initialize collection manifest
collection_manifest = self.establishCollectionManifest(calculate_auto_offset=self.generate_auto_offset,
total_heads=head_count, is_timediff_lt_4hr=is_timediff_lt_4hr, old_token_list=old_token_list)
self.logger.debug("Initialized collection manifest")
# get modification times for inframon_hydra_node and inframon_ta_vmware_collection conf files
# The base getConfModificationTime returns None, so this unpacking relies on an override
node_conf_mtime, collection_conf_mtime = self.getConfModificationTime()
# recheck vCenter connectivity when the collection conf modification time is at least 30 minutes (1800 seconds) old
if (datetime.datetime.utcnow() - collection_conf_mtime >= datetime.timedelta(0, 1800)):
self.logger.info("Rechecking vCenter connectivity as 30 minutes elapsed.")
self.checkvCenterConnectivity(rewrite=True)
#Distribute packages
self.distributeCredentials(self.node_manifest.active_nodes, collection_manifest.collect_list)
self.distributeMetadata()
#Loop time!
self.output.initStream()
target_info_dict = {"target_status_checkedtime":time.time(), "target_hostlist_updatedtime":time.time(), "is_timediff_lt_4hr":is_timediff_lt_4hr}
while True:
distribute_creds = False
node_conf_current_mtime, collection_conf_current_mtime = self.getConfModificationTime()
#Update the status of target into target_info_dict
self.updateTargetDictStatus(target_info_dict)
#Check if conf files have been modified or have not been modified since last 4 hours
if not target_info_dict["is_timediff_lt_4hr"] or collection_conf_mtime < collection_conf_current_mtime:
# Save the current execution time data before the manifest is rebuilt
self.updateCheckPointDir(collection_manifest)
old_token_list = collection_manifest._config_token_list
collection_manifest = self.establishCollectionManifest(
calculate_auto_offset=self.generate_auto_offset, total_heads=head_count, is_timediff_lt_4hr=target_info_dict["is_timediff_lt_4hr"], old_token_list=old_token_list)
if collection_conf_mtime < collection_conf_current_mtime:
self.distributeHierarchy()
self.distributeMetadata()
target_info_dict["target_status_checkedtime"] = time.time()
target_info_dict["target_hostlist_updatedtime"] = time.time()
collection_conf_mtime = collection_conf_current_mtime
distribute_creds = True
self.logger.debug("Re-established collection manifest after filesystem change or 4 hours elapsed")
#TODO: handle continuous mode jobs here, as well as potential cancels
# Rebuild the node manifest when the node conf modification time has advanced
if node_conf_mtime < node_conf_current_mtime:
node_conf_mtime = node_conf_current_mtime
self.node_manifest = self.establishNodeManifest()
self.distributeMetadata()
distribute_creds = True
self.logger.info("Re-established node manifest after filesystem change")
if distribute_creds:
# Distribute Credentials as change in file system
self.distributeCredentials(self.node_manifest.active_nodes, collection_manifest.collect_list)
time_to_next_job = collection_manifest.getTimeToNextJob()
if time_to_next_job > 0:
# Nothing is due yet: sleep for at most scheduling_resolution seconds and start the next pass
self.logger.debug("No jobs ready for scheduling going to sleep for maximum %s",
self.scheduling_resolution)
time.sleep(min(time_to_next_job, self.scheduling_resolution))
continue
#Update nodes
## TODO: Remove health check once we completely move to splunk 6.0 version onwards
self.node_manifest.checkHealth()
self.logger.debug("Updated status of active nodes")
#Check on nodes that are inactive
self.checkDeadNodes(collection_manifest.collect_list)
self.logger.debug("Checked status of dead nodes")
#Spread work around
collection_manifest.sprayReadyJobs(self.node_manifest)
self.logger.debug("Sprayed all ready jobs onto active nodes")
#rinse, repeat! also maybe we should be printing some performance stats or something here?
# update checkpoint dir
# NOTE(review): the counter starts at 200 in __init__ but is reset to 50 here - confirm this is intended
if self.checkpoint_update_cycle <= 0:
self.updateCheckPointDir(collection_manifest)
self.checkpoint_update_cycle = 50
else:
self.checkpoint_update_cycle = self.checkpoint_update_cycle - 1
self.output.finishStream()
except Exception as e:
self.output.finishStream()
self.logger.exception("Problem with hydra scheduler {0}:\n {1}".format(self.name, str(e)))
self.logger.warning(
"Exiting current run of hydra scheduler, expecting to restart based on duration or interval")
# If the failure happened before collection_manifest was assigned, referencing it raises UnboundLocalError
# The try ..except block below keeps that from masking the original problem while updating the checkpoint dir
try:
self.updateCheckPointDir(collection_manifest)
except UnboundLocalError as e:
self.logger.error("Could not save checkpoint information due to failure to establish node manifest.")
finally:
# NOTE(review): assuming ver.__version__ is a string, this is a lexicographic comparison,
# so a version such as '10.0' sorts before '6.0' and takes the return False path - confirm
if not is_interval_field_defined or ver.__version__ < '6.0':
return False
else:
sys.exit(1)