# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import sys import time from splunk.clilib.bundle_paths import make_splunkhome_path sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib'])) import itsi_py3 import requests import base64 import json import http.client from itsi.data_integrations.data_integrations_constants import CLOUD_SETUP_IAC_VALIDATION, CLIENT_ID_LOGGING_KEY, SCP_METHOD_LOGGING_KEY, ERROR_DETAIL_LOGGING_KEY, COLLECTION_LOGGING_KEY, STATUS_CODE_LOGGING_KEY, ENDPOINT_LOGGING_KEY, SCP_CLUSTER_LOGGING_KEY, RETRY_COUNT_LOGGING_KEY from ITOA.itoa_exceptions import ItoaError from itsi.data_integrations.utils.secret_store_manager import StoragePasswordManager from itsi.data_integrations.utils.error_manager import is_retryable, get_sleep_time env = 'production' url = { 'playground': { 'auth': 'https://auth.playground.scp.splunk.com', 'app': 'https://app.playground.scp.splunk.com', }, 'staging': { 'auth': 'https://auth.staging.scp.splunk.com', 'app': 'https://app.staging.scp.splunk.com', }, 'production': { 'auth': 'https://auth.scp.splunk.com', 'app': 'https://app.scp.splunk.com', } } class ScpInterface: def __init__(self, logger): self.logger = logger self.password_manager = StoragePasswordManager(self.logger) def get_headers_for_app_saimc_service(self, kvstore_collection, splunk_rest_client, need_auth): """ Return headers for the REST call to be made to saimulticloudprovisioning service """ if not need_auth: self.logger.debug('Returning headers with no bearer token') return {'Content-Type': 'application/json'} client_id = "" try: # KV store wait is already done, so we don't need to call it again here cloud_config = kvstore_collection.load() client_id = cloud_config[0].get('clientId') except Exception as e: self.logger.error('Error while loading kvstore collection', props={ERROR_DETAIL_LOGGING_KEY: e, COLLECTION_LOGGING_KEY: self.collection}) raise e if not client_id: self.logger.error('Could not find the client id', props={COLLECTION_LOGGING_KEY: self.collection}) raise ItoaError('Could not find the client id in the collection', self.logger) secret = self.password_manager.get_secret(splunk_rest_client, client_id, key_type='client_id') b64val = base64.b64encode(itsi_py3.to_bytes('%s:%s' % (client_id, secret))) auth_header = {'Authorization': 'Basic %s' % itsi_py3.decode(b64val)} bearer_token_response = self.handle_scp_calls(cluster='auth', method='POST', endpoint=CLOUD_SETUP_IAC_VALIDATION, data={'grant_type': 'client_credentials'}, header=auth_header) status_code = bearer_token_response.status_code json_data = {} if bearer_token_response.text: json_data = json.loads(bearer_token_response.text) if status_code != http.client.OK: self.logger.error('Auth credential flow failed ', props={STATUS_CODE_LOGGING_KEY: status_code, CLIENT_ID_LOGGING_KEY: client_id}) raise ItoaError(('%s returned from auth with credential flow' % status_code), self.logger) return {'Content-Type': 'application/json', 'Authorization': 'Bearer %s' % str(json_data['access_token'])} def handle_scp_calls(self, cluster, method, endpoint, data=None, header=None): """ Construct the REST request to SCP, send it and return the response """ _endpoint = str(endpoint) if _endpoint[0] != '/': raise ItoaError('Endpoint error', self.logger) scp_url = url[env][cluster] + _endpoint payloads = data # If the header indicates that the content type is JSON, set payloads to the JSON string if header: content_type = header.get('Content-Type', '') if content_type == 'application/json': payloads = json.dumps(data) response = {} tries = 0 maxRetrys = 3 while True: self.logger.info('Invoking SCP endpoint', props={SCP_METHOD_LOGGING_KEY: method, ENDPOINT_LOGGING_KEY: _endpoint, RETRY_COUNT_LOGGING_KEY: tries, SCP_CLUSTER_LOGGING_KEY: cluster}) if method == 'POST': response = requests.post(scp_url, headers=header, data=payloads) elif method == 'PUT': response = requests.put(scp_url, headers=header, data=payloads) elif method == 'GET': response = requests.get(scp_url, headers=header, data=payloads) elif method == 'DELETE': response = requests.delete(scp_url, headers=header, data=payloads) if response == {}: self.logger.error('Empty response from SCP', props={SCP_METHOD_LOGGING_KEY: method, ENDPOINT_LOGGING_KEY: _endpoint, RETRY_COUNT_LOGGING_KEY: tries, SCP_CLUSTER_LOGGING_KEY: cluster }) raise ItoaError('Failed to get a response from SCP', self.logger) self.logger.info('SCP response received', props={SCP_METHOD_LOGGING_KEY: method, ENDPOINT_LOGGING_KEY: _endpoint, RETRY_COUNT_LOGGING_KEY: tries, SCP_CLUSTER_LOGGING_KEY: cluster, STATUS_CODE_LOGGING_KEY: response.status_code}) tries = tries + 1 if (300 > response.status_code > 199) or (is_retryable(response.status_code, self.logger) is False) or (tries == maxRetrys): break time.sleep(get_sleep_time(tries)) return response