You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
950 lines
28 KiB
950 lines
28 KiB
#
|
|
# Copyright 2024 Splunk Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""Splunk user access control related utilities."""
|
|
|
|
import json
|
|
from typing import List, Optional
|
|
|
|
from splunklib import binding
|
|
|
|
from solnlib import _utils
|
|
from solnlib import splunk_rest_client as rest_client
|
|
from solnlib import utils
|
|
|
|
__all__ = [
|
|
"ObjectACLException",
|
|
"ObjectACL",
|
|
"ObjectACLManagerException",
|
|
"ObjectACLManager",
|
|
"AppCapabilityManagerException",
|
|
"AppCapabilityManager",
|
|
"UserAccessException",
|
|
"check_user_access",
|
|
"InvalidSessionKeyException",
|
|
"get_current_username",
|
|
"UserNotExistException",
|
|
"get_user_capabilities",
|
|
"user_is_capable",
|
|
"get_user_roles",
|
|
]
|
|
|
|
|
|
class ObjectACLException(Exception):
|
|
pass
|
|
|
|
|
|
class ObjectACL:
|
|
"""Object ACL record.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> obj_acl = user_access.ObjectACL(
|
|
>>> 'test_collection',
|
|
>>> '9defa6f510d711e6be16a45e60e34295',
|
|
>>> 'test_object',
|
|
>>> 'Splunk_TA_test',
|
|
>>> 'admin',
|
|
>>> {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
|
|
>>> False)
|
|
"""
|
|
|
|
OBJ_COLLECTION_KEY = "obj_collection"
|
|
OBJ_ID_KEY = "obj_id"
|
|
OBJ_TYPE_KEY = "obj_type"
|
|
OBJ_APP_KEY = "obj_app"
|
|
OBJ_OWNER_KEY = "obj_owner"
|
|
OBJ_PERMS_KEY = "obj_perms"
|
|
OBJ_PERMS_READ_KEY = "read"
|
|
OBJ_PERMS_WRITE_KEY = "write"
|
|
OBJ_PERMS_DELETE_KEY = "delete"
|
|
OBJ_PERMS_ALLOW_ALL = "*"
|
|
OBJ_SHARED_BY_INCLUSION_KEY = "obj_shared_by_inclusion"
|
|
|
|
def __init__(
|
|
self,
|
|
obj_collection: str,
|
|
obj_id: str,
|
|
obj_type: str,
|
|
obj_app: str,
|
|
obj_owner: str,
|
|
obj_perms: dict,
|
|
obj_shared_by_inclusion: bool,
|
|
):
|
|
"""Initializes ObjectACL.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_id: ID of this object.
|
|
obj_type: Type of this object.
|
|
obj_app: App of this object.
|
|
obj_owner: Owner of this object.
|
|
obj_perms: Object perms, like: {'read': ['*'], 'write': ['admin'], 'delete': ['admin']}.
|
|
obj_shared_by_inclusion: Flag of object is shared by inclusion.
|
|
"""
|
|
self.obj_collection = obj_collection
|
|
self.obj_id = obj_id
|
|
self.obj_type = obj_type
|
|
self.obj_app = obj_app
|
|
self.obj_owner = obj_owner
|
|
self._check_perms(obj_perms)
|
|
self._obj_perms = obj_perms
|
|
self.obj_shared_by_inclusion = obj_shared_by_inclusion
|
|
|
|
@classmethod
|
|
def _check_perms(cls, obj_perms):
|
|
if not isinstance(obj_perms, dict):
|
|
raise ObjectACLException(
|
|
"Invalid object acl perms type: %s, should be a dict." % type(obj_perms)
|
|
)
|
|
|
|
if not (
|
|
cls.OBJ_PERMS_READ_KEY in obj_perms
|
|
and cls.OBJ_PERMS_WRITE_KEY in obj_perms
|
|
and cls.OBJ_PERMS_DELETE_KEY in obj_perms
|
|
):
|
|
raise ObjectACLException(
|
|
"Invalid object acl perms: %s, "
|
|
"should include read, write and delete perms." % obj_perms
|
|
)
|
|
|
|
@property
|
|
def obj_perms(self):
|
|
return self._obj_perms
|
|
|
|
@obj_perms.setter
|
|
def obj_perms(self, obj_perms):
|
|
self._check_perms(obj_perms)
|
|
self._obj_perms = obj_perms
|
|
|
|
@property
|
|
def record(self) -> dict:
|
|
"""Get object acl record.
|
|
|
|
Returns: Object acl record, like:
|
|
|
|
{
|
|
'_key': 'test_collection-1234',
|
|
'obj_collection': 'test_collection',
|
|
'obj_id': '1234',
|
|
'obj_type': 'test_object',
|
|
'obj_app': 'Splunk_TA_test',
|
|
'obj_owner': 'admin',
|
|
'obj_perms': {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
|
|
'obj_shared_by_inclusion': True
|
|
}
|
|
"""
|
|
|
|
return {
|
|
"_key": self.generate_key(self.obj_collection, self.obj_id),
|
|
self.OBJ_COLLECTION_KEY: self.obj_collection,
|
|
self.OBJ_ID_KEY: self.obj_id,
|
|
self.OBJ_TYPE_KEY: self.obj_type,
|
|
self.OBJ_APP_KEY: self.obj_app,
|
|
self.OBJ_OWNER_KEY: self.obj_owner,
|
|
self.OBJ_PERMS_KEY: self._obj_perms,
|
|
self.OBJ_SHARED_BY_INCLUSION_KEY: self.obj_shared_by_inclusion,
|
|
}
|
|
|
|
@staticmethod
|
|
def generate_key(obj_collection: str, obj_id: str) -> str:
|
|
"""Generate object acl record key.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_id: ID of this object.
|
|
|
|
Returns:
|
|
Object acl record key.
|
|
"""
|
|
|
|
return "{obj_collection}_{obj_id}".format(
|
|
obj_collection=obj_collection, obj_id=obj_id
|
|
)
|
|
|
|
@staticmethod
|
|
def parse(obj_acl_record: dict) -> "ObjectACL":
|
|
"""Parse object acl record and construct a new `ObjectACL` object from
|
|
it.
|
|
|
|
Arguments:
|
|
obj_acl_record: Object acl record.
|
|
|
|
Returns:
|
|
New `ObjectACL` object.
|
|
"""
|
|
|
|
return ObjectACL(
|
|
obj_acl_record[ObjectACL.OBJ_COLLECTION_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_ID_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_TYPE_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_APP_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_OWNER_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_PERMS_KEY],
|
|
obj_acl_record[ObjectACL.OBJ_SHARED_BY_INCLUSION_KEY],
|
|
)
|
|
|
|
def merge(self, obj_acl: "ObjectACL"):
|
|
"""Merge current object perms with perms of `obj_acl`.
|
|
|
|
Arguments:
|
|
obj_acl: Object acl to merge.
|
|
"""
|
|
|
|
for perm_key in self._obj_perms:
|
|
self._obj_perms[perm_key] = list(
|
|
set.union(
|
|
set(self._obj_perms[perm_key]), set(obj_acl._obj_perms[perm_key])
|
|
)
|
|
)
|
|
if self.OBJ_PERMS_ALLOW_ALL in self._obj_perms[perm_key]:
|
|
self._obj_perms[perm_key] = [self.OBJ_PERMS_ALLOW_ALL]
|
|
|
|
def __str__(self):
|
|
return json.dumps(self.record)
|
|
|
|
|
|
class ObjectACLManagerException(Exception):
|
|
"""Exception for ObjectACLManager."""
|
|
|
|
pass
|
|
|
|
|
|
class ObjectACLNotExistException(Exception):
|
|
"""Exception for the situation when ACL does not exist."""
|
|
|
|
pass
|
|
|
|
|
|
class ObjectACLManager:
|
|
"""Object ACL manager.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> oaclm = user_access.ObjectACLManager(session_key,
|
|
'Splunk_TA_test')
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
collection_name: str,
|
|
session_key: str,
|
|
app: str,
|
|
owner: Optional[str] = "nobody",
|
|
scheme: Optional[str] = None,
|
|
host: Optional[str] = None,
|
|
port: Optional[int] = None,
|
|
**context: dict,
|
|
):
|
|
"""Initializes ObjectACLManager.
|
|
|
|
Arguments:
|
|
collection_name: Collection name to store object ACL info.
|
|
session_key: Splunk access token.
|
|
app: App name of namespace.
|
|
owner: (optional) Owner of namespace, default is `nobody`.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Raises:
|
|
ObjectACLManagerException: If init ObjectACLManager failed.
|
|
"""
|
|
collection_name = "{app}_{collection_name}".format(
|
|
app=app, collection_name=collection_name
|
|
)
|
|
try:
|
|
self._collection_data = _utils.get_collection_data(
|
|
collection_name,
|
|
session_key,
|
|
app,
|
|
owner,
|
|
scheme,
|
|
host,
|
|
port,
|
|
None,
|
|
**context,
|
|
)
|
|
except KeyError:
|
|
raise ObjectACLManagerException(
|
|
f"Get object acl collection: {collection_name} fail."
|
|
)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def update_acl(
|
|
self,
|
|
obj_collection: str,
|
|
obj_id: str,
|
|
obj_type: str,
|
|
obj_app: str,
|
|
obj_owner: str,
|
|
obj_perms: dict,
|
|
obj_shared_by_inclusion: bool = True,
|
|
replace_existing: bool = True,
|
|
):
|
|
"""Update acl info of object.
|
|
|
|
Construct a new object acl info first, if `replace_existing` is True
|
|
then replace existing acl info else merge new object acl info with the
|
|
old one and replace the old acl info with merged acl info.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_id: ID of this object.
|
|
obj_type: Type of this object.
|
|
obj_app: App of this object.
|
|
obj_owner: Owner of this object.
|
|
obj_perms: Object perms, like:
|
|
|
|
{
|
|
'read': ['*'],
|
|
'write': ['admin'],
|
|
'delete': ['admin']
|
|
}.
|
|
obj_shared_by_inclusion: (optional) Flag of object is shared by
|
|
inclusion, default is True.
|
|
replace_existing: (optional) Replace existing acl info flag, True
|
|
indicates replace old acl info with new one else merge with old
|
|
acl info, default is True.
|
|
"""
|
|
|
|
obj_acl = ObjectACL(
|
|
obj_collection,
|
|
obj_id,
|
|
obj_type,
|
|
obj_app,
|
|
obj_owner,
|
|
obj_perms,
|
|
obj_shared_by_inclusion,
|
|
)
|
|
|
|
if not replace_existing:
|
|
try:
|
|
old_obj_acl = self.get_acl(obj_collection, obj_id)
|
|
except ObjectACLNotExistException:
|
|
old_obj_acl = None
|
|
|
|
if old_obj_acl:
|
|
obj_acl.merge(old_obj_acl)
|
|
|
|
self._collection_data.batch_save(obj_acl.record)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def update_acls(
|
|
self,
|
|
obj_collection: str,
|
|
obj_ids: List[str],
|
|
obj_type: str,
|
|
obj_app: str,
|
|
obj_owner: str,
|
|
obj_perms: dict,
|
|
obj_shared_by_inclusion: bool = True,
|
|
replace_existing: bool = True,
|
|
):
|
|
"""Batch update object acl info to all provided `obj_ids`.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where objects currently stored.
|
|
obj_ids: IDs list of objects.
|
|
obj_type: Type of this object.
|
|
obj_app: App of this object.
|
|
obj_owner: Owner of this object.
|
|
obj_perms: Object perms, like:
|
|
|
|
{
|
|
'read': ['*'],
|
|
'write': ['admin'],
|
|
'delete': ['admin']
|
|
}.
|
|
obj_shared_by_inclusion: (optional) Flag of object is shared by
|
|
inclusion, default is True.
|
|
replace_existing: (optional) Replace existing acl info flag, True
|
|
indicates replace old acl info with new one else merge with old acl
|
|
info, default is True.
|
|
"""
|
|
|
|
obj_acl_records = []
|
|
for obj_id in obj_ids:
|
|
obj_acl = ObjectACL(
|
|
obj_collection,
|
|
obj_id,
|
|
obj_type,
|
|
obj_app,
|
|
obj_owner,
|
|
obj_perms,
|
|
obj_shared_by_inclusion,
|
|
)
|
|
|
|
if not replace_existing:
|
|
try:
|
|
old_obj_acl = self.get_acl(obj_collection, obj_id)
|
|
except ObjectACLNotExistException:
|
|
old_obj_acl = None
|
|
|
|
if old_obj_acl:
|
|
obj_acl.merge(old_obj_acl)
|
|
|
|
obj_acl_records.append(obj_acl.record)
|
|
|
|
self._collection_data.batch_save(*obj_acl_records)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_acl(self, obj_collection: str, obj_id: str) -> "ObjectACL":
|
|
"""Get acl info.
|
|
|
|
Query object acl info with parameter of the combination of
|
|
`obj_collection` and `obj_id` from `self.collection_name` and
|
|
return it.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_id: ID of this object.
|
|
|
|
Returns:
|
|
Object acl info if success else None.
|
|
|
|
Raises:
|
|
ObjectACLNotExistException: If object ACL info does not exist.
|
|
"""
|
|
|
|
key = ObjectACL.generate_key(obj_collection, obj_id)
|
|
try:
|
|
obj_acl = self._collection_data.query_by_id(key)
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise ObjectACLNotExistException(
|
|
"Object ACL info of {}_{} does not exist.".format(
|
|
obj_collection, obj_id
|
|
)
|
|
)
|
|
|
|
return ObjectACL.parse(obj_acl)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_acls(self, obj_collection: str, obj_ids: List[str]) -> List[ObjectACL]:
|
|
"""Batch get acl info.
|
|
|
|
Query objects acl info with parameter of the combination of
|
|
`obj_collection` and `obj_ids` from KVStore and return them.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_ids: IDs of objects.
|
|
|
|
Returns:
|
|
List of `ObjectACL` instances.
|
|
"""
|
|
|
|
query = json.dumps(
|
|
{
|
|
"$or": [
|
|
{"_key": ObjectACL.generate_key(obj_collection, obj_id)}
|
|
for obj_id in obj_ids
|
|
]
|
|
}
|
|
)
|
|
obj_acls = self._collection_data.query(query=query)
|
|
|
|
return [ObjectACL.parse(obj_acl) for obj_acl in obj_acls]
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def delete_acl(self, obj_collection: str, obj_id: str):
|
|
"""Delete acl info.
|
|
|
|
Query object acl info with parameter of the combination of
|
|
`obj_collection` and `obj_ids` from KVStore and delete it.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_id: ID of this object.
|
|
|
|
Raises:
|
|
ObjectACLNotExistException: If object ACL info does not exist.
|
|
"""
|
|
|
|
key = ObjectACL.generate_key(obj_collection, obj_id)
|
|
try:
|
|
self._collection_data.delete_by_id(key)
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise ObjectACLNotExistException(
|
|
"Object ACL info of {}_{} does not exist.".format(
|
|
obj_collection, obj_id
|
|
)
|
|
)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def delete_acls(self, obj_collection: str, obj_ids: List[str]):
|
|
"""Batch delete acl info.
|
|
|
|
Query objects acl info with parameter of the combination of
|
|
`obj_collection` and `obj_ids` from KVStore and delete them.
|
|
|
|
Arguments:
|
|
obj_collection: Collection where object currently stored.
|
|
obj_ids: IDs of objects.
|
|
"""
|
|
|
|
query = json.dumps(
|
|
{
|
|
"$or": [
|
|
{"_key": ObjectACL.generate_key(obj_collection, obj_id)}
|
|
for obj_id in obj_ids
|
|
]
|
|
}
|
|
)
|
|
self._collection_data.delete(query=query)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_accessible_object_ids(
|
|
self, user: str, operation: str, obj_collection: str, obj_ids: List[str]
|
|
) -> List[str]:
|
|
"""Get accessible IDs of objects from `obj_acls`.
|
|
|
|
Arguments:
|
|
user: User name of current `operation`.
|
|
operation: User operation, possible option: (read/write/delete).
|
|
obj_collection: Collection where object currently stored.
|
|
obj_ids: IDs of objects.
|
|
|
|
Returns:
|
|
List of IDs of accessible objects.
|
|
"""
|
|
|
|
obj_acls = self.get_acls(obj_collection, obj_ids)
|
|
accessible_obj_ids = []
|
|
for obj_acl in obj_acls:
|
|
perms = obj_acl.obj_perms[operation]
|
|
if ObjectACL.OBJ_PERMS_ALLOW_ALL in perms or user in perms:
|
|
accessible_obj_ids.append(obj_acl.obj_id)
|
|
|
|
return accessible_obj_ids
|
|
|
|
|
|
class AppCapabilityManagerException(Exception):
|
|
"""Exception for AppCapabilityManager."""
|
|
|
|
pass
|
|
|
|
|
|
class AppCapabilityNotExistException(Exception):
|
|
"""Exception for the situation when AppCapability does not exist for a
|
|
specific app."""
|
|
|
|
pass
|
|
|
|
|
|
class AppCapabilityManager:
|
|
"""App capability manager.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> acm = user_access.AppCapabilityManager('test_collection',
|
|
session_key,
|
|
'Splunk_TA_test')
|
|
>>> acm.register_capabilities(...)
|
|
>>> acm.unregister_capabilities(...)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
collection_name: str,
|
|
session_key: str,
|
|
app: str,
|
|
owner: str = "nobody",
|
|
scheme: str = None,
|
|
host: str = None,
|
|
port: int = None,
|
|
**context: dict,
|
|
):
|
|
"""Initializes AppCapabilityManager.
|
|
|
|
Arguments:
|
|
collection_name: Collection name to store capabilities.
|
|
session_key: Splunk access token.
|
|
app: App name of namespace.
|
|
owner: (optional) Owner of namespace, default is `nobody`.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Raises:
|
|
AppCapabilityManagerException: If init AppCapabilityManager failed.
|
|
"""
|
|
self._app = app
|
|
|
|
collection_name = f"{app}_{collection_name}"
|
|
try:
|
|
self._collection_data = _utils.get_collection_data(
|
|
collection_name,
|
|
session_key,
|
|
app,
|
|
owner,
|
|
scheme,
|
|
host,
|
|
port,
|
|
None,
|
|
**context,
|
|
)
|
|
except KeyError:
|
|
raise AppCapabilityManagerException(
|
|
f"Get app capabilities collection: {collection_name} failed."
|
|
)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def register_capabilities(self, capabilities: dict):
|
|
"""Register app capabilities.
|
|
|
|
Arguments:
|
|
capabilities: App capabilities, example:
|
|
|
|
{
|
|
'object_type1': {
|
|
'read': 'read_app_object_type1',
|
|
'write': 'write_app_object_type1',
|
|
'delete': 'delete_app_object_type1'},
|
|
'object_type2': {
|
|
'read': 'read_app_object_type2',
|
|
'write': 'write_app_object_type2',
|
|
'delete': 'delete_app_object_type2'
|
|
},
|
|
...
|
|
}
|
|
"""
|
|
|
|
record = {"_key": self._app, "capabilities": capabilities}
|
|
self._collection_data.batch_save(record)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def unregister_capabilities(self):
|
|
"""Unregister app capabilities.
|
|
|
|
Raises:
|
|
AppCapabilityNotExistException: If app capabilities are not registered.
|
|
"""
|
|
|
|
try:
|
|
self._collection_data.delete_by_id(self._app)
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise AppCapabilityNotExistException(
|
|
"App capabilities for %s have not been registered." % self._app
|
|
)
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def capabilities_are_registered(self) -> bool:
|
|
"""Check if app capabilities are registered.
|
|
|
|
Returns:
|
|
True if app capabilities are registered else False.
|
|
"""
|
|
|
|
try:
|
|
self._collection_data.query_by_id(self._app)
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
return False
|
|
|
|
return True
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_capabilities(self) -> dict:
|
|
"""Get app capabilities.
|
|
|
|
Returns:
|
|
App capabilities.
|
|
|
|
Raises:
|
|
AppCapabilityNotExistException: If app capabilities are not registered.
|
|
"""
|
|
|
|
try:
|
|
record = self._collection_data.query_by_id(self._app)
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise AppCapabilityNotExistException(
|
|
"App capabilities for %s have not been registered." % self._app
|
|
)
|
|
|
|
return record["capabilities"]
|
|
|
|
|
|
class UserAccessException(Exception):
|
|
"""Exception for the situation when there is user access exception."""
|
|
|
|
pass
|
|
|
|
|
|
def check_user_access(
|
|
session_key: str,
|
|
capabilities: dict,
|
|
obj_type: str,
|
|
operation: str,
|
|
scheme: str = None,
|
|
host: str = None,
|
|
port: int = None,
|
|
**context: dict,
|
|
):
|
|
"""User access checker.
|
|
|
|
It will fetch user capabilities from given `session_key` and check if
|
|
the capability extracted from `capabilities`, `obj_type` and `operation`
|
|
is contained, if user capabilities include the extracted capability user
|
|
access is ok else fail.
|
|
|
|
Arguments:
|
|
session_key: Splunk access token.
|
|
capabilities: App capabilities, example:
|
|
|
|
{
|
|
'object_type1': {
|
|
'read': 'read_app_object_type1',
|
|
'write': 'write_app_object_type1',
|
|
'delete': 'delete_app_object_type1'},
|
|
'object_type2': {
|
|
'read': 'read_app_object_type2',
|
|
'write': 'write_app_object_type2',
|
|
'delete': 'delete_app_object_type2'
|
|
},
|
|
...
|
|
}
|
|
obj_type: Object type.
|
|
operation: User operation, possible option: (read/write/delete).
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Raises:
|
|
UserAccessException: If user access permission is denied.
|
|
|
|
Examples:
|
|
>>> from solnlib.user_access import check_user_access
|
|
>>> def fun():
|
|
>>> check_user_access(
|
|
>>> session_key, capabilities, 'test_object', 'read')
|
|
>>> ...
|
|
"""
|
|
|
|
username = get_current_username(
|
|
session_key, scheme=scheme, host=host, port=port, **context
|
|
)
|
|
capability = capabilities[obj_type][operation]
|
|
if not user_is_capable(
|
|
session_key,
|
|
username,
|
|
capability,
|
|
scheme=scheme,
|
|
host=host,
|
|
port=port,
|
|
**context,
|
|
):
|
|
raise UserAccessException(
|
|
"Permission denied, %s does not have the capability: %s."
|
|
% (username, capability)
|
|
)
|
|
|
|
|
|
class InvalidSessionKeyException(Exception):
|
|
"""Exception when Splunk session key is invalid."""
|
|
|
|
pass
|
|
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_current_username(
|
|
session_key: str,
|
|
scheme: str = None,
|
|
host: str = None,
|
|
port: int = None,
|
|
**context: dict,
|
|
) -> str:
|
|
"""Get current user name from `session_key`.
|
|
|
|
Arguments:
|
|
session_key: Splunk access token.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Returns:
|
|
Current user name.
|
|
|
|
Raises:
|
|
InvalidSessionKeyException: If `session_key` is invalid.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> user_name = user_access.get_current_username(session_key)
|
|
"""
|
|
|
|
_rest_client = rest_client.SplunkRestClient(
|
|
session_key, "-", scheme=scheme, host=host, port=port, **context
|
|
)
|
|
try:
|
|
response = _rest_client.get(
|
|
"/services/authentication/current-context", output_mode="json"
|
|
).body.read()
|
|
except binding.HTTPError as e:
|
|
if e.status != 401:
|
|
raise
|
|
|
|
raise InvalidSessionKeyException("Invalid session key.")
|
|
|
|
return json.loads(response)["entry"][0]["content"]["username"]
|
|
|
|
|
|
class UserNotExistException(Exception):
|
|
"""Exception when user does not exist."""
|
|
|
|
pass
|
|
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_user_capabilities(
|
|
session_key: str,
|
|
username: str,
|
|
scheme: str = None,
|
|
host: str = None,
|
|
port: int = None,
|
|
**context: dict,
|
|
) -> List[dict]:
|
|
"""Get user capabilities.
|
|
|
|
Arguments:
|
|
session_key: Splunk access token.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Returns:
|
|
User capabilities.
|
|
|
|
Raises:
|
|
UserNotExistException: If `username` does not exist.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> user_capabilities = user_access.get_user_capabilities(
|
|
>>> session_key, 'test_user')
|
|
"""
|
|
|
|
_rest_client = rest_client.SplunkRestClient(
|
|
session_key, "-", scheme=scheme, host=host, port=port, **context
|
|
)
|
|
url = f"/services/authentication/users/{username}"
|
|
try:
|
|
response = _rest_client.get(url, output_mode="json").body.read()
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise UserNotExistException("User: %s does not exist." % username)
|
|
|
|
return json.loads(response)["entry"][0]["content"]["capabilities"]
|
|
|
|
|
|
def user_is_capable(
|
|
session_key: str,
|
|
username: str,
|
|
capability: str,
|
|
scheme: str = None,
|
|
host: str = None,
|
|
port: int = None,
|
|
**context: dict,
|
|
) -> bool:
|
|
"""Check if user is capable for given `capability`.
|
|
|
|
Arguments:
|
|
session_key: Splunk access token.
|
|
username: (optional) User name of roles to get.
|
|
capability: The capability we wish to check for.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Returns:
|
|
True if user is capable else False.
|
|
|
|
Raises:
|
|
UserNotExistException: If `username` does not exist.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> is_capable = user_access.user_is_capable(
|
|
>>> session_key, 'test_user', 'object_read_capability')
|
|
"""
|
|
|
|
capabilities = get_user_capabilities(
|
|
session_key, username, scheme=scheme, host=host, port=port, **context
|
|
)
|
|
return capability in capabilities
|
|
|
|
|
|
@utils.retry(exceptions=[binding.HTTPError])
|
|
def get_user_roles(
|
|
session_key: str, username: str, scheme=None, host=None, port=None, **context
|
|
) -> List:
|
|
"""Get user roles.
|
|
|
|
Arguments:
|
|
session_key: Splunk access token.
|
|
username: (optional) User name of roles to get.
|
|
scheme: (optional) The access scheme, default is None.
|
|
host: (optional) The host name, default is None.
|
|
port: (optional) The port number, default is None.
|
|
context: Other configurations for Splunk rest client.
|
|
|
|
Returns:
|
|
User roles.
|
|
|
|
Raises:
|
|
UserNotExistException: If `username` does not exist.
|
|
|
|
Examples:
|
|
>>> from solnlib import user_access
|
|
>>> user_roles = user_access.get_user_roles(session_key, 'test_user')
|
|
"""
|
|
|
|
_rest_client = rest_client.SplunkRestClient(
|
|
session_key, "-", scheme=scheme, host=host, port=port, **context
|
|
)
|
|
url = f"/services/authentication/users/{username}"
|
|
try:
|
|
response = _rest_client.get(url, output_mode="json").body.read()
|
|
except binding.HTTPError as e:
|
|
if e.status != 404:
|
|
raise
|
|
|
|
raise UserNotExistException("User: %s does not exist." % username)
|
|
|
|
return json.loads(response)["entry"][0]["content"]["roles"]
|