# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved. import os import uuid from concurrent.futures import ThreadPoolExecutor from time import sleep from datetime import datetime from ITOA.setup_logging import logger, InstrumentCall from ITOA.storage.itoa_storage import ITOAStorage from itsi.itsi_utils import ITOAInterfaceUtils from itsi.objects.itsi_upgrade_readiness_prechecks import ItsiUpgradeReadinessPrechecks from migration_utility.constants import MODES, MODES_STR_MAP, UPGRADE_READINESS_URL from migration_utility.migration_utility_interface import ItsiMigrationUtilityInterface from migration_utility.constants import ( UPGRADE_READINESS_JOB_TIMEOUT_LIMIT, ) class ItsiUpgradeReadinessLog(object): def __init__(self, session_key, app="SA-ITOA", user='nobody'): ''' Constructor @type: string @param session_key: @type: string @param app: context of app invoking the request @type: string @param owner: "owner" user invoking this call @rtype: None @return: None ''' self._session_key = session_key self._app = app self._user = user self._executor = ThreadPoolExecutor(max_workers=10) self.upgrade_readiness_precheck_obj = ItsiUpgradeReadinessPrechecks( self._session_key, 'nobody' ) self.transaction_id = uuid.uuid4().hex def get_queued_precheck_job(self): """Get the Upgrade readiness jobs in Queued state Returns: list: Return a list of Upgrade readiness jobs in Queued state """ filter_data = { 'status': 'QUEUED' } upgrade_readiness_precheck_job = self.upgrade_readiness_precheck_obj.get_bulk( 'nobody', fields=['_key', 'start_time', 'transaction_id', 'status', 'object_type', 'title', 'operation_mode', 'precheck_started', 'precheck_start_time', 'remediation_started', 'remediation_start_time', 'process_id', 'failed_precheck', 'precheck_to_remediate'], filter_data=filter_data ) return upgrade_readiness_precheck_job def check_existing_precheck_job_and_create_new_job(self, transaction_id): """Checks whether a precheck/remediation job is in Queued or In progress state or not to avoid unnecessary creation of KV Store object. Args: transaction_id (str): Transaction ID of the Upgrade readiness job Returns: dict: Return a dictionary with Success/Failure message """ response_object = { "transaction_id": transaction_id, "status": "SUCCESS", "message": "A new job is added in a queue." } current_in_progress_and_queued_jobs = self.upgrade_readiness_precheck_obj.get_in_progress_upgrade_readiness_prechecks( lookback_time=UPGRADE_READINESS_JOB_TIMEOUT_LIMIT, exclude_transaction_id=transaction_id, ) if len(current_in_progress_and_queued_jobs) > 0: for current_in_progress_and_queued_job in current_in_progress_and_queued_jobs: if current_in_progress_and_queued_job.get('status', '') == 'QUEUED': response_object = { "transaction_id": transaction_id, "status": "FAIL", "message": "There is an already a precheck job exist in a queue.", } elif current_in_progress_and_queued_job.get('status', '') == 'IN_PROGRESS': response_object = { "transaction_id": transaction_id, "status": "FAIL", "message": "There is a Precheck job exist in a running state", } else: response_object = { "transaction_id": transaction_id, "status": "SUCCESS", "message": "A new job is added in a queue." } return response_object @InstrumentCall(logger) def upgrade_readiness_activity(self, operation_mode=MODES["PRECHECK"]): ''' Start a new upgrade readiness precheck job ''' kvstore = ITOAStorage() in_queued_job = self.get_queued_precheck_job() response_object = self.check_existing_precheck_job_and_create_new_job(self.transaction_id) if kvstore.wait_for_storage_init(self._session_key) and response_object['status'] == 'SUCCESS': self.upgrade_readiness_precheck_obj.create_upgrade_readiness_precheck_job(self.transaction_id, MODES["PRECHECK"]) logger.info("[transaction_id=%s] [operation_mode=%s] Start" " synchronous job.", self.transaction_id, MODES_STR_MAP[operation_mode]) self._perform_upgrade_readiness_activity(self.transaction_id, MODES["PRECHECK"]) elif in_queued_job: self._perform_upgrade_readiness_activity(in_queued_job[0]['transaction_id'], in_queued_job[0]['operation_mode'], in_queued_job[0]['precheck_to_remediate']) self.transaction_id = uuid.uuid4().hex @InstrumentCall(logger) def upgrade_readiness_activity_async(self, operation_mode, precheck_ids=None): ''' Start a new async upgrade readiness precheck job ''' kvstore = ITOAStorage() try: response_object = self.check_existing_precheck_job_and_create_new_job(self.transaction_id) if kvstore.wait_for_storage_init(self._session_key) and response_object['status'] == 'SUCCESS': self.upgrade_readiness_precheck_obj.create_upgrade_readiness_precheck_job(self.transaction_id, operation_mode, precheck_ids) logger.info("[transaction_id=%s] [operation_mode=%s] Start" " synchronous job.", self.transaction_id, MODES_STR_MAP[operation_mode]) status = ITOAInterfaceUtils.control_modular_input(self._session_key, 'SA-ITOA', 'nobody', 'itsi_upgrade_readiness', 'upgrade_readiness', 'disable') if status: logger.info("ITSI Upgrade readiness modular input is disabled" " transaction_id=%s", self.transaction_id) sleep(3) status = ITOAInterfaceUtils.control_modular_input(self._session_key, 'SA-ITOA', 'nobody', 'itsi_upgrade_readiness', 'upgrade_readiness', 'enable') if status: logger.info("ITSI Upgrade readiness modular input is enabled" " transaction_id=%s", self.transaction_id) self.transaction_id = uuid.uuid4().hex return response_object except Exception as e: logger.exception(e) logger.error("[transaction_id=%s] [operation_mode=%s] Adhoc job" " could not be enqueued.", self.transaction_id, MODES_STR_MAP[operation_mode]) self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( self.transaction_id, status='FAILED', fail_message='SI_UR_1001') transaction_id_value = self.transaction_id self.transaction_id = uuid.uuid4().hex return { "transaction_id": transaction_id_value, "status": "FAIL", "message": f"{e}", } def _generate_transaction_id(self): ''' Generate new transaction id by uuid. @rtype: string @return: new transaction_id ''' return uuid.uuid4().hex def _perform_upgrade_readiness_activity(self, transaction_id, operation_mode, precheck_ids=None): ''' Start a new upgrade readiness precheck job with a transaction id @type: string @return: transaction_id ''' logger.info(f'[transaction_id={transaction_id}] ' f'[operation_mode={MODES_STR_MAP[operation_mode]}] ' 'Performing upgrade readiness job.') kvstore = ITOAStorage() if kvstore.wait_for_storage_init(self._session_key): self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job(transaction_id, status='IN_PROGRESS', process_id=os.getpid(), fail_message='') # Execute upgrade readiness precheck or remediation jobs_processor = ItsiMigrationUtilityInterface( self._session_key, transaction_id) results, message, failed_precheck_count = jobs_processor.execute(operation_mode, precheck_ids) if results: self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( transaction_id, status='COMPLETED', fail_message='') time = datetime.now().strftime('%H:%M:%S') date = datetime.now().strftime('%m/%d/%Y') if failed_precheck_count > 0: WARNING_MESSAGE = ( 'The last upgrade readiness check executed at {0} on {1} and detected {2} issues. ' '[[{3}|View the Upgrade Readiness Dashboard.]]' ).format(time, date, failed_precheck_count, UPGRADE_READINESS_URL) ITOAInterfaceUtils.create_message(self._session_key, WARNING_MESSAGE) logger.info(f'[transaction_id={transaction_id}] ' f'[operation_mode={MODES_STR_MAP[operation_mode]}] ' '[final_message=completed_upgrade_readiness_job] Completed upgrade readiness job.') else: self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( transaction_id, status='FAILED', fail_message="SI_UR_1003") logger.error(f'[transaction_id={transaction_id}] ' f'[operation_mode={MODES_STR_MAP[operation_mode]}] ' f'Error message={message} ' 'Job failed.') else: logger.error(f'[transaction_id={transaction_id}] ' f'[operation_mode={MODES_STR_MAP[operation_mode]}] ' 'KV Store unavailable for Upgrade Readiness Job. Exiting.') self.upgrade_readiness_precheck_obj.update_upgrade_readiness_precheck_job( transaction_id, status='FAILED', fail_message='SI_UR_1002')