You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
8.3 KiB

import json
import requests
import logging
import sys
from .mad_util import MADRESTException
from .mad_splunk_util import setup_logging, get_conf_stanza
# Path of the KV store data endpoint, appended after "<host_path>/nobody/<app_id>".
KV_STORE_PATH = "storage/collections/data"
# Module-wide logger configured by the project's setup_logging helper
# (presumably writing to mad_rest.log at DEBUG level — confirm in mad_splunk_util).
logger = setup_logging('mad_rest.log', 'mad_rest', level=logging.DEBUG)
class MADKVStoreManager(object):
    """
    Small client for the Splunk KV store data endpoint of a single app.

    Every request targets ``<host_path>/nobody/<app_id>/storage/collections/data``
    and is authenticated with the supplied Splunk session key. Transport
    failures and non-2xx responses are surfaced as MADRESTException.
    """

    # Per http://docs.splunk.com/Documentation/Splunk/6.5.0/RESTREF/RESTkvstore#Limits, 16 MB is max limit for document
    # size in KV store and non-configurable
    _max_document_size_limit_bytes = 16777216  # 16 * 1024 * 1024 = 16 MB

    # Per http://docs.splunk.com/Documentation/Splunk/6.5.0/RESTREF/RESTkvstore#Limits,
    # the max size per batch save in MB is set in the kvstore stanza in the limits.conf
    # file with the name max_size_per_batch_save_mb.
    # Both values are fetched lazily (and cached on the instance) by
    # _set_batch_save_size_limit(); None means "not fetched yet".
    _max_size_per_batch_save = None  # bytes
    _max_documents_per_batch_save = None

    # Fallbacks used when limits.conf does not report a value; these are the
    # documented limits.conf defaults for the [kvstore] stanza.
    _default_max_size_per_batch_save_mb = 50
    _default_max_documents_per_batch_save = 1000

    def __init__(self, host_path, app_id, session_key):
        """
        @param host_path: base REST path that "nobody/<app_id>/..." is appended to
                          (presumably something like https://host:8089/servicesNS — confirm with callers)
        @param app_id: Splunk app whose KV store collections are addressed
        @param session_key: Splunk session key, sent as "Authorization: Splunk <key>"
        """
        # NOTE(review): TLS certificate verification is disabled for every request
        # (common for splunkd's self-signed management certificate); confirm intended.
        self.default_request_options = {
            "headers": {
                'Authorization': 'Splunk %s' % session_key,
                "Content-Type": "application/json"
            },
            "verify": False
        }
        self.kv_uri_base = "/".join([host_path, "nobody", app_id, KV_STORE_PATH])
        self.kv_uri_default_params = ["output_mode=json"]
        # Half-open (start, end) slices computed by the last check_payload_size() call.
        self._save_ranges = []
        self.session_key = session_key

    def _deserialize_json(self, json_str):
        """
        Decode a response body.

        @return: None for an empty body, otherwise the decoded JSON value
        @raise MADRESTException: (status 500) if the body is not valid JSON
        """
        try:
            if not json_str:
                return None
            return json.loads(json_str)
        except Exception:
            err_msg = "Unable to deserialize json, possible corrupted result from kvstore\n%s" % json_str
            logger.exception(err_msg)
            raise MADRESTException(err_msg, logging.ERROR, status_code=500)

    def _handle_response(self, r):
        """
        Return the decoded body of a 2xx response.

        @raise MADRESTException: carrying the response text and HTTP status for non-2xx responses
        """
        if 200 <= r.status_code < 300:
            return self._deserialize_json(r.text)
        raise MADRESTException(r.text, logging.ERROR, r.status_code)

    def _get_kv_url(self, url_extra, params_extra=None):
        """
        Build a KV store URL.

        @param url_extra: path segments appended to the KV store base path,
                          e.g. [collection_name] or [collection_name, key]
        @param params_extra: optional pre-encoded "name=value" query strings
        @return: full URL; output_mode=json is always included in the query
        """
        # NOTE(review): segments are joined verbatim (no URL quoting); keys containing
        # '/', '?', '#' or spaces must be encoded by the caller.
        query = "&".join(list(params_extra or []) + self.kv_uri_default_params)
        full_url = "/".join([self.kv_uri_base] + list(url_extra)) + "?" + query
        logger.debug("kv url is : %s", full_url)
        return full_url

    def _get_request_options(self, options_extra):
        """
        Merge per-call requests keyword arguments with the default ones.

        Defaults (auth headers, verify flag) take precedence over same-named keys
        in options_extra. The caller's dict is left unmodified.
        """
        options = dict(options_extra)
        options.update(self.default_request_options)
        return options

    def _send_request(self, method, url_extra, options_extra=None):
        """
        Issue one KV store request and decode its response.

        @param method: requests function to call (requests.get / post / delete)
        @param url_extra: path segments passed to _get_kv_url
        @param options_extra: extra requests kwargs such as "params" or "data"
        @return: decoded JSON body, or None for an empty body
        @raise MADRESTException: status 500 on transport/URL errors, the response's
                                 status on non-2xx responses
        """
        try:
            resp = method(self._get_kv_url(url_extra),
                          **self._get_request_options(options_extra or {}))
        except Exception as e:
            logger.exception(str(e))
            raise MADRESTException(str(e), logging.ERROR, status_code=500)
        return self._handle_response(resp)

    def _set_batch_save_size_limit(self):
        """
        Fetch max_size_per_batch_save_mb and max_documents_per_batch_save from the
        [kvstore] stanza of limits.conf, unless both are already cached on this instance.

        Values missing from the stanza fall back to the documented defaults, so the
        limits are never left as None.

        @raise MADRESTException: (status 500) if the stanza cannot be fetched or parsed
        """
        if self._max_size_per_batch_save is not None and self._max_documents_per_batch_save is not None:
            return
        max_size_mb = None
        max_docs = None
        try:
            _, cont = get_conf_stanza(self.session_key, 'limits', 'kvstore')
            for entry in cont.get('entry') or []:
                name = entry.get('name')
                if name == 'max_size_per_batch_save_mb':
                    max_size_mb = int(entry.get('content', self._default_max_size_per_batch_save_mb))
                elif name == 'max_documents_per_batch_save':
                    max_docs = int(entry.get('content', self._default_max_documents_per_batch_save))
                if max_size_mb is not None and max_docs is not None:
                    break
        except Exception as e:
            err_msg = 'Error while fetching max_size_per_batch_save for kvstore ' \
                      'from limits.conf. Error is: {0}'.format(e)
            logger.exception(err_msg)
            raise MADRESTException(err_msg, logging.ERROR, status_code=500)
        if max_size_mb is None:
            max_size_mb = self._default_max_size_per_batch_save_mb
        if max_docs is None:
            max_docs = self._default_max_documents_per_batch_save
        self._max_size_per_batch_save = max_size_mb * 1024 * 1024
        self._max_documents_per_batch_save = max_docs

    def check_payload_size(self, data_list, throw_on_violation=True):
        """
        Method to verify no document in a KV store payload is larger than the 16MB
        per-document limit, and to split the payload into batch_save-sized ranges.

        A document's size is the length of its JSON encoding, i.e. what create_bulk()
        actually sends. On success self._save_ranges holds non-empty, half-open
        (start, end) index ranges covering data_list in order. A range is closed
        before a document when adding it would reach the max batch byte size or
        the range already holds max_documents_per_batch_save documents; a single
        document at or above the batch byte size still gets its own range.

        @type data_list: list
        @param data_list: JSON list payload to verify
        @type throw_on_violation: boolean
        @param throw_on_violation: True if violation should trigger exception, else returns bool indicating
                                   if violation detected
        @rtype: tuple (boolean, integer)
        @return: (True, -1) if no violation detected, (False, size causing violation in bytes) if violation detected
        @raise MADRESTException: if data_list is not a list or not JSON serializable, if the batch
                                 limits cannot be fetched, or on a violation when throw_on_violation is True
        """
        if not isinstance(data_list, list):
            raise MADRESTException('JSON payload is invalid.')
        self._set_batch_save_size_limit()
        max_batch_bytes = self._max_size_per_batch_save
        max_batch_docs = self._max_documents_per_batch_save
        self._save_ranges = []
        cur_size = 0
        first_index = 0
        for idx, data in enumerate(data_list):
            try:
                size_of_payload = len(json.dumps(data))
            except (TypeError, ValueError):
                raise MADRESTException('JSON payload is invalid.')
            if size_of_payload > self._max_document_size_limit_bytes:
                if throw_on_violation:
                    raise MADRESTException(
                        'Object you are trying to save is too large (%s bytes). KV store only supports '
                        'documents within 16MB sizes.' % size_of_payload,
                        logging.ERROR,
                        status_code=500
                    )
                # Return False indicating violation even if one object violates limits;
                # drop the partial ranges so they cannot be used by mistake.
                self._save_ranges = []
                return False, size_of_payload
            # Close the current batch before this document if it would reach the byte
            # limit or the batch is already full. The idx > first_index guard prevents
            # an empty range when the first document alone reaches the byte limit.
            if idx > first_index and (cur_size + size_of_payload >= max_batch_bytes
                                      or idx - first_index >= max_batch_docs):
                self._save_ranges.append((first_index, idx))
                first_index = idx
                cur_size = 0
            cur_size += size_of_payload
        if first_index < len(data_list):
            self._save_ranges.append((first_index, len(data_list)))
        return True, -1

    def get_all(self, collection_name, params=None):
        """
        GET every document of a collection.

        @param params: optional query parameters forwarded to requests
        @return: decoded JSON response
        """
        return self._send_request(requests.get, [collection_name], {"params": params})

    def get(self, collection_name, entity_id, params=None):
        """
        GET a single document by key.

        @param params: optional query parameters forwarded to requests
        @return: decoded JSON response
        """
        return self._send_request(requests.get, [collection_name, entity_id], {"params": params})

    def create(self, collection_name, data):
        """POST one JSON-serializable document to a collection; returns the decoded response."""
        return self._send_request(requests.post, [collection_name], {"data": json.dumps(data)})

    def create_bulk(self, collection_name, data):
        """
        Save a list of documents through batch_save, using as many requests as the
        batch limits require (none at all for an empty list).

        @return: concatenation of the per-batch decoded responses, in request order
                 (presumably the saved documents' _key values — confirm against Splunk docs)
        @raise MADRESTException: on payload violations or any failed request
        """
        self.check_payload_size(data)
        # Take the computed ranges and clear the shared state up front, so a failing
        # request does not leave stale ranges on the instance.
        save_ranges, self._save_ranges = self._save_ranges, []
        parsed_contents = []
        for start, end in save_ranges:
            data_chunk = json.dumps(data[start:end])
            # An empty response body deserializes to None; treat it as "nothing returned".
            parsed_contents.extend(self._execute_batch_save_request(collection_name, data_chunk) or [])
        return parsed_contents

    def _execute_batch_save_request(self, collection_name, data):
        """
        POST one already-serialized JSON array to <collection>/batch_save.

        @return: decoded JSON response
        """
        return self._send_request(requests.post, [collection_name, 'batch_save'], {'data': data})

    def update(self, collection_name, entity_id, data):
        """POST a JSON-serializable document to an existing key; returns the decoded response."""
        return self._send_request(requests.post, [collection_name, entity_id], {"data": json.dumps(data)})

    def delete(self, collection_name, entity_id):
        """DELETE a document by key; returns the decoded response (None for an empty body)."""
        return self._send_request(requests.delete, [collection_name, entity_id])