You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
149 lines
4.5 KiB
149 lines
4.5 KiB
import os
|
|
from typing import Optional, Dict, Any
|
|
|
|
import requests
|
|
|
|
from mappers import (
|
|
get_bitwarden_event,
|
|
get_bitwarden_group,
|
|
get_bitwarden_member
|
|
)
|
|
from models import (
|
|
BitwardenApiConfig,
|
|
BitwardenEventsResponse,
|
|
BitwardenEventsRequest,
|
|
BitwardenGroupsResponse,
|
|
BitwardenMembersResponse
|
|
)
|
|
|
|
from utils import get_logger
|
|
|
|
REQUESTS_TIMEOUT = (10, 30)
|
|
|
|
|
|
def _join_urls(base: str, *paths: str):
|
|
url = base
|
|
if not url.endswith("/"):
|
|
url += "/"
|
|
|
|
for path in paths:
|
|
if path.startswith("/"):
|
|
url += path[1:]
|
|
else:
|
|
url += path
|
|
url += "/"
|
|
|
|
url = url[:-1]
|
|
|
|
return url
|
|
|
|
|
|
def _get_custom_ca_certificate_location() -> Optional[str]:
|
|
if 'SPLUNK_HOME' not in os.environ:
|
|
return None
|
|
|
|
app_cacerts_file = os.path.join(os.environ.get('SPLUNK_HOME'), 'etc', 'auth',
|
|
'bitwarden_event_logs_cacerts.pem')
|
|
if not os.path.isfile(app_cacerts_file):
|
|
return None
|
|
|
|
return app_cacerts_file
|
|
|
|
|
|
class BitwardenApi:
|
|
def __init__(self, api_config: BitwardenApiConfig):
|
|
self.logger = get_logger()
|
|
self.api_config = api_config
|
|
self.access_token = self.get_access_token()
|
|
|
|
def get_access_token(self) -> str:
|
|
url = _join_urls(self.api_config.identity_api_url, "connect/token")
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
data = {
|
|
"grant_type": "client_credentials",
|
|
"client_id": self.api_config.client_id,
|
|
"client_secret": self.api_config.client_secret,
|
|
"scope": "api.organization",
|
|
}
|
|
|
|
self.logger.debug('Request url %s',
|
|
url)
|
|
|
|
response = requests.post(url,
|
|
headers=headers,
|
|
data=data,
|
|
timeout=REQUESTS_TIMEOUT,
|
|
verify=_get_custom_ca_certificate_location())
|
|
|
|
response_dict = self.__get_response_json(response)
|
|
|
|
if 'access_token' not in response_dict:
|
|
raise Exception('Access token not found in response')
|
|
|
|
return response_dict["access_token"]
|
|
|
|
def get_events(self, request: BitwardenEventsRequest) -> BitwardenEventsResponse:
|
|
url = _join_urls(self.api_config.events_api_url, "/public/events")
|
|
|
|
query_params = {
|
|
"start": request.start,
|
|
"end": request.end
|
|
}
|
|
|
|
if request.continuation_token is not None:
|
|
query_params["continuationToken"] = request.continuation_token
|
|
|
|
response_dict = self.__send_get_request(url, query_params)
|
|
|
|
data = [get_bitwarden_event(item_dict) for item_dict in response_dict.get("data", [])]
|
|
|
|
# The Bitwarden API may return an empty string for the continuationToken
|
|
# when there are no more pages. Normalize this to "None" to ensure it is
|
|
# handled properly.
|
|
return BitwardenEventsResponse(
|
|
data=data,
|
|
continuationToken=response_dict.get("continuationToken", None) or None
|
|
)
|
|
|
|
def get_groups(self):
|
|
url = _join_urls(self.api_config.events_api_url, "/public/groups")
|
|
|
|
response_dict = self.__send_get_request(url, None)
|
|
|
|
data = [get_bitwarden_group(item_dict) for item_dict in response_dict.get("data", [])]
|
|
|
|
return BitwardenGroupsResponse(data)
|
|
|
|
def get_members(self):
|
|
url = _join_urls(self.api_config.events_api_url, "/public/members")
|
|
|
|
response_dict = self.__send_get_request(url, None)
|
|
|
|
data = [get_bitwarden_member(item_dict) for item_dict in response_dict.get("data", [])]
|
|
|
|
return BitwardenMembersResponse(data)
|
|
|
|
def __send_get_request(self, url: str, query_params: Optional[Dict[str, Any]]) -> Any:
|
|
headers = {
|
|
"Authorization": f"Bearer {self.access_token}"
|
|
}
|
|
|
|
self.logger.debug('Request url %s, query params %s',
|
|
url, query_params)
|
|
|
|
response = requests.get(url,
|
|
headers=headers,
|
|
params=query_params,
|
|
timeout=REQUESTS_TIMEOUT,
|
|
verify=_get_custom_ca_certificate_location())
|
|
|
|
return self.__get_response_json(response)
|
|
|
|
def __get_response_json(self, response):
|
|
self.logger.debug('Response status code %s', response.status_code)
|
|
response.raise_for_status()
|
|
|
|
return response.json()
|