import json import os import sys import time from collections import defaultdict, deque, OrderedDict import exec_anaconda exec_anaconda.exec_anaconda() sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib")) # Add the directory where this script resides to the Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from util.data_prepare import (ITSI_TIMESTAMP_FORMAT, COL_VALUE, COL_DATE, COL_HOUR, COL_DAY_OF_WEEK, COL_ID) from util.csc_input import parse_timestamp from util.csc_output import ( confidence_description, output_thresholds_dict, THR_DIR_BOTH, THR_DIR_LO, THR_DIR_UP, THR_DIR_AUTO ) from util.timepolicy import (generate_cron_output, NO_PATTERN, INSUFFICIENT_DATA, PATTERN_SWITCH, SUCCESSFUL) from splunklib.searchcommands import dispatch, Configuration, Option, validators, StreamingCommand from util import setup_logging from util.telemetry_logger import log_telemetry import numpy as np import pandas as pd from constants import ( KV_AT_TIME_POLICIES_COLLECTION, ITSI_KPI_ID, RECOMMENDATION_FLAG, ALGORITHM, CRON_EXPRESSION, DURATION, THRESHOLDS, MEAN, STD, SCORE, CONFIDENCE, TIME_POLICY_DESCRIPTION, ALL_DATA_RECEIVED, THRESHOLD_DIRECTION, ANALYSIS_WINDOW, USE_STATIC, SENSITIVITY, CONSTANT_KPI, ENTITY_KEY, ENTITY_TITLE, ALERT_VALUE, ITSI_ENTITIES_AT_RESULTS_POST_URI, ) from kpis_utils import is_valid_value, get_valid_entity_identifier # Set up logger logger = setup_logging.get_logger() # Define a minimum number of events needed for processing. MIN_EVENTS_FOR_PROCESSING = 100 # Map time policy descriptions to log messages LOG_MESSAGES = { NO_PATTERN: 'We were unable to find a time policy that fits your data.', INSUFFICIENT_DATA: f'There is not enough data to make a recommendation on your data. We require at least 1 day worth of data and at least {MIN_EVENTS_FOR_PROCESSING} events.', PATTERN_SWITCH: 'We could not detect a consistent pattern in your data. It seems that there is more than one pattern.' } class KPIResponseBuilder: """ KPIResponseBuilder provides static methods for preprocessing, structuring, and enhancing KPI data. """ @staticmethod def preprocess(df): """ Preprocesses the DataFrame by setting appropriate column data types and making the date column the DataFrame index. Parameters: - df (DataFrame): The DataFrame to be preprocessed. Returns: - DataFrame: The preprocessed DataFrame. """ df[COL_DAY_OF_WEEK] = df[COL_DATE].dt.dayofweek.astype(int) df[COL_HOUR] = df[COL_DATE].dt.hour.astype(int) df[COL_VALUE] = df[COL_VALUE].astype(float) df.set_index(COL_DATE, inplace=True) return df @staticmethod def structure_kpi_output( itsi_kpi_id, recommendation_flag, cron_lists=None, score=None, threshold_rounding=None, threshold_direction=None, time_policy_description=None, analysis_window=None, use_static=False, mean=None, stdev=None, ): """ Structures the KPI data based on various parameters. Returns: - list: Structured KPI data. """ use_static = "True" if use_static else "False" if recommendation_flag == SUCCESSFUL: return [ { ITSI_KPI_ID: itsi_kpi_id, RECOMMENDATION_FLAG: recommendation_flag, ALGORITHM: 'stdev', CRON_EXPRESSION: cron.cron_expression, DURATION: cron.time_length, THRESHOLD_DIRECTION: threshold_direction, THRESHOLDS: f"{output_thresholds_dict(cron.z_value, threshold_rounding, threshold_direction)}", MEAN: round(cron.mean, threshold_rounding), STD: round(cron.std, threshold_rounding), SENSITIVITY: round(cron.sensitivity, threshold_rounding), SCORE: score, CONFIDENCE: confidence_description(score), TIME_POLICY_DESCRIPTION: time_policy_description, ANALYSIS_WINDOW: analysis_window, USE_STATIC: use_static, } for cron in cron_lists or [] ] if recommendation_flag == NO_PATTERN: return [{ ITSI_KPI_ID: itsi_kpi_id, RECOMMENDATION_FLAG: recommendation_flag, ALGORITHM: 'static', CRON_EXPRESSION: 'None', DURATION: 'None', THRESHOLD_DIRECTION: threshold_direction, THRESHOLDS: f"{output_thresholds_dict(cron_lists[0].z_value, threshold_rounding, threshold_direction, static_params=(mean, stdev))}", MEAN: round(cron_lists[0].mean, threshold_rounding), STD: round(cron_lists[0].std, threshold_rounding), SENSITIVITY: round(cron_lists[0].sensitivity, threshold_rounding), SCORE: score, CONFIDENCE: confidence_description(score), TIME_POLICY_DESCRIPTION: 'Static thresholding', ANALYSIS_WINDOW: analysis_window, USE_STATIC: use_static, }] return [{ ITSI_KPI_ID: itsi_kpi_id, RECOMMENDATION_FLAG: recommendation_flag, ALGORITHM: 'None', CRON_EXPRESSION: 'None', DURATION: 'None', SENSITIVITY: 'None', THRESHOLDS: 'None', MEAN: 'None', STD: 'None', SCORE: 'None', CONFIDENCE: 'None', TIME_POLICY_DESCRIPTION: 'None', ANALYSIS_WINDOW: analysis_window, USE_STATIC: use_static, }] @staticmethod def append_entity_details(formatted_data, entity_key, entity_title): """ Enhances formatted KPI data with entity_key and entity_title if they are non-empty. Parameters: formatted_data (list): Structured KPI data. entity_key (str): The entity key to add. entity_title (str): The entity title to add. Returns: list: Enhanced KPI data with entity details. """ if not entity_key and not entity_title: return formatted_data def add_entity_fields(entry): if entity_key: entry[ENTITY_KEY] = entity_key if entity_title: entry[ENTITY_TITLE] = entity_title return entry return [add_entity_fields(entry) for entry in formatted_data] @staticmethod def prepare_kpi_response(itsi_kpi_id, entity_key, entity_title, recommendation_flag, entity_level_processing, **kwargs): """ Prepares the KPI response by structuring the KPI data and enhancing it with entity information if entity_level_processing is enabled. Parameters: itsi_kpi_id (str): The KPI ID. entity_key (str): The entity key. entity_title (str): The entity title. recommendation_flag (str): The recommendation flag. entity_level_processing (bool): Flag to determine if entity-level processing is enabled. **kwargs: Additional keyword arguments for format_kpi_data. Returns: list: Prepared KPI response. """ formatted_data = KPIResponseBuilder.structure_kpi_output(itsi_kpi_id, recommendation_flag, **kwargs) if entity_level_processing: return KPIResponseBuilder.append_entity_details(formatted_data, entity_key, entity_title) return formatted_data class KPIThresholdRecommender: """ KPIThresholdRecommender is responsible for recommending thresholds for Key Performance Indicators (KPIs). It performs various checks like data sufficiency and constancy of KPI before proceeding to the recommendation. """ def __init__(self, df): """ Initialize the KPIThresholdRecommender with a DataFrame. Parameters: - dataframe (pd.DataFrame): The DataFrame containing the KPI data. """ self.df = df @property def kpi_id(self): """ Returns the KPI ID from the DataFrame. Returns: str: The KPI ID. """ return self.df[COL_ID].iloc[0] @property def entity_key(self): """ Returns the entity key from the DataFrame, if it exists. Returns: str or None: The entity key, or None if the column doesn't exist. """ return self.df.get(ENTITY_KEY).iloc[0] if ENTITY_KEY in self.df else None @property def entity_title(self): """ Returns the entity title from the DataFrame, if it exists. Returns: str or None: The entity title, or None if the column doesn't exist. """ return self.df.get(ENTITY_TITLE).iloc[0] if ENTITY_TITLE in self.df else None def has_insufficient_data(self, min_events=MIN_EVENTS_FOR_PROCESSING): """ Check if the DataFrame has a sufficient number of events for processing. Parameters: min_events (int, optional): The minimum number of events required for processing. Defaults to MIN_EVENTS_FOR_PROCESSING. Returns: bool: True if data is insufficient, False otherwise. """ if self.df[COL_VALUE].count() < min_events: logger.warning(f"{self.kpi_id} ({self.entity_title}) - Not enough data. Requires at least {min_events} events.") return True return False def is_constant_timeseries(self): """ Check if the KPI/entity timeseries is constant across all events. Returns: bool: True if timeseries is constant, False otherwise. """ return self.df[COL_VALUE].min() == self.df[COL_VALUE].max() def process(self, threshold_rounding, threshold_direction, analysis_window, use_static, entity_level_processing=False): """ The main function to process the DataFrame and produce KPI recommendations. Parameters: threshold_rounding (float): Rounding to apply to calculated thresholds. threshold_direction (str): Direction for threshold calculations. analysis_window (str): Represents analysis window, returned without modification use_static (bool): Represents whether we are using one-time static values or not, returned without modification entity_level_processing (bool): Flag to determine if entity-level processing is enabled. Returns: dict: Recommendations for the KPI. """ if self.has_insufficient_data(): return KPIResponseBuilder.prepare_kpi_response(self.kpi_id, self.entity_key, self.entity_title, INSUFFICIENT_DATA, entity_level_processing, analysis_window=analysis_window, use_static=use_static, ) KPIResponseBuilder.preprocess(self.df) if self.is_constant_timeseries(): logger.warning( f"KPI '{self.kpi_id}' has a constant time series; no recommendations can be returned for time-based policies or thresholds." ) return KPIResponseBuilder.prepare_kpi_response(self.kpi_id, self.entity_key, self.entity_title, CONSTANT_KPI, entity_level_processing, analysis_window=analysis_window, use_static=use_static, ) # Get the cron output, time policy description, and score. # Choose threshold direction only if input threshold_direction is auto # Determine if automatic threshold direction should be used choose_auto_direction = (threshold_direction == THR_DIR_AUTO) # Generate required output and automatically detect threshold direction if needed timepolicy_output = generate_cron_output(self.df, choose_auto_direction) # Decide on the final threshold direction final_direction = timepolicy_output.threshold_direction if choose_auto_direction else threshold_direction # Sort and prepare cron lists cron_dict = timepolicy_output.cron_dict cron_lists = [cron_dict[k] for k in sorted(cron_dict.keys())] time_policy_desc = timepolicy_output.time_policy_desc recommendation_flag = 'SUCCESSFUL' if time_policy_desc not in ['NO_PATTERN', 'INSUFFICIENT_DATA', 'PATTERN_SWITCH'] else time_policy_desc return KPIResponseBuilder.prepare_kpi_response( self.kpi_id, self.entity_key, self.entity_title, recommendation_flag, entity_level_processing, cron_lists=cron_lists, score=timepolicy_output.time_policy_score, threshold_rounding=threshold_rounding, threshold_direction=final_direction, time_policy_description=time_policy_desc, analysis_window=analysis_window, use_static=use_static, mean=self.df[COL_VALUE].mean(), stdev=self.df[COL_VALUE].std(), ) @Configuration() class RecommendThresholdTemplateCommand(StreamingCommand): """ The RecommendThresholdTemplateCommand class is responsible for real-time KPI threshold recommendations based on incoming streaming data. This class takes streaming records as input, groups them by a specified 'itsi_kpi_id_field', and then applies a set of computations to recommend appropriate KPI thresholds for each group of records. """ alert_value_field = Option(require=False, default=ALERT_VALUE) # An option to specify the field for ITSI KPI IDs. Defaults to 'itsi_kpi_id' if not provided. itsi_kpi_id_field = Option(require=False, default=ITSI_KPI_ID) entity_key_field = Option(require=False, default=ENTITY_KEY) entity_title_field = Option(require=False, default=ENTITY_TITLE) entity_level_processing = Option(require=False, default=False, validate=validators.Boolean()) send_to_api = Option(require=False, default=False, validate=validators.Boolean()) time_field = '_time' # An option for the timestamp format. Defaults to a pre-defined ITSI timestamp format if not provided. timestamp_format = Option(require=False, default=ITSI_TIMESTAMP_FORMAT) # An option to specify if the input data has a header. Defaults to False if not provided. has_header = Option(require=False, default=False) # An option for the precision to round threshold values. Defaults to 2 decimal places. threshold_rounding = Option(require=False, default=2, validate=validators.Integer()) # An option to specify the direction in which to consider thresholds (both, up, or low). # Validates the provided value against a predefined set of values (both, up, or low). threshold_direction = Option( require=False, default=THR_DIR_AUTO, validate=validators.Set(THR_DIR_AUTO, THR_DIR_BOTH, THR_DIR_UP, THR_DIR_LO) ) # An option to specify a specific analysis window, serves no other function than to make it clear what timerange was chosen by the user. analysis_window = Option(require=False, default="") # An option to force one-time static recommendations, serves no other function than to make it clear whether mean and stdev values should be recomputed nightly or not. use_static = Option(require=False, default=False, validate=validators.Boolean()) # Buffer to store records by record key buffer = defaultdict(deque) # List to maintain the order in which keys are received order_of_received_keys = [] start_time = time.time() collection_name = KV_AT_TIME_POLICIES_COLLECTION def __init__(self): super().__init__() self.df = None self.count_of_processed_ids = 0 def generate_kpi_recommendations(self, records): """ Convert a list of records to a DataFrame and apply KPI threshold recommendations processing. Parameters: records (deque[dict]): A list of dictionaries where each dictionary represents a record with keys corresponding to the fields in the DataFrame. Returns: dict: Recommendations for KPI thresholds. """ # Create DataFrame 'df' from 'records'; skip first row if 'has_header' is True. # Note: The DataFrame will have columns in the order of 'time_field', 'value_field', and 'kpi_id_field'. df = pd.DataFrame(records[1:] if self.has_header else records) # Renaming ignored if column does not exist in DataFrame column_renames = { self.time_field: COL_DATE, self.alert_value_field: COL_VALUE, self.itsi_kpi_id_field: COL_ID, self.entity_key_field: ENTITY_KEY, self.entity_title_field: ENTITY_TITLE } df.rename(columns=column_renames, inplace=True) # Replace field that's entirely space (or empty) with NaN df = df.replace(r'^\s*$', np.nan, regex=True) # Parse and update the timestamps in the DataFrame 'df' using the given 'timestamp_format'. # It converts 'COL_DATE' column in the DataFrame 'df' into a DateTime object. df = parse_timestamp(df, self.timestamp_format) return KPIThresholdRecommender(df).process( self.threshold_rounding, self.threshold_direction, self.analysis_window, self.use_static, self.entity_level_processing, ) def save_to_kvstore(self, records, all_data_received=False): job_id = self.metadata.searchinfo.sid # Get the unique Splunk job ID # Get or create KV Store collection collection = self.get_or_create_collection() # Fetch existing record if available existing_record = self.fetch_existing_record(collection, job_id) if existing_record: self.update_existing_record(collection, job_id, existing_record, records, all_data_received) else: self.create_new_record(collection, job_id, records, all_data_received) def get_or_create_collection(self): kvstore = self.service.kvstore return kvstore.create(self.collection_name) if self.collection_name not in kvstore else kvstore[ self.collection_name] @staticmethod def fetch_existing_record(collection, job_id): try: return collection.data.query_by_id(job_id) except Exception as e: logger.error(f"Unable to find record with _key={job_id} in KV Store due to error: {str(e)}") return None @staticmethod def update_existing_record(collection, job_id, existing_record, records, all_data_received): existing_data = json.loads(existing_record['data']) existing_data.extend(records) updated_data = json.dumps(existing_data) collection.data.update(job_id, {'data': updated_data, ALL_DATA_RECEIVED: all_data_received}) def log_kv_store_record_by_id(self, job_id): collection = self.get_or_create_collection() record = self.fetch_existing_record(collection, job_id) logger.info(f"KVStore job_id={job_id}, record={record}") @staticmethod def create_new_record(collection, job_id, records, all_data_received): new_data = json.dumps(records) insert_result = collection.data.insert( json.dumps({'_key': job_id, 'data': new_data, ALL_DATA_RECEIVED: all_data_received})) logger.info(f"KVStore insert operation result, job_id={job_id}, insert_result={insert_result}") @staticmethod def generate_empty_records_response(use_static, analysis_window): return KPIResponseBuilder.structure_kpi_output('None', INSUFFICIENT_DATA, use_static=use_static, analysis_window=analysis_window) def save_empty_records_response_to_kvstore(self, empty_records_response): self.save_to_kvstore(records=empty_records_response, all_data_received=True) def _record_processing_metrics(self, results, id_, kpi_rec_start_time, is_end_of_kpis, entity_level_processing): first_row = results[0] log_telemetry( event_type="pattern_detection_completed", kpi_id=first_row.get(ITSI_KPI_ID), entity_level_processing=entity_level_processing, entity_key=first_row.get(ENTITY_KEY), entity_title=first_row.get(ENTITY_TITLE), recommendation_flag=first_row.get(RECOMMENDATION_FLAG), algorithm=first_row.get(ALGORITHM), cron_expression=f"'{first_row.get(CRON_EXPRESSION, '')}'", threshold_direction=first_row.get(THRESHOLD_DIRECTION), thresholds=first_row.get(THRESHOLDS), mean=first_row.get(MEAN), std=first_row.get(STD), sensitivity=first_row.get(SENSITIVITY), score=first_row.get(SCORE), confidence=first_row.get(CONFIDENCE), time_policy_desc=f"'{first_row.get(TIME_POLICY_DESCRIPTION, '')}'", processing_time=f"{time.time() - kpi_rec_start_time:.2f}s" ) log_telemetry( event_type="kpi_processed", kpi_id=id_, entity_level_processing=entity_level_processing, entity_key=first_row.get(ENTITY_KEY), entity_title=first_row.get(ENTITY_TITLE), processing_time=f"{time.time() - self.start_time:.5f}s" ) if is_end_of_kpis: log_telemetry( event_type="kpis_processing_complete", count_of_processed_ids=self.count_of_processed_ids, entity_level_processing=entity_level_processing, entity_key=first_row.get(ENTITY_KEY), entity_title=first_row.get(ENTITY_TITLE), processing_time=f"{time.time() - self.start_time:.5f}s" ) def process_empty_records(self): """ Handles the scenario when the records are empty. """ empty_records_response = self.generate_empty_records_response(use_static=self.use_static, analysis_window=self.analysis_window) if self.send_to_api: self.post_results_to_api(empty_records_response) else: self.save_empty_records_response_to_kvstore(empty_records_response) return empty_records_response def validate_record_fields(self, record): # Method to validate required fields in the record required_fields = [self.time_field, self.alert_value_field, self.itsi_kpi_id_field] for field in required_fields: if field not in record: raise ValueError(f'The field {field} is not a field in the dataset. \ Ensure the field is passed correctly to the {field} argument of recommendthresholdtemplate.') # Additional validation for entity level processing if self.entity_level_processing: entity_key_valid = is_valid_value(record.get(self.entity_key_field)) entity_title_valid = is_valid_value(record.get(self.entity_title_field)) if not entity_key_valid and not entity_title_valid: # Check if both are invalid raise ValueError( f"Both {self.entity_key_field} and {self.entity_title_field} are missing or invalid in the record. " "At least one is required for entity level processing.") def determine_record_key(self, record): # Method to determine the key for the record if self.entity_level_processing: key_to_use = get_valid_entity_identifier(record.get(self.entity_key_field), record.get(self.entity_title_field)) if not key_to_use: raise ValueError( f"Both {self.entity_key_field} and {self.entity_title_field} are invalid or missing for entity level processing.") return key_to_use.strip() # For non-entity level processing, use the itsi_kpi_id_field return record[self.itsi_kpi_id_field].strip() def create_buffer_entry(self, record): # Method to create a buffer entry from the record buffer_entry = OrderedDict([ (self.time_field, record[self.time_field]), (self.alert_value_field, record[self.alert_value_field]), (self.itsi_kpi_id_field, record[self.itsi_kpi_id_field]) ]) if self.entity_level_processing: if self.entity_key_field in record: buffer_entry[self.entity_key_field] = record[self.entity_key_field] if self.entity_title_field in record: buffer_entry[self.entity_title_field] = record[self.entity_title_field] return buffer_entry def determine_keys_to_process(self): # Check whether to process all ids or just the previous ones based on the _finished flag # If not finished, skip the last one to_process_ids = [] for key_current in self.order_of_received_keys: if not self._finished and key_current == self.order_of_received_keys[-1]: break to_process_ids.append(key_current) return to_process_ids def cleanup_processed_keys(self, processed_ids): # Method to clean up processed keys from buffer and list for key in processed_ids: del self.buffer[key] self.order_of_received_keys.remove(key) def post_results_to_api(self, data, id_=None): try: json_body = {"data": json.dumps(data)} response = self.service.post(ITSI_ENTITIES_AT_RESULTS_POST_URI, owner="nobody", app="SA-ITOA", body=json_body) if response.status in [201, 200]: logger.info(f"Data successfully posted for ID: {id_}.") else: logger.error(f"Failed to post data for ID: {id_}. Status: {response.status}, Reason: {response.reason}") except Exception as e: logger.exception(f"An error occurred while posting data for ID: {id_}. Exception: {e}") raise def buffer_record(self, key, record): buffer_entry = self.create_buffer_entry(record) self.buffer[key].append(buffer_entry) def process_and_yield_empty_records(self): """ Handles the scenario when no records have been processed. """ log_telemetry( event_type="no_data_error", error="Received empty records - no data available to process." ) return self.process_empty_records() def handle_record(self, record): self.validate_record_fields(record) key = self.determine_record_key(record) if key not in self.order_of_received_keys: self.order_of_received_keys.append(key) self.buffer_record(key, record) def stream(self, records): """ Group, process, and yield processed records based on the entity_key, entity_title, or itsi_kpi_id field, depending on the configuration. The function performs the following tasks: 1. Iterates over the incoming records: a. If entity_level_processing is enabled, it first tries to use `entity_key` as the primary key for each record. If `entity_key` is absent or empty, `entity_title` is used as a fallback. Otherwise, `itsi_kpi_id` is used as the key. b. Tracks the order in which unique keys (entity_key/entity_title/itsi_kpi_id) are encountered for the first time. c. Buffers each record by its key (entity_key/entity_title/itsi_kpi_id) into an internal data structure. 2. Determines the keys that should be processed based on the `_finished` flag: - If `_finished` flag is unset, the most recent key in the buffer is skipped. 3. Processes records serially for each determined key using the `process_records` function: - Yields the processed records individually if they are returned as a list. - Yields the entire result directly otherwise. 4. Once records of a particular key are processed and yielded, they are removed from the buffer and the order list to free up memory. Args: - records (iterable): Stream of records to be processed. Yields: - dict: Processed records or results. """ # Use a flag to check if records were processed records_processed = False for record in records: records_processed = True self.handle_record(record) if records_processed: to_process_ids = self.determine_keys_to_process() for index, id_ in enumerate(to_process_ids): kpi_rec_start_time = time.time() results = self.generate_kpi_recommendations(self.buffer[id_]) is_last_element = (index == len(to_process_ids) - 1) self.count_of_processed_ids += 1 if self.send_to_api: self.post_results_to_api(results, id_) else: self.save_to_kvstore(results, self._finished & is_last_element) yield from results self._record_processing_metrics(results, id_, kpi_rec_start_time, self._finished & is_last_element, self.entity_level_processing) self.cleanup_processed_keys(to_process_ids) else: yield from self.process_and_yield_empty_records() dispatch(RecommendThresholdTemplateCommand, sys.argv, sys.stdin, sys.stdout, __name__)