You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

82 lines
3.6 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
from time import sleep
import splunk.rest as rest
from integrations.commons.logutil import get_logger
import SA_ITOA_app_common.splunklib.client as client
import SA_ITOA_app_common.splunklib.results as results
logger = get_logger(name=__name__)
def get_notable_events(splunk_server, itsi_group_id, earliest_time, latest_time, page_size, results_max_limit=None, search_string=None):
    '''
    Generator yielding pages of notable events for an ITSI episode (group).

    Implemented as a generator for two reasons:
      1) Pagination keeps each request to the Splunk server bounded by
         ``page_size``, so we do not bombard it when an episode contains
         many notable events.
      2) Events are dispatched downstream page-by-page instead of being
         accumulated in memory first.

    :param splunk_server: object exposing ``get_service()`` that returns a splunklib Service
    :param itsi_group_id: episode (group) id substituted into the default search
    :param earliest_time: search window start, passed through to the search job
    :param latest_time: search window end, passed through to the search job
    :param page_size: number of results fetched per page
    :param results_max_limit: optional cap on total results, applied via ``| head N``
    :param search_string: optional search that fully replaces the default one
    :yields: list of result dicts, one list per page (may be empty)
    '''
    service = splunk_server.get_service()
    search = 'search `itsi_event_management_group_index` itsi_group_id="{itsi_groupid}" | eval itsi_service_ids = split(itsi_service_ids,",") | mvexpand itsi_service_ids | dedup event_id'.format(itsi_groupid=itsi_group_id)
    kwargs = {"earliest_time": earliest_time, "latest_time": latest_time}
    if search_string is not None:
        # Caller-supplied search fully replaces the default one.
        search = search_string
    if results_max_limit is not None:
        # Cap the total number of results the job can return.
        search = search + ' | head ' + str(results_max_limit)
    # Lazy %-args: strings are only built if INFO is enabled (emitted text unchanged).
    logger.info('search_string = %s , parms = %s', search, kwargs)
    job = service.jobs.create(search, **kwargs)
    # Poll until the search job completes.
    while not job.is_done():
        sleep(.2)

    # Paginate so this action does not become memory intensive.
    result_count = int(job["resultCount"])  # total results this job returned
    offset = 0          # starting index of the current page (0, page_size, 2*page_size, ...)
    count = page_size   # results per page
    page = 0
    while offset < result_count:
        page += 1
        logger.info('Splunk page %s', page)
        # Fetch one page of results from the finished job.
        paginated_results = job.results(count=count, offset=offset)
        reader = results.ResultsReader(paginated_results)
        curr_page_results = []
        for result in reader:
            if isinstance(result, results.Message):
                # Diagnostic messages may be interleaved with the results.
                # BUG FIX: the original called logger.info(result.type, result.message),
                # which treats result.message as a %-format argument for a format
                # string with no placeholder — the message text was silently dropped.
                logger.info('%s: %s', result.type, result.message)
            elif isinstance(result, dict):
                # Normal events are returned as dicts.
                curr_page_results.append(result)
        yield curr_page_results
        # Advance to the next page.
        offset += count
# pythonic form of function in EventManagementUtils.js getGroupTimeRange
# this is done to not miss the last event, if we just use start_time and last_time, we will miss last event
def get_group_time_range(start_time, last_time, itsi_first_event_time, itsi_last_event_time, itsi_earliest_event_time):
    """
    Compute a padded search time range for an episode.

    Python port of getGroupTimeRange in EventManagementUtils.js: the window
    is padded by 60 seconds on each side so the boundary events themselves
    are not missed when only start_time/last_time would be used.

    Falsy inputs (None, '', 0) are treated as "unknown": an unknown lower
    bound falls back to 0, an unknown upper bound falls back to 'now'.

    :returns: dict with 'earliest_time' and 'latest_time' keys
    """
    padding = 60

    def _as_float(value, fallback):
        # Coerce a time value, substituting the sentinel when it is falsy.
        return float(value) if value else fallback

    lower = min(_as_float(v, float("inf"))
                for v in (start_time, itsi_first_event_time, itsi_earliest_event_time))
    upper = max(_as_float(v, -1)
                for v in (last_time, itsi_last_event_time))

    if lower != float("inf") and lower >= padding:
        earliest_time = lower - padding
    else:
        earliest_time = 0
        logger.error("Invalid earliest_time, setting it to 0")

    if upper == -1:
        latest_time = 'now'
        logger.error("Invalid latest_time, setting it to 'now'.")
    else:
        latest_time = upper + padding

    return {'earliest_time': earliest_time, 'latest_time': latest_time}