You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
213 lines
8.5 KiB
213 lines
8.5 KiB
import json
import os
import sys
import time

# Bootstrap the bundled Anaconda Python runtime. This import and the call
# below must happen BEFORE any scientific/third-party imports, since
# exec_anaconda re-launches this script under the Anaconda interpreter.
import exec_anaconda

exec_anaconda.exec_anaconda()

# Make the app's lib/ directory (splunklib and friends) importable.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib"))

# Add the directory where this script resides to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from splunklib.searchcommands import dispatch, GeneratingCommand, Configuration

from util.constants import (ITSI_DRIFT_DETECTION_KPIS_URI,
                            LOOKBACK_PERIOD,
                            AGGREGATION_SPAN,
                            AGGREGATION_FUNCTION,
                            TOLERANCE_IN_PERCENT,
                            THRESHOLD_DIRECTION,
                            KVSTORE_KEY,
                            ITSI_APP_NAME,
                            ITSI_DRIFT_DETECTION_CONFIGURATION,
                            ITSI_APP_OWNER,
                            DEFAULT_DATA_SPAN,
                            ADJUSTED_DATA_SPAN,
                            DAYS_OF_TWO_YEARS)

from logger import get_logger
from util.telemetry_logger import log_telemetry

# Module-level logger shared by the command class below.
logger = get_logger()
|
@Configuration()
class BatchDriftDetectionCommand(GeneratingCommand):
    """Generating command that batch-runs KPI drift detection.

    For every ITSI KPI configured for drift detection, constructs an SPL
    query around the `detectdrift` command, dispatches it as a blocking
    search job, and yields one event per job containing the job's SID.
    Telemetry events are emitted at start, per KPI, and at completion.
    """

    @staticmethod
    def safe_network_call(func_, *args, **kwargs):
        """Invoke a Splunk REST call and return its JSON-decoded body.

        Parameters:
        - func_ (callable): the request function (e.g. ``self.service.get``).
        - *args / **kwargs: forwarded to ``func_``. The first positional
          argument (endpoint) and the ``owner``/``app`` kwargs are only
          read back for error logging.

        Returns:
        - The parsed JSON response body, or None when the response status
          is not 200 or any exception occurs (errors are logged, not raised).
        """
        try:
            response = func_(*args, **kwargs)
            if response.status != 200:
                owner = kwargs.get("owner", None)
                app = kwargs.get("app", None)
                end_point = args[0] if args else None
                logger.error(f"Network call failed with status {response.status}, end point: {end_point}, owner: {owner}, app: {app}")
                return None
            return json.loads(response.body.read())
        except Exception as e:
            logger.error(f"Exception occurred during network call: {str(e)}")
            return None

    def get_kpis_configured_for_drift_detection(self):
        """Fetch the KPIs configured for drift detection.

        Returns the decoded endpoint response, or [] when the call fails
        (``safe_network_call`` returns None on any error).
        """
        return self.safe_network_call(self.service.get,
                                      ITSI_DRIFT_DETECTION_KPIS_URI,
                                      owner=ITSI_APP_OWNER,
                                      app=ITSI_APP_NAME) or []

    @staticmethod
    def calculate_data_span(lookback_period):
        """Choose the mstats span to use for the given lookback period.

        Parameters:
        - lookback_period (str): e.g. "-70d", "-50d", "-750d". The unit is
          always "d" and there is a "-" sign at the beginning.

        Returns:
        - ADJUSTED_DATA_SPAN when the period is two years or longer —
          otherwise the result set could exceed the ITSI reading limit
          (1 million rows) — else DEFAULT_DATA_SPAN.
        """
        try:
            # Strip the leading "-" and trailing "d" to get the day count.
            period = int(lookback_period[1:-1])
            return ADJUSTED_DATA_SPAN if period >= DAYS_OF_TWO_YEARS else DEFAULT_DATA_SPAN
        except (TypeError, ValueError):
            # Malformed or missing lookback period (e.g. None, or a
            # non-numeric value): fall back to the default span instead of
            # failing the whole batch.
            return DEFAULT_DATA_SPAN

    @staticmethod
    def construct_spl_query(template_info, kpi_id):
        """Build the drift-detection SPL query for one KPI.

        Parameters:
        - template_info (dict): per-KPI configuration holding lookback
          period, aggregation span/function, tolerance and threshold
          direction (keyed by the util.constants names).
        - kpi_id (str): the ITSI KPI identifier to filter on.

        Returns:
        - The fully formatted SPL query string.
        """
        spl_template = """| mstats latest(alert_value) AS alert_value latest(alert_level) AS alert_level WHERE index=itsi_summary_metrics earliest={lookback_period} latest=now() AND itsi_kpi_id="{kpi_id}" AND is_filled_gap_event!=1 AND is_null_alert_value=0 by itsi_kpi_id, itsi_service_id span={data_span}
| where alert_level!=-2
| bin _time span={aggregation_span}
| stats {aggregation_function}(alert_value) as alert_value by _time, itsi_kpi_id, itsi_service_id
| table _time, alert_value, itsi_kpi_id, itsi_service_id
| detectdrift threshold_direction="{threshold_direction}", threshold={tolerance_in_percent}"""
        # data_span is generated according to the lookback_period.
        data_span = BatchDriftDetectionCommand.calculate_data_span(template_info[LOOKBACK_PERIOD])
        return spl_template.format(
            lookback_period=template_info[LOOKBACK_PERIOD],
            aggregation_span=template_info[AGGREGATION_SPAN],
            aggregation_function=template_info[AGGREGATION_FUNCTION],
            tolerance_in_percent=template_info[TOLERANCE_IN_PERCENT],
            threshold_direction=template_info[THRESHOLD_DIRECTION],
            kpi_id=kpi_id,
            data_span=data_span
        )

    def execute_spl_query(self, spl_query):
        """
        Executes an SPL query in blocking mode using the Splunk service.

        This method creates a search job, waits for it to complete, and logs the job's SID (Search ID)
        along with its final status. It returns the SID of the completed job or None if the execution fails.

        Parameters:
        - spl_query (str): The SPL query to be executed.

        Returns:
        - sid (str): The SID of the successfully executed job or None upon failure.

        Note:
        The job's success is evaluated based on the 'isFailed' flag and the 'dispatchState'.
        A job is considered successful if 'isFailed' is 0 (false) and 'dispatchState' is "DONE".
        Any other states indicate a failure or an incomplete job.

        For more details on parameters to `jobs.create` and job management,
        visit: https://dev.splunk.com/enterprise/docs/devtools/python/sdk-python/howtousesplunkpython/howtorunsearchespython/
        """
        try:
            # Replace newline characters with spaces
            single_line_query = spl_query.replace('\n', ' ')
            logger.info(f"Executing SPL query: {single_line_query}")

            job = self.service.jobs.create(spl_query, exec_mode="blocking")
            sid = job.sid

            logger.info(f"Search job initiated: SID={sid}")

            # exec_mode="blocking" should already return a finished job;
            # this loop is a defensive safeguard.
            while not job.is_done():
                time.sleep(1)

            # Retrieve the final job status details. The SDK exposes job
            # content values as strings, hence the "0" comparison below.
            is_failed = job["isFailed"]  # "0": success, "1": failure
            dispatch_state = job["dispatchState"]  # "DONE": success, "FAILED": failure

            # Evaluate job success
            if is_failed == "0" and dispatch_state == "DONE":
                job_status = "Success"
            else:
                job_status = "Failure"

            logger.info(f"Search job finished: SID={sid}, Status={job_status}, Dispatch State={dispatch_state}")

            return sid
        except Exception as e:
            logger.error(f"Failed to execute SPL query: {str(e)}")
            return None

    @staticmethod
    def get_kpi_id(kpi):
        """Return the KPI's kvstore key, or "unknown" when absent."""
        return kpi.get(KVSTORE_KEY, "unknown")

    def generate(self):
        """Run drift detection for every configured KPI.

        Yields one record per successfully dispatched job: {'SID': sid}.
        Failures for individual KPIs are logged and skipped so one bad KPI
        cannot abort the whole batch.
        """
        time_0 = time.time()
        log_telemetry(
            event_type='batchdetectdrift_start',
        )

        kpis = self.get_kpis_configured_for_drift_detection()

        cnt_kpi = len(kpis)
        log_telemetry(
            event_type='get_drift_configured_kpi',
            count_kpi=cnt_kpi
        )

        if cnt_kpi == 0:
            logger.info("No KPIs configured for drift detection.")
            return

        for kpi in kpis:
            try:
                kpi_id = self.get_kpi_id(kpi)
                template_info = kpi.get(ITSI_DRIFT_DETECTION_CONFIGURATION, {})

                if not template_info:
                    log_telemetry(
                        event_type='no_template_found',
                        kpi_id=kpi_id
                    )
                    # BUGFIX: was `kpis[:10] if len(kpi)>5 else kpis`, which
                    # measured the keys of a single KPI dict and logged up to
                    # 10 entries despite the "(up to 5 kpis)" message. Cap the
                    # logged list at 5 KPI records, as the message states.
                    kpis_in_log = kpis[:5] if len(kpis) > 5 else kpis
                    logger.error((f"No template info found for KPI ID: {kpi_id}. Skipping drift detection."
                                  f"The full response from kpi is {kpis_in_log} (up to 5 kpis)."))
                    continue

                logger.info(f"Initiating drift detection job for KPI ID: {kpi_id}")
                time_1 = time.time()

                # Construct the SPL query using the template information and KPI ID
                spl_query = self.construct_spl_query(template_info=template_info, kpi_id=kpi_id)

                # Execute the SPL query and proceed only if an SID was successfully returned
                sid = self.execute_spl_query(spl_query)

                log_telemetry(
                    event_type='inner_csc_complete',
                    kpi_id=kpi_id,
                    sid=sid,
                    total_time=f'{time.time() - time_1:.3f}s',
                )

                if not sid:
                    logger.error(f"Execution failed for KPI ID: {kpi_id}")
                    continue

                logger.info(f"Completed drift detection job: KPI ID={kpi_id}, SID={sid}")
                yield {'SID': sid}

            except Exception as e:
                logger.error(f"An error occurred during drift detection for KPI ID: {self.get_kpi_id(kpi)}: {e}")

        log_telemetry(
            event_type='batchdetectdrift_complete',
            total_time=f'{time.time() - time_0:.3f}s',
        )
|
|
|
|
# Entry point: hand control to splunklib, which parses the search-protocol
# arguments and executes the command when this module is run as a script
# (dispatch inspects the passed __name__ for that).
dispatch(BatchDriftDetectionCommand, sys.argv, sys.stdin, sys.stdout, __name__)