# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import time
import uuid

from ITOA import itoa_refresh_queue_utils
from itsi.objects.changehandlers.itoa_change_handler import ItoaChangeHandler


class SimulationChangeHandler(ItoaChangeHandler):
    """
    Change handler simulator used to exercise the Refresh Queue without
    modifying real objects.

    Behaviour is driven by keys of the job's "change_detail" dictionary:

    * "delay"            - value passed to time.sleep() before the job
                           finishes (1 when the key is absent).
    * "child_job_count"  - number of child refresh jobs to create; each child
                           receives the same detail with the count reduced by 1.
    * "result"           - value reported as the job result (True when absent).
    * "succeed_on_retry" - when the key is present, the job also reports
                           success once the change records at least one
                           failure ("number_of_failures" > 0). Only the
                           presence of the key is checked, not its value.
    """

    def deferred(self, change, transaction_id=None):
        """
        Log the simulated change, sleep for the requested delay, create child
        jobs when requested, and report the simulated outcome.

        :param change: refresh job dictionary; "change_detail", "change_type"
            and "number_of_failures" are read from it.
        :param transaction_id: identifier written to the log and forwarded to
            any child jobs.
        :return: ``result or override`` where ``result`` is the "result" entry
            of the change detail (default True) and ``override`` is True when
            "succeed_on_retry" is present and the job has already failed.
        """
        self.logger.info("Simulation change handler tid=%s change=%s", transaction_id, change)

        detail = change.get("change_detail", {})

        # The simulated job duration is controlled by the "delay" entry of the
        # change detail; without it the job sleeps for 1 second.
        time.sleep(detail.get("delay", 1))

        outcome = True
        retry_override = False

        # Walk the entries in dictionary order so side effects (child job
        # creation) happen in the same sequence in which the keys were given.
        for name, value in detail.items():
            if name == "child_job_count":
                self._schedule_child_jobs(
                    int(value),
                    change.get("change_type"),
                    detail,
                    transaction_id,
                )
                continue

            if name == "result":
                outcome = value
                continue

            if name == "succeed_on_retry":
                # Key presence alone enables the override; its value is unused.
                retry_override = change.get("number_of_failures", 0) > 0

        return outcome or retry_override

    def _schedule_child_jobs(self, count, change_type, change_detail, transaction_id):
        """
        Create ``count`` child refresh jobs through a RefreshQueueAdapter.

        Every child gets a freshly generated object key and a copy of
        ``change_detail`` whose "child_job_count" is ``count - 1``, so each
        generation of children requests one child fewer than its parent.

        :param count: number of child jobs to create.
        :param change_type: change type assigned to every child job.
        :param change_detail: detail dictionary copied into every child job.
        :param transaction_id: transaction identifier passed to every child job.
        """
        queue = itoa_refresh_queue_utils.RefreshQueueAdapter(self.session_key)
        remaining = count - 1

        created = 0
        while created < count:
            object_key = uuid.uuid4().hex
            # Build a fresh dictionary per job so no two jobs share one object.
            child_detail = dict(change_detail, child_job_count=remaining)
            queue.create_refresh_job(
                change_type=change_type,
                changed_object_key=object_key,
                changed_object_type="simulation_object",
                change_detail=child_detail,
                transaction_id=transaction_id,
            )
            created += 1