# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import ipaddress import json import requests import socket import sys from urllib.parse import urlparse from splunk.clilib.bundle_paths import make_splunkhome_path sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib'])) sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common'])) from ITOA.itoa_common import get_conf from ITOA.setup_logging import getLogger from ITOA.event_management.notable_event_utils import Audit from itsi.event_management.sdk.custom_group_action_base import CustomGroupActionBase import splunk.rest as splunk_rest from splunk.util import safeURLQuote WEBHOOK_AUTH_TYPES = { 'BASIC_AUTH': 'Basic authentication', 'BEARER_TOKEN': 'Bearer token', 'NO_AUTH': 'No authentication' } class Webhook(CustomGroupActionBase): """ Class that performs Webhook action on notable events group. """ def __init__(self, settings, app='SA-ITOA'): """ Initialize the object @type settings: dict/basestring @param settings: incoming settings for this alert action that splunkd passes via stdin. @returns Nothing """ self.logger = getLogger(logger_name="itsi.event_action.webhook") super(Webhook, self).__init__(settings, self.logger) self.app = app self.owner = 'nobody' self.session_key = self.get_session_key() self.webhook_name = None self.webhook_url = None self.conf_file_name = 'webhooks' self.webhook_header = '' self.parsed_url_hostname = '' self.initial_ip = '' def get_clear_password(self, username): """ Get the actual password and token @type: str @param username: username/webhook name @rtype: str @return: decoded password/token """ try: realm_user = self.webhook_name + ':' + username uri_string = ('/servicesNS/{0}/{1}/storage/passwords/{2}').format(self.owner, self.app, realm_user) uri = safeURLQuote(uri_string) res, content = splunk_rest.simpleRequest(uri, getargs={'output_mode': 'json'}, sessionKey=self.session_key) if res.status == 200: self.logger.info( 'Password is fetched successfully for webhook=%s', self.webhook_name) else: self.logger.error( 'Error in getting password from passwords.conf. response=%s content=%s', res, content) return None if not content: self.logger.error('content was not returned.') return None parsed_content = json.loads(content) password = parsed_content.get('entry', [])[0].get('content', {}).get('clear_password', {}) return password except Exception as e: self.logger.exception('An error occurred while fetching the password. Exception: %s', e) return None def send_post_request(self, data): """ Send POST request with data to url @type: dict @param data: payload to send via url @return: Nothing """ try: addr_info = socket.getaddrinfo(self.parsed_url_hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) final_ip = [info[4][0] for info in addr_info] if self.initial_ip[0] == final_ip[0]: headers = {"Content-Type": "application/json"} if self.webhook_header is not None: try: headers = json.loads(self.webhook_header) except Exception as e: self.logger.error( 'Unable to complete webhook action because the header is invalid. Update the header and try again. Webhook={0} header={1} Exception: {2}.'.format( self.webhook_name, self.webhook_header, e ) ) sys.exit(1) response = requests.post(self.webhook_url, data=data, headers=headers, allow_redirects=False) # Check if the request was successful (status code 2xx) if response.status_code >= 200 and response.status_code < 300: self.logger.info( 'Webhook action for webhook {0} executed successfully.'.format( self.webhook_name ) ) else: self.logger.error( 'Failed to execute Webhook action for webhook={0}'.format( self.webhook_name ) ) sys.exit(1) else: self.logger.error('IP does not match with the validated IP') raise Exception('IP did not match the original IP') except Exception as e: self.logger.exception( 'An error occurred while executing the webhook action. Webhook={0} Exception: {1}'.format( self.webhook_name, e ) ) sys.exit(1) def send_post_request_with_auth(self, data, username): """ Send POST request with data to url using username and password @type: dict @param data: payload to send via url @type: str @param username: username for authentication @return: Nothing """ try: addr_info = socket.getaddrinfo(self.parsed_url_hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) final_ip = [info[4][0] for info in addr_info] if self.initial_ip[0] == final_ip[0]: decrypted_password = self.get_clear_password(username) headers = {'Content-Type': 'application/json'} if self.webhook_header is not None: try: headers = json.loads(self.webhook_header) except Exception as e: self.logger.error( 'Unable to complete webhook action because the header is invalid. Update the header and try again. Webhook={0} header={1} Exception: {2}.'.format( self.webhook_name, self.webhook_header, e ) ) sys.exit(1) if decrypted_password is None: self.logger.error( 'Password not found for provided Webhook=%s', self.webhook_name ) sys.exit(1) response = requests.post( self.webhook_url, headers=headers, data=data, auth=(username, decrypted_password), verify=self.is_ssl_certificate_validation_disabled, allow_redirects=False, ) # Check if the request was successful (status code 2xx) if response.status_code >= 200 and response.status_code < 300: self.logger.info( 'Webhook action for webhook {0} executed successfully. Response: {1}'.format( self.webhook_name, response ) ) else: self.logger.error( 'Failed to execute Webhook action for webhook={0}. status={1}.'.format( self.webhook_name, response.status_code ) ) sys.exit(1) else: self.logger.error('IP does not match with the validated IP') raise Exception('IP did not match the original IP') except Exception as e: self.logger.exception( 'An error occurred while executing the webhook action. Webhook={0} Exception: {1}'.format( self.webhook_name, e ) ) sys.exit(1) def send_post_request_with_token(self, data): """ Send POST request with data to url using Token @type: dict @param data: payload to send via url @return: Nothing """ try: addr_info = socket.getaddrinfo(self.parsed_url_hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) final_ip = [info[4][0] for info in addr_info] if self.initial_ip[0] == final_ip[0]: decrypted_token = self.get_clear_password(self.webhook_name) if decrypted_token is None: self.logger.error( 'Token not found for provied Webhook=%s', self.webhook_name ) sys.exit(1) headers = { "Authorization": f"Bearer {decrypted_token}", "Content-Type": "application/json", } if self.webhook_header is not None: try: self.webhook_header = json.loads(self.webhook_header) if "Authorization" not in self.webhook_header: self.webhook_header.update( {"Authorization": f"Bearer {decrypted_token}"} ) headers = self.webhook_header except Exception as e: self.logger.error( 'Unable to complete webhook action because the header is invalid. Update the header and try again. Webhook={0} header={1} Exception: {2}.'.format( self.webhook_name, self.webhook_header, e ) ) sys.exit(1) response = requests.post( self.webhook_url, data=data, headers=headers, verify=self.is_ssl_certificate_validation_disabled, allow_redirects=False, ) # Check if the request was successful (status code 2xx) if response.status_code >= 200 and response.status_code < 300: self.logger.info( 'Webhook action for webhook {0} executed successfully. Response: {1}'.format( self.webhook_name, response ) ) else: self.logger.error( 'Failed to execute Webhook action for webhook={0}. status={1}.'.format( self.webhook_name, response.status_code ) ) sys.exit(1) else: self.logger.error('IP does not match with the validated IP') raise Exception('IP did not match the original IP') except Exception as e: self.logger.exception( 'An error occurred while executing the webhook action. Webhook={0} Exception: {1}'.format( self.webhook_name, e ) ) sys.exit(1) def get_configuration(self, conf_file_name=None, app=None): """ Get configurations for webhooks @type: str @param conf_file_name: configuration file name @type: str @param app: app name @rtype: dict @return: config fields with value for provided configuration file """ conf_file_name = conf_file_name if conf_file_name else self.conf_file_name app = app if app else self.app rval = get_conf(self.session_key, conf_file_name, search='disabled=0', count=-1, app=app) response = rval.get('response') if response.status != 200: self.logger.error( 'Failed to fetch configuration file=`%s`, rval=`%s`', self.conf_file_name, rval ) raise Exception('Failed to fetch data for config="%s".', self.conf_file_name) content = rval.get('content') content = json.loads(content) configuration = {} for entry in content.get('entry', []): configuration[entry.get('name')] = entry.get('content', {}) return configuration def execute_action(self, payload): """ Performs the POST request for the provided webhook @type: dict @param payload: payload of event """ # Reading configuration file configuration = self.get_configuration(self.conf_file_name) # Getting required fields from conf file webhook_obj = configuration.get(self.webhook_name, None) webhook_auth_type = webhook_obj.get('auth_type', None) self.webhook_header = webhook_obj.get('header', None) self.is_ssl_certificate_validation_disabled = webhook_obj.get('should_ssl_verified', False) if self.is_ssl_certificate_validation_disabled: self.is_ssl_certificate_validation_disabled = bool(int(self.is_ssl_certificate_validation_disabled)) # Execute POST call based on the Auth Type if webhook_auth_type == WEBHOOK_AUTH_TYPES['BASIC_AUTH']: # using username password webhook_username = webhook_obj.get('username', None) if webhook_username is None: self.logger.error('Username must be defined for Webhook with Auth Type Basic') sys.exit(1) self.send_post_request_with_auth(payload, webhook_username) elif webhook_auth_type == WEBHOOK_AUTH_TYPES['BEARER_TOKEN']: # using token self.send_post_request_with_token(payload) elif webhook_auth_type == WEBHOOK_AUTH_TYPES['NO_AUTH']: # direct POST call self.send_post_request(payload) else: raise Exception('Correct Auth Type is not provided for webhook %s', self.webhook_name) def execute(self): """ Performs webhook action. executes the execute_action for the webhook with payload """ self.logger.debug('Received settings from splunkd=`%s`', json.dumps(self.settings)) payload = self.settings.get('result', None) config = self.settings.get('configuration', None) self.webhook_name = config.get('webhook_name', None) self.webhook_url = config.get('webhook_uri', None) try: if self.webhook_name is None: self.logger.error('Webhook Name must be defined for webhook action') sys.exit(1) if self.webhook_url is None: self.logger.error('Webhook URL must be defined for webhook action') sys.exit(1) self.parsed_url_hostname = urlparse(self.webhook_url).hostname addr_info = socket.getaddrinfo(self.parsed_url_hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) self.initial_ip = [info[4][0] for info in addr_info] if not ipaddress.ip_address(self.initial_ip[0]).is_global: self.logger.error('Webhook URL must be public for webhook action') sys.exit(1) self.execute_action(json.dumps(payload)) except Exception as e: self.logger.error('Failed to execute webhook action.') self.logger.exception(e) sys.exit(1) if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == '--execute': input_params = sys.stdin.read() webhook = Webhook(input_params) webhook.execute()