You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
4.8 KiB

import splunk.rest as rest
import splunk.entity as entity
import json
import cp_aws_bin.utils.app_util as util
import time
logger = util.get_logger()
TIMESTAMP_ATTRIBUTE_NAME = 'timestamp'
CONF_WEB = 'configs/conf-web'
class KVStoreAccessObject(object):
def __init__(self, collection = None, session_key = None, owner = 'nobody'):
app_name = util.APP_NAME
splunkd_uri = entity.getEntity(CONF_WEB, 'settings', sessionKey=session_key, namespace=app_name, owner=owner).get('mgmtHostPort', '127.0.0.1:8089')
self.url = 'https://%s/servicesNS/nobody/%s/storage/collections/data/%s' % (splunkd_uri, app_name, collection)
self.session_key = session_key
def get_item_by_key(self, key):
"""
Get an item of kvstore.
:param key: key id in kvstore
:return: response
"""
request_url = '%s/%s' % (self.url, key)
response, content = rest.simpleRequest(request_url, sessionKey = self.session_key, method = 'GET', raiseAllErrors = True)
return content
def update_item_by_key(self, key, updated_item):
"""
Update an item of kvstore.
:param key: key id in kvstore
:param updated_item: a json object containing new values of ALL attributes
:return: response
"""
json_args = json.dumps(updated_item)
post_url = '%s/%s' % (self.url, key)
rest.simpleRequest(post_url, sessionKey = self.session_key, jsonargs = json_args, method = 'POST', raiseAllErrors = True)
return
def delete_item_by_key(self, key):
"""
Delete an item of kvstore.
:param key: key id in kvstore
:return: response
"""
delete_url = '%s/%s' % (self.url, key)
rest.simpleRequest(delete_url, sessionKey = self.session_key, method = 'DELETE', raiseAllErrors = True)
return
def query_items(self, query_conditions={}, sort_by=None, sort_direction=-1, from_timestamp=0):
"""
Query items in an order from kvstore by query conditions.
:param query_conditions: a json object containing all the field conditions, {name:value} format
:param sort_by: field name
:param sort_direction: an integer, asc: 1, desc: -1
:param from_timestamp: a timestamp
:return: response
"""
if from_timestamp > 0:
query_conditions[TIMESTAMP_ATTRIBUTE_NAME] = {'$gt': from_timestamp}
get_args = {'query': json.dumps(query_conditions)}
if sort_by is not None:
get_args['sort'] = '%s:%d' % (sort_by, sort_direction)
response, content = rest.simpleRequest(self.url, sessionKey = self.session_key, method = 'GET', getargs = get_args, raiseAllErrors = True)
return content
def delete_items(self):
"""
Delete all items.
:return: response
"""
response, content = rest.simpleRequest(self.url, sessionKey = self.session_key, method = 'DELETE', raiseAllErrors = True)
return content
def delete_staled_items(self, expired_time):
"""
Delete all items by a given expired time
:param expired_time: a timestamp
:return:
"""
timestamp_before = int(time.time()) - expired_time
delete_args = {'query': '{"%s": {"$lt": %d}}' % (TIMESTAMP_ATTRIBUTE_NAME, timestamp_before)}
response, content = rest.simpleRequest(self.url, sessionKey = self.session_key, method = 'DELETE', getargs = delete_args, raiseAllErrors = True)
return content
def delete_items_by_condition(self, delete_conditions = {}):
"""
Delete items from kvstore by given conditions
:param expired_time: a timestamp
:return:
"""
delete_args = {'query': json.dumps(delete_conditions)}
response, content = rest.simpleRequest(self.url, sessionKey = self.session_key, method = 'DELETE', getargs = delete_args, raiseAllErrors = True)
return content
def insert_single_item(self, new_item):
"""
Insert an item into kvstore.
:param new_item: a json object, indicating the new item
:return: response (attribute "_key" stands for the key id of new item)
"""
json_args = json.dumps(new_item)
response, content = rest.simpleRequest(self.url, sessionKey = self.session_key, jsonargs = json_args, method = 'POST', raiseAllErrors = True)
return content
def batch_insert_items(self, new_item_array):
"""
Batch insert items into kvstore
:param new_item_array: an array of new items
:return: response (attribute "_key" stands for the key id of new item)
"""
post_url = self.url + '/batch_save'
json_args = json.dumps(new_item_array)
response, content = rest.simpleRequest(post_url, sessionKey = self.session_key, jsonargs = json_args, method = 'POST', raiseAllErrors = True)
return content