# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import time
|
|
|
|
from ITOA.setup_logging import logger
|
|
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
|
|
|
|
# Human-readable job lifecycle states. These are never persisted; they are
# computed on read (see ItsiRefreshQueueJob._add_status) from the job's
# 'processor', 'priority' and 'number_of_failures' fields.
STATUS_IN_PROGRESS = "In-Progress"
STATUS_QUEUED = "Queued"
STATUS_SKIPPED = "Skipped"
STATUS_FAILED = "Failed"
|
|
|
|
|
|
class ItsiRefreshQueueJob(ItoaObject):
    """
    Implements ITSI Refresh Queue Job.

    Jobs live in the 'itsi_refresh_queue' collection. Each job carries a
    'priority' (0 = normal, 1 = high, -1 = parked as failed/skipped) and a
    'number_of_failures' counter. A human-readable 'status' and a cumulative
    'queue_time' are derived on every read rather than stored.
    """

    log_prefix = '[ITSI Refresh Queue Job] '
    # KV store collection backing this object type.
    collection_name = 'itsi_refresh_queue'
    logger = logger

    def __init__(self, session_key, current_user_name):
        # Titles are machine-generated for queue jobs, so skip title validation.
        super(ItsiRefreshQueueJob, self).__init__(session_key, current_user_name, 'refresh_queue_job',
                                                  collection_name=self.collection_name, title_validation_required=False)

    def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None, transaction_id=None,
                             skip_local_failure=False):
        """
        Generic object validation routine for the refresh job.
        It consists of title-related validation and specific checks for required fields:
        'change_type', 'changed_object_key', 'changed_object_type'. It also validates the 'priority' field.
        All new object level validation should be invoked from here.

        @type objects: list[dict]
        @param objects: list of dict

        @return: None. Throws exceptions on validation errors.
        """
        super(ItsiRefreshQueueJob, self).do_object_validation(owner, objects, validate_name, dupname_tag,
                                                              transaction_id, skip_local_failure)
        required_fields = ['change_type', 'changed_object_key', 'changed_object_type']
        for json_data in objects:
            priority = json_data.get('priority')
            # Explicit None check (not truthiness) so falsy-but-invalid values
            # such as '' or [] are rejected too. A missing priority is allowed
            # here and defaulted to 0 in do_additional_setup. Note that False
            # and 0.0 compare equal to 0 and therefore remain valid.
            if priority is not None and priority not in (0, -1, 1):
                self.raise_error_bad_validation(logger, 'Invalid priority: Priority must be either 0, -1, or 1.', 409)
            for field in required_fields:
                if json_data.get(field) is None:
                    self.raise_error_bad_validation(logger, f'Missing required field: {field}', 409)

    def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                            transaction_id=None, skip_local_failure=False):
        """
        Apply defaults before persisting: priority and number_of_failures
        default to 0, and any client-supplied 'status' is discarded because
        status is a derived, read-only field (see _add_status).

        @type owner: string
        @param owner: user who is performing this operation

        @type objects: list of dictionary
        @param objects: list of objects being written

        @type req_source: string
        @param req_source: string identifying source of this request

        @return: none, throws exceptions on errors
        """
        for json_data in objects:
            if 'priority' not in json_data:
                json_data['priority'] = 0
            if 'number_of_failures' not in json_data:
                json_data['number_of_failures'] = 0
            # 'status' is computed on read; never persist it.
            if 'status' in json_data:
                del json_data['status']
        return

    def get(self, owner, object_id, req_source='unknown', transaction_id=None):
        """
        Override of the parent get: retrieves the object by id, then annotates
        it with the derived 'queue_time' and 'status' fields.

        @type owner: string
        @param owner: user who is performing this operation

        @type object_id: string
        @param object_id: id of object to retrieve

        @type req_source: string
        @param req_source: identified source initiating the operation

        @rtype: dictionary
        @return: object matching id on success, empty rows if object is not found, throws exceptions on errors
        """
        job_data = super(ItsiRefreshQueueJob, self).get(
            owner, object_id, req_source, transaction_id
        )
        if job_data:
            self._add_queue_time(job_data)
            self._add_status(job_data)
        return job_data

    def get_bulk(
        self,
        owner,
        sort_key=None,
        sort_dir=None,
        filter_data=None,
        fields=None,
        skip=None,
        limit=None,
        req_source="unknown",
        transaction_id=None,
    ):
        """
        Override of the parent get_bulk: retrieves objects matching criteria
        (all objects of this type if no filter is specified) and annotates
        each result with the derived 'queue_time' and 'status' fields.

        @type owner: string
        @param owner: user who is performing this operation

        @type sort_key: string
        @param sort_key: string defining keys to sort by

        @type sort_dir: string
        @param sort_dir: string defining direction for sorting - asc or desc

        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax

        @type fields: list
        @param fields: list of fields to retrieve, fetches all fields if not specified

        @type skip: number
        @param skip: number of items to skip from the start

        @type limit: number
        @param limit: maximum number of items to return

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: transaction id

        @rtype: list of dictionary
        @return: objects retrieved on success, throws exceptions on errors
        """
        job_data_list = super(ItsiRefreshQueueJob, self).get_bulk(
            owner,
            sort_key,
            sort_dir,
            filter_data,
            fields,
            skip,
            limit,
            req_source,
            transaction_id,
        )
        for job_data in job_data_list:
            self._add_queue_time(job_data)
            self._add_status(job_data)
        return job_data_list

    def fetch_queued_jobs(self, owner, **kwargs):
        """
        Fetch jobs that are considered queued (non-blocking).

        A job is queued when its priority is not -1 (priority -1 marks a job
        that was parked as failed or skipped).

        :param owner: user who is performing this operation
        :type owner: string

        :return: objects retrieved on success, throws exceptions on errors
        :rtype: list of dictionary
        """
        return self.get_bulk(owner, filter_data={"priority": {"$ne": -1}}, **kwargs)

    def get_queue_size(self, owner, **kwargs):
        """
        Return the current queue size.

        Only the '_key' field is fetched to keep the count cheap.

        @type owner: string
        @param owner: user who is performing this operation

        @return the count of jobs in the refresh queue awaiting processing
        @rtype int
        """
        return len(self.fetch_queued_jobs(owner, fields=["_key"], **kwargs))

    def get_failed_jobs(self, owner, **kwargs):
        """
        Return jobs marked as failed (parked with priority -1 and at least
        one recorded failure).

        @type owner: string
        @param owner: user who is performing this operation

        @return the list of failed jobs
        @rtype list(dict)
        """
        return self.get_bulk(
            owner,
            filter_data={"priority": -1, "number_of_failures": {"$gt": 0}},
            **kwargs,
        )

    def get_skipped_jobs(self, owner, **kwargs):
        """
        Return jobs marked as skipped (parked with priority -1 and no
        recorded failures).

        @type owner: string
        @param owner: user who is performing this operation

        @return the list of skipped jobs
        @rtype list(dict)
        """
        return self.get_bulk(
            owner,
            filter_data={"priority": -1, "number_of_failures": 0},
            **kwargs,
        )

    def _add_queue_time(self, job_data):
        """
        Add queue time field to refresh queue job.

        queue_time accumulates previously-saved waiting time plus, for a job
        that is currently queued without failures, the time elapsed since it
        was last queued. Jobs with failures (or no last_queued_time) report
        only the saved waiting time.

        :type job_data: dict
        :param job_data: dict containing refresh queue job info
        """
        last_queued_time = job_data.get("last_queued_time")
        last_saved_queue_time = job_data.get("last_saved_queue_time", 0)
        if job_data.get("number_of_failures", 0) > 0 or last_queued_time is None:
            queue_time = last_saved_queue_time
        else:
            queue_time = time.time() - last_queued_time + last_saved_queue_time
        job_data["queue_time"] = queue_time

    def _add_status(self, job):
        """
        Add status field to refresh queue job.

        Precedence: a job claimed by a processor is In-Progress; otherwise a
        parked job (priority -1) is Failed when it has recorded failures and
        Skipped when it has none; anything else is Queued.

        :param job: dict containing refresh queue job info
        :type job: dict
        """
        processor = job.get("processor")
        priority = job.get("priority", 0)
        number_of_failures = job.get("number_of_failures", 0)

        if processor:
            job["status"] = STATUS_IN_PROGRESS
        elif priority == -1:
            job["status"] = STATUS_FAILED if number_of_failures > 0 else STATUS_SKIPPED
        else:
            job["status"] = STATUS_QUEUED

    def wait_for_unblocked_queue(self):
        """
        Wait for the refresh queue to be unblocked.

        Polls the queue size for the 'nobody' owner up to 6 times with a
        5-second sleep between attempts (~30s worst case).

        :return: Is the queue unblocked?
        :rtype: Boolean
        """
        is_queue_blocked = True
        total_retries = 6
        retry = 0

        while is_queue_blocked and retry < total_retries:
            if self.get_queue_size("nobody") == 0:
                is_queue_blocked = False
            else:
                time.sleep(5)
                retry += 1
                logger.info(f"waiting for empty queue (retry: {retry})")
        return not is_queue_blocked