#!/usr/bin/env python
# coding=utf-8

__author__ = "TrackMe Limited"
__copyright__ = "Copyright 2022-2026, TrackMe Limited, U.K."
__credits__ = "TrackMe Limited, U.K."
__license__ = "TrackMe Limited, all rights reserved"
__version__ = "0.1.0"
__maintainer__ = "TrackMe Limited, U.K."
__email__ = "support@trackme-solutions.com"
__status__ = "PRODUCTION"

# Standard library imports
import os
import sys
import re
import json
import random
import time
import hashlib
import logging
import configparser
from logging.handlers import RotatingFileHandler

# Networking and URL handling imports
import requests
from requests.structures import CaseInsensitiveDict
from urllib.parse import urlencode
import urllib.parse
import urllib3

# Disable insecure request warnings for urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# splunk home
splunkhome = os.environ["SPLUNK_HOME"]

# append lib
sys.path.append(os.path.join(splunkhome, "etc", "apps", "trackme", "lib"))

# import Splunk libs
import splunklib.client as client
import splunklib.results as results

# import trackme libs
from trackme_libs_licensing import trackme_check_license

# import trackme libs utils
from trackme_libs_utils import remove_leading_spaces

# import trackme libs croniter
from trackme_libs_croniter import validate_cron_schedule

# logging:
# To avoid overriding logging destination of callers, the libs will not set on purpose any logging definition
# and rely on callers themselves


class cd:
    """Context manager that temporarily changes the current working directory.

    Usage: ``with cd("/some/path"): ...`` — the previous working directory is
    restored on exit, even when the body raises.
    """

    def __init__(self, newPath):
        # Expand "~" so callers may pass user-relative paths
        self.newPath = os.path.expanduser(newPath)

    def __enter__(self):
        # Remember where we were so __exit__ can restore it
        self.savedPath = os.getcwd()
        os.chdir(self.newPath)

    def __exit__(self, etype, value, traceback):
        # Always restore the directory saved on entry
        os.chdir(self.savedPath)
class JSONFormatter(logging.Formatter):
    """Logging formatter rendering each record as a single JSON object.

    The output always carries a "time" field (a fixed timestamp when one was
    provided at construction time, the current epoch time otherwise), plus any
    custom attribute attached to the record (typically via ``extra``).
    Standard LogRecord bookkeeping attributes are filtered out.
    """

    # Standard LogRecord attributes that must not leak into the JSON payload
    _EXCLUDED_ATTRS = frozenset(
        (
            "name",
            "msg",
            "args",
            "levelname",
            "levelno",
            "pathname",
            "filename",
            "module",
            "exc_info",
            "exc_text",
            "stack_info",
            "lineno",
            "funcName",
            "created",
            "msecs",
            "relativeCreated",
            "thread",
            "threadName",
            "processName",
            "process",
        )
    )

    def __init__(self, *args, timestamp=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Optional fixed timestamp; when None, format() stamps each record
        self.custom_timestamp = timestamp

    def format(self, record):
        payload = {
            "time": self.custom_timestamp if self.custom_timestamp else time.time(),
        }
        payload.update(
            (attr, val)
            for attr, val in record.__dict__.items()
            if attr not in payload
            and not attr.startswith("_")
            and attr not in self._EXCLUDED_ATTRS
        )
        return json.dumps(payload)
def trackme_reqinfo(session_key, splunkd_uri):
    """
    Retrieve request info & settings with automatic retry logic.

    :param session_key: Splunk session key used for the Authorization header
    :param splunkd_uri: splunkd URI (https scheme enforced)
    :return: dict, JSON response of the TrackMe request_info endpoint
    :raises Exception: when the endpoint cannot be reached successfully after
        all retry attempts
    """

    # Ensure splunkd_uri starts with "https://"
    if not splunkd_uri.startswith("https://"):
        splunkd_uri = f"https://{splunkd_uri}"

    # Build header and target URL
    headers = CaseInsensitiveDict()
    headers["Authorization"] = f"Splunk {session_key}"
    headers["Content-Type"] = "application/json"

    target_url = f"{splunkd_uri}/services/trackme/v2/configuration/request_info"

    # Retry configuration: up to max_retries + 1 attempts with exponential
    # backoff (2, 4, 8, 16, 32 seconds)
    max_retries = 5
    base_delay = 2  # seconds

    # fix: the session is now closed deterministically (it was previously
    # created and never closed)
    with requests.Session() as session:
        session.headers.update(headers)

        for attempt in range(max_retries + 1):
            error_message = None
            try:
                with session.get(target_url, verify=False) as response:
                    if response.ok:
                        logging.debug(
                            f'Success retrieving conf on attempt {attempt + 1}, data="{response}"'
                        )
                        return response.json()
                    error_message = f'Failed to retrieve conf on attempt {attempt + 1}, status_code={response.status_code}, response_text="{response.text}"'
            except Exception as e:
                error_message = f'Failed to retrieve conf on attempt {attempt + 1}, exception="{str(e)}"'

            # fix: the final HTTP-status failure used to be raised inside the
            # try block and was then re-caught and re-wrapped by the generic
            # exception handler, which logged the error twice; both failure
            # modes are now handled in a single place
            if attempt < max_retries:
                delay = base_delay * (2**attempt)
                logging.warning(f'{error_message}. Retrying in {delay} seconds...')
                time.sleep(delay)
            else:
                logging.error(f'{error_message}. Max retries ({max_retries}) exceeded.')
                raise Exception(error_message)


def trackme_getloglevel(system_authtoken, splunkd_port):
    """
    Simply get and return the loglevel with elevated privileges to avoid code
    duplication.

    :param system_authtoken: system-level Splunk authentication token
    :param splunkd_port: splunkd management port
    :return: str, the configured loglevel (defaults to "INFO")
    """

    # Get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=splunkd_port,
        token=system_authtoken,
        timeout=600,
    )

    # default, unless overridden in trackme_settings [logging] loglevel
    loglevel = "INFO"

    confs = service.confs["trackme_settings"]
    for stanza in confs:
        if stanza.name == "logging":
            for stanzakey, stanzavalue in stanza.content.items():
                if stanzakey == "loglevel":
                    loglevel = stanzavalue

    return loglevel
def trackme_get_version(service, log_context=None):
    """
    Get TrackMe version from app configuration with fallback to file-based reading.

    This function handles permission issues that occur when DB Connect is
    installed and the user has limited privileges. When iterating over
    app_confs, it tries to access all apps including DB Connect which requires
    special capabilities ($db_connect_read_app_conf$). Falls back to reading
    app.conf file directly.

    Args:
        service: Splunk service object (from splunklib.client)
        log_context: Optional dictionary with context for logging
            (e.g., {'tenant_id': '...', 'instance_id': '...'})

    Returns:
        str: TrackMe version string, or None if not found
    """

    def _contextualize(message):
        # Prepend the optional context prefix, if any, to a log message
        if log_context:
            return f'{log_context.get("context_prefix", "")} {message}'.strip()
        return message

    trackme_version = None

    try:
        # Nominal path: read the [id] stanza of app.conf through the REST API
        for stanza in service.confs["app"]:
            if stanza.name == "id":
                for stanzakey, stanzavalue in stanza.content.items():
                    if stanzakey == "version":
                        trackme_version = stanzavalue
                        break
            if trackme_version:
                break

    except Exception as e:
        # Permission errors can occur when DB Connect is installed and the
        # user has limited privileges: iterating app_confs touches all apps
        # including DB Connect, which requires $db_connect_read_app_conf$.
        # Fall back to reading app.conf directly from disk.
        logging.debug(
            f'{_contextualize("failed to retrieve trackme version via service API (likely due to DB Connect permission requirements)")}, exception="{str(e)}", trying file-based fallback'
        )

        try:
            # configparser is more robust than manual parsing and handles the
            # various INI file formats
            app_conf_path = os.path.join(
                splunkhome, "etc", "apps", "trackme", "default", "app.conf"
            )
            if os.path.exists(app_conf_path):
                parser = configparser.ConfigParser()
                # Preserve case sensitivity for section and option names
                parser.optionxform = str
                try:
                    parser.read(app_conf_path)
                    # Get version from the [id] section
                    if parser.has_section('id') and parser.has_option('id', 'version'):
                        trackme_version = parser.get('id', 'version').strip()
                except (configparser.Error, ValueError) as config_error:
                    logging.debug(
                        _contextualize(
                            f"failed to parse app.conf using configparser: {str(config_error)}"
                        )
                    )

            if not trackme_version:
                logging.warning(
                    _contextualize(
                        "failed to retrieve trackme version from file, version will be None"
                    )
                )

        except Exception as e2:
            logging.warning(
                _contextualize(
                    f'failed to retrieve trackme version via file fallback, exception="{str(e2)}", version will be None'
                )
            )

    # A None version is acceptable: downstream schema checks handle it
    return trackme_version
def trackme_vtenant_account(session_key, splunkd_uri, tenant_id):
    """
    Retrieve vtenant specific settings.

    :param session_key: Splunk session key for the Authorization header
    :param splunkd_uri: splunkd URI, https scheme enforced
    :param tenant_id: tenant identifier posted to the endpoint
    :return: dict, JSON response of the vtenants_accounts endpoint
    :raises Exception: on any HTTP or network failure
    """

    # The endpoint is always reached over https
    if not splunkd_uri.startswith("https://"):
        splunkd_uri = f"https://{splunkd_uri}"

    # Build header and target URL
    request_headers = CaseInsensitiveDict()
    request_headers["Authorization"] = f"Splunk {session_key}"
    request_headers["Content-Type"] = "application/json"

    target_url = f"{splunkd_uri}/services/trackme/v2/vtenants/vtenants_accounts"

    # Re-use a session for better performance
    http_session = requests.Session()
    http_session.headers.update(request_headers)

    try:
        with http_session.post(
            target_url, data=json.dumps({"tenant_id": tenant_id}), verify=False
        ) as response:
            if not response.ok:
                error_message = f'Failed to retrieve conf, status_code={response.status_code}, response_text="{response.text}"'
                logging.error(error_message)
                raise Exception(error_message)
            logging.debug(f'Success retrieving conf, data="{response}"')
            return response.json()
    except Exception as e:
        # Any failure (including the HTTP status error above) surfaces here
        error_message = f'Failed to retrieve conf, exception="{str(e)}"'
        logging.error(error_message)
        raise Exception(error_message)
""" # Ensure splunkd_uri starts with "https://" if not splunkd_uri.startswith("https://"): splunkd_uri = f"https://{splunkd_uri}" # Build header and target URL headers = CaseInsensitiveDict() headers["Authorization"] = f"Splunk {session_key}" headers["Content-Type"] = "application/json" target_url = f"{splunkd_uri}/services/trackme/v2/configuration/components" # Create a requests session for better performance session = requests.Session() session.headers.update(headers) try: # Use a context manager to handle the request with session.post( target_url, data=json.dumps({"tenant_id": tenant_id}), verify=False ) as response: if response.ok: logging.debug(f'Success retrieving conf, data="{response}"') response_json = response.json() return response_json else: error_message = f'Failed to retrieve conf, status_code={response.status_code}, response_text="{response.text}"' logging.error(error_message) raise Exception(error_message) except Exception as e: error_message = f'Failed to retrieve conf, exception="{str(e)}"' logging.error(error_message) raise Exception(error_message) def trackme_idx_for_tenant(session_key, splunkd_uri, tenant_id): """ Retrieve request info & settings. 
""" # Ensure splunkd_uri starts with "https://" if not splunkd_uri.startswith("https://"): splunkd_uri = f"https://{splunkd_uri}" # Build header and target URL headers = CaseInsensitiveDict() headers["Authorization"] = f"Splunk {session_key}" headers["Content-Type"] = "application/json" target_url = f"{splunkd_uri}/services/trackme/v2/vtenants/tenant_idx_settings" # Create a requests session for better performance session = requests.Session() session.headers.update(headers) try: # Use a context manager to handle the request with session.post( target_url, data=json.dumps({"tenant_id": tenant_id}), verify=False ) as response: if response.ok: logging.debug(f'Success retrieving conf, data="{response}"') response_json = response.json() return response_json else: error_message = f'Failed to retrieve conf, status_code={response.status_code}, response_text="{response.text}"' logging.error(error_message) raise Exception(error_message) except Exception as e: error_message = f'Failed to retrieve conf, exception="{str(e)}"' logging.error(error_message) raise Exception(error_message) def trackme_gen_state(index, source, sourcetype, event): try: # Create a dedicated logger for state events state_logger = logging.getLogger("trackme.state.events") state_logger.setLevel(logging.INFO) # Only add the handler if it doesn't exist yet if not state_logger.handlers: # Set up the file handler filehandler = RotatingFileHandler( f"{splunkhome}/var/log/splunk/trackme_state_events.log", mode="a", maxBytes=100000000, backupCount=1, ) formatter = JSONFormatter() logging.Formatter.converter = time.gmtime filehandler.setFormatter(formatter) state_logger.addHandler(filehandler) # Prevent propagation to root logger state_logger.propagate = False else: # Find the RotatingFileHandler among existing handlers filehandler = None for handler in state_logger.handlers: if isinstance(handler, RotatingFileHandler): filehandler = handler break # If no RotatingFileHandler found, create one if filehandler is 
def trackme_gen_state(index, source, sourcetype, event):
    """
    Write a TrackMe state event to the dedicated state events log file.

    The event is emitted as JSON through a dedicated logger backed by a
    rotating file handler writing to
    ``$SPLUNK_HOME/var/log/splunk/trackme_state_events.log``.

    :param index: target index, added to the record metadata
    :param source: target source, added to the record metadata
    :param sourcetype: target sourcetype, added to the record metadata
    :param event: the event itself, either a dict or a JSON string
    :raises Exception: re-raised (as a plain Exception) on any failure
    """

    try:

        def _new_filehandler():
            # Rotating handler emitting JSON-formatted records, GMT timestamps
            handler = RotatingFileHandler(
                f"{splunkhome}/var/log/splunk/trackme_state_events.log",
                mode="a",
                maxBytes=100000000,
                backupCount=1,
            )
            logging.Formatter.converter = time.gmtime
            handler.setFormatter(JSONFormatter())
            return handler

        # Dedicated logger for state events
        state_logger = logging.getLogger("trackme.state.events")
        state_logger.setLevel(logging.INFO)

        if not state_logger.handlers:
            # First use: attach the file handler and detach from the root logger
            state_logger.addHandler(_new_filehandler())
            state_logger.propagate = False
        else:
            # Re-use an existing RotatingFileHandler when one is already attached
            existing = next(
                (
                    handler
                    for handler in state_logger.handlers
                    if isinstance(handler, RotatingFileHandler)
                ),
                None,
            )
            if existing is None:
                state_logger.addHandler(_new_filehandler())

        # Accept the event either as a JSON string or as a dict
        if isinstance(event, str):
            event = json.loads(event)

        # Derive a deterministic event_id when none was provided
        if "event_id" not in event:
            event["event_id"] = hashlib.sha256(json.dumps(event).encode()).hexdigest()

        # log the event
        state_logger.info(
            "TrackMe State Events",
            extra={
                "target_index": index,
                "target_sourcetype": sourcetype,
                "target_source": source,
                "event": json.dumps(event),
            },
        )

    except Exception as e:
        raise Exception(str(e))


#
# remote account connectivity
#


def is_reachable(session, url, timeout=15):
    """Probe a URL with a GET; return (True, None) on success, (False, error) otherwise."""
    try:
        session.get(url, timeout=timeout, verify=False)
    except Exception as e:
        return False, str(e)
    return True, None


def select_url(session, splunk_url, timeout=15):
    """
    Split a comma-separated list of URLs, probe each one, and randomly pick
    one of the reachable candidates.

    :return: tuple (selected_url or False when none reachable,
        list of (url, error) tuples for the unreachable candidates)
    """
    reachable_urls = []
    unreachable_errors = []

    for candidate in splunk_url.split(","):
        ok, error = is_reachable(session, candidate, timeout=timeout)
        if ok:
            reachable_urls.append(candidate)
        else:
            unreachable_errors.append((candidate, error))

    if not reachable_urls:
        return False, unreachable_errors
    return random.choice(reachable_urls), unreachable_errors


def get_bearer_token(storage_passwords, account):
    """
    Retrieve the clear bearer token of a remote account from the Splunk
    credential store.

    :return: the bearer token string, or None when it cannot be found
    """
    # realm
    credential_realm = "__REST_CREDENTIAL__#trackme#configs/conf-trackme_account"
    credential_name = f"{credential_realm}:{account}``"

    # Credentials may be chunked across multiple entries: concatenate the raw
    # payloads before extracting the JSON object
    raw_value = ""
    for credential in storage_passwords:
        if credential.content.get("realm") == str(
            credential_realm
        ) and credential.name.startswith(credential_name):
            raw_value += str(credential.content.clear_password)

    # extract a clean json object
    match = re.search(r'\{"bearer_token":\s*"(.*)"\}', raw_value)
    return match.group(1) if match else None
def establish_sdk_remote_service(
    parsed_url, bearer_token, app_namespace, account, timeout=600
):
    """
    Connect to a remote Splunk instance through the SDK and verify the
    connectivity by listing the remote applications.

    :param parsed_url: urllib.parse.ParseResult of the remote URL
    :param bearer_token: bearer token used for splunkToken authentication
    :param app_namespace: remote application namespace for searches
    :param account: account name, used for error reporting only
    :param timeout: connection timeout in seconds (default: 600)
    :return: the connected service, or None when no remote app could be listed
    :raises Exception: when the connection or the app listing fails
    """

    # Default to 443 when the URL carries no explicit port
    target_port = parsed_url.port or 443

    # Combine hostname and path to handle sub-root endpoints, if any,
    # ensuring no trailing slash
    endpoint = f'{parsed_url.hostname}{parsed_url.path.rstrip("/")}'

    try:
        service = client.connect(
            host=endpoint,
            splunkToken=str(bearer_token),
            owner="nobody",
            app=app_namespace,
            port=target_port,
            autologin=True,
            timeout=timeout,
        )

        # Listing the remote apps proves the token actually grants access
        if [app.label for app in service.apps]:
            logging.info(
                f'Remote search connectivity check to host="{parsed_url.hostname}" on port="{parsed_url.port}" was successful'
            )
            return service

    except Exception as e:
        error_msg = f'Remote search for account="{account}" has failed at connectivity check, host="{parsed_url.hostname}" on port="{parsed_url.port}", url={endpoint}, timeout={timeout}, exception="{str(e)}"'
        raise Exception(error_msg)

    # No remote app visible: connectivity is considered not established
    return None
# Test remote account connectivity, for a least privileges approach, this function uses a system_authtoken
def trackme_test_remote_account(reqinfo, account):
    """
    Test the connectivity of a configured remote account.

    :param reqinfo: request info object, provides server_rest_port,
        server_rest_uri, session_key and system_authtoken
    :param account: name of the remote account to be tested
    :return: dict describing the successful connectivity check
    :raises Exception / TrackMeRemoteConnectionError: on any failure
        (unknown account, unreachable endpoints, license restriction,
        missing bearer token or failed connection)
    """

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=reqinfo.server_rest_port,
        token=reqinfo.system_authtoken,
        timeout=600,
    )

    # Splunk credentials store
    storage_passwords = service.storage_passwords

    # if there are no account, raise an exception, otherwise what we would do here?
    try:
        confs = service.confs["trackme_account"]
    except Exception:
        raise Exception(
            "splunkremotesearch was called but we have no remote account configured yet"
        )

    # get all accounts
    # fix: the previous implementation iterated over the characters of
    # stanza.name and relied on a break to append each name only once
    accounts = [stanza.name for stanza in confs]

    # account configuration
    isfound = False
    splunk_url = None
    app_namespace = None
    rbac_roles = None
    timeout_connect_check = None
    timeout_search_check = None
    token_rotation_enablement = None
    token_rotation_frequency = None

    # get account
    for stanza in confs:
        if stanza.name == str(account):
            isfound = True
            for key, value in stanza.content.items():
                if key == "splunk_url":
                    splunk_url = value
                elif key == "app_namespace":
                    app_namespace = value
                elif key == "rbac_roles":
                    rbac_roles = value
                elif key == "timeout_connect_check":
                    timeout_connect_check = int(value)
                elif key == "timeout_search_check":
                    timeout_search_check = int(value)
                elif key == "token_rotation_enablement":
                    token_rotation_enablement = int(value)
                elif key == "token_rotation_frequency":
                    token_rotation_frequency = int(value)

    # checks timeout
    if not timeout_connect_check:
        timeout_connect_check = 15
    if not timeout_search_check:
        timeout_search_check = 300

    # Stop here if we cannot find the submitted account
    if not isfound:
        error_msg = f'The account="{account}" has not been configured on this instance, cannot proceed!'
        logging.error(error_msg)
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "targets": account,
            }
        )

    # Create a session within the generate function
    session = requests.Session()

    # Call target selector and pass the session as an argument
    selected_url, errors = select_url(
        session, splunk_url, timeout=timeout_connect_check
    )
    # end of get configuration

    # If none of the endpoints could be reached
    if not selected_url:
        error_msg = f"None of the endpoints provided in the account URLs could be reached successfully, verify your network connectivity! (timeout: {timeout_connect_check}) "
        error_msg += "Errors: " + ", ".join(
            [f"{url}: {error}" for url, error in errors]
        )
        logging.error(error_msg)
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
                "targets": splunk_url,
            }
        )

    # check the license
    try:
        check_license = trackme_check_license(
            reqinfo.server_rest_uri, reqinfo.session_key
        )
        license_is_valid = check_license.get("license_is_valid")
        logging.debug(
            f'function check_license called, response="{json.dumps(check_license, indent=2)}"'
        )
    except Exception as e:
        license_is_valid = 0
        logging.error(f'function check_license exception="{str(e)}"')

    # Free limited edition: only the first configured remote account is allowed
    if license_is_valid != 1 and len(accounts) >= 2 and accounts[0] != account:
        error_msg = f"This TrackMe deployment is running in Free limited edition and you have reached the maximum number of 1 remote deployment, only the first remote account ({accounts[0]}) can be used"
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
            }
        )

    # Enforce https and remove trailing slash in the URL, if any
    selected_url = f"https://{selected_url.replace('https://', '').rstrip('/')}"

    # Splunk remote application namespace where searches are going to be executed, default to search if not defined
    if not app_namespace:
        app_namespace = "search"

    # Use urlparse to extract relevant info from target
    # fix: parsed_url is now computed before the bearer token check, the
    # previous implementation referenced it in the missing-token error
    # payload before it was ever assigned (NameError)
    parsed_url = urllib.parse.urlparse(selected_url)

    # get the bearer token stored encrypted
    bearer_token = get_bearer_token(storage_passwords, account)
    if not bearer_token:
        error_msg = f'The bearer token for the account="{account}" could not be retrieved, cannot proceed!'
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
                "host": parsed_url.hostname,
                "port": parsed_url.port,
            }
        )

    # Establish the remote service
    try:
        remoteservice = establish_sdk_remote_service(
            parsed_url,
            bearer_token,
            app_namespace,
            account,
            timeout=timeout_search_check,
        )
    except Exception as e:
        error_msg = f'remote search for account="{account}" has failed at connectivity check, host="{parsed_url.hostname}" on port="{parsed_url.port}" for Splunk remote account="{account}", timeout={timeout_search_check}, exception="{str(e)}"'
        logging.error(error_msg)
        error_info = {
            "status": "failure",
            "message": f"remote search check failed at connectivity verification, response: {str(e)}",
            "account": account,
            "host": parsed_url.hostname,
            "port": parsed_url.port,
            "timeout_connect_check": timeout_connect_check,
            "timeout_search_check": timeout_search_check,
            "rbac_roles": rbac_roles,
            "app_namespace": app_namespace,
            "token_rotation_enablement": token_rotation_enablement,
            "token_rotation_frequency": token_rotation_frequency,
        }
        # NOTE(review): TrackMeRemoteConnectionError is neither defined nor
        # imported in this module view - presumably provided elsewhere in the
        # trackme libs; confirm, otherwise this raise becomes a NameError
        raise TrackMeRemoteConnectionError(error_info)

    if remoteservice:
        logging.info(
            f'remote search connectivity check to host="{parsed_url.hostname}" on port="{parsed_url.port}" for Splunk remote account="{account}" was successful'
        )
        return {
            "status": "success",
            "message": "remote search connectivity check was successful, service was established",
            "account": account,
            "host": parsed_url.hostname,
            "port": parsed_url.port,
            "timeout_connect_check": timeout_connect_check,
            "timeout_search_check": timeout_search_check,
            "rbac_roles": rbac_roles,
            "app_namespace": app_namespace,
            "token_rotation_enablement": token_rotation_enablement,
            "token_rotation_frequency": token_rotation_frequency,
        }

    error_msg = "remote search connectivity check has failed to retrieve the list of applications on the remote system"
    logging.error(error_msg)
    raise Exception(
        {
            "status": "failure",
            "message": "remote search check failed at connectivity verification",
            "account": account,
            "host": parsed_url.hostname,
            "port": parsed_url.port,
            "timeout_connect_check": timeout_connect_check,
            "timeout_search_check": timeout_search_check,
            "rbac_roles": rbac_roles,
            "app_namespace": app_namespace,
            "token_rotation_enablement": token_rotation_enablement,
            "token_rotation_frequency": token_rotation_frequency,
        }
    )
# Test remote connectivity before the account is created, expects a dict account object containing required information to test the connectivity
def trackme_test_remote_connectivity(connection_info):
    """
    Test remote Splunk connectivity prior to the account creation.

    :param connection_info: dict providing target_endpoints (comma-separated
        URLs), bearer_token, and optionally app_namespace (default "search"),
        timeout_connect_check (default 15) and timeout_search_check
        (default 300)
    :return: dict describing the successful connectivity check
    :raises Exception: when no endpoint is reachable, the bearer token is
        missing, or the connection fails
    """

    splunk_url = connection_info.get("target_endpoints")
    app_namespace = connection_info.get("app_namespace", "search")
    bearer_token = connection_info.get("bearer_token")
    timeout_connect_check = connection_info.get("timeout_connect_check", 15)
    timeout_search_check = connection_info.get("timeout_search_check", 300)

    # Create a session within the generate function
    session = requests.Session()

    # Call target selector and pass the session as an argument
    selected_url, errors = select_url(
        session, splunk_url, timeout=timeout_connect_check
    )
    # end of get configuration

    # Stop here if none of the submitted endpoints can be reached
    if not selected_url:
        error_msg = f"None of the endpoints provided in the account URLs could be reached successfully. Verify your network connectivity! (timeout: {timeout_connect_check}) "
        error_msg += "Errors: " + ", ".join(
            [f"{url}: {error}" for url, error in errors]
        )
        logging.error(error_msg)
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "targets": splunk_url,
            }
        )

    # Enforce https and remove trailing slash in the URL, if any
    selected_url = f"https://{selected_url.replace('https://', '').rstrip('/')}"

    # Use urlparse to extract relevant info from target
    # fix: parsed_url is now computed before the bearer token check, the
    # previous implementation referenced it in the missing-token error
    # payload before it was ever assigned (NameError)
    parsed_url = urllib.parse.urlparse(selected_url)

    if not bearer_token:
        # fix: plain string, the message carries no placeholder (was an f-string)
        error_msg = "The bearer token was not provided, cannot proceed!"
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "host": parsed_url.hostname,
                "port": parsed_url.port,
            }
        )

    # Establish the remote service
    try:
        remoteservice = establish_sdk_remote_service(
            parsed_url,
            bearer_token,
            app_namespace,
            "connection_test",
            timeout=timeout_search_check,
        )
    except Exception as e:
        error_msg = f'remote search has failed at connectivitity check, host="{parsed_url.hostname}" on port="{parsed_url.port}", timeout={timeout_search_check}, exception="{str(e)}"'
        logging.error(error_msg)
        raise Exception(
            {
                "message": "remote search check failed at connectivity verification",
                "host": parsed_url.hostname,
                "port": parsed_url.port,
                "exception": str(e),
            }
        )

    if remoteservice:
        logging.info(
            f'remote search connectivity check to host="{parsed_url.hostname}" on port="{parsed_url.port}" was successful'
        )
        return {
            "status": "success",
            "message": "remote search connectivity check was successful, service was established",
            "host": parsed_url.hostname,
            "port": parsed_url.port,
        }

    error_msg = "remote search connectivity check has failed to retrieve the list of applications on the remote system"
    logging.error(error_msg)
    raise Exception(
        {
            "message": error_msg,
            "host": parsed_url.hostname,
            "port": parsed_url.port,
        }
    )
# Get remote account credentials, designed to be used for a least privileges approach in a programmatic approach
def trackme_get_remote_account(reqinfo, account):
    """
    Return the connection settings and clear bearer token of a remote account.

    :param reqinfo: request info object, provides server_rest_port,
        server_rest_uri, session_key and system_authtoken
    :param account: name of the remote account
    :return: dict with splunk_url, app_namespace, token, rbac_roles, timeouts
        and token rotation settings
    :raises Exception: unknown account, license restriction, or missing token
    """

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=reqinfo.server_rest_port,
        token=reqinfo.system_authtoken,
        timeout=600,
    )

    # Splunk credentials store
    storage_passwords = service.storage_passwords

    # if there are no account, raise an exception, otherwise what we would do here?
    try:
        confs = service.confs["trackme_account"]
    except Exception:
        raise Exception(
            "splunkremotesearch was called but we have no remote account configured yet"
        )

    # get all accounts
    # fix: the previous implementation iterated over the characters of
    # stanza.name and relied on a break to append each name only once
    accounts = [stanza.name for stanza in confs]

    # account configuration
    isfound = False
    splunk_url = None
    app_namespace = None
    rbac_roles = None
    timeout_connect_check = None
    timeout_search_check = None
    token_rotation_enablement = None
    token_rotation_frequency = None

    # get account
    # NOTE(review): unlike trackme_test_remote_account, numeric settings are
    # kept as strings here - preserved as-is for backward compatibility with
    # existing callers
    for stanza in confs:
        if stanza.name == str(account):
            isfound = True
            for key, value in stanza.content.items():
                if key == "splunk_url":
                    splunk_url = value
                elif key == "app_namespace":
                    app_namespace = value
                elif key == "rbac_roles":
                    rbac_roles = value
                elif key == "timeout_connect_check":
                    timeout_connect_check = value
                elif key == "timeout_search_check":
                    timeout_search_check = value
                elif key == "token_rotation_enablement":
                    token_rotation_enablement = value
                elif key == "token_rotation_frequency":
                    token_rotation_frequency = value
    # end of get configuration

    # Stop here if we cannot find the submitted account
    if not isfound:
        error_msg = f'The account="{account}" has not been configured on this instance, cannot proceed!'
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
            }
        )

    # check the license
    try:
        check_license = trackme_check_license(
            reqinfo.server_rest_uri, reqinfo.session_key
        )
        license_is_valid = check_license.get("license_is_valid")
        logging.debug(
            f'function check_license called, response="{json.dumps(check_license, indent=2)}"'
        )
    except Exception as e:
        license_is_valid = 0
        logging.error(f'function check_license exception="{str(e)}"')

    # Free limited edition: only the first configured remote account is allowed
    if license_is_valid != 1 and len(accounts) >= 2 and accounts[0] != account:
        error_msg = f"This TrackMe deployment is running in Free limited edition and you have reached the maximum number of 1 remote deployment, only the first remote account ({accounts[0]}) can be used"
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
            }
        )

    # Splunk remote application namespace where searches are going to be executed, default to search if not defined
    if not app_namespace:
        app_namespace = "search"

    # RBAC: the user must be a member of the granted roles for this account, for retro-compatibility purposes,
    # if this was not defined yet, use builtin TrackMe roles
    if not rbac_roles:
        rbac_roles = [
            "admin",
            "sc_admin",
            "trackme_user",
            "trackme_power",
            "trackme_admin",
        ]
    else:
        rbac_roles = rbac_roles.split(",")

    # timeouts
    # NOTE(review): the connect default is 10 here but 15 in
    # trackme_test_remote_account - preserved as-is
    if not timeout_connect_check:
        timeout_connect_check = 10
    if not timeout_search_check:
        timeout_search_check = 300

    # get the bearer token stored encrypted
    bearer_token = get_bearer_token(storage_passwords, account)
    if not bearer_token:
        error_msg = f'The bearer token for the account="{account}" could not be retrieved, cannot proceed!'
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
                "splunk_url": splunk_url,
            }
        )

    # render
    return {
        "status": "success",
        "message": "remote search connectivity check was successful, service was established",
        "account": account,
        "splunk_url": splunk_url,
        "app_namespace": app_namespace,
        "token": bearer_token,
        "rbac_roles": rbac_roles,
        "timeout_connect_check": timeout_connect_check,
        "timeout_search_check": timeout_search_check,
        "token_rotation_enablement": token_rotation_enablement,
        "token_rotation_frequency": token_rotation_frequency,
    }
#
#
def trackme_get_report(
    session_key,
    splunkd_uri,
    tenant_id,
    report_name,
):
    """
    Retrieve the definition of a report (saved search).

    :param session_key: Splunk session key
    :param splunkd_uri: splunkd URI, the management port is extracted from it
    :param tenant_id: tenant identifier, used for logging purposes
    :param report_name: name of the report to be retrieved
    :return: dict with the report search, schedule and dispatch settings
    :raises Exception: when the report cannot be retrieved
    """

    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    logging.info(
        f'tenant_id="{tenant_id}", attempting to get report report_name="{report_name}"'
    )

    # get the report
    try:
        savedsearch = service.saved_searches[report_name]
        content = savedsearch.content

        report_definition = {
            "savedsearch_search": content["search"],
            "savedsearch_cron_schedule": content["cron_schedule"],
            "savedsearch_description": content["description"],
            "savedsearch_disabled": content["disabled"],
            "savedsearch_is_scheduled": content["is_scheduled"],
            "savedsearch_schedule_window": content["schedule_window"],
            "savedsearch_workload_pool": content["workload_pool"],
            "savedsearch_earliest_time": content["dispatch.earliest_time"],
            "savedsearch_latest_time": content["dispatch.latest_time"],
        }

        logging.info(
            f'tenant_id="{tenant_id}", action="success", report_name="{report_name}"'
        )
        return report_definition

    except Exception as e:
        error_msg = f'tenant_id="{tenant_id}", failed to get report report_name="{report_name}" with exception:"{str(e)}"'
        logging.error(error_msg)
        raise Exception(error_msg)
savedsearch_schedule_window, "savedsearch_workload_pool": savedsearch_workload_pool, "savedsearch_earliest_time": savedsearch_earliest_time, "savedsearch_latest_time": savedsearch_latest_time, } except Exception as e: error_msg = f'tenant_id="{tenant_id}", failed to get report report_name="{report_name}" with exception:"{str(e)}"' logging.error(error_msg) raise Exception(error_msg) # # # def trackme_create_report( session_key, splunkd_uri, tenant_id, report_name, report_search, report_properties, report_acl, ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # create a new report logging.info( f'tenant_id="{tenant_id}", attempting to create report report_name="{report_name}"' ) # # Splunkd API needs a couple of seconds to refresh while macros were created # In a programmatic context, this may lead the report creation to be failing # the function will check the KO status, and wait if needed for a certain amount of time # set max failed re-attempt max_failures_count = 24 sleep_time = 5 creation_success = False current_failures_count = 0 while current_failures_count < max_failures_count and not creation_success: try: newtracker = service.saved_searches.create( str(report_name), str(report_search) ) logging.info( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}"' ) creation_success = True break except Exception as e: # We except this sentence in the exception if the API is not ready yet logging.warning( f'tenant_id="{tenant_id}", temporary failure, the report is not yet available, will sleep and re-attempt, report report_name="{report_name}"' ) time.sleep(sleep_time) current_failures_count += 1 if current_failures_count >= max_failures_count: logging.error( f'tenant_id="{tenant_id}", max attempt reached, failure to create report report_name="{report_name}", report_search="{report_search}" with exception:"{str(e)}"' ) 
raise Exception( f'tenant_id="{tenant_id}", max attempt reached, failure to create report report_name="{report_name}", report_search="{report_search}" with exception:"{str(e)}"' ) # update the properties newtracker_update = service.saved_searches[str(report_name)] # Complete the report definition kwargs = report_properties # For optimization purposes, if the schedule is set to every 5 minutes, randomly choose an every 5 minutes schedule if kwargs.get("cron_schedule") == "*/5 * * * *": cron_random_list = [ "*/5 * * * *", "1-56/5 * * * *", "2-57/5 * * * *", "3-58/5 * * * *", "4-59/5 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/10 * * * *": cron_random_list = [ "*/10 * * * *", "1-59/10 * * * *", "2-59/10 * * * *", "3-59/10 * * * *", "4-59/10 * * * *", "5-59/10 * * * *", "6-59/10 * * * *", "7-59/10 * * * *", "8-59/10 * * * *", "9-59/10 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/15 * * * *": cron_random_list = [ "*/15 * * * *", "1-59/15 * * * *", "2-59/15 * * * *", "3-59/15 * * * *", "4-59/15 * * * *", "5-59/15 * * * *", "6-59/15 * * * *", "7-59/15 * * * *", "8-59/15 * * * *", "9-59/15 * * * *", "10-59/15 * * * *", "11-59/15 * * * *", "12-59/15 * * * *", "13-59/15 * * * *", "14-59/15 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/20 * * * *": cron_random_list = [ "*/20 * * * *", "1-59/20 * * * *", "2-59/20 * * * *", "3-59/20 * * * *", "4-59/20 * * * *", "5-59/20 * * * *", "6-59/20 * * * *", "7-59/20 * * * *", "8-59/20 * * * *", "9-59/20 * * * *", "10-59/20 * * * *", "11-59/20 * * * *", "12-59/20 * * * *", "13-59/20 * * * *", "14-59/20 * * * *", "15-59/20 * * * *", "16-59/20 * * * *", "17-59/20 * * * *", "18-59/20 * * * *", "19-59/20 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/20 22-23,0-6 * * *": cron_random_list = [ 
"*/20 22-23,0-6 * * *", "1-59/20 22-23,0-6 * * *", "2-59/20 22-23,0-6 * * *", "3-59/20 22-23,0-6 * * *", "4-59/20 22-23,0-6 * * *", "5-59/20 22-23,0-6 * * *", "6-59/20 22-23,0-6 * * *", "7-59/20 22-23,0-6 * * *", "8-59/20 22-23,0-6 * * *", "9-59/20 22-23,0-6 * * *", "10-59/20 22-23,0-6 * * *", "11-59/20 22-23,0-6 * * *", "12-59/20 22-23,0-6 * * *", "13-59/20 22-23,0-6 * * *", "14-59/20 22-23,0-6 * * *", "15-59/20 22-23,0-6 * * *", "16-59/20 22-23,0-6 * * *", "17-59/20 22-23,0-6 * * *", "18-59/20 22-23,0-6 * * *", "19-59/20 22-23,0-6 * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif ( kwargs.get("cron_schedule") == "*/30 * * * *" or kwargs.get("cron_schedule") == "30 * * * *" ): cron_random_list = [ "*/30 * * * *", "1,31 * * * *", "2,32 * * * *", "3,33 * * * *", "4,34 * * * *", "5,35 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/60 * * * *": cron_random_list = [ "*/60 * * * *", "1 * * * *", "2 * * * *", "3 * * * *", "4 * * * *", "5 * * * *", "6 * * * *", "7 * * * *", "8 * * * *", "9 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "0 22-23,0-6 * * *": cron_random_list = [ "0 22-23,0-6 * * *", "1 22-23,0-6 * * *", "2 22-23,0-6 * * *", "3 22-23,0-6 * * *", "4 22-23,0-6 * * *", "5 22-23,0-6 * * *", "6 22-23,0-6 * * *", "7 22-23,0-6 * * *", "8 22-23,0-6 * * *", "9 22-23,0-6 * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) # Filter out invalid cron_schedule values cron_schedule = kwargs.get("cron_schedule") if cron_schedule in (None, "None", "null", ""): kwargs.pop("cron_schedule", None) cron_schedule = None # verify the cron schedule validity, if submitted if cron_schedule: try: validate_cron_schedule(cron_schedule) except Exception as e: logging.error(str(e)) return { "payload": { "action": "failure", "response": str(e), }, "status": 500, } # Update the server and refresh the local copy of the object 
logging.info( f'tenant_id="{tenant_id}", attempting to update report_name="{report_name}" with kwargs="{json.dumps(kwargs, indent=1)}"' ) try: # # Splunkd API needs a couple of seconds to refresh while macros were created # In a programmatic context, this may lead the report creation to be failing # the function will check the KO status, and wait if needed for a certain amount of time # set max failed re-attempt max_failures_count = 24 sleep_time = 5 creation_success = False current_failures_count = 0 while current_failures_count < max_failures_count and not creation_success: try: newtracker_update.update(**kwargs).refresh() logging.info( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}" with kwargs="{json.dumps(kwargs, indent=1)}"' ) newtracker_info = service.saved_searches[str(report_name)].get logging.debug( f'tenant_id="{tenant_id}", report_name="{report_name}", response="{newtracker_info}"' ) creation_success = True break except Exception as e: logging.warning( f'tenant_id="{tenant_id}", temporary failure, the report is not yet available, will sleep and re-attempt, report report_name="{report_name}"' ) time.sleep(sleep_time) current_failures_count += 1 if current_failures_count >= max_failures_count: logging.error( f'tenant_id="{tenant_id}", max attempt reached, failure to create report report_name="{report_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update report report_name="{report_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) # Handler RBAC logging.info( f'tenant_id="{tenant_id}", attempting to update report_name="{report_name}" with kwargs="{json.dumps(report_acl, indent=1)}"' ) try: service.post( "%s/%s" % (newtracker_update.links["alternate"], "acl"), body=urlencode(report_acl), ) logging.info( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}" with kwargs="{json.dumps(report_acl, indent=1)}"' ) return { 
"action": "success", "tenant_id": tenant_id, "report_name": report_name, "report_search": report_search, "report_owner": report_acl.get("owner"), "report_perms_read": report_acl.get("perms.read"), "report_perms_write": report_acl.get("perms.write"), "description": kwargs.get("description"), "is_scheduled": kwargs.get("is_scheduled"), "schedule_window": kwargs.get("schedule_window"), "dispatch.earliest_time": kwargs.get("dispatch.earliest_time"), "dispatch.latest_time": kwargs.get("dispatch.latest_time"), "cron_schedule": kwargs.get("cron_schedule"), } except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update report report_name="{report_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_manage_report_schedule( logger, session_key, splunkd_uri, tenant_id, report_name, input_report_properties=None, action=None, ): parsed_url = urllib.parse.urlparse(splunkd_uri) """ This function is used to enable, disable the report schedule, or show the current schedule enablement status """ # check action, allowed values are: enable, disable if action not in ["enable", "disable", "status"]: raise Exception( f'tenant_id="{tenant_id}", invalid action="{action}", allowed values are: enable, disable, status' ) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # log start logger.debug( f'tenant_id="{tenant_id}", attempting to run handle schedule management with action={action} for report report_name="{report_name}"' ) # get the report object try: savedsearch_object = service.saved_searches[str(report_name)] except Exception as e: error_msg = f'tenant_id="{tenant_id}", failure to get report report_name="{report_name}" with exception:"{str(e)}"' logging.error(error_msg) raise Exception(error_msg) # # check orphan & retrieve acl # # Build header and target URL headers = { "Authorization": f"Splunk {session_key}", "Content-Type": "application/json", } # url url = 
f'{splunkd_uri}/{savedsearch_object.links["alternate"]}/acl/list?output_mode=json' try: response = requests.get(url, headers=headers, verify=False, timeout=600) response.raise_for_status() response_json = response.json() savedsearch_content = savedsearch_object.content savedsearch_acl = response_json.get("entry")[0]["acl"] # log logger.debug( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}", savedsearch_content="{json.dumps(savedsearch_content, indent=2)}", savedsearch_acl="{json.dumps(savedsearch_acl, indent=2)}"' ) # get the report properties if input_report_properties is None: report_properties = { "description": savedsearch_content.get("description"), "disabled": savedsearch_content.get("disabled"), "is_scheduled": savedsearch_content.get("is_scheduled"), "schedule_window": savedsearch_content.get("schedule_window"), "cron_schedule": savedsearch_content.get("cron_schedule"), "dispatch.earliest_time": savedsearch_content.get("dispatch.earliest_time"), "dispatch.latest_time": savedsearch_content.get("dispatch.latest_time"), } else: report_properties = { "description": savedsearch_content.get("description"), "disabled": savedsearch_content.get("disabled"), "is_scheduled": input_report_properties.get("is_scheduled"), "schedule_window": input_report_properties.get("schedule_window"), "cron_schedule": input_report_properties.get("cron_schedule"), "dispatch.earliest_time": input_report_properties.get("dispatch.earliest_time"), "dispatch.latest_time": input_report_properties.get("dispatch.latest_time"), } # get the report acl report_acl = { "owner": savedsearch_acl.get("owner"), "app": savedsearch_acl.get("app"), "sharing": savedsearch_acl.get("sharing"), "perms_read": ",".join(savedsearch_acl.get("perms").get("read", [])), "perms_write": ",".join(savedsearch_acl.get("perms").get("write", [])), } except Exception as e: error_msg = f'tenant_id="{tenant_id}", failure to get report report_name="{report_name}" with exception:"{str(e)}"' raise 
Exception(error_msg) # for now, return if action == "status": return report_properties, report_acl elif action in ["enable", "disable"]: if action == "enable": report_properties["is_scheduled"] = 1 elif action == "disable": report_properties["is_scheduled"] = 0 try: savedsearch_object.update(**report_properties).refresh() except Exception as e: error_msg = f'tenant_id="{tenant_id}", failure to update report report_name="{report_name}" with exception:"{str(e)}"' raise Exception(error_msg) return report_properties, report_acl def trackme_delete_report(session_key, splunkd_uri, tenant_id, report_name): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) try: # first retrieve the report definition for logging purposes savedsearch = service.saved_searches[report_name] savedsearch_search = savedsearch.content["search"] savedsearch_cron_schedule = savedsearch.content["cron_schedule"] savedsearch_description = savedsearch.content["description"] savedsearch_is_scheduled = savedsearch.content["is_scheduled"] savedsearch_earliest_time = savedsearch.content["dispatch.earliest_time"] savedsearch_latest_time = savedsearch.content["dispatch.latest_time"] # delete report logging.info( f'tenant_id="{tenant_id}", attempting to delete report report_name="{report_name}", report_search="{savedsearch_search}", report_cron_schedule="{savedsearch_cron_schedule}", report_description="{savedsearch_description}", report_is_scheduled="{savedsearch_is_scheduled}", report_earliest_time="{savedsearch_earliest_time}", report_latest_time="{savedsearch_latest_time}"' ) service.saved_searches.delete(str(report_name)) logging.info( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to delete report report_name="{report_name}" with exception:"{str(e)}"' ) raise 
Exception(str(e)) def trackme_create_alert( session_key, splunkd_uri, tenant_id, alert_name, alert_search, properties, alert_properties, alert_acl, ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # create a new alert logging.info( f'tenant_id="{tenant_id}", attempting to create alert alert_name="{alert_name}"' ) # # Splunkd API needs a couple of seconds to refresh while macros were created # In a programmatic context, this may lead the report creation to be failing # the function will check the KO status, and wait if needed for a certain amount of time # set max failed re-attempt max_failures_count = 24 sleep_time = 5 creation_success = False current_failures_count = 0 while current_failures_count < max_failures_count and not creation_success: try: newalert = service.saved_searches.create(str(alert_name), str(alert_search)) logging.info( f'tenant_id="{tenant_id}", action="success", alert_name="{alert_name}"' ) creation_success = True break except Exception as e: # We except this sentence in the exception if the API is not ready yet logging.warning( f'tenant_id="{tenant_id}", temporary failure, the alert is not yet available, will sleep and re-attempt, alert alert_name="{alert_name}"' ) time.sleep(sleep_time) current_failures_count += 1 if current_failures_count >= max_failures_count: error_msg = f'tenant_id="{tenant_id}", max attempt reached, failure to create alert alert_name="{alert_name}" with exception:"{str(e)}"' logging.error(error_msg) raise Exception(error_msg) # update the properties newalert_update = service.saved_searches[str(alert_name)] # Complete the report definition logging.debug( f'tenant_id="{tenant_id}", properties="{properties}", 
alert_properties="{alert_properties}", alert_acl="{alert_acl}"' ) kwargs = {} kwargs.update(properties) kwargs.update(alert_properties) # For optimization purposes, if the schedule is set to every 5 minutes, randomly choose an every 5 minutes schedule if kwargs.get("cron_schedule") == "*/5 * * * *": cron_random_list = [ "*/5 * * * *", "1-56/5 * * * *", "2-57/5 * * * *", "3-58/5 * * * *", "4-59/5 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/10 * * * *": cron_random_list = [ "*/10 * * * *", "1-59/10 * * * *", "2-59/10 * * * *", "3-59/10 * * * *", "4-59/10 * * * *", "5-59/10 * * * *", "6-59/10 * * * *", "7-59/10 * * * *", "8-59/10 * * * *", "9-59/10 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/15 * * * *": cron_random_list = [ "*/10 * * * *", "1-59/10 * * * *", "2-59/10 * * * *", "3-59/10 * * * *", "4-59/10 * * * *", "5-59/10 * * * *", "6-59/10 * * * *", "7-59/10 * * * *", "8-59/10 * * * *", "9-59/10 * * * *", "10-59/10 * * * *", "11-59/10 * * * *", "12-59/10 * * * *", "13-59/10 * * * *", "14-59/10 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif ( kwargs.get("cron_schedule") == "*/30 * * * *" or kwargs.get("cron_schedule") == "30 * * * *" ): cron_random_list = [ "*/30 * * * *", "1,31 * * * *", "2,32 * * * *", "3,33 * * * *", "4,34 * * * *", "5,35 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "*/60 * * * *": cron_random_list = [ "*/60 * * * *", "2,32 * * * *", "3,33 * * * *", "4,34 * * * *", "5,35 * * * *", "6,36 * * * *", "7,37 * * * *", "8,38 * * * *", "9,39 * * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) elif kwargs.get("cron_schedule") == "0 22-23,0-6 * * *": cron_random_list = [ "0 22-23,0-6 * * *", "1 22-23,0-6 * * *", "2 22-23,0-6 * * *", "3 22-23,0-6 * * *", "4 22-23,0-6 * * *", "5 22-23,0-6 * * *", "6 22-23,0-6 * * *", "7 
22-23,0-6 * * *", "8 22-23,0-6 * * *", "9 22-23,0-6 * * *", ] kwargs["cron_schedule"] = random.choice(cron_random_list) # verify the cron schedule validity, if submitted if kwargs.get("cron_schedule"): try: validate_cron_schedule(kwargs.get("cron_schedule")) except Exception as e: logging.error(str(e)) return { "payload": { "action": "failure", "response": str(e), }, "status": 500, } # Update the server and refresh the local copy of the object logging.info( f'tenant_id="{tenant_id}", attempting to update alert_name="{alert_name}" with kwargs="{json.dumps(kwargs, indent=1)}"' ) try: newalert_update.update(**kwargs).refresh() logging.info( f'tenant_id="{tenant_id}", action="success", alert_name="{alert_name}" with kwargs="{json.dumps(kwargs, indent=1)}"' ) except Exception as e: error_msg = f'tenant_id="{tenant_id}", failure to update alert alert_name="{alert_name}", kwargs="{json.dumps(kwargs, indent=1)}" with exception:"{str(e)}"' logging.error(error_msg) raise Exception(error_msg) record_url = f"{splunkd_uri}/servicesNS/nobody/trackme/saved/searches/{urllib.parse.quote(alert_name)}/acl" logging.info( f'tenant_id="{tenant_id}", attempting to update alert_name="{alert_name}"' ) try: response = requests.post( record_url, headers=header, data=alert_acl, verify=False, timeout=600 ) logging.info( f'tenant_id="{tenant_id}", action="success", alert_name="{alert_name}"' ) except Exception as e: error_msg = f'tenant_id="{tenant_id}", failure to update alert alert_name="{alert_name}", alert_acl="{json.dumps(alert_acl, indent=1)}" with exception:"{str(e)}"' logging.error(error_msg) raise Exception(error_msg) return { "action": "success", "tenant_id": tenant_id, "alert_name": alert_name, "alert_search": alert_search, "alert_owner": alert_acl.get("owner"), "report_perms_read": alert_acl.get("perms.read"), "report_perms_write": alert_acl.get("perms.write"), "description": kwargs.get("description"), "is_scheduled": kwargs.get("is_scheduled"), "schedule_window": 
kwargs.get("schedule_window"), "dispatch.earliest_time": kwargs.get("dispatch.earliest_time"), "dispatch.latest_time": kwargs.get("dispatch.latest_time"), "cron_schedule": kwargs.get("cron_schedule"), } def trackme_create_kvcollection( session_key, splunkd_uri, tenant_id, collection_name, collection_acl ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # create a new KVstore collection # if the collection is found, print it out # if not, then create the collection if collection_name not in service.kvstore: logging.info( f'tenant_id="{tenant_id}", attempting to create collection collection_name="{collection_name}"' ) try: service.kvstore.create( collection_name, **{"app": "trackme", "owner": "nobody"} ) logging.info( f'tenant_id="{tenant_id}", action="success", collection_name="{collection_name}"' ) except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to create collection collection_name="{collection_name}" with exception:"{str(e)}"' ) raise Exception( f'tenant_id="{tenant_id}", failure to create collection collection_name="{collection_name}" with exception:"{str(e)}"' ) record_url = f"{splunkd_uri}/servicesNS/nobody/trackme/storage/collections/config/{collection_name}/acl" logging.info( f'tenant_id="{tenant_id}", attempting to update collection collection_name="{collection_name}"' ) try: response = requests.post( record_url, headers=header, data=collection_acl, verify=False, timeout=600, ) logging.info( f'tenant_id="{tenant_id}", action="success", collection_name="{collection_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update collection collection_name="{collection_name}" with exception:"{str(e)}"' 
) raise Exception(str(e)) def trackme_delete_kvcollection(session_key, splunkd_uri, tenant_id, collection_name): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) logging.info( f'tenant_id="{tenant_id}", attempting to delete collection collection_name="{collection_name}"' ) try: service.kvstore.delete(collection_name) logging.info( f'tenant_id="{tenant_id}", action="success", collection_name="{collection_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to delete collection collection_name="{collection_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_create_kvtransform( session_key, splunkd_uri, tenant_id, transform_name, transform_fields, collection_name, transform_owner, transform_acl, ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # transforms transforms = service.confs["transforms"] logging.info( f'tenant_id="{tenant_id}", attempting to create transforms transforms_name="{transform_name}"' ) try: transforms.create( name=str(transform_name), **{ "app": "trackme", "sharing": "app", "external_type": "kvstore", "collection": str(collection_name), "fields_list": transform_fields, "owner": transform_owner, }, ) logging.info( f'tenant_id="{tenant_id}", action="success", transforms_name="{transform_name}"' ) except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to create transforms transforms_name="{transform_name}" with exception:"{str(e)}"' ) raise Exception( f'tenant_id="{tenant_id}", failure to create transforms transforms_name="{transform_name}" with 
exception:"{str(e)}"' ) record_url = f"{splunkd_uri}/servicesNS/admin/trackme/data/transforms/lookups/{transform_name}/acl" logging.info( f'tenant_id="{tenant_id}", attempting to update transforms transforms_name="{transform_name}"' ) try: response = requests.post( record_url, headers=header, data=transform_acl, verify=False, timeout=600, ) logging.info( f'tenant_id="{tenant_id}", action="success", transforms_name="{transform_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update transforms transforms_name="{transform_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_delete_kvtransform(session_key, splunkd_uri, tenant_id, transform_name): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # transforms transforms = service.confs["transforms"] # proceed logging.info( f'tenant_id="{tenant_id}", attempting to delete transform transform_name="{transform_name}"' ) try: transforms.delete(name=str(transform_name)) logging.info( f'tenant_id="{tenant_id}", action="success", transform_name="{transform_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to delete transform transform_name="{transform_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_create_macro( session_key, splunkd_uri, tenant_id, macro_name, macro_definition, macro_owner, macro_acl, ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # macros macros = service.confs["macros"] logging.info( f'tenant_id="{tenant_id}", attempting to create macro 
macro_name="{macro_name}"' ) try: macros.create( name=str(macro_name), **{ "app": "trackme", "sharing": "app", "definition": str(macro_definition), "owner": str(macro_owner), }, ) logging.info( f'tenant_id="{tenant_id}", action="success", macro_name="{macro_name}"' ) except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to create macro macro_name="{macro_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) record_url = f"{splunkd_uri}/servicesNS/admin/trackme/data/macros/{macro_name}/acl" logging.info( f'tenant_id="{tenant_id}", attempting to update macro macro_name="{macro_name}"' ) try: # # Splunkd API needs a couple of seconds to refresh while macros were created # In a programmatic context, this may lead the report creation to be failing # the function will check the KO status, and wait if needed for a certain amount of time # set max failed re-attempt max_failures_count = 24 sleep_time = 5 creation_success = False current_failures_count = 0 while current_failures_count < max_failures_count and not creation_success: try: response = requests.post( record_url, headers=header, data=macro_acl, verify=False, timeout=600, ) logging.info( f'tenant_id="{tenant_id}", action="success", macro_name="{macro_name}"' ) new_macro = macros[str(macro_name)].get logging.debug( f'tenant_id="{tenant_id}", macro_name="{macro_name}", response="{new_macro}"' ) creation_success = True break except Exception as e: logging.warning( f'tenant_id="{tenant_id}", temporary failure, the macro is not yet available, will sleep and re-attempt, macro macro_name="{macro_name}"' ) time.sleep(sleep_time) current_failures_count += 1 if current_failures_count >= max_failures_count: logging.error( f'tenant_id="{tenant_id}", max attempt reached, failure to create macro macro_name="{macro_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update macro macro_name="{macro_name}" 
with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_delete_macro(session_key, splunkd_uri, tenant_id, macro_name): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # macros macros = service.confs["macros"] try: # get the definition first macro_definition = macros[macro_name].content["definition"] logging.info( f'tenant_id="{tenant_id}", attempting to delete macro macro_name="{macro_name}", macro_definition="{macro_definition}"' ) # delete macros.delete(name=str(macro_name)) logging.info( f'tenant_id="{tenant_id}", action="success", macro_name="{macro_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to delete macro macro_name="{macro_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_report_update_enablement( session_key, splunkd_uri, tenant_id, report_name, action ): parsed_url = urllib.parse.urlparse(splunkd_uri) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } if action not in ("enable", "disable"): raise Exception( f'Invalid value for action="{action}", valid options are: enable | disable' ) else: record_url = f"{splunkd_uri}/servicesNS/nobody/trackme/saved/searches/{urllib.parse.quote(str(report_name))}/{action}" logging.info( f'tenant_id="{tenant_id}", attempting to {action} report report_name="{report_name}"' ) try: response = requests.post( record_url, headers=header, verify=False, timeout=600 ) logging.info( f'tenant_id="{tenant_id}", action="success", report_name="{report_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update report report_name="{report_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_macro_update_enablement( session_key, splunkd_uri, 
tenant_id, macro_name, action ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # get macros macros = service.confs["macros"] if action not in ("enable", "disable"): raise Exception( f'Invalid value for action="{action}", valid options are: enable | disable' ) else: if action == "enable": kwargs = {"disabled": "false"} elif action == "disable": kwargs = {"disabled": "true"} # update the properties macro_update = macros[str(macro_name)] logging.info( f'tenant_id="{tenant_id}", attempting to update macro macro_name="{macro_name}"' ) try: macro_update.update(**kwargs).refresh() logging.info( f'tenant_id="{tenant_id}", action="success", macro_name="{macro_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update macro macro_name="{macro_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_kvcollection_update_enablement( session_key, splunkd_uri, tenant_id, collection_name, action ): parsed_url = urllib.parse.urlparse(splunkd_uri) # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } if action not in ("enable", "disable"): raise Exception( f'Invalid value for action="{action}", valid options are: enable | disable' ) else: record_url = f"{splunkd_uri}/servicesNS/nobody/trackme/storage/collections/config/{collection_name}/{action}" logging.info( f'tenant_id="{tenant_id}", attempting to {action} collection collection_name="{collection_name}"' ) try: response = requests.post( record_url, headers=header, verify=False, timeout=600 ) logging.info( f'tenant_id="{tenant_id}", action="success", collection_name="{collection_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update collection 
collection_name="{collection_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_transform_update_enablement( session_key, splunkd_uri, tenant_id, transform_name, action ): parsed_url = urllib.parse.urlparse(splunkd_uri) # get service service = client.connect( owner="nobody", app="trackme", port=parsed_url.port, token=session_key, timeout=600, ) # get transforms transforms = service.confs["transforms"] if action not in ("enable", "disable"): raise Exception( f'Invalid value for action="{action}", valid options are: enable | disable' ) else: if action == "enable": kwargs = {"disabled": "false"} elif action == "disable": kwargs = {"disabled": "true"} # update the properties transform_update = transforms[str(transform_name)] logging.info( f'tenant_id="{tenant_id}", attempting to update transform="{transform_name}"' ) try: transform_update.update(**kwargs).refresh() logging.info( f'tenant_id="{tenant_id}", action="success", transform_name="{transform_name}"' ) return "success" except Exception as e: logging.error( f'tenant_id="{tenant_id}", failure to update transforms transform_name="{transform_name}" with exception:"{str(e)}"' ) raise Exception(str(e)) def trackme_audit_event( session_key, splunkd_uri, tenant_id, user, action, change_type, object_name, object_category, object_attrs, result, comment, object_id=None, ): # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # Audit # set url = "%s/services/trackme/v2/audit/audit_events_v2" % splunkd_uri # set events list audit_events = { "action": action, "change_type": change_type, "object": object_name, "object_category": object_category, "object_attrs": object_attrs, "user": user, "result": result, "comment": comment, } if object_id: audit_events["object_id"] = object_id # set data data = {"tenant_id": f"{tenant_id}", "audit_events": [audit_events]} # Proceed try: # Validate data before 
sending try: json_data = json.dumps(data) except (TypeError, ValueError) as json_error: error_message = f'Failed to serialize audit data to JSON, error="{str(json_error)}", data="{data}"' logging.error(error_message) raise Exception(error_message) response = requests.post( url, headers=header, data=json_data, verify=False, timeout=600 ) if response.ok: logging.debug(f'Success audit event, data="{response}"') response_json = response.json() return response_json else: error_message = f'Failed to generate an audit event, status_code={response.status_code}, response_text="{response.text}"' logging.error(error_message) raise Exception(error_message) except Exception as e: error_msg = f'trackme_audit_event has failed, exception="{str(e)}"' raise Exception(error_msg) def trackme_audit_flip( session_key, splunkd_uri, tenant_id, keyid, alias, object, object_category, priority, object_state, object_previous_state, latest_flip_time, latest_flip_state, anomaly_reason, result, ): # Define an header for requests authenticated communications with splunkd header = { "Authorization": "Splunk %s" % session_key, "Content-Type": "application/json", } # set url = "%s/services/trackme/v2/audit/flip_event" % splunkd_uri data = { "tenant_id": str(tenant_id), "keyid": str(keyid), "alias": str(alias), "object": str(object), "object_category": str(object_category), "priority": str(priority), "object_state": str(object_state), "object_previous_state": str(object_previous_state), "latest_flip_time": str(latest_flip_time), "latest_flip_state": str(latest_flip_state), "anomaly_reason": str(anomaly_reason), "result": str(result), } # Proceed try: response = requests.post( url, headers=header, data=json.dumps(data, indent=1), verify=False, timeout=600, ) if response.ok: logging.debug(f'Success flip event, data="{response}"') response_json = response.json() return response_json else: error_message = f'Failed to generate a flip event, status_code={response.status_code}, 
def trackme_audit_flip(
    session_key,
    splunkd_uri,
    tenant_id,
    keyid,
    alias,
    object,
    object_category,
    priority,
    object_state,
    object_previous_state,
    latest_flip_time,
    latest_flip_state,
    anomaly_reason,
    result,
):
    """
    Post a flip event (state change of an entity) to the TrackMe flip_event endpoint.

    Note: the parameter named object shadows the builtin; kept for backward
    compatibility of the interface.

    :return: the JSON response of the endpoint
    :raises Exception: if the request fails
    """
    # header for authenticated communications with splunkd
    header = {
        "Authorization": f"Splunk {session_key}",
        "Content-Type": "application/json",
    }

    # target endpoint
    url = f"{splunkd_uri}/services/trackme/v2/audit/flip_event"

    # every value is stringified before submission
    data = {
        "tenant_id": str(tenant_id),
        "keyid": str(keyid),
        "alias": str(alias),
        "object": str(object),
        "object_category": str(object_category),
        "priority": str(priority),
        "object_state": str(object_state),
        "object_previous_state": str(object_previous_state),
        "latest_flip_time": str(latest_flip_time),
        "latest_flip_state": str(latest_flip_state),
        "anomaly_reason": str(anomaly_reason),
        "result": str(result),
    }

    try:
        response = requests.post(
            url,
            headers=header,
            data=json.dumps(data, indent=1),
            verify=False,
            timeout=600,
        )
        if response.ok:
            logging.debug(f'Success flip event, data="{response}"')
            return response.json()
        error_message = f'Failed to generate a flip event, status_code={response.status_code}, response_text="{response.text}", data="{json.dumps(data, indent=1)}"'
        logging.error(error_message)
        raise Exception(error_message)

    except Exception as e:
        error_msg = f'trackme_audit_flip has failed, exception="{str(e)}"'
        raise Exception(error_msg)


def trackme_state_event(
    session_key, splunkd_uri, tenant_id, index, sourcetype, source, record
):
    """
    Post a state event to the TrackMe state_event endpoint.

    :param record: the state record, submitted as-is (not stringified)
    :return: the JSON response of the endpoint
    :raises Exception: if the request fails
    """
    # header for authenticated communications with splunkd
    header = {
        "Authorization": f"Splunk {session_key}",
        "Content-Type": "application/json",
    }

    # target endpoint
    url = f"{splunkd_uri}/services/trackme/v2/audit/state_event"

    data = {
        "tenant_id": str(tenant_id),
        "index": str(index),
        "sourcetype": str(sourcetype),
        "source": str(source),
        "record": record,
    }

    try:
        response = requests.post(
            url, headers=header, data=json.dumps(data), verify=False, timeout=600
        )
        if response.ok:
            logging.debug(f'Success state event, data="{response}"')
            return response.json()
        error_message = f'Failed to generate a state event, status_code={response.status_code}, response_text="{response.text}"'
        logging.error(error_message)
        raise Exception(error_message)

    except Exception as e:
        error_msg = f'trackme_state_event has failed, exception="{str(e)}"'
        raise Exception(error_msg)
# Register multiple handler events at once
def trackme_handler_events(
    session_key, splunkd_uri, tenant_id, handler_events, source, sourcetype
):
    """
    Register one or more handler events against the TrackMe handler_events endpoint.

    :param handler_events: a list of handler events, or a single event which is
        automatically wrapped into a list
    :param source: event source value
    :param sourcetype: event sourcetype value
    :return: the JSON response of the endpoint
    :raises Exception: if the request fails
    """
    # Define an header for requests authenticated communications with splunkd
    header = {
        "Authorization": f"Splunk {session_key}",
        "Content-Type": "application/json",
    }

    # set
    url = f"{splunkd_uri}/services/trackme/v2/audit/handler_events"

    # bugfix: normalize handler_events to a list BEFORE building the payload;
    # previously the conversion happened after data was built, so a single
    # (non-list) event was submitted unwrapped
    if not isinstance(handler_events, list):
        handler_events = [handler_events]

    # set data
    data = {
        "tenant_id": str(tenant_id),
        "handler_events": handler_events,
        "source": str(source),
        "sourcetype": str(sourcetype),
    }

    # Proceed
    try:
        response = requests.post(
            url, headers=header, data=json.dumps(data), verify=False, timeout=600
        )
        if response.ok:
            logging.debug(f'Success handler event, data="{response}"')
            return response.json()
        else:
            error_message = f'Failed to generate a handler event, status_code={response.status_code}, response_text="{response.text}"'
            logging.error(error_message)
            raise Exception(error_message)

    except Exception as e:
        error_msg = f'trackme_handler_events has failed, exception="{str(e)}"'
        raise Exception(error_msg)
def trackme_components_register_gen_metrics(
    session_key, splunkd_uri, tenant_id, records
):
    """
    Write component execution metrics to a dedicated rotated log file, so they
    can be indexed into the tenant metric index.

    :param records: list of dicts, each providing component, tracker and
        metrics_event entries (read with .get, missing keys become None)
    :return: True on success
    :raises Exception: re-raised from any underlying failure
    """
    # proceed
    try:
        # get the target index (the metric index is resolved from the tenant definition)
        tenant_indexes = trackme_idx_for_tenant(
            session_key,
            splunkd_uri,
            tenant_id,
        )

        # Create a dedicated logger for component metrics
        metrics_logger = logging.getLogger("trackme.components.metrics")
        metrics_logger.setLevel(logging.INFO)

        # improvement: the original code duplicated the handler construction in
        # two branches; collapsed into a single lookup-then-create path.
        # Find an existing RotatingFileHandler among current handlers, if any.
        filehandler = None
        for handler in metrics_logger.handlers:
            if isinstance(handler, RotatingFileHandler):
                filehandler = handler
                break

        if filehandler is None:
            # when the logger had no handlers at all, also prevent propagation
            # to the root logger (preserves the original behavior)
            if not metrics_logger.handlers:
                metrics_logger.propagate = False
            filehandler = RotatingFileHandler(
                f"{splunkhome}/var/log/splunk/trackme_components_register_metrics.log",
                mode="a",
                maxBytes=100000000,
                backupCount=1,
            )
            formatter = JSONFormatter()
            # NOTE(review): global side effect on logging.Formatter, affects all
            # formatters of the process — preserved from the original code
            logging.Formatter.converter = time.gmtime
            filehandler.setFormatter(formatter)
            metrics_logger.addHandler(filehandler)

        # emit one structured record per metrics event
        for record in records:
            metrics_logger.info(
                "Metrics - group=components_register_metrics",
                extra={
                    "target_index": tenant_indexes["trackme_metric_idx"],
                    "tenant_id": tenant_id,
                    "component": record.get("component"),
                    "tracker": record.get("tracker"),
                    "metrics_event": json.dumps(record.get("metrics_event")),
                },
            )

        return True

    except Exception as e:
        raise Exception(str(e))
# register the tenant object summary status
def trackme_register_tenant_object_summary(
    session_key,
    splunkd_uri,
    tenant_id,
    component,
    report,
    last_status,
    last_exec,
    last_duration,
    last_result,
    earliest,
    latest,
):
    """
    Register or refresh the execution summary of a report in the virtual
    tenants KVstore collection, and generate execution metrics.

    The summary is stored as a JSON string in the tenant_objects_exec_summary
    field of the vtenant record, keyed by report name.
    """
    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # connect to splunkd
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    # vtenants KVstore collection
    collection_vtenants_name = "kv_trackme_virtual_tenants"
    collection_vtenants = service.kvstore[collection_vtenants_name]

    # KVstore query filter
    query_string = {
        "tenant_id": tenant_id,
    }

    logging.debug(
        f'Starting function trackme_register_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}", last_exec="{last_exec}", last_status="{last_status}", last_duration="{last_duration}", last_result="{last_result}", earliest="{earliest}", latest="{latest}"'
    )

    # retrieve the vtenant record and its KVstore key
    try:
        vtenant_record = collection_vtenants.data.query(query=json.dumps(query_string))[
            0
        ]
        vtenant_key = vtenant_record.get("_key")
        logging.debug(
            f'The vtenant_key was successfully found in the collection, query_string="{query_string}", originating_report="{report}"'
        )
    except Exception:
        vtenant_key = None
        logging.error(
            f'The vtenant_key was not found in the collection, query_string="{query_string}", originating_report="{report}"'
        )

    # without a record key there is nothing to update (implicit None, preserved)
    if not vtenant_key:
        return

    # load the current summary dictionary, if any
    try:
        tenant_objects_exec_summary = json.loads(
            vtenant_record.get("tenant_objects_exec_summary")
        )
    except Exception:
        tenant_objects_exec_summary = None

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Retrieve tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    if tenant_objects_exec_summary and tenant_objects_exec_summary != "None":
        try:
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Updating the existing record in the dictionnary, summary_dict="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
            )
            entry = tenant_objects_exec_summary[report]

            # refresh the existing entry in place
            entry["component"] = str(component)
            entry["last_status"] = str(last_status)
            entry["last_exec"] = str(last_exec)
            entry["last_duration"] = round(float(last_duration), 3)
            entry["last_result"] = str(last_result)
            entry["earliest"] = str(earliest)
            entry["latest"] = str(latest)
            entry["persistent"] = "False"

            # store the entry with its keys sorted alphabetically
            tenant_objects_exec_summary[report] = dict(sorted(entry.items()))

            # generate metrics
            components_register_metrics_gen_start = time.time()

            # status as a numerical: 1 success, 2 failure, 3 unknown
            if last_status == "success":
                last_status_num = 1
            elif last_status == "failure":
                last_status_num = 2
            else:
                last_status_num = 3

            # last_duration must be a positive numerical, fall back to 0 otherwise
            if last_duration:
                try:
                    last_duration = round(float(last_duration), 3)
                except Exception:
                    last_duration = 0
            else:
                last_duration = 0

            try:
                components_register_metrics = trackme_components_register_gen_metrics(
                    session_key,
                    splunkd_uri,
                    tenant_id,
                    [
                        {
                            "tenant_id": tenant_id,
                            "component": component,
                            "tracker": report,
                            "metrics_event": {
                                "status": last_status_num,
                                "runtime": last_duration,
                            },
                        }
                    ],
                )
                logging.info(
                    f'context="components_register_gen_metrics", tenant_id="{tenant_id}", function trackme_components_register_gen_metrics success {components_register_metrics}, run_time={round(time.time()-components_register_metrics_gen_start, 3)}'
                )
            except Exception as e:
                logging.error(
                    f'context="components_register_gen_metrics", tenant_id="{tenant_id}", function trackme_components_register_gen_metrics failed with exception {str(e)}'
                )

        except Exception:
            # the report is not registered yet: add a new entry
            summary_dict = {
                report: {
                    "component": component,
                    "last_status": last_status,
                    "last_exec": last_exec,
                    "last_duration": last_duration,
                    "last_result": last_result,
                    "earliest": earliest,
                    "latest": latest,
                }
            }
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Adding a new record to the dictionnary, summary_dict="{json.dumps(summary_dict, indent=1)}"'
            )
            tenant_objects_exec_summary.update(summary_dict)

    else:
        # no summary yet: create a fresh dictionary for this report
        tenant_objects_exec_summary = {
            report: {
                "component": component,
                "last_status": last_status,
                "last_exec": last_exec,
                "last_duration": last_duration,
                "last_result": last_result,
                "earliest": earliest,
                "latest": latest,
            }
        }
        logging.debug(
            f'tenant_id="{tenant_id}", component="{component}", report="{report}", Creating a new dictionnary, tenant_objects_exec_summary="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
        )

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Ended processing, tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    # persist the updated summary back to the KVstore
    try:
        vtenant_record["tenant_objects_exec_summary"] = json.dumps(
            tenant_objects_exec_summary, indent=2
        )
        collection_vtenants.data.update(str(vtenant_key), json.dumps(vtenant_record))
    except Exception as e:
        logging.error(
            f'failure while trying to update the vtenant KVstore record, exception="{str(e)}"'
        )
# register the tenant object summary status (do not gen metrics, non persistent)
def trackme_register_tenant_object_summary_gen_non_persistent(
    session_key,
    splunkd_uri,
    tenant_id,
    component,
    report,
    last_status,
    last_exec,
    last_duration,
    last_result,
    earliest,
    latest,
):
    """
    Register or refresh the execution summary of a report in the virtual
    tenants KVstore collection, without generating metrics (non persistent
    variant: the entry is flagged persistent="False").
    """
    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # connect to splunkd
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    # vtenants KVstore collection
    collection_vtenants_name = "kv_trackme_virtual_tenants"
    collection_vtenants = service.kvstore[collection_vtenants_name]

    # KVstore query filter
    query_string = {
        "tenant_id": tenant_id,
    }

    logging.debug(
        f'Starting function trackme_register_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}", last_exec="{last_exec}", last_status="{last_status}", last_duration="{last_duration}", last_result="{last_result}", earliest="{earliest}", latest="{latest}"'
    )

    # retrieve the vtenant record and its KVstore key
    try:
        vtenant_record = collection_vtenants.data.query(query=json.dumps(query_string))[
            0
        ]
        vtenant_key = vtenant_record.get("_key")
        logging.debug(
            f'The vtenant_key was successfully found in the collection, query_string="{query_string}", originating_report="{report}"'
        )
    except Exception:
        vtenant_key = None
        logging.error(
            f'The vtenant_key was not found in the collection, query_string="{query_string}", originating_report="{report}"'
        )

    # without a record key there is nothing to update (implicit None, preserved)
    if not vtenant_key:
        return

    # load the current summary dictionary, if any
    try:
        tenant_objects_exec_summary = json.loads(
            vtenant_record.get("tenant_objects_exec_summary")
        )
    except Exception:
        tenant_objects_exec_summary = None

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Retrieve tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    if tenant_objects_exec_summary and tenant_objects_exec_summary != "None":
        try:
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Updating the existing record in the dictionnary, summary_dict="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
            )
            entry = tenant_objects_exec_summary[report]

            # refresh the existing entry in place
            entry["component"] = str(component)
            entry["last_status"] = str(last_status)
            entry["last_exec"] = str(last_exec)
            entry["last_duration"] = round(float(last_duration), 3)
            entry["last_result"] = str(last_result)
            entry["earliest"] = str(earliest)
            entry["latest"] = str(latest)
            entry["persistent"] = "False"

            # store the entry with its keys sorted alphabetically
            tenant_objects_exec_summary[report] = dict(sorted(entry.items()))

        except Exception:
            # the report is not registered yet: add a new entry
            summary_dict = {
                report: {
                    "component": component,
                    "last_status": last_status,
                    "last_exec": last_exec,
                    "last_duration": last_duration,
                    "last_result": last_result,
                    "earliest": earliest,
                    "latest": latest,
                }
            }
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Adding a new record to the dictionnary, summary_dict="{json.dumps(summary_dict, indent=1)}"'
            )
            tenant_objects_exec_summary.update(summary_dict)

    else:
        # no summary yet: create a fresh dictionary for this report
        tenant_objects_exec_summary = {
            report: {
                "component": component,
                "last_status": last_status,
                "last_exec": last_exec,
                "last_duration": last_duration,
                "last_result": last_result,
                "earliest": earliest,
                "latest": latest,
            }
        }
        logging.debug(
            f'tenant_id="{tenant_id}", component="{component}", report="{report}", Creating a new dictionnary, tenant_objects_exec_summary="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
        )

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Ended processing, tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    # persist the updated summary back to the KVstore
    try:
        vtenant_record["tenant_objects_exec_summary"] = json.dumps(
            tenant_objects_exec_summary, indent=2
        )
        collection_vtenants.data.update(str(vtenant_key), json.dumps(vtenant_record))
    except Exception as e:
        logging.error(
            f'failure while trying to update the vtenant KVstore record, exception="{str(e)}"'
        )
# register the tenant object summary status (persistent)
def trackme_register_tenant_object_summary_gen_persistent(
    session_key,
    splunkd_uri,
    tenant_id,
    component,
    report,
    last_status,
    last_exec,
    last_duration,
    last_result,
    earliest,
    latest,
):
    """
    Register or refresh the execution summary of a report in the virtual
    tenants KVstore collection, flag it persistent="True" and generate
    execution metrics.

    NOTE(review): the log messages reference
    trackme_register_tenant_object_summary_from_splunkremotesearch — wording
    preserved from the original implementation.
    """
    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # connect to splunkd
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    # vtenants KVstore collection
    collection_vtenants_name = "kv_trackme_virtual_tenants"
    collection_vtenants = service.kvstore[collection_vtenants_name]

    # KVstore query filter
    query_string = {
        "tenant_id": tenant_id,
    }

    logging.debug(
        f'Starting function trackme_register_tenant_object_summary_from_splunkremotesearch, tenant_id="{tenant_id}", component="{component}", report="{report}", last_exec="{last_exec}", last_status="{last_status}", last_duration="{last_duration}", last_result="{last_result}", earliest="{earliest}", latest="{latest}"'
    )

    # retrieve the vtenant record and its KVstore key
    try:
        vtenant_record = collection_vtenants.data.query(query=json.dumps(query_string))[
            0
        ]
        vtenant_key = vtenant_record.get("_key")
        logging.debug(
            f'The vtenant_key was successfully found in the collection, query_string="{query_string}", originating_report="{report}"'
        )
    except Exception:
        vtenant_key = None
        logging.error(
            f'The vtenant_key was not found in the collection, query_string="{query_string}", originating_report="{report}"'
        )

    # without a record key there is nothing to update (implicit None, preserved)
    if not vtenant_key:
        return

    # load the current summary dictionary, if any
    try:
        tenant_objects_exec_summary = json.loads(
            vtenant_record.get("tenant_objects_exec_summary")
        )
    except Exception:
        tenant_objects_exec_summary = None

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Retrieve tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    if tenant_objects_exec_summary and tenant_objects_exec_summary != "None":
        try:
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Updating the existing record in the dictionnary, summary_dict="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
            )
            entry = tenant_objects_exec_summary[report]

            # refresh the existing entry in place
            entry["component"] = str(component)
            entry["last_status"] = str(last_status)
            entry["last_exec"] = str(last_exec)
            entry["last_duration"] = round(float(last_duration), 3)
            entry["last_result"] = str(last_result)
            entry["earliest"] = str(earliest)
            entry["latest"] = str(latest)
            entry["persistent"] = "True"

            # store the entry with its keys sorted alphabetically
            tenant_objects_exec_summary[report] = dict(sorted(entry.items()))

            # generate metrics
            components_register_metrics_gen_start = time.time()

            # status as a numerical: 1 success, 2 failure, 3 unknown
            if last_status == "success":
                last_status_num = 1
            elif last_status == "failure":
                last_status_num = 2
            else:
                last_status_num = 3

            # last_duration must be a positive numerical, fall back to 0 otherwise
            if last_duration:
                try:
                    last_duration = round(float(last_duration), 3)
                except Exception:
                    last_duration = 0
            else:
                last_duration = 0

            try:
                components_register_metrics = trackme_components_register_gen_metrics(
                    session_key,
                    splunkd_uri,
                    tenant_id,
                    [
                        {
                            "tenant_id": tenant_id,
                            "component": component,
                            "tracker": report,
                            "metrics_event": {
                                "status": last_status_num,
                                "runtime": last_duration,
                            },
                        }
                    ],
                )
                logging.info(
                    f'context="components_register_gen_metrics", tenant_id="{tenant_id}", function trackme_register_tenant_object_summary_from_splunkremotesearch success {components_register_metrics}, run_time={round(time.time()-components_register_metrics_gen_start, 3)}'
                )
            except Exception as e:
                logging.error(
                    f'context="components_register_gen_metrics", tenant_id="{tenant_id}", function trackme_register_tenant_object_summary_from_splunkremotesearch failed with exception {str(e)}'
                )

        except Exception:
            # the report is not registered yet: add a new entry
            summary_dict = {
                report: {
                    "component": component,
                    "last_status": last_status,
                    "last_exec": last_exec,
                    "last_duration": last_duration,
                    "last_result": last_result,
                    "earliest": earliest,
                    "latest": latest,
                }
            }
            logging.debug(
                f'tenant_id="{tenant_id}", component="{component}", report="{report}", Adding a new record to the dictionnary, summary_dict="{json.dumps(summary_dict, indent=1)}"'
            )
            tenant_objects_exec_summary.update(summary_dict)

    else:
        # no summary yet: create a fresh dictionary for this report
        tenant_objects_exec_summary = {
            report: {
                "component": component,
                "last_status": last_status,
                "last_exec": last_exec,
                "last_duration": last_duration,
                "last_result": last_result,
                "earliest": earliest,
                "latest": latest,
            }
        }
        logging.debug(
            f'tenant_id="{tenant_id}", component="{component}", report="{report}", Creating a new dictionnary, tenant_objects_exec_summary="{json.dumps(tenant_objects_exec_summary, indent=1)}"'
        )

    logging.debug(
        f'tenant_id="{tenant_id}", component="{component}", report="{report}", Ended processing, tenant_objects_exec_summary="{tenant_objects_exec_summary}"'
    )

    # persist the updated summary back to the KVstore
    try:
        vtenant_record["tenant_objects_exec_summary"] = json.dumps(
            tenant_objects_exec_summary, indent=2
        )
        collection_vtenants.data.update(str(vtenant_key), json.dumps(vtenant_record))
    except Exception as e:
        logging.error(
            f'failure while trying to update the vtenant KVstore record, exception="{str(e)}"'
        )
# return the tenant object summary status for the last execution registered
def trackme_return_tenant_object_summary(
    session_key, splunkd_uri, tenant_id, component, report
):
    """
    Return the last registered execution summary for a given report.

    :return: the report's summary dict, an "unknown" placeholder dict when the
        tenant has no summary yet, or None when the vtenant record cannot be
        found (behavior preserved from the original implementation)
    """
    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # connect to splunkd
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    # vtenants KVstore collection
    collection_vtenants_name = "kv_trackme_virtual_tenants"
    collection_vtenants = service.kvstore[collection_vtenants_name]

    # KVstore query filter
    query_string = {
        "tenant_id": tenant_id,
    }

    logging.debug(
        f'Starting function trackme_return_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}"'
    )

    # retrieve the vtenant record and its KVstore key
    try:
        vtenant_record = collection_vtenants.data.query(query=json.dumps(query_string))[
            0
        ]
        vtenant_key = vtenant_record.get("_key")
        logging.debug(
            f'The vtenant_key was successfully found in the collection, query_string="{query_string}", originating_report="{report}"'
        )
    except Exception:
        vtenant_key = None
        logging.error(
            f'The vtenant_key was not found in the collection, query_string="{query_string}", originating_report="{report}"'
        )

    # no vtenant record: nothing to return (explicit None, same as original)
    if not vtenant_key:
        return None

    # load the current summary dictionary, if any
    try:
        tenant_objects_exec_summary = json.loads(
            vtenant_record.get("tenant_objects_exec_summary")
        )
    except Exception:
        tenant_objects_exec_summary = None

    if tenant_objects_exec_summary and tenant_objects_exec_summary != "None":
        logging.debug(
            f'function trackme_return_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}", Retrieve tenant_objects_exec_summary="{json.dumps(tenant_objects_exec_summary.get(report), indent=2)}"'
        )
        # return the report entry (None if the report is not registered)
        return tenant_objects_exec_summary.get(report)

    # no summary registered yet: return an "unknown" placeholder
    return {
        "component": component,
        "last_status": "unknown",
        "last_exec": "unknown",
        "last_duration": "unknown",
        "last_result": "unknown",
        "earliest": "unknown",
        "latest": "unknown",
    }
# delete a tenant object summary record
def trackme_delete_tenant_object_summary(
    session_key, splunkd_uri, tenant_id, component, report
):
    """
    Delete a report entry from the tenant objects execution summary stored in
    the virtual tenants KVstore collection.

    :return: "success" or "failure" when a summary existed and the update was
        attempted, None otherwise
    """
    parsed_url = urllib.parse.urlparse(splunkd_uri)

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=parsed_url.port,
        token=session_key,
        timeout=600,
    )

    # Register the object summary in the vtenant collection
    collection_vtenants_name = "kv_trackme_virtual_tenants"
    collection_vtenants = service.kvstore[collection_vtenants_name]

    # Define the KV query search string
    query_string = {
        "tenant_id": tenant_id,
    }

    # bugfix: log messages previously referenced trackme_return_tenant_object_summary
    # (copy-paste), corrected to the actual function name
    logging.debug(
        f'Starting function trackme_delete_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}"'
    )

    try:
        vtenant_record = collection_vtenants.data.query(query=json.dumps(query_string))[
            0
        ]
        vtenant_key = vtenant_record.get("_key")
        logging.debug(
            f'The vtenant_key was successfully found in the collection, query_string="{query_string}", originating_report="{report}"'
        )
    except Exception:
        vtenant_key = None
        logging.error(
            f'The vtenant_key was not found in the collection, query_string="{query_string}", originating_report="{report}"'
        )

    if vtenant_key:
        # try to load the dict
        try:
            tenant_objects_exec_summary = json.loads(
                vtenant_record.get("tenant_objects_exec_summary")
            )
        except Exception:
            tenant_objects_exec_summary = None

        if tenant_objects_exec_summary and tenant_objects_exec_summary != "None":
            logging.debug(
                f'function trackme_delete_tenant_object_summary, tenant_id="{tenant_id}", component="{component}", report="{report}", Retrieve tenant_objects_exec_summary="{json.dumps(tenant_objects_exec_summary.get(report), indent=2)}"'
            )

            # delete the record from the dict (no error if absent)
            tenant_objects_exec_summary.pop(report, None)

            # update the summary field
            vtenant_record["tenant_objects_exec_summary"] = json.dumps(
                tenant_objects_exec_summary, indent=2
            )

            # update the KVstore record
            try:
                collection_vtenants.data.update(
                    str(vtenant_key), json.dumps(vtenant_record)
                )
                logging.info(
                    f'function trackme_delete_tenant_object_summary, tenant_id="{tenant_id}", report="{report}", register summary record="{json.dumps(vtenant_record, indent=2)}" was successfully purged'
                )
                return "success"
            except Exception as e:
                logging.error(
                    f'function trackme_delete_tenant_object_summary, tenant_id="{tenant_id}", report="{report}", Failure to remove the register summary record="{json.dumps(vtenant_record, indent=2)}", exception="{str(e)}"'
                )
                return "failure"

        else:
            logging.info(
                f'function trackme_delete_tenant_object_summary, found no record to be purged in the register object summary for tenant_id="{tenant_id}"'
            )
# Return the Elastic Source search to be executed depending on the various options
def trackme_return_elastic_exec_search(
    search_mode,
    search_constraint,
    object,
    data_index,
    data_sourcetype,
    tenant_id,
    register_component,
    wrapper_name,
):
    """
    Build the final SPL search for an Elastic Source.

    :param search_mode: tstats | raw | mpreview | from | mstats, optionally
        prefixed with "remote_" for execution through splunkremotesearch
    :param search_constraint: search constraint; in remote mode it must start
        with an account=... definition, a pipe, then the effective constraint
    :param object: the entity object name (builtin shadowed; kept for interface
        backward compatibility)
    :param data_index: index value injected in the resulting search
    :param data_sourcetype: sourcetype value injected in the resulting search
    :param tenant_id: tenant identifier
    :param register_component: "True" to register component execution from the
        remote search
    :param wrapper_name: wrapper report name used when registering the component
    :return: the assembled SPL query (str)
    :raises Exception: if the remote constraint cannot be parsed, or if no core
        search could be generated for this search_mode
    """
    # init remote
    remote = False
    # init core_search
    core_search = None
    remote_target = None

    # if search_mode starts by remote_
    if search_mode.startswith("remote_"):
        remote = True
        logging.debug(f'search_constraint="{search_constraint}"')
        # extract the account target and the effective search constraint
        remote_matches = re.match(
            r"(account\=\s{0,}\"{0,1}[^\|]+\"{0,1})\s{0,}\|\s{0,}(.*)",
            search_constraint,
        )
        if remote_matches:
            remote_target = remote_matches.group(1).replace('\\"', '"')
            search_constraint = remote_matches.group(2)
        else:
            logging.error(
                f'invalid search, account or search constraint could not be extracted, search_constraint="{search_constraint}"'
            )
            raise Exception(
                f'invalid search, account or search constraint could not be extracted, search_constraint="{search_constraint}"'
            )

    if search_mode in ("tstats", "remote_tstats"):
        core_search = remove_leading_spaces(
            f"""\
            | tstats max(_indextime) as data_last_ingest, min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, count as data_eventcount, dc(host) as dcount_host where {search_constraint} by _time, index, sourcetype span=1s
            | eval data_last_ingestion_lag_seen=data_last_ingest-data_last_time_seen
            | eval spantime=data_last_ingest
            | eventstats max(data_last_time_seen) as data_last_time_seen, max(dcount_host) as global_dcount_host
            | eval spantime=if(spantime>=(now()-300), spantime, null())
            | eventstats sum(data_eventcount) as eventcount_5m, avg(data_last_ingestion_lag_seen) as latency_5m, avg(dcount_host) as dcount_host_5m by spantime
            | stats latest(eventcount_5m) as latest_eventcount_5m, avg(eventcount_5m) as avg_eventcount_5m, stdev(eventcount_5m) as stdev_eventcount_5m, perc95(eventcount_5m) as perc95_eventcount_5m,
            latest(latency_5m) as latest_latency_5m, avg(latency_5m) as avg_latency_5m, stdev(latency_5m) as stdev_latency_5m, perc95(latency_5m) as perc95_latency_5m,
            latest(dcount_host_5m) as latest_dcount_host_5m, avg(dcount_host_5m) as avg_dcount_host_5m, stdev(dcount_host_5m) as stdev_dcount_host_5m, perc95(dcount_host_5m) as perc95_dcount_host_5m,
            max(data_last_ingest) as data_last_ingest, min(data_first_time_seen) as data_first_time_seen, max(data_last_time_seen) as data_last_time_seen, avg(data_last_ingestion_lag_seen) as data_last_ingestion_lag_seen, sum(data_eventcount) as data_eventcount, first(global_dcount_host) as global_dcount_host
            | eval dcount_host=round(global_dcount_host, 0)
            | eval data_last_ingestion_lag_seen=round(data_last_ingestion_lag_seen, 0)
            | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
            """
        )

    elif search_mode in ("raw", "remote_raw"):
        core_search = remove_leading_spaces(
            f"""\
            {search_constraint}
            | eval ingest_lag=_indextime-_time
            | eventstats max(_indextime) as data_last_ingest, max(_time) as data_last_time_seen
            | eval spantime=data_last_ingest
            | eval spantime=if(spantime>=(now()-300), spantime, null())
            | eventstats count as eventcount_5m, avg(ingest_lag) as latency_5m, dc(host) as dcount_host_5m by spantime
            | stats latest(eventcount_5m) as latest_eventcount_5m, avg(eventcount_5m) as avg_eventcount_5m, stdev(eventcount_5m) as stdev_eventcount_5m, perc95(eventcount_5m) as perc95_eventcount_5m,
            latest(latency_5m) as latest_latency_5m, avg(latency_5m) as avg_latency_5m, stdev(latency_5m) as stdev_latency_5m, perc95(latency_5m) as perc95_latency_5m,
            latest(dcount_host_5m) as latest_dcount_host_5m, avg(dcount_host_5m) as avg_dcount_host_5m, stdev(dcount_host_5m) as stdev_dcount_host_5m, perc95(dcount_host_5m) as perc95_dcount_host_5m,
            max(_indextime) as data_last_ingest, min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, avg(ingest_lag) as data_last_ingestion_lag_seen, count as data_eventcount, dc(host) as global_dcount_host
            | eval dcount_host=round(global_dcount_host, 0)
            | eval data_last_ingestion_lag_seen=round(data_last_ingestion_lag_seen, 0)
            | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
            """
        )

    elif search_mode in ("mpreview", "remote_mpreview"):
        # NOTE(review): latency_5m is referenced in the stats but is not computed
        # by the preceding eventstats for mpreview — preserved as-is, confirm intended
        core_search = remove_leading_spaces(
            f"""\
            | mpreview {search_constraint}
            | eventstats max(_time) as data_last_time_seen
            | eval spantime=data_last_time_seen
            | eval spantime=if(spantime>=(now()-300), spantime, null())
            | eventstats count as eventcount_5m, dc(host) as dcount_host_5m by spantime
            | stats latest(eventcount_5m) as latest_eventcount_5m, avg(eventcount_5m) as avg_eventcount_5m, stdev(eventcount_5m) as stdev_eventcount_5m, perc95(eventcount_5m) as perc95_eventcount_5m,
            latest(latency_5m) as latest_latency_5m, avg(latency_5m) as avg_latency_5m, stdev(latency_5m) as stdev_latency_5m, perc95(latency_5m) as perc95_latency_5m,
            latest(dcount_host_5m) as latest_dcount_host_5m, avg(dcount_host_5m) as avg_dcount_host_5m, stdev(dcount_host_5m) as stdev_dcount_host_5m, perc95(dcount_host_5m) as perc95_dcount_host_5m,
            max(_indextime) as data_last_ingest, min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, count as data_eventcount, dc(host) as global_dcount_host
            | eval data_last_ingest=data_last_time_seen
            | eval dcount_host=round(global_dcount_host, 0)
            | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
            """
        )

    elif search_mode in ("from", "remote_from"):
        if re.match("datamodel:", str(search_constraint)):
            core_search = remove_leading_spaces(
                f"""\
                | from {search_constraint}
                | eval ingest_lag=_indextime-_time
                | eventstats max(_indextime) as data_last_ingest, max(_time) as data_last_time_seen
                | eval spantime=data_last_ingest
                | eval spantime=if(spantime>=(now()-300), spantime, null())
                | eventstats count as eventcount_5m, avg(ingest_lag) as latency_5m, dc(host) as dcount_host_5m by spantime
                | stats latest(eventcount_5m) as latest_eventcount_5m, avg(eventcount_5m) as avg_eventcount_5m, stdev(eventcount_5m) as stdev_eventcount_5m, perc95(eventcount_5m) as perc95_eventcount_5m,
                latest(latency_5m) as latest_latency_5m, avg(latency_5m) as avg_latency_5m, stdev(latency_5m) as stdev_latency_5m, perc95(latency_5m) as perc95_latency_5m,
                latest(dcount_host_5m) as latest_dcount_host_5m, avg(dcount_host_5m) as avg_dcount_host_5m, stdev(dcount_host_5m) as stdev_dcount_host_5m, perc95(dcount_host_5m) as perc95_dcount_host_5m,
                max(_indextime) as data_last_ingest, min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, avg(ingest_lag) as data_last_ingestion_lag_seen, count as data_eventcount, dc(host) as global_dcount_host
                | eval dcount_host=round(global_dcount_host, 0)
                | eval data_last_ingestion_lag_seen=round(data_last_ingestion_lag_seen, 0)
                | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
                """
            )
        if re.match("lookup:", str(search_constraint)):
            core_search = remove_leading_spaces(
                f"""\
                | from {search_constraint}
                | eventstats max(_time) as indextime
                | eval _indextime=if(isnum(_indextime), _indextime, indextime)
                | fields - indextime
                | eval host=if(isnull(host), "none", host)
                | stats max(_indextime) as data_last_ingest, min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, count as data_eventcount, dc(host) as dcount_host
                | eval latest_eventcount_5m=data_eventcount
                | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
                """
            )

    elif search_mode in ("mstats", "remote_mstats"):
        core_search = remove_leading_spaces(
            f"""\
            | mstats latest(_value) as value where {search_constraint} by host, metric_name span=1m
            | stats min(_time) as data_first_time_seen, max(_time) as data_last_time_seen, dc(metric_name) as data_eventcount, dc(host) as global_dcount_host
            | eval data_last_ingest=data_last_time_seen, data_last_ingestion_lag_seen=now()-data_last_time_seen
            | eval object="{object}", data_index="{data_index}", data_sourcetype="{data_sourcetype}"
            """
        )

    # bugfix: fail fast BEFORE the final assembly. Previously this check ran
    # after the assembly, so in remote mode with an unsupported search_mode,
    # core_search.replace() raised AttributeError on None instead of the
    # intended explicit error below
    if not core_search:
        error_msg = f'search_mode="{search_mode}", search_constraint="{search_constraint}", data_index="{data_index}", data_sourcetype="{data_sourcetype}", wrapper_name="{wrapper_name}", register_component="{register_component}", failed to generate a valid search'
        logging.error(error_msg)
        raise Exception(error_msg)

    # Conditional components based on register_component
    register_component_part = (
        f' register_component="True" tenant_id="{tenant_id}" component="splk-dsm" report="{wrapper_name}"'
        if register_component == "True"
        else ""
    )

    # Final assembly of the query, including handling for remote mode
    if remote:
        # escape double quotes in core_search
        core_search = core_search.replace('"', '\\"')
        query = remove_leading_spaces(
            f"""\
            | splunkremotesearch {remote_target} search="{core_search}" {register_component_part}
            | `trackme_elastic_dedicated_tracker("{tenant_id}")`
            | eval tenant_id="{tenant_id}"
            | stats count as report_entities_count by tenant_id
            | `register_tenant_component_summary({tenant_id}, dsm)`
            """
        )
    else:
        # Standard query format for non-remote mode
        query = remove_leading_spaces(
            f"""\
            {core_search}
            | `trackme_elastic_dedicated_tracker("{tenant_id}")`
            | eval tenant_id="{tenant_id}"
            | stats count as report_entities_count by tenant_id
            | `register_tenant_component_summary({tenant_id}, dsm)`
            """
        )

    return query
def trackme_register_tenant_component_summary(
    session_key, splunkd_uri, tenant_id, component
):
    """
    Register the component summary for a tenant via the component_summary_update
    endpoint.

    :param component: the component name, optionally prefixed (e.g. "splk-dsm":
        the second dash-separated segment is used)
    :return: the JSON response of the endpoint
    :raises Exception: if the endpoint does not answer with HTTP 200/201/204 or
        the request fails
    """
    # if the component is submitted with a prefix, extract the component name
    component_segments = component.split("-")
    extracted_component = (
        component_segments[1] if len(component_segments) >= 2 else component
    )

    # header for authenticated communications with splunkd
    header = {
        "Authorization": f"Splunk {session_key}",
        "Content-Type": "application/json",
    }

    # payload
    data = {
        "tenant_id": tenant_id,
        "component": extracted_component,
    }

    # target endpoint
    url = f"{splunkd_uri}/services/trackme/v2/component/write/component_summary_update"

    try:
        response = requests.post(
            url,
            headers=header,
            data=json.dumps(data),
            verify=False,
            timeout=600,
        )
        if response.status_code not in (200, 201, 204):
            msg = f'tenant_id="{tenant_id}", component="{extracted_component}", function trackme_register_tenant_component_summary has failed, response.status_code="{response.status_code}", response.text="{response.text}"'
            raise Exception(msg)
        logging.info(
            f'tenant_id="{tenant_id}", component="{extracted_component}", function trackme_register_tenant_component_summary has succeeded, response.status_code="{response.status_code}", response.json="{json.dumps(response.json(), indent=2)}"'
        )
        return response.json()

    except Exception as e:
        error_msg = f'tenant_id="{tenant_id}", component="{extracted_component}", function trackme_register_tenant_component_summary has failed, exception="{str(e)}"'
        raise Exception(error_msg)
def trackme_send_to_tcm(session_key, splunkd_uri, resp_dict, http_mode, http_service):
    """
    Send the transaction to TrackMe Configuration Manager (TCM)

    :param session_key: Splunk session key used for the Authorization header
    :param splunkd_uri: splunkd URI, "https://" is prepended when missing
    :param resp_dict: the transaction request payload
    :param http_mode: HTTP mode of the transaction
    :param http_service: HTTP service target of the transaction
    :return: the TCM endpoint JSON response (dict)
    :raises Exception: when the HTTP call fails or TCM returns a non 2xx status
    """
    # Ensure splunkd_uri starts with "https://"
    if not splunkd_uri.startswith("https://"):
        splunkd_uri = f"https://{splunkd_uri}"

    # Build header and target URL
    headers = CaseInsensitiveDict()
    headers["Authorization"] = f"Splunk {session_key}"
    headers["Content-Type"] = "application/json"
    target_url = f"{splunkd_uri}/services/trackme_conf_manager/v1/conf_manager_receiver"

    data = {
        "transaction_request": resp_dict,
        "transaction_http_mode": http_mode,
        "transaction_http_service": http_service,
    }

    # Use the session as a context manager so underlying connections are always
    # released; build the error message in either branch, then log and raise it
    # exactly once (the previous version re-caught its own raised Exception and
    # logged/wrapped the message twice)
    try:
        with requests.Session() as session:
            session.headers.update(headers)
            with session.post(
                target_url, data=json.dumps(data), verify=False
            ) as response:
                if response.ok:
                    logging.debug(
                        f'Success sending the transaction to TCM, data="{response}"'
                    )
                    return response.json()
                error_message = f'Failed to send the transaction to TCM, status_code={response.status_code}, response_text="{response.text}"'
    except Exception as e:
        error_message = f'Failed to send the transaction to TCM, exception="{str(e)}"'

    logging.error(error_message)
    raise Exception(error_message)
def run_splunk_search(
    service, search_query, search_params, max_retries, sleep_time=5, sample_ratio=None
):
    """
    Executes a Splunk search with a retry mechanism and progressive backoff.

    Retries only on search-concurrency errors; any other failure is permanent
    and re-raised immediately.

    :param service: splunklib client service object used to dispatch the search.
    :param search_query: The Splunk search query to execute.
    :param search_params: Parameters for the search query (mutated: preview forced to False).
    :param max_retries: Maximum number of retries for the search.
    :param sleep_time: Base time to wait between retries in seconds (kept for
        backward compatibility; the actual backoff is computed progressively).
    :param sample_ratio: The sample ratio to use for the search.
    :return: A reader object with the search results.
    :raises Exception: on permanent failure, retry exhaustion, or when the
        15-minute cumulated wait budget would be exceeded.
    """
    # ensure preview is set to False in search_params or results may appear to be duplicated
    search_params["preview"] = False

    # if sample_ratio is provided, set the sample_ratio in search_params
    if sample_ratio:
        search_params["sample_ratio"] = sample_ratio

    current_retries = 0
    total_wait_time = 0  # Track total time spent waiting
    max_total_wait_time = 900  # 15 minutes in seconds
    last_exception = None  # Track the last exception that occurred

    while current_retries < max_retries:
        try:
            search_results = service.jobs.export(search_query, **search_params)
            return results.JSONResultsReader(search_results)
        except Exception as e:
            last_exception = str(e)  # Store the exception message
            # only concurrency-limit errors are transient and worth retrying
            if "maximum number of concurrent historical searches" in str(
                e
            ) or "This search could not be dispatched because the role-based concurrency limit of historical searches" in str(
                e
            ):
                current_retries += 1
                # Calculate progressive backoff sleep time
                # Use linear progression to maximize attempts within 15-minute limit
                # Target: 24 attempts within 900 seconds, starting at 10s
                progressive_sleep_time = (
                    10 + (current_retries - 1) * 1.8
                )  # Linear progression
                progressive_sleep_time = min(progressive_sleep_time, 120)  # Cap at 120s

                # Check if this sleep would exceed the 15-minute total wait time limit
                if total_wait_time + progressive_sleep_time > max_total_wait_time:
                    logging.error(
                        f'function run_splunk_search, would exceed 15-minute total wait time limit, stopping after {current_retries} retries, total wait time={total_wait_time:.1f}s, search_query="{search_query}"'
                    )
                    raise Exception(
                        f'function run_splunk_search, would exceed 15-minute total wait time limit, stopping after {current_retries} retries, total wait time={total_wait_time:.1f}s, search_query="{search_query}"'
                    )

                # logging.warn is a deprecated alias, use logging.warning
                logging.warning(
                    f'function run_splunk_search, temporary search failure, retry {current_retries}/{max_retries} for Splunk search due to error="{str(e)}", will re-attempt in {progressive_sleep_time:.1f} seconds (progressive backoff), total wait time so far={total_wait_time:.1f}s.'
                )
                time.sleep(progressive_sleep_time)
                total_wait_time += progressive_sleep_time

            else:
                logging.error(
                    f'function run_splunk_search, permanent search failure, search failed with exception="{str(e)}", search_query="{search_query}", search_params="{search_params}"'
                )
                raise

    raise Exception(
        f'function run_splunk_search, permanent search failure after reaching max retries, last_exception="{last_exception}", attempt="{current_retries}", max_retries="{max_retries}", total_wait_time="{total_wait_time:.1f}s", search_query="{search_query}", search_params="{search_params}"'
    )
def get_kv_collection(collection, collection_name):
    """
    Fetch every record from a KVstore collection, paginating with skip offsets.

    :param collection: The KVstore collection object.
    :param collection_name: The name of the collection to query.
    :return: A tuple (list of records, set of record keys, dict of records keyed by _key).
    """
    t_start = time.time()
    records = []
    seen_keys = set()
    records_by_key = {}

    try:
        offset = 0
        while True:
            batch = collection.data.query(skip=offset)
            if not batch:
                # empty page: the collection has been fully consumed
                break
            for record in batch:
                # de-duplicate across pages by the KVstore _key
                if record.get("_key") in seen_keys:
                    continue
                records.append(record)
                seen_keys.add(record["_key"])
                records_by_key[record["_key"]] = record
            offset += 1000

        logging.info(
            f'context="perf", KVstore select terminated, no_records="{len(records)}", run_time="{round((time.time() - t_start), 3)}", collection="{collection_name}"'
        )

        return records, seen_keys, records_by_key

    except Exception as e:
        logging.error(
            f"failed to call get_kv_collection, args={collection_name}, exception={str(e)}"
        )
        raise Exception(str(e))
# Get emails delivery account credentials, designed to be used for a least privileges approach in a programmatic approach
def trackme_get_emails_account(reqinfo, account):
    """
    Retrieve the emails delivery account configuration and its password.

    :param reqinfo: request info object providing server_rest_port and system_authtoken
    :param account: name of the emails account stanza in trackme_emails.conf
    :return: dict with the account configuration; email_password is resolved
        from the Splunk credential store when available, otherwise None
    :raises Exception: when no account is configured at all, or when the
        requested account stanza does not exist
    """
    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=reqinfo.server_rest_port,
        token=reqinfo.system_authtoken,
        timeout=600,
    )

    # Splunk credentials store
    storage_passwords = service.storage_passwords

    conf_file = "trackme_emails"

    # if there are no account, raise an exception, otherwise what we would do here?
    try:
        confs = service.confs[str(conf_file)]
    except Exception:
        raise Exception("We have no emails delivery account configured yet")

    # email account configuration
    isfound = False
    email_server = None
    email_username = None
    email_password = None
    email_security = None
    allowed_email_domains = None
    sender_email = None
    email_format = None
    email_footer = None

    # get account
    # (the previous version also built an "accounts" list with a broken
    # per-character loop over stanza.name; it was never used and is removed)
    for stanza in confs:
        if stanza.name == str(account):
            isfound = True
            for key, value in stanza.content.items():
                if key == "email_server":
                    email_server = value
                elif key == "email_username":
                    email_username = value
                elif key == "email_security":
                    email_security = value
                elif key == "allowed_email_domains":
                    allowed_email_domains = value
                elif key == "sender_email":
                    sender_email = value
                elif key == "email_format":
                    email_format = value
                elif key == "email_footer":
                    email_footer = value

    # end of get configuration

    # Stop here if we cannot find the submitted account
    if not isfound:
        error_msg = f'The account="{account}" has not been configured on this instance, cannot proceed!'
        raise Exception(
            {
                "status": "failure",
                "message": error_msg,
                "account": account,
            }
        )

    # get the email password, if any (a localhost:25 relay requires no authentication)
    if email_username and email_server != "localhost:25":
        try:
            credential_realm = (
                "__REST_CREDENTIAL__#trackme#configs/conf-trackme_emails"
            )
            credential_name = f"{credential_realm}:{account}``"
            for credential in storage_passwords:
                if (
                    credential.content.get("realm") == str(credential_realm)
                    and credential.name.startswith(credential_name)
                    and "email_password" in credential.content.clear_password
                ):
                    email_password = json.loads(credential.content.clear_password).get(
                        "email_password"
                    )
        except Exception:
            # best effort: leave the password unset when the credential cannot be read
            email_password = None

    # render
    return {
        "account": account,
        "email_server": email_server,
        "email_username": email_username,
        "email_password": email_password,
        "email_security": email_security,
        "allowed_email_domains": allowed_email_domains,
        "sender_email": sender_email,
        "email_format": email_format,
        "email_footer": email_footer,
    }
def trackme_check_report_exists(session_key, splunkd_uri, tenant_id, report_name):
    """
    Check if a report exists in Splunk.

    :param session_key: Splunk session key
    :param splunkd_uri: Splunkd URI
    :param tenant_id: Tenant ID
    :param report_name: Name of the report to check
    :return: True if report exists, False otherwise
    """
    # only the port is needed from the splunkd URI
    splunkd_port = urllib.parse.urlparse(splunkd_uri).port

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=splunkd_port,
        token=session_key,
        timeout=600,
    )

    # a missing saved search raises on lookup, which maps to False
    try:
        service.saved_searches[report_name]
    except Exception as e:
        logging.info(
            f'tenant_id="{tenant_id}", report does not exist, report_name="{report_name}", exception="{str(e)}"'
        )
        return False

    logging.info(
        f'tenant_id="{tenant_id}", report exists, report_name="{report_name}"'
    )
    return True
def trackme_check_kvcollection_exists(
    session_key, splunkd_uri, tenant_id, collection_name
):
    """
    Check if a KVstore collection exists in Splunk.

    :param session_key: Splunk session key
    :param splunkd_uri: Splunkd URI
    :param tenant_id: Tenant ID
    :param collection_name: Name of the collection to check
    :return: True if collection exists, False otherwise
    """
    # only the port is needed from the splunkd URI
    splunkd_port = urllib.parse.urlparse(splunkd_uri).port

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=splunkd_port,
        token=session_key,
        timeout=600,
    )

    # a failing lookup or query maps to False
    try:
        kv_collection = service.kvstore[collection_name]
        # query once to verify the collection is actually reachable
        kv_collection.data.query()
    except Exception as e:
        logging.info(
            f'tenant_id="{tenant_id}", collection does not exist, collection_name="{collection_name}", exception="{str(e)}"'
        )
        return False

    logging.info(
        f'tenant_id="{tenant_id}", collection exists, collection_name="{collection_name}"'
    )
    return True


def trackme_check_kvtransform_exists(
    session_key, splunkd_uri, tenant_id, transform_name
):
    """
    Check if a KVstore transform exists in Splunk.

    :param session_key: Splunk session key
    :param splunkd_uri: Splunkd URI
    :param tenant_id: Tenant ID
    :param transform_name: Name of the transform to check
    :return: True if transform exists, False otherwise
    """
    # only the port is needed from the splunkd URI
    splunkd_port = urllib.parse.urlparse(splunkd_uri).port

    # get service
    service = client.connect(
        owner="nobody",
        app="trackme",
        port=splunkd_port,
        token=session_key,
        timeout=600,
    )

    # a missing transforms.conf stanza raises on lookup, which maps to False
    try:
        service.confs["transforms"][transform_name]
    except Exception as e:
        logging.info(
            f'tenant_id="{tenant_id}", transform does not exist, transform_name="{transform_name}", exception="{str(e)}"'
        )
        return False

    logging.info(
        f'tenant_id="{tenant_id}", transform exists, transform_name="{transform_name}"'
    )
    return True


class TrackMeRemoteConnectionError(Exception):
    """Exception carrying the structured error payload of a failed remote TrackMe connection."""

    def __init__(self, error_info):
        super().__init__(str(error_info))
        # keep the original payload available to callers
        self.error_info = error_info