# Copyright (C) 2005-2023 Splunk Inc. All Rights Reserved.
#CORE PYTHON IMPORTS
import os
import datetime
from contextlib import closing
try:
    import _pickle as cPickle
except ImportError:
    import cPickle
try:
    from urllib.request import urlopen, Request, build_opener
    from urllib.error import HTTPError
except ImportError:
    from urllib2 import urlopen, Request, build_opener, HTTPError
import json
from base64 import b64encode, b64decode
from collections import namedtuple
from time import mktime, sleep
from . import six

#SPLUNK IMPORTS
import splunk.entity as en

#Basic Job Named Tuple
# Field order matters: serialize_job/deserialize_job map these fields
# positionally onto the "|"-delimited wire format.
JobTuple = namedtuple("JobTuple", "name target task metadata_id create_time last_time expiration_period special")


class HydraCommon(object):
    """
    This class contains utility methods or variables used across the
    scheduler and worker classes.
    """

    @staticmethod
    def getConfModTime(app_home, conf, collection_conf_name=None):
        """
        Get the latest modification time of a hydra conf file.

        Both <app_home>/default/<conf file> and <app_home>/local/<conf file>
        are checked; a file that cannot be stat'ed contributes 0.

        @type app_home: str
        @param app_home: path to the app directory holding default/ and local/
        @type conf: str
        @param conf: one of "collection", "node" or "metadata"
        @type collection_conf_name: str or None
        @param collection_conf_name: conf file name, required when conf == "collection"
        @rtype: float or int
        @return: the larger of the two mtimes in epoch seconds, 0 if neither exists
        @raise NotImplementedError: if app_home is None, conf is unrecognized,
            or conf is "collection" without a collection_conf_name
        """
        if app_home is None:
            raise NotImplementedError("app_home must not be None")

        if conf == "collection":
            if collection_conf_name is None:
                raise NotImplementedError("Collection Conf Name Must Be Specified")
            conf_name = collection_conf_name
        elif conf == "node":
            conf_name = "hydra_node.conf"
        elif conf == "metadata":
            conf_name = "hydra_metadata.conf"
        else:
            raise NotImplementedError("Unrecognized Conf Parameter in getConfModTime")

        def _mtime(subdir):
            # EAFP instead of exists()+getmtime(): avoids a race if the file
            # disappears between the two calls. Missing file => 0.
            try:
                return os.path.getmtime(os.path.join(app_home, subdir, conf_name))
            except OSError:
                return 0

        return max(_mtime("default"), _mtime("local"))


class HydraGatewayAdapter(object):
    """
    This class acts as the go between for schedulers and workers and the
    hydra gateway service.
    """

    # How many times authenticate_gateway asks splunkd for the gateway key,
    # and how long (seconds) it pauses between attempts.
    _AUTH_ATTEMPTS = 4
    _AUTH_RETRY_SLEEP = 2

    def __init__(self, splunkd_uri, splunk_session_key, gateway_uri):
        """
        @type splunkd_uri: str
        @param splunkd_uri: passed as hostPath to splunk.entity.getEntity
            when fetching the gateway key
        @type splunk_session_key: str
        @param splunk_session_key: passed as sessionKey to splunk.entity.getEntity
        @type gateway_uri: str
        @param gateway_uri: base URI of the hydra gateway; trailing "/" is stripped
        @raise Exception: if the gateway key cannot be obtained
        """
        self.splunkd_uri = splunkd_uri
        self.splunk_session_key = splunk_session_key
        self.gateway_uri = gateway_uri.rstrip("/")
        self.authenticate_gateway()

    def authenticate_gateway(self):
        """
        Call out to splunkd to get the key to the hydra gateway and build the
        URL opener that sends it in the X-hydra-auth header of every request.

        A key of "DEFER" is treated as "gateway not up yet": the lookup is
        retried, pausing between attempts.

        @raise Exception: if every attempt returned "DEFER"
        """
        for retry in range(self._AUTH_ATTEMPTS):
            entity = en.getEntity("/hydra/hydra_gatekeeper", "hydra_gateway",
                                  sessionKey=self.splunk_session_key, hostPath=self.splunkd_uri)
            self.gateway_auth_key = entity["key"]
            if self.gateway_auth_key != "DEFER":
                self.opener = build_opener()
                self.opener.addheaders = [('X-hydra-auth', self.gateway_auth_key)]
                return
            # Give the gateway time to come up -- but not after the final
            # attempt, since we are about to give up anyway.
            if retry < self._AUTH_ATTEMPTS - 1:
                sleep(self._AUTH_RETRY_SLEEP)
        raise Exception("[HydraGatewayAdapter] could not authenticate with gateway after %s retries" % (
            str(self._AUTH_ATTEMPTS - 1)))

    #HTTP helpers:
    def _read(self, req):
        """
        Send req through the authenticated opener and return the response
        body decoded as UTF-8. The response is always closed.
        HTTPError is not caught and propagates to the caller.
        """
        with closing(self.opener.open(req)) as resp:
            return resp.read().decode('utf-8')

    def _open_for_status(self, req, timeout=None):
        """
        Send req through the authenticated opener and return its HTTP status
        code. HTTP error statuses are returned as their code, not raised.
        The response is always closed.

        @type timeout: int or None
        @param timeout: socket timeout in seconds; when None no timeout
            argument is passed, so the opener's default applies
        """
        # Only forward timeout when given, so callers that never passed one
        # keep the global default socket timeout.
        open_kwargs = {} if timeout is None else {"timeout": timeout}
        try:
            with closing(self.opener.open(req, **open_kwargs)) as resp:
                return resp.code
        except HTTPError as e:
            return e.code

    #Cache Endpoint Wrappers:
    def get_cache(self, name):
        """
        Get the current cache value for the given name.
        Note that caches are assumed to be JSON serializable and deserializable

        @type name: str
        @param name: the name of the cache entry
        @return: deserialized cache value, or None if the gateway answers 404
        @raise HTTPError: for any non-404 HTTP error
        """
        uri = self.gateway_uri + "/hydra/cache"
        req = Request(uri, headers={"X-HYDRA-CACHE-NAME": name})
        try:
            return json.loads(self._read(req))
        except HTTPError as e:
            if e.code == 404:
                return None
            raise

    def set_cache(self, name, value, expiration=None):
        """
        Set the current cache value for the given name.
        Note that caches are assumed to be JSON serializable and deserializable

        @type name: str
        @param name: the name of the cache entry
        @type value: json serializable object
        @param value: JSON serializable python object (dict preferred)
        @type expiration: int or None
        @param expiration: the period in sec from set time after which the cache
            should be cleared; ignored unless it is an integer
        @rtype: int
        @return: the response code of the cache request
        """
        uri = self.gateway_uri + "/hydra/cache"
        headers = {"X-HYDRA-CACHE-NAME": name}
        if isinstance(expiration, six.integer_types):
            headers["X-HYDRA-CACHE-EXPIRY"] = str(expiration)
        body = json.dumps(value)
        req = Request(uri, headers=headers, data=body.encode('utf-8'))
        return self._open_for_status(req)

    def set_cache_batch(self, cache_items, expiration=None):
        """
        Set a batch of caches to the gateway cache.
        Note that caches are assumed to be JSON serializable and deserializable

        The request body is one "<name>\\t<json value>" line per item.

        @type cache_items: iterable of tuples of the form (<name>, <value>)
        @param cache_items: the set of cache name and value pairs to be set in the gateway cache
        @type expiration: int or None
        @param expiration: the period from set time after which the cache should
            be cleared; ignored unless it is an integer
        @rtype: int
        @return: the response code of the cache request
        """
        uri = self.gateway_uri + "/hydra/cache/batch"
        headers = {}
        if isinstance(expiration, six.integer_types):
            headers["X-HYDRA-CACHE-EXPIRY"] = str(expiration)
        body = "\n".join(name + "\t" + json.dumps(value) for name, value in cache_items)
        req = Request(uri, headers=headers, data=body.encode('utf-8'))
        return self._open_for_status(req)

    #Job Endpoint Wrappers:
    def get_job_count(self):
        """
        Get the current job count.

        @rtype: int
        @return: the "count" field of the gateway's job info
        """
        return self.get_job_info()['count']

    def get_job_info(self):
        """
        Get the current job information.

        RETURNS job information dict which hold the following values
            count: total unclaimed job count
            expiry_job_count: expired job so far on this gateway
            job_aggregate_execution_info: is a dict
                key: target|task|metadata_id
                value: is array of three items
                    0: aggregate execution time
                    1: number of times execution time is reported for this category
                    2: unclaimed job count for this category
            atomic_job_info is a dict
                key: completed_atomic_jobs
                value: list of completed job names/ids
                key: failed_atomic_jobs
                value: list of failed job names/ids
        """
        req = Request(self.gateway_uri + "/hydra/job/info")
        return json.loads(self._read(req))

    def report_failed_atomic_job(self, job_tuple):
        '''
        Calling gateway to commit failed atomic job execution.

        @type job_tuple: JobTuple
        @param job_tuple: the JobTuple object for the failed job; only its
            name is sent (in the X-HYDRA-ATOMIC-JOB header)
        @rtype: int
        @return: return code from gateway
        '''
        uri = self.gateway_uri + "/hydra/job/execution/failure"
        headers = {"X-HYDRA-ATOMIC-JOB": job_tuple.name}
        # Empty body: supplying data makes urllib issue a POST.
        req = Request(uri, headers=headers, data=b"")
        return self._open_for_status(req, timeout=60)

    def commit_job_exec_info(self, time_spent, job_tuple, is_atomic=False):
        '''
        Calling gateway to commit job execution information

        The request body is "<time_spent>:<serialized job>".

        @type time_spent: int
        @param time_spent: total time is taken by job
        @type job_tuple: JobTuple
        @param job_tuple: the JobTuple object for the completed job
        @type is_atomic: bool
        @param is_atomic: True if the job's task was atomic, False otherwise
        @rtype: int
        @return: return code from gateway
        @raise TypeError: if job_tuple is not a JobTuple
        '''
        uri = self.gateway_uri + "/hydra/job/execution/info"
        content = str(time_spent) + ":" + self.serialize_job(job_tuple)
        headers = {}
        if is_atomic:
            headers["X-HYDRA-ATOMIC-JOB"] = job_tuple.name
        req = Request(uri, headers=headers, data=content.encode('utf-8'))
        return self._open_for_status(req, timeout=60)

    def get_next_job(self, block=True):
        """
        Get and deserialize the next job in priority order.
        If no jobs available, i.e. 404, return None

        @type block: bool
        @param block: if false, append "?block=0" so the gateway returns
            immediately instead of waiting for a job
        @rtype: JobTuple or None
        @return: the next job, or None if the gateway answers 404
        @raise HTTPError: for any non-404 HTTP error
        """
        uri = self.gateway_uri + "/hydra/job/pop"
        if not block:
            uri += "?block=0"
        req = Request(uri)
        try:
            return self.deserialize_job(self._read(req))
        except HTTPError as e:
            if e.code == 404:
                return None
            raise

    def commit_job_batch(self, job_batch):
        """
        Send the given batch to the gateway

        Each job is sent as one "<priority number>:<serialized job>" line.

        @param job_batch: an iterable of JobTuples or tuples of (priority num, JobTuple)
        @rtype: int
        @return: status code
        @raise TypeError: if an item is neither a JobTuple nor a 2-tuple
        """
        serialized_batch = []
        for job_tuple in job_batch:
            # JobTuple is itself a tuple subclass, so it must be checked first.
            if isinstance(job_tuple, JobTuple):
                #We set our priority number as the expiration time in epoch for a particular job
                expires_at = int(mktime(job_tuple.create_time.timetuple())) + int(job_tuple.expiration_period)
                serialized_batch.append(str(expires_at) + ":" + self.serialize_job(job_tuple))
            elif isinstance(job_tuple, tuple) and len(job_tuple) == 2:
                #We set our priority number as the passed priority number in the tuple
                serialized_batch.append(str(job_tuple[0]) + ":" + self.serialize_job(job_tuple[1]))
            else:
                raise TypeError(
                    "Unexpected type=%s and size inside job_batch, expected either JobTuple or tuple of form (priority, JobTuple)" % type(
                        job_tuple))
        batch_body = "\n".join(serialized_batch)
        req = Request(self.gateway_uri + "/hydra/job/batch", data=batch_body.encode('utf-8'))
        return self._open_for_status(req, timeout=60)

    #Job Parsing:
    def _convert_iso_datetime(self, val):
        """
        Convert between datetime objects and ISO-8601 strings (adapted from
        ISODateTimeField's string parsing).

        - datetime -> val.isoformat() string
        - anything else is parsed with '%Y-%m-%dT%H:%M:%S.%f', falling back
          to '%Y-%m-%dT%H:%M:%S' when there are no fractional seconds; a
          non-string (e.g. None) yields datetime.fromtimestamp(0)

        @raise ValueError: if a string matches neither format
        """
        if isinstance(val, datetime.datetime):
            return val.isoformat()
        try:
            return datetime.datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%f')
        except TypeError:
            #if there is nothing, e.g. constructing a new item, we get TypeError
            return datetime.datetime.fromtimestamp(0)
        except ValueError:
            #support timestamps without fractional seconds
            return datetime.datetime.strptime(val, '%Y-%m-%dT%H:%M:%S')

    def _parse_special(self, val):
        """
        Parse the "special" field of a job.

        Strings are decoded as JSON (invalid JSON -> empty dict); dicts are
        returned unchanged; any other value yields an empty dict.
        """
        if isinstance(val, six.string_types):
            try:
                return json.loads(val)
            except ValueError:
                # json.loads signals malformed input with ValueError
                # (JSONDecodeError on py3); keep the best-effort fallback.
                return dict()
        if isinstance(val, dict):
            return val
        return dict()

    def _dump_special(self, val):
        """
        Serialize the "special" field of a job: strings pass through
        unchanged, anything else is JSON-encoded.
        """
        if isinstance(val, six.string_types):
            return val
        return json.dumps(val)

    def serialize_job(self, job_tuple):
        """
        Serialize the given job_tuple object to a "|"-delimited string of its
        8 fields, in JobTuple field order.

        @raise TypeError: if job_tuple is not a JobTuple
        """
        if not isinstance(job_tuple, JobTuple):
            raise TypeError("Values of hydra job fields must be namedtuples of type JobTuple")
        return "|".join([job_tuple.name, job_tuple.target, job_tuple.task, job_tuple.metadata_id,
                         self._convert_iso_datetime(job_tuple.create_time),
                         self._convert_iso_datetime(job_tuple.last_time),
                         str(job_tuple.expiration_period),
                         self._dump_special(job_tuple.special)])

    def deserialize_job(self, job_string):
        """
        Deserialize the given string representation of a JobTuple type into a
        JobTuple.

        At most 7 splits are made, so the trailing "special" field may itself
        contain "|". expiration_period is left as a string.

        @raise ValueError: if the string does not have 8 fields
        @raise TypeError: if job_string is not a string
        """
        if not isinstance(job_string, six.string_types):
            raise TypeError(
                "Serialized jobs must be strings of the form <name>|<target>|<task>|<metadata_id>|<create_time>|<last_time>|<expiration_period>|<special>, i.e. 8 values")
        prop_list = job_string.split("|", 7)
        if len(prop_list) != 8:
            raise ValueError(
                "Jobs must be of format <name>|<target>|<task>|<metadata_id>|<create_time>|<last_time>|<expiration_period>|<special>, i.e. 8 values")
        return JobTuple(
            prop_list[0],
            prop_list[1],
            prop_list[2],
            prop_list[3],
            self._convert_iso_datetime(prop_list[4]),
            self._convert_iso_datetime(prop_list[5]),
            prop_list[6],
            self._parse_special(prop_list[7])
        )

    #Test endpoint wrappers:
    def call_test_static(self):
        """
        Makes a call to the /test/static endpoint of the hydra gateway and
        returns the decoded response body.
        """
        return self._read(Request(self.gateway_uri + "/test/static"))

    def call_test_echo(self, data):
        """
        Makes a call to the /test/echo endpoint of the hydra gateway passing in
        the data passed to this method, and returns the decoded response body.
        """
        return self._read(Request(self.gateway_uri + "/test/echo", data=data.encode('utf-8')))