You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

95 lines
3.2 KiB

from typing import Optional, Dict, Any, List
from splunklib.client import (
connect,
Service,
StoragePassword,
ConfigurationFile,
KVStoreCollection
)
from utils import get_logger, app_name
class SplunkApi:
def __init__(self, session_token: str):
self.logger = get_logger()
self.service: Service = connect(
token=session_token,
auto_login=True,
owner="nobody",
app=app_name,
sharing="app"
)
def get_configuration(self, file_name: str) -> Optional[Dict[str, Dict[str, Any]]]:
if file_name not in self.service.confs:
self.logger.debug('configuration file %s not found', file_name)
return None
configuration_file: ConfigurationFile = self.service.confs[file_name]
file_props = dict()
for prop in configuration_file:
file_props[prop.name] = prop.content
self.logger.debug('configuration file %s for file name %s',
file_props, file_name)
return file_props
def create_storage_configuration(self,
collection_id: str,
value_json: str):
kvstore_collection = self.__get_storage_collection(collection_id)
self.logger.debug('inserting into kvstore collection %s value %s',
collection_id, value_json)
kvstore_collection.data.insert(value_json)
def update_storage_configuration(self,
collection_id: str,
key_id: str,
value_json: str):
kvstore_collection = self.__get_storage_collection(collection_id)
self.logger.debug('updating kvstore collection %s for key %s value %s',
collection_id, key_id, value_json)
kvstore_collection.data.update(key_id, value_json)
def get_storage_configuration(self, collection_id: str) -> Optional[List[str]]:
kvstore_collection = self.__get_storage_collection(collection_id)
data = kvstore_collection.data.query()
self.logger.debug('retrieved items from kvstore collection %s data %s',
collection_id, data)
return list(data)
def get_storage_password(self, key: str) -> Optional[str]:
if key not in self.service.storage_passwords:
self.logger.debug('storage password %s not found', key)
return None
storage_password: StoragePassword = self.service.storage_passwords[key]
self.logger.debug('storage password %s found',
key)
return storage_password.clear_password
def __get_storage_collection(self, collection_id: str) -> KVStoreCollection:
if collection_id not in self.service.kvstore:
raise Exception('kvstore collection %s not found', collection_id)
kvstore_collection: KVStoreCollection = self.service.kvstore[collection_id]
self.logger.debug('kvstore collection exist %s for key %s',
kvstore_collection is not None, collection_id)
return kvstore_collection