# Copyright (C) 2005-2024 Splunk Inc. All Rights Reserved.

#CORE PYTHON IMPORTS
import sys
import os
import time
import datetime
import math
import logging

from httplib2 import ServerNotFoundError

#Splunk Python does not bundle UUID, so we've included it in the hydra bin, but it is a core python module
import uuid

#CORE SPLUNK IMPORTS
import splunk
from splunk.util import normalizeBoolean
from splunk.rest import simpleRequest
import splunk.auth as auth
import splunk.entity as en
import splunk.version as ver

#SA-HYDRA-inframon IMPORTS
from hydra_inframon import XMLOutputManager, setupLogger, isSplunkSessionKeyValid, ForceHydraRebuild
from .models import HydraHealthStanza, HydraMetadataStanza, HydraNodeStanza, SplunkStoredCredential, HydraGatewayStanza
from .hydra_common import HydraCommon, HydraGatewayAdapter, JobTuple

#Modify Path to include SA-Hydra-inframon/lib
from splunk.clilib.bundle_paths import make_splunkhome_path

sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-Hydra-inframon', 'lib']))
from SolnCommon.modinput import ModularInput, Field, DurationField


#Utility Functions
def makeFieldID():
    """
    Return a string usable as an id in a splunk wildcard field
    """
    return str(uuid.uuid1()).replace("-", "")
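
# For illustration: makeFieldID() returns the hex digits of a time-based
# uuid1 with the dashes stripped, e.g. something like
# "9f3c1a2eb01711ee8c3a0242ac120002"; assignToWorker below embeds such ids
# in job names as "job_<id>".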

#Classes for data, might want to put these in a module by themselves at some point
class HydraConfigToken(object):
    """A config token that can be sent to any worker."""

    def __init__(self, target, username, task, metadata_id, logger, initial_schedule_offset=0, metadata=None, special=None,
                 initial_aggregate_execution_time=5):
        """
        Task and Target are the job type to perform and the target host to perform
        it on.

        ARGS:
        @type target: str
        @param target: The external system/resource upon which to perform the task
        @type username: str
        @param username: The username to use when logging into the target
        @type task: str
        @param task: The task or type of job to perform on the target
        @type metadata_id: str
        @param metadata_id: the name of the metadata to use when performing the task
        @type logger: logging.Logger reference
        @param logger: the logger to use when writing out messages to the scheduler log
        @type initial_schedule_offset: int
        @param initial_schedule_offset: the delta from the current time at which to begin scheduling jobs from this config token
        @type metadata: dict
        @param metadata: the collection configuration information for this particular config token to determine the scheduling properties
        @type special: dict
        @param special: the metadata specific to this config token only that will be stored in the job tuple
        @type initial_aggregate_execution_time: int
        @param initial_aggregate_execution_time: the initial weight of jobs created by this config token
        """
        #Avoid shared mutable default arguments
        if metadata is None:
            metadata = {}
        if special is None:
            special = {}
        #Basic stuff
        self.target = target
        self.username = username
        self.task = task
        self.logger = logger
        self.metadata_id = metadata_id
        self.special = special
        # read default offset from config if it exists
        if task + "_offset" in metadata:
            self.offset = metadata[task + "_offset"]
        else:
            self.offset = initial_schedule_offset

        self.metadata = metadata

        #Handle all work around scheduling this thing
        interval_param_name = task + "_interval"
        if interval_param_name in metadata:
            self.interval = metadata[interval_param_name]
        else:
            self.logger.error(
                "Could not establish configured interval for job type %s, setting to default of 60 seconds", self.task)
            self.interval = 60
        #Expiration period
        expiration_param_name = task + "_expiration"
        if expiration_param_name in metadata:
            self._expiration_period = metadata[expiration_param_name]
        else:
            self.logger.error(
                "Could not establish configured expiration period for job type %s, setting to default of same as interval",
                self.task)
            self._expiration_period = self.interval

        #Priority Modification
        priority_param_name = task + "_priority"
        if priority_param_name in metadata:
            self._priority_adjustment = metadata[priority_param_name]
            self.logger.debug("Established configured priority adjustment for job type %s, setting to %s", self.task,
                              self._priority_adjustment)
        else:
            self.logger.debug(
                "Could not establish configured priority adjustment for job type %s, setting to default of 0",
                self.task)
            self._priority_adjustment = 0

        #Task execution initial estimated time
        task_exec_param_name = task + "_exectime"
        if task_exec_param_name in metadata:
            self._execution_time = metadata[task_exec_param_name]
            self.logger.debug("Established configured execution time for job task=%s, setting to %s", self.task,
                              self._execution_time)
        else:
            self.logger.debug("Could not establish configured execution time for job task=%s, setting to default %s",
                              self.task, initial_aggregate_execution_time)
            self._execution_time = initial_aggregate_execution_time

        #Atomic Task Handling
        self._assigned = False
        confirmation_expiration_param_name = task + "_confirmation_expiration"
        if confirmation_expiration_param_name in metadata:
            self._confirmation_expiration = metadata[confirmation_expiration_param_name]
            self.logger.debug(
                "Established configured atomic confirmation expiration period for job task=%s, setting to %s",
                self.task, self._confirmation_expiration)
        else:
            self._confirmation_expiration = 2 * self.interval
            self.logger.debug(
                "Could not establish configured atomic confirmation expiration period for job task=%s, setting to default of 2*interval, %s",
                self.task, self._confirmation_expiration)
        # assignment_info is only used for atomic completion or failure tracking
        self.assignment_info = None
        if task in metadata.get("atomic_tasks", []):
            self.atomic = True
        else:
            self.atomic = False

        self._parse_time = datetime.datetime.utcnow()
        self._schedule_time = self._parse_time + datetime.timedelta(seconds=self.offset)
        self._last_time = self._parse_time

        #This is used to know who to preferably assign this particular job to
        self.worker_affinity = []
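
    # A minimal construction sketch (hypothetical target/credentials; the real
    # scheduler builds tokens from its collection configuration):
    #
    #   token = HydraConfigToken(
    #       target="vc1.example.com", username="admin", task="hostvmperf",
    #       metadata_id="perf_meta", logger=logger,
    #       metadata={"hostvmperf_interval": 60, "hostvmperf_expiration": 60})
    #
    # With that metadata the token schedules a "hostvmperf" job against
    # vc1.example.com every 60 seconds, each job expiring after 60 seconds.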

    def __str__(self):
        """
        Return the job type, username, and target as the representation for this config token

        RETURNS string representation of this config token
        """
        return "HydraConfigToken(task={0}, target={1}, username={2})".format(self.task, self.target, self.username)

    def __repr__(self):
        """
        Return the job type, username, and target as the representation for this config token

        RETURNS string representation of this config token
        """
        return "HydraConfigToken(task={0}, target={1}, username={2})".format(self.task, self.target, self.username)

    def getExecTime(self):
        '''
        @return: return execution time for token
        '''
        return self._execution_time

    def __eq__(self, other):
        """
        @param other: HydraConfigToken object for comparison

        Return True if the other object represents the same token, False otherwise
        """
        if isinstance(other, self.__class__):
            return self.target == other.target and self.task == other.task and self.special == other.special
        return False

    def setExecTime(self, time_in_sec):
        '''
        @param time_in_sec: execution time in sec
        @return: nothing
        '''
        self._execution_time = time_in_sec

    def setOffset(self, initial_offset=0):
        '''
        Set initial token offset
        @param initial_offset: offset value in sec
        @return: nothing
        '''
        self.offset = initial_offset
        self.logger.info("Successfully set offset for task=%s, target=%s, offset=%s", self.task, self.target,
                         self.offset)
        #updating schedule time
        self._schedule_time = self._parse_time + datetime.timedelta(seconds=self.offset)
        self.logger.info("Schedule time is updated successfully for task=%s, target=%s, offset=%s", self.task,
                         self.target, self.offset)

    def is_locked(self):
        """
        Essentially an accessor for the _assigned prop currently, this method
        returns a boolean indicating if this config token is currently locked,
        i.e. blocked from creating jobs.

        @rtype: bool
        @return: True if locked, False if unlocked
        """
        return self._assigned

    def unlock(self):
        """
        This method is used to reset the _assigned prop to False after the
        confirmation of an atomic job's completion or expiration of the
        confirmation period. If the job is past its current scheduled time,
        resets the clock to schedule as of now.

        @rtype: None
        @return: None
        """
        self._assigned = False
        utc_now = datetime.datetime.utcnow()
        if self._schedule_time < utc_now:
            self._schedule_time = utc_now

    def isReady(self):
        """
        Determine if this task is ready to be scheduled.
        Note that for atomic tasks this also depends on whether or not the task is currently assigned.

        @rtype: bool
        @return True if ready to be assigned, False if not time yet
        """
        should_be_ready = self._schedule_time <= datetime.datetime.utcnow()
        if not self.atomic:
            return should_be_ready
        else:
            if not self._assigned:
                return should_be_ready
            #Determine if we should auto unlock due to lack of notification in time
            elif (self._schedule_time + datetime.timedelta(
                    seconds=self._confirmation_expiration)) <= datetime.datetime.utcnow():
                if self.assignment_info is not None:
                    node_path, job_name = self.assignment_info
                else:
                    node_path, job_name = ("UNKNOWN", "UNKNOWN")
                self.logger.warning(
                    "[HydraConfigToken] [isReady] atomic config_token=%s failed to confirm execution of last assigned job=%s on node=%s within confirmation_expiration, unlocking job and allowing assignment...",
                    str(self), job_name, node_path)
                self.unlock()
                return should_be_ready
            else:
                #we are locked and should be locked
                if self.assignment_info is not None:
                    node_path, job_name = self.assignment_info
                else:
                    node_path, job_name = ("UNKNOWN", "UNKNOWN")
                self.logger.debug(
                    "[HydraConfigToken] [isReady] atomic config_token=%s has not yet confirmed execution of last assigned job=%s on node=%s, blocking job creation",
                    str(self), job_name, node_path)
                return False
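
    # To summarize the atomic gating above (non-atomic tokens only check the
    # clock): an atomic token that is due and unassigned is ready; one that is
    # assigned stays blocked until the gateway confirms the job, or until
    # schedule_time + confirmation_expiration passes, at which point it is
    # force-unlocked and may be handed out again.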

    def scheduleNext(self):
        """
        Schedule the next iteration of this particular config token. This follows several rules.
        At the most basic it adds its interval to the current schedule time and sets it. It will
        however only do so if that time is in the future; if not it will continue to add the
        interval until it is in the future, flagging a warning each time.

        @rtype: None
        """
        if self.interval < 1:
            return
        schedule_time = self._schedule_time
        self._last_time = schedule_time
        cur_time = datetime.datetime.utcnow()
        while True:
            schedule_time += datetime.timedelta(seconds=self.interval)
            if schedule_time > cur_time:
                self._schedule_time = schedule_time
                if not self.atomic:
                    self._assigned = False
                break
            else:
                self.logger.warning(
                    "config token type {0} has missed one iteration due at {1}, scheduling for next iteration with interval {2}".format(
                        self.task, str(schedule_time), str(self.interval)))
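
    # Worked example of the catch-up loop above: with interval=60 and a
    # schedule_time of 12:00:00 while it is now 12:02:30, the loop logs two
    # missed-iteration warnings (for 12:01:00 and 12:02:00) and lands on
    # 12:03:00, the first future slot. Jobs for the missed slots are never
    # created.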

    def _updateAffinity(self, worker_path):
        """
        Update self.worker_affinity to reflect the assignment of this job to this node.
        args:
            worker_path - the management uri of the worker to update the affinity for

        RETURNS nothing
        """
        try:
            self.worker_affinity.remove(worker_path)
        except ValueError:
            self.logger.debug("[HydraConfigToken] first time instance of config_token=%s assigned to worker=%s",
                              str(self), worker_path)
        self.worker_affinity.insert(0, worker_path)

    def register_assignment(self, job_name, node_path):
        """
        Take note of the job name and the worker the job was assigned to so that
        we can unlock the job once it completes.

        @type job_name: str
        @param job_name: the name/id of the assigned job
        @type node_path: str
        @param node_path: the data collection node's node_path to which the job was assigned

        @rtype: None
        @return: None
        """
        #Assignment info is the simple tuple of the node_path and the job_name as assigned to that worker
        self.assignment_info = (node_path, job_name)

    def assignToWorker(self, worker):
        """
        Give this a worker's HydraWorkerNode object and it will assign its current job
        with the correct information to the specified worker.
        If successfully assigned this will also mark this config token as assigned.

        returns True if successful, False if not
        """
        job_name = "job_" + makeFieldID()
        job = JobTuple(job_name, self.target, self.task, self.metadata_id, self._schedule_time, self._last_time,
                       self._expiration_period, self.special)
        priority_num = str(
            int(time.mktime(job.create_time.timetuple())) + int(job.expiration_period + self._priority_adjustment))
        if worker.addJob((priority_num, job), self, self.atomic):
            self._updateAffinity(worker.node_path)
            self._assigned = True
            self.scheduleNext()
            self.logger.debug("[HydraConfigToken] job=%s of task=%s queued for assignment to node=%s", job_name,
                              self.task, worker.node_path)
            return True
        else:
            self.logger.error("[HydraConfigToken] job=%s of task=%s failed to be assigned to node=%s", job_name,
                              self.task, worker.node_path)
            return False
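
    # The priority string above is epoch(create_time) + expiration_period +
    # priority_adjustment. For example, a job created at epoch 1700000000 with
    # a 60s expiration and adjustment -10 gets priority "1700000050".
    # Assuming the gateway pops the numerically smallest priority first, a
    # negative adjustment makes a token's jobs run earlier than its peers'.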


class HydraCollectionManifest(object):
    """
    An administration layer on top of collections of config tokens aimed at data
    collection.
    An instance of this class provides scheduling methods for its constituent tasks
    """

    def __init__(self, logger, metadata_dict=None, config_token_list=None, app="SA-Hydra-inframon", collect_list=None):
        """
        Create a new instance of a collection manifest, optionally with a config
        token list specified.
        """
        #Avoid shared mutable default arguments
        self._metadata_dict = metadata_dict if metadata_dict is not None else {}
        self._config_token_list = config_token_list if config_token_list is not None else []
        self.collect_list = collect_list if collect_list is not None else []
        self.logger = logger
        self.app = app
        # Dict which holds aggregate execution time taken at task level
        # Key is task, value is tuple of (execution time, total execution cycles so far)
        self.task_aggre_exec_time = {}
        # Dict to hold aggregate execution time taken by target, task, metadata_id
        # Key is combination of target|task|meta_id, value is tuple of (execution time, total execution cycles so far)
        self.task_target_metaid_aggre_exec_time = {}

        self._calculateTokenListProperties()
        self._calculateExecutionTime()

    def _calculateTokenListProperties(self):
        """
        Recalculates all internal properties based on the token list including:
            task_set - a set of all distinct tasks in the manifest
            atomic_config_tokens - a list of all config tokens that are marked atomic

        returns nothing
        """
        self.task_set = set()
        self.atomic_config_tokens = []
        for token in self._config_token_list:
            self.task_set.add(token.task)
            if token.atomic:
                self.atomic_config_tokens.append(token)
        self.logger.debug("[HydraCollectionManifest] calculated aggregated collection task_set=%s", self.task_set)

    def _calculateExecutionTime(self):
        '''
        Calculate initial execution time at task and target|task|metadata_id level
        @return: nothing
        '''
        for token in self._config_token_list:
            key = token.task
            value = [float(token.getExecTime()), 1]
            self._update_execution_dict(self.task_aggre_exec_time, key, value)
            # update at target, task and metadata id level
            key = token.target + "|" + token.task + "|" + token.metadata_id
            self._update_execution_dict(self.task_target_metaid_aggre_exec_time, key, value)

    def _calculateTaskWeights(self, node_list):
        """
        Given a particular node list calculate the weights of all tasks
        and return them as a dict
        """
        weights = {}
        for task in self.task_set:
            weight = 0
            for node in node_list:
                if node.hasCapability(task):
                    weight += node.model.heads
            self.logger.debug("[HydraCollectionManifest] calculated weight=%s for task=%s", weight, task)
            if weight == 0:
                self.logger.error(
                    "[HydraCollectionManifest] calculated weight=%s for task=%s implies no node will be able to perform jobs of this task, please alter capabilities of your nodes to accommodate this task",
                    weight, task)
            weights[task] = weight

        return weights

    def getReadyJobs(self):
        """
        gets a list of jobs ready to assign

        returns list of jobs ready to be assigned
        """
        ready_jobs = []
        for token in self._config_token_list:
            if token.isReady():
                ready_jobs.append(token)
        return ready_jobs

    def getTimeToNextJob(self):
        """
        Get the time in seconds until the next job is ready to be scheduled

        RETURNS time in seconds
        """
        time_to_next_job = None
        utc_now = datetime.datetime.utcnow()
        token_delta = 0
        for token in self._config_token_list:
            if utc_now > token._schedule_time:
                token_delta = 0
            else:
                token_delta = (token._schedule_time - utc_now).seconds
            if time_to_next_job is None:
                time_to_next_job = token_delta
            elif token_delta < time_to_next_job:
                time_to_next_job = token_delta
        if time_to_next_job is None:
            raise ForceHydraRebuild(
                "[HydraCollectionManifest] could not establish the time to next job run, forcing a rebuild...")
        return time_to_next_job

    def _update_execution_dict(self, dict_var, key, value):
        '''
        Support function to update task_aggre_exec_time or task_target_metaid_aggre_exec_time

        @param dict_var: dict reference to one of them (task_aggre_exec_time or task_target_metaid_aggre_exec_time)
        @param key: key name which needs to be updated (target|task|metadata_id)
        @param value: sequence of 2 items,
            0 - average execution time over the cycle count given in index 1
            1 - number of cycles for which execution time is reported for the given key
        @return nothing
        '''
        if key in dict_var:
            total_cycle = dict_var[key][1] + value[1]
            # Avoid divide by zero exception
            if total_cycle > 0:
                dict_var[key] = (
                    (float(dict_var[key][0] * dict_var[key][1] + value[0] * value[1])) / total_cycle, total_cycle)
        else:
            dict_var[key] = (value[0], value[1])
        # check if dict value has hit the float max threshold
        if dict_var[key][0] / sys.float_info.max > 0.85:
            # reset it to 20% of the current value; note the stored value is a
            # tuple, so rebuild it rather than assigning into it
            self.logger.info(
                "[HydraCollectionManifest] Float value has reached 85% of max float so resetting it to 20% of the current value")
            dict_var[key] = (0.2 * dict_var[key][0], dict_var[key][1])
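
    # Worked example of the weighted running average above: with a stored
    # entry of (avg=4.0s over 3 cycles) and a new report of (avg=6.0s over
    # 1 cycle), the update computes (4.0*3 + 6.0*1) / (3+1) = 4.5s over 4
    # cycles, i.e. the mean as if all 4 cycles had been observed at once.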

    def _update_execution_time(self, info_dict):
        '''
        Update aggregated execution time in task_aggre_exec_time and task_target_metaid_aggre_exec_time

        @param info_dict: dict which has avg execution time reported by gateway
        '''
        for key, value in info_dict.items():
            # Update for target|task|metadata_id
            self._update_execution_dict(self.task_target_metaid_aggre_exec_time, key, value[:2])
            # Update for task
            self._update_execution_dict(self.task_aggre_exec_time, key.split("|")[1], value[:2])

    def _get_job_avg_exectime(self, key, task):
        '''
        Get avg execution time from task_target_metaid_aggre_exec_time or task_aggre_exec_time

        @return: avg execution time as float if it exists, otherwise 0.0
        '''
        if key in self.task_target_metaid_aggre_exec_time:
            # Use the target, task and metadata_id level value if it is defined
            return float(self.task_target_metaid_aggre_exec_time[key][0])
        else:
            # Fall back to the task level value (assumed to always be defined at task level)
            if task in self.task_aggre_exec_time:
                return float(self.task_aggre_exec_time[task][0])
            else:
                # This branch should never execute, but log the information just in case
                self.logger.error(
                    "[HydraCollectionManifest] Could not find execution time so skipping job to calculate the average execution time (key=%s)",
                    key)
                return 0.0

    def _calculateLoadDistribution(self, node_list, ready_jobs, node_job_infos):
        """
        This method takes in a node_list and ready jobs and
        calculates the load balance information. This load balance information
        is represented by available_work_load, which is a mapping of host path
        to the amount of load that node has room to take on.
        @param node_list: a list of active HydraWorkerNode objects
        @param ready_jobs: jobs that need to be scheduled
        @param node_job_infos: dict of aggregate job execution info reported by the gateway on each
            worker node (see getActiveJobInfo for details)

        @return: tuple of (available_work_load, balanced_load of each head)
            available_work_load is a dict holding the work load each worker can still take on;
            the key of this dict is a worker path
            balanced_load: load per worker head
        """
        #Must use floats due to division
        head_count = 0.0
        queue_job_execution_time = {}
        available_work_load = {}
        for worker in node_list:
            node_job_info = node_job_infos[worker.node_path]
            self.logger.debug("[HydraCollectionManifest] Average time reported by gateway of node=%s, value=%s",
                              worker.node_path, node_job_info)
            # Update execution time at target|task|metadata_id and task level
            self._update_execution_time(node_job_info["job_aggregate_execution_info"])
            # calculate unclaimed job execution time
            total_unclaimed_queue_execution_time = 0.0
            for key, value in node_job_info["job_aggregate_execution_info"].items():
                # unclaimed (active) jobs * avg time
                total_unclaimed_queue_execution_time += value[2] * self._get_job_avg_exectime(
                    key, key.split("|")[1])
            queue_job_execution_time[worker.node_path] = total_unclaimed_queue_execution_time
            self.logger.debug(
                "[HydraCollectionManifest] node=%s current unclaimed queue length=%s, left execution time=%s",
                worker.node_path, node_job_info["count"], total_unclaimed_queue_execution_time)
            head_count += worker.model.heads
        # calculate ready queue execution time
        total_readyqueue_execution_time = 0.0
        for ready_job in ready_jobs:
            total_readyqueue_execution_time += self._get_job_avg_exectime(
                key=ready_job.target + "|" + ready_job.task + "|" + ready_job.metadata_id, task=ready_job.task)

        balanced_load = math.ceil((total_readyqueue_execution_time + total_unclaimed_queue_execution_time) / head_count)
        self.logger.debug(
            "[HydraCollectionManifest] spraying jobs to %s heads on %s nodes with balanced unclaimed queue load per head of %s",
            head_count, len(node_list), balanced_load)
        for worker in node_list:
            available_work_load[worker.node_path] = balanced_load * worker.model.heads - queue_job_execution_time[
                worker.node_path]
            self.logger.debug("[HydraCollectionManifest] node=%s load balanced available work load=%s",
                              worker.node_path, available_work_load[worker.node_path])
        return available_work_load, balanced_load
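
    # Worked example of the distribution above: two nodes with 2 heads each
    # (head_count=4), 100s of ready work plus 20s of unclaimed backlog carried
    # into the formula gives balanced_load = ceil(120 / 4) = 30s per head. A
    # node with 2 heads and 15s already queued then advertises
    # 30*2 - 15 = 45s of available work load.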

    def _queueJobsToWorkers(self, job_list, node_list, available_work_load, node_manifest, balanced_load, node_infos):
        """
        Given a list of sorted jobs and a list of active HydraWorkerNode objects
        this will queue all jobs to the workers. This uses the load balance info
        in available_work_load and the workers_by_path dict.
        Honestly, is this method particularly useful isolated? No. But we make
        silliness for the sake of being able to unit test.
        ARGS:
            job_list - list of HydraConfigToken objects to be queued
            node_list - list of HydraWorkerNode objects onto which to queue jobs
            available_work_load - mapping of available load per node (see
                _calculateLoadDistribution for details)
            node_manifest - the node manifest to which all these nodes belong
            balanced_load - load balancer value (see _calculateLoadDistribution for details)
            node_infos - dict of the job info reported by the gateway on each worker

        RETURNS nothing
        """
        worker_queue_load_sort_key = lambda worker: available_work_load[worker.node_path]
        workers_by_path = node_manifest.nodes_by_path
        # Define minimum job count as per head count, accounting for the existing job count on that node
        minimum_job = {}
        for node in node_list:
            minimum_job[node.node_path] = node.model.heads - node_infos[node.node_path]["count"]

        for token in job_list:
            worker_path = None
            worker = None
            avg_job_exec_time = self._get_job_avg_exectime(
                key=token.target + "|" + token.task + "|" + token.metadata_id, task=token.task)
            #First check the by token affinity
            for tmp_path in token.worker_affinity:
                if (available_work_load.get(tmp_path, 0) - avg_job_exec_time) > 0 or minimum_job.get(tmp_path, 0) > 0:
                    tmp_worker = workers_by_path[tmp_path]
                    if tmp_worker.hasCapability(token.task):
                        worker_path = tmp_path
                        worker = tmp_worker
                        self.logger.debug("Assigned job based upon job token affinity for node=%s node_path=%s",
                                          worker, worker_path)
                        break

            #If that fails, check the by target affinity
            if worker_path is None:
                for tmp_path in node_manifest.getPreferredNodesForTarget(token.target):
                    if (available_work_load.get(tmp_path, 0) - avg_job_exec_time) > 0 or minimum_job.get(tmp_path,
                                                                                                         0) > 0:
                        tmp_worker = workers_by_path[tmp_path]
                        if tmp_worker.hasCapability(token.task):
                            worker_path = tmp_path
                            worker = tmp_worker
                            self.logger.debug("Assigned job based upon job target affinity for node=%s node_path=%s",
                                              worker, worker_path)
                            break

            #If that fails, just assign to a worker that has room (start filling from the smallest available load)
            if worker_path is None:
                node_list.sort(key=worker_queue_load_sort_key)
                for node in node_list:
                    if node.hasCapability(token.task):
                        # First best fit node
                        if available_work_load[node.node_path] - avg_job_exec_time > 0 or minimum_job[
                                node.node_path] > 0:
                            worker = node
                            worker_path = node.node_path
                            self.logger.debug(
                                "Assigned job based upon job first best fit algorithm for node=%s node_path=%s",
                                worker, worker_path)
                            break

            #If that fails (a corner case where the sum of all available weights exceeds the job execution time, but no individual node has enough load factor left)
            if worker_path is None:
                node_list.sort(key=worker_queue_load_sort_key, reverse=True)
                for node in node_list:
                    worker = node
                    worker_path = worker.node_path
                    if worker.hasCapability(token.task):
                        self.logger.debug("Assigned job based upon job weight for node=%s node_path=%s", worker,
                                          worker_path)
                        break
                else:
                    self.logger.error(
                        "[HydraCollectionManifest] unable to find an active node capable of executing config_token=%s of task=%s if no node configured for this task becomes active this config_token will never generate another job",
                        token, token.task)
                    worker = None
                    continue

            #Queue up the job to the node for assignment to its gateway queue
            if worker is not None and token.assignToWorker(worker):
                node_manifest.updateTargetNodeAffinity(token.target, worker_path)
                self.logger.debug(
                    "Before job assignment values are, node_weight=%s, job_weight=%s, minimum_job(can be negative value)=%s",
                    available_work_load[worker_path], avg_job_exec_time, minimum_job[worker_path])
                available_work_load[worker_path] = available_work_load[worker_path] - avg_job_exec_time
                minimum_job[worker_path] = minimum_job[worker_path] - 1
                self.logger.debug(
                    "After job assignment value, node_weight=%s, job_weight=%s, minimum_job(can be negative value)=%s",
                    available_work_load[worker_path], avg_job_exec_time, minimum_job[worker_path])
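
    # Assignment order recap for the loop above: (1) the token's own worker
    # affinity, (2) nodes preferred for the target, (3) the first capable node
    # with load or headroom left, scanning from the smallest available load
    # upward, and (4) as a last resort any capable node by descending
    # available load, even if it is already over its balanced share.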

    def _unlock_atomic_jobs(self, node_job_infos):
        """
        Given the job infos from the active nodes attempt to unlock all the
        atomic config tokens that are currently locked. This is done by
        matching the completed jobs against the atomic config token's stored
        current job. If a node that an atomic config token previously assigned
        to and expects to hear from is not present, we flag a warning but
        do not try to reassign unless the config token lock is expired.

        @type node_job_infos: dict
        @param node_job_infos: the dict of node -> job info

        @rtype None
        @return None
        """
        for config_token in self.atomic_config_tokens:
            if config_token.is_locked():
                node_path, job_name = config_token.assignment_info
                if node_path in node_job_infos:
                    node_job_info = node_job_infos[node_path]
                    node_atomic_job_info = node_job_info.get("atomic_job_info",
                                                             {"completed_atomic_jobs": [], "failed_atomic_jobs": []})
                    if job_name in node_atomic_job_info.get("completed_atomic_jobs", []):
                        config_token.unlock()
                        self.logger.debug(
                            "[HydraCollectionManifest] [UnlockAtomicJobs] confirmed completion of job=%s on node=%s originating from token=%s",
                            job_name, node_path, config_token)
                    elif job_name in node_atomic_job_info.get("failed_atomic_jobs", []):
                        config_token.unlock()
                        self.logger.error(
                            "[HydraCollectionManifest] [UnlockAtomicJobs] confirmed the failure of job=%s on node=%s originating from token=%s",
                            job_name, node_path, config_token)
                    else:
                        self.logger.debug(
                            "[HydraCollectionManifest] [UnlockAtomicJobs] unable to confirm completion/failure of job=%s on node=%s originating from token=%s with current execution information",
                            job_name, node_path, config_token)

    def _getActiveWorkerInfo(self, node_manifest, confirm_status=True):
        """
        Refresh Nodes and get the list of active workers for this scheduler run
        @type confirm_status: bool
        @param confirm_status: whether to confirm status by calling updateStatus

        @rtype list
        @return active_workers
        """
        node_manifest.refreshNodes(confirm_status)
        active_workers = node_manifest.active_nodes
        worker_count = len(active_workers)
        if worker_count < 1:
            self.logger.error(
                "[HydraCollectionManifest] Attempted to assign jobs but we have no active workers to assign to. Restarting Scheduler...")
            raise ForceHydraRebuild

        return active_workers

    def sprayReadyJobs(self, node_manifest):
        """
        Take all config tokens that are ready to be assigned out as jobs and spray them over
        active workers in an effort to even out job queues.
        This method is also in charge of managing affinities of workers to particular
        target assets.

        args:
            node_manifest - the node manifest from which to get the worker nodes to assign the jobs to
        """
        #Get workers
        active_workers = self._getActiveWorkerInfo(node_manifest)

        #Get aggregate job execution information
        node_job_infos = {}
        for worker in active_workers:
            activeJobInfo = worker.getActiveJobInfo()
            #An empty dict means no active job info was found for this node
            if len(activeJobInfo) > 0:
                node_job_infos[worker.node_path] = activeJobInfo

        #Need refreshNodes again in case node statuses changed while getting active job info
        active_workers = self._getActiveWorkerInfo(node_manifest, False)

        #Unlock atomic jobs here based on the node_job_infos
        self._unlock_atomic_jobs(node_job_infos)

        #Sort jobs by task weight primarily so that the hardest to schedule get scheduled first, secondarily by target
        task_weights = self._calculateTaskWeights(active_workers)
        ready_jobs = self.getReadyJobs()
        ready_jobs.sort(key=lambda token: token.target)
        ready_jobs.sort(key=lambda token: task_weights[token.task])
        self.logger.debug("Sorted list of ready_jobs=%s", ready_jobs)

        #Calculate load balancing
        available_work_load, balanced_load = self._calculateLoadDistribution(active_workers, ready_jobs, node_job_infos)

        #Spray the jobs onto workers
        self._queueJobsToWorkers(ready_jobs, active_workers, available_work_load, node_manifest, balanced_load,
                                 node_job_infos)

        #Now that we have established the assignments, we actually commit the queues
        reassign_jobs = []
        atomic_reassign_jobs = {}
        for worker in active_workers:
            try:
                failed_to_assign, atomic_failed_to_assign = worker.commitJobs()
                reassign_jobs += failed_to_assign
                atomic_reassign_jobs.update(atomic_failed_to_assign)
            except Exception:
                self.logger.exception(
                    "[HydraCollectionManifest] failed to assign batch of jobs for node=%s, may be dead and reassigning jobs to others, may cause job duplication",
                    worker.node_path)
                reassign_jobs += (worker.add_jobs)
                worker.add_jobs = []

        if reassign_jobs != []:
            #call primitive sprayJobSet that sprays already parsed jobs.
            node_manifest.sprayJobSet(reassign_jobs, atomic_reassign_jobs)
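
    # A minimal sketch of one scheduler pass (hypothetical setup; the real
    # scheduler builds its manifests from conf files and loops continuously):
    #
    #   manifest = HydraCollectionManifest(logger, config_token_list=tokens)
    #   while True:
    #       manifest.sprayReadyJobs(node_manifest)
    #       time.sleep(min(manifest.getTimeToNextJob(), 30))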


class HydraWorkerNode(object):
    '''
    An object representation of a splunk forwarder running at least 1 HydraWorker process
    and added to this scheduler's management in inframon_hydra_node.conf
    '''
    #Class variables for use in status
    OFFLINE = False
    ONLINE = True

    def __init__(self, logger, path, password, model, gateway_uri, pool_name, session_key=None, metadata_dict=None,
                 worker_input_name="ta_vmware_collection_worker_inframon"):
        """
        Initialize the HydraWorkerNode object.
        args:
            logger - a ref to a logger instance
            path - the management uri (host_path) of this splunk server, e.g. https://forwarder.splunk.com:8089
            password - the splunkd management service password for the node
            model - the HydraNodeStanza model object corresponding to this node
            gateway_uri - the uri for the Hydra WSGI Gateway on the node, e.g. https://forwarder.splunk.com:8008
            pool_name - the name of the worker pool this node belongs to
            session_key - a valid session_key for this splunk forwarder
            metadata_dict - optional metadata dict to push to the node on initialization
            worker_input_name - the name of the modular input to control worker processes
        """
        self.logger = logger
        self.node_path = path
        self.model = model
        self.pool_name = pool_name
        self.app = self.model.namespace
        self.password = password
        self.worker_input_name = worker_input_name
        self.gateway_uri = gateway_uri
        self.capabilities = self.model.capabilities if self.model.capabilities is not None else ["*"]
        self.worker_log_level = self.model.log_level if self.model.log_level is not None else "INFO"

        #Establish the session key and the status
        if session_key is None:
            self.refreshSessionKey()
        else:
            self.session_key = session_key
            self.updateStatus()

        self.configureGateway()

        self.establishGateway()

        if metadata_dict is not None:
            self.setMetadata(metadata_dict, bounce_heads=False)

        self.heads_list = self.establishHeads()
        self.add_jobs = []
        self.atomic_add_jobs = {}

    def hasCapability(self, task):
        """
        Check that this node can perform the given task
        ARGS:
            task - the task to check

        RETURNS True if it can, False otherwise
        """
        if "*" in self.capabilities or task in self.capabilities:
            return True
        else:
            return False
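
    # For example, a node configured with capabilities=["*"] accepts any
    # task, while one with, say, capabilities=["hostvmperf", "taskinv"]
    # (hypothetical task names) only accepts those two, and
    # hasCapability("otherperf") would return False for it.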

    def configureGateway(self):
        """
        Configure the gateway on the node per the inframon_hydra_node configuration
        """
        self.logger.info(
            "[HydraWorkerNode] [configureGateway] setting gateway configuration on node=%s to bind to port=%s...",
            self.node_path, self.model.gateway_port)
        try:
            stanza = HydraGatewayStanza.from_name("gateway", "SA-Hydra-inframon", host_path=self.node_path,
                                                  session_key=self.session_key)
            if not stanza:
                stanza = HydraGatewayStanza("SA-Hydra-inframon", "nobody", "gateway", sessionKey=self.session_key,
                                            host_path=self.node_path)
            stanza.port = self.model.gateway_port
            for retry in range(4):
                if stanza.passive_save():
                    self.logger.info("[HydraWorkerNode] [configureGateway] successfully configured gateway on node=%s",
                                     self.node_path)
                    break
            else:
                self.logger.error(
                    "[HydraWorkerNode] [configureGateway] failed to configure gateway on node=%s after %s retries",
                    self.node_path, retry)
                #mark this guy offline since we can't configure it remotely
                self.status = HydraWorkerNode.OFFLINE
        except Exception as e:
            self.logger.exception(
                "[HydraWorkerNode] [configureGateway] problem configuring gateway, marking node dead: %s", str(e))
            self.status = HydraWorkerNode.OFFLINE

    def establishGateway(self):
        """
        Safely establish the adapter to the hydra gateway on the node. If it
        cannot be established mark the node dead and set it to None.
        """

        hga = None
        self.logger.info("[HydraWorkerNode] [establishGateway] attempting to connect to gateway=%s for node=%s ...",
                         self.gateway_uri, self.node_path)
        try:
            hga = HydraGatewayAdapter(self.node_path, self.session_key, self.gateway_uri)
            self.logger.info("[HydraWorkerNode] [establishGateway] successfully connected to gateway=%s for node=%s",
                             self.gateway_uri, self.node_path)
        except splunk.SplunkdConnectionException:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not connect to gateway=%s for node=%s due to a socket error, timeout, or other fundamental communication issue, marking node as dead",
                self.gateway_uri, self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        except splunk.AuthenticationFailed:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd authentication issue, marking node as dead",
                self.gateway_uri, self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        except splunk.LicenseRestriction:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd license issue, this is fatal, marking node as dead permanently",
                self.gateway_uri, self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        except splunk.AuthorizationFailed:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to a splunkd user permissions issue, this is fatal, marking node as dead permanently",
                self.gateway_uri, self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        except splunk.ResourceNotFound:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to missing hydra gatekeeper EAI endpoint, this is fatal, marking node as dead permanently",
                self.gateway_uri, self.node_path)
            self.status = HydraWorkerNode.OFFLINE
        except splunk.InternalServerError as e:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to internal server error=\"%s\", marking node as dead",
                self.gateway_uri, self.node_path, str(e))
            self.status = HydraWorkerNode.OFFLINE
        except splunk.BadRequest as e:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to bad request error=\"%s\", marking node as dead",
                self.gateway_uri, self.node_path, str(e))
            self.status = HydraWorkerNode.OFFLINE
        except splunk.RESTException as e:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to some crazy REST error=\"%s\", marking node as dead",
                self.gateway_uri, self.node_path, str(e))
            self.status = HydraWorkerNode.OFFLINE
        except ServerNotFoundError as e:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not find gateway=%s for node=%s error=\"%s\", node will be dead permanently",
                self.gateway_uri, self.node_path, str(e))
            self.status = HydraWorkerNode.OFFLINE
        except Exception as e:
            self.logger.error(
                "[HydraWorkerNode] [establishGateway] could not authenticate with gateway=%s for node=%s due to error=\"%s\", marking node as dead",
                self.gateway_uri, self.node_path, str(e))
            self.status = HydraWorkerNode.OFFLINE

        self.gateway_adapter = hga

    def getActiveJobInfo(self):
        """
        Pulls the latest job information from the node's hydra gateway.

        @return: a dict
            key: target|task|metadata_id
            value: an array of three items
                0: aggregate execution time
                1: number of times execution time is reported for this category
                2: unclaimed job count for this category
        """
        job_info = {}
        try:
            job_info = self.gateway_adapter.get_job_info()
        except Exception as e:
            self.logger.exception(
                "[HydraWorkerNode] node=%s is likely dead, could not get info on current job count, msg : %s",
                self.node_path, str(e))
            self.updateStatus(refresh_session_key=True)

        return job_info

    def __str__(self):
        """
        Return the object name and the node path as a string representation for this node.
        """
        return "HydraWorkerNode(" + self.node_path + ")"

    def __repr__(self):
        """
        Return the object name and the node path as a string representation for this node.
        """
        return "HydraWorkerNode(" + self.node_path + ")"

    def refreshSessionKey(self):
        """
        Attempt to refresh the session key of this node either with shared key auth or
        with username/password.

        RETURNS True if successful, False if not
        """
        session_key = None
        unrecoverable = False
        try:
            session_key = auth.getSessionKey(self.model.user, self.password, self.node_path)
        except splunk.SplunkdConnectionException:
            self.logger.error(
                "[HydraWorkerNode] node=%s is dead, could not connect to splunkd; check path and that splunkd is up on the remote node",
                self.node_path)
        except splunk.LicenseRestriction:
            unrecoverable = True
            self.logger.error("[HydraWorkerNode] node=%s is dead due to a license issue", self.node_path)
        except splunk.AuthorizationFailed:
            unrecoverable = True
            self.logger.error(
                "[HydraWorkerNode] node=%s is dead, could connect to splunkd but failed to authenticate; check username and password",
                self.node_path)
        except Exception as e:
            self.logger.exception("[HydraWorkerNode] node=%s is dead, because some weird stuff happened: %s",
                                  self.node_path, str(e))

        if session_key is not None:
            self.logger.debug(
                "[HydraWorkerNode] {0} is alive, successfully authenticated user {1}".format(self, self.model.user))
            self.session_key = session_key
            self.status = HydraWorkerNode.ONLINE
            return True
        else:
            self.logger.error(
                "[HydraWorkerNode] {0} is dead, failed to authenticate user {1}".format(self, self.model.user))
            self.session_key = None
            if self.model.credential_validation and unrecoverable:
                self.logger.info("[HydraWorkerNode] node=%s is unrecoverably dead, marking so in inframon_hydra_node.conf",
                                 self.node_path)
                self.model.credential_validation = False
                if not self.model.passive_save():
                    self.logger.error("[HydraWorkerNode] failed to save credential validation as false for node=%s",
                                      self.node_path)
                else:
                    hydra_scheduler = HydraScheduler()
                    hydra_scheduler.setConfModificationTime("node")
                    self.logger.info("[HydraWorkerNode] Updating the conf modification time property for node=%s", self.node_path)
            self.status = HydraWorkerNode.OFFLINE
            return False
        #TODO: we need to work out the shared key auth stuff for a future release, not in chablis

    def updateStatus(self, refresh_session_key=False):
        """
        Check that this node's session key works and update the node to either online or offline.
        If refresh_session_key is True attempt to refresh the session_key on a 401.
        args:
            refresh_session_key - indicates that on a 401 the node should attempt to refresh its session_key

        RETURNS self.status
        """
        if refresh_session_key:
            rsp_code = isSplunkSessionKeyValid(self.node_path, self.session_key, return_status=True)
            if rsp_code == 200:
                self.status = HydraWorkerNode.ONLINE
                #Also need to establish the Hydra Gateway because of VMW-4355, to make the Hydra Node truly ONLINE
                self.establishGateway()
            elif rsp_code == 401:
                self.logger.debug("[HydraWorkerNode] [updateStatus] detected unauthorized session key, refreshing...")
                if self.refreshSessionKey():
                    self.status = HydraWorkerNode.ONLINE
                    #If we refresh the session key, we should refresh our gateway adapter as well
                    self.establishGateway()
                else:
                    self.status = HydraWorkerNode.OFFLINE
            elif rsp_code == 404:
                self.logger.debug(
                    "[HydraWorkerNode] [updateStatus] detected splunkd restart or explicit session kill, refreshing...")
                if self.refreshSessionKey():
                    self.status = HydraWorkerNode.ONLINE
                    #If we refresh the session key, we should refresh our gateway adapter as well
                    self.establishGateway()
                else:
                    self.status = HydraWorkerNode.OFFLINE
            else:
                #This means something went funky so try to refresh the session key
                self.logger.debug("[HydraWorkerNode] [updateStatus] could not communicate with node, refreshing...")
                if self.refreshSessionKey():
                    self.status = HydraWorkerNode.ONLINE
                    #If we refresh the session key, we should refresh our gateway adapter as well
                    self.establishGateway()
                else:
                    self.status = HydraWorkerNode.OFFLINE
        else:
            if isSplunkSessionKeyValid(self.node_path, self.session_key):
                self.status = HydraWorkerNode.ONLINE
            else:
                self.status = HydraWorkerNode.OFFLINE
        if self.status == HydraWorkerNode.OFFLINE:
            self.logger.warning("[HydraWorkerNode] node=%s is offline/unresponsive/unauthenticated", self.node_path)

        return self.status

    def _toggleHead(self, head, action):
        """
        Use this method to toggle the disabled property of a particular head
        args:
            head - the name of the head
            action - either enable or disable

        returns True if successful, else False
        """
        try:
            if action not in ["enable", "disable"]:
                raise ValueError("[HydraWorkerNode] toggleHead action must be one of [enable, disable]")
            path = self.node_path.rstrip(
                "/") + "/servicesNS/nobody/" + self.app + "/data/inputs/" + self.worker_input_name + "/" + head + "/" + action
            rsp, content = simpleRequest(path, method='POST', sessionKey=self.session_key)
            if rsp.status == 200:
                return True
            else:
                self.logger.error(
                    "[HydraWorkerNode] some weird bad stuff happened trying to toggle a hydra head=%s on node=%s see content=%s",
                    head, self.node_path, str(content))
                return False
        except ValueError:
            raise
        except Exception:
            self.logger.exception("[HydraWorkerNode] Problem enabling/disabling remote hydra head=%s on node=%s", head,
                                  self.node_path)
            return False

    def disableHead(self, head):
        """
        Shortcut to _toggleHead
        """
        return self._toggleHead(head, "disable")

    def enableHead(self, head):
        """
        Configure and enable a particular head on a particular node
        """
        uri = self.node_path.rstrip(
            "/") + "/servicesNS/nobody/" + self.app + "/data/inputs/" + self.worker_input_name + "/" + head
        rsp, content = simpleRequest(uri, sessionKey=self.session_key, method='POST',
                                     postargs={'capabilities': ",".join(self.capabilities),
                                               'log_level': self.model.log_level})
        if rsp.status != 200:
            self.logger.error(
                "[HydraWorkerNode] problem saving configuration for head=%s on node=%s, got status=%s with an error_response=%s",
                head, self.node_path, rsp.status, content)
        return self._toggleHead(head, "enable")

    def disableHeads(self, head_list):
        """
        Disable a list of heads

        RETURNS status boolean
        """
        status = True
        for head in head_list:
            # evaluate the call first so one failure does not short-circuit the rest
            status = self.disableHead(head) and status
        return status

    def enableHeads(self, head_list):
        """
        Enable a list of heads

        RETURNS status boolean
        """
        status = True
        for head in head_list:
            # evaluate the call first so one failure does not short-circuit the rest
            status = self.enableHead(head) and status
        return status

    def _safeSaveModels(self, *args):
        """
        Call passive_save on all models passed in. On failure log and update status

        RETURNS nothing
        """
        if args is not None:
            for model in args:
                if not model.passive_save():
                    self.logger.error(
                        "[HydraWorkerNode] could not save %s queue and node=%s, node may be down oh monkey turds...",
                        model.name, self.node_path)
                    #verify death
                    self.updateStatus()

    def establishHeads(self):
        """
        Grab the total configured heads on the node. Normalize the enabled ones
        to match the model, or if the model's heads are undefined, define the model per the remote config.
        Note that only enabled heads make it to the heads dict.

        RETURNS - a list of the names of enabled heads
        """
        #First thing is to check the status, if we are OFFLINE we return []
        enabled_heads = []
        disabled_heads = []
        if self.status != HydraWorkerNode.ONLINE:
            self.logger.info("[HydraWorkerNode] cannot establish current heads for node=%s since it is down",
                             self.node_path)
        else:
            #Now that we know we are online we need to pull all the configured heads and sort them into enabled and disabled
            configured_heads = en.getEntities("/data/inputs/" + self.worker_input_name, self.app, "nobody",
                                              sessionKey=self.session_key, hostPath=self.node_path)
            for head_name, config in configured_heads.items():
                if not normalizeBoolean(config.get("disabled", True)):
                    self.logger.debug(
                        "[HydraWorkerNode] found enabled input process on node=%s with name=%s and config=%s",
                        self.node_path, head_name, config)
                    enabled_heads.append(head_name)
                else:
                    self.logger.debug(
                        "[HydraWorkerNode] found disabled input process on node=%s with name=%s and config=%s",
                        self.node_path, head_name, config)
                    disabled_heads.append(head_name)
            heads = getattr(self.model, "heads", 0)
            num_enabled_heads = len(enabled_heads)
            if num_enabled_heads == heads:
                self.logger.info("[HydraWorkerNode] Correct number of heads=%s on node=%s", heads, self.node_path)
            elif num_enabled_heads < heads:
                self.logger.error(
                    "[HydraWorkerNode] Incorrect number of heads=%s on node=%s, actual_heads=%s, trying to bring number up to correct value",
                    heads, self.node_path, num_enabled_heads)
                needed_heads = heads - num_enabled_heads
                if len(disabled_heads) >= needed_heads:
                    try:
                        new_heads_enabled = []
                        for ii in range(needed_heads):
                            head = disabled_heads[ii]
                            if self.enableHead(head):
                                new_heads_enabled.append(head)
                            else:
                                raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                        for head in new_heads_enabled:
                            enabled_heads.append(head)
                            disabled_heads.remove(head)
                    except splunk.RESTException:
                        self.logger.exception(
                            "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                            self.node_path)
                        #Maybe we lost connectivity to the node, we should update status
                        if self.updateStatus() == HydraWorkerNode.OFFLINE:
                            return []
                else:
                    num_disabled_heads = len(disabled_heads)
                    self.logger.error(
                        "[HydraWorkerNode] node=%s does not have enough configured input processes to match configured heads, can only enable extra %s inputs, required %s",
                        self.node_path, num_disabled_heads, needed_heads)
                    try:
                        new_heads_enabled = []
                        for ii in range(num_disabled_heads):
                            head = disabled_heads[ii]
                            if self.enableHead(head):
                                new_heads_enabled.append(head)
                            else:
                                raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                        for head in new_heads_enabled:
                            disabled_heads.remove(head)
                            enabled_heads.append(head)
                    except splunk.RESTException:
                        self.logger.exception(
                            "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                            self.node_path)
                        #Maybe we lost connectivity to the node, we should update status
                        if self.updateStatus() == HydraWorkerNode.OFFLINE:
                            return []
            else:
                #We have too many heads, need to disable some
                kill_count = num_enabled_heads - heads
                self.logger.error(
                    "[HydraWorkerNode] Incorrect number of heads=%s on node=%s, actual_heads=%s, trying to bring number down to correct value",
                    heads, self.node_path, num_enabled_heads)
                try:
                    new_heads_disabled = []
                    for ii in range(kill_count):
                        head = enabled_heads[ii]
                        if self.disableHead(head):
                            new_heads_disabled.append(head)
                        else:
                            raise splunk.RESTException("Could not manage inputs on remote node=%s", self.node_path)
                    for head in new_heads_disabled:
                        enabled_heads.remove(head)
                        disabled_heads.append(head)
                except splunk.RESTException:
                    self.logger.exception(
                        "[HydraWorkerNode] Could not manage inputs on remote node=%s, freezing heads at last configured status",
                        self.node_path)
                    #Maybe we lost connectivity to the node, we should update status
                    if self.updateStatus() == HydraWorkerNode.OFFLINE:
                        return []

            #Okay now we have the correct, or frozen, enabled and disabled lists
            #We bounce all the heads to make sure they are actually up
            #FIXME: when we have a real interval setting for modular inputs this is not necessary (SOLNVMW-3106)
            self.disableHeads(enabled_heads)
            if not self.enableHeads(enabled_heads):
                self.logger.error(
                    "[HydraWorkerNode] Could not manage inputs on remote node=%s while bouncing heads, node is being marked offline",
                    self.node_path)
                self.status = HydraWorkerNode.OFFLINE
            if self.model.heads != len(enabled_heads):
                self.model.heads = len(enabled_heads)
                if not self.model.passive_save():
                    self.logger.error("[HydraWorkerNode] could not save inframon_hydra_node conf stanza for node=%s",
                                      self.node_path)
                else:
                    hydra_scheduler = HydraScheduler()
                    hydra_scheduler.setConfModificationTime("node")
                    self.logger.info("[HydraWorkerNode] Updating conf modified property for node=%s", self.node_path)
        return enabled_heads

    def resurrect(self):
        """
        Attempt to bring this node back to life, refreshing all internal properties

        RETURNS status boolean
        """
        self.model = self.model.from_self()
        if self.updateStatus(refresh_session_key=True) == HydraWorkerNode.ONLINE:
            self.heads_list = self.establishHeads()
            self.configureGateway()
            return True
        else:
            return False

    def checkHeadHealth(self):
        """
        Check that all heads have reported in their health status.
        If they have reported an unhealthy status they get bounced.

        RETURNS: nothing
        """

        #First we refresh the endpoint
        path = self.node_path.rstrip("/") + "/servicesNS/nobody/" + self.app + "/configs/conf-hydra_health/_reload"
        try:
            simpleRequest(path, sessionKey=self.session_key, raiseAllErrors=True)
        except splunk.AuthenticationFailed:
            self.logger.warning(
                "[HydraWorkerNode] could not refresh the hydra health conf for node=%s, due to auth failure, refreshing session_key...",
                self.node_path)
            self.updateStatus(refresh_session_key=True)
        except Exception as e:
            self.logger.exception("[HydraWorkerNode] could not refresh the hydra health conf for node=%s, message: %s",
                                  self.node_path, str(e))

        bounced_heads = 0
        try:
            #Now we can iterate across logging and bouncing heads
            health_stanzas = HydraHealthStanza.all(host_path=self.node_path, sessionKey=self.session_key)
            health_stanzas = health_stanzas.filter_by_app(self.app)
            health_stanzas._owner = "nobody"
            for stanza in health_stanzas:
                if stanza.head is None:
                    self.logger.warning("[HydraWorkerNode] got a bad stanza in hydra_health stanza=%s", stanza.name)
                    continue
                self.logger.error(
                    "[HydraWorkerNode] regrowing head due to sad face health problem reported for head=%s on node=%s : msg='%s'",
                    stanza.head, self.node_path, stanza.reason)
                successful = False
                already_disabled = False
                for retry in range(3):
                    if not already_disabled and not self.disableHead(stanza.head):
                        continue
                    else:
                        already_disabled = True
                    if not self.enableHead(stanza.head):
                        continue
                    else:
                        successful = True
                        break
                del retry  #just to stop the linter's whining
                if successful:
                    #Where one falls two shall grow in their place! ... err i mean one
                    self.logger.info(
                        "[HydraWorkerNode] successfully regrew head=%s on node=%s after health cry sad face",
                        stanza.head, self.node_path)
                    bounced_heads += 1
                    if not stanza.passive_delete():
                        self.logger.error(
                            "[HydraWorkerNode] could not delete health stanza for head=%s on node=%s after restart, head will likely be double restarted",
                            stanza.head, self.node_path)
                else:
                    self.logger.error(
                        "[HydraWorkerNode] failed to regrow head=%s on node=%s after health cry sad face, will try again later",
                        stanza.head, self.node_path)
        except splunk.AuthenticationFailed:
            self.logger.exception(
                "[HydraWorkerNode] could not act on the hydra health conf for node=%s, due to auth failure, refreshing session_key...",
                self.node_path)
            self.updateStatus(refresh_session_key=True)
        except Exception as e:
            self.logger.exception("[HydraWorkerNode] could not act on the hydra health conf for node=%s, message: %s",
                                  self.node_path, str(e))

        #Finally log what we did
        if bounced_heads == 0:
            self.logger.debug("[HydraWorkerNode] no heads regrown after they cried for help on node=%s", self.node_path)
        else:
            self.logger.info("[HydraWorkerNode] head_count=%s regrown after crying for help on node=%s", bounced_heads,
                             self.node_path)

    def addJob(self, priority_job_tuple, config_token=None, is_atomic=False):
        """
        Add a new job to the open queue

        @type priority_job_tuple: tuple
        @param priority_job_tuple: tuple of (priority num, JobTuple)
        @type config_token: HydraConfigToken
        @param config_token: the HydraConfigToken that created the job
        @type is_atomic: bool
        @param is_atomic: True if the job to be assigned is atomic, False otherwise

        @rtype: bool
        @return: status boolean
        """
        self.add_jobs.append(priority_job_tuple)
        if is_atomic:
            priority_num, job_tuple = priority_job_tuple
            del priority_num
            self.atomic_add_jobs[job_tuple.name] = config_token

        return True
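
    # Illustrative sketch (added comment, not part of the original module):
    # assuming a hypothetical JobTuple jt and HydraConfigToken ct, a caller
    # would enqueue jobs with a priority number like so:
    #
    #     node.addJob((10, jt))                                    # plain job
    #     node.addJob((10, jt), config_token=ct, is_atomic=True)   # atomic job
    #
    # Jobs are only buffered in self.add_jobs here; nothing reaches the node
    # until commitJobs() pushes the whole batch to the gateway.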

    def commitJobs(self):
        """
        Take all jobs in the add_jobs queue and commit them to the gateway on
        this data collection node. If there is a communication failure, return
        the jobs so that they can be reassigned.

        @rtype: tuple
        @return: (any jobs that need to be reassigned as a list, atomic job names to config tokens as dict)
        """
        to_reassign = []
        atomic_to_reassign = {}
        #Send to the worker
        #TODO: we still want to look at handling partial job parse completes both in the gateway service and here (implement a status code 205)
        if len(self.add_jobs) > 0:
            status_code = self.gateway_adapter.commit_job_batch(self.add_jobs)
            if status_code == 200:
                self.logger.debug("[HydraWorkerNode] successfully saved job batch on node=%s with number_new_jobs=%s",
                                  self.node_path, len(self.add_jobs))
                for job_name, config_token in self.atomic_add_jobs.items():
                    config_token.register_assignment(job_name, self.node_path)
            else:
                self.logger.error(
                    "[HydraWorkerNode] could not save job batch on node=%s got a status_code=%s may be a sad face situation for that node",
                    self.node_path, status_code)
                self.updateStatus(refresh_session_key=True)
                to_reassign = self.add_jobs
                atomic_to_reassign = self.atomic_add_jobs

        #Finally we added all our jobs so we set the add jobs to empty
        self.add_jobs = []
        self.atomic_add_jobs = {}

        return to_reassign, atomic_to_reassign

    def setMetadata(self, metadata_dict, bounce_heads=True):
        """
        Set the metadata stanza on this node to reflect the given dict.
        If it fails this node is marked as dead

        RETURNS nothing
        """
        #build new metadata stanza
        success = False
        try:
            new_metadata_stanza = HydraMetadataStanza(self.app, "nobody", "metadata", host_path=self.node_path,
                                                      sessionKey=self.session_key)
            for metadata_id, metadata in metadata_dict.items():
                setattr(new_metadata_stanza, metadata_id, metadata)

            old_metadata_stanza = HydraMetadataStanza.from_name("metadata", self.app, "nobody", self.node_path,
                                                                self.session_key)
            if old_metadata_stanza:
                if not old_metadata_stanza.passive_delete():
                    self.logger.error(
                        "[HydraWorkerNode] node=%s failed to delete old metadata, some unnecessary data may linger",
                        self.node_path)
            if not new_metadata_stanza.passive_save():
                self.logger.error(
                    "[HydraWorkerNode] node=%s failed to save new metadata, node is effectively dead, will attempt to resurrect...",
                    self.node_path)
                self.status = HydraWorkerNode.OFFLINE
            else:
                if bounce_heads and hasattr(self, "heads_list"):
                    #Since metadata was updated we bounce the heads on the remote node to get them to pick up the new metadata
                    heads = self.heads_list
                    self.disableHeads(heads)
                    self.enableHeads(heads)
                success = True
                self.logger.info("[HydraWorkerNode] New metadata is distributed: %s.", new_metadata_stanza)
        except splunk.SplunkdConnectionException:
            self.logger.error(
                "[HydraWorkerNode] node=%s is dead, could not connect to splunkd; check the path and that splunkd is up on the remote node",
                self.node_path)
        except splunk.LicenseRestriction:
            self.logger.error("[HydraWorkerNode] node=%s is dead due to a license issue", self.node_path)
        except splunk.AuthorizationFailed:
            self.logger.error(
                "[HydraWorkerNode] node=%s is dead, could connect to splunkd but failed to auth; check username and password",
                self.node_path)
        except Exception as e:
            self.logger.exception("[HydraWorkerNode] node=%s is dead, because some weird stuff happened: %s",
                                  self.node_path, str(e))
        if not success:
            self.status = HydraWorkerNode.OFFLINE
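
    # Illustrative sketch (added comment, not part of the original module):
    # metadata_dict is keyed by metadata_id with per-stanza config dicts as
    # values, so a hypothetical call might look like:
    #
    #     node.setMetadata({
    #         "metadata_my_collection": {"task": ["hostvmperf"], "interval": 60},
    #     })
    #
    # Each key becomes an attribute on the single remote "metadata" stanza,
    # which the worker heads re-read after they are bounced.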


class HydraWorkerNodeManifest(object):
    """
    This is a container for many HydraWorkerNode objects.
    It provides convenience methods for dealing with groups of nodes.
    """

    def __init__(self, logger, node_list, app, worker_input_name):
        """
        Construct a manifest with the given nodes.
        args:
            logger - a logger instance
            node_list - a list of HydraWorkerNode objects
            app - the app namespace to apply to the nodes
            worker_input_name - the modular input name of the associated hydra worker
        """
        self.logger = logger
        self.worker_input_name = worker_input_name
        self.nodes = node_list
        node_path_list = []
        nodes_by_path = {}
        for node in node_list:
            node_path_list.append(node.node_path)
            nodes_by_path[node.node_path] = node
        self.all_node_paths = node_path_list
        self.nodes_by_path = nodes_by_path
        self.app = app
        self.active_nodes, self.dead_nodes = self.getNodes()
        self.target_node_affinity = {}

    def updateTargetNodeAffinity(self, target, node_path):
        """
        Update the target node affinity object to reflect the assignment of
        a target to a particular node.
        args:
            target - the target whose affinities need to be updated
            node_path - the path of the node that just did business with the target

        RETURNS nothing
        """
        node_affinity_array = self.target_node_affinity.get(target, False)
        if not node_affinity_array:
            node_affinity_array = []
            self.target_node_affinity[target] = node_affinity_array
        try:
            node_affinity_array.remove(node_path)
        except ValueError:
            self.logger.info(
                "[HydraWorkerNodeManifest] first time job for target={0} assigned to worker={1}".format(target,
                                                                                                        node_path))
        #Update by reference
        node_affinity_array.insert(0, node_path)
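
    # Illustrative sketch (added comment, not part of the original module):
    # the affinity list behaves like a most-recently-used queue per target,
    # e.g. with hypothetical node paths:
    #
    #     m.updateTargetNodeAffinity("vc1.example.com", "https://dcn1:8089")
    #     m.updateTargetNodeAffinity("vc1.example.com", "https://dcn2:8089")
    #     m.getPreferredNodesForTarget("vc1.example.com")
    #     # -> ["https://dcn2:8089", "https://dcn1:8089"]
    #
    # Re-assigning to dcn1 would move it back to the front, so the worker
    # that most recently handled a target is always preferred first.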

    def getPreferredNodesForTarget(self, target):
        """
        Get the node affinity for a particular target
        args:
            target - the target for which you want the node affinity

        RETURNS an array of node_paths ordered from preferred to least preferred
        """
        return self.target_node_affinity.get(target, False) or []

    def refreshNodes(self, confirm_status=True):
        """
        Refresh the active and dead nodes lists
        """
        self.active_nodes, self.dead_nodes = self.getNodes(confirm_status=confirm_status)

    def getNodes(self, confirm_status=False):
        """
        This method retrieves a list of active nodes, that is nodes that
        have a status value of HydraWorkerNode.ONLINE. The nodes are not
        tested for whether or not they are online unless confirm_status is
        True. If confirm_status is True nodes will also attempt to refresh
        their session_keys if they are failing.
        args:
            confirm_status - flag indicating whether or not to test the
                             status of the nodes

        RETURNS a list of online HydraWorkerNode objects and a list of offline ones
        """
        active_nodes = []
        dead_nodes = []
        self.logger.debug("[HydraWorkerNodeManifest] checking the status of all nodes")
        if confirm_status:
            for node in self.nodes:
                if node.updateStatus(refresh_session_key=True):
                    active_nodes.append(node)
                else:
                    dead_nodes.append(node)
        else:
            for node in self.nodes:
                if node.status == HydraWorkerNode.ONLINE:
                    active_nodes.append(node)
                else:
                    dead_nodes.append(node)

        return active_nodes, dead_nodes

    def checkHealth(self, active_nodes=None):
        """
        Check the health of nodes, bouncing any heads that have not reported in.

        RETURNS: nothing
        """
        if active_nodes is None:
            active_nodes = self.active_nodes
        for node in active_nodes:
            self.logger.debug("[HydraWorkerNodeManifest] checking health of node=%s", node.node_path)
            node.checkHeadHealth()

    def resurrectDeadNodes(self):
        """
        Work through the dead nodes trying to bring them back to life

        RETURNS a list of nodes that were brought back to life
        """
        resurrected_nodes = []
        #Iterate over a copy so we can safely remove resurrected nodes from the original list
        for node in list(self.dead_nodes):
            self.logger.debug("[HydraWorkerNodeManifest] attempting to resurrect node=%s", str(node))
            if node.resurrect():
                self.logger.debug("[HydraWorkerNodeManifest] successfully resurrected node=%s", str(node))
                resurrected_nodes.append(node)
                self.dead_nodes.remove(node)
            else:
                self.logger.info("[HydraWorkerNodeManifest] failed to resurrect node=%s", str(node))
        if len(resurrected_nodes) > 0:
            self.active_nodes += resurrected_nodes

        return resurrected_nodes

    def sprayJobSet(self, job_set, atomic_job_set):
        """
        This is a safety method in case something happens during ready job assignment.
        It is non ideal and just dumps jobs out to workers as quickly as possible

        @type job_set: list
        @param job_set: this is an iterable of JobTuple
        @type atomic_job_set: dict
        @param atomic_job_set: dict of job_name -> config_token that created it

        @rtype: None
        @return None
        """
        self.logger.debug("[HydraWorkerNodeManifest] failover to generic job spray initiated")
        self.refreshNodes(confirm_status=False)
        worker_count = len(self.active_nodes)
        if worker_count < 1:
            self.logger.error(
                "[HydraWorkerNodeManifest] Attempted to assign jobs but we have no active workers to assign to. Restarting Scheduler...")
            raise ForceHydraRebuild
        worker_index = 0
        for job_tuple in job_set:
            if isinstance(job_tuple, JobTuple):
                job_name = job_tuple.name
                task = job_tuple.task
            elif isinstance(job_tuple, tuple) and len(job_tuple) == 2:
                job_name = job_tuple[1].name
                task = job_tuple[1].task
            else:
                raise TypeError(
                    "Unexpected type=%s and size inside job_batch, expected either JobTuple or tuple of form (priority, JobTuple)" % type(
                        job_tuple))
            iter_count = 0
            while iter_count < worker_count:
                iter_count += 1
                if worker_index == worker_count:
                    worker_index = 0
                worker = self.active_nodes[worker_index]
                worker_index += 1
                if worker.hasCapability(task):
                    if job_name in atomic_job_set:
                        worker.addJob(job_tuple, atomic_job_set[job_name], True)
                    else:
                        worker.addJob(job_tuple)
                    break
            else:
                self.logger.error(
                    "[HydraWorkerNodeManifest] unable to find an active node capable of executing job=%s of task=%s; if no node configured for this task becomes active, jobs of this task will never be assigned",
                    job_tuple, task)

        #Now that we have established the assignments, we actually commit the queues
        reassign_jobs = []
        atomic_reassign_jobs = {}
        for worker in self.active_nodes:
            try:
                failed_to_assign, atomic_failed_to_assign = worker.commitJobs()
                reassign_jobs += failed_to_assign
                atomic_reassign_jobs.update(atomic_failed_to_assign)
            except Exception:
                self.logger.exception(
                    "[HydraWorkerNodeManifest] failed to assign batch of jobs for node=%s, marking dead and reassigning jobs to others, may cause job duplication",
                    worker.node_path)
                reassign_jobs += worker.add_jobs
                worker.add_jobs = []

        if reassign_jobs != []:
            #Call this job spray again on the already parsed jobs that failed to commit
            self.sprayJobSet(reassign_jobs, atomic_reassign_jobs)
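
    # Illustrative sketch (added comment, not part of the original module):
    # both accepted job shapes, assuming a hypothetical JobTuple jt and
    # config token ct:
    #
    #     manifest.sprayJobSet([jt], {})                    # bare JobTuple
    #     manifest.sprayJobSet([(10, jt)], {jt.name: ct})   # (priority, JobTuple), atomic
    #
    # Jobs are handed out round-robin across active workers, skipping any
    # worker that lacks the capability for the job's task.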

    def commitMetadata(self, metadata_dict):
        """
        Set the metadata stanza for all active nodes to the passed dict
        Note that this deletes all past metadata, which may result in the failure of
        old jobs if metadata has changed
        """
        for worker in self.active_nodes:
            worker.setMetadata(metadata_dict)
        self.refreshNodes()


class HydraScheduler(ModularInput):
    title = "Hydra Scheduler"
    description = "Schedule Distributed Work. DO NOT have more than one scheduler working simultaneously. It will result in task duplication"
    collection_model = None
    app = None
    collection_conf_name = None
    worker_input_name = None
    generate_auto_offset = True

    def __init__(self):
        self.output = XMLOutputManager()
        args = [
            Field("name", "Scheduler Name",
                  "A name for your scheduler input to attach to all events that originate from it directly.",
                  required_on_create=False),
            Field("log_level", "Logging Level", "This is the level at which the scheduler will log data.",
                  required_on_create=True),
            DurationField("duration", "Duration",
                          "This is the minimum time between runs of the input should it exit for some reason",
                          required_on_create=True)
        ]
        scheme_args = {'title': self.title,
                       'description': self.description,
                       'use_external_validation': "true",
                       'streaming_mode': "xml",
                       'use_single_instance': "false"}
        ModularInput.__init__(self, scheme_args, args)

        # The data below is used to update hydra_avg_execution_time.json
        self.checkpoint_file = "hydra_avg_execution_time.json"
        # Number of scheduler cycles after which the checkpoint dir needs to be updated
        # TODO: this value should be relative to the interval or duration value
        self.checkpoint_update_cycle = 200
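
    # Illustrative sketch (added comment, not part of the original module): a
    # hypothetical inputs.conf stanza for this modular input might look like
    # the following; the scheme name here is inferred from the pool name
    # parsing in run() below and may differ in a real deployment:
    #
    #     [ta_vmware_collection_scheduler_inframon://pool1]
    #     log_level = INFO
    #     duration = 60
    #     interval = 60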

    def checkDeadNodes(self, collect_list):
        """
        Check the node manifest's dead nodes and attempt to bring them back up.
        If they come back up, give them some credentials.

        RETURNS: nothing
        """
        resurrected_nodes = self.node_manifest.resurrectDeadNodes()
        if len(resurrected_nodes) > 0:
            self.logger.info("distributing credentials and metadata to newly resurrected nodes=%s", resurrected_nodes)
            self.distributeCredentials(resurrected_nodes, collect_list)
            self.distributeMetadata(resurrected_nodes)

    def augmentMetadataByStanza(self, config, stanza_name):
        """
        Overload this method to augment the metadata used by a particular job.
        Editing config by reference will edit the configuration for all jobs
        in the stanza.

        RETURNS nothing (all edits must be done by reference)
        """
        pass

    def augmentMetadataByTarget(self, special, config, stanza_name, target):
        """
        Overload this method to augment the metadata used by a particular job.
        Editing config by reference will edit the configuration for all jobs
        in the stanza.
        Editing special by reference will edit it only for particular targets
        within the stanza.

        RETURNS nothing (all edits must be done by reference)
        """
        pass

    def augmentMetadataByTask(self, special, config, stanza_name, target, task):
        """
        Overload this method to augment the metadata used by a particular job.
        Editing config by reference will edit the configuration for all jobs
        in the stanza.
        Editing special by reference will edit it only for particular target
        task pairs within the stanza.

        RETURNS nothing (all edits must be done by reference)
        """
        pass

    def augmentTaskExecutionTime(self, collection_manifest):
        '''
        Overload this method to augment execution time at the task or
        target|task|metadata_id level

        @param collection_manifest: HydraCollectionManifest object which was created earlier
        @return: the modified collection_manifest object
        '''
        # Read data from the checkpoint dir
        if self.checkpoint_data_exists(self.checkpoint_file, self.checkpoint_dir):
            data = self.get_checkpoint_data(self.checkpoint_file, self.checkpoint_dir)
            # Update data at the task level (task_level_data) and the target|task|metadata_id level (target_task_metadata_level_data)
            if data and isinstance(data, dict):
                for key, items in data.get("task_level_data", {}).items():
                    # Reset the count
                    items[1] = 1
                    if key in collection_manifest.task_aggre_exec_time:
                        collection_manifest.task_aggre_exec_time[key] = tuple(items)
                for key, items in data.get("target_task_metadata_level_data", {}).items():
                    # Reset the count
                    items[1] = 1
                    # if a target has been deleted then we do not want to read information about that target
                    if key in collection_manifest.task_target_metaid_aggre_exec_time:
                        collection_manifest.task_target_metaid_aggre_exec_time[key] = tuple(items)
        return collection_manifest
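
    # Illustrative sketch (added comment, not part of the original module):
    # the checkpoint JSON written by updateCheckPointDir() below and read back
    # here would have roughly this shape (keys and timings are hypothetical):
    #
    #     {
    #         "task_level_data": {"hostvmperf": [12.5, 4]},
    #         "target_task_metadata_level_data": {
    #             "vc1.example.com|hostvmperf|metadata_my_collection": [3.2, 7]
    #         }
    #     }
    #
    # Each value is a two-element [execution time, count] list; index 1 (the
    # count) is reset to 1 on load so stale history does not dominate.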

    def getConfigTokenOffsets(self, token_list, total_worker_heads, schedular_execution_time=15,
                              head_dist_bucketsize=2):
        """
        Calculate offset values for jobs if the number of jobs is more than
        head_dist_bucketsize * total_worker_heads.
        Offset values are calculated by grouping tokens with equal intervals
        together and assigning offset values within each interval group.

        @param:
            token_list - HydraConfigToken list
            total_worker_heads - total active heads
            schedular_execution_time - scheduler execution time
            head_dist_bucketsize - minimum job bucket count for each head

        @return: None; offsets are set on the tokens in place
        """
        self.logger.info("Start process to get initial offset for number of jobs %s", len(token_list))
        # Note: the minimum number of jobs assigned to each worker is head_dist_bucketsize (2), so offsets are only
        # calculated if the token count exceeds head_dist_bucketsize * total_worker_heads
        if (head_dist_bucketsize * total_worker_heads > len(token_list)):
            self.logger.info(
                "Total jobs are less than threshold value of worker(s) load, hence no need to set auto job offset")
            return
        # check the scheduler execution time
        if schedular_execution_time == 0:
            self.logger.info("Scheduler execution time can't be zero, returning default offset values")
            return

        # Group tokens that share the same interval
        group_dict = {}
        for token in token_list:
            if group_dict.get(token.interval, False):
                # Interval already exists, add token to existing list
                self.logger.debug("Appending token %s in interval %s", token, token.interval)
                group_dict[token.interval].append(token)
            else:
                # Add the interval for the first time
                self.logger.info("Adding first time data %s interval %s", token, token.interval)
                group_dict[token.interval] = [token]
        # Done with grouping, now apply the algorithm for each group of tokens
        self.logger.debug("Calculated group based upon interval %s", str(group_dict))

        for interval, tokens in group_dict.items():
            if len(tokens) <= 1:
                # Only one job, no need to distribute
                self.logger.debug(
                    "The auto offset algorithm only works if more than one job has the same interval. We found only one job with interval value=%s, hence skipping auto offset calculation for it",
                    interval)
                continue
            if interval is None or interval <= schedular_execution_time:
                # Make sure we have at least one cycle over which to spread the offsets
                temp_interval = schedular_execution_time
                self.logger.debug(
                    "To calculate offset, the job interval should be more than the hydra scheduler execution time, hence distributing offsets within interval (hydra scheduler time)=%s",
                    temp_interval)
            else:
                temp_interval = interval
            # Number of scheduler cycles over which to distribute jobs
            no_of_cycles = math.ceil(float(temp_interval) / schedular_execution_time)
            self.logger.debug(
                "Number of hydra scheduler cycles=%s, in which the job is distributed by adding an offset value.",
                no_of_cycles)
            # Load balance factor which spreads job offsets by the number of jobs in that interval
            # For example, 10 cycles and 3 tokens allow an offset increment of floor(10 / 3) * 15 = 45
            increasing_offset_factor = math.floor(no_of_cycles / len(tokens)) * schedular_execution_time
            # If there are more jobs (tokens) than cycles
            if increasing_offset_factor == 0:
                increasing_offset_factor = schedular_execution_time

            self.logger.debug("Job offset incremental value=%s", increasing_offset_factor)
            for ii in range(len(tokens)):
                if tokens[ii].task + "_offset" in tokens[ii].metadata:
                    self.logger.info("External offset is set hence ignoring the auto offset")
                else:
                    tokens[ii].setOffset((ii % no_of_cycles) * increasing_offset_factor)
                    self.logger.info("New initial offset %s for token %s",
                                     (ii % no_of_cycles) * increasing_offset_factor, tokens[ii])
        self.logger.info("Successfully set offsets for all tokens")

    def updateCheckPointDir(self, collection_manifest):
        '''
        Update the checkpoint dir using the collection_manifest object
        '''
        if collection_manifest:
            self.set_checkpoint_data(self.checkpoint_file,
                                     {"task_level_data": collection_manifest.task_aggre_exec_time,
                                      "target_task_metadata_level_data": collection_manifest.task_target_metaid_aggre_exec_time},
                                     self.checkpoint_dir)
            self.logger.info("Updated check point file=%s successfully", self.checkpoint_file)
        else:
            self.logger.error("Failed to update check point file=%s, because collection manifest is not defined.",
                              self.checkpoint_file)

    def updateTargetDictStatus(self, target_info_dict={}):
        pass

    def checkvCenterConnectivity(self, rewrite=False):
        pass

    def distributeHierarchy(self):
        pass

    def establishCollectionManifest(self, calculate_auto_offset=False, total_heads=0, is_timediff_lt_4hr=True, old_token_list=[]):
        """
        Get the information from the collection conf then break it up into
        atomic tasks and place them in the collection manifest

        @param:
            calculate_auto_offset: required if the initial offset needs to be calculated
            total_heads: total active worker heads, required if calculate_auto_offset is set to True

        return HydraCollectionManifest with the entire contents of the collect conf file
        """
        #Get collection conf information
        for retry in range(4):
            collects = self.collection_model.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
            collects._owner = "nobody"
            collects = collects.filter_by_app(self.app)
            if collects is not None and len(collects) > 0:
                break
            else:
                if retry == 3:
                    self.logger.error(
                        "[establishCollectionManifest] Could not get collection or no collection defined from scheduler host=%s, after number of retry=%s",
                        self.local_server_uri, retry)
                    raise ForceHydraRebuild(
                        "[establishCollectionManifest] Failed to get collection, hence rebuilding hydra...")
                else:
                    self.logger.error(
                        "[establishCollectionManifest] Could not get collection or no collection defined from scheduler host=%s, after number of retry=%s",
                        self.local_server_uri, retry)
        metadata_dict = {}
        token_list = []

        for collect in collects:
            self.logger.info("Processing collection stanza={0}".format(collect.name))
            config = {}
            username = collect.username
            for field in collect.model_fields:
                #forgive me this but models don't implement a get item function so we have to do this
                config[field] = getattr(collect, field)
            self.logger.info("parsed collection stanza={0} into a config={1}".format(collect.name, str(config)))
            metadata_id = "metadata_" + collect.name
            metadata_dict[metadata_id] = config
            self.augmentMetadataByStanza(config, collect.name)
            special = {}
            for target in collect.target:
                self.augmentMetadataByTarget(special, config, collect.name, target)
                for task in collect.task:
                    self.augmentMetadataByTask(special, config, collect.name, target, task)
                    token_list.append(
                        HydraConfigToken(target, username, task, metadata_id, self.logger, metadata=config,
                                         special=special))
        self.logger.debug("Establishing collection manifest with token list: {0}".format(str(token_list)))

        # calculate auto offset
        if calculate_auto_offset:
            self.getConfigTokenOffsets(token_list, total_heads, schedular_execution_time=self.scheduling_resolution,
                                       head_dist_bucketsize=2)
        #Distribute Metadata to all nodes
        self.metadata_dict = metadata_dict

        collection_manifest = HydraCollectionManifest(self.logger, metadata_dict, token_list, self.app)
        collection_manifest = self.augmentTaskExecutionTime(collection_manifest)
        return collection_manifest
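
    # Illustrative sketch (added comment, not part of the original module): a
    # hypothetical collection stanza with two targets and three tasks expands
    # into 2 x 3 = 6 HydraConfigTokens, all sharing one metadata entry:
    #
    #     stanza "my_collection" with target=[vc1, vc2], task=[t1, t2, t3]
    #     -> metadata_dict["metadata_my_collection"] = {full stanza config}
    #     -> tokens: (vc1, t1), (vc1, t2), (vc1, t3), (vc2, t1), (vc2, t2), (vc2, t3)
    #
    # Each token then schedules its own jobs against its target independently.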

    def distributeCredentials(self, nodes, collect_list):
        """
        This takes in a list of nodes and then distributes all credentials
        local to the current app to all the nodes, excluding node credentials.
        args:
            nodes - list of HydraWorkerNode objects to distribute credentials to

        RETURNS nothing
        """
        #gather up our credentials
        creds = SplunkStoredCredential.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
        # Removing filter by add-on namespace while pushing Credentials to DCN for accessing creds globally
        # creds = creds.filter_by_app(self.app)
        creds._owner = "nobody" #FIXME: nobody no no
        self.logger.debug("attempting to distribute local credentials to nodes={0}".format(str(nodes)))
        for cred in creds:
            self.logger.debug("processing credential for realm={0} user={1}".format(cred.realm, cred.username))
            # Only distribute credentials for vCenters!
            if cred.realm in collect_list:
                for node in nodes:
                    new_cred = SplunkStoredCredential(self.app, "nobody", cred.username, sessionKey=node.session_key,
                                                      host_path=node.node_path)
                    new_cred.realm = cred.realm
                    new_cred.password = cred.clear_password
                    new_cred.username = cred.username
                    if not new_cred.passive_save():
                        self.logger.error(
                            "Failed to distribute credential: realm={0} username={1} to node={2}".format(cred.realm,
                                                                                                         cred.username,
                                                                                                         node.node_path))
                    else:
                        self.logger.debug(
                            "Successfully distributed credential: realm={0} username={1} to node={2}".format(cred.realm,
                                                                                                             cred.username,
                                                                                                             node.node_path))
        self.logger.debug("finished distributing local credentials to nodes={0}".format(str(nodes)))

    def distributeMetadata(self, node_list=None):
        """
        If we have both a metadata dict and a node manifest, distribute the metadata to all nodes
        args:
            node_list - if not None, will only distribute metadata to the given list of HydraWorkerNode objects

        RETURNS nothing
        """
        if node_list is not None and self.metadata_dict is not None:
            for node in node_list:
                node.setMetadata(self.metadata_dict)
            if self.node_manifest is not None:
                self.node_manifest.refreshNodes()
        elif self.node_manifest is not None and self.metadata_dict is not None:
            self.node_manifest.commitMetadata(self.metadata_dict)

    def getConfModificationTime(self):
        pass

    def setConfModificationTime(self, entity_type):
        pass

    def establishNodeManifest(self):
        """
        Get all configured worker nodes and construct a node manifest

        RETURNS: HydraWorkerNodeManifest instance with all configured nodes
        """
        #Establish node list
        node_stanzas = HydraNodeStanza.all(host_path=self.local_server_uri, sessionKey=self.local_session_key)
        node_stanzas._owner = "nobody" #self.asset_owner
        node_stanzas = node_stanzas.filter_by_app(self.app)

        #Iterate on all nodes, checking if alive and sorting appropriately
        node_list = []
        for node_stanza in node_stanzas:
            if self.pool_name == node_stanza.pool_name:
                password = SplunkStoredCredential.get_password(node_stanza.name, node_stanza.user, self.app,
                                                               session_key=self.local_session_key,
                                                               host_path=self.local_server_uri)
                if isinstance(node_stanza.gateway_port, int):
                    gateway_port = node_stanza.gateway_port
                else:
                    gateway_port = 8008
                #Derive the gateway uri from the management uri by stripping the management port and appending the gateway port
                gateway_uri = node_stanza.name.rstrip("/0123456789") + str(gateway_port)
                node = HydraWorkerNode(self.logger, node_stanza.name, password, node_stanza, gateway_uri, node_stanza.pool_name,
                                       metadata_dict=self.metadata_dict, worker_input_name=self.worker_input_name)
                node_list.append(node)

        return HydraWorkerNodeManifest(self.logger, node_list, self.app, self.worker_input_name)
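
    # Worked example (added comment, not part of the original module) of the
    # gateway_uri derivation above, assuming a hypothetical node stanza named
    # "https://dcn1.example.com:8089" with the default gateway port:
    #
    #     "https://dcn1.example.com:8089".rstrip("/0123456789")
    #         -> "https://dcn1.example.com:"   (trailing port digits stripped)
    #     + str(8008)
    #         -> "https://dcn1.example.com:8008"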

    def run(self, stanza):
        #Get config info
        if isinstance(stanza, list):
            self.name = stanza[0].get('name', None)
            log_level = stanza[0].get("log_level", "WARN").upper()
            is_interval_field_defined = True if int(stanza[0].get("interval", -1)) > 0 else False
        else:
            self.name = stanza.get('name', None)
            log_level = stanza.get("log_level", "WARN").upper()
            is_interval_field_defined = True if int(stanza.get("interval", -1)) > 0 else False

        logname = "hydra_inframon_scheduler_" + self.name.replace("://", "_") + ".log"
        self.pool_name = self.name.replace("://", "_").replace("ta_vmware_collection_scheduler_inframon_", "")
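        # Worked example (added comment, not part of the original module): an
        # input named "ta_vmware_collection_scheduler_inframon://pool1" yields
        # logname "hydra_inframon_scheduler_ta_vmware_collection_scheduler_inframon_pool1.log"
        # and pool_name "pool1".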

        input_config = self._input_config
        #Handle local authentication
        self.local_session_key = input_config.session_key
        self.local_server_uri = input_config.server_uri
        #splunk.setDefault('sessionKey', local_session_key) oh but wait this will get overridden all the f'n time so we have to explicitly pass session keys
        splunk.setDefault('user', "nobody")

        #this may be made a configuration option
        self.scheduling_resolution = 5

        #Set up logger
        if log_level not in ["DEBUG", "INFO", "WARN", "WARNING", "ERROR"]:
            log_level = logging.WARN
            self.logger = setupLogger(logger=None,
                                      log_format='%(asctime)s %(levelname)s [' + self.name + '] %(message)s',
                                      level=log_level, log_name=logname)
            self.logger.warn("log_level was set to a non-recognizable level; it has been reset to WARNING level")
        else:
            self.logger = setupLogger(logger=None,
                                      log_format='%(asctime)s %(levelname)s [' + self.name + '] %(message)s',
                                      level=log_level, log_name=logname)
            self.logger.debug("logger reset with log level of {0}".format(log_level))

        #Self Validation
        if self.collection_model is None:
            self.logger.error(
                "HydraScheduler implementation {0} did not have a collection model specified, you must specify a collection model".format(
                    self.name))
            raise NotImplementedError("All HydraScheduler implementations must specify a collection model.")
        if self.app is None:
            self.logger.error(
                "HydraScheduler implementation {0} did not have an app specified, you must specify an app".format(
                    self.name))
            raise NotImplementedError("All HydraScheduler implementations must specify an app.")
        if self.collection_conf_name is None:
            self.logger.error(
                "HydraScheduler implementation {0} did not have a collection conf name specified, you must specify a collection conf or changes cannot be identified".format(
                    self.name))
            raise NotImplementedError("All HydraScheduler implementations must specify a collection conf.")
        if self.worker_input_name is None:
            self.logger.error(
                "HydraScheduler implementation {0} did not have a worker input name specified, you must specify the associated hydra worker's modular input name".format(
                    self.name))
            raise NotImplementedError(
                "All HydraScheduler implementations must specify the associated hydra worker's modular input name.")

        #Debug logging
        self.logger.debug("Initialized with local server uri of {0}".format(self.local_server_uri))
try:
|
|
self.metadata_dict = None
|
|
self.node_manifest = None
|
|
self.app_home = os.path.join(make_splunkhome_path(['etc', 'apps']), self.app)
|
|
is_timediff_lt_4hr = True
|
|
old_token_list = []
|
|
#Initialize node manifest
|
|
self.node_manifest = self.establishNodeManifest()
|
|
self.logger.debug("Initialized node manifest with nodes={0}".format(str(self.node_manifest.all_node_paths)))
|
|
|
|
# Calculating number of heads to get auto initial offset.
|
|
head_count = 0
|
|
for worker in self.node_manifest.active_nodes:
|
|
head_count += worker.model.heads
|
|
|
|
#Initialize collection manifest
|
|
collection_manifest = self.establishCollectionManifest(calculate_auto_offset=self.generate_auto_offset,
|
|
total_heads=head_count, is_timediff_lt_4hr=is_timediff_lt_4hr, old_token_list=old_token_list)
|
|
self.logger.debug("Initialized collection manifest")
|
|
|
|
# get modification times for inframon_hydra_node and inframon_ta_vmware_collection conf files
|
|
node_conf_mtime, collection_conf_mtime = self.getConfModificationTime()
|
|
# check vcenter connectivity if change in collection
|
|
if (datetime.datetime.utcnow() - collection_conf_mtime >= datetime.timedelta(0, 1800)):
|
|
self.logger.info("Rechecking vCenter connectivity as 30 minutes elapsed.")
|
|
self.checkvCenterConnectivity(rewrite=True)
|
|
|
|
#Distribute packages
|
|
self.distributeCredentials(self.node_manifest.active_nodes, collection_manifest.collect_list)
|
|
self.distributeMetadata()
|
|
|
|
#Loop time!
|
|
self.output.initStream()
|
|
target_info_dict = {"target_status_checkedtime":time.time(), "target_hostlist_updatedtime":time.time(), "is_timediff_lt_4hr":is_timediff_lt_4hr}
|
|
while True:
|
|
distribute_creds = False
|
|
node_conf_current_mtime, collection_conf_current_mtime = self.getConfModificationTime()
|
|
#Update the status of target into target_info_dict
|
|
self.updateTargetDictStatus(target_info_dict)
|
|
#Check if conf files have been modified or have not been modified since last 4 hours
|
|
if not target_info_dict["is_timediff_lt_4hr"] or collection_conf_mtime < collection_conf_current_mtime:
|
|
self.updateCheckPointDir(collection_manifest)
|
|
old_token_list = collection_manifest._config_token_list
|
|
collection_manifest = self.establishCollectionManifest(
|
|
calculate_auto_offset=self.generate_auto_offset, total_heads=head_count, is_timediff_lt_4hr=target_info_dict["is_timediff_lt_4hr"], old_token_list=old_token_list)
|
|
if collection_conf_mtime < collection_conf_current_mtime:
|
|
self.distributeHierarchy()
|
|
self.distributeMetadata()
|
|
target_info_dict["target_status_checkedtime"] = time.time()
|
|
target_info_dict["target_hostlist_updatedtime"] = time.time()
|
|
collection_conf_mtime = collection_conf_current_mtime
|
|
distribute_creds = True
|
|
self.logger.debug("Re-established collection manifest after filesystem change or 4 hours elapsed")
|
|
#TODO: handle continuous mode jobs here, as well as potential cancels
|
|
if node_conf_mtime < node_conf_current_mtime:
|
|
node_conf_mtime = node_conf_current_mtime
|
|
self.node_manifest = self.establishNodeManifest()
|
|
self.distributeMetadata()
|
|
distribute_creds = True
|
|
self.logger.info("Re-established node manifest after filesystem change")
|
|
if distribute_creds:
|
|
# Distribute Credentials as change in file system
|
|
self.distributeCredentials(self.node_manifest.active_nodes, collection_manifest.collect_list)
|
|
time_to_next_job = collection_manifest.getTimeToNextJob()
|
|
if time_to_next_job > 0:
|
|
self.logger.debug("No jobs ready for scheduling going to sleep for maximum %s",
|
|
self.scheduling_resolution)
|
|
time.sleep(min(time_to_next_job, self.scheduling_resolution))
|
|
continue
|
|
|
|
#Update nodes
|
|
## TODO: Remove health check once we completely move to splunk 6.0 version onwards
|
|
self.node_manifest.checkHealth()
|
|
self.logger.debug("Updated status of active nodes")
|
|
|
|
#Check on nodes that are inactive
|
|
self.checkDeadNodes(collection_manifest.collect_list)
|
|
self.logger.debug("Checked status of dead nodes")
|
|
|
|
#Spread work around
|
|
collection_manifest.sprayReadyJobs(self.node_manifest)
|
|
self.logger.debug("Sprayed all ready jobs onto active nodes")
|
|
|
|
#rinse, repeat! also maybe we should be printing some performance stats or something here?
|
|
# update checkpoint dir
|
|
if self.checkpoint_update_cycle <= 0:
|
|
self.updateCheckPointDir(collection_manifest)
|
|
self.checkpoint_update_cycle = 50
|
|
else:
|
|
self.checkpoint_update_cycle = self.checkpoint_update_cycle - 1
|
|
|
|
self.output.finishStream()
|
|
except Exception as e:
|
|
self.output.finishStream()
|
|
self.logger.exception("Problem with hydra scheduler {0}:\n {1}".format(self.name, str(e)))
|
|
self.logger.warning(
|
|
"Exiting current run of hydra scheduler, expecting to restart based on duration or interval")
|
|
# If node manifest is not establised because of any exception then UnboundLocalError error shows up
|
|
# To avoid this we are using try ..except block to update checkPoint dir
|
|
try:
|
|
self.updateCheckPointDir(collection_manifest)
|
|
except UnboundLocalError as e:
|
|
self.logger.error("Could not save checkpoint information due to failure to establish node manifest.")
|
|
|
|
finally:
|
|
if not is_interval_field_defined or ver.__version__ < '6.0':
|
|
return False
|
|
else:
|
|
sys.exit(1) |