You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
309 lines
11 KiB
309 lines
11 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
import time
|
|
import asyncio
|
|
|
|
try:
|
|
import http.client as httplib
|
|
except ImportError:
|
|
import httplib
|
|
|
|
import sys
|
|
|
|
from splunk.clilib.bundle_paths import make_splunkhome_path
|
|
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
|
|
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-UserAccess', 'lib']))
|
|
|
|
# Process .pth files
|
|
import site
|
|
site.addsitedir(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
|
|
|
|
from SA_ITOA_app_common.solnlib.conf_manager import ConfManager
|
|
from splunk import RESTException
|
|
from ITOA.event_management.event_onboarding_utils import EventOnboardingUtils
|
|
from ITOA.setup_logging import getLogger
|
|
from ITOA.rest_interface_provider_base import SplunkdRestInterfaceBase
|
|
from ITOA.itoa_exceptions import ItoaValidationError
|
|
from splunk.persistconn.application import PersistentServerConnectionApplication
|
|
from itsi.itoa_rest_interface_provider.itoa_rest_interface_provider import ItoaInterfaceProvider
|
|
from ITOA.controller_utils import ITOAError, itoa_response_headers
|
|
from ITOA.storage.statestore import StateStoreError
|
|
from user_access_errors import UserAccessError
|
|
from ITOA.itoa_exceptions import ItoaError
|
|
from itsi.itsi_utils import ITOAInterfaceUtils
|
|
|
|
try:
|
|
from SA_ITOA_app_common.splunklib import client
|
|
from SA_ITOA_app_common.splunklib import results
|
|
from SA_ITOA_app_common.solnlib.splunk_rest_client import _request_handler
|
|
except ImportError as err:
|
|
print('*** ERROR ***')
|
|
print(err)
|
|
sys.exit(1)
|
|
|
|
import itsi_path
|
|
import itsi_py3
|
|
|
|
from itsi.rest_handler import rest_interface_splunkd
|
|
from itsi.rest_handler.rest_interface_splunkd import route
|
|
|
|
logger = getLogger()
|
|
|
|
|
|
class EventManagementTelemetryInterfaceProviderSplunkd(ItoaInterfaceProvider):
    """
    Provider that builds the Event Management telemetry payloads.

    Two payloads are produced, each returned as a JSON string of the form
    ``{"data": {...}}``:

    - ``event_onboarding_telemetry``: per data-source counts of active and
      inactive event onboarding connections, plus their titles.
    - ``nats_telemetry``: NATS usage statistics collected by running the
      searches configured in the ``telemetry`` stanza of the
      ``itsi_event_management`` conf file.
    """

    def __init__(self, session_key, current_user, rest_method):
        """
        Store the request context and open a splunkd service connection.

        @type: string
        @param session_key: the splunkd session key for the request

        @type: string
        @param current_user: current user invoking the request

        @type: string
        @param rest_method: type of REST method of this request, GET/PUT/POST/DELETE
        """
        # _setup is not defined in this class; it is expected to populate
        # self._session_key / self._current_user, which the methods below read.
        self._setup(session_key, current_user, rest_method)
        self.service = client.connect(token=self._session_key, handler=_request_handler({}))

    def event_onboarding_telemetry(self):
        """
        Gets the usage summary of data integrations in a user's environment

        @rtype: string
        @return: JSON string ``{"data": summary}`` where ``summary`` maps each
            data source to ``{"active": int, "inactive": int, "titles": list}``.
            Every known connection type is always present, even with no
            connections.
        """
        event_onboarding_utils = EventOnboardingUtils(self._session_key, self._current_user)
        connections = event_onboarding_utils.get_all_connection_objects()

        connection_types = ['generic', 'nagios', 'solarwinds', 'o11y', 'scom', 'appdynamics', 'thousandeyes', 'cloudtrail']
        summary = {
            connection_type: {'active': 0, 'inactive': 0, 'titles': []}
            for connection_type in connection_types
        }

        for connection in connections:
            data_source = connection['data_source']
            title = connection['title']
            status = connection['status']

            # A connection whose data source is not in the known list gets its
            # own bucket instead of raising KeyError and failing the whole
            # telemetry request.
            bucket = summary.setdefault(data_source, {'active': 0, 'inactive': 0, 'titles': []})

            # Any status other than 'active' is counted as inactive.
            if status == 'active':
                bucket['active'] += 1
            else:
                bucket['inactive'] += 1
            # NOTE(review): the source indentation was lost; titles are recorded
            # for every connection here - confirm this was not meant to apply to
            # inactive connections only.
            bucket['titles'].append(title)

        return json.dumps({'data': summary})

    @staticmethod
    async def wait_for_job(search_job, maxtime=10):
        """
        Wait up to maxtime seconds for search_job to finish. If maxtime is
        negative, waits forever. Returns true, if job finished.

        The elapsed time is the sum of the pauses between polls, not wall-clock
        time, so time spent inside ``search_job.is_done()`` is not counted.

        @param search_job: search job exposing ``is_done()``
        @param maxtime: polling budget in seconds; negative means no limit
        @return: the value of ``search_job.is_done()`` when polling stopped
        """
        pause = 0.2
        lapsed = 0.0
        while not search_job.is_done():
            # Suspend this coroutine instead of blocking the thread with
            # time.sleep, so other jobs awaited through asyncio.gather can be
            # polled while this one is pausing.
            await asyncio.sleep(pause)
            lapsed += pause
            if maxtime >= 0 and lapsed > maxtime:
                break
        return search_job.is_done()

    @staticmethod
    async def get_search_output(job, job_results):
        """
        Waits for the search job to complete and collects its results

        @param job: splunk search job
        @param job_results: list that result dicts are appended to (mutated in place)
        @return: None
        """
        finished = await EventManagementTelemetryInterfaceProviderSplunkd.wait_for_job(job)
        if finished:
            rr = results.JSONResultsReader(job.results(output_mode='json'))
            # Normal events are returned as dicts; anything else the reader
            # yields is ignored.
            job_results.extend(result for result in rr if isinstance(result, dict))
        else:
            # The job did not finish within the polling budget, so there are no
            # final results to read; the caller's default values stay in place.
            logger.warning('Telemetry search job did not finish in time; skipping its results')
        # Placeholder entry kept from the original implementation; merging an
        # empty dict is a no-op for the caller.
        # NOTE(review): original indentation was lost - confirm this append
        # belongs after the loop.
        job_results.append({})

    async def nats_telemetry(self):
        """
        Get NATS usage statistics

        Runs every search configured in the ``telemetry`` stanza of the
        ``itsi_event_management`` conf, waits for the jobs concurrently and
        merges each result row into a dict of defaults.

        @rtype: string
        @return: JSON string ``{"data": stats}``; statistics for which no
            search result was returned keep the default value ``"0"``.
        """
        cfm = ConfManager(self._session_key, 'SA-ITOA')
        conf = cfm.get_conf('itsi_event_management')
        telemetry = conf.get('telemetry')

        # Order matters only for the merge below: later rows override earlier
        # ones if two searches ever return the same field.
        query_keys = [
            'latency_query',
            'queue_enabled_query',
            'cpu_mem_query',
            'backfill_rate_query',
            'events_processed_rate_query',
            'messages_pushed_to_nats_rate_query',
            'rules_engine_start_stop_query',
        ]
        query_list = [telemetry[key] for key in query_keys]
        job_list = [ITOAInterfaceUtils.run_search(self._session_key, logger, query) for query in query_list]

        # Each job collects into its own list so the merge order follows the
        # query order rather than the order in which the jobs happen to finish.
        per_job_results = [[] for _ in job_list]
        jobs = [
            asyncio.create_task(self.get_search_output(job, collected))
            for job, collected in zip(job_list, per_job_results)
        ]
        await asyncio.gather(*jobs)

        result_json = {
            "eventProcessingLatency": "0",
            "queueEnabled": "0",
            "cpuAverage": "0",
            "memAverage": "0",
            "eventsBackfilledPerMinute": "0",
            "eventsProcessedPerMinute": "0",
            "eventsIngestedPerMinute": "0",
            "rulesEngineStarted": "0",
            "rulesEngineStopped": "0"
        }
        for collected in per_job_results:
            for r in collected:
                result_json.update(r)

        return json.dumps({'data' : result_json})
|
|
|
|
|
|
class EventManagementTelemetryInterfaceSplunkd(PersistentServerConnectionApplication, SplunkdRestInterfaceBase):
    """
    Splunkd persistent REST handler for the Event Management telemetry
    endpoints.

    It parses the request, routes it to
    EventManagementTelemetryInterfaceProviderSplunkd and converts the result
    or any raised error into a REST response dictionary.
    """

    def __init__(self, command_line, command_arg):
        '''
        Basic constructor

        @type: string
        @param command_line: command line invoked for handler

        @type: string
        @param command_arg: args for invoked command line for handler
        '''
        super(EventManagementTelemetryInterfaceSplunkd, self).__init__()

    def migration_check(self, session_key):
        '''
        Override migration_check in SplunkdRestInterfaceBase as a no-op, so
        this interface serves requests regardless of whether a migration is
        running.

        @type: string
        @param session_key: the splunkd session key for the request (unused)
        '''
        pass

    def handle(self, args):
        """
        Blanket handler for all REST calls on the interface routing the GET/POST/PUT/DELETE requests.
        Derived implementation from PersistentServerConnectionApplication.

        @type args: json
        @param args: a JSON string representing a dictionary of arguments to the REST call.

        @rtype: json
        @return: a valid REST response
        """
        return self._default_handle(args)

    def _default_handle(self, args):
        """
        Blanket handler for all REST calls on the interface routing the GET/POST/PUT/DELETE requests.
        Derived implementation from PersistentServerConnectionApplication.
        This is a generic implementation that specific derived implementation could use optionally

        Errors raised while parsing or dispatching are converted into an error
        payload and a status code.

        @type args: json
        @param args: a JSON string representing a dictionary of arguments to the REST call.

        @rtype: dict
        @return: a REST response with 'status', 'payload' and 'headers' keys
        """
        logger.cleanUpContext()

        response_status = 500
        response_payload = []

        try:
            args = json.loads(args)
            # Log only the routing information. The raw args carry the session
            # auth token (args['session']['authtoken']) and must not be written
            # to the log.
            if isinstance(args, dict):
                logger.info('Splunkd REST handler for EA Telemetry received %s request for path: %s',
                            args.get('method'), args.get('rest_path'))
            self.migration_check(args['session']['authtoken'])

            result = self._dispatch_to_provider(args)

            if result is None or isinstance(result, itsi_py3.string_type):
                rest_method = args['method']
                response_status = 200
                if rest_method == 'DELETE':
                    response_status = 204
                response_payload = result
            else:
                response_status = 500
                response_payload = {'message': 'Received unexpected results from dispatcher: {}'.format(result)}
        except (ITOAError, UserAccessError) as e:
            logger.exception(e)
            response_status = e.status
            response_payload = self.handle_payload_error(e, e._message)
        except RESTException as e:
            logger.exception(e)
            response_status = e.statusCode
            response_payload = self.handle_payload_error(e, str(e))
        except (StateStoreError, ItoaError) as e:
            # Logged like every other failure so these errors are not silently
            # converted into a response.
            logger.exception(e)
            response_status = e.status_code or 500
            response_payload = self.handle_payload_error(e, str(e))
        except Exception as e:
            logger.exception(e)
            response_status = 500
            response_payload = self.handle_payload_error(e, str(e))

        # The status taken from an exception may not be an integer.
        try:
            response_status = int(response_status)
        except (ValueError, TypeError):
            response_status = 500

        headers = [['Content-Type', 'text/plain']]
        response = {
            'status': response_status,
            'payload': response_payload,
            'headers': headers,
        }

        return response

    def _dispatch_to_provider(self, args):
        """
        Route a parsed REST request to the matching telemetry provider method.

        The third segment of the REST path selects the endpoint:
        'event_onboarding' or 'nats'.

        @type args: dict
        @param args: parsed REST arguments; reads args['session']['authtoken'],
            args['session']['user'], args['method'] and args['rest_path']

        @rtype: string
        @return: the JSON string produced by the provider

        @raise ItoaValidationError: if args is not a dictionary
        @raise ITOAError: with status 404 if the REST path matches no endpoint
        """
        if not isinstance(args, dict):
            message = f'Invalid REST args received by Data Integrations Template Interface - {args}'
            raise ItoaValidationError(message=message, logger=logger)

        session_key = args['session']['authtoken']
        current_user = args['session']['user']
        rest_method = args['method']

        rest_path = args['rest_path']

        path_parts = rest_path.strip().strip('/').split('/')

        # Validate the route before building the provider: its constructor
        # opens a splunkd connection, which is wasted work for an invalid path.
        endpoint = path_parts[2] if len(path_parts) == 3 else None
        if endpoint not in ('event_onboarding', 'nats'):
            raise ITOAError(status=404, message=f'Specified REST url/path is invalid - {rest_path}.')

        interface_provider = EventManagementTelemetryInterfaceProviderSplunkd(session_key, current_user,
                                                                              rest_method)

        if endpoint == 'event_onboarding':
            return interface_provider.event_onboarding_telemetry()
        # nats_telemetry is a coroutine; run it to completion on a fresh event
        # loop.
        return asyncio.run(interface_provider.nats_telemetry())
|