# NOTE(review): removed non-code residue captured from a repository web UI
# (topic-selection banner and "330 lines / 13 KiB" file-size counters).
import json
import os
import re
import sys
from collections import defaultdict
import traceback
# Add the "lib" directory to the Python path so we can import our modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib"))
# Add the directory where this script resides to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from splunk.persistconn.application import PersistentServerConnectionApplication
import splunklib
import splunklib.client as client
from constants import KV_AT_TIME_POLICIES_COLLECTION, MISSING_JOB_ID, METHOD_NOT_ALLOWED, ITSI_SERVICE_ID, ITSI_KPI_ID,\
FIELD_TO_SNAKE_CASE_DICT, JOB_ID_NOT_FOUND, ENTITY_KEY, ENTITY_TITLE, ALL_DATA_RECEIVED, ENTITY_AT_CONFIGURATIONS, \
NA, USE_STATIC, ITSI_SERVICE_ID
from kpis_utils import get_valid_entity_identifier
from util import setup_logging
PATH_INFO = "path_info"
# Set up logger
logger = setup_logging.get_logger()
class ATConfigurationsAPIHandler(PersistentServerConnectionApplication):
    """
    The ATConfigurationsAPIHandler class serves as a RESTful API endpoint for handling
    machine learning-assisted KPI thresholding configurations within Splunk IT Service Intelligence (ITSI) environment.
    The API supports the following HTTP methods:
    - GET: Retrieves assisted thresholding (AT) configurations associated with a given job ID
    in the Splunk ITSI environment. The job ID should be provided as part of the REST URI.
    - DELETE: Deletes the AT configurations associated with a given job ID from the Splunk KV Store.
    """
    def __init__(self, _command_line, _command_arg):
        # Bug fix: the original called super(PersistentServerConnectionApplication, self),
        # which starts the MRO lookup *after* PersistentServerConnectionApplication and
        # therefore skips its __init__. super() correctly initializes the base class.
        super().__init__()
        # Lazily-created splunklib client.Service connection; populated on first
        # request by initialize_service_if_needed().
        self.service = None
        # Set to True when the request targets the entity-level endpoint; switches
        # transform_data() to the per-entity output format.
        self.entity_level_processing = False
def handle(self, in_string):
    """
    Entry point for incoming REST requests.

    Parses the raw JSON request, routes GET/DELETE requests for the job ID
    embedded in the request path, and converts any failure into a structured
    error response instead of letting it propagate.
    """
    try:
        request = json.loads(in_string)
        http_method = request.get("method", "")
        # Entity-level endpoints change how results are grouped downstream.
        if request.get('rest_path', "").startswith("/api/v1/kpis_at_configurations/entities"):
            self.entity_level_processing = True
        job_id = self.extract_job_id(request)
        if not job_id:
            return self.create_response(400, error=MISSING_JOB_ID)
        logger.info(f"Processing request for job_id={job_id}")
        self.initialize_service_if_needed(request)
        if http_method == "GET":
            return self.create_response(200, result=self.handle_get(job_id))
        if http_method == "DELETE":
            self.handle_delete(job_id)
            return self.create_response(200, result=f"Job_id {job_id} deleted")
        return self.create_response(405, error=METHOD_NOT_ALLOWED)
    except splunklib.binding.HTTPError as e:
        return self.handle_http_error(e)
    except Exception as e:
        logger.exception(e)
        # Include the full traceback so server-side failures are debuggable
        # directly from the HTTP response.
        full_traceback = traceback.format_exc()
        return self.create_response(500, error=f"Server error: {str(e)}, full trace back {full_traceback}")
def handle_get(self, job_id):
    """Return the transformed AT configurations stored for *job_id*."""
    return self.process_request_for_job_id(job_id)
def handle_delete(self, job_id):
    """
    Remove the AT configurations record keyed by *job_id* from the KV Store.

    Raises:
        splunklib.binding.HTTPError: propagated after logging when the delete
            request fails (e.g. the key does not exist).
    """
    try:
        kv_collection = self.service.kvstore[KV_AT_TIME_POLICIES_COLLECTION]
        return kv_collection.data.delete_by_id(str(job_id))
    except splunklib.binding.HTTPError as e:
        logger.exception(f"Failed to delete _key={job_id} from KV Store, error: {e}")
        raise e
def initialize_service_if_needed(self, request):
    """Create the Splunk service connection on first use (idempotent)."""
    if self.service is not None:
        return
    self.initialize_service(request)
def initialize_service(self, request):
    """
    Initialize the Splunk service connection from the request's session token.

    On failure the error is logged and ``self.service`` is left as ``None``;
    callers then hit the generic error path later rather than failing here.
    """
    try:
        service_kwargs = {
            "token": request["session"]["authtoken"],
            "owner": "nobody",
        }
        management_port = self.extract_management_port(request)
        # Only forward the port when one could be parsed out of rest_uri;
        # otherwise let client.Service fall back to its default.
        if management_port is not None:
            service_kwargs["port"] = management_port
        self.service = client.Service(**service_kwargs)
    except Exception as e:
        logger.error(f"Failed to initialize Splunk service connection, error={str(e)}")
        self.service = None
@staticmethod
def extract_management_port(request):
    """
    Extract the management port from ``request['server']['rest_uri']``.

    Args:
        request (dict): The request dictionary containing the server information.

    Returns:
        int or None: The extracted port number, or None when the URI contains
        no ``:port`` component or it cannot be parsed as an integer.
    """
    rest_uri = request.get("server", {}).get("rest_uri", "")
    try:
        port_match = re.search(r':(\d+)', rest_uri)
        return int(port_match.group(1)) if port_match else None
    except ValueError as e:
        # Defensive guard kept from the original: \d+ should always parse,
        # but never let a malformed URI crash request handling.
        logger.error(f"Invalid port number in request, error={str(e)}")
        return None
@staticmethod
def extract_job_id(request):
    """Return the job ID (the last path segment of path_info), or None if absent."""
    path_info = request.get(PATH_INFO)
    if not path_info:
        return None
    # The job ID is everything after the final '/' (or the whole string
    # when no '/' is present).
    return path_info.rpartition('/')[2].strip()
def process_request_for_job_id(self, job_id):
    """Fetch the stored configurations for *job_id* and return them transformed."""
    raw_configurations = self.get_kpis_at_configurations_from_kv_store(job_id)
    result = self.transform_data(raw_configurations)
    # Echo the job ID back so the client can correlate the response.
    result["job_id"] = job_id
    logger.info(f"Processed request for job_id={job_id}")
    return result
def handle_http_error(self, error):
    """Map a 404 from the KV Store to a JOB_ID_NOT_FOUND response; re-raise anything else."""
    if error.status != 404:
        raise error
    return self.create_response(error.status, error=JOB_ID_NOT_FOUND)
def get_kpis_at_configurations_from_kv_store(self, job_id):
    """
    Fetch the KPIs AT configurations record for *job_id* from the KV Store.

    Raises:
        splunklib.binding.HTTPError: propagated after logging when the query
            fails (e.g. the job ID does not exist in the collection).
    """
    try:
        kv_collection = self.service.kvstore[KV_AT_TIME_POLICIES_COLLECTION]
        return kv_collection.data.query_by_id(str(job_id))
    except splunklib.binding.HTTPError as e:
        logger.exception(f"Failed to query KV Store for job_id={job_id}, error: {e}")
        raise e
@staticmethod
def create_response(status, result=None, error=None):
    """
    Build the response dict sent back to the client.

    Args:
        status (int): HTTP status code.
        result: Payload on success (None on error).
        error: Error message on failure (None on success).
    """
    payload = {"result": result, "error": error}
    return {"payload": payload, "status": status}
@staticmethod
def transform_record(record):
    """
    Return a copy of *record* with its keys mapped to snake_case.

    Identifier fields (KPI/service IDs, entity key/title) are dropped — they
    are carried at a higher level of the output structure — and the
    USE_STATIC field is coerced from its stored string form back to a bool.
    """
    transformed = {}
    for field, value in list(record.items()):
        # We also exclude ITSI_SERVICE_ID here: present in results but not
        # saved into the KV Store, for consistency.
        if field in (ITSI_KPI_ID, ITSI_SERVICE_ID, ENTITY_KEY, ENTITY_TITLE):
            continue
        snake_key = FIELD_TO_SNAKE_CASE_DICT[field]
        # KV Store persists booleans as strings; restore the native type.
        transformed[snake_key] = (value == "True") if field == USE_STATIC else value
    return transformed
def transform_data(self, input_data):
    """Dispatch to the entity-level or KPI-level transformation."""
    transform = (self.transform_entities_data if self.entity_level_processing
                 else self.transform_kpis_data)
    return transform(input_data)
def transform_kpis_data(self, input_data):
    """
    Group the stored records by KPI ID.

    Returns a dict with a 'data' list of {kpi_id, kpi_at_configurations}
    entries plus the pass-through ALL_DATA_RECEIVED flag.
    """
    grouped = defaultdict(list)
    for record in json.loads(input_data['data']):
        grouped[record[ITSI_KPI_ID]].append(self.transform_record(record))
    return {
        'data': [
            {ITSI_KPI_ID: kpi_id, 'kpi_at_configurations': configurations}
            for kpi_id, configurations in grouped.items()
        ],
        ALL_DATA_RECEIVED: input_data[ALL_DATA_RECEIVED],
    }
def transform_entities_data(self, input_data):
    """
    Group the stored records by KPI ID and, within each KPI, by entity.

    Example input format:
        {
            "data": json.dumps([
                {
                    "kpi_id": "5c840c54-ba84-4edb-9e3c-5e624dad2c78",
                    "entity_key": "130e1f7f-ad8b-4718-9d9b-acef32af244f",
                    "entity_title": "entity1",
                    "recommendation_flag": "SUCCESSFUL",
                    ...  # other configuration attributes
                },
                ...  # more records
            ]),
            "all_data_received": True
        }

    Example output format:
        {
            "data": [
                {
                    "kpi_id": "5c840c54-ba84-4edb-9e3c-5e624dad2c78",
                    "entities": [
                        {
                            "entity_key": "130e1f7f-ad8b-4718-9d9b-acef32af244f",
                            "entity_title": "entity1",
                            "entity_at_configurations": [
                                {"recommendation_flag": "SUCCESSFUL", ...},
                                ...  # more configurations
                            ]
                        },
                        ...  # more entities
                    ]
                },
                ...  # more KPI groups
            ],
            "all_data_received": True
        }

    Raises:
        ValueError: when a record yields no valid entity identifier.
    """
    # Nested structure: kpi_id -> entity identifier -> {
    #     ENTITY_AT_CONFIGURATIONS: [ {...configuration...}, ... ],
    #     ENTITY_KEY: ...,    # filled in below
    #     ENTITY_TITLE: ...,  # filled in below
    # }
    # The nested defaultdicts let new KPIs/entities be added without
    # existence checks.
    grouped = defaultdict(lambda: defaultdict(lambda: {ENTITY_AT_CONFIGURATIONS: []}))
    for record in json.loads(input_data['data']):
        kpi_id = record[ITSI_KPI_ID]
        transformed_record = self.transform_record(record)
        entity_key = record.get(ENTITY_KEY, NA)
        entity_title = record.get(ENTITY_TITLE, NA)
        # Group by whichever of key/title is a valid identifier.
        identifier = get_valid_entity_identifier(entity_key, entity_title)
        if not identifier:
            raise ValueError("Valid entity identifier not found for the given key or title.")
        entity_bucket = grouped[kpi_id][identifier]
        entity_bucket[ENTITY_AT_CONFIGURATIONS].append(transformed_record)
        # Keep both identifiers available for the output, regardless of
        # which one was used for grouping.
        entity_bucket[ENTITY_KEY] = entity_key
        entity_bucket[ENTITY_TITLE] = entity_title
    return {
        'data': [
            {
                ITSI_KPI_ID: kpi_id,
                'entities': [
                    {
                        ENTITY_KEY: details[ENTITY_KEY],
                        ENTITY_TITLE: details[ENTITY_TITLE],
                        ENTITY_AT_CONFIGURATIONS: details[ENTITY_AT_CONFIGURATIONS],
                    }
                    for details in entities.values()
                ],
            }
            for kpi_id, entities in grouped.items()
        ],
        ALL_DATA_RECEIVED: input_data[ALL_DATA_RECEIVED],
    }