# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import sys
import uuid
import time
import json
import traceback

from splunk.clilib.bundle_paths import make_splunkhome_path

sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))

from itsi.itsi_utils import ITOAInterfaceUtils
from .metrics_backfill_worker import MetricsBackfillWorker

JOB_MANAGER_PAGE = '/app/itsi/job_manager?app=&filter='
KVSTORE_COLLECTION = 'itsi_metrics_backfill_queue'
DOCS_URL = '/help?location=[itsi:4.20.0]metrics_summary_index'


class MetricsBackfillQueue(object):
    """Drives the one-time migration of ITSI summary-index events into the
    ITSI metrics summary index (introduced in ITSI 4.6.0).

    Migration state (whether backfill was triggered, and the hour offset
    reached so far) is persisted in the ``itsi_metrics_backfill_queue``
    KV store collection so the process can resume after a restart.
    """

    def __init__(self, session_key, logger):
        """
        @param session_key: Splunk session key used for service/KV store access
        @param logger: logger instance used for progress and error reporting
        """
        self.session_key = session_key
        self.service = ITOAInterfaceUtils.service_connection(session_key, 'SA-ITOA')
        self.logger = logger
        try:
            self.queue_collection = self.service.kvstore[KVSTORE_COLLECTION]
        except KeyError:
            # Collection does not exist (yet); callers must tolerate None.
            self.queue_collection = None

    def get_backfill_queue_info(self):
        """Return all records from the backfill queue collection.

        @return: list of queue records, or {} when the collection is absent
        """
        if not self.queue_collection:
            return {}
        return self.queue_collection.data.query()

    def clear_backfill_queue(self):
        """Delete every record in the backfill queue collection (no-op when
        the collection is absent)."""
        if self.queue_collection:
            self.queue_collection.data.delete()

    def notify_of_start(self, batch_id):
        """Post a Splunk UI message announcing that backfill has begun.

        @param batch_id: unique id for this backfill run, used to build the
            job-manager link so users can watch the import progress
        """
        self.logger.info('Metric index backfill process has started')
        msg = {
            'severity': 'info',
            'value': ('Your summary index data is being migrated to the new ITSI metrics summary index. '
                      '[[{0}|View the import process.]] The itsi_summary_metrics index creates a more responsive UI '
                      'experience by increasing the performance of the searches dispatched by ITSI. '
                      '[[{1}|Learn more.]]').format((JOB_MANAGER_PAGE + batch_id), DOCS_URL)
        }
        self.service.messages.create(name='metric_backfill_started', **msg)

    def notify_of_finish(self, elapsed):
        """Post a Splunk UI message announcing backfill completion.

        @param elapsed: whole minutes the backfill process took
        """
        self.logger.info('Metric index backfill process completed in {} minutes'.format(elapsed))
        msg = {
            'severity': 'info',
            'value': 'Metric index backfill process completed in {} minutes'.format(elapsed)
        }
        self.service.messages.create(name='metric_backfill_completed', **msg)

    def update_queue(self, backfill_queue_info, current_hour):
        """Persist the hour offset reached so far, so a restart resumes here.

        @param backfill_queue_info: the queue record (must carry '_key')
        @param current_hour: hours-ago offset the backfill has reached
        """
        backfill_queue_info['current_hour'] = current_hour
        if self.queue_collection:
            self.queue_collection.data.update(backfill_queue_info['_key'],
                                              json.dumps(backfill_queue_info))

    def execute_backfill_queue(self,
                               config_settings=None,
                               summary_index_name='`get_itsi_summary_index`',
                               metrics_index_name='`get_itsi_summary_metrics_index`',
                               sourcetype='itsi_summary:metrics'):
        """
        Main function that generates the searches that will be used to backfill
        data from itsi_summary index into the itsi_summary_metrics index
        (introduced in ITSI 4.6.0). Uses configuration settings defined in
        inputs.conf (or modified through UI) for how many searches or how
        quickly backfill process will run.

        @type config_settings: dict
        @param config_settings: the configuration settings from the modular input
            configuration (inputs.conf). Configurations include:
            - metrics_backfill_length: how many days back backfill should look for data
            - metrics_backfill_throttle: how long the backfill process should wait
              in between running searches
            - metrics_backfill_concurrent_searches: how many searches run at the same time
        :param summary_index_name: index where the data is coming from
        :param metrics_index_name: index where the data will be written to
        :param sourcetype: sourcetype to be used in the backfill events/results
        :return: Boolean indicating success or failure
        """
        # Avoid a mutable default argument; treat "not provided" as empty config.
        if config_settings is None:
            config_settings = {}
        try:
            backfill_queue_info = self.get_backfill_queue_info()
            if not len(backfill_queue_info):
                # queue is empty
                return False
            else:
                backfill_queue_info = backfill_queue_info[0]

            is_backfill_triggered = backfill_queue_info.get('is_backfill_triggered')
            if not is_backfill_triggered:
                return False

            batch_id = str(uuid.uuid4())
            self.notify_of_start(batch_id)
            self.logger.info('Backfill queue info: ' + str(backfill_queue_info))
            start = time.time()

            # Configuration settings come from the modular input settings.
            # Default settings: lookback of 3 days, throttle 10s, 1 concurrent search
            hours_lookback = int(config_settings.get('metrics_backfill_length', 3)) * 24
            wait_throttle = int(config_settings.get('metrics_backfill_throttle', 10))
            concurrent_searches = int(config_settings.get('metrics_backfill_concurrent_searches', 1))

            # Resume from the hour offset persisted by a previous (interrupted) run.
            current_hour = backfill_queue_info.get('current_hour', 1)
            workers = [MetricsBackfillWorker(self.session_key, self.logger)
                       for _ in range(concurrent_searches)]
            current_hour = 1 if current_hour < 1 else current_hour

            while current_hour < hours_lookback:
                for worker in workers:
                    # Only hand out the next hour-sized slice to idle workers.
                    if worker.is_work_done():
                        earliest = '-{}h@h'.format(current_hour)
                        latest = '-{}h@h'.format(current_hour - 1) if current_hour > 1 else 'now'
                        worker.migrate_summary_to_metrics(earliest, latest, batch_id,
                                                          summary_index_name,
                                                          metrics_index_name,
                                                          sourcetype)
                        current_hour += 1
                        self.update_queue(backfill_queue_info, current_hour)
                # Give the indexing tier a brief break in between each backfill search.
                # NOTE(review): sleeping at this level also prevents a busy spin
                # while all workers are occupied.
                time.sleep(wait_throttle)

            elapsed = int((time.time() - start) // 60)
            self.notify_of_finish(elapsed)
            self.clear_backfill_queue()
            return True
        except Exception as e:
            self.logger.error('Error running metrics_backfill: ' + str(e))
            self.logger.error(str(traceback.format_exc()))
            return False