# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import time

from ITOA.setup_logging import logger
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes

STATUS_IN_PROGRESS = "In-Progress"
STATUS_QUEUED = "Queued"
STATUS_SKIPPED = "Skipped"
STATUS_FAILED = "Failed"


class ItsiRefreshQueueJob(ItoaObject):
    """
    Implements ITSI Refresh Queue Job
    """

    log_prefix = '[ITSI Refresh Queue Job] '
    collection_name = 'itsi_refresh_queue'
    logger = logger

    def __init__(self, session_key, current_user_name):
        super(ItsiRefreshQueueJob, self).__init__(
            session_key,
            current_user_name,
            'refresh_queue_job',
            collection_name=self.collection_name,
            title_validation_required=False
        )

    def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None,
                             transaction_id=None, skip_local_failure=False):
        """
        Generic object validation routine for the refresh job. It consists of
        title-related validation and specific checks for the required fields
        'change_type', 'changed_object_key' and 'changed_object_type'. It also
        validates the 'priority' field. All new object-level validation should
        be invoked from here.

        @type objects: list[dict]
        @param objects: list of dict

        @return: None. Throws exceptions on validation errors.
        """
        super(ItsiRefreshQueueJob, self).do_object_validation(
            owner, objects, validate_name, dupname_tag, transaction_id, skip_local_failure
        )

        required_fields = ['change_type', 'changed_object_key', 'changed_object_type']
        for json_data in objects:
            priority = json_data.get('priority')
            if priority and priority not in [0, -1, 1]:
                self.raise_error_bad_validation(
                    logger, 'Invalid priority: Priority must be either 0, -1, or 1.', 409
                )
            for field in required_fields:
                value = json_data.get(field)
                if value is None:
                    self.raise_error_bad_validation(logger, f'Missing required field: {field}', 409)

    def do_additional_setup(self, owner, objects, req_source='unknown',
                            method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None,
                            skip_local_failure=False):
        """
        @type owner: string
        @param owner: user who is performing this operation

        @type objects: list of dictionary
        @param objects: list of objects being written

        @type req_source: string
        @param req_source: string identifying source of this request

        @return: none, throws exceptions on errors
        """
        for json_data in objects:
            if 'priority' not in json_data:
                json_data['priority'] = 0
            if 'number_of_failures' not in json_data:
                json_data['number_of_failures'] = 0
            # 'status' is derived at read time (see _add_status), never persisted
            if 'status' in json_data:
                del json_data['status']
        return
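
    # For reference, a minimal job document that passes do_object_validation and
    # the defaults applied by do_additional_setup above. The field values are
    # illustrative placeholders, not taken from a live ITSI system:
    #
    #     {
    #         'change_type': 'service_updated',       # hypothetical change type
    #         'changed_object_key': '<kvstore key>',
    #         'changed_object_type': 'service',
    #         # 'priority' and 'number_of_failures' default to 0 if omitted;
    #         # any supplied 'status' is stripped and re-derived on read
    #     }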

    def get(self, owner, object_id, req_source='unknown', transaction_id=None):
        """
        Overrides the parent method that retrieves an object by id, adding the
        derived queue_time and status fields to the returned data.

        @type owner: basestring
        @param owner: user who is performing this operation

        @type object_id: basestring
        @param object_id: id of object to retrieve

        @type req_source: basestring
        @param req_source: identified source initiating the operation

        @rtype: dictionary
        @return: object matching id on success, empty rows if object is not found,
            throws exceptions on errors
        """
        job_data = super(ItsiRefreshQueueJob, self).get(
            owner, object_id, req_source, transaction_id
        )
        if job_data:
            self._add_queue_time(job_data)
            self._add_status(job_data)
        return job_data

    def get_bulk(
        self,
        owner,
        sort_key=None,
        sort_dir=None,
        filter_data=None,
        fields=None,
        skip=None,
        limit=None,
        req_source="unknown",
        transaction_id=None,
    ):
        """
        Overrides the parent method that retrieves objects matching the given
        criteria (all objects of this object type if no filter is specified),
        adding the derived queue_time and status fields.

        @type owner: string
        @param owner: user who is performing this operation

        @type sort_key: string
        @param sort_key: string defining keys to sort by

        @type sort_dir: string
        @param sort_dir: string defining direction for sorting - asc or desc

        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax

        @type fields: list
        @param fields: list of fields to retrieve, fetches all fields if not specified

        @type skip: number
        @param skip: number of items to skip from the start

        @type limit: number
        @param limit: maximum number of items to return

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: transaction id

        @rtype: list of dictionary
        @return: objects retrieved on success, throws exceptions on errors
        """
        job_data_list = super(ItsiRefreshQueueJob, self).get_bulk(
            owner,
            sort_key,
            sort_dir,
            filter_data,
            fields,
            skip,
            limit,
            req_source,
            transaction_id,
        )
        for job_data in job_data_list:
            self._add_queue_time(job_data)
            self._add_status(job_data)
        return job_data_list

    def fetch_queued_jobs(self, owner, **kwargs):
        """
        Fetch jobs that are considered queued (non-blocking).

        :param owner: user who is performing this operation
        :type owner: string

        :return: objects retrieved on success, throws exceptions on errors
        :rtype: list of dictionary
        """
        return self.get_bulk(owner, filter_data={"priority": {"$ne": -1}}, **kwargs)

    def get_queue_size(self, owner, **kwargs):
        """
        Return the current queue size.

        @type owner: string
        @param owner: user who is performing this operation

        @return: the count of jobs in the refresh queue awaiting processing
        @rtype: int
        """
        return len(self.fetch_queued_jobs(owner, fields=["_key"], **kwargs))

    def get_failed_jobs(self, owner, **kwargs):
        """
        Return jobs marked as failed.

        @type owner: string
        @param owner: user who is performing this operation

        @return: the list of failed jobs
        @rtype: list(dict)
        """
        return self.get_bulk(
            owner,
            filter_data={"priority": -1, "number_of_failures": {"$gt": 0}},
            **kwargs,
        )

    def get_skipped_jobs(self, owner, **kwargs):
        """
        Return jobs marked as skipped.

        @type owner: string
        @param owner: user who is performing this operation

        @return: the list of skipped jobs
        @rtype: list(dict)
        """
        return self.get_bulk(
            owner,
            filter_data={"priority": -1, "number_of_failures": 0},
            **kwargs,
        )

    def _add_queue_time(self, job_data):
        """
        Add queue time field to refresh queue job

        :type job_data: dict
        :param job_data: dict containing refresh queue job info
        """
        last_queued_time = job_data.get("last_queued_time")
        last_saved_queue_time = job_data.get("last_saved_queue_time", 0)
        if job_data.get("number_of_failures", 0) > 0 or last_queued_time is None:
            # failed or never-queued jobs only report previously accumulated time
            queue_time = last_saved_queue_time
        else:
            # accumulated time plus time elapsed since the job was last queued
            queue_time = time.time() - last_queued_time + last_saved_queue_time
        job_data["queue_time"] = queue_time

    def _add_status(self, job):
        """
        Add status field to refresh queue job

        :param job: dict containing refresh queue job info
        :type job: dict
        """
        processor = job.get("processor")
        priority = job.get("priority", 0)
        number_of_failures = job.get("number_of_failures", 0)
        if processor:
            job["status"] = STATUS_IN_PROGRESS
        elif priority == -1:
            job["status"] = STATUS_FAILED if number_of_failures > 0 else STATUS_SKIPPED
        else:
            job["status"] = STATUS_QUEUED
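
    # Summary of the read-time status derivation above (no status is persisted;
    # do_additional_setup strips any incoming value):
    #
    #     processor set                    -> "In-Progress"
    #     priority == -1, failures > 0     -> "Failed"
    #     priority == -1, failures == 0    -> "Skipped"
    #     otherwise                        -> "Queued"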

    def wait_for_unblocked_queue(self):
        """
        Wait for the refresh queue to be unblocked.

        :return: True if the queue drained within the retry window, False otherwise
        :rtype: Boolean
        """
        is_queue_blocked = True
        total_retries = 6
        retry = 0
        # poll up to 6 times, 5 seconds apart (roughly 30 seconds in total)
        while is_queue_blocked and retry < total_retries:
            if self.get_queue_size("nobody") == 0:
                is_queue_blocked = False
            else:
                time.sleep(5)
                retry += 1
                logger.info(f"waiting for empty queue (retry: {retry})")
        return not is_queue_blocked
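

# Usage sketch (not executed as part of this module). The session key and user
# below are hypothetical placeholders; real callers obtain them from their
# Splunk request context:
#
#     queue = ItsiRefreshQueueJob(session_key, 'admin')
#     if not queue.wait_for_unblocked_queue():
#         logger.warning('refresh queue still non-empty after retries')
#     for job in queue.get_failed_jobs('nobody', fields=['_key', 'change_type']):
#         logger.info('failed refresh job: %s', job.get('_key'))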