You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

410 lines
15 KiB

# Copyright (C) 2005-2021 Splunk Inc. All Rights Reserved.
#CORE PYTHON IMPORTS
import os
import datetime
try:
import _pickle as cPickle
except ImportError:
import cPickle
try:
from urllib.request import urlopen, Request, build_opener
from urllib.error import HTTPError
except ImportError:
from urllib2 import urlopen, Request, build_opener, HTTPError
import json
from base64 import b64encode, b64decode
from collections import namedtuple
from time import mktime, sleep
from . import six
#SPLUNK IMPORTS
import splunk.entity as en
#Basic Job Named Tuple
JobTuple = namedtuple("JobTuple", "name target task metadata_id create_time last_time expiration_period special")
class HydraCommon(object):
"""
This class contains utility methods or variables used across the scheduler and
worker classes.
"""
@staticmethod
def getConfModTime(app_home, conf, collection_conf_name=None):
"""
Get the modification time for the specified conf, collect or node
return epoch time for conf modification
"""
if app_home == None:
raise NotImplementedError("app_home must not be None")
conf_name = None
if conf == "collection":
if collection_conf_name == None:
raise NotImplementedError("Collection Conf Name Must Be Specified")
conf_name = collection_conf_name
elif conf == "node":
conf_name = "inframon_hydra_node.conf"
elif conf == "metadata":
conf_name = "inframon_hydra_metadata.conf"
else:
raise NotImplementedError("Unrecognized Conf Parameter in getConfModTime")
default_path = os.path.join(app_home, "default", conf_name)
local_path = os.path.join(app_home, "local", conf_name)
if os.path.exists(default_path):
default_time = os.path.getmtime(default_path)
else:
default_time = 0
if os.path.exists(local_path):
local_time = os.path.getmtime(local_path)
else:
local_time = 0
return max(default_time, local_time)
class HydraGatewayAdapter(object):
"""
This class acts as the go between for schedulers and workers and the
hydra gateway service.
"""
def __init__(self, splunkd_uri, splunk_session_key, gateway_uri):
self.splunkd_uri = splunkd_uri
self.splunk_session_key = splunk_session_key
self.gateway_uri = gateway_uri.rstrip("/")
self.authenticate_gateway()
def authenticate_gateway(self):
"""
Call out to splunkd to get the key to the hydra gateway
"""
for retry in range(4):
entity = en.getEntity("/hydra/hydra_gatekeeper", "hydra_gateway", sessionKey=self.splunk_session_key,
hostPath=self.splunkd_uri)
self.gateway_auth_key = entity["key"]
if self.gateway_auth_key != "DEFER":
self.opener = build_opener()
self.opener.addheaders = [('X-hydra-auth', self.gateway_auth_key)]
break
else:
#Give the gateway time to come up
sleep(2)
else:
raise Exception("[HydraGatewayAdapter] could not authenticate with gateway after %s retries" % (str(retry)))
#Cache Endpoint Wrappers:
def get_cache(self, name):
"""
Get the current cache value for the given name.
Note that caches are assumed to be JSON serializable and deserializable
ARGS:
name - the name of the cache entry
RETURNS deserialized cache value or None if it does not exist
"""
uri = self.gateway_uri + "/hydra/cache"
headers = {"X-HYDRA-CACHE-NAME": name}
req = Request(uri, headers=headers)
try:
resp = self.opener.open(req)
return json.loads(resp.read().decode('utf-8'))
except HTTPError as e:
if e.code == 404:
return None
else:
raise e
def set_cache(self, name, value, expiration=None):
"""
Set the current cache value for the given name.
Note that caches are assumed to be JSON serializable and deserializable
@type name: str
@param name: the name of the cache entry
@type value: json serializable object
@param value: JSON serializable python object (dict preferred)
@type expiration: int or None
@param expiration: the period in sec from set time after which the cache should be cleared
@rtype: int
@return: the response code of the cache request
"""
uri = self.gateway_uri + "/hydra/cache"
headers = {"X-HYDRA-CACHE-NAME": name}
if isinstance(expiration, int):
headers["X-HYDRA-CACHE-EXPIRY"] = str(expiration)
body = json.dumps(value)
req = Request(uri, headers=headers, data=body.encode('utf-8'))
try:
resp = self.opener.open(req)
return resp.code
except HTTPError as e:
return e.code
def set_cache_batch(self, cache_items, expiration=None):
"""
Set a batch of caches to the gateway cache.
Note that caches are assumed to be JSON serializable and deserializable
@type cache_items: iterable of tuples of the form (<cache_name>, <cache_value>)
@param cache_items: the set of cache name and value pairs to be set in the gateway cache
@type expiration: int or None
@param expiration: the period from set time after which the cache should be cleared
@rtype: int
@return: the response code of the cache request
"""
uri = self.gateway_uri + "/hydra/cache/batch"
headers = {}
if isinstance(expiration, int):
headers["X-HYDRA-CACHE-EXPIRY"] = str(expiration)
body_list = []
for name, value in cache_items:
body_list.append(name + "\t" + json.dumps(value))
body = "\n".join(body_list)
req = Request(uri, headers=headers, data=body.encode('utf-8'))
try:
resp = self.opener.open(req)
return resp.code
except HTTPError as e:
return e.code
#Job Endpoint Wrappers:
def get_job_count(self):
"""
Get the current job count.
RETURNS job count (int)
"""
req = Request(self.gateway_uri + "/hydra/job/info")
resp = self.opener.open(req)
resp_dict = json.loads(resp.read().decode('utf-8'))
return resp_dict['count']
def get_job_info(self):
"""
Get the current job information.
RETURNS job information dict which hold the following values
count: total unclaimed job count
expiry_job_count: expired job so far on this gateway
job_aggregate_execution_info: is a dict
key: target|task|metadata_id
value: is array of three items
0: aggregate execution time
1: number of times execution time is reported for this category
2: unclaimed job count for this category
atomic_job_info is a dict
key: completed_atomic_jobs
value: list of completed job names/ids
key: failed_atomic_jobs
value: list of failed job names/ids
"""
req = Request(self.gateway_uri + "/hydra/job/info")
resp = self.opener.open(req)
resp_dict = json.loads(resp.read().decode('utf-8'))
return resp_dict
def report_failed_atomic_job(self, job_tuple):
'''
Calling gateway to commit failed atomic job execution.
@type job_tuple: JobTuple
@param job_tuple: the JobTuple object for the failed job
@rtype: int
@return: return code from gateway
'''
uri = self.gateway_uri + "/hydra/job/execution/failure"
#Handle Atomic Job
headers = {}
headers["X-HYDRA-ATOMIC-JOB"] = job_tuple.name
req = Request(uri, headers=headers, data="".encode('utf-8'))
try:
resp = self.opener.open(req, timeout=60)
return resp.code
except HTTPError as e:
return e.code
def commit_job_exec_info(self, time_spent, job_tuple, is_atomic=False):
'''
Calling gateway to commit job execution information
@type time_spent: int
@param time_spent: total time is taken by job
@type job_tuple: JobTuple
@param job_tuple: the JobTuple object for the completed job
@type is_atomic: bool
@param is_atomic: True if the job's task was atomic, False otherwise
@rtype: int
@return: return code from gateway
'''
uri = self.gateway_uri + "/hydra/job/execution/info"
#Handle Job Execution Info
serialized_job = self.serialize_job(job_tuple)
content = str(time_spent) + ":" + serialized_job
#Handle Atomic Job
headers = {}
if is_atomic:
headers["X-HYDRA-ATOMIC-JOB"] = job_tuple.name
req = Request(uri, headers=headers, data=content.encode('utf-8'))
try:
resp = self.opener.open(req, timeout=60)
return resp.code
except HTTPError as e:
return e.code
def get_next_job(self, block=True):
"""
Get and deserialize the next job in priority order.
If no jobs available, i.e. 404, return None
ARGS:
block - if false do not wait for job if not available for any reason, return immediately
RETURNS JobTuple
"""
if block:
uri = self.gateway_uri + "/hydra/job/pop"
else:
uri = self.gateway_uri + "/hydra/job/pop?block=0"
req = Request(uri)
try:
resp = self.opener.open(req)
return self.deserialize_job(resp.read().decode('utf-8'))
except HTTPError as e:
if e.code == 404:
return None
else:
raise e
def commit_job_batch(self, job_batch):
"""
Send the given batch to the gateway
ARGS:
job_batch - an iterable of JobTuples or tuples of (priority num, JobTuple)
RETURNS status code
"""
serialized_batch = []
for job_tuple in job_batch:
if isinstance(job_tuple, JobTuple):
#We set our priority number as the expiration time in epoch for a particular job
priority_number = str(int(mktime(job_tuple.create_time.timetuple())) + int(job_tuple.expiration_period))
serialized_batch.append(priority_number + ":" + self.serialize_job(job_tuple))
elif isinstance(job_tuple, tuple) and len(job_tuple) == 2:
#We set our priority number as the passed priority number in the tuple
serialized_batch.append(str(job_tuple[0]) + ":" + self.serialize_job(job_tuple[1]))
else:
raise TypeError(
"Unexpected type=%s and size inside job_batch, expected either JobTuple or tuple of form (priority, JobTuple)" % type(
job_tuple))
batch_body = "\n".join(serialized_batch)
req = Request(self.gateway_uri + "/hydra/job/batch", data=batch_body.encode('utf-8'))
try:
resp = self.opener.open(req, timeout=60)
return resp.code
except HTTPError as e:
return e.code
#Job Parsing:
def _convert_iso_datetime(self, val):
"""
Shameless theft of the ISODateTimeField's string parsing
"""
if not isinstance(val, datetime.datetime):
try:
return datetime.datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%f')
except TypeError:
#if there is nothing, e.g. constructing a new item, we get TypeError
return datetime.datetime.fromtimestamp(0)
except ValueError:
#support timestamps without fractional seconds
return datetime.datetime.strptime(val, '%Y-%m-%dT%H:%M:%S')
else:
return val.isoformat()
def _parse_special(self, val):
"""
Python Object Field parsing for one part of the job tuple
"""
if isinstance(val, six.string_types):
try:
obj = json.loads(val)
return obj
except:
return dict()
elif isinstance(val, dict):
return val
else:
return dict()
def _dump_special(self, val):
"""
Python Object Field parsing for one part of job Tuple
"""
if not isinstance(val, six.string_types):
return json.dumps(val)
else:
return val
def serialize_job(self, job_tuple):
"""
Serialize the given job_tuple object to a string.
"""
if isinstance(job_tuple, JobTuple):
return "|".join([job_tuple.name, job_tuple.target, job_tuple.task, job_tuple.metadata_id,
self._convert_iso_datetime(job_tuple.create_time),
self._convert_iso_datetime(job_tuple.last_time), str(job_tuple.expiration_period),
self._dump_special(job_tuple.special)])
else:
raise TypeError("Values of hydra job fields must be namedtuples of type JobTuple")
def deserialize_job(self, job_string):
"""
Deserialize the given string representation of a JobTuple type into a JobTuple
"""
if isinstance(job_string, six.string_types):
prop_list = job_string.split("|", 7)
if len(prop_list) == 8:
return JobTuple(
prop_list[0],
prop_list[1],
prop_list[2],
prop_list[3],
self._convert_iso_datetime(prop_list[4]),
self._convert_iso_datetime(prop_list[5]),
prop_list[6],
self._parse_special(prop_list[7])
)
else:
raise ValueError(
"Jobs must be of format <name>|<target>|<task>|<metadata-id>|<create_time>|<last_time>|<expiration_period(seconds)>|<special>, i.e. 8 values")
else:
raise TypeError(
"Serialized jobs must be strings of the form <name>|<target>|<task>|<metadata-id>|<create_time>|<last_time>|<expiration_period(seconds)>|<special>, i.e. 8 values")
#Test endpoint wrappers:
def call_test_static(self):
"""
Makes a call to the /test/static endpoint of the hydra gateway
"""
req = Request(self.gateway_uri + "/test/static")
resp = self.opener.open(req)
return resp.read().decode('utf-8')
def call_test_echo(self, data):
"""
Makes a call to the /test/echo endpoint of the hydra gateway
passing in the data passed to this method
"""
req = Request(self.gateway_uri + "/test/echo", data=data.encode('utf-8'))
resp = self.opener.open(req)
return resp.read().decode('utf-8')