You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

950 lines
28 KiB

#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Splunk user access control related utilities."""
import json
from typing import List, Optional
from splunklib import binding
from solnlib import _utils
from solnlib import splunk_rest_client as rest_client
from solnlib import utils
__all__ = [
"ObjectACLException",
"ObjectACL",
"ObjectACLManagerException",
"ObjectACLManager",
"AppCapabilityManagerException",
"AppCapabilityManager",
"UserAccessException",
"check_user_access",
"InvalidSessionKeyException",
"get_current_username",
"UserNotExistException",
"get_user_capabilities",
"user_is_capable",
"get_user_roles",
]
class ObjectACLException(Exception):
pass
class ObjectACL:
"""Object ACL record.
Examples:
>>> from solnlib import user_access
>>> obj_acl = user_access.ObjectACL(
>>> 'test_collection',
>>> '9defa6f510d711e6be16a45e60e34295',
>>> 'test_object',
>>> 'Splunk_TA_test',
>>> 'admin',
>>> {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
>>> False)
"""
OBJ_COLLECTION_KEY = "obj_collection"
OBJ_ID_KEY = "obj_id"
OBJ_TYPE_KEY = "obj_type"
OBJ_APP_KEY = "obj_app"
OBJ_OWNER_KEY = "obj_owner"
OBJ_PERMS_KEY = "obj_perms"
OBJ_PERMS_READ_KEY = "read"
OBJ_PERMS_WRITE_KEY = "write"
OBJ_PERMS_DELETE_KEY = "delete"
OBJ_PERMS_ALLOW_ALL = "*"
OBJ_SHARED_BY_INCLUSION_KEY = "obj_shared_by_inclusion"
def __init__(
self,
obj_collection: str,
obj_id: str,
obj_type: str,
obj_app: str,
obj_owner: str,
obj_perms: dict,
obj_shared_by_inclusion: bool,
):
"""Initializes ObjectACL.
Arguments:
obj_collection: Collection where object currently stored.
obj_id: ID of this object.
obj_type: Type of this object.
obj_app: App of this object.
obj_owner: Owner of this object.
obj_perms: Object perms, like: {'read': ['*'], 'write': ['admin'], 'delete': ['admin']}.
obj_shared_by_inclusion: Flag of object is shared by inclusion.
"""
self.obj_collection = obj_collection
self.obj_id = obj_id
self.obj_type = obj_type
self.obj_app = obj_app
self.obj_owner = obj_owner
self._check_perms(obj_perms)
self._obj_perms = obj_perms
self.obj_shared_by_inclusion = obj_shared_by_inclusion
@classmethod
def _check_perms(cls, obj_perms):
if not isinstance(obj_perms, dict):
raise ObjectACLException(
"Invalid object acl perms type: %s, should be a dict." % type(obj_perms)
)
if not (
cls.OBJ_PERMS_READ_KEY in obj_perms
and cls.OBJ_PERMS_WRITE_KEY in obj_perms
and cls.OBJ_PERMS_DELETE_KEY in obj_perms
):
raise ObjectACLException(
"Invalid object acl perms: %s, "
"should include read, write and delete perms." % obj_perms
)
@property
def obj_perms(self):
return self._obj_perms
@obj_perms.setter
def obj_perms(self, obj_perms):
self._check_perms(obj_perms)
self._obj_perms = obj_perms
@property
def record(self) -> dict:
"""Get object acl record.
Returns: Object acl record, like:
{
'_key': 'test_collection-1234',
'obj_collection': 'test_collection',
'obj_id': '1234',
'obj_type': 'test_object',
'obj_app': 'Splunk_TA_test',
'obj_owner': 'admin',
'obj_perms': {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
'obj_shared_by_inclusion': True
}
"""
return {
"_key": self.generate_key(self.obj_collection, self.obj_id),
self.OBJ_COLLECTION_KEY: self.obj_collection,
self.OBJ_ID_KEY: self.obj_id,
self.OBJ_TYPE_KEY: self.obj_type,
self.OBJ_APP_KEY: self.obj_app,
self.OBJ_OWNER_KEY: self.obj_owner,
self.OBJ_PERMS_KEY: self._obj_perms,
self.OBJ_SHARED_BY_INCLUSION_KEY: self.obj_shared_by_inclusion,
}
@staticmethod
def generate_key(obj_collection: str, obj_id: str) -> str:
"""Generate object acl record key.
Arguments:
obj_collection: Collection where object currently stored.
obj_id: ID of this object.
Returns:
Object acl record key.
"""
return "{obj_collection}_{obj_id}".format(
obj_collection=obj_collection, obj_id=obj_id
)
@staticmethod
def parse(obj_acl_record: dict) -> "ObjectACL":
"""Parse object acl record and construct a new `ObjectACL` object from
it.
Arguments:
obj_acl_record: Object acl record.
Returns:
New `ObjectACL` object.
"""
return ObjectACL(
obj_acl_record[ObjectACL.OBJ_COLLECTION_KEY],
obj_acl_record[ObjectACL.OBJ_ID_KEY],
obj_acl_record[ObjectACL.OBJ_TYPE_KEY],
obj_acl_record[ObjectACL.OBJ_APP_KEY],
obj_acl_record[ObjectACL.OBJ_OWNER_KEY],
obj_acl_record[ObjectACL.OBJ_PERMS_KEY],
obj_acl_record[ObjectACL.OBJ_SHARED_BY_INCLUSION_KEY],
)
def merge(self, obj_acl: "ObjectACL"):
"""Merge current object perms with perms of `obj_acl`.
Arguments:
obj_acl: Object acl to merge.
"""
for perm_key in self._obj_perms:
self._obj_perms[perm_key] = list(
set.union(
set(self._obj_perms[perm_key]), set(obj_acl._obj_perms[perm_key])
)
)
if self.OBJ_PERMS_ALLOW_ALL in self._obj_perms[perm_key]:
self._obj_perms[perm_key] = [self.OBJ_PERMS_ALLOW_ALL]
def __str__(self):
return json.dumps(self.record)
class ObjectACLManagerException(Exception):
"""Exception for ObjectACLManager."""
pass
class ObjectACLNotExistException(Exception):
"""Exception for the situation when ACL does not exist."""
pass
class ObjectACLManager:
"""Object ACL manager.
Examples:
>>> from solnlib import user_access
>>> oaclm = user_access.ObjectACLManager(session_key,
'Splunk_TA_test')
"""
def __init__(
self,
collection_name: str,
session_key: str,
app: str,
owner: Optional[str] = "nobody",
scheme: Optional[str] = None,
host: Optional[str] = None,
port: Optional[int] = None,
**context: dict,
):
"""Initializes ObjectACLManager.
Arguments:
collection_name: Collection name to store object ACL info.
session_key: Splunk access token.
app: App name of namespace.
owner: (optional) Owner of namespace, default is `nobody`.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Raises:
ObjectACLManagerException: If init ObjectACLManager failed.
"""
collection_name = "{app}_{collection_name}".format(
app=app, collection_name=collection_name
)
try:
self._collection_data = _utils.get_collection_data(
collection_name,
session_key,
app,
owner,
scheme,
host,
port,
None,
**context,
)
except KeyError:
raise ObjectACLManagerException(
f"Get object acl collection: {collection_name} fail."
)
@utils.retry(exceptions=[binding.HTTPError])
def update_acl(
self,
obj_collection: str,
obj_id: str,
obj_type: str,
obj_app: str,
obj_owner: str,
obj_perms: dict,
obj_shared_by_inclusion: bool = True,
replace_existing: bool = True,
):
"""Update acl info of object.
Construct a new object acl info first, if `replace_existing` is True
then replace existing acl info else merge new object acl info with the
old one and replace the old acl info with merged acl info.
Arguments:
obj_collection: Collection where object currently stored.
obj_id: ID of this object.
obj_type: Type of this object.
obj_app: App of this object.
obj_owner: Owner of this object.
obj_perms: Object perms, like:
{
'read': ['*'],
'write': ['admin'],
'delete': ['admin']
}.
obj_shared_by_inclusion: (optional) Flag of object is shared by
inclusion, default is True.
replace_existing: (optional) Replace existing acl info flag, True
indicates replace old acl info with new one else merge with old
acl info, default is True.
"""
obj_acl = ObjectACL(
obj_collection,
obj_id,
obj_type,
obj_app,
obj_owner,
obj_perms,
obj_shared_by_inclusion,
)
if not replace_existing:
try:
old_obj_acl = self.get_acl(obj_collection, obj_id)
except ObjectACLNotExistException:
old_obj_acl = None
if old_obj_acl:
obj_acl.merge(old_obj_acl)
self._collection_data.batch_save(obj_acl.record)
@utils.retry(exceptions=[binding.HTTPError])
def update_acls(
self,
obj_collection: str,
obj_ids: List[str],
obj_type: str,
obj_app: str,
obj_owner: str,
obj_perms: dict,
obj_shared_by_inclusion: bool = True,
replace_existing: bool = True,
):
"""Batch update object acl info to all provided `obj_ids`.
Arguments:
obj_collection: Collection where objects currently stored.
obj_ids: IDs list of objects.
obj_type: Type of this object.
obj_app: App of this object.
obj_owner: Owner of this object.
obj_perms: Object perms, like:
{
'read': ['*'],
'write': ['admin'],
'delete': ['admin']
}.
obj_shared_by_inclusion: (optional) Flag of object is shared by
inclusion, default is True.
replace_existing: (optional) Replace existing acl info flag, True
indicates replace old acl info with new one else merge with old acl
info, default is True.
"""
obj_acl_records = []
for obj_id in obj_ids:
obj_acl = ObjectACL(
obj_collection,
obj_id,
obj_type,
obj_app,
obj_owner,
obj_perms,
obj_shared_by_inclusion,
)
if not replace_existing:
try:
old_obj_acl = self.get_acl(obj_collection, obj_id)
except ObjectACLNotExistException:
old_obj_acl = None
if old_obj_acl:
obj_acl.merge(old_obj_acl)
obj_acl_records.append(obj_acl.record)
self._collection_data.batch_save(*obj_acl_records)
@utils.retry(exceptions=[binding.HTTPError])
def get_acl(self, obj_collection: str, obj_id: str) -> "ObjectACL":
"""Get acl info.
Query object acl info with parameter of the combination of
`obj_collection` and `obj_id` from `self.collection_name` and
return it.
Arguments:
obj_collection: Collection where object currently stored.
obj_id: ID of this object.
Returns:
Object acl info if success else None.
Raises:
ObjectACLNotExistException: If object ACL info does not exist.
"""
key = ObjectACL.generate_key(obj_collection, obj_id)
try:
obj_acl = self._collection_data.query_by_id(key)
except binding.HTTPError as e:
if e.status != 404:
raise
raise ObjectACLNotExistException(
"Object ACL info of {}_{} does not exist.".format(
obj_collection, obj_id
)
)
return ObjectACL.parse(obj_acl)
@utils.retry(exceptions=[binding.HTTPError])
def get_acls(self, obj_collection: str, obj_ids: List[str]) -> List[ObjectACL]:
"""Batch get acl info.
Query objects acl info with parameter of the combination of
`obj_collection` and `obj_ids` from KVStore and return them.
Arguments:
obj_collection: Collection where object currently stored.
obj_ids: IDs of objects.
Returns:
List of `ObjectACL` instances.
"""
query = json.dumps(
{
"$or": [
{"_key": ObjectACL.generate_key(obj_collection, obj_id)}
for obj_id in obj_ids
]
}
)
obj_acls = self._collection_data.query(query=query)
return [ObjectACL.parse(obj_acl) for obj_acl in obj_acls]
@utils.retry(exceptions=[binding.HTTPError])
def delete_acl(self, obj_collection: str, obj_id: str):
"""Delete acl info.
Query object acl info with parameter of the combination of
`obj_collection` and `obj_ids` from KVStore and delete it.
Arguments:
obj_collection: Collection where object currently stored.
obj_id: ID of this object.
Raises:
ObjectACLNotExistException: If object ACL info does not exist.
"""
key = ObjectACL.generate_key(obj_collection, obj_id)
try:
self._collection_data.delete_by_id(key)
except binding.HTTPError as e:
if e.status != 404:
raise
raise ObjectACLNotExistException(
"Object ACL info of {}_{} does not exist.".format(
obj_collection, obj_id
)
)
@utils.retry(exceptions=[binding.HTTPError])
def delete_acls(self, obj_collection: str, obj_ids: List[str]):
"""Batch delete acl info.
Query objects acl info with parameter of the combination of
`obj_collection` and `obj_ids` from KVStore and delete them.
Arguments:
obj_collection: Collection where object currently stored.
obj_ids: IDs of objects.
"""
query = json.dumps(
{
"$or": [
{"_key": ObjectACL.generate_key(obj_collection, obj_id)}
for obj_id in obj_ids
]
}
)
self._collection_data.delete(query=query)
@utils.retry(exceptions=[binding.HTTPError])
def get_accessible_object_ids(
self, user: str, operation: str, obj_collection: str, obj_ids: List[str]
) -> List[str]:
"""Get accessible IDs of objects from `obj_acls`.
Arguments:
user: User name of current `operation`.
operation: User operation, possible option: (read/write/delete).
obj_collection: Collection where object currently stored.
obj_ids: IDs of objects.
Returns:
List of IDs of accessible objects.
"""
obj_acls = self.get_acls(obj_collection, obj_ids)
accessible_obj_ids = []
for obj_acl in obj_acls:
perms = obj_acl.obj_perms[operation]
if ObjectACL.OBJ_PERMS_ALLOW_ALL in perms or user in perms:
accessible_obj_ids.append(obj_acl.obj_id)
return accessible_obj_ids
class AppCapabilityManagerException(Exception):
"""Exception for AppCapabilityManager."""
pass
class AppCapabilityNotExistException(Exception):
"""Exception for the situation when AppCapability does not exist for a
specific app."""
pass
class AppCapabilityManager:
"""App capability manager.
Examples:
>>> from solnlib import user_access
>>> acm = user_access.AppCapabilityManager('test_collection',
session_key,
'Splunk_TA_test')
>>> acm.register_capabilities(...)
>>> acm.unregister_capabilities(...)
"""
def __init__(
self,
collection_name: str,
session_key: str,
app: str,
owner: str = "nobody",
scheme: str = None,
host: str = None,
port: int = None,
**context: dict,
):
"""Initializes AppCapabilityManager.
Arguments:
collection_name: Collection name to store capabilities.
session_key: Splunk access token.
app: App name of namespace.
owner: (optional) Owner of namespace, default is `nobody`.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Raises:
AppCapabilityManagerException: If init AppCapabilityManager failed.
"""
self._app = app
collection_name = f"{app}_{collection_name}"
try:
self._collection_data = _utils.get_collection_data(
collection_name,
session_key,
app,
owner,
scheme,
host,
port,
None,
**context,
)
except KeyError:
raise AppCapabilityManagerException(
f"Get app capabilities collection: {collection_name} failed."
)
@utils.retry(exceptions=[binding.HTTPError])
def register_capabilities(self, capabilities: dict):
"""Register app capabilities.
Arguments:
capabilities: App capabilities, example:
{
'object_type1': {
'read': 'read_app_object_type1',
'write': 'write_app_object_type1',
'delete': 'delete_app_object_type1'},
'object_type2': {
'read': 'read_app_object_type2',
'write': 'write_app_object_type2',
'delete': 'delete_app_object_type2'
},
...
}
"""
record = {"_key": self._app, "capabilities": capabilities}
self._collection_data.batch_save(record)
@utils.retry(exceptions=[binding.HTTPError])
def unregister_capabilities(self):
"""Unregister app capabilities.
Raises:
AppCapabilityNotExistException: If app capabilities are not registered.
"""
try:
self._collection_data.delete_by_id(self._app)
except binding.HTTPError as e:
if e.status != 404:
raise
raise AppCapabilityNotExistException(
"App capabilities for %s have not been registered." % self._app
)
@utils.retry(exceptions=[binding.HTTPError])
def capabilities_are_registered(self) -> bool:
"""Check if app capabilities are registered.
Returns:
True if app capabilities are registered else False.
"""
try:
self._collection_data.query_by_id(self._app)
except binding.HTTPError as e:
if e.status != 404:
raise
return False
return True
@utils.retry(exceptions=[binding.HTTPError])
def get_capabilities(self) -> dict:
"""Get app capabilities.
Returns:
App capabilities.
Raises:
AppCapabilityNotExistException: If app capabilities are not registered.
"""
try:
record = self._collection_data.query_by_id(self._app)
except binding.HTTPError as e:
if e.status != 404:
raise
raise AppCapabilityNotExistException(
"App capabilities for %s have not been registered." % self._app
)
return record["capabilities"]
class UserAccessException(Exception):
"""Exception for the situation when there is user access exception."""
pass
def check_user_access(
session_key: str,
capabilities: dict,
obj_type: str,
operation: str,
scheme: str = None,
host: str = None,
port: int = None,
**context: dict,
):
"""User access checker.
It will fetch user capabilities from given `session_key` and check if
the capability extracted from `capabilities`, `obj_type` and `operation`
is contained, if user capabilities include the extracted capability user
access is ok else fail.
Arguments:
session_key: Splunk access token.
capabilities: App capabilities, example:
{
'object_type1': {
'read': 'read_app_object_type1',
'write': 'write_app_object_type1',
'delete': 'delete_app_object_type1'},
'object_type2': {
'read': 'read_app_object_type2',
'write': 'write_app_object_type2',
'delete': 'delete_app_object_type2'
},
...
}
obj_type: Object type.
operation: User operation, possible option: (read/write/delete).
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Raises:
UserAccessException: If user access permission is denied.
Examples:
>>> from solnlib.user_access import check_user_access
>>> def fun():
>>> check_user_access(
>>> session_key, capabilities, 'test_object', 'read')
>>> ...
"""
username = get_current_username(
session_key, scheme=scheme, host=host, port=port, **context
)
capability = capabilities[obj_type][operation]
if not user_is_capable(
session_key,
username,
capability,
scheme=scheme,
host=host,
port=port,
**context,
):
raise UserAccessException(
"Permission denied, %s does not have the capability: %s."
% (username, capability)
)
class InvalidSessionKeyException(Exception):
"""Exception when Splunk session key is invalid."""
pass
@utils.retry(exceptions=[binding.HTTPError])
def get_current_username(
session_key: str,
scheme: str = None,
host: str = None,
port: int = None,
**context: dict,
) -> str:
"""Get current user name from `session_key`.
Arguments:
session_key: Splunk access token.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Returns:
Current user name.
Raises:
InvalidSessionKeyException: If `session_key` is invalid.
Examples:
>>> from solnlib import user_access
>>> user_name = user_access.get_current_username(session_key)
"""
_rest_client = rest_client.SplunkRestClient(
session_key, "-", scheme=scheme, host=host, port=port, **context
)
try:
response = _rest_client.get(
"/services/authentication/current-context", output_mode="json"
).body.read()
except binding.HTTPError as e:
if e.status != 401:
raise
raise InvalidSessionKeyException("Invalid session key.")
return json.loads(response)["entry"][0]["content"]["username"]
class UserNotExistException(Exception):
"""Exception when user does not exist."""
pass
@utils.retry(exceptions=[binding.HTTPError])
def get_user_capabilities(
session_key: str,
username: str,
scheme: str = None,
host: str = None,
port: int = None,
**context: dict,
) -> List[dict]:
"""Get user capabilities.
Arguments:
session_key: Splunk access token.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Returns:
User capabilities.
Raises:
UserNotExistException: If `username` does not exist.
Examples:
>>> from solnlib import user_access
>>> user_capabilities = user_access.get_user_capabilities(
>>> session_key, 'test_user')
"""
_rest_client = rest_client.SplunkRestClient(
session_key, "-", scheme=scheme, host=host, port=port, **context
)
url = f"/services/authentication/users/{username}"
try:
response = _rest_client.get(url, output_mode="json").body.read()
except binding.HTTPError as e:
if e.status != 404:
raise
raise UserNotExistException("User: %s does not exist." % username)
return json.loads(response)["entry"][0]["content"]["capabilities"]
def user_is_capable(
session_key: str,
username: str,
capability: str,
scheme: str = None,
host: str = None,
port: int = None,
**context: dict,
) -> bool:
"""Check if user is capable for given `capability`.
Arguments:
session_key: Splunk access token.
username: (optional) User name of roles to get.
capability: The capability we wish to check for.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Returns:
True if user is capable else False.
Raises:
UserNotExistException: If `username` does not exist.
Examples:
>>> from solnlib import user_access
>>> is_capable = user_access.user_is_capable(
>>> session_key, 'test_user', 'object_read_capability')
"""
capabilities = get_user_capabilities(
session_key, username, scheme=scheme, host=host, port=port, **context
)
return capability in capabilities
@utils.retry(exceptions=[binding.HTTPError])
def get_user_roles(
session_key: str, username: str, scheme=None, host=None, port=None, **context
) -> List:
"""Get user roles.
Arguments:
session_key: Splunk access token.
username: (optional) User name of roles to get.
scheme: (optional) The access scheme, default is None.
host: (optional) The host name, default is None.
port: (optional) The port number, default is None.
context: Other configurations for Splunk rest client.
Returns:
User roles.
Raises:
UserNotExistException: If `username` does not exist.
Examples:
>>> from solnlib import user_access
>>> user_roles = user_access.get_user_roles(session_key, 'test_user')
"""
_rest_client = rest_client.SplunkRestClient(
session_key, "-", scheme=scheme, host=host, port=port, **context
)
url = f"/services/authentication/users/{username}"
try:
response = _rest_client.get(url, output_mode="json").body.read()
except binding.HTTPError as e:
if e.status != 404:
raise
raise UserNotExistException("User: %s does not exist." % username)
return json.loads(response)["entry"][0]["content"]["roles"]